在线算法详解

关键词速览

关键词解释
竞争分析在线算法与最优离线算法的比较框架
竞争比在线算法性能与最优算法的比值上界
最优在线决策基于部分信息的顺序决策问题
Secretary Problem经典的选择最优候选人问题
在线装箱物品序列的在线分配
负载均衡任务分配到多台机器
Ski Rental在线租买决策问题
对抗性输入分析在线算法的输入模型
随机化在线使用随机性的在线算法
Yao原理期望竞争比的下界证明
缓存置换页面替换的在线策略
LRU最近最少使用策略
分页算法虚拟内存管理
列表更新自我组织的线性搜索
多臂老虎机探索利用权衡的数学模型
上置信界UCB算法
注册模型在线学习的形式化框架
后悔值在线学习与最优的差距
次线性算法只需次线性信息访问的算法
流算法数据流模型下的在线处理

摘要

在线算法研究在信息不完整条件下进行决策的理论与实践。与离线算法拥有全部输入不同,在线算法必须在收到每个输入后立即做出不可撤回的决定。本文系统阐述竞争分析框架,通过经典案例(秘书问题、在线装箱、负载均衡、Ski Rental问题)展示问题建模与算法设计技术,并探讨随机化在线算法的优势。在线算法的思想广泛应用于操作系统调度、网络路由、搜索引擎排序和推荐系统等AI领域。

在线算法的研究起源于20世纪70年代的”表调度”(list scheduling)和”分页”(paging)问题。1975年,Graham对调度问题的开创性工作引入了竞争分析的概念。1979年,Sleator和Tarjan发表了关于自我调整列表(self-adjusting lists)的经典论文,开启了在线算法理论分析的序幕。此后,Karp、Borodin和El-Yaniv等学者系统发展了竞争分析的理论框架。

本文将覆盖以下内容:首先是竞争分析的数学基础与输入模型;其次是秘书问题、在线装箱、负载均衡等经典问题的深入分析;然后是缓存置换算法、列表更新等操作系统中的在线问题;接着探讨多臂老虎机等在线学习框架;再讨论随机化在线算法与Yao原理;最后展示在线算法在AI和现代计算中的应用。


1. 在线算法的基本框架

1.1 在线与离线问题的对比

特征离线算法在线算法
输入完整输入提前已知逐步到达
决策时机全局优化后决策即时决策
后悔度
分析框架近似比/精确性竞争分析
可逆性决策可修改决策不可撤回
信息量完备信息部分信息

核心挑战:在线算法必须在信息不完全时做出选择,且决策通常不可撤销。

在线性(Onliness)的来源

  1. 时间在线性:数据随时间到达,必须实时处理
  2. 空间在线性:数据存储超出内存,需要决定保留什么
  3. 结构在线性:图的顶点逐步暴露,边连接时需决策

在线性程度的光谱

完全离线 ←————————————————————→ 完全在线
    |                               |
    |      半在线(部分信息已知)       |
    |           |                    |
    |   批处理(有限 lookahead)      |
    |      预约式(可延迟)           |
    |                               |
    ←————————————— 决策可逆 ———————————————→

1.2 竞争分析的定义

竞争分析是在线算法性能评估的标准方法。设 为算法 在输入 上的代价, 为最优离线算法在相同输入上的代价。

竞争比的定义

一个在线算法 -竞争的,当且仅当存在常数 使得

对所有输入 成立。

渐进竞争比的定义

时,若 ,则算法是渐近 -竞争的。

Note

通常关注渐进竞争比,即忽略常数 ,要求 对足够大的输入成立。

最小化 vs 最大化问题

  • 最小化问题:
  • 最大化问题:

1.3 输入模型分类

模型描述分析方法例子
对抗性模型输入由敌手生成,最坏情况竞争比经典竞争分析
随机模型输入服从已知分布期望竞争比概率分析
确定性子分布输入来自任意分布通用性Yao原理
自适应对抗敌手可观测算法决策动态策略追踪分析

对抗性模型的强度层次

  1. ** oblivious 对抗**:在看到算法决策之前固定整个输入
  2. 轻度自适应对抗:每次决策后生成下一个输入
  3. 完全自适应对抗:知道算法的完整历史并优化最坏情况
def adversarial_input_generation(problem, algorithm):
    """
    对抗性输入生成器
    
    oblivious: 一次性生成所有输入
    adaptive: 根据算法决策生成下一输入
    """
    if model == 'oblivious':
        # 敌手在算法开始前生成完整输入
        input_sequence = generate_worst_case_input(problem, algorithm)
        return algorithm.process(input_sequence)
    
    elif model == 'adaptive':
        # 敌手根据算法决策动态生成输入
        remaining = generate_full_input(problem)
        while not algorithm.is_done():
            next_input = remaining.pop_adaptive(algorithm.state)
            algorithm.process_single(next_input)
        return algorithm.total_cost()

1.4 竞争分析的数学基础

竞争比的上界:构造性地证明算法在任意输入下表现良好。

竞争比的下界:证明不存在任何在线算法能超过某个竞争比。

分离引理(Lower Bound Separator)

若要证明问题 的竞争比下界为 ,需要:

  1. 构造一族输入实例
  2. 对任何在线算法 ,存在 使得

2. 秘书问题:最优停止理论

2.1 问题描述

秘书问题(Secretary Problem / Best Choice Problem)是最优停止理论的经典案例:

问题设定

  • 面试 个候选人
  • 候选人以随机顺序到达
  • 每次面试后必须立即决定是否录用
  • 一旦拒绝不可召回
  • 目标:选择最佳候选人的概率最大

为什么称为”秘书问题”

这个问题最早由Martin Gardner于1960年在《科学美国人》上提出,模拟的是选择最佳秘书的场景。现在已扩展到招聘、购房、投资等”在有限选项中做一次性选择”的场景。

问题的数学形式化

是候选人的独立同分布随机变量(表示质量/适配度),排列统计量为 。目标是最大化选中 的概率。

2.2 最优策略:观察-选择规则

