PPO深度指南

关键词速览

|| 核心概念 | 信任域 | 剪裁目标 | KL散度 | 自适应惩罚 | ||:---------|:-------|:---------|:-------|:-----------| || 自然梯度 | 价值裁剪 | 广义优势估计 | 策略更新 | 目标函数 |

核心关键词表

术语英文符号/技术说明
近端策略优化PPOPPO稳定策略更新的算法
信任域策略优化TRPOTRPO基于KL散度约束的策略优化
剪裁目标函数Clipped Objective防止策略过大更新
替代优势函数Surrogate Advantage策略比值的加权
KL散度KL Divergence$D_{KL}(\pi_{\theta_{old}}
自适应KL惩罚Adaptive KL Penalty自动调整的KL惩罚系数
价值函数裁剪Value Clipping防止价值函数剧变
广义优势估计GAE偏差-方差平衡的优势估计
目标函数ObjectivePPO综合目标
策略比值Probability Ratio$r_t(\theta) = \frac{\pi_\theta(a_ts_t)}{\pi_{\theta_{old}}(a_t

前言:为什么你需要学PPO?

在强化学习的世界里,PPO(Proximal Policy Optimization,近端策略优化)就像是一个”老司机”——它稳重、可靠、不容易翻车。相比之下,早期的策略梯度算法就像新手开车,随时可能因为步子迈太大而栽跟头。

如果你做过强化学习项目,大概率会遇到这些头疼的问题:训练不稳定、策略突然崩溃、方差大到根本收敛不了。PPO就是来解决这些问题的。它在2017年被提出后迅速成为业界标配,OpenAI用它训练GPT的早期版本,波士顿动力用它做机器人控制,各种游戏AI也都在用它。

这篇文章我会从最基础的概念讲起,用大量类比和直觉解释,确保你不仅知道PPO”怎么用”,更理解它”为什么有效”。最后还会有完整的代码实现,手把手教你跑起来。


一、策略梯度到底是什么?

1.1 用掷飞镖来理解

想象你在玩一个掷飞镖的游戏。你站在固定位置,向靶子掷飞镖。每次掷出去,结果要么命中靶心(好结果),要么偏得离谱(坏结果)。

现在问题来了:你怎么学习”怎么掷”?

一个朴素的想法是:记住成功的例子,忘掉失败的经验。如果某次掷出去,飞镖扎在9环,你就记住这个动作,下次尽量模仿。如果飞镖飞出靶外,那就尽量避免再做同样的动作。

这就是策略梯度(Policy Gradient)的核心思想。

在强化学习里,我们把”飞镖脱手那一刻的动作”叫做策略(Policy),记作 。它是一个概率分布——在状态 下,选择动作 的概率是多少。这个概率分布由神经网络参数 控制。

“掷飞镖的结果”叫做回报(Return)。我们希望增加好结果的概率,降低坏结果的概率

1.2 策略梯度的数学直觉

策略梯度的目标函数定义为:

其中 是一条完整的轨迹(从开始到结束), 是这条轨迹的累计回报。

根据策略梯度定理(Policy Gradient Theorem),我们可以用梯度上升来优化这个目标:

别被公式吓到,它说的就是:对于轨迹上的每一步,我们用 告诉网络”往哪个方向调整参数”,然后用回报 决定”调整多少”

打个比方:如果一个飞镖手这次掷出了10环,我们会:

  1. 分析他这次的动作细节(手型、角度、力道)
  2. 记住这个成功动作的”特征模式”
  3. 下次遇到类似情况时,尝试复现这个模式

1.3 代码里的策略梯度长这样

import torch
import torch.nn as nn
import numpy as np
 
class SimplePolicy(nn.Module):
    """最简单的策略网络"""
    def __init__(self, obs_dim, act_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(obs_dim, 64),
            nn.ReLU(),
            nn.Linear(64, act_dim),
            nn.Softmax(dim=-1)  # 输出动作概率分布
        )
    
    def forward(self, x):
        return self.net(x)
    
    def get_action(self, obs):
        """给定观测,选择动作"""
        obs_tensor = torch.FloatTensor(obs).unsqueeze(0)
        probs = self(obs_tensor)
        action = torch.multinomial(probs, 1).item()
        return action
 
def compute_policy_gradient_loss(policy, obs, actions, returns):
    """
    策略梯度损失计算
    
    核心思想:如果一个动作带来了正回报,就增加选择它的概率
              如果一个动作带来了负回报,就减少选择它的概率
    """
    log_probs = torch.log(policy(obs) + 1e-8)
    
    # 选择每个动作对应的log概率
    action_log_probs = log_probs.gather(1, actions.unsqueeze(1)).squeeze()
    
    # 策略梯度:log概率 * 回报
    loss = -(action_log_probs * returns).mean()
    
    return loss

二、REINFORCE:最纯粹的策略梯度

2.1 REINFORCE的原理

REINFORCE是Williams在1992年提出的,可以说是策略梯度算法的”老祖宗”。它的想法特别直接:

“如果一个动作最终获得了好的结果(高回报),我就提高选择这个动作的概率;如果结果不好,就降低概率。”

REINFORCE的梯度估计是:

其中 是从时刻t开始的累计折扣回报:

2.2 完整REINFORCE代码

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import gym
 
class REINFORCEAgent:
    """
    REINFORCE算法实现
    
    这个算法特别简单:
    1. 用当前策略收集一批轨迹
    2. 计算每条轨迹的回报
    3. 用策略梯度更新网络
    """
    def __init__(self, obs_dim, act_dim, lr=3e-4, gamma=0.99):
        self.gamma = gamma
        
        self.policy = nn.Sequential(
            nn.Linear(obs_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, act_dim),
            nn.Softmax(dim=-1)
        )
        
        self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)
    
    def get_action(self, obs):
        """选择动作(按概率采样)"""
        obs_tensor = torch.FloatTensor(obs).unsqueeze(0)
        probs = self.policy(obs_tensor)
        action = torch.multinomial(probs, 1).item()
        return action
    
    def compute_returns(self, rewards):
        """
        计算每一步的累计回报
        
        比如:rewards = [1, 2, 3], gamma = 0.9
        返回: [1 + 0.9*2 + 0.9^2*3, 2 + 0.9*3, 3]
        """
        returns = []
        G = 0
        
        for r in reversed(rewards):
            G = r + self.gamma * G
            returns.insert(0, G)
        
        return torch.FloatTensor(returns)
    
    def update(self, obs_list, actions_list, rewards_list):
        """
        策略梯度更新
        
        核心逻辑:
        1. 对每个样本:log_prob(action) * return
        2. 累加取平均
        3. 梯度上升(取负号变成梯度下降)
        """
        policy_losses = []
        
        for obs, actions, rewards in zip(obs_list, actions_list, rewards_list):
            # 计算累计回报
            returns = self.compute_returns(rewards)
            
            # 归一化(可选,让训练更稳定)
            returns = (returns - returns.mean()) / (returns.std() + 1e-8)
            
            # 策略梯度
            obs_tensor = torch.FloatTensor(np.array(obs))
            actions_tensor = torch.LongTensor(actions)
            returns_tensor = returns
            
            log_probs = torch.log(self.policy(obs_tensor) + 1e-8)
            action_log_probs = log_probs.gather(1, actions_tensor.unsqueeze(1)).squeeze()
            
            # 策略梯度 = log概率 * 回报
            policy_loss = -(action_log_probs * returns_tensor).mean()
            policy_losses.append(policy_loss)
        
        # 更新网络
        total_loss = torch.stack(policy_losses).mean()
        self.optimizer.zero_grad()
        total_loss.backward()
        self.optimizer.step()
 
def train_reinforce(env_name='CartPole-v1', num_episodes=1000):
    """训练REINFORCE"""
    env = gym.make(env_name)
    obs_dim = env.observation_space.shape[0]
    act_dim = env.action_space.n
    
    agent = REINFORCEAgent(obs_dim, act_dim)
    
    for episode in range(num_episodes):
        obs_list, actions_list, rewards_list = [], [], []
        
        obs, _ = env.reset()
        obs_data, actions_data, rewards_data = [], [], []
        
        done = False
        while not done:
            action = agent.get_action(obs)
            next_obs, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            
            obs_data.append(obs)
            actions_data.append(action)
            rewards_data.append(reward)
            
            obs = next_obs
        
        # 更新
        agent.update([obs_data], [actions_data], [rewards_data])
        
        if episode % 50 == 0:
            total_reward = sum(rewards_data)
            print(f"Episode {episode}, Reward: {total_reward}")

2.3 为什么REINFORCE的方差这么大?

REINFORCE有个致命问题:方差太大了

想象一下:你掷飞镖,掷了100次。前10次都飞出了靶外,后90次命中了9环。那这前10次”失败”的经验会严重影响你的学习。

问题出在哪?因为REINFORCE用的是蒙特卡洛采样——它用完整轨迹的累计回报 来估计每一步动作的价值。但这个估计高度依赖随机性:

  1. 环境本身有随机性:同样的动作,下一次结果可能完全不同
  2. 策略有随机性:你选了”正确”的动作,但环境可能给你一个坏结果
  3. 累计误差:越到后面的时刻,误差累积得越厉害

用一个夸张的例子说明:假设你掷飞镖,10次里有1次侥幸命中靶心。REINFORCE会认为这次”命中靶心”的动作就是好动作,值得强化。但实际上这次成功纯属运气,下次你按同样的方式掷,很可能又飞了。

方差大 = 学习信号噪声大 = 收敛慢、不稳定


三、Actor-Critic:结合值函数减少方差

3.1 核心思想:用”裁判”来辅助学习

为了解决方差大的问题,Actor-Critic架构引入了两个角色:

  • Actor(演员):负责输出策略,决定动作选择(就是原来的策略网络)
  • Critic(裁判):负责估计状态价值,判断当前状态好不好(新增的价值网络)

Critic的作用是:给Actor提供一个更稳定的学习信号

回到掷飞镖的例子:原来REINFORCE的做法是”不管三七二十一,就看最终结果”。Actor-Critic的做法是”每掷一次飞镖,先问问裁判觉得这个位置怎么样,再决定要不要学习这次经验”。

3.2 优势函数:去掉那些”本来就好”的状态

Actor-Critic用**优势函数(Advantage Function)**来估计动作的相对价值:

  • :在状态 下选动作 的价值
  • :在状态 下的平均价值

优势函数的含义是:选这个动作,比平均水平的动作好多少?

如果优势是正的,说明这个动作比平均水平好,值得强化;如果是负的,说明这个动作不如平均水平,应该抑制。

用公式表示优势函数:

这就是TD(0)误差(Temporal Difference Error),它既考虑了即时奖励,又利用了价值函数的估计。

3.3 Actor-Critic代码

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import gym
 
class ActorCritic(nn.Module):
    """
    Actor-Critic网络
    
    共享一部分特征提取层,然后分成两个头:
    - actor头:输出动作概率分布
    - critic头:输出状态价值估计
    """
    def __init__(self, obs_dim, act_dim, hidden_dim=64):
        super().__init__()
        
        # 共享特征提取层
        self.shared = nn.Sequential(
            nn.Linear(obs_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh()
        )
        
        # Actor头:输出动作概率
        self.actor = nn.Linear(hidden_dim, act_dim)
        
        # Critic头:输出状态价值
        self.critic = nn.Linear(hidden_dim, 1)
    
    def forward(self, x):
        features = self.shared(x)
        probs = torch.softmax(self.actor(features), dim=-1)
        value = self.critic(features)
        return probs, value
    
    def get_action(self, obs):
        """采样动作(同时返回log概率和价值估计)"""
        probs, value = self.forward(obs)
        dist = torch.distributions.Categorical(probs)
        action = dist.sample()
        log_prob = dist.log_prob(action)
        return action, log_prob, value.squeeze()
 
class A2CAgent:
    """
    A2C = Advantage Actor-Critic
    
    A2C和A3C是兄弟算法,核心思想一样:
    - A3C:异步并行,多个worker同时探索
    - A2C:同步并行,多个worker同时探索,然后汇总更新
    """
    def __init__(self, obs_dim, act_dim, lr=3e-4, gamma=0.99, entropy_coef=0.01):
        self.gamma = gamma
        self.entropy_coef = entropy_coef
        
        self.policy = ActorCritic(obs_dim, act_dim)
        self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)
    
    def get_action(self, obs):
        """获取动作"""
        obs_tensor = torch.FloatTensor(obs).unsqueeze(0)
        with torch.no_grad():
            action, log_prob, value = self.policy.get_action(obs_tensor)
        return action.item(), log_prob.item(), value.item()
    
    def compute_gae(self, rewards, values, dones, next_value):
        """
        计算GAE(广义优势估计)
        
        GAE是PPO的核心技巧之一,后面会详细讲
        """
        advantages = []
        gae = 0
        
        values = values + [next_value]
        
        for t in reversed(range(len(rewards))):
            delta = rewards[t] + self.gamma * values[t+1] * (1 - dones[t]) - values[t]
            gae = delta + self.gamma * 0.95 * (1 - dones[t]) * gae
            advantages.insert(0, gae)
        
        return np.array(advantages)
    
    def update(self, obs_list, actions_list, rewards_list, values_list, dones_list):
        """
        更新网络
        """
        # 计算GAE
        obs = np.array(obs_list)
        actions = np.array(actions_list)
        rewards = np.array(rewards_list)
        values = np.array(values_list)
        dones = np.array(dones_list)
        
        with torch.no_grad():
            _, _, next_value = self.policy.get_action(torch.FloatTensor(obs[-1:]))
        
        advantages = self.compute_gae(rewards, values, dones, next_value.item())
        returns = advantages + values
        
        # 归一化优势函数
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
        
        # 转成tensor
        obs_tensor = torch.FloatTensor(obs)
        actions_tensor = torch.LongTensor(actions)
        advantages_tensor = torch.FloatTensor(advantages)
        returns_tensor = torch.FloatTensor(returns)
        
        # 前向传播
        probs, values_pred = self.policy(obs_tensor)
        
        # Actor loss(策略梯度)
        dist = torch.distributions.Categorical(probs)
        log_probs = dist.log_prob(actions_tensor)
        policy_loss = -(log_probs * advantages_tensor).mean()
        
        # Entropy bonus(鼓励探索)
        entropy = dist.entropy().mean()
        entropy_loss = -self.entropy_coef * entropy
        
        # Critic loss(价值函数)
        value_loss = nn.MSELoss()(values_pred.squeeze(), returns_tensor)
        
        # 总损失
        total_loss = policy_loss + value_loss + entropy_loss
        
        # 更新
        self.optimizer.zero_grad()
        total_loss.backward()
        self.optimizer.step()
        
        return policy_loss.item(), value_loss.item(), entropy.item()

四、TRPO vs PPO:信赖域方法 vs 剪裁代理

4.1 TRPO:想用数学约束来保证稳定

TRPO(Trust Region Policy Optimization,信赖域策略优化)2015年由Schulman提出,它的想法很聪明:

“我不限制一步更新多大,我限制新旧策略之间的差异。”

TRPO的约束是:

翻译成人话:新旧策略在每个状态的KL散度(可以理解为概率分布的差异)不能超过

用一个比喻:TRPO像是给策略更新装了一个”限速器”——不管你怎么踩油门,车速都不会超过某个阈值。

4.2 TRPO的计算复杂度问题

TRPO的实现非常复杂:

  1. 需要求解约束优化问题
  2. 用共轭梯度法近似计算自然梯度
  3. 需要存储Fisher信息矩阵(或者用K-FAC近似)
  4. 内存开销巨大
  5. 和GAE等技术配合不友好

这些复杂性让很多研究者望而却步——我只是想训练个机器人,为什么要搞得像在发射火箭?

4.3 PPO:用简单方法解决复杂问题

PPO的核心洞察是:我们不需要精确地限制KL散度,只需要”不要太离谱”就行。

PPO用了一个巧妙的设计:剪裁代理目标函数(Clipped Surrogate Objective)

这个公式看起来吓人,但其实非常直觉。


五、PPO核心公式直观解读:Clip到底在Clip什么?

5.1 理解策略比值 r(θ)

策略比值 定义为:

它的含义是:新策略选择这个动作的概率,是旧策略的多少倍?

  • 如果 :新策略更倾向于选这个动作
  • 如果 :新策略更不愿意选这个动作
  • 如果 :新旧策略选择这个动作的概率一样

5.2 Clip在Clip什么?

我们把公式拆解来看:

的意思是: 限制在 范围内

  • 如果 ,就把它”截断”成
  • 如果 ,就把它”截断”成

5.3 用游戏来理解

假设你在玩一个游戏,epsilon = 0.2(最常用的值)。

场景1:优势函数 A > 0(好动作)

假设某步的优势是 +10,说明这个动作比平均水平好。

  • 如果 :这意味着新策略选择这个动作的概率是旧策略的1.5倍

    • 原来目标:
    • Clip后:

    结论:PPO不让你拿那么多”功劳”,防止你过度强化这个动作。

  • 如果 :Clip不起作用,你正常拿到

场景2:优势函数 A < 0(坏动作)

假设某步的优势是 -5,说明这个动作比平均水平差。

  • 如果 :这意味着新策略几乎不选这个动作了
    • 原来目标:(负数,表示惩罚)
    • Clip后:
    结论:PPO让你不要”惩罚过头”,新策略已经不愿意选这个动作了,给它留点余地。

5.4 Clip的保护机制

总结一下Clip的作用:

  1. 当动作好时(A > 0):防止新策略过度增加这个动作的概率。打个比方:你考试考好了(动作好),但Clip不让你骄傲过头。
  2. 当动作差时(A < 0):防止新策略过度惩罚这个动作。打个比方:你考试考差了(动作差),但Clip不让你完全否定自己,还有翻盘的机会。

这个设计非常精妙:它让策略更新变得更加保守和稳定


六、GAE(广义优势估计):平衡偏差与方差

6.1 为什么需要GAE?

前面说过,REINFORCE用蒙特卡洛采样估计回报,方差大但无偏;用TD(0)误差方差小但有偏。

GAE的想法是:我能不能在两者之间找一个平衡点?

6.2 GAE的公式

其中 是TD误差。

如果把无穷级数截断并求和,可以写成递归形式:

其中 是done标志(如果episode结束就为1)。

6.3 λ参数的艺术

λ控制着偏差-方差的权衡:

  • λ = 0(就是TD(0),方差小但有偏)
  • λ = 1(接近蒙特卡洛,无偏但方差大)
  • λ = 0.95(常用值):在两者之间取得很好的平衡

类比理解:λ就像一个”望远镜”的焦距。λ小,看近处清晰(只关注即时反馈);λ大,看远处也清晰(考虑长期影响)。λ=0.95的意思是:既要看到眼前的利益,也要适当考虑长远,但不要太激进。

6.4 GAE代码实现

def compute_gae(rewards, values, dones, gamma=0.99, lam=0.95):
    """
    计算GAE
    
    参数:
    - rewards: 每步的即时奖励
    - values: 每步的价值估计(包括最后一步后面的价值,用0填充)
    - dones: 每步是否结束
    - gamma: 折扣因子
    - lam: GAE的λ参数
    
    返回:
    - advantages: 每步的优势估计
    """
    advantages = []
    gae = 0
    
    # 逆向计算(从后往前)
    for t in reversed(range(len(rewards))):
        # TD误差
        delta = rewards[t] + gamma * values[t + 1] * (1 - dones[t]) - values[t]
        
        # GAE递归公式
        gae = delta + gamma * lam * (1 - dones[t]) * gae
        advantages.insert(0, gae)
    
    return np.array(advantages)
 
# 使用例子
rewards = [0, 0, 0, 0, 1]  # 前4步没得分,第5步得1分
values = [0.1, 0.2, 0.3, 0.4, 0.5, 0.0]  # 价值估计,最后用0填充
dones = [False, False, False, False, True]  # 第5步结束
 
advantages = compute_gae(rewards, values, dones)
# advantages 会反映每一步的"相对"价值

七、PPO vs Q学习:什么时候用什么?

7.1 两类强化学习方法

强化学习方法大致可以分为两类:

基于值函数的方法(Value-based)

  • 代表算法:DQN、Q学习
  • 思想:学习状态-动作价值Q(s,a),然后选Q值最大的动作
  • 优点:样本效率高,理论基础好
  • 缺点:不适合高维/连续动作空间,方差较大

基于策略的方法(Policy-based)

  • 代表算法:REINFORCE、PPO、SAC
  • 思想:直接学习策略函数 π(a|s)
  • 优点:适合连续动作空间,收敛性好
  • 缺点:样本效率相对较低

7.2 PPO vs Q学习的对比

特性Q学习/DQNPPO
动作空间离散、低维离散或连续都行
收敛稳定性容易不稳定比较稳定
样本效率高(经验回放)中等
探索策略需要epsilon-greedy等策略自带随机性
超参数敏感度较敏感相对鲁棒
实现难度中等中等偏上

7.3 什么时候用PPO?

适合用PPO的场景

  1. 连续动作空间:比如控制机器人的关节角度、无人车的方向盘转角
  2. 高维离散动作空间:比如玩Atari游戏有几十种可能的动作组合
  3. 需要稳定收敛:工程项目中不允许训练过程大起大落
  4. 需要端到端学习:策略和价值函数一起学

可能不需要PPO的场景

  1. 简单的离散动作、低维状态:比如GridWorld,一个简单的Q表就够了
  2. 对样本效率要求极高:比如真实机器人的在线学习,数据太珍贵
  3. 只需要快速验证想法:先试试DQN,更简单

7.4 PPO vs SAC vs TD3:怎么选?

这是三个最流行的策略优化算法,各有特点:

算法类型特点适用场景
PPOOn-policy稳定、鲁棒、易调默认首选,大多数场景
SACOff-policy最大熵,探索强需要充分探索的任务
TD3Off-policy连续控制,稳定机器人控制,精确定位

简单记忆法

  • 不确定用什么 → 先试PPO
  • 任务探索困难 → 试SAC
  • 任务需要精确控制 → 试TD3

八、完整PPO代码:PyTorch实现

8.1 整体架构

PPO的完整流程如下:

1. 初始化策略网络(Actor)和价值网络(Critic)
2. 循环采集数据:
   a. 用当前策略与环境交互,收集 (s, a, r, s', done, log_prob, V)
   b. 存储到经验池
3. 循环更新(多个epoch):
   a. 计算GAE advantage
   b. 对每个mini-batch:
      - 用新策略计算log_prob
      - 计算ratio = exp(log_prob_new - log_prob_old)
      - 计算 clipped surrogate loss
      - 计算 value loss
      - 梯度下降更新网络
4. 重复2-3直到收敛

8.2 完整代码

import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributions as distributions
import numpy as np
import gym
from collections import deque
 
class PPONetwork(nn.Module):
    """
    PPO网络架构
    
    采用Actor-Critic架构:
    - Actor:输出动作的均值和标准差(假设动作服从正态分布)
    - Critic:输出状态价值估计
    
    对于连续动作空间,我们用对角高斯策略:
    - 均值由神经网络输出
    - 标准差可以是可学习的参数或者由网络输出
    """
    def __init__(self, obs_dim, act_dim, hidden_dim=64):
        super().__init__()
        
        # 共享特征提取层
        self.shared = nn.Sequential(
            nn.Linear(obs_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh()
        )
        
        # Actor头:输出动作均值
        self.actor_mean = nn.Linear(hidden_dim, act_dim)
        
        # Actor头:输出动作标准差(这里用可学习参数,对数标准差更稳定)
        self.actor_log_std = nn.Parameter(torch.zeros(act_dim))
        
        # Critic头:输出状态价值
        self.critic = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, 1)
        )
    
    def forward(self, x):
        """前向传播"""
        features = self.shared(x)
        mean = self.actor_mean(features)
        std = torch.exp(self.actor_log_std)
        value = self.critic(features)
        return mean, std, value
    
    def get_action(self, x, deterministic=False):
        """
        采样动作
        
        参数:
        - x: 观测
        - deterministic: 是否使用确定性策略(用于评估/测试)
        
        返回:
        - action: 采样的动作
        - log_prob: 动作的对数概率
        - entropy: 策略熵
        - value: 价值估计
        """
        mean, std, value = self.forward(x)
        
        if deterministic:
            action = torch.tanh(mean)  # 使用均值
            return action, None, None, value.squeeze()
        
        dist = distributions.Normal(mean, std)
        
        # 使用reparameterization trick采样
        action = dist.rsample()
        action_log_prob = dist.log_prob(action)
        entropy = dist.entropy()
        
        # 如果是连续动作,通常用tanh压缩到[-1, 1]
        action = torch.tanh(action)
        
        # 注意:使用tanh后需要修正log概率
        # 这是一个小技巧,防止概率计算出问题
        action_log_prob -= torch.log(1 - action.pow(2) + 1e-6)
        
        return action, action_log_prob.sum(-1, keepdim=True), entropy.sum(-1, keepdim=True), value
    
    def evaluate_actions(self, x, actions):
        """
        评估动作的对数概率(用于更新阶段)
        
        这个函数和get_action类似,但是不采样,直接用给定动作计算概率
        """
        mean, std, value = self.forward(x)
        dist = distributions.Normal(mean, std)
        
        # 计算动作的对数概率
        # 注意:actions可能已经是tanh压缩过的
        action_log_prob = dist.log_prob(actions)
        entropy = dist.entropy()
        
        # tanh修正
        action_log_prob -= torch.log(1 - actions.pow(2) + 1e-6)
        
        return action_log_prob.sum(-1, keepdim=True), entropy.sum(-1, keepdim=True), value.squeeze()
 
 
class PPO:
    """
    PPO算法完整实现
    
    超参数说明:
    - gamma: 折扣因子,越大越注重长期收益
    - lam: GAE的λ参数,越大越偏向蒙特卡洛估计
    - clip_epsilon: PPO的裁剪参数,0.1-0.3常用
    - value_coef: 价值损失的系数
    - entropy_coef: 熵正则化的系数,鼓励探索
    - ppo_epochs: 每次更新用多少个epoch
    - batch_size: mini-batch大小
    - max_grad_norm: 梯度裁剪的阈值
    """
    def __init__(
        self,
        obs_dim,
        act_dim,
        hidden_dim=64,
        lr=3e-4,
        gamma=0.99,
        lam=0.95,
        clip_epsilon=0.2,
        value_coef=0.5,
        entropy_coef=0.0,
        ppo_epochs=10,
        batch_size=64,
        max_grad_norm=0.5
    ):
        self.gamma = gamma
        self.lam = lam
        self.clip_epsilon = clip_epsilon
        self.value_coef = value_coef
        self.entropy_coef = entropy_coef
        self.ppo_epochs = ppo_epochs
        self.batch_size = batch_size
        self.max_grad_norm = max_grad_norm
        
        # 创建网络
        self.network = PPONetwork(obs_dim, act_dim, hidden_dim)
        self.optimizer = optim.Adam(self.network.parameters(), lr=lr)
    
    def select_action(self, obs, deterministic=False):
        """选择动作(用于和环境交互)"""
        obs_tensor = torch.FloatTensor(obs).unsqueeze(0)
        
        with torch.no_grad():
            action, log_prob, entropy, value = self.network.get_action(
                obs_tensor, deterministic
            )
        
        return (
            action.numpy()[0],
            log_prob.item() if log_prob is not None else None,
            value.item()
        )
    
    def compute_gae(self, rewards, values, dones):
        """
        计算GAE(广义优势估计)
        
        这是PPO的核心技巧之一,用于获得更稳定的学习信号
        """
        advantages = []
        gae = 0
        
        # 在values开头和结尾加上起始价值和结束价值
        values = list(values) + [0.0]
        
        # 逆向计算
        for t in reversed(range(len(rewards))):
            # TD误差
            delta = rewards[t] + self.gamma * values[t + 1] * (1 - dones[t]) - values[t]
            # GAE递归
            gae = delta + self.gamma * self.lam * (1 - dones[t]) * gae
            advantages.insert(0, gae)
        
        return np.array(advantages)
    
    def update(self, observations, actions, rewards, dones, old_log_probs, values):
        """
        PPO更新步骤
        
        这是PPO的核心:通过对多个epoch的数据进行重复使用来提高效率
        """
        # 转换为numpy数组
        observations = np.array(observations)
        actions = np.array(actions)
        rewards = np.array(rewards)
        dones = np.array(dones)
        old_log_probs = np.array(old_log_probs)
        values = np.array(values)
        
        # 计算GAE advantages
        advantages = self.compute_gae(rewards, values, dones)
        
        # 计算回报(advantage + value)
        returns = advantages + values
        
        # 归一化advantages(这会让训练更稳定)
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
        
        # 转换为tensor
        observations_t = torch.FloatTensor(observations)
        actions_t = torch.FloatTensor(actions)
        old_log_probs_t = torch.FloatTensor(old_log_probs)
        advantages_t = torch.FloatTensor(advantages)
        returns_t = torch.FloatTensor(returns)
        
        # 存储losses用于日志
        policy_losses = []
        value_losses = []
        entropies = []
        kl_divs = []
        
        # PPO更新多个epoch
        dataset_size = len(observations)
        indices = np.arange(dataset_size)
        
        for epoch in range(self.ppo_epochs):
            # 打乱数据顺序
            np.random.shuffle(indices)
            
            # mini-batch更新
            for start in range(0, dataset_size, self.batch_size):
                end = start + self.batch_size
                batch_idx = indices[start:end]
                
                # 获取mini-batch数据
                batch_obs = observations_t[batch_idx]
                batch_actions = actions_t[batch_idx]
                batch_old_log_probs = old_log_probs_t[batch_idx]
                batch_advantages = advantages_t[batch_idx]
                batch_returns = returns_t[batch_idx]
                
                # 评估动作
                log_probs, entropy, values_pred = self.network.evaluate_actions(
                    batch_obs, batch_actions
                )
                
                # ==================== PPO核心公式 ====================
                # 计算策略比值 r(θ) = π_θ(a|s) / π_θ_old(a|s)
                # 这里用exp(log_prob_new - log_prob_old)来计算,更数值稳定
                ratio = torch.exp(log_probs - batch_old_log_probs)
                
                # 计算surrogate loss
                # surr1: 原始的surrogate loss
                surr1 = ratio * batch_advantages
                
                # surr2: clipped后的surrogate loss
                # clip的作用是防止策略更新过大
                surr2 = torch.clamp(
                    ratio,
                    1 - self.clip_epsilon,
                    1 + self.clip_epsilon
                ) * batch_advantages
                
                # 取最小值(当A>0时clip下界,当A<0时clip上界)
                # 这个min操作是PPO的精髓所在
                policy_loss = -torch.min(surr1, surr2).mean()
                # ====================================================
                
                # 计算价值损失
                value_loss = nn.MSELoss()(values_pred, batch_returns)
                
                # 计算熵损失(鼓励探索)
                entropy_loss = -entropy.mean()
                
                # 总损失
                loss = (
                    policy_loss +
                    self.value_coef * value_loss +
                    self.entropy_coef * entropy_loss
                )
                
                # 梯度下降
                self.optimizer.zero_grad()
                loss.backward()
                
                # 梯度裁剪(防止梯度爆炸)
                torch.nn.utils.clip_grad_norm_(
                    self.network.parameters(),
                    self.max_grad_norm
                )
                
                self.optimizer.step()
                
                # 记录
                policy_losses.append(policy_loss.item())
                value_losses.append(value_loss.item())
                entropies.append(entropy.mean().item())
                
                # 计算KL散度(用于监控)
                with torch.no_grad():
                    kl = (batch_old_log_probs - log_probs).mean()
                    kl_divs.append(kl.item())
        
        return {
            'policy_loss': np.mean(policy_losses),
            'value_loss': np.mean(value_losses),
            'entropy': np.mean(entropies),
            'kl_divergence': np.mean(kl_divs)
        }
 
 
def train_ppo(env_name, num_steps=1000000, num_env_steps=2048, update_freq=2048):
    """
    PPO训练主循环
    
    参数:
    - env_name: gym环境名称
    - num_steps: 总训练步数
    - num_env_steps: 每次更新前收集多少步数据
    - update_freq: 更新频率
    """
    # 创建环境
    env = gym.make(env_name)
    
    obs_dim = env.observation_space.shape[0]
    
    # 判断动作空间类型
    if hasattr(env.action_space, 'n'):
        act_dim = env.action_space.n
        action_type = 'discrete'
    else:
        act_dim = env.action_space.shape[0]
        action_type = 'continuous'
    
    print(f"环境: {env_name}")
    print(f"观测维度: {obs_dim}, 动作维度: {act_dim}")
    print(f"动作类型: {action_type}")
    
    # 创建PPO agent
    agent = PPO(
        obs_dim=obs_dim,
        act_dim=act_dim,
        hidden_dim=64,
        lr=3e-4,
        gamma=0.99,
        lam=0.95,
        clip_epsilon=0.2,
        value_coef=0.5,
        entropy_coef=0.01,
        ppo_epochs=10,
        batch_size=64
    )
    
    # 训练
    obs, _ = env.reset()
    episode_rewards = deque(maxlen=10)
    total_steps = 0
    
    while total_steps < num_steps:
        # 数据收集阶段
        observations, actions, rewards, dones, old_log_probs, values = [], [], [], [], [], []
        
        episode_reward = 0
        
        for _ in range(num_env_steps):
            # 选择动作
            action, log_prob, value = agent.select_action(obs)
            
            # 与环境交互
            next_obs, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            
            # 存储数据
            observations.append(obs)
            actions.append(action)
            rewards.append(reward)
            dones.append(done)
            old_log_probs.append(log_prob)
            values.append(value)
            
            episode_reward += reward
            obs = next_obs
            
            if done:
                episode_rewards.append(episode_reward)
                obs, _ = env.reset()
                episode_reward = 0
        
        total_steps += num_env_steps
        
        # PPO更新
        losses = agent.update(
            observations,
            actions,
            rewards,
            dones,
            old_log_probs,
            values
        )
        
        # 打印训练进度
        if len(episode_rewards) > 0:
            avg_reward = np.mean(episode_rewards)
            print(f"Steps: {total_steps:8d} | "
                  f"Avg Reward: {avg_reward:7.2f} | "
                  f"Policy Loss: {losses['policy_loss']:7.3f} | "
                  f"Value Loss: {losses['value_loss']:7.3f} | "
                  f"Entropy: {losses['entropy']:6.3f}")
        
        # 保存模型(可选)
        # torch.save(agent.network.state_dict(), f'ppo_{env_name}_{total_steps}.pt')
    
    env.close()
    return agent
 
 
# 运行训练
if __name__ == '__main__':
    # 连续控制任务示例
    agent = train_ppo('HalfCheetah-v2', num_steps=500000)
    
    # 离散控制任务示例(取消注释即可)
    # agent = train_ppo('CartPole-v1', num_steps=500000)

九、PPO调参实战:经验总结

9.1 核心超参数推荐

参数推荐范围调整建议
学习率3e-4(连续)、3e-4(离散)可以用线性衰减,从3e-4慢慢降到1e-4
Clip epsilon0.1 - 0.3(默认0.2)连续动作空间常用0.2,离散可用0.3
GAE lambda0.9 - 0.99(默认0.95)越大越平滑,越小越激进
PPO epochs10 - 30(默认10)数据量大可以多几个epoch
Mini-batch size32 - 256(默认64)连续任务用大点,离散任务用小点
价值系数0.5 - 1.0(默认0.5)价值估计不准时增大
熵系数0 - 0.01(默认0)鼓励探索时加一点,比如0.01
梯度裁剪0.5(连续)、1.0(离散)防止梯度爆炸
环境数量1 - 16根据CPU核心数调整

9.2 调参技巧

1. 先用默认参数跑通整个流程

不要一上来就调参,先确保代码能跑起来,能收敛。哪怕收敛慢一点,也比跑不起来强。

2. 观察这些关键指标

# 如果看到这些情况,说明有问题
if ratio.mean() > 2.0:
    print("⚠️ 策略更新过大,可能不稳定")
    
if entropy < 0.1:
    print("⚠️ 熵太小,策略可能已经收敛或崩溃")
    
if kl_divergence > 0.1:
    print("⚠️ KL散度太大,考虑减小学习率")
    
if value_loss > 100:
    print("⚠️ 价值损失太大,考虑调整价值系数")

3. 常见问题排查

问题可能原因解决方案
训练崩溃(reward突然变0)策略更新过大减小学习率,减小clip epsilon
训练不收敛探索不足或学习率不对增加熵系数,减小学习率
方差大,曲线很抖batch太小或GAE lambda太小增加batch size,增大GAE lambda
价值估计不准价值网络太弱或价值系数太小增大价值网络,增加价值系数

9.3 不同任务的参数调整

连续控制任务(机器人、无人机)

PPO(
    hidden_dim=128,      # 动作复杂,需要更大的网络
    clip_epsilon=0.2,   # 连续空间常用0.2
    entropy_coef=0.0,    # 一般不需要额外的熵正则化
    batch_size=256,      # 连续任务可以用更大的batch
)

离散控制任务(Atari游戏、棋类)

PPO(
    hidden_dim=64,
    clip_epsilon=0.1,    # 离散空间可以用更小的clip
    entropy_coef=0.01,   # 鼓励探索
    batch_size=32,      # 离散任务batch可以小一点
)

十、实战:用PPO训练你的第一个AI

10.1 任务1:训练CartPole站立

CartPole是强化学习入门的”Hello World”——让杆子保持平衡不倒。

import gym
 
def train_cartpole():
    from PPO import train_ppo
    
    agent = train_ppo(
        env_name='CartPole-v1',
        num_steps=100000,
        num_env_steps=2048
    )
    
    # 测试
    env = gym.make('CartPole-v1', render_mode='human')
    obs, _ = env.reset()
    
    for _ in range(1000):
        action, _, _ = agent.select_action(obs, deterministic=True)
        obs, _, terminated, truncated, _ = env.step(action)
        
        if terminated or truncated:
            obs, _ = env.reset()
    
    env.close()
 
train_cartpole()

10.2 任务2:训练机械臂抓取(需要MuJoCo)

def train_reacher():
    from PPO import train_ppo
    
    # Reacher环境:控制机械臂到达目标位置
    agent = train_ppo(
        env_name='Reacher-v2',
        num_steps=1000000,
        num_env_steps=2048
    )
 
train_reacher()

10.3 任务3:多环境并行训练

from stable_baselines3.common.vec_env import SubprocVecEnv, DummyVecEnv
import gym
 
def make_env(env_id):
    """创建单个环境"""
    def _init():
        env = gym.make(env_id)
        return env
    return _init
 
def train_multi_env():
    """多环境并行训练"""
    n_envs = 8
    env = SubprocVecEnv([make_env('HalfCheetah-v2') for _ in range(n_envs)])
    
    # 观察空间和动作空间
    from gym import spaces
    
    obs_dim = env.observation_space.shape[1]  # 子环境的观测维度
    act_dim = env.action_space.shape[1]       # 子环境的动作维度
    
    # 创建PPO agent(代码同上)
    agent = PPO(obs_dim, act_dim, ...)
    
    # 训练循环(需要改成支持向量化环境)
    # ...

10.4 快速验证PPO是否有效

def quick_test():
    """快速测试:CartPole应该在100步内收敛到接近500"""
    agent = train_ppo('CartPole-v1', num_steps=50000)
    
    # 评估
    env = gym.make('CartPole-v1')
    obs, _ = env.reset()
    
    total_reward = 0
    for _ in range(500):
        action, _, _ = agent.select_action(obs, deterministic=True)
        obs, reward, terminated, truncated, _ = env.step(action)
        total_reward += reward
        
        if terminated or truncated:
            break
    
    print(f"测试 Reward: {total_reward}")
    env.close()
 
quick_test()

十一、总结

11.1 PPO的核心要点

  1. 策略梯度 + Actor-Critic:PPO是策略梯度算法,用Actor选择动作,用Critic估计价值
  2. Clip机制:通过裁剪策略比值,防止策略更新过大,保证训练稳定
  3. GAE:广义优势估计平衡偏差和方差,提供更稳定的学习信号
  4. 一阶优化:相比TRPO的二阶优化,PPO实现更简单,效率更高

11.2 PPO vs 其他算法的选择

  • 简单离散任务:先试试Q学习或DQN
  • 连续控制/复杂任务:PPO是首选
  • 需要充分探索:试试SAC
  • 需要精确控制:试试TD3

11.3 实战建议

  1. 从简单的环境开始:CartPole、MountainCar这种,能快速验证想法
  2. 用Stable Baselines3起步:先理解PPO怎么用,再考虑自己实现
  3. 监控关键指标:KL散度、熵、价值损失,这些能帮你诊断问题
  4. 耐心等待:强化学习训练慢是正常的,不要一看到曲线波动就觉得有问题

参考文献

  1. Schulman, J., Levine, S., Abbeel, P., Jordan, M., & Moritz, P. (2015). Trust region policy optimization. ICML, 1889-1897.
  2. Schulman, J., Wolski, F., Dhariwal, P., Radford, A., & Klimov, O. (2017). Proximal policy optimization algorithms. arXiv:1707.06347.
  3. Engstrom, L., et al. (2020). Implementation matters in RL: A case study on PPO. ICLR 2020.
  4. Andrychowicz, M., et al. (2020). What matters in on-policy reinforcement learning? A large-scale empirical study. arXiv:2006.05990.
  5. Raffin, A., et al. (2021). SB3: Stable Baselines3. JMLR.

PPO以其简洁的实现和稳定的性能,成为当前强化学习研究和应用的首选算法。