策略梯度方法详解

关键词速览

核心概念策略梯度定理基线方差缩减Actor-Critic
REINFORCEA3C优势函数策略参数化软更新

核心关键词表

术语英文符号/技术说明
策略梯度Policy Gradient策略参数的梯度方向
策略梯度定理PGT策略梯度的理论基础
REINFORCEREINFORCEMonte Carlo PG基于回报的策略梯度估计
基线Baseline缩减方差但不引入偏差
优势函数Advantage相对基线的优势
Actor-CriticActor-CriticAC结合值函数和策略
A3CAsynchronous ACA3C异步并行Actor-Critic
策略参数化Policy Param策略的函数表示
价值基线Value Baseline状态价值函数作为基线
回报估计Return累积折扣回报

一、策略梯度入门:为什么直接从策略入手?

1.1 两种不同的思路

说起强化学习,你可能首先想到的是Q学习、DQN这些名字。它们的套路是:先学习一个”动作价值函数”Q(s,a),告诉我每个状态-动作对有多好,然后用这个Q值来决定在某个状态下该选哪个动作。说白了,就是先”评估”再”决策”。

但策略梯度走的是另一条路。它根本不关心Q值是多少,直接端到端地学习一个策略函数 :看到状态s,输出该选什么动作a。这个策略就是一个神经网络,输入是状态,输出是动作(或者动作的概率分布)。

打个比方的话,Q学习就像是一个经验丰富的教练,他在脑子里记着”在这种局面下,你走这一步,历史平均胜率是68%“,然后每次都挑胜率最高的走。而策略梯度更像是直接训练你的肌肉记忆——看到一个局面,手就自动动了,根本不需要中间那个”胜率评估”环节。

1.2 策略梯度解决了什么问题?

你可能会问,既然Q学习已经能work了,干嘛还要学策略梯度?这就要说到Q学习的几个痛点了。

第一个痛点是连续动作空间。 想象你要控制一个机器人的关节角度,关节角度是一个连续值,从0度到180度之间的任何角度都有可能。Q学习怎么处理这个问题?要么把角度离散化成”0度、30度、60度…”这种离散的档位,但这太粗糙了,丢失了很多细节;要么用一些技巧比如NAF(归一化动作场)来处理,但这些方法要么粗糙要么复杂。

策略梯度就不一样了,它输出的是动作的均值和方差,想输出什么连续值都行,完美解决。

第二个痛点是策略的随机性。 Q学习学出来的策略通常是确定性的——给定状态s,每次都选同一个动作。但在真实环境里,有时候我们需要保持一定的随机性。比如在石头剪刀布游戏里,确定性策略是必输的,只有随机策略才能达到纳什均衡。Q学习想学随机策略挺费劲的,策略梯度就很自然地能学到 这个概率分布。

第三个痛点是探索-利用的平衡。 Q学习通常需要额外的技巧(比如ε-greedy)来保证探索。但策略梯度因为策略本身就是随机的,每一次选择动作都是从分布里采样,天生就带了探索属性。

1.3 策略怎么表示?

策略梯度里的策略 是一个参数化的函数,参数是 。具体怎么表示取决于动作空间是离散的还是连续的。

对于离散动作空间,最常见的是用softmax输出每个动作的概率:

这里的 可以是一个神经网络,输入状态s和动作a,输出一个”分数”,然后用softmax把这些分数转成概率。

对于连续动作空间,通常假设动作服从高斯分布:

也就是说,给定状态s,策略输出动作的均值 和方差 ,然后从正态分布里采样得到实际动作。这样做的好处是,神经网络只需要学习均值和方差,采样是自动完成的。

下面是一个用PyTorch实现高斯策略的完整代码:

import torch
import torch.nn as nn
 
