随机算法深度

关键词速览

关键词解释
Las Vegas必定正确但运行时间随机
Monte Carlo运行时间确定但可能出错
Karger算法随机化最小割算法
随机化快速排序期望O(n log n)排序
概率分析期望性能的数学分析
PCP定理概率可检验证明
随机游走随机过程的图论应用
2-SAT随机游走求解
随机化在ML随机梯度下降等
Chernoff界独立和的尾界
Hoeffding界有界变量的浓度不等式
马尔可夫链随机过程的数学模型
布雷斯悖论网络中的反直觉现象
布隆过滤器空间高效的概率数据结构
跳表随机化平衡搜索树
概率方法用期望证明存在性
随机化负载均衡随机分配策略
去随机化消除随机性的技术
Yao原理期望竞争比下界
概率剪枝随机化搜索优化

摘要

随机算法利用随机性来简化算法设计、提高效率或绕过问题的固有难度。本文系统阐述随机算法的两大范式——Las Vegas算法(保证正确性,运行时间随机)和Monte Carlo算法(确定时间,可能出错),深入分析随机快速排序、Karger最小割算法、随机游走求解2-SAT等经典技术,并探讨PCP定理与计算难度的联系。随机算法不仅在理论计算机科学中占据核心地位,更是现代机器学习、密码学和分布式系统的基石——从随机梯度下降到差分隐私,从蒙特卡洛树搜索到量子计算中的随机算法,处处可见随机性的身影。

本文将从以下维度展开:首先建立随机算法的数学基础,包括概率论核心工具(Chernoff界、Hoeffding界等);然后系统梳理Las Vegas与Monte Carlo两大范式的经典案例;接着深入探讨概率方法(probabilistic method)的存在性证明技术;随后分析随机化在现代机器学习中的核心应用;再讨论去随机化(derandomization)技术;最后探索前沿方向,包括随机算法的电路复杂性、密码学安全性和量子随机性。


1. 随机算法的两种范式

1.1 Las Vegas算法

Las Vegas算法的核心特征:

  • 保证正确性:输出必定正确
  • 随机运行时间:运行时间依赖随机选择
  • 典型应用:快速排序、随机化选择

形式化定义

Las Vegas的哲学

Las Vegas算法体现了一种”宁缺毋滥”的思想:宁可花费更多时间,也要保证结果的正确性。这在需要精确解的场景中尤为重要。

典型的Las Vegas算法

  1. 随机化快速排序:运行时间随机,正确性确定
  2. 随机化选择算法(QuickSelect):期望线性时间找到第k小元素
  3. 随机化字符串匹配:精确匹配带通配符的模式
  4. 随机化素数测试(Miller-Rabin):虽然有概率错误,但可以迭代消除错误

1.2 Monte Carlo算法

Monte Carlo算法的特征:

  • 确定运行时间:时间不依赖随机性
  • 可能出错:存在失败概率
  • 可调精度:通过重复提高成功率

形式化定义

两类错误

  • 单边错误:只可能返回错误答案(false positive/negative)
  • 双边错误:可能返回任意错误

Monte Carlo的策略

Monte Carlo算法的核心思想是”用时间换确定性”。通过独立重复运行,可以将错误概率指数级降低:

def amplify_monte_carlo(algorithm, instance, epsilon, delta):
    """
    错误概率放大
    
    若原算法错误概率 ≤ 1/3,则重复 log(1/delta) 次取多数投票
    新错误概率 ≤ delta
    """
    t = int(math.log(1/delta) / math.log(3))  # 基于 1/3 的基础错误率
    
    votes = []
    for _ in range(t):
        votes.append(algorithm(instance))
    
    # 多数投票
    return majority_vote(votes)

两类算法的对比

特征Las VegasMonte Carlo
正确性100%可能失败
运行时间随机确定
应用场景排序、搜索、精确匹配优化、近似、采样
错误控制N/A重复采样
典型代表QuickSortKarger MinCut

Note

许多NP完全问题存在指数时间的Monte Carlo算法(如素数测试),但在确定型模型下需要指数时间。Miller-Rabin素数测试在 时间内以 的单次错误率判断一个数是否为素数;重复 次可将错误率降至

1.3 随机算法的历史与里程碑

随机算法的发展历程是一部理论与实践相互驱动的历史:

1940s-1960s:萌芽期

  • Metropolis算法(1949):统计物理中的蒙特卡洛方法
  • 蒙特卡洛树搜索概念的雏形
  • 随机投点法计算积分

1970s-1980s:理论奠基

  • 1973:Rabin提出随机化字符串匹配
  • 1977:Solovay-Strassen素数测试
  • 1980:Freivalds提出随机矩阵乘法验证
  • 1983:随机化快速排序的期望分析成熟

1990s:PCP革命

  • 1990:PCP定理的雏形
  • 1992:Karger算法的提出
  • 1995:Goemans-Williamson的半定规划近似
  • 1997:差分隐私概念的萌芽

2000s-现在:深度应用

  • 随机梯度下降成为机器学习核心
  • 随机化数值线性代数(RandNLA)
  • 张量方法的随机算法
  • 量子随机性(叠加态测量)

2. 随机化快速排序分析

2.1 算法描述

随机化快速排序通过随机选择pivot打破最坏情况:

import random
 
def randomized_quicksort(A, lo, hi):
    """
    随机化快速排序
    
    期望时间: O(n log n)
    空间复杂度: O(log n) 递归栈 + O(1) 原地分区
    """
    if lo < hi:
        # 随机选择pivot
        pivot_idx = random.randint(lo, hi)
        A[lo], A[pivot_idx] = A[pivot_idx], A[lo]
        
        # 分区
        p = partition(A, lo, hi)
        
        # 递归排序
        randomized_quicksort(A, lo, p - 1)
        randomized_quicksort(A, p + 1, hi)
 
def partition(A, lo, hi):
    """
    Lomuto分区方案
    
    假设A[hi]是pivot
    """
    pivot = A[hi]
    i = lo - 1
    
    for j in range(lo, hi):
        if A[j] <= pivot:
            i += 1
            A[i], A[j] = A[j], A[i]
    
    A[i + 1], A[hi] = A[hi], A[i + 1]
    return i + 1

2.2 期望运行时间分析

