向量数据库部署
摘要
本文档提供主流向量数据库的完整部署指南,涵盖Docker容器化部署、客户端SDK配置、索引类型选择与调优、以及生产环境性能优化策略。通过详细的配置文件示例和步骤说明,帮助快速搭建生产级别的向量检索基础设施。
关键词速查表
| 关键词 | 说明 |
|---|---|
| Docker Compose | 多容器编排工具 |
| Kubernetes | 容器编排平台 |
| Helm Chart | Kubernetes包管理工具 |
| 索引配置 | HNSW/IVF/PQ等参数设置 |
| 资源配额 | CPU/内存/GPU配置 |
| 连接池 | 客户端连接复用 |
| 批量操作 | 提升吞吐量的批量接口 |
| 监控告警 | Prometheus/Grafana集成 |
| 备份恢复 | 数据持久化策略 |
| 滚动升级 | 零停机更新方案 |
一、Milvus部署指南
1.1 单机快速部署
Milvus的单机部署适合开发和测试环境,使用Docker Compose可以快速启动。
# 创建工作目录
mkdir -p milvus/volumes && cd milvus
# 下载docker-compose配置
curl -sL https://github.com/milvus-io/milvus/releases/download/v2.3.3/milvus-standalone-docker-compose.yml \
-o docker-compose.yml
# 启动服务
docker-compose up -d
# 查看服务状态
docker-compose ps
# 查看日志
docker-compose logs -f milvus
# 停止服务
docker-compose down
# docker-compose.yml 配置文件详解
version: '3.8'
services:
etcd:
container_name: milvus-etcd
image: quay.io/coreos/etcd:v3.5.5
environment:
- ETCD_AUTO_COMPACTION_MODE=revision
- ETCD_AUTO_COMPACTION_RETENTION=1000
- ETCD_QUOTA_BACKEND_BYTES=4294967296
- ETCD_SNAPSHOT_COUNT=50000
volumes:
- ./volumes/etcd:/etcd
command: etcd -advertise-client-urls=http://127.0.0.1:2379
-listen-client-urls http://0.0.0.0:2379
--data-dir /etcd
networks:
- milvus
minio:
container_name: milvus-minio
image: minio/minio:RELEASE.2023-03-20T20-16-18Z
environment:
MINIO_ACCESS_KEY: minioadmin
MINIO_SECRET_KEY: minioadmin
ports:
- "9001:9001"
- "9000:9000"
volumes:
- ./volumes/minio:/minio_data
command: minio server /minio_data --console-address ":9001"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3
networks:
- milvus
milvus:
container_name: milvus-standalone
image: milvusdb/milvus:v2.3.3
command: ["milvus", "run", "standalone"]
environment:
ETCD_ENDPOINTS: etcd:2379
MINIO_ADDRESS: minio:9000
# 日志配置
LOG_LEVEL: info
LOG_FORMAT: text
# 存储配置
COMMON_STORAGETYPE: local
volumes:
- ./volumes/milvus:/var/lib/milvus
- ./volumes/files:/home/milvus/files
ports:
- "19530:19530" # GRPC端口
- "9091:9091" # Prometheus端口
depends_on:
- etcd
- minio
networks:
- milvus
networks:
milvus:
driver: bridge
1.2 集群模式部署
生产环境推荐使用Kubernetes部署Milvus集群,以获得高可用和横向扩展能力。
# milvus-cluster.yaml - Milvus集群Helm配置
# 添加Helm仓库
# helm repo add milvus https://milvus-io.github.io/milvus-helm/
# helm repo update
# values.yaml 详细配置
values:
cluster:
enabled: true
# 各组件副本数
milvus:
enabled: true
name: milvus
replicas: 3 # DataNode和QueryNode副本
# 索引节点(计算密集型)
indexNode:
enabled: true
replicas: 2
resources:
requests:
cpu: "2"
memory: 8Gi
limits:
cpu: "4"
memory: 16Gi
# nvidia.com/gpu: "1" # GPU加速索引构建
# 查询节点(内存密集型)
queryNode:
enabled: true
replicas: 3
resources:
requests:
cpu: "2"
memory: 32Gi
limits:
cpu: "8"
memory: 64Gi
# 数据节点
dataNode:
enabled: true
replicas: 2
resources:
requests:
cpu: "1"
memory: 8Gi
limits:
cpu: "4"
memory: 16Gi
# 代理节点(接入层)
proxy:
enabled: true
replicas: 2
service:
type: LoadBalancer
resources:
requests:
cpu: "1"
memory: 4Gi
limits:
cpu: "2"
memory: 8Gi
# 存储配置
etcd:
enabled: true
mode: cluster # 使用集群模式
replicaCount: 3
persistence:
enabled: true
storageClass: "ssd"
size: 20Gi
minio:
enabled: true
mode: distributed # 分布式模式
replicaCount: 4
persistence:
enabled: true
storageClass: "ssd"
size: 100Gi
# 配置
config:
log:
level: info
cache:
insertBufferSize: 16GB # 插入缓冲区大小
grpc:
port: 19530
serverMaxSendSize: 2147483647 # 2GB
serverMaxRecvSize: 2147483647
# 使用Helm部署
helm install milvus-cluster milvus/milvus \
-n milvus-system \
--create-namespace \
-f values.yaml
# 查看部署状态
kubectl get pods -n milvus-system
# 扩缩容
kubectl scale deployment milvus-datacoord -n milvus-system --replicas=5
# 更新配置
helm upgrade milvus-cluster milvus/milvus -n milvus-system -f values.yaml
# 卸载
helm uninstall milvus-cluster -n milvus-system
1.3 Milvus客户端配置
"""
Milvus Python客户端最佳配置
安装: pip install pymilvus[torch]
"""
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility
from pymilvus.client import types
import numpy as np
from typing import List
class MilvusClient:
    """Convenience wrapper around pymilvus: connection setup, collection
    creation with a fixed schema, batch ingestion and ANN search.
    """

    def __init__(self, host="localhost", port="19530", **kwargs):
        # Register one shared connection under the "default" alias; every
        # Collection object below resolves through it.
        # NOTE(review): pool_size/max_pool_size are forwarded to
        # connections.connect — confirm the installed pymilvus version
        # honours them.
        connections.connect(
            alias="default",
            host=host,
            port=port,
            pool_size=10,
            max_pool_size=100,
            **kwargs
        )
        self.alias = "default"

    def create_collection(self, name: str, dim: int,
                          metric_type: str = "COSINE",
                          index_type: str = "HNSW",
                          **kwargs):
        """Create a collection, build an ANN index on the embedding field
        and pre-create hot/cold partitions."""
        schema = CollectionSchema(
            fields=[
                FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
                FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=dim),
                FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
                FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=100),
                # Millisecond epoch timestamp, used for time-range filtering.
                FieldSchema(name="timestamp", dtype=DataType.INT64),
            ],
            description=f"Collection: {name}",
            enable_dynamic_field=False
        )
        coll = Collection(name=name, schema=schema, using=self.alias)
        coll.create_index(
            field_name="embedding",
            index_params=self._get_index_params(metric_type, index_type, **kwargs)
        )
        # Partitions for recency-based hot/cold data separation.
        coll.create_partition("hot_data", description="最近30天数据")
        coll.create_partition("cold_data", description="历史数据")
        return coll

    def _get_index_params(self, metric_type: str, index_type: str, **kwargs):
        """Build index parameters for the requested index type; any
        unrecognised type falls back to HNSW."""
        if index_type == "IVF_FLAT":
            params = {"nlist": kwargs.get("nlist", 1024)}  # cluster count
        elif index_type == "IVF_PQ":
            params = {
                "nlist": kwargs.get("nlist", 1024),
                "nbits": kwargs.get("nbits", 8),  # bits per sub-vector
            }
        else:
            index_type = "HNSW"
            params = {
                # More links per node: better recall, slower/larger build.
                "M": kwargs.get("M", 16),
                # Exploration factor during graph construction.
                "efConstruction": kwargs.get("efConstruction", 200),
            }
        return {
            "index_type": index_type,
            "metric_type": metric_type,
            "params": params,
        }

    def insert_batch(self, collection_name: str,
                     embeddings: List[List[float]],
                     texts: List[str],
                     categories: List[str],
                     partition_name: str = None):
        """Insert one batch of rows; all rows share a single insert-time
        timestamp in milliseconds."""
        import time
        coll = Collection(collection_name)
        now_ms = int(time.time() * 1000)
        columns = [
            embeddings,
            texts,
            categories,
            [now_ms] * len(embeddings),
        ]
        outcome = coll.insert(columns, partition_name=partition_name)
        # Flush so the freshly inserted rows become searchable.
        coll.flush()
        return outcome

    def search(self, collection_name: str,
               query_vector: List[float],
               top_k: int = 10,
               expr: str = None,
               partition_names: List[str] = None,
               **kwargs):
        """Run an ANN search, optionally filtered by a boolean expression
        and restricted to specific partitions."""
        coll = Collection(collection_name)
        search_params = {
            "metric_type": kwargs.get("metric_type", "COSINE"),
            # "ef" is the query-time exploration factor for HNSW.
            "params": kwargs.get("search_params", {"ef": 128}),
        }
        raw = coll.search(
            data=[query_vector],
            anns_field="embedding",
            param=search_params,
            limit=top_k,
            expr=expr,
            partition_names=partition_names,
            output_fields=["text", "category", "timestamp"]
        )
        return self._format_results(raw)

    def _format_results(self, results):
        """Flatten pymilvus hit batches into plain dicts."""
        return [
            {"id": hit.id, "distance": hit.distance, "entity": hit.entity}
            for hits in results
            for hit in hits
        ]

    def get_collection_stats(self, collection_name: str):
        """Return the row count of a collection."""
        return Collection(collection_name).num_entities

    def close(self):
        """Drop the registered connection alias."""
        connections.disconnect(alias=self.alias)
