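RL Hyperparameter Tuning and Engineering in Practice
What Makes RL Tuning Uniquely Hard
Tuning reinforcement learning (RL) is unlike tuning other machine learning methods, and a few of its traits are enough to make you tear your hair out:
First, the data is not independent and identically distributed (non-i.i.d.). In ordinary supervised learning, samples are assumed to be drawn independently from a fixed distribution. In RL, the training data is generated by the current policy, so when the policy changes, the data distribution changes with it. Change one hyperparameter and all the collected data changes, which can in turn change how the other hyperparameters behave. It is like tuning a machine that keeps rewriting itself.
Second, training curves look like an ECG. A supervised loss usually trends steadily downward, while an RL reward curve may rise today, fall tomorrow, and recover the day after. Sometimes a run looks converged and then collapses two days later, so deciding that training is "done" is a skill in itself.
Third, environments differ enormously. Parameters tuned on CartPole may be useless on Pendulum. Different action spaces, reward scales, and state dimensions each call for a different tuning strategy.
Fourth, random seeds matter a lot. RL is absurdly sensitive to the seed: with identical parameters, one seed may converge and another may not. So run several seeds and report both the mean and the standard deviation.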
import random

import numpy as np
import torch


def set_seed(seed=42):
    """Seed every random number generator the training code uses."""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    # Gymnasium environments are seeded per environment at reset time:
    #     obs, info = env.reset(seed=seed)
    # Trade some cuDNN speed for reproducible results
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


def multiple_seed_run(agent_class, env_name, seeds=(42, 43, 44, 45, 46)):
    """Run the same experiment with several seeds.

    `train` is assumed to be defined elsewhere and to return a scalar score.
    """
    results = []
    for seed in seeds:
        set_seed(seed)
        agent = agent_class()
        result = train(agent, env_name)
        results.append(result)
    mean = np.mean(results)
    std = np.std(results)
    print(f"Results over {len(seeds)} seeds:")
    print(f"  Mean: {mean:.2f}")
    print(f"  Std: {std:.2f}")
    print(f"  Min: {np.min(results):.2f}")
    print(f"  Max: {np.max(results):.2f}")
    return mean, std
Reward Shaping: Designing an Effective Reward Function
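The reward function is the most mysterious and the most important part of RL. Design it well and the agent learns quickly and well; design it badly and the agent may learn strange behavior or nothing at all.
Reward Shaping
Ng, Harada, and Russell (1999) proved that the optimal policy is unchanged if the extra shaping reward $F$ has the form
$$F(s, a, s') = \gamma \Phi(s') - \Phi(s)$$
where $\Phi$ is a potential function defined on states and $\gamma$ is the discount factor. This is the well-known potential-based reward shaping theorem.
In practice, reward engineering is often far more blunt: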
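As a minimal sketch of the theorem, the snippet below adds a potential-based bonus to an environment reward. The `potential` function (negative distance to a goal), the `goal` argument, and the convention that a terminal state has zero potential are illustrative assumptions, not part of the original text.

```python
import math


def potential(state, goal):
    """Hypothetical potential: negative Euclidean distance to the goal."""
    return -math.dist(state, goal)


def shaped_reward(reward, state, next_state, goal, gamma=0.99, terminal=False):
    """Return reward + gamma * phi(s') - phi(s)."""
    # Assumption: the potential of a terminal state is taken to be zero.
    phi_next = 0.0 if terminal else potential(next_state, goal)
    return reward + gamma * phi_next - potential(state, goal)


# Moving from distance 5 to distance 3 earns a positive shaping bonus.
bonus = shaped_reward(0.0, (3.0, 4.0), (0.0, 3.0), (0.0, 0.0), gamma=1.0)
```

Because the bonus depends only on the difference of potentials, moving away from the goal is penalized by the same amount that moving toward it is rewarded, which is why this form cannot be farmed by going back and forth.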
class RewardShaping:
    """Hand-engineered reward function built from weighted terms."""
    def __init__(self, reward_weights):
        self.weights = reward_weights
        # reward_weights = {
        #     'progress': 1.0,     # moving toward the goal
        #     'collision': -10.0,  # collision penalty
        #     'energy': -0.01,     # energy cost
        #     'success': 100.0,    # success bonus
        # }
    def compute_reward(self, state, action, next_state, info):
        reward = 0.0
        # 1. Progress toward the goal
        if 'distance_to_goal' in info:
            distance = info['distance_to_goal']
            # With no previous distance, count zero progress instead of a large negative jump
            progress = info.get('prev_distance', distance) - distance
            reward += self.weights['progress'] * progress
        # 2. Collision penalty (the weight is already negative)
        if info.get('collision', False):
            reward += self.weights['collision']
        # 3. Energy penalty (the weight is already negative, so add it rather than subtract)
        if 'energy_used' in info:
            reward += self.weights['energy'] * info['energy_used']
        # 4. Success bonus
        if info.get('success', False):
            reward += self.weights['success']
        return reward
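import numpy as np


class SparseToDenseWrapper:
    """Turn a sparse reward into a dense one (sparse rewards are hard to learn from)."""
    def __init__(self, env, goal_threshold=0.1):
        self.env = env
        self.goal_threshold = goal_threshold
        self.goal = None
    def reset(self, **kwargs):
        obs, info = self.env.reset(**kwargs)
        # Falls back to obs[:3] when info has no goal. NOTE: `current_pos` below also
        # reads obs[:3]; adjust both slices to your environment's observation layout.
        self.goal = info.get('goal', obs[:3])
        return obs, info  # Gymnasium's reset returns (obs, info)
    def step(self, action):
        obs, sparse_reward, terminated, truncated, info = self.env.step(action)
        # Compute the dense reward
        dense_reward = self.compute_dense_reward(obs, action, info)
        # Add a bonus on success
        if sparse_reward > 0:
            dense_reward += 100
        return obs, dense_reward, terminated, truncated, info
    def compute_dense_reward(self, obs, action, info):
        """Dense reward based on the distance to the goal."""
        if self.goal is None:
            return 0.0
        current_pos = obs[:3]
        distance = np.linalg.norm(current_pos - self.goal)
        # The smaller the distance, the higher the reward
        reward = -distance * 10
        # Penalize action energy
        reward -= 0.01 * np.sum(action ** 2)
        return reward
Common Reward-Shaping Pitfalls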
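Pitfall 1: Reward hacking
The agent will find ways to "cheat" for reward that you never anticipated:
- Reward a robot for "moving forward" and it may fall over and roll its way ahead
- Give a grasping task a "successful grasp = 1" reward and the agent may hold on forever
Solution: add extra constraint penalties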
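# Bad: easy to reward-hack
reward = 1.0 if grasped else 0.0
# Better: add several constraints
reward = 0.0
if grasped:
    reward += 1.0
    reward -= 0.5 * lifted_time  # penalize holding on forever
if lifted_to_target:
    reward += 5.0
Pitfall 2: Local optima
The agent may settle for a "good enough" behavior and stop improving:
- A walking robot learns to shuffle along on its knees
- A basketball robot learns to throw the ball toward the hoop and then go catch it
Solution: curriculum learning plus intrinsic rewards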
class CurriculumReward:
    """Curriculum reward: tighten the success threshold over time."""
    def __init__(self, initial_threshold=1.0, final_threshold=0.1, steps=100000):
        self.initial_threshold = initial_threshold
        self.final_threshold = final_threshold
        self.steps = steps
    def get_threshold(self, step):
        """Linearly anneal the threshold from its initial to its final value."""
        progress = min(step / self.steps, 1.0)
        return self.initial_threshold - (self.initial_threshold - self.final_threshold) * progress
    def reward(self, state, info):
        distance = info.get('distance', 1.0)
        threshold = self.get_threshold(info['step'])
        if distance < threshold:
            return 1.0
        return -0.01 * distance  # small penalty that encourages getting closer
Exploration Strategies: Balancing Exploration and Exploitation
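Exploration is one of the core challenges of RL. Too little exploration traps the agent in a local optimum; too much wastes samples.
ε-greedy
The simplest exploration strategy: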
import random

import numpy as np


class EpsilonGreedy:
    def __init__(self, epsilon=1.0, epsilon_min=0.01, epsilon_decay=0.995):
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
    def select_action(self, Q_values, training=True):
        # Explore with probability epsilon, otherwise act greedily
        if training and random.random() < self.epsilon:
            return random.randint(0, len(Q_values) - 1)
        return int(np.argmax(Q_values))
    def decay(self):
        """Multiplicative decay, floored at epsilon_min."""
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
Upper Confidence Bound (UCB)
UCB adds an "uncertainty bonus" to each action's value estimate:
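Written out, a UCB1-style selection rule can be stated as follows. This is given for reference, with the symbols chosen to match the names used in the code of this section ($Q$ for the value estimate, $N$ for the total number of selections, $N_a$ for the per-action count, $c$ for the exploration coefficient):

```latex
a_t = \arg\max_a \left[ Q(a) + c \sqrt{\frac{\ln N}{N_a}} \right]
```

An action that has rarely been tried has a small $N_a$ and therefore a large bonus, so it keeps getting sampled until its estimate can be trusted.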
import math


class UCB1:
    def __init__(self, num_actions, c=2.0):
        self.c = c
        self.total = 0                # total number of selections
        self.N_a = [0] * num_actions  # per-action selection counts
        self.Q = [0.0] * num_actions  # per-action value estimates
    def select_action(self):
        """UCB1 selection."""
        # Try every action once before applying the UCB formula
        for a, n in enumerate(self.N_a):
            if n == 0:
                return a
        # Value estimate plus an uncertainty bonus that shrinks with the count
        ucb_values = [
            q + self.c * math.sqrt(math.log(self.total) / n)
            for q, n in zip(self.Q, self.N_a)
        ]
        return ucb_values.index(max(ucb_values))
    def update(self, action, reward):
        self.total += 1
        self.N_a[action] += 1
        # Incremental mean update of the value estimate
        self.Q[action] += (reward - self.Q[action]) / self.N_a[action]
Thompson Sampling
A Bayesian approach that models the value of each action with a probability distribution:
import random


class ThompsonSampling:
    def __init__(self, num_actions):
        self.num_actions = num_actions
        # Beta distribution parameters (for binary rewards)
        self.alpha = [1.0] * num_actions  # successes + 1
        self.beta = [1.0] * num_actions   # failures + 1
    def select_action(self):
        """Sample from each action's Beta posterior and pick the best draw."""
        samples = [random.betavariate(self.alpha[a], self.beta[a])
                   for a in range(self.num_actions)]
        return samples.index(max(samples))
    def update(self, action, reward):
        if reward > 0:
            self.alpha[action] += 1
        else:
            self.beta[action] += 1
import math
import random


class GaussianThompsonSampling:
    """Gaussian version of Thompson Sampling."""
    def __init__(self, num_actions, prior_mean=0.0, prior_std=1.0):
        self.num_actions = num_actions
        self.means = [prior_mean] * num_actions
        self.stds = [prior_std] * num_actions
        self.counts = [0] * num_actions
    def select_action(self):
        # Sample a plausible mean per action; the uncertainty about the mean
        # shrinks roughly as std / sqrt(n), so exploration fades with more data
        samples = [
            random.gauss(self.means[a], self.stds[a] / math.sqrt(max(self.counts[a], 1)))
            for a in range(self.num_actions)
        ]
        return samples.index(max(samples))
    def update(self, action, reward):
        self.counts[action] += 1
        n = self.counts[action]
        # Incremental update of the mean
        old_mean = self.means[action]
        self.means[action] = old_mean + (reward - old_mean) / n
        # Running variance of the observed rewards (Welford-style update)
        if n > 1:
            variance = ((n - 1) * self.stds[action] ** 2 +
                        (reward - old_mean) * (reward - self.means[action])) / n
            self.stds[action] = math.sqrt(variance)
Exploration in Continuous Action Spaces
import numpy as np


class ContinuousExploration:
    """Exploration for continuous action spaces."""
    def __init__(self, action_dim, noise_std=0.1):
        self.action_dim = action_dim
        self.noise_std = noise_std
    def add_exploration_noise(self, action, training=True, step=0):
        if not training:
            return action
        # Anneal the noise linearly to 10% of its initial scale over 100k steps
        std = self.noise_std * max(0.1, 1.0 - step / 100000)
        noise = np.random.randn(self.action_dim) * std
        # Assumes actions are normalized to [-1, 1]
        return np.clip(action + noise, -1, 1)
Curriculum Learning: Training from Simple to Complex
The core idea of curriculum learning is not to bite off more than you can chew: learn the easy tasks first, then the hard ones.
import numpy as np


class CurriculumLearning:
    """Curriculum learning manager."""
    def __init__(self, difficulty_levels):
        self.levels = difficulty_levels
        # levels = [
        #     {'threshold': 0.0, 'task_param': 'easy'},
        #     {'threshold': 0.3, 'task_param': 'medium'},
        #     {'threshold': 0.6, 'task_param': 'hard'},
        # ]
        self.current_level = 0
    def should_level_up(self, recent_rewards):
        """Decide whether to move to the next difficulty level."""
        if self.current_level >= len(self.levels) - 1 or len(recent_rewards) == 0:
            return False
        # A level's threshold is the average reward needed to enter it
        threshold = self.levels[self.current_level + 1]['threshold']
        avg_reward = np.mean(recent_rewards)
        return avg_reward >= threshold
    def level_up(self, recent_rewards):
        """Raise the difficulty if the agent is ready."""
        if self.should_level_up(recent_rewards):
            self.current_level += 1
            print(f"Level up! Now at level {self.current_level}")
        return self.current_level
    def get_task_param(self):
        return self.levels[self.current_level]['task_param']
class GravityCurriculum:
    """Gravity curriculum: raise gravity step by step."""
    def __init__(self, env_name='HalfCheetahBulletEnv-v0'):
        self.env_name = env_name
        self.gravity_levels = [5.0, 10.0, 20.0, 30.0]
        self.current_gravity = self.gravity_levels[0]
        self.current_level = 0
    def make_env(self):
        import gym  # pybullet_envs registers its environments with the legacy gym package
        import pybullet_envs  # noqa: F401 (imported for its registration side effect)
        env = gym.make(self.env_name)
        # Set gravity. NOTE: `set_gravity` is a hypothetical hook; the stock PyBullet
        # environments do not expose it, so provide it through your own wrapper.
        env.unwrapped.set_gravity(self.current_gravity)
        return env
    def update(self, reward):
        """Raise gravity once the agent performs well enough."""
        if reward > 1000 and self.current_level < len(self.gravity_levels) - 1:
            self.current_level += 1
            self.current_gravity = self.gravity_levels[self.current_level]
            return True  # the caller should rebuild the environment
        return False
from collections import deque


class AutomaticCurriculum:
    """Automatic curriculum: adjust task difficulty from the agent's performance."""
    def __init__(self, env, min_difficulty=0.1, max_difficulty=1.0):
        self.env = env
        self.difficulty = min_difficulty
        self.min_difficulty = min_difficulty
        self.max_difficulty = max_difficulty
        # One rolling window of recent outcomes (1 = success, 0 = failure).
        # Two separate capped buffers would pin the rate at 0.5 once both fill up.
        self.outcomes = deque(maxlen=100)
    def update(self, success):
        """Record one episode outcome and update the difficulty."""
        self.outcomes.append(1 if success else 0)
        # Success rate over the recent window
        success_rate = sum(self.outcomes) / len(self.outcomes)
        # Adjust the difficulty
        if success_rate > 0.8 and self.difficulty < self.max_difficulty:
            self.difficulty = min(self.max_difficulty, self.difficulty * 1.1)
        elif success_rate < 0.2 and self.difficulty > self.min_difficulty:
            self.difficulty = max(self.min_difficulty, self.difficulty * 0.9)
        # Push the new difficulty to the environment (assumes it exposes `set_difficulty`)
        self.env.set_difficulty(self.difficulty)
        return self.difficulty
Normalization Techniques
Normalization matters a great deal in RL: training on quantities with very different scales causes problems.
Reward Normalization
import numpy as np


class RunningRewardNormalizer:
    """Online reward normalization."""
    def __init__(self, clip_range=(-10, 10), gamma=0.99):
        self.clip_range = clip_range
        self.gamma = gamma  # stored but not used by this class
        self.running_mean = 0.0
        # Despite the name, this holds the running SUM of squared deviations
        # (Welford's M2); the variance is running_var / count
        self.running_var = 1.0
        self.count = 1e-4
    def normalize(self, reward):
        # Welford's online algorithm
        self.count += 1
        delta = reward - self.running_mean
        self.running_mean += delta / self.count
        delta2 = reward - self.running_mean
        self.running_var += delta * delta2
        # Standard deviation
        std = np.sqrt(self.running_var / self.count)
        # Normalize
        normalized = (reward - self.running_mean) / (std + 1e-8)
        # Clip
        return np.clip(normalized, *self.clip_range)
    def denormalize(self, normalized_reward):
        """Invert the normalization (clipped values cannot be recovered exactly)."""
        std = np.sqrt(self.running_var / self.count)
        return normalized_reward * std + self.running_mean
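from collections import deque

import numpy as np


class ReturnsNormalizer:
    """Normalize using discounted returns."""
    def __init__(self, gamma=0.99, clip_range=(-10, 10)):
        self.returns = deque(maxlen=10000)
        self.gamma = gamma
        self.clip_range = clip_range
    def normalize(self, rewards, gamma=None):
        gamma = self.gamma if gamma is None else gamma
        # Discounted returns, computed backwards through the episode
        returns = []
        R = 0.0
        for r in reversed(rewards):
            R = r + gamma * R
            returns.append(R)
        returns.reverse()
        self.returns.extend(returns)
        # Normalize with statistics over the recent return history
        mean = np.mean(self.returns)
        std = np.std(self.returns)
        normalized_returns = [(r - mean) / (std + 1e-8) for r in returns]
        # Clip
        return [np.clip(r, *self.clip_range) for r in normalized_returns]
Observation Normalization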
import numpy as np


class RunningObsNormalizer:
    """Running observation normalization."""
    def __init__(self, shape):
        self.mean = np.zeros(shape)
        self.var = np.ones(shape)
        self.count = 1e-4
    def update(self, obs):
        """Update the statistics from a batch of observations with shape (batch, *shape)."""
        # Per-feature statistics: reduce over the batch axis only
        batch_mean = np.mean(obs, axis=0)
        batch_var = np.var(obs, axis=0)
        batch_count = len(obs)
        delta = batch_mean - self.mean
        total_count = self.count + batch_count
        self.mean += delta * batch_count / total_count
        # Parallel-variance formula for merging two sets of statistics
        self.var = (
            (self.count * self.var + batch_count * batch_var) / total_count +
            (delta ** 2) * self.count * batch_count / (total_count ** 2)
        )
        self.count = total_count
    def normalize(self, obs):
        return (obs - self.mean) / np.sqrt(self.var + 1e-8)
import torch.nn as nn


class BatchNormWrapper(nn.Module):
    """Batch normalization inside the network."""
    def __init__(self, input_dim):
        super().__init__()
        self.bn = nn.BatchNorm1d(input_dim)
    def forward(self, x):
        if len(x.shape) == 3:  # (batch, seq, dim)
            # BatchNorm1d expects (batch, dim, seq), so move the feature axis to position 1
            x = x.permute(0, 2, 1)
            x = self.bn(x)
            x = x.permute(0, 2, 1)
        else:
            x = self.bn(x)
        return x
Layer Norm vs Batch Norm
import torch
import torch.nn as nn


class PolicyWithNormalization(nn.Module):
    """Policy network with normalization layers."""
    def __init__(self, obs_dim, action_dim):
        super().__init__()
        # Input normalization
        self.obs_norm = nn.LayerNorm(obs_dim)
        # Hidden layers
        self.fc1 = nn.Linear(obs_dim, 256)
        self.fc2 = nn.Linear(256, 256)
        # Policy head
        self.mu = nn.Linear(256, action_dim)
        self.log_std = nn.Parameter(torch.zeros(action_dim))
        # Value head
        self.value_net = nn.Sequential(
            nn.Linear(256, 256),
            nn.LayerNorm(256),
            nn.Linear(256, 1)
        )
    def forward(self, x):
        x = self.obs_norm(x)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        mu = torch.tanh(self.mu(x))    # action mean squashed to [-1, 1]
        std = torch.exp(self.log_std)  # state-independent standard deviation
        value = self.value_net(x)
        return mu, std, value
Parallel Environment Training
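Poor sample efficiency is a long-standing problem in RL, and running environments in parallel is the standard way to speed up data collection.
Gymnasium VectorEnv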
import gymnasium as gym
import numpy as np
import torch.optim as optim
from gymnasium.vector import AsyncVectorEnv, SyncVectorEnv


def make_env(env_id, seed):
    """Return a function that builds one environment."""
    def thunk():
        env = gym.make(env_id)
        env.reset(seed=seed)
        return env
    return thunk


def parallel_envs(env_name='CartPole-v1', num_envs=8):
    """Create the vectorized environment."""
    env_fns = [make_env(env_name, i) for i in range(num_envs)]
    # Asynchronous workers (recommended); the 'fork' start method is unavailable on Windows
    env = AsyncVectorEnv(env_fns, context='fork', shared_memory=True)
    return env


def train_parallel(env_name='CartPole-v1', num_envs=8, total_steps=100000):
    """Parallel training loop.

    `ActorCritic` and `train_agent` are assumed to be defined elsewhere.
    """
    env = parallel_envs(env_name, num_envs)
    # Initialization
    obs_dim = env.single_observation_space.shape[0]
    action_dim = env.single_action_space.n
    agent = ActorCritic(obs_dim, action_dim)
    optimizer = optim.Adam(agent.parameters(), lr=3e-4)
    obs, _ = env.reset()
    running_returns = np.zeros(num_envs)  # return of the episode in progress, per env
    finished_returns = []                 # returns of completed episodes
    batch = []                            # transitions collected since the last update
    for step in range(0, total_steps, num_envs):
        # Collect experience
        actions = agent.select_actions(obs)  # shape: (num_envs,)
        next_obs, rewards, terminations, truncations, infos = env.step(actions)
        batch.append((obs, actions, rewards, next_obs, terminations))
        # Track rewards; only completed episodes enter the average
        running_returns += rewards
        for i in range(num_envs):
            if terminations[i] or truncations[i]:
                finished_returns.append(running_returns[i])
                running_returns[i] = 0.0
                if len(finished_returns) % 10 == 0:
                    avg = np.mean(finished_returns[-10:])
                    print(f"Step {step}: Avg reward = {avg:.1f}")
        obs = next_obs
        # Train every 128 environment steps
        if step % 128 == 0:
            train_agent(agent, optimizer, batch)
            batch = []
SubprocVecEnv (Stable-Baselines3 style)
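import multiprocessing

import numpy as np


def worker(conn, env_fn):
    """Run one environment in a child process and serve commands from the pipe."""
    env = env_fn()
    while True:
        cmd, data = conn.recv()
        if cmd == 'reset':
            obs, _ = env.reset()
            conn.send(obs)
        elif cmd == 'step':
            obs, reward, terminated, truncated, info = env.step(data)
            done = terminated or truncated
            if done:
                obs, _ = env.reset()  # auto-reset so the caller always gets a valid observation
            conn.send((obs, reward, done, info))
        elif cmd == 'close':
            env.close()
            conn.close()
            break


class SubprocVecEnv:
    """Multi-process vectorized environment (simplified)."""
    def __init__(self, env_fns, context='fork'):
        self.num_envs = len(env_fns)
        # 'fork' is fastest on Linux; with 'spawn', every env_fn must be picklable
        ctx = multiprocessing.get_context(context)
        self.parent_pipes = []
        self.processes = []
        for fn in env_fns:
            parent_conn, child_conn = ctx.Pipe()
            p = ctx.Process(target=worker, args=(child_conn, fn), daemon=True)
            p.start()
            child_conn.close()  # the parent only needs its own end of the pipe
            self.parent_pipes.append(parent_conn)
            self.processes.append(p)
    def reset(self):
        for pipe in self.parent_pipes:
            pipe.send(('reset', None))
        return np.array([pipe.recv() for pipe in self.parent_pipes])
    def step(self, actions):
        for pipe, action in zip(self.parent_pipes, actions):
            pipe.send(('step', action))
        results = [pipe.recv() for pipe in self.parent_pipes]
        obs = np.array([r[0] for r in results])
        rewards = np.array([r[1] for r in results])
        dones = np.array([r[2] for r in results])
        infos = [r[3] for r in results]
        return obs, rewards, dones, infos
    def close(self):
        for pipe in self.parent_pipes:
            pipe.send(('close', None))
        for p in self.processes:
            p.join()
Training Stability Tricks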
Gradient Clipping
import torch.nn as nn
import torch.nn.functional as F


def train_step(agent, optimizer, batch, max_grad_norm=0.5):
    states, actions, returns, advantages = batch
    # Forward pass
    values = agent.get_value(states)  # assumed to have the same shape as `returns`
    log_probs = agent.get_log_prob(states, actions)
    # Losses
    policy_loss = -(log_probs * advantages).mean()
    value_loss = F.mse_loss(values, returns)
    loss = policy_loss + 0.5 * value_loss
    # Backward pass with gradient clipping
    optimizer.zero_grad()
    loss.backward()
    # Key step: rescale gradients so their global L2 norm is at most max_grad_norm
    nn.utils.clip_grad_norm_(agent.parameters(), max_grad_norm)
    optimizer.step()
    return loss.item()
Target Networks
class DQNWithTarget:
    """`QNetwork` is assumed to be an nn.Module defined elsewhere."""
    def __init__(self, state_dim, action_dim):
        self.q_network = QNetwork(state_dim, action_dim)
        self.target_network = QNetwork(state_dim, action_dim)
        self.target_network.load_state_dict(self.q_network.state_dict())
    def update_target(self, tau=0.005):
        """Soft update: move the target a fraction tau toward the online network."""
        for target, source in zip(
            self.target_network.parameters(),
            self.q_network.parameters()
        ):
            target.data.copy_(tau * source.data + (1 - tau) * target.data)
    def hard_update_target(self):
        """Hard update: copy the online network into the target."""
        self.target_network.load_state_dict(self.q_network.state_dict())
Experience Replay
import numpy as np


class PrioritizedReplayBuffer:
    """Prioritized experience replay."""
    def __init__(self, capacity=100000, alpha=0.6, beta=0.4):
        self.capacity = capacity
        self.alpha = alpha  # how strongly priorities skew sampling (0 = uniform)
        self.beta = beta    # strength of the importance-sampling correction
        self.buffer = []
        self.priorities = []
        self.position = 0
    def push(self, state, action, reward, next_state, done):
        # New transitions get the current maximum priority so they are sampled at least once
        max_priority = max(self.priorities) if self.priorities else 1.0
        if len(self.buffer) < self.capacity:
            self.buffer.append((state, action, reward, next_state, done))
            self.priorities.append(max_priority)
        else:
            self.buffer[self.position] = (state, action, reward, next_state, done)
            self.priorities[self.position] = max_priority
        self.position = (self.position + 1) % self.capacity
    def sample(self, batch_size):
        # Sampling probabilities
        probs = np.array(self.priorities) ** self.alpha
        probs /= probs.sum()
        # Sample indices
        indices = np.random.choice(len(self.buffer), batch_size, p=probs)
        # Importance-sampling weights, normalized by the largest weight in the batch
        weights = (len(self.buffer) * probs[indices]) ** (-self.beta)
        weights /= weights.max()
        batch = [self.buffer[i] for i in indices]
        return batch, indices, weights
    def update_priorities(self, indices, td_errors):
        for idx, td_error in zip(indices, td_errors):
            self.priorities[idx] = abs(td_error) + 1e-5  # keep every priority strictly positive
Hands-On Code: Fast Training with Stable-Baselines3
Stable-Baselines3 (SB3) is one of the most convenient RL libraries available today, and its algorithm implementations are stable and reliable.
import gymnasium as gym
import numpy as np
import optuna
from stable_baselines3 import PPO, SAC, TD3, DQN
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize
from stable_baselines3.common.callbacks import EvalCallback, CheckpointCallback
from stable_baselines3.common.monitor import Monitor


def make_env(env_id, seed=0):
    """Return a function that builds a monitored environment."""
    def thunk():
        env = gym.make(env_id)
        env = Monitor(env)  # records episode returns and lengths
        env.reset(seed=seed)
        return env
    return thunk
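def train_ppo(env_id='CartPole-v1', total_timesteps=100000):
    """Train with PPO."""
    from stable_baselines3.common.evaluation import evaluate_policy
    from stable_baselines3.common.vec_env import sync_envs_normalization
    # Create the training environment
    env = DummyVecEnv([make_env(env_id)])
    env = VecNormalize(env, norm_obs=True, norm_reward=True)
    # Create the PPO model
    model = PPO(
        'MlpPolicy',
        env,
        learning_rate=3e-4,
        n_steps=2048,
        batch_size=64,
        n_epochs=10,
        gamma=0.99,
        gae_lambda=0.95,
        clip_range=0.2,
        ent_coef=0.0,
        verbose=1,
        tensorboard_log='./logs/ppo/'
    )
    # Callbacks. The evaluation env is normalized the same way as the training env,
    # but with frozen statistics and raw (unnormalized) rewards.
    eval_env = DummyVecEnv([make_env(env_id, seed=42)])
    eval_env = VecNormalize(eval_env, norm_obs=True, norm_reward=False, training=False)
    eval_callback = EvalCallback(
        eval_env,
        best_model_save_path='./models/ppo/',
        log_path='./logs/ppo/',
        eval_freq=1000,
        deterministic=True,
        render=False
    )
    checkpoint_callback = CheckpointCallback(
        save_freq=10000,
        save_path='./models/ppo_checkpoints/',
        name_prefix='ppo_model'
    )
    # Train (progress_bar=True needs the optional tqdm and rich packages)
    model.learn(
        total_timesteps=total_timesteps,
        callback=[eval_callback, checkpoint_callback],
        progress_bar=True
    )
    # Save the model together with the normalization statistics it depends on
    model.save('ppo_cartpole')
    env.save('ppo_cartpole_vecnormalize.pkl')
    # Evaluate on the normalized env so observations match what the policy saw in training
    sync_envs_normalization(env, eval_env)
    mean_reward, std_reward = evaluate_policy(model, eval_env, n_eval_episodes=10)
    print(f"Mean reward: {mean_reward:.2f} +/- {std_reward:.2f}")
    return model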
def train_sac(env_id='HalfCheetah-v4', total_timesteps=500000):
    """Train a continuous-control task with SAC (HalfCheetah-v4 requires MuJoCo)."""
    env = DummyVecEnv([make_env(env_id)])
    env = VecNormalize(env, norm_obs=True, norm_reward=False)
    model = SAC(
        'MlpPolicy',
        env,
        learning_rate=3e-4,
        buffer_size=1000000,
        learning_starts=1000,
        batch_size=256,
        tau=0.005,
        gamma=0.99,
        train_freq=1,
        gradient_steps=1,
        verbose=1,
        tensorboard_log='./logs/sac/'
    )
    model.learn(total_timesteps=total_timesteps, progress_bar=True)
    model.save('sac_halfcheetah')
    return model
def hyperparameter_search(env_id='CartPole-v1', n_trials=20):
    """Hyperparameter search with Optuna."""
    def objective(trial):
        params = {
            'learning_rate': trial.suggest_float('learning_rate', 1e-5, 1e-3, log=True),
            'n_steps': trial.suggest_categorical('n_steps', [64, 128, 256, 512, 1024]),
            'batch_size': trial.suggest_categorical('batch_size', [32, 64, 128]),
            'n_epochs': trial.suggest_categorical('n_epochs', [3, 5, 10]),
            'gamma': trial.suggest_float('gamma', 0.9, 0.999),
            'gae_lambda': trial.suggest_float('gae_lambda', 0.9, 0.99),
            'clip_range': trial.suggest_float('clip_range', 0.1, 0.3),
            'ent_coef': trial.suggest_float('ent_coef', 0.0, 0.1),
        }
        # Create the environment
        env = DummyVecEnv([make_env(env_id)])
        # Create the model
        model = PPO('MlpPolicy', env, **params, verbose=0)
        # Train on a shortened budget
        model.learn(total_timesteps=50000, progress_bar=False)
        # Evaluate
        eval_rewards = evaluate_model(model, env_id, n_episodes=10)
        mean_reward = np.mean(eval_rewards)
        # Clean up
        model.env.close()
        del model
        return mean_reward
    study = optuna.create_study(direction='maximize')
    study.optimize(objective, n_trials=n_trials, show_progress_bar=True)
    print(f"Best params: {study.best_params}")
    print(f"Best reward: {study.best_value:.2f}")
    return study.best_params
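def evaluate_model(model, env_id, n_episodes=10):
    """Evaluate a model on a fresh, unnormalized environment.

    Only valid for models trained without VecNormalize; otherwise the policy
    would see observations on a different scale than during training.
    """
    env = gym.make(env_id)
    rewards = []
    for _ in range(n_episodes):
        obs, _ = env.reset()
        episode_reward = 0
        done = False
        while not done:
            action, _ = model.predict(obs, deterministic=True)
            obs, reward, terminated, truncated, _ = env.step(action)
            episode_reward += reward
            done = terminated or truncated
        rewards.append(episode_reward)
    env.close()
    return rewards
Hands-On Code: Large-Scale Distributed Training with Ray/RLlib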
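When a single machine is no longer enough, it is time for distributed training. Ray/RLlib is one of the most widely used frameworks for large-scale RL training.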
import gymnasium as gym
import ray
from ray import tune
from ray.rllib.algorithms.ppo import PPO
from ray.tune.registry import register_env

# Initialize Ray
ray.init(address='auto')  # connect to an existing cluster


def create_env(env_config):
    """Build the environment."""
    return gym.make(env_config['env_name'])


# Register the environment
register_env('MyEnv-v0', create_env)


def distributed_ppo_training():
    """Distributed PPO training.

    Uses RLlib's dict-based config from the Ray 2.x legacy API stack. Key names
    differ between Ray versions, so check them against your installed release.
    """
    config = {
        'env': 'MyEnv-v0',
        'env_config': {'env_name': 'HalfCheetah-v4'},
        # Framework settings
        'framework': 'torch',
        'num_gpus': 0.5,             # GPUs for the driver (learner) process
        'num_gpus_per_worker': 0.5,  # GPUs for each rollout worker
        # Parallelism
        'num_workers': 8,
        'num_envs_per_worker': 8,
        # Model settings
        'model': {
            'fcnet_hiddens': [256, 256],
            'fcnet_activation': 'tanh',
        },
        # Training settings
        'lr': 3e-4,
        'gamma': 0.99,
        'lambda': 0.95,
        'kl_target': 0.01,
        'clip_param': 0.2,
        'train_batch_size': 2048,
        'sgd_minibatch_size': 128,
        'num_sgd_iter': 10,
        # Resource settings
        'num_cpus_per_worker': 2,
        'num_cpus_for_driver': 2,
    }
    # Build the algorithm (an Algorithm, not a Policy, owns the train loop)
    trainer = PPO(config=config)
    # Train
    for i in range(1000):
        result = trainer.train()
        if i % 10 == 0:
            print(f"Step {i}: reward = {result['episode_reward_mean']:.2f}")
        # Save a checkpoint
        if i % 100 == 0:
            trainer.save('./checkpoints/ppo_' + str(i))
    return trainer
def tune_hyperparameter_search():
    """Hyperparameter search with Ray Tune."""
    from ray import tune
    from ray.tune.schedulers import ASHAScheduler
    config = {
        'env': 'CartPole-v1',
        'framework': 'torch',
        'num_workers': 4,
        # Search space
        'lr': tune.loguniform(1e-5, 1e-3),
        'gamma': tune.uniform(0.95, 0.999),
        'clip_param': tune.uniform(0.1, 0.3),
        'lambda': tune.uniform(0.9, 0.99),
        'entropy_coeff': tune.loguniform(1e-5, 1e-2),
        'kl_target': tune.uniform(0.005, 0.05),
    }
    # Scheduler: ASHA stops poorly performing trials early
    scheduler = ASHAScheduler(
        time_attr='training_iteration',
        metric='episode_reward_mean',
        mode='max',
        max_t=100,
        grace_period=10,
    )
    # Run the search
    analysis = tune.run(
        'PPO',
        config=config,
        num_samples=50,  # 50 trials in total, run in parallel as resources allow
        scheduler=scheduler,
        checkpoint_at_end=True,
        storage_path='./tune_results/',
    )
    # Print the best configuration
    best_config = analysis.get_best_config(metric='episode_reward_mean', mode='max')
    print(f"Best config: {best_config}")
    return analysis
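def multi_agent_training():
    """Multi-agent training.

    Assumes a multi-agent environment has been registered under the name
    'multiagent_cartpole'; it is not defined in this file.
    """
    import gymnasium as gym
    from ray.rllib.algorithms.ppo import PPO
    config = {
        'env': 'multiagent_cartpole',
        'env_config': {
            'num_agents': 4,
        },
        'multiagent': {
            # Each agent has its own policy
            'policies': {
                f'agent_{i}': (
                    None,
                    gym.spaces.Box(low=-1, high=1, shape=(4,)),
                    gym.spaces.Discrete(2),
                    {}
                ) for i in range(4)
            },
            # Policy mapping; RLlib passes extra arguments, which are ignored here
            'policy_mapping_fn': lambda agent_id, *args, **kwargs: agent_id,
        },
        'num_workers': 4,
    }
    trainer = PPO(config=config)
    for i in range(100):
        result = trainer.train()
        print(f"Step {i}: reward = {result['episode_reward_mean']:.2f}")
    return trainer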
if __name__ == '__main__':
    # Single-machine training
    model = train_ppo('CartPole-v1', total_timesteps=50000)
    # Distributed training (requires a Ray cluster)
    # distributed_ppo_training()
    # Hyperparameter search
    # best_params = tune_hyperparameter_search()
Recommended Hyperparameters
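Environments differ a lot, but some rule-of-thumb values carry over:
Recommended settings by environment type
| Environment type | Examples | Learning rate | Batch size | Discount factor | Network (hidden units) |
|---|---|---|---|---|---|
| Discrete control | CartPole, LunarLander | 3e-4 | 64-256 | 0.99 | 64-128 |
| Continuous control | HalfCheetah, Walker | 3e-4 | 256-1024 | 0.99 | 256-512 |
| Complex vision | Atari, VizDoom | 2.5e-4 | 32-64 | 0.99 | CNN |
| Sparse reward | MineRL, RoboLearn | 1e-4 | 256-512 | 0.995 | 512+ |
Recommended PPO configuration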
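# Keys follow the keyword arguments of Stable-Baselines3's PPO
PPO_CONFIG = {
    # Learning rate
    'learning_rate': 3e-4,  # or a linear schedule from 3e-4 down to 0
    # Collection and updates
    'n_steps': 2048,   # steps collected per environment before each update
    'batch_size': 64,  # minibatch size
    'n_epochs': 10,    # optimization epochs per update
    # GAE
    'gamma': 0.99,
    'gae_lambda': 0.95,
    # PPO-specific
    'clip_range': 0.2,      # PPO clipping range
    'clip_range_vf': None,  # value-function clipping (disabled)
    # Entropy regularization
    'ent_coef': 0.01,  # encourages exploration
    # Network and optimizer settings are passed through policy_kwargs
    'policy_kwargs': {
        'net_arch': dict(pi=[256, 256], vf=[256, 256]),
        'optimizer_kwargs': {'eps': 1e-5},  # Adam epsilon
    },
}
# Recommended SAC configuration
SAC_CONFIG = {
    'learning_rate': 3e-4,
    'buffer_size': 1000000,
    'learning_starts': 1000,
    'batch_size': 256,
    'tau': 0.005,  # target-network update rate
    'gamma': 0.99,
    'train_freq': 1,
    'gradient_steps': 1,
    'ent_coef': 'auto',  # tune the entropy coefficient automatically
}
Learning-Rate Schedules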
import numpy as np


def linear_schedule(initial_value, final_value=0.0):
    """Linear annealing. SB3 calls the schedule with the remaining progress, which goes from 1 to 0."""
    def func(progress_remaining):
        return final_value + (initial_value - final_value) * progress_remaining
    return func


def cosine_schedule(initial_value, final_value=0.0, total_steps=1000000):
    """Cosine annealing as a function of the step count (not an SB3-style schedule)."""
    def func(step):
        progress = min(step / total_steps, 1.0)
        return final_value + (initial_value - final_value) * (1 + np.cos(np.pi * progress)) / 2
    return func


# PPO + linear schedule
model = PPO(
    'MlpPolicy',
    env,
    learning_rate=linear_schedule(3e-4, 0.0),
    clip_range=linear_schedule(0.2, 0.0),
    ...
)
Summary
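Tuning RL is a craft learned through experience, and there is no universal formula. Here are my hard-won lessons:
Key principles:
- Run a baseline first: get things working with SB3's default parameters, then tune
- Change one parameter at a time: with too many variables you cannot tell which one mattered
- Run several seeds: RL is sensitive to the random seed, so report mean ± std
- Monitor everything: reward, value loss, policy entropy, gradient norm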
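To make the last principle concrete, here is a minimal, framework-free sketch of two of those signals. The function names and the flat-list representation of gradients are illustrative assumptions; in a PyTorch training loop you would read the same quantities from the action distribution and the parameter gradients.

```python
import math


def policy_entropy(probs):
    """Shannon entropy (in nats) of a discrete action distribution."""
    return -sum(p * math.log(p) for p in probs if p > 0.0)


def global_grad_norm(grads):
    """L2 norm over all gradient values, given one flat list per parameter."""
    return math.sqrt(sum(g * g for grad in grads for g in grad))


def make_log_record(step, episode_return, value_loss, probs, grads):
    """Bundle the four monitored signals into one record for logging."""
    return {
        'step': step,
        'episode_return': episode_return,
        'value_loss': value_loss,
        'policy_entropy': policy_entropy(probs),
        'grad_norm': global_grad_norm(grads),
    }


record = make_log_record(100, 21.0, 0.35, [0.5, 0.5], [[3.0], [4.0]])
```

Entropy falling toward zero early in training usually means the policy has stopped exploring, and a gradient norm that suddenly spikes is often the first visible sign of an unstable update.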
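Troubleshooting common problems:
- Training does not converge → lower the learning rate and add regularization
- Policy collapse → add entropy regularization and check the reward shaping
- Inaccurate value estimates → train the critic separately and update it more often
- Low sample efficiency → use parallel environments and cut the time spent on environment interaction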
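Catching these failure modes early is easier with an automatic check. The sketch below flags a run whose smoothed return falls far below its own best value; the class name, the window size, and the 50% drop threshold are illustrative assumptions rather than recommended settings, and the ratio test assumes returns are positive.

```python
from collections import deque


class CollapseDetector:
    """Flag a run whose smoothed return falls far below its own best value."""

    def __init__(self, window=100, drop_fraction=0.5):
        self.returns = deque(maxlen=window)
        self.drop_fraction = drop_fraction
        self.best_avg = None

    def update(self, episode_return):
        """Record one episode return; return True if the run looks collapsed."""
        self.returns.append(episode_return)
        if len(self.returns) < self.returns.maxlen:
            return False  # not enough data to smooth yet
        avg = sum(self.returns) / len(self.returns)
        if self.best_avg is None or avg > self.best_avg:
            self.best_avg = avg
            return False
        # Assumes positive returns; for negative returns compare differences instead
        return avg < self.drop_fraction * self.best_avg


detector = CollapseDetector(window=3, drop_fraction=0.5)
flags = [detector.update(r) for r in [10, 10, 10, 1, 1]]
```

When the flag fires, a reasonable first response is to reload the last good checkpoint and inspect the entropy and gradient-norm logs around the drop.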
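Recommended tools:
- stable-baselines3: quickly validating ideas
- Ray/RLlib: large-scale distributed training
- Optuna: hyperparameter search
- Weights & Biases: experiment tracking
Tuning RL really can feel like a black art, but as experience accumulates you will gradually build an intuition for the algorithms. Keep experimenting, read plenty of code, and write down what you learn, and in time you too will become good at tuning RL.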