From 9edfd6c16f64338e726a6a828e732bc7c99ac7d4 Mon Sep 17 00:00:00 2001 From: Mark Date: Sat, 27 Dec 2025 13:57:03 +0800 Subject: [PATCH] [modify] manage multi agent logic --- api/app/controllers/multi_agent_controller.py | 957 +++++++++--------- api/app/schemas/multi_agent_schema.py | 24 +- api/app/services/master_agent_router.py | 194 ++-- api/app/services/multi_agent_orchestrator.py | 883 ++++++++-------- api/app/services/multi_agent_service.py | 517 +++++----- 5 files changed, 1284 insertions(+), 1291 deletions(-) diff --git a/api/app/controllers/multi_agent_controller.py b/api/app/controllers/multi_agent_controller.py index d78aa67b..a4c1d62c 100644 --- a/api/app/controllers/multi_agent_controller.py +++ b/api/app/controllers/multi_agent_controller.py @@ -1,5 +1,7 @@ """多 Agent 控制器""" import uuid +from typing import Annotated + from fastapi import APIRouter, Depends, Query, Path from sqlalchemy.orm import Session @@ -7,9 +9,9 @@ from app.db import get_db from app.dependencies import get_current_user from app.core.response_utils import success from app.core.logging_config import get_business_logger -from app.schemas import multi_agent_schema +from app.schemas import multi_agent_schema, MultiAgentConfigUpdate, MultiAgentConfigSchema from app.schemas.response_schema import PageData, PageMeta -from app.services.multi_agent_service import MultiAgentService +from app.services.multi_agent_service import MultiAgentService, get_multi_agent_service from app.models import User router = APIRouter(prefix="/apps", tags=["Multi-Agent"]) @@ -18,36 +20,35 @@ logger = get_business_logger() # ==================== 多 Agent 配置管理 ==================== -@router.post( - "/{app_id}/multi-agent", - summary="创建多 Agent 配置" -) -def create_multi_agent_config( - app_id: uuid.UUID = Path(..., description="应用 ID"), - data: multi_agent_schema.MultiAgentConfigCreate = ..., - current_user: User = Depends(get_current_user), - db: Session = Depends(get_db), -): - """创建多 Agent 配置 - - 支持四种编排模式: - - sequential: 顺序执行 - - parallel: 并行执行 - - conditional: 条件路由 - - loop: 循环执行 - """ - service = MultiAgentService(db) - config = service.create_config( - app_id=app_id, - data=data, - created_by=current_user.id - ) - - return success( - data=multi_agent_schema.MultiAgentConfigSchema.model_validate(config), - msg="多 Agent 配置创建成功" - ) +# @router.post( +# "/{app_id}/multi-agent", +# summary="创建多 Agent 配置" +# ) +# def create_multi_agent_config( +# app_id: uuid.UUID = Path(..., description="应用 ID"), +# data: multi_agent_schema.MultiAgentConfigCreate = ..., +# current_user: User = Depends(get_current_user), +# db: Session = Depends(get_db), +# ): +# """创建多 Agent 配置 +# 支持四种编排模式: +# - sequential: 顺序执行 +# - parallel: 并行执行 +# - conditional: 条件路由 +# - loop: 循环执行 +# """ +# service = MultiAgentService(db) +# config = service.create_config( +# app_id=app_id, +# data=data, +# created_by=current_user.id +# ) + +# return success( +# data=multi_agent_schema.MultiAgentConfigSchema.model_validate(config), +# msg="多 Agent 配置创建成功" +# ) @router.get( @@ -61,16 +62,16 @@ def get_multi_agent_configs( ): """获取指定应用的最新有效多 Agent 配置,如果不存在则返回默认模板""" service = MultiAgentService(db) - + # 通过 app_id 获取最新有效配置(已转换 agent_id 为 app_id) config = service.get_multi_agent_configs(app_id) - + if not config: # 返回默认模板 default_template = { "app_id": str(app_id), - "master_agent_id": None, - "master_agent_name": None, + "default_model_config_id": None, + "model_parameters": None, "orchestration_mode": "conditional", "sub_agents": [], "routing_rules": [], @@ -86,7 +87,7 @@ def get_multi_agent_configs( data=default_template, msg="该应用暂无配置,返回默认模板" ) - + # config 已经是字典格式,直接返回 return success(data=config) @@ -96,464 +97,464 @@ def get_multi_agent_configs( ) def update_multi_agent_config( app_id: uuid.UUID = Path(..., description="应用 ID"), - data: multi_agent_schema.MultiAgentConfigUpdate = ..., + data: MultiAgentConfigUpdate = ..., current_user: User = Depends(get_current_user), db: Session = Depends(get_db), + multi_agent_service: Annotated[MultiAgentService, Depends(get_multi_agent_service)] = None, ): """更新多 Agent 配置""" - service = MultiAgentService(db) - config = service.update_config(app_id, data) - + config = multi_agent_service.update_config(app_id, data) + print("="*50) + print(config.default_model_config_id) return success( - data=multi_agent_schema.MultiAgentConfigSchema.model_validate(config), + data=MultiAgentConfigSchema.model_validate(config), msg="多 Agent 配置更新成功" ) -@router.delete( - "/{app_id}/multi-agent", - summary="删除多 Agent 配置" -) -def delete_multi_agent_config( - app_id: uuid.UUID = Path(..., description="应用 ID"), - current_user: User = Depends(get_current_user), - db: Session = Depends(get_db), -): - """删除多 Agent 配置""" - service = MultiAgentService(db) - service.delete_config(app_id) - - return success(msg="多 Agent 配置删除成功") +# @router.delete( +# "/{app_id}/multi-agent", +# summary="删除多 Agent 配置" +# ) +# def delete_multi_agent_config( +# app_id: uuid.UUID = Path(..., description="应用 ID"), +# current_user: User = Depends(get_current_user), +# db: Session = Depends(get_db), +# ): +# """删除多 Agent 配置""" +# service = MultiAgentService(db) +# service.delete_config(app_id) + +# return success(msg="多 Agent 配置删除成功") # ==================== 多 Agent 运行 ==================== -@router.post( - "/{app_id}/multi-agent/run", - summary="运行多 Agent 任务" -) -async def run_multi_agent( - app_id: uuid.UUID = Path(..., description="应用 ID"), - request: multi_agent_schema.MultiAgentRunRequest = ..., - current_user: User = Depends(get_current_user), - db: Session = Depends(get_db), -): - """运行多 Agent 任务 - - 根据配置的编排模式执行多个 Agent: - - sequential: 按优先级顺序执行 - - parallel: 并行执行所有 Agent - - conditional: 根据条件选择 Agent - - loop: 循环执行直到满足条件 - """ - service = MultiAgentService(db) - result = await service.run(app_id, request) - - return success( - data=multi_agent_schema.MultiAgentRunResponse(**result), - msg="多 Agent 任务执行成功" - ) +# @router.post( +# "/{app_id}/multi-agent/run", +# summary="运行多 Agent 任务" +# ) +# async def run_multi_agent( +# app_id: uuid.UUID = Path(..., description="应用 ID"), +# request: multi_agent_schema.MultiAgentRunRequest = ..., +# current_user: User = Depends(get_current_user), +# db: Session = Depends(get_db), +# ): +# """运行多 Agent 任务 + +# 根据配置的编排模式执行多个 Agent: +# - sequential: 按优先级顺序执行 +# - parallel: 并行执行所有 Agent +# - conditional: 根据条件选择 Agent +# """ +# service = MultiAgentService(db) +# result = await service.run(app_id, request) + +# return success( +# data=multi_agent_schema.MultiAgentRunResponse(**result), +# msg="多 Agent 任务执行成功" +# ) -# ==================== 智能路由测试 ==================== +# # ==================== 智能路由测试 ==================== -@router.post( - "/{app_id}/multi-agent/test-routing", - summary="测试智能路由(支持 Master Agent 模式)" -) -async def test_routing( - app_id: uuid.UUID = Path(..., description="应用 ID"), - request: multi_agent_schema.RoutingTestRequest = ..., - current_user: User = Depends(get_current_user), - db: Session = Depends(get_db), -): - """测试智能路由功能(重构版 - 支持 Master Agent) - - 支持三种路由模式: - - master_agent: 使用 Master Agent 决策(推荐) - - llm_router: 使用旧 LLM 路由器(向后兼容) - - rule_only: 仅使用规则路由(最快) - - 参数: - - message: 测试消息 - - conversation_id: 会话 ID(可选) - - routing_model_id: 路由模型 ID(可选) - - use_llm: 是否启用 LLM(默认 False) - - keyword_threshold: 关键词置信度阈值(默认 0.8) - - force_new: 是否强制重新路由(默认 False) - """ - from app.services.conversation_state_manager import ConversationStateManager - from app.services.llm_router import LLMRouter - from app.models import ModelConfig - - # 1. 获取多 Agent 配置 - service = MultiAgentService(db) - config = service.get_config(app_id) - - if not config: - return success( - data=None, - msg="应用未配置多 Agent,无法测试路由" - ) - - # 2. 准备子 Agent 信息 - sub_agents = {} - for sub_agent_info in config.sub_agents: - agent_id = sub_agent_info["agent_id"] - sub_agents[agent_id] = { - "name": sub_agent_info.get("name", agent_id), - "role": sub_agent_info.get("role", "") - } - - # 3. 获取路由模型(如果指定) - routing_model = None - if request.routing_model_id: - routing_model = db.get(ModelConfig, request.routing_model_id) - if not routing_model: - return success( - data=None, - msg=f"路由模型不存在: {request.routing_model_id}" - ) - - # 4. 初始化路由器 - state_manager = ConversationStateManager() - router = LLMRouter( - db=db, - state_manager=state_manager, - routing_rules=config.routing_rules or [], - sub_agents=sub_agents, - routing_model_config=routing_model, - use_llm=request.use_llm and routing_model is not None - ) - - # 5. 设置阈值 - if request.keyword_threshold: - router.keyword_high_confidence_threshold = request.keyword_threshold - - # 6. 执行路由 - try: - routing_result = await router.route( - message=request.message, - conversation_id=str(request.conversation_id) if request.conversation_id else None, - force_new=request.force_new - ) - - # 7. 获取 Agent 信息 - agent_id = routing_result["agent_id"] - agent_info = sub_agents.get(agent_id, {}) - - # 8. 构建响应 - response_data = { - "message": request.message, - "routing_result": { - "agent_id": agent_id, - "agent_name": agent_info.get("name", agent_id), - "agent_role": agent_info.get("role", ""), - "confidence": routing_result["confidence"], - "strategy": routing_result["strategy"], - "topic": routing_result["topic"], - "topic_changed": routing_result["topic_changed"], - "reason": routing_result["reason"], - "routing_method": routing_result["routing_method"] - }, - "cmulti-agent/batch-test-routingonfig_info": { - "use_llm": request.use_llm and routing_model is not None, - "routing_model": routing_model.name if routing_model else None, - "keyword_threshold": router.keyword_high_confidence_threshold, - "total_sub_agents": len(sub_agents) - } - } - - return success( - data=response_data, - msg="路由测试成功" - ) - - except Exception as e: - logger.error(f"路由测试失败: {str(e)}") - return success( - data=None, - msg=f"路由测试失败: {str(e)}" - ) +# @router.post( +# "/{app_id}/multi-agent/test-routing", +# summary="测试智能路由(支持 Master Agent 模式)" +# ) +# async def test_routing( +# app_id: uuid.UUID = Path(..., description="应用 ID"), +# request: multi_agent_schema.RoutingTestRequest = ..., +# current_user: User = Depends(get_current_user), +# db: Session = Depends(get_db), +# ): +# """测试智能路由功能(重构版 - 支持 Master Agent) + +# 支持三种路由模式: +# - master_agent: 使用 Master Agent 决策(推荐) +# - llm_router: 使用旧 LLM 路由器(向后兼容) +# - rule_only: 仅使用规则路由(最快) + +# 参数: +# - message: 测试消息 +# - conversation_id: 会话 ID(可选) +# - routing_model_id: 路由模型 ID(可选) +# - use_llm: 是否启用 LLM(默认 False) +# - keyword_threshold: 关键词置信度阈值(默认 0.8) +# - force_new: 是否强制重新路由(默认 False) +# """ +# from app.services.conversation_state_manager import ConversationStateManager +# from app.services.llm_router import LLMRouter +# from app.models import ModelConfig + +# # 1. 获取多 Agent 配置 +# service = MultiAgentService(db) +# config = service.get_config(app_id) + +# if not config: +# return success( +# data=None, +# msg="应用未配置多 Agent,无法测试路由" +# ) + +# # 2. 准备子 Agent 信息 +# sub_agents = {} +# for sub_agent_info in config.sub_agents: +# agent_id = sub_agent_info["agent_id"] +# sub_agents[agent_id] = { +# "name": sub_agent_info.get("name", agent_id), +# "role": sub_agent_info.get("role", "") +# } + +# # 3. 获取路由模型(如果指定) +# routing_model = None +# if request.routing_model_id: +# routing_model = db.get(ModelConfig, request.routing_model_id) +# if not routing_model: +# return success( +# data=None, +# msg=f"路由模型不存在: {request.routing_model_id}" +# ) + +# # 4. 初始化路由器 +# state_manager = ConversationStateManager() +# router = LLMRouter( +# db=db, +# state_manager=state_manager, +# routing_rules=config.routing_rules or [], +# sub_agents=sub_agents, +# routing_model_config=routing_model, +# use_llm=request.use_llm and routing_model is not None +# ) + +# # 5. 设置阈值 +# if request.keyword_threshold: +# router.keyword_high_confidence_threshold = request.keyword_threshold + +# # 6. 执行路由 +# try: +# routing_result = await router.route( +# message=request.message, +# conversation_id=str(request.conversation_id) if request.conversation_id else None, +# force_new=request.force_new +# ) + +# # 7. 获取 Agent 信息 +# agent_id = routing_result["agent_id"] +# agent_info = sub_agents.get(agent_id, {}) + +# # 8. 构建响应 +# response_data = { +# "message": request.message, +# "routing_result": { +# "agent_id": agent_id, +# "agent_name": agent_info.get("name", agent_id), +# "agent_role": agent_info.get("role", ""), +# "confidence": routing_result["confidence"], +# "strategy": routing_result["strategy"], +# "topic": routing_result["topic"], +# "topic_changed": routing_result["topic_changed"], +# "reason": routing_result["reason"], +# "routing_method": routing_result["routing_method"] +# }, +# "cmulti-agent/batch-test-routingonfig_info": { +# "use_llm": request.use_llm and routing_model is not None, +# "routing_model": routing_model.name if routing_model else None, +# "keyword_threshold": router.keyword_high_confidence_threshold, +# "total_sub_agents": len(sub_agents) +# } +# } + +# return success( +# data=response_data, +# msg="路由测试成功" +# ) + +# except Exception as e: +# logger.error(f"路由测试失败: {str(e)}") +# return success( +# data=None, +# msg=f"路由测试失败: {str(e)}" +# ) -@router.post( - "/{app_id}/multi-agent/test-master-agent", - summary="测试 Master Agent 决策" -) -async def test_master_agent( - app_id: uuid.UUID = Path(..., description="应用 ID"), - request: multi_agent_schema.RoutingTestRequest = ..., - current_user: User = Depends(get_current_user), - db: Session = Depends(get_db), -): - """测试 Master Agent 的路由决策能力 - - 这个接口专门用于测试新的 Master Agent 路由器, - 可以看到 Master Agent 的完整决策过程。 - - 返回信息包括: - - 选中的 Agent - - 置信度 - - 决策理由 - - 是否需要协作 - - 路由策略(master_agent / rule_fast_path / fallback) - """ - from app.services.conversation_state_manager import ConversationStateManager - from app.services.master_agent_router import MasterAgentRouter - from app.models import ModelConfig - - # 1. 获取多 Agent 配置 - service = MultiAgentService(db) - config = service.get_config(app_id) - - if not config: - return success( - data=None, - msg="应用未配置多 Agent,无法测试" - ) - - # 2. 加载 Master Agent - from app.models import AppRelease, App - - master_release = db.get(AppRelease, config.master_agent_id) - if not master_release: - return success( - data=None, - msg=f"Master Agent 发布版本不存在: {config.master_agent_id}" - ) - - # 获取应用信息 - app = db.get(App, master_release.app_id) - if not app: - return success( - data=None, - msg=f"应用不存在: {master_release.app_id}" - ) - - # 创建 Master Agent 代理对象 - class AgentConfigProxy: - def __init__(self, release, app, config_data): - self.id = release.id - self.app_id = release.app_id - self.app = app - self.name = release.name - self.description = release.description - self.system_prompt = config_data.get("system_prompt") - self.default_model_config_id = release.default_model_config_id - - config_data = master_release.config or {} - master_agent_config = AgentConfigProxy(master_release, app, config_data) - - # 3. 获取 Master Agent 的模型配置 - master_model_config = db.get(ModelConfig, master_agent_config.default_model_config_id) - if not master_model_config: - return success( - data=None, - msg=f"Master Agent 模型配置不存在: {master_agent_config.default_model_config_id}" - ) - - # 4. 准备子 Agent 信息 - sub_agents = {} - for sub_agent_info in config.sub_agents: - agent_id = sub_agent_info["agent_id"] - - # 加载子 Agent - sub_release = db.get(AppRelease, uuid.UUID(agent_id)) - if sub_release: - sub_app = db.get(App, sub_release.app_id) - sub_config_data = sub_release.config or {} - sub_agent_config = AgentConfigProxy(sub_release, sub_app, sub_config_data) - - sub_agents[agent_id] = { - "config": sub_agent_config, - "info": sub_agent_info - } - - # 5. 初始化 Master Agent 路由器 - state_manager = ConversationStateManager() - router = MasterAgentRouter( - db=db, - master_agent_config=master_agent_config, - master_model_config=master_model_config, - sub_agents=sub_agents, - state_manager=state_manager, - enable_rule_fast_path=True - ) - - # 6. 执行路由决策 - try: - decision = await router.route( - message=request.message, - conversation_id=str(request.conversation_id) if request.conversation_id else None, - variables=None - ) - - # 7. 获取选中的 Agent 信息 - agent_id = decision["selected_agent_id"] - agent_info = sub_agents.get(agent_id, {}).get("info", {}) - - # 8. 构建响应 - response_data = { - "message": request.message, - "master_agent": { - "name": master_agent_config.name, - "model": master_model_config.name - }, - "decision": { - "selected_agent_id": agent_id, - "selected_agent_name": agent_info.get("name", "未知"), - "selected_agent_role": agent_info.get("role", ""), - "confidence": decision["confidence"], - "reasoning": decision.get("reasoning", ""), - "topic": decision.get("topic", ""), - "strategy": decision["strategy"], - "routing_method": decision.get("routing_method", ""), - "need_collaboration": decision.get("need_collaboration", False), - "collaboration_agents": decision.get("collaboration_agents", []) - }, - "config_info": { - "total_sub_agents": len(sub_agents), - "enable_rule_fast_path": True - } - } - - return success( - data=response_data, - msg="Master Agent 决策测试成功" - ) - - except Exception as e: - logger.error(f"Master Agent 决策测试失败: {str(e)}") - return success( - data=None, - msg=f"测试失败: {str(e)}" - ) +# @router.post( +# "/{app_id}/multi-agent/test-master-agent", +# summary="测试 Master Agent 决策" +# ) +# async def test_master_agent( +# app_id: uuid.UUID = Path(..., description="应用 ID"), +# request: multi_agent_schema.RoutingTestRequest = ..., +# current_user: User = Depends(get_current_user), +# db: Session = Depends(get_db), +# ): +# """测试 Master Agent 的路由决策能力 + +# 这个接口专门用于测试新的 Master Agent 路由器, +# 可以看到 Master Agent 的完整决策过程。 + +# 返回信息包括: +# - 选中的 Agent +# - 置信度 +# - 决策理由 +# - 是否需要协作 +# - 路由策略(master_agent / rule_fast_path / fallback) +# """ +# from app.services.conversation_state_manager import ConversationStateManager +# from app.services.master_agent_router import MasterAgentRouter +# from app.models import ModelConfig + +# # 1. 获取多 Agent 配置 +# service = MultiAgentService(db) +# config = service.get_config(app_id) + +# if not config: +# return success( +# data=None, +# msg="应用未配置多 Agent,无法测试" +# ) + +# # 2. 加载 Master Agent +# from app.models import AppRelease, App + +# master_release = db.get(AppRelease, config.master_agent_id) +# if not master_release: +# return success( +# data=None, +# msg=f"Master Agent 发布版本不存在: {config.master_agent_id}" +# ) + +# # 获取应用信息 +# app = db.get(App, master_release.app_id) +# if not app: +# return success( +# data=None, +# msg=f"应用不存在: {master_release.app_id}" +# ) + +# # 创建 Master Agent 代理对象 +# class AgentConfigProxy: +# def __init__(self, release, app, config_data): +# self.id = release.id +# self.app_id = release.app_id +# self.app = app +# self.name = release.name +# self.description = release.description +# self.system_prompt = config_data.get("system_prompt") +# self.default_model_config_id = release.default_model_config_id + +# config_data = master_release.config or {} +# master_agent_config = AgentConfigProxy(master_release, app, config_data) + +# # 3. 获取 Master Agent 的模型配置 +# master_model_config = db.get(ModelConfig, master_agent_config.default_model_config_id) +# if not master_model_config: +# return success( +# data=None, +# msg=f"Master Agent 模型配置不存在: {master_agent_config.default_model_config_id}" +# ) + +# # 4. 准备子 Agent 信息 +# sub_agents = {} +# for sub_agent_info in config.sub_agents: +# agent_id = sub_agent_info["agent_id"] + +# # 加载子 Agent +# sub_release = db.get(AppRelease, uuid.UUID(agent_id)) +# if sub_release: +# sub_app = db.get(App, sub_release.app_id) +# sub_config_data = sub_release.config or {} +# sub_agent_config = AgentConfigProxy(sub_release, sub_app, sub_config_data) + +# sub_agents[agent_id] = { +# "config": sub_agent_config, +# "info": sub_agent_info +# } + +# # 5. 初始化 Master Agent 路由器 +# state_manager = ConversationStateManager() +# router = MasterAgentRouter( +# db=db, +# master_agent_config=master_agent_config, +# master_model_config=master_model_config, +# sub_agents=sub_agents, +# state_manager=state_manager, +# enable_rule_fast_path=True +# ) + +# # 6. 执行路由决策 +# try: +# decision = await router.route( +# message=request.message, +# conversation_id=str(request.conversation_id) if request.conversation_id else None, +# variables=None +# ) + +# # 7. 获取选中的 Agent 信息 +# agent_id = decision["selected_agent_id"] +# agent_info = sub_agents.get(agent_id, {}).get("info", {}) + +# # 8. 构建响应 +# response_data = { +# "message": request.message, +# "master_agent": { +# "name": master_agent_config.name, +# "model": master_model_config.name +# }, +# "decision": { +# "selected_agent_id": agent_id, +# "selected_agent_name": agent_info.get("name", "未知"), +# "selected_agent_role": agent_info.get("role", ""), +# "confidence": decision["confidence"], +# "reasoning": decision.get("reasoning", ""), +# "topic": decision.get("topic", ""), +# "strategy": decision["strategy"], +# "routing_method": decision.get("routing_method", ""), +# "need_collaboration": decision.get("need_collaboration", False), +# "collaboration_agents": decision.get("collaboration_agents", []) +# }, +# "config_info": { +# "total_sub_agents": len(sub_agents), +# "enable_rule_fast_path": True +# } +# } + +# return success( +# data=response_data, +# msg="Master Agent 决策测试成功" +# ) + +# except Exception as e: +# logger.error(f"Master Agent 决策测试失败: {str(e)}") +# return success( +# data=None, +# msg=f"测试失败: {str(e)}" +# ) -@router.post( - "/{app_id}/multi-agent/batch-test-routing", - summary="批量测试智能路由" -) -async def batch_test_routing( - app_id: uuid.UUID = Path(..., description="应用 ID"), - request: multi_agent_schema.BatchRoutingTestRequest = ..., - current_user: User = Depends(get_current_user), - db: Session = Depends(get_db), -): - """批量测试智能路由功能 - - 用于测试多条消息的路由效果,并统计准确率 - - 参数: - - test_cases: 测试用例列表 - - routing_model_id: 路由模型 ID(可选) - - use_llm: 是否启用 LLM - - keyword_threshold: 关键词置信度阈值 - """ - from app.services.conversation_state_manager import ConversationStateManager - from app.services.llm_router import LLMRouter - from app.models import ModelConfig - - # 1. 获取多 Agent 配置 - service = MultiAgentService(db) - config = service.get_config(app_id) - - if not config: - return success( - data=None, - msg="应用未配置多 Agent,无法测试路由" - ) - - # 2. 准备子 Agent 信息 - sub_agents = {} - for sub_agent_info in config.sub_agents: - agent_id = sub_agent_info["agent_id"] - sub_agents[agent_id] = { - "name": sub_agent_info.get("name", agent_id), - "role": sub_agent_info.get("role", "") - } - - # 3. 获取路由模型 - routing_model = None - if request.routing_model_id: - routing_model = db.get(ModelConfig, request.routing_model_id) - - # 4. 初始化路由器 - state_manager = ConversationStateManager() - router = LLMRouter( - db=db, - state_manager=state_manager, - routing_rules=config.routing_rules or [], - sub_agents=sub_agents, - routing_model_config=routing_model, - use_llm=request.use_llm and routing_model is not None - ) - - if request.keyword_threshold: - router.keyword_high_confidence_threshold = request.keyword_threshold - - # 5. 批量测试 - results = [] - correct_count = 0 - total_count = len(request.test_cases) - - for test_case in request.test_cases: - try: - routing_result = await router.route( - message=test_case.message, - conversation_id=str(uuid.uuid4()) # 每个测试用例使用独立会话 - ) - - agent_id = routing_result["agent_id"] - agent_info = sub_agents.get(agent_id, {}) - - # 判断是否正确 - is_correct = None - if test_case.expected_agent_id: - is_correct = (agent_id == str(test_case.expected_agent_id)) - if is_correct: - correct_count += 1 - - results.append({ - "message": test_case.message, - "description": test_case.description, - "routed_agent_id": agent_id, - "routed_agent_name": agent_info.get("name"), - "expected_agent_id": str(test_case.expected_agent_id) if test_case.expected_agent_id else None, - "is_correct": is_correct, - "confidence": routing_result["confidence"], - "routing_method": routing_result["routing_method"], - "strategy": routing_result["strategy"] - }) - - except Exception as e: - logger.error(f"测试用例失败: {test_case.message}, 错误: {str(e)}") - results.append({ - "message": test_case.message, - "description": test_case.description, - "error": str(e) - }) - - # 6. 统计 - accuracy = None - if correct_count > 0: - total_with_expected = sum(1 for r in results if r.get("expected_agent_id")) - if total_with_expected > 0: - accuracy = correct_count / total_with_expected * 100 - - response_data = { - "total_count": total_count, - "correct_count": correct_count, - "accuracy": accuracy, - "results": results, - "config_info": { - "use_llm": request.use_llm and routing_model is not None, - "routing_model": routing_model.name if routing_model else None, - "keyword_threshold": router.keyword_high_confidence_threshold - } - } - - return success( - data=response_data, - msg=f"批量测试完成,准确率: {accuracy:.1f}%" if accuracy else "批量测试完成" - ) +# @router.post( +# "/{app_id}/multi-agent/batch-test-routing", +# summary="批量测试智能路由" +# ) +# async def batch_test_routing( +# app_id: uuid.UUID = Path(..., description="应用 ID"), +# request: multi_agent_schema.BatchRoutingTestRequest = ..., +# current_user: User = Depends(get_current_user), +# db: Session = Depends(get_db), +# ): +# """批量测试智能路由功能 + +# 用于测试多条消息的路由效果,并统计准确率 + +# 参数: +# - test_cases: 测试用例列表 +# - routing_model_id: 路由模型 ID(可选) +# - use_llm: 是否启用 LLM +# - keyword_threshold: 关键词置信度阈值 +# """ +# from app.services.conversation_state_manager import ConversationStateManager +# from app.services.llm_router import LLMRouter +# from app.models import ModelConfig + +# # 1. 获取多 Agent 配置 +# service = MultiAgentService(db) +# config = service.get_config(app_id) + +# if not config: +# return success( +# data=None, +# msg="应用未配置多 Agent,无法测试路由" +# ) + +# # 2. 准备子 Agent 信息 +# sub_agents = {} +# for sub_agent_info in config.sub_agents: +# agent_id = sub_agent_info["agent_id"] +# sub_agents[agent_id] = { +# "name": sub_agent_info.get("name", agent_id), +# "role": sub_agent_info.get("role", "") +# } + +# # 3. 获取路由模型 +# routing_model = None +# if request.routing_model_id: +# routing_model = db.get(ModelConfig, request.routing_model_id) + +# # 4. 初始化路由器 +# state_manager = ConversationStateManager() +# router = LLMRouter( +# db=db, +# state_manager=state_manager, +# routing_rules=config.routing_rules or [], +# sub_agents=sub_agents, +# routing_model_config=routing_model, +# use_llm=request.use_llm and routing_model is not None +# ) + +# if request.keyword_threshold: +# router.keyword_high_confidence_threshold = request.keyword_threshold + +# # 5. 批量测试 +# results = [] +# correct_count = 0 +# total_count = len(request.test_cases) + +# for test_case in request.test_cases: +# try: +# routing_result = await router.route( +# message=test_case.message, +# conversation_id=str(uuid.uuid4()) # 每个测试用例使用独立会话 +# ) + +# agent_id = routing_result["agent_id"] +# agent_info = sub_agents.get(agent_id, {}) + +# # 判断是否正确 +# is_correct = None +# if test_case.expected_agent_id: +# is_correct = (agent_id == str(test_case.expected_agent_id)) +# if is_correct: +# correct_count += 1 + +# results.append({ +# "message": test_case.message, +# "description": test_case.description, +# "routed_agent_id": agent_id, +# "routed_agent_name": agent_info.get("name"), +# "expected_agent_id": str(test_case.expected_agent_id) if test_case.expected_agent_id else None, +# "is_correct": is_correct, +# "confidence": routing_result["confidence"], +# "routing_method": routing_result["routing_method"], +# "strategy": routing_result["strategy"] +# }) + +# except Exception as e: +# logger.error(f"测试用例失败: {test_case.message}, 错误: {str(e)}") +# results.append({ +# "message": test_case.message, +# "description": test_case.description, +# "error": str(e) +# }) + +# # 6. 统计 +# accuracy = None +# if correct_count > 0: +# total_with_expected = sum(1 for r in results if r.get("expected_agent_id")) +# if total_with_expected > 0: +# accuracy = correct_count / total_with_expected * 100 + +# response_data = { +# "total_count": total_count, +# "correct_count": correct_count, +# "accuracy": accuracy, +# "results": results, +# "config_info": { +# "use_llm": request.use_llm and routing_model is not None, +# "routing_model": routing_model.name if routing_model else None, +# "keyword_threshold": router.keyword_high_confidence_threshold +# } +# } + +# return success( +# data=response_data, +# msg=f"批量测试完成,准确率: {accuracy:.1f}%" if accuracy else "批量测试完成" +# ) diff --git a/api/app/schemas/multi_agent_schema.py b/api/app/schemas/multi_agent_schema.py index 86bd8e00..f041c9bb 100644 --- a/api/app/schemas/multi_agent_schema.py +++ b/api/app/schemas/multi_agent_schema.py @@ -4,6 +4,8 @@ import datetime from typing import Optional, List, Dict, Any, Union from pydantic import BaseModel, Field, ConfigDict, field_serializer +from app.schemas import ModelParameters + # ==================== 子 Agent 配置 ==================== @@ -30,7 +32,7 @@ class ExecutionConfig(BaseModel): parallel_limit: int = Field(default=3, ge=1, le=10, description="并行限制") retry_on_failure: bool = Field(default=True, description="失败时是否重试") max_retries: int = Field(default=3, ge=0, le=10, description="最大重试次数") - + # 新增:路由模式配置 routing_mode: str = Field( default="master_agent", @@ -41,14 +43,14 @@ class ExecutionConfig(BaseModel): default=True, description="是否启用规则快速路径(性能优化,高置信度关键词直接返回)" ) - + # 新增:结果整合模式配置 result_merge_mode: str = Field( default="smart", pattern="^(smart|master)$", description="结果整合模式:smart(规则去重,快速)| master(Master Agent 智能整合,连贯)" ) - + # 新增:子 Agent 执行模式配置 sub_agent_execution_mode: str = Field( default="parallel", @@ -82,6 +84,11 @@ class MultiAgentConfigUpdate(BaseModel): """更新多 Agent 配置""" master_agent_id: Optional[uuid.UUID] = None master_agent_name: Optional[str] = Field(None, max_length=100, description="主 Agent 名称") + default_model_config_id : uuid.UUID = Field(description="默认模型配置ID") + model_parameters: ModelParameters | None = Field( + default_factory=ModelParameters, + description="模型参数配置(temperature、max_tokens 等)" + ) orchestration_mode: Optional[str] = Field( None, pattern="^(sequential|parallel|conditional|loop)$" @@ -99,11 +106,16 @@ class MultiAgentConfigUpdate(BaseModel): class MultiAgentConfigSchema(BaseModel): """多 Agent 配置输出""" model_config = ConfigDict(from_attributes=True) - + id: uuid.UUID app_id: uuid.UUID master_agent_id: uuid.UUID master_agent_name: Optional[str] + default_model_config_id : uuid.UUID | None = Field(description="默认模型配置ID") + model_parameters: ModelParameters | None = Field( + default_factory=ModelParameters, + description="模型参数配置(temperature、max_tokens 等)" + ) orchestration_mode: str sub_agents: List[Dict[str, Any]] routing_rules: Optional[List[Dict[str, Any]]] @@ -112,11 +124,11 @@ class MultiAgentConfigSchema(BaseModel): is_active: bool created_at: datetime.datetime updated_at: datetime.datetime - + @field_serializer("created_at", when_used="json") def _serialize_created_at(self, dt: datetime.datetime): return int(dt.timestamp() * 1000) if dt else None - + @field_serializer("updated_at", when_used="json") def _serialize_updated_at(self, dt: datetime.datetime): return int(dt.timestamp() * 1000) if dt else None diff --git a/api/app/services/master_agent_router.py b/api/app/services/master_agent_router.py index aa473706..675b4bf9 100644 --- a/api/app/services/master_agent_router.py +++ b/api/app/services/master_agent_router.py @@ -5,63 +5,63 @@ import uuid from typing import Dict, Any, List, Optional, Tuple from sqlalchemy.orm import Session +from app.schemas import ModelParameters from app.services.conversation_state_manager import ConversationStateManager from app.models import ModelConfig, AgentConfig from app.core.logging_config import get_business_logger +from app.services.model_service import ModelApiKeyService logger = get_business_logger() class MasterAgentRouter: """Master Agent 路由器 - + 让 Master Agent 作为"大脑",负责: 1. 分析用户意图 2. 选择最合适的 Sub Agent 3. 决定是否需要多 Agent 协作 4. 管理会话上下文 - + 优势: - 更智能的决策(基于完整上下文) - 减少 LLM 调用次数 - 架构更清晰(Master Agent 真正起作用) """ - + def __init__( self, db: Session, - master_agent_config: AgentConfig, master_model_config: ModelConfig, + model_parameters: ModelParameters, sub_agents: Dict[str, Any], state_manager: ConversationStateManager, enable_rule_fast_path: bool = True ): """初始化 Master Agent 路由器 - + Args: db: 数据库会话 - master_agent_config: Master Agent 配置 master_model_config: Master Agent 使用的模型配置 sub_agents: 子 Agent 配置字典 state_manager: 会话状态管理器 enable_rule_fast_path: 是否启用规则快速路径(性能优化) """ self.db = db - self.master_agent_config = master_agent_config self.master_model_config = master_model_config + self.model_parameters = model_parameters self.sub_agents = sub_agents self.state_manager = state_manager self.enable_rule_fast_path = enable_rule_fast_path - + logger.info( "Master Agent 路由器初始化", extra={ - "master_agent": master_agent_config.name, "sub_agent_count": len(sub_agents), "enable_rule_fast_path": enable_rule_fast_path } ) - + async def route( self, message: str, @@ -69,12 +69,12 @@ class MasterAgentRouter: variables: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """智能路由决策 - + Args: message: 用户消息 conversation_id: 会话 ID variables: 变量参数 - + Returns: 路由决策结果 """ @@ -85,12 +85,12 @@ class MasterAgentRouter: "conversation_id": conversation_id } ) - + # 1. 获取会话状态 state = None if conversation_id: state = self.state_manager.get_state(conversation_id) - + # 2. 尝试规则快速路径(可选的性能优化) if self.enable_rule_fast_path: rule_result = self._try_rule_fast_path(message, state) @@ -102,7 +102,7 @@ class MasterAgentRouter: "confidence": rule_result["confidence"] } ) - + # 更新会话状态 if conversation_id: self.state_manager.update_state( @@ -112,12 +112,12 @@ class MasterAgentRouter: rule_result.get("topic"), rule_result["confidence"] ) - + return rule_result - + # 3. 调用 Master Agent 做决策 decision = await self._master_agent_decide(message, state, variables) - + # 4. 更新会话状态 if conversation_id: self.state_manager.update_state( @@ -127,7 +127,7 @@ class MasterAgentRouter: decision.get("topic"), decision["confidence"] ) - + logger.info( "Master Agent 路由完成", extra={ @@ -136,22 +136,22 @@ class MasterAgentRouter: "confidence": decision["confidence"] } ) - + return decision - + def _try_rule_fast_path( self, message: str, state: Optional[Dict[str, Any]] ) -> Optional[Dict[str, Any]]: """尝试规则快速路径(性能优化) - + 对于明确的关键词匹配,直接返回结果,不调用 Master Agent - + Args: message: 用户消息 state: 会话状态 - + Returns: 如果命中规则返回决策结果,否则返回 None """ @@ -178,15 +178,15 @@ class MasterAgentRouter: "confidence_threshold": 0.9 } ] - + message_lower = message.lower() - + for rule in high_confidence_rules: matched_keywords = [kw for kw in rule["keywords"] if kw in message_lower] - + if matched_keywords: confidence = len(matched_keywords) / len(rule["keywords"]) - + if confidence >= rule["confidence_threshold"]: # 查找对应的 agent for agent_id, agent_data in self.sub_agents.items(): @@ -201,9 +201,9 @@ class MasterAgentRouter: "need_collaboration": False, "routing_method": "rule" } - + return None - + async def _master_agent_decide( self, message: str, @@ -211,35 +211,35 @@ class MasterAgentRouter: variables: Optional[Dict[str, Any]] ) -> Dict[str, Any]: """让 Master Agent 做路由决策 - + Args: message: 用户消息 state: 会话状态 variables: 变量参数 - + Returns: 决策结果 """ # 1. 构建决策 prompt prompt = self._build_decision_prompt(message, state, variables) - + # 2. 调用 Master Agent 的 LLM try: response = await self._call_master_agent_llm(prompt) - + # 3. 解析决策 decision = self._parse_decision(response) - + # 4. 验证决策 decision = self._validate_decision(decision) - + return decision - + except Exception as e: logger.error(f"Master Agent 决策失败: {str(e)}") # 降级到默认 agent return self._get_fallback_decision(message) - + def _build_decision_prompt( self, message: str, @@ -247,12 +247,12 @@ class MasterAgentRouter: variables: Optional[Dict[str, Any]] ) -> str: """构建 Master Agent 的决策 prompt - + Args: message: 用户消息 state: 会话状态 variables: 变量参数 - + Returns: prompt 字符串 """ @@ -260,29 +260,29 @@ class MasterAgentRouter: agent_descriptions = [] for agent_id, agent_data in self.sub_agents.items(): agent_info = agent_data.get("info", {}) - + name = agent_info.get("name", "未命名") role = agent_info.get("role", "") capabilities = agent_info.get("capabilities", []) - + # 简化格式:一行描述 desc = f"- {agent_id}: {name}" if role: desc += f" ({role})" if capabilities: desc += f" - {', '.join(capabilities[:3])}" # 只取前3个能力 - + agent_descriptions.append(desc) - + agents_text = "\n".join(agent_descriptions) - + # 2. 构建会话上下文 context_text = "" if state: current_agent = state.get("current_agent_id") last_topic = state.get("last_topic") same_turns = state.get("same_agent_turns", 0) - + if current_agent: context_text = f""" 当前会话上下文: @@ -290,10 +290,10 @@ class MasterAgentRouter: - 上一个主题: {last_topic} - 连续使用轮数: {same_turns} """ - + # 获取第一个可用的 agent_id 作为示例 example_agent_id = next(iter(self.sub_agents.keys())) if self.sub_agents else "agent_id" - + # 3. 构建完整 prompt(简化版,提升性能) prompt = f"""路由任务:分析问题并选择合适的 Agent。 @@ -331,15 +331,15 @@ class MasterAgentRouter: 6. 只做路由决策,不要回答问题内容 请返回 JSON:""" - + return prompt - + async def _call_master_agent_llm(self, prompt: str) -> str: """调用 Master Agent 的 LLM - + Args: prompt: 提示词 - + Returns: LLM 响应 """ @@ -347,16 +347,13 @@ class MasterAgentRouter: from app.core.models import RedBearLLM from app.core.models.base import RedBearModelConfig from app.models import ModelApiKey, ModelType - + # 获取 API Key 配置 - api_key_config = self.db.query(ModelApiKey).filter( - ModelApiKey.model_config_id == self.master_model_config.id, - ModelApiKey.is_active == True - ).first() - + api_key_config = ModelApiKeyService.get_a_api_key(self.db, self.master_model_config.id) + if not api_key_config: raise Exception("Master Agent 模型没有可用的 API Key") - + logger.info( "调用 Master Agent LLM", extra={ @@ -364,39 +361,44 @@ class MasterAgentRouter: "model_name": api_key_config.model_name } ) - + temperature = 0.3 # 决策任务使用较低温度 + max_tokens = 1000 + extra_params = None + if self.model_parameters: + extra_params = {"temperature": temperature, + "max_tokens":max_tokens + } # 创建 RedBearModelConfig model_config = RedBearModelConfig( model_name=api_key_config.model_name, provider=api_key_config.provider, api_key=api_key_config.api_key, base_url=api_key_config.api_base, - temperature=0.3, # 决策任务使用较低温度 - max_tokens=1000 + extra_params = extra_params ) - + # 创建 LLM 实例 llm = RedBearLLM(model_config, type=ModelType.CHAT) - + # 调用模型 response = await llm.ainvoke(prompt) - + # 提取响应内容 if hasattr(response, 'content'): return response.content else: return str(response) - + except Exception as e: logger.error(f"Master Agent LLM 调用失败: {str(e)}") raise - + def _parse_decision(self, response: str) -> Dict[str, Any]: """解析 Master Agent 的决策 - + Args: response: LLM 响应 - + Returns: 决策字典 """ @@ -405,29 +407,29 @@ class MasterAgentRouter: json_match = re.search(r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}', response, re.DOTALL) if json_match: decision = json.loads(json_match.group()) - + # 添加默认值 decision.setdefault("confidence", 0.8) decision.setdefault("strategy", "master_agent") decision.setdefault("routing_method", "master_agent") decision.setdefault("need_collaboration", False) decision.setdefault("collaboration_agents", []) - + return decision else: raise ValueError("无法从响应中提取 JSON") - + except Exception as e: logger.error(f"解析 Master Agent 决策失败: {str(e)}") logger.debug(f"原始响应: {response}") raise - + def _validate_decision(self, decision: Dict[str, Any]) -> Dict[str, Any]: """验证决策的有效性 - + Args: decision: 决策字典 - + Returns: 验证后的决策 """ @@ -439,26 +441,26 @@ class MasterAgentRouter: decision["selected_agent_id"] = self._get_default_agent_id() decision["confidence"] = 0.5 decision["reasoning"] = "原始选择无效,使用默认 Agent" - + # 验证 confidence confidence = decision.get("confidence", 0.8) if not isinstance(confidence, (int, float)) or confidence < 0 or confidence > 1: decision["confidence"] = 0.8 - + # 验证协作 agents if decision.get("need_collaboration"): # 检查是否是问题拆分模式 if decision.get("need_decomposition") or decision.get("sub_questions"): # 问题拆分模式 sub_questions = decision.get("sub_questions", []) - + # 验证每个子问题 valid_sub_questions = [] for sub_q in sub_questions: if isinstance(sub_q, dict): agent_id = sub_q.get("agent_id") question = sub_q.get("question") - + if agent_id in self.sub_agents and question: # 确保有必要的字段 sub_q.setdefault("order", len(valid_sub_questions) + 1) @@ -475,9 +477,9 @@ class MasterAgentRouter: "available_agents": list(self.sub_agents.keys()) } ) - + decision["sub_questions"] = valid_sub_questions - + # 如果所有子问题都验证失败,降级处理 if not valid_sub_questions and sub_questions: logger.warning( @@ -496,10 +498,10 @@ class MasterAgentRouter: first_agent_id = next(iter(self.sub_agents.keys())) decision["selected_agent_id"] = first_agent_id logger.info(f"降级使用默认 Agent: {first_agent_id}") - + # 设置协作策略为 decomposition decision["collaboration_strategy"] = "decomposition" - + logger.info( "问题拆分决策验证完成", extra={ @@ -510,7 +512,7 @@ class MasterAgentRouter: else: # 普通协作模式 collaboration_agents = decision.get("collaboration_agents", []) - + # 如果是简单列表格式,转换为详细格式 if collaboration_agents and isinstance(collaboration_agents[0], str): collaboration_agents = [ @@ -522,7 +524,7 @@ class MasterAgentRouter: } for i, agent_id in enumerate(collaboration_agents) ] - + # 验证每个协作 agent valid_agents = [] for agent_info in collaboration_agents: @@ -541,13 +543,13 @@ class MasterAgentRouter: "task": "协作处理", "order": len(valid_agents) + 1 }) - + decision["collaboration_agents"] = valid_agents - + # 设置默认协作策略 if not decision.get("collaboration_strategy"): decision["collaboration_strategy"] = "sequential" - + logger.info( "协作决策验证完成", extra={ @@ -555,20 +557,20 @@ class MasterAgentRouter: "strategy": decision.get("collaboration_strategy") } ) - + return decision - + def _get_fallback_decision(self, message: str) -> Dict[str, Any]: """获取降级决策(当 Master Agent 失败时) - + Args: message: 用户消息 - + Returns: 降级决策 """ default_agent_id = self._get_default_agent_id() - + return { "selected_agent_id": default_agent_id, "confidence": 0.5, @@ -579,15 +581,15 @@ class MasterAgentRouter: "collaboration_agents": [], "routing_method": "fallback" } - + def _get_default_agent_id(self) -> str: """获取默认 Agent ID - + Returns: 默认 Agent ID """ if self.sub_agents: # 返回第一个 agent return next(iter(self.sub_agents.keys())) - + return "default-agent" diff --git a/api/app/services/multi_agent_orchestrator.py b/api/app/services/multi_agent_orchestrator.py index 18315714..1759d16e 100644 --- a/api/app/services/multi_agent_orchestrator.py +++ b/api/app/services/multi_agent_orchestrator.py @@ -6,6 +6,7 @@ from typing import Dict, Any, List, Optional, AsyncIterator, Tuple from sqlalchemy.orm import Session from app.models import MultiAgentConfig, AgentConfig, ModelConfig +from app.models.multi_agent_model import AggregationStrategy, OrchestrationMode from app.services.agent_registry import AgentRegistry from app.services.master_agent_router import MasterAgentRouter from app.services.conversation_state_manager import ConversationStateManager @@ -18,10 +19,10 @@ logger = get_business_logger() class MultiAgentOrchestrator: """多 Agent 编排器 - 协调多个 Agent 协作完成任务""" - + def __init__(self, db: Session, config: MultiAgentConfig): """初始化编排器 - + Args: db: 数据库会话 config: 多 Agent 配置 @@ -29,10 +30,13 @@ class MultiAgentOrchestrator: self.db = db self.config = config self.registry = AgentRegistry(db) - + # 加载主 Agent - self.master_agent = self._load_agent(config.master_agent_id) - + # self.master_agent = self._load_agent(config.master_agent_id) + # self. config.d + self.default_model_config_id = config.default_model_config_id + self.model_parameters = config.model_parameters + # 加载子 Agent self.sub_agents = {} for sub_agent_info in config.sub_agents: @@ -42,38 +46,37 @@ class MultiAgentOrchestrator: "config": agent, "info": sub_agent_info } - + # 初始化会话状态管理器 self.state_manager = ConversationStateManager() - + # 获取 Master Agent 的模型配置 - if not hasattr(self.master_agent, 'default_model_config_id'): + if not self.default_model_config_id: raise BusinessException("Master Agent 缺少模型配置", BizCode.AGENT_CONFIG_MISSING) - - self.master_model_config = self.db.get(ModelConfig, self.master_agent.default_model_config_id) + + self.master_model_config = self.db.get(ModelConfig, self.default_model_config_id) if not self.master_model_config: raise BusinessException("Master Agent 模型配置不存在", BizCode.AGENT_CONFIG_MISSING) - + # 初始化 Master Agent 路由器 self.router = MasterAgentRouter( db=db, - master_agent_config=self.master_agent, master_model_config=self.master_model_config, + model_parameters=self.model_parameters, sub_agents=self.sub_agents, state_manager=self.state_manager, enable_rule_fast_path=config.execution_config.get("enable_rule_fast_path", True) ) - + logger.info( "多 Agent 编排器初始化完成", extra={ "config_id": str(config.id), - "master_agent": self.master_agent.name, "model": self.master_model_config.name, "sub_agent_count": len(self.sub_agents) } ) - + async def execute_stream( self, message: str, @@ -87,21 +90,21 @@ class MultiAgentOrchestrator: user_rag_memory_id: str = '' ): """执行多 Agent 任务(流式返回) - + Args: message: 用户消息 conversation_id: 会话 ID user_id: 用户 ID variables: 变量参数 use_llm_routing: 是否使用 LLM 路由 - + Yields: SSE 格式的事件流 """ import json - + start_time = time.time() - + logger.info( "开始执行多 Agent 任务(流式)", extra={ @@ -109,20 +112,20 @@ class MultiAgentOrchestrator: "message_length": len(message) } ) - + try: # 发送开始事件 yield self._format_sse_event("start", { "mode": self.config.orchestration_mode, "timestamp": time.time() }) - + # 1. 主 Agent 分析任务 task_analysis = await self._analyze_task(message, variables) task_analysis["use_llm_routing"] = use_llm_routing - + # 2. 根据模式执行(流式) - if self.config.orchestration_mode == "conditional": + if self.config.orchestration_mode == OrchestrationMode.CONDITIONAL: async for event in self._execute_conditional_stream( task_analysis, conversation_id, @@ -135,7 +138,7 @@ class MultiAgentOrchestrator: yield event else: # 其他模式暂时使用非流式执行,然后一次性返回 - if self.config.orchestration_mode == "sequential": + if self.config.orchestration_mode == OrchestrationMode.SEQUENTIAL: results = await self._execute_sequential( task_analysis, conversation_id, @@ -145,7 +148,7 @@ class MultiAgentOrchestrator: storage_type, user_rag_memory_id ) - elif self.config.orchestration_mode == "parallel": + elif self.config.orchestration_mode == OrchestrationMode.PARALLEL: results = await self._execute_parallel( task_analysis, conversation_id, @@ -155,25 +158,25 @@ class MultiAgentOrchestrator: storage_type, user_rag_memory_id ) - elif self.config.orchestration_mode == "loop": - results = await self._execute_loop( - task_analysis, - conversation_id, - user_id, - web_search, - memory, - storage_type, - user_rag_memory_id - ) + # elif self.config.orchestration_mode == "loop": + # results = await self._execute_loop( + # task_analysis, + # conversation_id, + # user_id, + # web_search, + # memory, + # storage_type, + # user_rag_memory_id + # ) else: raise BusinessException( f"不支持的编排模式: {self.config.orchestration_mode}", BizCode.INVALID_PARAMETER ) - + # 整合结果 final_result = await self._aggregate_results(results) - + # 提取会话 ID sub_conversation_id = None if isinstance(results, dict): @@ -184,21 +187,21 @@ class MultiAgentOrchestrator: sub_conversation_id = item["result"].get("conversation_id") if sub_conversation_id: break - + # 发送消息事件 yield self._format_sse_event("message", { "content": final_result, "conversation_id": sub_conversation_id }) - + elapsed_time = time.time() - start_time - + # 发送结束事件 yield self._format_sse_event("end", { "elapsed_time": elapsed_time, "timestamp": time.time() }) - + logger.info( "多 Agent 任务完成(流式)", extra={ @@ -206,7 +209,7 @@ class MultiAgentOrchestrator: "elapsed_time": elapsed_time } ) - + except Exception as e: logger.error( "多 Agent 任务执行失败(流式)", @@ -217,7 +220,7 @@ class MultiAgentOrchestrator: "error": str(e), "timestamp": time.time() }) - + async def execute( self, message: str, @@ -229,32 +232,32 @@ class MultiAgentOrchestrator: memory: bool = True ) -> Dict[str, Any]: """执行多 Agent 任务(基于 Master Agent 决策) - + Args: message: 用户消息 conversation_id: 会话 ID user_id: 用户 ID variables: 变量参数 use_llm_routing: 是否使用 LLM 路由(保留参数,实际总是使用 Master Agent) - + Returns: 执行结果 """ start_time = time.time() - + logger.info( "开始执行多 Agent 任务", extra={"message_length": len(message)} ) - + try: # 1. Master Agent 分析任务并做出决策 task_analysis = await self._analyze_task(message, variables) - + routing_decision = task_analysis.get("routing_decision") if not routing_decision: raise BusinessException("Master Agent 未返回路由决策", BizCode.AGENT_CONFIG_MISSING) - + logger.info( "Master Agent 决策", extra={ @@ -263,19 +266,19 @@ class MultiAgentOrchestrator: "confidence": routing_decision.get("confidence") } ) - + # 2. 根据 Master Agent 的决策执行 results = await self._execute_conditional( task_analysis, conversation_id, user_id ) - + # 3. 整合结果 final_result = await self._aggregate_results(results) - + elapsed_time = time.time() - start_time - + # 4. 提取子 Agent 的 conversation_id(用于多轮对话) sub_conversation_id = None if isinstance(results, dict): @@ -286,7 +289,7 @@ class MultiAgentOrchestrator: sub_conversation_id = item["result"].get("conversation_id") if sub_conversation_id: break - + logger.info( "多 Agent 任务完成", extra={ @@ -295,7 +298,7 @@ class MultiAgentOrchestrator: "sub_conversation_id": sub_conversation_id } ) - + return { "message": final_result, "conversation_id": sub_conversation_id, @@ -303,25 +306,25 @@ class MultiAgentOrchestrator: "strategy": routing_decision.get("collaboration_strategy", "single"), "sub_results": results } - + except Exception as e: logger.error( "多 Agent 任务执行失败", extra={"error": str(e)} ) raise - + async def _analyze_task( self, message: str, variables: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """Master Agent 分析任务并做出路由决策 - + Args: message: 用户消息 variables: 变量参数 - + Returns: 任务分析结果,包含路由决策 """ @@ -329,14 +332,14 @@ class MultiAgentOrchestrator: "Master Agent 开始分析任务", extra={"message_length": len(message)} ) - + # 使用 Master Agent 路由器进行决策 routing_decision = await self.router.route( message=message, conversation_id=None, # 会在后续传入 variables=variables ) - + logger.info( "Master Agent 分析完成", extra={ @@ -345,7 +348,7 @@ class MultiAgentOrchestrator: "strategy": routing_decision.get("strategy") } ) - + return { "message": message, "variables": variables or {}, @@ -353,7 +356,7 @@ class MultiAgentOrchestrator: "initial_context": variables or {}, "routing_decision": routing_decision } - + async def _execute_sequential( self, task_analysis: Dict[str, Any], @@ -365,33 +368,33 @@ class MultiAgentOrchestrator: user_rag_memory_id: str = '' ) -> List[Dict[str, Any]]: """顺序执行子 Agent - + Args: task_analysis: 任务分析结果 conversation_id: 会话 ID user_id: 用户 ID - + Returns: 执行结果列表 """ results = [] context = task_analysis.get("initial_context", {}) message = task_analysis.get("message", "") - + # 按优先级排序 sub_agents = sorted( task_analysis["sub_agents"], key=lambda x: x.get("priority", 0) ) - + for sub_agent_info in sub_agents: agent_id = sub_agent_info["agent_id"] agent_data = self.sub_agents.get(agent_id) - + if not agent_data: logger.warning(f"子 Agent 不存在: {agent_id}") continue - + logger.info( "执行子 Agent", extra={ @@ -400,7 +403,7 @@ class MultiAgentOrchestrator: "priority": sub_agent_info.get("priority") } ) - + # 执行子 Agent result = await self._execute_sub_agent( agent_data["config"], @@ -413,19 +416,19 @@ class MultiAgentOrchestrator: storage_type, user_rag_memory_id ) - + results.append({ "agent_id": agent_id, "agent_name": sub_agent_info.get("name"), "result": result, "conversation_id": result.get("conversation_id") # 保存会话 ID }) - + # 更新上下文(后续 Agent 可以使用前面的结果) context[f"result_from_{sub_agent_info.get('name', agent_id)}"] = result.get("message") - + return results - + async def _execute_parallel( self, task_analysis: Dict[str, Any], @@ -437,30 +440,30 @@ class MultiAgentOrchestrator: user_rag_memory_id: str = '' ) -> List[Dict[str, Any]]: """并行执行子 Agent - + Args: task_analysis: 任务分析结果 conversation_id: 会话 ID user_id: 用户 ID - + Returns: 执行结果列表 """ context = task_analysis.get("initial_context", {}) message = task_analysis.get("message", "") - + # 获取并发限制 parallel_limit = self.config.execution_config.get("parallel_limit", 3) - + # 创建任务列表 tasks = [] for sub_agent_info in task_analysis["sub_agents"]: agent_id = sub_agent_info["agent_id"] agent_data = self.sub_agents.get(agent_id) - + if not agent_data: continue - + task = self._execute_sub_agent( agent_data["config"], message, @@ -473,7 +476,7 @@ class MultiAgentOrchestrator: user_rag_memory_id ) tasks.append((agent_id, sub_agent_info.get("name"), task)) - + # 并行执行(带限制) results = [] for i in range(0, len(tasks), parallel_limit): @@ -482,7 +485,7 @@ class MultiAgentOrchestrator: *[task for _, _, task in batch], return_exceptions=True ) - + for (agent_id, agent_name, _), result in zip(batch, batch_results, strict=False): if isinstance(result, Exception): logger.error(f"子 Agent 执行失败: {agent_name}", extra={"error": str(result)}) @@ -498,9 +501,9 @@ class MultiAgentOrchestrator: "result": result, "conversation_id": result.get("conversation_id") # 保存会话 ID }) - + return results - + async def _execute_collaboration_stream( self, task_analysis: Dict[str, Any], @@ -509,26 +512,26 @@ class MultiAgentOrchestrator: routing_decision: Dict[str, Any] ): """多 Agent 协作流式执行 - + Args: task_analysis: 任务分析结果 conversation_id: 会话 ID user_id: 用户 ID routing_decision: 路由决策 - + Yields: SSE 格式的事件流 """ message = task_analysis.get("message", "") initial_context = task_analysis.get("initial_context", {}) collaboration_strategy = routing_decision.get("collaboration_strategy", "sequential") - + # 获取协作信息 if collaboration_strategy == "decomposition": collaboration_agents = routing_decision.get("sub_questions", []) else: collaboration_agents = routing_decision.get("collaboration_agents", []) - + logger.info( "开始流式协作执行", extra={ @@ -536,7 +539,7 @@ class MultiAgentOrchestrator: "agent_count": len(collaboration_agents) } ) - + # 1. 发送编排计划事件(在执行前) # 构建子任务信息 sub_tasks = [] @@ -563,43 +566,43 @@ class MultiAgentOrchestrator: "role": item.get("role", "secondary"), "order": item.get("order", 0) }) - + yield self._format_sse_event("orchestration_plan", { "agent_count": len(sub_tasks), "strategy": collaboration_strategy, "sub_tasks": sub_tasks }) - + # 2. 流式执行所有子 Agent results = [] - + # 获取执行模式配置 execution_mode = self.config.execution_config.get("sub_agent_execution_mode", "parallel") - + if collaboration_strategy == "decomposition": # 问题拆分模式 # 检查是否有依赖关系 has_dependencies = self._check_dependencies(collaboration_agents) - + if has_dependencies or execution_mode == "sequential": # 有依赖或配置为串行:串行流式执行 logger.info("使用串行流式执行(问题拆分)") for sub_q in sorted(collaboration_agents, key=lambda x: x.get("order", 0)): sub_question = sub_q.get("question", "") agent_id = sub_q.get("agent_id") - + agent_data = self.sub_agents.get(agent_id) if not agent_data: continue - + agent_name = agent_data.get("info", {}).get("name", agent_id) - + # 发送子问题开始事件 yield self._format_sse_event("sub_question_start", { "question": sub_question, "agent_name": agent_name }) - + # 流式执行子 Agent,收集结果 result_content = "" async for event in self._execute_sub_agent_stream( @@ -615,12 +618,12 @@ class MultiAgentOrchestrator: import json data_line = event.split("data: ", 1)[1].strip() data = json.loads(data_line) - + # 提取内容 if "content" in data: content = data["content"] result_content += content - + # 转换为子 Agent 专用事件,带上 agent 信息 yield self._format_sse_event("sub_agent_message", { "content": content, @@ -633,14 +636,14 @@ class MultiAgentOrchestrator: else: # 非 data 事件直接转发 yield event - + results.append({ "agent_id": agent_id, "agent_name": agent_name, "sub_question": sub_question, "result": {"message": result_content} }) - + # 发送子问题完成事件 yield self._format_sse_event("sub_question_end", { "agent_name": agent_name @@ -648,27 +651,27 @@ class MultiAgentOrchestrator: else: # 无依赖且配置为并行:并行流式执行 logger.info(f"使用并行流式执行(问题拆分),共 {len(collaboration_agents)} 个子问题") - + # 准备并行任务 agent_tasks = [] agent_info_map = {} result_contents = {} - + for sub_q in collaboration_agents: sub_question = sub_q.get("question", "") agent_id = sub_q.get("agent_id") - + agent_data = self.sub_agents.get(agent_id) if not agent_data: continue - + agent_name = agent_data.get("info", {}).get("name", agent_id) agent_info_map[agent_id] = { "name": agent_name, "sub_question": sub_question } result_contents[agent_id] = "" - + agent_tasks.append(( agent_id, agent_name, @@ -676,13 +679,13 @@ class MultiAgentOrchestrator: sub_question, initial_context )) - + # 发送子问题开始事件 yield self._format_sse_event("sub_question_start", { "question": sub_question, "agent_name": agent_name }) - + # 并行流式执行 async for agent_id, agent_name, event_type, content in self._parallel_stream_agents( agent_tasks, @@ -692,7 +695,7 @@ class MultiAgentOrchestrator: if event_type == "content": # 累积结果 result_contents[agent_id] += content - + # 实时返回 yield self._format_sse_event("sub_agent_message", { "content": content, @@ -700,7 +703,7 @@ class MultiAgentOrchestrator: "agent_name": agent_name, "sub_question": agent_info_map[agent_id]["sub_question"] }) - + elif event_type == "done": # Agent 完成 results.append({ @@ -709,11 +712,11 @@ class MultiAgentOrchestrator: "sub_question": agent_info_map[agent_id]["sub_question"], "result": {"message": result_contents[agent_id]} }) - + yield self._format_sse_event("sub_question_end", { "agent_name": agent_name }) - + elif event_type == "error": logger.error(f"Agent {agent_name} 执行失败: {content}") else: @@ -721,18 +724,18 @@ class MultiAgentOrchestrator: if collaboration_strategy == "parallel" and execution_mode == "parallel": # 并行协作 + 并行流式执行 logger.info(f"使用并行流式执行(并行协作),共 {len(collaboration_agents)} 个 Agent") - + # 准备并行任务 agent_tasks = [] agent_info_map = {} result_contents = {} - + for agent_info in collaboration_agents: agent_id = agent_info.get("agent_id") agent_data = self.sub_agents.get(agent_id) if not agent_data: continue - + agent_name = agent_data.get("info", {}).get("name", agent_id) agent_info_map[agent_id] = { "name": agent_name, @@ -740,7 +743,7 @@ class MultiAgentOrchestrator: "task": agent_info.get("task", "") } result_contents[agent_id] = "" - + # 构建该 Agent 的消息 agent_task = agent_info.get("task", "处理任务") agent_message = f"""原始问题:{message} @@ -748,7 +751,7 @@ class MultiAgentOrchestrator: 你的任务:{agent_task} 请完成你的任务。""" - + agent_tasks.append(( agent_id, agent_name, @@ -756,12 +759,12 @@ class MultiAgentOrchestrator: agent_message, initial_context.copy() )) - + # 发送 Agent 开始事件 yield self._format_sse_event("agent_start", { "agent_name": agent_name }) - + # 并行流式执行 async for agent_id, agent_name, event_type, content in self._parallel_stream_agents( agent_tasks, @@ -771,7 +774,7 @@ class MultiAgentOrchestrator: if event_type == "content": # 累积结果 result_contents[agent_id] += content - + # 实时返回 yield self._format_sse_event("sub_agent_message", { "content": content, @@ -779,7 +782,7 @@ class MultiAgentOrchestrator: "agent_name": agent_name, "role": agent_info_map[agent_id]["role"] }) - + elif event_type == "done": # Agent 完成 results.append({ @@ -789,11 +792,11 @@ class MultiAgentOrchestrator: "task": agent_info_map[agent_id]["task"], "result": {"message": result_contents[agent_id]} }) - + yield self._format_sse_event("agent_end", { "agent_name": agent_name }) - + elif event_type == "error": logger.error(f"Agent {agent_name} 执行失败: {content}") else: @@ -804,14 +807,14 @@ class MultiAgentOrchestrator: agent_data = self.sub_agents.get(agent_id) if not agent_data: continue - + agent_name = agent_data.get("info", {}).get("name", agent_id) - + # 发送 Agent 开始事件 yield self._format_sse_event("agent_start", { "agent_name": agent_name }) - + # 流式执行子 Agent,收集结果 result_content = "" async for event in self._execute_sub_agent_stream( @@ -827,12 +830,12 @@ class MultiAgentOrchestrator: import json data_line = event.split("data: ", 1)[1].strip() data = json.loads(data_line) - + # 提取内容 if "content" in data: content = data["content"] result_content += content - + # 转换为子 Agent 专用事件,带上 agent 信息 yield self._format_sse_event("sub_agent_message", { "content": content, @@ -845,24 +848,24 @@ class MultiAgentOrchestrator: else: # 非 data 事件直接转发 yield event - + results.append({ "agent_id": agent_id, "agent_name": agent_name, "result": {"message": result_content} }) - + # 发送 Agent 完成事件 yield self._format_sse_event("agent_end", { "agent_name": agent_name }) - + # 3. 智能整合结果 merge_mode = self.config.execution_config.get("result_merge_mode", "smart") - + # 智能判断是否需要整合 need_merge = self._should_merge_results(results, collaboration_strategy) - + if not need_merge: # 不需要整合:用户已经看到所有内容了 logger.info("跳过整合阶段(用户已看到所有 Agent 输出)") @@ -870,14 +873,14 @@ class MultiAgentOrchestrator: elif merge_mode == "master" and len(results) > 1: # Master Agent 整合(非流式,避免等待时间) logger.info("开始 Master Agent 整合") - + # 发送整合开始提示 yield self._format_sse_event("merge_start", { "merge_mode": "master", "agent_count": len(results), "message": "正在整合多个专家的回答..." }) - + # 非流式整合(更快) try: final_response = await self._master_merge_results( @@ -885,7 +888,7 @@ class MultiAgentOrchestrator: collaboration_strategy, message ) - + # 发送整合后的结果 yield self._format_sse_event("merge_complete", { "content": final_response @@ -899,20 +902,20 @@ class MultiAgentOrchestrator: else: # Smart 模式:快速整合 logger.info("使用 Smart 模式整合") - + yield self._format_sse_event("merge_start", { "merge_mode": "smart", "agent_count": len(results) }) - + final_response = self._smart_merge_results(results, collaboration_strategy) - + # 只有在需要时才发送整合结果 if final_response and final_response != "": yield self._format_sse_event("merge_complete", { "content": final_response }) - + async def _execute_conditional_stream( self, task_analysis: Dict[str, Any], @@ -924,26 +927,26 @@ class MultiAgentOrchestrator: user_rag_memory_id: str = '' ): """条件路由执行(流式,重构版 - 使用 Master Agent 决策) - + Args: task_analysis: 任务分析结果(包含 Master Agent 的决策) conversation_id: 会话 ID user_id: 用户 ID - + Yields: SSE 格式的事件流 """ if not task_analysis["sub_agents"]: raise BusinessException("没有可用的子 Agent", BizCode.AGENT_CONFIG_MISSING) - + message = task_analysis.get("message", "") routing_decision = task_analysis.get("routing_decision") - + # 1. 检查是否需要协作 if routing_decision and routing_decision.get("need_collaboration"): # 需要多 Agent 协作,使用流式整合 logger.info("检测到需要多 Agent 协作,使用流式整合") - + async for event in self._execute_collaboration_stream( task_analysis, conversation_id, @@ -952,11 +955,11 @@ class MultiAgentOrchestrator: ): yield event return - + # 2. 单 Agent 模式:如果有 Master Agent 的决策,直接使用 if routing_decision and routing_decision.get("selected_agent_id"): agent_id = routing_decision["selected_agent_id"] - + logger.info( "使用 Master Agent 的路由决策(流式)", extra={ @@ -968,27 +971,27 @@ class MultiAgentOrchestrator: else: # 2. 降级:使用旧的路由逻辑 logger.warning("未获取到 Master Agent 决策,使用旧路由逻辑(流式)") - use_llm = task_analysis.get("use_llm_routing", True) - selected_agent_info = await self._route_by_rules( - message, - task_analysis["sub_agents"], - use_llm=use_llm, - conversation_id=str(conversation_id) if conversation_id else None - ) - - if not selected_agent_info: - selected_agent_info = task_analysis["sub_agents"][0] - logger.info("未匹配到路由规则,使用默认 Agent") - + # use_llm = task_analysis.get("use_llm_routing", True) + # selected_agent_info = await self._route_by_rules( + # message, + # task_analysis["sub_agents"], + # use_llm=use_llm, + # conversation_id=str(conversation_id) if conversation_id else None + # ) + # + # if not selected_agent_info: + selected_agent_info = task_analysis["sub_agents"][0] + logger.info("未匹配到路由规则,使用默认 Agent") + agent_id = selected_agent_info["agent_id"] - + # 3. 获取 Agent 配置 agent_data = self.sub_agents.get(agent_id) if not agent_data: raise BusinessException(f"子 Agent 不存在: {agent_id}", BizCode.AGENT_CONFIG_MISSING) - + agent_info = agent_data.get("info", {}) - + # 4. 发送路由信息事件 yield self._format_sse_event("agent_selected", { "agent_id": agent_id, @@ -999,7 +1002,7 @@ class MultiAgentOrchestrator: "strategy": routing_decision.get("strategy") if routing_decision else None } }) - + # 5. 流式执行子 Agent sub_conversation_id = None async for event in self._execute_sub_agent_stream( @@ -1023,15 +1026,15 @@ class MultiAgentOrchestrator: sub_conversation_id = data["conversation_id"] except: pass - + yield event - + # 6. 如果有会话 ID,发送一个包含它的事件 if sub_conversation_id: yield self._format_sse_event("conversation", { "conversation_id": sub_conversation_id }) - + async def _execute_conditional( self, task_analysis: Dict[str, Any], @@ -1043,26 +1046,26 @@ class MultiAgentOrchestrator: user_rag_memory_id: str = '' ) -> Dict[str, Any]: """条件路由执行(重构版 - 使用 Master Agent 的决策) - + Args: task_analysis: 任务分析结果(包含 Master Agent 的决策) conversation_id: 会话 ID user_id: 用户 ID - + Returns: 执行结果 """ if not task_analysis["sub_agents"]: raise BusinessException("没有可用的子 Agent", BizCode.AGENT_CONFIG_MISSING) - + message = task_analysis.get("message", "") routing_decision = task_analysis.get("routing_decision") - + if not routing_decision: raise BusinessException("缺少路由决策", BizCode.AGENT_CONFIG_MISSING) - + agent_id = routing_decision["selected_agent_id"] - + logger.info( "执行 Master Agent 的路由决策", extra={ @@ -1071,11 +1074,11 @@ class MultiAgentOrchestrator: "reasoning": routing_decision.get("reasoning") } ) - + # 检查是否需要协作 if routing_decision.get("need_collaboration"): collaboration_strategy = routing_decision.get("collaboration_strategy", "sequential") - + # 根据策略获取协作信息 if collaboration_strategy == "decomposition": # 问题拆分模式:使用 sub_questions @@ -1097,7 +1100,7 @@ class MultiAgentOrchestrator: "strategy": collaboration_strategy } ) - + # 执行多 Agent 协作 return await self._execute_collaboration( message=message, @@ -1108,14 +1111,14 @@ class MultiAgentOrchestrator: user_id=user_id, routing_decision=routing_decision ) - + # 3. 获取 Agent 配置 agent_data = self.sub_agents.get(agent_id) if not agent_data: raise BusinessException(f"子 Agent 不存在: {agent_id}", BizCode.AGENT_CONFIG_MISSING) - + agent_info = agent_data.get("info", {}) - + logger.info( "执行选中的 Agent", extra={ @@ -1124,7 +1127,7 @@ class MultiAgentOrchestrator: "message_preview": message[:50] } ) - + # 4. 执行 Agent result = await self._execute_sub_agent( agent_data["config"], @@ -1137,7 +1140,7 @@ class MultiAgentOrchestrator: storage_type, user_rag_memory_id ) - + # 5. 返回结果 return { "agent_id": agent_id, @@ -1146,7 +1149,7 @@ class MultiAgentOrchestrator: "conversation_id": result.get("conversation_id"), "routing_decision": routing_decision # 包含 Master Agent 的决策信息 } - + async def _execute_loop( self, task_analysis: Dict[str, Any], @@ -1158,30 +1161,30 @@ class MultiAgentOrchestrator: user_rag_memory_id: str = '' ) -> Dict[str, Any]: """循环执行(迭代优化) - + Args: task_analysis: 任务分析结果 conversation_id: 会话 ID user_id: 用户 ID - + Returns: 执行结果 """ max_iterations = self.config.execution_config.get("max_iterations", 5) - + if not task_analysis["sub_agents"]: raise BusinessException("没有可用的子 Agent", BizCode.AGENT_CONFIG_MISSING) - + agent_info = task_analysis["sub_agents"][0] agent_id = agent_info["agent_id"] agent_data = self.sub_agents.get(agent_id) - + if not agent_data: raise BusinessException(f"子 Agent 不存在: {agent_id}", BizCode.AGENT_CONFIG_MISSING) - + context = task_analysis.get("initial_context", {}) message = task_analysis.get("message", "") - + result = None for i in range(max_iterations): logger.info( @@ -1192,7 +1195,7 @@ class MultiAgentOrchestrator: "agent_name": agent_info.get("name") } ) - + result = await self._execute_sub_agent( agent_data["config"], message, @@ -1204,11 +1207,11 @@ class MultiAgentOrchestrator: storage_type, user_rag_memory_id ) - + # 简化版本:执行一次就返回 # 在实际应用中,应该验证结果是否满足条件 break - + return { "agent_id": agent_id, "agent_name": agent_info.get("name"), @@ -1216,7 +1219,7 @@ class MultiAgentOrchestrator: "result": result, "conversation_id": result.get("conversation_id") if result else None # 保存会话 ID } - + async def _execute_sub_agent_stream( self, agent_config: AgentConfig, @@ -1230,19 +1233,19 @@ class MultiAgentOrchestrator: user_rag_memory_id: str = '' ): """执行单个子 Agent(流式) - + Args: agent_config: Agent 配置 message: 消息 context: 上下文 conversation_id: 会话 ID user_id: 用户 ID - + Yields: SSE 格式的事件流 """ from app.services.draft_run_service import DraftRunService - + # 获取模型配置 model_config = self.db.get(ModelConfig, agent_config.default_model_config_id) if not model_config: @@ -1250,7 +1253,7 @@ class MultiAgentOrchestrator: "Agent 模型配置不存在", BizCode.AGENT_CONFIG_MISSING ) - + # 流式执行 Agent draft_service = DraftRunService(self.db) async for event in draft_service.run_stream( @@ -1267,7 +1270,7 @@ class MultiAgentOrchestrator: memory=memory ): yield event - + async def _execute_sub_agent( self, agent_config: AgentConfig, @@ -1281,19 +1284,19 @@ class MultiAgentOrchestrator: user_rag_memory_id: str = '' ) -> Dict[str, Any]: """执行单个子 Agent - + Args: agent_config: Agent 配置 message: 消息 context: 上下文 conversation_id: 会话 ID user_id: 用户 ID - + Returns: 执行结果 """ from app.services.draft_run_service import DraftRunService - + # 获取模型配置 model_config = self.db.get(ModelConfig, agent_config.default_model_config_id) if not model_config: @@ -1301,7 +1304,7 @@ class MultiAgentOrchestrator: "Agent 模型配置不存在", BizCode.AGENT_CONFIG_MISSING ) - + # 执行 Agent draft_service = DraftRunService(self.db) result = await draft_service.run( @@ -1317,38 +1320,38 @@ class MultiAgentOrchestrator: storage_type=storage_type, user_rag_memory_id=user_rag_memory_id ) - + return result - + async def _aggregate_results( self, results: Any ) -> str: """整合子 Agent 的结果 - + Args: results: 子 Agent 执行结果 - + Returns: 整合后的结果 """ strategy = self.config.aggregation_strategy - - if strategy == "merge": + + if strategy == AggregationStrategy.MERGE: return self._merge_results(results) - elif strategy == "vote": + elif strategy == AggregationStrategy.VOTE: return self._vote_results(results) - elif strategy == "priority": + elif strategy == AggregationStrategy.PRIORITY: return self._priority_results(results) else: return self._merge_results(results) - + def _merge_results(self, results: Any) -> str: """合并所有结果 - + Args: results: 执行结果 - + Returns: 合并后的结果 """ @@ -1363,22 +1366,22 @@ class MultiAgentOrchestrator: elif "error" in item: agent_name = item.get("agent_name", "Agent") merged.append(f"【{agent_name}】\n错误: {item['error']}") - + return "\n\n".join(merged) elif isinstance(results, dict): # 条件或循环执行的结果 if "result" in results: return results["result"].get("message", "") return str(results) - + return str(results) - + def _vote_results(self, results: Any) -> str: """投票选择最佳结果(简化版本) - + Args: results: 执行结果 - + Returns: 最佳结果 """ @@ -1387,15 +1390,15 @@ class MultiAgentOrchestrator: for item in results: if "result" in item: return item["result"].get("message", "") - + return self._merge_results(results) - + def _priority_results(self, results: Any) -> str: """按优先级选择结果(简化版本) - + Args: results: 执行结果 - + Returns: 优先级最高的结果 """ @@ -1403,48 +1406,48 @@ class MultiAgentOrchestrator: if isinstance(results, list) and results: if "result" in results[0]: return results[0]["result"].get("message", "") - + return self._merge_results(results) - + def _format_sse_event(self, event: str, data: Dict[str, Any]) -> str: """格式化 SSE 事件 - + Args: event: 事件类型 data: 事件数据 - + Returns: SSE 格式的字符串 """ import json return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n" - + def _load_agent(self, release_id: uuid.UUID): """从发布版本加载 Agent 配置 - + Args: release_id: 发布版本 ID - + Returns: Agent 配置对象(包含发布版本的配置数据) """ from app.models import AppRelease, App - + # 获取发布版本 release = self.db.get(AppRelease, release_id) if not release: raise ResourceNotFoundException("发布版本", str(release_id)) - + # 从发布版本的 config 中获取 Agent 配置 config_data = release.config if not config_data: raise BusinessException(f"发布版本 {release_id} 缺少配置数据", BizCode.AGENT_CONFIG_MISSING) - + # 获取应用信息(用于 workspace_id) app = self.db.get(App, release.app_id) if not app: raise ResourceNotFoundException("应用", str(release.app_id)) - + # 创建一个类似 AgentConfig 的对象,包含所有需要的属性 class AgentConfigProxy: """Agent 配置代理对象,模拟 AgentConfig 的接口""" @@ -1461,7 +1464,7 @@ class MultiAgentOrchestrator: self.variables = config_data.get("variables", []) self.tools = config_data.get("tools", {}) self.default_model_config_id = release.default_model_config_id - + return AgentConfigProxy(release, app, config_data) async def _execute_collaboration( @@ -1475,7 +1478,7 @@ class MultiAgentOrchestrator: routing_decision: Dict[str, Any] ) -> Dict[str, Any]: """执行多 Agent 协作 - + Args: message: 用户消息 collaboration_agents: 协作 Agent 列表 @@ -1484,7 +1487,7 @@ class MultiAgentOrchestrator: conversation_id: 会话 ID user_id: 用户 ID routing_decision: 路由决策 - + Returns: 协作执行结果 """ @@ -1495,7 +1498,7 @@ class MultiAgentOrchestrator: "strategy": strategy } ) - + if strategy == "decomposition": # 问题拆分:每个 Agent 处理一个子问题 return await self._execute_decomposition_collaboration( @@ -1505,7 +1508,7 @@ class MultiAgentOrchestrator: elif strategy == "sequential": # 顺序协作:按顺序执行,后续 Agent 可以使用前面的结果 return await self._execute_sequential_collaboration( - message, collaboration_agents, initial_context, + message, collaboration_agents, initial_context, conversation_id, user_id, routing_decision ) elif strategy == "parallel": @@ -1526,13 +1529,13 @@ class MultiAgentOrchestrator: message, collaboration_agents, initial_context, conversation_id, user_id, routing_decision ) - + def _check_dependencies(self, sub_questions: List[Dict[str, Any]]) -> bool: """检测子问题是否有依赖关系 - + Args: sub_questions: 子问题列表 - + Returns: True 如果有依赖关系,False 如果完全独立 """ @@ -1548,7 +1551,7 @@ class MultiAgentOrchestrator: ) return True return False - + async def _execute_decomposition_collaboration( self, message: str, @@ -1559,23 +1562,23 @@ class MultiAgentOrchestrator: routing_decision: Dict[str, Any] ) -> Dict[str, Any]: """问题拆分执行 - + 每个 Agent 处理一个独立的子问题,避免重复 - + 示例: 原问题:"写一首关于雪的古诗,并计算3+8" 拆分后: - 子问题1:"写一首关于雪的古诗" → 文科导师 - 子问题2:"计算3+8" → 理科导师 - + Args: collaboration_agents: 在 decomposition 模式下,这就是 sub_questions 列表 """ results = [] - + # collaboration_agents 在 decomposition 模式下就是 sub_questions sub_questions = collaboration_agents - + if not sub_questions: # 如果没有子问题,降级到普通协作 logger.warning( @@ -1589,7 +1592,7 @@ class MultiAgentOrchestrator: message, collaboration_agents, initial_context, conversation_id, user_id, routing_decision ) - + logger.info( "开始问题拆分执行", extra={ @@ -1597,31 +1600,31 @@ class MultiAgentOrchestrator: "original_message": message[:50] } ) - + # 检测是否有依赖关系 has_dependencies = self._check_dependencies(sub_questions) - + # 获取执行模式配置 execution_mode = self.config.execution_config.get("sub_agent_execution_mode", "parallel") - + # 如果有依赖关系,强制使用串行模式 if has_dependencies: logger.info("检测到子问题有依赖关系,强制使用串行执行") execution_mode = "sequential" - + if execution_mode == "sequential": # 串行执行模式 logger.info(f"串行执行 {len(sub_questions)} 个子问题") - + # 用于存储已完成的子问题结果(按 order 索引) completed_results = {} - + for sub_q in sorted(sub_questions, key=lambda x: x.get("order", 0)): sub_question = sub_q.get("question", "") agent_id = sub_q.get("agent_id") order = sub_q.get("order", 0) depends_on = sub_q.get("depends_on", []) - + agent_data = self.sub_agents.get(agent_id) if not agent_data: logger.warning( @@ -1632,9 +1635,9 @@ class MultiAgentOrchestrator: } ) continue - + agent_name = agent_data.get("info", {}).get("name", agent_id) - + # 如果有依赖,构建包含依赖结果的上下文 context_with_deps = initial_context.copy() if depends_on: @@ -1646,7 +1649,7 @@ class MultiAgentOrchestrator: "question": dep_result.get("sub_question"), "answer": dep_result.get("result", {}).get("message", "") }) - + if dependency_results: context_with_deps["previous_results"] = dependency_results logger.info( @@ -1657,7 +1660,7 @@ class MultiAgentOrchestrator: "dependency_count": len(dependency_results) } ) - + logger.info( "处理子问题(串行)", extra={ @@ -1667,7 +1670,7 @@ class MultiAgentOrchestrator: "has_dependencies": bool(depends_on) } ) - + # 串行执行 try: result = await self._execute_sub_agent( @@ -1700,18 +1703,18 @@ class MultiAgentOrchestrator: # 并行执行模式(默认) tasks = [] agent_infos = [] - + for sub_q in sorted(sub_questions, key=lambda x: x.get("order", 0)): sub_question = sub_q.get("question", "") agent_id = sub_q.get("agent_id") - + agent_data = self.sub_agents.get(agent_id) if not agent_data: logger.warning(f"子问题对应的 Agent 不存在: {agent_id}") continue - + agent_name = agent_data.get("info", {}).get("name", agent_id) - + logger.info( "准备处理子问题(并行)", extra={ @@ -1720,7 +1723,7 @@ class MultiAgentOrchestrator: "agent_name": agent_name } ) - + # 创建异步任务 task = self._execute_sub_agent( agent_data["config"], @@ -1735,11 +1738,11 @@ class MultiAgentOrchestrator: "agent_name": agent_name, "sub_question": sub_question }) - + # 并行执行所有任务 logger.info(f"并行执行 {len(tasks)} 个子问题") task_results = await asyncio.gather(*tasks, return_exceptions=True) - + # 处理结果 for i, result in enumerate(task_results): if isinstance(result, Exception): @@ -1758,10 +1761,10 @@ class MultiAgentOrchestrator: "result": result, "conversation_id": result.get("conversation_id") }) - + # 整合结果(问题拆分模式) final_response = await self._merge_decomposition_results(results, message) - + return { "agent_id": "decomposition", "agent_name": "问题拆分协作", @@ -1773,7 +1776,7 @@ class MultiAgentOrchestrator: "routing_decision": routing_decision, "collaboration_results": results } - + async def _execute_sequential_collaboration( self, message: str, @@ -1784,27 +1787,27 @@ class MultiAgentOrchestrator: routing_decision: Dict[str, Any] ) -> Dict[str, Any]: """顺序协作执行 - + 每个 Agent 按顺序执行,后续 Agent 可以看到前面 Agent 的结果 """ results = [] context = initial_context.copy() accumulated_response = [] - + # 按 order 排序 sorted_agents = sorted(collaboration_agents, key=lambda x: x.get("order", 0)) - + for agent_info in sorted_agents: agent_id = agent_info["agent_id"] agent_data = self.sub_agents.get(agent_id) - + if not agent_data: logger.warning(f"协作 Agent 不存在: {agent_id}") continue - + agent_name = agent_data.get("info", {}).get("name", agent_id) agent_task = agent_info.get("task", "处理任务") - + logger.info( "执行协作 Agent", extra={ @@ -1815,7 +1818,7 @@ class MultiAgentOrchestrator: "order": agent_info.get("order") } ) - + # 构建该 Agent 的消息(包含任务说明和前面的结果) agent_message = message if context.get("previous_results"): @@ -1827,7 +1830,7 @@ class MultiAgentOrchestrator: {context['previous_results']} 请基于以上信息,完成你的任务。""" - + # 执行 Agent result = await self._execute_sub_agent( agent_data["config"], @@ -1836,9 +1839,9 @@ class MultiAgentOrchestrator: conversation_id, user_id ) - + agent_response = result.get("message", "") - + results.append({ "agent_id": agent_id, "agent_name": agent_name, @@ -1847,23 +1850,23 @@ class MultiAgentOrchestrator: "result": result, "conversation_id": result.get("conversation_id") }) - + # 更新上下文 context[f"result_from_{agent_name}"] = agent_response - + # 累积响应 accumulated_response.append(f"【{agent_name}】\n{agent_response}") - + # 更新 previous_results 供下一个 Agent 使用 context["previous_results"] = "\n\n".join(accumulated_response) - + # 整合最终结果 final_response = await self._merge_collaboration_results( - results, + results, strategy="sequential", original_question=message ) - + return { "agent_id": "collaboration", "agent_name": "多Agent协作", @@ -1875,7 +1878,7 @@ class MultiAgentOrchestrator: "routing_decision": routing_decision, "collaboration_results": results } - + async def _execute_parallel_collaboration( self, message: str, @@ -1886,28 +1889,28 @@ class MultiAgentOrchestrator: routing_decision: Dict[str, Any] ) -> Dict[str, Any]: """并行协作执行 - + 所有 Agent 同时执行,互不依赖 """ tasks = [] agent_infos = [] - + for agent_info in collaboration_agents: agent_id = agent_info["agent_id"] agent_data = self.sub_agents.get(agent_id) - + if not agent_data: continue - + agent_task = agent_info.get("task", "处理任务") - + # 构建该 Agent 的消息 agent_message = f"""原始问题:{message} 你的任务:{agent_task} 请完成你的任务。""" - + # 创建任务 task = self._execute_sub_agent( agent_data["config"], @@ -1918,15 +1921,15 @@ class MultiAgentOrchestrator: ) tasks.append(task) agent_infos.append((agent_id, agent_data, agent_info)) - + # 并行执行 task_results = await asyncio.gather(*tasks, return_exceptions=True) - + # 处理结果 results = [] for (agent_id, agent_data, agent_info), result in zip(agent_infos, task_results, strict=False): agent_name = agent_data.get("info", {}).get("name", agent_id) - + if isinstance(result, Exception): logger.error(f"协作 Agent 执行失败: {agent_name}", extra={"error": str(result)}) results.append({ @@ -1943,14 +1946,14 @@ class MultiAgentOrchestrator: "result": result, "conversation_id": result.get("conversation_id") }) - + # 整合结果 final_response = await self._merge_collaboration_results( - results, + results, strategy="parallel", original_question=message ) - + return { "agent_id": "collaboration", "agent_name": "多Agent协作", @@ -1962,7 +1965,7 @@ class MultiAgentOrchestrator: "routing_decision": routing_decision, "collaboration_results": results } - + async def _execute_hierarchical_collaboration( self, message: str, @@ -1973,42 +1976,42 @@ class MultiAgentOrchestrator: routing_decision: Dict[str, Any] ) -> Dict[str, Any]: """层级协作执行 - + 主 Agent(primary)负责协调,其他 Agent 提供辅助信息 """ # 找到主 Agent 和辅助 Agents primary_agent = None secondary_agents = [] - + for agent_info in collaboration_agents: if agent_info.get("role") == "primary": primary_agent = agent_info else: secondary_agents.append(agent_info) - + if not primary_agent: # 如果没有指定主 Agent,使用第一个 primary_agent = collaboration_agents[0] secondary_agents = collaboration_agents[1:] - + # 1. 先执行辅助 Agents(并行) secondary_results = [] if secondary_agents: tasks = [] agent_infos = [] - + for agent_info in secondary_agents: agent_id = agent_info["agent_id"] agent_data = self.sub_agents.get(agent_id) - + if not agent_data: continue - + agent_task = agent_info.get("task", "提供专业意见") agent_message = f"""问题:{message} 请从你的专业角度提供意见:{agent_task}""" - + task = self._execute_sub_agent( agent_data["config"], agent_message, @@ -2018,13 +2021,13 @@ class MultiAgentOrchestrator: ) tasks.append(task) agent_infos.append((agent_id, agent_data, agent_info)) - + # 并行执行辅助 Agents task_results = await asyncio.gather(*tasks, return_exceptions=True) - + for (agent_id, agent_data, agent_info), result in zip(agent_infos, task_results, strict=False): agent_name = agent_data.get("info", {}).get("name", agent_id) - + if not isinstance(result, Exception): secondary_results.append({ "agent_id": agent_id, @@ -2032,34 +2035,34 @@ class MultiAgentOrchestrator: "role": "secondary", "result": result }) - + # 2. 执行主 Agent(整合辅助 Agents 的结果) primary_agent_id = primary_agent["agent_id"] primary_agent_data = self.sub_agents.get(primary_agent_id) - + if not primary_agent_data: raise BusinessException(f"主协作 Agent 不存在: {primary_agent_id}", BizCode.AGENT_CONFIG_MISSING) - + # 构建主 Agent 的消息(包含辅助 Agents 的结果) primary_message = f"""问题:{message} 你的任务:{primary_agent.get('task', '综合分析并给出最终答案')} """ - + if secondary_results: expert_opinions = [] for sec_result in secondary_results: expert_opinions.append( f"【{sec_result['agent_name']}的意见】\n{sec_result['result'].get('message', '')}" ) - + primary_message += f""" 其他专家的意见: {chr(10).join(expert_opinions)} 请综合以上专家意见,给出你的最终答案。""" - + # 执行主 Agent primary_result = await self._execute_sub_agent( primary_agent_data["config"], @@ -2068,12 +2071,12 @@ class MultiAgentOrchestrator: conversation_id, user_id ) - + primary_agent_name = primary_agent_data.get("info", {}).get("name", primary_agent_id) - + # 整合所有结果 all_results = [*secondary_results, {"agent_id": primary_agent_id, "agent_name": primary_agent_name, "role": "primary", "result": primary_result, "conversation_id": primary_result.get("conversation_id")}] - + return { "agent_id": primary_agent_id, "agent_name": primary_agent_name, @@ -2082,29 +2085,29 @@ class MultiAgentOrchestrator: "routing_decision": routing_decision, "collaboration_results": all_results } - + async def _merge_decomposition_results( self, results: List[Dict[str, Any]], original_question: str = None ) -> str: """整合问题拆分的结果 - + 每个 Agent 处理了不同的子问题,需要按顺序组合 - + Args: results: 结果列表,每个包含 sub_question 和 result original_question: 原始用户问题 - + Returns: 整合后的响应 """ if not results: return "未获取到有效结果" - + # 获取整合模式 merge_mode = self.config.execution_config.get("result_merge_mode", "smart") - + if merge_mode == "master": # 使用 Master Agent 整合 return await self._master_merge_results(results, "decomposition", original_question) @@ -2115,9 +2118,9 @@ class MultiAgentOrchestrator: message = result.get("result", {}).get("message", "") if message: parts.append(message) - + return "\n\n".join(parts) - + async def _merge_collaboration_results( self, results: List[Dict[str, Any]], @@ -2125,12 +2128,12 @@ class MultiAgentOrchestrator: original_question: str = None ) -> str: """整合协作结果(智能去重和合并) - + Args: results: 协作结果列表 strategy: 协作策略 original_question: 原始用户问题 - + Returns: 整合后的响应 """ @@ -2143,32 +2146,32 @@ class MultiAgentOrchestrator: } ) return "协作执行失败,没有可用结果" - + # 获取整合策略配置 merge_mode = self.config.execution_config.get("result_merge_mode", "smart") - + if merge_mode == "master": # Master Agent 整合:让 Master Agent 结合原始问题和子 Agent 答案生成最终回复 return await self._master_merge_results(results, strategy, original_question) else: # 默认使用智能整合 return self._smart_merge_results(results, strategy) - + def _smart_merge_results( self, results: List[Dict[str, Any]], strategy: str ) -> str: """智能整合结果(去重、提取关键信息) - + 适用场景:多个 Agent 回答相似问题,需要去重和优化 - + 注意:在流式场景下,用户已经看到了所有 Agent 的输出, 这个方法主要用于生成一个"整合后的版本"供后续使用(如保存到数据库) """ if not results: return "" - + # 提取所有消息 messages = [] for result in results: @@ -2177,57 +2180,57 @@ class MultiAgentOrchestrator: message = result.get("result", {}).get("message", "") if message: messages.append(message) - + if not messages: return "" - + if len(messages) == 1: # 只有一个结果,直接返回 return messages[0] - + # 多个结果:根据策略智能整合 if strategy == "decomposition": # 问题拆分:用户已经看到所有子问题的答案了 # 返回空字符串,表示不需要额外的整合输出 return "" - + elif strategy == "sequential": # 顺序协作:返回最后一个 Agent 的结果(它包含了前面的信息) return self._merge_sequential_smart(results) - + elif strategy == "parallel": # 并行协作:检查是否需要去重 return self._merge_parallel_smart(results) - + elif strategy == "hierarchical": # 层级协作:只返回主 Agent 的结果 return self._merge_hierarchical_smart(results) - + else: # 默认:返回最完整的一个 return max(messages, key=len) - + def _merge_sequential_smart(self, results: List[Dict[str, Any]]) -> str: """智能整合顺序协作结果 - + 顺序协作的特点:后续 Agent 会引用前面的结果 策略:只保留最后一个 Agent 的完整回答(它已经包含了前面的信息) """ if not results: return "" - + # 获取最后一个成功的结果 for result in reversed(results): if "error" not in result: message = result.get("result", {}).get("message", "") if message: return message - + return "未获取到有效结果" - + def _merge_parallel_smart(self, results: List[Dict[str, Any]]) -> str: """智能整合并行协作结果 - + 并行协作的特点:多个独立观点 策略: 1. 如果回答高度相似(重复),只保留一个 @@ -2240,16 +2243,16 @@ class MultiAgentOrchestrator: message = result.get("result", {}).get("message", "") if message: messages.append(message) - + if not messages: return "未获取到有效结果" - + if len(messages) == 1: return messages[0] - + # 检查相似度 similarity = self._calculate_similarity(messages) - + if similarity > 0.7: # 高度相似,只返回最长的一个 return max(messages, key=len) @@ -2257,10 +2260,10 @@ class MultiAgentOrchestrator: # 不同观点,合并(不显示 Agent 名称) # 使用分隔符区分不同部分 return "\n\n---\n\n".join(messages) - + def _merge_hierarchical_smart(self, results: List[Dict[str, Any]]) -> str: """智能整合层级协作结果 - + 层级协作的特点:主 Agent 已经综合了辅助 Agent 的意见 策略:只返回主 Agent 的结果 """ @@ -2270,14 +2273,14 @@ class MultiAgentOrchestrator: message = result.get("result", {}).get("message", "") if message: return message - + # 如果没有找到主 Agent,返回最后一个 if results: last_result = results[-1] return last_result.get("result", {}).get("message", "") - + return "未获取到有效结果" - + async def _master_merge_results( self, results: List[Dict[str, Any]], @@ -2285,32 +2288,32 @@ class MultiAgentOrchestrator: original_question: str = None ) -> str: """使用 Master Agent 整合多个子 Agent 的结果 - + Args: results: 子 Agent 的响应结果列表 strategy: 协作策略 original_question: 原始用户问题 - + Returns: Master Agent 整合后的最终回复 """ if not results: return "没有收到任何 Agent 的响应" - + if len(results) == 1: # 只有一个结果,直接返回 return results[0].get('result', {}).get('message', '') - + # 构建子 Agent 回答的汇总 agent_responses = [] for i, result in enumerate(results, 1): if "error" in result: continue - + agent_name = result.get('agent_name', f'Agent {i}') task = result.get('task', '') message = result.get('result', {}).get('message', '') - + if message: response_info = { 'agent_name': agent_name, @@ -2318,22 +2321,22 @@ class MultiAgentOrchestrator: 'response': message } agent_responses.append(response_info) - + if not agent_responses: return "未获取到有效结果" - + # 构建 Master Agent 的整合 prompt responses_text = "" for resp in agent_responses: agent_name = resp['agent_name'] task = resp['task'] response = resp['response'] - + if task: responses_text += f"\n### {agent_name}(任务:{task})的回答:\n{response}\n" else: responses_text += f"\n### {agent_name} 的回答:\n{response}\n" - + # 根据策略调整整合指令 strategy_instructions = { "decomposition": "这些是针对不同子问题的回答,请将它们整合成一个完整、连贯的答案。", @@ -2341,11 +2344,11 @@ class MultiAgentOrchestrator: "parallel": "这些是从不同角度并行分析的结果,请综合这些观点给出全面的答案。", "hierarchical": "这些是层级协作的结果,请综合各方意见给出最终答案。" } - + strategy_instruction = strategy_instructions.get(strategy, "请整合这些回答,生成统一的最终答案。") - + question_context = f"\n**原始问题**:{original_question}\n" if original_question else "" - + merge_prompt = f"""你是一个智能助手,现在需要整合多个专业 Agent 的回答,生成一个统一、连贯、完整的最终答案。 {question_context} **各个专业 Agent 的回答**: @@ -2362,29 +2365,29 @@ class MultiAgentOrchestrator: 5. 直接给出整合后的答案,不要添加"根据以上回答"等元信息 请生成最终的整合答案:""" - + try: # 调用 Master Agent 的 LLM 进行整合 from app.core.models import RedBearLLM from app.core.models.base import RedBearModelConfig from app.models import ModelApiKey, ModelType - + # 获取 Master Agent 的模型配置 master_agent_release = self.config.master_agent_release if not master_agent_release: logger.warning("没有配置 Master Agent,使用简单整合") return self._smart_merge_results(results, strategy) - + # 获取 API Key 配置 api_key_config = self.db.query(ModelApiKey).filter( ModelApiKey.model_config_id == master_agent_release.default_model_config_id, ModelApiKey.is_active == True ).first() - + if not api_key_config: logger.warning("Master Agent 没有可用的 API Key,使用简单整合") return self._smart_merge_results(results, strategy) - + logger.info( "使用 Master Agent 整合结果", extra={ @@ -2393,7 +2396,7 @@ class MultiAgentOrchestrator: "has_original_question": bool(original_question) } ) - + # 创建 RedBearModelConfig model_config = RedBearModelConfig( model_name=api_key_config.model_name, @@ -2403,33 +2406,33 @@ class MultiAgentOrchestrator: temperature=0.7, # 整合任务使用中等温度 max_tokens=2000 ) - + # 创建 LLM 实例 llm = RedBearLLM(model_config, type=ModelType.CHAT) - + # 调用模型进行整合 response = await llm.ainvoke(merge_prompt) - + # 提取响应内容 if hasattr(response, 'content'): merged_response = response.content else: merged_response = str(response) - + logger.info( "Master Agent 整合完成", extra={ "merged_length": len(merged_response) } ) - + return merged_response - + except Exception as e: logger.error(f"Master Agent 整合失败: {str(e)}") # 降级到智能整合 return self._smart_merge_results(results, strategy) - + async def _master_merge_results_stream( self, results: List[Dict[str, Any]], @@ -2437,37 +2440,37 @@ class MultiAgentOrchestrator: original_question: str = None ): """使用 Master Agent 流式整合多个子 Agent 的结果 - + Args: results: 子 Agent 的响应结果列表 strategy: 协作策略 original_question: 原始用户问题 - + Yields: SSE 格式的事件流 """ if not results: yield self._format_sse_event("message", {"content": "没有收到任何 Agent 的响应"}) return - + if len(results) == 1: # 只有一个结果,直接返回 yield self._format_sse_event("message", { "content": results[0].get('result', {}).get('message', '') }) return - + # 构建子 Agent 回答的汇总(与非流式版本相同) agent_responses = [] for i, result in enumerate(results, 1): if "error" in result: continue - + agent_name = result.get('agent_name', f'Agent {i}') task = result.get('task', '') sub_question = result.get('sub_question', '') message = result.get('result', {}).get('message', '') - + if message: response_info = { 'agent_name': agent_name, @@ -2475,33 +2478,33 @@ class MultiAgentOrchestrator: 'response': message } agent_responses.append(response_info) - + if not agent_responses: yield self._format_sse_event("message", {"content": "未获取到有效结果"}) return - + # 构建整合 prompt responses_text = "" for resp in agent_responses: agent_name = resp['agent_name'] task = resp['task'] response = resp['response'] - + if task: responses_text += f"\n### {agent_name}(任务:{task})的回答:\n{response}\n" else: responses_text += f"\n### {agent_name} 的回答:\n{response}\n" - + strategy_instructions = { "decomposition": "这些是针对不同子问题的回答,请将它们整合成一个完整、连贯的答案。", "sequential": "这些是按顺序协作的结果,后面的 Agent 可能依赖前面的结果,请整合成最终答案。", "parallel": "这些是从不同角度并行分析的结果,请综合这些观点给出全面的答案。", "hierarchical": "这些是层级协作的结果,请综合各方意见给出最终答案。" } - + strategy_instruction = strategy_instructions.get(strategy, "请整合这些回答,生成统一的最终答案。") question_context = f"\n**原始问题**:{original_question}\n" if original_question else "" - + merge_prompt = f"""你是一个智能助手,现在需要整合多个专业 Agent 的回答,生成一个统一、连贯、完整的最终答案。 {question_context} **各个专业 Agent 的回答**: @@ -2518,12 +2521,12 @@ class MultiAgentOrchestrator: 5. 直接给出整合后的答案,不要添加"根据以上回答"等元信息 请生成最终的整合答案:""" - + try: from app.core.models import RedBearLLM from app.core.models.base import RedBearModelConfig from app.models import ModelApiKey, ModelType - + # 获取 Master Agent 的模型配置 master_agent_release = self.config.master_agent_release if not master_agent_release: @@ -2531,19 +2534,19 @@ class MultiAgentOrchestrator: final_response = self._smart_merge_results(results, strategy) yield self._format_sse_event("message", {"content": final_response}) return - + # 获取 API Key 配置 api_key_config = self.db.query(ModelApiKey).filter( ModelApiKey.model_config_id == master_agent_release.default_model_config_id, ModelApiKey.is_active == True ).first() - + if not api_key_config: logger.warning("Master Agent 没有可用的 API Key,使用简单整合") final_response = self._smart_merge_results(results, strategy) yield self._format_sse_event("message", {"content": final_response}) return - + logger.info( "开始 Master Agent 流式整合", extra={ @@ -2551,7 +2554,7 @@ class MultiAgentOrchestrator: "strategy": strategy } ) - + # 创建 RedBearModelConfig(启用流式) model_config = RedBearModelConfig( model_name=api_key_config.model_name, @@ -2562,26 +2565,26 @@ class MultiAgentOrchestrator: max_tokens=2000, extra_params={"streaming": True} # 启用流式输出 ) - + # 创建 LLM 实例 llm = RedBearLLM(model_config, type=ModelType.CHAT) - + logger.info("开始流式调用 Master Agent LLM") - + # 流式调用模型进行整合 try: chunk_count = 0 logger.debug(f"开始流式调用,provider={api_key_config.provider}") - + # 获取底层模型 underlying_model = llm._model if hasattr(llm, '_model') else llm logger.debug(f"底层模型类型: {type(underlying_model).__name__}") - + # 使用底层模型的 astream 方法直接流式输出 # 这样可以绕过可能的包装器累积问题 async for chunk in underlying_model.astream(merge_prompt): chunk_count += 1 - + # 提取内容 if hasattr(chunk, 'content'): content = chunk.content @@ -2589,14 +2592,14 @@ class MultiAgentOrchestrator: content = chunk else: content = str(chunk) - + if content: if chunk_count <= 5: logger.debug(f"收到流式 chunk #{chunk_count}: {content[:30]}...") yield self._format_sse_event("message", {"content": content}) - + logger.info(f"Master Agent 流式整合完成,共 {chunk_count} 个 chunks") - + except AttributeError as e: # 如果底层模型不支持流式,降级到非流式 logger.warning(f"底层模型不支持流式,降级到非流式: {str(e)}") @@ -2606,43 +2609,43 @@ class MultiAgentOrchestrator: else: content = str(response) yield self._format_sse_event("message", {"content": content}) - + except Exception as e: logger.error(f"Master Agent 流式整合失败: {str(e)}") # 降级到智能整合 final_response = self._smart_merge_results(results, strategy) yield self._format_sse_event("message", {"content": final_response}) - + def _should_merge_results( self, results: List[Dict[str, Any]], strategy: str ) -> bool: """判断是否需要整合结果 - + Args: results: Agent 执行结果 strategy: 协作策略 - + Returns: True 如果需要整合,False 如果不需要 """ if not results or len(results) == 1: # 没有结果或只有一个结果,不需要整合 return False - + if strategy == "decomposition": # 问题拆分:每个子问题独立,用户已经看到所有答案 # 通常不需要整合(除非配置要求) return self.config.execution_config.get("force_merge_decomposition", False) - + if strategy == "hierarchical": # 层级协作:主 Agent 已经整合了,不需要再整合 return False - + # sequential 和 parallel 模式:可能需要整合去重 return True - + async def _parallel_stream_agents( self, agent_tasks: List[Tuple[str, str, Any, str, Dict[str, Any]]], @@ -2650,12 +2653,12 @@ class MultiAgentOrchestrator: user_id: Optional[str] ) -> AsyncIterator[Tuple[str, str, str, str]]: """并行流式执行多个 Agent,实时返回结果 - + Args: agent_tasks: [(agent_id, agent_name, agent_config, message, context), ...] conversation_id: 会话 ID user_id: 用户 ID - + Yields: (agent_id, agent_name, event_type, content) 元组 """ @@ -2676,29 +2679,29 @@ class MultiAgentOrchestrator: import json data_line = event.split("data: ", 1)[1].strip() data = json.loads(data_line) - + if "content" in data: yield (agent_id, agent_name, "content", data["content"]) except: pass - + # 发送完成信号 yield (agent_id, agent_name, "done", "") - + except Exception as e: logger.error(f"Agent {agent_name} 流式执行失败: {str(e)}") yield (agent_id, agent_name, "error", str(e)) - + # 创建所有 Agent 的流式任务 streams = [] for agent_id, agent_name, agent_config, message, context in agent_tasks: stream = stream_single_agent(agent_id, agent_name, agent_config, message, context) streams.append(stream) - + # 使用队列来合并多个异步流 queue = asyncio.Queue() active_streams = len(streams) - + async def consume_stream(stream, stream_id): """消费单个流并放入队列""" nonlocal active_streams @@ -2709,53 +2712,53 @@ class MultiAgentOrchestrator: active_streams -= 1 if active_streams == 0: await queue.put(None) # 所有流都完成了 - + # 启动所有流的消费任务 tasks = [ asyncio.create_task(consume_stream(stream, i)) for i, stream in enumerate(streams) ] - + # 从队列中读取并 yield while True: item = await queue.get() if item is None: # 所有流都完成 break yield item - + # 等待所有任务完成 await asyncio.gather(*tasks, return_exceptions=True) - + def _calculate_similarity(self, messages: List[str]) -> float: """计算消息相似度(简化版) - + Args: messages: 消息列表 - + Returns: 相似度 (0-1) """ if len(messages) < 2: return 0.0 - + # 简化版:比较长度和关键词 # 实际应用中可以使用更复杂的算法(如编辑距离、余弦相似度等) - + # 计算平均长度 avg_length = sum(len(m) for m in messages) / len(messages) - + # 如果长度差异很大,认为不相似 length_variance = sum(abs(len(m) - avg_length) for m in messages) / len(messages) if length_variance > avg_length * 0.5: return 0.3 - + # 提取关键词(简化:取前50个字符) keywords = [m[:50] for m in messages] - + # 计算重复度 unique_keywords = len(set(keywords)) total_keywords = len(keywords) - + similarity = 1.0 - (unique_keywords / total_keywords) - + return similarity diff --git a/api/app/services/multi_agent_service.py b/api/app/services/multi_agent_service.py index fd4bbea2..f4bc1e40 100644 --- a/api/app/services/multi_agent_service.py +++ b/api/app/services/multi_agent_service.py @@ -1,15 +1,19 @@ """多 Agent 配置管理服务""" import uuid -from typing import Optional, List, Tuple, Any +from typing import Optional, List, Tuple, Any, Annotated + +from fastapi import Depends from sqlalchemy.orm import Session from sqlalchemy import select, desc +from app.db import get_db from app.models import MultiAgentConfig, App, AgentConfig from app.schemas.multi_agent_schema import ( MultiAgentConfigCreate, MultiAgentConfigUpdate, MultiAgentRunRequest ) +from app.services.model_service import ModelApiKeyService from app.services.multi_agent_orchestrator import MultiAgentOrchestrator from app.core.exceptions import ResourceNotFoundException, BusinessException from app.core.error_codes import BizCode @@ -21,10 +25,10 @@ logger = get_business_logger() def convert_uuids_to_str(obj: Any) -> Any: """递归转换对象中的所有 UUID 为字符串 - + Args: obj: 要转换的对象(dict, list, UUID 等) - + Returns: 转换后的对象 """ @@ -40,10 +44,10 @@ def convert_uuids_to_str(obj: Any) -> Any: class MultiAgentService: """多 Agent 配置管理服务""" - + def __init__(self, db: Session): self.db = db - + def create_config( self, app_id: uuid.UUID, @@ -51,12 +55,12 @@ class MultiAgentService: created_by: uuid.UUID ) -> MultiAgentConfig: """创建多 Agent 配置 - + Args: app_id: 应用 ID data: 配置数据 created_by: 创建者 ID - + Returns: 多 Agent 配置 """ @@ -64,7 +68,7 @@ class MultiAgentService: app = self.db.get(App, app_id) if not app: raise ResourceNotFoundException("应用", str(app_id)) - + # 2. 检查是否已有有效配置 existing = self.db.scalars( select(MultiAgentConfig) @@ -76,22 +80,22 @@ class MultiAgentService: ).first() if existing: raise BusinessException("应用已有多 Agent 配置", BizCode.DUPLICATE_RESOURCE) - + # 3. 验证主 Agent 存在 master_agent = self.db.get(AgentConfig, data.master_agent_id) if not master_agent: raise ResourceNotFoundException("主 Agent", str(data.master_agent_id)) - + # 4. 验证子 Agent 存在 for sub_agent in data.sub_agents: agent = self.db.get(AgentConfig, sub_agent.agent_id) if not agent: raise ResourceNotFoundException("子 Agent", str(sub_agent.agent_id)) - + # 5. 创建配置(转换 UUID 为字符串以支持 JSON 序列化) sub_agents_data = [convert_uuids_to_str(sub_agent.model_dump()) for sub_agent in data.sub_agents] routing_rules_data = [convert_uuids_to_str(rule.model_dump()) for rule in data.routing_rules] if data.routing_rules else None - + # 处理 execution_config(可能是 None、字典或 Pydantic 模型) if data.execution_config is None: execution_config_data = {} @@ -99,7 +103,7 @@ class MultiAgentService: execution_config_data = convert_uuids_to_str(data.execution_config) else: execution_config_data = convert_uuids_to_str(data.execution_config.model_dump()) - + config = MultiAgentConfig( app_id=app_id, master_agent_id=data.master_agent_id, @@ -110,11 +114,11 @@ class MultiAgentService: execution_config=execution_config_data, aggregation_strategy=data.aggregation_strategy ) - + self.db.add(config) self.db.commit() self.db.refresh(config) - + logger.info( "创建多 Agent 配置成功", extra={ @@ -124,15 +128,15 @@ class MultiAgentService: "sub_agent_count": len(data.sub_agents) } ) - + return config - + def get_config(self, app_id: uuid.UUID) -> Optional[MultiAgentConfig]: """获取多 Agent 配置 - + Args: app_id: 应用 ID - + Returns: 多 Agent 配置,如果不存在返回 None """ @@ -144,24 +148,25 @@ class MultiAgentService: ) .order_by(MultiAgentConfig.updated_at.desc()) ).first() - + def get_multi_agent_configs(self, app_id: uuid.UUID) -> Optional[dict]: """通过 app_id 获取最新有效的多智能体配置,并将 agent_id 转换为 app_id - + Args: app_id: 应用 ID - + Returns: 转换后的配置字典,如果不存在返回 None """ config = self.get_config(app_id) if not config: return None - - # 转换 master_agent_id (release_id) 为 app_id - master_release = self.db.get(AppRelease, config.master_agent_id) - master_app_id = master_release.app_id if master_release else config.master_agent_id - + + #兼容代码 + if not config.default_model_config_id: + master_release = self.db.get(AppRelease, config.master_agent_id) + config.default_model_config_id = master_release.default_model_config_id if master_release else None + # 转换 sub_agents 中的 agent_id (release_id) 为 app_id converted_sub_agents = [] for sub_agent in config.sub_agents: @@ -176,13 +181,13 @@ class MultiAgentService: except Exception as e: logger.warning(f"转换 sub_agent agent_id 失败: {release_id}, 错误: {str(e)}") converted_sub_agents.append(sub_agent_copy) - + # 构建返回的配置字典 return { "id": config.id, "app_id": config.app_id, - "master_agent_id": master_app_id, - "master_agent_name": config.master_agent_name, + "default_model_config_id": config.default_model_config_id, + "model_parameters": config.model_parameters, "orchestration_mode": config.orchestration_mode, "sub_agents": converted_sub_agents, "routing_rules": config.routing_rules, @@ -192,133 +197,134 @@ class MultiAgentService: "created_at": config.created_at, "updated_at": config.updated_at } - + def get_published_config_by_agent_id(self, agent_id: uuid.UUID) -> Optional[dict]: """通过 agent_id 获取当前发布版本的完整配置 - + Args: agent_id: Agent 配置 ID - + Returns: 当前发布版本的配置字典,如果没有发布版本则返回 None """ from app.models import AppRelease - + # 查询 Agent 配置 agent_config = self.db.get(AgentConfig, agent_id) if not agent_config: logger.warning(f"Agent 配置不存在: {agent_id}") return None - + # 获取关联的应用 app = self.db.get(App, agent_config.app_id) if not app or not app.current_release_id: logger.warning(f"应用未发布或不存在: app_id={agent_config.app_id}") return None - + # 获取当前发布版本 release = self.db.get(AppRelease, app.current_release_id) if not release: logger.warning(f"发布版本不存在: release_id={app.current_release_id}") return None - + # 从发布版本的 config 中获取完整配置 # config 是一个 JSON 对象,包含了发布时的配置快照 config_data = release.config if config_data and isinstance(config_data, dict): return config_data - + return None def get_published_by_agent_id(self, agent_id: uuid.UUID) -> Optional[AppRelease]: """通过 agent_id 获取当前发布版本的完整配置 - + Args: agent_id: Agent 配置 ID - + Returns: 当前发布版本的配置字典,如果没有发布版本则返回 None """ - + # 获取关联的应用 app = self.db.get(App, agent_id) if not app or not app.current_release_id: logger.warning(f"应用未发布或不存在: app_id={agent_id}") return None - + # 获取当前发布版本 release = self.db.get(AppRelease, app.current_release_id) if not release: logger.warning(f"发布版本不存在: release_id={app.current_release_id}") return None return release - - def update_config( - self, - app_id: uuid.UUID, - data: MultiAgentConfigUpdate - ) -> MultiAgentConfig: - """更新多 Agent 配置 - - Args: - app_id: 应用 ID - data: 更新数据 - - Returns: - 更新后的配置 - """ - config = self.get_config(app_id) - if not config: - # 1. 验证应用存在 - app = self.db.get(App, app_id) - if not app: - raise ResourceNotFoundException("应用", str(app_id)) - - # 2. 验证主 Agent 存在并获取发布版本 ID - master_app_release = self.get_published_by_agent_id(data.master_agent_id) - if not master_app_release: - raise ResourceNotFoundException("主 Agent 未发布或不存在", str(data.master_agent_id)) - - # 使用发布版本 ID - data.master_agent_id = master_app_release.id - # 3. 验证子 Agent 存在并获取发布版本 ID - for sub_agent in data.sub_agents: - agent_app_release = self.get_published_by_agent_id(sub_agent.agent_id) - if not agent_app_release: - raise ResourceNotFoundException("子 Agent 未发布或不存在", str(sub_agent.agent_id)) - + def check_config_data(self,app_id: uuid.UUID, data: MultiAgentConfigUpdate) -> MultiAgentConfig: + # 1. 验证应用存在 + app = self.db.get(App, app_id) + if not app: + raise ResourceNotFoundException("应用", str(app_id)) + + # 2. 验证模型配置 + model_api_key = ModelApiKeyService.get_a_api_key(self.db,data.default_model_config_id) + if not model_api_key: + raise ResourceNotFoundException("模型配置", str(data.default_model_config_id)) + + # 3. 验证子 Agent 存在并获取发布版本 ID + for sub_agent in data.sub_agents: + agent_app_release = self.get_published_by_agent_id(sub_agent.agent_id) + if not agent_app_release: + raise ResourceNotFoundException("子 Agent 未发布或不存在", str(sub_agent.agent_id)) + # 使用发布版本 ID - sub_agent.agent_id = agent_app_release.id - - - # 5. 创建配置(转换 UUID 为字符串以支持 JSON 序列化) - sub_agents_data = [convert_uuids_to_str(sub_agent.model_dump()) for sub_agent in data.sub_agents] - # routing_rules_data = [convert_uuids_to_str(rule.model_dump()) for rule in data.routing_rules] if data.routing_rules else None - - # 处理 execution_config(可能是 None、字典或 Pydantic 模型) - if data.execution_config is None: - execution_config_data = {} - elif isinstance(data.execution_config, dict): + sub_agent.agent_id = agent_app_release.id + + # 5. 创建配置(转换 UUID 为字符串以支持 JSON 序列化) + sub_agents_data = [convert_uuids_to_str(sub_agent.model_dump()) for sub_agent in data.sub_agents] + # routing_rules_data = [convert_uuids_to_str(rule.model_dump()) for rule in data.routing_rules] if data.routing_rules else None + + # 处理 execution_config(可能是 None、字典或 Pydantic 模型) + if data.execution_config is None: + execution_config_data = {} + elif isinstance(data.execution_config, dict): execution_config_data = convert_uuids_to_str(data.execution_config) - else: - execution_config_data = convert_uuids_to_str(data.execution_config.model_dump()) - - config = MultiAgentConfig( + else: + execution_config_data = convert_uuids_to_str(data.execution_config.model_dump()) + + config = MultiAgentConfig( app_id=app_id, master_agent_id=data.master_agent_id, master_agent_name=data.master_agent_name, + default_model_config_id=data.default_model_config_id, + model_parameters=data.model_parameters, orchestration_mode=data.orchestration_mode, sub_agents=sub_agents_data, # routing_rules=routing_rules_data, execution_config=execution_config_data, aggregation_strategy=data.aggregation_strategy ) - + return config + + def update_config( + self, + app_id: uuid.UUID, + data: MultiAgentConfigUpdate + ) -> MultiAgentConfig: + """更新多 Agent 配置 + + Args: + app_id: 应用 ID + data: 更新数据 + + Returns: + 更新后的配置 + """ + config = self.get_config(app_id) + newConfig = self.check_config_data(app_id, data) + if not config: + config = newConfig self.db.add(config) self.db.commit() self.db.refresh(config) - logger.info( "创建多 Agent 配置成功", extra={ @@ -329,56 +335,17 @@ class MultiAgentService: } ) return config - # raise ResourceNotFoundException("多 Agent 配置", str(app_id)) - - # 更新字段 - if data.master_agent_id is not None: - # 验证主 Agent 存在 - # 3. 验证主 Agent 存在并获取发布配置 - master_app_release = self.get_published_by_agent_id(data.master_agent_id) - if not master_app_release: - raise ResourceNotFoundException("主 Agent 未发布或", str(data.master_agent_id)) - config.master_agent_id = master_app_release.id - - if data.master_agent_name is not None: - config.master_agent_name = data.master_agent_name - - if data.orchestration_mode is not None: - config.orchestration_mode = data.orchestration_mode - - if data.sub_agents is not None: - # 验证子 Agent 存在,并获取其发布的 config_id - updated_sub_agents = [] - for sub_agent in data.sub_agents: - agent_app_release = self.get_published_by_agent_id(sub_agent.agent_id) - if not agent_app_release: - raise ResourceNotFoundException("子 Agent 未发布或", str(sub_agent.agent_id)) - sub_agent.agent_id = agent_app_release.id - sub_agent_dict = convert_uuids_to_str(sub_agent.model_dump()) - updated_sub_agents.append(sub_agent_dict) - - config.sub_agents = updated_sub_agents - - # if data.routing_rules is not None: - # config.routing_rules = [convert_uuids_to_str(rule.model_dump()) for rule in data.routing_rules] if data.routing_rules else None - - if data.execution_config is not None: - if isinstance(data.execution_config, dict): - execution_config_data = convert_uuids_to_str(data.execution_config) - else: - execution_config_data = convert_uuids_to_str(data.execution_config.model_dump()) - config.execution_config = execution_config_data - - if data.aggregation_strategy is not None: - config.aggregation_strategy = data.aggregation_strategy - - if data.is_active is not None: - config.is_active = data.is_active - + config.default_model_config_id = newConfig.default_model_config_id + config.model_parameters = newConfig.model_parameters + config.orchestration_mode = newConfig.orchestration_mode + config.sub_agents = newConfig.sub_agents + config.routing_rules = newConfig.routing_rules + config.execution_config = newConfig.execution_config + config.aggregation_strategy = newConfig.aggregation_strategy self.db.commit() self.db.refresh(config) - + logger.info( "更新多 Agent 配置成功", extra={ @@ -386,23 +353,23 @@ class MultiAgentService: "app_id": str(app_id) } ) - + return config - + def delete_config(self, app_id: uuid.UUID) -> None: """删除多 Agent 配置 - + Args: app_id: 应用 ID """ config = self.get_config(app_id) if not config: raise ResourceNotFoundException("多 Agent 配置", str(app_id)) - + # 逻辑删除多 Agent 配置 config.is_active = False self.db.commit() - + logger.info( "删除多 Agent 配置成功", extra={ @@ -410,18 +377,18 @@ class MultiAgentService: "app_id": str(app_id) } ) - + async def run( self, app_id: uuid.UUID, request: MultiAgentRunRequest ) -> dict: """运行多 Agent 任务 - + Args: app_id: 应用 ID request: 运行请求 - + Returns: 执行结果 """ @@ -429,13 +396,13 @@ class MultiAgentService: config = self.get_config(app_id) if not config: raise ResourceNotFoundException("多 Agent 配置", str(app_id)) - + if not config.is_active: raise BusinessException("多 Agent 配置已禁用", BizCode.RESOURCE_DISABLED) - + # 2. 创建编排器 orchestrator = MultiAgentOrchestrator(self.db, config) - + # 3. 执行任务 result = await orchestrator.execute( message=request.message, @@ -446,9 +413,9 @@ class MultiAgentService: web_search=getattr(request, 'web_search', False), # 网络搜索参数 memory=getattr(request, 'memory', True) # 记忆功能参数 ) - + return result - + async def run_stream( self, app_id: uuid.UUID, @@ -457,11 +424,11 @@ class MultiAgentService: user_rag_memory_id :str ): """运行多 Agent 任务(流式返回) - + Args: app_id: 应用 ID request: 运行请求 - + Yields: SSE 格式的事件流 """ @@ -469,13 +436,13 @@ class MultiAgentService: config = self.get_config(app_id) if not config: raise ResourceNotFoundException("多 Agent 配置", str(app_id)) - + if not config.is_active: raise BusinessException("多 Agent 配置已禁用", BizCode.RESOURCE_DISABLED) - + # 2. 创建编排器 orchestrator = MultiAgentOrchestrator(self.db, config) - + # 3. 流式执行任务 async for event in orchestrator.execute_stream( message=request.message, @@ -489,113 +456,113 @@ class MultiAgentService: user_rag_memory_id=user_rag_memory_id ): yield event - - def add_sub_agent( - self, - app_id: uuid.UUID, - agent_id: uuid.UUID, - name: str, - role: Optional[str] = None, - priority: int = 1, - capabilities: Optional[List[str]] = None - ) -> MultiAgentConfig: - """添加子 Agent - - Args: - app_id: 应用 ID - agent_id: Agent ID - name: Agent 名称 - role: 角色描述 - priority: 优先级 - capabilities: 能力列表 - - Returns: - 更新后的配置 - """ - config = self.get_config(app_id) - if not config: - raise ResourceNotFoundException("多 Agent 配置", str(app_id)) - - # 验证 Agent 存在 - agent = self.db.get(AgentConfig, agent_id) - if not agent: - raise ResourceNotFoundException("Agent", str(agent_id)) - - # 检查是否已存在 - for sub_agent in config.sub_agents: - if sub_agent["agent_id"] == str(agent_id): - raise BusinessException("Agent 已存在于配置中", BizCode.DUPLICATE_RESOURCE) - - # 添加子 Agent - new_sub_agent = { - "agent_id": str(agent_id), - "name": name, - "role": role, - "priority": priority, - "capabilities": capabilities or [] - } - - config.sub_agents.append(new_sub_agent) - - # 标记为已修改 - self.db.add(config) - self.db.commit() - self.db.refresh(config) - - logger.info( - "添加子 Agent 成功", - extra={ - "config_id": str(config.id), - "agent_id": str(agent_id), - "agent_name": name - } - ) - - return config - - def remove_sub_agent( - self, - app_id: uuid.UUID, - agent_id: uuid.UUID - ) -> MultiAgentConfig: - """移除子 Agent - - Args: - app_id: 应用 ID - agent_id: Agent ID - - Returns: - 更新后的配置 - """ - config = self.get_config(app_id) - if not config: - raise ResourceNotFoundException("多 Agent 配置", str(app_id)) - - # 查找并移除 - original_count = len(config.sub_agents) - config.sub_agents = [ - sub_agent for sub_agent in config.sub_agents - if sub_agent["agent_id"] != str(agent_id) - ] - - if len(config.sub_agents) == original_count: - raise ResourceNotFoundException("子 Agent", str(agent_id)) - - # 标记为已修改 - self.db.add(config) - self.db.commit() - self.db.refresh(config) - - logger.info( - "移除子 Agent 成功", - extra={ - "config_id": str(config.id), - "agent_id": str(agent_id) - } - ) - - return config - + + # def add_sub_agent( + # self, + # app_id: uuid.UUID, + # agent_id: uuid.UUID, + # name: str, + # role: Optional[str] = None, + # priority: int = 1, + # capabilities: Optional[List[str]] = None + # ) -> MultiAgentConfig: + # """添加子 Agent + + # Args: + # app_id: 应用 ID + # agent_id: Agent ID + # name: Agent 名称 + # role: 角色描述 + # priority: 优先级 + # capabilities: 能力列表 + + # Returns: + # 更新后的配置 + # """ + # config = self.get_config(app_id) + # if not config: + # raise ResourceNotFoundException("多 Agent 配置", str(app_id)) + + # # 验证 Agent 存在 + # agent = self.db.get(AgentConfig, agent_id) + # if not agent: + # raise ResourceNotFoundException("Agent", str(agent_id)) + + # # 检查是否已存在 + # for sub_agent in config.sub_agents: + # if sub_agent["agent_id"] == str(agent_id): + # raise BusinessException("Agent 已存在于配置中", BizCode.DUPLICATE_RESOURCE) + + # # 添加子 Agent + # new_sub_agent = { + # "agent_id": str(agent_id), + # "name": name, + # "role": role, + # "priority": priority, + # "capabilities": capabilities or [] + # } + + # config.sub_agents.append(new_sub_agent) + + # # 标记为已修改 + # self.db.add(config) + # self.db.commit() + # self.db.refresh(config) + + # logger.info( + # "添加子 Agent 成功", + # extra={ + # "config_id": str(config.id), + # "agent_id": str(agent_id), + # "agent_name": name + # } + # ) + + # return config + + # def remove_sub_agent( + # self, + # app_id: uuid.UUID, + # agent_id: uuid.UUID + # ) -> MultiAgentConfig: + # """移除子 Agent + + # Args: + # app_id: 应用 ID + # agent_id: Agent ID + + # Returns: + # 更新后的配置 + # """ + # config = self.get_config(app_id) + # if not config: + # raise ResourceNotFoundException("多 Agent 配置", str(app_id)) + + # # 查找并移除 + # original_count = len(config.sub_agents) + # config.sub_agents = [ + # sub_agent for sub_agent in config.sub_agents + # if sub_agent["agent_id"] != str(agent_id) + # ] + + # if len(config.sub_agents) == original_count: + # raise ResourceNotFoundException("子 Agent", str(agent_id)) + + # # 标记为已修改 + # self.db.add(config) + # self.db.commit() + # self.db.refresh(config) + + # logger.info( + # "移除子 Agent 成功", + # extra={ + # "config_id": str(config.id), + # "agent_id": str(agent_id) + # } + # ) + + # return config + def list_configs( self, workspace_id: uuid.UUID, @@ -603,12 +570,12 @@ class MultiAgentService: pagesize: int = 20 ) -> Tuple[List[MultiAgentConfig], int]: """列出多 Agent 配置 - + Args: workspace_id: 工作空间 ID page: 页码 pagesize: 每页数量 - + Returns: 配置列表和总数 """ @@ -619,13 +586,21 @@ class MultiAgentService: .where(App.workspace_id == workspace_id) .order_by(desc(MultiAgentConfig.created_at)) ) - + # 总数 count_stmt = stmt.with_only_columns(MultiAgentConfig.id) total = len(self.db.execute(count_stmt).all()) - + # 分页 stmt = stmt.offset((page - 1) * pagesize).limit(pagesize) configs = list(self.db.scalars(stmt).all()) - + return configs, total + +# ==================== 依赖注入函数 ==================== + +def get_multi_agent_service( + db: Annotated[Session, Depends(get_db)] +) -> MultiAgentService: + """获取工作流服务(依赖注入)""" + return MultiAgentService(db)