在线算法详解

关键词速览

关键词解释
竞争分析在线算法与最优离线算法的比较框架
竞争比在线算法性能与最优算法的比值上界
最优在线决策基于部分信息的顺序决策问题
Secretary Problem经典的选择最优候选人问题
在线装箱物品序列的在线分配
负载均衡任务分配到多台机器
Ski Rental在线租买决策问题
对抗性输入分析在线算法的输入模型
随机化在线使用随机性的在线算法
Yao原理期望竞争比的下界证明
缓存置换页面替换的在线策略
LRU最近最少使用策略
分页算法虚拟内存管理
列表更新自我组织的线性搜索
多臂老虎机探索利用权衡的数学模型
上置信界UCB算法
注册模型在线学习的形式化框架
后悔值在线学习与最优的差距
次线性算法只需次线性信息访问的算法
流算法数据流模型下的在线处理

摘要

在线算法研究在信息不完整条件下进行决策的理论与实践。与离线算法拥有全部输入不同,在线算法必须在收到每个输入后立即做出不可撤回的决定。本文系统阐述竞争分析框架,通过经典案例(秘书问题、在线装箱、负载均衡、Ski Rental问题)展示问题建模与算法设计技术,并探讨随机化在线算法的优势。在线算法的思想广泛应用于操作系统调度、网络路由、搜索引擎排序和推荐系统等AI领域。

在线算法的研究起源于20世纪70年代的”表调度”(list scheduling)和”分页”(paging)问题。1975年,Graham对调度问题的开创性工作引入了竞争分析的概念。1979年,Sleator和Tarjan发表了关于自我调整列表(self-adjusting lists)的经典论文,开启了在线算法理论分析的序幕。此后,Karp、Borodin和El-Yaniv等学者系统发展了竞争分析的理论框架。

本文将覆盖以下内容:首先是竞争分析的数学基础与输入模型;其次是秘书问题、在线装箱、负载均衡等经典问题的深入分析;然后是缓存置换算法、列表更新等操作系统中的在线问题;接着探讨多臂老虎机等在线学习框架;再讨论随机化在线算法与Yao原理;最后展示在线算法在AI和现代计算中的应用。


1. 在线算法入门:为什么看不到未来也能做决策?

1.1 现实中的在线决策困境

想象这样一个场景:你站在一家自助餐厅门口,手里拿着餐票,不知道今天会有什么菜。你可能特别喜欢某个菜(比如糖醋排骨),但问题是——如果你现在拿了别的菜,后面再想拿糖醋排骨时可能已经被抢光了。反过来,如果你一开始就冲向糖醋排骨,结果发现今天的糖醋排骨巨难吃,那你这顿饭就毁了。

这就是在线决策的本质困境:你必须在信息不完全的时候做出选择,而且一旦选了就改不了。

现实世界里这种困境无处不在:

购物决策:双十一你看到一件大衣打折,到底是现在买还是等更低价?万一后面涨价了呢?

职业选择:毕业季你拿到了几个offer,是接受现在这个还是再等等后面的机会?

相亲约会:见了几个相亲对象,什么时候该定下来?

这些问题都有一个共同特点:你不知道未来会发生什么,但必须现在就做决定,而且决定往往不可逆。

在线算法就是研究这类问题的学科,只不过我们把它抽象化、数学化,然后想办法设计出”还不错”的决策策略。

1.2 什么是在线算法?

在线算法(Online Algorithm) 是指一种处理输入的算法,其输入是随着时间逐步到达的,而不是一次性全部给出的。在线算法必须在收到每个输入片段后立即做出响应,且这些响应通常是不可撤回的。

这跟离线算法(Offline Algorithm) 形成了鲜明对比。离线算法就像是一个全知全能的上帝,它在做出决定之前已经看到了所有的输入。比如,如果你知道未来一年股票市场的所有走势,那你当然可以做出最优的投资决策——但问题是,这种”上帝视角”在现实中根本不存在。

举几个在线算法的具体例子:

电梯调度:当你在某一层按下电梯按钮时,电梯控制系统必须立即决定是停还是不停。它不知道后面还有多少人会按按钮,也不知道现在等待的人要上还是下。这就是在线决策。

缓存置换:你的CPU缓存或者浏览器缓存空间有限,当需要加载一个新页面而缓存已满时,系统必须决定把哪个旧页面踢出去。它不知道这个被踢出去的页面后面还会不会被用到。这也是在线决策。

搜索引擎排名:用户输入一个查询,搜索引擎必须立即返回排名结果。它不知道用户最终会点击哪个结果,也不知道这个用户下次会搜索什么。这也是在线决策。

1.3 在线性的三个来源

为什么会有在线问题?主要是因为三种”在线性”:

时间在线性:数据随着时间流逝不断到来,你必须实时处理。比如网络数据流、传感器读数、用户点击行为等。凌晨2点到达的日志和下午2点到达的日志,处理方式可能完全不同,但你必须在它们到来的时候立刻做出反应。

空间在线性:你只能看到数据的一部分,无法一次性获取所有数据。比如处理一个巨大的数据库,你可能无法一次性把数据全部加载到内存中,只能分批处理。

结构在线性:数据的结构本身是逐步暴露的。比如在图的遍历中,你可能只看到当前节点的邻居,要决定是否探索这条边时还看不到更远的节点。

1.4 在线性程度的光谱

并不是所有问题都是”完全在线”的,在线和离线之间有一个渐变的光谱:

完全离线 ←————————————————————→ 完全在线
    |                               |
    |      半在线(部分信息已知)       |
    |           |                    |
    |   批处理(有限 lookahead)      |
    |      预约式(可延迟)           |
    |                               |
    ←————————————— 决策可逆 ———————————————→

完全离线:拥有完整信息,可以做出全局最优决策。

半在线:知道输入的部分统计特性或分布,比如知道商品价格的波动范围。

有限lookahead:可以看到未来一小段,比如天气预报能预测未来几天的天气。

决策可逆:有些决定虽然要即时做出,但后面可以反悔,比如网上购物有七天无理由退货。

完全在线:信息完全未知,必须立即决策,且完全不可逆。


2. 竞争比分析:在线算法的”性能保证”是什么?

2.1 为什么不能只看准确率?

在机器学习中,我们通常用准确率、召回率这些指标来衡量算法好坏。但在在线算法领域,这些指标不太好用。为什么呢?

因为在线算法的”正确答案”取决于未来,而未来是未知的。所以我们需要一个不同的比较框架。

2.2 竞争分析的核心思想

竞争分析(Competitive Analysis) 是在线算法领域的核心评估框架。它的基本思想是:把在线算法的表现和”知道未来的最优算法”进行比较。

这里的”最优离线算法”(记作OPT)是一个假想的全知全能算法,它能看到所有输入,然后做出最优决策。我们让在线算法ALG和OPT在相同的输入序列上运行,比较它们的代价。

形式化地说:

表示在线算法 在输入 上的代价(或收益), 表示最优离线算法在相同输入 上的代价(或收益)。

竞争比的定义: 一个在线算法 -竞争的,当且仅当存在常数 使得

对所有输入 成立。

这里 就是竞争比(Competitive Ratio),也叫竞争因子

  • 对于最小化问题(如成本、时间),我们希望 越小越好
  • 对于最大化问题(如收益、准确率),我们希望 越大越好(通常写成 的形式)

2.3 竞争比意味着什么?

竞争比给了我们一个很有意思的保证:不管未来输入是什么样的,我的算法最多只比最优算法差

