摘要

滑动窗口,听起来高大上,其实就是”移动的截取窗口”。这篇专门为零基础读者讲解:滑动窗口的核心原理、三种实现方式(简单滑动、平均注意力、语义分块)、与Transformer的结合、以及在流式处理中的应用。看完全篇,你不仅能理解原理,还能写出生产级的滑动窗口代码。

先用一个生活例子理解滑动窗口

想象你看电影

你在看电影,但只能一次看10分钟的片段:

电影总时长:60分钟
窗口大小:10分钟

第1次看:0-10分钟 ✓
第2次看:5-15分钟(滑动5分钟)
第3次看:10-20分钟(再滑动5分钟)
...

关键点:

  • 窗口大小固定:每次只看10分钟
  • 滑动步长可调:可以每次滑动5分钟、3分钟、甚至1分钟
  • 重叠区域:相邻窗口之间有重叠,这样可以保证连续性

滑动窗口在AI中的含义

在AI领域,滑动窗口解决的问题是:

问题:对话历史有100轮,怎么让AI看到所有历史?

解决:用滑动窗口,每次只看最近20轮

历史:[1][2][3]...[80][81][82]...[100]

窗口1:    [81][82]...[100]
窗口2:   [79][80]...[98]
窗口3:  [77][78]...[96]
...

一、滑动窗口的核心原理

基本概念

# 滑动窗口示意图
"""
原始序列:[A][B][C][D][E][F][G][H][I][J]
 
窗口大小 = 4
步长 = 2
 
窗口1:[A][B][C][D]      位置 0-4
窗口2:   [C][D][E][F]    位置 2-6
窗口3:      [E][F][G][H]  位置 4-8
窗口4:         [G][H][I][J] 位置 6-10
 
重叠:C D E F G H 在多个窗口中重复出现
"""

数学定义

滑动窗口可以用数学公式表达:

设序列为 S = [s₁, s₂, s₃, ..., sₙ]
窗口大小为 w
步长为 s

第i个窗口:
Wᵢ = [sᵢ, sᵢ₊₁, sᵢ₊₂, ..., sᵢ₊w₋₁]

窗口数量:
N = floor((n - w) / s) + 1

三个关键参数

参数含义选择建议
窗口大小(w)每次处理多少个token取决于模型限制和任务需求
步长(s)每次滑动多少小=细节多/计算大,大=计算快/可能丢信息
重叠率(w-s)/w一般50%-75%效果较好

二、三种滑动窗口实现

方式1:简单滑动窗口(最基础)

from typing import List, Any, Iterator
 
class SimpleSlidingWindow:
    """简单滑动窗口 - 最基础的实现"""
    
    def __init__(self, window_size: int, step: int = 1):
        """
        Args:
            window_size: 窗口大小
            step: 步长(滑动距离)
        """
        self.window_size = window_size
        self.step = step
    
    def create_windows(self, data: List[Any]) -> List[List[Any]]:
        """创建所有窗口"""
        windows = []
        start = 0
        
        while start + self.window_size <= len(data):
            window = data[start:start + self.window_size]
            windows.append(window)
            start += self.step
        
        return windows
    
    def windows_iter(self, data: List[Any]) -> Iterator[List[Any]]:
        """迭代器版本(节省内存)"""
        start = 0
        
        while start + self.window_size <= len(data):
            yield data[start:start + self.window_size]
            start += self.step
    
    def get_windows_with_position(self, data: List[Any]) -> List[tuple]:
        """获取窗口及其位置信息"""
        windows = []
        start = 0
        
        while start + self.window_size <= len(data):
            window = data[start:start + self.window_size]
            windows.append({
                'window': window,
                'start_idx': start,
                'end_idx': start + self.window_size,
                'window_id': len(windows)
            })
            start += self.step
        
        return windows
 
 
# 使用示例
if __name__ == "__main__":
    messages = [f"消息{i}" for i in range(1, 21)]
    
    sw = SimpleSlidingWindow(window_size=5, step=2)
    
    print(f"原始数据:{len(messages)}条消息")
    print("=" * 50)
    
    for w in sw.create_windows(messages):
        print(w)

方式2:重叠率滑动窗口(更常用)

class OverlapSlidingWindow:
    """重叠式滑动窗口 - 保持上下文连续性"""
    
    def __init__(
        self,
        window_size: int,
        overlap_ratio: float = 0.5
    ):
        """
        Args:
            window_size: 窗口大小
            overlap_ratio: 重叠比例(0-1之间)
        """
        self.window_size = window_size
        self.overlap_ratio = overlap_ratio
        self.step = int(window_size * (1 - overlap_ratio))
    
    def create_windows(self, data: List[Any]) -> List[tuple]:
        """创建带重叠的窗口"""
        windows = []
        
        if len(data) <= self.window_size:
            return [(0, data)]
        
        start = 0
        while start + self.window_size < len(data):
            end = start + self.window_size
            windows.append((start, data[start:end]))
            start += self.step
        
        # 最后一个窗口:包含所有剩余数据
        remaining = data[start:]
        if remaining:
            windows.append((start, remaining))
        
        return windows
    
    def get_overlap_pairs(self, data: List[Any]) -> List[tuple]:
        """获取相邻窗口之间的重叠部分"""
        pairs = []
        windows = self.create_windows(data)
        
        for i in range(len(windows) - 1):
            current_window = windows[i][1]
            next_window = windows[i + 1][1]
            
            # 计算重叠部分
            overlap_size = self.window_size - self.step
            overlap = current_window[-overlap_size:]
            
            pairs.append({
                'window_id': i,
                'overlap': overlap,
                'non_overlap': next_window[overlap_size:]
            })
        
        return pairs
    
    def merge_overlapping_context(
        self,
        windows: List[tuple],
        merge_strategy: str = "last"
    ) -> List[Any]:
        """
        合并重叠窗口
        merge_strategy: 'last'|'first'|'combine'
        """
        if merge_strategy == "last":
            # 保留最后部分(更新鲜)
            return [w[-1] for _, w in windows]
        
        elif merge_strategy == "first":
            # 保留最前部分(更完整)
            return [w[0] for _, w in windows]
        
        elif merge_strategy == "combine":
            # 合并所有
            result = []
            for _, w in windows:
                result.extend(w)
            return result
        
        raise ValueError(f"Unknown merge strategy: {merge_strategy}")
 
 
# 使用示例
if __name__ == "__main__":
    # 模拟100轮对话
    messages = [f"对话{i}" for i in range(1, 101)]
    
    sw = OverlapSlidingWindow(
        window_size=20,
        overlap_ratio=0.5  # 50%重叠
    )
    
    windows = sw.create_windows(messages)
    print(f"原始数据:{len(messages)}条")
    print(f"窗口数量:{len(windows)}")
    print(f"每个窗口大小:{sw.window_size}")
    print(f"步长:{sw.step}")
    print(f"重叠:{sw.window_size - sw.step}条")

方式3:语义滑动窗口(最智能)

简单滑动窗口的缺点是:可能在句子中间截断!

