关键词
| 数据增强 | 回译增强 | 语义等价 | 合成数据 | 课程学习 | 数据混合 | LLM生成数据 | Prompt扩增 | 噪声注入 | 质量控制 |
一、回译增强(Back-translation)
1.1 回译增强原理
回译增强是一种经典的数据增强技术,其核心思想是将源语言文本翻译成一种或多种中间语言,然后再翻译回源语言,从而生成语义相似但表达方式不同的新样本。这种方法在机器翻译领域已有成熟应用,近年来也被引入到大模型训练数据的增强中。
回译增强的优势体现在多个层面:
- 多样性提升:通过翻译过程引入不同语言的表达习惯和句式结构
- 泛化能力增强:模型能够学习到相同语义的不同表达方式
- 数据规模扩展:每次回译可将单条数据扩展为多条
回译增强的局限性
回译生成的数据可能存在语义漂移(semantic drift),即多轮翻译后原意发生改变。此外,翻译模型的质量直接影响增强效果,低质量的翻译模型可能引入错误信息。
1.2 多语言回译实现
class MultilingualBackTranslator:
    """Back-translation data augmenter using multiple pivot languages."""

    def __init__(self, translator_config):
        # Build translation backends first; fall back to a default pivot set.
        self.translators = self._initialize_translators(translator_config)
        self.intermediate_languages = translator_config.get(
            "languages",
            ["de", "fr", "es", "ja", "ko", "ru"]
        )

    def _initialize_translators(self, config):
        """Set up translation backends from *config*.

        When ``use_openai`` is set, an OpenAI client is stored on the
        instance and the local-model registry stays empty; otherwise one
        local model is loaded per configured language pair.
        NOTE(review): ``_load_model`` is not defined in this class —
        presumably provided elsewhere.
        """
        backends = {}
        if config.get("use_openai"):
            from openai import OpenAI
            self.openai_client = OpenAI(api_key=config["api_key"])
        else:
            for pair in config.get("model_paths", []):
                backends[pair] = self._load_model(pair)
        return backends

    def back_translate(self, text, source_lang="zh", target_langs=None):
        """Round-trip *text* through each pivot language.

        Args:
            text: source text.
            source_lang: language code of *text*.
            target_langs: pivot languages; defaults to the configured set.

        Returns:
            One result dict per pivot with forward/backward texts, a
            similarity score and a coarse quality label, or an error
            entry when translation fails for that pivot.
        """
        pivots = target_langs or self.intermediate_languages
        outputs = []
        for pivot in pivots:
            try:
                # Forward hop: source -> pivot, then back: pivot -> source.
                forward = self._translate(text, source_lang, pivot)
                backward = self._translate(forward, pivot, source_lang)
                score = self._calculate_similarity(text, backward)
                if score > 0.85:
                    label = "high"
                elif score > 0.7:
                    label = "medium"
                else:
                    label = "low"
                outputs.append({
                    "intermediate_lang": pivot,
                    "forward_text": forward,
                    "backward_text": backward,
                    "similarity": score,
                    "quality": label
                })
            except Exception as exc:
                # Record the failure and keep processing other pivots.
                outputs.append({
                    "intermediate_lang": pivot,
                    "error": str(exc),
                    "quality": "failed"
                })
        return outputs

    def _translate(self, text, source_lang, target_lang):
        """Translate *text* via OpenAI when configured, else a local model."""
        if hasattr(self, 'openai_client'):
            prompt = f"Translate the following text from {source_lang} to {target_lang}. Only output the translation, nothing else.\n\nText: {text}"
            response = self.openai_client.chat.completions.create(
                model="gpt-4o",
                messages=[{"role": "user", "content": prompt}],
                temperature=0.3,
                max_tokens=2048
            )
            return response.choices[0].message.content.strip()
        # Local models are keyed by "<src>_<tgt>".
        return self.translators[f"{source_lang}_{target_lang}"].translate(text)

    def _calculate_similarity(self, text1, text2):
        """Character-level similarity ratio in [0, 1]."""
        from difflib import SequenceMatcher
        return SequenceMatcher(None, text1, text2).ratio()

    def batch_back_translate(self, texts, source_lang="zh",
                             quality_threshold=0.7, max_variants=3):
        """Back-translate a batch, keeping only good variants.

        Args:
            texts: list of source texts.
            source_lang: language of the inputs.
            quality_threshold: minimum similarity to keep a variant.
            max_variants: cap on variants kept per input text.
        """
        kept = []
        for idx, text in enumerate(texts):
            candidates = self.back_translate(text, source_lang)
            # Drop failures and low-similarity round trips.
            usable = [
                c for c in candidates
                if c["quality"] in ("high", "medium")
                and c.get("similarity", 0) >= quality_threshold
            ]
            ranked = sorted(usable, key=lambda c: c["similarity"], reverse=True)
            for cand in ranked[:max_variants]:
                kept.append({
                    "original": text,
                    "enhanced": cand["backward_text"],
                    "intermediate_lang": cand["intermediate_lang"],
                    "similarity": cand["similarity"],
                    "original_id": idx
                })
        return kept
# 1.3 级联回译策略
class CascadedBackTranslator:
    """Chains several translation hops into one augmentation pipeline."""

    def __init__(self):
        # Ordered list of (target_lang, return_lang) stages.
        self.pipeline = []

    def add_stage(self, lang_pair):
        """Append one stage as a ``(target_lang, return_lang)`` pair.

        The first element is the pivot to translate INTO; the second, if
        truthy, is the language to translate back to (a round trip).
        E.g. starting from zh, ``("en", "zh")`` performs zh -> en -> zh.
        """
        self.pipeline.append(lang_pair)

    def cascaded_translate(self, text, source_lang="zh"):
        """Run *text* through every stage, e.g. zh -> en -> fr -> de -> zh.

        Returns the final text plus the full (text, lang, step) history.
        NOTE(review): relies on ``self._translate``, which is not defined
        on this class — presumably supplied by a subclass or mixin.
        """
        current_text = text
        current_lang = source_lang
        trace = [(text, source_lang, "original")]
        for target_lang, return_lang in self.pipeline:
            # Forward hop into the pivot language.
            hop = self._translate(current_text, current_lang, target_lang)
            trace.append((hop, target_lang, "forward"))
            if return_lang:
                # Round-trip stage: translate back immediately.
                back = self._translate(hop, target_lang, return_lang)
                trace.append((back, return_lang, "backward"))
                current_text, current_lang = back, return_lang
            else:
                current_text, current_lang = hop, target_lang
        return {
            "final_text": current_text,
            "translation_history": trace,
            "n_stages": len(self.pipeline)
        }
# 二、语义等价变换
2.1 同义词替换与改写
class SemanticParaphraser:
    """Generates semantically equivalent rewrites of input text."""

    def __init__(self, model_path=None):
        # With a model path we paraphrase via a model; otherwise we fall
        # back to dictionary-based synonym replacement.
        # NOTE(review): _load_model / _load_synonym_dict are not defined
        # in this class — confirm they exist on a subclass or mixin.
        if model_path:
            self.model = self._load_model(model_path)
        else:
            self.synonym_dict = self._load_synonym_dict()

    def synonym_replacement(self, text, n=5, strategy="random"):
        """Replace up to *n* whitespace-separated words with synonyms.

        Args:
            text: input text (split on whitespace).
            n: maximum number of replacements.
            strategy: replacement strategy (random, balanced, targeted);
                only random selection is implemented here.

        Returns:
            Dict with original text, paraphrased text, and the positions
            that were replaced.
        """
        import random
        words = text.split()
        result = words.copy()
        replaced_positions = []
        for _ in range(n):
            # Candidates: words with a synonym entry not yet replaced.
            replaceable = [
                i for i, w in enumerate(result)
                if w in self.synonym_dict and i not in replaced_positions
            ]
            if not replaceable:
                break
            pos = random.choice(replaceable)
            original_word = result[pos]
            result[pos] = random.choice(self.synonym_dict[original_word])
            replaced_positions.append(pos)
        return {
            "original": text,
            "paraphrased": " ".join(result),
            "n_replacements": len(replaced_positions),
            "replaced_positions": replaced_positions
        }

    def sentence_paraphrase(self, text, num_variants=3):
        """Sentence-level paraphrasing via an LLM.

        NOTE(review): depends on ``self._call_llm``, which is not
        defined in this class.
        """
        prompt = f"""请将下面的句子改写成{num_variants}个语义等价但表达不同的版本。
保持原意,但改变句式结构和用词。
原文:{text}
要求:
1. 每个改写版本都要保持与原文相同的核心语义
2. 改写版本之间应该有明显的表达差异
3. 保持相同的长度级别
直接输出{num_variants}个改写版本,用换行分隔:"""
        response = self._call_llm(prompt)
        variants = [v.strip() for v in response.split('\n') if v.strip()]
        return {
            "original": text,
            "variants": variants[:num_variants]
        }

    def structure_transformation(self, text):
        """Apply sentence-structure transforms; drop the ones that fail.

        Transform implementations (e.g. ``_to_passive``) may be supplied
        by subclasses; missing or failing transforms yield None and are
        filtered out of the result.
        """
        transform_attrs = {
            "active_to_passive": "_to_passive",
            "direct_to_indirect": "_to_indirect_speech",
            "simple_to_complex": "_to_compound",
            "declarative_to_interrogative": "_to_question"
        }
        results = {}
        for transform_name, attr in transform_attrs.items():
            # BUGFIX: the previous dict literal looked the methods up
            # eagerly, raising AttributeError before the try/except could
            # run; getattr with a default degrades gracefully instead.
            transform_func = getattr(self, attr, None)
            if transform_func is None:
                results[transform_name] = None
                continue
            try:
                results[transform_name] = transform_func(text)
            except Exception:
                # Narrowed from a bare ``except:`` so KeyboardInterrupt
                # and SystemExit are no longer swallowed.
                results[transform_name] = None
        return {
            "original": text,
            "transformations": {k: v for k, v in results.items() if v}
        }
