关键词
| 术语 | 英文 | 核心概念 |
|---|---|---|
| Stackelberg博弈 | Stackelberg Game | 领导者-追随者结构的不完全信息博弈 |
| 演化博弈论 | Evolutionary Game Theory | 策略随时间演化学习 |
| 复制者动态 | Replicator Dynamics | 策略频率随收益动态变化 |
| 鲁棒博弈 | Robust Game | 参数不确定下的均衡分析 |
| 贝叶斯博弈 | Bayesian Game | 不完全信息博弈 |
| 拍卖理论 | Auction Theory | 资源分配的激励机制设计 |
| 联邦学习 | Federated Learning | 分布式机器学习的博弈建模 |
| 机制设计 | Mechanism Design | 激励相容的规则设计 |
| 相关均衡 | Correlated Equilibrium | 比Nash更弱的均衡概念 |
| 多智能体强化学习 | MARL | 多智能体环境下的强化学习 |
| 纳什均衡 | Nash Equilibrium | 无人可单方面改进的策略组合 |
| 帕累托最优 | Pareto Optimality | 无法使某人更好而不使他人更差的资源配置 |
| 社会福利 | Social Welfare | 所有参与人收益的总和 |
| 激励相容 | Incentive Compatibility | 说实话是占优策略 |
1. 引言:多智能体系统的复杂性
现实世界中的决策问题很少是孤立的——自动驾驶车辆需要相互协调、电子商务平台需要应对竞争商家的策略、电网调度需要平衡多方利益。多智能体博弈(Multi-Agent Games)正是研究这类复杂交互场景的核心框架,它将单智能体博弈论扩展到多个具有自主决策能力的智能体之间的策略交互。
多智能体博弈的挑战
与两人零和博弈不同,多智能体系统面临的核心挑战包括:策略空间指数级增长、均衡概念的多重性、计算复杂性、通讯与协调问题,以及如何在不完全信息和动态环境中做出决策。
1.1 多智能体博弈入门:多个决策者之间的互动
要理解多智能体博弈,咱们先从最简单的场景说起。想象一下,你和三个朋友在玩德州扑克,每个人都要决定是跟注、加注还是弃牌。你的收益不只取决于自己的选择,还取决于其他三个人的决策——这就是多智能体博弈最直观的样子。
为什么多智能体博弈这么难?
首先,每个智能体的策略空间可能是连续的高维空间。比如在自动驾驶场景中,一辆车要决定方向盘转多少度、油门踩多深、刹车踩多狠,这些参数组合起来就是一个巨大的策略空间。如果有10辆车同时在路上,那整体的策略空间就大得离谱了。
其次,智能体之间的关系可以是合作、竞争,或者两者兼有。合作的时候大家绑在一根绳上,要一起优化共同的目标;竞争的时候就是你死我活;更多时候是既合作又竞争,比如一个公司里的不同部门,既有共同利益也有各自的小算盘。
还有,信息不对称是个大问题。我可能知道一些别人不知道的事情,或者反过来。在拍卖的时候,每个投标者对自己的估值是清楚的,但对别人的估值一无所知。这种信息不对称会导致各种有意思的问题。
博弈的基本要素
一个博弈通常由这几样东西组成:
- 参与人:谁在参与这个博弈,可以是人、计算机程序、公司、国家,随你定。
- 策略空间:每个参与人可以选择哪些行动。比如石头剪刀布就是三个纯策略,但更复杂的博弈里可能有几十上百种选择。
- 收益函数:给定所有参与人的策略组合,每个人得到多少回报。这是参与人做决策的依据。
- 信息结构:谁知道什么,什么时候知道。信息决定了参与人能够基于什么来做决策。
1.2 竞争与合作的博弈分类:零和博弈 vs 正和博弈
多智能体博弈可以按参与人之间的关系分成几大类,理解这个分类特别重要,因为它直接影响我们分析和求解问题的方式。
零和博弈:你死我活的极端
零和博弈是最极端的竞争形式——一方的收益正好等于另一方的损失,总收益永远为零。经典的石头剪刀布就是零和博弈:你赢了我就输了,不存在双赢的可能。
这种博弈有个很好的性质:因为总收益是固定的,所以只要找到一个参与人的最优策略,另外一个的最优策略也就确定了。在数学上比较好处理。
但是现实世界很少是纯粹的零和博弈。你和同事竞争同一个晋升名额,虽然确实只有一个人能拿到,但两个候选人都提升了能力,公司也受益了——这不是零和。更典型的例子是商业竞争:两家可乐公司打价格战,看似你死我活,但如果两家公司都消失了,整个饮料行业的市场也会萎缩,消费者反而可能受害。
正和博弈:大家一起把蛋糕做大
正和博弈的核心思想是博弈不一定是分蛋糕,而是可以一起把蛋糕做大。两个人做贸易,各自从交易中获益,这就是正和博弈的经典例子。
合作博弈是正和博弈的一个重要分支。当参与人可以形成联盟、分享收益的时候,博弈就变得更加有意思了。比如在国际贸易中,各国通过谈判达成协议,每个国家都从开放市场中获益,但如何分配这些收益就成了谈判的焦点。
混合博弈:真实世界的常态
真实世界的博弈往往是合作与竞争并存的混合博弈。公司和员工之间的关系就是这样:员工需要公司发工资,公司需要员工干活——这是合作基础上的雇佣关系。但员工可能想摸鱼多拿钱,公司可能想压榨少发钱——这是竞争。
这种混合性质让问题变得复杂,但也更贴近现实。多智能体强化学习研究的一个重要方向就是如何在这种合作与竞争并存的环境中让智能体学会有效的行为。
1.3 多智能体系统的分类
多智能体系统可以从多个维度进行分类:
按智能体关系分类:
- 合作博弈:智能体有共同目标,需要协调行动
- 竞争博弈:智能体目标冲突,如零和博弈
- 混合博弈:同时包含合作和竞争元素
按信息结构分类:
- 完全信息博弈:所有智能体知道博弈结构
- 不完全信息博弈:智能体拥有关于其他智能体的私有信息
- 完美信息博弈:智能体知道博弈的完整历史
- 不完美信息博弈:智能体只能观察到部分历史
按决策时序分类:
- 同时行动博弈:所有智能体同时选择策略
- 序贯博弈:智能体按顺序行动,后行者可观察先行者
1.4 多智能体系统的应用场景
多智能体博弈理论在现实中有着广泛的应用:
经济领域:
- 市场竞争分析:多家企业的定价策略博弈
- 拍卖市场:频谱拍卖、广告拍卖的机制设计
- 电网调度:多方电力生产者的竞价博弈
人工智能:
- 自动驾驶:多车协同决策
- 游戏AI:多人对战游戏的智能对手
- 机器人团队:分布式机器人协调
网络安全:
- 入侵检测:攻击者与防御者的博弈
- 隐私保护:数据发布与隐私泄露的对抗
- 网络安全资源分配:防御资源的优化部署
2. Stackelberg博弈
2.1 博弈结构与数学建模
Stackelberg博弈是一种序贯博弈,其中一个参与人(领导者)先行动,其他参与人(追随者)在观察到领导者的行动后做出反应。这种”主从”结构广泛存在于市场竞争、安全巡逻、资源分配等场景。
import numpy as np
from scipy.optimize import minimize, linprog
class StackelbergGame:
"""
Stackelberg博弈:领导者-追随者结构
数学模型:
- 领导者:选择策略 x ∈ X ⊆ ℝ^n
- 追随者:观察到 x 后选择策略 y ∈ Y(x) ⊆ ℝ^m
目标函数:
- 领导者:max_{x} u_L(x, y^*(x))
- 追随者:y^*(x) = argmax_{y ∈ Y(x)} u_F(x, y)
其中 y^*(x) 是追随者的最优响应函数
"""
def __init__(self, leader_payoff, follower_payoff,
leader_action_set, follower_action_set):
self.leader_payoff = leader_payoff # 领导者收益函数
self.follower_payoff = follower_payoff # 追随者收益函数
self.leader_action_set = leader_action_set # 领导者行动空间
self.follower_action_set = follower_action_set # 追随者行动空间
def follower_best_response(self, leader_action):
"""
追随者的最优响应函数
给定领导者的行动,求追随者的最优反应
"""
best_y = None
best_payoff = float('-inf')
for y in self.follower_action_set:
payoff = self.follower_payoff(leader_action, y)
if payoff > best_payoff:
best_payoff = payoff
best_y = y
return best_y, best_payoff
def solve_leader_problem(self):
"""
求解Stackelberg均衡
领导者的问题:max_{x} u_L(x, BR_F(x))
其中 BR_F(x) 是追随者的最优响应
"""
best_x = None
best_leader_payoff = float('-inf')
for x in self.leader_action_set:
y_star, _ = self.follower_best_response(x)
leader_payoff = self.leader_payoff(x, y_star)
if leader_payoff > best_leader_payoff:
best_leader_payoff = leader_payoff
best_x = x
y_star, follower_payoff = self.follower_best_response(best_x)
return {
'leader_action': best_x,
'follower_action': y_star,
'leader_payoff': best_leader_payoff,
'follower_payoff': follower_payoff
}
# 安全博弈示例:Stackelberg安全游戏
class SecurityGame:
"""
Stackelberg安全博弈(LSG)
场景:防御者(领导者)决定资源部署,追击者(追随者)选择攻击目标
应用:
- 机场安检部署
- 网络安全资源分配
- 野生动物保护巡逻
"""
def __init__(self, targets, attacker_types, target_values, attack_costs):
"""
参数:
targets: 目标集合
attacker_types: 攻击者类型
target_values: 目标价值 V_t
attack_costs: 攻击成本 C_t^k
"""
self.targets = targets
self.attacker_types = attacker_types
self.target_values = np.array(target_values)
self.attack_costs = attack_costs # C[t, attacker_type]
def defender_payoff(self, coverage, attacker_action, attacker_type):
"""
防御者收益
如果攻击目标被覆盖:成功阻止攻击,获得目标价值
如果攻击目标未被覆盖:攻击成功,失去目标价值
"""
if attacker_action in coverage:
# 成功阻止
return self.target_values[attacker_action]
else:
# 攻击成功
return -self.target_values[attacker_action]
def attacker_payoff(self, coverage, attacker_action, attacker_type):
"""
攻击者收益
如果攻击被阻止:损失攻击成本
如果攻击成功:获得目标价值减去成本
"""
attack_cost = self.attack_costs[attacker_action, attacker_type]
if attacker_action in coverage:
# 攻击被阻止
return -attack_cost
else:
# 攻击成功
return self.target_values[attacker_action] - attack_cost
def attacker_best_response(self, coverage, attacker_type):
"""
攻击者的最优响应:选择期望收益最高的目标
"""
best_target = None
best_payoff = float('-inf')
for target in self.targets:
payoff = self.attacker_payoff(coverage, target, attacker_type)
if payoff > best_payoff:
best_payoff = payoff
best_target = target
return best_target, best_payoff
def solve_optimal_coverage(self, num_coverage):
"""
求解最优覆盖策略
使用MILP(混合整数线性规划)求解Stackelberg均衡
"""
n_targets = len(self.targets)
n_types = len(self.attacker_types)
# 变量:x_t = 1 if 目标t被覆盖
# 约束:sum(x_t) <= num_coverage
best_coverage = None
best_defender_utility = float('-inf')
# 暴力搜索(对于小规模问题)
from itertools import combinations
for coverage in combinations(range(n_targets), num_coverage):
coverage = set(coverage)
# 计算每个攻击者类型的期望收益
defender_utilities = []
for k in range(n_types):
target, _ = self.attacker_best_response(coverage, k)
util = self.defender_payoff(coverage, target, k)
defender_utilities.append(util)
# 防御者收益是所有攻击者类型的最小值(最坏情况)
min_util = min(defender_utilities)
if min_util > best_defender_utility:
best_defender_utility = min_util
best_coverage = coverage
return best_coverage, best_defender_utility2.2 Stackelberg均衡与Nash均衡的关系
Stackelberg博弈中的均衡概念比Nash均衡更强,因为领导者的先动优势(First-Mover Advantage)意味着追随者必须对领导者的策略做出最优响应。
def compare_stackelberg_nash(game):
"""
比较Stackelberg均衡和Nash均衡
Stackelberg均衡:
- 领导者选择策略 x^*
- 追随者选择 y^*(x^*)
- x^* 使得领导者收益最大化
Nash均衡:
- 同时行动
- 每个参与人都无法单方面改进
"""
# 对于简单的2x2博弈
payoff_matrix_leader = game.leader_payoff
payoff_matrix_follower = game.follower_payoff
# 计算Stackelberg均衡
stackelberg = game.solve_leader_problem()
# 计算Nash均衡
nash_equilibria = []
for i in range(len(game.leader_action_set)):
for j in range(len(game.follower_action_set)):
is_nash = True
# 检查领导者能否改进
for i_prime in range(len(game.leader_action_set)):
if payoff_matrix_leader[i_prime, j] > payoff_matrix_leader[i, j]:
is_nash = False
break
# 检查追随者能否改进
for j_prime in range(len(game.follower_action_set)):
if payoff_matrix_follower[i, j_prime] > payoff_matrix_follower[i, j]:
is_nash = False
break
if is_nash:
nash_equilibria.append((i, j))
return stackelberg, nash_equilibriaStackelberg vs Nash
在某些博弈中,Stackelberg均衡可能比Nash均衡给领导者带来更高的收益。这是因为领导者可以通过承诺(commitment)来限制追随者的选择,从而利用先动优势。
3. 纳什均衡与相关均衡
3.1 纳什均衡:你不动我也不动
纳什均衡可能是博弈论里最著名的概念了。它的核心思想很简单:在这个状态下,没有任何人有动机单方面改变自己的策略。
举个好理解的例子。想象你和朋友在酒吧,你想安静聊天但你朋友想跳舞。如果你两都选择安静,你开心他不开心;如果你两都选择跳舞,他开心你不开心;如果你跳舞他安静,你会被人笑话,他独自跳舞也挺尴尬;反过来也一样。这种情况下,你们两个都不满意对方的选择,但又都不想先改主意——这就是一个纳什均衡:谁先动谁就输了。
用数学的话说,在纳什均衡中,对每个参与人和任意其他策略,都有:
换句话说,给定别人不动,你改变策略只会让自己更惨,或者最多不赔不赚。
为什么纳什均衡重要?
因为它是一种”稳定”状态。如果所有人都预期大家会按纳什均衡行动,那么这个预期就是自我实现的——没有人会违背这个预期。
但纳什均衡也有问题:
-
存在性问题:并不是所有博弈都有纯策略纳什均衡。比如石头剪刀布就没有——你每次出石头,我都改出布,你又改出剪刀,永远转圈。
-
多重性问题:很多博弈有多个纳什均衡,这时候该选哪个就成了问题。
-
计算复杂性:找到纳什均衡本身可能是个很难的问题。
-
理性假设:纳什均衡假设所有人都足够理性,能够计算出自己的最优策略。但在真实场景中,人往往没那么理性。
3.2 相关均衡:引入协调者的新均衡
相关均衡是纳什均衡的推广。核心思想是:如果有一个公正的协调者给参与人提供信号,参与人根据信号选择行动,这种策略组合可能达到比纳什均衡更好的结果。
听起来有点抽象,让我用一个例子来说明。
假设两个公司面临囚徒困境:
- 两家都合作:各赚100万
- 两家都打价格战:各亏50万
- 一家合作一家打价格战:合作方亏80万,进攻方赚150万
传统的纳什均衡是两家公司都打价格战,各亏50万。谁都没有动机先合作,因为对方打价格战你就亏大了。
但是如果有个公证机构(比如行业协会)随机决定今天该由谁来打广告,另一家保持低价。参与者被要求服从随机结果:抽到”打广告”信号的公司就打广告,抽到”低价”信号的公司就低价销售。
这种机制下,只要协调者足够公正,每个参与人都没有动机违背约定。因为:
- 你抽到打广告信号,打广告是你的最优选择
- 你抽到低价信号,低价也是你的最优选择
相关均衡比纳什均衡更弱——它要求的条件更少,因此适用范围更广。同时它也更强——在某些博弈中,相关均衡可以给所有人带来更高的收益。
相关均衡的计算
class CorrelatedEquilibrium:
"""
相关均衡求解器
相关均衡条件:
E[Utility_i(a) | signal_i = s] >= E[Utility_i(a') | signal_i = s]
对所有信号s和所有替代行动a'
"""
def __init__(self, payoff_matrices):
"""
参数:
payoff_matrices: 各参与人的收益矩阵列表
"""
self.payoffs = payoff_matrices
self.n_players = len(payoff_matrices)
self.n_actions = [p.shape[0] for p in payoff_matrices]
def solve_ces(self):
"""
求解相关均衡(线性规划方法)
最大化社会福利:
max sum_{i,s,a} p(s) * x_a * u_i(a, s)
s.t. 激励相容约束
"""
from scipy.optimize import linprog
import numpy as np
# 构建线性规划问题
# 决策变量:联合概率分布 p(a_1, a_2, ..., a_n)
# 约束:概率非负、和为1、激励相容
n_vars = np.prod(self.n_actions)
# 目标函数:最大化社会福利
c = np.zeros(n_vars)
idx = 0
for actions in np.ndindex(*self.n_actions):
welfare = 0
for i, action in enumerate(actions):
welfare += self.payoffs[i][action] / self.n_players
c[idx] = -welfare # linprog是最小化
idx += 1
# 约束:概率和为1
A_eq = np.ones((1, n_vars))
b_eq = np.array([1.0])
# 求解
result = linprog(c, A_eq=A_eq, b_eq=b_eq, bounds=(0, 1))
if result.success:
return self._format_solution(result.x)
return None
def _format_solution(self, probabilities):
"""将平坦的概率向量转换为结构化格式"""
solutions = []
idx = 0
for actions in np.ndindex(*self.n_actions):
if probabilities[idx] > 1e-6:
solutions.append({
'action_profile': actions,
'probability': probabilities[idx]
})
idx += 1
return solutions4. 多人博弈与联盟博弈
4.1 从两人到多人:复杂度爆炸
当博弈从两个人扩展到很多人时,问题的复杂度不是线性增长,而是指数级爆炸。
考虑一个有个参与人的博弈,每个参与人有种纯策略。那么总的策略组合数是。要描述一个纳什均衡,你需要指定每个人的策略——当时,这就是个天文数字。
但更根本的挑战在于,多人博弈中人际关系变得更加复杂。两个人之间要么合作要么竞争,但三个人之间就可能出现派系——两个人联合对付第三个人。四个人、五个人…派系的可能性更是指数增长。
4.2 联盟博弈:大家一起干活怎么分蛋糕?
联盟博弈(Coalitional Game)是多人博弈的一个重要分支。核心问题是:当参与人组成联盟时,联盟获得的收益该怎么公平分配?
形式化地说,一个联盟博弈由两部分组成:
- :所有参与人的集合
- :联盟价值函数,表示联盟能够获得的收益
举个例子。三个程序员合作接了一个项目,单干的话每人只能接到小项目(各赚5万)。如果两人合作,可以接中等项目(两人各赚8万)。如果三人合作,可以拿下大项目(总共赚30万)。
问题是:这30万该怎么分?
Shapley值:基于边际贡献的公平分配
Shapley值是联盟博弈中最著名的解概念之一。它的核心思想是:每个人的分配应该反映他对各个联盟的”边际贡献”。
Shapley值的数学定义有点复杂,但思想很直观:
- 把所有参与人随机排成一个队
- 计算每个人加入时给联盟带来的边际贡献
- 对所有可能的排队顺序取平均
class CooperativeGame:
"""
合作博弈求解器
提供Shapley值、核仁解等核心概念的实现
"""
def __init__(self, characteristic_function):
"""
参数:
characteristic_function: 联盟价值函数 v(S) -> value
"""
self.v = characteristic_function
self.players = list(self.v({}).keys()) if self.v({}) else []
def shapley_value(self):
"""
计算Shapley值
公式:
φ_i(v) = Σ_{S ⊆ N\{i}} (|S|! * (n - |S| - 1)! / n!) * [v(S ∪ {i}) - v(S)]
其中求和遍历所有不包含i的联盟S
"""
n = len(self.players)
shapley = {p: 0 for p in self.players}
# 对每个参与人
for i in self.players:
# 枚举所有不包含i的联盟
others = [p for p in self.players if p != i]
for subset in self._power_set(others):
S = set(subset)
S_size = len(S)
# 边际贡献
marginal = self.v(S | {i}) - self.v(S)
# Shapley权重
weight = (S_size * self._factorial(n - S_size - 1)) / self._factorial(n)
shapley[i] += weight * marginal
return shapley
def _power_set(self, elements):
"""生成所有子集"""
from itertools import combinations
subsets = []
for r in range(len(elements) + 1):
for combo in combinations(elements, r):
subsets.append(list(combo))
return subsets
def _factorial(self, n):
"""阶乘"""
if n <= 1:
return 1
return n * self._factorial(n - 1)
def core_check(self, allocation):
"""
检查分配是否在核中
核的条件:
对于任何联盟S,有 Σ_{i∈S} x_i >= v(S)
"""
n = len(self.players)
total_allocation = sum(allocation.values())
# 效率性检查
if abs(total_allocation - self.v(set(self.players))) > 1e-6:
return False, "分配不满足效率性"
# 联盟合理性检查
for i in self.players:
if allocation[i] < self.v({i}):
return False, f"玩家{i}的分配低于其单干价值"
# 任意联盟的检查
others = [p for p in self.players]
for r in range(2, n):
for subset in self._power_set(others):
S = set(subset)
if len(S) == r and S:
coalition_value = self.v(S)
coalition_allocation = sum(allocation[i] for i in S)
if coalition_allocation < coalition_value - 1e-6:
return False, f"联盟{list(S)}不合理"
return True, "分配在核中"
# 程序员合作项目示例
def program_coalition_example():
"""三个程序员合作的收益分配"""
def v(S):
"""联盟价值函数"""
# 三个程序员:A擅长前端,B擅长后端,C擅长数据库
if len(S) == 0:
return 0
elif len(S) == 1:
return 5 # 单干各赚5万
elif len(S) == 2:
return 16 # 两人合作赚16万
else: # len(S) == 3
return 30 # 三人合作赚30万
game = CooperativeGame(v)
# 计算Shapley值
shapley = game.shapley_value()
print("三个程序员的Shapley值分配:")
for player, value in shapley.items():
print(f" {player}: {value:.2f}万")
# 检验核
allocation = shapley
in_core, msg = game.core_check(allocation)
print(f"\n核检验: {msg}")
return shapley4.3 核:不能被抱怨的分配
核(Core)是联盟博弈的另一个重要解概念。直观地说,一个分配在核中,当且仅当没有任何一个子联盟有动机脱离大联盟、自己组织起来。
核的定义: 对于分配,如果对所有联盟,都有:
换句话说,没有任何子联盟觉得自己分少了、应该分到更多。
核的问题在于它可能为空。有些博弈即使所有参与人都想合作,也找不到一个满足核条件的分配。这种情况通常发生在”规模经济”不够强的时候——比起大联盟抱团取暖,每个人单干或者小联盟反而更划算。
5. 演化博弈论
5.1 演化博弈的核心思想
演化博弈论(Evolutionary Game Theory)源于生物学研究,由John Maynard Smith在1982年系统化。它假设参与人不是完全理性的,而是通过策略的”自然选择”和”复制”过程逐渐趋向均衡。
核心假设:
- 有限理性:参与人通过试错学习,而非完美计算
- 群体选择:采用高收益策略的参与人更可能”复制”成功
- 动态演化:策略频率随时间变化
class EvolutionaryGame:
"""
演化博弈论框架
核心概念:
- 群体:大量参与人的集合
- 策略频率:群体中使用各策略的比例
- 适应度:策略的收益,决定复制速度
- 演化稳定策略(ESS):无法被小突变群体入侵的策略
"""
def __init__(self, payoff_matrix, population_sizes=None):
"""
参数:
payoff_matrix: 收益矩阵(大小:策略数 × 策略数)
population_sizes: 各策略的初始人口数
"""
self.payoff_matrix = np.array(payoff_matrix)
self.n_strategies = payoff_matrix.shape[0]
if population_sizes is None:
self.population = np.ones(self.n_strategies)
else:
self.population = np.array(population_sizes)
self.history = [self.population.copy()]
def compute_fitness(self, strategy_idx):
"""
计算策略的适应度
适应度 = 策略与其他策略博弈的平均收益
"""
population_frequencies = self.population / self.population.sum()
return np.dot(self.payoff_matrix[strategy_idx], population_frequencies)
def replicator_dynamics(self, dt=0.01):
"""
复制者动态方程
d x_i / dt = x_i * (f_i(x) - φ(x))
其中:
- x_i: 策略i的人口比例
- f_i(x): 策略i的适应度
- φ(x): 平均适应度 = sum(x_i * f_i(x))
"""
total_population = self.population.sum()
frequencies = self.population / total_population
# 计算各策略适应度
fitness = np.array([self.compute_fitness(i) for i in range(self.n_strategies)])
mean_fitness = np.dot(frequencies, fitness)
# 复制者动态
growth_rates = fitness - mean_fitness
self.population = self.population * (1 + dt * growth_rates)
# 确保人口非负
self.population = np.maximum(self.population, 0)
self.history.append(self.population.copy())
def simulate_evolution(self, num_steps=1000, dt=0.01):
"""
演化模拟
"""
for _ in range(num_steps):
self.replicator_dynamics(dt)
return self.get_equilibrium()
def get_equilibrium(self):
"""获取当前均衡状态"""
total = self.population.sum()
return self.population / total
def find_evolutionary_stable_strategies(self):
"""
寻找演化稳定策略(ESS)
ESS定义:
对于策略x^*,如果对于任意其他策略y ≠ x^*,当y的比例很小时:
u(x^*, εy + (1-ε)x^*) > u(y, εy + (1-ε)x^*)
其中ε是y的突变比例
"""
ess_candidates = []
# 检查纯策略是否为ESS
for i in range(self.n_strategies):
is_ess = True
for j in range(self.n_strategies):
if i == j:
continue
# 条件1:均衡时i的收益 >= j的收益
if self.payoff_matrix[i, i] < self.payoff_matrix[j, i]:
is_ess = False
break
# 条件2:如果收益相等,则i对i的收益 > j对i的收益
if self.payoff_matrix[i, i] == self.payoff_matrix[j, i]:
if self.payoff_matrix[i, j] <= self.payoff_matrix[j, j]:
is_ess = False
break
if is_ess:
ess_candidates.append(i)
return ess_candidates5.2 鹰鸽博弈:演化博弈的经典案例
def hawk_dove_game():
"""
鹰鸽博弈(Hawks and Doves Game)
场景:两只动物争夺价值V的资源
- 鹰策略:战斗直到受伤或对手撤退
- 鸽子策略:示威或撤退
收益矩阵(每场冲突):
鸽子 鹰
鸽子 V/2, V/2 0, V
鹰 V, 0 (V-C)/2, (V-C)/2
其中C是战斗受伤的成本(通常C > V)
"""
V = 10 # 资源价值
C = 20 # 战斗成本
payoff_matrix = np.array([
[V/2, 0], # 鸽子 vs 鸽子, 鸽子
[V, (V-C)/2] # 鹰 vs 鸽子, 鹰
])
game = EvolutionaryGame(payoff_matrix)
# 模拟演化过程
print("鹰鸽博弈演化分析:")
print(f"初始分布: {game.get_equilibrium()}")
game.simulate_evolution(num_steps=1000)
print(f"稳定分布: {game.get_equilibrium()}")
ess = game.find_evolutionary_stable_strategies()
print(f"ESS候选: {[['鸽子', '鹰'][i] for i in ess]}")
# 计算混合策略ESS
# 设p为选择鹰的概率
# ESS条件:p = (V - C + V) / (2V - 2C + 2V - C) = ...
# 简化:p* = V/C
p_ess = V / C
print(f"\n混合策略ESS: 鹰的概率 = {p_ess:.2f}")
print(f"解释:当群体中鹰的比例为{V/C:.0%}时,鸽子无法入侵")
# 运行分析
hawk_dove_game()5.3 石头-剪刀-布博弈
class RockPaperScissorsEvolution:
"""
石头剪刀布的演化博弈分析
收益矩阵:
R P S
R 0,0 -1,1 1,-1
P 1,-1 0,0 -1,1
S -1,1 1,-1 0,0
"""
def __init__(self):
# R=0, P=1, S=2
self.payoff_matrix = np.array([
[0, -1, 1], # R
[1, 0, -1], # P
[-1, 1, 0] # S
])
self.population = np.array([100, 100, 100]) # 初始人口
self.history = []
def fitness(self, strategy_idx):
"""计算策略适应度"""
total = self.population.sum()
freqs = self.population / total
return np.dot(self.payoff_matrix[strategy_idx], freqs)
def step(self, dt=0.01):
"""单步演化"""
fitnesses = np.array([self.fitness(i) for i in range(3)])
mean_fitness = np.dot(fitnesses, self.population / self.population.sum())
# 复制者动态
growth = self.population * (fitnesses - mean_fitness)
self.population = np.maximum(self.population + dt * growth, 0)
self.population = self.population / self.population.sum() * 300 # 保持总数
self.history.append(self.population.copy())
def simulate(self, steps=1000):
"""完整模拟"""
for _ in range(steps):
self.step()
return self.get_state()
def get_state(self):
"""获取当前状态"""
total = self.population.sum()
return self.population / total
def cyclic_analysis(self):
"""
分析循环动力学
石头剪刀布不收敛到固定点,而是在三维空间中循环
"""
import numpy as np
from scipy.integrate import odeint
# 连续时间复制者动态
def replicator_dt(pop, t, payoff):
pop = np.maximum(pop, 0)
pop = pop / pop.sum()
fitness = payoff @ pop
return pop * (fitness - np.dot(pop, fitness))
t = np.linspace(0, 50, 1000)
initial = [0.4, 0.3, 0.3]
trajectory = odeint(replicator_dt, initial, t, args=(self.payoff_matrix,))
return t, trajectory6. 博弈论中的学习算法
6.1 Replicator Dynamics(复制者动态)
复制者动态是演化博弈论中最基本的学习模型,它假设采用高收益策略的个体以更高速率复制:
class ReplicatorDynamics:
"""
复制者动态实现
方程:dx_i/dt = x_i * (f_i(x) - φ(x))
其中:
- x_i: 策略i的人口比例
- f_i(x): 策略i的期望收益
- φ(x): 平均收益
"""
def __init__(self, payoff_matrix, initial_distribution):
self.payoff_matrix = np.array(payoff_matrix)
self.x = np.array(initial_distribution)
self.history = [self.x.copy()]
def step(self, dt=0.01):
"""单步演化"""
# 计算各策略收益
fitness = self.payoff_matrix @ self.x
mean_fitness = np.dot(self.x, fitness)
# 复制者动态
growth_rate = fitness - mean_fitness
self.x = self.x * (1 + dt * growth_rate)
# 归一化
self.x = np.maximum(self.x, 0) # 确保非负
self.x = self.x / self.x.sum()
self.history.append(self.x.copy())
def run(self, num_steps=1000, dt=0.01):
"""运行完整演化"""
for _ in range(num_steps):
self.step()
return self.x
class SpatialReplicatorDynamics:
"""
空间复制者动态
考虑空间结构的演化博弈
个体只与邻居交互
"""
def __init__(self, payoff_matrix, grid_size=20, neighbor_type='moore'):
self.payoff_matrix = payoff_matrix
self.grid_size = grid_size
self.neighbor_type = neighbor_type
self.n_strategies = payoff_matrix.shape[0]
# 随机初始化网格
self.grid = np.random.randint(0, self.n_strategies, (grid_size, grid_size))
def get_neighbors(self, x, y):
"""获取邻居"""
if self.neighbor_type == 'moore':
# 8邻域
neighbors = []
for dx in [-1, 0, 1]:
for dy in [-1, 0, 1]:
if dx == 0 and dy == 0:
continue
nx, ny = (x + dx) % self.grid_size, (y + dy) % self.grid_size
neighbors.append(self.grid[nx, ny])
else:
# 4邻域
neighbors = []
for dx, dy in [(0, 1), (0, -1), (1, 0), (-1, 0)]:
nx, ny = (x + dx) % self.grid_size, (y + dy) % self.grid_size
neighbors.append(self.grid[nx, ny])
return neighbors
def fitness(self, x, y):
"""计算位置(x,y)的适应度"""
strategy = self.grid[x, y]
neighbors = self.get_neighbors(x, y)
total_payoff = 0
for neighbor_strategy in neighbors + [strategy]:
total_payoff += self.payoff_matrix[strategy, neighbor_strategy]
return total_payoff / (len(neighbors) + 1)
def step(self):
"""单步演化"""
new_grid = self.grid.copy()
for x in range(self.grid_size):
for y in range(self.grid_size):
# 计算当前适应度
current_fitness = self.fitness(x, y)
# 找邻居中适应度最高的
neighbors = self.get_neighbors(x, y)
best_neighbor = None
best_fitness = current_fitness
for nx, ny in neighbors:
f = self.fitness(nx, ny)
if f > best_fitness:
best_fitness = f
best_neighbor = (nx, ny)
# 如果有更好的邻居,复制其策略
if best_neighbor is not None:
new_grid[x, y] = self.grid[best_neighbor]
self.grid = new_grid
def run(self, num_steps=100):
"""运行模拟"""
for _ in range(num_steps):
self.step()
return self.grid6.2 虚拟对局与无遗憾学习
class FictitiousPlay:
"""
虚拟对局(Fictitious Play)
学习规则:
1. 记录对手历史行动的平均策略
2. 对平均策略选择最优响应
3. 更新自己的行动历史
"""
def __init__(self, payoff_matrix):
self.payoff_matrix = np.array(payoff_matrix)
self.n_strategies = payoff_matrix.shape[0]
self.action_counts = np.zeros(self.n_strategies)
self.opponent_history = []
def update_opponent_history(self, opponent_action):
"""更新对手历史"""
self.opponent_history.append(opponent_action)
self.action_counts[opponent_action] += 1
def opponent_average_strategy(self):
"""对手的平均策略"""
return self.action_counts / self.action_counts.sum()
def best_response(self, opponent_avg_strategy):
"""对对手平均策略的最优响应"""
expected_payoffs = self.payoff_matrix @ opponent_avg_strategy
return np.argmax(expected_payoffs)
def choose_action(self):
"""选择行动"""
opponent_avg = self.opponent_average_strategy()
return self.best_response(opponent_avg)
def learn(self, num_iterations, opponent_policy=None):
"""
学习过程
参数:
opponent_policy: 对手策略函数(历史) -> 行动
"""
for t in range(num_iterations):
action = self.choose_action()
if opponent_policy:
opponent_action = opponent_policy(self.opponent_history)
else:
opponent_action = np.random.randint(self.n_strategies)
self.update_opponent_history(opponent_action)
return self.action_counts / sum(self.action_counts)
class StochasticFictitiousPlay:
"""
随机虚拟对局
带噪声的虚拟对局,更接近真实学习过程
"""
def __init__(self, payoff_matrix, temperature=1.0):
self.payoff_matrix = np.array(payoff_matrix)
self.n_strategies = payoff_matrix.shape[0]
self.temperature = temperature
self.action_counts = np.zeros(self.n_strategies)
self.opponent_history = []
def boltzmann_probability(self, expected_payoffs):
"""Boltzmann概率"""
exp_payoffs = np.exp(expected_payoffs / self.temperature)
return exp_payoffs / exp_payoffs.sum()
def best_response(self, opponent_avg_strategy, stochastic=True):
"""最优响应(可选择是否随机)"""
expected_payoffs = self.payoff_matrix @ opponent_avg_strategy
if stochastic:
return np.random.choice(self.n_strategies, p=self.boltzmann_probability(expected_payoffs))
else:
return np.argmax(expected_payoffs)
def learn(self, num_iterations, opponent_policy=None):
"""学习过程"""
for t in range(num_iterations):
opponent_avg = self.action_counts / max(1, sum(self.action_counts))
action = self.best_response(opponent_avg)
opponent_action = opponent_policy(self.opponent_history) if opponent_policy else np.random.randint(self.n_strategies)
self.update_opponent_history(opponent_action)
class HedgeAlgorithm:
"""
Hedge算法(指数加权算法)
一种无遗憾学习算法
每个动作的权重按指数增长
"""
def __init__(self, payoff_matrix, learning_rate=0.1):
self.payoff_matrix = np.array(payoff_matrix)
self.n_strategies = payoff_matrix.shape[0]
self.learning_rate = learning_rate
self.weights = np.ones(self.n_strategies)
self.history = []
def select_action(self):
"""按概率加权选择行动"""
probs = self.weights / self.weights.sum()
return np.random.choice(self.n_strategies, p=probs)
def update_weights(self, action, opponent_action):
"""更新权重"""
payoff = self.payoff_matrix[action, opponent_action]
self.weights *= np.exp(self.learning_rate * payoff)
self.history.append(self.weights.copy() / self.weights.sum())
def run(self, num_iterations, opponent_policy=None):
"""运行算法"""
for _ in range(num_iterations):
action = self.select_action()
opponent_action = opponent_policy() if opponent_policy else np.random.randint(self.n_strategies)
self.update_weights(action, opponent_action)
return self.weights / self.weights.sum()
class MultiplicativeWeightsUpdate:
"""
乘法权重更新算法(MWU)
另一种经典的无遗憾学习算法
"""
def __init__(self, n_actions, learning_rate=0.1):
self.n_actions = n_actions
self.lr = learning_rate
self.weights = np.ones(n_actions)
self.loss_history = []
def get_distribution(self):
"""获取行动分布"""
return self.weights / self.weights.sum()
def update(self, action_losses):
"""
更新权重
参数:
action_losses: 各行动的损失(归一化到[0,1])
"""
# 乘法更新规则
self.weights *= (1 - self.lr) ** action_losses
self.weights = np.maximum(self.weights, 1e-10) # 防止数值问题
self.loss_history.append(action_losses)
def expected_regret(self, best_action_payoff, num_rounds):
"""
计算期望遗憾
遗憾 = 最佳单一策略的累积收益 - 算法累积收益
"""
if not self.loss_history:
return 0
cumulative_loss = np.sum([loss[action] for loss, action in
zip(self.loss_history, self.get_action_history())])
best_loss = min([np.sum([loss[a] for loss in self.loss_history])
for a in range(self.n_actions)])
return (cumulative_loss - best_loss) / num_rounds7. 动手实验:用Python模拟一个简单的竞争博弈
光说不练假把式,让我们动手实现一个简单的竞争博弈模拟,加深对博弈论概念的理解。
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict
class SimpleCompetitionGame:
"""
简单竞争博弈模拟器
场景:两家公司竞争市场份额
- 每家公司可以选择:高价策略 或 低价策略
- 收益取决于双方的选择
收益矩阵:
竞争者高价 竞争者低价
我方高价 50, 50 20, 80
我方低价 80, 20 10, 10
"""
def __init__(self):
# 收益矩阵(我方视角)
self.payoff_matrix = np.array([
[50, 20], # 我方高价
[80, 10] # 我方低价
])
self.strategy_names = ['高价', '低价']
self.history = []
def play_round(self, my_action, opponent_action):
"""玩一轮博弈"""
my_payoff = self.payoff_matrix[my_action, opponent_action]
opponent_payoff = self.payoff_matrix.T[my_action, opponent_action]
self.history.append({
'my_action': my_action,
'opponent_action': opponent_action,
'my_payoff': my_payoff,
'opponent_payoff': opponent_payoff
})
return my_payoff, opponent_payoff
def find_nash_equilibrium(self):
"""找纳什均衡"""
nash_eq = []
for my in range(2):
for opp in range(2):
# 检查是否是纳什均衡
# 我方能否单方面改进
my_best_response = np.argmax(self.payoff_matrix[:, opp])
# 对方能否单方面改进
opp_best_response = np.argmax(self.payoff_matrix[my, :])
if my_best_response == my and opp_best_response == opp:
nash_eq.append((my, opp))
return nash_eq
def find_pareto_optimal(self):
"""找帕累托最优"""
pareto = []
outcomes = [
(0, 0, 50, 50), # 双方高价
(0, 1, 20, 80), # 我高价,对方低价
(1, 0, 80, 20), # 我低价,对方高价
(1, 1, 10, 10) # 双方低价
]
for i, (a1, a2, p1, p2) in enumerate(outcomes):
is_pareto = True
for j, (b1, b2, q1, q2) in enumerate(outcomes):
# 检查是否被支配
if q1 >= p1 and q2 >= p2 and (q1 > p1 or q2 > p2):
is_pareto = False
break
if is_pareto:
pareto.append((a1, a2, p1, p2))
return pareto
class FictitiousPlaySimulator:
"""
虚拟对局模拟器
两个玩家通过虚拟对局学习,最终收敛到纳什均衡
"""
def __init__(self, payoff_matrix):
self.payoff_matrix = np.array(payoff_matrix)
self.n_strategies = payoff_matrix.shape[0]
# 双方的策略计数
self.my_counts = np.ones(self.n_strategies)
self.opp_counts = np.ones(self.n_strategies)
# 历史记录
self.my_history = []
self.opp_history = []
self.my_payoffs = []
self.opp_payoffs = []
def get_average_strategy(self, counts):
"""获取平均策略"""
return counts / counts.sum()
def best_response(self, payoff_matrix, strategy):
"""最优响应"""
expected = payoff_matrix @ strategy
return np.argmax(expected)
def soft_max_response(self, payoff_matrix, strategy, temperature=0.5):
"""软最大化响应(带探索)"""
expected = payoff_matrix @ strategy
exp_payoffs = np.exp(expected / temperature)
probs = exp_payoffs / exp_payoffs.sum()
return np.random.choice(len(probs), p=probs)
def simulate(self, num_rounds=1000, soft=True, temperature=0.5):
"""
运行模拟
参数:
num_rounds: 模拟轮数
soft: 是否使用软最大化
temperature: 探索温度
"""
for _ in range(num_rounds):
# 计算平均策略
my_avg = self.get_average_strategy(self.my_counts)
opp_avg = self.get_average_strategy(self.opp_counts)
# 选择行动
if soft:
my_action = self.soft_max_response(self.payoff_matrix, opp_avg, temperature)
opp_action = self.soft_max_response(self.payoff_matrix.T, my_avg, temperature)
else:
my_action = self.best_response(self.payoff_matrix, opp_avg)
opp_action = self.best_response(self.payoff_matrix.T, my_avg)
# 获取收益
my_payoff = self.payoff_matrix[my_action, opp_action]
opp_payoff = self.payoff_matrix[opp_action, my_action]
# 更新计数
self.my_counts[my_action] += 1
self.opp_counts[opp_action] += 1
# 记录历史
self.my_history.append(my_action)
self.opp_history.append(opp_action)
self.my_payoffs.append(my_payoff)
self.opp_payoffs.append(opp_payoff)
return self.get_results()
def get_results(self):
"""获取结果"""
final_my_strategy = self.get_average_strategy(self.my_counts)
final_opp_strategy = self.get_average_strategy(self.opp_counts)
return {
'my_final_strategy': final_my_strategy,
'opp_final_strategy': final_opp_strategy,
'my_avg_payoff': np.mean(self.my_payoffs[-100:]),
'opp_avg_payoff': np.mean(self.opp_payoffs[-100:]),
'my_history': self.my_history,
'opp_history': self.opp_history
}
def plot_convergence(self):
"""绘制收敛图"""
fig, axes = plt.subplots(2, 2, figsize=(12, 8))
# 策略演化
window = 50
my_strategy_evolution = []
for i in range(window, len(self.my_history)):
window_counts = np.bincount(self.my_history[i-window:i], minlength=self.n_strategies)
my_strategy_evolution.append(window_counts / window_counts.sum())
my_strategy_evolution = np.array(my_strategy_evolution)
axes[0, 0].plot(my_strategy_evolution)
axes[0, 0].set_title('Player 1 Strategy Evolution')
axes[0, 0].set_xlabel('Round')
axes[0, 0].set_ylabel('Probability')
axes[0, 0].legend(['高价', '低价'])
axes[0, 0].grid(True)
# 收益演化
axes[0, 1].plot(self.my_payoffs, alpha=0.5)
axes[0, 1].axhline(y=50, color='r', linestyle='--', label='Pareto optimal')
axes[0, 1].set_title('Player 1 Payoff Over Time')
axes[0, 1].set_xlabel('Round')
axes[0, 1].set_ylabel('Payoff')
axes[0, 1].legend()
axes[0, 1].grid(True)
# 累积收益
axes[1, 0].plot(np.cumsum(self.my_payoffs), label='Player 1')
axes[1, 0].plot(np.cumsum(self.opp_payoffs), label='Player 2')
axes[1, 0].set_title('Cumulative Payoffs')
axes[1, 0].set_xlabel('Round')
axes[1, 0].set_ylabel('Cumulative Payoff')
axes[1, 0].legend()
axes[1, 0].grid(True)
# 最终策略分布
axes[1, 1].bar(['高价', '低价'], self.get_average_strategy(self.my_counts))
axes[1, 1].set_title('Final Strategy Distribution (Player 1)')
axes[1, 1].set_ylabel('Probability')
axes[1, 1].grid(True)
plt.tight_layout()
plt.savefig('game_simulation.png', dpi=150)
plt.show()
def run_experiment():
"""运行完整实验"""
# 创建博弈
game = SimpleCompetitionGame()
print("=" * 60)
print("简单竞争博弈分析")
print("=" * 60)
# 分析博弈结构
print("\n收益矩阵(我方视角):")
print(" 竞争者高价 竞争者低价")
print(f"我方高价 {game.payoff_matrix[0,0]:3d} {game.payoff_matrix[0,1]:3d}")
print(f"我方低价 {game.payoff_matrix[1,0]:3d} {game.payoff_matrix[1,1]:3d}")
# 找纳什均衡
nash_eq = game.find_nash_equilibrium()
print(f"\n纳什均衡: {[(game.strategy_names[a], game.strategy_names[b]) for a,b in nash_eq]}")
# 找帕累托最优
pareto = game.find_pareto_optimal()
print(f"帕累托最优: {[(game.strategy_names[a], game.strategy_names[b], p1, p2) for a,b,p1,p2 in pareto]}")
print("\n分析:")
print("- (高价, 高价) 是纳什均衡,但(低价, 低价)也是")
print("- (高价, 高价) 是帕累托最优,但(低价, 低价)不是")
print("- 这就是囚徒困境的结构!")
# 运行虚拟对局模拟
print("\n" + "=" * 60)
print("虚拟对局模拟")
print("=" * 60)
simulator = FictitiousPlaySimulator(game.payoff_matrix)
results = simulator.simulate(num_rounds=2000, soft=True, temperature=0.3)
print(f"\n最终策略分布(Player 1):")
for i, prob in enumerate(results['my_final_strategy']):
print(f" {game.strategy_names[i]}: {prob:.2%}")
print(f"\n最终策略分布(Player 2):")
for i, prob in enumerate(results['opp_final_strategy']):
print(f" {game.strategy_names[i]}: {prob:.2%}")
print(f"\n平均收益(最后100轮):")
print(f" Player 1: {results['my_avg_payoff']:.2f}")
print(f" Player 2: {results['opp_avg_payoff']:.2f}")
# 绘制收敛图
simulator.plot_convergence()
print("\n收敛图已保存为 game_simulation.png")
if __name__ == "__main__":
run_experiment()运行这个实验,你会看到:
-
博弈结构分析:纳什均衡有两个(高价-高价、低价-低价),但帕累托最优只有一个(高价-高价)。这就是经典的囚徒困境结构——明明大家都选高价对整体最好,但低价的诱惑让均衡落在了一个”次优”的位置。
-
虚拟对局收敛:两个”学习者”通过不断观察对手的历史、选择对历史平均的最优响应,最终会收敛到某个纳什均衡。由于博弈的对称性,收敛点通常是(高价, 高价)——也就是帕累托最优的那个均衡。
-
探索的作用:使用软最大化(soft-max)而不是硬最优响应,可以让学习过程带有探索性。这有助于跳出局部最优,发现更好的策略组合。
8. 均衡求解算法
8.1 Nash均衡求解
class NashEquilibriumSolver:
"""
Nash均衡求解器
提供多种求解方法
"""
@staticmethod
def brute_force_nash(payoff_matrix_1, payoff_matrix_2):
"""
暴力搜索Nash均衡(双人博弈)
适用于小规模博弈
"""
n_strategies_1 = payoff_matrix_1.shape[0]
n_strategies_2 = payoff_matrix_1.shape[1]
equilibria = []
for i in range(n_strategies_1):
for j in range(n_strategies_2):
# 检查是否是Nash均衡
# 玩家1不能通过单方面改变获得更高收益
row_best = np.argmax(payoff_matrix_1[i, :])
# 玩家2不能通过单方面改变获得更高收益
col_best = np.argmax(payoff_matrix_2[:, j])
if row_best == j and col_best == i:
equilibria.append((i, j))
return equilibria
@staticmethod
def mixed_nash_support_enumeration(payoff_matrix):
"""
混合策略Nash均衡的支持枚举
适用于双人零和博弈
"""
n = payoff_matrix.shape[0]
equilibria = []
# 尝试所有可能的支持组合
for support_size in range(1, n + 1):
for support in itertools.combinations(range(n), support_size):
# 解线性方程组找到混合策略
eq = solve_support_equilibrium(payoff_matrix, support)
if eq is not None:
equilibria.append(eq)
return equilibria
def solve_support_equilibrium(payoff_matrix, support):
"""
解特定支持下的均衡
使用线性规划
"""
from scipy.optimize import linprog
n = len(support)
# 构建线性规划问题
# 约束:支持内各行动的收益相等
# 约束:支持外各行动的收益不高于支持内
A_eq = []
b_eq = []
# 均等收益约束
for i in range(n - 1):
for j in range(i + 1, n):
row = np.zeros(n)
row[i] = 1
row[j] = -1
A_eq.append(row)
b_eq.append(0)
# 概率和为1
A_eq.append(np.ones(n))
b_eq.append(1)
# 求解
result = linprog(c=np.zeros(n), A_eq=A_eq, b_eq=b_eq, bounds=(0, None))
if result.success:
return result.x
return None
class LemkeHowsonAlgorithm:
"""
Lemke-Howson算法
求解双人博弈的混合策略Nash均衡
"""
def __init__(self, payoff_matrix_1, payoff_matrix_2):
self.payoff_1 = payoff_matrix_1
self.payoff_2 = payoff_matrix_2
self.n_1 = payoff_matrix_1.shape[0]
self.n_2 = payoff_matrix_1.shape[1]
def solve(self, initial_dropped=0, max_iterations=1000):
"""
求解Nash均衡
参数:
initial_dropped: 初始删除的纯策略
max_iterations: 最大迭代次数
"""
# 构建扩展支付矩阵
# 添加辅助行/列
# Lemke-Howson主循环
pass
def _pivot(self, tableau, entering_var, leaving_var):
"""枢轴操作"""
pass8.2 连续博弈求解
class ContinuousGameSolver:
"""
连续博弈求解器
策略空间是连续的
"""
@staticmethod
def gradient_ascent_game(G1, G2, x0, y0, lr=0.01, max_iter=1000):
"""
梯度上升博弈
每个玩家沿着自己的梯度方向更新
"""
x, y = x0, y0
history = [(x, y, G1(x, y), G2(x, y))]
for _ in range(max_iter):
# 计算梯度
grad_x = numerical_gradient(lambda xi: G1(xi, y), x)
grad_y = numerical_gradient(lambda yi: G2(x, yi), y)
# 更新
x = x + lr * grad_x
y = y + lr * grad_y
history.append((x, y, G1(x, y), G2(x, y)))
return x, y, history
@staticmethod
def fictitious_play_continuous(payoff_function, num_iterations=1000):
"""
连续博弈的虚拟对局
追踪历史平均策略
"""
# 历史策略
x_history = []
y_history = []
x = 0.5 # 初始策略
y = 0.5
for t in range(num_iterations):
# 计算历史平均
x_avg = np.mean(x_history) if x_history else 0.5
y_avg = np.mean(y_history) if y_history else 0.5
# 对平均策略选择最优响应
x_new = find_best_response(lambda xi: payoff_function(xi, y_avg), x)
y_new = find_best_response(lambda yi: payoff_function(x_avg, yi), y)
x_history.append(x_new)
y_history.append(y_new)
x = x_new
y = y_new
return x, y, (x_history, y_history)9. 拍卖理论基础
9.1 拍卖机制设计
class AuctionMechanism:
"""
拍卖机制基类
核心问题:
- 激励相容:说实话是占优策略
- 社会福利最大化
- 个人理性
"""
def __init__(self, reserve_price=0):
self.reserve_price = reserve_price
self.bidders = []
def submit_bid(self, bidder_id, bid):
"""提交投标"""
if bid >= self.reserve_price:
self.bidders.append((bidder_id, bid))
def determine_allocation(self):
"""确定分配(子类实现)"""
raise NotImplementedError
def determine_payment(self):
"""确定支付(子类实现)"""
raise NotImplementedError
class FirstPriceSealedBidAuction:
"""
第一价格密封拍卖
特点:
- 投标是私密的
- 最高价者获胜
- 获胜者支付自己的投标价格
"""
def determine_winner(self):
"""确定胜者"""
if not self.bidders:
return None, None
sorted_bids = sorted(self.bidders, key=lambda x: x[1], reverse=True)
winner_id, winning_bid = sorted_bids[0]
return winner_id, winning_bid
class VickreyAuction:
"""
第二价格密封拍卖(Vickrey拍卖)
核心性质:
- 激励相容:说实话是占优策略
- 胜者支付第二高价
"""
def determine_winner_and_payment(self):
"""确定胜者和支付价格"""
if len(self.bidders) < 2:
winner = self.bidders[0] if self.bidders else None
return winner, self.reserve_price
sorted_bids = sorted(self.bidders, key=lambda x: x[1], reverse=True)
winner = sorted_bids[0]
second_price = max(sorted_bids[1][1], self.reserve_price)
return winner, second_price
class EnglishAuction:
"""
英式拍卖(公开竞价)
特点:
- 价格逐渐上升
- 第一个放弃的人失去机会
- 最后一个出价的人获胜
"""
def __init__(self, bidders, reserve_price=0):
self.bidders = bidders
self.reserve_price = reserve_price
self.current_price = 0
self.active_bidders = set(bidders)
self.bid_history = []
def step(self, price_increment=1):
"""单步竞价"""
# 收集出价
new_bids = {}
for bidder in self.active_bidders:
# 假设每个竞价者有预算约束
if bidder.remaining_budget >= self.current_price + price_increment:
if bidder.willing_to_continue():
new_bids[bidder] = self.current_price + price_increment
if not new_bids:
# 没有人愿意继续,当前价格获胜
return None, self.current_price
# 最高出价者继续
highest_bidder = max(new_bids.keys(), key=lambda b: b.valuation)
self.current_price = new_bids[highest_bidder]
self.bid_history.append((highest_bidder, self.current_price))
return highest_bidder, self.current_price
def simulate(self):
"""模拟完整拍卖"""
while len(self.active_bidders) > 1:
result = self.step()
if result[0] is None:
# 拍卖结束
winner = self.bid_history[-1][0] if self.bid_history else None
return winner, self.current_price
# 其他竞价者决定是否继续
for bidder in list(self.active_bidders):
if bidder != result[0] and not bidder.willing_to_continue(self.current_price):
self.active_bidders.remove(bidder)
return list(self.active_bidders)[0], self.current_price
class DutchAuction:
"""
荷兰式拍卖(降价拍卖)
特点:
- 价格从高到低逐渐下降
- 第一个接受价格的人获胜
"""
def __init__(self, bidders, start_price, reserve_price=0):
self.bidders = bidders
self.start_price = start_price
self.reserve_price = reserve_price
self.current_price = start_price
self.price_decrement = 1
def step(self):
"""单步降价"""
self.current_price -= self.price_decrement
if self.current_price < self.reserve_price:
return None, None # 未售出
# 检查是否有人接受
for bidder in self.bidders:
if bidder.valuation >= self.current_price:
return bidder, self.current_price
return None, self.current_price
def simulate(self):
"""模拟完整拍卖"""
while self.current_price >= self.reserve_price:
result = self.step()
if result[0] is not None:
return result
return None, None # 未售出
class CombinatorialAuction:
"""
组合拍卖
允许投标人对商品组合出价
解决商品间的互补/替代效应
"""
def __init__(self):
self.items = []
self.bids = [] # (bidder_id, items, price)
def submit_bundle_bid(self, bidder_id, items, price):
"""提交组合投标"""
self.bids.append((bidder_id, set(items), price))
def winner_determination(self):
"""
胜者确定问题(Winner Determination Problem)
最大化社会福利:
max sum(v_i(S_i) * x_i)
s.t. 每个物品最多分配给一个投标人
这是NP-hard问题,使用贪婪或MIP求解
"""
from itertools import combinations
n_bids = len(self.bids)
n_items = len(self.items)
# 简化:使用贪婪算法
sorted_bids = sorted(self.bids, key=lambda x: x[2]/len(x[1]), reverse=True)
allocation = {}
allocated_items = set()
for bidder_id, items, price in sorted_bids:
# 检查物品是否可用
if items.isdisjoint(allocated_items):
allocation[bidder_id] = (items, price)
allocated_items.update(items)
return allocation
class DoubleAuction:
"""
双向拍卖
同时允许买家和卖家出价
用于去中心化市场
"""
def __init__(self, buyers, sellers, platform_fee=0):
"""
参数:
buyers: 买家出价 [(buyer_id, bid), ...]
sellers: 卖家要价 [(seller_id, ask), ...]
platform_fee: 平台费用比例
"""
self.buyers = sorted(buyers, key=lambda x: x[1], reverse=True) # 按出价降序
self.sellers = sorted(sellers, key=lambda x: x[1]) # 按要价升序
self.platform_fee = platform_fee
self.trades = []
def match(self):
"""
匹配买家和卖家
使用瓦尔德曼规则:
- 按出价降序排列买家
- 按要价升序排列卖家
- 在交叉点进行交易
"""
buyer_idx = 0
seller_idx = 0
while buyer_idx < len(self.buyers) and seller_idx < len(self.sellers):
buyer_id, buyer_bid = self.buyers[buyer_idx]
seller_id, seller_ask = self.sellers[seller_idx]
# 检查是否可交易
if buyer_bid >= seller_ask:
# 计算交易价格(使用中点价格)
price = (buyer_bid + seller_ask) / 2 * (1 - self.platform_fee)
self.trades.append({
'buyer': buyer_id,
'seller': seller_id,
'price': price,
'buyer_surplus': buyer_bid - price,
'seller_surplus': price - seller_ask
})
buyer_idx += 1
seller_idx += 1
else:
# 买家最高出价低于卖家最低要价,无法交易
break
return self.trades
def market_clearing_price(self):
"""计算市场出清价格"""
trades = self.match()
if not trades:
return None
return np.mean([t['price'] for t in trades])拍卖类型对比
拍卖类型 激励相容 效率 计算复杂性 第一价格密封拍卖 否 可能低效 低 第二价格密封拍卖 是 社会最优 低 组合拍卖 部分 近似最优 NP-hard
10. 机制设计高级理论
10.1 VCG机制
VCG(Vickrey-Clarke-Groves)机制是实现社会福利最大化的通用机制:
class VCGMechanism:
"""
VCG(Vickrey-Clarke-Groves)机制
核心性质:
- 激励相容:说实话是占优策略
- 社会福利最大化
- 个人理性
支付规则:
p_i = (sum_{j≠i} v_j(S^{-i})) - sum_{j≠i} v_j(S)
其中S是社会最优分配,S^{-i}是不包含i的分配
"""
def __init__(self, valuations, num_items):
"""
参数:
valuations: 各参与人的价值函数 v_i(S)
num_items: 可分配物品数量
"""
self.valuations = valuations
self.num_items = num_items
self.n_agents = len(valuations)
def social_welfare(self, allocation):
"""计算社会福利"""
welfare = 0
for i, item in enumerate(allocation):
welfare += self.valuations[i]({item})
return welfare
def allocate(self):
"""
求解最优分配
简化版本:单物品拍卖
"""
# 单物品拍卖:分配给出价最高者
bids = [self.valuations[i]({0}) for i in range(self.n_agents)]
winner = np.argmax(bids)
return [1 if i == winner else 0 for i in range(self.n_agents)]
def compute_payments(self):
"""
计算各参与人的支付
公式:p_i = (max_{S} sum_{j≠i} v_j(S)) - sum_{j≠i} v_j(S^*)
"""
allocation = self.allocate()
# 计算不含各参与人时的最优分配
payments = []
for i in range(self.n_agents):
# 社会最优(不含i)
other_welfare = 0
for j in range(self.n_agents):
if j != i and allocation[j] == 1:
other_welfare += self.valuations[j]({j})
# 含i的最优分配的社会福利(不含i的部分)
welfare_without_i = 0
for j in range(self.n_agents):
if j != i and allocation[j] == 1:
welfare_without_i += self.valuations[j]({j})
payment = welfare_without_i - other_welfare
payments.append(max(0, payment))
return payments
class MyersonLemma:
"""
Myerson引理:单参数环境的可检验性
一个分配规则和支付规则是激励相容的
当且仅当:
1. 分配规则是单调的:价值越高,获得物品的概率不降低
2. 支付按特定公式计算
"""
@staticmethod
def monotonicity_check(allocation_rule, values):
"""检查单调性"""
monotonic = True
for i in range(len(values)):
# 比较两个不同价值下的分配
for v1, v2 in zip(values[:-1], values[1:]):
if allocation_rule(v1) < allocation_rule(v2):
monotonic = False
break
return monotonic
@staticmethod
def payment_formula(allocation_rule, value, threshold):
"""
计算支付
p = v * x(v) - ∫_0^v x(t) dt
其中x是分配规则
"""
import scipy.integrate as integrate
def integrand(t):
return allocation_rule(t)
integral, _ = integrate.quad(integrand, 0, value)
payment = value * allocation_rule(value) - integral
return max(0, payment)10.2 肾脏交换匹配
class KidneyExchange:
"""
肾脏交换博弈
多对捐献者-接受者之间的配对交换
目标:最大化交换数量
"""
def __init__(self, donor_recipient_pairs):
"""
参数:
donor_recipient_pairs: [(donor_id, recipient_id, blood型), ...]
"""
self.pairs = donor_recipient_pairs
self.compatibility = self._build_compatibility_graph()
def _build_compatibility_graph(self):
"""构建兼容性图"""
compatibility = {}
for i, (d1, r1, bt1) in enumerate(self.pairs):
for j, (d2, r2, bt2) in enumerate(self.pairs):
if i != j:
# 检查兼容性:donor2可以捐给recipient1
if self._compatible(bt2, bt1):
compatibility[(i, j)] = True
return compatibility
def _compatible(self, donor_bt, recipient_bt):
"""检查血型兼容性"""
compatibility = {
('A', 'A'): True, ('A', 'O'): True,
('B', 'B'): True, ('B', 'O'): True,
('O', 'O'): True,
('AB', 'A'): True, ('AB', 'B'): True, ('AB', 'AB'): True, ('AB', 'O'): False
}
return compatibility.get((donor_bt, recipient_bt), False)
def find_cycles(self, max_cycle_length=3):
"""
寻找交换循环
使用DFS寻找长度不超过max_cycle_length的循环
"""
from collections import defaultdict
graph = defaultdict(list)
for i in range(len(self.pairs)):
for j in range(len(self.pairs)):
if i != j and (i, j) in self.compatibility:
graph[i].append(j)
cycles = []
def dfs(start, current, path, visited):
if len(path) >= 2 and current == start:
cycles.append(path[:])
return
if len(path) >= max_cycle_length:
return
for neighbor in graph[current]:
if neighbor not in visited or (neighbor == start and len(path) >= 2):
if neighbor not in visited:
visited.add(neighbor)
path.append(neighbor)
dfs(start, neighbor, path, visited)
path.pop()
if neighbor not in visited:
visited.remove(neighbor)
for i in range(len(self.pairs)):
dfs(i, i, [i], {i})
return cycles
def solve(self, max_cycle_length=3):
"""
求解最优交换方案
使用整数规划最大化交换数量
"""
cycles = self.find_cycles(max_cycle_length)
if not cycles:
return []
# 贪婪选择不相交的循环
selected = []
used_pairs = set()
for cycle in sorted(cycles, key=len, reverse=True):
if not any(p in used_pairs for p in cycle):
selected.append(cycle)
used_pairs.update(cycle)
return selected11. 联邦学习中的博弈论
11.1 联邦学习的博弈建模
联邦学习(Federated Learning)中多个客户端在保持数据本地化的同时协作训练模型。这自然产生了一个博弈问题:每个客户端需要决定参与意愿、资源贡献和模型更新策略。
class FederatedLearningGame:
"""
联邦学习博弈模型
参与人:N个客户端
行动:是否参与联邦学习、贡献多少资源
收益:模型性能提升 vs 通讯成本/隐私泄露
"""
def __init__(self, clients, base_model_performance, communication_cost,
privacy_sensitivity):
"""
参数:
clients: 客户端列表
base_model_performance: 基础模型性能
communication_cost: 各客户端的通讯成本
privacy_sensitivity: 各客户端的隐私敏感度
"""
self.clients = clients
self.n_clients = len(clients)
self.base_performance = base_model_performance
self.communication_cost = communication_cost
self.privacy_sensitivity = privacy_sensitivity
def participation_payoff(self, client_idx, participation_decision,
global_model_quality):
"""
计算客户端参与联邦学习的收益
收益 = 模型提升 - 通讯成本 - 隐私成本
"""
if participation_decision == 0:
# 不参与:只获得基础模型
return 0
model_gain = global_model_quality - self.base_performance
comm_cost = self.communication_cost[client_idx]
privacy_cost = self.privacy_sensitivity[client_idx] * participation_decision
return model_gain - comm_cost - privacy_cost
def global_model_quality(self, participation_vector):
"""
根据参与率估计全局模型质量
简化为:质量 = base + α * participation_rate
"""
participation_rate = sum(participation_vector) / self.n_clients
return self.base_performance + 0.1 * participation_rate
def best_response(self, client_idx, others_decisions):
"""
客户端的最优响应:是否参与
"""
others_decisions = list(others_decisions)
others_decisions.insert(client_idx, 0) # 临时设为0
# 两种决策的收益对比
quality_with = self.global_model_quality(
others_decisions[:client_idx] + [1] + others_decisions[client_idx+1:]
)
quality_without = self.global_model_quality(others_decisions)
payoff_with = self.participation_payoff(client_idx, 1, quality_with)
payoff_without = 0 # 不参与的收益
return 1 if payoff_with > payoff_without else 0
def find_equilibrium(self):
"""
寻找博弈均衡
使用最佳响应动态迭代
"""
# 初始:所有人参与
participation = [1] * self.n_clients
for iteration in range(100):
changed = False
for i in range(self.n_clients):
best = self.best_response(i, participation[:i] + participation[i+1:])
if best != participation[i]:
participation[i] = best
changed = True
if not changed:
break
return participation
def incentive_analysis(self):
"""
激励分析:哪些客户端有动机参与
"""
results = []
for i, client in enumerate(self.clients):
# 不参与时的基准质量
base_quality = self.global_model_quality([0] * self.n_clients)
# 参与时的边际贡献
marginal_quality = self.global_model_quality([1] * self.n_clients)
marginal_gain = marginal_quality - base_quality
# 参与成本
cost = self.communication_cost[i] + self.privacy_sensitivity[i]
incentive_compatible = marginal_gain > cost
results.append({
'client': client,
'marginal_gain': marginal_gain,
'cost': cost,
'incentive_compatible': incentive_compatible
})
return results
class FederatedLearningMechanism:
"""
联邦学习激励机制设计
目标:
- 激励客户端诚实报告贡献
- 公平分配收益
- 鼓励高质量数据贡献
"""
def __init__(self, contribution_metrics):
"""
参数:
contribution_metrics: 各客户端的贡献度量
"""
self.contributions = contribution_metrics
def shapley_value(self):
"""
计算Shapley值:公平分配联合收益
Shapley值的公理化定义:
1. 效率性:分配等于所有参与人的边际贡献之和
2. 对称性:相同贡献的参与人获得相同分配
3. 虚拟性:无贡献参与人获得零分配
4. 可加性:合作收益是可加的
"""
n = len(self.contributions)
shapley = np.zeros(n)
# 精确Shapley值(对于线性贡献函数)
for i in range(n):
# 客户端i的Shapley值 = 贡献比例
total_contribution = sum(self.contributions)
shapley[i] = self.contributions[i] / total_contribution * sum(self.contributions) if total_contribution > 0 else 0
return shapley
def tmc_payment(self, client_idx, accuracy_with, accuracy_without):
"""
边际贡献支付(TMC)
支付 = accuracy_with - accuracy_without
"""
return accuracy_with - accuracy_without
def design_truthful_mechanism(self):
"""
设计激励相容的支付机制
使用VCG机制
"""
total_contribution = sum(self.contributions)
n = len(self.contributions)
payments = []
for i in range(n):
# 计算其他人排除i后的总贡献
others_contribution = total_contribution - self.contributions[i]
# VCG支付:社会福利损失
payment = total_contribution - others_contribution
payments.append(payment)
return payments12. 多智能体强化学习(MARL)
12.1 合作多智能体强化学习
class MultiAgentReinforcementLearning:
"""
多智能体强化学习框架
核心挑战:
1. 部分可观测性
2. 非静态环境(其他智能体也在学习)
3. 信用分配问题
4. 通信与协调
"""
def __init__(self, n_agents, state_dim, action_dim):
self.n_agents = n_agents
self.state_dim = state_dim
self.action_dim = action_dim
# 各智能体的策略网络
self.policies = [PolicyNetwork(state_dim, action_dim) for _ in range(n_agents)]
# 各智能体的价值网络
self.value_networks = [ValueNetwork(state_dim) for _ in range(n_agents)]
def centralized_training_decentralized_execution(self, state):
"""
中心化训练去中心化执行(CTDE)
训练时使用全局状态
执行时只使用本地观测
"""
# 训练阶段
all_actions = [policy(state) for policy in self.policies]
# 计算联合Q值
joint_q = self.compute_joint_q(state, all_actions)
# 更新各智能体的策略
for i, policy in enumerate(self.policies):
policy.update(joint_q[i])
def compute_joint_q(self, state, actions):
"""计算联合Q值"""
joint_q = []
for i, value_net in enumerate(self.value_networks):
q_i = value_net(state, actions)
joint_q.append(q_i)
return joint_q
class IndependentQLearning:
"""
独立Q学习(IQL)
每个智能体独立学习Q函数,忽略其他智能体的存在
简单但可能不稳定
"""
def __init__(self, n_agents, state_dim, action_dim, learning_rate=0.1, gamma=0.99):
self.n_agents = n_agents
self.learning_rate = learning_rate
self.gamma = gamma
# 每个智能体独立的Q表/网络
self.q_tables = [np.zeros((state_dim, action_dim)) for _ in range(n_agents)]
def select_action(self, agent_id, state, epsilon=0.1):
"""ε-贪心选择动作"""
if np.random.rand() < epsilon:
return np.random.randint(self.action_dim)
else:
return np.argmax(self.q_tables[agent_id][state])
def update(self, agent_id, state, action, reward, next_state, done):
"""更新Q值"""
current_q = self.q_tables[agent_id][state, action]
max_next_q = np.max(self.q_tables[agent_id][next_state])
td_target = reward + self.gamma * max_next_q * (1 - done)
td_error = td_target - current_q
self.q_tables[agent_id][state, action] += self.learning_rate * td_error
def train(self, env, num_episodes=1000):
"""训练过程"""
for episode in range(num_episodes):
state = env.reset()
done = False
while not done:
actions = [self.select_action(i, s) for i, s in enumerate(state)]
next_state, rewards, done, info = env.step(actions)
for i in range(self.n_agents):
self.update(i, state[i], actions[i], rewards[i],
next_state[i], done)
state = next_state
class QMIX:
"""
QMIX:联合Q值分解
核心思想:
- 学习全局Q值Q_tot
- 使用单调混合网络保证局部最优响应
- 可通过中心化训练去中心化执行
"""
def __init__(self, n_agents, state_dim, action_dim):
self.n_agents = n_agents
# 各智能体的Q网络
self.q_nets = [QNetwork(state_dim, action_dim) for _ in range(n_agents)]
# 混合网络:将局部Q值映射到全局Q值
self.mixer = MixingNetwork(n_agents, state_dim)
def get_q_values(self, states):
"""获取各智能体的Q值"""
return [q_net(state) for q_net, state in zip(self.q_nets, states)]
def get_total_q(self, states, actions):
"""获取混合后的全局Q值"""
q_values = self.get_q_values(states)
return self.mixer(q_values, states[0]) # 全局状态用于混合网络
def update(self, states, actions, rewards, next_states, done):
"""更新"""
# 计算目标Q值
with torch.no_grad():
next_q_total = self.get_total_q(next_states, actions)
target_q = torch.tensor(rewards) + self.gamma * next_q_total * (1 - torch.tensor(done))
# 计算当前Q值
current_q = self.get_total_q(states, actions)
# 损失
loss = F.mse_loss(current_q, target_q)
# 反向传播
self.mixer.optimizer.zero_grad()
for q_net in self.q_nets:
q_net.optimizer.zero_grad()
loss.backward()
self.mixer.optimizer.step()
for q_net in self.q_nets:
q_net.optimizer.step()
class CommNet:
"""
CommNet:通信网络
智能体之间通过通信协调行动
"""
def __init__(self, n_agents, state_dim, action_dim, hidden_dim=64):
self.n_agents = n_agents
self.comm_layers = 3
# 编码器
self.encoder = nn.Linear(state_dim, hidden_dim)
# 通信层
self.comm_layers_net = nn.ModuleList([
CommunicationLayer(hidden_dim) for _ in range(self.comm_layers)
])
# 解码器
self.decoder = nn.Linear(hidden_dim, action_dim)
def communicate(self, hidden_states):
"""
智能体之间交换隐藏状态
每轮通信中,智能体接收其他智能体消息的平均
"""
for comm_layer in self.comm_layers_net:
new_hidden = []
for i in range(self.n_agents):
# 聚合其他智能体的消息
message = torch.stack([hidden_states[j] for j in range(self.n_agents) if j != i]).mean(0)
new_hidden.append(comm_layer(hidden_states[i], message))
hidden_states = new_hidden
return hidden_states
def forward(self, observations):
"""前向传播"""
# 编码观测
hidden_states = [self.encoder(obs) for obs in observations]
# 通信
hidden_states = self.communicate(hidden_states)
# 解码为动作概率
action_probs = [F.softmax(self.decoder(h), dim=-1) for h in hidden_states]
return action_probs12.2 竞争多智能体强化学习
class SelfPlay:
"""
自我对弈(Self-Play)
智能体与自己过去的版本对弈
逐渐提升策略水平
"""
def __init__(self, policy):
self.policy = policy
self.policy_pool = [copy.deepcopy(policy)]
self.opponent_pool = [copy.deepcopy(policy)]
def select_opponent(self):
"""选择对手策略"""
# 随机选择过去的策略作为对手
return random.choice(self.opponent_pool)
def update(self, episode_history):
"""更新策略"""
# 使用 episode_history 更新当前策略
self.policy.update(episode_history)
# 定期将当前策略加入池
if len(self.policy_pool) > 10:
self.policy_pool.pop(0)
self.policy_pool.append(copy.deepcopy(self.policy))
if len(self.opponent_pool) > 10:
self.opponent_pool.pop(0)
self.opponent_pool.append(copy.deepcopy(self.policy))
def train(self, env, num_episodes=10000):
"""训练过程"""
for episode in range(num_episodes):
opponent = self.select_opponent()
# 对弈
episode_history = self.play(env, opponent)
# 更新
self.update(episode_history)
def play(self, env, opponent):
"""执行一局对弈"""
# 实现对弈逻辑
pass
class FictitiousSelfPlay:
"""
虚拟自我对弈(FSP)
结合虚拟对局和自我对弈
"""
def __init__(self, n_strategies):
self.n_strategies = n_strategies
self.strategy_history = []
self.payoff_matrix = np.zeros((n_strategies, n_strategies))
# 各策略的出现次数
self_strategy_counts = np.zeros(n_strategies)
opponent_strategy_counts = np.zeros(n_strategies)
def update_payoff(self, my_strategy, opponent_strategy, payoff):
"""更新收益矩阵"""
self.payoff_matrix[my_strategy, opponent_strategy] += payoff
self_strategy_counts[my_strategy] += 1
opponent_strategy_counts[opponent_strategy] += 1
def get_average_payoff(self, strategy, opponent_avg_strategy):
"""获取平均收益"""
return np.dot(self.payoff_matrix[strategy], opponent_avg_strategy)
def select_strategy(self, epsilon=0.1):
"""选择策略(带探索)"""
if np.random.rand() < epsilon:
return np.random.randint(self.n_strategies)
opponent_avg = opponent_strategy_counts / max(1, sum(opponent_strategy_counts))
expected_payoffs = self.get_average_payoff(range(self.n_strategies), opponent_avg)
return np.argmax(expected_payoffs)13. 博弈论与AI的交叉应用
13.1 生成对抗网络与博弈论
class GANasGameTheory:
"""
GAN的博弈论视角
GAN本质上是一个两人零和博弈
"""
def __init__(self, generator, discriminator):
self.G = generator
self.D = discriminator
def zero_sum_payoff(self, D_real, D_fake):
"""
零和支付
D的收益 = -G的收益
"""
D_payoff = np.mean(np.log(D_real) + np.log(1 - D_fake))
G_payoff = -D_payoff
return D_payoff, G_payoff
def find_nash_equilibrium(self):
"""
寻找Nash均衡
在均衡时:
- D(x) = 0.5 对所有x
- G的分布 = 真实数据分布
"""
pass
def analyze_training_dynamics(self):
"""
分析训练动态
理解GAN训练中的模式崩溃等问题
"""
pass
class GenerativeAdversarialNetworkGame:
"""
博弈论视角的GAN
扩展GAN理论到更一般的博弈结构
"""
def __init__(self, players, payoff_functions):
"""
参数:
players: 参与者列表
payoff_functions: 各参与者的收益函数
"""
self.players = players
self.payoffs = payoff_functions
def projected_gradient_descent(self, lr=0.01, num_iter=1000):
"""
投影梯度下降求解均衡
类似于GAN的训练过程
"""
strategies = {p: p.initial_strategy() for p in self.players}
for _ in range(num_iter):
# 计算各玩家的梯度
gradients = {}
for p in self.players:
gradients[p] = p.compute_gradient(strategies)
# 更新
for p in self.players:
strategies[p] = p.update(strategies[p], gradients[p], lr)
# 投影到可行集
strategies[p] = p.project(strategies[p])
return strategies13.2 AI安全与博弈论
class AIAlignmentGame:
"""
AI对齐博弈
人类与AI系统的策略交互
"""
def __init__(self):
self.human_strategies = ['trust', 'verify', 'refuse']
self.ai_strategies = ['cooperate', 'defect']
def payoff_matrix(self):
"""
定义支付矩阵
人类策略 vs AI策略 -> (人类收益, AI收益)
"""
return {
('trust', 'cooperate'): (10, 10),
('trust', 'defect'): (-5, 15),
('verify', 'cooperate'): (8, 8),
('verify', 'defect'): (5, 12),
('refuse', 'cooperate'): (0, 0),
('refuse', 'defect'): (0, 0)
}
class IncentiveCompatibleML:
"""
激励相容的机器学习系统
确保系统行为符合设计者意图
"""
@staticmethod
def design_surveillance_mechanism(budget, privacy_cost):
"""
设计监控机制
平衡监控收益和隐私成本
"""
pass
@staticmethod
def alignment_penalty(predictions, true_labels, incentive_alignment):
"""
对齐惩罚项
将激励相容性作为正则化项
"""
alignment_loss = 0
for pred, true, incentive in zip(predictions, true_labels, incentive_alignment):
if pred != true and incentive:
alignment_loss += 1
return alignment_loss14. 学术引用与参考文献
- Von Stackelberg, H. (1934). “Marktform und Gleichgewicht.” Springer.
- Maynard Smith, J. (1982). “Evolution and the Theory of Games.” Cambridge University Press.
- Hofbauer, J., & Sigmund, K. (1998). “Evolutionary Games and Population Dynamics.” Cambridge University Press.
- Shoham, Y., & Leyton-Brown, K. (2008). “Multiagent Systems: Algorithmic, Game-Theoretic, and Logical Foundations.” Cambridge University Press.
- Fudenberg, D., & Tirole, J. (1991). “Game Theory.” MIT Press.
- Myerson, R. B. (1991). “Game Theory: Analysis of Conflict.” Harvard University Press.
- McMahan, H. B., et al. (2017). “Communication-Efficient Learning of Deep Networks from Decentralized Data.” AISTATS.
- Roughgarden, T. (2010). “Algorithmic Game Theory.” Communications of the ACM, 53(7), 78-86.
- Osborne, M. J., & Rubinstein, A. (1994). “A Course in Game Theory.” MIT Press.
- Nisan, N., et al. (2007). “Algorithmic Game Theory.” Cambridge University Press.
- Littman, M. L. (1994). “Markov Games as a Framework for Multi-Agent Reinforcement Learning.” ICML.
- Lowe, R., et al. (2017). “Multi-Agent Actor-Critic for Mixed Cooperative-Competitive Environments.” NeurIPS.
- Foerster, J., et al. (2018). “Counterfactual Multi-Agent Policy Gradients.” AAAI.
- Rashid, T., et al. (2018). “QMIX: Monotonic Value Function Factorisation for Deep Multi-Agent Reinforcement Learning.” ICML.
- Kraemer, L., & Banerjee, B. (2016). “Multi-Agent Reinforcement Learning as a Rehearsal for Decentralized Planning.” Neurocomputing.