class SemanticSlidingWindow:
    """
    语义滑动窗口 - 按语义边界切分
    不会在句子中间截断
    """
    
    def __init__(
        self,
        max_tokens: int,
        overlap_tokens: int = 200,
        tokenizer=None
    ):
        self.max_tokens = max_tokens
        self.overlap_tokens = overlap_tokens
        self.tokenizer = tokenizer or self._simple_tokenizer()
    
    @staticmethod
    def _simple_tokenizer():
        """简单token估算器"""
        def tokenize(text: str) -> int:
            chinese = sum(1 for c in text if '\u4e00' <= c <= '\u9fff')
            english = len(text.split()) - chinese
            return chinese + english
        return tokenize
    
    def create_chunks(self, text: str) -> List[dict]:
        """按段落/句子切分,保持语义完整"""
        # 1. 先按段落分割
        paragraphs = text.split('\n\n')
        
        chunks = []
        current_chunk = []
        current_tokens = 0
        
        for para in paragraphs:
            para_tokens = self.tokenizer(para)
            
            # 如果单个段落就超限,按句子分割
            if para_tokens > self.max_tokens:
                sentences = para.split('。')
                for sent in sentences:
                    if not sent.strip():
                        continue
                    sent_tokens = self.tokenizer(sent)
                    
                    if current_tokens + sent_tokens > self.max_tokens:
                        if current_chunk:
                            chunks.append({
                                'content': '。'.join(current_chunk) + '。',
                                'tokens': current_tokens
                            })
                        current_chunk = [sent]
                        current_tokens = sent_tokens
                    else:
                        current_chunk.append(sent)
                        current_tokens += sent_tokens
            
            # 普通段落处理
            elif current_tokens + para_tokens > self.max_tokens:
                # 保存当前chunk
                if current_chunk:
                    chunks.append({
                        'content': '\n\n'.join(current_chunk),
                        'tokens': current_tokens
                    })
                
                # 开始新chunk,保留重叠
                if self.overlap_tokens > 0 and current_chunk:
                    # 把最后的段落加入新chunk(作为重叠)
                    overlap_text = '\n\n'.join(current_chunk[-1:])
                    overlap_tokens = self.tokenizer(overlap_text)
                    if overlap_tokens <= self.overlap_tokens:
                        current_chunk = [overlap_text]
                        current_tokens = overlap_tokens
                    else:
                        current_chunk = []
                        current_tokens = 0
                else:
                    current_chunk = []
                    current_tokens = 0
                
                current_chunk.append(para)
                current_tokens += para_tokens
            
            else:
                current_chunk.append(para)
                current_tokens += para_tokens
        
        # 处理最后一个chunk
        if current_chunk:
            chunks.append({
                'content': '\n\n'.join(current_chunk),
                'tokens': current_tokens
            })
        
        return chunks
    
    def create_windows_with_semantic_boundaries(
        self,
        data: List[dict],
        text_key: str = 'text'
    ) -> List[List[dict]]:
        """
        按语义边界创建窗口
        data: 包含文本的字典列表
        """
        windows = []
        current_window = []
        current_tokens = 0
        
        for item in data:
            text = item.get(text_key, '')
            tokens = self.tokenizer(text)
            
            if current_tokens + tokens > self.max_tokens:
                if current_window:
                    windows.append(current_window)
                
                # 保留重叠
                if self.overlap_tokens > 0 and current_window:
                    overlap_size = self._calculate_overlap_size(current_window)
                    current_window = current_window[-overlap_size:]
                    current_tokens = sum(
                        self.tokenizer(i.get(text_key, ''))
                        for i in current_window
                    )
                else:
                    current_window = []
                    current_tokens = 0
            
            current_window.append(item)
            current_tokens += tokens
        
        if current_window:
            windows.append(current_window)
        
        return windows
    
    def _calculate_overlap_size(self, window: List[dict]) -> int:
        """计算重叠项数量"""
        overlap_tokens = 0
        for i, item in enumerate(reversed(window)):
            text = item.get('text', '')
            overlap_tokens += self.tokenizer(text)
            if overlap_tokens >= self.overlap_tokens:
                return i + 1
        return len(window)
 
 
# 使用示例
if __name__ == "__main__":
    # 模拟文档
    document = """
    第一段内容。这是一个完整的段落,介绍了基本概念。
    
    第二段内容。这里详细解释了原理和实现方式。
    
    第三段内容。包含了代码示例和运行结果。
    
    第四段内容。继续深入讨论相关话题。
    
    第五段内容。总结和展望。
    """ * 20  # 放大文档
    
    sw = SemanticSlidingWindow(max_tokens=100)
    chunks = sw.create_chunks(document)
    
    print(f"文档总长度:{len(document)}字符")
    print(f"分块数量:{len(chunks)}")
    for i, chunk in enumerate(chunks):
        print(f"块{i+1}{chunk['tokens']}tokens")

