Abstract
The sliding window technique is a core method for processing long-sequence context: a fixed-size or overlapping window moves across the sequence so that information is processed continuously. This article explains the design principles behind fixed and sliding windows, how to choose parameters for overlapping windows, strategies for combining windows with summarization, and provides complete Python implementations.
Keyword quick reference
| Term | Description |
|---|---|
| Sliding Window | A fixed-size window that moves across the sequence |
| Stride | The distance the window moves on each step |
| Overlap Ratio | The fraction by which adjacent windows overlap |
| Fixed Window | A processing window of constant size, without overlap |
| Context Chunking | Splitting long text into blocks |
| Skip Connection | Passing information across windows |
| Local Attention | Attention restricted to a local range |
| Global Attention | Attention over all positions |
| Hierarchical Window | A multi-scale window structure |
| Dynamic Window | A window of variable size |
1. Sliding Window Fundamentals
1.1 What is a sliding window?
A sliding window is a technique for processing sequential data: a fixed-size "window" slides across the sequence, the content it covers is processed, and the window then moves to the next position. This is analogous to a convolution kernel sliding over its input in a CNN, but applied to text sequences and attention computation.
In LLM context handling, the core question the sliding window answers is: how can a bounded context window process a sequence of effectively unbounded length?
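The idea can be sketched in a few lines. The helper below (hypothetical, for illustration only) yields the `(start, end)` token spans that a sliding window of a given size and stride would visit over a sequence:

```python
def window_spans(n_tokens: int, window: int, stride: int):
    """Yield (start, end) index pairs for a sliding window over a sequence."""
    start = 0
    while start < n_tokens:
        yield (start, min(start + window, n_tokens))
        if start + window >= n_tokens:
            break  # the last window already reaches the end of the sequence
        start += stride

# A 12-token sequence with window=5, stride=2 yields overlapping spans:
spans = list(window_spans(12, 5, 2))
# [(0, 5), (2, 7), (4, 9), (6, 11), (8, 12)]
```

With `stride < window`, consecutive spans share `window - stride` tokens, which is exactly the overlap discussed below.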
1.2 Fixed windows vs. sliding windows
1.2.1 Fixed windows
A fixed window is the simplest chunking strategy: the sequence is cut evenly into blocks of constant size:
Sequence: [A1 A2 A3 A4 A5 A6 A7 A8 A9 A10 A11 A12]
          |_____| |_____| |_____| |_____|
          Chunk 1 Chunk 2 Chunk 3 Chunk 4
          (no overlap)
Characteristics:
- Simple to implement, computationally efficient
- Each block is processed independently, with no information loss (except at boundaries)
- Well suited to clearly structured documents
- Drawback: semantic units (sentences, paragraphs) may be cut in half
```python
def fixed_window_split(text: str, chunk_size: int = 1000) -> list:
    """Fixed-window split."""
    tokens = text.split()  # naive whitespace tokenization
    chunks = []
    for i in range(0, len(tokens), chunk_size):
        chunk = ' '.join(tokens[i:i + chunk_size])
        chunks.append({
            'content': chunk,
            'start': i,
            'end': min(i + chunk_size, len(tokens))
        })
    return chunks
```
1.2.2 Sliding windows
A sliding window preserves contextual continuity through overlapping regions:
Sequence: [A1 A2 A3 A4 A5 A6 A7 A8 A9 A10 A11 A12]
          |_____|
          Chunk 1 (tokens 1-5)
                |_____|
                Chunk 2 (tokens 3-7) - overlaps Chunk 1
                      |_____|
                      Chunk 3 (tokens 5-9)
                            |_____|
                            Chunk 4 (tokens 7-11)
Characteristics:
- Preserves contextual continuity between adjacent blocks
- Boundary information is processed more than once
- Well suited to semantic dependencies that cross block boundaries
- The cost is additional computation and storage
```python
def sliding_window_split(
    text: str,
    chunk_size: int = 1000,
    stride: int = 500
) -> list:
    """
    Sliding-window split.

    Args:
        text: input text
        chunk_size: window size (in tokens)
        stride: distance the window moves on each step
    """
    tokens = text.split()
    chunks = []
    for i in range(0, len(tokens), stride):
        chunk_tokens = tokens[i:i + chunk_size]
        # Stop once the tail window is fully covered by the previous
        # window, so no tokens are dropped regardless of the stride.
        if chunks and i + len(chunk_tokens) <= chunks[-1]['end']:
            break
        chunks.append({
            'content': ' '.join(chunk_tokens),
            'start': i,
            'end': min(i + chunk_size, len(tokens)),
            'window_id': len(chunks)
        })
    return chunks
```
2. Overlapping Window Design in Detail
2.1 Core parameters
Overlapping-window design involves three core parameters:
| Parameter | Definition | Effect |
|---|---|---|
| Window size | Number of tokens per window | Determines how much context is processed at once |
| Stride | Distance the window moves each step | Determines the degree of overlap |
| Overlap ratio | Fraction by which adjacent windows overlap | = (window size - stride) / window size |
Computing the overlap ratio:
```python
def calculate_overlap_ratio(window_size: int, stride: int) -> float:
    """Compute the overlap ratio between adjacent windows."""
    overlap = window_size - stride
    return overlap / window_size

# Examples
print(calculate_overlap_ratio(1000, 500))  # 0.5 (50% overlap)
print(calculate_overlap_ratio(1000, 750))  # 0.25 (25% overlap)
print(calculate_overlap_ratio(1000, 250))  # 0.75 (75% overlap)
```
2.2 The Art of Stride Selection
Stride selection is a trade-off between coverage and efficiency:
| Scenario | Recommended stride | Overlap | Notes |
|---|---|---|---|
| High-precision retrieval | 25-33% of window size | 67-75% | Minimizes missed information |
| Standard processing | 50% of window size | 50% | Balances efficiency and coverage |
| High-throughput processing | 75-80% of window size | 20-25% | Maximizes efficiency |
| Boundary-sensitive | Variable stride | Doubled overlap at endpoints | Protects document boundaries |
```python
def adaptive_stride(
    text_length: int,
    window_size: int,
    mode: str = "balanced"
) -> int:
    """
    Adaptive stride selection.

    mode: 'precision', 'balanced', or 'efficient'
    """
    if mode == "precision":
        return int(window_size * 0.25)  # 75% overlap
    elif mode == "efficient":
        return int(window_size * 0.8)   # 20% overlap
    else:  # balanced
        return int(window_size * 0.5)   # 50% overlap
```
2.3 Overlap Boundary Design
2.3.1 Sentence-boundary alignment
Align window boundaries with sentence boundaries wherever possible, so that semantic units are not cut in half:
```python
import re

def sentence_aware_split(
    text: str,
    window_size: int,
    stride: int
) -> list:
    """Sentence-aware sliding-window split."""
    # Split on Chinese and English sentence-ending punctuation
    sentences = re.split(r'[。!?.!?]+', text)
    sentences = [s.strip() for s in sentences if s.strip()]
    chunks = []
    current_tokens = []
    current_size = 0
    for sentence in sentences:
        sentence_tokens = sentence.split()
        sentence_size = len(sentence_tokens)
        # Would adding this sentence exceed the window size?
        if current_size + sentence_size > window_size:
            # Flush the current window
            if current_tokens:
                chunks.append(' '.join(current_tokens))
            # Carry the tail of the flushed window forward as overlap
            overlap_tokens = []
            if chunks and stride < window_size:
                overlap_count = window_size - stride
                overlap_tokens = current_tokens[-overlap_count:]
            current_tokens = overlap_tokens + sentence_tokens
            current_size = len(current_tokens)
        else:
            current_tokens.extend(sentence_tokens)
            current_size += sentence_size
    # Flush the final window
    if current_tokens:
        chunks.append(' '.join(current_tokens))
    return chunks
```
2.3.2 Paragraph-boundary alignment
```python
def paragraph_aware_split(
    text: str,
    window_size: int,
    stride: int
) -> list:
    """Paragraph-aware sliding-window split."""
    paragraphs = text.split('\n\n')
    chunks = []
    current_tokens = []
    for para in paragraphs:
        para_tokens = para.split()
        para_size = len(para_tokens)
        if para_size > window_size:
            # The paragraph itself is too long; flush, then split it further
            if current_tokens:
                chunks.append(' '.join(current_tokens))
                current_tokens = []
            # Fall back to a fixed window for the oversized paragraph
            sub_chunks = fixed_window_split(para, window_size)
            chunks.extend([c['content'] for c in sub_chunks])
        else:
            if len(current_tokens) + para_size > window_size:
                chunks.append(' '.join(current_tokens))
                current_tokens = para_tokens
            else:
                current_tokens.extend(para_tokens)
    if current_tokens:
        chunks.append(' '.join(current_tokens))
    return chunks
```
3. Combining Sliding Windows with Summarization
3.1 Hierarchical processing architecture
Raw long document
↓
[Layer 1: sliding-window chunking]
↓
[Layer 2: independent per-chunk summaries]
↓
[Layer 3: summary aggregation]
↓
[Layer 4: final global summary]
↓
User query / LLM processing
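The four layers above can be sketched end to end in a few lines. Here `summarize` is only a stand-in for an LLM call (it truncates instead of summarizing), and the window/stride values are illustrative:

```python
def summarize(text: str) -> str:
    """Stand-in for an LLM summarization call (here: first 60 characters)."""
    return text[:60].strip()

def hierarchical_summary(document: str, window: int = 50, stride: int = 25) -> str:
    words = document.split()
    # Layer 1: sliding-window chunking
    chunks = []
    for i in range(0, len(words), stride):
        chunks.append(' '.join(words[i:i + window]))
        if i + window >= len(words):
            break
    # Layer 2: independent per-chunk summaries
    partials = [summarize(c) for c in chunks]
    # Layers 3-4: aggregate the partial summaries into one global summary
    return summarize(' | '.join(partials))
```

A real pipeline would replace `summarize` with the `IncrementalSummarizer` below, but the control flow is the same.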
3.2 Incremental summarization strategy
```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class ChunkSummary:
    chunk_id: int
    content: str
    summary: str
    key_points: List[str]
    start_token: int
    end_token: int

class IncrementalSummarizer:
    """Incremental summarization processor."""

    def __init__(
        self,
        llm_client,
        window_size: int = 2000,
        stride: int = 1000
    ):
        self.window_size = window_size
        self.stride = stride
        self.llm = llm_client

    def process_document(
        self,
        document: str,
        summary_prompt: str = None
    ) -> List[ChunkSummary]:
        """Split the document and generate a summary for each chunk."""
        if summary_prompt is None:
            summary_prompt = """Summarize the core content of the following text and extract its key points.
Format:
- Summary: at most 100 words
- Key points: 3-5 bullets"""
        chunks = sliding_window_split(
            document,
            self.window_size,
            self.stride
        )
        results = []
        for i, chunk in enumerate(chunks):
            # Summarize each chunk independently
            summary_response = self.llm.generate(
                f"{summary_prompt}\n\nText:\n{chunk['content']}"
            )
            results.append(ChunkSummary(
                chunk_id=i,
                content=chunk['content'],
                summary=summary_response.summary,
                key_points=summary_response.key_points,
                start_token=chunk['start'],
                end_token=chunk['end']
            ))
        return results

    def aggregate_summaries(
        self,
        chunk_summaries: List[ChunkSummary],
        focus_topics: List[str] = None
    ) -> str:
        """Aggregate per-chunk summaries into one global summary."""
        # Build the summary tree
        summary_tree = "\n\n".join([
            f"## Chunk {cs.chunk_id} (tokens {cs.start_token}-{cs.end_token})\n"
            f"{cs.summary}\n"
            f"Key points:\n" + "\n".join([f"- {kp}" for kp in cs.key_points])
            for cs in chunk_summaries
        ])
        # Aggregation prompt
        aggregate_prompt = f"""The following are summaries of each part of a document. Merge them into one coherent global summary:
{summary_tree}
{"Note: focus on these topics: " + ", ".join(focus_topics) if focus_topics else ""}
"""
        final_summary = self.llm.generate(aggregate_prompt)
        return final_summary
```
3.3 Sliding Window with Memory
```python
class MemoryAugmentedSlidingWindow:
    """Sliding-window processor that carries a memory buffer between windows."""

    def __init__(
        self,
        window_size: int,
        stride: int,
        memory_size: int = 500
    ):
        self.window_size = window_size
        self.stride = stride
        self.memory_size = memory_size  # tokens of memory carried to the next window
        self.memory_buffer = []

    def process_with_memory(
        self,
        chunks: List[str],
        process_func: callable
    ) -> List:
        """Process all chunks, prepending the accumulated memory to each."""
        results = []
        for i, chunk in enumerate(chunks):
            # Build the context: memory + current chunk
            memory_content = ' '.join(self.memory_buffer) if self.memory_buffer else ""
            if memory_content:
                context = f"[Summary of preceding text]\n{memory_content}\n\n[Current content]\n{chunk}"
            else:
                context = chunk
            # Process the current chunk
            result = process_func(context)
            results.append(result)
            # Update memory with information extracted from this chunk
            self.update_memory(result)
        return results

    def update_memory(self, new_info: str):
        """Update the memory buffer with a simple FIFO policy."""
        self.memory_buffer.append(new_info)
        # Enforce the memory-size limit
        total_memory_tokens = sum(len(m.split()) for m in self.memory_buffer)
        while total_memory_tokens > self.memory_size and self.memory_buffer:
            removed = self.memory_buffer.pop(0)
            total_memory_tokens -= len(removed.split())
```
4. Complete Implementation Example
4.1 A production-grade sliding-window processor
```python
import tiktoken
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum

class OverlapStrategy(Enum):
    FIXED = "fixed"
    SENTENCE_AWARE = "sentence_aware"
    SEMANTIC = "semantic"

@dataclass
class WindowConfig:
    """Window configuration."""
    window_size: int = 4000   # tokens
    stride: int = 2000        # tokens
    overlap_strategy: OverlapStrategy = OverlapStrategy.FIXED
    min_chunk_size: int = 500 # tokens
    preserve_metadata: bool = True

@dataclass
class Chunk:
    """One chunk of the split."""
    chunk_id: int
    content: str
    token_count: int
    start_position: int
    end_position: int
    metadata: Dict = field(default_factory=dict)
    overlaps_with: List[int] = field(default_factory=list)

class ProductionSlidingWindow:
    """Production-grade sliding-window processor."""

    def __init__(
        self,
        config: WindowConfig,
        model_name: str = "cl100k_base"  # tiktoken encoding used by GPT-4-era models
    ):
        self.config = config
        self.enc = tiktoken.get_encoding(model_name)

    def split(self, text: str, metadata: Optional[Dict] = None) -> List[Chunk]:
        """Run the sliding-window split."""
        tokens = self.enc.encode(text)
        total_tokens = len(tokens)
        chunks = []
        start = 0
        while start < total_tokens:
            end = min(start + self.config.window_size, total_tokens)
            chunk_tokens = tokens[start:end]
            chunk = Chunk(
                chunk_id=len(chunks),
                content=self.enc.decode(chunk_tokens),
                token_count=len(chunk_tokens),
                start_position=start,
                end_position=end,
                metadata=metadata or {},
                overlaps_with=[]
            )
            # Record the overlap relationship with the previous chunk
            if chunks and start < chunks[-1].end_position:
                chunk.overlaps_with.append(chunks[-1].chunk_id)
            chunks.append(chunk)
            if end >= total_tokens:
                break
            # Move the window forward
            start += self.config.stride
        # If the final chunk is too small, merge it into the previous one
        if len(chunks) > 1 and chunks[-1].token_count < self.config.min_chunk_size:
            last_chunk = chunks.pop()
            chunks[-1].content += "\n\n" + last_chunk.content
            chunks[-1].end_position = last_chunk.end_position
            chunks[-1].token_count += last_chunk.token_count
        return chunks

    def process_with_overlap(
        self,
        text: str,
        process_func: callable,
        metadata: Optional[Dict] = None
    ) -> List:
        """Process text with the sliding window, surfacing overlap context."""
        chunks = self.split(text, metadata)
        results = []
        for chunk in chunks:
            # Build the context, prepending the tail of the overlapping predecessor
            context_parts = [chunk.content]
            if chunk.overlaps_with:
                prev_chunk = chunks[chunk.overlaps_with[0]]
                context_parts.insert(
                    0, f"[Continued from previous chunk]\n{prev_chunk.content[-500:]}"
                )
            context = "\n\n".join(context_parts)
            results.append(process_func(context, chunk))
        return results

# Usage example
config = WindowConfig(
    window_size=4000,
    stride=2000,
    overlap_strategy=OverlapStrategy.SENTENCE_AWARE,
    min_chunk_size=500
)
processor = ProductionSlidingWindow(config)

def summarize_chunk(context: str, chunk: Chunk) -> dict:
    """Process one chunk (simplified; a real implementation would call an LLM here)."""
    return {
        'chunk_id': chunk.chunk_id,
        'summary': f"Summary: {chunk.content[:100]}...",
        'token_count': chunk.token_count
    }

text = open("long_document.txt").read()
results = processor.process_with_overlap(text, summarize_chunk)
```
4.2 Adaptive Overlapping Windows
```python
import re
from typing import List

class AdaptiveOverlapWindow:
    """Adapt the overlap based on the content itself."""

    def __init__(self, base_window_size: int = 4000):
        self.base_window_size = base_window_size

    def calculate_adaptive_stride(self, chunk: str) -> int:
        """Compute an adaptive stride from the content's boundaries."""
        # Detect semantic boundaries within the chunk
        semantic_boundaries = self._find_semantic_boundaries(chunk)
        # If a strong semantic boundary exists, snap the stride to it
        if semantic_boundaries:
            # Find the boundary closest to the window midpoint
            mid_point = len(chunk) // 2
            nearest_boundary = min(
                semantic_boundaries,
                key=lambda b: abs(b - mid_point)
            )
            # Adjust the stride so the next window starts near that boundary
            stride = nearest_boundary - (self.base_window_size // 4)
            return max(stride, self.base_window_size // 2)
        # Default to 50% overlap
        return self.base_window_size // 2

    def _find_semantic_boundaries(self, text: str) -> List[int]:
        """Find semantic boundaries (sentence and paragraph breaks)."""
        boundaries = []
        # Paragraph boundaries
        for match in re.finditer(r'\n\n+', text):
            boundaries.append(match.start())
        # Sentence boundaries, skipping the first and last 100 characters
        for match in re.finditer(r'[。!?.!?]\s+', text):
            if 100 < match.start() < len(text) - 100:
                boundaries.append(match.start())
        return sorted(set(boundaries))
```
5. Performance Optimization
5.1 Parallel processing
```python
from concurrent.futures import ThreadPoolExecutor
import asyncio

class ParallelSlidingWindow:
    """Process sliding-window chunks in parallel."""

    def __init__(self, max_workers: int = 4):
        self.max_workers = max_workers

    def process_parallel(
        self,
        chunks: List[Chunk],
        process_func: callable
    ) -> List:
        """Process chunks concurrently in a thread pool."""
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            results = list(executor.map(process_func, chunks))
        return results

    async def process_async(
        self,
        chunks: List[Chunk],
        process_func: callable
    ) -> List:
        """Process chunks concurrently with asyncio (process_func must be a coroutine function)."""
        tasks = [process_func(chunk) for chunk in chunks]
        results = await asyncio.gather(*tasks)
        return results
```
5.2 Cache optimization
```python
class CachedSlidingWindow:
    """Sliding window with a chunk cache."""

    def __init__(self, window_size: int, stride: int):
        self.window_size = window_size
        self.stride = stride
        self._cache = {}

    def _get_cache_key(self, text_hash: str, start: int, end: int) -> str:
        return f"{text_hash}_{start}_{end}"

    def get_chunk_cached(self, text: str, start: int, end: int) -> str:
        """Fetch a chunk, caching by a hash of the text rather than the text itself.

        (functools.lru_cache on a bound method would pin both `self` and the
        full text in memory, so a hash-keyed dictionary is used instead.)
        """
        key = self._get_cache_key(str(hash(text)), start, end)
        if key not in self._cache:
            self._cache[key] = text[start:end]
        return self._cache[key]
```
6. Practical Configuration Recommendations
| Scenario | Window size | Stride | Overlap | Strategy |
|---|---|---|---|---|
| General documents | 4000 | 2000 | 50% | Fixed |
| Code analysis | 2000 | 1000 | 50% | Semantic-aware |
| Long-form fiction | 8000 | 6000 | 25% | Paragraph-aware |
| Legal documents | 3000 | 1500 | 50% | Sentence-aware |
| Conversation history | 2000 | 1500 | 25% | Message boundaries |
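The recommendations above can be captured directly in code, e.g. as a small preset registry. The `WindowPreset` dataclass and preset names here are illustrative, not part of any library:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class WindowPreset:
    window_size: int
    stride: int
    strategy: str

# Presets mirroring the table above
PRESETS = {
    "general":  WindowPreset(4000, 2000, "fixed"),
    "code":     WindowPreset(2000, 1000, "semantic"),
    "fiction":  WindowPreset(8000, 6000, "paragraph"),
    "legal":    WindowPreset(3000, 1500, "sentence"),
    "dialogue": WindowPreset(2000, 1500, "message"),
}

def overlap_ratio(p: WindowPreset) -> float:
    """Overlap ratio implied by a preset: (window - stride) / window."""
    return (p.window_size - p.stride) / p.window_size

# overlap_ratio(PRESETS["general"]) -> 0.5
# overlap_ratio(PRESETS["fiction"]) -> 0.25
```

Keeping presets in one place makes it easy to check that stride and overlap stay consistent when a scenario's window size changes.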
References
- Beltagy, I., et al. (2020). Longformer: The Long-Document Transformer. arXiv.
- Child, R., et al. (2019). Generating Long Sequences with Sparse Transformers. arXiv.
- Zaheer, M., et al. (2020). Big Bird: Transformers for Longer Sequences. NeurIPS.