PostgreSQL 完全指南

NOTE

本文档最后更新于 2026年4月,涵盖 PostgreSQL 17 新特性、JSON 支持、向量搜索(pgvector)、PL/pgSQL 存储过程,以及在 AI 应用中的实战场景。


目录

  1. PostgreSQL 概述与核心定位
  2. PostgreSQL 17 新特性
  3. JSON 数据类型详解
  4. 全文搜索
  5. 高级 SQL 特性
  6. PostgreSQL vs MySQL vs MariaDB 对比
  7. pgvector 向量搜索
  8. Prisma/Drizzle 中的 PostgreSQL
  9. 实战场景与代码示例
  10. 选型建议

PostgreSQL 概述与核心定位

为什么选择 PostgreSQL

PostgreSQL 是一个功能完备、特性丰富的开源关系型数据库,由加州大学伯克利分校于 1986 年发起,历经近 40 年的发展,已成为「功能最全面的开源数据库」。

PostgreSQL 的核心优势体现在四个维度:

功能完备:PostgreSQL 提供了业界最丰富的 SQL 特性,包括窗口函数、CTE、递归查询、触发器、存储过程、JSON 支持等。

扩展性强:支持超过 100 种扩展插件,包括向量搜索(pgvector)、地理信息(PostGIS)、时序数据(TimescaleDB)等。

标准兼容:严格遵循 SQL 标准,兼容性极佳。迁移成本低,不会被锁定在特定厂商。

性能卓越:在 OLTP、OLAP、混合负载场景下均有出色表现。MVCC 机制提供了优秀的并发控制。

PostgreSQL 在 AI 时代的价值

在 AI 应用中,PostgreSQL 的价值被重新发现:

  • 向量搜索:pgvector 扩展让 PostgreSQL 支持 Embedding 相似度搜索
  • 混合存储:结构化数据与向量数据共存于单一数据库
  • RLS 策略:行级安全策略实现细粒度访问控制
  • JSON 支持:灵活处理 AI 输出的半结构化数据

市场份额与行业地位

PostgreSQL 在全球数据库市场中的地位持续攀升:

| 排名 | 数据库 | 市场份额(2025) |
| --- | --- | --- |
| 1 | PostgreSQL | 28.5% |
| 2 | MySQL | 25.2% |
| 3 | MongoDB | 12.8% |
| 4 | Microsoft SQL Server | 10.5% |
| 5 | Redis | 8.3% |

关键数据点

  • PostgreSQL 连续多年被评为「开发者最喜爱的数据库」
  • 超过 50% 的新项目选择 PostgreSQL 作为主数据库
  • 主要使用者包括苹果、Spotify、Netflix、Uber、Airbnb 等科技巨头
  • 在 DB-Engines 排名中稳居前五

PostgreSQL 适用场景

| 场景 | 推荐程度 | 说明 |
| --- | --- | --- |
| 企业级应用 | ⭐⭐⭐⭐⭐ | 功能完备,稳定可靠 |
| AI/ML 应用 | ⭐⭐⭐⭐⭐ | pgvector 向量搜索 |
| 数据仓库 | ⭐⭐⭐⭐⭐ | 强大的分析能力 |
| Web 应用 | ⭐⭐⭐⭐ | 性能优秀 |
| 移动后端 | ⭐⭐⭐⭐ | 可靠性和扩展性 |
| GIS 应用 | ⭐⭐⭐⭐⭐ | PostGIS 扩展 |

核心概念与数据模型

PostgreSQL 架构

PostgreSQL 采用客户端-服务器架构,主要组件包括:

┌─────────────────────────────────────────────────────────────┐
│                        PostgreSQL 架构                         │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│   客户端应用层                                                │
│   ┌──────────┐  ┌──────────┐  ┌──────────┐                  │
│   │  psql   │  │  libpq   │  │ JDBC/ODBC│                  │
│   └────┬─────┘  └────┬─────┘  └────┬─────┘                  │
│        │             │             │                         │
│        └─────────────┼─────────────┘                         │
│                      ▼                                        │
│   ┌─────────────────────────────────────────────────────┐   │
│   │              Postmaster 守护进程                       │   │
│   │  ┌─────────┐  ┌─────────┐  ┌─────────┐            │   │
│   │  │ 连接管理 │  │ 查询分析 │  │ 存储管理 │            │   │
│   │  └─────────┘  └─────────┘  └─────────┘            │   │
│   └─────────────────────────────────────────────────────┘   │
│                      │                                        │
│        ┌────────────┼────────────┐                         │
│        ▼            ▼            ▼                         │
│   ┌─────────┐  ┌─────────┐  ┌─────────┐                  │
│   │ Buffer  │  │  WAL    │  │ 锁管理  │                  │
│   │  Pool   │  │  Log    │  │         │                  │
│   └─────────┘  └─────────┘  └─────────┘                  │
│                                                              │
│   ┌─────────────────────────────────────────────────────┐   │
│   │                    数据文件                             │   │
│   │  (表、索引、TOAST、MVCC 多版本)                       │   │
│   └─────────────────────────────────────────────────────┘   │
│                                                              │
└─────────────────────────────────────────────────────────────┘

核心概念

1. MVCC(多版本并发控制)

PostgreSQL 使用 MVCC 实现并发控制,每个事务看到的是数据库的一致性快照:

-- 查看事务隔离级别
SHOW transaction_isolation;
 
-- 查看当前事务 ID
SELECT txid_current();
 
-- 查看当前快照
SELECT * FROM pg_stat_activity WHERE pid = pg_backend_pid();
 
-- 事务示例:两个并发事务
-- 事务 A(可重复读)
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SELECT balance FROM accounts WHERE id = 1;  -- 看到余额 1000
-- 此时事务 B 修改了余额为 2000
SELECT balance FROM accounts WHERE id = 1;  -- 仍看到 1000
COMMIT;
 
-- 事务 B(读已提交)
BEGIN;
UPDATE accounts SET balance = 2000 WHERE id = 1;
COMMIT;  -- 提交后其他事务可见

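上面两个并发事务的可见性行为,可以用一段极简的 Python 模型来对照理解。这是一个示意性的假设模型,只保留「快照之后提交的版本不可见」这一核心规则,与 PostgreSQL 的实际内部实现(xmin/xmax、进行中事务列表等)无关:

```python
# 简化的 MVCC 模型:每行数据保留多个版本,版本带有提交它的事务 ID。
# 可重复读事务只能看到「建立快照时已提交」的版本。
class MVCCStore:
    def __init__(self):
        self.versions = []          # [(txid, value)],按提交顺序追加
        self.committed = set()      # 已提交的事务 ID

    def commit(self, txid, value):
        self.versions.append((txid, value))
        self.committed.add(txid)

    def snapshot(self):
        # 快照 = 当前已提交事务集合的拷贝
        return set(self.committed)

    def read(self, snapshot):
        # 只返回快照中可见的最新版本
        visible = [v for (txid, v) in self.versions if txid in snapshot]
        return visible[-1] if visible else None

store = MVCCStore()
store.commit(txid=100, value=1000)   # 初始余额 1000

snap_a = store.snapshot()            # 事务 A 建立快照(对应可重复读)
store.commit(txid=101, value=2000)   # 事务 B 提交,把余额改为 2000

print(store.read(snap_a))            # 事务 A 仍看到 1000
print(store.read(store.snapshot()))  # 新快照看到 2000
```

真实实现中,旧版本不会立即消失,这正是下文 VACUUM 需要清理死亡元组的原因。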
2. TOAST(超长字段存储)

PostgreSQL 自动将大字段存储到独立的 TOAST 表中:

-- 创建使用 TOAST 的表
CREATE TABLE articles (
    id SERIAL PRIMARY KEY,
    title VARCHAR(255),
    content TEXT,      -- 大字段,自动使用 TOAST
    metadata JSONB     -- JSON 也可能使用 TOAST
);
 
-- 查看 TOAST 表
SELECT relname, reltoastrelid
FROM pg_class
WHERE relname = 'articles';
 
-- TOAST 行内存储阈值(toast_tuple_target)
-- 可在创建表时指定
CREATE TABLE large_data (
    data BYTEA
) WITH (toast_tuple_target = 248);

3. VACUUM 机制

VACUUM 是 PostgreSQL 维护的重要机制:

-- 标准 VACUUM(不阻塞读写)
VACUUM articles;
 
-- VACUUM FULL(会阻塞,需要重建表)
VACUUM FULL articles;
 
-- 带分析的 VACUUM
VACUUM ANALYZE articles;
 
-- 查看 VACUUM 状态
SELECT * FROM pg_stat_progress_vacuum;
 
-- 查看死亡元组
SELECT schemaname, relname, n_dead_tup, n_live_tup
FROM pg_stat_user_tables
ORDER BY n_dead_tup DESC;
 
-- 自动 VACUUM 配置
ALTER TABLE articles SET (
    autovacuum_vacuum_threshold = 50,
    autovacuum_analyze_threshold = 50,
    autovacuum_vacuum_scale_factor = 0.1,
    autovacuum_analyze_scale_factor = 0.05
);

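上面 ALTER TABLE 中参数的含义是:当死亡元组数超过 `threshold + scale_factor × 表行数` 时触发 autovacuum(这是 PostgreSQL 文档给出的触发公式)。可以用一段 Python 估算触发点:

```python
# autovacuum 触发条件:死元组数 > threshold + scale_factor * 表行数
def autovacuum_trigger_point(reltuples, threshold=50, scale_factor=0.1):
    return threshold + scale_factor * reltuples

# 10 万行的表:死元组超过 50 + 0.1 * 100000 = 10050 时触发
print(autovacuum_trigger_point(100_000))
```

对大表而言 scale_factor 往往是主导项,这也是上面把它从默认 0.2 调小到 0.1 的动机:让大表更早被清理。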
数据模型设计

1. 规范化设计

-- 高度规范化(第三范式)
CREATE TABLE customers (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    email VARCHAR(255) UNIQUE NOT NULL,
    created_at TIMESTAMP DEFAULT NOW()
);
 
CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    customer_id INTEGER REFERENCES customers(id),
    order_date DATE NOT NULL,
    total DECIMAL(10,2) NOT NULL,
    status VARCHAR(20) DEFAULT 'pending'
);
 
CREATE TABLE order_items (
    id SERIAL PRIMARY KEY,
    order_id INTEGER REFERENCES orders(id),
    product_id INTEGER REFERENCES products(id),
    quantity INTEGER NOT NULL,
    price DECIMAL(10,2) NOT NULL
);

2. 反规范化设计(读多写少场景)

-- 添加冗余字段优化查询
CREATE TABLE orders_denormalized (
    id SERIAL PRIMARY KEY,
    customer_id INTEGER REFERENCES customers(id),
    customer_name VARCHAR(100),  -- 冗余字段
    customer_email VARCHAR(255), -- 冗余字段
    order_date DATE NOT NULL,
    total DECIMAL(10,2) NOT NULL,
    item_count INTEGER,          -- 冗余字段
    items_summary JSONB          -- 冗余字段
);
 
-- 使用物化视图
CREATE MATERIALIZED VIEW monthly_sales AS
SELECT 
    DATE_TRUNC('month', order_date) AS month,
    COUNT(*) AS order_count,
    SUM(total) AS total_sales,
    AVG(total) AS avg_order_value
FROM orders
GROUP BY DATE_TRUNC('month', order_date)
WITH DATA;
 
CREATE UNIQUE INDEX idx_monthly_sales ON monthly_sales(month);
 
-- 刷新物化视图
REFRESH MATERIALIZED VIEW monthly_sales;
REFRESH MATERIALIZED VIEW CONCURRENTLY monthly_sales;  -- 不阻塞查询


PostgreSQL 17 新特性

PostgreSQL 17 于 2024 年 9 月发布,是自 PostgreSQL 16 以来的重大更新。

核心新特性

1. 改进的 VACUUM 机制

-- PostgreSQL 17 重写了 VACUUM 的内存管理,内存占用显著降低
-- 并行 VACUUM(PostgreSQL 13 起已支持并行清理索引)
VACUUM (PARALLEL 4) large_table;
 
-- 常用的 VACUUM 选项组合
VACUUM (SKIP_LOCKED, INDEX_CLEANUP AUTO) large_table;
 
-- 监控 VACUUM 进度
SELECT * FROM pg_stat_progress_vacuum;

2. JSON_TABLE 函数

-- 将 JSON 数据转换为关系表
SELECT *
FROM json_table(
    '{"users": [
        {"name": "Alice", "age": 30},
        {"name": "Bob", "age": 25}
    ]}'::json,
    '$.users[*]'
    COLUMNS (
        name TEXT PATH '$.name',
        age INT PATH '$.age'
    )
) AS users;
 
-- 结果:
--  name  | age
-- -------+-----
--  Alice |  30
--  Bob   |  25

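JSON_TABLE 做的事情,本质上是「按路径取出数组,再把每个元素按 COLUMNS 定义映射成一行」。用 Python 对照(示意):

```python
import json

# 等价于 json_table(..., '$.users[*]' COLUMNS (name, age)) 的展开过程
doc = json.loads('{"users": [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]}')

rows = [(u["name"], u["age"]) for u in doc["users"]]
print(rows)  # [('Alice', 30), ('Bob', 25)]
```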
3. 增强的 MERGE 语句

-- PostgreSQL 17 为 MERGE 新增了 RETURNING 支持与 WHEN NOT MATCHED BY SOURCE 分支
MERGE INTO target_table AS t
USING source_view AS s
ON t.id = s.id
WHEN MATCHED AND s.type = 'UPDATE' THEN
    UPDATE SET
        name = s.name,
        updated_at = NOW()
WHEN MATCHED AND s.type = 'DELETE' THEN
    DELETE
WHEN NOT MATCHED THEN
    INSERT (id, name, created_at)
    VALUES (s.id, s.name, NOW());

4. SQL/JSON 增强

-- JSON_EXISTS 函数(检查路径是否存在)
SELECT JSON_EXISTS(
    '{"user": {"name": "Alice"}}'::json,
    '$.user.name'
);  -- 返回 true
 
-- JSON_QUERY 函数(提取 JSON 值)
SELECT JSON_QUERY(
    '{"items": [1, 2, 3]}'::json,
    '$.items'
);  -- 返回 '[1, 2, 3]'

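JSON_EXISTS 与 JSON_QUERY 的语义可以用一个极简的路径求值函数来对照。下面是示意实现,只处理 `$.a.b` 这种简单点路径,并非完整的 SQL/JSON path 语法:

```python
import json

def json_exists(doc: dict, path: str) -> bool:
    # 沿路径逐层下探,任何一层缺失即返回 False
    cur = doc
    for key in path.lstrip("$.").split("."):
        if not isinstance(cur, dict) or key not in cur:
            return False
        cur = cur[key]
    return True

def json_query(doc: dict, path: str):
    # 返回路径指向的 JSON 值(假设路径存在)
    cur = doc
    for key in path.lstrip("$.").split("."):
        cur = cur[key]
    return cur

doc = json.loads('{"user": {"name": "Alice"}, "items": [1, 2, 3]}')
print(json_exists(doc, "$.user.name"))   # True
print(json_query(doc, "$.items"))        # [1, 2, 3]
```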
5. 改进的 EXPLAIN 输出

-- 更详细的执行计划
EXPLAIN (ANALYZE, VERBOSE, BUFFERS, SETTINGS)
SELECT * FROM users WHERE age > 25;

其他改进

| 特性 | 说明 |
| --- | --- |
| 增量备份改进 | 更高效的增量备份机制 |
| 逻辑复制增强 | 支持更多数据类型 |
| 正则表达式 | 新增 Unicode 属性支持 |
| 窗口函数 | 支持更多窗口帧选项 |

JSON 数据类型详解

PostgreSQL 提供了两种 JSON 数据类型:json 和 jsonb,它们在存储和查询方式上有显著差异。

json vs jsonb 对比

| 特性 | json | jsonb |
| --- | --- | --- |
| 存储格式 | 存储原始 JSON 字符串 | 二进制格式存储 |
| 查询性能 | 每次读取需重新解析 | 已解析的二进制形式,处理更快 |
| 索引支持 | 有限 | 支持 GIN 等索引 |
| 空间占用 | 较小(无转换开销) | 略大(有预处理开销) |
| 格式保留 | 保留原始文本(空格、键序) | 不保留键序,去除重复键 |
| 使用场景 | 临时数据、日志 | 高频查询的核心数据 |

JSON 查询操作符

-- 创建表
CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    name TEXT,
    attributes JSONB
);
 
-- 插入数据
INSERT INTO products (name, attributes) VALUES
    ('iPhone 15', '{"color": "black", "storage": "256GB", "price": 999}'),
    ('MacBook Pro', '{"color": "silver", "storage": "512GB", "price": 1999}'),
    ('AirPods', '{"color": "white", "price": 199}');
 
-- 查询:提取 JSON 字段
SELECT 
    name,
    attributes->>'color' AS color,
    attributes->>'price' AS price
FROM products;
 
-- 查询:使用 WHERE 子句过滤
SELECT * FROM products
WHERE attributes->>'color' = 'black';
 
-- 查询:数字比较
SELECT * FROM products
WHERE (attributes->>'price')::numeric < 1000;
 
-- 查询:使用 jsonb 操作符(更高效)
SELECT * FROM products
WHERE attributes @> '{"color": "black"}';  -- 包含指定对象
 
SELECT * FROM products
WHERE attributes ? 'color';  -- 包含指定键
 
SELECT * FROM products
WHERE attributes ?| array['color', 'storage'];  -- 包含任一键

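`@>` 的包含语义可以概括为递归的「子集匹配」:右侧对象的每个键值对都能在左侧找到对应匹配。下面是一个示意实现(jsonb 对标量与数组的匹配还有一些细节规则,此处从简):

```python
def jsonb_contains(left, right):
    """示意实现:right 是否被 left 包含(对应 jsonb 的 @> 操作符)"""
    if isinstance(right, dict):
        return isinstance(left, dict) and all(
            k in left and jsonb_contains(left[k], v) for k, v in right.items()
        )
    if isinstance(right, list):
        # 右侧数组的每个元素,都要能匹配左侧数组中的某个元素
        return isinstance(left, list) and all(
            any(jsonb_contains(item, r) for item in left) for r in right
        )
    return left == right

attrs = {"color": "black", "storage": "256GB", "price": 999}
print(jsonb_contains(attrs, {"color": "black"}))   # 对应 @> '{"color": "black"}'
print(jsonb_contains(attrs, {"color": "white"}))
```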
JSON 函数进阶

-- jsonb_object_keys:获取所有键
SELECT jsonb_object_keys(attributes)
FROM products;
 
-- jsonb_each:展开为键值对
SELECT key, value
FROM products,
    jsonb_each(attributes);
 
-- jsonb_agg:聚合为 JSON
SELECT 
    name,
    jsonb_object_agg(key, value) AS attributes
FROM products,
    jsonb_each(attributes::jsonb)
GROUP BY name;
 
-- jsonb_build_object:构建 JSON 对象
SELECT jsonb_build_object(
    'name', name,
    'price', (attributes->>'price')::numeric,
    'color', attributes->>'color'
) AS product_info
FROM products;
 
-- 递归查询 JSON
WITH RECURSIVE json_tree AS (
    -- 初始查询:顶层对象
    SELECT 
        id,
        attributes,
        0 AS depth,
        jsonb_array_elements(
            CASE 
                WHEN jsonb_typeof(attributes->'items') = 'array' 
                THEN attributes->'items' 
                ELSE '[]'::jsonb 
            END
        ) AS item
    FROM products
    
    UNION ALL
    
    -- 递归:嵌套数组
    SELECT 
        p.id,
        p.attributes,
        jt.depth + 1,
        jsonb_array_elements(
            CASE 
                WHEN jsonb_typeof(jt.item->'children') = 'array' 
                THEN jt.item->'children' 
                ELSE '[]'::jsonb 
            END
        )
    FROM products p
    JOIN json_tree jt ON p.id = jt.id
    WHERE jt.depth < 10  -- 防止无限递归
)
SELECT * FROM json_tree;

JSON 索引

-- 创建 GIN 索引(适用于 JSON 包含查询)
CREATE INDEX idx_products_attributes 
ON products USING GIN (attributes);
 
-- 创建表达式索引(适用于特定路径查询)
CREATE INDEX idx_products_color 
ON products ((attributes->>'color'));
 
-- 创建部分索引
CREATE INDEX idx_products_black 
ON products ((attributes->>'price'))
WHERE attributes @> '{"color": "black"}';
 
-- 验证索引使用
EXPLAIN SELECT * FROM products
WHERE attributes @> '{"color": "black"}';

全文搜索

PostgreSQL 内置了强大的全文搜索功能,无需额外安装插件。

全文搜索基础

-- 创建全文搜索配置(中文建议使用 zhparser 或 pg_jieba)
-- 这里使用默认的 english 配置
 
-- 索引列
ALTER TABLE articles ADD COLUMN 
    search_vector TSVECTOR
    GENERATED ALWAYS AS (
        to_tsvector('english', coalesce(title, '') || ' ' || coalesce(content, ''))
    ) STORED;
 
-- 创建索引
CREATE INDEX idx_articles_search 
ON articles USING GIN (search_vector);
 
-- 搜索查询
SELECT 
    title,
    ts_rank(search_vector, query) AS rank
FROM articles,
    to_tsquery('english', 'AI & (Python | JavaScript)') query
WHERE search_vector @@ query
ORDER BY rank DESC;
 
-- 高级搜索:包含短语
SELECT * FROM articles
WHERE search_vector @@ to_tsquery('english', 'PostgreSQL <-> 17');
 
-- 搜索结果高亮
SELECT 
    title,
    ts_headline(
        'english',
        content,
        to_tsquery('english', 'AI'),
        'StartSel=<mark>, StopSel=</mark>'
    ) AS snippet
FROM articles
WHERE search_vector @@ to_tsquery('english', 'AI');

搜索权重与排名

-- 为不同字段设置权重
ALTER TABLE articles ADD COLUMN 
    search_vector_weighted TSVECTOR
    GENERATED ALWAYS AS (
        setweight(to_tsvector('english', coalesce(title, '')), 'A') ||
        setweight(to_tsvector('english', coalesce(summary, '')), 'B') ||
        setweight(to_tsvector('english', coalesce(content, '')), 'C')
    ) STORED;
 
-- 使用加权搜索
SELECT 
    title,
    ts_rank(search_vector_weighted, query) AS rank
FROM articles,
    plainto_tsquery('english', 'postgresql features') query
WHERE search_vector_weighted @@ query
ORDER BY rank DESC;

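setweight 对排名的影响可以粗略理解为:命中词条按其所在字段的权重累加。PostgreSQL 的默认权重为 A=1.0、B=0.4、C=0.2、D=0.1;实际的 ts_rank 还会考虑词频和位置信息,下面只演示权重部分(示意):

```python
# PostgreSQL 默认的 {D, C, B, A} 权重
WEIGHTS = {"A": 1.0, "B": 0.4, "C": 0.2, "D": 0.1}

def weighted_rank(doc_lexemes: dict, query_terms: list) -> float:
    """doc_lexemes: {词条: 权重标签},对应 setweight 标注的结果"""
    return sum(WEIGHTS[doc_lexemes[t]] for t in query_terms if t in doc_lexemes)

# title 权重 A,summary 权重 B,content 权重 C
doc = {"postgresql": "A", "features": "C", "database": "B"}
print(weighted_rank(doc, ["postgresql", "features"]))  # 1.0 + 0.2 = 1.2
```

由此可见,同一个词出现在 title(A)中的得分贡献远高于出现在 content(C)中。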
高级 SQL 特性

窗口函数

窗口函数允许在不影响原始数据的情况下进行跨行计算:

-- 创建示例数据
CREATE TABLE sales (
    id SERIAL,
    region TEXT,
    salesperson TEXT,
    sale_date DATE,
    amount NUMERIC
);
 
-- 窗口函数示例
SELECT 
    salesperson,
    region,
    amount,
    SUM(amount) OVER (
        PARTITION BY region  -- 按区域分组
        ORDER BY sale_date   -- 按日期排序
    ) AS running_total,
    
    AVG(amount) OVER (
        PARTITION BY region
    ) AS region_avg,
    
    amount - AVG(amount) OVER (
        PARTITION BY region
    ) AS diff_from_avg,
    
    RANK() OVER (
        PARTITION BY region
        ORDER BY amount DESC
    ) AS rank_in_region,
    
    LAG(amount, 1) OVER (
        PARTITION BY salesperson
        ORDER BY sale_date
    ) AS prev_sale,
    
    LEAD(amount, 1) OVER (
        PARTITION BY salesperson
        ORDER BY sale_date
    ) AS next_sale
FROM sales;

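窗口函数的关键在于「分区内计算、但不压缩行数」,这一点与 GROUP BY 不同。用 Python 对照 running_total 的计算过程(示意数据):

```python
from itertools import groupby

# 对应 SUM(amount) OVER (PARTITION BY region ORDER BY sale_date)
sales = [
    {"region": "east", "sale_date": "2024-01-01", "amount": 100},
    {"region": "east", "sale_date": "2024-01-02", "amount": 200},
    {"region": "west", "sale_date": "2024-01-01", "amount": 50},
]

rows = sorted(sales, key=lambda r: (r["region"], r["sale_date"]))
result = []
for _, group in groupby(rows, key=lambda r: r["region"]):
    total = 0                      # 每个分区独立累计
    for r in group:
        total += r["amount"]
        result.append({**r, "running_total": total})

print([r["running_total"] for r in result])  # [100, 300, 50]
```

注意结果仍是 3 行:每行都带着自己的累计值,原始明细没有被聚合掉。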
CTE(公共表表达式)

-- 普通 CTE
WITH monthly_sales AS (
    SELECT 
        DATE_TRUNC('month', sale_date) AS month,
        SUM(amount) AS total
    FROM sales
    GROUP BY DATE_TRUNC('month', sale_date)
),
avg_sales AS (
    SELECT AVG(total) AS average FROM monthly_sales
)
SELECT 
    ms.month,
    ms.total,
    a.average,
    ms.total - a.average AS diff
FROM monthly_sales ms
CROSS JOIN avg_sales a
ORDER BY ms.month;
 
-- 递归 CTE:生成树形结构
WITH RECURSIVE category_tree AS (
    -- 基础查询:根节点
    SELECT 
        id,
        name,
        parent_id,
        1 AS depth,
        name::text AS path
    FROM categories
    WHERE parent_id IS NULL
    
    UNION ALL
    
    -- 递归查询:子节点
    SELECT 
        c.id,
        c.name,
        c.parent_id,
        ct.depth + 1,
        ct.path || ' > ' || c.name
    FROM categories c
    JOIN category_tree ct ON c.parent_id = ct.id
    WHERE ct.depth < 10  -- 防止无限递归
)
SELECT * FROM category_tree ORDER BY path;

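递归 CTE 的执行过程等价于「先取基础查询结果,再反复用上一轮结果连接出下一层,直到没有新行」。下面用 Python 和一组假设的示例数据模拟 path 的拼接:

```python
# 假设的类目数据,对应 categories(id, name, parent_id)
categories = [
    {"id": 1, "name": "Books", "parent_id": None},
    {"id": 2, "name": "Tech", "parent_id": 1},
    {"id": 3, "name": "Databases", "parent_id": 2},
]

def category_tree(rows, max_depth=10):
    # 基础查询:根节点(parent_id IS NULL)
    result = [(r["id"], r["name"], 1, r["name"]) for r in rows if r["parent_id"] is None]
    frontier = result
    while frontier:                       # 递归部分:逐层展开
        nxt = []
        for (pid, _, depth, path) in frontier:
            if depth >= max_depth:        # 对应 WHERE ct.depth < 10
                continue
            for r in rows:
                if r["parent_id"] == pid:
                    nxt.append((r["id"], r["name"], depth + 1, path + " > " + r["name"]))
        result += nxt
        frontier = nxt
    return result

paths = [p for (_, _, _, p) in category_tree(categories)]
print(paths)
```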
高级聚合

-- FILTER 子句
SELECT 
    region,
    SUM(amount) AS total_sales,
    SUM(amount) FILTER (WHERE sale_date >= '2024-01-01') AS ytd_sales
FROM sales
GROUP BY region;
 
-- GROUPING SETS:多维度聚合
SELECT 
    region,
    salesperson,
    SUM(amount) AS total
FROM sales
GROUP BY GROUPING SETS (
    (region, salesperson),  -- 按区域和销售人员
    (region),               -- 按区域
    (salesperson),          -- 按销售人员
    ()                      -- 总计
);
 
-- CUBE:所有组合
SELECT 
    region,
    salesperson,
    SUM(amount) AS total
FROM sales
GROUP BY CUBE (region, salesperson);
 
-- ROLLUP:层次结构
SELECT 
    region,
    salesperson,
    SUM(amount) AS total
FROM sales
GROUP BY ROLLUP (region, salesperson);

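ROLLUP 和 CUBE 本质上都是 GROUPING SETS 的简写:ROLLUP(a, b) 展开为层次前缀 {(a,b), (a), ()},CUBE(a, b) 展开为全部子集。生成规则可以直接写出来(示意):

```python
from itertools import combinations

def rollup(cols):
    # 层次前缀:(a,b) -> (a) -> ()
    return [tuple(cols[:i]) for i in range(len(cols), -1, -1)]

def cube(cols):
    # 全部子集,共 2^n 个分组组合
    sets = []
    for n in range(len(cols), -1, -1):
        sets += [tuple(c) for c in combinations(cols, n)]
    return sets

print(rollup(["region", "salesperson"]))  # [('region', 'salesperson'), ('region',), ()]
print(len(cube(["region", "salesperson"])))  # 4
```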
PostgreSQL vs MySQL vs MariaDB 对比

核心对比表

| 特性 | PostgreSQL | MySQL | MariaDB |
| --- | --- | --- | --- |
| 最新稳定版 | 17.x | 9.0.x | 11.6.x |
| 许可证 | PostgreSQL License(类 BSD) | GPLv2 / 商业版 | GPLv2 |
| ACID 完整性 | 完整支持 | 完整支持(InnoDB) | 完整支持 |
| MVCC | 原生支持 | 支持(InnoDB) | 支持 |
| JSON 支持 | 原生(jsonb) | 5.7+ 支持 | 10.2+ 支持 |
| 向量搜索 | pgvector 扩展 | 插件支持 | 插件支持 |
| 地理信息 | PostGIS | 插件 | 插件 |
| 全文搜索 | 内置 | 支持 | 支持 |
| 存储过程 | PL/pgSQL、PL/Perl、PL/Python | SQL/PSM | SQL/PSM |
| 窗口函数 | 完全支持 | 支持 | 支持 |
| CTE/递归 | 完全支持 | 8.0+ 支持 | 10.2+ 支持 |
| 分区 | 声明式分区 | 支持 | 支持 |
| 复制 | 物理复制、逻辑复制 | 异步、同步、半同步 | Galera Cluster |
| 连接池 | 外部工具(PgBouncer/Pgpool-II) | ProxySQL/HikariCP | ProxySQL |
| 性能 | OLAP 强 | OLTP 强 | OLTP 强 |

选择建议

| 场景 | 推荐选择 | 理由 |
| --- | --- | --- |
| AI/ML 应用 | PostgreSQL + pgvector | 向量搜索 + 关系数据 |
| OLAP 数据仓库 | PostgreSQL | 窗口函数、CTE 强大 |
| 高并发 OLTP | MySQL / MariaDB | 简单、快速 |
| 复杂查询 | PostgreSQL | 标准兼容、功能完备 |
| Web 应用 | MySQL / MariaDB | 生态成熟、配置简单 |
| 地理信息系统 | PostgreSQL + PostGIS | 专业 GIS 支持 |
| 全文搜索 | PostgreSQL | 内置、功能强大 |

pgvector 向量搜索

pgvector 是 PostgreSQL 的向量嵌入扩展,在 AI 时代具有重要价值。

安装与基础配置

-- 安装扩展(需要超级用户权限)
CREATE EXTENSION IF NOT EXISTS vector;
 
-- 查看已安装扩展
SELECT * FROM pg_extension WHERE extname = 'vector';

向量列与索引

-- 创建带向量列的表
CREATE TABLE documents (
    id SERIAL PRIMARY KEY,
    content TEXT,
    embedding VECTOR(1536),  -- OpenAI text-embedding-3-small 维度
    metadata JSONB,
    created_at TIMESTAMP DEFAULT NOW()
);
 
-- 创建 HNSW 索引(推荐,性能最佳)
CREATE INDEX idx_documents_embedding_hnsw 
ON documents 
USING hnsw (embedding vector_cosine_ops);
 
-- 也可以创建 IVFFlat 索引(适合大数据集)
CREATE INDEX idx_documents_embedding_ivf 
ON documents 
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);
 
-- 查看索引信息(\d 为 psql 元命令)
\d documents

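IVFFlat 的 lists 参数有一个常见的经验法则(参考 pgvector 文档的建议,仅作起点,实际应结合召回率测试调整):行数不超过 100 万时取 `rows / 1000`,更大的数据集取 `sqrt(rows)`:

```python
import math

def suggest_lists(row_count: int) -> int:
    """IVFFlat lists 参数的经验估算(参考值,非硬性规则)"""
    if row_count <= 1_000_000:
        return max(1, row_count // 1000)
    return int(math.sqrt(row_count))

print(suggest_lists(100_000))    # 100,与上面 WITH (lists = 100) 一致
print(suggest_lists(4_000_000))  # 2000
```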
向量搜索操作

-- 插入向量数据
INSERT INTO documents (content, embedding, metadata) VALUES
    (
        'PostgreSQL is a powerful open-source relational database.',
        '[0.1, 0.2, 0.3, ...]',  -- 向量字面量(其余维度省略),写入时自动转换为 vector(1536)
        '{"source": "wiki", "category": "database"}'
    );
 
-- 余弦相似度搜索(注意:类型转换写在字符串字面量之外)
SELECT 
    id,
    content,
    1 - (embedding <=> '[query_vector]'::vector(1536)) AS similarity
FROM documents
ORDER BY embedding <=> '[query_vector]'::vector(1536)
LIMIT 5;
 
-- L2 距离搜索
SELECT 
    id,
    content,
    (embedding <-> '[query_vector]'::vector(1536)) AS distance
FROM documents
ORDER BY embedding <-> '[query_vector]'::vector(1536)
LIMIT 5;
 
-- 内积搜索(<#> 返回的是负内积,便于统一升序排序)
SELECT 
    id,
    content,
    (embedding <#> '[query_vector]'::vector(1536)) AS neg_inner_product
FROM documents
ORDER BY embedding <#> '[query_vector]'::vector(1536)
LIMIT 5;

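三个距离操作符对应的数学定义如下,用纯 Python 对照(`<#>` 返回负内积,是 pgvector 为了让所有距离都能按升序 ORDER BY 而做的约定):

```python
import math

def l2_distance(a, b):
    """<-> 欧氏(L2)距离"""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

def inner_product(a, b):
    """<#> 负内积(值越小表示内积越大、越相似)"""
    return -sum(x * y for x, y in zip(a, b))

def cosine_distance(a, b):
    """<=> 余弦距离 = 1 - 余弦相似度"""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return 1 - dot / (na * nb)

a, b = [1.0, 0.0], [0.0, 1.0]
print(l2_distance(a, b))       # sqrt(2),两个正交单位向量
print(cosine_distance(a, b))   # 1.0
```

这也解释了 SQL 里 `1 - (embedding <=> ...)` 的写法:把余弦距离还原成相似度。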
混合搜索:向量 + 关键词

-- 创建全文搜索列
ALTER TABLE documents ADD COLUMN 
    search_vector TSVECTOR
    GENERATED ALWAYS AS (
        to_tsvector('english', coalesce(content, ''))
    ) STORED;
 
CREATE INDEX idx_documents_text_search 
ON documents USING GIN (search_vector);
 
-- 混合搜索:向量相似度 + 关键词匹配
WITH vector_results AS (
    SELECT 
        id,
        content,
        1 - (embedding <=> '[query_vector]'::vector(1536)) AS vector_similarity,
        ts_rank(search_vector, query) AS text_rank
    FROM documents,
        plainto_tsquery('english', 'database') query
    WHERE search_vector @@ query
)
SELECT 
    id,
    content,
    0.7 * vector_similarity + 0.3 * text_rank AS combined_score
FROM vector_results
ORDER BY combined_score DESC
LIMIT 10;
 
-- 过滤搜索:结合元数据
SELECT 
    id,
    content,
    1 - (embedding <=> '[query_vector]'::vector(1536)) AS similarity
FROM documents
WHERE metadata @> '{"category": "database"}'
ORDER BY embedding <=> '[query_vector]'::vector(1536)
LIMIT 5;

Python 集成示例

from psycopg2 import connect
import numpy as np
 
# 连接数据库
conn = connect(
    host="localhost",
    database="ai_app",
    user="postgres",
    password="password"
)
cursor = conn.cursor()
 
def generate_embedding(text: str) -> list[float]:
    """生成文本嵌入向量(示例)"""
    # 实际使用 OpenAI API 或本地模型
    return np.random.rand(1536).tolist()
 
def search_similar_documents(query: str, top_k: int = 5) -> list[dict]:
    """搜索相似文档"""
    embedding = generate_embedding(query)
    embedding_str = f"[{','.join(map(str, embedding))}]"
    
    sql = """
        SELECT 
            id,
            content,
            metadata,
            1 - (embedding <=> %s::vector) AS similarity
        FROM documents
        ORDER BY embedding <=> %s::vector
        LIMIT %s
    """
    
    cursor.execute(sql, (embedding_str, embedding_str, top_k))
    results = cursor.fetchall()
    
    return [
        {
            "id": row[0],
            "content": row[1],
            "metadata": row[2],
            "similarity": row[3]
        }
        for row in results
    ]
 
# 使用示例
results = search_similar_documents("What is PostgreSQL?", top_k=5)
for r in results:
    print(f"[{r['similarity']:.3f}] {r['content'][:100]}...")

Prisma/Drizzle 中的 PostgreSQL

Prisma Schema 定义

// schema.prisma
 
generator client {
  provider = "prisma-client-js"
}
 
datasource db {
  provider = "postgresql"
  url      = env("DATABASE_URL")
}
 
model User {
  id        String   @id @default(uuid())
  email     String   @unique
  name      String?
  createdAt DateTime @default(now())
  updatedAt DateTime @updatedAt
  
  // 关系
  posts     Post[]
  
  @@index([email])
}
 
model Post {
  id        String   @id @default(uuid())
  title     String
  content   String
  published Boolean  @default(false)
  createdAt DateTime @default(now())
  updatedAt DateTime @updatedAt
  
  // 向量嵌入(存储为原始向量)
  embedding String?  // 存储为 JSON 字符串格式
  
  // 关系
  author    User     @relation(fields: [authorId], references: [id])
  authorId  String
  
  @@index([authorId])
  @@index([published])
}
 
// 通过 Unsupported 类型映射 pgvector 向量列
model Document {
  id        String   @id @default(uuid())
  content   String
  metadata  Json?    // JSON 类型
  embedding Unsupported("vector(1536)")?
  
  createdAt DateTime @default(now())
  
  // 注意:HNSW 向量索引目前无法在 Prisma schema 中声明,需在迁移 SQL 中手动创建
}

Drizzle Schema 定义

// schema.ts
import { pgTable, text, boolean, timestamp, uuid, jsonb, vector } from 'drizzle-orm/pg-core';
import { sql } from 'drizzle-orm';
 
export const users = pgTable('users', {
  id: uuid('id').defaultRandom().primaryKey(),
  email: text('email').notNull().unique(),
  name: text('name'),
  createdAt: timestamp('created_at').defaultNow().notNull(),
  updatedAt: timestamp('updated_at').defaultNow().notNull(),
});
 
export const posts = pgTable('posts', {
  id: uuid('id').defaultRandom().primaryKey(),
  title: text('title').notNull(),
  content: text('content').notNull(),
  published: boolean('published').default(false).notNull(),
  authorId: uuid('author_id').notNull().references(() => users.id),
  embedding: vector('embedding', { dimensions: 1536 }), // pgvector
  metadata: jsonb('metadata'),
  createdAt: timestamp('created_at').defaultNow().notNull(),
  updatedAt: timestamp('updated_at').defaultNow().notNull(),
});
 
// 向量搜索查询(原生 SQL,需从 drizzle-orm 导入 sql 标签函数)
export async function searchSimilarPosts(client: any, queryEmbedding: number[], limit = 5) {
  const vec = JSON.stringify(queryEmbedding); // 序列化为 '[...]' 形式的向量字面量
  const results = await client.execute(sql`
    SELECT 
      id,
      title,
      content,
      1 - (embedding <=> ${vec}::vector) AS similarity
    FROM posts
    WHERE published = true
    ORDER BY embedding <=> ${vec}::vector
    LIMIT ${limit}
  `);
  return results.rows;
}

实战场景与代码示例

场景一:构建 RAG 系统

-- RAG 系统数据模型
CREATE TABLE knowledge_base (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    title TEXT NOT NULL,
    content TEXT NOT NULL,
    source_url TEXT,
    embedding VECTOR(1536),
    metadata JSONB DEFAULT '{}',
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);
 
-- 创建 HNSW 索引
CREATE INDEX idx_knowledge_embedding 
ON knowledge_base USING hnsw (embedding vector_cosine_ops);
 
-- 创建全文搜索索引
ALTER TABLE knowledge_base ADD COLUMN 
    search_vector TSVECTOR
    GENERATED ALWAYS AS (
        to_tsvector('english', coalesce(title, '') || ' ' || coalesce(content, ''))
    ) STORED;
 
CREATE INDEX idx_knowledge_text 
ON knowledge_base USING GIN (search_vector);
 
-- 混合搜索函数
CREATE OR REPLACE FUNCTION hybrid_search(
    query_embedding VECTOR(1536),
    query_text TEXT,
    top_k INT DEFAULT 5,
    alpha FLOAT DEFAULT 0.7
)
RETURNS TABLE (
    id UUID,
    title TEXT,
    content TEXT,
    score FLOAT
) AS $$
BEGIN
    -- 注意:查询中的列要加表别名限定,否则会与同名的 OUT 参数(id/title/content/score)冲突
    RETURN QUERY
    WITH vector_search AS (
        SELECT 
            kb.id,
            kb.title,
            kb.content,
            1 - (kb.embedding <=> query_embedding) AS vector_score
        FROM knowledge_base kb
        ORDER BY kb.embedding <=> query_embedding
        LIMIT top_k * 2
    ),
    text_search AS (
        SELECT 
            kb.id,
            ts_rank(kb.search_vector, plainto_tsquery('english', query_text)) AS text_score
        FROM knowledge_base kb
        WHERE kb.search_vector @@ plainto_tsquery('english', query_text)
    )
    SELECT 
        vs.id,
        vs.title,
        vs.content,
        COALESCE(alpha * vs.vector_score + (1 - alpha) * ts.text_score, vs.vector_score) AS score
    FROM vector_search vs
    LEFT JOIN text_search ts ON vs.id = ts.id
    ORDER BY 4 DESC  -- 用列序号排序,避免 score 与 OUT 参数同名引发歧义
    LIMIT top_k;
END;
$$ LANGUAGE plpgsql;

场景二:实时数据订阅

-- 启用逻辑复制
-- 需要在 postgresql.conf 中设置:wal_level = logical
 
-- 创建发布
CREATE PUBLICATION chat_messages FOR TABLE messages;
 
-- 创建订阅(逻辑复制发布/订阅自 PostgreSQL 10 起支持)
CREATE SUBSCRIPTION chat_realtime
    CONNECTION 'host=localhost port=5432 dbname=ai_chat user=postgres'
    PUBLICATION chat_messages
    WITH (copy_data = true);
 
-- 查看订阅状态
SELECT * FROM pg_stat_subscription;
 
-- 查看复制延迟(latest_end_lsn 为最近向发布端汇报的 WAL 位置)
SELECT 
    subname,
    received_lsn,
    latest_end_lsn,
    pg_wal_lsn_diff(received_lsn, latest_end_lsn) AS replication_lag_bytes
FROM pg_stat_subscription;

选型建议

何时选择 PostgreSQL

| 场景 | 推荐程度 | 原因 |
| --- | --- | --- |
| AI 应用(向量搜索) | ⭐⭐⭐⭐⭐ | pgvector + 关系数据 |
| 复杂查询/报表 | ⭐⭐⭐⭐⭐ | 窗口函数、CTE 强大 |
| 需要 JSON 灵活性 | ⭐⭐⭐⭐⭐ | jsonb 高性能支持 |
| 地理信息应用 | ⭐⭐⭐⭐⭐ | PostGIS 扩展 |
| 高并发 OLTP | ⭐⭐⭐⭐ | 成熟稳定 |
| 简单 CRUD 应用 | ⭐⭐⭐ | 可能过于复杂 |
| 需要 MySQL 兼容 | ⭐⭐ | 需要额外配置 |

配置优化建议

# 关键配置参数(postgresql.conf,注释使用 #)
 
# 内存配置
shared_buffers = '256MB'          # 建议为系统内存的 25%
effective_cache_size = '1GB'      # 建议为系统内存的 75%
work_mem = '64MB'                 # 每个排序/哈希操作可用内存
 
# 并行配置
max_worker_processes = 8          # 接近 CPU 核心数
max_parallel_workers_per_gather = 4
parallel_tuple_cost = 0.1
parallel_setup_cost = 1000
 
# WAL 配置
wal_buffers = '64MB'
checkpoint_completion_target = 0.9
 
# 连接配置
max_connections = 200

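上面的内存经验值(shared_buffers ≈ 25%、effective_cache_size ≈ 75%)可以写成一个简单的估算函数。这只是起点,实际取值应结合负载压测调整:

```python
def suggest_memory_config(total_ram_mb: int) -> dict:
    """按常见经验比例估算内存参数(假设性示例,非官方公式)"""
    return {
        "shared_buffers": f"{total_ram_mb // 4}MB",        # 约 25%
        "effective_cache_size": f"{total_ram_mb * 3 // 4}MB",  # 约 75%
    }

print(suggest_memory_config(1024))  # {'shared_buffers': '256MB', 'effective_cache_size': '768MB'}
```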
TIP

对于 AI 应用,推荐使用 PostgreSQL + pgvector 组合,可以同时处理结构化数据和向量数据,减少系统复杂度。



### 备份脚本

```bash
#!/bin/bash
# backup.sh - PostgreSQL 备份脚本

set -e

BACKUP_DIR="/var/backups/postgresql"
DATE=$(date +%Y%m%d_%H%M%S)
RETENTION_DAYS=30

mkdir -p "$BACKUP_DIR"

# 逻辑备份
echo "Starting logical backup..."
pg_dump -U postgres -d mydb -F c -Z 5 -f "$BACKUP_DIR/mydb_$DATE.dump"

# 保留最近 N 天的备份
find "$BACKUP_DIR" -name "*.dump" -mtime +$RETENTION_DAYS -delete

# 备份完成
echo "Backup completed: mydb_$DATE.dump"
ls -lh "$BACKUP_DIR"
```

PL/pgSQL 存储过程详解

基础语法

-- 基础函数
CREATE OR REPLACE FUNCTION get_user_count()
RETURNS INTEGER AS $$
DECLARE
    count INTEGER;
BEGIN
    SELECT COUNT(*) INTO count FROM users;
    RETURN count;
END;
$$ LANGUAGE plpgsql;
 
-- 带参数函数
CREATE OR REPLACE FUNCTION get_user_by_email(user_email VARCHAR)
RETURNS users AS $$
DECLARE
    user_record users%ROWTYPE;
BEGIN
    SELECT * INTO user_record FROM users WHERE email = user_email;
    IF NOT FOUND THEN
        RAISE EXCEPTION 'User not found: %', user_email;
    END IF;
    RETURN user_record;
END;
$$ LANGUAGE plpgsql;
 
-- 返回表
CREATE OR REPLACE FUNCTION get_active_users()
RETURNS TABLE(id BIGINT, name VARCHAR, email VARCHAR) AS $$
BEGIN
    RETURN QUERY
    SELECT u.id, u.name, u.email
    FROM users u
    WHERE u.is_active = true;
END;
$$ LANGUAGE plpgsql;
 
-- 条件逻辑
CREATE OR REPLACE FUNCTION calculate_discount(total DECIMAL)
RETURNS DECIMAL AS $$
DECLARE
    discount_rate DECIMAL;
BEGIN
    IF total > 1000 THEN
        discount_rate := 0.15;
    ELSIF total > 500 THEN
        discount_rate := 0.10;
    ELSIF total > 100 THEN
        discount_rate := 0.05;
    ELSE
        discount_rate := 0;
    END IF;
    
    RETURN total * discount_rate;
END;
$$ LANGUAGE plpgsql;
 
-- 循环(这里另取函数名,避免与内置的 generate_series 混淆)
CREATE OR REPLACE FUNCTION int_sequence(n INTEGER)
RETURNS TABLE(value INTEGER) AS $$
DECLARE
    i INTEGER := 1;
BEGIN
    WHILE i <= n LOOP
        RETURN QUERY SELECT i;
        i := i + 1;
    END LOOP;
END;
$$ LANGUAGE plpgsql;
 
-- FOR 循环(查询结果)
CREATE OR REPLACE FUNCTION process_pending_orders()
RETURNS VOID AS $$
DECLARE
    order_record RECORD;
BEGIN
    FOR order_record IN 
        SELECT * FROM orders WHERE status = 'pending'
    LOOP
        -- 处理每个订单
        UPDATE orders SET status = 'processing' WHERE id = order_record.id;
    END LOOP;
END;
$$ LANGUAGE plpgsql;
 
-- 异常处理
CREATE OR REPLACE FUNCTION safe_divide(a DECIMAL, b DECIMAL)
RETURNS DECIMAL AS $$
BEGIN
    IF b = 0 THEN
        RAISE WARNING 'Division by zero, returning NULL';
        RETURN NULL;
    END IF;
    RETURN a / b;
EXCEPTION
    WHEN division_by_zero THEN
        RETURN NULL;
    WHEN OTHERS THEN
        RAISE WARNING 'Unexpected error: %', SQLERRM;
        RETURN NULL;
END;
$$ LANGUAGE plpgsql;
 
-- 事务控制
CREATE OR REPLACE FUNCTION transfer_funds(
    from_account BIGINT,
    to_account BIGINT,
    amount DECIMAL
)
RETURNS BOOLEAN AS $$
BEGIN
    -- 检查余额
    IF (SELECT balance FROM accounts WHERE id = from_account) < amount THEN
        RAISE EXCEPTION 'Insufficient funds';
    END IF;
    
    -- 子事务块:块内发生异常时,仅回滚块内已做的修改
    BEGIN
        -- 扣款
        UPDATE accounts SET balance = balance - amount WHERE id = from_account;
        
        -- 存款
        UPDATE accounts SET balance = balance + amount WHERE id = to_account;
        
        -- 记录交易
        INSERT INTO transactions (from_account, to_account, amount) 
        VALUES (from_account, to_account, amount);
        
        RETURN TRUE;
    EXCEPTION
        WHEN OTHERS THEN
            RAISE NOTICE 'Transfer failed: %', SQLERRM;
            RETURN FALSE;
    END;
END;
$$ LANGUAGE plpgsql;
 
-- 触发器函数
CREATE OR REPLACE FUNCTION update_updated_at()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
 
CREATE TRIGGER update_users_timestamp
    BEFORE UPDATE ON users
    FOR EACH ROW
    EXECUTE FUNCTION update_updated_at();
 
-- 审计触发器
CREATE OR REPLACE FUNCTION audit_changes()
RETURNS TRIGGER AS $$
BEGIN
    IF TG_OP = 'INSERT' THEN
        INSERT INTO audit_log (table_name, action, new_data)
        VALUES (TG_TABLE_NAME, 'INSERT', to_jsonb(NEW));
        RETURN NEW;
    ELSIF TG_OP = 'UPDATE' THEN
        INSERT INTO audit_log (table_name, action, old_data, new_data)
        VALUES (TG_TABLE_NAME, 'UPDATE', to_jsonb(OLD), to_jsonb(NEW));
        RETURN NEW;
    ELSIF TG_OP = 'DELETE' THEN
        INSERT INTO audit_log (table_name, action, old_data)
        VALUES (TG_TABLE_NAME, 'DELETE', to_jsonb(OLD));
        RETURN OLD;
    END IF;
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;
 
CREATE TRIGGER audit_users_changes
    AFTER INSERT OR UPDATE OR DELETE ON users
    FOR EACH ROW
    EXECUTE FUNCTION audit_changes();
 
-- 游标使用
CREATE OR REPLACE FUNCTION process_large_batch()
RETURNS VOID AS $$
DECLARE
    rec RECORD;
    cur CURSOR FOR SELECT * FROM orders WHERE processed = false;
BEGIN
    OPEN cur;
    LOOP
        FETCH cur INTO rec;
        EXIT WHEN NOT FOUND;
        
        -- 处理每条记录
        -- ...
        
        UPDATE orders SET processed = true WHERE id = rec.id;
    END LOOP;
    CLOSE cur;
END;
$$ LANGUAGE plpgsql;
 
-- 动态 SQL
CREATE OR REPLACE FUNCTION dynamic_count(table_name VARCHAR)
RETURNS BIGINT AS $$
DECLARE
    sql_stmt TEXT;
    result BIGINT;
BEGIN
    sql_stmt := 'SELECT COUNT(*) FROM ' || quote_ident(table_name);
    EXECUTE sql_stmt INTO result;
    RETURN result;
END;
$$ LANGUAGE plpgsql;
 
-- 返回多个结果集
CREATE OR REPLACE FUNCTION get_stats()
RETURNS TABLE(
    users_count BIGINT,
    orders_count BIGINT,
    total_revenue DECIMAL
) AS $$
BEGIN
    RETURN QUERY
    SELECT 
        (SELECT COUNT(*) FROM users)::BIGINT,
        (SELECT COUNT(*) FROM orders)::BIGINT,
        (SELECT COALESCE(SUM(total), 0) FROM orders)::DECIMAL;
END;
$$ LANGUAGE plpgsql;

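上一节 calculate_discount 的折扣阶梯逻辑,可以用等价的 Python 实现来对照验证边界值(各区间均为左开,即恰好等于 100/500/1000 时落入低一档):

```python
def calculate_discount(total: float) -> float:
    """与 PL/pgSQL 版 calculate_discount 相同的阶梯规则"""
    if total > 1000:
        rate = 0.15
    elif total > 500:
        rate = 0.10
    elif total > 100:
        rate = 0.05
    else:
        rate = 0.0
    return total * rate

print(calculate_discount(2000))  # 300.0
print(calculate_discount(100))   # 0.0,边界值不享受折扣
```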
高级 PL/pgSQL 技巧

-- 使用数组
CREATE OR REPLACE FUNCTION get_user_permissions(user_id BIGINT)
RETURNS TEXT[] AS $$
DECLARE
    permissions TEXT[];
BEGIN
    -- 表列加别名限定,避免与同名参数 user_id 歧义
    SELECT ARRAY_AGG(up.permission_name) INTO permissions
    FROM user_permissions up
    WHERE up.user_id = get_user_permissions.user_id;
    
    RETURN COALESCE(permissions, ARRAY[]::TEXT[]);
END;
$$ LANGUAGE plpgsql;
 
-- 使用 JSON/JSONB
CREATE OR REPLACE FUNCTION user_to_json(user_id BIGINT)
RETURNS JSONB AS $$
DECLARE
    result JSONB;
BEGIN
    SELECT jsonb_build_object(
        'id', id,
        'name', name,
        'email', email,
        'created_at', created_at
    ) INTO result
    FROM users
    WHERE id = user_id;
    
    RETURN result;
END;
$$ LANGUAGE plpgsql;
 
-- 递归 CTE 在函数中
-- 注意:PL/pgSQL 中参数/返回列与表列同名会引发歧义,这里重命名参数并为列加限定
CREATE OR REPLACE FUNCTION get_category_tree(root_id BIGINT)
RETURNS TABLE(
    id BIGINT,
    name VARCHAR,
    parent_id BIGINT,
    level INT
) AS $$
BEGIN
    RETURN QUERY
    WITH RECURSIVE category_tree AS (
        SELECT c.id, c.name, c.parent_id, 0 AS lvl
        FROM categories c
        WHERE c.id = root_id
        
        UNION ALL
        
        SELECT c.id, c.name, c.parent_id, ct.lvl + 1
        FROM categories c
        JOIN category_tree ct ON c.parent_id = ct.id
        WHERE ct.lvl < 10
    )
    SELECT ct.id, ct.name, ct.parent_id, ct.lvl FROM category_tree ct;
END;
$$ LANGUAGE plpgsql;
 
-- 批量操作
CREATE OR REPLACE FUNCTION batch_update_status(
    order_ids BIGINT[],
    new_status VARCHAR
)
RETURNS INT AS $$
DECLARE
    updated_count INT;
BEGIN
    UPDATE orders
    SET status = new_status, updated_at = NOW()
    WHERE id = ANY(order_ids);
    
    GET DIAGNOSTICS updated_count = ROW_COUNT;
    RETURN updated_count;
END;
$$ LANGUAGE plpgsql;
 
-- 窗口函数配合
CREATE OR REPLACE FUNCTION get_employee_ranking()
RETURNS TABLE(
    employee_id BIGINT,
    name VARCHAR,
    department VARCHAR,
    salary DECIMAL,
    rank BIGINT,
    department_avg DECIMAL
) AS $$
BEGIN
    RETURN QUERY
    SELECT 
        e.id, e.name, e.department, e.salary,
        RANK() OVER (PARTITION BY e.department ORDER BY e.salary DESC),
        AVG(e.salary) OVER (PARTITION BY e.department)
    FROM employees e;
END;
$$ LANGUAGE plpgsql;
 
-- 物化路径
CREATE OR REPLACE FUNCTION get_descendants(path_prefix VARCHAR)
RETURNS TABLE(id BIGINT, name VARCHAR, path VARCHAR) AS $$
BEGIN
    RETURN QUERY
    SELECT c.id, c.name, c.path  -- 加表别名限定,避免与输出列名冲突
    FROM categories c
    WHERE c.path LIKE path_prefix || '%';
END;
$$ LANGUAGE plpgsql;
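上面递归 CTE 的层级展开逻辑,可以用应用层的广度优先遍历来对照理解。下面是一个纯 Python 草图(数据结构为示意,与数据库无关),同样带有 `level < 10` 的层级上限:

```python
from collections import deque

def get_category_tree(categories, root_id, max_level=10):
    """广度优先遍历邻接表,对应递归 CTE 的展开过程。
    categories: {id: {"name": ..., "parent_id": ...}} 形式的示意数据。"""
    # 预先按 parent_id 建立子节点索引
    children = {}
    for cid, row in categories.items():
        children.setdefault(row["parent_id"], []).append(cid)

    result, queue = [], deque([(root_id, 0)])
    while queue:
        cid, level = queue.popleft()
        if cid not in categories:
            continue
        result.append({"id": cid, "level": level, **categories[cid]})
        if level < max_level:  # 对应 WHERE ct.level < 10
            for child in sorted(children.get(cid, [])):
                queue.append((child, level + 1))
    return result
```

与 SQL 版本一样,层级上限可以防止脏数据中的环导致无限递归。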

分区表进阶

分区策略选择

-- 1. RANGE 分区(按时间)
CREATE TABLE logs (
    id BIGSERIAL,
    created_at TIMESTAMP NOT NULL,
    level VARCHAR(10),
    message TEXT
) PARTITION BY RANGE (created_at);
 
-- 按月分区
CREATE TABLE logs_2024_01 PARTITION OF logs
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
 
CREATE TABLE logs_2024_02 PARTITION OF logs
    FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
 
CREATE TABLE logs_2024_03 PARTITION OF logs
    FOR VALUES FROM ('2024-03-01') TO ('2024-04-01');
 
-- 按年分区
CREATE TABLE logs_2023 PARTITION OF logs
    FOR VALUES FROM ('2023-01-01') TO ('2024-01-01');
 
-- 2. LIST 分区(按类别)
CREATE TABLE products (
    id BIGSERIAL,
    category VARCHAR(50),
    name VARCHAR(200),
    price DECIMAL
) PARTITION BY LIST (category);
 
CREATE TABLE products_electronics PARTITION OF products
    FOR VALUES IN ('electronics', 'computers', 'phones');
 
CREATE TABLE products_clothing PARTITION OF products
    FOR VALUES IN ('clothing', 'shoes', 'accessories');
 
-- 默认分区使用 DEFAULT 关键字,而不是 FOR VALUES IN (DEFAULT)
CREATE TABLE products_other PARTITION OF products DEFAULT;
 
-- 3. HASH 分区(均匀分布)
CREATE TABLE user_sessions (
    id BIGSERIAL,
    user_id BIGINT,
    session_token VARCHAR,
    created_at TIMESTAMP
) PARTITION BY HASH (user_id);
 
-- 创建 8 个哈希分区
CREATE TABLE user_sessions_0 PARTITION OF user_sessions
    FOR VALUES WITH (MODULUS 8, REMAINDER 0);
CREATE TABLE user_sessions_1 PARTITION OF user_sessions
    FOR VALUES WITH (MODULUS 8, REMAINDER 1);
CREATE TABLE user_sessions_2 PARTITION OF user_sessions
    FOR VALUES WITH (MODULUS 8, REMAINDER 2);
CREATE TABLE user_sessions_3 PARTITION OF user_sessions
    FOR VALUES WITH (MODULUS 8, REMAINDER 3);
CREATE TABLE user_sessions_4 PARTITION OF user_sessions
    FOR VALUES WITH (MODULUS 8, REMAINDER 4);
CREATE TABLE user_sessions_5 PARTITION OF user_sessions
    FOR VALUES WITH (MODULUS 8, REMAINDER 5);
CREATE TABLE user_sessions_6 PARTITION OF user_sessions
    FOR VALUES WITH (MODULUS 8, REMAINDER 6);
CREATE TABLE user_sessions_7 PARTITION OF user_sessions
    FOR VALUES WITH (MODULUS 8, REMAINDER 7);
 
-- 4. 多列分区
CREATE TABLE sales (
    region VARCHAR(50),
    sale_date DATE,
    amount DECIMAL
) PARTITION BY RANGE (region, sale_date);
 
CREATE TABLE sales_north_2024 PARTITION OF sales
    FOR VALUES FROM ('North', '2024-01-01') TO ('North', '2025-01-01');
 
CREATE TABLE sales_south_2024 PARTITION OF sales
    FOR VALUES FROM ('South', '2024-01-01') TO ('South', '2025-01-01');
 
-- 5. 子分区
CREATE TABLE big_table (
    id BIGSERIAL,
    created_at DATE,
    status VARCHAR(20),
    data JSONB
) PARTITION BY RANGE (created_at);
 
-- PostgreSQL 没有 SUBPARTITION 关键字,子分区就是把分区本身再声明为分区表
CREATE TABLE big_table_2024 PARTITION OF big_table
    FOR VALUES FROM ('2024-01-01') TO ('2025-01-01')
    PARTITION BY LIST (status);
 
CREATE TABLE big_table_2024_active PARTITION OF big_table_2024
    FOR VALUES IN ('active');
CREATE TABLE big_table_2024_inactive PARTITION OF big_table_2024
    FOR VALUES IN ('inactive');
CREATE TABLE big_table_2024_pending PARTITION OF big_table_2024
    FOR VALUES IN ('pending');
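上面 8 个 HASH 分区的 DDL 完全是同一模板,实际运维中通常用脚本批量生成。下面是一个纯 Python 草图(`parent_余数` 的命名规则为示意):

```python
def hash_partition_ddl(parent, modulus):
    """批量生成 HASH 分区的 CREATE TABLE 语句列表。"""
    return [
        f"CREATE TABLE {parent}_{r} PARTITION OF {parent}\n"
        f"    FOR VALUES WITH (MODULUS {modulus}, REMAINDER {r});"
        for r in range(modulus)
    ]

# 生成与上文等价的 8 个分区语句
for stmt in hash_partition_ddl("user_sessions", 8):
    print(stmt)
```

生成后的语句可以交给迁移工具或 psql 批量执行。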

分区维护

-- 查看分区信息
SELECT 
    parent.relname AS parent_table,
    child.relname AS partition_name,
    pg_get_expr(child.relpartbound, child.oid) AS partition_bound
FROM pg_inherits
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
ORDER BY parent.relname, child.relname;
 
-- 查看分区行数
SELECT 
    child.relname AS partition,
    pg_stat_get_live_tuples(child.oid) AS row_count
FROM pg_class parent
JOIN pg_inherits ON parent.oid = pg_inherits.inhparent
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
WHERE parent.relname = 'logs';
 
-- 添加分区(PostgreSQL 没有 ADD PARTITION 语法,直接创建新分区表)
CREATE TABLE logs_2024_04 PARTITION OF logs
    FOR VALUES FROM ('2024-04-01') TO ('2024-05-01');
 
-- 删除分区(同时删除数据,直接 DROP 分区表即可)
DROP TABLE logs_2023;
 
-- 分离分区
ALTER TABLE logs DETACH PARTITION logs_2023;
 
-- 分离后的分区已是独立表,可移入归档 schema
ALTER TABLE logs_2023 SET SCHEMA archived;
 
-- 重新附加分区
ALTER TABLE logs ATTACH PARTITION logs_2023
    FOR VALUES FROM ('2023-01-01') TO ('2024-01-01');
 
-- 分割分区(较新版本才支持 SPLIT PARTITION,具体语法以所用版本的官方文档为准)
ALTER TABLE logs SPLIT PARTITION logs_2024_01 INTO (
    PARTITION logs_2024_01a FOR VALUES FROM ('2024-01-01') TO ('2024-01-16'),
    PARTITION logs_2024_01b FOR VALUES FROM ('2024-01-16') TO ('2024-02-01')
);
 
-- 合并分区(同上,较新版本才支持)
ALTER TABLE logs MERGE PARTITIONS (logs_2024_01a, logs_2024_01b)
    INTO logs_2024_01_merged;
 
-- 重建分区(改善性能)
ALTER TABLE logs DETACH PARTITION logs_2024_01;
VACUUM FULL logs_2024_01;
ALTER TABLE logs ATTACH PARTITION logs_2024_01
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
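按月滚动的分区通常由定时任务自动创建。下面是生成单月 RANGE 分区 DDL 的 Python 草图(表名与 `parent_YYYY_MM` 命名规则为示意,跨年边界自动处理):

```python
from datetime import date

def month_partition_ddl(parent, year, month):
    """生成某个月 RANGE 分区的 CREATE TABLE 语句。"""
    start = date(year, month, 1)
    # 下月 1 日作为上界;12 月时滚动到下一年
    end = date(year + (month == 12), month % 12 + 1, 1)
    name = f"{parent}_{start:%Y_%m}"
    return (
        f"CREATE TABLE {name} PARTITION OF {parent}\n"
        f"    FOR VALUES FROM ('{start}') TO ('{end}');"
    )
```

定时任务里只需在每月底调用一次,为下个月预建分区。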

PgBouncer 连接池

安装与配置

# Ubuntu/Debian
sudo apt install pgbouncer
 
# macOS
brew install pgbouncer
# /etc/pgbouncer/pgbouncer.ini
 
[databases]
; 指向真实 PostgreSQL
mydb = host=127.0.0.1 port=5432 dbname=mydb
 
; 多个数据库
mydb2 = host=127.0.0.1 port=5432 dbname=mydb2
 
; 远程数据库
remotedb = host=remote.host.com port=5432 dbname=remotedb
 
[pgbouncer]
; 监听地址
listen_addr = 127.0.0.1
listen_port = 6432
 
; 连接池模式
; session: 客户端会话模式(默认)
; transaction: 事务模式(推荐)
; statement: 语句模式
pool_mode = transaction
 
; 最大客户端连接数(连接到 PgBouncer 的连接上限)
max_client_conn = 1000
 
; 每个 用户/数据库 对的连接池大小(默认 20)
; default_pool_size = 25
 
; 每个数据库到后端 PostgreSQL 的最大连接数
max_db_connections = 100
 
; 空闲超时(秒)
; 超过此时间未使用的连接会被关闭
idle_timeout = 600
 
; 服务器连接超时
server_connect_timeout = 15
 
; 服务器空闲超时
server_idle_timeout = 600
 
; 服务器生命周期
server_lifetime = 3600
 
; 保留备用连接
reserve_pool_size = 5
reserve_pool_timeout = 3
 
; 客户端超时
query_wait_timeout = 30
 
; 日志
log_connections = 0
log_disconnections = 0
log_pooler_errors = 1
 
; 管理命令
admin_users = postgres, pgbouncer_admin
 
; 认证类型
; trust, plain, md5, scram-sha-256, hba 等
auth_type = md5
auth_file = /etc/pgbouncer/userlist.txt
 
; 忽略启动参数
ignore_startup_parameters = extra_float_digits

用户列表文件

# /etc/pgbouncer/userlist.txt
"postgres" "md5abcdef1234567890abcdef12345678"
"app_user" "md5b2c3d4e5f60718293a4b5c6d7e8f90"
 
# md5 格式为 "md5" + md5(密码 + 用户名)
# echo -n "mypasswordapp_user" | md5sum
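userlist.txt 中的 md5 条目可以用几行 Python 生成(哈希输入是「密码拼接用户名」,这是 PostgreSQL 传统 md5 认证的格式):

```python
import hashlib

def userlist_entry(username, password):
    """生成 userlist.txt 中的一行:"用户名" "md5<md5(密码+用户名)>"。"""
    digest = hashlib.md5((password + username).encode()).hexdigest()
    return f'"{username}" "md5{digest}"'

print(userlist_entry("app_user", "mypassword"))
```

生产环境更推荐 scram-sha-256 认证,此时 userlist 中应存放 SCRAM 凭据而非 md5。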

PgBouncer 管理命令

# 连接 PgBouncer 管理控制台
psql -h 127.0.0.1 -p 6432 -U postgres pgbouncer
 
# 以下命令在管理控制台内执行
SHOW STATS;        -- 查看统计
SHOW POOLS;        -- 查看连接池
SHOW CLIENTS;      -- 查看客户端连接
SHOW SERVERS;      -- 查看后端服务器连接
SHOW CONFIG;       -- 查看配置
SHOW VERSION;      -- 查看版本
 
RELOAD;            -- 重新加载配置
PAUSE;             -- 暂停(等当前查询完成后断开后端连接)
RESUME;            -- 恢复
 
DISABLE mydb;      -- 禁用数据库(拒绝新连接)
ENABLE mydb;       -- 启用数据库
 
KILL mydb;         -- 立即断开指向该数据库的所有连接
 
SHUTDOWN;          -- 关闭 PgBouncer 进程

Docker Compose 配置

version: "3.8"
services:
  postgres:
    image: postgres:17-alpine
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
      POSTGRES_DB: mydb
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 10s
      timeout: 5s
      retries: 5
 
  pgbouncer:
    image: edoburu/pgbouncer:latest
    environment:
      DATABASE_URL: "postgres://postgres:password@postgres:5432/mydb"
      POOL_MODE: transaction
      MAX_CLIENT_CONN: 1000
      DEFAULT_POOL_SIZE: 25
    ports:
      - "5432:5432"
    depends_on:
      postgres:
        condition: service_healthy
 
volumes:
  postgres_data:

应用连接配置

// Node.js 连接 PgBouncer
import { Pool } from 'pg';
 
const pool = new Pool({
  host: 'localhost',       // PgBouncer 地址
  port: 6432,            // PgBouncer 端口
  database: 'mydb',
  user: 'app_user',
  password: 'password',
  max: 20,               // 应用层连接池大小
  idleTimeoutMillis: 30000,
  connectionTimeoutMillis: 2000,
});
 
// 使用
const client = await pool.connect();
try {
  await client.query('SELECT * FROM users');
} finally {
  client.release();
}
# Python 连接 PgBouncer
import psycopg2
 
conn = psycopg2.connect(
    host='localhost',
    port=6432,
    database='mydb',
    user='app_user',
    password='password'
)
 
cursor = conn.cursor()
cursor.execute('SELECT * FROM users')

监控与诊断

关键监控查询

-- 1. 连接状态
SELECT 
    state,
    COUNT(*) AS count,
    ARRAY_AGG(usename ORDER BY usename) AS users
FROM pg_stat_activity
GROUP BY state;
 
-- 2. 慢查询(超过 1 秒)
SELECT 
    pid,
    now() - query_start AS duration,
    state,
    usename,
    query
FROM pg_stat_activity
WHERE state != 'idle'
    AND query_start < NOW() - INTERVAL '1 second'
ORDER BY duration DESC;
 
-- 3. 锁等待
SELECT 
    blocked_locks.pid AS blocked_pid,
    blocking_locks.pid AS blocking_pid,
    blocked_activity.usename AS blocked_user,
    blocking_activity.usename AS blocking_user,
    blocked_activity.query AS blocked_query,
    blocking_activity.query AS blocking_query,
    blocked_activity.application_name AS blocked_app,
    blocking_activity.application_name AS blocking_app
FROM pg_catalog.pg_locks blocked_locks
JOIN pg_catalog.pg_stat_activity blocked_activity 
    ON blocked_activity.pid = blocked_locks.pid
JOIN pg_catalog.pg_locks blocking_locks 
    ON blocking_locks.locktype = blocked_locks.locktype
    AND blocking_locks.database IS NOT DISTINCT FROM blocked_locks.database
    AND blocking_locks.relation IS NOT DISTINCT FROM blocked_locks.relation
    AND blocking_locks.page IS NOT DISTINCT FROM blocked_locks.page
    AND blocking_locks.tuple IS NOT DISTINCT FROM blocked_locks.tuple
    AND blocking_locks.virtualxid IS NOT DISTINCT FROM blocked_locks.virtualxid
    AND blocking_locks.transactionid IS NOT DISTINCT FROM blocked_locks.transactionid
    AND blocking_locks.classid IS NOT DISTINCT FROM blocked_locks.classid
    AND blocking_locks.objid IS NOT DISTINCT FROM blocked_locks.objid
    AND blocking_locks.objsubid IS NOT DISTINCT FROM blocked_locks.objsubid
    AND blocking_locks.pid != blocked_locks.pid
JOIN pg_catalog.pg_stat_activity blocking_activity 
    ON blocking_activity.pid = blocking_locks.pid
WHERE NOT blocked_locks.granted;
 
-- 4. 表膨胀
SELECT 
    schemaname,
    tablename,
    pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS total_size,
    pg_size_pretty(pg_relation_size(schemaname||'.'||tablename)) AS table_size,
    pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename) 
        - pg_relation_size(schemaname||'.'||tablename)) AS indexes_size,
    n_dead_tup,
    n_live_tup,
    n_dead_tup + n_live_tup AS total_tuples,
    ROUND(n_dead_tup * 100.0 / NULLIF(n_dead_tup + n_live_tup, 0), 2) AS dead_tuple_pct
FROM pg_stat_user_tables
ORDER BY n_dead_tup DESC
LIMIT 20;
 
-- 5. 索引使用情况
SELECT 
    schemaname,
    relname AS table_name,
    indexrelname AS index_name,
    idx_scan,
    idx_tup_read,
    idx_tup_fetch,
    pg_size_pretty(pg_relation_size(indexrelid)) AS index_size
FROM pg_stat_user_indexes
WHERE idx_scan = 0
    AND indexrelname NOT LIKE '%_pkey'
ORDER BY pg_relation_size(indexrelid) DESC;
 
-- 6. 缓冲池命中率
SELECT 
    datname,
    ROUND(blks_hit * 100.0 / NULLIF(blks_hit + blks_read, 0), 2) AS cache_hit_pct
FROM pg_stat_database
WHERE blks_hit + blks_read > 0;
 
-- 6b. 检查点统计(PostgreSQL 17 起从 pg_stat_bgwriter 移至 pg_stat_checkpointer)
SELECT 
    num_timed,
    num_requested,
    write_time,
    sync_time,
    buffers_written
FROM pg_stat_checkpointer;
 
-- 7. 复制状态(主库)
SELECT 
    client_addr,
    usename,
    application_name,
    state,
    sent_lsn,
    write_lsn,
    flush_lsn,
    replay_lsn,
    write_lag,
    flush_lag,
    replay_lag,
    sync_state
FROM pg_stat_replication;
 
-- 8. 复制状态(从库)
SELECT 
    pg_last_wal_receive_lsn() AS received_lsn,
    pg_last_wal_replay_lsn() AS replay_lsn,
    pg_last_wal_receive_lsn() - pg_last_wal_replay_lsn() AS replication_lag,
    pg_last_xact_replay_timestamp() AS last_replay_time;
 
-- 9. VACUUM 进度
SELECT 
    relname,
    phase,
    heap_blks_total,
    heap_blks_scanned,
    heap_blks_vacuumed,
    index_vacuum_count,
    max_dead_tuples,
    num_dead_tuples
FROM pg_stat_progress_vacuum;
 
-- 10. ANALYZE 进度
SELECT 
    relname,
    phase,
    sample_blks_total,
    sample_blks_scanned,
    sample_blks_total - sample_blks_scanned AS remaining_blks
FROM pg_stat_progress_analyze;

pg_stat_statements 扩展

-- 安装扩展
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
 
-- 查看最慢查询
SELECT 
    query,
    calls,
    total_exec_time / 1000 AS total_seconds,
    mean_exec_time AS mean_ms,
    max_exec_time AS max_ms,
    rows,
    shared_blks_hit,
    shared_blks_read,
    shared_blks_dirtied,
    shared_blks_written
FROM pg_stat_statements
ORDER BY total_exec_time DESC
LIMIT 20;
 
-- 查看最频繁查询
SELECT 
    query,
    calls,
    total_exec_time / 1000 AS total_seconds,
    rows,
    rows / NULLIF(calls, 0) AS avg_rows
FROM pg_stat_statements
ORDER BY calls DESC
LIMIT 20;
 
-- 查看消耗最多 I/O 的查询
SELECT 
    query,
    shared_blks_read,
    shared_blks_hit,
    shared_blks_dirtied,
    shared_blks_written,
    local_blks_read,
    local_blks_written,
    temp_blks_read,
    temp_blks_written
FROM pg_stat_statements
ORDER BY shared_blks_read DESC
LIMIT 20;
 
-- 重置统计
SELECT pg_stat_statements_reset();

高可用与复制

流复制配置

-- 1. 主库配置 (postgresql.conf)
# 复制所需的 WAL 级别
wal_level = replica
max_wal_senders = 10
max_replication_slots = 10
wal_keep_size = 1GB
 
# 归档配置
archive_mode = on
archive_command = 'test ! -f /archive/%f && cp %p /archive/%f'
 
# 从库参数(在主库上预先配置,便于故障转移后直接使用)
hot_standby = on
hot_standby_feedback = on
 
-- 2. 主库创建复制用户
CREATE USER replicator WITH REPLICATION ENCRYPTED PASSWORD 'replication_password';
 
-- 3. 主库配置 pg_hba.conf(允许复制)
# IPv4 本地复制
local   replication     all                                     trust
# IPv4 远程复制
host    replication     all             127.0.0.1/32            scram-sha-256
host    replication     replicator      192.168.1.0/24            scram-sha-256
 
-- 4. 从库配置 (postgresql.conf)
hot_standby = on
primary_conninfo = 'host=192.168.1.100 port=5432 user=replicator application_name=standby1'
 
-- 5. 初始化从库:用基础备份克隆主库
-- 在从库上执行(-R 自动生成 standby.signal 和 primary_conninfo)
pg_basebackup -h master_host -U replicator -D /var/lib/postgresql/data -P -Xs -R
 
-- 或手动创建 standby.signal 使其成为从库
touch /var/lib/postgresql/data/standby.signal
 
-- 6. 使用复制槽的流复制
# 主库创建复制槽
SELECT * FROM pg_create_physical_replication_slot('standby1_slot');
 
# 从库配置
primary_slot_name = 'standby1_slot'

故障转移与切换

-- 1. 检查复制状态
SELECT * FROM pg_stat_replication;
 
-- 2. 强制切换到新的 WAL 段(触发归档/发送)
SELECT pg_switch_wal();
 
-- 3. 从库上等待回放到指定 LSN(PostgreSQL 17+)
CALL pg_wal_replay_wait('0/3000000');
 
-- 4. 故障转移:在从库上提升为主库
SELECT pg_promote();
 
-- 或使用命令行
-- pg_ctl promote -D /var/lib/postgresql/data
 
-- 5. 自动故障转移可使用 Patroni 或 pgpool-II

逻辑复制

-- 1. 主库配置
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
 
-- 2. 创建发布
CREATE PUBLICATION myapp_publication FOR TABLE users, orders, products;
 
-- 发布特定行
CREATE PUBLICATION active_users FOR TABLE users 
    WHERE (status = 'active');
 
-- 发布所有表
CREATE PUBLICATION full_publication FOR ALL TABLES;
 
-- 3. 从库配置
-- 创建订阅
CREATE SUBSCRIPTION myapp_subscription 
    CONNECTION 'host=master_host port=5432 dbname=mydb user=replicator'
    PUBLICATION myapp_publication
    WITH (copy_data = true);
 
-- 4. 管理订阅
ALTER SUBSCRIPTION myapp_subscription ENABLE;
ALTER SUBSCRIPTION myapp_subscription DISABLE;
ALTER SUBSCRIPTION myapp_subscription REFRESH PUBLICATION;
 
-- 5. 查看订阅状态
SELECT * FROM pg_stat_subscription;
 
-- 6. 删除订阅
DROP SUBSCRIPTION myapp_subscription;

SUCCESS

本文档全面介绍了 PostgreSQL 的核心特性、17 新功能、JSON 支持、全文搜索、向量搜索以及在 AI 应用中的实战场景。PostgreSQL 凭借其功能完备性、扩展性和稳定性,是现代 AI 应用数据层的首选方案。


完整安装与环境配置

安装方法

# macOS
brew install postgresql@17
brew services start postgresql@17
 
# Ubuntu/Debian
sudo apt update
sudo apt install postgresql-17
 
# Docker
docker run -d \
  --name postgres \
  -e POSTGRES_PASSWORD=password \
  -e POSTGRES_DB=mydb \
  -p 5432:5432 \
  postgres:17-alpine
 
# Docker Compose
version: "3.8"
services:
  postgres:
    image: postgres:17-alpine
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
      POSTGRES_DB: mydb
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 10s
      timeout: 5s
      retries: 5
 
volumes:
  postgres_data:

客户端连接

# psql 命令行
psql -h localhost -U postgres -d mydb
 
# 常用 psql 命令
\l                          -- 列出所有数据库
\dt                         -- 列出所有表
\d table_name               -- 查看表结构
\du                         -- 列出所有用户
\di                         -- 列出所有索引
\dv                         -- 列出所有视图
\dn                         -- 列出所有 schema
\df                         -- 列出所有函数
\x                          -- 切换扩展显示模式
\i filename.sql            -- 执行 SQL 文件
\copy table TO 'file.csv' CSV HEADER  -- 导出数据
 
# 数据库操作
CREATE DATABASE mydb;
DROP DATABASE mydb;
ALTER DATABASE mydb RENAME TO newdb;

psqlrc 配置

# ~/.psqlrc
-- 显示执行时间
\timing
 
-- 扩展显示
\x auto
 
-- 别名
\pset null '(null)'
\pset format wrapped
\pset border 2
 
-- 自动提交
\set AUTOCOMMIT on
 
-- 编辑器(psql 读取环境变量,用 \setenv 设置)
\setenv EDITOR vim
 
-- 提示符(用 \set 设置 PROMPT1/PROMPT2)
\set PROMPT1 '%[%033[1;32m%]%M/%[%033[0m%]%/%R%#%x '
\set PROMPT2 '%[%033[1;32m%]%M/%/%R%#%x '

数据库设计模式

常见设计模式

-- 模式 1:单一 Schema(默认 public)
-- 适用于小型应用
CREATE SCHEMA IF NOT EXISTS public;  -- public 默认已存在
 
-- 模式 2:多 Schema 按功能划分
-- 适用于中型应用
CREATE SCHEMA sales;
CREATE SCHEMA inventory;
CREATE SCHEMA analytics;
 
-- 设置搜索路径
ALTER DATABASE mydb SET search_path TO sales, inventory, public;
 
-- 模式 3:多租户(Schema 隔离)
CREATE SCHEMA tenant_1;
CREATE SCHEMA tenant_2;
 
-- 模式 4:行级安全(RLS)
ALTER TABLE users ENABLE ROW LEVEL SECURITY;
 
CREATE POLICY tenant_isolation ON users
    USING (tenant_id = current_setting('app.current_tenant')::uuid);

常见表结构设计

-- 1. 主从表设计(LTREE 类型需要先 CREATE EXTENSION ltree)
CREATE TABLE categories (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name VARCHAR(100) NOT NULL,
    parent_id UUID REFERENCES categories(id),
    path LTREE,  -- 层级路径
    level INT DEFAULT 0,
    created_at TIMESTAMP DEFAULT NOW()
);
 
-- 2. 多对多关系
CREATE TABLE post_tags (
    post_id UUID REFERENCES posts(id) ON DELETE CASCADE,
    tag_id UUID REFERENCES tags(id) ON DELETE CASCADE,
    PRIMARY KEY (post_id, tag_id)
);
 
-- 3. 软删除设计
CREATE TABLE users (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    email VARCHAR(255) NOT NULL UNIQUE,
    created_at TIMESTAMP NOT NULL DEFAULT NOW(),
    deleted_at TIMESTAMP,  -- 软删除标记
    CONSTRAINT users_deleted_at_check CHECK (deleted_at IS NULL OR deleted_at > created_at)
);
 
-- 查询时自动过滤已删除记录
CREATE VIEW active_users AS
SELECT * FROM users WHERE deleted_at IS NULL;
 
-- 4. 审计表设计
CREATE TABLE audit_log (
    id BIGSERIAL PRIMARY KEY,
    table_name VARCHAR(100) NOT NULL,
    record_id UUID NOT NULL,
    action VARCHAR(20) NOT NULL,  -- INSERT, UPDATE, DELETE
    old_data JSONB,
    new_data JSONB,
    changed_by UUID,
    changed_at TIMESTAMP DEFAULT NOW()
);
 
-- 触发器实现审计
CREATE OR REPLACE FUNCTION audit_trigger()
RETURNS TRIGGER AS $$
BEGIN
    IF TG_OP = 'INSERT' THEN
        INSERT INTO audit_log (table_name, record_id, action, new_data)
        VALUES (TG_TABLE_NAME, NEW.id, 'INSERT', to_jsonb(NEW));
        RETURN NEW;
    ELSIF TG_OP = 'UPDATE' THEN
        INSERT INTO audit_log (table_name, record_id, action, old_data, new_data)
        VALUES (TG_TABLE_NAME, OLD.id, 'UPDATE', to_jsonb(OLD), to_jsonb(NEW));
        RETURN NEW;
    ELSIF TG_OP = 'DELETE' THEN
        INSERT INTO audit_log (table_name, record_id, action, old_data)
        VALUES (TG_TABLE_NAME, OLD.id, 'DELETE', to_jsonb(OLD));
        RETURN OLD;
    END IF;
END;
$$ LANGUAGE plpgsql;
 
CREATE TRIGGER users_audit
AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH ROW EXECUTE FUNCTION audit_trigger();
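审计触发器把整行的 old_data/new_data 存成 JSONB;展示审计记录时,应用层往往只关心真正变化的字段。下面是一个计算字段级差异的 Python 草图(输入为两份 JSONB 解析后的字典):

```python
def changed_fields(old_data, new_data):
    """比较审计日志中的 old_data / new_data,返回发生变化的字段及前后值。"""
    keys = set(old_data) | set(new_data)
    return {
        k: {"old": old_data.get(k), "new": new_data.get(k)}
        for k in sorted(keys)
        if old_data.get(k) != new_data.get(k)
    }
```

同样的逻辑也可以用 jsonb 运算在 SQL 侧实现,取舍在于差异计算发生在哪一层。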

查询优化实战

慢查询分析

-- 查看当前慢查询
SELECT 
    pid,
    now() - query_start AS duration,
    state,
    query
FROM pg_stat_activity
WHERE state != 'idle'
    AND query_start < NOW() - INTERVAL '5 minutes'
ORDER BY duration DESC;
 
-- 终止慢查询(pid 取自上面的查询结果)
SELECT pg_cancel_backend(12345);     -- 优雅取消当前查询
SELECT pg_terminate_backend(12345);  -- 强制终止整个连接
 
-- 查看统计信息
SELECT 
    schemaname,
    relname,
    seq_scan,
    seq_tup_read,
    idx_scan,
    idx_tup_fetch,
    n_tup_ins,
    n_tup_upd,
    n_tup_del,
    n_live_tup,
    n_dead_tup
FROM pg_stat_user_tables
ORDER BY seq_scan DESC;
 
-- 查看索引使用情况
SELECT 
    schemaname,
    relname AS table_name,
    indexrelname AS index_name,
    idx_scan,
    idx_tup_read,
    idx_tup_fetch,
    pg_size_pretty(pg_relation_size(indexrelid)) AS index_size
FROM pg_stat_user_indexes
ORDER BY idx_scan ASC;

索引优化

-- 1. B-tree 索引(默认)
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_posts_author_published ON posts(author_id, published_at DESC);
 
-- 2. 部分索引
CREATE INDEX idx_posts_published ON posts(published_at DESC)
WHERE published = true;
 
CREATE INDEX idx_users_active ON users(email)
WHERE is_active = true;
 
-- 3. 表达式索引
CREATE INDEX idx_users_email_lower ON users(LOWER(email));
CREATE INDEX idx_users_created_month ON users(DATE_TRUNC('month', created_at));
 
-- 4. 模糊查询索引(使用 pg_trgm)
CREATE EXTENSION IF NOT EXISTS pg_trgm;
CREATE INDEX idx_users_username_trgm ON users USING gin (username gin_trgm_ops);
 
-- 模糊查询示例
SELECT * FROM users WHERE username LIKE '%john%';
 
-- 5. JSONB 索引
CREATE INDEX idx_orders_metadata ON orders USING gin (metadata jsonb_path_ops);
CREATE INDEX idx_users_data_email ON users((data->>'email'));
 
-- 6. 向量索引
CREATE INDEX idx_embeddings_vector ON embeddings 
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);
 
-- 7. 复合索引顺序
-- 假设查询: WHERE status = 'active' AND created_at > '2024-01-01'
-- 经验法则:等值条件列放在前,范围条件列放在后
CREATE INDEX idx_users_status_created ON users(status, created_at DESC);
 
-- 8. 索引维护
REINDEX INDEX CONCURRENTLY idx_users_email;
VACUUM ANALYZE users;
 
-- 查看未使用的索引
SELECT 
    schemaname || '.' || relname AS table,
    indexrelname AS index_name,
    pg_size_pretty(pg_relation_size(indexrelid)) AS index_size,
    idx_scan
FROM pg_stat_user_indexes
WHERE idx_scan = 0
    AND indexrelname NOT LIKE '%_pkey'
ORDER BY pg_relation_size(indexrelid) DESC;

查询优化技巧

-- 1. 使用 EXPLAIN ANALYZE
EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT * FROM users WHERE email = 'test@example.com';
 
-- 2. 避免 SELECT *
SELECT id, email FROM users WHERE id = 1;
 
-- 3. 使用 LIMIT 避免大结果集
SELECT * FROM posts ORDER BY created_at LIMIT 100;
 
-- 4. 批量插入
INSERT INTO users (email, username) 
VALUES 
    ('user1@example.com', 'user1'),
    ('user2@example.com', 'user2'),
    ('user3@example.com', 'user3');
 
-- 5. 使用 CTE 优化复杂查询
WITH recent_posts AS (
    SELECT id, title, author_id 
    FROM posts 
    WHERE published_at > NOW() - INTERVAL '30 days'
)
SELECT 
    u.username,
    COUNT(rp.id) AS post_count
FROM users u
JOIN recent_posts rp ON rp.author_id = u.id
GROUP BY u.username
ORDER BY post_count DESC;
 
-- 6. 用联表聚合替代相关子查询
-- ❌ 低效:每个用户都执行一次子查询
SELECT 
    u.username,
    (SELECT COUNT(*) FROM posts WHERE author_id = u.id) AS post_count
FROM users u;
 
-- ✅ 高效:先聚合再联表
SELECT 
    u.username,
    COALESCE(p.post_count, 0) AS post_count
FROM users u
LEFT JOIN (
    SELECT author_id, COUNT(*) AS post_count
    FROM posts
    GROUP BY author_id
) p ON p.author_id = u.id;
 
-- 7. 分页优化
-- ❌ 偏移分页(慢)
SELECT * FROM posts ORDER BY id LIMIT 100 OFFSET 10000;
 
-- ✅ 游标分页(快)
SELECT * FROM posts 
WHERE id > 10000 
ORDER BY id 
LIMIT 100;
 
-- ✅ Keyset 分页
SELECT * FROM posts 
WHERE published_at < '2024-01-01' 
ORDER BY published_at DESC 
LIMIT 100;
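游标(keyset)分页的核心是「记住上一页最后一条记录的键,下一页从它之后取」。下面用内存数据演示这个语义(纯 Python 草图,对应 `WHERE id > last_id ORDER BY id LIMIT n`):

```python
def keyset_page(rows, last_id=None, limit=20):
    """游标分页:返回 (当前页, 下一页游标)。rows 为字典列表,按 id 排序后取页。"""
    ordered = sorted(rows, key=lambda r: r["id"])
    if last_id is not None:
        ordered = [r for r in ordered if r["id"] > last_id]  # WHERE id > last_id
    page = ordered[:limit]                                   # LIMIT n
    next_cursor = page[-1]["id"] if page else None
    return page, next_cursor
```

与 OFFSET 不同,无论翻到第几页,数据库端都只需沿索引定位一次,成本恒定。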

事务与并发控制

事务隔离级别

-- 设置隔离级别
SET TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
 
-- 开始事务
BEGIN;
 
-- 保存点
SAVEPOINT sp1;
 
-- 回滚到保存点
ROLLBACK TO SAVEPOINT sp1;
 
-- 提交
COMMIT;
 
-- 回滚
ROLLBACK;

行级锁

-- 1. 悲观锁
SELECT * FROM users WHERE id = 1 FOR UPDATE;
 
-- 2. 乐观锁(使用版本号)
UPDATE users 
SET email = 'new@example.com', version = version + 1
WHERE id = 1 AND version = 5;
 
-- 检查更新是否成功
-- 如果 version != 5,说明被其他事务修改,需要重试
 
-- 3. SKIP LOCKED(并发处理)
SELECT * FROM orders 
WHERE status = 'pending' 
ORDER BY created_at 
LIMIT 10 
FOR UPDATE SKIP LOCKED;
 
-- 4. NOWAIT(立即报错)
SELECT * FROM users WHERE id = 1 FOR UPDATE NOWAIT;
 
-- 阻塞/死锁检测(pg_blocking_pids 返回数组,用 cardinality 判断非空)
SELECT 
    pg_blocking_pids(p.pid) AS blocked_by,
    p.pid,
    p.query
FROM pg_stat_activity p
WHERE cardinality(pg_blocking_pids(p.pid)) > 0
ORDER BY p.pid;
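乐观锁「版本号匹配才更新,否则重读重试」的语义,可以脱离数据库用内存数据演示(纯 Python 草图,字段名为示意):

```python
def optimistic_update(row, expected_version, new_email):
    """版本号匹配则更新并递增版本,返回 True;否则返回 False 由调用方重试。
    对应 UPDATE ... WHERE id = ? AND version = ?(影响 0 行即冲突)。"""
    if row["version"] != expected_version:
        return False
    row["email"] = new_email
    row["version"] += 1
    return True
```

冲突时典型做法是重新读取最新版本,再带着新版本号重试一次。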

并发控制模式

-- 模式 1:基于数据库锁
BEGIN;
SELECT * FROM inventory WHERE product_id = 100 FOR UPDATE;
-- 检查库存
UPDATE inventory SET quantity = quantity - 1 WHERE product_id = 100;
COMMIT;
 
-- 模式 2:原子更新
UPDATE inventory 
SET quantity = quantity - 1 
WHERE product_id = 100 AND quantity > 0
RETURNING *;
 
-- 检查是否成功
-- 如果没有返回行,说明库存不足
 
-- 模式 3:条件更新(余额不足时不命中任何行)
UPDATE accounts 
SET balance = balance - 100 
WHERE id = 1 AND balance >= 100
RETURNING *;
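「原子更新」模式的关键是把检查和扣减放进同一条语句:条件不满足时 UPDATE 不命中任何行,RETURNING 为空。下面用内存字典模拟这个语义(纯 Python 草图):

```python
def decrement_stock(inventory, product_id, qty=1):
    """库存足够则扣减并返回更新后的行,否则返回 None(对应 UPDATE 命中 0 行)。
    inventory: {product_id: quantity} 的内存字典(示意)。"""
    current = inventory.get(product_id, 0)
    if current < qty:  # 对应 WHERE quantity >= qty
        return None
    inventory[product_id] = current - qty
    return {"product_id": product_id, "quantity": inventory[product_id]}
```

应用层只需判断返回值是否为空,就能区分「扣减成功」与「库存不足」,无需显式加锁。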

备份与恢复

备份类型

PostgreSQL 支持多种备份策略,适用于不同的场景:

| 备份类型 | 工具 | 特点 | 适用场景 |
| --- | --- | --- | --- |
| 物理备份 | pg_basebackup | 快速、完整复制数据文件 | 灾难恢复 |
| 逻辑备份 | pg_dump | 跨版本、可选择性恢复 | 应用级恢复 |
| 持续归档 | WAL + pg_receivewal | 零数据丢失 (PITR) | 企业级需求 |
| 增量备份 | pgBackRest | 高效、压缩、去重 | 大规模部署 |

逻辑备份(pg_dump)

# 备份单个数据库
pg_dump -U postgres -d mydb -F c -f backup.dump
 
# 备份所有数据库
pg_dumpall -U postgres -f all_databases.sql
 
# 备份特定表
pg_dump -U postgres -d mydb -t users -t posts -f tables.sql
 
# 备份结构(不包含数据)
pg_dump -U postgres -d mydb --schema-only -f schema.sql
 
# 备份数据(不包含结构)
pg_dump -U postgres -d mydb --data-only -f data.sql
 
# 压缩备份(custom 格式默认已压缩;纯 SQL 格式可配合 gzip)
pg_dump -U postgres -d mydb | gzip > backup.sql.gz
 
# 并行备份(大型数据库)
pg_dump -U postgres -d mydb -j 4 -F d -f /backup/directory

恢复操作

# 恢复纯 SQL 格式备份
psql -U postgres -d mydb -f backup.sql
 
# 恢复压缩格式备份
pg_restore -U postgres -d mydb -c backup.dump
 
# 恢复前先创建数据库
createdb -U postgres newdb
pg_restore -U postgres -d newdb backup.dump
 
# 恢复特定表
pg_restore -U postgres -d mydb -t users backup.dump
 
# 时间点恢复(PITR)不是 pg_restore 的功能,
# 需要物理备份 + WAL 归档 + recovery_target_time,见「持续归档与 PITR」一节

物理备份(pg_basebackup)

# 基础备份
pg_basebackup -U postgres -D /backup/base -Ft -z -P -v
 
# 备份到远程服务器
pg_basebackup -U postgres -h primary_host -D /backup/base -Ft -z -P
 
# 使用复制槽(确保 WAL 不丢失)
pg_basebackup -U postgres -D /backup/base -Ft -z -P -X stream -S my_slot
 
# 查看备份状态
ls -la /backup/base/

持续归档与 PITR

# 配置 postgresql.conf 启用 WAL 归档
# wal_level = replica
# archive_mode = on
# archive_command = 'test ! -f /archive/%f && cp %p /archive/%f'
 
# 手动切换 WAL
psql -U postgres -c "SELECT pg_switch_wal();"
 
# 低级备份 API(PostgreSQL 15 起为 pg_backup_start/pg_backup_stop,须在同一会话中调用)
# 在一个 psql 会话内:
#   SELECT pg_backup_start('backup_label');
#   -- 用文件系统工具复制数据目录
#   SELECT * FROM pg_backup_stop();
# 日常备份更推荐直接使用 pg_basebackup:
pg_basebackup -U postgres -D /backup/base -Ft -z -P
 
# 恢复到指定时间点(PostgreSQL 12 起取消 recovery.conf,
# 恢复参数写入 postgresql.conf 并创建 recovery.signal)
cat >> postgresql.conf << 'EOF'
restore_command = 'cp /archive/%f %p'
recovery_target_time = '2024-01-15 10:00:00 UTC'
recovery_target_action = 'promote'
EOF
touch /var/lib/postgresql/data/recovery.signal

备份脚本示例

#!/bin/bash
# backup.sh - PostgreSQL 备份脚本
 
set -e
 
BACKUP_DIR="/var/backups/postgresql"
DATE=$(date +%Y%m%d_%H%M%S)
RETENTION_DAYS=30
 
# 创建备份目录
mkdir -p "$BACKUP_DIR"
 
# 逻辑备份
echo "Starting logical backup..."
pg_dump -U postgres -d mydb -F c -Z 5 -f "$BACKUP_DIR/mydb_$DATE.dump"
 
# 保留最近 N 天的备份
find "$BACKUP_DIR" -name "*.dump" -mtime +$RETENTION_DAYS -delete
 
# 备份完成
echo "Backup completed: mydb_$DATE.dump"
ls -lh "$BACKUP_DIR"
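脚本中 `find ... -mtime +N -delete` 的保留期逻辑,也可以用 Python 实现并先列出待删文件再执行删除,更便于审计(纯 Python 草图):

```python
import os
import time

def expired_backups(backup_dir, retention_days=30, suffix=".dump"):
    """列出超过保留期的备份文件路径,等价于 find ... -mtime +N。"""
    cutoff = time.time() - retention_days * 86400
    expired = []
    for name in os.listdir(backup_dir):
        path = os.path.join(backup_dir, name)
        if name.endswith(suffix) and os.path.getmtime(path) < cutoff:
            expired.append(path)
    return sorted(expired)
```

先打印 `expired_backups(...)` 的结果确认无误,再逐个 `os.remove`,比直接 `-delete` 更稳妥。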

Python 集成

psycopg2 使用指南

import psycopg2
from psycopg2 import pool
from contextlib import contextmanager
 
# 基本连接
conn = psycopg2.connect(
    host="localhost",
    database="mydb",
    user="postgres",
    password="password",
    port=5432
)
 
# 创建游标
cursor = conn.cursor()
 
# 执行查询
cursor.execute("SELECT * FROM users WHERE id = %s", (1,))
result = cursor.fetchone()
 
# 事务处理(psycopg2 默认自动开启事务,无需显式 BEGIN)
try:
    cursor.execute("INSERT INTO users (name, email) VALUES (%s, %s)", ("Alice", "alice@example.com"))
    cursor.execute("INSERT INTO orders (user_id, total) VALUES (%s, %s)", (1, 99.99))
    conn.commit()
except Exception as e:
    conn.rollback()
    print(f"Error: {e}")
 
# 使用 with 语句(块结束时自动提交/回滚事务;注意连接本身并不会关闭)
conn_string = "host=localhost dbname=mydb user=postgres password=password"
with psycopg2.connect(conn_string) as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT * FROM users LIMIT 10")
        users = cur.fetchall()
 
# 连接池
connection_pool = pool.ThreadedConnectionPool(
    minconn=5,
    maxconn=20,
    host="localhost",
    database="mydb",
    user="postgres",
    password="password"
)
 
@contextmanager
def get_connection():
    conn = connection_pool.getconn()
    try:
        yield conn
        conn.commit()
    except:
        conn.rollback()
        raise
    finally:
        connection_pool.putconn(conn)
 
# 使用连接池
with get_connection() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT COUNT(*) FROM users")
        count = cur.fetchone()[0]

SQLAlchemy ORM

from sqlalchemy import create_engine, Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker, relationship
from datetime import datetime
 
# 创建引擎
engine = create_engine('postgresql://postgres:password@localhost:5432/mydb')
Session = sessionmaker(bind=engine)
Base = declarative_base()
 
# 定义模型
class User(Base):
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    email = Column(String(255), unique=True, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    
    def __repr__(self):
        return f"<User(name='{self.name}', email='{self.email}')>"
 
# 创建表
Base.metadata.create_all(engine)
 
# CRUD 操作
session = Session()
 
# 创建
user = User(name='Alice', email='alice@example.com')
session.add(user)
session.commit()
 
# 读取
user = session.query(User).filter_by(email='alice@example.com').first()
users = session.query(User).filter(User.name.like('A%')).all()
 
# 更新
user.name = 'Alice Smith'
session.commit()
 
# 删除
session.delete(user)
session.commit()
 
# 复杂查询
from sqlalchemy import func, and_, or_
 
# 聚合查询
stats = session.query(
    func.count(User.id).label('count'),
    func.avg(func.length(User.name)).label('avg_name_length')
).first()
 
# 联表查询
from sqlalchemy.orm import relationship
 
class Post(Base):
    __tablename__ = 'posts'
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'))
    title = Column(String(200))
    
    author = relationship("User", back_populates="posts")
 
User.posts = relationship("Post", back_populates="author")
 
# 使用联表
session.query(User, Post).join(Post, User.id == Post.user_id).filter(
    User.name == 'Alice'
).all()

asyncpg(异步)

import asyncio
import asyncpg
 
async def main():
    # 连接
    pool = await asyncpg.create_pool(
        host='localhost',
        database='mydb',
        user='postgres',
        password='password',
        min_size=5,
        max_size=20
    )
    
    async with pool.acquire() as conn:
        # 查询
        row = await conn.fetchrow(
            'SELECT * FROM users WHERE id = $1',
            1
        )
        print(row)
        
        # 批量查询
        rows = await conn.fetch(
            'SELECT * FROM users WHERE id = ANY($1)',
            [1, 2, 3, 4, 5]
        )
        
        # 执行
        await conn.execute('''
            INSERT INTO users (name, email) VALUES ($1, $2)
        ''', 'Bob', 'bob@example.com')
        
        # 事务
        async with conn.transaction():
            await conn.execute('INSERT INTO orders (user_id, total) VALUES ($1, $2)', 1, 99.99)
            await conn.execute('UPDATE users SET name = $1 WHERE id = $2', 'Bobby', 1)
    
    await pool.close()
 
asyncio.run(main())

常见陷阱与最佳实践

陷阱 1:滥用 JSONB

-- ❌ 错误:所有数据都用 JSONB
CREATE TABLE orders (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    data JSONB  -- 不推荐,应该用列
);
 
-- ✅ 正确:结构化数据用列,动态数据用 JSONB
CREATE TABLE orders (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    customer_id UUID NOT NULL,
    total_amount DECIMAL(10, 2) NOT NULL,
    status VARCHAR(20) NOT NULL,
    metadata JSONB,  -- 用于存储可选的动态数据
    created_at TIMESTAMP DEFAULT NOW()
);

陷阱 2:忽视索引维护

-- ❌ 错误:不维护索引
-- 随着数据增长,索引会变得臃肿
 
-- ✅ 正确:定期维护
-- 1. 重建膨胀的索引
REINDEX INDEX CONCURRENTLY idx_orders_customer_id;
 
-- 2. 更新统计信息
ANALYZE orders;
 
-- 3. 清理死亡元组
VACUUM orders;
VACUUM (VERBOSE, ANALYZE) orders;
 
-- 4. 使用 pg_stat_user_indexes 监控
SELECT 
    indexrelname AS index_name,
    idx_scan,
    pg_size_pretty(pg_relation_size(indexrelid)) AS size
FROM pg_stat_user_indexes
WHERE schemaname = 'public'
ORDER BY idx_scan;

陷阱 3:错误的分页方式

-- ❌ 错误:OFFSET 大值时很慢
SELECT * FROM posts ORDER BY id LIMIT 20 OFFSET 100000;
 
-- ✅ 正确:使用游标分页
-- 首次查询
SELECT * FROM posts ORDER BY id LIMIT 20;
 
-- 下一页
SELECT * FROM posts 
WHERE id > 20 
ORDER BY id 
LIMIT 20;
 
-- ✅ 使用时间戳分页
SELECT * FROM posts 
WHERE created_at < '2024-01-01'
ORDER BY created_at DESC 
LIMIT 20;

最佳实践清单

  1. 合理使用数据类型:选择最合适的数据类型。
  2. 添加适当索引:为常用查询添加索引,避免过多索引。
  3. 定期维护:VACUUM、ANALYZE、REINDEX。
  4. 监控慢查询:使用 pg_stat_statements。
  5. 备份策略:使用 pg_dump + WAL 归档。
  6. 连接池:使用 PgBouncer 或 PgPool-II。
  7. 分区表:大表使用分区策略。
  8. 复制配置:主从复制保证高可用。
  9. 安全配置:使用 SSL、RLS、最小权限原则。
  10. 版本升级:定期升级到最新稳定版。

与其他数据库对比

PostgreSQL vs MySQL

| 特性 | PostgreSQL | MySQL |
| --- | --- | --- |
| SQL 标准 | 完全遵循 | 部分遵循 |
| 事务 | ACID 完全支持 | ACID 支持 |
| 并发控制 | MVCC | MVCC + 锁 |
| 索引类型 | 多种 | B-tree 为主 |
| JSON 支持 | jsonb(高性能) | JSON(一般) |
| 向量搜索 | pgvector | 9.0+ 原生 VECTOR 类型 |
| 全文搜索 | 内置 | 内置 |
| 地理信息 | PostGIS | MySQL Spatial |
| 复制 | 流复制 | 主从/组复制 |
| 分区 | 原生支持 | 原生支持 |

PostgreSQL vs MongoDB

| 特性 | PostgreSQL | MongoDB |
| --- | --- | --- |
| 模型 | 关系型 | 文档型 |
| 事务 | ACID | ACID(副本集/分片) |
| 扩展性 | 垂直扩展为主 | 水平扩展 |
| JSON | jsonb | 原生文档 |
| JOIN | 完整支持 | 有限支持 |
| 一致性 | 强一致 | 可调一致性 |
| 查询 | SQL | MQL |

PostgreSQL vs SQLite

| 特性 | PostgreSQL | SQLite |
| --- | --- | --- |
| 类型 | 客户端-服务器 | 嵌入式 |
| 并发 | 多连接 | 文件锁 |
| 性能 | 高并发优秀 | 单机优秀 |
| 容量 | 几乎无限制 | TB 级 |
| 部署 | 较复杂 | 极简单 |
| 场景 | 企业应用 | 嵌入式/移动 |