Merge pull request #923 from SuanmoSuanyangTechnology/feat/enduser-info-apikey

feat(memory): add V1 memory config management endpoints and memory read/write API
This commit is contained in:
Ke Sun
2026-04-17 21:03:10 +08:00
committed by GitHub
9 changed files with 1198 additions and 221 deletions

View File

@@ -8,6 +8,8 @@ This service validates inputs and delegates to MemoryAgentService for core memor
import uuid
from typing import Any, Dict, Optional
from sqlalchemy.orm import Session
from app.core.error_codes import BizCode
from app.core.exceptions import BusinessException, ResourceNotFoundException
from app.core.logging_config import get_logger
@@ -15,7 +17,6 @@ from app.models.app_model import App
from app.models.end_user_model import EndUser
from app.schemas.memory_config_schema import ConfigurationError
from app.services.memory_agent_service import MemoryAgentService
from sqlalchemy.orm import Session
logger = get_logger(__name__)
@@ -124,7 +125,7 @@ class MemoryAPIService:
except Exception as e:
logger.warning(f"Failed to update memory_config_id for end_user {end_user_id}: {e}")
async def write_memory(
def write_memory(
self,
workspace_id: uuid.UUID,
end_user_id: str,
@@ -133,27 +134,28 @@ class MemoryAPIService:
storage_type: str = "neo4j",
user_rag_memory_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Write memory with validation.
"""Submit a memory write task via Celery.
Validates end_user exists and belongs to workspace, updates the end user's
memory_config_id, then delegates to MemoryAgentService.write_memory.
memory_config_id, then dispatches write_message_task to Celery for async
processing with per-user fair locking.
Args:
workspace_id: Workspace ID for resource validation
end_user_id: End user identifier (used as end_user_id)
end_user_id: End user identifier
message: Message content to store
config_id: Memory configuration ID (required)
storage_type: Storage backend (neo4j or rag)
user_rag_memory_id: Optional RAG memory ID
Returns:
Dict with status and end_user_id
Dict with task_id, status, and end_user_id
Raises:
ResourceNotFoundException: If end_user not found
BusinessException: If end_user not in authorized workspace or write fails
BusinessException: If validation fails
"""
logger.info(f"Writing memory for end_user: {end_user_id}, workspace: {workspace_id}")
logger.info(f"Submitting memory write for end_user: {end_user_id}, workspace: {workspace_id}")
# Validate end_user exists and belongs to workspace
self.validate_end_user(end_user_id, workspace_id)
@@ -161,9 +163,120 @@ class MemoryAPIService:
# Update end user's memory_config_id
self._update_end_user_config(end_user_id, config_id)
# Convert to message list format expected by write_message_task
messages = message if isinstance(message, list) else [{"role": "user", "content": message}]
from app.tasks import write_message_task
task = write_message_task.delay(
end_user_id,
messages,
config_id,
storage_type,
user_rag_memory_id or "",
)
logger.info(f"Memory write task submitted: task_id={task.id}, end_user_id={end_user_id}")
return {
"task_id": task.id,
"status": "PENDING",
"end_user_id": end_user_id,
}
def read_memory(
self,
workspace_id: uuid.UUID,
end_user_id: str,
message: str,
search_switch: str = "0",
config_id: str = "",
storage_type: str = "neo4j",
user_rag_memory_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Submit a memory read task via Celery.
Validates end_user exists and belongs to workspace, updates the end user's
memory_config_id, then dispatches read_message_task to Celery for async processing.
Args:
workspace_id: Workspace ID for resource validation
end_user_id: End user identifier
message: Query message
search_switch: Search mode (0=deep search with verification, 1=deep search, 2=fast search)
config_id: Memory configuration ID (required)
storage_type: Storage backend (neo4j or rag)
user_rag_memory_id: Optional RAG memory ID
Returns:
Dict with task_id, status, and end_user_id
Raises:
ResourceNotFoundException: If end_user not found
BusinessException: If validation fails
"""
logger.info(f"Submitting memory read for end_user: {end_user_id}, workspace: {workspace_id}")
# Validate end_user exists and belongs to workspace
self.validate_end_user(end_user_id, workspace_id)
# Update end user's memory_config_id
self._update_end_user_config(end_user_id, config_id)
from app.tasks import read_message_task
task = read_message_task.delay(
end_user_id,
message,
[], # history
search_switch,
config_id,
storage_type,
user_rag_memory_id or "",
)
logger.info(f"Memory read task submitted: task_id={task.id}, end_user_id={end_user_id}")
return {
"task_id": task.id,
"status": "PENDING",
"end_user_id": end_user_id,
}
async def write_memory_sync(
self,
workspace_id: uuid.UUID,
end_user_id: str,
message: str,
config_id: str,
storage_type: str = "neo4j",
user_rag_memory_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Write memory synchronously (inline, no Celery).
Validates end_user, then calls MemoryAgentService.write_memory directly.
Blocks until the write completes. Use for cases where the caller needs
immediate confirmation.
Args:
workspace_id: Workspace ID for resource validation
end_user_id: End user identifier
message: Message content to store
config_id: Memory configuration ID (required)
storage_type: Storage backend (neo4j or rag)
user_rag_memory_id: Optional RAG memory ID
Returns:
Dict with status and end_user_id
Raises:
ResourceNotFoundException: If end_user not found
BusinessException: If write fails
"""
logger.info(f"Writing memory (sync) for end_user: {end_user_id}, workspace: {workspace_id}")
self.validate_end_user(end_user_id, workspace_id)
self._update_end_user_config(end_user_id, config_id)
try:
# Delegate to MemoryAgentService
# Convert string message to list[dict] format expected by MemoryAgentService
messages = message if isinstance(message, list) else [{"role": "user", "content": message}]
result = await MemoryAgentService().write_memory(
end_user_id=end_user_id,
@@ -174,11 +287,8 @@ class MemoryAPIService:
user_rag_memory_id=user_rag_memory_id or "",
)
logger.info(f"Memory write successful for end_user: {end_user_id}")
logger.info(f"Memory write (sync) successful for end_user: {end_user_id}")
# result may be a string "success" or a dict with a "status" key
# Preserve the full dict so callers don't silently lose extra fields
# (e.g. error codes, metadata) returned by MemoryAgentService.
if isinstance(result, dict):
return {
**result,
@@ -192,20 +302,17 @@ class MemoryAPIService:
except ConfigurationError as e:
logger.error(f"Memory configuration error for end_user {end_user_id}: {e}")
raise BusinessException(
message=str(e),
code=BizCode.MEMORY_CONFIG_NOT_FOUND
)
raise BusinessException(message=str(e), code=BizCode.MEMORY_CONFIG_NOT_FOUND)
except BusinessException:
raise
except Exception as e:
logger.error(f"Memory write failed for end_user {end_user_id}: {e}")
logger.error(f"Memory write (sync) failed for end_user {end_user_id}: {e}")
raise BusinessException(
message=f"Memory write failed: {str(e)}",
code=BizCode.MEMORY_WRITE_FAILED
)
async def read_memory(
async def read_memory_sync(
self,
workspace_id: uuid.UUID,
end_user_id: str,
@@ -215,37 +322,34 @@ class MemoryAPIService:
storage_type: str = "neo4j",
user_rag_memory_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Read memory with validation.
Validates end_user exists and belongs to workspace, updates the end user's
memory_config_id, then delegates to MemoryAgentService.read_memory.
"""Read memory synchronously (inline, no Celery).
Validates end_user, then calls MemoryAgentService.read_memory directly.
Blocks until the read completes. Use for cases where the caller needs
the answer immediately.
Args:
workspace_id: Workspace ID for resource validation
end_user_id: End user identifier (used as end_user_id)
end_user_id: End user identifier
message: Query message
search_switch: Search mode (0=deep search with verification, 1=deep search, 2=fast search)
config_id: Memory configuration ID (required)
storage_type: Storage backend (neo4j or rag)
user_rag_memory_id: Optional RAG memory ID
Returns:
Dict with answer, intermediate_outputs, and end_user_id
Raises:
ResourceNotFoundException: If end_user not found
BusinessException: If end_user not in authorized workspace or read fails
BusinessException: If read fails
"""
logger.info(f"Reading memory for end_user: {end_user_id}, workspace: {workspace_id}")
logger.info(f"Reading memory (sync) for end_user: {end_user_id}, workspace: {workspace_id}")
# Validate end_user exists and belongs to workspace
self.validate_end_user(end_user_id, workspace_id)
# Update end user's memory_config_id
self._update_end_user_config(end_user_id, config_id)
try:
# Delegate to MemoryAgentService
result = await MemoryAgentService().read_memory(
end_user_id=end_user_id,
message=message,
@@ -257,7 +361,7 @@ class MemoryAPIService:
user_rag_memory_id=user_rag_memory_id or ""
)
logger.info(f"Memory read successful for end_user: {end_user_id}")
logger.info(f"Memory read (sync) successful for end_user: {end_user_id}")
return {
"answer": result.get("answer", ""),
@@ -267,14 +371,11 @@ class MemoryAPIService:
except ConfigurationError as e:
logger.error(f"Memory configuration error for end_user {end_user_id}: {e}")
raise BusinessException(
message=str(e),
code=BizCode.MEMORY_CONFIG_NOT_FOUND
)
raise BusinessException(message=str(e), code=BizCode.MEMORY_CONFIG_NOT_FOUND)
except BusinessException:
raise
except Exception as e:
logger.error(f"Memory read failed for end_user {end_user_id}: {e}")
logger.error(f"Memory read (sync) failed for end_user {end_user_id}: {e}")
raise BusinessException(
message=f"Memory read failed: {str(e)}",
code=BizCode.MEMORY_READ_FAILED