From bf1dfd97f05216b1e69dba14ca038276a863f378 Mon Sep 17 00:00:00 2001 From: Ke Sun Date: Fri, 26 Dec 2025 11:43:51 +0800 Subject: [PATCH] feat(memory-api): implement memory read/write API service endpoints - Add MemoryAPIService with read_memory and write_memory methods for managing user memories - Create memory_api_schema.py with request/response schemas for read and write operations - Implement write_memory_api_service endpoint for storing memory content with configurable storage backends - Implement read_memory_api_service endpoint for querying memories with context-aware responses - Add memory-specific error codes (MEMORY_WRITE_FAILED, MEMORY_READ_FAILED, MEMORY_CONFIG_NOT_FOUND) to error_codes.py --- .../service/memory_api_controller.py | 89 +++++-- api/app/core/error_codes.py | 11 + api/app/schemas/memory_api_schema.py | 134 ++++++++++ api/app/services/memory_api_service.py | 245 ++++++++++++++++++ 4 files changed, 455 insertions(+), 24 deletions(-) create mode 100644 api/app/schemas/memory_api_schema.py create mode 100644 api/app/services/memory_api_service.py diff --git a/api/app/controllers/service/memory_api_controller.py b/api/app/controllers/service/memory_api_controller.py index 71b54f8c..30ca1306 100644 --- a/api/app/controllers/service/memory_api_controller.py +++ b/api/app/controllers/service/memory_api_controller.py @@ -1,14 +1,19 @@ """Memory 服务接口 - 基于 API Key 认证""" -import uuid -from fastapi import APIRouter, Depends, Request, Body -from sqlalchemy.orm import Session - -from app.db import get_db -from app.core.response_utils import success -from app.core.logging_config import get_business_logger from app.core.api_key_auth import require_api_key +from app.core.logging_config import get_business_logger +from app.core.response_utils import success +from app.db import get_db from app.schemas.api_key_schema import ApiKeyAuth +from app.schemas.memory_api_schema import ( + MemoryReadRequest, + MemoryReadResponse, + MemoryWriteRequest, + MemoryWriteResponse, +) +from app.services.memory_api_service import MemoryAPIService +from fastapi import APIRouter, Body, Depends, Request +from sqlalchemy.orm import Session router = APIRouter(prefix="/memory", tags=["V1 - Memory API"]) logger = get_business_logger() @@ -20,27 +25,63 @@ async def get_memory_info(): return success(data={}, msg="Memory API - Coming Soon") -# /v1/memory/chat -@router.post("/chat") +@router.post("/write_api_service") @require_api_key(scopes=["memory"]) -async def chat_with_agent_demo( +async def write_memory_api_service( request: Request, api_key_auth: ApiKeyAuth = None, db: Session = Depends(get_db), - message: str = Body(..., description="聊天消息内容"), + payload: MemoryWriteRequest = Body(..., embed=False), + ): """ - Agent 聊天接口demo - - scopes: 所需的权限范围列表["app", "rag", "memory"] - - Args: - message: 请求参数 - request: 声明请求 - api_key_auth: 包含验证后的API Key 信息 - db: db_session + Write memory to storage. + + Stores memory content for the specified end user using the Memory API Service. """ - logger.info(f"API Key Auth: {api_key_auth}") - logger.info(f"Resource ID: {api_key_auth.resource_id}") - logger.info(f"Message: {message}") - return success(data={"received": True}, msg="消息已接收") \ No newline at end of file + logger.info(f"Memory write request - end_user_id: {payload.end_user_id}") + + memory_api_service = MemoryAPIService(db) + + result = await memory_api_service.write_memory( + workspace_id=api_key_auth.workspace_id, + end_user_id=payload.end_user_id, + message=payload.message, + config_id=payload.config_id, + storage_type=payload.storage_type, + user_rag_memory_id=payload.user_rag_memory_id, + ) + + logger.info(f"Memory write successful for end_user: {payload.end_user_id}") + return success(data=MemoryWriteResponse(**result).model_dump(), msg="Memory written successfully") + + +@router.post("/read_api_service") +@require_api_key(scopes=["memory"]) +async def read_memory_api_service( + request: Request, + api_key_auth: ApiKeyAuth = None, + db: Session = Depends(get_db), + payload: MemoryReadRequest = Body(..., embed=False), +): + """ + Read memory from storage. + + Queries and retrieves memories for the specified end user with context-aware responses. + """ + logger.info(f"Memory read request - end_user_id: {payload.end_user_id}") + + memory_api_service = MemoryAPIService(db) + + result = await memory_api_service.read_memory( + workspace_id=api_key_auth.workspace_id, + end_user_id=payload.end_user_id, + message=payload.message, + search_switch=payload.search_switch, + config_id=payload.config_id, + storage_type=payload.storage_type, + user_rag_memory_id=payload.user_rag_memory_id, + ) + + logger.info(f"Memory read successful for end_user: {payload.end_user_id}") + return success(data=MemoryReadResponse(**result).model_dump(), msg="Memory read successfully") diff --git a/api/app/core/error_codes.py b/api/app/core/error_codes.py index 6bb8ac29..d0aa9cc1 100644 --- a/api/app/core/error_codes.py +++ b/api/app/core/error_codes.py @@ -78,6 +78,11 @@ class BizCode(IntEnum): EMBEDDING_FAILED = 9002 SEARCH_FAILED = 9003 + # Memory API(95xx) + MEMORY_WRITE_FAILED = 9501 + MEMORY_READ_FAILED = 9502 + MEMORY_CONFIG_NOT_FOUND = 9503 + # 系统(100xx) INTERNAL_ERROR = 10001 DB_ERROR = 10002 @@ -148,6 +153,12 @@ HTTP_MAPPING = { BizCode.INDEX_BUILD_FAILED: 500, BizCode.EMBEDDING_FAILED: 500, BizCode.SEARCH_FAILED: 500, + + # Memory API 错误码映射 + BizCode.MEMORY_WRITE_FAILED: 500, + BizCode.MEMORY_READ_FAILED: 500, + BizCode.MEMORY_CONFIG_NOT_FOUND: 400, + BizCode.INTERNAL_ERROR: 500, BizCode.DB_ERROR: 500, BizCode.SERVICE_UNAVAILABLE: 503, diff --git a/api/app/schemas/memory_api_schema.py b/api/app/schemas/memory_api_schema.py new file mode 100644 index 00000000..98d257c1 --- /dev/null +++ b/api/app/schemas/memory_api_schema.py @@ -0,0 +1,134 @@ +"""Memory API Service request/response schemas. + +This module defines Pydantic schemas for the Memory API Service endpoints, +including request validation and response structures for read and write operations. +""" + +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel, Field, field_validator + + +class MemoryWriteRequest(BaseModel): + """Request schema for memory write operation. + + Attributes: + end_user_id: End user identifier (required) + message: Message content to store (required) + config_id: Optional memory configuration ID + storage_type: Storage backend type (neo4j or rag) + user_rag_memory_id: Optional RAG memory ID for rag storage type + """ + end_user_id: str = Field(..., description="End user ID (required)") + message: str = Field(..., description="Message content to store") + config_id: Optional[str] = Field(None, description="Memory configuration ID") + storage_type: str = Field("neo4j", description="Storage type: neo4j or rag") + user_rag_memory_id: Optional[str] = Field(None, description="RAG memory ID") + + @field_validator("end_user_id") + @classmethod + def validate_end_user_id(cls, v: str) -> str: + """Validate that end_user_id is not empty.""" + if not v or not v.strip(): + raise ValueError("end_user_id is required and cannot be empty") + return v.strip() + + @field_validator("message") + @classmethod + def validate_message(cls, v: str) -> str: + """Validate that message is not empty.""" + if not v or not v.strip(): + raise ValueError("message is required and cannot be empty") + return v + + @field_validator("storage_type") + @classmethod + def validate_storage_type(cls, v: str) -> str: + """Validate that storage_type is either neo4j or rag.""" + valid_types = {"neo4j", "rag"} + if v.lower() not in valid_types: + raise ValueError(f"storage_type must be one of: {', '.join(valid_types)}") + return v.lower() + + +class MemoryReadRequest(BaseModel): + """Request schema for memory read operation. + + Attributes: + end_user_id: End user identifier (required) + message: Query message (required) + search_switch: Search mode (0=verify, 1=direct, 2=context) + config_id: Optional memory configuration ID + storage_type: Storage backend type (neo4j or rag) + user_rag_memory_id: Optional RAG memory ID for rag storage type + """ + end_user_id: str = Field(..., description="End user ID (required)") + message: str = Field(..., description="Query message") + search_switch: str = Field( + "0", + description="Search mode: 0=verify, 1=direct, 2=context" + ) + config_id: Optional[str] = Field(None, description="Memory configuration ID") + storage_type: str = Field("neo4j", description="Storage type: neo4j or rag") + user_rag_memory_id: Optional[str] = Field(None, description="RAG memory ID") + + @field_validator("end_user_id") + @classmethod + def validate_end_user_id(cls, v: str) -> str: + """Validate that end_user_id is not empty.""" + if not v or not v.strip(): + raise ValueError("end_user_id is required and cannot be empty") + return v.strip() + + @field_validator("message") + @classmethod + def validate_message(cls, v: str) -> str: + """Validate that message is not empty.""" + if not v or not v.strip(): + raise ValueError("message is required and cannot be empty") + return v + + @field_validator("storage_type") + @classmethod + def validate_storage_type(cls, v: str) -> str: + """Validate that storage_type is either neo4j or rag.""" + valid_types = {"neo4j", "rag"} + if v.lower() not in valid_types: + raise ValueError(f"storage_type must be one of: {', '.join(valid_types)}") + return v.lower() + + @field_validator("search_switch") + @classmethod + def validate_search_switch(cls, v: str) -> str: + """Validate that search_switch is a valid mode.""" + valid_modes = {"0", "1", "2"} + if v not in valid_modes: + raise ValueError(f"search_switch must be one of: {', '.join(valid_modes)}") + return v + + +class MemoryWriteResponse(BaseModel): + """Response schema for memory write operation. + + Attributes: + status: Operation status (success or failed) + end_user_id: End user ID that was written to + """ + status: str = Field(..., description="Operation status: success or failed") + end_user_id: str = Field(..., description="End user ID") + + +class MemoryReadResponse(BaseModel): + """Response schema for memory read operation. + + Attributes: + answer: Generated answer from memory retrieval + intermediate_outputs: Intermediate retrieval outputs + end_user_id: End user ID that was queried + """ + answer: str = Field(..., description="Generated answer") + intermediate_outputs: List[Dict[str, Any]] = Field( + default_factory=list, + description="Intermediate retrieval outputs" + ) + end_user_id: str = Field(..., description="End user ID") diff --git a/api/app/services/memory_api_service.py b/api/app/services/memory_api_service.py new file mode 100644 index 00000000..0ae2b965 --- /dev/null +++ b/api/app/services/memory_api_service.py @@ -0,0 +1,245 @@ +""" +Memory API Service + +Provides external access to memory read and write operations through API Key authentication. +This service validates inputs and delegates to MemoryAgentService for core memory operations. +""" + +import uuid +from typing import Any, Dict, Optional + +from app.core.error_codes import BizCode +from app.core.exceptions import BusinessException, ResourceNotFoundException +from app.core.logging_config import get_logger +from app.models.app_model import App +from app.models.end_user_model import EndUser +from app.schemas.memory_config_schema import ConfigurationError +from app.services.memory_agent_service import MemoryAgentService +from sqlalchemy.orm import Session + +logger = get_logger(__name__) + + +class MemoryAPIService: + """Service for memory API operations with validation and delegation to MemoryAgentService. + + This service provides a thin layer that: + 1. Validates end_user exists and belongs to the authorized workspace + 2. Maps end_user_id to group_id for memory operations + 3. Delegates to MemoryAgentService for actual memory read/write operations + """ + + def __init__(self, db: Session): + """Initialize MemoryAPIService. + + Args: + db: SQLAlchemy database session + """ + self.db = db + + def validate_end_user( + self, + end_user_id: str, + workspace_id: uuid.UUID + ) -> EndUser: + """Validate that end_user exists and belongs to the workspace. + + Args: + end_user_id: End user ID to validate + workspace_id: Workspace ID from API key authorization + + Returns: + EndUser object if valid + + Raises: + ResourceNotFoundException: If end_user not found + BusinessException: If end_user not in authorized workspace + """ + logger.info(f"Validating end_user: {end_user_id} for workspace: {workspace_id}") + + # Query end_user by ID + try: + end_user_uuid = uuid.UUID(end_user_id) + except ValueError: + logger.warning(f"Invalid end_user_id format: {end_user_id}") + raise BusinessException( + message=f"Invalid end_user_id format: {end_user_id}", + code=BizCode.INVALID_PARAMETER + ) + + end_user = self.db.query(EndUser).filter(EndUser.id == end_user_uuid).first() + + if not end_user: + logger.warning(f"End user not found: {end_user_id}") + raise ResourceNotFoundException( + resource_type="EndUser", + resource_id=end_user_id + ) + + # Verify end_user belongs to the workspace via App relationship + app = self.db.query(App).filter(App.id == end_user.app_id).first() + + if not app: + logger.warning(f"App not found for end_user: {end_user_id}") + raise ResourceNotFoundException( + resource_type="App", + resource_id=str(end_user.app_id) + ) + + if app.workspace_id != workspace_id: + logger.warning( + f"End user {end_user_id} belongs to workspace {app.workspace_id}, " + f"not authorized workspace {workspace_id}" + ) + raise BusinessException( + message="End user does not belong to authorized workspace", + code=BizCode.FORBIDDEN + ) + + logger.info(f"End user {end_user_id} validated successfully") + return end_user + + async def write_memory( + self, + workspace_id: uuid.UUID, + end_user_id: str, + message: str, + config_id: Optional[str] = None, + storage_type: str = "neo4j", + user_rag_memory_id: Optional[str] = None, + ) -> Dict[str, Any]: + """Write memory with validation. + + Validates end_user exists and belongs to workspace, then delegates + to MemoryAgentService.write_memory. + + Args: + workspace_id: Workspace ID for resource validation + end_user_id: End user identifier (used as group_id) + message: Message content to store + config_id: Optional memory configuration ID + storage_type: Storage backend (neo4j or rag) + user_rag_memory_id: Optional RAG memory ID + + Returns: + Dict with status and end_user_id + + Raises: + ResourceNotFoundException: If end_user not found + BusinessException: If end_user not in authorized workspace or write fails + """ + logger.info(f"Writing memory for end_user: {end_user_id}, workspace: {workspace_id}") + + # Validate end_user exists and belongs to workspace + self.validate_end_user(end_user_id, workspace_id) + + # Use end_user_id as group_id for memory operations + group_id = end_user_id + + try: + # Delegate to MemoryAgentService + result = await MemoryAgentService().write_memory( + group_id=group_id, + message=message, + config_id=config_id, + db=self.db, + storage_type=storage_type, + user_rag_memory_id=user_rag_memory_id or "" + ) + + logger.info(f"Memory write successful for end_user: {end_user_id}") + + return { + "status": "success" if result == "success" else result, + "end_user_id": end_user_id + } + + except ConfigurationError as e: + logger.error(f"Memory configuration error for end_user {end_user_id}: {e}") + raise BusinessException( + message=str(e), + code=BizCode.MEMORY_CONFIG_NOT_FOUND + ) + except BusinessException: + raise + except Exception as e: + logger.error(f"Memory write failed for end_user {end_user_id}: {e}") + raise BusinessException( + message=f"Memory write failed: {str(e)}", + code=BizCode.MEMORY_WRITE_FAILED + ) + + async def read_memory( + self, + workspace_id: uuid.UUID, + end_user_id: str, + message: str, + search_switch: str = "0", + config_id: Optional[str] = None, + storage_type: str = "neo4j", + user_rag_memory_id: Optional[str] = None, + ) -> Dict[str, Any]: + """Read memory with validation. + + Validates end_user exists and belongs to workspace, then delegates + to MemoryAgentService.read_memory. + + Args: + workspace_id: Workspace ID for resource validation + end_user_id: End user identifier (used as group_id) + message: Query message + search_switch: Search mode (0=deep search with verification, 1=deep search, 2=fast search) + config_id: Optional memory configuration ID + storage_type: Storage backend (neo4j or rag) + user_rag_memory_id: Optional RAG memory ID + + Returns: + Dict with answer, intermediate_outputs, and end_user_id + + Raises: + ResourceNotFoundException: If end_user not found + BusinessException: If end_user not in authorized workspace or read fails + """ + logger.info(f"Reading memory for end_user: {end_user_id}, workspace: {workspace_id}") + + # Validate end_user exists and belongs to workspace + self.validate_end_user(end_user_id, workspace_id) + + # Use end_user_id as group_id for memory operations + group_id = end_user_id + + try: + # Delegate to MemoryAgentService + result = await MemoryAgentService().read_memory( + group_id=group_id, + message=message, + history=[], + search_switch=search_switch, + config_id=config_id, + db=self.db, + storage_type=storage_type, + user_rag_memory_id=user_rag_memory_id or "" + ) + + logger.info(f"Memory read successful for end_user: {end_user_id}") + + return { + "answer": result.get("answer", ""), + "intermediate_outputs": result.get("intermediate_outputs", []), + "end_user_id": end_user_id + } + + except ConfigurationError as e: + logger.error(f"Memory configuration error for end_user {end_user_id}: {e}") + raise BusinessException( + message=str(e), + code=BizCode.MEMORY_CONFIG_NOT_FOUND + ) + except BusinessException: + raise + except Exception as e: + logger.error(f"Memory read failed for end_user {end_user_id}: {e}") + raise BusinessException( + message=f"Memory read failed: {str(e)}", + code=BizCode.MEMORY_READ_FAILED + )