diff --git a/api/app/controllers/__init__.py b/api/app/controllers/__init__.py index 765ef967..42d7fe87 100644 --- a/api/app/controllers/__init__.py +++ b/api/app/controllers/__init__.py @@ -39,7 +39,6 @@ from . import ( upload_controller, user_controller, user_memory_controllers, - workflow_controller, workspace_controller, memory_forget_controller, home_page_controller, @@ -78,7 +77,6 @@ manager_router.include_router(release_share_controller.router) manager_router.include_router(public_share_controller.router) # 公开路由(无需认证) manager_router.include_router(memory_dashboard_controller.router) manager_router.include_router(multi_agent_controller.router) -manager_router.include_router(workflow_controller.router) manager_router.include_router(emotion_controller.router) manager_router.include_router(emotion_config_controller.router) manager_router.include_router(prompt_optimizer_controller.router) diff --git a/api/app/controllers/workflow_controller.py b/api/app/controllers/workflow_controller.py deleted file mode 100644 index 8a15f717..00000000 --- a/api/app/controllers/workflow_controller.py +++ /dev/null @@ -1,610 +0,0 @@ -""" -工作流 API 控制器 -""" - -import logging -import uuid -from typing import Annotated - -from fastapi import APIRouter, Depends, Path, Query -from sqlalchemy.orm import Session - -from app.db import get_db -from app.dependencies import get_current_user, cur_workspace_access_guard - -from app.models.user_model import User -from app.models.app_model import App -from app.services.workflow_service import WorkflowService, get_workflow_service -from app.schemas.workflow_schema import ( - WorkflowConfigCreate, - WorkflowConfigUpdate, - WorkflowConfig, - WorkflowValidationResponse, - WorkflowExecution, - WorkflowNodeExecution, - WorkflowExecutionRequest, - WorkflowExecutionResponse -) -from app.core.response_utils import success, fail -from app.core.exceptions import BusinessException -from app.core.error_codes import BizCode - -logger = logging.getLogger(__name__) - -router = APIRouter(prefix="/apps", tags=["workflow"]) - - -# ==================== 工作流配置管理 ==================== - -@router.post("/{app_id}/workflow") -@cur_workspace_access_guard() -async def create_workflow_config( - app_id: Annotated[uuid.UUID, Path(description="应用 ID")], - config: WorkflowConfigCreate, - db: Annotated[Session, Depends(get_db)], - current_user: Annotated[User, Depends(get_current_user)], - service: Annotated[WorkflowService, Depends(get_workflow_service)] -): - """创建工作流配置 - - 创建或更新应用的工作流配置。配置会进行基础验证,但允许保存不完整的配置(草稿)。 - """ - try: - # 验证应用是否存在且属于当前工作空间 - app = db.query(App).filter( - App.id == app_id, - App.workspace_id == current_user.current_workspace_id, - App.is_active.is_(True) - ).first() - - if not app: - return fail( - code=BizCode.NOT_FOUND, - msg="应用不存在或无权访问" - ) - - # 验证应用类型 - if app.type != "workflow": - return fail( - code=BizCode.INVALID_PARAMETER, - msg=f"应用类型必须为 workflow,当前为 {app.type}" - ) - - # 创建工作流配置 - workflow_config = service.create_workflow_config( - app_id=app_id, - nodes=[node.model_dump() for node in config.nodes], - edges=[edge.model_dump() for edge in config.edges], - variables=[var.model_dump() for var in config.variables], - execution_config=config.execution_config.model_dump(), - triggers=[trigger.model_dump() for trigger in config.triggers], - validate=True # 进行基础验证 - ) - - return success( - data=WorkflowConfig.model_validate(workflow_config), - msg="工作流配置创建成功" - ) - - except BusinessException as e: - logger.warning(f"创建工作流配置失败: {e.message}") - return fail(code=e.error_code, msg=e.message) - except Exception as e: - logger.error(f"创建工作流配置异常: {e}", exc_info=True) - return fail( - code=BizCode.INTERNAL_ERROR, - msg=f"创建工作流配置失败: {str(e)}" - ) - - -# -# @router.get("/{app_id}/workflow") -# async def get_workflow_config( -# app_id: Annotated[uuid.UUID, Path(description="应用 ID")], -# db: Annotated[Session, Depends(get_db)], -# current_user: Annotated[User, Depends(get_current_user)] -# -# ): -# """获取工作流配置 -# -# 获取应用的工作流配置详情。 -# """ -# try: -# # 验证应用是否存在且属于当前工作空间 -# app = db.query(App).filter( -# App.id == app_id, -# App.workspace_id == current_user.current_workspace_id, -# App.is_active == True -# ).first() -# -# if not app: -# return fail( -# code=BizCode.NOT_FOUND, -# msg="应用不存在或无权访问" -# ) -# -# # 获取工作流配置 -# service = WorkflowService(db) -# workflow_config = service.get_workflow_config(app_id) -# -# if not workflow_config: -# return fail( -# code=BizCode.NOT_FOUND, -# msg="工作流配置不存在" -# ) -# -# return success( -# data=WorkflowConfig.model_validate(workflow_config) -# ) -# -# except Exception as e: -# logger.error(f"获取工作流配置异常: {e}", exc_info=True) -# return fail( -# code=BizCode.INTERNAL_ERROR, -# msg=f"获取工作流配置失败: {str(e)}" -# ) - - -# @router.put("/{app_id}/workflow") -# async def update_workflow_config( -# app_id: Annotated[uuid.UUID, Path(description="应用 ID")], -# config: WorkflowConfigUpdate, -# db: Annotated[Session, Depends(get_db)], -# current_user: Annotated[User, Depends(get_current_user)], -# service: Annotated[WorkflowService, Depends(get_workflow_service)] -# ): -# """更新工作流配置 - -# 更新应用的工作流配置。可以部分更新,未提供的字段保持不变。 -# """ -# try: -# # 验证应用是否存在且属于当前工作空间 -# app = db.query(App).filter( -# App.id == app_id, -# App.workspace_id == current_user.current_workspace_id, -# App.is_active == True -# ).first() - -# if not app: -# return fail( -# code=BizCode.NOT_FOUND, -# msg="应用不存在或无权访问" -# ) - -# # 更新工作流配置 -# workflow_config = service.update_workflow_config( -# app_id=app_id, -# nodes=[node.model_dump() for node in config.nodes] if config.nodes else None, -# edges=[edge.model_dump() for edge in config.edges] if config.edges else None, -# variables=[var.model_dump() for var in config.variables] if config.variables else None, -# execution_config=config.execution_config.model_dump() if config.execution_config else None, -# triggers=[trigger.model_dump() for trigger in config.triggers] if config.triggers else None, -# validate=True -# ) - -# return success( -# data=WorkflowConfig.model_validate(workflow_config), -# msg="工作流配置更新成功" -# ) - -# except BusinessException as e: -# logger.warning(f"更新工作流配置失败: {e.message}") -# return fail(code=e.error_code, msg=e.message) -# except Exception as e: -# logger.error(f"更新工作流配置异常: {e}", exc_info=True) -# return fail( -# code=BizCode.INTERNAL_ERROR, -# msg=f"更新工作流配置失败: {str(e)}" -# ) - - -@router.delete("/{app_id}/workflow") -async def delete_workflow_config( - app_id: Annotated[uuid.UUID, Path(description="应用 ID")], - db: Annotated[Session, Depends(get_db)], - current_user: Annotated[User, Depends(get_current_user)], - service: Annotated[WorkflowService, Depends(get_workflow_service)] -): - """删除工作流配置 - - 删除应用的工作流配置。 - """ - try: - # 验证应用是否存在且属于当前工作空间 - app = db.query(App).filter( - App.id == app_id, - App.workspace_id == current_user.current_workspace_id, - App.is_active.is_(True) - ).first() - - if not app: - return fail( - code=BizCode.NOT_FOUND, - msg="应用不存在或无权访问" - ) - - # 删除工作流配置 - deleted = service.delete_workflow_config(app_id) - - if not deleted: - return fail( - code=BizCode.NOT_FOUND, - msg="工作流配置不存在" - ) - - return success(msg="工作流配置删除成功") - - except Exception as e: - logger.error(f"删除工作流配置异常: {e}", exc_info=True) - return fail( - code=BizCode.INTERNAL_ERROR, - msg=f"删除工作流配置失败: {str(e)}" - ) - - -@router.post("/{app_id}/workflow/validate") -async def validate_workflow_config( - app_id: Annotated[uuid.UUID, Path(description="应用 ID")], - db: Annotated[Session, Depends(get_db)], - current_user: Annotated[User, Depends(get_current_user)], - service: Annotated[WorkflowService, Depends(get_workflow_service)], - for_publish: Annotated[bool, Query(description="是否为发布验证")] = False -): - """验证工作流配置 - - 验证工作流配置是否有效。可以选择是否进行发布级别的严格验证。 - """ - try: - # 验证应用是否存在且属于当前工作空间 - app = db.query(App).filter( - App.id == app_id, - App.workspace_id == current_user.current_workspace_id, - App.is_active.is_(True) - ).first() - - if not app: - return fail( - code=BizCode.NOT_FOUND, - msg="应用不存在或无权访问" - ) - - # 验证工作流配置 - - if for_publish: - is_valid, errors = service.validate_workflow_config_for_publish(app_id) - else: - workflow_config = service.get_workflow_config(app_id) - if not workflow_config: - return fail( - code=BizCode.NOT_FOUND, - msg="工作流配置不存在" - ) - - from app.core.workflow.validator import validate_workflow_config as validate_config - config_dict = { - "nodes": workflow_config.nodes, - "edges": workflow_config.edges, - "variables": workflow_config.variables, - "execution_config": workflow_config.execution_config, - "triggers": workflow_config.triggers - } - is_valid, errors = validate_config(config_dict, for_publish=False) - - return success( - data=WorkflowValidationResponse( - is_valid=is_valid, - errors=errors, - warnings=[] - ) - ) - - except BusinessException as e: - logger.warning(f"验证工作流配置失败: {e.message}") - return fail(code=e.error_code, msg=e.message) - except Exception as e: - logger.error(f"验证工作流配置异常: {e}", exc_info=True) - return fail( - code=BizCode.INTERNAL_ERROR, - msg=f"验证工作流配置失败: {str(e)}" - ) - - -# ==================== 工作流执行管理 ==================== - -@router.get("/{app_id}/workflow/executions") -async def get_workflow_executions( - app_id: Annotated[uuid.UUID, Path(description="应用 ID")], - db: Annotated[Session, Depends(get_db)], - current_user: Annotated[User, Depends(get_current_user)], - service: Annotated[WorkflowService, Depends(get_workflow_service)], - limit: Annotated[int, Query(ge=1, le=100)] = 50, - offset: Annotated[int, Query(ge=0)] = 0 -): - """获取工作流执行记录列表 - - 获取应用的工作流执行历史记录。 - """ - try: - # 验证应用是否存在且属于当前工作空间 - app = db.query(App).filter( - App.id == app_id, - App.workspace_id == current_user.current_workspace_id, - App.is_active.is_(True) - ).first() - - if not app: - return fail( - code=BizCode.NOT_FOUND, - msg="应用不存在或无权访问" - ) - - # 获取执行记录 - executions = service.get_executions_by_app(app_id, limit, offset) - - # 获取统计信息 - statistics = service.get_execution_statistics(app_id) - - return success( - data={ - "executions": [WorkflowExecution.model_validate(e) for e in executions], - "statistics": statistics, - "pagination": { - "limit": limit, - "offset": offset, - "total": statistics["total"] - } - } - ) - - except Exception as e: - logger.error(f"获取工作流执行记录异常: {e}", exc_info=True) - return fail( - code=BizCode.INTERNAL_ERROR, - msg=f"获取工作流执行记录失败: {str(e)}" - ) - - -@router.get("/workflow/executions/{execution_id}") -async def get_workflow_execution( - execution_id: Annotated[str, Path(description="执行 ID")], - db: Annotated[Session, Depends(get_db)], - current_user: Annotated[User, Depends(get_current_user)], - service: Annotated[WorkflowService, Depends(get_workflow_service)] -): - """获取工作流执行详情 - - 获取单个工作流执行的详细信息,包括所有节点的执行记录。 - """ - try: - # 获取执行记录 - execution = service.get_execution(execution_id) - - if not execution: - return fail( - code=BizCode.NOT_FOUND, - msg="执行记录不存在" - ) - - # 验证应用是否属于当前工作空间 - app = db.query(App).filter( - App.id == execution.app_id, - App.workspace_id == current_user.current_workspace_id, - App.is_active.is_(True) - ).first() - - if not app: - return fail( - code=BizCode.NOT_FOUND, - msg="无权访问该执行记录" - ) - - # 获取节点执行记录 - node_executions = service.node_execution_repo.get_by_execution_id(execution.id) - - return success( - data={ - "execution": WorkflowExecution.model_validate(execution), - "node_executions": [ - WorkflowNodeExecution.model_validate(ne) for ne in node_executions - ] - } - ) - - except Exception as e: - logger.error(f"获取工作流执行详情异常: {e}", exc_info=True) - return fail( - code=BizCode.INTERNAL_ERROR, - msg=f"获取工作流执行详情失败: {str(e)}" - ) - - -# ==================== 工作流执行 ==================== -@router.post("/{app_id}/workflow/run") -async def run_workflow( - app_id: Annotated[uuid.UUID, Path(description="应用 ID")], - request: WorkflowExecutionRequest, - db: Annotated[Session, Depends(get_db)], - current_user: Annotated[User, Depends(get_current_user)], - service: Annotated[WorkflowService, Depends(get_workflow_service)] -): - """执行工作流 - - 执行工作流并返回结果。支持流式和非流式两种模式。 - - **非流式模式**:等待工作流执行完成后返回完整结果。 - - **流式模式**:实时返回执行过程中的事件(节点开始、节点完成、工作流完成等)。 - """ - try: - # 验证应用是否存在且属于当前工作空间 - app = db.query(App).filter( - App.id == app_id, - App.workspace_id == current_user.current_workspace_id, - App.is_active.is_(True) - ).first() - - if not app: - return fail( - code=BizCode.NOT_FOUND, - msg="应用不存在或无权访问" - ) - - # 验证应用类型 - if app.type != "workflow": - return fail( - code=BizCode.INVALID_PARAMETER, - msg=f"应用类型必须为 workflow,当前为 {app.type}" - ) - - # 准备输入数据 - input_data = { - "message": request.message or "", - "variables": request.variables - } - - # 执行工作流 - - if request.stream: - # 流式执行 - from fastapi.responses import StreamingResponse - import json - - async def event_generator(): - """生成 SSE 事件 - - SSE 格式: - event: - data: - - 支持的事件类型: - - workflow_start: 工作流开始 - - workflow_end: 工作流结束 - - node_start: 节点开始执行 - - node_end: 节点执行完成 - - node_chunk: 中间节点的流式输出 - - message: 最终消息的流式输出(End 节点及其相邻节点) - """ - try: - async for event in await service.run_workflow( - app_id=app_id, - input_data=input_data, - triggered_by=current_user.id, - conversation_id=uuid.UUID(request.conversation_id) if request.conversation_id else None, - stream=True - ): - # 提取事件类型和数据 - event_type = event.get("event", "message") - event_data = event.get("data", {}) - - # 转换为标准 SSE 格式(字符串) - # event: - # data: - sse_message = f"event: {event_type}\ndata: {json.dumps(event_data)}\n\n" - yield sse_message - - except Exception as e: - logger.error(f"流式执行异常: {e}", exc_info=True) - # 发送错误事件 - sse_error = f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n" - yield sse_error - - return StreamingResponse( - event_generator(), - media_type="text/event-stream", - headers={ - "Cache-Control": "no-cache", - "Connection": "keep-alive", - "X-Accel-Buffering": "no" # 禁用 nginx 缓冲 - } - ) - else: - # 非流式执行 - result = await service.run_workflow( - app_id=app_id, - input_data=input_data, - triggered_by=current_user.id, - conversation_id=uuid.UUID(request.conversation_id) if request.conversation_id else None, - stream=False - ) - - return success( - data=WorkflowExecutionResponse( - execution_id=result["execution_id"], - status=result["status"], - output=result.get("output"), - output_data=result.get("output_data"), - error_message=result.get("error_message"), - elapsed_time=result.get("elapsed_time"), - token_usage=result.get("token_usage") - ), - msg="工作流执行完成" - ) - - except BusinessException as e: - logger.warning(f"执行工作流失败: {e.message}") - return fail(code=e.error_code, msg=e.message) - except Exception as e: - logger.error(f"执行工作流异常: {e}", exc_info=True) - return fail( - code=BizCode.INTERNAL_ERROR, - msg=f"执行工作流失败: {str(e)}" - ) - - -@router.post("/workflow/executions/{execution_id}/cancel") -async def cancel_workflow_execution( - execution_id: Annotated[str, Path(description="执行 ID")], - db: Annotated[Session, Depends(get_db)], - current_user: Annotated[User, Depends(get_current_user)], - service: Annotated[WorkflowService, Depends(get_workflow_service)] -): - """取消工作流执行 - - 取消正在运行的工作流执行。 - - **注意**:当前版本仅更新状态为 cancelled,实际的执行取消功能待实现。 - """ - try: - # 获取执行记录 - execution = service.get_execution(execution_id) - - if not execution: - return fail( - code=BizCode.NOT_FOUND, - msg="执行记录不存在" - ) - - # 验证应用是否属于当前工作空间 - app = db.query(App).filter( - App.id == execution.app_id, - App.workspace_id == current_user.current_workspace_id, - App.is_active.is_(True) - ).first() - - if not app: - return fail( - code=BizCode.NOT_FOUND, - msg="无权访问该执行记录" - ) - - # 检查执行状态 - if execution.status not in ["pending", "running"]: - return fail( - code=BizCode.INVALID_PARAMETER, - msg=f"无法取消状态为 {execution.status} 的执行" - ) - - # 更新状态为 cancelled - service.update_execution_status(execution_id, "cancelled") - - return success(msg="工作流执行已取消") - - except BusinessException as e: - logger.warning(f"取消工作流执行失败: {e.message}") - return fail(code=e.code, msg=e.message) - except Exception as e: - logger.error(f"取消工作流执行异常: {e}", exc_info=True) - return fail( - code=BizCode.INTERNAL_ERROR, - msg=f"取消工作流执行失败: {str(e)}" - ) diff --git a/api/app/services/workflow_service.py b/api/app/services/workflow_service.py index 2958f4f9..8ce5fa37 100644 --- a/api/app/services/workflow_service.py +++ b/api/app/services/workflow_service.py @@ -544,10 +544,10 @@ class WorkflowService: return { "execution_id": execution.execution_id, "status": result.get("status"), - "variables": result.get("variables"), - "messages": result.get("messages"), + # "variables": result.get("variables"), + # "messages": result.get("messages"), "output": result.get("output"), # 最终输出(字符串) - "output_data": result.get("node_outputs", {}), # 所有节点输出(详细数据) + # "output_data": result.get("node_outputs", {}), # 所有节点输出(详细数据) "conversation_id": result.get("conversation_id"), # 所有节点输出(详细数据)payload., # 会话 ID "error_message": result.get("error"), "elapsed_time": result.get("elapsed_time"), @@ -573,6 +573,7 @@ class WorkflowService: config: WorkflowConfig, workspace_id: uuid.UUID, release_id: Optional[uuid.UUID] = None, + public: bool = False ): """运行工作流(流式) @@ -582,6 +583,7 @@ class WorkflowService: app_id: 应用 ID payload: 请求对象(包含 message, variables, conversation_id 等) config: 存储类型(可选) + public: 是否发布 Yields: SSE 格式的流式事件 @@ -710,134 +712,6 @@ class WorkflowService: } } - @deprecated(reason="This method is deprecated. " - "Please use WorkflowService.run / run_stream instead.") - async def run_workflow( - self, - app_id: uuid.UUID, - input_data: dict[str, Any], - triggered_by: uuid.UUID, - conversation_id: uuid.UUID | None = None, - stream: bool = False - ) -> AsyncGenerator | dict: - """运行工作流 - - Args: - app_id: 应用 ID - input_data: 输入数据(包含 message 和 variables) - triggered_by: 触发用户 ID - conversation_id: 会话 ID(可选) - stream: 是否流式返回 - - Returns: - 执行结果(非流式)或生成器(流式) - - Raises: - BusinessException: 配置不存在或执行失败时抛出 - """ - # 1. 获取工作流配置 - config = self.get_workflow_config(app_id) - if not config: - raise BusinessException( - code=BizCode.NOT_FOUND, - message=f"工作流配置不存在: app_id={app_id}" - ) - - # 2. 创建执行记录 - execution = self.create_execution( - workflow_config_id=config.id, - app_id=app_id, - trigger_type="manual", - triggered_by=triggered_by, - conversation_id=conversation_id, - input_data=input_data - ) - - # 3. 构建工作流配置字典 - workflow_config_dict = { - "nodes": config.nodes, - "edges": config.edges, - "variables": config.variables, - "execution_config": config.execution_config - } - - # 4. 获取工作空间 ID(从 app 获取) - from app.models import App - app = self.db.query(App).filter( - App.id == app_id, - App.is_active.is_(True) - ).first() - if not app: - raise BusinessException( - code=BizCode.NOT_FOUND, - message=f"应用不存在: app_id={app_id}" - ) - - # 5. 执行工作流 - from app.core.workflow.executor import execute_workflow - - try: - # 更新状态为运行中 - self.update_execution_status(execution.execution_id, "running") - - if stream: - # 流式执行 - return self._run_workflow_stream( - workflow_config_dict, - input_data, - execution.execution_id, - str(app.workspace_id), - str(triggered_by) - ) - else: - # 非流式执行 - result = await execute_workflow( - workflow_config=workflow_config_dict, - input_data=input_data, - execution_id=execution.execution_id, - workspace_id=str(app.workspace_id), - user_id=str(triggered_by) - ) - - # 更新执行结果 - if result.get("status") == "completed": - token_usage = result.get("data").get("token_usage", {}) or {} - self.update_execution_status( - execution.execution_id, - "completed", - output_data=result.get("node_outputs", {}), - token_usage=token_usage.get("total_tokens", None) - ) - else: - self.update_execution_status( - execution.execution_id, - "failed", - error_message=result.get("error") - ) - - # 返回增强的响应结构 - return { - "execution_id": execution.execution_id, - "status": result.get("status"), - "output": result.get("output"), # 最终输出(字符串) - "output_data": result.get("node_outputs", {}), # 所有节点输出(详细数据) - "error_message": result.get("error"), - "elapsed_time": result.get("elapsed_time"), - "token_usage": result.get("token_usage") - } - - except Exception as e: - logger.error(f"工作流执行失败: execution_id={execution.execution_id}, error={e}", exc_info=True) - self.update_execution_status( - execution.execution_id, - "failed", - error_message=str(e) - ) - raise BusinessException( - code=BizCode.INTERNAL_ERROR, - message=f"工作流执行失败: {str(e)}" - ) - def _clean_event_for_json(self, event: dict[str, Any]) -> dict[str, Any]: """清理事件数据,移除不可序列化的对象 @@ -869,72 +743,6 @@ class WorkflowService: return clean_value(event) - @deprecated(reason="This method is deprecated. Please use WorkflowService.run_stream instead.") - async def _run_workflow_stream( - self, - workflow_config: dict[str, Any], - input_data: dict[str, Any], - execution_id: str, - workspace_id: str, - user_id: str): - """运行工作流(流式,内部方法) - - Args: - workflow_config: 工作流配置 - input_data: 输入数据 - execution_id: 执行 ID - workspace_id: 工作空间 ID - user_id: 用户 ID - - Yields: - 流式事件(格式:{"event": "", "data": {...}}) - """ - from app.core.workflow.executor import execute_workflow_stream - - try: - async for event in execute_workflow_stream( - workflow_config=workflow_config, - input_data=input_data, - execution_id=execution_id, - workspace_id=workspace_id, - user_id=user_id - ): - # 直接转发事件(executor 已经返回正确格式) - if event.get("event") == "workflow_end": - token_usage = event.get("data").get("token_usage", {}) or {} - status = event.get("data", {}).get("status") - if status == "completed": - self.update_execution_status( - execution_id, - "completed", - output_data=event.get("data"), - token_usage=token_usage.get("total_tokens", None) - ) - elif status == "failed": - self.update_execution_status( - execution_id, - "failed", - output_data=event.get("data") - ) - else: - logger.error(f"unexpect workflow run status, status: {status}") - yield event - - except Exception as e: - logger.error(f"工作流流式执行失败: execution_id={execution_id}, error={e}", exc_info=True) - self.update_execution_status( - execution_id, - "failed", - error_message=str(e) - ) - yield { - "event": "error", - "data": { - "execution_id": execution_id, - "error": str(e) - } - } - # ==================== 依赖注入函数 ====================