feat(memory-api): add end user management and enhance memory API endpoints

- Add end_user_controller with unauthenticated endpoint for creating end users
- Implement get_or_create_end_user logic to handle duplicate end users by other_id
- Register end_user_controller router in main controller initialization
- Add list_memory_configs endpoint to retrieve all workspace memory configurations
- Update MemoryWriteRequest and MemoryReadRequest to make config_id required field
- Refactor memory API endpoints to parse request body directly instead of using Body parameter
- Add CreateEndUserRequest and CreateEndUserResponse schemas for end user creation
- Add ListConfigsResponse schema for configs listing endpoint
- Remove unused config_id and llm_model_id parameters from Neo4j write operation
- Update .gitignore to exclude redbear-mem-metrics and pitch-deck directories
This commit is contained in:
Ke Sun
2026-03-20 21:04:41 +08:00
parent 78316de411
commit e8ae46b286
8 changed files with 260 additions and 37 deletions

2
.gitignore vendored
View File

@@ -25,6 +25,8 @@ examples/
time.log time.log
celerybeat-schedule.db celerybeat-schedule.db
search_results.json search_results.json
redbear-mem-metrics/
pitch-deck/
api/migrations/versions api/migrations/versions
tmp tmp

View File

@@ -13,6 +13,7 @@ from . import (
document_controller, document_controller,
emotion_config_controller, emotion_config_controller,
emotion_controller, emotion_controller,
end_user_controller,
file_controller, file_controller,
file_storage_controller, file_storage_controller,
home_page_controller, home_page_controller,
@@ -96,5 +97,6 @@ manager_router.include_router(file_storage_controller.router)
manager_router.include_router(ontology_controller.router) manager_router.include_router(ontology_controller.router)
manager_router.include_router(skill_controller.router) manager_router.include_router(skill_controller.router)
manager_router.include_router(i18n_controller.router) manager_router.include_router(i18n_controller.router)
manager_router.include_router(end_user_controller.router)
__all__ = ["manager_router"] __all__ = ["manager_router"]

View File

@@ -0,0 +1,48 @@
"""End User 管理接口 - 无需认证"""
from app.core.logging_config import get_business_logger
from app.core.response_utils import success
from app.db import get_db
from app.repositories.end_user_repository import EndUserRepository
from app.schemas.memory_api_schema import (
CreateEndUserRequest,
CreateEndUserResponse,
)
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
router = APIRouter(prefix="/end_users", tags=["End Users"])
logger = get_business_logger()
@router.post("")
async def create_end_user(
data: CreateEndUserRequest,
db: Session = Depends(get_db),
):
"""
Create an end user.
Creates a new end user for the given workspace.
If an end user with the same other_id already exists in the workspace,
returns the existing one.
"""
logger.info(f"Create end user request - other_id: {data.other_id}, workspace_id: {data.workspace_id}")
end_user_repo = EndUserRepository(db)
end_user = end_user_repo.get_or_create_end_user(
app_id=None,
workspace_id=data.workspace_id,
other_id=data.other_id,
)
logger.info(f"End user ready: {end_user.id}")
result = {
"id": str(end_user.id),
"other_id": end_user.other_id or "",
"other_name": end_user.other_name or "",
"workspace_id": str(end_user.workspace_id),
}
return success(data=CreateEndUserResponse(**result).model_dump(), msg="End user created successfully")

View File

@@ -6,6 +6,7 @@ from app.core.response_utils import success
from app.db import get_db from app.db import get_db
from app.schemas.api_key_schema import ApiKeyAuth from app.schemas.api_key_schema import ApiKeyAuth
from app.schemas.memory_api_schema import ( from app.schemas.memory_api_schema import (
ListConfigsResponse,
MemoryReadRequest, MemoryReadRequest,
MemoryReadResponse, MemoryReadResponse,
MemoryWriteRequest, MemoryWriteRequest,
@@ -31,14 +32,15 @@ async def write_memory_api_service(
request: Request, request: Request,
api_key_auth: ApiKeyAuth = None, api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db), db: Session = Depends(get_db),
payload: MemoryWriteRequest = Body(..., embed=False), message: str = Body(..., description="Message content"),
): ):
""" """
Write memory to storage. Write memory to storage.
Stores memory content for the specified end user using the Memory API Service. Stores memory content for the specified end user using the Memory API Service.
""" """
body = await request.json()
payload = MemoryWriteRequest(**body)
logger.info(f"Memory write request - end_user_id: {payload.end_user_id}, workspace_id: {api_key_auth.workspace_id}") logger.info(f"Memory write request - end_user_id: {payload.end_user_id}, workspace_id: {api_key_auth.workspace_id}")
memory_api_service = MemoryAPIService(db) memory_api_service = MemoryAPIService(db)
@@ -62,13 +64,15 @@ async def read_memory_api_service(
request: Request, request: Request,
api_key_auth: ApiKeyAuth = None, api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db), db: Session = Depends(get_db),
payload: MemoryReadRequest = Body(..., embed=False), message: str = Body(..., description="Query message"),
): ):
""" """
Read memory from storage. Read memory from storage.
Queries and retrieves memories for the specified end user with context-aware responses. Queries and retrieves memories for the specified end user with context-aware responses.
""" """
body = await request.json()
payload = MemoryReadRequest(**body)
logger.info(f"Memory read request - end_user_id: {payload.end_user_id}") logger.info(f"Memory read request - end_user_id: {payload.end_user_id}")
memory_api_service = MemoryAPIService(db) memory_api_service = MemoryAPIService(db)
@@ -85,3 +89,27 @@ async def read_memory_api_service(
logger.info(f"Memory read successful for end_user: {payload.end_user_id}") logger.info(f"Memory read successful for end_user: {payload.end_user_id}")
return success(data=MemoryReadResponse(**result).model_dump(), msg="Memory read successfully") return success(data=MemoryReadResponse(**result).model_dump(), msg="Memory read successfully")
@router.get("/configs")
@require_api_key(scopes=["memory"])
async def list_memory_configs(
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
):
"""
List all memory configs for the workspace.
Returns all available memory configurations associated with the authorized workspace.
"""
logger.info(f"List configs request - workspace_id: {api_key_auth.workspace_id}")
memory_api_service = MemoryAPIService(db)
result = memory_api_service.list_memory_configs(
workspace_id=api_key_auth.workspace_id,
)
logger.info(f"Listed {result['total']} configs for workspace: {api_key_auth.workspace_id}")
return success(data=ListConfigsResponse(**result).model_dump(), msg="Configs listed successfully")

View File

@@ -166,8 +166,6 @@ async def write(
statement_entity_edges=all_statement_entity_edges, statement_entity_edges=all_statement_entity_edges,
entity_edges=all_entity_entity_edges, entity_edges=all_entity_entity_edges,
connector=neo4j_connector, connector=neo4j_connector,
config_id=config_id,
llm_model_id=str(memory_config.llm_model_id) if memory_config.llm_model_id else None,
) )
if success: if success:
logger.info("Successfully saved all data to Neo4j") logger.info("Successfully saved all data to Neo4j")

