diff --git a/api/app/controllers/app_controller.py b/api/app/controllers/app_controller.py index a92cfab2..29656608 100644 --- a/api/app/controllers/app_controller.py +++ b/api/app/controllers/app_controller.py @@ -583,15 +583,27 @@ async def draft_run( ) async def event_generator(): - """工作流事件生成器""" - - # 调用多智能体服务的流式方法 + """工作流事件生成器 + + 将事件转换为标准 SSE 格式: + event: + data: + """ + import json + + # 调用工作流服务的流式方法 async for event in workflow_service.run_stream( app_id=app_id, payload=payload, config=config ): - yield event + # 提取事件类型和数据 + event_type = event.get("event", "message") + event_data = event.get("data", {}) + + # 转换为标准 SSE 格式(字符串) + sse_message = f"event: {event_type}\ndata: {json.dumps(event_data)}\n\n" + yield sse_message return StreamingResponse( event_generator(), diff --git a/api/app/controllers/workflow_controller.py b/api/app/controllers/workflow_controller.py index 9ccfa858..91c21392 100644 --- a/api/app/controllers/workflow_controller.py +++ b/api/app/controllers/workflow_controller.py @@ -471,7 +471,20 @@ async def run_workflow( import json async def event_generator(): - """生成 SSE 事件""" + """生成 SSE 事件 + + SSE 格式: + event: + data: + + 支持的事件类型: + - workflow_start: 工作流开始 + - workflow_end: 工作流结束 + - node_start: 节点开始执行 + - node_end: 节点执行完成 + - node_chunk: 中间节点的流式输出 + - message: 最终消息的流式输出(End 节点及其相邻节点) + """ try: async for event in service.run_workflow( app_id=app_id, @@ -480,19 +493,30 @@ async def run_workflow( conversation_id=uuid.UUID(request.conversation_id) if request.conversation_id else None, stream=True ): - # 转换为 SSE 格式 - yield f"data: {json.dumps(event)}\n\n" + # 提取事件类型和数据 + event_type = event.get("event", "message") + event_data = event.get("data", {}) + + # 转换为标准 SSE 格式(字符串) + # event: + # data: + sse_message = f"event: {event_type}\ndata: {json.dumps(event_data)}\n\n" + yield sse_message + except Exception as e: logger.error(f"流式执行异常: {e}", exc_info=True) - error_event = { - "type": "error", - "error": str(e) - } - yield f"data: {json.dumps(error_event)}\n\n" + # 发送错误事件 + sse_error = f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n" + yield sse_error return StreamingResponse( event_generator(), - media_type="text/event-stream" + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no" # 禁用 nginx 缓冲 + } ) else: # 非流式执行 diff --git a/api/app/core/workflow/executor.py b/api/app/core/workflow/executor.py index db4fa626..029de97f 100644 --- a/api/app/core/workflow/executor.py +++ b/api/app/core/workflow/executor.py @@ -375,17 +375,32 @@ class WorkflowExecutor: 使用多个 stream_mode 来获取: 1. "updates" - 节点的 state 更新和流式 chunk 2. "debug" - 节点执行的详细信息(开始/完成时间) + 3. "custom" - 自定义流式数据(chunks) Args: input_data: 输入数据 Yields: - 流式事件 + 流式事件,格式: + { + "event": "workflow_start" | "workflow_end" | "node_start" | "node_end" | "node_chunk" | "message", + "data": {...} + } """ logger.info(f"开始执行工作流(流式): execution_id={self.execution_id}") # 记录开始时间 start_time = datetime.datetime.now() + + # 发送 workflow_start 事件 + yield { + "event": "workflow_start", + "data": { + "execution_id": self.execution_id, + "workspace_id": self.workspace_id, + "timestamp": start_time.isoformat() + } + } # 1. 构建图 graph = self.build_graph(True) @@ -396,6 +411,8 @@ class WorkflowExecutor: # 3. Execute workflow try: chunk_count = 0 + final_state = None + async for event in graph.astream( initial_state, stream_mode=["updates", "debug", "custom"], # Use updates + debug + custom mode @@ -412,16 +429,19 @@ class WorkflowExecutor: if mode == "custom": # Handle custom streaming events (chunks from nodes via stream writer) chunk_count += 1 - event_type = data.get("type", "node_chunk") # 默认为 node_chunk + event_type = data.get("type", "node_chunk") # "message" or "node_chunk" logger.info(f"[CUSTOM] ✅ 收到 {event_type} #{chunk_count} from {data.get('node_id')}") + yield { - "type": event_type, # "message" or "node_chunk" - "node_id": data.get("node_id"), - "chunk": data.get("chunk"), - "full_content": data.get("full_content"), - "chunk_index": data.get("chunk_index"), - "is_prefix": data.get("is_prefix"), - "is_suffix": data.get("is_suffix") + "event": event_type, # "message" or "node_chunk" + "data": { + "node_id": data.get("node_id"), + "chunk": data.get("chunk"), + "full_content": data.get("full_content"), + "chunk_index": data.get("chunk_index"), + "is_prefix": data.get("is_prefix"), + "is_suffix": data.get("is_suffix") + } } elif mode == "debug": @@ -438,12 +458,15 @@ class WorkflowExecutor: conversation_id = variables_sys.get("conversation_id") execution_id = variables_sys.get("execution_id") logger.info(f"[DEBUG] Node starts execution: {node_name}") + yield { - "type": "node_start", - "node_id": node_name, - "conversation_id": conversation_id, - "execution_id": execution_id, - "timestamp": data.get("timestamp") + "event": "node_start", + "data": { + "node_id": node_name, + "conversation_id": conversation_id, + "execution_id": execution_id, + "timestamp": data.get("timestamp") + } } elif event_type == "task_result": # Node execution completed @@ -454,19 +477,38 @@ class WorkflowExecutor: conversation_id = variables_sys.get("conversation_id") execution_id = variables_sys.get("execution_id") logger.info(f"[DEBUG] Node execution completed: {node_name}") + yield { - "type": "node_complete", - "node_id": node_name, - "conversation_id": conversation_id, - "execution_id": execution_id, - "timestamp": data.get("timestamp") + "event": "node_end", + "data": { + "node_id": node_name, + "conversation_id": conversation_id, + "execution_id": execution_id, + "timestamp": data.get("timestamp") + } } elif mode == "updates": - # Handle state updates + # Handle state updates - store final state logger.debug(f"[UPDATES] 收到 state 更新 from {list(data.keys())}") + final_state = data - logger.info(f"Workflow execution completed (streaming), total chunks: {chunk_count}") + # 计算耗时 + end_time = datetime.datetime.now() + elapsed_time = (end_time - start_time).total_seconds() + + logger.info(f"Workflow execution completed (streaming), total chunks: {chunk_count}, elapsed: {elapsed_time:.2f}s") + + # 发送 workflow_end 事件 + yield { + "event": "workflow_end", + "data": { + "execution_id": self.execution_id, + "status": "completed", + "elapsed_time": elapsed_time, + "timestamp": end_time.isoformat() + } + } except Exception as e: # 计算耗时(即使失败也记录) @@ -474,13 +516,17 @@ class WorkflowExecutor: elapsed_time = (end_time - start_time).total_seconds() logger.error(f"工作流执行失败: execution_id={self.execution_id}, error={e}", exc_info=True) + + # 发送 workflow_end 事件(失败) yield { - "status": "failed", - "error": str(e), - "output": None, - "node_outputs": {}, - "elapsed_time": elapsed_time, - "token_usage": None + "event": "workflow_end", + "data": { + "execution_id": self.execution_id, + "status": "failed", + "error": str(e), + "elapsed_time": elapsed_time, + "timestamp": end_time.isoformat() + } } diff --git a/api/app/services/workflow_service.py b/api/app/services/workflow_service.py index b48edfdd..87f06c96 100644 --- a/api/app/services/workflow_service.py +++ b/api/app/services/workflow_service.py @@ -597,13 +597,7 @@ class WorkflowService: # 更新状态为运行中 self.update_execution_status(execution.execution_id, "running") - # 发送开始事件 - yield format_sse_message("workflow_start", { - "execution_id": execution.execution_id, - "conversation_id_uuid": str(conversation_id_uuid), - }) - - # 调用流式执行 + # 调用流式执行(executor 会发送 workflow_start 和 workflow_end 事件) async for event in self._run_workflow_stream( workflow_config=workflow_config_dict, input_data=input_data, @@ -611,16 +605,8 @@ class WorkflowService: workspace_id="", user_id=payload.user_id ): - # 清理事件数据,移除不可序列化的对象 - cleaned_event = self._clean_event_for_json(event) - # 转换为 SSE 格式 - yield f"data: {json.dumps(cleaned_event)}\n\n" - - # 发送完成事件 - yield format_sse_message("workflow_end", { - "execution_id": execution.execution_id, - "conversation_id_uuid": str(conversation_id_uuid), - }) + # 直接转发 executor 的事件(已经是正确的格式) + yield event except Exception as e: logger.error(f"工作流流式执行失败: execution_id={execution.execution_id}, error={e}", exc_info=True) @@ -630,7 +616,13 @@ class WorkflowService: error_message=str(e) ) # 发送错误事件 - yield f"data: {json.dumps({'type': 'error', 'execution_id': execution.execution_id, 'error': str(e)})}\n\n" + yield { + "event": "error", + "data": { + "execution_id": execution.execution_id, + "error": str(e) + } + } async def run_workflow( self, @@ -801,13 +793,11 @@ class WorkflowService: user_id: 用户 ID Yields: - 流式事件 + 流式事件(格式:{"event": "", "data": {...}}) """ from app.core.workflow.executor import execute_workflow_stream try: - output_data = {} - async for event in execute_workflow_stream( workflow_config=workflow_config, input_data=input_data, @@ -815,31 +805,9 @@ class WorkflowService: workspace_id=workspace_id, user_id=user_id ): - # 转发事件 + # 直接转发事件(executor 已经返回正确格式) yield event - # 收集输出数据 - # if event.get("type") == "node_complete": - # node_data = event.get("data", {}) - # node_outputs = node_data.get("node_outputs", {}) - # output_data.update(node_outputs) - # - # # 处理完成事件 - # if event.get("type") == "workflow_complete": - # self.update_execution_status( - # execution_id, - # "completed", - # output_data=output_data - # ) - # - # # 处理错误事件 - # if event.get("type") == "workflow_error": - # self.update_execution_status( - # execution_id, - # "failed", - # error_message=event.get("error") - # ) - except Exception as e: logger.error(f"工作流流式执行失败: execution_id={execution_id}, error={e}", exc_info=True) self.update_execution_status( @@ -848,9 +816,11 @@ class WorkflowService: error_message=str(e) ) yield { - "type": "workflow_error", - "execution_id": execution_id, - "error": str(e) + "event": "error", + "data": { + "execution_id": execution_id, + "error": str(e) + } }