From 5c836c90c94e46dde6bf98fe0c56482b0b716c81 Mon Sep 17 00:00:00 2001 From: miao <1468212639@qq.com> Date: Thu, 23 Apr 2026 12:05:31 +0800 Subject: [PATCH] feat(memory): add episodic memory pagination and semantic memory list API Split explicit memory overview into two independent endpoints: - GET /memory/explicit-memory/episodics: episodic memory paginated query with date range filter (millisecond timestamp) and episodic type filter using Neo4j datetime() for precise time comparison - GET /memory/explicit-memory/semantics: semantic memory full list query returns data as array directly Modified files: - api/app/controllers/memory_explicit_controller.py - api/app/services/memory_explicit_service.py --- .../controllers/memory_explicit_controller.py | 139 +++++++++++- api/app/services/memory_explicit_service.py | 206 +++++++++++++++++- 2 files changed, 342 insertions(+), 3 deletions(-) diff --git a/api/app/controllers/memory_explicit_controller.py b/api/app/controllers/memory_explicit_controller.py index c52f308c..90758dc7 100644 --- a/api/app/controllers/memory_explicit_controller.py +++ b/api/app/controllers/memory_explicit_controller.py @@ -4,7 +4,10 @@ 处理显性记忆相关的API接口,包括情景记忆和语义记忆的查询。 """ -from fastapi import APIRouter, Depends +from datetime import date +from typing import Optional + +from fastapi import APIRouter, Depends, Query from app.core.logging_config import get_api_logger from app.core.response_utils import success, fail @@ -69,6 +72,140 @@ async def get_explicit_memory_overview_api( return fail(BizCode.INTERNAL_ERROR, "显性记忆总览查询失败", str(e)) +@router.get("/episodics", response_model=ApiResponse) +async def get_episodic_memory_list_api( + end_user_id: str = Query(..., description="end user ID"), + page: int = Query(1, gt=0, description="page number, starting from 1"), + pagesize: int = Query(10, gt=0, le=100, description="number of items per page, max 100"), + start_date: Optional[int] = Query(None, description="start timestamp (ms)"), + end_date: Optional[int] = Query(None, description="end timestamp (ms)"), + episodic_type: str = Query("all", description="episodic type :all/conversation/project_work/learning/decision/important_event"), + current_user: User = Depends(get_current_user), +) -> dict: + """ + 获取情景记忆分页列表 + + 返回指定用户的情景记忆列表,支持分页、时间范围筛选和情景类型筛选。 + + Args: + end_user_id: 终端用户ID(必填) + page: 页码(从1开始,默认1) + pagesize: 每页数量(默认10,最大100) + start_date: 开始时间戳(可选,毫秒),自动扩展到当天 00:00:00 + end_date: 结束时间戳(可选,毫秒),自动扩展到当天 23:59:59 + episodic_type: 情景类型筛选(可选,默认all) + current_user: 当前用户 + + Returns: + ApiResponse: 包含情景记忆分页列表 + + Examples: + - 基础分页查询:GET /episodic-list?end_user_id=xxx&page=1&pagesize=5 + 返回第1页,每页5条数据 + - 按时间范围筛选:GET /episodic-list?end_user_id=xxx&page=1&pagesize=5&start_date=1738684800000&end_date=1738771199000 + 返回指定时间范围内的数据 + - 按情景类型筛选:GET /episodic-list?end_user_id=xxx&page=1&pagesize=5&episodic_type=important_event + 返回类型为"重要事件"的数据 + + Notes: + - start_date 和 end_date 必须同时提供或同时不提供 + - start_date 不能大于 end_date + - episodic_type 可选值:all, conversation, project_work, learning, decision, important_event + - total 为该用户情景记忆总数(不受筛选条件影响) + - page.total 为筛选后的总条数 + """ + workspace_id = current_user.current_workspace_id + + # 检查用户是否已选择工作空间 + if workspace_id is None: + api_logger.warning(f"用户 {current_user.username} 尝试查询情景记忆列表但未选择工作空间") + return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") + + api_logger.info( + f"情景记忆分页查询: end_user_id={end_user_id}, " + f"start_date={start_date}, end_date={end_date}, episodic_type={episodic_type}, " + f"page={page}, pagesize={pagesize}, username={current_user.username}" + ) + + # 1. 参数校验 + if page < 1 or pagesize < 1: + api_logger.warning(f"分页参数错误: page={page}, pagesize={pagesize}") + return fail(BizCode.INVALID_PARAMETER, "分页参数必须大于0") + + valid_episodic_types = ["all", "conversation", "project_work", "learning", "decision", "important_event"] + if episodic_type not in valid_episodic_types: + api_logger.warning(f"无效的情景类型参数: {episodic_type}") + return fail(BizCode.INVALID_PARAMETER, f"无效的情景类型参数,可选值:{', '.join(valid_episodic_types)}") + + # 时间戳参数校验 + if (start_date is not None and end_date is None) or (end_date is not None and start_date is None): + return fail(BizCode.INVALID_PARAMETER, "start_date和end_date必须同时提供") + + if start_date is not None and end_date is not None and start_date > end_date: + return fail(BizCode.INVALID_PARAMETER, "start_date不能大于end_date") + + # 2. 执行查询 + try: + result = await memory_explicit_service.get_episodic_memory_list( + end_user_id=end_user_id, + page=page, + pagesize=pagesize, + start_date=start_date, + end_date=end_date, + episodic_type=episodic_type, + ) + api_logger.info( + f"情景记忆分页查询成功: end_user_id={end_user_id}, " + f"total={result['total']}, 返回={len(result['items'])}条" + ) + except Exception as e: + api_logger.error(f"情景记忆分页查询失败: end_user_id={end_user_id}, error={str(e)}") + return fail(BizCode.INTERNAL_ERROR, "情景记忆分页查询失败", str(e)) + + # 3. 返回结构化响应 + return success(data=result, msg="查询成功") + +@router.get("/semantics", response_model=ApiResponse) +async def get_semantic_memory_list_api( + end_user_id: str = Query(..., description="终端用户ID"), + current_user: User = Depends(get_current_user), +) -> dict: + """ + 获取语义记忆列表 + + 返回指定用户的全量语义记忆列表。 + + Args: + end_user_id: 终端用户ID(必填) + current_user: 当前用户 + + Returns: + ApiResponse: 包含语义记忆全量列表 + """ + workspace_id = current_user.current_workspace_id + + if workspace_id is None: + api_logger.warning(f"用户 {current_user.username} 尝试查询语义记忆列表但未选择工作空间") + return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") + + api_logger.info( + f"语义记忆列表查询: end_user_id={end_user_id}, username={current_user.username}" + ) + + try: + result = await memory_explicit_service.get_semantic_memory_list( + end_user_id=end_user_id + ) + api_logger.info( + f"语义记忆列表查询成功: end_user_id={end_user_id}, total={len(result)}" + ) + except Exception as e: + api_logger.error(f"语义记忆列表查询失败: end_user_id={end_user_id}, error={str(e)}") + return fail(BizCode.INTERNAL_ERROR, "语义记忆列表查询失败", str(e)) + + return success(data=result, msg="查询成功") + + @router.post("/details", response_model=ApiResponse) async def get_explicit_memory_details_api( request: ExplicitMemoryDetailsRequest, diff --git a/api/app/services/memory_explicit_service.py b/api/app/services/memory_explicit_service.py index f8d39ae8..c5ab2e38 100644 --- a/api/app/services/memory_explicit_service.py +++ b/api/app/services/memory_explicit_service.py @@ -4,7 +4,8 @@ 处理显性记忆相关的业务逻辑,包括情景记忆和语义记忆的查询。 """ -from typing import Any, Dict +from datetime import date +from typing import Any, Dict, Optional from app.core.logging_config import get_logger from app.services.memory_base_service import MemoryBaseService @@ -104,7 +105,7 @@ class MemoryExplicitService(MemoryBaseService): e.description AS core_definition ORDER BY e.name ASC """ - + semantic_result = await self.neo4j_connector.execute_query( semantic_query, end_user_id=end_user_id @@ -146,6 +147,207 @@ class MemoryExplicitService(MemoryBaseService): logger.error(f"获取显性记忆总览时出错: {str(e)}", exc_info=True) raise + + async def get_episodic_memory_list( + self, + end_user_id: str, + page: int, + pagesize: int, + start_date: Optional[int] = None, + end_date: Optional[int] = None, + episodic_type: str = "all", + ) -> Dict[str, Any]: + """ + 获取情景记忆分页列表 + + Args: + end_user_id: 终端用户ID + page: 页码 + pagesize: 每页数量 + start_date: 开始时间戳(毫秒),可选 + end_date: 结束时间戳(毫秒),可选 + episodic_type: 情景类型筛选 + + Returns: + { + "total": int, # 该用户情景记忆总数(不受筛选影响) + "items": [...], # 当前页数据 + "page": { + "page": int, + "pagesize": int, + "total": int, # 筛选后总数 + "hasnext": bool + } + } + """ + try: + logger.info( + f"情景记忆分页查询: end_user_id={end_user_id}, " + f"start_date={start_date}, end_date={end_date}, " + f"episodic_type={episodic_type}, page={page}, pagesize={pagesize}" + ) + + # 1. 查询情景记忆总数(不受筛选条件限制) + total_all_query = """ + MATCH (s:MemorySummary) + WHERE s.end_user_id = $end_user_id + RETURN count(s) AS total + """ + total_all_result = await self.neo4j_connector.execute_query( + total_all_query, end_user_id=end_user_id + ) + total_all = total_all_result[0]["total"] if total_all_result else 0 + + # 2. 构建筛选条件 + where_clauses = ["s.end_user_id = $end_user_id"] + params = {"end_user_id": end_user_id} + + # 时间戳筛选(毫秒时间戳转为 ISO 字符串,使用 Neo4j datetime() 精确比较) + if start_date is not None and end_date is not None: + from datetime import datetime + start_dt = datetime.fromtimestamp(start_date / 1000) + end_dt = datetime.fromtimestamp(end_date / 1000) + # 开始时间取当天 00:00:00,结束时间取当天 23:59:59.999999 + start_iso = start_dt.strftime("%Y-%m-%dT") + "00:00:00.000000" + end_iso = end_dt.strftime("%Y-%m-%dT") + "23:59:59.999999" + + where_clauses.append("datetime(s.created_at) >= datetime($start_iso) AND datetime(s.created_at) <= datetime($end_iso)") + params["start_iso"] = start_iso + params["end_iso"] = end_iso + + # 类型筛选下推到 Cypher(兼容中英文) + if episodic_type != "all": + type_mapping = { + "conversation": "对话", + "project_work": "项目/工作", + "learning": "学习", + "decision": "决策", + "important_event": "重要事件" + } + chinese_type = type_mapping.get(episodic_type) + if chinese_type: + where_clauses.append( + "(s.memory_type = $episodic_type OR s.memory_type = $chinese_type)" + ) + params["episodic_type"] = episodic_type + params["chinese_type"] = chinese_type + else: + where_clauses.append("s.memory_type = $episodic_type") + params["episodic_type"] = episodic_type + + where_str = " AND ".join(where_clauses) + + # 3. 查询筛选后的总数 + count_query = f""" + MATCH (s:MemorySummary) + WHERE {where_str} + RETURN count(s) AS total + """ + count_result = await self.neo4j_connector.execute_query(count_query, **params) + filtered_total = count_result[0]["total"] if count_result else 0 + + # 4. 查询分页数据 + skip = (page - 1) * pagesize + data_query = f""" + MATCH (s:MemorySummary) + WHERE {where_str} + RETURN elementId(s) AS id, + s.name AS title, + s.memory_type AS memory_type, + s.content AS content, + s.created_at AS created_at + ORDER BY s.created_at DESC + SKIP {skip} LIMIT {pagesize} + """ + + result = await self.neo4j_connector.execute_query(data_query, **params) + + # 5. 处理结果 + items = [] + if result: + for record in result: + raw_created_at = record.get("created_at") + created_at_timestamp = self.parse_timestamp(raw_created_at) + items.append({ + "id": record["id"], + "title": record.get("title") or "未命名", + "memory_type": record.get("memory_type") or "其他", + "content": record.get("content") or "", + "created_at": created_at_timestamp + }) + + # 6. 构建返回结果 + return { + "total": total_all, + "items": items, + "page": { + "page": page, + "pagesize": pagesize, + "total": filtered_total, + "hasnext": (page * pagesize) < filtered_total + } + } + + except Exception as e: + logger.error(f"情景记忆分页查询出错: {str(e)}", exc_info=True) + raise + + async def get_semantic_memory_list( + self, + end_user_id: str + ) -> list: + """ + 获取语义记忆全量列表 + + Args: + end_user_id: 终端用户ID + + Returns: + [ + { + "id": str, + "name": str, + "entity_type": str, + "core_definition": str + } + ] + """ + try: + logger.info(f"语义记忆列表查询: end_user_id={end_user_id}") + + semantic_query = """ + MATCH (e:ExtractedEntity) + WHERE e.end_user_id = $end_user_id + AND e.is_explicit_memory = true + RETURN elementId(e) AS id, + e.name AS name, + e.entity_type AS entity_type, + e.description AS core_definition + ORDER BY e.name ASC + """ + + result = await self.neo4j_connector.execute_query( + semantic_query, end_user_id=end_user_id + ) + + items = [] + if result: + for record in result: + items.append({ + "id": record["id"], + "name": record.get("name") or "未命名", + "entity_type": record.get("entity_type") or "未分类", + "core_definition": record.get("core_definition") or "" + }) + + logger.info(f"语义记忆列表查询成功: end_user_id={end_user_id}, total={len(items)}") + + return items + + except Exception as e: + logger.error(f"语义记忆列表查询出错: {str(e)}", exc_info=True) + raise + async def get_explicit_memory_details( self, end_user_id: str,