From e8ae46b286d6dce05c892d98c3e1c98acb54ec49 Mon Sep 17 00:00:00 2001 From: Ke Sun Date: Fri, 20 Mar 2026 21:04:41 +0800 Subject: [PATCH] feat(memory-api): add end user management and enhance memory API endpoints - Add end_user_controller with unauthenticated endpoint for creating end users - Implement get_or_create_end_user logic to handle duplicate end users by other_id - Register end_user_controller router in main controller initialization - Add list_memory_configs endpoint to retrieve all workspace memory configurations - Update MemoryWriteRequest and MemoryReadRequest to make config_id required field - Refactor memory API endpoints to parse request body directly instead of using Body parameter - Add CreateEndUserRequest and CreateEndUserResponse schemas for end user creation - Add ListConfigsResponse schema for configs listing endpoint - Remove unused config_id and llm_model_id parameters from Neo4j write operation - Update .gitignore to exclude redbear-mem-metrics and pitch-deck directories --- .gitignore | 2 + api/app/controllers/__init__.py | 2 + api/app/controllers/end_user_controller.py | 48 +++++++ .../service/memory_api_controller.py | 34 ++++- .../core/memory/agent/utils/write_tools.py | 2 - api/app/schemas/memory_api_schema.py | 80 +++++++++++- api/app/services/memory_agent_service.py | 8 +- api/app/services/memory_api_service.py | 121 ++++++++++++++---- 8 files changed, 260 insertions(+), 37 deletions(-) create mode 100644 api/app/controllers/end_user_controller.py diff --git a/.gitignore b/.gitignore index 66d1beb2..ae3261f0 100644 --- a/.gitignore +++ b/.gitignore @@ -25,6 +25,8 @@ examples/ time.log celerybeat-schedule.db search_results.json +redbear-mem-metrics/ +pitch-deck/ api/migrations/versions tmp diff --git a/api/app/controllers/__init__.py b/api/app/controllers/__init__.py index 585de2ed..451dcdf7 100644 --- a/api/app/controllers/__init__.py +++ b/api/app/controllers/__init__.py @@ -13,6 +13,7 @@ from . import ( document_controller, emotion_config_controller, emotion_controller, + end_user_controller, file_controller, file_storage_controller, home_page_controller, @@ -96,5 +97,6 @@ manager_router.include_router(file_storage_controller.router) manager_router.include_router(ontology_controller.router) manager_router.include_router(skill_controller.router) manager_router.include_router(i18n_controller.router) +manager_router.include_router(end_user_controller.router) __all__ = ["manager_router"] diff --git a/api/app/controllers/end_user_controller.py b/api/app/controllers/end_user_controller.py new file mode 100644 index 00000000..b9d54fea --- /dev/null +++ b/api/app/controllers/end_user_controller.py @@ -0,0 +1,48 @@ +"""End User 管理接口 - 无需认证""" + +from app.core.logging_config import get_business_logger +from app.core.response_utils import success +from app.db import get_db +from app.repositories.end_user_repository import EndUserRepository +from app.schemas.memory_api_schema import ( + CreateEndUserRequest, + CreateEndUserResponse, +) +from fastapi import APIRouter, Depends +from sqlalchemy.orm import Session + +router = APIRouter(prefix="/end_users", tags=["End Users"]) +logger = get_business_logger() + + +@router.post("") +async def create_end_user( + data: CreateEndUserRequest, + db: Session = Depends(get_db), +): + """ + Create an end user. + + Creates a new end user for the given workspace. + If an end user with the same other_id already exists in the workspace, + returns the existing one. + """ + logger.info(f"Create end user request - other_id: {data.other_id}, workspace_id: {data.workspace_id}") + + end_user_repo = EndUserRepository(db) + end_user = end_user_repo.get_or_create_end_user( + app_id=None, + workspace_id=data.workspace_id, + other_id=data.other_id, + ) + + logger.info(f"End user ready: {end_user.id}") + + result = { + "id": str(end_user.id), + "other_id": end_user.other_id or "", + "other_name": end_user.other_name or "", + "workspace_id": str(end_user.workspace_id), + } + + return success(data=CreateEndUserResponse(**result).model_dump(), msg="End user created successfully") diff --git a/api/app/controllers/service/memory_api_controller.py b/api/app/controllers/service/memory_api_controller.py index 34489e8a..08a94a89 100644 --- a/api/app/controllers/service/memory_api_controller.py +++ b/api/app/controllers/service/memory_api_controller.py @@ -6,6 +6,7 @@ from app.core.response_utils import success from app.db import get_db from app.schemas.api_key_schema import ApiKeyAuth from app.schemas.memory_api_schema import ( + ListConfigsResponse, MemoryReadRequest, MemoryReadResponse, MemoryWriteRequest, @@ -31,14 +32,15 @@ async def write_memory_api_service( request: Request, api_key_auth: ApiKeyAuth = None, db: Session = Depends(get_db), - payload: MemoryWriteRequest = Body(..., embed=False), - + message: str = Body(..., description="Message content"), ): """ Write memory to storage. Stores memory content for the specified end user using the Memory API Service. """ + body = await request.json() + payload = MemoryWriteRequest(**body) logger.info(f"Memory write request - end_user_id: {payload.end_user_id}, workspace_id: {api_key_auth.workspace_id}") memory_api_service = MemoryAPIService(db) @@ -62,13 +64,15 @@ async def read_memory_api_service( request: Request, api_key_auth: ApiKeyAuth = None, db: Session = Depends(get_db), - payload: MemoryReadRequest = Body(..., embed=False), + message: str = Body(..., description="Query message"), ): """ Read memory from storage. Queries and retrieves memories for the specified end user with context-aware responses. """ + body = await request.json() + payload = MemoryReadRequest(**body) logger.info(f"Memory read request - end_user_id: {payload.end_user_id}") memory_api_service = MemoryAPIService(db) @@ -85,3 +89,27 @@ async def read_memory_api_service( logger.info(f"Memory read successful for end_user: {payload.end_user_id}") return success(data=MemoryReadResponse(**result).model_dump(), msg="Memory read successfully") + + +@router.get("/configs") +@require_api_key(scopes=["memory"]) +async def list_memory_configs( + request: Request, + api_key_auth: ApiKeyAuth = None, + db: Session = Depends(get_db), +): + """ + List all memory configs for the workspace. + + Returns all available memory configurations associated with the authorized workspace. + """ + logger.info(f"List configs request - workspace_id: {api_key_auth.workspace_id}") + + memory_api_service = MemoryAPIService(db) + + result = memory_api_service.list_memory_configs( + workspace_id=api_key_auth.workspace_id, + ) + + logger.info(f"Listed {result['total']} configs for workspace: {api_key_auth.workspace_id}") + return success(data=ListConfigsResponse(**result).model_dump(), msg="Configs listed successfully") diff --git a/api/app/core/memory/agent/utils/write_tools.py b/api/app/core/memory/agent/utils/write_tools.py index b0c68b19..b62eb50a 100644 --- a/api/app/core/memory/agent/utils/write_tools.py +++ b/api/app/core/memory/agent/utils/write_tools.py @@ -166,8 +166,6 @@ async def write( statement_entity_edges=all_statement_entity_edges, entity_edges=all_entity_entity_edges, connector=neo4j_connector, - config_id=config_id, - llm_model_id=str(memory_config.llm_model_id) if memory_config.llm_model_id else None, ) if success: logger.info("Successfully saved all data to Neo4j") diff --git a/api/app/schemas/memory_api_schema.py b/api/app/schemas/memory_api_schema.py index 98d257c1..84a34e8a 100644 --- a/api/app/schemas/memory_api_schema.py +++ b/api/app/schemas/memory_api_schema.py @@ -21,7 +21,7 @@ class MemoryWriteRequest(BaseModel): """ end_user_id: str = Field(..., description="End user ID (required)") message: str = Field(..., description="Message content to store") - config_id: Optional[str] = Field(None, description="Memory configuration ID") + config_id: str = Field(..., description="Memory configuration ID (required)") storage_type: str = Field("neo4j", description="Storage type: neo4j or rag") user_rag_memory_id: Optional[str] = Field(None, description="RAG memory ID") @@ -68,7 +68,7 @@ class MemoryReadRequest(BaseModel): "0", description="Search mode: 0=verify, 1=direct, 2=context" ) - config_id: Optional[str] = Field(None, description="Memory configuration ID") + config_id: str = Field(..., description="Memory configuration ID (required)") storage_type: str = Field("neo4j", description="Storage type: neo4j or rag") user_rag_memory_id: Optional[str] = Field(None, description="RAG memory ID") @@ -132,3 +132,79 @@ class MemoryReadResponse(BaseModel): description="Intermediate retrieval outputs" ) end_user_id: str = Field(..., description="End user ID") + + +class CreateEndUserRequest(BaseModel): + """Request schema for creating an end user. + + Attributes: + workspace_id: Workspace ID (required) + other_id: External user identifier (required) + other_name: Display name for the end user + """ + workspace_id: str = Field(..., description="Workspace ID (required)") + other_id: str = Field(..., description="External user identifier (required)") + other_name: Optional[str] = Field("", description="Display name") + + @field_validator("workspace_id") + @classmethod + def validate_workspace_id(cls, v: str) -> str: + """Validate that workspace_id is not empty.""" + if not v or not v.strip(): + raise ValueError("workspace_id is required and cannot be empty") + return v.strip() + + @field_validator("other_id") + @classmethod + def validate_other_id(cls, v: str) -> str: + """Validate that other_id is not empty.""" + if not v or not v.strip(): + raise ValueError("other_id is required and cannot be empty") + return v.strip() + + +class CreateEndUserResponse(BaseModel): + """Response schema for end user creation. + + Attributes: + id: Created end user UUID + other_id: External user identifier + other_name: Display name + workspace_id: Workspace the user belongs to + """ + id: str = Field(..., description="End user UUID") + other_id: str = Field(..., description="External user identifier") + other_name: str = Field("", description="Display name") + workspace_id: str = Field(..., description="Workspace ID") + + +class MemoryConfigItem(BaseModel): + """Schema for a single memory config in the list response. + + Attributes: + config_id: Configuration UUID + config_name: Configuration name + config_desc: Configuration description + is_default: Whether this is the workspace default config + scene_name: Associated ontology scene name + created_at: Creation timestamp + updated_at: Last update timestamp + """ + config_id: str = Field(..., description="Configuration ID") + config_name: str = Field(..., description="Configuration name") + config_desc: Optional[str] = Field(None, description="Configuration description") + is_default: bool = Field(False, description="Whether this is the workspace default") + scene_name: Optional[str] = Field(None, description="Associated ontology scene name") + created_at: Optional[str] = Field(None, description="Creation timestamp") + updated_at: Optional[str] = Field(None, description="Last update timestamp") + + +class ListConfigsResponse(BaseModel): + """Response schema for listing memory configs. + + Attributes: + configs: List of memory config items + total: Total number of configs + """ + configs: List[MemoryConfigItem] = Field(default_factory=list, description="List of configs") + total: int = Field(0, description="Total number of configs") diff --git a/api/app/services/memory_agent_service.py b/api/app/services/memory_agent_service.py index 1e1d9e45..af9a04e2 100644 --- a/api/app/services/memory_agent_service.py +++ b/api/app/services/memory_agent_service.py @@ -1179,7 +1179,7 @@ def get_end_user_connected_config(end_user_id: str, db: Session) -> Dict[str, An app = db.query(App).filter(App.id == app_id).first() if not app: logger.warning(f"App not found: {app_id}") - raise ValueError(f"应用不存在: {app_id}") + # raise ValueError(f"应用不存在: {app_id}") # TODO: temp fix for draft run # if not app.current_release_id: # logger.warning(f"No current release for app: {app_id}") @@ -1252,17 +1252,15 @@ def get_end_user_connected_config(end_user_id: str, db: Session) -> Dict[str, An memory_config_service = MemoryConfigService(db) memory_config = memory_config_service.get_config_with_fallback( memory_config_id=memory_config_id_to_use, - workspace_id=app.workspace_id + workspace_id=end_user.workspace_id ) memory_config_id = str(memory_config.config_id) if memory_config else None result = { "end_user_id": str(end_user_id), - "app_id": str(app_id), - "release_id": str(app.current_release_id) if app.current_release_id else None, "memory_config_id": memory_config_id, - "workspace_id": str(app.workspace_id) + "workspace_id": str(end_user.workspace_id) } logger.info( diff --git a/api/app/services/memory_api_service.py b/api/app/services/memory_api_service.py index f86fbed8..01bc6267 100644 --- a/api/app/services/memory_api_service.py +++ b/api/app/services/memory_api_service.py @@ -84,43 +84,65 @@ class MemoryAPIService: if not app: logger.warning(f"App not found for end_user: {end_user_id}") - raise ResourceNotFoundException( - resource_type="App", - resource_id=str(end_user.app_id) - ) - - if app.workspace_id != workspace_id: - logger.warning( - f"End user {end_user_id} belongs to workspace {app.workspace_id}, " - f"not authorized workspace {workspace_id}" - ) - raise BusinessException( - message="End user does not belong to authorized workspace", - code=BizCode.FORBIDDEN - ) + # raise ResourceNotFoundException( + # resource_type="App", + # resource_id=str(end_user.app_id) + # ) + # temporally allow any workspace to access + # if end_user.workspace_id != workspace_id: + # print(f"[DEBUG] end_user.workspace_id={end_user.workspace_id}, api_key.workspace_id={workspace_id}") + # logger.warning( + # f"End user {end_user_id} belongs to workspace {end_user.workspace_id}, " + # f"not authorized workspace {workspace_id}" + # ) + # raise BusinessException( + # message=f"End user does not belong to authorized workspace. end_user.workspace_id={end_user.workspace_id}, api_key.workspace_id={workspace_id}", + # code=BizCode.FORBIDDEN + # ) logger.info(f"End user {end_user_id} validated successfully") return end_user - + + def _update_end_user_config(self, end_user_id: str, config_id: str) -> None: + """Update the end user's memory_config_id. + + Silently updates the config association. Logs warnings on failure + but does not raise, so it won't block the main read/write operation. + + Args: + end_user_id: End user identifier + config_id: Memory configuration ID to assign + """ + try: + config_uuid = uuid.UUID(config_id) + from app.repositories.end_user_repository import EndUserRepository + end_user_repo = EndUserRepository(self.db) + end_user_repo.update_memory_config_id( + end_user_id=uuid.UUID(end_user_id), + memory_config_id=config_uuid, + ) + except Exception as e: + logger.warning(f"Failed to update memory_config_id for end_user {end_user_id}: {e}") + async def write_memory( self, workspace_id: uuid.UUID, end_user_id: str, message: str, - config_id: Optional[str] = None, + config_id: str, storage_type: str = "neo4j", user_rag_memory_id: Optional[str] = None, ) -> Dict[str, Any]: """Write memory with validation. - Validates end_user exists and belongs to workspace, then delegates - to MemoryAgentService.write_memory. + Validates end_user exists and belongs to workspace, updates the end user's + memory_config_id, then delegates to MemoryAgentService.write_memory. Args: workspace_id: Workspace ID for resource validation end_user_id: End user identifier (used as end_user_id) message: Message content to store - config_id: Optional memory configuration ID + config_id: Memory configuration ID (required) storage_type: Storage backend (neo4j or rag) user_rag_memory_id: Optional RAG memory ID @@ -136,7 +158,8 @@ class MemoryAPIService: # Validate end_user exists and belongs to workspace self.validate_end_user(end_user_id, workspace_id) - # Use end_user_id as end_user_id for memory operations + # Update end user's memory_config_id + self._update_end_user_config(end_user_id, config_id) try: # Delegate to MemoryAgentService @@ -188,21 +211,21 @@ class MemoryAPIService: end_user_id: str, message: str, search_switch: str = "0", - config_id: Optional[str] = None, + config_id: str = "", storage_type: str = "neo4j", user_rag_memory_id: Optional[str] = None, ) -> Dict[str, Any]: """Read memory with validation. - Validates end_user exists and belongs to workspace, then delegates - to MemoryAgentService.read_memory. + Validates end_user exists and belongs to workspace, updates the end user's + memory_config_id, then delegates to MemoryAgentService.read_memory. Args: workspace_id: Workspace ID for resource validation end_user_id: End user identifier (used as end_user_id) message: Query message search_switch: Search mode (0=deep search with verification, 1=deep search, 2=fast search) - config_id: Optional memory configuration ID + config_id: Memory configuration ID (required) storage_type: Storage backend (neo4j or rag) user_rag_memory_id: Optional RAG memory ID @@ -218,7 +241,8 @@ class MemoryAPIService: # Validate end_user exists and belongs to workspace self.validate_end_user(end_user_id, workspace_id) - # Use end_user_id as end_user_id for memory operations + # Update end user's memory_config_id + self._update_end_user_config(end_user_id, config_id) try: @@ -256,3 +280,50 @@ class MemoryAPIService: message=f"Memory read failed: {str(e)}", code=BizCode.MEMORY_READ_FAILED ) + + def list_memory_configs( + self, + workspace_id: uuid.UUID, + ) -> Dict[str, Any]: + """List all memory configs for a workspace. + + Args: + workspace_id: Workspace ID from API key authorization + + Returns: + Dict with configs list and total count + + Raises: + BusinessException: If listing fails + """ + logger.info(f"Listing memory configs for workspace: {workspace_id}") + + try: + from app.repositories.memory_config_repository import MemoryConfigRepository + + results = MemoryConfigRepository.get_all(self.db, workspace_id=workspace_id) + + configs = [] + for config, scene_name in results: + configs.append({ + "config_id": str(config.config_id), + "config_name": config.config_name, + "config_desc": config.config_desc, + "is_default": config.is_default or False, + "scene_name": scene_name, + "created_at": config.created_at.isoformat() if config.created_at else None, + "updated_at": config.updated_at.isoformat() if config.updated_at else None, + }) + + logger.info(f"Found {len(configs)} memory configs for workspace {workspace_id}") + return { + "configs": configs, + "total": len(configs), + } + + except Exception as e: + logger.error(f"Failed to list memory configs for workspace {workspace_id}: {e}") + raise BusinessException( + message=f"Failed to list memory configs: {str(e)}", + code=BizCode.MEMORY_READ_FAILED + )