diff --git a/api/app/services/workflow_service.py b/api/app/services/workflow_service.py index 68d6279b..7d3c784f 100644 --- a/api/app/services/workflow_service.py +++ b/api/app/services/workflow_service.py @@ -491,6 +491,17 @@ class WorkflowService: ) end_user_id = str(new_end_user.id) + executions = self.execution_repo.get_by_conversation_id(conversation_id=conversation_id_uuid) + + for exec_res in executions: + if exec_res.status == "completed": + last_state = exec_res.output_data + if isinstance(last_state, dict): + variables = last_state.get("variables", {}) + conv_vars = variables.get("conv", {}) + input_data["conv"] = conv_vars + break + result = await execute_workflow( workflow_config=workflow_config_dict, input_data=input_data, @@ -504,7 +515,7 @@ class WorkflowService: self.update_execution_status( execution.execution_id, "completed", - output_data=result.get("node_outputs", {}) + output_data=result ) else: self.update_execution_status( @@ -517,6 +528,7 @@ class WorkflowService: return { "execution_id": execution.execution_id, "status": result.get("status"), + "variables": result.get("variables"), "output": result.get("output"), # 最终输出(字符串) "output_data": result.get("node_outputs", {}), # 所有节点输出(详细数据) "conversation_id": result.get("conversation_id"), # 所有节点输出(详细数据)payload., # 会话 ID @@ -617,6 +629,16 @@ class WorkflowService: original_user_id=payload.user_id # Save original user_id to other_id ) end_user_id = str(new_end_user.id) + executions = self.execution_repo.get_by_conversation_id(conversation_id=conversation_id_uuid) + + for exec_res in executions: + if exec_res.status == "completed": + last_state = exec_res.output_data + if isinstance(last_state, dict): + variables = last_state.get("variables", {}) + conv_vars = variables.get("conv", {}) + input_data["conv"] = conv_vars + break # 调用流式执行(executor 会发送 workflow_start 和 workflow_end 事件) async for event in self._run_workflow_stream( @@ -827,6 +849,23 @@ class WorkflowService: user_id=user_id ): # 直接转发事件(executor 已经返回正确格式) + if event.get("event") == "workflow_end": + + status = event.get("data", {}).get("status") + if status == "completed": + self.update_execution_status( + execution_id, + "completed", + output_data=event.get("data") + ) + elif status == "failed": + self.update_execution_status( + execution_id, + "failed", + output_data=event.get("data") + ) + else: + logger.error(f"unexpect workflow run status, status: {status}") yield event except Exception as e: