From 5c836c90c94e46dde6bf98fe0c56482b0b716c81 Mon Sep 17 00:00:00 2001 From: miao <1468212639@qq.com> Date: Thu, 23 Apr 2026 12:05:31 +0800 Subject: [PATCH 1/5] feat(memory): add episodic memory pagination and semantic memory list API Split explicit memory overview into two independent endpoints: - GET /memory/explicit-memory/episodics: episodic memory paginated query with date range filter (millisecond timestamp) and episodic type filter using Neo4j datetime() for precise time comparison - GET /memory/explicit-memory/semantics: semantic memory full list query returns data as array directly Modified files: - api/app/controllers/memory_explicit_controller.py - api/app/services/memory_explicit_service.py --- .../controllers/memory_explicit_controller.py | 139 +++++++++++- api/app/services/memory_explicit_service.py | 206 +++++++++++++++++- 2 files changed, 342 insertions(+), 3 deletions(-) diff --git a/api/app/controllers/memory_explicit_controller.py b/api/app/controllers/memory_explicit_controller.py index c52f308c..90758dc7 100644 --- a/api/app/controllers/memory_explicit_controller.py +++ b/api/app/controllers/memory_explicit_controller.py @@ -4,7 +4,10 @@ 处理显性记忆相关的API接口,包括情景记忆和语义记忆的查询。 """ -from fastapi import APIRouter, Depends +from datetime import date +from typing import Optional + +from fastapi import APIRouter, Depends, Query from app.core.logging_config import get_api_logger from app.core.response_utils import success, fail @@ -69,6 +72,140 @@ async def get_explicit_memory_overview_api( return fail(BizCode.INTERNAL_ERROR, "显性记忆总览查询失败", str(e)) +@router.get("/episodics", response_model=ApiResponse) +async def get_episodic_memory_list_api( + end_user_id: str = Query(..., description="end user ID"), + page: int = Query(1, gt=0, description="page number, starting from 1"), + pagesize: int = Query(10, gt=0, le=100, description="number of items per page, max 100"), + start_date: Optional[int] = Query(None, description="start timestamp (ms)"), + end_date: Optional[int] = Query(None, description="end timestamp (ms)"), + episodic_type: str = Query("all", description="episodic type :all/conversation/project_work/learning/decision/important_event"), + current_user: User = Depends(get_current_user), +) -> dict: + """ + 获取情景记忆分页列表 + + 返回指定用户的情景记忆列表,支持分页、时间范围筛选和情景类型筛选。 + + Args: + end_user_id: 终端用户ID(必填) + page: 页码(从1开始,默认1) + pagesize: 每页数量(默认10,最大100) + start_date: 开始时间戳(可选,毫秒),自动扩展到当天 00:00:00 + end_date: 结束时间戳(可选,毫秒),自动扩展到当天 23:59:59 + episodic_type: 情景类型筛选(可选,默认all) + current_user: 当前用户 + + Returns: + ApiResponse: 包含情景记忆分页列表 + + Examples: + - 基础分页查询:GET /episodic-list?end_user_id=xxx&page=1&pagesize=5 + 返回第1页,每页5条数据 + - 按时间范围筛选:GET /episodic-list?end_user_id=xxx&page=1&pagesize=5&start_date=1738684800000&end_date=1738771199000 + 返回指定时间范围内的数据 + - 按情景类型筛选:GET /episodic-list?end_user_id=xxx&page=1&pagesize=5&episodic_type=important_event + 返回类型为"重要事件"的数据 + + Notes: + - start_date 和 end_date 必须同时提供或同时不提供 + - start_date 不能大于 end_date + - episodic_type 可选值:all, conversation, project_work, learning, decision, important_event + - total 为该用户情景记忆总数(不受筛选条件影响) + - page.total 为筛选后的总条数 + """ + workspace_id = current_user.current_workspace_id + + # 检查用户是否已选择工作空间 + if workspace_id is None: + api_logger.warning(f"用户 {current_user.username} 尝试查询情景记忆列表但未选择工作空间") + return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") + + api_logger.info( + f"情景记忆分页查询: end_user_id={end_user_id}, " + f"start_date={start_date}, end_date={end_date}, episodic_type={episodic_type}, " + f"page={page}, pagesize={pagesize}, username={current_user.username}" + ) + + # 1. 参数校验 + if page < 1 or pagesize < 1: + api_logger.warning(f"分页参数错误: page={page}, pagesize={pagesize}") + return fail(BizCode.INVALID_PARAMETER, "分页参数必须大于0") + + valid_episodic_types = ["all", "conversation", "project_work", "learning", "decision", "important_event"] + if episodic_type not in valid_episodic_types: + api_logger.warning(f"无效的情景类型参数: {episodic_type}") + return fail(BizCode.INVALID_PARAMETER, f"无效的情景类型参数,可选值:{', '.join(valid_episodic_types)}") + + # 时间戳参数校验 + if (start_date is not None and end_date is None) or (end_date is not None and start_date is None): + return fail(BizCode.INVALID_PARAMETER, "start_date和end_date必须同时提供") + + if start_date is not None and end_date is not None and start_date > end_date: + return fail(BizCode.INVALID_PARAMETER, "start_date不能大于end_date") + + # 2. 执行查询 + try: + result = await memory_explicit_service.get_episodic_memory_list( + end_user_id=end_user_id, + page=page, + pagesize=pagesize, + start_date=start_date, + end_date=end_date, + episodic_type=episodic_type, + ) + api_logger.info( + f"情景记忆分页查询成功: end_user_id={end_user_id}, " + f"total={result['total']}, 返回={len(result['items'])}条" + ) + except Exception as e: + api_logger.error(f"情景记忆分页查询失败: end_user_id={end_user_id}, error={str(e)}") + return fail(BizCode.INTERNAL_ERROR, "情景记忆分页查询失败", str(e)) + + # 3. 返回结构化响应 + return success(data=result, msg="查询成功") + +@router.get("/semantics", response_model=ApiResponse) +async def get_semantic_memory_list_api( + end_user_id: str = Query(..., description="终端用户ID"), + current_user: User = Depends(get_current_user), +) -> dict: + """ + 获取语义记忆列表 + + 返回指定用户的全量语义记忆列表。 + + Args: + end_user_id: 终端用户ID(必填) + current_user: 当前用户 + + Returns: + ApiResponse: 包含语义记忆全量列表 + """ + workspace_id = current_user.current_workspace_id + + if workspace_id is None: + api_logger.warning(f"用户 {current_user.username} 尝试查询语义记忆列表但未选择工作空间") + return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") + + api_logger.info( + f"语义记忆列表查询: end_user_id={end_user_id}, username={current_user.username}" + ) + + try: + result = await memory_explicit_service.get_semantic_memory_list( + end_user_id=end_user_id + ) + api_logger.info( + f"语义记忆列表查询成功: end_user_id={end_user_id}, total={len(result)}" + ) + except Exception as e: + api_logger.error(f"语义记忆列表查询失败: end_user_id={end_user_id}, error={str(e)}") + return fail(BizCode.INTERNAL_ERROR, "语义记忆列表查询失败", str(e)) + + return success(data=result, msg="查询成功") + + @router.post("/details", response_model=ApiResponse) async def get_explicit_memory_details_api( request: ExplicitMemoryDetailsRequest, diff --git a/api/app/services/memory_explicit_service.py b/api/app/services/memory_explicit_service.py index f8d39ae8..c5ab2e38 100644 --- a/api/app/services/memory_explicit_service.py +++ b/api/app/services/memory_explicit_service.py @@ -4,7 +4,8 @@ 处理显性记忆相关的业务逻辑,包括情景记忆和语义记忆的查询。 """ -from typing import Any, Dict +from datetime import date +from typing import Any, Dict, Optional from app.core.logging_config import get_logger from app.services.memory_base_service import MemoryBaseService @@ -104,7 +105,7 @@ class MemoryExplicitService(MemoryBaseService): e.description AS core_definition ORDER BY e.name ASC """ - + semantic_result = await self.neo4j_connector.execute_query( semantic_query, end_user_id=end_user_id @@ -146,6 +147,207 @@ class MemoryExplicitService(MemoryBaseService): logger.error(f"获取显性记忆总览时出错: {str(e)}", exc_info=True) raise + + async def get_episodic_memory_list( + self, + end_user_id: str, + page: int, + pagesize: int, + start_date: Optional[int] = None, + end_date: Optional[int] = None, + episodic_type: str = "all", + ) -> Dict[str, Any]: + """ + 获取情景记忆分页列表 + + Args: + end_user_id: 终端用户ID + page: 页码 + pagesize: 每页数量 + start_date: 开始时间戳(毫秒),可选 + end_date: 结束时间戳(毫秒),可选 + episodic_type: 情景类型筛选 + + Returns: + { + "total": int, # 该用户情景记忆总数(不受筛选影响) + "items": [...], # 当前页数据 + "page": { + "page": int, + "pagesize": int, + "total": int, # 筛选后总数 + "hasnext": bool + } + } + """ + try: + logger.info( + f"情景记忆分页查询: end_user_id={end_user_id}, " + f"start_date={start_date}, end_date={end_date}, " + f"episodic_type={episodic_type}, page={page}, pagesize={pagesize}" + ) + + # 1. 查询情景记忆总数(不受筛选条件限制) + total_all_query = """ + MATCH (s:MemorySummary) + WHERE s.end_user_id = $end_user_id + RETURN count(s) AS total + """ + total_all_result = await self.neo4j_connector.execute_query( + total_all_query, end_user_id=end_user_id + ) + total_all = total_all_result[0]["total"] if total_all_result else 0 + + # 2. 构建筛选条件 + where_clauses = ["s.end_user_id = $end_user_id"] + params = {"end_user_id": end_user_id} + + # 时间戳筛选(毫秒时间戳转为 ISO 字符串,使用 Neo4j datetime() 精确比较) + if start_date is not None and end_date is not None: + from datetime import datetime + start_dt = datetime.fromtimestamp(start_date / 1000) + end_dt = datetime.fromtimestamp(end_date / 1000) + # 开始时间取当天 00:00:00,结束时间取当天 23:59:59.999999 + start_iso = start_dt.strftime("%Y-%m-%dT") + "00:00:00.000000" + end_iso = end_dt.strftime("%Y-%m-%dT") + "23:59:59.999999" + + where_clauses.append("datetime(s.created_at) >= datetime($start_iso) AND datetime(s.created_at) <= datetime($end_iso)") + params["start_iso"] = start_iso + params["end_iso"] = end_iso + + # 类型筛选下推到 Cypher(兼容中英文) + if episodic_type != "all": + type_mapping = { + "conversation": "对话", + "project_work": "项目/工作", + "learning": "学习", + "decision": "决策", + "important_event": "重要事件" + } + chinese_type = type_mapping.get(episodic_type) + if chinese_type: + where_clauses.append( + "(s.memory_type = $episodic_type OR s.memory_type = $chinese_type)" + ) + params["episodic_type"] = episodic_type + params["chinese_type"] = chinese_type + else: + where_clauses.append("s.memory_type = $episodic_type") + params["episodic_type"] = episodic_type + + where_str = " AND ".join(where_clauses) + + # 3. 查询筛选后的总数 + count_query = f""" + MATCH (s:MemorySummary) + WHERE {where_str} + RETURN count(s) AS total + """ + count_result = await self.neo4j_connector.execute_query(count_query, **params) + filtered_total = count_result[0]["total"] if count_result else 0 + + # 4. 查询分页数据 + skip = (page - 1) * pagesize + data_query = f""" + MATCH (s:MemorySummary) + WHERE {where_str} + RETURN elementId(s) AS id, + s.name AS title, + s.memory_type AS memory_type, + s.content AS content, + s.created_at AS created_at + ORDER BY s.created_at DESC + SKIP {skip} LIMIT {pagesize} + """ + + result = await self.neo4j_connector.execute_query(data_query, **params) + + # 5. 处理结果 + items = [] + if result: + for record in result: + raw_created_at = record.get("created_at") + created_at_timestamp = self.parse_timestamp(raw_created_at) + items.append({ + "id": record["id"], + "title": record.get("title") or "未命名", + "memory_type": record.get("memory_type") or "其他", + "content": record.get("content") or "", + "created_at": created_at_timestamp + }) + + # 6. 构建返回结果 + return { + "total": total_all, + "items": items, + "page": { + "page": page, + "pagesize": pagesize, + "total": filtered_total, + "hasnext": (page * pagesize) < filtered_total + } + } + + except Exception as e: + logger.error(f"情景记忆分页查询出错: {str(e)}", exc_info=True) + raise + + async def get_semantic_memory_list( + self, + end_user_id: str + ) -> list: + """ + 获取语义记忆全量列表 + + Args: + end_user_id: 终端用户ID + + Returns: + [ + { + "id": str, + "name": str, + "entity_type": str, + "core_definition": str + } + ] + """ + try: + logger.info(f"语义记忆列表查询: end_user_id={end_user_id}") + + semantic_query = """ + MATCH (e:ExtractedEntity) + WHERE e.end_user_id = $end_user_id + AND e.is_explicit_memory = true + RETURN elementId(e) AS id, + e.name AS name, + e.entity_type AS entity_type, + e.description AS core_definition + ORDER BY e.name ASC + """ + + result = await self.neo4j_connector.execute_query( + semantic_query, end_user_id=end_user_id + ) + + items = [] + if result: + for record in result: + items.append({ + "id": record["id"], + "name": record.get("name") or "未命名", + "entity_type": record.get("entity_type") or "未分类", + "core_definition": record.get("core_definition") or "" + }) + + logger.info(f"语义记忆列表查询成功: end_user_id={end_user_id}, total={len(items)}") + + return items + + except Exception as e: + logger.error(f"语义记忆列表查询出错: {str(e)}", exc_info=True) + raise + async def get_explicit_memory_details( self, end_user_id: str, From bf9a3503de19ec3291b423cbf4d6d80648eff6cd Mon Sep 17 00:00:00 2001 From: miao <1468212639@qq.com> Date: Thu, 23 Apr 2026 15:23:39 +0800 Subject: [PATCH 2/5] feat(memory-api): add memory detail external service APIs Add external service APIs for memory detail queries Provides memory data access endpoints for external service integration Add utility functions for API key user resolution and end_user validation Modified files: - api/app/controllers/service/user_memory_api_controller.py - api/app/core/api_key_utils.py - api/app/controllers/service/__init__.py --- api/app/controllers/service/__init__.py | 2 + .../service/user_memory_api_controller.py | 197 ++++++++++++++++++ api/app/core/api_key_utils.py | 66 ++++++ 3 files changed, 265 insertions(+) create mode 100644 api/app/controllers/service/user_memory_api_controller.py diff --git a/api/app/controllers/service/__init__.py b/api/app/controllers/service/__init__.py index 52d4b732..850b496d 100644 --- a/api/app/controllers/service/__init__.py +++ b/api/app/controllers/service/__init__.py @@ -14,6 +14,7 @@ from . import ( rag_api_document_controller, rag_api_file_controller, rag_api_knowledge_controller, + user_memory_api_controller, ) # 创建 V1 API 路由器 @@ -28,5 +29,6 @@ service_router.include_router(rag_api_chunk_controller.router) service_router.include_router(memory_api_controller.router) service_router.include_router(end_user_api_controller.router) service_router.include_router(memory_config_api_controller.router) +service_router.include_router(user_memory_api_controller.router) __all__ = ["service_router"] diff --git a/api/app/controllers/service/user_memory_api_controller.py b/api/app/controllers/service/user_memory_api_controller.py new file mode 100644 index 00000000..6d2498c0 --- /dev/null +++ b/api/app/controllers/service/user_memory_api_controller.py @@ -0,0 +1,197 @@ +"""User Memory 服务接口 — 基于 API Key 认证 + +包装 user_memory_controllers.py 和 memory_agent_controller.py 中的内部接口, +提供基于 API Key 认证的对外服务: +1./analytics/graph_data - 知识图谱数据接口 +2./analytics/community_graph - 社区图谱接口 +3./analytics/node_statistics - 记忆节点统计接口 +4./analytics/user_summary - 用户摘要接口 +5./analytics/memory_insight - 记忆洞察接口 +6./analytics/interest_distribution - 兴趣分布接口 +7./analytics/end_user_info - 终端用户信息接口 + + +路由前缀: /memory +子路径: /analytics/... +最终路径: /v1/memory/analytics/... +认证方式: API Key (@require_api_key) +""" + +from typing import Optional + +from fastapi import APIRouter, Depends, Header, Query, Request +from sqlalchemy.orm import Session + +from app.core.api_key_auth import require_api_key +from app.core.api_key_utils import get_current_user_from_api_key, validate_end_user_in_workspace +from app.core.logging_config import get_business_logger +from app.db import get_db +from app.schemas.api_key_schema import ApiKeyAuth + +# 包装内部服务 controller +from app.controllers import user_memory_controllers, memory_agent_controller + +router = APIRouter(prefix="/memory", tags=["V1 - User Memory API"]) +logger = get_business_logger() + + +# ==================== 知识图谱 ==================== + + +@router.get("/analytics/graph_data") +@require_api_key(scopes=["memory"]) +async def get_graph_data( + request: Request, + end_user_id: str = Query(..., description="End user ID"), + node_types: Optional[str] = Query(None, description="Comma-separated node types filter"), + limit: int = Query(100, description="Max nodes to return, capped at 1000"), + depth: int = Query(1, description="Graph traversal depth, capped at 3"), + center_node_id: Optional[str] = Query(None, description="Center node for subgraph"), + api_key_auth: ApiKeyAuth = None, + db: Session = Depends(get_db), +): + """Get knowledge graph data (nodes + edges) for an end user.""" + current_user = get_current_user_from_api_key(db, api_key_auth) + validate_end_user_in_workspace(db, end_user_id, api_key_auth.workspace_id) + + return await user_memory_controllers.get_graph_data_api( + end_user_id=end_user_id, + node_types=node_types, + limit=limit, + depth=depth, + center_node_id=center_node_id, + current_user=current_user, + db=db, + ) + + +@router.get("/analytics/community_graph") +@require_api_key(scopes=["memory"]) +async def get_community_graph( + request: Request, + end_user_id: str = Query(..., description="End user ID"), + api_key_auth: ApiKeyAuth = None, + db: Session = Depends(get_db), +): + """Get community clustering graph for an end user.""" + current_user = get_current_user_from_api_key(db, api_key_auth) + validate_end_user_in_workspace(db, end_user_id, api_key_auth.workspace_id) + + return await user_memory_controllers.get_community_graph_data_api( + end_user_id=end_user_id, + current_user=current_user, + db=db, + ) + + +# ==================== 节点统计 ==================== + + +@router.get("/analytics/node_statistics") +@require_api_key(scopes=["memory"]) +async def get_node_statistics( + request: Request, + end_user_id: str = Query(..., description="End user ID"), + api_key_auth: ApiKeyAuth = None, + db: Session = Depends(get_db), +): + """Get memory node type statistics for an end user.""" + current_user = get_current_user_from_api_key(db, api_key_auth) + validate_end_user_in_workspace(db, end_user_id, api_key_auth.workspace_id) + + return await user_memory_controllers.get_node_statistics_api( + end_user_id=end_user_id, + current_user=current_user, + db=db, + ) + + +# ==================== 用户摘要 & 洞察 ==================== + + +@router.get("/analytics/user_summary") +@require_api_key(scopes=["memory"]) +async def get_user_summary( + request: Request, + end_user_id: str = Query(..., description="End user ID"), + language_type: str = Header(default=None, alias="X-Language-Type"), + api_key_auth: ApiKeyAuth = None, + db: Session = Depends(get_db), +): + """Get cached user summary for an end user.""" + current_user = get_current_user_from_api_key(db, api_key_auth) + validate_end_user_in_workspace(db, end_user_id, api_key_auth.workspace_id) + + return await user_memory_controllers.get_user_summary_api( + end_user_id=end_user_id, + language_type=language_type, + current_user=current_user, + db=db, + ) + + +@router.get("/analytics/memory_insight") +@require_api_key(scopes=["memory"]) +async def get_memory_insight( + request: Request, + end_user_id: str = Query(..., description="End user ID"), + api_key_auth: ApiKeyAuth = None, + db: Session = Depends(get_db), +): + """Get cached memory insight report for an end user.""" + current_user = get_current_user_from_api_key(db, api_key_auth) + validate_end_user_in_workspace(db, end_user_id, api_key_auth.workspace_id) + + return await user_memory_controllers.get_memory_insight_report_api( + end_user_id=end_user_id, + current_user=current_user, + db=db, + ) + + +# ==================== 兴趣分布 ==================== + + +@router.get("/analytics/interest_distribution") +@require_api_key(scopes=["memory"]) +async def get_interest_distribution( + request: Request, + end_user_id: str = Query(..., description="End user ID"), + limit: int = Query(5, le=5, description="Max interest tags to return"), + language_type: str = Header(default=None, alias="X-Language-Type"), + api_key_auth: ApiKeyAuth = None, + db: Session = Depends(get_db), +): + """Get interest distribution tags for an end user.""" + current_user = get_current_user_from_api_key(db, api_key_auth) + validate_end_user_in_workspace(db, end_user_id, api_key_auth.workspace_id) + + return await memory_agent_controller.get_interest_distribution_by_user_api( + end_user_id=end_user_id, + limit=limit, + language_type=language_type, + current_user=current_user, + db=db, + ) + + +# ==================== 终端用户信息 ==================== + + +@router.get("/analytics/end_user_info") +@require_api_key(scopes=["memory"]) +async def get_end_user_info( + request: Request, + end_user_id: str = Query(..., description="End user ID"), + api_key_auth: ApiKeyAuth = None, + db: Session = Depends(get_db), +): + """Get end user basic information (name, aliases, metadata).""" + current_user = get_current_user_from_api_key(db, api_key_auth) + validate_end_user_in_workspace(db, end_user_id, api_key_auth.workspace_id) + + return await user_memory_controllers.get_end_user_info( + end_user_id=end_user_id, + current_user=current_user, + db=db, + ) \ No newline at end of file diff --git a/api/app/core/api_key_utils.py b/api/app/core/api_key_utils.py index fb6b9552..290630ce 100644 --- a/api/app/core/api_key_utils.py +++ b/api/app/core/api_key_utils.py @@ -3,6 +3,12 @@ import secrets from typing import Optional, Union from datetime import datetime +from sqlalchemy.orm import Session as _Session +from app.core.error_codes import BizCode as _BizCode +from app.core.exceptions import BusinessException as _BusinessException +from app.models.end_user_model import EndUser as _EndUser +from app.repositories.end_user_repository import EndUserRepository as _EndUserRepository + from app.models.api_key_model import ApiKeyType from fastapi import Response from fastapi.responses import JSONResponse @@ -65,3 +71,63 @@ def datetime_to_timestamp(dt: Optional[datetime]) -> Optional[int]: return None return int(dt.timestamp() * 1000) + + +def get_current_user_from_api_key(db: _Session, api_key_auth): + """通过 API Key 构造 current_user 对象。 + + 从 API Key 反查创建者(管理员用户),并设置其 workspace 上下文。 + 与内部接口的 Depends(get_current_user) (JWT) 等价。 + + Args: + db: 数据库会话 + api_key_auth: API Key 认证信息(ApiKeyAuth) + + Returns: + User ORM 对象,已设置 current_workspace_id + """ + from app.services import api_key_service + + api_key = api_key_service.ApiKeyService.get_api_key( + db, api_key_auth.api_key_id, api_key_auth.workspace_id + ) + current_user = api_key.creator + current_user.current_workspace_id = api_key_auth.workspace_id + return current_user + + +def validate_end_user_in_workspace( + db: _Session, + end_user_id: str, + workspace_id, +) -> _EndUser: + """校验 end_user 是否存在且属于指定 workspace。 + + Args: + db: 数据库会话 + end_user_id: 终端用户 ID + workspace_id: 工作空间 ID(UUID 或字符串均可) + + Returns: + EndUser ORM 对象(校验通过时) + + Raises: + BusinessException(USER_NOT_FOUND): end_user 不存在 + BusinessException(PERMISSION_DENIED): end_user 不属于该 workspace + """ + end_user_repo = _EndUserRepository(db) + end_user = end_user_repo.get_end_user_by_id(end_user_id) + + if end_user is None: + raise _BusinessException( + "End user not found", + _BizCode.USER_NOT_FOUND, + ) + + if str(end_user.workspace_id) != str(workspace_id): + raise _BusinessException( + "End user does not belong to this workspace", + _BizCode.PERMISSION_DENIED, + ) + + return end_user \ No newline at end of file From aac89b172fe0aa7ba66ccb449ba8ba7cc67f060b Mon Sep 17 00:00:00 2001 From: miao <1468212639@qq.com> Date: Thu, 23 Apr 2026 15:26:49 +0800 Subject: [PATCH 3/5] fix(memory): remove unused date import and fix docstring route paths Remove unused rom datetime import date in controller and service Fix Examples route paths from /episodic-list to /episodics to match actual router --- api/app/controllers/memory_explicit_controller.py | 7 +++---- api/app/services/memory_explicit_service.py | 1 - 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/api/app/controllers/memory_explicit_controller.py b/api/app/controllers/memory_explicit_controller.py index 90758dc7..88877de3 100644 --- a/api/app/controllers/memory_explicit_controller.py +++ b/api/app/controllers/memory_explicit_controller.py @@ -4,7 +4,6 @@ 处理显性记忆相关的API接口,包括情景记忆和语义记忆的查询。 """ -from datetime import date from typing import Optional from fastapi import APIRouter, Depends, Query @@ -100,11 +99,11 @@ async def get_episodic_memory_list_api( ApiResponse: 包含情景记忆分页列表 Examples: - - 基础分页查询:GET /episodic-list?end_user_id=xxx&page=1&pagesize=5 + - 基础分页查询:GET /episodics?end_user_id=xxx&page=1&pagesize=5 返回第1页,每页5条数据 - - 按时间范围筛选:GET /episodic-list?end_user_id=xxx&page=1&pagesize=5&start_date=1738684800000&end_date=1738771199000 + - 按时间范围筛选:GET /episodics?end_user_id=xxx&page=1&pagesize=5&start_date=1738684800000&end_date=1738771199000 返回指定时间范围内的数据 - - 按情景类型筛选:GET /episodic-list?end_user_id=xxx&page=1&pagesize=5&episodic_type=important_event + - 按情景类型筛选:GET /episodics?end_user_id=xxx&page=1&pagesize=5&episodic_type=important_event 返回类型为"重要事件"的数据 Notes: diff --git a/api/app/services/memory_explicit_service.py b/api/app/services/memory_explicit_service.py index c5ab2e38..8da3b167 100644 --- a/api/app/services/memory_explicit_service.py +++ b/api/app/services/memory_explicit_service.py @@ -4,7 +4,6 @@ 处理显性记忆相关的业务逻辑,包括情景记忆和语义记忆的查询。 """ -from datetime import date from typing import Any, Dict, Optional from app.core.logging_config import get_logger From 7ac0eff0b89c1bbec4ef0b1c1e52031da1de62ad Mon Sep 17 00:00:00 2001 From: miao <1468212639@qq.com> Date: Thu, 23 Apr 2026 16:29:22 +0800 Subject: [PATCH 4/5] fix(memory): fix problems - Parameterize SKIP/LIMIT in Cypher query instead of f-string interpolation - Add UUID format validation in validate_end_user_in_workspace before DB query - Update limit/depth Query descriptions to clarify auto-cap behavior in service layer - Move uuid import to module level in api_key_utils.py Modified files: - api/app/services/memory_explicit_service.py - api/app/core/api_key_utils.py - api/app/controllers/service/user_memory_api_controller.py --- .../controllers/service/user_memory_api_controller.py | 4 ++-- api/app/core/api_key_utils.py | 10 ++++++++++ api/app/services/memory_explicit_service.py | 4 +++- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/api/app/controllers/service/user_memory_api_controller.py b/api/app/controllers/service/user_memory_api_controller.py index 6d2498c0..ecbdec50 100644 --- a/api/app/controllers/service/user_memory_api_controller.py +++ b/api/app/controllers/service/user_memory_api_controller.py @@ -44,8 +44,8 @@ async def get_graph_data( request: Request, end_user_id: str = Query(..., description="End user ID"), node_types: Optional[str] = Query(None, description="Comma-separated node types filter"), - limit: int = Query(100, description="Max nodes to return, capped at 1000"), - depth: int = Query(1, description="Graph traversal depth, capped at 3"), + limit: int = Query(100, description="Max nodes to return (auto-capped at 1000 in service layer)"), + depth: int = Query(1, description="Graph traversal depth (auto-capped at 3 in service layer)"), center_node_id: Optional[str] = Query(None, description="Center node for subgraph"), api_key_auth: ApiKeyAuth = None, db: Session = Depends(get_db), diff --git a/api/app/core/api_key_utils.py b/api/app/core/api_key_utils.py index 290630ce..7687d8af 100644 --- a/api/app/core/api_key_utils.py +++ b/api/app/core/api_key_utils.py @@ -1,5 +1,6 @@ """API Key 工具函数""" import secrets +import uuid as _uuid from typing import Optional, Union from datetime import datetime @@ -112,9 +113,18 @@ def validate_end_user_in_workspace( EndUser ORM 对象(校验通过时) Raises: + BusinessException(INVALID_PARAMETER): end_user_id 格式无效 BusinessException(USER_NOT_FOUND): end_user 不存在 BusinessException(PERMISSION_DENIED): end_user 不属于该 workspace """ + try: + _uuid.UUID(end_user_id) + except (ValueError, AttributeError): + raise _BusinessException( + f"Invalid end_user_id format: {end_user_id}", + _BizCode.INVALID_PARAMETER, + ) + end_user_repo = _EndUserRepository(db) end_user = end_user_repo.get_end_user_by_id(end_user_id) diff --git a/api/app/services/memory_explicit_service.py b/api/app/services/memory_explicit_service.py index 8da3b167..e640124e 100644 --- a/api/app/services/memory_explicit_service.py +++ b/api/app/services/memory_explicit_service.py @@ -256,8 +256,10 @@ class MemoryExplicitService(MemoryBaseService): s.content AS content, s.created_at AS created_at ORDER BY s.created_at DESC - SKIP {skip} LIMIT {pagesize} + SKIP $skip LIMIT $limit """ + params["skip"] = skip + params["limit"] = pagesize result = await self.neo4j_connector.execute_query(data_query, **params) From 4619b40d031c97308148419b55d2abcb43d84a70 Mon Sep 17 00:00:00 2001 From: miao <1468212639@qq.com> Date: Thu, 23 Apr 2026 19:32:13 +0800 Subject: [PATCH 5/5] =?UTF-8?q?fix(memory):=20fix=20timezone=20and=20add?= =?UTF-8?q?=20generate=5Fcache=20API=20endpoint=20=EF=BB=BF=20-=20Fix=20ep?= =?UTF-8?q?isodic=20memory=20time=20filter=20to=20use=20UTC=20(datetime.fr?= =?UTF-8?q?omtimestamp=20with=20tz=3Dtimezone.utc)=20=20=20to=20match=20Ne?= =?UTF-8?q?o4j=20stored=20UTC=20timestamps=20-=20Add=20POST=20/v1/memory/a?= =?UTF-8?q?nalytics/generate=5Fcache=20endpoint=20for=20cache=20generation?= =?UTF-8?q?=20via=20API=20Key=20=EF=BB=BF=20Modified=20files:=20-=20api/ap?= =?UTF-8?q?p/services/memory=5Fexplicit=5Fservice.py=20-=20api/app/control?= =?UTF-8?q?lers/service/user=5Fmemory=5Fapi=5Fcontroller.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/user_memory_api_controller.py | 37 ++++++++++++++++++- api/app/services/memory_explicit_service.py | 10 ++--- 2 files changed, 40 insertions(+), 7 deletions(-) diff --git a/api/app/controllers/service/user_memory_api_controller.py b/api/app/controllers/service/user_memory_api_controller.py index ecbdec50..19a3a92f 100644 --- a/api/app/controllers/service/user_memory_api_controller.py +++ b/api/app/controllers/service/user_memory_api_controller.py @@ -9,6 +9,7 @@ 5./analytics/memory_insight - 记忆洞察接口 6./analytics/interest_distribution - 兴趣分布接口 7./analytics/end_user_info - 终端用户信息接口 +8./analytics/generate_cache - 缓存生成接口 路由前缀: /memory @@ -19,7 +20,7 @@ from typing import Optional -from fastapi import APIRouter, Depends, Header, Query, Request +from fastapi import APIRouter, Depends, Header, Query, Request, Body from sqlalchemy.orm import Session from app.core.api_key_auth import require_api_key @@ -27,6 +28,7 @@ from app.core.api_key_utils import get_current_user_from_api_key, validate_end_u from app.core.logging_config import get_business_logger from app.db import get_db from app.schemas.api_key_schema import ApiKeyAuth +from app.schemas.memory_storage_schema import GenerateCacheRequest # 包装内部服务 controller from app.controllers import user_memory_controllers, memory_agent_controller @@ -194,4 +196,35 @@ async def get_end_user_info( end_user_id=end_user_id, current_user=current_user, db=db, - ) \ No newline at end of file + ) + + +# ==================== 缓存生成 ==================== + + +@router.post("/analytics/generate_cache") +@require_api_key(scopes=["memory"]) +async def generate_cache( + request: Request, + api_key_auth: ApiKeyAuth = None, + db: Session = Depends(get_db), + message: str = Body(None, description="Request body"), + language_type: str = Header(default=None, alias="X-Language-Type"), +): + """Trigger cache generation (user summary + memory insight) for an end user or all workspace users.""" + body = await request.json() + cache_request = GenerateCacheRequest(**body) + + current_user = get_current_user_from_api_key(db, api_key_auth) + + if cache_request.end_user_id: + validate_end_user_in_workspace(db, cache_request.end_user_id, api_key_auth.workspace_id) + + return await user_memory_controllers.generate_cache_api( + request=cache_request, + language_type=language_type, + current_user=current_user, + db=db, + ) + + diff --git a/api/app/services/memory_explicit_service.py b/api/app/services/memory_explicit_service.py index e640124e..4d9a5c2b 100644 --- a/api/app/services/memory_explicit_service.py +++ b/api/app/services/memory_explicit_service.py @@ -201,12 +201,12 @@ class MemoryExplicitService(MemoryBaseService): where_clauses = ["s.end_user_id = $end_user_id"] params = {"end_user_id": end_user_id} - # 时间戳筛选(毫秒时间戳转为 ISO 字符串,使用 Neo4j datetime() 精确比较) + # 时间戳筛选(毫秒时间戳转为 UTC ISO 字符串,使用 Neo4j datetime() 精确比较) if start_date is not None and end_date is not None: - from datetime import datetime - start_dt = datetime.fromtimestamp(start_date / 1000) - end_dt = datetime.fromtimestamp(end_date / 1000) - # 开始时间取当天 00:00:00,结束时间取当天 23:59:59.999999 + from datetime import datetime, timezone + start_dt = datetime.fromtimestamp(start_date / 1000, tz=timezone.utc) + end_dt = datetime.fromtimestamp(end_date / 1000, tz=timezone.utc) + # 开始时间取当天 UTC 00:00:00,结束时间取当天 UTC 23:59:59.999999 start_iso = start_dt.strftime("%Y-%m-%dT") + "00:00:00.000000" end_iso = end_dt.strftime("%Y-%m-%dT") + "23:59:59.999999"