View File

@@ -21,7 +21,7 @@ class MemoryWriteRequest(BaseModel):
""" """
end_user_id: str = Field(..., description="End user ID (required)") end_user_id: str = Field(..., description="End user ID (required)")
message: str = Field(..., description="Message content to store") message: str = Field(..., description="Message content to store")
config_id: Optional[str] = Field(None, description="Memory configuration ID") config_id: str = Field(..., description="Memory configuration ID (required)")
storage_type: str = Field("neo4j", description="Storage type: neo4j or rag") storage_type: str = Field("neo4j", description="Storage type: neo4j or rag")
user_rag_memory_id: Optional[str] = Field(None, description="RAG memory ID") user_rag_memory_id: Optional[str] = Field(None, description="RAG memory ID")
@@ -68,7 +68,7 @@ class MemoryReadRequest(BaseModel):
"0", "0",
description="Search mode: 0=verify, 1=direct, 2=context" description="Search mode: 0=verify, 1=direct, 2=context"
) )
config_id: Optional[str] = Field(None, description="Memory configuration ID") config_id: str = Field(..., description="Memory configuration ID (required)")
storage_type: str = Field("neo4j", description="Storage type: neo4j or rag") storage_type: str = Field("neo4j", description="Storage type: neo4j or rag")
user_rag_memory_id: Optional[str] = Field(None, description="RAG memory ID") user_rag_memory_id: Optional[str] = Field(None, description="RAG memory ID")
@@ -132,3 +132,79 @@ class MemoryReadResponse(BaseModel):
description="Intermediate retrieval outputs" description="Intermediate retrieval outputs"
) )
end_user_id: str = Field(..., description="End user ID") end_user_id: str = Field(..., description="End user ID")
class CreateEndUserRequest(BaseModel):
"""Request schema for creating an end user.
Attributes:
workspace_id: Workspace ID (required)
other_id: External user identifier (required)
other_name: Display name for the end user
"""
workspace_id: str = Field(..., description="Workspace ID (required)")
other_id: str = Field(..., description="External user identifier (required)")
other_name: Optional[str] = Field("", description="Display name")
@field_validator("workspace_id")
@classmethod
def validate_workspace_id(cls, v: str) -> str:
"""Validate that workspace_id is not empty."""
if not v or not v.strip():
raise ValueError("workspace_id is required and cannot be empty")
return v.strip()
@field_validator("other_id")
@classmethod
def validate_other_id(cls, v: str) -> str:
"""Validate that other_id is not empty."""
if not v or not v.strip():
raise ValueError("other_id is required and cannot be empty")
return v.strip()
class CreateEndUserResponse(BaseModel):
"""Response schema for end user creation.
Attributes:
id: Created end user UUID
other_id: External user identifier
other_name: Display name
workspace_id: Workspace the user belongs to
"""
id: str = Field(..., description="End user UUID")
other_id: str = Field(..., description="External user identifier")
other_name: str = Field("", description="Display name")
workspace_id: str = Field(..., description="Workspace ID")
class MemoryConfigItem(BaseModel):
"""Schema for a single memory config in the list response.
Attributes:
config_id: Configuration UUID
config_name: Configuration name
config_desc: Configuration description
is_default: Whether this is the workspace default config
scene_name: Associated ontology scene name
created_at: Creation timestamp
updated_at: Last update timestamp
"""
config_id: str = Field(..., description="Configuration ID")
config_name: str = Field(..., description="Configuration name")
config_desc: Optional[str] = Field(None, description="Configuration description")
is_default: bool = Field(False, description="Whether this is the workspace default")
scene_name: Optional[str] = Field(None, description="Associated ontology scene name")
created_at: Optional[str] = Field(None, description="Creation timestamp")
updated_at: Optional[str] = Field(None, description="Last update timestamp")
class ListConfigsResponse(BaseModel):
"""Response schema for listing memory configs.
Attributes:
configs: List of memory config items
total: Total number of configs
"""
configs: List[MemoryConfigItem] = Field(default_factory=list, description="List of configs")
total: int = Field(0, description="Total number of configs")