三、滑动窗口在LLM中的应用

应用场景1:长文档处理

class LongDocumentProcessor:
    """长文档处理 - 使用滑动窗口"""
    
    def __init__(
        self,
        llm_client,
        window_size: int = 4000,
        overlap_tokens: int = 500
    ):
        self.llm = llm_client
        self.window_size = window_size
        self.overlap_tokens = overlap_tokens
    
    def process_document(self, document: str, query: str) -> str:
        """
        处理长文档并回答问题
        """
        sw = SemanticSlidingWindow(
            max_tokens=self.window_size,
            overlap_tokens=self.overlap_tokens
        )
        
        chunks = sw.create_chunks(document)
        
        # 并行处理每个chunk
        chunk_answers = []
        for i, chunk in enumerate(chunks):
            prompt = f"""基于以下内容片段,回答问题。
 
内容:
{chunk['content']}
 
问题:{query}
 
如果这段内容能回答问题,给出答案。如果不能,说明"这段内容中没有答案"。"""
 
            answer = self.llm.generate(prompt)
            chunk_answers.append({
                'chunk_id': i,
                'answer': answer,
                'has_answer': "没有答案" not in answer
            })
        
        # 合并答案
        relevant_answers = [
            a for a in chunk_answers
            if a['has_answer']
        ]
        
        if not relevant_answers:
            return "抱歉,文档中没有找到答案。"
        
        # 整合答案
        final_prompt = f"""问题:{query}
 
以下是各部分的相关答案:
{chr(10).join([a['answer'] for a in relevant_answers])}
 
请整合这些答案,给出完整回答。"""
 
        return self.llm.generate(final_prompt)

应用场景2:对话历史管理

class ConversationSlidingWindow:
    """对话滑动窗口 - 管理长对话"""
    
    def __init__(
        self,
        llm_client,
        max_tokens: int = 8000,
        overlap_tokens: int = 500
    ):
        self.llm = llm_client
        self.max_tokens = max_tokens
        self.overlap_tokens = overlap_tokens
        self.messages = []
    
    def add_message(self, role: str, content: str):
        """添加消息"""
        self.messages.append({
            'role': role,
            'content': content,
            'timestamp': datetime.now().isoformat()
        })
    
    def get_context(self, current_query: str) -> str:
        """
        获取上下文 - 使用滑动窗口
        """
        # 将消息转换为文本
        text_content = self._messages_to_text(self.messages)
        
        # 如果总长度合适,直接返回
        if self._estimate_tokens(text_content) <= self.max_tokens:
            return text_content
        
        # 使用滑动窗口
        sw = SemanticSlidingWindow(
            max_tokens=self.max_tokens - self._estimate_tokens(current_query),
            overlap_tokens=self.overlap_tokens
        )
        
        # 按语义分块
        chunks = sw.create_chunks(text_content)
        
        if not chunks:
            return self._messages_to_text(self.messages[-5:])
        
        # 返回最近的chunk
        return chunks[-1]['content']
    
    def get_context_with_history_summary(self, current_query: str) -> str:
        """
        获取上下文 - 保留历史摘要
        """
        total_tokens = self._estimate_tokens(
            self._messages_to_text(self.messages)
        )
        
        if total_tokens <= self.max_tokens:
            return self._messages_to_text(self.messages)
        
        # 计算需要摘要的部分
        window_tokens = self.max_tokens - self.overlap_tokens
        
        # 提取需要摘要的历史
        history = self.messages[:-10]  # 保留最近10条
        recent = self.messages[-10:]
        
        if not history:
            return self._messages_to_text(recent)
        
        # 摘要历史
        history_text = self._messages_to_text(history)
        summary_prompt = f"""请总结以下对话的核心内容,保留关键信息:
 
{history_text}
 
摘要要求:
1. 保留主要话题和决定
2. 保留用户的核心需求
3. 删除重复和细节
4. 控制在200字以内"""
 
        summary = self.llm.generate(summary_prompt)
        
        # 构建最终上下文
        recent_text = self._messages_to_text(recent)
        
        return f"[早期对话摘要]\n{summary}\n\n[近期对话]\n{recent_text}"
    
    def _messages_to_text(self, messages: list) -> str:
        """消息转文本"""
        return '\n'.join([f"{m['role']}: {m['content']}" for m in messages])
    
    @staticmethod
    def _estimate_tokens(text: str) -> int:
        chinese = sum(1 for c in text if '\u4e00' <= c <= '\u9fff')
        english = len(text.split()) - chinese
        return int(chinese * 0.5 + english * 0.25)