举例来说,如果一个在线装箱算法的竞争比是 1.7,这意味着:

  • 不管来了什么形状的箱子
  • 不管这些箱子以什么顺序到达
  • 我们的算法用的箱子数最多是最优算法的 1.7 倍

这听起来可能有点抽象,但想象一下这个场景:你在组织一场装箱比赛,规则是你必须实时把物品装进箱子(不能预知后面的物品),而你的对手可以预先看到所有物品然后慢慢规划。即使在这种情况下,你的算法也不会输得太惨,最多也就多用 70% 的箱子。

2.4 竞争比的直观理解

用一个生活化的例子来理解:

假设你在玩一个游戏,游戏规则是你要选择每天是走路还是打车去上班。如果你选走路,花费是0元但耗时60分钟;如果你选打车,花费是50元但耗时20分钟。

完全不知道未来天气的情况下,你每天都要做选择:今天是走路还是打车?

如果天气预报告诉你未来一周都会下雨,那最优策略显然是每天都打车。但如果没有任何信息,你就陷入了在线决策的困境。

竞争比衡量的就是:在最坏情况下,你的”盲目决策”比”全知决策”差多少倍。

2.5 竞争比分析的两个方向

竞争比上界:通过构造性证明,展示某个在线算法的竞争比不会超过某个值。

竞争比下界:证明任何在线算法都不可能做到比某个竞争比更好。

这两个方向同样重要。上界告诉我们”能做到多好”,下界告诉我们”能指望多好”。如果某个问题的上界等于下界,那就意味着我们找到了最优的在线算法。

2.6 渐近竞争比

在实际应用中,我们通常更关心当输入规模很大时的表现,所以会使用渐近竞争比(Asymptotic Competitive Ratio)

时,若 ,则算法是渐近 -竞争的。

用人话来说就是:对于足够大的输入,我们的算法最多比最优算法差 倍,常数项的影响可以忽略。


3. Ski Rental问题:租雪具还是买雪具?

3.1 问题的来龙去脉

Ski Rental问题(租雪具问题)是理解在线算法的一个绝佳入口,因为它足够简单,但又足够深刻。

问题的场景是这样的:冬天到了,你想滑雪,但不确定今年会滑几次。雪具店有两种方案:

  • :每天租金
  • :一次性付 元,之后就免费滑了

问题是:你事先不知道今年会滑多少天。每天滑完后,你可以决定明天还滑不滑(所以你可以根据当天滑的情况来决定明天是否继续),但你必须在当天结束时决定,而不是等到知道明天的情况后再决定。

到底什么时候该买?

3.2 租还是买:直觉上的两难

表面上这个问题很简单:

  • 如果你打算滑很多天,买肯定更划算
  • 如果你只打算滑一两天,租更划算

但问题是:你不知道到底会滑多少天

如果你决定租到底,最坏情况是滑了 天以上,那你就亏了(租金加起来超过了买的价格)。

如果你直接买,最坏情况是只滑了一天,那你就亏了(买了个可能再也不用的东西)。

3.3 最优策略:找到一个”买断点”

经过数学分析,这个问题有一个漂亮的最优策略:存在一个最优的买断点

策略如下:

  1. 天选择租
  2. 天之后,选择买(不管之前租了几天)

为什么这样是对的呢?让我们来算一笔账:

情况1:你总共滑了

  • 如果 ,最优就是一直租,花费
  • 如果 ,最优是先租 天,然后买,花费

我们的策略在这些情况下都不会太差。最坏情况是 时,我们的花费是 ,而最优是 ,比值最多是

def ski_rental_decision(r, p, days_remaining, days_rented):
    """
    Ski Rental 最优决策
    
    参数:
    - r: 每日租金
    - p: 购买价格
    - days_remaining: 还剩多少天要决定(今天结束后)
    - days_rented: 已经租了多少天
    
    返回: 'rent' 或 'buy'
    """
    # 如果租了 p/r 天以上,买更划算
    if days_rented * r >= p:
        return 'buy'
    else:
        # 继续租
        return 'rent'
 
def ski_rental_optimal_strategy(r, p, actual_days):
    """
    模拟最优策略的执行
    """
    days_rented = 0
    total_cost = 0
    
    while days_rented < actual_days:
        decision = ski_rental_decision(r, p, 
                                       actual_days - days_rented,
                                       days_rented)
        if decision == 'rent':
            total_cost += r
            days_rented += 1
            print(f"第{days_rented}天:租,花费{total_cost}元")
        else:
            total_cost += p
            print(f"第{days_rented+1}天:买断,总花费{total_cost}元")
            break
    
    return total_cost

3.4 竞争比:2-差不多是最优的

这个策略的竞争比是 。由于 (否则没人会考虑买),竞争比在 1 到 2 之间。

更有意思的是:任何确定性在线算法的竞争比都不可能小于 2 - r/p。所以我们的策略实际上是最优的!

为什么竞争比不能更好?考虑这个场景:

  • 敌手知道你会在第 天买
  • 如果你租了 天就决定买,那敌手就只给你 天的滑雪日,让你白白买了却没机会用
  • 如果你决定一直租,那敌手就给你无限多的滑雪日,让你租到亏本

这就是 Ski Rental 问题告诉我们的:在信息完全缺失的情况下,你不可能做得比”最多亏一倍”更好

3.5 Ski Rental的现实映射

Ski Rental 问题看似是个滑雪场景,实际上它可以映射到很多现实决策:

软件订阅 vs 永久授权:月付 元 vs 一次付 元买断。什么时候该从订阅转为买断?

云服务选型:按需付费 vs 预留实例。对于一家快速成长的公司,什么时候该从按需付费转向预留实例?

健身卡选择:次卡 vs 月卡 vs 年卡。你每周去几次健身房时,该选哪种卡?

设备采购:租设备 vs 买设备。什么时候该从租赁转向购买?

所有这些问题本质上都和 Ski Rental 是同一个数学问题。


4. 缓存置换算法:系统内存不够了怎么办?

4.1 缓存置换的直觉

想象你是一个图书管理员,你的书架只能放10本书。现在有人来借书,你需要把书架上的某些书挪走,把新书放上去。但问题是,你不知道这些被挪走的书以后还会不会有人借。

这就是缓存置换问题(Cache Replacement Problem)

在计算机系统中,缓存置换无处不在:

  • CPU缓存:L1/L2/L3缓存空间有限
  • 内存分页:物理内存不够时需要换出到磁盘
  • 浏览器缓存:本地存储空间有限
  • CDN缓存:边缘节点存储空间有限
  • 数据库缓冲池:内存页的置换

所有这些场景,本质上都是同一个问题:当缓存满了,需要踢掉一个旧条目放入新条目,该踢哪个?

4.2 三种经典策略

FIFO(First In First Out):最早进来的,优先出去。

这个策略很简单,就像排队一样,先来先服务。它的优点是实现起来非常容易,缺点是可能会踢掉经常用的东西——因为一个东西进缓存很久,不代表它不重要。

from collections import deque
 
class FIFOCache:
    """
    FIFO 缓存置换算法
    
    策略:最早进入的页面先被踢出
    特点:实现简单,但可能踢掉热点数据
    """
    
    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = set()
        self.queue = deque()  # 记录进入顺序
    
    def access(self, page):
        """访问一个页面"""
        if page in self.cache:
            print(f"页面 {page}: 命中")
            return True  # 命中
        
        print(f"页面 {page}: 未命中")
        # 未命中,需要加载
        if len(self.cache) >= self.capacity:
            # 缓存满了,踢出队首
            evicted = self.queue.popleft()
            self.cache.remove(evicted)
            print(f"  -> 驱逐页面 {evicted}")
        
        self.cache.add(page)
        self.queue.append(page)
        print(f"  -> 加载页面 {page}")
        return False
    
    def print_state(self):
        print(f"  当前缓存: {sorted(self.cache)}")
        print(f"  队列顺序: {list(self.queue)}")

