Keywords

| Data Augmentation | Back-translation | Semantic Equivalence | Synthetic Data | Curriculum Learning | Data Mixing | LLM-Generated Data | Prompt Augmentation | Noise Injection | Quality Control |


1. Back-translation Augmentation

1.1 How Back-translation Works

Back-translation is a classic data augmentation technique. Its core idea is to translate source-language text into one or more intermediate languages and then translate it back, producing new samples that are semantically similar to the original but worded differently. The method is well established in machine translation and has more recently been adopted for augmenting LLM training data.

The benefits of back-translation show up on several levels:

  • Greater diversity: the translation round trip imports the phrasing habits and sentence structures of other languages
  • Better generalization: the model learns many surface forms of the same meaning
  • Larger datasets: each round trip can expand one sample into several

Limitations of back-translation

Back-translated data can suffer from semantic drift: after several translation hops, the original meaning may change. Translation quality also directly bounds augmentation quality; a weak translation model can inject outright errors. A common mitigation is to filter round-trip outputs by semantic similarity, as sketched below.
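
The sketch below filters round-trip outputs by embedding similarity, which tracks meaning better than the character-level ratio used later in this section. It assumes the sentence-transformers package is installed; the model name and threshold are illustrative choices, not fixed recommendations.

# Semantic-drift filter: a minimal sketch. Assumes sentence-transformers is
# installed; "all-MiniLM-L6-v2" and the 0.8 threshold are illustrative.
import numpy as np
from sentence_transformers import SentenceTransformer

_encoder = SentenceTransformer("all-MiniLM-L6-v2")

def drift_filter(original, back_translated, threshold=0.8):
    """Keep a back-translated sample only if it stays semantically close."""
    a, b = _encoder.encode([original, back_translated])
    cosine = float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
    return cosine >= threshold, cosine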

1.2 A Multilingual Back-translation Implementation

class MultilingualBackTranslator:
    """多语言回译数据增强器"""
    
    def __init__(self, translator_config):
        self.translators = self._initialize_translators(translator_config)
        self.intermediate_languages = translator_config.get(
            "languages", 
            ["de", "fr", "es", "ja", "ko", "ru"]
        )
        
    def _initialize_translators(self, config):
        """初始化翻译器"""
        translators = {}
        
        if config.get("use_openai"):
            from openai import OpenAI
            self.openai_client = OpenAI(api_key=config["api_key"])
        else:
            # Use local translation models
            for lang_pair in config.get("model_paths", []):
                translators[lang_pair] = self._load_model(lang_pair)
                
        return translators
        
    def back_translate(self, text, source_lang="zh", target_langs=None):
        """
        执行回译增强
        
        Args:
            text: 源文本
            source_lang: 源语言
            target_langs: 中间翻译语言列表
        """
        target_langs = target_langs or self.intermediate_languages
        results = []
        
        for target_lang in target_langs:
            try:
                # Forward pass: source language -> pivot language
                forward = self._translate(text, source_lang, target_lang)
                
                # Backward pass: pivot language -> source language
                backward = self._translate(forward, target_lang, source_lang)
                
                # Score how close the round trip stayed to the original
                similarity = self._calculate_similarity(text, backward)
                
                results.append({
                    "intermediate_lang": target_lang,
                    "forward_text": forward,
                    "backward_text": backward,
                    "similarity": similarity,
                    "quality": "high" if similarity > 0.85 else 
                              "medium" if similarity > 0.7 else "low"
                })
                
            except Exception as e:
                results.append({
                    "intermediate_lang": target_lang,
                    "error": str(e),
                    "quality": "failed"
                })
                
        return results
        
    def _translate(self, text, source_lang, target_lang):
        """执行翻译"""
        if hasattr(self, 'openai_client'):
            prompt = f"Translate the following text from {source_lang} to {target_lang}. Only output the translation, nothing else.\n\nText: {text}"
            
            response = self.openai_client.chat.completions.create(
                model="gpt-4o",
                messages=[{"role": "user", "content": prompt}],
                temperature=0.3,
                max_tokens=2048
            )
            
            return response.choices[0].message.content.strip()
        else:
            # Translate with a local model
            return self.translators[f"{source_lang}_{target_lang}"].translate(text)
            
    def _calculate_similarity(self, text1, text2):
        """计算文本相似度"""
        from difflib import SequenceMatcher
        return SequenceMatcher(None, text1, text2).ratio()
        
    def batch_back_translate(self, texts, source_lang="zh",
                            quality_threshold=0.7, max_variants=3):
        """
        批量回译增强
        
        Args:
            texts: 文本列表
            source_lang: 源语言
            quality_threshold: 质量阈值(低于此值的增强结果将被丢弃)
            max_variants: 每个文本最多保留的变体数量
        """
        all_variants = []
        
        for i, text in enumerate(texts):
            variants = self.back_translate(text, source_lang)
            
            # Drop low-quality results
            high_quality = [
                v for v in variants 
                if v["quality"] in ["high", "medium"]
                and v.get("similarity", 0) >= quality_threshold
            ]
            
            # Keep the best variants
            best_variants = sorted(
                high_quality, 
                key=lambda x: x["similarity"], 
                reverse=True
            )[:max_variants]
            
            for variant in best_variants:
                all_variants.append({
                    "original": text,
                    "enhanced": variant["backward_text"],
                    "intermediate_lang": variant["intermediate_lang"],
                    "similarity": variant["similarity"],
                    "original_id": i
                })
                
        return all_variants
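
Typical usage might look like the following; the configuration values below are placeholders:

# Illustrative usage; the config values are placeholders.
translator = MultilingualBackTranslator({
    "use_openai": True,
    "api_key": "sk-...",        # placeholder key
    "languages": ["en", "ja"]   # pivot languages
})

augmented = translator.batch_back_translate(
    ["如何提高模型的泛化能力?"],
    source_lang="zh",
    quality_threshold=0.7,
    max_variants=2
)
# Each entry pairs the original with one round-trip variant and its similarity.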

1.3 Cascaded Back-translation

class CascadedBackTranslator:
    """Cascaded back-translation augmenter"""
    
    def __init__(self, translate_fn):
        # translate_fn(text, src_lang, tgt_lang) -> str supplies the actual translation
        self.translate_fn = translate_fn
        self.pipeline = []
        
    def _translate(self, text, source_lang, target_lang):
        return self.translate_fn(text, source_lang, target_lang)
        
    def add_stage(self, lang_pair):
        """
        Add a translation stage
        
        For example, add_stage(("en", "zh")) adds a zh -> en -> zh round trip;
        pass (target_lang, None) for a one-way hop.
        """
        self.pipeline.append(lang_pair)
        
    def cascaded_translate(self, text, source_lang="zh"):
        """
        执行级联回译
        
        例如:zh -> en -> fr -> de -> zh
        """
        current_text = text
        current_lang = source_lang
        history = [(text, source_lang, "original")]
        
        for target_lang, return_lang in self.pipeline:
            # Forward hop
            forward = self._translate(current_text, current_lang, target_lang)
            history.append((forward, target_lang, "forward"))
            
            # Backward hop (if this stage is a round trip)
            if return_lang:
                backward = self._translate(forward, target_lang, return_lang)
                history.append((backward, return_lang, "backward"))
                current_text = backward
                current_lang = return_lang
            else:
                current_text = forward
                current_lang = target_lang
                
        return {
            "final_text": current_text,
            "translation_history": history,
            "n_stages": len(self.pipeline)
        }
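
Wiring the cascaded augmenter to the translator above might look like this (illustrative; it borrows the internal _translate method for brevity):

# Illustrative usage; back_translator is assumed configured as in section 1.2.
back_translator = MultilingualBackTranslator({"use_openai": True, "api_key": "sk-..."})

cascade = CascadedBackTranslator(back_translator._translate)
cascade.add_stage(("en", None))   # zh -> en (one-way hop)
cascade.add_stage(("fr", "zh"))   # en -> fr -> zh (round trip back to the source)

result = cascade.cascaded_translate("数据增强能提升模型鲁棒性。")
print(result["final_text"], result["n_stages"])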

2. Semantically Equivalent Transformations

2.1 Synonym Replacement and Paraphrasing

class SemanticParaphraser:
    """语义等价改写器"""
    
    def __init__(self, model_path=None, synonym_dict=None):
        if model_path:
            self.model = self._load_model(model_path)
        if synonym_dict is not None:
            # Allow injecting a dictionary directly (used in the example below)
            self.synonym_dict = synonym_dict
        elif not model_path:
            self.synonym_dict = self._load_synonym_dict()
            
    def synonym_replacement(self, text, n=5, strategy="random"):
        """
        同义词替换增强
        
        Args:
            text: 输入文本
            n: 替换次数
            strategy: 替换策略 (random, balanced, targeted)
        """
        import random
        
        words = text.split()
        result = words.copy()
        replaced_positions = []
        
        for _ in range(n):
            # Find words that can still be replaced
            replaceable = [
                i for i, w in enumerate(result) 
                if w in self.synonym_dict
                and i not in replaced_positions
            ]
            
            if not replaceable:
                break
                
            pos = random.choice(replaceable)
            original_word = result[pos]
            
            # Swap in a synonym
            synonyms = self.synonym_dict[original_word]
            new_word = random.choice(synonyms)
            result[pos] = new_word
            
            replaced_positions.append(pos)
            
        return {
            "original": text,
            "paraphrased": " ".join(result),
            "n_replacements": len(replaced_positions),
            "replaced_positions": replaced_positions
        }
        
    def sentence_paraphrase(self, text, num_variants=3):
        """
        句子级别改写(使用LLM)
        """
        prompt = f"""请将下面的句子改写成{num_variants}个语义等价但表达不同的版本。
保持原意,但改变句式结构和用词。
 
原文:{text}
 
要求:
1. 每个改写版本都要保持与原文相同的核心语义
2. 改写版本之间应该有明显的表达差异
3. 保持相同的长度级别
 
直接输出{num_variants}个改写版本,用换行分隔:"""
        
        response = self._call_llm(prompt)
        
        variants = [v.strip() for v in response.split('\n') if v.strip()]
        
        return {
            "original": text,
            "variants": variants[:num_variants]
        }
        
    def structure_transformation(self, text):
        """
        句式结构转换
        """
        transformations = {
            "active_to_passive": self._to_passive,
            "direct_to_indirect": self._to_indirect_speech,
            "simple_to_complex": self._to_compound,
            "declarative_to_interrogative": self._to_question
        }
        
        results = {}
        for transform_name, transform_func in transformations.items():
            try:
                results[transform_name] = transform_func(text)
            except Exception:
                results[transform_name] = None
                
        return {
            "original": text,
            "transformations": {k: v for k, v in results.items() if v}
        }
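
Because synonym_replacement splits on whitespace, Chinese input should be segmented first. A minimal sketch, assuming the jieba package is installed; the synonym dictionary here is a toy placeholder:

# Pre-segmenting Chinese input for synonym replacement; jieba is assumed installed.
import jieba

paraphraser = SemanticParaphraser(synonym_dict={
    "提升": ["提高", "增强"],   # toy dictionary entries
    "方法": ["办法", "手段"]
})

segmented = " ".join(jieba.lcut("这种方法可以提升模型效果"))
result = paraphraser.synonym_replacement(segmented, n=2)
print(result["paraphrased"].replace(" ", ""))  # strip the segmentation spaces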

2.2 Prompt Template Augmentation

class PromptTemplateAugmenter:
    """Prompt模板扩增器"""
    
    def __init__(self):
        self.templates = self._load_templates()
        
    def _load_templates(self):
        """加载Prompt模板库"""
        return {
            "instruction_prefixes": [
                "请", "麻烦", "请问", "能否", "能否请您",
                "我需要", "我想知道", "帮我", "请帮我"
            ],
            "question_markers": [
                "?", "?", "吗", "呢", "啊"
            ],
            "response_starters": [
                "当然可以", "好的", "没问题", "以下是",
                "根据我的理解", "让我来解释", "答案是"
            ],
            "politeness_levels": [
                "", "请", "麻烦您", "劳烦", "恭请"
            ]
        }
        
    def augment_instruction(self, instruction):
        """
        通过模板扩增增加指令多样性
        """
        variants = []
        
        # Generate prefix/suffix combinations
        for prefix in self.templates["instruction_prefixes"]:
            for suffix in ["", "一下", "帮忙"]:
                variant = f"{prefix}{instruction}{suffix}".strip()
                variants.append(variant)
                
        # Generate a question-form variant
        if not instruction.endswith(("?", "?", "吗")):
            question_variant = f"{instruction}{self.templates['question_markers'][0]}"
            variants.append(question_variant)
            
        return list(set(variants))
        
    def augment_response(self, response, include_style_variants=True):
        """
        响应模板扩增
        """
        variants = []
        
        # Prepend different opening phrases
        if include_style_variants:
            for starter in self.templates["response_starters"]:
                if starter:
                    variant = f"{starter}{response}"
                    variants.append(variant)
                    
        # Swap politeness markers
        if response.startswith("好的"):
            variants.append(response.replace("好的", "行"))
            variants.append(response.replace("好的", "没问题"))
        elif response.startswith("可以"):
            variants.append(response.replace("可以", "没问题"))
            
        return variants
        
    def full_conversation_augment(self, instruction, response):
        """
        完整对话扩增
        """
        augmented_instructions = self.augment_instruction(instruction)
        augmented_responses = self.augment_response(response)
        
        combinations = []
        
        for inst in augmented_instructions[:5]:  # cap the number of combinations
            for resp in augmented_responses[:3]:
                combinations.append({
                    "instruction": inst,
                    "response": resp
                })
                
        return combinations
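
A quick end-to-end run of the template augmenter:

# End-to-end example of the template augmenter.
augmenter = PromptTemplateAugmenter()
pairs = augmenter.full_conversation_augment(
    instruction="解释什么是过拟合",
    response="好的,过拟合是指模型在训练集上表现很好但泛化能力差的现象。"
)
print(len(pairs))   # 5 instruction variants x 3 response variants = 15 pairs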

3. Data Mixing Strategies

3.1 Optimizing Data Ratios

class DataMixOptimizer:
    """数据混合优化器"""
    
    def __init__(self):
        self.datasets = {}
        self.quality_scores = {}
        
    def register_dataset(self, name, data_path, metadata=None):
        """注册数据集"""
        self.datasets[name] = {
            "path": data_path,
            "size": self._count_lines(data_path),
            "metadata": metadata or {},
            "stats": self._analyze_dataset(data_path)
        }
        
    def calculate_optimal_mix(self, target_capabilities, 
                            total_samples=100000):
        """
        计算最优数据混合比例
        
        Args:
            target_capabilities: 目标能力分布 {"编程": 0.3, "写作": 0.2, ...}
            total_samples: 总样本数限制
        """
        # Step 1: compute each dataset's topic coverage
        topic_coverage = {}
        for name, dataset_info in self.datasets.items():
            coverage = {}
            for topic, weight in target_capabilities.items():
                topic_samples = dataset_info["stats"]["topic_distribution"].get(
                    topic, 0
                )
                coverage[topic] = topic_samples / dataset_info["size"]
            topic_coverage[name] = coverage
            
        # Step 2: weight coverage by dataset quality
        quality_weighted_coverage = {}
        for name, coverage in topic_coverage.items():
            quality = self.quality_scores.get(name, 0.7)
            quality_weighted_coverage[name] = {
                topic: cov * quality 
                for topic, cov in coverage.items()
            }
            
        # Step 3: solve a linear program for the optimal mix
        mix = self._solve_linear_program(
            quality_weighted_coverage,
            target_capabilities,
            self.datasets
        )
        
        # Step 4: convert ratios to concrete sample counts
        final_mix = {}
        for name, ratio in mix.items():
            final_mix[name] = {
                "ratio": ratio,
                "samples": int(total_samples * ratio),
                "source_samples": self.datasets[name]["size"]
            }
            
        return final_mix
        
    def _solve_linear_program(self, coverage, targets, datasets):
        """求解线性规划问题"""
        import numpy as np
        from scipy.optimize import linprog
        
        n_datasets = len(datasets)
        n_topics = len(targets)
        
        # Zero objective: this is a pure feasibility problem (any mix that
        # meets the coverage targets is acceptable)
        c = np.zeros(n_datasets)
        
        # Inequality constraints: coverage >= target (negated for linprog's A_ub @ x <= b_ub form)
        A_ub = []
        b_ub = []
        
        for i, (topic, target) in enumerate(targets.items()):
            row = [-coverage[name].get(topic, 0) for name in datasets.keys()]
            A_ub.append(row)
            b_ub.append(-target)
            
        A_ub = np.array(A_ub)
        b_ub = np.array(b_ub)
        
        # Equality constraint: ratios sum to 1
        A_eq = np.ones((1, n_datasets))
        b_eq = np.array([1.0])
        
        # Bounds: each dataset's share lies in [0, 1]
        bounds = [(0, 1) for _ in range(n_datasets)]
        
        result = linprog(c, A_ub=A_ub, b_ub=b_ub, 
                        A_eq=A_eq, b_eq=b_eq, 
                        bounds=bounds)
        
        if result.success:
            return {
                name: ratio 
                for name, ratio in zip(datasets.keys(), result.x)
            }
        else:
            # Fall back to a uniform mix
            return {name: 1/n_datasets for name in datasets.keys()}
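
A toy, self-contained run of the same Step-3 formulation on two hypothetical datasets (all numbers are made up for illustration):

# Toy standalone version of the Step-3 linear program; all numbers are made up.
import numpy as np
from scipy.optimize import linprog

coverage = {"web": {"coding": 0.1, "writing": 0.5},
            "github": {"coding": 0.8, "writing": 0.05}}
targets = {"coding": 0.4, "writing": 0.3}

names = list(coverage)
A_ub = [[-coverage[n][t] for n in names] for t in targets]   # -coverage <= -target
b_ub = [-v for v in targets.values()]
res = linprog(np.zeros(len(names)), A_ub=A_ub, b_ub=b_ub,
              A_eq=np.ones((1, len(names))), b_eq=[1.0],
              bounds=[(0, 1)] * len(names))
print(dict(zip(names, res.x)))  # a feasible mix, roughly 0.56 web / 0.44 github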

3.2 Domain-balanced Sampling

class BalancedSampler:
    """平衡采样器"""
    
    def __init__(self, strategy="stratified"):
        self.strategy = strategy
        
    def sample(self, dataset, n_samples, group_by="topic"):
        """
        平衡采样
        
        Args:
            dataset: 数据集
            n_samples: 目标样本数
            group_by: 分组字段
        """
        if self.strategy == "stratified":
            return self._stratified_sample(dataset, n_samples, group_by)
        elif self.strategy == "tempered":
            return self._tempered_sampling(dataset, n_samples, group_by)
        elif self.strategy == "quality_guided":
            return self._quality_guided_sample(dataset, n_samples)
            
    def _stratified_sample(self, dataset, n_samples, group_by):
        """分层采样"""
        from collections import defaultdict
        
        # Bucket items by group
        groups = defaultdict(list)
        for item in dataset:
            group_key = item.get(group_by, "unknown")
            groups[group_key].append(item)
            
        # Proportional allocation per group
        total_size = sum(len(g) for g in groups.values())
        samples = []
        
        for group_key, group_items in groups.items():
            group_ratio = len(group_items) / total_size
            group_n = int(n_samples * group_ratio)
            
            # Random sampling within the group
            import random
            sampled = random.sample(
                group_items, 
                min(group_n, len(group_items))
            )
            samples.extend(sampled)
            
        return samples
        
    def _tempered_sampling(self, dataset, n_samples, group_by, temperature=2.0):
        """温度调节采样(减少主导类别的采样概率)"""
        from collections import Counter
        
        # Count items per group
        group_counts = Counter(item.get(group_by, "unknown") for item in dataset)
        
        # Per-group sampling mass proportional to count ** (1/temperature)
        denom = sum(c ** (1 / temperature) for c in group_counts.values())
        group_mass = {
            group: (count ** (1 / temperature)) / denom
            for group, count in group_counts.items()
        }
        
        # Per-item weight = group mass / group size, so that a group's total
        # probability follows count ** (1/temperature) instead of being
        # re-inflated by its item count
        weights = {group: group_mass[group] / group_counts[group]
                   for group in group_counts}
            
        # Weighted sampling with replacement
        import random
        samples = random.choices(
            dataset,
            weights=[weights[item.get(group_by, "unknown")] for item in dataset],
            k=n_samples
        )
        
        return samples
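
A quick check of the temperature effect on a skewed toy dataset:

# Toy demonstration: with a 900/100 split and temperature=2, the dominant
# group's share drops from 90% to 900**0.5 / (900**0.5 + 100**0.5) = 75%.
from collections import Counter

data = [{"topic": "web"}] * 900 + [{"topic": "math"}] * 100
sampler = BalancedSampler(strategy="tempered")
sampled = sampler._tempered_sampling(data, n_samples=1000, group_by="topic")
print(Counter(item["topic"] for item in sampled))  # roughly 750 web / 250 math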

4. Synthetic Data Generation (LLM-generated Data)

4.1 Generation Strategies

Seed data expansion

class SyntheticDataGenerator:
    """合成数据生成器"""
    
    def __init__(self, llm_client):
        self.llm = llm_client
        
    def expand_from_seeds(self, seed_data, num_variants=100,
                         domain="general"):
        """
        从种子数据生成变体
        
        使用少量高质量种子,生成大量相似数据
        """
        prompts = {
            "general": self._general_expansion_prompt,
            "code": self._code_expansion_prompt,
            "math": self._math_expansion_prompt,
            "creative": self._creative_expansion_prompt
        }
        
        prompt_template = prompts.get(domain, self._general_expansion_prompt)
        
        generated = []
        batch_size = 10
        
        for i in range(0, len(seed_data), batch_size):
            batch = seed_data[i:i+batch_size]
            
            # Spread the variant budget across batches (guarding against
            # seed sets smaller than one batch)
            n_batches = max(1, len(seed_data) // batch_size)
            prompt = prompt_template(batch, num_variants // n_batches)
            
            response = self.llm.generate(prompt)
            
            # Parse the generated data
            batch_generated = self._parse_generated_data(response)
            generated.extend(batch_generated)
            
        return generated
        
    def _general_expansion_prompt(self, seeds, num_variants):
        """通用扩展Prompt"""
        seed_examples = "\n".join(
            f"- {s['instruction']} -> {s['response'][:100]}..."
            for s in seeds[:5]
        )
        
        return f"""你是一个高质量训练数据生成器。请基于以下种子示例,生成{num_variants}个新的指令-响应对。
 
种子示例:
{seed_examples}
 
生成要求:
1. 保持与种子示例相同的格式和质量水平
2. 覆盖不同的主题和场景
3. 指令应该清晰、具体、多样
4. 响应应该准确、有帮助、专业
5. 避免生成重复或过于相似的内容
 
直接输出JSON格式的数据列表,每条包含instruction和response字段:"""
        
    def generate_instruction_variants(self, instruction, 
                                     num_variants=5,
                                     diversity="high"):
        """
        为单个指令生成多个变体
        """
        diversity_prompts = {
            "high": "生成风格、长度、复杂度差异大的变体",
            "medium": "生成中等差异的变体",
            "low": "生成相近但有细微差别的变体"
        }
        
        prompt = f"""请为以下指令生成{num_variants}个变体。
 
原文:{instruction}
 
{diversity_prompts[diversity]}
 
变体应该:
- 保持相同的核心意图
- 使用不同的表达方式和句式结构
- 可能有不同的复杂度级别
 
输出格式:每行一个变体"""
        
        response = self.llm.generate(prompt)
        
        variants = [v.strip() for v in response.split('\n') if v.strip()]
        
        return {
            "original": instruction,
            "variants": variants[:num_variants]
        }
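
Any object exposing a generate(prompt) -> str method can serve as the client; a stub makes the parsing path easy to smoke-test:

# Illustrative stub client; a real client would wrap an actual LLM API.
class StubLLM:
    def generate(self, prompt):
        return "变体1\n变体2\n变体3"

gen = SyntheticDataGenerator(StubLLM())
out = gen.generate_instruction_variants("解释什么是注意力机制", num_variants=3)
print(out["variants"])  # ['变体1', '变体2', '变体3']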

Self-play data generation

class SelfPlayDataGenerator:
    """自我对弈数据生成器"""
    
    def __init__(self, model):
        self.model = model
        
    def generate_debate_data(self, topic, num_rounds=3):
        """
        生成辩论式对话数据
        
        两个模型扮演不同立场进行辩论
        """
        debate_history = []
        
        # Initialize the two positions (instruction messages, hence the
        # "system" role rather than "assistant")
        position_a = {
            "role": "system",
            "content": f"你是一个支持方辩手,请就「{topic}」展开论述,"
                      f"提供至少3个有力的支持论据。"
        }
        
        position_b = {
            "role": "system", 
            "content": f"你是一个反对方辩手,请就「{topic}」展开论述,"
                      f"提供至少3个有力的反对论据。"
        }
        
        for round_num in range(num_rounds):
            # Side A speaks
            response_a = self.model.generate([position_a])
            debate_history.append({
                "round": round_num + 1,
                "speaker": "A",
                "content": response_a,
                "position": "supporting"
            })
            
            # Side B responds
            context_b = self._build_debate_context(debate_history, "B")
            response_b = self.model.generate(context_b)
            debate_history.append({
                "round": round_num + 1,
                "speaker": "B",
                "content": response_b,
                "position": "opposing"
            })
            
        # Generate a closing summary
        summary = self._generate_debate_summary(debate_history, topic)
        
        return {
            "topic": topic,
            "debate_history": debate_history,
            "summary": summary
        }
        
    def generate_socratic_dialogue(self, initial_question):
        """
        生成苏格拉底式对话数据
        
        一个角色提问引导思考,另一个角色逐步深入回答
        """
        dialogue = []
        
        current_question = initial_question
        current_depth = 0
        max_depth = 5
        
        while current_depth < max_depth:
            # The guide asks a probing question
            probe_prompt = f"""基于这个问题:「{current_question}
            
请生成一个追问,引导对方更深入地思考。不要直接给出答案,而是通过提问引导。
 
生成一个追问(不超过30字):"""
            
            probe_question = self.model.generate(probe_prompt)
            dialogue.append({
                "speaker": "guide",
                "type": "question",
                "content": probe_question
            })
            
            # The responder reflects
            reflect_prompt = f"""问题:「{probe_question}
之前的回答:「{dialogue[-2]['content'] if len(dialogue) > 1 else initial_question}
 
请给出一个反思性的回答,展示思考过程。不需要完整答案,而是展示思考的深度和复杂性。"""
            
            reflection = self.model.generate(reflect_prompt)
            dialogue.append({
                "speaker": "reflector",
                "type": "reflection",
                "content": reflection
            })
            
            current_question = probe_question
            current_depth += 1
            
        return dialogue

4.2 Quality Control for Synthetic Data

class SyntheticDataQualityController:
    """合成数据质量控制器"""
    
    def __init__(self, quality_model):
        self.quality_model = quality_model
        self.quality_threshold = 0.6
        
    def filter_synthetic_data(self, synthetic_data):
        """
        过滤低质量合成数据
        """
        filtered = []
        rejected = []
        
        for item in synthetic_data:
            quality_score = self._assess_quality(item)
            
            if quality_score >= self.quality_threshold:
                filtered.append({
                    **item,
                    "quality_score": quality_score,
                    "approved": True
                })
            else:
                rejected.append({
                    **item,
                    "quality_score": quality_score,
                    "rejection_reason": self._explain_rejection(quality_score)
                })
                
        return {
            "approved": filtered,
            "rejected": rejected,
            "approval_rate": len(filtered) / len(synthetic_data) if synthetic_data else 0
        }
        
    def _assess_quality(self, item):
        """评估单条数据质量"""
        # Multi-dimensional assessment
        scores = {
            "relevance": self._check_relevance(item),
            "helpfulness": self._check_helpfulness(item),
            "safety": self._check_safety(item),
            "format": self._check_format(item)
        }
        
        # Weighted aggregate score
        weights = {"relevance": 0.3, "helpfulness": 0.35, 
                  "safety": 0.25, "format": 0.1}
        
        return sum(scores[k] * weights[k] for k in scores)
        
    def iterative_refinement(self, initial_data, max_iterations=3):
        """
        迭代优化合成数据
        
        对低质量数据进行重生成
        """
        current_data = initial_data
        iteration = 0
        
        while iteration < max_iterations:
            # Assess the current data
            assessment = self.filter_synthetic_data(current_data)
            
            # Stop once the approval rate is high enough
            if assessment["approval_rate"] > 0.9:
                break
                
            # Regenerate the rejected items
            low_quality = assessment["rejected"]
            
            if not low_quality:
                break
                
            regenerated = []
            for item in low_quality:
                refined = self._refine_item(item)
                regenerated.append(refined)
                
            # Merge approved and regenerated data
            current_data = assessment["approved"] + regenerated
            iteration += 1
            
        return current_data

5. Curriculum Learning

5.1 How Curriculum Learning Works

Curriculum learning is a training strategy whose core idea is to start the model on simple samples and progressively move to complex ones. The approach is inspired by how humans learn: we master basic concepts first and work up to complex knowledge.

Applied to training-data construction, curriculum learning means:

  • Difficulty grading: split the training data into several difficulty levels
  • Progressive introduction: start training on simple data and gradually mix in complex data
  • Adaptive scheduling: adjust the curriculum's pace based on model performance

import numpy as np

class CurriculumScheduler:
    """Curriculum-learning scheduler"""
    
    def __init__(self, difficulty_classifier):
        self.classifier = difficulty_classifier
        self.curriculum = []
        self.current_stage = 0
        
    def create_curriculum(self, dataset, num_stages=4):
        """
        Build the curriculum
        
        Split the data into several difficulty stages
        """
        # Score the difficulty of every sample
        difficulties = []
        for item in dataset:
            result = self.classifier.predict_difficulty(item)
            # predict_difficulty may return a dict (see DifficultyClassifier in 5.2);
            # extract the scalar score either way
            score = result["score"] if isinstance(result, dict) else result
            difficulties.append({
                **item,
                "difficulty_score": score
            })
            
        # Sort by difficulty
        sorted_data = sorted(difficulties, key=lambda x: x["difficulty_score"])
        
        # Split into stages
        stage_size = len(sorted_data) // num_stages
        curriculum = []
        
        for i in range(num_stages):
            start_idx = i * stage_size
            end_idx = start_idx + stage_size if i < num_stages - 1 else len(sorted_data)
            
            stage_data = sorted_data[start_idx:end_idx]
            
            curriculum.append({
                "stage": i + 1,
                "name": self._get_stage_name(i, num_stages),
                "samples": stage_data,
                "difficulty_range": {
                    "min": stage_data[0]["difficulty_score"],
                    "max": stage_data[-1]["difficulty_score"]
                },
                "avg_difficulty": np.mean([s["difficulty_score"] for s in stage_data])
            })
            
        self.curriculum = curriculum
        return curriculum
        
    def _get_stage_name(self, stage_idx, total_stages):
        """获取阶段名称"""
        names = {
            0: "Foundation - Basic Concepts",
            1: "Development - Core Skills",
            2: "Advanced - Complex Applications",
            3: "Expert - Edge Cases & Mastery"
        }
        
        if total_stages == 3:
            names = {0: "Basic", 1: "Intermediate", 2: "Advanced"}
        elif total_stages == 5:
            names = {0: "Beginner", 1: "Basic", 2: "Intermediate", 3: "Advanced", 4: "Expert"}
            
        return names.get(stage_idx, f"Stage {stage_idx + 1}")
        
    def get_next_stage_batch(self, model_performance):
        """
        根据模型表现获取下一阶段数据
        """
        if model_performance >= 0.85:
            # Strong performance: advance to the next stage
            self.current_stage = min(self.current_stage + 1, len(self.curriculum) - 1)
        elif model_performance < 0.6:
            # Weak performance: fall back to the previous stage
            self.current_stage = max(self.current_stage - 1, 0)
            
        return self.curriculum[self.current_stage]
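
A smoke test of the scheduler with a stub difficulty classifier (the length-based score is purely a toy proxy):

# Smoke test with a stub classifier; the length-based score is a toy proxy.
class StubDifficultyClassifier:
    def predict_difficulty(self, item):
        return len(item.get("instruction", "")) / 100.0

dataset = [{"instruction": "x" * n, "response": "y"} for n in range(10, 90, 10)]
scheduler = CurriculumScheduler(StubDifficultyClassifier())
curriculum = scheduler.create_curriculum(dataset, num_stages=4)

# One scheduling step: a mocked evaluation score of 0.9 advances the stage
stage = scheduler.get_next_stage_batch(model_performance=0.9)
print(stage["stage"], stage["name"], len(stage["samples"]))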

5.2 Difficulty Estimation

class DifficultyClassifier:
    """难度分类器"""
    
    def __init__(self):
        self.features = self._extract_difficulty_features()
        
    def _extract_difficulty_features(self):
        """提取难度特征"""
        return {
            "length": self._length_difficulty,
            "vocabulary": self._vocabulary_difficulty,
            "structure": self._structure_difficulty,
            "reasoning": self._reasoning_difficulty,
            "domain": self._domain_difficulty
        }
        
    def predict_difficulty(self, item):
        """
        预测样本难度
        
        综合多个维度计算难度分数
        """
        instruction = item.get("instruction", "")
        response = item.get("response", "")
        
        scores = {
            "length": self._length_difficulty(instruction, response),
            "vocabulary": self._vocabulary_difficulty(instruction),
            "structure": self._structure_difficulty(instruction, response),
            "reasoning": self._reasoning_difficulty(instruction, response),
            "domain": self._domain_difficulty(instruction)
        }
        
        # Weighted sum
        weights = {
            "length": 0.15,
            "vocabulary": 0.15,
            "structure": 0.20,
            "reasoning": 0.35,
            "domain": 0.15
        }
        
        difficulty_score = sum(
            scores[feature] * weights[feature]
            for feature in scores
        )
        
        return {
            "score": difficulty_score,
            "level": self._score_to_level(difficulty_score),
            "breakdown": scores
        }
        
    def _length_difficulty(self, instruction, response):
        """基于长度评估难度"""
        total_length = len(instruction) + len(response)
        
        # Difficulty grows with length (saturating beyond a threshold)
        if total_length < 100:
            return 0.2
        elif total_length < 500:
            return 0.4
        elif total_length < 1000:
            return 0.6
        elif total_length < 2000:
            return 0.8
        else:
            return 0.95
            
    def _reasoning_difficulty(self, instruction, response):
        """基于推理复杂度评估"""
        # Chinese cue words that signal reasoning demands
        reasoning_keywords = [
            "为什么", "如何", "解释", "分析", "推理",
            "证明", "计算", "推导", "比较", "对比"
        ]
        
        keyword_count = sum(
            1 for kw in reasoning_keywords
            if kw in instruction
        )
        
        # Detect multi-step reasoning via Chinese discourse connectives
        multi_step_indicators = ["首先", "其次", "然后", "最后", "因此", "所以", "从而"]
        step_count = sum(
            1 for ind in multi_step_indicators
            if ind in response
        )
        
        base_score = min(keyword_count * 0.15, 0.5)
        step_score = min(step_count * 0.1, 0.5)
        
        return min(base_score + step_score, 1.0)
        
    def _score_to_level(self, score):
        """将分数转换为难度级别"""
        if score < 0.3:
            return "easy"
        elif score < 0.5:
            return "medium"
        elif score < 0.7:
            return "hard"
        else:
            return "expert"

6. Data Quality vs. Quantity

6.1 The Quality-Quantity Trade-off

In data augmentation, quality and quantity trade off in complex ways. Research suggests:

  • Jensen-inequality effect: the expected loss over noisy data is not the loss at the "average" data point; a few erroneous samples can disproportionately amplify gradients
  • Noise amplification: models amplify noise present in the training data, over-memorizing wrong patterns
  • Signal dilution: large volumes of low-quality data can drown out the key signal carried by scarce high-quality data

Rule of thumb

In instruction fine-tuning, 100 high-quality samples (accurate, diverse, well-formatted) often beat 10,000 unfiltered raw samples.

6.2 Adaptive Augmentation Strategies

class AdaptiveAugmentationStrategy:
    """自适应增强策略"""
    
    def __init__(self):
        self.quality_model = None
        self.current_policy = {}
        
    def decide_augmentation(self, sample, target_quality=0.8):
        """
        决定是否对样本进行增强以及增强策略
        
        Returns:
            augmentation_plan: 包含增强方法和强度
        """
        quality = self._assess_quality(sample)
        
        if quality >= target_quality:
            # High-quality sample: light augmentation, preserve its character
            return {
                "augment": True,
                "methods": ["paraphrase_light"],
                "intensity": 0.3,
                "expected_quality": min(quality * 0.95, target_quality)
            }
            
        elif quality >= 0.5:
            # Medium-quality sample: moderate augmentation, fix latent issues
            return {
                "augment": True,
                "methods": ["paraphrase_medium", "format_fix"],
                "intensity": 0.6,
                "expected_quality": min(quality * 1.2, target_quality)
            }
            
        else:
            # Low-quality sample: consider discarding or fully regenerating
            return {
                "augment": False,
                "recommendation": "discard",
                "reason": "quality_too_low",
                "alternative": "regenerate_from_template"
            }
            
    def optimize_augmentation_ratio(self, dataset, budget=100000):
        """
        优化增强数据比例
        
        在给定预算下找到最优的数据增强组合
        """
        quality_dist = self._analyze_quality_distribution(dataset)
        
        # Policy: prioritize augmenting medium-quality data; regenerate only a
        # little low-quality data and expand high-quality data moderately.
        # (quality_dist could inform these shares; the allocation below is a
        # fixed heuristic.)
        
        allocation = {
            "high_quality_original": int(budget * 0.3),
            "high_quality_augmented": int(budget * 0.2),
            "medium_quality_original": int(budget * 0.2),
            "medium_quality_augmented": int(budget * 0.25),
            "low_quality_regenerated": int(budget * 0.05)
        }
        
        return allocation
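
A quick smoke test; the quality model is stubbed with a fixed score:

# Smoke test; _assess_quality is stubbed with a fixed score for illustration.
strategy = AdaptiveAugmentationStrategy()
strategy._assess_quality = lambda sample: 0.65   # pretend quality score

plan = strategy.decide_augmentation({"instruction": "解释Transformer"})
print(plan)   # medium-quality branch: paraphrase_medium + format_fix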
