重排技术深度指南:让搜索结果更靠谱
这篇文章解决什么问题
检索阶段找到了一堆相关文档,但排序乱七八糟,好的排在后面,差的反而排前面怎么办?重排技术就是来解决这个”排序混乱”问题的。
前言:检索和排序是两回事
先说个我踩过的坑。
做知识库问答的时候,用向量检索找到了20个相关文档。理论上应该把最相关的排在前面,但实际上:
排名第1的:一篇文章的摘要,和问题有那么点关系 排名第5的:直接回答了用户问题的好内容 排名第15的:相关性很差,但包含了一些关键词
你说用户看到排名第1的结果不是自己想要的,是不是直接骂街?
后来学会了重排技术,加了一个排序步骤,效果立竿见影。
一、为什么需要重排
1.1 检索和排序的区别
打个比方:
检索就像海选:找到所有可能相关的候选人(速度要快) 排序就像终选:从候选人里选出最合适的(质量要高)
向量检索的目的是”快”,快速从海量文档里找到可能相关的top-k。
但”可能相关”不等于”最相关”,需要更精细的排序来筛选。
1.2 向量检索的局限性
向量检索用的余弦相似度有很多问题:
| 问题 | 说明 | 例子 |
|---|---|---|
| 语义漂移 | 表面相似但实际不相关 | ”如何煮咖啡”匹配到”咖啡店经营” |
| 关键词权重 | 关键词命中不等于真正需要 | ”Python教程”匹配到讲Python人生的文章 |
| 长文档劣势 | 长文档向量容易被稀释 | 一篇万字文章包含了一句话的答案反而排不上 |
1.3 重排的价值
重排的核心价值:
- 精细化评估:考虑更多维度的相关性
- 上下文理解:理解查询和文档的真正关系
- 质量提升:把最好的结果排在最前面
二、重排的核心技术
2.1 Bi-Encoder:双塔模型
原理:查询和文档分别编码,在向量空间里比较相似度
查询 → [Encoder] → 向量Q
文档 → [Encoder] → 向量D
相似度 = cosine(向量Q, 向量D)
特点:
- 速度快:可以预先计算文档向量
- 精度一般:无法捕获查询和文档的细粒度交互
- 适合初筛
class BiEncoderReranker:
"""
Bi-Encoder重排
使用预计算的文档向量
"""
def __init__(self, encoder, device="cpu"):
self.encoder = encoder
self.device = device
self.doc_embeddings = {} # doc_id -> embedding
def index(self, documents):
"""预先计算文档向量"""
for doc in documents:
embedding = self.encoder.encode(doc.content)
self.doc_embeddings[doc.id] = embedding
def rerank(self, query, candidates, top_k=10):
"""重排候选文档"""
# 编码查询
query_vector = self.encoder.encode(query)
# 计算每个候选文档的相似度
scores = []
for doc in candidates:
doc_vector = self.doc_embeddings.get(doc.id)
if doc_vector is None:
# 动态编码
doc_vector = self.encoder.encode(doc.content)
score = cosine_similarity(query_vector, doc_vector)
scores.append((doc, score))
# 按分数排序
scores.sort(key=lambda x: x[1], reverse=True)
return scores[:top_k]2.2 Cross-Encoder:交叉编码
原理:查询和文档一起输入,直接输出相关性分数
[Query; Document] → [Cross-Encoder] → 相似度分数
特点:
- 精度高:捕获查询和文档的细粒度交互
- 速度慢:每个(query, doc)对都需要单独计算
- 适合精排
class CrossEncoderReranker:
"""
Cross-Encoder重排
精度高但速度慢
"""
def __init__(self, model_name="cross-encoder/ms-marco-MiniLM-L-6-v2"):
from sentence_transformers import CrossEncoder
self.model = CrossEncoder(model_name)
def rerank(self, query, candidates, top_k=10):
"""
对候选文档进行重排
参数:
- query: 查询字符串
- candidates: 候选文档列表
- top_k: 返回前k个结果
"""
# 准备输入对
pairs = [(query, doc.content) for doc in candidates]
# 批量计算分数
scores = self.model.predict(pairs)
# 按分数排序
scored_docs = list(zip(candidates, scores))
scored_docs.sort(key=lambda x: x[1], reverse=True)
# 返回top_k
return scored_docs[:top_k]2.3 Bi-Encoder vs Cross-Encoder
| 维度 | Bi-Encoder | Cross-Encoder |
|---|---|---|
| 速度 | 快(O(1)查表) | 慢(O(n)计算) |
| 精度 | 一般 | 高 |
| 文档更新 | 只需重算该文档 | 所有相关分数都要重算 |
| 适合场景 | 初筛 | 精排 |
| 显存占用 | 低(向量小) | 高(需要模型推理) |
2.4 ColBERT:中间相遇
原理:Query和Document的每个token分别编码,用延迟交互计算相似度
class ColBERTReranker:
"""
ColBERT重排
在速度和精度之间取得平衡
"""
def __init__(self, model_name="colbert-ir/colbertv2.0"):
from transformers import AutoModel, AutoTokenizer
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModel.from_pretrained(model_name)
def _encode(self, text, mask=False):
"""编码文本,返回每个token的向量"""
inputs = self.tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
with torch.no_grad():
outputs = self.model(**inputs)
# 使用最后一层hidden state
embeddings = outputs.last_hidden_state[0] # [seq_len, hidden_dim]
if mask:
# 只保留token embedding(去掉[CLS]和[SEP])
mask = inputs["attention_mask"][0]
embeddings = embeddings[mask.bool()]
return embeddings
def score(self, query, document):
"""
计算Query和Document的ColBERT分数
"""
# 编码query和document
Q = self._encode(query, mask=True) # [query_len, dim]
D = self._encode(document, mask=True) # [doc_len, dim]
# 计算相似度矩阵
# Q: [query_len, dim], D: [doc_len, dim]
# sim_matrix: [query_len, doc_len]
sim_matrix = torch.matmul(Q, D.T)
# MaxSim:每个query token找最相似的doc token
max_sim = sim_matrix.max(dim=1)[0] # [query_len]
# 求和作为最终分数
score = max_sim.sum().item()
return score
def rerank(self, query, candidates, top_k=10):
"""重排"""
scores = []
for doc in candidates:
score = self.score(query, doc.content)
scores.append((doc, score))
scores.sort(key=lambda x: x[1], reverse=True)
return scores[:top_k]三、重排的工作流程
3.1 两阶段流程
阶段1:向量检索(召回)
↓
找到top-100候选文档
↓
阶段2:重排(排序)
↓
返回top-10最终结果
3.2 代码实现
class TwoStageRetriever:
"""两阶段检索+重排"""
def __init__(self, vector_db, reranker):
self.vector_db = vector_db
self.reranker = reranker
def retrieve(self, query, recall_k=100, final_k=10):
"""
两阶段检索
参数:
- query: 查询
- recall_k: 召回阶段返回的数量
- final_k: 最终返回的数量
"""
# 阶段1:向量检索召回
recall_results = self.vector_db.search(query, top_k=recall_k)
# 阶段2:重排精排
final_results = self.reranker.rerank(query, recall_results, top_k=final_k)
return final_results3.3 实际使用示例
from sentence_transformers import CrossEncoder
# 初始化
cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
reranker = CrossEncoderReranker(cross_encoder)
# 两阶段检索
retriever = TwoStageRetriever(
vector_db=your_vector_db,
reranker=reranker
)
# 检索
results = retriever.retrieve(
"如何用Python处理JSON数据",
recall_k=50,
final_k=10
)
for doc, score in results:
print(f"[{score:.4f}] {doc.content[:100]}...")四、重排模型的选择
4.1 开源模型推荐
| 模型 | 类型 | 速度 | 精度 | 适用场景 |
|---|---|---|---|---|
| BGE-Reranker | Cross | 中 | 高 | 通用场景 |
| Cohere Rerank | Cross | 快 | 高 | 英文为主 |
| Jina Reranker | Cross | 快 | 高 | 多语言 |
| ms-marco-MiniLM | Cross | 很快 | 中 | 快速部署 |
| FlagEmbedding | Cross | 中 | 高 | 中文优化 |
4.2 使用BGE-Reranker
from flag_model import FlagReranker
# 初始化
reranker = FlagReranker('BAAI/bge-reranker-v2-m3', use_fp16=True)
# 重排
query = "什么是机器学习"
documents = [
"机器学习是人工智能的一个分支...",
"深度学习是机器学习的子领域...",
"今天天气不错..."
]
# 输入格式:query, document pairs
pairs = [[query, doc] for doc in documents]
scores = reranker.compute_score(pairs)
# 按分数排序
scored_docs = list(zip(documents, scores))
scored_docs.sort(key=lambda x: x[1], reverse=True)
for doc, score in scored_docs:
print(f"[{score:.4f}] {doc[:50]}...")4.3 使用Cohere Rerank
import cohere
# 初始化
co = cohere.Client("your-api-key")
def cohere_rerank(query, documents, top_n=10):
"""使用Cohere Rerank"""
response = co.rerank(
query=query,
documents=documents,
model="rerank-multilingual-v2.0",
top_n=top_n
)
results = []
for result in response.results:
results.append({
"index": result.index,
"document": documents[result.index],
"score": result.relevance_score
})
return results
# 使用
documents = ["文档1内容", "文档2内容", "文档3内容"]
results = cohere_rerank("Python教程", documents, top_n=2)五、学习排序(Learning to Rank)
5.1 什么是Learning to Rank
Learning to Rank(LTR)是用机器学习来学习排序。
核心思想:
- 收集标注数据:(查询, 文档, 相关性标签)
- 训练排序模型
- 用模型预测新查询的文档排序
5.2 Pointwise方法
原理:每个文档单独打分
from sklearn.gradient_boosting import GradientBoostingRegressor
class PointwiseRanker:
"""
Pointwise排序
把排序问题变成回归问题
"""
def __init__(self):
self.model = GradientBoostingRegressor(n_estimators=100)
self.feature_extractor = RankingFeatureExtractor()
def train(self, train_data):
"""
训练数据格式:
[
{"query": "机器学习", "doc": "ML是AI的分支", "label": 3},
{"query": "机器学习", "doc": "今天天气好", "label": 0},
...
]
"""
X = []
y = []
for item in train_data:
features = self.feature_extractor.extract(item["query"], item["doc"])
X.append(features)
y.append(item["label"])
self.model.fit(X, y)
def predict(self, query, documents):
"""预测排序"""
X = [self.feature_extractor.extract(query, doc) for doc in documents]
scores = self.model.predict(X)
scored_docs = list(zip(documents, scores))
scored_docs.sort(key=lambda x: x[1], reverse=True)
return scored_docs5.3 Pairwise方法
原理:比较文档对的相对顺序
class PairwiseRanker:
"""
Pairwise排序
把排序问题变成分类问题:哪个文档更好
"""
def __init__(self):
from xgboost import XGBClassifier
self.model = XGBClassifier()
self.feature_extractor = RankingFeatureExtractor()
def train(self, train_data):
"""
训练数据格式:
[
{"query": "机器学习", "doc1": "ML是AI的分支", "doc2": "今天天气好", "label": 1},
# label=1表示doc1比doc2好
...
]
"""
X = []
y = []
for item in train_data:
features = self.feature_extractor.extract_pair(
item["query"],
item["doc1"],
item["doc2"]
)
X.append(features)
y.append(item["label"])
self.model.fit(X, y)5.4 Listwise方法
原理:直接优化整个列表的排序
class ListwiseRanker:
"""
Listwise排序
直接优化NDCG等排序指标
"""
def __init__(self, model_type="lambdarank"):
# 实际使用LightGBM的lambdarank
from lightgbm import LGBMRanker
self.model = LGBMRanker(
objective="lambdarank",
metric="ndcg",
n_estimators=100
)
self.feature_extractor = RankingFeatureExtractor()
def train(self, train_data):
"""
训练数据格式:
{
"query": "机器学习",
"docs": ["文档1", "文档2", ...],
"labels": [3, 0, 2, ...] # 相关性标签
}
"""
X = []
y = []
groups = []
for item in train_data:
query = item["query"]
docs = item["docs"]
labels = item["labels"]
for doc in docs:
features = self.feature_extractor.extract(query, doc)
X.append(features)
y.append(labels[docs.index(doc)])
groups.append(len(docs))
self.model.fit(X, y, group=groups)六、实战代码
6.1 完整重排管道
from dataclasses import dataclass
from typing import List
import numpy as np
@dataclass
class RerankingConfig:
recall_k: int = 100 # 召回阶段返回数量
rerank_k: int = 20 # 重排阶段返回数量
final_k: int = 10 # 最终返回数量
use_cross_encoder: bool = True
class RerankingPipeline:
"""
完整重排管道
召回 → 重排 → 返回结果
"""
def __init__(self, vector_db, reranker, config: RerankingConfig = None):
self.vector_db = vector_db
self.reranker = reranker
self.config = config or RerankingConfig()
def search(self, query: str) -> List[dict]:
"""
执行检索+重排
"""
# 阶段1:召回
recall_results = self.vector_db.search(query, top_k=self.config.recall_k)
# 阶段2:重排
reranked = self.reranker.rerank(
query,
recall_results,
top_k=self.config.rerank_k
)
# 阶段3:格式化返回
final_results = []
for doc, score in reranked[:self.config.final_k]:
final_results.append({
"content": doc.content,
"score": float(score),
"metadata": doc.metadata
})
return final_results6.2 批量重排
class BatchReranker:
"""批量重排优化"""
def __init__(self, reranker, batch_size=32):
self.reranker = reranker
self.batch_size = batch_size
def rerank_batch(self, queries: List[str], documents: List[List[str]]):
"""
批量重排多个查询
参数:
- queries: 查询列表
- documents: 每个查询对应的候选文档列表
"""
results = []
# 分批处理
for i in range(0, len(queries), self.batch_size):
batch_queries = queries[i:i+self.batch_size]
batch_docs = documents[i:i+self.batch_size]
# 批量重排
batch_results = self._process_batch(batch_queries, batch_docs)
results.extend(batch_results)
return results
def _process_batch(self, queries, documents):
"""处理一批查询"""
all_pairs = []
# 准备所有(query, doc)对
for query, docs in zip(queries, documents):
for doc in docs:
all_pairs.append([query, doc])
# 批量计算分数
if hasattr(self.reranker, 'model'):
# Cross-Encoder批量预测
scores = self.reranker.model.predict(all_pairs)
else:
scores = [self.reranker.score(p[0], p[1]) for p in all_pairs]
# 解析结果
results = []
idx = 0
for docs in documents:
query_results = []
for doc in docs:
query_results.append((doc, scores[idx]))
idx += 1
# 排序
query_results.sort(key=lambda x: x[1], reverse=True)
results.append(query_results)
return results6.3 混合重排
class HybridReranker:
"""
混合重排
结合多种信号进行排序
"""
def __init__(self, vector_db, cross_encoder, bm25_scorer):
self.vector_db = vector_db
self.cross_encoder = cross_encoder
self.bm25_scorer = bm25_scorer
def rerank(self, query, candidates, weights=None):
"""
混合重排
参数:
- weights: 各信号的权重,默认为 {"vector": 0.3, "cross": 0.5, "bm25": 0.2}
"""
weights = weights or {"vector": 0.3, "cross": 0.5, "bm25": 0.2}
# 1. 向量相似度
vector_scores = self._get_vector_scores(query, candidates)
# 2. Cross-Encoder分数
cross_scores = self._get_cross_scores(query, candidates)
# 3. BM25分数
bm25_scores = self._get_bm25_scores(query, candidates)
# 4. 归一化分数
vector_norm = self._normalize(vector_scores)
cross_norm = self._normalize(cross_scores)
bm25_norm = self._normalize(bm25_scores)
# 5. 加权求和
final_scores = []
for i, doc in enumerate(candidates):
score = (
weights["vector"] * vector_norm[i] +
weights["cross"] * cross_norm[i] +
weights["bm25"] * bm25_norm[i]
)
final_scores.append((doc, score))
# 6. 排序
final_scores.sort(key=lambda x: x[1], reverse=True)
return final_scores
def _get_vector_scores(self, query, candidates):
# 获取向量相似度分数
return [0.5] * len(candidates) # 占位
def _get_cross_scores(self, query, candidates):
pairs = [[query, doc.content] for doc in candidates]
return self.cross_encoder.predict(pairs)
def _get_bm25_scores(self, query, candidates):
return [self.bm25_scorer.score(query, doc.content) for doc in candidates]
def _normalize(self, scores):
"""Min-Max归一化"""
min_s, max_s = min(scores), max(scores)
if max_s == min_s:
return [0.5] * len(scores)
return [(s - min_s) / (max_s - min_s) for s in scores]七、重排的注意事项
7.1 性能优化
问题:Cross-Encoder速度慢
解决方案:
# 1. 用更小的模型
model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-2-v2')
# 2. 限制候选文档数量
recall_k = 50 # 不要召回太多
# 3. 批量处理
model.predict(pairs) # 一次预测多个
# 4. 混合策略:先用Bi-Encoder粗排,再用Cross-Encoder精排7.2 效果评估
def evaluate_reranker(reranker, test_data):
"""
评估重排效果
test_data格式:
{
"query": "...",
"documents": ["doc1", "doc2", ...],
"relevant": [0, 2, ...] # 相关文档的索引
}
"""
from sklearn.metrics import ndcg_score
ndcg_scores = []
for item in test_data:
query = item["query"]
docs = item["documents"]
relevant = item["relevant"]
# 重排
ranked = reranker.rerank(query, docs, top_k=len(docs))
# 计算NDCG
ideal_ranking = sorted(range(len(docs)), key=lambda i: i in relevant, reverse=True)
# ... 简化计算
ndcg_scores.append(ndcg)
return np.mean(ndcg_scores)7.3 常见问题
Q:重排后效果反而变差?
A:可能原因:
- Cross-Encoder模型不适合你的场景
- 候选文档数量不合适(太多或太少)
- 权重设置不合理
Q:速度太慢怎么办?
A:优化方案:
- 减少召回阶段的候选数量
- 使用更快的轻量模型
- 批量处理
- 混合策略(粗排+精排)
Q:内存占用太高?
A:解决方案:
- 使用量化模型
- 减少batch size
- 及时释放显存
八、总结
重排的核心价值:
- 把”可能相关”变成”最相关”
- 提升最终结果质量
- 用户体验大幅改善
技术选择:
- 初筛阶段用Bi-Encoder(快)
- 精排阶段用Cross-Encoder(准)
- 复杂场景用混合重排(稳)
实战建议:
- 两阶段流程:召回100个 → 重排返回10个
- 根据场景选择合适的重排模型
- 监控重排前后的效果变化
记住:重排不是万能的,它是检索的补充,不是替代。
相关主题
更新记录
- 2026-04-24:改写完成,语言风格优化
- 增加ColBERT和学习排序内容