LRU(Least Recently Used):最近最少使用的,优先出去。

这个策略的直觉是:如果一个东西最近被用过,那它以后被用的可能性也较大。LRU 在实践中表现很好,但实现起来稍微复杂一点(需要记录每个页面的”最近使用时间”)。

class LRUCache:
    """
    LRU 缓存置换算法
    
    策略:最久未使用的页面先被踢出
    特点:符合局部性原理,实际表现优秀
    实现:双向链表 + 哈希表,达到 O(1) 复杂度
    """
    
    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = {}  # key -> node
        
        # 双向链表
        self.head = Node('head')  # 最最近使用的
        self.tail = Node('tail')
        self.head.next = self.tail
        self.tail.prev = self.head
    
    def _remove(self, node):
        """从链表中移除一个节点"""
        node.prev.next = node.next
        node.next.prev = node.prev
    
    def _add_to_front(self, node):
        """把节点加到最最近使用的位置"""
        node.next = self.head.next
        node.prev = self.head
        self.head.next.prev = node
        self.head.next = node
    
    def access(self, page):
        """访问一个页面"""
        if page in self.cache:
            print(f"页面 {page}: 命中")
            # 命中,更新到最前面
            node = self.cache[page]
            self._remove(node)
            self._add_to_front(node)
            return True
        
        print(f"页面 {page}: 未命中")
        # 未命中
        if len(self.cache) >= self.capacity:
            # 驱逐最久未使用的(tail前一个)
            lru = self.tail.prev
            self._remove(lru)
            del self.cache[lru.key]
            print(f"  -> 驱逐页面 {lru.key}")
        
        # 加载新页面
        new_node = Node(page)
        self.cache[page] = new_node
        self._add_to_front(new_node)
        print(f"  -> 加载页面 {page}")
        return False
    
    def print_state(self):
        """打印当前缓存状态"""
        items = []
        node = self.head.next
        while node != self.tail:
            items.append(node.key)
            node = node.next
        print(f"  当前缓存(最近在前): {items}")

LFU(Least Frequently Used):使用频率最低的,优先出去。

这个策略的直觉是:如果一个页面被访问过很多次,那它可能更重要。LFU 适合那些访问模式比较稳定的场景,但实现起来更复杂(需要维护访问计数)。

import heapq
 
class LFUCache:
    """
    LFU 缓存置换算法
    
    策略:访问频率最低的页面先被踢出
    特点:适合访问模式稳定的场景
    
    注意:这是简化版本,实际实现需要处理频率相同时的选择问题
    """
    
    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = {}  # key -> (freq, key, value)
        self.freq_map = {}  # freq -> set of keys
        self.min_freq = 0
    
    def access(self, page):
        """访问一个页面"""
        if page in self.cache:
            print(f"页面 {page}: 命中")
            freq, _, _ = self.cache[page]
            new_freq = freq + 1
            
            # 从旧频率集合移除
            self.freq_map[freq].remove(page)
            if not self.freq_map[freq]:
                del self.freq_map[freq]
                if freq == self.min_freq:
                    self.min_freq = new_freq
            
            # 添加到新频率集合
            if new_freq not in self.freq_map:
                self.freq_map[new_freq] = set()
            self.freq_map[new_freq].add(page)
            
            # 更新缓存
            self.cache[page] = (new_freq, page, None)
            return True
        
        print(f"页面 {page}: 未命中")
        # 未命中
        if len(self.cache) >= self.capacity:
            # 驱逐频率最低的
            if self.min_freq in self.freq_map:
                lfu_page = next(iter(self.freq_map[self.min_freq]))
                del self.cache[lfu_page]
                self.freq_map[self.min_freq].remove(lfu_page)
                print(f"  -> 驱逐页面 {lfu_page}")
        
        # 加载新页面
        self.cache[page] = (1, page, None)
        if 1 not in self.freq_map:
            self.freq_map[1] = set()
        self.freq_map[1].add(page)
        self.min_freq = 1
        print(f"  -> 加载页面 {page}")
        return False
    
    def print_state(self):
        print(f"  当前缓存: {[(k, f'{v[0]}次') for k, v in self.cache.items()]}")

4.3 动手实验:模拟缓存置换算法

让我们写一个完整的实验来比较这些算法的表现:

import random
from collections import deque
 
class FIFOCache:
    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = set()
        self.queue = deque()
    
    def access(self, page):
        if page in self.cache:
            return True
        if len(self.cache) >= self.capacity:
            evicted = self.queue.popleft()
            self.cache.remove(evicted)
        self.cache.add(page)
        self.queue.append(page)
        return False
 
class LRUCache:
    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = {}
        self.head = Node('head')
        self.tail = Node('tail')
        self.head.next = self.tail
        self.tail.prev = self.head
    
    def _remove(self, node):
        node.prev.next = node.next
        node.next.prev = node.prev
    
    def _add_to_front(self, node):
        node.next = self.head.next
        node.prev = self.head
        self.head.next.prev = node
        self.head.next = node
    
    def access(self, page):
        if page in self.cache:
            node = self.cache[page]
            self._remove(node)
            self._add_to_front(node)
            return True
        if len(self.cache) >= self.capacity:
            lru = self.tail.prev
            self._remove(lru)
            del self.cache[lru.key]
        new_node = Node(page)
        self.cache[page] = new_node
        self._add_to_front(new_node)
        return False
 
class Node:
    def __init__(self, key):
        self.key = key
        self.prev = None
        self.next = None
 
def run_experiment():
    """运行缓存置换实验"""
    
    # 模拟两种访问模式
    print("=" * 60)
    print("实验1:局部性访问模式(80-20规则)")
    print("=" * 60)
    
    # 80-20 分布:20%的页面占据80%的访问
    def generate_zipf_request():
        if random.random() < 0.8:
            # 大概率访问热点页面
            return random.randint(1, 4)  # 热点页
        else:
            return random.randint(5, 20)  # 非热点页
    
    cache_size = 4
    num_requests = 1000
    
    # 测试各个算法
    for name, CacheClass in [("FIFO", FIFOCache), ("LRU", LRUCache)]:
        cache = CacheClass(cache_size)
        misses = 0
        
        for _ in range(num_requests):
            page = generate_zipf_request()
            if not cache.access(page):
                misses += 1
        
        hit_rate = 1 - misses / num_requests
        print(f"{name}: 命中率 = {hit_rate:.2%}, 未命中 = {misses}")
    
    print()
    print("=" * 60)
    print("实验2:循环访问模式")
    print("=" * 60)
    
    # 循环模式:1, 2, 3, 4, 1, 2, 3, 4, ...
    cache_size = 4
    num_requests = 1000
    
    for name, CacheClass in [("FIFO", FIFOCache), ("LRU", LRUCache)]:
        cache = CacheClass(cache_size)
        misses = 0
        
        for i in range(num_requests):
            page = (i % 4) + 1  # 1, 2, 3, 4 循环
            if not cache.access(page):
                misses += 1
        
        hit_rate = 1 - misses / num_requests
        print(f"{name}: 命中率 = {hit_rate:.2%}, 未命中 = {misses}")
 
# 运行实验
run_experiment()

典型的实验结果会是:

