Vector Database Deployment

Abstract

This document provides complete deployment guides for the leading vector databases, covering Docker-based containerized deployment, client SDK configuration, index type selection and tuning, and performance optimization strategies for production environments. Detailed configuration examples and step-by-step instructions help you stand up production-grade vector retrieval infrastructure quickly.

Keyword Quick Reference

Keyword — Description
Docker Compose — multi-container orchestration tool
Kubernetes — container orchestration platform
Helm Chart — package manager for Kubernetes
Index configuration — HNSW/IVF/PQ parameter settings
Resource quotas — CPU/memory/GPU allocation
Connection pooling — client connection reuse
Batch operations — bulk APIs that raise throughput
Monitoring & alerting — Prometheus/Grafana integration
Backup & restore — data persistence strategy
Rolling upgrades — zero-downtime update scheme

1. Milvus Deployment Guide

1.1 Standalone Quick Start

Milvus standalone deployment suits development and test environments; Docker Compose brings it up in minutes.

# Create a working directory
mkdir -p milvus/volumes && cd milvus
 
# Download the docker-compose file
curl -sL https://github.com/milvus-io/milvus/releases/download/v2.3.3/milvus-standalone-docker-compose.yml \
  -o docker-compose.yml
 
# Start the services
docker-compose up -d
 
# Check service status
docker-compose ps
 
# Tail the logs
docker-compose logs -f milvus
 
# Stop the services
docker-compose down
# docker-compose.yml explained
version: '3.8'
 
services:
  etcd:
    container_name: milvus-etcd
    image: quay.io/coreos/etcd:v3.5.5
    environment:
      - ETCD_AUTO_COMPACTION_MODE=revision
      - ETCD_AUTO_COMPACTION_RETENTION=1000
      - ETCD_QUOTA_BACKEND_BYTES=4294967296
      - ETCD_SNAPSHOT_COUNT=50000
    volumes:
      - ./volumes/etcd:/etcd
    command: etcd -advertise-client-urls=http://127.0.0.1:2379 
             -listen-client-urls http://0.0.0.0:2379 
             --data-dir /etcd
    networks:
      - milvus
 
  minio:
    container_name: milvus-minio
    image: minio/minio:RELEASE.2023-03-20T20-16-18Z
    environment:
      MINIO_ACCESS_KEY: minioadmin
      MINIO_SECRET_KEY: minioadmin
    ports:
      - "9001:9001"
      - "9000:9000"
    volumes:
      - ./volumes/minio:/minio_data
    command: minio server /minio_data --console-address ":9001"
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
      interval: 30s
      timeout: 20s
      retries: 3
    networks:
      - milvus
 
  milvus:
    container_name: milvus-standalone
    image: milvusdb/milvus:v2.3.3
    command: ["milvus", "run", "standalone"]
    environment:
      ETCD_ENDPOINTS: etcd:2379
      MINIO_ADDRESS: minio:9000
      # Logging
      LOG_LEVEL: info
      LOG_FORMAT: text
      # Storage
      COMMON_STORAGETYPE: local
    volumes:
      - ./volumes/milvus:/var/lib/milvus
    ports:
      - "19530:19530"   # GRPC端口
      - "9091:9091"     # Prometheus端口
    depends_on:
      - etcd
      - minio
    networks:
      - milvus
 
networks:
  milvus:
    driver: bridge
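Milvus takes a little while after `docker-compose up -d` before it accepts connections, so scripts that create collections immediately tend to race the startup. A small generic retry helper covers this; the function name and defaults below are my own, not part of any SDK:

```python
import time

def wait_for_ready(check, timeout_s=120.0, interval_s=2.0):
    """Poll `check` (a zero-arg callable) until it returns True.

    Exceptions raised by `check` are treated as "not ready yet".
    Returns True once the check passes, False on timeout.
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            if check():
                return True
        except Exception:
            pass  # service still warming up; keep polling
        time.sleep(interval_s)
    return False
```

With pymilvus you might pass something like `lambda: connections.connect(host="localhost", port="19530") or True` as the check; any callable that raises or returns False while the service is warming up works.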

1.2 Cluster Deployment

For production, deploy a Milvus cluster on Kubernetes to get high availability and horizontal scaling.

# milvus-cluster.yaml — Helm configuration for a Milvus cluster
 
# Add the Helm repository
# helm repo add milvus https://milvus-io.github.io/milvus-helm/
# helm repo update
 
# values.yaml in detail (these keys belong at the top level — do not wrap
# them in a `values:` block, which Helm would treat as an unknown key)
  cluster:
    enabled: true
    
  # Replica counts per component
  milvus:
    enabled: true
    name: milvus
    replicas: 3  # data node and query node replicas
    
  # Index nodes (compute-intensive)
  indexNode:
    enabled: true
    replicas: 2
    resources:
      requests:
        cpu: "2"
        memory: 8Gi
      limits:
        cpu: "4"
        memory: 16Gi
        # nvidia.com/gpu: "1"  # GPU-accelerated index builds
        
  # Query nodes (memory-intensive)
  queryNode:
    enabled: true
    replicas: 3
    resources:
      requests:
        cpu: "2"
        memory: 32Gi
      limits:
        cpu: "8"
        memory: 64Gi
        
  # Data nodes
  dataNode:
    enabled: true
    replicas: 2
    resources:
      requests:
        cpu: "1"
        memory: 8Gi
      limits:
        cpu: "4"
        memory: 16Gi
        
  # Proxy nodes (the access layer)
  proxy:
    enabled: true
    replicas: 2
    service:
      type: LoadBalancer
    resources:
      requests:
        cpu: "1"
        memory: 4Gi
      limits:
        cpu: "2"
        memory: 8Gi
 
  # Storage configuration
  etcd:
    enabled: true
    mode: cluster  # run etcd as a cluster
    replicaCount: 3
    persistence:
      enabled: true
      storageClass: "ssd"
      size: 20Gi
      
  minio:
    enabled: true
    mode: distributed  # distributed MinIO
    replicaCount: 4
    persistence:
      enabled: true
      storageClass: "ssd"
      size: 100Gi
 
  # Milvus configuration
  config:
    log:
      level: info
    cache:
      insertBufferSize: 16GB  # insert buffer size
    grpc:
      port: 19530
      serverMaxSendSize: 2147483647  # 2GB
      serverMaxRecvSize: 2147483647
# Deploy with Helm
helm install milvus-cluster milvus/milvus \
  -n milvus-system \
  --create-namespace \
  -f values.yaml
 
# Check the rollout
kubectl get pods -n milvus-system
 
# Scale out the query nodes (the Helm release name prefixes deployment names;
# coordinators such as datacoord should not be scaled this way)
kubectl scale deployment milvus-cluster-milvus-querynode -n milvus-system --replicas=5
 
# Apply configuration changes
helm upgrade milvus-cluster milvus/milvus -n milvus-system -f values.yaml
 
# Uninstall
helm uninstall milvus-cluster -n milvus-system

1.3 Milvus Client Configuration

"""
Milvus Python客户端最佳配置
 
安装: pip install pymilvus[torch]
"""
 
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility
from pymilvus.client import types
import numpy as np
from typing import List
 
class MilvusClient:
    """A thin wrapper around the Milvus connection and collection APIs"""
    
    def __init__(self, host="localhost", port="19530", **kwargs):
        # Establish the connection; pymilvus manages the underlying gRPC channel
        connections.connect(
            alias="default",
            host=host,
            port=port,
            **kwargs
        )
        self.alias = "default"
    
    def create_collection(self, name: str, dim: int, 
                        metric_type: str = "COSINE",
                        index_type: str = "HNSW",
                        **kwargs):
        """Create a collection"""
        
        # Field definitions
        fields = [
            FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
            FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=dim),
            FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
            FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=100),
            FieldSchema(name="timestamp", dtype=DataType.INT64),  # for time-range filtering
        ]
        
        schema = CollectionSchema(
            fields=fields,
            description=f"Collection: {name}",
            enable_dynamic_field=False
        )
        
        # Create the collection
        collection = Collection(name=name, schema=schema, using=self.alias)
        
        # Build the vector index
        index_params = self._get_index_params(metric_type, index_type, **kwargs)
        collection.create_index(
            field_name="embedding",
            index_params=index_params
        )
        
        # Create partitions (for hot/cold data separation)
        collection.create_partition("hot_data", description="data from the last 30 days")
        collection.create_partition("cold_data", description="historical data")
        
        return collection
    
    def _get_index_params(self, metric_type: str, index_type: str, **kwargs):
        """Build index parameters for the chosen index type"""
        
        params_map = {
            "HNSW": {
                "index_type": "HNSW",
                "metric_type": metric_type,
                "params": {
                    "M": kwargs.get("M", 16),  # links per node; higher = more accurate, slower builds
                    "efConstruction": kwargs.get("efConstruction", 200)  # build-time search breadth
                }
            },
            "IVF_FLAT": {
                "index_type": "IVF_FLAT",
                "metric_type": metric_type,
                "params": {
                    "nlist": kwargs.get("nlist", 1024)  # number of cluster centroids
                }
            },
            "IVF_PQ": {
                "index_type": "IVF_PQ",
                "metric_type": metric_type,
                "params": {
                    "nlist": kwargs.get("nlist", 1024),
                    "nbits": kwargs.get("nbits", 8)  # bits per PQ sub-vector
                }
            }
        }
        
        return params_map.get(index_type, params_map["HNSW"])
    
    def insert_batch(self, collection_name: str, 
                    embeddings: List[List[float]],
                    texts: List[str],
                    categories: List[str],
                    partition_name: str = None):
        """Batch insert"""
        collection = Collection(collection_name)
        
        # Generate timestamps
        import time
        timestamps = [int(time.time() * 1000)] * len(embeddings)
        
        data = [
            embeddings,
            texts,
            categories,
            timestamps
        ]
        
        result = collection.insert(data, partition_name=partition_name)
        
        # Flush so the data becomes searchable; flushing is expensive, so in
        # high-throughput pipelines call it periodically rather than per batch
        collection.flush()
        
        return result
    
    def search(self, collection_name: str, 
              query_vector: List[float],
              top_k: int = 10,
              expr: str = None,
              partition_names: List[str] = None,
              **kwargs):
        """Vector search"""
        collection = Collection(collection_name)
        
        # Search parameters
        search_params = {
            "metric_type": kwargs.get("metric_type", "COSINE"),
            "params": kwargs.get("search_params", {"ef": 128})  # query-time search breadth
        }
        
        results = collection.search(
            data=[query_vector],
            anns_field="embedding",
            param=search_params,
            limit=top_k,
            expr=expr,
            partition_names=partition_names,
            output_fields=["text", "category", "timestamp"]
        )
        
        return self._format_results(results)
    
    def _format_results(self, results):
        """Format search results"""
        formatted = []
        for hits in results:
            for hit in hits:
                formatted.append({
                    "id": hit.id,
                    "distance": hit.distance,
                    "entity": hit.entity
                })
        return formatted
    
    def get_collection_stats(self, collection_name: str):
        """Return the number of entities in the collection"""
        collection = Collection(collection_name)
        return collection.num_entities
    
    def close(self):
        """Close the connection"""
        connections.disconnect(alias=self.alias)
 
 
# Usage example
client = MilvusClient(host="milvus-cluster.milvus-system.svc.cluster.local")
 
# Create a collection
client.create_collection(
    name="knowledge_base",
    dim=768,
    metric_type="COSINE",
    index_type="HNSW",
    M=16,
    efConstruction=200
)
 
# Insert data
client.insert_batch(
    collection_name="knowledge_base",
    embeddings=[[0.1]*768 for _ in range(1000)],
    texts=[f"document {i} content" for i in range(1000)],
    categories=[f"category-{i%5}" for i in range(1000)]
)
 
# Search
results = client.search(
    collection_name="knowledge_base",
    query_vector=[0.1]*768,
    top_k=10,
    expr='category == "category-0"',
    search_params={"ef": 128}
)
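The index parameters above trade recall against memory and latency. As a rough sizing sketch (a back-of-envelope of my own, not an official Milvus formula): float32 vectors cost n·d·4 bytes, HNSW adds roughly 2·M neighbor links per vector, and a common IVF heuristic picks nlist ≈ 4·√n:

```python
import math

def estimate_hnsw_memory_mb(n_vectors: int, dim: int, M: int = 16) -> float:
    """Approximate HNSW index size: raw float32 vectors plus ~2*M 4-byte
    neighbor links per vector (the upper layers add only a few percent more)."""
    vector_bytes = n_vectors * dim * 4
    link_bytes = n_vectors * 2 * M * 4
    return (vector_bytes + link_bytes) / (1024 ** 2)

def suggest_nlist(n_vectors: int) -> int:
    """Common IVF rule of thumb: nlist ~= 4 * sqrt(n)."""
    return max(1, int(4 * math.sqrt(n_vectors)))

# 1M 768-dim vectors with M=16: roughly 3 GB of vectors plus ~120 MB of links
print(estimate_hnsw_memory_mb(1_000_000, 768, M=16))
print(suggest_nlist(1_000_000))  # -> 4000
```

Numbers like these are only a lower bound on the query node's memory request; segment metadata, deleted-entry tombstones, and query buffers come on top.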

1.4 Performance Monitoring

# Prometheus configuration for Milvus monitoring
# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s
 
scrape_configs:
  - job_name: 'milvus'
    kubernetes_sd_configs:
      - role: pod
    relabel_configs:
      - source_labels: [__meta_kubernetes_pod_label_app]
        action: keep
        regex: milvus
      - source_labels: [__meta_kubernetes_pod_container_port_number]
        action: keep
        regex: "9091"

2. Qdrant Deployment Guide

2.1 Docker Quick Start

# Create the data directory
mkdir -p qdrant/storage
 
# Start the Qdrant service
docker run -d \
  --name qdrant \
  -p 6333:6333 \
  -p 6334:6334 \
  -v $(pwd)/qdrant/storage:/qdrant/storage \
  qdrant/qdrant:v1.7.0
 
# Verify readiness
curl http://localhost:6333/readyz

2.2 Production Docker Compose Configuration

# qdrant-docker-compose.yml
version: '3.8'
 
services:
  qdrant:
    image: qdrant/qdrant:v1.7.0
    container_name: qdrant
    restart: unless-stopped
    ports:
      - "6333:6333"  # REST API
      - "6334:6334"  # gRPC API
    volumes:
      - ./storage:/qdrant/storage
      - ./config:/qdrant/config
    environment:
      # Log level
      QDRANT__LOG_LEVEL: INFO
      # Service limits
      QDRANT__SERVICE__MAX_REQUEST_SIZE_MB: 32
      QDRANT__SERVICE__MAX_WORKERS: 16
      # Storage
      QDRANT__STORAGE__STORAGE_PATH: /qdrant/storage
      QDRANT__STORAGE__SNAPSHOT_TX_SIZE: 104857600  # 100MB
      # Optimizers
      QDRANT__STORAGE__OPTIMIZERS__INDEXING_THRESHOLD: 20000
      QDRANT__STORAGE__OPTIMIZERS__MEMMAP_THRESHOLD_KB: 1024000
      # HNSW parameters (the config key is ef_construct)
      QDRANT__STORAGE__HNSW_INDEX__M: 16
      QDRANT__STORAGE__HNSW_INDEX__EF_CONSTRUCT: 200
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:6333/healthz"]
      interval: 10s
      timeout: 5s
      retries: 5
    deploy:
      resources:
        limits:
          cpus: '4'
          memory: 16G
        reservations:
          cpus: '2'
          memory: 8G
 
  # Monitoring note: no sidecar exporter is needed — Qdrant exposes
  # Prometheus metrics natively at http://<host>:6333/metrics, so a
  # Prometheus scrape job can point at the service directly.

2.3 Kubernetes Deployment

# qdrant-statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: qdrant
  namespace: vector-system
spec:
  serviceName: qdrant-headless
  replicas: 3
  selector:
    matchLabels:
      app: qdrant
  template:
    metadata:
      labels:
        app: qdrant
    spec:
      containers:
        - name: qdrant
          image: qdrant/qdrant:v1.7.0
          ports:
            - containerPort: 6333
              name: http
            - containerPort: 6334
              name: grpc
          resources:
            requests:
              cpu: "2"
              memory: 8Gi
            limits:
              cpu: "4"
              memory: 16Gi
          env:
            - name: QDRANT__SERVICE__TLS
              value: "false"
          volumeMounts:
            - name: qdrant-storage
              mountPath: /qdrant/storage
          livenessProbe:
            httpGet:
              path: /healthz
              port: 6333
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /readyz
              port: 6333
            initialDelaySeconds: 5
            periodSeconds: 5
          # Startup command (host and ports come from the config file;
          # the qdrant binary has no --address flag)
          command: ["./qdrant"]
          args:
            - "--config-path"
            - "/qdrant/config/config.yaml"
      volumes:
        - name: qdrant-storage
          persistentVolumeClaim:
            claimName: qdrant-pvc
 
---
# Service
apiVersion: v1
kind: Service
metadata:
  name: qdrant-service
  namespace: vector-system
spec:
  type: LoadBalancer
  selector:
    app: qdrant
  ports:
    - port: 6333
      targetPort: 6333
      name: http
    - port: 6334
      targetPort: 6334
      name: grpc
# qdrant-config.yaml
storage:
  storage_path: /qdrant/storage
  snapshots_path: /qdrant/snapshots
  
  # Optimizer settings
  optimizers:
    indexing_threshold: 20000
    memmap_threshold_kb: 1024000
    flush_interval_sec: 5
    max_optimization_threads: 2
  
  # HNSW index settings
  hnsw_index:
    m: 16
    ef_construct: 200
    full_scan_threshold: 10000
    on_disk: false
    max_indexing_threads: 2
 
service:
  host: 0.0.0.0
  http_port: 6333
  grpc_port: 6334
  max_request_size_mb: 32
  max_workers: 16

2.4 Qdrant Python Client

"""
Qdrant Python client best practices.
 
Install: pip install qdrant-client fastembed
"""
 
from qdrant_client import QdrantClient, models
from qdrant_client.models import Distance, VectorParams, PointStruct, Filter
# Sparse text embeddings come from the separate fastembed package;
# SparseTextEmbedding produces the sparse vectors used for hybrid search below
from fastembed import SparseTextEmbedding
import numpy as np
from typing import List, Optional
 
class QdrantVectorStore:
    """A wrapper around the Qdrant client"""
    
    def __init__(self, host="localhost", port=6333, 
                 api_key: Optional[str] = None):
        self.client = QdrantClient(
            host=host,
            port=port,
            api_key=api_key,
            # Connection settings
            timeout=30,
            prefer_grpc=True  # gRPC outperforms REST for bulk traffic
        )
        # BM25-style sparse encoder for the sparse "text" vector
        self._embedding_function = SparseTextEmbedding(model_name="Qdrant/bm25")
    
    def create_collection(self, name: str, vector_size: int = 768,
                        distance: str = "Cosine"):
        """Create a collection"""
        
        # Drop the collection if it already exists
        if self.client.collection_exists(name):
            self.client.delete_collection(name)
        
        # Distance metric mapping
        distance_map = {
            "Cosine": Distance.COSINE,
            "Euclidean": Distance.EUCLID,
            "Dot": Distance.DOT
        }
        
        self.client.create_collection(
            collection_name=name,
            vectors_config=VectorParams(
                size=vector_size,
                distance=distance_map.get(distance, Distance.COSINE),
                on_disk=True  # keep raw vectors on disk to save memory
            ),
            # Sparse vector configuration (enables hybrid search)
            sparse_vectors_config={
                "text": models.SparseVectorParams(
                    index=models.SparseIndexParams(
                        on_disk=False,
                        full_scan_threshold=10000
                    )
                )
            },
            # Optimizer settings (use the *Diff models, which allow partial overrides)
            optimizers_config=models.OptimizersConfigDiff(
                indexing_threshold=20000,
                memmap_threshold=1024000,
                max_optimization_threads=2
            ),
            # HNSW parameters
            hnsw_config=models.HnswConfigDiff(
                m=16,
                ef_construct=200,
                full_scan_threshold=10000,
                max_indexing_threads=2
            )
        )
    
    def upsert(self, collection_name: str,
              points: List[PointStruct]):
        """Batch-upsert vectors"""
        # Chunk the points so large requests do not time out
        batch_size = 1000
        for i in range(0, len(points), batch_size):
            batch = points[i:i+batch_size]
            self.client.upsert(
                collection_name=collection_name,
                points=batch,
                wait=True  # block until the update is applied
            )
    
    def search(self, collection_name: str,
              query_vector: List[float],
              top_k: int = 10,
              query_filter: Optional[Filter] = None,
              score_threshold: Optional[float] = None,
              with_payload: bool = True,
              with_vectors: bool = False):
        """Vector search"""
        
        search_params = models.SearchParams(
            hnsw_ef=128,  # query-time search breadth; higher = more accurate
            exact=False  # exact search is slow but gives 100% recall
        )
        
        results = self.client.search(
            collection_name=collection_name,
            query_vector=query_vector,
            query_filter=query_filter,
            search_params=search_params,
            limit=top_k,
            score_threshold=score_threshold,
            with_payload=with_payload,
            with_vectors=with_vectors
        )
        
        return [{
            "id": r.id,
            "score": r.score,
            "payload": r.payload
        } for r in results]
    
    def search_batch(self, collection_name: str,
                   query_vectors: List[List[float]],
                   top_k: int = 10,
                   query_filter: Optional[Filter] = None):
        """Batch search"""
        
        results = self.client.search_batch(
            collection_name=collection_name,
            requests=[
                models.SearchRequest(
                    vector=qv,
                    filter=query_filter,
                    limit=top_k,
                    params=models.SearchParams(hnsw_ef=128)
                )
                for qv in query_vectors
            ]
        )
        
        return [[{
            "id": r.id,
            "score": r.score,
            "payload": r.payload
        } for r in batch_results] for batch_results in results]
    
    def hybrid_search(self, collection_name: str,
                     query: str,
                     query_vector: List[float],
                     top_k: int = 10):
        """Hybrid search (dense vector + sparse keywords), qdrant-client >= 1.10"""
        
        # Encode the query into a sparse vector
        sparse = next(self._embedding_function.query_embed(query))
        
        # Run dense and sparse searches as prefetches, then fuse the lists
        results = self.client.query_points(
            collection_name=collection_name,
            prefetch=[
                models.Prefetch(query=query_vector, limit=top_k * 2),
                models.Prefetch(
                    query=models.SparseVector(
                        indices=sparse.indices.tolist(),
                        values=sparse.values.tolist()
                    ),
                    using="text",
                    limit=top_k * 2
                ),
            ],
            # Fusion strategy: Reciprocal Rank Fusion
            query=models.FusionQuery(fusion=models.Fusion.RRF),
            limit=top_k,
            with_payload=True
        )
        
        return results.points
    
    def scroll(self, collection_name: str,
              limit: int = 100,
              offset: Optional[str] = None,
              filter_cond: Optional[Filter] = None):
        """Page through the collection"""
        return self.client.scroll(
            collection_name=collection_name,
            limit=limit,
            offset=offset,
            scroll_filter=filter_cond,  # note: the parameter is scroll_filter, not filter
            with_payload=True
        )
    
    def get_collection_info(self, collection_name: str):
        """Return collection statistics"""
        # get_collection returns a CollectionInfo, which has no name attribute
        info = self.client.get_collection(collection_name)
        return {
            "name": collection_name,
            "vectors_count": info.vectors_count,
            "points_count": info.points_count,
            "status": info.status,
            "indexed_vectors_count": info.indexed_vectors_count
        }
    
    def create_snapshot(self, collection_name: str):
        """Create a snapshot"""
        return self.client.create_snapshot(collection_name=collection_name)
    
    def restore_snapshot(self, collection_name: str, location: str):
        """Restore from a snapshot"""
        self.client.recover_from_snapshot(
            collection_name=collection_name,
            location=location
        )
 
 
# Usage example
store = QdrantVectorStore(
    host="qdrant-service.vector-system.svc.cluster.local",
    port=6333
)
 
# Create a collection
store.create_collection(
    name="knowledge_base",
    vector_size=768,
    distance="Cosine"
)
 
# Batch insert (point IDs must be unsigned integers or UUIDs, not arbitrary strings)
points = [
    PointStruct(
        id=i,
        vector=np.random.rand(768).tolist(),
        payload={
            "text": f"document {i} content",
            "category": f"category-{i%5}",
            "created_at": "2024-01-01"
        }
    )
    for i in range(10000)
]
store.upsert("knowledge_base", points)
 
# Search
results = store.search(
    "knowledge_base",
    query_vector=np.random.rand(768).tolist(),
    top_k=10,
    query_filter=Filter(
        must=[
            models.FieldCondition(
                key="category",
                match=models.MatchValue(value="category-0")
            )
        ]
    ),
    score_threshold=0.7
)
 
# Hybrid search
results = store.hybrid_search(
    "knowledge_base",
    query="machine learning",
    query_vector=np.random.rand(768).tolist(),
    top_k=10
)
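The hybrid search above fuses the dense and sparse result lists with Reciprocal Rank Fusion. RRF itself is simple enough to sketch in a few lines — each list contributes 1/(k + rank) to a document's score, with k conventionally 60 — which makes it clear why it needs no score normalization across the two retrievers:

```python
from collections import defaultdict
from typing import Dict, List

def rrf_fuse(rankings: List[List[str]], k: int = 60) -> List[str]:
    """Fuse several ranked ID lists with Reciprocal Rank Fusion.

    Each list contributes 1/(k + rank) to an ID's score (rank starts at 1);
    IDs are returned sorted by total score, highest first.
    """
    scores: Dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)

dense = ["a", "b", "c"]   # ranked by vector similarity
sparse = ["c", "a", "d"]  # ranked by keyword score
print(rrf_fuse([dense, sparse]))  # "a" and "c" rank highest: both lists vote for them
```

Because only ranks matter, RRF is robust when the dense and sparse scores live on incompatible scales, which is exactly the hybrid-search situation.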

3. Weaviate Deployment Guide

3.1 Docker Compose Deployment

# weaviate-docker-compose.yml
version: '3.8'
 
services:
  weaviate:
    image: semitechnologies/weaviate:1.22.4
    restart: unless-stopped
    ports:
      - "8080:8080"  # HTTP API
    environment:
      # Authentication (disable anonymous access, enable API keys)
      AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: "false"
      AUTHENTICATION_APIKEY_ENABLED: "true"
      AUTHENTICATION_APIKEY_ALLOWED_KEYS: "your-api-key-here,admin-api-key-here"
      AUTHENTICATION_APIKEY_USERS: "regular-user,admin-user-1"
      
      # Authorization
      AUTHORIZATION_ADMINLIST_ENABLED: "true"
      AUTHORIZATION_ADMINLIST_USERS: "admin-user-1,admin-user-2"
      
      # Persistence
      PERSISTENCE_DATA_PATH: "/var/lib/weaviate"
      
      # Asynchronous indexing
      ASYNC_INDEXING: "true"
      
      # Vectorizer modules
      ENABLE_MODULES: "text2vec-openai,text2vec-cohere,text2vec-transformers,ref2vec-centroid,qna-openai,generative-openai"
      DEFAULT_VECTORIZER_MODULE: "text2vec-transformers"
      
      # Local transformer inference container
      TRANSFORMERS_INFERENCE_API: "http://t2v-transformers:8080"
      
      # Log level
      LOG_LEVEL: "info"
      
      # Expose Prometheus metrics
      PROMETHEUS_MONITORING_ENABLED: "true"
    
    volumes:
      - weaviate_data:/var/lib/weaviate
    
    deploy:
      resources:
        limits:
          cpus: '4'
          memory: 8G
        reservations:
          cpus: '2'
          memory: 4G
    
    healthcheck:
      test: ["CMD", "wget", "-q", "--spider", "http://localhost:8080/v1/.well-known/ready"]
      interval: 30s
      timeout: 10s
      retries: 3
    
    depends_on:
      - t2v-transformers
 
  # Local vectorization model (optional)
  t2v-transformers:
    image: semitechnologies/transformers-inference:sentence-transformers-paraphrase-multilingual-MiniLM-L12-v2
    environment:
      ENABLE_CUDA: "0"
    deploy:
      resources:
        limits:
          cpus: '4'
          memory: 4G
 
volumes:
  weaviate_data:

3.2 Kubernetes Deployment

# weaviate-k8s.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: weaviate
  namespace: vector-system
spec:
  replicas: 3
  selector:
    matchLabels:
      app: weaviate
  template:
    metadata:
      labels:
        app: weaviate
    spec:
      containers:
        - name: weaviate
          image: semitechnologies/weaviate:1.22.4
          ports:
            - containerPort: 8080
          env:
            - name: PERSISTENCE_DATA_PATH
              value: /var/lib/weaviate
            - name: ENABLE_MODULES
              value: "text2vec-openai,qna-openai,generative-openai"
            - name: DEFAULT_VECTORIZER_MODULE
              value: "text2vec-openai"
            - name: OPENAI_APIKEY
              valueFrom:
                secretKeyRef:
                  name: weaviate-secrets
                  key: openai-api-key
            - name: ASYNC_INDEXING
              value: "true"
            - name: AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED
              value: "false"
            - name: AUTHENTICATION_APIKEY_ENABLED
              value: "true"
            - name: AUTHENTICATION_APIKEY_ALLOWED_KEYS
              valueFrom:
                secretKeyRef:
                  name: weaviate-secrets
                  key: api-keys
            - name: AUTHENTICATION_APIKEY_USERS
              value: "admin-user"
          resources:
            requests:
              cpu: "1"
              memory: 2Gi
            limits:
              cpu: "4"
              memory: 8Gi
          volumeMounts:
            - name: weaviate-storage
              mountPath: /var/lib/weaviate
      volumes:
        - name: weaviate-storage
          persistentVolumeClaim:
            claimName: weaviate-pvc
 
---
apiVersion: v1
kind: Service
metadata:
  name: weaviate-service
  namespace: vector-system
spec:
  type: ClusterIP
  selector:
    app: weaviate
  ports:
    - port: 8080
      targetPort: 8080
 
---
# Horizontal Pod Autoscaler
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: weaviate-hpa
  namespace: vector-system
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: weaviate
  minReplicas: 3
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70

3.3 Weaviate Python Client

"""
Weaviate Python client (v4 API).
 
Install: pip install weaviate-client
"""
 
import weaviate
from weaviate.classes.config import Configure, Property, DataType
from weaviate.classes.init import Auth
from weaviate.classes.query import Filter, MetadataQuery
import weaviate.classes as wvc
from urllib.parse import urlparse
import numpy as np
from typing import List, Optional
 
class WeaviateVectorStore:
    """A wrapper around the Weaviate v4 client"""
    
    def __init__(self, url: str = "http://localhost:8080",
                 api_key: Optional[str] = None):
        # connect_to_local takes host/port (not a URL) and has no api_key
        # parameter; credentials go through auth_credentials
        parsed = urlparse(url)
        self.client = weaviate.connect_to_local(
            host=parsed.hostname or "localhost",
            port=parsed.port or 8080,
            auth_credentials=Auth.api_key(api_key) if api_key else None
        )
    
    def create_collection(self, name: str, 
                         vectorizer: str = "text2vec-transformers"):
        """Create a collection (roughly analogous to a table)"""
        
        # Drop the collection if it already exists
        if self.client.collections.exists(name):
            self.client.collections.delete(name)
        
        # Choose a vectorizer configuration
        if vectorizer == "text2vec-transformers":
            vectorizer_config = Configure.Vectorizer.text2vec_transformers(
                vectorize_collection_name=False,  # do not vectorize the collection name
                pooling_strategy="mean"
            )
        elif vectorizer == "text2vec-openai":
            vectorizer_config = Configure.Vectorizer.text2vec_openai(
                vectorize_collection_name=False
            )
        else:
            raise ValueError(f"Unsupported vectorizer: {vectorizer}")
        # Create the collection
        articles = self.client.collections.create(
            name=name,
            properties=[
                Property(name="title", data_type=DataType.TEXT),
                Property(name="content", data_type=DataType.TEXT),
                Property(name="category", data_type=DataType.TEXT),
                Property(name="author", data_type=DataType.TEXT),
                Property(name="created_at", data_type=DataType.DATE),
            ],
            vectorizer_config=vectorizer_config,
            # Index configuration (Weaviate exposes HNSW's M as max_connections)
            vector_index_config=Configure.VectorIndex.hnsw(
                distance_metric=wvc.config.VectorDistances.COSINE,
                ef_construction=200,
                ef=128,
                max_connections=64
            ),
            # Multi-tenancy (enable for tenant-isolated workloads)
            multi_tenancy_config=Configure.multi_tenancy(
                enabled=False
            )
        )
        
        return articles
    
    def insert(self, collection_name: str,
              properties: dict,
              vector: Optional[List[float]] = None):
        """Insert one object (auto-vectorized unless a vector is supplied)"""
        collection = self.client.collections.get(collection_name)
        
        # insert() returns the new object's UUID directly (not a dict)
        if vector:
            new_uuid = collection.data.insert(
                properties=properties,
                vector=vector
            )
        else:
            new_uuid = collection.data.insert(
                properties=properties
            )
        
        return new_uuid
    
    def insert_batch(self, collection_name: str,
                    objects: List[dict],
                    vectors: Optional[List[List[float]]] = None):
        """Batch insert"""
        collection = self.client.collections.get(collection_name)
        
        # Build the data objects
        data_objects = [
            wvc.data.DataObject(
                properties=obj["properties"],
                vector=vectors[i] if vectors else None
            )
            for i, obj in enumerate(objects)
        ]
        
        # insert_many returns a BatchObjectReturn: uuids maps index -> UUID,
        # errors maps index -> error message for failed objects
        response = collection.data.insert_many(data_objects)
        
        return {
            "successful": len(response.uuids),
            "failed": len(response.errors)
        }
    
    def search(self, collection_name: str,
              query: Optional[str] = None,
              vector: Optional[List[float]] = None,
              limit: int = 10,
              filters: Optional[Filter] = None,
              return_properties: List[str] = None):
        """Search: hybrid by default; pure-BM25 and pure-vector shown commented out"""
        collection = self.client.collections.get(collection_name)
        
        # Keyword-only search:
        # response = collection.query.bm25(query=query, limit=limit)
        # Vector-only search:
        # response = collection.query.near_vector(
        #     near_vector=vector,
        #     limit=limit,
        #     filters=filters
        # )
        
        # Hybrid search (recommended)
        response = collection.query.hybrid(
            query=query or "",
            vector=vector,
            alpha=0.7,  # 0 = pure keyword, 1 = pure vector
            limit=limit,
            filters=filters,
            return_properties=return_properties,
            return_metadata=MetadataQuery(score=True, certainty=True)
        )
        
        return [{
            "id": obj.uuid,
            "properties": obj.properties,
            "metadata": obj.metadata
        } for obj in response.objects]
    
    def get_by_id(self, collection_name: str, uuid: str):
        """Fetch an object by ID"""
        collection = self.client.collections.get(collection_name)
        return collection.query.fetch_object_by_id(uuid)

    def delete(self, collection_name: str, uuid: str):
        """Delete an object"""
        collection = self.client.collections.get(collection_name)
        collection.data.delete_by_id(uuid)
 
# Usage example
import numpy as np

store = WeaviateVectorStore(
    url="http://weaviate-service:8080",
    api_key="your-api-key"
)

# Create a collection
store.create_collection("knowledge_base")

# Batch insert
objects = [
    {"properties": {"title": f"Article {i}", "content": f"Content {i}", "category": f"cat-{i%3}"}}
    for i in range(100)
]
vectors = [np.random.rand(768).tolist() for _ in range(100)]
result = store.insert_batch("knowledge_base", objects, vectors)

# Search
results = store.search(
    "knowledge_base",
    query="machine learning",
    limit=10,
    return_properties=["title", "content", "category"]
)
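For datasets far larger than the example above, sending one oversized `insert_many` request risks timeouts; splitting the data into fixed-size chunks keeps each request bounded. A minimal sketch of such a `chunked` helper (a generic utility, not part of the Weaviate client; the commented usage assumes the `store` object above):

```python
from typing import Iterable, List, TypeVar

T = TypeVar("T")

def chunked(items: List[T], size: int) -> Iterable[List[T]]:
    """Yield successive fixed-size chunks from a list."""
    for start in range(0, len(items), size):
        yield items[start:start + size]

# Hypothetical usage with the store above:
# for batch, vecs in zip(chunked(objects, 1000), chunked(vectors, 1000)):
#     store.insert_batch("knowledge_base", batch, vecs)
```

Chunk size 1000 matches the `insert_batch_size` recommendation in the tuning section below; smaller chunks trade throughput for lower per-request memory.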

4. Operations Monitoring and Performance Tuning

4.1 Monitoring Metrics

# Prometheus + Grafana monitoring stack
# docker-compose.monitoring.yml
version: '3.8'
 
services:
  prometheus:
    image: prom/prometheus:v2.47.0
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--storage.tsdb.retention.time=30d'
 
  grafana:
    image: grafana/grafana:10.1.0
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana_data:/var/lib/grafana
      - ./grafana/provisioning:/etc/grafana/provisioning
 
  # Alertmanager for routing vector database alerts
  alertmanager:
    image: prom/alertmanager:v0.26.0
    ports:
      - "9093:9093"
 
volumes:
  prometheus_data:
  grafana_data:
# prometheus.yml - scrape and alerting configuration
global:
  scrape_interval: 15s
 
alerting:
  alertmanagers:
    - static_configs:
        - targets: ['alertmanager:9093']
 
rule_files:
  - "alerts/*.yml"
 
scrape_configs:
  # Milvus metrics
  - job_name: 'milvus'
    static_configs:
      - targets: ['milvus-service:9091']

  # Qdrant metrics
  - job_name: 'qdrant'
    static_configs:
      - targets: ['qdrant:6333']
    metrics_path: /metrics

  # Weaviate metrics
  - job_name: 'weaviate'
    static_configs:
      - targets: ['weaviate:8080']
    metrics_path: /v1/metrics
 
# alerts/vector_db.yml - alert rules, loaded via the rule_files glob
# (Prometheus does not accept a second top-level `alerting:` key; rules
# must live in separate rule files)
groups:
  - name: vector_db_alerts
    rules:
      - alert: VectorDBHighLatency
        expr: vector_db_query_latency_seconds > 1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Vector database query latency too high"
          description: "P99 latency above 1s, current value: {{ $value }}"

      - alert: VectorDBHighMemory
        expr: vector_db_memory_usage_ratio > 0.9
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "Vector database memory usage too high"
          description: "Memory usage above 90%; check data volume and index configuration"
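The latency alert above is phrased in terms of P99 latency. As a sanity check on thresholds before wiring up Prometheus, the percentile can be computed directly from raw latency samples; a minimal nearest-rank sketch (the sample values are illustrative):

```python
import math

def percentile(samples, p):
    """Nearest-rank percentile (p in [0, 100]) of a non-empty sample list."""
    ordered = sorted(samples)
    rank = max(1, math.ceil(p / 100 * len(ordered)))
    return ordered[rank - 1]

latencies = [0.05, 0.08, 0.12, 0.30, 1.40]  # query latencies in seconds
p99 = percentile(latencies, 99)
print(p99 > 1.0)  # a value above the 1s threshold would fire the alert
```

In production the same quantile would come from a PromQL expression such as `histogram_quantile(0.99, ...)` over the database's latency histogram, not from application code.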

4.2 Performance Tuning Parameters

"""
向量数据库性能调优指南
 
主要调优维度:
1. 索引参数优化
2. 内存配置
3. 批量操作
4. 连接管理
"""
 
class PerformanceTuning:
    """Performance tuning recommendations"""

    # HNSW parameter tuning
    hnsw_tuning = {
        "M": {
            "low": 4,     # low memory, lower recall
            "medium": 16, # balanced
            "high": 32    # high recall, high memory
        },
        "efConstruction": {
            "fast": 100,   # fast index build
            "normal": 200, # standard
            "quality": 400 # high-quality build
        },
        "ef": {
            "fast": 64,     # fast queries
            "normal": 128,  # standard
            "quality": 256  # high-accuracy queries
        }
    }
    
    # Memory sizing guidance
    memory_tuning = {
        "per_vector_mb": {
            "768dim_float32": 768 * 4 / (1024 * 1024),  # ~3 KB per vector
            "768dim_float16": 768 * 2 / (1024 * 1024),  # ~1.5 KB per vector
            "768dim_int8": 768 * 1 / (1024 * 1024),     # ~0.75 KB per vector
        },
        "recommendation": """
            Memory sizing guidelines:
            - < 1M vectors: 8-16 GB
            - 1-10M vectors: 32-64 GB
            - > 10M vectors: 128 GB+

            Formula: required memory ≈ num_vectors × dim × 4 bytes × 1.2 (overhead factor)
        """
    }
    
    # Batch operation tuning
    batch_optimization = {
        "insert_batch_size": 1000,    # vectors per batch
        "bulk_request_timeout": 300,  # timeout in seconds
        "concurrent_requests": 10,    # concurrent requests
        "compression": True           # enable compression
    }
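The sizing formula above translates directly into a small helper. A minimal sketch (the 1.2 overhead factor comes from the formula; real deployments should also budget extra headroom for HNSW graph structures and the OS):

```python
def estimate_memory_gb(num_vectors: int, dim: int,
                       bytes_per_component: int = 4,
                       overhead: float = 1.2) -> float:
    """Estimate RAM for raw vector storage using the formula above:
    memory ≈ num_vectors × dim × bytes_per_component × overhead."""
    return num_vectors * dim * bytes_per_component * overhead / (1024 ** 3)

# 10M 768-dim float32 vectors with the 1.2 overhead factor:
print(round(estimate_memory_gb(10_000_000, 768), 1))  # 34.3 (GB)
```

Dropping `bytes_per_component` to 2 (float16) or 1 (int8) halves or quarters the estimate, matching the per-vector figures in `memory_tuning`.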

4.3 Backup and Recovery Strategy

"""
向量数据库备份恢复策略
"""
 
class BackupStrategy:
    """备份恢复策略"""
    
    @staticmethod
    def create_backup(db_type: str, collection_name: str, backup_path: str):
        """创建备份"""
        if db_type == "milvus":
            # Milvus使用snapshot
            from pymilvus import utility
            snapshot = utility.snapshot(collection_name)
            # 保存到指定路径
            pass
        
        elif db_type == "qdrant":
            # Qdrant创建collection snapshot
            # client.create_snapshot(collection_name)
            pass
        
        elif db_type == "weaviate":
            # Weaviate导出数据
            # client.backup.create()
            pass
    
    @staticmethod
    def restore_backup(db_type: str, collection_name: str, backup_path: str):
        """恢复备份"""
        if db_type == "milvus":
            # 从snapshot恢复
            # utility.load_snapshot(collection_name, snapshot_path)
            pass
        
        elif db_type == "qdrant":
            # Qdrant从snapshot恢复
            # client.recover_from_snapshot(collection_name, location)
            pass
    
    @staticmethod
    def schedule_backups():
        """定时备份配置"""
        backup_config = """
            # cron表达式: 分 时 日 月 周
            # 每天凌晨2点备份
            0 2 * * * /scripts/backup_vector_db.sh
            
            # 备份保留策略
            - 每日备份: 保留7天
            - 每周备份: 保留4周
            - 每月备份: 保留12个月
        """
        return backup_config
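The tiered retention policy (daily/weekly/monthly) can be sketched as a pruning function. The tier definitions here are assumptions for illustration: Mondays serve as weekly backups and the 1st of each month as monthly backups:

```python
from datetime import date

def backups_to_keep(today: date, backup_dates: list) -> set:
    """Apply the retention policy: keep dailies for 7 days, weeklies
    (assumed to be Mondays) for 4 weeks, monthlies (assumed to be the
    1st of the month) for 12 months; everything else may be pruned."""
    keep = set()
    for d in backup_dates:
        age = (today - d).days
        if age < 7:
            keep.add(d)                       # daily tier
        elif d.weekday() == 0 and age < 28:   # weekly tier (Mondays)
            keep.add(d)
        elif d.day == 1 and age < 366:        # monthly tier (1st of month)
            keep.add(d)
    return keep
```

A backup script would call this after each run and delete any snapshot whose date is not in the returned set.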

5. Related Topics


Changelog

  • 2026-04-18: initial version
  • Covers full deployment guides for Milvus, Qdrant, and Weaviate
  • Includes Kubernetes configuration and performance tuning recommendations