Merge pull request #992 from wanxunyang/develop-wxy

fix(workflow): rectify error handling and bolster execution logging
This commit is contained in:
山程漫悟
2026-04-24 18:58:40 +08:00
committed by GitHub
9 changed files with 301 additions and 73 deletions

View File

@@ -9,7 +9,7 @@ from app.core.logging_config import get_business_logger
from app.core.response_utils import success
from app.db import get_db
from app.dependencies import get_current_user, cur_workspace_access_guard
from app.schemas.app_log_schema import AppLogConversation, AppLogConversationDetail
from app.schemas.app_log_schema import AppLogConversation, AppLogConversationDetail, AppLogMessage
from app.schemas.response_schema import PageData, PageMeta
from app.services.app_service import AppService
from app.services.app_log_service import AppLogService
@@ -78,17 +78,32 @@ def get_app_log_detail(
# 验证应用访问权限
app_service = AppService(db)
app_service.get_app(app_id, workspace_id)
app = app_service.get_app(app_id, workspace_id)
# 使用 Service 层查询
log_service = AppLogService(db)
conversation, node_executions_map = log_service.get_conversation_detail(
conversation, messages, node_executions_map = log_service.get_conversation_detail(
app_id=app_id,
conversation_id=conversation_id,
workspace_id=workspace_id
workspace_id=workspace_id,
app_type=app.type
)
detail = AppLogConversationDetail.model_validate(conversation)
detail.node_executions_map = node_executions_map
# 构建基础会话信息(不经过 ORM relationship
base = AppLogConversation.model_validate(conversation)
# 单独处理 messages避免触发 SQLAlchemy relationship 校验
if messages and isinstance(messages[0], AppLogMessage):
# 工作流:已经是 AppLogMessage 实例
msg_list = messages
else:
# AgentORM Message 对象逐个转换
msg_list = [AppLogMessage.model_validate(m) for m in messages]
detail = AppLogConversationDetail(
**base.model_dump(),
messages=msg_list,
node_executions_map=node_executions_map,
)
return success(data=detail)

View File

@@ -73,6 +73,7 @@ class CustomTool(BaseTool):
# 添加通用参数(基于第一个操作的参数)
if self._parsed_operations:
first_operation = next(iter(self._parsed_operations.values()))
# path/query 参数
for param_name, param_info in first_operation.get("parameters", {}).items():
params.append(ToolParameter(
name=param_name,
@@ -85,6 +86,23 @@ class CustomTool(BaseTool):
maximum=param_info.get("maximum"),
pattern=param_info.get("pattern")
))
# requestBody 参数 — 将 body 字段平铺为独立参数暴露给模型
request_body = first_operation.get("request_body")
if request_body:
body_schema = request_body.get("properties", {})
required_fields = request_body.get("required", [])
for prop_name, prop_schema in body_schema.items():
params.append(ToolParameter(
name=prop_name,
type=self._convert_openapi_type(prop_schema.get("type", "string")),
description=prop_schema.get("description", ""),
required=prop_name in required_fields,
default=prop_schema.get("default"),
enum=prop_schema.get("enum"),
minimum=prop_schema.get("minimum"),
maximum=prop_schema.get("maximum"),
pattern=prop_schema.get("pattern")
))
return params

View File

@@ -180,6 +180,9 @@ class IterationRuntime:
"cycle_id": self.node_id,
"cycle_idx": idx,
"node_id": node_name,
"node_type": node_type,
"node_name": node_cfg.get("data", {}).get("label") if node_cfg else node_name,
"status": result.get("node_outputs", {}).get(node_name, {}).get("status", "completed"),
"input": result.get("node_outputs", {}).get(node_name, {}).get("input")
if not cycle_variable else cycle_variable,
"output": result.get("node_outputs", {}).get(node_name, {}).get("output")

View File

@@ -210,6 +210,9 @@ class LoopRuntime:
"cycle_id": self.node_id,
"cycle_idx": idx,
"node_id": node_name,
"node_type": node_type,
"node_name": node_name,
"status": result.get("node_outputs", {}).get(node_name, {}).get("status", "completed"),
"input": result.get("node_outputs", {}).get(node_name, {}).get("input")
if not cycle_variable else cycle_variable,
"output": result.get("node_outputs", {}).get(node_name, {}).get("output")

View File

@@ -14,6 +14,7 @@ class AppLogMessage(BaseModel):
conversation_id: uuid.UUID
role: str = Field(description="角色: user / assistant / system")
content: str
status: Optional[str] = Field(default=None, description="执行状态(工作流专用): completed / failed")
meta_data: Optional[Dict[str, Any]] = None
created_at: datetime.datetime
@@ -58,6 +59,7 @@ class AppLogNodeExecution(BaseModel):
input: Optional[Any] = None
process: Optional[Any] = None
output: Optional[Any] = None
cycle_items: Optional[List[Any]] = None
elapsed_time: Optional[float] = None
token_usage: Optional[Dict[str, Any]] = None

View File

@@ -65,6 +65,11 @@ class ApiKeyService:
BizCode.BAD_REQUEST
)
if data.resource_id:
app = db.get(App, data.resource_id)
if not app or not app.current_release_id:
raise BusinessException("该应用未发布", BizCode.APP_NOT_PUBLISHED)
# 生成 API Key
api_key = generate_api_key(data.type)

View File

@@ -1,16 +1,17 @@
"""应用日志服务层"""
import uuid
import datetime as dt
from typing import Optional, Tuple
from datetime import datetime
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.core.logging_config import get_business_logger
from app.models.app_model import AppType
from app.models.conversation_model import Conversation, Message
from app.models.workflow_model import WorkflowExecution
from app.repositories.conversation_repository import ConversationRepository, MessageRepository
from app.schemas.app_log_schema import AppLogNodeExecution
from app.schemas.app_log_schema import AppLogMessage, AppLogNodeExecution
logger = get_business_logger()
@@ -83,51 +84,40 @@ class AppLogService:
self,
app_id: uuid.UUID,
conversation_id: uuid.UUID,
workspace_id: uuid.UUID
) -> Tuple[Conversation, dict[str, list[AppLogNodeExecution]]]:
workspace_id: uuid.UUID,
app_type: str = AppType.AGENT
) -> Tuple[Conversation, list, dict[str, list[AppLogNodeExecution]]]:
"""
查询会话详情(包含消息和工作流节点执行记录)
Args:
app_id: 应用 ID
conversation_id: 会话 ID
workspace_id: 工作空间 ID
查询会话详情
Returns:
Tuple[Conversation, dict[str, list[AppLogNodeExecution]]]:
(包含消息的会话对象, 按消息ID分组的节点执行记录)
Raises:
ResourceNotFoundException: 当会话不存在时
Tuple[Conversation, list[AppLogMessage|Message], dict[str, list[AppLogNodeExecution]]]
"""
logger.info(
"查询应用日志会话详情",
extra={
"app_id": str(app_id),
"conversation_id": str(conversation_id),
"workspace_id": str(workspace_id)
"workspace_id": str(workspace_id),
"app_type": app_type
}
)
# 查询会话
conversation = self.conversation_repository.get_conversation_for_app_log(
conversation_id=conversation_id,
app_id=app_id,
workspace_id=workspace_id
)
# 查询消息(按时间正序)
messages = self.message_repository.get_messages_by_conversation(
conversation_id=conversation_id
)
# 将消息附加到会话对象
conversation.messages = messages
# 查询工作流节点执行记录(按消息分组)
_, node_executions_map = self._get_workflow_node_executions_with_map(
conversation_id, messages
)
if app_type == AppType.WORKFLOW:
messages, node_executions_map = self._get_workflow_messages_and_nodes(conversation_id)
else:
messages = self.message_repository.get_messages_by_conversation(
conversation_id=conversation_id
)
node_executions_map = self._get_workflow_node_executions_with_map(
conversation_id, messages
)
logger.info(
"查询应用日志会话详情成功",
@@ -139,13 +129,129 @@ class AppLogService:
}
)
return conversation, node_executions_map
return conversation, messages, node_executions_map
def _get_workflow_messages_and_nodes(
self,
conversation_id: uuid.UUID,
) -> Tuple[list[AppLogMessage], dict[str, list[AppLogNodeExecution]]]:
"""
工作流应用专用:从 workflow_executions 构建 messages 和节点日志。
每条 WorkflowExecution 对应一轮对话:
- user message来自 execution.input_datacontent 取 message 字段files 放 meta_data
- assistant message来自 execution.output_data失败时内容为错误信息
开场白的 suggested_questions 合并到第一条 assistant message 的 meta_data 里。
Returns:
(messages 列表, node_executions_map)
"""
stmt = (
select(WorkflowExecution)
.where(
WorkflowExecution.conversation_id == conversation_id,
WorkflowExecution.status.in_(["completed", "failed"])
)
.order_by(WorkflowExecution.started_at.asc())
)
executions = list(self.db.scalars(stmt).all())
# 查开场白Message 表里 meta_data 含 suggested_questions 的第一条 assistant 消息
opening_stmt = (
select(Message)
.where(
Message.conversation_id == conversation_id,
Message.role == "assistant",
)
.order_by(Message.created_at.asc())
.limit(10)
)
early_messages = list(self.db.scalars(opening_stmt).all())
suggested_questions: list = []
for m in early_messages:
if isinstance(m.meta_data, dict) and "suggested_questions" in m.meta_data:
suggested_questions = m.meta_data.get("suggested_questions") or []
break
messages: list[AppLogMessage] = []
node_executions_map: dict[str, list[AppLogNodeExecution]] = {}
# 如果有开场白,作为第一条 assistant 消息插入
if suggested_questions or early_messages:
opening_msg = next(
(m for m in early_messages
if isinstance(m.meta_data, dict) and "suggested_questions" in m.meta_data),
None
)
if opening_msg:
messages.append(AppLogMessage(
id=opening_msg.id,
conversation_id=conversation_id,
role="assistant",
content=opening_msg.content,
status=None,
meta_data={"suggested_questions": suggested_questions},
created_at=opening_msg.created_at,
))
for execution in executions:
started_at = execution.started_at or dt.datetime.now()
completed_at = execution.completed_at or started_at
# assistant message 的 id同时作为 node_executions_map 的 key
assistant_msg_id = uuid.uuid5(execution.id, "assistant")
# --- user message输入---
input_data = execution.input_data or {}
input_content = input_data.get("message") or _extract_text(input_data)
# 跳过没有用户输入的 execution如开场白触发的记录
if not input_content or not input_content.strip():
continue
files = input_data.get("files") or []
user_msg = AppLogMessage(
id=uuid.uuid5(execution.id, "user"),
conversation_id=conversation_id,
role="user",
content=input_content,
meta_data={"files": files} if files else None,
created_at=started_at,
)
messages.append(user_msg)
# --- assistant message输出---
if execution.status == "completed":
output_content = _extract_text(execution.output_data)
meta = {"usage": execution.token_usage or {}, "elapsed_time": execution.elapsed_time}
else:
output_content = _extract_text(execution.output_data) or ""
meta = {"error": execution.error_message, "error_node_id": execution.error_node_id}
assistant_msg = AppLogMessage(
id=assistant_msg_id,
conversation_id=conversation_id,
role="assistant",
content=output_content,
status=execution.status,
meta_data=meta,
created_at=completed_at,
)
messages.append(assistant_msg)
# --- 节点执行记录,从 workflow_executions.output_data["node_outputs"] 读取 ---
execution_nodes = _build_nodes_from_output_data(execution.output_data)
if execution_nodes:
node_executions_map[str(assistant_msg_id)] = execution_nodes
return messages, node_executions_map
def _get_workflow_node_executions_with_map(
self,
conversation_id: uuid.UUID,
messages: list[Message]
) -> Tuple[list[AppLogNodeExecution], dict[str, list[AppLogNodeExecution]]]:
) -> dict[str, list[AppLogNodeExecution]]:
"""
从 workflow_executions 表中提取节点执行记录,并按 assistant message 分组
@@ -157,13 +263,12 @@ class AppLogService:
Tuple[list[AppLogNodeExecution], dict[str, list[AppLogNodeExecution]]]:
(所有节点执行记录列表, 按 message_id 分组的节点执行记录字典)
"""
node_executions = []
node_executions_map: dict[str, list[AppLogNodeExecution]] = {}
# 查询该会话关联的所有工作流执行记录(按时间正序)
stmt = select(WorkflowExecution).where(
WorkflowExecution.conversation_id == conversation_id,
WorkflowExecution.status == "completed"
WorkflowExecution.status.in_(["completed", "failed"])
).order_by(WorkflowExecution.started_at.asc())
executions = self.db.scalars(stmt).all()
@@ -188,10 +293,18 @@ class AppLogService:
used_message_ids: set[str] = set()
for execution in executions:
if not execution.output_data:
# 构建节点执行记录列表,从 workflow_executions.output_data["node_outputs"] 读取
execution_nodes = _build_nodes_from_output_data(execution.output_data)
if not execution_nodes:
continue
# 找到该 execution 对应的 assistant message
# 失败的执行没有 assistant message,直接用 execution id 作为 key
if execution.status == "failed":
node_executions_map[f"execution_{str(execution.id)}"] = execution_nodes
continue
# completed通过时序匹配关联到对应的 assistant message
# 逻辑:找 execution.started_at 之后最近的、未使用的 assistant message
best_msg = None
best_dt = None
@@ -200,9 +313,9 @@ class AppLogService:
if msg_id_str in used_message_ids:
continue
if msg.created_at and msg.created_at >= execution.started_at:
dt = (msg.created_at - execution.started_at).total_seconds()
if best_dt is None or dt < best_dt:
best_dt = dt
delta = (msg.created_at - execution.started_at).total_seconds()
if best_dt is None or delta < best_dt:
best_dt = delta
best_msg = msg
if not best_msg:
@@ -210,31 +323,76 @@ class AppLogService:
msg_id_str = str(best_msg.id)
used_message_ids.add(msg_id_str)
node_executions_map[msg_id_str] = execution_nodes
# 提取节点输出
output_data = execution.output_data
if isinstance(output_data, dict):
node_outputs = output_data.get("node_outputs", {})
execution_nodes = []
for node_id, node_data in node_outputs.items():
if not isinstance(node_data, dict):
continue
node_execution = AppLogNodeExecution(
node_id=node_data.get("node_id", node_id),
node_type=node_data.get("node_type", "unknown"),
node_name=node_data.get("node_name"),
status=node_data.get("status", "unknown"),
error=node_data.get("error"),
input=node_data.get("input"),
process=node_data.get("process"),
output=node_data.get("output"),
elapsed_time=node_data.get("elapsed_time"),
token_usage=node_data.get("token_usage"),
)
node_executions.append(node_execution)
execution_nodes.append(node_execution)
return node_executions_map
# 将节点记录关联到 message_id
node_executions_map[msg_id_str] = execution_nodes
return node_executions, node_executions_map
def _extract_text(data: Optional[dict]) -> str:
"""从 workflow execution 的 input_data / output_data 中提取可读文本。
优先取 'text''content''output' 字段;若都没有则 JSON 序列化整个 dict。
"""
if not data:
return ""
for key in ("message", "text", "content", "output", "result", "answer"):
if key in data and isinstance(data[key], str):
return data[key]
import json
return json.dumps(data, ensure_ascii=False)
def _build_nodes_from_output_data(output_data: Optional[dict]) -> list[AppLogNodeExecution]:
"""从 workflow_executions.output_data["node_outputs"] 构建节点执行记录列表。
output_data 结构:
{
"node_outputs": {
"<node_id>": {
"node_type": ...,
"node_name": ...,
"status": ...,
"input": ...,
"output": ...,
"elapsed_time": ...,
"token_usage": ...,
"error": ...,
"cycle_items": [...],
...
}
},
"error": ...,
...
}
"""
if not output_data:
return []
node_outputs: dict = output_data.get("node_outputs") or {}
result = []
for node_id, node_data in node_outputs.items():
if not isinstance(node_data, dict):
continue
output = dict(node_data)
cycle_items = output.pop("cycle_items", None)
# 把已知的顶层字段剥离,剩余的作为 output
node_type = output.pop("node_type", "unknown")
node_name = output.pop("node_name", None)
status = output.pop("status", "completed")
error = output.pop("error", None)
inp = output.pop("input", None)
elapsed_time = output.pop("elapsed_time", None)
token_usage = output.pop("token_usage", None)
result.append(AppLogNodeExecution(
node_id=node_id,
node_type=node_type,
node_name=node_name,
status=status,
error=error,
input=inp,
process=None,
output=output if output else None,
cycle_items=cycle_items,
elapsed_time=elapsed_time,
token_usage=token_usage,
))
return result

