随机算法与蒙特卡洛方法实战

关键词速览

关键词解释
Las Vegas保证正确的随机算法,运行时间随机
Monte Carlo运行时间固定,结果可能有小概率错误
蒙特卡洛积分用随机采样估计积分值
MCTS蒙特卡洛树搜索,AlphaGo的核心
Miller-Rabin素数测试的随机化算法
Karger算法随机化最小割算法
期望时间算法运行时间的期望值
概率保证算法正确的概率下界

摘要

随机算法是一类在执行过程中使用随机数的算法。它们往往比确定性算法更简单、更快,甚至在某些情况下更强大。从排序算法到密码学,从游戏AI到科学计算,随机算法的应用无处不在。本文深入讲解Las Vegas和Monte Carlo两大随机算法类型,介绍蒙特卡洛方法的原理,通过代码实战展示如何在素数测试、最小割查找、游戏AI等场景中使用随机算法,以及如何分析随机算法的期望性能和概率保证。


0. 为什么需要随机算法?

0.1 一个生活中的例子

假设你要在一堆简历中找出最好的候选人,但你只有时间面试5个人。

确定性方法:面试前5份简历。但这可能不巧都是一般的。

随机方法:随机选5份简历面试。平均而言,这5份比随机挑的更有代表性。

这个例子展示了随机性的一个关键优势:随机选择可以避免最坏情况输入的陷阱

0.2 随机算法的发展史

随机算法的历史可以追溯到20世纪50年代:

  • 1957年:Monte Carlo方法用于核物理模拟
  • 1977年:Rabin和Miller提出素数测试的随机化算法
  • 1980年:Karp提出随机化归约
  • 1987年:随机化快速排序成为标准
  • 2006年:Coulom提出蒙特卡洛树搜索(MCTS)
  • 2016年:AlphaGo使用MCTS击败围棋世界冠军

0.3 随机算法的分类

随机算法
├── Las Vegas
│   └── 保证正确,结果确定
│   └── 运行时间随机
│   └── 例如:快速排序(随机化版本)
│
└── Monte Carlo
    ├── 运行时间确定
    ├── 结果可能小概率错误
    ├── 可以通过多次运行提高正确率
    └── 例如:Miller-Rabin素数测试

1. Las Vegas算法:永远正确,只是慢一点

1.1 什么是Las Vegas算法?

Las Vegas算法的特点是:保证得到正确答案,但运行时间是随机的

这听起来有点矛盾——如果总是正确,那”随机”体现在哪里?

关键在于:算法可能在某些随机选择上”运气不好”,需要重新尝试。但每次尝试都保证是正确的,只是可能要多试几次。

1.2 随机化快速排序

快速排序的随机化版本是一个经典的Las Vegas算法。

为什么需要随机化?

标准的快速排序如果每次选的枢轴恰好是最大或最小值,就会退化为O(n²)。但如果输入是”精心设计”的对抗数据,确定性算法可能被卡住。

随机化快速排序的思想是:随机选择枢轴。这样即使有人想”攻击”你的算法,也很难预测你的随机选择。

import random
from typing import List
 
def randomized_quicksort(arr: List[int]) -> List[int]:
    """随机化快速排序(Las Vegas:总是返回正确结果)"""
    if len(arr) <= 1:
        return arr
    
    # 随机选择枢轴
    pivot_idx = random.randint(0, len(arr) - 1)
    pivot = arr[pivot_idx]
    
    # 分区
    left = [x for x in arr if x < pivot]
    middle = [x for x in arr if x == pivot]
    right = [x for x in arr if x > pivot]
    
    # 递归排序
    return randomized_quicksort(left) + middle + randomized_quicksort(right)
 
 
def deterministic_quicksort(arr: List[int]) -> List[int]:
    """确定性快速排序(用第一个元素作为枢轴)"""
    if len(arr) <= 1:
        return arr
    
    pivot = arr[0]
    left = [x for x in arr[1:] if x < pivot]
    right = [x for x in arr[1:] if x >= pivot]
    
    return deterministic_quicksort(left) + [pivot] + deterministic_quicksort(right)
 
 
def worst_case_input(n: int) -> List[int]:
    """生成最坏情况输入(已排序数组)"""
    return list(range(n))
 
 
def test_quicksort():
    """测试随机化vs确定性快速排序"""
    import time
    
    print("=" * 60)
    print("随机化快速排序 vs 确定性快速排序")
    print("=" * 60)
    
    sizes = [1000, 5000, 10000]
    
    for n in sizes:
        # 正常情况
        arr = [random.randint(0, 10000) for _ in range(n)]
        
        start = time.time()
        randomized_quicksort(arr.copy())
        rand_time = time.time() - start
        
        start = time.time()
        deterministic_quicksort(arr.copy())
        det_time = time.time() - start
        
        print(f"\nn={n}(随机输入):")
        print(f"  随机化快排: {rand_time*1000:.2f}ms")
        print(f"  确定性快排: {det_time*1000:.2f}ms")
        
        # 最坏情况
        worst = worst_case_input(n)
        
        start = time.time()
        randomized_quicksort(worst.copy())
        rand_time = time.time() - start
        
        # 确定性快排在最坏情况下非常慢
        print(f"\nn={n}(已排序输入,最坏情况):")
        print(f"  随机化快排: {rand_time*1000:.2f}ms")
        print(f"  确定性快排: ⚠️ 非常慢(O(n²))")
 
 
if __name__ == "__main__":
    test_quicksort()

1.3 随机化选择算法

问题:在数组中找出第k小的元素。

确定性算法:用BFPRT(Median of Medians)可以达到O(n),但常数很大。

随机化算法:快速选择(Quickselect),期望O(n)。

