PPO深度指南
关键词速览
|| 核心概念 | 信任域 | 剪裁目标 | KL散度 | 自适应惩罚 | ||:---------|:-------|:---------|:-------|:-----------| || 自然梯度 | 价值裁剪 | 广义优势估计 | 策略更新 | 目标函数 |
核心关键词表
术语 英文 符号/技术 说明 近端策略优化 PPO PPO 稳定策略更新的算法 信任域策略优化 TRPO TRPO 基于KL散度约束的策略优化 剪裁目标函数 Clipped Objective 防止策略过大更新 替代优势函数 Surrogate Advantage 策略比值的加权 KL散度 KL Divergence $D_{KL}(\pi_{\theta_{old}} 自适应KL惩罚 Adaptive KL Penalty 自动调整的KL惩罚系数 价值函数裁剪 Value Clipping 防止价值函数剧变 广义优势估计 GAE 偏差-方差平衡的优势估计 目标函数 Objective PPO综合目标 策略比值 Probability Ratio $r_t(\theta) = \frac{\pi_\theta(a_t s_t)}{\pi_{\theta_{old}}(a_t
前言:为什么你需要学PPO?
在强化学习的世界里,PPO(Proximal Policy Optimization,近端策略优化)就像是一个”老司机”——它稳重、可靠、不容易翻车。相比之下,早期的策略梯度算法就像新手开车,随时可能因为步子迈太大而栽跟头。
如果你做过强化学习项目,大概率会遇到这些头疼的问题:训练不稳定、策略突然崩溃、方差大到根本收敛不了。PPO就是来解决这些问题的。它在2017年被提出后迅速成为业界标配,OpenAI用它训练GPT的早期版本,波士顿动力用它做机器人控制,各种游戏AI也都在用它。
这篇文章我会从最基础的概念讲起,用大量类比和直觉解释,确保你不仅知道PPO”怎么用”,更理解它”为什么有效”。最后还会有完整的代码实现,手把手教你跑起来。
一、策略梯度到底是什么?
1.1 用掷飞镖来理解
想象你在玩一个掷飞镖的游戏。你站在固定位置,向靶子掷飞镖。每次掷出去,结果要么命中靶心(好结果),要么偏得离谱(坏结果)。
现在问题来了:你怎么学习”怎么掷”?
一个朴素的想法是:记住成功的例子,忘掉失败的经验。如果某次掷出去,飞镖扎在9环,你就记住这个动作,下次尽量模仿。如果飞镖飞出靶外,那就尽量避免再做同样的动作。
这就是策略梯度(Policy Gradient)的核心思想。
在强化学习里,我们把”飞镖脱手那一刻的动作”叫做策略(Policy),记作 。它是一个概率分布——在状态 下,选择动作 的概率是多少。这个概率分布由神经网络参数 控制。
“掷飞镖的结果”叫做回报(Return)。我们希望增加好结果的概率,降低坏结果的概率。
1.2 策略梯度的数学直觉
策略梯度的目标函数定义为:
其中 是一条完整的轨迹(从开始到结束), 是这条轨迹的累计回报。
根据策略梯度定理(Policy Gradient Theorem),我们可以用梯度上升来优化这个目标:
别被公式吓到,它说的就是:对于轨迹上的每一步,我们用 告诉网络”往哪个方向调整参数”,然后用回报 决定”调整多少”。
打个比方:如果一个飞镖手这次掷出了10环,我们会:
- 分析他这次的动作细节(手型、角度、力道)
- 记住这个成功动作的”特征模式”
- 下次遇到类似情况时,尝试复现这个模式
1.3 代码里的策略梯度长这样
import torch
import torch.nn as nn
import numpy as np
class SimplePolicy(nn.Module):
"""最简单的策略网络"""
def __init__(self, obs_dim, act_dim):
super().__init__()
self.net = nn.Sequential(
nn.Linear(obs_dim, 64),
nn.ReLU(),
nn.Linear(64, act_dim),
nn.Softmax(dim=-1) # 输出动作概率分布
)
def forward(self, x):
return self.net(x)
def get_action(self, obs):
"""给定观测,选择动作"""
obs_tensor = torch.FloatTensor(obs).unsqueeze(0)
probs = self(obs_tensor)
action = torch.multinomial(probs, 1).item()
return action
def compute_policy_gradient_loss(policy, obs, actions, returns):
"""
策略梯度损失计算
核心思想:如果一个动作带来了正回报,就增加选择它的概率
如果一个动作带来了负回报,就减少选择它的概率
"""
log_probs = torch.log(policy(obs) + 1e-8)
# 选择每个动作对应的log概率
action_log_probs = log_probs.gather(1, actions.unsqueeze(1)).squeeze()
# 策略梯度:log概率 * 回报
loss = -(action_log_probs * returns).mean()
return loss二、REINFORCE:最纯粹的策略梯度
2.1 REINFORCE的原理
REINFORCE是Williams在1992年提出的,可以说是策略梯度算法的”老祖宗”。它的想法特别直接:
“如果一个动作最终获得了好的结果(高回报),我就提高选择这个动作的概率;如果结果不好,就降低概率。”
REINFORCE的梯度估计是:
其中 是从时刻t开始的累计折扣回报:
2.2 完整REINFORCE代码
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import gym
class REINFORCEAgent:
"""
REINFORCE算法实现
这个算法特别简单:
1. 用当前策略收集一批轨迹
2. 计算每条轨迹的回报
3. 用策略梯度更新网络
"""
def __init__(self, obs_dim, act_dim, lr=3e-4, gamma=0.99):
self.gamma = gamma
self.policy = nn.Sequential(
nn.Linear(obs_dim, 128),
nn.ReLU(),
nn.Linear(128, 128),
nn.ReLU(),
nn.Linear(128, act_dim),
nn.Softmax(dim=-1)
)
self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)
def get_action(self, obs):
"""选择动作(按概率采样)"""
obs_tensor = torch.FloatTensor(obs).unsqueeze(0)
probs = self.policy(obs_tensor)
action = torch.multinomial(probs, 1).item()
return action
def compute_returns(self, rewards):
"""
计算每一步的累计回报
比如:rewards = [1, 2, 3], gamma = 0.9
返回: [1 + 0.9*2 + 0.9^2*3, 2 + 0.9*3, 3]
"""
returns = []
G = 0
for r in reversed(rewards):
G = r + self.gamma * G
returns.insert(0, G)
return torch.FloatTensor(returns)
def update(self, obs_list, actions_list, rewards_list):
"""
策略梯度更新
核心逻辑:
1. 对每个样本:log_prob(action) * return
2. 累加取平均
3. 梯度上升(取负号变成梯度下降)
"""
policy_losses = []
for obs, actions, rewards in zip(obs_list, actions_list, rewards_list):
# 计算累计回报
returns = self.compute_returns(rewards)
# 归一化(可选,让训练更稳定)
returns = (returns - returns.mean()) / (returns.std() + 1e-8)
# 策略梯度
obs_tensor = torch.FloatTensor(np.array(obs))
actions_tensor = torch.LongTensor(actions)
returns_tensor = returns
log_probs = torch.log(self.policy(obs_tensor) + 1e-8)
action_log_probs = log_probs.gather(1, actions_tensor.unsqueeze(1)).squeeze()
# 策略梯度 = log概率 * 回报
policy_loss = -(action_log_probs * returns_tensor).mean()
policy_losses.append(policy_loss)
# 更新网络
total_loss = torch.stack(policy_losses).mean()
self.optimizer.zero_grad()
total_loss.backward()
self.optimizer.step()
def train_reinforce(env_name='CartPole-v1', num_episodes=1000):
"""训练REINFORCE"""
env = gym.make(env_name)
obs_dim = env.observation_space.shape[0]
act_dim = env.action_space.n
agent = REINFORCEAgent(obs_dim, act_dim)
for episode in range(num_episodes):
obs_list, actions_list, rewards_list = [], [], []
obs, _ = env.reset()
obs_data, actions_data, rewards_data = [], [], []
done = False
while not done:
action = agent.get_action(obs)
next_obs, reward, terminated, truncated, _ = env.step(action)
done = terminated or truncated
obs_data.append(obs)
actions_data.append(action)
rewards_data.append(reward)
obs = next_obs
# 更新
agent.update([obs_data], [actions_data], [rewards_data])
if episode % 50 == 0:
total_reward = sum(rewards_data)
print(f"Episode {episode}, Reward: {total_reward}")2.3 为什么REINFORCE的方差这么大?
REINFORCE有个致命问题:方差太大了。
想象一下:你掷飞镖,掷了100次。前10次都飞出了靶外,后90次命中了9环。那这前10次”失败”的经验会严重影响你的学习。
问题出在哪?因为REINFORCE用的是蒙特卡洛采样——它用完整轨迹的累计回报 来估计每一步动作的价值。但这个估计高度依赖随机性:
- 环境本身有随机性:同样的动作,下一次结果可能完全不同
- 策略有随机性:你选了”正确”的动作,但环境可能给你一个坏结果
- 累计误差:越到后面的时刻,误差累积得越厉害
用一个夸张的例子说明:假设你掷飞镖,10次里有1次侥幸命中靶心。REINFORCE会认为这次”命中靶心”的动作就是好动作,值得强化。但实际上这次成功纯属运气,下次你按同样的方式掷,很可能又飞了。
方差大 = 学习信号噪声大 = 收敛慢、不稳定
三、Actor-Critic:结合值函数减少方差
3.1 核心思想:用”裁判”来辅助学习
为了解决方差大的问题,Actor-Critic架构引入了两个角色:
- Actor(演员):负责输出策略,决定动作选择(就是原来的策略网络)
- Critic(裁判):负责估计状态价值,判断当前状态好不好(新增的价值网络)
Critic的作用是:给Actor提供一个更稳定的学习信号。
回到掷飞镖的例子:原来REINFORCE的做法是”不管三七二十一,就看最终结果”。Actor-Critic的做法是”每掷一次飞镖,先问问裁判觉得这个位置怎么样,再决定要不要学习这次经验”。
3.2 优势函数:去掉那些”本来就好”的状态
Actor-Critic用**优势函数(Advantage Function)**来估计动作的相对价值:
- :在状态 下选动作 的价值
- :在状态 下的平均价值
优势函数的含义是:选这个动作,比平均水平的动作好多少?
如果优势是正的,说明这个动作比平均水平好,值得强化;如果是负的,说明这个动作不如平均水平,应该抑制。
用公式表示优势函数:
这就是TD(0)误差(Temporal Difference Error),它既考虑了即时奖励,又利用了价值函数的估计。
3.3 Actor-Critic代码
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import gym
class ActorCritic(nn.Module):
"""
Actor-Critic网络
共享一部分特征提取层,然后分成两个头:
- actor头:输出动作概率分布
- critic头:输出状态价值估计
"""
def __init__(self, obs_dim, act_dim, hidden_dim=64):
super().__init__()
# 共享特征提取层
self.shared = nn.Sequential(
nn.Linear(obs_dim, hidden_dim),
nn.Tanh(),
nn.Linear(hidden_dim, hidden_dim),
nn.Tanh()
)
# Actor头:输出动作概率
self.actor = nn.Linear(hidden_dim, act_dim)
# Critic头:输出状态价值
self.critic = nn.Linear(hidden_dim, 1)
def forward(self, x):
features = self.shared(x)
probs = torch.softmax(self.actor(features), dim=-1)
value = self.critic(features)
return probs, value
def get_action(self, obs):
"""采样动作(同时返回log概率和价值估计)"""
probs, value = self.forward(obs)
dist = torch.distributions.Categorical(probs)
action = dist.sample()
log_prob = dist.log_prob(action)
return action, log_prob, value.squeeze()
class A2CAgent:
"""
A2C = Advantage Actor-Critic
A2C和A3C是兄弟算法,核心思想一样:
- A3C:异步并行,多个worker同时探索
- A2C:同步并行,多个worker同时探索,然后汇总更新
"""
def __init__(self, obs_dim, act_dim, lr=3e-4, gamma=0.99, entropy_coef=0.01):
self.gamma = gamma
self.entropy_coef = entropy_coef
self.policy = ActorCritic(obs_dim, act_dim)
self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)
def get_action(self, obs):
"""获取动作"""
obs_tensor = torch.FloatTensor(obs).unsqueeze(0)
with torch.no_grad():
action, log_prob, value = self.policy.get_action(obs_tensor)
return action.item(), log_prob.item(), value.item()
def compute_gae(self, rewards, values, dones, next_value):
"""
计算GAE(广义优势估计)
GAE是PPO的核心技巧之一,后面会详细讲
"""
advantages = []
gae = 0
values = values + [next_value]
for t in reversed(range(len(rewards))):
delta = rewards[t] + self.gamma * values[t+1] * (1 - dones[t]) - values[t]
gae = delta + self.gamma * 0.95 * (1 - dones[t]) * gae
advantages.insert(0, gae)
return np.array(advantages)
def update(self, obs_list, actions_list, rewards_list, values_list, dones_list):
"""
更新网络
"""
# 计算GAE
obs = np.array(obs_list)
actions = np.array(actions_list)
rewards = np.array(rewards_list)
values = np.array(values_list)
dones = np.array(dones_list)
with torch.no_grad():
_, _, next_value = self.policy.get_action(torch.FloatTensor(obs[-1:]))
advantages = self.compute_gae(rewards, values, dones, next_value.item())
returns = advantages + values
# 归一化优势函数
advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
# 转成tensor
obs_tensor = torch.FloatTensor(obs)
actions_tensor = torch.LongTensor(actions)
advantages_tensor = torch.FloatTensor(advantages)
returns_tensor = torch.FloatTensor(returns)
# 前向传播
probs, values_pred = self.policy(obs_tensor)
# Actor loss(策略梯度)
dist = torch.distributions.Categorical(probs)
log_probs = dist.log_prob(actions_tensor)
policy_loss = -(log_probs * advantages_tensor).mean()
# Entropy bonus(鼓励探索)
entropy = dist.entropy().mean()
entropy_loss = -self.entropy_coef * entropy
# Critic loss(价值函数)
value_loss = nn.MSELoss()(values_pred.squeeze(), returns_tensor)
# 总损失
total_loss = policy_loss + value_loss + entropy_loss
# 更新
self.optimizer.zero_grad()
total_loss.backward()
self.optimizer.step()
return policy_loss.item(), value_loss.item(), entropy.item()四、TRPO vs PPO:信赖域方法 vs 剪裁代理
4.1 TRPO:想用数学约束来保证稳定
TRPO(Trust Region Policy Optimization,信赖域策略优化)2015年由Schulman提出,它的想法很聪明:
“我不限制一步更新多大,我限制新旧策略之间的差异。”
TRPO的约束是:
翻译成人话:新旧策略在每个状态的KL散度(可以理解为概率分布的差异)不能超过 。
用一个比喻:TRPO像是给策略更新装了一个”限速器”——不管你怎么踩油门,车速都不会超过某个阈值。
4.2 TRPO的计算复杂度问题
TRPO的实现非常复杂:
- 需要求解约束优化问题
- 用共轭梯度法近似计算自然梯度
- 需要存储Fisher信息矩阵(或者用K-FAC近似)
- 内存开销巨大
- 和GAE等技术配合不友好
这些复杂性让很多研究者望而却步——我只是想训练个机器人,为什么要搞得像在发射火箭?
4.3 PPO:用简单方法解决复杂问题
PPO的核心洞察是:我们不需要精确地限制KL散度,只需要”不要太离谱”就行。
PPO用了一个巧妙的设计:剪裁代理目标函数(Clipped Surrogate Objective)。
这个公式看起来吓人,但其实非常直觉。
五、PPO核心公式直观解读:Clip到底在Clip什么?
5.1 理解策略比值 r(θ)
策略比值 定义为:
它的含义是:新策略选择这个动作的概率,是旧策略的多少倍?
- 如果 :新策略更倾向于选这个动作
- 如果 :新策略更不愿意选这个动作
- 如果 :新旧策略选择这个动作的概率一样
5.2 Clip在Clip什么?
我们把公式拆解来看:
的意思是:把 限制在 范围内。
- 如果 ,就把它”截断”成
- 如果 ,就把它”截断”成
5.3 用游戏来理解
假设你在玩一个游戏,epsilon = 0.2(最常用的值)。
场景1:优势函数 A > 0(好动作)
假设某步的优势是 +10,说明这个动作比平均水平好。
-
如果 :这意味着新策略选择这个动作的概率是旧策略的1.5倍
- 原来目标:
- Clip后:
结论:PPO不让你拿那么多”功劳”,防止你过度强化这个动作。
-
如果 :Clip不起作用,你正常拿到
场景2:优势函数 A < 0(坏动作)
假设某步的优势是 -5,说明这个动作比平均水平差。
- 如果 :这意味着新策略几乎不选这个动作了
- 原来目标:(负数,表示惩罚)
- Clip后:
5.4 Clip的保护机制
总结一下Clip的作用:
- 当动作好时(A > 0):防止新策略过度增加这个动作的概率。打个比方:你考试考好了(动作好),但Clip不让你骄傲过头。
- 当动作差时(A < 0):防止新策略过度惩罚这个动作。打个比方:你考试考差了(动作差),但Clip不让你完全否定自己,还有翻盘的机会。
这个设计非常精妙:它让策略更新变得更加保守和稳定。
六、GAE(广义优势估计):平衡偏差与方差
6.1 为什么需要GAE?
前面说过,REINFORCE用蒙特卡洛采样估计回报,方差大但无偏;用TD(0)误差方差小但有偏。
GAE的想法是:我能不能在两者之间找一个平衡点?
6.2 GAE的公式
其中 是TD误差。
如果把无穷级数截断并求和,可以写成递归形式:
其中 是done标志(如果episode结束就为1)。
6.3 λ参数的艺术
λ控制着偏差-方差的权衡:
- λ = 0:(就是TD(0),方差小但有偏)
- λ = 1:(接近蒙特卡洛,无偏但方差大)
- λ = 0.95(常用值):在两者之间取得很好的平衡
类比理解:λ就像一个”望远镜”的焦距。λ小,看近处清晰(只关注即时反馈);λ大,看远处也清晰(考虑长期影响)。λ=0.95的意思是:既要看到眼前的利益,也要适当考虑长远,但不要太激进。
6.4 GAE代码实现
def compute_gae(rewards, values, dones, gamma=0.99, lam=0.95):
"""
计算GAE
参数:
- rewards: 每步的即时奖励
- values: 每步的价值估计(包括最后一步后面的价值,用0填充)
- dones: 每步是否结束
- gamma: 折扣因子
- lam: GAE的λ参数
返回:
- advantages: 每步的优势估计
"""
advantages = []
gae = 0
# 逆向计算(从后往前)
for t in reversed(range(len(rewards))):
# TD误差
delta = rewards[t] + gamma * values[t + 1] * (1 - dones[t]) - values[t]
# GAE递归公式
gae = delta + gamma * lam * (1 - dones[t]) * gae
advantages.insert(0, gae)
return np.array(advantages)
# 使用例子
rewards = [0, 0, 0, 0, 1] # 前4步没得分,第5步得1分
values = [0.1, 0.2, 0.3, 0.4, 0.5, 0.0] # 价值估计,最后用0填充
dones = [False, False, False, False, True] # 第5步结束
advantages = compute_gae(rewards, values, dones)
# advantages 会反映每一步的"相对"价值七、PPO vs Q学习:什么时候用什么?
7.1 两类强化学习方法
强化学习方法大致可以分为两类:
基于值函数的方法(Value-based):
- 代表算法:DQN、Q学习
- 思想:学习状态-动作价值Q(s,a),然后选Q值最大的动作
- 优点:样本效率高,理论基础好
- 缺点:不适合高维/连续动作空间,方差较大
基于策略的方法(Policy-based):
- 代表算法:REINFORCE、PPO、SAC
- 思想:直接学习策略函数 π(a|s)
- 优点:适合连续动作空间,收敛性好
- 缺点:样本效率相对较低
7.2 PPO vs Q学习的对比
| 特性 | Q学习/DQN | PPO |
|---|---|---|
| 动作空间 | 离散、低维 | 离散或连续都行 |
| 收敛稳定性 | 容易不稳定 | 比较稳定 |
| 样本效率 | 高(经验回放) | 中等 |
| 探索策略 | 需要epsilon-greedy等 | 策略自带随机性 |
| 超参数敏感度 | 较敏感 | 相对鲁棒 |
| 实现难度 | 中等 | 中等偏上 |
7.3 什么时候用PPO?
适合用PPO的场景:
- 连续动作空间:比如控制机器人的关节角度、无人车的方向盘转角
- 高维离散动作空间:比如玩Atari游戏有几十种可能的动作组合
- 需要稳定收敛:工程项目中不允许训练过程大起大落
- 需要端到端学习:策略和价值函数一起学
可能不需要PPO的场景:
- 简单的离散动作、低维状态:比如GridWorld,一个简单的Q表就够了
- 对样本效率要求极高:比如真实机器人的在线学习,数据太珍贵
- 只需要快速验证想法:先试试DQN,更简单
7.4 PPO vs SAC vs TD3:怎么选?
这是三个最流行的策略优化算法,各有特点:
| 算法 | 类型 | 特点 | 适用场景 |
|---|---|---|---|
| PPO | On-policy | 稳定、鲁棒、易调 | 默认首选,大多数场景 |
| SAC | Off-policy | 最大熵,探索强 | 需要充分探索的任务 |
| TD3 | Off-policy | 连续控制,稳定 | 机器人控制,精确定位 |
简单记忆法:
- 不确定用什么 → 先试PPO
- 任务探索困难 → 试SAC
- 任务需要精确控制 → 试TD3
八、完整PPO代码:PyTorch实现
8.1 整体架构
PPO的完整流程如下:
1. 初始化策略网络(Actor)和价值网络(Critic)
2. 循环采集数据:
a. 用当前策略与环境交互,收集 (s, a, r, s', done, log_prob, V)
b. 存储到经验池
3. 循环更新(多个epoch):
a. 计算GAE advantage
b. 对每个mini-batch:
- 用新策略计算log_prob
- 计算ratio = exp(log_prob_new - log_prob_old)
- 计算 clipped surrogate loss
- 计算 value loss
- 梯度下降更新网络
4. 重复2-3直到收敛
8.2 完整代码
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributions as distributions
import numpy as np
import gym
from collections import deque
class PPONetwork(nn.Module):
"""
PPO网络架构
采用Actor-Critic架构:
- Actor:输出动作的均值和标准差(假设动作服从正态分布)
- Critic:输出状态价值估计
对于连续动作空间,我们用对角高斯策略:
- 均值由神经网络输出
- 标准差可以是可学习的参数或者由网络输出
"""
def __init__(self, obs_dim, act_dim, hidden_dim=64):
super().__init__()
# 共享特征提取层
self.shared = nn.Sequential(
nn.Linear(obs_dim, hidden_dim),
nn.Tanh(),
nn.Linear(hidden_dim, hidden_dim),
nn.Tanh()
)
# Actor头:输出动作均值
self.actor_mean = nn.Linear(hidden_dim, act_dim)
# Actor头:输出动作标准差(这里用可学习参数,对数标准差更稳定)
self.actor_log_std = nn.Parameter(torch.zeros(act_dim))
# Critic头:输出状态价值
self.critic = nn.Sequential(
nn.Linear(hidden_dim, hidden_dim),
nn.Tanh(),
nn.Linear(hidden_dim, 1)
)
def forward(self, x):
"""前向传播"""
features = self.shared(x)
mean = self.actor_mean(features)
std = torch.exp(self.actor_log_std)
value = self.critic(features)
return mean, std, value
def get_action(self, x, deterministic=False):
"""
采样动作
参数:
- x: 观测
- deterministic: 是否使用确定性策略(用于评估/测试)
返回:
- action: 采样的动作
- log_prob: 动作的对数概率
- entropy: 策略熵
- value: 价值估计
"""
mean, std, value = self.forward(x)
if deterministic:
action = torch.tanh(mean) # 使用均值
return action, None, None, value.squeeze()
dist = distributions.Normal(mean, std)
# 使用reparameterization trick采样
action = dist.rsample()
action_log_prob = dist.log_prob(action)
entropy = dist.entropy()
# 如果是连续动作,通常用tanh压缩到[-1, 1]
action = torch.tanh(action)
# 注意:使用tanh后需要修正log概率
# 这是一个小技巧,防止概率计算出问题
action_log_prob -= torch.log(1 - action.pow(2) + 1e-6)
return action, action_log_prob.sum(-1, keepdim=True), entropy.sum(-1, keepdim=True), value
def evaluate_actions(self, x, actions):
"""
评估动作的对数概率(用于更新阶段)
这个函数和get_action类似,但是不采样,直接用给定动作计算概率
"""
mean, std, value = self.forward(x)
dist = distributions.Normal(mean, std)
# 计算动作的对数概率
# 注意:actions可能已经是tanh压缩过的
action_log_prob = dist.log_prob(actions)
entropy = dist.entropy()
# tanh修正
action_log_prob -= torch.log(1 - actions.pow(2) + 1e-6)
return action_log_prob.sum(-1, keepdim=True), entropy.sum(-1, keepdim=True), value.squeeze()
class PPO:
"""
PPO算法完整实现
超参数说明:
- gamma: 折扣因子,越大越注重长期收益
- lam: GAE的λ参数,越大越偏向蒙特卡洛估计
- clip_epsilon: PPO的裁剪参数,0.1-0.3常用
- value_coef: 价值损失的系数
- entropy_coef: 熵正则化的系数,鼓励探索
- ppo_epochs: 每次更新用多少个epoch
- batch_size: mini-batch大小
- max_grad_norm: 梯度裁剪的阈值
"""
def __init__(
self,
obs_dim,
act_dim,
hidden_dim=64,
lr=3e-4,
gamma=0.99,
lam=0.95,
clip_epsilon=0.2,
value_coef=0.5,
entropy_coef=0.0,
ppo_epochs=10,
batch_size=64,
max_grad_norm=0.5
):
self.gamma = gamma
self.lam = lam
self.clip_epsilon = clip_epsilon
self.value_coef = value_coef
self.entropy_coef = entropy_coef
self.ppo_epochs = ppo_epochs
self.batch_size = batch_size
self.max_grad_norm = max_grad_norm
# 创建网络
self.network = PPONetwork(obs_dim, act_dim, hidden_dim)
self.optimizer = optim.Adam(self.network.parameters(), lr=lr)
def select_action(self, obs, deterministic=False):
"""选择动作(用于和环境交互)"""
obs_tensor = torch.FloatTensor(obs).unsqueeze(0)
with torch.no_grad():
action, log_prob, entropy, value = self.network.get_action(
obs_tensor, deterministic
)
return (
action.numpy()[0],
log_prob.item() if log_prob is not None else None,
value.item()
)
def compute_gae(self, rewards, values, dones):
"""
计算GAE(广义优势估计)
这是PPO的核心技巧之一,用于获得更稳定的学习信号
"""
advantages = []
gae = 0
# 在values开头和结尾加上起始价值和结束价值
values = list(values) + [0.0]
# 逆向计算
for t in reversed(range(len(rewards))):
# TD误差
delta = rewards[t] + self.gamma * values[t + 1] * (1 - dones[t]) - values[t]
# GAE递归
gae = delta + self.gamma * self.lam * (1 - dones[t]) * gae
advantages.insert(0, gae)
return np.array(advantages)
def update(self, observations, actions, rewards, dones, old_log_probs, values):
"""
PPO更新步骤
这是PPO的核心:通过对多个epoch的数据进行重复使用来提高效率
"""
# 转换为numpy数组
observations = np.array(observations)
actions = np.array(actions)
rewards = np.array(rewards)
dones = np.array(dones)
old_log_probs = np.array(old_log_probs)
values = np.array(values)
# 计算GAE advantages
advantages = self.compute_gae(rewards, values, dones)
# 计算回报(advantage + value)
returns = advantages + values
# 归一化advantages(这会让训练更稳定)
advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
# 转换为tensor
observations_t = torch.FloatTensor(observations)
actions_t = torch.FloatTensor(actions)
old_log_probs_t = torch.FloatTensor(old_log_probs)
advantages_t = torch.FloatTensor(advantages)
returns_t = torch.FloatTensor(returns)
# 存储losses用于日志
policy_losses = []
value_losses = []
entropies = []
kl_divs = []
# PPO更新多个epoch
dataset_size = len(observations)
indices = np.arange(dataset_size)
for epoch in range(self.ppo_epochs):
# 打乱数据顺序
np.random.shuffle(indices)
# mini-batch更新
for start in range(0, dataset_size, self.batch_size):
end = start + self.batch_size
batch_idx = indices[start:end]
# 获取mini-batch数据
batch_obs = observations_t[batch_idx]
batch_actions = actions_t[batch_idx]
batch_old_log_probs = old_log_probs_t[batch_idx]
batch_advantages = advantages_t[batch_idx]
batch_returns = returns_t[batch_idx]
# 评估动作
log_probs, entropy, values_pred = self.network.evaluate_actions(
batch_obs, batch_actions
)
# ==================== PPO核心公式 ====================
# 计算策略比值 r(θ) = π_θ(a|s) / π_θ_old(a|s)
# 这里用exp(log_prob_new - log_prob_old)来计算,更数值稳定
ratio = torch.exp(log_probs - batch_old_log_probs)
# 计算surrogate loss
# surr1: 原始的surrogate loss
surr1 = ratio * batch_advantages
# surr2: clipped后的surrogate loss
# clip的作用是防止策略更新过大
surr2 = torch.clamp(
ratio,
1 - self.clip_epsilon,
1 + self.clip_epsilon
) * batch_advantages
# 取最小值(当A>0时clip下界,当A<0时clip上界)
# 这个min操作是PPO的精髓所在
policy_loss = -torch.min(surr1, surr2).mean()
# ====================================================
# 计算价值损失
value_loss = nn.MSELoss()(values_pred, batch_returns)
# 计算熵损失(鼓励探索)
entropy_loss = -entropy.mean()
# 总损失
loss = (
policy_loss +
self.value_coef * value_loss +
self.entropy_coef * entropy_loss
)
# 梯度下降
self.optimizer.zero_grad()
loss.backward()
# 梯度裁剪(防止梯度爆炸)
torch.nn.utils.clip_grad_norm_(
self.network.parameters(),
self.max_grad_norm
)
self.optimizer.step()
# 记录
policy_losses.append(policy_loss.item())
value_losses.append(value_loss.item())
entropies.append(entropy.mean().item())
# 计算KL散度(用于监控)
with torch.no_grad():
kl = (batch_old_log_probs - log_probs).mean()
kl_divs.append(kl.item())
return {
'policy_loss': np.mean(policy_losses),
'value_loss': np.mean(value_losses),
'entropy': np.mean(entropies),
'kl_divergence': np.mean(kl_divs)
}
def train_ppo(env_name, num_steps=1000000, num_env_steps=2048, update_freq=2048):
"""
PPO训练主循环
参数:
- env_name: gym环境名称
- num_steps: 总训练步数
- num_env_steps: 每次更新前收集多少步数据
- update_freq: 更新频率
"""
# 创建环境
env = gym.make(env_name)
obs_dim = env.observation_space.shape[0]
# 判断动作空间类型
if hasattr(env.action_space, 'n'):
act_dim = env.action_space.n
action_type = 'discrete'
else:
act_dim = env.action_space.shape[0]
action_type = 'continuous'
print(f"环境: {env_name}")
print(f"观测维度: {obs_dim}, 动作维度: {act_dim}")
print(f"动作类型: {action_type}")
# 创建PPO agent
agent = PPO(
obs_dim=obs_dim,
act_dim=act_dim,
hidden_dim=64,
lr=3e-4,
gamma=0.99,
lam=0.95,
clip_epsilon=0.2,
value_coef=0.5,
entropy_coef=0.01,
ppo_epochs=10,
batch_size=64
)
# 训练
obs, _ = env.reset()
episode_rewards = deque(maxlen=10)
total_steps = 0
while total_steps < num_steps:
# 数据收集阶段
observations, actions, rewards, dones, old_log_probs, values = [], [], [], [], [], []
episode_reward = 0
for _ in range(num_env_steps):
# 选择动作
action, log_prob, value = agent.select_action(obs)
# 与环境交互
next_obs, reward, terminated, truncated, _ = env.step(action)
done = terminated or truncated
# 存储数据
observations.append(obs)
actions.append(action)
rewards.append(reward)
dones.append(done)
old_log_probs.append(log_prob)
values.append(value)
episode_reward += reward
obs = next_obs
if done:
episode_rewards.append(episode_reward)
obs, _ = env.reset()
episode_reward = 0
total_steps += num_env_steps
# PPO更新
losses = agent.update(
observations,
actions,
rewards,
dones,
old_log_probs,
values
)
# 打印训练进度
if len(episode_rewards) > 0:
avg_reward = np.mean(episode_rewards)
print(f"Steps: {total_steps:8d} | "
f"Avg Reward: {avg_reward:7.2f} | "
f"Policy Loss: {losses['policy_loss']:7.3f} | "
f"Value Loss: {losses['value_loss']:7.3f} | "
f"Entropy: {losses['entropy']:6.3f}")
# 保存模型(可选)
# torch.save(agent.network.state_dict(), f'ppo_{env_name}_{total_steps}.pt')
env.close()
return agent
# 运行训练
if __name__ == '__main__':
# 连续控制任务示例
agent = train_ppo('HalfCheetah-v2', num_steps=500000)
# 离散控制任务示例(取消注释即可)
# agent = train_ppo('CartPole-v1', num_steps=500000)九、PPO调参实战:经验总结
9.1 核心超参数推荐
| 参数 | 推荐范围 | 调整建议 |
|---|---|---|
| 学习率 | 3e-4(连续)、3e-4(离散) | 可以用线性衰减,从3e-4慢慢降到1e-4 |
| Clip epsilon | 0.1 - 0.3(默认0.2) | 连续动作空间常用0.2,离散可用0.3 |
| GAE lambda | 0.9 - 0.99(默认0.95) | 越大越平滑,越小越激进 |
| PPO epochs | 10 - 30(默认10) | 数据量大可以多几个epoch |
| Mini-batch size | 32 - 256(默认64) | 连续任务用大点,离散任务用小点 |
| 价值系数 | 0.5 - 1.0(默认0.5) | 价值估计不准时增大 |
| 熵系数 | 0 - 0.01(默认0) | 鼓励探索时加一点,比如0.01 |
| 梯度裁剪 | 0.5(连续)、1.0(离散) | 防止梯度爆炸 |
| 环境数量 | 1 - 16 | 根据CPU核心数调整 |
9.2 调参技巧
1. 先用默认参数跑通整个流程
不要一上来就调参,先确保代码能跑起来,能收敛。哪怕收敛慢一点,也比跑不起来强。
2. 观察这些关键指标
# 如果看到这些情况,说明有问题
if ratio.mean() > 2.0:
print("⚠️ 策略更新过大,可能不稳定")
if entropy < 0.1:
print("⚠️ 熵太小,策略可能已经收敛或崩溃")
if kl_divergence > 0.1:
print("⚠️ KL散度太大,考虑减小学习率")
if value_loss > 100:
print("⚠️ 价值损失太大,考虑调整价值系数")3. 常见问题排查
| 问题 | 可能原因 | 解决方案 |
|---|---|---|
| 训练崩溃(reward突然变0) | 策略更新过大 | 减小学习率,减小clip epsilon |
| 训练不收敛 | 探索不足或学习率不对 | 增加熵系数,减小学习率 |
| 方差大,曲线很抖 | batch太小或GAE lambda太小 | 增加batch size,增大GAE lambda |
| 价值估计不准 | 价值网络太弱或价值系数太小 | 增大价值网络,增加价值系数 |
9.3 不同任务的参数调整
连续控制任务(机器人、无人机):
PPO(
hidden_dim=128, # 动作复杂,需要更大的网络
clip_epsilon=0.2, # 连续空间常用0.2
entropy_coef=0.0, # 一般不需要额外的熵正则化
batch_size=256, # 连续任务可以用更大的batch
)离散控制任务(Atari游戏、棋类):
PPO(
hidden_dim=64,
clip_epsilon=0.1, # 离散空间可以用更小的clip
entropy_coef=0.01, # 鼓励探索
batch_size=32, # 离散任务batch可以小一点
)十、实战:用PPO训练你的第一个AI
10.1 任务1:训练CartPole站立
CartPole是强化学习入门的”Hello World”——让杆子保持平衡不倒。
import gym
def train_cartpole():
from PPO import train_ppo
agent = train_ppo(
env_name='CartPole-v1',
num_steps=100000,
num_env_steps=2048
)
# 测试
env = gym.make('CartPole-v1', render_mode='human')
obs, _ = env.reset()
for _ in range(1000):
action, _, _ = agent.select_action(obs, deterministic=True)
obs, _, terminated, truncated, _ = env.step(action)
if terminated or truncated:
obs, _ = env.reset()
env.close()
train_cartpole()10.2 任务2:训练机械臂抓取(需要MuJoCo)
def train_reacher():
from PPO import train_ppo
# Reacher环境:控制机械臂到达目标位置
agent = train_ppo(
env_name='Reacher-v2',
num_steps=1000000,
num_env_steps=2048
)
train_reacher()10.3 任务3:多环境并行训练
from stable_baselines3.common.vec_env import SubprocVecEnv, DummyVecEnv
import gym
def make_env(env_id):
"""创建单个环境"""
def _init():
env = gym.make(env_id)
return env
return _init
def train_multi_env():
"""多环境并行训练"""
n_envs = 8
env = SubprocVecEnv([make_env('HalfCheetah-v2') for _ in range(n_envs)])
# 观察空间和动作空间
from gym import spaces
obs_dim = env.observation_space.shape[1] # 子环境的观测维度
act_dim = env.action_space.shape[1] # 子环境的动作维度
# 创建PPO agent(代码同上)
agent = PPO(obs_dim, act_dim, ...)
# 训练循环(需要改成支持向量化环境)
# ...10.4 快速验证PPO是否有效
def quick_test():
"""快速测试:CartPole应该在100步内收敛到接近500"""
agent = train_ppo('CartPole-v1', num_steps=50000)
# 评估
env = gym.make('CartPole-v1')
obs, _ = env.reset()
total_reward = 0
for _ in range(500):
action, _, _ = agent.select_action(obs, deterministic=True)
obs, reward, terminated, truncated, _ = env.step(action)
total_reward += reward
if terminated or truncated:
break
print(f"测试 Reward: {total_reward}")
env.close()
quick_test()十一、总结
11.1 PPO的核心要点
- 策略梯度 + Actor-Critic:PPO是策略梯度算法,用Actor选择动作,用Critic估计价值
- Clip机制:通过裁剪策略比值,防止策略更新过大,保证训练稳定
- GAE:广义优势估计平衡偏差和方差,提供更稳定的学习信号
- 一阶优化:相比TRPO的二阶优化,PPO实现更简单,效率更高
11.2 PPO vs 其他算法的选择
- 简单离散任务:先试试Q学习或DQN
- 连续控制/复杂任务:PPO是首选
- 需要充分探索:试试SAC
- 需要精确控制:试试TD3
11.3 实战建议
- 从简单的环境开始:CartPole、MountainCar这种,能快速验证想法
- 用Stable Baselines3起步:先理解PPO怎么用,再考虑自己实现
- 监控关键指标:KL散度、熵、价值损失,这些能帮你诊断问题
- 耐心等待:强化学习训练慢是正常的,不要一看到曲线波动就觉得有问题
参考文献
- Schulman, J., Levine, S., Abbeel, P., Jordan, M., & Moritz, P. (2015). Trust region policy optimization. ICML, 1889-1897.
- Schulman, J., Wolski, F., Dhariwal, P., Radford, A., & Klimov, O. (2017). Proximal policy optimization algorithms. arXiv:1707.06347.
- Engstrom, L., et al. (2020). Implementation matters in RL: A case study on PPO. ICLR 2020.
- Andrychowicz, M., et al. (2020). What matters in on-policy reinforcement learning? A large-scale empirical study. arXiv:2006.05990.
- Raffin, A., et al. (2021). SB3: Stable Baselines3. JMLR.
PPO以其简洁的实现和稳定的性能,成为当前强化学习研究和应用的首选算法。