View File

@@ -815,11 +815,12 @@ class ToolService:
"default": param_info.get("default")
})
# 请求体参数
# 请求体参数 — _extract_request_body 返回 {"schema": {...}, "required": bool, ...}
request_body = operation.get("request_body")
if request_body:
schema_props = request_body.get("schema", {}).get("properties", {})
required_props = request_body.get("schema", {}).get("required", [])
body_schema = request_body.get("schema", {})
schema_props = body_schema.get("properties", {})
required_props = body_schema.get("required", [])
for prop_name, prop_schema in schema_props.items():
parameters.append({

View File

@@ -17,8 +17,9 @@ from app.core.workflow.executor import execute_workflow, execute_workflow_stream
from app.core.workflow.nodes.enums import NodeType
from app.core.workflow.validator import validate_workflow_config
from app.db import get_db
from sqlalchemy import select
from app.models import App
from app.models.workflow_model import WorkflowConfig, WorkflowExecution
from app.models.workflow_model import WorkflowConfig, WorkflowExecution, WorkflowNodeExecution
from app.repositories import knowledge_repository
from app.repositories.workflow_repository import (
WorkflowConfigRepository,
@@ -918,6 +919,7 @@ class WorkflowService:
input_data["conv_messages"] = conv_messages
init_message_length = len(input_data.get("conv_messages", []))
message_id = uuid.uuid4()
_cycle_items: dict[str, list] = {}
# 新会话时写入开场白
is_new_conversation = init_message_length == 0
@@ -948,6 +950,15 @@ class WorkflowService:
memory_storage_type=storage_type,
user_rag_memory_id=user_rag_memory_id
):
event_type = event.get("event")
event_data = event.get("data", {})
if event_type == "cycle_item":
cycle_id = event_data.get("cycle_id")
if cycle_id not in _cycle_items:
_cycle_items[cycle_id] = []
_cycle_items[cycle_id].append(event_data)
if event.get("event") == "workflow_end":
status = event.get("data", {}).get("status")
token_usage = event.get("data", {}).get("token_usage", {}) or {}
@@ -1019,6 +1030,18 @@ class WorkflowService:
)
else:
logger.error(f"unexpect workflow run status, status: {status}")
# 把积累的 cycle_item 写入 workflow_executions.output_data["node_outputs"]
if _cycle_items and execution.output_data:
import copy
new_output_data = copy.deepcopy(execution.output_data)
node_outputs = new_output_data.setdefault("node_outputs", {})
for cycle_node_id, items in _cycle_items.items():
if cycle_node_id in node_outputs:
node_outputs[cycle_node_id]["cycle_items"] = items
else:
node_outputs[cycle_node_id] = {"cycle_items": items}
execution.output_data = new_output_data
self.db.commit()
elif event.get("event") == "workflow_start":
event["data"]["message_id"] = str(message_id)
event = self._emit(public, event)