策略梯度方法详解
关键词速览
| 核心概念 | 策略梯度定理 | 基线 | 方差缩减 | Actor-Critic |
|---|---|---|---|---|
| REINFORCE | A3C | 优势函数 | 策略参数化 | 软更新 |
核心关键词表
术语 英文 符号/技术 说明 策略梯度 Policy Gradient 策略参数的梯度方向 策略梯度定理 PGT 策略梯度的理论基础 REINFORCE REINFORCE Monte Carlo PG 基于回报的策略梯度估计 基线 Baseline 缩减方差但不引入偏差 优势函数 Advantage 相对基线的优势 Actor-Critic Actor-Critic AC 结合值函数和策略 A3C Asynchronous AC A3C 异步并行Actor-Critic 策略参数化 Policy Param 策略的函数表示 价值基线 Value Baseline 状态价值函数作为基线 回报估计 Return 累积折扣回报
一、策略梯度入门:为什么直接从策略入手?
1.1 两种不同的思路
说起强化学习,你可能首先想到的是Q学习、DQN这些名字。它们的套路是:先学习一个”动作价值函数”Q(s,a),告诉我每个状态-动作对有多好,然后用这个Q值来决定在某个状态下该选哪个动作。说白了,就是先”评估”再”决策”。
但策略梯度走的是另一条路。它根本不关心Q值是多少,直接端到端地学习一个策略函数 :看到状态s,输出该选什么动作a。这个策略就是一个神经网络,输入是状态,输出是动作(或者动作的概率分布)。
打个比方的话,Q学习就像是一个经验丰富的教练,他在脑子里记着”在这种局面下,你走这一步,历史平均胜率是68%“,然后每次都挑胜率最高的走。而策略梯度更像是直接训练你的肌肉记忆——看到一个局面,手就自动动了,根本不需要中间那个”胜率评估”环节。
1.2 策略梯度解决了什么问题?
你可能会问,既然Q学习已经能work了,干嘛还要学策略梯度?这就要说到Q学习的几个痛点了。
第一个痛点是连续动作空间。 想象你要控制一个机器人的关节角度,关节角度是一个连续值,从0度到180度之间的任何角度都有可能。Q学习怎么处理这个问题?要么把角度离散化成”0度、30度、60度…”这种离散的档位,但这太粗糙了,丢失了很多细节;要么用一些技巧比如NAF(归一化动作场)来处理,但这些方法要么粗糙要么复杂。
策略梯度就不一样了,它输出的是动作的均值和方差,想输出什么连续值都行,完美解决。
第二个痛点是策略的随机性。 Q学习学出来的策略通常是确定性的——给定状态s,每次都选同一个动作。但在真实环境里,有时候我们需要保持一定的随机性。比如在石头剪刀布游戏里,确定性策略是必输的,只有随机策略才能达到纳什均衡。Q学习想学随机策略挺费劲的,策略梯度就很自然地能学到 这个概率分布。
第三个痛点是探索-利用的平衡。 Q学习通常需要额外的技巧(比如ε-greedy)来保证探索。但策略梯度因为策略本身就是随机的,每一次选择动作都是从分布里采样,天生就带了探索属性。
1.3 策略怎么表示?
策略梯度里的策略 是一个参数化的函数,参数是 。具体怎么表示取决于动作空间是离散的还是连续的。
对于离散动作空间,最常见的是用softmax输出每个动作的概率:
这里的 可以是一个神经网络,输入状态s和动作a,输出一个”分数”,然后用softmax把这些分数转成概率。
对于连续动作空间,通常假设动作服从高斯分布:
也就是说,给定状态s,策略输出动作的均值 和方差 ,然后从正态分布里采样得到实际动作。这样做的好处是,神经网络只需要学习均值和方差,采样是自动完成的。
下面是一个用PyTorch实现高斯策略的完整代码:
import torch
import torch.nn as nn
class GaussianPolicy(nn.Module):
"""连续动作空间的高斯策略网络。"""
def __init__(self, obs_dim, act_dim, hidden_dim=256):
super().__init__()
# 共享的特征提取层
self.actor = nn.Sequential(
nn.Linear(obs_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU()
)
# 均值输出头
self.mean = nn.Linear(hidden_dim, act_dim)
# 对数标准差输出头
# 用log_std而不是直接输出std,这样可以让std永远是正的
self.log_std = nn.Linear(hidden_dim, act_dim)
def forward(self, obs):
"""前向传播,返回均值和对数标准差。"""
features = self.actor(obs)
mean = self.mean(features)
# clamp限制范围,防止数值爆炸
log_std = self.log_std(features).clamp(-20, 2)
std = log_std.exp()
return mean, std
def sample(self, obs, deterministic=False):
"""
从策略中采样动作。
Args:
obs: 观测张量
deterministic: 是否使用确定性策略(直接返回均值)
Returns:
action: 采样的动作
log_prob: 动作的对数概率(用于策略梯度计算)
"""
mean, std = self.forward(obs)
if deterministic:
return torch.tanh(mean), None
# 创建正态分布
dist = torch.distributions.Normal(mean, std)
# 重参数化采样:让采样过程可导
x_t = dist.rsample()
# 用tanh压缩到[-1, 1]范围
action = torch.tanh(x_t)
# 计算修正的对数概率(因为用了tanh压缩)
# 这个公式是从unbounded正态分布到[-1,1]压缩的概率修正
log_prob = dist.log_prob(x_t)
log_prob -= torch.log(1 - action.pow(2) + 1e-6)
log_prob = log_prob.sum(dim=-1, keepdim=True)
return action, log_prob这段代码里有几个值得注意的地方。一个是用了 rsample() 而不是 sample(),这是重参数化技巧——我们把随机性用到一个独立的噪声变量上,这样梯度就能流过来了。另一个是用tanh把输出压缩到[-1,1]范围,同时需要对数概率做个修正。
二、策略梯度定理:为什么梯度是指南针?
2.1 我们要优化的目标
在策略梯度里,我们的目标是找到一组参数 ,让策略 获得的期望回报最大化。这个期望回报定义为:
这里的 是由策略 生成的一条轨迹, 是这条轨迹的累计回报, 是折扣因子。
换句话说, 就是:如果你用当前的策略 去和环境交互,得到的平均回报是多少?我们当然希望这个平均值越高越好。
现在问题来了:怎么调整 让 变大?
2.2 梯度!梯度!
在数学上,函数增长最快的方向就是它的梯度方向。所以如果我们能算出 ,就知道应该往哪个方向调整参数 能让目标函数变大。然后沿着这个方向走一小步,再重新计算梯度,再走一小步……这就是梯度上升(或者如果我们要最小化,就是梯度下降)。
问题是, 是一个关于 的期望,而 本身又依赖于 ,这让直接求导变得很麻烦。
2.3 策略梯度定理登场
这个时候,策略梯度定理就出场了。它告诉我们一个巧妙的等式:
或者更常用的状态-动作形式:
这个定理的证明基于一个很聪明的恒等式:
你可能会问,这个恒等式哪来的?其实很简单:对 求导,得到 ,移项就得到了。
2.4 直观理解:为什么这个公式是对的?
这个公式的直觉是这样的: 指向”如何调整参数让 变大”的方向。说人话就是:看到这个梯度,就知道往哪个方向改参数能让选择这个动作的概率提高。
则是”裁判”,它告诉我们在状态s下选动作a到底好不好。如果Q值高,说明这个动作是个明智的选择,值得增加它的概率;如果Q值低,说明这个动作不怎么样,应该降低它的概率。
两者相乘,就是:找到那些”概率应该增加”的动作(由对数梯度指示),但只对那些”确实带来高回报”的动作做增强(由Q值加权)。
这就像你玩超级玛丽,记录下每一局游戏的轨迹和最终得分。游戏结束后,你说”这一把我死在了第三个坑,那次跳跃应该少用力”,或者说”这一把我成功吃到了星星然后过了那一关,那个决策是对的,下次还这么做”——策略梯度就是在做类似的事情。
三、REINFORCE算法:最简单的策略梯度
3.1 Monte Carlo的方法
REINFORCE是最基础的策略梯度算法,1992年由Ronald Williams提出。它的核心思想是:用实际采样的轨迹来估计期望回报。
具体来说,策略梯度定理里的 我们通常不知道,因为它需要知道在策略 下从状态s出发选动作a的期望回报。但我们可以用蒙特卡洛方法,用实际采样的回报来近似:
这里的 是第i条轨迹里从时刻t开始的累计折扣回报:
因为用的是实际的轨迹回报,所以这是无偏估计——如果采样足够多,梯度估计的平均值会收敛到真实的策略梯度。
3.2 REINFORCE的完整实现
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
class REINFORCE:
"""REINFORCE算法,带可选的基线。"""
def __init__(self, obs_dim, act_dim, hidden_dim=128,
lr=3e-4, gamma=0.99, device='cuda'):
self.gamma = gamma
self.device = device
# 策略网络
self.policy = GaussianPolicy(obs_dim, act_dim, hidden_dim).to(device)
self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)
# 价值网络(用作基线,减少方差)
self.value_net = nn.Sequential(
nn.Linear(obs_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, 1)
).to(device)
self.value_optimizer = optim.Adam(self.value_net.parameters(), lr=lr)
def select_action(self, obs, deterministic=False):
"""
根据当前策略选择动作。
Args:
obs: 环境观测(numpy数组)
deterministic: 如果为True,返回均值(用于评估)
Returns:
action: 选择的动作(numpy数组)
"""
obs_tensor = torch.FloatTensor(obs).to(self.device).unsqueeze(0)
with torch.no_grad():
mean, std = self.policy(obs_tensor)
if deterministic:
action = mean
else:
dist = torch.distributions.Normal(mean, std)
action = dist.sample()
action = torch.tanh(action)
return action.cpu().numpy()[0]
def compute_returns(self, rewards, dones):
"""
计算折扣回报。
Args:
rewards: 奖励列表
dones: done标志列表
Returns:
returns: 折扣回报张量
"""
returns = []
discounted = 0
# 从后往前计算
for reward, done in zip(reversed(rewards), reversed(dones)):
discounted = reward + self.gamma * discounted * (1 - done)
returns.insert(0, discounted)
return torch.FloatTensor(returns)
def update(self, observations, actions, rewards, dones):
"""
执行一次策略梯度更新。
Args:
observations: 观测列表
actions: 动作列表
rewards: 奖励列表
dones: done标志列表
Returns:
policy_loss: 策略损失
value_loss: 价值损失
"""
# 转换为张量
obs_tensor = torch.FloatTensor(np.array(observations)).to(self.device)
actions_tensor = torch.FloatTensor(np.array(actions)).to(self.device)
rewards_tensor = torch.FloatTensor(np.array(rewards)).to(self.device)
dones_tensor = torch.FloatTensor(np.array(dones)).to(self.device)
# 计算折扣回报
returns = self.compute_returns(rewards, dones).to(self.device)
# 计算优势函数(回报减去基线)
with torch.no_grad():
values = self.value_net(obs_tensor).squeeze()
advantages = returns - values
# 策略梯度更新
_, log_probs = self.policy.sample(obs_tensor)
# 策略损失 = -log_prob * advantage
# 负号是因为我们想最大化回报,但PyTorch默认最小化损失
policy_loss = -(log_probs.squeeze() * advantages.detach()).mean()
self.optimizer.zero_grad()
policy_loss.backward()
self.optimizer.step()
# 价值网络更新(最小化价值误差)
value_loss = nn.MSELoss()(values, returns.detach())
self.value_optimizer.zero_grad()
value_loss.backward()
self.value_optimizer.step()
return policy_loss.item(), value_loss.item()
def collect_trajectory(env, agent, max_steps=1000):
"""
用当前策略收集一条轨迹。
Returns:
observations, actions, rewards, dones: 轨迹数据
"""
obs = env.reset()
observations, actions, rewards, dones = [], [], [], []
for step in range(max_steps):
action = agent.select_action(obs)
next_obs, reward, done, _ = env.step(action)
observations.append(obs)
actions.append(action)
rewards.append(reward)
dones.append(done)
obs = next_obs
if done:
break
return observations, actions, rewards, dones
def train_reinforce(env, agent, num_episodes=1000, max_steps=1000, print_every=10):
"""
训练REINFORCE智能体。
"""
for episode in range(num_episodes):
# 收集一条轨迹
obs, actions, rewards, dones = collect_trajectory(env, agent, max_steps)
# 更新策略
policy_loss, value_loss = agent.update(obs, actions, rewards, dones)
if (episode + 1) % print_every == 0:
total_reward = sum(rewards)
print(f"Episode {episode+1}/{num_episodes} | "
f"Reward: {total_reward:.2f} | "
f"Policy Loss: {policy_loss:.4f} | "
f"Value Loss: {value_loss:.4f}")3.3 REINFORCE的问题
REINFORCE简单直观,但它有个致命的弱点:方差太高了。
为什么方差高?因为累计回报 是从时刻t到游戏结束的所有奖励之和。如果游戏很长,或者奖励信号有噪声,这个值的波动就会很大。同一个状态s,可能因为后面发生的事情不同, 差别巨大。
高方差意味着梯度估计不稳定,训练收敛慢。你可能见过强化学习里那种”曲线像心电图一样”的训练过程,这就是方差在作怪。
四、基准函数Baseline:减少方差的简单技巧
4.1 什么是基线?
有一个特别巧妙的技巧可以降低方差,而且完全不会引入偏差。那就是引入一个只依赖于状态的函数 ,从优势函数里减去它:
这个技巧为什么有效?因为:
第二个等号成立是因为对数概率的梯度在概率分布上的期望为0,这是一个数学性质。换句话说,无论 是什么(只要它不依赖于动作a),从期望里减掉它都不会改变梯度。
4.2 怎么选基线?
虽然任意基线都不会引入偏差,但不同的基线降低方差的效果不一样。最优的基线是最小化方差的那个,而最优的基线恰好是状态价值函数 ——也就是”在状态s下,平均来说能获得多少回报”。
为什么是 ?直觉上, 衡量的是”选动作a比平均水平好多少”。如果某个动作的Q值和V值差不多,那它就是个中规中矩的选择,不值得特别增加或减少概率;如果Q值远高于V值,那它是个惊喜,应该提高概率。
数学上可以证明, 是最小化方差的最优基线选择。
4.3 优势函数
当我们用 作为基线时, 有一个专门的名字,叫优势函数,记作 :
优势函数的直观含义是:在状态s下选动作a,相对于”随便选个动作”的平均水平,是更好还是更差。
- :选a比平均好,值得增加概率
- :选a比平均差,应该减少概率
- :选a就是平均水平
这就是为什么用优势函数比用原始回报更好——它去掉了与动作选择无关的环境因素造成的方差。
4.4 GAE:偏差-方差的权衡
在实际中, 和 我们都不知道,需要用函数近似来估计。这就引出了广义优势估计(GAE),它提供了一种可以调节偏差-方差权衡的方法。
def compute_gae(rewards, values, dones, gamma=0.99, lam=0.95):
"""
广义优势估计(GAE)。
Args:
rewards: 奖励列表
values: 价值估计列表
dones: done标志列表
gamma: 折扣因子
lam: GAE参数,控制偏差-方差权衡
Returns:
advantages: 优势估计
"""
advantages = []
gae = 0
# 从后往前计算
for t in reversed(range(len(rewards))):
if t == len(rewards) - 1:
# 最后一个状态,下一个价值是0(因为没有后续状态了)
next_value = 0
else:
next_value = values[t + 1]
# TD误差
delta = rewards[t] + gamma * next_value * (1 - dones[t]) - values[t]
# GAE累加
gae = delta + gamma * lam * (1 - dones[t]) * gae
advantages.insert(0, gae)
return torch.FloatTensor(advantages)GAE的参数 控制了偏差和方差的权衡:
- :只考虑一步TD误差,,方差最低但偏差较大
- :相当于蒙特卡洛回报,无偏但方差最大
通常 或 是比较常见的选择,在偏差和方差之间取得好的平衡。
五、Actor-Critic:策略与价值的联姻
5.1 为什么需要Actor-Critic?
REINFORCE用蒙特卡洛方法估计回报,优点是无偏,缺点是方差大。如果我们知道真实的价值函数 ,就可以用它来代替采样回报,大幅降低方差。
但问题是,真实的 我们并不知道。
Actor-Critic的思路是:同时学习两个东西:
- Actor(策略):负责选择动作,根据策略梯度更新
- Critic(评论家):负责估计价值函数,帮Actor降低方差
两全其美。
5.2 Actor-Critic的算法框架
class ActorCritic:
"""
Actor-Critic算法框架。
Actor负责策略,Critic负责价值估计。
"""
def __init__(self, obs_dim, act_dim, hidden_dim=256,
pi_lr=3e-4, vf_lr=1e-3, gamma=0.99, lam=0.95):
self.gamma = gamma
self.lam = lam
self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
# Actor(策略网络)
self.actor = GaussianPolicy(obs_dim, act_dim, hidden_dim).to(self.device)
self.pi_optimizer = optim.Adam(self.actor.parameters(), lr=pi_lr)
# Critic(价值网络)
self.critic = nn.Sequential(
nn.Linear(obs_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, 1)
).to(self.device)
self.vf_optimizer = optim.Adam(self.critic.parameters(), lr=vf_lr)
def get_action(self, obs, deterministic=False):
"""选择动作。"""
obs_tensor = torch.FloatTensor(obs).to(self.device).unsqueeze(0)
action, _ = self.actor.sample(obs_tensor, deterministic)
return action.cpu().numpy()[0]
def compute_value(self, obs):
"""估计状态价值。"""
obs_tensor = torch.FloatTensor(obs).to(self.device).unsqueeze(0)
return self.critic(obs_tensor).item()
def update(self, observations, actions, rewards, dones, next_observations):
"""
执行一次Actor-Critic更新。
"""
obs_tensor = torch.FloatTensor(np.array(observations)).to(self.device)
actions_tensor = torch.FloatTensor(np.array(actions)).to(self.device)
rewards_tensor = torch.FloatTensor(np.array(rewards)).to(self.device)
dones_tensor = torch.FloatTensor(np.array(dones)).to(self.device)
next_obs_tensor = torch.FloatTensor(np.array(next_observations)).to(self.device)
# 1. Critic更新:用TD(0)目标更新价值估计
with torch.no_grad():
values = self.critic(obs_tensor).squeeze()
next_values = self.critic(next_obs_tensor).squeeze()
# TD目标
td_targets = rewards_tensor + self.gamma * next_values * (1 - dones_tensor)
# 价值损失
values_pred = self.critic(obs_tensor).squeeze()
vf_loss = nn.MSELoss()(values_pred, td_targets)
self.vf_optimizer.zero_grad()
vf_loss.backward()
self.vf_optimizer.step()
# 2. Actor更新:用优势函数更新策略
with torch.no_grad():
advantages = td_targets - values_pred.detach()
_, log_probs = self.actor.sample(obs_tensor)
# 策略损失
pi_loss = -(log_probs.squeeze() * advantages.detach()).mean()
self.pi_optimizer.zero_grad()
pi_loss.backward()
self.pi_optimizer.step()
return pi_loss.item(), vf_loss.item()5.3 三种方法的对比
| 特性 | REINFORCE | 值函数方法(Q学习) | Actor-Critic |
|---|---|---|---|
| 策略类型 | 随机 | 确定性 | 随机/确定性均可 |
| 价值估计 | 蒙特卡洛(高方差) | TD学习(低方差) | 混合(可控方差) |
| 动作空间 | 连续/离散 | 离散为主 | 连续/离散均可 |
| 收敛性 | 好 | 一般 | 较好 |
| 样本效率 | 低 | 高 | 中等 |
Actor-Critic结合了两边的优点:既有策略梯度处理连续动作的能力,又有值函数方法的低方差。
六、A2C与A3C:并行化加速训练
6.1 A2C:同步优势Actor-Critic
A2C(Advantage Actor-Critic)是Actor-Critic的标准化版本,主要改进是:
- 使用优势函数代替原始回报
- 引入熵正则化鼓励探索
class A2C:
"""同步优势Actor-Critic。"""
def __init__(self, obs_dim, act_dim, hidden_dim=256,
pi_lr=3e-4, vf_lr=1e-3, gamma=0.99, lam=0.95,
ent_coef=0.01):
self.gamma = gamma
self.lam = lam
self.ent_coef = ent_coef
self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
# 策略网络
self.actor = GaussianPolicy(obs_dim, act_dim, hidden_dim).to(self.device)
self.pi_optimizer = optim.Adam(self.actor.parameters(), lr=pi_lr)
# 价值网络
self.critic = ValueNetwork(obs_dim, hidden_dim).to(self.device)
self.vf_optimizer = optim.Adam(self.critic.parameters(), lr=vf_lr)
def compute_loss(self, obs_batch, actions, advantages, returns):
"""计算总损失(策略损失 + 价值损失 + 熵奖励)。"""
# 策略损失
_, log_probs = self.actor.sample(obs_batch)
pi_loss = -(log_probs * advantages.detach()).mean()
# 价值损失
values_pred = self.critic(obs_batch).squeeze()
vf_loss = nn.MSELoss()(values_pred, returns.detach())
# 熵损失(熵越高越好,所以是减)
with torch.no_grad():
_, log_probs = self.actor.sample(obs_batch)
entropy = -(log_probs.exp() * log_probs).mean()
total_loss = pi_loss + 0.5 * vf_loss - self.ent_coef * entropy
return total_loss, pi_loss, vf_loss, entropy6.2 A3C:异步并行Actor-Critic
A3C(Asynchronous Advantage Actor-Critic)由DeepMind的Mnih等人2016年提出,是深度强化学习的里程碑算法。
它的核心思想是:多线程并行探索不同的轨迹,然后把学到的东西汇总到全局网络上。
import threading
import multiprocessing as mp
from queue import Queue
class A3C:
"""
异步优势Actor-Critic。
使用多线程并行收集样本,加速训练。
"""
def __init__(self, obs_dim, act_dim, num_workers=4,
lr=3e-4, gamma=0.99, lam=0.95, ent_coef=0.01,
max_steps=20):
self.num_workers = num_workers
self.gamma = gamma
self.lam = lam
self.ent_coef = ent_coef
self.max_steps = max_steps
# 全局网络
self.global_actor = GaussianPolicy(obs_dim, act_dim).to('cpu')
self.global_critic = ValueNetwork(obs_dim).to('cpu')
self.global_optimizer = optim.Adam(
list(self.global_actor.parameters()) + list(self.global_critic.parameters()),
lr=lr
)
# 共享内存
self.global_actor.share_memory()
self.global_critic.share_memory()
# 全局训练步数
self.global_step = mp.Value('i', 0)
def pull_from_global(self, local_actor, local_critic):
"""worker从全局网络拉取参数。"""
local_actor.load_state_dict(self.global_actor.state_dict())
local_critic.load_state_dict(self.global_critic.state_dict())
def push_to_global(self, actor_grads, critic_grads):
"""worker推送梯度到全局网络。"""
for param, grad in zip(self.global_actor.parameters(), actor_grads):
if grad is not None:
param._grad = grad
for param, grad in zip(self.global_critic.parameters(), critic_grads):
if grad is not None:
param._grad = grad
self.global_optimizer.step()
def worker(self, worker_id, env_name, result_queue):
"""单个worker线程的工作流程。"""
# 创建本地网络
local_actor = GaussianPolicy(self.obs_dim, self.act_dim).to('cpu')
local_critic = ValueNetwork(self.obs_dim).to('cpu')
env = gym.make(env_name)
while self.global_step.value < 50000:
# 1. 同步全局参数
self.pull_from_global(local_actor, local_critic)
# 2. 收集样本
obs = env.reset()
episode_reward = 0
obs_buffer, actions_buffer, rewards_buffer = [], [], []
values_buffer, dones_buffer = [], []
for step in range(self.max_steps):
action, log_prob = local_actor.sample(
torch.FloatTensor(obs).unsqueeze(0)
)
action = action.numpy()[0]
next_obs, reward, done, _ = env.step(action)
obs_buffer.append(obs)
actions_buffer.append(action)
rewards_buffer.append(reward)
values_buffer.append(local_critic(torch.FloatTensor(obs).unsqueeze(0)).item())
dones_buffer.append(done)
episode_reward += reward
obs = next_obs
if done:
break
# 3. 计算GAE优势
if done:
next_value = 0
else:
next_value = local_critic(torch.FloatTensor(next_obs).unsqueeze(0)).item()
advantages, returns = compute_gae_torch(
rewards_buffer, values_buffer, dones_buffer,
next_value, self.gamma, self.lam
)
# 4. 计算损失并反向传播
obs_batch = torch.FloatTensor(np.array(obs_buffer))
actions_batch = torch.FloatTensor(np.array(actions_buffer))
_, log_probs = local_actor.sample(obs_batch)
values_pred = local_critic(obs_batch).squeeze()
pi_loss = -(log_probs * advantages).mean()
vf_loss = nn.MSELoss()(values_pred, returns)
entropy = -(log_probs.exp() * log_probs).mean()
total_loss = pi_loss + 0.5 * vf_loss - self.ent_coef * entropy
# 本地反向传播
total_loss.backward()
# 5. 推送梯度到全局
actor_grads = [p.grad for p in local_actor.parameters()]
critic_grads = [p.grad for p in local_critic.parameters()]
self.push_to_global(actor_grads, critic_grads)
with self.global_step.get_lock():
self.global_step.value += 1
if done:
result_queue.put((worker_id, episode_reward))
def train(self, env_name):
"""启动多线程训练。"""
result_queue = Queue()
threads = []
for worker_id in range(self.num_workers):
t = threading.Thread(
target=self.worker,
args=(worker_id, env_name, result_queue)
)
t.start()
threads.append(t)
# 收集结果
for t in threads:
t.join()6.3 A3C为什么有效?
A3C的成功来自于几个因素:
- 探索多样性:多个线程同时探索,它们遇到的状态-动作对分布更广,学到的策略泛化性更好
- 去相关样本:异步收集的样本自然地去掉了时间相关性,这对神经网络训练很重要
- 计算并行:充分利用多核CPU,加速训练过程
- 噪声梯度:异步更新带来的”噪声梯度”反而有正则化效果,帮助跳出局部最优
七、策略梯度 vs Q学习:什么时候用哪种?
7.1 连续动作空间 vs 离散动作空间
这是最直接的选择依据:
- 连续动作空间(机器人控制、物理模拟):必须用策略梯度。Q学习在这种场景下要么需要离散化(丢失精度),要么需要复杂的处理(如NAF),都不如策略梯度直接。
- 离散动作空间(游戏、推荐系统):Q学习和策略梯度都可以。但Q学习通常更简单、更样本高效。
7.2 随机策略 vs 确定性策略
如果任务需要随机策略(比如博弈、不完全信息游戏),策略梯度是更自然的选择。Q学习想学随机策略需要额外技巧。
7.3 收敛性考虑
策略梯度在理论上有更好的收敛保证——它的优化目标是凸的(或近似凸的),而Q学习的优化问题更复杂,可能出现振荡。
7.4 实用建议
| 场景 | 推荐方法 |
|---|---|
| 连续控制(机器人) | PPO、SAC(策略梯度变体) |
| 离散动作游戏 | DQN、Rainbow |
| 需要随机策略 | 策略梯度、A2C |
| 样本有限 | DQN(样本效率高) |
| 收敛困难 | PPO(更稳定) |
八、连续动作空间:策略梯度解决了Q学习的什么问题?
8.1 Q学习处理连续动作的困境
假设你要控制一个机械臂,关节角度可以是0到180度之间的任意实数。用Q学习的话,你需要学习 ,但这里的动作 是连续的。
困境一:无法枚举所有动作。在离散动作空间,Q学习遍历所有可能的动作取最大值。但在连续空间,你根本没法遍历所有动作。
困境二:argmax没有闭式解。即使你想找最优动作,你需要解决 。对于一般的Q函数,这个优化问题没有解析解,而且可能非凸。
已有的解决方案包括:
- DDPG:用神经网络同时学习策略和Q函数,但策略是确定性的
- NAF:把Q函数约束成特殊的二次形式,让argmax有解析解
- 交叉熵方法:用采样和拟合来近似argmax
这些方法都有这样那样的限制,而且往往只能学到确定性策略。
8.2 策略梯度的天然优势
策略梯度完全绕过了这个问题。它根本不需要做那个麻烦的argmax。策略网络 直接输出动作的分布,从这个分布里采样就行了。
也就是说,Q学习是把”选择最好的动作”这个问题留给了在线优化,而策略梯度把这个问题转化成了”拟合一个好的概率分布”,这是神经网络擅长的事情。
九、策略熵正则化:为什么鼓励探索?
9.1 什么是熵?
在信息论里,熵 衡量的是概率分布的不确定性或随机性。分布越均匀,熵越大;分布越尖锐(集中在某个点),熵越小。
在策略梯度里,策略 的熵 衡量的是策略的随机程度。
9.2 为什么要鼓励高熵?
如果训练过程中策略变得过于确定(低熵),比如在某个状态下总是选同一个动作,会出现两个问题:
- 过早收敛到次优策略。智能体可能学到第一个看起来不错的策略,然后就停止探索其他可能性了。
- 无法应对环境变化。确定的策略在环境变化时适应性差。
所以我们可以在损失函数里加一项熵奖励:
其中 是熵系数,控制探索强度。减去熵(或者加上负熵),就是鼓励高熵的策略。
9.3 实践中怎么用?
def compute_policy_loss_with_entropy(obs, actions, advantages, actor, ent_coef=0.01):
"""
带熵正则化的策略损失。
"""
_, log_probs = actor.sample(obs)
# 策略梯度损失
policy_loss = -(log_probs * advantages.detach()).mean()
# 熵(需要最大化,所以是减)
entropy = -(log_probs.exp() * log_probs).mean()
# 总损失
total_loss = policy_loss - ent_coef * entropy
return total_loss, entropy实践中,熵系数通常设在 0.001 到 0.1 之间,具体取决于任务。
十、PPO与策略梯度:近端策略优化如何改进?
10.1 策略梯度的稳定性问题
普通策略梯度有个讨厌的问题:策略更新太大。每更新一次参数,策略可能变化剧烈,导致下次采样的分布和上次完全不同,之前的”经验”就失效了。
这叫策略崩溃或训练不稳定。
10.2 PPO的核心思想
PPO(Proximal Policy Optimization,近端策略优化)的核心是一个简单的约束:不要让策略更新得太远。
PPO用了一个叫做”裁剪代理目标”的技巧:
其中 是新旧策略的概率比, 通常设为0.2。
当某个动作的优势为正(应该增加概率)但概率比太大(增加太多了),裁剪会阻止进一步的增加。反之亦然。
def ppo_loss(obs, actions, old_log_probs, advantages, actor, eps=0.2):
"""
PPO裁剪损失。
"""
_, new_log_probs = actor.sample(obs)
# 概率比
ratio = (new_log_probs - old_log_probs).exp()
# 未裁剪的损失
surr1 = ratio * advantages
# 裁剪后的损失
surr2 = torch.clamp(ratio, 1 - eps, 1 + eps) * advantages
# 取较小的那个
policy_loss = -torch.min(surr1, surr2).mean()
return policy_loss10.3 PPO vs 普通策略梯度
| 特性 | 普通PG | PPO |
|---|---|---|
| 稳定性 | 差 | 好 |
| 超参敏感度 | 高 | 低 |
| 样本效率 | 低 | 高 |
| 实现复杂度 | 低 | 中 |
| 调参难度 | 难 | 相对容易 |
PPO现在是连续控制任务的事实标准,因为它既稳定又高效。
十一、动手实验:用策略梯度训练软体机器人
11.1 实验环境
让我们用策略梯度训练一个简单的软体机器人(hopper)完成跳跃任务。这是MuJoCo环境套件里的经典任务。
import gym
import torch
import numpy as np
# 创建环境
env = gym.make('Hopper-v3')
# 获取状态和动作维度
obs_dim = env.observation_space.shape[0]
act_dim = env.action_space.shape[0]
print(f"状态维度: {obs_dim}, 动作维度: {act_dim}")
# 创建A2C智能体
agent = A2C(obs_dim, act_dim, hidden_dim=256, pi_lr=3e-4, vf_lr=1e-3)11.2 训练循环
def train_continuous_control(env, agent, num_episodes=1000,
rollout_steps=2048, update_epochs=10):
"""
训练软体机器人。
"""
episode_rewards = []
for episode in range(num_episodes):
# 收集 rollout
obs_buffer, actions_buffer, rewards_buffer = [], [], []
dones_buffer = []
obs = env.reset()
episode_reward = 0
for step in range(rollout_steps):
action = agent.get_action(obs)
next_obs, reward, done, _ = env.step(action)
obs_buffer.append(obs)
actions_buffer.append(action)
rewards_buffer.append(reward)
dones_buffer.append(done)
episode_reward += reward
obs = next_obs
if done:
obs = env.reset()
episode_rewards.append(episode_reward)
episode_reward = 0
# 准备下一个观测(用于TD计算)
next_obs_buffer = obs_buffer[1:] + [obs]
# 转换为数组
obs_batch = np.array(obs_buffer)
actions_batch = np.array(actions_buffer)
rewards_batch = np.array(rewards_buffer)
dones_batch = np.array(dones_buffer)
next_obs_batch = np.array(next_obs_buffer)
# 更新多次
for _ in range(update_epochs):
pi_loss, vf_loss = agent.update(
obs_batch, actions_batch, rewards_batch,
dones_batch, next_obs_batch
)
# 打印进度
if (episode + 1) % 10 == 0:
mean_reward = np.mean(episode_rewards[-10:])
print(f"Episode {episode+1}/{num_episodes} | "
f"最近10轮平均奖励: {mean_reward:.2f} | "
f"PI Loss: {pi_loss:.4f} | "
f"VF Loss: {vf_loss:.4f}")
return episode_rewards
# 开始训练
rewards = train_continuous_control(env, agent, num_episodes=500)11.3 训练技巧
- 归一化观测。对观测做running mean和std归一化,能显著加速收敛。
- 价值归一化。对回报做归一化也有帮助。
- 梯度裁剪。防止梯度爆炸。
- 学习率调度。训练后期适当降低学习率。
- 早停。如果连续多轮没有提升,可以考虑早停或调整策略。
# 常用的训练技巧包装器
class NormalizedEnv:
"""观测归一化包装器。"""
def __init__(self, env):
self.env = env
self.obs_mean = np.zeros(env.observation_space.shape)
self.obs_var = np.ones(env.observation_space.shape)
self.count = 1e-4
def update_stats(self, obs):
self.count += 1
delta = obs - self.obs_mean
self.obs_mean += delta / self.count
delta2 = obs - self.obs_mean
self.obs_var += delta * delta2
def normalize(self, obs):
return (obs - self.obs_mean) / np.sqrt(self.obs_var + 1e-8)11.4 常见问题排查
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 训练不收敛 | 学习率太大/太小 | 调整学习率,尝试3e-4 |
| 策略崩溃 | 策略更新太大 | 用PPO或减小更新幅度 |
| 价值估计不准 | Critic容量不够 | 增加Critic隐藏层大小 |
| 熵衰减太快 | 熵系数太小 | 增加ent_coef到0.1 |
| 梯度爆炸 | 奖励尺度不一致 | 归一化奖励 |
十二、数学形式化总结
12.1 核心公式
策略梯度定理:
优势函数:
GAE(λ):
其中
12.2 算法演变图谱
策略梯度
├── REINFORCE(蒙特卡洛PG,高方差)
│ └── 引入基线 → 方差缩减
│ └── 用价值函数作为最优基线
│ └── Actor-Critic(低方差)
│ ├── A2C(同步,更新更稳定)
│ └── A3C(异步并行,加速训练)
│ └── GAE(优势估计的改进)
│ └── PPO(近端约束,防崩溃)
十三、相关文档
- MDP与Bellman方程 — 强化学习的数学基础
- Q学习 — 值函数方法的代表
- DQN — 深度值函数方法
- PPO — 策略梯度的稳定改进
- SAC — 最大熵策略梯度
参考文献
- Sutton, R. S., & Barto, A. G. (2018). Reinforcement Learning: An Introduction (2nd ed.). MIT Press.
- Williams, R. J. (1992). Simple statistical gradient-following algorithms for connectionist reinforcement learning. Machine Learning, 8(3-4), 229-256.
- Mnih, V., et al. (2016). Asynchronous methods for deep reinforcement learning. ICML, 1928-1937.
- Schulman, J., Levine, S., Abbeel, P., Jordan, M., & Moritz, P. (2015). Trust region policy optimization. ICML, 1889-1897.
- Schulman, J., Wolski, F., Dhariwal, P., Radford, A., & Klimov, O. (2017). Proximal policy optimization algorithms. arXiv:1707.06347.
- Haarnoja, T., et al. (2018). Soft actor-critic: Off-policy maximum entropy deep reinforcement learning with a stochastic actor. ICML, 1861-1870.
策略梯度方法为连续控制问题提供了优雅的解决方案,是现代强化学习的重要支柱。从REINFORCE到PPO,这条技术路线不断演进,在稳定性、效率和泛化性上持续改进。掌握策略梯度的核心思想,你就能更好地理解强化学习的本质——在不确定环境中学习和决策。