# 2.2 Prompt模板扩增
class PromptTemplateAugmenter:
    """Expands instruction/response pairs with template variations."""

    def __init__(self):
        self.templates = self._load_templates()

    def _load_templates(self):
        """Return the built-in prompt template bank."""
        return {
            "instruction_prefixes": [
                "请", "麻烦", "请问", "能否", "能否请您",
                "我需要", "我想知道", "帮我", "请帮我"
            ],
            "question_markers": [
                "?", "?", "吗", "呢", "啊"
            ],
            "response_starters": [
                "当然可以", "好的", "没问题", "以下是",
                "根据我的理解", "让我来解释", "答案是"
            ],
            "politeness_levels": [
                "", "请", "麻烦您", "劳烦", "恭请"
            ]
        }

    def augment_instruction(self, instruction):
        """Generate deduplicated instruction variants via prefix/suffix
        templates plus one interrogative form."""
        variants = []
        # Cross prefixes with a few fixed suffixes.
        for prefix in self.templates["instruction_prefixes"]:
            for suffix in ["", "一下", "帮忙"]:
                variants.append(f"{prefix}{instruction}{suffix}".strip())
        if not instruction.endswith(("?", "?", "吗")):
            # BUGFIX: the old code built
            # ``marker.replace('?','') + instruction + "?"`` — the ASCII
            # '?' replace leaves the full-width marker untouched, so the
            # variant started with a stray "?". A question variant should
            # simply append the marker.
            variants.append(f"{instruction}{self.templates['question_markers'][0]}")
        return list(set(variants))

    def augment_response(self, response, include_style_variants=True):
        """Generate response variants by swapping opening phrases."""
        variants = []
        if include_style_variants:
            for starter in self.templates["response_starters"]:
                if starter:
                    variants.append(f"{starter},{response}")
        # Swap common acknowledgements for casual equivalents.
        if response.startswith("好的"):
            variants.append(response.replace("好的", "行"))
            variants.append(response.replace("好的", "没问题"))
        elif response.startswith("可以"):
            variants.append(response.replace("可以", "没问题"))
        return variants

    def full_conversation_augment(self, instruction, response):
        """Cross-product capped instruction and response variants into
        full conversation pairs."""
        augmented_instructions = self.augment_instruction(instruction)
        # Robustness: fall back to the original response so an
        # un-augmentable response still yields conversation pairs.
        augmented_responses = self.augment_response(response) or [response]
        combinations = []
        for inst in augmented_instructions[:5]:  # cap the combinatorics
            for resp in augmented_responses[:3]:
                combinations.append({
                    "instruction": inst,
                    "response": resp
                })
        return combinations
# 三、数据混合策略
3.1 数据配比优化
class DataMixOptimizer:
    """Computes dataset mixing ratios to hit a target capability mix."""

    def __init__(self):
        self.datasets = {}
        # Optional per-dataset quality scores in [0, 1]; default 0.7.
        self.quality_scores = {}

    def register_dataset(self, name, data_path, metadata=None):
        """Register a dataset and cache its size/topic statistics.

        NOTE(review): ``_count_lines`` and ``_analyze_dataset`` are not
        defined in this class — confirm they come from elsewhere.
        """
        self.datasets[name] = {
            "path": data_path,
            "size": self._count_lines(data_path),
            "metadata": metadata or {},
            "stats": self._analyze_dataset(data_path)
        }

    def calculate_optimal_mix(self, target_capabilities,
                              total_samples=100000):
        """Compute per-dataset sampling ratios and concrete counts.

        Args:
            target_capabilities: target topic distribution, e.g.
                ``{"coding": 0.3, "writing": 0.2, ...}``.
            total_samples: total sample budget to allocate.
        """
        # Step 1: per-dataset topic coverage (fraction of its samples
        # belonging to each target topic). The previous loop also bound
        # the unused target weight.
        topic_coverage = {}
        for name, dataset_info in self.datasets.items():
            topic_dist = dataset_info["stats"]["topic_distribution"]
            topic_coverage[name] = {
                topic: topic_dist.get(topic, 0) / dataset_info["size"]
                for topic in target_capabilities
            }
        # Step 2: weight coverage by dataset quality.
        quality_weighted_coverage = {}
        for name, coverage in topic_coverage.items():
            quality = self.quality_scores.get(name, 0.7)
            quality_weighted_coverage[name] = {
                topic: cov * quality for topic, cov in coverage.items()
            }
        # Step 3: solve for ratios under the coverage constraints.
        mix = self._solve_linear_program(
            quality_weighted_coverage,
            target_capabilities,
            self.datasets
        )
        # Step 4: turn ratios into concrete sample counts.
        final_mix = {}
        for name, ratio in mix.items():
            final_mix[name] = {
                "ratio": ratio,
                "samples": int(total_samples * ratio),
                "source_samples": self.datasets[name]["size"]
            }
        return final_mix

    def _solve_linear_program(self, coverage, targets, datasets):
        """Find mixing ratios with coverage >= target for every topic.

        The objective vector is all zeros, so this is a pure feasibility
        problem; falls back to a uniform mix when infeasible.
        """
        import numpy as np
        from scipy.optimize import linprog
        n_datasets = len(datasets)
        c = np.zeros(n_datasets)
        # Inequality constraints sum_i x_i * coverage_i >= target,
        # negated into linprog's ``A_ub @ x <= b_ub`` form.
        A_ub = []
        b_ub = []
        for topic, target in targets.items():
            A_ub.append([-coverage[name].get(topic, 0) for name in datasets.keys()])
            b_ub.append(-target)
        # Equality constraint: ratios sum to one.
        A_eq = np.ones((1, n_datasets))
        b_eq = np.array([1.0])
        # Each dataset contributes between 0% and 100%.
        bounds = [(0, 1) for _ in range(n_datasets)]
        result = linprog(c, A_ub=np.array(A_ub), b_ub=np.array(b_ub),
                         A_eq=A_eq, b_eq=b_eq, bounds=bounds)
        if result.success:
            return dict(zip(datasets.keys(), result.x))
        # Infeasible targets: fall back to a uniform allocation.
        return {name: 1 / n_datasets for name in datasets.keys()}