View File

@@ -1179,7 +1179,7 @@ def get_end_user_connected_config(end_user_id: str, db: Session) -> Dict[str, An
app = db.query(App).filter(App.id == app_id).first() app = db.query(App).filter(App.id == app_id).first()
if not app: if not app:
logger.warning(f"App not found: {app_id}") logger.warning(f"App not found: {app_id}")
raise ValueError(f"应用不存在: {app_id}") # raise ValueError(f"应用不存在: {app_id}")
# TODO: temp fix for draft run # TODO: temp fix for draft run
# if not app.current_release_id: # if not app.current_release_id:
# logger.warning(f"No current release for app: {app_id}") # logger.warning(f"No current release for app: {app_id}")
@@ -1252,17 +1252,15 @@ def get_end_user_connected_config(end_user_id: str, db: Session) -> Dict[str, An
memory_config_service = MemoryConfigService(db) memory_config_service = MemoryConfigService(db)
memory_config = memory_config_service.get_config_with_fallback( memory_config = memory_config_service.get_config_with_fallback(
memory_config_id=memory_config_id_to_use, memory_config_id=memory_config_id_to_use,
workspace_id=app.workspace_id workspace_id=end_user.workspace_id
) )
memory_config_id = str(memory_config.config_id) if memory_config else None memory_config_id = str(memory_config.config_id) if memory_config else None
result = { result = {
"end_user_id": str(end_user_id), "end_user_id": str(end_user_id),
"app_id": str(app_id),
"release_id": str(app.current_release_id) if app.current_release_id else None,
"memory_config_id": memory_config_id, "memory_config_id": memory_config_id,
"workspace_id": str(app.workspace_id) "workspace_id": str(end_user.workspace_id)
} }
logger.info( logger.info(

View File

@@ -84,43 +84,65 @@ class MemoryAPIService:
if not app: if not app:
logger.warning(f"App not found for end_user: {end_user_id}") logger.warning(f"App not found for end_user: {end_user_id}")
raise ResourceNotFoundException( # raise ResourceNotFoundException(
resource_type="App", # resource_type="App",
resource_id=str(end_user.app_id) # resource_id=str(end_user.app_id)
) # )
# temporally allow any workspace to access
if app.workspace_id != workspace_id: # if end_user.workspace_id != workspace_id:
logger.warning( # print(f"[DEBUG] end_user.workspace_id={end_user.workspace_id}, api_key.workspace_id={workspace_id}")
f"End user {end_user_id} belongs to workspace {app.workspace_id}, " # logger.warning(
f"not authorized workspace {workspace_id}" # f"End user {end_user_id} belongs to workspace {end_user.workspace_id}, "
) # f"not authorized workspace {workspace_id}"
raise BusinessException( # )
message="End user does not belong to authorized workspace", # raise BusinessException(
code=BizCode.FORBIDDEN # message=f"End user does not belong to authorized workspace. end_user.workspace_id={end_user.workspace_id}, api_key.workspace_id={workspace_id}",
) # code=BizCode.FORBIDDEN
# )
logger.info(f"End user {end_user_id} validated successfully") logger.info(f"End user {end_user_id} validated successfully")
return end_user return end_user
def _update_end_user_config(self, end_user_id: str, config_id: str) -> None:
"""Update the end user's memory_config_id.
Silently updates the config association. Logs warnings on failure
but does not raise, so it won't block the main read/write operation.
Args:
end_user_id: End user identifier
config_id: Memory configuration ID to assign
"""
try:
config_uuid = uuid.UUID(config_id)
from app.repositories.end_user_repository import EndUserRepository
end_user_repo = EndUserRepository(self.db)
end_user_repo.update_memory_config_id(
end_user_id=uuid.UUID(end_user_id),
memory_config_id=config_uuid,
)
except Exception as e:
logger.warning(f"Failed to update memory_config_id for end_user {end_user_id}: {e}")
async def write_memory( async def write_memory(
self, self,
workspace_id: uuid.UUID, workspace_id: uuid.UUID,
end_user_id: str, end_user_id: str,
message: str, message: str,
config_id: Optional[str] = None, config_id: str,
storage_type: str = "neo4j", storage_type: str = "neo4j",
user_rag_memory_id: Optional[str] = None, user_rag_memory_id: Optional[str] = None,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""Write memory with validation. """Write memory with validation.
Validates end_user exists and belongs to workspace, then delegates Validates end_user exists and belongs to workspace, updates the end user's
to MemoryAgentService.write_memory. memory_config_id, then delegates to MemoryAgentService.write_memory.
Args: Args:
workspace_id: Workspace ID for resource validation workspace_id: Workspace ID for resource validation
end_user_id: End user identifier (used as end_user_id) end_user_id: End user identifier (used as end_user_id)
message: Message content to store message: Message content to store
config_id: Optional memory configuration ID config_id: Memory configuration ID (required)
storage_type: Storage backend (neo4j or rag) storage_type: Storage backend (neo4j or rag)
user_rag_memory_id: Optional RAG memory ID user_rag_memory_id: Optional RAG memory ID
@@ -136,7 +158,8 @@ class MemoryAPIService:
# Validate end_user exists and belongs to workspace # Validate end_user exists and belongs to workspace
self.validate_end_user(end_user_id, workspace_id) self.validate_end_user(end_user_id, workspace_id)
# Use end_user_id as end_user_id for memory operations # Update end user's memory_config_id
self._update_end_user_config(end_user_id, config_id)
try: try:
# Delegate to MemoryAgentService # Delegate to MemoryAgentService
@@ -188,21 +211,21 @@ class MemoryAPIService:
end_user_id: str, end_user_id: str,
message: str, message: str,
search_switch: str = "0", search_switch: str = "0",
config_id: Optional[str] = None, config_id: str = "",
storage_type: str = "neo4j", storage_type: str = "neo4j",
user_rag_memory_id: Optional[str] = None, user_rag_memory_id: Optional[str] = None,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""Read memory with validation. """Read memory with validation.
Validates end_user exists and belongs to workspace, then delegates Validates end_user exists and belongs to workspace, updates the end user's
to MemoryAgentService.read_memory. memory_config_id, then delegates to MemoryAgentService.read_memory.
Args: Args:
workspace_id: Workspace ID for resource validation workspace_id: Workspace ID for resource validation
end_user_id: End user identifier (used as end_user_id) end_user_id: End user identifier (used as end_user_id)
message: Query message message: Query message
search_switch: Search mode (0=deep search with verification, 1=deep search, 2=fast search) search_switch: Search mode (0=deep search with verification, 1=deep search, 2=fast search)
config_id: Optional memory configuration ID config_id: Memory configuration ID (required)
storage_type: Storage backend (neo4j or rag) storage_type: Storage backend (neo4j or rag)
user_rag_memory_id: Optional RAG memory ID user_rag_memory_id: Optional RAG memory ID
@@ -218,7 +241,8 @@ class MemoryAPIService:
# Validate end_user exists and belongs to workspace # Validate end_user exists and belongs to workspace
self.validate_end_user(end_user_id, workspace_id) self.validate_end_user(end_user_id, workspace_id)
# Use end_user_id as end_user_id for memory operations # Update end user's memory_config_id
self._update_end_user_config(end_user_id, config_id)
try: try:
@@ -256,3 +280,50 @@ class MemoryAPIService:
message=f"Memory read failed: {str(e)}", message=f"Memory read failed: {str(e)}",
code=BizCode.MEMORY_READ_FAILED code=BizCode.MEMORY_READ_FAILED
) )
def list_memory_configs(
self,
workspace_id: uuid.UUID,
) -> Dict[str, Any]:
"""List all memory configs for a workspace.
Args:
workspace_id: Workspace ID from API key authorization
Returns:
Dict with configs list and total count
Raises:
BusinessException: If listing fails
"""
logger.info(f"Listing memory configs for workspace: {workspace_id}")
try:
from app.repositories.memory_config_repository import MemoryConfigRepository
results = MemoryConfigRepository.get_all(self.db, workspace_id=workspace_id)
configs = []
for config, scene_name in results:
configs.append({
"config_id": str(config.config_id),
"config_name": config.config_name,
"config_desc": config.config_desc,
"is_default": config.is_default or False,
"scene_name": scene_name,
"created_at": config.created_at.isoformat() if config.created_at else None,
"updated_at": config.updated_at.isoformat() if config.updated_at else None,
})
logger.info(f"Found {len(configs)} memory configs for workspace {workspace_id}")
return {
"configs": configs,
"total": len(configs),
}
except Exception as e:
logger.error(f"Failed to list memory configs for workspace {workspace_id}: {e}")
raise BusinessException(
message=f"Failed to list memory configs: {str(e)}",
code=BizCode.MEMORY_READ_FAILED
)