重排技术深度指南:让搜索结果更靠谱

这篇文章解决什么问题

检索阶段找到了一堆相关文档,但排序乱七八糟,好的排在后面,差的反而排前面怎么办?重排技术就是来解决这个”排序混乱”问题的。

前言:检索和排序是两回事

先说个我踩过的坑。

做知识库问答的时候,用向量检索找到了20个相关文档。理论上应该把最相关的排在前面,但实际上:

排名第1的:一篇文章的摘要,和问题有那么点关系 排名第5的:直接回答了用户问题的好内容 排名第15的:相关性很差,但包含了一些关键词

你说用户看到排名第1的结果不是自己想要的,是不是直接骂街?

后来学会了重排技术,加了一个排序步骤,效果立竿见影。

一、为什么需要重排

1.1 检索和排序的区别

打个比方:

检索就像海选:找到所有可能相关的候选人(速度要快) 排序就像终选:从候选人里选出最合适的(质量要高)

向量检索的目的是”快”,快速从海量文档里找到可能相关的top-k。

但”可能相关”不等于”最相关”,需要更精细的排序来筛选。

1.2 向量检索的局限性

向量检索用的余弦相似度有很多问题:

问题说明例子
语义漂移表面相似但实际不相关”如何煮咖啡”匹配到”咖啡店经营”
关键词权重关键词命中不等于真正需要”Python教程”匹配到讲Python人生的文章
长文档劣势长文档向量容易被稀释一篇万字文章包含了一句话的答案反而排不上

1.3 重排的价值

重排的核心价值

  1. 精细化评估:考虑更多维度的相关性
  2. 上下文理解:理解查询和文档的真正关系
  3. 质量提升:把最好的结果排在最前面

二、重排的核心技术

2.1 Bi-Encoder:双塔模型

原理:查询和文档分别编码,在向量空间里比较相似度

查询 → [Encoder] → 向量Q
文档 → [Encoder] → 向量D
相似度 = cosine(向量Q, 向量D)

特点

  • 速度快:可以预先计算文档向量
  • 精度一般:无法捕获查询和文档的细粒度交互
  • 适合初筛
class BiEncoderReranker:
    """
    Bi-Encoder重排
    使用预计算的文档向量
    """
    
    def __init__(self, encoder, device="cpu"):
        self.encoder = encoder
        self.device = device
        self.doc_embeddings = {}  # doc_id -> embedding
    
    def index(self, documents):
        """预先计算文档向量"""
        for doc in documents:
            embedding = self.encoder.encode(doc.content)
            self.doc_embeddings[doc.id] = embedding
    
    def rerank(self, query, candidates, top_k=10):
        """重排候选文档"""
        
        # 编码查询
        query_vector = self.encoder.encode(query)
        
        # 计算每个候选文档的相似度
        scores = []
        for doc in candidates:
            doc_vector = self.doc_embeddings.get(doc.id)
            if doc_vector is None:
                # 动态编码
                doc_vector = self.encoder.encode(doc.content)
            
            score = cosine_similarity(query_vector, doc_vector)
            scores.append((doc, score))
        
        # 按分数排序
        scores.sort(key=lambda x: x[1], reverse=True)
        
        return scores[:top_k]

2.2 Cross-Encoder:交叉编码

原理:查询和文档一起输入,直接输出相关性分数

[Query; Document] → [Cross-Encoder] → 相似度分数

特点

  • 精度高:捕获查询和文档的细粒度交互
  • 速度慢:每个(query, doc)对都需要单独计算
  • 适合精排
class CrossEncoderReranker:
    """
    Cross-Encoder重排
    精度高但速度慢
    """
    
    def __init__(self, model_name="cross-encoder/ms-marco-MiniLM-L-6-v2"):
        from sentence_transformers import CrossEncoder
        self.model = CrossEncoder(model_name)
    
    def rerank(self, query, candidates, top_k=10):
        """
        对候选文档进行重排
        
        参数:
        - query: 查询字符串
        - candidates: 候选文档列表
        - top_k: 返回前k个结果
        """
        
        # 准备输入对
        pairs = [(query, doc.content) for doc in candidates]
        
        # 批量计算分数
        scores = self.model.predict(pairs)
        
        # 按分数排序
        scored_docs = list(zip(candidates, scores))
        scored_docs.sort(key=lambda x: x[1], reverse=True)
        
        # 返回top_k
        return scored_docs[:top_k]