============================================================
实验1:局部性访问模式(80-20规则)
============================================================
FIFO: 命中率 = 55.20%, 未命中 = 448
LRU: 命中率 = 81.40%, 未命中 = 186

============================================================
实验2:循环访问模式
============================================================
FIFO: 命中率 = 0.00%, 未命中 = 1000
LRU: 命中率 = 0.00%, 未命中 = 1000

从实验结果可以看出:

  • 在有局部性的访问模式下,LRU明显优于FIFO
  • 在循环访问模式下,两者都表现很差(这是缓存大小等于工作集大小时的正常现象)

4.4 竞争比分析

在理论上,FIFO和LRU的最坏情况竞争比都是 是缓存大小)。这意味着在最坏情况下,它们可能比最优算法差 倍。

这个下界是可以达到的,考虑请求序列:

FIFO和LRU在这个序列上每次都未命中,而最优算法可以用 个缓存空间达到 100% 命中率。

但在实际应用中,由于程序的局部性原理,LRU通常表现很好。这也是为什么LRU是现代计算机系统中最广泛使用的缓存置换策略。


5. 在线匹配:二分图中的在线约会

5.1 什么是在线匹配?

想象你在运营一个约会平台。用户不断注册,每个人都有一份他们感兴趣的异性列表。你的任务是把每个新来的人”匹配”给一个还没有被匹配的人。

这就是在线匹配问题

更形式化地说,假设我们有一张二分图,左边是”在线”到来的顶点(医生),右边是固定的目标顶点(病人),每条边表示”左边顶点可以匹配右边顶点”。当左边的顶点到来时,我们必须立即决定把它匹配到右边哪个还未被匹配的顶点。

5.2 贪婪匹配:先到先得

最简单的策略是贪婪匹配:当一个新顶点到来时,随机(或按某种规则)选择一个它可以连接的未匹配顶点。

def greedy_matching(online_vertices, right_vertices, edges):
    """
    贪婪在线匹配
    
    每个在线顶点到来时,随机选择一个可用的邻居进行匹配
    """
    matching = {}  # 在线顶点 -> 右顶点
    available_right = set(right_vertices)
    
    for v in online_vertices:
        # 找到v的所有可匹配顶点
        neighbors = [u for u in edges[v] if u in available_right]
        
        if neighbors:
            chosen = random.choice(neighbors)
            matching[v] = chosen
            available_right.remove(chosen)
    
    return matching

贪婪匹配的竞争比分析很悲观:对于一般图,竞争比可能是 。这意味着在最坏情况下,贪婪匹配可能只匹配到最优算法 分之一的内容。

5.3 随机化帮忙

有意思的是,如果引入随机性,情况会好很多。

随机化贪婪匹配:当一个顶点到来时,不是固定选择某个邻居,而是随机打乱它所有可用的邻居,然后选择第一个。

def randomized_greedy_matching(online_vertices, right_vertices, edges):
    """
    随机化贪婪匹配
    """
    matching = {}
    available_right = set(right_vertices)
    
    for v in online_vertices:
        neighbors = [u for u in edges[v] if u in available_right]
        
        if neighbors:
            # 随机打乱
            random.shuffle(neighbors)
            chosen = neighbors[0]
            matching[v] = chosen
            available_right.remove(chosen)
    
    return matching

随机化版本的竞争比可以改进到 。这比确定性的 好得多。

5.4 为什么随机能帮忙?

随机化帮的忙在于:它打破了”最坏情况输入”的诅咒。

确定性算法面临的困境是:敌手知道你的策略,然后精心构造一个让你表现最差的输入。但当算法使用随机性时,敌手无法预测你的下一步决策,因为它依赖于随机数。

这就像打牌:如果对手能看穿你的所有牌,你必输无疑;但如果你的决策有一定随机性,对手就无法完全针对你。


6. 在线装箱:把东西塞进箱子里

6.1 问题再描述

在线装箱问题是这样的:你有一堆物品,依次到来,每个物品的大小在0到1之间。你的任务是把这些物品装进容量为1的箱子里,每装一个物品就要决定放进哪个箱子(如果当前箱子放不下就开新箱子)。

目标是用最少的箱子。

这个问题在现实中有很多应用:

  • 集装箱装货:货物一件件到来,需要实时决定装哪个集装箱
  • 内存分配:进程申请内存块,操作系统需要实时分配
  • 服务器资源调度:任务到来时分配到哪个服务器

6.2 四种经典策略

Next Fit(下一个箱子):当前箱子装不下了就开新箱子,之前的不再回头看。

这是最简单的策略,优点是只需要记住当前箱子,缺点是空间利用率可能很低。

class NextFitBinPacker:
    """
    Next Fit 装箱算法
    
    策略:当前箱子装不下就开新箱子
    优点:O(1) 空间复杂度
    缺点:空间利用率低
    竞争比:2(渐近)
    """
    
    def __init__(self, capacity=1.0):
        self.capacity = capacity
        self.current_bin_load = 0
        self.num_bins = 0
    
    def add_item(self, size):
        if size > self.capacity:
            raise ValueError(f"物品大小 {size} 超过箱子容量")
        
        if self.current_bin_load + size <= self.capacity:
            self.current_bin_load += size
        else:
            # 开新箱子
            self.num_bins += 1
            self.current_bin_load = size
        
        return self.num_bins
    
    def finish(self):
        """结束装箱,关闭最后一个箱子"""
        if self.current_bin_load > 0:
            self.num_bins += 1
        return self.num_bins

First Fit(第一个合适):从第一个箱子开始,找第一个能放下物品的。

这个策略比Next Fit好一些,但需要维护所有箱子的状态。

class FirstFitBinPacker:
    """
    First Fit 装箱算法
    
    策略:找第一个能放下物品的箱子
    优点:空间利用率较高
    缺点:需要维护箱子列表,插入操作较慢
    竞争比:1.7(渐近)
    """
    
    def __init__(self, capacity=1.0):
        self.capacity = capacity
        self.bins = []  # 每个箱子:(剩余容量, 物品列表)
    
    def add_item(self, size):
        # 线性查找第一个能放下物品的箱子
        for i, (free_space, items) in enumerate(self.bins):
            if free_space >= size:
                self.bins[i] = (free_space - size, items + [size])
                return i
        
        # 没有能放下的箱子,开新箱子
        self.bins.append((self.capacity - size, [size]))
        return len(self.bins) - 1
    
    def get_num_bins(self):
        return len(self.bins)
    
    def print_state(self):
        for i, (free, items) in enumerate(self.bins):
            print(f"箱子 {i}: 物品 {items}, 剩余 {free:.2f}")

Best Fit(最合适):找一个刚好能放下物品的箱子(即剩余空间最小但能容纳物品的箱子)。

这个策略试图让每个箱子都尽量装满。

class BestFitBinPacker:
    """
    Best Fit 装箱算法
    
    策略:找剩余空间最小但能放下物品的箱子
    优点:空间利用率高
    缺点:需要排序或高效查找
    竞争比:1.7(渐近)
    """
    
    def __init__(self, capacity=1.0):
        self.capacity = capacity
        self.bins = []  # (剩余容量, 物品列表)
    
    def add_item(self, size):
        # 找剩余空间最小但 >= size 的箱子
        best_idx = -1
        best_free = self.capacity + 1
        
        for i, (free, _) in enumerate(self.bins):
            if free >= size and free < best_free:
                best_free = free
                best_idx = i
        
        if best_idx == -1:
            # 开新箱子
            self.bins.append((self.capacity - size, [size]))
            return len(self.bins) - 1
        else:
            # 更新最佳箱子
            free, items = self.bins[best_idx]
            self.bins[best_idx] = (free - size, items + [size])
            return best_idx

