策略梯度方法详解
关键词速览
| 核心概念 | 策略梯度定理 | 基线 | 方差缩减 | Actor-Critic |
|---|---|---|---|---|
| REINFORCE | A3C | 优势函数 | 策略参数化 | 软更新 |
核心关键词表
| 术语 | 英文 | 符号/技术 | 说明 |
|---|---|---|---|
| 策略梯度 | Policy Gradient | | 策略参数的梯度方向 |
| 策略梯度定理 | Policy Gradient Theorem | PGT | 策略梯度的理论基础 |
| REINFORCE | REINFORCE | Monte Carlo PG | 基于回报的策略梯度估计 |
| 基线 | Baseline | | 缩减方差但不引入偏差 |
| 优势函数 | Advantage | | 相对基线的优势 |
| Actor-Critic | Actor-Critic | AC | 结合值函数和策略 |
| A3C | Asynchronous Advantage Actor-Critic | A3C | 异步并行Actor-Critic |
| 策略参数化 | Policy Parameterization | | 策略的函数表示 |
| 价值基线 | Value Baseline | | 状态价值函数作为基线 |
| 回报估计 | Return | | 累积折扣回报 |
一、策略梯度方法概述
与基于值函数的方法(如Q学习和DQN)不同,策略梯度方法直接对策略本身进行参数化并沿梯度方向优化。这类方法天然适合连续动作空间问题,并且在收敛性方面具有较好的理论保证。本章将系统介绍策略梯度方法的理论基础、核心算法和实践技巧。
1.1 为什么需要策略梯度?
策略梯度方法相较于值函数方法具有以下优势:
策略梯度 vs 值函数方法
策略梯度的优势:
- 连续动作空间:自然处理连续动作,无需离散化或argmax操作
- 随机策略:可以学习真正的随机策略,捕获环境不确定性
- 收敛性:直接在策略参数空间做梯度上升,在适当条件下可保证收敛到局部最优
- 探索性:策略的随机性天然提供探索,无需额外ε-greedy
值函数方法的局限:
- 连续动作空间需要额外的连续动作处理机制(如NAF等专门设计)
- Q值最大化的确定性策略难以处理多峰分布
- 策略震荡可能导致训练不稳定
1.2 策略参数化
策略 $\pi_\theta(a|s)$ 由参数向量 $\theta$ 定义。对于离散动作,使用softmax函数:
$$\pi_\theta(a|s) = \frac{\exp\big(h_\theta(s,a)\big)}{\sum_{a'}\exp\big(h_\theta(s,a')\big)}$$
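作为参考,下面给出离散动作softmax策略的一个最小实现示意(假设使用PyTorch;类名 `CategoricalPolicy` 为本文自拟示例):

```python
import torch
import torch.nn as nn

class CategoricalPolicy(nn.Module):
    """离散动作的softmax策略示意:网络输出各动作的logits(偏好值),
    经softmax归一化后得到动作概率分布。"""
    def __init__(self, obs_dim, n_actions, hidden_dim=128):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(obs_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, n_actions)
        )

    def forward(self, obs):
        logits = self.net(obs)  # 未归一化的动作偏好 h_theta(s, a)
        return torch.distributions.Categorical(logits=logits)

    def sample(self, obs):
        dist = self.forward(obs)
        action = dist.sample()                # 按概率随机采样动作
        return action, dist.log_prob(action)  # 同时返回 log pi(a|s),供策略梯度使用
```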
对于连续动作,使用高斯策略:
$$\pi_\theta(a|s) = \mathcal{N}\big(a;\ \mu_\theta(s),\ \sigma_\theta(s)^2\big)$$
神经网络参数化时,策略网络输出动作分布(如均值和方差):
import torch
import torch.nn as nn

class GaussianPolicy(nn.Module):
"""Continuous action Gaussian policy."""
def __init__(self, obs_dim, act_dim, hidden_dim=256):
super().__init__()
self.actor = nn.Sequential(
nn.Linear(obs_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU()
)
# 均值和标准差输出头
self.mean = nn.Linear(hidden_dim, act_dim)
self.log_std = nn.Linear(hidden_dim, act_dim)
def forward(self, obs):
features = self.actor(obs)
mean = self.mean(features)
log_std = self.log_std(features).clamp(-20, 2)
std = log_std.exp()
return mean, std
def sample(self, obs):
mean, std = self.forward(obs)
normal = torch.distributions.Normal(mean, std)
x_t = normal.rsample() # 重参数化采样
action = torch.tanh(x_t) # Squash to [-1, 1]
        return action

二、策略梯度定理
策略梯度定理(Policy Gradient Theorem, PGT)是策略梯度方法的理论基石,由Sutton等人于1999年提出。
2.1 目标函数
策略梯度方法的目标是最大化期望累积回报:
$$J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta}\Big[\sum_{t=0}^{T}\gamma^t r_t\Big]$$
其中 $\tau = (s_0, a_0, s_1, a_1, \ldots)$ 是由策略 $\pi_\theta$ 生成的轨迹。
2.2 策略梯度定理
策略梯度定理的核心断言是:
$$\nabla_\theta J(\theta) = \mathbb{E}_{s \sim d^{\pi_\theta},\, a \sim \pi_\theta}\big[\nabla_\theta \log \pi_\theta(a|s)\, Q^{\pi_\theta}(s,a)\big]$$
更精确地,对于状态-动作轨迹:
$$\nabla_\theta J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta}\Big[\sum_{t=0}^{T} \nabla_\theta \log \pi_\theta(a_t|s_t)\, G_t\Big],\qquad G_t = \sum_{k=t}^{T}\gamma^{k-t} r_k$$
策略梯度定理的直观理解
定理的核心洞察是:增加导致高回报的动作的概率,减少导致低回报的动作的概率。对数概率梯度 $\nabla_\theta \log \pi_\theta(a|s)$ 指示了"如何调整参数以增加 $\pi_\theta(a|s)$",而 $Q^{\pi_\theta}(s,a)$(或回报 $G_t$)提供了"是否应该增加"的信号。
2.3 证明概要
策略梯度定理的证明基于轨迹分布(概率图模型)的梯度计算。关键步骤:
- 轨迹概率分解:$P(\tau;\theta) = p(s_0)\prod_{t=0}^{T}\pi_\theta(a_t|s_t)\,p(s_{t+1}|s_t,a_t)$,其中环境转移项与 $\theta$ 无关
- 对数梯度恒等式:$\nabla_\theta P(\tau;\theta) = P(\tau;\theta)\,\nabla_\theta \log P(\tau;\theta)$(数值验证见下方示例)
- 期望梯度交换:$\nabla_\theta \mathbb{E}_{\tau}[R(\tau)] = \mathbb{E}_{\tau}\big[R(\tau)\,\nabla_\theta \log P(\tau;\theta)\big] = \mathbb{E}_{\tau}\Big[R(\tau)\sum_{t}\nabla_\theta \log \pi_\theta(a_t|s_t)\Big]$
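为直观验证对数梯度恒等式(得分函数估计),下面给出一个最小数值示意:用采样估计 $\nabla_\mu \mathbb{E}_{x\sim\mathcal{N}(\mu,1)}[x^2]$,并与解析梯度 $2\mu$ 对比(示例中的目标函数与参数均为本文自拟,仅用于说明原理):

```python
import torch

# 目标:估计 d/dμ E_{x~N(μ,1)}[x^2],解析解为 2μ
mu = torch.tensor(1.5, requires_grad=True)
dist = torch.distributions.Normal(mu, 1.0)

x = dist.sample((100_000,))   # 采样不经过计算图,与策略梯度中的动作采样一致
f = x ** 2                    # "回报"信号,本身对μ不可导

# 得分函数(REINFORCE式)代理目标:E[f(x) log p(x;μ)],其梯度即 E[f(x) ∇_μ log p(x;μ)]
surrogate = (f * dist.log_prob(x)).mean()
surrogate.backward()

print(f"得分函数估计: {mu.grad.item():.3f}")   # ≈ 3.0
print(f"解析梯度 2μ : {2 * mu.item():.3f}")    # = 3.0
```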
三、REINFORCE算法
REINFORCE是最基础的策略梯度算法,由Williams于1992年提出,基于蒙特卡洛回报估计。
3.1 算法推导
使用从时刻 $t$ 起的蒙特卡洛折扣回报 $G_t = \sum_{k=t}^{T}\gamma^{k-t} r_k$ 作为 $Q^{\pi_\theta}(s_t,a_t)$ 的无偏估计,得到REINFORCE的梯度估计:
$$\nabla_\theta J(\theta) \approx \sum_{t=0}^{T} \nabla_\theta \log \pi_\theta(a_t|s_t)\, G_t$$
3.2 REINFORCE完整实现
import torch
import torch.nn as nn
import torch.optim as optim

class REINFORCE:
"""REINFORCE algorithm with baseline."""
def __init__(self, obs_dim, act_dim, hidden_dim=128,
lr=3e-4, gamma=0.99, device='cuda'):
self.gamma = gamma
self.device = device
# 策略网络
self.policy = GaussianPolicy(obs_dim, act_dim, hidden_dim).to(device)
self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)
# 价值网络(可选基线)
self.value_net = nn.Sequential(
nn.Linear(obs_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, 1)
).to(device)
self.value_optimizer = optim.Adam(self.value_net.parameters(), lr=lr)
def select_action(self, obs, deterministic=False):
"""Sample action from policy."""
obs_tensor = torch.FloatTensor(obs).to(self.device).unsqueeze(0)
with torch.no_grad():
mean, std = self.policy(obs_tensor)
if deterministic:
action = mean
else:
dist = torch.distributions.Normal(mean, std)
action = dist.sample()
action = torch.tanh(action)
return action.cpu().numpy()[0]
def compute_returns(self, rewards, dones):
"""Compute discounted returns."""
returns = []
discounted = 0
for reward, done in zip(reversed(rewards), reversed(dones)):
discounted = reward + self.gamma * discounted * (1 - done)
returns.insert(0, discounted)
return torch.FloatTensor(returns)
def update(self, obs_batch, actions, rewards, dones, next_obs_batch=None):
"""Perform one gradient update."""
# 计算回报
returns = self.compute_returns(rewards, dones).to(self.device)
# 可选:计算优势函数
with torch.no_grad():
values = self.value_net(torch.FloatTensor(obs_batch).to(self.device)).squeeze()
advantages = returns - values
# 策略梯度更新
obs_tensor = torch.FloatTensor(obs_batch).to(self.device)
actions_tensor = torch.FloatTensor(actions).to(self.device)
mean, std = self.policy(obs_tensor)
dist = torch.distributions.Normal(mean, std)
        # 对数概率:对各动作维度求和;此处直接对tanh后的动作取log_prob,
        # 属于忽略tanh变量替换修正项的简化处理
        log_probs = dist.log_prob(actions_tensor).sum(-1)
        # 策略损失(取负号:优化器做梯度下降,等价于对J做梯度上升)
        policy_loss = -(log_probs * advantages.detach()).mean()
self.optimizer.zero_grad()
policy_loss.backward()
self.optimizer.step()
        # 价值网络(基线)更新:重新前向以获得带梯度的价值预测
        values_pred = self.value_net(obs_tensor).squeeze()
        value_loss = nn.MSELoss()(values_pred, returns)
        self.value_optimizer.zero_grad()
        value_loss.backward()
        self.value_optimizer.step()
        return policy_loss.item(), value_loss.item()
def collect_trajectory(env, agent, max_steps=1000):
"""Collect one trajectory using current policy."""
obs = env.reset()
observations, actions, rewards, dones = [], [], [], []
for step in range(max_steps):
action = agent.select_action(obs)
next_obs, reward, done, _ = env.step(action)
observations.append(obs)
actions.append(action)
rewards.append(reward)
dones.append(done)
obs = next_obs
if done:
break
return observations, actions, rewards, dones
def train_reinforce(env, agent, num_episodes=1000, max_steps=1000):
"""Train REINFORCE agent."""
for episode in range(num_episodes):
# 收集轨迹
obs, actions, rewards, dones = collect_trajectory(env, agent, max_steps)
# 更新策略
policy_loss, value_loss = agent.update(obs, actions, rewards, dones)
if (episode + 1) % 10 == 0:
total_reward = sum(rewards)
print(f"Episode {episode+1}, Reward: {total_reward:.2f}, "
f"Policy Loss: {policy_loss:.4f}")四、基线与方差缩减
4.1 引入基线
策略梯度估计的高方差是核心挑战之一。引入状态相关基线 $b(s_t)$ 可以缩减方差而不引入偏差:
$$\nabla_\theta J(\theta) = \mathbb{E}_{\tau}\Big[\sum_{t} \nabla_\theta \log \pi_\theta(a_t|s_t)\,\big(G_t - b(s_t)\big)\Big]$$
无偏性证明:
$$\mathbb{E}_{a \sim \pi_\theta}\big[\nabla_\theta \log \pi_\theta(a|s)\, b(s)\big] = b(s)\sum_a \nabla_\theta \pi_\theta(a|s) = b(s)\,\nabla_\theta \sum_a \pi_\theta(a|s) = b(s)\,\nabla_\theta 1 = 0$$
基线的选择
严格最小化方差的基线形式较为复杂,实践中通常取状态价值函数 $V^{\pi}(s)$ 作为(近似最优的)基线,此时加权项 $G_t - V^{\pi}(s_t)$ 即为优势函数的估计。
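实践中,除学习的价值基线外,一个常用且简单的方差缩减技巧是在每个batch内对回报(或优势)做标准化。下面是一个示意片段(函数名 `normalize` 为本文自拟):

```python
import torch

def normalize(x, eps=1e-8):
    """对一个batch的回报/优势做零均值、单位方差标准化:
    相当于使用batch均值作为常数基线,并统一梯度的尺度。"""
    return (x - x.mean()) / (x.std() + eps)

# 用法示意:
# advantages = normalize(returns - values)
# policy_loss = -(log_probs * advantages.detach()).mean()
```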
4.2 优势函数
优势函数 $A^{\pi}(s,a) = Q^{\pi}(s,a) - V^{\pi}(s)$ 度量了"在状态 $s$ 下采取动作 $a$ 相对于平均水平的优势"。
def compute_gae(rewards, values, dones, gamma=0.99, lam=0.95):
"""
Generalized Advantage Estimation (GAE).
Combines multi-step returns with TD errors.
"""
advantages = []
gae = 0
for t in reversed(range(len(rewards))):
if t == len(rewards) - 1:
next_value = 0
else:
next_value = values[t + 1]
delta = rewards[t] + gamma * next_value * (1 - dones[t]) - values[t]
gae = delta + gamma * lam * (1 - dones[t]) * gae
advantages.insert(0, gae)
    return torch.FloatTensor(advantages)

五、Actor-Critic架构
Actor-Critic将策略梯度(Actor)与值函数近似(Critic)结合,同时获得低方差和高效率。
5.1 算法框架
class ActorCritic:
"""
Actor-Critic with separate policy and value networks.
"""
def __init__(self, obs_dim, act_dim, hidden_dim=256,
pi_lr=3e-4, vf_lr=1e-3, gamma=0.99, lam=0.95):
# Actor (策略)
self.actor = GaussianPolicy(obs_dim, act_dim, hidden_dim)
self.pi_optimizer = optim.Adam(self.actor.parameters(), lr=pi_lr)
# Critic (价值函数)
self.critic = nn.Sequential(
nn.Linear(obs_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, 1)
)
self.vf_optimizer = optim.Adam(self.critic.parameters(), lr=vf_lr)
self.gamma = gamma
self.lam = lam
def get_action(self, obs, deterministic=False):
mean, std = self.actor(obs)
if deterministic:
action = mean
else:
dist = torch.distributions.Normal(mean, std)
action = dist.sample()
return torch.tanh(action)
def compute_v(self, obs):
return self.critic(obs)
def update(self, obs_batch, actions, rewards, dones, next_obs_batch):
"""
Perform Actor-Critic update with GAE.
"""
with torch.no_grad():
values = self.critic(torch.FloatTensor(obs_batch)).squeeze()
        # GAE优势估计(逐步TD误差δ在compute_gae内部计算,末状态按终止处理)
        advantages = compute_gae(rewards, values.tolist(), dones,
                                 self.gamma, self.lam)
# 回报 = 优势 + 价值基线
returns = advantages + values.detach()
# Critic更新:最小化价值误差
values_pred = self.critic(torch.FloatTensor(obs_batch)).squeeze()
vf_loss = nn.MSELoss()(values_pred, returns)
self.vf_optimizer.zero_grad()
vf_loss.backward()
self.vf_optimizer.step()
# Actor更新:策略梯度
mean, std = self.actor(torch.FloatTensor(obs_batch))
dist = torch.distributions.Normal(mean, std)
        log_probs = dist.log_prob(torch.FloatTensor(actions)).sum(-1)
        # 使用GAE优势作为策略梯度的权重
        pi_loss = -(log_probs * advantages.detach()).mean()
self.pi_optimizer.zero_grad()
pi_loss.backward()
self.pi_optimizer.step()
        return pi_loss.item(), vf_loss.item()

5.2 Actor-Critic的优势
| 特性 | REINFORCE | Actor-Critic |
|---|---|---|
| 方差 | 高 | 低(值函数基线) |
| 偏差 | 无 | 可能有(函数逼近) |
| 样本效率 | 低(全轨迹) | 高(单步TD) |
| 计算复杂度 | 低 | 中等 |
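作为对照,下面给出单步TD形式的Actor-Critic更新的最小示意:每一步都用TD误差同时更新Critic,并将其作为Actor的优势估计(其中 actor、critic 及优化器沿用上文 ActorCritic 的结构,函数名 `one_step_ac_update` 为本文自拟):

```python
import torch

def one_step_ac_update(actor, critic, pi_opt, vf_opt,
                       obs, action, reward, next_obs, done, gamma=0.99):
    """单步TD Actor-Critic:δ = r + γV(s') - V(s) 同时驱动两个网络的更新。"""
    v = critic(obs).squeeze(-1)
    with torch.no_grad():
        v_next = critic(next_obs).squeeze(-1) * (1.0 - done)
        td_target = reward + gamma * v_next

    # Critic:最小化TD误差的平方
    vf_loss = (td_target - v).pow(2).mean()
    vf_opt.zero_grad(); vf_loss.backward(); vf_opt.step()

    # Actor:以TD误差δ作为优势的单步估计
    td_error = (td_target - v).detach()
    mean, std = actor(obs)
    dist = torch.distributions.Normal(mean, std)
    log_prob = dist.log_prob(action).sum(-1)
    pi_loss = -(log_prob * td_error).mean()
    pi_opt.zero_grad(); pi_loss.backward(); pi_opt.step()
    return pi_loss.item(), vf_loss.item()
```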
六、A3C算法
异步优势Actor-Critic(Asynchronous Advantage Actor-Critic, A3C)由DeepMind的Mnih等人于2016年提出,是深度强化学习的里程碑算法。
6.1 异步训练框架
A3C的核心创新是使用多线程异步训练,每个线程独立与环境交互,定期同步梯度到全局网络。
import threading
import multiprocessing as mp

import gym
import torch
import torch.optim as optim

class A3CAgent:
"""Asynchronous Advantage Actor-Critic."""
def __init__(self, obs_dim, act_dim, global_episode_counter,
lr=3e-4, gamma=0.99, lam=0.95, ent_coef=0.01):
self.obs_dim = obs_dim
self.act_dim = act_dim
self.gamma = gamma
self.lam = lam
self.ent_coef = ent_coef
# 全局网络
self.global_policy = ActorCriticNetwork(obs_dim, act_dim)
self.global_policy.share_memory()
self.optimizer = optim.Adam(self.global_policy.parameters(), lr=lr)
# 全局 episode 计数器
self.global_episode_counter = global_episode_counter
def pull_from_global(self, local_model):
"""同步全局网络参数到本地."""
local_model.load_state_dict(self.global_policy.state_dict())
def push_to_global(self, gradients):
"""推送梯度到全局网络."""
for param, grad in zip(self.global_policy.parameters(), gradients):
param._grad = grad
def worker(self, worker_id, env_name, num_steps=20):
"""单个worker的工作流程."""
env = gym.make(env_name)
        # 本地网络(ActorCriticNetwork:输出动作分布与状态价值的网络,定义从略)
        local_model = ActorCriticNetwork(self.obs_dim, self.act_dim)
while self.global_episode_counter.value < 10000:
# 同步全局参数
self.pull_from_global(local_model)
obs = env.reset()
episode_reward = 0
episode_steps = 0
obs_buffer, actions_buffer, rewards_buffer = [], [], []
values_buffer, dones_buffer = [], []
for step in range(num_steps):
action = local_model.get_action(torch.FloatTensor(obs))
next_obs, reward, done, _ = env.step(action.numpy())
obs_buffer.append(obs)
actions_buffer.append(action)
rewards_buffer.append(reward)
values_buffer.append(local_model.get_v(torch.FloatTensor(obs)).item())
dones_buffer.append(done)
episode_reward += reward
episode_steps += 1
obs = next_obs
if done:
break
# 计算优势和回报
next_value = 0 if done else local_model.get_v(torch.FloatTensor(next_obs)).item()
advantages, returns = compute_gae_torch(
rewards_buffer, values_buffer, dones_buffer,
next_value, self.gamma, self.lam
)
# 计算损失并反向传播
pi_loss, v_loss, entropy = self.compute_loss(
obs_buffer, actions_buffer, advantages, returns, local_model
)
total_loss = pi_loss + 0.5 * v_loss - self.ent_coef * entropy
self.optimizer.zero_grad()
total_loss.backward()
torch.nn.utils.clip_grad_norm_(local_model.parameters(), 50)
            # 将本地模型的梯度推送到全局网络,再由全局优化器执行更新
            self.push_to_global([p.grad for p in local_model.parameters()])
self.optimizer.step()
if done:
with self.global_episode_counter.get_lock():
self.global_episode_counter.value += 1
if self.global_episode_counter.value % 100 == 0:
print(f"Worker {worker_id}, Episode {self.global_episode_counter.value}")
obs = env.reset()
                episode_reward = 0

6.2 A3C的关键创新
- 异步训练:多线程并行探索,提高样本多样性
- 优势函数:使用 $A(s_t,a_t) = \sum_{i=0}^{n-1}\gamma^i r_{t+i} + \gamma^n V(s_{t+n}) - V(s_t)$ 缩减方差
- 多步回报:平衡偏差和方差(默认n=5)
- 熵正则化:鼓励探索,防止策略过早收敛(熵项的计算见下方示意)
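熵正则项可直接利用动作分布的解析熵计算。下面是一个示意片段(假设策略输出高斯分布的 mean、std;函数名 `entropy_bonus` 为本文自拟):

```python
import torch

def entropy_bonus(mean, std):
    """高斯策略的熵:对各动作维度求和,再对batch取均值。
    训练时按 total_loss = pi_loss + 0.5 * vf_loss - ent_coef * entropy 计入总损失。"""
    dist = torch.distributions.Normal(mean, std)
    return dist.entropy().sum(-1).mean()
```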
A3C调参经验
- 学习率:3e-4是常见的起始值,可视训练稳定性调整
- 线程数:4-16个,根据CPU核心数调整
- 熵系数:0.01左右,鼓励探索但不过度随机
- 折扣因子:0.99-0.999
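上文 A3CAgent.worker 需要由多个进程并行启动。下面给出一个启动流程的示意(环境名、维度与进程数均为示例取值,并假设与上文代码位于同一脚本):

```python
import multiprocessing as mp

if __name__ == "__main__":
    # 全局episode计数器,由各worker共享
    counter = mp.Value('i', 0)
    # 示例维度对应 LunarLanderContinuous-v2(obs_dim=8, act_dim=2)
    agent = A3CAgent(obs_dim=8, act_dim=2, global_episode_counter=counter)

    # 启动多个worker进程,各自独立采样并异步更新全局网络
    workers = [mp.Process(target=agent.worker, args=(i, "LunarLanderContinuous-v2"))
               for i in range(8)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
```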
七、策略梯度的挑战与解决方案
7.1 收敛性挑战
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 策略崩溃 | 策略方差过小 | 熵正则化 |
| 高方差 | 长轨迹累积 | GAE、基线 |
| 策略振荡 | 梯度估计噪声 | 信任域/近端方法(TRPO、PPO) |
7.2 方差-偏差权衡
策略梯度面临经典的偏差-方差权衡:
- Monte Carlo估计(REINFORCE):无偏但高方差
- TD估计(Actor-Critic):有偏但低方差
- GAE:可调的偏差-方差平衡
# GAE通过λ在两者之间连续插值
# λ=0:退化为单步TD误差,低方差、高偏差
# λ=1:退化为Monte Carlo回报减基线,无偏、高方差
advantages = compute_gae(rewards, values, dones, gamma=0.99, lam=0.95)

八、数学形式化总结
策略梯度定理
$$\nabla_\theta J(\theta) = \mathbb{E}_{\pi_\theta}\big[\nabla_\theta \log \pi_\theta(a|s)\, Q^{\pi_\theta}(s,a)\big]$$
优势函数
$$A^{\pi}(s,a) = Q^{\pi}(s,a) - V^{\pi}(s)$$
GAE(λ)
$$\hat{A}_t^{\mathrm{GAE}(\gamma,\lambda)} = \sum_{l=0}^{\infty} (\gamma\lambda)^l\, \delta_{t+l}$$
其中 $\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$。
九、参考文献
- Sutton, R. S., & Barto, A. G. (2018). Reinforcement Learning: An Introduction (2nd ed.). MIT Press.
- Williams, R. J. (1992). Simple statistical gradient-following algorithms for connectionist reinforcement learning. Machine Learning, 8(3-4), 229-256.
- Mnih, V., et al. (2016). Asynchronous methods for deep reinforcement learning. ICML, 1928-1937.
- Schulman, J., Levine, S., Abbeel, P., Jordan, M., & Moritz, P. (2015). Trust region policy optimization. ICML, 1889-1897.
- Schulman, J., Wolski, F., Dhariwal, P., Radford, A., & Klimov, O. (2017). Proximal policy optimization algorithms. arXiv:1707.06347.
- Mnih, V., et al. (2014). Recurrent models of visual attention. NIPS, 2204-2212.
策略梯度方法为连续控制问题提供了优雅的解决方案,是现代强化学习的重要支柱。