fix(workflow): rectify error handling and bolster execution logging
- Rectify exception propagation during node execution failures to ensure errors are correctly raised. - Bolster workflow logging to support failed status records and persist node execution data, including loop nodes.
This commit is contained in:
@@ -228,19 +228,21 @@ class BaseNode(ABC):
|
|||||||
logger.error(
|
logger.error(
|
||||||
f"Node {self.node_id} execution timed out ({timeout} seconds)."
|
f"Node {self.node_id} execution timed out ({timeout} seconds)."
|
||||||
)
|
)
|
||||||
return self._wrap_error(
|
self._wrap_error(
|
||||||
f"Node execution timed out ({timeout} seconds).",
|
f"Node execution timed out ({timeout} seconds).",
|
||||||
elapsed_time,
|
elapsed_time,
|
||||||
state,
|
state,
|
||||||
variable_pool,
|
variable_pool,
|
||||||
)
|
)
|
||||||
|
raise
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
elapsed_time = (time.time() - start_time) * 1000
|
elapsed_time = (time.time() - start_time) * 1000
|
||||||
logger.error(
|
logger.error(
|
||||||
f"Node {self.node_id} execution failed: {e}",
|
f"Node {self.node_id} execution failed: {e}",
|
||||||
exc_info=True,
|
exc_info=True,
|
||||||
)
|
)
|
||||||
return self._wrap_error(str(e), elapsed_time, state, variable_pool)
|
self._wrap_error(str(e), elapsed_time, state, variable_pool)
|
||||||
|
raise
|
||||||
|
|
||||||
async def run_stream(
|
async def run_stream(
|
||||||
self, state: WorkflowState,
|
self, state: WorkflowState,
|
||||||
@@ -351,11 +353,13 @@ class BaseNode(ABC):
|
|||||||
variable_pool
|
variable_pool
|
||||||
)
|
)
|
||||||
yield error_output
|
yield error_output
|
||||||
|
raise
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
elapsed_time = (time.time() - start_time) * 1000
|
elapsed_time = (time.time() - start_time) * 1000
|
||||||
logger.error(f"Node {self.node_id} execution failed: {e}", exc_info=True)
|
logger.error(f"Node {self.node_id} execution failed: {e}", exc_info=True)
|
||||||
error_output = self._wrap_error(str(e), elapsed_time, state, variable_pool)
|
error_output = self._wrap_error(str(e), elapsed_time, state, variable_pool)
|
||||||
yield error_output
|
yield error_output
|
||||||
|
raise
|
||||||
|
|
||||||
def _wrap_output(
|
def _wrap_output(
|
||||||
self,
|
self,
|
||||||
@@ -447,26 +451,19 @@ class BaseNode(ABC):
|
|||||||
"error": error_message
|
"error": error_message
|
||||||
}
|
}
|
||||||
|
|
||||||
# if error_edge:
|
|
||||||
# # If an error edge exists, log a warning and continue to error node
|
|
||||||
# logger.warning(
|
|
||||||
# f"Node {self.node_id} execution failed, redirecting to error node: {error_edge['target']}"
|
|
||||||
# )
|
|
||||||
# return {
|
|
||||||
# "node_outputs": {
|
|
||||||
# self.node_id: node_output
|
|
||||||
# },
|
|
||||||
# "error": error_message,
|
|
||||||
# "error_node": self.node_id
|
|
||||||
# }
|
|
||||||
# else:
|
|
||||||
writer = get_stream_writer()
|
writer = get_stream_writer()
|
||||||
writer({
|
writer({
|
||||||
"type": "node_error",
|
"type": "node_error",
|
||||||
**node_output
|
**node_output
|
||||||
})
|
})
|
||||||
logger.error(f"Node {self.node_id} execution failed, stopping workflow: {error_message}")
|
logger.error(f"Node {self.node_id} execution failed, stopping workflow: {error_message}")
|
||||||
raise Exception(f"Node {self.node_id} execution failed: {error_message}")
|
return {
|
||||||
|
"node_outputs": {
|
||||||
|
self.node_id: node_output
|
||||||
|
},
|
||||||
|
"error": error_message,
|
||||||
|
"error_node": self.node_id
|
||||||
|
}
|
||||||
|
|
||||||
def _extract_input(self, state: WorkflowState, variable_pool: VariablePool) -> dict[str, Any]:
|
def _extract_input(self, state: WorkflowState, variable_pool: VariablePool) -> dict[str, Any]:
|
||||||
"""Extracts the input data for this node (used for logging or audit).
|
"""Extracts the input data for this node (used for logging or audit).
|
||||||
|
|||||||
@@ -163,7 +163,7 @@ class AppLogService:
|
|||||||
# 查询该会话关联的所有工作流执行记录(按时间正序)
|
# 查询该会话关联的所有工作流执行记录(按时间正序)
|
||||||
stmt = select(WorkflowExecution).where(
|
stmt = select(WorkflowExecution).where(
|
||||||
WorkflowExecution.conversation_id == conversation_id,
|
WorkflowExecution.conversation_id == conversation_id,
|
||||||
WorkflowExecution.status == "completed"
|
WorkflowExecution.status.in_(["completed", "failed"])
|
||||||
).order_by(WorkflowExecution.started_at.asc())
|
).order_by(WorkflowExecution.started_at.asc())
|
||||||
|
|
||||||
executions = self.db.scalars(stmt).all()
|
executions = self.db.scalars(stmt).all()
|
||||||
@@ -188,10 +188,33 @@ class AppLogService:
|
|||||||
used_message_ids: set[str] = set()
|
used_message_ids: set[str] = set()
|
||||||
|
|
||||||
for execution in executions:
|
for execution in executions:
|
||||||
if not execution.output_data:
|
# 构建节点执行记录列表
|
||||||
|
execution_nodes = []
|
||||||
|
for node_exec in execution.node_executions:
|
||||||
|
node_execution = AppLogNodeExecution(
|
||||||
|
node_id=node_exec.node_id,
|
||||||
|
node_type=node_exec.node_type,
|
||||||
|
node_name=node_exec.node_name,
|
||||||
|
status=node_exec.status,
|
||||||
|
error=node_exec.error_message,
|
||||||
|
input=node_exec.input_data,
|
||||||
|
process=None,
|
||||||
|
output=node_exec.output_data,
|
||||||
|
elapsed_time=node_exec.elapsed_time,
|
||||||
|
token_usage=node_exec.token_usage,
|
||||||
|
)
|
||||||
|
node_executions.append(node_execution)
|
||||||
|
execution_nodes.append(node_execution)
|
||||||
|
|
||||||
|
if not execution_nodes:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# 找到该 execution 对应的 assistant message
|
# 失败的执行没有 assistant message,直接用 execution id 作为 key
|
||||||
|
if execution.status == "failed":
|
||||||
|
node_executions_map[f"execution_{str(execution.id)}"] = execution_nodes
|
||||||
|
continue
|
||||||
|
|
||||||
|
# completed:通过时序匹配关联到对应的 assistant message
|
||||||
# 逻辑:找 execution.started_at 之后最近的、未使用的 assistant message
|
# 逻辑:找 execution.started_at 之后最近的、未使用的 assistant message
|
||||||
best_msg = None
|
best_msg = None
|
||||||
best_dt = None
|
best_dt = None
|
||||||
@@ -210,31 +233,6 @@ class AppLogService:
|
|||||||
|
|
||||||
msg_id_str = str(best_msg.id)
|
msg_id_str = str(best_msg.id)
|
||||||
used_message_ids.add(msg_id_str)
|
used_message_ids.add(msg_id_str)
|
||||||
|
node_executions_map[msg_id_str] = execution_nodes
|
||||||
# 提取节点输出
|
|
||||||
output_data = execution.output_data
|
|
||||||
if isinstance(output_data, dict):
|
|
||||||
node_outputs = output_data.get("node_outputs", {})
|
|
||||||
execution_nodes = []
|
|
||||||
for node_id, node_data in node_outputs.items():
|
|
||||||
if not isinstance(node_data, dict):
|
|
||||||
continue
|
|
||||||
node_execution = AppLogNodeExecution(
|
|
||||||
node_id=node_data.get("node_id", node_id),
|
|
||||||
node_type=node_data.get("node_type", "unknown"),
|
|
||||||
node_name=node_data.get("node_name"),
|
|
||||||
status=node_data.get("status", "unknown"),
|
|
||||||
error=node_data.get("error"),
|
|
||||||
input=node_data.get("input"),
|
|
||||||
process=node_data.get("process"),
|
|
||||||
output=node_data.get("output"),
|
|
||||||
elapsed_time=node_data.get("elapsed_time"),
|
|
||||||
token_usage=node_data.get("token_usage"),
|
|
||||||
)
|
|
||||||
node_executions.append(node_execution)
|
|
||||||
execution_nodes.append(node_execution)
|
|
||||||
|
|
||||||
# 将节点记录关联到 message_id
|
|
||||||
node_executions_map[msg_id_str] = execution_nodes
|
|
||||||
|
|
||||||
return node_executions, node_executions_map
|
return node_executions, node_executions_map
|
||||||
|
|||||||
@@ -17,8 +17,9 @@ from app.core.workflow.executor import execute_workflow, execute_workflow_stream
|
|||||||
from app.core.workflow.nodes.enums import NodeType
|
from app.core.workflow.nodes.enums import NodeType
|
||||||
from app.core.workflow.validator import validate_workflow_config
|
from app.core.workflow.validator import validate_workflow_config
|
||||||
from app.db import get_db
|
from app.db import get_db
|
||||||
|
from sqlalchemy import select
|
||||||
from app.models import App
|
from app.models import App
|
||||||
from app.models.workflow_model import WorkflowConfig, WorkflowExecution
|
from app.models.workflow_model import WorkflowConfig, WorkflowExecution, WorkflowNodeExecution
|
||||||
from app.repositories import knowledge_repository
|
from app.repositories import knowledge_repository
|
||||||
from app.repositories.workflow_repository import (
|
from app.repositories.workflow_repository import (
|
||||||
WorkflowConfigRepository,
|
WorkflowConfigRepository,
|
||||||
@@ -909,6 +910,8 @@ class WorkflowService:
|
|||||||
input_data["conv_messages"] = conv_messages
|
input_data["conv_messages"] = conv_messages
|
||||||
init_message_length = len(input_data.get("conv_messages", []))
|
init_message_length = len(input_data.get("conv_messages", []))
|
||||||
message_id = uuid.uuid4()
|
message_id = uuid.uuid4()
|
||||||
|
_node_order_counter = 0
|
||||||
|
_cycle_items: dict[str, list] = {}
|
||||||
|
|
||||||
# 新会话时写入开场白
|
# 新会话时写入开场白
|
||||||
is_new_conversation = init_message_length == 0
|
is_new_conversation = init_message_length == 0
|
||||||
@@ -939,6 +942,52 @@ class WorkflowService:
|
|||||||
memory_storage_type=storage_type,
|
memory_storage_type=storage_type,
|
||||||
user_rag_memory_id=user_rag_memory_id
|
user_rag_memory_id=user_rag_memory_id
|
||||||
):
|
):
|
||||||
|
event_type = event.get("event")
|
||||||
|
event_data = event.get("data", {})
|
||||||
|
|
||||||
|
# 持久化节点执行记录
|
||||||
|
if event_type == "node_end":
|
||||||
|
node_id = event_data.get("node_id")
|
||||||
|
node_cfg = next((n for n in config.nodes if n.get("id") == node_id), {})
|
||||||
|
self.db.add(WorkflowNodeExecution(
|
||||||
|
execution_id=execution.id,
|
||||||
|
node_id=node_id,
|
||||||
|
node_type=node_cfg.get("type", "unknown"),
|
||||||
|
node_name=node_cfg.get("data", {}).get("label") or node_id,
|
||||||
|
execution_order=_node_order_counter,
|
||||||
|
status="completed",
|
||||||
|
input_data=event_data.get("input"),
|
||||||
|
output_data=event_data.get("output"),
|
||||||
|
elapsed_time=event_data.get("elapsed_time"),
|
||||||
|
token_usage=event_data.get("token_usage"),
|
||||||
|
))
|
||||||
|
self.db.commit()
|
||||||
|
_node_order_counter += 1
|
||||||
|
|
||||||
|
elif event_type == "node_error":
|
||||||
|
node_id = event_data.get("node_id")
|
||||||
|
node_cfg = next((n for n in config.nodes if n.get("id") == node_id), {})
|
||||||
|
self.db.add(WorkflowNodeExecution(
|
||||||
|
execution_id=execution.id,
|
||||||
|
node_id=node_id,
|
||||||
|
node_type=node_cfg.get("type", "unknown"),
|
||||||
|
node_name=node_cfg.get("data", {}).get("label") or node_id,
|
||||||
|
execution_order=_node_order_counter,
|
||||||
|
status="failed",
|
||||||
|
input_data=event_data.get("input"),
|
||||||
|
output_data=None,
|
||||||
|
error_message=event_data.get("error"),
|
||||||
|
elapsed_time=event_data.get("elapsed_time"),
|
||||||
|
))
|
||||||
|
self.db.commit()
|
||||||
|
_node_order_counter += 1
|
||||||
|
|
||||||
|
elif event_type == "cycle_item":
|
||||||
|
cycle_id = event_data.get("cycle_id")
|
||||||
|
if cycle_id not in _cycle_items:
|
||||||
|
_cycle_items[cycle_id] = []
|
||||||
|
_cycle_items[cycle_id].append(event_data)
|
||||||
|
|
||||||
if event.get("event") == "workflow_end":
|
if event.get("event") == "workflow_end":
|
||||||
status = event.get("data", {}).get("status")
|
status = event.get("data", {}).get("status")
|
||||||
token_usage = event.get("data", {}).get("token_usage", {}) or {}
|
token_usage = event.get("data", {}).get("token_usage", {}) or {}
|
||||||
@@ -1003,6 +1052,33 @@ class WorkflowService:
|
|||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
logger.error(f"unexpect workflow run status, status: {status}")
|
logger.error(f"unexpect workflow run status, status: {status}")
|
||||||
|
# 把积累的 cycle_item 写入对应循环节点的 output_data
|
||||||
|
if _cycle_items:
|
||||||
|
for cycle_node_id, items in _cycle_items.items():
|
||||||
|
node_exec = self.db.execute(
|
||||||
|
select(WorkflowNodeExecution).where(
|
||||||
|
WorkflowNodeExecution.execution_id == execution.id,
|
||||||
|
WorkflowNodeExecution.node_id == cycle_node_id
|
||||||
|
)
|
||||||
|
).scalar_one_or_none()
|
||||||
|
if node_exec:
|
||||||
|
node_exec.output_data = {
|
||||||
|
**(node_exec.output_data or {}),
|
||||||
|
"cycle_items": items
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
node_cfg = next((n for n in config.nodes if n.get("id") == cycle_node_id), {})
|
||||||
|
self.db.add(WorkflowNodeExecution(
|
||||||
|
execution_id=execution.id,
|
||||||
|
node_id=cycle_node_id,
|
||||||
|
node_type=node_cfg.get("type", "cycle"),
|
||||||
|
node_name=node_cfg.get("data", {}).get("label") or cycle_node_id,
|
||||||
|
execution_order=_node_order_counter,
|
||||||
|
status="completed",
|
||||||
|
output_data={"cycle_items": items},
|
||||||
|
))
|
||||||
|
_node_order_counter += 1
|
||||||
|
self.db.commit()
|
||||||
elif event.get("event") == "workflow_start":
|
elif event.get("event") == "workflow_start":
|
||||||
event["data"]["message_id"] = str(message_id)
|
event["data"]["message_id"] = str(message_id)
|
||||||
event = self._emit(public, event)
|
event = self._emit(public, event)
|
||||||
|
|||||||
Reference in New Issue
Block a user