From 15fac38e30d1fb60273fcfc3b258d76715b36db8 Mon Sep 17 00:00:00 2001 From: mengyonghao <1533512157@qq.com> Date: Thu, 18 Dec 2025 14:50:10 +0800 Subject: [PATCH] fix(workflow): fix run_workflow streaming issues Resolve exceptions during run_workflow streaming and define proper status codes for error cases. --- api/app/controllers/workflow_controller.py | 2 +- api/app/services/workflow_service.py | 20 ++++++++++---------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/api/app/controllers/workflow_controller.py b/api/app/controllers/workflow_controller.py index 9ccfa858..091846f6 100644 --- a/api/app/controllers/workflow_controller.py +++ b/api/app/controllers/workflow_controller.py @@ -473,7 +473,7 @@ async def run_workflow( async def event_generator(): """生成 SSE 事件""" try: - async for event in service.run_workflow( + async for event in await service.run_workflow( app_id=app_id, input_data=input_data, triggered_by=current_user.id, diff --git a/api/app/services/workflow_service.py b/api/app/services/workflow_service.py index f0b71824..fbf09505 100644 --- a/api/app/services/workflow_service.py +++ b/api/app/services/workflow_service.py @@ -5,7 +5,7 @@ import json import logging import uuid import datetime -from typing import Any, Annotated +from typing import Any, Annotated, AsyncGenerator from sqlalchemy.orm import Session from fastapi import Depends @@ -81,7 +81,7 @@ class WorkflowService: if not is_valid: logger.warning(f"工作流配置验证失败: {errors}") raise BusinessException( - error_code=BizCode.INVALID_PARAMETER, + code=BizCode.INVALID_PARAMETER, message=f"工作流配置无效: {'; '.join(errors)}" ) @@ -140,7 +140,7 @@ class WorkflowService: config = self.get_workflow_config(app_id) if not config: raise BusinessException( - error_code=BizCode.RESOURCE_NOT_FOUND, + code=BizCode.NOT_FOUND, message=f"工作流配置不存在: app_id={app_id}" ) @@ -166,7 +166,7 @@ class WorkflowService: if not is_valid: logger.warning(f"工作流配置验证失败: {errors}") raise BusinessException( - error_code=BizCode.INVALID_PARAMETER, + code=BizCode.INVALID_PARAMETER, message=f"工作流配置无效: {'; '.join(errors)}" ) @@ -245,7 +245,7 @@ class WorkflowService: config = self.get_workflow_config(app_id) if not config: raise BusinessException( - error_code=BizCode.RESOURCE_NOT_FOUND, + code=BizCode.NOT_FOUND, message=f"工作流配置不存在: app_id={app_id}" ) @@ -359,7 +359,7 @@ class WorkflowService: execution = self.get_execution(execution_id) if not execution: raise BusinessException( - error_code=BizCode.RESOURCE_NOT_FOUND, + code=BizCode.NOT_FOUND, message=f"执行记录不存在: execution_id={execution_id}" ) @@ -640,7 +640,7 @@ class WorkflowService: triggered_by: uuid.UUID, conversation_id: uuid.UUID | None = None, stream: bool = False - ): + ) -> AsyncGenerator | dict: """运行工作流 Args: @@ -660,7 +660,7 @@ class WorkflowService: config = self.get_workflow_config(app_id) if not config: raise BusinessException( - error_code=BizCode.RESOURCE_NOT_FOUND, + code=BizCode.NOT_FOUND, message=f"工作流配置不存在: app_id={app_id}" ) @@ -687,7 +687,7 @@ class WorkflowService: app = self.db.query(App).filter(App.id == app_id).first() if not app: raise BusinessException( - error_code=BizCode.RESOURCE_NOT_FOUND, + code=BizCode.NOT_FOUND, message=f"应用不存在: app_id={app_id}" ) @@ -750,7 +750,7 @@ class WorkflowService: error_message=str(e) ) raise BusinessException( - error_code=BizCode.INTERNAL_ERROR, + code=BizCode.INTERNAL_ERROR, message=f"工作流执行失败: {str(e)}" )