Abstract

The sliding window is a core technique for handling long-sequence context: a fixed-size or overlapping window mechanism lets information be processed continuously across a sequence. This article explains the design principles of fixed and sliding windows, how to choose parameters for overlapping windows, strategies for combining windowing with summarization, and provides complete Python implementations.

Key Terms at a Glance

| Term | Description |
|------|-------------|
| Sliding Window | A fixed-size window that moves across the sequence |
| Stride | How far the window moves at each step |
| Overlap Ratio | The fraction by which adjacent windows overlap |
| Fixed Window | A processing window of constant size |
| Context Chunking | Splitting long text into chunks |
| Skip Connection | Passing information across windows |
| Local Attention | Attention restricted to a local range |
| Global Attention | Attention over all positions |
| Hierarchical Window | A multi-scale window structure |
| Dynamic Window | A window of variable size |

1. Sliding Window Fundamentals

1.1 What Is a Sliding Window

A sliding window is a technique for processing sequential data: a fixed-size "window" slides along the sequence, processing the content it covers at each position before moving to the next. This is similar to how a convolution kernel slides in a convolutional neural network, but applied to text sequences and attention computation.

In LLM context processing, the core problem the sliding window solves is: how can a context window of limited size handle a sequence that is, in principle, unboundedly long?
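As a minimal illustration of the mechanism on a toy token list (a sketch only; the chunking functions later in this article operate on text):

```python
def windows(seq, size, stride):
    """Yield successive windows of `size` items, moving `stride` items per step."""
    for i in range(0, max(len(seq) - size + 1, 1), stride):
        yield seq[i:i + size]

tokens = ["A1", "A2", "A3", "A4", "A5", "A6"]
print(list(windows(tokens, size=4, stride=2)))
# [['A1', 'A2', 'A3', 'A4'], ['A3', 'A4', 'A5', 'A6']]
```

With `size=4` and `stride=2`, the two windows share `A3`/`A4`: that shared region is the overlap the rest of this article is about.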

1.2 Fixed Windows vs. Sliding Windows

1.2.1 Fixed Windows

A fixed window is the simplest chunking strategy: the sequence is cut evenly into blocks of a fixed size:

Sequence: [A1 A2 A3 A4 A5 A6 A7 A8 A9 A10 A11 A12]
          |_____|     |_____|     |_____|     |_____|
          Chunk 1      Chunk 2     Chunk 3     Chunk 4
          (no overlap)

Characteristics

  • Simple to implement and computationally efficient
  • Each chunk is processed independently, with no information loss (except at boundaries)
  • Well suited to clearly structured documents
  • Drawback: may cut through semantic units (sentences, paragraphs)

def fixed_window_split(text: str, chunk_size: int = 1000) -> list:
    """Fixed-window split."""
    tokens = text.split()  # naive whitespace tokenization
    chunks = []
    
    for i in range(0, len(tokens), chunk_size):
        chunk = ' '.join(tokens[i:i + chunk_size])
        chunks.append({
            'content': chunk,
            'start': i,
            'end': min(i + chunk_size, len(tokens))
        })
    
    return chunks

1.2.2 Sliding Windows

A sliding window preserves contextual continuity through overlapping regions:

Sequence: [A1 A2 A3 A4 A5 A6 A7 A8 A9 A10 A11 A12]
          |_____|
          Chunk 1 (window 1-5)
              |_____|
              Chunk 2 (window 3-7) - overlaps Chunk 1
                  |_____|
                  Chunk 3 (window 5-9)
                      |_____|
                      Chunk 4 (window 7-11)

Characteristics

  • Preserves contextual continuity between adjacent chunks
  • Boundary information is processed more than once
  • Handles semantic dependencies that span chunks
  • The cost is extra computation and storage

def sliding_window_split(
    text: str, 
    chunk_size: int = 1000, 
    stride: int = 500
) -> list:
    """
    Sliding-window split.
    
    Args:
        text: input text
        chunk_size: window size (in tokens)
        stride: how far the window moves each step
    """
    tokens = text.split()
    chunks = []
    
    for i in range(0, len(tokens), stride):
        chunk_tokens = tokens[i:i + chunk_size]
        if len(chunk_tokens) < chunk_size // 2:
            # Stop once the tail window is too small; its content is already
            # covered by the previous window when stride <= chunk_size // 2
            break
            
        chunks.append({
            'content': ' '.join(chunk_tokens),
            'start': i,
            'end': min(i + chunk_size, len(tokens)),
            'window_id': len(chunks)
        })
    
    return chunks

2. Overlapping Window Design in Detail

2.1 Core Parameters

Designing overlapping windows involves three core parameters:

| Parameter | Definition | Effect |
|-----------|------------|--------|
| Window Size | Number of tokens in each window | Determines how much context is processed per pass |
| Stride | How far the window moves each step | Determines the degree of overlap |
| Overlap Ratio | Fraction by which adjacent windows overlap | = (window size - stride) / window size |

Computing the overlap ratio:

def calculate_overlap_ratio(window_size: int, stride: int) -> float:
    """Compute the overlap ratio."""
    overlap = window_size - stride
    return overlap / window_size
 
# Examples
print(calculate_overlap_ratio(1000, 500))  # 0.5 (50% overlap)
print(calculate_overlap_ratio(1000, 750))  # 0.25 (25% overlap)
print(calculate_overlap_ratio(1000, 250))  # 0.75 (75% overlap)
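These same parameters also fix how many windows a document yields. A small sketch of that count (assuming the final partial window is kept):

```python
import math

def num_windows(total_tokens: int, window_size: int, stride: int) -> int:
    """Number of windows produced, counting a final partial window."""
    if total_tokens <= window_size:
        return 1
    return math.ceil((total_tokens - window_size) / stride) + 1

print(num_windows(10_000, 1000, 500))  # 19
```

Halving the stride roughly doubles the window count, which is the direct compute cost of higher overlap.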

2.2 The Art of Stride Selection

Choosing a stride means balancing coverage against efficiency:

| Scenario | Recommended Stride | Overlap | Notes |
|----------|--------------------|---------|-------|
| High-precision retrieval | 25-33% of window size | 67-75% | Minimizes missed information |
| Standard processing | 50% of window size | 50% | Balances efficiency and coverage |
| High-throughput processing | 75-80% of window size | 20-25% | Maximizes efficiency |
| Boundary-sensitive | Variable stride | Extra overlap at the ends | Protects document boundaries |

def adaptive_stride(
    text_length: int,
    window_size: int,
    mode: str = "balanced"
) -> int:
    """
    Adaptive stride calculation.
    
    mode: 'precision' (high precision), 'balanced', or 'efficient' (high throughput)
    """
    if mode == "precision":
        return int(window_size * 0.25)  # 75% overlap
    elif mode == "efficient":
        return int(window_size * 0.8)   # 20% overlap
    else:  # balanced
        return int(window_size * 0.5)   # 50% overlap
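One principle behind these presets: if every contiguous span of at most L tokens (say, the longest sentence you want kept intact) must appear unbroken in at least one window, a sufficient condition is stride ≤ window_size − L. A sketch of that check, where the span length is an assumption about your data:

```python
def max_stride_for_span(window_size: int, max_span_len: int) -> int:
    """Largest stride that still guarantees any span of up to
    `max_span_len` tokens lies entirely within some window."""
    assert max_span_len <= window_size, "span cannot exceed the window itself"
    return window_size - max_span_len

print(max_stride_for_span(1000, 200))  # 800
```

So with 1000-token windows and 200-token spans, a stride of 800 (only 20% overlap) is already enough; heavier overlap buys redundancy, not coverage.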

2.3 Designing Overlap Boundaries

2.3.1 Aligning to Sentence Boundaries

Where possible, align window boundaries with sentence boundaries to avoid cutting through meaning:

import re
 
def sentence_aware_split(
    text: str,
    window_size: int,
    stride: int
) -> list:
    """Sentence-aware sliding-window split."""
    # Split into sentences (CJK and Latin terminators)
    sentences = re.split(r'[。!?.!?]+', text)
    sentences = [s.strip() for s in sentences if s.strip()]
    
    chunks = []
    current_tokens = []
    current_size = 0
    
    for sentence in sentences:
        sentence_tokens = sentence.split()
        sentence_size = len(sentence_tokens)
        
        # Would adding this sentence overflow the window?
        if current_size + sentence_size > window_size:
            # Save the current window
            if current_tokens:
                chunks.append(' '.join(current_tokens))
            
            # Carry the tail of the current window forward to preserve overlap
            overlap_tokens = []
            if len(chunks) > 0 and stride < window_size:
                overlap_count = window_size - stride
                overlap_tokens = current_tokens[-overlap_count:]
            
            current_tokens = overlap_tokens + sentence_tokens
            current_size = len(current_tokens)
        else:
            current_tokens.extend(sentence_tokens)
            current_size += sentence_size
    
    # Flush the final window
    if current_tokens:
        chunks.append(' '.join(current_tokens))
    
    return chunks

2.3.2 Aligning to Paragraph Boundaries

def paragraph_aware_split(
    text: str,
    window_size: int
) -> list:
    """Paragraph-aware sliding-window split."""
    paragraphs = text.split('\n\n')
    chunks = []
    
    current_tokens = []
    current_size = 0
    
    for para in paragraphs:
        para_tokens = para.split()
        para_size = len(para_tokens)
        
        if para_size > window_size:
            # The paragraph itself exceeds the window; flush, then sub-split it
            if current_tokens:
                chunks.append(' '.join(current_tokens))
                current_tokens = []
                current_size = 0
            
            # Fall back to a fixed window for oversized paragraphs
            sub_chunks = fixed_window_split(para, window_size)
            chunks.extend([c['content'] for c in sub_chunks])
        elif current_size + para_size > window_size:
            chunks.append(' '.join(current_tokens))
            current_tokens = list(para_tokens)
            current_size = para_size
        else:
            current_tokens.extend(para_tokens)
            current_size += para_size
    
    if current_tokens:
        chunks.append(' '.join(current_tokens))
    
    return chunks

3. Combining with Summarization Techniques

3.1 Hierarchical Processing Architecture

Original long document
    ↓
[Layer 1: sliding-window chunking]
    ↓
[Layer 2: independent per-chunk summaries]
    ↓
[Layer 3: summary aggregation]
    ↓
[Layer 4: final global summary]
    ↓
User query / LLM processing
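The four layers above can be sketched end to end. Here `summarize` is a placeholder standing in for an LLM call, and a whitespace-token chunker is inlined so the sketch is self-contained:

```python
def hierarchical_summary(document: str, summarize, window_size: int = 1000,
                         stride: int = 500) -> str:
    """Layer 1: chunk; Layer 2: summarize each chunk;
    Layer 3: aggregate; Layer 4: final global summary."""
    tokens = document.split()
    # Layer 1: sliding-window chunking
    chunks = [' '.join(tokens[i:i + window_size])
              for i in range(0, max(len(tokens), 1), stride)]
    # Layer 2: independent per-chunk summaries
    partial = [summarize(c) for c in chunks]
    # Layer 3: aggregate the partial summaries
    combined = "\n".join(partial)
    # Layer 4: final global summary over the aggregate
    return summarize(combined)
```

The key design property is that Layer 2 calls are independent, so they can run in parallel (see Section 5.1), while only Layer 4 sees the whole document, compressed.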

3.2 Incremental Summarization Strategy

from dataclasses import dataclass
from typing import List, Optional
 
@dataclass
class ChunkSummary:
    chunk_id: int
    content: str
    summary: str
    key_points: List[str]
    start_token: int
    end_token: int
 
class IncrementalSummarizer:
    """Incremental summarization processor."""
    
    def __init__(
        self,
        llm_client,
        window_size: int = 2000,
        stride: int = 1000
    ):
        self.window_size = window_size
        self.stride = stride
        self.llm = llm_client
        
    def process_document(
        self,
        document: str,
        summary_prompt: Optional[str] = None
    ) -> List[ChunkSummary]:
        """Split the document and summarize each chunk."""
        if summary_prompt is None:
            summary_prompt = """Summarize the core content of the following text and extract its key points.
Format requirements:
- Summary: no more than 100 words
- Key points: 3-5 bullet points"""
        
        chunks = sliding_window_split(
            document, 
            self.window_size, 
            self.stride
        )
        
        results = []
        for i, chunk in enumerate(chunks):
            # Generate a summary for each chunk
            summary_response = self.llm.generate(
                f"{summary_prompt}\n\nText:\n{chunk['content']}"
            )
            
            results.append(ChunkSummary(
                chunk_id=i,
                content=chunk['content'],
                summary=summary_response.summary,
                key_points=summary_response.key_points,
                start_token=chunk['start'],
                end_token=chunk['end']
            ))
        
        return results
    
    def aggregate_summaries(
        self,
        chunk_summaries: List[ChunkSummary],
        focus_topics: Optional[List[str]] = None
    ) -> str:
        """Aggregate the per-chunk summaries."""
        # Build the summary tree
        summary_tree = "\n\n".join([
            f"## Chunk {cs.chunk_id} (tokens {cs.start_token}-{cs.end_token})\n"
            f"{cs.summary}\n"
            f"Key points:\n" + "\n".join([f"- {kp}" for kp in cs.key_points])
            for cs in chunk_summaries
        ])
        
        # Aggregation prompt
        aggregate_prompt = f"""Below are summaries of each part of a document. Integrate them into one coherent global summary:
 
{summary_tree}
 
{"Note: pay particular attention to these topics: " + ", ".join(focus_topics) if focus_topics else ""}
"""
        
        final_summary = self.llm.generate(aggregate_prompt)
        return final_summary

3.3 Sliding Windows with Memory

class MemoryAugmentedSlidingWindow:
    """Memory-augmented sliding-window processor."""
    
    def __init__(
        self,
        window_size: int,
        stride: int,
        memory_size: int = 500
    ):
        self.window_size = window_size
        self.stride = stride
        self.memory_size = memory_size  # size of the memory carried into the next window (tokens)
        
        self.memory_buffer = []
        
    def process_with_memory(
        self,
        chunks: List[str],
        process_func: callable
    ) -> List:
        """Process all chunks, carrying memory forward."""
        results = []
        
        for i, chunk in enumerate(chunks):
            # Build the context: memory + current chunk
            memory_content = ' '.join(self.memory_buffer) if self.memory_buffer else ""
            
            if memory_content:
                context = f"[Summary of preceding text]\n{memory_content}\n\n[Current content]\n{chunk}"
            else:
                context = chunk
            
            # Process the current chunk
            result = process_func(context)
            results.append(result)
            
            # Update the memory with information extracted from this chunk
            self.update_memory(result)
        
        return results
    
    def update_memory(self, new_info: str):
        """Update the memory buffer."""
        # Simple FIFO update policy
        self.memory_buffer.append(new_info)
        
        # Cap the memory size
        total_memory_tokens = sum(len(m.split()) for m in self.memory_buffer)
        while total_memory_tokens > self.memory_size and self.memory_buffer:
            removed = self.memory_buffer.pop(0)
            total_memory_tokens -= len(removed.split())

4. Complete Implementation Examples

4.1 A Production-Grade Sliding-Window Processor

import tiktoken
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple
from enum import Enum
 
class OverlapStrategy(Enum):
    FIXED = "fixed"
    SENTENCE_AWARE = "sentence_aware"
    SEMANTIC = "semantic"
 
@dataclass
class WindowConfig:
    """Window configuration."""
    window_size: int = 4000  # tokens
    stride: int = 2000       # tokens
    overlap_strategy: OverlapStrategy = OverlapStrategy.FIXED
    min_chunk_size: int = 500  # tokens
    preserve_metadata: bool = True
 
@dataclass
class Chunk:
    """A chunking result."""
    chunk_id: int
    content: str
    token_count: int
    start_position: int
    end_position: int
    metadata: Dict = field(default_factory=dict)
    overlaps_with: List[int] = field(default_factory=list)
 
class ProductionSlidingWindow:
    """Production-grade sliding-window processor."""
    
    def __init__(
        self,
        config: WindowConfig,
        model_name: str = "cl100k_base"  # the encoding used by GPT-4-family models
    ):
        self.config = config
        self.enc = tiktoken.get_encoding(model_name)
        
    def split(self, text: str, metadata: Optional[Dict] = None) -> List[Chunk]:
        """Perform the sliding-window split."""
        tokens = self.enc.encode(text)
        total_tokens = len(tokens)
        
        chunks = []
        start = 0
        
        while start < total_tokens:
            end = min(start + self.config.window_size, total_tokens)
            chunk_tokens = tokens[start:end]
            token_count = len(chunk_tokens)
            
            # Enforce the minimum chunk size (but always keep the first chunk)
            if token_count >= self.config.min_chunk_size or start == 0:
                chunk = Chunk(
                    chunk_id=len(chunks),
                    content=self.enc.decode(chunk_tokens),
                    token_count=token_count,
                    start_position=start,
                    end_position=end,
                    metadata=metadata or {},
                    overlaps_with=[]
                )
                
                # Record the overlap with the previous chunk, if any
                if chunks and start < chunks[-1].end_position:
                    chunk.overlaps_with.append(chunks[-1].chunk_id)
                
                chunks.append(chunk)
            
            # Advance the window
            start += self.config.stride
        
        # If the final chunk is below the minimum size, merge it into the previous one
        if len(chunks) > 1 and chunks[-1].token_count < self.config.min_chunk_size:
            last_chunk = chunks.pop()
            chunks[-1].content += "\n\n" + last_chunk.content
            chunks[-1].end_position = last_chunk.end_position
            chunks[-1].token_count += last_chunk.token_count
        
        return chunks
    
    def process_with_overlap(
        self,
        text: str,
        process_func: callable,
        metadata: Optional[Dict] = None
    ) -> List:
        """Process text with the sliding window."""
        chunks = self.split(text, metadata)
        
        results = []
        for i, chunk in enumerate(chunks):
            # Build the context, including any overlap
            context_parts = [chunk.content]
            
            # Prepend the tail of the previous chunk when the two overlap
            if i > 0 and chunks[i - 1].chunk_id in chunk.overlaps_with:
                context_parts.insert(0, f"[Continued from previous]\n{chunks[i - 1].content[-500:]}")
            
            context = "\n\n".join(context_parts)
            
            # Process
            result = process_func(context, chunk)
            results.append(result)
        
        return results
 
# Usage example
config = WindowConfig(
    window_size=4000,
    stride=2000,
    overlap_strategy=OverlapStrategy.SENTENCE_AWARE,
    min_chunk_size=500
)
 
processor = ProductionSlidingWindow(config)
 
def summarize_chunk(context: str, chunk: Chunk) -> dict:
    """Process a single chunk."""
    # In practice this would call an LLM to summarize;
    # simplified for the example
    return {
        'chunk_id': chunk.chunk_id,
        'summary': f"Summary: {chunk.content[:100]}...",
        'token_count': chunk.token_count
    }
 
text = open("long_document.txt", encoding="utf-8").read()
results = processor.process_with_overlap(text, summarize_chunk)

4.2 Adaptive Overlapping Windows

class AdaptiveOverlapWindow:
    """Adapt the overlap to the content."""
    
    def __init__(self, base_window_size: int = 4000):
        self.base_window_size = base_window_size
        
    def calculate_adaptive_stride(self, chunk: str) -> int:
        """Compute an adaptive stride from the content's boundaries
        (positions here are in characters)."""
        
        # Detect semantic boundaries
        semantic_boundaries = self._find_semantic_boundaries(chunk)
        
        # With a strong semantic boundary available, the stride can be enlarged
        if len(semantic_boundaries) > 0:
            # Find the semantic boundary closest to the window's midpoint
            mid_point = len(chunk) // 2
            nearest_boundary = min(
                semantic_boundaries,
                key=lambda b: abs(b - mid_point)
            )
            
            # Adjust the stride so it lands on that boundary
            stride = nearest_boundary - (self.base_window_size // 4)
            return max(stride, self.base_window_size // 2)
        
        # Default to 50% overlap
        return self.base_window_size // 2
    
    def _find_semantic_boundaries(self, text: str) -> List[int]:
        """Find semantic boundaries (sentence and paragraph breaks)."""
        import re
        boundaries = []
        
        # Paragraph boundaries
        paragraph_pattern = r'\n\n+'
        for match in re.finditer(paragraph_pattern, text):
            boundaries.append(match.start())
        
        # Sentence boundaries
        sentence_pattern = r'[。!?.!?]\s+'
        for match in re.finditer(sentence_pattern, text):
            if match.start() > 100 and match.start() < len(text) - 100:  # avoid the very start and end
                boundaries.append(match.start())
        
        return sorted(set(boundaries))

5. Performance Optimization

5.1 Parallel Processing

from concurrent.futures import ThreadPoolExecutor
import asyncio
 
class ParallelSlidingWindow:
    """Parallel sliding-window processing."""
    
    def __init__(self, max_workers: int = 4):
        self.max_workers = max_workers
        
    def process_parallel(
        self,
        chunks: List[Chunk],
        process_func: callable
    ) -> List:
        """Process chunks in parallel threads."""
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            results = list(executor.map(process_func, chunks))
        return results
    
    async def process_async(
        self,
        chunks: List[Chunk],
        process_func: callable
    ) -> List:
        """Process chunks concurrently with asyncio."""
        tasks = [process_func(chunk) for chunk in chunks]
        results = await asyncio.gather(*tasks)
        return results

5.2 Cache Optimization

from functools import lru_cache
 
class CachedSlidingWindow:
    """Sliding window with chunk caching."""
    
    def __init__(self, window_size: int, stride: int):
        self.window_size = window_size
        self.stride = stride
        
    @staticmethod
    @lru_cache(maxsize=1000)
    def get_chunk_cached(text: str, start: int, end: int) -> str:
        """Cached chunk extraction.
        
        Note: lru_cache keys on the full text string; in production,
        key on a content hash instead to keep the cache lightweight.
        """
        return text[start:end]

6. Practical Configuration Recommendations

| Scenario | Window Size | Stride | Overlap | Strategy |
|----------|-------------|--------|---------|----------|
| General documents | 4000 | 2000 | 50% | Fixed |
| Code analysis | 2000 | 1000 | 50% | Semantic-aware |
| Long-form fiction | 8000 | 6000 | 25% | Paragraph-aware |
| Legal documents | 3000 | 1500 | 50% | Sentence-aware |
| Conversation history | 2000 | 1500 | 25% | Message boundaries |
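Encoded as data, the table above might look like the following sketch (the preset names are illustrative; window sizes and strides are in tokens):

```python
# Suggested presets from the table above
WINDOW_PRESETS = {
    "general_document":     {"window_size": 4000, "stride": 2000},  # 50% overlap, fixed
    "code_analysis":        {"window_size": 2000, "stride": 1000},  # 50% overlap, semantic-aware
    "long_fiction":         {"window_size": 8000, "stride": 6000},  # 25% overlap, paragraph-aware
    "legal_document":       {"window_size": 3000, "stride": 1500},  # 50% overlap, sentence-aware
    "conversation_history": {"window_size": 2000, "stride": 1500},  # 25% overlap, message boundaries
}

def preset_overlap(name: str) -> float:
    """Overlap ratio implied by a preset."""
    p = WINDOW_PRESETS[name]
    return (p["window_size"] - p["stride"]) / p["window_size"]

print(preset_overlap("long_fiction"))  # 0.25
```

Such a table makes the trade-offs auditable: any new scenario just needs a window size and a stride, and the overlap ratio follows.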
