Files
MemoryBear/api/app/services/multi_agent_service.py
2026-01-05 11:49:01 +08:00

618 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""多 Agent 配置管理服务"""
import uuid
from typing import Optional, List, Tuple, Any, Annotated
from fastapi import Depends
from sqlalchemy.orm import Session
from sqlalchemy import select, desc
from app.db import get_db
from app.models import MultiAgentConfig, App, AgentConfig
from app.schemas.multi_agent_schema import (
MultiAgentConfigCreate,
MultiAgentConfigUpdate,
MultiAgentRunRequest
)
from app.services.model_service import ModelApiKeyService
from app.services.multi_agent_orchestrator import MultiAgentOrchestrator
from app.core.exceptions import ResourceNotFoundException, BusinessException
from app.core.error_codes import BizCode
from app.core.logging_config import get_business_logger
from app.models import AppRelease
logger = get_business_logger()
def convert_uuids_to_str(obj: Any) -> Any:
"""递归转换对象中的所有 UUID 为字符串
Args:
obj: 要转换的对象dict, list, UUID 等)
Returns:
转换后的对象
"""
if isinstance(obj, uuid.UUID):
return str(obj)
elif isinstance(obj, dict):
return {k: convert_uuids_to_str(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [convert_uuids_to_str(item) for item in obj]
else:
return obj
class MultiAgentService:
"""多 Agent 配置管理服务"""
def __init__(self, db: Session):
self.db = db
def create_config(
self,
app_id: uuid.UUID,
data: MultiAgentConfigCreate,
created_by: uuid.UUID
) -> MultiAgentConfig:
"""创建多 Agent 配置
Args:
app_id: 应用 ID
data: 配置数据
created_by: 创建者 ID
Returns:
多 Agent 配置
"""
# 1. 验证应用存在
app = self.db.get(App, app_id)
if not app:
raise ResourceNotFoundException("应用", str(app_id))
# 2. 检查是否已有有效配置
existing = self.db.scalars(
select(MultiAgentConfig)
.where(
MultiAgentConfig.app_id == app_id,
MultiAgentConfig.is_active == True
)
.order_by(MultiAgentConfig.updated_at.desc())
).first()
if existing:
raise BusinessException("应用已有多 Agent 配置", BizCode.DUPLICATE_RESOURCE)
# 3. 验证主 Agent 存在
master_agent = self.db.get(AgentConfig, data.master_agent_id)
if not master_agent:
raise ResourceNotFoundException("主 Agent", str(data.master_agent_id))
# 4. 验证子 Agent 存在
for sub_agent in data.sub_agents:
agent = self.db.get(AgentConfig, sub_agent.agent_id)
if not agent:
raise ResourceNotFoundException("子 Agent", str(sub_agent.agent_id))
# 5. 创建配置(转换 UUID 为字符串以支持 JSON 序列化)
sub_agents_data = [convert_uuids_to_str(sub_agent.model_dump()) for sub_agent in data.sub_agents]
routing_rules_data = [convert_uuids_to_str(rule.model_dump()) for rule in data.routing_rules] if data.routing_rules else None
# 处理 execution_config可能是 None、字典或 Pydantic 模型)
if data.execution_config is None:
execution_config_data = {}
elif isinstance(data.execution_config, dict):
execution_config_data = convert_uuids_to_str(data.execution_config)
else:
execution_config_data = convert_uuids_to_str(data.execution_config.model_dump())
config = MultiAgentConfig(
app_id=app_id,
master_agent_id=data.master_agent_id,
master_agent_name=data.master_agent_name,
orchestration_mode=data.orchestration_mode,
sub_agents=sub_agents_data,
routing_rules=routing_rules_data,
execution_config=execution_config_data,
aggregation_strategy=data.aggregation_strategy
)
self.db.add(config)
self.db.commit()
self.db.refresh(config)
logger.info(
"创建多 Agent 配置成功",
extra={
"config_id": str(config.id),
"app_id": str(app_id),
"mode": data.orchestration_mode,
"sub_agent_count": len(data.sub_agents)
}
)
return config
def get_config(self, app_id: uuid.UUID) -> Optional[MultiAgentConfig]:
"""获取多 Agent 配置
Args:
app_id: 应用 ID
Returns:
多 Agent 配置,如果不存在返回 None
"""
return self.db.scalars(
select(MultiAgentConfig)
.where(
MultiAgentConfig.app_id == app_id,
MultiAgentConfig.is_active == True
)
.order_by(MultiAgentConfig.updated_at.desc())
).first()
def get_multi_agent_configs(self, app_id: uuid.UUID) -> Optional[dict]:
"""通过 app_id 获取最新有效的多智能体配置,并将 agent_id 转换为 app_id
Args:
app_id: 应用 ID
Returns:
转换后的配置字典,如果不存在返回 None
"""
config = self.get_config(app_id)
if not config:
return None
#兼容代码
if not config.default_model_config_id:
master_release = self.db.get(AppRelease, config.master_agent_id)
config.default_model_config_id = master_release.default_model_config_id if master_release else None
# 转换 sub_agents 中的 agent_id (release_id) 为 app_id
converted_sub_agents = []
for sub_agent in config.sub_agents:
sub_agent_copy = sub_agent.copy()
release_id = sub_agent.get("agent_id")
if release_id:
try:
release_id_uuid = uuid.UUID(release_id) if isinstance(release_id, str) else release_id
sub_release = self.db.get(AppRelease, release_id_uuid)
if sub_release:
sub_agent_copy["agent_id"] = str(sub_release.app_id)
except Exception as e:
logger.warning(f"转换 sub_agent agent_id 失败: {release_id}, 错误: {str(e)}")
converted_sub_agents.append(sub_agent_copy)
# 构建返回的配置字典
return {
"id": config.id,
"app_id": config.app_id,
"default_model_config_id": config.default_model_config_id,
"model_parameters": config.model_parameters,
"orchestration_mode": config.orchestration_mode,
"sub_agents": converted_sub_agents,
"routing_rules": config.routing_rules,
"execution_config": config.execution_config,
"aggregation_strategy": config.aggregation_strategy,
"is_active": config.is_active,
"created_at": config.created_at,
"updated_at": config.updated_at
}
def get_published_config_by_agent_id(self, agent_id: uuid.UUID) -> Optional[dict]:
"""通过 agent_id 获取当前发布版本的完整配置
Args:
agent_id: Agent 配置 ID
Returns:
当前发布版本的配置字典,如果没有发布版本则返回 None
"""
from app.models import AppRelease
# 查询 Agent 配置
agent_config = self.db.get(AgentConfig, agent_id)
if not agent_config:
logger.warning(f"Agent 配置不存在: {agent_id}")
return None
# 获取关联的应用
app = self.db.get(App, agent_config.app_id)
if not app or not app.current_release_id:
logger.warning(f"应用未发布或不存在: app_id={agent_config.app_id}")
return None
# 获取当前发布版本
release = self.db.get(AppRelease, app.current_release_id)
if not release:
logger.warning(f"发布版本不存在: release_id={app.current_release_id}")
return None
# 从发布版本的 config 中获取完整配置
# config 是一个 JSON 对象,包含了发布时的配置快照
config_data = release.config
if config_data and isinstance(config_data, dict):
return config_data
return None
def get_published_by_agent_id(self, agent_id: uuid.UUID) -> Optional[AppRelease]:
"""通过 agent_id 获取当前发布版本的完整配置
Args:
agent_id: Agent 配置 ID
Returns:
当前发布版本的配置字典,如果没有发布版本则返回 None
"""
# 获取关联的应用
app = self.db.get(App, agent_id)
if not app or not app.current_release_id:
logger.warning(f"应用未发布或不存在: app_id={agent_id}")
return None
# 获取当前发布版本
release = self.db.get(AppRelease, app.current_release_id)
if not release:
logger.warning(f"发布版本不存在: release_id={app.current_release_id}")
return None
return release
def check_config_data(self,app_id: uuid.UUID, data: MultiAgentConfigUpdate) -> MultiAgentConfig:
# 1. 验证应用存在
app = self.db.get(App, app_id)
if not app:
raise ResourceNotFoundException("应用", str(app_id))
# 2. 验证模型配置
model_api_key = ModelApiKeyService.get_a_api_key(self.db,data.default_model_config_id)
if not model_api_key:
raise ResourceNotFoundException("模型配置", str(data.default_model_config_id))
# 3. 验证子 Agent 存在并获取发布版本 ID
for sub_agent in data.sub_agents:
agent_app_release = self.get_published_by_agent_id(sub_agent.agent_id)
if not agent_app_release:
raise ResourceNotFoundException("子 Agent 未发布或不存在", str(sub_agent.agent_id))
# 使用发布版本 ID
sub_agent.agent_id = agent_app_release.id
# 5. 创建配置(转换 UUID 为字符串以支持 JSON 序列化)
sub_agents_data = [convert_uuids_to_str(sub_agent.model_dump()) for sub_agent in data.sub_agents]
# routing_rules_data = [convert_uuids_to_str(rule.model_dump()) for rule in data.routing_rules] if data.routing_rules else None
# 处理 execution_config可能是 None、字典或 Pydantic 模型)
if data.execution_config is None:
execution_config_data = {}
elif isinstance(data.execution_config, dict):
execution_config_data = convert_uuids_to_str(data.execution_config)
else:
execution_config_data = convert_uuids_to_str(data.execution_config.model_dump())
# 处理 model_parameters可能是 None、字典或 Pydantic 模型)
if data.model_parameters is None:
model_parameters_data = None
# elif isinstance(data.model_parameters, dict):
# # 过滤掉值为 None 的字段
# model_parameters_data = {k: v for k, v in data.model_parameters.items() if v is not None}
else:
# 过滤掉值为 None 的字段
# model_parameters_data = {k: v for k, v in data.model_parameters.model_dump().items() if v is not None}
model_parameters_data = data.model_parameters
config = MultiAgentConfig(
app_id=app_id,
master_agent_id=data.master_agent_id,
master_agent_name=data.master_agent_name,
default_model_config_id=data.default_model_config_id,
model_parameters=model_parameters_data,
orchestration_mode=data.orchestration_mode,
sub_agents=sub_agents_data,
# routing_rules=routing_rules_data,
execution_config=execution_config_data,
aggregation_strategy=data.aggregation_strategy
)
return config
def update_config(
self,
app_id: uuid.UUID,
data: MultiAgentConfigUpdate
) -> MultiAgentConfig:
"""更新多 Agent 配置
Args:
app_id: 应用 ID
data: 更新数据
Returns:
更新后的配置
"""
config = self.get_config(app_id)
newConfig = self.check_config_data(app_id, data)
if not config:
config = newConfig
self.db.add(config)
self.db.commit()
self.db.refresh(config)
logger.info(
"创建多 Agent 配置成功",
extra={
"config_id": str(config.id),
"app_id": str(app_id),
"mode": data.orchestration_mode,
"sub_agent_count": len(data.sub_agents)
}
)
return config
config.default_model_config_id = newConfig.default_model_config_id
config.model_parameters = newConfig.model_parameters
config.orchestration_mode = newConfig.orchestration_mode
config.sub_agents = newConfig.sub_agents
config.routing_rules = newConfig.routing_rules
config.execution_config = newConfig.execution_config
config.aggregation_strategy = newConfig.aggregation_strategy
self.db.commit()
self.db.refresh(config)
logger.info(
"更新多 Agent 配置成功",
extra={
"config_id": str(config.id),
"app_id": str(app_id)
}
)
return config
def delete_config(self, app_id: uuid.UUID) -> None:
"""删除多 Agent 配置
Args:
app_id: 应用 ID
"""
config = self.get_config(app_id)
if not config:
raise ResourceNotFoundException("多 Agent 配置", str(app_id))
# 逻辑删除多 Agent 配置
config.is_active = False
self.db.commit()
logger.info(
"删除多 Agent 配置成功",
extra={
"config_id": str(config.id),
"app_id": str(app_id)
}
)
async def run(
self,
app_id: uuid.UUID,
request: MultiAgentRunRequest
) -> dict:
"""运行多 Agent 任务
Args:
app_id: 应用 ID
request: 运行请求
Returns:
执行结果
"""
# 1. 获取配置
config = self.get_config(app_id)
if not config:
raise ResourceNotFoundException("多 Agent 配置", str(app_id))
if not config.is_active:
raise BusinessException("多 Agent 配置已禁用", BizCode.RESOURCE_DISABLED)
# 2. 创建编排器
orchestrator = MultiAgentOrchestrator(self.db, config)
# 3. 执行任务
result = await orchestrator.execute(
message=request.message,
conversation_id=request.conversation_id,
user_id=request.user_id,
variables=request.variables,
use_llm_routing=getattr(request, 'use_llm_routing', True), # 默认启用 LLM 路由
web_search=getattr(request, 'web_search', False), # 网络搜索参数
memory=getattr(request, 'memory', True) # 记忆功能参数
)
return result
async def run_stream(
self,
app_id: uuid.UUID,
request: MultiAgentRunRequest,
storage_type :str,
user_rag_memory_id :str
):
"""运行多 Agent 任务(流式返回)
Args:
app_id: 应用 ID
request: 运行请求
Yields:
SSE 格式的事件流
"""
# 1. 获取配置
config = self.get_config(app_id)
if not config:
raise ResourceNotFoundException("多 Agent 配置", str(app_id))
if not config.is_active:
raise BusinessException("多 Agent 配置已禁用", BizCode.RESOURCE_DISABLED)
# 2. 创建编排器
orchestrator = MultiAgentOrchestrator(self.db, config)
# 3. 流式执行任务
async for event in orchestrator.execute_stream(
message=request.message,
conversation_id=request.conversation_id,
user_id=request.user_id,
variables=request.variables,
use_llm_routing=getattr(request, 'use_llm_routing', True),
web_search=getattr(request, 'web_search', False), # 网络搜索参数
memory=getattr(request, 'memory', True) , # 记忆功能参数
storage_type=storage_type,
user_rag_memory_id=user_rag_memory_id
):
yield event
# def add_sub_agent(
# self,
# app_id: uuid.UUID,
# agent_id: uuid.UUID,
# name: str,
# role: Optional[str] = None,
# priority: int = 1,
# capabilities: Optional[List[str]] = None
# ) -> MultiAgentConfig:
# """添加子 Agent
# Args:
# app_id: 应用 ID
# agent_id: Agent ID
# name: Agent 名称
# role: 角色描述
# priority: 优先级
# capabilities: 能力列表
# Returns:
# 更新后的配置
# """
# config = self.get_config(app_id)
# if not config:
# raise ResourceNotFoundException("多 Agent 配置", str(app_id))
# # 验证 Agent 存在
# agent = self.db.get(AgentConfig, agent_id)
# if not agent:
# raise ResourceNotFoundException("Agent", str(agent_id))
# # 检查是否已存在
# for sub_agent in config.sub_agents:
# if sub_agent["agent_id"] == str(agent_id):
# raise BusinessException("Agent 已存在于配置中", BizCode.DUPLICATE_RESOURCE)
# # 添加子 Agent
# new_sub_agent = {
# "agent_id": str(agent_id),
# "name": name,
# "role": role,
# "priority": priority,
# "capabilities": capabilities or []
# }
# config.sub_agents.append(new_sub_agent)
# # 标记为已修改
# self.db.add(config)
# self.db.commit()
# self.db.refresh(config)
# logger.info(
# "添加子 Agent 成功",
# extra={
# "config_id": str(config.id),
# "agent_id": str(agent_id),
# "agent_name": name
# }
# )
# return config
# def remove_sub_agent(
# self,
# app_id: uuid.UUID,
# agent_id: uuid.UUID
# ) -> MultiAgentConfig:
# """移除子 Agent
# Args:
# app_id: 应用 ID
# agent_id: Agent ID
# Returns:
# 更新后的配置
# """
# config = self.get_config(app_id)
# if not config:
# raise ResourceNotFoundException("多 Agent 配置", str(app_id))
# # 查找并移除
# original_count = len(config.sub_agents)
# config.sub_agents = [
# sub_agent for sub_agent in config.sub_agents
# if sub_agent["agent_id"] != str(agent_id)
# ]
# if len(config.sub_agents) == original_count:
# raise ResourceNotFoundException("子 Agent", str(agent_id))
# # 标记为已修改
# self.db.add(config)
# self.db.commit()
# self.db.refresh(config)
# logger.info(
# "移除子 Agent 成功",
# extra={
# "config_id": str(config.id),
# "agent_id": str(agent_id)
# }
# )
# return config
def list_configs(
self,
workspace_id: uuid.UUID,
page: int = 1,
pagesize: int = 20
) -> Tuple[List[MultiAgentConfig], int]:
"""列出多 Agent 配置
Args:
workspace_id: 工作空间 ID
page: 页码
pagesize: 每页数量
Returns:
配置列表和总数
"""
# 构建查询
stmt = (
select(MultiAgentConfig)
.join(App)
.where(App.workspace_id == workspace_id)
.order_by(desc(MultiAgentConfig.created_at))
)
# 总数
count_stmt = stmt.with_only_columns(MultiAgentConfig.id)
total = len(self.db.execute(count_stmt).all())
# 分页
stmt = stmt.offset((page - 1) * pagesize).limit(pagesize)
configs = list(self.db.scalars(stmt).all())
return configs, total
# ==================== 依赖注入函数 ====================
def get_multi_agent_service(
db: Annotated[Session, Depends(get_db)]
) -> MultiAgentService:
"""获取工作流服务(依赖注入)"""
return MultiAgentService(db)