From 5c89acced68717d96ed5f941fd8b1139b19c2231 Mon Sep 17 00:00:00 2001 From: wwq Date: Fri, 24 Apr 2026 10:29:41 +0800 Subject: [PATCH 1/8] fix(api_key): validate application publication status before key generation - Ensure the application exists and is published when resource_id is present; raise an exception otherwise. --- api/app/services/api_key_service.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/api/app/services/api_key_service.py b/api/app/services/api_key_service.py index 49b07121..e4367e98 100644 --- a/api/app/services/api_key_service.py +++ b/api/app/services/api_key_service.py @@ -65,6 +65,11 @@ class ApiKeyService: BizCode.BAD_REQUEST ) + if data.resource_id: + app = db.get(App, data.resource_id) + if not app or not app.current_release_id: + raise BusinessException("该应用未发布", BizCode.APP_NOT_PUBLISHED) + # 生成 API Key api_key = generate_api_key(data.type) From 0f7a7263ebad9f7b91267244ea4a837b5d58c5b5 Mon Sep 17 00:00:00 2001 From: wwq Date: Fri, 24 Apr 2026 11:39:33 +0800 Subject: [PATCH 2/8] fix(workflow): rectify error handling and bolster execution logging - Rectify exception propagation during node execution failures to ensure errors are correctly raised. - Bolster workflow logging to support failed status records and persist node execution data, including loop nodes. --- api/app/core/workflow/nodes/base_node.py | 29 ++++----- api/app/services/app_log_service.py | 56 ++++++++--------- api/app/services/workflow_service.py | 78 +++++++++++++++++++++++- 3 files changed, 117 insertions(+), 46 deletions(-) diff --git a/api/app/core/workflow/nodes/base_node.py b/api/app/core/workflow/nodes/base_node.py index 5458a80c..85f4bc5f 100644 --- a/api/app/core/workflow/nodes/base_node.py +++ b/api/app/core/workflow/nodes/base_node.py @@ -228,19 +228,21 @@ class BaseNode(ABC): logger.error( f"Node {self.node_id} execution timed out ({timeout} seconds)." ) - return self._wrap_error( + self._wrap_error( f"Node execution timed out ({timeout} seconds).", elapsed_time, state, variable_pool, ) + raise except Exception as e: elapsed_time = (time.time() - start_time) * 1000 logger.error( f"Node {self.node_id} execution failed: {e}", exc_info=True, ) - return self._wrap_error(str(e), elapsed_time, state, variable_pool) + self._wrap_error(str(e), elapsed_time, state, variable_pool) + raise async def run_stream( self, state: WorkflowState, @@ -351,11 +353,13 @@ class BaseNode(ABC): variable_pool ) yield error_output + raise except Exception as e: elapsed_time = (time.time() - start_time) * 1000 logger.error(f"Node {self.node_id} execution failed: {e}", exc_info=True) error_output = self._wrap_error(str(e), elapsed_time, state, variable_pool) yield error_output + raise def _wrap_output( self, @@ -447,26 +451,19 @@ class BaseNode(ABC): "error": error_message } - # if error_edge: - # # If an error edge exists, log a warning and continue to error node - # logger.warning( - # f"Node {self.node_id} execution failed, redirecting to error node: {error_edge['target']}" - # ) - # return { - # "node_outputs": { - # self.node_id: node_output - # }, - # "error": error_message, - # "error_node": self.node_id - # } - # else: writer = get_stream_writer() writer({ "type": "node_error", **node_output }) logger.error(f"Node {self.node_id} execution failed, stopping workflow: {error_message}") - raise Exception(f"Node {self.node_id} execution failed: {error_message}") + return { + "node_outputs": { + self.node_id: node_output + }, + "error": error_message, + "error_node": self.node_id + } def _extract_input(self, state: WorkflowState, variable_pool: VariablePool) -> dict[str, Any]: """Extracts the input data for this node (used for logging or audit). diff --git a/api/app/services/app_log_service.py b/api/app/services/app_log_service.py index 8f5052e6..b2801df3 100644 --- a/api/app/services/app_log_service.py +++ b/api/app/services/app_log_service.py @@ -163,7 +163,7 @@ class AppLogService: # 查询该会话关联的所有工作流执行记录(按时间正序) stmt = select(WorkflowExecution).where( WorkflowExecution.conversation_id == conversation_id, - WorkflowExecution.status == "completed" + WorkflowExecution.status.in_(["completed", "failed"]) ).order_by(WorkflowExecution.started_at.asc()) executions = self.db.scalars(stmt).all() @@ -188,10 +188,33 @@ class AppLogService: used_message_ids: set[str] = set() for execution in executions: - if not execution.output_data: + # 构建节点执行记录列表 + execution_nodes = [] + for node_exec in execution.node_executions: + node_execution = AppLogNodeExecution( + node_id=node_exec.node_id, + node_type=node_exec.node_type, + node_name=node_exec.node_name, + status=node_exec.status, + error=node_exec.error_message, + input=node_exec.input_data, + process=None, + output=node_exec.output_data, + elapsed_time=node_exec.elapsed_time, + token_usage=node_exec.token_usage, + ) + node_executions.append(node_execution) + execution_nodes.append(node_execution) + + if not execution_nodes: continue - # 找到该 execution 对应的 assistant message + # 失败的执行没有 assistant message,直接用 execution id 作为 key + if execution.status == "failed": + node_executions_map[f"execution_{str(execution.id)}"] = execution_nodes + continue + + # completed:通过时序匹配关联到对应的 assistant message # 逻辑:找 execution.started_at 之后最近的、未使用的 assistant message best_msg = None best_dt = None @@ -210,31 +233,6 @@ class AppLogService: msg_id_str = str(best_msg.id) used_message_ids.add(msg_id_str) - - # 提取节点输出 - output_data = execution.output_data - if isinstance(output_data, dict): - node_outputs = output_data.get("node_outputs", {}) - execution_nodes = [] - for node_id, node_data in node_outputs.items(): - if not isinstance(node_data, dict): - continue - node_execution = AppLogNodeExecution( - node_id=node_data.get("node_id", node_id), - node_type=node_data.get("node_type", "unknown"), - node_name=node_data.get("node_name"), - status=node_data.get("status", "unknown"), - error=node_data.get("error"), - input=node_data.get("input"), - process=node_data.get("process"), - output=node_data.get("output"), - elapsed_time=node_data.get("elapsed_time"), - token_usage=node_data.get("token_usage"), - ) - node_executions.append(node_execution) - execution_nodes.append(node_execution) - - # 将节点记录关联到 message_id - node_executions_map[msg_id_str] = execution_nodes + node_executions_map[msg_id_str] = execution_nodes return node_executions, node_executions_map diff --git a/api/app/services/workflow_service.py b/api/app/services/workflow_service.py index 0d282d78..6c93893a 100644 --- a/api/app/services/workflow_service.py +++ b/api/app/services/workflow_service.py @@ -17,8 +17,9 @@ from app.core.workflow.executor import execute_workflow, execute_workflow_stream from app.core.workflow.nodes.enums import NodeType from app.core.workflow.validator import validate_workflow_config from app.db import get_db +from sqlalchemy import select from app.models import App -from app.models.workflow_model import WorkflowConfig, WorkflowExecution +from app.models.workflow_model import WorkflowConfig, WorkflowExecution, WorkflowNodeExecution from app.repositories import knowledge_repository from app.repositories.workflow_repository import ( WorkflowConfigRepository, @@ -909,6 +910,8 @@ class WorkflowService: input_data["conv_messages"] = conv_messages init_message_length = len(input_data.get("conv_messages", [])) message_id = uuid.uuid4() + _node_order_counter = 0 + _cycle_items: dict[str, list] = {} # 新会话时写入开场白 is_new_conversation = init_message_length == 0 @@ -939,6 +942,52 @@ class WorkflowService: memory_storage_type=storage_type, user_rag_memory_id=user_rag_memory_id ): + event_type = event.get("event") + event_data = event.get("data", {}) + + # 持久化节点执行记录 + if event_type == "node_end": + node_id = event_data.get("node_id") + node_cfg = next((n for n in config.nodes if n.get("id") == node_id), {}) + self.db.add(WorkflowNodeExecution( + execution_id=execution.id, + node_id=node_id, + node_type=node_cfg.get("type", "unknown"), + node_name=node_cfg.get("data", {}).get("label") or node_id, + execution_order=_node_order_counter, + status="completed", + input_data=event_data.get("input"), + output_data=event_data.get("output"), + elapsed_time=event_data.get("elapsed_time"), + token_usage=event_data.get("token_usage"), + )) + self.db.commit() + _node_order_counter += 1 + + elif event_type == "node_error": + node_id = event_data.get("node_id") + node_cfg = next((n for n in config.nodes if n.get("id") == node_id), {}) + self.db.add(WorkflowNodeExecution( + execution_id=execution.id, + node_id=node_id, + node_type=node_cfg.get("type", "unknown"), + node_name=node_cfg.get("data", {}).get("label") or node_id, + execution_order=_node_order_counter, + status="failed", + input_data=event_data.get("input"), + output_data=None, + error_message=event_data.get("error"), + elapsed_time=event_data.get("elapsed_time"), + )) + self.db.commit() + _node_order_counter += 1 + + elif event_type == "cycle_item": + cycle_id = event_data.get("cycle_id") + if cycle_id not in _cycle_items: + _cycle_items[cycle_id] = [] + _cycle_items[cycle_id].append(event_data) + if event.get("event") == "workflow_end": status = event.get("data", {}).get("status") token_usage = event.get("data", {}).get("token_usage", {}) or {} @@ -1003,6 +1052,33 @@ class WorkflowService: ) else: logger.error(f"unexpect workflow run status, status: {status}") + # 把积累的 cycle_item 写入对应循环节点的 output_data + if _cycle_items: + for cycle_node_id, items in _cycle_items.items(): + node_exec = self.db.execute( + select(WorkflowNodeExecution).where( + WorkflowNodeExecution.execution_id == execution.id, + WorkflowNodeExecution.node_id == cycle_node_id + ) + ).scalar_one_or_none() + if node_exec: + node_exec.output_data = { + **(node_exec.output_data or {}), + "cycle_items": items + } + else: + node_cfg = next((n for n in config.nodes if n.get("id") == cycle_node_id), {}) + self.db.add(WorkflowNodeExecution( + execution_id=execution.id, + node_id=cycle_node_id, + node_type=node_cfg.get("type", "cycle"), + node_name=node_cfg.get("data", {}).get("label") or cycle_node_id, + execution_order=_node_order_counter, + status="completed", + output_data={"cycle_items": items}, + )) + _node_order_counter += 1 + self.db.commit() elif event.get("event") == "workflow_start": event["data"]["message_id"] = str(message_id) event = self._emit(public, event) From 2d120a64b1d9975f05dd5df4482ddce5723e1950 Mon Sep 17 00:00:00 2001 From: wwq Date: Fri, 24 Apr 2026 11:50:48 +0800 Subject: [PATCH 3/8] fix(workflow): rectify error handling and bolster execution logging - Rectify exception propagation during node execution failures to ensure errors are correctly raised. - Bolster workflow logging to support failed status records and persist node execution data, including loop nodes. --- api/app/core/workflow/nodes/base_node.py | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/api/app/core/workflow/nodes/base_node.py b/api/app/core/workflow/nodes/base_node.py index 85f4bc5f..a0f1f01e 100644 --- a/api/app/core/workflow/nodes/base_node.py +++ b/api/app/core/workflow/nodes/base_node.py @@ -228,21 +228,19 @@ class BaseNode(ABC): logger.error( f"Node {self.node_id} execution timed out ({timeout} seconds)." ) - self._wrap_error( + return self._wrap_error( f"Node execution timed out ({timeout} seconds).", elapsed_time, state, variable_pool, ) - raise except Exception as e: elapsed_time = (time.time() - start_time) * 1000 logger.error( f"Node {self.node_id} execution failed: {e}", exc_info=True, ) - self._wrap_error(str(e), elapsed_time, state, variable_pool) - raise + return self._wrap_error(str(e), elapsed_time, state, variable_pool) async def run_stream( self, state: WorkflowState, @@ -353,13 +351,11 @@ class BaseNode(ABC): variable_pool ) yield error_output - raise except Exception as e: elapsed_time = (time.time() - start_time) * 1000 logger.error(f"Node {self.node_id} execution failed: {e}", exc_info=True) error_output = self._wrap_error(str(e), elapsed_time, state, variable_pool) yield error_output - raise def _wrap_output( self, @@ -457,13 +453,7 @@ class BaseNode(ABC): **node_output }) logger.error(f"Node {self.node_id} execution failed, stopping workflow: {error_message}") - return { - "node_outputs": { - self.node_id: node_output - }, - "error": error_message, - "error_node": self.node_id - } + raise Exception(f"Node {self.node_id} execution failed: {error_message}") def _extract_input(self, state: WorkflowState, variable_pool: VariablePool) -> dict[str, Any]: """Extracts the input data for this node (used for logging or audit). From b33f5951d854d60ee635cc47b562a720a72e3e62 Mon Sep 17 00:00:00 2001 From: wwq Date: Fri, 24 Apr 2026 11:52:15 +0800 Subject: [PATCH 4/8] fix(workflow): rectify error handling and bolster execution logging - Rectify exception propagation during node execution failures to ensure errors are correctly raised. - Bolster workflow logging to support failed status records and persist node execution data, including loop nodes. --- api/app/core/workflow/nodes/base_node.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/api/app/core/workflow/nodes/base_node.py b/api/app/core/workflow/nodes/base_node.py index a0f1f01e..5458a80c 100644 --- a/api/app/core/workflow/nodes/base_node.py +++ b/api/app/core/workflow/nodes/base_node.py @@ -447,6 +447,19 @@ class BaseNode(ABC): "error": error_message } + # if error_edge: + # # If an error edge exists, log a warning and continue to error node + # logger.warning( + # f"Node {self.node_id} execution failed, redirecting to error node: {error_edge['target']}" + # ) + # return { + # "node_outputs": { + # self.node_id: node_output + # }, + # "error": error_message, + # "error_node": self.node_id + # } + # else: writer = get_stream_writer() writer({ "type": "node_error", From cedf47b3bca5bf999b35f7b1bd74e6028e63d1f4 Mon Sep 17 00:00:00 2001 From: wwq Date: Fri, 24 Apr 2026 15:29:33 +0800 Subject: [PATCH 5/8] fix(workflow): rectify error handling and bolster execution logging --- api/app/services/workflow_service.py | 39 +--------------------------- 1 file changed, 1 insertion(+), 38 deletions(-) diff --git a/api/app/services/workflow_service.py b/api/app/services/workflow_service.py index 6c93893a..ea302a9c 100644 --- a/api/app/services/workflow_service.py +++ b/api/app/services/workflow_service.py @@ -945,44 +945,7 @@ class WorkflowService: event_type = event.get("event") event_data = event.get("data", {}) - # 持久化节点执行记录 - if event_type == "node_end": - node_id = event_data.get("node_id") - node_cfg = next((n for n in config.nodes if n.get("id") == node_id), {}) - self.db.add(WorkflowNodeExecution( - execution_id=execution.id, - node_id=node_id, - node_type=node_cfg.get("type", "unknown"), - node_name=node_cfg.get("data", {}).get("label") or node_id, - execution_order=_node_order_counter, - status="completed", - input_data=event_data.get("input"), - output_data=event_data.get("output"), - elapsed_time=event_data.get("elapsed_time"), - token_usage=event_data.get("token_usage"), - )) - self.db.commit() - _node_order_counter += 1 - - elif event_type == "node_error": - node_id = event_data.get("node_id") - node_cfg = next((n for n in config.nodes if n.get("id") == node_id), {}) - self.db.add(WorkflowNodeExecution( - execution_id=execution.id, - node_id=node_id, - node_type=node_cfg.get("type", "unknown"), - node_name=node_cfg.get("data", {}).get("label") or node_id, - execution_order=_node_order_counter, - status="failed", - input_data=event_data.get("input"), - output_data=None, - error_message=event_data.get("error"), - elapsed_time=event_data.get("elapsed_time"), - )) - self.db.commit() - _node_order_counter += 1 - - elif event_type == "cycle_item": + if event_type == "cycle_item": cycle_id = event_data.get("cycle_id") if cycle_id not in _cycle_items: _cycle_items[cycle_id] = [] From cf8db47389f3d0db856fc2a9be6ab26f27b1e78c Mon Sep 17 00:00:00 2001 From: wwq Date: Fri, 24 Apr 2026 17:02:03 +0800 Subject: [PATCH 6/8] feat(workflow): augment logging capabilities with execution status and loop support - Augment workflow logs with execution status fields and loop node information. - Refactor log service to handle distinct processing logic for workflows and agents. - Construct message and node logs derived from workflow_executions data. --- api/app/controllers/app_log_controller.py | 27 ++- .../workflow/nodes/cycle_graph/iteration.py | 2 + .../core/workflow/nodes/cycle_graph/loop.py | 2 + api/app/schemas/app_log_schema.py | 2 + api/app/services/app_log_service.py | 165 ++++++++++++++---- 5 files changed, 158 insertions(+), 40 deletions(-) diff --git a/api/app/controllers/app_log_controller.py b/api/app/controllers/app_log_controller.py index dea555b9..ea7962c1 100644 --- a/api/app/controllers/app_log_controller.py +++ b/api/app/controllers/app_log_controller.py @@ -9,7 +9,7 @@ from app.core.logging_config import get_business_logger from app.core.response_utils import success from app.db import get_db from app.dependencies import get_current_user, cur_workspace_access_guard -from app.schemas.app_log_schema import AppLogConversation, AppLogConversationDetail +from app.schemas.app_log_schema import AppLogConversation, AppLogConversationDetail, AppLogMessage from app.schemas.response_schema import PageData, PageMeta from app.services.app_service import AppService from app.services.app_log_service import AppLogService @@ -78,17 +78,32 @@ def get_app_log_detail( # 验证应用访问权限 app_service = AppService(db) - app_service.get_app(app_id, workspace_id) + app = app_service.get_app(app_id, workspace_id) # 使用 Service 层查询 log_service = AppLogService(db) - conversation, node_executions_map = log_service.get_conversation_detail( + conversation, messages, node_executions_map = log_service.get_conversation_detail( app_id=app_id, conversation_id=conversation_id, - workspace_id=workspace_id + workspace_id=workspace_id, + app_type=app.type ) - detail = AppLogConversationDetail.model_validate(conversation) - detail.node_executions_map = node_executions_map + # 构建基础会话信息(不经过 ORM relationship) + base = AppLogConversation.model_validate(conversation) + + # 单独处理 messages,避免触发 SQLAlchemy relationship 校验 + if messages and isinstance(messages[0], AppLogMessage): + # 工作流:已经是 AppLogMessage 实例 + msg_list = messages + else: + # Agent:ORM Message 对象逐个转换 + msg_list = [AppLogMessage.model_validate(m) for m in messages] + + detail = AppLogConversationDetail( + **base.model_dump(), + messages=msg_list, + node_executions_map=node_executions_map, + ) return success(data=detail) diff --git a/api/app/core/workflow/nodes/cycle_graph/iteration.py b/api/app/core/workflow/nodes/cycle_graph/iteration.py index 1633b9c7..1ac46180 100644 --- a/api/app/core/workflow/nodes/cycle_graph/iteration.py +++ b/api/app/core/workflow/nodes/cycle_graph/iteration.py @@ -180,6 +180,8 @@ class IterationRuntime: "cycle_id": self.node_id, "cycle_idx": idx, "node_id": node_name, + "node_type": node_type, + "node_name": node_cfg.get("data", {}).get("label") if node_cfg else node_name, "input": result.get("node_outputs", {}).get(node_name, {}).get("input") if not cycle_variable else cycle_variable, "output": result.get("node_outputs", {}).get(node_name, {}).get("output") diff --git a/api/app/core/workflow/nodes/cycle_graph/loop.py b/api/app/core/workflow/nodes/cycle_graph/loop.py index e555a228..ca432a04 100644 --- a/api/app/core/workflow/nodes/cycle_graph/loop.py +++ b/api/app/core/workflow/nodes/cycle_graph/loop.py @@ -210,6 +210,8 @@ class LoopRuntime: "cycle_id": self.node_id, "cycle_idx": idx, "node_id": node_name, + "node_type": node_type, + "node_name": node_name, "input": result.get("node_outputs", {}).get(node_name, {}).get("input") if not cycle_variable else cycle_variable, "output": result.get("node_outputs", {}).get(node_name, {}).get("output") diff --git a/api/app/schemas/app_log_schema.py b/api/app/schemas/app_log_schema.py index a60a8428..ce9ddd44 100644 --- a/api/app/schemas/app_log_schema.py +++ b/api/app/schemas/app_log_schema.py @@ -14,6 +14,7 @@ class AppLogMessage(BaseModel): conversation_id: uuid.UUID role: str = Field(description="角色: user / assistant / system") content: str + status: Optional[str] = Field(default=None, description="执行状态(工作流专用): completed / failed") meta_data: Optional[Dict[str, Any]] = None created_at: datetime.datetime @@ -58,6 +59,7 @@ class AppLogNodeExecution(BaseModel): input: Optional[Any] = None process: Optional[Any] = None output: Optional[Any] = None + cycle_items: Optional[List[Any]] = None elapsed_time: Optional[float] = None token_usage: Optional[Dict[str, Any]] = None diff --git a/api/app/services/app_log_service.py b/api/app/services/app_log_service.py index b2801df3..e8a80d40 100644 --- a/api/app/services/app_log_service.py +++ b/api/app/services/app_log_service.py @@ -1,16 +1,17 @@ """应用日志服务层""" import uuid +import datetime as dt from typing import Optional, Tuple -from datetime import datetime from sqlalchemy import select from sqlalchemy.orm import Session from app.core.logging_config import get_business_logger +from app.models.app_model import AppType from app.models.conversation_model import Conversation, Message from app.models.workflow_model import WorkflowExecution from app.repositories.conversation_repository import ConversationRepository, MessageRepository -from app.schemas.app_log_schema import AppLogNodeExecution +from app.schemas.app_log_schema import AppLogMessage, AppLogNodeExecution logger = get_business_logger() @@ -83,51 +84,40 @@ class AppLogService: self, app_id: uuid.UUID, conversation_id: uuid.UUID, - workspace_id: uuid.UUID - ) -> Tuple[Conversation, dict[str, list[AppLogNodeExecution]]]: + workspace_id: uuid.UUID, + app_type: str = AppType.AGENT + ) -> Tuple[Conversation, list, dict[str, list[AppLogNodeExecution]]]: """ - 查询会话详情(包含消息和工作流节点执行记录) - - Args: - app_id: 应用 ID - conversation_id: 会话 ID - workspace_id: 工作空间 ID + 查询会话详情 Returns: - Tuple[Conversation, dict[str, list[AppLogNodeExecution]]]: - (包含消息的会话对象, 按消息ID分组的节点执行记录) - - Raises: - ResourceNotFoundException: 当会话不存在时 + Tuple[Conversation, list[AppLogMessage|Message], dict[str, list[AppLogNodeExecution]]] """ logger.info( "查询应用日志会话详情", extra={ "app_id": str(app_id), "conversation_id": str(conversation_id), - "workspace_id": str(workspace_id) + "workspace_id": str(workspace_id), + "app_type": app_type } ) - # 查询会话 conversation = self.conversation_repository.get_conversation_for_app_log( conversation_id=conversation_id, app_id=app_id, workspace_id=workspace_id ) - # 查询消息(按时间正序) - messages = self.message_repository.get_messages_by_conversation( - conversation_id=conversation_id - ) - - # 将消息附加到会话对象 - conversation.messages = messages - - # 查询工作流节点执行记录(按消息分组) - _, node_executions_map = self._get_workflow_node_executions_with_map( - conversation_id, messages - ) + if app_type == AppType.WORKFLOW: + messages, node_executions_map = self._get_workflow_messages_and_nodes(conversation_id) + else: + messages = self.message_repository.get_messages_by_conversation( + conversation_id=conversation_id + ) + _, node_executions_map = self._get_workflow_node_executions_with_map( + conversation_id, messages + ) logger.info( "查询应用日志会话详情成功", @@ -139,7 +129,97 @@ class AppLogService: } ) - return conversation, node_executions_map + return conversation, messages, node_executions_map + + def _get_workflow_messages_and_nodes( + self, + conversation_id: uuid.UUID, + ) -> Tuple[list[AppLogMessage], dict[str, list[AppLogNodeExecution]]]: + """ + 工作流应用专用:从 workflow_executions 构建 messages 和节点日志。 + + 每条 WorkflowExecution 对应一轮对话: + - user message:来自 execution.input_data + - assistant message:来自 execution.output_data(失败时内容为错误信息) + 节点日志以 execution id 为 key 分组。 + + Returns: + (messages 列表, node_executions_map) + """ + stmt = ( + select(WorkflowExecution) + .where( + WorkflowExecution.conversation_id == conversation_id, + WorkflowExecution.status.in_(["completed", "failed"]) + ) + .order_by(WorkflowExecution.started_at.asc()) + ) + executions = list(self.db.scalars(stmt).all()) + + messages: list[AppLogMessage] = [] + node_executions_map: dict[str, list[AppLogNodeExecution]] = {} + + for execution in executions: + started_at = execution.started_at or dt.datetime.now() + completed_at = execution.completed_at or started_at + + # assistant message 的 id,同时作为 node_executions_map 的 key + assistant_msg_id = uuid.uuid5(execution.id, "assistant") + + # --- user message(输入)--- + input_content = _extract_text(execution.input_data) + user_msg = AppLogMessage( + id=uuid.uuid5(execution.id, "user"), + conversation_id=conversation_id, + role="user", + content=input_content, + meta_data=None, + created_at=started_at, + ) + messages.append(user_msg) + + # --- assistant message(输出)--- + if execution.status == "completed": + output_content = _extract_text(execution.output_data) + meta = {"usage": execution.token_usage or {}, "elapsed_time": execution.elapsed_time} + else: + output_content = _extract_text(execution.output_data) or "" + meta = {"error": execution.error_message, "error_node_id": execution.error_node_id} + + assistant_msg = AppLogMessage( + id=assistant_msg_id, + conversation_id=conversation_id, + role="assistant", + content=output_content, + status=execution.status, + meta_data=meta, + created_at=completed_at, + ) + messages.append(assistant_msg) + + # --- 节点执行记录,key 与 assistant message id 一致 --- + execution_nodes = [] + for node_exec in execution.node_executions: + output_data = dict(node_exec.output_data or {}) + cycle_items = output_data.pop("cycle_items", None) + execution_nodes.append(AppLogNodeExecution( + node_id=node_exec.node_id, + node_type=node_exec.node_type, + node_name=node_exec.node_name, + status=node_exec.status, + error=node_exec.error_message, + input=node_exec.input_data, + process=None, + output=output_data, + cycle_items=cycle_items, + elapsed_time=node_exec.elapsed_time, + token_usage=node_exec.token_usage, + )) + + if execution_nodes: + node_executions_map[str(assistant_msg_id)] = execution_nodes + + return messages, node_executions_map def _get_workflow_node_executions_with_map( self, @@ -191,6 +271,8 @@ class AppLogService: # 构建节点执行记录列表 execution_nodes = [] for node_exec in execution.node_executions: + output_data = dict(node_exec.output_data or {}) + cycle_items = output_data.pop("cycle_items", None) node_execution = AppLogNodeExecution( node_id=node_exec.node_id, node_type=node_exec.node_type, @@ -199,7 +281,8 @@ class AppLogService: error=node_exec.error_message, input=node_exec.input_data, process=None, - output=node_exec.output_data, + output=output_data, + cycle_items=cycle_items, elapsed_time=node_exec.elapsed_time, token_usage=node_exec.token_usage, ) @@ -223,9 +306,9 @@ class AppLogService: if msg_id_str in used_message_ids: continue if msg.created_at and msg.created_at >= execution.started_at: - dt = (msg.created_at - execution.started_at).total_seconds() - if best_dt is None or dt < best_dt: - best_dt = dt + delta = (msg.created_at - execution.started_at).total_seconds() + if best_dt is None or delta < best_dt: + best_dt = delta best_msg = msg if not best_msg: @@ -236,3 +319,17 @@ class AppLogService: node_executions_map[msg_id_str] = execution_nodes return node_executions, node_executions_map + + +def _extract_text(data: Optional[dict]) -> str: + """从 workflow execution 的 input_data / output_data 中提取可读文本。 + + 优先取 'text'、'content'、'output' 字段;若都没有则 JSON 序列化整个 dict。 + """ + if not data: + return "" + for key in ("text", "content", "output", "result", "answer"): + if key in data and isinstance(data[key], str): + return data[key] + import json + return json.dumps(data, ensure_ascii=False) From 21eb5006809dd7ce5b9c489ba7d657e61cd0d40c Mon Sep 17 00:00:00 2001 From: wxy Date: Fri, 24 Apr 2026 18:20:14 +0800 Subject: [PATCH 7/8] refactor(workflow): streamline node execution handling and log service logic - Consolidate node data retrieval from workflow_executions.output_data to unify storage access. - Optimize the construction of messages and execution records to support opening suggestions. - Eliminate redundant queries and storage logic to simplify the overall codebase structure. --- .../workflow/nodes/cycle_graph/iteration.py | 1 + .../core/workflow/nodes/cycle_graph/loop.py | 1 + api/app/services/app_log_service.py | 157 ++++++++++++------ api/app/services/workflow_service.py | 34 +--- 4 files changed, 121 insertions(+), 72 deletions(-) diff --git a/api/app/core/workflow/nodes/cycle_graph/iteration.py b/api/app/core/workflow/nodes/cycle_graph/iteration.py index 1ac46180..3ee7774f 100644 --- a/api/app/core/workflow/nodes/cycle_graph/iteration.py +++ b/api/app/core/workflow/nodes/cycle_graph/iteration.py @@ -182,6 +182,7 @@ class IterationRuntime: "node_id": node_name, "node_type": node_type, "node_name": node_cfg.get("data", {}).get("label") if node_cfg else node_name, + "status": result.get("node_outputs", {}).get(node_name, {}).get("status", "completed"), "input": result.get("node_outputs", {}).get(node_name, {}).get("input") if not cycle_variable else cycle_variable, "output": result.get("node_outputs", {}).get(node_name, {}).get("output") diff --git a/api/app/core/workflow/nodes/cycle_graph/loop.py b/api/app/core/workflow/nodes/cycle_graph/loop.py index ca432a04..93f1a1e4 100644 --- a/api/app/core/workflow/nodes/cycle_graph/loop.py +++ b/api/app/core/workflow/nodes/cycle_graph/loop.py @@ -212,6 +212,7 @@ class LoopRuntime: "node_id": node_name, "node_type": node_type, "node_name": node_name, + "status": result.get("node_outputs", {}).get(node_name, {}).get("status", "completed"), "input": result.get("node_outputs", {}).get(node_name, {}).get("input") if not cycle_variable else cycle_variable, "output": result.get("node_outputs", {}).get(node_name, {}).get("output") diff --git a/api/app/services/app_log_service.py b/api/app/services/app_log_service.py index e8a80d40..7ca05d42 100644 --- a/api/app/services/app_log_service.py +++ b/api/app/services/app_log_service.py @@ -115,7 +115,7 @@ class AppLogService: messages = self.message_repository.get_messages_by_conversation( conversation_id=conversation_id ) - _, node_executions_map = self._get_workflow_node_executions_with_map( + node_executions_map = self._get_workflow_node_executions_with_map( conversation_id, messages ) @@ -139,9 +139,9 @@ class AppLogService: 工作流应用专用:从 workflow_executions 构建 messages 和节点日志。 每条 WorkflowExecution 对应一轮对话: - - user message:来自 execution.input_data + - user message:来自 execution.input_data(content 取 message 字段,files 放 meta_data) - assistant message:来自 execution.output_data(失败时内容为错误信息) - 节点日志以 execution id 为 key 分组。 + 开场白的 suggested_questions 合并到第一条 assistant message 的 meta_data 里。 Returns: (messages 列表, node_executions_map) @@ -156,9 +156,44 @@ class AppLogService: ) executions = list(self.db.scalars(stmt).all()) + # 查开场白:Message 表里 meta_data 含 suggested_questions 的第一条 assistant 消息 + opening_stmt = ( + select(Message) + .where( + Message.conversation_id == conversation_id, + Message.role == "assistant", + ) + .order_by(Message.created_at.asc()) + .limit(10) + ) + early_messages = list(self.db.scalars(opening_stmt).all()) + suggested_questions: list = [] + for m in early_messages: + if isinstance(m.meta_data, dict) and "suggested_questions" in m.meta_data: + suggested_questions = m.meta_data.get("suggested_questions") or [] + break + messages: list[AppLogMessage] = [] node_executions_map: dict[str, list[AppLogNodeExecution]] = {} + # 如果有开场白,作为第一条 assistant 消息插入 + if suggested_questions or early_messages: + opening_msg = next( + (m for m in early_messages + if isinstance(m.meta_data, dict) and "suggested_questions" in m.meta_data), + None + ) + if opening_msg: + messages.append(AppLogMessage( + id=opening_msg.id, + conversation_id=conversation_id, + role="assistant", + content=opening_msg.content, + status=None, + meta_data={"suggested_questions": suggested_questions}, + created_at=opening_msg.created_at, + )) + for execution in executions: started_at = execution.started_at or dt.datetime.now() completed_at = execution.completed_at or started_at @@ -167,13 +202,20 @@ class AppLogService: assistant_msg_id = uuid.uuid5(execution.id, "assistant") # --- user message(输入)--- - input_content = _extract_text(execution.input_data) + input_data = execution.input_data or {} + input_content = input_data.get("message") or _extract_text(input_data) + + # 跳过没有用户输入的 execution(如开场白触发的记录) + if not input_content or not input_content.strip(): + continue + + files = input_data.get("files") or [] user_msg = AppLogMessage( id=uuid.uuid5(execution.id, "user"), conversation_id=conversation_id, role="user", content=input_content, - meta_data=None, + meta_data={"files": files} if files else None, created_at=started_at, ) messages.append(user_msg) @@ -197,24 +239,8 @@ class AppLogService: ) messages.append(assistant_msg) - # --- 节点执行记录,key 与 assistant message id 一致 --- - execution_nodes = [] - for node_exec in execution.node_executions: - output_data = dict(node_exec.output_data or {}) - cycle_items = output_data.pop("cycle_items", None) - execution_nodes.append(AppLogNodeExecution( - node_id=node_exec.node_id, - node_type=node_exec.node_type, - node_name=node_exec.node_name, - status=node_exec.status, - error=node_exec.error_message, - input=node_exec.input_data, - process=None, - output=output_data, - cycle_items=cycle_items, - elapsed_time=node_exec.elapsed_time, - token_usage=node_exec.token_usage, - )) + # --- 节点执行记录,从 workflow_executions.output_data["node_outputs"] 读取 --- + execution_nodes = _build_nodes_from_output_data(execution.output_data) if execution_nodes: node_executions_map[str(assistant_msg_id)] = execution_nodes @@ -225,7 +251,7 @@ class AppLogService: self, conversation_id: uuid.UUID, messages: list[Message] - ) -> Tuple[list[AppLogNodeExecution], dict[str, list[AppLogNodeExecution]]]: + ) -> dict[str, list[AppLogNodeExecution]]: """ 从 workflow_executions 表中提取节点执行记录,并按 assistant message 分组 @@ -237,7 +263,6 @@ class AppLogService: Tuple[list[AppLogNodeExecution], dict[str, list[AppLogNodeExecution]]]: (所有节点执行记录列表, 按 message_id 分组的节点执行记录字典) """ - node_executions = [] node_executions_map: dict[str, list[AppLogNodeExecution]] = {} # 查询该会话关联的所有工作流执行记录(按时间正序) @@ -268,26 +293,8 @@ class AppLogService: used_message_ids: set[str] = set() for execution in executions: - # 构建节点执行记录列表 - execution_nodes = [] - for node_exec in execution.node_executions: - output_data = dict(node_exec.output_data or {}) - cycle_items = output_data.pop("cycle_items", None) - node_execution = AppLogNodeExecution( - node_id=node_exec.node_id, - node_type=node_exec.node_type, - node_name=node_exec.node_name, - status=node_exec.status, - error=node_exec.error_message, - input=node_exec.input_data, - process=None, - output=output_data, - cycle_items=cycle_items, - elapsed_time=node_exec.elapsed_time, - token_usage=node_exec.token_usage, - ) - node_executions.append(node_execution) - execution_nodes.append(node_execution) + # 构建节点执行记录列表,从 workflow_executions.output_data["node_outputs"] 读取 + execution_nodes = _build_nodes_from_output_data(execution.output_data) if not execution_nodes: continue @@ -318,7 +325,7 @@ class AppLogService: used_message_ids.add(msg_id_str) node_executions_map[msg_id_str] = execution_nodes - return node_executions, node_executions_map + return node_executions_map def _extract_text(data: Optional[dict]) -> str: @@ -328,8 +335,64 @@ def _extract_text(data: Optional[dict]) -> str: """ if not data: return "" - for key in ("text", "content", "output", "result", "answer"): + for key in ("message", "text", "content", "output", "result", "answer"): if key in data and isinstance(data[key], str): return data[key] import json return json.dumps(data, ensure_ascii=False) + + +def _build_nodes_from_output_data(output_data: Optional[dict]) -> list[AppLogNodeExecution]: + """从 workflow_executions.output_data["node_outputs"] 构建节点执行记录列表。 + + output_data 结构: + { + "node_outputs": { + "": { + "node_type": ..., + "node_name": ..., + "status": ..., + "input": ..., + "output": ..., + "elapsed_time": ..., + "token_usage": ..., + "error": ..., + "cycle_items": [...], + ... + } + }, + "error": ..., + ... + } + """ + if not output_data: + return [] + node_outputs: dict = output_data.get("node_outputs") or {} + result = [] + for node_id, node_data in node_outputs.items(): + if not isinstance(node_data, dict): + continue + output = dict(node_data) + cycle_items = output.pop("cycle_items", None) + # 把已知的顶层字段剥离,剩余的作为 output + node_type = output.pop("node_type", "unknown") + node_name = output.pop("node_name", None) + status = output.pop("status", "completed") + error = output.pop("error", None) + inp = output.pop("input", None) + elapsed_time = output.pop("elapsed_time", None) + token_usage = output.pop("token_usage", None) + result.append(AppLogNodeExecution( + node_id=node_id, + node_type=node_type, + node_name=node_name, + status=status, + error=error, + input=inp, + process=None, + output=output if output else None, + cycle_items=cycle_items, + elapsed_time=elapsed_time, + token_usage=token_usage, + )) + return result diff --git a/api/app/services/workflow_service.py b/api/app/services/workflow_service.py index ea302a9c..aa91de89 100644 --- a/api/app/services/workflow_service.py +++ b/api/app/services/workflow_service.py @@ -910,7 +910,6 @@ class WorkflowService: input_data["conv_messages"] = conv_messages init_message_length = len(input_data.get("conv_messages", [])) message_id = uuid.uuid4() - _node_order_counter = 0 _cycle_items: dict[str, list] = {} # 新会话时写入开场白 @@ -1015,32 +1014,17 @@ class WorkflowService: ) else: logger.error(f"unexpect workflow run status, status: {status}") - # 把积累的 cycle_item 写入对应循环节点的 output_data - if _cycle_items: + # 把积累的 cycle_item 写入 workflow_executions.output_data["node_outputs"] + if _cycle_items and execution.output_data: + import copy + new_output_data = copy.deepcopy(execution.output_data) + node_outputs = new_output_data.setdefault("node_outputs", {}) for cycle_node_id, items in _cycle_items.items(): - node_exec = self.db.execute( - select(WorkflowNodeExecution).where( - WorkflowNodeExecution.execution_id == execution.id, - WorkflowNodeExecution.node_id == cycle_node_id - ) - ).scalar_one_or_none() - if node_exec: - node_exec.output_data = { - **(node_exec.output_data or {}), - "cycle_items": items - } + if cycle_node_id in node_outputs: + node_outputs[cycle_node_id]["cycle_items"] = items else: - node_cfg = next((n for n in config.nodes if n.get("id") == cycle_node_id), {}) - self.db.add(WorkflowNodeExecution( - execution_id=execution.id, - node_id=cycle_node_id, - node_type=node_cfg.get("type", "cycle"), - node_name=node_cfg.get("data", {}).get("label") or cycle_node_id, - execution_order=_node_order_counter, - status="completed", - output_data={"cycle_items": items}, - )) - _node_order_counter += 1 + node_outputs[cycle_node_id] = {"cycle_items": items} + execution.output_data = new_output_data self.db.commit() elif event.get("event") == "workflow_start": event["data"]["message_id"] = str(message_id) From f63bcd632105ca47eb9a23b432ce29303fc8fdf1 Mon Sep 17 00:00:00 2001 From: wxy Date: Fri, 24 Apr 2026 18:49:55 +0800 Subject: [PATCH 8/8] refactor(tool): flatten request body parameters for model exposure - Refactor the extraction logic in tool service to flatten request body parameters into independent arguments exposed to the model. --- api/app/core/tools/custom/base.py | 18 ++++++++++++++++++ api/app/services/tool_service.py | 7 ++++--- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/api/app/core/tools/custom/base.py b/api/app/core/tools/custom/base.py index c03fe206..06237d32 100644 --- a/api/app/core/tools/custom/base.py +++ b/api/app/core/tools/custom/base.py @@ -73,6 +73,7 @@ class CustomTool(BaseTool): # 添加通用参数(基于第一个操作的参数) if self._parsed_operations: first_operation = next(iter(self._parsed_operations.values())) + # path/query 参数 for param_name, param_info in first_operation.get("parameters", {}).items(): params.append(ToolParameter( name=param_name, @@ -85,6 +86,23 @@ class CustomTool(BaseTool): maximum=param_info.get("maximum"), pattern=param_info.get("pattern") )) + # requestBody 参数 — 将 body 字段平铺为独立参数暴露给模型 + request_body = first_operation.get("request_body") + if request_body: + body_schema = request_body.get("properties", {}) + required_fields = request_body.get("required", []) + for prop_name, prop_schema in body_schema.items(): + params.append(ToolParameter( + name=prop_name, + type=self._convert_openapi_type(prop_schema.get("type", "string")), + description=prop_schema.get("description", ""), + required=prop_name in required_fields, + default=prop_schema.get("default"), + enum=prop_schema.get("enum"), + minimum=prop_schema.get("minimum"), + maximum=prop_schema.get("maximum"), + pattern=prop_schema.get("pattern") + )) return params diff --git a/api/app/services/tool_service.py b/api/app/services/tool_service.py index 9a59cd81..ff734c9d 100644 --- a/api/app/services/tool_service.py +++ b/api/app/services/tool_service.py @@ -815,11 +815,12 @@ class ToolService: "default": param_info.get("default") }) - # 请求体参数 + # 请求体参数 — _extract_request_body 返回 {"schema": {...}, "required": bool, ...} request_body = operation.get("request_body") if request_body: - schema_props = request_body.get("schema", {}).get("properties", {}) - required_props = request_body.get("schema", {}).get("required", []) + body_schema = request_body.get("schema", {}) + schema_props = body_schema.get("properties", {}) + required_props = body_schema.get("required", []) for prop_name, prop_schema in schema_props.items(): parameters.append({