diff --git a/api/app/controllers/public_share_controller.py b/api/app/controllers/public_share_controller.py index 5fe4eb56..adb199fb 100644 --- a/api/app/controllers/public_share_controller.py +++ b/api/app/controllers/public_share_controller.py @@ -1,4 +1,5 @@ import hashlib +import json import uuid from typing import Annotated from fastapi import APIRouter, Depends, Query, Request @@ -18,7 +19,7 @@ from app.services.conversation_service import ConversationService from app.services.release_share_service import ReleaseShareService from app.services.shared_chat_service import SharedChatService from app.services.app_chat_service import AppChatService, get_app_chat_service -from app.utils.app_config_utils import dict_to_multi_agent_config, dict_to_workflow_config, agent_config_4_app_release, multi_agent_config_4_app_release +from app.utils.app_config_utils import dict_to_multi_agent_config, workflow_config_4_app_release, agent_config_4_app_release, multi_agent_config_4_app_release router = APIRouter(prefix="/public/share", tags=["Public Share"]) logger = get_business_logger() @@ -288,7 +289,7 @@ async def chat( password = None # Token 认证不需要密码 # end_user_id = user_id other_id = user_id - + # 提前验证和准备(在流式响应开始前完成) # 这样可以确保错误能正确返回,而不是在流式响应中间出错 from app.models.app_model import AppType @@ -364,6 +365,9 @@ async def chat( config = release.config or {} if not config.get("sub_agents"): raise BusinessException("多 Agent 应用未配置子 Agent", BizCode.AGENT_CONFIG_MISSING) + elif app_type == AppType.WORKFLOW: + # Multi-Agent 类型:验证多 Agent 配置 + pass else: raise BusinessException(f"不支持的应用类型: {app_type}", BizCode.APP_TYPE_NOT_SUPPORTED) @@ -469,6 +473,7 @@ async def chat( ) return success(data=conversation_schema.ChatResponse(**result).model_dump(mode="json")) elif app_type == AppType.MULTI_AGENT: + # config = workflow_config_4_app_release(release) config = multi_agent_config_4_app_release(release) if payload.stream: async def event_generator(): @@ -553,8 +558,71 @@ async def chat( # ) # return success(data=conversation_schema.ChatResponse(**result)) + elif app_type == AppType.WORKFLOW: + + config = workflow_config_4_app_release(release) + if payload.stream: + async def event_generator(): + async for event in app_chat_service.workflow_chat_stream( + + message=payload.message, + conversation_id=conversation.id, # 使用已创建的会话 ID + user_id=new_end_user.id, # 转换为字符串 + variables=payload.variables, + config=config, + web_search=payload.web_search, + memory=payload.memory, + storage_type=storage_type, + user_rag_memory_id=user_rag_memory_id, + app_id=release.app_id, + workspace_id=workspace_id + ): + event_type = event.get("event", "message") + event_data = event.get("data", {}) + + # 转换为标准 SSE 格式(字符串) + sse_message = f"event: {event_type}\ndata: {json.dumps(event_data)}\n\n" + yield sse_message + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no" + } + ) + + # 多 Agent 非流式返回 + result = await app_chat_service.workflow_chat( + + message=payload.message, + conversation_id=conversation.id, # 使用已创建的会话 ID + user_id=new_end_user.id, # 转换为字符串 + variables=payload.variables, + config=config, + web_search=payload.web_search, + memory=payload.memory, + storage_type=storage_type, + user_rag_memory_id=user_rag_memory_id, + app_id=release.app_id, + workspace_id=workspace_id + ) + logger.debug( + "工作流试运行返回结果", + extra={ + "result_type": str(type(result)), + "has_response": "response" in result if isinstance(result, dict) else False + } + ) + return success( + data=result, + msg="工作流任务执行成功" + ) + # return success(data=conversation_schema.ChatResponse(**result).model_dump(mode="json")) + else: from app.core.exceptions import BusinessException from app.core.error_codes import BizCode raise BusinessException(f"不支持的应用类型: {app_type}", BizCode.APP_TYPE_NOT_SUPPORTED) - pass diff --git a/api/app/services/app_chat_service.py b/api/app/services/app_chat_service.py index 29e92cf6..6b7b3103 100644 --- a/api/app/services/app_chat_service.py +++ b/api/app/services/app_chat_service.py @@ -9,15 +9,18 @@ from fastapi import Depends from sqlalchemy.orm import Session from app.core.agent.langchain_agent import LangChainAgent +from app.core.error_codes import BizCode +from app.core.exceptions import BusinessException from app.core.logging_config import get_business_logger -from app.db import get_db -from app.models import MultiAgentConfig, AgentConfig +from app.db import get_db, get_db_context +from app.models import MultiAgentConfig, AgentConfig, WorkflowConfig from app.schemas.prompt_schema import render_prompt_message, PromptMessageRole from app.services.conversation_service import ConversationService from app.services.draft_run_service import create_knowledge_retrieval_tool, create_long_term_memory_tool from app.services.draft_run_service import create_web_search_tool from app.services.model_service import ModelApiKeyService from app.services.multi_agent_orchestrator import MultiAgentOrchestrator +from app.services.workflow_service import WorkflowService logger = get_business_logger() @@ -479,7 +482,9 @@ class AppChatService: self, message: str, conversation_id: uuid.UUID, - config: AgentConfig, + config: WorkflowConfig, + app_id: uuid.UUID, + workspace_id: uuid.UUID, user_id: Optional[str] = None, variables: Optional[Dict[str, Any]] = None, web_search: bool = False, @@ -488,281 +493,158 @@ class AppChatService: user_rag_memory_id: Optional[str] = None, ) -> Dict[str, Any]: """聊天(非流式)""" + workflow_service = WorkflowService(self.db) - start_time = time.time() - config_id = None + input_data = {"message":message, "variables": variables, + "conversation_id": str(conversation_id)} + inconfig = workflow_service.get_workflow_config(app_id) - if variables is None: - variables = {} + # 2. 创建执行记录 + execution = workflow_service.create_execution( + workflow_config_id=inconfig.id, + app_id=app_id, + trigger_type="manual", + triggered_by=None, + conversation_id=conversation_id, + input_data=input_data + ) - # 获取模型配置ID - model_config_id = config.default_model_config_id - api_key_obj = ModelApiKeyService.get_a_api_key(self.db ,model_config_id) - # 处理系统提示词(支持变量替换) - system_prompt = config.get("system_prompt", "") - if variables: - system_prompt_rendered = render_prompt_message( - system_prompt, - PromptMessageRole.USER, - variables + # 3. 构建工作流配置字典 + workflow_config_dict = { + "nodes": config.nodes, + "edges": config.edges, + "variables": config.variables, + "execution_config": config.execution_config + } + + # 4. 获取工作空间 ID(从 app 获取) + + # 5. 执行工作流 + from app.core.workflow.executor import execute_workflow + + try: + # 更新状态为运行中 + workflow_service.update_execution_status(execution.execution_id, "running") + + result = await execute_workflow( + workflow_config=workflow_config_dict, + input_data=input_data, + execution_id=execution.execution_id, + workspace_id=str(workspace_id), + user_id=user_id ) - system_prompt = system_prompt_rendered.get_text_content() or system_prompt - # 准备工具列表 - tools = [] - - # 添加知识库检索工具 - knowledge_retrieval = config.get("knowledge_retrieval") - if knowledge_retrieval: - knowledge_bases = knowledge_retrieval.get("knowledge_bases", []) - kb_ids = [kb.get("kb_id") for kb in knowledge_bases if kb.get("kb_id")] - if kb_ids: - kb_tool = create_knowledge_retrieval_tool(knowledge_retrieval, kb_ids, user_id) - tools.append(kb_tool) - - # 添加长期记忆工具 - memory_flag = False - if memory == True: - memory_config = config.get("memory", {}) - if memory_config.get("enabled") and user_id: - memory_flag = True - memory_tool = create_long_term_memory_tool(memory_config, user_id) - tools.append(memory_tool) - - web_tools = config.get("tools") - web_search_choice = web_tools.get("web_search", {}) - web_search_enable = web_search_choice.get("enabled", False) - if web_search == True: - if web_search_enable == True: - search_tool = create_web_search_tool({}) - tools.append(search_tool) - - logger.debug( - "已添加网络搜索工具", - extra={ - "tool_count": len(tools) - } + # 更新执行结果 + if result.get("status") == "completed": + workflow_service.update_execution_status( + execution.execution_id, + "completed", + output_data=result.get("node_outputs", {}) + ) + else: + workflow_service.update_execution_status( + execution.execution_id, + "failed", + error_message=result.get("error") ) - # 获取模型参数 - model_parameters = config.get("model_parameters", {}) + # 返回增强的响应结构 + return { + "execution_id": execution.execution_id, + "status": result.get("status"), + "output": result.get("output"), # 最终输出(字符串) + "output_data": result.get("node_outputs", {}), # 所有节点输出(详细数据) + "conversation_id": result.get("conversation_id"), # 所有节点输出(详细数据)payload., # 会话 ID + "error_message": result.get("error"), + "elapsed_time": result.get("elapsed_time"), + "token_usage": result.get("token_usage") + } - # 创建 LangChain Agent - agent = LangChainAgent( - model_name=api_key_obj.model_name, - api_key=api_key_obj.api_key, - provider=api_key_obj.provider, - api_base=api_key_obj.api_base, - temperature=model_parameters.get("temperature", 0.7), - max_tokens=model_parameters.get("max_tokens", 2000), - system_prompt=system_prompt, - tools=tools, - - ) - - # 加载历史消息 - history = [] - memory_config = {"enabled": True, 'max_history': 10} - if memory_config.get("enabled"): - messages = self.conversation_service.get_messages( - conversation_id=conversation_id, - limit=memory_config.get("max_history", 10) + except Exception as e: + logger.error(f"工作流执行失败: execution_id={execution.execution_id}, error={e}", exc_info=True) + workflow_service.update_execution_status( + execution.execution_id, + "failed", + error_message=str(e) + ) + raise BusinessException( + code=BizCode.INTERNAL_ERROR, + message=f"工作流执行失败: {str(e)}" ) - history = [ - {"role": msg.role, "content": msg.content} - for msg in messages - ] - - # 调用 Agent - result = await agent.chat( - message=message, - history=history, - context=None, - end_user_id=user_id, - storage_type=storage_type, - user_rag_memory_id=user_rag_memory_id, - config_id=config_id, - memory_flag=memory_flag - ) - - # 保存消息 - self.conversation_service.save_conversation_messages( - conversation_id=conversation_id, - user_message=message, - assistant_message=result["content"] - ) - - elapsed_time = time.time() - start_time - - return { - "conversation_id": conversation_id, - "message": result["content"], - "usage": result.get("usage", { - "prompt_tokens": 0, - "completion_tokens": 0, - "total_tokens": 0 - }), - "elapsed_time": elapsed_time - } async def workflow_chat_stream( self, message: str, conversation_id: uuid.UUID, - config: AgentConfig, + config: WorkflowConfig, + app_id: uuid.UUID, + workspace_id: uuid.UUID, user_id: Optional[str] = None, variables: Optional[Dict[str, Any]] = None, web_search: bool = False, memory: bool = True, storage_type: Optional[str] = None, user_rag_memory_id: Optional[str] = None, + ) -> AsyncGenerator[str, None]: """聊天(流式)""" + workflow_service = WorkflowService(self.db) + input_data = {"message": message, "variables": variables, + "conversation_id": str(conversation_id)} + inconfig = workflow_service.get_workflow_config(app_id) + # 2. 创建执行记录 + execution = workflow_service.create_execution( + workflow_config_id=inconfig.id, + app_id=app_id, + trigger_type="manual", + triggered_by=None, + conversation_id=conversation_id, + input_data=input_data + ) + + # 3. 构建工作流配置字典 + workflow_config_dict = { + "nodes": config.nodes, + "edges": config.edges, + "variables": config.variables, + "execution_config": config.execution_config + } + + # 4. 获取工作空间 ID(从 app 获取) + + # 5. 流式执行工作流 try: - start_time = time.time() - config_id = None + # 更新状态为运行中 + workflow_service.update_execution_status(execution.execution_id, "running") - if variables is None: - variables = {} - # 获取模型配置ID - model_config_id = config.default_model_config_id - api_key_obj = ModelApiKeyService.get_a_api_key(self.db ,model_config_id) - # 处理系统提示词(支持变量替换) - system_prompt = config.get("system_prompt", "") - if variables: - system_prompt_rendered = render_prompt_message( - system_prompt, - PromptMessageRole.USER, - variables - ) - system_prompt = system_prompt_rendered.get_text_content() or system_prompt - - # 准备工具列表 - tools = [] - - # 添加知识库检索工具 - knowledge_retrieval = config.get("knowledge_retrieval") - if knowledge_retrieval: - knowledge_bases = knowledge_retrieval.get("knowledge_bases", []) - kb_ids = [kb.get("kb_id") for kb in knowledge_bases if kb.get("kb_id")] - if kb_ids: - kb_tool = create_knowledge_retrieval_tool(knowledge_retrieval, kb_ids, user_id) - tools.append(kb_tool) - - # 添加长期记忆工具 - memory_flag = False - if memory: - memory_config = config.get("memory", {}) - if memory_config.get("enabled") and user_id: - memory_flag = True - memory_tool = create_long_term_memory_tool(memory_config, user_id) - tools.append(memory_tool) - - web_tools = config.get("tools") - web_search_choice = web_tools.get("web_search", {}) - web_search_enable = web_search_choice.get("enabled", False) - if web_search == True: - if web_search_enable == True: - search_tool = create_web_search_tool({}) - tools.append(search_tool) - - logger.debug( - "已添加网络搜索工具", - extra={ - "tool_count": len(tools) - } - ) - - # 获取模型参数 - model_parameters = config.get("model_parameters", {}) - - # 创建 LangChain Agent - agent = LangChainAgent( - model_name=api_key_obj.model_name, - api_key=api_key_obj.api_key, - provider=api_key_obj.provider, - api_base=api_key_obj.api_base, - temperature=model_parameters.get("temperature", 0.7), - max_tokens=model_parameters.get("max_tokens", 2000), - system_prompt=system_prompt, - tools=tools, - streaming=True - ) - - # 加载历史消息 - history = [] - memory_config = {"enabled": True, 'max_history': 10} - if memory_config.get("enabled"): - messages = self.conversation_service.get_messages( - conversation_id=conversation_id, - limit=memory_config.get("max_history", 10) - ) - history = [ - {"role": msg.role, "content": msg.content} - for msg in messages - ] - - # 发送开始事件 - yield f"event: start\ndata: {json.dumps({'conversation_id': str(conversation_id)}, ensure_ascii=False)}\n\n" - - # 流式调用 Agent - full_content = "" - async for chunk in agent.chat_stream( - message=message, - history=history, - context=None, - end_user_id=user_id, - storage_type=storage_type, - user_rag_memory_id=user_rag_memory_id, - config_id=config_id, - memory_flag=memory_flag + # 调用流式执行(executor 会发送 workflow_start 和 workflow_end 事件) + async for event in workflow_service._run_workflow_stream( + workflow_config=workflow_config_dict, + input_data=input_data, + execution_id=execution.execution_id, + workspace_id=str(workspace_id), + user_id=user_id ): - full_content += chunk - # 发送消息块事件 - yield f"event: message\ndata: {json.dumps({'content': chunk}, ensure_ascii=False)}\n\n" + # 直接转发 executor 的事件(已经是正确的格式) + yield event - elapsed_time = time.time() - start_time - - # 保存消息 - self.conversation_service.add_message( - conversation_id=conversation_id, - role="user", - content=message - ) - - self.conversation_service.add_message( - conversation_id=conversation_id, - role="assistant", - content=full_content, - meta_data={ - "model": api_key_obj.model_name, - "usage": {} - } - ) - - # 发送结束事件 - end_data = {"elapsed_time": elapsed_time, "message_length": len(full_content)} - yield f"event: end\ndata: {json.dumps(end_data, ensure_ascii=False)}\n\n" - - logger.info( - "流式聊天完成", - extra={ - "conversation_id": str(conversation_id), - "elapsed_time": elapsed_time, - "message_length": len(full_content) - } - ) - - except (GeneratorExit, asyncio.CancelledError): - # 生成器被关闭或任务被取消,正常退出 - logger.debug("流式聊天被中断") - raise except Exception as e: - logger.error(f"流式聊天失败: {str(e)}", exc_info=True) + logger.error(f"工作流流式执行失败: execution_id={execution.execution_id}, error={e}", exc_info=True) + workflow_service.update_execution_status( + execution.execution_id, + "failed", + error_message=str(e) + ) # 发送错误事件 - yield f"event: error\ndata: {json.dumps({'error': str(e)}, ensure_ascii=False)}\n\n" + yield { + "event": "error", + "data": { + "execution_id": execution.execution_id, + "error": str(e) + } + } # ==================== 依赖注入函数 ==================== diff --git a/api/app/services/app_service.py b/api/app/services/app_service.py index 95bcc07a..38097c4e 100644 --- a/api/app/services/app_service.py +++ b/api/app/services/app_service.py @@ -21,6 +21,7 @@ from app.core.exceptions import ( BusinessException, ) from app.core.logging_config import get_business_logger +from app.core.workflow.validator import WorkflowValidator from app.db import get_db from app.models import App, AgentConfig, AppRelease, MultiAgentConfig, WorkflowConfig from app.models.app_model import AppStatus, AppType @@ -31,6 +32,7 @@ from app.schemas.workflow_schema import WorkflowConfigUpdate from app.services.agent_config_converter import AgentConfigConverter from app.models import AppShare, Workspace from app.services.model_service import ModelApiKeyService +from app.services.workflow_service import WorkflowService # 获取业务日志器 logger = get_business_logger() @@ -1225,6 +1227,26 @@ class AppService: "orchestration_mode": multi_agent_cfg.orchestration_mode } ) + elif app.type == AppType.WORKFLOW: + service = WorkflowService(self.db) + workflow_cfg = service.get_workflow_config(app_id) + if not workflow_cfg: + raise BusinessException("应用缺少有效配置,无法发布", BizCode.CONFIG_MISSING) + + config = { + "nodes": workflow_cfg.nodes, + "edges": workflow_cfg.edges, + "variables": workflow_cfg.variables, + "execution_config": workflow_cfg.execution_config, + "triggers": workflow_cfg.triggers + } + + is_valid, errors = WorkflowValidator.validate_for_publish(config) + if not is_valid: + raise BusinessException("应用缺少有效配置,无法发布", BizCode.CONFIG_MISSING) + logger.info( + "应用发布配置准备完成" + ) now = datetime.datetime.now() version = self._get_next_version(app_id) diff --git a/api/app/services/multi_agent_orchestrator.py b/api/app/services/multi_agent_orchestrator.py index 08ae7e57..fd3ce229 100644 --- a/api/app/services/multi_agent_orchestrator.py +++ b/api/app/services/multi_agent_orchestrator.py @@ -1293,6 +1293,7 @@ class MultiAgentOrchestrator: conversation_id: 会话 ID user_id: 用户 ID + Returns: 执行结果 """ diff --git a/api/app/services/workflow_service.py b/api/app/services/workflow_service.py index d96efdf7..68d6279b 100644 --- a/api/app/services/workflow_service.py +++ b/api/app/services/workflow_service.py @@ -17,6 +17,7 @@ from app.core.workflow.validator import validate_workflow_config from app.db import get_db, get_db_context from app.models.workflow_model import WorkflowConfig, WorkflowExecution from app.repositories.end_user_repository import EndUserRepository +from app.services.multi_agent_service import convert_uuids_to_str from app.repositories.workflow_repository import ( WorkflowConfigRepository, WorkflowExecutionRepository, @@ -364,7 +365,7 @@ class WorkflowService: execution.status = status if output_data is not None: - execution.output_data = output_data + execution.output_data = convert_uuids_to_str(output_data) if error_message is not None: execution.error_message = error_message if error_node_id is not None: diff --git a/api/app/utils/app_config_utils.py b/api/app/utils/app_config_utils.py index 4fe692c1..97e64214 100644 --- a/api/app/utils/app_config_utils.py +++ b/api/app/utils/app_config_utils.py @@ -8,7 +8,7 @@ import uuid from typing import Dict, Any, Optional from datetime import datetime -from app.models import AppRelease +from app.models import AppRelease, WorkflowConfig from app.models.agent_app_config_model import AgentConfig from app.models.multi_agent_model import MultiAgentConfig @@ -28,7 +28,7 @@ class AgentConfigProxy: def agent_config_4_app_release(release: AppRelease ) -> AgentConfig: config_dict = release.config - + agent_config = AgentConfig( app_id=release.app_id, system_prompt=config_dict.get("system_prompt"), @@ -45,10 +45,10 @@ def agent_config_4_app_release(release: AppRelease ) -> AgentConfig: def multi_agent_config_4_app_release(release: AppRelease ) -> MultiAgentConfig: config_dict = release.config - + agent_config = MultiAgentConfig( - app_id=release.app_id, + app_id=release.app_id, default_model_config_id=release.default_model_config_id, model_parameters=config_dict.get("model_parameters"), master_agent_id=config_dict.get("master_agent_id"), @@ -58,11 +58,29 @@ def multi_agent_config_4_app_release(release: AppRelease ) -> MultiAgentConfig: routing_rules=config_dict.get("routing_rules"), execution_config=config_dict.get("execution_config", {}), aggregation_strategy=config_dict.get("aggregation_strategy", "merge"), - + ) return agent_config +def workflow_config_4_app_release(release: AppRelease ) -> WorkflowConfig: + + config_dict = release.config + + + config = WorkflowConfig( + id=release.id, + app_id=release.app_id, + nodes=config_dict.get("nodes", []), + edges=config_dict.get("edges", []), + variables=config_dict.get("variables", []), + execution_config=config_dict.get("execution_config", {}), + triggers=config_dict.get("triggers", []) + + ) + + return config + def dict_to_multi_agent_config(config_dict: Dict[str, Any], app_id: Optional[uuid.UUID] = None): """Convert dict to MultiAgentConfig model object