feat(tools): add OpenClaw remote agent tool integration

- Detect x-openclaw flag in OpenAPI schema and init dedicated config
- Implement multimodal input/output (image download, compress, base64)
- Add OpenClaw connection test and status validation in tool service
- Fix auth_config token check to support both api_key and bearer_token
- Inject runtime context (user_id, conversation_id, files) in chat services
This commit is contained in:
miao
2026-04-07 10:35:14 +08:00
parent 460c86cd94
commit e298b38de9
4 changed files with 467 additions and 6 deletions

View File

@@ -30,9 +30,60 @@ class CustomTool(BaseTool):
self.auth_config = config.get("auth_config", {})
self.base_url = config.get("base_url", "")
self.timeout = config.get("timeout", 30)
# 解析schema
self._parsed_operations = self._parse_openapi_schema()
#===========OpenClaw特殊判断取到OpenClaw特殊配置==========
schema = self.schema_content
if isinstance(schema, str):
try:
schema = json.loads(schema)
self.schema_content = schema
except json.JSONDecodeError:
schema = {}
info = schema.get("info", {}) if isinstance(schema, dict) else {}
self._is_openclaw = info.get("x-openclaw", False)
if self._is_openclaw:
# 从扩展字段读取 OpenClaw 配置
self._openclaw_agent_id = info.get("x-openclaw-agent-id", "main")
self._openclaw_model = info.get("x-openclaw-default-model", "openclaw")
self._openclaw_session_strategy = info.get(
"x-openclaw-session-strategy", "by_user")
self._openclaw_timeout = info.get("x-openclaw-timeout", 60)
self._openclaw_input_mode = info.get("x-openclaw-input-mode", "text")
self._openclaw_output_mode = info.get("x-openclaw-output-mode", "text")
# 从 servers 读取 base_url
servers = schema.get("servers", [])
if servers:
self.base_url = servers[0].get("url", "")
# 从 auth_config 读取 token兼容 api_key 和 bearer_token 两种认证方式)
self._openclaw_token = (
self.auth_config.get("api_key") # api_key 认证方式
or self.auth_config.get("token") # bearer_token 认证方式
or ""
)
# 覆盖 timeout
self.timeout = self._openclaw_timeout
# 运行时上下文(后续注入)
self._user_id = "anonymous"
self._conversation_id = None
self._uploaded_files = [] # 新增:用户上传的文件
# 跳过 Schema 解析
self._parsed_operations = {}
logger.info(
f"检测到 OpenClaw 工具: agent_id={self._openclaw_agent_id}, "
f"base_url={self.base_url}, "
f"input_mode={self._openclaw_input_mode}, "
f"output_mode={self._openclaw_output_mode}")
else:
# 解析schema
self._parsed_operations = self._parse_openapi_schema()
@property
def name(self) -> str:
@@ -58,6 +109,31 @@ class CustomTool(BaseTool):
@property
def parameters(self) -> List[ToolParameter]:
"""工具参数定义"""
# ========== OpenClaw 特判 根据输入模式解析是否需要image_url ==========
if self._is_openclaw:
params = [
ToolParameter(
name="message",
type=ParameterType.STRING,
description="发送给 OpenClaw Agent 的文本请求内容",
required=True
)
]
# 多模态输入模式下,增加 image_url 参数
if self._openclaw_input_mode == "multimodal":
params.append(ToolParameter(
name="image_url",
type=ParameterType.STRING,
description=(
"可选附带的图片URL或base64 data URI"
"(如 data:image/png;base64,...)。"
"传入后 Agent 可以理解图片内容。"
),
required=False
))
return params
# ========== 特判结束 ==========
params = []
# 添加操作选择参数
@@ -90,6 +166,10 @@ class CustomTool(BaseTool):
async def execute(self, **kwargs) -> ToolResult:
"""执行自定义工具"""
# ========== OpenClaw 特判 ==========
if self._is_openclaw:
return await self._execute_openclaw(**kwargs)
# ========== 特判结束 ==========
start_time = time.time()
try:
@@ -130,6 +210,269 @@ class CustomTool(BaseTool):
execution_time=execution_time
)
#=============openclaw执行函数开始===============
async def _execute_openclaw(self, **kwargs) -> ToolResult:
    """Send one non-streaming request to OpenClaw and wrap the outcome.

    The request is a POST to ``{base_url}/v1/responses``. In multimodal input
    mode the first image found in ``self._uploaded_files`` is attached; in
    text mode no image work is done at all.

    Args:
        **kwargs: Tool arguments. Only ``message`` (the text for the agent)
            is read.

    Returns:
        A success ``ToolResult`` whose data is the formatted reply, or an
        error ``ToolResult`` with one of the codes
        ``OPENCLAW_INVALID_INPUT``, ``OPENCLAW_HTTP_ERROR``,
        ``OPENCLAW_TIMEOUT``, ``OPENCLAW_NETWORK_ERROR`` or
        ``OPENCLAW_EXECUTION_ERROR``. Exceptions derived from ``Exception``
        are converted into error results rather than propagated.

    NOTE(review): the tool's parameter list advertises an ``image_url``
    argument in multimodal mode, but images are taken only from the uploaded
    files here. This looks deliberate (the model may invent URLs) -- confirm,
    and drop the advertised parameter if so.
    """
    import asyncio  # local import: only the timeout handler below needs it

    start_time = time.time()
    try:
        message = kwargs.get("message", "")
        # Validate first so an empty request never triggers an image download.
        if not message:
            return ToolResult.error_result(
                error="message 参数不能为空",
                error_code="OPENCLAW_INVALID_INPUT",
                execution_time=time.time() - start_time)

        # _build_openclaw_input drops the image unless the tool is in
        # multimodal mode, so only resolve/download it when it will be sent.
        image_url = None
        if self._openclaw_input_mode == "multimodal":
            image_url = self._first_uploaded_image_url()
            if image_url and not image_url.startswith("data:"):
                # Inline relay/OSS-signed URLs as base64 so the signature is
                # not broken by redirect handling on the remote side.
                image_url = await self._inline_openclaw_image(image_url)

        url = f"{self.base_url.rstrip('/')}/v1/responses"
        headers = {
            "Authorization": f"Bearer {self._openclaw_token}",
            "Content-Type": "application/json",
            "x-openclaw-agent-id": self._openclaw_agent_id
        }

        # Session routing: one remote session per conversation or per user.
        if (self._openclaw_session_strategy == "by_conversation"
                and self._conversation_id):
            user_field = f"conv-{self._conversation_id}"
        else:
            user_field = f"user-{self._user_id}"

        input_field = self._build_openclaw_input(message, image_url)
        body = {
            "model": self._openclaw_model,
            "user": user_field,
            "input": input_field,
            "stream": False
        }
        logger.info(f"OpenClaw 请求体: {json.dumps(body, ensure_ascii=False)[:1000]}")

        timeout_config = aiohttp.ClientTimeout(total=self.timeout)
        async with aiohttp.ClientSession(timeout=timeout_config) as session:
            async with session.post(url, json=body, headers=headers) as resp:
                execution_time = time.time() - start_time
                if resp.status >= 400:
                    error_text = await resp.text()
                    # Never log a full base64 payload; a short prefix is enough.
                    image_preview = (
                        (image_url[:100] + "...")
                        if image_url and len(image_url) > 100 else image_url)
                    logger.error(
                        f"OpenClaw 调用失败: HTTP {resp.status}, "
                        f"url={url}, agent_id={self._openclaw_agent_id}, "
                        f"has_image={bool(image_url)}, image_url={image_preview}, "
                        f"input_type={'multimodal' if isinstance(input_field, list) else 'text'}, "
                        f"error_response={error_text[:1000]}"
                    )
                    return ToolResult.error_result(
                        error=f"OpenClaw HTTP {resp.status}: {error_text[:500]}",
                        error_code="OPENCLAW_HTTP_ERROR",
                        execution_time=execution_time)
                data = await resp.json()

        # Parse according to the configured output mode, then flatten to text.
        result = self._extract_openclaw_response(
            data, self._openclaw_output_mode)
        display_text = self._format_openclaw_result(result)
        logger.info(
            "OpenClaw 调用成功",
            extra={
                "tool_id": self.tool_id,
                "agent_id": self._openclaw_agent_id,
                "has_images": len(result["images"]) > 0,
                "execution_time": execution_time
            })
        return ToolResult.success_result(
            data=display_text, execution_time=execution_time)
    except asyncio.TimeoutError:
        # str(TimeoutError()) is empty, so without this branch the caller
        # would receive an error message with no detail at all.
        return ToolResult.error_result(
            error=f"OpenClaw 请求超时: 超过 {self.timeout} 秒未收到响应",
            error_code="OPENCLAW_TIMEOUT",
            execution_time=time.time() - start_time)
    except aiohttp.ClientError as e:
        return ToolResult.error_result(
            error=f"OpenClaw 网络连接失败: {str(e)}",
            error_code="OPENCLAW_NETWORK_ERROR",
            execution_time=time.time() - start_time)
    except Exception as e:
        # Fall back to the exception class name when the message is empty.
        return ToolResult.error_result(
            error=f"OpenClaw 调用失败: {str(e) or type(e).__name__}",
            error_code="OPENCLAW_EXECUTION_ERROR",
            execution_time=time.time() - start_time)