应用场景3:流式数据处理

class StreamSlidingWindow:
    """流式滑动窗口 - 处理实时数据"""
    
    def __init__(
        self,
        window_size: int = 10,
        step: int = 5,
        aggregate_fn: callable = None
    ):
        self.window_size = window_size
        self.step = step
        self.aggregate_fn = aggregate_fn or (lambda x: x[-1])
        self.buffer = []
    
    def add(self, item: Any) -> Optional[Any]:
        """
        添加数据,返回窗口结果(如果有)
        """
        self.buffer.append(item)
        
        # 如果窗口满了
        if len(self.buffer) >= self.window_size:
            window = self.buffer[:self.window_size]
            result = self.aggregate_fn(window)
            
            # 滑动
            self.buffer = self.buffer[self.step:]
            
            return result
        
        return None
    
    def add_batch(self, items: List[Any]) -> List[Any]:
        """批量添加"""
        results = []
        for item in items:
            result = self.add(item)
            if result is not None:
                results.append(result)
        return results
    
    def get_current_buffer(self) -> List[Any]:
        """获取当前缓冲区"""
        return self.buffer.copy()
 
 
# 使用示例
class StreamingTextProcessor:
    """流式文本处理"""
    
    def __init__(self, llm_client):
        self.llm = llm_client
        self.window = StreamSlidingWindow(
            window_size=20,
            step=10,
            aggregate_fn=lambda msgs: '\n'.join(msgs)
        )
    
    def process_stream(self, text_stream: Iterator[str]) -> Iterator[str]:
        """处理文本流"""
        for chunk in text_stream:
            result = self.window.add(chunk)
            if result:
                # 处理窗口内容
                yield self._process_window(result)
    
    def _process_window(self, text: str) -> str:
        """处理窗口内容"""
        prompt = f"分析以下文本,提取关键信息:\n\n{text}"
        return self.llm.generate(prompt)

四、生产级滑动窗口管理器

