MDP与Bellman方程详解
关键词速览
| 核心概念 | 状态空间 | 动作空间 | 转移概率 | 即时奖励 |
|---|---|---|---|---|
| 策略函数 | 价值函数 | Q函数 | 最优策略 | 累积折扣回报 |
核心关键词表

| 术语 | 英文 | 符号 | 说明 |
|---|---|---|---|
| 马尔可夫决策过程 | MDP | $(\mathcal{S}, \mathcal{A}, P, R, \gamma)$ | 序列决策问题的数学框架 |
| 状态 | State | $s \in \mathcal{S}$ | 智能体对环境的感知 |
| 动作 | Action | $a \in \mathcal{A}$ | 智能体的决策行为 |
| 奖励 | Reward | $r = R(s,a)$ | 环境对动作的即时反馈 |
| 转移概率 | Transition | $P(s'\mid s,a)$ | 状态转移的条件概率 |
| 策略 | Policy | $\pi(a\mid s)$ | 状态到动作的映射 |
| 价值函数 | Value Function | $V^\pi(s)$ | 状态的价值评估 |
| Q函数 | Action-Value | $Q^\pi(s,a)$ | 状态-动作对的价值评估 |
| 折扣因子 | Discount Factor | $\gamma \in [0,1]$ | 未来奖励的重要性衰减系数 |
| 最优价值函数 | Optimal Value | $V^*(s)$ | 所有策略中的最大价值函数 |
| 最优Q函数 | Optimal Q | $Q^*(s,a)$ | 最优策略下的Q值 |
一、马尔可夫决策过程(MDP)定义
马尔可夫决策过程(Markov Decision Process, MDP)是强化学习的理论基础,为序贯决策问题提供了优雅的数学描述框架。MDP的核心假设是马尔可夫性质:给定当前状态,未来状态的条件概率分布仅依赖于当前状态,而与历史状态序列无关。这一特性使得MDP能够捕捉环境动态的精髓,同时保持数学处理的简洁性。
MDP由五元组 $(\mathcal{S}, \mathcal{A}, P, R, \gamma)$ 完整定义,其中 $\mathcal{S}$ 表示状态空间,$\mathcal{A}$ 表示动作空间,$P(s'\mid s,a)$ 定义了状态转移概率函数,$R(s,a)$ 是奖励函数,$\gamma \in [0,1)$ 为折扣因子。状态空间可以是离散的有限集合,也可以是连续的无限空间,如机器人的位置坐标。动作空间同样可以是离散的(上下左右移动)或连续的(关节角度、力矩大小)。
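为了便于直观理解,下面用一个极简的Python结构示意这一五元组(仅作说明,并非本文后续代码统一采用的接口):

```python
from dataclasses import dataclass
import numpy as np

@dataclass
class TabularMDP:
    """离散MDP五元组 (S, A, P, R, γ) 的一个最小表示."""
    n_states: int        # |S|
    n_actions: int       # |A|
    P: np.ndarray        # 形状 (|S|, |A|, |S|),P[s, a, s'] = P(s'|s, a)
    R: np.ndarray        # 形状 (|S|, |A|),即时奖励 R(s, a)
    gamma: float = 0.99  # 折扣因子

    def __post_init__(self):
        # 每个 (s, a) 的转移分布必须对 s' 归一化
        assert np.allclose(self.P.sum(axis=-1), 1.0), "转移概率必须对 s' 求和为 1"
```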
1.1 状态与状态空间
状态 $s \in \mathcal{S}$ 是智能体对环境当前情况的完整描述。在理想情况下,状态应包含做出最优决策所需的全部信息,即满足充分性原则。实际应用中,状态可能是原始感知数据(如像素值),也可能是经过特征工程提取的高级特征。状态空间 $\mathcal{S}$ 的选择直接影响学习算法的难度——过于简化的状态空间可能丢失关键信息,而过于复杂的状态空间则会导致维度灾难。
状态设计的艺术
1.2 动作与动作空间
动作 $a \in \mathcal{A}$ 是智能体在给定状态下可以采取的行为。动作空间同样分为离散型和连续型。离散动作空间如游戏中的上下左右移动、围棋中的落子位置;连续动作空间如机械臂关节的角度控制、汽车的转向角和加速度。动作空间的选择涉及控制粒度与计算复杂度的权衡。
1.3 奖励机制
奖励函数 $R(s,a)$ 是智能体与环境交互的反馈信号,定义了"什么是好的行为"。奖励可以是即时的(每一步获得 $r_t$),也可以是延迟的(仅在任务结束时获得)。奖励设计是强化学习中的关键环节,不当的奖励设计可能导致智能体发现意想不到的"作弊"行为。Shane Legg等人的研究表明,奖励函数设计需要考虑激励相容性(Incentive Compatibility)问题。
二、策略(Policy)
策略 $\pi$ 是智能体的核心,决定了在每个状态下应采取何种动作。策略可以是确定性的 $a = \pi(s)$,也可以是随机性的 $\pi(a\mid s)$。确定性策略给出的是具体的动作选择,而随机策略输出的是各动作的概率分布。
2.1 确定性策略
确定性策略 $a = \pi(s)$ 直接指定每个状态对应的动作。这种策略形式简单,在实际应用中计算效率高,特别适用于离散动作空间的确定性问题。
2.2 随机性策略
随机策略 $\pi(a\mid s)$ 提供了动作选择的概率分布。随机策略在以下场景中尤为重要:
- 部分可观测环境(POMDP):当智能体无法完全观测环境状态时
- 探索-利用平衡:如Q学习中的ε-greedy策略
- 连续动作空间:无法枚举所有动作,必须用概率分布表示
策略选择原则
确定性策略适合确定性环境和需要精确控制的应用;随机策略适合需要探索的复杂环境和连续动作空间问题。
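作为对比示意,下面用查表数组表示确定性策略,用softmax分布表示随机策略(示意代码,数值为随意假设):

```python
import numpy as np

n_states, n_actions = 4, 3
rng = np.random.default_rng(0)

# 确定性策略:每个状态直接对应一个动作,a = π(s)
deterministic_policy = np.array([0, 2, 1, 0])

# 随机策略:每个状态给出动作的概率分布 π(a|s),这里用softmax参数化
logits = rng.normal(size=(n_states, n_actions))
stochastic_policy = np.exp(logits) / np.exp(logits).sum(axis=1, keepdims=True)

# 确定性策略:状态1下总是选动作2;随机策略:状态1下按概率采样
print(deterministic_policy[1])
print(rng.choice(n_actions, p=stochastic_policy[1]))
```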
三、价值函数与Q函数
价值函数是评估策略优劣的核心工具,定义了从长期角度看”什么是好的状态”或”什么是好的状态-动作对”。
3.1 累积折扣回报
智能体的目标是最大化累积折扣回报(Cumulative Discounted Return):
$$G_t = \sum_{k=0}^{\infty} \gamma^k\, r_{t+k+1}$$
折扣因子 $\gamma \in [0,1]$ 体现了未来奖励的重要性。$\gamma = 0$ 时,智能体只关注即时奖励;$\gamma$ 接近 $1$ 时,智能体更重视长期回报。折扣因子还保证了无限时域(infinite horizon)下回报的有限性。
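折扣回报可以按定义逐项累加,下面给出一个简短的数值示意:

```python
def discounted_return(rewards, gamma=0.9):
    """按 G_t = Σ_k γ^k r_{t+k+1} 计算一条轨迹从 t=0 开始的折扣回报."""
    G = 0.0
    for k, r in enumerate(rewards):
        G += (gamma ** k) * r
    return G

# 例:奖励序列 [1, 1, 1, 10],γ=0.9
# G = 1 + 0.9 + 0.81 + 0.729*10 ≈ 10.0
print(discounted_return([1, 1, 1, 10], gamma=0.9))
```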
3.2 状态价值函数
状态价值函数 $V^\pi(s)$ 定义为从状态 $s$ 开始,遵循策略 $\pi$ 获得的期望累积折扣回报:
$$V^\pi(s) = \mathbb{E}_\pi\big[G_t \mid s_t = s\big]$$
$V^\pi(s)$ 回答了"如果我处于状态 $s$,平均而言我能获得多少回报"这一根本问题。
3.3 动作价值函数(Q函数)
动作价值函数 $Q^\pi(s,a)$ 定义为从状态 $s$ 执行动作 $a$ 后,遵循策略 $\pi$ 获得的期望累积折扣回报:
$$Q^\pi(s,a) = \mathbb{E}_\pi\big[G_t \mid s_t = s,\, a_t = a\big]$$
$Q^\pi(s,a)$ 回答了"在状态 $s$ 下采取动作 $a$,之后遵循策略 $\pi$,平均而言能获得多少回报"。
3.4 价值函数与Q函数的转换
两种价值函数之间存在明确的数学关系:
$$V^\pi(s) = \sum_{a} \pi(a\mid s)\, Q^\pi(s,a)$$
这表明状态价值是所有可能动作价值的加权平均,权重为策略 $\pi$ 选择各动作的概率。
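这一关系可以用一个小的数值例子直观验证(数值为假设):

```python
import numpy as np

pi_s = np.array([0.2, 0.5, 0.3])   # π(a|s):三个动作的选择概率
q_s = np.array([1.0, 4.0, -2.0])   # Q^π(s, a):对应的动作价值

v_s = np.dot(pi_s, q_s)            # V^π(s) = Σ_a π(a|s) Q^π(s, a)
print(v_s)                         # 0.2*1 + 0.5*4 + 0.3*(-2) = 1.6
```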
四、Bellman方程推导
Bellman方程是动态规划的数学基础,揭示了价值函数内部的递归结构。这一方程由Richard Bellman在1950年代提出,是强化学习理论的基石。
4.1 Bellman期望方程
将累积回报的定义展开,可以得到价值函数的递归形式——Bellman期望方程:
$$V^\pi(s) = \sum_{a} \pi(a\mid s) \sum_{s'} P(s'\mid s,a)\,\big[R(s,a) + \gamma V^\pi(s')\big]$$
类似地,Q函数的Bellman期望方程为:
$$Q^\pi(s,a) = R(s,a) + \gamma \sum_{s'} P(s'\mid s,a) \sum_{a'} \pi(a'\mid s')\, Q^\pi(s',a')$$
Bellman方程的直观理解
Bellman方程的核心思想是价值的递归分解:一个策略在状态 $s$ 下的价值,等于立即获得的期望奖励加上折扣后的后继状态价值。这一递归结构与最优性原理一脉相承,使得我们可以用迭代方法求解价值函数。
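基于这一递归结构,可以对固定策略做迭代式策略评估:反复施加Bellman期望备份直到收敛。下面是一个针对表格型MDP的简要示意(假设P为形状(|S|,|A|,|S|)的转移张量、R为(|S|,|A|)的奖励矩阵、pi为(|S|,|A|)的策略矩阵,均为numpy数组):

```python
import numpy as np

def policy_evaluation(P, R, pi, gamma=0.99, theta=1e-8):
    """迭代策略评估:反复应用Bellman期望算子直到收敛."""
    n_states = P.shape[0]
    V = np.zeros(n_states)
    while True:
        # Bellman期望备份:V(s) = Σ_a π(a|s) [R(s,a) + γ Σ_s' P(s'|s,a) V(s')]
        Q = R + gamma * P @ V            # 形状 (|S|, |A|)
        V_new = (pi * Q).sum(axis=1)
        if np.max(np.abs(V_new - V)) < theta:
            return V_new
        V = V_new
```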
4.2 Bellman最优方程
当目标是寻找最优策略时,价值函数满足Bellman最优方程。对于最优价值函数 $V^*$:
$$V^*(s) = \max_{a} \sum_{s'} P(s'\mid s,a)\,\big[R(s,a) + \gamma V^*(s')\big]$$
对于最优Q函数 $Q^*$:
$$Q^*(s,a) = R(s,a) + \gamma \sum_{s'} P(s'\mid s,a)\, \max_{a'} Q^*(s',a')$$
最优价值函数 $V^*$ 和最优Q函数 $Q^*$ 满足以下关系:
$$V^*(s) = \max_{a} Q^*(s,a), \qquad Q^*(s,a) = R(s,a) + \gamma \sum_{s'} P(s'\mid s,a)\, V^*(s')$$
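由上述关系可知,一旦得到 $Q^*$,逐状态取贪婪动作即可导出一个最优(确定性)策略。下面是一个小示意(数值为假设):

```python
import numpy as np

def greedy_policy_from_q(Q_star):
    """π*(s) = argmax_a Q*(s, a):对每个状态取贪婪动作."""
    return np.argmax(Q_star, axis=1)

Q_star = np.array([[0.5, 1.2],    # 状态0:动作1更优
                   [2.0, 0.3]])   # 状态1:动作0更优
print(greedy_policy_from_q(Q_star))  # [1 0]
```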
五、最优性原理与最优策略
5.1 最优性原理
最优性原理(Principle of Optimality)由Bellman提出,其核心断言是:一个最优策略具有这样的性质——无论初始状态和初始动作如何,接下来的决策必定构成针对新状态的最优策略。这为逆推法(Backward Induction)提供了理论基础。
5.2 最优策略的存在性
对于有限状态空间的MDP,最优策略必定存在。对于连续空间,最优策略的存在性需要额外的技术条件。在无限时域(infinite horizon)折扣MDP中,如果状态空间和动作空间都是有限的,则必定存在确定性的最优策略。
5.3 最优价值函数的唯一性
虽然最优策略可能不唯一,但最优价值函数 $V^*$ 和最优Q函数 $Q^*$ 是唯一的。任何达到最优价值函数的策略都是最优策略。
六、MDP求解方法概述
6.1 动态规划方法
当环境模型($P$ 和 $R$)已知时,可以使用经典的动态规划方法(两者的示意实现见列表之后的代码):
- 策略迭代(Policy Iteration):交替进行策略评估和策略改进
- 价值迭代(Value Iteration):直接迭代求解Bellman最优方程
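下面给出两种方法的简要示意实现(假设P、R分别为形状(|S|,|A|,|S|)与(|S|,|A|)的numpy数组,仅展示算法骨架):

```python
import numpy as np

def value_iteration(P, R, gamma=0.99, theta=1e-8):
    """价值迭代:反复套用Bellman最优备份."""
    V = np.zeros(P.shape[0])
    while True:
        Q = R + gamma * P @ V                  # (|S|, |A|)
        V_new = Q.max(axis=1)
        if np.max(np.abs(V_new - V)) < theta:
            return V_new, Q.argmax(axis=1)     # 最优价值与贪婪策略
        V = V_new

def policy_iteration(P, R, gamma=0.99):
    """策略迭代:策略评估(解线性方程)与策略改进交替进行."""
    n_states, n_actions = R.shape
    policy = np.zeros(n_states, dtype=int)
    while True:
        # 策略评估:V = (I - γ P_π)^(-1) r_π
        P_pi = P[np.arange(n_states), policy]
        r_pi = R[np.arange(n_states), policy]
        V = np.linalg.solve(np.eye(n_states) - gamma * P_pi, r_pi)
        # 策略改进:对Q取贪婪
        new_policy = (R + gamma * P @ V).argmax(axis=1)
        if np.array_equal(new_policy, policy):
            return V, policy
        policy = new_policy
```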
6.2 无模型方法
当环境模型未知时,需要使用无模型(model-free)方法,包括:
- 蒙特卡洛方法:基于完整回合的采样估计价值
- 时序差分方法:如Q学习、SARSA
- 策略梯度方法:直接对参数化策略进行优化
这些方法将在后续章节详细讨论。
冰湖环境示例
在经典的FrozenLake-v0环境中:
- 状态空间 $\mathcal{S}$:网格中的各个位置(起点、冰面、洞、目标)
- $P$ 和 $R$ 定义了状态转移和奖励规则
- 智能体需要学习到达目标的最优策略(求解示意见下方代码)
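作为参考,下面的片段在该环境上做价值迭代。代码基于gymnasium(新版gym)的FrozenLake-v1,并假设toy_text环境通过 env.unwrapped.P 暴露形如 P[s][a] = [(prob, s', r, terminated), ...] 的转移字典;具体属性名与环境版本以所安装库的文档为准,这里仅为示意:

```python
import numpy as np
import gymnasium as gym  # 若使用旧版gym,环境名可能为 FrozenLake-v0

env = gym.make("FrozenLake-v1")
P = env.unwrapped.P                     # 假设:toy_text环境的转移字典
n_states, n_actions = env.observation_space.n, env.action_space.n

gamma, theta = 0.99, 1e-8
V = np.zeros(n_states)
while True:
    delta = 0.0
    for s in range(n_states):
        q = [sum(p * (r + gamma * V[s_]) for p, s_, r, _ in P[s][a])
             for a in range(n_actions)]
        delta = max(delta, abs(max(q) - V[s]))
        V[s] = max(q)
    if delta < theta:
        break

# 从收敛的V中提取贪婪策略
policy = np.array([
    np.argmax([sum(p * (r + gamma * V[s_]) for p, s_, r, _ in P[s][a])
               for a in range(n_actions)])
    for s in range(n_states)
])
```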
七、数学形式化总结
MDP核心公式
- 累积折扣回报:$G_t = \sum_{k=0}^{\infty} \gamma^k r_{t+k+1}$
- 状态价值函数:$V^\pi(s) = \mathbb{E}_\pi[G_t \mid s_t = s]$
- 动作价值函数:$Q^\pi(s,a) = \mathbb{E}_\pi[G_t \mid s_t = s,\, a_t = a]$
Bellman最优方程
- $V^*(s) = \max_a \sum_{s'} P(s'\mid s,a)\,[R(s,a) + \gamma V^*(s')]$
- $Q^*(s,a) = R(s,a) + \gamma \sum_{s'} P(s'\mid s,a)\, \max_{a'} Q^*(s',a')$
八、相关文档
- Q学习算法 — 无模型强化学习的经典方法
- 深度Q网络 — 结合深度学习的Q学习扩展
- 策略梯度方法 — 直接优化策略的强化学习方法
- PPO算法 — 主流的策略优化算法
- 多智能体RL — 多个智能体协同决策
- RL应用场景 — 强化学习的实际应用
参考文献
- Sutton, R. S., & Barto, A. G. (2018). Reinforcement Learning: An Introduction (2nd ed.). MIT Press.
- Bellman, R. (1957). Dynamic Programming. Princeton University Press.
- Puterman, M. L. (1994). Markov Decision Processes: Discrete Stochastic Dynamic Programming. Wiley.
- Bertsekas, D. P. (2017). Dynamic Programming and Optimal Control (Vol. I-II). Athena Scientific.
- Mnih, V., et al. (2015). Human-level control through deep reinforcement learning. Nature, 518(7540), 529-533.
本文档为强化学习系列的核心理论基础,后续所有算法均建立在MDP与Bellman方程框架之上。
九、MDP的扩展形式
9.1 部分可观测MDP(POMDP)
当智能体无法完全观测环境状态时,需要使用POMDP(Partially Observable Markov Decision Process),通常记为七元组 $(\mathcal{S}, \mathcal{A}, P, R, \Omega, O, \gamma)$:
其中 $\Omega$ 是观测空间,$O(o\mid s')$ 是观测函数,智能体据此维护信念状态(Belief State)。
import numpy as np

class POMDP:
"""
Partially Observable Markov Decision Process.
"""
def __init__(self, n_states, n_actions, n_observations):
self.n_states = n_states
self.n_actions = n_actions
self.n_observations = n_observations
# 转移概率 P(s'|s, a)
self.transition = np.zeros((n_states, n_actions, n_states))
# 观测概率 P(o|s)
self.observation = np.zeros((n_states, n_observations))
# 奖励函数 R(s, a)
self.reward = np.zeros((n_states, n_actions))
# 折扣因子
self.gamma = 0.99
def update_belief(self, belief, action, observation):
"""
使用贝叶斯更新信念状态.
"""
# 预测步骤
predicted_belief = np.zeros(self.n_states)
for s in range(self.n_states):
for s_prev in range(self.n_states):
predicted_belief[s] += belief[s_prev] * self.transition[s_prev, action, s]
# 更新步骤
updated_belief = np.zeros(self.n_states)
for s in range(self.n_states):
updated_belief[s] = self.observation[s, observation] * predicted_belief[s]
# 归一化
if updated_belief.sum() > 0:
updated_belief /= updated_belief.sum()
return updated_belief
def belief_value(self, belief):
"""
计算信念状态的价值.
"""
# 期望价值
V = 0
for s in range(self.n_states):
V += belief[s] * self.V[s] # 需要存储状态价值
return V
def pomdp_solver(self, horizon=50):
"""
POMDP求解(使用点基值迭代).
"""
# 初始化值函数为超平面集合
self.alpha_vectors = [np.zeros(self.n_states)]
self.action_vectors = [0]
for t in range(horizon):
# 生成新的alpha向量
new_alpha = []
new_actions = []
for a in range(self.n_actions):
for alpha in self.alpha_vectors:
# 对于每个观测,计算
for o in range(self.n_observations):
new_alpha_vec = self.compute_alpha_vector(a, alpha, o)
new_alpha.append(new_alpha_vec)
new_actions.append(a)
# 剪枝
self.alpha_vectors, self.action_vectors = self.prune(new_alpha, new_actions)
        return self.alpha_vectors, self.action_vectors

9.2 平均奖励MDP
对于持续性任务,折扣回报可能不适合,此时使用平均奖励准则:
$$\rho^\pi = \lim_{T \to \infty} \frac{1}{T}\, \mathbb{E}_\pi\Big[\sum_{t=1}^{T} r_t\Big]$$
相应的(差分)价值函数定义为:
$$V^\pi(s) = \mathbb{E}_\pi\Big[\sum_{t=0}^{\infty} \big(r_{t+1} - \rho^\pi\big) \,\Big|\, s_0 = s\Big]$$
class AverageRewardMDP:
"""
MDP with average reward criterion.
"""
def __init__(self, n_states, n_actions):
self.n_states = n_states
self.n_actions = n_actions
self.transition = np.zeros((n_states, n_actions, n_states))
self.reward = np.zeros((n_states, n_actions))
def relative_value_iteration(self, theta=1e-6):
"""
相对价值迭代算法.
"""
# 初始化
V = np.zeros(self.n_states)
rho = 0 # 平均奖励估计
while True:
delta = 0
for s in range(self.n_states):
old_v = V[s]
# 计算新价值
new_values = []
for a in range(self.n_actions):
val = 0
for s_next in range(self.n_states):
val += self.transition[s, a, s_next] * (
self.reward[s, a] - rho + V[s_next]
)
new_values.append(val)
V[s] = max(new_values)
delta = max(delta, abs(V[s] - old_v))
# 更新平均奖励
rho = np.mean([max([sum(self.transition[s, a, s_next] * (
self.reward[s, a] + V[s_next]
) for s_next in range(self.n_states)) for a in range(self.n_actions)])
for s in range(self.n_states)])
if delta < theta:
break
        return V, rho

9.3 随机MDP与鲁棒MDP
鲁棒MDP考虑转移概率存在不确定性的情形:在给定的不确定集内,针对最坏情况的转移模型优化策略。
class RobustMDP:
"""
Robust MDP for handling transition uncertainty.
"""
def __init__(self, n_states, n_actions, uncertainty_ball=0.1):
self.n_states = n_states
self.n_actions = n_actions
self.uncertainty_ball = uncertainty_ball # 不确定性球半径
# 标称转移概率
self.nominal_transition = np.zeros((n_states, n_actions, n_states))
# 奖励
self.reward = np.zeros((n_states, n_actions))
def worst_case_value(self, s, a, V):
"""
计算最坏情况下的价值.
"""
# 定义不确定集
nominal = self.nominal_transition[s, a]
# 使用ℓ∞球作为不确定集
# 最坏情况:向价值低的方向移动概率
# 简化:假设最坏情况是所有概率集中在最低价值状态
worst_value = float('inf')
for perturbation in self.generate_perturbations(nominal):
value = sum(perturbation[s_next] * (self.reward[s, a] + V[s_next])
for s_next in range(self.n_states))
worst_value = min(worst_value, value)
return worst_value
def robust_value_iteration(self, gamma=0.99, theta=1e-6):
"""
鲁棒价值迭代.
"""
V = np.zeros(self.n_states)
policy = np.zeros(self.n_states, dtype=int)
while True:
delta = 0
for s in range(self.n_states):
old_v = V[s]
# 对每个动作计算最坏情况价值
action_values = []
for a in range(self.n_actions):
worst = self.worst_case_value(s, a, V)
action_values.append(worst)
V[s] = max(action_values)
policy[s] = np.argmax(action_values)
delta = max(delta, abs(V[s] - old_v))
if delta < theta:
break
        return V, policy

十、MDP求解的高级算法
10.1 异步动态规划
不需要遍历所有状态的动态规划:
class AsynchronousDP:
"""
Asynchronous Dynamic Programming for MDP.
"""
def __init__(self, mdp):
self.mdp = mdp
self.V = np.zeros(mdp.n_states)
self.policy = np.zeros(mdp.n_states, dtype=int)
def prioritized_sweeping(self, theta=0.0001):
"""
优先级扫描:优先更新高优先级状态.
"""
        self.theta = theta  # 优先级阈值,供compute_priority使用
        # 计算初始优先级(假设PriorityQueue为支持push(item, priority)/pop()/empty()的最大优先队列)
        self.priority_queue = PriorityQueue()
for s in range(self.mdp.n_states):
for a in range(self.mdp.n_actions):
self.compute_priority(s, a)
while not self.priority_queue.empty():
# 弹出最高优先级
s, a = self.priority_queue.pop()
# 更新状态
self.update_state(s, a)
# 更新相关状态的优先级
self.update_related_priorities(s)
def compute_priority(self, s, a):
"""计算状态-动作对的优先级(TD误差)."""
target = 0
for s_next in range(self.mdp.n_states):
target += self.mdp.transition[s, a, s_next] * (
self.mdp.reward[s, a] + self.mdp.gamma * self.V[s_next]
)
td_error = abs(target - self.V[s])
if td_error > self.theta:
self.priority_queue.push((s, a), td_error)
    def update_state(self, s, a):
        """对状态s做一次Bellman最优备份(对所有动作取最大)."""
        action_values = []
        for a_i in range(self.mdp.n_actions):
            target = 0
            for s_next in range(self.mdp.n_states):
                target += self.mdp.transition[s, a_i, s_next] * (
                    self.mdp.reward[s, a_i] + self.mdp.gamma * self.V[s_next]
                )
            action_values.append(target)
        self.V[s] = max(action_values)
def update_related_priorities(self, s):
"""更新与状态s相关的状态的优先级."""
for s_prev in range(self.mdp.n_states):
for a in range(self.mdp.n_actions):
prob = self.mdp.transition[s_prev, a, s]
if prob > 0:
                    self.compute_priority(s_prev, a)

10.2 线性规划方法
MDP可以形式化为线性规划问题:
$$\min_{V} \sum_{s} \mu(s)\, V(s) \quad \text{s.t.} \quad V(s) \ge R(s,a) + \gamma \sum_{s'} P(s'\mid s,a)\, V(s'), \ \ \forall s, a$$
其中 $\mu$ 为初始状态分布。
import numpy as np
from scipy.optimize import linprog
class LinearProgrammingMDP:
"""
Solving MDP via Linear Programming.
"""
def __init__(self, n_states, n_actions, gamma=0.99):
self.n_states = n_states
self.n_actions = n_actions
self.gamma = gamma
self.transition = None
self.reward = None
self.initial_distribution = None
    def setup_lp(self):
        """设置LP问题: min_V μ·V  s.t.  V(s) ≥ R(s,a) + γ Σ_s' P(s'|s,a) V(s')."""
        # 目标函数系数:初始状态分布(直接最小化 μ·V)
        c = self.initial_distribution
        # 将约束改写为 A_ub @ V ≤ b_ub:-V(s) + γ Σ_s' P(s'|s,a) V(s') ≤ -R(s,a)
        n_constraints = self.n_states * self.n_actions
        A_ub = np.zeros((n_constraints, self.n_states))
        b_ub = np.zeros(n_constraints)
        idx = 0
        for s in range(self.n_states):
            for a in range(self.n_actions):
                # v(s)的系数
                A_ub[idx, s] = -(1 - self.gamma * self.transition[s, a, s])
                # v(s')的系数
                for s_next in range(self.n_states):
                    if s_next != s:
                        A_ub[idx, s_next] = self.gamma * self.transition[s, a, s_next]
                # 右端项
                b_ub[idx] = -self.reward[s, a]
                idx += 1
        return c, A_ub, b_ub
def solve(self):
"""求解LP."""
c, A_ub, b_ub = self.setup_lp()
# 求解
result = linprog(c, A_ub=A_ub, b_ub=b_ub, bounds=(None, None))
if result.success:
return result.x
else:
            raise ValueError("LP求解失败")

10.3 近似动态规划
对于大规模MDP,使用函数逼近:
class ApproximateDP:
"""
Approximate Dynamic Programming with function approximation.
"""
def __init__(self, feature_dim, n_actions, gamma=0.99):
self.feature_dim = feature_dim
self.n_actions = n_actions
self.gamma = gamma
# 线性函数逼近
self.theta = np.zeros((n_actions, feature_dim))
def compute_features(self, state):
"""计算状态特征."""
# 简单的多项式特征
x, y = state[0], state[1]
return np.array([1, x, y, x**2, y**2, x*y])
def approximate_value(self, state, action):
"""近似价值函数."""
features = self.compute_features(state)
return np.dot(self.theta[action], features)
def lstdqp(self, transitions, gamma=0.99):
"""
Least Squares Temporal Difference with Q-function parameterization.
"""
n_samples = len(transitions)
# 构建A和b矩阵
A = np.zeros((self.feature_dim * self.n_actions,
self.feature_dim * self.n_actions))
b = np.zeros(self.feature_dim * self.n_actions)
for s, a, r, s_next, done in transitions:
phi = self.compute_features(s)
phi_next = self.compute_features(s_next)
# 动作one-hot编码
action_vec = np.zeros(self.n_actions)
action_vec[a] = 1
            # 下一状态贪婪动作的one-hot编码
            best_next_action = self.get_best_action(s_next)
            next_action_vec = np.zeros(self.n_actions)
            next_action_vec[best_next_action] = 1
            # 将动作one-hot与状态特征做Kronecker积,得到联合特征
            phi_a = np.kron(action_vec, phi)
            phi_next_a = np.kron(next_action_vec, phi_next)
if not done:
A += np.outer(phi_a - gamma * phi_next_a, phi_a)
b += phi_a * r
else:
A += np.outer(phi_a, phi_a)
b += phi_a * r
# 求解
theta_flat = np.linalg.solve(A + 1e-8 * np.eye(A.shape[0]), b)
self.theta = theta_flat.reshape(self.n_actions, self.feature_dim)
def get_best_action(self, state):
"""获取最优动作."""
q_values = [self.approximate_value(state, a) for a in range(self.n_actions)]
        return np.argmax(q_values)

十一、MDP的高级拓展
11.1 DEC-POMDP(多智能体POMDP)
多个智能体协作决策:
class DECPOMDP:
"""
Decentralized POMDP for multi-agent coordination.
"""
def __init__(self, n_agents, n_states, n_actions_per_agent, n_obs_per_agent):
self.n_agents = n_agents
self.n_states = n_states
self.n_actions = [n_actions_per_agent] * n_agents
self.n_obs = [n_obs_per_agent] * n_agents
# 全局转移 P(s'|s, a_1, ..., a_n)
self.transition = None
# 联合观测 P(o_1,...,o_n | s')
self.observation = None
# 联合奖励
        self.reward = None
        # 折扣因子(示意假设值)
        self.gamma = 0.99
def joint_value_function(self, joint_policy, horizon):
"""
计算联合策略的价值.
"""
V = 0
state = self.get_initial_state()
for t in range(horizon):
            # 每个智能体根据自身的局部观测,从联合策略中取出各自的策略选择动作
            joint_action = tuple(
                joint_policy[agent_i].sample_action(obs_i)
                for agent_i, obs_i in enumerate(self.get_local_observations(state))
            )
# 执行联合动作
reward = self.reward_function(state, joint_action)
V += self.gamma ** t * reward
# 转移
state = self.sample_next_state(state, joint_action)
return V
def iterated_best_response(self, initial_policies, max_iterations=100):
"""
迭代最佳响应算法求解DEC-POMDP.
"""
policies = list(initial_policies)
for iteration in range(max_iterations):
improved = False
for agent_i in range(self.n_agents):
# 其他智能体的策略
other_policies = [p for j, p in enumerate(policies) if j != agent_i]
# 为当前智能体计算最佳响应
best_response = self.compute_best_response(agent_i, other_policies)
if best_response != policies[agent_i]:
policies[agent_i] = best_response
improved = True
if not improved:
break
        return policies

11.2 约束MDP(CMDP)
带有约束的MDP(CMDP)在最大化主目标回报的同时,要求若干约束信号的期望累积回报满足给定阈值,常用拉格朗日乘子法求解:
class ConstrainedMDP:
"""
Constrained MDP with constraint satisfaction.
"""
def __init__(self, n_states, n_actions, constraints):
self.n_states = n_states
self.n_actions = n_actions
self.constraints = constraints # 约束列表
self.transition = None
self.reward = None
self.constraint_rewards = None # 每个约束的奖励函数
def compute_reward(self, state, action):
"""主要目标奖励."""
return self.reward[state, action]
def compute_constraint_reward(self, state, action, constraint_idx):
"""约束奖励."""
return self.constraint_rewards[constraint_idx][state, action]
def lagrangian_policy_gradient(self, lambda_init=None):
"""
拉格朗日乘子法策略梯度.
"""
# 初始化拉格朗日乘子
if lambda_init is None:
lambda_init = np.zeros(len(self.constraints))
self.lambda_k = lambda_init
for iteration in range(1000):
# 1. 固定lambda,优化策略
policy = self.optimize_policy(self.lambda_k)
# 2. 评估约束违反
violations = self.evaluate_constraints(policy)
# 3. 更新lambda(梯度上升)
lr = 0.01
self.lambda_k += lr * violations
self.lambda_k = np.maximum(self.lambda_k, 0) # 非负约束
# 检查收敛
if np.max(np.abs(violations)) < 1e-3:
break
return policy, self.lambda_k
def optimize_policy(self, lambda_vec):
"""给定lambda优化策略."""
# 加权奖励
weighted_reward = np.zeros((self.n_states, self.n_actions))
weighted_reward += self.reward
for i, (lambda_i, constraint_reward) in enumerate(zip(lambda_vec, self.constraint_rewards)):
weighted_reward += lambda_i * constraint_reward
# 使用策略迭代求解
policy = self.policy_iteration(weighted_reward)
        return policy

11.3 层次MDP(HMDP)
层次MDP基于选项(Options)框架:把时间上延展的子策略(选项)当作高层动作,对应的决策过程成为一个半马尔可夫决策过程(SMDP)。
class HierarchicalMDP:
"""
Hierarchical MDP with options framework.
"""
def __init__(self, primitive_actions, primitive_mdp):
self.primitive_actions = primitive_actions
self.primitive_mdp = primitive_mdp
# 高层动作(选项)
self.options = []
def add_option(self, option):
"""添加高层选项."""
self.options.append(option)
def smdp_value_iteration(self, gamma=0.99, theta=1e-6):
"""
Semi-MDP价值迭代.
"""
# 初始化
V = np.zeros(self.primitive_mdp.n_states)
while True:
delta = 0
for s in range(self.primitive_mdp.n_states):
old_v = V[s]
# 考虑所有选项(primitive + options)
all_actions = self.primitive_actions + self.options
action_values = []
for action in all_actions:
if isinstance(action, Option):
value = self.option_value(s, action, V, gamma)
else:
value = self.primitive_value(s, action, V, gamma)
action_values.append(value)
V[s] = max(action_values)
delta = max(delta, abs(V[s] - old_v))
if delta < theta:
break
return V
    def option_value(self, s, option, V, gamma):
        """计算选项的价值(假设simulate_option返回终止状态、持续步数与折扣累积奖励)."""
        s_term, tau, reward_sum = self.simulate_option(s, option)
        # 选项终止状态的价值,按选项持续时间tau进行折扣
        final_value = V[s_term]
        return reward_sum + gamma ** tau * final_value
def primitive_value(self, s, a, V, gamma):
"""计算原始动作的价值."""
value = 0
for s_next in range(self.primitive_mdp.n_states):
value += self.primitive_mdp.transition[s, a, s_next] * (
self.primitive_mdp.reward[s, a] + gamma * V[s_next]
)
        return value

十二、MDP的理论深度
12.1 MDP的线性代数视角
给定策略时,MDP的价值函数满足线性方程组:
$$V^\pi = r^\pi + \gamma P^\pi V^\pi \quad \Longrightarrow \quad V^\pi = (I - \gamma P^\pi)^{-1} r^\pi$$
其中 $P^\pi$ 是策略诱导的转移矩阵,$r^\pi$ 是对应的奖励向量。
class LinearAlgebraMDP:
"""
MDP analysis using linear algebra.
"""
def __init__(self, transition, reward, gamma):
self.P = transition
self.r = reward
self.gamma = gamma
def compute_value_matrix_inverse(self, policy):
"""
使用矩阵求逆计算价值函数.
"""
n_states = self.P.shape[0]
# 策略诱导的转移和奖励
P_pi = np.zeros((n_states, n_states))
r_pi = np.zeros(n_states)
for s in range(n_states):
a = policy[s]
P_pi[s] = self.P[s, a]
r_pi[s] = self.r[s, a]
# V = (I - γP)^(-1) r
I = np.eye(n_states)
V = np.linalg.inv(I - self.gamma * P_pi) @ r_pi
return V
    def compute_visitation_matrix(self, policy):
        """
        计算折扣访问矩阵(基本矩阵).
        """
        n_states = self.P.shape[0]
        P_pi = np.zeros((n_states, n_states))
        for s in range(n_states):
            a = policy[s]
            P_pi[s] = self.P[s, a]
        # 基本矩阵 Z = (I - γP_π)^(-1);对随机矩阵P_π,I - P_π本身奇异,需带折扣因子
        I = np.eye(n_states)
        Z = np.linalg.inv(I - self.gamma * P_pi)
        # Z[i,j] 表示从状态i出发对状态j的期望折扣访问次数
        return Z
def analyze_stationary_distribution(self, policy):
"""
计算平稳分布.
"""
n_states = self.P.shape[0]
# 构建转移矩阵
P_pi = np.zeros((n_states, n_states))
for s in range(n_states):
a = policy[s]
P_pi[s] = self.P[s, a]
# 求解平稳分布 π = πP
eigenvalues, eigenvectors = np.linalg.eig(P_pi.T)
# 找到特征值为1的特征向量
idx = np.argmin(np.abs(eigenvalues - 1))
stationary = np.real(eigenvectors[:, idx])
stationary = stationary / stationary.sum() # 归一化
        return stationary

12.2 收缩映射定理的详细证明
定理:Bellman期望算子 $T^\pi$ 在无穷范数 $\|\cdot\|_\infty$ 下是 $\gamma$-收缩的。
证明:
令 $T^\pi$ 为Bellman期望算子,即:
$$(T^\pi V)(s) = \sum_{a} \pi(a\mid s) \sum_{s'} P(s'\mid s,a)\,\big[R(s,a) + \gamma V(s')\big]$$
对任意两个价值函数 $U, V$:
$$\|T^\pi U - T^\pi V\|_\infty = \max_{s}\Big|\,\gamma \sum_{a}\pi(a\mid s)\sum_{s'}P(s'\mid s,a)\,\big[U(s') - V(s')\big]\Big| \le \gamma\,\|U - V\|_\infty$$
因此 $T^\pi$ 是 $\gamma$-收缩映射;由Banach不动点定理,$V^\pi$ 是其唯一不动点,且迭代 $V_{k+1} = T^\pi V_k$ 以几何速率收敛到 $V^\pi$。
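收缩性质也可以在随机生成的MDP上做数值验证:对任取的两个价值函数施加同一个Bellman期望算子,两者的无穷范数距离至多按 $\gamma$ 的比例缩小(示意代码,参数为随机假设):

```python
import numpy as np

rng = np.random.default_rng(0)
n_states, n_actions, gamma = 5, 3, 0.9

# 随机生成归一化的转移概率、奖励与策略
P = rng.random((n_states, n_actions, n_states)); P /= P.sum(-1, keepdims=True)
R = rng.random((n_states, n_actions))
pi = rng.random((n_states, n_actions)); pi /= pi.sum(-1, keepdims=True)

def T_pi(V):
    """Bellman期望算子."""
    return (pi * (R + gamma * P @ V)).sum(axis=1)

U, V = rng.random(n_states), rng.random(n_states)
print(np.max(np.abs(T_pi(U) - T_pi(V))))   # ≤ γ * ||U - V||∞
print(gamma * np.max(np.abs(U - V)))
```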
12.3 策略梯度定理的推导
策略梯度定理是策略梯度方法的基础:
$$\nabla_\theta J(\theta) = \mathbb{E}_{s \sim d^{\pi_\theta},\, a \sim \pi_\theta}\big[\nabla_\theta \log \pi_\theta(a\mid s)\, Q^{\pi_\theta}(s,a)\big]$$
利用对数求导技巧 $\nabla_\theta \pi_\theta(a\mid s) = \pi_\theta(a\mid s)\, \nabla_\theta \log \pi_\theta(a\mid s)$,即可把对策略参数的梯度改写为可由采样估计的期望形式。
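据此,最基本的蒙特卡洛策略梯度(REINFORCE)可以写成如下形式:对softmax参数化的表格策略,用采样回报加权对数概率的梯度(示意性实现,未包含基线等方差缩减技巧):

```python
import numpy as np

def softmax_policy(theta, s):
    """表格softmax策略 π_θ(a|s),theta形状为 (|S|, |A|)."""
    z = np.exp(theta[s] - theta[s].max())
    return z / z.sum()

def reinforce_update(theta, episode, gamma=0.99, lr=0.01):
    """
    episode: [(s, a, r), ...] 一条完整轨迹.
    梯度估计:∇J ≈ Σ_t G_t ∇ log π_θ(a_t|s_t).
    """
    G = 0.0
    for s, a, r in reversed(episode):
        G = r + gamma * G                 # 从后往前累计折扣回报
        probs = softmax_policy(theta, s)
        grad_log = -probs                 # ∂ log π(a|s) / ∂ theta[s, ·] = 1{·=a} - π(·|s)
        grad_log[a] += 1.0
        theta[s] += lr * G * grad_log     # 沿梯度方向上升
    return theta
```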
十三、MDP的实践应用
13.1 迷宫求解
class MazeMDP:
"""
MDP formulation for maze navigation.
"""
def __init__(self, maze):
self.maze = maze
self.height, self.width = maze.shape
# 状态:每个格子一个状态
self.n_states = self.height * self.width
# 动作:上下左右
self.n_actions = 4
# 构建转移和奖励
self.build_mdp()
def state_to_pos(self, s):
"""状态转位置."""
return s // self.width, s % self.width
def pos_to_state(self, row, col):
"""位置转状态."""
if 0 <= row < self.height and 0 <= col < self.width:
return row * self.width + col
return -1
def build_mdp(self):
"""构建MDP."""
self.transition = np.zeros((self.n_states, self.n_actions, self.n_states))
self.reward = np.zeros((self.n_states, self.n_actions))
for row in range(self.height):
for col in range(self.width):
s = self.pos_to_state(row, col)
if self.maze[row, col] == 1: # 障碍
continue
# 动作:0=上, 1=下, 2=左, 3=右
for a, (dr, dc) in enumerate([(-1,0), (1,0), (0,-1), (0,1)]):
nr, nc = row + dr, col + dc
ns = self.pos_to_state(nr, nc)
if ns >= 0 and self.maze[nr, nc] != 1: # 可达
self.transition[s, a, ns] = 1.0
# 奖励
if self.maze[nr, nc] == 2: # 目标
self.reward[s, a] = 10.0
else:
self.reward[s, a] = -0.01 # 小惩罚鼓励短路径
else: # 撞墙,留在原地
self.transition[s, a, s] = 1.0
self.reward[s, a] = -0.5
def solve_value_iteration(self, gamma=0.99, theta=1e-6):
"""价值迭代求解."""
V = np.zeros(self.n_states)
policy = np.zeros(self.n_states, dtype=int)
while True:
delta = 0
for s in range(self.n_states):
if self.transition[s].sum() == 0: # 障碍
continue
old_v = V[s]
# 寻找最优动作
action_values = []
for a in range(self.n_actions):
v = 0
for s_next in range(self.n_states):
v += self.transition[s, a, s_next] * (
self.reward[s, a] + gamma * V[s_next]
)
action_values.append(v)
V[s] = max(action_values)
policy[s] = np.argmax(action_values)
delta = max(delta, abs(V[s] - old_v))
if delta < theta:
break
return V, policy
def visualize(self, policy):
"""可视化策略."""
symbols = {0: '↑', 1: '↓', 2: '←', 3: '→'}
grid = self.maze.copy().astype(str)
for s in range(self.n_states):
row, col = self.state_to_pos(s)
if self.maze[row, col] == 0: # 可通行
grid[row, col] = symbols[policy[s]]
        return grid

13.2 库存管理问题
class InventoryMDP:
"""
MDP for inventory management.
"""
def __init__(self, max_inventory=10, max_order=5, holding_cost=1, stockout_cost=10):
self.max_inventory = max_inventory
self.max_order = max_order
self.holding_cost = holding_cost
self.stockout_cost = stockout_cost
# 状态空间:当前库存
self.n_states = max_inventory + 1
# 动作空间:订购数量
self.n_actions = max_order + 1
# 需求分布(假设泊松分布)
self.demand_lambda = 3
def demand_prob(self, d):
"""需求概率(泊松分布)."""
from scipy.stats import poisson
return poisson.pmf(d, self.demand_lambda)
    def transition_prob(self, s, a, s_next):
        """计算转移概率:s' = max(0, min(s + a, max_inventory) - d),d为随机需求."""
        from scipy.stats import poisson
        stock = min(s + a, self.max_inventory)  # 补货后的可用库存
        if s_next > stock:
            return 0.0  # 库存不可能凭空增加
        if s_next > 0:
            # 恰好剩余s_next件,对应需求 d = stock - s_next
            return self.demand_prob(stock - s_next)
        # s_next == 0:需求大于或等于可用库存(吸收所有更大的需求)
        return float(poisson.sf(stock - 1, self.demand_lambda))
def immediate_cost(self, s, a):
"""即时成本."""
# 订购成本
order_cost = a * 2
# 期望持有/缺货成本
expected_cost = 0
for d in range(self.max_inventory + 2):
prob = self.demand_prob(d)
# 销售后的库存
inventory_after_demand = max(0, min(s + a, self.max_inventory) - d)
# 销售损失
sales_lost = max(0, d - s - a)
expected_cost += prob * (
self.holding_cost * inventory_after_demand +
self.stockout_cost * sales_lost
)
return order_cost + expected_cost
def solve(self, gamma=0.9):
"""求解最优策略."""
V = np.zeros(self.n_states)
policy = np.zeros(self.n_states, dtype=int)
for _ in range(100):
for s in range(self.n_states):
action_values = []
for a in range(self.n_actions):
# 计算期望价值
expected_v = 0
for s_next in range(self.n_states):
p = self.transition_prob(s, a, s_next)
expected_v += p * V[s_next]
total_value = self.immediate_cost(s, a) + gamma * expected_v
action_values.append(total_value)
policy[s] = np.argmin(action_values) # 最小化成本
V[s] = min(action_values)
        return V, policy

十四、MDP的高级分析
14.1 灵敏度分析
class MDPSensitivityAnalysis:
"""
Sensitivity analysis for MDP parameters.
"""
def __init__(self, base_mdp):
self.base_mdp = base_mdp
self.base_V, self.base_policy = base_mdp.solve()
def gamma_sensitivity(self, gamma_range):
"""
分析折扣因子对最优策略的影响.
"""
results = []
for gamma in gamma_range:
mdp = copy.deepcopy(self.base_mdp)
mdp.gamma = gamma
V, policy = mdp.solve()
# 计算策略变化
policy_change = np.mean(policy != self.base_policy)
# 计算价值变化
value_change = np.mean(np.abs(V - self.base_V))
results.append({
'gamma': gamma,
'policy_change': policy_change,
'value_change': value_change
})
return results
def reward_sensitivity(self, reward_variation):
"""
分析奖励变化的影响.
"""
results = []
for delta in reward_variation:
mdp = copy.deepcopy(self.base_mdp)
mdp.reward += delta
V, policy = mdp.solve()
results.append({
'delta': delta,
'value_change': np.mean(np.abs(V - self.base_V)),
'policy_change': np.mean(policy != self.base_policy)
})
        return results

14.2 MDP的稳定性分析
class MDPStabilityAnalysis:
"""
Stability analysis for MDP and policies.
"""
def __init__(self, mdp):
self.mdp = mdp
def compute_spectrum(self, policy):
"""
计算策略诱导转移矩阵的特征值谱.
"""
n_states = self.mdp.n_states
# 构建策略诱导的转移矩阵
P_pi = np.zeros((n_states, n_states))
for s in range(n_states):
a = policy[s]
P_pi[s] = self.mdp.transition[s, a]
# 特征值分解
eigenvalues, eigenvectors = np.linalg.eig(P_pi.T)
return eigenvalues
    def mixing_time(self, policy, threshold=0.25):
        """
        基于谱间隙的混合时间近似估计:t_mix ≈ ln(1/threshold) / (1 - |λ2|).
        """
        eigenvalues = self.compute_spectrum(policy)
        moduli = np.abs(eigenvalues)
        # 排除(数值上接近于)1的主特征值,取第二大特征值模 |λ2|
        others = np.sort(moduli[moduli < 1 - 1e-10])[::-1]
        if len(others) == 0:
            return 1
        spectral_gap = 1 - others[0]
        if spectral_gap <= 0:
            return np.inf
        return int(np.ceil(np.log(1.0 / threshold) / spectral_gap))

十五、MDP与机器学习的融合
15.1 从数据学习MDP
class MDPLearning:
"""
Learning MDP model from observed data.
"""
def __init__(self, n_states, n_actions):
self.n_states = n_states
self.n_actions = n_actions
# 转移计数
self.transition_counts = np.zeros((n_states, n_actions, n_states))
# 奖励样本
self.reward_samples = []
# 访问计数
self.state_action_counts = np.zeros((n_states, n_actions))
def update_from_trajectory(self, trajectory):
"""
从轨迹数据更新模型.
"""
states, actions, rewards = trajectory
for i in range(len(states) - 1):
s, a, r = states[i], actions[i], rewards[i]
s_next = states[i+1]
self.transition_counts[s, a, s_next] += 1
self.reward_samples.append((s, a, r))
self.state_action_counts[s, a] += 1
def estimate_transition(self, s, a):
"""
最大似然估计转移概率.
"""
counts = self.transition_counts[s, a]
total = counts.sum()
if total == 0:
return np.ones(self.n_states) / self.n_states
return counts / total
def estimate_reward(self, s, a):
"""
估计奖励函数(均值).
"""
samples = [r for (ss, aa, r) in self.reward_samples if ss == s and aa == a]
if len(samples) == 0:
return 0
return np.mean(samples)
def build_mdp(self):
"""
从数据构建MDP模型.
"""
transition = np.zeros((self.n_states, self.n_actions, self.n_states))
reward = np.zeros((self.n_states, self.n_actions))
for s in range(self.n_states):
for a in range(self.n_actions):
transition[s, a] = self.estimate_transition(s, a)
reward[s, a] = self.estimate_reward(s, a)
        # 假设存在接受 (transition, reward, gamma) 参数的MDP容器类
        return MDP(transition, reward, gamma=0.99)

十六、总结与展望
16.1 MDP理论的核心要点
马尔可夫决策过程(MDP)是强化学习的数学基石,它将序贯决策问题形式化为一个优雅的数学框架。理解MDP的关键在于把握以下几个核心概念:
- 状态与马尔可夫性:状态是对环境的完整描述,马尔可夫性保证了当前状态蕴含了做出最优决策所需的全部信息。
- 策略与价值函数:策略定义了智能体的行为方式,价值函数评估了策略的优劣,两者相互依存。
- Bellman方程:揭示了价值函数的递归结构,是所有动态规划方法的基础。
- 最优性原理:最优策略具有"无论初始状态如何,接下来的决策都是最优的"这一性质,使得逆推法成为可能。
16.2 未来研究方向
MDP研究仍有许多开放问题值得探索:
- 大规模MDP:状态空间爆炸问题需要新的近似和分解技术。
- 鲁棒MDP:如何设计对模型不确定性和对抗干扰鲁棒的决策策略。
- 层次MDP:如何自动发现和学习有意义的抽象决策层次。
- 多智能体MDP:协作与竞争环境下的决策理论。
- 因果推断与MDP:将因果推理的思想融入MDP框架。
参考文献(续)
- Bertsekas, D. P. (2012). Dynamic Programming and Optimal Control (Vol. II). Athena Scientific.
- Puterman, M. L., & Shin, M. C. (1978). Modified policy iteration algorithms for discounted Markov decision problems. Management Science, 24(11), 1127-1137.
- Hansen, E. A., & Zilberstein, S. (2001). LAO* planning: A heuristic search approach to planning with uncertainty. Artificial Intelligence, 128(1-2), 89-115.
- Kaelbling, L. P., Littman, M. L., & Cassandra, A. R. (1998). Planning and acting in partially observable stochastic domains. Artificial Intelligence, 101(1-2), 99-134.
- Bagnell, J. A., et al. (2001). Policy invariant under approximation for discounted Markov decision processes. NIPS.
MDP是连接决策科学与机器学习的桥梁。深入理解MDP不仅对强化学习至关重要,也为运筹学、经济学、机器人学等领域提供了统一的分析框架。
15.2 基于模拟的优化
class SimulationBasedOptimization:
"""
Simulation-based optimization for MDP.
"""
    def __init__(self, simulator, n_states, n_actions, gamma=0.99):
        self.simulator = simulator
        self.n_states = n_states
        self.n_actions = n_actions
        self.gamma = gamma  # 折扣因子,供rollout使用
def rollout(self, policy, n_rollouts=100, horizon=100):
"""
蒙特卡洛回滚评估策略.
"""
total_returns = []
for _ in range(n_rollouts):
state = self.simulator.reset()
G = 0
for t in range(horizon):
action = policy[state]
state, reward, done, _ = self.simulator.step(action)
G += (self.gamma ** t) * reward
if done:
break
total_returns.append(G)
return np.mean(total_returns), np.std(total_returns)
def policy_search(self, n_iterations=100):
"""
基于模拟的策略搜索.
"""
# 随机初始化策略
best_policy = np.random.randint(0, self.n_actions, self.n_states)
best_value = self.rollout(best_policy)[0]
for iteration in range(n_iterations):
# 随机扰动
new_policy = best_policy.copy()
# 随机选择一些状态改变
n_changes = np.random.randint(1, 5)
states_to_change = np.random.randint(0, self.n_states, n_changes)
for s in states_to_change:
new_policy[s] = np.random.randint(0, self.n_actions)
# 评估新策略
new_value, _ = self.rollout(new_policy)
if new_value > best_value:
best_policy = new_policy
best_value = new_value
        return best_policy, best_value

MDP理论是强化学习大厦的基石。从理论基础到高级算法,从理论分析到实践应用,MDP为我们提供了分析和解决序贯决策问题的完整框架。