def secretary_strategy(n, observe_fraction=1/math.e):
    """
    秘书问题的最优策略
    
    策略:观察前 r 个候选人,但不选择
         从第 r+1 个开始,选择第一个比之前所有候选人都好的
    
    参数:r = floor(n/e) 时成功概率最大,约为 1/e ≈ 0.3679
    """
    import math
    
    r = int(n * observe_fraction)
    
    best_observed = -float('inf')
    
    for i in range(1, n + 1):
        rating = get_candidate_rating(i)  # 假设有这个函数
        
        if i <= r:
            # 观察期:只记录,不选择
            best_observed = max(best_observed, rating)
        else:
            # 选择期:选择第一个超过历史最佳的
            if rating > best_observed:
                return i  # 选择第 i 个候选人
    
    # 如果没有更好的,选择最后一个
    return n

渐进最优策略

def secretary_strategy_asymptotic(n):
    """
    渐近最优策略
    
    随着 n 增大,1/e 是最优的观察比例
    """
    import math
    
    r = int(n / math.e)
    
    # 其余逻辑同上
    return r

2.3 最优观察期分析

定理:当 时,成功概率收敛到

推导

设选中的候选人是第 个,且被选中当且仅当它是最佳且

由于随机顺序,第 个是最佳的先验概率是

在观察到 时,前面 个候选人中最佳的排名是均匀分布的 。因此它是前 个最佳的概率是

足够大且 时,

最大化 得到 ,此时成功概率为

Example

招聘场景:面试100人,观察前37人不录用,从第38人开始录用第一个比之前所有都优秀的人,成功概率约36.8%。

2.4 秘书问题的变种

变种1:允许选择多个候选人

def secretary_k_choices(n, k, m=None):
    """
    选择最优的 k 个候选人
    
    m: 观察期长度,通常 m = n * k / (k+1)
    """
    import math
    
    if m is None:
        m = int(n * k / (k + 1))
    
    # 观察期
    top_k_observed = get_top_k(n, k)
    
    # 选择期:选前 k 个超过观察期最佳的选择
    selected = []
    threshold = min(top_k_observed)
    
    for i in range(m + 1, n + 1):
        if get_candidate_rating(i) > threshold:
            selected.append(i)
            if len(selected) == k:
                break
    
    return selected

变种2:伯努利秘书问题

候选人以一定概率 被录用(不录用则继续),目标是最大化被录用且最优的概率。

def bernoulli_secretary(n, p):
    """
    伯努利秘书问题
    
    每次面试后,以概率 p 决定是否录用
    """
    import math
    
    r = int(n / math.e)
    best_observed = -float('inf')
    
    for i in range(1, n + 1):
        rating = get_candidate_rating(i)
        
        if i <= r:
            best_observed = max(best_observed, rating)
        else:
            if rating > best_observed and random.random() < p:
                return i
    
    return n

变种3:带价值的秘书问题

每个候选人不仅有质量还有雇佣成本,目标是最大化质量-成本比。


3. 在线装箱问题

3.1 问题定义

在线装箱(Online Bin Packing)问题:

  • 输入:物品序列 ,每个物品大小
  • 目标:将物品装入最少数量的箱子(容量为1)

问题的应用场景

  1. 内存分配:将进程加载到内存页框
  2. 集装箱装载:货物装箱问题
  3. 资源分配:云服务器资源调度
  4. 数据包分片:网络传输中的分片重组

3.2 经典在线装箱算法

Next Fit (NF)

class NextFitBinPacker:
    """
    Next Fit 算法
    
    策略:当前箱子装不下时开新箱
    竞争比:2
    优点:简单,O(1) 空间
    缺点:浪费空间
    """
    
    def __init__(self, capacity=1.0):
        self.capacity = capacity
        self.current_bin = []
        self.current_load = 0
        self.num_bins = 0
    
    def add_item(self, size):
        if size > self.capacity:
            raise ValueError("Item too large")
        
        if self.current_load + size <= self.capacity:
            self.current_bin.append(size)
            self.current_load += size
        else:
            # 开新箱
            self.num_bins += 1
            self.current_bin = [size]
            self.current_load = size
    
    def close(self):
        """关闭当前箱子"""
        if self.current_bin:
            self.num_bins += 1
            self.current_bin = []
            self.current_load = 0

Best Fit (BF)

class BestFitBinPacker:
    """
    Best Fit 算法
    
    策略:将物品放入能容纳它且当前最满的箱子
    竞争比:1.7
    """
    
    def __init__(self, capacity=1.0):
        self.capacity = capacity
        self.bins = []  # (load, bin_id)
        self.bin_contents = {}
    
    def add_item(self, size):
        # 找到最满且能容纳物品的箱子
        best_idx = -1
        best_load = self.capacity + 1
        
        for idx, (load, _) in enumerate(self.bins):
            if load + size <= self.capacity and load + size > best_load:
                best_load = load + size
                best_idx = idx
        
        if best_idx == -1:
            # 创建新箱
            self.bins.append((size, len(self.bins)))
            self.bin_contents[len(self.bins) - 1] = [size]
        else:
            # 更新最佳箱子
            old_load, bin_id = self.bins[best_idx]
            self.bins[best_idx] = (old_load + size, bin_id)
            self.bin_contents[bin_id].append(size)

First Fit (FF)

class FirstFitBinPacker:
    """
    First Fit 算法
    
    策略:从第一个箱子开始找第一个能容纳物品的
    竞争比:1.7
    实现:使用有序结构优化到 O(n log n)
    """
    
    def __init__(self, capacity=1.0):
        self.capacity = capacity
        # 按负载排序的箱子
        self.bins = []  # [(load, bin_id, contents)]
    
    def _find_bin(self, size):
        """二分查找能容纳物品的最左边箱子"""
        lo, hi = 0, len(self.bins)
        while lo < hi:
            mid = (lo + hi) // 2
            if self.bins[mid][0] + size <= self.capacity:
                hi = mid
            else:
                lo = mid + 1
        return lo if lo < len(self.bins) and self.bins[lo][0] + size <= self.capacity else -1
    
    def add_item(self, size):
        idx = self._find_bin(size)
        
        if idx == -1:
            # 创建新箱
            self.bins.append((size, len(self.bins), [size]))
        else:
            # 更新箱子
            load, bin_id, contents = self.bins[idx]
            self.bins[idx] = (load + size, bin_id, contents + [size])

Harmonic Fit