class ProductionSlidingWindow:
    """生产级滑动窗口管理器"""
    
    def __init__(
        self,
        llm_client,
        window_size: int = 4000,
        overlap_tokens: int = 500,
        strategy: str = "semantic"
    ):
        self.llm = llm_client
        self.window_size = window_size
        self.overlap_tokens = overlap_tokens
        self.strategy = strategy
        
        # 根据策略选择实现
        if strategy == "simple":
            self.processor = SimpleSlidingWindow(window_size, overlap_tokens)
        elif strategy == "overlap":
            ratio = overlap_tokens / window_size if window_size > 0 else 0.5
            self.processor = OverlapSlidingWindow(window_size, ratio)
        elif strategy == "semantic":
            self.processor = SemanticSlidingWindow(window_size, overlap_tokens)
        else:
            raise ValueError(f"Unknown strategy: {strategy}")
    
    def create_windows(self, data: List[Any]) -> List[tuple]:
        """创建窗口"""
        if self.strategy == "simple":
            windows = self.processor.create_windows(data)
            return [(0, w) for w in windows]
        elif self.strategy == "overlap":
            return self.processor.create_windows(data)
        else:
            # semantic返回的是字典
            return [(i, w) for i, w in enumerate(data)]
    
    def aggregate_windows(
        self,
        windows: List[tuple],
        query: str = None
    ) -> Any:
        """
        聚合窗口结果
        """
        if not windows:
            return None
        
        # 如果只有一个窗口,直接返回
        if len(windows) == 1:
            return windows[0][1]
        
        # 如果有查询,使用相关性加权
        if query:
            return self._query_aware_aggregate(windows, query)
        
        # 默认:返回最后一个窗口
        return windows[-1][1]
    
    def _query_aware_aggregate(
        self,
        windows: List[tuple],
        query: str
    ) -> str:
        """根据查询相关性聚合"""
        # 计算每个窗口与查询的相关性
        scored = []
        for idx, window in windows:
            content = window if isinstance(window, str) else str(window)
            relevance = self._calculate_relevance(content, query)
            scored.append((relevance, idx, window))
        
        # 按相关性排序
        scored.sort(key=lambda x: x[0], reverse=True)
        
        # 整合前N个最相关的窗口
        top_n = min(3, len(scored))
        relevant_contents = [s[2] for s in scored[:top_n]]
        
        return '\n\n'.join(relevant_contents)
    
    def _calculate_relevance(self, content: str, query: str) -> float:
        """计算内容与查询的相关性"""
        # 简单实现:关键词重叠
        query_words = set(query.split())
        content_words = set(content.split())
        
        overlap = query_words & content_words
        return len(overlap) / len(query_words) if query_words else 0
 
 
# 使用示例
def demo():
    """演示滑动窗口的完整使用"""
    
    # 模拟100轮对话
    messages = [
        {'role': 'user', 'content': f'第{i}轮对话内容'}
        for i in range(100)
    ]
    
    # 使用不同策略
    strategies = ['simple', 'overlap', 'semantic']
    
    for strategy in strategies:
        print(f"\n{'='*50}")
        print(f"策略: {strategy}")
        print('='*50)
        
        manager = ProductionSlidingWindow(
            llm_client=llm,
            window_size=20,
            overlap_tokens=5,
            strategy=strategy
        )
        
        windows = manager.create_windows(messages)
        print(f"窗口数量: {len(windows)}")
        
        # 模拟查询
        query = "第50轮对话说了什么"
        result = manager.aggregate_windows(windows, query)
        print(f"查询结果长度: {len(str(result))}字符")

五、滑动窗口的调优指南

参数选择建议

┌─────────────────────────────────────────────────────────────┐
│                    滑动窗口参数调优                              │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  📊 窗口大小选择                                              │
│  ├─ 小(<2000 tokens):细节多,准确性高,但可能丢全局信息      │
│  ├─ 中(2000-4000 tokens):平衡的选择                        │
│  └─ 大(>4000 tokens):全局视野,但计算成本高                 │
│                                                              │
│  📐 重叠比例选择                                              │
│  ├─ 25%:计算效率高,但边缘信息可能丢失                        │
│  ├─ 50%:常用选择,平衡效率和信息完整                          │
│  └─ 75%:信息完整,但计算量大                                 │
│                                                              │
│  🎯 步长选择                                                  │
│  └─ 步长 = 窗口大小 × (1 - 重叠比例)                          │
│                                                              │
└─────────────────────────────────────────────────────────────┘

常见问题与解决

问题原因解决方案
边界信息丢失窗口截断在语义边界使用语义滑动窗口
重复处理窗口重叠记录已处理位置
内存爆炸窗口数量太多使用迭代器而不是列表
结果不一致随机性固定随机种子

六、实战:构建一个问答机器人

