AI应用API化部署
摘要
本文档详细介绍如何将AI应用封装为API服务进行部署,包括REST API设计、认证授权机制、限流策略、Webhook配置、多版本管理以及灰度发布策略。通过实战代码和架构设计,帮助读者构建生产级别的AI API服务。
核心关键词速览
| 关键词 | 说明 | 关键词 | 说明 |
|---|---|---|---|
| REST API | REST风格接口 | JWT认证 | JSON Web Token |
| API Key | 密钥认证 | 限流策略 | Rate Limiting |
| Webhook | 回调通知 | API版本 | Versioning |
| 灰度发布 | Canary Release | OpenAPI | API规范 |
| Swagger | API文档 | API网关 | API Gateway |
1. API设计原则
1.1 RESTful设计规范
graph LR A[客户端] -->|POST /v1/agents| B[创建Agent] A -->|GET /v1/agents| C[列出Agent] A -->|GET /v1/agents/{id}| D[获取Agent] A -->|PUT /v1/agents/{id}| E[更新Agent] A -->|DELETE /v1/agents/{id}| F[删除Agent] A -->|POST /v1/agents/{id}/chat| G[对话]
1.2 API端点设计
# API端点设计规范
endpoints:
# Agent管理
POST /v1/agents # 创建Agent
GET /v1/agents # 列出Agent
GET /v1/agents/{id} # 获取Agent详情
PUT /v1/agents/{id} # 更新Agent
DELETE /v1/agents/{id} # 删除Agent
# 对话接口
POST /v1/agents/{id}/chat # 发送消息
GET /v1/agents/{id}/history # 获取历史
# 知识库管理
POST /v1/knowledge-bases # 创建知识库
POST /v1/knowledge-bases/{id}/documents # 上传文档
# Webhook配置
POST /v1/webhooks # 创建Webhook
GET /v1/webhooks # 列出Webhook
PUT /v1/webhooks/{id} # 更新Webhook1.3 请求响应格式
// 标准请求格式
{
"request_id": "uuid-v4",
"timestamp": "ISO-8601",
"data": {
// 业务数据
}
}
// 标准响应格式 - 成功
{
"success": true,
"data": {
// 响应数据
},
"request_id": "uuid-v4",
"timestamp": "ISO-8601"
}
// 标准响应格式 - 错误
{
"success": false,
"error": {
"code": "INVALID_PARAMETER",
"message": "参数验证失败",
"details": [
{"field": "name", "message": "名称不能为空"}
]
},
"request_id": "uuid-v4",
"timestamp": "ISO-8601"
}2. FastAPI框架实现
2.1 项目结构
ai_api/
├── app/
│ ├── __init__.py
│ ├── main.py # 应用入口
│ ├── api/
│ │ ├── __init__.py
│ │ ├── v1/
│ │ │ ├── __init__.py
│ │ │ ├── agents.py # Agent接口
│ │ │ ├── chat.py # 对话接口
│ │ │ └── knowledge.py # 知识库接口
│ │ └── deps.py # 依赖注入
│ ├── core/
│ │ ├── config.py # 配置
│ │ ├── security.py # 安全认证
│ │ └── rate_limit.py # 限流
│ ├── models/ # 数据模型
│ ├── services/ # 业务逻辑
│ └── schemas/ # Pydantic模型
├── requirements.txt
└── Dockerfile
2.2 应用入口
# app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from app.api.v1 import agents, chat, knowledge
from app.core.config import settings
from app.core.security import auth_middleware
from app.core.rate_limit import RateLimitMiddleware
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动时初始化
await initialize_services()
yield
# 关闭时清理
await cleanup_services()
app = FastAPI(
title="AI Agent API",
description="AI智能体API服务",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc",
lifespan=lifespan
)
# CORS配置
app.add_middleware(
CORSMiddleware,
allow_origins=settings.CORS_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 限流中间件
app.add_middleware(RateLimitMiddleware)
# 认证中间件
app.middleware("http")(auth_middleware)
# 注册路由
app.include_router(
agents.router,
prefix="/v1/agents",
tags=["Agent管理"]
)
app.include_router(
chat.router,
prefix="/v1/chat",
tags=["对话接口"]
)
app.include_router(
knowledge.router,
prefix="/v1/knowledge",
tags=["知识库"]
)2.3 Agent管理接口
# app/api/v1/agents.py
from fastapi import APIRouter, Depends, HTTPException, status
from typing import List
from uuid import uuid4
from app.api.deps import get_current_user, get_db
from app.schemas.agent import AgentCreate, AgentUpdate, AgentResponse
from app.services.agent_service import AgentService
router = APIRouter()
@router.post("/", response_model=AgentResponse, status_code=status.HTTP_201_CREATED)
async def create_agent(
agent_data: AgentCreate,
current_user = Depends(get_current_user),
db = Depends(get_db)
):
"""创建新的Agent"""
service = AgentService(db)
agent = await service.create(
user_id=current_user.id,
data=agent_data
)
return AgentResponse.from_orm(agent)
@router.get("/", response_model=List[AgentResponse])
async def list_agents(
skip: int = 0,
limit: int = 20,
current_user = Depends(get_current_user),
db = Depends(get_db)
):
"""列出用户的所有Agent"""
service = AgentService(db)
agents = await service.list_by_user(
user_id=current_user.id,
skip=skip,
limit=limit
)
return [AgentResponse.from_orm(a) for a in agents]
@router.get("/{agent_id}", response_model=AgentResponse)
async def get_agent(
agent_id: str,
current_user = Depends(get_current_user),
db = Depends(get_db)
):
"""获取Agent详情"""
service = AgentService(db)
agent = await service.get_by_id(agent_id)
if not agent or agent.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Agent不存在"
)
return AgentResponse.from_orm(agent)
@router.delete("/{agent_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_agent(
agent_id: str,
current_user = Depends(get_current_user),
db = Depends(get_db)
):
"""删除Agent"""
service = AgentService(db)
success = await service.delete(agent_id, current_user.id)
if not success:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Agent不存在"
)2.4 对话接口
# app/api/v1/chat.py
from fastapi import APIRouter, Depends, BackgroundTasks
from typing import Optional
from app.api.deps import get_current_user, get_db
from app.schemas.chat import ChatRequest, ChatResponse, StreamResponse
from app.services.chat_service import ChatService
router = APIRouter()
@router.post("/", response_model=ChatResponse)
async def chat(
agent_id: str,
request: ChatRequest,
current_user = Depends(get_current_user),
db = Depends(get_db)
):
"""同步对话接口"""
service = ChatService(db)
response = await service.chat(
agent_id=agent_id,
user_id=current_user.id,
message=request.message,
context=request.context
)
return ChatResponse(
message_id=response.message_id,
content=response.content,
metadata=response.metadata
)
@router.post("/stream")
async def chat_stream(
agent_id: str,
request: ChatRequest,
current_user = Depends(get_current_user)
):
"""流式对话接口"""
from fastapi.responses import StreamingResponse
async def generate():
service = ChatService(None)
async for chunk in service.chat_stream(
agent_id=agent_id,
user_id=current_user.id,
message=request.message
):
yield f"data: {chunk}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
}
)
@router.post("/async")
async def chat_async(
agent_id: str,
request: ChatRequest,
background_tasks: BackgroundTasks,
current_user = Depends(get_current_user)
):
"""异步对话接口(Webhook回调)"""
from app.core.webhook import send_webhook
# 生成任务ID
task_id = str(uuid4())
# 异步处理
background_tasks.add_task(
process_chat_async,
task_id=task_id,
agent_id=agent_id,
user_id=current_user.id,
message=request.message,
webhook_url=request.webhook_url
)
return {"task_id": task_id, "status": "processing"}3. 认证与授权
3.1 JWT认证
# app/core/security.py
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import HTTPException, Request, status
from app.core.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""验证密码"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""哈希密码"""
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""创建JWT Token"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(hours=24)
to_encode.update({"exp": expire, "iat": datetime.utcnow()})
encoded_jwt = jwt.encode(
to_encode,
settings.SECRET_KEY,
algorithm=settings.ALGORITHM
)
return encoded_jwt
def decode_token(token: str) -> dict:
"""解码Token"""
try:
payload = jwt.decode(
token,
settings.SECRET_KEY,
algorithms=[settings.ALGORITHM]
)
return payload
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token无效",
headers={"WWW-Authenticate": "Bearer"},
)
async def auth_middleware(request: Request, call_next):
"""认证中间件"""
# 排除公开接口
if request.url.path in ["/docs", "/redoc", "/openapi.json", "/health"]:
return await call_next(request)
# 获取Token
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
# 允许访问,继续处理
return await call_next(request)
token = auth_header.split(" ")[1]
try:
payload = decode_token(token)
request.state.user = payload
except HTTPException:
pass # Token无效但不阻止公开接口
return await call_next(request)3.2 API Key认证
# app/core/api_key.py
from typing import Optional
from fastapi import Security, HTTPException, status
from fastapi.security import APIKeyHeader
from app.core.config import settings
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
async def verify_api_key(api_key: Optional[str] = Security(api_key_header)) -> dict:
"""验证API Key"""
if not api_key:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="缺少API Key"
)
# 查询API Key对应的应用
app = await get_app_by_api_key(api_key)
if not app:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效的API Key"
)
# 检查应用状态
if not app.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="API Key已被禁用"
)
return {"app_id": app.id, "permissions": app.permissions}
class APIKeyManager:
"""API Key管理器"""
@staticmethod
async def create_key(app_id: str, name: str, permissions: list) -> tuple:
"""创建新的API Key"""
import secrets
key = f"sk_{secrets.token_urlsafe(32)}"
# 存储Key哈希
key_hash = hash_api_key(key)
await db.api_keys.create(
app_id=app_id,
name=name,
key_hash=key_hash,
permissions=permissions
)
return key, key_hash
@staticmethod
def hash_api_key(key: str) -> str:
"""哈希API Key"""
return hashlib.sha256(key.encode()).hexdigest()4. 限流策略
4.1 限流实现
# app/core/rate_limit.py
from fastapi import Request, HTTPException, status
from fastapi.middleware.base import BaseHTTPMiddleware
from typing import Dict
import time
from collections import defaultdict
class RateLimitMiddleware(BaseHTTPMiddleware):
"""限流中间件"""
def __init__(self, app, requests_per_minute: int = 60):
super().__init__(app)
self.requests_per_minute = requests_per_minute
self.window_size = 60 # 1分钟窗口
self.requests: Dict[str, list] = defaultdict(list)
async def dispatch(self, request: Request, call_next):
# 获取客户端标识
client_id = self._get_client_id(request)
# 检查限流
if not self._check_rate_limit(client_id):
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="请求过于频繁,请稍后再试",
headers={
"Retry-After": str(self.window_size),
"X-RateLimit-Limit": str(self.requests_per_minute),
"X-RateLimit-Remaining": str(self._get_remaining(client_id)),
}
)
response = await call_next(request)
# 添加限流头
response.headers["X-RateLimit-Limit"] = str(self.requests_per_minute)
response.headers["X-RateLimit-Remaining"] = str(self._get_remaining(client_id))
return response
def _get_client_id(self, request: Request) -> str:
"""获取客户端标识"""
# 优先使用API Key
api_key = request.headers.get("X-API-Key")
if api_key:
return f"api_key:{hashlib.md5(api_key.encode()).hexdigest()}"
# 使用IP
return f"ip:{request.client.host}"
def _check_rate_limit(self, client_id: str) -> bool:
"""检查限流"""
now = time.time()
window_start = now - self.window_size
# 清理过期记录
self.requests[client_id] = [
t for t in self.requests[client_id] if t > window_start
]
# 检查是否超限
if len(self.requests[client_id]) >= self.requests_per_minute:
return False
# 记录请求
self.requests[client_id].append(now)
return True
def _get_remaining(self, client_id: str) -> int:
"""获取剩余请求数"""
return max(0, self.requests_per_minute - len(self.requests[client_id]))4.2 不同限流策略
# app/core/rate_limit_strategies.py
from abc import ABC, abstractmethod
from typing import Optional
import redis.asyncio as redis
class RateLimitStrategy(ABC):
"""限流策略基类"""
@abstractmethod
async def is_allowed(self, key: str) -> bool:
pass
class TokenBucket(RateLimitStrategy):
"""令牌桶算法"""
def __init__(self, rate: float, capacity: int, redis_client: redis.Redis):
self.rate = rate # 每秒添加的令牌数
self.capacity = capacity
self.redis = redis_client
async def is_allowed(self, key: str) -> bool:
lua_script = """
local key = KEYS[1]
local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local bucket = redis.call('HMGET', key, 'tokens', 'last_update')
local tokens = tonumber(bucket[1]) or capacity
local last_update = tonumber(bucket[2]) or now
local added = (now - last_update) * rate
tokens = math.min(capacity, tokens + added)
if tokens >= 1 then
tokens = tokens - 1
redis.call('HMSET', key, 'tokens', tokens, 'last_update', now)
redis.call('EXPIRE', key, 3600)
return 1
else
redis.call('HMSET', key, 'tokens', tokens, 'last_update', now)
redis.call('EXPIRE', key, 3600)
return 0
end
"""
result = await self.redis.eval(
lua_script, 1, key,
self.rate, self.capacity, time.time()
)
return bool(result)
class SlidingWindow(RateLimitStrategy):
"""滑动窗口算法"""
def __init__(self, max_requests: int, window_seconds: int, redis_client: redis.Redis):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.redis = redis_client
async def is_allowed(self, key: str) -> bool:
now = time.time()
window_start = now - self.window_seconds
pipe = self.redis.pipeline()
pipe.zremrangebyscore(key, 0, window_start)
pipe.zcard(key)
pipe.zadd(key, {str(now): now})
pipe.expire(key, self.window_seconds)
results = await pipe.execute()
return results[1] < self.max_requests5. Webhook配置
5.1 Webhook处理
# app/core/webhook.py
from typing import Callable, Dict, Any
import httpx
import asyncio
import hmac
import hashlib
import json
from datetime import datetime
class WebhookManager:
"""Webhook管理器"""
def __init__(self):
self.handlers: Dict[str, Callable] = {}
self.retry_config = {
"max_retries": 3,
"retry_delay": 5, # 秒
"timeout": 30
}
def register(self, event_type: str, handler: Callable):
"""注册Webhook处理器"""
self.handlers[event_type] = handler
async def trigger(
self,
webhook_url: str,
event_type: str,
payload: dict,
secret: str = None
) -> bool:
"""触发Webhook"""
headers = {
"Content-Type": "application/json",
"X-Webhook-Event": event_type,
"X-Webhook-Timestamp": datetime.utcnow().isoformat()
}
# 生成签名
if secret:
signature = self._generate_signature(payload, secret)
headers["X-Webhook-Signature"] = signature
for attempt in range(self.retry_config["max_retries"]):
try:
async with httpx.AsyncClient() as client:
response = await client.post(
webhook_url,
json=payload,
headers=headers,
timeout=self.retry_config["timeout"]
)
if response.status_code == 200:
return True
except (httpx.TimeoutException, httpx.RequestError) as e:
if attempt < self.retry_config["max_retries"] - 1:
await asyncio.sleep(self.retry_config["retry_delay"] * (attempt + 1))
return False
def _generate_signature(self, payload: dict, secret: str) -> str:
"""生成签名"""
body = json.dumps(payload, sort_keys=True)
signature = hmac.new(
secret.encode(),
body.encode(),
hashlib.sha256
).hexdigest()
return f"sha256={signature}"
# Webhook事件处理
async def handle_chat_completed(webhook_url: str, data: dict):
"""处理对话完成事件"""
manager = WebhookManager()
payload = {
"event": "chat.completed",
"timestamp": datetime.utcnow().isoformat(),
"data": {
"message_id": data["message_id"],
"agent_id": data["agent_id"],
"content": data["content"],
"metadata": data.get("metadata", {})
}
}
success = await manager.trigger(
webhook_url,
"chat.completed",
payload,
secret=data.get("webhook_secret")
)
return success6. 多版本与灰度
6.1 API版本管理
# app/api/v1/__init__.py
from fastapi import APIRouter
from app.api.v1 import agents, chat, knowledge
api_router = APIRouter()
api_router.include_router(agents.router)
api_router.include_router(chat.router)
api_router.include_router(knowledge.router)
# 版本信息
VERSION = "1.0.0"
API_VERSIONS = ["v1", "v2-beta"]6.2 灰度发布
# app/core/canary.py
from typing import Callable
import random
import hashlib
class CanaryRouter:
"""灰度路由"""
def __init__(self):
self.rules = []
def add_rule(
self,
name: str,
condition: Callable[[dict], bool],
handler: Callable,
weight: float = 0.0
):
"""添加灰度规则"""
self.rules.append({
"name": name,
"condition": condition,
"handler": handler,
"weight": weight
})
async def route(self, request: dict, default_handler: Callable) -> Any:
"""路由请求"""
# 按权重灰度
for rule in self.rules:
if rule["weight"] > 0:
if self._check_weight(request, rule["name"], rule["weight"]):
return await rule["handler"](request)
# 按条件灰度
for rule in self.rules:
if rule["condition"](request):
return await rule["handler"](request)
return await default_handler(request)
def _check_weight(self, request: dict, rule_name: str, weight: float) -> bool:
"""检查权重"""
# 使用用户ID或IP做一致性哈希
user_id = request.get("user_id", request.get("ip", ""))
hash_value = int(
hashlib.md5(f"{rule_name}:{user_id}".encode()).hexdigest(),
16
)
return (hash_value % 100) < (weight * 100)
# 配置灰度规则
canary_router = CanaryRouter()
# 10%流量灰度到新版本
canary_router.add_rule(
name="new_model_v2",
condition=lambda r: False, # 不用条件,用权重
handler=lambda r: process_with_new_model(r),
weight=0.1 # 10%
)
# 特定用户灰度
canary_router.add_rule(
name="premium_users",
condition=lambda r: r.get("user_tier") == "premium",
handler=lambda r: process_with_new_model(r),
weight=1.0 # 100%
)7. 相关资源
- AI应用生产部署 - 企业级部署最佳实践
- 工作流设计模式 - 工作流设计原则
- 多Agent系统设计 - 多智能体架构
本文档由归愚知识系统自动生成 last updated: 2026-04-18