MongoDB 完全指南

NOTE

本文档最后更新于 2026年4月,涵盖 MongoDB 8.0 新特性、文档模型设计、Aggregation Pipeline、向量化搜索,以及在 AI 应用中的实战场景。


目录

  1. MongoDB 概述与核心定位
  2. MongoDB 8.0 新特性
  3. 文档模型与 Schema 设计
  4. Aggregation Pipeline
  5. MongoDB Atlas 云服务
  6. MongoDB vs PostgreSQL vs MySQL 对比
  7. Atlas AI Search 向量搜索
  8. Mongoose ODM
  9. 实战场景与代码示例
  10. 选型建议

MongoDB 概述与核心定位

为什么选择 MongoDB

MongoDB 是一个基于文档模型的 NoSQL 数据库,于 2009 年由 MongoDB Inc. 发布。它以 JSON 格式存储数据,提供了灵活的模式和强大的查询能力。

MongoDB 的核心优势体现在三个维度:

模式灵活:MongoDB 的文档模型允许同一集合中的文档拥有不同的字段,无需预定义 Schema。这对于需求变化快速的应用尤为有利。

水平扩展:MongoDB 原生支持分片(Sharding),可以轻松扩展到海量数据和高并发场景。

开发效率:JSON 文档结构与 JavaScript 天然契合,前端开发者学习曲线平缓。

MongoDB 在 AI 时代的价值

在 AI 应用中,MongoDB 展现了独特的优势:

  • 灵活的数据结构:AI 输出格式不固定,MongoDB 的灵活 Schema 完美适配
  • 向量搜索:Atlas Search 支持向量相似度搜索
  • 实时能力:Change Streams 支持实时数据处理
  • 文档原生:与 AI Prompt、Response 的 JSON 结构完美匹配

市场份额与行业地位

MongoDB 在 NoSQL 和文档数据库市场中占据主导地位:

排名数据库市场份额(2025)趋势
1MongoDB47.8%
2DynamoDB18.5%
3Couchbase8.2%
4Cassandra7.1%
5Cosmos DB6.3%

关键数据点

  • MongoDB 是全球最流行的文档数据库
  • 超过 40% 的新 Web 应用选择 MongoDB
  • 主要使用者包括 eBay、Adobe、Verizon、EA Games 等知名企业
  • MongoDB Atlas 云服务托管了数百万数据库实例
  • MongoDB Community 下载量超过 1 亿次

MongoDB 适用场景

场景推荐程度说明
内容管理系统⭐⭐⭐⭐⭐灵活的内容结构
物联网数据⭐⭐⭐⭐⭐时序数据存储
实时分析⭐⭐⭐⭐高吞吐量写入
移动应用⭐⭐⭐⭐离线优先架构
AI 数据存储⭐⭐⭐⭐灵活的数据结构

核心概念与数据模型

MongoDB 架构

MongoDB 采用分布式架构设计:

┌─────────────────────────────────────────────────────────────┐
│                    MongoDB 集群架构                            │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│   应用层                                                      │
│   ┌──────────┐  ┌──────────┐  ┌──────────┐               │
│   │  驱动    │  │ Shell    │  │ Compass │               │
│   └────┬─────┘  └────┬─────┘  └────┬─────┘               │
│        │               │               │                       │
│        └───────────────┼───────────────┘                       │
│                        ▼                                        │
│   ┌─────────────────────────────────────────────────────┐   │
│   │                  Router (mongos)                     │   │
│   │                  查询路由 + 分片协调                   │   │
│   └─────────────────────────────────────────────────────┘   │
│                        │                                        │
│        ┌───────────────┼───────────────┐                     │
│        ▼               ▼               ▼                     │
│   ┌─────────┐     ┌─────────┐     ┌─────────┐             │
│   │ Shard 1 │     │ Shard 2 │     │ Shard 3 │             │
│   │(副本集)  │     │(副本集)  │     │(副本集)  │             │
│   │ ┌─────┐ │     │ ┌─────┐ │     │ ┌─────┐ │             │
│   │ │ P   │ │     │ │ P   │ │     │ │ P   │ │             │
│   │ ├─────┤ │     │ ├─────┤ │     │ ├─────┤ │             │
│   │ │ S1  │ │     │ │ S1  │ │     │ │ S1  │ │             │
│   │ ├─────┤ │     │ ├─────┤ │     │ ├─────┤ │             │
│   │ │ S2  │ │     │ │ S2  │ │     │ │ S2  │ │             │
│   │ └─────┘ │     │ └─────┘ │     │ └─────┘ │             │
│   └─────────┘     └─────────┘     └─────────┘             │
│                                                              │
│   ┌─────────────────────────────────────────────────────┐   │
│   │              Config Server (副本集)                   │   │
│   │           存储集群元数据和配置信息                     │   │
│   └─────────────────────────────────────────────────────┘   │
│                                                              │
└─────────────────────────────────────────────────────────────┘

核心概念

1. 文档模型

MongoDB 的基本数据单元是文档,类似 JSON 对象:

// 文档示例
{
  _id: ObjectId("..."),
  title: "MongoDB 入门",
  content: "这是一篇关于 MongoDB 的教程...",
  author: {
    name: "Alice",
    email: "alice@example.com",
    bio: "技术作家"
  },
  tags: ["mongodb", "database", "nosql"],
  stats: {
    views: 1000,
    likes: 50,
    shares: 10
  },
  comments: [
    { user: "Bob", text: "很实用的文章!", createdAt: ISODate() },
    { user: "Carol", text: "期待更多内容", createdAt: ISODate() }
  ],
  createdAt: ISODate(),
  updatedAt: ISODate()
}

2. 集合(Collection)

集合是文档的容器,类似于关系型数据库的表:

// 创建集合(隐式创建)
db.articles.insertOne({ title: "Hello" })
 
// 创建带验证规则的集合
db.createCollection("users", {
  validator: {
    $jsonSchema: {
      bsonType: "object",
      required: ["email", "password"],
      properties: {
        email: {
          bsonType: "string",
          pattern: "^.+@.+$"
        },
        password: {
          bsonType: "string",
          minLength: 8
        }
      }
    }
  }
})
 
// 查看集合
show collections
 
// 删除集合
db.users.drop()

3. 数据库

// 切换数据库
use mydb
 
// 创建数据库(实际上创建第一个集合时才创建)
db.createDatabase = function() { return "use mydb, then insert data" }
 
// 查看所有数据库
show dbs
 
// 删除数据库
db.dropDatabase()

数据模型设计

1. 嵌入模式(一对一)

// 用户文档嵌入地址
{
  _id: ObjectId(),
  name: "Alice",
  email: "alice@example.com",
  address: {
    street: "123 Main St",
    city: "New York",
    state: "NY",
    zip: "10001",
    country: "USA"
  }
}

2. 引用模式(一对多)

// 订单文档引用产品
// orders 集合
{
  _id: ObjectId(),
  orderNumber: "ORD-001",
  customerId: ObjectId("..."),
  items: [
    { productId: ObjectId("..."), quantity: 2, price: 29.99 },
    { productId: ObjectId("..."), quantity: 1, price: 49.99 }
  ],
  total: 109.97,
  status: "completed"
}
 
// products 集合
{
  _id: ObjectId(),
  name: "Widget",
  description: "A useful widget",
  price: 29.99,
  stock: 100
}

3. 桶模式(时间序列)

// 传感器数据桶
{
  _id: ObjectId(),
  sensorId: "sensor-001",
  bucketStart: ISODate("2024-01-15T00:00:00Z"),
  bucketEnd: ISODate("2024-01-15T01:00:00Z"),
  readings: [
    { time: ISODate("2024-01-15T00:00:00Z"), temp: 22.5, humidity: 65 },
    { time: ISODate("2024-01-15T00:05:00Z"), temp: 22.6, humidity: 64 },
    // ... 最多 60 条记录
  ],
  stats: {
    minTemp: 22.0,
    maxTemp: 23.5,
    avgTemp: 22.8,
    count: 60
  }
}

4. 血统模式(层级结构)

// 分类树
{
  _id: ObjectId(),
  name: "Electronics",
  slug: "electronics",
  parentId: null,
  path: "/electronics",
  level: 0
}
 
{
  _id: ObjectId(),
  name: "Smartphones",
  slug: "smartphones",
  parentId: ObjectId("..."), // 指向 Electronics
  path: "/electronics/smartphones",
  level: 1
}
 
// 查询所有后代
db.categories.aggregate([
  {
    $graphLookup: {
      from: "categories",
      startWith: "$_id",
      connectFromField: "_id",
      connectToField: "parentId",
      as: "descendants",
      depthField: "depth"
    }
  }
])


MongoDB 8.0 新特性

MongoDB 8.0 于 2025 年发布,带来了性能优化和新功能。

核心新特性

1. 性能大幅提升

优化项提升幅度
查询性能提升 40%
写入吞吐量提升 50%
分片平衡提升 60%
索引构建提升 45%
内存使用减少 30%

2. 增强的聚合管道

// 新的聚合操作符
db.articles.aggregate([
  {
    $vectorSearch: {  // 向量搜索
      index: "vector_index",
      path: "embedding",
      queryVector: [0.1, 0.2, ...],
      numCandidates: 100,
      limit: 10
    }
  },
  {
    $score: {  // 评分
      score: { $meta: "vectorSearchScore" }
    }
  }
]);
 
