关键词
| 术语 | 英文 | 核心概念 |
|---|---|---|
| Stackelberg博弈 | Stackelberg Game | 领导者-追随者结构的序贯博弈 |
| 演化博弈论 | Evolutionary Game Theory | 策略随时间演化学习 |
| 复制者动态 | Replicator Dynamics | 策略频率随收益动态变化 |
| 鲁棒博弈 | Robust Game | 参数不确定下的均衡分析 |
| 贝叶斯博弈 | Bayesian Game | 不完全信息博弈 |
| 拍卖理论 | Auction Theory | 资源分配的激励机制设计 |
| 联邦学习 | Federated Learning | 分布式机器学习的博弈建模 |
| 机制设计 | Mechanism Design | 激励相容的规则设计 |
| 相关均衡 | Correlated Equilibrium | 比Nash均衡更一般(条件更弱)的均衡概念 |
| 多智能体强化学习 | Multi-Agent Reinforcement Learning (MARL) | 多智能体环境下的强化学习 |
| 纳什均衡 | Nash Equilibrium | 无人可单方面改进的策略组合 |
| 帕累托最优 | Pareto Optimality | 无法使某人更好而不使他人更差的资源配置 |
| 社会福利 | Social Welfare | 所有参与人收益的总和 |
| 激励相容 | Incentive Compatibility | 说实话是占优策略 |
1. 引言:多智能体系统的复杂性
现实世界中的决策问题很少是孤立的——自动驾驶车辆需要相互协调、电子商务平台需要应对竞争商家的策略、电网调度需要平衡多方利益。多智能体博弈(Multi-Agent Games)正是研究这类复杂交互场景的核心框架,它将单智能体博弈论扩展到多个具有自主决策能力的智能体之间的策略交互。
多智能体博弈的挑战
与两人零和博弈不同,多智能体系统面临的核心挑战包括:策略空间指数级增长、均衡概念的多重性、计算复杂性、通讯与协调问题,以及如何在不完全信息和动态环境中做出决策。
1.1 多智能体系统的分类
多智能体系统可以从多个维度进行分类:
按智能体关系分类:
- 合作博弈:智能体有共同目标,需要协调行动
- 竞争博弈:智能体目标冲突,如零和博弈
- 混合博弈:同时包含合作和竞争元素
按信息结构分类:
- 完全信息博弈:所有智能体知道博弈结构
- 不完全信息博弈:部分智能体拥有私有信息(如自身类型或收益),其他智能体只知其分布
- 完美信息博弈:智能体知道博弈的完整历史
- 不完美信息博弈:智能体只能观察到部分历史
按决策时序分类:
- 同时行动博弈:所有智能体同时选择策略
- 序贯博弈:智能体按顺序行动,后行者可观察先行者
1.2 多智能体系统的应用场景
多智能体博弈理论在现实中有着广泛的应用:
经济领域:
- 市场竞争分析:多家企业的定价策略博弈
- 拍卖市场:频谱拍卖、广告拍卖的机制设计
- 电网调度:多方电力生产者的竞价博弈
人工智能:
- 自动驾驶:多车协同决策
- 游戏AI:多人对战游戏的智能对手
- 机器人团队:分布式机器人协调
网络安全:
- 入侵检测:攻击者与防御者的博弈
- 隐私保护:数据发布与隐私泄露的对抗
- 网络安全资源分配:防御资源的优化部署
2. Stackelberg博弈
2.1 博弈结构与数学建模
Stackelberg博弈是一种序贯博弈,其中一个参与人(领导者)先行动,其他参与人(追随者)在观察到领导者的行动后做出反应。这种"主从"结构广泛存在于市场竞争、安全巡逻、资源分配等场景。
import numpy as np
from scipy.optimize import minimize, linprog
class StackelbergGame:
"""
Stackelberg博弈:领导者-追随者结构
数学模型:
- 领导者:选择策略 x ∈ X ⊆ ℝ^n
- 追随者:观察到 x 后选择策略 y ∈ Y(x) ⊆ ℝ^m
目标函数:
- 领导者:max_{x} u_L(x, y^*(x))
- 追随者:y^*(x) = argmax_{y ∈ Y(x)} u_F(x, y)
其中 y^*(x) 是追随者的最优响应函数
"""
def __init__(self, leader_payoff, follower_payoff,
leader_action_set, follower_action_set):
self.leader_payoff = leader_payoff # 领导者收益函数
self.follower_payoff = follower_payoff # 追随者收益函数
self.leader_action_set = leader_action_set # 领导者行动空间
self.follower_action_set = follower_action_set # 追随者行动空间
def follower_best_response(self, leader_action):
"""
追随者的最优响应函数
给定领导者的行动,求追随者的最优反应
"""
best_y = None
best_payoff = float('-inf')
for y in self.follower_action_set:
payoff = self.follower_payoff(leader_action, y)
if payoff > best_payoff:
best_payoff = payoff
best_y = y
return best_y, best_payoff
def solve_leader_problem(self):
"""
求解Stackelberg均衡
领导者的问题:max_{x} u_L(x, BR_F(x))
其中 BR_F(x) 是追随者的最优响应
"""
best_x = None
best_leader_payoff = float('-inf')
for x in self.leader_action_set:
y_star, _ = self.follower_best_response(x)
leader_payoff = self.leader_payoff(x, y_star)
if leader_payoff > best_leader_payoff:
best_leader_payoff = leader_payoff
best_x = x
y_star, follower_payoff = self.follower_best_response(best_x)
return {
'leader_action': best_x,
'follower_action': y_star,
'leader_payoff': best_leader_payoff,
'follower_payoff': follower_payoff
}
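下面给出一个自包含的用法示意:把上述"枚举追随者最优响应、再最大化领导者收益"的求解流程,套用到一个离散化的Stackelberg产量博弈上。行动空间与需求函数均为演示用的假设,并非文中类的固定接口:

```python
# 离散Stackelberg产量博弈示意(自包含,数值为假设)
# 市场价格 p = 10 - (q_L + q_F),成本为0,收益 = 产量 * 价格
actions = [1, 2, 3, 4, 5]

def leader_payoff(q_l, q_f):
    return q_l * (10 - q_l - q_f)

def follower_payoff(q_l, q_f):
    return q_f * (10 - q_l - q_f)

def follower_best_response(q_l):
    # 追随者对给定领导者产量的最优响应(枚举)
    return max(actions, key=lambda q_f: follower_payoff(q_l, q_f))

# 领导者预判追随者的响应,最大化自身收益
best_q_l = max(actions, key=lambda q_l: leader_payoff(q_l, follower_best_response(q_l)))
best_q_f = follower_best_response(best_q_l)
stackelberg_leader_payoff = leader_payoff(best_q_l, best_q_f)
print(best_q_l, best_q_f, stackelberg_leader_payoff)  # 5 2 15
```

与连续情形的解析解(领导者产量5、追随者2.5)一致,离散化后领导者选择高产量"挤压"追随者。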
# 安全博弈示例:Stackelberg安全游戏
class SecurityGame:
"""
Stackelberg安全博弈(SSG, Stackelberg Security Game)
场景:防御者(领导者)决定资源部署,攻击者(追随者)选择攻击目标
应用:
- 机场安检部署
- 网络安全资源分配
- 野生动物保护巡逻
"""
def __init__(self, targets, attacker_types, target_values, attack_costs):
"""
参数:
targets: 目标集合
attacker_types: 攻击者类型
target_values: 目标价值 V_t
attack_costs: 攻击成本 C_t^k
"""
self.targets = targets
self.attacker_types = attacker_types
self.target_values = np.array(target_values)
self.attack_costs = attack_costs # C[t, attacker_type]
def defender_payoff(self, coverage, attacker_action, attacker_type):
"""
防御者收益
如果攻击目标被覆盖:成功阻止攻击,获得目标价值
如果攻击目标未被覆盖:攻击成功,失去目标价值
"""
if attacker_action in coverage:
# 成功阻止
return self.target_values[attacker_action]
else:
# 攻击成功
return -self.target_values[attacker_action]
def attacker_payoff(self, coverage, attacker_action, attacker_type):
"""
攻击者收益
如果攻击被阻止:损失攻击成本
如果攻击成功:获得目标价值减去成本
"""
attack_cost = self.attack_costs[attacker_action, attacker_type]
if attacker_action in coverage:
# 攻击被阻止
return -attack_cost
else:
# 攻击成功
return self.target_values[attacker_action] - attack_cost
def attacker_best_response(self, coverage, attacker_type):
"""
攻击者的最优响应:选择期望收益最高的目标
"""
best_target = None
best_payoff = float('-inf')
for target in self.targets:
payoff = self.attacker_payoff(coverage, target, attacker_type)
if payoff > best_payoff:
best_payoff = payoff
best_target = target
return best_target, best_payoff
def solve_optimal_coverage(self, num_coverage):
"""
求解最优覆盖策略
小规模问题可直接暴力搜索;大规模问题通常用MILP(混合整数线性规划)求解
"""
n_targets = len(self.targets)
n_types = len(self.attacker_types)
# 变量:x_t = 1 if 目标t被覆盖
# 约束:sum(x_t) <= num_coverage
best_coverage = None
best_defender_utility = float('-inf')
# 暴力搜索(对于小规模问题)
from itertools import combinations
for coverage in combinations(range(n_targets), num_coverage):
coverage = set(coverage)
# 计算每个攻击者类型的期望收益
defender_utilities = []
for k in range(n_types):
target, _ = self.attacker_best_response(coverage, k)
util = self.defender_payoff(coverage, target, k)
defender_utilities.append(util)
# 防御者收益是所有攻击者类型的最小值(最坏情况)
min_util = min(defender_utilities)
if min_util > best_defender_utility:
best_defender_utility = min_util
best_coverage = coverage
return best_coverage, best_defender_utility
2.2 Stackelberg均衡与Nash均衡的关系
Stackelberg均衡建立在领导者的先动优势(First-Mover Advantage)之上:追随者在观察到领导者已承诺的策略后才做出最优响应。借助这种承诺能力,领导者在Stackelberg均衡中的收益通常不低于对应同时行动博弈的Nash均衡收益。
def compare_stackelberg_nash(game):
"""
比较Stackelberg均衡和Nash均衡
Stackelberg均衡:
- 领导者选择策略 x^*
- 追随者选择 y^*(x^*)
- x^* 使得领导者收益最大化
Nash均衡:
- 同时行动
- 每个参与人都无法单方面改进
"""
# 对于简单的2x2博弈
payoff_matrix_leader = game.leader_payoff
payoff_matrix_follower = game.follower_payoff
# 计算Stackelberg均衡
stackelberg = game.solve_leader_problem()
# 计算Nash均衡
nash_equilibria = []
for i in range(len(game.leader_action_set)):
for j in range(len(game.follower_action_set)):
is_nash = True
# 检查领导者能否改进
for i_prime in range(len(game.leader_action_set)):
if payoff_matrix_leader[i_prime, j] > payoff_matrix_leader[i, j]:
is_nash = False
break
# 检查追随者能否改进
for j_prime in range(len(game.follower_action_set)):
if payoff_matrix_follower[i, j_prime] > payoff_matrix_follower[i, j]:
is_nash = False
break
if is_nash:
nash_equilibria.append((i, j))
return stackelberg, nash_equilibria
Stackelberg vs Nash
在某些博弈中,Stackelberg均衡可能比Nash均衡给领导者带来更高的收益。这是因为领导者可以通过承诺(commitment)来限制追随者的选择,从而利用先动优势。
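先动优势可以用一个自包含的小例子量化(沿用离散产量博弈,数值为演示假设):同时行动时枚举Nash均衡,先动承诺时求Stackelberg解,比较领导者收益。

```python
# 比较同时行动(Nash)与先动承诺(Stackelberg)下的领导者收益
# 市场价格 p = 10 - (q1 + q2),收益 = 产量 * 价格(演示用假设)
actions = [1, 2, 3, 4, 5]

def payoff(q_me, q_other):
    return q_me * (10 - q_me - q_other)

def best_response(q_other):
    return max(actions, key=lambda q: payoff(q, q_other))

# Nash均衡:双方互为最优响应(允许并列最优)
nash = [(q1, q2) for q1 in actions for q2 in actions
        if payoff(q1, q2) == max(payoff(q, q2) for q in actions)
        and payoff(q2, q1) == max(payoff(q, q1) for q in actions)]

# Stackelberg:领导者预判追随者的最优响应后承诺
q_l = max(actions, key=lambda q: payoff(q, best_response(q)))
stackelberg_payoff = payoff(q_l, best_response(q_l))
nash_payoff = payoff(3, 3)  # 对称Nash均衡(3,3)下的收益
print(nash, stackelberg_payoff, nash_payoff)
```

在这个例子里,对称Nash均衡下领导者得12,而先动承诺可得15:承诺高产量迫使追随者降低产量。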
3. 演化博弈论
3.1 演化博弈的核心思想
演化博弈论(Evolutionary Game Theory)源于生物学研究,由John Maynard Smith在1982年系统化。它假设参与人不是完全理性的,而是通过策略的"自然选择"和"复制"过程逐渐趋向均衡。
核心假设:
- 有限理性:参与人通过试错学习,而非完美计算
- 群体选择:采用高收益策略的参与人更可能"复制"成功
- 动态演化:策略频率随时间变化
class EvolutionaryGame:
"""
演化博弈论框架
核心概念:
- 群体:大量参与人的集合
- 策略频率:群体中使用各策略的比例
- 适应度:策略的收益,决定复制速度
- 演化稳定策略(ESS):无法被小突变群体入侵的策略
"""
def __init__(self, payoff_matrix, population_sizes=None):
"""
参数:
payoff_matrix: 收益矩阵(大小:策略数 × 策略数)
population_sizes: 各策略的初始人口数
"""
self.payoff_matrix = np.array(payoff_matrix)
self.n_strategies = self.payoff_matrix.shape[0]
if population_sizes is None:
self.population = np.ones(self.n_strategies)
else:
self.population = np.array(population_sizes)
self.history = [self.population.copy()]
def compute_fitness(self, strategy_idx):
"""
计算策略的适应度
适应度 = 策略与其他策略博弈的平均收益
"""
population_frequencies = self.population / self.population.sum()
return np.dot(self.payoff_matrix[strategy_idx], population_frequencies)
def replicator_dynamics(self, dt=0.01):
"""
复制者动态方程
d x_i / dt = x_i * (f_i(x) - φ(x))
其中:
- x_i: 策略i的人口比例
- f_i(x): 策略i的适应度
- φ(x): 平均适应度 = sum(x_i * f_i(x))
"""
total_population = self.population.sum()
frequencies = self.population / total_population
# 计算各策略适应度
fitness = np.array([self.compute_fitness(i) for i in range(self.n_strategies)])
mean_fitness = np.dot(frequencies, fitness)
# 复制者动态
growth_rates = fitness - mean_fitness
self.population = self.population * (1 + dt * growth_rates)
# 确保人口非负
self.population = np.maximum(self.population, 0)
self.history.append(self.population.copy())
def simulate_evolution(self, num_steps=1000, dt=0.01):
"""
演化模拟
"""
for _ in range(num_steps):
self.replicator_dynamics(dt)
return self.get_equilibrium()
def get_equilibrium(self):
"""获取当前均衡状态"""
total = self.population.sum()
return self.population / total
def find_evolutionary_stable_strategies(self):
"""
寻找演化稳定策略(ESS)
ESS定义:
对于策略x^*,如果对于任意其他策略y ≠ x^*,当y的比例很小时:
u(x^*, εy + (1-ε)x^*) > u(y, εy + (1-ε)x^*)
其中ε是y的突变比例
"""
ess_candidates = []
# 检查纯策略是否为ESS
for i in range(self.n_strategies):
is_ess = True
for j in range(self.n_strategies):
if i == j:
continue
# 条件1:均衡时i的收益 >= j的收益
if self.payoff_matrix[i, i] < self.payoff_matrix[j, i]:
is_ess = False
break
# 条件2:如果收益相等,则i对i的收益 > j对i的收益
if self.payoff_matrix[i, i] == self.payoff_matrix[j, i]:
if self.payoff_matrix[i, j] <= self.payoff_matrix[j, j]:
is_ess = False
break
if is_ess:
ess_candidates.append(i)
return ess_candidates
3.2 鹰鸽博弈:演化博弈的经典案例
def hawk_dove_game():
"""
鹰鸽博弈(Hawks and Doves Game)
场景:两只动物争夺价值V的资源
- 鹰策略:战斗直到受伤或对手撤退
- 鸽子策略:示威或撤退
收益矩阵(每场冲突):
鸽子 鹰
鸽子 V/2, V/2 0, V
鹰 V, 0 (V-C)/2, (V-C)/2
其中C是战斗受伤的成本(通常C > V)
"""
V = 10 # 资源价值
C = 20 # 战斗成本
payoff_matrix = np.array([
[V/2, 0], # 鸽子行:对鸽子得V/2,对鹰得0
[V, (V-C)/2] # 鹰行:对鸽子得V,对鹰得(V-C)/2
])
game = EvolutionaryGame(payoff_matrix)
# 模拟演化过程
print("鹰鸽博弈演化分析:")
print(f"初始分布: {game.get_equilibrium()}")
game.simulate_evolution(num_steps=1000)
print(f"稳定分布: {game.get_equilibrium()}")
ess = game.find_evolutionary_stable_strategies()
print(f"ESS候选: {[['鸽子', '鹰'][i] for i in ess]}")
# 计算混合策略ESS:设p为选择鹰的概率
# 无差异条件:鹰与鸽子的期望收益相等
# p*(V-C)/2 + (1-p)*V = (1-p)*V/2,解得 p* = V/C
p_ess = V / C
print(f"\n混合策略ESS: 鹰的概率 = {p_ess:.2f}")
print(f"解释:当群体中鹰的比例为{V/C:.0%}时,鸽子无法入侵")
# 运行分析
hawk_dove_game()
3.3 石头-剪刀-布博弈
class RockPaperScissorsEvolution:
"""
石头剪刀布的演化博弈分析
收益矩阵:
R P S
R 0,0 -1,1 1,-1
P 1,-1 0,0 -1,1
S -1,1 1,-1 0,0
"""
def __init__(self):
# R=0, P=1, S=2
self.payoff_matrix = np.array([
[0, -1, 1], # R
[1, 0, -1], # P
[-1, 1, 0] # S
])
self.population = np.array([100, 100, 100]) # 初始人口
self.history = []
def fitness(self, strategy_idx):
"""计算策略适应度"""
total = self.population.sum()
freqs = self.population / total
return np.dot(self.payoff_matrix[strategy_idx], freqs)
def step(self, dt=0.01):
"""单步演化"""
fitnesses = np.array([self.fitness(i) for i in range(3)])
mean_fitness = np.dot(fitnesses, self.population / self.population.sum())
# 复制者动态
growth = self.population * (fitnesses - mean_fitness)
self.population = np.maximum(self.population + dt * growth, 0)
self.population = self.population / self.population.sum() * 300 # 保持总数
self.history.append(self.population.copy())
def simulate(self, steps=1000):
"""完整模拟"""
for _ in range(steps):
self.step()
return self.get_state()
def get_state(self):
"""获取当前状态"""
total = self.population.sum()
return self.population / total
def cyclic_analysis(self):
"""
分析循环动力学
石头剪刀布不收敛到固定点,而是在三维空间中循环
"""
from scipy.integrate import odeint
# 连续时间复制者动态
def replicator_dt(pop, t, payoff):
pop = np.maximum(pop, 0)
pop = pop / pop.sum()
fitness = payoff @ pop
return pop * (fitness - np.dot(pop, fitness))
t = np.linspace(0, 50, 1000)
initial = [0.4, 0.3, 0.3]
trajectory = odeint(replicator_dt, initial, t, args=(self.payoff_matrix,))
return t, trajectory
4. 博弈论中的学习算法
4.1 Replicator Dynamics(复制者动态)
复制者动态是演化博弈论中最基本的学习模型,它假设采用高收益策略的个体以更高速率复制:
class ReplicatorDynamics:
"""
复制者动态实现
方程:dx_i/dt = x_i * (f_i(x) - φ(x))
其中:
- x_i: 策略i的人口比例
- f_i(x): 策略i的期望收益
- φ(x): 平均收益
"""
def __init__(self, payoff_matrix, initial_distribution):
self.payoff_matrix = np.array(payoff_matrix)
self.x = np.array(initial_distribution)
self.history = [self.x.copy()]
def step(self, dt=0.01):
"""单步演化"""
# 计算各策略收益
fitness = self.payoff_matrix @ self.x
mean_fitness = np.dot(self.x, fitness)
# 复制者动态
growth_rate = fitness - mean_fitness
self.x = self.x * (1 + dt * growth_rate)
# 归一化
self.x = np.maximum(self.x, 0) # 确保非负
self.x = self.x / self.x.sum()
self.history.append(self.x.copy())
def run(self, num_steps=1000, dt=0.01):
"""运行完整演化"""
for _ in range(num_steps):
self.step(dt)
return self.x
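作为用法示意,下面用一段自包含代码在囚徒困境收益矩阵上运行复制者动态:背叛策略严格占优,群体频率收敛到全体背叛(收益数值为常见的演示假设):

```python
import numpy as np

# 囚徒困境行收益矩阵:策略0=合作,1=背叛(演示用数值)
payoff = np.array([[3.0, 0.0],
                   [5.0, 1.0]])
x = np.array([0.5, 0.5])  # 初始策略频率

for _ in range(300):
    fitness = payoff @ x                           # 各策略期望收益
    mean_fitness = x @ fitness                     # 平均收益
    x = x * (1 + 0.1 * (fitness - mean_fitness))   # 离散化复制者更新
    x = np.maximum(x, 0)
    x = x / x.sum()

print(x)  # 背叛频率趋近1
```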
class SpatialReplicatorDynamics:
"""
空间复制者动态
考虑空间结构的演化博弈
个体只与邻居交互
"""
def __init__(self, payoff_matrix, grid_size=20, neighbor_type='moore'):
self.payoff_matrix = payoff_matrix
self.grid_size = grid_size
self.neighbor_type = neighbor_type
self.n_strategies = payoff_matrix.shape[0]
# 随机初始化网格
self.grid = np.random.randint(0, self.n_strategies, (grid_size, grid_size))
def get_neighbors(self, x, y):
"""获取邻居坐标(周期边界)"""
if self.neighbor_type == 'moore':
# 8邻域
offsets = [(dx, dy) for dx in (-1, 0, 1) for dy in (-1, 0, 1) if (dx, dy) != (0, 0)]
else:
# 4邻域
offsets = [(0, 1), (0, -1), (1, 0), (-1, 0)]
return [((x + dx) % self.grid_size, (y + dy) % self.grid_size) for dx, dy in offsets]
def fitness(self, x, y):
"""计算位置(x,y)的适应度:与邻居(及自身)博弈的平均收益"""
strategy = self.grid[x, y]
neighbor_strategies = [self.grid[nx, ny] for nx, ny in self.get_neighbors(x, y)]
total_payoff = sum(self.payoff_matrix[strategy, s] for s in neighbor_strategies + [strategy])
return total_payoff / (len(neighbor_strategies) + 1)
def step(self):
"""单步演化"""
new_grid = self.grid.copy()
for x in range(self.grid_size):
for y in range(self.grid_size):
# 计算当前适应度
current_fitness = self.fitness(x, y)
# 找邻居中适应度最高的
neighbors = self.get_neighbors(x, y)
best_neighbor = None
best_fitness = current_fitness
for nx, ny in neighbors:
f = self.fitness(nx, ny)
if f > best_fitness:
best_fitness = f
best_neighbor = (nx, ny)
# 如果有更好的邻居,复制其策略
if best_neighbor is not None:
new_grid[x, y] = self.grid[best_neighbor]
self.grid = new_grid
def run(self, num_steps=100):
"""运行模拟"""
for _ in range(num_steps):
self.step()
return self.grid
4.2 虚拟对局与无遗憾学习
class FictitiousPlay:
"""
虚拟对局(Fictitious Play)
学习规则:
1. 记录对手历史行动的平均策略
2. 对平均策略选择最优响应
3. 更新自己的行动历史
"""
def __init__(self, payoff_matrix):
self.payoff_matrix = np.array(payoff_matrix)
self.n_strategies = payoff_matrix.shape[0]
self.action_counts = np.zeros(self.n_strategies)
self.opponent_history = []
def update_opponent_history(self, opponent_action):
"""更新对手历史"""
self.opponent_history.append(opponent_action)
self.action_counts[opponent_action] += 1
def opponent_average_strategy(self):
"""对手的平均策略(无历史时取均匀分布,避免除零)"""
total = self.action_counts.sum()
if total == 0:
return np.ones(self.n_strategies) / self.n_strategies
return self.action_counts / total
def best_response(self, opponent_avg_strategy):
"""对对手平均策略的最优响应"""
expected_payoffs = self.payoff_matrix @ opponent_avg_strategy
return np.argmax(expected_payoffs)
def choose_action(self):
"""选择行动"""
opponent_avg = self.opponent_average_strategy()
return self.best_response(opponent_avg)
def learn(self, num_iterations, opponent_policy=None):
"""
学习过程
参数:
opponent_policy: 对手策略函数(历史) -> 行动
"""
for t in range(num_iterations):
action = self.choose_action()
if opponent_policy:
opponent_action = opponent_policy(self.opponent_history)
else:
opponent_action = np.random.randint(self.n_strategies)
self.update_opponent_history(opponent_action)
return self.action_counts / sum(self.action_counts)
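一个自包含的最小示意:面对固定出石头的对手,虚拟对局者在观察到历史后立即收敛到出布(收益矩阵即上文的石头剪刀布,不依赖文中类的接口):

```python
import numpy as np

# 行参与人的石头剪刀布收益矩阵:0=石头,1=布,2=剪刀
payoff = np.array([[0, -1, 1],
                   [1, 0, -1],
                   [-1, 1, 0]])

counts = np.zeros(3)   # 对手各行动的出现次数
actions = []
for t in range(50):
    if counts.sum() == 0:
        belief = np.ones(3) / 3          # 无历史时取均匀信念
    else:
        belief = counts / counts.sum()   # 对手历史行动的经验频率
    action = int(np.argmax(payoff @ belief))  # 对经验频率的最优响应
    actions.append(action)
    counts[0] += 1                       # 对手始终出石头(0)

print(actions[-1])  # 观察到历史后稳定出布(1)
```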
class StochasticFictitiousPlay:
"""
随机虚拟对局
带噪声的虚拟对局,更接近真实学习过程
"""
def __init__(self, payoff_matrix, temperature=1.0):
self.payoff_matrix = np.array(payoff_matrix)
self.n_strategies = payoff_matrix.shape[0]
self.temperature = temperature
self.action_counts = np.zeros(self.n_strategies)
self.opponent_history = []
def boltzmann_probability(self, expected_payoffs):
"""Boltzmann概率"""
exp_payoffs = np.exp(expected_payoffs / self.temperature)
return exp_payoffs / exp_payoffs.sum()
def best_response(self, opponent_avg_strategy, stochastic=True):
"""最优响应(可选择是否随机)"""
expected_payoffs = self.payoff_matrix @ opponent_avg_strategy
if stochastic:
return np.random.choice(self.n_strategies, p=self.boltzmann_probability(expected_payoffs))
else:
return np.argmax(expected_payoffs)
def update_opponent_history(self, opponent_action):
"""更新对手历史"""
self.opponent_history.append(opponent_action)
self.action_counts[opponent_action] += 1
def learn(self, num_iterations, opponent_policy=None):
"""学习过程"""
for t in range(num_iterations):
opponent_avg = self.action_counts / max(1, self.action_counts.sum())
action = self.best_response(opponent_avg)
opponent_action = opponent_policy(self.opponent_history) if opponent_policy else np.random.randint(self.n_strategies)
self.update_opponent_history(opponent_action)
class HedgeAlgorithm:
"""
Hedge算法(指数加权算法)
一种无遗憾学习算法
每个动作的权重按指数增长
"""
def __init__(self, payoff_matrix, learning_rate=0.1):
self.payoff_matrix = np.array(payoff_matrix)
self.n_strategies = payoff_matrix.shape[0]
self.learning_rate = learning_rate
self.weights = np.ones(self.n_strategies)
self.history = []
def select_action(self):
"""按概率加权选择行动"""
probs = self.weights / self.weights.sum()
return np.random.choice(self.n_strategies, p=probs)
def update_weights(self, action, opponent_action):
"""更新权重"""
payoff = self.payoff_matrix[action, opponent_action]
self.weights *= np.exp(self.learning_rate * payoff)
self.history.append(self.weights.copy() / self.weights.sum())
def run(self, num_iterations, opponent_policy=None):
"""运行算法"""
for _ in range(num_iterations):
action = self.select_action()
opponent_action = opponent_policy() if opponent_policy else np.random.randint(self.n_strategies)
self.update_weights(action, opponent_action)
return self.weights / self.weights.sum()
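下面用一段自包含代码演示Hedge的无遗憾性质:在完整信息反馈(每轮能观察到所有行动的收益)下面对始终出石头的对手,权重迅速集中到出布。这里的反馈形式与上文按采样行动更新的版本略有不同,仅作示意:

```python
import numpy as np

payoff = np.array([[0, -1, 1],
                   [1, 0, -1],
                   [-1, 1, 0]])  # 石头剪刀布:0=石头,1=布,2=剪刀
weights = np.ones(3)
lr = 0.1

for t in range(200):
    opponent_action = 0                        # 对手始终出石头
    full_payoffs = payoff[:, opponent_action]  # 本轮每个行动的收益向量
    weights = weights * np.exp(lr * full_payoffs)  # 指数加权更新
    weights = weights / weights.sum()          # 归一化防止溢出

probs = weights / weights.sum()
print(probs)  # 出布(1)的概率接近1
```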
class MultiplicativeWeightsUpdate:
"""
乘法权重更新算法(MWU)
另一种经典的无遗憾学习算法
"""
def __init__(self, n_actions, learning_rate=0.1):
self.n_actions = n_actions
self.lr = learning_rate
self.weights = np.ones(n_actions)
self.loss_history = []
self.action_history = []
def get_distribution(self):
"""获取行动分布"""
return self.weights / self.weights.sum()
def update(self, action, action_losses):
"""
更新权重并记录本轮行动
参数:
action: 本轮实际选择的行动
action_losses: 各行动的损失(归一化到[0,1])
"""
# 乘法更新规则
action_losses = np.asarray(action_losses)
self.weights *= (1 - self.lr) ** action_losses
self.weights = np.maximum(self.weights, 1e-10) # 防止数值问题
self.loss_history.append(action_losses)
self.action_history.append(action)
def expected_regret(self, num_rounds):
"""
计算平均遗憾
遗憾 = 算法累积损失 - 最佳单一行动的累积损失
"""
if not self.loss_history:
return 0
cumulative_loss = sum(loss[a] for loss, a in zip(self.loss_history, self.action_history))
best_loss = min(sum(loss[a] for loss in self.loss_history) for a in range(self.n_actions))
return (cumulative_loss - best_loss) / num_rounds
5. 鲁棒博弈与不确定博弈
5.1 鲁棒博弈的定义
在实际问题中,参与人往往无法确切知道博弈的参数(如收益函数)。鲁棒博弈(Robust Games)研究在参数不确定下如何做决策。
class RobustGame:
"""
鲁棒博弈框架
不确定性建模:
- 集合U表示可能的参数值集合
- 每个参与人在最坏情况下优化
"""
def __init__(self, payoff_uncertainties, strategy_spaces, base_payoff_matrix=None):
"""
参数:
payoff_uncertainties: 各参与人收益的不确定性范围
strategy_spaces: 策略空间
base_payoff_matrix: 基础(名义)收益矩阵,供不确定性扰动使用
"""
self.uncertainties = payoff_uncertainties
self.strategy_spaces = strategy_spaces
self.base_payoff_matrix = base_payoff_matrix
def worst_case_payoff(self, player_idx, my_action, opponent_actions,
uncertainty_samples=None):
"""
计算最坏情况下的收益
保守策略:在不确定性下最大化最小收益
"""
if uncertainty_samples is None:
# 网格搜索不确定性参数
uncertainty_samples = self._generate_uncertainty_samples()
worst_payoff = float('inf')
for uncertainty in uncertainty_samples:
payoff = self._compute_payoff_with_uncertainty(
player_idx, my_action, opponent_actions, uncertainty
)
worst_payoff = min(worst_payoff, payoff)
return worst_payoff
def robust_best_response(self, player_idx, opponent_actions):
"""
鲁棒最优响应:最大化最坏情况收益
"""
best_action = None
best_payoff = float('-inf')
for action in self.strategy_spaces[player_idx]:
worst_payoff = self.worst_case_payoff(player_idx, action, opponent_actions)
if worst_payoff > best_payoff:
best_payoff = worst_payoff
best_action = action
return best_action
def _compute_payoff_with_uncertainty(self, player_idx, my_action,
opponent_actions, uncertainty):
"""使用特定不确定性参数计算收益"""
# 简化实现:假设收益在基础值上添加扰动
base_payoff = self.base_payoff_matrix[player_idx][my_action][opponent_actions]
perturbation = uncertainty[player_idx]
return base_payoff + perturbation
def _generate_uncertainty_samples(self, num_samples=100):
"""生成不确定性样本"""
# 假设不确定性在[-ε, ε]范围内均匀分布
samples = []
for _ in range(num_samples):
sample = {}
for player, (low, high) in self.uncertainties.items():
sample[player] = np.random.uniform(low, high)
samples.append(sample)
return samples
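一个自包含的数值示意:当收益带区间不确定性时,鲁棒(maximin)选择可能与名义最优不同。下例中行动A名义收益更高但波动大,行动B在最坏情况下更稳健(数值均为假设,区间端点直接给出而非随机采样):

```python
# 两个行动的名义收益与不确定性半径(演示用假设)
nominal = {'A': 5.0, 'B': 3.0}
radius = {'A': 4.0, 'B': 0.5}

# 区间不确定性下,最坏情况收益在区间下端取得
worst_case = {a: nominal[a] - radius[a] for a in nominal}

nominal_choice = max(nominal, key=nominal.get)       # 名义最优
robust_choice = max(worst_case, key=worst_case.get)  # maximin(鲁棒)最优
print(nominal_choice, robust_choice)  # A B
```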
class DistributionallyRobustGame:
"""
分布鲁棒博弈
假设收益分布不确定,在最坏情况分布下优化
"""
def __init__(self, base_payoffs, ambiguity_set_type='wasserstein'):
"""
参数:
base_payoffs: 基础收益矩阵
ambiguity_set_type: 不确定性集合类型
"""
self.base_payoffs = base_payoffs
self.ambiguity_set_type = ambiguity_set_type
def compute_ambiguity_set(self, radius=0.1):
"""
构建模糊集合
Wasserstein球:在Wasserstein距离radius内的所有分布
"""
# 简化实现
return {'type': 'wasserstein', 'radius': radius}
def worst_case_expectation(self, action_profile, ambiguity_set):
"""
计算最坏情况期望收益
在模糊集合中找到使期望收益最小的分布
"""
base_payoff = self.base_payoffs[tuple(action_profile)]
if self.ambiguity_set_type == 'wasserstein':
# Wasserstein球:扰动后的收益
worst_payoff = base_payoff - ambiguity_set['radius'] * np.linalg.norm(action_profile)
else:
worst_payoff = base_payoff
return worst_payoff
5.2 贝叶斯博弈
class BayesianGame:
"""
贝叶斯博弈(不完全信息博弈)
博弈组成:
- 类型空间 T_i:每个参与人的私人信息
- 先验分布 p(t_1, ..., t_n)
- 类型依赖的收益函数 u_i(s_1, ..., s_n, t_i)
"""
def __init__(self, types, prior, payoff_functions, strategy_spaces):
self.types = types
self.prior = prior # 先验分布
self.payoff_functions = payoff_functions
self.strategy_spaces = strategy_spaces
def bayesian_nash_equilibrium(self):
"""
求解贝叶斯Nash均衡
贝叶斯Nash均衡条件:
对于每个参与人i和每个类型t_i ∈ T_i:
s_i^*(t_i) ∈ argmax_{s_i} E_{t_{-i}~p(t_{-i}|t_i)}[u_i(s_i, s_{-i}^*(t_{-i}), t_i)]
"""
# 简化实现:假设两个参与人,两个类型
pass # 复杂实现省略
def perfect_bayesian_equilibrium(self):
"""
完美贝叶斯均衡(PBE)
PBE条件:
1. 给定信念,行动是最优的
2. 信念通过贝叶斯规则更新(可能时)
"""
pass
class SignalingGame:
"""
信号博弈(Signaling Game)
发送者有私人信息,接收者根据接收到的信号行动
应用:
- 求职面试
- 品牌广告
- 医疗诊断
"""
def __init__(self, sender_types, receiver_actions, prior, payoff_matrices):
"""
参数:
sender_types: 发送者类型
receiver_actions: 接收者行动
prior: 类型先验概率
payoff_matrices: 收益矩阵
"""
self.sender_types = sender_types
self.receiver_actions = receiver_actions
self.prior = prior # P(type)
self.payoff_sender = payoff_matrices['sender'] # u_s(signal, action, type)
self.payoff_receiver = payoff_matrices['receiver'] # u_r(action, type)
def sender_strategy(self):
"""
发送者策略:给定类型,选择信号
策略映射:τ: T -> Signal
"""
pass
def receiver_belief_update(self, signal):
"""
接收者信念更新
使用贝叶斯规则:P(type|signal) ∝ P(signal|type) * P(type)
"""
pass
def receiver_strategy(self):
"""
接收者策略:给定信号,选择行动
策略映射:ρ: Signal -> Action
"""
pass
def separating_equilibrium(self):
"""
分离均衡
不同类型发送不同信号,接收者可以完全区分类型
"""
# 求解分离均衡
pass
def pooling_equilibrium(self):
"""
混同均衡
不同类型发送相同信号,接收者无法区分
"""
# 求解混同均衡
pass
6. 拍卖理论基础
6.1 拍卖机制设计
class AuctionMechanism:
"""
拍卖机制基类
核心问题:
- 激励相容:说实话是占优策略
- 社会福利最大化
- 个人理性
"""
def __init__(self, reserve_price=0):
self.reserve_price = reserve_price
self.bidders = []
def submit_bid(self, bidder_id, bid):
"""提交投标"""
if bid >= self.reserve_price:
self.bidders.append((bidder_id, bid))
def determine_allocation(self):
"""确定分配(子类实现)"""
raise NotImplementedError
def determine_payment(self):
"""确定支付(子类实现)"""
raise NotImplementedError
class FirstPriceSealedBidAuction(AuctionMechanism):
"""
第一价格密封拍卖
特点:
- 投标是私密的
- 最高价者获胜
- 获胜者支付自己的投标价格
"""
def determine_winner(self):
"""确定胜者"""
if not self.bidders:
return None, None
sorted_bids = sorted(self.bidders, key=lambda x: x[1], reverse=True)
winner_id, winning_bid = sorted_bids[0]
return winner_id, winning_bid
class SecondPriceSealedBidAuction(AuctionMechanism):
"""
第二价格密封拍卖(Vickrey拍卖)
核心性质:
- 激励相容:说实话是占优策略
- 胜者支付第二高价
"""
def determine_winner_and_payment(self):
"""确定胜者和支付价格"""
if len(self.bidders) < 2:
winner = self.bidders[0] if self.bidders else None
return winner, self.reserve_price
sorted_bids = sorted(self.bidders, key=lambda x: x[1], reverse=True)
winner = sorted_bids[0]
second_price = max(sorted_bids[1][1], self.reserve_price)
return winner, second_price
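第二价格拍卖的激励相容性可以直接数值验证。下面的自包含片段枚举某个投标人的一组偏离出价,确认其效用从不超过如实出价(估值与对手出价均为假设数值):

```python
# 第二价格拍卖:如实出价是(弱)占优策略的数值验证
my_value = 10.0
other_bids = [8.0, 5.0]  # 其他投标人的出价(假设)

def my_utility(my_bid):
    # 出价最高者获胜,支付其余出价中的最高者
    if my_bid > max(other_bids):
        return my_value - max(other_bids)
    return 0.0

truthful = my_utility(my_value)  # 如实出价的效用
deviations = [my_utility(b) for b in [0, 4, 7.9, 8.5, 9, 12, 100]]
print(truthful, max(deviations))  # 任何偏离都不带来更高效用
```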
class EnglishAuction:
"""
英式拍卖(公开竞价)
特点:
- 价格逐渐上升
- 第一个放弃的人失去机会
- 最后一个出价的人获胜
"""
def __init__(self, bidders, reserve_price=0):
self.bidders = bidders
self.reserve_price = reserve_price
self.current_price = 0
self.active_bidders = set(bidders)
self.bid_history = []
def step(self, price_increment=1):
"""单步竞价"""
# 收集出价
new_bids = {}
for bidder in self.active_bidders:
# 假设每个竞价者有预算约束
if bidder.remaining_budget >= self.current_price + price_increment:
if bidder.willing_to_continue():
new_bids[bidder] = self.current_price + price_increment
if not new_bids:
# 没有人愿意继续,当前价格获胜
return None, self.current_price
# 最高出价者继续
highest_bidder = max(new_bids.keys(), key=lambda b: b.valuation)
self.current_price = new_bids[highest_bidder]
self.bid_history.append((highest_bidder, self.current_price))
return highest_bidder, self.current_price
def simulate(self):
"""模拟完整拍卖"""
while len(self.active_bidders) > 1:
result = self.step()
if result[0] is None:
# 拍卖结束
winner = self.bid_history[-1][0] if self.bid_history else None
return winner, self.current_price
# 其他竞价者决定是否继续
for bidder in list(self.active_bidders):
if bidder != result[0] and not bidder.willing_to_continue(self.current_price):
self.active_bidders.remove(bidder)
return list(self.active_bidders)[0], self.current_price
class DutchAuction:
"""
荷兰式拍卖(降价拍卖)
特点:
- 价格从高到低逐渐下降
- 第一个接受价格的人获胜
"""
def __init__(self, bidders, start_price, reserve_price=0):
self.bidders = bidders
self.start_price = start_price
self.reserve_price = reserve_price
self.current_price = start_price
self.price_decrement = 1
def step(self):
"""单步降价"""
self.current_price -= self.price_decrement
if self.current_price < self.reserve_price:
return None, None # 未售出
# 检查是否有人接受
for bidder in self.bidders:
if bidder.valuation >= self.current_price:
return bidder, self.current_price
return None, self.current_price
def simulate(self):
"""模拟完整拍卖"""
while self.current_price >= self.reserve_price:
result = self.step()
if result[0] is not None:
return result
return None, None # 未售出
class CombinatorialAuction:
"""
组合拍卖
允许投标人对商品组合出价
解决商品间的互补/替代效应
"""
def __init__(self):
self.items = []
self.bids = [] # (bidder_id, items, price)
def submit_bundle_bid(self, bidder_id, items, price):
"""提交组合投标"""
self.bids.append((bidder_id, set(items), price))
def winner_determination(self):
"""
胜者确定问题(Winner Determination Problem)
最大化社会福利:
max sum(v_i(S_i) * x_i)
s.t. 每个物品最多分配给一个投标人
这是NP-hard问题,使用贪婪或MIP求解
"""
from itertools import combinations
n_bids = len(self.bids)
n_items = len(self.items)
# 简化:使用贪婪算法
sorted_bids = sorted(self.bids, key=lambda x: x[2]/len(x[1]), reverse=True)
allocation = {}
allocated_items = set()
for bidder_id, items, price in sorted_bids:
# 检查物品是否可用
if items.isdisjoint(allocated_items):
allocation[bidder_id] = (items, price)
allocated_items.update(items)
return allocation
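上述按"价格/包大小"排序的贪婪胜者确定可以用一个自包含小例子演示(投标数据为假设;贪婪法一般只给出近似解,本例恰好取到最优):

```python
# 组合拍卖的贪婪胜者确定:按 价格/包大小 降序,取不冲突的包
bids = [
    ('A', {1, 2}, 10.0),  # 密度 5.0
    ('B', {2, 3}, 12.0),  # 密度 6.0
    ('C', {1}, 4.0),      # 密度 4.0
]
allocation = {}
allocated_items = set()
for bidder, items, price in sorted(bids, key=lambda b: b[2] / len(b[1]), reverse=True):
    if items.isdisjoint(allocated_items):  # 物品尚未被占用才中标
        allocation[bidder] = price
        allocated_items |= items

welfare = sum(allocation.values())
print(allocation, welfare)  # B和C中标,总福利16.0
```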
class DoubleAuction:
"""
双向拍卖
同时允许买家和卖家出价
用于去中心化市场
"""
def __init__(self, buyers, sellers, platform_fee=0):
"""
参数:
buyers: 买家出价 [(buyer_id, bid), ...]
sellers: 卖家要价 [(seller_id, ask), ...]
platform_fee: 平台费用比例
"""
self.buyers = sorted(buyers, key=lambda x: x[1], reverse=True) # 按出价降序
self.sellers = sorted(sellers, key=lambda x: x[1]) # 按要价升序
self.platform_fee = platform_fee
self.trades = []
def match(self):
"""
匹配买家和卖家
使用标准的撮合规则:
- 按出价降序排列买家
- 按要价升序排列卖家
- 在交叉点进行交易
"""
buyer_idx = 0
seller_idx = 0
while buyer_idx < len(self.buyers) and seller_idx < len(self.sellers):
buyer_id, buyer_bid = self.buyers[buyer_idx]
seller_id, seller_ask = self.sellers[seller_idx]
# 检查是否可交易
if buyer_bid >= seller_ask:
# 计算交易价格(使用中点价格)
price = (buyer_bid + seller_ask) / 2 * (1 - self.platform_fee)
self.trades.append({
'buyer': buyer_id,
'seller': seller_id,
'price': price,
'buyer_surplus': buyer_bid - price,
'seller_surplus': price - seller_ask
})
buyer_idx += 1
seller_idx += 1
else:
# 买家最高出价低于卖家最低要价,无法交易
break
return self.trades
def market_clearing_price(self):
"""计算市场出清价格"""
trades = self.match()
if not trades:
return None
return np.mean([t['price'] for t in trades])
拍卖类型对比
| 拍卖类型 | 激励相容 | 效率 | 计算复杂性 |
|---|---|---|---|
| 第一价格密封拍卖 | 否 | 可能低效 | 低 |
| 第二价格密封拍卖 | 是 | 社会最优 | 低 |
| 组合拍卖 | 部分 | 近似最优 | NP-hard |
7. 机制设计高级理论
7.1 VCG机制
VCG(Vickrey-Clarke-Groves)机制是实现社会福利最大化的通用机制:
class VCGMechanism:
"""
VCG(Vickrey-Clarke-Groves)机制
核心性质:
- 激励相容:说实话是占优策略
- 社会福利最大化
- 个人理性
支付规则:
p_i = (sum_{j≠i} v_j(S^{-i})) - sum_{j≠i} v_j(S)
其中S是社会最优分配,S^{-i}是不包含i的分配
"""
def __init__(self, valuations, num_items):
"""
参数:
valuations: 各参与人的价值函数 v_i(S)
num_items: 可分配物品数量
"""
self.valuations = valuations
self.num_items = num_items
self.n_agents = len(valuations)
def social_welfare(self, allocation):
"""计算社会福利(单物品简化:获得物品者的价值之和)"""
return sum(self.valuations[i]({0}) for i in range(self.n_agents) if allocation[i] == 1)
def allocate(self):
"""
求解最优分配
简化版本:单物品拍卖,分配给估值最高者
"""
bids = [self.valuations[i]({0}) for i in range(self.n_agents)]
winner = np.argmax(bids)
return [1 if i == winner else 0 for i in range(self.n_agents)]
def compute_payments(self):
"""
计算各参与人的支付(Clarke pivot规则)
p_i = (不含i时他人可实现的最优福利) - (实际分配下他人获得的福利)
单物品情形下,胜者支付第二高估值,其余支付0
"""
allocation = self.allocate()
values = [self.valuations[i]({0}) for i in range(self.n_agents)]
payments = []
for i in range(self.n_agents):
# 不含i时他人的最优福利:其余参与人中的最高估值
others_values = [values[j] for j in range(self.n_agents) if j != i]
welfare_without_i = max(others_values) if others_values else 0
# 实际分配下他人获得的福利
others_actual = sum(values[j] for j in range(self.n_agents) if j != i and allocation[j] == 1)
payments.append(max(0, welfare_without_i - others_actual))
return payments
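单物品情形下VCG退化为Vickrey拍卖,可用一段自包含代码直接验证Clarke pivot支付(估值为假设数值):

```python
# 单物品VCG:胜者支付其缺席时他人能实现的最优福利
values = [10.0, 8.0, 5.0]
winner = max(range(len(values)), key=lambda i: values[i])

payments = []
for i in range(len(values)):
    others = [values[j] for j in range(len(values)) if j != i]
    welfare_without_i = max(others)  # i缺席时的最优福利
    # 实际分配中他人获得的福利:胜者以外的人为胜者估值,胜者本人为0
    others_actual = 0.0 if i == winner else values[winner]
    payments.append(max(0.0, welfare_without_i - others_actual))

print(payments)  # 胜者支付第二高估值,其余支付0
```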
class MyersonLemma:
"""
Myerson引理:单参数环境的可检验性
一个分配规则和支付规则是激励相容的
当且仅当:
1. 分配规则是单调的:价值越高,获得物品的概率不降低
2. 支付按特定公式计算
"""
@staticmethod
def monotonicity_check(allocation_rule, values):
"""检查单调性:价值升序时分配概率不应下降"""
sorted_values = sorted(values)
for v1, v2 in zip(sorted_values[:-1], sorted_values[1:]):
if allocation_rule(v1) > allocation_rule(v2):
return False
return True
@staticmethod
def payment_formula(allocation_rule, value):
"""
计算支付(Myerson支付公式)
p(v) = v * x(v) - ∫_0^v x(t) dt
其中x是分配规则
"""
import scipy.integrate as integrate
integral, _ = integrate.quad(allocation_rule, 0, value)
payment = value * allocation_rule(value) - integral
return max(0, payment)
7.2 肾脏交换匹配
class KidneyExchange:
"""
肾脏交换博弈
多对捐献者-接受者之间的配对交换
目标:最大化交换数量
"""
def __init__(self, donor_recipient_pairs):
"""
参数:
donor_recipient_pairs: [(donor_id, recipient_id, blood_type), ...]
"""
self.pairs = donor_recipient_pairs
self.compatibility = self._build_compatibility_graph()
def _build_compatibility_graph(self):
"""构建兼容性图"""
compatibility = {}
for i, (d1, r1, bt1) in enumerate(self.pairs):
for j, (d2, r2, bt2) in enumerate(self.pairs):
if i != j:
# 检查兼容性:donor2可以捐给recipient1
if self._compatible(bt2, bt1):
compatibility[(i, j)] = True
return compatibility
def _compatible(self, donor_bt, recipient_bt):
"""检查血型兼容性:O是万能供者,AB是万能受者"""
can_donate_to = {
'O': {'O', 'A', 'B', 'AB'},
'A': {'A', 'AB'},
'B': {'B', 'AB'},
'AB': {'AB'}
}
return recipient_bt in can_donate_to.get(donor_bt, set())
def find_cycles(self, max_cycle_length=3):
"""
寻找交换循环
使用DFS寻找长度不超过max_cycle_length的循环
"""
from collections import defaultdict
graph = defaultdict(list)
for i in range(len(self.pairs)):
for j in range(len(self.pairs)):
if i != j and (i, j) in self.compatibility:
graph[i].append(j)
cycles = []
def dfs(start, current, path, visited):
for neighbor in graph[current]:
if neighbor == start and len(path) >= 2:
# 回到起点,记录一个交换循环
cycles.append(path[:])
elif neighbor not in visited and len(path) < max_cycle_length:
visited.add(neighbor)
path.append(neighbor)
dfs(start, neighbor, path, visited)
path.pop()
visited.remove(neighbor)
for i in range(len(self.pairs)):
dfs(i, i, [i], {i})
return cycles
def solve(self, max_cycle_length=3):
"""
求解最优交换方案
使用整数规划最大化交换数量
"""
cycles = self.find_cycles(max_cycle_length)
if not cycles:
return []
# 贪婪选择不相交的循环
selected = []
used_pairs = set()
for cycle in sorted(cycles, key=len, reverse=True):
if not any(p in used_pairs for p in cycle):
selected.append(cycle)
used_pairs.update(cycle)
return selected
8. 联邦学习中的博弈论
8.1 联邦学习的博弈建模
联邦学习(Federated Learning)中多个客户端在保持数据本地化的同时协作训练模型。这自然产生了一个博弈问题:每个客户端需要决定参与意愿、资源贡献和模型更新策略。
class FederatedLearningGame:
"""
联邦学习博弈模型
参与人:N个客户端
行动:是否参与联邦学习、贡献多少资源
收益:模型性能提升 vs 通讯成本/隐私泄露
"""
def __init__(self, clients, base_model_performance, communication_cost,
privacy_sensitivity):
"""
参数:
clients: 客户端列表
base_model_performance: 基础模型性能
communication_cost: 各客户端的通讯成本
privacy_sensitivity: 各客户端的隐私敏感度
"""
self.clients = clients
self.n_clients = len(clients)
self.base_performance = base_model_performance
self.communication_cost = communication_cost
self.privacy_sensitivity = privacy_sensitivity
def participation_payoff(self, client_idx, participation_decision,
global_model_quality):
"""
计算客户端参与联邦学习的收益
收益 = 模型提升 - 通讯成本 - 隐私成本
"""
if participation_decision == 0:
# 不参与:只获得基础模型
return 0
model_gain = global_model_quality - self.base_performance
comm_cost = self.communication_cost[client_idx]
privacy_cost = self.privacy_sensitivity[client_idx] * participation_decision
return model_gain - comm_cost - privacy_cost
def global_model_quality(self, participation_vector):
"""
根据参与率估计全局模型质量
简化为:质量 = base + α * participation_rate
"""
participation_rate = sum(participation_vector) / self.n_clients
return self.base_performance + 0.1 * participation_rate
def best_response(self, client_idx, others_decisions):
"""
客户端的最优响应:是否参与
"""
        others_decisions = list(others_decisions)
        others_decisions.insert(client_idx, 0)  # 在自己的位置插入占位0,恢复长度为n的决策向量
# 两种决策的收益对比
quality_with = self.global_model_quality(
others_decisions[:client_idx] + [1] + others_decisions[client_idx+1:]
)
quality_without = self.global_model_quality(others_decisions)
payoff_with = self.participation_payoff(client_idx, 1, quality_with)
payoff_without = 0 # 不参与的收益
return 1 if payoff_with > payoff_without else 0
def find_equilibrium(self):
"""
寻找博弈均衡
使用最佳响应动态迭代
"""
# 初始:所有人参与
participation = [1] * self.n_clients
for iteration in range(100):
changed = False
for i in range(self.n_clients):
best = self.best_response(i, participation[:i] + participation[i+1:])
if best != participation[i]:
participation[i] = best
changed = True
if not changed:
break
return participation
def incentive_analysis(self):
"""
激励分析:哪些客户端有动机参与
"""
results = []
for i, client in enumerate(self.clients):
# 不参与时的基准质量
base_quality = self.global_model_quality([0] * self.n_clients)
# 参与时的边际贡献
marginal_quality = self.global_model_quality([1] * self.n_clients)
marginal_gain = marginal_quality - base_quality
# 参与成本
cost = self.communication_cost[i] + self.privacy_sensitivity[i]
incentive_compatible = marginal_gain > cost
results.append({
'client': client,
'marginal_gain': marginal_gain,
'cost': cost,
'incentive_compatible': incentive_compatible
})
return results
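上面的最佳响应动态可以用一个自包含的小例子演示(极简草图:假设线性质量函数,参与净收益 = α·参与率 − 成本,α 与成本均为示意取值):

```python
def best_response_dynamics(costs, alpha=0.1, max_iter=100):
    """联邦学习参与博弈的最佳响应动态(参与收益 = alpha*参与率 - 成本)"""
    n = len(costs)
    part = [1] * n  # 初始:所有客户端参与
    for _ in range(max_iter):
        changed = False
        for i in range(n):
            others = sum(part) - part[i]
            rate_if_join = (others + 1) / n          # 自己参与时的参与率
            gain = alpha * rate_if_join - costs[i]   # 参与的净收益
            new_decision = 1 if gain > 0 else 0
            if new_decision != part[i]:
                part[i] = new_decision
                changed = True
        if not changed:  # 无人想改变决策 -> 达到均衡
            break
    return part

# 低成本客户端留下,高成本客户端退出(搭便车现象)
print(best_response_dynamics([0.01, 0.5]))  # → [1, 0]
```

可以看到均衡参与率可能低于社会最优,这正是需要激励机制介入的原因。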
class FederatedLearningMechanism:
"""
联邦学习激励机制设计
目标:
- 激励客户端诚实报告贡献
- 公平分配收益
- 鼓励高质量数据贡献
"""
def __init__(self, contribution_metrics):
"""
参数:
contribution_metrics: 各客户端的贡献度量
"""
self.contributions = contribution_metrics
def shapley_value(self):
"""
计算Shapley值:公平分配联合收益
Shapley值的公理化定义:
1. 效率性:分配等于所有参与人的边际贡献之和
2. 对称性:相同贡献的参与人获得相同分配
3. 虚拟性:无贡献参与人获得零分配
4. 可加性:合作收益是可加的
"""
n = len(self.contributions)
shapley = np.zeros(n)
# 精确Shapley值(对于线性贡献函数)
        total_contribution = sum(self.contributions)
        for i in range(n):
            # 可加特征函数下,客户端i的Shapley值恰为其自身贡献
            shapley[i] = self.contributions[i] if total_contribution > 0 else 0
return shapley
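作为对照,按定义枚举所有排列也可以精确计算Shapley值(极简草图:假设特征函数可加,即 v(S) 为成员贡献之和,此时Shapley值恰等于各自贡献,与上面的简化一致):

```python
from itertools import permutations
from math import factorial

def shapley_exact(contrib):
    """按排列定义精确计算Shapley值(可加特征函数 v(S) = sum(贡献))"""
    n = len(contrib)
    phi = [0.0] * n
    for perm in permutations(range(n)):
        v_prev = 0.0
        for player in perm:
            v_new = v_prev + contrib[player]
            phi[player] += v_new - v_prev  # 该排列下的边际贡献
            v_prev = v_new
    return [p / factorial(n) for p in phi]

print(shapley_exact([3.0, 1.0]))  # → [3.0, 1.0]
```

枚举复杂度为 O(n!),实际系统中通常用蒙特卡洛采样近似。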
def tmc_payment(self, client_idx, accuracy_with, accuracy_without):
"""
边际贡献支付(TMC)
支付 = accuracy_with - accuracy_without
"""
return accuracy_with - accuracy_without
def design_truthful_mechanism(self):
"""
设计激励相容的支付机制
使用VCG机制
"""
total_contribution = sum(self.contributions)
n = len(self.contributions)
payments = []
for i in range(n):
            # 排除i后其他人的总贡献
            others_contribution = total_contribution - self.contributions[i]
            # 支付 = i的边际社会贡献(可加贡献下VCG支付退化为边际贡献)
            payment = total_contribution - others_contribution
            payments.append(payment)
return payments
def stackelberg_incentive(self, central_server, clients):
"""
Stackelberg激励设计
中心服务器(领导者)设计激励方案
客户端(追随者)根据激励决定参与
"""
# 中心服务器选择激励参数
# 客户端决定参与程度
        pass
9. 多智能体强化学习(MARL)
9.1 合作多智能体强化学习
class MultiAgentReinforcementLearning:
"""
多智能体强化学习框架
核心挑战:
1. 部分可观测性
2. 非静态环境(其他智能体也在学习)
3. 信用分配问题
4. 通信与协调
"""
def __init__(self, n_agents, state_dim, action_dim):
self.n_agents = n_agents
self.state_dim = state_dim
self.action_dim = action_dim
# 各智能体的策略网络
self.policies = [PolicyNetwork(state_dim, action_dim) for _ in range(n_agents)]
# 各智能体的价值网络
self.value_networks = [ValueNetwork(state_dim) for _ in range(n_agents)]
def centralized_training_decentralized_execution(self, state):
"""
中心化训练去中心化执行(CTDE)
训练时使用全局状态
执行时只使用本地观测
"""
# 训练阶段
all_actions = [policy(state) for policy in self.policies]
# 计算联合Q值
joint_q = self.compute_joint_q(state, all_actions)
# 更新各智能体的策略
for i, policy in enumerate(self.policies):
policy.update(joint_q[i])
def compute_joint_q(self, state, actions):
"""计算联合Q值"""
joint_q = []
for i, value_net in enumerate(self.value_networks):
q_i = value_net(state, actions)
joint_q.append(q_i)
return joint_q
class IndependentQLearning:
"""
独立Q学习(IQL)
每个智能体独立学习Q函数,忽略其他智能体的存在
简单但可能不稳定
"""
    def __init__(self, n_agents, state_dim, action_dim, learning_rate=0.1, gamma=0.99):
        self.n_agents = n_agents
        self.action_dim = action_dim   # select_action中的随机探索需要用到
        self.learning_rate = learning_rate
        self.gamma = gamma
        # 每个智能体独立的Q表/网络
        self.q_tables = [np.zeros((state_dim, action_dim)) for _ in range(n_agents)]
def select_action(self, agent_id, state, epsilon=0.1):
"""ε-贪心选择动作"""
if np.random.rand() < epsilon:
return np.random.randint(self.action_dim)
else:
return np.argmax(self.q_tables[agent_id][state])
def update(self, agent_id, state, action, reward, next_state, done):
"""更新Q值"""
current_q = self.q_tables[agent_id][state, action]
max_next_q = np.max(self.q_tables[agent_id][next_state])
td_target = reward + self.gamma * max_next_q * (1 - done)
td_error = td_target - current_q
self.q_tables[agent_id][state, action] += self.learning_rate * td_error
def train(self, env, num_episodes=1000):
"""训练过程"""
for episode in range(num_episodes):
state = env.reset()
done = False
while not done:
actions = [self.select_action(i, s) for i, s in enumerate(state)]
next_state, rewards, done, info = env.step(actions)
for i in range(self.n_agents):
self.update(i, state[i], actions[i], rewards[i],
next_state[i], done)
state = next_state
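update() 中的TD更新公式可以用一次手算验证(极简草图,数值均为示意):

```python
import numpy as np

lr, gamma = 0.1, 0.99
Q = np.zeros((2, 2))            # 2个状态 x 2个动作
Q[1] = [0.0, 5.0]               # 假设下一状态的Q值已部分学到
# 在状态0执行动作1,获得奖励1,转移到状态1(非终止)
td_target = 1.0 + gamma * Q[1].max()     # 1 + 0.99*5 = 5.95
Q[0, 1] += lr * (td_target - Q[0, 1])    # 0 + 0.1*(5.95 - 0) = 0.595
```

注意IQL中每个智能体都把其他智能体视为环境的一部分,因此TD目标随对手策略变化而漂移,这正是非静态性问题的来源。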
class QMIX:
"""
QMIX:联合Q值分解
核心思想:
- 学习全局Q值Q_tot
- 使用单调混合网络保证局部最优响应
- 可通过中心化训练去中心化执行
"""
    def __init__(self, n_agents, state_dim, action_dim, gamma=0.99):
        self.n_agents = n_agents
        self.gamma = gamma   # update()中计算TD目标所需的折扣因子
        # 各智能体的Q网络
        self.q_nets = [QNetwork(state_dim, action_dim) for _ in range(n_agents)]
        # 混合网络:将局部Q值映射到全局Q值
        self.mixer = MixingNetwork(n_agents, state_dim)
def get_q_values(self, states):
"""获取各智能体的Q值"""
return [q_net(state) for q_net, state in zip(self.q_nets, states)]
def get_total_q(self, states, actions):
"""获取混合后的全局Q值"""
q_values = self.get_q_values(states)
return self.mixer(q_values, states[0]) # 全局状态用于混合网络
def update(self, states, actions, rewards, next_states, done):
"""更新"""
        # 计算目标Q值(草图:此处沿用当前动作,完整实现应使用目标网络与下一状态的贪心动作)
        with torch.no_grad():
            next_q_total = self.get_total_q(next_states, actions)
            target_q = torch.tensor(rewards) + self.gamma * next_q_total * (1 - torch.tensor(done))
# 计算当前Q值
current_q = self.get_total_q(states, actions)
# 损失
loss = F.mse_loss(current_q, target_q)
# 反向传播
self.mixer.optimizer.zero_grad()
for q_net in self.q_nets:
q_net.optimizer.zero_grad()
loss.backward()
self.mixer.optimizer.step()
for q_net in self.q_nets:
q_net.optimizer.step()
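QMIX单调性约束的作用可以用一个数值草图说明:只要混合权重非负(实现中常对超网络输出取绝对值),各智能体分别贪心选择局部最优动作,得到的组合恰好是全局Q值最大的联合动作(以下数值均为示意):

```python
import numpy as np

def monotonic_mix(agent_qs, w, b):
    # 对权重取绝对值 -> Q_tot 对每个局部Q值单调不减
    return float(np.dot(np.abs(w), agent_qs) + b)

q1 = np.array([1.0, 3.0])    # 智能体1各动作的局部Q值
q2 = np.array([2.0, 0.5])    # 智能体2各动作的局部Q值
w, b = np.array([0.7, -0.4]), 0.1

# 各自贪心的组合 与 穷举所有联合动作 的全局Q值比较
greedy = monotonic_mix([q1.max(), q2.max()], w, b)
all_joint = [monotonic_mix([a, c], w, b) for a in q1 for c in q2]
assert abs(greedy - max(all_joint)) < 1e-9  # 局部贪心 = 全局最优
```

这正是"中心化训练、去中心化执行"成立的关键:执行时无需访问混合网络。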
class CommNet:
"""
CommNet:通信网络
智能体之间通过通信协调行动
"""
def __init__(self, n_agents, state_dim, action_dim, hidden_dim=64):
self.n_agents = n_agents
self.comm_layers = 3
# 编码器
self.encoder = nn.Linear(state_dim, hidden_dim)
# 通信层
self.comm_layers_net = nn.ModuleList([
CommunicationLayer(hidden_dim) for _ in range(self.comm_layers)
])
# 解码器
self.decoder = nn.Linear(hidden_dim, action_dim)
def communicate(self, hidden_states):
"""
智能体之间交换隐藏状态
每轮通信中,智能体接收其他智能体消息的平均
"""
for comm_layer in self.comm_layers_net:
new_hidden = []
for i in range(self.n_agents):
# 聚合其他智能体的消息
message = torch.stack([hidden_states[j] for j in range(self.n_agents) if j != i]).mean(0)
new_hidden.append(comm_layer(hidden_states[i], message))
hidden_states = new_hidden
return hidden_states
def forward(self, observations):
"""前向传播"""
# 编码观测
hidden_states = [self.encoder(obs) for obs in observations]
# 通信
hidden_states = self.communicate(hidden_states)
# 解码为动作概率
action_probs = [F.softmax(self.decoder(h), dim=-1) for h in hidden_states]
        return action_probs
9.2 竞争多智能体强化学习
class SelfPlay:
"""
自我对弈(Self-Play)
智能体与自己过去的版本对弈
逐渐提升策略水平
"""
def __init__(self, policy):
self.policy = policy
self.policy_pool = [copy.deepcopy(policy)]
self.opponent_pool = [copy.deepcopy(policy)]
def select_opponent(self):
"""选择对手策略"""
# 随机选择过去的策略作为对手
return random.choice(self.opponent_pool)
def update(self, episode_history):
"""更新策略"""
# 使用 episode_history 更新当前策略
self.policy.update(episode_history)
# 定期将当前策略加入池
if len(self.policy_pool) > 10:
self.policy_pool.pop(0)
self.policy_pool.append(copy.deepcopy(self.policy))
if len(self.opponent_pool) > 10:
self.opponent_pool.pop(0)
self.opponent_pool.append(copy.deepcopy(self.policy))
def train(self, env, num_episodes=10000):
"""训练过程"""
for episode in range(num_episodes):
opponent = self.select_opponent()
# 对弈
episode_history = self.play(env, opponent)
# 更新
self.update(episode_history)
def play(self, env, opponent):
"""执行一局对弈"""
# 实现对弈逻辑
pass
class FictitiousSelfPlay:
"""
虚拟自我对弈(FSP)
结合虚拟对局和自我对弈
"""
def __init__(self, n_strategies):
self.n_strategies = n_strategies
self.strategy_history = []
self.payoff_matrix = np.zeros((n_strategies, n_strategies))
        # 各策略的出现次数
        self.self_strategy_counts = np.zeros(n_strategies)
        self.opponent_strategy_counts = np.zeros(n_strategies)
def update_payoff(self, my_strategy, opponent_strategy, payoff):
"""更新收益矩阵"""
self.payoff_matrix[my_strategy, opponent_strategy] += payoff
        self.self_strategy_counts[my_strategy] += 1
        self.opponent_strategy_counts[opponent_strategy] += 1
def get_average_payoff(self, strategy, opponent_avg_strategy):
"""获取平均收益"""
return np.dot(self.payoff_matrix[strategy], opponent_avg_strategy)
def select_strategy(self, epsilon=0.1):
"""选择策略(带探索)"""
if np.random.rand() < epsilon:
return np.random.randint(self.n_strategies)
        opponent_avg = self.opponent_strategy_counts / max(1, self.opponent_strategy_counts.sum())
        expected_payoffs = self.payoff_matrix @ opponent_avg
        return int(np.argmax(expected_payoffs))
10. 机制设计实践
10.1 能源市场机制
class EnergyMarketMechanism:
"""
能源市场机制设计
电力市场中的出清和定价
"""
def __init__(self, generators, loads, transmission_network):
self.generators = generators # 发电商
self.loads = loads # 需求方
self.network = transmission_network # 输电网
def uniform_price_auction(self):
"""
统一价格拍卖
所有中标者按统一价格结算
"""
# 聚合供给曲线
supply_bids = sorted(self.generators, key=lambda g: g.price)
# 聚合需求曲线
demand_bids = sorted(self.loads, key=lambda l: l.price, reverse=True)
        # 寻找市场出清点:按报价排序撮合,直到供给报价超过需求报价
        matches = []
        clearing_price = None
        supply_idx = 0
        demand_idx = 0
        while supply_idx < len(supply_bids) and demand_idx < len(demand_bids):
            supply = supply_bids[supply_idx]
            demand = demand_bids[demand_idx]
            if supply.price <= demand.price:
                matches.append((supply, demand))
                clearing_price = supply.price  # 以最后一笔成交的供给报价作为统一出清价
                supply_idx += 1
                demand_idx += 1
            else:
                break
        # 所有成交按同一出清价结算
        return [(s, d, clearing_price) for s, d in matches]
def pay_as_bid(self):
"""
按报价支付
每个中标者按自己的报价结算
"""
# 类似统一价格,但支付各自的报价
pass
def locational_marginal_pricing(self):
"""
节点边际电价(LMP)
考虑输电阻塞的定价
"""
# DC潮流近似
# LMP = 能量分量 + 损耗分量 + 阻塞分量
pass
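uniform_price_auction 的撮合逻辑可以独立演示(极简草图:假设每笔报价对应1单位电量,并以最后一笔成交的供给报价作为出清价,这是一种常见但非唯一的约定):

```python
def uniform_clear(supply_prices, demand_prices):
    """按报价优先撮合,返回 (成交数量, 统一出清价)"""
    supply = sorted(supply_prices)                  # 供给:报价从低到高
    demand = sorted(demand_prices, reverse=True)    # 需求:报价从高到低
    q = 0
    while q < min(len(supply), len(demand)) and supply[q] <= demand[q]:
        q += 1
    return (q, supply[q - 1]) if q > 0 else (0, None)

print(uniform_clear([10, 20, 30], [35, 25, 15]))  # → (2, 20)
```

例中报价30的发电商与报价15的需求方无法成交,市场在2单位处出清,所有成交按20结算。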
class SmartContractAuction:
"""
智能合约拍卖
使用区块链智能合约实现自动执行的拍卖
"""
def __init__(self, auction_type='english'):
self.auction_type = auction_type
self.bids = []
self.winner = None
self.payment = None
def submit_bid(self, bidder, amount):
"""提交投标"""
self.bids.append({'bidder': bidder, 'amount': amount})
def determine_winner(self):
"""确定胜者"""
if not self.bids:
return None
if self.auction_type == 'english':
# 英式拍卖:最高价
self.bids.sort(key=lambda x: x['amount'], reverse=True)
self.winner = self.bids[0]['bidder']
self.payment = self.bids[0]['amount']
elif self.auction_type == 'vickrey':
# Vickrey拍卖:第二高价
self.bids.sort(key=lambda x: x['amount'], reverse=True)
self.winner = self.bids[0]['bidder']
self.payment = self.bids[1]['amount'] if len(self.bids) > 1 else 0
return self.winner, self.payment
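Vickrey拍卖的关键性质是诚实报价为(弱)占优策略,可以用一个小例子核对(极简草图,估值为示意取值):

```python
def vickrey(bids):
    """第二价格密封拍卖:最高报价者胜,支付第二高报价"""
    ranked = sorted(bids.items(), key=lambda kv: kv[1], reverse=True)
    winner = ranked[0][0]
    payment = ranked[1][1] if len(ranked) > 1 else 0
    return winner, payment

values = {'A': 10, 'B': 7}         # 真实估值
w, p = vickrey(values)             # A诚实报价:获胜,支付7,效用 10-7=3
w2, _ = vickrey({'A': 6, 'B': 7})  # A压价到6:落败,效用0,并不更优
print(w, p, w2)  # → A 7 B
```

由于支付额由对手报价决定,压低报价只会失去本可获利的成交机会,抬高报价则可能以超过估值的价格成交。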
def execute(self):
"""执行合约"""
winner, payment = self.determine_winner()
# 智能合约逻辑
# - 将物品所有权转移给胜者
# - 从胜者账户扣除支付
# - 向其他投标者退还押金
return {
'winner': winner,
'payment': payment,
'all_bids': self.bids
        }
11. 均衡求解算法
11.1 Nash均衡求解
class NashEquilibriumSolver:
"""
Nash均衡求解器
提供多种求解方法
"""
@staticmethod
def brute_force_nash(payoff_matrix_1, payoff_matrix_2):
"""
暴力搜索Nash均衡(双人博弈)
适用于小规模博弈
"""
n_strategies_1 = payoff_matrix_1.shape[0]
n_strategies_2 = payoff_matrix_1.shape[1]
equilibria = []
for i in range(n_strategies_1):
for j in range(n_strategies_2):
                # 检查 (i, j) 是否为纯策略Nash均衡
                # 玩家1固定列j时,不能通过更换行获得更高收益
                p1_ok = payoff_matrix_1[i, j] >= payoff_matrix_1[:, j].max()
                # 玩家2固定行i时,不能通过更换列获得更高收益
                p2_ok = payoff_matrix_2[i, j] >= payoff_matrix_2[i, :].max()
                if p1_ok and p2_ok:
                    equilibria.append((i, j))
return equilibria
@staticmethod
def mixed_nash_support_enumeration(payoff_matrix):
"""
混合策略Nash均衡的支持枚举
适用于双人零和博弈
"""
        import itertools
        n = payoff_matrix.shape[0]
        equilibria = []
        # 尝试所有可能的支持组合
        for support_size in range(1, n + 1):
            for support in itertools.combinations(range(n), support_size):
# 解线性方程组找到混合策略
eq = solve_support_equilibrium(payoff_matrix, support)
if eq is not None:
equilibria.append(eq)
return equilibria
def solve_support_equilibrium(payoff_matrix, support):
    """
    解特定支持下的均衡(对称零和博弈的草图)
    约束:对手在支持内各纯策略之间无差异;概率和为1
    (完整的支持枚举还需检查支持外策略的收益不高于支持内,此处从略)
    """
    from scipy.optimize import linprog
    k = len(support)
    sub = payoff_matrix[np.ix_(support, support)]  # 限制到支持上的子矩阵
    A_eq = []
    b_eq = []
    # 无差异约束:混合策略p使 (p^T sub) 各列的期望收益两两相等
    for j in range(k - 1):
        A_eq.append(sub[:, j] - sub[:, j + 1])
        b_eq.append(0)
    # 概率和为1
    A_eq.append(np.ones(k))
    b_eq.append(1)
    result = linprog(c=np.zeros(k), A_eq=A_eq, b_eq=b_eq, bounds=(0, None))
    if result.success:
        return result.x
    return None
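上面的纯策略均衡检查可以用囚徒困境快速验证(极简草图,收益取惯用的负刑期表示):

```python
import numpy as np

def pure_nash(A1, A2):
    """返回双矩阵博弈的所有纯策略Nash均衡 (i, j)"""
    eqs = []
    for i in range(A1.shape[0]):
        for j in range(A1.shape[1]):
            # 双方都无法通过单方面偏离获得严格更高收益
            if A1[i, j] >= A1[:, j].max() and A2[i, j] >= A2[i, :].max():
                eqs.append((i, j))
    return eqs

A1 = np.array([[-1, -3], [0, -2]])   # 行玩家:0=合作, 1=背叛
A2 = np.array([[-1, 0], [-3, -2]])   # 列玩家
print(pure_nash(A1, A2))  # → [(1, 1)] 双双背叛
```

均衡 (背叛, 背叛) 的收益 (-2, -2) 劣于 (合作, 合作) 的 (-1, -1),体现了Nash均衡与Pareto最优的分离。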
class LemkeHowsonAlgorithm:
"""
Lemke-Howson算法
求解双人博弈的混合策略Nash均衡
"""
def __init__(self, payoff_matrix_1, payoff_matrix_2):
self.payoff_1 = payoff_matrix_1
self.payoff_2 = payoff_matrix_2
self.n_1 = payoff_matrix_1.shape[0]
self.n_2 = payoff_matrix_1.shape[1]
def solve(self, initial_dropped=0, max_iterations=1000):
"""
求解Nash均衡
参数:
initial_dropped: 初始删除的纯策略
max_iterations: 最大迭代次数
"""
# 构建扩展支付矩阵
# 添加辅助行/列
# Lemke-Howson主循环
pass
def _pivot(self, tableau, entering_var, leaving_var):
"""枢轴操作"""
        pass
11.2 连续博弈求解
class ContinuousGameSolver:
"""
连续博弈求解器
策略空间是连续的
"""
@staticmethod
def gradient_ascent_game(G1, G2, x0, y0, lr=0.01, max_iter=1000):
"""
梯度上升博弈
每个玩家沿着自己的梯度方向更新
"""
x, y = x0, y0
history = [(x, y, G1(x, y), G2(x, y))]
for _ in range(max_iter):
            # 用中心差分近似各自的偏导数
            eps = 1e-6
            grad_x = (G1(x + eps, y) - G1(x - eps, y)) / (2 * eps)
            grad_y = (G2(x, y + eps) - G2(x, y - eps)) / (2 * eps)
# 更新
x = x + lr * grad_x
y = y + lr * grad_y
history.append((x, y, G1(x, y), G2(x, y)))
return x, y, history
@staticmethod
def fictitious_play_continuous(payoff_function, num_iterations=1000):
"""
连续博弈的虚拟对局
追踪历史平均策略
"""
# 历史策略
x_history = []
y_history = []
x = 0.5 # 初始策略
y = 0.5
for t in range(num_iterations):
# 计算历史平均
x_avg = np.mean(x_history) if x_history else 0.5
y_avg = np.mean(y_history) if y_history else 0.5
            # 对历史平均策略选择最优响应(find_best_response为假定的一维数值优化辅助函数)
x_new = find_best_response(lambda xi: payoff_function(xi, y_avg), x)
y_new = find_best_response(lambda yi: payoff_function(x_avg, yi), y)
x_history.append(x_new)
y_history.append(y_new)
x = x_new
y = y_new
        return x, y, (x_history, y_history)
12. 博弈论与AI的交叉应用
12.1 生成对抗网络与博弈论
class GANasGameTheory:
"""
GAN的博弈论视角
GAN本质上是一个两人零和博弈
"""
def __init__(self, generator, discriminator):
self.G = generator
self.D = discriminator
def zero_sum_payoff(self, D_real, D_fake):
"""
零和支付
D的收益 = -G的收益
"""
D_payoff = np.mean(np.log(D_real) + np.log(1 - D_fake))
G_payoff = -D_payoff
return D_payoff, G_payoff
def find_nash_equilibrium(self):
"""
寻找Nash均衡
在均衡时:
- D(x) = 0.5 对所有x
- G的分布 = 真实数据分布
"""
pass
def analyze_training_dynamics(self):
"""
分析训练动态
理解GAN训练中的模式崩溃等问题
"""
pass
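zero_sum_payoff 在均衡处的取值可以直接验证:当判别器处处输出0.5时,其收益为 -2·ln2(即GAN价值函数的均衡值);任何能分辨真假样本的判别器收益都更高(极简草图):

```python
import numpy as np

def d_payoff(D_real, D_fake):
    """判别器收益:E[log D(x)] + E[log(1 - D(G(z)))]"""
    return float(np.mean(np.log(D_real)) + np.mean(np.log(1 - D_fake)))

at_eq = d_payoff(np.full(4, 0.5), np.full(4, 0.5))    # 均衡:D(x)=0.5 处处成立
sharp = d_payoff(np.full(4, 0.9), np.full(4, 0.1))    # D能分辨真假时收益更高
assert abs(at_eq + 2 * np.log(2)) < 1e-9              # = -2 ln 2 ≈ -1.386
assert sharp > at_eq
```

训练的目标正是把判别器从 sharp 的状态推向 at_eq 的状态,即生成分布逼近真实分布。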
class GenerativeAdversarialNetworkGame:
"""
博弈论视角的GAN
扩展GAN理论到更一般的博弈结构
"""
def __init__(self, players, payoff_functions):
"""
参数:
players: 参与者列表
payoff_functions: 各参与者的收益函数
"""
self.players = players
self.payoffs = payoff_functions
def projected_gradient_descent(self, lr=0.01, num_iter=1000):
"""
投影梯度下降求解均衡
类似于GAN的训练过程
"""
strategies = {p: p.initial_strategy() for p in self.players}
for _ in range(num_iter):
# 计算各玩家的梯度
gradients = {}
for p in self.players:
gradients[p] = p.compute_gradient(strategies)
# 更新
for p in self.players:
strategies[p] = p.update(strategies[p], gradients[p], lr)
# 投影到可行集
strategies[p] = p.project(strategies[p])
        return strategies
12.2 AI安全与博弈论
class AIAlignmentGame:
"""
AI对齐博弈
人类与AI系统的策略交互
"""
def __init__(self):
self.human_strategies = ['trust', 'verify', 'refuse']
self.ai_strategies = ['cooperate', 'defect']
def payoff_matrix(self):
"""
定义支付矩阵
人类策略 vs AI策略 -> (人类收益, AI收益)
"""
return {
('trust', 'cooperate'): (10, 10),
('trust', 'defect'): (-5, 15),
('verify', 'cooperate'): (8, 8),
('verify', 'defect'): (5, 12),
('refuse', 'cooperate'): (0, 0),
('refuse', 'defect'): (0, 0)
}
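对上面的支付矩阵做最佳响应检查,可以看出唯一的纯策略Nash均衡是 (verify, defect),收益 (5, 12):人类选择核查、AI选择背离,社会福利低于 (trust, cooperate) 的 (10, 10)(极简草图):

```python
payoffs = {
    ('trust', 'cooperate'): (10, 10), ('trust', 'defect'): (-5, 15),
    ('verify', 'cooperate'): (8, 8),  ('verify', 'defect'): (5, 12),
    ('refuse', 'cooperate'): (0, 0),  ('refuse', 'defect'): (0, 0),
}
humans = ['trust', 'verify', 'refuse']
ais = ['cooperate', 'defect']

def pure_nash(payoffs):
    eqs = []
    for h in humans:
        for a in ais:
            u_h, u_a = payoffs[(h, a)]
            # 人类与AI都无法通过单方面换策略改进
            if u_h >= max(payoffs[(h2, a)][0] for h2 in humans) and \
               u_a >= max(payoffs[(h, a2)][1] for a2 in ais):
                eqs.append((h, a))
    return eqs

print(pure_nash(payoffs))  # → [('verify', 'defect')]
```

这正是对齐机制设计想要改变的局面:通过调整支付结构,使 (trust, cooperate) 成为均衡。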
class IncentiveCompatibleML:
"""
激励相容的机器学习系统
确保系统行为符合设计者意图
"""
@staticmethod
def design_surveillance_mechanism(budget, privacy_cost):
"""
设计监控机制
平衡监控收益和隐私成本
"""
pass
@staticmethod
def alignment_penalty(predictions, true_labels, incentive_alignment):
"""
对齐惩罚项
将激励相容性作为正则化项
"""
alignment_loss = 0
for pred, true, incentive in zip(predictions, true_labels, incentive_alignment):
if pred != true and incentive:
alignment_loss += 1
        return alignment_loss
13. 学术引用与参考文献
- Von Stackelberg, H. (1934). “Marktform und Gleichgewicht.” Springer.
- Maynard Smith, J. (1982). “Evolution and the Theory of Games.” Cambridge University Press.
- Hofbauer, J., & Sigmund, K. (1998). “Evolutionary Games and Population Dynamics.” Cambridge University Press.
- Shoham, Y., & Leyton-Brown, K. (2008). “Multiagent Systems: Algorithmic, Game-Theoretic, and Logical Foundations.” Cambridge University Press.
- Fudenberg, D., & Tirole, J. (1991). “Game Theory.” MIT Press.
- Myerson, R. B. (1991). “Game Theory: Analysis of Conflict.” Harvard University Press.
- McMahan, H. B., et al. (2017). “Communication-Efficient Learning of Deep Networks from Decentralized Data.” AISTATS.
- Roughgarden, T. (2010). “Algorithmic Game Theory.” Communications of the ACM, 53(7), 78-86.
- Osborne, M. J., & Rubinstein, A. (1994). “A Course in Game Theory.” MIT Press.
- Nisan, N., et al. (2007). “Algorithmic Game Theory.” Cambridge University Press.
- Littman, M. L. (1994). “Markov Games as a Framework for Multi-Agent Reinforcement Learning.” ICML.
- Lowe, R., et al. (2017). “Multi-Agent Actor-Critic for Mixed Cooperative-Competitive Environments.” NeurIPS.
- Foerster, J., et al. (2018). “Counterfactual Multi-Agent Policy Gradients.” AAAI.
- Rashid, T., et al. (2018). “QMIX: Monotonic Value Function Factorisation for Deep Multi-Agent Reinforcement Learning.” ICML.
- Kraemer, L., & Banerjee, B. (2016). “Multi-Agent Reinforcement Learning as a Rehearsal for Decentralized Planning.” Neurocomputing.