class HarmonicFitBinPacker:
    """
    Harmonic Fit 算法
    
    策略:按物品大小分类处理
    将大小区间 (1/(k+1), 1/k] 的物品分组处理
    竞争比:1.691...
    """
    
    def __init__(self, capacity=1.0, k=3):
        self.capacity = capacity
        self.k = k
        self.bins = [[] for _ in range(k)]
    
    def _get_class(self, size):
        """物品属于哪个类别"""
        for i in range(1, self.k + 1):
            if size > 1.0 / (i + 1) and size <= 1.0 / i:
                return i - 1
        return self.k
    
    def add_item(self, size):
        item_class = self._get_class(size)
        self.bins[item_class].append(size)

3.3 竞争比分析

Next Fit 的竞争比

定理:NF的渐近竞争比是2。

证明:设 是最优箱子数。任何NF创建的箱子(除了最后一个)至少半满。因此:

FF 和 BF 的竞争比

定理:FF和BF的竞争比是 (更精确地,)。

Harmonic Fit 的竞争比

变种竞争比
Harmonic
Refined Harmonic
Asymptotic PTAS

3.4 离线近似算法:FFD

FFD是一个经典的离线近似算法:

def ffd(items, capacity=1.0):
    """
    First Fit Decreasing (FFD)
    
    1. 按物品大小降序排列
    2. 依次使用 First Fit 策略装箱
    
    近似比:11/9 * OPT + 6/9 ≈ 1.222 * OPT
    """
    # 按大小降序排序
    sorted_items = sorted(items, reverse=True)
    
    # 使用 First Fit
    packer = FirstFitBinPacker(capacity)
    for item in sorted_items:
        packer.add_item(item)
    
    return packer.num_bins()

4. 在线负载均衡

4.1 问题定义

在线负载均衡(Online Load Balancing):

  • 台机器
  • 作业序列 依次到达
  • 作业 有处理时间
  • 目标:最小化最大机器负载(makespan)

问题的应用场景

  1. 云计算调度:将任务分配到服务器
  2. 数据中心管理:虚拟机迁移
  3. GPU调度:深度学习训练任务分配
  4. 实时系统:任务优先级调度

4.2 Graham的List Scheduling算法

Graham的LPT算法(离线):按处理时间降序分配

def lpt_load_balancing(jobs, m):
    """
    LPT (Longest Processing Time first) 离线算法
    
    近似比:4/3 - 1/(3m)
    """
    sorted_jobs = sorted(jobs, reverse=True)
    loads = [0] * m
    
    for job in sorted_jobs:
        min_machine = loads.index(min(loads))
        loads[min_machine] += job
    
    return max(loads)

在线List Scheduling (LS)

def list_scheduling_load_balancing(machines, jobs):
    """
    List Scheduling 在线算法
    
    策略:将每个作业分配到当前负载最小的机器
    竞争比:2 - 1/m
    """
    for job in jobs:
        # 分配到当前负载最小的机器
        min_machine = min(range(len(machines)), key=lambda i: machines[i].load)
        machines[min_machine].add(job)
    
    return max(m.load for m in machines)

4.3 竞争比分析

定理:LS的竞争比是

证明

是最优makespan。

关键引理:对于任意机器 ,LS将其最终负载分解为两部分:

  1. 最后一个分配给 的作业 (因为 是全局下界)
  2. 其他作业的总和

因此

更精确的分析考虑作业分配顺序的影响,得到

4.4 竞争比下界

定理:任何确定性在线算法的竞争比至少为

构造:前 个作业大小为1,最后一个作业大小为

  • 最优离线:可以找到平衡分配,makespan =
  • 在线算法:前 个作业分散到 台机器,大作业必须放到某台已有负载1的机器,makespan =

比值趋于2当 增大时。

def worst_case_load_balancing(m, algorithm):
    """
    构造负载均衡的最坏输入
    """
    jobs = [1.0] * (m - 1)  # 小作业
    jobs.append(float(m))     # 大作业
    
    return algorithm(jobs, m)

5. Ski Rental问题

5.1 问题描述

Ski Rental 问题模拟在线租买决策:

问题设定

  • 滑雪 天( 未知)
  • 每天租金 元,或一次性购买
  • 每天结束后决定是否继续滑雪
  • 目标:最小化总花费

问题的应用场景

  1. 软件许可:按月订阅 vs 永久购买
  2. 云服务:按需付费 vs 预留实例
  3. 设备租赁:短期租用 vs 长期拥有
  4. 健身会员:次卡 vs 年卡

5.2 最优在线策略

关键洞察:存在一个最优的”买断点”。

def ski_rental_optimal(r, p, days_actual):
    """
    Ski Rental 的最优策略
    
    策略:在租了 p/r 天后购买装备
    竞争比:2 - r/p
    """
    days_rented = 0
    total_cost = 0
    
    while days_rented < days_actual:
        if days_rented * r >= p:
            # 购买
            total_cost = p
            break
        else:
            # 继续租
            total_cost = (days_rented + 1) * r
            days_rented += 1
    
    return total_cost
 
def ski_rental_optimal_with_known_rho(rho):
    """
    当租买价格比 ρ = p/r 已知时
    
    最优策略:租 floor(ρ) 天后购买
    """
    m = int(math.floor(rho))
    return min(m * r, p)

5.3 竞争比分析

竞争比分析

设最优租期为 天。

  • :只租不买,花费 ,在线算法同
  • :最优为购买,花费

在线算法花费:

竞争比

def ski_rental_competitive_ratio(r, p):
    """
    计算 Ski Rental 的竞争比上界
    """
    return max(
        max(d * r for d in range(1, int(p/r) + 1)),
        p + (p/r - 1) * r
    ) / p

5.4 贝叶斯扩展

服从分布 ,可以计算最优策略的期望竞争比。

def bayesian_ski_rental(r, p, distribution):
    """
    贝叶斯 Ski Rental
    
    已知滑雪天数的概率分布
    """
    import numpy as np
    
    # 离散分布
    probs = distribution['probabilities']
    days = distribution['days']
    
    # 动态规划计算最优买断点
    # dp[d] = 从第d天开始的最优期望成本
    dp = np.zeros(max(days) + 1)
    
    for d in range(max(days), -1, -1):
        if d == max(days):
            dp[d] = min(r, p)  # 最后一天
        else:
            rent_today = r + dp[d + 1]
            buy_now = p
            dp[d] = min(rent_today, buy_now)
    
    # 期望成本
    expected_cost = sum(dp[0] * p for p, d in zip(probs, days))
    
    return expected_cost