// 新的 $group 增强
db.sales.aggregate([
  {
    $group: {
      _id: { region: "$region", category: "$category" },
      total: { $sum: "$amount" },
      avg: { $avg: { $ifNull: ["$amount", 0] } }
    }
  }
]);
 
// 新的 $unionWith 增强
db.orders.aggregate([
  { $match: { status: "completed" } },
  {
    $unionWith: {
      coll: "archived_orders",
      pipeline: [
        { $match: { year: 2024 } }
      ]
    }
  }
]);

3. 改进的安全功能

// 增强的字段级加密
db.users.insertOne({
  name: "Alice",
  ssn: "123-45-6789",  // 自动加密
  creditCard: "4111-1111-1111-1111",  // 自动加密
  _id: ObjectId()
});
 
// 改进的 RBAC
db.admin.updateUser("admin_user", {
  roles: [
    { role: "readWriteAnyDatabase", db: "admin" },
    { role: "atlasAdmin", db: "admin" }
  ]
});

4. 时间序列增强

// 时间序列集合改进
db.createCollection("sensor_data", {
  timeseries: {
    timeField: "timestamp",
    metaField: "metadata",
    granularity: "seconds",
    expireAfterSeconds: 2592000  // 30 天自动过期
  }
});
 
// 聚合优化
db.sensor_data.aggregate([
  {
    $group: {
      _id: {
        sensorId: "$metadata.sensorId",
        interval: {
          $dateTrunc: {
            date: "$timestamp",
            unit: "minute"
          }
        }
      },
      avgValue: { $avg: "$value" },
      maxValue: { $max: "$value" }
    }
  }
]);

文档模型与 Schema 设计

文档结构基础

// MongoDB 文档示例(BSON 格式)
{
  "_id": ObjectId("..."),
  "title": "My First Article",
  "content": "This is the article content...",
  "author": {
    "name": "Alice",
    "email": "alice@example.com",
    "bio": "Tech writer"
  },
  "tags": ["AI", "Machine Learning", "Python"],
  "stats": {
    "views": 1000,
    "likes": 50,
    "shares": 10
  },
  "comments": [
    {
      "user": "Bob",
      "text": "Great article!",
      "createdAt": ISODate("2024-01-15")
    }
  ],
  "createdAt": ISODate("2024-01-01"),
  "updatedAt": ISODate("2024-01-15"),
  "status": "published",
  "metadata": {  // 灵活字段
    "language": "en",
    "region": "US"
  }
}

Schema 设计模式

1. 嵌入(Embedding)vs 引用(Reference)

// 嵌入模式:一对少,数据总是一起访问
// 优点:一次查询获取所有数据
// 缺点:文档过大,数据重复
{
  _id: ObjectId(),
  orderNumber: "ORD-001",
  customer: {
    name: "Alice",
    email: "alice@example.com",
    address: {
      street: "123 Main St",
      city: "New York",
      zip: "10001"
    }
  },
  items: [
    { product: "Widget", quantity: 2, price: 19.99 },
    { product: "Gadget", quantity: 1, price: 49.99 }
  ],
  total: 89.97,
  createdAt: ISODate()
}
 
// 引用模式:一对多,数据需要单独管理
// 优点:数据规范化,易于更新
// 缺点:需要多次查询或 $lookup
 
// orders 集合
{
  _id: ObjectId(),
  orderNumber: "ORD-002",
  customerId: ObjectId("..."),  // 引用
  items: [
    { productId: ObjectId("..."), quantity: 2 },  // 引用
    { productId: ObjectId("..."), quantity: 1 }
  ],
  createdAt: ISODate()
}
 
// customers 集合
{
  _id: ObjectId(),
  name: "Bob",
  email: "bob@example.com"
}
 
// products 集合
{
  _id: ObjectId(),
  name: "Widget",
  price: 19.99
}
 
// 使用 $lookup 关联查询
db.orders.aggregate([
  { $match: { orderNumber: "ORD-002" } },
  {
    $lookup: {
      from: "customers",
      localField: "customerId",
      foreignField: "_id",
      as: "customer"
    }
  },
  {
    $lookup: {
      from: "products",
      localField: "items.productId",
      foreignField: "_id",
      as: "productDetails"
    }
  }
]);

2. 常见设计模式

// 扩展模式:灵活添加字段
{
  _id: ObjectId(),
  name: "Product",
  basePrice: 99.99,
  // 可选字段
  salePrice: 79.99,
  discount: { percent: 20, code: "SUMMER24" }
}
 
// 桶模式:优化范围查询
{
  _id: ObjectId(),
  sensorId: "sensor-001",
  bucketStart: ISODate("2024-01-01T00:00:00Z"),
  bucketEnd: ISODate("2024-01-01T01:00:00Z"),
  readings: [
    { time: ISODate("2024-01-01T00:15:00Z"), value: 22.5 },
    { time: ISODate("2024-01-01T00:30:00Z"), value: 23.0 }
  ],
  stats: {
    min: 22.0,
    max: 23.5,
    avg: 22.8,
    count: 60
  }
}
 
// 血统模式:处理层级数据
{
  _id: ObjectId(),
  name: "Electronics",
  parentId: null,  // 根节点
  path: "/electronics"
},
{
  _id: ObjectId(),
  name: "Laptops",
  parentId: ObjectId("..."),  // 父节点
  path: "/electronics/laptops"
}

Schema 验证

// 创建带有验证规则的集合
db.createCollection("users", {
  validator: {
    $jsonSchema: {
      bsonType: "object",
      required: ["email", "password", "createdAt"],
      properties: {
        email: {
          bsonType: "string",
          pattern: "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$"
        },
        password: {
          bsonType: "string",
          minLength: 8
        },
        age: {
          bsonType: "int",
          minimum: 0,
          maximum: 150
        },
        role: {
          enum: ["user", "admin", "moderator"]
        },
        metadata: {
          bsonType: "object"
        }
      }
    }
  },
  validationLevel: "moderate",  // moderate: 新文档和更新验证
  validationAction: "error"      // error: 拒绝无效文档
});
 
// 修改验证规则
db.runCommand({
  collMod: "users",
  validator: {
    $jsonSchema: {
      bsonType: "object",
      required: ["email"],
      properties: {
        email: { bsonType: "string" }
      }
    }
  }
});

Aggregation Pipeline

基础管道

// 基础过滤和投影
db.articles.aggregate([
  // $match: 过滤文档
  { $match: { status: "published", tags: "AI" } },
  
  // $project: 选择和重命名字段
  { $project: {
      title: 1,
      authorName: "$author.name",
      year: { $year: "$createdAt" },
      _id: 0
    }
  },
  
  // $sort: 排序
  { $sort: { createdAt: -1 } },
  
  // $limit: 限制数量
  { $limit: 10 }
]);

高级聚合操作符

// $group: 分组聚合
db.orders.aggregate([
  { $unwind: "$items" },  // 展开数组
  {
    $group: {
      _id: {
        year: { $year: "$createdAt" },
        month: { $month: "$createdAt" },
        category: "$items.category"
      },
      totalRevenue: { $sum: { $multiply: ["$items.price", "$items.quantity"] } },
      orderCount: { $sum: 1 },
      avgOrderValue: { $avg: "$total" },
      topProducts: { $addToSet: "$items.name" }
    }
  },
  { $sort: { "_id.year": -1, "_id.month": -1 } }
]);
 
// $bucket: 自定义桶分组
db.sales.aggregate([
  {
    $bucket: {
      groupBy: "$amount",
      boundaries: [0, 100, 500, 1000, 5000, Infinity],
      default: "Other",
      output: {
        count: { $sum: 1 },
        orders: { $push: { id: "$_id", amount: "$amount" } }
      }
    }
  }
]);
 
// $facet: 多维度聚合
db.orders.aggregate([
  {
    $facet: {
      // 按状态统计
      byStatus: [
        { $group: { _id: "$status", count: { $sum: 1 } } }
      ],
      // 按月统计
      byMonth: [
        { $group: {
            _id: { $month: "$createdAt" },
            total: { $sum: "$total" }
          }
        },
        { $sort: { _id: 1 } }
      ],
      // Top 5 客户
      topCustomers: [
        { $group: { _id: "$customerId", total: { $sum: "$total" } } },
        { $sort: { total: -1 } },
        { $limit: 5 }
      ]
    }
  }
]);

数组操作

// $filter: 过滤数组元素
db.orders.aggregate([
  {
    $project: {
      orderId: 1,
      // 只保留状态为 pending 的商品
      pendingItems: {
        $filter: {
          input: "$items",
          as: "item",
          cond: { $eq: ["$$item.status", "pending"] }
        }
      }
    }
  }
]);
 
// $map: 转换数组元素
db.products.aggregate([
  {
    $project: {
      name: 1,
      // 应用折扣
      discountedPrices: {
        $map: {
          input: "$prices",
          as: "price",
          in: { $multiply: ["$$price", 0.9] }  // 9折
        }
      }
    }
  }
]);
 
// $reduce: 聚合数组
db.orders.aggregate([
  {
    $project: {
      orderId: 1,
      // 计算总价
      total: {
        $reduce: {
          input: "$items",
          initialValue: 0,
          in: {
            $add: [
              "$$value",
              { $multiply: ["$$this.price", "$$this.quantity"] }
            ]
          }
        }
      }
    }
  }
]);

关联查询