def _first_uploaded_image_url(self):
    """Return a URL or data URI for the first uploaded image, else ``None``.

    Only the first entry whose ``type`` is ``"image"`` is considered, even if
    it yields no usable URL.
    """
    for uploaded in self._uploaded_files or []:
        if not isinstance(uploaded, dict) or uploaded.get("type") != "image":
            continue
        source = uploaded.get("source")
        if isinstance(source, dict) and source.get("type") == "base64":
            media_type = source.get("media_type", "image/jpeg")
            data = source.get("data", "")
            return f"data:{media_type};base64,{data}"
        # DashScope style {"type": "image", "image": "<url>"} or the generic
        # {"type": "image", "url": "https://..."} shape.
        return uploaded.get("image") or uploaded.get("url") or None
    return None

async def _inline_openclaw_image(self, image_url: str) -> str:
    """Download ``image_url`` and return it as a base64 data URI.

    Images larger than 4 MB are re-encoded as JPEG first. This is best
    effort: on any failure the original ``image_url`` is returned unchanged.
    """
    import asyncio
    import base64

    max_raw_size = 4 * 1024 * 1024  # re-encode anything larger than 4 MB
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                    image_url,
                    allow_redirects=True,
                    timeout=aiohttp.ClientTimeout(total=30)) as resp:
                if resp.status != 200:
                    logger.warning(f"OpenClaw 下载图片失败: HTTP {resp.status}")
                    return image_url
                # Strip parameters such as "; charset=..." so the data URI
                # header carries a bare media type.
                raw_type = resp.headers.get("Content-Type", "image/jpeg")
                content_type = raw_type.split(";", 1)[0].strip().lower()
                if not content_type.startswith("image/"):
                    logger.warning(f"OpenClaw 图片 URL 返回非图片类型: {content_type}")
                    return image_url
                img_bytes = await resp.read()

        original_size = len(img_bytes)
        logger.info(f"OpenClaw 下载图片: size={original_size} bytes, type={content_type}")
        if original_size > max_raw_size:
            # Decoding/encoding is CPU-bound; keep it off the event loop.
            loop = asyncio.get_running_loop()
            img_bytes = await loop.run_in_executor(
                None, self._compress_openclaw_image, img_bytes)
            content_type = "image/jpeg"
            logger.info(f"OpenClaw 图片已压缩: {original_size} -> {len(img_bytes)} bytes")
        b64_data = base64.b64encode(img_bytes).decode("utf-8")
        logger.info(f"OpenClaw 图片已转为 base64, size={len(img_bytes)} bytes")
        return f"data:{content_type};base64,{b64_data}"
    except Exception as e:
        logger.warning(f"OpenClaw 下载图片失败,使用原始 URL: {e}")
        return image_url