# 3.2 领域平衡采样
class BalancedSampler:
    """Samples a dataset while balancing group frequencies."""

    def __init__(self, strategy="stratified"):
        self.strategy = strategy

    def sample(self, dataset, n_samples, group_by="topic"):
        """Draw *n_samples* items using the configured strategy.

        Args:
            dataset: sequence of dict-like items.
            n_samples: target number of samples.
            group_by: item key used for grouping.

        Raises:
            ValueError: for an unrecognised strategy (previously the
                method silently returned None).
        """
        if self.strategy == "stratified":
            return self._stratified_sample(dataset, n_samples, group_by)
        if self.strategy == "tempered":
            return self._tempered_sampling(dataset, n_samples, group_by)
        if self.strategy == "quality_guided":
            # NOTE(review): _quality_guided_sample is not defined in this
            # class — confirm it is provided elsewhere.
            return self._quality_guided_sample(dataset, n_samples)
        raise ValueError(f"unknown sampling strategy: {self.strategy!r}")

    def _stratified_sample(self, dataset, n_samples, group_by):
        """Proportional (stratified) sampling: each group contributes
        roughly its population share of the output."""
        import random
        from collections import defaultdict
        groups = defaultdict(list)
        for item in dataset:
            groups[item.get(group_by, "unknown")].append(item)
        total_size = sum(len(g) for g in groups.values())
        samples = []
        for group_items in groups.values():
            # int() truncation means the result can be slightly smaller
            # than n_samples.
            group_n = int(n_samples * len(group_items) / total_size)
            samples.extend(random.sample(group_items,
                                         min(group_n, len(group_items))))
        return samples

    def _tempered_sampling(self, dataset, n_samples, group_by, temperature=2.0):
        """Temperature-scaled sampling that dampens dominant groups.

        ``temperature > 1`` flattens the group distribution via
        ``count ** (1/T)``. Samples WITH replacement.
        """
        import random
        from collections import Counter
        group_counts = Counter(item.get(group_by, "unknown") for item in dataset)
        # PERF: the normalising constant is loop-invariant — it used to
        # be recomputed once per group.
        denom = sum(c ** (1 / temperature) for c in group_counts.values())
        weights = {
            group: (count ** (1 / temperature)) / denom
            for group, count in group_counts.items()
        }
        return random.choices(
            dataset,
            weights=[weights[item.get(group_by, "unknown")] for item in dataset],
            k=n_samples
        )