6. 缓存置换算法

6.1 问题定义

分页问题(Paging Problem):

  • 缓存有 个页框
  • 请求序列 (每个 是页号)
  • 缓存满时必须驱逐一页
  • 目标:最小化缓存未命中(page fault)次数

问题的应用场景

  1. 虚拟内存管理:页面置换
  2. CPU缓存:缓存行替换
  3. Web缓存:CDN内容缓存
  4. 数据库缓冲池:数据页置换

6.2 经典置换策略

FIFO(First In First Out)

class FIFOCache:
    """
    FIFO 缓存置换算法
    
    驱逐最早进入的页面
    竞争比:k(最坏情况)
    """
    
    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = []
        self.queue = deque()
    
    def access(self, page):
        if page in self.cache:
            return True  # 命中
        
        # 未命中
        if len(self.cache) < self.capacity:
            self.cache.append(page)
            self.queue.append(page)
        else:
            # 驱逐队首
            evicted = self.queue.popleft()
            self.cache.remove(evicted)
            self.cache.append(page)
            self.queue.append(page)
        
        return False  # 未命中

LRU(Least Recently Used)

class LRUCache:
    """
    LRU 缓存置换算法
    
    驱逐最久未使用的页面
    竞争比:k(最坏情况)
    实际中表现良好
    
    O(1) 实现:双向链表 + 哈希表
    """
    
    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = {}  # key -> node
        
        # 双向链表:head <-> ... <-> tail
        self.head = Node('head')
        self.tail = Node('tail')
        self.head.next = self.tail
        self.tail.prev = self.head
    
    def _remove(self, node):
        node.prev.next = node.next
        node.next.prev = node.prev
    
    def _add_to_front(self, node):
        node.next = self.head.next
        node.prev = self.head
        self.head.next.prev = node
        self.head.next = node
    
    def access(self, key):
        if key in self.cache:
            node = self.cache[key]
            self._remove(node)
            self._add_to_front(node)
            return True
        
        # 未命中
        if len(self.cache) >= self.capacity:
            # 驱逐最久未使用的(tail前一个)
            lru = self.tail.prev
            self._remove(lru)
            del self.cache[lru.key]
        
        new_node = Node(key)
        self.cache[key] = new_node
        self._add_to_front(new_node)
        
        return False

LFU(Least Frequently Used)

class LFUCache:
    """
    LFU 缓存置换算法
    
    驱逐使用频率最低的页面
    """
    
    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = {}
        self.freq_table = {}  # freq -> {keys}
        self.min_freq = 0
    
    def access(self, page):
        if page in self.cache:
            # 更新频率
            freq = self.cache[page]['freq']
            self.cache[page]['freq'] = freq + 1
            
            # 从旧频率集合移除
            self.freq_table[freq].remove(page)
            if not self.freq_table[freq]:
                del self.freq_table[freq]
            
            # 添加到新频率集合
            new_freq = freq + 1
            if new_freq not in self.freq_table:
                self.freq_table[new_freq] = set()
            self.freq_table[new_freq].add(page)
            
            return True
        
        # 未命中
        if len(self.cache) >= self.capacity:
            # 驱逐频率最低的
            lfu_key = next(iter(self.freq_table[self.min_freq]))
            del self.cache[lfu_key]
            self.freq_table[self.min_freq].remove(lfu_key)
        
        self.cache[page] = {'freq': 1}
        if 1 not in self.freq_table:
            self.freq_table[1] = set()
        self.freq_table[1].add(page)
        self.min_freq = 1
        
        return False

6.3 竞争比分析

FIFO和LRU的竞争比

定理:FIFO和LRU的竞争比都是 (缓存大小)。

例子:请求序列 (重复)使得FIFO和LRU都产生每次未命中。

LRU的实用性

虽然最坏竞争比差,但实际中LRU表现良好。原因:

  1. 访问局部性原理
  2. 工作集大小通常小于缓存

Random算法的竞争比

import random
 
class RandomCache:
    """
    Random 缓存置换
    
    随机驱逐一个页面
    竞争比:k(与FIFO/LRU相同)
    但避免了某些病态序列
    """
    
    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = set()
    
    def access(self, page):
        if page in self.cache:
            return True
        
        if len(self.cache) < self.capacity:
            self.cache.add(page)
        else:
            # 随机驱逐
            evicted = random.choice(list(self.cache))
            self.cache.remove(evicted)
            self.cache.add(page)
        
        return False

6.4 离线最优算法:LRU-OPT

Belady的MIN算法

class BeladyMINCache:
    """
    Belady's MIN (或 OPT) 算法
    
    驱逐将来最晚被使用的页面
    这是最优的离线算法
    
    竞争比:1(与最优相比)
    但需要预知未来
    """
    
    def __init__(self, capacity, future_requests=None):
        self.capacity = capacity
        self.cache = set()
        self.future = future_requests  # 预知的未来请求
    
    def access(self, page, future=None):
        if future is not None:
            self.future = future
        
        if page in self.cache:
            return True
        
        if len(self.cache) < self.capacity:
            self.cache.add(page)
        else:
            # 找到将来最晚使用的
            evict_page = None
            latest_use = -1
            
            for p in self.cache:
                # 找到 p 下次出现的位置
                if p in self.future:
                    next_use = self.future.index(p)
                else:
                    next_use = float('inf')  # 永不出现
            
                if next_use > latest_use:
                    latest_use = next_use
                    evict_page = p
            
            self.cache.remove(evict_page)
            self.cache.add(page)
        
        return False

7. 列表更新问题

7.1 问题定义

列表更新问题(List Update Problem):

  • 有序列表
  • 访问请求序列
  • 访问成本:列表中元素位置
  • 交换成本:移位操作
  • 目标:最小化总成本

应用场景

  1. 拼写检查:词典查找
  2. 文件系统:目录遍历
  3. 推荐系统:列表排序优化

7.2 经典算法

Move-To-Front (MTF)

class MoveToFrontList:
    """
    Move-To-Front (MTF) 算法
    
    策略:访问后移到列表前端
    竞争比:2(确定性)
    """
    
    def __init__(self, items):
        self.items = list(items)
    
    def access(self, item):
        # 找到位置
        idx = self.items.index(item)
        
        # 成本 = 位置(1-indexed)
        cost = idx + 1
        
        # 移到前端
        self.items.pop(idx)
        self.items.insert(0, item)
        
        return cost