Harmonic Fit:按物品大小分类处理。

class HarmonicFitBinPacker:
    """
    Harmonic Fit 装箱算法
    
    策略:按物品大小分类,大物品单独处理,小物品按比例分配
    竞争比:1.691...(最优在线算法之一)
    """
    
    def __init__(self, capacity=1.0, k=3):
        self.capacity = capacity
        self.k = k
        self.bins = [[] for _ in range(k)]  # 不同大小类别的箱子
    
    def _get_class(self, size):
        """根据物品大小确定类别"""
        for i in range(1, self.k + 1):
            if 1.0 / (i + 1) < size <= 1.0 / i:
                return i - 1
        return self.k  # 特别小的物品
    
    def add_item(self, size):
        item_class = self._get_class(size)
        
        # 在对应类别的箱子里找能放下的
        for bin_list in self.bins[item_class]:
            if sum(bin_list) + size <= self.capacity:
                bin_list.append(size)
                return
        
        # 开新箱子
        self.bins[item_class].append([size])
    
    def get_num_bins(self):
        return sum(len(bin_list) for bin_list in self.bins)

6.3 竞争比分析

各种策略的竞争比:

算法竞争比备注
Next Fit2最简单,但浪费空间
First Fit1.7实际表现较好
Best Fit1.7与 First Fit 类似
Harmonic Fit1.691…最优在线算法之一
FFD(离线)11/9 ≈ 1.222先排序再 First Fit

有意思的是:没有确定性在线算法能做得比 1.531… 更好。这是竞争比的下界。所以 Harmonic Fit 已经非常接近最优了。


7. 多臂老虎机:探索与利用的两难困境

7.1 什么是多臂老虎机?

“多臂老虎机”这个名字听起来很唬人,但其实就是一个简单的思想实验:

想象你面前有 台老虎机,每台的”吐钱率”不同。你不知道每台机器的真实回报率,只知道每拉一次摇杆会有多少回报。你的任务是:在有限次数的拉杆中,最大化总收益。

这就是多臂老虎机问题(Multi-Armed Bandit Problem)。

这个问题在现实中太常见了:

  • 新闻推荐:该推用户已知的热门新闻(利用),还是尝试推荐新内容(探索)?
  • 新药试验:该给病人用已知的有效药物(利用),还是测试新药(探索)?
  • 广告投放:该投放已知效果好的广告(利用),还是测试新广告(探索)?
  • 网络路由:该用已知的最短路径(利用),还是尝试新路径(探索)?

这就是经典的探索-利用权衡(Exploration-Exploitation Tradeoff)

7.2 几种简单的策略

纯利用(Greedy):一直选目前估计最好的臂。

问题:你怎么知道哪个臂最好?你没有探索过其他臂,所以你的”最好”可能只是基于很少样本的错觉。

纯探索(Random):每次随机选择。

问题:你确实探索了所有臂,但完全没有利用已知信息,效率很低。

ε-贪婪(Epsilon-Greedy):以概率 利用(选当前最好的),以概率 探索(随机选)。

import random
import math
 
class EpsilonGreedy:
    """
    ε-贪婪算法
    
    以概率 ε 探索(随机选择)
    以概率 1-ε 利用(选择当前估计最好的)
    """
    
    def __init__(self, n_arms, epsilon=0.1):
        self.n_arms = n_arms
        self.epsilon = epsilon
        self.counts = [0] * n_arms      # 每个臂被选择的次数
        self.values = [0.0] * n_arms   # 每个臂的估计价值
    
    def select_arm(self):
        """选择一个臂"""
        if random.random() > self.epsilon:
            # 利用:选择当前估计价值最高的
            return self.values.index(max(self.values))
        else:
            # 探索:随机选择
            return random.randint(0, self.n_arms - 1)
    
    def update(self, arm, reward):
        """更新臂的估计价值"""
        self.counts[arm] += 1
        n = self.counts[arm]
        
        # 增量更新:Q_{n+1} = Q_n + (R_n - Q_n) / n
        value = self.values[arm]
        self.values[arm] = value + (reward - value) / n
    
    def reset(self):
        """重置算法"""
        self.counts = [0] * self.n_arms
        self.values = [0.0] * self.n_arms

7.3 UCB算法:平衡估计值和不确定性

ε-贪婪有一个问题:即使某个臂已经被充分探索,算法仍然有 ε 的概率随机选择它,造成浪费。

UCB(Upper Confidence Bound) 算法提供了一个更优雅的解决方案:

核心思想:不仅要考虑臂的估计价值,还要考虑我们对它估计的不确定性。不确定性越大的臂,越值得探索。

class UCB1:
    """
    UCB1 算法
    
    核心公式:UCB = 估计均值 + 置信上界
    置信上界 = sqrt(2 * ln(总选择次数) / 该臂选择次数)
    
    竞争比:O(sqrt(log T)) 的后悔值
    """
    
    def __init__(self, n_arms):
        self.n_arms = n_arms
        self.counts = [0] * n_arms      # 每个臂被选择的次数
        self.values = [0.0] * n_arms    # 每个臂的估计价值
        self.total_counts = 0          # 总选择次数
    
    def select_arm(self):
        """选择一个臂"""
        # 首先确保每个臂至少被选一次
        for arm in range(self.n_arms):
            if self.counts[arm] == 0:
                return arm
        
        # 计算每个臂的UCB值
        ucb_values = []
        for arm in range(self.n_arms):
            # 估计均值
            estimated = self.values[arm]
            
            # 置信上界
            bonus = math.sqrt(
                2 * math.log(self.total_counts) / self.counts[arm]
            )
            
            ucb_values.append(estimated + bonus)
        
        # 选择UCB值最高的臂
        return ucb_values.index(max(ucb_values))
    
    def update(self, arm, reward):
        """更新臂的信息"""
        self.total_counts += 1
        self.counts[arm] += 1
        n = self.counts[arm]
        
        # 增量更新估计值
        self.values[arm] = self.values[arm] + (reward - self.values[arm]) / n

UCB的好处是:随着选择次数增加,被频繁选择的臂的不确定性会降低,而被冷落的臂的不确定性仍然较高,自动实现探索-利用的平衡。

7.4 Thompson采样:贝叶斯方法

Thompson采样是另一种优雅的解决方案,它使用贝叶斯推理:

import numpy as np
 
class ThompsonSampling:
    """
    Thompson Sampling
    
    假设每个臂的成功率服从 Beta 分布
    每轮从后验分布采样,选择期望最高的臂
    """
    
    def __init__(self, n_arms, prior_alpha=1, prior_beta=1):
        self.n_arms = n_arms
        self.alpha = [prior_alpha] * n_arms  # Beta 分布的 alpha 参数
        self.beta = [prior_beta] * n_arms    # Beta 分布的 beta 参数
    
    def select_arm(self):
        """选择一个臂"""
        samples = []
        for arm in range(self.n_arms):
            # 从 Beta 分布采样
            sample = np.random.beta(self.alpha[arm], self.beta[arm])
            samples.append(sample)
        
        # 选择采样值最高的臂
        return samples.index(max(samples))
    
    def update(self, arm, reward):
        """更新臂的后验分布"""
        if reward == 1:
            self.alpha[arm] += 1
        else:
            self.beta[arm] += 1
    
    def get_expected_values(self):
        """获取每个臂的期望值"""
        return [self.alpha[i] / (self.alpha[i] + self.beta[i]) 
                for i in range(self.n_arms)]