// $lookup: 左连接
db.orders.aggregate([
  {
    $lookup: {
      from: "customers",
      localField: "customerId",
      foreignField: "_id",
      as: "customerInfo"
    }
  },
  { $unwind: "$customerInfo" },  // 展开数组
  {
    $lookup: {
      from: "products",
      let: { itemIds: "$items.productId" },
      pipeline: [
        { $match: { $expr: { $in: ["$_id", "$$itemIds"] } } }
      ],
      as: "productDetails"
    }
  }
]);
 
// 子查询关联
db.sales.aggregate([
  {
    $lookup: {
      from: "inventory",
      let: { prodId: "$productId", qty: "$quantity" },
      pipeline: [
        {
          $match: {
            $expr: {
              $and: [
                { $eq: ["$productId", "$$prodId"] },
                { $gte: ["$stock", "$$qty"] }
              ]
            }
          }
        }
      ],
      as: "inventoryCheck"
    }
  }
]);

MongoDB Atlas 云服务

Atlas 核心功能

功能说明
Atlas Search全文搜索引擎,基于 Lucene
Atlas Data Lake无 Serverless 分析
Atlas Vector SearchAI 向量相似度搜索
Atlas Triggers事件驱动函数
Atlas Graph图数据库能力
Realm移动端同步

Atlas Search 配置

// 在 Atlas 中创建搜索索引(使用 Atlas UI 或 CLI)
 
// Atlas Search 索引定义
{
  "mappings": {
    "dynamic": false,
    "fields": {
      "title": {
        "type": "string",
        "analyzer": "lucene.standard"
      },
      "content": {
        "type": "string",
        "analyzer": "lucene.english"
      },
      "tags": {
        "type": "string",
        "analyzer": "lucene.keyword"
      },
      "metadata": {
        "type": "object"
      }
    }
  },
  "analyzers": [
    {
      "name": "customAnalyzer",
      "tokenizer": {
        "type": "standard"
      },
      "filter": ["lowercase", "porterStem"]
    }
  ]
}
 
// 使用 $search
db.articles.aggregate([
  {
    $search: {
      index: "default",
      text: {
        query: "machine learning",
        path: ["title", "content", "tags"],
        fuzzy: {
          maxEdits: 1,
          prefixLength: 2
        }
      },
      highlight: {
        path: ["title", "content"]
      }
    }
  },
  {
    $project: {
      title: 1,
      snippet: { $meta: "searchSnippet" },
      score: { $meta: "searchScore" },
      highlights: { $meta: "searchHighlights" }
    }
  }
]);
 
// 布尔搜索
db.articles.aggregate([
  {
    $search: {
      compound: {
        must: [
          { text: { query: "AI", path: "title" } }
        ],
        should: [
          { text: { query: "Python", path: ["title", "content"] } },
          { text: { query: "TensorFlow", path: ["title", "content"] } }
        ],
        mustNot: [
          { text: { query: "deprecated", path: "content" } }
        ],
        filter: [
          { range: { path: "createdAt", gte: ISODate("2024-01-01") } }
        ]
      }
    }
  }
]);

MongoDB vs PostgreSQL vs MySQL 对比

核心对比表

维度MongoDBPostgreSQLMySQL
模型文档型关系型关系型
Schema灵活(无模式)严格(可验证)严格
事务多文档 ACID完整 ACIDACID (InnoDB)
Join$lookup原生 SQL Join原生 SQL Join
查询语言MongoDB QuerySQLSQL
索引B-Tree, 全文, 向量B-Tree, GiST, GIN, 向量B-Tree, 全文, 向量
扩展性原生分片需要扩展需要扩展
JSON 支持原生文档jsonbJSON 函数
向量搜索Atlas Vector Searchpgvector原生 (9.0)
一致性最终一致/强一致强一致强一致
学习曲线平缓较陡平缓

选择建议对照表

场景推荐选择原因
灵活 SchemaMongoDB ⭐⭐⭐⭐⭐无模式限制
结构化数据PostgreSQL ⭐⭐⭐⭐⭐强类型 + 约束
高并发简单查询MySQL ⭐⭐⭐⭐⭐性能优异
复杂查询/报表PostgreSQL ⭐⭐⭐⭐⭐SQL 强大
AI 向量搜索PostgreSQL/MongoDBpgvector/Atlas
快速原型MongoDB ⭐⭐⭐⭐灵活修改
强类型需求PostgreSQL ⭐⭐⭐⭐Schema 验证
分片海量数据MongoDB ⭐⭐⭐⭐原生支持

Atlas AI Search 向量搜索

配置向量搜索

// 在 Atlas UI 创建向量搜索索引
// 索引定义示例
{
  "fields": [
    {
      "path": "embedding",
      "type": "vector",
      "similarity": "cosine",  // cosine, euclidean, dotProduct
      "dimensions": 1536
    },
    {
      "path": "content",
      "type": "string"
    },
    {
      "path": "metadata.category",
      "type": "string"
    }
  ]
}
 
// 插入带嵌入向量的文档
db.documents.insertOne({
  content: "PostgreSQL is a powerful open-source database.",
  embedding: [0.123, -0.456, ...],  // 1536 维向量
  metadata: {
    category: "database",
    source: "wiki",
    createdAt: ISODate()
  }
});
 
// 向量搜索查询
db.documents.aggregate([
  {
    $vectorSearch: {
      index: "vector_index",
      path: "embedding",
      queryVector: queryEmbedding,  // 查询向量
      numCandidates: 100,           // 候选数量
      limit: 10                      // 返回数量
    }
  },
  {
    $project: {
      content: 1,
      score: { $meta: "vectorSearchScore" },
      metadata: 1
    }
  }
]);
 
// 混合搜索:向量 + 关键词
db.documents.aggregate([
  {
    $search: {
      index: "hybrid_index",
      compound: {
        must: [
          {
            vectorSearch: {
              path: "embedding",
              queryVector: queryEmbedding,
              numCandidates: 100,
              score: { boost: { value: 1 } }
            }
          }
        ],
        should: [
          {
            text: {
              query: "database SQL",
              path: "content",
              score: { boost: { value: 0.3 } }
            }
          }
        ]
      }
    }
  }
]);

Mongoose ODM

基础用法

// 定义 Schema
const mongoose = require('mongoose');
 
const userSchema = new mongoose.Schema({
  name: {
    type: String,
    required: true,
    trim: true,
    maxlength: 100
  },
  email: {
    type: String,
    required: true,
    unique: true,
    lowercase: true,
    match: /^[^\s@]+@[^\s@]+\.[^\s@]+$/
  },
  age: {
    type: Number,
    min: 0,
    max: 150
  },
  role: {
    type: String,
    enum: ['user', 'admin', 'moderator'],
    default: 'user'
  },
  preferences: {
    theme: { type: String, default: 'light' },
    notifications: { type: Boolean, default: true }
  },
  // 数组字段
  tags: [String],
  // 嵌套文档
  address: {
    street: String,
    city: String,
    country: String
  },
  // 虚拟字段
  createdAt: { type: Date, default: Date.now },
  updatedAt: { type: Date, default: Date.now }
}, {
  timestamps: true,
  toJSON: { virtuals: true },
  toObject: { virtuals: true }
});
 
// 索引
userSchema.index({ email: 1 });
userSchema.index({ name: 'text' });
 
// 虚拟字段
userSchema.virtual('fullName').get(function() {
  return `${this.name} (${this.role})`;
});
 
// 方法
userSchema.methods.greet = function() {
  return `Hello, I'm ${this.name}!`;
};
 
// 静态方法
userSchema.statics.findByEmail = function(email) {
  return this.findOne({ email: email.toLowerCase() });
};
 
// 中间件
userSchema.pre('save', function(next) {
  console.log('About to save:', this.name);
  this.email = this.email.toLowerCase();
  next();
});
 
// 编译模型
const User = mongoose.model('User', userSchema);
 
// 使用
async function example() {
  const user = new User({
    name: 'Alice',
    email: 'Alice@Example.com',
    age: 30
  });
  
  await user.save();
  console.log(user.fullName);  // "Alice (user)"
  
  const found = await User.findByEmail('alice@example.com');
}

聚合管道封装

// 封装常用聚合操作
userSchema.statics.getStatsByRole = function() {
  return this.aggregate([
    { $group: {
        _id: '$role',
        count: { $sum: 1 },
        avgAge: { $avg: '$age' },
        users: { $push: { name: '$name', email: '$email' } }
      }
    },
    { $sort: { count: -1 } }
  ]);
};
 
userSchema.statics.getUserWithOrders = async function(userId) {
  return this.aggregate([
    { $match: { _id: mongoose.Types.ObjectId(userId) } },
    {
      $lookup: {
        from: 'orders',
        localField: '_id',
        foreignField: 'userId',
        as: 'orders'
      }
    },
    {
      $addFields: {
        orderCount: { $size: '$orders' },
        totalSpent: { $sum: '$orders.total' }
      }
    }
  ]);
};

实战场景与代码示例

场景一:AI 对话历史存储

// conversations 集合
const conversationSchema = new mongoose.Schema({
  userId: { type: mongoose.Schema.Types.ObjectId, ref: 'User', required: true },
  title: { type: String, default: 'New Conversation' },
  model: { type: String, default: 'gpt-4o' },
  status: { type: String, enum: ['active', 'archived'], default: 'active' },
  metadata: mongoose.Schema.Types.Mixed,
  createdAt: { type: Date, default: Date.now },
  updatedAt: { type: Date, default: Date.now }
});
 