def randomized_select(arr: List[int], k: int) -> int:
    """
    随机化选择算法:找第k小的元素
    期望时间: O(n)
    Las Vegas: 保证正确
    """
    if len(arr) == 1:
        return arr[0]
    
    # 随机选择枢轴
    pivot_idx = random.randint(0, len(arr) - 1)
    pivot = arr[pivot_idx]
    
    # 分区
    left = [x for x in arr if x < pivot]
    middle = [x for x in arr if x == pivot]
    right = [x for x in arr if x > pivot]
    
    if k < len(left):
        return randomized_select(left, k)
    elif k < len(left) + len(middle):
        return pivot
    else:
        return randomized_select(right, k - len(left) - len(middle))
 
 
def quickselect(arr: List[int], k: int) -> int:
    """原地版本的快速选择"""
    arr = arr.copy()
    
    while True:
        if len(arr) == 1:
            return arr[0]
        
        pivot_idx = random.randint(0, len(arr) - 1)
        pivot = arr[pivot_idx]
        
        # 三路分区
        i = 0
        less = []
        equal = []
        greater = []
        
        for x in arr:
            if x < pivot:
                less.append(x)
            elif x == pivot:
                equal.append(x)
            else:
                greater.append(x)
        
        if k < len(less):
            arr = less
        elif k < len(less) + len(equal):
            return pivot
        else:
            arr = greater
            k -= len(less) + len(equal)
 
 
def test_select():
    """测试随机化选择"""
    print("\n" + "=" * 60)
    print("随机化选择算法测试")
    print("=" * 60)
    
    arr = [random.randint(0, 100) for _ in range(20)]
    print(f"数组: {sorted(arr)}")
    
    for k in [0, 4, 9, 14, 19]:
        result = randomized_select(arr.copy(), k)
        expected = sorted(arr)[k]
        status = "✓" if result == expected else "✗"
        print(f"第{k+1}小的元素: {result} (期望: {expected}) {status}")
 
 
if __name__ == "__main__":
    test_select()

2. Monte Carlo算法:可能犯错,但概率很小

2.1 Monte Carlo算法原理

与Las Vegas相反,Monte Carlo算法的特点是:运行时间确定,但结果可能小概率错误

Monte Carlo算法的核心思想是:用随机采样来估计正确答案。只要采样足够多,估计值会以很高概率接近真实值。

2.2 蒙特卡洛积分

蒙特卡洛方法最经典的应用之一是计算积分。

问题:计算∫₀¹ f(x)dx,其中f(x)可能是复杂函数。

方法:在[0,1]×[0, max(f)]的矩形中随机投点,统计落在f(x)曲线下方的比例。

import random
import math
 
def monte_carlo_integral(f, a, b, max_f, n_samples=1000000):
    """
    蒙特卡洛积分
    f: 被积函数
    a, b: 积分区间
    max_f: f在[a,b]上的最大值估计
    n_samples: 采样点数
    """
    count = 0
    
    for _ in range(n_samples):
        x = random.uniform(a, b)
        y = random.uniform(0, max_f)
        
        if y <= f(x):
            count += 1
    
    area = (b - a) * max_f
    return area * (count / n_samples)
 
 
def estimate_pi(n_samples=1000000):
    """
    用蒙特卡洛方法估计π
    原理:单位圆面积 = π,在[-1,1]×[-1,1]的正方形中投点
    """
    count = 0
    
    for _ in range(n_samples):
        x = random.uniform(-1, 1)
        y = random.uniform(-1, 1)
        
        if x*x + y*y <= 1:
            count += 1
    
    # 正方形面积 = 4,圆面积 = π
    # count/n ≈ π/4
    return 4 * count / n_samples
 
 
def demo_monte_carlo_integration():
    """蒙特卡洛积分演示"""
    print("=" * 60)
    print("蒙特卡洛积分演示")
    print("=" * 60)
    
    # 估计π
    print("\n估计π:")
    for n in [100, 1000, 10000, 100000, 1000000]:
        pi_estimate = estimate_pi(n)
        error = abs(pi_estimate - math.pi)
        print(f"  n={n:>10}: π ≈ {pi_estimate:.8f}, 误差: {error:.8f}")
    
    print(f"\n真实π: {math.pi:.8f}")
    
    # 估计其他积分
    print("\n估计∫₀¹ x² dx = 1/3:")
    for n in [100, 1000, 10000, 100000]:
        result = monte_carlo_integral(lambda x: x**2, 0, 1, 1, n)
        print(f"  n={n:>10}: 积分 ≈ {result:.8f}")
    
    print(f"\n真实值: {1/3:.8f}")
    
    # 复杂函数的积分
    print("\n估计∫₀¹ sin(x) dx = 1 - cos(1):")
    for n in [100, 1000, 10000, 100000]:
        result = monte_carlo_integral(math.sin, 0, 1, 1, n)
        print(f"  n={n:>10}: 积分 ≈ {result:.8f}")
    
    print(f"\n真实值: {1 - math.cos(1):.8f}")
 
 
if __name__ == "__main__":
    demo_monte_carlo_integration()

2.3 蒙特卡洛积分的可视化

import matplotlib.pyplot as plt
import numpy as np
 