@staticmethod
def _compress_openclaw_image(img_bytes: bytes, max_side: int = 2048,
                             quality: int = 75) -> bytes:
    """Re-encode ``img_bytes`` as a JPEG whose longest side is ``max_side``."""
    from io import BytesIO
    from PIL import Image

    img = Image.open(BytesIO(img_bytes))
    # Convert every mode other than RGB/L (RGBA, P, LA, CMYK, I;16, ...) so
    # the JPEG save below cannot fail on an unsupported mode.
    if img.mode not in ("RGB", "L"):
        img = img.convert("RGB")
    if max(img.size) > max_side:
        img.thumbnail((max_side, max_side), Image.LANCZOS)
    buf = BytesIO()
    img.save(buf, format="JPEG", quality=quality, optimize=True)
    return buf.getvalue()
def _build_openclaw_input(self, message: str, image_url: str = None):
    """Build the ``input`` field of an OpenClaw request.

    Returns ``message`` unchanged when there is no image or the tool is not
    in multimodal input mode. Otherwise returns a one-element list holding a
    user ``message`` item whose content is the text part followed by the
    image part (base64 source for ``data:`` URIs, URL source otherwise).
    A ``data:`` URI that cannot be split falls back to the plain text.
    """
    attach_image = bool(image_url) and self._openclaw_input_mode == "multimodal"
    if not attach_image:
        return message

    if image_url.startswith("data:"):
        # Expected form: data:image/png;base64,iVBORw0KGgo...
        try:
            prefix, payload = image_url.split(",", 1)
            mime = prefix.split(":")[1].split(";")[0]
        except (ValueError, IndexError):
            logger.warning("无法解析 base64 data URI回退为纯文本输入")
            return message
        image_source = {
            "type": "base64",
            "media_type": mime,
            "data": payload
        }
    else:
        # Plain URL reference.
        image_source = {
            "type": "url",
            "url": image_url
        }

    return [{
        "type": "message",
        "role": "user",
        "content": [
            {"type": "input_text", "text": message},
            {"type": "input_image", "source": image_source},
        ]
    }]
