From e4aaa18f6108189256b0a0281b433b5b2d29aa73 Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Mon, 9 Mar 2026 18:50:32 +0800 Subject: [PATCH 1/6] [changes] User summaries stored in RAG, generation of memory insights --- api/app/core/rag_utils/chunk_insight.py | 42 +++++++++++++++----- api/app/core/rag_utils/chunk_summary.py | 32 ++++++++++++--- api/app/core/rag_utils/chunk_tags.py | 36 +++++++++++++---- api/app/services/memory_dashboard_service.py | 8 ++-- 4 files changed, 89 insertions(+), 29 deletions(-) diff --git a/api/app/core/rag_utils/chunk_insight.py b/api/app/core/rag_utils/chunk_insight.py index e904e53d..9fbdbbb2 100644 --- a/api/app/core/rag_utils/chunk_insight.py +++ b/api/app/core/rag_utils/chunk_insight.py @@ -5,8 +5,9 @@ This module provides functionality to analyze chunk content and generate insight """ import asyncio +import os from collections import Counter -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional from app.core.logging_config import get_business_logger from app.core.memory.utils.llm.llm_utils import MemoryClientFactory @@ -15,12 +16,31 @@ from pydantic import BaseModel, Field business_logger = get_business_logger() +DEFAULT_LLM_ID = os.getenv("SELECTED_LLM_ID", "openai/qwen-plus") -def _get_llm_client(): - """Get LLM client using db context.""" + +def _get_llm_client(end_user_id: Optional[str] = None): + """Get LLM client, preferring user-connected config with fallback to default.""" with get_db_context() as db: + try: + if end_user_id: + from app.services.memory_agent_service import get_end_user_connected_config + from app.services.memory_config_service import MemoryConfigService + connected_config = get_end_user_connected_config(end_user_id, db) + config_id = connected_config.get("memory_config_id") + workspace_id = connected_config.get("workspace_id") + if config_id or workspace_id: + config_service = MemoryConfigService(db) + memory_config = config_service.load_memory_config( + config_id=config_id, + workspace_id=workspace_id + ) + factory = MemoryClientFactory(db) + return factory.get_llm_client(memory_config.llm_model_id) + except Exception as e: + business_logger.warning(f"Failed to get user connected config, using default LLM: {e}") factory = MemoryClientFactory(db) - return factory.get_llm_client(None) # Uses default LLM + return factory.get_llm_client(DEFAULT_LLM_ID) class ChunkInsight(BaseModel): @@ -37,7 +57,7 @@ class DomainClassification(BaseModel): ) -async def classify_chunk_domain(chunk: str) -> str: +async def classify_chunk_domain(chunk: str, end_user_id: Optional[str] = None) -> str: """ Classify a chunk into a specific domain. @@ -48,7 +68,7 @@ async def classify_chunk_domain(chunk: str) -> str: Domain name """ try: - llm_client = _get_llm_client() + llm_client = _get_llm_client(end_user_id) prompt = f"""请将以下文本内容归类到最合适的领域中。 @@ -82,7 +102,7 @@ async def classify_chunk_domain(chunk: str) -> str: return "其他" -async def analyze_domain_distribution(chunks: List[str], max_chunks: int = 20) -> Dict[str, float]: +async def analyze_domain_distribution(chunks: List[str], max_chunks: int = 20, end_user_id: Optional[str] = None) -> Dict[str, float]: """ Analyze the domain distribution of chunks. @@ -103,7 +123,7 @@ async def analyze_domain_distribution(chunks: List[str], max_chunks: int = 20) - # 为每个chunk分类 domain_counts = Counter() for chunk in chunks_to_analyze: - domain = await classify_chunk_domain(chunk) + domain = await classify_chunk_domain(chunk, end_user_id) domain_counts[domain] += 1 # 计算百分比 @@ -121,7 +141,7 @@ async def analyze_domain_distribution(chunks: List[str], max_chunks: int = 20) - return {} -async def generate_chunk_insight(chunks: List[str], max_chunks: int = 15) -> str: +async def generate_chunk_insight(chunks: List[str], max_chunks: int = 15, end_user_id: Optional[str] = None) -> str: """ Generate insights from the given chunks. @@ -138,7 +158,7 @@ async def generate_chunk_insight(chunks: List[str], max_chunks: int = 15) -> str try: # 1. 分析领域分布 - domain_dist = await analyze_domain_distribution(chunks, max_chunks=max_chunks) + domain_dist = await analyze_domain_distribution(chunks, max_chunks=max_chunks, end_user_id=end_user_id) # 2. 统计基本信息 total_chunks = len(chunks) @@ -185,7 +205,7 @@ async def generate_chunk_insight(chunks: List[str], max_chunks: int = 15) -> str ] # 调用LLM生成洞察 - llm_client = _get_llm_client() + llm_client = _get_llm_client(end_user_id) response = await llm_client.chat(messages=messages) insight = response.content.strip() diff --git a/api/app/core/rag_utils/chunk_summary.py b/api/app/core/rag_utils/chunk_summary.py index 7f69af88..53df2ab3 100644 --- a/api/app/core/rag_utils/chunk_summary.py +++ b/api/app/core/rag_utils/chunk_summary.py @@ -5,7 +5,8 @@ This module provides functionality to summarize chunk content using LLM. """ import asyncio -from typing import Any, Dict, List +import os +from typing import Any, Dict, List, Optional from app.core.logging_config import get_business_logger from app.core.memory.utils.llm.llm_utils import MemoryClientFactory @@ -14,12 +15,31 @@ from pydantic import BaseModel, Field business_logger = get_business_logger() +DEFAULT_LLM_ID = os.getenv("SELECTED_LLM_ID", "openai/qwen-plus") -def _get_llm_client(): - """Get LLM client using db context.""" + +def _get_llm_client(end_user_id: Optional[str] = None): + """Get LLM client, preferring user-connected config with fallback to default.""" with get_db_context() as db: + try: + if end_user_id: + from app.services.memory_agent_service import get_end_user_connected_config + from app.services.memory_config_service import MemoryConfigService + connected_config = get_end_user_connected_config(end_user_id, db) + config_id = connected_config.get("memory_config_id") + workspace_id = connected_config.get("workspace_id") + if config_id or workspace_id: + config_service = MemoryConfigService(db) + memory_config = config_service.load_memory_config( + config_id=config_id, + workspace_id=workspace_id + ) + factory = MemoryClientFactory(db) + return factory.get_llm_client(memory_config.llm_model_id) + except Exception as e: + business_logger.warning(f"Failed to get user connected config, using default LLM: {e}") factory = MemoryClientFactory(db) - return factory.get_llm_client(None) # Uses default LLM + return factory.get_llm_client(DEFAULT_LLM_ID) class ChunkSummary(BaseModel): @@ -27,7 +47,7 @@ class ChunkSummary(BaseModel): summary: str = Field(..., description="简洁的chunk内容摘要") -async def generate_chunk_summary(chunks: List[str], max_chunks: int = 10) -> str: +async def generate_chunk_summary(chunks: List[str], max_chunks: int = 10, end_user_id: Optional[str] = None) -> str: """ Generate a summary for the given chunks. @@ -67,7 +87,7 @@ async def generate_chunk_summary(chunks: List[str], max_chunks: int = 10) -> str ] # 调用LLM生成摘要 - llm_client = _get_llm_client() + llm_client = _get_llm_client(end_user_id) response = await llm_client.chat(messages=messages) summary = response.content.strip() diff --git a/api/app/core/rag_utils/chunk_tags.py b/api/app/core/rag_utils/chunk_tags.py index 2057f8ac..98ab4a33 100644 --- a/api/app/core/rag_utils/chunk_tags.py +++ b/api/app/core/rag_utils/chunk_tags.py @@ -5,8 +5,9 @@ This module provides functionality to extract meaningful tags from chunk content """ import asyncio +import os from collections import Counter -from typing import List, Tuple +from typing import List, Optional, Tuple from app.core.logging_config import get_business_logger from app.core.memory.utils.llm.llm_utils import MemoryClientFactory @@ -15,12 +16,31 @@ from pydantic import BaseModel, Field business_logger = get_business_logger() +DEFAULT_LLM_ID = os.getenv("SELECTED_LLM_ID", "openai/qwen-plus") -def _get_llm_client(): - """Get LLM client using db context.""" + +def _get_llm_client(end_user_id: Optional[str] = None): + """Get LLM client, preferring user-connected config with fallback to default.""" with get_db_context() as db: + try: + if end_user_id: + from app.services.memory_agent_service import get_end_user_connected_config + from app.services.memory_config_service import MemoryConfigService + connected_config = get_end_user_connected_config(end_user_id, db) + config_id = connected_config.get("memory_config_id") + workspace_id = connected_config.get("workspace_id") + if config_id or workspace_id: + config_service = MemoryConfigService(db) + memory_config = config_service.load_memory_config( + config_id=config_id, + workspace_id=workspace_id + ) + factory = MemoryClientFactory(db) + return factory.get_llm_client(memory_config.llm_model_id) + except Exception as e: + business_logger.warning(f"Failed to get user connected config, using default LLM: {e}") factory = MemoryClientFactory(db) - return factory.get_llm_client(None) # Uses default LLM + return factory.get_llm_client(DEFAULT_LLM_ID) class ExtractedTags(BaseModel): @@ -33,7 +53,7 @@ class ExtractedPersona(BaseModel): personas: List[str] = Field(..., description="从文本中提取的人物形象列表,如'产品设计师'、'旅行爱好者'等") -async def extract_chunk_tags(chunks: List[str], max_tags: int = 10, max_chunks: int = 10) -> List[Tuple[str, int]]: +async def extract_chunk_tags(chunks: List[str], max_tags: int = 10, max_chunks: int = 10, end_user_id: Optional[str] = None) -> List[Tuple[str, int]]: """ Extract meaningful tags from the given chunks. @@ -64,7 +84,7 @@ async def extract_chunk_tags(chunks: List[str], max_tags: int = 10, max_chunks: "标签应该是名词或名词短语,能够准确概括文本的核心内容。" ) - llm_client = _get_llm_client() + llm_client = _get_llm_client(end_user_id) # 为每个chunk单独提取标签,然后统计频率 all_tags = [] @@ -116,7 +136,7 @@ async def extract_chunk_tags_with_frequency(chunks: List[str], max_tags: int = 1 return await extract_chunk_tags(chunks, max_tags=max_tags, max_chunks=len(chunks)) -async def extract_chunk_persona(chunks: List[str], max_personas: int = 5, max_chunks: int = 20) -> List[str]: +async def extract_chunk_persona(chunks: List[str], max_personas: int = 5, max_chunks: int = 20, end_user_id: Optional[str] = None) -> List[str]: """ Extract persona (人物形象) from the given chunks. @@ -159,7 +179,7 @@ async def extract_chunk_persona(chunks: List[str], max_personas: int = 5, max_ch ] # 调用LLM提取人物形象 - llm_client = _get_llm_client() + llm_client = _get_llm_client(end_user_id) structured_response = await llm_client.response_structured( messages=messages, response_model=ExtractedPersona diff --git a/api/app/services/memory_dashboard_service.py b/api/app/services/memory_dashboard_service.py index 05aed57e..63a9c361 100644 --- a/api/app/services/memory_dashboard_service.py +++ b/api/app/services/memory_dashboard_service.py @@ -678,9 +678,9 @@ async def get_chunk_summary_and_tags( # 3. 并发生成摘要、提取标签和人物形象 import asyncio - summary_task = generate_chunk_summary(chunks, max_chunks=limit) - tags_task = extract_chunk_tags(chunks, max_tags=max_tags, max_chunks=limit) - personas_task = extract_chunk_persona(chunks, max_personas=5, max_chunks=limit) + summary_task = generate_chunk_summary(chunks, max_chunks=limit, end_user_id=end_user_id) + tags_task = extract_chunk_tags(chunks, max_tags=max_tags, max_chunks=limit, end_user_id=end_user_id) + personas_task = extract_chunk_persona(chunks, max_personas=5, max_chunks=limit, end_user_id=end_user_id) summary, tags_with_freq, personas = await asyncio.gather(summary_task, tags_task, personas_task) @@ -736,7 +736,7 @@ async def get_chunk_insight( from app.core.rag_utils import generate_chunk_insight # 3. 生成洞察 - insight = await generate_chunk_insight(chunks, max_chunks=limit) + insight = await generate_chunk_insight(chunks, max_chunks=limit, end_user_id=end_user_id) result = { "insight": insight From e2f5fa87b165b6becd5e3b886fdb65487a3bce48 Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Tue, 10 Mar 2026 11:41:09 +0800 Subject: [PATCH 2/6] [add] Add cache to RAG storage --- api/app/models/end_user_model.py | 6 + api/app/repositories/end_user_repository.py | 84 ++++++++++++ api/app/services/memory_dashboard_service.py | 132 ++++++++++--------- 3 files changed, 159 insertions(+), 63 deletions(-) diff --git a/api/app/models/end_user_model.py b/api/app/models/end_user_model.py index 30b56fc5..048d42ea 100644 --- a/api/app/models/end_user_model.py +++ b/api/app/models/end_user_model.py @@ -51,6 +51,12 @@ class EndUser(Base): growth_trajectory = Column(Text, nullable=True, comment="成长轨迹") memory_insight_updated_at = Column(DateTime, nullable=True, comment="洞察报告最后更新时间") + # RAG存储模式专用字段 - RAG Storage Mode Fields + storage_type = Column(String, nullable=True, default="neo4j", comment="存储模式类型: neo4j / rag") + rag_tags = Column(Text, nullable=True, comment="RAG模式下提取的标签列表(JSON格式)") + rag_personas = Column(Text, nullable=True, comment="RAG模式下提取的人物形象列表(JSON格式)") + rag_summary_updated_at = Column(DateTime, nullable=True, comment="RAG摘要/标签/人物形象最后更新时间") + # 与 App 的反向关系 app = relationship( "App", diff --git a/api/app/repositories/end_user_repository.py b/api/app/repositories/end_user_repository.py index 48c9c4ec..0b828a8b 100644 --- a/api/app/repositories/end_user_repository.py +++ b/api/app/repositories/end_user_repository.py @@ -220,6 +220,90 @@ class EndUserRepository: db_logger.error(f"更新终端用户 {end_user_id} 的用户摘要缓存时出错: {str(e)}") raise + def update_rag_summary_tags( + self, + end_user_id: uuid.UUID, + user_summary: str, + rag_tags: str, + rag_personas: str, + ) -> bool: + """更新RAG模式下的用户摘要、标签和人物形象缓存 + + Args: + end_user_id: 终端用户ID + user_summary: 用户摘要文本 + rag_tags: 标签列表(JSON字符串) + rag_personas: 人物形象列表(JSON字符串) + + Returns: + bool: 更新成功返回True,否则返回False + """ + try: + updated_count = ( + self.db.query(EndUser) + .filter(EndUser.id == end_user_id) + .update( + { + EndUser.user_summary: user_summary, + EndUser.rag_tags: rag_tags, + EndUser.rag_personas: rag_personas, + EndUser.storage_type: "rag", + EndUser.rag_summary_updated_at: datetime.datetime.now(), + }, + synchronize_session=False + ) + ) + self.db.commit() + if updated_count > 0: + db_logger.info(f"成功更新终端用户 {end_user_id} 的RAG摘要/标签/人物形象缓存") + return True + else: + db_logger.warning(f"未找到终端用户 {end_user_id},无法更新RAG摘要缓存") + return False + except Exception as e: + self.db.rollback() + db_logger.error(f"更新终端用户 {end_user_id} 的RAG摘要缓存时出错: {str(e)}") + raise + + def update_rag_insight( + self, + end_user_id: uuid.UUID, + memory_insight: str, + ) -> bool: + """更新RAG模式下的记忆洞察缓存 + + Args: + end_user_id: 终端用户ID + memory_insight: 洞察文本 + + Returns: + bool: 更新成功返回True,否则返回False + """ + try: + updated_count = ( + self.db.query(EndUser) + .filter(EndUser.id == end_user_id) + .update( + { + EndUser.memory_insight: memory_insight, + EndUser.storage_type: "rag", + EndUser.memory_insight_updated_at: datetime.datetime.now(), + }, + synchronize_session=False + ) + ) + self.db.commit() + if updated_count > 0: + db_logger.info(f"成功更新终端用户 {end_user_id} 的RAG洞察缓存") + return True + else: + db_logger.warning(f"未找到终端用户 {end_user_id},无法更新RAG洞察缓存") + return False + except Exception as e: + self.db.rollback() + db_logger.error(f"更新终端用户 {end_user_id} 的RAG洞察缓存时出错: {str(e)}") + raise + def get_all_by_workspace(self, workspace_id: uuid.UUID) -> List[EndUser]: """获取工作空间的所有终端用户 diff --git a/api/app/services/memory_dashboard_service.py b/api/app/services/memory_dashboard_service.py index 63a9c361..6559ef2f 100644 --- a/api/app/services/memory_dashboard_service.py +++ b/api/app/services/memory_dashboard_service.py @@ -647,55 +647,63 @@ async def get_chunk_summary_and_tags( ) -> dict: """ 获取chunk的总结、标签和人物形象 - - Args: - end_user_id: 宿主ID - limit: 返回的chunk数量限制 - max_tags: 最大标签数量 - db: 数据库会话 - current_user: 当前用户 - - Returns: - 包含summary、tags和personas的字典 + 优先返回end_user表中的缓存,若无缓存则实时生成并写库 """ + import json + from app.repositories.end_user_repository import EndUserRepository + business_logger.info(f"获取chunk摘要、标签和人物形象: end_user_id={end_user_id}, limit={limit}, 操作者: {current_user.username}") - + try: - # 1. 获取chunk内容 + repo = EndUserRepository(db) + end_user = repo.get_by_id(uuid.UUID(end_user_id)) + + # 读缓存:user_summary / rag_tags / rag_personas 均有值时直接返回 + if ( + end_user + and end_user.user_summary + and end_user.rag_tags + and end_user.rag_personas + ): + business_logger.info(f"命中缓存,直接返回end_user {end_user_id} 的摘要/标签/人物形象") + return { + "summary": end_user.user_summary, + "tags": json.loads(end_user.rag_tags), + "personas": json.loads(end_user.rag_personas), + } + + # 无缓存:实时生成 rag_content = get_rag_content(end_user_id, limit, db, current_user) chunks = rag_content.get("contents", []) - + if not chunks: business_logger.warning(f"未找到chunk内容: end_user_id={end_user_id}") - return { - "summary": "暂无内容", - "tags": [], - "personas": [] - } - - # 2. 导入RAG工具函数 + return {"summary": "暂无内容", "tags": [], "personas": []} + from app.core.rag_utils import generate_chunk_summary, extract_chunk_tags, extract_chunk_persona - - # 3. 并发生成摘要、提取标签和人物形象 import asyncio - summary_task = generate_chunk_summary(chunks, max_chunks=limit, end_user_id=end_user_id) - tags_task = extract_chunk_tags(chunks, max_tags=max_tags, max_chunks=limit, end_user_id=end_user_id) - personas_task = extract_chunk_persona(chunks, max_personas=5, max_chunks=limit, end_user_id=end_user_id) - - summary, tags_with_freq, personas = await asyncio.gather(summary_task, tags_task, personas_task) - - # 4. 格式化标签数据 + + summary, tags_with_freq, personas = await asyncio.gather( + generate_chunk_summary(chunks, max_chunks=limit, end_user_id=end_user_id), + extract_chunk_tags(chunks, max_tags=max_tags, max_chunks=limit, end_user_id=end_user_id), + extract_chunk_persona(chunks, max_personas=5, max_chunks=limit, end_user_id=end_user_id), + ) + tags = [{"tag": tag, "frequency": freq} for tag, freq in tags_with_freq] - - result = { - "summary": summary, - "tags": tags, - "personas": personas - } - + + # 写库缓存 + if end_user: + repo.update_rag_summary_tags( + end_user_id=end_user.id, + user_summary=summary, + rag_tags=json.dumps(tags, ensure_ascii=False), + rag_personas=json.dumps(personas, ensure_ascii=False), + ) + + result = {"summary": summary, "tags": tags, "personas": personas} business_logger.info(f"成功获取chunk摘要、{len(tags)} 个标签和 {len(personas)} 个人物形象") return result - + except Exception as e: business_logger.error(f"获取chunk摘要、标签和人物形象失败: end_user_id={end_user_id} - {str(e)}") raise @@ -709,42 +717,40 @@ async def get_chunk_insight( ) -> dict: """ 获取chunk的洞察分析 - - Args: - end_user_id: 宿主ID - limit: 返回的chunk数量限制 - db: 数据库会话 - current_user: 当前用户 - - Returns: - 包含insight的字典 + 优先返回end_user表中的缓存,若无缓存则实时生成并写库 """ + from app.repositories.end_user_repository import EndUserRepository + business_logger.info(f"获取chunk洞察: end_user_id={end_user_id}, limit={limit}, 操作者: {current_user.username}") - + try: - # 1. 获取chunk内容 + repo = EndUserRepository(db) + end_user = repo.get_by_id(uuid.UUID(end_user_id)) + + # 读缓存 + if end_user and end_user.memory_insight: + business_logger.info(f"命中缓存,直接返回end_user {end_user_id} 的洞察") + return {"insight": end_user.memory_insight} + + # 无缓存:实时生成 rag_content = get_rag_content(end_user_id, limit, db, current_user) chunks = rag_content.get("contents", []) - + if not chunks: business_logger.warning(f"未找到chunk内容: end_user_id={end_user_id}") - return { - "insight": "暂无足够数据生成洞察报告" - } - - # 2. 导入RAG工具函数 + return {"insight": "暂无足够数据生成洞察报告"} + from app.core.rag_utils import generate_chunk_insight - - # 3. 生成洞察 + insight = await generate_chunk_insight(chunks, max_chunks=limit, end_user_id=end_user_id) - - result = { - "insight": insight - } - + + # 写库缓存 + if end_user: + repo.update_rag_insight(end_user_id=end_user.id, memory_insight=insight) + business_logger.info("成功获取chunk洞察") - return result - + return {"insight": insight} + except Exception as e: business_logger.error(f"获取chunk洞察失败: end_user_id={end_user_id} - {str(e)}") raise \ No newline at end of file From fd556f9b00b8fdbb2c0977abfa4f4d69a16de928 Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Tue, 10 Mar 2026 11:51:17 +0800 Subject: [PATCH 3/6] [add] Generate user summaries and memory insights using Jinja2 tags --- api/app/core/rag_utils/__init__.py | 3 +- api/app/core/rag_utils/chunk_insight.py | 310 +++++++++---------- api/app/core/rag_utils/chunk_summary.py | 150 ++++----- api/app/services/memory_dashboard_service.py | 22 +- 4 files changed, 242 insertions(+), 243 deletions(-) diff --git a/api/app/core/rag_utils/__init__.py b/api/app/core/rag_utils/__init__.py index d5a8ce1c..0efe1938 100644 --- a/api/app/core/rag_utils/__init__.py +++ b/api/app/core/rag_utils/__init__.py @@ -4,11 +4,12 @@ RAG chunk analysis utilities. from .chunk_summary import generate_chunk_summary from .chunk_tags import extract_chunk_tags, extract_chunk_persona -from .chunk_insight import generate_chunk_insight +from .chunk_insight import generate_chunk_insight, generate_chunk_insight_sections __all__ = [ "generate_chunk_summary", "extract_chunk_tags", "extract_chunk_persona", "generate_chunk_insight", + "generate_chunk_insight_sections", ] diff --git a/api/app/core/rag_utils/chunk_insight.py b/api/app/core/rag_utils/chunk_insight.py index 9fbdbbb2..b46ce3c9 100644 --- a/api/app/core/rag_utils/chunk_insight.py +++ b/api/app/core/rag_utils/chunk_insight.py @@ -1,24 +1,33 @@ """ -Generate insights from RAG chunks. +Generate memory insight report for RAG chunks using memory_insight.jinja2 prompt template. -This module provides functionality to analyze chunk content and generate insights using LLM. +The memory_insight.jinja2 template produces a four-section report: + 【总体概述】 → memory_insight + 【行为模式】 → behavior_pattern + 【关键发现】 → key_findings + 【成长轨迹】 → growth_trajectory + +generate_chunk_insight() returns the full raw text (stored in end_user.memory_insight). +generate_chunk_insight_sections() returns a dict with all four fields for richer storage. """ import asyncio import os +import re from collections import Counter -from typing import Any, Dict, List, Optional +from typing import Dict, List, Optional from app.core.logging_config import get_business_logger from app.core.memory.utils.llm.llm_utils import MemoryClientFactory from app.db import get_db_context -from pydantic import BaseModel, Field business_logger = get_business_logger() DEFAULT_LLM_ID = os.getenv("SELECTED_LLM_ID", "openai/qwen-plus") +# ── LLM client helper ──────────────────────────────────────────────────────── + def _get_llm_client(end_user_id: Optional[str] = None): """Get LLM client, preferring user-connected config with fallback to default.""" with get_db_context() as db: @@ -43,191 +52,156 @@ def _get_llm_client(end_user_id: Optional[str] = None): return factory.get_llm_client(DEFAULT_LLM_ID) -class ChunkInsight(BaseModel): - """Pydantic model for chunk insight.""" - insight: str = Field(..., description="对chunk内容的深度洞察分析") +# ── Domain analysis helpers (kept for building prompt inputs) ───────────────── +async def _classify_domain(chunk: str, llm_client) -> str: + """Classify a single chunk into a domain category.""" + from pydantic import BaseModel, Field -class DomainClassification(BaseModel): - """Pydantic model for domain classification.""" - domain: str = Field( - ..., - description="内容所属的领域分类", - examples=["技术", "商业", "教育", "生活", "娱乐", "健康", "其他"] - ) + class _Domain(BaseModel): + domain: str = Field(..., description="领域分类") - -async def classify_chunk_domain(chunk: str, end_user_id: Optional[str] = None) -> str: - """ - Classify a chunk into a specific domain. - - Args: - chunk: Chunk content string - - Returns: - Domain name - """ try: - llm_client = _get_llm_client(end_user_id) - - prompt = f"""请将以下文本内容归类到最合适的领域中。 - -可选领域及其关键词: -- 技术:编程、软件、硬件、算法、数据、网络、系统、开发、工程等 -- 商业:市场、销售、管理、财务、投资、创业、营销、战略等 -- 教育:学习、课程、培训、教学、知识、技能、考试、研究等 -- 生活:日常、家庭、饮食、购物、旅行、休闲、娱乐等 -- 娱乐:游戏、电影、音乐、体育、艺术、文化等 -- 健康:医疗、养生、运动、心理、保健、疾病等 -- 其他:无法归入以上类别的内容 - -文本内容: {chunk[:500]}... - -请直接返回最合适的领域名称。""" - - messages = [ - {"role": "system", "content": "你是一个专业的文本分类助手。请仔细分析文本内容,选择最合适的领域分类。"}, - {"role": "user", "content": prompt} - ] - - classification = await llm_client.response_structured( - messages=messages, - response_model=DomainClassification + prompt = ( + "请将以下文本归类到最合适的领域(技术/商业/教育/生活/娱乐/健康/其他)。\n\n" + f"文本: {chunk[:500]}\n\n直接返回领域名称。" ) - - return classification.domain if classification else "其他" - - except Exception as e: - business_logger.error(f"分类chunk领域失败: {str(e)}") + result = await llm_client.response_structured( + messages=[{"role": "user", "content": prompt}], + response_model=_Domain, + ) + return result.domain if result else "其他" + except Exception: return "其他" -async def analyze_domain_distribution(chunks: List[str], max_chunks: int = 20, end_user_id: Optional[str] = None) -> Dict[str, float]: +async def _build_insight_inputs( + chunks: List[str], + max_chunks: int, + end_user_id: Optional[str], +) -> Dict[str, str]: """ - Analyze the domain distribution of chunks. - - Args: - chunks: List of chunk content strings - max_chunks: Maximum number of chunks to analyze - - Returns: - Dictionary of domain -> percentage + Derive domain_distribution, active_periods, social_connections strings + to feed into the memory_insight.jinja2 template. """ - if not chunks: - return {} - - try: - # 限制分析的chunk数量 - chunks_to_analyze = chunks[:max_chunks] - - # 为每个chunk分类 - domain_counts = Counter() - for chunk in chunks_to_analyze: - domain = await classify_chunk_domain(chunk, end_user_id) - domain_counts[domain] += 1 - - # 计算百分比 - total = sum(domain_counts.values()) - domain_distribution = { - domain: count / total - for domain, count in domain_counts.items() - } - - # 按百分比降序排序 - return dict(sorted(domain_distribution.items(), key=lambda x: x[1], reverse=True)) - - except Exception as e: - business_logger.error(f"分析领域分布失败: {str(e)}") - return {} + llm_client = _get_llm_client(end_user_id) + chunks_sample = chunks[:max_chunks] + + # Domain distribution + domain_counts: Counter = Counter() + for chunk in chunks_sample: + domain = await _classify_domain(chunk, llm_client) + domain_counts[domain] += 1 + + total = sum(domain_counts.values()) or 1 + domain_distribution = ", ".join( + f"{d}({c / total:.0%})" for d, c in domain_counts.most_common(3) + ) + + return { + "domain_distribution": domain_distribution, + "active_periods": None, # RAG模式暂无时间维度数据 + "social_connections": None, # RAG模式暂无社交关联数据 + } -async def generate_chunk_insight(chunks: List[str], max_chunks: int = 15, end_user_id: Optional[str] = None) -> str: +# ── Section parser ──────────────────────────────────────────────────────────── + +_ZH_SECTIONS = { + "memory_insight": r"【总体概述】(.*?)(?=【|$)", + "behavior_pattern": r"【行为模式】(.*?)(?=【|$)", + "key_findings": r"【关键发现】(.*?)(?=【|$)", + "growth_trajectory": r"【成长轨迹】(.*?)(?=【|$)", +} + +_EN_SECTIONS = { + "memory_insight": r"【Overview】(.*?)(?=【|$)", + "behavior_pattern": r"【Behavior Pattern】(.*?)(?=【|$)", + "key_findings": r"【Key Findings】(.*?)(?=【|$)", + "growth_trajectory": r"【Growth Trajectory】(.*?)(?=【|$)", +} + + +def _parse_sections(text: str, language: str = "zh") -> Dict[str, str]: + """Extract the four sections from the LLM output.""" + patterns = _ZH_SECTIONS if language == "zh" else _EN_SECTIONS + result = {} + for key, pattern in patterns.items(): + match = re.search(pattern, text, re.DOTALL) + result[key] = match.group(1).strip() if match else "" + return result + + +# ── Public API ──────────────────────────────────────────────────────────────── + +async def generate_chunk_insight( + chunks: List[str], + max_chunks: int = 15, + end_user_id: Optional[str] = None, + language: str = "zh", +) -> str: """ - Generate insights from the given chunks. - - Args: - chunks: List of chunk content strings - max_chunks: Maximum number of chunks to analyze - - Returns: - A comprehensive insight report + Generate a memory insight report from RAG chunks. + + Returns the full raw report text (suitable for end_user.memory_insight). + Use generate_chunk_insight_sections() when you need all four dimensions. + """ + sections = await generate_chunk_insight_sections( + chunks=chunks, + max_chunks=max_chunks, + end_user_id=end_user_id, + language=language, + ) + return sections.get("memory_insight") or sections.get("_raw", "洞察生成失败") + + +async def generate_chunk_insight_sections( + chunks: List[str], + max_chunks: int = 15, + end_user_id: Optional[str] = None, + language: str = "zh", +) -> Dict[str, str]: + """ + Generate a four-section memory insight report from RAG chunks. + + Returns a dict with keys: + memory_insight, behavior_pattern, key_findings, growth_trajectory + (plus '_raw' containing the full LLM output for debugging) """ if not chunks: business_logger.warning("没有提供chunk内容用于生成洞察") - return "暂无足够数据生成洞察报告" - + empty = {k: "" for k in ("memory_insight", "behavior_pattern", "key_findings", "growth_trajectory")} + empty["_raw"] = "暂无足够数据生成洞察报告" + return empty + try: - # 1. 分析领域分布 - domain_dist = await analyze_domain_distribution(chunks, max_chunks=max_chunks, end_user_id=end_user_id) - - # 2. 统计基本信息 - total_chunks = len(chunks) - avg_length = sum(len(chunk) for chunk in chunks) / total_chunks if total_chunks > 0 else 0 - - # 3. 构建洞察prompt - prompt_parts = [] - - if domain_dist: - top_domains = ", ".join([f"{k}({v:.0%})" for k, v in list(domain_dist.items())[:3]]) - prompt_parts.append(f"- 内容领域分布: {top_domains}") - - prompt_parts.append(f"- 内容规模: 共{total_chunks}个知识片段,平均长度{avg_length:.0f}字") - - # 添加部分chunk内容作为参考 - sample_chunks = chunks[:5] - sample_content = "\n".join([f"示例{i+1}: {chunk[:200]}..." for i, chunk in enumerate(sample_chunks)]) - prompt_parts.append(f"\n内容示例:\n{sample_content}") - - system_prompt = """你是一位专业的知识内容分析师。你的任务是根据提供的信息,生成一段简洁、有洞察力的分析报告。 + from app.core.memory.utils.prompt.prompt_utils import render_memory_insight_prompt -重要规则: -1. 报告需要将所有要点流畅地串联成一个段落 -2. 语言风格要专业、客观,同时易于理解 -3. 不要添加任何额外的解释或标题,直接输出报告内容 -4. 基于提供的数据和示例内容进行分析,不要编造信息 -5. 重点关注内容的主题、特点和价值 -6. 报告长度控制在150-200字 + # Build template inputs from chunk analysis + inputs = await _build_insight_inputs(chunks, max_chunks, end_user_id) -例如,如果输入是: -- 内容领域分布: 技术(60%), 商业(25%), 教育(15%) -- 内容规模: 共50个知识片段,平均长度320字 -内容示例: [示例内容...] + rendered_prompt = await render_memory_insight_prompt( + domain_distribution=inputs["domain_distribution"], + active_periods=inputs["active_periods"], + social_connections=inputs["social_connections"], + language=language, + ) -你的输出应该类似: -"该知识库主要聚焦于技术领域(60%),涵盖商业(25%)和教育(15%)相关内容。共包含50个知识片段,平均每个片段约320字,内容详实。从示例来看,内容涉及[具体主题],体现了[特点],对[目标用户]具有较高的参考价值。" -""" - - user_prompt = "\n".join(prompt_parts) - - messages = [ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt} - ] - - # 调用LLM生成洞察 + messages = [{"role": "user", "content": rendered_prompt}] llm_client = _get_llm_client(end_user_id) response = await llm_client.chat(messages=messages) - - insight = response.content.strip() - business_logger.info(f"成功生成chunk洞察,分析了 {min(len(chunks), max_chunks)} 个片段") - - return insight - + raw_text = response.content.strip() if response and response.content else "" + + sections = _parse_sections(raw_text, language=language) + sections["_raw"] = raw_text + + business_logger.info( + f"成功生成chunk洞察四维度,分析了 {min(len(chunks), max_chunks)} 个片段" + ) + return sections + except Exception as e: business_logger.error(f"生成chunk洞察失败: {str(e)}") - return "洞察生成失败" - - -if __name__ == "__main__": - # 测试代码 - test_chunks = [ - "Python是一种高级编程语言,以其简洁的语法和强大的功能而闻名。它广泛应用于Web开发、数据分析、人工智能等领域。", - "机器学习算法可以从数据中自动学习模式,无需显式编程。常见的算法包括决策树、随机森林、神经网络等。", - "深度学习是机器学习的一个分支,使用多层神经网络来学习数据的层次化表示。它在图像识别、语音识别等任务中表现出色。", - "自然语言处理技术使计算机能够理解和生成人类语言。应用包括机器翻译、情感分析、文本摘要等。", - "数据科学结合了统计学、计算机科学和领域知识,用于从数据中提取有价值的洞察。" - ] - - print("开始生成chunk洞察...") - insight = asyncio.run(generate_chunk_insight(test_chunks)) - print(f"\n生成的洞察:\n{insight}") + empty = {k: "" for k in ("memory_insight", "behavior_pattern", "key_findings", "growth_trajectory")} + empty["_raw"] = "洞察生成失败" + return empty diff --git a/api/app/core/rag_utils/chunk_summary.py b/api/app/core/rag_utils/chunk_summary.py index 53df2ab3..dd1c904e 100644 --- a/api/app/core/rag_utils/chunk_summary.py +++ b/api/app/core/rag_utils/chunk_summary.py @@ -1,12 +1,10 @@ """ -Generate summary for RAG chunks. - -This module provides functionality to summarize chunk content using LLM. +Generate summary for RAG chunks using memory_summary.jinja2 prompt template. """ import asyncio import os -from typing import Any, Dict, List, Optional +from typing import List, Optional from app.core.logging_config import get_business_logger from app.core.memory.utils.llm.llm_utils import MemoryClientFactory @@ -18,6 +16,29 @@ business_logger = get_business_logger() DEFAULT_LLM_ID = os.getenv("SELECTED_LLM_ID", "openai/qwen-plus") +# ── Schema ────────────────────────────────────────────────────────────────── + +class MemorySummaryStatement(BaseModel): + """Single labelled statement extracted by memory_summary.jinja2.""" + statement: str = Field(..., description="提取的陈述内容") + label: Optional[str] = Field(None, description="陈述标签") + + +class MemorySummaryResponse(BaseModel): + """ + Structured output expected from memory_summary.jinja2. + The template asks for a JSON array of labelled statements; + we wrap it in an object so response_structured can parse it. + """ + statements: List[MemorySummaryStatement] = Field( + default_factory=list, + description="从chunk中提取的陈述列表" + ) + summary: Optional[str] = Field(None, description="整体摘要文本(可选)") + + +# ── LLM client helper ──────────────────────────────────────────────────────── + def _get_llm_client(end_user_id: Optional[str] = None): """Get LLM client, preferring user-connected config with fallback to default.""" with get_db_context() as db: @@ -42,86 +63,75 @@ def _get_llm_client(end_user_id: Optional[str] = None): return factory.get_llm_client(DEFAULT_LLM_ID) -class ChunkSummary(BaseModel): - """Pydantic model for chunk summary.""" - summary: str = Field(..., description="简洁的chunk内容摘要") +# ── Core function ───────────────────────────────────────────────────────────── - -async def generate_chunk_summary(chunks: List[str], max_chunks: int = 10, end_user_id: Optional[str] = None) -> str: +async def generate_chunk_summary( + chunks: List[str], + max_chunks: int = 10, + end_user_id: Optional[str] = None, + language: str = "zh", +) -> str: """ - Generate a summary for the given chunks. - + Generate a user summary from RAG chunks using the memory_summary.jinja2 template. + + The template extracts labelled statements from the chunks; we then join them + into a coherent summary string that can be stored in end_user.user_summary. + Args: chunks: List of chunk content strings - max_chunks: Maximum number of chunks to process (default: 10) - + max_chunks: Maximum number of chunks to process + end_user_id: Optional end-user ID for model selection + language: Output language ("zh" or "en") + Returns: - A concise summary of the chunks + Summary string (joined statements or fallback text) """ if not chunks: business_logger.warning("没有提供chunk内容用于生成摘要") return "暂无内容" - + try: - # 限制处理的chunk数量,避免token过多 + from app.core.memory.utils.prompt.prompt_utils import render_memory_summary_prompt + chunks_to_process = chunks[:max_chunks] - - # 合并chunk内容 - combined_content = "\n\n".join([f"片段{i+1}: {chunk}" for i, chunk in enumerate(chunks_to_process)]) - - # 构建prompt - system_prompt = ( - "你是一位专业的文本摘要助手。请基于提供的文本片段,生成简洁的摘要。要求:\n" - "- 摘要长度控制在100-150字;\n" - "- 提取核心信息和关键要点;\n" - "- 使用客观、清晰的语言;\n" - "- 避免冗余和重复;\n" - "- 如果内容涉及多个主题,按重要性排序呈现。" + chunk_texts = "\n\n".join( + [f"片段{i + 1}: {chunk}" for i, chunk in enumerate(chunks_to_process)] ) - - user_prompt = f"请为以下文本片段生成摘要:\n\n{combined_content}" - - messages = [ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt}, - ] - - # 调用LLM生成摘要 + + json_schema = MemorySummaryResponse.model_json_schema() + + rendered_prompt = await render_memory_summary_prompt( + chunk_texts=chunk_texts, + json_schema=json_schema, + max_words=200, + language=language, + ) + + messages = [{"role": "user", "content": rendered_prompt}] + llm_client = _get_llm_client(end_user_id) - response = await llm_client.chat(messages=messages) - - summary = response.content.strip() - business_logger.info(f"成功生成chunk摘要,处理了 {len(chunks_to_process)} 个片段") - + + # Try structured output first; fall back to plain chat if unsupported + try: + response: MemorySummaryResponse = await llm_client.response_structured( + messages=messages, + response_model=MemorySummaryResponse, + ) + if response.summary: + summary = response.summary.strip() + elif response.statements: + summary = ";".join(s.statement for s in response.statements) + else: + summary = "暂无内容" + except Exception: + raw = await llm_client.chat(messages=messages) + summary = raw.content.strip() if raw and raw.content else "暂无内容" + + business_logger.info( + f"成功生成chunk摘要,处理了 {len(chunks_to_process)} 个片段" + ) return summary - + except Exception as e: business_logger.error(f"生成chunk摘要失败: {str(e)}") return "摘要生成失败" - - -async def generate_chunk_summary_batch(chunks_list: List[List[str]]) -> List[str]: - """ - Generate summaries for multiple chunk lists in batch. - - Args: - chunks_list: List of chunk lists - - Returns: - List of summaries - """ - tasks = [generate_chunk_summary(chunks) for chunks in chunks_list] - return await asyncio.gather(*tasks) - - -if __name__ == "__main__": - # 测试代码 - test_chunks = [ - "这是第一段测试内容,讲述了关于机器学习的基础知识。", - "第二段内容介绍了深度学习的应用场景和发展历史。", - "第三段讨论了自然语言处理技术的最新进展。" - ] - - print("开始生成chunk摘要...") - summary = asyncio.run(generate_chunk_summary(test_chunks)) - print(f"\n生成的摘要:\n{summary}") diff --git a/api/app/services/memory_dashboard_service.py b/api/app/services/memory_dashboard_service.py index 6559ef2f..cbe0bd80 100644 --- a/api/app/services/memory_dashboard_service.py +++ b/api/app/services/memory_dashboard_service.py @@ -740,13 +740,27 @@ async def get_chunk_insight( business_logger.warning(f"未找到chunk内容: end_user_id={end_user_id}") return {"insight": "暂无足够数据生成洞察报告"} - from app.core.rag_utils import generate_chunk_insight + from app.core.rag_utils import generate_chunk_insight_sections - insight = await generate_chunk_insight(chunks, max_chunks=limit, end_user_id=end_user_id) + sections = await generate_chunk_insight_sections(chunks, max_chunks=limit, end_user_id=end_user_id) + insight = sections.get("memory_insight") or sections.get("_raw", "") - # 写库缓存 + # 写库缓存(四维度全部入库) if end_user: - repo.update_rag_insight(end_user_id=end_user.id, memory_insight=insight) + from app.repositories.end_user_repository import EndUserRepository as _Repo + _repo = _Repo(db) + _repo.update_memory_insight( + end_user_id=end_user.id, + memory_insight=insight, + behavior_pattern=sections.get("behavior_pattern", ""), + key_findings=sections.get("key_findings", ""), + growth_trajectory=sections.get("growth_trajectory", ""), + ) + # 同时标记 storage_type 为 rag + db.query(end_user.__class__).filter( + end_user.__class__.id == end_user.id + ).update({"storage_type": "rag"}, synchronize_session=False) + db.commit() business_logger.info("成功获取chunk洞察") return {"insight": insight} From 424d2033ea7b88199c382c5224b8d12a8bfbf705 Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Tue, 10 Mar 2026 12:11:13 +0800 Subject: [PATCH 4/6] [add] Added an interface for refreshing RAG storage image data --- .../memory_dashboard_controller.py | 74 ++++--- api/app/services/memory_dashboard_service.py | 190 +++++++++--------- 2 files changed, 143 insertions(+), 121 deletions(-) diff --git a/api/app/controllers/memory_dashboard_controller.py b/api/app/controllers/memory_dashboard_controller.py index 50e8ec8f..1582a042 100644 --- a/api/app/controllers/memory_dashboard_controller.py +++ b/api/app/controllers/memory_dashboard_controller.py @@ -422,26 +422,18 @@ async def get_chunk_summary_tag( current_user: User = Depends(get_current_user), ): """ - 获取chunk总结、提取的标签和人物形象 - + 读取RAG摘要、标签和人物形象(纯读库,不触发生成)。 + 返回格式: { - "summary": "chunk内容的总结", - "tags": [ - {"tag": "标签1", "frequency": 5}, - {"tag": "标签2", "frequency": 3}, - ... - ], - "personas": [ - "产品设计师", - "旅行爱好者", - "摄影发烧友", - ... - ] + "summary": "用户摘要", + "tags": [{"tag": "标签1", "frequency": 5}, ...], + "personas": ["产品设计师", ...], + "generated": true/false // false表示尚未生产,请调用 /generate_rag_profile } """ - api_logger.info(f"用户 {current_user.username} 请求获取宿主 {end_user_id} 的chunk摘要、标签和人物形象") - + api_logger.info(f"用户 {current_user.username} 读取宿主 {end_user_id} 的RAG摘要/标签/人物形象") + data = await memory_dashboard_service.get_chunk_summary_and_tags( end_user_id=end_user_id, limit=limit, @@ -449,9 +441,8 @@ async def get_chunk_summary_tag( db=db, current_user=current_user ) - - api_logger.info(f"成功获取chunk摘要、{len(data.get('tags', []))} 个标签和 {len(data.get('personas', []))} 个人物形象") - return success(data=data, msg="chunk摘要、标签和人物形象获取成功") + + return success(data=data, msg="获取成功") @router.get("/chunk_insight", response_model=ApiResponse) @@ -462,24 +453,53 @@ async def get_chunk_insight( current_user: User = Depends(get_current_user), ): """ - 获取chunk的洞察内容 - + 读取RAG洞察报告(纯读库,不触发生成)。 + 返回格式: { - "insight": "对chunk内容的深度洞察分析" + "insight": "总体概述", + "behavior_pattern": "行为模式", + "key_findings": "关键发现", + "growth_trajectory": "成长轨迹", + "generated": true/false // false表示尚未生产,请调用 /generate_rag_profile } """ - api_logger.info(f"用户 {current_user.username} 请求获取宿主 {end_user_id} 的chunk洞察") - + api_logger.info(f"用户 {current_user.username} 读取宿主 {end_user_id} 的RAG洞察") + data = await memory_dashboard_service.get_chunk_insight( end_user_id=end_user_id, limit=limit, db=db, current_user=current_user ) - - api_logger.info("成功获取chunk洞察") - return success(data=data, msg="chunk洞察获取成功") + + return success(data=data, msg="获取成功") + + +@router.post("/generate_rag_profile", response_model=ApiResponse) +async def generate_rag_profile( + end_user_id: str = Query(..., description="宿主ID"), + limit: int = Query(15, description="参与生成的chunk数量上限"), + max_tags: int = Query(10, description="最大标签数量"), + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """ + 生产接口:为RAG存储模式的宿主全量重新生成完整画像并持久化到end_user表。 + 每次请求都会重新生成,覆盖已有数据。 + """ + api_logger.info(f"用户 {current_user.username} 触发RAG画像生产: end_user_id={end_user_id}") + + data = await memory_dashboard_service.generate_rag_profile( + end_user_id=end_user_id, + limit=limit, + max_tags=max_tags, + db=db, + current_user=current_user, + ) + + api_logger.info(f"RAG画像生产完成: {data}") + return success(data=data, msg="RAG画像生产完成") @router.get("/dashboard_data", response_model=ApiResponse) diff --git a/api/app/services/memory_dashboard_service.py b/api/app/services/memory_dashboard_service.py index cbe0bd80..db49c50a 100644 --- a/api/app/services/memory_dashboard_service.py +++ b/api/app/services/memory_dashboard_service.py @@ -646,67 +646,26 @@ async def get_chunk_summary_and_tags( current_user: User ) -> dict: """ - 获取chunk的总结、标签和人物形象 - 优先返回end_user表中的缓存,若无缓存则实时生成并写库 + 纯读库:从end_user表返回RAG摘要、标签和人物形象缓存。 + 无数据时返回空结构,不触发LLM生成。 """ import json from app.repositories.end_user_repository import EndUserRepository - business_logger.info(f"获取chunk摘要、标签和人物形象: end_user_id={end_user_id}, limit={limit}, 操作者: {current_user.username}") + business_logger.info(f"读取chunk摘要/标签/人物形象缓存: end_user_id={end_user_id}") - try: - repo = EndUserRepository(db) - end_user = repo.get_by_id(uuid.UUID(end_user_id)) + repo = EndUserRepository(db) + end_user = repo.get_by_id(uuid.UUID(end_user_id)) - # 读缓存:user_summary / rag_tags / rag_personas 均有值时直接返回 - if ( - end_user - and end_user.user_summary - and end_user.rag_tags - and end_user.rag_personas - ): - business_logger.info(f"命中缓存,直接返回end_user {end_user_id} 的摘要/标签/人物形象") - return { - "summary": end_user.user_summary, - "tags": json.loads(end_user.rag_tags), - "personas": json.loads(end_user.rag_personas), - } + if not end_user: + return {"summary": "", "tags": [], "personas": [], "generated": False} - # 无缓存:实时生成 - rag_content = get_rag_content(end_user_id, limit, db, current_user) - chunks = rag_content.get("contents", []) - - if not chunks: - business_logger.warning(f"未找到chunk内容: end_user_id={end_user_id}") - return {"summary": "暂无内容", "tags": [], "personas": []} - - from app.core.rag_utils import generate_chunk_summary, extract_chunk_tags, extract_chunk_persona - import asyncio - - summary, tags_with_freq, personas = await asyncio.gather( - generate_chunk_summary(chunks, max_chunks=limit, end_user_id=end_user_id), - extract_chunk_tags(chunks, max_tags=max_tags, max_chunks=limit, end_user_id=end_user_id), - extract_chunk_persona(chunks, max_personas=5, max_chunks=limit, end_user_id=end_user_id), - ) - - tags = [{"tag": tag, "frequency": freq} for tag, freq in tags_with_freq] - - # 写库缓存 - if end_user: - repo.update_rag_summary_tags( - end_user_id=end_user.id, - user_summary=summary, - rag_tags=json.dumps(tags, ensure_ascii=False), - rag_personas=json.dumps(personas, ensure_ascii=False), - ) - - result = {"summary": summary, "tags": tags, "personas": personas} - business_logger.info(f"成功获取chunk摘要、{len(tags)} 个标签和 {len(personas)} 个人物形象") - return result - - except Exception as e: - business_logger.error(f"获取chunk摘要、标签和人物形象失败: end_user_id={end_user_id} - {str(e)}") - raise + return { + "summary": end_user.user_summary or "", + "tags": json.loads(end_user.rag_tags) if end_user.rag_tags else [], + "personas": json.loads(end_user.rag_personas) if end_user.rag_personas else [], + "generated": bool(end_user.user_summary), + } async def get_chunk_insight( @@ -716,55 +675,98 @@ async def get_chunk_insight( current_user: User ) -> dict: """ - 获取chunk的洞察分析 - 优先返回end_user表中的缓存,若无缓存则实时生成并写库 + 纯读库:从end_user表返回RAG洞察缓存。 + 无数据时返回空结构,不触发LLM生成。 """ from app.repositories.end_user_repository import EndUserRepository - business_logger.info(f"获取chunk洞察: end_user_id={end_user_id}, limit={limit}, 操作者: {current_user.username}") + business_logger.info(f"读取chunk洞察缓存: end_user_id={end_user_id}") - try: - repo = EndUserRepository(db) - end_user = repo.get_by_id(uuid.UUID(end_user_id)) + repo = EndUserRepository(db) + end_user = repo.get_by_id(uuid.UUID(end_user_id)) - # 读缓存 - if end_user and end_user.memory_insight: - business_logger.info(f"命中缓存,直接返回end_user {end_user_id} 的洞察") - return {"insight": end_user.memory_insight} + if not end_user: + return {"insight": "", "behavior_pattern": "", "key_findings": "", "growth_trajectory": "", "generated": False} - # 无缓存:实时生成 - rag_content = get_rag_content(end_user_id, limit, db, current_user) - chunks = rag_content.get("contents", []) + return { + "insight": end_user.memory_insight or "", + "behavior_pattern": end_user.behavior_pattern or "", + "key_findings": end_user.key_findings or "", + "growth_trajectory": end_user.growth_trajectory or "", + "generated": bool(end_user.memory_insight), + } - if not chunks: - business_logger.warning(f"未找到chunk内容: end_user_id={end_user_id}") - return {"insight": "暂无足够数据生成洞察报告"} - from app.core.rag_utils import generate_chunk_insight_sections +async def generate_rag_profile( + end_user_id: str, + limit: int, + max_tags: int, + db: Session, + current_user: User, +) -> dict: + """ + 生产接口:为RAG存储模式的end_user全量重新生成并持久化完整画像数据。 + 每次调用都会重新生成,覆盖已有数据。 - sections = await generate_chunk_insight_sections(chunks, max_chunks=limit, end_user_id=end_user_id) - insight = sections.get("memory_insight") or sections.get("_raw", "") + 生成内容: + - user_summary / rag_tags / rag_personas + - memory_insight / behavior_pattern / key_findings / growth_trajectory + """ + import json + import asyncio + from app.repositories.end_user_repository import EndUserRepository + from app.core.rag_utils import ( + generate_chunk_summary, + extract_chunk_tags, + extract_chunk_persona, + generate_chunk_insight_sections, + ) - # 写库缓存(四维度全部入库) - if end_user: - from app.repositories.end_user_repository import EndUserRepository as _Repo - _repo = _Repo(db) - _repo.update_memory_insight( - end_user_id=end_user.id, - memory_insight=insight, - behavior_pattern=sections.get("behavior_pattern", ""), - key_findings=sections.get("key_findings", ""), - growth_trajectory=sections.get("growth_trajectory", ""), - ) - # 同时标记 storage_type 为 rag - db.query(end_user.__class__).filter( - end_user.__class__.id == end_user.id - ).update({"storage_type": "rag"}, synchronize_session=False) - db.commit() + business_logger.info(f"开始生产RAG画像: end_user_id={end_user_id}, 操作者: {current_user.username}") - business_logger.info("成功获取chunk洞察") - return {"insight": insight} + repo = EndUserRepository(db) + end_user = repo.get_by_id(uuid.UUID(end_user_id)) - except Exception as e: - business_logger.error(f"获取chunk洞察失败: end_user_id={end_user_id} - {str(e)}") - raise \ No newline at end of file + if not end_user: + raise ValueError(f"end_user {end_user_id} 不存在") + + rag_content = get_rag_content(end_user_id, limit, db, current_user) + chunks = rag_content.get("contents", []) + + if not chunks: + business_logger.warning(f"未找到chunk内容,无法生产RAG画像: end_user_id={end_user_id}") + raise ValueError("暂无chunk内容,无法生成画像") + + summary, tags_with_freq, personas, insight_sections = await asyncio.gather( + generate_chunk_summary(chunks, max_chunks=limit, end_user_id=end_user_id), + extract_chunk_tags(chunks, max_tags=max_tags, max_chunks=limit, end_user_id=end_user_id), + extract_chunk_persona(chunks, max_personas=5, max_chunks=limit, end_user_id=end_user_id), + generate_chunk_insight_sections(chunks, max_chunks=limit, end_user_id=end_user_id), + ) + + tags = [{"tag": tag, "frequency": freq} for tag, freq in tags_with_freq] + + repo.update_rag_summary_tags( + end_user_id=end_user.id, + user_summary=summary, + rag_tags=json.dumps(tags, ensure_ascii=False), + rag_personas=json.dumps(personas, ensure_ascii=False), + ) + + repo.update_memory_insight( + end_user_id=end_user.id, + memory_insight=insight_sections.get("memory_insight", ""), + behavior_pattern=insight_sections.get("behavior_pattern", ""), + key_findings=insight_sections.get("key_findings", ""), + growth_trajectory=insight_sections.get("growth_trajectory", ""), + ) + + business_logger.info(f"RAG画像生产完成: end_user_id={end_user_id}, tags={len(tags)}, personas={len(personas)}") + + return { + "end_user_id": end_user_id, + "summary_length": len(summary), + "tags_count": len(tags), + "personas_count": len(personas), + "insight_generated": bool(insight_sections.get("memory_insight")), + } \ No newline at end of file From 58aa60ca0e8b3b45c81306e11efa174d161d14d7 Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Tue, 10 Mar 2026 13:39:24 +0800 Subject: [PATCH 5/6] [add] Change to "Body - json" format and pass as parameters --- .../memory_dashboard_controller.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/api/app/controllers/memory_dashboard_controller.py b/api/app/controllers/memory_dashboard_controller.py index 1582a042..3bbb5cf7 100644 --- a/api/app/controllers/memory_dashboard_controller.py +++ b/api/app/controllers/memory_dashboard_controller.py @@ -1,4 +1,5 @@ from fastapi import APIRouter, Depends, HTTPException, status, Query +from pydantic import BaseModel, Field from sqlalchemy.orm import Session from typing import Optional from app.core.response_utils import success @@ -476,11 +477,15 @@ async def get_chunk_insight( return success(data=data, msg="获取成功") +class GenerateRagProfileRequest(BaseModel): + end_user_id: str = Field(..., description="宿主ID") + limit: int = Field(15, description="参与生成的chunk数量上限") + max_tags: int = Field(10, description="最大标签数量") + + @router.post("/generate_rag_profile", response_model=ApiResponse) async def generate_rag_profile( - end_user_id: str = Query(..., description="宿主ID"), - limit: int = Query(15, description="参与生成的chunk数量上限"), - max_tags: int = Query(10, description="最大标签数量"), + body: GenerateRagProfileRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): @@ -488,12 +493,12 @@ async def generate_rag_profile( 生产接口:为RAG存储模式的宿主全量重新生成完整画像并持久化到end_user表。 每次请求都会重新生成,覆盖已有数据。 """ - api_logger.info(f"用户 {current_user.username} 触发RAG画像生产: end_user_id={end_user_id}") + api_logger.info(f"用户 {current_user.username} 触发RAG画像生产: end_user_id={body.end_user_id}") data = await memory_dashboard_service.generate_rag_profile( - end_user_id=end_user_id, - limit=limit, - max_tags=max_tags, + end_user_id=body.end_user_id, + limit=body.limit, + max_tags=body.max_tags, db=db, current_user=current_user, ) From 10560fb34c7fcb012aa1deff71302ea748e1e6a4 Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Tue, 10 Mar 2026 13:55:53 +0800 Subject: [PATCH 6/6] [changes] Clearly stipulated, the conditions for raising an error --- api/app/core/rag_utils/chunk_insight.py | 2 +- api/app/core/rag_utils/chunk_summary.py | 18 ++++++++++++++---- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/api/app/core/rag_utils/chunk_insight.py b/api/app/core/rag_utils/chunk_insight.py index b46ce3c9..935b1449 100644 --- a/api/app/core/rag_utils/chunk_insight.py +++ b/api/app/core/rag_utils/chunk_insight.py @@ -79,7 +79,7 @@ async def _build_insight_inputs( chunks: List[str], max_chunks: int, end_user_id: Optional[str], -) -> Dict[str, str]: +) -> Dict[str, Optional[str]]: """ Derive domain_distribution, active_periods, social_connections strings to feed into the memory_insight.jinja2 template. diff --git a/api/app/core/rag_utils/chunk_summary.py b/api/app/core/rag_utils/chunk_summary.py index dd1c904e..1b0f4395 100644 --- a/api/app/core/rag_utils/chunk_summary.py +++ b/api/app/core/rag_utils/chunk_summary.py @@ -111,7 +111,9 @@ async def generate_chunk_summary( llm_client = _get_llm_client(end_user_id) - # Try structured output first; fall back to plain chat if unsupported + # Try structured output; fall back to plain chat only for LLMClientException + # (indicates the model/provider doesn't support structured output). + # All other exceptions are re-raised so config/schema errors stay visible. try: response: MemorySummaryResponse = await llm_client.response_structured( messages=messages, @@ -123,9 +125,17 @@ async def generate_chunk_summary( summary = ";".join(s.statement for s in response.statements) else: summary = "暂无内容" - except Exception: - raw = await llm_client.chat(messages=messages) - summary = raw.content.strip() if raw and raw.content else "暂无内容" + except Exception as e: + from app.core.memory.llm_tools.llm_client import LLMClientException + if isinstance(e, LLMClientException): + business_logger.warning( + f"结构化输出不可用,降级为普通对话: end_user_id={end_user_id}, reason={e}" + ) + raw = await llm_client.chat(messages=messages) + summary = raw.content.strip() if raw and raw.content else "暂无内容" + else: + business_logger.error(f"生成摘要时发生非预期异常: {e}") + raise business_logger.info( f"成功生成chunk摘要,处理了 {len(chunks_to_process)} 个片段"