Reinforcement Learning Tuning and Engineering in Practice

The Unique Challenges of Tuning RL

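Tuning reinforcement learning is not like tuning other kinds of machine learning. A few properties make it especially painful:

First, the data is not independent and identically distributed (non-i.i.d.). In ordinary supervised learning, every sample is drawn independently. In RL, the training data is generated by the current policy, so as the policy changes, the data distribution changes with it. Change one hyperparameter and all the collected data changes, which can in turn change how the other hyperparameters behave. It is like tuning a machine that keeps rebuilding itself.

Second, training curves look like an ECG. A supervised loss usually trends downward fairly steadily, while an RL reward curve may rise today, drop tomorrow, and climb back the day after. Sometimes it looks converged and then collapses two days later. Deciding that training is "done" is therefore a skill in its own right.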
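One way to make a noisy reward curve easier to judge is to smooth it and compare recent windows. The sketch below is only an illustration: `has_plateaued`, its window size, and its tolerance are hypothetical choices, not a standard stopping rule.

```python
from collections import deque


def moving_average(values, window):
    """Trailing moving average; early entries average over what is available."""
    buf = deque(maxlen=window)
    out = []
    for v in values:
        buf.append(v)
        out.append(sum(buf) / len(buf))
    return out


def has_plateaued(values, window=100, tolerance=0.01):
    """Hypothetical heuristic: compare the mean of the last `window` episode
    returns with the mean of the `window` returns before them."""
    if len(values) < 2 * window:
        return False
    recent = sum(values[-window:]) / window
    previous = sum(values[-2 * window:-window]) / window
    scale = max(abs(previous), 1e-8)
    return abs(recent - previous) / scale < tolerance


if __name__ == "__main__":
    returns = [float(i % 7) for i in range(300)]
    print(moving_average(returns, 50)[-1], has_plateaued(returns))
```

Even when such a check passes, it is worth training a while longer, since an RL curve that looks flat can still collapse later.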

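Third, environments differ enormously. Parameters tuned on CartPole may be useless on Pendulum. Different action spaces, reward scales, and state dimensions each call for a different tuning strategy.

Fourth, the random seed matters a lot. RL is absurdly sensitive to the seed: with the same parameters, one seed may converge and another may not. When reporting results, run several seeds and report both the mean and the standard deviation.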

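import random

import numpy as np
import torch

def set_seed(seed=42):
    """Seed every random number generator the training loop touches"""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    
    # Gymnasium environments are seeded per instance, not globally:
    # pass the seed on the first reset, e.g. env.reset(seed=seed), and call
    # env.action_space.seed(seed) if you sample actions from the space.
    
    # Trade some speed for reproducible cuDNN kernels
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

def multiple_seed_run(agent_class, env_name, seeds=(42, 43, 44, 45, 46)):
    """Run the same experiment with several seeds and summarize the results"""
    results = []
    
    for seed in seeds:
        set_seed(seed)
        agent = agent_class()
        # train() is your own training routine; it is assumed to return a
        # scalar score such as the final mean episode return
        result = train(agent, env_name)
        results.append(result)
    
    mean = np.mean(results)
    std = np.std(results)
    
    print(f"Results over {len(seeds)} seeds:")
    print(f"  Mean: {mean:.2f}")
    print(f"  Std: {std:.2f}")
    print(f"  Min: {np.min(results):.2f}")
    print(f"  Max: {np.max(results):.2f}")
    
    return mean, std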
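With only a handful of seeds, it matters which standard deviation you report. Note that `np.std` defaults to the population formula (dividing by n), whereas the sample formula (dividing by n - 1) is the usual choice for a small number of runs. A minimal stdlib sketch, where `summarize_seeds` is a hypothetical helper name:

```python
import math
import statistics


def summarize_seeds(final_returns):
    """Summarize one score per seed with the sample std and standard error."""
    n = len(final_returns)
    mean = statistics.mean(final_returns)
    # statistics.stdev divides by (n - 1); it needs at least two values
    std = statistics.stdev(final_returns) if n > 1 else 0.0
    sem = std / math.sqrt(n)  # standard error of the mean
    return {"n": n, "mean": mean, "std": std, "sem": sem}


if __name__ == "__main__":
    summary = summarize_seeds([195.0, 200.0, 143.0, 200.0, 187.0])
    print(f"{summary['mean']:.1f} +/- {summary['std']:.1f} over {summary['n']} seeds")
```

The numbers in the usage example are made up purely to show the call.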

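Reward Shaping: Designing an Effective Reward Function

The reward function is the most mysterious and the most important part of RL. Design it well and the agent learns quickly and well; design it badly and the agent may learn strange behavior or fail to learn at all.

Reward Shaping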

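Ng, Harada, and Russell (1999) proved that an additional shaping reward $F$ leaves the optimal policy unchanged if it has the form

$$F(s, a, s') = \gamma \Phi(s') - \Phi(s)$$

where $\Phi$ is a potential function over states and $\gamma$ is the discount factor. This is the well-known potential-based reward shaping theorem.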
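The intuition behind potential-based shaping, where the extra reward is F(s, a, s') = γΦ(s') − Φ(s), is that the shaping terms telescope along any trajectory: their discounted sum depends only on the first and last state, not on the path in between. The sketch below checks this numerically; the `potential` function (negative distance to a 1-D goal) and the trajectory are hypothetical.

```python
def shaping_term(phi_s, phi_next, gamma):
    """Potential-based shaping reward: gamma * Phi(s') - Phi(s)."""
    return gamma * phi_next - phi_s


def potential(position, goal=10.0):
    """Hypothetical potential: negative distance to a goal on a line."""
    return -abs(goal - position)


gamma = 0.99
trajectory = [0.0, 2.0, 5.0, 4.0, 9.0]  # made-up positions s_0 .. s_T

# Discounted sum of shaping rewards along the trajectory
shaped_sum = sum(
    (gamma ** t) * shaping_term(potential(s), potential(s_next), gamma)
    for t, (s, s_next) in enumerate(zip(trajectory, trajectory[1:]))
)

# Telescoped form: gamma^T * Phi(s_T) - Phi(s_0)
T = len(trajectory) - 1
expected = (gamma ** T) * potential(trajectory[-1]) - potential(trajectory[0])

print(shaped_sum, expected)
```

Because every policy starting from the same state picks up the same −Φ(s_0) offset, the ranking of policies is unchanged. A hand-written distance penalty that does not have this form carries no such guarantee.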

In practice, reward engineering tends to be blunter and more direct:

class RewardShaping:
    """
    Hand-engineered reward built from weighted terms
    """
    
    def __init__(self, reward_weights):
        self.weights = reward_weights
        # Weights carry their own sign (penalties are negative), e.g.
        # reward_weights = {
        #     'progress': 1.0,      # moving toward the goal
        #     'collision': -10.0,   # collision penalty
        #     'energy': -0.01,      # energy cost
        #     'success': 100.0,     # success bonus
        # }
    
    def compute_reward(self, state, action, next_state, info):
        reward = 0.0
        
        # 1. Progress toward the goal
        if 'distance_to_goal' in info:
            # Without a previous distance, count the progress as zero
            prev = info.get('prev_distance', info['distance_to_goal'])
            progress = prev - info['distance_to_goal']
            reward += self.weights['progress'] * progress
        
        # 2. Collision penalty
        if info.get('collision', False):
            reward += self.weights['collision']
        
        # 3. Energy penalty (the weight is already negative, so add it)
        if 'energy_used' in info:
            reward += self.weights['energy'] * info['energy_used']
        
        # 4. Success bonus
        if info.get('success', False):
            reward += self.weights['success']
        
        return reward
 
import numpy as np

class SparseToDenseWrapper:
    """
    Turn a sparse reward into a dense one (sparse rewards are hard to learn from)
    """
    
    def __init__(self, env, goal_threshold=0.1):
        self.env = env
        self.goal_threshold = goal_threshold
        self.goal = None
    
    def reset(self, **kwargs):
        obs, info = self.env.reset(**kwargs)
        # Assumes the goal comes in info, or else sits in the first three
        # entries of obs; adjust this for your environment
        self.goal = np.asarray(info.get('goal', obs[:3]))
        return obs, info  # Gymnasium's reset returns (obs, info)
    
    def step(self, action):
        obs, sparse_reward, terminated, truncated, info = self.env.step(action)
        
        # Compute the dense reward
        dense_reward = self.compute_dense_reward(obs, action, info)
        
        # Add a bonus on success
        if sparse_reward > 0:
            dense_reward += 100
        
        return obs, dense_reward, terminated, truncated, info
    
    def compute_dense_reward(self, obs, action, info):
        """Dense reward based on the distance to the goal"""
        if self.goal is None:
            return 0.0
        
        current_pos = obs[:3]
        distance = np.linalg.norm(current_pos - self.goal)
        
        # The smaller the distance, the higher the reward
        reward = -distance * 10
        
        # Action-energy penalty
        reward -= 0.01 * np.sum(np.square(action))
        
        return reward

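Common Reward-Shaping Pitfalls

Pitfall 1: Reward Hacking

The agent will find ways to "cheat" its way to reward that you never anticipated:

  • Reward a robot for "moving forward" and it may fall over and roll its way ahead
  • Reward a grasping task with "grasp succeeded = 1" and it may hold on forever and never let go

Fix: add penalty terms that rule out the unwanted behavior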

# Bad: easy to hack
reward = 1.0 if grasped else 0.0

# Better: combine several constraints
reward = 0.0
if grasped:
    reward += 1.0
    reward -= 0.5 * lifted_time  # penalize holding on indefinitely
    if lifted_to_target:
        reward += 5.0

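Pitfall 2: Local Optima

The agent may find a "good enough" behavior and then stop improving:

  • A walking robot learns to shuffle along on its knees
  • A basketball robot learns to toss the ball toward the hoop and then go fetch it

Fix: curriculum learning plus intrinsic rewards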

class CurriculumReward:
    """Curriculum reward: tighten the success threshold over time"""
    
    def __init__(self, initial_threshold=1.0, final_threshold=0.1, steps=100000):
        self.initial_threshold = initial_threshold
        self.final_threshold = final_threshold
        self.steps = steps
    
    def get_threshold(self, step):
        """Linearly anneal the threshold from its initial to its final value"""
        progress = min(step / self.steps, 1.0)
        return self.initial_threshold - (self.initial_threshold - self.final_threshold) * progress
    
    def reward(self, state, info):
        # info is assumed to carry the current distance and the global step
        distance = info.get('distance', 1.0)
        threshold = self.get_threshold(info['step'])
        
        if distance < threshold:
            return 1.0
        return -0.01 * distance  # small penalty that encourages getting closer

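Exploration Strategies: Balancing Exploration and Exploitation

Exploration is one of the core challenges of RL. Too little exploration traps the agent in a local optimum; too much wastes samples.

ε-greedy

The simplest exploration strategy: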

import random

class EpsilonGreedy:
    def __init__(self, epsilon=1.0, epsilon_min=0.01, epsilon_decay=0.995):
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
    
    def select_action(self, Q_values, training=True):
        """Q_values is a 1-D NumPy array or tensor with one entry per action"""
        if training and random.random() < self.epsilon:
            # random.randint includes both endpoints
            return random.randint(0, len(Q_values) - 1)
        return int(Q_values.argmax())
    
    def decay(self):
        """Multiplicative decay, floored at epsilon_min"""
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
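A multiplicative decay factor is hard to read at a glance, so it helps to work out how many `decay()` calls it takes to reach the floor. The helper below is a small sketch; `steps_until_min` is a hypothetical name, and it assumes the same update rule as the class above.

```python
import math


def steps_until_min(epsilon, epsilon_min, epsilon_decay):
    """Number of decay() calls until epsilon first reaches epsilon_min."""
    if epsilon <= epsilon_min:
        return 0
    # Solve epsilon * decay**n <= epsilon_min for the smallest integer n
    return math.ceil(math.log(epsilon_min / epsilon) / math.log(epsilon_decay))


if __name__ == "__main__":
    print(steps_until_min(1.0, 0.01, 0.995))
```

With the defaults above this comes to about 920 calls. Whether that is fast or slow depends on where `decay()` is called: once per episode gives a long exploration phase, while once per environment step ends exploration almost immediately.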

Upper Confidence Bound (UCB)

UCB adds an "uncertainty bonus" to each action:

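import math
from collections import defaultdict

class UCB1:
    def __init__(self, num_actions, c=2.0):
        self.c = c
        # The action set must be known up front; otherwise there is
        # nothing to choose from before the first update
        self.actions = range(num_actions)
        self.total = 0                 # total number of pulls
        self.N_a = defaultdict(int)    # pulls per action
        self.Q = defaultdict(float)    # running value estimate per action
    
    def select_action(self):
        """UCB1 selection"""
        for a in self.actions:
            if self.N_a[a] == 0:
                return a  # try every action once first
        
        # Compute the upper confidence bound of each action
        best_action = None
        best_value = -float('inf')
        
        for a in self.actions:
            ucb_value = self.Q[a] + self.c * math.sqrt(
                math.log(self.total) / self.N_a[a]
            )
            if ucb_value > best_value:
                best_value = ucb_value
                best_action = a
        
        return best_action
    
    def update(self, action, reward):
        self.total += 1
        self.N_a[action] += 1
        
        # Incremental mean update of the value estimate
        self.Q[action] += (reward - self.Q[action]) / self.N_a[action]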
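To see the uncertainty bonus at work, here is a compact, self-contained UCB1 run on a Bernoulli bandit. It is a sketch under made-up assumptions: the arm probabilities, the step count, and the function name `run_ucb1` are all hypothetical, and it uses the log of the current step index in the bonus.

```python
import math
import random


def run_ucb1(arm_probs, steps, c=2.0, seed=0):
    """Simulate UCB1 on a Bernoulli bandit; returns (pull counts, value estimates)."""
    rng = random.Random(seed)
    n_arms = len(arm_probs)
    counts = [0] * n_arms
    values = [0.0] * n_arms
    for t in range(1, steps + 1):
        untried = [a for a in range(n_arms) if counts[a] == 0]
        if untried:
            action = untried[0]  # pull every arm once first
        else:
            action = max(
                range(n_arms),
                key=lambda a: values[a] + c * math.sqrt(math.log(t) / counts[a]),
            )
        reward = 1.0 if rng.random() < arm_probs[action] else 0.0
        counts[action] += 1
        values[action] += (reward - values[action]) / counts[action]
    return counts, values


if __name__ == "__main__":
    counts, values = run_ucb1([0.2, 0.5, 0.8], steps=2000)
    print(counts, [round(v, 2) for v in values])
```

The weaker arms still get pulled occasionally, because their bonus grows slowly while they are ignored; a larger `c` makes that exploration more aggressive.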

Thompson Sampling

A Bayesian approach that models the value of each action with a probability distribution:

import math
import random

class ThompsonSampling:
    def __init__(self, num_actions):
        self.num_actions = num_actions
        # Beta distribution parameters (for binary rewards)
        self.alpha = [1.0] * num_actions  # successes + 1
        self.beta = [1.0] * num_actions   # failures + 1
    
    def select_action(self):
        """Sample each action's success rate from its Beta posterior"""
        samples = [random.betavariate(self.alpha[a], self.beta[a]) 
                  for a in range(self.num_actions)]
        return samples.index(max(samples))
    
    def update(self, action, reward):
        if reward > 0:
            self.alpha[action] += 1
        else:
            self.beta[action] += 1

class GaussianThompsonSampling:
    """Gaussian variant of Thompson Sampling (an approximation, not an exact posterior)"""
    
    def __init__(self, num_actions, prior_mean=0.0, prior_std=1.0):
        self.num_actions = num_actions
        self.means = [prior_mean] * num_actions
        self.stds = [prior_std] * num_actions
        self.counts = [0] * num_actions
    
    def select_action(self):
        # Uncertainty about each mean shrinks roughly as std / sqrt(n), so
        # exploration fades as evidence accumulates
        samples = [random.gauss(self.means[a],
                                self.stds[a] / math.sqrt(max(self.counts[a], 1)))
                  for a in range(self.num_actions)]
        return samples.index(max(samples))
    
    def update(self, action, reward):
        self.counts[action] += 1
        n = self.counts[action]
        
        # Incremental update of the mean
        old_mean = self.means[action]
        self.means[action] = old_mean + (reward - old_mean) / n
        
        # Running estimate of the reward variance, blended with the prior std
        if n > 1:
            variance = ((n - 1) * self.stds[action]**2 + 
                       (reward - old_mean) * (reward - self.means[action])) / n
            self.stds[action] = math.sqrt(variance)

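Exploration in Continuous Action Spaces

import numpy as np

class ContinuousExploration:
    """Exploration for continuous action spaces via Gaussian action noise"""
    
    def __init__(self, action_dim, noise_std=0.1):
        self.action_dim = action_dim
        self.noise_std = noise_std
    
    def add_exploration_noise(self, action, training=True, step=0):
        if not training:
            return action
        
        # Anneal the noise linearly over 100k steps, down to 10% of its initial scale
        std = self.noise_std * max(0.1, 1.0 - step / 100000)
        
        noise = np.random.randn(self.action_dim) * std
        # Assumes actions are normalized to [-1, 1]
        return np.clip(action + noise, -1, 1)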

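Curriculum Learning: Training from Simple to Complex

The core idea of curriculum learning: do not try to swallow everything in one bite. Learn the easy tasks first, then the hard ones.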

import numpy as np

class CurriculumLearning:
    """
    Curriculum manager
    """
    
    def __init__(self, difficulty_levels):
        self.levels = difficulty_levels
        # 'threshold' is the average reward required to enter that level:
        # levels = [
        #     {'threshold': 0.0, 'task_param': 'easy'},
        #     {'threshold': 0.3, 'task_param': 'medium'},
        #     {'threshold': 0.6, 'task_param': 'hard'},
        # ]
        self.current_level = 0
    
    def should_level_up(self, recent_rewards):
        """Decide whether to move to the next difficulty level"""
        if self.current_level >= len(self.levels) - 1:
            return False
        if len(recent_rewards) == 0:
            return False
        
        # Compare against the entry threshold of the next level
        threshold = self.levels[self.current_level + 1]['threshold']
        avg_reward = np.mean(recent_rewards)
        
        return avg_reward >= threshold
    
    def level_up(self, recent_rewards):
        """Advance one level if the agent is ready"""
        if self.should_level_up(recent_rewards):
            self.current_level += 1
            print(f"Level up! Now at level {self.current_level}")
        return self.current_level
    
    def get_task_param(self):
        return self.levels[self.current_level]['task_param']
 
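class GravityCurriculum:
    """
    Gravity curriculum: raise gravity step by step
    """
    
    def __init__(self, env_name='HalfCheetahBulletEnv-v0'):
        self.env_name = env_name
        self.gravity_levels = [5.0, 10.0, 20.0, 30.0]
        self.current_gravity = self.gravity_levels[0]
        self.current_level = 0
    
    def make_env(self):
        # pybullet_envs registers its environments with the classic `gym`
        # package when it is imported
        import gym
        import pybullet_envs  # noqa: F401
        env = gym.make(self.env_name)
        env.reset()
        # There is no standard Gym call for changing gravity. The line below
        # ASSUMES the unwrapped env exposes its Bullet client as `_p`, an
        # internal detail of pybullet_envs; setGravity(x, y, z) itself is the
        # PyBullet API. Check this against your installed version, and note
        # that the env may restore its own gravity on every reset.
        env.unwrapped._p.setGravity(0, 0, -self.current_gravity)
        return env
    
    def update(self, reward):
        """Raise gravity once the agent performs well enough"""
        if reward > 1000 and self.current_level < len(self.gravity_levels) - 1:
            self.current_level += 1
            self.current_gravity = self.gravity_levels[self.current_level]
            return True  # caller should rebuild the environment
        return False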
 
from collections import deque

class AutomaticCurriculum:
    """
    Automatic curriculum: adjust task difficulty based on the agent's performance
    """
    
    def __init__(self, env, min_difficulty=0.1, max_difficulty=1.0):
        self.env = env
        self.difficulty = min_difficulty
        self.min_difficulty = min_difficulty
        self.max_difficulty = max_difficulty
        
        # One sliding window of recent outcomes (1 = success, 0 = failure).
        # Two separate capped buffers would pin the rate near 50% once full.
        self.outcomes = deque(maxlen=100)
    
    def update(self, success):
        """Record one episode outcome and update the difficulty"""
        self.outcomes.append(1 if success else 0)
        
        # Success rate over the window
        success_rate = sum(self.outcomes) / len(self.outcomes)
        
        # Adjust the difficulty
        if success_rate > 0.8 and self.difficulty < self.max_difficulty:
            self.difficulty = min(self.max_difficulty, self.difficulty * 1.1)
        elif success_rate < 0.2 and self.difficulty > self.min_difficulty:
            self.difficulty = max(self.min_difficulty, self.difficulty * 0.9)
        
        # Push the new difficulty to the environment; set_difficulty is
        # assumed to be a method your environment provides
        self.env.set_difficulty(self.difficulty)
        
        return self.difficulty

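Normalization Tricks

Normalization is extremely important in RL: training on quantities with very different scales causes problems.

Reward Normalization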

import numpy as np

class RunningRewardNormalizer:
    """Online reward normalization"""
    
    def __init__(self, clip_range=(-10, 10), gamma=0.99):
        self.clip_range = clip_range
        self.gamma = gamma  # kept for interface compatibility; unused here
        self.running_mean = 0.0
        self.m2 = 0.0       # running sum of squared deviations (Welford)
        self.count = 0
    
    def _std(self):
        # Fall back to 1.0 until at least two rewards have been seen
        if self.count < 2:
            return 1.0
        return np.sqrt(self.m2 / self.count)
    
    def normalize(self, reward):
        # Welford's online algorithm
        self.count += 1
        delta = reward - self.running_mean
        self.running_mean += delta / self.count
        delta2 = reward - self.running_mean
        self.m2 += delta * delta2
        
        # Normalize
        normalized = (reward - self.running_mean) / (self._std() + 1e-8)
        
        # Clip
        return np.clip(normalized, *self.clip_range)
    
    def denormalize(self, normalized_reward):
        """Invert the normalization using the current statistics"""
        return normalized_reward * self._std() + self.running_mean
 
from collections import deque

import numpy as np

class ReturnsNormalizer:
    """Normalize using discounted returns"""
    
    def __init__(self, gamma=0.99, clip_range=(-10, 10)):
        self.returns = deque(maxlen=10000)
        self.gamma = gamma
        self.clip_range = clip_range
    
    def normalize(self, rewards):
        # Compute discounted returns backwards through the episode
        returns = []
        R = 0.0
        for r in reversed(rewards):
            R = r + self.gamma * R
            returns.append(R)
        returns.reverse()
        
        self.returns.extend(returns)
        
        # Normalize against the history of recent returns
        mean = np.mean(self.returns)
        std = np.std(self.returns)
        
        normalized_returns = [(r - mean) / (std + 1e-8) for r in returns]
        
        # Clip
        return [np.clip(r, *self.clip_range) for r in normalized_returns]

Observation Normalization

import numpy as np

class RunningObsNormalizer:
    """Running observation normalization"""
    
    def __init__(self, shape):
        self.mean = np.zeros(shape)
        self.var = np.ones(shape)
        self.count = 1e-4
    
    def update(self, obs):
        # obs is a batch of shape (batch, *shape); statistics are kept per
        # dimension, so reduce over the batch axis only
        batch_mean = np.mean(obs, axis=0)
        batch_var = np.var(obs, axis=0)
        batch_count = obs.shape[0]
        
        delta = batch_mean - self.mean
        total_count = self.count + batch_count
        
        self.mean += delta * batch_count / total_count
        self.var = (
            (self.count * self.var + batch_count * batch_var) / total_count +
            (delta ** 2) * self.count * batch_count / (total_count ** 2)
        )
        self.count = total_count
    
    def normalize(self, obs):
        return (obs - self.mean) / np.sqrt(self.var + 1e-8)
 
import torch.nn as nn

class BatchNormWrapper(nn.Module):
    """Batch normalization inside the network"""
    
    def __init__(self, input_dim):
        super().__init__()
        self.bn = nn.BatchNorm1d(input_dim)
    
    def forward(self, x):
        if len(x.shape) == 3:  # (batch, seq, dim)
            # BatchNorm1d expects (batch, dim, seq), so swap the last two axes
            x = x.permute(0, 2, 1)
            x = self.bn(x)
            x = x.permute(0, 2, 1)
        else:
            x = self.bn(x)
        return x
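The update rule in the running observation normalizer above merges the statistics of a new batch into the running mean and variance without storing past observations. The scalar sketch below uses the same merge formula and checks it against statistics computed over all the data at once; `merge_stats` and the two batches are hypothetical.

```python
import statistics


def merge_stats(mean_a, var_a, count_a, mean_b, var_b, count_b):
    """Combine the mean and population variance of two batches."""
    total = count_a + count_b
    delta = mean_b - mean_a
    mean = mean_a + delta * count_b / total
    # Sum of squared deviations of both batches plus a between-batch term
    m2 = var_a * count_a + var_b * count_b + delta ** 2 * count_a * count_b / total
    return mean, m2 / total, total


batch_a = [1.0, 2.0, 3.0, 4.0]
batch_b = [10.0, 12.0, 14.0]

mean, var, count = merge_stats(
    statistics.mean(batch_a), statistics.pvariance(batch_a), len(batch_a),
    statistics.mean(batch_b), statistics.pvariance(batch_b), len(batch_b),
)
print(mean, var, count)
```

The between-batch term is what lets the variance grow when a new batch is centered somewhere else, which happens constantly in RL as the policy reaches new states.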

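Layer Norm vs Batch Norm

LayerNorm normalizes each sample on its own, so it does not depend on batch statistics, which keep shifting in RL as the policy changes. The network below uses LayerNorm for that reason.

import torch
import torch.nn as nn

class PolicyWithNormalization(nn.Module):
    """Policy network with normalization"""
    
    def __init__(self, obs_dim, action_dim):
        super().__init__()
        
        # Input normalization
        self.obs_norm = nn.LayerNorm(obs_dim)
        
        # Hidden layers
        self.fc1 = nn.Linear(obs_dim, 256)
        self.fc2 = nn.Linear(256, 256)
        
        # Policy head
        self.mu = nn.Linear(256, action_dim)
        self.log_std = nn.Parameter(torch.zeros(action_dim))
        
        # Value head (the ReLU keeps the two linear layers from collapsing
        # into a single linear map)
        self.value_net = nn.Sequential(
            nn.Linear(256, 256),
            nn.LayerNorm(256),
            nn.ReLU(),
            nn.Linear(256, 1)
        )
    
    def forward(self, x):
        x = self.obs_norm(x)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        
        mu = torch.tanh(self.mu(x))
        std = torch.exp(self.log_std)
        
        value = self.value_net(x)
        
        return mu, std, value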

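Parallel Environment Training

Low sample efficiency is a long-standing problem in RL. Parallel environments do not reduce the number of samples needed, but they collect them much faster, which makes them the standard way to speed up training.

Gymnasium VectorEnv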

import gymnasium as gym
import numpy as np
import torch.optim as optim
from gymnasium.vector import AsyncVectorEnv, SyncVectorEnv

def make_env(env_id, seed):
    """Return a thunk that builds one seeded environment"""
    def thunk():
        env = gym.make(env_id)
        env.reset(seed=seed)
        return env
    return thunk

def parallel_envs(env_name='CartPole-v1', num_envs=8):
    """Build a vectorized environment"""
    env_fns = [make_env(env_name, i) for i in range(num_envs)]
    
    # Asynchronous workers, one process per env. 'fork' is Unix-only; for
    # very cheap environments SyncVectorEnv can be faster because it avoids
    # inter-process overhead.
    env = AsyncVectorEnv(env_fns, context='fork', shared_memory=True)
    
    return env

def train_parallel(env_name='CartPole-v1', num_envs=8, total_steps=100000):
    """Parallel training loop. ActorCritic and train_agent stand for your
    own model and update routine; they are not defined here."""
    env = parallel_envs(env_name, num_envs)
    
    # Setup
    obs_dim = env.single_observation_space.shape[0]
    action_dim = env.single_action_space.n
    
    agent = ActorCritic(obs_dim, action_dim)
    optimizer = optim.Adam(agent.parameters(), lr=3e-4)
    
    obs, _ = env.reset()
    running_returns = np.zeros(num_envs)  # return of each in-progress episode
    finished_returns = []                 # returns of completed episodes
    batch = []                            # transitions since the last update
    
    for step in range(0, total_steps, num_envs):
        # Collect experience
        actions = agent.select_actions(obs)  # shape (num_envs,)
        next_obs, rewards, terminations, truncations, infos = env.step(actions)
        batch.append((obs, actions, rewards, next_obs, terminations))
        
        # Track episode returns
        running_returns += rewards
        for i in range(num_envs):
            if terminations[i] or truncations[i]:
                finished_returns.append(running_returns[i])
                running_returns[i] = 0.0
                if len(finished_returns) % 10 == 0:
                    avg = np.mean(finished_returns[-10:])
                    print(f"Step {step}: Avg reward = {avg:.1f}")
        
        obs = next_obs
        
        # Update every 128 environment steps (assumes num_envs divides 128)
        if step > 0 and step % 128 == 0:
            train_agent(agent, optimizer, batch)
            batch = []

SubprocVecEnv (stable-baselines3 style)

import multiprocessing

import numpy as np

def worker(conn, env_fn):
    """Child-process loop: build the env, then serve (command, data) requests"""
    env = env_fn()
    while True:
        cmd, data = conn.recv()
        if cmd == 'reset':
            obs, _ = env.reset()
            conn.send(obs)
        elif cmd == 'step':
            obs, reward, terminated, truncated, info = env.step(data)
            done = terminated or truncated
            if done:
                obs, _ = env.reset()  # auto-reset so the caller never stalls
            conn.send((obs, reward, done, info))
        elif cmd == 'close':
            env.close()
            conn.close()
            break

class SubprocVecEnv:
    """Multi-process vectorized environment (simplified)"""
    
    def __init__(self, env_fns, context='fork'):
        self.num_envs = len(env_fns)
        # 'fork' is Unix-only; with 'spawn' every env_fn must be picklable
        ctx = multiprocessing.get_context(context)
        self.parent_pipes = []
        self.processes = []
        
        for fn in env_fns:
            parent_conn, child_conn = ctx.Pipe()
            p = ctx.Process(target=worker, args=(child_conn, fn), daemon=True)
            p.start()
            child_conn.close()  # the parent keeps only its own end
            
            self.parent_pipes.append(parent_conn)
            self.processes.append(p)
    
    def reset(self):
        for pipe in self.parent_pipes:
            pipe.send(('reset', None))
        results = [pipe.recv() for pipe in self.parent_pipes]
        return np.array(results)
    
    def step(self, actions):
        for pipe, action in zip(self.parent_pipes, actions):
            pipe.send(('step', action))
        
        results = [pipe.recv() for pipe in self.parent_pipes]
        
        obs = np.array([r[0] for r in results])
        rewards = np.array([r[1] for r in results])
        dones = np.array([r[2] for r in results])
        infos = [r[3] for r in results]
        
        return obs, rewards, dones, infos
    
    def close(self):
        for pipe in self.parent_pipes:
            pipe.send(('close', None))
        for p in self.processes:
            p.join()

Training Stability Tricks

Gradient Clipping

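import torch.nn as nn
import torch.nn.functional as F

def train_step(agent, optimizer, batch, max_grad_norm=0.5):
    # agent.get_value and agent.get_log_prob are assumed to be methods of
    # your own actor-critic model
    states, actions, returns, advantages = batch
    
    # Forward pass; drop the trailing dimension so values matches returns
    values = agent.get_value(states).squeeze(-1)
    log_probs = agent.get_log_prob(states, actions)
    
    # Losses
    policy_loss = -(log_probs * advantages).mean()
    value_loss = F.mse_loss(values, returns)
    loss = policy_loss + 0.5 * value_loss
    
    # Backward pass
    optimizer.zero_grad()
    loss.backward()
    
    # Key step: rescale gradients so their global norm is at most max_grad_norm
    nn.utils.clip_grad_norm_(agent.parameters(), max_grad_norm)
    
    optimizer.step()
    
    return loss.item()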
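Norm clipping rescales all gradients by one common factor, so the update direction is preserved and only its length is capped. The pure-Python sketch below mirrors that idea on nested lists; `clip_by_global_norm` is a hypothetical helper written for illustration, not PyTorch's implementation.

```python
import math


def clip_by_global_norm(grads, max_norm, eps=1e-6):
    """Scale every gradient by the same factor so the global L2 norm
    does not exceed max_norm. Returns (clipped gradients, original norm)."""
    total_norm = math.sqrt(sum(g * g for vec in grads for g in vec))
    scale = min(1.0, max_norm / (total_norm + eps))
    clipped = [[g * scale for g in vec] for vec in grads]
    return clipped, total_norm


if __name__ == "__main__":
    grads = [[3.0], [4.0]]  # global norm is 5
    clipped, norm = clip_by_global_norm(grads, max_norm=0.5)
    print(norm, clipped)
```

Logging the norm before clipping is useful in itself: if it is clipped on nearly every step, the learning rate or the reward scale is probably too large.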

Target Networks

class DQNWithTarget:
    def __init__(self, state_dim, action_dim):
        # QNetwork is assumed to be your own nn.Module that maps a state to
        # one Q-value per action
        self.q_network = QNetwork(state_dim, action_dim)
        self.target_network = QNetwork(state_dim, action_dim)
        self.target_network.load_state_dict(self.q_network.state_dict())
    
    def update_target(self, tau=0.005):
        """Soft (Polyak) update: move the target a fraction tau toward the online network"""
        for target, source in zip(
            self.target_network.parameters(),
            self.q_network.parameters()
        ):
            target.data.copy_(tau * source.data + (1 - tau) * target.data)
    
    def hard_update_target(self):
        """Hard update: copy the online network into the target"""
        self.target_network.load_state_dict(self.q_network.state_dict())
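A soft update with a small tau makes the target network an exponential moving average of the online network, so it helps to know how far behind it lags. After k updates toward a fixed source, a fraction (1 - tau)^k of the old target remains. The sketch below computes the resulting half-life; both function names are hypothetical.

```python
import math


def target_lag_half_life(tau):
    """Number of soft updates after which half of the old target remains."""
    return math.log(0.5) / math.log(1.0 - tau)


def soft_update(target, source, tau):
    """Polyak averaging on plain lists of parameters."""
    return [tau * s + (1.0 - tau) * t for t, s in zip(target, source)]


if __name__ == "__main__":
    print(f"half-life for tau=0.005: {target_lag_half_life(0.005):.1f} updates")

    target, source = [0.0], [1.0]
    for _ in range(139):
        target = soft_update(target, source, tau=0.005)
    print(target)
```

For tau = 0.005 the half-life is a little under 140 updates, which is a useful reference point when comparing against a hard update every N steps.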

Experience Replay

import numpy as np

class PrioritizedReplayBuffer:
    """Prioritized experience replay"""
    
    def __init__(self, capacity=100000, alpha=0.6, beta=0.4):
        self.capacity = capacity
        self.alpha = alpha  # how strongly priorities skew sampling (0 = uniform)
        self.beta = beta    # importance-sampling correction; often annealed toward 1
        
        self.buffer = []
        self.priorities = []
        self.position = 0
    
    def push(self, state, action, reward, next_state, done):
        # New transitions get the current maximum priority so that each one
        # is sampled at least once
        max_priority = max(self.priorities) if self.priorities else 1.0
        
        if len(self.buffer) < self.capacity:
            self.buffer.append((state, action, reward, next_state, done))
            self.priorities.append(max_priority)
        else:
            self.buffer[self.position] = (state, action, reward, next_state, done)
            self.priorities[self.position] = max_priority
        
        self.position = (self.position + 1) % self.capacity
    
    def sample(self, batch_size):
        # Sampling probabilities
        probs = np.array(self.priorities) ** self.alpha
        probs /= probs.sum()
        
        # Sample indices
        indices = np.random.choice(len(self.buffer), batch_size, p=probs)
        
        # Importance-sampling weights, normalized by the batch maximum
        weights = (len(self.buffer) * probs[indices]) ** (-self.beta)
        weights /= weights.max()
        
        batch = [self.buffer[i] for i in indices]
        
        return batch, indices, weights
    
    def update_priorities(self, indices, td_errors):
        for idx, td_error in zip(indices, td_errors):
            # The small constant keeps every transition sampleable
            self.priorities[idx] = abs(float(td_error)) + 1e-5
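To get a feel for what alpha and beta do in prioritized replay, here is a small stdlib sketch that computes sampling probabilities and importance-sampling weights for a toy set of priorities. The function names and the priority values are hypothetical; unlike the buffer above, the weights here are normalized over all items rather than over a sampled batch.

```python
def per_probabilities(priorities, alpha):
    """Sampling probability of each transition: p_i^alpha / sum_j p_j^alpha."""
    scaled = [p ** alpha for p in priorities]
    total = sum(scaled)
    return [s / total for s in scaled]


def importance_weights(probs, beta):
    """Importance-sampling weights (N * P(i))^(-beta), scaled so the max is 1."""
    n = len(probs)
    weights = [(n * p) ** (-beta) for p in probs]
    max_w = max(weights)
    return [w / max_w for w in weights]


if __name__ == "__main__":
    priorities = [1.0, 2.0, 4.0]
    probs = per_probabilities(priorities, alpha=0.6)
    print([round(p, 3) for p in probs])
    print([round(w, 3) for w in importance_weights(probs, beta=0.4)])
```

With alpha = 0 sampling becomes uniform, and with beta = 0 the correction is switched off. Transitions that are sampled more often receive smaller weights, which counteracts the bias introduced by non-uniform sampling.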

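Hands-On: Fast Training with Stable-Baselines3

stable-baselines3 (SB3) is one of the most convenient RL libraries available today, and its algorithm implementations are stable and reliable.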

import gymnasium as gym
import numpy as np
import stable_baselines3
from stable_baselines3 import PPO, SAC, TD3, DQN
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize
from stable_baselines3.common.callbacks import EvalCallback, CheckpointCallback
from stable_baselines3.common.monitor import Monitor
import optuna

def make_env(env_id, seed=0):
    """Return a thunk that builds a monitored environment"""
    def thunk():
        env = gym.make(env_id)
        env = Monitor(env)  # records episode returns and lengths
        env.reset(seed=seed)
        return env
    return thunk
 
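def train_ppo(env_id='CartPole-v1', total_timesteps=100000):
    """Train with PPO"""
    # Training environment with running observation/reward normalization
    env = DummyVecEnv([make_env(env_id)])
    env = VecNormalize(env, norm_obs=True, norm_reward=True)
    
    # Build the PPO model
    model = PPO(
        'MlpPolicy',
        env,
        learning_rate=3e-4,
        n_steps=2048,
        batch_size=64,
        n_epochs=10,
        gamma=0.99,
        gae_lambda=0.95,
        clip_range=0.2,
        ent_coef=0.0,
        verbose=1,
        tensorboard_log='./logs/ppo/'
    )
    
    # The eval env must be wrapped the same way so EvalCallback can sync the
    # normalization statistics; freeze its statistics and keep raw rewards
    eval_env = DummyVecEnv([make_env(env_id, seed=42)])
    eval_env = VecNormalize(eval_env, norm_obs=True, norm_reward=False, training=False)
    eval_callback = EvalCallback(
        eval_env,
        best_model_save_path='./models/ppo/',
        log_path='./logs/ppo/',
        eval_freq=1000,
        deterministic=True,
        render=False
    )
    
    checkpoint_callback = CheckpointCallback(
        save_freq=10000,
        save_path='./models/ppo_checkpoints/',
        name_prefix='ppo_model'
    )
    
    # Train
    model.learn(
        total_timesteps=total_timesteps,
        callback=[eval_callback, checkpoint_callback],
        progress_bar=True
    )
    
    # Save the model together with the normalization statistics
    model.save('ppo_cartpole')
    env.save('ppo_cartpole_vecnormalize.pkl')
    
    # Evaluate on the normalized eval env; a raw env would feed the policy
    # observations on a different scale than it was trained on
    from stable_baselines3.common.evaluation import evaluate_policy
    from stable_baselines3.common.vec_env import sync_envs_normalization
    sync_envs_normalization(env, eval_env)
    mean_reward, std_reward = evaluate_policy(model, eval_env, n_eval_episodes=10)
    print(f"Mean reward: {mean_reward:.2f} +/- {std_reward:.2f}")
    
    return model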
 
def train_sac(env_id='HalfCheetah-v4', total_timesteps=500000):
    """Train a continuous-control task with SAC"""
    env = DummyVecEnv([make_env(env_id)])
    env = VecNormalize(env, norm_obs=True, norm_reward=False)
    
    model = SAC(
        'MlpPolicy',
        env,
        learning_rate=3e-4,
        buffer_size=1000000,
        learning_starts=1000,
        batch_size=256,
        tau=0.005,
        gamma=0.99,
        train_freq=1,
        gradient_steps=1,
        verbose=1,
        tensorboard_log='./logs/sac/'
    )
    
    model.learn(total_timesteps=total_timesteps, progress_bar=True)
    model.save('sac_halfcheetah')
    
    return model
 
def hyperparameter_search(env_id='CartPole-v1', n_trials=20):
    """Hyperparameter search with Optuna"""
    def objective(trial):
        # SB3 warns when n_steps * n_envs is not a multiple of batch_size
        params = {
            'learning_rate': trial.suggest_float('learning_rate', 1e-5, 1e-3, log=True),
            'n_steps': trial.suggest_categorical('n_steps', [64, 128, 256, 512, 1024]),
            'batch_size': trial.suggest_categorical('batch_size', [32, 64, 128]),
            'n_epochs': trial.suggest_categorical('n_epochs', [3, 5, 10]),
            'gamma': trial.suggest_float('gamma', 0.9, 0.999),
            'gae_lambda': trial.suggest_float('gae_lambda', 0.9, 0.99),
            'clip_range': trial.suggest_float('clip_range', 0.1, 0.3),
            'ent_coef': trial.suggest_float('ent_coef', 0.0, 0.1),
        }
        
        # Build the environment (no VecNormalize, so raw evaluation is valid)
        env = DummyVecEnv([make_env(env_id)])
        
        # Build the model
        model = PPO('MlpPolicy', env, **params, verbose=0)
        
        # Train on a shortened budget
        model.learn(total_timesteps=50000, progress_bar=False)
        
        # Evaluate
        eval_rewards = evaluate_model(model, env_id, n_episodes=10)
        mean_reward = np.mean(eval_rewards)
        
        # Clean up
        model.env.close()
        del model
        
        return mean_reward
    
    study = optuna.create_study(direction='maximize')
    study.optimize(objective, n_trials=n_trials, show_progress_bar=True)
    
    print(f"Best params: {study.best_params}")
    print(f"Best reward: {study.best_value:.2f}")
    
    return study.best_params
 
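def evaluate_model(model, env_id, n_episodes=10):
    """Evaluate a model on a raw environment.

    Only valid for models trained without VecNormalize; a model trained on
    normalized observations needs the same normalization at evaluation time.
    """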
    env = gym.make(env_id)
    rewards = []
    
    for _ in range(n_episodes):
        obs, _ = env.reset()
        episode_reward = 0
        done = False
        
        while not done:
            action, _ = model.predict(obs, deterministic=True)
            obs, reward, terminated, truncated, _ = env.step(action)
            episode_reward += reward
            done = terminated or truncated
        
        rewards.append(episode_reward)
    
    env.close()
    return rewards

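Hands-On: Large-Scale Distributed Training with Ray/RLlib

When a single machine is no longer enough, you need distributed training. Ray/RLlib is one of the most widely used frameworks for large-scale RL training.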

import gymnasium as gym
import ray
from ray import tune
from ray.rllib.algorithms.ppo import PPO
from ray.tune.registry import register_env

# Connect to an existing Ray cluster; call ray.init() with no address to
# start a local instance instead
ray.init(address='auto')

def create_env(env_config):
    """Environment factory"""
    return gym.make(env_config['env_name'])

# Register the environment
register_env('MyEnv-v0', create_env)

def distributed_ppo_training():
    """Distributed PPO training.

    The keys below follow RLlib's older dict-style config. Key names and
    result fields have changed across Ray releases, so check them against
    the version you have installed.
    """
    config = {
        'env': 'MyEnv-v0',
        'env_config': {'env_name': 'HalfCheetah-v4'},
        
        # Framework
        'framework': 'torch',
        'num_gpus': 0.5,             # GPU share for the learner (driver) process
        'num_gpus_per_worker': 0.5,  # GPU share for each rollout worker
        
        # Parallelism
        'num_workers': 8,
        'num_envs_per_worker': 8,
        
        # Model
        'model': {
            'fcnet_hiddens': [256, 256],
            'fcnet_activation': 'tanh',
        },
        
        # Training
        'lr': 3e-4,
        'gamma': 0.99,
        'lambda': 0.95,
        'kl_target': 0.01,
        'clip_param': 0.2,
        'train_batch_size': 2048,
        'sgd_minibatch_size': 128,
        'num_sgd_iter': 10,
        
        # Resources
        'num_cpus_per_worker': 2,
        'num_cpus_for_driver': 2,
    }
    
    # Build the algorithm
    trainer = PPO(config=config)
    
    # Train
    for i in range(1000):
        result = trainer.train()
        
        if i % 10 == 0:
            print(f"Step {i}: reward = {result['episode_reward_mean']:.2f}")
        
        # Save a checkpoint
        if i % 100 == 0:
            trainer.save('./checkpoints/ppo_' + str(i))
    
    return trainer
 
def tune_hyperparameter_search():
    """Hyperparameter search with Ray Tune"""
    from ray.tune.schedulers import ASHAScheduler
    
    config = {
        'env': 'CartPole-v1',
        'framework': 'torch',
        'num_workers': 4,
        
        # Search space (key names follow RLlib's older dict-style PPO config)
        'lr': tune.loguniform(1e-5, 1e-3),
        'gamma': tune.uniform(0.95, 0.999),
        'clip_param': tune.uniform(0.1, 0.3),
        'lambda': tune.uniform(0.9, 0.99),
        'entropy_coeff': tune.loguniform(1e-5, 1e-2),
        'kl_target': tune.uniform(0.005, 0.05),
    }
    
    # ASHA stops poorly performing trials early
    scheduler = ASHAScheduler(
        time_attr='training_iteration',
        metric='episode_reward_mean',
        mode='max',
        max_t=100,
        grace_period=10,
    )
    
    # Run the search
    analysis = tune.run(
        'PPO',
        config=config,
        num_samples=50,  # 50 trials in total; how many run at once depends on resources
        scheduler=scheduler,
        checkpoint_at_end=True,
        storage_path='./tune_results/',
    )
    
    # Print the best configuration
    best_config = analysis.get_best_config(metric='episode_reward_mean', mode='max')
    print(f"Best config: {best_config}")
    
    return analysis
 
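def multi_agent_training():
    """Multi-agent training"""
    import gymnasium as gym
    from ray.rllib.algorithms.ppo import PPO
    
    config = {
        # Assumes a multi-agent env has been registered under this name with
        # register_env, and that its agent IDs are 'agent_0' ... 'agent_3'
        'env': 'multiagent_cartpole',
        'env_config': {
            'num_agents': 4,
        },
        'multiagent': {
            # Each agent gets its own policy
            'policies': {
                f'agent_{i}': (
                    None,
                    gym.spaces.Box(low=-1, high=1, shape=(4,)),
                    gym.spaces.Discrete(2),
                    {}
                ) for i in range(4)
            },
            # Map each agent ID to the policy of the same name; extra
            # arguments passed by RLlib are ignored
            'policy_mapping_fn': lambda agent_id, *args, **kwargs: agent_id,
        },
        'num_workers': 4,
    }
    
    trainer = PPO(config=config)
    
    for i in range(100):
        result = trainer.train()
        print(f"Step {i}: reward = {result['episode_reward_mean']:.2f}")
    
    return trainer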
 
if __name__ == '__main__':
    # Single-machine training
    model = train_ppo('CartPole-v1', total_timesteps=50000)
    
    # Distributed training (requires a Ray cluster)
    # distributed_ppo_training()
    
    # Hyperparameter search
    # best_params = tune_hyperparameter_search()

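Recommended Hyperparameters

Environments differ a lot, but some rules of thumb carry over:

Recommended Settings by Environment Type

Environment type    Examples                 Learning rate   Batch size   Discount factor   Network
Discrete control    CartPole, LunarLander    3e-4            64-256       0.99              64-128 hidden units
Continuous control  HalfCheetah, Walker      3e-4            256-1024     0.99              256-512 hidden units
Complex vision      Atari, VizDoom           2.5e-4          32-64        0.99              CNN
Sparse reward       MineRL, RoboLearn        1e-4            256-512      0.995             512+ hidden units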

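Recommended PPO Configuration

# Keys match the stable-baselines3 PPO constructor, so this can be used as
# PPO('MlpPolicy', env, **PPO_CONFIG)
PPO_CONFIG = {
    # Learning rate
    'learning_rate': 3e-4,  # or a linear schedule from 3e-4 down to 0
    
    # Rollout collection and updates
    'n_steps': 2048,   # steps collected per environment before each update
    'batch_size': 64,  # minibatch size
    'n_epochs': 10,    # optimization epochs per update
    
    # GAE
    'gamma': 0.99,
    'gae_lambda': 0.95,
    
    # PPO-specific
    'clip_range': 0.2,      # policy clipping range
    'clip_range_vf': None,  # value-function clipping (None disables it)
    
    # Entropy regularization
    'ent_coef': 0.01,  # encourages exploration
    
    # Network and optimizer settings go through policy_kwargs
    'policy_kwargs': {
        # Older SB3 releases expect the list form [dict(pi=..., vf=...)]
        'net_arch': dict(pi=[256, 256], vf=[256, 256]),
        'optimizer_kwargs': {'eps': 1e-5},  # Adam epsilon
    },
}

# Recommended SAC configuration
SAC_CONFIG = {
    'learning_rate': 3e-4,
    'buffer_size': 1000000,
    'learning_starts': 1000,
    'batch_size': 256,
    'tau': 0.005,  # target-network update rate
    'gamma': 0.99,
    'train_freq': 1,
    'gradient_steps': 1,
    'ent_coef': 'auto',  # tune the entropy coefficient automatically
}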
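The `gamma` and `gae_lambda` entries in the PPO settings above control generalized advantage estimation (GAE), which trades bias against variance in the advantage signal. The function below is a minimal sketch of the standard backward recursion on plain lists; it assumes `dones[t]` marks that the episode ended after step t, and it is not SB3's implementation.

```python
def compute_gae(rewards, values, last_value, dones, gamma=0.99, gae_lambda=0.95):
    """Return (advantages, returns) for one rollout.

    rewards[t], values[t], dones[t] describe step t; last_value is the value
    estimate of the state reached after the final step.
    """
    advantages = [0.0] * len(rewards)
    next_value = last_value
    next_advantage = 0.0
    for t in reversed(range(len(rewards))):
        not_done = 0.0 if dones[t] else 1.0
        # One-step TD error
        delta = rewards[t] + gamma * next_value * not_done - values[t]
        # Exponentially weighted sum of future TD errors
        next_advantage = delta + gamma * gae_lambda * not_done * next_advantage
        advantages[t] = next_advantage
        next_value = values[t]
    returns = [a + v for a, v in zip(advantages, values)]
    return advantages, returns


if __name__ == "__main__":
    adv, ret = compute_gae(
        rewards=[1.0, 1.0, 1.0],
        values=[0.5, 0.5, 0.5],
        last_value=0.5,
        dones=[False, False, False],
    )
    print(adv, ret)
```

With `gae_lambda=0` each advantage is just the one-step TD error (low variance, more bias); with `gae_lambda=1` it becomes the discounted return minus the value baseline (less bias, more variance).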

Learning Rate Schedules

import numpy as np

def linear_schedule(initial_value, final_value=0.0):
    """Linear decay. SB3 calls a schedule with progress_remaining, which
    runs from 1.0 at the start of training to 0.0 at the end."""
    def func(progress_remaining):
        return final_value + (initial_value - final_value) * progress_remaining
    return func

def cosine_schedule(initial_value, final_value=0.0):
    """Cosine decay, also driven by progress_remaining"""
    def func(progress_remaining):
        progress = 1.0 - progress_remaining  # 0.0 at the start, 1.0 at the end
        return final_value + (initial_value - final_value) * (1 + np.cos(np.pi * progress)) / 2
    return func

# PPO + linear schedule
model = PPO(
    'MlpPolicy',
    env,
    learning_rate=linear_schedule(3e-4, 0.0),
    clip_range=linear_schedule(0.2, 0.0),
    ...
)

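Summary

Tuning RL takes experience, and there is no universal formula. Here are my hard-won lessons:

Key Principles

  1. Run a baseline first: get things working with SB3's default parameters, then tune
  2. Change one parameter at a time: with too many variables you cannot tell which one mattered
  3. Run several seeds: RL is sensitive to the random seed, so report mean ± std
  4. Monitor everything: reward, value loss, policy entropy, gradient norm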
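Of the quantities in the monitoring list, policy entropy is the easiest to misread, so here is a minimal sketch of what it measures for a discrete policy. `categorical_entropy` is a hypothetical helper; libraries normally compute this from logits inside the loss.

```python
import math


def categorical_entropy(probs):
    """Shannon entropy (in nats) of a discrete action distribution."""
    return -sum(p * math.log(p) for p in probs if p > 0.0)


if __name__ == "__main__":
    print(categorical_entropy([0.25, 0.25, 0.25, 0.25]))  # uniform policy
    print(categorical_entropy([0.97, 0.01, 0.01, 0.01]))  # nearly deterministic
```

A uniform policy over n actions has entropy ln(n), and a deterministic one has entropy 0. If the logged entropy drops toward 0 early in training while the reward is still poor, the policy has likely stopped exploring too soon.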

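Troubleshooting Common Problems

  • Training does not converge → lower the learning rate, add regularization
  • Policy collapse → add entropy regularization, check the reward shaping
  • Inaccurate value estimates → train the critic separately, update it more often
  • Low sample efficiency → use parallel environments, cut the time spent on environment interaction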
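For the "inaccurate value estimates" case, one common diagnostic is the explained variance of the critic: how much of the variation in the empirical returns the value predictions account for. SB3 logs a quantity of this kind for PPO. The stdlib sketch below is a hypothetical helper for illustration.

```python
import statistics


def explained_variance(predicted_values, empirical_returns):
    """1 - Var(returns - predictions) / Var(returns).

    1.0 means perfect predictions, 0.0 is no better than predicting a
    constant, and negative values are worse than a constant.
    """
    var_returns = statistics.pvariance(empirical_returns)
    if var_returns == 0:
        return float("nan")  # undefined when the returns do not vary
    residuals = [r - v for r, v in zip(empirical_returns, predicted_values)]
    return 1.0 - statistics.pvariance(residuals) / var_returns


if __name__ == "__main__":
    returns = [1.0, 2.0, 3.0, 4.0]
    print(explained_variance([1.1, 1.9, 3.2, 3.8], returns))
    print(explained_variance([2.5, 2.5, 2.5, 2.5], returns))
```

If this value stays near zero or goes negative for a long time, the advantages fed to the policy are mostly noise, and the critic deserves attention before any other hyperparameter.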

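Recommended Tools

  • stable-baselines3: quickly validating ideas
  • Ray/RLlib: large-scale distributed training
  • Optuna: hyperparameter search
  • Weights & Biases: experiment tracking

Tuning RL really can feel like black magic, but as you gain experience you will gradually build intuition for the algorithms. Keep experimenting, read a lot of code, and write down what you learn, and in time you too can become an expert at tuning RL.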