class SlidingWindowQABot:
    """使用滑动窗口的问答机器人"""
    
    def __init__(self, llm_client, max_context_tokens: int = 8000):
        self.llm = llm_client
        self.max_context_tokens = max_context_tokens
        self.conversation_history = []
    
    def ask(self, question: str) -> str:
        """问答"""
        # 1. 添加用户问题
        self.conversation_history.append({
            'role': 'user',
            'content': question
        })
        
        # 2. 构建上下文
        context = self._build_context(question)
        
        # 3. 生成回答
        prompt = f"""{context}
 
用户问题:{question}
请基于对话历史和上下文回答。"""
 
        answer = self.llm.generate(prompt)
        
        # 4. 保存回答
        self.conversation_history.append({
            'role': 'assistant',
            'content': answer
        })
        
        return answer
    
    def _build_context(self, current_question: str) -> str:
        """构建上下文"""
        # 将对话历史转为文本
        history_text = self._format_history()
        
        # 估算token
        history_tokens = self._estimate_tokens(history_text)
        question_tokens = self._estimate_tokens(current_question)
        
        # 如果历史太长,使用滑动窗口
        if history_tokens + question_tokens > self.max_context_tokens:
            sw = SemanticSlidingWindow(
                max_tokens=self.max_context_tokens - question_tokens,
                overlap_tokens=500
            )
            
            # 将历史按语义分块
            chunks = sw.create_chunks(history_text)
            
            # 返回最近的chunk
            if chunks:
                return chunks[-1]['content']
        
        return history_text
    
    def _format_history(self) -> str:
        return '\n'.join([
            f"{m['role']}: {m['content']}"
            for m in self.conversation_history
        ])
    
    @staticmethod
    def _estimate_tokens(text: str) -> int:
        chinese = sum(1 for c in text if '\u4e00' <= c <= '\u9fff')
        english = len(text.split()) - chinese
        return int(chinese * 0.5 + english * 0.25)
 
 
# 使用示例
if __name__ == "__main__":
    bot = SlidingWindowQABot(llm=llm, max_context_tokens=8000)
    
    # 模拟多轮对话
    questions = [
        "我叫小明,喜欢Python编程",
        "帮我写一个排序算法",
        "用快速排序实现",
        "优化一下性能",
        "我叫小明,记得吗?"  # 测试记忆
    ]
    
    for q in questions:
        print(f"\n问题: {q}")
        answer = bot.ask(q)
        print(f"回答: {answer[:100]}...")

七、总结

┌─────────────────────────────────────────────────────────────┐
│                     滑动窗口核心要点                            │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  🎯 核心原理                                                │
│  └─ 固定大小 + 移动步长 = 连续覆盖长序列                      │
│                                                              │
│  🔧 三种实现                                                │
│  ├─ SimpleSlidingWindow:基础,适合规则数据                 │
│  ├─ OverlapSlidingWindow:保持连续性,常用                  │
│  └─ SemanticSlidingWindow:智能,适合文本                   │
│                                                              │
│  📊 参数选择                                                │
│  ├─ 窗口大小:取决于模型限制和任务需求                       │
│  ├─ 重叠比例:50%左右效果较好                              │
│  └─ 步长:窗口大小 × (1 - 重叠比例)                         │
│                                                              │
│  💡 应用场景                                                │
│  ├─ 长文档处理:分块 + 并行 + 合并                          │
│  ├─ 对话历史管理:滑动 + 摘要                               │
│  └─ 流式数据处理:实时窗口 + 聚合                           │
│                                                              │
│  ⚠️ 注意事项                                                │
│  ├─ 避免在语义边界截断                                      │
│  ├─ 边缘信息用重叠补救                                      │
│  └─ 大数据用迭代器节省内存                                  │
│                                                              │
└─────────────────────────────────────────────────────────────┘

相关主题


参考文献

  1. Beltagy, I., et al. (2020). Longformer: The Long-Document Transformer. arXiv.
  2. Zaheer, M., et al. (2020). Big Bird: Transformers for Longer Sequences. NeurIPS.
  3. Child, R., et al. (2019). Generating Long Sequences with Sparse Transformers. arXiv.