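Multi-Agent System Design: From Principles to Practice

What This Guide Covers

When a single agent cannot handle a task on its own, multiple agents need to collaborate. This guide moves from theory to implementation and covers the design patterns, communication mechanisms, and collaboration workflows of multi-agent systems, followed by a worked case study.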

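Why Build a Multi-Agent System?

Limitations of a Single Agent

A single-agent system faces these core challenges:

Problem | Description | Multi-agent solution
Capability boundary | A single model has limited capabilities | Divide the work among agents with different specialties
Context limit | The token window is finite | Each agent maintains its own context
Single point of failure | If the one agent fails, everything fails | Redundancy plus fault-tolerant design
Poor extensibility | New capabilities require retraining | Add a new agent
Hard to maintain | Code bloat | Modular design, single responsibility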

Advantages of Multiple Agents

┌──────────────────────────────────────────────────┐
│        Multi-agent collaboration overview        │
│                                                  │
│                   User request                   │
│                        ↓                         │
│                ┌───────────────┐                 │
│                │  Coordinator  │                 │
│                └───────┬───────┘                 │
│          ┌─────────────┼─────────────┐           │
│          ↓             ↓             ↓           │
│    ┌───────────┐ ┌───────────┐ ┌───────────┐     │
│    │  Search   │ │ Analysis  │ │  Writing  │     │
│    │   agent   │ │   agent   │ │   agent   │     │
│    └─────┬─────┘ └─────┬─────┘ └─────┬─────┘     │
│          └─────────────┼─────────────┘           │
│                        ↓                         │
│                ┌───────────────┐                 │
│                │  Coordinator  │                 │
│                └───────┬───────┘                 │
│                        ↓                         │
│                   User result                    │
└──────────────────────────────────────────────────┘

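Agent Roles

Role Types

Role | Responsibility | Characteristics
Coordinator | Decomposes tasks, assigns work, integrates results | Global view, strong decision-making
Specialist | Focuses on a specific domain | Deep knowledge, efficient execution
Executor | Carries out concrete operations | Reliable execution of simple tasks
Reviewer | Checks results and controls quality | Strict and critical

Typical Agent Configurations

Scenario | Agents | Collaboration pattern
Customer service | Reception + knowledge base + orders + complaints | Hierarchical
Coding assistant | Requirements analysis + code generation + testing + review | Pipeline
Research assistant | Search + reading + organizing + writing | Sequential
Office automation | Calendar + email + documents + approvals | Event-driven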

Communication Protocol Design

Message Format Definition

from typing import Any, Dict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
import uuid
import json


class MessageType(Enum):
    """Message types."""
    REQUEST = "request"           # request
    RESPONSE = "response"         # response to a request
    EVENT = "event"               # event notification
    BROADCAST = "broadcast"       # broadcast to many agents
    HEARTBEAT = "heartbeat"       # liveness check


class MessagePriority(Enum):
    """Message priority; a lower number is more urgent."""
    CRITICAL = 1     # critical task
    HIGH = 3         # high priority
    NORMAL = 5       # normal
    LOW = 7          # low priority
    BACKGROUND = 9   # background task


@dataclass
class AgentMessage:
    """A message exchanged between agents."""
    message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    # Timezone-aware UTC timestamp, so agents on different hosts can compare times
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )

    # Sender and receiver
    sender: Dict[str, Any] = field(default_factory=dict)
    receiver: Dict[str, Any] = field(default_factory=dict)

    # Message type and content
    message_type: MessageType = MessageType.REQUEST
    content: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        # Fill in defaults without overwriting values the caller supplied
        if 'priority' not in self.metadata:
            self.metadata['priority'] = MessagePriority.NORMAL.value
        if 'ttl' not in self.metadata:
            self.metadata['ttl'] = 300  # default time to live: 5 minutes

    def to_json(self) -> str:
        """Serialize to JSON."""
        return json.dumps({
            'message_id': self.message_id,
            'timestamp': self.timestamp,
            'sender': self.sender,
            'receiver': self.receiver,
            'message_type': self.message_type.value,
            'content': self.content,
            'metadata': self.metadata
        }, ensure_ascii=False)

    @classmethod
    def from_json(cls, json_str: str) -> 'AgentMessage':
        """Deserialize from JSON."""
        data = json.loads(json_str)
        return cls(
            message_id=data['message_id'],
            timestamp=data['timestamp'],
            sender=data['sender'],
            receiver=data['receiver'],
            message_type=MessageType(data['message_type']),
            content=data['content'],
            metadata=data['metadata']
        )
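
MessageType defines a HEARTBEAT type, but the guide does not show how heartbeats might be consumed. The following is a minimal sketch, assuming each agent reports a heartbeat periodically and counts as unresponsive after a fixed silence window. The class name `HeartbeatMonitor`, its methods, and the 15-second default are hypothetical and not part of the guide's code.

```python
import time
from typing import Dict, List, Optional


class HeartbeatMonitor:
    """Tracks the last heartbeat per agent (hypothetical helper)."""

    def __init__(self, timeout_seconds: float = 15.0):
        self.timeout_seconds = timeout_seconds
        self._last_seen: Dict[str, float] = {}

    def record(self, agent_id: str, now: Optional[float] = None) -> None:
        """Store the time at which a heartbeat from agent_id arrived."""
        self._last_seen[agent_id] = time.monotonic() if now is None else now

    def unresponsive(self, now: Optional[float] = None) -> List[str]:
        """Return, sorted by name, agents silent for longer than the timeout."""
        current = time.monotonic() if now is None else now
        return sorted(
            agent_id
            for agent_id, seen in self._last_seen.items()
            if current - seen > self.timeout_seconds
        )
```

A coordinator could call `record` whenever a HEARTBEAT message arrives and poll `unresponsive` on a timer to decide which agents to restart or route around. The explicit `now` parameter exists mainly to make the logic easy to test.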

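Message Protocol Schema

# Message protocol schema
MessageProtocol:
  message_id: "uuid-v4"           # globally unique message ID
  timestamp: "ISO-8601"           # timestamp
  sender:                         # sender
    agent_id: "string"
    agent_type: "string"
    capabilities: ["capability_list"]

  receiver:                       # receiver
    agent_id: "string"            # empty means broadcast
    agent_type: "string"          # match by agent type

  message_type:                   # message type
    enum: ["request", "response", "event", "broadcast", "heartbeat"]

  content:                        # message body
    action: "string"              # operation name
    params: {}                    # parameters
    context: {}                   # context

  metadata:                       # metadata
    priority: 1-10                # priority; lower is more urgent (MessagePriority uses 1 to 9)
    ttl: 300                      # time to live, in seconds
    retry_count: 0                # number of retries so far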
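
The schema gives every message a `ttl`, but nothing above enforces it. As a minimal sketch of how a receiver might discard stale messages, the hypothetical helper `is_expired` below works on a message already parsed into a dict. It assumes `timestamp` is ISO-8601, `ttl` is in seconds, and timestamps without an offset are UTC; that last point is an assumption of this sketch, not something the schema states.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional


def is_expired(message: Dict[str, Any], now: Optional[datetime] = None) -> bool:
    """Return True when timestamp + ttl lies in the past.

    `now` must be timezone-aware when supplied; it defaults to the current UTC time.
    """
    sent_at = datetime.fromisoformat(message["timestamp"])
    if sent_at.tzinfo is None:
        # Assumption: naive timestamps are interpreted as UTC
        sent_at = sent_at.replace(tzinfo=timezone.utc)
    ttl = message.get("metadata", {}).get("ttl", 300)  # same default as the schema
    current = now or datetime.now(timezone.utc)
    return current > sent_at + timedelta(seconds=ttl)
```

A broker or agent could run this check just before delivery and drop, or log, anything that has outlived its TTL.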

Messaging Middleware Implementation

import asyncio
import logging
from collections import defaultdict, deque
from typing import Any, Awaitable, Callable, Deque, Dict, Optional, Set

logger = logging.getLogger(__name__)

# Subscribers are coroutine functions: the broker awaits every callback.
# AgentMessage and MessageType come from the message format definition.
MessageCallback = Callable[[AgentMessage], Awaitable[None]]


class MessageBroker:
    """Message broker: carries communication between agents."""

    def __init__(self, history_limit: int = 1000):
        self.subscribers: Dict[str, Set[MessageCallback]] = defaultdict(set)
        self.message_queue: asyncio.Queue = asyncio.Queue()
        self.max_history = history_limit
        # Bounded history; the deque drops the oldest entry automatically
        self.message_history: Deque[AgentMessage] = deque(maxlen=history_limit)
        self._running = False
        self._processor: Optional[asyncio.Task] = None

    async def start(self):
        """Start the dispatch loop."""
        self._running = True
        self._processor = asyncio.create_task(self._process_messages())

    async def stop(self):
        """Stop the broker and wait for the dispatch loop to exit."""
        self._running = False
        if self._processor:
            self._processor.cancel()
            try:
                await self._processor
            except asyncio.CancelledError:
                pass
            self._processor = None

    async def _process_messages(self):
        """Take messages off the queue and route them."""
        while self._running:
            try:
                message = await asyncio.wait_for(
                    self.message_queue.get(),
                    timeout=1.0
                )
                await self._route_message(message)
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                logger.error(f"Message processing error: {e}")

    async def publish(self, message: AgentMessage) -> None:
        """Publish a message."""
        self.message_history.append(message)   # record history
        await self.message_queue.put(message)  # enqueue for dispatch

    async def subscribe(self, agent_id: str, callback: MessageCallback) -> None:
        """Subscribe a callback to messages addressed to agent_id."""
        self.subscribers[agent_id].add(callback)

    async def unsubscribe(self, agent_id: str, callback: MessageCallback) -> None:
        """Remove a subscription."""
        self.subscribers[agent_id].discard(callback)

    async def _route_message(self, message: AgentMessage) -> None:
        """Route a message according to its receiver field."""
        if message.receiver.get('agent_id'):
            # Point-to-point message
            await self._deliver_to_agent(message)
        elif message.receiver.get('agent_type'):
            # Broadcast to one agent type
            await self._broadcast_to_type(message)
        else:
            # Global broadcast
            await self._broadcast_all(message)

    async def _dispatch(self, callbacks, message: AgentMessage, label: str):
        """Await each callback; one failing callback does not block the rest."""
        # Iterate over a copy: callbacks may subscribe or unsubscribe while we await
        for callback in list(callbacks):
            try:
                await callback(message)
            except Exception as e:
                logger.error(f"Callback error for {label}: {e}")

    async def _deliver_to_agent(self, message: AgentMessage):
        """Deliver to one specific agent."""
        agent_id = message.receiver['agent_id']
        await self._dispatch(self.subscribers.get(agent_id, ()), message, agent_id)

    async def _broadcast_to_type(self, message: AgentMessage):
        """Broadcast by agent type.

        Simplified: the broker does not track agent types, so this delivers to
        every subscriber and each agent must check receiver['agent_type'] itself.
        """
        await self._broadcast_all(message)

    async def _broadcast_all(self, message: AgentMessage):
        """Broadcast to every subscriber."""
        for agent_id, callbacks in list(self.subscribers.items()):
            await self._dispatch(callbacks, message, agent_id)

    async def request(
        self,
        sender_id: str,
        receiver_id: str,
        action: str,
        params: Dict[str, Any],
        timeout: float = 30.0
    ) -> AgentMessage:
        """Send a request and wait for the matching response.

        The receiver must reply to sender_id with a RESPONSE message whose
        content['request_id'] equals the request's message_id.
        """
        request_msg = AgentMessage(
            sender={'agent_id': sender_id},
            receiver={'agent_id': receiver_id},
            message_type=MessageType.REQUEST,
            content={'action': action, 'params': params}
        )

        # Future that resolves when the response arrives
        response_future: asyncio.Future = asyncio.get_running_loop().create_future()

        async def response_handler(msg: AgentMessage):
            if (msg.message_type == MessageType.RESPONSE and
                    msg.content.get('request_id') == request_msg.message_id and
                    not response_future.done()):  # ignore duplicate responses
                response_future.set_result(msg)

        # Listen for the response before sending, so it cannot be missed
        await self.subscribe(sender_id, response_handler)

        try:
            await self.publish(request_msg)
            return await asyncio.wait_for(response_future, timeout)
        except asyncio.TimeoutError:
            raise TimeoutError(f"Request to {receiver_id} timed out")
        finally:
            await self.unsubscribe(sender_id, response_handler)
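
The broker dispatches in arrival order because it uses a plain `asyncio.Queue`, so the `priority` field in the message metadata has no effect on delivery. One way to honor it, sketched here under the assumption that a lower number means more urgent (as in MessagePriority), is `asyncio.PriorityQueue` with a sequence counter as a tie-breaker. `PriorityMailbox` and `drain_in_order` are hypothetical names used for illustration.

```python
import asyncio
import itertools
from typing import Any, List, Tuple


class PriorityMailbox:
    """Orders messages by priority, then by arrival order (hypothetical helper)."""

    def __init__(self) -> None:
        self._queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
        # The counter breaks ties, which keeps FIFO order within one priority
        # and avoids comparing the message objects themselves.
        self._counter = itertools.count()

    async def put(self, priority: int, message: Any) -> None:
        # Lower number = more urgent, matching MessagePriority (CRITICAL = 1)
        await self._queue.put((priority, next(self._counter), message))

    async def get(self) -> Any:
        _, _, message = await self._queue.get()
        return message

    def empty(self) -> bool:
        return self._queue.empty()


async def drain_in_order(items: List[Tuple[int, str]]) -> List[str]:
    """Enqueue (priority, message) pairs and return them in dispatch order."""
    mailbox = PriorityMailbox()
    for priority, message in items:
        await mailbox.put(priority, message)
    ordered = []
    while not mailbox.empty():
        ordered.append(await mailbox.get())
    return ordered
```

Swapping such a mailbox in for the broker's queue would let critical requests overtake background work, at the cost that a steady stream of urgent messages can starve low-priority ones.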

Collaboration Patterns in Detail

Pattern 1: Sequential Collaboration

The simplest pattern: agents run one after another.

graph LR
    A[User] --> B[Agent1]
    B --> C[Agent2]
    C --> D[Agent3]
    D --> E[Output]
    
    B -.->|pass context| C
    C -.->|pass context| D

Use when: the steps have strict ordering dependencies.

import logging
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


class SequentialCollaboration:
    """Sequential collaboration pattern."""

    async def execute(
        self,
        agents: List['BaseAgent'],
        initial_input: Any,
        context: Optional[Dict] = None
    ) -> Dict[str, Any]:
        """Run every agent in order, passing a shared context along."""
        context = context or {}
        context['input'] = initial_input

        results = {}

        for agent in agents:
            logger.info(f"Executing {agent.name}")

            # Run the agent
            result = await agent.process(context)
            results[agent.name] = result

            # Update the context first, so that an early stop still reports
            # the stopping agent's output as the final output
            context[agent.name] = result
            context['previous_output'] = result.get('output')

            # Stop early if the agent asks for it
            if result.get('should_stop'):
                logger.info(f"Early stop at {agent.name}")
                break

        return {
            'results': results,
            'final_output': context.get('previous_output'),
            'all_context': context
        }


# Usage example (the agent classes are assumed to be defined elsewhere)
async def document_generation_pipeline():
    """Document generation pipeline."""
    agents = [
        ResearchAgent(name="research", model="gpt-4o"),
        OutlineAgent(name="outline", model="gpt-4o"),
        DraftAgent(name="draft", model="gpt-4o"),
        ReviewAgent(name="review", model="gpt-4o"),
        FormatAgent(name="format", model="gpt-4o-mini")
    ]

    pipeline = SequentialCollaboration()
    result = await pipeline.execute(
        agents=agents,
        initial_input="A research report on development trends in large AI models"
    )

    return result['final_output']

Pattern 2: Parallel Collaboration

Several agents work at the same time, which shortens end-to-end latency:

graph TD
    A[User request] --> B[Dispatcher]
    B --> C[Agent1]
    B --> D[Agent2]
    B --> E[Agent3]
    C --> F[Aggregator]
    D --> F
    E --> F
    F --> G[Output]

Use when: the task can be decomposed into independent subtasks.

import asyncio
import inspect
import logging
from typing import Any, Callable, Dict, List, Optional

logger = logging.getLogger(__name__)


class ParallelCollaboration:
    """Parallel collaboration pattern."""

    def __init__(self, max_concurrency: int = 5):
        self.max_concurrency = max_concurrency
        self.semaphore = asyncio.Semaphore(max_concurrency)

    async def execute(
        self,
        agents: List['BaseAgent'],
        initial_input: Any,
        aggregator: Optional[Callable] = None
    ) -> Dict[str, Any]:
        """Run the agents concurrently, at most max_concurrency at a time."""

        async def execute_with_semaphore(agent: 'BaseAgent') -> Dict:
            async with self.semaphore:
                try:
                    result = await agent.process(initial_input)
                    return {'success': True, 'agent': agent.name, 'result': result}
                except Exception as e:
                    logger.error(f"{agent.name} failed: {e}")
                    return {'success': False, 'agent': agent.name, 'error': str(e)}

        # Create one task per agent
        tasks = [execute_with_semaphore(agent) for agent in agents]

        # Run them concurrently
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Separate successes from failures
        successful = []
        failed = []

        for i, result in enumerate(results):
            if isinstance(result, BaseException):
                failed.append({'agent': agents[i].name, 'error': str(result)})
            elif not result.get('success'):
                failed.append({'agent': result['agent'], 'error': result.get('error')})
            else:
                successful.append(result)

        # Aggregate. The aggregator receives {agent name: result}, so a failed
        # agent cannot shift the positions of the others.
        aggregated = None
        if aggregator and successful:
            aggregated = aggregator({r['agent']: r['result'] for r in successful})
            if inspect.isawaitable(aggregated):  # support async aggregators
                aggregated = await aggregated

        return {
            'successful': successful,
            'failed': failed,
            'aggregated': aggregated,
            'success_rate': len(successful) / len(agents) if agents else 0
        }


# Usage example (the agent classes and generate_summary are assumed
# to be defined elsewhere)
async def multi_perspective_analysis(topic: str):
    """Analyze a topic from several perspectives."""
    agents = [
        TechnicalAnalysisAgent(name="technical", model="gpt-4o"),
        MarketAnalysisAgent(name="market", model="gpt-4o"),
        RiskAnalysisAgent(name="risk", model="gpt-4o"),
        CompetitorAgent(name="competitor", model="gpt-4o")
    ]

    async def aggregate(results: Dict[str, Any]) -> Dict:
        # .get() returns None for any agent that failed
        return {
            'technical': results.get('technical'),
            'market': results.get('market'),
            'risk': results.get('risk'),
            'competitor': results.get('competitor'),
            'summary': await generate_summary(list(results.values()))
        }

    parallel = ParallelCollaboration()
    result = await parallel.execute(agents, topic, aggregate)

    return result
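
The parallel pattern waits for every agent, so a single slow agent delays the whole batch. A common remedy is a per-agent deadline. The sketch below uses `asyncio.wait_for` for this; `run_with_timeouts` is a hypothetical helper that takes plain coroutine functions rather than the guide's agent objects, so that it stays self-contained.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


async def run_with_timeouts(
    workers: Dict[str, Callable[[], Awaitable[Any]]],
    timeout: float,
) -> Dict[str, Dict[str, Any]]:
    """Run all workers concurrently; mark any that exceed `timeout` as failed."""

    async def guarded(name: str, work: Callable[[], Awaitable[Any]]):
        try:
            # wait_for cancels the worker once the deadline passes
            result = await asyncio.wait_for(work(), timeout)
            return name, {"success": True, "result": result}
        except asyncio.TimeoutError:
            return name, {"success": False, "error": "timeout"}
        except Exception as exc:  # keep one failure from hiding the others
            return name, {"success": False, "error": str(exc)}

    pairs = await asyncio.gather(*(guarded(n, w) for n, w in workers.items()))
    return dict(pairs)
```

With agents, each entry could be something like `lambda: agent.process(topic)`. Whether a timed-out agent should count as a failure or be retried is a design decision this sketch leaves open.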

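Pattern 3: Hierarchical Collaboration

The supervisor pattern: one agent manages a set of sub-agents.

graph TD
    A[User] --> B[Supervisor]
    B --> C[Worker1]
    B --> D[Worker2]
    B --> E[Worker3]
    
    C --> B
    D --> B
    E --> B
    
    B -->|route| F[Tool calls]
    B -->|query| G[Knowledge base]

Use when: tasks need intelligent routing and assignment.

import json
from collections import defaultdict
from typing import Any, Dict, List, Optional

# BaseAgent and LLMClient are assumed to be defined elsewhere; LLMClient.chat
# is assumed to return the model's reply as a string.


class HierarchicalCollaboration:
    """Hierarchical collaboration pattern."""

    def __init__(self):
        self.supervisor: Optional['SupervisorAgent'] = None
        self.workers: Dict[str, 'BaseAgent'] = {}
        self.router = TaskRouter()

    def add_worker(self, worker: 'BaseAgent'):
        """Add a worker and register it with the router."""
        self.workers[worker.name] = worker
        self.router.register_worker(worker)

    def set_supervisor(self, supervisor: 'SupervisorAgent'):
        """Set the supervisor and hand it the workers and the router."""
        self.supervisor = supervisor
        supervisor.set_workers(self.workers)
        supervisor.set_router(self.router)

    async def execute(self, user_input: str) -> Dict[str, Any]:
        """Run the hierarchical workflow."""
        if not self.supervisor:
            raise ValueError("Supervisor not set")

        # The supervisor analyzes the request and produces a plan
        plan = await self.supervisor.plan(user_input)
        plan_type = plan.get('type')

        if plan_type == 'direct':
            # Answer directly
            return await self.supervisor.execute_direct(plan)

        if plan_type == 'multi_step':
            # Multi-step task
            results = []

            for step in plan.get('steps', []):
                # Route the step to a suitable worker
                worker = self.router.route(step)
                if worker is None:
                    # Record the gap instead of silently dropping the step
                    results.append({'step': step, 'error': 'no suitable worker'})
                    continue

                result = await worker.process(step)
                # Supervisor review; rerun the step once if it fails
                if not await self.supervisor.verify(result):
                    result = await worker.process(step)
                results.append(result)

            return await self.supervisor.synthesize(results)

        return {'error': 'Unknown plan type'}


class SupervisorAgent:
    """Supervisor agent."""

    def __init__(self, name: str, model: str):
        self.name = name
        self.model = model
        self.workers = {}
        self.router = None
        self.llm = LLMClient(model)

    def set_workers(self, workers: Dict[str, 'BaseAgent']):
        self.workers = workers

    def set_router(self, router: 'TaskRouter'):
        self.router = router

    async def plan(self, user_input: str) -> Dict[str, Any]:
        """Ask the LLM for a plan."""
        prompt = f"""
User request: {user_input}

Available workers:
{self._format_workers()}

Analyze the request and decide:
1. Whether it is a simple question or a complex task
2. Which workers need to collaborate
3. In what order they should run

Return only JSON with these fields:
- "type": "direct" or "multi_step"
- "steps": a list of objects, each with "description" and "required_capability"
"""

        response = await self.llm.chat([{"role": "user", "content": prompt}])

        try:
            plan = json.loads(response)
            if not isinstance(plan, dict):
                raise ValueError("plan must be a JSON object")
        except (ValueError, TypeError):
            # Fall back to a direct answer when the reply is not a valid plan
            plan = {'type': 'direct', 'reason': 'parsing_failed'}

        plan['user_input'] = user_input  # keep the request for execute_direct
        return plan

    async def execute_direct(self, plan: Dict[str, Any]) -> Dict[str, Any]:
        """Answer without workers (minimal sketch)."""
        answer = await self.llm.chat(
            [{"role": "user", "content": plan.get('user_input', '')}]
        )
        return {'output': answer}

    async def verify(self, result: Dict) -> bool:
        """Accept a result when its confidence is at least 0.7."""
        confidence = result.get('confidence', 1.0)
        return confidence >= 0.7

    async def synthesize(self, results: List[Dict]) -> Dict[str, Any]:
        """Combine worker results (minimal sketch: returns them unmerged)."""
        return {'output': results}

    def _format_workers(self) -> str:
        return "\n".join([
            f"- {name}: {worker.capabilities}"
            for name, worker in self.workers.items()
        ])


class TaskRouter:
    """Task router."""

    def __init__(self):
        self.workers: Dict[str, 'BaseAgent'] = {}
        self.capability_index: Dict[str, List[str]] = defaultdict(list)

    def register_worker(self, worker: 'BaseAgent'):
        self.workers[worker.name] = worker
        for cap in worker.capabilities:
            self.capability_index[cap].append(worker.name)

    def route(self, task: Dict) -> Optional['BaseAgent']:
        """Route a task to a worker that offers the required capability."""
        required_cap = task.get('required_capability')

        if not required_cap:
            return None

        suitable_workers = self.capability_index.get(required_cap, [])

        if not suitable_workers:
            return None

        # Always picks the first registered worker for the capability
        return self.workers[suitable_workers[0]]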
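
Because `route` always returns the first worker registered for a capability, any additional workers with the same capability never receive tasks. A simple alternative is round-robin selection. The sketch below is one possible approach, not the guide's implementation; `RoundRobinRouter` is a hypothetical class that works on worker names so that it stays self-contained.

```python
from collections import defaultdict
from typing import Dict, List, Optional


class RoundRobinRouter:
    """Rotates through the workers registered for each capability (hypothetical)."""

    def __init__(self) -> None:
        self._workers_by_capability: Dict[str, List[str]] = defaultdict(list)
        self._next_index: Dict[str, int] = defaultdict(int)

    def register(self, worker_name: str, capabilities: List[str]) -> None:
        for capability in capabilities:
            self._workers_by_capability[capability].append(worker_name)

    def route(self, required_capability: str) -> Optional[str]:
        """Return the next worker for the capability, or None if nobody offers it."""
        candidates = self._workers_by_capability.get(required_capability)
        if not candidates:
            return None
        # Modulo keeps the index valid even if workers were added since the last call
        index = self._next_index[required_capability] % len(candidates)
        self._next_index[required_capability] = index + 1
        return candidates[index]
```

Round-robin spreads load evenly but ignores how busy each worker is; a least-loaded policy would need the router to track in-flight tasks.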

Pattern 4: Mesh Collaboration

Agents are connected to one another and collaborate directly, without a central coordinator:

graph LR
    A[Agent1] <--> B[Agent2]
    A <--> C[Agent3]
    A <--> D[Agent4]
    B <--> C
    B <--> D
    C <--> D

Use when: several parties need to negotiate and reach a consensus decision.

import asyncio
from collections import defaultdict
from typing import Any, Dict, List, Optional, Set


class MeshCollaboration:
    """Mesh collaboration pattern."""

    def __init__(self, broker: MessageBroker):
        self.broker = broker
        self.agents: Dict[str, 'BaseAgent'] = {}
        self.connections: Dict[str, Set[str]] = defaultdict(set)

    def add_agent(self, agent: 'BaseAgent', connections: Optional[List[str]] = None):
        """Register an agent and its bidirectional connections."""
        self.agents[agent.name] = agent

        if connections:
            for conn in connections:
                self.connections[agent.name].add(conn)
                self.connections[conn].add(agent.name)

    async def execute(self, initiating_agent: str, task: Any) -> Dict:
        """Collect opinions from the initiator's neighbors, then decide."""
        consensus_tasks = [
            self._get_opinion(self.agents[agent_name], task)
            for agent_name in self.connections[initiating_agent]
            if agent_name in self.agents  # skip connections to unregistered agents
        ]

        # Collect opinions in parallel
        opinions = await asyncio.gather(*consensus_tasks, return_exceptions=True)

        # Consensus decision; reach_consensus is assumed to be implemented
        # by the initiating agent
        initiator = self.agents[initiating_agent]
        return await initiator.reach_consensus(opinions)

    async def _get_opinion(self, agent: 'BaseAgent', task: Any) -> Dict:
        """Ask one agent for its opinion."""
        try:
            result = await agent.process(task)
            return {
                'agent': agent.name,
                'opinion': result.get('opinion'),
                'confidence': result.get('confidence', 0.5)
            }
        except Exception as e:
            # A failed agent contributes an error entry with zero confidence
            return {
                'agent': agent.name,
                'error': str(e),
                'confidence': 0
            }
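
The mesh pattern hands the collected opinions to `reach_consensus`, which the guide does not define. One plausible rule, sketched here as an assumption rather than the author's method, is a confidence-weighted vote: sum the confidence behind each distinct opinion and accept the leader only if it holds more than a given share of the total. The function name `weighted_consensus` and the `min_share` threshold are hypothetical.

```python
from collections import defaultdict
from typing import Any, Dict, List


def weighted_consensus(
    opinions: List[Any],
    min_share: float = 0.5,
) -> Dict[str, Any]:
    """Pick the opinion with the largest total confidence.

    Non-dict entries (for example exceptions returned by gather), entries with
    an "error" key, and entries without an opinion are ignored. Consensus is
    reached only if the winner holds more than `min_share` of the total weight.
    """
    weights: Dict[str, float] = defaultdict(float)
    for entry in opinions:
        if not isinstance(entry, dict) or "error" in entry:
            continue
        if entry.get("opinion") is None:
            continue
        weights[str(entry["opinion"])] += float(entry.get("confidence", 0.5))

    total = sum(weights.values())
    if total == 0:
        return {"decision": None, "share": 0.0, "reached": False}

    decision, weight = max(weights.items(), key=lambda item: item[1])
    share = weight / total
    return {"decision": decision, "share": share, "reached": share > min_share}
```

When `reached` is false, the initiator could ask for another round of opinions or escalate to a human, depending on how costly a wrong decision is.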

Shared Knowledge Base Design

Knowledge Base Architecture

import asyncio
import hashlib
from datetime import datetime
from typing import Any, AsyncIterator, Dict, List, Optional

# VectorStore, GraphStore, MetadataStore, InMemoryMetadataStore and
# VersionControl are assumed to be defined elsewhere.


class SharedKnowledgeBase:
    """Knowledge base shared by all agents."""

    def __init__(
        self,
        vector_store: 'VectorStore',
        graph_store: Optional['GraphStore'] = None,
        metadata_store: Optional['MetadataStore'] = None
    ):
        self.vector_store = vector_store
        self.graph_store = graph_store
        self.metadata_store = metadata_store or InMemoryMetadataStore()
        self.version_control = VersionControl()
        self.locks: Dict[str, asyncio.Lock] = {}

    async def write(
        self,
        agent_id: str,
        content: str,
        knowledge_type: str,
        metadata: Dict[str, Any]
    ) -> str:
        """Write a knowledge entry and return its ID."""
        # Generate the knowledge ID
        knowledge_id = self._generate_id(
            agent_id, content, datetime.now().isoformat()
        )

        # Check write permission
        if not await self._check_write_permission(agent_id, knowledge_id):
            raise PermissionError(f"Agent {agent_id} lacks write permission")

        # Concurrency control: one lock per knowledge ID. Because the ID
        # includes a timestamp, the lock only matters once entries are updated
        # under an existing ID.
        async with self._get_lock(knowledge_id):
            # Embed the content
            embedding = await self._embed(content)

            # Write to each store in turn. These writes are not atomic: a
            # failure partway through leaves the stores inconsistent.
            await self.vector_store.add(knowledge_id, embedding)

            if self.graph_store:
                await self.graph_store.add_triples(
                    self._extract_entities(content)
                )

            await self.metadata_store.save(knowledge_id, {
                'agent_id': agent_id,
                'type': knowledge_type,
                'metadata': metadata,
                'created_at': datetime.now().isoformat(),
                'version': 1
            })

        return knowledge_id

    async def query(
        self,
        agent_id: str,
        query: str,
        top_k: int = 10,
        filters: Optional[Dict] = None
    ) -> List[Dict]:
        """Query the knowledge base."""
        # Permission check
        if not await self._check_read_permission(agent_id):
            raise PermissionError(f"Agent {agent_id} lacks read permission")

        # Vector search
        query_embedding = await self._embed(query)
        vector_results = await self.vector_store.search(
            query_embedding, top_k
        )

        # Graph expansion
        related = []
        if self.graph_store and vector_results:
            related = await self.graph_store.expand(
                [r['id'] for r in vector_results],
                depth=2
            )

        # Merge the candidates, then filter before truncating so that
        # filtering does not shrink an already truncated list
        candidates = vector_results + related
        if filters:
            candidates = self._apply_filters(candidates, filters)

        return self._rerank(query, candidates, agent_id, top_k)

    async def subscribe(self, agent_id: str) -> AsyncIterator[Any]:
        """Subscribe to knowledge updates (async generator)."""
        pubsub = await self._create_pubsub(agent_id)
        async for update in pubsub.listen():
            yield update

    def _generate_id(self, *parts) -> str:
        """Generate an ID from a SHA-256 hash of the parts."""
        content = "|".join(str(p) for p in parts)
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    async def _embed(self, text: str) -> List[float]:
        """Embed text; a real implementation calls an embedding service."""
        raise NotImplementedError

    def _extract_entities(self, content: str) -> List:
        """Extract entities; a real implementation calls an NER service."""
        raise NotImplementedError

    async def _check_write_permission(
        self,
        agent_id: str,
        knowledge_id: str
    ) -> bool:
        """Check write permission."""
        return True  # simplified

    async def _check_read_permission(self, agent_id: str) -> bool:
        """Check read permission."""
        return True  # simplified

    def _rerank(
        self,
        query: str,
        results: List[Dict],
        agent_id: str,
        top_k: int = 10
    ) -> List[Dict]:
        """Rerank results (placeholder: keeps the order, truncates to top_k)."""
        return results[:top_k]

    def _apply_filters(
        self,
        results: List[Dict],
        filters: Dict
    ) -> List[Dict]:
        """Keep results whose metadata matches every filter."""
        filtered = []
        for r in results:
            meta = r.get('metadata', {})
            if all(meta.get(k) == v for k, v in filters.items()):
                filtered.append(r)
        return filtered

    def _get_lock(self, knowledge_id: str) -> asyncio.Lock:
        """Return the lock for an ID, creating it on first use.

        This is a plain method, not a coroutine, because its return value is
        used directly in `async with`.
        """
        return self.locks.setdefault(knowledge_id, asyncio.Lock())

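Conflict Resolution

Conflict Types

Conflict type | Description | Resolution strategy
Resource conflict | Several agents compete for the same resource | Locking / priority
Decision conflict | Different agents give different recommendations | Voting / arbitration
State conflict | Concurrent modification leaves state inconsistent | Optimistic locking / version control
Knowledge conflict | The knowledge base contains contradictory information | Confidence assessment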
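
Optimistic locking with version control, listed above for state conflicts, can be sketched as a versioned compare-and-set: each writer states which version it read, and the write is rejected if someone else has written in the meantime. `OptimisticStore`, `Versioned`, and `VersionConflict` are hypothetical names for this illustration, and the store is in-memory only.

```python
from dataclasses import dataclass
from typing import Any, Dict


class VersionConflict(Exception):
    """Raised when a write is based on a stale version."""


@dataclass
class Versioned:
    value: Any
    version: int


class OptimisticStore:
    """In-memory store that rejects writes based on a stale version (hypothetical)."""

    def __init__(self) -> None:
        self._items: Dict[str, Versioned] = {}

    def read(self, key: str) -> Versioned:
        """Return a copy of the current value and version (version 0 if absent)."""
        current = self._items.get(key, Versioned(value=None, version=0))
        return Versioned(current.value, current.version)

    def write(self, key: str, value: Any, expected_version: int) -> int:
        """Write only if nobody has written since `expected_version` was read."""
        current_version = self._items[key].version if key in self._items else 0
        if current_version != expected_version:
            raise VersionConflict(
                f"{key}: expected v{expected_version}, found v{current_version}"
            )
        self._items[key] = Versioned(value, current_version + 1)
        return current_version + 1
```

An agent that receives `VersionConflict` would typically re-read the entry, reapply its change on top of the newer value, and try again.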

Conflict Resolution Implementation

from collections import defaultdict
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List


class ConflictResolutionStrategy(Enum):
    """Conflict resolution strategies."""
    PRIORITY = "priority"           # by agent priority
    VOTING = "voting"               # majority vote
    ARBITRATION = "arbitration"     # rule-based arbitration
    MERGE = "merge"                 # merge the results
    TIMESTAMP = "timestamp"         # by timestamp


@dataclass
class Conflict:
    """Description of a conflict."""
    conflict_id: str
    conflict_type: str
    agents: List[str]
    candidates: List[Dict]
    context: Dict


class ConflictResolver:
    """Resolves conflicts between agents."""

    def __init__(self):
        # TIMESTAMP has no resolver registered here, so resolve() falls back
        # to the first candidate for that strategy.
        self.strategies = {
            ConflictResolutionStrategy.PRIORITY: self._priority_resolve,
            ConflictResolutionStrategy.VOTING: self._voting_resolve,
            ConflictResolutionStrategy.ARBITRATION: self._arbitration_resolve,
            ConflictResolutionStrategy.MERGE: self._merge_resolve,
        }
        self.agent_priorities: Dict[str, int] = {}

    def set_priority(self, agent_id: str, priority: int):
        """Set an agent's priority; a larger number wins."""
        self.agent_priorities[agent_id] = priority

    async def resolve(
        self,
        conflict: Conflict,
        strategy: ConflictResolutionStrategy
    ) -> Any:
        """Resolve a conflict with the given strategy."""
        resolver = self.strategies.get(strategy)
        if resolver:
            return await resolver(conflict)
        return conflict.candidates[0] if conflict.candidates else None

    async def _priority_resolve(self, conflict: Conflict) -> Any:
        """Priority strategy: the candidate from the highest-priority agent wins."""
        candidates = conflict.candidates

        # Sort by agent priority, highest first; unknown agents default to 0.
        # Note this is the opposite direction from MessagePriority.
        sorted_candidates = sorted(
            candidates,
            key=lambda x: self.agent_priorities.get(x.get('agent_id'), 0),
            reverse=True
        )

        return sorted_candidates[0]['result'] if sorted_candidates else None

    async def _voting_resolve(self, conflict: Conflict) -> Any:
        """Voting strategy: the most common result wins."""
        votes = defaultdict(int)

        # Count votes; results are compared by their string form
        for candidate in conflict.candidates:
            result_key = str(candidate.get('result'))
            votes[result_key] += 1

        # Return the result with the most votes (first seen wins a tie)
        if votes:
            winner = max(votes.items(), key=lambda x: x[1])
            # Find the matching candidate
            for c in conflict.candidates:
                if str(c.get('result')) == winner[0]:
                    return c['result']

        return None

    async def _arbitration_resolve(self, conflict: Conflict) -> Any:
        """Arbitration strategy: apply predefined rules per conflict type."""
        if not conflict.candidates:
            return None
        if conflict.conflict_type == 'resource':
            # Resource conflict: first come, first served
            return conflict.candidates[0]['result']
        elif conflict.conflict_type == 'decision':
            # Decision conflict: highest confidence wins
            best = max(conflict.candidates, key=lambda x: x.get('confidence', 0))
            return best['result']

        return None

    async def _merge_resolve(self, conflict: Conflict) -> Any:
        """Merge strategy: try to combine all results."""
        # Keep falsy but valid results such as 0 or an empty list
        results = [
            c['result'] for c in conflict.candidates
            if c.get('result') is not None
        ]

        if not results:
            return None

        if isinstance(results[0], dict):
            # Merge dicts; later candidates overwrite earlier ones on key collisions
            merged = {}
            for r in results:
                merged.update(r)
            return merged
        elif isinstance(results[0], list):
            # Merge lists, dropping duplicates while keeping order
            merged = []
            for r in results:
                for item in r:
                    if item not in merged:  # also works for unhashable items
                        merged.append(item)
            return merged
        else:
            # Other types cannot be merged: return the first
            return results[0]
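
The strategy enum lists TIMESTAMP, but no resolver for it appears in the code. A last-writer-wins rule is one plausible reading, sketched below under the assumption that each candidate carries an ISO-8601 `timestamp` field. That field and the function name `timestamp_resolve` are hypothetical; the guide does not prescribe the fields of a candidate.

```python
from datetime import datetime
from typing import Any, Dict, List, Optional


def timestamp_resolve(candidates: List[Dict[str, Any]]) -> Optional[Any]:
    """Return the `result` of the candidate with the latest `timestamp`.

    Candidates without a timestamp are skipped. All timestamps must be either
    timezone-aware or naive; mixing the two cannot be compared.
    """
    dated = [c for c in candidates if c.get("timestamp")]
    if not dated:
        return None
    latest = max(dated, key=lambda c: datetime.fromisoformat(c["timestamp"]))
    return latest.get("result")
```

Last-writer-wins is simple but silently discards the older update, so it suits cases where newer information supersedes older information rather than complementing it.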

完整实战案例

案例:智能研究助手

class ResearchAssistantSystem:
    """研究助手系统"""
    
    def __init__(self, config: Dict):
        # 初始化组件
        self.broker = MessageBroker()
        self.knowledge_base = SharedKnowledgeBase(
            vector_store=self._init_vector_store(config),
            graph_store=GraphStore()
        )
        
        # 初始化Agent
        self.coordinator = CoordinatorAgent(
            name="coordinator",
            model=config['model'],
            broker=self.broker
        )
        self.search_agent = SearchAgent(
            name="search",
            model=config['model'],
            broker=self.broker
        )
        self.analysis_agent = AnalysisAgent(
            name="analysis",
            model=config['model'],
            broker=self.broker
        )
        self.writer_agent = WriterAgent(
            name="writer",
            model=config['model'],
            broker=self.broker
        )
        self.reviewer_agent = ReviewerAgent(
            name="reviewer",
            model=config['model'],
            broker=self.broker
        )
        
        # 注册到协调者
        self.coordinator.register_workers({
            'search': self.search_agent,
            'analysis': self.analysis_agent,
            'writer': self.writer_agent,
            'reviewer': self.reviewer_agent
        })
        
        # 启动消息代理
        self.broker.start()
    
    async def research(self, topic: str) -> str:
        """执行研究任务"""
        # 1. 协调者规划任务
        plan = await self.coordinator.plan(topic)
        
        if plan['type'] == 'simple':
            # 简单查询直接回答
            return await self.coordinator.answer(topic)
        
        # 2. 并行执行搜索与分析
        search_task = self.search_agent.search(plan['search_queries'])
        analysis_task = self.analysis_agent.analyze(plan['analysis_framework'])
        
        search_results, analysis_results = await asyncio.gather(
            search_task, analysis_task
        )
        
        # 3. 存储到知识库
        await self.knowledge_base.write(
            agent_id='search',
            content=json.dumps(search_results),
            knowledge_type='research_data',
            metadata={'topic': topic}
        )
        
        # 4. 写作Agent生成初稿
        draft = await self.writer_agent.write(
            topic=topic,
            search_results=search_results,
            analysis_results=analysis_results
        )
        
        # 5. 审核Agent质量检查
        review = await self.reviewer_agent.review(draft)
        
        # 6. 如需修订,循环处理
        iteration = 0
        max_iterations = 3
        
        while review['needs_revision'] and iteration < max_iterations:
            draft = await self.writer_agent.revise(
                draft, 
                review['feedback']
            )
            review = await self.reviewer_agent.review(draft)
            iteration += 1
        
        return draft['final_report']
    
    def _init_vector_store(self, config: Dict):
        """初始化向量存储"""
        # 根据配置选择合适的向量存储
        if config.get('use_pinecone'):
            return PineconeVectorStore(
                api_key=config['pinecone_api_key'],
                index_name=config['pinecone_index']
            )
        return InMemoryVectorStore()
 
 
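# Usage example
async def main():
    config = {
        'model': 'gpt-4o',
        'use_pinecone': False,
    }
    
    assistant = ResearchAssistantSystem(config)
    
    topic = "Prospects for applying large AI models in healthcare"
    report = await assistant.research(topic)
    
    print(report)


# Run only when executed as a script, not on import
if __name__ == "__main__":
    asyncio.run(main())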
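
The review-and-revise loop in `research` can be tried without an LLM or a message broker. The sketch below is a minimal, self-contained version under stated assumptions: `StubWriter`, `StubReviewer`, `review_loop`, and `run_review_demo` are hypothetical names introduced only for illustration, and the stub reviewer simply requests changes until the draft has been revised twice.

```python
import asyncio
from typing import Any, Dict


class StubWriter:
    """Hypothetical stand-in for the writer agent (no LLM call)."""

    async def write(self, topic: str) -> Dict[str, Any]:
        return {'draft': f"Report on {topic}", 'revisions': 0}

    async def revise(self, draft: Dict[str, Any], feedback: str) -> Dict[str, Any]:
        return {
            'draft': draft['draft'] + f" [revised: {feedback}]",
            'revisions': draft['revisions'] + 1,
        }


class StubReviewer:
    """Hypothetical reviewer that requests changes until two revisions exist."""

    async def review(self, draft: Dict[str, Any]) -> Dict[str, Any]:
        needs_revision = draft['revisions'] < 2
        return {
            'needs_revision': needs_revision,
            'feedback': 'add data' if needs_revision else '',
        }


async def review_loop(writer, reviewer, topic: str, max_iterations: int = 3) -> Dict[str, Any]:
    """Write a draft, then revise it until it passes review or the cap is hit."""
    draft = await writer.write(topic)
    review = await reviewer.review(draft)
    iteration = 0
    while review['needs_revision'] and iteration < max_iterations:
        draft = await writer.revise(draft, review['feedback'])
        review = await reviewer.review(draft)
        iteration += 1
    # 'approved' records whether the loop ended because the review passed
    return {
        'draft': draft['draft'],
        'iterations': iteration,
        'approved': not review['needs_revision'],
    }


def run_review_demo(max_iterations: int = 3) -> Dict[str, Any]:
    return asyncio.run(
        review_loop(StubWriter(), StubReviewer(), "multi-agent systems", max_iterations)
    )
```

Returning an `approved` flag is one possible design choice: it makes visible whether the loop stopped because the review passed or because the iteration cap was reached, which the original `research` method does not distinguish.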

Agent Base Class Implementation

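import asyncio
import json
import logging
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional

# MessageBroker, AgentMessage, MessageType, LLMClient and TaskRouter are
# assumed to be defined in the earlier sections of this guide
logger = logging.getLogger(__name__)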
 
class BaseAgent(ABC):
    """Base class for all agents."""
    
    def __init__(
        self,
        name: str,
        model: str,
        broker: Optional[MessageBroker] = None
    ):
        self.name = name
        self.model = model
        self.broker = broker
        self.capabilities: List[str] = []
        self.llm = LLMClient(model)
        self.memory: List[Dict] = []
    
    @abstractmethod
    async def process(self, input_data: Any) -> Dict[str, Any]:
        """Process the input and return a result."""
        pass
    
    async def send_message(
        self,
        receiver: str,
        action: str,
        params: Dict[str, Any]
    ) -> Optional[AgentMessage]:
        """Send a request to another agent; return None without a broker or on timeout."""
        if not self.broker:
            return None
        
        try:
            response = await self.broker.request(
                sender_id=self.name,
                receiver_id=receiver,
                action=action,
                params=params
            )
            return response
        except (TimeoutError, asyncio.TimeoutError):  # distinct classes before Python 3.11
            logger.warning("Request to %s timed out", receiver)
            return None
    
    async def broadcast(
        self,
        action: str,
        params: Dict[str, Any]
    ):
        """Broadcast a message to all agents."""
        if not self.broker:
            return
        
        message = AgentMessage(
            sender={'agent_id': self.name},
            receiver={},  # empty means broadcast
            message_type=MessageType.BROADCAST,
            content={'action': action, 'params': params}
        )
        await self.broker.publish(message)
    
    def add_to_memory(self, item: Dict):
        """Append an item to memory."""
        self.memory.append(item)
        # Cap memory at 100 items by dropping the oldest entry
        if len(self.memory) > 100:
            self.memory.pop(0)
 
 
class CoordinatorAgent(BaseAgent):
    """Coordinator agent."""
    
    def __init__(self, name: str, model: str, broker: MessageBroker):
        super().__init__(name, model, broker)
        self.workers: Dict[str, BaseAgent] = {}
        self.router = TaskRouter()
    
    def register_workers(self, workers: Dict[str, BaseAgent]):
        """Register workers with the coordinator and the task router."""
        self.workers = workers
        for worker in workers.values():
            self.router.register_worker(worker)
    
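    async def plan(self, user_input: str) -> Dict[str, Any]:
        """Plan the task; fall back to a 'simple' plan if the reply is not a JSON object."""
        # The field list mirrors the keys read by the calling code;
        # the value shapes shown in the prompt are assumptions
        prompt = f"""
Analyze the user request: {user_input}

Available capabilities:
{self._format_capabilities()}

Decide the task type and the collaboration it needs.

Return only a JSON object:
{{
  "type": "simple" or "complex",
  "steps": ["steps to delegate to workers"],
  "search_queries": ["queries for the search agent"],
  "analysis_framework": "framework for the analysis agent"
}}
"""
        
        response = await self.llm.chat([{"role": "user", "content": prompt}])
        
        try:
            plan = json.loads(response)
        except (json.JSONDecodeError, TypeError):
            return {'type': 'simple'}
        # Valid JSON that is not an object (for example a list) is also rejected
        return plan if isinstance(plan, dict) else {'type': 'simple'}
    
    async def process(self, input_data: Any) -> Dict[str, Any]:
        """Handle a coordination task."""
        plan = await self.plan(input_data)
        
        if plan.get('type', 'simple') == 'simple':
            return await self.answer(input_data)
        return await self._delegate_to_workers(plan)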
    
    async def answer(self, question: str) -> Dict[str, Any]:
        """Answer directly without delegating."""
        response = await self.llm.chat([{"role": "user", "content": question}])
        # Fixed placeholder value, not a calibrated score
        return {'answer': response, 'confidence': 1.0}
    
    async def _delegate_to_workers(self, plan: Dict) -> Dict:
        """Delegate each step of the plan to a worker."""
        results = []
        
        for step in plan.get('steps', []):
            worker = self.router.route(step)
            if worker:
                result = await worker.process(step)
                results.append(result)
        
        return await self.synthesize(results)
    
    async def synthesize(self, results: List[Dict]) -> Dict:
        """Merge worker results into a final answer."""
        synthesis_prompt = f"""
Combine the following results:
{json.dumps(results, ensure_ascii=False)}

Produce the final answer.
"""
        
        response = await self.llm.chat([{"role": "user", "content": synthesis_prompt}])
        
        return {
            'answer': response,
            'confidence': 0.8,
            'sources': results
        }
    
    def _format_capabilities(self) -> str:
        return "\n".join([
            f"- {name}: {worker.capabilities}"
            for name, worker in self.workers.items()
        ])
 
 
class SearchAgent(BaseAgent):
    """Search agent."""
    
    def __init__(self, name: str, model: str, broker: MessageBroker):
        super().__init__(name, model, broker)
        self.capabilities = ["web_search", "knowledge_query"]
    
    async def search(self, queries: List[str]) -> Dict[str, Any]:
        """Run each query in turn and collect the results."""
        results = []
        for query in queries:
            # Call the search API
            search_result = await self._perform_search(query)
            results.append(search_result)
        
        return {
            'query_results': results,
            'query_count': len(queries)
        }
    
    async def process(self, input_data: Any) -> Dict[str, Any]:
        """Handle a task given as a dict with 'queries' or as a single query."""
        if isinstance(input_data, dict) and 'queries' in input_data:
            return await self.search(input_data['queries'])
        
        return await self.search([str(input_data)])
    
    async def _perform_search(self, query: str) -> Dict:
        """Perform one search."""
        # Simplified placeholder: always returns an empty result list
        return {'query': query, 'results': []}
 
 
class WriterAgent(BaseAgent):
    """Writer agent."""
    
    def __init__(self, name: str, model: str, broker: MessageBroker):
        super().__init__(name, model, broker)
        self.capabilities = ["writing", "summarization"]
    
    async def write(
        self,
        topic: str,
        search_results: Dict,
        analysis_results: Dict
    ) -> Dict[str, Any]:
        """Generate a report draft."""
        prompt = f"""
Write a research report based on the research and analysis results below:

Topic: {topic}

Search results:
{json.dumps(search_results, ensure_ascii=False)}

Analysis results:
{json.dumps(analysis_results, ensure_ascii=False)}

Requirements:
1. Complete structure with abstract, body, and conclusion
2. Substantive content backed by data
3. Fluent language and clear logic
"""
        
        report = await self.llm.chat([{"role": "user", "content": prompt}])
        
        return {
            'draft': report,
            'confidence': 0.7
        }
    
    async def revise(self, draft: Dict, feedback: str) -> Dict:
        """Revise the report according to reviewer feedback."""
        prompt = f"""
Revise the report according to the feedback:

Original report:
{draft.get('draft')}

Feedback:
{feedback}

Please revise the report according to the feedback.
"""
        
        revised = await self.llm.chat([{"role": "user", "content": prompt}])
        
        return {
            'draft': revised,
            'confidence': 0.8
        }
    
    async def process(self, input_data: Any) -> Dict[str, Any]:
        return await self.write(
            input_data.get('topic'),
            input_data.get('search_results', {}),
            input_data.get('analysis_results', {})
        )
 
 
class ReviewerAgent(BaseAgent):
    """Reviewer agent."""
    
    def __init__(self, name: str, model: str, broker: MessageBroker):
        super().__init__(name, model, broker)
        self.capabilities = ["review", "quality_check"]
    
    async def review(self, draft: Dict) -> Dict[str, Any]:
        """Review a report draft."""
        prompt = f"""
Review the following research report:

{draft.get('draft')}

Check:
1. Whether the content is accurate
2. Whether the structure is sound
3. Whether there are logical gaps
4. Whether it meets the requirements

Return JSON:
{{
  "needs_revision": true/false,
  "feedback": "specific feedback",
  "issues": ["list of issues"]
}}
"""
        
        response = await self.llm.chat([{"role": "user", "content": prompt}])
        
        try:
            result = json.loads(response)
        except (json.JSONDecodeError, TypeError):
            result = None
        
        if not isinstance(result, dict):
            # Unparseable review: no revision is requested, so the draft passes unreviewed
            return {'needs_revision': False, 'feedback': '', 'issues': []}
        
        return {
            'needs_revision': bool(result.get('needs_revision', False)),
            'feedback': result.get('feedback', ''),
            'issues': result.get('issues', [])
        }
    
    async def process(self, input_data: Any) -> Dict[str, Any]:
        return await self.review(input_data)
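
Both `plan` and `review` pass the raw model reply to `json.loads`, which fails when the model wraps the JSON in a Markdown code fence or adds surrounding prose. The following is a hedged sketch of a more tolerant parser. `parse_json_reply` is a hypothetical helper, not part of the classes above, and returning a caller-supplied default on failure is an assumption about the desired behavior.

```python
import json
import re
from typing import Any, Dict, Optional

# Three backticks, built programmatically so this listing stays fence-safe
FENCE = "`" * 3

# Matches a fenced block (optionally tagged "json") and captures its body
_FENCE_RE = re.compile(FENCE + r"(?:json)?\s*(.*?)" + FENCE, re.DOTALL)


def parse_json_reply(reply: str, default: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Best-effort extraction of a JSON object from an LLM reply."""
    candidates = [reply]

    # Second candidate: the body of the first fenced block, if any
    fenced = _FENCE_RE.search(reply)
    if fenced:
        candidates.append(fenced.group(1))

    # Last resort: the text between the first '{' and the last '}'
    start, end = reply.find('{'), reply.rfind('}')
    if 0 <= start < end:
        candidates.append(reply[start:end + 1])

    for text in candidates:
        try:
            parsed = json.loads(text)
        except json.JSONDecodeError:
            continue
        # Only JSON objects are accepted; lists and scalars are skipped
        if isinstance(parsed, dict):
            return parsed

    return dict(default or {})
```

With such a helper, `plan` could call `parse_json_reply(response, {'type': 'simple'})` instead of wrapping `json.loads` in its own `try` block, keeping the fallback in one place.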

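Summary

Key points of multi-agent system design:

Aspect | Key points
Collaboration pattern | Serial, parallel, hierarchical, or mesh, chosen according to the scenario
Communication protocol | Unified message format, asynchronous communication, message queues
Knowledge sharing | Vector store plus graph store, access control, version management
Conflict resolution | Priority, voting, arbitration, or merging
Fault tolerance | Timeout handling, retry mechanisms, fallback plans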
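
The fault-tolerance row (timeout handling, retries, fallback) can be sketched as a small wrapper around any agent call. This is a minimal illustration under stated assumptions, not the guide's own implementation: `call_with_retry`, `FlakyAgent`, and `run_retry_demo` are hypothetical names, and treating only timeouts and `ConnectionError` as retryable is an assumption.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def call_with_retry(
    make_call: Callable[[], Awaitable[Any]],
    fallback: Any,
    timeout: float = 5.0,
    retries: int = 2,
    base_delay: float = 0.1,
) -> Any:
    """Await make_call() with a timeout, retry on failure, then return fallback."""
    for attempt in range(retries + 1):
        try:
            return await asyncio.wait_for(make_call(), timeout=timeout)
        except (asyncio.TimeoutError, ConnectionError):
            if attempt < retries:
                # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
                await asyncio.sleep(base_delay * (2 ** attempt))
    # All attempts failed: degrade to the fallback value
    return fallback


class FlakyAgent:
    """Hypothetical agent whose first `failures` calls raise ConnectionError."""

    def __init__(self, failures: int):
        self.failures = failures
        self.calls = 0

    async def process(self, task: str) -> dict:
        self.calls += 1
        if self.calls <= self.failures:
            raise ConnectionError("agent unavailable")
        return {'result': f"done: {task}"}


def run_retry_demo(failures: int) -> Any:
    agent = FlakyAgent(failures)
    return asyncio.run(call_with_retry(
        lambda: agent.process("search"),
        fallback={'result': None, 'degraded': True},
        base_delay=0.01,
    ))
```

Passing a zero-argument callable rather than a coroutine object matters here: a coroutine can be awaited only once, so each retry needs a fresh one.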

This document was generated by the 归愚 knowledge system. Last updated: 2026-04-24