定理:随机化快速排序的期望运行时间为

关键引理:对于大小为 的数组,选择第 大元素的概率为

递归方程: 设 为期望运行时间,

其中 是pivot的排名,均匀分布在

证明:观察到左右子问题对称:

使用归纳法可证明 (取合适常数 )。

详细推导

为归纳假设。对于

其中 是某个正常数。

由归纳假设:

使用积分近似

,则

Important

确定型快速排序最坏情况 ,但随机化后期望始终为 ,消除了对输入分布的依赖。这是随机化”消灭最坏情况”的经典范例。

2.3 随机选择算法(QuickSelect)

QuickSelect是快速排序的”表亲”,用于找到第k小的元素:

def randomized_select(A, lo, hi, k):
    """
    随机化选择算法
    
    期望时间: O(n)
    最坏时间: O(n^2),但期望避免
    
    k是相对于lo的偏移量,0-indexed
    """
    if lo == hi:
        return A[lo]
    
    # 随机选择pivot
    pivot_idx = random.randint(lo, hi)
    A[lo], A[pivot_idx] = A[pivot_idx], A[lo]
    pivot_idx = partition(A, lo, hi)
    
    # pivot的最终位置
    p = pivot_idx - lo
    
    if k == p:
        return A[pivot_idx]
    elif k < p:
        return randomized_select(A, lo, pivot_idx - 1, k)
    else:
        return randomized_select(A, pivot_idx + 1, hi, k - p - 1)

期望线性时间的证明

是期望时间。与快速排序类似:

但这里的关键观察是:期望上pivot会将数组”砍半”。使用指示随机变量分析可以证明:

通过积分或递归展开,可得

2.4 随机化快速排序的变种

三路划分快速排序(Dutch National Flag)

处理大量重复元素时的优化:

def quicksort_three_way(A, lo, hi):
    """
    三路划分快速排序
    
    优势:处理重复元素时避免不必要的递归
    时间:O(n log n) 对任意输入(而不只是随机输入)
    """
    if lo >= hi:
        return
    
    # 随机选择pivot
    pivot_idx = random.randint(lo, hi)
    pivot = A[pivot_idx]
    
    # 三路划分
    lt, gt, i = lo, hi, lo
    
    while i <= gt:
        if A[i] < pivot:
            A[lo], A[i] = A[i], A[lo]
            lo += 1
            i += 1
        elif A[i] > pivot:
            A[i], A[gt] = A[gt], A[i]
            gt -= 1
        else:
            i += 1
    
    # 递归排序小于和大于pivot的两部分
    quicksort_three_way(A, lo, lt - 1)
    quicksort_three_way(A, gt + 1, hi)

** introsort**:当递归深度过大时切换到堆排序,保证 的最坏情况。


3. Karger算法:随机化最小割

3.1 问题定义

Karger算法用于求解最小割问题:

  • 输入:无向图
  • 输出:最小容量的边割集

应用场景

  • 网络可靠性分析
  • 图像分割
  • 社区检测
  • 流网络的最大流计算

3.2 边的随机收缩

def karger_min_cut(G):
    """
    Karger算法的递归实现
    
    核心操作:随机选择一条边,将其两端收缩为一个超节点
    
    期望运行时间: O(n^2)
    成功概率: 2/(n(n-1))
    """
    n = len(G.vertices)
    
    # 终止条件:只剩2个顶点
    if n <= 2:
        return G.cut_value()
    
    # 随机选择一条边
    edge = random.choice(G.edges())
    u, v = edge
    
    # 收缩边(u, v) -> 合并为超节点 w
    G.contract(u, v)
    
    # 递归
    return karger_min_cut(G)

收缩操作详解

收缩边 (u, v):
1. 创建一个新顶点 w
2. 将 u 和 v 的所有邻居连接到 w
3. 删除 w 的自环
4. 保留所有平行边(只保留一条)
5. 删除 u 和 v

收缩后,边割对应原始图的一个割:被收缩的顶点之间没有边,因此任何跨超节点的边割对应原始图的一个边割。

3.3 成功概率分析

核心引理:若最小割大小为 ,则每次随机收缩保留最小割的概率至少为

证明:设图有 个顶点,最小割有 条边。

引理:最小割的 条边至少与 个顶点关联(因为每条边有2个端点)。

第1次收缩分析

  • 总边数 (握手定理的推论)
  • 选到最小割中边的概率

因此,第1次收缩不割断最小割的概率

归纳分析: 收缩后,设剩余顶点数为 。最小割在子图中仍然有 条边(被收缩边可能减少一些,但不减少最小割的相对大小)。

次收缩(剩余 个顶点)不割断最小割的概率为:

整个递归过程不割断最小割的概率:

3.4 重复提升成功率

单次成功率 极低。

重复策略:运行 次取最小值。

失败概率

总运行时间:(因为每次迭代 )。

def karger_min_cut_repeated(G, failure_prob=1e-6):
    """
    重复Karger算法直到失败概率足够低
    
    时间复杂度: O(n^2 log n)
    失败概率: ≤ failure_prob
    """
    n = len(G.vertices)
    
    # 计算需要的重复次数
    N = int(math.log(failure_prob) / math.log(1 - 2/(n*(n-1))))
    N = max(N, 1)
    
    best_cut = float('inf')
    
    for _ in range(N):
        # 创建图的深拷贝
        G_copy = G.copy()
        cut = karger_min_cut(G_copy)
        best_cut = min(best_cut, cut)
    
    return best_cut

渐进最优性:当 很大时, 次重复给出多项式时间算法。

3.5 Karger-Stein算法:自适应改进

Karger和Stein在1996年提出了自适应版本,在递归后期增加重复次数:

def karger_stein_min_cut(G):
    """
    Karger-Stein算法
    
    改进点:当顶点数减少时,增加重复次数
    
    时间复杂度: O(n^2 log n) 更小常数
    成功概率: > 1/n^2
    """
    n = len(G.vertices)
    
    if n <= 6:
        # 小规模时穷举
        return brute_force_min_cut(G)
    
    # 确定收缩次数
    t = n // int(math.sqrt(2)) + 1
    
    # 收缩到 t 个顶点
    G1 = karger_contract(G, t)
    G2 = karger_contract(G, t)
    
    # 递归
    return min(karger_stein_min_cut(G1), 
               karger_stein_min_cut(G2))
 
