[modify] multi agent orchestrator
This commit is contained in:
@@ -31,6 +31,10 @@ class MultiAgentOrchestrator:
|
||||
self.config = config
|
||||
self.registry = AgentRegistry(db)
|
||||
|
||||
# 兼容处理:旧的 orchestration_mode 值映射到新值
|
||||
# collaboration | supervisor 是新值,其他旧值默认使用 supervisor
|
||||
self._normalized_mode = self._normalize_orchestration_mode(config.orchestration_mode)
|
||||
|
||||
# 加载主 Agent
|
||||
# self.master_agent = self._load_agent(config.master_agent_id)
|
||||
# self. config.d
|
||||
@@ -73,10 +77,25 @@ class MultiAgentOrchestrator:
|
||||
extra={
|
||||
"config_id": str(config.id),
|
||||
"model": self.master_model_config.name,
|
||||
"sub_agent_count": len(self.sub_agents)
|
||||
"sub_agent_count": len(self.sub_agents),
|
||||
"orchestration_mode": self._normalized_mode
|
||||
}
|
||||
)
|
||||
|
||||
def _normalize_orchestration_mode(self, mode: str) -> str:
|
||||
"""标准化 orchestration_mode,兼容旧值
|
||||
|
||||
Args:
|
||||
mode: 原始的 orchestration_mode 值
|
||||
|
||||
Returns:
|
||||
标准化后的模式:collaboration 或 supervisor
|
||||
"""
|
||||
if mode in [OrchestrationMode.SUPERVISOR, "supervisor"]:
|
||||
return OrchestrationMode.SUPERVISOR
|
||||
# 其他所有值(包括旧的 sequential、parallel、conditional、loop 和 collaboration)都映射到 collaboration
|
||||
return OrchestrationMode.COLLABORATION
|
||||
|
||||
async def execute_stream(
|
||||
self,
|
||||
message: str,
|
||||
@@ -108,7 +127,7 @@ class MultiAgentOrchestrator:
|
||||
logger.info(
|
||||
"开始执行多 Agent 任务(流式)",
|
||||
extra={
|
||||
"mode": self.config.orchestration_mode,
|
||||
"mode": self._normalized_mode,
|
||||
"message_length": len(message)
|
||||
}
|
||||
)
|
||||
@@ -116,7 +135,7 @@ class MultiAgentOrchestrator:
|
||||
try:
|
||||
# 发送开始事件
|
||||
yield self._format_sse_event("start", {
|
||||
"mode": self.config.orchestration_mode,
|
||||
"mode": self._normalized_mode,
|
||||
"timestamp": time.time()
|
||||
})
|
||||
|
||||
@@ -125,7 +144,8 @@ class MultiAgentOrchestrator:
|
||||
task_analysis["use_llm_routing"] = use_llm_routing
|
||||
|
||||
# 2. 根据模式执行(流式)
|
||||
if self.config.orchestration_mode == OrchestrationMode.CONDITIONAL:
|
||||
# Supervisor 模式:由主 Agent 统一调度子 Agent
|
||||
if self._normalized_mode == OrchestrationMode.SUPERVISOR:
|
||||
async for event in self._execute_conditional_stream(
|
||||
task_analysis,
|
||||
conversation_id,
|
||||
@@ -136,63 +156,23 @@ class MultiAgentOrchestrator:
|
||||
user_rag_memory_id
|
||||
):
|
||||
yield event
|
||||
# Collaboration 模式:Agent 之间可以相互 handoff(使用 handoffs_service)
|
||||
elif self._normalized_mode == OrchestrationMode.COLLABORATION:
|
||||
async for event in self._execute_collaboration_mode_stream(
|
||||
task_analysis,
|
||||
conversation_id,
|
||||
user_id,
|
||||
web_search,
|
||||
memory,
|
||||
storage_type,
|
||||
user_rag_memory_id
|
||||
):
|
||||
yield event
|
||||
else:
|
||||
# 其他模式暂时使用非流式执行,然后一次性返回
|
||||
if self.config.orchestration_mode == OrchestrationMode.SEQUENTIAL:
|
||||
results = await self._execute_sequential(
|
||||
task_analysis,
|
||||
conversation_id,
|
||||
user_id,
|
||||
web_search,
|
||||
memory,
|
||||
storage_type,
|
||||
user_rag_memory_id
|
||||
)
|
||||
elif self.config.orchestration_mode == OrchestrationMode.PARALLEL:
|
||||
results = await self._execute_parallel(
|
||||
task_analysis,
|
||||
conversation_id,
|
||||
user_id,
|
||||
web_search,
|
||||
memory,
|
||||
storage_type,
|
||||
user_rag_memory_id
|
||||
)
|
||||
# elif self.config.orchestration_mode == "loop":
|
||||
# results = await self._execute_loop(
|
||||
# task_analysis,
|
||||
# conversation_id,
|
||||
# user_id,
|
||||
# web_search,
|
||||
# memory,
|
||||
# storage_type,
|
||||
# user_rag_memory_id
|
||||
# )
|
||||
else:
|
||||
raise BusinessException(
|
||||
f"不支持的编排模式: {self.config.orchestration_mode}",
|
||||
BizCode.INVALID_PARAMETER
|
||||
)
|
||||
|
||||
# 整合结果
|
||||
final_result = await self._aggregate_results(results)
|
||||
|
||||
# 提取会话 ID
|
||||
sub_conversation_id = None
|
||||
if isinstance(results, dict):
|
||||
sub_conversation_id = results.get("conversation_id") or results.get("result", {}).get("conversation_id")
|
||||
elif isinstance(results, list) and results:
|
||||
for item in results:
|
||||
if "result" in item:
|
||||
sub_conversation_id = item["result"].get("conversation_id")
|
||||
if sub_conversation_id:
|
||||
break
|
||||
|
||||
# 发送消息事件
|
||||
yield self._format_sse_event("message", {
|
||||
"content": final_result,
|
||||
"conversation_id": sub_conversation_id
|
||||
})
|
||||
raise BusinessException(
|
||||
f"不支持的编排模式: {self._normalized_mode}",
|
||||
BizCode.INVALID_PARAMETER
|
||||
)
|
||||
|
||||
elapsed_time = time.time() - start_time
|
||||
|
||||
@@ -205,7 +185,7 @@ class MultiAgentOrchestrator:
|
||||
logger.info(
|
||||
"多 Agent 任务完成(流式)",
|
||||
extra={
|
||||
"mode": self.config.orchestration_mode,
|
||||
"mode": self._normalized_mode,
|
||||
"elapsed_time": elapsed_time
|
||||
}
|
||||
)
|
||||
@@ -213,7 +193,7 @@ class MultiAgentOrchestrator:
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"多 Agent 任务执行失败(流式)",
|
||||
extra={"error": str(e), "mode": self.config.orchestration_mode}
|
||||
extra={"error": str(e), "mode": self._normalized_mode}
|
||||
)
|
||||
# 发送错误事件
|
||||
yield self._format_sse_event("error", {
|
||||
@@ -247,10 +227,23 @@ class MultiAgentOrchestrator:
|
||||
|
||||
logger.info(
|
||||
"开始执行多 Agent 任务",
|
||||
extra={"message_length": len(message)}
|
||||
extra={
|
||||
"message_length": len(message),
|
||||
"mode": self._normalized_mode
|
||||
}
|
||||
)
|
||||
|
||||
try:
|
||||
# Collaboration 模式:使用 handoffs_service
|
||||
if self._normalized_mode == OrchestrationMode.COLLABORATION:
|
||||
return await self._execute_collaboration_mode(
|
||||
message,
|
||||
conversation_id,
|
||||
user_id,
|
||||
variables
|
||||
)
|
||||
|
||||
# Supervisor 模式:由 Master Agent 统一调度
|
||||
# 1. Master Agent 分析任务并做出决策
|
||||
task_analysis = await self._analyze_task(message, variables)
|
||||
|
||||
@@ -1411,6 +1404,162 @@ class MultiAgentOrchestrator:
|
||||
|
||||
return self._merge_results(results)
|
||||
|
||||
async def _execute_collaboration_mode_stream(
|
||||
self,
|
||||
task_analysis: Dict[str, Any],
|
||||
conversation_id: Optional[uuid.UUID],
|
||||
user_id: Optional[str],
|
||||
web_search: bool = False,
|
||||
memory: bool = True,
|
||||
storage_type: str = '',
|
||||
user_rag_memory_id: str = ''
|
||||
):
|
||||
"""Collaboration 模式流式执行 - Agent 之间可以相互 handoff
|
||||
|
||||
使用 handoffs_service 实现 Agent 之间的动态切换
|
||||
|
||||
Args:
|
||||
task_analysis: 任务分析结果
|
||||
conversation_id: 会话 ID
|
||||
user_id: 用户 ID
|
||||
web_search: 是否启用网络搜索
|
||||
memory: 是否启用记忆
|
||||
storage_type: 存储类型
|
||||
user_rag_memory_id: RAG 记忆 ID
|
||||
|
||||
Yields:
|
||||
SSE 格式的事件流
|
||||
"""
|
||||
from app.services.handoffs_service import (
|
||||
convert_multi_agent_config_to_handoffs,
|
||||
HandoffsService
|
||||
)
|
||||
|
||||
message = task_analysis.get("message", "")
|
||||
|
||||
try:
|
||||
# 1. 构建 multi_agent_config 字典
|
||||
multi_agent_config = {
|
||||
"sub_agents": self.config.sub_agents,
|
||||
"default_model_config_id": self.config.default_model_config_id,
|
||||
"model_parameters": self.config.model_parameters,
|
||||
"orchestration_mode": self.config.orchestration_mode
|
||||
}
|
||||
|
||||
# 2. 转换配置
|
||||
agent_configs, model_config = convert_multi_agent_config_to_handoffs(
|
||||
multi_agent_config,
|
||||
self.db
|
||||
)
|
||||
|
||||
if not agent_configs:
|
||||
raise BusinessException("没有可用的子 Agent", BizCode.AGENT_CONFIG_MISSING)
|
||||
|
||||
if not model_config:
|
||||
raise BusinessException("没有配置模型", BizCode.AGENT_CONFIG_MISSING)
|
||||
|
||||
# 3. 创建 HandoffsService
|
||||
handoffs_service = HandoffsService(
|
||||
agent_configs=agent_configs,
|
||||
model_config=model_config,
|
||||
streaming=True
|
||||
)
|
||||
|
||||
# 4. 使用 handoffs_service 的流式聊天
|
||||
conv_id = str(conversation_id) if conversation_id else None
|
||||
|
||||
async for event in handoffs_service.chat_stream(
|
||||
message=message,
|
||||
conversation_id=conv_id
|
||||
):
|
||||
# handoffs_service 返回的已经是 SSE 格式,直接 yield
|
||||
yield event
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Collaboration 模式执行失败: {str(e)}", exc_info=True)
|
||||
yield self._format_sse_event("error", {
|
||||
"error": str(e),
|
||||
"timestamp": time.time()
|
||||
})
|
||||
|
||||
async def _execute_collaboration_mode(
|
||||
self,
|
||||
message: str,
|
||||
conversation_id: Optional[uuid.UUID],
|
||||
user_id: Optional[str],
|
||||
variables: Optional[Dict[str, Any]] = None
|
||||
) -> Dict[str, Any]:
|
||||
"""Collaboration 模式非流式执行 - Agent 之间可以相互 handoff
|
||||
|
||||
使用 handoffs_service 实现 Agent 之间的动态切换
|
||||
|
||||
Args:
|
||||
message: 用户消息
|
||||
conversation_id: 会话 ID
|
||||
user_id: 用户 ID
|
||||
variables: 变量参数
|
||||
|
||||
Returns:
|
||||
执行结果
|
||||
"""
|
||||
from app.services.handoffs_service import (
|
||||
convert_multi_agent_config_to_handoffs,
|
||||
HandoffsService
|
||||
)
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
try:
|
||||
# 1. 构建 multi_agent_config 字典
|
||||
multi_agent_config = {
|
||||
"sub_agents": self.config.sub_agents,
|
||||
"default_model_config_id": self.config.default_model_config_id,
|
||||
"model_parameters": self.config.model_parameters,
|
||||
"orchestration_mode": self.config.orchestration_mode
|
||||
}
|
||||
|
||||
# 2. 转换配置
|
||||
agent_configs, model_config = convert_multi_agent_config_to_handoffs(
|
||||
multi_agent_config,
|
||||
self.db
|
||||
)
|
||||
|
||||
if not agent_configs:
|
||||
raise BusinessException("没有可用的子 Agent", BizCode.AGENT_CONFIG_MISSING)
|
||||
|
||||
if not model_config:
|
||||
raise BusinessException("没有配置模型", BizCode.AGENT_CONFIG_MISSING)
|
||||
|
||||
# 3. 创建 HandoffsService
|
||||
handoffs_service = HandoffsService(
|
||||
agent_configs=agent_configs,
|
||||
model_config=model_config,
|
||||
streaming=False
|
||||
)
|
||||
|
||||
# 4. 使用 handoffs_service 的非流式聊天
|
||||
conv_id = str(conversation_id) if conversation_id else None
|
||||
|
||||
result = await handoffs_service.chat(
|
||||
message=message,
|
||||
conversation_id=conv_id
|
||||
)
|
||||
|
||||
elapsed_time = time.time() - start_time
|
||||
|
||||
return {
|
||||
"message": result.get("response", ""),
|
||||
"conversation_id": result.get("conversation_id"),
|
||||
"elapsed_time": elapsed_time,
|
||||
"strategy": "collaboration",
|
||||
"active_agent": result.get("active_agent"),
|
||||
"sub_results": result
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Collaboration 模式执行失败: {str(e)}", exc_info=True)
|
||||
raise
|
||||
|
||||
def _format_sse_event(self, event: str, data: Dict[str, Any]) -> str:
|
||||
"""格式化 SSE 事件
|
||||
|
||||
|
||||
Reference in New Issue
Block a user