diff --git a/api/app/services/app_chat_service.py b/api/app/services/app_chat_service.py index 4f3ecf5f..be593068 100644 --- a/api/app/services/app_chat_service.py +++ b/api/app/services/app_chat_service.py @@ -475,11 +475,298 @@ class AppChatService: # 发送错误事件 yield f"event: error\ndata: {json.dumps({'error': str(e)}, ensure_ascii=False)}\n\n" + async def workflow_chat( + self, + message: str, + conversation_id: uuid.UUID, + config: AgentConfig, + user_id: Optional[str] = None, + variables: Optional[Dict[str, Any]] = None, + web_search: bool = False, + memory: bool = True, + storage_type: Optional[str] = None, + user_rag_memory_id: Optional[str] = None, + ) -> Dict[str, Any]: + """聊天(非流式)""" + start_time = time.time() + config_id = None + + if variables is None: + variables = {} + + # 获取模型配置ID + model_config_id = config.default_model_config_id + api_key_obj = ModelApiKeyService.get_a_api_key(model_config_id) + # 处理系统提示词(支持变量替换) + system_prompt = config.get("system_prompt", "") + if variables: + system_prompt_rendered = render_prompt_message( + system_prompt, + PromptMessageRole.USER, + variables + ) + system_prompt = system_prompt_rendered.get_text_content() or system_prompt + + # 准备工具列表 + tools = [] + + # 添加知识库检索工具 + knowledge_retrieval = config.get("knowledge_retrieval") + if knowledge_retrieval: + knowledge_bases = knowledge_retrieval.get("knowledge_bases", []) + kb_ids = [kb.get("kb_id") for kb in knowledge_bases if kb.get("kb_id")] + if kb_ids: + kb_tool = create_knowledge_retrieval_tool(knowledge_retrieval, kb_ids, user_id) + tools.append(kb_tool) + + # 添加长期记忆工具 + memory_flag = False + if memory == True: + memory_config = config.get("memory", {}) + if memory_config.get("enabled") and user_id: + memory_flag = True + memory_tool = create_long_term_memory_tool(memory_config, user_id) + tools.append(memory_tool) + + web_tools = config.get("tools") + web_search_choice = web_tools.get("web_search", {}) + web_search_enable = web_search_choice.get("enabled", False) + if web_search == True: + if web_search_enable == True: + search_tool = create_web_search_tool({}) + tools.append(search_tool) + + logger.debug( + "已添加网络搜索工具", + extra={ + "tool_count": len(tools) + } + ) + + # 获取模型参数 + model_parameters = config.get("model_parameters", {}) + + # 创建 LangChain Agent + agent = LangChainAgent( + model_name=api_key_obj.model_name, + api_key=api_key_obj.api_key, + provider=api_key_obj.provider, + api_base=api_key_obj.api_base, + temperature=model_parameters.get("temperature", 0.7), + max_tokens=model_parameters.get("max_tokens", 2000), + system_prompt=system_prompt, + tools=tools, + + ) + + # 加载历史消息 + history = [] + memory_config = {"enabled": True, 'max_history': 10} + if memory_config.get("enabled"): + messages = self.conversation_service.get_messages( + conversation_id=conversation_id, + limit=memory_config.get("max_history", 10) + ) + history = [ + {"role": msg.role, "content": msg.content} + for msg in messages + ] + + # 调用 Agent + result = await agent.chat( + message=message, + history=history, + context=None, + end_user_id=user_id, + storage_type=storage_type, + user_rag_memory_id=user_rag_memory_id, + config_id=config_id, + memory_flag=memory_flag + ) + + # 保存消息 + self.conversation_service.save_conversation_messages( + conversation_id=conversation_id, + user_message=message, + assistant_message=result["content"] + ) + + elapsed_time = time.time() - start_time + + return { + "conversation_id": conversation_id, + "message": result["content"], + "usage": result.get("usage", { + "prompt_tokens": 0, + "completion_tokens": 0, + "total_tokens": 0 + }), + "elapsed_time": elapsed_time + } + + async def workflow_chat_stream( + self, + message: str, + conversation_id: uuid.UUID, + config: AgentConfig, + user_id: Optional[str] = None, + variables: Optional[Dict[str, Any]] = None, + web_search: bool = False, + memory: bool = True, + storage_type: Optional[str] = None, + user_rag_memory_id: Optional[str] = None, + ) -> AsyncGenerator[str, None]: + """聊天(流式)""" + + try: + start_time = time.time() + config_id = None + + if variables is None: + variables = {} + + # 获取模型配置ID + model_config_id = config.default_model_config_id + api_key_obj = ModelApiKeyService.get_a_api_key(model_config_id) + # 处理系统提示词(支持变量替换) + system_prompt = config.get("system_prompt", "") + if variables: + system_prompt_rendered = render_prompt_message( + system_prompt, + PromptMessageRole.USER, + variables + ) + system_prompt = system_prompt_rendered.get_text_content() or system_prompt + + # 准备工具列表 + tools = [] + + # 添加知识库检索工具 + knowledge_retrieval = config.get("knowledge_retrieval") + if knowledge_retrieval: + knowledge_bases = knowledge_retrieval.get("knowledge_bases", []) + kb_ids = [kb.get("kb_id") for kb in knowledge_bases if kb.get("kb_id")] + if kb_ids: + kb_tool = create_knowledge_retrieval_tool(knowledge_retrieval, kb_ids, user_id) + tools.append(kb_tool) + + # 添加长期记忆工具 + memory_flag = False + if memory: + memory_config = config.get("memory", {}) + if memory_config.get("enabled") and user_id: + memory_flag = True + memory_tool = create_long_term_memory_tool(memory_config, user_id) + tools.append(memory_tool) + + web_tools = config.get("tools") + web_search_choice = web_tools.get("web_search", {}) + web_search_enable = web_search_choice.get("enabled", False) + if web_search == True: + if web_search_enable == True: + search_tool = create_web_search_tool({}) + tools.append(search_tool) + + logger.debug( + "已添加网络搜索工具", + extra={ + "tool_count": len(tools) + } + ) + + # 获取模型参数 + model_parameters = config.get("model_parameters", {}) + + # 创建 LangChain Agent + agent = LangChainAgent( + model_name=api_key_obj.model_name, + api_key=api_key_obj.api_key, + provider=api_key_obj.provider, + api_base=api_key_obj.api_base, + temperature=model_parameters.get("temperature", 0.7), + max_tokens=model_parameters.get("max_tokens", 2000), + system_prompt=system_prompt, + tools=tools, + streaming=True + ) + + # 加载历史消息 + history = [] + memory_config = {"enabled": True, 'max_history': 10} + if memory_config.get("enabled"): + messages = self.conversation_service.get_messages( + conversation_id=conversation_id, + limit=memory_config.get("max_history", 10) + ) + history = [ + {"role": msg.role, "content": msg.content} + for msg in messages + ] + + # 发送开始事件 + yield f"event: start\ndata: {json.dumps({'conversation_id': str(conversation_id)}, ensure_ascii=False)}\n\n" + + # 流式调用 Agent + full_content = "" + async for chunk in agent.chat_stream( + message=message, + history=history, + context=None, + end_user_id=user_id, + storage_type=storage_type, + user_rag_memory_id=user_rag_memory_id, + config_id=config_id, + memory_flag=memory_flag + ): + full_content += chunk + # 发送消息块事件 + yield f"event: message\ndata: {json.dumps({'content': chunk}, ensure_ascii=False)}\n\n" + + elapsed_time = time.time() - start_time + + # 保存消息 + self.conversation_service.add_message( + conversation_id=conversation_id, + role="user", + content=message + ) + + self.conversation_service.add_message( + conversation_id=conversation_id, + role="assistant", + content=full_content, + meta_data={ + "model": api_key_obj.model_name, + "usage": {} + } + ) + + # 发送结束事件 + end_data = {"elapsed_time": elapsed_time, "message_length": len(full_content)} + yield f"event: end\ndata: {json.dumps(end_data, ensure_ascii=False)}\n\n" + + logger.info( + "流式聊天完成", + extra={ + "conversation_id": str(conversation_id), + "elapsed_time": elapsed_time, + "message_length": len(full_content) + } + ) + + except (GeneratorExit, asyncio.CancelledError): + # 生成器被关闭或任务被取消,正常退出 + logger.debug("流式聊天被中断") + raise + except Exception as e: + logger.error(f"流式聊天失败: {str(e)}", exc_info=True) + # 发送错误事件 + yield f"event: error\ndata: {json.dumps({'error': str(e)}, ensure_ascii=False)}\n\n" # ==================== 依赖注入函数 ==================== def get_app_chat_service( db: Annotated[Session, Depends(get_db)] -) -> ChatService: +) -> AppChatService: """获取工作流服务(依赖注入)""" - return ChatService(db) + return AppChatService(db) diff --git a/api/app/utils/app_config_utils.py b/api/app/utils/app_config_utils.py new file mode 100644 index 00000000..4202644a --- /dev/null +++ b/api/app/utils/app_config_utils.py @@ -0,0 +1,301 @@ +""" +App Config Utilities + +Utility functions for converting between dict and model objects for different app configurations. +""" + +import uuid +from typing import Dict, Any, Optional +from datetime import datetime + + +class AgentConfigProxy: + """Proxy class for AgentConfig (legacy compatibility)""" + + def __init__(self, release, app, config_data): + self.id = release.id + self.app_id = release.app_id + self.app = app + self.name = release.name + self.description = release.description + self.system_prompt = config_data.get("system_prompt") + self.default_model_config_id = release.default_model_config_id + + +def dict_to_agent_config(config_dict: Dict[str, Any], app_id: Optional[uuid.UUID] = None): + """Convert dict to AgentConfig model object + + Args: + config_dict: Configuration dictionary + app_id: Optional app ID (if not provided in dict) + + Returns: + AgentConfig model instance (not yet persisted to database) + + Example: + >>> config_dict = { + ... "app_id": "uuid-here", + ... "system_prompt": "You are a helpful assistant", + ... "default_model_config_id": "model-uuid", + ... "model_parameters": {"temperature": 0.7, "max_tokens": 2000}, + ... "knowledge_retrieval": {"enabled": True, "top_k": 5}, + ... "memory": {"enabled": True, "window_size": 10}, + ... "variables": [{"name": "user_name", "type": "string"}], + ... "tools": {"enabled_tools": ["web_search", "calculator"]}, + ... "agent_role": "standalone", + ... "agent_domain": "customer_service", + ... "capabilities": ["chat", "search"] + ... } + >>> agent_config = dict_to_agent_config(config_dict) + """ + from app.models.agent_app_config_model import AgentConfig + + # Extract app_id + final_app_id = config_dict.get("app_id") or app_id + if not final_app_id: + raise ValueError("app_id is required") + + # Convert string UUID to UUID object if needed + if isinstance(final_app_id, str): + final_app_id = uuid.UUID(final_app_id) + + # Convert default_model_config_id if present + default_model_config_id = config_dict.get("default_model_config_id") + if default_model_config_id and isinstance(default_model_config_id, str): + default_model_config_id = uuid.UUID(default_model_config_id) + + # Convert parent_agent_id if present + parent_agent_id = config_dict.get("parent_agent_id") + if parent_agent_id and isinstance(parent_agent_id, str): + parent_agent_id = uuid.UUID(parent_agent_id) + + # Create AgentConfig instance + agent_config = AgentConfig( + id=uuid.UUID(config_dict["id"]) if "id" in config_dict else uuid.uuid4(), + app_id=final_app_id, + system_prompt=config_dict.get("system_prompt"), + default_model_config_id=default_model_config_id, + model_parameters=config_dict.get("model_parameters"), + knowledge_retrieval=config_dict.get("knowledge_retrieval"), + memory=config_dict.get("memory"), + variables=config_dict.get("variables", []), + tools=config_dict.get("tools", {}), + agent_role=config_dict.get("agent_role"), + agent_domain=config_dict.get("agent_domain"), + parent_agent_id=parent_agent_id, + capabilities=config_dict.get("capabilities", []), + is_active=config_dict.get("is_active", True), + created_at=config_dict.get("created_at", datetime.now()), + updated_at=config_dict.get("updated_at", datetime.now()) + ) + + return agent_config + + +def dict_to_multi_agent_config(config_dict: Dict[str, Any], app_id: Optional[uuid.UUID] = None): + """Convert dict to MultiAgentConfig model object + + Args: + config_dict: Configuration dictionary + app_id: Optional app ID (if not provided in dict) + + Returns: + MultiAgentConfig model instance (not yet persisted to database) + + Example: + >>> config_dict = { + ... "app_id": "uuid-here", + ... "master_agent_id": "master-uuid", + ... "master_agent_name": "Master Agent", + ... "orchestration_mode": "conditional", + ... "sub_agents": [ + ... {"agent_id": "sub1-uuid", "name": "Sub Agent 1", "role": "specialist", "priority": 1}, + ... {"agent_id": "sub2-uuid", "name": "Sub Agent 2", "role": "specialist", "priority": 2} + ... ], + ... "routing_rules": [ + ... {"condition": "intent == 'technical'", "target_agent_id": "sub1-uuid", "priority": 1} + ... ], + ... "execution_config": {"max_iterations": 5, "timeout": 60, "parallel_limit": 3}, + ... "aggregation_strategy": "merge" + ... } + >>> multi_agent_config = dict_to_multi_agent_config(config_dict) + """ + from app.models.multi_agent_model import MultiAgentConfig + + # Extract app_id + final_app_id = config_dict.get("app_id") or app_id + if not final_app_id: + raise ValueError("app_id is required") + + # Convert string UUID to UUID object if needed + if isinstance(final_app_id, str): + final_app_id = uuid.UUID(final_app_id) + + # Convert master_agent_id + master_agent_id = config_dict.get("master_agent_id") + if not master_agent_id: + raise ValueError("master_agent_id is required") + if isinstance(master_agent_id, str): + master_agent_id = uuid.UUID(master_agent_id) + + # Create MultiAgentConfig instance + multi_agent_config = MultiAgentConfig( + id=uuid.UUID(config_dict["id"]) if "id" in config_dict else uuid.uuid4(), + app_id=final_app_id, + master_agent_id=master_agent_id, + master_agent_name=config_dict.get("master_agent_name"), + orchestration_mode=config_dict.get("orchestration_mode", "conditional"), + sub_agents=config_dict.get("sub_agents", []), + routing_rules=config_dict.get("routing_rules"), + execution_config=config_dict.get("execution_config", {}), + aggregation_strategy=config_dict.get("aggregation_strategy", "merge"), + is_active=config_dict.get("is_active", True), + created_at=config_dict.get("created_at", datetime.now()), + updated_at=config_dict.get("updated_at", datetime.now()) + ) + + return multi_agent_config + + +def dict_to_workflow_config(config_dict: Dict[str, Any], app_id: Optional[uuid.UUID] = None): + """Convert dict to WorkflowConfig model object + + Args: + config_dict: Configuration dictionary + app_id: Optional app ID (if not provided in dict) + + Returns: + WorkflowConfig model instance (not yet persisted to database) + + Example: + >>> config_dict = { + ... "app_id": "uuid-here", + ... "nodes": [ + ... {"id": "start", "type": "start", "config": {}}, + ... {"id": "llm", "type": "llm", "config": {"model": "gpt-4"}}, + ... {"id": "end", "type": "end", "config": {"output": "{{llm.output}}"}} + ... ], + ... "edges": [ + ... {"source": "start", "target": "llm"}, + ... {"source": "llm", "target": "end"} + ... ], + ... "variables": [ + ... {"name": "user_input", "type": "string", "default": ""} + ... ], + ... "execution_config": { + ... "max_iterations": 10, + ... "timeout": 300, + ... "enable_streaming": True + ... }, + ... "triggers": [ + ... {"type": "manual", "enabled": True} + ... ] + ... } + >>> workflow_config = dict_to_workflow_config(config_dict) + """ + from app.models.workflow_model import WorkflowConfig + + # Extract app_id + final_app_id = config_dict.get("app_id") or app_id + if not final_app_id: + raise ValueError("app_id is required") + + # Convert string UUID to UUID object if needed + if isinstance(final_app_id, str): + final_app_id = uuid.UUID(final_app_id) + + # Create WorkflowConfig instance + workflow_config = WorkflowConfig( + id=uuid.UUID(config_dict["id"]) if "id" in config_dict else uuid.uuid4(), + app_id=final_app_id, + nodes=config_dict.get("nodes", []), + edges=config_dict.get("edges", []), + variables=config_dict.get("variables", []), + execution_config=config_dict.get("execution_config", {}), + triggers=config_dict.get("triggers", []), + is_active=config_dict.get("is_active", True), + created_at=config_dict.get("created_at", datetime.now()), + updated_at=config_dict.get("updated_at", datetime.now()) + ) + + return workflow_config + + +def agent_config_to_dict(agent_config) -> Dict[str, Any]: + """Convert AgentConfig model to dict + + Args: + agent_config: AgentConfig model instance + + Returns: + Configuration dictionary + """ + return { + "id": str(agent_config.id), + "app_id": str(agent_config.app_id), + "system_prompt": agent_config.system_prompt, + "default_model_config_id": str(agent_config.default_model_config_id) if agent_config.default_model_config_id else None, + "model_parameters": agent_config.model_parameters, + "knowledge_retrieval": agent_config.knowledge_retrieval, + "memory": agent_config.memory, + "variables": agent_config.variables, + "tools": agent_config.tools, + "agent_role": agent_config.agent_role, + "agent_domain": agent_config.agent_domain, + "parent_agent_id": str(agent_config.parent_agent_id) if agent_config.parent_agent_id else None, + "capabilities": agent_config.capabilities, + "is_active": agent_config.is_active, + "created_at": agent_config.created_at.isoformat() if agent_config.created_at else None, + "updated_at": agent_config.updated_at.isoformat() if agent_config.updated_at else None + } + + +def multi_agent_config_to_dict(multi_agent_config) -> Dict[str, Any]: + """Convert MultiAgentConfig model to dict + + Args: + multi_agent_config: MultiAgentConfig model instance + + Returns: + Configuration dictionary + """ + return { + "id": str(multi_agent_config.id), + "app_id": str(multi_agent_config.app_id), + "master_agent_id": str(multi_agent_config.master_agent_id), + "master_agent_name": multi_agent_config.master_agent_name, + "orchestration_mode": multi_agent_config.orchestration_mode, + "sub_agents": multi_agent_config.sub_agents, + "routing_rules": multi_agent_config.routing_rules, + "execution_config": multi_agent_config.execution_config, + "aggregation_strategy": multi_agent_config.aggregation_strategy, + "is_active": multi_agent_config.is_active, + "created_at": multi_agent_config.created_at.isoformat() if multi_agent_config.created_at else None, + "updated_at": multi_agent_config.updated_at.isoformat() if multi_agent_config.updated_at else None + } + + +def workflow_config_to_dict(workflow_config) -> Dict[str, Any]: + """Convert WorkflowConfig model to dict + + Args: + workflow_config: WorkflowConfig model instance + + Returns: + Configuration dictionary + """ + return { + "id": str(workflow_config.id), + "app_id": str(workflow_config.app_id), + "nodes": workflow_config.nodes, + "edges": workflow_config.edges, + "variables": workflow_config.variables, + "execution_config": workflow_config.execution_config, + "triggers": workflow_config.triggers, + "is_active": workflow_config.is_active, + "created_at": workflow_config.created_at.isoformat() if workflow_config.created_at else None, + "updated_at": workflow_config.updated_at.isoformat() if workflow_config.updated_at else None + } + + +