[modify] sse format

This commit is contained in:
Mark
2025-12-20 17:45:58 +08:00
parent 43a427bac7
commit fafbe72ce2
4 changed files with 139 additions and 87 deletions

View File

@@ -583,15 +583,27 @@ async def draft_run(
)
async def event_generator():
"""工作流事件生成器"""
# 调用多智能体服务的流式方法
"""工作流事件生成器
将事件转换为标准 SSE 格式:
event: <event_type>
data: <json_data>
"""
import json
# 调用工作流服务的流式方法
async for event in workflow_service.run_stream(
app_id=app_id,
payload=payload,
config=config
):
yield event
# 提取事件类型和数据
event_type = event.get("event", "message")
event_data = event.get("data", {})
# 转换为标准 SSE 格式(字符串)
sse_message = f"event: {event_type}\ndata: {json.dumps(event_data)}\n\n"
yield sse_message
return StreamingResponse(
event_generator(),

View File

@@ -471,7 +471,20 @@ async def run_workflow(
import json
async def event_generator():
"""生成 SSE 事件"""
"""生成 SSE 事件
SSE 格式:
event: <event_type>
data: <json_data>
支持的事件类型:
- workflow_start: 工作流开始
- workflow_end: 工作流结束
- node_start: 节点开始执行
- node_end: 节点执行完成
- node_chunk: 中间节点的流式输出
- message: 最终消息的流式输出End 节点及其相邻节点)
"""
try:
async for event in service.run_workflow(
app_id=app_id,
@@ -480,19 +493,30 @@ async def run_workflow(
conversation_id=uuid.UUID(request.conversation_id) if request.conversation_id else None,
stream=True
):
# 转换为 SSE 格式
yield f"data: {json.dumps(event)}\n\n"
# 提取事件类型和数据
event_type = event.get("event", "message")
event_data = event.get("data", {})
# 转换为标准 SSE 格式(字符串)
# event: <type>
# data: <json>
sse_message = f"event: {event_type}\ndata: {json.dumps(event_data)}\n\n"
yield sse_message
except Exception as e:
logger.error(f"流式执行异常: {e}", exc_info=True)
error_event = {
"type": "error",
"error": str(e)
}
yield f"data: {json.dumps(error_event)}\n\n"
# 发送错误事件
sse_error = f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n"
yield sse_error
return StreamingResponse(
event_generator(),
media_type="text/event-stream"
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no" # 禁用 nginx 缓冲
}
)
else:
# 非流式执行

View File

@@ -375,17 +375,32 @@ class WorkflowExecutor:
使用多个 stream_mode 来获取:
1. "updates" - 节点的 state 更新和流式 chunk
2. "debug" - 节点执行的详细信息(开始/完成时间)
3. "custom" - 自定义流式数据chunks
Args:
input_data: 输入数据
Yields:
流式事件
流式事件,格式:
{
"event": "workflow_start" | "workflow_end" | "node_start" | "node_end" | "node_chunk" | "message",
"data": {...}
}
"""
logger.info(f"开始执行工作流(流式): execution_id={self.execution_id}")
# 记录开始时间
start_time = datetime.datetime.now()
# 发送 workflow_start 事件
yield {
"event": "workflow_start",
"data": {
"execution_id": self.execution_id,
"workspace_id": self.workspace_id,
"timestamp": start_time.isoformat()
}
}
# 1. 构建图
graph = self.build_graph(True)
@@ -396,6 +411,8 @@ class WorkflowExecutor:
# 3. Execute workflow
try:
chunk_count = 0
final_state = None
async for event in graph.astream(
initial_state,
stream_mode=["updates", "debug", "custom"], # Use updates + debug + custom mode
@@ -412,16 +429,19 @@ class WorkflowExecutor:
if mode == "custom":
# Handle custom streaming events (chunks from nodes via stream writer)
chunk_count += 1
event_type = data.get("type", "node_chunk") # 默认为 node_chunk
event_type = data.get("type", "node_chunk") # "message" or "node_chunk"
logger.info(f"[CUSTOM] ✅ 收到 {event_type} #{chunk_count} from {data.get('node_id')}")
yield {
"type": event_type, # "message" or "node_chunk"
"node_id": data.get("node_id"),
"chunk": data.get("chunk"),
"full_content": data.get("full_content"),
"chunk_index": data.get("chunk_index"),
"is_prefix": data.get("is_prefix"),
"is_suffix": data.get("is_suffix")
"event": event_type, # "message" or "node_chunk"
"data": {
"node_id": data.get("node_id"),
"chunk": data.get("chunk"),
"full_content": data.get("full_content"),
"chunk_index": data.get("chunk_index"),
"is_prefix": data.get("is_prefix"),
"is_suffix": data.get("is_suffix")
}
}
elif mode == "debug":
@@ -438,12 +458,15 @@ class WorkflowExecutor:
conversation_id = variables_sys.get("conversation_id")
execution_id = variables_sys.get("execution_id")
logger.info(f"[DEBUG] Node starts execution: {node_name}")
yield {
"type": "node_start",
"node_id": node_name,
"conversation_id": conversation_id,
"execution_id": execution_id,
"timestamp": data.get("timestamp")
"event": "node_start",
"data": {
"node_id": node_name,
"conversation_id": conversation_id,
"execution_id": execution_id,
"timestamp": data.get("timestamp")
}
}
elif event_type == "task_result":
# Node execution completed
@@ -454,19 +477,38 @@ class WorkflowExecutor:
conversation_id = variables_sys.get("conversation_id")
execution_id = variables_sys.get("execution_id")
logger.info(f"[DEBUG] Node execution completed: {node_name}")
yield {
"type": "node_complete",
"node_id": node_name,
"conversation_id": conversation_id,
"execution_id": execution_id,
"timestamp": data.get("timestamp")
"event": "node_end",
"data": {
"node_id": node_name,
"conversation_id": conversation_id,
"execution_id": execution_id,
"timestamp": data.get("timestamp")
}
}
elif mode == "updates":
# Handle state updates
# Handle state updates - store final state
logger.debug(f"[UPDATES] 收到 state 更新 from {list(data.keys())}")
final_state = data
logger.info(f"Workflow execution completed (streaming), total chunks: {chunk_count}")
# 计算耗时
end_time = datetime.datetime.now()
elapsed_time = (end_time - start_time).total_seconds()
logger.info(f"Workflow execution completed (streaming), total chunks: {chunk_count}, elapsed: {elapsed_time:.2f}s")
# 发送 workflow_end 事件
yield {
"event": "workflow_end",
"data": {
"execution_id": self.execution_id,
"status": "completed",
"elapsed_time": elapsed_time,
"timestamp": end_time.isoformat()
}
}
except Exception as e:
# 计算耗时(即使失败也记录)
@@ -474,13 +516,17 @@ class WorkflowExecutor:
elapsed_time = (end_time - start_time).total_seconds()
logger.error(f"工作流执行失败: execution_id={self.execution_id}, error={e}", exc_info=True)
# 发送 workflow_end 事件(失败)
yield {
"status": "failed",
"error": str(e),
"output": None,
"node_outputs": {},
"elapsed_time": elapsed_time,
"token_usage": None
"event": "workflow_end",
"data": {
"execution_id": self.execution_id,
"status": "failed",
"error": str(e),
"elapsed_time": elapsed_time,
"timestamp": end_time.isoformat()
}
}

View File

@@ -597,13 +597,7 @@ class WorkflowService:
# 更新状态为运行中
self.update_execution_status(execution.execution_id, "running")
# 发送开始事件
yield format_sse_message("workflow_start", {
"execution_id": execution.execution_id,
"conversation_id_uuid": str(conversation_id_uuid),
})
# 调用流式执行
# 调用流式执行executor 会发送 workflow_start 和 workflow_end 事件
async for event in self._run_workflow_stream(
workflow_config=workflow_config_dict,
input_data=input_data,
@@ -611,16 +605,8 @@ class WorkflowService:
workspace_id="",
user_id=payload.user_id
):
# 清理事件数据,移除不可序列化的对象
cleaned_event = self._clean_event_for_json(event)
# 转换为 SSE 格式
yield f"data: {json.dumps(cleaned_event)}\n\n"
# 发送完成事件
yield format_sse_message("workflow_end", {
"execution_id": execution.execution_id,
"conversation_id_uuid": str(conversation_id_uuid),
})
# 直接转发 executor 的事件(已经是正确的格式)
yield event
except Exception as e:
logger.error(f"工作流流式执行失败: execution_id={execution.execution_id}, error={e}", exc_info=True)
@@ -630,7 +616,13 @@ class WorkflowService:
error_message=str(e)
)
# 发送错误事件
yield f"data: {json.dumps({'type': 'error', 'execution_id': execution.execution_id, 'error': str(e)})}\n\n"
yield {
"event": "error",
"data": {
"execution_id": execution.execution_id,
"error": str(e)
}
}
async def run_workflow(
self,
@@ -801,13 +793,11 @@ class WorkflowService:
user_id: 用户 ID
Yields:
流式事件
流式事件(格式:{"event": "<type>", "data": {...}}
"""
from app.core.workflow.executor import execute_workflow_stream
try:
output_data = {}
async for event in execute_workflow_stream(
workflow_config=workflow_config,
input_data=input_data,
@@ -815,31 +805,9 @@ class WorkflowService:
workspace_id=workspace_id,
user_id=user_id
):
# 转发事件
# 直接转发事件executor 已经返回正确格式)
yield event
# 收集输出数据
# if event.get("type") == "node_complete":
# node_data = event.get("data", {})
# node_outputs = node_data.get("node_outputs", {})
# output_data.update(node_outputs)
#
# # 处理完成事件
# if event.get("type") == "workflow_complete":
# self.update_execution_status(
# execution_id,
# "completed",
# output_data=output_data
# )
#
# # 处理错误事件
# if event.get("type") == "workflow_error":
# self.update_execution_status(
# execution_id,
# "failed",
# error_message=event.get("error")
# )
except Exception as e:
logger.error(f"工作流流式执行失败: execution_id={execution_id}, error={e}", exc_info=True)
self.update_execution_status(
@@ -848,9 +816,11 @@ class WorkflowService:
error_message=str(e)
)
yield {
"type": "workflow_error",
"execution_id": execution_id,
"error": str(e)
"event": "error",
"data": {
"execution_id": execution_id,
"error": str(e)
}
}