fix(workflow): fix workflow state not updating correctly after streaming runs

This commit is contained in:
mengyonghao
2026-01-14 10:46:33 +08:00
parent 4448296e7b
commit 7438fedd6b

View File

@@ -491,6 +491,17 @@ class WorkflowService:
)
end_user_id = str(new_end_user.id)
executions = self.execution_repo.get_by_conversation_id(conversation_id=conversation_id_uuid)
for exec_res in executions:
if exec_res.status == "completed":
last_state = exec_res.output_data
if isinstance(last_state, dict):
variables = last_state.get("variables", {})
conv_vars = variables.get("conv", {})
input_data["conv"] = conv_vars
break
result = await execute_workflow(
workflow_config=workflow_config_dict,
input_data=input_data,
@@ -504,7 +515,7 @@ class WorkflowService:
self.update_execution_status(
execution.execution_id,
"completed",
output_data=result.get("node_outputs", {})
output_data=result
)
else:
self.update_execution_status(
@@ -517,6 +528,7 @@ class WorkflowService:
return {
"execution_id": execution.execution_id,
"status": result.get("status"),
"variables": result.get("variables"),
"output": result.get("output"), # 最终输出(字符串)
"output_data": result.get("node_outputs", {}), # 所有节点输出(详细数据)
"conversation_id": result.get("conversation_id"), # 所有节点输出详细数据payload., # 会话 ID
@@ -617,6 +629,16 @@ class WorkflowService:
original_user_id=payload.user_id # Save original user_id to other_id
)
end_user_id = str(new_end_user.id)
executions = self.execution_repo.get_by_conversation_id(conversation_id=conversation_id_uuid)
for exec_res in executions:
if exec_res.status == "completed":
last_state = exec_res.output_data
if isinstance(last_state, dict):
variables = last_state.get("variables", {})
conv_vars = variables.get("conv", {})
input_data["conv"] = conv_vars
break
# 调用流式执行executor 会发送 workflow_start 和 workflow_end 事件)
async for event in self._run_workflow_stream(
@@ -827,6 +849,23 @@ class WorkflowService:
user_id=user_id
):
# 直接转发事件executor 已经返回正确格式)
if event.get("event") == "workflow_end":
status = event.get("data", {}).get("status")
if status == "completed":
self.update_execution_status(
execution_id,
"completed",
output_data=event.get("data")
)
elif status == "failed":
self.update_execution_status(
execution_id,
"failed",
output_data=event.get("data")
)
else:
logger.error(f"unexpect workflow run status, status: {status}")
yield event
except Exception as e: