在线算法详解
关键词速览
| 关键词 | 解释 |
|---|---|
| 竞争分析 | 在线算法与最优离线算法的比较框架 |
| 竞争比 | 在线算法性能与最优算法的比值上界 |
| 最优在线决策 | 基于部分信息的顺序决策问题 |
| Secretary Problem | 经典的选择最优候选人问题 |
| 在线装箱 | 物品序列的在线分配 |
| 负载均衡 | 任务分配到多台机器 |
| Ski Rental | 在线租买决策问题 |
| 对抗性输入 | 分析在线算法的输入模型 |
| 随机化在线 | 使用随机性的在线算法 |
| Yao原理 | 期望竞争比的下界证明 |
| 缓存置换 | 页面替换的在线策略 |
| LRU | 最近最少使用策略 |
| 分页算法 | 虚拟内存管理 |
| 列表更新 | 自我组织的线性搜索 |
| 多臂老虎机 | 探索利用权衡的数学模型 |
| 上置信界 | UCB算法 |
| 注册模型 | 在线学习的形式化框架 |
| 后悔值 | 在线学习与最优的差距 |
| 次线性算法 | 只需次线性信息访问的算法 |
| 流算法 | 数据流模型下的在线处理 |
摘要
在线算法研究在信息不完整条件下进行决策的理论与实践。与离线算法拥有全部输入不同,在线算法必须在收到每个输入后立即做出不可撤回的决定。本文系统阐述竞争分析框架,通过经典案例(秘书问题、在线装箱、负载均衡、Ski Rental问题)展示问题建模与算法设计技术,并探讨随机化在线算法的优势。在线算法的思想广泛应用于操作系统调度、网络路由、搜索引擎排序和推荐系统等AI领域。
在线算法的研究起源于20世纪70年代的”表调度”(list scheduling)和”分页”(paging)问题。1975年,Graham对调度问题的开创性工作引入了竞争分析的概念。1979年,Sleator和Tarjan发表了关于自我调整列表(self-adjusting lists)的经典论文,开启了在线算法理论分析的序幕。此后,Karp、Borodin和El-Yaniv等学者系统发展了竞争分析的理论框架。
本文将覆盖以下内容:首先是竞争分析的数学基础与输入模型;其次是秘书问题、在线装箱、负载均衡等经典问题的深入分析;然后是缓存置换算法、列表更新等操作系统中的在线问题;接着探讨多臂老虎机等在线学习框架;再讨论随机化在线算法与Yao原理;最后展示在线算法在AI和现代计算中的应用。
1. 在线算法入门:为什么看不到未来也能做决策?
1.1 现实中的在线决策困境
想象这样一个场景:你站在一家自助餐厅门口,手里拿着餐票,不知道今天会有什么菜。你可能特别喜欢某个菜(比如糖醋排骨),但问题是——如果你现在拿了别的菜,后面再想拿糖醋排骨时可能已经被抢光了。反过来,如果你一开始就冲向糖醋排骨,结果发现今天的糖醋排骨巨难吃,那你这顿饭就毁了。
这就是在线决策的本质困境:你必须在信息不完全的时候做出选择,而且一旦选了就改不了。
现实世界里这种困境无处不在:
购物决策:双十一你看到一件大衣打折,到底是现在买还是等更低价?万一后面涨价了呢?
职业选择:毕业季你拿到了几个offer,是接受现在这个还是再等等后面的机会?
相亲约会:见了几个相亲对象,什么时候该定下来?
这些问题都有一个共同特点:你不知道未来会发生什么,但必须现在就做决定,而且决定往往不可逆。
在线算法就是研究这类问题的学科,只不过我们把它抽象化、数学化,然后想办法设计出”还不错”的决策策略。
1.2 什么是在线算法?
在线算法(Online Algorithm) 是指一种处理输入的算法,其输入是随着时间逐步到达的,而不是一次性全部给出的。在线算法必须在收到每个输入片段后立即做出响应,且这些响应通常是不可撤回的。
这跟离线算法(Offline Algorithm) 形成了鲜明对比。离线算法就像是一个全知全能的上帝,它在做出决定之前已经看到了所有的输入。比如,如果你知道未来一年股票市场的所有走势,那你当然可以做出最优的投资决策——但问题是,这种”上帝视角”在现实中根本不存在。
举几个在线算法的具体例子:
电梯调度:当你在某一层按下电梯按钮时,电梯控制系统必须立即决定是停还是不停。它不知道后面还有多少人会按按钮,也不知道现在等待的人要上还是下。这就是在线决策。
缓存置换:你的CPU缓存或者浏览器缓存空间有限,当需要加载一个新页面而缓存已满时,系统必须决定把哪个旧页面踢出去。它不知道这个被踢出去的页面后面还会不会被用到。这也是在线决策。
搜索引擎排名:用户输入一个查询,搜索引擎必须立即返回排名结果。它不知道用户最终会点击哪个结果,也不知道这个用户下次会搜索什么。这也是在线决策。
1.3 在线性的三个来源
为什么会有在线问题?主要是因为三种”在线性”:
时间在线性:数据随着时间流逝不断到来,你必须实时处理。比如网络数据流、传感器读数、用户点击行为等。凌晨2点到达的日志和下午2点到达的日志,处理方式可能完全不同,但你必须在它们到来的时候立刻做出反应。
空间在线性:你只能看到数据的一部分,无法一次性获取所有数据。比如处理一个巨大的数据库,你可能无法一次性把数据全部加载到内存中,只能分批处理。
结构在线性:数据的结构本身是逐步暴露的。比如在图的遍历中,你可能只看到当前节点的邻居,要决定是否探索这条边时还看不到更远的节点。
1.4 在线性程度的光谱
并不是所有问题都是”完全在线”的,在线和离线之间有一个渐变的光谱:
完全离线 ←————————————————————→ 完全在线
| |
| 半在线(部分信息已知) |
| | |
| 批处理(有限 lookahead) |
| 预约式(可延迟) |
| |
←————————————— 决策可逆 ———————————————→
完全离线:拥有完整信息,可以做出全局最优决策。
半在线:知道输入的部分统计特性或分布,比如知道商品价格的波动范围。
有限lookahead:可以看到未来一小段,比如天气预报能预测未来几天的天气。
决策可逆:有些决定虽然要即时做出,但后面可以反悔,比如网上购物有七天无理由退货。
完全在线:信息完全未知,必须立即决策,且完全不可逆。
2. 竞争比分析:在线算法的”性能保证”是什么?
2.1 为什么不能只看准确率?
在机器学习中,我们通常用准确率、召回率这些指标来衡量算法好坏。但在在线算法领域,这些指标不太好用。为什么呢?
因为在线算法的”正确答案”取决于未来,而未来是未知的。所以我们需要一个不同的比较框架。
2.2 竞争分析的核心思想
竞争分析(Competitive Analysis) 是在线算法领域的核心评估框架。它的基本思想是:把在线算法的表现和”知道未来的最优算法”进行比较。
这里的”最优离线算法”(记作OPT)是一个假想的全知全能算法,它能看到所有输入,然后做出最优决策。我们让在线算法ALG和OPT在相同的输入序列上运行,比较它们的代价。
形式化地说:
设 表示在线算法 在输入 上的代价(或收益), 表示最优离线算法在相同输入 上的代价(或收益)。
竞争比的定义: 一个在线算法 是 -竞争的,当且仅当存在常数 使得
对所有输入 成立。
这里 就是竞争比(Competitive Ratio),也叫竞争因子。
- 对于最小化问题(如成本、时间),我们希望 越小越好
- 对于最大化问题(如收益、准确率),我们希望 越大越好(通常写成 的形式)
2.3 竞争比意味着什么?
竞争比给了我们一个很有意思的保证:不管未来输入是什么样的,我的算法最多只比最优算法差 倍。
举例来说,如果一个在线装箱算法的竞争比是 1.7,这意味着:
- 不管来了什么形状的箱子
- 不管这些箱子以什么顺序到达
- 我们的算法用的箱子数最多是最优算法的 1.7 倍
这听起来可能有点抽象,但想象一下这个场景:你在组织一场装箱比赛,规则是你必须实时把物品装进箱子(不能预知后面的物品),而你的对手可以预先看到所有物品然后慢慢规划。即使在这种情况下,你的算法也不会输得太惨,最多也就多用 70% 的箱子。
2.4 竞争比的直观理解
用一个生活化的例子来理解:
假设你在玩一个游戏,游戏规则是你要选择每天是走路还是打车去上班。如果你选走路,花费是0元但耗时60分钟;如果你选打车,花费是50元但耗时20分钟。
完全不知道未来天气的情况下,你每天都要做选择:今天是走路还是打车?
如果天气预报告诉你未来一周都会下雨,那最优策略显然是每天都打车。但如果没有任何信息,你就陷入了在线决策的困境。
竞争比衡量的就是:在最坏情况下,你的”盲目决策”比”全知决策”差多少倍。
2.5 竞争比分析的两个方向
竞争比上界:通过构造性证明,展示某个在线算法的竞争比不会超过某个值。
竞争比下界:证明任何在线算法都不可能做到比某个竞争比更好。
这两个方向同样重要。上界告诉我们”能做到多好”,下界告诉我们”能指望多好”。如果某个问题的上界等于下界,那就意味着我们找到了最优的在线算法。
2.6 渐近竞争比
在实际应用中,我们通常更关心当输入规模很大时的表现,所以会使用渐近竞争比(Asymptotic Competitive Ratio):
当 时,若 ,则算法是渐近 -竞争的。
用人话来说就是:对于足够大的输入,我们的算法最多比最优算法差 倍,常数项的影响可以忽略。
3. Ski Rental问题:租雪具还是买雪具?
3.1 问题的来龙去脉
Ski Rental问题(租雪具问题)是理解在线算法的一个绝佳入口,因为它足够简单,但又足够深刻。
问题的场景是这样的:冬天到了,你想滑雪,但不确定今年会滑几次。雪具店有两种方案:
- 租:每天租金 元
- 买:一次性付 元,之后就免费滑了
问题是:你事先不知道今年会滑多少天。每天滑完后,你可以决定明天还滑不滑(所以你可以根据当天滑的情况来决定明天是否继续),但你必须在当天结束时决定,而不是等到知道明天的情况后再决定。
到底什么时候该买?
3.2 租还是买:直觉上的两难
表面上这个问题很简单:
- 如果你打算滑很多天,买肯定更划算
- 如果你只打算滑一两天,租更划算
但问题是:你不知道到底会滑多少天!
如果你决定租到底,最坏情况是滑了 天以上,那你就亏了(租金加起来超过了买的价格)。
如果你直接买,最坏情况是只滑了一天,那你就亏了(买了个可能再也不用的东西)。
3.3 最优策略:找到一个”买断点”
经过数学分析,这个问题有一个漂亮的最优策略:存在一个最优的买断点。
策略如下:
- 前 天选择租
- 第 天之后,选择买(不管之前租了几天)
为什么这样是对的呢?让我们来算一笔账:
情况1:你总共滑了 天
- 如果 ,最优就是一直租,花费
- 如果 ,最优是先租 天,然后买,花费
我们的策略在这些情况下都不会太差。最坏情况是 时,我们的花费是 ,而最优是 ,比值最多是 。
def ski_rental_decision(r, p, days_remaining, days_rented):
"""
Ski Rental 最优决策
参数:
- r: 每日租金
- p: 购买价格
- days_remaining: 还剩多少天要决定(今天结束后)
- days_rented: 已经租了多少天
返回: 'rent' 或 'buy'
"""
# 如果租了 p/r 天以上,买更划算
if days_rented * r >= p:
return 'buy'
else:
# 继续租
return 'rent'
def ski_rental_optimal_strategy(r, p, actual_days):
"""
模拟最优策略的执行
"""
days_rented = 0
total_cost = 0
while days_rented < actual_days:
decision = ski_rental_decision(r, p,
actual_days - days_rented,
days_rented)
if decision == 'rent':
total_cost += r
days_rented += 1
print(f"第{days_rented}天:租,花费{total_cost}元")
else:
total_cost += p
print(f"第{days_rented+1}天:买断,总花费{total_cost}元")
break
return total_cost3.4 竞争比:2-差不多是最优的
这个策略的竞争比是 。由于 (否则没人会考虑买),竞争比在 1 到 2 之间。
更有意思的是:任何确定性在线算法的竞争比都不可能小于 2 - r/p。所以我们的策略实际上是最优的!
为什么竞争比不能更好?考虑这个场景:
- 敌手知道你会在第 天买
- 如果你租了 天就决定买,那敌手就只给你 天的滑雪日,让你白白买了却没机会用
- 如果你决定一直租,那敌手就给你无限多的滑雪日,让你租到亏本
这就是 Ski Rental 问题告诉我们的:在信息完全缺失的情况下,你不可能做得比”最多亏一倍”更好。
3.5 Ski Rental的现实映射
Ski Rental 问题看似是个滑雪场景,实际上它可以映射到很多现实决策:
软件订阅 vs 永久授权:月付 元 vs 一次付 元买断。什么时候该从订阅转为买断?
云服务选型:按需付费 vs 预留实例。对于一家快速成长的公司,什么时候该从按需付费转向预留实例?
健身卡选择:次卡 vs 月卡 vs 年卡。你每周去几次健身房时,该选哪种卡?
设备采购:租设备 vs 买设备。什么时候该从租赁转向购买?
所有这些问题本质上都和 Ski Rental 是同一个数学问题。
4. 缓存置换算法:系统内存不够了怎么办?
4.1 缓存置换的直觉
想象你是一个图书管理员,你的书架只能放10本书。现在有人来借书,你需要把书架上的某些书挪走,把新书放上去。但问题是,你不知道这些被挪走的书以后还会不会有人借。
这就是缓存置换问题(Cache Replacement Problem)。
在计算机系统中,缓存置换无处不在:
- CPU缓存:L1/L2/L3缓存空间有限
- 内存分页:物理内存不够时需要换出到磁盘
- 浏览器缓存:本地存储空间有限
- CDN缓存:边缘节点存储空间有限
- 数据库缓冲池:内存页的置换
所有这些场景,本质上都是同一个问题:当缓存满了,需要踢掉一个旧条目放入新条目,该踢哪个?
4.2 三种经典策略
FIFO(First In First Out):最早进来的,优先出去。
这个策略很简单,就像排队一样,先来先服务。它的优点是实现起来非常容易,缺点是可能会踢掉经常用的东西——因为一个东西进缓存很久,不代表它不重要。
from collections import deque
class FIFOCache:
"""
FIFO 缓存置换算法
策略:最早进入的页面先被踢出
特点:实现简单,但可能踢掉热点数据
"""
def __init__(self, capacity):
self.capacity = capacity
self.cache = set()
self.queue = deque() # 记录进入顺序
def access(self, page):
"""访问一个页面"""
if page in self.cache:
print(f"页面 {page}: 命中")
return True # 命中
print(f"页面 {page}: 未命中")
# 未命中,需要加载
if len(self.cache) >= self.capacity:
# 缓存满了,踢出队首
evicted = self.queue.popleft()
self.cache.remove(evicted)
print(f" -> 驱逐页面 {evicted}")
self.cache.add(page)
self.queue.append(page)
print(f" -> 加载页面 {page}")
return False
def print_state(self):
print(f" 当前缓存: {sorted(self.cache)}")
print(f" 队列顺序: {list(self.queue)}")LRU(Least Recently Used):最近最少使用的,优先出去。
这个策略的直觉是:如果一个东西最近被用过,那它以后被用的可能性也较大。LRU 在实践中表现很好,但实现起来稍微复杂一点(需要记录每个页面的”最近使用时间”)。
class LRUCache:
"""
LRU 缓存置换算法
策略:最久未使用的页面先被踢出
特点:符合局部性原理,实际表现优秀
实现:双向链表 + 哈希表,达到 O(1) 复杂度
"""
def __init__(self, capacity):
self.capacity = capacity
self.cache = {} # key -> node
# 双向链表
self.head = Node('head') # 最最近使用的
self.tail = Node('tail')
self.head.next = self.tail
self.tail.prev = self.head
def _remove(self, node):
"""从链表中移除一个节点"""
node.prev.next = node.next
node.next.prev = node.prev
def _add_to_front(self, node):
"""把节点加到最最近使用的位置"""
node.next = self.head.next
node.prev = self.head
self.head.next.prev = node
self.head.next = node
def access(self, page):
"""访问一个页面"""
if page in self.cache:
print(f"页面 {page}: 命中")
# 命中,更新到最前面
node = self.cache[page]
self._remove(node)
self._add_to_front(node)
return True
print(f"页面 {page}: 未命中")
# 未命中
if len(self.cache) >= self.capacity:
# 驱逐最久未使用的(tail前一个)
lru = self.tail.prev
self._remove(lru)
del self.cache[lru.key]
print(f" -> 驱逐页面 {lru.key}")
# 加载新页面
new_node = Node(page)
self.cache[page] = new_node
self._add_to_front(new_node)
print(f" -> 加载页面 {page}")
return False
def print_state(self):
"""打印当前缓存状态"""
items = []
node = self.head.next
while node != self.tail:
items.append(node.key)
node = node.next
print(f" 当前缓存(最近在前): {items}")LFU(Least Frequently Used):使用频率最低的,优先出去。
这个策略的直觉是:如果一个页面被访问过很多次,那它可能更重要。LFU 适合那些访问模式比较稳定的场景,但实现起来更复杂(需要维护访问计数)。
import heapq
class LFUCache:
"""
LFU 缓存置换算法
策略:访问频率最低的页面先被踢出
特点:适合访问模式稳定的场景
注意:这是简化版本,实际实现需要处理频率相同时的选择问题
"""
def __init__(self, capacity):
self.capacity = capacity
self.cache = {} # key -> (freq, key, value)
self.freq_map = {} # freq -> set of keys
self.min_freq = 0
def access(self, page):
"""访问一个页面"""
if page in self.cache:
print(f"页面 {page}: 命中")
freq, _, _ = self.cache[page]
new_freq = freq + 1
# 从旧频率集合移除
self.freq_map[freq].remove(page)
if not self.freq_map[freq]:
del self.freq_map[freq]
if freq == self.min_freq:
self.min_freq = new_freq
# 添加到新频率集合
if new_freq not in self.freq_map:
self.freq_map[new_freq] = set()
self.freq_map[new_freq].add(page)
# 更新缓存
self.cache[page] = (new_freq, page, None)
return True
print(f"页面 {page}: 未命中")
# 未命中
if len(self.cache) >= self.capacity:
# 驱逐频率最低的
if self.min_freq in self.freq_map:
lfu_page = next(iter(self.freq_map[self.min_freq]))
del self.cache[lfu_page]
self.freq_map[self.min_freq].remove(lfu_page)
print(f" -> 驱逐页面 {lfu_page}")
# 加载新页面
self.cache[page] = (1, page, None)
if 1 not in self.freq_map:
self.freq_map[1] = set()
self.freq_map[1].add(page)
self.min_freq = 1
print(f" -> 加载页面 {page}")
return False
def print_state(self):
print(f" 当前缓存: {[(k, f'{v[0]}次') for k, v in self.cache.items()]}")4.3 动手实验:模拟缓存置换算法
让我们写一个完整的实验来比较这些算法的表现:
import random
from collections import deque
class FIFOCache:
def __init__(self, capacity):
self.capacity = capacity
self.cache = set()
self.queue = deque()
def access(self, page):
if page in self.cache:
return True
if len(self.cache) >= self.capacity:
evicted = self.queue.popleft()
self.cache.remove(evicted)
self.cache.add(page)
self.queue.append(page)
return False
class LRUCache:
def __init__(self, capacity):
self.capacity = capacity
self.cache = {}
self.head = Node('head')
self.tail = Node('tail')
self.head.next = self.tail
self.tail.prev = self.head
def _remove(self, node):
node.prev.next = node.next
node.next.prev = node.prev
def _add_to_front(self, node):
node.next = self.head.next
node.prev = self.head
self.head.next.prev = node
self.head.next = node
def access(self, page):
if page in self.cache:
node = self.cache[page]
self._remove(node)
self._add_to_front(node)
return True
if len(self.cache) >= self.capacity:
lru = self.tail.prev
self._remove(lru)
del self.cache[lru.key]
new_node = Node(page)
self.cache[page] = new_node
self._add_to_front(new_node)
return False
class Node:
def __init__(self, key):
self.key = key
self.prev = None
self.next = None
def run_experiment():
"""运行缓存置换实验"""
# 模拟两种访问模式
print("=" * 60)
print("实验1:局部性访问模式(80-20规则)")
print("=" * 60)
# 80-20 分布:20%的页面占据80%的访问
def generate_zipf_request():
if random.random() < 0.8:
# 大概率访问热点页面
return random.randint(1, 4) # 热点页
else:
return random.randint(5, 20) # 非热点页
cache_size = 4
num_requests = 1000
# 测试各个算法
for name, CacheClass in [("FIFO", FIFOCache), ("LRU", LRUCache)]:
cache = CacheClass(cache_size)
misses = 0
for _ in range(num_requests):
page = generate_zipf_request()
if not cache.access(page):
misses += 1
hit_rate = 1 - misses / num_requests
print(f"{name}: 命中率 = {hit_rate:.2%}, 未命中 = {misses}")
print()
print("=" * 60)
print("实验2:循环访问模式")
print("=" * 60)
# 循环模式:1, 2, 3, 4, 1, 2, 3, 4, ...
cache_size = 4
num_requests = 1000
for name, CacheClass in [("FIFO", FIFOCache), ("LRU", LRUCache)]:
cache = CacheClass(cache_size)
misses = 0
for i in range(num_requests):
page = (i % 4) + 1 # 1, 2, 3, 4 循环
if not cache.access(page):
misses += 1
hit_rate = 1 - misses / num_requests
print(f"{name}: 命中率 = {hit_rate:.2%}, 未命中 = {misses}")
# 运行实验
run_experiment()典型的实验结果会是:
============================================================
实验1:局部性访问模式(80-20规则)
============================================================
FIFO: 命中率 = 55.20%, 未命中 = 448
LRU: 命中率 = 81.40%, 未命中 = 186
============================================================
实验2:循环访问模式
============================================================
FIFO: 命中率 = 0.00%, 未命中 = 1000
LRU: 命中率 = 0.00%, 未命中 = 1000
从实验结果可以看出:
- 在有局部性的访问模式下,LRU明显优于FIFO
- 在循环访问模式下,两者都表现很差(这是缓存大小等于工作集大小时的正常现象)
4.4 竞争比分析
在理论上,FIFO和LRU的最坏情况竞争比都是 ( 是缓存大小)。这意味着在最坏情况下,它们可能比最优算法差 倍。
这个下界是可以达到的,考虑请求序列:
FIFO和LRU在这个序列上每次都未命中,而最优算法可以用 个缓存空间达到 100% 命中率。
但在实际应用中,由于程序的局部性原理,LRU通常表现很好。这也是为什么LRU是现代计算机系统中最广泛使用的缓存置换策略。
5. 在线匹配:二分图中的在线约会
5.1 什么是在线匹配?
想象你在运营一个约会平台。用户不断注册,每个人都有一份他们感兴趣的异性列表。你的任务是把每个新来的人”匹配”给一个还没有被匹配的人。
这就是在线匹配问题。
更形式化地说,假设我们有一张二分图,左边是”在线”到来的顶点(医生),右边是固定的目标顶点(病人),每条边表示”左边顶点可以匹配右边顶点”。当左边的顶点到来时,我们必须立即决定把它匹配到右边哪个还未被匹配的顶点。
5.2 贪婪匹配:先到先得
最简单的策略是贪婪匹配:当一个新顶点到来时,随机(或按某种规则)选择一个它可以连接的未匹配顶点。
def greedy_matching(online_vertices, right_vertices, edges):
"""
贪婪在线匹配
每个在线顶点到来时,随机选择一个可用的邻居进行匹配
"""
matching = {} # 在线顶点 -> 右顶点
available_right = set(right_vertices)
for v in online_vertices:
# 找到v的所有可匹配顶点
neighbors = [u for u in edges[v] if u in available_right]
if neighbors:
chosen = random.choice(neighbors)
matching[v] = chosen
available_right.remove(chosen)
return matching贪婪匹配的竞争比分析很悲观:对于一般图,竞争比可能是 。这意味着在最坏情况下,贪婪匹配可能只匹配到最优算法 分之一的内容。
5.3 随机化帮忙
有意思的是,如果引入随机性,情况会好很多。
随机化贪婪匹配:当一个顶点到来时,不是固定选择某个邻居,而是随机打乱它所有可用的邻居,然后选择第一个。
def randomized_greedy_matching(online_vertices, right_vertices, edges):
"""
随机化贪婪匹配
"""
matching = {}
available_right = set(right_vertices)
for v in online_vertices:
neighbors = [u for u in edges[v] if u in available_right]
if neighbors:
# 随机打乱
random.shuffle(neighbors)
chosen = neighbors[0]
matching[v] = chosen
available_right.remove(chosen)
return matching随机化版本的竞争比可以改进到 。这比确定性的 好得多。
5.4 为什么随机能帮忙?
随机化帮的忙在于:它打破了”最坏情况输入”的诅咒。
确定性算法面临的困境是:敌手知道你的策略,然后精心构造一个让你表现最差的输入。但当算法使用随机性时,敌手无法预测你的下一步决策,因为它依赖于随机数。
这就像打牌:如果对手能看穿你的所有牌,你必输无疑;但如果你的决策有一定随机性,对手就无法完全针对你。
6. 在线装箱:把东西塞进箱子里
6.1 问题再描述
在线装箱问题是这样的:你有一堆物品,依次到来,每个物品的大小在0到1之间。你的任务是把这些物品装进容量为1的箱子里,每装一个物品就要决定放进哪个箱子(如果当前箱子放不下就开新箱子)。
目标是用最少的箱子。
这个问题在现实中有很多应用:
- 集装箱装货:货物一件件到来,需要实时决定装哪个集装箱
- 内存分配:进程申请内存块,操作系统需要实时分配
- 服务器资源调度:任务到来时分配到哪个服务器
6.2 四种经典策略
Next Fit(下一个箱子):当前箱子装不下了就开新箱子,之前的不再回头看。
这是最简单的策略,优点是只需要记住当前箱子,缺点是空间利用率可能很低。
class NextFitBinPacker:
"""
Next Fit 装箱算法
策略:当前箱子装不下就开新箱子
优点:O(1) 空间复杂度
缺点:空间利用率低
竞争比:2(渐近)
"""
def __init__(self, capacity=1.0):
self.capacity = capacity
self.current_bin_load = 0
self.num_bins = 0
def add_item(self, size):
if size > self.capacity:
raise ValueError(f"物品大小 {size} 超过箱子容量")
if self.current_bin_load + size <= self.capacity:
self.current_bin_load += size
else:
# 开新箱子
self.num_bins += 1
self.current_bin_load = size
return self.num_bins
def finish(self):
"""结束装箱,关闭最后一个箱子"""
if self.current_bin_load > 0:
self.num_bins += 1
return self.num_binsFirst Fit(第一个合适):从第一个箱子开始,找第一个能放下物品的。
这个策略比Next Fit好一些,但需要维护所有箱子的状态。
class FirstFitBinPacker:
"""
First Fit 装箱算法
策略:找第一个能放下物品的箱子
优点:空间利用率较高
缺点:需要维护箱子列表,插入操作较慢
竞争比:1.7(渐近)
"""
def __init__(self, capacity=1.0):
self.capacity = capacity
self.bins = [] # 每个箱子:(剩余容量, 物品列表)
def add_item(self, size):
# 线性查找第一个能放下物品的箱子
for i, (free_space, items) in enumerate(self.bins):
if free_space >= size:
self.bins[i] = (free_space - size, items + [size])
return i
# 没有能放下的箱子,开新箱子
self.bins.append((self.capacity - size, [size]))
return len(self.bins) - 1
def get_num_bins(self):
return len(self.bins)
def print_state(self):
for i, (free, items) in enumerate(self.bins):
print(f"箱子 {i}: 物品 {items}, 剩余 {free:.2f}")Best Fit(最合适):找一个刚好能放下物品的箱子(即剩余空间最小但能容纳物品的箱子)。
这个策略试图让每个箱子都尽量装满。
class BestFitBinPacker:
"""
Best Fit 装箱算法
策略:找剩余空间最小但能放下物品的箱子
优点:空间利用率高
缺点:需要排序或高效查找
竞争比:1.7(渐近)
"""
def __init__(self, capacity=1.0):
self.capacity = capacity
self.bins = [] # (剩余容量, 物品列表)
def add_item(self, size):
# 找剩余空间最小但 >= size 的箱子
best_idx = -1
best_free = self.capacity + 1
for i, (free, _) in enumerate(self.bins):
if free >= size and free < best_free:
best_free = free
best_idx = i
if best_idx == -1:
# 开新箱子
self.bins.append((self.capacity - size, [size]))
return len(self.bins) - 1
else:
# 更新最佳箱子
free, items = self.bins[best_idx]
self.bins[best_idx] = (free - size, items + [size])
return best_idxHarmonic Fit:按物品大小分类处理。
class HarmonicFitBinPacker:
"""
Harmonic Fit 装箱算法
策略:按物品大小分类,大物品单独处理,小物品按比例分配
竞争比:1.691...(最优在线算法之一)
"""
def __init__(self, capacity=1.0, k=3):
self.capacity = capacity
self.k = k
self.bins = [[] for _ in range(k)] # 不同大小类别的箱子
def _get_class(self, size):
"""根据物品大小确定类别"""
for i in range(1, self.k + 1):
if 1.0 / (i + 1) < size <= 1.0 / i:
return i - 1
return self.k # 特别小的物品
def add_item(self, size):
item_class = self._get_class(size)
# 在对应类别的箱子里找能放下的
for bin_list in self.bins[item_class]:
if sum(bin_list) + size <= self.capacity:
bin_list.append(size)
return
# 开新箱子
self.bins[item_class].append([size])
def get_num_bins(self):
return sum(len(bin_list) for bin_list in self.bins)6.3 竞争比分析
各种策略的竞争比:
| 算法 | 竞争比 | 备注 |
|---|---|---|
| Next Fit | 2 | 最简单,但浪费空间 |
| First Fit | 1.7 | 实际表现较好 |
| Best Fit | 1.7 | 与 First Fit 类似 |
| Harmonic Fit | 1.691… | 最优在线算法之一 |
| FFD(离线) | 11/9 ≈ 1.222 | 先排序再 First Fit |
有意思的是:没有确定性在线算法能做得比 1.531… 更好。这是竞争比的下界。所以 Harmonic Fit 已经非常接近最优了。
7. 多臂老虎机:探索与利用的两难困境
7.1 什么是多臂老虎机?
“多臂老虎机”这个名字听起来很唬人,但其实就是一个简单的思想实验:
想象你面前有 台老虎机,每台的”吐钱率”不同。你不知道每台机器的真实回报率,只知道每拉一次摇杆会有多少回报。你的任务是:在有限次数的拉杆中,最大化总收益。
这就是多臂老虎机问题(Multi-Armed Bandit Problem)。
这个问题在现实中太常见了:
- 新闻推荐:该推用户已知的热门新闻(利用),还是尝试推荐新内容(探索)?
- 新药试验:该给病人用已知的有效药物(利用),还是测试新药(探索)?
- 广告投放:该投放已知效果好的广告(利用),还是测试新广告(探索)?
- 网络路由:该用已知的最短路径(利用),还是尝试新路径(探索)?
这就是经典的探索-利用权衡(Exploration-Exploitation Tradeoff)。
7.2 几种简单的策略
纯利用(Greedy):一直选目前估计最好的臂。
问题:你怎么知道哪个臂最好?你没有探索过其他臂,所以你的”最好”可能只是基于很少样本的错觉。
纯探索(Random):每次随机选择。
问题:你确实探索了所有臂,但完全没有利用已知信息,效率很低。
ε-贪婪(Epsilon-Greedy):以概率 利用(选当前最好的),以概率 探索(随机选)。
import random
import math
class EpsilonGreedy:
"""
ε-贪婪算法
以概率 ε 探索(随机选择)
以概率 1-ε 利用(选择当前估计最好的)
"""
def __init__(self, n_arms, epsilon=0.1):
self.n_arms = n_arms
self.epsilon = epsilon
self.counts = [0] * n_arms # 每个臂被选择的次数
self.values = [0.0] * n_arms # 每个臂的估计价值
def select_arm(self):
"""选择一个臂"""
if random.random() > self.epsilon:
# 利用:选择当前估计价值最高的
return self.values.index(max(self.values))
else:
# 探索:随机选择
return random.randint(0, self.n_arms - 1)
def update(self, arm, reward):
"""更新臂的估计价值"""
self.counts[arm] += 1
n = self.counts[arm]
# 增量更新:Q_{n+1} = Q_n + (R_n - Q_n) / n
value = self.values[arm]
self.values[arm] = value + (reward - value) / n
def reset(self):
"""重置算法"""
self.counts = [0] * self.n_arms
self.values = [0.0] * self.n_arms7.3 UCB算法:平衡估计值和不确定性
ε-贪婪有一个问题:即使某个臂已经被充分探索,算法仍然有 ε 的概率随机选择它,造成浪费。
UCB(Upper Confidence Bound) 算法提供了一个更优雅的解决方案:
核心思想:不仅要考虑臂的估计价值,还要考虑我们对它估计的不确定性。不确定性越大的臂,越值得探索。
class UCB1:
"""
UCB1 算法
核心公式:UCB = 估计均值 + 置信上界
置信上界 = sqrt(2 * ln(总选择次数) / 该臂选择次数)
竞争比:O(sqrt(log T)) 的后悔值
"""
def __init__(self, n_arms):
self.n_arms = n_arms
self.counts = [0] * n_arms # 每个臂被选择的次数
self.values = [0.0] * n_arms # 每个臂的估计价值
self.total_counts = 0 # 总选择次数
def select_arm(self):
"""选择一个臂"""
# 首先确保每个臂至少被选一次
for arm in range(self.n_arms):
if self.counts[arm] == 0:
return arm
# 计算每个臂的UCB值
ucb_values = []
for arm in range(self.n_arms):
# 估计均值
estimated = self.values[arm]
# 置信上界
bonus = math.sqrt(
2 * math.log(self.total_counts) / self.counts[arm]
)
ucb_values.append(estimated + bonus)
# 选择UCB值最高的臂
return ucb_values.index(max(ucb_values))
def update(self, arm, reward):
"""更新臂的信息"""
self.total_counts += 1
self.counts[arm] += 1
n = self.counts[arm]
# 增量更新估计值
self.values[arm] = self.values[arm] + (reward - self.values[arm]) / nUCB的好处是:随着选择次数增加,被频繁选择的臂的不确定性会降低,而被冷落的臂的不确定性仍然较高,自动实现探索-利用的平衡。
7.4 Thompson采样:贝叶斯方法
Thompson采样是另一种优雅的解决方案,它使用贝叶斯推理:
import numpy as np
class ThompsonSampling:
"""
Thompson Sampling
假设每个臂的成功率服从 Beta 分布
每轮从后验分布采样,选择期望最高的臂
"""
def __init__(self, n_arms, prior_alpha=1, prior_beta=1):
self.n_arms = n_arms
self.alpha = [prior_alpha] * n_arms # Beta 分布的 alpha 参数
self.beta = [prior_beta] * n_arms # Beta 分布的 beta 参数
def select_arm(self):
"""选择一个臂"""
samples = []
for arm in range(self.n_arms):
# 从 Beta 分布采样
sample = np.random.beta(self.alpha[arm], self.beta[arm])
samples.append(sample)
# 选择采样值最高的臂
return samples.index(max(samples))
def update(self, arm, reward):
"""更新臂的后验分布"""
if reward == 1:
self.alpha[arm] += 1
else:
self.beta[arm] += 1
def get_expected_values(self):
"""获取每个臂的期望值"""
return [self.alpha[i] / (self.alpha[i] + self.beta[i])
for i in range(self.n_arms)]Thompson采样的好处是:它自然地在探索和利用之间取得平衡,而且实现简单,效果通常很好。
7.5 后悔值:衡量算法好坏
在多臂老虎机问题中,我们通常用后悔值(Regret) 来衡量算法好坏:
其中 是最优策略在 轮的总收益, 是算法在 轮的总收益。
好的算法应该让后悔值增长得越慢越好。理论上可以证明:
- UCB 的后悔值是
- Thompson Sampling 的后悔值也是
而纯贪婪算法,后悔值会线性增长——这意味着每轮都在犯错。
8. 对抗性分析:假设最坏的情况
8.1 为什么要考虑对抗性输入?
在设计在线算法时,我们需要考虑最坏情况。如果敌手(adversary)知道我们的算法,他可能会精心构造一个输入,让我们的算法表现很差。
这就是对抗性分析的核心思想:假设敌手是恶意且全知的,然后设计在最坏情况下仍然表现良好的算法。
8.2 对抗性模型的三种强度
Oblivious 对抗:敌手在算法运行之前就固定了整个输入,之后不再改变。
这是最弱的对抗假设。在这种模型下,敌手不能根据算法的实际决策来调整输入。
轻度自适应对抗:敌手在看到算法对当前输入的决策后,生成下一个输入。
这种敌手稍微强一些,可以针对算法的历史决策进行调整。
完全自适应对抗:敌手完全知道算法的内部状态和随机种子,可以针对性地构造最恶劣的输入。
这是最强的对抗假设。在这种模型下设计的算法,具有最强的鲁棒性保证。
8.3 对抗性分析的例子
考虑一个简单的例子:缓存置换的FIFO算法。
如果我们假设敌手是oblivious的,敌手必须在算法运行前就决定好整个请求序列。如果我们知道算法用FIFO,我们就可以构造序列 让它每次都未命中。
但如果敌手是完全自适应的,它可以根据算法的实际状态来决定下一个请求。比如,当算法刚刚驱逐了页面1,那敌手就让下一个请求正好是页面1,让算法再次未命中。
8.4 随机化作为防护
随机化是在线算法对抗自适应敌手的主要武器。
当算法使用随机性时,即使敌手知道算法的策略,它也无法预测算法的具体决策(因为不知道随机数)。这给算法提供了一种”信息不对称”的保护。
class RandomizedCache:
"""
随机化缓存
当需要驱逐时,随机选择一个页面
这比FIFO/LRU更好地抵抗自适应敌手
"""
def __init__(self, capacity):
self.capacity = capacity
self.cache = set()
def access(self, page):
if page in self.cache:
return True
if len(self.cache) >= self.capacity:
# 随机选择一个页面驱逐
evict_page = random.choice(list(self.cache))
self.cache.remove(evict_page)
self.cache.add(page)
return False9. 随机化在线算法:为什么随机能帮助?
9.1 随机化的威力
在前面的例子中我们已经看到,随机化可以帮助打破确定性算法的对称性,让我们能够抵抗自适应敌手的攻击。
但随机化的作用远不止于此。
考虑在线匹配问题:
- 确定性算法:竞争比是
- 随机化算法:竞争比是
这是一个巨大的改进!
9.2 Yao原理:从输入分布看问题
Yao原理告诉我们:要证明随机化算法的期望竞争比下界,只需要构造一个输入分布,使得所有确定性算法在该分布下的期望竞争比有一个下界。
用人话来说:如果你想证明”没有任何随机化算法能做得比某条线更好”,你只需要找到一个”困难”的输入分布,然后证明”在这个分布下,任何确定性算法都表现很差”。
def yao_principle_experiment(n_arms=3, trials=1000):
"""
演示 Yao 原理
构造一个"困难"的输入分布
然后检验不同确定性算法的表现
"""
results = []
# 假设臂的真实价值
true_values = [0.3, 0.5, 0.7] # 大部分臂都差不多好
optimal_value = 0.7
for _ in range(trials):
# 模拟一轮选择
chosen = random.randint(0, n_arms - 1)
reward = random.random() < true_values[chosen]
results.append((chosen, reward))
# 计算期望后悔
expected_reward = sum(
true_values[c] for c, r in results
) / trials
optimal_expected = optimal_value
regret = optimal_expected - expected_reward
return regret9.3 随机化 vs 确定性:什么时候随机更好?
-
当问题有”对称性破缺”的需求时:如果所有确定性算法面临相同的输入结构,随机化可以帮助打破这种对称性。
-
当敌手是自适应的时候:随机性是对抗自适应敌手的有效武器。
-
当输入分布未知或难以建模时:不需要假设输入分布,随机化算法在任意输入上都有”平均”表现保证。
10. 在线学习框架:专家建议问题
10.1 从专家建议中学习
专家建议(Experts Advice) 框架是在线学习的另一个核心模型:
- 有 个”专家”,每个专家在每个时刻给出一个预测
- 学习算法必须从专家建议中选择一个,然后观察真实结果
- 目标是最小化与最佳专家的差距(后悔值)
class OnlineLearning:
"""
在线学习框架:专家建议模型
每轮:
1. N个专家给出预测
2. 算法选择一个专家
3. 观察真实结果
4. 更新算法状态
"""
def __init__(self, n_experts):
self.n_experts = n_experts
self.cumulative_losses = [0.0] * n_experts
def predict(self, expert_predictions):
"""
基于专家预测做预测
子类需要实现这个方法
"""
raise NotImplementedError
def observe(self, actual):
"""观察真实结果"""
raise NotImplementedError
def get_best_expert_so_far(self):
"""到目前为止表现最好的专家"""
return min(range(self.n_experts),
key=lambda i: self.cumulative_losses[i])10.2 加权多数算法
加权多数算法(Weighted Majority) 是解决专家建议问题的一个经典方法:
class WeightedMajority(OnlineLearning):
"""
加权多数算法
每次专家预测错误时,将该专家的权重乘以 β(0 < β < 1)
最终预测是权重较大的预测
"""
def __init__(self, n_experts, beta=0.5):
super().__init__(n_experts)
self.weights = [1.0] * n_experts
self.beta = beta
def predict(self, expert_predictions):
"""加权投票预测"""
pos_weight = sum(
w for w, pred in zip(self.weights, expert_predictions)
if pred > 0
)
neg_weight = sum(
w for w, pred in zip(self.weights, expert_predictions)
if pred <= 0
)
return 1 if pos_weight >= neg_weight else -1
def observe(self, actual, expert_predictions):
"""观察结果并更新权重"""
for i, (pred, weight) in enumerate(
zip(expert_predictions, self.weights)
):
if pred != actual:
# 预测错误,权重衰减
self.weights[i] *= self.beta
self.cumulative_losses[i] += 110.3 Follow the Leader
Follow the Leader(跟随领先者) 是另一个直觉上很自然的策略:每轮选择到目前为止表现最好的专家。
class FollowTheLeader(OnlineLearning):
"""
Follow the Leader (FTL)
每轮选择累积损失最小的专家
"""
def predict(self, expert_predictions):
"""选择到目前为止表现最好的专家"""
return self.get_best_expert_so_far()但FTL有一个问题:它容易过拟合当前数据。考虑一个敌手,他让专家A在奇数轮正确、偶数轮错误,专家B相反。FTL会不断切换专家,而这种切换本身就会带来损失。
11. 在线算法与强化学习:探索-利用的视角
11.1 共享的困境
在线算法和强化学习分享着相同的基本困境:当前决策会影响未来信息,而未来信息又会影响未来的决策。
在强化学习中,这表现为:
- 探索:尝试新动作,获取新信息
- 利用:使用已知信息,最大化当前奖励
在在线算法中,这表现为:
- 探索:尝试新策略,学习环境
- 利用:使用已知最优策略
两者本质上是同一个问题!
11.2 在线算法的强化学习视角
从强化学习的角度看,很多在线算法问题都可以被重新诠释:
多臂老虎机 = 单状态强化学习问题
上下文老虎机 = 带特征的强化学习
完整MDP = 完全在线决策问题
class BanditAsRL:
"""
把老虎机问题建模为单状态MDP
"""
def __init__(self, n_arms):
self.n_arms = n_arms
# 状态空间 = 1(单状态)
# 动作空间 = n_arms(n个臂)
# 奖励分布未知
self.q_values = [0.0] * n_arms # 状态-动作值函数
self.counts = [0] * n_arms
def select_action(self, epsilon=0.1):
"""ε-贪婪策略"""
if random.random() < epsilon:
return random.randint(0, self.n_arms - 1)
return self.q_values.index(max(self.q_values))
def update(self, action, reward):
"""TD更新"""
self.counts[action] += 1
n = self.counts[action]
# 更新Q值(单步TD)
self.q_values[action] += (reward - self.q_values[action]) / n11.3 在线决策的实际应用
推荐系统:用户不断点击,推荐系统需要实时更新模型,同时探索新的推荐策略。
广告投放:广告主不断竞拍位置,平台需要实时决定展示哪个广告,同时学习用户的点击偏好。
自动驾驶:车辆在道路上行驶,需要实时做出驾驶决策,同时学习路况和驾驶策略。
12. Primal-Dual方法:在线算法的经典技巧
12.1 什么是Primal-Dual?
Primal-Dual方法是设计在线算法的一个强大技巧。它的基本思想是:
- 将原问题(primal)转化为一个优化问题
- 考虑其对偶问题(dual)
- 设计一个算法,同时维护primal和dual的可行性
- 利用对偶变量的信息来指导primal的决策
12.2 Ski Rental的Primal-Dual分析
让我们用Primal-Dual方法重新分析Ski Rental问题:
Primal(原问题):
- 决策变量: 表示第 天是否租, 表示是否买
- 目标:最小化总成本
Dual(对偶问题):
- 对偶变量: 与每个决策相关
- 约束:从对偶约束推导出最优策略
这个分析比较技术性,但核心思想是:对偶变量可以被解释为”继续等待的价值”,当这个价值超过某个阈值时,就该做出决策了。
def primal_dual_ski_rental(r, p, actual_days):
"""
Ski Rental 的 Primal-Dual 算法
Primal: min Σ x_d * r + y * p
约束: Σ x_d + y >= 1 (每天要么租要么买)
Dual: max Σ α_d
约束: α_d + β <= r (租) 或 p (买)
"""
y = 0 # 是否已买
x = [0] * (actual_days + 1) # 每天是否租
# Dual变量
alpha = [0.0] * (actual_days + 1)
beta = 0
for d in range(1, actual_days + 1):
# 计算dual约束
if beta + r >= p:
# 买更划算,选择买
y = 1
break
else:
# 继续租
x[d] = 1
alpha[d] = r - beta # 更新dual变量
return x, y12.3 Primal-Dual的实际价值
Primal-Dual方法的优势在于:
-
提供结构性理解:为什么这个算法是对的?对偶变量给出了直观的解释。
-
统一多种算法:很多看似不同的在线算法,实际上都可以用Primal-Dual框架来理解。
-
指导新算法设计:通过构造对偶可行解,可以”指导”设计更好的在线算法。
13. 动手实验:完整模拟
13.1 缓存置换模拟器
让我们写一个完整的缓存置换模拟器,可以比较各种策略:
import random
from collections import deque
class CacheSimulator:
"""缓存置换算法模拟器"""
def __init__(self, capacity, strategy='LRU'):
self.capacity = capacity
self.strategy = strategy
self.cache = set()
self.misses = 0
self.hits = 0
# 特定策略的数据结构
self.queue = deque() # FIFO
self.access_order = {} # LRU
def access(self, page):
"""访问一个页面"""
if page in self.cache:
self.hits += 1
if self.strategy == 'LRU':
self.access_order[page] = len(self.access_order)
return True
self.misses += 1
if len(self.cache) >= self.capacity:
self._evict()
self.cache.add(page)
if self.strategy == 'FIFO':
self.queue.append(page)
elif self.strategy == 'LRU':
self.access_order[page] = len(self.access_order)
return False
def _evict(self):
"""驱逐一个页面"""
if self.strategy == 'FIFO':
evicted = self.queue.popleft()
self.cache.remove(evicted)
elif self.strategy == 'LRU':
# 驱逐最久未使用的
lru_page = min(self.access_order.keys(),
key=lambda p: self.access_order[p])
del self.access_order[lru_page]
self.cache.remove(lru_page)
elif self.strategy == 'RANDOM':
evicted = random.choice(list(self.cache))
self.cache.remove(evicted)
def hit_rate(self):
total = self.hits + self.misses
return self.hits / total if total > 0 else 0
def generate_requests(num_requests, pattern='zipfian'):
"""生成请求序列"""
requests = []
if pattern == 'zipfian':
# 80-20 分布
for _ in range(num_requests):
if random.random() < 0.8:
requests.append(random.randint(1, 4))
else:
requests.append(random.randint(5, 20))
elif pattern == 'cyclic':
# 循环模式
for i in range(num_requests):
requests.append((i % 8) + 1)
elif pattern == 'random':
# 纯随机
for _ in range(num_requests):
requests.append(random.randint(1, 20))
elif pattern == 'locality':
# 局部性模式:大部分请求集中在少数页面
hot_pages = random.sample(range(1, 21), 5)
for _ in range(num_requests):
if random.random() < 0.7:
requests.append(random.choice(hot_pages))
else:
requests.append(random.randint(1, 20))
return requests
def run_comparison():
"""比较不同策略的表现"""
print("=" * 70)
print("缓存置换算法比较实验")
print("=" * 70)
patterns = ['zipfian', 'cyclic', 'random', 'locality']
strategies = ['FIFO', 'LRU', 'RANDOM']
cache_sizes = [3, 5, 10]
num_requests = 5000
results = {}
for pattern in patterns:
results[pattern] = {}
requests = generate_requests(num_requests, pattern)
print(f"\n请求模式: {pattern}")
print("-" * 40)
for strategy in strategies:
best_hit_rate = 0
best_cache_size = 0
for cache_size in cache_sizes:
sim = CacheSimulator(cache_size, strategy)
for page in requests:
sim.access(page)
hit_rate = sim.hit_rate()
if hit_rate > best_hit_rate:
best_hit_rate = hit_rate
best_cache_size = cache_size
print(f" {strategy}: 最佳命中率 {best_hit_rate:.2%} "
f"(缓存大小={best_cache_size})")
results[pattern][strategy] = best_hit_rate
return results
# 运行实验
if __name__ == "__main__":
results = run_comparison()13.2 多臂老虎机模拟器
import random
import math
import numpy as np
class BanditSimulator:
"""多臂老虎机模拟器"""
def __init__(self, n_arms, true_means):
self.n_arms = n_arms
self.true_means = true_means # 每个臂的真实平均奖励
self.counts = [0] * n_arms
self.values = [0.0] * n_arms
def pull(self, arm):
"""拉动指定臂"""
self.counts[arm] += 1
# 奖励 = 真实均值 + 高斯噪声
reward = self.true_means[arm] + random.gauss(0, 0.1)
return reward
def update(self, arm, reward):
"""更新估计"""
n = self.counts[arm]
self.values[arm] = self.values[arm] + (reward - self.values[arm]) / n
class EpsilonGreedy:
def __init__(self, n_arms, epsilon=0.1):
self.n_arms = n_arms
self.epsilon = epsilon
self.counts = [0] * n_arms
self.values = [0.0] * n_arms
def select_arm(self):
if random.random() > self.epsilon:
return self.values.index(max(self.values))
return random.randint(0, self.n_arms - 1)
def update(self, arm, reward):
self.counts[arm] += 1
n = self.counts[arm]
self.values[arm] = self.values[arm] + (reward - self.values[arm]) / n
class UCB1:
def __init__(self, n_arms):
self.n_arms = n_arms
self.counts = [0] * n_arms
self.values = [0.0] * n_arms
self.total_counts = 0
def select_arm(self):
for arm in range(self.n_arms):
if self.counts[arm] == 0:
return arm
ucb_values = []
for arm in range(self.n_arms):
bonus = math.sqrt(
2 * math.log(self.total_counts) / self.counts[arm]
)
ucb_values.append(self.values[arm] + bonus)
return ucb_values.index(max(ucb_values))
def update(self, arm, reward):
self.total_counts += 1
self.counts[arm] += 1
n = self.counts[arm]
self.values[arm] = self.values[arm] + (reward - self.values[arm]) / n
class ThompsonSampling:
def __init__(self, n_arms):
self.n_arms = n_arms
self.alpha = [1] * n_arms
self.beta = [1] * n_arms
def select_arm(self):
samples = [
np.random.beta(self.alpha[i], self.beta[i])
for i in range(self.n_arms)
]
return samples.index(max(samples))
def update(self, arm, reward):
if reward == 1:
self.alpha[arm] += 1
else:
self.beta[arm] += 1
def run_bandit_experiment():
"""运行老虎机实验"""
print("=" * 70)
print("多臂老虎机算法比较实验")
print("=" * 70)
# 设置真实的臂价值
true_means = [0.3, 0.5, 0.7, 0.4, 0.6]
n_arms = len(true_means)
n_rounds = 10000
algorithms = {
'ε-Greedy (ε=0.1)': EpsilonGreedy(n_arms, 0.1),
'ε-Greedy (ε=0.01)': EpsilonGreedy(n_arms, 0.01),
'UCB1': UCB1(n_arms),
'Thompson Sampling': ThompsonSampling(n_arms),
}
results = {name: {'total_reward': 0, 'regrets': []}
for name in algorithms}
# 运行实验
for name, algo in algorithms.items():
bandit = BanditSimulator(n_arms, true_means)
best_mean = max(true_means)
cumulative_reward = 0
cumulative_regret = 0
for t in range(n_rounds):
arm = algo.select_arm()
reward = bandit.pull(arm)
algo.update(arm, reward)
bandit.update(arm, reward)
cumulative_reward += reward
cumulative_regret += best_mean - true_means[arm]
if (t + 1) % 1000 == 0:
results[name]['regrets'].append(cumulative_regret)
results[name]['total_reward'] = cumulative_reward
results[name]['final_regret'] = cumulative_regret
# 打印结果
print("\n算法比较:")
print("-" * 50)
for name, result in results.items():
print(f"{name}:")
print(f" 总奖励: {result['total_reward']:.2f}")
print(f" 最终后悔值: {result['final_regret']:.2f}")
return results
if __name__ == "__main__":
results = run_bandit_experiment()14. 在线算法的设计原则总结
14.1 核心设计原则
-
延迟决策原则:尽可能推迟不可逆的决策,直到收集了更多信息。
-
保守与冒险的平衡:根据问题特性选择合适的风险水平。
-
利用局部性:如果访问模式有局部性,设计利用局部性的策略(如LRU)。
-
随机化防护:使用随机性来对抗自适应敌手的攻击。
-
Primal-Dual思维:从对偶问题的角度理解算法行为。
14.2 各类问题的最优策略
| 问题 | 确定性竞争比 | 随机化竞争比 | 理论下界 |
|---|---|---|---|
| Ski Rental | 同 | ||
| 在线装箱 | |||
| 负载均衡(台机) | |||
| 缓存置换(k页) | |||
| 列表更新 | |||
| 在线匹配 |
15. 在线算法的未来方向
15.1 与深度学习的结合
近年来,在线算法和深度学习的结合成为一个热门研究方向:
- 强化学习:用神经网络学习更好的在线策略
- 元学习:让算法学会”如何学习”
- 自适应算法:根据输入特性自动调整策略
15.2 鲁棒在线优化
传统的竞争分析假设敌手是全知全能的,但在实际应用中,敌手的能力往往是有限的。鲁棒在线优化研究在部分信息下的在线决策问题。
15.3 在线算法与因果推断
现代推荐系统和在线广告需要处理选择偏差问题,这需要将在线算法的思想与因果推断方法相结合。
参考来源
- Borodin, A., & El-Yaniv, R. (1998). Online Computation and Competitive Analysis. Cambridge University Press.
- Sleator, D. D., & Tarjan, R. E. (1985). Amortized Efficiency of List Update and Paging Rules. CACM.
- Graham, R. L. (1966). Bounds for Certain Multiprocessing Anomalies. Bell System Technical Journal.
- Karp, R. M. (1992). On-Line Algorithms. On-Line Algorithms.
- Mehta, A., Saberi, A., Vazirani, U., & Vazirani, V. (2005). AdWords and Generalized Online Matching. JACM.
- Belady, L. A. (1966). A Study of Replacement Algorithms for Virtual-Storage Computer. IBM Systems Journal.
- Auer, P., Cesa-Bianchi, N., & Fischer, P. (2002). Finite-time Analysis of the Multiarmed Bandit Problem. Machine Learning.
- Lai, T. L., & Robbins, H. (1985). Asymptotically Efficient Adaptive Allocation Rules. Advances in Applied Mathematics.
- Bansal, N., & Pruhs, K. (2010). The Geometry of Online Packing Linear Programs. Math. Oper. Res.
- Buchbinder, N., & Naor, J. (2009). The Design of Competitive Online Algorithms via a Primal-Dual Approach. Foundations and Trends in Theoretical Computer Science.