feat(agent): add input variable validation

This commit is contained in:
Eternity
2026-03-05 11:17:56 +08:00
parent 0d8f4c76e7
commit 16c1cbe24f
13 changed files with 330 additions and 882 deletions

View File

@@ -1791,372 +1791,6 @@ class AppService:
return shares
# ==================== 试运行功能 ====================
async def draft_run(
self,
*,
app_id: uuid.UUID,
message: str,
conversation_id: Optional[str] = None,
user_id: Optional[str] = None,
variables: Optional[Dict[str, Any]] = None,
workspace_id: Optional[uuid.UUID] = None
) -> Dict[str, Any]:
"""试运行 Agent使用当前草稿配置
Args:
app_id: 应用ID
message: 用户消息
conversation_id: 会话ID用于多轮对话
user_id: 用户ID用于会话管理
variables: 自定义变量参数值
workspace_id: 工作空间ID用于权限验证
Returns:
Dict: 包含 AI 回复和元数据的字典
Raises:
ResourceNotFoundException: 当应用不存在时
BusinessException: 当应用类型不支持或配置缺失时
"""
from app.services.draft_run_service import DraftRunService
logger.info("试运行 Agent", extra={"app_id": str(app_id), "user_message": message[:50]})
# 1. 验证应用
app = self._get_app_or_404(app_id)
if app.type != "agent":
raise BusinessException("只有 Agent 类型应用支持试运行", BizCode.APP_TYPE_NOT_SUPPORTED)
# 只读操作,允许访问共享应用
self._validate_app_accessible(app, workspace_id)
# 2. 获取 Agent 配置
stmt = select(AgentConfig).where(AgentConfig.app_id == app_id)
agent_cfg = self.db.scalars(stmt).first()
if not agent_cfg:
raise BusinessException("Agent 配置不存在,无法试运行", BizCode.AGENT_CONFIG_MISSING)
# 3. 获取模型配置
model_config = None
if agent_cfg.default_model_config_id:
from app.models import ModelConfig
model_config = self.db.get(ModelConfig, agent_cfg.default_model_config_id)
if not model_config:
raise BusinessException("模型配置不存在,无法试运行", BizCode.AGENT_CONFIG_MISSING)
# 4. 调用试运行服务
logger.debug(
"准备调用试运行服务",
extra={
"app_id": str(app_id),
"model": model_config.name,
"has_conversation_id": bool(conversation_id),
"has_variables": bool(variables)
}
)
draft_service = DraftRunService(self.db)
result = await draft_service.run(
agent_config=agent_cfg,
model_config=model_config,
message=message,
workspace_id=workspace_id,
conversation_id=conversation_id,
user_id=user_id,
variables=variables
)
logger.debug(
"试运行服务返回结果",
extra={
"result_type": str(type(result)),
"result_keys": list(result.keys()) if isinstance(result, dict) else "not_dict",
"has_message": "message" in result if isinstance(result, dict) else False,
"has_conversation_id": "conversation_id" in result if isinstance(result, dict) else False
}
)
logger.info(
"试运行完成",
extra={
"app_id": str(app_id),
"elapsed_time": result.get("elapsed_time"),
"model": model_config.name
}
)
return result
async def draft_run_stream(
self,
*,
app_id: uuid.UUID,
message: str,
conversation_id: Optional[str] = None,
user_id: Optional[str] = None,
variables: Optional[Dict[str, Any]] = None,
workspace_id: Optional[uuid.UUID] = None
):
"""试运行 Agent流式返回
Args:
app_id: 应用ID
message: 用户消息
conversation_id: 会话ID用于多轮对话
user_id: 用户ID用于会话管理
variables: 自定义变量参数值
workspace_id: 工作空间ID用于权限验证
Yields:
str: SSE 格式的事件数据
Raises:
ResourceNotFoundException: 当应用不存在时
BusinessException: 当应用类型不支持或配置缺失时
"""
from app.services.draft_run_service import DraftRunService
logger.info("流式试运行 Agent", extra={"app_id": str(app_id), "user_message": message[:50]})
# 1. 验证应用
app = self._get_app_or_404(app_id)
if app.type != "agent":
raise BusinessException("只有 Agent 类型应用支持试运行", BizCode.APP_TYPE_NOT_SUPPORTED)
# 只读操作,允许访问共享应用
self._validate_app_accessible(app, workspace_id)
# 2. 获取 Agent 配置
stmt = select(AgentConfig).where(AgentConfig.app_id == app_id)
agent_cfg = self.db.scalars(stmt).first()
if not agent_cfg:
raise BusinessException("Agent 配置不存在,无法试运行", BizCode.AGENT_CONFIG_MISSING)
# 3. 获取模型配置
model_config = None
if agent_cfg.default_model_config_id:
from app.models import ModelConfig
model_config = self.db.get(ModelConfig, agent_cfg.default_model_config_id)
if not model_config:
raise BusinessException("模型配置不存在,无法试运行", BizCode.AGENT_CONFIG_MISSING)
# 4. 调用流式试运行服务
draft_service = DraftRunService(self.db)
async for event in draft_service.run_stream(
agent_config=agent_cfg,
model_config=model_config,
message=message,
workspace_id=workspace_id,
conversation_id=conversation_id,
user_id=user_id,
variables=variables
):
yield event
# ==================== 多模型对比试运行 ====================
async def draft_run_compare(
self,
*,
app_id: uuid.UUID,
message: str,
models: List[app_schema.ModelCompareItem],
conversation_id: Optional[str] = None,
user_id: Optional[str] = None,
variables: Optional[Dict[str, Any]] = None,
workspace_id: Optional[uuid.UUID] = None,
parallel: bool = True,
timeout: int = 60
) -> Dict[str, Any]:
"""多模型对比试运行
Args:
app_id: 应用ID
message: 用户消息
models: 要对比的模型列表
conversation_id: 会话ID
user_id: 用户ID
variables: 变量参数
workspace_id: 工作空间ID
parallel: 是否并行执行
timeout: 超时时间(秒)
Returns:
Dict: 对比结果
"""
from app.models import ModelConfig
from app.services.draft_run_service import DraftRunService
logger.info(
"多模型对比试运行",
extra={
"app_id": str(app_id),
"model_count": len(models),
"parallel": parallel
}
)
# 1. 验证应用
app = self._get_app_or_404(app_id)
if app.type != "agent":
raise BusinessException("只有 Agent 类型应用支持试运行", BizCode.APP_TYPE_NOT_SUPPORTED)
# 只读操作,允许访问共享应用
self._validate_app_accessible(app, workspace_id)
# 2. 获取 Agent 配置
stmt = select(AgentConfig).where(AgentConfig.app_id == app_id)
agent_cfg = self.db.scalars(stmt).first()
if not agent_cfg:
raise BusinessException("Agent 配置不存在", BizCode.AGENT_CONFIG_MISSING)
# 3. 准备所有模型配置
model_configs = []
for model_item in models:
model_config = self.db.get(ModelConfig, model_item.model_config_id)
if not model_config:
raise ResourceNotFoundException("模型配置", str(model_item.model_config_id))
# 合并参数agent配置参数 + 请求覆盖参数
merged_parameters = {
**(agent_cfg.model_parameters or {}),
**(model_item.model_parameters or {})
}
model_configs.append({
"model_config": model_config,
"parameters": merged_parameters,
"label": model_item.label or model_config.name,
"model_config_id": model_item.model_config_id
})
# 4. 调用 DraftRunService 的对比方法
draft_service = DraftRunService(self.db)
result = await draft_service.run_compare(
agent_config=agent_cfg,
models=model_configs,
message=message,
workspace_id=workspace_id,
conversation_id=conversation_id,
user_id=user_id,
variables=variables,
parallel=parallel,
timeout=timeout
)
logger.info(
"多模型对比完成",
extra={
"app_id": str(app_id),
"successful": result["successful_count"],
"failed": result["failed_count"]
}
)
return result
async def draft_run_compare_stream(
self,
*,
app_id: uuid.UUID,
message: str,
models: List[app_schema.ModelCompareItem],
conversation_id: Optional[str] = None,
user_id: Optional[str] = None,
variables: Optional[Dict[str, Any]] = None,
workspace_id: Optional[uuid.UUID] = None,
parallel: bool = True,
timeout: int = 60
):
"""多模型对比试运行(流式返回)
Args:
app_id: 应用ID
message: 用户消息
models: 要对比的模型列表
conversation_id: 会话ID
user_id: 用户ID
variables: 变量参数
workspace_id: 工作空间ID
timeout: 超时时间(秒)
Yields:
str: SSE 格式的事件数据
"""
from app.models import ModelConfig
from app.services.draft_run_service import DraftRunService
logger.info(
"多模型对比流式试运行",
extra={
"app_id": str(app_id),
"model_count": len(models)
}
)
# 1. 验证应用
app = self._get_app_or_404(app_id)
if app.type != "agent":
raise BusinessException("只有 Agent 类型应用支持试运行", BizCode.APP_TYPE_NOT_SUPPORTED)
# 只读操作,允许访问共享应用
self._validate_app_accessible(app, workspace_id)
# 2. 获取 Agent 配置
stmt = select(AgentConfig).where(AgentConfig.app_id == app_id)
agent_cfg = self.db.scalars(stmt).first()
if not agent_cfg:
raise BusinessException("Agent 配置不存在", BizCode.AGENT_CONFIG_MISSING)
# 3. 准备所有模型配置
model_configs = []
for model_item in models:
model_config = self.db.get(ModelConfig, model_item.model_config_id)
if not model_config:
raise ResourceNotFoundException("模型配置", str(model_item.model_config_id))
# 合并参数agent配置参数 + 请求覆盖参数
merged_parameters = {
**(agent_cfg.model_parameters or {}),
**(model_item.model_parameters or {})
}
model_configs.append({
"model_config": model_config,
"parameters": merged_parameters,
"label": model_item.label or model_config.name,
"model_config_id": model_item.model_config_id
})
# 4. 调用 DraftRunService 的流式对比方法
draft_service = DraftRunService(self.db)
async for event in draft_service.run_compare_stream(
agent_config=agent_cfg,
models=model_configs,
message=message,
workspace_id=workspace_id,
conversation_id=conversation_id,
user_id=user_id,
variables=variables,
parallel=parallel,
timeout=timeout
):
yield event
logger.info(
"多模型对比流式完成",
extra={"app_id": str(app_id)}
)
# ==================== 向后兼容的函数接口 ====================
# 保留函数接口以兼容现有代码,但内部使用服务类
@@ -2278,53 +1912,6 @@ def get_apps_by_ids(
return service.get_apps_by_ids(app_ids, workspace_id)
# ==================== 向后兼容的函数接口 ====================
async def draft_run(
db: Session,
*,
app_id: uuid.UUID,
message: str,
conversation_id: Optional[str] = None,
user_id: Optional[str] = None,
variables: Optional[Dict[str, Any]] = None,
workspace_id: Optional[uuid.UUID] = None
) -> Dict[str, Any]:
"""试运行 Agent向后兼容接口"""
service = AppService(db)
return await service.draft_run(
app_id=app_id,
message=message,
conversation_id=conversation_id,
user_id=user_id,
variables=variables,
workspace_id=workspace_id
)
async def draft_run_stream(
db: Session,
*,
app_id: uuid.UUID,
message: str,
conversation_id: Optional[str] = None,
user_id: Optional[str] = None,
variables: Optional[Dict[str, Any]] = None,
workspace_id: Optional[uuid.UUID] = None
):
"""试运行 Agent 流式返回(向后兼容接口)"""
service = AppService(db)
async for event in service.draft_run_stream(
app_id=app_id,
message=message,
conversation_id=conversation_id,
user_id=user_id,
variables=variables,
workspace_id=workspace_id
):
yield event
# ==================== 依赖注入函数 ====================
def get_app_service(