2.3 Bi-Encoder vs Cross-Encoder

维度Bi-EncoderCross-Encoder
速度快(O(1)查表)慢(O(n)计算)
精度一般
文档更新只需重算该文档所有相关分数都要重算
适合场景初筛精排
显存占用低(向量小)高(需要模型推理)

2.4 ColBERT:中间相遇

原理:Query和Document的每个token分别编码,用延迟交互计算相似度

class ColBERTReranker:
    """
    ColBERT重排
    在速度和精度之间取得平衡
    """
    
    def __init__(self, model_name="colbert-ir/colbertv2.0"):
        from transformers import AutoModel, AutoTokenizer
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name)
    
    def _encode(self, text, mask=False):
        """编码文本,返回每个token的向量"""
        inputs = self.tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
        
        with torch.no_grad():
            outputs = self.model(**inputs)
        
        # 使用最后一层hidden state
        embeddings = outputs.last_hidden_state[0]  # [seq_len, hidden_dim]
        
        if mask:
            # 只保留token embedding(去掉[CLS]和[SEP])
            mask = inputs["attention_mask"][0]
            embeddings = embeddings[mask.bool()]
        
        return embeddings
    
    def score(self, query, document):
        """
        计算Query和Document的ColBERT分数
        """
        
        # 编码query和document
        Q = self._encode(query, mask=True)   # [query_len, dim]
        D = self._encode(document, mask=True)  # [doc_len, dim]
        
        # 计算相似度矩阵
        # Q: [query_len, dim], D: [doc_len, dim]
        # sim_matrix: [query_len, doc_len]
        sim_matrix = torch.matmul(Q, D.T)
        
        # MaxSim:每个query token找最相似的doc token
        max_sim = sim_matrix.max(dim=1)[0]  # [query_len]
        
        # 求和作为最终分数
        score = max_sim.sum().item()
        
        return score
    
    def rerank(self, query, candidates, top_k=10):
        """重排"""
        scores = []
        for doc in candidates:
            score = self.score(query, doc.content)
            scores.append((doc, score))
        
        scores.sort(key=lambda x: x[1], reverse=True)
        return scores[:top_k]

三、重排的工作流程

3.1 两阶段流程

阶段1:向量检索(召回)
   ↓
找到top-100候选文档
   ↓
阶段2:重排(排序)
   ↓
返回top-10最终结果

3.2 代码实现

class TwoStageRetriever:
    """两阶段检索+重排"""
    
    def __init__(self, vector_db, reranker):
        self.vector_db = vector_db
        self.reranker = reranker
    
    def retrieve(self, query, recall_k=100, final_k=10):
        """
        两阶段检索
        
        参数:
        - query: 查询
        - recall_k: 召回阶段返回的数量
        - final_k: 最终返回的数量
        """
        
        # 阶段1:向量检索召回
        recall_results = self.vector_db.search(query, top_k=recall_k)
        
        # 阶段2:重排精排
        final_results = self.reranker.rerank(query, recall_results, top_k=final_k)
        
        return final_results

3.3 实际使用示例

from sentence_transformers import CrossEncoder
 
# 初始化
cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
reranker = CrossEncoderReranker(cross_encoder)
 
# 两阶段检索
retriever = TwoStageRetriever(
    vector_db=your_vector_db,
    reranker=reranker
)
 
# 检索
results = retriever.retrieve(
    "如何用Python处理JSON数据",
    recall_k=50,
    final_k=10
)
 
for doc, score in results:
    print(f"[{score:.4f}] {doc.content[:100]}...")

四、重排模型的选择

4.1 开源模型推荐

模型类型速度精度适用场景
BGE-RerankerCross通用场景
Cohere RerankCross英文为主
Jina RerankerCross多语言
ms-marco-MiniLMCross很快快速部署
FlagEmbeddingCross中文优化

