diff --git a/api/app/controllers/memory_explicit_controller.py b/api/app/controllers/memory_explicit_controller.py index c52f308c..88877de3 100644 --- a/api/app/controllers/memory_explicit_controller.py +++ b/api/app/controllers/memory_explicit_controller.py @@ -4,7 +4,9 @@ 处理显性记忆相关的API接口,包括情景记忆和语义记忆的查询。 """ -from fastapi import APIRouter, Depends +from typing import Optional + +from fastapi import APIRouter, Depends, Query from app.core.logging_config import get_api_logger from app.core.response_utils import success, fail @@ -69,6 +71,140 @@ async def get_explicit_memory_overview_api( return fail(BizCode.INTERNAL_ERROR, "显性记忆总览查询失败", str(e)) +@router.get("/episodics", response_model=ApiResponse) +async def get_episodic_memory_list_api( + end_user_id: str = Query(..., description="end user ID"), + page: int = Query(1, gt=0, description="page number, starting from 1"), + pagesize: int = Query(10, gt=0, le=100, description="number of items per page, max 100"), + start_date: Optional[int] = Query(None, description="start timestamp (ms)"), + end_date: Optional[int] = Query(None, description="end timestamp (ms)"), + episodic_type: str = Query("all", description="episodic type :all/conversation/project_work/learning/decision/important_event"), + current_user: User = Depends(get_current_user), +) -> dict: + """ + 获取情景记忆分页列表 + + 返回指定用户的情景记忆列表,支持分页、时间范围筛选和情景类型筛选。 + + Args: + end_user_id: 终端用户ID(必填) + page: 页码(从1开始,默认1) + pagesize: 每页数量(默认10,最大100) + start_date: 开始时间戳(可选,毫秒),自动扩展到当天 00:00:00 + end_date: 结束时间戳(可选,毫秒),自动扩展到当天 23:59:59 + episodic_type: 情景类型筛选(可选,默认all) + current_user: 当前用户 + + Returns: + ApiResponse: 包含情景记忆分页列表 + + Examples: + - 基础分页查询:GET /episodics?end_user_id=xxx&page=1&pagesize=5 + 返回第1页,每页5条数据 + - 按时间范围筛选:GET /episodics?end_user_id=xxx&page=1&pagesize=5&start_date=1738684800000&end_date=1738771199000 + 返回指定时间范围内的数据 + - 按情景类型筛选:GET /episodics?end_user_id=xxx&page=1&pagesize=5&episodic_type=important_event + 返回类型为"重要事件"的数据 + + Notes: + - start_date 和 end_date 必须同时提供或同时不提供 + - start_date 不能大于 end_date + - episodic_type 可选值:all, conversation, project_work, learning, decision, important_event + - total 为该用户情景记忆总数(不受筛选条件影响) + - page.total 为筛选后的总条数 + """ + workspace_id = current_user.current_workspace_id + + # 检查用户是否已选择工作空间 + if workspace_id is None: + api_logger.warning(f"用户 {current_user.username} 尝试查询情景记忆列表但未选择工作空间") + return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") + + api_logger.info( + f"情景记忆分页查询: end_user_id={end_user_id}, " + f"start_date={start_date}, end_date={end_date}, episodic_type={episodic_type}, " + f"page={page}, pagesize={pagesize}, username={current_user.username}" + ) + + # 1. 参数校验 + if page < 1 or pagesize < 1: + api_logger.warning(f"分页参数错误: page={page}, pagesize={pagesize}") + return fail(BizCode.INVALID_PARAMETER, "分页参数必须大于0") + + valid_episodic_types = ["all", "conversation", "project_work", "learning", "decision", "important_event"] + if episodic_type not in valid_episodic_types: + api_logger.warning(f"无效的情景类型参数: {episodic_type}") + return fail(BizCode.INVALID_PARAMETER, f"无效的情景类型参数,可选值:{', '.join(valid_episodic_types)}") + + # 时间戳参数校验 + if (start_date is not None and end_date is None) or (end_date is not None and start_date is None): + return fail(BizCode.INVALID_PARAMETER, "start_date和end_date必须同时提供") + + if start_date is not None and end_date is not None and start_date > end_date: + return fail(BizCode.INVALID_PARAMETER, "start_date不能大于end_date") + + # 2. 执行查询 + try: + result = await memory_explicit_service.get_episodic_memory_list( + end_user_id=end_user_id, + page=page, + pagesize=pagesize, + start_date=start_date, + end_date=end_date, + episodic_type=episodic_type, + ) + api_logger.info( + f"情景记忆分页查询成功: end_user_id={end_user_id}, " + f"total={result['total']}, 返回={len(result['items'])}条" + ) + except Exception as e: + api_logger.error(f"情景记忆分页查询失败: end_user_id={end_user_id}, error={str(e)}") + return fail(BizCode.INTERNAL_ERROR, "情景记忆分页查询失败", str(e)) + + # 3. 返回结构化响应 + return success(data=result, msg="查询成功") + +@router.get("/semantics", response_model=ApiResponse) +async def get_semantic_memory_list_api( + end_user_id: str = Query(..., description="终端用户ID"), + current_user: User = Depends(get_current_user), +) -> dict: + """ + 获取语义记忆列表 + + 返回指定用户的全量语义记忆列表。 + + Args: + end_user_id: 终端用户ID(必填) + current_user: 当前用户 + + Returns: + ApiResponse: 包含语义记忆全量列表 + """ + workspace_id = current_user.current_workspace_id + + if workspace_id is None: + api_logger.warning(f"用户 {current_user.username} 尝试查询语义记忆列表但未选择工作空间") + return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") + + api_logger.info( + f"语义记忆列表查询: end_user_id={end_user_id}, username={current_user.username}" + ) + + try: + result = await memory_explicit_service.get_semantic_memory_list( + end_user_id=end_user_id + ) + api_logger.info( + f"语义记忆列表查询成功: end_user_id={end_user_id}, total={len(result)}" + ) + except Exception as e: + api_logger.error(f"语义记忆列表查询失败: end_user_id={end_user_id}, error={str(e)}") + return fail(BizCode.INTERNAL_ERROR, "语义记忆列表查询失败", str(e)) + + return success(data=result, msg="查询成功") + + @router.post("/details", response_model=ApiResponse) async def get_explicit_memory_details_api( request: ExplicitMemoryDetailsRequest, diff --git a/api/app/controllers/service/__init__.py b/api/app/controllers/service/__init__.py index 52d4b732..850b496d 100644 --- a/api/app/controllers/service/__init__.py +++ b/api/app/controllers/service/__init__.py @@ -14,6 +14,7 @@ from . import ( rag_api_document_controller, rag_api_file_controller, rag_api_knowledge_controller, + user_memory_api_controller, ) # 创建 V1 API 路由器 @@ -28,5 +29,6 @@ service_router.include_router(rag_api_chunk_controller.router) service_router.include_router(memory_api_controller.router) service_router.include_router(end_user_api_controller.router) service_router.include_router(memory_config_api_controller.router) +service_router.include_router(user_memory_api_controller.router) __all__ = ["service_router"] diff --git a/api/app/controllers/service/user_memory_api_controller.py b/api/app/controllers/service/user_memory_api_controller.py new file mode 100644 index 00000000..19a3a92f --- /dev/null +++ b/api/app/controllers/service/user_memory_api_controller.py @@ -0,0 +1,230 @@ +"""User Memory 服务接口 — 基于 API Key 认证 + +包装 user_memory_controllers.py 和 memory_agent_controller.py 中的内部接口, +提供基于 API Key 认证的对外服务: +1./analytics/graph_data - 知识图谱数据接口 +2./analytics/community_graph - 社区图谱接口 +3./analytics/node_statistics - 记忆节点统计接口 +4./analytics/user_summary - 用户摘要接口 +5./analytics/memory_insight - 记忆洞察接口 +6./analytics/interest_distribution - 兴趣分布接口 +7./analytics/end_user_info - 终端用户信息接口 +8./analytics/generate_cache - 缓存生成接口 + + +路由前缀: /memory +子路径: /analytics/... +最终路径: /v1/memory/analytics/... +认证方式: API Key (@require_api_key) +""" + +from typing import Optional + +from fastapi import APIRouter, Depends, Header, Query, Request, Body +from sqlalchemy.orm import Session + +from app.core.api_key_auth import require_api_key +from app.core.api_key_utils import get_current_user_from_api_key, validate_end_user_in_workspace +from app.core.logging_config import get_business_logger +from app.db import get_db +from app.schemas.api_key_schema import ApiKeyAuth +from app.schemas.memory_storage_schema import GenerateCacheRequest + +# 包装内部服务 controller +from app.controllers import user_memory_controllers, memory_agent_controller + +router = APIRouter(prefix="/memory", tags=["V1 - User Memory API"]) +logger = get_business_logger() + + +# ==================== 知识图谱 ==================== + + +@router.get("/analytics/graph_data") +@require_api_key(scopes=["memory"]) +async def get_graph_data( + request: Request, + end_user_id: str = Query(..., description="End user ID"), + node_types: Optional[str] = Query(None, description="Comma-separated node types filter"), + limit: int = Query(100, description="Max nodes to return (auto-capped at 1000 in service layer)"), + depth: int = Query(1, description="Graph traversal depth (auto-capped at 3 in service layer)"), + center_node_id: Optional[str] = Query(None, description="Center node for subgraph"), + api_key_auth: ApiKeyAuth = None, + db: Session = Depends(get_db), +): + """Get knowledge graph data (nodes + edges) for an end user.""" + current_user = get_current_user_from_api_key(db, api_key_auth) + validate_end_user_in_workspace(db, end_user_id, api_key_auth.workspace_id) + + return await user_memory_controllers.get_graph_data_api( + end_user_id=end_user_id, + node_types=node_types, + limit=limit, + depth=depth, + center_node_id=center_node_id, + current_user=current_user, + db=db, + ) + + +@router.get("/analytics/community_graph") +@require_api_key(scopes=["memory"]) +async def get_community_graph( + request: Request, + end_user_id: str = Query(..., description="End user ID"), + api_key_auth: ApiKeyAuth = None, + db: Session = Depends(get_db), +): + """Get community clustering graph for an end user.""" + current_user = get_current_user_from_api_key(db, api_key_auth) + validate_end_user_in_workspace(db, end_user_id, api_key_auth.workspace_id) + + return await user_memory_controllers.get_community_graph_data_api( + end_user_id=end_user_id, + current_user=current_user, + db=db, + ) + + +# ==================== 节点统计 ==================== + + +@router.get("/analytics/node_statistics") +@require_api_key(scopes=["memory"]) +async def get_node_statistics( + request: Request, + end_user_id: str = Query(..., description="End user ID"), + api_key_auth: ApiKeyAuth = None, + db: Session = Depends(get_db), +): + """Get memory node type statistics for an end user.""" + current_user = get_current_user_from_api_key(db, api_key_auth) + validate_end_user_in_workspace(db, end_user_id, api_key_auth.workspace_id) + + return await user_memory_controllers.get_node_statistics_api( + end_user_id=end_user_id, + current_user=current_user, + db=db, + ) + + +# ==================== 用户摘要 & 洞察 ==================== + + +@router.get("/analytics/user_summary") +@require_api_key(scopes=["memory"]) +async def get_user_summary( + request: Request, + end_user_id: str = Query(..., description="End user ID"), + language_type: str = Header(default=None, alias="X-Language-Type"), + api_key_auth: ApiKeyAuth = None, + db: Session = Depends(get_db), +): + """Get cached user summary for an end user.""" + current_user = get_current_user_from_api_key(db, api_key_auth) + validate_end_user_in_workspace(db, end_user_id, api_key_auth.workspace_id) + + return await user_memory_controllers.get_user_summary_api( + end_user_id=end_user_id, + language_type=language_type, + current_user=current_user, + db=db, + ) + + +@router.get("/analytics/memory_insight") +@require_api_key(scopes=["memory"]) +async def get_memory_insight( + request: Request, + end_user_id: str = Query(..., description="End user ID"), + api_key_auth: ApiKeyAuth = None, + db: Session = Depends(get_db), +): + """Get cached memory insight report for an end user.""" + current_user = get_current_user_from_api_key(db, api_key_auth) + validate_end_user_in_workspace(db, end_user_id, api_key_auth.workspace_id) + + return await user_memory_controllers.get_memory_insight_report_api( + end_user_id=end_user_id, + current_user=current_user, + db=db, + ) + + +# ==================== 兴趣分布 ==================== + + +@router.get("/analytics/interest_distribution") +@require_api_key(scopes=["memory"]) +async def get_interest_distribution( + request: Request, + end_user_id: str = Query(..., description="End user ID"), + limit: int = Query(5, le=5, description="Max interest tags to return"), + language_type: str = Header(default=None, alias="X-Language-Type"), + api_key_auth: ApiKeyAuth = None, + db: Session = Depends(get_db), +): + """Get interest distribution tags for an end user.""" + current_user = get_current_user_from_api_key(db, api_key_auth) + validate_end_user_in_workspace(db, end_user_id, api_key_auth.workspace_id) + + return await memory_agent_controller.get_interest_distribution_by_user_api( + end_user_id=end_user_id, + limit=limit, + language_type=language_type, + current_user=current_user, + db=db, + ) + + +# ==================== 终端用户信息 ==================== + + +@router.get("/analytics/end_user_info") +@require_api_key(scopes=["memory"]) +async def get_end_user_info( + request: Request, + end_user_id: str = Query(..., description="End user ID"), + api_key_auth: ApiKeyAuth = None, + db: Session = Depends(get_db), +): + """Get end user basic information (name, aliases, metadata).""" + current_user = get_current_user_from_api_key(db, api_key_auth) + validate_end_user_in_workspace(db, end_user_id, api_key_auth.workspace_id) + + return await user_memory_controllers.get_end_user_info( + end_user_id=end_user_id, + current_user=current_user, + db=db, + ) + + +# ==================== 缓存生成 ==================== + + +@router.post("/analytics/generate_cache") +@require_api_key(scopes=["memory"]) +async def generate_cache( + request: Request, + api_key_auth: ApiKeyAuth = None, + db: Session = Depends(get_db), + message: str = Body(None, description="Request body"), + language_type: str = Header(default=None, alias="X-Language-Type"), +): + """Trigger cache generation (user summary + memory insight) for an end user or all workspace users.""" + body = await request.json() + cache_request = GenerateCacheRequest(**body) + + current_user = get_current_user_from_api_key(db, api_key_auth) + + if cache_request.end_user_id: + validate_end_user_in_workspace(db, cache_request.end_user_id, api_key_auth.workspace_id) + + return await user_memory_controllers.generate_cache_api( + request=cache_request, + language_type=language_type, + current_user=current_user, + db=db, + ) + + diff --git a/api/app/core/api_key_utils.py b/api/app/core/api_key_utils.py index fb6b9552..7687d8af 100644 --- a/api/app/core/api_key_utils.py +++ b/api/app/core/api_key_utils.py @@ -1,8 +1,15 @@ """API Key 工具函数""" import secrets +import uuid as _uuid from typing import Optional, Union from datetime import datetime +from sqlalchemy.orm import Session as _Session +from app.core.error_codes import BizCode as _BizCode +from app.core.exceptions import BusinessException as _BusinessException +from app.models.end_user_model import EndUser as _EndUser +from app.repositories.end_user_repository import EndUserRepository as _EndUserRepository + from app.models.api_key_model import ApiKeyType from fastapi import Response from fastapi.responses import JSONResponse @@ -65,3 +72,72 @@ def datetime_to_timestamp(dt: Optional[datetime]) -> Optional[int]: return None return int(dt.timestamp() * 1000) + + +def get_current_user_from_api_key(db: _Session, api_key_auth): + """通过 API Key 构造 current_user 对象。 + + 从 API Key 反查创建者(管理员用户),并设置其 workspace 上下文。 + 与内部接口的 Depends(get_current_user) (JWT) 等价。 + + Args: + db: 数据库会话 + api_key_auth: API Key 认证信息(ApiKeyAuth) + + Returns: + User ORM 对象,已设置 current_workspace_id + """ + from app.services import api_key_service + + api_key = api_key_service.ApiKeyService.get_api_key( + db, api_key_auth.api_key_id, api_key_auth.workspace_id + ) + current_user = api_key.creator + current_user.current_workspace_id = api_key_auth.workspace_id + return current_user + + +def validate_end_user_in_workspace( + db: _Session, + end_user_id: str, + workspace_id, +) -> _EndUser: + """校验 end_user 是否存在且属于指定 workspace。 + + Args: + db: 数据库会话 + end_user_id: 终端用户 ID + workspace_id: 工作空间 ID(UUID 或字符串均可) + + Returns: + EndUser ORM 对象(校验通过时) + + Raises: + BusinessException(INVALID_PARAMETER): end_user_id 格式无效 + BusinessException(USER_NOT_FOUND): end_user 不存在 + BusinessException(PERMISSION_DENIED): end_user 不属于该 workspace + """ + try: + _uuid.UUID(end_user_id) + except (ValueError, AttributeError): + raise _BusinessException( + f"Invalid end_user_id format: {end_user_id}", + _BizCode.INVALID_PARAMETER, + ) + + end_user_repo = _EndUserRepository(db) + end_user = end_user_repo.get_end_user_by_id(end_user_id) + + if end_user is None: + raise _BusinessException( + "End user not found", + _BizCode.USER_NOT_FOUND, + ) + + if str(end_user.workspace_id) != str(workspace_id): + raise _BusinessException( + "End user does not belong to this workspace", + _BizCode.PERMISSION_DENIED, + ) + + return end_user \ No newline at end of file diff --git a/api/app/services/memory_explicit_service.py b/api/app/services/memory_explicit_service.py index f8d39ae8..4d9a5c2b 100644 --- a/api/app/services/memory_explicit_service.py +++ b/api/app/services/memory_explicit_service.py @@ -4,7 +4,7 @@ 处理显性记忆相关的业务逻辑,包括情景记忆和语义记忆的查询。 """ -from typing import Any, Dict +from typing import Any, Dict, Optional from app.core.logging_config import get_logger from app.services.memory_base_service import MemoryBaseService @@ -104,7 +104,7 @@ class MemoryExplicitService(MemoryBaseService): e.description AS core_definition ORDER BY e.name ASC """ - + semantic_result = await self.neo4j_connector.execute_query( semantic_query, end_user_id=end_user_id @@ -146,6 +146,209 @@ class MemoryExplicitService(MemoryBaseService): logger.error(f"获取显性记忆总览时出错: {str(e)}", exc_info=True) raise + + async def get_episodic_memory_list( + self, + end_user_id: str, + page: int, + pagesize: int, + start_date: Optional[int] = None, + end_date: Optional[int] = None, + episodic_type: str = "all", + ) -> Dict[str, Any]: + """ + 获取情景记忆分页列表 + + Args: + end_user_id: 终端用户ID + page: 页码 + pagesize: 每页数量 + start_date: 开始时间戳(毫秒),可选 + end_date: 结束时间戳(毫秒),可选 + episodic_type: 情景类型筛选 + + Returns: + { + "total": int, # 该用户情景记忆总数(不受筛选影响) + "items": [...], # 当前页数据 + "page": { + "page": int, + "pagesize": int, + "total": int, # 筛选后总数 + "hasnext": bool + } + } + """ + try: + logger.info( + f"情景记忆分页查询: end_user_id={end_user_id}, " + f"start_date={start_date}, end_date={end_date}, " + f"episodic_type={episodic_type}, page={page}, pagesize={pagesize}" + ) + + # 1. 查询情景记忆总数(不受筛选条件限制) + total_all_query = """ + MATCH (s:MemorySummary) + WHERE s.end_user_id = $end_user_id + RETURN count(s) AS total + """ + total_all_result = await self.neo4j_connector.execute_query( + total_all_query, end_user_id=end_user_id + ) + total_all = total_all_result[0]["total"] if total_all_result else 0 + + # 2. 构建筛选条件 + where_clauses = ["s.end_user_id = $end_user_id"] + params = {"end_user_id": end_user_id} + + # 时间戳筛选(毫秒时间戳转为 UTC ISO 字符串,使用 Neo4j datetime() 精确比较) + if start_date is not None and end_date is not None: + from datetime import datetime, timezone + start_dt = datetime.fromtimestamp(start_date / 1000, tz=timezone.utc) + end_dt = datetime.fromtimestamp(end_date / 1000, tz=timezone.utc) + # 开始时间取当天 UTC 00:00:00,结束时间取当天 UTC 23:59:59.999999 + start_iso = start_dt.strftime("%Y-%m-%dT") + "00:00:00.000000" + end_iso = end_dt.strftime("%Y-%m-%dT") + "23:59:59.999999" + + where_clauses.append("datetime(s.created_at) >= datetime($start_iso) AND datetime(s.created_at) <= datetime($end_iso)") + params["start_iso"] = start_iso + params["end_iso"] = end_iso + + # 类型筛选下推到 Cypher(兼容中英文) + if episodic_type != "all": + type_mapping = { + "conversation": "对话", + "project_work": "项目/工作", + "learning": "学习", + "decision": "决策", + "important_event": "重要事件" + } + chinese_type = type_mapping.get(episodic_type) + if chinese_type: + where_clauses.append( + "(s.memory_type = $episodic_type OR s.memory_type = $chinese_type)" + ) + params["episodic_type"] = episodic_type + params["chinese_type"] = chinese_type + else: + where_clauses.append("s.memory_type = $episodic_type") + params["episodic_type"] = episodic_type + + where_str = " AND ".join(where_clauses) + + # 3. 查询筛选后的总数 + count_query = f""" + MATCH (s:MemorySummary) + WHERE {where_str} + RETURN count(s) AS total + """ + count_result = await self.neo4j_connector.execute_query(count_query, **params) + filtered_total = count_result[0]["total"] if count_result else 0 + + # 4. 查询分页数据 + skip = (page - 1) * pagesize + data_query = f""" + MATCH (s:MemorySummary) + WHERE {where_str} + RETURN elementId(s) AS id, + s.name AS title, + s.memory_type AS memory_type, + s.content AS content, + s.created_at AS created_at + ORDER BY s.created_at DESC + SKIP $skip LIMIT $limit + """ + params["skip"] = skip + params["limit"] = pagesize + + result = await self.neo4j_connector.execute_query(data_query, **params) + + # 5. 处理结果 + items = [] + if result: + for record in result: + raw_created_at = record.get("created_at") + created_at_timestamp = self.parse_timestamp(raw_created_at) + items.append({ + "id": record["id"], + "title": record.get("title") or "未命名", + "memory_type": record.get("memory_type") or "其他", + "content": record.get("content") or "", + "created_at": created_at_timestamp + }) + + # 6. 构建返回结果 + return { + "total": total_all, + "items": items, + "page": { + "page": page, + "pagesize": pagesize, + "total": filtered_total, + "hasnext": (page * pagesize) < filtered_total + } + } + + except Exception as e: + logger.error(f"情景记忆分页查询出错: {str(e)}", exc_info=True) + raise + + async def get_semantic_memory_list( + self, + end_user_id: str + ) -> list: + """ + 获取语义记忆全量列表 + + Args: + end_user_id: 终端用户ID + + Returns: + [ + { + "id": str, + "name": str, + "entity_type": str, + "core_definition": str + } + ] + """ + try: + logger.info(f"语义记忆列表查询: end_user_id={end_user_id}") + + semantic_query = """ + MATCH (e:ExtractedEntity) + WHERE e.end_user_id = $end_user_id + AND e.is_explicit_memory = true + RETURN elementId(e) AS id, + e.name AS name, + e.entity_type AS entity_type, + e.description AS core_definition + ORDER BY e.name ASC + """ + + result = await self.neo4j_connector.execute_query( + semantic_query, end_user_id=end_user_id + ) + + items = [] + if result: + for record in result: + items.append({ + "id": record["id"], + "name": record.get("name") or "未命名", + "entity_type": record.get("entity_type") or "未分类", + "core_definition": record.get("core_definition") or "" + }) + + logger.info(f"语义记忆列表查询成功: end_user_id={end_user_id}, total={len(items)}") + + return items + + except Exception as e: + logger.error(f"语义记忆列表查询出错: {str(e)}", exc_info=True) + raise + async def get_explicit_memory_details( self, end_user_id: str,