class GaussianPolicy(nn.Module):
    """连续动作空间的高斯策略网络。"""
    
    def __init__(self, obs_dim, act_dim, hidden_dim=256):
        super().__init__()
        
        # 共享的特征提取层
        self.actor = nn.Sequential(
            nn.Linear(obs_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU()
        )
        
        # 均值输出头
        self.mean = nn.Linear(hidden_dim, act_dim)
        
        # 对数标准差输出头
        # 用log_std而不是直接输出std,这样可以让std永远是正的
        self.log_std = nn.Linear(hidden_dim, act_dim)
    
    def forward(self, obs):
        """前向传播,返回均值和对数标准差。"""
        features = self.actor(obs)
        mean = self.mean(features)
        
        # clamp限制范围,防止数值爆炸
        log_std = self.log_std(features).clamp(-20, 2)
        std = log_std.exp()
        
        return mean, std
    
    def sample(self, obs, deterministic=False):
        """
        从策略中采样动作。
        
        Args:
            obs: 观测张量
            deterministic: 是否使用确定性策略(直接返回均值)
        
        Returns:
            action: 采样的动作
            log_prob: 动作的对数概率(用于策略梯度计算)
        """
        mean, std = self.forward(obs)
        
        if deterministic:
            return torch.tanh(mean), None
        
        # 创建正态分布
        dist = torch.distributions.Normal(mean, std)
        
        # 重参数化采样:让采样过程可导
        x_t = dist.rsample()
        
        # 用tanh压缩到[-1, 1]范围
        action = torch.tanh(x_t)
        
        # 计算修正的对数概率(因为用了tanh压缩)
        # 这个公式是从unbounded正态分布到[-1,1]压缩的概率修正
        log_prob = dist.log_prob(x_t)
        log_prob -= torch.log(1 - action.pow(2) + 1e-6)
        log_prob = log_prob.sum(dim=-1, keepdim=True)
        
        return action, log_prob

这段代码里有几个值得注意的地方。一个是用了 rsample() 而不是 sample(),这是重参数化技巧——我们把随机性用到一个独立的噪声变量上,这样梯度就能流过来了。另一个是用tanh把输出压缩到[-1,1]范围,同时需要对数概率做个修正。


二、策略梯度定理:为什么梯度是指南针?

2.1 我们要优化的目标

在策略梯度里,我们的目标是找到一组参数 ,让策略 获得的期望回报最大化。这个期望回报定义为:

这里的 是由策略 生成的一条轨迹, 是这条轨迹的累计回报, 是折扣因子。

换句话说, 就是:如果你用当前的策略 去和环境交互,得到的平均回报是多少?我们当然希望这个平均值越高越好。

现在问题来了:怎么调整 变大?

2.2 梯度!梯度!

在数学上,函数增长最快的方向就是它的梯度方向。所以如果我们能算出 ,就知道应该往哪个方向调整参数 能让目标函数变大。然后沿着这个方向走一小步,再重新计算梯度,再走一小步……这就是梯度上升(或者如果我们要最小化,就是梯度下降)。

问题是, 是一个关于 的期望,而 本身又依赖于 ,这让直接求导变得很麻烦。

2.3 策略梯度定理登场

这个时候,策略梯度定理就出场了。它告诉我们一个巧妙的等式:

或者更常用的状态-动作形式:

这个定理的证明基于一个很聪明的恒等式:

你可能会问,这个恒等式哪来的?其实很简单:对 求导,得到 ,移项就得到了。

2.4 直观理解:为什么这个公式是对的?

这个公式的直觉是这样的: 指向”如何调整参数让 变大”的方向。说人话就是:看到这个梯度,就知道往哪个方向改参数能让选择这个动作的概率提高。

则是”裁判”,它告诉我们在状态s下选动作a到底好不好。如果Q值高,说明这个动作是个明智的选择,值得增加它的概率;如果Q值低,说明这个动作不怎么样,应该降低它的概率。

两者相乘,就是:找到那些”概率应该增加”的动作(由对数梯度指示),但只对那些”确实带来高回报”的动作做增强(由Q值加权)。

这就像你玩超级玛丽,记录下每一局游戏的轨迹和最终得分。游戏结束后,你说”这一把我死在了第三个坑,那次跳跃应该少用力”,或者说”这一把我成功吃到了星星然后过了那一关,那个决策是对的,下次还这么做”——策略梯度就是在做类似的事情。


三、REINFORCE算法:最简单的策略梯度

3.1 Monte Carlo的方法

REINFORCE是最基础的策略梯度算法,1992年由Ronald Williams提出。它的核心思想是:用实际采样的轨迹来估计期望回报。

具体来说,策略梯度定理里的 我们通常不知道,因为它需要知道在策略 下从状态s出发选动作a的期望回报。但我们可以用蒙特卡洛方法,用实际采样的回报来近似:

这里的 是第i条轨迹里从时刻t开始的累计折扣回报:

因为用的是实际的轨迹回报,所以这是无偏估计——如果采样足够多,梯度估计的平均值会收敛到真实的策略梯度。

3.2 REINFORCE的完整实现

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
 
class REINFORCE:
    """REINFORCE算法,带可选的基线。"""
    
    def __init__(self, obs_dim, act_dim, hidden_dim=128, 
                 lr=3e-4, gamma=0.99, device='cuda'):
        
        self.gamma = gamma
        self.device = device
        
        # 策略网络
        self.policy = GaussianPolicy(obs_dim, act_dim, hidden_dim).to(device)
        self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)
        
        # 价值网络(用作基线,减少方差)
        self.value_net = nn.Sequential(
            nn.Linear(obs_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        ).to(device)
        self.value_optimizer = optim.Adam(self.value_net.parameters(), lr=lr)
    
    def select_action(self, obs, deterministic=False):
        """
        根据当前策略选择动作。
        
        Args:
            obs: 环境观测(numpy数组)
            deterministic: 如果为True,返回均值(用于评估)
        
        Returns:
            action: 选择的动作(numpy数组)
        """
        obs_tensor = torch.FloatTensor(obs).to(self.device).unsqueeze(0)
        
        with torch.no_grad():
            mean, std = self.policy(obs_tensor)
            if deterministic:
                action = mean
            else:
                dist = torch.distributions.Normal(mean, std)
                action = dist.sample()
            action = torch.tanh(action)
        
        return action.cpu().numpy()[0]
    
    def compute_returns(self, rewards, dones):
        """
        计算折扣回报。
        
        Args:
            rewards: 奖励列表
            dones: done标志列表
        
        Returns:
            returns: 折扣回报张量
        """
        returns = []
        discounted = 0
        
        # 从后往前计算
        for reward, done in zip(reversed(rewards), reversed(dones)):
            discounted = reward + self.gamma * discounted * (1 - done)
            returns.insert(0, discounted)
        
        return torch.FloatTensor(returns)
    
    def update(self, observations, actions, rewards, dones):
        """
        执行一次策略梯度更新。
        
        Args:
            observations: 观测列表
            actions: 动作列表
            rewards: 奖励列表
            dones: done标志列表
        
        Returns:
            policy_loss: 策略损失
            value_loss: 价值损失
        """
        # 转换为张量
        obs_tensor = torch.FloatTensor(np.array(observations)).to(self.device)
        actions_tensor = torch.FloatTensor(np.array(actions)).to(self.device)
        rewards_tensor = torch.FloatTensor(np.array(rewards)).to(self.device)
        dones_tensor = torch.FloatTensor(np.array(dones)).to(self.device)
        
        # 计算折扣回报
        returns = self.compute_returns(rewards, dones).to(self.device)
        
        # 计算优势函数(回报减去基线)
        with torch.no_grad():
            values = self.value_net(obs_tensor).squeeze()
            advantages = returns - values
        
        # 策略梯度更新
        _, log_probs = self.policy.sample(obs_tensor)
        
        # 策略损失 = -log_prob * advantage
        # 负号是因为我们想最大化回报,但PyTorch默认最小化损失
        policy_loss = -(log_probs.squeeze() * advantages.detach()).mean()
        
        self.optimizer.zero_grad()
        policy_loss.backward()
        self.optimizer.step()
        
        # 价值网络更新(最小化价值误差)
        value_loss = nn.MSELoss()(values, returns.detach())
        
        self.value_optimizer.zero_grad()
        value_loss.backward()
        self.value_optimizer.step()
        
        return policy_loss.item(), value_loss.item()
 
 
def collect_trajectory(env, agent, max_steps=1000):
    """
    用当前策略收集一条轨迹。
    
    Returns:
        observations, actions, rewards, dones: 轨迹数据
    """
    obs = env.reset()
    observations, actions, rewards, dones = [], [], [], []
    
    for step in range(max_steps):
        action = agent.select_action(obs)
        next_obs, reward, done, _ = env.step(action)
        
        observations.append(obs)
        actions.append(action)
        rewards.append(reward)
        dones.append(done)
        
        obs = next_obs
        if done:
            break
    
    return observations, actions, rewards, dones
 
 
def train_reinforce(env, agent, num_episodes=1000, max_steps=1000, print_every=10):
    """
    训练REINFORCE智能体。
    """
    for episode in range(num_episodes):
        # 收集一条轨迹
        obs, actions, rewards, dones = collect_trajectory(env, agent, max_steps)
        
        # 更新策略
        policy_loss, value_loss = agent.update(obs, actions, rewards, dones)
        
        if (episode + 1) % print_every == 0:
            total_reward = sum(rewards)
            print(f"Episode {episode+1}/{num_episodes} | "
                  f"Reward: {total_reward:.2f} | "
                  f"Policy Loss: {policy_loss:.4f} | "
                  f"Value Loss: {value_loss:.4f}")

3.3 REINFORCE的问题

REINFORCE简单直观,但它有个致命的弱点:方差太高了

为什么方差高?因为累计回报 是从时刻t到游戏结束的所有奖励之和。如果游戏很长,或者奖励信号有噪声,这个值的波动就会很大。同一个状态s,可能因为后面发生的事情不同, 差别巨大。

高方差意味着梯度估计不稳定,训练收敛慢。你可能见过强化学习里那种”曲线像心电图一样”的训练过程,这就是方差在作怪。


四、基准函数Baseline:减少方差的简单技巧

4.1 什么是基线?

有一个特别巧妙的技巧可以降低方差,而且完全不会引入偏差。那就是引入一个只依赖于状态的函数 ,从优势函数里减去它:

这个技巧为什么有效?因为:

第二个等号成立是因为对数概率的梯度在概率分布上的期望为0,这是一个数学性质。换句话说,无论 是什么(只要它不依赖于动作a),从期望里减掉它都不会改变梯度。

4.2 怎么选基线?

虽然任意基线都不会引入偏差,但不同的基线降低方差的效果不一样。最优的基线是最小化方差的那个,而最优的基线恰好是状态价值函数 ——也就是”在状态s下,平均来说能获得多少回报”。

为什么是 ?直觉上, 衡量的是”选动作a比平均水平好多少”。如果某个动作的Q值和V值差不多,那它就是个中规中矩的选择,不值得特别增加或减少概率;如果Q值远高于V值,那它是个惊喜,应该提高概率。

数学上可以证明, 是最小化方差的最优基线选择。

4.3 优势函数

当我们用 作为基线时, 有一个专门的名字,叫优势函数,记作

优势函数的直观含义是:在状态s下选动作a,相对于”随便选个动作”的平均水平,是更好还是更差。

  • :选a比平均好,值得增加概率
  • :选a比平均差,应该减少概率
  • :选a就是平均水平

这就是为什么用优势函数比用原始回报更好——它去掉了与动作选择无关的环境因素造成的方差。

4.4 GAE:偏差-方差的权衡

在实际中, 我们都不知道,需要用函数近似来估计。这就引出了广义优势估计(GAE),它提供了一种可以调节偏差-方差权衡的方法。

def compute_gae(rewards, values, dones, gamma=0.99, lam=0.95):
    """
    广义优势估计(GAE)。
    
    Args:
        rewards: 奖励列表
        values: 价值估计列表
        dones: done标志列表
        gamma: 折扣因子
        lam: GAE参数,控制偏差-方差权衡
    
    Returns:
        advantages: 优势估计
    """
    advantages = []
    gae = 0
    
    # 从后往前计算
    for t in reversed(range(len(rewards))):
        if t == len(rewards) - 1:
            # 最后一个状态,下一个价值是0(因为没有后续状态了)
            next_value = 0
        else:
            next_value = values[t + 1]
        
        # TD误差
        delta = rewards[t] + gamma * next_value * (1 - dones[t]) - values[t]
        
        # GAE累加
        gae = delta + gamma * lam * (1 - dones[t]) * gae
        advantages.insert(0, gae)
    
    return torch.FloatTensor(advantages)

GAE的参数 控制了偏差和方差的权衡:

  • :只考虑一步TD误差,,方差最低但偏差较大
  • :相当于蒙特卡洛回报,无偏但方差最大

通常 是比较常见的选择,在偏差和方差之间取得好的平衡。


五、Actor-Critic:策略与价值的联姻

5.1 为什么需要Actor-Critic?

REINFORCE用蒙特卡洛方法估计回报,优点是无偏,缺点是方差大。如果我们知道真实的价值函数 ,就可以用它来代替采样回报,大幅降低方差。

但问题是,真实的 我们并不知道。

Actor-Critic的思路是:同时学习两个东西

  1. Actor(策略):负责选择动作,根据策略梯度更新
  2. Critic(评论家):负责估计价值函数,帮Actor降低方差

两全其美。

5.2 Actor-Critic的算法框架

class ActorCritic:
    """
    Actor-Critic算法框架。
    Actor负责策略,Critic负责价值估计。
    """
    
    def __init__(self, obs_dim, act_dim, hidden_dim=256, 
                 pi_lr=3e-4, vf_lr=1e-3, gamma=0.99, lam=0.95):
        
        self.gamma = gamma
        self.lam = lam
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        
        # Actor(策略网络)
        self.actor = GaussianPolicy(obs_dim, act_dim, hidden_dim).to(self.device)
        self.pi_optimizer = optim.Adam(self.actor.parameters(), lr=pi_lr)
        
        # Critic(价值网络)
        self.critic = nn.Sequential(
            nn.Linear(obs_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        ).to(self.device)
        self.vf_optimizer = optim.Adam(self.critic.parameters(), lr=vf_lr)
    
    def get_action(self, obs, deterministic=False):
        """选择动作。"""
        obs_tensor = torch.FloatTensor(obs).to(self.device).unsqueeze(0)
        action, _ = self.actor.sample(obs_tensor, deterministic)
        return action.cpu().numpy()[0]
    
    def compute_value(self, obs):
        """估计状态价值。"""
        obs_tensor = torch.FloatTensor(obs).to(self.device).unsqueeze(0)
        return self.critic(obs_tensor).item()
    
    def update(self, observations, actions, rewards, dones, next_observations):
        """
        执行一次Actor-Critic更新。
        """
        obs_tensor = torch.FloatTensor(np.array(observations)).to(self.device)
        actions_tensor = torch.FloatTensor(np.array(actions)).to(self.device)
        rewards_tensor = torch.FloatTensor(np.array(rewards)).to(self.device)
        dones_tensor = torch.FloatTensor(np.array(dones)).to(self.device)
        next_obs_tensor = torch.FloatTensor(np.array(next_observations)).to(self.device)
        
        # 1. Critic更新:用TD(0)目标更新价值估计
        with torch.no_grad():
            values = self.critic(obs_tensor).squeeze()
            next_values = self.critic(next_obs_tensor).squeeze()
            
            # TD目标
            td_targets = rewards_tensor + self.gamma * next_values * (1 - dones_tensor)
        
        # 价值损失
        values_pred = self.critic(obs_tensor).squeeze()
        vf_loss = nn.MSELoss()(values_pred, td_targets)
        
        self.vf_optimizer.zero_grad()
        vf_loss.backward()
        self.vf_optimizer.step()
        
        # 2. Actor更新:用优势函数更新策略
        with torch.no_grad():
            advantages = td_targets - values_pred.detach()
        
        _, log_probs = self.actor.sample(obs_tensor)
        
        # 策略损失
        pi_loss = -(log_probs.squeeze() * advantages.detach()).mean()
        
        self.pi_optimizer.zero_grad()
        pi_loss.backward()
        self.pi_optimizer.step()
        
        return pi_loss.item(), vf_loss.item()

5.3 三种方法的对比

特性REINFORCE值函数方法(Q学习)Actor-Critic
策略类型随机确定性随机/确定性均可
价值估计蒙特卡洛(高方差)TD学习(低方差)混合(可控方差)
动作空间连续/离散离散为主连续/离散均可
收敛性一般较好
样本效率中等

Actor-Critic结合了两边的优点:既有策略梯度处理连续动作的能力,又有值函数方法的低方差。


六、A2C与A3C:并行化加速训练

6.1 A2C:同步优势Actor-Critic

A2C(Advantage Actor-Critic)是Actor-Critic的标准化版本,主要改进是:

  1. 使用优势函数代替原始回报
  2. 引入熵正则化鼓励探索
class A2C:
    """同步优势Actor-Critic。"""
    
    def __init__(self, obs_dim, act_dim, hidden_dim=256,
                 pi_lr=3e-4, vf_lr=1e-3, gamma=0.99, lam=0.95,
                 ent_coef=0.01):
        
        self.gamma = gamma
        self.lam = lam
        self.ent_coef = ent_coef
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        
        # 策略网络
        self.actor = GaussianPolicy(obs_dim, act_dim, hidden_dim).to(self.device)
        self.pi_optimizer = optim.Adam(self.actor.parameters(), lr=pi_lr)
        
        # 价值网络
        self.critic = ValueNetwork(obs_dim, hidden_dim).to(self.device)
        self.vf_optimizer = optim.Adam(self.critic.parameters(), lr=vf_lr)
    
    def compute_loss(self, obs_batch, actions, advantages, returns):
        """计算总损失(策略损失 + 价值损失 + 熵奖励)。"""
        # 策略损失
        _, log_probs = self.actor.sample(obs_batch)
        pi_loss = -(log_probs * advantages.detach()).mean()
        
        # 价值损失
        values_pred = self.critic(obs_batch).squeeze()
        vf_loss = nn.MSELoss()(values_pred, returns.detach())
        
        # 熵损失(熵越高越好,所以是减)
        with torch.no_grad():
            _, log_probs = self.actor.sample(obs_batch)
            entropy = -(log_probs.exp() * log_probs).mean()
        
        total_loss = pi_loss + 0.5 * vf_loss - self.ent_coef * entropy
        
        return total_loss, pi_loss, vf_loss, entropy

6.2 A3C:异步并行Actor-Critic

A3C(Asynchronous Advantage Actor-Critic)由DeepMind的Mnih等人2016年提出,是深度强化学习的里程碑算法。

它的核心思想是:多线程并行探索不同的轨迹,然后把学到的东西汇总到全局网络上。

import threading
import multiprocessing as mp
from queue import Queue
 
class A3C:
    """
    异步优势Actor-Critic。
    使用多线程并行收集样本,加速训练。
    """
    
    def __init__(self, obs_dim, act_dim, num_workers=4,
                 lr=3e-4, gamma=0.99, lam=0.95, ent_coef=0.01,
                 max_steps=20):
        
        self.num_workers = num_workers
        self.gamma = gamma
        self.lam = lam
        self.ent_coef = ent_coef
        self.max_steps = max_steps
        
        # 全局网络
        self.global_actor = GaussianPolicy(obs_dim, act_dim).to('cpu')
        self.global_critic = ValueNetwork(obs_dim).to('cpu')
        self.global_optimizer = optim.Adam(
            list(self.global_actor.parameters()) + list(self.global_critic.parameters()),
            lr=lr
        )
        
        # 共享内存
        self.global_actor.share_memory()
        self.global_critic.share_memory()
        
        # 全局训练步数
        self.global_step = mp.Value('i', 0)
    
    def pull_from_global(self, local_actor, local_critic):
        """worker从全局网络拉取参数。"""
        local_actor.load_state_dict(self.global_actor.state_dict())
        local_critic.load_state_dict(self.global_critic.state_dict())
    
    def push_to_global(self, actor_grads, critic_grads):
        """worker推送梯度到全局网络。"""
        for param, grad in zip(self.global_actor.parameters(), actor_grads):
            if grad is not None:
                param._grad = grad
        for param, grad in zip(self.global_critic.parameters(), critic_grads):
            if grad is not None:
                param._grad = grad
        self.global_optimizer.step()
    
    def worker(self, worker_id, env_name, result_queue):
        """单个worker线程的工作流程。"""
        # 创建本地网络
        local_actor = GaussianPolicy(self.obs_dim, self.act_dim).to('cpu')
        local_critic = ValueNetwork(self.obs_dim).to('cpu')
        
        env = gym.make(env_name)
        
        while self.global_step.value < 50000:
            # 1. 同步全局参数
            self.pull_from_global(local_actor, local_critic)
            
            # 2. 收集样本
            obs = env.reset()
            episode_reward = 0
            
            obs_buffer, actions_buffer, rewards_buffer = [], [], []
            values_buffer, dones_buffer = [], []
            
            for step in range(self.max_steps):
                action, log_prob = local_actor.sample(
                    torch.FloatTensor(obs).unsqueeze(0)
                )
                action = action.numpy()[0]
                
                next_obs, reward, done, _ = env.step(action)
                
                obs_buffer.append(obs)
                actions_buffer.append(action)
                rewards_buffer.append(reward)
                values_buffer.append(local_critic(torch.FloatTensor(obs).unsqueeze(0)).item())
                dones_buffer.append(done)
                
                episode_reward += reward
                obs = next_obs
                
                if done:
                    break
            
            # 3. 计算GAE优势
            if done:
                next_value = 0
            else:
                next_value = local_critic(torch.FloatTensor(next_obs).unsqueeze(0)).item()
            
            advantages, returns = compute_gae_torch(
                rewards_buffer, values_buffer, dones_buffer,
                next_value, self.gamma, self.lam
            )
            
            # 4. 计算损失并反向传播
            obs_batch = torch.FloatTensor(np.array(obs_buffer))
            actions_batch = torch.FloatTensor(np.array(actions_buffer))
            
            _, log_probs = local_actor.sample(obs_batch)
            values_pred = local_critic(obs_batch).squeeze()
            
            pi_loss = -(log_probs * advantages).mean()
            vf_loss = nn.MSELoss()(values_pred, returns)
            entropy = -(log_probs.exp() * log_probs).mean()
            
            total_loss = pi_loss + 0.5 * vf_loss - self.ent_coef * entropy
            
            # 本地反向传播
            total_loss.backward()
            
            # 5. 推送梯度到全局
            actor_grads = [p.grad for p in local_actor.parameters()]
            critic_grads = [p.grad for p in local_critic.parameters()]
            
            self.push_to_global(actor_grads, critic_grads)
            
            with self.global_step.get_lock():
                self.global_step.value += 1
            
            if done:
                result_queue.put((worker_id, episode_reward))
    
    def train(self, env_name):
        """启动多线程训练。"""
        result_queue = Queue()
        threads = []
        
        for worker_id in range(self.num_workers):
            t = threading.Thread(
                target=self.worker,
                args=(worker_id, env_name, result_queue)
            )
            t.start()
            threads.append(t)
        
        # 收集结果
        for t in threads:
            t.join()

6.3 A3C为什么有效?

A3C的成功来自于几个因素:

  1. 探索多样性:多个线程同时探索,它们遇到的状态-动作对分布更广,学到的策略泛化性更好
  2. 去相关样本:异步收集的样本自然地去掉了时间相关性,这对神经网络训练很重要
  3. 计算并行:充分利用多核CPU,加速训练过程
  4. 噪声梯度:异步更新带来的”噪声梯度”反而有正则化效果,帮助跳出局部最优

七、策略梯度 vs Q学习:什么时候用哪种?

7.1 连续动作空间 vs 离散动作空间

这是最直接的选择依据:

  • 连续动作空间(机器人控制、物理模拟):必须用策略梯度。Q学习在这种场景下要么需要离散化(丢失精度),要么需要复杂的处理(如NAF),都不如策略梯度直接。
  • 离散动作空间(游戏、推荐系统):Q学习和策略梯度都可以。但Q学习通常更简单、更样本高效。

7.2 随机策略 vs 确定性策略

如果任务需要随机策略(比如博弈、不完全信息游戏),策略梯度是更自然的选择。Q学习想学随机策略需要额外技巧。

7.3 收敛性考虑

策略梯度在理论上有更好的收敛保证——它的优化目标是凸的(或近似凸的),而Q学习的优化问题更复杂,可能出现振荡。

7.4 实用建议

场景推荐方法
连续控制(机器人)PPO、SAC(策略梯度变体)
离散动作游戏DQN、Rainbow
需要随机策略策略梯度、A2C
样本有限DQN(样本效率高)
收敛困难PPO(更稳定)

八、连续动作空间:策略梯度解决了Q学习的什么问题?

8.1 Q学习处理连续动作的困境

假设你要控制一个机械臂,关节角度可以是0到180度之间的任意实数。用Q学习的话,你需要学习 ,但这里的动作 是连续的。

困境一:无法枚举所有动作。在离散动作空间,Q学习遍历所有可能的动作取最大值。但在连续空间,你根本没法遍历所有动作。

困境二:argmax没有闭式解。即使你想找最优动作,你需要解决 。对于一般的Q函数,这个优化问题没有解析解,而且可能非凸。

已有的解决方案包括:

  • DDPG:用神经网络同时学习策略和Q函数,但策略是确定性的
  • NAF:把Q函数约束成特殊的二次形式,让argmax有解析解
  • 交叉熵方法:用采样和拟合来近似argmax

这些方法都有这样那样的限制,而且往往只能学到确定性策略。

8.2 策略梯度的天然优势

策略梯度完全绕过了这个问题。它根本不需要做那个麻烦的argmax。策略网络 直接输出动作的分布,从这个分布里采样就行了。

也就是说,Q学习是把”选择最好的动作”这个问题留给了在线优化,而策略梯度把这个问题转化成了”拟合一个好的概率分布”,这是神经网络擅长的事情。


九、策略熵正则化:为什么鼓励探索?

9.1 什么是熵?

在信息论里,熵 衡量的是概率分布的不确定性或随机性。分布越均匀,熵越大;分布越尖锐(集中在某个点),熵越小。

在策略梯度里,策略 的熵 衡量的是策略的随机程度。

9.2 为什么要鼓励高熵?

如果训练过程中策略变得过于确定(低熵),比如在某个状态下总是选同一个动作,会出现两个问题:

  1. 过早收敛到次优策略。智能体可能学到第一个看起来不错的策略,然后就停止探索其他可能性了。
  2. 无法应对环境变化。确定的策略在环境变化时适应性差。

所以我们可以在损失函数里加一项熵奖励:

其中 是熵系数,控制探索强度。减去熵(或者加上负熵),就是鼓励高熵的策略。

9.3 实践中怎么用?

def compute_policy_loss_with_entropy(obs, actions, advantages, actor, ent_coef=0.01):
    """
    带熵正则化的策略损失。
    """
    _, log_probs = actor.sample(obs)
    
    # 策略梯度损失
    policy_loss = -(log_probs * advantages.detach()).mean()
    
    # 熵(需要最大化,所以是减)
    entropy = -(log_probs.exp() * log_probs).mean()
    
    # 总损失
    total_loss = policy_loss - ent_coef * entropy
    
    return total_loss, entropy

实践中,熵系数通常设在 0.001 到 0.1 之间,具体取决于任务。


十、PPO与策略梯度:近端策略优化如何改进?

10.1 策略梯度的稳定性问题

普通策略梯度有个讨厌的问题:策略更新太大。每更新一次参数,策略可能变化剧烈,导致下次采样的分布和上次完全不同,之前的”经验”就失效了。

这叫策略崩溃训练不稳定

10.2 PPO的核心思想

PPO(Proximal Policy Optimization,近端策略优化)的核心是一个简单的约束:不要让策略更新得太远

PPO用了一个叫做”裁剪代理目标”的技巧:

其中 是新旧策略的概率比, 通常设为0.2。

当某个动作的优势为正(应该增加概率)但概率比太大(增加太多了),裁剪会阻止进一步的增加。反之亦然。

def ppo_loss(obs, actions, old_log_probs, advantages, actor, eps=0.2):
    """
    PPO裁剪损失。
    """
    _, new_log_probs = actor.sample(obs)
    
    # 概率比
    ratio = (new_log_probs - old_log_probs).exp()
    
    # 未裁剪的损失
    surr1 = ratio * advantages
    
    # 裁剪后的损失
    surr2 = torch.clamp(ratio, 1 - eps, 1 + eps) * advantages
    
    # 取较小的那个
    policy_loss = -torch.min(surr1, surr2).mean()
    
    return policy_loss

10.3 PPO vs 普通策略梯度

特性普通PGPPO
稳定性
超参敏感度
样本效率
实现复杂度
调参难度相对容易

PPO现在是连续控制任务的事实标准,因为它既稳定又高效。


十一、动手实验:用策略梯度训练软体机器人

11.1 实验环境

让我们用策略梯度训练一个简单的软体机器人(hopper)完成跳跃任务。这是MuJoCo环境套件里的经典任务。

import gym
import torch
import numpy as np
 
# 创建环境
env = gym.make('Hopper-v3')
 
# 获取状态和动作维度
obs_dim = env.observation_space.shape[0]
act_dim = env.action_space.shape[0]
 
print(f"状态维度: {obs_dim}, 动作维度: {act_dim}")
 
# 创建A2C智能体
agent = A2C(obs_dim, act_dim, hidden_dim=256, pi_lr=3e-4, vf_lr=1e-3)

11.2 训练循环

def train_continuous_control(env, agent, num_episodes=1000, 
                              rollout_steps=2048, update_epochs=10):
    """
    训练软体机器人。
    """
    episode_rewards = []
    
    for episode in range(num_episodes):
        # 收集 rollout
        obs_buffer, actions_buffer, rewards_buffer = [], [], []
        dones_buffer = []
        
        obs = env.reset()
        episode_reward = 0
        
        for step in range(rollout_steps):
            action = agent.get_action(obs)
            next_obs, reward, done, _ = env.step(action)
            
            obs_buffer.append(obs)
            actions_buffer.append(action)
            rewards_buffer.append(reward)
            dones_buffer.append(done)
            
            episode_reward += reward
            obs = next_obs
            
            if done:
                obs = env.reset()
                episode_rewards.append(episode_reward)
                episode_reward = 0
        
        # 准备下一个观测(用于TD计算)
        next_obs_buffer = obs_buffer[1:] + [obs]
        
        # 转换为数组
        obs_batch = np.array(obs_buffer)
        actions_batch = np.array(actions_buffer)
        rewards_batch = np.array(rewards_buffer)
        dones_batch = np.array(dones_buffer)
        next_obs_batch = np.array(next_obs_buffer)
        
        # 更新多次
        for _ in range(update_epochs):
            pi_loss, vf_loss = agent.update(
                obs_batch, actions_batch, rewards_batch,
                dones_batch, next_obs_batch
            )
        
        # 打印进度
        if (episode + 1) % 10 == 0:
            mean_reward = np.mean(episode_rewards[-10:])
            print(f"Episode {episode+1}/{num_episodes} | "
                  f"最近10轮平均奖励: {mean_reward:.2f} | "
                  f"PI Loss: {pi_loss:.4f} | "
                  f"VF Loss: {vf_loss:.4f}")
    
    return episode_rewards
 
# 开始训练
rewards = train_continuous_control(env, agent, num_episodes=500)

11.3 训练技巧

  1. 归一化观测。对观测做running mean和std归一化,能显著加速收敛。
  2. 价值归一化。对回报做归一化也有帮助。
  3. 梯度裁剪。防止梯度爆炸。
  4. 学习率调度。训练后期适当降低学习率。
  5. 早停。如果连续多轮没有提升,可以考虑早停或调整策略。
# 常用的训练技巧包装器
class NormalizedEnv:
    """观测归一化包装器。"""
    
    def __init__(self, env):
        self.env = env
        self.obs_mean = np.zeros(env.observation_space.shape)
        self.obs_var = np.ones(env.observation_space.shape)
        self.count = 1e-4
    
    def update_stats(self, obs):
        self.count += 1
        delta = obs - self.obs_mean
        self.obs_mean += delta / self.count
        delta2 = obs - self.obs_mean
        self.obs_var += delta * delta2
    
    def normalize(self, obs):
        return (obs - self.obs_mean) / np.sqrt(self.obs_var + 1e-8)

11.4 常见问题排查

问题原因解决方案
训练不收敛学习率太大/太小调整学习率,尝试3e-4
策略崩溃策略更新太大用PPO或减小更新幅度
价值估计不准Critic容量不够增加Critic隐藏层大小
熵衰减太快熵系数太小增加ent_coef到0.1
梯度爆炸奖励尺度不一致归一化奖励

十二、数学形式化总结

12.1 核心公式

策略梯度定理

优势函数

GAE(λ)

其中

12.2 算法演变图谱

策略梯度
├── REINFORCE(蒙特卡洛PG,高方差)
│   └── 引入基线 → 方差缩减
│       └── 用价值函数作为最优基线
│           └── Actor-Critic(低方差)
│               ├── A2C(同步,更新更稳定)
│               └── A3C(异步并行,加速训练)
│                   └── GAE(优势估计的改进)
│                       └── PPO(近端约束,防崩溃)

十三、相关文档

  • MDP与Bellman方程 — 强化学习的数学基础
  • Q学习 — 值函数方法的代表
  • DQN — 深度值函数方法
  • PPO — 策略梯度的稳定改进
  • SAC — 最大熵策略梯度

参考文献

  1. Sutton, R. S., & Barto, A. G. (2018). Reinforcement Learning: An Introduction (2nd ed.). MIT Press.
  2. Williams, R. J. (1992). Simple statistical gradient-following algorithms for connectionist reinforcement learning. Machine Learning, 8(3-4), 229-256.
  3. Mnih, V., et al. (2016). Asynchronous methods for deep reinforcement learning. ICML, 1928-1937.
  4. Schulman, J., Levine, S., Abbeel, P., Jordan, M., & Moritz, P. (2015). Trust region policy optimization. ICML, 1889-1897.
  5. Schulman, J., Wolski, F., Dhariwal, P., Radford, A., & Klimov, O. (2017). Proximal policy optimization algorithms. arXiv:1707.06347.
  6. Haarnoja, T., et al. (2018). Soft actor-critic: Off-policy maximum entropy deep reinforcement learning with a stochastic actor. ICML, 1861-1870.

策略梯度方法为连续控制问题提供了优雅的解决方案,是现代强化学习的重要支柱。从REINFORCE到PPO,这条技术路线不断演进,在稳定性、效率和泛化性上持续改进。掌握策略梯度的核心思想,你就能更好地理解强化学习的本质——在不确定环境中学习和决策。