diff --git a/api/app/core/workflow/executor.py b/api/app/core/workflow/executor.py index 6721d7b0..f3feff60 100644 --- a/api/app/core/workflow/executor.py +++ b/api/app/core/workflow/executor.py @@ -261,7 +261,7 @@ class WorkflowExecutor: "data": { "execution_id": self.execution_id, "workspace_id": self.workspace_id, - "timestamp": start_time.isoformat() + "timestamp": int(start_time.timestamp() * 1000) } } @@ -293,20 +293,33 @@ class WorkflowExecutor: # Handle custom streaming events (chunks from nodes via stream writer) chunk_count += 1 event_type = data.get("type", "node_chunk") # "message" or "node_chunk" - logger.info(f"[CUSTOM] ✅ 收到 {event_type} #{chunk_count} from {data.get('node_id')}" - f"- execution_id: {self.execution_id}") - yield { - "event": event_type, # "message" or "node_chunk" - "data": { - "node_id": data.get("node_id"), - "chunk": data.get("chunk"), - "full_content": data.get("full_content"), - "chunk_index": data.get("chunk_index"), - "is_prefix": data.get("is_prefix"), - "is_suffix": data.get("is_suffix"), - "conversation_id": input_data.get("conversation_id"), + if event_type in ("message", "node_chunk"): + logger.info(f"[CUSTOM] ✅ 收到 {event_type} #{chunk_count} from {data.get('node_id')}" + f"- execution_id: {self.execution_id}") + yield { + "event": event_type, # "message" or "node_chunk" + "data": { + "node_id": data.get("node_id"), + "chunk": data.get("chunk"), + "full_content": data.get("full_content"), + "chunk_index": data.get("chunk_index"), + "is_prefix": data.get("is_prefix"), + "is_suffix": data.get("is_suffix"), + "conversation_id": input_data.get("conversation_id"), + } + } + elif event_type == "node_error": + yield { + "event": event_type, # "message" or "node_chunk" + "data": { + "node_id": data.get("node_id"), + "status": "failed", + "input": data.get("input_data"), + "elapsed_time": data.get("elapsed_time"), + "output": None, + "error": data.get("error") + } } - } elif mode == "debug": # Handle debug information (node execution status) @@ -325,14 +338,15 @@ class WorkflowExecutor: conversation_id = input_data.get("conversation_id") logger.info(f"[NODE-START] Node starts execution: {node_name} " f"- execution_id: {self.execution_id}") - yield { "event": "node_start", "data": { "node_id": node_name, "conversation_id": conversation_id, "execution_id": self.execution_id, - "timestamp": data.get("timestamp"), + "timestamp": int(datetime.datetime.fromisoformat( + data.get("timestamp") + ).timestamp() * 1000), } } elif event_type == "task_result": @@ -351,7 +365,9 @@ class WorkflowExecutor: "node_id": node_name, "conversation_id": conversation_id, "execution_id": self.execution_id, - "timestamp": data.get("timestamp"), + "timestamp": int(datetime.datetime.fromisoformat( + data.get("timestamp") + ).timestamp() * 1000), "state": result.get("node_outputs", {}).get(node_name), } } diff --git a/api/app/core/workflow/nodes/base_node.py b/api/app/core/workflow/nodes/base_node.py index 0c015c89..61d5ca1e 100644 --- a/api/app/core/workflow/nodes/base_node.py +++ b/api/app/core/workflow/nodes/base_node.py @@ -541,6 +541,11 @@ class BaseNode(ABC): "error_node": self.node_id } else: + writer = get_stream_writer() + writer({ + "type": "node_error", + **node_output + }) # 无错误边:抛出异常停止工作流 logger.error(f"节点 {self.node_id} 执行失败,停止工作流: {error_message}") raise Exception(f"节点 {self.node_id} 执行失败: {error_message}")