Transpose

class TransposeList:
    """
    Transpose 算法
    
    策略:访问后与前一个元素交换
    """
    
    def access(self, item):
        idx = self.items.index(item)
        cost = idx + 1
        
        if idx > 0:
            self.items[idx - 1], self.items[idx] = self.items[idx], self.items[idx - 1]
        
        return cost

Frequency Count

class FrequencyCountList:
    """
    Frequency Count 算法
    
    策略:按访问频率排序
    离线版本:重排列表使常用项靠前
    """
    
    def __init__(self, items):
        self.items = list(items)
        self.count = {item: 0 for item in items}
    
    def access(self, item):
        idx = self.items.index(item)
        cost = idx + 1
        
        # 增加计数
        self.count[item] += 1
        
        return cost
    
    def reorganize(self):
        """重新组织列表(按频率)"""
        self.items.sort(key=lambda x: -self.count[x])

7.3 竞争分析

MTF的竞争比

定理:MTF是2-竞争的(对任意输入)。

直觉:MTF的优势来自局部性。访问模式的自相关越大,MTF表现越好。

MTF与LFU的比较

  • MTF:响应性强(立即调整)
  • LFU:历史导向(长期频率)

8. 多臂老虎机问题

8.1 问题定义

多臂老虎机(Multi-Armed Bandit)是在线学习中的核心问题:

问题设定

  • 个臂(选项)
  • 每个臂 有未知奖励分布
  • 每轮选择一个臂,获得奖励
  • 目标:最大化累积奖励
  • 核心挑战:探索与利用的权衡(exploration-exploitation tradeoff)

应用场景

  1. 推荐系统:探索新内容 vs 推荐已知喜欢的
  2. 临床试验:测试新药 vs 使用已知有效的药
  3. 广告投放:测试新广告 vs 投放高回报广告
  4. 网络路由:探索新路由 vs 使用已知最短路径

8.2 探索策略

随机探索(Random Exploration):

import random
 
class RandomExploration:
    """
    完全随机探索
    
    优点:渐近最优
    缺点:收敛极慢
    """
    
    def __init__(self, n_arms):
        self.n_arms = n_arms
    
    def select_arm(self):
        return random.randint(0, self.n_arms - 1)

贪婪策略(Greedy):

class EpsilonGreedy:
    """
    ε-贪婪算法
    
    以概率 ε 探索(随机选择)
    以概率 1-ε 利用(选择当前最佳)
    """
    
    def __init__(self, n_arms, epsilon=0.1):
        self.n_arms = n_arms
        self.epsilon = epsilon
        self.counts = [0] * n_arms
        self.values = [0.0] * n_arms
    
    def select_arm(self):
        if random.random() > self.epsilon:
            # 利用:选择当前估计值最高的臂
            return self.values.index(max(self.values))
        else:
            # 探索:随机选择
            return random.randint(0, self.n_arms - 1)
    
    def update(self, arm, reward):
        self.counts[arm] += 1
        n = self.counts[arm]
        
        # 增量更新估计值
        self.values[arm] = ((n - 1) * self.values[arm] + reward) / n

8.3 UCB算法

上置信界(Upper Confidence Bound):

import math
 
class UCB1:
    """
    UCB1 算法
    
    核心思想:平衡估计均值和不确定性
    UCB = 估计均值 + 置信上界
    
    竞争比:O(sqrt(log T))
    渐近最优
    """
    
    def __init__(self, n_arms):
        self.n_arms = n_arms
        self.counts = [0] * n_arms
        self.values = [0.0] * n_arms
        self.total_counts = 0
    
    def select_arm(self):
        # 首先尝试每个臂一次
        for arm in range(self.n_arms):
            if self.counts[arm] == 0:
                return arm
        
        # 计算 UCB
        ucb_values = []
        for arm in range(self.n_arms):
            bonus = math.sqrt(
                2 * math.log(self.total_counts) / self.counts[arm]
            )
            ucb_values.append(self.values[arm] + bonus)
        
        return ucb_values.index(max(ucb_values))
    
    def update(self, arm, reward):
        self.total_counts += 1
        self.counts[arm] += 1
        n = self.counts[arm]
        
        self.values[arm] = ((n - 1) * self.values[arm] + reward) / n

UCB的理论分析

定理:UCB1的期望后悔值满足:

其中 是第 个臂与最优臂的差距。

8.4 Thompson采样

import numpy as np
from scipy import stats
 
class ThompsonSampling:
    """
    Thompson Sampling (Bayesian)
    
    假设每个臂的奖励服从 Beta(α, β) 分布
    每轮从后验分布采样,选择期望最高的臂
    """
    
    def __init__(self, n_arms, alpha_prior=1, beta_prior=1):
        self.n_arms = n_arms
        self.alpha = [alpha_prior] * n_arms
        self.beta = [beta_prior] * n_arms
    
    def select_arm(self):
        samples = []
        for arm in range(self.n_arms):
            # 从 Beta 分布采样
            sample = np.random.beta(self.alpha[arm], self.beta[arm])
            samples.append(sample)
        
        return samples.index(max(samples))
    
    def update(self, arm, reward):
        # 更新后验参数
        if reward == 1:
            self.alpha[arm] += 1
        else:
            self.beta[arm] += 1

9. 在线学习与后悔值

9.1 注册模型

注册模型(Experts Advice)是分析在线学习的通用框架:

class OnlineLearningFramework:
    """
    在线学习的注册模型
    
    每轮:
    1. 专家提出建议
    2. 算法选择一个专家
    3. 观察损失
    4. 更新
    """
    
    def __init__(self, n_experts):
        self.n_experts = n_experts
        self.weights = [1.0] * n_experts
    
    def predict(self):
        """加权平均预测"""
        total_weight = sum(self.weights)
        probs = [w / total_weight for w in self.weights]
        return random.choices(range(self.n_experts), weights=probs)[0]
    
    def update(self, expert, loss):
        """按损失更新权重"""
        self.weights[expert] *= (1 - loss)

9.2 加权多数算法

