feat(memory-api): implement memory read/write API service endpoints

- Add MemoryAPIService with read_memory and write_memory methods for managing user memories
- Create memory_api_schema.py with request/response schemas for read and write operations
- Implement write_memory_api_service endpoint for storing memory content with configurable storage backends
- Implement read_memory_api_service endpoint for querying memories with context-aware responses
- Add memory-specific error codes (MEMORY_WRITE_FAILED, MEMORY_READ_FAILED, MEMORY_CONFIG_NOT_FOUND) to error_codes.py
This commit is contained in:
Ke Sun
2025-12-26 11:43:51 +08:00
parent 2fa3bebe8f
commit bf1dfd97f0
4 changed files with 455 additions and 24 deletions

View File

@@ -1,14 +1,19 @@
"""Memory 服务接口 - 基于 API Key 认证"""
import uuid
from fastapi import APIRouter, Depends, Request, Body
from sqlalchemy.orm import Session
from app.db import get_db
from app.core.response_utils import success
from app.core.logging_config import get_business_logger
from app.core.api_key_auth import require_api_key
from app.core.logging_config import get_business_logger
from app.core.response_utils import success
from app.db import get_db
from app.schemas.api_key_schema import ApiKeyAuth
from app.schemas.memory_api_schema import (
MemoryReadRequest,
MemoryReadResponse,
MemoryWriteRequest,
MemoryWriteResponse,
)
from app.services.memory_api_service import MemoryAPIService
from fastapi import APIRouter, Body, Depends, Request
from sqlalchemy.orm import Session
router = APIRouter(prefix="/memory", tags=["V1 - Memory API"])
logger = get_business_logger()
@@ -20,27 +25,63 @@ async def get_memory_info():
return success(data={}, msg="Memory API - Coming Soon")
# /v1/memory/chat
@router.post("/chat")
@router.post("/write_api_service")
@require_api_key(scopes=["memory"])
async def chat_with_agent_demo(
async def write_memory_api_service(
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
message: str = Body(..., description="聊天消息内容"),
payload: MemoryWriteRequest = Body(..., embed=False),
):
"""
Agent 聊天接口demo
scopes: 所需的权限范围列表["app", "rag", "memory"]
Args:
message: 请求参数
request: 声明请求
api_key_auth: 包含验证后的API Key 信息
db: db_session
Write memory to storage.
Stores memory content for the specified end user using the Memory API Service.
"""
logger.info(f"API Key Auth: {api_key_auth}")
logger.info(f"Resource ID: {api_key_auth.resource_id}")
logger.info(f"Message: {message}")
return success(data={"received": True}, msg="消息已接收")
logger.info(f"Memory write request - end_user_id: {payload.end_user_id}")
memory_api_service = MemoryAPIService(db)
result = await memory_api_service.write_memory(
workspace_id=api_key_auth.workspace_id,
end_user_id=payload.end_user_id,
message=payload.message,
config_id=payload.config_id,
storage_type=payload.storage_type,
user_rag_memory_id=payload.user_rag_memory_id,
)
logger.info(f"Memory write successful for end_user: {payload.end_user_id}")
return success(data=MemoryWriteResponse(**result).model_dump(), msg="Memory written successfully")
@router.post("/read_api_service")
@require_api_key(scopes=["memory"])
async def read_memory_api_service(
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
payload: MemoryReadRequest = Body(..., embed=False),
):
"""
Read memory from storage.
Queries and retrieves memories for the specified end user with context-aware responses.
"""
logger.info(f"Memory read request - end_user_id: {payload.end_user_id}")
memory_api_service = MemoryAPIService(db)
result = await memory_api_service.read_memory(
workspace_id=api_key_auth.workspace_id,
end_user_id=payload.end_user_id,
message=payload.message,
search_switch=payload.search_switch,
config_id=payload.config_id,
storage_type=payload.storage_type,
user_rag_memory_id=payload.user_rag_memory_id,
)
logger.info(f"Memory read successful for end_user: {payload.end_user_id}")
return success(data=MemoryReadResponse(**result).model_dump(), msg="Memory read successfully")

View File

@@ -78,6 +78,11 @@ class BizCode(IntEnum):
EMBEDDING_FAILED = 9002
SEARCH_FAILED = 9003
# Memory API95xx
MEMORY_WRITE_FAILED = 9501
MEMORY_READ_FAILED = 9502
MEMORY_CONFIG_NOT_FOUND = 9503
# 系统100xx
INTERNAL_ERROR = 10001
DB_ERROR = 10002
@@ -148,6 +153,12 @@ HTTP_MAPPING = {
BizCode.INDEX_BUILD_FAILED: 500,
BizCode.EMBEDDING_FAILED: 500,
BizCode.SEARCH_FAILED: 500,
# Memory API 错误码映射
BizCode.MEMORY_WRITE_FAILED: 500,
BizCode.MEMORY_READ_FAILED: 500,
BizCode.MEMORY_CONFIG_NOT_FOUND: 400,
BizCode.INTERNAL_ERROR: 500,
BizCode.DB_ERROR: 500,
BizCode.SERVICE_UNAVAILABLE: 503,

View File

@@ -0,0 +1,134 @@
"""Memory API Service request/response schemas.
This module defines Pydantic schemas for the Memory API Service endpoints,
including request validation and response structures for read and write operations.
"""
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field, field_validator
class MemoryWriteRequest(BaseModel):
"""Request schema for memory write operation.
Attributes:
end_user_id: End user identifier (required)
message: Message content to store (required)
config_id: Optional memory configuration ID
storage_type: Storage backend type (neo4j or rag)
user_rag_memory_id: Optional RAG memory ID for rag storage type
"""
end_user_id: str = Field(..., description="End user ID (required)")
message: str = Field(..., description="Message content to store")
config_id: Optional[str] = Field(None, description="Memory configuration ID")
storage_type: str = Field("neo4j", description="Storage type: neo4j or rag")
user_rag_memory_id: Optional[str] = Field(None, description="RAG memory ID")
@field_validator("end_user_id")
@classmethod
def validate_end_user_id(cls, v: str) -> str:
"""Validate that end_user_id is not empty."""
if not v or not v.strip():
raise ValueError("end_user_id is required and cannot be empty")
return v.strip()
@field_validator("message")
@classmethod
def validate_message(cls, v: str) -> str:
"""Validate that message is not empty."""
if not v or not v.strip():
raise ValueError("message is required and cannot be empty")
return v
@field_validator("storage_type")
@classmethod
def validate_storage_type(cls, v: str) -> str:
"""Validate that storage_type is either neo4j or rag."""
valid_types = {"neo4j", "rag"}
if v.lower() not in valid_types:
raise ValueError(f"storage_type must be one of: {', '.join(valid_types)}")
return v.lower()
class MemoryReadRequest(BaseModel):
"""Request schema for memory read operation.
Attributes:
end_user_id: End user identifier (required)
message: Query message (required)
search_switch: Search mode (0=verify, 1=direct, 2=context)
config_id: Optional memory configuration ID
storage_type: Storage backend type (neo4j or rag)
user_rag_memory_id: Optional RAG memory ID for rag storage type
"""
end_user_id: str = Field(..., description="End user ID (required)")
message: str = Field(..., description="Query message")
search_switch: str = Field(
"0",
description="Search mode: 0=verify, 1=direct, 2=context"
)
config_id: Optional[str] = Field(None, description="Memory configuration ID")
storage_type: str = Field("neo4j", description="Storage type: neo4j or rag")
user_rag_memory_id: Optional[str] = Field(None, description="RAG memory ID")
@field_validator("end_user_id")
@classmethod
def validate_end_user_id(cls, v: str) -> str:
"""Validate that end_user_id is not empty."""
if not v or not v.strip():
raise ValueError("end_user_id is required and cannot be empty")
return v.strip()
@field_validator("message")
@classmethod
def validate_message(cls, v: str) -> str:
"""Validate that message is not empty."""
if not v or not v.strip():
raise ValueError("message is required and cannot be empty")
return v
@field_validator("storage_type")
@classmethod
def validate_storage_type(cls, v: str) -> str:
"""Validate that storage_type is either neo4j or rag."""
valid_types = {"neo4j", "rag"}
if v.lower() not in valid_types:
raise ValueError(f"storage_type must be one of: {', '.join(valid_types)}")
return v.lower()
@field_validator("search_switch")
@classmethod
def validate_search_switch(cls, v: str) -> str:
"""Validate that search_switch is a valid mode."""
valid_modes = {"0", "1", "2"}
if v not in valid_modes:
raise ValueError(f"search_switch must be one of: {', '.join(valid_modes)}")
return v
class MemoryWriteResponse(BaseModel):
"""Response schema for memory write operation.
Attributes:
status: Operation status (success or failed)
end_user_id: End user ID that was written to
"""
status: str = Field(..., description="Operation status: success or failed")
end_user_id: str = Field(..., description="End user ID")
class MemoryReadResponse(BaseModel):
"""Response schema for memory read operation.
Attributes:
answer: Generated answer from memory retrieval
intermediate_outputs: Intermediate retrieval outputs
end_user_id: End user ID that was queried
"""
answer: str = Field(..., description="Generated answer")
intermediate_outputs: List[Dict[str, Any]] = Field(
default_factory=list,
description="Intermediate retrieval outputs"
)
end_user_id: str = Field(..., description="End user ID")

View File

@@ -0,0 +1,245 @@
"""
Memory API Service
Provides external access to memory read and write operations through API Key authentication.
This service validates inputs and delegates to MemoryAgentService for core memory operations.
"""
import uuid
from typing import Any, Dict, Optional
from app.core.error_codes import BizCode
from app.core.exceptions import BusinessException, ResourceNotFoundException
from app.core.logging_config import get_logger
from app.models.app_model import App
from app.models.end_user_model import EndUser
from app.schemas.memory_config_schema import ConfigurationError
from app.services.memory_agent_service import MemoryAgentService
from sqlalchemy.orm import Session
logger = get_logger(__name__)
class MemoryAPIService:
"""Service for memory API operations with validation and delegation to MemoryAgentService.
This service provides a thin layer that:
1. Validates end_user exists and belongs to the authorized workspace
2. Maps end_user_id to group_id for memory operations
3. Delegates to MemoryAgentService for actual memory read/write operations
"""
def __init__(self, db: Session):
"""Initialize MemoryAPIService.
Args:
db: SQLAlchemy database session
"""
self.db = db
def validate_end_user(
self,
end_user_id: str,
workspace_id: uuid.UUID
) -> EndUser:
"""Validate that end_user exists and belongs to the workspace.
Args:
end_user_id: End user ID to validate
workspace_id: Workspace ID from API key authorization
Returns:
EndUser object if valid
Raises:
ResourceNotFoundException: If end_user not found
BusinessException: If end_user not in authorized workspace
"""
logger.info(f"Validating end_user: {end_user_id} for workspace: {workspace_id}")
# Query end_user by ID
try:
end_user_uuid = uuid.UUID(end_user_id)
except ValueError:
logger.warning(f"Invalid end_user_id format: {end_user_id}")
raise BusinessException(
message=f"Invalid end_user_id format: {end_user_id}",
code=BizCode.INVALID_PARAMETER
)
end_user = self.db.query(EndUser).filter(EndUser.id == end_user_uuid).first()
if not end_user:
logger.warning(f"End user not found: {end_user_id}")
raise ResourceNotFoundException(
resource_type="EndUser",
resource_id=end_user_id
)
# Verify end_user belongs to the workspace via App relationship
app = self.db.query(App).filter(App.id == end_user.app_id).first()
if not app:
logger.warning(f"App not found for end_user: {end_user_id}")
raise ResourceNotFoundException(
resource_type="App",
resource_id=str(end_user.app_id)
)
if app.workspace_id != workspace_id:
logger.warning(
f"End user {end_user_id} belongs to workspace {app.workspace_id}, "
f"not authorized workspace {workspace_id}"
)
raise BusinessException(
message="End user does not belong to authorized workspace",
code=BizCode.FORBIDDEN
)
logger.info(f"End user {end_user_id} validated successfully")
return end_user
async def write_memory(
self,
workspace_id: uuid.UUID,
end_user_id: str,
message: str,
config_id: Optional[str] = None,
storage_type: str = "neo4j",
user_rag_memory_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Write memory with validation.
Validates end_user exists and belongs to workspace, then delegates
to MemoryAgentService.write_memory.
Args:
workspace_id: Workspace ID for resource validation
end_user_id: End user identifier (used as group_id)
message: Message content to store
config_id: Optional memory configuration ID
storage_type: Storage backend (neo4j or rag)
user_rag_memory_id: Optional RAG memory ID
Returns:
Dict with status and end_user_id
Raises:
ResourceNotFoundException: If end_user not found
BusinessException: If end_user not in authorized workspace or write fails
"""
logger.info(f"Writing memory for end_user: {end_user_id}, workspace: {workspace_id}")
# Validate end_user exists and belongs to workspace
self.validate_end_user(end_user_id, workspace_id)
# Use end_user_id as group_id for memory operations
group_id = end_user_id
try:
# Delegate to MemoryAgentService
result = await MemoryAgentService().write_memory(
group_id=group_id,
message=message,
config_id=config_id,
db=self.db,
storage_type=storage_type,
user_rag_memory_id=user_rag_memory_id or ""
)
logger.info(f"Memory write successful for end_user: {end_user_id}")
return {
"status": "success" if result == "success" else result,
"end_user_id": end_user_id
}
except ConfigurationError as e:
logger.error(f"Memory configuration error for end_user {end_user_id}: {e}")
raise BusinessException(
message=str(e),
code=BizCode.MEMORY_CONFIG_NOT_FOUND
)
except BusinessException:
raise
except Exception as e:
logger.error(f"Memory write failed for end_user {end_user_id}: {e}")
raise BusinessException(
message=f"Memory write failed: {str(e)}",
code=BizCode.MEMORY_WRITE_FAILED
)
async def read_memory(
self,
workspace_id: uuid.UUID,
end_user_id: str,
message: str,
search_switch: str = "0",
config_id: Optional[str] = None,
storage_type: str = "neo4j",
user_rag_memory_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Read memory with validation.
Validates end_user exists and belongs to workspace, then delegates
to MemoryAgentService.read_memory.
Args:
workspace_id: Workspace ID for resource validation
end_user_id: End user identifier (used as group_id)
message: Query message
search_switch: Search mode (0=deep search with verification, 1=deep search, 2=fast search)
config_id: Optional memory configuration ID
storage_type: Storage backend (neo4j or rag)
user_rag_memory_id: Optional RAG memory ID
Returns:
Dict with answer, intermediate_outputs, and end_user_id
Raises:
ResourceNotFoundException: If end_user not found
BusinessException: If end_user not in authorized workspace or read fails
"""
logger.info(f"Reading memory for end_user: {end_user_id}, workspace: {workspace_id}")
# Validate end_user exists and belongs to workspace
self.validate_end_user(end_user_id, workspace_id)
# Use end_user_id as group_id for memory operations
group_id = end_user_id
try:
# Delegate to MemoryAgentService
result = await MemoryAgentService().read_memory(
group_id=group_id,
message=message,
history=[],
search_switch=search_switch,
config_id=config_id,
db=self.db,
storage_type=storage_type,
user_rag_memory_id=user_rag_memory_id or ""
)
logger.info(f"Memory read successful for end_user: {end_user_id}")
return {
"answer": result.get("answer", ""),
"intermediate_outputs": result.get("intermediate_outputs", []),
"end_user_id": end_user_id
}
except ConfigurationError as e:
logger.error(f"Memory configuration error for end_user {end_user_id}: {e}")
raise BusinessException(
message=str(e),
code=BizCode.MEMORY_CONFIG_NOT_FOUND
)
except BusinessException:
raise
except Exception as e:
logger.error(f"Memory read failed for end_user {end_user_id}: {e}")
raise BusinessException(
message=f"Memory read failed: {str(e)}",
code=BizCode.MEMORY_READ_FAILED
)