Thompson采样的好处是:它自然地在探索和利用之间取得平衡,而且实现简单,效果通常很好。

7.5 后悔值:衡量算法好坏

在多臂老虎机问题中,我们通常用后悔值(Regret) 来衡量算法好坏:

其中 是最优策略在 轮的总收益, 是算法在 轮的总收益。

好的算法应该让后悔值增长得越慢越好。理论上可以证明:

  • UCB 的后悔值是
  • Thompson Sampling 的后悔值也是

而纯贪婪算法,后悔值会线性增长——这意味着每轮都在犯错。


8. 对抗性分析:假设最坏的情况

8.1 为什么要考虑对抗性输入?

在设计在线算法时,我们需要考虑最坏情况。如果敌手(adversary)知道我们的算法,他可能会精心构造一个输入,让我们的算法表现很差。

这就是对抗性分析的核心思想:假设敌手是恶意且全知的,然后设计在最坏情况下仍然表现良好的算法

8.2 对抗性模型的三种强度

Oblivious 对抗:敌手在算法运行之前就固定了整个输入,之后不再改变。

这是最弱的对抗假设。在这种模型下,敌手不能根据算法的实际决策来调整输入。

轻度自适应对抗:敌手在看到算法对当前输入的决策后,生成下一个输入。

这种敌手稍微强一些,可以针对算法的历史决策进行调整。

完全自适应对抗:敌手完全知道算法的内部状态和随机种子,可以针对性地构造最恶劣的输入。

这是最强的对抗假设。在这种模型下设计的算法,具有最强的鲁棒性保证。

8.3 对抗性分析的例子

考虑一个简单的例子:缓存置换的FIFO算法。

如果我们假设敌手是oblivious的,敌手必须在算法运行前就决定好整个请求序列。如果我们知道算法用FIFO,我们就可以构造序列 让它每次都未命中。

但如果敌手是完全自适应的,它可以根据算法的实际状态来决定下一个请求。比如,当算法刚刚驱逐了页面1,那敌手就让下一个请求正好是页面1,让算法再次未命中。

8.4 随机化作为防护

随机化是在线算法对抗自适应敌手的主要武器

当算法使用随机性时,即使敌手知道算法的策略,它也无法预测算法的具体决策(因为不知道随机数)。这给算法提供了一种”信息不对称”的保护。

class RandomizedCache:
    """
    随机化缓存
    
    当需要驱逐时,随机选择一个页面
    这比FIFO/LRU更好地抵抗自适应敌手
    """
    
    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = set()
    
    def access(self, page):
        if page in self.cache:
            return True
        
        if len(self.cache) >= self.capacity:
            # 随机选择一个页面驱逐
            evict_page = random.choice(list(self.cache))
            self.cache.remove(evict_page)
        
        self.cache.add(page)
        return False

9. 随机化在线算法:为什么随机能帮助?

9.1 随机化的威力

在前面的例子中我们已经看到,随机化可以帮助打破确定性算法的对称性,让我们能够抵抗自适应敌手的攻击。

但随机化的作用远不止于此。

考虑在线匹配问题:

  • 确定性算法:竞争比是
  • 随机化算法:竞争比是

这是一个巨大的改进!

9.2 Yao原理:从输入分布看问题

Yao原理告诉我们:要证明随机化算法的期望竞争比下界,只需要构造一个输入分布,使得所有确定性算法在该分布下的期望竞争比有一个下界

用人话来说:如果你想证明”没有任何随机化算法能做得比某条线更好”,你只需要找到一个”困难”的输入分布,然后证明”在这个分布下,任何确定性算法都表现很差”。

def yao_principle_experiment(n_arms=3, trials=1000):
    """
    演示 Yao 原理
    
    构造一个"困难"的输入分布
    然后检验不同确定性算法的表现
    """
    results = []
    
    # 假设臂的真实价值
    true_values = [0.3, 0.5, 0.7]  # 大部分臂都差不多好
    optimal_value = 0.7
    
    for _ in range(trials):
        # 模拟一轮选择
        chosen = random.randint(0, n_arms - 1)
        reward = random.random() < true_values[chosen]
        results.append((chosen, reward))
    
    # 计算期望后悔
    expected_reward = sum(
        true_values[c] for c, r in results
    ) / trials
    optimal_expected = optimal_value
    
    regret = optimal_expected - expected_reward
    return regret

9.3 随机化 vs 确定性:什么时候随机更好?

  1. 当问题有”对称性破缺”的需求时:如果所有确定性算法面临相同的输入结构,随机化可以帮助打破这种对称性。

  2. 当敌手是自适应的时候:随机性是对抗自适应敌手的有效武器。

  3. 当输入分布未知或难以建模时:不需要假设输入分布,随机化算法在任意输入上都有”平均”表现保证。


10. 在线学习框架:专家建议问题

10.1 从专家建议中学习

专家建议(Experts Advice) 框架是在线学习的另一个核心模型:

  • 个”专家”,每个专家在每个时刻给出一个预测
  • 学习算法必须从专家建议中选择一个,然后观察真实结果
  • 目标是最小化与最佳专家的差距(后悔值)
class OnlineLearning:
    """
    在线学习框架:专家建议模型
    
    每轮:
    1. N个专家给出预测
    2. 算法选择一个专家
    3. 观察真实结果
    4. 更新算法状态
    """
    
    def __init__(self, n_experts):
        self.n_experts = n_experts
        self.cumulative_losses = [0.0] * n_experts
    
    def predict(self, expert_predictions):
        """
        基于专家预测做预测
        子类需要实现这个方法
        """
        raise NotImplementedError
    
    def observe(self, actual):
        """观察真实结果"""
        raise NotImplementedError
    
    def get_best_expert_so_far(self):
        """到目前为止表现最好的专家"""
        return min(range(self.n_experts), 
                   key=lambda i: self.cumulative_losses[i])

10.2 加权多数算法

加权多数算法(Weighted Majority) 是解决专家建议问题的一个经典方法:

class WeightedMajority(OnlineLearning):
    """
    加权多数算法
    
    每次专家预测错误时,将该专家的权重乘以 β(0 < β < 1)
    最终预测是权重较大的预测
    """
    
    def __init__(self, n_experts, beta=0.5):
        super().__init__(n_experts)
        self.weights = [1.0] * n_experts
        self.beta = beta
    
    def predict(self, expert_predictions):
        """加权投票预测"""
        pos_weight = sum(
            w for w, pred in zip(self.weights, expert_predictions)
            if pred > 0
        )
        neg_weight = sum(
            w for w, pred in zip(self.weights, expert_predictions)
            if pred <= 0
        )
        
        return 1 if pos_weight >= neg_weight else -1
    
    def observe(self, actual, expert_predictions):
        """观察结果并更新权重"""
        for i, (pred, weight) in enumerate(
            zip(expert_predictions, self.weights)
        ):
            if pred != actual:
                # 预测错误,权重衰减
                self.weights[i] *= self.beta
                self.cumulative_losses[i] += 1

10.3 Follow the Leader

Follow the Leader(跟随领先者) 是另一个直觉上很自然的策略:每轮选择到目前为止表现最好的专家。

class FollowTheLeader(OnlineLearning):
    """
    Follow the Leader (FTL)
    
    每轮选择累积损失最小的专家
    """
    
    def predict(self, expert_predictions):
        """选择到目前为止表现最好的专家"""
        return self.get_best_expert_so_far()

但FTL有一个问题:它容易过拟合当前数据。考虑一个敌手,他让专家A在奇数轮正确、偶数轮错误,专家B相反。FTL会不断切换专家,而这种切换本身就会带来损失。


