feat(memory): Refactor memory API to support async task-based and sync operations

- Rename endpoints from write_api_service/read_api_service to write/read for clarity
- Add async task-based endpoints (/write, /read) that dispatch to Celery with fair locking
- Add task status polling endpoints (/write/status, /read/status) to check async operation results
- Add synchronous endpoints (/write/sync, /read/sync) for blocking operations with direct results
- Introduce TaskStatusResponse schema for task status polling responses
- Add MemoryWriteSyncResponse and MemoryReadSyncResponse schemas for sync operations
- Implement write_memory_sync and read_memory_sync methods in MemoryAPIService
- Remove await from async service calls in task-based endpoints (now handled by Celery)
- Add Query parameter import for task_id in status endpoints
- Update docstrings to clarify async vs sync behavior and task polling workflow
- Integrate task_service for retrieving Celery task results
This commit is contained in:
Ke Sun
2026-04-02 14:47:36 +08:00
parent 7ce29019f7
commit 010eff17cf
3 changed files with 320 additions and 69 deletions

View File

@@ -1,6 +1,6 @@
"""Memory 服务接口 - 基于 API Key 认证"""
from fastapi import APIRouter, Body, Depends, Request
from fastapi import APIRouter, Body, Depends, Query, Request
from sqlalchemy.orm import Session
from app.core.api_key_auth import require_api_key
@@ -11,8 +11,11 @@ from app.schemas.api_key_schema import ApiKeyAuth
from app.schemas.memory_api_schema import (
MemoryReadRequest,
MemoryReadResponse,
MemoryReadSyncResponse,
MemoryWriteRequest,
MemoryWriteResponse,
MemoryWriteSyncResponse,
TaskStatusResponse,
)
from app.services.memory_api_service import MemoryAPIService
@@ -26,26 +29,27 @@ async def get_memory_info():
return success(data={}, msg="Memory API - Coming Soon")
@router.post("/write_api_service")
@router.post("/write")
@require_api_key(scopes=["memory"])
async def write_memory_api_service(
async def write_memory(
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
message: str = Body(..., description="Message content"),
):
"""
Write memory to storage.
Stores memory content for the specified end user using the Memory API Service.
Submit a memory write task.
Validates the end user, then dispatches the write to a Celery background task
with per-user fair locking. Returns a task_id for status polling.
"""
body = await request.json()
payload = MemoryWriteRequest(**body)
logger.info(f"Memory write request - end_user_id: {payload.end_user_id}, workspace_id: {api_key_auth.workspace_id}")
memory_api_service = MemoryAPIService(db)
result = await memory_api_service.write_memory(
result = memory_api_service.write_memory(
workspace_id=api_key_auth.workspace_id,
end_user_id=payload.end_user_id,
message=payload.message,
@@ -53,31 +57,53 @@ async def write_memory_api_service(
storage_type=payload.storage_type,
user_rag_memory_id=payload.user_rag_memory_id,
)
logger.info(f"Memory write successful for end_user: {payload.end_user_id}")
return success(data=MemoryWriteResponse(**result).model_dump(), msg="Memory written successfully")
logger.info(f"Memory write task submitted: task_id={result['task_id']}, end_user_id: {payload.end_user_id}")
return success(data=MemoryWriteResponse(**result).model_dump(), msg="Memory write task submitted")
@router.post("/read_api_service")
@router.get("/write/status")
@require_api_key(scopes=["memory"])
async def read_memory_api_service(
async def get_write_task_status(
request: Request,
task_id: str = Query(..., description="Celery task ID"),
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
):
"""
Check the status of a memory write task.
Returns the current status and result (if completed) of a previously submitted write task.
"""
logger.info(f"Write task status check - task_id: {task_id}")
from app.services.task_service import get_task_memory_write_result
result = get_task_memory_write_result(task_id)
return success(data=TaskStatusResponse(**result).model_dump(), msg="Task status retrieved")
@router.post("/read")
@require_api_key(scopes=["memory"])
async def read_memory(
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
message: str = Body(..., description="Query message"),
):
"""
Read memory from storage.
Queries and retrieves memories for the specified end user with context-aware responses.
Submit a memory read task.
Validates the end user, then dispatches the read to a Celery background task.
Returns a task_id for status polling.
"""
body = await request.json()
payload = MemoryReadRequest(**body)
logger.info(f"Memory read request - end_user_id: {payload.end_user_id}")
memory_api_service = MemoryAPIService(db)
result = await memory_api_service.read_memory(
result = memory_api_service.read_memory(
workspace_id=api_key_auth.workspace_id,
end_user_id=payload.end_user_id,
message=payload.message,
@@ -86,6 +112,94 @@ async def read_memory_api_service(
storage_type=payload.storage_type,
user_rag_memory_id=payload.user_rag_memory_id,
)
logger.info(f"Memory read successful for end_user: {payload.end_user_id}")
return success(data=MemoryReadResponse(**result).model_dump(), msg="Memory read successfully")
logger.info(f"Memory read task submitted: task_id={result['task_id']}, end_user_id: {payload.end_user_id}")
return success(data=MemoryReadResponse(**result).model_dump(), msg="Memory read task submitted")
@router.get("/read/status")
@require_api_key(scopes=["memory"])
async def get_read_task_status(
request: Request,
task_id: str = Query(..., description="Celery task ID"),
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
):
"""
Check the status of a memory read task.
Returns the current status and result (if completed) of a previously submitted read task.
"""
logger.info(f"Read task status check - task_id: {task_id}")
from app.services.task_service import get_task_memory_read_result
result = get_task_memory_read_result(task_id)
return success(data=TaskStatusResponse(**result).model_dump(), msg="Task status retrieved")
@router.post("/write/sync")
@require_api_key(scopes=["memory"])
async def write_memory_sync(
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
message: str = Body(..., description="Message content"),
):
"""
Write memory synchronously.
Blocks until the write completes and returns the result directly.
For async processing with task polling, use /write instead.
"""
body = await request.json()
payload = MemoryWriteRequest(**body)
logger.info(f"Memory write (sync) request - end_user_id: {payload.end_user_id}")
memory_api_service = MemoryAPIService(db)
result = await memory_api_service.write_memory_sync(
workspace_id=api_key_auth.workspace_id,
end_user_id=payload.end_user_id,
message=payload.message,
config_id=payload.config_id,
storage_type=payload.storage_type,
user_rag_memory_id=payload.user_rag_memory_id,
)
logger.info(f"Memory write (sync) successful for end_user: {payload.end_user_id}")
return success(data=MemoryWriteSyncResponse(**result).model_dump(), msg="Memory written successfully")
@router.post("/read/sync")
@require_api_key(scopes=["memory"])
async def read_memory_sync(
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
message: str = Body(..., description="Query message"),
):
"""
Read memory synchronously.
Blocks until the read completes and returns the answer directly.
For async processing with task polling, use /read instead.
"""
body = await request.json()
payload = MemoryReadRequest(**body)
logger.info(f"Memory read (sync) request - end_user_id: {payload.end_user_id}")
memory_api_service = MemoryAPIService(db)
result = await memory_api_service.read_memory_sync(
workspace_id=api_key_auth.workspace_id,
end_user_id=payload.end_user_id,
message=payload.message,
search_switch=payload.search_switch,
config_id=payload.config_id,
storage_type=payload.storage_type,
user_rag_memory_id=payload.user_rag_memory_id,
)
logger.info(f"Memory read (sync) successful for end_user: {payload.end_user_id}")
return success(data=MemoryReadSyncResponse(**result).model_dump(), msg="Memory read successfully")

View File

@@ -110,6 +110,30 @@ class MemoryReadRequest(BaseModel):
class MemoryWriteResponse(BaseModel):
"""Response schema for memory write operation.
Attributes:
task_id: Celery task ID for status polling
status: Initial task status (PENDING)
end_user_id: End user ID the write was submitted for
"""
task_id: str = Field(..., description="Celery task ID for polling")
status: str = Field(..., description="Task status: PENDING")
end_user_id: str = Field(..., description="End user ID")
class TaskStatusResponse(BaseModel):
"""Response schema for task status check.
Attributes:
status: Task status (PENDING, STARTED, SUCCESS, FAILURE, SKIPPED)
result: Task result data (available when status is SUCCESS or FAILURE)
"""
status: str = Field(..., description="Task status")
result: Optional[Dict[str, Any]] = Field(None, description="Task result when completed")
class MemoryWriteSyncResponse(BaseModel):
"""Response schema for synchronous memory write.
Attributes:
status: Operation status (success or failed)
end_user_id: End user ID that was written to
@@ -118,8 +142,8 @@ class MemoryWriteResponse(BaseModel):
end_user_id: str = Field(..., description="End user ID")
class MemoryReadResponse(BaseModel):
"""Response schema for memory read operation.
class MemoryReadSyncResponse(BaseModel):
"""Response schema for synchronous memory read.
Attributes:
answer: Generated answer from memory retrieval
@@ -128,12 +152,25 @@ class MemoryReadResponse(BaseModel):
"""
answer: str = Field(..., description="Generated answer")
intermediate_outputs: List[Dict[str, Any]] = Field(
default_factory=list,
default_factory=list,
description="Intermediate retrieval outputs"
)
end_user_id: str = Field(..., description="End user ID")
class MemoryReadResponse(BaseModel):
"""Response schema for memory read operation.
Attributes:
task_id: Celery task ID for status polling
status: Initial task status (PENDING)
end_user_id: End user ID the read was submitted for
"""
task_id: str = Field(..., description="Celery task ID for polling")
status: str = Field(..., description="Task status: PENDING")
end_user_id: str = Field(..., description="End user ID")
class CreateEndUserRequest(BaseModel):
"""Request schema for creating an end user.

View File

@@ -125,7 +125,7 @@ class MemoryAPIService:
except Exception as e:
logger.warning(f"Failed to update memory_config_id for end_user {end_user_id}: {e}")
async def write_memory(
def write_memory(
self,
workspace_id: uuid.UUID,
end_user_id: str,
@@ -134,27 +134,28 @@ class MemoryAPIService:
storage_type: str = "neo4j",
user_rag_memory_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Write memory with validation.
"""Submit a memory write task via Celery.
Validates end_user exists and belongs to workspace, updates the end user's
memory_config_id, then delegates to MemoryAgentService.write_memory.
memory_config_id, then dispatches write_message_task to Celery for async
processing with per-user fair locking.
Args:
workspace_id: Workspace ID for resource validation
end_user_id: End user identifier (used as end_user_id)
end_user_id: End user identifier
message: Message content to store
config_id: Memory configuration ID (required)
storage_type: Storage backend (neo4j or rag)
user_rag_memory_id: Optional RAG memory ID
Returns:
Dict with status and end_user_id
Dict with task_id, status, and end_user_id
Raises:
ResourceNotFoundException: If end_user not found
BusinessException: If end_user not in authorized workspace or write fails
BusinessException: If validation fails
"""
logger.info(f"Writing memory for end_user: {end_user_id}, workspace: {workspace_id}")
logger.info(f"Submitting memory write for end_user: {end_user_id}, workspace: {workspace_id}")
# Validate end_user exists and belongs to workspace
self.validate_end_user(end_user_id, workspace_id)
@@ -162,9 +163,120 @@ class MemoryAPIService:
# Update end user's memory_config_id
self._update_end_user_config(end_user_id, config_id)
# Convert to message list format expected by write_message_task
messages = message if isinstance(message, list) else [{"role": "user", "content": message}]
from app.tasks import write_message_task
task = write_message_task.delay(
end_user_id,
messages,
config_id,
storage_type,
user_rag_memory_id or "",
)
logger.info(f"Memory write task submitted: task_id={task.id}, end_user_id={end_user_id}")
return {
"task_id": task.id,
"status": "PENDING",
"end_user_id": end_user_id,
}
def read_memory(
self,
workspace_id: uuid.UUID,
end_user_id: str,
message: str,
search_switch: str = "0",
config_id: str = "",
storage_type: str = "neo4j",
user_rag_memory_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Submit a memory read task via Celery.
Validates end_user exists and belongs to workspace, updates the end user's
memory_config_id, then dispatches read_message_task to Celery for async processing.
Args:
workspace_id: Workspace ID for resource validation
end_user_id: End user identifier
message: Query message
search_switch: Search mode (0=deep search with verification, 1=deep search, 2=fast search)
config_id: Memory configuration ID (required)
storage_type: Storage backend (neo4j or rag)
user_rag_memory_id: Optional RAG memory ID
Returns:
Dict with task_id, status, and end_user_id
Raises:
ResourceNotFoundException: If end_user not found
BusinessException: If validation fails
"""
logger.info(f"Submitting memory read for end_user: {end_user_id}, workspace: {workspace_id}")
# Validate end_user exists and belongs to workspace
self.validate_end_user(end_user_id, workspace_id)
# Update end user's memory_config_id
self._update_end_user_config(end_user_id, config_id)
from app.tasks import read_message_task
task = read_message_task.delay(
end_user_id,
message,
[], # history
search_switch,
config_id,
storage_type,
user_rag_memory_id or "",
)
logger.info(f"Memory read task submitted: task_id={task.id}, end_user_id={end_user_id}")
return {
"task_id": task.id,
"status": "PENDING",
"end_user_id": end_user_id,
}
async def write_memory_sync(
self,
workspace_id: uuid.UUID,
end_user_id: str,
message: str,
config_id: str,
storage_type: str = "neo4j",
user_rag_memory_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Write memory synchronously (inline, no Celery).
Validates end_user, then calls MemoryAgentService.write_memory directly.
Blocks until the write completes. Use for cases where the caller needs
immediate confirmation.
Args:
workspace_id: Workspace ID for resource validation
end_user_id: End user identifier
message: Message content to store
config_id: Memory configuration ID (required)
storage_type: Storage backend (neo4j or rag)
user_rag_memory_id: Optional RAG memory ID
Returns:
Dict with status and end_user_id
Raises:
ResourceNotFoundException: If end_user not found
BusinessException: If write fails
"""
logger.info(f"Writing memory (sync) for end_user: {end_user_id}, workspace: {workspace_id}")
self.validate_end_user(end_user_id, workspace_id)
self._update_end_user_config(end_user_id, config_id)
try:
# Delegate to MemoryAgentService
# Convert string message to list[dict] format expected by MemoryAgentService
messages = message if isinstance(message, list) else [{"role": "user", "content": message}]
result = await MemoryAgentService().write_memory(
end_user_id=end_user_id,
@@ -175,11 +287,8 @@ class MemoryAPIService:
user_rag_memory_id=user_rag_memory_id or "",
)
logger.info(f"Memory write successful for end_user: {end_user_id}")
logger.info(f"Memory write (sync) successful for end_user: {end_user_id}")
# result may be a string "success" or a dict with a "status" key
# Preserve the full dict so callers don't silently lose extra fields
# (e.g. error codes, metadata) returned by MemoryAgentService.
if isinstance(result, dict):
return {
**result,
@@ -193,20 +302,17 @@ class MemoryAPIService:
except ConfigurationError as e:
logger.error(f"Memory configuration error for end_user {end_user_id}: {e}")
raise BusinessException(
message=str(e),
code=BizCode.MEMORY_CONFIG_NOT_FOUND
)
raise BusinessException(message=str(e), code=BizCode.MEMORY_CONFIG_NOT_FOUND)
except BusinessException:
raise
except Exception as e:
logger.error(f"Memory write failed for end_user {end_user_id}: {e}")
logger.error(f"Memory write (sync) failed for end_user {end_user_id}: {e}")
raise BusinessException(
message=f"Memory write failed: {str(e)}",
code=BizCode.MEMORY_WRITE_FAILED
)
async def read_memory(
async def read_memory_sync(
self,
workspace_id: uuid.UUID,
end_user_id: str,
@@ -216,37 +322,34 @@ class MemoryAPIService:
storage_type: str = "neo4j",
user_rag_memory_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Read memory with validation.
Validates end_user exists and belongs to workspace, updates the end user's
memory_config_id, then delegates to MemoryAgentService.read_memory.
"""Read memory synchronously (inline, no Celery).
Validates end_user, then calls MemoryAgentService.read_memory directly.
Blocks until the read completes. Use for cases where the caller needs
the answer immediately.
Args:
workspace_id: Workspace ID for resource validation
end_user_id: End user identifier (used as end_user_id)
end_user_id: End user identifier
message: Query message
search_switch: Search mode (0=deep search with verification, 1=deep search, 2=fast search)
config_id: Memory configuration ID (required)
storage_type: Storage backend (neo4j or rag)
user_rag_memory_id: Optional RAG memory ID
Returns:
Dict with answer, intermediate_outputs, and end_user_id
Raises:
ResourceNotFoundException: If end_user not found
BusinessException: If end_user not in authorized workspace or read fails
BusinessException: If read fails
"""
logger.info(f"Reading memory for end_user: {end_user_id}, workspace: {workspace_id}")
logger.info(f"Reading memory (sync) for end_user: {end_user_id}, workspace: {workspace_id}")
# Validate end_user exists and belongs to workspace
self.validate_end_user(end_user_id, workspace_id)
# Update end user's memory_config_id
self._update_end_user_config(end_user_id, config_id)
try:
# Delegate to MemoryAgentService
result = await MemoryAgentService().read_memory(
end_user_id=end_user_id,
message=message,
@@ -258,7 +361,7 @@ class MemoryAPIService:
user_rag_memory_id=user_rag_memory_id or ""
)
logger.info(f"Memory read successful for end_user: {end_user_id}")
logger.info(f"Memory read (sync) successful for end_user: {end_user_id}")
return {
"answer": result.get("answer", ""),
@@ -268,14 +371,11 @@ class MemoryAPIService:
except ConfigurationError as e:
logger.error(f"Memory configuration error for end_user {end_user_id}: {e}")
raise BusinessException(
message=str(e),
code=BizCode.MEMORY_CONFIG_NOT_FOUND
)
raise BusinessException(message=str(e), code=BizCode.MEMORY_CONFIG_NOT_FOUND)
except BusinessException:
raise
except Exception as e:
logger.error(f"Memory read failed for end_user {end_user_id}: {e}")
logger.error(f"Memory read (sync) failed for end_user {end_user_id}: {e}")
raise BusinessException(
message=f"Memory read failed: {str(e)}",
code=BizCode.MEMORY_READ_FAILED