From b00d6e37e310352171d928800f6ad5c901b1fca2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B0=A2=E4=BF=8A=E7=94=B7?= Date: Sat, 20 Dec 2025 16:03:06 +0800 Subject: [PATCH] feat(tool system): tool system development --- api/app/core/workflow/executor.py | 356 +++++++++++++++--------------- api/app/services/agent_tools.py | 219 +----------------- 2 files changed, 179 insertions(+), 396 deletions(-) diff --git a/api/app/core/workflow/executor.py b/api/app/core/workflow/executor.py index 6effaa5b..46f8cf08 100644 --- a/api/app/core/workflow/executor.py +++ b/api/app/core/workflow/executor.py @@ -5,7 +5,7 @@ """ import logging -import uuid +# import uuid import datetime from typing import Any @@ -16,10 +16,11 @@ from langgraph.graph.state import CompiledStateGraph from app.core.workflow.expression_evaluator import evaluate_condition from app.core.workflow.nodes import WorkflowState, NodeFactory from app.core.workflow.nodes.enums import NodeType -from app.core.tools.registry import ToolRegistry -from app.core.tools.executor import ToolExecutor -from app.core.tools.langchain_adapter import LangchainAdapter -TOOL_MANAGEMENT_AVAILABLE = True +# from app.core.tools.registry import ToolRegistry +# from app.core.tools.executor import ToolExecutor +# from app.core.tools.langchain_adapter import LangchainAdapter +# TOOL_MANAGEMENT_AVAILABLE = True +# from app.db import get_db logger = logging.getLogger(__name__) @@ -466,176 +467,175 @@ async def execute_workflow_stream( # ==================== 工具管理系统集成 ==================== -def get_workflow_tools(workspace_id: str, user_id: str) -> list: - """获取工作流可用的工具列表 - - Args: - workspace_id: 工作空间ID - user_id: 用户ID - - Returns: - 可用工具列表 - """ - if not TOOL_MANAGEMENT_AVAILABLE: - logger.warning("工具管理系统不可用") - return [] - - try: - from sqlalchemy.orm import Session - db = next(get_db()) - - # 创建工具注册表 - registry = ToolRegistry(db) - - # 注册内置工具类 - from app.core.tools.builtin import ( - DateTimeTool, JsonTool, BaiduSearchTool, MinerUTool, TextInTool - ) - registry.register_tool_class(DateTimeTool) - registry.register_tool_class(JsonTool) - registry.register_tool_class(BaiduSearchTool) - registry.register_tool_class(MinerUTool) - registry.register_tool_class(TextInTool) - - # 获取活跃的工具 - import uuid - tools = registry.list_tools(workspace_id=uuid.UUID(workspace_id)) - active_tools = [tool for tool in tools if tool.status.value == "active"] - - # 转换为Langchain工具 - langchain_tools = [] - for tool_info in active_tools: - try: - tool_instance = registry.get_tool(tool_info.id) - if tool_instance: - langchain_tool = LangchainAdapter.convert_tool(tool_instance) - langchain_tools.append(langchain_tool) - except Exception as e: - logger.error(f"转换工具失败: {tool_info.name}, 错误: {e}") - - logger.info(f"为工作流获取了 {len(langchain_tools)} 个工具") - return langchain_tools - - except Exception as e: - logger.error(f"获取工作流工具失败: {e}") - return [] - - -class ToolWorkflowNode: - """工具工作流节点 - 在工作流中执行工具""" - - def __init__(self, node_config: dict, workflow_config: dict): - """初始化工具节点 - - Args: - node_config: 节点配置 - workflow_config: 工作流配置 - """ - self.node_config = node_config - self.workflow_config = workflow_config - self.tool_id = node_config.get("tool_id") - self.tool_parameters = node_config.get("parameters", {}) - - async def run(self, state: WorkflowState) -> WorkflowState: - """执行工具节点""" - if not TOOL_MANAGEMENT_AVAILABLE: - logger.error("工具管理系统不可用") - state["error"] = "工具管理系统不可用" - return state - - try: - from sqlalchemy.orm import Session - db = next(get_db()) - - # 创建工具执行器 - registry = ToolRegistry(db) - executor = ToolExecutor(db, registry) - - # 准备参数(支持变量替换) - parameters = self._prepare_parameters(state) - - # 执行工具 - result = await executor.execute_tool( - tool_id=self.tool_id, - parameters=parameters, - user_id=uuid.UUID(state["user_id"]), - workspace_id=uuid.UUID(state["workspace_id"]) - ) - - # 更新状态 - node_id = self.node_config.get("id") - if result.success: - state["node_outputs"][node_id] = { - "type": "tool", - "tool_id": self.tool_id, - "output": result.data, - "execution_time": result.execution_time, - "token_usage": result.token_usage - } - - # 更新运行时变量 - if isinstance(result.data, dict): - for key, value in result.data.items(): - state["runtime_vars"][f"{node_id}.{key}"] = value - else: - state["runtime_vars"][f"{node_id}.result"] = result.data - else: - state["error"] = result.error - state["error_node"] = node_id - state["node_outputs"][node_id] = { - "type": "tool", - "tool_id": self.tool_id, - "error": result.error, - "execution_time": result.execution_time - } - - return state - - except Exception as e: - logger.error(f"工具节点执行失败: {e}") - state["error"] = str(e) - state["error_node"] = self.node_config.get("id") - return state - - def _prepare_parameters(self, state: WorkflowState) -> dict: - """准备工具参数(支持变量替换)""" - parameters = {} - - for key, value in self.tool_parameters.items(): - if isinstance(value, str) and value.startswith("${") and value.endswith("}"): - # 变量替换 - var_path = value[2:-1] - - # 支持多层级变量访问,如 ${sys.message} 或 ${node1.result} - if "." in var_path: - parts = var_path.split(".") - current = state.get("variables", {}) - - for part in parts: - if isinstance(current, dict) and part in current: - current = current[part] - else: - # 尝试从运行时变量获取 - runtime_key = ".".join(parts) - current = state.get("runtime_vars", {}).get(runtime_key, value) - break - - parameters[key] = current - else: - # 简单变量 - variables = state.get("variables", {}) - parameters[key] = variables.get(var_path, value) - else: - parameters[key] = value - - return parameters - - -# 注册工具节点到NodeFactory(如果存在) -try: - from app.core.workflow.nodes import NodeFactory - if hasattr(NodeFactory, 'register_node_type'): - NodeFactory.register_node_type("tool", ToolWorkflowNode) - logger.info("工具节点已注册到工作流系统") -except Exception as e: - logger.warning(f"注册工具节点失败: {e}") \ No newline at end of file +# def get_workflow_tools(workspace_id: str, user_id: str) -> list: +# """获取工作流可用的工具列表 +# +# Args: +# workspace_id: 工作空间ID +# user_id: 用户ID +# +# Returns: +# 可用工具列表 +# """ +# if not TOOL_MANAGEMENT_AVAILABLE: +# logger.warning("工具管理系统不可用") +# return [] +# +# try: +# db = next(get_db()) +# +# # 创建工具注册表 +# registry = ToolRegistry(db) +# +# # 注册内置工具类 +# from app.core.tools.builtin import ( +# DateTimeTool, JsonTool, BaiduSearchTool, MinerUTool, TextInTool +# ) +# registry.register_tool_class(DateTimeTool) +# registry.register_tool_class(JsonTool) +# registry.register_tool_class(BaiduSearchTool) +# registry.register_tool_class(MinerUTool) +# registry.register_tool_class(TextInTool) +# +# # 获取活跃的工具 +# import uuid +# tools = registry.list_tools(workspace_id=uuid.UUID(workspace_id)) +# active_tools = [tool for tool in tools if tool.status.value == "active"] +# +# # 转换为Langchain工具 +# langchain_tools = [] +# for tool_info in active_tools: +# try: +# tool_instance = registry.get_tool(tool_info.id) +# if tool_instance: +# langchain_tool = LangchainAdapter.convert_tool(tool_instance) +# langchain_tools.append(langchain_tool) +# except Exception as e: +# logger.error(f"转换工具失败: {tool_info.name}, 错误: {e}") +# +# logger.info(f"为工作流获取了 {len(langchain_tools)} 个工具") +# return langchain_tools +# +# except Exception as e: +# logger.error(f"获取工作流工具失败: {e}") +# return [] +# +# +# class ToolWorkflowNode: +# """工具工作流节点 - 在工作流中执行工具""" +# +# def __init__(self, node_config: dict, workflow_config: dict): +# """初始化工具节点 +# +# Args: +# node_config: 节点配置 +# workflow_config: 工作流配置 +# """ +# self.node_config = node_config +# self.workflow_config = workflow_config +# self.tool_id = node_config.get("tool_id") +# self.tool_parameters = node_config.get("parameters", {}) +# +# async def run(self, state: WorkflowState) -> WorkflowState: +# """执行工具节点""" +# if not TOOL_MANAGEMENT_AVAILABLE: +# logger.error("工具管理系统不可用") +# state["error"] = "工具管理系统不可用" +# return state +# +# try: +# from sqlalchemy.orm import Session +# db = next(get_db()) +# +# # 创建工具执行器 +# registry = ToolRegistry(db) +# executor = ToolExecutor(db, registry) +# +# # 准备参数(支持变量替换) +# parameters = self._prepare_parameters(state) +# +# # 执行工具 +# result = await executor.execute_tool( +# tool_id=self.tool_id, +# parameters=parameters, +# user_id=uuid.UUID(state["user_id"]), +# workspace_id=uuid.UUID(state["workspace_id"]) +# ) +# +# # 更新状态 +# node_id = self.node_config.get("id") +# if result.success: +# state["node_outputs"][node_id] = { +# "type": "tool", +# "tool_id": self.tool_id, +# "output": result.data, +# "execution_time": result.execution_time, +# "token_usage": result.token_usage +# } +# +# # 更新运行时变量 +# if isinstance(result.data, dict): +# for key, value in result.data.items(): +# state["runtime_vars"][f"{node_id}.{key}"] = value +# else: +# state["runtime_vars"][f"{node_id}.result"] = result.data +# else: +# state["error"] = result.error +# state["error_node"] = node_id +# state["node_outputs"][node_id] = { +# "type": "tool", +# "tool_id": self.tool_id, +# "error": result.error, +# "execution_time": result.execution_time +# } +# +# return state +# +# except Exception as e: +# logger.error(f"工具节点执行失败: {e}") +# state["error"] = str(e) +# state["error_node"] = self.node_config.get("id") +# return state +# +# def _prepare_parameters(self, state: WorkflowState) -> dict: +# """准备工具参数(支持变量替换)""" +# parameters = {} +# +# for key, value in self.tool_parameters.items(): +# if isinstance(value, str) and value.startswith("${") and value.endswith("}"): +# # 变量替换 +# var_path = value[2:-1] +# +# # 支持多层级变量访问,如 ${sys.message} 或 ${node1.result} +# if "." in var_path: +# parts = var_path.split(".") +# current = state.get("variables", {}) +# +# for part in parts: +# if isinstance(current, dict) and part in current: +# current = current[part] +# else: +# # 尝试从运行时变量获取 +# runtime_key = ".".join(parts) +# current = state.get("runtime_vars", {}).get(runtime_key, value) +# break +# +# parameters[key] = current +# else: +# # 简单变量 +# variables = state.get("variables", {}) +# parameters[key] = variables.get(var_path, value) +# else: +# parameters[key] = value +# +# return parameters +# +# +# # 注册工具节点到NodeFactory(如果存在) +# try: +# from app.core.workflow.nodes import NodeFactory +# if hasattr(NodeFactory, 'register_node_type'): +# NodeFactory.register_node_type("tool", ToolWorkflowNode) +# logger.info("工具节点已注册到工作流系统") +# except Exception as e: +# logger.warning(f"注册工具节点失败: {e}") \ No newline at end of file diff --git a/api/app/services/agent_tools.py b/api/app/services/agent_tools.py index 7fe6a0c0..3ca7bddd 100644 --- a/api/app/services/agent_tools.py +++ b/api/app/services/agent_tools.py @@ -13,10 +13,6 @@ from app.core.exceptions import BusinessException, ResourceNotFoundException from app.core.error_codes import BizCode from app.core.logging_config import get_business_logger from app.repositories import workspace_repository, knowledge_repository -from app.core.tools.registry import ToolRegistry -from app.core.tools.executor import ToolExecutor -from app.core.tools.langchain_adapter import LangchainAdapter -TOOL_MANAGEMENT_AVAILABLE = True logger = get_business_logger() @@ -333,217 +329,4 @@ def create_agent_invocation_tool( ) return f"调用 Agent 失败: {str(e)}" - return invoke_agent - -def get_available_tools_for_agent( - db: Session, - workspace_id: uuid.UUID, - agent_id: Optional[uuid.UUID] = None -) -> List[Dict[str, Any]]: - """获取Agent可用的工具列表 - - Args: - db: 数据库会话 - workspace_id: 工作空间ID - agent_id: Agent ID(可选) - - Returns: - 可用工具列表 - """ - if not TOOL_MANAGEMENT_AVAILABLE: - logger.warning("工具管理系统不可用") - return [] - - try: - # 创建工具注册表 - registry = ToolRegistry(db) - - # 获取工具列表 - tools = registry.list_tools(workspace_id=workspace_id) - - # 转换为Agent可用的格式 - available_tools = [] - for tool_info in tools: - if tool_info.status.value == "active": - available_tools.append({ - "id": tool_info.id, - "name": tool_info.name, - "description": tool_info.description, - "type": tool_info.tool_type.value, - "version": tool_info.version, - "tags": tool_info.tags, - "parameters": [ - { - "name": param.name, - "type": param.type.value, - "description": param.description, - "required": param.required, - "default": param.default - } - for param in tool_info.parameters - ] - }) - - logger.info(f"为Agent获取到 {len(available_tools)} 个可用工具") - return available_tools - - except Exception as e: - logger.error(f"获取Agent可用工具失败: {e}") - return [] - - -def create_langchain_tools_for_agent( - db: Session, - workspace_id: uuid.UUID, - agent_id: Optional[uuid.UUID] = None -) -> List[Any]: - """为Agent创建Langchain兼容的工具列表 - - Args: - db: 数据库会话 - workspace_id: 工作空间ID - agent_id: Agent ID(可选) - - Returns: - Langchain工具列表 - """ - if not TOOL_MANAGEMENT_AVAILABLE: - logger.warning("工具管理系统不可用") - return [] - - try: - # 创建工具注册表 - registry = ToolRegistry(db) - - # 注册内置工具类 - from app.core.tools.builtin import ( - DateTimeTool, JsonTool, BaiduSearchTool, MinerUTool, TextInTool - ) - registry.register_tool_class(DateTimeTool) - registry.register_tool_class(JsonTool) - registry.register_tool_class(BaiduSearchTool) - registry.register_tool_class(MinerUTool) - registry.register_tool_class(TextInTool) - - # 获取活跃的工具 - tools = registry.list_tools(workspace_id=workspace_id) - active_tools = [tool for tool in tools if tool.status.value == "active"] - - # 转换为Langchain工具 - langchain_tools = [] - for tool_info in active_tools: - try: - tool_instance = registry.get_tool(tool_info.id) - if tool_instance: - langchain_tool = LangchainAdapter.convert_tool(tool_instance) - langchain_tools.append(langchain_tool) - except Exception as e: - logger.error(f"转换工具失败: {tool_info.name}, 错误: {e}") - - logger.info(f"为Agent创建了 {len(langchain_tools)} 个Langchain工具") - return langchain_tools - - except Exception as e: - logger.error(f"创建Agent Langchain工具失败: {e}") - return [] - - -class ToolExecutionInput(BaseModel): - """工具执行输入参数""" - tool_id: str = Field(..., description="工具ID") - parameters: Dict[str, Any] = Field(default_factory=dict, description="工具参数") - timeout: Optional[float] = Field(None, description="超时时间(秒)") - - -def create_tool_execution_tool( - db: Session, - workspace_id: uuid.UUID, - user_id: uuid.UUID -): - """创建工具执行工具 - - Args: - db: 数据库会话 - workspace_id: 工作空间ID - user_id: 用户ID - - Returns: - 工具执行工具 - """ - if not TOOL_MANAGEMENT_AVAILABLE: - logger.warning("工具管理系统不可用") - return None - - @tool(args_schema=ToolExecutionInput) - async def execute_tool( - tool_id: str, - parameters: Dict[str, Any] = None, - timeout: Optional[float] = None - ) -> str: - """执行指定的工具。当需要使用系统中的工具来完成特定任务时使用。 - - Args: - tool_id: 工具ID(通过工具列表获取) - parameters: 工具参数(根据工具要求提供) - timeout: 超时时间(秒,可选) - - Returns: - 工具执行结果 - """ - try: - # 创建工具执行器 - registry = ToolRegistry(db) - executor = ToolExecutor(db, registry) - - # 执行工具 - result = await executor.execute_tool( - tool_id=tool_id, - parameters=parameters or {}, - user_id=user_id, - workspace_id=workspace_id, - timeout=timeout - ) - - if result.success: - # 格式化成功结果 - if isinstance(result.data, str): - return result.data - else: - import json - return json.dumps(result.data, ensure_ascii=False, indent=2) - else: - return f"工具执行失败: {result.error}" - - except Exception as e: - logger.error(f"工具执行异常: {tool_id}, 错误: {e}") - return f"工具执行异常: {str(e)}" - - return execute_tool - - -def get_tool_management_tools( - db: Session, - workspace_id: uuid.UUID, - user_id: uuid.UUID -) -> List[Any]: - """获取工具管理相关的工具 - - Args: - db: 数据库会话 - workspace_id: 工作空间ID - user_id: 用户ID - - Returns: - 工具管理工具列表 - """ - if not TOOL_MANAGEMENT_AVAILABLE: - return [] - - tools = [] - - # 添加工具执行工具 - execution_tool = create_tool_execution_tool(db, workspace_id, user_id) - if execution_tool: - tools.append(execution_tool) - - return tools \ No newline at end of file + return invoke_agent \ No newline at end of file