Abstract

Context compression is a core technique for working within LLM context-window limits. This article walks through LLM-based summarization, rule-based compression, selective retention, the trade-offs between semantic compression and plain truncation, and entity-preservation strategies, with the goal of maximizing information value within a limited window.

Key Terms at a Glance

| Term | Description |
|------|-------------|
| Context Compression | Techniques for reducing context length |
| Semantic Compression | Compression that preserves meaning |
| Rule-based Compression | Compression driven by hand-written rules |
| Entity Preservation | Keeping key entity information intact |
| Summarization-based Compression | Compression via generated summaries |
| Selective Retention | Keeping only the most important content |
| Information Density | Amount of information per unit of length |
| Token Budget | Number of tokens available |
| Hallucination | Generated content that contradicts the facts |
| Compression Ratio | Ratio of original to compressed length |

1. Context Compression Overview

1.1 Why Compression Is Needed

Before compression:
┌─────────────────────────────────────────────────────────────┐
│ User: Analyze this 100-page product document and list       │
│ every feature point...                                      │
│                                                             │
│ Document length: 50,000 tokens                              │
│ Model limit:     128,000 tokens                             │
│ Usable space:    128,000 - system (2K) - output (4K)        │
│                  = 122,000 tokens                           │
│                                                             │
│ Problem: the document fits, but it consumes ~40% of the     │
│ budget, leaving little headroom for conversation history    │
│ or retrieved chunks, and cost and latency grow with         │
│ input length.                                               │
└─────────────────────────────────────────────────────────────┘

After compression:
┌─────────────────────────────────────────────────────────────┐
│ [Compressed summary]                                        │
│ Product X core features:                                    │
│ 1. User management: create/edit/delete/role assignment      │
│ 2. Analytics: live dashboards/custom reports/data export    │
│ 3. Integrations: REST API/Webhooks/third-party login        │
│ 4. Security: SSO/two-factor auth/audit logs                 │
│                                                             │
│ Compressed size:   ~800 tokens                              │
│ Compression ratio: ~62:1                                    │
└─────────────────────────────────────────────────────────────┘
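The budget arithmetic above can be sketched as a small helper. The overhead figures (2K for the system prompt, 4K reserved for output) follow the example and are assumptions, not fixed constants:

```python
def available_budget(context_window: int,
                     system_tokens: int = 2_000,
                     output_reserve: int = 4_000) -> int:
    """Tokens left for user content after fixed overheads."""
    return context_window - system_tokens - output_reserve


def required_ratio(doc_tokens: int, budget: int) -> float:
    """Compression ratio needed to fit a document (1.0 = no compression)."""
    return min(1.0, budget / doc_tokens)


budget = available_budget(128_000)       # 122,000 tokens usable
ratio = required_ratio(50_000, budget)   # this document already fits (1.0)
```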

1.2 Comparison of Compression Methods

| Method | Compression ratio | Info retention | Best for |
|--------|-------------------|----------------|----------|
| Rule-based | 1.5-2x | 70-80% | Formatted text |
| LLM summarization | 5-20x | 60-80% | Semantically rich content |
| Selective retention | 3-10x | 80-90% | Precise retrieval |
| Entity preservation | 3-5x | 85-95% | Knowledge-dense content |
| Hybrid | 5-30x | 50-75% | Complex scenarios |

2. LLM-Based Summarization Compression

2.1 Basic Summarization

from typing import List, Optional


class LLMSummarizer:
    """LLM-based summarization compressor."""

    def __init__(self, llm_client):
        self.llm = llm_client

    def compress(
        self,
        text: str,
        target_ratio: float = 0.2,
        preserve_key_points: bool = True
    ) -> str:
        """
        Compress text by asking the LLM for a summary.

        Args:
            text: original text
            target_ratio: target compression ratio
            preserve_key_points: whether to preserve key points
        """
        target_words = int(len(text.split()) * target_ratio)

        prompt = f"""Compress the following text while preserving the core information.

Requirements:
1. Target length: about {target_words} words
2. Keep all key facts and figures
3. Keep important opinions and conclusions
4. Remove repetition and redundancy
5. Keep the original tone and style
6. Keep the content logically coherent

Original text:
{text}

Compressed version:"""

        return self.llm.generate(prompt).strip()

    def compress_for_task(
        self,
        text: str,
        task: str,
        preserve_aspects: Optional[List[str]] = None
    ) -> str:
        """
        Task-specific compression: the task type decides what to keep.
        """
        prompt = f"""You are a professional document-compression assistant.

Task type: {task}
{("Aspects to focus on: " + ", ".join(preserve_aspects)) if preserve_aspects else ""}

Compress the document below, keeping only information relevant to the task.

Original text:
{text}

Requirements:
1. Extract the content relevant to the task
2. Keep key details and figures
3. Drop irrelevant background and description
4. Keep the structure clear

Compressed version:"""

        return self.llm.generate(prompt).strip()
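At its core, this pattern is just prompt construction plus a single generation call. A minimal standalone sketch, with a stub standing in for a real LLM client (`EchoLLM` and its canned reply are placeholders, not a real API):

```python
class EchoLLM:
    """Stand-in for a real LLM client; returns a canned summary."""
    def generate(self, prompt: str) -> str:
        return "Product X supports user management, analytics, and SSO."


def build_compression_prompt(text: str, target_ratio: float) -> str:
    """Build a summarization prompt aiming at a word-count target."""
    target_words = int(len(text.split()) * target_ratio)
    return (
        f"Compress the following text to about {target_words} words, "
        "keeping all key facts and figures.\n\n"
        f"Original text:\n{text}\n\nCompressed version:"
    )


llm = EchoLLM()
prompt = build_compression_prompt("word " * 100, target_ratio=0.2)
summary = llm.generate(prompt).strip()
```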

2.2 Iterative Summarization

import re


class IterativeSummarizer:
    """Iterative summarization compressor."""

    def __init__(self, llm_client, max_iterations: int = 3):
        self.llm = llm_client
        self.max_iterations = max_iterations

    def compress_iteratively(
        self,
        text: str,
        max_tokens: int,
        strategy: str = "progressive"
    ) -> str:
        """
        Compress repeatedly until the text fits the target length.

        strategy: 'progressive' (gradual) or 'binary' (bisection)
        """
        if strategy == "progressive":
            return self._progressive_compress(text, max_tokens)
        else:
            return self._binary_compress(text, max_tokens)

    def _progressive_compress(
        self,
        text: str,
        max_tokens: int
    ) -> str:
        """Progressive compression."""
        current_text = text
        current_tokens = self._estimate_tokens(text)

        for iteration in range(self.max_iterations):
            if current_tokens <= max_tokens:
                return current_text

            # Aim slightly below the budget so we converge in fewer rounds
            target_ratio = max_tokens / current_tokens * 0.8

            prompt = f"""Compress the following text to roughly {target_ratio:.0%} of its length.

Principles:
1. Keep key information and core points
2. Remove redundant description
3. Simplify complex sentences
4. Stay logically coherent

Original text (~{current_tokens} tokens):
{current_text}

Compressed version:"""

            compressed = self.llm.generate(prompt).strip()
            current_tokens = self._estimate_tokens(compressed)

            # If compression made little progress, fall back to hard truncation
            if len(compressed) > len(current_text) * 0.8:
                current_text = self._aggressive_compress(current_text, max_tokens)
                break

            current_text = compressed

        return current_text

    def _binary_compress(
        self,
        text: str,
        max_tokens: int
    ) -> str:
        """Bisection on the target ratio."""
        low_ratio = 0.1
        high_ratio = 0.9

        best_result = text

        for _ in range(self.max_iterations):
            mid_ratio = (low_ratio + high_ratio) / 2

            prompt = f"""Compress the following text to roughly {mid_ratio:.0%} of its length.

{text}

Compressed version:"""

            result = self.llm.generate(prompt).strip()
            tokens = self._estimate_tokens(result)

            if tokens <= max_tokens:
                # Fits: remember it and try retaining more content next round
                best_result = result
                low_ratio = mid_ratio
            else:
                # Too long: compress harder next round
                high_ratio = mid_ratio

        return best_result

    def _aggressive_compress(self, text: str, max_tokens: int) -> str:
        """Aggressive compression: keep leading sentences until the budget is spent."""
        if self._estimate_tokens(text) <= max_tokens:
            return text

        # Select sentences (split on the Chinese full stop '。') until the budget runs out
        sentences = [s for s in text.replace('。', '。|').split('|') if s]
        selected = []
        current_tokens = 0

        for sent in sentences:
            sent_tokens = self._estimate_tokens(sent)
            if current_tokens + sent_tokens > max_tokens:
                break
            selected.append(sent)
            current_tokens += sent_tokens

        # Fallback: hard character truncation (~2 chars per token for Chinese)
        return ''.join(selected) if selected else text[:max_tokens * 2]

    @staticmethod
    def _estimate_tokens(text: str) -> int:
        # Rough heuristic: ~0.6 tokens per CJK character, ~1.3 per Latin word
        chinese = sum(1 for c in text if '\u4e00' <= c <= '\u9fff')
        latin_words = len(re.findall(r'[A-Za-z0-9]+', text))
        return int(chinese * 0.6 + latin_words * 1.3)
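The convergence behaviour of the progressive loop can be seen with a toy "compressor" that simply halves the text each round, standing in for the LLM call:

```python
def progressive_compress(text: str, max_len: int, max_iterations: int = 5) -> str:
    """Repeatedly apply a crude halving step until the text fits."""
    current = text
    for _ in range(max_iterations):
        if len(current) <= max_len:
            break
        current = current[: len(current) // 2]  # stand-in for an LLM summary
    return current


result = progressive_compress("x" * 1000, max_len=200)
# 1000 -> 500 -> 250 -> 125: three rounds to get under budget
```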

3. Rule-Based Compression

3.1 Basic Rule-Based Compression

import re


class RuleBasedCompressor:
    """Rule-based context compressor (tuned for Chinese text)."""

    def __init__(self):
        # Stopwords and filler phrases (Chinese-specific)
        self.stopwords = {
            '的', '了', '是', '在', '和', '与', '等', '以及', '这', '那',
            '一个', '我们', '你们', '他们', '可以', '能够', '需要', '应该',
            '非常', '特别', '十分', '比较', '相当', '极其', '格外',
            '一般来说', '通常情况下', '基本上',
            '大家知道', '众所周知', '正如前面所述'
        }

        # Boilerplate lead-in phrases
        self.template_phrases = [
            r'^请注意',
            r'^下面(为大家|我们)?介绍',
            r'^首先(我们)?来看',
            r'^接下来',
            r'^综上所述',
            r'^总而言之',
            r'^事实上',
            r'^实际上'
        ]

        # Redundancy patterns: (pattern, replacement)
        self.redundant_patterns = [
            (r' {2,}', ' '),       # extra spaces
            (r'\n{3,}', '\n\n'),   # extra blank lines
            (r'。{2,}', '。'),     # repeated full stops
        ]

    def compress(self, text: str, level: str = "medium") -> str:
        """
        Rule-based compression.

        level: 'light', 'medium', or 'aggressive'
        """
        # 1. Basic cleanup
        text = self._basic_cleanup(text)

        # 2. Strip boilerplate phrases
        text = self._remove_templates(text)

        # 3. Sentence-level reduction
        if level == "light":
            pass
        elif level == "medium":
            text = self._remove_adverbials(text)
        else:
            text = self._aggressive_compress(text)

        return text.strip()

    def _basic_cleanup(self, text: str) -> str:
        """Strip markup and normalize whitespace."""
        # Remove HTML tags
        text = re.sub(r'<[^>]+>', '', text)

        # Normalize whitespace
        text = re.sub(r'\t', ' ', text)

        # Apply the redundancy patterns
        for pattern, replacement in self.redundant_patterns:
            text = re.sub(pattern, replacement, text)

        return text

    def _remove_templates(self, text: str) -> str:
        """Drop lines that start with boilerplate phrases."""
        lines = text.split('\n')
        cleaned_lines = []

        for line in lines:
            is_template = any(re.search(p, line) for p in self.template_phrases)
            if not is_template:
                cleaned_lines.append(line)

        return '\n'.join(cleaned_lines)

    def _remove_adverbials(self, text: str) -> str:
        """Remove intensifying adverbs (Chinese has no inter-word spaces,
        so the adverb must be matched directly before the next character)."""
        adverbial_patterns = [
            r'非常(?=[\u4e00-\u9fa5])',
            r'特别(?=[\u4e00-\u9fa5])',
            r'十分(?=[\u4e00-\u9fa5])',
            r'比较(?=[\u4e00-\u9fa5])',
            r'相当(?=[\u4e00-\u9fa5])',
        ]

        for pattern in adverbial_patterns:
            text = re.sub(pattern, '', text)

        return text

    def _aggressive_compress(self, text: str) -> str:
        """Aggressive: drop whitespace-separated stopword tokens."""
        words = text.split()
        filtered = [w for w in words if w not in self.stopwords]

        # Rebuild the text
        return ' '.join(filtered)
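A self-contained taste of the rule pipeline, using the same kinds of regexes: strip HTML, normalize whitespace, drop an intensifier:

```python
import re


def quick_cleanup(text: str) -> str:
    """Minimal rule-based cleanup: tags, whitespace, intensifiers."""
    text = re.sub(r'<[^>]+>', '', text)           # strip HTML tags
    text = re.sub(r' {2,}', ' ', text)            # collapse runs of spaces
    text = re.sub(r'\n{3,}', '\n\n', text)        # collapse blank lines
    text = re.sub(r'(非常|特别|十分)', '', text)    # drop intensifiers
    return text.strip()


cleaned = quick_cleanup("<b>这个功能非常  重要</b>\n\n\n下一段")
```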

3.2 Sentence-Level Compression

import re


class SentenceCompressor:
    """Sentence-level compression."""

    # Accept both ASCII and full-width commas
    COMMAS = (',', ',')

    def __init__(self, llm_client=None):
        self.llm = llm_client

    def compress_sentence(self, sentence: str) -> str:
        """Compress a single sentence."""
        if self._is_simple_statement(sentence):
            return sentence

        if self._is_compound_sentence(sentence):
            return self._compress_compound(sentence)

        if self._has_redundant_modifier(sentence):
            return self._remove_redundant(sentence)

        return sentence

    def compress_text_by_sentence(
        self,
        text: str,
        preserve_ratio: float = 0.7
    ) -> str:
        """Score sentences and keep the top fraction, in original order."""
        sentences = re.split(r'[。!?\n]', text)
        sentences = [s.strip() for s in sentences if s.strip()]

        # Score each sentence, remembering its original position
        scored = [
            (self._score_sentence(sent), i, sent)
            for i, sent in enumerate(sentences)
        ]
        scored.sort(key=lambda x: x[0], reverse=True)

        # Keep the top-scoring sentences, then restore document order
        n_keep = max(1, int(len(sentences) * preserve_ratio))
        kept = sorted(scored[:n_keep], key=lambda x: x[1])

        selected = [sent for _, _, sent in kept]
        return '。'.join(selected) + '。' if selected else text

    def _score_sentence(self, sentence: str) -> float:
        """Heuristic importance score."""
        score = 0.0

        # Length: mid-length sentences preferred
        length = len(sentence)
        if 20 <= length <= 100:
            score += 0.3
        elif length > 100:
            score -= 0.1

        # Signal keywords (Chinese-specific)
        keywords = ['关键', '重要', '核心', '主要', '必须', '建议', '特点', '功能']
        score += 0.2 * sum(1 for kw in keywords if kw in sentence)

        # Contains digits
        if re.search(r'\d+', sentence):
            score += 0.1

        # Data-related terms
        data_words = ['数据', '统计', '分析', '结果', '显示']
        score += 0.15 * sum(1 for dw in data_words if dw in sentence)

        return score

    def _is_simple_statement(self, sentence: str) -> bool:
        """Short and clause-free."""
        return len(sentence) < 30 and not any(c in sentence for c in self.COMMAS)

    def _is_compound_sentence(self, sentence: str) -> bool:
        """Contains clauses or coordinating conjunctions."""
        return (any(c in sentence for c in self.COMMAS)
                or '并且' in sentence or '但是' in sentence)

    def _compress_compound(self, sentence: str) -> str:
        """Keep the main clauses, elide the middle."""
        parts = re.split(r'[,,]', sentence)

        if len(parts) <= 2:
            return sentence

        # Keep first and last clauses, elide the middle
        if len(parts) > 3:
            parts = [parts[0], '...', parts[-1]]

        return ','.join(parts)

    def _has_redundant_modifier(self, sentence: str) -> bool:
        """Check for intensifying modifiers."""
        redundant = ['非常', '特别', '十分', '相当', '极其']
        return any(r in sentence for r in redundant)

    def _remove_redundant(self, sentence: str) -> str:
        """Strip intensifying modifiers."""
        for r in ['非常', '特别', '十分', '相当', '极其']:
            sentence = sentence.replace(r, '')
        return sentence
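Sentence selection in miniature: split, score by keyword hits, keep the top half in original order. The scoring here is a toy stand-in for a fuller heuristic like `_score_sentence`:

```python
import re


def keep_top_sentences(text: str, keywords, ratio: float = 0.5) -> str:
    """Keep the highest-scoring fraction of sentences, in original order."""
    sentences = [s for s in re.split(r'(?<=[.!?])\s+', text.strip()) if s]
    scored = [(sum(kw in s.lower() for kw in keywords), i, s)
              for i, s in enumerate(sentences)]
    n_keep = max(1, int(len(sentences) * ratio))
    kept = sorted(sorted(scored, key=lambda t: t[0], reverse=True)[:n_keep],
                  key=lambda t: t[1])
    return ' '.join(s for _, _, s in kept)


text = ("The weather was nice. The key feature is SSO. "
        "We had lunch. Performance data shows 40% gains.")
out = keep_top_sentences(text, keywords=["key", "data"])
```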

4. Selective Retention Strategies

4.1 Importance Scoring

import re
from typing import Dict, List, Optional


class ImportanceScorer:
    """Estimates how important each chunk is."""

    def __init__(self, embedding_model=None, llm_client=None):
        self.embedding = embedding_model
        self.llm = llm_client

    def score_chunks(
        self,
        chunks: List[Dict],
        query: Optional[str] = None
    ) -> List[Dict]:
        """
        Attach an 'importance_score' to each chunk and return the
        chunks sorted by score, highest first.
        """
        for chunk in chunks:
            chunk['importance_score'] = self._calculate_score(chunk, query)

        return sorted(chunks, key=lambda x: x['importance_score'], reverse=True)

    def _calculate_score(self, chunk: Dict, query: Optional[str] = None) -> float:
        """Weighted combination of sub-scores."""
        scores = [
            # (name, score, weight)
            ('keyword', self._keyword_match_score(chunk, query), 0.3),
            ('semantic', self._semantic_similarity_score(chunk, query), 0.4),
            ('position', self._position_score(chunk), 0.1),
            ('quality', self._content_quality_score(chunk), 0.2),
        ]

        # Weighted sum
        return sum(score * weight for _, score, weight in scores)

    def _keyword_match_score(self, chunk: Dict, query: Optional[str]) -> float:
        """Fraction of query terms that appear in the chunk."""
        if not query:
            return 0.5

        query_terms = set(query.lower().split())
        content_terms = set(chunk.get('content', '').lower().split())

        if not query_terms:
            return 0.5

        overlap = len(query_terms & content_terms)
        return overlap / len(query_terms)

    def _semantic_similarity_score(self, chunk: Dict, query: Optional[str]) -> float:
        """Embedding cosine similarity; neutral 0.5 when unavailable."""
        if not query or not self.embedding:
            return 0.5

        try:
            query_emb = self.embedding.encode(query)
            content_emb = self.embedding.encode(chunk['content'])
            return self._cosine_similarity(query_emb, content_emb)
        except Exception:
            return 0.5

    def _position_score(self, chunk: Dict) -> float:
        """U-shaped curve: beginnings and endings matter more."""
        position = chunk.get('position', 0.5)

        if position < 0.2:
            return 0.8
        elif position > 0.8:
            return 0.7
        elif 0.4 < position < 0.6:
            return 0.4
        else:
            return 0.5

    def _content_quality_score(self, chunk: Dict) -> float:
        """Heuristic content-quality score in [0, 1]."""
        content = chunk.get('content', '')

        score = 0.5

        # Moderate length is best
        length = len(content)
        if 100 < length < 1000:
            score += 0.2
        elif length <= 100:
            score += 0.1
        else:
            score -= 0.1

        # Contains numbers/dates
        if re.search(r'\d+', content):
            score += 0.1

        # Contains key terms (Chinese-specific)
        key_terms = ['功能', '特点', '规格', '参数', '方法', '步骤']
        score += 0.05 * sum(1 for term in key_terms if term in content)

        return min(max(score, 0), 1)

    @staticmethod
    def _cosine_similarity(a, b):
        dot = sum(x * y for x, y in zip(a, b))
        norm_a = sum(x * x for x in a) ** 0.5
        norm_b = sum(x * x for x in b) ** 0.5
        return dot / (norm_a * norm_b + 1e-8)

4.2 Selective Retention Implementation

import re
from typing import Dict, List, Optional


class SelectiveRetention:
    """Selective-retention compressor."""

    def __init__(
        self,
        importance_scorer: ImportanceScorer,
        llm_client=None
    ):
        self.scorer = importance_scorer
        self.llm = llm_client

    def compress_to_budget(
        self,
        chunks: List[Dict],
        max_tokens: int,
        query: Optional[str] = None
    ) -> str:
        """Greedily keep the highest-scoring chunks within the token budget."""
        # 1. Score
        scored_chunks = self.scorer.score_chunks(chunks, query)

        # 2. Greedy selection
        selected = []
        current_tokens = 0

        for chunk in scored_chunks:
            chunk_tokens = self._estimate_tokens(chunk['content'])

            if current_tokens + chunk_tokens <= max_tokens:
                selected.append(chunk)
                current_tokens += chunk_tokens
            elif chunk['importance_score'] > 0.7:
                # High-importance chunk: try squeezing it into what's left
                compressed = self._compress_chunk(
                    chunk['content'], max_tokens - current_tokens
                )
                if compressed:
                    selected.append({**chunk, 'content': compressed})
                    current_tokens += self._estimate_tokens(compressed)

        # 3. Restore original document order
        selected.sort(key=lambda x: x.get('position', 1))

        # 4. Merge
        return '\n\n'.join(c['content'] for c in selected)

    def compress_with_overlap(
        self,
        chunks: List[Dict],
        max_tokens: int,
        overlap_ratio: float = 0.2
    ) -> str:
        """
        Selective retention with skipping: after a high-importance chunk
        is taken, skip nearby lower-ranked candidates to spread coverage.
        """
        # Score
        scored_chunks = self.scorer.score_chunks(chunks)

        selected = []
        current_tokens = 0

        i = 0
        while i < len(scored_chunks) and current_tokens < max_tokens:
            chunk = scored_chunks[i]
            chunk_tokens = self._estimate_tokens(chunk['content'])

            if current_tokens + chunk_tokens <= max_tokens:
                selected.append(chunk)
                current_tokens += chunk_tokens

                # After a very important chunk, skip the next two candidates
                if chunk['importance_score'] > 0.8:
                    i += 3
                else:
                    i += 1
            else:
                i += 1

        # Restore original document order
        selected.sort(key=lambda x: x.get('position', 1))

        return '\n\n'.join(c['content'] for c in selected)

    def _compress_chunk(self, content: str, max_tokens: int) -> Optional[str]:
        """Ask the LLM to squeeze a chunk into a smaller budget."""
        if self.llm:
            prompt = f"""Compress the following content to about {max_tokens} tokens:

{content}

Compressed version:"""
            return self.llm.generate(prompt).strip()
        return None

    @staticmethod
    def _estimate_tokens(text: str) -> int:
        # Rough heuristic: ~0.6 tokens per CJK character, ~1.3 per Latin word
        chinese = sum(1 for c in text if '\u4e00' <= c <= '\u9fff')
        latin_words = len(re.findall(r'[A-Za-z0-9]+', text))
        return int(chinese * 0.6 + latin_words * 1.3)
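The greedy selection above is a budget-knapsack heuristic. Stripped to its essentials, with plain costs standing in for token counts:

```python
def greedy_select(chunks, max_cost):
    """chunks: list of (score, cost, text). Keep best-scoring chunks within budget."""
    selected, spent = [], 0
    for score, cost, text in sorted(chunks, key=lambda c: c[0], reverse=True):
        if spent + cost <= max_cost:
            selected.append(text)
            spent += cost
    return selected, spent


chunks = [(0.9, 40, "A"), (0.5, 50, "B"), (0.8, 30, "C"), (0.2, 10, "D")]
kept, spent = greedy_select(chunks, max_cost=80)
# "A" (40) and "C" (30) fit; "B" would overflow; "D" (10) still fits
```

Note that greedy selection by score is not optimal in the knapsack sense, but it is cheap and works well when scores dominate costs.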

5. Entity Preservation Strategies

5.1 Entity Recognition and Preservation

import re
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Entity:
    """A span of text recognized as an entity."""
    text: str
    entity_type: str
    start: int
    end: int


class EntityPreservingCompressor:
    """Compressor that protects entities from being summarized away."""

    def __init__(self, llm_client=None):
        self.llm = llm_client

        # Regex patterns per entity type (simplified; Chinese-oriented)
        self.entity_patterns = {
            'date': r'\d{4}[-/年]\d{1,2}[-/月]\d{1,2}日?',
            'time': r'\d{1,2}[时点]\d{0,2}分?',
            'number': r'\d+(?:\.\d+)?(?:[万亿]|%)?',
            'email': r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',
            'url': r'https?://[^\s]+',
            'phone': r'\d{3,4}[-]?\d{7,8}',
            'person': r'[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*',  # simplified
            'organization': r'(?:公司|集团|机构|医院|学校|银行)[^\s,。]{0,20}',
        }

    def extract_entities(self, text: str) -> List[Entity]:
        """Extract all entity matches."""
        entities = []

        for entity_type, pattern in self.entity_patterns.items():
            for match in re.finditer(pattern, text):
                entities.append(Entity(
                    text=match.group(),
                    entity_type=entity_type,
                    start=match.start(),
                    end=match.end()
                ))

        return entities

    def compress_preserving_entities(
        self,
        text: str,
        target_tokens: int,
        preserve_entity_types: Optional[List[str]] = None
    ) -> str:
        """
        Compress text while keeping entities intact.

        Strategy:
        1. Extract all entities
        2. Mask them with placeholders
        3. Compress the masked text
        4. Restore the entities
        """
        if preserve_entity_types is None:
            preserve_entity_types = list(self.entity_patterns.keys())

        # 1. Extract entities
        entities = self.extract_entities(text)

        # 2. Map entity text to entity (deduplicated)
        entity_map = {
            e.text: e for e in entities
            if e.entity_type in preserve_entity_types
        }

        # 3. Replace entities with placeholders
        masked_text = text
        entity_placeholders = {}

        for i, entity_text in enumerate(entity_map):
            placeholder = f"[ENTITY_{i}]"
            masked_text = masked_text.replace(entity_text, placeholder)
            entity_placeholders[placeholder] = entity_text

        # 4. Compress the non-entity text
        if self.llm:
            prompt = f"""Compress the following text to about {target_tokens} tokens.
Keep every [ENTITY_n] placeholder exactly as written.

{masked_text}

Compressed version:"""
            compressed = self.llm.generate(prompt).strip()
        else:
            compressed = self._rule_based_compress(masked_text, target_tokens)

        # 5. Restore entities
        for placeholder, entity_text in entity_placeholders.items():
            compressed = compressed.replace(placeholder, entity_text)

        return compressed

    def _rule_based_compress(self, text: str, target_tokens: int) -> str:
        """Fallback rule-based compression."""
        while self._estimate_tokens(text) > target_tokens:
            before = text

            # Strip intensifying adverbs (whole words, not single characters)
            text = re.sub(r'(非常|特别|十分|相当|极其)', '', text)

            # Strip boilerplate lead-ins
            text = re.sub(r'^(请注意|下面|首先)[,,\s]+', '', text, flags=re.MULTILINE)

            # Stop once the rules make no further progress
            if text == before:
                break

        return text

    @staticmethod
    def _estimate_tokens(text: str) -> int:
        # Rough heuristic: ~0.6 tokens per CJK character, ~1.3 per Latin word
        chinese = sum(1 for c in text if '\u4e00' <= c <= '\u9fff')
        latin_words = len(re.findall(r'[A-Za-z0-9]+', text))
        return int(chinese * 0.6 + latin_words * 1.3)
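The mask-compress-restore round trip can be checked end to end with just a date and a percentage pattern; the "compression" step here is a trivial adverb deletion standing in for the real summarizer:

```python
import re


def mask_entities(text, patterns):
    """Replace regex matches with [ENTITY_i] placeholders."""
    placeholders = {}
    for pattern in patterns:
        for match in re.findall(pattern, text):
            if match not in placeholders.values():
                key = f"[ENTITY_{len(placeholders)}]"
                placeholders[key] = match
                text = text.replace(match, key)
    return text, placeholders


def unmask(text, placeholders):
    """Put the original entity strings back."""
    for key, value in placeholders.items():
        text = text.replace(key, value)
    return text


patterns = [r'\d{4}-\d{2}-\d{2}', r'\d+%']
src = "On 2024-03-15 revenue grew 23%, a very strong quarter."
masked, ph = mask_entities(src, patterns)
compressed = masked.replace("very ", "")   # stand-in compression step
restored = unmask(compressed, ph)
```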

5.2 Entity Preservation for RAG

from typing import Dict, List


class EntityAwareRAG:
    """Entity-aware compression for RAG pipelines."""

    def __init__(self, compressor: EntityPreservingCompressor):
        self.compressor = compressor

    def compress_for_rag(
        self,
        retrieved_chunks: List[Dict],
        query: str,
        max_tokens: int
    ) -> str:
        """
        Compress retrieval results for a RAG prompt, preserving:
        - entities related to the query
        - key facts and figures
        - core statements
        """
        # 1. Entities mentioned in the query itself
        query_entities = self.compressor.extract_entities(query)

        # 2. Merge all chunks
        combined_text = '\n\n'.join(c['content'] for c in retrieved_chunks)

        # 3. Decide which entity types to protect: sensible defaults,
        #    plus whatever types actually occur in the query
        preserve_types = {'date', 'number', 'person', 'organization'}
        preserve_types |= {e.entity_type for e in query_entities}

        # 4. Compress
        return self.compressor.compress_preserving_entities(
            combined_text,
            max_tokens,
            preserve_entity_types=list(preserve_types)
        )

6. Hybrid Compression Strategies

6.1 Adaptive Compression

import re


class AdaptiveCompressor:
    """Picks a compression strategy based on the content type."""

    def __init__(
        self,
        rule_compressor: RuleBasedCompressor,
        llm_compressor: LLMSummarizer,
        entity_compressor: EntityPreservingCompressor
    ):
        self.rule = rule_compressor
        self.llm = llm_compressor
        self.entity = entity_compressor

    def compress(
        self,
        text: str,
        max_tokens: int,
        context_type: str = "general"
    ) -> str:
        """
        Adaptive compression: choose the strategy that suits the text type.
        """
        text_tokens = self._estimate_tokens(text)
        target_ratio = max_tokens / max(text_tokens, 1)

        if context_type == "code":
            return self._compress_code(text, max_tokens)
        elif context_type == "structured":
            return self._compress_structured(text, max_tokens)
        elif context_type == "narrative":
            return self._compress_narrative(text, max_tokens, target_ratio)
        else:
            return self._compress_general(text, max_tokens, target_ratio)

    def _compress_code(self, text: str, max_tokens: int) -> str:
        """Code: always keep structural lines, drop standalone comments."""
        lines = text.split('\n')
        selected = []
        current_tokens = 0

        for line in lines:
            line_tokens = self._estimate_tokens(line)
            stripped = line.strip()

            # Always keep definitions and imports
            if stripped.startswith(('def ', 'class ', 'import ', 'from ')):
                selected.append(line)
                current_tokens += line_tokens
            elif current_tokens + line_tokens <= max_tokens:
                # Drop standalone comment lines; keep code
                if not stripped.startswith('#'):
                    selected.append(line)
                    current_tokens += line_tokens

        return '\n'.join(selected)

    def _compress_structured(self, text: str, max_tokens: int) -> str:
        """Structured text: keep tables first, then fit in the rest."""
        table_pattern = r'\|.+\|\n\|[-| ]+\|\n(?:\|.+\|\n)+'
        tables = re.findall(table_pattern, text)

        result_parts = []
        current_tokens = 0

        # Add tables first
        for table in tables:
            table_tokens = self._estimate_tokens(table)
            if current_tokens + table_tokens <= max_tokens:
                result_parts.append(table)
                current_tokens += table_tokens

        # Add the remaining prose
        remaining = re.sub(table_pattern, '', text)
        remaining_tokens = self._estimate_tokens(remaining)

        if remaining_tokens <= max_tokens - current_tokens:
            result_parts.append(remaining)
        else:
            # Compress the remaining prose
            result_parts.append(self.rule.compress(remaining, level="medium"))

        return '\n\n'.join(result_parts)

    def _compress_narrative(self, text: str, max_tokens: int, target_ratio: float) -> str:
        """Narrative text: light rules if the budget is generous, else LLM summary."""
        if target_ratio > 0.5:
            return self.rule.compress(text, level="medium")
        else:
            return self.llm.compress(text, target_ratio=target_ratio)

    def _compress_general(self, text: str, max_tokens: int, target_ratio: float) -> str:
        """General text: two stages, cheap rules first, LLM only if still over budget."""
        # Stage 1: rule-based pass
        rule_compressed = self.rule.compress(text, level="light")

        if self._estimate_tokens(rule_compressed) <= max_tokens:
            return rule_compressed

        # Stage 2: LLM summarization
        return self.llm.compress(rule_compressed, target_ratio=target_ratio)

    @staticmethod
    def _estimate_tokens(text: str) -> int:
        # Rough heuristic: ~0.6 tokens per CJK character, ~1.3 per Latin word
        chinese = sum(1 for c in text if '\u4e00' <= c <= '\u9fff')
        latin_words = len(re.findall(r'[A-Za-z0-9]+', text))
        return int(chinese * 0.6 + latin_words * 1.3)

7. Practical Configuration Templates

COMPRESSION_CONFIGS = {
    'quick_response': {
        'method': 'rule',
        'level': 'light',
        'target_ratio': 0.7
    },
    'balanced': {
        'method': 'hybrid',
        'stages': ['rule_light', 'llm_summary'],
        'target_ratio': 0.5
    },
    'aggressive': {
        'method': 'llm',
        'level': 'progressive',
        'target_ratio': 0.2
    },
    'entity_preserving': {
        'method': 'entity',
        'preserve_types': ['date', 'number', 'person', 'organization'],
        'target_ratio': 0.4
    },
    'code_context': {
        'method': 'code_aware',
        'preserve_definitions': True,
        'simplify_comments': True
    }
}
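A sketch of how such a config table might be consumed. The dispatch function and the stubbed-out compression steps are illustrative assumptions, not part of any fixed API:

```python
COMPRESSION_CONFIGS = {
    'quick_response': {'method': 'rule', 'level': 'light', 'target_ratio': 0.7},
    'aggressive': {'method': 'llm', 'level': 'progressive', 'target_ratio': 0.2},
}


def compress_with_profile(text: str, profile: str) -> str:
    """Look up a profile and dispatch to the matching (stubbed) method."""
    cfg = COMPRESSION_CONFIGS[profile]
    target_len = int(len(text) * cfg['target_ratio'])
    if cfg['method'] == 'rule':
        return ' '.join(text.split())[:target_len]  # stub: cheap cleanup + cut
    elif cfg['method'] == 'llm':
        return text[:target_len]                    # stub: would call an LLM
    raise ValueError(f"unknown method: {cfg['method']}")


out = compress_with_profile("some  long   context " * 10, 'quick_response')
```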
