fix(workflow): rectify error handling and bolster execution logging

- Rectify exception propagation during node execution failures to ensure errors are correctly raised.
- Bolster workflow logging to support failed status records and persist node execution data, including loop nodes.
This commit is contained in:
wwq
2026-04-24 11:50:48 +08:00
parent 0f7a7263eb
commit 2d120a64b1

View File

@@ -228,21 +228,19 @@ class BaseNode(ABC):
logger.error( logger.error(
f"Node {self.node_id} execution timed out ({timeout} seconds)." f"Node {self.node_id} execution timed out ({timeout} seconds)."
) )
self._wrap_error( return self._wrap_error(
f"Node execution timed out ({timeout} seconds).", f"Node execution timed out ({timeout} seconds).",
elapsed_time, elapsed_time,
state, state,
variable_pool, variable_pool,
) )
raise
except Exception as e: except Exception as e:
elapsed_time = (time.time() - start_time) * 1000 elapsed_time = (time.time() - start_time) * 1000
logger.error( logger.error(
f"Node {self.node_id} execution failed: {e}", f"Node {self.node_id} execution failed: {e}",
exc_info=True, exc_info=True,
) )
self._wrap_error(str(e), elapsed_time, state, variable_pool) return self._wrap_error(str(e), elapsed_time, state, variable_pool)
raise
async def run_stream( async def run_stream(
self, state: WorkflowState, self, state: WorkflowState,
@@ -353,13 +351,11 @@ class BaseNode(ABC):
variable_pool variable_pool
) )
yield error_output yield error_output
raise
except Exception as e: except Exception as e:
elapsed_time = (time.time() - start_time) * 1000 elapsed_time = (time.time() - start_time) * 1000
logger.error(f"Node {self.node_id} execution failed: {e}", exc_info=True) logger.error(f"Node {self.node_id} execution failed: {e}", exc_info=True)
error_output = self._wrap_error(str(e), elapsed_time, state, variable_pool) error_output = self._wrap_error(str(e), elapsed_time, state, variable_pool)
yield error_output yield error_output
raise
def _wrap_output( def _wrap_output(
self, self,
@@ -457,13 +453,7 @@ class BaseNode(ABC):
**node_output **node_output
}) })
logger.error(f"Node {self.node_id} execution failed, stopping workflow: {error_message}") logger.error(f"Node {self.node_id} execution failed, stopping workflow: {error_message}")
return { raise Exception(f"Node {self.node_id} execution failed: {error_message}")
"node_outputs": {
self.node_id: node_output
},
"error": error_message,
"error_node": self.node_id
}
def _extract_input(self, state: WorkflowState, variable_pool: VariablePool) -> dict[str, Any]: def _extract_input(self, state: WorkflowState, variable_pool: VariablePool) -> dict[str, Any]:
"""Extracts the input data for this node (used for logging or audit). """Extracts the input data for this node (used for logging or audit).