4.2 使用BGE-Reranker

from flag_model import FlagReranker
 
# 初始化
reranker = FlagReranker('BAAI/bge-reranker-v2-m3', use_fp16=True)
 
# 重排
query = "什么是机器学习"
documents = [
    "机器学习是人工智能的一个分支...",
    "深度学习是机器学习的子领域...",
    "今天天气不错..."
]
 
# 输入格式:query, document pairs
pairs = [[query, doc] for doc in documents]
scores = reranker.compute_score(pairs)
 
# 按分数排序
scored_docs = list(zip(documents, scores))
scored_docs.sort(key=lambda x: x[1], reverse=True)
 
for doc, score in scored_docs:
    print(f"[{score:.4f}] {doc[:50]}...")

4.3 使用Cohere Rerank

import cohere
 
# 初始化
co = cohere.Client("your-api-key")
 
def cohere_rerank(query, documents, top_n=10):
    """使用Cohere Rerank"""
    
    response = co.rerank(
        query=query,
        documents=documents,
        model="rerank-multilingual-v2.0",
        top_n=top_n
    )
    
    results = []
    for result in response.results:
        results.append({
            "index": result.index,
            "document": documents[result.index],
            "score": result.relevance_score
        })
    
    return results
 
# 使用
documents = ["文档1内容", "文档2内容", "文档3内容"]
results = cohere_rerank("Python教程", documents, top_n=2)

五、学习排序(Learning to Rank)

5.1 什么是Learning to Rank

Learning to Rank(LTR)是用机器学习来学习排序。

核心思想

  • 收集标注数据:(查询, 文档, 相关性标签)
  • 训练排序模型
  • 用模型预测新查询的文档排序

5.2 Pointwise方法

原理:每个文档单独打分

from sklearn.gradient_boosting import GradientBoostingRegressor
 
class PointwiseRanker:
    """
    Pointwise排序
    把排序问题变成回归问题
    """
    
    def __init__(self):
        self.model = GradientBoostingRegressor(n_estimators=100)
        self.feature_extractor = RankingFeatureExtractor()
    
    def train(self, train_data):
        """
        训练数据格式:
        [
            {"query": "机器学习", "doc": "ML是AI的分支", "label": 3},
            {"query": "机器学习", "doc": "今天天气好", "label": 0},
            ...
        ]
        """
        X = []
        y = []
        
        for item in train_data:
            features = self.feature_extractor.extract(item["query"], item["doc"])
            X.append(features)
            y.append(item["label"])
        
        self.model.fit(X, y)
    
    def predict(self, query, documents):
        """预测排序"""
        X = [self.feature_extractor.extract(query, doc) for doc in documents]
        scores = self.model.predict(X)
        
        scored_docs = list(zip(documents, scores))
        scored_docs.sort(key=lambda x: x[1], reverse=True)
        
        return scored_docs

5.3 Pairwise方法

原理:比较文档对的相对顺序

class PairwiseRanker:
    """
    Pairwise排序
    把排序问题变成分类问题:哪个文档更好
    """
    
    def __init__(self):
        from xgboost import XGBClassifier
        self.model = XGBClassifier()
        self.feature_extractor = RankingFeatureExtractor()
    
    def train(self, train_data):
        """
        训练数据格式:
        [
            {"query": "机器学习", "doc1": "ML是AI的分支", "doc2": "今天天气好", "label": 1},
            # label=1表示doc1比doc2好
            ...
        ]
        """
        X = []
        y = []
        
        for item in train_data:
            features = self.feature_extractor.extract_pair(
                item["query"], 
                item["doc1"], 
                item["doc2"]
            )
            X.append(features)
            y.append(item["label"])
        
        self.model.fit(X, y)

5.4 Listwise方法

原理:直接优化整个列表的排序