// messages 集合
const messageSchema = new mongoose.Schema({
  conversationId: { 
    type: mongoose.Schema.Types.ObjectId, 
    ref: 'Conversation', 
    required: true,
    index: true
  },
  role: { 
    type: String, 
    enum: ['system', 'user', 'assistant', 'tool'], 
    required: true 
  },
  content: { type: String, required: true },
  embedding: [Number],  // 存储消息嵌入向量
  tokens: {
    prompt: { type: Number, default: 0 },
    completion: { type: Number, default: 0 }
  },
  metadata: {
    model: String,
    finishReason: String,
    toolCalls: [{
      name: String,
      arguments: mongoose.Schema.Types.Mixed
    }]
  },
  createdAt: { type: Date, default: Date.now }
});
 
// 创建索引
messageSchema.index({ conversationId: 1, createdAt: 1 });
messageSchema.index({ embedding: 1 });
 
// 对话历史查询
messageSchema.statics.getConversationMessages = async function(conversationId, limit = 50) {
  return this.find({ conversationId })
    .sort({ createdAt: -1 })
    .limit(limit)
    .lean();
};
 
// 语义搜索历史
messageSchema.statics.semanticSearch = async function(conversationId, queryEmbedding, limit = 5) {
  return this.aggregate([
    { $match: { 
        conversationId: mongoose.Types.ObjectId(conversationId),
        embedding: { $exists: true, $ne: [] }
      }
    },
    {
      $addFields: {
        score: {
          $reduce: {
            input: { $zip: { inputs: ["$embedding", queryEmbedding] } },
            initialValue: 0,
            in: { $add: ["$$value", { $multiply: ["$$this.0", "$$this.1"] }] }
          }
        }
      }
    },
    { $sort: { score: -1 } },
    { $limit: limit },
    {
      $project: {
        _id: 1,
        role: 1,
        content: 1,
        score: 1,
        createdAt: 1
      }
    }
  ]);
};

场景二:实时数据处理

// Change Streams 实时监听
async function watchConversations() {
  const collection = mongoose.connection.collection('conversations');
  
  const changeStream = collection.watch(
    [
      { $match: { operationType: { $in: ['insert', 'update', 'delete'] } } }
    ],
    { fullDocument: 'updateLookup' }
  );
  
  changeStream.on('change', async (change) => {
    switch (change.operationType) {
      case 'insert':
        console.log('New conversation:', change.fullDocument);
        // 触发通知
        break;
      case 'update':
        console.log('Conversation updated:', change.fullDocument._id);
        // 更新缓存
        break;
      case 'delete':
        console.log('Conversation deleted:', change.documentKey._id);
        // 清理关联数据
        break;
    }
  });
  
  // 错误处理
  changeStream.on('error', (error) => {
    console.error('Change stream error:', error);
  });
  
  return changeStream;
}
 
// 在 mongoose 连接中使用
mongoose.connection.once('open', () => {
  watchConversations();
});

选型建议

Schema 设计最佳实践

场景推荐模式说明
一对少,固定数据嵌入评论、地址
一对多,变化数据引用订单项、标签
频繁更新的数据引用用户、产品
需要全文搜索嵌入 + Atlas Search文章内容
层级数据血统模式分类树
时间序列桶模式传感器数据
AI 嵌入向量独立字段1536+ 维

性能优化建议

// 避免的问题
// ❌ 文档过大(超过 16MB)
// ❌ 数组无限增长
// ❌ 缺少索引的查询
// ❌ 过深的嵌套
 
// 推荐做法
// ✅ 合理使用引用
// ✅ 设置合理的文档大小
// ✅ 为常用查询创建索引
// ✅ 使用 projection 限制返回字段
// ✅ 使用 lean() 获取原生 JS 对象

TIP

对于 AI 应用,MongoDB 的灵活性使其非常适合存储 Prompt/Response 对话历史和 AI 输出结果。结合 Atlas Vector Search,可以实现语义搜索功能。


SUCCESS

本文档全面介绍了 MongoDB 的核心特性、8.0 新功能、文档模型设计、Aggregation Pipeline、Atlas 云服务,以及在 AI 应用中的实战场景。MongoDB 凭借其灵活的模式和强大的聚合能力,是 AI 应用数据存储的优秀选择。


完整安装与环境配置

安装方法

# macOS
brew install mongodb-community@7.0
brew services start mongodb-community@7.0
 
# Ubuntu/Debian
# 添加 MongoDB 仓库
curl -fsSL https://www.mongodb.org/static/pgp/server-7.0.asc | gpg --dearmor -o /usr/share/keyrings/mongodb-server-7.0.gpg
echo "deb [ arch=amd64,arm64 signed-by=/usr/share/keyrings/mongodb-server-7.0.gpg ] https://repo.mongodb.org/apt/ubuntu jammy/mongodb-org/7.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-7.0.list
sudo apt update
sudo apt install -y mongodb-org
 
# Docker
docker run -d \
  --name mongodb \
  -e MONGO_INITDB_ROOT_USERNAME=admin \
  -e MONGO_INITDB_ROOT_PASSWORD=password \
  -p 27017:27017 \
  mongo:7.0
 
# Docker Compose
version: "3.8"
services:
  mongodb:
    image: mongo:7.0
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: password
    ports:
      - "27017:27017"
    volumes:
      - mongodb_data:/data/db
    healthcheck:
      test: ["CMD", "mongosh", "--eval", "db.adminCommand('ping')"]
      interval: 10s
      timeout: 5s
      retries: 5
 
volumes:
  mongodb_data:

MongoDB Shell

# 连接
mongosh "mongodb://localhost:27017"
mongosh "mongodb://admin:password@localhost:27017"
mongosh "mongodb://localhost:27017,mongodb2:27017/?replicaSet=rs0"
 
# 常用命令
show dbs
use mydb
show collections
db.help()
 
# CRUD
db.users.find()
db.users.insertOne({ name: "John" })
db.users.updateOne({ _id: 1 }, { $set: { name: "Jane" } })
db.users.deleteOne({ _id: 1 })
 
# 聚合
db.orders.aggregate([
  { $match: { status: "completed" } },
  { $group: { _id: "$customer", total: { $sum: "$amount" } } }
])

配置文件

# mongod.conf
systemLog:
  destination: file
  path: /var/log/mongodb/mongod.log
  logAppend: true
 
storage:
  dbPath: /var/lib/mongodb
  journal:
    enabled: true
 
processManagement:
  timeZoneInfo: /usr/share/zoneinfo
 
net:
  port: 27017
  bindIp: 127.0.0.1
  maxIncomingConnections: 65536
 
operationProfiling:
  mode: slowOp
  slowOpThresholdMs: 100
 
replication:
  replSetName: rs0
 
sharding:
  clusterRole: configsvr

数据库设计模式

文档结构设计

// 1. 引用模式(规范化)
// User 集合
{
  _id: ObjectId("..."),
  email: "user@example.com",
  name: "John Doe"
}
 
// Post 集合(引用 User)
{
  _id: ObjectId("..."),
  title: "My Post",
  author_id: ObjectId("..."),  // 引用
  content: "...",
  created_at: ISODate("...")
}
 
// 2. 嵌入模式(非规范化)
// Order 文档(嵌入 Line Items)
{
  _id: ObjectId("..."),
  customer_id: ObjectId("..."),
  items: [
    { product_id: 1, name: "Product A", quantity: 2, price: 29.99 },
    { product_id: 2, name: "Product B", quantity: 1, price: 49.99 }
  ],
  total: 109.97,
  status: "completed",
  created_at: ISODate("...")
}
 
// 3. 混合模式
{
  _id: ObjectId("..."),
  title: "Blog Post",
  author: {  // 嵌入基本作者信息
    id: ObjectId("..."),
    name: "John Doe",
    avatar: "https://..."
  },
  content: "...",
  tags: ["tech", "mongodb"],  // 标签数组
  comments: [  // 嵌入评论
    { author: "Jane", text: "Great post!", created_at: ISODate("...") }
  ],
  url: "https://...",  // 引用外部资源
  created_at: ISODate("...")
}

常见设计模式

// 1. 模式版本控制
{
  _id: ObjectId("..."),
  data: { /* 最新数据 */ },
  schema_version: 2,
  migrated_at: ISODate("...")
}
 
// 2. 桶模式(时间序列)
{
  _id: ObjectId(),
  device_id: "sensor-001",
  date: ISODate("2024-01-15"),
  readings: [
    { time: ISODate("2024-01-15T00:00:00Z"), temp: 22.5 },
    { time: ISODate("2024-01-15T00:01:00Z"), temp: 22.6 },
    // ... 更多读数
  ],
  count: 60
}
 
// 3. 血统模式(层级数据)
{
  _id: ObjectId("..."),
  name: "Electronics",
  slug: "electronics",
  parent_id: null,  // 根节点
  path: "/electronics"  // 路径
}
 
{
  _id: ObjectId("..."),
  name: "Smartphones",
  slug: "smartphones",
  parent_id: ObjectId("..."),  // Electronics
  path: "/electronics/smartphones"
}
 
// 4. 附加模式(处理增长字段)
// 博客文章(评论太多时移出)
{
  _id: ObjectId("..."),
  title: "...",
  comments_count: 1500,
  recent_comments: [
    { user: "user1", text: "...", created_at: ISODate("...") }
  ]
}
 
