From 2d120a64b1d9975f05dd5df4482ddce5723e1950 Mon Sep 17 00:00:00 2001 From: wwq Date: Fri, 24 Apr 2026 11:50:48 +0800 Subject: [PATCH] fix(workflow): rectify error handling and bolster execution logging - Rectify exception propagation during node execution failures to ensure errors are correctly raised. - Bolster workflow logging to support failed status records and persist node execution data, including loop nodes. --- api/app/core/workflow/nodes/base_node.py | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/api/app/core/workflow/nodes/base_node.py b/api/app/core/workflow/nodes/base_node.py index 85f4bc5f..a0f1f01e 100644 --- a/api/app/core/workflow/nodes/base_node.py +++ b/api/app/core/workflow/nodes/base_node.py @@ -228,21 +228,19 @@ class BaseNode(ABC): logger.error( f"Node {self.node_id} execution timed out ({timeout} seconds)." ) - self._wrap_error( + return self._wrap_error( f"Node execution timed out ({timeout} seconds).", elapsed_time, state, variable_pool, ) - raise except Exception as e: elapsed_time = (time.time() - start_time) * 1000 logger.error( f"Node {self.node_id} execution failed: {e}", exc_info=True, ) - self._wrap_error(str(e), elapsed_time, state, variable_pool) - raise + return self._wrap_error(str(e), elapsed_time, state, variable_pool) async def run_stream( self, state: WorkflowState, @@ -353,13 +351,11 @@ class BaseNode(ABC): variable_pool ) yield error_output - raise except Exception as e: elapsed_time = (time.time() - start_time) * 1000 logger.error(f"Node {self.node_id} execution failed: {e}", exc_info=True) error_output = self._wrap_error(str(e), elapsed_time, state, variable_pool) yield error_output - raise def _wrap_output( self, @@ -457,13 +453,7 @@ class BaseNode(ABC): **node_output }) logger.error(f"Node {self.node_id} execution failed, stopping workflow: {error_message}") - return { - "node_outputs": { - self.node_id: node_output - }, - "error": error_message, - "error_node": self.node_id - } + raise Exception(f"Node {self.node_id} execution failed: {error_message}") def _extract_input(self, state: WorkflowState, variable_pool: VariablePool) -> dict[str, Any]: """Extracts the input data for this node (used for logging or audit).