@staticmethod
def _extract_openclaw_response(response_data: Dict[str, Any],
                               output_mode: str = "text") -> Dict[str, Any]:
    """Pull text and images out of an OpenClaw response payload.

    Expected payload shape::

        {"output": [{"type": "message", "content": [
            {"type": "output_text", "text": "..."},
            {"type": "output_image", "image_url": "..."}
        ]}]}

    Args:
        response_data: Decoded JSON body of the response.
        output_mode: ``"text"`` or ``"multimodal"``. Images are collected
            only in multimodal mode.

    Returns:
        ``{"text": str, "images": [{"url": ..., "media_type": ...}]}``.
        Text parts are joined with newlines. In text mode ``images`` is
        always empty and, when no text was found, ``text`` falls back to
        ``str(response_data)``.
    """
    texts = []
    images = []
    # `or []` also covers an explicit JSON null for "output" / "content";
    # malformed (non-dict) entries are skipped instead of raising.
    for item in response_data.get("output") or []:
        if not isinstance(item, dict) or item.get("type") != "message":
            continue
        for content in item.get("content") or []:
            if not isinstance(content, dict):
                continue
            content_type = content.get("type")
            if content_type == "output_text":
                text = content.get("text", "")
                if isinstance(text, str) and text:
                    texts.append(text)
            elif content_type == "output_image" and output_mode == "multimodal":
                image_url = content.get("image_url", "")
                if image_url:
                    images.append({
                        "url": image_url,
                        "media_type": content.get("media_type", "image/png")
                    })

    text_result = "\n".join(texts)
    # Text mode returns text only (backward compatible).
    if output_mode == "text":
        return {"text": text_result or str(response_data), "images": []}
    return {"text": text_result, "images": images}
@staticmethod
def _format_openclaw_result(result: Dict[str, Any]) -> str:
    """Render an extracted OpenClaw result as the string handed to the LLM.

    Without images the text is returned as-is (or a placeholder when it is
    empty). With images, the text (if any) is followed by one Markdown image
    link per image, separated by blank lines.
    """
    body = result.get("text", "")
    pictures = result.get("images", [])
    if not pictures:
        return body or "OpenClaw 返回了空内容)"

    sections = [body] if body else []
    sections.extend(
        f"![OpenClaw 生成的图片 {idx}]({pic['url']})"
        for idx, pic in enumerate(pictures, start=1)
    )
    return "\n\n".join(sections)
#=============openclaw执行函数结束================
def _parse_openapi_schema(self) -> Dict[str, Any]:
"""解析OpenAPI schema"""
operations = {}

View File