def karger_contract(G, target_n):
    """
    将图收缩到 target_n 个顶点
    """
    while len(G.vertices) > target_n:
        edge = random.choice(G.edges())
        G.contract(edge[0], edge[1])
    
    return G

4. 随机游走与2-SAT

4.1 2-SAT问题

2-SAT是布尔可满足性的特例:每个子句恰好包含2个文字。

问题形式化

判断是否存在赋值使 为真。

为什么研究2-SAT

  1. 易求解:存在多项式时间算法(强于一般的3-SAT)
  2. 实用价值:规划、配置、约束满足问题常建模为2-SAT
  3. 理论意义:随机游走算法揭示了问题的”局部结构”

2-SAT的图论视角

每个变量 有两个文字:

每个子句 等价于两个蕴含:

构建蕴含图 ,如果同一变量的两个文字在同一强连通分量中,则公式不可满足。

4.2 随机游走算法

def random_walk_2sat(phi, n, max_steps=None):
    """
    Papadimitriou的随机游走算法求解2-SAT
    
    期望运行时间: O(m^2),其中 m 是子句数
    正确性: 若公式可满足,则以高概率返回解;若不可满足,则返回None
    
    核心思想:
    1. 从随机赋值开始
    2. 找到未满足的子句
    3. 随机翻转其中一个变量的值
    4. 重复直到找到满足赋值或超时
    """
    import random
    
    if max_steps is None:
        max_steps = 2 * n * n  # Papadimitriou的界限
    
    assignment = {i: random.choice([True, False]) for i in range(n)}
    
    for step in range(max_steps):
        # 检查是否满足
        unsatisfied = find_unsatisfied_clauses(phi, assignment)
        
        if not unsatisfied:
            return assignment  # 找到满足赋值
        
        # 随机选择一个未满足的子句
        clause = random.choice(unsatisfied)
        
        # 随机翻转其中一个变量
        var = random.choice(clause)
        assignment[abs(var)] = not assignment[abs(var)]
    
    return None  # 可能不可满足

关键函数实现

def find_unsatisfied_clauses(phi, assignment):
    """
    找到所有未满足的子句
    
    phi: [(literal1, literal2), ...]
    literal: 正整数表示变量x_i, 负整数表示¬x_|i|
    """
    unsatisfied = []
    
    for lit1, lit2 in phi:
        val1 = assignment[abs(lit1)] if lit1 > 0 else not assignment[abs(lit1)]
        val2 = assignment[abs(lit2)] if lit2 > 0 else not assignment[abs(lit2)]
        
        if not (val1 or val2):
            unsatisfied.append((lit1, lit2))
    
    return unsatisfied

4.3 期望运行时间分析

Papadimitriou定理:若公式可满足,则随机游走在 期望步数内找到解。

证明思路

考虑可满足公式 和一个满足赋值

定义当前赋值与 的”距离” = 满足 但不满足当前赋值的变量数。

关键引理:如果当前赋值不是全满足的,则存在某个未满足子句 。该子句中,至少有一个文字在 中为真。

随机游走的”漂移”分析

  • 若翻转在 中为真的变量(概率 ),距离 减少
  • 若翻转在 中为假的变量(概率 ),距离 增加

使用超调和函数的性质,可以证明这个马尔可夫链的混合时间。

Example

考虑简单的2-SAT实例 。随机游走从任意赋值开始,期望在多项式步数内找到满足赋值或证明不可满足。

4.4 随机游走的变种

加权随机游走:根据变量重要性加权选择翻转概率。

restart策略:每 步后重启,增加找到解的概率。

def random_walk_2sat_with_restarts(phi, n, num_restarts=10):
    """
    带重启的随机游走
    
    每次重启清空历史,利用"fresh start"的机会
    """
    for _ in range(num_restarts):
        result = random_walk_2sat(phi, n)
        if result is not None:
            return result
    
    return None

5. PCP定理与近似难度的联系

5.1 PCP的定义

PCP定理(概率可检验证明)是计算复杂度的里程碑:

解释:NP中的问题存在多项式时间可验证的证明,但验证者只读取 个随机选择的比特。

PCP验证者的行为

验证者(输入x, 随机性r):
    1. 生成查询位置 Q = query_positions(x, r)
    2. 从证明者处读取比特 π[Q]
    3. 运行确定性检验 pred(x, π[Q])
    4. 接受或拒绝

5.2 PCP与近似难度的联系

PCP定理的直接推论:存在优化问题,其最优值难以近似。

Max-3SAT的PCP

  • NP语言可归约为Max-3SAT的某个参数版本
  • 最大化满足的子句数
  • 区分”完全可满足”和”最多满足”是NP难的

PCP重构引理

这意味着即使 近似也是NP难的。

PCP系统的参数

参数含义PCP定理
随机位数验证者使用随机性
查询数验证者只读个证明位
完备性1正确证明必被接受
可靠性错误证明被拒绝概率≥

5.3 PCP在密码学中的应用

零知识证明(Zero-Knowledge Proof)基于PCP框架:

组件作用
证明者生成PCP证明
验证者随机查询少量位置
零知识未查询位置的信息不泄露

简化的交互式零知识协议

class PCPZeroKnowledge:
    """
    简化的PCP零知识证明
    
    证明者想要向验证者证明他知道一个布尔公式的满足赋值
    但不希望泄露赋值本身
    """
    
    def __init__(self, phi, witness):
        self.phi = phi  # 布尔公式
        self.witness = witness  # 满足赋值
    
    def prover_commitment(self):
        """
        证明者对每个变量的两种可能值都提交
        实际上证明者只准备好与witness一致的承诺
        """
        import random
        
        self.commitments = {}
        for var in self.witness:
            # 真正的值
            true_val = self.witness[var]
            # 假值(混淆)
            false_val = not true_val
            
            self.commitments[var] = {
                'true': self._commit(true_val),
                'false': self._commit(false_val)
            }
        
        return self.commitments
    
    def verifier_query(self, variable):
        """
        验证者随机查询一个变量
        证明者揭示对应的承诺
        """
        import random
        
        true_or_false = random.choice(['true', 'false'])
        return self.commitments[variable][true_or_false]
    
    def verifier_check(self, phi, opened_values):
        """
        验证者检查子句是否满足
        """
        # 检查涉及的变量是否与满足赋值一致
        return check_satisfaction(phi, opened_values)