class ListwiseRanker:
    """
    Listwise排序
    直接优化NDCG等排序指标
    """
    
    def __init__(self, model_type="lambdarank"):
        # 实际使用LightGBM的lambdarank
        from lightgbm import LGBMRanker
        self.model = LGBMRanker(
            objective="lambdarank",
            metric="ndcg",
            n_estimators=100
        )
        self.feature_extractor = RankingFeatureExtractor()
    
    def train(self, train_data):
        """
        训练数据格式:
        {
            "query": "机器学习",
            "docs": ["文档1", "文档2", ...],
            "labels": [3, 0, 2, ...]  # 相关性标签
        }
        """
        X = []
        y = []
        groups = []
        
        for item in train_data:
            query = item["query"]
            docs = item["docs"]
            labels = item["labels"]
            
            for doc in docs:
                features = self.feature_extractor.extract(query, doc)
                X.append(features)
                y.append(labels[docs.index(doc)])
            
            groups.append(len(docs))
        
        self.model.fit(X, y, group=groups)

六、实战代码

6.1 完整重排管道

from dataclasses import dataclass
from typing import List
import numpy as np
 
@dataclass
class RerankingConfig:
    recall_k: int = 100      # 召回阶段返回数量
    rerank_k: int = 20      # 重排阶段返回数量
    final_k: int = 10        # 最终返回数量
    use_cross_encoder: bool = True
 
class RerankingPipeline:
    """
    完整重排管道
    召回 → 重排 → 返回结果
    """
    
    def __init__(self, vector_db, reranker, config: RerankingConfig = None):
        self.vector_db = vector_db
        self.reranker = reranker
        self.config = config or RerankingConfig()
    
    def search(self, query: str) -> List[dict]:
        """
        执行检索+重排
        """
        
        # 阶段1:召回
        recall_results = self.vector_db.search(query, top_k=self.config.recall_k)
        
        # 阶段2:重排
        reranked = self.reranker.rerank(
            query, 
            recall_results, 
            top_k=self.config.rerank_k
        )
        
        # 阶段3:格式化返回
        final_results = []
        for doc, score in reranked[:self.config.final_k]:
            final_results.append({
                "content": doc.content,
                "score": float(score),
                "metadata": doc.metadata
            })
        
        return final_results

6.2 批量重排

class BatchReranker:
    """批量重排优化"""
    
    def __init__(self, reranker, batch_size=32):
        self.reranker = reranker
        self.batch_size = batch_size
    
    def rerank_batch(self, queries: List[str], documents: List[List[str]]):
        """
        批量重排多个查询
        
        参数:
        - queries: 查询列表
        - documents: 每个查询对应的候选文档列表
        """
        
        results = []
        
        # 分批处理
        for i in range(0, len(queries), self.batch_size):
            batch_queries = queries[i:i+self.batch_size]
            batch_docs = documents[i:i+self.batch_size]
            
            # 批量重排
            batch_results = self._process_batch(batch_queries, batch_docs)
            results.extend(batch_results)
        
        return results
    
    def _process_batch(self, queries, documents):
        """处理一批查询"""
        all_pairs = []
        
        # 准备所有(query, doc)对
        for query, docs in zip(queries, documents):
            for doc in docs:
                all_pairs.append([query, doc])
        
        # 批量计算分数
        if hasattr(self.reranker, 'model'):
            # Cross-Encoder批量预测
            scores = self.reranker.model.predict(all_pairs)
        else:
            scores = [self.reranker.score(p[0], p[1]) for p in all_pairs]
        
        # 解析结果
        results = []
        idx = 0
        for docs in documents:
            query_results = []
            for doc in docs:
                query_results.append((doc, scores[idx]))
                idx += 1
            
            # 排序
            query_results.sort(key=lambda x: x[1], reverse=True)
            results.append(query_results)
        
        return results

6.3 混合重排

class HybridReranker:
    """
    混合重排
    结合多种信号进行排序
    """
    
    def __init__(self, vector_db, cross_encoder, bm25_scorer):
        self.vector_db = vector_db
        self.cross_encoder = cross_encoder
        self.bm25_scorer = bm25_scorer
    
    def rerank(self, query, candidates, weights=None):
        """
        混合重排
        
        参数:
        - weights: 各信号的权重,默认为 {"vector": 0.3, "cross": 0.5, "bm25": 0.2}
        """
        
        weights = weights or {"vector": 0.3, "cross": 0.5, "bm25": 0.2}
        
        # 1. 向量相似度
        vector_scores = self._get_vector_scores(query, candidates)
        
        # 2. Cross-Encoder分数
        cross_scores = self._get_cross_scores(query, candidates)
        
        # 3. BM25分数
        bm25_scores = self._get_bm25_scores(query, candidates)
        
        # 4. 归一化分数
        vector_norm = self._normalize(vector_scores)
        cross_norm = self._normalize(cross_scores)
        bm25_norm = self._normalize(bm25_scores)
        
        # 5. 加权求和
        final_scores = []
        for i, doc in enumerate(candidates):
            score = (
                weights["vector"] * vector_norm[i] +
                weights["cross"] * cross_norm[i] +
                weights["bm25"] * bm25_norm[i]
            )
            final_scores.append((doc, score))
        
        # 6. 排序
        final_scores.sort(key=lambda x: x[1], reverse=True)
        
        return final_scores
    
    def _get_vector_scores(self, query, candidates):
        # 获取向量相似度分数
        return [0.5] * len(candidates)  # 占位
    
    def _get_cross_scores(self, query, candidates):
        pairs = [[query, doc.content] for doc in candidates]
        return self.cross_encoder.predict(pairs)
    
    def _get_bm25_scores(self, query, candidates):
        return [self.bm25_scorer.score(query, doc.content) for doc in candidates]
    
    def _normalize(self, scores):
        """Min-Max归一化"""
        min_s, max_s = min(scores), max(scores)
        if max_s == min_s:
            return [0.5] * len(scores)
        return [(s - min_s) / (max_s - min_s) for s in scores]

七、重排的注意事项

7.1 性能优化

问题:Cross-Encoder速度慢

解决方案

# 1. 用更小的模型
model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-2-v2')
 
# 2. 限制候选文档数量
recall_k = 50  # 不要召回太多
 
# 3. 批量处理
model.predict(pairs)  # 一次预测多个
 
# 4. 混合策略:先用Bi-Encoder粗排,再用Cross-Encoder精排

7.2 效果评估

def evaluate_reranker(reranker, test_data):
    """
    评估重排效果
    
    test_data格式:
    {
        "query": "...",
        "documents": ["doc1", "doc2", ...],
        "relevant": [0, 2, ...]  # 相关文档的索引
    }
    """
    
    from sklearn.metrics import ndcg_score
    
    ndcg_scores = []
    
    for item in test_data:
        query = item["query"]
        docs = item["documents"]
        relevant = item["relevant"]
        
        # 重排
        ranked = reranker.rerank(query, docs, top_k=len(docs))
        
        # 计算NDCG
        ideal_ranking = sorted(range(len(docs)), key=lambda i: i in relevant, reverse=True)
        # ... 简化计算
        
        ndcg_scores.append(ndcg)
    
    return np.mean(ndcg_scores)

7.3 常见问题

Q:重排后效果反而变差?

A:可能原因:

  • Cross-Encoder模型不适合你的场景
  • 候选文档数量不合适(太多或太少)
  • 权重设置不合理

Q:速度太慢怎么办?

A:优化方案:

  • 减少召回阶段的候选数量
  • 使用更快的轻量模型
  • 批量处理
  • 混合策略(粗排+精排)

Q:内存占用太高?

A:解决方案:

  • 使用量化模型
  • 减少batch size
  • 及时释放显存

八、总结

重排的核心价值

  • 把”可能相关”变成”最相关”
  • 提升最终结果质量
  • 用户体验大幅改善

技术选择

  • 初筛阶段用Bi-Encoder(快)
  • 精排阶段用Cross-Encoder(准)
  • 复杂场景用混合重排(稳)

实战建议

  • 两阶段流程:召回100个 → 重排返回10个
  • 根据场景选择合适的重排模型
  • 监控重排前后的效果变化

记住:重排不是万能的,它是检索的补充,不是替代。

相关主题


更新记录

  • 2026-04-24:改写完成,语言风格优化
  • 增加ColBERT和学习排序内容