# 四、合成数据生成(LLM生成数据)
4.1 合成数据生成策略
种子数据扩展
class SyntheticDataGenerator:
    """Generates synthetic instruction data with an LLM client."""

    def __init__(self, llm_client):
        # Any object exposing ``generate(prompt) -> str``.
        self.llm = llm_client

    def expand_from_seeds(self, seed_data, num_variants=100,
                          domain="general"):
        """Expand a few high-quality seeds into many variants.

        Args:
            seed_data: list of {"instruction", "response"} dicts.
            num_variants: total number of variants to request.
            domain: selects a domain-specific prompt builder
                (``_<domain>_expansion_prompt``) when one exists,
                falling back to the general one.
        """
        # getattr dispatch: the old literal dict eagerly referenced
        # prompt builders that are not defined on this class, raising
        # AttributeError even for domain="general".
        prompt_template = getattr(
            self, f"_{domain}_expansion_prompt", self._general_expansion_prompt
        )
        batch_size = 10
        # BUGFIX: the old per-batch count divided by
        # ``len(seed_data) // batch_size``, which is 0 for fewer than
        # 10 seeds (ZeroDivisionError). Use ceil-division instead.
        n_batches = max(1, -(-len(seed_data) // batch_size))
        per_batch = max(1, num_variants // n_batches)
        generated = []
        for i in range(0, len(seed_data), batch_size):
            batch = seed_data[i:i + batch_size]
            response = self.llm.generate(prompt_template(batch, per_batch))
            generated.extend(self._parse_generated_data(response))
        return generated

    def _parse_generated_data(self, response):
        """Parse an LLM response into instruction/response dicts.

        Extracts the outermost JSON list; malformed output yields an
        empty list rather than raising. (This helper was referenced but
        never defined.)
        """
        import json
        try:
            start = response.index("[")
            end = response.rindex("]") + 1
            data = json.loads(response[start:end])
        except (ValueError, TypeError):
            return []
        return [
            item for item in data
            if isinstance(item, dict)
            and "instruction" in item and "response" in item
        ]

    def _general_expansion_prompt(self, seeds, num_variants):
        """Build the generic seed-expansion prompt (first 5 seeds only)."""
        seed_examples = "\n".join(
            f"- {s['instruction']} -> {s['response'][:100]}..."
            for s in seeds[:5]
        )
        return f"""你是一个高质量训练数据生成器。请基于以下种子示例,生成{num_variants}个新的指令-响应对。
种子示例:
{seed_examples}
生成要求:
1. 保持与种子示例相同的格式和质量水平
2. 覆盖不同的主题和场景
3. 指令应该清晰、具体、多样
4. 响应应该准确、有帮助、专业
5. 避免生成重复或过于相似的内容
直接输出JSON格式的数据列表,每条包含instruction和response字段:"""

    def generate_instruction_variants(self, instruction,
                                      num_variants=5,
                                      diversity="high"):
        """Generate paraphrase variants for a single instruction,
        one per output line of the LLM."""
        diversity_prompts = {
            "high": "生成风格、长度、复杂度差异大的变体",
            "medium": "生成中等差异的变体",
            "low": "生成相近但有细微差别的变体"
        }
        prompt = f"""请为以下指令生成{num_variants}个变体。
原文:{instruction}
{diversity_prompts[diversity]}
变体应该:
- 保持相同的核心意图
- 使用不同的表达方式和句式结构
- 可能有不同的复杂度级别
输出格式:每行一个变体"""
        response = self.llm.generate(prompt)
        variants = [v.strip() for v in response.split('\n') if v.strip()]
        return {
            "original": instruction,
            "variants": variants[:num_variants]
        }
# 自我对弈数据生成
class SelfPlayDataGenerator:
    """Generates multi-turn dialogue data via model self-play."""

    def __init__(self, model):
        # Model object exposing generate(prompt_or_messages) -> str.
        self.model = model

    def generate_debate_data(self, topic, num_rounds=3):
        """
        Generate debate-style dialogue data: two model roles argue
        opposite sides of *topic* for *num_rounds* rounds.

        NOTE(review): ``_build_debate_context`` and
        ``_generate_debate_summary`` are not defined in this class —
        confirm they are provided elsewhere.
        """
        debate_history = []
        # Initialize the two stances as single chat messages.
        # NOTE(review): role "assistant" for an instruction message looks
        # like it should be "system" or "user" — confirm the chat format.
        position_a = {
            "role": "assistant",
            "content": f"你是一个支持方辩手,请就「{topic}」展开论述,"
                       f"提供至少3个有力的支持论据。"
        }
        # NOTE(review): position_b is built but never used below — side
        # B's context comes only from _build_debate_context.
        position_b = {
            "role": "assistant",
            "content": f"你是一个反对方辩手,请就「{topic}」展开论述,"
                       f"提供至少3个有力的反对论据。"
        }
        for round_num in range(num_rounds):
            # Side A speaks; A is re-prompted with its stance only.
            response_a = self.model.generate([position_a])
            debate_history.append({
                "round": round_num + 1,
                "speaker": "A",
                "content": response_a,
                "position": "supporting"
            })
            # Side B responds with the accumulated debate as context.
            context_b = self._build_debate_context(debate_history, "B")
            response_b = self.model.generate(context_b)
            debate_history.append({
                "round": round_num + 1,
                "speaker": "B",
                "content": response_b,
                "position": "opposing"
            })
        # Summarize the full debate transcript.
        summary = self._generate_debate_summary(debate_history, topic)
        return {
            "topic": topic,
            "debate_history": debate_history,
            "summary": summary
        }

    def generate_socratic_dialogue(self, initial_question):
        """
        Generate Socratic-style dialogue data: a "guide" role asks
        probing follow-up questions and a "reflector" role answers,
        for up to 5 question/reflection turns.
        """
        dialogue = []
        current_question = initial_question
        current_depth = 0
        max_depth = 5
        while current_depth < max_depth:
            # Guide: generate a probing follow-up question.
            probe_prompt = f"""基于这个问题:「{current_question}」
请生成一个追问,引导对方更深入地思考。不要直接给出答案,而是通过提问引导。
生成一个追问(不超过30字):"""
            probe_question = self.model.generate(probe_prompt)
            dialogue.append({
                "speaker": "guide",
                "type": "question",
                "content": probe_question
            })
            # Reflector: answer, seeded with the previous reflection
            # (dialogue[-2]) or the initial question on the first turn.
            reflect_prompt = f"""问题:「{probe_question}」
之前的回答:「{dialogue[-2]['content'] if len(dialogue) > 1 else initial_question}」
请给出一个反思性的回答,展示思考过程。不需要完整答案,而是展示思考的深度和复杂性。"""
            reflection = self.model.generate(reflect_prompt)
            dialogue.append({
                "speaker": "reflector",
                "type": "reflection",
                "content": reflection
            })
            current_question = probe_question
            current_depth += 1
        return dialogue
# 4.2 合成数据质量控制
class SyntheticDataQualityController:
    """Filters and iteratively refines synthetic training data."""

    def __init__(self, quality_model):
        self.quality_model = quality_model
        # Minimum composite score for a sample to be approved.
        self.quality_threshold = 0.6

    def filter_synthetic_data(self, synthetic_data):
        """
        Split *synthetic_data* into approved and rejected lists by
        composite quality score, and report the approval rate
        (0 for an empty input).
        """
        filtered = []
        rejected = []
        for item in synthetic_data:
            quality_score = self._assess_quality(item)
            if quality_score >= self.quality_threshold:
                filtered.append({
                    **item,
                    "quality_score": quality_score,
                    "approved": True
                })
            else:
                # NOTE(review): _explain_rejection is not defined in this
                # class — confirm it exists elsewhere.
                rejected.append({
                    **item,
                    "quality_score": quality_score,
                    "rejection_reason": self._explain_rejection(quality_score)
                })
        return {
            "approved": filtered,
            "rejected": rejected,
            "approval_rate": len(filtered) / len(synthetic_data) if synthetic_data else 0
        }

    def _assess_quality(self, item):
        """Composite quality score as a weighted sum of four checks.

        NOTE(review): the _check_* helpers are not defined in this class
        — confirm they exist elsewhere. Weights sum to 1.0.
        """
        scores = {
            "relevance": self._check_relevance(item),
            "helpfulness": self._check_helpfulness(item),
            "safety": self._check_safety(item),
            "format": self._check_format(item)
        }
        weights = {"relevance": 0.3, "helpfulness": 0.35,
                   "safety": 0.25, "format": 0.1}
        return sum(scores[k] * weights[k] for k in scores)

    def iterative_refinement(self, initial_data, max_iterations=3):
        """
        Repeatedly re-assess the data, regenerating rejected items via
        ``_refine_item`` (NOTE(review): not defined in this class), until
        the approval rate exceeds 0.9, nothing is rejected, or
        *max_iterations* passes complete.
        """
        current_data = initial_data
        iteration = 0
        while iteration < max_iterations:
            assessment = self.filter_synthetic_data(current_data)
            # Good enough — stop refining.
            if assessment["approval_rate"] > 0.9:
                break
            low_quality = assessment["rejected"]
            if not low_quality:
                break
            regenerated = []
            for item in low_quality:
                refined = self._refine_item(item)
                regenerated.append(refined)
            # Approved items keep their added quality_score/approved
            # keys when merged back for the next pass.
            current_data = assessment["approved"] + regenerated
            iteration += 1
        return current_data
# 五、课程学习(Curriculum Learning)
5.1 课程学习原理
课程学习是一种训练策略,其核心思想是让模型从简单样本开始学习,逐步过渡到复杂样本。这种方法受到人类学习过程的启发——我们通常先学习基础概念,再逐步掌握复杂知识。
在数据增强领域,课程学习意味着:
- 难度分级:将训练数据按难度分为多个级别
- 渐进式引入:从简单数据开始训练,逐步加入复杂数据
- 自适应调度:根据模型表现动态调整课程进度
class CurriculumScheduler:
    """Schedules training data from easy to hard stages."""

    def __init__(self, difficulty_classifier):
        # Classifier must expose predict_difficulty(item) returning a
        # sortable numeric score.
        self.classifier = difficulty_classifier
        self.current_stage = 0
        # Filled by create_curriculum(); get_next_stage_batch reads it.
        self.curriculum = []

    def create_curriculum(self, dataset, num_stages=4):
        """
        Split *dataset* into *num_stages* difficulty-sorted stages.

        Returns the curriculum (list of stage dicts) and stores it on
        the instance for get_next_stage_batch().
        """
        scored = []
        for item in dataset:
            difficulty_score = self.classifier.predict_difficulty(item)
            scored.append({**item, "difficulty_score": difficulty_score})
        sorted_data = sorted(scored, key=lambda x: x["difficulty_score"])
        stage_size = len(sorted_data) // num_stages
        curriculum = []
        for i in range(num_stages):
            start_idx = i * stage_size
            # The last stage absorbs the remainder.
            end_idx = start_idx + stage_size if i < num_stages - 1 else len(sorted_data)
            stage_data = sorted_data[start_idx:end_idx]
            if not stage_data:
                # Robustness: fewer samples than stages — skip empties
                # instead of indexing into an empty list.
                continue
            stage_scores = [s["difficulty_score"] for s in stage_data]
            curriculum.append({
                "stage": i + 1,
                "name": self._get_stage_name(i, num_stages),
                "samples": stage_data,
                "difficulty_range": {
                    "min": stage_data[0]["difficulty_score"],
                    "max": stage_data[-1]["difficulty_score"]
                },
                # BUGFIX: was ``np.mean`` with numpy never imported at
                # module scope — NameError at runtime.
                "avg_difficulty": sum(stage_scores) / len(stage_scores)
            })
        # BUGFIX: get_next_stage_batch reads self.curriculum, which was
        # never assigned anywhere before this fix.
        self.curriculum = curriculum
        return curriculum

    def _get_stage_name(self, stage_idx, total_stages):
        """Human-readable stage name (localized for 3/5-stage plans)."""
        names = {
            0: "Foundation - Basic Concepts",
            1: "Development - Core Skills",
            2: "Advanced - Complex Applications",
            3: "Expert - Edge Cases & Mastery"
        }
        if total_stages == 3:
            names = {0: "基础", 1: "进阶", 2: "高级"}
        elif total_stages == 5:
            names = {0: "入门", 1: "基础", 2: "进阶", 3: "高级", 4: "专家"}
        return names.get(stage_idx, f"Stage {stage_idx + 1}")

    def get_next_stage_batch(self, model_performance):
        """
        Advance/retreat the current stage based on *model_performance*
        (accuracy-like score in [0, 1]) and return that stage's data.
        """
        if model_performance >= 0.85:
            # Good performance: promote to the next stage.
            self.current_stage = min(self.current_stage + 1, len(self.curriculum) - 1)
        elif model_performance < 0.6:
            # Poor performance: fall back one stage.
            self.current_stage = max(self.current_stage - 1, 0)
        return self.curriculum[self.current_stage]
# 5.2 难度评估器
class DifficultyClassifier:
    """Scores training samples on a 0-1 difficulty scale."""

    def __init__(self):
        # Feature name -> scoring callable registry.
        self.features = self._extract_difficulty_features()

    def _extract_difficulty_features(self):
        """Return the registry of difficulty feature extractors."""
        return {
            "length": self._length_difficulty,
            "vocabulary": self._vocabulary_difficulty,
            "structure": self._structure_difficulty,
            "reasoning": self._reasoning_difficulty,
            "domain": self._domain_difficulty
        }

    def predict_difficulty(self, item):
        """
        Predict the difficulty of one sample.

        Args:
            item: dict with optional "instruction" and "response" keys.

        Returns:
            Dict with the weighted composite "score", a coarse "level",
            and the per-feature "breakdown".
            NOTE(review): callers that sort samples numerically (e.g. a
            curriculum scheduler) should use ``result["score"]``.
        """
        instruction = item.get("instruction", "")
        response = item.get("response", "")
        scores = {
            "length": self._length_difficulty(instruction, response),
            "vocabulary": self._vocabulary_difficulty(instruction),
            "structure": self._structure_difficulty(instruction, response),
            "reasoning": self._reasoning_difficulty(instruction, response),
            "domain": self._domain_difficulty(instruction)
        }
        # Weights sum to 1.0; reasoning dominates.
        weights = {
            "length": 0.15,
            "vocabulary": 0.15,
            "structure": 0.20,
            "reasoning": 0.35,
            "domain": 0.15
        }
        difficulty_score = sum(
            scores[feature] * weights[feature]
            for feature in scores
        )
        return {
            "score": difficulty_score,
            "level": self._score_to_level(difficulty_score),
            "breakdown": scores
        }

    def _length_difficulty(self, instruction, response):
        """Length-based difficulty: longer pairs score higher,
        saturating around 2000 characters."""
        total_length = len(instruction) + len(response)
        if total_length < 100:
            return 0.2
        elif total_length < 500:
            return 0.4
        elif total_length < 1000:
            return 0.6
        elif total_length < 2000:
            return 0.8
        else:
            return 0.95

    def _vocabulary_difficulty(self, instruction):
        """Vocabulary richness heuristic: unique-character ratio.

        BUGFIX: this method (and the two below) was referenced in
        __init__/predict_difficulty but never defined, so the class
        could not even be instantiated. Simple placeholder heuristic —
        replace with a real lexicon-based measure when available.
        """
        if not instruction:
            return 0.0
        return min(len(set(instruction)) / len(instruction), 1.0)

    def _structure_difficulty(self, instruction, response):
        """Structural complexity heuristic: clause count approximated by
        punctuation marks. (Was referenced but never defined.)"""
        clause_marks = ",。;:,;:."
        clauses = sum((instruction + response).count(ch) for ch in clause_marks)
        return min(clauses * 0.1, 1.0)

    def _reasoning_difficulty(self, instruction, response):
        """Reasoning complexity: question keywords in the instruction
        plus multi-step markers in the response."""
        reasoning_keywords = [
            "为什么", "如何", "解释", "分析", "推理",
            "证明", "计算", "推导", "比较", "对比"
        ]
        keyword_count = sum(
            1 for kw in reasoning_keywords
            if kw in instruction
        )
        # Multi-step reasoning detection via discourse connectives.
        multi_step_indicators = ["首先", "其次", "然后", "最后", "因此", "所以", "从而"]
        step_count = sum(
            1 for ind in multi_step_indicators
            if ind in response
        )
        base_score = min(keyword_count * 0.15, 0.5)
        step_score = min(step_count * 0.1, 0.5)
        return min(base_score + step_score, 1.0)

    def _domain_difficulty(self, instruction):
        """Domain specificity heuristic: technical-domain keywords imply
        higher difficulty. (Was referenced but never defined.)"""
        technical_terms = ["编程", "算法", "数学", "证明", "代码", "法律", "医学", "金融"]
        return 0.8 if any(t in instruction for t in technical_terms) else 0.3

    def _score_to_level(self, score):
        """Map a 0-1 score to a coarse difficulty bucket."""
        if score < 0.3:
            return "easy"
        elif score < 0.5:
            return "medium"
        elif score < 0.7:
            return "hard"
        else:
            return "expert"
# 六、数据质量vs数量
6.1 质量数量权衡理论
在数据增强中,质量与数量之间存在复杂的权衡关系。研究表明:
- Jensen不等式效应:损失的平均不等于平均处的损失(E[L(x)] ≠ L(E[x])),混入错误数据可能放大梯度
- 噪声放大效应:模型在训练时会放大数据中的噪声,导致过度记忆错误模式
- 稀缺信号淹没:大量低质量数据可能淹没高质量数据中的关键信号
经验法则
在指令微调阶段,100条高质量数据(准确、多样、格式规范)往往优于10000条未经筛选的原始数据。
6.2 自适应增强策略
class AdaptiveAugmentationStrategy:
    """Chooses per-sample augmentation plans based on quality."""

    def __init__(self):
        self.quality_model = None
        self.current_policy = {}

    def decide_augmentation(self, sample, target_quality=0.8):
        """
        Decide whether and how to augment *sample*.

        NOTE(review): depends on ``self._assess_quality``, which is not
        defined in this class — confirm it is provided elsewhere.

        Returns:
            An augmentation plan dict: methods and intensity for usable
            samples, or a discard recommendation for low-quality ones.
        """
        quality = self._assess_quality(sample)
        if quality >= target_quality:
            # High quality: light augmentation that preserves the sample.
            return {
                "augment": True,
                "methods": ["paraphrase_light"],
                "intensity": 0.3,
                "expected_quality": min(quality * 0.95, target_quality)
            }
        elif quality >= 0.5:
            # Medium quality: moderate augmentation plus format repair.
            return {
                "augment": True,
                "methods": ["paraphrase_medium", "format_fix"],
                "intensity": 0.6,
                "expected_quality": min(quality * 1.2, target_quality)
            }
        else:
            # Low quality: recommend discarding or regenerating instead.
            return {
                "augment": False,
                "recommendation": "discard",
                "reason": "quality_too_low",
                "alternative": "regenerate_from_template"
            }

    def optimize_augmentation_ratio(self, dataset, budget=100000):
        """
        Allocate the sample *budget* across quality/augmentation buckets.

        Fixed policy: favor augmenting medium-quality data; augment
        high-quality data moderately and regenerate little low-quality
        data. Ratios sum to 1.0.

        BUGFIX: the previous version first called the undefined
        ``_analyze_quality_distribution`` and bound the unused result,
        crashing with AttributeError before ever returning. *dataset* is
        kept for interface compatibility.
        """
        allocation = {
            "high_quality_original": int(budget * 0.3),
            "high_quality_augmented": int(budget * 0.2),
            "medium_quality_original": int(budget * 0.2),
            "medium_quality_augmented": int(budget * 0.25),
            "low_quality_regenerated": int(budget * 0.05)
        }
        return allocation