// 评论移到独立集合
{
  _id: ObjectId("..."),
  post_id: ObjectId("..."),
  user: "user1",
  text: "...",
  created_at: ISODate("...")
}

查询优化实战

索引设计

// 1. 单字段索引
db.users.createIndex({ email: 1 }, { unique: true });
db.posts.createIndex({ author_id: 1 });
db.posts.createIndex({ created_at: -1 });
 
// 2. 复合索引
db.posts.createIndex({ author_id: 1, created_at: -1 });
db.orders.createIndex({ customer_id: 1, status: 1 });
 
// 3. 多键索引(数组字段)
db.products.createIndex({ tags: 1 });
db.users.createIndex({ "address.city": 1 });
 
// 4. 文本索引
db.articles.createIndex({ title: "text", content: "text" });
 
// 搜索
db.articles.find(
  { $text: { $search: "mongodb tutorial" } },
  { score: { $meta: "textScore" } }
).sort({ score: { $meta: "textScore" } });
 
// 5. TTL 索引(自动过期)
db.sessions.createIndex(
  { created_at: 1 },
  { expireAfterSeconds: 3600 }  // 1 小时后自动删除
);
 
// 6. 部分索引
db.orders.createIndex(
  { customer_id: 1, created_at: -1 },
  { partialFilterExpression: { status: "pending" } }
);
 
// 7. 稀疏索引
db.users.createIndex(
  { phone: 1 },
  { sparse: true }  // 只索引存在 phone 字段的文档
);
 
// 8. 索引属性
db.users.createIndex(
  { email: 1 },
  { unique: true, background: true, name: "idx_users_email" }
);
 
// 查看索引
db.collection.getIndexes();
 
// 删除索引
db.collection.dropIndex("index_name");
db.collection.dropIndexes();  // 删除所有非 _id 索引

查询优化技巧

// 1. 使用 explain()
db.users.find({ email: "test@example.com" }).explain("executionStats");
 
// 2. 使用投影(Projection)
// ❌ 返回所有字段
db.users.find({ _id: 1 });
 
// ✅ 只返回需要的字段
db.users.find({ _id: 1 }, { email: 1, name: 1, _id: 0 });
 
// 3. 使用 Limit
db.posts.find({ published: true }).limit(20);
 
// 4. 使用 Sort + Limit(覆盖索引)
// 假设有索引 { author_id: 1, created_at: -1 }
db.posts
  .find({ author_id: ObjectId("...") })
  .sort({ created_at: -1 })
  .limit(10)
  .hint({ author_id: 1, created_at: -1 });
 
// 5. 使用 lean()(获取原生 JS 对象)
const users = await User.find({ active: true }).lean();
 
// 6. 分页优化
// ❌ OFFSET 分页(慢)
db.posts.find().skip(10000).limit(20);
 
// ✅ 游标分页(快)
db.posts.find({ _id: { $gt: lastId } }).limit(20);
 
// ✅ 时间戳分页
db.posts
  .find({ created_at: { $lt: lastTimestamp } })
  .sort({ created_at: -1 })
  .limit(20);
 
// 7. 避免正则开头包含 ^
db.users.find({ name: /.*keyword.*/ });  // 慢
db.users.find({ name: /keyword/ });  // 可以使用索引前缀优化

Aggregation Pipeline 深度使用

管道阶段

// 1. $match - 过滤
db.orders.aggregate([
  { $match: { status: "completed", created_at: { $gte: ISODate("2024-01-01") } } }
]);
 
// 2. $group - 分组
db.orders.aggregate([
  { $group: {
      _id: "$customer_id",
      total_amount: { $sum: "$amount" },
      order_count: { $sum: 1 },
      avg_amount: { $avg: "$amount" }
  }}
]);
 
// 3. $sort - 排序
db.orders.aggregate([
  { $sort: { total_amount: -1 } }
]);
 
// 4. $limit - 限制
db.orders.aggregate([
  { $limit: 10 }
]);
 
// 5. $project - 投影
db.orders.aggregate([
  { $project: {
      _id: 0,
      customer_id: 1,
      total: { $round: ["$total_amount", 2] },
      year: { $year: "$created_at" }
  }}
]);
 
// 6. $lookup - 关联
db.orders.aggregate([
  { $lookup: {
      from: "customers",
      localField: "customer_id",
      foreignField: "_id",
      as: "customer"
  }},
  { $unwind: "$customer" },
  { $project: {
      "customer.name": 1,
      total: 1
  }}
]);
 
// 7. $unwind - 展开数组
db.orders.aggregate([
  { $unwind: "$items" },
  { $group: {
      _id: "$items.product_id",
      total_sold: { $sum: "$items.quantity" }
  }}
]);
 
// 8. $addFields - 添加字段
db.orders.aggregate([
  { $addFields: {
      discount: { $multiply: ["$total", 0.1] },
      final_total: { $subtract: ["$total", { $multiply: ["$total", 0.1] }] }
  }}
]);
 
// 9. $bucket - 分桶
db.orders.aggregate([
  { $bucket: {
      groupBy: "$amount",
      boundaries: [0, 50, 100, 200, 500],
      default: "500+",
      output: { count: { $sum: 1 } }
  }}
]);
 
// 10. $facet - 多管道
db.orders.aggregate([
  { $facet: {
      totalOrders: [{ $count: "count" }],
      byStatus: [{ $group: { _id: "$status", count: { $sum: 1 } } }],
      totalAmount: [{ $group: { _id: null, total: { $sum: "$amount" } } }]
  }}
]);

聚合实战

// 1. 每日销售统计
db.orders.aggregate([
  { $match: { status: "completed" } },
  { $group: {
      _id: {
        year: { $year: "$created_at" },
        month: { $month: "$created_at" },
        day: { $dayOfMonth: "$created_at" }
      },
      revenue: { $sum: "$amount" },
      orders: { $sum: 1 }
  }},
  { $sort: { "_id.year": 1, "_id.month": 1, "_id.day": 1 } }
]);
 
// 2. 用户购买排名
db.orders.aggregate([
  { $match: { status: "completed" } },
  { $group: {
      _id: "$customer_id",
      total_spent: { $sum: "$amount" },
      order_count: { $sum: 1 }
  }},
  { $sort: { total_spent: -1 } },
  { $limit: 10 },
  { $lookup: {
      from: "customers",
      localField: "_id",
      foreignField: "_id",
      as: "customer"
  }},
  { $unwind: "$customer" },
  { $project: {
      "customer.name": 1,
      total_spent: 1,
      order_count: 1
  }}
]);
 
// 3. 产品销售分析
db.orders.aggregate([
  { $unwind: "$items" },
  { $group: {
      _id: "$items.product_id",
      product_name: { $first: "$items.name" },
      total_quantity: { $sum: "$items.quantity" },
      total_revenue: { $sum: { $multiply: ["$items.quantity", "$items.price"] } }
  }},
  { $sort: { total_revenue: -1 } }
]);

备份与恢复

备份方法对比

备份类型工具特点适用场景
mongodumpmongodump逻辑备份,跨版本应用级恢复
文件系统cp/tar物理备份,快速灾难恢复
OpLogmongodump —oplog增量备份精确恢复
Atlas 备份Atlas Console自动备份,云托管Atlas 用户

mongodump 使用

# 备份单个数据库
mongodump --uri="mongodb://localhost:27017/mydb" --out=/backup/
 
# 备份所有数据库
mongodump --uri="mongodb://localhost:27017" --out=/backup/
 
# 备份特定集合
mongodump --uri="mongodb://localhost:27017/mydb" \
    --collection=users --out=/backup/
 
# 压缩备份
mongodump --uri="mongodb://localhost:27017/mydb" \
    --archive=/backup/mydb.archive --gzip
 
# 带 OpLog 的备份(副本集)
mongodump --uri="mongodb://localhost:27017" \
    --oplog --out=/backup/with_oplog
 
# 远程备份
mongodump --host=remote.mongodb.com --port=27017 \
    --username=user --password=password \
    --authenticationDatabase=admin \
    --out=/backup/remote/
 
# 排除特定集合
mongodump --uri="mongodb://localhost:27017/mydb" \
    --excludeCollection=logs \
    --excludeCollectionWithPrefix=temp_

恢复操作

# 恢复单个数据库
mongorestore --uri="mongodb://localhost:27017" /backup/mydb/
 
# 恢复前删除现有数据
mongorestore --uri="mongodb://localhost:27017" \
    --drop /backup/mydb/
 
# 恢复压缩备份
mongorestore --uri="mongodb://localhost:27017/mydb" \
    --archive=/backup/mydb.archive --gzip
 
# 恢复特定集合
mongorestore --uri="mongodb://localhost:27017/mydb" \
    --collection=users /backup/mydb/users.bson
 
# OpLog 恢复
mongorestore --uri="mongodb://localhost:27017" \
    --oplogReplay /backup/with_oplog/
 
# 恢复前查看备份内容
mongorestore --dry-run --uri="mongodb://localhost:27017" /backup/mydb/

文件系统备份

# 锁定数据库(副本集)
mongod --replSet rs0
# 在主节点执行
db.adminCommand({ fsync: 1, lock: true })
 
# 使用 cp 备份数据目录
cp -R /data/db /backup/$(date +%Y%m%d)/
 
