feat(workflow): emit SSE events for node exception output

This commit is contained in:
Eternity
2026-01-26 11:59:13 +08:00
parent e3b6ede992
commit 0fd8a122fb
2 changed files with 38 additions and 17 deletions

View File

@@ -261,7 +261,7 @@ class WorkflowExecutor:
"data": { "data": {
"execution_id": self.execution_id, "execution_id": self.execution_id,
"workspace_id": self.workspace_id, "workspace_id": self.workspace_id,
"timestamp": start_time.isoformat() "timestamp": int(start_time.timestamp() * 1000)
} }
} }
@@ -293,20 +293,33 @@ class WorkflowExecutor:
# Handle custom streaming events (chunks from nodes via stream writer) # Handle custom streaming events (chunks from nodes via stream writer)
chunk_count += 1 chunk_count += 1
event_type = data.get("type", "node_chunk") # "message" or "node_chunk" event_type = data.get("type", "node_chunk") # "message" or "node_chunk"
logger.info(f"[CUSTOM] ✅ 收到 {event_type} #{chunk_count} from {data.get('node_id')}" if event_type in ("message", "node_chunk"):
f"- execution_id: {self.execution_id}") logger.info(f"[CUSTOM] ✅ 收到 {event_type} #{chunk_count} from {data.get('node_id')}"
yield { f"- execution_id: {self.execution_id}")
"event": event_type, # "message" or "node_chunk" yield {
"data": { "event": event_type, # "message" or "node_chunk"
"node_id": data.get("node_id"), "data": {
"chunk": data.get("chunk"), "node_id": data.get("node_id"),
"full_content": data.get("full_content"), "chunk": data.get("chunk"),
"chunk_index": data.get("chunk_index"), "full_content": data.get("full_content"),
"is_prefix": data.get("is_prefix"), "chunk_index": data.get("chunk_index"),
"is_suffix": data.get("is_suffix"), "is_prefix": data.get("is_prefix"),
"conversation_id": input_data.get("conversation_id"), "is_suffix": data.get("is_suffix"),
"conversation_id": input_data.get("conversation_id"),
}
}
elif event_type == "node_error":
yield {
"event": event_type, # "message" or "node_chunk"
"data": {
"node_id": data.get("node_id"),
"status": "failed",
"input": data.get("input_data"),
"elapsed_time": data.get("elapsed_time"),
"output": None,
"error": data.get("error")
}
} }
}
elif mode == "debug": elif mode == "debug":
# Handle debug information (node execution status) # Handle debug information (node execution status)
@@ -325,14 +338,15 @@ class WorkflowExecutor:
conversation_id = input_data.get("conversation_id") conversation_id = input_data.get("conversation_id")
logger.info(f"[NODE-START] Node starts execution: {node_name} " logger.info(f"[NODE-START] Node starts execution: {node_name} "
f"- execution_id: {self.execution_id}") f"- execution_id: {self.execution_id}")
yield { yield {
"event": "node_start", "event": "node_start",
"data": { "data": {
"node_id": node_name, "node_id": node_name,
"conversation_id": conversation_id, "conversation_id": conversation_id,
"execution_id": self.execution_id, "execution_id": self.execution_id,
"timestamp": data.get("timestamp"), "timestamp": int(datetime.datetime.fromisoformat(
data.get("timestamp")
).timestamp() * 1000),
} }
} }
elif event_type == "task_result": elif event_type == "task_result":
@@ -351,7 +365,9 @@ class WorkflowExecutor:
"node_id": node_name, "node_id": node_name,
"conversation_id": conversation_id, "conversation_id": conversation_id,
"execution_id": self.execution_id, "execution_id": self.execution_id,
"timestamp": data.get("timestamp"), "timestamp": int(datetime.datetime.fromisoformat(
data.get("timestamp")
).timestamp() * 1000),
"state": result.get("node_outputs", {}).get(node_name), "state": result.get("node_outputs", {}).get(node_name),
} }
} }

View File

@@ -541,6 +541,11 @@ class BaseNode(ABC):
"error_node": self.node_id "error_node": self.node_id
} }
else: else:
writer = get_stream_writer()
writer({
"type": "node_error",
**node_output
})
# 无错误边:抛出异常停止工作流 # 无错误边:抛出异常停止工作流
logger.error(f"节点 {self.node_id} 执行失败,停止工作流: {error_message}") logger.error(f"节点 {self.node_id} 执行失败,停止工作流: {error_message}")
raise Exception(f"节点 {self.node_id} 执行失败: {error_message}") raise Exception(f"节点 {self.node_id} 执行失败: {error_message}")