@@ -165,7 +165,18 @@ class AppChatService:
multimodal_service = MultimodalService(self.db, model_info)
processed_files = await multimodal_service.process_files(files)
logger.info(f"处理了 {len(processed_files)} 个文件")
#============为 OpenClaw 工具注入会话session======
# 为 OpenClaw 工具注入运行时上下文
for t in tools:
if hasattr(t, 'tool_instance') and hasattr(t.tool_instance, '_is_openclaw'):
if t.tool_instance._is_openclaw:
t.tool_instance._user_id = user_id or "anonymous"
t.tool_instance._conversation_id = (
str(conversation_id) if conversation_id else None)
# 注入用户上传的文件
if processed_files:
t.tool_instance._uploaded_files = processed_files
#============为 OpenClaw 工具注入会话session======
# 调用 Agent支持多模态
result = await agent.chat(
message=message,
@@ -413,6 +424,17 @@ class AppChatService:
processed_files = await multimodal_service.process_files(files)
logger.info(f"处理了 {len(processed_files)} 个文件")
#============为 OpenClaw 工具注入运行时上下文======
for t in tools:
if hasattr(t, 'tool_instance') and hasattr(t.tool_instance, '_is_openclaw'):
if t.tool_instance._is_openclaw:
t.tool_instance._user_id = user_id or "anonymous"
t.tool_instance._conversation_id = (
str(conversation_id) if conversation_id else None)
if processed_files:
t.tool_instance._uploaded_files = processed_files
#============为 OpenClaw 工具注入运行时上下文结束======
# 流式调用 Agent支持多模态同时并行启动 TTS
full_content = ""
full_reasoning = ""

View File

@@ -640,7 +640,18 @@ class AgentRunService:
multimodal_service = MultimodalService(self.db, model_info)
processed_files = await multimodal_service.process_files(files)
logger.info(f"处理了 {len(processed_files)} 个文件provider={provider}")
#================= 为 OpenClaw 工具注入运行时上下文==========
for t in tools:
logger.info(f"检查工具: {type(t).__name__}, has_tool_instance={hasattr(t, 'tool_instance')}, is_openclaw={getattr(getattr(t, 'tool_instance', None), '_is_openclaw', 'N/A')}")
if hasattr(t, 'tool_instance') and hasattr(t.tool_instance, '_is_openclaw'):
if t.tool_instance._is_openclaw:
t.tool_instance._user_id = user_id or "anonymous"
t.tool_instance._conversation_id = (
str(conversation_id) if conversation_id else None)
if processed_files:
t.tool_instance._uploaded_files = processed_files
logger.info(f"已注入 _uploaded_files, 数量: {len(processed_files)}")
#================= 为 OpenClaw 工具注入运行时上下文结束==========
# 7. 知识库检索
context = None
@@ -890,7 +901,18 @@ class AgentRunService:
multimodal_service = MultimodalService(self.db, model_info)
processed_files = await multimodal_service.process_files(files)
logger.info(f"处理了 {len(processed_files)} 个文件provider={provider}")
#============为 OpenClaw 工具注入会话session======
# 为 OpenClaw 工具注入运行时上下文
for t in tools:
if hasattr(t, 'tool_instance') and hasattr(t.tool_instance, '_is_openclaw'):
if t.tool_instance._is_openclaw:
t.tool_instance._user_id = user_id or "anonymous"
t.tool_instance._conversation_id = (
str(conversation_id) if conversation_id else None)
# 注入用户上传的文件
if processed_files:
t.tool_instance._uploaded_files = processed_files
#============为 OpenClaw 工具注入会话session======
# 7. 知识库检索
context = None

View File

@@ -330,6 +330,20 @@ class ToolService:
if config.tool_type == ToolType.MCP.value:
return await self._test_mcp_connection(config)
elif config.tool_type == ToolType.CUSTOM.value:
# ========== 测试工具连接 OpenClaw 特判 ==========
custom_config = self.custom_repo.find_by_tool_id(self.db, config.id)
if custom_config and custom_config.schema_content:
schema = custom_config.schema_content
if isinstance(schema, str):
try:
schema = json.loads(schema)
except json.JSONDecodeError:
schema = {}
#请求头中包含OpenClaw字段
if isinstance(schema, dict) and schema.get("info", {}).get("x-openclaw"):
return await self._test_openclaw_connection(custom_config, schema)
# ========== OpenClaw 特判结束 ==========
#正常自定义工具逻辑
return await self._test_custom_connection(config)
elif config.tool_type == ToolType.BUILTIN.value:
return await self._test_builtin_connection(config)
@@ -339,6 +353,45 @@ class ToolService:
except Exception as e:
return {"success": False, "message": f"测试失败: {str(e)}"}
#=============测试openclaw连接 特判===============
async def _test_openclaw_connection(
        self, custom_config: CustomToolConfig, schema: dict
) -> Dict[str, Any]:
    """Probe an OpenClaw endpoint with a minimal non-streaming request.

    Args:
        custom_config: Custom tool configuration; its ``auth_config`` supplies
            the bearer token under ``api_key`` or ``token``.
        schema: Parsed OpenAPI schema. ``servers[0].url`` is the base URL and
            ``info`` carries the ``x-openclaw-*`` extension fields.

    Returns:
        ``{"success": bool, "message": str}``. Any HTTP status below 400
        counts as success. Failures are reported in the result, not raised.
    """
    import asyncio  # local import: only the timeout handler below needs it

    timeout_seconds = 30
    try:
        info = schema.get("info") or {}
        servers = schema.get("servers") or []
        base_url = (servers[0].get("url") or "") if servers else ""
        if not base_url:
            # Fail with a readable message instead of a KeyError or an
            # invalid-URL error from the HTTP client.
            return {
                "success": False,
                "message": "OpenClaw 连接失败: schema 中未配置服务地址 (servers[0].url)"
            }

        # Read the credential from either key so api_key-style configs are
        # not tested with an empty bearer token.
        auth_config = custom_config.auth_config or {}
        token = auth_config.get("api_key") or auth_config.get("token") or ""
        agent_id = info.get("x-openclaw-agent-id", "main")
        model = info.get("x-openclaw-default-model", "openclaw")

        url = f"{base_url.rstrip('/')}/v1/responses"
        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
            "x-openclaw-agent-id": agent_id
        }
        body = {
            "model": model,
            "user": "connection-test",
            "input": "hi",
            "stream": False
        }

        timeout_config = aiohttp.ClientTimeout(total=timeout_seconds)
        async with aiohttp.ClientSession(timeout=timeout_config) as session:
            async with session.post(url, json=body, headers=headers) as resp:
                if resp.status < 400:
                    return {"success": True, "message": "OpenClaw 连接成功"}
                error_text = await resp.text()
                return {
                    "success": False,
                    "message": f"OpenClaw HTTP {resp.status}: {error_text[:200]}"
                }
    except asyncio.TimeoutError:
        # str(TimeoutError()) is empty; report the timeout explicitly.
        return {
            "success": False,
            "message": f"OpenClaw 连接失败: 请求超时 ({timeout_seconds} 秒)"
        }
    except Exception as e:
        return {
            "success": False,
            "message": f"OpenClaw 连接失败: {str(e) or type(e).__name__}"
        }