# Usage example
client = MilvusClient(host="milvus-cluster.milvus-system.svc.cluster.local")
# Create a 768-dim collection with cosine metric and an HNSW index
client.create_collection(
    name="knowledge_base",
    dim=768,
    metric_type="COSINE",
    index_type="HNSW",
    M=16,
    efConstruction=200
)
# Insert data
client.insert_batch(
    collection_name="knowledge_base",
    embeddings=[[0.1]*768 for _ in range(1000)],
    texts=[f"文档{i}内容" for i in range(1000)],
    categories=[f"category-{i%5}" for i in range(1000)]
)
# Search, filtered by a metadata expression
results = client.search(
    collection_name="knowledge_base",
    query_vector=[0.1]*768,
    top_k=10,
    expr='category == "category-0"',
    search_params={"ef": 128}
)
1.4 性能监控
# Prometheus配置 - Milvus监控
# prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
- job_name: 'milvus'
kubernetes_sd_configs:
- role: pod
relabel_configs:
- source_labels: [__meta_kubernetes_pod_label_app]
action: keep
regex: milvus
- source_labels: [__meta_kubernetes_pod_container_port_number]
action: keep
regex: "9091"
二、Qdrant部署指南
2.1 Docker快速部署
# 创建数据目录
mkdir -p qdrant/storage
# 启动Qdrant服务
docker run -d \
--name qdrant \
-p 6333:6333 \
-p 6334:6334 \
-v $(pwd)/qdrant/storage:/qdrant/storage \
qdrant/qdrant:v1.7.0
# 验证服务
curl http://localhost:6333/readyz
2.2 Docker Compose生产配置
# qdrant-docker-compose.yml
version: '3.8'
services:
qdrant:
image: qdrant/qdrant:v1.7.0
container_name: qdrant
restart: unless-stopped
ports:
- "6333:6333" # REST API
- "6334:6334" # gRPC API
volumes:
- ./storage:/qdrant/storage
- ./config:/qdrant/config
environment:
# 日志级别
QDRANT__LOG_LEVEL: INFO
# 性能配置
QDRANT__SERVICE__MAX_REQUEST_SIZE_MB: 32
QDRANT__SERVICE__MAX_WORKERS: 16
# 存储配置
QDRANT__STORAGE__STORAGE_PATH: /qdrant/storage
QDRANT__STORAGE__SNAPSHOT_TX_SIZE: 104857600 # 100MB
# 优化器配置
QDRANT__STORAGE__OPTIMIZERS__INDEXING_THRESHOLD: 20000
QDRANT__STORAGE__OPTIMIZERS__MEMMAP_THRESHOLD_KB: 1024000
# HNSW参数
QDRANT__STORAGE__HNSW_INDEX__M: 16
QDRANT__STORAGE__HNSW_INDEX__EF_CONSTRUCTION: 200
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:6333/health"]
interval: 10s
timeout: 5s
retries: 5
deploy:
resources:
limits:
cpus: '4'
memory: 16G
reservations:
cpus: '2'
memory: 8G
# 监控导出器(可选)
qdrant-metrics:
image: prom/client_python:latest
depends_on:
- qdrant
command: python -c "
import prometheus_client
import requests
import time
from prometheus_client import Counter, Histogram, Gauge
# 定义指标
requests_total = Counter('qdrant_requests_total', 'Total requests', ['method', 'endpoint'])
request_duration = Histogram('qdrant_request_duration_seconds', 'Request duration')
vector_count = Gauge('qdrant_vector_count', 'Number of vectors')
while True:
try:
# 获取Qdrant指标
resp = requests.get('http://qdrant:6333/metrics')
print(resp.text)
except:
pass
time.sleep(15)
"
restart: unless-stopped
2.3 Kubernetes部署
# qdrant-statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: qdrant
namespace: vector-system
spec:
serviceName: qdrant-headless
replicas: 3
selector:
matchLabels:
app: qdrant
template:
metadata:
labels:
app: qdrant
spec:
containers:
- name: qdrant
image: qdrant/qdrant:v1.7.0
ports:
- containerPort: 6333
name: http
- containerPort: 6334
name: grpc
resources:
requests:
cpu: "2"
memory: 8Gi
limits:
cpu: "4"
memory: 16Gi
env:
- name: QDRANT__SERVICE__TLS
value: "false"
volumeMounts:
- name: qdrant-storage
mountPath: /qdrant/storage
livenessProbe:
httpGet:
path: /health
port: 6333
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /readyz
port: 6333
initialDelaySeconds: 5
periodSeconds: 5
# 启动命令
command: ["qdrant"]
args:
- "--address"
- "0.0.0.0:6333"
- "--config-path"
- "/qdrant/config/config.yaml"
volumes:
- name: qdrant-storage
persistentVolumeClaim:
claimName: qdrant-pvc
---
# Service
apiVersion: v1
kind: Service
metadata:
name: qdrant-service
namespace: vector-system
spec:
type: LoadBalancer
selector:
app: qdrant
ports:
- port: 6333
targetPort: 6333
name: http
- port: 6334
targetPort: 6334
name: grpc
# qdrant-config.yaml
storage:
storage_path: /qdrant/storage
snapshots_path: /qdrant/snapshots
# 优化器配置
optimizers:
indexing_threshold: 20000
memmap_threshold_kb: 1024000
flush_interval_sec: 5
max_optimization_threads: 2
# HNSW索引配置
hnsw_index:
m: 16
ef_construct: 200
full_scan_threshold: 10000
on_disk: false
max_indexing_threads: 2
service:
host: 0.0.0.0
http_port: 6333
grpc_port: 6334
max_request_size_mb: 32
max_workers: 16
2.4 Qdrant Python客户端
"""
Qdrant Python客户端最佳实践
安装: pip install qdrant-client
"""
from qdrant_client import QdrantClient, models
from qdrant_client.models import Distance, VectorParams, PointStruct, Filter
from qdrant_client.qdrant_fastembed import FastEmbedEmbeddingFunction
import numpy as np
from typing import List, Optional
class QdrantVectorStore:
    """Wrapper around qdrant-client: collection management, batched
    upserts, dense / batch / hybrid search, scrolling and snapshots."""

    def __init__(self, host="localhost", port=6333,
                 api_key: Optional[str] = None):
        self.client = QdrantClient(
            host=host,
            port=port,
            api_key=api_key,
            timeout=30,
            prefer_grpc=True  # gRPC is noticeably faster for bulk traffic
        )
        # Text encoder used by hybrid_search() for the sparse side.
        # NOTE(review): confirm FastEmbedEmbeddingFunction is exposed by
        # the installed qdrant-client — the fastembed integration surface
        # has changed between releases.
        self._embedding_function = FastEmbedEmbeddingFunction(
            model_name="BAAI/bge-small-zh-v1.5",
            parallel=4  # parallel encoding workers
        )

    def create_collection(self, name: str, vector_size: int = 768,
                          distance: str = "Cosine"):
        """(Re)create a collection; an existing collection with the same
        name is dropped first."""
        if self.client.collection_exists(name):
            self.client.delete_collection(name)
        # Map friendly names to qdrant distance enums.
        distance_map = {
            "Cosine": Distance.COSINE,
            "Euclidean": Distance.EUCLID,
            "Dot": Distance.DOT
        }
        self.client.create_collection(
            collection_name=name,
            vectors_config=VectorParams(
                size=vector_size,
                distance=distance_map.get(distance, Distance.COSINE),
                on_disk=True  # keep raw vectors on disk to save RAM
            ),
            # Named sparse vectors enable hybrid (dense + keyword) search.
            sparse_vectors_config={
                "text": models.SparseVectorParams(
                    index=models.SparseIndexParams(
                        on_disk=False,
                        full_scan_threshold=10000
                    )
                )
            },
            # create_collection expects the *Diff models; the plain
            # OptimizersConfig/HnswConfig models carry extra required
            # fields and fail validation here.
            optimizers_config=models.OptimizersConfigDiff(
                indexing_threshold=20000,
                # OptimizersConfigDiff names this field memmap_threshold
                # (value in KB), not memmap_threshold_kb.
                memmap_threshold=1024000,
                max_optimization_threads=2
            ),
            hnsw_config=models.HnswConfigDiff(
                m=16,
                ef_construct=200,
                full_scan_threshold=10000,
                max_indexing_threads=2
            )
        )

    def upsert(self, collection_name: str,
               points: List[PointStruct]):
        """Upsert points in fixed-size batches to avoid oversized requests
        timing out."""
        batch_size = 1000
        for i in range(0, len(points), batch_size):
            batch = points[i:i + batch_size]
            self.client.upsert(
                collection_name=collection_name,
                points=batch,
                wait=True  # block until the batch is persisted/indexed
            )

    def search(self, collection_name: str,
               query_vector: List[float],
               top_k: int = 10,
               query_filter: Optional[Filter] = None,
               score_threshold: Optional[float] = None,
               with_payload: bool = True,
               with_vectors: bool = False):
        """Dense vector search returning a list of plain dicts."""
        search_params = models.SearchParams(
            hnsw_ef=128,  # query-time exploration factor: higher = more accurate
            exact=False   # True forces brute-force scan (slow, 100% recall)
        )
        results = self.client.search(
            collection_name=collection_name,
            query_vector=query_vector,
            query_filter=query_filter,
            search_params=search_params,
            limit=top_k,
            score_threshold=score_threshold,
            with_payload=with_payload,
            with_vectors=with_vectors
        )
        return [{
            "id": r.id,
            "score": r.score,
            "payload": r.payload
        } for r in results]

    def search_batch(self, collection_name: str,
                     query_vectors: List[List[float]],
                     top_k: int = 10,
                     query_filter: Optional[Filter] = None):
        """Run several searches in a single round trip."""
        results = self.client.search_batch(
            collection_name=collection_name,
            requests=[
                models.SearchRequest(
                    vector=qv,
                    filter=query_filter,
                    limit=top_k,
                    params=models.SearchParams(hnsw_ef=128)
                )
                for qv in query_vectors
            ]
        )
        return [[{
            "id": r.id,
            "score": r.score,
            "payload": r.payload
        } for r in batch_results] for batch_results in results]

    def hybrid_search(self, collection_name: str,
                      query: str,
                      query_vector: List[float],
                      top_k: int = 10):
        """Hybrid (dense vector + sparse keyword) search.

        NOTE(review): current qdrant-client `client.search` does not accept
        `sparse_vector`/`fusion` keyword arguments; hybrid retrieval is
        done through the Query API (`client.query_points` with prefetch +
        FusionQuery). Verify this method against the installed client
        version before use.
        """
        # Encode the query text into a sparse vector.
        sparse_vector = self._embedding_function.query_encode(query)
        results = self.client.search(
            collection_name=collection_name,
            query_vector=query_vector,
            query_filter=None,
            search_params=models.SearchParams(
                hnsw_ef=128,
                exact=False
            ),
            limit=top_k,
            with_payload=True,
            # Sparse-vector retrieval branch.
            sparse_vector=models.NamedSparseVector(
                name="text",
                vector=sparse_vector
            ),
            # Fusion strategy.
            fusion=models.FusionQuery(
                fusion=models.Fusion.RRF  # Reciprocal Rank Fusion
            )
        )
        return results

    def scroll(self, collection_name: str,
               limit: int = 100,
               offset: Optional[str] = None,
               filter_cond: Optional[Filter] = None):
        """Page through stored points.

        qdrant-client's keyword for the filter is `scroll_filter`;
        passing `filter=` raises TypeError.
        """
        return self.client.scroll(
            collection_name=collection_name,
            limit=limit,
            offset=offset,
            scroll_filter=filter_cond,
            with_payload=True
        )

    def get_collection_info(self, collection_name: str):
        """Return summary statistics for a collection."""
        info = self.client.get_collection(collection_name)
        return {
            # CollectionInfo carries no `name` attribute; echo the argument.
            "name": collection_name,
            "vectors_count": info.vectors_count,
            "points_count": info.points_count,
            "status": info.status,
            "indexed_vectors_count": info.indexed_vectors_count
        }

    def create_snapshot(self, collection_name: str):
        """Create a snapshot of the collection."""
        return self.client.create_snapshot(collection_name=collection_name)

    def restore_snapshot(self, collection_name: str, location: str):
        """Restore the collection from a snapshot location."""
        self.client.recover_from_snapshot(
            collection_name=collection_name,
            location=location
        )