class WeightedMajority:
    """
    加权多数算法
    
    用于二分类问题
    """
    
    def __init__(self, n_experts, eta=0.5):
        self.n_experts = n_experts
        self.weights = [1.0] * n_experts
        self.eta = eta  # 学习率
    
    def predict(self):
        """加权投票"""
        total_weight = sum(self.weights)
        
        # 计算加权投票
        pos_weight = 0
        neg_weight = 0
        
        # 假设专家给出 +1 或 -1 预测
        for expert, weight in enumerate(self.weights):
            prediction = self.expert_predictions[expert]
            if prediction > 0:
                pos_weight += weight
            else:
                neg_weight += weight
        
        return 1 if pos_weight > neg_weight else -1
    
    def update(self, expert, prediction, actual):
        """更新权重"""
        if prediction != actual:
            self.weights[expert] *= (1 - self.eta)

9.3 Follow the Leader (FTL)

class FollowTheLeader:
    """
    Follow the Leader (FTL)
    
    每轮选择目前为止表现最好的专家
    """
    
    def __init__(self, n_experts):
        self.n_experts = n_experts
        self.cumulative_losses = [0.0] * n_experts
    
    def predict(self):
        """选择累积损失最小的专家"""
        return self.cumulative_losses.index(min(self.cumulative_losses))
    
    def update(self, expert, loss):
        self.cumulative_losses[expert] += loss

FTL的问题:容易过拟合当前数据,可能被对抗性输入击败。

9.4 Follow the Regularized Leader (FTRL)

class FTRLProximal:
    """
    Follow the Regularized Leader (FTRL-Proximal)
    
    添加正则化防止权重剧烈变化
    """
    
    def __init__(self, n_actions, learning_rate, regularization=1.0):
        self.n_actions = n_actions
        self.learning_rate = learning_rate
        self.regularization = regularization
        self.z = [0.0] * n_actions
    
    def predict(self):
        """选择最小化代理损失的行动"""
        best_action = 0
        best_value = float('inf')
        
        for action in range(self.n_actions):
            # L1 正则化代理
            value = self.z[action]
            if self.z[action] > 0:
                value -= self.learning_rate * self.regularization
            else:
                value += self.learning_rate * self.regularization
            
            if value < best_value:
                best_value = value
                best_action = action
        
        return best_action
    
    def update(self, action, gradient):
        self.z[action] += gradient

10. 随机化在线算法

10.1 随机化的优势

确定性在线算法在某些问题上无法改进,但随机化可以打破对称性:

例子:在线匹配

def randomized_online_matching(G):
    """
    随机化在线最大匹配
    
    竞争比:O(log n)  vs 确定性算法的 O(√n)
    """
    import random
    
    # 随机处理顺序
    edges = list(G.edges())
    random.shuffle(edges)
    
    M = set()
    
    for (u, v) in edges:
        if u not in M and v not in M:
            M.add((u, v))
    
    return M

10.2 Yao原理:期望竞争比下界

Yao原理连接随机化在线算法和输入分布:

一个随机化算法 的期望竞争比满足

对所有输入 成立,当且仅当对所有输入分布 , 存在确定性算法 使得

推论:要证明随机化算法的期望竞争比下界,只需构造一个输入分布,使得所有确定性算法的期望竞争比有一个下界。

def yao_principle_lower_bound(problem, distribution, num_samples=1000):
    """
    Yao原理:通过对输入分布求期望来获得随机化下界
    
    对于任何随机化算法,其期望竞争比 ≥ min_{deterministic A} E[竞争比]
    """
    best_deterministic_ratio = float('inf')
    
    # 穷举确定性算法(理论上)
    for algorithm in enumerate_deterministic_algorithms(problem):
        ratios = []
        
        for _ in range(num_samples):
            input_instance = distribution.sample()
            ratio = algorithm.cost(input_instance) / optimal.cost(input_instance)
            ratios.append(ratio)
        
        expected_ratio = mean(ratios)
        best_deterministic_ratio = min(best_deterministic_ratio, expected_ratio)
    
    return best_deterministic_ratio

10.3 随机化负载均衡

随机分配(Random Assignment):

def random_assignment(jobs, m):
    """
    随机分配作业到机器
    
    每轮随机选择一台机器分配作业
    """
    for job in jobs:
        machine = random.randint(0, m-1)
        machines[machine].add(job)

竞争比分析

个作业随机分配到 台机器。

每台机器的负载服从

使用 Chernoff 界,可以证明:

P(L_i > (1 + \epsilon) n/m) \leq e^{-\Omega(\epsilon^2 n/m))

总体竞争比:


11. 对抗性分析与鲁棒性

11.1 自适应对抗模型

更强的对抗者:对抗者可以看到在线算法的实际决策,动态调整后续输入。

  • 轻度自适应:对抗者在看到算法决策后生成下一输入
  • 完全自适应:对抗者完全知道算法并优化最坏情况
class AdaptiveAdversary:
    """
    自适应对抗者
    
    根据算法历史决策生成最坏输入
    """
    
    def __init__(self, algorithm):
        self.algorithm = algorithm
        self.history = []
    
    def generate_worst_input(self):
        """根据算法当前状态生成下一个输入"""
        # 分析算法行为模式
        pattern = analyze_algorithm_behavior(self.algorithm)
        
        # 构造针对该模式的输入
        worst_input = construct_adversarial_input(pattern, self.history)
        
        self.history.append(worst_input)
        return worst_input

11.2 排程问题的自适应下界

定理(对两台机器排程):任何确定性在线算法的竞争比至少为

构造:前两个作业大小为 ,根据算法分配决定第三个作业大小。

def scheduling_lower_bound_adversary(machines=2):
    """
    排程问题的自适应对抗构造
    """
    jobs = [1.0, epsilon]
    
    # 观察算法对前两个作业的分配
    assignment = observe_assignment(jobs[:2], machines)
    
    # 决定第三个作业大小
    if balanced_assignment(assignment):
        # 算法平衡分配,第三作业设为大的
        jobs.append(2.0)
    else:
        # 算法不平衡,第三作业设为利用这个不平衡
        jobs.append(1.0 + epsilon)
    
    return jobs

11.3 指数后退策略

对于某些问题,随机化是抵抗自适应对抗的唯一手段。

class RandomizedExponentialBackoff:
    """
    随机化指数后退
    
    用于防止自适应对抗的主动干扰
    """
    
    def __init__(self, base_delay, max_delay):
        self.base_delay = base_delay
        self.max_delay = max_delay
    
    def wait_time(self, attempt):
        """随机化等待时间"""
        max_wait = min(self.base_delay * (2 ** attempt), self.max_delay)
        return random.uniform(0, max_wait)