#=============测试openclaw连接结束===========
def ensure_builtin_tools_initialized(self, tenant_id: uuid.UUID):
"""确保内置工具已初始化"""
existing = self.tool_repo.exists_builtin_for_tenant(self.db, tenant_id)
@@ -1139,6 +1192,27 @@ class ToolService:
custom_config = self.db.query(CustomToolConfig).filter(
CustomToolConfig.id == tool_config.id
).first()
# ========== 更新工具 OpenClaw 特判 ==========
if custom_config and custom_config.schema_content:
schema = custom_config.schema_content
if isinstance(schema, str):
try:
schema = json.loads(schema)
except json.JSONDecodeError:
schema = {}
info = schema.get("info", {}) if isinstance(schema, dict) else {}
if info.get("x-openclaw"):
servers = schema.get("servers", [])
has_url = bool(servers and servers[0].get("url"))
has_agent_id = bool(info.get("x-openclaw-agent-id"))
has_token = bool(custom_config.auth_config
and custom_config.auth_config.get("api_key"))
if has_url and has_agent_id and has_token:
tool_config.status = ToolStatus.AVAILABLE.value
else:
tool_config.status = ToolStatus.UNCONFIGURED.value
return
# ========== OpenClaw 特判结束 ==========
if custom_config and tool_config.name and (custom_config.schema_content or custom_config.schema_url):
tool_config.status = ToolStatus.AVAILABLE.value