# 解锁
db.adminCommand({ fsyncUnlock: 1 })
 
# 使用 LVM 快照
lvcreate --size 10G --snapshot --name mongodb-snap /dev/vg0/mongodb
mount /dev/vg0/mongodb-snap /mnt/snap
tar -czf /backup/mongodb-$(date +%Y%m%d).tar.gz /mnt/snap
umount /mnt/snap
lvdelete /dev/vg0/mongodb-snap

副本集备份策略

// 副本集备份脚本
db.adminCommand({
  backupCursor: {
    includeRest: true,
    maxTimeMS: 60000
  }
})
#!/bin/bash
# 副本集备份脚本
 
REPLICA_SET="rs0"
BACKUP_DIR="/backup/mongodb"
DATE=$(date +%Y%m%d_%H%M%S)
RETENTION_DAYS=7
 
mkdir -p "$BACKUP_DIR"
 
# 使用 mongodump 备份(从节点更佳)
mongodump --uri="mongodb://localhost:27017/?replicaSet=$REPLICA_SET" \
    --oplog \
    --out="$BACKUP_DIR/backup_$DATE"
 
# 压缩备份
tar -czf "$BACKUP_DIR/backup_$DATE.tar.gz" "$BACKUP_DIR/backup_$DATE"
rm -rf "$BACKUP_DIR/backup_$DATE"
 
# 清理旧备份
find "$BACKUP_DIR" -name "*.tar.gz" -mtime +$RETENTION_DAYS -delete
 
echo "Backup completed: backup_$DATE.tar.gz"

Python 集成

PyMongo 使用指南

from pymongo import MongoClient, DESCENDING
from bson import ObjectId
from datetime import datetime
 
# 连接 MongoDB
client = MongoClient("mongodb://localhost:27017")
db = client["mydb"]
 
# 获取集合
users = db["users"]
 
# CRUD 操作
# 创建
result = users.insert_one({
    "name": "Alice",
    "email": "alice@example.com",
    "age": 30,
    "tags": ["python", "mongodb"],
    "created_at": datetime.utcnow()
})
print(f"Inserted ID: {result.inserted_id}")
 
# 批量创建
users.insert_many([
    {"name": "Bob", "email": "bob@example.com", "age": 25},
    {"name": "Carol", "email": "carol@example.com", "age": 28}
])
 
# 读取
# 按 ID 查询
user = users.find_one({"_id": ObjectId("...")})
 
# 条件查询
active_users = users.find({"status": "active"})
young_users = users.find({"age": {"$gte": 18, "$lt": 30}})
 
# 投影(只返回特定字段)
names_only = users.find({}, {"name": 1, "email": 1, "_id": 0})
 
# 排序和限制
top_users = users.find().sort("age", DESCENDING).limit(10)
 
# 计数
count = users.count_documents({"status": "active"})
 
# 更新
users.update_one(
    {"_id": ObjectId("...")},
    {"$set": {"age": 31}}
)
 
# 批量更新
users.update_many(
    {"status": "inactive"},
    {"$set": {"last_active": datetime.utcnow()}}
)
 
# 原子更新
users.update_one(
    {"_id": ObjectId("...")},
    {"$inc": {"login_count": 1}}
)
 
# 删除
users.delete_one({"_id": ObjectId("...")})
users.delete_many({"status": "deleted"})

Motor(异步)

import asyncio
from motor.motor_asyncio import AsyncIOMotorClient
from bson import ObjectId
 
async def main():
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    db = client["mydb"]
    users = db["users"]
    
    # 创建
    result = await users.insert_one({
        "name": "Alice",
        "email": "alice@example.com"
    })
    print(f"Inserted: {result.inserted_id}")
    
    # 读取
    user = await users.find_one({"_id": result.inserted_id})
    print(f"Found: {user}")
    
    # 批量操作
    await users.insert_many([
        {"name": "Bob", "email": "bob@example.com"},
        {"name": "Carol", "email": "carol@example.com"}
    ])
    
    # 查询
    async for user in users.find({"name": {"$regex": "^A"}}):
        print(user)
    
    # 更新
    await users.update_one(
        {"_id": result.inserted_id},
        {"$set": {"age": 30}}
    )
    
    # 聚合
    pipeline = [
        {"$group": {"_id": "$status", "count": {"$sum": 1}}}
    ]
    async for status in users.aggregate(pipeline):
        print(status)
    
    client.close()
 
asyncio.run(main())

MongoEngine ODM

from mongoengine import connect, Document, StringField, IntField, DateTimeField, ListField
from datetime import datetime
 
# 连接
connect("mydb", host="localhost", port=27017)
 
# 定义模型
class User(Document):
    name = StringField(required=True, max_length=100)
    email = StringField(required=True, unique=True)
    age = IntField(min_value=0, max_value=150)
    tags = ListField(StringField())
    created_at = DateTimeField(default=datetime.utcnow)
    
    meta = {
        "collection": "users",
        "indexes": ["email", "name"],
        "ordering": ["-created_at"]
    }
 
# CRUD 操作
# 创建
user = User(name="Alice", email="alice@example.com", age=30)
user.save()
 
# 读取
user = User.objects(email="alice@example.com").first()
users = User.objects(age__gte=18)
 
# 更新
user.age = 31
user.save()
 
# 删除
user.delete()
 
# 复杂查询
young_users = User.objects(age__gte=18, age__lt=30)
alice = User.objects.get_or_404(email="alice@example.com")
 
# 聚合查询
stats = User.objects.aggregate([
    {"$group": {"_id": None, "avg_age": {"$avg": "$age"}, "count": {"$sum": 1}}}
])

PyMongo 高级操作

from pymongo import MongoClient, UpdateOne
from bson import ObjectId
 
client = MongoClient("mongodb://localhost:27017")
db = client["mydb"]
 
# 聚合管道
pipeline = [
    # 匹配
    {"$match": {"status": "active"}},
    
    # 展开数组
    {"$unwind": "$tags"},
    
    # 分组
    {"$group": {
        "_id": "$tags",
        "count": {"$sum": 1},
        "users": {"$push": "$name"}
    }},
    
    # 排序
    {"$sort": {"count": -1}},
    
    # 限制
    {"$limit": 10}
]
 
results = list(db.users.aggregate(pipeline))
for r in results:
    print(f"{r['_id']}: {r['count']} users")
 
# 事务(副本集)
with client.start_session() as session:
    with session.start_transaction():
        db.orders.insert_one({"item": "Book"}, session=session)
        db.inventory.update_one(
            {"item": "Book", "qty": {"$gt": 0}},
            {"$inc": {"qty": -1}},
            session=session
        )
 
# 批量写入
operations = [
    UpdateOne(
        {"_id": ObjectId("...")},
        {"$set": {"status": "completed"}}
    ),
    UpdateOne(
        {"_id": ObjectId("...")},
        {"$set": {"status": "cancelled"}}
    )
]
result = db.orders.bulk_write(operations)
print(f"Modified: {result.modified_count}")

常见陷阱与最佳实践

陷阱 1:文档过大

// ❌ 错误:文档无限增长(超过 16MB 限制)
{
  _id: ObjectId("..."),
  title: "...",
  comments: [
    { user: "user1", text: "comment 1" },
    // ... 无限增长
  ]
}
 
// ✅ 正确:使用独立的 comments 集合
// Post 文档
{
  _id: ObjectId("..."),
  title: "...",
  comment_count: 1000
}
 
// Comments 集合
{
  _id: ObjectId("..."),
  post_id: ObjectId("..."),
  user: "user1",
  text: "...",
  created_at: ISODate("...")
}

陷阱 2:缺少索引

// ❌ 错误:频繁查询的字段没有索引
db.orders.find({ customer_id: ObjectId("...") });  // 慢
 
// ✅ 正确:创建索引
db.orders.createIndex({ customer_id: 1, created_at: -1 });

陷阱 3:过度使用 $where

// ❌ 错误:$where 性能差
db.users.find({ $where: "this.age > 25 && this.active === true" });
 
// ✅ 正确:使用标准查询运算符
db.users.find({ age: { $gt: 25 }, active: true });

最佳实践清单

  1. 合理设计文档结构:平衡嵌入和引用。
  2. 适当索引:为常用查询创建索引。
  3. 使用投影:只返回需要的字段。
  4. 分页优化:使用游标分页替代 OFFSET。
  5. 聚合管道优化:将 $match 放在最前面。
  6. 避免过深嵌套:最多 2-3 层。
  7. TTL 索引:用于会话、日志等自动过期。
  8. 副本集部署:保证高可用。
  9. 监控慢查询:使用 Profiler。
  10. 定期维护:压缩、重新索引。

与其他数据库对比

MongoDB vs PostgreSQL

特性MongoDBPostgreSQL
模型文档型关系型
模式灵活严格
事务ACID(副本集)ACID
JOIN$lookup完整 JOIN
扩展性水平扩展垂直扩展
查询MQLSQL
JSON原生jsonb

MongoDB vs MySQL

特性MongoDBMySQL
模型文档型关系型
扩展性水平扩展垂直扩展
模式灵活固定
事务副本集内单节点/主从
查询MQLSQL

MongoDB vs Redis

特性MongoDBRedis
类型文档数据库KV 存储
持久化默认持久化可选持久化
容量大数据量内存优先
查询复杂查询简单 KV
适用主数据库缓存/会话