12. 在线算法在AI中的应用

12.1 推荐系统

用户交互数据流式到达,推荐算法必须在线更新模型:

场景在线算法核心挑战
BanditsUCB/Thompson Sampling探索利用权衡
流推荐在线协同过滤矩阵分解的在线更新
A/B测试在线实验框架多臂老虎机变体
上下文推荐LinUCB特征与奖励的线性关系
class OnlineRecommender:
    """
    在线推荐系统
    
    结合协同过滤和上下文老虎机
    """
    
    def __init__(self, n_items, context_dim):
        self.n_items = n_items
        self.context_dim = context_dim
        
        # 每个物品的线性模型参数
        self.A = [np.eye(context_dim) for _ in range(n_items)]
        self.b = [np.zeros(context_dim) for _ in range(n_items)]
        
        self.thetas = [np.zeros(context_dim) for _ in range(n_items)]
    
    def recommend(self, user_context, epsilon=0.1):
        """基于 LinUCB 的推荐"""
        if random.random() < epsilon:
            return random.randint(0, self.n_items - 1)
        
        ucb_values = []
        for item in range(self.n_items):
            # 估计均值
            theta = np.linalg.solve(self.A[item], self.b[item])
            mean = theta @ user_context
            
            # 置信上界
            uncertainty = np.sqrt(user_context @ np.linalg.inv(self.A[item]) @ user_context)
            ucb = mean + 2 * uncertainty
            
            ucb_values.append(ucb)
        
        return np.argmax(ucb_values)
    
    def update(self, item, user_context, reward):
        """更新模型"""
        self.A[item] += user_context.reshape(-1, 1) @ user_context.reshape(1, -1)
        self.b[item] += reward * user_context

12.2 自动驾驶决策

自动驾驶中的路径规划和控制都是在线问题:

  • 部分可观测环境
  • 决策必须在实时约束内完成
  • 使用模型预测控制(MPC)等在线优化技术
class ModelPredictiveControl:
    """
    模型预测控制(MPC)
    
    在线算法:每步求解有限时域优化问题
    """
    
    def __init__(self, horizon=10, dt=0.1):
        self.horizon = horizon
        self.dt = dt
    
    def solve(self, state, reference):
        """求解优化问题得到控制序列"""
        # 简化的MPC步骤
        trajectory = [state]
        
        for t in range(self.horizon):
            # 线性化系统(在线)
            A, B = linearize_dynamics(state)
            
            # QP求解(在线)
            u = solve_qp(A, B, reference, state)
            
            # 应用控制
            state = apply_control(state, u, self.dt)
            trajectory.append(state)
        
        return trajectory[1], u  # 返回第一个控制

12.3 大语言模型推理

LLM的自回归生成是典型的在线决策过程:

  • token逐个生成
  • 不能”预知”未来内容
  • 使用束搜索(beam search)进行在线剪枝
class BeamSearch:
    """
    束搜索
    
    在线决策:每步扩展 top-k 候选
    """
    
    def __init__(self, model, beam_width=5):
        self.model = model
        self.beam_width = beam_width
    
    def generate(self, prompt, max_length=100):
        """自回归生成"""
        beams = [(prompt, 0.0, False)]  # (sequence, score, done)
        
        for _ in range(max_length):
            candidates = []
            
            for seq, score, done in beams:
                if done:
                    candidates.append((seq, score, True))
                    continue
                
                # 扩展 top-k
                logits = self.model.predict(seq)
                top_k = self.model.top_k(logits, self.beam_width)
                
                for token, log_prob in top_k:
                    new_seq = seq + [token]
                    new_score = score + log_prob
                    done = token == EOS
                    candidates.append((new_seq, new_score, done))
            
            # 保留 top-k
            beams = sorted(candidates, key=lambda x: x[1] / len(x[0]))[-self.beam_width:]
            
            if all(done for _, _, done in beams):
                break
        
        return max(beams, key=lambda x: x[1] / len(x[0]))[0]

12.4 强化学习中的在线决策

class OnlineRLAgent:
    """
    在线强化学习代理
    
    结合探索与利用
    """
    
    def __init__(self, state_space, action_space, algorithm='q_learning'):
        self.state_space = state_space
        self.action_space = action_space
        self.algorithm = algorithm
        
        # Q表或策略
        self.q_values = np.zeros((state_space, action_space))
        self.visits = np.zeros((state_space, action_space))
    
    def select_action(self, state, epsilon=0.1):
        """ε-贪婪策略"""
        if random.random() < epsilon:
            return random.randint(0, self.action_space - 1)
        
        return np.argmax(self.q_values[state])
    
    def update(self, state, action, reward, next_state, learning_rate=0.1, gamma=0.9):
        """TD更新"""
        if self.algorithm == 'q_learning':
            # 异策略:使用最优下一状态的Q值
            td_target = reward + gamma * np.max(self.q_values[next_state])
        else:
            # 同策略:使用当前策略的Q值
            td_target = reward + gamma * self.q_values[next_state, self.select_action(next_state)]
        
        td_error = td_target - self.q_values[state, action]
        self.q_values[state, action] += learning_rate * td_error
        self.visits[state, action] += 1

13. 数据流算法

13.1 数据流模型

数据流算法研究数据以高速流形式到达时的计算问题:

class DataStreamAlgorithm:
    """
    数据流算法框架
    
    约束:
    - 只能顺序扫描数据一次(或有限次)
    - 有限工作内存
    - 必须产生近似解
    """
    
    def __init__(self, stream, window_size=None):
        self.stream = stream
        self.window_size = window_size
        self.memory = {}  # 有限状态
    
    def process(self):
        """处理数据流"""
        for item in self.stream:
            self.update(item)
            if self.window_size:
                self.evict_old_items()
    
    def update(self, item):
        """更新状态"""
        raise NotImplementedError
    
    def query(self):
        """返回答案"""
        raise NotImplementedError

13.2 频率估计

class CountMinSketch:
    """
    Count-Min Sketch
    
    用于估计数据流中元素的出现频率
    空间:O(log(1/δ) * log(1/ε))
    """
    
    def __init__(self, epsilon, delta):
        self.epsilon = epsilon
        self.delta = delta
        
        # 哈希函数数量和宽度
        self.num_hashes = int(math.ceil(math.log(1 / delta)))
        self.width = int(math.ceil(math.e / epsilon))
        
        # 计数器数组
        self.counters = [[0] * self.width for _ in range(self.num_hashes)]
        
        # 哈希函数
        self.hash_functions = [
            self._generate_hash(i) for i in range(self.num_hashes)
        ]
    
    def _generate_hash(self, seed):
        """生成哈希函数 h(x) = (ax + b) mod width"""
        a = random.randint(1, self.width - 1)
        b = random.randint(0, self.width - 1)
        return lambda x: (a * hash(x) + b) % self.width
    
    def update(self, item):
        """增加计数"""
        for i, h in enumerate(self.hash_functions):
            self.counters[i][h(item)] += 1
    
    def estimate(self, item):
        """估计频率(可能偏高)"""
        estimates = [self.counters[i][self.hash_functions[i](item)] 
                     for i in range(self.num_hashes)]
        return min(estimates)  # Count-Min: 取最小估计

13.3 基数估计

class HyperLogLog:
    """
    HyperLogLog
    
    基数估计:估计集合中不同元素的数量
    空间:O(log log n) 比特每元素
    
    标准误差:约 1.04 / sqrt(m)
    """
    
    def __init__(self, p=10):
        self.p = p  # 寄存器数量 = 2^p
        self.m = 1 << p
        self.registers = [0] * self.m
    
    def add(self, item):
        """添加元素"""
        # 哈希
        h = hash(item)
        
        # 低p位确定桶
        j = h & (self.m - 1)
        
        # 高 (64-p) 位找第一个1的位置
        w = h >> self.p
        rho = 64 - self.p - int(w).bit_length() + 1 if w != 0 else 64 - self.p + 1
        
        # 更新寄存器
        self.registers[j] = max(self.registers[j], rho)
    
    def estimate(self):
        """估计基数"""
        import math
        
        # 调和平均
        m = self.m
        alpha_m = 0.7213 / (1 + 1.079 / m) if m >= 128 else self._alpha(m)
        
        Z = 1.0 / sum(2 ** -self.registers[j] for j in range(m))
        E = alpha_m * m * m * Z
        
        # 修正小基数和大基数
        if E <= 2.5 * m:
            # 使用线性计数
            zeros = self.registers.count(0)
            if zeros > 0:
                return m * math.log(m / zeros)
        
        return E
    
    def _alpha(self, m):
        """计算 alpha_m"""
        if m == 16:
            return 0.673
        elif m == 32:
            return 0.697
        elif m == 64:
            return 0.709
        else:
            return 0.7213 / (1 + 1.079 / m)

14. 核心算法对比表

问题确定性竞争比随机化竞争比下界
秘书问题1/e(随机顺序)1/e1/e
在线装箱1.71.6911.531
负载均衡
Ski Rental
在线匹配
缓存置换(k大小)kk
列表更新2
多臂老虎机N/A(后悔值)

15. 在线算法的设计原则

15.1 保守 vs 冒险策略

保守策略:优先稳定,避免高损失

def conservative_strategy(state, options):
    """
    保守策略:最小化最坏情况
    """
    worst_case_costs = []
    
    for option in options:
        worst_cost = max(simulate_worst_case(state, option))
        worst_case_costs.append(worst_cost)
    
    return options[np.argmin(worst_case_costs)]

冒险策略:追求高收益,接受高风险

def aggressive_strategy(state, options):
    """
    冒险策略:最大化最好情况
    """
    best_case_gains = []
    
    for option in options:
        best_gain = max(simulate_best_case(state, option))
        best_case_gains.append(best_gain)
    
    return options[np.argmax(best_case_gains)]

15.2 延迟决策原则

尽可能延迟不可逆决策

class DeferredDecision:
    """
    延迟决策策略
    
    尽可能收集信息后再做决定
    """
    
    def __init__(self, max_buffer=100):
        self.buffer = []
        self.max_buffer = max_buffer
    
    def add_request(self, request):
        self.buffer.append(request)
        
        if len(self.buffer) >= self.max_buffer:
            return self._batch_decide()
        
        return None
    
    def _batch_decide(self):
        """批量处理积累的请求"""
        decision = optimize_batch(self.buffer)
        self.buffer = []
        return decision

15.3 资源预留策略

class ResourceReservation:
    """
    资源预留策略
    
    预留一定比例的资源用于探索/保险
    """
    
    def __init__(self, total_resources, reserve_fraction=0.1):
        self.total = total_resources
        self.reserve = total_resources * reserve_fraction
        self.available = total_resources - self.reserve
    
    def allocate(self, request, min_required):
        """
        分配资源,保留保险储量
        """
        if min_required > self.available:
            # 不能满足需求
            return None
        
        allocation = min(request, self.available)
        self.available -= allocation
        
        return allocation
    
    def release(self, amount):
        """释放资源"""
        self.available = min(self.available + amount, self.total - self.reserve)

参考来源

  1. Borodin, A., & El-Yaniv, R. (1998). Online Computation and Competitive Analysis. Cambridge University Press.
  2. Sleator, D. D., & Tarjan, R. E. (1985). Amortized Efficiency of List Update and Paging Rules. CACM.
  3. Graham, R. L. (1966). Bounds for Certain Multiprocessing Anomalies. Bell System Technical Journal.
  4. Karp, R. M. (1992). On-Line Algorithms. On-Line Algorithms.
  5. Mehta, A., Saberi, A., Vazirani, U., & Vazirani, V. (2005). AdWords and Generalized Online Matching. JACM.
  6. Belady, L. A. (1966). A Study of Replacement Algorithms for Virtual-Storage Computer. IBM Systems Journal.
  7. Auer, P., Cesa-Bianchi, N., & Fischer, P. (2002). Finite-time Analysis of the Multiarmed Bandit Problem. Machine Learning.
  8. Lai, T. L., & Robbins, H. (1985). Asymptotically Efficient Adaptive Allocation Rules. Advances in Applied Mathematics.
  9. Bansal, N., & Pruhs, K. (2010). The Geometry of Online Packing Linear Programs. Math. Oper. Res.
  10. Buchbinder, N., & Naor, J. (2009). The Design of Competitive Online Algorithms via a Primal-Dual Approach. Foundations and Trends in Theoretical Computer Science.

相关文档