随机算法与蒙特卡洛方法实战
关键词速览
| 关键词 | 解释 |
|---|---|
| Las Vegas | 保证正确的随机算法,运行时间随机 |
| Monte Carlo | 运行时间固定,结果可能有小概率错误 |
| 蒙特卡洛积分 | 用随机采样估计积分值 |
| MCTS | 蒙特卡洛树搜索,AlphaGo的核心 |
| Miller-Rabin | 素数测试的随机化算法 |
| Karger算法 | 随机化最小割算法 |
| 期望时间 | 算法运行时间的期望值 |
| 概率保证 | 算法正确的概率下界 |
摘要
随机算法是一类在执行过程中使用随机数的算法。它们往往比确定性算法更简单、更快,甚至在某些情况下更强大。从排序算法到密码学,从游戏AI到科学计算,随机算法的应用无处不在。本文深入讲解Las Vegas和Monte Carlo两大随机算法类型,介绍蒙特卡洛方法的原理,通过代码实战展示如何在素数测试、最小割查找、游戏AI等场景中使用随机算法,以及如何分析随机算法的期望性能和概率保证。
0. 为什么需要随机算法?
0.1 一个生活中的例子
假设你要在一堆简历中找出最好的候选人,但你只有时间面试5个人。
确定性方法:面试前5份简历。但这可能不巧都是一般的。
随机方法:随机选5份简历面试。平均而言,这5份比随机挑的更有代表性。
这个例子展示了随机性的一个关键优势:随机选择可以避免最坏情况输入的陷阱。
0.2 随机算法的发展史
随机算法的历史可以追溯到20世纪50年代:
- 1957年:Monte Carlo方法用于核物理模拟
- 1977年:Rabin和Miller提出素数测试的随机化算法
- 1980年:Karp提出随机化归约
- 1987年:随机化快速排序成为标准
- 2006年:Coulom提出蒙特卡洛树搜索(MCTS)
- 2016年:AlphaGo使用MCTS击败围棋世界冠军
0.3 随机算法的分类
随机算法
├── Las Vegas
│ └── 保证正确,结果确定
│ └── 运行时间随机
│ └── 例如:快速排序(随机化版本)
│
└── Monte Carlo
├── 运行时间确定
├── 结果可能小概率错误
├── 可以通过多次运行提高正确率
└── 例如:Miller-Rabin素数测试
1. Las Vegas算法:永远正确,只是慢一点
1.1 什么是Las Vegas算法?
Las Vegas算法的特点是:保证得到正确答案,但运行时间是随机的。
这听起来有点矛盾——如果总是正确,那”随机”体现在哪里?
关键在于:算法可能在某些随机选择上”运气不好”,需要重新尝试。但每次尝试都保证是正确的,只是可能要多试几次。
1.2 随机化快速排序
快速排序的随机化版本是一个经典的Las Vegas算法。
为什么需要随机化?
标准的快速排序如果每次选的枢轴恰好是最大或最小值,就会退化为O(n²)。但如果输入是”精心设计”的对抗数据,确定性算法可能被卡住。
随机化快速排序的思想是:随机选择枢轴。这样即使有人想”攻击”你的算法,也很难预测你的随机选择。
import random
from typing import List
def randomized_quicksort(arr: List[int]) -> List[int]:
"""随机化快速排序(Las Vegas:总是返回正确结果)"""
if len(arr) <= 1:
return arr
# 随机选择枢轴
pivot_idx = random.randint(0, len(arr) - 1)
pivot = arr[pivot_idx]
# 分区
left = [x for x in arr if x < pivot]
middle = [x for x in arr if x == pivot]
right = [x for x in arr if x > pivot]
# 递归排序
return randomized_quicksort(left) + middle + randomized_quicksort(right)
def deterministic_quicksort(arr: List[int]) -> List[int]:
"""确定性快速排序(用第一个元素作为枢轴)"""
if len(arr) <= 1:
return arr
pivot = arr[0]
left = [x for x in arr[1:] if x < pivot]
right = [x for x in arr[1:] if x >= pivot]
return deterministic_quicksort(left) + [pivot] + deterministic_quicksort(right)
def worst_case_input(n: int) -> List[int]:
"""生成最坏情况输入(已排序数组)"""
return list(range(n))
def test_quicksort():
"""测试随机化vs确定性快速排序"""
import time
print("=" * 60)
print("随机化快速排序 vs 确定性快速排序")
print("=" * 60)
sizes = [1000, 5000, 10000]
for n in sizes:
# 正常情况
arr = [random.randint(0, 10000) for _ in range(n)]
start = time.time()
randomized_quicksort(arr.copy())
rand_time = time.time() - start
start = time.time()
deterministic_quicksort(arr.copy())
det_time = time.time() - start
print(f"\nn={n}(随机输入):")
print(f" 随机化快排: {rand_time*1000:.2f}ms")
print(f" 确定性快排: {det_time*1000:.2f}ms")
# 最坏情况
worst = worst_case_input(n)
start = time.time()
randomized_quicksort(worst.copy())
rand_time = time.time() - start
# 确定性快排在最坏情况下非常慢
print(f"\nn={n}(已排序输入,最坏情况):")
print(f" 随机化快排: {rand_time*1000:.2f}ms")
print(f" 确定性快排: ⚠️ 非常慢(O(n²))")
if __name__ == "__main__":
test_quicksort()1.3 随机化选择算法
问题:在数组中找出第k小的元素。
确定性算法:用BFPRT(Median of Medians)可以达到O(n),但常数很大。
随机化算法:快速选择(Quickselect),期望O(n)。
def randomized_select(arr: List[int], k: int) -> int:
"""
随机化选择算法:找第k小的元素
期望时间: O(n)
Las Vegas: 保证正确
"""
if len(arr) == 1:
return arr[0]
# 随机选择枢轴
pivot_idx = random.randint(0, len(arr) - 1)
pivot = arr[pivot_idx]
# 分区
left = [x for x in arr if x < pivot]
middle = [x for x in arr if x == pivot]
right = [x for x in arr if x > pivot]
if k < len(left):
return randomized_select(left, k)
elif k < len(left) + len(middle):
return pivot
else:
return randomized_select(right, k - len(left) - len(middle))
def quickselect(arr: List[int], k: int) -> int:
"""原地版本的快速选择"""
arr = arr.copy()
while True:
if len(arr) == 1:
return arr[0]
pivot_idx = random.randint(0, len(arr) - 1)
pivot = arr[pivot_idx]
# 三路分区
i = 0
less = []
equal = []
greater = []
for x in arr:
if x < pivot:
less.append(x)
elif x == pivot:
equal.append(x)
else:
greater.append(x)
if k < len(less):
arr = less
elif k < len(less) + len(equal):
return pivot
else:
arr = greater
k -= len(less) + len(equal)
def test_select():
"""测试随机化选择"""
print("\n" + "=" * 60)
print("随机化选择算法测试")
print("=" * 60)
arr = [random.randint(0, 100) for _ in range(20)]
print(f"数组: {sorted(arr)}")
for k in [0, 4, 9, 14, 19]:
result = randomized_select(arr.copy(), k)
expected = sorted(arr)[k]
status = "✓" if result == expected else "✗"
print(f"第{k+1}小的元素: {result} (期望: {expected}) {status}")
if __name__ == "__main__":
test_select()2. Monte Carlo算法:可能犯错,但概率很小
2.1 Monte Carlo算法原理
与Las Vegas相反,Monte Carlo算法的特点是:运行时间确定,但结果可能小概率错误。
Monte Carlo算法的核心思想是:用随机采样来估计正确答案。只要采样足够多,估计值会以很高概率接近真实值。
2.2 蒙特卡洛积分
蒙特卡洛方法最经典的应用之一是计算积分。
问题:计算∫₀¹ f(x)dx,其中f(x)可能是复杂函数。
方法:在[0,1]×[0, max(f)]的矩形中随机投点,统计落在f(x)曲线下方的比例。
import random
import math
def monte_carlo_integral(f, a, b, max_f, n_samples=1000000):
"""
蒙特卡洛积分
f: 被积函数
a, b: 积分区间
max_f: f在[a,b]上的最大值估计
n_samples: 采样点数
"""
count = 0
for _ in range(n_samples):
x = random.uniform(a, b)
y = random.uniform(0, max_f)
if y <= f(x):
count += 1
area = (b - a) * max_f
return area * (count / n_samples)
def estimate_pi(n_samples=1000000):
"""
用蒙特卡洛方法估计π
原理:单位圆面积 = π,在[-1,1]×[-1,1]的正方形中投点
"""
count = 0
for _ in range(n_samples):
x = random.uniform(-1, 1)
y = random.uniform(-1, 1)
if x*x + y*y <= 1:
count += 1
# 正方形面积 = 4,圆面积 = π
# count/n ≈ π/4
return 4 * count / n_samples
def demo_monte_carlo_integration():
"""蒙特卡洛积分演示"""
print("=" * 60)
print("蒙特卡洛积分演示")
print("=" * 60)
# 估计π
print("\n估计π:")
for n in [100, 1000, 10000, 100000, 1000000]:
pi_estimate = estimate_pi(n)
error = abs(pi_estimate - math.pi)
print(f" n={n:>10}: π ≈ {pi_estimate:.8f}, 误差: {error:.8f}")
print(f"\n真实π: {math.pi:.8f}")
# 估计其他积分
print("\n估计∫₀¹ x² dx = 1/3:")
for n in [100, 1000, 10000, 100000]:
result = monte_carlo_integral(lambda x: x**2, 0, 1, 1, n)
print(f" n={n:>10}: 积分 ≈ {result:.8f}")
print(f"\n真实值: {1/3:.8f}")
# 复杂函数的积分
print("\n估计∫₀¹ sin(x) dx = 1 - cos(1):")
for n in [100, 1000, 10000, 100000]:
result = monte_carlo_integral(math.sin, 0, 1, 1, n)
print(f" n={n:>10}: 积分 ≈ {result:.8f}")
print(f"\n真实值: {1 - math.cos(1):.8f}")
if __name__ == "__main__":
demo_monte_carlo_integration()2.3 蒙特卡洛积分的可视化
import matplotlib.pyplot as plt
import numpy as np
def visualize_pi_estimation(n_samples=5000):
"""可视化π的蒙特卡洛估计"""
fig, axes = plt.subplots(1, 2, figsize=(14, 6))
# 随机投点
x_inside = []
y_inside = []
x_outside = []
y_outside = []
for _ in range(n_samples):
x = random.uniform(-1, 1)
y = random.uniform(-1, 1)
if x*x + y*y <= 1:
x_inside.append(x)
y_inside.append(y)
else:
x_outside.append(x)
y_outside.append(y)
# 左图:散点图
axes[0].scatter(x_outside, y_outside, c='red', s=1, alpha=0.5, label='在圆外')
axes[0].scatter(x_inside, y_inside, c='blue', s=1, alpha=0.5, label='在圆内')
circle = plt.Circle((0, 0), 1, fill=False, color='green', linewidth=2)
axes[0].add_patch(circle)
axes[0].set_xlim(-1.1, 1.1)
axes[0].set_ylim(-1.1, 1.1)
axes[0].set_aspect('equal')
axes[0].legend()
axes[0].set_title(f'蒙特卡洛π估计 (n={n_samples})')
# 右图:估计值收敛图
estimates = []
for n in range(100, n_samples + 1, 100):
count = 0
for _ in range(100):
x = random.uniform(-1, 1)
y = random.uniform(-1, 1)
if x*x + y*y <= 1:
count += 1
estimates.append(4 * count / 100)
axes[1].plot(range(100, n_samples + 1, 100), estimates, 'b-', alpha=0.5)
axes[1].axhline(y=math.pi, color='r', linestyle='--', label=f'π={math.pi:.6f}')
axes[1].set_xlabel('采样数')
axes[1].set_ylabel('π的估计')
axes[1].set_title('π估计的收敛性')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('monte_carlo_pi.png', dpi=150)
print("可视化已保存到 monte_carlo_pi.png")
if __name__ == "__main__":
visualize_pi_estimation()3. Miller-Rabin素数测试:密码学的基石
3.1 素数测试的重要性
素数测试是现代密码学的基石。RSA加密需要生成大素数,而测试一个几百位的大数是否是素数,必须用快速算法。
确定性素数测试:AKS算法可以在多项式时间内判定素数,但对于实际应用来说太慢了。
随机素数测试:Miller-Rabin测试可以在毫秒内测试一个100位以上的数,而且错误概率可以控制在极低水平。
3.2 Miller-Rabin算法原理
Miller-Rabin基于数论中的一个定理:
Fermat小定理:如果p是素数,a是任意不是p倍数的整数,则 a^(p-1) ≡ 1 (mod p)。
逆否命题:如果存在a使得 a^(n-1) ≢ 1 (mod n),则n一定是合数。
Miller-Rabin测试的思想是:随机选几个a,检查是否满足条件。如果任何一个a不满足,n一定是合数;如果都满足,n”很可能是”素数。
3.3 Python实现Miller-Rabin
import random
import math
def is_probable_prime(n: int, rounds: int = 10) -> bool:
"""
Miller-Rabin素数测试
返回: True表示"可能是素数",False表示一定是合数
错误概率: <= 4^(-rounds)
"""
if n < 2:
return False
if n == 2 or n == 3:
return True
if n % 2 == 0:
return False
# 将n-1写成 2^r * d 的形式
r, d = 0, n - 1
while d % 2 == 0:
r += 1
d //= 2
def is_strong_probable_prime(a: int) -> bool:
"""检查a是否是n的强概率素数"""
x = pow(a, d, n) # a^d mod n
if x == 1 or x == n - 1:
return True
for _ in range(r - 1):
x = pow(x, 2, n)
if x == n - 1:
return True
return False
# 进行多轮测试
for _ in range(rounds):
a = random.randrange(2, n - 1)
if not is_strong_probable_prime(a):
return False
return True
def generate_prime(bits: int = 256) -> int:
"""生成指定位数的素数"""
while True:
# 生成随机奇数
n = random.getrandbits(bits) | (1 << bits - 1) | 1
# Miller-Rabin测试
if is_probable_prime(n, rounds=20):
return n
def trial_division(n: int, limit: int = 1000) -> bool:
"""朴素试除法(用于小规模测试)"""
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
i = 3
while i * i <= limit and i * i <= n:
if n % i == 0:
return False
i += 2
return True
def test_miller_rabin():
"""测试Miller-Rabin算法"""
print("=" * 60)
print("Miller-Rabin素数测试")
print("=" * 60)
# 测试小素数
test_numbers = [2, 3, 17, 97, 127, 997, 1009, 10007]
print("\n小规模素数测试:")
for n in test_numbers:
result = is_probable_prime(n)
status = "✓" if trial_division(n, int(math.sqrt(n))+1) == result else "✗"
print(f" {n}: {result} {status}")
# 测试合数
print("\n合数测试:")
composites = [4, 9, 15, 21, 91, 341, 561] # 561是Carmichael数
for n in composites:
result = is_probable_prime(n)
status = "✓" if result == False else "✗ (错误)"
print(f" {n}: {result} {status}")
# 性能测试
import time
large_numbers = [
104729, # 10000以内的素数
15485863, # 100万以内的素数
179424673, # 1000万以内的素数
]
print("\n大数性能测试:")
for n in large_numbers:
start = time.time()
for _ in range(100):
is_probable_prime(n)
rand_time = time.time() - start
start = time.time()
for _ in range(100):
trial_division(n, int(math.sqrt(n))+1)
det_time = time.time() - start
print(f" {n}: Miller-Rabin={rand_time*1000:.2f}ms, 试除法={det_time*1000:.2f}ms")
# 生成大素数
print("\n生成大素数:")
for bits in [64, 128, 256]:
start = time.time()
p = generate_prime(bits)
gen_time = time.time() - start
print(f" {bits}位素数: 生成时间={gen_time*1000:.2f}ms")
print(f" {p}")
# 错误概率分析
print(f"\n错误概率分析:")
print(f" 10轮测试: P(error) <= 4^(-10) = {4**(-10):.10f}")
print(f" 20轮测试: P(error) <= 4^(-20) = {4**(-20):.20f}")
if __name__ == "__main__":
test_miller_rabin()4. Karger算法:随机化最小割
4.1 最小割问题
最小割(Minimum Cut):在图中找一组边,删除后图变得不连通,且这组边的总权重最小。
确定性算法:Stoer-Wagner算法,O(VE log V)。
随机化算法:Karger算法更简单、更快,虽然结果可能不是最优,但可以通过重复运行来提高正确率。
4.2 Karger算法原理
Karger算法的核心操作是边收缩(Edge Contraction):
1. 随机选择一条边(u, v)
2. 将u和v合并成一个新顶点
3. 删除自环
4. 重复直到只剩2个顶点
5. 这2个顶点之间的边就是割
为什么能工作?
如果最小割有k条边,每条边被选中的概率是1/C(C-1),其中C是顶点数。在收缩过程中,最小割被破坏的概率随着C减小而增加。
重复运行:运行Karger算法N次,选择最小的割。正确率可以通过N来控制。
4.3 Python实现Karger算法
import random
from typing import Set, Tuple, Dict, List
import networkx as nx
class KargerMinCut:
"""Karger随机化最小割算法"""
def __init__(self, graph: nx.Graph):
self.original_graph = graph
self.n = graph.number_of_nodes()
def contract_edge(self, graph: nx.Graph) -> nx.Graph:
"""随机收缩一条边"""
edges = list(graph.edges())
if len(edges) == 0:
return graph
# 随机选择一条边
u, v = random.choice(edges)
# 创建新图
new_graph = nx.Graph()
# 映射旧顶点到新顶点
contraction = {v: u} # v被合并到u
# 添加所有顶点和边
for node in graph.nodes():
if node == v:
continue
new_node = contraction.get(node, node)
for neighbor in graph.neighbors(node):
if neighbor == v:
continue
new_neighbor = contraction.get(neighbor, neighbor)
if new_node != new_neighbor: # 跳过自环
if new_graph.has_edge(new_node, new_neighbor):
new_graph[new_node][new_neighbor]['weight'] += graph[node][neighbor].get('weight', 1)
else:
new_graph.add_edge(new_node, new_neighbor,
weight=graph[node][neighbor].get('weight', 1))
return new_graph
def min_cut_once(self) -> Tuple[int, Set[Tuple[int, int]]]:
"""运行一次Karger算法"""
G = self.original_graph.copy()
while G.number_of_nodes() > 2:
G = self.contract_edge(G)
# 最后两个顶点之间的边就是割
remaining_nodes = list(G.nodes())
u, v = remaining_nodes[0], remaining_nodes[1]
cut_edges = set()
for edge in G.edges():
cut_edges.add(tuple(sorted(edge)))
cut_weight = sum(G[u][v].get('weight', 1) for _ in G.neighbors(v) if u in G.neighbors(v))
return cut_weight, cut_edges
def min_cut(self, trials: int = None) -> Tuple[int, Set[Tuple[int, int]]]:
"""
多次运行Karger算法
如果不指定trials,使用理论最优值: n*n/2
"""
if trials is None:
trials = self.n * self.n // 2
best_cut = float('inf')
best_edges = set()
for _ in range(trials):
cut_weight, cut_edges = self.min_cut_once()
if cut_weight < best_cut:
best_cut = cut_weight
best_edges = cut_edges
return best_cut, best_edges
def test_karger():
"""测试Karger算法"""
print("=" * 60)
print("Karger随机化最小割算法")
print("=" * 60)
# 创建测试图
G = nx.Graph()
G.add_edge(1, 2, weight=1)
G.add_edge(1, 3, weight=1)
G.add_edge(1, 4, weight=1)
G.add_edge(2, 3, weight=1)
G.add_edge(3, 4, weight=1)
G.add_edge(4, 5, weight=1)
G.add_edge(4, 6, weight=1)
G.add_edge(5, 6, weight=1)
print("\n图结构:")
print(f"顶点数: {G.number_of_nodes()}")
print(f"边数: {G.number_of_edges()}")
# 使用Karger算法
karger = KargerMinCut(G)
# 单次运行
cut_weight, cut_edges = karger.min_cut_once()
print(f"\n单次Karger算法:")
print(f" 最小割权重: {cut_weight}")
print(f" 割边: {cut_edges}")
# 多次运行
cut_weight, cut_edges = karger.min_cut(trials=100)
print(f"\n100次运行取最优:")
print(f" 最小割权重: {cut_weight}")
print(f" 割边: {cut_edges}")
# 对比确定性算法
print("\n--- 对比确定性算法 ---")
stoer Wagner not available, using networkx's built-in
if hasattr(nx, 'stoer_wagner'):
min_cut_weight, partition = nx.stoer_wagner(G)
print(f"Stoer-Wagner结果: {min_cut_weight}")
# 更复杂的图测试
print("\n" + "=" * 60)
print("复杂图测试")
print("=" * 60)
# Erdős–Rényi随机图
for n in [20, 50]:
G_random = nx.gnp_random_graph(n, 0.3, seed=42)
karger = KargerMinCut(G_random)
cut_weight, _ = karger.min_cut(trials=n * n // 2)
print(f"\nn={n}的随机图:")
print(f" Karger最小割: {cut_weight}")
print(f" 边数: {G_random.number_of_edges()}")
if __name__ == "__main__":
test_karger()5. 蒙特卡洛树搜索:游戏AI的核心
5.1 MCTS原理
蒙特卡洛树搜索(Monte Carlo Tree Search, MCTS)是近年来最重要的随机算法之一,它让计算机在围棋、象棋等复杂博弈游戏中达到了超越人类的水平。
MCTS的核心循环:
1. 选择(Selection):从根节点开始,用UCB1选择子节点,直到找到未完全展开的节点
2. 扩展(Expansion):添加一个新的子节点
3. 模拟(Simulation):随机 rollout 直到游戏结束
4. 回溯(Backpropagation):更新路径上所有节点的统计信息
UCB1公式:
UCB1(u) = Q(u)/N(u) + C × √(ln(N(parent)) / N(u))
其中Q是平均得分,N是访问次数,C是探索常数。
5.2 Python实现MCTS
import math
import random
from typing import Dict, Optional, Tuple, List
import copy
class MCTSNode:
"""MCTS树节点"""
def __init__(self, state, parent=None, action=None):
self.state = state # 游戏状态
self.parent = parent # 父节点
self.action = action # 从父节点到当前节点的动作
self.children: Dict[any, MCTSNode] = {} # 子节点
self.visits = 0 # 访问次数
self.wins = 0 # 胜利次数
self.untried_actions = state.get_legal_actions()
def ucb1(self, c=1.414) -> float:
"""UCB1公式"""
if self.visits == 0:
return float('inf')
exploitation = self.wins / self.visits
exploration = c * math.sqrt(math.log(self.parent.visits) / self.visits)
return exploitation + exploration
def is_fully_expanded(self) -> bool:
return len(self.untried_actions) == 0
def is_terminal(self) -> bool:
return self.state.is_game_over()
class TicTacToe:
"""井字棋游戏"""
def __init__(self, board=None, current_player=1):
if board is None:
self.board = [[0, 0, 0] for _ in range(3)]
else:
self.board = [row[:] for row in board]
self.current_player = current_player # 1=X, -1=O
def clone(self):
return TicTacToe(self.board, self.current_player)
def get_legal_actions(self) -> List[Tuple[int, int]]:
return [(i, j) for i in range(3) for j in range(3)
if self.board[i][j] == 0]
def apply_action(self, action: Tuple[int, int]):
i, j = action
self.board[i][j] = self.current_player
self.current_player *= -1
def is_game_over(self) -> bool:
# 检查行
for i in range(3):
if self.board[i][0] == self.board[i][1] == self.board[i][2] != 0:
return True
# 检查列
for j in range(3):
if self.board[0][j] == self.board[1][j] == self.board[2][j] != 0:
return True
# 检查对角线
if self.board[0][0] == self.board[1][1] == self.board[2][2] != 0:
return True
if self.board[0][2] == self.board[1][1] == self.board[2][0] != 0:
return True
# 检查平局
if all(self.board[i][j] != 0 for i in range(3) for j in range(3)):
return True
return False
def get_winner(self) -> int:
"""返回胜者: 1=X赢, -1=O赢, 0=平局, None=游戏未结束"""
# 检查行
for i in range(3):
if self.board[i][0] == self.board[i][1] == self.board[i][2] != 0:
return self.board[i][0]
# 检查列
for j in range(3):
if self.board[0][j] == self.board[1][j] == self.board[2][j] != 0:
return self.board[0][j]
# 检查对角线
if self.board[0][0] == self.board[1][1] == self.board[2][2] != 0:
return self.board[0][0]
if self.board[0][2] == self.board[1][1] == self.board[2][0] != 0:
return self.board[0][2]
# 检查平局
if all(self.board[i][j] != 0 for i in range(3) for j in range(3)):
return 0
return None
def get_result(self, player: int) -> float:
"""返回player视角的结果: 1=赢, 0.5=平, 0=输"""
winner = self.get_winner()
if winner is None:
return 0.5 # 游戏未结束
if winner == player:
return 1.0
elif winner == 0:
return 0.5
else:
return 0.0
def print_board(self):
symbols = {0: '.', 1: 'X', -1: 'O'}
for row in self.board:
print(' '.join(symbols[x] for x in row))
print()
class MCTS:
"""蒙特卡洛树搜索"""
def __init__(self, game: TicTacToe, simulations=1000):
self.game = game
self.simulations = simulations
self.root = MCTSNode(game)
def selection(self, node: MCTSNode) -> MCTSNode:
"""选择:UCB1最高的完全展开节点"""
while not node.is_terminal():
if node.is_fully_expanded():
node = self.best_child(node)
else:
return node
return node
def expansion(self, node: MCTSNode) -> MCTSNode:
"""扩展:添加一个新子节点"""
action = random.choice(node.untried_actions)
node.untried_actions.remove(action)
new_state = node.state.clone()
new_state.apply_action(action)
new_node = MCTSNode(new_state, parent=node, action=action)
node.children[action] = new_node
return new_node
def simulation(self, node: MCTSNode) -> float:
"""模拟:随机 rollout"""
state = node.state.clone()
player = 1 # X
while not state.is_game_over():
actions = state.get_legal_actions()
action = random.choice(actions)
state.apply_action(action)
return state.get_result(player)
def backpropagation(self, node: MCTSNode, result: float):
"""回溯:更新统计信息"""
while node is not None:
node.visits += 1
node.wins += result
node = node.parent
def run(self) -> Tuple[int, int]:
"""运行MCTS,返回最佳动作"""
for _ in range(self.simulations):
# 选择
node = self.selection(self.root)
# 扩展
if not node.is_terminal() and node.untried_actions:
node = self.expansion(node)
# 模拟
result = self.simulation(node)
# 回溯
self.backpropagation(node, result)
# 选择访问次数最多的子节点
best_action = max(self.root.children.keys(),
key=lambda a: self.root.children[a].visits)
return best_action
def update_root(self, action: Tuple[int, int]):
"""更新根节点(对手走后)"""
if action in self.root.children:
self.root = self.root.children[action]
self.root.parent = None
else:
# 对方走了我们没有探索的动作,重置
self.game.apply_action(action)
self.root = MCTSNode(self.game)
def demo_mcts_tictactoe():
"""MCTS井字棋演示"""
print("=" * 60)
print("MCTS井字棋 AI vs 随机对手")
print("=" * 60)
game = TicTacToe()
mcts = MCTS(game, simulations=1000)
print("初始棋盘:")
game.print_board()
# AI先手
ai_player = 1
for turn in range(9):
if game.current_player == ai_player:
# AI的回合
action = mcts.run()
game.apply_action(action)
mcts.update_root(action)
print(f"AI (X) 下在 ({action[0]}, {action[1]})")
else:
# 随机对手
actions = game.get_legal_actions()
action = random.choice(actions)
game.apply_action(action)
mcts.update_root(action)
print(f"对手 (O) 下在 ({action[0]}, {action[1]})")
game.print_board()
if game.is_game_over():
winner = game.get_winner()
if winner == ai_player:
print("AI获胜!")
elif winner == 0:
print("平局!")
else:
print("对手获胜!")
break
def mcts_vs_mcts(n_games=100):
"""MCTS AI vs MCTS AI"""
print("\n" + "=" * 60)
print(f"MCTS vs MCTS: {n_games}局对弈统计")
print("=" * 60)
results = {'X_wins': 0, 'O_wins': 0, 'draws': 0}
for game_num in range(n_games):
game = TicTacToe()
mcts = MCTS(game, simulations=500)
while not game.is_game_over():
action = mcts.run()
game.apply_action(action)
mcts.update_root(action)
winner = game.get_winner()
if winner == 1:
results['X_wins'] += 1
elif winner == -1:
results['O_wins'] += 1
else:
results['draws'] += 1
print(f"X (先手) 胜: {results['X_wins']}")
print(f"O (后手) 胜: {results['O_wins']}")
print(f"平局: {results['draws']}")
if __name__ == "__main__":
demo_mcts_tictactoe()
mcts_vs_mcts(100)6. 随机化负载均衡:Power of Two Choices
6.1 负载均衡问题
问题:有n个服务器,m个请求。如何分配请求使得负载均衡?
简单哈希:hash(request) % n。可能导致负载不均(哈希碰撞)。
随机选择:随机选两个服务器,选择负载较小的那个。
这就是”Power of Two Choices”(两种选择的威力)!
6.2 为什么两种选择就够了?
数学上可以证明:使用两种选择,最大负载从O(log n / log log n)降到O(log log n)。
这意味着:即使只有两种选择,负载均衡也好得多。
6.3 Python实现
import random
from collections import defaultdict
import matplotlib.pyplot as plt
class LoadBalancer:
"""随机化负载均衡器"""
def __init__(self, n_servers: int):
self.n_servers = n_servers
self.loads = [0] * n_servers
self.assignments = []
def random_assignment(self, request_id: int) -> int:
"""简单随机分配"""
server = random.randint(0, self.n_servers - 1)
self.loads[server] += 1
self.assignments.append(server)
return server
def two_choices(self, request_id: int) -> int:
"""两种选择"""
# 随机选两个服务器
s1 = random.randint(0, self.n_servers - 1)
s2 = random.randint(0, self.n_servers - 1)
# 选择负载较小的
if self.loads[s1] <= self.loads[s2]:
chosen = s1
else:
chosen = s2
self.loads[chosen] += 1
self.assignments.append(chosen)
return chosen
def get_max_load(self) -> int:
return max(self.loads)
def get_load_variance(self) -> float:
mean = sum(self.loads) / self.n_servers
return sum((l - mean) ** 2 for l in self.loads) / self.n_servers
def reset(self):
self.loads = [0] * self.n_servers
self.assignments = []
def test_load_balancing():
"""测试负载均衡算法"""
print("=" * 60)
print("负载均衡算法测试")
print("=" * 60)
n_servers = 100
n_requests = 10000
# 测试简单随机
lb1 = LoadBalancer(n_servers)
for i in range(n_requests):
lb1.random_assignment(i)
print(f"\n简单随机分配 (n={n_requests}请求, {n_servers}服务器):")
print(f" 最大负载: {lb1.get_max_load()}")
print(f" 负载方差: {lb1.get_load_variance():.2f}")
print(f" 理想负载: {n_requests / n_servers:.2f}")
# 测试两种选择
lb2 = LoadBalancer(n_servers)
for i in range(n_requests):
lb2.two_choices(i)
print(f"\n两种选择 (n={n_requests}请求, {n_servers}服务器):")
print(f" 最大负载: {lb2.get_max_load()}")
print(f" 负载方差: {lb2.get_load_variance():.2f}")
print(f" 理想负载: {n_requests / n_servers:.2f}")
# 多次实验统计
print(f"\n" + "=" * 60)
print("大规模统计 (100次实验)")
print("=" * 60)
n_experiments = 100
n_servers_list = [10, 50, 100, 500]
n_requests = 10000
results = {'random': [], 'two_choices': []}
for _ in range(n_experiments):
lb1 = LoadBalancer(n_servers)
for i in range(n_requests):
lb1.random_assignment(i)
results['random'].append(lb1.get_max_load())
lb2 = LoadBalancer(n_servers)
for i in range(n_requests):
lb2.two_choices(i)
results['two_choices'].append(lb2.get_max_load())
for n_servers in n_servers_list:
lb1 = LoadBalancer(n_servers)
lb2 = LoadBalancer(n_servers)
for i in range(n_requests):
lb1.random_assignment(i)
lb2.two_choices(i)
print(f"\nn={n_servers}服务器, {n_requests}请求:")
print(f" 随机分配 - 最大负载: {lb1.get_max_load():.2f}")
print(f" 两种选择 - 最大负载: {lb2.get_max_load():.2f}")
print(f" 理论最大负载: {n_requests / n_servers:.2f}")
def visualize_load_distribution():
"""可视化负载分布"""
n_servers = 50
n_requests = 5000
lb1 = LoadBalancer(n_servers)
lb2 = LoadBalancer(n_servers)
for i in range(n_requests):
lb1.random_assignment(i)
lb2.two_choices(i)
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
axes[0].bar(range(n_servers), lb1.loads, color='steelblue')
axes[0].axhline(y=n_requests/n_servers, color='r', linestyle='--',
label=f'理想负载={n_requests/n_servers:.1f}')
axes[0].set_xlabel('服务器编号')
axes[0].set_ylabel('负载')
axes[0].set_title(f'简单随机分配 (max={lb1.get_max_load()})')
axes[0].legend()
axes[1].bar(range(n_servers), lb2.loads, color='darkorange')
axes[1].axhline(y=n_requests/n_servers, color='r', linestyle='--',
label=f'理想负载={n_requests/n_servers:.1f}')
axes[1].set_xlabel('服务器编号')
axes[1].set_ylabel('负载')
axes[1].set_title(f'两种选择 (max={lb2.get_max_load()})')
axes[1].legend()
plt.tight_layout()
plt.savefig('load_balancing.png', dpi=150)
print("可视化已保存到 load_balancing.png")
if __name__ == "__main__":
test_load_balancing()
visualize_load_distribution()7. 随机化算法分析
7.1 期望时间分析
随机化算法的”快”通常指的是期望运行时间。
定义:设T(x)是算法在输入x上的运行时间,则期望运行时间是E[T(x)],其中期望是关于算法内部随机选择的。
分析方法:
- 线性期望可加
- 全概率公式
- 指示器随机变量
7.2 概率保证
随机化算法通常给出概率保证:
- “以概率≥1-1/n成功”
- “期望O(n log n),且以≥1-1/n²的概率至少这么快”
例子:快速排序的期望比较次数是O(n log n)。但实际上,运气不好时可能达到O(n²)。我们可以证明:以概率≥1-1/n²,运行时间不超过某个界限。
7.3 尾部不等式
Chernoff界:对于独立随机变量的和,尾部概率呈指数下降。
设X = X₁ + X₂ + … + Xₙ,每个Xᵢ是[0,1]的独立随机变量,E[Xᵢ] = pᵢ。
则对于δ > 0:
- P[X > (1+δ)μ] ≤ exp(-δ²μ/3)
- P[X < (1-δ)μ] ≤ exp(-δ²μ/2)
应用:如果负载均衡中每个请求独立随机分配,则最大负载以很高概率接近平均值。
8. 高级应用:MCTS实现2048游戏AI
class Game2048:
"""2048游戏"""
def __init__(self, board=None):
if board is None:
self.board = [[0]*4 for _ in range(4)]
self.add_random_tile()
self.add_random_tile()
else:
self.board = [row[:] for row in board]
def clone(self):
return Game2048(self.board)
def get_legal_actions(self):
"""返回可执行的动作: 'up', 'down', 'left', 'right'"""
actions = []
for direction in ['up', 'down', 'left', 'right']:
if self.can_move(direction):
actions.append(direction)
return actions
def can_move(self, direction):
"""检查是否可以移动"""
board = self.board
if direction == 'up':
for j in range(4):
for i in range(1, 4):
if board[i][j] == 0 or board[i][j] == board[i-1][j]:
if board[i][j] != 0:
return True
# 上面有空格
for k in range(i-1, -1, -1):
if board[k][j] != 0:
break
if k == 0:
return True
return False
# ... 其他方向的逻辑类似
return False
def move(self, direction):
"""执行移动,返回是否成功"""
board = self.board
moved = False
if direction == 'left':
for i in range(4):
row = [x for x in board[i] if x != 0]
new_row = []
j = 0
while j < len(row):
if j + 1 < len(row) and row[j] == row[j+1]:
new_row.append(row[j] * 2)
j += 2
else:
new_row.append(row[j])
j += 1
new_row += [0] * (4 - len(new_row))
if new_row != board[i]:
moved = True
board[i] = new_row
# ... 其他方向类似
if moved:
self.add_random_tile()
return moved
def add_random_tile(self):
"""添加随机瓦片(90%是2,10%是4)"""
empty = [(i, j) for i in range(4) for j in range(4) if self.board[i][j] == 0]
if empty:
i, j = random.choice(empty)
self.board[i][j] = 4 if random.random() < 0.1 else 2
def is_game_over(self):
"""检查游戏是否结束"""
# 还有空格
for row in self.board:
if 0 in row:
return False
# 可以合并
for i in range(4):
for j in range(4):
if i < 3 and self.board[i][j] == self.board[i+1][j]:
return False
if j < 3 and self.board[i][j] == self.board[i][j+1]:
return False
return True
def get_score(self):
return sum(sum(row) for row in self.board)
def get_max_tile(self):
return max(max(row) for row in self.board)
class MCTS2048:
"""2048游戏的MCTS AI"""
def __init__(self, game: Game2048, simulations=100):
self.game = game
self.simulations = simulations
def selection(self, node):
"""UCB1选择"""
while not node.is_terminal():
if node.is_fully_expanded():
node = node.best_child()
else:
return node
return node
def expansion(self, node):
"""扩展"""
action = random.choice(node.untried_actions)
node.untried_actions.remove(action)
new_game = node.state.clone()
new_game.move(action)
new_node = GameNode(new_game, parent=node, action=action)
node.children[action] = new_node
return new_node
def simulation(self, node):
"""模拟"""
game = node.state.clone()
while not game.is_game_over():
actions = game.get_legal_actions()
if not actions:
break
action = random.choice(actions)
game.move(action)
return game.get_score()
def backpropagation(self, node, score):
"""回溯"""
while node:
node.visits += 1
node.wins += score
node = node.parent
def run(self):
"""运行MCTS并返回最佳动作"""
root = GameNode(self.game)
for _ in range(self.simulations):
node = self.selection(root)
if not node.is_terminal() and node.untried_actions:
node = self.expansion(node)
score = self.simulation(node)
self.backpropagation(node, score)
# 选择访问最多的动作
if not root.children:
return None
best_action = max(root.children.keys(),
key=lambda a: root.children[a].visits)
return best_action
class GameNode:
"""2048 MCTS节点"""
def __init__(self, state, parent=None, action=None):
self.state = state
self.parent = parent
self.action = action
self.children = {}
self.visits = 0
self.wins = 0
self.untried_actions = state.get_legal_actions()
def is_terminal(self):
return self.state.is_game_over()
def is_fully_expanded(self):
return len(self.untried_actions) == 0
def best_child(self, c=1.414):
return max(self.children.values(),
key=lambda n: n.wins/n.visits + c*math.sqrt(math.log(self.visits)/n.visits))
def demo_2048():
"""2048 AI演示"""
print("=" * 60)
print("2048游戏 MCTS AI")
print("=" * 60)
game = Game2048()
ai = MCTS2048(game, simulations=200)
print("初始状态:")
for row in game.board:
print(' '.join(f'{x:5}' for x in row))
moves = 0
while not game.is_game_over():
action = ai.run()
if action is None:
break
game.move(action)
moves += 1
if moves % 5 == 0:
print(f"\n第{moves}步后:")
for row in game.board:
print(' '.join(f'{x:5}' for x in row))
print(f"\n游戏结束! 分数: {game.get_score()}, 最大瓦片: {game.get_max_tile()}, 移动次数: {moves}")
if __name__ == "__main__":
demo_2048()9. 总结:随机算法的工程实践
9.1 何时使用随机算法?
| 场景 | 推荐算法 |
|---|---|
| 避免最坏情况 | 随机化快排、快速选择 |
| 简化算法设计 | Karger最小割 |
| 无法精确求解 | 蒙特卡洛积分 |
| 游戏AI | MCTS |
| 密码学 | Miller-Rabin |
| 负载均衡 | Power of Two Choices |
9.2 随机算法的优缺点
优点:
- 实现简单
- 期望性能好
- 避免对抗输入
- 适合并行化
缺点:
- 结果不确定
- 难以调试
- 需要好的随机数生成器
- 某些场景不可接受错误
9.3 实用技巧
- 种子固定:调试时固定随机种子,结果可复现
- 多次运行:对Monte Carlo算法,多次运行取最优
- 概率验证:输出结果时附带置信区间
- 混合策略:随机算法 + 确定性算法
# 调试时固定种子
random.seed(42)
# 多次运行取最优
best_result = None
best_score = float('-inf')
for _ in range(10):
result = monte_carlo_algorithm(...)
if score(result) > best_score:
best_score = score(result)
best_result = result参考来源
- Motwani, R., & Raghavan, P. (1995). Randomized Algorithms. Cambridge University Press.
- Karger, D. R. (1994). Random sampling in cut, flow, and network design problems. STOC ‘94.
- Browne, C. B., et al. (2012). A survey of Monte Carlo tree search methods. IEEE Transactions on Computational Intelligence and AI in Games.
- Karp, R. M., & Rabin, M. O. (1987). Efficient randomized pattern-matching algorithms. IBM Journal of Research and Development.
相关文档
- P_NP问题与NPC详解 - 计算复杂性基础
- 近似算法实战指南 - NP难问题的近似求解
- 组合优化应用实战 - 整数规划和组合优化
- 计算复杂性理论深度 - 复杂度类的完整理论