# 知识库评估体系

## 摘要

评估是RAG系统迭代优化的核心依据。本文档系统讲解RAGAS、TruLens等主流评估框架,深入解析检索指标(Recall、Precision、MRR、NDCG)与生成指标(Faithfulness、Answer Relevancy),并提供完整的评估代码实现与最佳实践。


## 关键词速览

| 术语 | 英文 | 核心概念 |
|------|------|----------|
| RAGAS | Retrieval-Augmented Generation Assessment | RAG系统专用评估框架 |
| TruLens | TruLens | 多维度RAG评估工具 |
| 召回率 | Recall | 检索到相关文档的比例 |
| 精确率 | Precision | 检索结果中相关文档的比例 |
| 忠诚度 | Faithfulness | 生成内容与检索内容的一致性 |
| 答案相关性 | Answer Relevancy | 生成答案与问题的相关程度 |
| MRR | Mean Reciprocal Rank | 平均倒数排名 |
| NDCG | Normalized Discounted Cumulative Gain | 归一化折损累积增益 |
| 混合检索技术 | Hybrid Search | 稀疏(关键词)与稠密(向量)检索的融合 |

## 一、评估体系概述

### 1.1 为什么需要评估

RAG系统的评估面临三大挑战:

| 挑战维度 | 具体问题 | 评估重点 |
|----------|----------|----------|
| 检索质量 | 是否找到正确文档? | Recall、Precision、MRR |
| 生成质量 | 回答是否准确流畅? | Faithfulness、BLEU、ROUGE |
| 端到端效果 | 整体是否满足用户需求? | 答案相关性、上下文利用率 |

### 1.2 评估指标体系

graph TB
    A[RAG评估] --> B[检索评估]
    A --> C[生成评估]
    A --> D[端到端评估]
    
    B --> B1[Recall@K]
    B --> B2[Precision@K]
    B --> B3[MRR]
    B --> B4[NDCG@K]
    
    C --> C1[Faithfulness]
    C --> C2[Answer Relevancy]
    C --> C3[BLEU/ROUGE]
    C --> C4[毒性检测]
    
    D --> D1[RAGAS]
    D --> D2[TruLens]
    D --> D3[人工评估]

## 二、检索评估指标

### 2.1 核心指标定义

| 指标 | 公式 | 说明 | 理想值 |
|------|------|------|--------|
| Recall@K | \|Relevant ∩ Retrieved@K\| / \|Relevant\| | 召回率 | 越高越好 |
| Precision@K | \|Relevant ∩ Retrieved@K\| / \|Retrieved@K\| | 精确率 | 与Recall相互权衡 |
| MRR | (1/N) Σ 1/rank_i | 平均倒数排名 | 越接近1越好 |
| NDCG@K | DCG@K / IDCG@K | 归一化收益 | 越接近1越好 |
| MAP | (1/N) Σ AP_i | 平均精确率 | 越接近1越好 |
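
以一个查询为例:相关文档集合为 {d2, d5},检索返回前3个结果 [d1, d2, d3]。则 Recall@3 = 1/2(两篇相关文档命中一篇),Precision@3 = 1/3;首个相关文档排在第2位,该查询的倒数排名为 1/2,对全部查询取平均即得 MRR。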

### 2.2 检索评估实现

import numpy as np
from typing import List, Dict, Optional, Set, Tuple
from dataclasses import dataclass
import math
 
@dataclass
class RetrievalMetrics:
    """检索评估指标容器"""
    recall: float
    precision: float
    mrr: float
    ndcg: float
    map_score: float
    f1: float
 
class RetrievalEvaluator:
    """检索评估器"""
    
    def __init__(self, k_values: Optional[List[int]] = None):
        """
        Args:
            k_values: 评估的K值列表,如[1, 3, 5, 10]
        """
        self.k_values = k_values or [1, 3, 5, 10]
    
    def evaluate_single(
        self,
        retrieved_docs: List[str],
        relevant_docs: Set[str],
        k: Optional[int] = None
    ) -> Dict[str, float]:
        """
        评估单个查询的检索效果
        
        Args:
            retrieved_docs: 检索返回的文档ID列表(有序)
            relevant_docs: 实际相关的文档ID集合
            k: 截取前k个结果评估
        
        Returns:
            各指标得分
        """
        if k:
            retrieved_docs = retrieved_docs[:k]
        
        retrieved_set = set(retrieved_docs)
        
        # 计算交集
        true_positives = len(retrieved_set & relevant_docs)
        
        # Recall
        recall = true_positives / len(relevant_docs) if relevant_docs else 0.0
        
        # Precision
        precision = true_positives / len(retrieved_docs) if retrieved_docs else 0.0
        
        # F1
        f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0
        
        # MRR
        mrr = 0.0
        for i, doc_id in enumerate(retrieved_docs):
            if doc_id in relevant_docs:
                mrr = 1.0 / (i + 1)
                break
        
        # NDCG
        ndcg = self._calculate_ndcg(retrieved_docs, relevant_docs)
        
        # Average Precision
        ap = self._calculate_ap(retrieved_docs, relevant_docs)
        
        return {
            'recall': recall,
            'precision': precision,
            'f1': f1,
            'mrr': mrr,
            'ndcg': ndcg,
            'average_precision': ap
        }
    
    def evaluate_batch(
        self,
        results: List[Tuple[List[str], Set[str]]],
        k: Optional[int] = None
    ) -> Dict[str, float]:
        """
        批量评估
        
        Args:
            results: [(retrieved_docs, relevant_docs), ...]
            k: 评估的K值
        
        Returns:
            平均指标
        """
        metrics = {
            'recall': [],
            'precision': [],
            'f1': [],
            'mrr': [],
            'ndcg': [],
            'average_precision': []
        }
        
        for retrieved, relevant in results:
            result = self.evaluate_single(retrieved, relevant, k)
            for key in metrics:
                metrics[key].append(result[key])
        
        # 计算平均值
        avg_metrics = {
            f'{key}@{k}' if k else key: np.mean(values)
            for key, values in metrics.items()
        }
        
        # 添加标准差
        for key, values in metrics.items():
            key_name = f'{key}@{k}' if k else key
            avg_metrics[f'{key_name}_std'] = np.std(values)
        
        return avg_metrics
    
    def _calculate_ndcg(
        self,
        retrieved_docs: List[str],
        relevant_docs: Set[str],
        k: Optional[int] = None
    ) -> float:
        """计算NDCG"""
        if k:
            retrieved_docs = retrieved_docs[:k]
        
        # DCG
        dcg = 0.0
        for i, doc_id in enumerate(retrieved_docs):
            relevance = 1.0 if doc_id in relevant_docs else 0.0
            dcg += relevance / math.log2(i + 2)
        
        # IDCG (理想情况)
        num_relevant = min(len(relevant_docs), len(retrieved_docs))
        idcg = sum(1.0 / math.log2(i + 2) for i in range(num_relevant))
        
        return dcg / idcg if idcg > 0 else 0.0
    
    def _calculate_ap(
        self,
        retrieved_docs: List[str],
        relevant_docs: Set[str]
    ) -> float:
        """计算Average Precision"""
        num_relevant = 0
        sum_precision = 0.0
        
        for i, doc_id in enumerate(retrieved_docs):
            if doc_id in relevant_docs:
                num_relevant += 1
                precision_at_i = num_relevant / (i + 1)
                sum_precision += precision_at_i
        
        return sum_precision / len(relevant_docs) if relevant_docs else 0.0
 
 
class DiversityEvaluator:
    """多样性评估器"""
    
    def evaluate_diversity(
        self,
        retrieved_docs: List[Dict],
        diversity_threshold: float = 0.5
    ) -> Dict[str, float]:
        """
        评估检索结果的多样性
        
        Args:
            retrieved_docs: 检索结果列表
            diversity_threshold: 多样性阈值(预留参数,当前未参与计算)
        
        Returns:
            多样性指标
        """
        if len(retrieved_docs) <= 1:
            return {'diversity_score': 1.0, 'unique_ratio': 1.0}
        
        # 计算嵌入多样性
        embeddings = [doc.get('embedding') for doc in retrieved_docs if 'embedding' in doc]
        
        if len(embeddings) < 2:
            # 缺少嵌入向量时无法计算两两相似度,返回中性默认值
            return {'diversity_score': 0.5, 'unique_ratio': 0.8}
        
        # 计算 pairwise 相似度
        similarities = []
        for i in range(len(embeddings)):
            for j in range(i + 1, len(embeddings)):
                sim = self._cosine_similarity(embeddings[i], embeddings[j])
                similarities.append(sim)
        
        avg_similarity = np.mean(similarities)
        diversity_score = 1.0 - avg_similarity
        
        # 计算类别多样性
        categories = [doc.get('category') for doc in retrieved_docs]
        unique_categories = len(set(categories))
        unique_ratio = unique_categories / len(categories) if categories else 0
        
        return {
            'diversity_score': diversity_score,
            'unique_ratio': unique_ratio,
            'avg_pairwise_similarity': avg_similarity,
            'unique_categories': unique_categories
        }
    
    def _cosine_similarity(self, v1, v2) -> float:
        """计算余弦相似度"""
        dot = np.dot(v1, v2)
        norm1 = np.linalg.norm(v1)
        norm2 = np.linalg.norm(v2)
        return dot / (norm1 * norm2) if norm1 * norm2 > 0 else 0
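

下面给出检索评估器的最小用法示例(文档ID为虚构示例,期望得分可按上表公式手工验证):

evaluator = RetrievalEvaluator()
scores = evaluator.evaluate_single(
    retrieved_docs=['d1', 'd2', 'd3', 'd4'],   # 检索返回的有序结果
    relevant_docs={'d2', 'd5'},                # 人工标注的相关文档集合
    k=4
)
# 预期:recall=0.5, precision=0.25, mrr=0.5, average_precision=0.25
print(scores)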

## 三、生成评估指标

### 3.1 Faithfulness(忠诚度)

定义:生成内容与检索上下文中事实的一致性程度,即 忠诚度 = 被上下文支持的声明数 / 答案中声明总数。

import json
from typing import Any, Dict, List

class FaithfulnessEvaluator:
    """忠诚度评估器"""
    
    def __init__(
        self,
        llm_client = None,
        embedding_model = None
    ):
        self.llm = llm_client
        self.embedding = embedding_model
    
    def evaluate(
        self,
        question: str,
        context: List[str],
        answer: str
    ) -> Dict[str, Any]:
        """
        评估答案的忠诚度
        
        Args:
            question: 用户问题
            context: 检索到的上下文
            answer: 生成的答案
        
        Returns:
            评估结果 {score, claims, supported_claims, unsupported_claims}
        """
        # Step 1: 从答案中提取声明
        claims = self._extract_claims(answer)
        
        # Step 2: 检查每个声明是否被上下文支持
        supported = []
        unsupported = []
        
        for claim in claims:
            is_supported = self._check_claim_support(claim, context)
            if is_supported:
                supported.append(claim)
            else:
                unsupported.append(claim)
        
        # Step 3: 计算忠诚度分数
        score = len(supported) / len(claims) if claims else 1.0
        
        return {
            'score': score,
            'total_claims': len(claims),
            'supported_claims': supported,
            'unsupported_claims': unsupported,
            'faithfulness_level': self._get_faithfulness_level(score)
        }
    
    def _extract_claims(self, text: str) -> List[str]:
        """从文本中提取声明语句"""
        if self.llm:
            prompt = f"""
从以下文本中提取所有可验证的事实声明。每条声明应该是一个完整的陈述句。
 
文本:{text}
 
请以JSON格式返回声明列表:
{{"claims": ["声明1", "声明2", ...]}}
"""
            response = self.llm.generate(prompt)
            return json.loads(response.content)['claims']
        
        # 简单规则提取
        sentences = text.replace('。', '.\n').split('\n')
        return [s.strip() for s in sentences if len(s.strip()) > 10]
    
    def _check_claim_support(
        self,
        claim: str,
        context: List[str]
    ) -> bool:
        """检查声明是否被上下文支持"""
        if self.llm:
            prompt = f"""
给定以下上下文和声明,判断该声明是否可以从上下文中推断出来。
 
上下文:
{chr(10).join(context)}
 
声明:{claim}
 
请回答:支持 或 不支持
"""
            response = self.llm.generate(prompt)
            return '支持' in response.content
        
        # 基于关键词的简单匹配(按空格分词,适用于英文;中文文本需先分词)
        claim_words = set(claim.lower().split())
        if not claim_words:
            return False
        context_text = ' '.join(context).lower()
        
        overlap = len(claim_words & set(context_text.split()))
        return overlap / len(claim_words) > 0.5
    
    def _get_faithfulness_level(self, score: float) -> str:
        """获取忠诚度等级"""
        if score >= 0.9:
            return "Excellent"
        elif score >= 0.7:
            return "Good"
        elif score >= 0.5:
            return "Fair"
        else:
            return "Poor"

### 3.2 Answer Relevancy(答案相关性)

class AnswerRelevancyEvaluator:
    """答案相关性评估器"""
    
    def __init__(
        self,
        llm_client = None,
        embedding_model = None
    ):
        self.llm = llm_client
        self.embedding = embedding_model
    
    def evaluate(
        self,
        question: str,
        answer: str,
        num_questions: int = 3
    ) -> Dict[str, Any]:
        """
        评估答案与问题的相关性
        
        Args:
            question: 用户问题
            answer: 生成的答案
            num_questions: 生成的逆向问题数量
        
        Returns:
            评估结果
        """
        # Step 1: 从答案生成逆向问题
        generated_questions = self._generate_reverse_questions(
            question, answer, num_questions
        )
        
        # Step 2: 计算问题与原问题的语义相似度
        question_embedding = self.embedding.encode(question)
        
        similarities = []
        for gen_q in generated_questions:
            gen_embedding = self.embedding.encode(gen_q)
            sim = self._cosine_similarity(question_embedding, gen_embedding)
            similarities.append(sim)
        
        # Step 3: 计算相关性分数
        avg_similarity = np.mean(similarities)
        score = avg_similarity
        
        return {
            'score': score,
            'generated_questions': generated_questions,
            'similarities': similarities,
            'relevancy_level': self._get_relevancy_level(score)
        }
    
    def _generate_reverse_questions(
        self,
        question: str,
        answer: str,
        num: int
    ) -> List[str]:
        """从答案生成可以推导出原问题的问题"""
        prompt = f"""
给定原始问题和答案,请生成{num}个能够从该答案中推导出的问题。
 
原始问题:{question}
答案:{answer}
 
要求:
1. 生成的问题应该与原问题语义相似
2. 问题应该能够被给定的答案所回答
3. 格式:一个问题一行
 
生成的问题:
"""
        response = self.llm.generate(prompt)
        questions = [q.strip() for q in response.content.split('\n') if q.strip()]
        return questions[:num]
    
    def _cosine_similarity(self, v1, v2) -> float:
        """余弦相似度"""
        return np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))
    
    def _get_relevancy_level(self, score: float) -> str:
        """获取相关性等级"""
        if score >= 0.85:
            return "Highly Relevant"
        elif score >= 0.7:
            return "Relevant"
        elif score >= 0.5:
            return "Partially Relevant"
        else:
            return "Irrelevant"

## 四、RAGAS框架

### 4.1 RAGAS概述

RAGAS(Retrieval-Augmented Generation Assessment)是专门为RAG系统设计的评估框架,提供:

| 指标 | 评估维度 | 数据需求 |
|------|----------|----------|
| context_relevance | 检索相关性 | question, context |
| faithfulness | 忠诚度 | question, context, answer |
| answer_relevancy | 答案相关性 | question, answer |
| context_precision | 上下文精确度 | question, context |
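
评估输入通常组织为如下结构的样本(字段名为本文实现所约定,ground_truth 可选):

sample = {
    'question': "RAG系统的核心组件有哪些?",
    'contexts': ["RAG系统由检索器和生成器两部分组成……"],
    'answer': "RAG系统主要由检索器和生成器构成。",
    'ground_truth': "RAG系统包含检索器和生成器两个核心组件。",  # 可选,用于context_recall
}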

### 4.2 RAGAS实现

from typing import List, Dict, Optional, Tuple, Any
import numpy as np
import json
 
class RAGASEvaluator:
    """RAGAS评估框架实现"""
    
    def __init__(
        self,
        llm_client,
        embedding_model
    ):
        self.llm = llm_client
        self.embedding = embedding_model
    
    def evaluate(
        self,
        question: str,
        answer: str,
        contexts: List[str],
        ground_truth: Optional[str] = None
    ) -> Dict[str, float]:
        """
        执行完整的RAGAS评估
        
        Args:
            question: 用户问题
            answer: 生成的答案
            contexts: 检索到的上下文列表
            ground_truth: 标准答案(可选)
        
        Returns:
            各维度评估分数
        """
        results = {}
        
        # 1. Context Relevance(上下文相关性)
        context_relevance = self._evaluate_context_relevance(question, contexts)
        results['context_relevance'] = context_relevance
        
        # 2. Context Precision(上下文精确度)
        context_precision = self._evaluate_context_precision(question, contexts)
        results['context_precision'] = context_precision
        
        # 3. Faithfulness(忠诚度)
        faithfulness = self._evaluate_faithfulness(question, contexts, answer)
        results['faithfulness'] = faithfulness
        
        # 4. Answer Relevancy(答案相关性)
        answer_relevancy = self._evaluate_answer_relevancy(question, answer)
        results['answer_relevancy'] = answer_relevancy
        
        # 5. Context Recall(上下文召回率,依赖ground_truth)
        if ground_truth:
            context_recall = self._evaluate_context_recall(
                ground_truth, contexts
            )
            results['context_recall'] = context_recall
        
        # 计算综合得分
        results['overall_score'] = np.mean([
            results['context_relevance'],
            results['faithfulness'],
            results['answer_relevancy']
        ])
        
        return results
    
    def _evaluate_context_relevance(
        self,
        question: str,
        contexts: List[str]
    ) -> float:
        """
        评估上下文与问题的相关性
        
        原理:使用LLM评估每个上下文片段对回答问题的必要程度
        """
        prompt = f"""
评估以下上下文对回答问题的必要程度。
 
问题:{question}
 
上下文:
{chr(10).join(f'{i+1}. {ctx}' for i, ctx in enumerate(contexts))}
 
请评估每个上下文片段的必要性(0-1分):
- 1: 完全必要,包含关键信息
- 0.5: 部分相关,有辅助作用
- 0: 不相关,不包含有用信息
 
请以JSON格式返回:
{{"scores": [0.8, 0.9, 0.3], "reasoning": "..."}}
"""
        response = self.llm.generate(prompt)
        result = json.loads(response.content)
        
        scores = result.get('scores', [])
        return np.mean(scores) if scores else 0.0
    
    def _evaluate_context_precision(
        self,
        question: str,
        contexts: List[str]
    ) -> float:
        """
        评估上下文的排序精确度
        
        原理:越相关的上下文应该排在越前面
        """
        if len(contexts) <= 1:
            return 1.0
        
        # 计算每个上下文的相关性
        relevancies = []
        for ctx in contexts:
            rel = self._compute_similarity(question, ctx)
            relevancies.append(rel)
        
        # 计算精确度指标(考虑位置权重,并按权重和归一化到[0,1])
        precision_scores = []
        weights = []
        for i, rel in enumerate(relevancies):
            position_weight = 1.0 / np.log2(i + 2)
            precision_scores.append(rel * position_weight)
            weights.append(position_weight)
        
        return float(np.sum(precision_scores) / np.sum(weights))
    
    def _evaluate_faithfulness(
        self,
        question: str,
        contexts: List[str],
        answer: str
    ) -> float:
        """评估答案忠诚度"""
        combined_context = '\n'.join(contexts)
        
        prompt = f"""
评估答案是否忠实于给定的上下文。
 
上下文:
{combined_context}
 
问题:{question}
 
答案:{answer}
 
请从答案中提取所有关键声明,并判断每个声明是否可以从上下文中推导出来。
计算忠诚度分数(支持的声明数 / 总声明数)。
 
请以JSON格式返回:
{{
    "total_claims": 5,
    "supported_claims": 4,
    "faithfulness_score": 0.8,
    "unsupported_details": ["..."]
}}
"""
        response = self.llm.generate(prompt)
        result = json.loads(response.content)
        
        return result.get('faithfulness_score', 0.0)
    
    def _evaluate_answer_relevancy(
        self,
        question: str,
        answer: str
    ) -> float:
        """评估答案相关性"""
        prompt = f"""
从给定的答案生成几个可以由该答案回答的问题,然后评估这些生成的问题与原问题的相似度。
 
原问题:{question}
 
答案:{answer}
 
步骤:
1. 从答案生成3个可能的逆向问题
2. 计算每个逆向问题与原问题的语义相似度
3. 返回平均相似度作为答案相关性分数
 
请以JSON格式返回:
{{
    "generated_questions": ["问题1", "问题2", "问题3"],
    "similarities": [0.9, 0.85, 0.88],
    "answer_relevancy_score": 0.876
}}
"""
        response = self.llm.generate(prompt)
        result = json.loads(response.content)
        
        return result.get('answer_relevancy_score', 0.0)
    
    def _evaluate_context_recall(
        self,
        ground_truth: str,
        contexts: List[str]
    ) -> float:
        """评估上下文对标准答案的召回率"""
        combined_context = '\n'.join(contexts)
        
        prompt = f"""
评估上下文是否包含了回答问题所需的全部关键信息。
 
标准答案/期望内容:{ground_truth}
 
上下文:
{combined_context}
 
请评估上下文覆盖了标准答案中关键信息的比例(0-1)。
 
请以JSON格式返回:
{{
    "covered_aspects": ["..."],
    "missing_aspects": ["..."],
    "context_recall_score": 0.75
}}
"""
        response = self.llm.generate(prompt)
        result = json.loads(response.content)
        
        return result.get('context_recall_score', 0.0)
    
    def _compute_similarity(self, text1: str, text2: str) -> float:
        """计算文本相似度"""
        emb1 = self.embedding.encode(text1)
        emb2 = self.embedding.encode(text2)
        return self._cosine_similarity(emb1, emb2)
    
    def _cosine_similarity(self, v1, v2) -> float:
        return np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))
 
 
class RAGASEvaluatorBatch:
    """批量RAGAS评估"""
    
    def __init__(self, ragas_evaluator: RAGASEvaluator):
        self.evaluator = ragas_evaluator
    
    def evaluate_dataset(
        self,
        dataset: List[Dict],
        progress_callback=None
    ) -> Dict[str, Any]:
        """
        批量评估数据集
        
        Args:
            dataset: 数据集,每项包含 question, answer, contexts, ground_truth
            progress_callback: 进度回调函数
        
        Returns:
            批量评估结果统计
        """
        all_results = []
        
        for i, item in enumerate(dataset):
            result = self.evaluator.evaluate(
                question=item['question'],
                answer=item['answer'],
                contexts=item['contexts'],
                ground_truth=item.get('ground_truth')
            )
            
            result['question_id'] = item.get('id', i)
            all_results.append(result)
            
            if progress_callback:
                progress_callback(i + 1, len(dataset))
        
        # 汇总统计
        metrics = ['context_relevance', 'context_precision', 
                   'faithfulness', 'answer_relevancy', 'overall_score']
        
        summary = {
            'total_samples': len(all_results),
            'metrics': {}
        }
        
        for metric in metrics:
            if metric in all_results[0]:
                values = [r[metric] for r in all_results if metric in r]
                summary['metrics'][metric] = {
                    'mean': np.mean(values),
                    'std': np.std(values),
                    'min': np.min(values),
                    'max': np.max(values),
                    'median': np.median(values)
                }
        
        # 识别问题样本
        low_performance = [
            r for r in all_results
            if r.get('overall_score', 1.0) < 0.5
        ]
        summary['low_performance_samples'] = low_performance
        
        return {
            'summary': summary,
            'detailed_results': all_results
        }
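

批量评估的用法示意(llm_client 与 embedding_model 为假设的客户端,分别需提供 generate() 与 encode() 接口;dataset 为上述字段结构的样本列表):

def on_progress(done: int, total: int):
    print(f"评估进度: {done}/{total}")

ragas = RAGASEvaluator(llm_client, embedding_model)
batch = RAGASEvaluatorBatch(ragas)
report = batch.evaluate_dataset(dataset, progress_callback=on_progress)

print(report['summary']['metrics']['faithfulness'])        # 忠诚度的均值、标准差等统计
print(len(report['summary']['low_performance_samples']))   # 综合得分低于0.5的样本数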

## 五、TruLens框架

### 5.1 TruLens概述

TruLens是TruEra开源的RAG评估框架,提供更细粒度的追踪和评估能力:

| 特性 | 说明 |
|------|------|
| LangChain集成 | 与LangChain、LlamaIndex等生态无缝对接 |
| 异步评估 | 支持大规模并发评估 |
| 仪表盘 | 提供可视化分析界面 |
| 自定义反馈 | 支持灵活的评估扩展 |

### 5.2 TruLens实现

from typing import List, Dict, Optional, Callable, Any
from dataclasses import dataclass, field
from datetime import datetime
import numpy as np
import time
import json
 
@dataclass
class TrulensRecord:
    """Trulens评估记录"""
    record_id: str
    timestamp: datetime
    inputs: Dict[str, Any]
    outputs: Dict[str, Any]
    costs: Dict[str, float] = field(default_factory=dict)
    latencies: Dict[str, float] = field(default_factory=dict)
    metrics: Dict[str, float] = field(default_factory=dict)
    feedback_results: List[Dict] = field(default_factory=list)
 
class TruLensFeedback:
    """Trulens风格反馈函数"""
    
    def __init__(self, llm_client, embedding_model):
        self.llm = llm_client
        self.embedding = embedding_model
    
    def groundedness(self, context: str, claim: str) -> float:
        """评估声明的真实性(基于上下文)"""
        prompt = f"""
评估以下声明是否可以从提供的上下文中得到支持。
 
上下文:{context}
 
声明:{claim}
 
请评估:
- 完全支持:1.0
- 部分支持:0.5
- 不支持:0.0
 
回答:"""
        response = self.llm.generate(prompt)
        
        try:
            score = float(response.strip())
            return max(0.0, min(1.0, score))
        except ValueError:
            # 无法解析为数值时返回中间值
            return 0.5
    
    def answer_relevance(self, question: str, answer: str) -> float:
        """评估答案与问题的相关性"""
        prompt = f"""
评估答案对问题的相关程度。
 
问题:{question}
 
答案:{answer}
 
请评分(0-1):
- 1.0: 完全相关,直接回答问题
- 0.5: 部分相关,提供了有帮助但不完全的信息
- 0.0: 不相关,没有回答问题
 
回答:"""
        response = self.llm.generate(prompt)
        
        try:
            score = float(response.strip())
            return max(0.0, min(1.0, score))
        except ValueError:
            return 0.5
    
    def context_relevance(
        self,
        question: str,
        context: str
    ) -> float:
        """评估上下文的相关性"""
        emb_q = self.embedding.encode(question)
        emb_c = self.embedding.encode(context)
        
        return self._cosine_similarity(emb_q, emb_c)
    
    def harmfulness(self, text: str) -> float:
        """检测有害内容"""
        harmful_keywords = [
            '暴力', '色情', '歧视', '仇恨',
            'illegal', 'harmful', 'dangerous'
        ]
        
        text_lower = text.lower()
        for keyword in harmful_keywords:
            if keyword in text_lower:
                return 0.8
        
        return 0.0
    
    def misogyny(self, text: str) -> float:
        """检测性别歧视"""
        prompt = f"""
评估以下文本是否包含性别歧视或厌女内容。
 
文本:{text}
 
请评分(0-1):
- 0.0: 无性别歧视
- 0.5: 轻微偏见或刻板印象
- 1.0: 明显性别歧视
 
回答:"""
        response = self.llm.generate(prompt)
        
        try:
            score = float(response.strip())
            return max(0.0, min(1.0, score))
        except ValueError:
            return 0.0
 
 
class TruLens:
    """Trulens评估器"""
    
    def __init__(
        self,
        llm_client = None,
        embedding_model = None
    ):
        self.feedback = TruLensFeedback(llm_client, embedding_model)
        self.records: List[TrulensRecord] = []
        self.feedback_functions = self._register_default_feedback()
    
    def _register_default_feedback(self) -> Dict[str, Callable]:
        """注册默认反馈函数"""
        return {
            'groundedness': lambda ctx, claim: self.feedback.groundedness(ctx, claim),
            'answer_relevance': lambda q, a: self.feedback.answer_relevance(q, a),
            'context_relevance': lambda q, c: self.feedback.context_relevance(q, c),
            'harmfulness': lambda t: self.feedback.harmfulness(t),
            'misogyny': lambda t: self.feedback.misogyny(t)
        }
    
    def register_feedback(
        self,
        name: str,
        func: Callable
    ):
        """注册自定义反馈函数"""
        self.feedback_functions[name] = func
    
    def trace(
        self,
        func: Callable,
        record_id: Optional[str] = None
    ):
        """追踪函数执行(可作装饰器使用)"""
        def wrapper(*args, **kwargs):
            # 使用新变量名,避免在闭包内对 record_id 赋值导致 UnboundLocalError
            rid = record_id or f"record_{int(time.time()*1000)}"
            record = TrulensRecord(
                record_id=rid,
                timestamp=datetime.now(),
                inputs={'args': args, 'kwargs': kwargs},
                outputs={}
            )
            
            start_time = time.time()
            try:
                result = func(*args, **kwargs)
                record.outputs['result'] = result
                record.outputs['success'] = True
                return result
            except Exception as e:
                record.outputs['error'] = str(e)
                record.outputs['success'] = False
                raise
            finally:
                record.latencies['total'] = time.time() - start_time
                # 保存执行轨迹,供后续评估与排行榜使用
                self.records.append(record)
        
        return wrapper
    
    def evaluate(
        self,
        question: str,
        answer: str,
        contexts: List[str],
        ground_truth: str = None,
        feedback_names: List[str] = None
    ) -> Dict[str, float]:
        """
        执行评估
        
        Args:
            question: 问题
            answer: 答案
            contexts: 上下文列表
            ground_truth: 标准答案
            feedback_names: 要执行的反馈函数名
        
        Returns:
            各反馈函数的评估结果
        """
        if feedback_names is None:
            feedback_names = ['context_relevance', 'answer_relevance', 'groundedness']
        
        results = {}
        combined_context = '\n'.join(contexts)
        
        for name in feedback_names:
            if name == 'context_relevance':
                # 计算所有上下文的相关性
                scores = [self.feedback.context_relevance(question, ctx) for ctx in contexts]
                results[name] = np.mean(scores)
            
            elif name == 'answer_relevance':
                results[name] = self.feedback.answer_relevance(question, answer)
            
            elif name == 'groundedness':
                # 评估答案对上下文的忠诚度
                results[name] = self.feedback.groundedness(combined_context, answer)
            
            elif name in ('harmfulness', 'misogyny'):
                # 单参数反馈函数,只作用于答案文本
                results[name] = self.feedback_functions[name](answer)
            
            elif name in self.feedback_functions:
                # 自定义反馈函数约定接收 (context, answer) 两个参数
                results[name] = self.feedback_functions[name](combined_context, answer)
        
        # 计算综合得分
        results['overall'] = np.mean(list(results.values()))
        
        return results
    
    def get_leaderboard(
        self,
        records: List[TrulensRecord]
    ) -> Dict[str, Dict]:
        """生成评估排行榜"""
        leaderboard = {
            'overall': [],
            'by_metric': {name: [] for name in self.feedback_functions.keys()}
        }
        
        for record in records:
            if 'overall' in record.metrics:
                leaderboard['overall'].append({
                    'record_id': record.record_id,
                    'score': record.metrics['overall']
                })
            
            for metric, value in record.metrics.items():
                if metric in leaderboard['by_metric']:
                    leaderboard['by_metric'][metric].append({
                        'record_id': record.record_id,
                        'score': value
                    })
        
        # 按得分降序排序
        leaderboard['overall'].sort(key=lambda x: x['score'], reverse=True)
        for metric_list in leaderboard['by_metric'].values():
            metric_list.sort(key=lambda x: x['score'], reverse=True)
        
        return leaderboard
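

用法示意(llm_client 与 embedding_model 同样为假设的客户端;自定义反馈函数约定接收 (context, answer) 两个参数):

tl = TruLens(llm_client=llm_client, embedding_model=embedding_model)

# 注册一个自定义反馈:答案长度得分(纯示例逻辑)
tl.register_feedback('length_score', lambda ctx, ans: min(1.0, len(ans) / 200))

scores = tl.evaluate(
    question="RAG的检索模块如何评估?",
    answer="常用Recall、Precision、MRR与NDCG等指标。",
    contexts=["检索评估通常使用Recall@K、Precision@K、MRR和NDCG。"],
    feedback_names=['context_relevance', 'answer_relevance', 'groundedness', 'length_score']
)
print(scores['overall'])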

## 六、综合评估实践

### 6.1 评估数据集构建

import json
import random
from typing import Dict, List, Tuple

import numpy as np

class EvaluationDatasetBuilder:
    """评估数据集构建器"""
    
    def __init__(self):
        self.samples = []
    
    def add_sample(
        self,
        question: str,
        ground_truth_contexts: List[str],
        ground_truth_answer: str,
        difficulty: str = "medium",
        category: str = "general"
    ):
        """添加评估样本"""
        self.samples.append({
            'question': question,
            'ground_truth_contexts': ground_truth_contexts,
            'ground_truth_answer': ground_truth_answer,
            'difficulty': difficulty,
            'category': category,
            'id': len(self.samples)
        })
    
    def save(self, path: str):
        """保存数据集"""
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(self.samples, f, ensure_ascii=False, indent=2)
    
    def load(self, path: str):
        """加载数据集"""
        with open(path, 'r', encoding='utf-8') as f:
            self.samples = json.load(f)
    
    def split(
        self,
        train_ratio: float = 0.8
    ) -> Tuple[List[Dict], List[Dict]]:
        """分割训练/测试集"""
        random.shuffle(self.samples)
        
        split_idx = int(len(self.samples) * train_ratio)
        return self.samples[:split_idx], self.samples[split_idx:]
 
 
class ABTesting:
    """A/B测试比较器"""
    
    def __init__(self):
        self.variants = {}
    
    def add_variant(
        self,
        name: str,
        retriever,
        generator = None
    ):
        """添加测试变体"""
        self.variants[name] = {
            'retriever': retriever,
            'generator': generator,
            'results': []
        }
    
    def run_test(
        self,
        evaluator: RAGASEvaluator,
        test_samples: List[Dict],
        num_samples: int = 100
    ) -> Dict[str, Dict]:
        """
        运行A/B测试
        
        Args:
            evaluator: 评估器
            test_samples: 测试样本
            num_samples: 测试样本数
        
        Returns:
            各变体的评估结果对比
        """
        test_set = test_samples[:num_samples]
        comparison_results = {}
        
        for variant_name, variant in self.variants.items():
            print(f"Testing variant: {variant_name}")
            
            results = []
            for sample in test_set:
                # 检索
                retrieved = variant['retriever'].search(
                    sample['question'],
                    top_k=5
                )
                
                # 评估检索质量
                eval_result = evaluator.evaluate(
                    question=sample['question'],
                    answer=sample.get('ground_truth_answer', ''),
                    contexts=[r['content'] for r in retrieved],
                    ground_truth=sample.get('ground_truth_answer')
                )
                
                results.append(eval_result)
            
            # 汇总
            comparison_results[variant_name] = self._aggregate_results(results)
        
        return comparison_results
    
    def _aggregate_results(
        self,
        results: List[Dict]
    ) -> Dict[str, float]:
        """聚合结果"""
        metrics = ['context_relevance', 'context_precision', 
                   'faithfulness', 'answer_relevancy', 'overall_score']
        
        aggregated = {}
        for metric in metrics:
            if metric in results[0]:
                values = [r[metric] for r in results]
                aggregated[metric] = {
                    'mean': np.mean(values),
                    'std': np.std(values),
                    'p95': np.percentile(values, 95)
                }
        
        return aggregated
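
数据集构建与A/B测试的串联用法示意(retriever_a、retriever_b 为假设的检索器,需提供 search(query, top_k) 接口并返回含 content 字段的结果;ragas_evaluator 为前文的 RAGASEvaluator 实例):

builder = EvaluationDatasetBuilder()
builder.add_sample(
    question="什么是RAG?",
    ground_truth_contexts=["RAG将检索到的文档注入生成模型的上下文。"],
    ground_truth_answer="RAG是一种将检索与生成相结合的技术。",
    difficulty="easy",
    category="definition"
)
train_set, test_set = builder.split(train_ratio=0.8)

ab = ABTesting()
ab.add_variant("baseline", retriever_a)
ab.add_variant("hybrid", retriever_b)
comparison = ab.run_test(ragas_evaluator, test_set, num_samples=50)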

### 6.2 评估报告生成

import json
from datetime import datetime
from typing import Dict, List, Tuple

class EvaluationReportGenerator:
    """评估报告生成器"""
    
    def __init__(self):
        self.template = self._get_template()
    
    def _get_template(self) -> str:
        return """
# RAG系统评估报告
 
## 执行摘要
 
- **评估时间**: {timestamp}
- **测试样本数**: {num_samples}
- **综合得分**: {overall_score:.2%}
 
## 一、检索性能
 
| 指标 | 平均值 | 标准差 | 最小值 | 最大值 |
|------|--------|--------|--------|--------|
| Recall@5 | {recall_mean:.2%} | {recall_std:.2%} | {recall_min:.2%} | {recall_max:.2%} |
| Precision@5 | {precision_mean:.2%} | {precision_std:.2%} | {precision_min:.2%} | {precision_max:.2%} |
| MRR | {mrr_mean:.2%} | {mrr_std:.2%} | {mrr_min:.2%} | {mrr_max:.2%} |
| NDCG@5 | {ndcg_mean:.2%} | {ndcg_std:.2%} | {ndcg_min:.2%} | {ndcg_max:.2%} |
 
## 二、生成性能
 
| 指标 | 平均值 | 标准差 | 最小值 | 最大值 |
|------|--------|--------|--------|--------|
| 忠诚度 | {faith_mean:.2%} | {faith_std:.2%} | {faith_min:.2%} | {faith_max:.2%} |
| 答案相关性 | {relevancy_mean:.2%} | {relevancy_std:.2%} | {relevancy_min:.2%} | {relevancy_max:.2%} |
 
## 三、问题样本分析
 
### 低分样本
{low_score_samples}
 
## 四、改进建议
 
{recommendations}
 
## 五、附录:详细数据
 
<details>
<summary>点击展开完整数据</summary>
 
```json
{detailed_data}
```
 
</details>
"""
    def generate(
        self,
        evaluation_results: Dict,
        low_performance_samples: List[Dict] = None
    ) -> str:
        """生成评估报告"""
        
        metrics = evaluation_results.get('summary', {}).get('metrics', {})
        
        def get_metric_values(metric_name: str) -> Tuple:
            if metric_name in metrics:
                m = metrics[metric_name]
                return m['mean'], m['std'], m['min'], m['max']
            return 0, 0, 0, 0
        
        recall_vals = get_metric_values('context_relevance')
        faith_vals = get_metric_values('faithfulness')
        relevancy_vals = get_metric_values('answer_relevancy')
        
        # 生成建议
        recommendations = self._generate_recommendations(metrics)
        
        # 生成低分样本列表
        low_samples_text = ""
        if low_performance_samples:
            for i, sample in enumerate(low_performance_samples[:5], 1):
                low_samples_text += f"\n### {i}. 问题 #{sample.get('id', 'N/A')}\n"
                low_samples_text += f"- 问题: {sample.get('question', '')[:100]}...\n"
                low_samples_text += f"- 综合得分: {sample.get('overall_score', 0):.2%}\n"
        
        # 注意:Precision/MRR/NDCG 等检索指标需由 RetrievalEvaluator 单独计算,
        # 此处暂以0占位;Recall 一列以 context_relevance 近似代替
        report = self.template.format(
            timestamp=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            num_samples=evaluation_results.get('summary', {}).get('total_samples', 0),
            overall_score=metrics.get('overall_score', {}).get('mean', 0),
            recall_mean=recall_vals[0],
            recall_std=recall_vals[1],
            recall_min=recall_vals[2],
            recall_max=recall_vals[3],
            precision_mean=0,
            precision_std=0,
            precision_min=0,
            precision_max=0,
            mrr_mean=0,
            mrr_std=0,
            mrr_min=0,
            mrr_max=0,
            ndcg_mean=0,
            ndcg_std=0,
            ndcg_min=0,
            ndcg_max=0,
            faith_mean=faith_vals[0],
            faith_std=faith_vals[1],
            faith_min=faith_vals[2],
            faith_max=faith_vals[3],
            relevancy_mean=relevancy_vals[0],
            relevancy_std=relevancy_vals[1],
            relevancy_min=relevancy_vals[2],
            relevancy_max=relevancy_vals[3],
            low_score_samples=low_samples_text or "无低分样本",
            recommendations=recommendations,
            # default=str 以兼容numpy数值类型的序列化
            detailed_data=json.dumps(evaluation_results, indent=2, ensure_ascii=False, default=str)
        )
        
        return report

    def _generate_recommendations(self, metrics: Dict) -> str:
        """根据评估结果生成改进建议"""
        recommendations = []
        
        if metrics.get('context_relevance', {}).get('mean', 1) < 0.6:
            recommendations.append(
                "1. **检索优化**:上下文相关性偏低,建议优化检索策略或调整chunk大小"
            )
        
        if metrics.get('faithfulness', {}).get('mean', 1) < 0.7:
            recommendations.append(
                "2. **生成优化**:答案忠诚度偏低,可能存在幻觉问题,建议加强RAG约束"
            )
        
        if metrics.get('answer_relevancy', {}).get('mean', 1) < 0.6:
            recommendations.append(
                "3. **问答匹配**:答案相关性偏低,建议优化prompt或调整生成参数"
            )
        
        if not recommendations:
            recommendations.append("系统表现良好,继续保持当前配置")
        
        return '\n'.join(recommendations)
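
将批量评估结果渲染为Markdown报告的用法示意(batch_results 为前文 RAGASEvaluatorBatch.evaluate_dataset 的返回值):

generator = EvaluationReportGenerator()
report_md = generator.generate(
    evaluation_results=batch_results,
    low_performance_samples=batch_results['summary']['low_performance_samples']
)
with open('rag_eval_report.md', 'w', encoding='utf-8') as f:
    f.write(report_md)
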

---

## 七、相关文档

- [[混合检索技术]] - 检索评估依赖的检索技术
- [[查询改写与扩展]] - 查询处理对评估的影响
- [[Agentic_RAG]] - 自适应RAG的评估需求
- [[向量数据库]] - 评估指标存储
- [[知识图谱构建]] - 结构化知识的评估

---

> [!note] 更新记录
> 
> - 2026-04-18:初版创建,整合RAGAS、Trulens及传统检索指标