diff --git a/api/app/models/end_user_model.py b/api/app/models/end_user_model.py
index 30b56fc5..048d42ea 100644
--- a/api/app/models/end_user_model.py
+++ b/api/app/models/end_user_model.py
@@ -51,6 +51,14 @@ class EndUser(Base):
     growth_trajectory = Column(Text, nullable=True, comment="成长轨迹")
     memory_insight_updated_at = Column(DateTime, nullable=True, comment="洞察报告最后更新时间")
 
+    # RAG storage mode fields
+    # NOTE(review): `default` only applies to INSERTs issued through SQLAlchemy; rows created
+    # before this column exists will read NULL unless a migration backfills them - confirm.
+    storage_type = Column(String, nullable=True, default="neo4j", comment="存储模式类型: neo4j / rag")
+    rag_tags = Column(Text, nullable=True, comment="RAG模式下提取的标签列表(JSON格式)")
+    rag_personas = Column(Text, nullable=True, comment="RAG模式下提取的人物形象列表(JSON格式)")
+    rag_summary_updated_at = Column(DateTime, nullable=True, comment="RAG摘要/标签/人物形象最后更新时间")
+
     # 与 App 的反向关系
     app = relationship(
         "App",
diff --git a/api/app/repositories/end_user_repository.py b/api/app/repositories/end_user_repository.py
index 48c9c4ec..0b828a8b 100644
--- a/api/app/repositories/end_user_repository.py
+++ b/api/app/repositories/end_user_repository.py
@@ -220,6 +220,98 @@ class EndUserRepository:
             db_logger.error(f"更新终端用户 {end_user_id} 的用户摘要缓存时出错: {str(e)}")
             raise
 
+    def update_rag_summary_tags(
+        self,
+        end_user_id: uuid.UUID,
+        user_summary: str,
+        rag_tags: str,
+        rag_personas: str,
+    ) -> bool:
+        """Update the cached RAG-mode summary, tags and personas of an end user.
+
+        Args:
+            end_user_id: End user ID.
+            user_summary: User summary text.
+            rag_tags: Tag list as a JSON string.
+            rag_personas: Persona list as a JSON string.
+
+        Returns:
+            bool: True when a row was updated, False when no end user matched.
+
+        Raises:
+            Exception: Any error from the update or commit, re-raised after rollback and logging.
+        """
+        try:
+            # Bulk UPDATE; synchronize_session=False skips syncing EndUser objects already loaded in the session
+            updated_count = (
+                self.db.query(EndUser)
+                .filter(EndUser.id == end_user_id)
+                .update(
+                    {
+                        EndUser.user_summary: user_summary,
+                        EndUser.rag_tags: rag_tags,
+                        EndUser.rag_personas: rag_personas,
+                        EndUser.storage_type: "rag",  # writing this cache also marks the row as RAG mode
+                        EndUser.rag_summary_updated_at: datetime.datetime.now(),
+                    },
+                    synchronize_session=False
+                )
+            )
+            self.db.commit()
+            if updated_count > 0:
+                db_logger.info(f"成功更新终端用户 {end_user_id} 的RAG摘要/标签/人物形象缓存")
+                return True
+            else:
+                db_logger.warning(f"未找到终端用户 {end_user_id},无法更新RAG摘要缓存")
+                return False
+        except Exception as e:
+            self.db.rollback()
+            db_logger.error(f"更新终端用户 {end_user_id} 的RAG摘要缓存时出错: {str(e)}")
+            raise
+
+    def update_rag_insight(
+        self,
+        end_user_id: uuid.UUID,
+        memory_insight: str,
+    ) -> bool:
+        """Update the cached RAG-mode memory insight of an end user.
+
+        Args:
+            end_user_id: End user ID.
+            memory_insight: Insight text.
+
+        Returns:
+            bool: True when a row was updated, False when no end user matched.
+
+        Raises:
+            Exception: Any error from the update or commit, re-raised after rollback and logging.
+        """
+        try:
+            # Bulk UPDATE; synchronize_session=False skips syncing EndUser objects already loaded in the session
+            updated_count = (
+                self.db.query(EndUser)
+                .filter(EndUser.id == end_user_id)
+                .update(
+                    {
+                        EndUser.memory_insight: memory_insight,
+                        EndUser.storage_type: "rag",  # writing this cache also marks the row as RAG mode
+                        EndUser.memory_insight_updated_at: datetime.datetime.now(),
+                    },
+                    synchronize_session=False
+                )
+            )
+            self.db.commit()
+            if updated_count > 0:
+                db_logger.info(f"成功更新终端用户 {end_user_id} 的RAG洞察缓存")
+                return True
+            else:
+                db_logger.warning(f"未找到终端用户 {end_user_id},无法更新RAG洞察缓存")
+                return False
+        except Exception as e:
+            self.db.rollback()
+            db_logger.error(f"更新终端用户 {end_user_id} 的RAG洞察缓存时出错: {str(e)}")
+            raise
+
     def get_all_by_workspace(self, workspace_id: uuid.UUID) -> List[EndUser]:
         """获取工作空间的所有终端用户
 
diff --git a/api/app/services/memory_dashboard_service.py b/api/app/services/memory_dashboard_service.py
index 63a9c361..6559ef2f 100644
--- a/api/app/services/memory_dashboard_service.py
+++ b/api/app/services/memory_dashboard_service.py
@@ -647,55 +647,88 @@ async def get_chunk_summary_and_tags(
 ) -> dict:
     """
-    获取chunk的总结、标签和人物形象
-    
-    Args:
-        end_user_id: 宿主ID
-        limit: 返回的chunk数量限制
-        max_tags: 最大标签数量
-        db: 数据库会话
-        current_user: 当前用户
-        
-    Returns:
-        包含summary、tags和personas的字典
+    Return the summary, tags and personas derived from an end user's chunks.
+
+    The values cached on the end_user row are returned when all of them are present;
+    otherwise they are generated in real time and written back to the row.
+
+    Args:
+        end_user_id: End user ID (parsed with uuid.UUID).
+        limit: Maximum number of chunks to read.
+        max_tags: Maximum number of tags to extract.
+        db: Database session.
+        current_user: User performing the request.
+
+    Returns:
+        Dict with the keys "summary", "tags" and "personas".
     """
+    import json
+    from app.repositories.end_user_repository import EndUserRepository
+
     business_logger.info(f"获取chunk摘要、标签和人物形象: end_user_id={end_user_id}, limit={limit}, 操作者: {current_user.username}")
-    
+
     try:
-        # 1. 获取chunk内容
+        repo = EndUserRepository(db)
+        end_user = repo.get_by_id(uuid.UUID(end_user_id))
+
+        # Cache read: only used when user_summary, rag_tags and rag_personas are all populated
+        # NOTE(review): cached values ignore this request's limit/max_tags and are never expired here - confirm intended
+        if (
+            end_user
+            and end_user.user_summary
+            and end_user.rag_tags
+            and end_user.rag_personas
+        ):
+            try:
+                cached_tags = json.loads(end_user.rag_tags)
+                cached_personas = json.loads(end_user.rag_personas)
+            except (TypeError, ValueError) as cache_err:
+                # A corrupt cache row must not break the endpoint; fall through and regenerate
+                business_logger.warning(f"end_user {end_user_id} 的RAG缓存解析失败,将重新生成: {str(cache_err)}")
+            else:
+                business_logger.info(f"命中缓存,直接返回end_user {end_user_id} 的摘要/标签/人物形象")
+                return {
+                    "summary": end_user.user_summary,
+                    "tags": cached_tags,
+                    "personas": cached_personas,
+                }
+
+        # Cache miss: generate in real time
         rag_content = get_rag_content(end_user_id, limit, db, current_user)
         chunks = rag_content.get("contents", [])
-        
+
         if not chunks:
             business_logger.warning(f"未找到chunk内容: end_user_id={end_user_id}")
-            return {
-                "summary": "暂无内容",
-                "tags": [],
-                "personas": []
-            }
-        
-        # 2. 导入RAG工具函数
+            return {"summary": "暂无内容", "tags": [], "personas": []}
+
         from app.core.rag_utils import generate_chunk_summary, extract_chunk_tags, extract_chunk_persona
-        
-        # 3. 并发生成摘要、提取标签和人物形象
         import asyncio
-        summary_task = generate_chunk_summary(chunks, max_chunks=limit, end_user_id=end_user_id)
-        tags_task = extract_chunk_tags(chunks, max_tags=max_tags, max_chunks=limit, end_user_id=end_user_id)
-        personas_task = extract_chunk_persona(chunks, max_personas=5, max_chunks=limit, end_user_id=end_user_id)
-        
-        summary, tags_with_freq, personas = await asyncio.gather(summary_task, tags_task, personas_task)
-        
-        # 4. 格式化标签数据
+
+        # Run summary, tag and persona generation concurrently
+        summary, tags_with_freq, personas = await asyncio.gather(
+            generate_chunk_summary(chunks, max_chunks=limit, end_user_id=end_user_id),
+            extract_chunk_tags(chunks, max_tags=max_tags, max_chunks=limit, end_user_id=end_user_id),
+            extract_chunk_persona(chunks, max_personas=5, max_chunks=limit, end_user_id=end_user_id),
+        )
+
         tags = [{"tag": tag, "frequency": freq} for tag, freq in tags_with_freq]
-        
-        result = {
-            "summary": summary,
-            "tags": tags,
-            "personas": personas
-        }
-        
+
+        # Write-through cache. Best-effort: the repository rolls back and logs on failure,
+        # so a failed write must not discard the result that was just generated.
+        if end_user:
+            try:
+                repo.update_rag_summary_tags(
+                    end_user_id=end_user.id,
+                    user_summary=summary,
+                    rag_tags=json.dumps(tags, ensure_ascii=False),
+                    rag_personas=json.dumps(personas, ensure_ascii=False),
+                )
+            except Exception as cache_err:
+                business_logger.warning(f"写入end_user {end_user_id} 的RAG摘要缓存失败: {str(cache_err)}")
+
+        result = {"summary": summary, "tags": tags, "personas": personas}
         business_logger.info(f"成功获取chunk摘要、{len(tags)} 个标签和 {len(personas)} 个人物形象")
         return result
-        
+
     except Exception as e:
         business_logger.error(f"获取chunk摘要、标签和人物形象失败: end_user_id={end_user_id} - {str(e)}")
         raise
@@ -709,42 +742,55 @@ async def get_chunk_insight(
 ) -> dict:
     """
-    获取chunk的洞察分析
-    
-    Args:
-        end_user_id: 宿主ID
-        limit: 返回的chunk数量限制
-        db: 数据库会话
-        current_user: 当前用户
-        
-    Returns:
-        包含insight的字典
+    Return the insight analysis derived from an end user's chunks.
+
+    The insight cached on the end_user row is returned when present;
+    otherwise it is generated in real time and written back to the row.
+
+    Args:
+        end_user_id: End user ID (parsed with uuid.UUID).
+        limit: Maximum number of chunks to read.
+        db: Database session.
+        current_user: User performing the request.
+
+    Returns:
+        Dict with the single key "insight".
     """
+    from app.repositories.end_user_repository import EndUserRepository
+
     business_logger.info(f"获取chunk洞察: end_user_id={end_user_id}, limit={limit}, 操作者: {current_user.username}")
-    
+
     try:
-        # 1. 获取chunk内容
+        repo = EndUserRepository(db)
+        end_user = repo.get_by_id(uuid.UUID(end_user_id))
+
+        # Cache read
+        if end_user and end_user.memory_insight:
+            business_logger.info(f"命中缓存,直接返回end_user {end_user_id} 的洞察")
+            return {"insight": end_user.memory_insight}
+
+        # Cache miss: generate in real time
         rag_content = get_rag_content(end_user_id, limit, db, current_user)
         chunks = rag_content.get("contents", [])
-        
+
         if not chunks:
             business_logger.warning(f"未找到chunk内容: end_user_id={end_user_id}")
-            return {
-                "insight": "暂无足够数据生成洞察报告"
-            }
-        
-        # 2. 导入RAG工具函数
+            return {"insight": "暂无足够数据生成洞察报告"}
+
         from app.core.rag_utils import generate_chunk_insight
-        
-        # 3. 生成洞察
+
         insight = await generate_chunk_insight(chunks, max_chunks=limit, end_user_id=end_user_id)
-        
-        result = {
-            "insight": insight
-        }
-        
+
+        # Write-through cache. Best-effort: the repository rolls back and logs on failure,
+        # so a failed write must not discard the insight that was just generated.
+        if end_user:
+            try:
+                repo.update_rag_insight(end_user_id=end_user.id, memory_insight=insight)
+            except Exception as cache_err:
+                business_logger.warning(f"写入end_user {end_user_id} 的RAG洞察缓存失败: {str(cache_err)}")
+
         business_logger.info("成功获取chunk洞察")
-        return result
-        
+        return {"insight": insight}
+
     except Exception as e:
         business_logger.error(f"获取chunk洞察失败: end_user_id={end_user_id} - {str(e)}")
         raise
\ No newline at end of file