5.4 PCP定理的现代发展

PCP定理的实用化

  • 概率可检验证明系统(PCP)在区块链零知识证明中被实际应用
  • SNARK(Succinct Non-interactive ARguments of Knowledge)基于PCP构造
  • STARK(Scalable Transparent Arguments of Knowledge)是无信任设置的变体

6. Chernoff/Hoeffding不等式

6.1 Chernoff界

Chernoff不等式是分析随机算法的基础工具:

定理(Chernoff界):设 为独立随机变量,。则对任意

常用形式:对

Chernoff界的推导

使用马尔可夫不等式和矩生成函数:

对每个 ,然后优化 得到 Chernoff 界。

def chernoff_bound_example():
    """
    Chernoff界的数值示例
    """
    n = 10000
    p = 0.01
    mu = n * p  # 期望值
    
    print(f"n = {n}, p = {p}, μ = {mu}")
    
    # 计算不同偏差下的概率上界
    for epsilon in [0.1, 0.2, 0.5]:
        upper_bound = 2 * math.exp(-mu * epsilon**2 / 3)
        print(f"P(|X - μ| > {epsilon}μ) ≤ {upper_bound:.6f}")

6.2 Hoeffding不等式

Hoeffding不等式是Chernoff界的连续扩展:

为独立有界随机变量,,则对任意

与Chernoff界的比较

特征ChernoffHoeffding
变量范围
变量方差包含不包含(只用范围)
指数形式依赖 依赖
紧致性更紧稍松

6.3 应用示例:负载均衡

应用 个作业随机分配到 台机器,负载浓度不等式的直接应用。

设每台机器期望负载为 ,则实际负载偏离超过 的概率指数小。

def random_load_balancing_analysis(n, m, epsilon):
    """
    随机分配负载的Chernoff分析
    
    n: 作业数
    m: 机器数
    epsilon: 允许的相对偏差
    
    期望每台机器负载: n/m
    """
    load_per_machine = n / m
    
    # 每台机器的负载是二项分布 B(n, 1/m) 的和
    # 使用Chernoff界
    
    # 单台机器负载超过 (1+epsilon) * n/m 的概率
    delta = epsilon
    prob_single_machine = math.exp(-load_per_machine * delta**2 / 3)
    
    # union bound: 任何机器超载的概率
    prob_any_overload = m * prob_single_machine
    
    return {
        'expected_load': load_per_machine,
        'prob_single_overload': prob_single_machine,
        'prob_any_overload': prob_any_overload
    }

6.4 Bernstein不等式与更紧的界

当方差信息已知时,Bernstein不等式给出更紧的界:

其中 是方差。


7. 随机化在机器学习中的应用

7.1 随机梯度下降(SGD)

随机梯度下降是深度学习的基础优化算法:

import numpy as np
import random
 
def sgd(model, data, lr=0.01, num_epochs=100, batch_size=32):
    """
    随机梯度下降
    
    核心思想:用随机近似的梯度代替真实梯度
    收敛性:对非凸函数收敛到静止点 O(1/sqrt(T))
    """
    n = len(data)
    
    for epoch in range(num_epochs):
        random.shuffle(data)
        
        for i in range(0, n, batch_size):
            batch = data[i:i+batch_size]
            
            # 计算随机梯度(mini-batch)
            grad = compute_gradient(model, batch)
            
            # 更新参数
            model.weights -= lr * grad
            
            # 学习率衰减
            lr *= 0.999
        
        # 每个epoch评估
        if epoch % 10 == 0:
            loss = evaluate_loss(model, data)
            print(f"Epoch {epoch}, Loss: {loss:.4f}")
    
    return model
 
def sgld(model, data, lr=0.01, num_epochs=100):
    """
    随机梯度Langevin动力学(SGLD)
    
    在SGD基础上添加噪声,趋向于后验分布
    用于贝叶斯深度学习
    """
    for epoch in range(num_epochs):
        for x, y in random.sample(data, batch_size):
            grad = compute_gradient(model, x, y)
            
            # 添加梯度噪声
            noise = np.random.randn(*grad.shape) * np.sqrt(lr)
            
            model.weights += lr * (grad + noise)
    
    return model

7.2 随机近似

坐标下降法:随机选择要更新的坐标。

def random_coordinate_descent(f, x, alpha, num_iterations=1000):
    """
    随机坐标下降法
    
    收敛速度:强凸函数 O(1/T)
    非强凸函数 O(1/sqrt(T))
    """
    n = len(x)
    
    for t in range(num_iterations):
        # 随机选择一个坐标
        i = random.randint(0, n-1)
        
        # 计算偏导数
        grad_i = partial_derivative(f, x, i)
        
        # 更新
        x[i] -= alpha * grad_i
    
    return x
 
def stochastic_coordinate_descent(A, b, x0, lam, num_iterations):
    """
    LASSO的随机坐标下降
    min (1/2)||Ax - b||_2^2 + λ||x||_1
    """
    x = x0.copy()
    n = len(x)
    
    for t in range(num_iterations):
        i = random.randint(0, n-1)
        
        # 计算坐标 i 的最优更新
        residual = b - A @ x + A[:, i] * x[i]
        x[i] = soft_threshold(residual @ A[:, i], lam) / (A[:, i] @ A[:, i])
    
    return x

7.3 随机森林与Bagging

Bagging(Bootstrap Aggregating):

import numpy as np
from sklearn.tree import DecisionTreeClassifier
 
def bagging(train_data, num_trees=100, max_features='sqrt'):
    """
    Bagging集成方法
    
    1. Bootstrap采样:从训练集中有放回采样
    2. 在每个样本上训练基学习器
    3. 集成预测(投票或平均)
    """
    trees = []
    n_samples = len(train_data)
    
    for _ in range(num_trees):
        # Bootstrap采样
        indices = np.random.choice(n_samples, size=n_samples, replace=True)
        bootstrap_sample = [train_data[i] for i in indices]
        
        # 训练决策树
        tree = DecisionTreeClassifier(max_features=max_features)
        tree.fit(bootstrap_sample)
        
        trees.append(tree)
    
    def predict(X):
        # 投票集成
        votes = np.zeros((len(X), num_trees))
        for i, tree in enumerate(trees):
            votes[:, i] = tree.predict(X)
        
        # 多数投票
        from scipy.stats import mode
        return mode(votes, axis=1)[0].flatten()
    
    return predict
 
class RandomForest:
    """
    随机森林:Bagging + 特征子采样
    
    进一步降低过拟合
    """
    
    def __init__(self, n_estimators=100, max_depth=None, 
                 max_features='sqrt', min_samples_leaf=1):
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.max_features = max_features
        self.min_samples_leaf = min_samples_leaf
        self.trees = []
    
    def fit(self, X, y):
        self.trees = []
        n_samples, n_features = X.shape
        
        for _ in range(self.n_estimators):
            # Bootstrap采样
            indices = np.random.choice(n_samples, size=n_samples, replace=True)
            
            # 特征子采样
            if self.max_features == 'sqrt':
                n_sub_features = int(np.sqrt(n_features))
            elif self.max_features == 'log2':
                n_sub_features = int(np.log2(n_features))
            else:
                n_sub_features = n_features
            
            feature_indices = np.random.choice(
                n_features, size=n_sub_features, replace=False
            )
            
            # 训练树
            tree = DecisionTreeClassifier(
                max_depth=self.max_depth,
                min_samples_leaf=self.min_samples_leaf
            )
            tree.fit(X[indices][:, feature_indices], y[indices])
            
            self.trees.append((tree, feature_indices))
        
        return self
    
    def predict(self, X):
        votes = np.array([
            tree.predict(X[:, features]) 
            for tree, features in self.trees
        ])
        from scipy.stats import mode
        return mode(votes, axis=0)[0].flatten()

7.4 变分推断与MCMC

概率机器学习使用随机化进行近似推断:

方法思想随机化成分
蒙特卡洛采样从后验直接采样马尔可夫链转移
变分推断用简单分布近似复杂后验随机梯度优化
重要性采样加权采样近似期望采样权重
粒子滤波序列重要性采样粒子重采样
def metropolis_hastings(target_log_pdf, proposal, x0, num_samples):
    """
    Metropolis-Hastings MCMC算法
    
    目标:从目标分布 π(x) ∝ exp(-target_log_pdf(x)) 采样
    提议:使用对称提议分布 q(x'|x)
    """
    samples = [x0]
    x = x0
    
    for i in range(num_samples):
        # 从提议分布采样
        x_proposed = proposal(x)
        
        # 计算接受率
        log_alpha = target_log_pdf(x_proposed) - target_log_pdf(x)
        
        # Metropolis接受准则
        if np.log(np.random.random()) < log_alpha:
            x = x_proposed
        
        samples.append(x)
    
    return np.array(samples)
 
def gibbs_sampling(joint_log_pdf, initial_state, num_samples, num_variables):
    """
    Gibbs采样
    
    每次更新一个变量,条件分布为其他变量的函数
    """
    state = initial_state.copy()
    samples = [state]
    
    for _ in range(num_samples):
        for i in range(num_variables):
            # 条件分布 p(x_i | x_{-i})
            conditional = lambda xi: joint_log_pdf(state, i, xi)
            state[i] = sample_from_conditional(conditional)
        
        samples.append(state.copy())
    
    return np.array(samples)

7.5 随机化神经架构搜索

def random_neural_architecture_search(search_space, num_samples=1000):
    """
    随机神经架构搜索
    
    最简单的NAS基线
    随机采样架构并评估
    """
    best_architecture = None
    best_score = 0
    
    for _ in range(num_samples):
        # 随机采样架构
        arch = sample_architecture(search_space)
        
        # 训练并评估
        model = build_model(arch)
        score = train_and_evaluate(model, data)
        
        if score > best_score:
            best_score = score
            best_architecture = arch
    
    return best_architecture, best_score
 
def darts_relaxation(model, search_space):
    """
    DARTS:可微架构搜索
    
    使用Gumbel-Softmax进行随机化搜索
    """
    # 混合操作权重
    alphas = {node: torch.zeros(len(search_space)) for node in model.nodes}
    
    for epoch in range(num_epochs):
        # Gumbel-Softmax采样
        for node in model.nodes:
            logits = alphas[node]
            # gumbel-softmax
            gumbels = -torch.log(-torch.log(torch.rand_like(logits)))
            weights = F.softmax((logits + gumbels) / temperature, dim=-1)
            
            # 使用加权操作构建模型
            model.set_edge_weights(node, weights)
        
        # 优化共享权重和α
        optimizer.step()

8. 随机化算法设计技术

8.1 随机化vs确定性选择

技术1:随机抽样避免最坏情况

随机化消除了输入构造者(adversary)的能力。确定性算法可能被精心构造的输入击败,但随机算法对任何输入都能提供期望保证。

  • 快速排序的随机pivot
  • 选择算法的随机采样
  • 图算法的随机起点
def randomized_string_matching(text, pattern):
    """
    随机化字符串匹配
    
    Rabin-Karp的随机版本
    使用随机化的滚动哈希
    """
    import random
    
    n, m = len(text), len(pattern)
    
    # 随机选择素数和基数
    p = random_prime(2**61 - 1)  # 使用大素数
    base = random.randint(256, p-1)
    
    # 预计算
    base_m_1 = pow(base, m-1, p)
    
    # 计算模式的哈希
    pattern_hash = hash_string(pattern, base, p)
    text_hash = hash_string(text[:m], base, p)
    
    for i in range(n - m + 1):
        if text_hash == pattern_hash:
            # 验证(避免哈希碰撞)
            if text[i:i+m] == pattern:
                return i
        
        # 滚动哈希
        text_hash = (text_hash - text[i] * base_m_1) % p
        text_hash = (text_hash * base + text[i+m]) % p
    
    return -1

技术2:随机扰动打破对称性

在无初始信息的场景中,随机化提供”公平的起点”。

  • 分布式系统的随机退避(以太网CSMA/CD)
  • 博弈论中的混合策略均衡
  • 负载均衡的随机分配
def exponential_backoff(protocol):
    """
    指数退避算法
    
    冲突后随机等待,避免再次冲突
    """
    attempt = 0
    max_attempts = 16
    
    while attempt < max_attempts:
        # 随机等待
        wait_time = random.uniform(0, 2**attempt * slot_time)
        sleep(wait_time)
        
        # 尝试发送
        if can_transmit(protocol):
            return True  # 成功
        
        attempt += 1
    
    return False  # 失败

技术3:概率放大

从Monte Carlo算法的常数错误率放大到任意小。

def amplify_error_probability(algorithm, instance, target_error=1e-6):
    """
    错误概率放大
    
    基于 majority vote
    需要算法单次错误率 < 1/2
    """
    import math
    
    # 单次错误率 1/3 是常用选择
    single_error_rate = 1/3
    
    # 需要的重复次数
    # P(≥ t/2 errors) ≤ (e/4)^{t/2} ≈ target_error
    # 推导: 使用 Chernoff
    num_repeats = int(math.ceil(
        2 * math.log(1/target_error) / (single_error_rate * math.log(1/single_error_rate))
    ))
    
    results = []
    for _ in range(num_repeats):
        results.append(algorithm(instance))
    
    # 多数投票
    return majority(results)

8.2 去随机化技术(Derandomization)

确定性算法可以在不损失效率的情况下消除随机性:

方法1:条件期望

利用期望的线性性质,选择使条件期望最大的分支。

def conditional_expectation_deterministic(algorithm, instance):
    """
    条件期望去随机化
    
    核心思想:对于每个随机比特,选择使期望输出最优的方向
    """
    # 初始化
    best_path = []
    
    # 遍历所有随机比特位置
    for i in range(num_random_bits):
        # 计算选择0时的期望
        value_if_0 = expected_value_given_bit(algorithm, instance, i, 0)
        
        # 计算选择1时的期望
        value_if_1 = expected_value_given_bit(algorithm, instance, i, 1)
        
        # 选择期望更大的
        if value_if_0 <= value_if_1:
            best_path.append(0)
        else:
            best_path.append(1)
    
    # 运行确定性版本
    return algorithm.run_deterministic(instance, best_path)

方法2:-网与-样本

构造确定性的小样本集以近似随机选择的效果。

def epsilon_net_sampling(domain, num_samples):
    """
    ε-网采样
    
    用于去随机化的采样技术
    确定性选择覆盖所有可能性的样本
    """
    # 使用ε-网构造确定性采样集
    samples = []
    covered = set()
    
    while len(covered) < (1 - epsilon) * len(domain):
        # 选择覆盖最多未覆盖元素的样本
        best_sample = None
        best_new_coverage = 0
        
        for candidate in candidate_set:
            new_coverage = count_new_coverage(domain, samples + [candidate])
            if new_coverage > best_new_coverage:
                best_new_coverage = new_coverage
                best_sample = candidate
        
        samples.append(best_sample)
        update_covered(covered, samples)
    
    return samples

方法3:局部屋历法

枚举所有随机种子而非使用真随机数。

def derandomized_min_cut(G):
    """
    去随机化的最小割
    
    使用确定性等价物替代随机选择
    """
    # 确定性选择:使用伪随机序列
    random.seed(42)  # 可复现的随机性
    
    n = len(G.vertices)
    
    while n > 2:
        # 使用确定性选择边
        edge = deterministic_edge_selection(G)
        G.contract(edge[0], edge[1])
        n -= 1
    
    return G.cut_value()
 
def deterministic_edge_selection(G):
    """
    确定性边选择
    
    例如:选择哈希值最小的边
    """
    min_hash = float('inf')
    min_edge = None
    
    for edge in G.edges():
        edge_hash = hash((edge[0], edge[1]))
        if edge_hash < min_hash:
            min_hash = edge_hash
            min_edge = edge
    
    return min_edge

8.3 随机化算法的电路复杂性

BPP电路:随机性可以通过电路模拟。

def bpp_circuit_simulation():
    """
    BPP ⊆ P/poly 的证明思路
    
    若语言 L ∈ BPP,则存在多项式大小的电路族 {C_n}
    C_n(x) 以 ≥ 2/3 的概率计算 x 是否在 L 中
    
    去随机化:使用ε-网或硬函数
    """
    pass

9. 复杂性类的随机化视角

9.1 BPP类

BPP(Bounded-error Probabilistic Polynomial time):

BPP的特征

  • 错误界限:误差概率 ≤ 1/3(任意固定常数即可)
  • 双向错误:既可能假阳性也可能假阴性
  • 放大:重复运行可将错误指数级降低

开放问题?大多数研究者相信随机性可以被高效消除。

支持 的证据

  • 如果 ,则 可以被亚指数时间解决
  • 随机性似乎”不够强大”来超越确定性
  • 伪随机发生器的存在暗示去随机化的可能

支持 的证据

  • 自然界中随机性似乎无处不在
  • 某些问题(如素数测试)在随机算法出现后才变得实用
  • 目前没有反例

9.2 RP与co-RP

  • RP(Randomized Polynomial time):单面错误(只假阳性)

  • co-RP:只假阴性

正确性要求错误类型
RPYES → ≥ 1/2, NO → 0单边假阳性
co-RPYES → 1, NO → ≤ 1/2单边假阴性
BPP≥ 2/3双边错误
PP> 1/2微弱多数

9.3 ZPP:零错误概率多项式时间

ZPP是Las Vegas算法的复杂性类:

特征

  • 运行时间期望多项式
  • 错误率为0
  • 等价于期望指数小错误率的算法

9.4 随机电路

,则需要超多项式大的随机电路才能模拟随机算法。

随机电路复杂性层次

其中 是非均匀多项式时间电路。


10. 随机数据结构

10.1 跳表(Skip List)

跳表是一种随机化平衡搜索结构:

import random
 
class SkipListNode:
    def __init__(self, key, value, level):
        self.key = key
        self.value = value
        self.forward = [None] * (level + 1)
 
