AI应用API化部署

摘要

本文档详细介绍如何将AI应用封装为API服务进行部署,包括REST API设计、认证授权机制、限流策略、Webhook配置、多版本管理以及灰度发布策略。通过实战代码和架构设计,帮助读者构建生产级别的AI API服务。

核心关键词速览

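| 关键词 | 说明 | 关键词 | 说明 |
| --- | --- | --- | --- |
| REST API | REST风格接口 | JWT认证 | JSON Web Token |
| API Key | 密钥认证 | 限流策略 | Rate Limiting |
| Webhook | 回调通知 | API版本 | Versioning |
| 灰度发布 | Canary Release | OpenAPI | API规范 |
| Swagger | API文档 | API网关 | API Gateway |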

1. API设计原则

1.1 RESTful设计规范

graph LR
    A[客户端] -->|POST /v1/agents| B[创建Agent]
    A -->|GET /v1/agents| C[列出Agent]
    A -->|GET /v1/agents/{id}| D[获取Agent]
    A -->|PUT /v1/agents/{id}| E[更新Agent]
    A -->|DELETE /v1/agents/{id}| F[删除Agent]
    A -->|POST /v1/agents/{id}/chat| G[对话]

1.2 API端点设计

# API端点设计规范
endpoints:
  # Agent管理
  POST /v1/agents              # 创建Agent
  GET /v1/agents               # 列出Agent
  GET /v1/agents/{id}          # 获取Agent详情
  PUT /v1/agents/{id}          # 更新Agent
  DELETE /v1/agents/{id}        # 删除Agent
  
  # 对话接口
  POST /v1/agents/{id}/chat    # 发送消息
  GET /v1/agents/{id}/history  # 获取历史
  
  # 知识库管理
  POST /v1/knowledge-bases      # 创建知识库
  POST /v1/knowledge-bases/{id}/documents  # 上传文档
  
  # Webhook配置
  POST /v1/webhooks             # 创建Webhook
  GET /v1/webhooks             # 列出Webhook
  PUT /v1/webhooks/{id}        # 更新Webhook

1.3 请求响应格式

// 标准请求格式
{
  "request_id": "uuid-v4",
  "timestamp": "ISO-8601",
  "data": {
    // 业务数据
  }
}
 
// 标准响应格式 - 成功
{
  "success": true,
  "data": {
    // 响应数据
  },
  "request_id": "uuid-v4",
  "timestamp": "ISO-8601"
}
 
// 标准响应格式 - 错误
{
  "success": false,
  "error": {
    "code": "INVALID_PARAMETER",
    "message": "参数验证失败",
    "details": [
      {"field": "name", "message": "名称不能为空"}
    ]
  },
  "request_id": "uuid-v4",
  "timestamp": "ISO-8601"
}

2. FastAPI框架实现

2.1 项目结构

ai_api/
├── app/
│   ├── __init__.py
│   ├── main.py              # 应用入口
│   ├── api/
│   │   ├── __init__.py
│   │   ├── v1/
│   │   │   ├── __init__.py
│   │   │   ├── agents.py    # Agent接口
│   │   │   ├── chat.py      # 对话接口
│   │   │   └── knowledge.py # 知识库接口
│   │   └── deps.py          # 依赖注入
│   ├── core/
│   │   ├── config.py        # 配置
│   │   ├── security.py     # 安全认证
│   │   └── rate_limit.py   # 限流
│   ├── models/             # 数据模型
│   ├── services/           # 业务逻辑
│   └── schemas/            # Pydantic模型
├── requirements.txt
└── Dockerfile

2.2 应用入口

# app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
 
from app.api.v1 import agents, chat, knowledge
from app.core.config import settings
from app.core.security import auth_middleware
from app.core.rate_limit import RateLimitMiddleware
 
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run service initialisation before serving and cleanup afterwards.

    Awaits ``initialize_services()`` before yielding control to the
    application and ``cleanup_services()`` once the application stops.

    Args:
        app: The FastAPI application this lifespan is attached to.
    """
    # NOTE(review): initialize_services / cleanup_services are not imported
    # in this snippet - confirm where they are defined.
    await initialize_services()
    try:
        yield
    finally:
        # Run cleanup even when an exception is thrown into the generator at
        # the yield point; otherwise the services would never be released.
        await cleanup_services()
 
app = FastAPI(
    title="AI Agent API",
    description="AI智能体API服务",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
    lifespan=lifespan
)

# Middleware registered later wraps middleware registered earlier, so the
# last one added is the outermost layer and sees every request first.

# Rate limiting (innermost of the three).
app.add_middleware(RateLimitMiddleware)

# Authentication wraps the rate limiter, as it did before the reordering.
app.middleware("http")(auth_middleware)

# CORS is registered last so that it is outermost: responses produced by the
# inner middleware (for example a 429 from the rate limiter) still receive
# CORS headers, and preflight requests are answered before they are
# authenticated or counted against the rate limit.
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.CORS_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Route registration.
# NOTE(review): the endpoint design in section 1.2 lists
# POST /v1/agents/{id}/chat and /v1/knowledge-bases, while the prefixes below
# expose /v1/chat and /v1/knowledge - confirm which layout is intended.
app.include_router(
    agents.router,
    prefix="/v1/agents",
    tags=["Agent管理"]
)
app.include_router(
    chat.router,
    prefix="/v1/chat",
    tags=["对话接口"]
)
app.include_router(
    knowledge.router,
    prefix="/v1/knowledge",
    tags=["知识库"]
)

2.3 Agent管理接口

# app/api/v1/agents.py
from fastapi import APIRouter, Depends, HTTPException, status
from typing import List
from uuid import uuid4
 
from app.api.deps import get_current_user, get_db
from app.schemas.agent import AgentCreate, AgentUpdate, AgentResponse
from app.services.agent_service import AgentService
 
router = APIRouter()
 
@router.post("/", response_model=AgentResponse, status_code=status.HTTP_201_CREATED)
async def create_agent(
    agent_data: AgentCreate,
    current_user = Depends(get_current_user),
    db = Depends(get_db)
):
    """Create an agent owned by the authenticated user.

    Args:
        agent_data: Validated creation payload.
        current_user: Authenticated user resolved by ``get_current_user``.
        db: Database handle resolved by ``get_db``.

    Returns:
        The created agent converted with ``AgentResponse.from_orm``.
    """
    created = await AgentService(db).create(
        user_id=current_user.id,
        data=agent_data,
    )
    return AgentResponse.from_orm(created)
 
@router.get("/", response_model=List[AgentResponse])
async def list_agents(
    skip: int = 0,
    limit: int = 20,
    current_user = Depends(get_current_user),
    db = Depends(get_db)
):
    """List the authenticated user's agents, one page at a time.

    Args:
        skip: Value forwarded to the service as ``skip``.
        limit: Value forwarded to the service as ``limit``.
        current_user: Authenticated user resolved by ``get_current_user``.
        db: Database handle resolved by ``get_db``.

    Returns:
        A list of ``AgentResponse`` objects, one per agent returned by the
        service.
    """
    owned = await AgentService(db).list_by_user(
        user_id=current_user.id,
        skip=skip,
        limit=limit,
    )
    return list(map(AgentResponse.from_orm, owned))
 
@router.get("/{agent_id}", response_model=AgentResponse)
async def get_agent(
    agent_id: str,
    current_user = Depends(get_current_user),
    db = Depends(get_db)
):
    """Return one agent, provided it belongs to the authenticated user.

    Args:
        agent_id: Identifier taken from the URL path.
        current_user: Authenticated user resolved by ``get_current_user``.
        db: Database handle resolved by ``get_db``.

    Raises:
        HTTPException: 404 when the agent is missing or owned by another
            user (both cases answer identically).
    """
    found = await AgentService(db).get_by_id(agent_id)

    not_found = HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Agent不存在",
    )
    if not found:
        raise not_found
    if found.user_id != current_user.id:
        raise not_found

    return AgentResponse.from_orm(found)
 
@router.delete("/{agent_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_agent(
    agent_id: str,
    current_user = Depends(get_current_user),
    db = Depends(get_db)
):
    """Delete one of the authenticated user's agents.

    Args:
        agent_id: Identifier taken from the URL path.
        current_user: Authenticated user resolved by ``get_current_user``.
        db: Database handle resolved by ``get_db``.

    Raises:
        HTTPException: 404 when the service reports that nothing was deleted.
    """
    deleted = await AgentService(db).delete(agent_id, current_user.id)
    if deleted:
        return

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Agent不存在",
    )

2.4 对话接口

# app/api/v1/chat.py
from fastapi import APIRouter, Depends, BackgroundTasks
from typing import Optional
 
from app.api.deps import get_current_user, get_db
from app.schemas.chat import ChatRequest, ChatResponse, StreamResponse
from app.services.chat_service import ChatService
 
router = APIRouter()
 
@router.post("/", response_model=ChatResponse)
async def chat(
    agent_id: str,
    request: ChatRequest,
    current_user = Depends(get_current_user),
    db = Depends(get_db)
):
    """Send one message to an agent and wait for the complete reply.

    Args:
        agent_id: Identifier of the agent to talk to.
        request: Chat payload; ``message`` and ``context`` are forwarded.
        current_user: Authenticated user resolved by ``get_current_user``.
        db: Database handle resolved by ``get_db``.

    Returns:
        A ``ChatResponse`` built from the service result's ``message_id``,
        ``content`` and ``metadata``.
    """
    result = await ChatService(db).chat(
        agent_id=agent_id,
        user_id=current_user.id,
        message=request.message,
        context=request.context,
    )

    reply_fields = {
        "message_id": result.message_id,
        "content": result.content,
        "metadata": result.metadata,
    }
    return ChatResponse(**reply_fields)
 
@router.post("/stream")
async def chat_stream(
    agent_id: str,
    request: ChatRequest,
    current_user = Depends(get_current_user)
):
    """Stream an agent's reply as Server-Sent Events.

    Each chunk produced by ``ChatService.chat_stream`` becomes one SSE event;
    a final ``data: [DONE]`` event marks the end of the stream.

    Args:
        agent_id: Identifier of the agent to talk to.
        request: Chat payload; only ``message`` is forwarded.
        current_user: Authenticated user resolved by ``get_current_user``.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """
    import re

    from fastapi.responses import StreamingResponse

    def to_sse_event(chunk) -> str:
        # A blank line terminates an SSE event, so a chunk containing a line
        # break would be cut short. Emit one "data:" field per payload line;
        # the client joins them back together with "\n".
        lines = re.split(r"\r\n|\r|\n", str(chunk))
        return "".join(f"data: {line}\n" for line in lines) + "\n"

    async def generate():
        # NOTE(review): the service is built without a database handle here,
        # unlike the synchronous endpoint - confirm this is intended.
        service = ChatService(None)
        async for chunk in service.chat_stream(
            agent_id=agent_id,
            user_id=current_user.id,
            message=request.message
        ):
            yield to_sse_event(chunk)
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
    )
 
@router.post("/async")
async def chat_async(
    agent_id: str,
    request: ChatRequest,
    background_tasks: BackgroundTasks,
    current_user = Depends(get_current_user)
):
    """Queue a chat request and return immediately.

    The work is handed to ``process_chat_async`` as a background task, which
    receives the caller's ``webhook_url`` along with the message.

    Args:
        agent_id: Identifier of the agent to talk to.
        request: Chat payload; ``message`` and ``webhook_url`` are forwarded.
        background_tasks: FastAPI background task queue for this request.
        current_user: Authenticated user resolved by ``get_current_user``.

    Returns:
        A dict with the generated ``task_id`` and ``status`` set to
        ``"processing"``.
    """
    # uuid4 is not imported at module level in this file, so import it here.
    from uuid import uuid4

    task_id = str(uuid4())

    # NOTE(review): process_chat_async is neither defined nor imported in
    # this snippet - confirm where it lives before relying on this endpoint.
    background_tasks.add_task(
        process_chat_async,
        task_id=task_id,
        agent_id=agent_id,
        user_id=current_user.id,
        message=request.message,
        webhook_url=request.webhook_url
    )

    return {"task_id": task_id, "status": "processing"}

3. 认证与授权

3.1 JWT认证

# app/core/security.py
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import HTTPException, Request, status
 
from app.core.config import settings
 
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
 
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a stored hash.

    Args:
        plain_password: Password as typed by the user.
        hashed_password: Hash previously produced by the password context.

    Returns:
        The result of ``pwd_context.verify`` for the two values.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
 
def get_password_hash(password: str) -> str:
    """Hash a plaintext password with the module's password context.

    Args:
        password: Password to hash.

    Returns:
        The hash produced by ``pwd_context.hash``.
    """
    hashed = pwd_context.hash(password)
    return hashed
 
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT carrying ``data`` plus ``exp`` and ``iat`` claims.

    Args:
        data: Claims to embed; the dict is copied, not modified.
        expires_delta: Token lifetime. Defaults to 24 hours when ``None``.
            An explicit zero or negative lifetime is honoured rather than
            being replaced by the default.

    Returns:
        The encoded token, signed with ``settings.SECRET_KEY`` using
        ``settings.ALGORITHM``.
    """
    # Take one timestamp so that "iat" and "exp" are measured from the same
    # instant.
    issued_at = datetime.utcnow()

    # Compare against None: timedelta(0) is falsy and would otherwise fall
    # through to the 24-hour default.
    lifetime = expires_delta if expires_delta is not None else timedelta(hours=24)

    claims = {**data, "exp": issued_at + lifetime, "iat": issued_at}
    return jwt.encode(
        claims,
        settings.SECRET_KEY,
        algorithm=settings.ALGORITHM
    )
 
def decode_token(token: str) -> dict:
    """Decode and verify a JWT.

    Args:
        token: Encoded token string.

    Returns:
        The decoded claims.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            decoding raises ``JWTError``.
    """
    try:
        claims = jwt.decode(
            token,
            settings.SECRET_KEY,
            algorithms=[settings.ALGORITHM],
        )
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token无效",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return claims
 
async def auth_middleware(request: Request, call_next):
    """Attach decoded JWT claims to the request when a valid bearer token is sent.

    This middleware never rejects a request. The documentation and health
    paths skip the check entirely; a missing, non-bearer or invalid token
    leaves ``request.state.user`` unset and the request continues.

    Args:
        request: Incoming request.
        call_next: Callable that forwards the request down the stack.

    Returns:
        The response produced by ``call_next``.
    """
    open_paths = ("/docs", "/redoc", "/openapi.json", "/health")

    if request.url.path not in open_paths:
        credentials = request.headers.get("Authorization")
        if credentials and credentials.startswith("Bearer "):
            bearer_token = credentials.split(" ")[1]
            try:
                request.state.user = decode_token(bearer_token)
            except HTTPException:
                # An invalid token is treated the same as no token at all.
                pass

    return await call_next(request)

3.2 API Key认证

# app/core/api_key.py
from typing import Optional
from fastapi import Security, HTTPException, status
from fastapi.security import APIKeyHeader
 
from app.core.config import settings
 
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
 
async def verify_api_key(api_key: Optional[str] = Security(api_key_header)) -> dict:
    """Resolve the ``X-API-Key`` header to the application that owns it.

    Args:
        api_key: Header value, or ``None`` when the header is absent.

    Returns:
        A dict with the owning application's ``app_id`` and ``permissions``.

    Raises:
        HTTPException: 401 when the key is missing or unknown; 403 when the
            owning application is not active.
    """
    if not api_key:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="缺少API Key",
        )

    # Look up the application registered for this key.
    owner = await get_app_by_api_key(api_key)
    if not owner:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="无效的API Key",
        )

    # A known key whose application has been deactivated is forbidden.
    if not owner.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="API Key已被禁用",
        )

    return dict(app_id=owner.id, permissions=owner.permissions)
 
class APIKeyManager:
    """Create API keys and compute the hashes under which they are stored."""

    @staticmethod
    async def create_key(app_id: str, name: str, permissions: list) -> tuple:
        """Generate a new API key and persist its hash.

        Args:
            app_id: Application the key belongs to.
            name: Human-readable label for the key.
            permissions: Permissions granted to the key.

        Returns:
            A ``(key, key_hash)`` tuple. The plaintext ``key`` is not stored
            by this method, so it cannot be recovered later.
        """
        import secrets

        key = f"sk_{secrets.token_urlsafe(32)}"

        # Persist the hash only. The helper is a static method, so it must be
        # reached through the class; a bare name would raise NameError.
        key_hash = APIKeyManager.hash_api_key(key)
        # NOTE(review): `db` is neither defined nor imported in this snippet -
        # confirm where it comes from.
        await db.api_keys.create(
            app_id=app_id,
            name=name,
            key_hash=key_hash,
            permissions=permissions
        )

        return key, key_hash

    @staticmethod
    def hash_api_key(key: str) -> str:
        """Return the hex-encoded SHA-256 digest of ``key``."""
        # hashlib is not imported at module level in this file.
        import hashlib

        return hashlib.sha256(key.encode()).hexdigest()

4. 限流策略

4.1 限流实现

# app/core/rate_limit.py
import hashlib
import time
from collections import defaultdict
from typing import Dict

from fastapi import Request, HTTPException, status
from starlette.middleware.base import BaseHTTPMiddleware
 
class RateLimitMiddleware(BaseHTTPMiddleware):
    """Per-client request limiter that keeps its counters in process memory.

    Clients are identified by their ``X-API-Key`` header when present and by
    their IP address otherwise. Each client may make ``requests_per_minute``
    requests within any 60-second window; requests beyond that receive a 429.
    """

    def __init__(self, app, requests_per_minute: int = 60):
        """Store the limit and create the empty per-client timestamp table.

        Args:
            app: The ASGI application being wrapped.
            requests_per_minute: Requests allowed per client per window.
        """
        super().__init__(app)
        self.requests_per_minute = requests_per_minute
        self.window_size = 60  # window length in seconds
        # client id -> timestamps of accepted requests, oldest first
        self.requests: Dict[str, list] = defaultdict(list)
        # Time of the last sweep that removed idle clients.
        self._last_sweep = time.time()

    async def dispatch(self, request: Request, call_next):
        """Reject over-limit requests; otherwise forward and add limit headers."""
        from fastapi.responses import JSONResponse

        client_id = self._get_client_id(request)

        if not self._check_rate_limit(client_id):
            # Build the 429 directly. An HTTPException raised from middleware
            # never reaches FastAPI's exception handlers and would surface as
            # a 500 instead.
            return JSONResponse(
                status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                content={"detail": "请求过于频繁,请稍后再试"},
                headers={
                    "Retry-After": str(self.window_size),
                    "X-RateLimit-Limit": str(self.requests_per_minute),
                    "X-RateLimit-Remaining": str(self._get_remaining(client_id)),
                },
            )

        response = await call_next(request)

        # Advertise the limit and what is left of it on successful responses.
        response.headers["X-RateLimit-Limit"] = str(self.requests_per_minute)
        response.headers["X-RateLimit-Remaining"] = str(self._get_remaining(client_id))

        return response

    def _get_client_id(self, request: Request) -> str:
        """Return the key under which this request's client is counted."""
        import hashlib

        # Prefer the API key; store only a digest of it, never the key itself.
        api_key = request.headers.get("X-API-Key")
        if api_key:
            digest = hashlib.sha256(api_key.encode()).hexdigest()
            return f"api_key:{digest}"

        # request.client can be None when the server cannot determine the
        # peer address, so fall back to a shared bucket instead of crashing.
        host = request.client.host if request.client else "unknown"
        return f"ip:{host}"

    def _check_rate_limit(self, client_id: str) -> bool:
        """Record the request and return True, or return False if over limit."""
        now = time.time()
        window_start = now - self.window_size

        self._evict_idle_clients(now)

        # Drop timestamps that have left the window.
        recent = [t for t in self.requests[client_id] if t > window_start]
        self.requests[client_id] = recent

        if len(recent) >= self.requests_per_minute:
            # Rejected requests are not recorded.
            return False

        recent.append(now)
        return True

    def _evict_idle_clients(self, now: float) -> None:
        """Forget clients with no request inside the current window.

        Runs at most once per window so the cost stays negligible. Without it
        the table grows with every distinct API key or IP ever seen.
        """
        if now - self._last_sweep < self.window_size:
            return
        self._last_sweep = now

        cutoff = now - self.window_size
        idle = [
            cid for cid, stamps in self.requests.items()
            if not stamps or stamps[-1] <= cutoff
        ]
        for cid in idle:
            del self.requests[cid]

    def _get_remaining(self, client_id: str) -> int:
        """Return how many more requests the client may make, never below 0."""
        return max(0, self.requests_per_minute - len(self.requests[client_id]))

4.2 不同限流策略

# app/core/rate_limit_strategies.py
from abc import ABC, abstractmethod
from typing import Optional
import redis.asyncio as redis
 
class RateLimitStrategy(ABC):
    """Interface that every rate-limiting strategy implements."""

    @abstractmethod
    async def is_allowed(self, key: str) -> bool:
        """Report whether the caller identified by ``key`` may proceed."""
 
class TokenBucket(RateLimitStrategy):
    """Token-bucket limiter whose bucket state is kept in a Redis hash.

    Each key owns a hash with ``tokens`` and ``last_update`` fields. Tokens
    refill at ``rate`` per second up to ``capacity``; every allowed request
    consumes one token.
    """

    # Refill and consume inside one script so the read-modify-write is atomic.
    _LUA_SCRIPT = """
    local key = KEYS[1]
    local rate = tonumber(ARGV[1])
    local capacity = tonumber(ARGV[2])
    local now = tonumber(ARGV[3])

    local bucket = redis.call('HMGET', key, 'tokens', 'last_update')
    local tokens = tonumber(bucket[1]) or capacity
    local last_update = tonumber(bucket[2]) or now

    -- Never refill a negative amount if a caller's clock is behind the
    -- clock that wrote last_update.
    local elapsed = math.max(0, now - last_update)
    tokens = math.min(capacity, tokens + elapsed * rate)

    local allowed = 0
    if tokens >= 1 then
        tokens = tokens - 1
        allowed = 1
    end

    redis.call('HSET', key, 'tokens', tokens, 'last_update', now)
    redis.call('EXPIRE', key, 3600)
    return allowed
    """

    def __init__(self, rate: float, capacity: int, redis_client: redis.Redis):
        """Store the bucket parameters and the Redis client.

        Args:
            rate: Tokens added per second.
            capacity: Maximum number of tokens the bucket can hold.
            redis_client: Async Redis client used to run the script.
        """
        self.rate = rate
        self.capacity = capacity
        self.redis = redis_client

    async def is_allowed(self, key: str) -> bool:
        """Consume one token for ``key`` if available.

        Returns:
            True when a token was consumed, False when the bucket was empty.
        """
        # time is not imported at module level in this file.
        import time

        result = await self.redis.eval(
            self._LUA_SCRIPT, 1, key,
            self.rate, self.capacity, time.time()
        )
        return bool(result)
 
class SlidingWindow(RateLimitStrategy):
    """Sliding-window limiter backed by a Redis sorted set per key.

    Every accepted request is stored as a member scored by its timestamp.
    Members older than the window are removed on each check.
    """

    def __init__(self, max_requests: int, window_seconds: int, redis_client: redis.Redis):
        """Store the window parameters and the Redis client.

        Args:
            max_requests: Requests allowed per key within one window.
            window_seconds: Window length in seconds.
            redis_client: Async Redis client used for the sorted set.
        """
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.redis = redis_client

    async def is_allowed(self, key: str) -> bool:
        """Record a request for ``key`` and report whether it fits the window.

        Returns:
            True when fewer than ``max_requests`` requests were already in
            the window, False otherwise.
        """
        # Neither module is imported at module level in this file.
        import time
        from uuid import uuid4

        now = time.time()
        window_start = now - self.window_seconds
        # Use a unique member: two requests sharing a timestamp would
        # otherwise overwrite each other and be counted once.
        member = f"{now}:{uuid4().hex}"

        pipe = self.redis.pipeline()
        pipe.zremrangebyscore(key, 0, window_start)
        pipe.zcard(key)
        pipe.zadd(key, {member: now})
        pipe.expire(key, self.window_seconds)
        results = await pipe.execute()

        # results[1] is the count before this request was added.
        allowed = results[1] < self.max_requests
        if not allowed:
            # A rejected request must not occupy a slot, or a client that
            # keeps retrying would never be let back in. This matches the
            # in-memory limiter, which also records accepted requests only.
            await self.redis.zrem(key, member)
        return allowed

5. Webhook配置

5.1 Webhook处理

# app/core/webhook.py
from typing import Callable, Dict, Any
import httpx
import asyncio
import hmac
import hashlib
import json
from datetime import datetime
 
class WebhookManager:
    """Deliver webhook events over HTTP with signing and retries."""

    def __init__(self):
        """Create an empty handler registry and the default retry settings."""
        self.handlers: Dict[str, Callable] = {}
        self.retry_config = {
            "max_retries": 3,
            "retry_delay": 5,  # seconds; multiplied by the attempt number
            "timeout": 30
        }

    def register(self, event_type: str, handler: Callable):
        """Register ``handler`` for ``event_type``, replacing any previous one."""
        self.handlers[event_type] = handler

    async def trigger(
        self,
        webhook_url: str,
        event_type: str,
        payload: dict,
        secret: str = None
    ) -> bool:
        """POST ``payload`` to ``webhook_url``, retrying on failure.

        Args:
            webhook_url: Destination URL.
            event_type: Value sent in the ``X-Webhook-Event`` header.
            payload: JSON-serialisable event body.
            secret: When given, an HMAC-SHA256 signature of the request body
                is sent in ``X-Webhook-Signature``.

        Returns:
            True as soon as the receiver answers with a 2xx status, False
            once all attempts have failed.
        """
        # Serialise once and send exactly these bytes, so the signature the
        # receiver computes over the raw body matches the one in the header.
        body = self._serialize(payload)

        headers = {
            "Content-Type": "application/json",
            "X-Webhook-Event": event_type,
            "X-Webhook-Timestamp": datetime.utcnow().isoformat()
        }
        if secret:
            headers["X-Webhook-Signature"] = self._sign(body, secret)

        max_retries = self.retry_config["max_retries"]
        timeout = self.retry_config["timeout"]

        # One client for all attempts instead of a new one per attempt.
        async with httpx.AsyncClient() as client:
            for attempt in range(max_retries):
                try:
                    response = await client.post(
                        webhook_url,
                        content=body,
                        headers=headers,
                        timeout=timeout
                    )
                except (httpx.TimeoutException, httpx.RequestError):
                    delivered = False
                else:
                    # Any 2xx status counts as delivered, not only 200.
                    delivered = 200 <= response.status_code < 300

                if delivered:
                    return True

                # Back off before the next attempt for every kind of failure,
                # including error responses, so a struggling receiver is not
                # hit again immediately.
                if attempt < max_retries - 1:
                    await asyncio.sleep(self.retry_config["retry_delay"] * (attempt + 1))

        return False

    @staticmethod
    def _serialize(payload: dict) -> bytes:
        """Return the canonical request body for ``payload`` (sorted keys)."""
        return json.dumps(payload, sort_keys=True).encode()

    @staticmethod
    def _sign(body: bytes, secret: str) -> str:
        """Return ``sha256=<hex HMAC-SHA256 of body keyed by secret>``."""
        digest = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
        return f"sha256={digest}"

    def _generate_signature(self, payload: dict, secret: str) -> str:
        """Return the signature for ``payload`` as it is sent on the wire."""
        return self._sign(self._serialize(payload), secret)
 
# Webhook事件处理
async def handle_chat_completed(webhook_url: str, data: dict):
    """Send a ``chat.completed`` webhook describing a finished reply.

    Args:
        webhook_url: Destination URL.
        data: Must contain ``message_id``, ``agent_id`` and ``content``;
            ``metadata`` and ``webhook_secret`` are optional.

    Returns:
        The boolean result of ``WebhookManager.trigger``.
    """
    event_name = "chat.completed"
    manager = WebhookManager()

    payload = {
        "event": event_name,
        "timestamp": datetime.utcnow().isoformat(),
    }
    payload["data"] = {
        "message_id": data["message_id"],
        "agent_id": data["agent_id"],
        "content": data["content"],
        "metadata": data.get("metadata", {}),
    }

    return await manager.trigger(
        webhook_url,
        event_name,
        payload,
        secret=data.get("webhook_secret"),
    )

6. 多版本与灰度

6.1 API版本管理

# app/api/v1/__init__.py
from fastapi import APIRouter
 
from app.api.v1 import agents, chat, knowledge
 
api_router = APIRouter()

# Every sub-router declares its routes relative to "/", so each needs its own
# prefix here. Mounted without one, the agents router's POST "/" would shadow
# the chat router's POST "/". The prefixes mirror the ones used in
# app/main.py, minus the "/v1" version segment.
api_router.include_router(agents.router, prefix="/agents")
api_router.include_router(chat.router, prefix="/chat")
api_router.include_router(knowledge.router, prefix="/knowledge")

# Version information
VERSION = "1.0.0"
API_VERSIONS = ["v1", "v2-beta"]

6.2 灰度发布

# app/core/canary.py
from typing import Callable
import random
import hashlib
 
from typing import Any  # used by the return annotation of CanaryRouter.route


class CanaryRouter:
    """Route a share of requests to alternative (canary) handlers.

    Rules are tried in two passes. Rules with a positive weight are checked
    first, by hashing the caller into one of 100 buckets; their ``condition``
    is not consulted. Then every rule's ``condition`` is checked. If nothing
    matches, the default handler runs.
    """

    def __init__(self):
        """Create a router with no rules."""
        self.rules = []

    def add_rule(
        self,
        name: str,
        condition: Callable[[dict], bool],
        handler: Callable,
        weight: float = 0.0
    ):
        """Append a canary rule.

        Args:
            name: Rule name; also salts the bucket hash so that different
                rules select different callers.
            condition: Predicate over the request dict.
            handler: Callable returning an awaitable; run when the rule
                matches.
            weight: Fraction of callers (0.0-1.0) selected by hash. Any
                positive weight makes the rule match selected callers
                regardless of ``condition``.
        """
        self.rules.append({
            "name": name,
            "condition": condition,
            "handler": handler,
            "weight": weight
        })

    async def route(self, request: dict, default_handler: Callable) -> Any:
        """Run the first matching rule's handler, or ``default_handler``.

        Args:
            request: Request attributes such as ``user_id`` or ``ip``.
            default_handler: Callable returning an awaitable; used when no
                rule matches.

        Returns:
            Whatever the chosen handler returns.
        """
        # Pass 1: weight-based selection.
        for rule in self.rules:
            if rule["weight"] > 0:
                if self._check_weight(request, rule["name"], rule["weight"]):
                    return await rule["handler"](request)

        # Pass 2: condition-based selection.
        for rule in self.rules:
            if rule["condition"](request):
                return await rule["handler"](request)

        return await default_handler(request)

    def _check_weight(self, request: dict, rule_name: str, weight: float) -> bool:
        """Return True when the caller's bucket falls inside ``weight``.

        The bucket comes from a hash of the rule name and the caller's
        identity, so the same caller always gets the same answer for a rule.
        """
        # Identify the caller by user id, falling back to IP. A user_id that
        # is present but None or empty also falls back; otherwise every
        # anonymous caller would hash to the same bucket.
        identity = request.get("user_id")
        if identity is None or identity == "":
            identity = request.get("ip", "")

        hash_value = int(
            hashlib.md5(f"{rule_name}:{identity}".encode()).hexdigest(),
            16
        )
        return (hash_value % 100) < (weight * 100)
 
# Canary rule configuration
canary_router = CanaryRouter()

# Send 10% of callers (bucketed by a hash of user id / IP) to the new version.
canary_router.add_rule(
    name="new_model_v2",
    condition=lambda r: False,  # never matches by condition; weight only
    handler=lambda r: process_with_new_model(r),
    weight=0.1  # 10%
)

# Send premium-tier users to the new version.
canary_router.add_rule(
    name="premium_users",
    condition=lambda r: r.get("user_tier") == "premium",
    handler=lambda r: process_with_new_model(r),
    # Keep the weight at 0 so that only the condition selects callers. The
    # router checks positive weights first and without consulting the
    # condition, so weight=1.0 here would route every request to the new
    # model, not just premium users.
    weight=0.0
)

7. 相关资源


本文档由归愚知识系统自动生成 last updated: 2026-04-18