fix(workflow): fix run_workflow streaming issues

Resolve exceptions during run_workflow streaming and define proper status codes for error cases.
This commit is contained in:
mengyonghao
2025-12-18 14:50:10 +08:00
committed by 谢俊男
parent 8c73aa60b9
commit 15fac38e30
2 changed files with 11 additions and 11 deletions

View File

@@ -473,7 +473,7 @@ async def run_workflow(
async def event_generator(): async def event_generator():
"""生成 SSE 事件""" """生成 SSE 事件"""
try: try:
async for event in service.run_workflow( async for event in await service.run_workflow(
app_id=app_id, app_id=app_id,
input_data=input_data, input_data=input_data,
triggered_by=current_user.id, triggered_by=current_user.id,

View File

@@ -5,7 +5,7 @@ import json
import logging import logging
import uuid import uuid
import datetime import datetime
from typing import Any, Annotated from typing import Any, Annotated, AsyncGenerator
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from fastapi import Depends from fastapi import Depends
@@ -81,7 +81,7 @@ class WorkflowService:
if not is_valid: if not is_valid:
logger.warning(f"工作流配置验证失败: {errors}") logger.warning(f"工作流配置验证失败: {errors}")
raise BusinessException( raise BusinessException(
error_code=BizCode.INVALID_PARAMETER, code=BizCode.INVALID_PARAMETER,
message=f"工作流配置无效: {'; '.join(errors)}" message=f"工作流配置无效: {'; '.join(errors)}"
) )
@@ -140,7 +140,7 @@ class WorkflowService:
config = self.get_workflow_config(app_id) config = self.get_workflow_config(app_id)
if not config: if not config:
raise BusinessException( raise BusinessException(
error_code=BizCode.RESOURCE_NOT_FOUND, code=BizCode.NOT_FOUND,
message=f"工作流配置不存在: app_id={app_id}" message=f"工作流配置不存在: app_id={app_id}"
) )
@@ -166,7 +166,7 @@ class WorkflowService:
if not is_valid: if not is_valid:
logger.warning(f"工作流配置验证失败: {errors}") logger.warning(f"工作流配置验证失败: {errors}")
raise BusinessException( raise BusinessException(
error_code=BizCode.INVALID_PARAMETER, code=BizCode.INVALID_PARAMETER,
message=f"工作流配置无效: {'; '.join(errors)}" message=f"工作流配置无效: {'; '.join(errors)}"
) )
@@ -245,7 +245,7 @@ class WorkflowService:
config = self.get_workflow_config(app_id) config = self.get_workflow_config(app_id)
if not config: if not config:
raise BusinessException( raise BusinessException(
error_code=BizCode.RESOURCE_NOT_FOUND, code=BizCode.NOT_FOUND,
message=f"工作流配置不存在: app_id={app_id}" message=f"工作流配置不存在: app_id={app_id}"
) )
@@ -359,7 +359,7 @@ class WorkflowService:
execution = self.get_execution(execution_id) execution = self.get_execution(execution_id)
if not execution: if not execution:
raise BusinessException( raise BusinessException(
error_code=BizCode.RESOURCE_NOT_FOUND, code=BizCode.NOT_FOUND,
message=f"执行记录不存在: execution_id={execution_id}" message=f"执行记录不存在: execution_id={execution_id}"
) )
@@ -640,7 +640,7 @@ class WorkflowService:
triggered_by: uuid.UUID, triggered_by: uuid.UUID,
conversation_id: uuid.UUID | None = None, conversation_id: uuid.UUID | None = None,
stream: bool = False stream: bool = False
): ) -> AsyncGenerator | dict:
"""运行工作流 """运行工作流
Args: Args:
@@ -660,7 +660,7 @@ class WorkflowService:
config = self.get_workflow_config(app_id) config = self.get_workflow_config(app_id)
if not config: if not config:
raise BusinessException( raise BusinessException(
error_code=BizCode.RESOURCE_NOT_FOUND, code=BizCode.NOT_FOUND,
message=f"工作流配置不存在: app_id={app_id}" message=f"工作流配置不存在: app_id={app_id}"
) )
@@ -687,7 +687,7 @@ class WorkflowService:
app = self.db.query(App).filter(App.id == app_id).first() app = self.db.query(App).filter(App.id == app_id).first()
if not app: if not app:
raise BusinessException( raise BusinessException(
error_code=BizCode.RESOURCE_NOT_FOUND, code=BizCode.NOT_FOUND,
message=f"应用不存在: app_id={app_id}" message=f"应用不存在: app_id={app_id}"
) )
@@ -750,7 +750,7 @@ class WorkflowService:
error_message=str(e) error_message=str(e)
) )
raise BusinessException( raise BusinessException(
error_code=BizCode.INTERNAL_ERROR, code=BizCode.INTERNAL_ERROR,
message=f"工作流执行失败: {str(e)}" message=f"工作流执行失败: {str(e)}"
) )