# Usage example
store = QdrantVectorStore(
    host="qdrant-service.vector-system.svc.cluster.local",
    port=6333
)
# Create a 768-dim collection with cosine distance
store.create_collection(
    name="knowledge_base",
    vector_size=768,
    distance="Cosine"
)
# Batch insert
points = [
    PointStruct(
        # Qdrant point IDs must be unsigned integers or UUID strings;
        # arbitrary strings such as "doc-0" are rejected by the server.
        id=i,
        vector=np.random.rand(768).tolist(),
        payload={
            "text": f"文档{i}内容",
            "category": f"category-{i%5}",
            "created_at": "2024-01-01"
        }
    )
    for i in range(10000)
]
store.upsert("knowledge_base", points)
# Search with a payload filter and a minimum score cut-off
results = store.search(
    "knowledge_base",
    query_vector=np.random.rand(768).tolist(),
    top_k=10,
    query_filter=Filter(
        must=[
            models.FieldCondition(
                key="category",
                match=models.MatchValue(value="category-0")
            )
        ]
    ),
    score_threshold=0.7
)
# Hybrid (dense + sparse) search
results = store.hybrid_search(
    "knowledge_base",
    query="机器学习",
    query_vector=np.random.rand(768).tolist(),
    top_k=10
)
三、Weaviate部署指南
3.1 Docker Compose部署
# weaviate-docker-compose.yml
version: '3.8'
services:
weaviate:
image: semitechnologies/weaviate:1.22.4
restart: unless-stopped
ports:
- "8080:8080" # HTTP API
environment:
# 认证配置
AUTHENTICATION_ENABLED: "true"
AUTHENTICATION_APIKEY_ENABLED: "true"
AUTHENTICATION_APIKEY_ALLOWED_KEYS: "your-api-key-here"
AUTHENTICATION_APIKEY_ADMIN_KEY: "admin-api-key-here"
# 授权配置
AUTHORIZATION_ADMINLIST_ENABLED: "true"
AUTHORIZATION_ADMINLIST_USERS: "admin-user-1,admin-user-2"
# PERSISTENCE配置
PERSISTENCE_DATA_PATH: "/var/lib/weaviate"
PERSISTENCE_LSM_ACCESS_STRATEGY: "preferDisk"
# 启用异步索引
ASYNC_INDEXING: "true"
REFRESH_TIME: "1s"
# 索引和查询配置
INDEX_MISSING_TEXT_WEBHOOK: "" # 可配置文本注入钩子
# 向量化模块配置
ENABLE_MODULES: "text2vec-openai,text2vec-cohere,text2vec-transformers,ref2vec-centroid,qna-openai,generative-openai"
DEFAULT_VECTORIZER_MODULE: "text2vec-transformers"
# Transformer模型配置(本地)
TRANSFORMERS_INFERENCE_API: "http://t2v-transformers:8080"
ENCLS_ENABLE_CUDA: "0" # 使用CPU
# 日志级别
LOG_LEVEL: "info"
# 资源限制
PROMETHEUS_MONITORING_ENABLED: "true"
RANGE_KEY_ALGORITHM: "zCurve"
volumes:
- weaviate_data:/var/lib/weaviate
deploy:
resources:
limits:
cpus: '4'
memory: 8G
reservations:
cpus: '2'
memory: 4G
healthcheck:
test: ["CMD", "wget", "-q", "--spider", "http://localhost:8080/.well-known/health"]
interval: 30s
timeout: 10s
retries: 3
depends_on:
- t2v-transformers
# 本地向量化模型(可选)
t2v-transformers:
image: semitechnologies/transformers-inference:sentence-transformers-paraphrase-multilingual-MiniLM-L12-v2
environment:
ENABLE_CUDA: "0"
deploy:
resources:
limits:
cpus: '4'
memory: 4G
volumes:
weaviate_data:
3.2 Kubernetes部署
# weaviate-k8s.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: weaviate
namespace: vector-system
spec:
replicas: 3
selector:
matchLabels:
app: weaviate
template:
metadata:
labels:
app: weaviate
spec:
containers:
- name: weaviate
image: semitechnologies/weaviate:1.22.4
ports:
- containerPort: 8080
env:
- name: PERSISTENCE_DATA_PATH
value: /var/lib/weaviate
- name: ENABLE_MODULES
value: "text2vec-openai,qna-openai,generative-openai"
- name: DEFAULT_VECTORIZER_MODULE
value: "text2vec-openai"
- name: OPENAI_APIKEY
valueFrom:
secretKeyRef:
name: weaviate-secrets
key: openai-api-key
- name: ASYNC_INDEXING
value: "true"
- name: AUTHENTICATION_APIKEY_ENABLED
value: "true"
- name: AUTHENTICATION_APIKEY_ALLOWED_KEYS
value: "user-api-key-1,user-api-key-2"
- name: AUTHENTICATION_APIKEY_ADMIN_KEY
valueFrom:
secretKeyRef:
name: weaviate-secrets
key: admin-api-key
resources:
requests:
cpu: "1"
memory: 2Gi
limits:
cpu: "4"
memory: 8Gi
volumeMounts:
- name: weaviate-storage
mountPath: /var/lib/weaviate
volumes:
- name: weaviate-storage
persistentVolumeClaim:
claimName: weaviate-pvc
---
apiVersion: v1
kind: Service
metadata:
name: weaviate-service
namespace: vector-system
spec:
type: ClusterIP
selector:
app: weaviate
ports:
- port: 8080
targetPort: 8080
---
# 水平自动扩缩容
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: weaviate-hpa
namespace: vector-system
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: weaviate
minReplicas: 3
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
3.3 Weaviate Python客户端
"""
Weaviate Python客户端
安装: pip install weaviate-client
"""
import weaviate
from weaviate.classes.config import Configure, Property, DataType
from weaviate.classes.query import Filter, MetadataQuery
import weaviate.classes as wvc
import numpy as np
from typing import List, Optional
class WeaviateVectorStore:
    """Wrapper around the Weaviate v4 Python client: collection creation,
    single/batch inserts, hybrid search and basic CRUD."""

    def __init__(self, url: str, api_key: Optional[str] = None):
        from urllib.parse import urlparse
        # Honour the `url` argument (the previous version ignored it and
        # always connected to localhost:8080).
        parsed = urlparse(url)
        self.client = weaviate.connect_to_local(
            host=parsed.hostname or "localhost",
            port=parsed.port or 8080,
            # NOTE(review): recent weaviate-client versions expect
            # `auth_credentials=` rather than `api_key=` — confirm against
            # the installed client before deploying.
            api_key=api_key
        )

    def create_collection(self, name: str,
                          vectorizer: str = "text2vec-transformers"):
        """Create a collection (any existing one with the same name is
        dropped first).

        Raises:
            ValueError: if `vectorizer` is not a supported module name
                (previously an unknown value left `vectorizer_config`
                unbound and crashed with UnboundLocalError).
        """
        if self.client.collections.exists(name):
            self.client.collections.delete(name)
        # Pick the vectorizer configuration.
        if vectorizer == "text2vec-transformers":
            vectorizer_config = Configure.Vectorizer.text2vec_transformers(
                vectorize_collection_name=False,  # don't embed the collection name
                pooling_strategy="mean"
            )
        elif vectorizer == "text2vec-openai":
            vectorizer_config = Configure.Vectorizer.text2vec_openai(
                model="ada",
                vectorize_collection_name=False
            )
        else:
            raise ValueError(f"Unsupported vectorizer: {vectorizer}")
        # Create the collection with schema, HNSW index and tenancy config.
        articles = self.client.collections.create(
            name=name,
            properties=[
                Property(name="title", data_type=DataType.TEXT),
                Property(name="content", data_type=DataType.TEXT),
                Property(name="category", data_type=DataType.TEXT),
                Property(name="author", data_type=DataType.TEXT),
                Property(name="created_at", data_type=DataType.DATE),
            ],
            vectorizer_config=vectorizer_config,
            # HNSW index tuning.
            # NOTE(review): confirm `HnswDistanceMetric` is exposed under
            # Configure.VectorIndex in the installed client version.
            vector_index_config=Configure.VectorIndex.hnsw(
                distance_metric=Configure.VectorIndex.HnswDistanceMetric.COSINE,
                m=16,
                ef_construction=200,
                ef=128,
                max_connections=64
            ),
            # Multi-tenancy disabled for this single-tenant deployment.
            multi_tenancy_config=Configure.multi_tenancy(
                enabled=False
            )
        )
        return articles

    def insert(self, collection_name: str,
               properties: dict,
               vector: Optional[List[float]] = None):
        """Insert a single object; Weaviate vectorizes it automatically
        when no vector is supplied.

        Returns the new object's UUID. (`data.insert` returns the UUID
        directly; it is not subscriptable, so the previous `["id"]`
        access always raised TypeError.)
        """
        collection = self.client.collections.get(collection_name)
        if vector:
            return collection.data.insert(properties=properties, vector=vector)
        return collection.data.insert(properties=properties)

    def insert_batch(self, collection_name: str,
                     objects: List[dict],
                     vectors: Optional[List[List[float]]] = None):
        """Batch-insert objects; returns success/failure counts."""
        collection = self.client.collections.get(collection_name)
        data_objects = [
            wvc.data.DataObject(
                properties=obj["properties"],
                vector=vectors[i] if vectors else None
            )
            for i, obj in enumerate(objects)
        ]
        response = collection.data.insert_many(data_objects)
        return {
            "successful": len(response.successful),
            "failed": len(response.errors)
        }

    def search(self, collection_name: str,
               query: Optional[str] = None,
               vector: Optional[List[float]] = None,
               limit: int = 10,
               filters: Optional[Filter] = None,
               return_properties: List[str] = None):
        """Hybrid (BM25 + vector) search.

        `alpha=0.7` weights the fusion towards the vector score
        (0 = pure keyword, 1 = pure vector). The previous implementation
        also issued a standalone BM25 query whose result was immediately
        overwritten; that dead call has been removed.
        """
        collection = self.client.collections.get(collection_name)
        response = collection.query.hybrid(
            query=query or "",
            vector=vector,
            alpha=0.7,
            limit=limit,
            filters=filters,
            return_properties=return_properties,
            return_metadata=MetadataQuery(score=True, certainty=True)
        )
        return [{
            "id": obj.uuid,
            "properties": obj.properties,
            "metadata": obj.metadata
        } for obj in response.objects]

    def get_by_id(self, collection_name: str, uuid: str):
        """Fetch a single object by UUID."""
        collection = self.client.collections.get(collection_name)
        return collection.query.fetch_object_by_id(uuid)

    def delete(self, collection_name: str, uuid: str):
        """Delete a single object by UUID."""
        collection = self.client.collections.get(collection_name)
        collection.data.delete_by_id(uuid)
