From fc56e1b62489b87a4b374aefc8f04650b40792dc Mon Sep 17 00:00:00 2001 From: Mark Date: Wed, 7 Jan 2026 17:48:17 +0800 Subject: [PATCH] [modify] multi agent orchestrator --- api/app/models/multi_agent_model.py | 11 +- api/app/schemas/multi_agent_schema.py | 11 +- api/app/services/app_service.py | 7 +- .../services/collaborative_orchestrator.py | 2 +- .../multi_agent_handoffs_integration.py | 91 +++--- api/app/services/multi_agent_orchestrator.py | 275 ++++++++++++++---- 6 files changed, 272 insertions(+), 125 deletions(-) diff --git a/api/app/models/multi_agent_model.py b/api/app/models/multi_agent_model.py index 2b41d9ee..07134d27 100644 --- a/api/app/models/multi_agent_model.py +++ b/api/app/models/multi_agent_model.py @@ -13,10 +13,9 @@ from app.schemas import ModelParameters class OrchestrationMode(StrEnum): - """图标类型枚举""" - SEQUENTIAL = "sequential" - PARALLEL = "parallel" - CONDITIONAL = "conditional" + """协作模式枚举""" + COLLABORATION = "collaboration" # 协作模式:Agent 之间可以相互 handoff + SUPERVISOR = "supervisor" # 监督模式:由主 Agent 统一调度子 Agent class AggregationStrategy(StrEnum): """图标类型枚举""" @@ -66,8 +65,8 @@ class MultiAgentConfig(Base): orchestration_mode = Column( String(20), nullable=False, - default="conditional", - comment="协作模式: sequential|parallel|conditional|loop" + default="collaboration", + comment="协作模式: collaboration(协作)| supervisor(监督)" ) # 子 Agent 列表 diff --git a/api/app/schemas/multi_agent_schema.py b/api/app/schemas/multi_agent_schema.py index c666a2c0..ca9d97b1 100644 --- a/api/app/schemas/multi_agent_schema.py +++ b/api/app/schemas/multi_agent_schema.py @@ -66,9 +66,9 @@ class MultiAgentConfigCreate(BaseModel): master_agent_id: uuid.UUID = Field(..., description="主 Agent ID") master_agent_name: Optional[str] = Field(None, max_length=100, description="主 Agent 名称") orchestration_mode: str = Field( - ..., - pattern="^(sequential|parallel|conditional|loop)$", - description="编排模式:sequential|parallel|conditional|loop" + default="collaboration", + pattern="^(collaboration|supervisor)$", + description="协作模式:collaboration(协作)| supervisor(监督)" ) sub_agents: List[SubAgentConfig] = Field(..., description="子 Agent 列表") routing_rules: Optional[List[RoutingRule]] = Field(None, description="路由规则") @@ -90,8 +90,9 @@ class MultiAgentConfigUpdate(BaseModel): description="模型参数配置(temperature、max_tokens 等)" ) orchestration_mode: Optional[str] = Field( - None, - pattern="^(sequential|parallel|conditional|loop)$" + default="collaboration", + pattern="^(collaboration|supervisor)$", + description="协作模式:collaboration(协作)| supervisor(监督)" ) sub_agents: Optional[List[SubAgentConfig]] = None routing_rules: Optional[List[RoutingRule]] = None diff --git a/api/app/services/app_service.py b/api/app/services/app_service.py index e15f68fe..046bb9a2 100644 --- a/api/app/services/app_service.py +++ b/api/app/services/app_service.py @@ -275,12 +275,7 @@ class AppService: ) logger.info( - "多智能体配置检查通过", - extra={ - "app_id": str(app_id), - "master_agent_id": str(multi_agent_config.master_agent_id), - "sub_agent_count": len(multi_agent_config.sub_agents) - } + "多智能体配置检查通过" ) def _create_agent_config( diff --git a/api/app/services/collaborative_orchestrator.py b/api/app/services/collaborative_orchestrator.py index bfb54f65..f01b7e01 100644 --- a/api/app/services/collaborative_orchestrator.py +++ b/api/app/services/collaborative_orchestrator.py @@ -537,7 +537,7 @@ class CollaborativeOrchestrator: }) # 提取 usage - if hasattr(response, 'usage_metadata'): + if hasattr(response, 'usage_metadata') and response.usage_metadata: result["usage"] = { "prompt_tokens": response.usage_metadata.get("input_tokens", 0), "completion_tokens": response.usage_metadata.get("output_tokens", 0), diff --git a/api/app/services/multi_agent_handoffs_integration.py b/api/app/services/multi_agent_handoffs_integration.py index d35252e8..38fd681a 100644 --- a/api/app/services/multi_agent_handoffs_integration.py +++ b/api/app/services/multi_agent_handoffs_integration.py @@ -13,16 +13,17 @@ from app.schemas.multi_agent_schema import MultiAgentRunRequest from app.core.logging_config import get_business_logger from app.core.exceptions import BusinessException from app.core.error_codes import BizCode +from app.services.multi_agent_service import MultiAgentService logger = get_business_logger() class MultiAgentHandoffsService: """Multi-Agent Handoffs 服务 - 扩展现有的 Multi-Agent Service""" - - def __init__(self, db: Session, multi_agent_service): + + def __init__(self, db: Session, multi_agent_service:MultiAgentService): """初始化服务 - + Args: db: 数据库会话 multi_agent_service: 现有的 MultiAgentService 实例 @@ -30,25 +31,25 @@ class MultiAgentHandoffsService: self.db = db self.multi_agent_service = multi_agent_service self.handoff_manager = get_handoff_manager() - + logger.info("Multi-Agent Handoffs 服务初始化完成") - + async def run_with_handoffs( self, app_id: uuid.UUID, request: MultiAgentRunRequest ) -> Dict[str, Any]: """运行支持 handoffs 的多 Agent 任务 - + Args: app_id: 应用 ID request: 运行请求 - + Returns: 执行结果 """ start_time = time.time() - + try: # 1. 获取配置 config = self.multi_agent_service.get_config(app_id) @@ -57,23 +58,25 @@ class MultiAgentHandoffsService: "多 Agent 配置不存在", BizCode.RESOURCE_NOT_FOUND ) - + # 2. 检查是否启用 handoffs execution_config = config.execution_config or {} + print("="*50) + print(execution_config) enable_handoffs = execution_config.get("enable_handoffs", False) - + if not enable_handoffs: # 降级到普通模式 logger.info("Handoffs 未启用,使用普通模式") return await self.multi_agent_service.run(app_id, request) - + # 3. 创建协作编排器 orchestrator = CollaborativeOrchestrator( db=self.db, config=config, handoff_manager=self.handoff_manager ) - + # 4. 执行协作 result = await orchestrator.execute_with_handoffs( message=request.message, @@ -81,11 +84,11 @@ class MultiAgentHandoffsService: user_id=request.user_id, variables=request.variables ) - + # 5. 增强结果 result["mode"] = "handoffs" result["elapsed_time"] = time.time() - start_time - + logger.info( "Handoffs 执行完成", extra={ @@ -95,27 +98,27 @@ class MultiAgentHandoffsService: "elapsed_time": result["elapsed_time"] } ) - + return result - + except Exception as e: logger.error(f"Handoffs 执行失败: {str(e)}") - + # 降级到普通模式 logger.info("降级到普通模式") return await self.multi_agent_service.run(app_id, request) - + async def run_stream_with_handoffs( self, app_id: uuid.UUID, request: MultiAgentRunRequest ) -> AsyncGenerator[str, None]: """流式运行支持 handoffs 的多 Agent 任务 - + Args: app_id: 应用 ID request: 运行请求 - + Yields: SSE 格式的事件流 """ @@ -125,24 +128,24 @@ class MultiAgentHandoffsService: if not config: yield f"data: {{\"event\": \"error\", \"error\": \"配置不存在\"}}\n\n" return - + # 2. 检查是否启用 handoffs execution_config = config.execution_config or {} enable_handoffs = execution_config.get("enable_handoffs", False) - + if not enable_handoffs: # 降级到普通流式模式 async for event in self.multi_agent_service.run_stream(app_id, request): yield event return - + # 3. 创建协作编排器 orchestrator = CollaborativeOrchestrator( db=self.db, config=config, handoff_manager=self.handoff_manager ) - + # 4. 流式执行 async for event in orchestrator.execute_stream_with_handoffs( message=request.message, @@ -151,27 +154,27 @@ class MultiAgentHandoffsService: variables=request.variables ): yield event - + except Exception as e: logger.error(f"流式 Handoffs 执行失败: {str(e)}") yield f"data: {{\"event\": \"error\", \"error\": \"{str(e)}\"}}\n\n" - + def get_handoff_history( self, conversation_id: str ) -> Optional[Dict[str, Any]]: """获取会话的 handoff 历史 - + Args: conversation_id: 会话 ID - + Returns: Handoff 历史信息 """ state = self.handoff_manager.get_state(conversation_id) if not state: return None - + return { "conversation_id": state.conversation_id, "current_agent_id": state.current_agent_id, @@ -190,27 +193,27 @@ class MultiAgentHandoffsService: "created_at": state.created_at.isoformat(), "updated_at": state.updated_at.isoformat() } - + def clear_handoff_state(self, conversation_id: str): """清除会话的 handoff 状态 - + Args: conversation_id: 会话 ID """ self.handoff_manager.clear_state(conversation_id) logger.info(f"清除 handoff 状态: {conversation_id}") - + async def test_handoff_routing( self, app_id: uuid.UUID, message: str ) -> Dict[str, Any]: """测试 handoff 路由决策(不实际执行) - + Args: app_id: 应用 ID message: 测试消息 - + Returns: 路由决策结果 """ @@ -221,7 +224,7 @@ class MultiAgentHandoffsService: "多 Agent 配置不存在", BizCode.RESOURCE_NOT_FOUND ) - + # 2. 解析 sub agents sub_agents = {} for agent_data in config.sub_agents: @@ -230,37 +233,37 @@ class MultiAgentHandoffsService: sub_agents[str(agent_id)] = { "info": agent_data } - + # 3. 测试路由 test_conversation_id = f"test-{uuid.uuid4()}" - + # 选择初始 Agent initial_agent_id = None message_lower = message.lower() - + for agent_id, agent_data in sub_agents.items(): agent_info = agent_data.get("info", {}) capabilities = agent_info.get("capabilities", []) role = agent_info.get("role", "") - + keywords = capabilities + ([role] if role else []) for keyword in keywords: if keyword.lower() in message_lower: initial_agent_id = agent_id break - + if initial_agent_id: break - + if not initial_agent_id: initial_agent_id = next(iter(sub_agents.keys())) - + # 4. 生成 handoff 工具 handoff_tools = self.handoff_manager.generate_handoff_tools( initial_agent_id, sub_agents ) - + # 5. 检查是否需要 handoff handoff_suggestion = self.handoff_manager.should_handoff( conversation_id=test_conversation_id, @@ -268,7 +271,7 @@ class MultiAgentHandoffsService: message=message, available_agents=sub_agents ) - + return { "message": message, "initial_agent_id": initial_agent_id, diff --git a/api/app/services/multi_agent_orchestrator.py b/api/app/services/multi_agent_orchestrator.py index fd3ce229..1369e2f4 100644 --- a/api/app/services/multi_agent_orchestrator.py +++ b/api/app/services/multi_agent_orchestrator.py @@ -31,6 +31,10 @@ class MultiAgentOrchestrator: self.config = config self.registry = AgentRegistry(db) + # 兼容处理:旧的 orchestration_mode 值映射到新值 + # collaboration | supervisor 是新值,其他旧值默认使用 supervisor + self._normalized_mode = self._normalize_orchestration_mode(config.orchestration_mode) + # 加载主 Agent # self.master_agent = self._load_agent(config.master_agent_id) # self. config.d @@ -73,10 +77,25 @@ class MultiAgentOrchestrator: extra={ "config_id": str(config.id), "model": self.master_model_config.name, - "sub_agent_count": len(self.sub_agents) + "sub_agent_count": len(self.sub_agents), + "orchestration_mode": self._normalized_mode } ) + def _normalize_orchestration_mode(self, mode: str) -> str: + """标准化 orchestration_mode,兼容旧值 + + Args: + mode: 原始的 orchestration_mode 值 + + Returns: + 标准化后的模式:collaboration 或 supervisor + """ + if mode in [OrchestrationMode.SUPERVISOR, "supervisor"]: + return OrchestrationMode.SUPERVISOR + # 其他所有值(包括旧的 sequential、parallel、conditional、loop 和 collaboration)都映射到 collaboration + return OrchestrationMode.COLLABORATION + async def execute_stream( self, message: str, @@ -108,7 +127,7 @@ class MultiAgentOrchestrator: logger.info( "开始执行多 Agent 任务(流式)", extra={ - "mode": self.config.orchestration_mode, + "mode": self._normalized_mode, "message_length": len(message) } ) @@ -116,7 +135,7 @@ class MultiAgentOrchestrator: try: # 发送开始事件 yield self._format_sse_event("start", { - "mode": self.config.orchestration_mode, + "mode": self._normalized_mode, "timestamp": time.time() }) @@ -125,7 +144,8 @@ class MultiAgentOrchestrator: task_analysis["use_llm_routing"] = use_llm_routing # 2. 根据模式执行(流式) - if self.config.orchestration_mode == OrchestrationMode.CONDITIONAL: + # Supervisor 模式:由主 Agent 统一调度子 Agent + if self._normalized_mode == OrchestrationMode.SUPERVISOR: async for event in self._execute_conditional_stream( task_analysis, conversation_id, @@ -136,63 +156,23 @@ class MultiAgentOrchestrator: user_rag_memory_id ): yield event + # Collaboration 模式:Agent 之间可以相互 handoff(使用 handoffs_service) + elif self._normalized_mode == OrchestrationMode.COLLABORATION: + async for event in self._execute_collaboration_mode_stream( + task_analysis, + conversation_id, + user_id, + web_search, + memory, + storage_type, + user_rag_memory_id + ): + yield event else: - # 其他模式暂时使用非流式执行,然后一次性返回 - if self.config.orchestration_mode == OrchestrationMode.SEQUENTIAL: - results = await self._execute_sequential( - task_analysis, - conversation_id, - user_id, - web_search, - memory, - storage_type, - user_rag_memory_id - ) - elif self.config.orchestration_mode == OrchestrationMode.PARALLEL: - results = await self._execute_parallel( - task_analysis, - conversation_id, - user_id, - web_search, - memory, - storage_type, - user_rag_memory_id - ) - # elif self.config.orchestration_mode == "loop": - # results = await self._execute_loop( - # task_analysis, - # conversation_id, - # user_id, - # web_search, - # memory, - # storage_type, - # user_rag_memory_id - # ) - else: - raise BusinessException( - f"不支持的编排模式: {self.config.orchestration_mode}", - BizCode.INVALID_PARAMETER - ) - - # 整合结果 - final_result = await self._aggregate_results(results) - - # 提取会话 ID - sub_conversation_id = None - if isinstance(results, dict): - sub_conversation_id = results.get("conversation_id") or results.get("result", {}).get("conversation_id") - elif isinstance(results, list) and results: - for item in results: - if "result" in item: - sub_conversation_id = item["result"].get("conversation_id") - if sub_conversation_id: - break - - # 发送消息事件 - yield self._format_sse_event("message", { - "content": final_result, - "conversation_id": sub_conversation_id - }) + raise BusinessException( + f"不支持的编排模式: {self._normalized_mode}", + BizCode.INVALID_PARAMETER + ) elapsed_time = time.time() - start_time @@ -205,7 +185,7 @@ class MultiAgentOrchestrator: logger.info( "多 Agent 任务完成(流式)", extra={ - "mode": self.config.orchestration_mode, + "mode": self._normalized_mode, "elapsed_time": elapsed_time } ) @@ -213,7 +193,7 @@ class MultiAgentOrchestrator: except Exception as e: logger.error( "多 Agent 任务执行失败(流式)", - extra={"error": str(e), "mode": self.config.orchestration_mode} + extra={"error": str(e), "mode": self._normalized_mode} ) # 发送错误事件 yield self._format_sse_event("error", { @@ -247,10 +227,23 @@ class MultiAgentOrchestrator: logger.info( "开始执行多 Agent 任务", - extra={"message_length": len(message)} + extra={ + "message_length": len(message), + "mode": self._normalized_mode + } ) try: + # Collaboration 模式:使用 handoffs_service + if self._normalized_mode == OrchestrationMode.COLLABORATION: + return await self._execute_collaboration_mode( + message, + conversation_id, + user_id, + variables + ) + + # Supervisor 模式:由 Master Agent 统一调度 # 1. Master Agent 分析任务并做出决策 task_analysis = await self._analyze_task(message, variables) @@ -1411,6 +1404,162 @@ class MultiAgentOrchestrator: return self._merge_results(results) + async def _execute_collaboration_mode_stream( + self, + task_analysis: Dict[str, Any], + conversation_id: Optional[uuid.UUID], + user_id: Optional[str], + web_search: bool = False, + memory: bool = True, + storage_type: str = '', + user_rag_memory_id: str = '' + ): + """Collaboration 模式流式执行 - Agent 之间可以相互 handoff + + 使用 handoffs_service 实现 Agent 之间的动态切换 + + Args: + task_analysis: 任务分析结果 + conversation_id: 会话 ID + user_id: 用户 ID + web_search: 是否启用网络搜索 + memory: 是否启用记忆 + storage_type: 存储类型 + user_rag_memory_id: RAG 记忆 ID + + Yields: + SSE 格式的事件流 + """ + from app.services.handoffs_service import ( + convert_multi_agent_config_to_handoffs, + HandoffsService + ) + + message = task_analysis.get("message", "") + + try: + # 1. 构建 multi_agent_config 字典 + multi_agent_config = { + "sub_agents": self.config.sub_agents, + "default_model_config_id": self.config.default_model_config_id, + "model_parameters": self.config.model_parameters, + "orchestration_mode": self.config.orchestration_mode + } + + # 2. 转换配置 + agent_configs, model_config = convert_multi_agent_config_to_handoffs( + multi_agent_config, + self.db + ) + + if not agent_configs: + raise BusinessException("没有可用的子 Agent", BizCode.AGENT_CONFIG_MISSING) + + if not model_config: + raise BusinessException("没有配置模型", BizCode.AGENT_CONFIG_MISSING) + + # 3. 创建 HandoffsService + handoffs_service = HandoffsService( + agent_configs=agent_configs, + model_config=model_config, + streaming=True + ) + + # 4. 使用 handoffs_service 的流式聊天 + conv_id = str(conversation_id) if conversation_id else None + + async for event in handoffs_service.chat_stream( + message=message, + conversation_id=conv_id + ): + # handoffs_service 返回的已经是 SSE 格式,直接 yield + yield event + + except Exception as e: + logger.error(f"Collaboration 模式执行失败: {str(e)}", exc_info=True) + yield self._format_sse_event("error", { + "error": str(e), + "timestamp": time.time() + }) + + async def _execute_collaboration_mode( + self, + message: str, + conversation_id: Optional[uuid.UUID], + user_id: Optional[str], + variables: Optional[Dict[str, Any]] = None + ) -> Dict[str, Any]: + """Collaboration 模式非流式执行 - Agent 之间可以相互 handoff + + 使用 handoffs_service 实现 Agent 之间的动态切换 + + Args: + message: 用户消息 + conversation_id: 会话 ID + user_id: 用户 ID + variables: 变量参数 + + Returns: + 执行结果 + """ + from app.services.handoffs_service import ( + convert_multi_agent_config_to_handoffs, + HandoffsService + ) + + start_time = time.time() + + try: + # 1. 构建 multi_agent_config 字典 + multi_agent_config = { + "sub_agents": self.config.sub_agents, + "default_model_config_id": self.config.default_model_config_id, + "model_parameters": self.config.model_parameters, + "orchestration_mode": self.config.orchestration_mode + } + + # 2. 转换配置 + agent_configs, model_config = convert_multi_agent_config_to_handoffs( + multi_agent_config, + self.db + ) + + if not agent_configs: + raise BusinessException("没有可用的子 Agent", BizCode.AGENT_CONFIG_MISSING) + + if not model_config: + raise BusinessException("没有配置模型", BizCode.AGENT_CONFIG_MISSING) + + # 3. 创建 HandoffsService + handoffs_service = HandoffsService( + agent_configs=agent_configs, + model_config=model_config, + streaming=False + ) + + # 4. 使用 handoffs_service 的非流式聊天 + conv_id = str(conversation_id) if conversation_id else None + + result = await handoffs_service.chat( + message=message, + conversation_id=conv_id + ) + + elapsed_time = time.time() - start_time + + return { + "message": result.get("response", ""), + "conversation_id": result.get("conversation_id"), + "elapsed_time": elapsed_time, + "strategy": "collaboration", + "active_agent": result.get("active_agent"), + "sub_results": result + } + + except Exception as e: + logger.error(f"Collaboration 模式执行失败: {str(e)}", exc_info=True) + raise + def _format_sse_event(self, event: str, data: Dict[str, Any]) -> str: """格式化 SSE 事件