From 0f7a7263ebad9f7b91267244ea4a837b5d58c5b5 Mon Sep 17 00:00:00 2001 From: wwq Date: Fri, 24 Apr 2026 11:39:33 +0800 Subject: [PATCH] fix(workflow): rectify error handling and bolster execution logging - Rectify exception propagation during node execution failures to ensure errors are correctly raised. - Bolster workflow logging to support failed status records and persist node execution data, including loop nodes. --- api/app/core/workflow/nodes/base_node.py | 29 ++++----- api/app/services/app_log_service.py | 56 ++++++++--------- api/app/services/workflow_service.py | 78 +++++++++++++++++++++++- 3 files changed, 117 insertions(+), 46 deletions(-) diff --git a/api/app/core/workflow/nodes/base_node.py b/api/app/core/workflow/nodes/base_node.py index 5458a80c..85f4bc5f 100644 --- a/api/app/core/workflow/nodes/base_node.py +++ b/api/app/core/workflow/nodes/base_node.py @@ -228,19 +228,21 @@ class BaseNode(ABC): logger.error( f"Node {self.node_id} execution timed out ({timeout} seconds)." ) - return self._wrap_error( + self._wrap_error( f"Node execution timed out ({timeout} seconds).", elapsed_time, state, variable_pool, ) + raise except Exception as e: elapsed_time = (time.time() - start_time) * 1000 logger.error( f"Node {self.node_id} execution failed: {e}", exc_info=True, ) - return self._wrap_error(str(e), elapsed_time, state, variable_pool) + self._wrap_error(str(e), elapsed_time, state, variable_pool) + raise async def run_stream( self, state: WorkflowState, @@ -351,11 +353,13 @@ class BaseNode(ABC): variable_pool ) yield error_output + raise except Exception as e: elapsed_time = (time.time() - start_time) * 1000 logger.error(f"Node {self.node_id} execution failed: {e}", exc_info=True) error_output = self._wrap_error(str(e), elapsed_time, state, variable_pool) yield error_output + raise def _wrap_output( self, @@ -447,26 +451,19 @@ class BaseNode(ABC): "error": error_message } - # if error_edge: - # # If an error edge exists, log a warning and continue to error node - # logger.warning( - # f"Node {self.node_id} execution failed, redirecting to error node: {error_edge['target']}" - # ) - # return { - # "node_outputs": { - # self.node_id: node_output - # }, - # "error": error_message, - # "error_node": self.node_id - # } - # else: writer = get_stream_writer() writer({ "type": "node_error", **node_output }) logger.error(f"Node {self.node_id} execution failed, stopping workflow: {error_message}") - raise Exception(f"Node {self.node_id} execution failed: {error_message}") + return { + "node_outputs": { + self.node_id: node_output + }, + "error": error_message, + "error_node": self.node_id + } def _extract_input(self, state: WorkflowState, variable_pool: VariablePool) -> dict[str, Any]: """Extracts the input data for this node (used for logging or audit). diff --git a/api/app/services/app_log_service.py b/api/app/services/app_log_service.py index 8f5052e6..b2801df3 100644 --- a/api/app/services/app_log_service.py +++ b/api/app/services/app_log_service.py @@ -163,7 +163,7 @@ class AppLogService: # 查询该会话关联的所有工作流执行记录(按时间正序) stmt = select(WorkflowExecution).where( WorkflowExecution.conversation_id == conversation_id, - WorkflowExecution.status == "completed" + WorkflowExecution.status.in_(["completed", "failed"]) ).order_by(WorkflowExecution.started_at.asc()) executions = self.db.scalars(stmt).all() @@ -188,10 +188,33 @@ class AppLogService: used_message_ids: set[str] = set() for execution in executions: - if not execution.output_data: + # 构建节点执行记录列表 + execution_nodes = [] + for node_exec in execution.node_executions: + node_execution = AppLogNodeExecution( + node_id=node_exec.node_id, + node_type=node_exec.node_type, + node_name=node_exec.node_name, + status=node_exec.status, + error=node_exec.error_message, + input=node_exec.input_data, + process=None, + output=node_exec.output_data, + elapsed_time=node_exec.elapsed_time, + token_usage=node_exec.token_usage, + ) + node_executions.append(node_execution) + execution_nodes.append(node_execution) + + if not execution_nodes: continue - # 找到该 execution 对应的 assistant message + # 失败的执行没有 assistant message,直接用 execution id 作为 key + if execution.status == "failed": + node_executions_map[f"execution_{str(execution.id)}"] = execution_nodes + continue + + # completed:通过时序匹配关联到对应的 assistant message # 逻辑:找 execution.started_at 之后最近的、未使用的 assistant message best_msg = None best_dt = None @@ -210,31 +233,6 @@ class AppLogService: msg_id_str = str(best_msg.id) used_message_ids.add(msg_id_str) - - # 提取节点输出 - output_data = execution.output_data - if isinstance(output_data, dict): - node_outputs = output_data.get("node_outputs", {}) - execution_nodes = [] - for node_id, node_data in node_outputs.items(): - if not isinstance(node_data, dict): - continue - node_execution = AppLogNodeExecution( - node_id=node_data.get("node_id", node_id), - node_type=node_data.get("node_type", "unknown"), - node_name=node_data.get("node_name"), - status=node_data.get("status", "unknown"), - error=node_data.get("error"), - input=node_data.get("input"), - process=node_data.get("process"), - output=node_data.get("output"), - elapsed_time=node_data.get("elapsed_time"), - token_usage=node_data.get("token_usage"), - ) - node_executions.append(node_execution) - execution_nodes.append(node_execution) - - # 将节点记录关联到 message_id - node_executions_map[msg_id_str] = execution_nodes + node_executions_map[msg_id_str] = execution_nodes return node_executions, node_executions_map diff --git a/api/app/services/workflow_service.py b/api/app/services/workflow_service.py index 0d282d78..6c93893a 100644 --- a/api/app/services/workflow_service.py +++ b/api/app/services/workflow_service.py @@ -17,8 +17,9 @@ from app.core.workflow.executor import execute_workflow, execute_workflow_stream from app.core.workflow.nodes.enums import NodeType from app.core.workflow.validator import validate_workflow_config from app.db import get_db +from sqlalchemy import select from app.models import App -from app.models.workflow_model import WorkflowConfig, WorkflowExecution +from app.models.workflow_model import WorkflowConfig, WorkflowExecution, WorkflowNodeExecution from app.repositories import knowledge_repository from app.repositories.workflow_repository import ( WorkflowConfigRepository, @@ -909,6 +910,8 @@ class WorkflowService: input_data["conv_messages"] = conv_messages init_message_length = len(input_data.get("conv_messages", [])) message_id = uuid.uuid4() + _node_order_counter = 0 + _cycle_items: dict[str, list] = {} # 新会话时写入开场白 is_new_conversation = init_message_length == 0 @@ -939,6 +942,52 @@ class WorkflowService: memory_storage_type=storage_type, user_rag_memory_id=user_rag_memory_id ): + event_type = event.get("event") + event_data = event.get("data", {}) + + # 持久化节点执行记录 + if event_type == "node_end": + node_id = event_data.get("node_id") + node_cfg = next((n for n in config.nodes if n.get("id") == node_id), {}) + self.db.add(WorkflowNodeExecution( + execution_id=execution.id, + node_id=node_id, + node_type=node_cfg.get("type", "unknown"), + node_name=node_cfg.get("data", {}).get("label") or node_id, + execution_order=_node_order_counter, + status="completed", + input_data=event_data.get("input"), + output_data=event_data.get("output"), + elapsed_time=event_data.get("elapsed_time"), + token_usage=event_data.get("token_usage"), + )) + self.db.commit() + _node_order_counter += 1 + + elif event_type == "node_error": + node_id = event_data.get("node_id") + node_cfg = next((n for n in config.nodes if n.get("id") == node_id), {}) + self.db.add(WorkflowNodeExecution( + execution_id=execution.id, + node_id=node_id, + node_type=node_cfg.get("type", "unknown"), + node_name=node_cfg.get("data", {}).get("label") or node_id, + execution_order=_node_order_counter, + status="failed", + input_data=event_data.get("input"), + output_data=None, + error_message=event_data.get("error"), + elapsed_time=event_data.get("elapsed_time"), + )) + self.db.commit() + _node_order_counter += 1 + + elif event_type == "cycle_item": + cycle_id = event_data.get("cycle_id") + if cycle_id not in _cycle_items: + _cycle_items[cycle_id] = [] + _cycle_items[cycle_id].append(event_data) + if event.get("event") == "workflow_end": status = event.get("data", {}).get("status") token_usage = event.get("data", {}).get("token_usage", {}) or {} @@ -1003,6 +1052,33 @@ class WorkflowService: ) else: logger.error(f"unexpect workflow run status, status: {status}") + # 把积累的 cycle_item 写入对应循环节点的 output_data + if _cycle_items: + for cycle_node_id, items in _cycle_items.items(): + node_exec = self.db.execute( + select(WorkflowNodeExecution).where( + WorkflowNodeExecution.execution_id == execution.id, + WorkflowNodeExecution.node_id == cycle_node_id + ) + ).scalar_one_or_none() + if node_exec: + node_exec.output_data = { + **(node_exec.output_data or {}), + "cycle_items": items + } + else: + node_cfg = next((n for n in config.nodes if n.get("id") == cycle_node_id), {}) + self.db.add(WorkflowNodeExecution( + execution_id=execution.id, + node_id=cycle_node_id, + node_type=node_cfg.get("type", "cycle"), + node_name=node_cfg.get("data", {}).get("label") or cycle_node_id, + execution_order=_node_order_counter, + status="completed", + output_data={"cycle_items": items}, + )) + _node_order_counter += 1 + self.db.commit() elif event.get("event") == "workflow_start": event["data"]["message_id"] = str(message_id) event = self._emit(public, event)