refactor(workflow): Remove unnecessary workflow_collectroller layer and simplify non-streaming output
This commit is contained in:
@@ -544,10 +544,10 @@ class WorkflowService:
|
||||
return {
|
||||
"execution_id": execution.execution_id,
|
||||
"status": result.get("status"),
|
||||
"variables": result.get("variables"),
|
||||
"messages": result.get("messages"),
|
||||
# "variables": result.get("variables"),
|
||||
# "messages": result.get("messages"),
|
||||
"output": result.get("output"), # 最终输出(字符串)
|
||||
"output_data": result.get("node_outputs", {}), # 所有节点输出(详细数据)
|
||||
# "output_data": result.get("node_outputs", {}), # 所有节点输出(详细数据)
|
||||
"conversation_id": result.get("conversation_id"), # 所有节点输出(详细数据)payload., # 会话 ID
|
||||
"error_message": result.get("error"),
|
||||
"elapsed_time": result.get("elapsed_time"),
|
||||
@@ -573,6 +573,7 @@ class WorkflowService:
|
||||
config: WorkflowConfig,
|
||||
workspace_id: uuid.UUID,
|
||||
release_id: Optional[uuid.UUID] = None,
|
||||
public: bool = False
|
||||
):
|
||||
"""运行工作流(流式)
|
||||
|
||||
@@ -582,6 +583,7 @@ class WorkflowService:
|
||||
app_id: 应用 ID
|
||||
payload: 请求对象(包含 message, variables, conversation_id 等)
|
||||
config: 存储类型(可选)
|
||||
public: 是否发布
|
||||
|
||||
Yields:
|
||||
SSE 格式的流式事件
|
||||
@@ -710,134 +712,6 @@ class WorkflowService:
|
||||
}
|
||||
}
|
||||
|
||||
@deprecated(reason="This method is deprecated. "
|
||||
"Please use WorkflowService.run / run_stream instead.")
|
||||
async def run_workflow(
|
||||
self,
|
||||
app_id: uuid.UUID,
|
||||
input_data: dict[str, Any],
|
||||
triggered_by: uuid.UUID,
|
||||
conversation_id: uuid.UUID | None = None,
|
||||
stream: bool = False
|
||||
) -> AsyncGenerator | dict:
|
||||
"""运行工作流
|
||||
|
||||
Args:
|
||||
app_id: 应用 ID
|
||||
input_data: 输入数据(包含 message 和 variables)
|
||||
triggered_by: 触发用户 ID
|
||||
conversation_id: 会话 ID(可选)
|
||||
stream: 是否流式返回
|
||||
|
||||
Returns:
|
||||
执行结果(非流式)或生成器(流式)
|
||||
|
||||
Raises:
|
||||
BusinessException: 配置不存在或执行失败时抛出
|
||||
"""
|
||||
# 1. 获取工作流配置
|
||||
config = self.get_workflow_config(app_id)
|
||||
if not config:
|
||||
raise BusinessException(
|
||||
code=BizCode.NOT_FOUND,
|
||||
message=f"工作流配置不存在: app_id={app_id}"
|
||||
)
|
||||
|
||||
# 2. 创建执行记录
|
||||
execution = self.create_execution(
|
||||
workflow_config_id=config.id,
|
||||
app_id=app_id,
|
||||
trigger_type="manual",
|
||||
triggered_by=triggered_by,
|
||||
conversation_id=conversation_id,
|
||||
input_data=input_data
|
||||
)
|
||||
|
||||
# 3. 构建工作流配置字典
|
||||
workflow_config_dict = {
|
||||
"nodes": config.nodes,
|
||||
"edges": config.edges,
|
||||
"variables": config.variables,
|
||||
"execution_config": config.execution_config
|
||||
}
|
||||
|
||||
# 4. 获取工作空间 ID(从 app 获取)
|
||||
from app.models import App
|
||||
app = self.db.query(App).filter(
|
||||
App.id == app_id,
|
||||
App.is_active.is_(True)
|
||||
).first()
|
||||
if not app:
|
||||
raise BusinessException(
|
||||
code=BizCode.NOT_FOUND,
|
||||
message=f"应用不存在: app_id={app_id}"
|
||||
)
|
||||
|
||||
# 5. 执行工作流
|
||||
from app.core.workflow.executor import execute_workflow
|
||||
|
||||
try:
|
||||
# 更新状态为运行中
|
||||
self.update_execution_status(execution.execution_id, "running")
|
||||
|
||||
if stream:
|
||||
# 流式执行
|
||||
return self._run_workflow_stream(
|
||||
workflow_config_dict,
|
||||
input_data,
|
||||
execution.execution_id,
|
||||
str(app.workspace_id),
|
||||
str(triggered_by)
|
||||
)
|
||||
else:
|
||||
# 非流式执行
|
||||
result = await execute_workflow(
|
||||
workflow_config=workflow_config_dict,
|
||||
input_data=input_data,
|
||||
execution_id=execution.execution_id,
|
||||
workspace_id=str(app.workspace_id),
|
||||
user_id=str(triggered_by)
|
||||
)
|
||||
|
||||
# 更新执行结果
|
||||
if result.get("status") == "completed":
|
||||
token_usage = result.get("data").get("token_usage", {}) or {}
|
||||
self.update_execution_status(
|
||||
execution.execution_id,
|
||||
"completed",
|
||||
output_data=result.get("node_outputs", {}),
|
||||
token_usage=token_usage.get("total_tokens", None)
|
||||
)
|
||||
else:
|
||||
self.update_execution_status(
|
||||
execution.execution_id,
|
||||
"failed",
|
||||
error_message=result.get("error")
|
||||
)
|
||||
|
||||
# 返回增强的响应结构
|
||||
return {
|
||||
"execution_id": execution.execution_id,
|
||||
"status": result.get("status"),
|
||||
"output": result.get("output"), # 最终输出(字符串)
|
||||
"output_data": result.get("node_outputs", {}), # 所有节点输出(详细数据)
|
||||
"error_message": result.get("error"),
|
||||
"elapsed_time": result.get("elapsed_time"),
|
||||
"token_usage": result.get("token_usage")
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"工作流执行失败: execution_id={execution.execution_id}, error={e}", exc_info=True)
|
||||
self.update_execution_status(
|
||||
execution.execution_id,
|
||||
"failed",
|
||||
error_message=str(e)
|
||||
)
|
||||
raise BusinessException(
|
||||
code=BizCode.INTERNAL_ERROR,
|
||||
message=f"工作流执行失败: {str(e)}"
|
||||
)
|
||||
|
||||
def _clean_event_for_json(self, event: dict[str, Any]) -> dict[str, Any]:
|
||||
"""清理事件数据,移除不可序列化的对象
|
||||
|
||||
@@ -869,72 +743,6 @@ class WorkflowService:
|
||||
|
||||
return clean_value(event)
|
||||
|
||||
@deprecated(reason="This method is deprecated. Please use WorkflowService.run_stream instead.")
|
||||
async def _run_workflow_stream(
|
||||
self,
|
||||
workflow_config: dict[str, Any],
|
||||
input_data: dict[str, Any],
|
||||
execution_id: str,
|
||||
workspace_id: str,
|
||||
user_id: str):
|
||||
"""运行工作流(流式,内部方法)
|
||||
|
||||
Args:
|
||||
workflow_config: 工作流配置
|
||||
input_data: 输入数据
|
||||
execution_id: 执行 ID
|
||||
workspace_id: 工作空间 ID
|
||||
user_id: 用户 ID
|
||||
|
||||
Yields:
|
||||
流式事件(格式:{"event": "<type>", "data": {...}})
|
||||
"""
|
||||
from app.core.workflow.executor import execute_workflow_stream
|
||||
|
||||
try:
|
||||
async for event in execute_workflow_stream(
|
||||
workflow_config=workflow_config,
|
||||
input_data=input_data,
|
||||
execution_id=execution_id,
|
||||
workspace_id=workspace_id,
|
||||
user_id=user_id
|
||||
):
|
||||
# 直接转发事件(executor 已经返回正确格式)
|
||||
if event.get("event") == "workflow_end":
|
||||
token_usage = event.get("data").get("token_usage", {}) or {}
|
||||
status = event.get("data", {}).get("status")
|
||||
if status == "completed":
|
||||
self.update_execution_status(
|
||||
execution_id,
|
||||
"completed",
|
||||
output_data=event.get("data"),
|
||||
token_usage=token_usage.get("total_tokens", None)
|
||||
)
|
||||
elif status == "failed":
|
||||
self.update_execution_status(
|
||||
execution_id,
|
||||
"failed",
|
||||
output_data=event.get("data")
|
||||
)
|
||||
else:
|
||||
logger.error(f"unexpect workflow run status, status: {status}")
|
||||
yield event
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"工作流流式执行失败: execution_id={execution_id}, error={e}", exc_info=True)
|
||||
self.update_execution_status(
|
||||
execution_id,
|
||||
"failed",
|
||||
error_message=str(e)
|
||||
)
|
||||
yield {
|
||||
"event": "error",
|
||||
"data": {
|
||||
"execution_id": execution_id,
|
||||
"error": str(e)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
# ==================== 依赖注入函数 ====================
|
||||
|
||||
|
||||
Reference in New Issue
Block a user