在线算法详解
关键词速览
| 关键词 | 解释 |
|---|---|
| 竞争分析 | 在线算法与最优离线算法的比较框架 |
| 竞争比 | 在线算法性能与最优算法的比值上界 |
| 最优在线决策 | 基于部分信息的顺序决策问题 |
| Secretary Problem | 经典的选择最优候选人问题 |
| 在线装箱 | 物品序列的在线分配 |
| 负载均衡 | 任务分配到多台机器 |
| Ski Rental | 在线租买决策问题 |
| 对抗性输入 | 分析在线算法的输入模型 |
| 随机化在线 | 使用随机性的在线算法 |
| Yao原理 | 期望竞争比的下界证明 |
| 缓存置换 | 页面替换的在线策略 |
| LRU | 最近最少使用策略 |
| 分页算法 | 虚拟内存管理 |
| 列表更新 | 自我组织的线性搜索 |
| 多臂老虎机 | 探索利用权衡的数学模型 |
| 上置信界 | UCB算法 |
| 注册模型 | 在线学习的形式化框架 |
| 后悔值 | 在线学习与最优的差距 |
| 次线性算法 | 只需次线性信息访问的算法 |
| 流算法 | 数据流模型下的在线处理 |
摘要
在线算法研究在信息不完整条件下进行决策的理论与实践。与离线算法拥有全部输入不同,在线算法必须在收到每个输入后立即做出不可撤回的决定。本文系统阐述竞争分析框架,通过经典案例(秘书问题、在线装箱、负载均衡、Ski Rental问题)展示问题建模与算法设计技术,并探讨随机化在线算法的优势。在线算法的思想广泛应用于操作系统调度、网络路由、搜索引擎排序和推荐系统等AI领域。
在线算法的研究起源于20世纪70年代的”表调度”(list scheduling)和”分页”(paging)问题。1975年,Graham对调度问题的开创性工作引入了竞争分析的概念。1979年,Sleator和Tarjan发表了关于自我调整列表(self-adjusting lists)的经典论文,开启了在线算法理论分析的序幕。此后,Karp、Borodin和El-Yaniv等学者系统发展了竞争分析的理论框架。
本文将覆盖以下内容:首先是竞争分析的数学基础与输入模型;其次是秘书问题、在线装箱、负载均衡等经典问题的深入分析;然后是缓存置换算法、列表更新等操作系统中的在线问题;接着探讨多臂老虎机等在线学习框架;再讨论随机化在线算法与Yao原理;最后展示在线算法在AI和现代计算中的应用。
1. 在线算法的基本框架
1.1 在线与离线问题的对比
| 特征 | 离线算法 | 在线算法 |
|---|---|---|
| 输入 | 完整输入提前已知 | 逐步到达 |
| 决策时机 | 全局优化后决策 | 即时决策 |
| 后悔度 | 无 | 有 |
| 分析框架 | 近似比/精确性 | 竞争分析 |
| 可逆性 | 决策可修改 | 决策不可撤回 |
| 信息量 | 完备信息 | 部分信息 |
核心挑战:在线算法必须在信息不完全时做出选择,且决策通常不可撤销。
在线性(Onliness)的来源:
- 时间在线性:数据随时间到达,必须实时处理
- 空间在线性:数据存储超出内存,需要决定保留什么
- 结构在线性:图的顶点逐步暴露,边连接时需决策
在线性程度的光谱:
完全离线 ←————————————————————→ 完全在线
| |
| 半在线(部分信息已知) |
| | |
| 批处理(有限 lookahead) |
| 预约式(可延迟) |
| |
←————————————— 决策可逆 ———————————————→
1.2 竞争分析的定义
竞争分析是在线算法性能评估的标准方法。设 为算法 在输入 上的代价, 为最优离线算法在相同输入上的代价。
竞争比的定义:
一个在线算法 是 -竞争的,当且仅当存在常数 使得
对所有输入 成立。
渐进竞争比的定义:
当 时,若 ,则算法是渐近 -竞争的。
Note
通常关注渐进竞争比,即忽略常数 ,要求 对足够大的输入成立。
最小化 vs 最大化问题:
- 最小化问题:()
- 最大化问题:()
1.3 输入模型分类
| 模型 | 描述 | 分析方法 | 例子 |
|---|---|---|---|
| 对抗性模型 | 输入由敌手生成,最坏情况 | 竞争比 | 经典竞争分析 |
| 随机模型 | 输入服从已知分布 | 期望竞争比 | 概率分析 |
| 确定性子分布 | 输入来自任意分布 | 通用性 | Yao原理 |
| 自适应对抗 | 敌手可观测算法决策 | 动态策略 | 追踪分析 |
对抗性模型的强度层次:
- ** oblivious 对抗**:在看到算法决策之前固定整个输入
- 轻度自适应对抗:每次决策后生成下一个输入
- 完全自适应对抗:知道算法的完整历史并优化最坏情况
def adversarial_input_generation(problem, algorithm):
"""
对抗性输入生成器
oblivious: 一次性生成所有输入
adaptive: 根据算法决策生成下一输入
"""
if model == 'oblivious':
# 敌手在算法开始前生成完整输入
input_sequence = generate_worst_case_input(problem, algorithm)
return algorithm.process(input_sequence)
elif model == 'adaptive':
# 敌手根据算法决策动态生成输入
remaining = generate_full_input(problem)
while not algorithm.is_done():
next_input = remaining.pop_adaptive(algorithm.state)
algorithm.process_single(next_input)
return algorithm.total_cost()1.4 竞争分析的数学基础
竞争比的上界:构造性地证明算法在任意输入下表现良好。
竞争比的下界:证明不存在任何在线算法能超过某个竞争比。
分离引理(Lower Bound Separator):
若要证明问题 的竞争比下界为 ,需要:
- 构造一族输入实例
- 对任何在线算法 ,存在 使得
2. 秘书问题:最优停止理论
2.1 问题描述
秘书问题(Secretary Problem / Best Choice Problem)是最优停止理论的经典案例:
问题设定:
- 面试 个候选人
- 候选人以随机顺序到达
- 每次面试后必须立即决定是否录用
- 一旦拒绝不可召回
- 目标:选择最佳候选人的概率最大
为什么称为”秘书问题”:
这个问题最早由Martin Gardner于1960年在《科学美国人》上提出,模拟的是选择最佳秘书的场景。现在已扩展到招聘、购房、投资等”在有限选项中做一次性选择”的场景。
问题的数学形式化:
设 是候选人的独立同分布随机变量(表示质量/适配度),排列统计量为 。目标是最大化选中 的概率。
2.2 最优策略:观察-选择规则
def secretary_strategy(n, observe_fraction=1/math.e):
"""
秘书问题的最优策略
策略:观察前 r 个候选人,但不选择
从第 r+1 个开始,选择第一个比之前所有候选人都好的
参数:r = floor(n/e) 时成功概率最大,约为 1/e ≈ 0.3679
"""
import math
r = int(n * observe_fraction)
best_observed = -float('inf')
for i in range(1, n + 1):
rating = get_candidate_rating(i) # 假设有这个函数
if i <= r:
# 观察期:只记录,不选择
best_observed = max(best_observed, rating)
else:
# 选择期:选择第一个超过历史最佳的
if rating > best_observed:
return i # 选择第 i 个候选人
# 如果没有更好的,选择最后一个
return n渐进最优策略:
def secretary_strategy_asymptotic(n):
"""
渐近最优策略
随着 n 增大,1/e 是最优的观察比例
"""
import math
r = int(n / math.e)
# 其余逻辑同上
return r2.3 最优观察期分析
定理:当 时,成功概率收敛到 。
推导:
设选中的候选人是第 个,且被选中当且仅当它是最佳且 。
由于随机顺序,第 个是最佳的先验概率是 。
在观察到 时,前面 个候选人中最佳的排名是均匀分布的 。因此它是前 个最佳的概率是 。
当 足够大且 时,
最大化 得到 ,此时成功概率为 。
Example
招聘场景:面试100人,观察前37人不录用,从第38人开始录用第一个比之前所有都优秀的人,成功概率约36.8%。
2.4 秘书问题的变种
变种1:允许选择多个候选人
def secretary_k_choices(n, k, m=None):
"""
选择最优的 k 个候选人
m: 观察期长度,通常 m = n * k / (k+1)
"""
import math
if m is None:
m = int(n * k / (k + 1))
# 观察期
top_k_observed = get_top_k(n, k)
# 选择期:选前 k 个超过观察期最佳的选择
selected = []
threshold = min(top_k_observed)
for i in range(m + 1, n + 1):
if get_candidate_rating(i) > threshold:
selected.append(i)
if len(selected) == k:
break
return selected变种2:伯努利秘书问题
候选人以一定概率 被录用(不录用则继续),目标是最大化被录用且最优的概率。
def bernoulli_secretary(n, p):
"""
伯努利秘书问题
每次面试后,以概率 p 决定是否录用
"""
import math
r = int(n / math.e)
best_observed = -float('inf')
for i in range(1, n + 1):
rating = get_candidate_rating(i)
if i <= r:
best_observed = max(best_observed, rating)
else:
if rating > best_observed and random.random() < p:
return i
return n变种3:带价值的秘书问题
每个候选人不仅有质量还有雇佣成本,目标是最大化质量-成本比。
3. 在线装箱问题
3.1 问题定义
在线装箱(Online Bin Packing)问题:
- 输入:物品序列 ,每个物品大小
- 目标:将物品装入最少数量的箱子(容量为1)
问题的应用场景:
- 内存分配:将进程加载到内存页框
- 集装箱装载:货物装箱问题
- 资源分配:云服务器资源调度
- 数据包分片:网络传输中的分片重组
3.2 经典在线装箱算法
Next Fit (NF):
class NextFitBinPacker:
"""
Next Fit 算法
策略:当前箱子装不下时开新箱
竞争比:2
优点:简单,O(1) 空间
缺点:浪费空间
"""
def __init__(self, capacity=1.0):
self.capacity = capacity
self.current_bin = []
self.current_load = 0
self.num_bins = 0
def add_item(self, size):
if size > self.capacity:
raise ValueError("Item too large")
if self.current_load + size <= self.capacity:
self.current_bin.append(size)
self.current_load += size
else:
# 开新箱
self.num_bins += 1
self.current_bin = [size]
self.current_load = size
def close(self):
"""关闭当前箱子"""
if self.current_bin:
self.num_bins += 1
self.current_bin = []
self.current_load = 0Best Fit (BF):
class BestFitBinPacker:
"""
Best Fit 算法
策略:将物品放入能容纳它且当前最满的箱子
竞争比:1.7
"""
def __init__(self, capacity=1.0):
self.capacity = capacity
self.bins = [] # (load, bin_id)
self.bin_contents = {}
def add_item(self, size):
# 找到最满且能容纳物品的箱子
best_idx = -1
best_load = self.capacity + 1
for idx, (load, _) in enumerate(self.bins):
if load + size <= self.capacity and load + size > best_load:
best_load = load + size
best_idx = idx
if best_idx == -1:
# 创建新箱
self.bins.append((size, len(self.bins)))
self.bin_contents[len(self.bins) - 1] = [size]
else:
# 更新最佳箱子
old_load, bin_id = self.bins[best_idx]
self.bins[best_idx] = (old_load + size, bin_id)
self.bin_contents[bin_id].append(size)First Fit (FF):
class FirstFitBinPacker:
"""
First Fit 算法
策略:从第一个箱子开始找第一个能容纳物品的
竞争比:1.7
实现:使用有序结构优化到 O(n log n)
"""
def __init__(self, capacity=1.0):
self.capacity = capacity
# 按负载排序的箱子
self.bins = [] # [(load, bin_id, contents)]
def _find_bin(self, size):
"""二分查找能容纳物品的最左边箱子"""
lo, hi = 0, len(self.bins)
while lo < hi:
mid = (lo + hi) // 2
if self.bins[mid][0] + size <= self.capacity:
hi = mid
else:
lo = mid + 1
return lo if lo < len(self.bins) and self.bins[lo][0] + size <= self.capacity else -1
def add_item(self, size):
idx = self._find_bin(size)
if idx == -1:
# 创建新箱
self.bins.append((size, len(self.bins), [size]))
else:
# 更新箱子
load, bin_id, contents = self.bins[idx]
self.bins[idx] = (load + size, bin_id, contents + [size])Harmonic Fit:
class HarmonicFitBinPacker:
"""
Harmonic Fit 算法
策略:按物品大小分类处理
将大小区间 (1/(k+1), 1/k] 的物品分组处理
竞争比:1.691...
"""
def __init__(self, capacity=1.0, k=3):
self.capacity = capacity
self.k = k
self.bins = [[] for _ in range(k)]
def _get_class(self, size):
"""物品属于哪个类别"""
for i in range(1, self.k + 1):
if size > 1.0 / (i + 1) and size <= 1.0 / i:
return i - 1
return self.k
def add_item(self, size):
item_class = self._get_class(size)
self.bins[item_class].append(size)3.3 竞争比分析
Next Fit 的竞争比:
定理:NF的渐近竞争比是2。
证明:设 是最优箱子数。任何NF创建的箱子(除了最后一个)至少半满。因此:
FF 和 BF 的竞争比:
定理:FF和BF的竞争比是 (更精确地,)。
Harmonic Fit 的竞争比:
| 变种 | 竞争比 |
|---|---|
| Harmonic | |
| Refined Harmonic | |
| Asymptotic PTAS |
3.4 离线近似算法:FFD
FFD是一个经典的离线近似算法:
def ffd(items, capacity=1.0):
"""
First Fit Decreasing (FFD)
1. 按物品大小降序排列
2. 依次使用 First Fit 策略装箱
近似比:11/9 * OPT + 6/9 ≈ 1.222 * OPT
"""
# 按大小降序排序
sorted_items = sorted(items, reverse=True)
# 使用 First Fit
packer = FirstFitBinPacker(capacity)
for item in sorted_items:
packer.add_item(item)
return packer.num_bins()4. 在线负载均衡
4.1 问题定义
在线负载均衡(Online Load Balancing):
- 台机器
- 作业序列 依次到达
- 作业 有处理时间
- 目标:最小化最大机器负载(makespan)
问题的应用场景:
- 云计算调度:将任务分配到服务器
- 数据中心管理:虚拟机迁移
- GPU调度:深度学习训练任务分配
- 实时系统:任务优先级调度
4.2 Graham的List Scheduling算法
Graham的LPT算法(离线):按处理时间降序分配
def lpt_load_balancing(jobs, m):
"""
LPT (Longest Processing Time first) 离线算法
近似比:4/3 - 1/(3m)
"""
sorted_jobs = sorted(jobs, reverse=True)
loads = [0] * m
for job in sorted_jobs:
min_machine = loads.index(min(loads))
loads[min_machine] += job
return max(loads)在线List Scheduling (LS):
def list_scheduling_load_balancing(machines, jobs):
"""
List Scheduling 在线算法
策略:将每个作业分配到当前负载最小的机器
竞争比:2 - 1/m
"""
for job in jobs:
# 分配到当前负载最小的机器
min_machine = min(range(len(machines)), key=lambda i: machines[i].load)
machines[min_machine].add(job)
return max(m.load for m in machines)4.3 竞争比分析
定理:LS的竞争比是 。
证明:
设 是最优makespan。
关键引理:对于任意机器 ,LS将其最终负载分解为两部分:
- 最后一个分配给 的作业 :(因为 是全局下界)
- 其他作业的总和
因此 。
更精确的分析考虑作业分配顺序的影响,得到 。
4.4 竞争比下界
定理:任何确定性在线算法的竞争比至少为 。
构造:前 个作业大小为1,最后一个作业大小为 。
- 最优离线:可以找到平衡分配,makespan =
- 在线算法:前 个作业分散到 台机器,大作业必须放到某台已有负载1的机器,makespan =
比值趋于2当 增大时。
def worst_case_load_balancing(m, algorithm):
"""
构造负载均衡的最坏输入
"""
jobs = [1.0] * (m - 1) # 小作业
jobs.append(float(m)) # 大作业
return algorithm(jobs, m)5. Ski Rental问题
5.1 问题描述
Ski Rental 问题模拟在线租买决策:
问题设定:
- 滑雪 天( 未知)
- 每天租金 元,或一次性购买 元
- 每天结束后决定是否继续滑雪
- 目标:最小化总花费
问题的应用场景:
- 软件许可:按月订阅 vs 永久购买
- 云服务:按需付费 vs 预留实例
- 设备租赁:短期租用 vs 长期拥有
- 健身会员:次卡 vs 年卡
5.2 最优在线策略
关键洞察:存在一个最优的”买断点”。
def ski_rental_optimal(r, p, days_actual):
"""
Ski Rental 的最优策略
策略:在租了 p/r 天后购买装备
竞争比:2 - r/p
"""
days_rented = 0
total_cost = 0
while days_rented < days_actual:
if days_rented * r >= p:
# 购买
total_cost = p
break
else:
# 继续租
total_cost = (days_rented + 1) * r
days_rented += 1
return total_cost
def ski_rental_optimal_with_known_rho(rho):
"""
当租买价格比 ρ = p/r 已知时
最优策略:租 floor(ρ) 天后购买
"""
m = int(math.floor(rho))
return min(m * r, p)5.3 竞争比分析
竞争比分析:
设最优租期为 天。
- 若 :只租不买,花费 ,在线算法同
- 若 :最优为购买,花费
在线算法花费:
竞争比:
def ski_rental_competitive_ratio(r, p):
"""
计算 Ski Rental 的竞争比上界
"""
return max(
max(d * r for d in range(1, int(p/r) + 1)),
p + (p/r - 1) * r
) / p5.4 贝叶斯扩展
若 服从分布 ,可以计算最优策略的期望竞争比。
def bayesian_ski_rental(r, p, distribution):
"""
贝叶斯 Ski Rental
已知滑雪天数的概率分布
"""
import numpy as np
# 离散分布
probs = distribution['probabilities']
days = distribution['days']
# 动态规划计算最优买断点
# dp[d] = 从第d天开始的最优期望成本
dp = np.zeros(max(days) + 1)
for d in range(max(days), -1, -1):
if d == max(days):
dp[d] = min(r, p) # 最后一天
else:
rent_today = r + dp[d + 1]
buy_now = p
dp[d] = min(rent_today, buy_now)
# 期望成本
expected_cost = sum(dp[0] * p for p, d in zip(probs, days))
return expected_cost6. 缓存置换算法
6.1 问题定义
分页问题(Paging Problem):
- 缓存有 个页框
- 请求序列 (每个 是页号)
- 缓存满时必须驱逐一页
- 目标:最小化缓存未命中(page fault)次数
问题的应用场景:
- 虚拟内存管理:页面置换
- CPU缓存:缓存行替换
- Web缓存:CDN内容缓存
- 数据库缓冲池:数据页置换
6.2 经典置换策略
FIFO(First In First Out):
class FIFOCache:
"""
FIFO 缓存置换算法
驱逐最早进入的页面
竞争比:k(最坏情况)
"""
def __init__(self, capacity):
self.capacity = capacity
self.cache = []
self.queue = deque()
def access(self, page):
if page in self.cache:
return True # 命中
# 未命中
if len(self.cache) < self.capacity:
self.cache.append(page)
self.queue.append(page)
else:
# 驱逐队首
evicted = self.queue.popleft()
self.cache.remove(evicted)
self.cache.append(page)
self.queue.append(page)
return False # 未命中LRU(Least Recently Used):
class LRUCache:
"""
LRU 缓存置换算法
驱逐最久未使用的页面
竞争比:k(最坏情况)
实际中表现良好
O(1) 实现:双向链表 + 哈希表
"""
def __init__(self, capacity):
self.capacity = capacity
self.cache = {} # key -> node
# 双向链表:head <-> ... <-> tail
self.head = Node('head')
self.tail = Node('tail')
self.head.next = self.tail
self.tail.prev = self.head
def _remove(self, node):
node.prev.next = node.next
node.next.prev = node.prev
def _add_to_front(self, node):
node.next = self.head.next
node.prev = self.head
self.head.next.prev = node
self.head.next = node
def access(self, key):
if key in self.cache:
node = self.cache[key]
self._remove(node)
self._add_to_front(node)
return True
# 未命中
if len(self.cache) >= self.capacity:
# 驱逐最久未使用的(tail前一个)
lru = self.tail.prev
self._remove(lru)
del self.cache[lru.key]
new_node = Node(key)
self.cache[key] = new_node
self._add_to_front(new_node)
return FalseLFU(Least Frequently Used):
class LFUCache:
"""
LFU 缓存置换算法
驱逐使用频率最低的页面
"""
def __init__(self, capacity):
self.capacity = capacity
self.cache = {}
self.freq_table = {} # freq -> {keys}
self.min_freq = 0
def access(self, page):
if page in self.cache:
# 更新频率
freq = self.cache[page]['freq']
self.cache[page]['freq'] = freq + 1
# 从旧频率集合移除
self.freq_table[freq].remove(page)
if not self.freq_table[freq]:
del self.freq_table[freq]
# 添加到新频率集合
new_freq = freq + 1
if new_freq not in self.freq_table:
self.freq_table[new_freq] = set()
self.freq_table[new_freq].add(page)
return True
# 未命中
if len(self.cache) >= self.capacity:
# 驱逐频率最低的
lfu_key = next(iter(self.freq_table[self.min_freq]))
del self.cache[lfu_key]
self.freq_table[self.min_freq].remove(lfu_key)
self.cache[page] = {'freq': 1}
if 1 not in self.freq_table:
self.freq_table[1] = set()
self.freq_table[1].add(page)
self.min_freq = 1
return False6.3 竞争比分析
FIFO和LRU的竞争比:
定理:FIFO和LRU的竞争比都是 (缓存大小)。
例子:请求序列 (重复)使得FIFO和LRU都产生每次未命中。
LRU的实用性:
虽然最坏竞争比差,但实际中LRU表现良好。原因:
- 访问局部性原理
- 工作集大小通常小于缓存
Random算法的竞争比:
import random
class RandomCache:
"""
Random 缓存置换
随机驱逐一个页面
竞争比:k(与FIFO/LRU相同)
但避免了某些病态序列
"""
def __init__(self, capacity):
self.capacity = capacity
self.cache = set()
def access(self, page):
if page in self.cache:
return True
if len(self.cache) < self.capacity:
self.cache.add(page)
else:
# 随机驱逐
evicted = random.choice(list(self.cache))
self.cache.remove(evicted)
self.cache.add(page)
return False6.4 离线最优算法:LRU-OPT
Belady的MIN算法:
class BeladyMINCache:
"""
Belady's MIN (或 OPT) 算法
驱逐将来最晚被使用的页面
这是最优的离线算法
竞争比:1(与最优相比)
但需要预知未来
"""
def __init__(self, capacity, future_requests=None):
self.capacity = capacity
self.cache = set()
self.future = future_requests # 预知的未来请求
def access(self, page, future=None):
if future is not None:
self.future = future
if page in self.cache:
return True
if len(self.cache) < self.capacity:
self.cache.add(page)
else:
# 找到将来最晚使用的
evict_page = None
latest_use = -1
for p in self.cache:
# 找到 p 下次出现的位置
if p in self.future:
next_use = self.future.index(p)
else:
next_use = float('inf') # 永不出现
if next_use > latest_use:
latest_use = next_use
evict_page = p
self.cache.remove(evict_page)
self.cache.add(page)
return False7. 列表更新问题
7.1 问题定义
列表更新问题(List Update Problem):
- 有序列表
- 访问请求序列
- 访问成本:列表中元素位置
- 交换成本:移位操作
- 目标:最小化总成本
应用场景:
- 拼写检查:词典查找
- 文件系统:目录遍历
- 推荐系统:列表排序优化
7.2 经典算法
Move-To-Front (MTF):
class MoveToFrontList:
"""
Move-To-Front (MTF) 算法
策略:访问后移到列表前端
竞争比:2(确定性)
"""
def __init__(self, items):
self.items = list(items)
def access(self, item):
# 找到位置
idx = self.items.index(item)
# 成本 = 位置(1-indexed)
cost = idx + 1
# 移到前端
self.items.pop(idx)
self.items.insert(0, item)
return costTranspose:
class TransposeList:
"""
Transpose 算法
策略:访问后与前一个元素交换
"""
def access(self, item):
idx = self.items.index(item)
cost = idx + 1
if idx > 0:
self.items[idx - 1], self.items[idx] = self.items[idx], self.items[idx - 1]
return costFrequency Count:
class FrequencyCountList:
"""
Frequency Count 算法
策略:按访问频率排序
离线版本:重排列表使常用项靠前
"""
def __init__(self, items):
self.items = list(items)
self.count = {item: 0 for item in items}
def access(self, item):
idx = self.items.index(item)
cost = idx + 1
# 增加计数
self.count[item] += 1
return cost
def reorganize(self):
"""重新组织列表(按频率)"""
self.items.sort(key=lambda x: -self.count[x])7.3 竞争分析
MTF的竞争比:
定理:MTF是2-竞争的(对任意输入)。
直觉:MTF的优势来自局部性。访问模式的自相关越大,MTF表现越好。
MTF与LFU的比较:
- MTF:响应性强(立即调整)
- LFU:历史导向(长期频率)
8. 多臂老虎机问题
8.1 问题定义
多臂老虎机(Multi-Armed Bandit)是在线学习中的核心问题:
问题设定:
- 个臂(选项)
- 每个臂 有未知奖励分布
- 每轮选择一个臂,获得奖励
- 目标:最大化累积奖励
- 核心挑战:探索与利用的权衡(exploration-exploitation tradeoff)
应用场景:
- 推荐系统:探索新内容 vs 推荐已知喜欢的
- 临床试验:测试新药 vs 使用已知有效的药
- 广告投放:测试新广告 vs 投放高回报广告
- 网络路由:探索新路由 vs 使用已知最短路径
8.2 探索策略
随机探索(Random Exploration):
import random
class RandomExploration:
"""
完全随机探索
优点:渐近最优
缺点:收敛极慢
"""
def __init__(self, n_arms):
self.n_arms = n_arms
def select_arm(self):
return random.randint(0, self.n_arms - 1)贪婪策略(Greedy):
class EpsilonGreedy:
"""
ε-贪婪算法
以概率 ε 探索(随机选择)
以概率 1-ε 利用(选择当前最佳)
"""
def __init__(self, n_arms, epsilon=0.1):
self.n_arms = n_arms
self.epsilon = epsilon
self.counts = [0] * n_arms
self.values = [0.0] * n_arms
def select_arm(self):
if random.random() > self.epsilon:
# 利用:选择当前估计值最高的臂
return self.values.index(max(self.values))
else:
# 探索:随机选择
return random.randint(0, self.n_arms - 1)
def update(self, arm, reward):
self.counts[arm] += 1
n = self.counts[arm]
# 增量更新估计值
self.values[arm] = ((n - 1) * self.values[arm] + reward) / n8.3 UCB算法
上置信界(Upper Confidence Bound):
import math
class UCB1:
"""
UCB1 算法
核心思想:平衡估计均值和不确定性
UCB = 估计均值 + 置信上界
竞争比:O(sqrt(log T))
渐近最优
"""
def __init__(self, n_arms):
self.n_arms = n_arms
self.counts = [0] * n_arms
self.values = [0.0] * n_arms
self.total_counts = 0
def select_arm(self):
# 首先尝试每个臂一次
for arm in range(self.n_arms):
if self.counts[arm] == 0:
return arm
# 计算 UCB
ucb_values = []
for arm in range(self.n_arms):
bonus = math.sqrt(
2 * math.log(self.total_counts) / self.counts[arm]
)
ucb_values.append(self.values[arm] + bonus)
return ucb_values.index(max(ucb_values))
def update(self, arm, reward):
self.total_counts += 1
self.counts[arm] += 1
n = self.counts[arm]
self.values[arm] = ((n - 1) * self.values[arm] + reward) / nUCB的理论分析:
定理:UCB1的期望后悔值满足:
其中 是第 个臂与最优臂的差距。
8.4 Thompson采样
import numpy as np
from scipy import stats
class ThompsonSampling:
"""
Thompson Sampling (Bayesian)
假设每个臂的奖励服从 Beta(α, β) 分布
每轮从后验分布采样,选择期望最高的臂
"""
def __init__(self, n_arms, alpha_prior=1, beta_prior=1):
self.n_arms = n_arms
self.alpha = [alpha_prior] * n_arms
self.beta = [beta_prior] * n_arms
def select_arm(self):
samples = []
for arm in range(self.n_arms):
# 从 Beta 分布采样
sample = np.random.beta(self.alpha[arm], self.beta[arm])
samples.append(sample)
return samples.index(max(samples))
def update(self, arm, reward):
# 更新后验参数
if reward == 1:
self.alpha[arm] += 1
else:
self.beta[arm] += 19. 在线学习与后悔值
9.1 注册模型
注册模型(Experts Advice)是分析在线学习的通用框架:
class OnlineLearningFramework:
"""
在线学习的注册模型
每轮:
1. 专家提出建议
2. 算法选择一个专家
3. 观察损失
4. 更新
"""
def __init__(self, n_experts):
self.n_experts = n_experts
self.weights = [1.0] * n_experts
def predict(self):
"""加权平均预测"""
total_weight = sum(self.weights)
probs = [w / total_weight for w in self.weights]
return random.choices(range(self.n_experts), weights=probs)[0]
def update(self, expert, loss):
"""按损失更新权重"""
self.weights[expert] *= (1 - loss)9.2 加权多数算法
class WeightedMajority:
"""
加权多数算法
用于二分类问题
"""
def __init__(self, n_experts, eta=0.5):
self.n_experts = n_experts
self.weights = [1.0] * n_experts
self.eta = eta # 学习率
def predict(self):
"""加权投票"""
total_weight = sum(self.weights)
# 计算加权投票
pos_weight = 0
neg_weight = 0
# 假设专家给出 +1 或 -1 预测
for expert, weight in enumerate(self.weights):
prediction = self.expert_predictions[expert]
if prediction > 0:
pos_weight += weight
else:
neg_weight += weight
return 1 if pos_weight > neg_weight else -1
def update(self, expert, prediction, actual):
"""更新权重"""
if prediction != actual:
self.weights[expert] *= (1 - self.eta)9.3 Follow the Leader (FTL)
class FollowTheLeader:
"""
Follow the Leader (FTL)
每轮选择目前为止表现最好的专家
"""
def __init__(self, n_experts):
self.n_experts = n_experts
self.cumulative_losses = [0.0] * n_experts
def predict(self):
"""选择累积损失最小的专家"""
return self.cumulative_losses.index(min(self.cumulative_losses))
def update(self, expert, loss):
self.cumulative_losses[expert] += lossFTL的问题:容易过拟合当前数据,可能被对抗性输入击败。
9.4 Follow the Regularized Leader (FTRL)
class FTRLProximal:
"""
Follow the Regularized Leader (FTRL-Proximal)
添加正则化防止权重剧烈变化
"""
def __init__(self, n_actions, learning_rate, regularization=1.0):
self.n_actions = n_actions
self.learning_rate = learning_rate
self.regularization = regularization
self.z = [0.0] * n_actions
def predict(self):
"""选择最小化代理损失的行动"""
best_action = 0
best_value = float('inf')
for action in range(self.n_actions):
# L1 正则化代理
value = self.z[action]
if self.z[action] > 0:
value -= self.learning_rate * self.regularization
else:
value += self.learning_rate * self.regularization
if value < best_value:
best_value = value
best_action = action
return best_action
def update(self, action, gradient):
self.z[action] += gradient10. 随机化在线算法
10.1 随机化的优势
确定性在线算法在某些问题上无法改进,但随机化可以打破对称性:
例子:在线匹配
def randomized_online_matching(G):
"""
随机化在线最大匹配
竞争比:O(log n) vs 确定性算法的 O(√n)
"""
import random
# 随机处理顺序
edges = list(G.edges())
random.shuffle(edges)
M = set()
for (u, v) in edges:
if u not in M and v not in M:
M.add((u, v))
return M10.2 Yao原理:期望竞争比下界
Yao原理连接随机化在线算法和输入分布:
一个随机化算法 的期望竞争比满足
对所有输入 成立,当且仅当对所有输入分布 , 存在确定性算法 使得
推论:要证明随机化算法的期望竞争比下界,只需构造一个输入分布,使得所有确定性算法的期望竞争比有一个下界。
def yao_principle_lower_bound(problem, distribution, num_samples=1000):
"""
Yao原理:通过对输入分布求期望来获得随机化下界
对于任何随机化算法,其期望竞争比 ≥ min_{deterministic A} E[竞争比]
"""
best_deterministic_ratio = float('inf')
# 穷举确定性算法(理论上)
for algorithm in enumerate_deterministic_algorithms(problem):
ratios = []
for _ in range(num_samples):
input_instance = distribution.sample()
ratio = algorithm.cost(input_instance) / optimal.cost(input_instance)
ratios.append(ratio)
expected_ratio = mean(ratios)
best_deterministic_ratio = min(best_deterministic_ratio, expected_ratio)
return best_deterministic_ratio10.3 随机化负载均衡
随机分配(Random Assignment):
def random_assignment(jobs, m):
"""
随机分配作业到机器
每轮随机选择一台机器分配作业
"""
for job in jobs:
machine = random.randint(0, m-1)
machines[machine].add(job)竞争比分析:
设 个作业随机分配到 台机器。
每台机器的负载服从 。
使用 Chernoff 界,可以证明:
P(L_i > (1 + \epsilon) n/m) \leq e^{-\Omega(\epsilon^2 n/m))总体竞争比:
11. 对抗性分析与鲁棒性
11.1 自适应对抗模型
更强的对抗者:对抗者可以看到在线算法的实际决策,动态调整后续输入。
- 轻度自适应:对抗者在看到算法决策后生成下一输入
- 完全自适应:对抗者完全知道算法并优化最坏情况
class AdaptiveAdversary:
"""
自适应对抗者
根据算法历史决策生成最坏输入
"""
def __init__(self, algorithm):
self.algorithm = algorithm
self.history = []
def generate_worst_input(self):
"""根据算法当前状态生成下一个输入"""
# 分析算法行为模式
pattern = analyze_algorithm_behavior(self.algorithm)
# 构造针对该模式的输入
worst_input = construct_adversarial_input(pattern, self.history)
self.history.append(worst_input)
return worst_input11.2 排程问题的自适应下界
定理(对两台机器排程):任何确定性在线算法的竞争比至少为 。
构造:前两个作业大小为 ,根据算法分配决定第三个作业大小。
def scheduling_lower_bound_adversary(machines=2):
"""
排程问题的自适应对抗构造
"""
jobs = [1.0, epsilon]
# 观察算法对前两个作业的分配
assignment = observe_assignment(jobs[:2], machines)
# 决定第三个作业大小
if balanced_assignment(assignment):
# 算法平衡分配,第三作业设为大的
jobs.append(2.0)
else:
# 算法不平衡,第三作业设为利用这个不平衡
jobs.append(1.0 + epsilon)
return jobs11.3 指数后退策略
对于某些问题,随机化是抵抗自适应对抗的唯一手段。
class RandomizedExponentialBackoff:
"""
随机化指数后退
用于防止自适应对抗的主动干扰
"""
def __init__(self, base_delay, max_delay):
self.base_delay = base_delay
self.max_delay = max_delay
def wait_time(self, attempt):
"""随机化等待时间"""
max_wait = min(self.base_delay * (2 ** attempt), self.max_delay)
return random.uniform(0, max_wait)12. 在线算法在AI中的应用
12.1 推荐系统
用户交互数据流式到达,推荐算法必须在线更新模型:
| 场景 | 在线算法 | 核心挑战 |
|---|---|---|
| Bandits | UCB/Thompson Sampling | 探索利用权衡 |
| 流推荐 | 在线协同过滤 | 矩阵分解的在线更新 |
| A/B测试 | 在线实验框架 | 多臂老虎机变体 |
| 上下文推荐 | LinUCB | 特征与奖励的线性关系 |
class OnlineRecommender:
"""
在线推荐系统
结合协同过滤和上下文老虎机
"""
def __init__(self, n_items, context_dim):
self.n_items = n_items
self.context_dim = context_dim
# 每个物品的线性模型参数
self.A = [np.eye(context_dim) for _ in range(n_items)]
self.b = [np.zeros(context_dim) for _ in range(n_items)]
self.thetas = [np.zeros(context_dim) for _ in range(n_items)]
def recommend(self, user_context, epsilon=0.1):
"""基于 LinUCB 的推荐"""
if random.random() < epsilon:
return random.randint(0, self.n_items - 1)
ucb_values = []
for item in range(self.n_items):
# 估计均值
theta = np.linalg.solve(self.A[item], self.b[item])
mean = theta @ user_context
# 置信上界
uncertainty = np.sqrt(user_context @ np.linalg.inv(self.A[item]) @ user_context)
ucb = mean + 2 * uncertainty
ucb_values.append(ucb)
return np.argmax(ucb_values)
def update(self, item, user_context, reward):
"""更新模型"""
self.A[item] += user_context.reshape(-1, 1) @ user_context.reshape(1, -1)
self.b[item] += reward * user_context12.2 自动驾驶决策
自动驾驶中的路径规划和控制都是在线问题:
- 部分可观测环境
- 决策必须在实时约束内完成
- 使用模型预测控制(MPC)等在线优化技术
class ModelPredictiveControl:
"""
模型预测控制(MPC)
在线算法:每步求解有限时域优化问题
"""
def __init__(self, horizon=10, dt=0.1):
self.horizon = horizon
self.dt = dt
def solve(self, state, reference):
"""求解优化问题得到控制序列"""
# 简化的MPC步骤
trajectory = [state]
for t in range(self.horizon):
# 线性化系统(在线)
A, B = linearize_dynamics(state)
# QP求解(在线)
u = solve_qp(A, B, reference, state)
# 应用控制
state = apply_control(state, u, self.dt)
trajectory.append(state)
return trajectory[1], u # 返回第一个控制12.3 大语言模型推理
LLM的自回归生成是典型的在线决策过程:
- token逐个生成
- 不能”预知”未来内容
- 使用束搜索(beam search)进行在线剪枝
class BeamSearch:
"""
束搜索
在线决策:每步扩展 top-k 候选
"""
def __init__(self, model, beam_width=5):
self.model = model
self.beam_width = beam_width
def generate(self, prompt, max_length=100):
"""自回归生成"""
beams = [(prompt, 0.0, False)] # (sequence, score, done)
for _ in range(max_length):
candidates = []
for seq, score, done in beams:
if done:
candidates.append((seq, score, True))
continue
# 扩展 top-k
logits = self.model.predict(seq)
top_k = self.model.top_k(logits, self.beam_width)
for token, log_prob in top_k:
new_seq = seq + [token]
new_score = score + log_prob
done = token == EOS
candidates.append((new_seq, new_score, done))
# 保留 top-k
beams = sorted(candidates, key=lambda x: x[1] / len(x[0]))[-self.beam_width:]
if all(done for _, _, done in beams):
break
return max(beams, key=lambda x: x[1] / len(x[0]))[0]12.4 强化学习中的在线决策
class OnlineRLAgent:
"""
在线强化学习代理
结合探索与利用
"""
def __init__(self, state_space, action_space, algorithm='q_learning'):
self.state_space = state_space
self.action_space = action_space
self.algorithm = algorithm
# Q表或策略
self.q_values = np.zeros((state_space, action_space))
self.visits = np.zeros((state_space, action_space))
def select_action(self, state, epsilon=0.1):
"""ε-贪婪策略"""
if random.random() < epsilon:
return random.randint(0, self.action_space - 1)
return np.argmax(self.q_values[state])
def update(self, state, action, reward, next_state, learning_rate=0.1, gamma=0.9):
"""TD更新"""
if self.algorithm == 'q_learning':
# 异策略:使用最优下一状态的Q值
td_target = reward + gamma * np.max(self.q_values[next_state])
else:
# 同策略:使用当前策略的Q值
td_target = reward + gamma * self.q_values[next_state, self.select_action(next_state)]
td_error = td_target - self.q_values[state, action]
self.q_values[state, action] += learning_rate * td_error
self.visits[state, action] += 113. 数据流算法
13.1 数据流模型
数据流算法研究数据以高速流形式到达时的计算问题:
class DataStreamAlgorithm:
"""
数据流算法框架
约束:
- 只能顺序扫描数据一次(或有限次)
- 有限工作内存
- 必须产生近似解
"""
def __init__(self, stream, window_size=None):
self.stream = stream
self.window_size = window_size
self.memory = {} # 有限状态
def process(self):
"""处理数据流"""
for item in self.stream:
self.update(item)
if self.window_size:
self.evict_old_items()
def update(self, item):
"""更新状态"""
raise NotImplementedError
def query(self):
"""返回答案"""
raise NotImplementedError13.2 频率估计
class CountMinSketch:
"""
Count-Min Sketch
用于估计数据流中元素的出现频率
空间:O(log(1/δ) * log(1/ε))
"""
def __init__(self, epsilon, delta):
self.epsilon = epsilon
self.delta = delta
# 哈希函数数量和宽度
self.num_hashes = int(math.ceil(math.log(1 / delta)))
self.width = int(math.ceil(math.e / epsilon))
# 计数器数组
self.counters = [[0] * self.width for _ in range(self.num_hashes)]
# 哈希函数
self.hash_functions = [
self._generate_hash(i) for i in range(self.num_hashes)
]
def _generate_hash(self, seed):
"""生成哈希函数 h(x) = (ax + b) mod width"""
a = random.randint(1, self.width - 1)
b = random.randint(0, self.width - 1)
return lambda x: (a * hash(x) + b) % self.width
def update(self, item):
"""增加计数"""
for i, h in enumerate(self.hash_functions):
self.counters[i][h(item)] += 1
def estimate(self, item):
"""估计频率(可能偏高)"""
estimates = [self.counters[i][self.hash_functions[i](item)]
for i in range(self.num_hashes)]
return min(estimates) # Count-Min: 取最小估计13.3 基数估计
class HyperLogLog:
"""
HyperLogLog
基数估计:估计集合中不同元素的数量
空间:O(log log n) 比特每元素
标准误差:约 1.04 / sqrt(m)
"""
def __init__(self, p=10):
self.p = p # 寄存器数量 = 2^p
self.m = 1 << p
self.registers = [0] * self.m
def add(self, item):
"""添加元素"""
# 哈希
h = hash(item)
# 低p位确定桶
j = h & (self.m - 1)
# 高 (64-p) 位找第一个1的位置
w = h >> self.p
rho = 64 - self.p - int(w).bit_length() + 1 if w != 0 else 64 - self.p + 1
# 更新寄存器
self.registers[j] = max(self.registers[j], rho)
def estimate(self):
"""估计基数"""
import math
# 调和平均
m = self.m
alpha_m = 0.7213 / (1 + 1.079 / m) if m >= 128 else self._alpha(m)
Z = 1.0 / sum(2 ** -self.registers[j] for j in range(m))
E = alpha_m * m * m * Z
# 修正小基数和大基数
if E <= 2.5 * m:
# 使用线性计数
zeros = self.registers.count(0)
if zeros > 0:
return m * math.log(m / zeros)
return E
def _alpha(self, m):
"""计算 alpha_m"""
if m == 16:
return 0.673
elif m == 32:
return 0.697
elif m == 64:
return 0.709
else:
return 0.7213 / (1 + 1.079 / m)14. 核心算法对比表
| 问题 | 确定性竞争比 | 随机化竞争比 | 下界 |
|---|---|---|---|
| 秘书问题 | 1/e(随机顺序) | 1/e | 1/e |
| 在线装箱 | 1.7 | 1.691 | 1.531 |
| 负载均衡 | |||
| Ski Rental | 同 | ||
| 在线匹配 | |||
| 缓存置换(k大小) | k | k | |
| 列表更新 | 2 | ||
| 多臂老虎机 | N/A(后悔值) |
15. 在线算法的设计原则
15.1 保守 vs 冒险策略
保守策略:优先稳定,避免高损失
def conservative_strategy(state, options):
"""
保守策略:最小化最坏情况
"""
worst_case_costs = []
for option in options:
worst_cost = max(simulate_worst_case(state, option))
worst_case_costs.append(worst_cost)
return options[np.argmin(worst_case_costs)]冒险策略:追求高收益,接受高风险
def aggressive_strategy(state, options):
"""
冒险策略:最大化最好情况
"""
best_case_gains = []
for option in options:
best_gain = max(simulate_best_case(state, option))
best_case_gains.append(best_gain)
return options[np.argmax(best_case_gains)]15.2 延迟决策原则
尽可能延迟不可逆决策:
class DeferredDecision:
"""
延迟决策策略
尽可能收集信息后再做决定
"""
def __init__(self, max_buffer=100):
self.buffer = []
self.max_buffer = max_buffer
def add_request(self, request):
self.buffer.append(request)
if len(self.buffer) >= self.max_buffer:
return self._batch_decide()
return None
def _batch_decide(self):
"""批量处理积累的请求"""
decision = optimize_batch(self.buffer)
self.buffer = []
return decision15.3 资源预留策略
class ResourceReservation:
"""
资源预留策略
预留一定比例的资源用于探索/保险
"""
def __init__(self, total_resources, reserve_fraction=0.1):
self.total = total_resources
self.reserve = total_resources * reserve_fraction
self.available = total_resources - self.reserve
def allocate(self, request, min_required):
"""
分配资源,保留保险储量
"""
if min_required > self.available:
# 不能满足需求
return None
allocation = min(request, self.available)
self.available -= allocation
return allocation
def release(self, amount):
"""释放资源"""
self.available = min(self.available + amount, self.total - self.reserve)参考来源
- Borodin, A., & El-Yaniv, R. (1998). Online Computation and Competitive Analysis. Cambridge University Press.
- Sleator, D. D., & Tarjan, R. E. (1985). Amortized Efficiency of List Update and Paging Rules. CACM.
- Graham, R. L. (1966). Bounds for Certain Multiprocessing Anomalies. Bell System Technical Journal.
- Karp, R. M. (1992). On-Line Algorithms. On-Line Algorithms.
- Mehta, A., Saberi, A., Vazirani, U., & Vazirani, V. (2005). AdWords and Generalized Online Matching. JACM.
- Belady, L. A. (1966). A Study of Replacement Algorithms for Virtual-Storage Computer. IBM Systems Journal.
- Auer, P., Cesa-Bianchi, N., & Fischer, P. (2002). Finite-time Analysis of the Multiarmed Bandit Problem. Machine Learning.
- Lai, T. L., & Robbins, H. (1985). Asymptotically Efficient Adaptive Allocation Rules. Advances in Applied Mathematics.
- Bansal, N., & Pruhs, K. (2010). The Geometry of Online Packing Linear Programs. Math. Oper. Res.
- Buchbinder, N., & Naor, J. (2009). The Design of Competitive Online Algorithms via a Primal-Dual Approach. Foundations and Trends in Theoretical Computer Science.