From 8e893662f38e843ba833b121b827f8f4b6a9a840 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B0=A2=E4=BF=8A=E7=94=B7?= Date: Tue, 30 Dec 2025 10:00:37 +0800 Subject: [PATCH] feat(tool system): add mcp testing services --- api_key_mcp_server.py | 38 +++++++++++ basic_auth_mcp_server.py | 45 +++++++++++++ bearer_token_mcp_server.py | 40 ++++++++++++ mcp_base.py | 111 +++++++++++++++++++++++++++++++ simple_mcp_server.py | 130 +++++++++++++++++++++++++++++++++++++ 5 files changed, 364 insertions(+) create mode 100644 api_key_mcp_server.py create mode 100644 basic_auth_mcp_server.py create mode 100644 bearer_token_mcp_server.py create mode 100644 mcp_base.py create mode 100644 simple_mcp_server.py diff --git a/api_key_mcp_server.py b/api_key_mcp_server.py new file mode 100644 index 00000000..f611dc59 --- /dev/null +++ b/api_key_mcp_server.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python3 +"""API Key认证MCP服务器""" + +from fastapi import FastAPI, HTTPException, Depends, Header +from typing import Optional +import uvicorn +from mcp_base import MCPRequest, handle_mcp_request, TOOLS + +app = FastAPI(title="API Key MCP Server", version="1.0.0") + +# API Key配置 +API_KEYS = {"test-api-key", "demo-key-123"} + +def verify_api_key(x_api_key: Optional[str] = Header(None)): + """验证API Key""" + if x_api_key and x_api_key in API_KEYS: + return True + raise HTTPException(status_code=401, detail="Invalid API Key") + +@app.get("/") +async def root(): + return {"name": "API Key MCP Server", "version": "1.0.0", "auth_type": "api_key"} + +@app.get("/health") +async def health(): + return {"status": "healthy", "tools": len(TOOLS), "auth_type": "api_key"} + +@app.post("/mcp") +async def mcp_handler(request: MCPRequest, _: bool = Depends(verify_api_key)): + return await handle_mcp_request(request, "API Key MCP Server") + +if __name__ == "__main__": + print("启动API Key认证MCP服务器...") + print("访问 http://localhost:8004 查看服务状态") + print("MCP端点: http://localhost:8004/mcp") + print("认证方式: API Key (Header: X-API-Key)") + print("测试API Keys: test-api-key, demo-key-123") + uvicorn.run(app, host="0.0.0.0", port=8004) \ No newline at end of file diff --git a/basic_auth_mcp_server.py b/basic_auth_mcp_server.py new file mode 100644 index 00000000..11bb5595 --- /dev/null +++ b/basic_auth_mcp_server.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python3 +"""Basic Auth认证MCP服务器""" + +from fastapi import FastAPI, HTTPException, Depends, Header +from typing import Optional +import uvicorn +import base64 +from mcp_base import MCPRequest, handle_mcp_request, TOOLS + +app = FastAPI(title="Basic Auth MCP Server", version="1.0.0") + +# Basic Auth配置 +BASIC_AUTH_USERS = {"admin": "password", "user": "secret"} + +def verify_basic_auth(authorization: Optional[str] = Header(None)): + """验证Basic Auth""" + if authorization and authorization.startswith("Basic "): + try: + credentials = base64.b64decode(authorization.split(" ")[1]).decode() + username, password = credentials.split(":", 1) + if username in BASIC_AUTH_USERS and BASIC_AUTH_USERS[username] == password: + return True + except: + pass + raise HTTPException(status_code=401, detail="Invalid Basic Auth") + +@app.get("/") +async def root(): + return {"name": "Basic Auth MCP Server", "version": "1.0.0", "auth_type": "basic_auth"} + +@app.get("/health") +async def health(): + return {"status": "healthy", "tools": len(TOOLS), "auth_type": "basic_auth"} + +@app.post("/mcp") +async def mcp_handler(request: MCPRequest, _: bool = Depends(verify_basic_auth)): + return await handle_mcp_request(request, "Basic Auth MCP Server") + +if __name__ == "__main__": + print("启动Basic Auth认证MCP服务器...") + print("访问 http://localhost:8006 查看服务状态") + print("MCP端点: http://localhost:8006/mcp") + print("认证方式: Basic Auth (Header: Authorization: Basic )") + print("测试用户: admin:password, user:secret") + uvicorn.run(app, host="0.0.0.0", port=8006) \ No newline at end of file diff --git a/bearer_token_mcp_server.py b/bearer_token_mcp_server.py new file mode 100644 index 00000000..57d27f2f --- /dev/null +++ b/bearer_token_mcp_server.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 +"""Bearer Token认证MCP服务器""" + +from fastapi import FastAPI, HTTPException, Depends, Header +from typing import Optional +import uvicorn +from mcp_base import MCPRequest, handle_mcp_request, TOOLS + +app = FastAPI(title="Bearer Token MCP Server", version="1.0.0") + +# Bearer Token配置 +BEARER_TOKENS = {"bearer-token-123", "demo-bearer-token"} + +def verify_bearer_token(authorization: Optional[str] = Header(None)): + """验证Bearer Token""" + if authorization and authorization.startswith("Bearer "): + token = authorization.split(" ")[1] + if token in BEARER_TOKENS: + return True + raise HTTPException(status_code=401, detail="Invalid Bearer Token") + +@app.get("/") +async def root(): + return {"name": "Bearer Token MCP Server", "version": "1.0.0", "auth_type": "bearer_token"} + +@app.get("/health") +async def health(): + return {"status": "healthy", "tools": len(TOOLS), "auth_type": "bearer_token"} + +@app.post("/mcp") +async def mcp_handler(request: MCPRequest, _: bool = Depends(verify_bearer_token)): + return await handle_mcp_request(request, "Bearer Token MCP Server") + +if __name__ == "__main__": + print("启动Bearer Token认证MCP服务器...") + print("访问 http://localhost:8005 查看服务状态") + print("MCP端点: http://localhost:8005/mcp") + print("认证方式: Bearer Token (Header: Authorization: Bearer )") + print("测试Bearer Tokens: bearer-token-123, demo-bearer-token") + uvicorn.run(app, host="0.0.0.0", port=8005) \ No newline at end of file diff --git a/mcp_base.py b/mcp_base.py new file mode 100644 index 00000000..f571e2fa --- /dev/null +++ b/mcp_base.py @@ -0,0 +1,111 @@ +#!/usr/bin/env python3 +"""MCP服务器基础模块 - 共享的模型和处理逻辑""" + +from pydantic import BaseModel +from typing import Dict, Any + +class MCPRequest(BaseModel): + jsonrpc: str = "2.0" + id: str + method: str + params: Dict[str, Any] = {} + +class MCPResponse(BaseModel): + jsonrpc: str = "2.0" + id: str + result: Any = None + error: Dict[str, Any] = None + +# 工具定义 +TOOLS = [ + { + "name": "calculator", + "description": "简单计算器", + "inputSchema": { + "type": "object", + "properties": { + "expression": {"type": "string", "description": "数学表达式"} + }, + "required": ["expression"] + } + }, + { + "name": "echo", + "description": "回显工具", + "inputSchema": { + "type": "object", + "properties": { + "message": {"type": "string", "description": "要回显的消息"} + }, + "required": ["message"] + } + } +] + +async def handle_mcp_request(request: MCPRequest, server_name: str = "MCP Server"): + """处理MCP请求""" + try: + if request.method == "initialize": + return MCPResponse( + id=request.id, + result={ + "protocolVersion": "2024-11-05", + "capabilities": {"tools": {"listChanged": True}}, + "serverInfo": {"name": server_name, "version": "1.0.0"} + } + ) + + elif request.method == "tools/list": + return MCPResponse( + id=request.id, + result={"tools": TOOLS} + ) + + elif request.method == "tools/call": + tool_name = request.params.get("name") + arguments = request.params.get("arguments", {}) + + if tool_name == "calculator": + try: + expression = arguments.get("expression", "") + result = eval(expression) + return MCPResponse( + id=request.id, + result={"content": [{"type": "text", "text": f"结果: {result}"}]} + ) + except Exception as e: + return MCPResponse( + id=request.id, + error={"code": -1, "message": f"计算错误: {str(e)}"} + ) + + elif tool_name == "echo": + message = arguments.get("message", "") + return MCPResponse( + id=request.id, + result={"content": [{"type": "text", "text": f"Echo: {message}"}]} + ) + + else: + return MCPResponse( + id=request.id, + error={"code": -1, "message": f"未知工具: {tool_name}"} + ) + + elif request.method == "ping": + return MCPResponse( + id=request.id, + result={"status": "pong"} + ) + + else: + return MCPResponse( + id=request.id, + error={"code": -1, "message": f"未知方法: {request.method}"} + ) + + except Exception as e: + return MCPResponse( + id=request.id, + error={"code": -1, "message": str(e)} + ) \ No newline at end of file diff --git a/simple_mcp_server.py b/simple_mcp_server.py new file mode 100644 index 00000000..fa299e37 --- /dev/null +++ b/simple_mcp_server.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python3 +"""简化的MCP服务器 - 用于测试MCP工具集成""" + +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel +from typing import Dict, Any, List +import uvicorn + +app = FastAPI(title="Simple MCP Server", version="1.0.0") + +class MCPRequest(BaseModel): + jsonrpc: str = "2.0" + id: str + method: str + params: Dict[str, Any] = {} + +class MCPResponse(BaseModel): + jsonrpc: str = "2.0" + id: str + result: Any = None + error: Dict[str, Any] = None + +# 可用工具定义 +TOOLS = [ + { + "name": "calculator", + "description": "简单计算器", + "inputSchema": { + "type": "object", + "properties": { + "expression": {"type": "string", "description": "数学表达式"} + }, + "required": ["expression"] + } + }, + { + "name": "echo", + "description": "回显工具", + "inputSchema": { + "type": "object", + "properties": { + "message": {"type": "string", "description": "要回显的消息"} + }, + "required": ["message"] + } + } +] + +@app.get("/") +async def root(): + return {"name": "Simple MCP Server", "version": "1.0.0"} + +@app.get("/health") +async def health(): + return {"status": "healthy", "tools": len(TOOLS)} + +@app.post("/mcp") +async def mcp_handler(request: MCPRequest): + """处理MCP请求""" + try: + if request.method == "initialize": + return MCPResponse( + id=request.id, + result={ + "protocolVersion": "2024-11-05", + "capabilities": {"tools": {"listChanged": True}}, + "serverInfo": {"name": "Simple MCP Server", "version": "1.0.0"} + } + ) + + elif request.method == "tools/list": + return MCPResponse( + id=request.id, + result={"tools": TOOLS} + ) + + elif request.method == "tools/call": + tool_name = request.params.get("name") + arguments = request.params.get("arguments", {}) + + if tool_name == "calculator": + try: + expression = arguments.get("expression", "") + result = eval(expression) # 注意:生产环境不要用eval + return MCPResponse( + id=request.id, + result={"content": [{"type": "text", "text": f"结果: {result}"}]} + ) + except Exception as e: + return MCPResponse( + id=request.id, + error={"code": -1, "message": f"计算错误: {str(e)}"} + ) + + elif tool_name == "echo": + message = arguments.get("message", "") + return MCPResponse( + id=request.id, + result={"content": [{"type": "text", "text": f"Echo: {message}"}]} + ) + + else: + return MCPResponse( + id=request.id, + error={"code": -1, "message": f"未知工具: {tool_name}"} + ) + + elif request.method == "ping": + return MCPResponse( + id=request.id, + result={"status": "pong"} + ) + + else: + return MCPResponse( + id=request.id, + error={"code": -1, "message": f"未知方法: {request.method}"} + ) + + except Exception as e: + return MCPResponse( + id=request.id, + error={"code": -1, "message": str(e)} + ) + +if __name__ == "__main__": + print("启动简化MCP服务器...") + print("访问 http://localhost:8002 查看服务状态") + print("MCP端点: http://localhost:8002/mcp") + uvicorn.run(app, host="0.0.0.0", port=8002) \ No newline at end of file