diff --git a/api/app/core/workflow/nodes/base_node.py b/api/app/core/workflow/nodes/base_node.py index 85f4bc5f..a0f1f01e 100644 --- a/api/app/core/workflow/nodes/base_node.py +++ b/api/app/core/workflow/nodes/base_node.py @@ -228,21 +228,19 @@ class BaseNode(ABC): logger.error( f"Node {self.node_id} execution timed out ({timeout} seconds)." ) - self._wrap_error( + return self._wrap_error( f"Node execution timed out ({timeout} seconds).", elapsed_time, state, variable_pool, ) - raise except Exception as e: elapsed_time = (time.time() - start_time) * 1000 logger.error( f"Node {self.node_id} execution failed: {e}", exc_info=True, ) - self._wrap_error(str(e), elapsed_time, state, variable_pool) - raise + return self._wrap_error(str(e), elapsed_time, state, variable_pool) async def run_stream( self, state: WorkflowState, @@ -353,13 +351,11 @@ class BaseNode(ABC): variable_pool ) yield error_output - raise except Exception as e: elapsed_time = (time.time() - start_time) * 1000 logger.error(f"Node {self.node_id} execution failed: {e}", exc_info=True) error_output = self._wrap_error(str(e), elapsed_time, state, variable_pool) yield error_output - raise def _wrap_output( self, @@ -457,13 +453,7 @@ class BaseNode(ABC): **node_output }) logger.error(f"Node {self.node_id} execution failed, stopping workflow: {error_message}") - return { - "node_outputs": { - self.node_id: node_output - }, - "error": error_message, - "error_node": self.node_id - } + raise Exception(f"Node {self.node_id} execution failed: {error_message}") def _extract_input(self, state: WorkflowState, variable_pool: VariablePool) -> dict[str, Any]: """Extracts the input data for this node (used for logging or audit).