class SkipList:
    """
    跳表
    
    期望时间: O(log n) 搜索/插入/删除
    空间: O(n)
    
    核心随机性:每个新节点的高度服从几何分布
    """
    
    def __init__(self, max_level=16, p=0.5):
        self.max_level = max_level
        self.p = p  # 每个层级被扩展的概率
        self.header = SkipListNode(float('-inf'), None, max_level)
        self.level = 0
    
    def random_level(self):
        """生成随机层级(几何分布)"""
        level = 0
        while random.random() < self.p and level < self.max_level:
            level += 1
        return level
    
    def search(self, key):
        """搜索 key"""
        current = self.header
        
        # 从最高层开始搜索
        for i in range(self.level, -1, -1):
            while current.forward[i] and current.forward[i].key < key:
                current = current.forward[i]
        
        current = current.forward[0]
        
        if current and current.key == key:
            return current.value
        return None
    
    def insert(self, key, value):
        """插入 key-value 对"""
        update = [None] * (self.max_level + 1)
        current = self.header
        
        # 找到插入位置
        for i in range(self.level, -1, -1):
            while current.forward[i] and current.forward[i].key < key:
                current = current.forward[i]
            update[i] = current
        
        # 随机层级
        new_level = self.random_level()
        
        # 更新最高层级
        if new_level > self.level:
            for i in range(self.level + 1, new_level + 1):
                update[i] = self.header
            self.level = new_level
        
        # 创建新节点
        new_node = SkipListNode(key, value, new_level)
        
        # 更新指针
        for i in range(new_level + 1):
            new_node.forward[i] = update[i].forward[i]
            update[i].forward[i] = new_node
    
    def delete(self, key):
        """删除 key"""
        update = [None] * (self.max_level + 1)
        current = self.header
        
        for i in range(self.level, -1, -1):
            while current.forward[i] and current.forward[i].key < key:
                current = current.forward[i]
            update[i] = current
        
        current = current.forward[0]
        
        if current and current.key == key:
            for i in range(self.level + 1):
                if update[i].forward[i] == current:
                    update[i].forward[i] = current.forward[i]

跳表的平衡性分析

每个层级期望包含 个元素。最顶层期望只有 个元素。

搜索路径长度期望

10.2 布隆过滤器(Bloom Filter)

布隆过滤器是一种空间高效的概率数据结构:

import hashlib
 
class BloomFilter:
    """
    布隆过滤器
    
    空间效率: O(m/n) 位每元素
    查询时间: O(k) 哈希计算
    
    特性:
    - 可能假阳性(报告存在但不存在)
    - 不会假阴性(报告不存在则一定不存在)
    """
    
    def __init__(self, size, num_hashes):
        self.size = size  # 位数组大小
        self.num_hashes = num_hashes
        self.bit_array = [0] * size
    
    def _hashes(self, item):
        """生成 num_hashes 个哈希值"""
        # 使用双哈希技术
        h1 = int(hashlib.md5(item.encode()).hexdigest(), 16)
        h2 = int(hashlib.sha1(item.encode()).hexdigest(), 16)
        
        for i in range(self.num_hashes):
            yield (h1 + i * h2) % self.size
    
    def add(self, item):
        """添加元素"""
        for index in self._hashes(item):
            self.bit_array[index] = 1
    
    def contains(self, item):
        """检查元素是否可能存在"""
        for index in self._hashes(item):
            if self.bit_array[index] == 0:
                return False
        return True  # 可能存在(假阳性)
    
    def false_positive_rate(self, n):
        """
        计算假阳性率
        
        推导: (1 - e^{-kn/m})^k
        m: 位数组大小
        n: 元素个数
        k: 哈希函数个数
        """
        import math
        m = self.size
        k = self.num_hashes
        return (1 - math.exp(-k * n / m)) ** k

最优参数选择

给定位数组大小 和元素个数 ,最优哈希数:

此时假阳性率最小:

10.3 随机哈希与最小哈希

MinHash 用于估计集合相似度:

class MinHash:
    """
    MinHash
    
    用于估计Jaccard相似度 |A ∩ B| / |A ∪ B|
    典型应用:文档去重、集合相似性搜索
    """
    
    def __init__(self, num_hashes, hash_range=2**32):
        self.num_hashes = num_hashes
        self.hash_range = hash_range
        # 随机哈希参数
        self.A = [random.randint(1, hash_range) for _ in range(num_hashes)]
        self.B = [random.randint(0, hash_range) for _ in range(num_hashes)]
    
    def _hash(self, x, a, b):
        """随机哈希函数 h(x) = (ax + b) mod p"""
        return (a * x + b) % self.hash_range
    
    def minhash(self, s):
        """
        计算集合 s 的MinHash签名
        """
        signature = []
        for a, b in zip(self.A, self.B):
            min_hash = min(self._hash(x, a, b) for x in s)
            signature.append(min_hash)
        return signature
    
    def estimate_similarity(self, sig1, sig2):
        """
        估计两个集合的Jaccard相似度
        """
        return sum(h1 == h2 for h1, h2 in zip(sig1, sig2)) / self.num_hashes

11. 高级随机化技术

11.1 概率方法

概率方法是用随机性证明存在性的强大工具:

def probabilistic_method_example():
    """
    概率方法示例:Ramsey数的下界
    
    证明 R(k,k) > N 的存在性
    """
    import math
    
    # 随机着色 N 个顶点的完全图
    # 两色(红/蓝),每条边独立以 1/2 概率着色
    
    # 某个k团和k独立集同时存在的概率
    N = int(2**(k/2) / math.sqrt(2))
    
    # 计算全红k团或全蓝k团的期望数量
    # E = 2 * C(N, k) * 2^{-C(k,2)}
    
    # 若 E < 1,则存在某种着色使得两者都不存在
    # 即 R(k,k) > N

11.2 蒙特卡洛树搜索(MCTS)