MongoDB 事务与原子性

单文档原子性

// MongoDB 对单文档的操作是原子的
// 嵌入式文档被视为单个字段
 
// ✅ 原子操作:更新嵌入字段
db.users.updateOne(
  { _id: ObjectId("...") },
  {
    $set: {
      "profile.name": "New Name",
      "profile.bio": "New Bio"
    }
  }
);
 
// ✅ 原子操作:数组操作
db.posts.updateOne(
  { _id: ObjectId("...") },
  {
    $push: { comments: { user: "user1", text: "Great!" } },
    $inc: { comment_count: 1 }
  }
);
 
// ✅ 原子操作:条件更新
db.inventory.updateOne(
  { _id: ObjectId("..."), quantity: { $gte: 1 } },
  { $inc: { quantity: -1 } }
);
 
// ❌ 非原子操作(需要多文档事务)
db.orders.updateOne({ _id: ObjectId("...") }, { $set: { status: "shipped" } });
db.inventory.updateOne({ product_id: "SKU123" }, { $inc: { quantity: -1 } });

多文档事务

// 1. 启动事务会话
const session = db.getMongo().startSession();
 
// 2. 开始事务
session.startTransaction({
  readConcern: { level: "snapshot" },
  writeConcern: { w: "majority" }
});
 
try {
  const ordersCollection = session.getDatabase("mydb").orders;
  const inventoryCollection = session.getDatabase("mydb").inventory;
  const usersCollection = session.getDatabase("mydb").users;
 
  // 操作 1:创建订单
  const order = {
    user_id: ObjectId("..."),
    items: [{ product_id: "SKU123", quantity: 2 }],
    total: 99.99,
    status: "pending",
    created_at: new Date()
  };
  ordersCollection.insertOne(order, { session });
 
  // 操作 2:扣减库存
  inventoryCollection.updateOne(
    { product_id: "SKU123", quantity: { $gte: 2 } },
    { $inc: { quantity: -2 } },
    { session }
  );
 
  // 操作 3:更新用户订单统计
  usersCollection.updateOne(
    { _id: ObjectId("...") },
    {
      $inc: { order_count: 1 },
      $set: { last_order_at: new Date() }
    },
    { session }
  );
 
  // 3. 提交事务
  session.commitTransaction();
  console.log("Transaction committed successfully");
 
} catch (error) {
  // 4. 回滚事务
  session.abortTransaction();
  console.error("Transaction aborted:", error);
} finally {
  session.endSession();
}

事务选项详解

// 事务的读关注点 (readConcern)
// level: "local" - 返回本地最旧的数据(默认)
// level: "majority" - 返回被大多数副本确认的数据
// level: "snapshot" - 返回快照数据(仅在事务中可用)
 
// 事务的写关注点 (writeConcern)
// w: 1 - 仅主节点确认(默认)
// w: "majority" - 大多数节点确认
// w: "all" - 所有节点确认
// wtimeout: 5000 - 超时时间(毫秒)
 
// 示例:强一致性事务
session.startTransaction({
  readConcern: { level: "snapshot" },
  writeConcern: { w: "majority", wtimeout: 10000 }
});
 
// 示例:最终一致性事务(更快)
session.startTransaction({
  readConcern: { level: "majority" },
  writeConcern: { w: 1 }
});

事务最佳实践

// 1. 保持事务简短
// ❌ 错误:长时间运行的事务
session.startTransaction();
// ... 执行大量操作 ...
session.commitTransaction();
 
// ✅ 正确:短小的事务
async function processOrder(orderId) {
  const session = db.getMongo().startSession();
  try {
    session.startTransaction();
    
    // 只包含必要的写操作
    await Promise.all([
      updateOrderStatus(orderId, "confirmed", session),
      reserveInventory(orderId, session),
      sendConfirmation(orderId)  // 非关键操作移到事务外
    ]);
    
    await session.commitTransaction();
  } finally {
    session.endSession();
  }
}
 
// 2. 使用保存点(MongoDB 4.2+)
session.startTransaction();
try {
  db.collection1.updateOne({ ... }, { ... }, { session });
  session.commitTransaction();
} catch (e) {
  session.abortTransaction();
}
 
// 3. 事务重试逻辑
async function executeWithRetry(operation, maxRetries = 3) {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    const session = db.getMongo().startSession();
    try {
      session.startTransaction();
      const result = await operation(session);
      await session.commitTransaction();
      return result;
    } catch (error) {
      await session.abortTransaction();
      
      if (error.hasErrorLabel && 
          error.hasErrorLabel("TransientTransactionError") &&
          attempt < maxRetries) {
        // 重试
        await new Promise(resolve => setTimeout(resolve, 100 * attempt));
        continue;
      }
      throw error;
    } finally {
      session.endSession();
    }
  }
}
 
// 4. 监控事务状态
db.adminCommand({
  serverStatus: 1,
  trasactions: 1
});

MongoDB Change Streams

基础使用

// 1. 监听集合变更
const changeStream = db.users.watch();
 
changeStream.on("change", change => {
  console.log("Change detected:", change);
  
  switch (change.operationType) {
    case "insert":
      console.log("New document:", change.fullDocument);
      break;
    case "update":
      console.log("Updated document:", change.fullDocument);
      console.log("Update fields:", change.updateDescription);
      break;
    case "delete":
      console.log("Deleted document ID:", change.documentKey);
      break;
    case "replace":
      console.log("Replaced document:", change.fullDocument);
      break;
  }
});
 
// 2. 监听特定操作类型
const changeStream = db.orders.watch([
  { $match: { operationType: { $in: ["insert", "update", "delete"] } } }
]);
 
// 3. 只监听特定字段的变更
const changeStream = db.orders.watch([
  { $match: { "updateDescription.updatedFields.status": { $exists: true } } }
]);
 
// 4. 监听多个集合
const changeStream1 = db.orders.watch();
const changeStream2 = db.users.watch();
 
// 5. 监听整个数据库
const changeStream = db.watch([
  { $match: { operationType: { $in: ["insert", "update", "delete"] } } }
]);

恢复光标

// 1. 使用 resume token 恢复变更流
let resumeToken = null;
const changeStream = db.orders.watch();
 
changeStream.on("change", change => {
  resumeToken = change._id;
  
  if (change.operationType === "insert") {
    // 处理变更...
  }
});
 
// 2. 从指定时间恢复
const timestamp = new Date("2024-01-15T10:30:00Z");
const changeStream = db.orders.watch([], {
  startAtOperationTime: timestamp
});
 
// 3. 从指定变更恢复
const changeStream = db.orders.watch([], {
  resumeAfter: { _id: ObjectId("...") }
});
 
// 4. 完整的恢复示例
async function watchCollectionWithResume(collection) {
  let resumeToken = getLastResumeToken();  // 从持久化存储获取
  
  const options = resumeToken 
    ? { resumeAfter: resumeToken }
    : {};
  
  const changeStream = collection.watch([], options);
  
  for await (const change of changeStream) {
    console.log("Processing change:", change);
    
    // 持久化 resume token
    await saveResumeToken(change._id);
    
    // 处理业务逻辑
    await processChange(change);
  }
}
 
// 5. 错误处理和重连
async function resilientWatch(collection) {
  while (true) {
    try {
      const changeStream = collection.watch();
      
      for await (const change of changeStream) {
        await processChange(change);
      }
    } catch (error) {
      console.error("Change stream error:", error);
      await new Promise(resolve => setTimeout(resolve, 5000));
    }
  }
}

变更流聚合管道

// 1. 使用聚合管道过滤
const changeStream = db.orders.watch([
  {
    $match: {
      $or: [
        { operationType: "update" },
        { operationType: "insert" }
      ]
    }
  }
]);
 
// 2. 监听特定文档
const orderId = ObjectId("...");
const changeStream = db.orders.watch([
  { $match: { "documentKey": orderId } }
]);
 
// 3. 只监听删除操作(因为 update/delete 后文档不可用)
const changeStream = db.orders.watch([
  { $match: { operationType: { $in: ["delete", "drop", "rename"] } } }
]);
 
// 4. 监听集合删除
const changeStream = db.watch([
  { $match: { operationType: { $in: ["drop", "rename"] } } }
]);
 
// 5. 监听索引变更
const changeStream = db.orders.watch([
  { $match: { operationType: { $in: ["createIndex", "dropIndex"] } } }
]);

实际应用场景

// 场景 1:实时通知
async function setupNotifications(db) {
  const changeStream = db.orders.watch([
    { $match: { operationType: "update" } }
  ]);
  
  for await (const change of changeStream) {
    const { orderId, newStatus } = change.documentKey;
    
    if (change.updateDescription.updatedFields.status === "shipped") {
      await sendPushNotification(orderId, "Order shipped!");
    }
  }
}
 
// 场景 2:数据同步
async function syncToElasticsearch(db) {
  const changeStream = db.products.watch();
  
  for await (const change of changeStream) {
    switch (change.operationType) {
      case "insert":
      case "update":
      case "replace":
        await elasticsearch.index({
          index: "products",
          id: change.documentKey._id.toString(),
          body: change.fullDocument
        });
        break;
      case "delete":
        await elasticsearch.delete({
          index: "products",
          id: change.documentKey._id.toString()
        });
        break;
    }
  }
}
 