# Usage example
store = WeaviateVectorStore(
    url="http://weaviate-service:8080",
    api_key="your-api-key"
)
# Create the collection
store.create_collection("knowledge_base")
# Batch insert with pre-computed vectors
objects = [
    {"properties": {"title": f"文章{i}", "content": f"内容{i}", "category": f"cat-{i%3}"}}
    for i in range(100)
]
vectors = [np.random.rand(768).tolist() for _ in range(100)]
result = store.insert_batch("knowledge_base", objects, vectors)
# Hybrid search
results = store.search(
    "knowledge_base",
    query="机器学习",
    limit=10,
    return_properties=["title", "content", "category"]
)
四、运维监控与性能调优
4.1 监控指标体系
# Prometheus + Grafana监控栈
# docker-compose.monitoring.yml
version: '3.8'
services:
prometheus:
image: prom/prometheus:v2.47.0
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
- prometheus_data:/prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
- '--storage.tsdb.retention.time=30d'
grafana:
image: grafana/grafana:10.1.0
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
volumes:
- grafana_data:/var/lib/grafana
- ./grafana/provisioning:/etc/grafana/provisioning
# 向量数据库监控面板
alertmanager:
image: prom/alertmanager:v0.26.0
ports:
- "9093:9093"
volumes:
prometheus_data:
grafana_data:
# prometheus.yml - 监控配置
global:
scrape_interval: 15s
alerting:
alertmanagers:
- static_configs:
- targets: ['alertmanager:9093']
rule_files:
- "alerts/*.yml"
scrape_configs:
# Milvus监控
- job_name: 'milvus'
static_configs:
- targets: ['milvus-service:9091']
# Qdrant监控
- job_name: 'qdrant'
static_configs:
- targets: ['qdrant:6333']
metrics_path: /metrics
# Weaviate监控
- job_name: 'weaviate'
static_configs:
- targets: ['weaviate:8080']
metrics_path: /v1/metrics
alerting:
alert_rules:
# 向量数据库告警规则
- alert: VectorDBHighLatency
expr: vector_db_query_latency_seconds > 1
for: 5m
labels:
severity: warning
annotations:
summary: "向量数据库查询延迟过高"
description: "P99延迟超过1秒,当前值: {{ $value }}"
- alert: VectorDBHighMemory
expr: vector_db_memory_usage_ratio > 0.9
for: 10m
labels:
severity: critical
annotations:
summary: "向量数据库内存使用率过高"
description: "内存使用超过90%,请检查数据量和索引配置"
4.2 性能调优参数
"""
向量数据库性能调优指南
主要调优维度:
1. 索引参数优化
2. 内存配置
3. 批量操作
4. 连接管理
"""
class PerformanceTuning:
    """Performance-tuning reference values for vector databases."""

    # HNSW parameter presets.
    hnsw_tuning = {
        "M": {
            "low": 4,      # small memory footprint, lower recall
            "medium": 16,  # balanced default
            "high": 32     # higher accuracy, higher memory use
        },
        "efConstruction": {
            "fast": 100,    # fast index build
            "normal": 200,  # standard build
            "quality": 400  # high-quality build
        },
        "ef": {
            "fast": 64,     # fast queries
            "normal": 128,  # standard queries
            "quality": 256  # high-recall queries
        }
    }

    # Memory sizing reference: per-vector cost by element type.
    memory_tuning = {
        "per_vector_mb": {
            "768dim_float32": 768 * 4 / (1024 * 1024),  # ~3KB
            "768dim_float16": 768 * 2 / (1024 * 1024),  # ~1.5KB
            "768dim_int8": 768 * 1 / (1024 * 1024),  # ~0.75KB
        },
        "recommendation": """
内存配置建议:
- 数据量 < 1M向量: 8-16GB
- 数据量 1-10M向量: 32-64GB
- 数据量 > 10M向量: 128GB+
公式: 所需内存 ≈ 向量数 × 向量维度 × 4bytes × 1.2(开销系数)
"""
    }

    # Batch-operation defaults.
    batch_optimization = {
        "insert_batch_size": 1000,  # vectors per request
        "bulk_request_timeout": 300,  # timeout in seconds
        "concurrent_requests": 10,  # parallel requests
        "compression": True  # enable compression
}
4.3 备份恢复策略
"""
向量数据库备份恢复策略
"""
class BackupStrategy:
    """Backup / restore strategies for the supported vector databases.

    NOTE(review): the bodies below are illustrative stubs — the commented
    client calls must be validated against each client library before use.
    """

    @staticmethod
    def create_backup(db_type: str, collection_name: str, backup_path: str):
        """Create a backup of `collection_name` for the given database type."""
        if db_type == "milvus":
            # Milvus: snapshot-based backup.
            # NOTE(review): pymilvus `utility` exposes no `snapshot()`
            # function in current releases — confirm before relying on this.
            from pymilvus import utility
            snapshot = utility.snapshot(collection_name)
            # Persist the snapshot to backup_path (not implemented).
            pass
        elif db_type == "qdrant":
            # Qdrant: per-collection snapshot.
            # client.create_snapshot(collection_name)
            pass
        elif db_type == "weaviate":
            # Weaviate: export via the backup API.
            # client.backup.create()
            pass

    @staticmethod
    def restore_backup(db_type: str, collection_name: str, backup_path: str):
        """Restore `collection_name` from a backup for the given database type."""
        if db_type == "milvus":
            # Restore from a snapshot.
            # utility.load_snapshot(collection_name, snapshot_path)
            pass
        elif db_type == "qdrant":
            # Restore a Qdrant collection from a snapshot.
            # client.recover_from_snapshot(collection_name, location)
            pass

    @staticmethod
    def schedule_backups():
        """Return the scheduled-backup configuration (cron + retention)."""
        backup_config = """
# cron表达式: 分 时 日 月 周
# 每天凌晨2点备份
0 2 * * * /scripts/backup_vector_db.sh
# 备份保留策略
- 每日备份: 保留7天
- 每周备份: 保留4周
- 每月备份: 保留12个月
"""
        return backup_config
五、相关主题链接
- 向量数据库对比 - 各数据库特性对比
- Embedding模型选择 - 向量生成模型
- 重排技术深度指南 - 检索后优化
- 混合检索技术 - 多路召回策略
- GraphRAG深度指南 - GraphRAG架构
- 知识库评估体系 - 系统评估方法
更新日志
- 2026-04-18: 初始版本完成
- 涵盖Milvus、Qdrant、Weaviate的完整部署指南
- 提供Kubernetes配置和性能调优建议