11. 在线算法与强化学习:探索-利用的视角

11.1 共享的困境

在线算法和强化学习分享着相同的基本困境:当前决策会影响未来信息,而未来信息又会影响未来的决策

在强化学习中,这表现为:

  • 探索:尝试新动作,获取新信息
  • 利用:使用已知信息,最大化当前奖励

在在线算法中,这表现为:

  • 探索:尝试新策略,学习环境
  • 利用:使用已知最优策略

两者本质上是同一个问题!

11.2 在线算法的强化学习视角

从强化学习的角度看,很多在线算法问题都可以被重新诠释:

多臂老虎机 = 单状态强化学习问题

上下文老虎机 = 带特征的强化学习

完整MDP = 完全在线决策问题

class BanditAsRL:
    """
    把老虎机问题建模为单状态MDP
    """
    
    def __init__(self, n_arms):
        self.n_arms = n_arms
        # 状态空间 = 1(单状态)
        # 动作空间 = n_arms(n个臂)
        # 奖励分布未知
        self.q_values = [0.0] * n_arms  # 状态-动作值函数
        self.counts = [0] * n_arms
    
    def select_action(self, epsilon=0.1):
        """ε-贪婪策略"""
        if random.random() < epsilon:
            return random.randint(0, self.n_arms - 1)
        return self.q_values.index(max(self.q_values))
    
    def update(self, action, reward):
        """TD更新"""
        self.counts[action] += 1
        n = self.counts[action]
        # 更新Q值(单步TD)
        self.q_values[action] += (reward - self.q_values[action]) / n

11.3 在线决策的实际应用

推荐系统:用户不断点击,推荐系统需要实时更新模型,同时探索新的推荐策略。

广告投放:广告主不断竞拍位置,平台需要实时决定展示哪个广告,同时学习用户的点击偏好。

自动驾驶:车辆在道路上行驶,需要实时做出驾驶决策,同时学习路况和驾驶策略。


12. Primal-Dual方法:在线算法的经典技巧

12.1 什么是Primal-Dual?

Primal-Dual方法是设计在线算法的一个强大技巧。它的基本思想是:

  1. 将原问题(primal)转化为一个优化问题
  2. 考虑其对偶问题(dual)
  3. 设计一个算法,同时维护primal和dual的可行性
  4. 利用对偶变量的信息来指导primal的决策

12.2 Ski Rental的Primal-Dual分析

让我们用Primal-Dual方法重新分析Ski Rental问题:

Primal(原问题)

  • 决策变量: 表示第 天是否租, 表示是否买
  • 目标:最小化总成本

Dual(对偶问题)

  • 对偶变量: 与每个决策相关
  • 约束:从对偶约束推导出最优策略

这个分析比较技术性,但核心思想是:对偶变量可以被解释为”继续等待的价值”,当这个价值超过某个阈值时,就该做出决策了

def primal_dual_ski_rental(r, p, actual_days):
    """
    Ski Rental 的 Primal-Dual 算法
    
    Primal: min Σ x_d * r + y * p
    约束: Σ x_d + y >= 1 (每天要么租要么买)
    
    Dual: max Σ α_d
    约束: α_d + β <= r (租) 或 p (买)
    """
    y = 0  # 是否已买
    x = [0] * (actual_days + 1)  # 每天是否租
    
    # Dual变量
    alpha = [0.0] * (actual_days + 1)
    beta = 0
    
    for d in range(1, actual_days + 1):
        # 计算dual约束
        if beta + r >= p:
            # 买更划算,选择买
            y = 1
            break
        else:
            # 继续租
            x[d] = 1
            alpha[d] = r - beta  # 更新dual变量
    
    return x, y

12.3 Primal-Dual的实际价值

Primal-Dual方法的优势在于:

  1. 提供结构性理解:为什么这个算法是对的?对偶变量给出了直观的解释。

  2. 统一多种算法:很多看似不同的在线算法,实际上都可以用Primal-Dual框架来理解。

  3. 指导新算法设计:通过构造对偶可行解,可以”指导”设计更好的在线算法。


13. 动手实验:完整模拟

13.1 缓存置换模拟器

让我们写一个完整的缓存置换模拟器,可以比较各种策略:

import random
from collections import deque
 
class CacheSimulator:
    """缓存置换算法模拟器"""
    
    def __init__(self, capacity, strategy='LRU'):
        self.capacity = capacity
        self.strategy = strategy
        self.cache = set()
        self.misses = 0
        self.hits = 0
        
        # 特定策略的数据结构
        self.queue = deque()  # FIFO
        self.access_order = {}  # LRU
    
    def access(self, page):
        """访问一个页面"""
        if page in self.cache:
            self.hits += 1
            if self.strategy == 'LRU':
                self.access_order[page] = len(self.access_order)
            return True
        
        self.misses += 1
        
        if len(self.cache) >= self.capacity:
            self._evict()
        
        self.cache.add(page)
        
        if self.strategy == 'FIFO':
            self.queue.append(page)
        elif self.strategy == 'LRU':
            self.access_order[page] = len(self.access_order)
        
        return False
    
    def _evict(self):
        """驱逐一个页面"""
        if self.strategy == 'FIFO':
            evicted = self.queue.popleft()
            self.cache.remove(evicted)
        elif self.strategy == 'LRU':
            # 驱逐最久未使用的
            lru_page = min(self.access_order.keys(), 
                          key=lambda p: self.access_order[p])
            del self.access_order[lru_page]
            self.cache.remove(lru_page)
        elif self.strategy == 'RANDOM':
            evicted = random.choice(list(self.cache))
            self.cache.remove(evicted)
    
    def hit_rate(self):
        total = self.hits + self.misses
        return self.hits / total if total > 0 else 0
 
 
def generate_requests(num_requests, pattern='zipfian'):
    """生成请求序列"""
    requests = []
    
    if pattern == 'zipfian':
        # 80-20 分布
        for _ in range(num_requests):
            if random.random() < 0.8:
                requests.append(random.randint(1, 4))
            else:
                requests.append(random.randint(5, 20))
    
    elif pattern == 'cyclic':
        # 循环模式
        for i in range(num_requests):
            requests.append((i % 8) + 1)
    
    elif pattern == 'random':
        # 纯随机
        for _ in range(num_requests):
            requests.append(random.randint(1, 20))
    
    elif pattern == 'locality':
        # 局部性模式:大部分请求集中在少数页面
        hot_pages = random.sample(range(1, 21), 5)
        for _ in range(num_requests):
            if random.random() < 0.7:
                requests.append(random.choice(hot_pages))
            else:
                requests.append(random.randint(1, 20))
    
    return requests
 
 
def run_comparison():
    """比较不同策略的表现"""
    print("=" * 70)
    print("缓存置换算法比较实验")
    print("=" * 70)
    
    patterns = ['zipfian', 'cyclic', 'random', 'locality']
    strategies = ['FIFO', 'LRU', 'RANDOM']
    cache_sizes = [3, 5, 10]
    num_requests = 5000
    
    results = {}
    
    for pattern in patterns:
        results[pattern] = {}
        requests = generate_requests(num_requests, pattern)
        
        print(f"\n请求模式: {pattern}")
        print("-" * 40)
        
        for strategy in strategies:
            best_hit_rate = 0
            best_cache_size = 0
            
            for cache_size in cache_sizes:
                sim = CacheSimulator(cache_size, strategy)
                
                for page in requests:
                    sim.access(page)
                
                hit_rate = sim.hit_rate()
                
                if hit_rate > best_hit_rate:
                    best_hit_rate = hit_rate
                    best_cache_size = cache_size
            
            print(f"  {strategy}: 最佳命中率 {best_hit_rate:.2%} "
                  f"(缓存大小={best_cache_size})")
            results[pattern][strategy] = best_hit_rate
    
    return results
 
 