// 场景 3:审计日志
async function createAuditLog(db) {
  const changeStream = db.watch([
    { $match: { 
        operationType: { $in: ["insert", "update", "delete"] },
        "ns.coll": { $nin: ["audit_log", "system.sessions"] }
    }}
  ]);
  
  for await (const change of changeStream) {
    await db.audit_log.insertOne({
      timestamp: new Date(),
      operation: change.operationType,
      collection: change.ns.coll,
      documentKey: change.documentKey,
      fullDocument: change.fullDocument,
      updateDescription: change.updateDescription,
      user: change.user
    });
  }
}
 
// 场景 4:缓存失效
async function invalidateCache(db, cache) {
  const changeStream = db.users.watch([
    { $match: { operationType: "update" } }
  ]);
  
  for await (const change of changeStream) {
    const userId = change.documentKey._id.toString();
    cache.del(`user:${userId}`);
    cache.del(`user:${userId}:profile`);
  }
}

MongoDB 分片集群

分片架构

// 分片集群组件:
// 1. mongos - 路由进程
// 2. config servers - 配置服务器(存储集群元数据)
// 3. shard servers - 分片服务器(存储实际数据)
 
// 分片键选择原则:
// 1. 高基数 - 字段值要有足够多的不同值
// 2. 低频率 - 不是所有文档都有该字段
// 3. 非单调 - 避免热块(如自增 _id)
 
// 示例:用户集合分片
// ✅ 好:基于用户 ID
db.adminCommand({
  shardCollection: "mydb.user_sessions",
  key: { user_id: 1, created_at: 1 }
});
 
// ✅ 好:基于地理位置
db.adminCommand({
  shardCollection: "mydb.checkins",
  key: { location: "2dsphere" }
});
 
// ✅ 好:基于时间
db.adminCommand({
  shardCollection: "mydb.logs",
  key: { timestamp: 1 }
});
 
// ❌ 差:基于状态(低基数)
db.adminCommand({
  shardCollection: "mydb.orders",
  key: { status: 1 }  // status 只有几个值
});

分片策略

// 1. 哈希分片(数据均匀分布)
db.adminCommand({
  shardCollection: "mydb.orders",
  key: { order_id: "hashed" }
});
 
// 2. 范围分片
db.adminCommand({
  shardCollection: "mydb.sales",
  key: { sale_date: 1 }
});
 
// 3. 复合分片键
db.adminCommand({
  shardCollection: "mydb.events",
  key: { tenant_id: 1, created_at: 1 }
});
 
// 4. 多键索引分片(数组字段)
db.adminCommand({
  shardCollection: "mydb.products",
  key: { tags: 1 }  // tags 是数组
});
 
// 5. 区域分片(地理位置感知)
// 创建区域
db.adminCommand({
  shardingState: {
    enableSharding: "mydb"
  }
});
 
db.adminCommand({
  updateZoneKeyRange: {
    ns: "mydb.users",
    min: { region: "US" },
    max: { region: "US\0" },
    zone: "US_zone"
  }
});

分片管理

// 1. 查看分片状态
db.adminCommand({ shardCollection: 1 });
db.adminCommand({ listShards: 1 });
 
// 2. 查看 chunk 分布
db.adminCommand({ getShardDistribution: 1 });
 
// 3. 手动移动 chunk
db.adminCommand({
  moveChunk: "mydb.users",
  find: { user_id: ObjectId("...") },
  to: "shard0001"
});
 
// 4. 分割 chunk
db.adminCommand({
  split: "mydb.users",
  middle: { user_id: 5000 }
});
 
// 5. 均衡器控制
// 查看均衡器状态
db.adminCommand({ balancerStatus: 1 });
 
// 开启/关闭均衡器
db.adminCommand({ balancerStop: 1 });
db.adminCommand({ balancerStart: 1 });
 
// 设置均衡窗口
db.adminCommand({
  balancerWindow: {
    start: "23:00",
    stop: "06:00",
    stop: "12:00"
  }
});
 
// 6. 添加分片
db.adminCommand({
  addShard: "rs2/hostname:27018"
});
 
// 7. 移除分片
db.adminCommand({
  removeShard: "shard0002"
});
 
// 8. 查看待迁移的 chunk
db.adminCommand({
  moves: 1
});

分片查询

// 1. 广播查询(所有分片)
// 没有分片键的查询会广播到所有分片
db.orders.find({ status: "pending" });  // 广播查询
 
// 2. 定向查询(单个分片)
// 使用分片键的查询只访问目标分片
db.orders.find({ user_id: ObjectId("...") });  // 定向查询
 
// 3. 强制使用索引
db.orders.find({ user_id: ObjectId("...") })
  .hint({ user_id: 1, created_at: -1 });
 
// 4. 分片聚合
db.orders.aggregate([
  { $match: { created_at: { $gte: ISODate("2024-01-01") } } },
  { $group: { _id: "$status", count: { $sum: 1 } } },
  { $sort: { count: -1 } }
]);
 
// 5. $lookup 在分片集合
// 分片集合不能使用 $lookup 联结其他分片集合
// 解决方案:使用 $lookup 的 pipeline 形式
db.orders.aggregate([
  { $match: { user_id: ObjectId("...") } },
  { $lookup: {
      from: "users",
      let: { userId: "$user_id" },
      pipeline: [
        { $match: { $expr: { $eq: ["$_id", "$$userId"] } } },
        { $project: { name: 1, email: 1 } }
      ],
      as: "user_info"
    }
  }
]);

MongoDB 性能监控

诊断命令

// 1. 服务器状态
db.serverStatus();
 
// 2. 当前操作(类似 MySQL 的 PROCESSLIST)
db.currentOp({ "active": true });
 
// 查看慢查询
db.currentOp({
  "active": true,
  "secs_running": { $gt: 5 }
});
 
// 查看锁等待
db.currentOp({
  "waitingForLock": true
});
 
// 终止操作
db.killOp(opid);
 
// 3. 锁信息
db.adminCommand({ top: 1 });
 
// 4. 数据库统计
db.stats();
db.users.stats();
 
// 5. 集合统计
db.users.stats({ scale: 1024 * 1024 });  // MB 为单位
 
// 6. 索引统计
db.users.aggregate([{ $indexStats: {} }]);
 
// 7. 连接状态
db.adminCommand({ connectionStatus: 1 });

Profiler 分析

// 1. 开启 profiler
db.setProfilingLevel(2, { slowms: 100, sampleRate: 1 });
// level: 0 - 关闭
// level: 1 - 只记录慢查询
// level: 2 - 记录所有查询
 
// 2. 查看 profiler 集合
db.system.profile.find().sort({ ts: -1 }).limit(10);
 
// 3. 分析慢查询
db.system.profile.aggregate([
  { $match: { millis: { $gt: 100 } } },
  { $group: {
      _id: "$op",
      count: { $sum: 1 },
      avgTime: { $avg: "$millis" },
      maxTime: { $max: "$millis" },
      queries: { $push: "$command" }
    }
  }
]);
 
// 4. 分析耗时阶段
db.system.profile.aggregate([
  { $match: { op: "aggregate" } },
  { $unwind: "$secs" },
  { $group: {
      _id: "$secs.name",
      count: { $sum: 1 }
    }
  }
]);

Atlas 监控

// Atlas 提供了开箱即用的监控功能:
// 1. Real-Time Performance Panel
// 2. Metrics Explorer
// 3. Query Profiler
// 4. Performance Advisor
// 5. Data Explorer
 
// 推荐配置的 Atlas 告警:
// - 内存使用率 > 80%
// - CPU 使用率 > 75%
// - 磁盘使用率 > 85%
// - 连接数 > 80%
// - 慢查询率 > 5%
// - 副本集延迟 > 1 秒
// - oplog 窗口 < 24 小时
 
// 自定义指标
// 在 Atlas > Integrations 创建自定义告警规则

性能优化技巧

// 1. 投影减少网络传输
db.users.findOne(
  { _id: ObjectId("...") },
  { name: 1, email: 1 }  // 只返回这两个字段
);
 
// 2. 限制返回文档数
db.posts.find({ author_id: ObjectId("...") }).limit(20);
 
// 3. 避免 skip-large-offset
// ❌ 慢
db.posts.find().skip(10000).limit(20);
// ✅ 快
db.posts.find({ _id: { $gt: lastId } }).limit(20);
 
// 4. 批量操作
const bulkOps = [
  { insertOne: { document: { name: "User 1" } } },
  { insertOne: { document: { name: "User 2" } } },
  { updateOne: { filter: { _id: 1 }, update: { $set: { name: "Updated" } } } },
  { deleteOne: { filter: { _id: 2 } } }
];
db.users.bulkWrite(bulkOps, { ordered: false });
 
// 5. 解释执行计划
db.users.find({ email: "test@example.com" }).explain("executionStats");
// 分析:
// IXSCAN - 使用索引扫描(好)
// COLLSCAN - 全表扫描(差)
// FETCH - 获取完整文档
 
// 6. 覆盖索引查询
// 创建索引覆盖查询字段
db.users.createIndex({ email: 1, name: 1 });
// 查询只返回索引字段
db.users.find({ email: "test@example.com" }, { email: 1, name: 1 });

SUCCESS

本文档全面介绍了 MongoDB 的核心特性、文档模型、聚合管道、索引优化、副本集分片以及高可用架构。MongoDB 凭借其灵活的文档模型、强大的聚合框架和水平扩展能力,是现代应用开发的重要选择。