feat(agent tool): mcp and custom tool repair

This commit is contained in:
谢俊男
2026-01-08 14:00:28 +08:00
parent 7165d53982
commit 4ac010eda7
4 changed files with 189 additions and 110 deletions

View File

@@ -21,24 +21,35 @@ class LangchainToolWrapper(LangchainBaseTool):
# 内部工具实例
tool_instance: BaseTool = Field(..., description="内部工具实例")
# 特定操作(用于自定义工具)
operation: Optional[str] = Field(None, description="特定操作")
class Config:
arbitrary_types_allowed = True
def __init__(self, tool_instance: BaseTool, **kwargs):
def __init__(self, tool_instance: BaseTool, operation: Optional[str] = None, **kwargs):
"""初始化Langchain工具包装器
Args:
tool_instance: 内部工具实例
operation: 特定操作(用于自定义工具)
"""
# 动态创建参数schema
args_schema = LangchainAdapter._create_pydantic_schema(tool_instance.parameters)
args_schema = LangchainAdapter._create_pydantic_schema(
tool_instance.parameters, operation
)
# 构建工具名称
tool_name = tool_instance.name
if operation:
tool_name = f"{tool_instance.name}_{operation}"
super().__init__(
name=tool_instance.name,
name=tool_name,
description=tool_instance.description,
args_schema=args_schema,
tool_instance=tool_instance,
operation=operation,
**kwargs
)
@@ -58,6 +69,10 @@ class LangchainToolWrapper(LangchainBaseTool):
) -> str:
"""异步执行工具"""
try:
# 如果有特定操作,添加到参数中
if self.operation:
kwargs["operation"] = self.operation
# 执行内部工具
result = await self.tool_instance.safe_execute(**kwargs)
@@ -85,16 +100,21 @@ class LangchainAdapter:
"""
try:
# 处理MCP工具的特定工具名称
if hasattr(tool, 'tool_type') and tool.tool_type.value == 'mcp' and operation:
if hasattr(tool, 'tool_type') and tool.tool_type.value == "mcp" and operation:
# 为MCP工具创建特定工具名称的实例
mcp_tool = LangchainAdapter._create_mcp_tool_with_name(tool, operation)
wrapper = LangchainToolWrapper(tool_instance=mcp_tool)
logger.debug(f"MCP工具转换成功: {tool.name}_{operation} -> Langchain格式")
return wrapper
elif operation and tool.name in ['datetime_tool', 'json_tool']:
# 为特定操作创建工具
operation_tool = LangchainAdapter._create_operation_tool(tool, operation)
wrapper = LangchainToolWrapper(tool_instance=operation_tool)
elif operation and LangchainAdapter._tool_supports_operations(tool):
# 为支持多操作的工具创建特定操作实例
if tool.tool_type.value == "custom":
# 自定义工具直接传递operation参数
wrapper = LangchainToolWrapper(tool_instance=tool, operation=operation)
else:
# 内置工具使用OperationTool包装
operation_tool = LangchainAdapter._create_operation_tool(tool, operation)
wrapper = LangchainToolWrapper(tool_instance=operation_tool)
logger.debug(f"工具转换成功: {tool.name}_{operation} -> Langchain格式")
return wrapper
else:
@@ -108,23 +128,42 @@ class LangchainAdapter:
raise
@staticmethod
def _create_operation_tool(base_tool: BaseTool, operation: str) -> BaseTool:
"""为特定操作创建工具实例"""
from app.core.tools.builtin.operation_tool import OperationTool
return OperationTool(base_tool, operation)
def _tool_supports_operations(tool: BaseTool) -> bool:
"""检查工具是否支持多操作"""
# 内置工具中支持操作的工具
builtin_operation_tools = ['datetime_tool', 'json_tool']
# 检查内置工具
if tool.tool_type.value == "builtin" and tool.name in builtin_operation_tools:
return True
# 检查自定义工具自定义工具通过解析OpenAPI schema支持多操作
if tool.tool_type.value == "custom":
# 检查工具是否有多个操作
if hasattr(tool, '_parsed_operations') and len(tool._parsed_operations) > 1:
return True
# 或者检查参数中是否有operation参数
for param in tool.parameters:
if param.name == "operation" and param.enum:
return True
return False
@staticmethod
def _create_mcp_tool_with_name(base_tool: BaseTool, tool_name: str) -> BaseTool:
"""MCP工具创建指定工具名称的实例"""
from app.core.tools.mcp.base import MCPTool
# 创建新的配置,指定具体工具名称
new_config = base_tool.config.copy()
new_config["tool_name"] = tool_name
# 创建新的MCP工具实例
return MCPTool(f"{base_tool.tool_id}_{tool_name}", new_config)
def _create_operation_tool(base_tool: BaseTool, operation: str) -> BaseTool:
"""特定操作创建工具实例"""
if base_tool.tool_type.value == "builtin":
from app.core.tools.builtin.operation_tool import OperationTool
return OperationTool(base_tool, operation)
else:
raise ValueError(f"不支持的工具类型: {base_tool.tool_type.value}")
@staticmethod
def _create_mcp_tool_with_name(mcp_tool: BaseTool, tool_name: str) -> BaseTool:
"""为MCP工具创建指定工具名称的实例"""
mcp_tool.set_current_tool(tool_name)
return mcp_tool
@staticmethod
def convert_tools(tools: List[BaseTool]) -> List[LangchainToolWrapper]:
"""批量转换工具
@@ -148,11 +187,15 @@ class LangchainAdapter:
return converted_tools
@staticmethod
def _create_pydantic_schema(parameters: List[ToolParameter]) -> Type[BaseModel]:
def _create_pydantic_schema(
parameters: List[ToolParameter],
operation: Optional[str] = None
) -> Type[BaseModel]:
"""根据工具参数创建Pydantic schema
Args:
parameters: 工具参数列表
operation: 特定操作(用于过滤参数)
Returns:
Pydantic模型类
@@ -161,7 +204,12 @@ class LangchainAdapter:
fields = {}
annotations = {}
for param in parameters:
# 如果指定了operation过滤掉operation参数
filtered_params = parameters
if operation:
filtered_params = [p for p in parameters if p.name != "operation"]
for param in filtered_params:
# 确定Python类型
python_type = LangchainAdapter._get_python_type(param.type)

View File

@@ -16,19 +16,15 @@ class MCPTool(BaseTool):
super().__init__(tool_id, config)
self.server_url = config.get("server_url", "")
self.connection_config = config.get("connection_config", {})
self.tool_name = config.get("tool_name", "") # 特定工具名称
self.tool_schema = config.get("tool_schema", {}) # 工具参数 schema
self.available_tools = config.get("available_tools", [])
@property
def name(self) -> str:
return f"mcp_{self.tool_name}" if self.tool_name else f"mcp_tool_{self.tool_id[:8]}"
return f"mcp_tool_{self.tool_id[:8]}"
@property
def description(self) -> str:
if self.tool_schema.get("description"):
return self.tool_schema["description"]
return f"MCP工具: {self.tool_name}" if self.tool_name else f"MCP工具 - 连接到 {self.server_url}"
return f"MCP工具 - 连接到 {self.server_url}"
@property
def tool_type(self) -> ToolType:
@@ -36,20 +32,36 @@ class MCPTool(BaseTool):
@property
def parameters(self) -> List[ToolParameter]:
"""从 MCP 工具 schema 生成参数"""
if not self.tool_schema:
return [ToolParameter(
"""根据工具名称返回对应参数"""
# 如果有指定的工具名称,从 available_tools 中获取参数
tool_name = getattr(self, '_current_tool_name', None)
if tool_name and self.available_tools:
for tool_info in self.available_tools:
if tool_info.get("tool_name") == tool_name:
arguments = tool_info.get("arguments", {})
return self._generate_parameters_from_schema(arguments)
# 默认返回通用参数
return [
ToolParameter(
name="tool_name",
type=ParameterType.STRING,
description="要执行的工具名称",
required=True
),
ToolParameter(
name="arguments",
type=ParameterType.OBJECT,
description="工具参数",
required=False,
default={}
)]
# 解析 MCP 工具的 inputSchema
input_schema = self.tool_schema.get("inputSchema", {})
properties = input_schema.get("properties", {})
required_fields = input_schema.get("required", [])
)
]
def _generate_parameters_from_schema(self, arguments: Dict[str, Any]) -> List[ToolParameter]:
"""从参数schema生成参数列表"""
properties = arguments.get("properties", {})
required_fields = arguments.get("required", [])
params = []
for param_name, param_def in properties.items():
@@ -69,7 +81,7 @@ class MCPTool(BaseTool):
return params
def _convert_json_type_to_parameter_type(self, json_type: str) -> ParameterType:
"""转换 JSON Schema 类型到 ParameterType"""
"""转换JSON Schema类型到ParameterType"""
type_mapping = {
"string": ParameterType.STRING,
"integer": ParameterType.INTEGER,
@@ -80,25 +92,27 @@ class MCPTool(BaseTool):
}
return type_mapping.get(json_type, ParameterType.STRING)
def set_current_tool(self, tool_name: str):
"""设置当前工具名称,用于获取特定参数"""
self._current_tool_name = tool_name
async def execute(self, **kwargs) -> ToolResult:
"""执行 MCP 工具"""
"""执行MCP工具"""
start_time = time.time()
try:
tool_name = kwargs.get("tool_name")
if not tool_name:
raise Exception("未指定工具名称")
arguments = kwargs.get("arguments", {})
from .client import SimpleMCPClient
client = SimpleMCPClient(self.server_url, self.connection_config)
async with client:
# 使用指定的工具名称或默认第一个工具
tool_name_to_use = self.tool_name
if not tool_name_to_use and self.available_tools:
tool_name_to_use = self.available_tools[0]
if not tool_name_to_use:
raise Exception("未指定工具名称且无可用工具")
result = await client.call_tool(tool_name_to_use, kwargs)
result = await client.call_tool(tool_name, arguments)
execution_time = time.time() - start_time
return ToolResult.success_result(
@@ -108,7 +122,7 @@ class MCPTool(BaseTool):
except Exception as e:
execution_time = time.time() - start_time
logger.error(f"MCP工具执行失败: {self.tool_name or 'unknown'}, 错误: {e}")
logger.error(f"MCP工具执行失败: {kwargs.get('tool_name', 'unknown')}, 错误: {e}")
return ToolResult.error_result(
error=str(e),
error_code="MCP_EXECUTION_ERROR",