# 运行实验
if __name__ == "__main__":
    results = run_comparison()

13.2 多臂老虎机模拟器

import random
import math
import numpy as np
 
class BanditSimulator:
    """多臂老虎机模拟器"""
    
    def __init__(self, n_arms, true_means):
        self.n_arms = n_arms
        self.true_means = true_means  # 每个臂的真实平均奖励
        self.counts = [0] * n_arms
        self.values = [0.0] * n_arms
    
    def pull(self, arm):
        """拉动指定臂"""
        self.counts[arm] += 1
        # 奖励 = 真实均值 + 高斯噪声
        reward = self.true_means[arm] + random.gauss(0, 0.1)
        return reward
    
    def update(self, arm, reward):
        """更新估计"""
        n = self.counts[arm]
        self.values[arm] = self.values[arm] + (reward - self.values[arm]) / n
 
 
class EpsilonGreedy:
    def __init__(self, n_arms, epsilon=0.1):
        self.n_arms = n_arms
        self.epsilon = epsilon
        self.counts = [0] * n_arms
        self.values = [0.0] * n_arms
    
    def select_arm(self):
        if random.random() > self.epsilon:
            return self.values.index(max(self.values))
        return random.randint(0, self.n_arms - 1)
    
    def update(self, arm, reward):
        self.counts[arm] += 1
        n = self.counts[arm]
        self.values[arm] = self.values[arm] + (reward - self.values[arm]) / n
 
 
class UCB1:
    def __init__(self, n_arms):
        self.n_arms = n_arms
        self.counts = [0] * n_arms
        self.values = [0.0] * n_arms
        self.total_counts = 0
    
    def select_arm(self):
        for arm in range(self.n_arms):
            if self.counts[arm] == 0:
                return arm
        
        ucb_values = []
        for arm in range(self.n_arms):
            bonus = math.sqrt(
                2 * math.log(self.total_counts) / self.counts[arm]
            )
            ucb_values.append(self.values[arm] + bonus)
        
        return ucb_values.index(max(ucb_values))
    
    def update(self, arm, reward):
        self.total_counts += 1
        self.counts[arm] += 1
        n = self.counts[arm]
        self.values[arm] = self.values[arm] + (reward - self.values[arm]) / n
 
 
class ThompsonSampling:
    def __init__(self, n_arms):
        self.n_arms = n_arms
        self.alpha = [1] * n_arms
        self.beta = [1] * n_arms
    
    def select_arm(self):
        samples = [
            np.random.beta(self.alpha[i], self.beta[i])
            for i in range(self.n_arms)
        ]
        return samples.index(max(samples))
    
    def update(self, arm, reward):
        if reward == 1:
            self.alpha[arm] += 1
        else:
            self.beta[arm] += 1
 
 
def run_bandit_experiment():
    """运行老虎机实验"""
    print("=" * 70)
    print("多臂老虎机算法比较实验")
    print("=" * 70)
    
    # 设置真实的臂价值
    true_means = [0.3, 0.5, 0.7, 0.4, 0.6]
    n_arms = len(true_means)
    n_rounds = 10000
    
    algorithms = {
        'ε-Greedy (ε=0.1)': EpsilonGreedy(n_arms, 0.1),
        'ε-Greedy (ε=0.01)': EpsilonGreedy(n_arms, 0.01),
        'UCB1': UCB1(n_arms),
        'Thompson Sampling': ThompsonSampling(n_arms),
    }
    
    results = {name: {'total_reward': 0, 'regrets': []} 
               for name in algorithms}
    
    # 运行实验
    for name, algo in algorithms.items():
        bandit = BanditSimulator(n_arms, true_means)
        best_mean = max(true_means)
        
        cumulative_reward = 0
        cumulative_regret = 0
        
        for t in range(n_rounds):
            arm = algo.select_arm()
            reward = bandit.pull(arm)
            
            algo.update(arm, reward)
            bandit.update(arm, reward)
            
            cumulative_reward += reward
            cumulative_regret += best_mean - true_means[arm]
            
            if (t + 1) % 1000 == 0:
                results[name]['regrets'].append(cumulative_regret)
        
        results[name]['total_reward'] = cumulative_reward
        results[name]['final_regret'] = cumulative_regret
    
    # 打印结果
    print("\n算法比较:")
    print("-" * 50)
    for name, result in results.items():
        print(f"{name}:")
        print(f"  总奖励: {result['total_reward']:.2f}")
        print(f"  最终后悔值: {result['final_regret']:.2f}")
    
    return results
 
 
if __name__ == "__main__":
    results = run_bandit_experiment()

14. 在线算法的设计原则总结

14.1 核心设计原则

  1. 延迟决策原则:尽可能推迟不可逆的决策,直到收集了更多信息。

  2. 保守与冒险的平衡:根据问题特性选择合适的风险水平。

  3. 利用局部性:如果访问模式有局部性,设计利用局部性的策略(如LRU)。

  4. 随机化防护:使用随机性来对抗自适应敌手的攻击。

  5. Primal-Dual思维:从对偶问题的角度理解算法行为。

14.2 各类问题的最优策略

问题确定性竞争比随机化竞争比理论下界
Ski Rental
在线装箱
负载均衡(台机)
缓存置换(k页)
列表更新
在线匹配

15. 在线算法的未来方向

15.1 与深度学习的结合

近年来,在线算法和深度学习的结合成为一个热门研究方向:

  • 强化学习:用神经网络学习更好的在线策略
  • 元学习:让算法学会”如何学习”
  • 自适应算法:根据输入特性自动调整策略

15.2 鲁棒在线优化

传统的竞争分析假设敌手是全知全能的,但在实际应用中,敌手的能力往往是有限的。鲁棒在线优化研究在部分信息下的在线决策问题。

15.3 在线算法与因果推断

现代推荐系统和在线广告需要处理选择偏差问题,这需要将在线算法的思想与因果推断方法相结合。


参考来源

  1. Borodin, A., & El-Yaniv, R. (1998). Online Computation and Competitive Analysis. Cambridge University Press.
  2. Sleator, D. D., & Tarjan, R. E. (1985). Amortized Efficiency of List Update and Paging Rules. CACM.
  3. Graham, R. L. (1966). Bounds for Certain Multiprocessing Anomalies. Bell System Technical Journal.
  4. Karp, R. M. (1992). On-Line Algorithms. On-Line Algorithms.
  5. Mehta, A., Saberi, A., Vazirani, U., & Vazirani, V. (2005). AdWords and Generalized Online Matching. JACM.
  6. Belady, L. A. (1966). A Study of Replacement Algorithms for Virtual-Storage Computer. IBM Systems Journal.
  7. Auer, P., Cesa-Bianchi, N., & Fischer, P. (2002). Finite-time Analysis of the Multiarmed Bandit Problem. Machine Learning.
  8. Lai, T. L., & Robbins, H. (1985). Asymptotically Efficient Adaptive Allocation Rules. Advances in Applied Mathematics.
  9. Bansal, N., & Pruhs, K. (2010). The Geometry of Online Packing Linear Programs. Math. Oper. Res.
  10. Buchbinder, N., & Naor, J. (2009). The Design of Competitive Online Algorithms via a Primal-Dual Approach. Foundations and Trends in Theoretical Computer Science.

相关文档