策略梯度方法详解

关键词速览

核心概念:策略梯度定理、基线、方差缩减、Actor-Critic、REINFORCE、A3C、优势函数、策略参数化、软更新

核心关键词表

| 术语 | 英文 | 符号/技术 | 说明 |
|------|------|-----------|------|
| 策略梯度 | Policy Gradient | ∇θJ(θ) | 策略参数的梯度方向 |
| 策略梯度定理 | Policy Gradient Theorem | PGT | 策略梯度的理论基础 |
| REINFORCE | REINFORCE | Monte Carlo PG | 基于回报的策略梯度估计 |
| 基线 | Baseline | b(s) | 缩减方差但不引入偏差 |
| 优势函数 | Advantage | A(s,a) | 相对基线的优势 |
| Actor-Critic | Actor-Critic | AC | 结合值函数和策略 |
| A3C | Asynchronous AC | A3C | 异步并行Actor-Critic |
| 策略参数化 | Policy Param | πθ(a\|s) | 策略的函数表示 |
| 价值基线 | Value Baseline | V(s) | 状态价值函数作为基线 |
| 回报估计 | Return | G_t | 累积折扣回报 |

一、策略梯度方法概述

与基于值函数的方法(如Q学习DQN)不同,策略梯度方法直接对策略进行参数化优化。这种方法天然适合连续动作空间问题,且具有收敛性更好的理论保证。本章将系统介绍策略梯度方法的理论基础、核心算法和实践技巧。

1.1 为什么需要策略梯度?

策略梯度方法相较于值函数方法具有以下优势:

策略梯度 vs 值函数方法

策略梯度的优势:

  1. 连续动作空间:自然处理连续动作,无需离散化或argmax操作
  2. 随机策略:可以学习真正的随机策略,捕获环境不确定性
  3. 收敛性:在策略空间中优化,局部最优解即为策略的稳定点
  4. 探索性:策略的随机性天然提供探索,无需额外ε-greedy

值函数方法的局限:

  • 连续动作空间需要额外的连续动作处理机制(如NAF等)
  • Q值最大化的确定性策略难以处理多峰分布
  • 策略震荡可能导致训练不稳定

1.2 策略参数化

策略 $\pi_\theta(a|s)$ 由参数向量 $\theta$ 定义。对于离散动作,使用softmax函数:

$$\pi_\theta(a|s) = \frac{\exp(h_\theta(s,a))}{\sum_{a'} \exp(h_\theta(s,a'))}$$

对于连续动作,使用高斯策略:

$$\pi_\theta(a|s) = \frac{1}{\sqrt{2\pi}\,\sigma_\theta(s)} \exp\left(-\frac{(a-\mu_\theta(s))^2}{2\sigma_\theta(s)^2}\right)$$

神经网络参数化时,策略网络输出动作分布(如均值和方差):

import torch
import torch.nn as nn

class GaussianPolicy(nn.Module):
    """Continuous action Gaussian policy."""
    
    def __init__(self, obs_dim, act_dim, hidden_dim=256):
        super().__init__()
        
        self.actor = nn.Sequential(
            nn.Linear(obs_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU()
        )
        
        # 均值和标准差输出头
        self.mean = nn.Linear(hidden_dim, act_dim)
        self.log_std = nn.Linear(hidden_dim, act_dim)
    
    def forward(self, obs):
        features = self.actor(obs)
        mean = self.mean(features)
        log_std = self.log_std(features).clamp(-20, 2)
        std = log_std.exp()
        return mean, std
    
    def sample(self, obs):
        mean, std = self.forward(obs)
        normal = torch.distributions.Normal(mean, std)
        x_t = normal.rsample()  # 重参数化采样
        action = torch.tanh(x_t)  # Squash to [-1, 1]
        return action
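
对于离散动作空间,策略网络通常输出各动作的logits,再由softmax得到类别分布。下面是一个示意性草图(CategoricalPolicy 为本文假设的命名,并非上文已有实现):

class CategoricalPolicy(nn.Module):
    """Discrete action softmax policy(示例草图)."""
    
    def __init__(self, obs_dim, num_actions, hidden_dim=128):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(obs_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, num_actions)  # 输出每个动作的logit
        )
    
    def forward(self, obs):
        return self.net(obs)  # logits,softmax由Categorical内部完成
    
    def sample(self, obs):
        dist = torch.distributions.Categorical(logits=self.forward(obs))
        action = dist.sample()
        return action, dist.log_prob(action)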

二、策略梯度定理

策略梯度定理(Policy Gradient Theorem, PGT)是策略梯度方法的理论基石,由Sutton等人于1999年提出。

2.1 目标函数

策略梯度方法的目标是最大化期望累积回报:

$$J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta}\left[\sum_{t=0}^{T} \gamma^t r_t\right]$$

其中 $\tau = (s_0, a_0, s_1, a_1, \ldots)$ 是由策略 $\pi_\theta$ 生成的轨迹。

2.2 策略梯度定理

策略梯度定理的核心断言是:

$$\nabla_\theta J(\theta) = \mathbb{E}_{\pi_\theta}\left[\nabla_\theta \log \pi_\theta(a|s)\, Q^{\pi_\theta}(s,a)\right]$$

更精确地,对于状态-动作轨迹:

$$\nabla_\theta J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta}\left[\sum_{t=0}^{T} \nabla_\theta \log \pi_\theta(a_t|s_t)\, G_t\right]$$

策略梯度定理的直观理解

定理的核心洞察是:增加导致高回报的动作的概率,减少导致低回报的动作的概率。对数概率梯度 $\nabla_\theta \log \pi_\theta(a|s)$ 指示了“如何调整参数以增加 $\pi_\theta(a|s)$”,而 $Q^{\pi_\theta}(s,a)$(或回报 $G_t$)提供了“是否应该增加”的信号。

2.3 证明概要

策略梯度定理的证明基于概率图模型的梯度计算。关键步骤:

  1. 轨迹概率分解:$P(\tau;\theta) = p(s_0)\prod_{t} \pi_\theta(a_t|s_t)\, p(s_{t+1}|s_t, a_t)$,其中环境转移概率与 $\theta$ 无关
  2. 对数梯度恒等式:$\nabla_\theta P(\tau;\theta) = P(\tau;\theta)\, \nabla_\theta \log P(\tau;\theta)$
  3. 期望与梯度交换:$\nabla_\theta \mathbb{E}_{\tau}[R(\tau)] = \mathbb{E}_{\tau}\left[\nabla_\theta \log P(\tau;\theta)\, R(\tau)\right] = \mathbb{E}_{\tau}\left[\sum_{t} \nabla_\theta \log \pi_\theta(a_t|s_t)\, R(\tau)\right]$
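
下面用一个单状态、一维高斯策略的玩具问题数值验证上述定理:用 $\mathbb{E}[\nabla_\mu \log \pi_\mu(a)\, R(a)]$ 的蒙特卡洛估计逼近真实梯度(示例草图,奖励函数 R(a) = -(a-3)^2 为本文假设的设定):

# 玩具问题:a ~ N(mu, 1),R(a) = -(a - 3)^2,解析梯度 dJ/dmu = -2(mu - 3)
mu = 0.0
actions = torch.distributions.Normal(mu, 1.0).sample((100000,))
rewards = -(actions - 3.0) ** 2

# score function: d/dmu log N(a; mu, 1) = a - mu
score = actions - mu
grad_estimate = (score * rewards).mean()
print(grad_estimate.item())  # mu=0 时约等于解析值 6.0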

三、REINFORCE算法

REINFORCE是最基础的策略梯度算法,由Williams于1992年提出,基于蒙特卡洛回报估计。

3.1 算法推导

使用蒙特卡洛折扣回报 $G_t = \sum_{k=t}^{T} \gamma^{k-t} r_k$ 作为 $Q^{\pi_\theta}(s_t, a_t)$ 的无偏估计,得到REINFORCE的梯度估计:

$$\nabla_\theta J(\theta) \approx \sum_{t=0}^{T} \nabla_\theta \log \pi_\theta(a_t|s_t)\, G_t$$

3.2 REINFORCE完整实现

import torch.optim as optim

class REINFORCE:
    """REINFORCE algorithm with baseline."""
    
    def __init__(self, obs_dim, act_dim, hidden_dim=128, 
                 lr=3e-4, gamma=0.99, device='cuda'):
        
        self.gamma = gamma
        self.device = device
        
        # 策略网络
        self.policy = GaussianPolicy(obs_dim, act_dim, hidden_dim).to(device)
        self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)
        
        # 价值网络(可选基线)
        self.value_net = nn.Sequential(
            nn.Linear(obs_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        ).to(device)
        self.value_optimizer = optim.Adam(self.value_net.parameters(), lr=lr)
    
    def select_action(self, obs, deterministic=False):
        """Sample action from policy."""
        obs_tensor = torch.FloatTensor(obs).to(self.device).unsqueeze(0)
        
        with torch.no_grad():
            mean, std = self.policy(obs_tensor)
            if deterministic:
                action = mean
            else:
                dist = torch.distributions.Normal(mean, std)
                action = dist.sample()
            action = torch.tanh(action)
        
        return action.cpu().numpy()[0]
    
    def compute_returns(self, rewards, dones):
        """Compute discounted returns."""
        returns = []
        discounted = 0
        
        for reward, done in zip(reversed(rewards), reversed(dones)):
            discounted = reward + self.gamma * discounted * (1 - done)
            returns.insert(0, discounted)
        
        return torch.FloatTensor(returns)
    
    def update(self, obs_batch, actions, rewards, dones, next_obs_batch=None):
        """Perform one gradient update."""
        obs_tensor = torch.FloatTensor(obs_batch).to(self.device)
        actions_tensor = torch.FloatTensor(actions).to(self.device)
        
        # 计算折扣回报
        returns = self.compute_returns(rewards, dones).to(self.device)
        
        # 基线:价值网络估计V(s),优势 = 回报 - 基线
        values = self.value_net(obs_tensor).squeeze(-1)
        advantages = returns - values.detach()
        
        # 策略梯度更新
        mean, std = self.policy(obs_tensor)
        dist = torch.distributions.Normal(mean, std)
        
        # 对数概率(对动作维度求和;动作经过tanh压缩,此处做了简化处理)
        log_probs = dist.log_prob(actions_tensor).sum(dim=-1)
        
        # 策略损失(取负号:用梯度下降实现梯度上升)
        policy_loss = -(log_probs * advantages).mean()
        
        self.optimizer.zero_grad()
        policy_loss.backward()
        self.optimizer.step()
        
        # 价值网络(基线)更新:回归到回报
        value_loss = nn.MSELoss()(values, returns)
        self.value_optimizer.zero_grad()
        value_loss.backward()
        self.value_optimizer.step()
        
        return policy_loss.item(), value_loss.item()
 
def collect_trajectory(env, agent, max_steps=1000):
    """Collect one trajectory using current policy."""
    obs = env.reset()
    observations, actions, rewards, dones = [], [], [], []
    
    for step in range(max_steps):
        action = agent.select_action(obs)
        next_obs, reward, done, _ = env.step(action)
        
        observations.append(obs)
        actions.append(action)
        rewards.append(reward)
        dones.append(done)
        
        obs = next_obs
        if done:
            break
    
    return observations, actions, rewards, dones
 
def train_reinforce(env, agent, num_episodes=1000, max_steps=1000):
    """Train REINFORCE agent."""
    for episode in range(num_episodes):
        # 收集轨迹
        obs, actions, rewards, dones = collect_trajectory(env, agent, max_steps)
        
        # 更新策略
        policy_loss, value_loss = agent.update(obs, actions, rewards, dones)
        
        if (episode + 1) % 10 == 0:
            total_reward = sum(rewards)
            print(f"Episode {episode+1}, Reward: {total_reward:.2f}, "
                  f"Policy Loss: {policy_loss:.4f}")

四、基线与方差缩减

4.1 引入基线

策略梯度估计的高方差是核心挑战之一。引入状态相关基线 $b(s_t)$ 可以缩减方差而不引入偏差:

$$\nabla_\theta J(\theta) = \mathbb{E}_{\pi_\theta}\left[\sum_{t} \nabla_\theta \log \pi_\theta(a_t|s_t)\,\big(G_t - b(s_t)\big)\right]$$

无偏性证明

$$\mathbb{E}_{a \sim \pi_\theta}\left[\nabla_\theta \log \pi_\theta(a|s)\, b(s)\right] = b(s)\, \nabla_\theta \sum_{a} \pi_\theta(a|s) = b(s)\, \nabla_\theta 1 = 0$$

即基线项的期望为零,因此不改变梯度的期望。

基线的选择

常用且接近最优的基线是状态价值函数 $V^{\pi}(s)$,此时权重项变为 $G_t - V^{\pi}(s_t)$,即优势函数的估计。
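
沿用第二节的玩具例子,可以数值验证“减去基线不改变梯度期望,但降低方差”(示例草图,这里用平均回报作为与状态无关的基线):

# 沿用玩具问题:a ~ N(0, 1),R(a) = -(a - 3)^2,真实梯度为 6
actions = torch.distributions.Normal(0.0, 1.0).sample((100000,))
rewards = -(actions - 3.0) ** 2
score = actions  # d/dmu log N(a; mu=0, 1) = a - mu = a

baseline = rewards.mean()               # 平均回报作为基线
g_plain = score * rewards               # 无基线的逐样本梯度贡献
g_base = score * (rewards - baseline)   # 减去基线

print(g_plain.mean().item(), g_base.mean().item())   # 两者均约为 6:期望不变
print(g_plain.var().item(), g_base.var().item())     # 基线显著降低方差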

4.2 优势函数

$$A^{\pi}(s,a) = Q^{\pi}(s,a) - V^{\pi}(s)$$

优势函数 $A^{\pi}(s,a)$ 度量了"在状态 $s$ 下采取动作 $a$ 相对于平均水平的优势"。

def compute_gae(rewards, values, dones, gamma=0.99, lam=0.95):
    """
    Generalized Advantage Estimation (GAE).
    Combines multi-step returns with TD errors.
    """
    advantages = []
    gae = 0
    
    for t in reversed(range(len(rewards))):
        if t == len(rewards) - 1:
            next_value = 0
        else:
            next_value = values[t + 1]
        
        delta = rewards[t] + gamma * next_value * (1 - dones[t]) - values[t]
        gae = delta + gamma * lam * (1 - dones[t]) * gae
        advantages.insert(0, gae)
    
    return torch.FloatTensor(advantages)
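
实践中常在计算出GAE优势后再做标准化,以进一步稳定梯度(这是常见技巧,并非GAE定义的一部分):

# 使用示例:values 为价值网络对各状态的估计(列表或数组)
advantages = compute_gae(rewards, values, dones, gamma=0.99, lam=0.95)
advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)  # 标准化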

五、Actor-Critic架构

Actor-Critic将策略梯度(Actor)与值函数近似(Critic)结合,同时获得低方差和高效率。

5.1 算法框架

class ActorCritic:
    """
    Actor-Critic with separate policy and value networks.
    """
    
    def __init__(self, obs_dim, act_dim, hidden_dim=256, 
                 pi_lr=3e-4, vf_lr=1e-3, gamma=0.99, lam=0.95):
        
        # Actor (策略)
        self.actor = GaussianPolicy(obs_dim, act_dim, hidden_dim)
        self.pi_optimizer = optim.Adam(self.actor.parameters(), lr=pi_lr)
        
        # Critic (价值函数)
        self.critic = nn.Sequential(
            nn.Linear(obs_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )
        self.vf_optimizer = optim.Adam(self.critic.parameters(), lr=vf_lr)
        
        self.gamma = gamma
        self.lam = lam
    
    def get_action(self, obs, deterministic=False):
        mean, std = self.actor(obs)
        if deterministic:
            action = mean
        else:
            dist = torch.distributions.Normal(mean, std)
            action = dist.sample()
        return torch.tanh(action)
    
    def compute_v(self, obs):
        return self.critic(obs)
    
    def update(self, obs_batch, actions, rewards, dones, next_obs_batch):
        """
        Perform Actor-Critic update with GAE.
        注:next_obs_batch 在此简化实现中未使用,compute_gae 假设轨迹末端的自举价值为0。
        """
        obs_tensor = torch.FloatTensor(obs_batch)
        actions_tensor = torch.FloatTensor(actions)
        
        with torch.no_grad():
            values = self.critic(obs_tensor).squeeze(-1)
            
            # GAE优势估计(compute_gae见第四节)
            advantages = compute_gae(rewards, values.tolist(), dones,
                                     self.gamma, self.lam)
            
            # 回报 = 优势 + 价值基线
            returns = advantages + values
        
        # Critic更新:最小化价值误差
        values_pred = self.critic(obs_tensor).squeeze(-1)
        vf_loss = nn.MSELoss()(values_pred, returns)
        
        self.vf_optimizer.zero_grad()
        vf_loss.backward()
        self.vf_optimizer.step()
        
        # Actor更新:策略梯度
        mean, std = self.actor(obs_tensor)
        dist = torch.distributions.Normal(mean, std)
        log_probs = dist.log_prob(actions_tensor).sum(dim=-1)
        
        # 使用GAE优势(已在no_grad下计算,无需再detach)
        pi_loss = -(log_probs * advantages).mean()
        
        self.pi_optimizer.zero_grad()
        pi_loss.backward()
        self.pi_optimizer.step()
        
        return pi_loss.item(), vf_loss.item()
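
下面给出一个与上述 ActorCritic 配合的最小训练循环草图(train_actor_critic 为本文假设的辅助函数,按单条轨迹更新):

def train_actor_critic(env, agent, num_episodes=1000, max_steps=1000):
    """Minimal on-policy training loop for the ActorCritic sketch above."""
    for episode in range(num_episodes):
        obs = env.reset()
        obs_buf, act_buf, rew_buf, done_buf, next_obs_buf = [], [], [], [], []
        
        for _ in range(max_steps):
            with torch.no_grad():
                action = agent.get_action(torch.FloatTensor(obs)).numpy()
            next_obs, reward, done, _ = env.step(action)
            
            obs_buf.append(obs)
            act_buf.append(action)
            rew_buf.append(reward)
            done_buf.append(float(done))
            next_obs_buf.append(next_obs)
            
            obs = next_obs
            if done:
                break
        
        pi_loss, vf_loss = agent.update(obs_buf, act_buf, rew_buf, done_buf, next_obs_buf)
        
        if (episode + 1) % 10 == 0:
            print(f"Episode {episode+1}, Reward: {sum(rew_buf):.2f}, "
                  f"Pi Loss: {pi_loss:.4f}, VF Loss: {vf_loss:.4f}")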

5.2 Actor-Critic的优势

| 特性 | REINFORCE | Actor-Critic |
|------|-----------|--------------|
| 方差 | 高 | 低(值函数基线) |
| 偏差 | 无偏 | 可能有(函数逼近) |
| 样本效率 | 低(全轨迹) | 高(单步TD) |
| 计算复杂度 | 低 | 中等 |

六、A3C算法

异步优势Actor-Critic(Asynchronous Advantage Actor-Critic, A3C)由DeepMind的Mnih等人于2016年提出,是深度强化学习的里程碑算法。

6.1 异步训练框架

A3C的核心创新是使用多线程异步训练,每个线程独立与环境交互,定期同步梯度到全局网络。

import threading
import multiprocessing as mp
import gym

# 假设 ActorCriticNetwork 为同时输出动作分布参数与状态价值的网络(提供 get_action / get_v),实现从略
 
class A3CAgent:
    """Asynchronous Advantage Actor-Critic."""
    
    def __init__(self, obs_dim, act_dim, global_episode_counter,
                 lr=3e-4, gamma=0.99, lam=0.95, ent_coef=0.01):
        
        self.obs_dim = obs_dim
        self.act_dim = act_dim
        self.gamma = gamma
        self.lam = lam
        self.ent_coef = ent_coef
        
        # 全局网络
        self.global_policy = ActorCriticNetwork(obs_dim, act_dim)
        self.global_policy.share_memory()
        self.optimizer = optim.Adam(self.global_policy.parameters(), lr=lr)
        
        # 全局 episode 计数器
        self.global_episode_counter = global_episode_counter
    
    def pull_from_global(self, local_model):
        """同步全局网络参数到本地."""
        local_model.load_state_dict(self.global_policy.state_dict())
    
    def push_to_global(self, gradients):
        """推送梯度到全局网络."""
        for param, grad in zip(self.global_policy.parameters(), gradients):
            param._grad = grad
    
    def worker(self, worker_id, env_name, num_steps=20):
        """单个worker的工作流程."""
        env = gym.make(env_name)
        local_model = ActorCriticNetwork(self.obs_dim, self.act_dim)
        
        while self.global_episode_counter.value < 10000:
            # 同步全局参数
            self.pull_from_global(local_model)
            
            obs = env.reset()
            episode_reward = 0
            episode_steps = 0
            
            obs_buffer, actions_buffer, rewards_buffer = [], [], []
            values_buffer, dones_buffer = [], []
            
            for step in range(num_steps):
                action = local_model.get_action(torch.FloatTensor(obs))
                next_obs, reward, done, _ = env.step(action.numpy())
                
                obs_buffer.append(obs)
                actions_buffer.append(action)
                rewards_buffer.append(reward)
                values_buffer.append(local_model.get_v(torch.FloatTensor(obs)).item())
                dones_buffer.append(done)
                
                episode_reward += reward
                episode_steps += 1
                obs = next_obs
                
                if done:
                    break
            
            # 计算优势和回报
            next_value = 0 if done else local_model.get_v(torch.FloatTensor(next_obs)).item()
            advantages, returns = compute_gae_torch(
                rewards_buffer, values_buffer, dones_buffer, 
                next_value, self.gamma, self.lam
            )
            
            # 计算损失并反向传播
            pi_loss, v_loss, entropy = self.compute_loss(
                obs_buffer, actions_buffer, advantages, returns, local_model
            )
            total_loss = pi_loss + 0.5 * v_loss - self.ent_coef * entropy
            
            self.optimizer.zero_grad()
            total_loss.backward()
            torch.nn.utils.clip_grad_norm_(local_model.parameters(), 50)
            
            # 推送本地网络的梯度到全局网络,再由全局优化器更新
            self.push_to_global([p.grad for p in local_model.parameters()])
            self.optimizer.step()
            
            if done:
                with self.global_episode_counter.get_lock():
                    self.global_episode_counter.value += 1
                if self.global_episode_counter.value % 100 == 0:
                    print(f"Worker {worker_id}, Episode {self.global_episode_counter.value}")
                
                obs = env.reset()
                episode_reward = 0
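
worker 中调用的 compute_loss 在上面未给出。下面是一个可能的实现草图,作为 A3CAgent 的成员方法;其中假设 ActorCriticNetwork 额外提供 evaluate(obs) 方法,返回动作分布和状态价值(该接口为本文假设):

# A3CAgent 的成员方法(实现草图,接口为假设)
def compute_loss(self, obs_buffer, actions_buffer, advantages, returns, local_model):
    obs = torch.FloatTensor(obs_buffer)
    actions = torch.stack(actions_buffer)
    
    dist, values = local_model.evaluate(obs)        # 假设返回 (动作分布, 状态价值)
    log_probs = dist.log_prob(actions).sum(-1)      # 连续动作:对动作维度求和
    
    pi_loss = -(log_probs * advantages.detach()).mean()   # 策略梯度损失
    v_loss = nn.MSELoss()(values.squeeze(-1), returns)    # 价值回归损失
    entropy = dist.entropy().sum(-1).mean()                # 熵项,鼓励探索
    return pi_loss, v_loss, entropy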

6.2 A3C的关键创新

  1. 异步训练:多线程/多进程并行探索,提高样本多样性(worker的启动方式见下方示例)
  2. 优势函数:使用 $A(s_t,a_t) \approx \sum_{i=0}^{n-1}\gamma^i r_{t+i} + \gamma^n V(s_{t+n}) - V(s_t)$ 缩减方差
  3. 多步回报:平衡偏差和方差(默认n=5)
  4. 熵正则化:鼓励探索,防止策略早熟收敛
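
启动多个 worker 的一种方式是使用 multiprocessing(A3C 原文采用多线程,这里以多进程示意;环境名、维度等参数均为假设的示例值,实际工程中还常配合共享内存的优化器使用):

if __name__ == "__main__":
    global_counter = mp.Value("i", 0)        # 进程间共享的 episode 计数器
    agent = A3CAgent(obs_dim=3, act_dim=1, global_episode_counter=global_counter)
    
    workers = []
    for worker_id in range(8):
        p = mp.Process(target=agent.worker, args=(worker_id, "Pendulum-v1"))
        p.start()
        workers.append(p)
    for p in workers:
        p.join()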

A3C调参经验

  • 学习率:通常使用较大的值(3e-4),因为异步带来噪声
  • 线程数:4-16个,根据CPU核心数调整
  • 熵系数:0.01左右,鼓励探索但不过度随机
  • 折扣因子:0.99-0.999

七、策略梯度的挑战与解决方案

7.1 收敛性挑战

| 问题 | 原因 | 解决方案 |
|------|------|----------|
| 策略崩溃 | 策略方差过小 | 熵正则化 |
| 高方差 | 长轨迹累积 | GAE、基线 |
| 策略振荡 | 梯度估计噪声 | 信任域方法(PPO) |

7.2 方差-偏差权衡

策略梯度面临经典的偏差-方差权衡:

  • Monte Carlo估计(REINFORCE):无偏但高方差
  • TD估计(Actor-Critic):有偏但低方差
  • GAE:可调的偏差-方差平衡
# GAE作为连续统
# λ=0: TD(0),低方差高偏差
# λ=1: Monte Carlo,无偏高方差
advantages = compute_gae(rewards, values, dones, gamma=0.99, lam=0.95)

八、数学形式化总结

策略梯度定理

$$\nabla_\theta J(\theta) = \mathbb{E}_{\pi_\theta}\left[\nabla_\theta \log \pi_\theta(a|s)\, Q^{\pi_\theta}(s,a)\right]$$

优势函数

$$A^{\pi}(s,a) = Q^{\pi}(s,a) - V^{\pi}(s)$$

GAE(λ)

$$\hat{A}_t^{\mathrm{GAE}(\gamma,\lambda)} = \sum_{l=0}^{\infty} (\gamma\lambda)^l\, \delta_{t+l}$$

其中 $\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$。



参考文献

  1. Sutton, R. S., & Barto, A. G. (2018). Reinforcement Learning: An Introduction (2nd ed.). MIT Press.
  2. Williams, R. J. (1992). Simple statistical gradient-following algorithms for connectionist reinforcement learning. Machine Learning, 8(3-4), 229-256.
  3. Mnih, V., et al. (2016). Asynchronous methods for deep reinforcement learning. ICML, 1928-1937.
  4. Schulman, J., Levine, S., Abbeel, P., Jordan, M., & Moritz, P. (2015). Trust region policy optimization. ICML, 1889-1897.
  5. Schulman, J., Wolski, F., Dhariwal, P., Radford, A., & Klimov, O. (2017). Proximal policy optimization algorithms. arXiv:1707.06347.
  6. Mnih, V., et al. (2014). Recurrent models of visual attention. NIPS, 2204-2212.

策略梯度方法为连续控制问题提供了优雅的解决方案,是现代强化学习的重要支柱。