摘要
滑动窗口,听起来高大上,其实就是“移动的截取窗口”。这篇专门为零基础读者讲解:滑动窗口的核心原理、三种实现方式(简单滑动、重叠率滑动、语义分块)、与Transformer的结合、以及在流式处理中的应用。看完全篇,你不仅能理解原理,还能写出生产级的滑动窗口代码。
先用一个生活例子理解滑动窗口
想象你看电影
你在看电影,但只能一次看10分钟的片段:
电影总时长:60分钟
窗口大小:10分钟
第1次看:0-10分钟 ✓
第2次看:5-15分钟(滑动5分钟)
第3次看:10-20分钟(再滑动5分钟)
...
关键点:
- 窗口大小固定:每次只看10分钟
- 滑动步长可调:可以每次滑动5分钟、3分钟、甚至1分钟
- 重叠区域:相邻窗口之间有重叠,这样可以保证连续性
滑动窗口在AI中的含义
在AI领域,滑动窗口解决的问题是:
问题:对话历史有100轮,怎么让AI看到所有历史?
解决:用滑动窗口,每次只看最近20轮
历史:[1][2][3]...[80][81][82]...[100]
窗口1: [81][82]...[100]
窗口2: [79][80]...[98]
窗口3: [77][78]...[96]
...
一、滑动窗口的核心原理
基本概念
# 滑动窗口示意图
"""
原始序列:[A][B][C][D][E][F][G][H][I][J]
窗口大小 = 4
步长 = 2
窗口1:[A][B][C][D] 位置 0-4
窗口2: [C][D][E][F] 位置 2-6
窗口3: [E][F][G][H] 位置 4-8
窗口4: [G][H][I][J] 位置 6-10
重叠:C D E F G H 在多个窗口中重复出现
"""
数学定义
滑动窗口可以用数学公式表达:
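设序列为 $S = [x_1, x_2, \dots, x_n]$,窗口大小为 $w$,步长为 $s$(要求 $n \ge w$,$s \ge 1$)。
第 $i$ 个窗口($i = 1, 2, \dots, N$)从位置 $(i-1)s + 1$ 开始:
$$W_i = [x_{(i-1)s+1},\ x_{(i-1)s+2},\ \dots,\ x_{(i-1)s+w}]$$
当 $s = 1$ 时即 $W_i = [x_i, x_{i+1}, \dots, x_{i+w-1}]$。
窗口数量:
$$N = \left\lfloor \frac{n - w}{s} \right\rfloor + 1$$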
三个关键参数
| 参数 | 含义 | 选择建议 |
|---|---|---|
| 窗口大小(w) | 每次处理多少个token | 取决于模型限制和任务需求 |
| 步长(s) | 每次滑动多少 | 小=细节多/计算大,大=计算快/可能丢信息 |
| 重叠率 | (w-s)/w | 一般50%-75%效果较好 |
二、三种滑动窗口实现
方式1:简单滑动窗口(最基础)
from datetime import datetime
from typing import Any, Iterator, List, Optional
class SimpleSlidingWindow:
    """Fixed-size sliding window over a list.

    Windows start at offsets 0, step, 2*step, ... and only full windows are
    produced: a trailing remainder shorter than ``window_size`` is dropped.
    """

    def __init__(self, window_size: int, step: int = 1):
        """
        Args:
            window_size: Number of items in each window; must be positive.
            step: Distance the window advances between windows; must be
                positive.

        Raises:
            ValueError: If ``window_size`` or ``step`` is not positive.
        """
        if window_size <= 0:
            raise ValueError(f"window_size must be positive, got {window_size}")
        # A non-positive step never advances the window, which used to make
        # every method below loop forever.
        if step <= 0:
            raise ValueError(f"step must be positive, got {step}")
        self.window_size = window_size
        self.step = step

    def _window_starts(self, data: List[Any]) -> range:
        """Return the start offset of every full window that fits in ``data``."""
        return range(0, len(data) - self.window_size + 1, self.step)

    def create_windows(self, data: List[Any]) -> List[List[Any]]:
        """Return all full windows of ``data`` as a list of slices."""
        return [
            data[start:start + self.window_size]
            for start in self._window_starts(data)
        ]

    def windows_iter(self, data: List[Any]) -> Iterator[List[Any]]:
        """Yield the same windows as ``create_windows`` one at a time."""
        for start in self._window_starts(data):
            yield data[start:start + self.window_size]

    def get_windows_with_position(self, data: List[Any]) -> List[dict]:
        """Return every window together with its position.

        Returns:
            A list of dicts with the keys ``'window'`` (the slice),
            ``'start_idx'`` (inclusive), ``'end_idx'`` (exclusive) and
            ``'window_id'`` (0-based ordinal of the window).
        """
        return [
            {
                'window': data[start:start + self.window_size],
                'start_idx': start,
                'end_idx': start + self.window_size,
                'window_id': window_id,
            }
            for window_id, start in enumerate(self._window_starts(data))
        ]
# Usage example
if __name__ == "__main__":
    messages = [f"消息{i}" for i in range(1, 21)]
    sw = SimpleSlidingWindow(window_size=5, step=2)

    print(f"原始数据:{len(messages)}条消息")
    print("=" * 50)

    # Print each window on its own line.
    all_windows = sw.create_windows(messages)
    for window in all_windows:
        print(window)


# 方式2:重叠率滑动窗口(更常用)
class OverlapSlidingWindow:
    """Sliding window whose step is derived from an overlap ratio.

    Consecutive windows share ``window_size - step`` items, which keeps some
    context across window boundaries.
    """

    def __init__(
        self,
        window_size: int,
        overlap_ratio: float = 0.5
    ):
        """
        Args:
            window_size: Number of items in each full window; must be positive.
            overlap_ratio: Fraction of a window shared with the next one,
                between 0 and 1 inclusive.

        Raises:
            ValueError: If ``window_size`` is not positive or
                ``overlap_ratio`` lies outside [0, 1].
        """
        if window_size <= 0:
            raise ValueError(f"window_size must be positive, got {window_size}")
        if not 0 <= overlap_ratio <= 1:
            raise ValueError(
                f"overlap_ratio must be between 0 and 1, got {overlap_ratio}"
            )
        self.window_size = window_size
        self.overlap_ratio = overlap_ratio
        # int() truncates, so a high ratio could produce a step of 0 and the
        # window would never advance; always move at least one item.
        self.step = max(1, int(window_size * (1 - overlap_ratio)))

    def create_windows(self, data: List[Any]) -> List[tuple]:
        """Split ``data`` into overlapping windows.

        Returns:
            A list of ``(start_index, window)`` tuples. If ``data`` fits in a
            single window, the only entry is ``(0, data)``. Otherwise the last
            entry holds whatever remains from its start index to the end of
            ``data`` and may be shorter than ``window_size``.
        """
        if len(data) <= self.window_size:
            return [(0, data)]

        windows = []
        start = 0
        while start + self.window_size < len(data):
            windows.append((start, data[start:start + self.window_size]))
            start += self.step

        # Final window: everything that is left.
        remaining = data[start:]
        if remaining:
            windows.append((start, remaining))
        return windows

    def get_overlap_pairs(self, data: List[Any]) -> List[dict]:
        """Describe the overlap between each pair of adjacent windows.

        Returns:
            One dict per adjacent pair with the keys ``'window_id'`` (index of
            the earlier window), ``'overlap'`` (tail of the earlier window that
            is repeated in the next one) and ``'non_overlap'`` (the part of the
            next window that is new).
        """
        windows = self.create_windows(data)
        overlap_size = self.window_size - self.step
        pairs = []
        for window_id, ((_, current), (_, following)) in enumerate(
            zip(windows, windows[1:])
        ):
            pairs.append({
                'window_id': window_id,
                # An explicit start index is used because ``current[-0:]``
                # would return the whole window when there is no overlap.
                'overlap': current[len(current) - overlap_size:],
                'non_overlap': following[overlap_size:],
            })
        return pairs

    def merge_overlapping_context(
        self,
        windows: List[tuple],
        merge_strategy: str = "last"
    ) -> List[Any]:
        """Reduce a list of ``(start, window)`` tuples to a flat list.

        Args:
            windows: Output of ``create_windows``.
            merge_strategy: ``'last'`` keeps the final item of each window,
                ``'first'`` keeps the first item of each window, ``'combine'``
                concatenates all windows (overlapping items appear repeatedly).

        Raises:
            ValueError: If ``merge_strategy`` is not one of the above.
        """
        if merge_strategy == "last":
            # Empty windows have no last item; skip them instead of raising.
            return [w[-1] for _, w in windows if w]
        if merge_strategy == "first":
            return [w[0] for _, w in windows if w]
        if merge_strategy == "combine":
            combined = []
            for _, w in windows:
                combined.extend(w)
            return combined
        raise ValueError(f"Unknown merge strategy: {merge_strategy}")
# Usage example
if __name__ == "__main__":
    # Simulate a 100-turn conversation.
    messages = [f"对话{i}" for i in range(1, 101)]

    # 50% overlap between neighbouring windows.
    sw = OverlapSlidingWindow(window_size=20, overlap_ratio=0.5)
    windows = sw.create_windows(messages)

    overlap_count = sw.window_size - sw.step
    print(f"原始数据:{len(messages)}条")
    print(f"窗口数量:{len(windows)}")
    print(f"每个窗口大小:{sw.window_size}")
    print(f"步长:{sw.step}")
    print(f"重叠:{overlap_count}条")


# 方式3:语义滑动窗口(最智能)
简单滑动窗口的缺点是:可能在句子中间截断!
class SemanticSlidingWindow:
    """Sliding window that cuts on paragraph and sentence boundaries.

    Text is packed into chunks of at most ``max_tokens`` estimated tokens.
    Paragraphs are separated by a blank line (``'\\n\\n'``); a paragraph that is
    too large on its own is split further on the Chinese full stop ``'。'``.
    """

    def __init__(
        self,
        max_tokens: int,
        overlap_tokens: int = 200,
        tokenizer=None
    ):
        """
        Args:
            max_tokens: Token budget of a single chunk or window.
            overlap_tokens: Token budget for content repeated at the start of
                the next chunk or window; 0 disables overlap.
            tokenizer: Callable mapping a string to a token count. When
                omitted, a simple built-in estimator is used.
        """
        self.max_tokens = max_tokens
        self.overlap_tokens = overlap_tokens
        self.tokenizer = tokenizer or self._simple_tokenizer()

    @staticmethod
    def _simple_tokenizer():
        """Return a rough token estimator.

        The estimator counts one token per character in the range
        U+4E00..U+9FFF plus one token per remaining whitespace-separated word
        that contains at least one alphanumeric character.
        """
        def tokenize(text: str) -> int:
            chinese = 0
            remainder = []
            for ch in text:
                if '\u4e00' <= ch <= '\u9fff':
                    chinese += 1
                    # Blank out counted characters so they are not counted a
                    # second time as part of a "word". Subtracting the CJK
                    # count from the word count instead cancels it entirely.
                    remainder.append(' ')
                else:
                    remainder.append(ch)
            words = sum(
                1
                for word in ''.join(remainder).split()
                if any(c.isalnum() for c in word)
            )
            return chinese + words
        return tokenize

    def _split_units(self, text: str) -> List[tuple]:
        """Split ``text`` into ``(paragraph_index, fragment, tokens)`` units.

        A paragraph within budget is a single unit. An oversized paragraph is
        split on ``'。'``; each sentence keeps its terminator so that joining
        the fragments reproduces the paragraph text.
        """
        units = []
        for para_idx, para in enumerate(text.split('\n\n')):
            if not para.strip():
                continue
            para_tokens = self.tokenizer(para)
            if para_tokens <= self.max_tokens:
                units.append((para_idx, para, para_tokens))
                continue
            pieces = para.split('。')
            last = len(pieces) - 1
            for pos, piece in enumerate(pieces):
                if not piece.strip():
                    continue
                # Every piece except the last was followed by '。'.
                fragment = piece if pos == last else piece + '。'
                units.append((para_idx, fragment, self.tokenizer(fragment)))
        return units

    @staticmethod
    def _join_units(units: List[tuple]) -> str:
        """Join units: same paragraph -> no separator, otherwise a blank line."""
        parts = []
        previous_idx = None
        for para_idx, fragment, _ in units:
            if parts and para_idx != previous_idx:
                parts.append('\n\n')
            parts.append(fragment)
            previous_idx = para_idx
        return ''.join(parts)

    def create_chunks(self, text: str) -> List[dict]:
        """Pack ``text`` into chunks without cutting inside a sentence.

        Returns:
            A list of dicts with the keys ``'content'`` (chunk text) and
            ``'tokens'`` (sum of the token estimates of its units). A chunk
            only exceeds ``max_tokens`` when a single sentence or paragraph
            does so by itself.
        """
        chunks = []
        current = []
        current_tokens = 0
        for unit in self._split_units(text):
            unit_tokens = unit[2]
            if current and current_tokens + unit_tokens > self.max_tokens:
                chunks.append({
                    'content': self._join_units(current),
                    'tokens': current_tokens
                })
                # Repeat the last unit as overlap only if it fits the overlap
                # budget and still leaves room for the incoming unit.
                carry = current[-1]
                if (
                    self.overlap_tokens > 0
                    and carry[2] <= self.overlap_tokens
                    and carry[2] + unit_tokens <= self.max_tokens
                ):
                    current = [carry]
                    current_tokens = carry[2]
                else:
                    current = []
                    current_tokens = 0
            current.append(unit)
            current_tokens += unit_tokens

        if current:
            chunks.append({
                'content': self._join_units(current),
                'tokens': current_tokens
            })
        return chunks

    def create_windows_with_semantic_boundaries(
        self,
        data: List[dict],
        text_key: str = 'text'
    ) -> List[List[dict]]:
        """Group dict items into windows without splitting any item.

        Args:
            data: Items whose text is stored under ``text_key``; a missing key
                counts as empty text.
            text_key: Key holding the text of each item.

        Returns:
            A list of windows, each a list of the original items. Trailing
            items of one window are repeated at the start of the next one
            when ``overlap_tokens`` is positive.
        """
        windows = []
        current_window = []
        current_tokens = 0
        for item in data:
            tokens = self.tokenizer(item.get(text_key, ''))
            if current_window and current_tokens + tokens > self.max_tokens:
                windows.append(current_window)
                if self.overlap_tokens > 0:
                    overlap_size = self._calculate_overlap_size(
                        current_window, text_key
                    )
                    # ``[-0:]`` would keep the whole window, so handle 0
                    # explicitly.
                    current_window = (
                        current_window[-overlap_size:] if overlap_size else []
                    )
                    current_tokens = sum(
                        self.tokenizer(i.get(text_key, ''))
                        for i in current_window
                    )
                else:
                    current_window = []
                    current_tokens = 0
            current_window.append(item)
            current_tokens += tokens

        if current_window:
            windows.append(current_window)
        return windows

    def _calculate_overlap_size(
        self,
        window: List[dict],
        text_key: str = 'text'
    ) -> int:
        """Return how many trailing items of ``window`` to repeat as overlap.

        Items are counted from the end until their token total reaches
        ``overlap_tokens``. The result is capped at ``len(window) - 1`` so the
        next window always drops at least one leading item.
        """
        limit = max(len(window) - 1, 0)
        accumulated = 0
        for count, item in enumerate(reversed(window), start=1):
            if count > limit:
                break
            accumulated += self.tokenizer(item.get(text_key, ''))
            if accumulated >= self.overlap_tokens:
                return count
        return limit
# Usage example
if __name__ == "__main__":
    # Simulated document, repeated 20 times to make it longer.
    base_text = """
第一段内容。这是一个完整的段落,介绍了基本概念。
第二段内容。这里详细解释了原理和实现方式。
第三段内容。包含了代码示例和运行结果。
第四段内容。继续深入讨论相关话题。
第五段内容。总结和展望。
"""
    document = base_text * 20

    sw = SemanticSlidingWindow(max_tokens=100)
    chunks = sw.create_chunks(document)

    print(f"文档总长度:{len(document)}字符")
    print(f"分块数量:{len(chunks)}")
    for i, chunk in enumerate(chunks):
        print(f"块{i+1}:{chunk['tokens']}tokens")


# 三、滑动窗口在LLM中的应用
应用场景1:长文档处理
class LongDocumentProcessor:
    """Answer a question about a long document chunk by chunk."""

    def __init__(
        self,
        llm_client,
        window_size: int = 4000,
        overlap_tokens: int = 500
    ):
        """
        Args:
            llm_client: Object exposing ``generate(prompt)``.
            window_size: Token budget passed to the chunker as ``max_tokens``.
            overlap_tokens: Overlap budget passed to the chunker.
        """
        self.llm = llm_client
        self.window_size = window_size
        self.overlap_tokens = overlap_tokens

    def process_document(self, document: str, query: str) -> str:
        """Ask ``query`` against every chunk and merge the useful answers.

        Chunks are queried one after another. Answers containing the phrase
        "没有答案" are discarded. If nothing is left, a fixed apology string is
        returned; otherwise the remaining answers are sent to the model once
        more to be merged into a single reply.
        """
        chunker = SemanticSlidingWindow(
            max_tokens=self.window_size,
            overlap_tokens=self.overlap_tokens
        )

        useful_answers = []
        for chunk in chunker.create_chunks(document):
            prompt = f"""基于以下内容片段,回答问题。
内容:
{chunk['content']}
问题:{query}
如果这段内容能回答问题,给出答案。如果不能,说明"这段内容中没有答案"。"""
            answer = self.llm.generate(prompt)
            if "没有答案" in answer:
                continue
            useful_answers.append(answer)

        if not useful_answers:
            return "抱歉,文档中没有找到答案。"

        # Merge the per-chunk answers with one final model call.
        joined_answers = "\n".join(useful_answers)
        final_prompt = f"""问题:{query}
以下是各部分的相关答案:
{joined_answers}
请整合这些答案,给出完整回答。"""
        return self.llm.generate(final_prompt)


# 应用场景2:对话历史管理
class ConversationSlidingWindow:
    """Keep a long conversation within a token budget."""

    def __init__(
        self,
        llm_client,
        max_tokens: int = 8000,
        overlap_tokens: int = 500
    ):
        """
        Args:
            llm_client: Object exposing ``generate(prompt)``; only used by
                ``get_context_with_history_summary``.
            max_tokens: Estimated-token budget for the returned context.
            overlap_tokens: Overlap budget passed to the chunker.
        """
        self.llm = llm_client
        self.max_tokens = max_tokens
        self.overlap_tokens = overlap_tokens
        self.messages = []

    def add_message(self, role: str, content: str):
        """Append a message stamped with the current local time (ISO 8601)."""
        self.messages.append({
            'role': role,
            'content': content,
            'timestamp': datetime.now().isoformat()
        })

    def get_context(self, current_query: str) -> str:
        """Return the conversation text, trimmed to the most recent chunk.

        If the whole history fits in ``max_tokens`` it is returned unchanged.
        Otherwise the history is chunked with a budget reduced by the size of
        ``current_query`` and the last chunk is returned. If chunking yields
        nothing, the last five messages are returned.
        """
        text_content = self._messages_to_text(self.messages)
        if self._estimate_tokens(text_content) <= self.max_tokens:
            return text_content

        sw = SemanticSlidingWindow(
            max_tokens=self.max_tokens - self._estimate_tokens(current_query),
            overlap_tokens=self.overlap_tokens
        )
        chunks = sw.create_chunks(text_content)
        if not chunks:
            return self._messages_to_text(self.messages[-5:])
        return chunks[-1]['content']

    def get_context_with_history_summary(self, current_query: str) -> str:
        """Return recent messages preceded by a model-written summary.

        If the whole history fits in ``max_tokens`` it is returned unchanged.
        Otherwise the last 10 messages are kept verbatim and everything before
        them is summarised with one ``generate`` call. ``current_query`` is
        accepted for symmetry with ``get_context`` and is not used.
        """
        full_text = self._messages_to_text(self.messages)
        if self._estimate_tokens(full_text) <= self.max_tokens:
            return full_text

        history = self.messages[:-10]
        recent = self.messages[-10:]
        if not history:
            return self._messages_to_text(recent)

        history_text = self._messages_to_text(history)
        summary_prompt = f"""请总结以下对话的核心内容,保留关键信息:
{history_text}
摘要要求:
1. 保留主要话题和决定
2. 保留用户的核心需求
3. 删除重复和细节
4. 控制在200字以内"""
        summary = self.llm.generate(summary_prompt)

        recent_text = self._messages_to_text(recent)
        return f"[早期对话摘要]\n{summary}\n\n[近期对话]\n{recent_text}"

    def _messages_to_text(self, messages: list) -> str:
        """Render messages as ``role: content`` lines."""
        return '\n'.join([f"{m['role']}: {m['content']}" for m in messages])

    @staticmethod
    def _estimate_tokens(text: str) -> int:
        """Estimate tokens as 0.5 per CJK character plus 0.25 per other word.

        CJK characters (U+4E00..U+9FFF) are blanked out before the remaining
        words are counted. Subtracting the CJK count from the word count, as
        was done before, goes negative for unspaced Chinese text and
        underestimates the total.
        """
        chinese = 0
        remainder = []
        for ch in text:
            if '\u4e00' <= ch <= '\u9fff':
                chinese += 1
                remainder.append(' ')
            else:
                remainder.append(ch)
        english = sum(
            1
            for word in ''.join(remainder).split()
            if any(c.isalnum() for c in word)
        )
        return int(chinese * 0.5 + english * 0.25)


# 应用场景3:流式数据处理
class StreamSlidingWindow:
    """Sliding window over items that arrive one at a time."""

    def __init__(
        self,
        window_size: int = 10,
        step: int = 5,
        aggregate_fn: callable = None
    ):
        """
        Args:
            window_size: Number of buffered items that triggers a result;
                must be positive.
            step: Number of leading items dropped after each result; must be
                positive.
            aggregate_fn: Callable applied to the full window (a list). By
                default the last item of the window is returned.

        Raises:
            ValueError: If ``window_size`` or ``step`` is not positive.
        """
        if window_size <= 0:
            raise ValueError(f"window_size must be positive, got {window_size}")
        # With a non-positive step the buffer is never trimmed: it grows
        # without bound and the same leading items are emitted on every add.
        if step <= 0:
            raise ValueError(f"step must be positive, got {step}")
        self.window_size = window_size
        self.step = step
        self.aggregate_fn = aggregate_fn or (lambda x: x[-1])
        self.buffer = []

    def add(self, item: Any) -> Optional[Any]:
        """Buffer ``item`` and return an aggregate once a window is full.

        Returns:
            ``aggregate_fn(window)`` when the buffer has reached
            ``window_size`` items, otherwise ``None``. After a result the
            first ``step`` buffered items are discarded.
        """
        self.buffer.append(item)
        if len(self.buffer) < self.window_size:
            return None

        window = self.buffer[:self.window_size]
        result = self.aggregate_fn(window)
        # Slide forward.
        self.buffer = self.buffer[self.step:]
        return result

    def add_batch(self, items: List[Any]) -> List[Any]:
        """Add several items and collect every result that is not ``None``."""
        results = []
        for item in items:
            result = self.add(item)
            if result is not None:
                results.append(result)
        return results

    def get_current_buffer(self) -> List[Any]:
        """Return a copy of the items currently buffered."""
        return self.buffer.copy()
# Usage example
class StreamingTextProcessor:
    """Feed a text stream through a sliding window and analyse each window."""

    def __init__(self, llm_client):
        """Store the client and set up a 20-item window that advances by 10."""
        self.llm = llm_client
        self.window = StreamSlidingWindow(
            window_size=20,
            step=10,
            aggregate_fn=lambda msgs: '\n'.join(msgs)
        )

    def process_stream(self, text_stream: Iterator[str]) -> Iterator[str]:
        """Yield one model response per completed, non-empty window."""
        for piece in text_stream:
            window_text = self.window.add(piece)
            if not window_text:
                # Window not full yet (or its joined text is empty).
                continue
            yield self._process_window(window_text)

    def _process_window(self, text: str) -> str:
        """Ask the model to extract the key information from ``text``."""
        return self.llm.generate(f"分析以下文本,提取关键信息:\n\n{text}")


# 四、生产级滑动窗口管理器
class ProductionSlidingWindow:
    """Facade that selects one of the sliding-window strategies."""

    def __init__(
        self,
        llm_client,
        window_size: int = 4000,
        overlap_tokens: int = 500,
        strategy: str = "semantic"
    ):
        """
        Args:
            llm_client: Stored on the instance; not used by the methods of
                this class.
            window_size: Window size handed to the selected strategy.
            overlap_tokens: Desired overlap between neighbouring windows.
            strategy: ``'simple'``, ``'overlap'`` or ``'semantic'``.

        Raises:
            ValueError: If ``strategy`` is not one of the above.
        """
        self.llm = llm_client
        self.window_size = window_size
        self.overlap_tokens = overlap_tokens
        self.strategy = strategy

        if strategy == "simple":
            # SimpleSlidingWindow takes a step, not an overlap: advance by the
            # part of the window that is not shared, and by at least one item.
            step = max(1, window_size - overlap_tokens)
            self.processor = SimpleSlidingWindow(window_size, step)
        elif strategy == "overlap":
            ratio = overlap_tokens / window_size if window_size > 0 else 0.5
            self.processor = OverlapSlidingWindow(window_size, ratio)
        elif strategy == "semantic":
            self.processor = SemanticSlidingWindow(window_size, overlap_tokens)
        else:
            raise ValueError(f"Unknown strategy: {strategy}")

    def create_windows(self, data: List[Any]) -> List[tuple]:
        """Return ``(index, window)`` tuples for ``data``.

        For ``'simple'`` and ``'overlap'`` the index is the start offset of
        the window in ``data``. For ``'semantic'`` every item of ``data`` is
        returned as its own window, indexed by position; the semantic
        processor is not consulted here.
        """
        if self.strategy == "simple":
            return [
                (entry['start_idx'], entry['window'])
                for entry in self.processor.get_windows_with_position(data)
            ]
        if self.strategy == "overlap":
            return self.processor.create_windows(data)
        return list(enumerate(data))

    def aggregate_windows(
        self,
        windows: List[tuple],
        query: str = None
    ) -> Any:
        """Reduce ``(index, window)`` tuples to a single result.

        Returns:
            ``None`` for no windows; the only window when there is exactly
            one; a query-ranked text when ``query`` is given; otherwise the
            last window.
        """
        if not windows:
            return None
        if len(windows) == 1:
            return windows[0][1]
        if query:
            return self._query_aware_aggregate(windows, query)
        return windows[-1][1]

    def _query_aware_aggregate(
        self,
        windows: List[tuple],
        query: str
    ) -> str:
        """Join the text of the (up to) three windows most relevant to ``query``.

        Non-string windows are rendered with ``str()`` both for scoring and
        for the returned text. Windows with equal scores keep input order.
        """
        scored = []
        for idx, window in windows:
            content = window if isinstance(window, str) else str(window)
            relevance = self._calculate_relevance(content, query)
            scored.append((relevance, idx, content))

        # list.sort is stable, so ties stay in their original order.
        scored.sort(key=lambda entry: entry[0], reverse=True)

        # Join the rendered text, not the raw windows: joining lists or dicts
        # raises TypeError.
        return '\n\n'.join(content for _, _, content in scored[:3])

    def _calculate_relevance(self, content: str, query: str) -> float:
        """Return the share of whitespace-separated query words found in content."""
        query_words = set(query.split())
        content_words = set(content.split())
        overlap = query_words & content_words
        return len(overlap) / len(query_words) if query_words else 0
# Usage example
def demo(llm_client=None):
    """Run every strategy over a simulated 100-turn conversation.

    Args:
        llm_client: Client handed to ``ProductionSlidingWindow``. Defaults to
            ``None``; the previous code referenced an undefined global ``llm``.
    """
    # Simulate 100 conversation turns.
    messages = [
        {'role': 'user', 'content': f'第{i}轮对话内容'}
        for i in range(100)
    ]

    for strategy in ('simple', 'overlap', 'semantic'):
        print(f"\n{'='*50}")
        print(f"策略: {strategy}")
        print('='*50)

        manager = ProductionSlidingWindow(
            llm_client=llm_client,
            window_size=20,
            overlap_tokens=5,
            strategy=strategy
        )
        windows = manager.create_windows(messages)
        print(f"窗口数量: {len(windows)}")

        # Simulated query.
        query = "第50轮对话说了什么"
        result = manager.aggregate_windows(windows, query)
        print(f"查询结果长度: {len(str(result))}字符")


# 五、滑动窗口的调优指南
参数选择建议
┌─────────────────────────────────────────────────────────────┐
│ 滑动窗口参数调优 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 📊 窗口大小选择 │
│ ├─ 小(<2000 tokens):细节多,准确性高,但可能丢全局信息 │
│ ├─ 中(2000-4000 tokens):平衡的选择 │
│ └─ 大(>4000 tokens):全局视野,但计算成本高 │
│ │
│ 📐 重叠比例选择 │
│ ├─ 25%:计算效率高,但边缘信息可能丢失 │
│ ├─ 50%:常用选择,平衡效率和信息完整 │
│ └─ 75%:信息完整,但计算量大 │
│ │
│ 🎯 步长选择 │
│ └─ 步长 = 窗口大小 × (1 - 重叠比例) │
│ │
└─────────────────────────────────────────────────────────────┘
常见问题与解决
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 边界信息丢失 | 窗口在句子/段落中间截断,没有落在语义边界上 | 使用语义滑动窗口 |
| 重复处理 | 窗口重叠 | 记录已处理位置 |
| 内存爆炸 | 窗口数量太多 | 使用迭代器而不是列表 |
| 结果不一致 | 随机性 | 固定随机种子 |
六、实战:构建一个问答机器人
class SlidingWindowQABot:
    """Question-answering bot that trims its history with a sliding window."""

    def __init__(self, llm_client, max_context_tokens: int = 8000):
        """
        Args:
            llm_client: Object exposing ``generate(prompt)``.
            max_context_tokens: Estimated-token budget for history plus
                question.
        """
        self.llm = llm_client
        self.max_context_tokens = max_context_tokens
        self.conversation_history = []

    def ask(self, question: str) -> str:
        """Record the question, query the model and record its answer."""
        # 1. Record the user question.
        self.conversation_history.append({
            'role': 'user',
            'content': question
        })

        # 2. Build the context (the history already contains the question).
        context = self._build_context(question)

        # 3. Generate the answer.
        prompt = f"""{context}
用户问题:{question}
请基于对话历史和上下文回答。"""
        answer = self.llm.generate(prompt)

        # 4. Record the answer.
        self.conversation_history.append({
            'role': 'assistant',
            'content': answer
        })
        return answer

    def _build_context(self, current_question: str) -> str:
        """Return the history text, cut down to its latest chunk if too long.

        When history plus question exceed ``max_context_tokens`` the history
        is chunked with the remaining budget and the last chunk is returned.
        The full history is returned otherwise, or if chunking yields nothing.
        """
        history_text = self._format_history()
        history_tokens = self._estimate_tokens(history_text)
        question_tokens = self._estimate_tokens(current_question)

        if history_tokens + question_tokens > self.max_context_tokens:
            sw = SemanticSlidingWindow(
                max_tokens=self.max_context_tokens - question_tokens,
                overlap_tokens=500
            )
            chunks = sw.create_chunks(history_text)
            if chunks:
                return chunks[-1]['content']
        return history_text

    def _format_history(self) -> str:
        """Render the history as ``role: content`` lines."""
        return '\n'.join([
            f"{m['role']}: {m['content']}"
            for m in self.conversation_history
        ])

    @staticmethod
    def _estimate_tokens(text: str) -> int:
        """Estimate tokens as 0.5 per CJK character plus 0.25 per other word.

        CJK characters (U+4E00..U+9FFF) are blanked out before the remaining
        words are counted. Subtracting the CJK count from the word count, as
        was done before, goes negative for unspaced Chinese text and
        underestimates the total.
        """
        chinese = 0
        remainder = []
        for ch in text:
            if '\u4e00' <= ch <= '\u9fff':
                chinese += 1
                remainder.append(' ')
            else:
                remainder.append(ch)
        english = sum(
            1
            for word in ''.join(remainder).split()
            if any(c.isalnum() for c in word)
        )
        return int(chinese * 0.5 + english * 0.25)
# Usage example
if __name__ == "__main__":
    # NOTE(review): `llm` is not defined in this file; supply a client object
    # that exposes generate(prompt) before running this example.
    # The constructor parameter is `llm_client`; passing `llm=` raised
    # TypeError.
    bot = SlidingWindowQABot(llm_client=llm, max_context_tokens=8000)

    # Simulate a multi-turn conversation.
    questions = [
        "我叫小明,喜欢Python编程",
        "帮我写一个排序算法",
        "用快速排序实现",
        "优化一下性能",
        "我叫小明,记得吗?"  # memory check
    ]
    for q in questions:
        print(f"\n问题: {q}")
        answer = bot.ask(q)
        print(f"回答: {answer[:100]}...")


# 七、总结
┌─────────────────────────────────────────────────────────────┐
│ 滑动窗口核心要点 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 🎯 核心原理 │
│ └─ 固定大小 + 移动步长 = 连续覆盖长序列 │
│ │
│ 🔧 三种实现 │
│ ├─ SimpleSlidingWindow:基础,适合规则数据 │
│ ├─ OverlapSlidingWindow:保持连续性,常用 │
│ └─ SemanticSlidingWindow:智能,适合文本 │
│ │
│ 📊 参数选择 │
│ ├─ 窗口大小:取决于模型限制和任务需求 │
│ ├─ 重叠比例:50%左右效果较好 │
│ └─ 步长:窗口大小 × (1 - 重叠比例) │
│ │
│ 💡 应用场景 │
│ ├─ 长文档处理:分块 + 并行 + 合并 │
│ ├─ 对话历史管理:滑动 + 摘要 │
│ └─ 流式数据处理:实时窗口 + 聚合 │
│ │
│ ⚠️ 注意事项 │
│ ├─ 避免在句子/段落中间截断(应在语义边界处切分) │
│ ├─ 边缘信息用重叠补救 │
│ └─ 大数据用迭代器节省内存 │
│ │
└─────────────────────────────────────────────────────────────┘
相关主题
参考文献
- Beltagy, I., et al. (2020). Longformer: The Long-Document Transformer. arXiv.
- Zaheer, M., et al. (2020). Big Bird: Transformers for Longer Sequences. NeurIPS.
- Child, R., et al. (2019). Generating Long Sequences with Sparse Transformers. arXiv.