def visualize_pi_estimation(n_samples=5000):
    """可视化π的蒙特卡洛估计"""
    fig, axes = plt.subplots(1, 2, figsize=(14, 6))
    
    # 随机投点
    x_inside = []
    y_inside = []
    x_outside = []
    y_outside = []
    
    for _ in range(n_samples):
        x = random.uniform(-1, 1)
        y = random.uniform(-1, 1)
        
        if x*x + y*y <= 1:
            x_inside.append(x)
            y_inside.append(y)
        else:
            x_outside.append(x)
            y_outside.append(y)
    
    # 左图:散点图
    axes[0].scatter(x_outside, y_outside, c='red', s=1, alpha=0.5, label='在圆外')
    axes[0].scatter(x_inside, y_inside, c='blue', s=1, alpha=0.5, label='在圆内')
    
    circle = plt.Circle((0, 0), 1, fill=False, color='green', linewidth=2)
    axes[0].add_patch(circle)
    axes[0].set_xlim(-1.1, 1.1)
    axes[0].set_ylim(-1.1, 1.1)
    axes[0].set_aspect('equal')
    axes[0].legend()
    axes[0].set_title(f'蒙特卡洛π估计 (n={n_samples})')
    
    # 右图:估计值收敛图
    estimates = []
    for n in range(100, n_samples + 1, 100):
        count = 0
        for _ in range(100):
            x = random.uniform(-1, 1)
            y = random.uniform(-1, 1)
            if x*x + y*y <= 1:
                count += 1
        estimates.append(4 * count / 100)
    
    axes[1].plot(range(100, n_samples + 1, 100), estimates, 'b-', alpha=0.5)
    axes[1].axhline(y=math.pi, color='r', linestyle='--', label=f'π={math.pi:.6f}')
    axes[1].set_xlabel('采样数')
    axes[1].set_ylabel('π的估计')
    axes[1].set_title('π估计的收敛性')
    axes[1].legend()
    axes[1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.savefig('monte_carlo_pi.png', dpi=150)
    print("可视化已保存到 monte_carlo_pi.png")
 
 
if __name__ == "__main__":
    visualize_pi_estimation()

3. Miller-Rabin素数测试:密码学的基石

3.1 素数测试的重要性

素数测试是现代密码学的基石。RSA加密需要生成大素数,而测试一个几百位的大数是否是素数,必须用快速算法。

确定性素数测试:AKS算法可以在多项式时间内判定素数,但对于实际应用来说太慢了。

随机素数测试:Miller-Rabin测试可以在毫秒内测试一个100位以上的数,而且错误概率可以控制在极低水平。

3.2 Miller-Rabin算法原理

Miller-Rabin基于数论中的一个定理:

Fermat小定理:如果p是素数,a是任意不是p倍数的整数,则 a^(p-1) ≡ 1 (mod p)。

逆否命题:如果存在a使得 a^(n-1) ≢ 1 (mod n),则n一定是合数。

Miller-Rabin测试的思想是:随机选几个a,检查是否满足条件。如果任何一个a不满足,n一定是合数;如果都满足,n”很可能是”素数。

3.3 Python实现Miller-Rabin

import random
import math
 
def is_probable_prime(n: int, rounds: int = 10) -> bool:
    """
    Miller-Rabin素数测试
    返回: True表示"可能是素数",False表示一定是合数
    错误概率: <= 4^(-rounds)
    """
    if n < 2:
        return False
    if n == 2 or n == 3:
        return True
    if n % 2 == 0:
        return False
    
    # 将n-1写成 2^r * d 的形式
    r, d = 0, n - 1
    while d % 2 == 0:
        r += 1
        d //= 2
    
    def is_strong_probable_prime(a: int) -> bool:
        """检查a是否是n的强概率素数"""
        x = pow(a, d, n)  # a^d mod n
        
        if x == 1 or x == n - 1:
            return True
        
        for _ in range(r - 1):
            x = pow(x, 2, n)
            if x == n - 1:
                return True
        
        return False
    
    # 进行多轮测试
    for _ in range(rounds):
        a = random.randrange(2, n - 1)
        if not is_strong_probable_prime(a):
            return False
    
    return True
 
 
def generate_prime(bits: int = 256) -> int:
    """生成指定位数的素数"""
    while True:
        # 生成随机奇数
        n = random.getrandbits(bits) | (1 << bits - 1) | 1
        
        # Miller-Rabin测试
        if is_probable_prime(n, rounds=20):
            return n
 
 
def trial_division(n: int, limit: int = 1000) -> bool:
    """朴素试除法(用于小规模测试)"""
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    
    i = 3
    while i * i <= limit and i * i <= n:
        if n % i == 0:
            return False
        i += 2
    
    return True
 
 
def test_miller_rabin():
    """测试Miller-Rabin算法"""
    print("=" * 60)
    print("Miller-Rabin素数测试")
    print("=" * 60)
    
    # 测试小素数
    test_numbers = [2, 3, 17, 97, 127, 997, 1009, 10007]
    
    print("\n小规模素数测试:")
    for n in test_numbers:
        result = is_probable_prime(n)
        status = "✓" if trial_division(n, int(math.sqrt(n))+1) == result else "✗"
        print(f"  {n}: {result} {status}")
    
    # 测试合数
    print("\n合数测试:")
    composites = [4, 9, 15, 21, 91, 341, 561]  # 561是Carmichael数
    for n in composites:
        result = is_probable_prime(n)
        status = "✓" if result == False else "✗ (错误)"
        print(f"  {n}: {result} {status}")
    
    # 性能测试
    import time
    
    large_numbers = [
        104729,  # 10000以内的素数
        15485863,  # 100万以内的素数
        179424673,  # 1000万以内的素数
    ]
    
    print("\n大数性能测试:")
    for n in large_numbers:
        start = time.time()
        for _ in range(100):
            is_probable_prime(n)
        rand_time = time.time() - start
        
        start = time.time()
        for _ in range(100):
            trial_division(n, int(math.sqrt(n))+1)
        det_time = time.time() - start
        
        print(f"  {n}: Miller-Rabin={rand_time*1000:.2f}ms, 试除法={det_time*1000:.2f}ms")
    
    # 生成大素数
    print("\n生成大素数:")
    for bits in [64, 128, 256]:
        start = time.time()
        p = generate_prime(bits)
        gen_time = time.time() - start
        print(f"  {bits}位素数: 生成时间={gen_time*1000:.2f}ms")
        print(f"    {p}")
    
    # 错误概率分析
    print(f"\n错误概率分析:")
    print(f"  10轮测试: P(error) <= 4^(-10) = {4**(-10):.10f}")
    print(f"  20轮测试: P(error) <= 4^(-20) = {4**(-20):.20f}")
 
 
if __name__ == "__main__":
    test_miller_rabin()

4. Karger算法:随机化最小割

4.1 最小割问题

最小割(Minimum Cut):在图中找一组边,删除后图变得不连通,且这组边的总权重最小。

确定性算法:Stoer-Wagner算法,O(VE log V)。

随机化算法:Karger算法更简单、更快,虽然结果可能不是最优,但可以通过重复运行来提高正确率。

4.2 Karger算法原理

Karger算法的核心操作是边收缩(Edge Contraction)

1. 随机选择一条边(u, v)
2. 将u和v合并成一个新顶点
3. 删除自环
4. 重复直到只剩2个顶点
5. 这2个顶点之间的边就是割

为什么能工作?

如果最小割有k条边,每条边被选中的概率是1/C(C-1),其中C是顶点数。在收缩过程中,最小割被破坏的概率随着C减小而增加。

重复运行:运行Karger算法N次,选择最小的割。正确率可以通过N来控制。

4.3 Python实现Karger算法

import random
from typing import Set, Tuple, Dict, List
import networkx as nx
 
class KargerMinCut:
    """Karger随机化最小割算法"""
    
    def __init__(self, graph: nx.Graph):
        self.original_graph = graph
        self.n = graph.number_of_nodes()
    
    def contract_edge(self, graph: nx.Graph) -> nx.Graph:
        """随机收缩一条边"""
        edges = list(graph.edges())
        if len(edges) == 0:
            return graph
        
        # 随机选择一条边
        u, v = random.choice(edges)
        
        # 创建新图
        new_graph = nx.Graph()
        
        # 映射旧顶点到新顶点
        contraction = {v: u}  # v被合并到u
        
        # 添加所有顶点和边
        for node in graph.nodes():
            if node == v:
                continue
            new_node = contraction.get(node, node)
            
            for neighbor in graph.neighbors(node):
                if neighbor == v:
                    continue
                new_neighbor = contraction.get(neighbor, neighbor)
                
                if new_node != new_neighbor:  # 跳过自环
                    if new_graph.has_edge(new_node, new_neighbor):
                        new_graph[new_node][new_neighbor]['weight'] += graph[node][neighbor].get('weight', 1)
                    else:
                        new_graph.add_edge(new_node, new_neighbor, 
                                         weight=graph[node][neighbor].get('weight', 1))
        
        return new_graph
    
    def min_cut_once(self) -> Tuple[int, Set[Tuple[int, int]]]:
        """运行一次Karger算法"""
        G = self.original_graph.copy()
        
        while G.number_of_nodes() > 2:
            G = self.contract_edge(G)
        
        # 最后两个顶点之间的边就是割
        remaining_nodes = list(G.nodes())
        u, v = remaining_nodes[0], remaining_nodes[1]
        
        cut_edges = set()
        for edge in G.edges():
            cut_edges.add(tuple(sorted(edge)))
        
        cut_weight = sum(G[u][v].get('weight', 1) for _ in G.neighbors(v) if u in G.neighbors(v))
        
        return cut_weight, cut_edges
    
    def min_cut(self, trials: int = None) -> Tuple[int, Set[Tuple[int, int]]]:
        """
        多次运行Karger算法
        如果不指定trials,使用理论最优值: n*n/2
        """
        if trials is None:
            trials = self.n * self.n // 2
        
        best_cut = float('inf')
        best_edges = set()
        
        for _ in range(trials):
            cut_weight, cut_edges = self.min_cut_once()
            if cut_weight < best_cut:
                best_cut = cut_weight
                best_edges = cut_edges
        
        return best_cut, best_edges
 
 
def test_karger():
    """测试Karger算法"""
    print("=" * 60)
    print("Karger随机化最小割算法")
    print("=" * 60)
    
    # 创建测试图
    G = nx.Graph()
    G.add_edge(1, 2, weight=1)
    G.add_edge(1, 3, weight=1)
    G.add_edge(1, 4, weight=1)
    G.add_edge(2, 3, weight=1)
    G.add_edge(3, 4, weight=1)
    G.add_edge(4, 5, weight=1)
    G.add_edge(4, 6, weight=1)
    G.add_edge(5, 6, weight=1)
    
    print("\n图结构:")
    print(f"顶点数: {G.number_of_nodes()}")
    print(f"边数: {G.number_of_edges()}")
    
    # 使用Karger算法
    karger = KargerMinCut(G)
    
    # 单次运行
    cut_weight, cut_edges = karger.min_cut_once()
    print(f"\n单次Karger算法:")
    print(f"  最小割权重: {cut_weight}")
    print(f"  割边: {cut_edges}")
    
    # 多次运行
    cut_weight, cut_edges = karger.min_cut(trials=100)
    print(f"\n100次运行取最优:")
    print(f"  最小割权重: {cut_weight}")
    print(f"  割边: {cut_edges}")
    
    # 对比确定性算法
    print("\n--- 对比确定性算法 ---")
    stoer Wagner not available, using networkx's built-in
    if hasattr(nx, 'stoer_wagner'):
        min_cut_weight, partition = nx.stoer_wagner(G)
        print(f"Stoer-Wagner结果: {min_cut_weight}")
    
    # 更复杂的图测试
    print("\n" + "=" * 60)
    print("复杂图测试")
    print("=" * 60)
    
    # Erdős–Rényi随机图
    for n in [20, 50]:
        G_random = nx.gnp_random_graph(n, 0.3, seed=42)
        
        karger = KargerMinCut(G_random)
        cut_weight, _ = karger.min_cut(trials=n * n // 2)
        
        print(f"\nn={n}的随机图:")
        print(f"  Karger最小割: {cut_weight}")
        print(f"  边数: {G_random.number_of_edges()}")
 
 
if __name__ == "__main__":
    test_karger()

5. 蒙特卡洛树搜索:游戏AI的核心

5.1 MCTS原理

蒙特卡洛树搜索(Monte Carlo Tree Search, MCTS)是近年来最重要的随机算法之一,它让计算机在围棋、象棋等复杂博弈游戏中达到了超越人类的水平。

MCTS的核心循环

1. 选择(Selection):从根节点开始,用UCB1选择子节点,直到找到未完全展开的节点
2. 扩展(Expansion):添加一个新的子节点
3. 模拟(Simulation):随机 rollout 直到游戏结束
4. 回溯(Backpropagation):更新路径上所有节点的统计信息

UCB1公式

UCB1(u) = Q(u)/N(u) + C × √(ln(N(parent)) / N(u))

其中Q是平均得分,N是访问次数,C是探索常数。

5.2 Python实现MCTS

import math
import random
from typing import Dict, Optional, Tuple, List
import copy
 
class MCTSNode:
    """MCTS树节点"""
    
    def __init__(self, state, parent=None, action=None):
        self.state = state  # 游戏状态
        self.parent = parent  # 父节点
        self.action = action  # 从父节点到当前节点的动作
        self.children: Dict[any, MCTSNode] = {}  # 子节点
        self.visits = 0  # 访问次数
        self.wins = 0  # 胜利次数
        self.untried_actions = state.get_legal_actions()
    
    def ucb1(self, c=1.414) -> float:
        """UCB1公式"""
        if self.visits == 0:
            return float('inf')
        
        exploitation = self.wins / self.visits
        exploration = c * math.sqrt(math.log(self.parent.visits) / self.visits)
        return exploitation + exploration
    
    def is_fully_expanded(self) -> bool:
        return len(self.untried_actions) == 0
    
    def is_terminal(self) -> bool:
        return self.state.is_game_over()
 
 
class TicTacToe:
    """井字棋游戏"""
    
    def __init__(self, board=None, current_player=1):
        if board is None:
            self.board = [[0, 0, 0] for _ in range(3)]
        else:
            self.board = [row[:] for row in board]
        self.current_player = current_player  # 1=X, -1=O
    
    def clone(self):
        return TicTacToe(self.board, self.current_player)
    
    def get_legal_actions(self) -> List[Tuple[int, int]]:
        return [(i, j) for i in range(3) for j in range(3) 
                if self.board[i][j] == 0]
    
    def apply_action(self, action: Tuple[int, int]):
        i, j = action
        self.board[i][j] = self.current_player
        self.current_player *= -1
    
    def is_game_over(self) -> bool:
        # 检查行
        for i in range(3):
            if self.board[i][0] == self.board[i][1] == self.board[i][2] != 0:
                return True
        # 检查列
        for j in range(3):
            if self.board[0][j] == self.board[1][j] == self.board[2][j] != 0:
                return True
        # 检查对角线
        if self.board[0][0] == self.board[1][1] == self.board[2][2] != 0:
            return True
        if self.board[0][2] == self.board[1][1] == self.board[2][0] != 0:
            return True
        # 检查平局
        if all(self.board[i][j] != 0 for i in range(3) for j in range(3)):
            return True
        return False
    
    def get_winner(self) -> int:
        """返回胜者: 1=X赢, -1=O赢, 0=平局, None=游戏未结束"""
        # 检查行
        for i in range(3):
            if self.board[i][0] == self.board[i][1] == self.board[i][2] != 0:
                return self.board[i][0]
        # 检查列
        for j in range(3):
            if self.board[0][j] == self.board[1][j] == self.board[2][j] != 0:
                return self.board[0][j]
        # 检查对角线
        if self.board[0][0] == self.board[1][1] == self.board[2][2] != 0:
            return self.board[0][0]
        if self.board[0][2] == self.board[1][1] == self.board[2][0] != 0:
            return self.board[0][2]
        # 检查平局
        if all(self.board[i][j] != 0 for i in range(3) for j in range(3)):
            return 0
        return None
    
    def get_result(self, player: int) -> float:
        """返回player视角的结果: 1=赢, 0.5=平, 0=输"""
        winner = self.get_winner()
        if winner is None:
            return 0.5  # 游戏未结束
        if winner == player:
            return 1.0
        elif winner == 0:
            return 0.5
        else:
            return 0.0
    
    def print_board(self):
        symbols = {0: '.', 1: 'X', -1: 'O'}
        for row in self.board:
            print(' '.join(symbols[x] for x in row))
        print()
 
 
class MCTS:
    """蒙特卡洛树搜索"""
    
    def __init__(self, game: TicTacToe, simulations=1000):
        self.game = game
        self.simulations = simulations
        self.root = MCTSNode(game)
    
    def selection(self, node: MCTSNode) -> MCTSNode:
        """选择:UCB1最高的完全展开节点"""
        while not node.is_terminal():
            if node.is_fully_expanded():
                node = self.best_child(node)
            else:
                return node
        return node
    
    def expansion(self, node: MCTSNode) -> MCTSNode:
        """扩展:添加一个新子节点"""
        action = random.choice(node.untried_actions)
        node.untried_actions.remove(action)
        
        new_state = node.state.clone()
        new_state.apply_action(action)
        
        new_node = MCTSNode(new_state, parent=node, action=action)
        node.children[action] = new_node
        return new_node
    
    def simulation(self, node: MCTSNode) -> float:
        """模拟:随机 rollout"""
        state = node.state.clone()
        player = 1  # X
        
        while not state.is_game_over():
            actions = state.get_legal_actions()
            action = random.choice(actions)
            state.apply_action(action)
        
        return state.get_result(player)
    
    def backpropagation(self, node: MCTSNode, result: float):
        """回溯:更新统计信息"""
        while node is not None:
            node.visits += 1
            node.wins += result
            node = node.parent
    
    def run(self) -> Tuple[int, int]:
        """运行MCTS,返回最佳动作"""
        for _ in range(self.simulations):
            # 选择
            node = self.selection(self.root)
            
            # 扩展
            if not node.is_terminal() and node.untried_actions:
                node = self.expansion(node)
            
            # 模拟
            result = self.simulation(node)
            
            # 回溯
            self.backpropagation(node, result)
        
        # 选择访问次数最多的子节点
        best_action = max(self.root.children.keys(),
                         key=lambda a: self.root.children[a].visits)
        return best_action
    
    def update_root(self, action: Tuple[int, int]):
        """更新根节点(对手走后)"""
        if action in self.root.children:
            self.root = self.root.children[action]
            self.root.parent = None
        else:
            # 对方走了我们没有探索的动作,重置
            self.game.apply_action(action)
            self.root = MCTSNode(self.game)
 
 
def demo_mcts_tictactoe():
    """MCTS井字棋演示"""
    print("=" * 60)
    print("MCTS井字棋 AI vs 随机对手")
    print("=" * 60)
    
    game = TicTacToe()
    mcts = MCTS(game, simulations=1000)
    
    print("初始棋盘:")
    game.print_board()
    
    # AI先手
    ai_player = 1
    
    for turn in range(9):
        if game.current_player == ai_player:
            # AI的回合
            action = mcts.run()
            game.apply_action(action)
            mcts.update_root(action)
            print(f"AI (X) 下在 ({action[0]}, {action[1]})")
        else:
            # 随机对手
            actions = game.get_legal_actions()
            action = random.choice(actions)
            game.apply_action(action)
            mcts.update_root(action)
            print(f"对手 (O) 下在 ({action[0]}, {action[1]})")
        
        game.print_board()
        
        if game.is_game_over():
            winner = game.get_winner()
            if winner == ai_player:
                print("AI获胜!")
            elif winner == 0:
                print("平局!")
            else:
                print("对手获胜!")
            break
 
 
def mcts_vs_mcts(n_games=100):
    """MCTS AI vs MCTS AI"""
    print("\n" + "=" * 60)
    print(f"MCTS vs MCTS: {n_games}局对弈统计")
    print("=" * 60)
    
    results = {'X_wins': 0, 'O_wins': 0, 'draws': 0}
    
    for game_num in range(n_games):
        game = TicTacToe()
        mcts = MCTS(game, simulations=500)
        
        while not game.is_game_over():
            action = mcts.run()
            game.apply_action(action)
            mcts.update_root(action)
        
        winner = game.get_winner()
        if winner == 1:
            results['X_wins'] += 1
        elif winner == -1:
            results['O_wins'] += 1
        else:
            results['draws'] += 1
    
    print(f"X (先手) 胜: {results['X_wins']}")
    print(f"O (后手) 胜: {results['O_wins']}")
    print(f"平局: {results['draws']}")
 
 
if __name__ == "__main__":
    demo_mcts_tictactoe()
    mcts_vs_mcts(100)

6. 随机化负载均衡:Power of Two Choices

6.1 负载均衡问题

问题:有n个服务器,m个请求。如何分配请求使得负载均衡?

简单哈希:hash(request) % n。可能导致负载不均(哈希碰撞)。

随机选择:随机选两个服务器,选择负载较小的那个。

这就是”Power of Two Choices”(两种选择的威力)!

6.2 为什么两种选择就够了?

数学上可以证明:使用两种选择,最大负载从O(log n / log log n)降到O(log log n)。

这意味着:即使只有两种选择,负载均衡也好得多。

6.3 Python实现

import random
from collections import defaultdict
import matplotlib.pyplot as plt
 
class LoadBalancer:
    """随机化负载均衡器"""
    
    def __init__(self, n_servers: int):
        self.n_servers = n_servers
        self.loads = [0] * n_servers
        self.assignments = []
    
    def random_assignment(self, request_id: int) -> int:
        """简单随机分配"""
        server = random.randint(0, self.n_servers - 1)
        self.loads[server] += 1
        self.assignments.append(server)
        return server
    
    def two_choices(self, request_id: int) -> int:
        """两种选择"""
        # 随机选两个服务器
        s1 = random.randint(0, self.n_servers - 1)
        s2 = random.randint(0, self.n_servers - 1)
        
        # 选择负载较小的
        if self.loads[s1] <= self.loads[s2]:
            chosen = s1
        else:
            chosen = s2
        
        self.loads[chosen] += 1
        self.assignments.append(chosen)
        return chosen
    
    def get_max_load(self) -> int:
        return max(self.loads)
    
    def get_load_variance(self) -> float:
        mean = sum(self.loads) / self.n_servers
        return sum((l - mean) ** 2 for l in self.loads) / self.n_servers
    
    def reset(self):
        self.loads = [0] * self.n_servers
        self.assignments = []
 
 
def test_load_balancing():
    """测试负载均衡算法"""
    print("=" * 60)
    print("负载均衡算法测试")
    print("=" * 60)
    
    n_servers = 100
    n_requests = 10000
    
    # 测试简单随机
    lb1 = LoadBalancer(n_servers)
    for i in range(n_requests):
        lb1.random_assignment(i)
    
    print(f"\n简单随机分配 (n={n_requests}请求, {n_servers}服务器):")
    print(f"  最大负载: {lb1.get_max_load()}")
    print(f"  负载方差: {lb1.get_load_variance():.2f}")
    print(f"  理想负载: {n_requests / n_servers:.2f}")
    
    # 测试两种选择
    lb2 = LoadBalancer(n_servers)
    for i in range(n_requests):
        lb2.two_choices(i)
    
    print(f"\n两种选择 (n={n_requests}请求, {n_servers}服务器):")
    print(f"  最大负载: {lb2.get_max_load()}")
    print(f"  负载方差: {lb2.get_load_variance():.2f}")
    print(f"  理想负载: {n_requests / n_servers:.2f}")
    
    # 多次实验统计
    print(f"\n" + "=" * 60)
    print("大规模统计 (100次实验)")
    print("=" * 60)
    
    n_experiments = 100
    n_servers_list = [10, 50, 100, 500]
    n_requests = 10000
    
    results = {'random': [], 'two_choices': []}
    
    for _ in range(n_experiments):
        lb1 = LoadBalancer(n_servers)
        for i in range(n_requests):
            lb1.random_assignment(i)
        results['random'].append(lb1.get_max_load())
        
        lb2 = LoadBalancer(n_servers)
        for i in range(n_requests):
            lb2.two_choices(i)
        results['two_choices'].append(lb2.get_max_load())
    
    for n_servers in n_servers_list:
        lb1 = LoadBalancer(n_servers)
        lb2 = LoadBalancer(n_servers)
        
        for i in range(n_requests):
            lb1.random_assignment(i)
            lb2.two_choices(i)
        
        print(f"\nn={n_servers}服务器, {n_requests}请求:")
        print(f"  随机分配 - 最大负载: {lb1.get_max_load():.2f}")
        print(f"  两种选择 - 最大负载: {lb2.get_max_load():.2f}")
        print(f"  理论最大负载: {n_requests / n_servers:.2f}")
 
 
def visualize_load_distribution():
    """可视化负载分布"""
    n_servers = 50
    n_requests = 5000
    
    lb1 = LoadBalancer(n_servers)
    lb2 = LoadBalancer(n_servers)
    
    for i in range(n_requests):
        lb1.random_assignment(i)
        lb2.two_choices(i)
    
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    
    axes[0].bar(range(n_servers), lb1.loads, color='steelblue')
    axes[0].axhline(y=n_requests/n_servers, color='r', linestyle='--', 
                   label=f'理想负载={n_requests/n_servers:.1f}')
    axes[0].set_xlabel('服务器编号')
    axes[0].set_ylabel('负载')
    axes[0].set_title(f'简单随机分配 (max={lb1.get_max_load()})')
    axes[0].legend()
    
    axes[1].bar(range(n_servers), lb2.loads, color='darkorange')
    axes[1].axhline(y=n_requests/n_servers, color='r', linestyle='--',
                   label=f'理想负载={n_requests/n_servers:.1f}')
    axes[1].set_xlabel('服务器编号')
    axes[1].set_ylabel('负载')
    axes[1].set_title(f'两种选择 (max={lb2.get_max_load()})')
    axes[1].legend()
    
    plt.tight_layout()
    plt.savefig('load_balancing.png', dpi=150)
    print("可视化已保存到 load_balancing.png")
 
 
if __name__ == "__main__":
    test_load_balancing()
    visualize_load_distribution()

7. 随机化算法分析

7.1 期望时间分析

随机化算法的”快”通常指的是期望运行时间

定义:设T(x)是算法在输入x上的运行时间,则期望运行时间是E[T(x)],其中期望是关于算法内部随机选择的。

分析方法

  • 线性期望可加
  • 全概率公式
  • 指示器随机变量

7.2 概率保证

随机化算法通常给出概率保证

  • “以概率≥1-1/n成功”
  • “期望O(n log n),且以≥1-1/n²的概率至少这么快”

例子:快速排序的期望比较次数是O(n log n)。但实际上,运气不好时可能达到O(n²)。我们可以证明:以概率≥1-1/n²,运行时间不超过某个界限。

7.3 尾部不等式

Chernoff界:对于独立随机变量的和,尾部概率呈指数下降。

设X = X₁ + X₂ + … + Xₙ,每个Xᵢ是[0,1]的独立随机变量,E[Xᵢ] = pᵢ。

则对于δ > 0:

  • P[X > (1+δ)μ] ≤ exp(-δ²μ/3)
  • P[X < (1-δ)μ] ≤ exp(-δ²μ/2)

应用:如果负载均衡中每个请求独立随机分配,则最大负载以很高概率接近平均值。


8. 高级应用:MCTS实现2048游戏AI

class Game2048:
    """2048游戏"""
    
    def __init__(self, board=None):
        if board is None:
            self.board = [[0]*4 for _ in range(4)]
            self.add_random_tile()
            self.add_random_tile()
        else:
            self.board = [row[:] for row in board]
    
    def clone(self):
        return Game2048(self.board)
    
    def get_legal_actions(self):
        """返回可执行的动作: 'up', 'down', 'left', 'right'"""
        actions = []
        for direction in ['up', 'down', 'left', 'right']:
            if self.can_move(direction):
                actions.append(direction)
        return actions
    
    def can_move(self, direction):
        """检查是否可以移动"""
        board = self.board
        
        if direction == 'up':
            for j in range(4):
                for i in range(1, 4):
                    if board[i][j] == 0 or board[i][j] == board[i-1][j]:
                        if board[i][j] != 0:
                            return True
                        # 上面有空格
                        for k in range(i-1, -1, -1):
                            if board[k][j] != 0:
                                break
                            if k == 0:
                                return True
            return False
        
        # ... 其他方向的逻辑类似
        return False
    
    def move(self, direction):
        """执行移动,返回是否成功"""
        board = self.board
        moved = False
        
        if direction == 'left':
            for i in range(4):
                row = [x for x in board[i] if x != 0]
                new_row = []
                j = 0
                while j < len(row):
                    if j + 1 < len(row) and row[j] == row[j+1]:
                        new_row.append(row[j] * 2)
                        j += 2
                    else:
                        new_row.append(row[j])
                        j += 1
                new_row += [0] * (4 - len(new_row))
                if new_row != board[i]:
                    moved = True
                board[i] = new_row
        
        # ... 其他方向类似
        
        if moved:
            self.add_random_tile()
        
        return moved
    
    def add_random_tile(self):
        """添加随机瓦片(90%是2,10%是4)"""
        empty = [(i, j) for i in range(4) for j in range(4) if self.board[i][j] == 0]
        if empty:
            i, j = random.choice(empty)
            self.board[i][j] = 4 if random.random() < 0.1 else 2
    
    def is_game_over(self):
        """检查游戏是否结束"""
        # 还有空格
        for row in self.board:
            if 0 in row:
                return False
        # 可以合并
        for i in range(4):
            for j in range(4):
                if i < 3 and self.board[i][j] == self.board[i+1][j]:
                    return False
                if j < 3 and self.board[i][j] == self.board[i][j+1]:
                    return False
        return True
    
    def get_score(self):
        return sum(sum(row) for row in self.board)
    
    def get_max_tile(self):
        return max(max(row) for row in self.board)
 
 
class MCTS2048:
    """2048游戏的MCTS AI"""
    
    def __init__(self, game: Game2048, simulations=100):
        self.game = game
        self.simulations = simulations
    
    def selection(self, node):
        """UCB1选择"""
        while not node.is_terminal():
            if node.is_fully_expanded():
                node = node.best_child()
            else:
                return node
        return node
    
    def expansion(self, node):
        """扩展"""
        action = random.choice(node.untried_actions)
        node.untried_actions.remove(action)
        
        new_game = node.state.clone()
        new_game.move(action)
        
        new_node = GameNode(new_game, parent=node, action=action)
        node.children[action] = new_node
        return new_node
    
    def simulation(self, node):
        """模拟"""
        game = node.state.clone()
        
        while not game.is_game_over():
            actions = game.get_legal_actions()
            if not actions:
                break
            action = random.choice(actions)
            game.move(action)
        
        return game.get_score()
    
    def backpropagation(self, node, score):
        """回溯"""
        while node:
            node.visits += 1
            node.wins += score
            node = node.parent
    
    def run(self):
        """运行MCTS并返回最佳动作"""
        root = GameNode(self.game)
        
        for _ in range(self.simulations):
            node = self.selection(root)
            
            if not node.is_terminal() and node.untried_actions:
                node = self.expansion(node)
            
            score = self.simulation(node)
            self.backpropagation(node, score)
        
        # 选择访问最多的动作
        if not root.children:
            return None
        
        best_action = max(root.children.keys(),
                         key=lambda a: root.children[a].visits)
        return best_action
 
 
class GameNode:
    """2048 MCTS节点"""
    
    def __init__(self, state, parent=None, action=None):
        self.state = state
        self.parent = parent
        self.action = action
        self.children = {}
        self.visits = 0
        self.wins = 0
        self.untried_actions = state.get_legal_actions()
    
    def is_terminal(self):
        return self.state.is_game_over()
    
    def is_fully_expanded(self):
        return len(self.untried_actions) == 0
    
    def best_child(self, c=1.414):
        return max(self.children.values(),
                  key=lambda n: n.wins/n.visits + c*math.sqrt(math.log(self.visits)/n.visits))
 
 
def demo_2048():
    """2048 AI演示"""
    print("=" * 60)
    print("2048游戏 MCTS AI")
    print("=" * 60)
    
    game = Game2048()
    ai = MCTS2048(game, simulations=200)
    
    print("初始状态:")
    for row in game.board:
        print(' '.join(f'{x:5}' for x in row))
    
    moves = 0
    while not game.is_game_over():
        action = ai.run()
        if action is None:
            break
        
        game.move(action)
        moves += 1
        
        if moves % 5 == 0:
            print(f"\n{moves}步后:")
            for row in game.board:
                print(' '.join(f'{x:5}' for x in row))
    
    print(f"\n游戏结束! 分数: {game.get_score()}, 最大瓦片: {game.get_max_tile()}, 移动次数: {moves}")
 
 
if __name__ == "__main__":
    demo_2048()

9. 总结:随机算法的工程实践

9.1 何时使用随机算法?

场景推荐算法
避免最坏情况随机化快排、快速选择
简化算法设计Karger最小割
无法精确求解蒙特卡洛积分
游戏AIMCTS
密码学Miller-Rabin
负载均衡Power of Two Choices

9.2 随机算法的优缺点

优点

  • 实现简单
  • 期望性能好
  • 避免对抗输入
  • 适合并行化

缺点

  • 结果不确定
  • 难以调试
  • 需要好的随机数生成器
  • 某些场景不可接受错误

9.3 实用技巧

  1. 种子固定:调试时固定随机种子,结果可复现
  2. 多次运行:对Monte Carlo算法,多次运行取最优
  3. 概率验证:输出结果时附带置信区间
  4. 混合策略:随机算法 + 确定性算法
# 调试时固定种子
random.seed(42)
 
# 多次运行取最优
best_result = None
best_score = float('-inf')
for _ in range(10):
    result = monte_carlo_algorithm(...)
    if score(result) > best_score:
        best_score = score(result)
        best_result = result

参考来源

  1. Motwani, R., & Raghavan, P. (1995). Randomized Algorithms. Cambridge University Press.
  2. Karger, D. R. (1994). Random sampling in cut, flow, and network design problems. STOC ‘94.
  3. Browne, C. B., et al. (2012). A survey of Monte Carlo tree search methods. IEEE Transactions on Computational Intelligence and AI in Games.
  4. Karp, R. M., & Rabin, M. O. (1987). Efficient randomized pattern-matching algorithms. IBM Journal of Research and Development.

相关文档