""" 用户记忆相关的控制器 包含用户摘要、记忆洞察、节点统计、图数据和用户档案等接口 """ from typing import Optional import datetime from sqlalchemy.orm import Session from fastapi import APIRouter, Depends, Header from app.db import get_db from app.core.language_utils import get_language_from_header from app.core.logging_config import get_api_logger from app.core.response_utils import success, fail from app.core.error_codes import BizCode from app.core.api_key_utils import timestamp_to_datetime from app.services.user_memory_service import ( UserMemoryService, analytics_memory_types, analytics_graph_data, analytics_community_graph_data, ) from app.services.memory_entity_relationship_service import MemoryEntityService, MemoryEmotion, MemoryInteraction from app.schemas.response_schema import ApiResponse from app.schemas.memory_storage_schema import GenerateCacheRequest from app.repositories.workspace_repository import WorkspaceRepository from app.repositories.end_user_repository import EndUserRepository from app.schemas.end_user_info_schema import ( EndUserInfoResponse, EndUserInfoCreate, EndUserInfoUpdate, ) from app.models.end_user_model import EndUser from app.dependencies import get_current_user from app.models.user_model import User # Get API logger api_logger = get_api_logger() # Initialize service user_memory_service = UserMemoryService() router = APIRouter( prefix="/memory-storage", tags=["User Memory"], ) @router.get("/analytics/memory_insight/report", response_model=ApiResponse) async def get_memory_insight_report_api( end_user_id: str, current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ) -> dict: """ 获取缓存的记忆洞察报告 此接口仅查询数据库中已缓存的记忆洞察数据,不执行生成操作。 如需生成新的洞察报告,请使用专门的生成接口。 """ api_logger.info(f"记忆洞察报告查询请求: end_user_id={end_user_id}, user={current_user.username}") try: # 调用服务层获取缓存数据 result = await user_memory_service.get_cached_memory_insight(db, end_user_id) if result["is_cached"]: api_logger.info(f"成功返回缓存的记忆洞察报告: end_user_id={end_user_id}") return success(data=result, msg="查询成功") else: api_logger.info(f"记忆洞察报告缓存不存在: end_user_id={end_user_id}") return success(data=result, msg="数据尚未生成") except Exception as e: api_logger.error(f"记忆洞察报告查询失败: end_user_id={end_user_id}, error={str(e)}") return fail(BizCode.INTERNAL_ERROR, "记忆洞察报告查询失败", str(e)) @router.get("/analytics/user_summary", response_model=ApiResponse) async def get_user_summary_api( end_user_id: str, language_type: str = Header(default=None, alias="X-Language-Type"), current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ) -> dict: """ 获取缓存的用户摘要 此接口仅查询数据库中已缓存的用户摘要数据,不执行生成操作。 如需生成新的用户摘要,请使用专门的生成接口。 语言控制: - 使用 X-Language-Type Header 指定语言 - 如果未传 Header,默认使用中文 (zh) """ # 使用集中化的语言校验 language = get_language_from_header(language_type) workspace_id = current_user.current_workspace_id workspace_repo = WorkspaceRepository(db) workspace_models = workspace_repo.get_workspace_models_configs(workspace_id) if workspace_models: model_id = workspace_models.get("llm", None) else: model_id = None api_logger.info(f"用户摘要查询请求: end_user_id={end_user_id}, user={current_user.username}") try: # 调用服务层获取缓存数据 result = await user_memory_service.get_cached_user_summary(db, end_user_id, model_id, language) if result["is_cached"]: api_logger.info(f"成功返回缓存的用户摘要: end_user_id={end_user_id}") return success(data=result, msg="查询成功") else: api_logger.info(f"用户摘要缓存不存在: end_user_id={end_user_id}") return success(data=result, msg="数据尚未生成") except Exception as e: api_logger.error(f"用户摘要查询失败: end_user_id={end_user_id}, error={str(e)}") return fail(BizCode.INTERNAL_ERROR, "用户摘要查询失败", str(e)) @router.post("/analytics/generate_cache", response_model=ApiResponse) async def generate_cache_api( request: GenerateCacheRequest, language_type: str = Header(default=None, alias="X-Language-Type"), current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ) -> dict: """ 手动触发缓存生成 - 如果提供 end_user_id,只为该用户生成 - 如果不提供,为当前工作空间的所有用户生成 语言控制: - 使用 X-Language-Type Header 指定语言 ("zh" 中文, "en" 英文) - 如果未传 Header,默认使用中文 (zh) """ # 使用集中化的语言校验 language = get_language_from_header(language_type) workspace_id = current_user.current_workspace_id # 检查用户是否已选择工作空间 if workspace_id is None: api_logger.warning(f"用户 {current_user.username} 尝试生成缓存但未选择工作空间") return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") end_user_id = request.end_user_id api_logger.info( f"缓存生成请求: user={current_user.username}, workspace={workspace_id}, " f"end_user_id={end_user_id if end_user_id else '全部用户'}, language={language}" ) try: if end_user_id: # 为单个用户生成 api_logger.info(f"开始为单个用户生成缓存: end_user_id={end_user_id}") # 生成记忆洞察 insight_result = await user_memory_service.generate_and_cache_insight(db, end_user_id, workspace_id, language=language) # 生成用户摘要 summary_result = await user_memory_service.generate_and_cache_summary(db, end_user_id, workspace_id, language=language) # 构建响应 result = { "end_user_id": end_user_id, "insight_success": insight_result["success"], "summary_success": summary_result["success"], "errors": [] } # 收集错误信息 if not insight_result["success"]: result["errors"].append({ "type": "insight", "error": insight_result.get("error") }) if not summary_result["success"]: result["errors"].append({ "type": "summary", "error": summary_result.get("error") }) # 记录结果 if result["insight_success"] and result["summary_success"]: api_logger.info(f"成功为用户 {end_user_id} 生成缓存") else: api_logger.warning(f"用户 {end_user_id} 的缓存生成部分失败: {result['errors']}") return success(data=result, msg="生成完成") else: # 为整个工作空间生成 api_logger.info(f"开始为工作空间 {workspace_id} 批量生成缓存") result = await user_memory_service.generate_cache_for_workspace(db, workspace_id, language=language) # 记录统计信息 api_logger.info( f"工作空间 {workspace_id} 批量生成完成: " f"总数={result['total_users']}, 成功={result['successful']}, 失败={result['failed']}" ) return success(data=result, msg="批量生成完成") except Exception as e: api_logger.error(f"缓存生成失败: user={current_user.username}, error={str(e)}") return fail(BizCode.INTERNAL_ERROR, "缓存生成失败", str(e)) @router.get("/analytics/node_statistics", response_model=ApiResponse) async def get_node_statistics_api( end_user_id: str, current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ) -> dict: workspace_id = current_user.current_workspace_id # 检查用户是否已选择工作空间 if workspace_id is None: api_logger.warning(f"用户 {current_user.username} 尝试查询节点统计但未选择工作空间") return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") api_logger.info( f"记忆类型统计请求: end_user_id={end_user_id}, user={current_user.username}, workspace={workspace_id}") try: # 调用新的记忆类型统计函数 result = await analytics_memory_types(db, end_user_id) # 计算总数用于日志 total_count = sum(item["count"] for item in result) api_logger.info( f"成功获取记忆类型统计: end_user_id={end_user_id}, 总记忆数={total_count}, 类型数={len(result)}") return success(data=result, msg="查询成功") except Exception as e: api_logger.error(f"记忆类型查询失败: end_user_id={end_user_id}, error={str(e)}") return fail(BizCode.INTERNAL_ERROR, "记忆类型查询失败", str(e)) @router.get("/analytics/graph_data", response_model=ApiResponse) async def get_graph_data_api( end_user_id: str, node_types: Optional[str] = None, limit: int = 100, depth: int = 1, center_node_id: Optional[str] = None, current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ) -> dict: workspace_id = current_user.current_workspace_id # 检查用户是否已选择工作空间 if workspace_id is None: api_logger.warning(f"用户 {current_user.username} 尝试查询图数据但未选择工作空间") return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") # 参数验证 if limit > 1000: limit = 1000 api_logger.warning("limit 参数超过最大值,已调整为 1000") if depth > 3: depth = 3 api_logger.warning("depth 参数超过最大值,已调整为 3") # 解析 node_types 参数 node_types_list = None if node_types: node_types_list = [t.strip() for t in node_types.split(",") if t.strip()] api_logger.info( f"图数据查询请求: end_user_id={end_user_id}, user={current_user.username}, " f"workspace={workspace_id}, node_types={node_types_list}, limit={limit}, depth={depth}" ) try: result = await analytics_graph_data( db=db, end_user_id=end_user_id, node_types=node_types_list, limit=limit, depth=depth, center_node_id=center_node_id ) # 检查是否有错误消息 if "message" in result and result["statistics"]["total_nodes"] == 0: api_logger.warning(f"图数据查询返回空结果: {result.get('message')}") return success(data=result, msg=result.get("message", "查询成功")) api_logger.info( f"成功获取图数据: end_user_id={end_user_id}, " f"nodes={result['statistics']['total_nodes']}, " f"edges={result['statistics']['total_edges']}" ) return success(data=result, msg="查询成功") except Exception as e: api_logger.error(f"图数据查询失败: end_user_id={end_user_id}, error={str(e)}") return fail(BizCode.INTERNAL_ERROR, "图数据查询失败", str(e)) @router.get("/analytics/community_graph", response_model=ApiResponse) async def get_community_graph_data_api( end_user_id: str, current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ) -> dict: workspace_id = current_user.current_workspace_id if workspace_id is None: api_logger.warning(f"用户 {current_user.username} 尝试查询社区图谱但未选择工作空间") return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") api_logger.info( f"社区图谱查询请求: end_user_id={end_user_id}, user={current_user.username}, " f"workspace={workspace_id}" ) try: result = await analytics_community_graph_data(db=db, end_user_id=end_user_id) if "message" in result and result["statistics"]["total_nodes"] == 0: api_logger.warning(f"社区图谱查询返回空结果: {result.get('message')}") return success(data=result, msg=result.get("message", "查询成功")) api_logger.info( f"成功获取社区图谱: end_user_id={end_user_id}, " f"nodes={result['statistics']['total_nodes']}, " f"edges={result['statistics']['total_edges']}" ) return success(data=result, msg="查询成功") except Exception as e: api_logger.error(f"社区图谱查询失败: end_user_id={end_user_id}, error={str(e)}") return fail(BizCode.INTERNAL_ERROR, "社区图谱查询失败", str(e)) #=======================终端用户信息接口======================= @router.get("/end_user_info", response_model=ApiResponse) async def get_end_user_info( end_user_id: str, current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ) -> dict: """ 查询终端用户信息记录 根据 end_user_id 查询单条终端用户信息记录。 """ workspace_id = current_user.current_workspace_id if workspace_id is None: api_logger.warning(f"用户 {current_user.username} 尝试查询终端用户信息但未选择工作空间") return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") api_logger.info( f"查询终端用户信息请求: end_user_id={end_user_id}, user={current_user.username}, " f"workspace={workspace_id}" ) # 校验 end_user 是否属于当前工作空间 end_user_repo = EndUserRepository(db) end_user = end_user_repo.get_end_user_by_id(end_user_id) if end_user is None: return fail(BizCode.USER_NOT_FOUND, "终端用户不存在", "end_user not found") if str(end_user.workspace_id) != str(workspace_id): api_logger.warning( f"用户 {current_user.username} 尝试查询不属于工作空间 {workspace_id} 的终端用户 {end_user_id}" ) return fail(BizCode.PERMISSION_DENIED, "该终端用户不属于当前工作空间", "end_user workspace mismatch") result = user_memory_service.get_end_user_info(db, end_user_id) if result["success"]: api_logger.info(f"成功查询终端用户信息: end_user_id={end_user_id}") return success(data=result["data"], msg="查询成功") else: error_msg = result["error"] api_logger.error(f"查询终端用户信息失败: end_user_id={end_user_id}, error={error_msg}") if error_msg == "终端用户信息记录不存在": return fail(BizCode.USER_NOT_FOUND, "终端用户信息记录不存在", error_msg) elif error_msg == "无效的终端用户ID格式": return fail(BizCode.INVALID_USER_ID, "无效的终端用户ID格式", error_msg) else: return fail(BizCode.INTERNAL_ERROR, "查询终端用户信息失败", error_msg) @router.post("/end_user_info/updated", response_model=ApiResponse) async def update_end_user_info( info_update: EndUserInfoUpdate, current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ) -> dict: """ 更新终端用户信息记录 根据 end_user_id 更新终端用户信息记录,支持批量更新多个别名。 示例请求体: { "end_user_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890", "other_name": "张三1", "aliases": ["小张", "张工"], "meta_data": {"position": "工程师", "department": "技术部"} } """ workspace_id = current_user.current_workspace_id end_user_id = info_update.end_user_id if workspace_id is None: api_logger.warning(f"用户 {current_user.username} 尝试更新终端用户信息但未选择工作空间") return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") api_logger.info( f"更新终端用户信息请求: end_user_id={end_user_id}, user={current_user.username}, " f"workspace={workspace_id}" ) # 校验 end_user 是否属于当前工作空间 end_user_repo = EndUserRepository(db) end_user = end_user_repo.get_end_user_by_id(end_user_id) if end_user is None: return fail(BizCode.USER_NOT_FOUND, "终端用户不存在", "end_user not found") if str(end_user.workspace_id) != str(workspace_id): api_logger.warning( f"用户 {current_user.username} 尝试更新不属于工作空间 {workspace_id} 的终端用户 {end_user_id}" ) return fail(BizCode.PERMISSION_DENIED, "该终端用户不属于当前工作空间", "end_user workspace mismatch") # 获取更新数据(排除 end_user_id) update_data = info_update.model_dump(exclude_unset=True, exclude={'end_user_id'}) result = user_memory_service.update_end_user_info(db, end_user_id, update_data) if result["success"]: api_logger.info(f"成功更新终端用户信息: end_user_id={end_user_id}") return success(data=result["data"], msg="更新成功") else: error_msg = result["error"] api_logger.error(f"终端用户信息更新失败: end_user_id={end_user_id}, error={error_msg}") if error_msg == "终端用户信息记录不存在": return fail(BizCode.USER_NOT_FOUND, "终端用户信息记录不存在", error_msg) elif error_msg == "无效的终端用户ID格式": return fail(BizCode.INVALID_USER_ID, "无效的终端用户ID格式", error_msg) else: return fail(BizCode.INTERNAL_ERROR, "终端用户信息更新失败", error_msg) @router.get("/memory_space/timeline_memories", response_model=ApiResponse) async def memory_space_timeline_of_shared_memories( id: str, label: str, language_type: str = Header(default=None, alias="X-Language-Type"), current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ): # 使用集中化的语言校验 language = get_language_from_header(language_type) workspace_id = current_user.current_workspace_id workspace_repo = WorkspaceRepository(db) workspace_models = workspace_repo.get_workspace_models_configs(workspace_id) if workspace_models: model_id = workspace_models.get("llm", None) else: model_id = None MemoryEntity = MemoryEntityService(id, label) timeline_memories_result = await MemoryEntity.get_timeline_memories_server(model_id, language) return success(data=timeline_memories_result, msg="共同记忆时间线") @router.get("/memory_space/relationship_evolution", response_model=ApiResponse) async def memory_space_relationship_evolution(id: str, label: str, current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ): try: api_logger.info(f"关系演变查询请求: id={id}, table={label}, user={current_user.username}") # 获取情绪数据 emotion = MemoryEmotion(id, label) emotion_result = await emotion.get_emotion() # 获取交互数据 interaction = MemoryInteraction(id, label) interaction_result = await interaction.get_interaction_frequency() # 关闭连接 await emotion.close() await interaction.close() result = { "emotion": emotion_result, "interaction": interaction_result } api_logger.info(f"关系演变查询成功: id={id}, table={label}") return success(data=result, msg="关系演变") except Exception as e: api_logger.error(f"关系演变查询失败: id={id}, table={label}, error={str(e)}", exc_info=True) return fail(BizCode.INTERNAL_ERROR, "关系演变查询失败", str(e))