feat(memory): add episodic memory pagination and semantic memory list API
Split explicit memory overview into two independent endpoints: - GET /memory/explicit-memory/episodics: episodic memory paginated query with date range filter (millisecond timestamp) and episodic type filter using Neo4j datetime() for precise time comparison - GET /memory/explicit-memory/semantics: semantic memory full list query returns data as array directly Modified files: - api/app/controllers/memory_explicit_controller.py - api/app/services/memory_explicit_service.py
This commit is contained in:
@@ -4,7 +4,10 @@
|
||||
处理显性记忆相关的API接口,包括情景记忆和语义记忆的查询。
|
||||
"""
|
||||
|
||||
from fastapi import APIRouter, Depends
|
||||
from datetime import date
|
||||
from typing import Optional
|
||||
|
||||
from fastapi import APIRouter, Depends, Query
|
||||
|
||||
from app.core.logging_config import get_api_logger
|
||||
from app.core.response_utils import success, fail
|
||||
@@ -69,6 +72,140 @@ async def get_explicit_memory_overview_api(
|
||||
return fail(BizCode.INTERNAL_ERROR, "显性记忆总览查询失败", str(e))
|
||||
|
||||
|
||||
@router.get("/episodics", response_model=ApiResponse)
|
||||
async def get_episodic_memory_list_api(
|
||||
end_user_id: str = Query(..., description="end user ID"),
|
||||
page: int = Query(1, gt=0, description="page number, starting from 1"),
|
||||
pagesize: int = Query(10, gt=0, le=100, description="number of items per page, max 100"),
|
||||
start_date: Optional[int] = Query(None, description="start timestamp (ms)"),
|
||||
end_date: Optional[int] = Query(None, description="end timestamp (ms)"),
|
||||
episodic_type: str = Query("all", description="episodic type :all/conversation/project_work/learning/decision/important_event"),
|
||||
current_user: User = Depends(get_current_user),
|
||||
) -> dict:
|
||||
"""
|
||||
获取情景记忆分页列表
|
||||
|
||||
返回指定用户的情景记忆列表,支持分页、时间范围筛选和情景类型筛选。
|
||||
|
||||
Args:
|
||||
end_user_id: 终端用户ID(必填)
|
||||
page: 页码(从1开始,默认1)
|
||||
pagesize: 每页数量(默认10,最大100)
|
||||
start_date: 开始时间戳(可选,毫秒),自动扩展到当天 00:00:00
|
||||
end_date: 结束时间戳(可选,毫秒),自动扩展到当天 23:59:59
|
||||
episodic_type: 情景类型筛选(可选,默认all)
|
||||
current_user: 当前用户
|
||||
|
||||
Returns:
|
||||
ApiResponse: 包含情景记忆分页列表
|
||||
|
||||
Examples:
|
||||
- 基础分页查询:GET /episodic-list?end_user_id=xxx&page=1&pagesize=5
|
||||
返回第1页,每页5条数据
|
||||
- 按时间范围筛选:GET /episodic-list?end_user_id=xxx&page=1&pagesize=5&start_date=1738684800000&end_date=1738771199000
|
||||
返回指定时间范围内的数据
|
||||
- 按情景类型筛选:GET /episodic-list?end_user_id=xxx&page=1&pagesize=5&episodic_type=important_event
|
||||
返回类型为"重要事件"的数据
|
||||
|
||||
Notes:
|
||||
- start_date 和 end_date 必须同时提供或同时不提供
|
||||
- start_date 不能大于 end_date
|
||||
- episodic_type 可选值:all, conversation, project_work, learning, decision, important_event
|
||||
- total 为该用户情景记忆总数(不受筛选条件影响)
|
||||
- page.total 为筛选后的总条数
|
||||
"""
|
||||
workspace_id = current_user.current_workspace_id
|
||||
|
||||
# 检查用户是否已选择工作空间
|
||||
if workspace_id is None:
|
||||
api_logger.warning(f"用户 {current_user.username} 尝试查询情景记忆列表但未选择工作空间")
|
||||
return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None")
|
||||
|
||||
api_logger.info(
|
||||
f"情景记忆分页查询: end_user_id={end_user_id}, "
|
||||
f"start_date={start_date}, end_date={end_date}, episodic_type={episodic_type}, "
|
||||
f"page={page}, pagesize={pagesize}, username={current_user.username}"
|
||||
)
|
||||
|
||||
# 1. 参数校验
|
||||
if page < 1 or pagesize < 1:
|
||||
api_logger.warning(f"分页参数错误: page={page}, pagesize={pagesize}")
|
||||
return fail(BizCode.INVALID_PARAMETER, "分页参数必须大于0")
|
||||
|
||||
valid_episodic_types = ["all", "conversation", "project_work", "learning", "decision", "important_event"]
|
||||
if episodic_type not in valid_episodic_types:
|
||||
api_logger.warning(f"无效的情景类型参数: {episodic_type}")
|
||||
return fail(BizCode.INVALID_PARAMETER, f"无效的情景类型参数,可选值:{', '.join(valid_episodic_types)}")
|
||||
|
||||
# 时间戳参数校验
|
||||
if (start_date is not None and end_date is None) or (end_date is not None and start_date is None):
|
||||
return fail(BizCode.INVALID_PARAMETER, "start_date和end_date必须同时提供")
|
||||
|
||||
if start_date is not None and end_date is not None and start_date > end_date:
|
||||
return fail(BizCode.INVALID_PARAMETER, "start_date不能大于end_date")
|
||||
|
||||
# 2. 执行查询
|
||||
try:
|
||||
result = await memory_explicit_service.get_episodic_memory_list(
|
||||
end_user_id=end_user_id,
|
||||
page=page,
|
||||
pagesize=pagesize,
|
||||
start_date=start_date,
|
||||
end_date=end_date,
|
||||
episodic_type=episodic_type,
|
||||
)
|
||||
api_logger.info(
|
||||
f"情景记忆分页查询成功: end_user_id={end_user_id}, "
|
||||
f"total={result['total']}, 返回={len(result['items'])}条"
|
||||
)
|
||||
except Exception as e:
|
||||
api_logger.error(f"情景记忆分页查询失败: end_user_id={end_user_id}, error={str(e)}")
|
||||
return fail(BizCode.INTERNAL_ERROR, "情景记忆分页查询失败", str(e))
|
||||
|
||||
# 3. 返回结构化响应
|
||||
return success(data=result, msg="查询成功")
|
||||
|
||||
@router.get("/semantics", response_model=ApiResponse)
|
||||
async def get_semantic_memory_list_api(
|
||||
end_user_id: str = Query(..., description="终端用户ID"),
|
||||
current_user: User = Depends(get_current_user),
|
||||
) -> dict:
|
||||
"""
|
||||
获取语义记忆列表
|
||||
|
||||
返回指定用户的全量语义记忆列表。
|
||||
|
||||
Args:
|
||||
end_user_id: 终端用户ID(必填)
|
||||
current_user: 当前用户
|
||||
|
||||
Returns:
|
||||
ApiResponse: 包含语义记忆全量列表
|
||||
"""
|
||||
workspace_id = current_user.current_workspace_id
|
||||
|
||||
if workspace_id is None:
|
||||
api_logger.warning(f"用户 {current_user.username} 尝试查询语义记忆列表但未选择工作空间")
|
||||
return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None")
|
||||
|
||||
api_logger.info(
|
||||
f"语义记忆列表查询: end_user_id={end_user_id}, username={current_user.username}"
|
||||
)
|
||||
|
||||
try:
|
||||
result = await memory_explicit_service.get_semantic_memory_list(
|
||||
end_user_id=end_user_id
|
||||
)
|
||||
api_logger.info(
|
||||
f"语义记忆列表查询成功: end_user_id={end_user_id}, total={len(result)}"
|
||||
)
|
||||
except Exception as e:
|
||||
api_logger.error(f"语义记忆列表查询失败: end_user_id={end_user_id}, error={str(e)}")
|
||||
return fail(BizCode.INTERNAL_ERROR, "语义记忆列表查询失败", str(e))
|
||||
|
||||
return success(data=result, msg="查询成功")
|
||||
|
||||
|
||||
@router.post("/details", response_model=ApiResponse)
|
||||
async def get_explicit_memory_details_api(
|
||||
request: ExplicitMemoryDetailsRequest,
|
||||
|
||||
@@ -4,7 +4,8 @@
|
||||
处理显性记忆相关的业务逻辑,包括情景记忆和语义记忆的查询。
|
||||
"""
|
||||
|
||||
from typing import Any, Dict
|
||||
from datetime import date
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from app.core.logging_config import get_logger
|
||||
from app.services.memory_base_service import MemoryBaseService
|
||||
@@ -104,7 +105,7 @@ class MemoryExplicitService(MemoryBaseService):
|
||||
e.description AS core_definition
|
||||
ORDER BY e.name ASC
|
||||
"""
|
||||
|
||||
|
||||
semantic_result = await self.neo4j_connector.execute_query(
|
||||
semantic_query,
|
||||
end_user_id=end_user_id
|
||||
@@ -146,6 +147,207 @@ class MemoryExplicitService(MemoryBaseService):
|
||||
logger.error(f"获取显性记忆总览时出错: {str(e)}", exc_info=True)
|
||||
raise
|
||||
|
||||
|
||||
async def get_episodic_memory_list(
|
||||
self,
|
||||
end_user_id: str,
|
||||
page: int,
|
||||
pagesize: int,
|
||||
start_date: Optional[int] = None,
|
||||
end_date: Optional[int] = None,
|
||||
episodic_type: str = "all",
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
获取情景记忆分页列表
|
||||
|
||||
Args:
|
||||
end_user_id: 终端用户ID
|
||||
page: 页码
|
||||
pagesize: 每页数量
|
||||
start_date: 开始时间戳(毫秒),可选
|
||||
end_date: 结束时间戳(毫秒),可选
|
||||
episodic_type: 情景类型筛选
|
||||
|
||||
Returns:
|
||||
{
|
||||
"total": int, # 该用户情景记忆总数(不受筛选影响)
|
||||
"items": [...], # 当前页数据
|
||||
"page": {
|
||||
"page": int,
|
||||
"pagesize": int,
|
||||
"total": int, # 筛选后总数
|
||||
"hasnext": bool
|
||||
}
|
||||
}
|
||||
"""
|
||||
try:
|
||||
logger.info(
|
||||
f"情景记忆分页查询: end_user_id={end_user_id}, "
|
||||
f"start_date={start_date}, end_date={end_date}, "
|
||||
f"episodic_type={episodic_type}, page={page}, pagesize={pagesize}"
|
||||
)
|
||||
|
||||
# 1. 查询情景记忆总数(不受筛选条件限制)
|
||||
total_all_query = """
|
||||
MATCH (s:MemorySummary)
|
||||
WHERE s.end_user_id = $end_user_id
|
||||
RETURN count(s) AS total
|
||||
"""
|
||||
total_all_result = await self.neo4j_connector.execute_query(
|
||||
total_all_query, end_user_id=end_user_id
|
||||
)
|
||||
total_all = total_all_result[0]["total"] if total_all_result else 0
|
||||
|
||||
# 2. 构建筛选条件
|
||||
where_clauses = ["s.end_user_id = $end_user_id"]
|
||||
params = {"end_user_id": end_user_id}
|
||||
|
||||
# 时间戳筛选(毫秒时间戳转为 ISO 字符串,使用 Neo4j datetime() 精确比较)
|
||||
if start_date is not None and end_date is not None:
|
||||
from datetime import datetime
|
||||
start_dt = datetime.fromtimestamp(start_date / 1000)
|
||||
end_dt = datetime.fromtimestamp(end_date / 1000)
|
||||
# 开始时间取当天 00:00:00,结束时间取当天 23:59:59.999999
|
||||
start_iso = start_dt.strftime("%Y-%m-%dT") + "00:00:00.000000"
|
||||
end_iso = end_dt.strftime("%Y-%m-%dT") + "23:59:59.999999"
|
||||
|
||||
where_clauses.append("datetime(s.created_at) >= datetime($start_iso) AND datetime(s.created_at) <= datetime($end_iso)")
|
||||
params["start_iso"] = start_iso
|
||||
params["end_iso"] = end_iso
|
||||
|
||||
# 类型筛选下推到 Cypher(兼容中英文)
|
||||
if episodic_type != "all":
|
||||
type_mapping = {
|
||||
"conversation": "对话",
|
||||
"project_work": "项目/工作",
|
||||
"learning": "学习",
|
||||
"decision": "决策",
|
||||
"important_event": "重要事件"
|
||||
}
|
||||
chinese_type = type_mapping.get(episodic_type)
|
||||
if chinese_type:
|
||||
where_clauses.append(
|
||||
"(s.memory_type = $episodic_type OR s.memory_type = $chinese_type)"
|
||||
)
|
||||
params["episodic_type"] = episodic_type
|
||||
params["chinese_type"] = chinese_type
|
||||
else:
|
||||
where_clauses.append("s.memory_type = $episodic_type")
|
||||
params["episodic_type"] = episodic_type
|
||||
|
||||
where_str = " AND ".join(where_clauses)
|
||||
|
||||
# 3. 查询筛选后的总数
|
||||
count_query = f"""
|
||||
MATCH (s:MemorySummary)
|
||||
WHERE {where_str}
|
||||
RETURN count(s) AS total
|
||||
"""
|
||||
count_result = await self.neo4j_connector.execute_query(count_query, **params)
|
||||
filtered_total = count_result[0]["total"] if count_result else 0
|
||||
|
||||
# 4. 查询分页数据
|
||||
skip = (page - 1) * pagesize
|
||||
data_query = f"""
|
||||
MATCH (s:MemorySummary)
|
||||
WHERE {where_str}
|
||||
RETURN elementId(s) AS id,
|
||||
s.name AS title,
|
||||
s.memory_type AS memory_type,
|
||||
s.content AS content,
|
||||
s.created_at AS created_at
|
||||
ORDER BY s.created_at DESC
|
||||
SKIP {skip} LIMIT {pagesize}
|
||||
"""
|
||||
|
||||
result = await self.neo4j_connector.execute_query(data_query, **params)
|
||||
|
||||
# 5. 处理结果
|
||||
items = []
|
||||
if result:
|
||||
for record in result:
|
||||
raw_created_at = record.get("created_at")
|
||||
created_at_timestamp = self.parse_timestamp(raw_created_at)
|
||||
items.append({
|
||||
"id": record["id"],
|
||||
"title": record.get("title") or "未命名",
|
||||
"memory_type": record.get("memory_type") or "其他",
|
||||
"content": record.get("content") or "",
|
||||
"created_at": created_at_timestamp
|
||||
})
|
||||
|
||||
# 6. 构建返回结果
|
||||
return {
|
||||
"total": total_all,
|
||||
"items": items,
|
||||
"page": {
|
||||
"page": page,
|
||||
"pagesize": pagesize,
|
||||
"total": filtered_total,
|
||||
"hasnext": (page * pagesize) < filtered_total
|
||||
}
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"情景记忆分页查询出错: {str(e)}", exc_info=True)
|
||||
raise
|
||||
|
||||
async def get_semantic_memory_list(
|
||||
self,
|
||||
end_user_id: str
|
||||
) -> list:
|
||||
"""
|
||||
获取语义记忆全量列表
|
||||
|
||||
Args:
|
||||
end_user_id: 终端用户ID
|
||||
|
||||
Returns:
|
||||
[
|
||||
{
|
||||
"id": str,
|
||||
"name": str,
|
||||
"entity_type": str,
|
||||
"core_definition": str
|
||||
}
|
||||
]
|
||||
"""
|
||||
try:
|
||||
logger.info(f"语义记忆列表查询: end_user_id={end_user_id}")
|
||||
|
||||
semantic_query = """
|
||||
MATCH (e:ExtractedEntity)
|
||||
WHERE e.end_user_id = $end_user_id
|
||||
AND e.is_explicit_memory = true
|
||||
RETURN elementId(e) AS id,
|
||||
e.name AS name,
|
||||
e.entity_type AS entity_type,
|
||||
e.description AS core_definition
|
||||
ORDER BY e.name ASC
|
||||
"""
|
||||
|
||||
result = await self.neo4j_connector.execute_query(
|
||||
semantic_query, end_user_id=end_user_id
|
||||
)
|
||||
|
||||
items = []
|
||||
if result:
|
||||
for record in result:
|
||||
items.append({
|
||||
"id": record["id"],
|
||||
"name": record.get("name") or "未命名",
|
||||
"entity_type": record.get("entity_type") or "未分类",
|
||||
"core_definition": record.get("core_definition") or ""
|
||||
})
|
||||
|
||||
logger.info(f"语义记忆列表查询成功: end_user_id={end_user_id}, total={len(items)}")
|
||||
|
||||
return items
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"语义记忆列表查询出错: {str(e)}", exc_info=True)
|
||||
raise
|
||||
|
||||
async def get_explicit_memory_details(
|
||||
self,
|
||||
end_user_id: str,
|
||||
|
||||
Reference in New Issue
Block a user