class MonteCarloTreeSearch:
    """
    蒙特卡洛树搜索
    
    用于完美信息博弈的决策
    结合随机模拟和树搜索
    """
    
    def __init__(self, game, exploration_weight=1.414):
        self.game = game
        self.exploration_weight = exploration_weight
        self.nodes = {}  # 状态 -> 节点统计
    
    def ucb1(self, node, parent_visits):
        """
        UCB1公式
        
        UCB1 = Q(s,a)/N(s,a) + c * sqrt(log(N(s))/N(s,a))
        """
        if node.visits == 0:
            return float('inf')  # 未访问的节点优先
        
        exploitation = node.wins / node.visits
        exploration = math.sqrt(
            math.log(parent_visits) / node.visits
        )
        
        return exploitation + self.exploration_weight * exploration
    
    def select(self, state):
        """选择:从根到叶的选择阶段"""
        path = []
        current = self.get_or_create_node(state)
        
        while not self.game.is_terminal(current.state):
            if current not in self.nodes:
                return path, current
            
            # UCB1选择最佳子节点
            best_child = max(
                current.children,
                key=lambda c: self.ucb1(c, current.visits)
            )
            
            path.append((current, best_child))
            current = best_child
        
        return path, current
    
    def expand(self, node):
        """扩展:添加子节点"""
        if self.game.is_terminal(node.state):
            return node
        
        for action in self.game.get_legal_actions(node.state):
            new_state = self.game.apply(node.state, action)
            child = Node(new_state, parent=node, action=action)
            node.children.append(child)
        
        return random.choice(node.children)
    
    def simulate(self, state):
        """模拟:随机 rollout 到终局"""
        while not self.game.is_terminal(state):
            actions = self.game.get_legal_actions(state)
            action = random.choice(actions)
            state = self.game.apply(state, action)
        
        return self.game.get_reward(state)
    
    def backpropagate(self, path, reward):
        """回溯:更新路径上所有节点"""
        for node, child in reversed(path):
            node.visits += 1
            node.wins += reward
            reward = 1 - reward  # 对方视角
    
    def search(self, state, num_simulations=1000):
        """主搜索循环"""
        for _ in range(num_simulations):
            path, leaf = self.select(state)
            
            if leaf not in self.nodes:
                self.nodes[leaf] = Node(leaf.state)
            
            child = self.expand(leaf)
            reward = self.simulate(child.state)
            self.backpropagate(path + [(leaf, child)], reward)

11.3 随机化数值算法

def random_projected_gradient_descent(f, x0, k, num_iterations=1000):
    """
    随机投影梯度下降
    
    用于高维约束优化
    将迭代投影到低维随机子空间
    
    收敛速度: O(1/sqrt(T)) 在某些条件下
    """
    d = len(x0)
    x = x0.copy()
    
    for t in range(num_iterations):
        # 随机投影矩阵 R (k × d)
        R = np.random.randn(k, d) / np.sqrt(k)
        
        # 在随机子空间中优化
        x_tilde = R @ x
        
        # 梯度下降
        grad_tilde = R @ gradient(f, x)
        x_tilde = x_tilde - (1/math.sqrt(t+1)) * grad_tilde
        
        # 反投影回原始空间(使用伪逆)
        x = x_tilde @ np.linalg.pinv(R)
    
    return x
 
def nystrom_method(kernel_matrix, rank, num_samples):
    """
    Nyström近似
    
    低秩矩阵近似
    用于核方法的随机化加速
    """
    n = kernel_matrix.shape[0]
    
    # 随机采样
    indices = np.random.choice(n, num_samples, replace=False)
    
    # 构造子矩阵
    C = kernel_matrix[indices, :]
    W = kernel_matrix[np.ix_(indices, indices)]
    
    # Nyström近似
    W_inv_sqrt = np.linalg.inv(np.sqrtm(W))
    approx = C.T @ W_inv_sqrt @ C
    
    return approx

12. 算法性能总结

算法类型期望时间成功率/保证
随机快速排序Las Vegas100%
随机选择Las Vegas100%
Karger最小割Monte Carlo
随机游走2-SATMonte Carlo
Miller-Rabin素数测试Monte Carlo单次
跳跃表搜索Las Vegas100%
布隆过滤器查询Monte Carlo无假阴性
SGDMonte Carlo收敛到静止点
MCTSMonte Carlo可调概率收敛

13. 实践建议与注意事项

13.1 随机种子管理

def reproducible_experiment():
    """
    确保实验可复现
    """
    import random
    import numpy as np
    import torch
    
    # 设置全局随机种子
    SEED = 42
    random.seed(SEED)
    np.random.seed(SEED)
    torch.manual_seed(SEED)
    torch.cuda.manual_seed_all(SEED)
    
    # 更彻底的做法:确定性算法
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

13.2 随机算法的测试

def test_randomized_algorithm():
    """
    测试随机算法的策略
    """
    # 1. 使用固定随机种子多次运行
    for seed in range(10):
        random.seed(seed)
        result = algorithm(input)
        assert is_valid(result)
    
    # 2. 统计检验
    results = [algorithm(input) for _ in range(100)]
    assert mean(results) close_to expected_value
    assert variance(results) is small
    
    # 3. 对比确定性版本
    det_result = deterministic_version(input)
    rand_results = [randomized_version(input) for _ in range(100)]
    assert median(rand_results) close_to det_result

13.3 何时使用随机算法

适合随机化的场景

  • 问题有内在随机性(如采样、模拟)
  • 存在简单的随机近似方案
  • 确定性算法设计困难
  • 需要避免对抗性输入

不适合随机化的场景

  • 需要确定性可复现结果
  • 随机性开销不可接受
  • 已知的有效确定性算法

参考来源

  1. Motwani, R., & Raghavan, P. (1995). Randomized Algorithms. Cambridge University Press.
  2. Karger, D. R. (1994). Random Sampling in Graph Optimization. SIAM Journal on Computing.
  3. Arora, S., & Barak, B. (2009). Computational Complexity: A Modern Approach. Cambridge University Press.
  4. Mitzenmacher, M., & Upfal, E. (2005). Probability and Computing. Cambridge University Press.
  5. Chernoff, H. (1952). A Measure of Asymptotic Efficiency for Tests of a Hypothesis Based on the Sum of Observations. AMS.
  6. Hoeffding, W. (1963). Probability Inequalities for Sums of Bounded Random Variables. JASA.
  7. Papadimitriou, C. H. (1991). On Selecting a Satisfying Truth Assignment. FOCS.
  8. Karp, R. M., & Pearl, J. (1983). Finding Optimal Boolean Satisfiability Assignments by Simulated Annealing. AAAI.
  9. Robbins, H., & Monro, S. (1951). A Stochastic Approximation Method. AMS.
  10. Welling, M., & Teh, Y. W. (2011). Bayesian Learning via Stochastic Gradient Langevin Dynamics. ICML.

相关文档