From fd556f9b00b8fdbb2c0977abfa4f4d69a16de928 Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Tue, 10 Mar 2026 11:51:17 +0800 Subject: [PATCH] [add] Generate user summaries and memory insights using Jinja2 tags --- api/app/core/rag_utils/__init__.py | 3 +- api/app/core/rag_utils/chunk_insight.py | 310 +++++++++---------- api/app/core/rag_utils/chunk_summary.py | 150 ++++----- api/app/services/memory_dashboard_service.py | 22 +- 4 files changed, 242 insertions(+), 243 deletions(-) diff --git a/api/app/core/rag_utils/__init__.py b/api/app/core/rag_utils/__init__.py index d5a8ce1c..0efe1938 100644 --- a/api/app/core/rag_utils/__init__.py +++ b/api/app/core/rag_utils/__init__.py @@ -4,11 +4,12 @@ RAG chunk analysis utilities. from .chunk_summary import generate_chunk_summary from .chunk_tags import extract_chunk_tags, extract_chunk_persona -from .chunk_insight import generate_chunk_insight +from .chunk_insight import generate_chunk_insight, generate_chunk_insight_sections __all__ = [ "generate_chunk_summary", "extract_chunk_tags", "extract_chunk_persona", "generate_chunk_insight", + "generate_chunk_insight_sections", ] diff --git a/api/app/core/rag_utils/chunk_insight.py b/api/app/core/rag_utils/chunk_insight.py index 9fbdbbb2..b46ce3c9 100644 --- a/api/app/core/rag_utils/chunk_insight.py +++ b/api/app/core/rag_utils/chunk_insight.py @@ -1,24 +1,33 @@ """ -Generate insights from RAG chunks. +Generate memory insight report for RAG chunks using memory_insight.jinja2 prompt template. -This module provides functionality to analyze chunk content and generate insights using LLM. +The memory_insight.jinja2 template produces a four-section report: + 【总体概述】 → memory_insight + 【行为模式】 → behavior_pattern + 【关键发现】 → key_findings + 【成长轨迹】 → growth_trajectory + +generate_chunk_insight() returns the full raw text (stored in end_user.memory_insight). +generate_chunk_insight_sections() returns a dict with all four fields for richer storage. """ import asyncio import os +import re from collections import Counter -from typing import Any, Dict, List, Optional +from typing import Dict, List, Optional from app.core.logging_config import get_business_logger from app.core.memory.utils.llm.llm_utils import MemoryClientFactory from app.db import get_db_context -from pydantic import BaseModel, Field business_logger = get_business_logger() DEFAULT_LLM_ID = os.getenv("SELECTED_LLM_ID", "openai/qwen-plus") +# ── LLM client helper ──────────────────────────────────────────────────────── + def _get_llm_client(end_user_id: Optional[str] = None): """Get LLM client, preferring user-connected config with fallback to default.""" with get_db_context() as db: @@ -43,191 +52,156 @@ def _get_llm_client(end_user_id: Optional[str] = None): return factory.get_llm_client(DEFAULT_LLM_ID) -class ChunkInsight(BaseModel): - """Pydantic model for chunk insight.""" - insight: str = Field(..., description="对chunk内容的深度洞察分析") +# ── Domain analysis helpers (kept for building prompt inputs) ───────────────── +async def _classify_domain(chunk: str, llm_client) -> str: + """Classify a single chunk into a domain category.""" + from pydantic import BaseModel, Field -class DomainClassification(BaseModel): - """Pydantic model for domain classification.""" - domain: str = Field( - ..., - description="内容所属的领域分类", - examples=["技术", "商业", "教育", "生活", "娱乐", "健康", "其他"] - ) + class _Domain(BaseModel): + domain: str = Field(..., description="领域分类") - -async def classify_chunk_domain(chunk: str, end_user_id: Optional[str] = None) -> str: - """ - Classify a chunk into a specific domain. - - Args: - chunk: Chunk content string - - Returns: - Domain name - """ try: - llm_client = _get_llm_client(end_user_id) - - prompt = f"""请将以下文本内容归类到最合适的领域中。 - -可选领域及其关键词: -- 技术:编程、软件、硬件、算法、数据、网络、系统、开发、工程等 -- 商业:市场、销售、管理、财务、投资、创业、营销、战略等 -- 教育:学习、课程、培训、教学、知识、技能、考试、研究等 -- 生活:日常、家庭、饮食、购物、旅行、休闲、娱乐等 -- 娱乐:游戏、电影、音乐、体育、艺术、文化等 -- 健康:医疗、养生、运动、心理、保健、疾病等 -- 其他:无法归入以上类别的内容 - -文本内容: {chunk[:500]}... - -请直接返回最合适的领域名称。""" - - messages = [ - {"role": "system", "content": "你是一个专业的文本分类助手。请仔细分析文本内容,选择最合适的领域分类。"}, - {"role": "user", "content": prompt} - ] - - classification = await llm_client.response_structured( - messages=messages, - response_model=DomainClassification + prompt = ( + "请将以下文本归类到最合适的领域(技术/商业/教育/生活/娱乐/健康/其他)。\n\n" + f"文本: {chunk[:500]}\n\n直接返回领域名称。" ) - - return classification.domain if classification else "其他" - - except Exception as e: - business_logger.error(f"分类chunk领域失败: {str(e)}") + result = await llm_client.response_structured( + messages=[{"role": "user", "content": prompt}], + response_model=_Domain, + ) + return result.domain if result else "其他" + except Exception: return "其他" -async def analyze_domain_distribution(chunks: List[str], max_chunks: int = 20, end_user_id: Optional[str] = None) -> Dict[str, float]: +async def _build_insight_inputs( + chunks: List[str], + max_chunks: int, + end_user_id: Optional[str], +) -> Dict[str, str]: """ - Analyze the domain distribution of chunks. - - Args: - chunks: List of chunk content strings - max_chunks: Maximum number of chunks to analyze - - Returns: - Dictionary of domain -> percentage + Derive domain_distribution, active_periods, social_connections strings + to feed into the memory_insight.jinja2 template. """ - if not chunks: - return {} - - try: - # 限制分析的chunk数量 - chunks_to_analyze = chunks[:max_chunks] - - # 为每个chunk分类 - domain_counts = Counter() - for chunk in chunks_to_analyze: - domain = await classify_chunk_domain(chunk, end_user_id) - domain_counts[domain] += 1 - - # 计算百分比 - total = sum(domain_counts.values()) - domain_distribution = { - domain: count / total - for domain, count in domain_counts.items() - } - - # 按百分比降序排序 - return dict(sorted(domain_distribution.items(), key=lambda x: x[1], reverse=True)) - - except Exception as e: - business_logger.error(f"分析领域分布失败: {str(e)}") - return {} + llm_client = _get_llm_client(end_user_id) + chunks_sample = chunks[:max_chunks] + + # Domain distribution + domain_counts: Counter = Counter() + for chunk in chunks_sample: + domain = await _classify_domain(chunk, llm_client) + domain_counts[domain] += 1 + + total = sum(domain_counts.values()) or 1 + domain_distribution = ", ".join( + f"{d}({c / total:.0%})" for d, c in domain_counts.most_common(3) + ) + + return { + "domain_distribution": domain_distribution, + "active_periods": None, # RAG模式暂无时间维度数据 + "social_connections": None, # RAG模式暂无社交关联数据 + } -async def generate_chunk_insight(chunks: List[str], max_chunks: int = 15, end_user_id: Optional[str] = None) -> str: +# ── Section parser ──────────────────────────────────────────────────────────── + +_ZH_SECTIONS = { + "memory_insight": r"【总体概述】(.*?)(?=【|$)", + "behavior_pattern": r"【行为模式】(.*?)(?=【|$)", + "key_findings": r"【关键发现】(.*?)(?=【|$)", + "growth_trajectory": r"【成长轨迹】(.*?)(?=【|$)", +} + +_EN_SECTIONS = { + "memory_insight": r"【Overview】(.*?)(?=【|$)", + "behavior_pattern": r"【Behavior Pattern】(.*?)(?=【|$)", + "key_findings": r"【Key Findings】(.*?)(?=【|$)", + "growth_trajectory": r"【Growth Trajectory】(.*?)(?=【|$)", +} + + +def _parse_sections(text: str, language: str = "zh") -> Dict[str, str]: + """Extract the four sections from the LLM output.""" + patterns = _ZH_SECTIONS if language == "zh" else _EN_SECTIONS + result = {} + for key, pattern in patterns.items(): + match = re.search(pattern, text, re.DOTALL) + result[key] = match.group(1).strip() if match else "" + return result + + +# ── Public API ──────────────────────────────────────────────────────────────── + +async def generate_chunk_insight( + chunks: List[str], + max_chunks: int = 15, + end_user_id: Optional[str] = None, + language: str = "zh", +) -> str: """ - Generate insights from the given chunks. - - Args: - chunks: List of chunk content strings - max_chunks: Maximum number of chunks to analyze - - Returns: - A comprehensive insight report + Generate a memory insight report from RAG chunks. + + Returns the full raw report text (suitable for end_user.memory_insight). + Use generate_chunk_insight_sections() when you need all four dimensions. + """ + sections = await generate_chunk_insight_sections( + chunks=chunks, + max_chunks=max_chunks, + end_user_id=end_user_id, + language=language, + ) + return sections.get("memory_insight") or sections.get("_raw", "洞察生成失败") + + +async def generate_chunk_insight_sections( + chunks: List[str], + max_chunks: int = 15, + end_user_id: Optional[str] = None, + language: str = "zh", +) -> Dict[str, str]: + """ + Generate a four-section memory insight report from RAG chunks. + + Returns a dict with keys: + memory_insight, behavior_pattern, key_findings, growth_trajectory + (plus '_raw' containing the full LLM output for debugging) """ if not chunks: business_logger.warning("没有提供chunk内容用于生成洞察") - return "暂无足够数据生成洞察报告" - + empty = {k: "" for k in ("memory_insight", "behavior_pattern", "key_findings", "growth_trajectory")} + empty["_raw"] = "暂无足够数据生成洞察报告" + return empty + try: - # 1. 分析领域分布 - domain_dist = await analyze_domain_distribution(chunks, max_chunks=max_chunks, end_user_id=end_user_id) - - # 2. 统计基本信息 - total_chunks = len(chunks) - avg_length = sum(len(chunk) for chunk in chunks) / total_chunks if total_chunks > 0 else 0 - - # 3. 构建洞察prompt - prompt_parts = [] - - if domain_dist: - top_domains = ", ".join([f"{k}({v:.0%})" for k, v in list(domain_dist.items())[:3]]) - prompt_parts.append(f"- 内容领域分布: {top_domains}") - - prompt_parts.append(f"- 内容规模: 共{total_chunks}个知识片段,平均长度{avg_length:.0f}字") - - # 添加部分chunk内容作为参考 - sample_chunks = chunks[:5] - sample_content = "\n".join([f"示例{i+1}: {chunk[:200]}..." for i, chunk in enumerate(sample_chunks)]) - prompt_parts.append(f"\n内容示例:\n{sample_content}") - - system_prompt = """你是一位专业的知识内容分析师。你的任务是根据提供的信息,生成一段简洁、有洞察力的分析报告。 + from app.core.memory.utils.prompt.prompt_utils import render_memory_insight_prompt -重要规则: -1. 报告需要将所有要点流畅地串联成一个段落 -2. 语言风格要专业、客观,同时易于理解 -3. 不要添加任何额外的解释或标题,直接输出报告内容 -4. 基于提供的数据和示例内容进行分析,不要编造信息 -5. 重点关注内容的主题、特点和价值 -6. 报告长度控制在150-200字 + # Build template inputs from chunk analysis + inputs = await _build_insight_inputs(chunks, max_chunks, end_user_id) -例如,如果输入是: -- 内容领域分布: 技术(60%), 商业(25%), 教育(15%) -- 内容规模: 共50个知识片段,平均长度320字 -内容示例: [示例内容...] + rendered_prompt = await render_memory_insight_prompt( + domain_distribution=inputs["domain_distribution"], + active_periods=inputs["active_periods"], + social_connections=inputs["social_connections"], + language=language, + ) -你的输出应该类似: -"该知识库主要聚焦于技术领域(60%),涵盖商业(25%)和教育(15%)相关内容。共包含50个知识片段,平均每个片段约320字,内容详实。从示例来看,内容涉及[具体主题],体现了[特点],对[目标用户]具有较高的参考价值。" -""" - - user_prompt = "\n".join(prompt_parts) - - messages = [ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt} - ] - - # 调用LLM生成洞察 + messages = [{"role": "user", "content": rendered_prompt}] llm_client = _get_llm_client(end_user_id) response = await llm_client.chat(messages=messages) - - insight = response.content.strip() - business_logger.info(f"成功生成chunk洞察,分析了 {min(len(chunks), max_chunks)} 个片段") - - return insight - + raw_text = response.content.strip() if response and response.content else "" + + sections = _parse_sections(raw_text, language=language) + sections["_raw"] = raw_text + + business_logger.info( + f"成功生成chunk洞察四维度,分析了 {min(len(chunks), max_chunks)} 个片段" + ) + return sections + except Exception as e: business_logger.error(f"生成chunk洞察失败: {str(e)}") - return "洞察生成失败" - - -if __name__ == "__main__": - # 测试代码 - test_chunks = [ - "Python是一种高级编程语言,以其简洁的语法和强大的功能而闻名。它广泛应用于Web开发、数据分析、人工智能等领域。", - "机器学习算法可以从数据中自动学习模式,无需显式编程。常见的算法包括决策树、随机森林、神经网络等。", - "深度学习是机器学习的一个分支,使用多层神经网络来学习数据的层次化表示。它在图像识别、语音识别等任务中表现出色。", - "自然语言处理技术使计算机能够理解和生成人类语言。应用包括机器翻译、情感分析、文本摘要等。", - "数据科学结合了统计学、计算机科学和领域知识,用于从数据中提取有价值的洞察。" - ] - - print("开始生成chunk洞察...") - insight = asyncio.run(generate_chunk_insight(test_chunks)) - print(f"\n生成的洞察:\n{insight}") + empty = {k: "" for k in ("memory_insight", "behavior_pattern", "key_findings", "growth_trajectory")} + empty["_raw"] = "洞察生成失败" + return empty diff --git a/api/app/core/rag_utils/chunk_summary.py b/api/app/core/rag_utils/chunk_summary.py index 53df2ab3..dd1c904e 100644 --- a/api/app/core/rag_utils/chunk_summary.py +++ b/api/app/core/rag_utils/chunk_summary.py @@ -1,12 +1,10 @@ """ -Generate summary for RAG chunks. - -This module provides functionality to summarize chunk content using LLM. +Generate summary for RAG chunks using memory_summary.jinja2 prompt template. """ import asyncio import os -from typing import Any, Dict, List, Optional +from typing import List, Optional from app.core.logging_config import get_business_logger from app.core.memory.utils.llm.llm_utils import MemoryClientFactory @@ -18,6 +16,29 @@ business_logger = get_business_logger() DEFAULT_LLM_ID = os.getenv("SELECTED_LLM_ID", "openai/qwen-plus") +# ── Schema ────────────────────────────────────────────────────────────────── + +class MemorySummaryStatement(BaseModel): + """Single labelled statement extracted by memory_summary.jinja2.""" + statement: str = Field(..., description="提取的陈述内容") + label: Optional[str] = Field(None, description="陈述标签") + + +class MemorySummaryResponse(BaseModel): + """ + Structured output expected from memory_summary.jinja2. + The template asks for a JSON array of labelled statements; + we wrap it in an object so response_structured can parse it. + """ + statements: List[MemorySummaryStatement] = Field( + default_factory=list, + description="从chunk中提取的陈述列表" + ) + summary: Optional[str] = Field(None, description="整体摘要文本(可选)") + + +# ── LLM client helper ──────────────────────────────────────────────────────── + def _get_llm_client(end_user_id: Optional[str] = None): """Get LLM client, preferring user-connected config with fallback to default.""" with get_db_context() as db: @@ -42,86 +63,75 @@ def _get_llm_client(end_user_id: Optional[str] = None): return factory.get_llm_client(DEFAULT_LLM_ID) -class ChunkSummary(BaseModel): - """Pydantic model for chunk summary.""" - summary: str = Field(..., description="简洁的chunk内容摘要") +# ── Core function ───────────────────────────────────────────────────────────── - -async def generate_chunk_summary(chunks: List[str], max_chunks: int = 10, end_user_id: Optional[str] = None) -> str: +async def generate_chunk_summary( + chunks: List[str], + max_chunks: int = 10, + end_user_id: Optional[str] = None, + language: str = "zh", +) -> str: """ - Generate a summary for the given chunks. - + Generate a user summary from RAG chunks using the memory_summary.jinja2 template. + + The template extracts labelled statements from the chunks; we then join them + into a coherent summary string that can be stored in end_user.user_summary. + Args: chunks: List of chunk content strings - max_chunks: Maximum number of chunks to process (default: 10) - + max_chunks: Maximum number of chunks to process + end_user_id: Optional end-user ID for model selection + language: Output language ("zh" or "en") + Returns: - A concise summary of the chunks + Summary string (joined statements or fallback text) """ if not chunks: business_logger.warning("没有提供chunk内容用于生成摘要") return "暂无内容" - + try: - # 限制处理的chunk数量,避免token过多 + from app.core.memory.utils.prompt.prompt_utils import render_memory_summary_prompt + chunks_to_process = chunks[:max_chunks] - - # 合并chunk内容 - combined_content = "\n\n".join([f"片段{i+1}: {chunk}" for i, chunk in enumerate(chunks_to_process)]) - - # 构建prompt - system_prompt = ( - "你是一位专业的文本摘要助手。请基于提供的文本片段,生成简洁的摘要。要求:\n" - "- 摘要长度控制在100-150字;\n" - "- 提取核心信息和关键要点;\n" - "- 使用客观、清晰的语言;\n" - "- 避免冗余和重复;\n" - "- 如果内容涉及多个主题,按重要性排序呈现。" + chunk_texts = "\n\n".join( + [f"片段{i + 1}: {chunk}" for i, chunk in enumerate(chunks_to_process)] ) - - user_prompt = f"请为以下文本片段生成摘要:\n\n{combined_content}" - - messages = [ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt}, - ] - - # 调用LLM生成摘要 + + json_schema = MemorySummaryResponse.model_json_schema() + + rendered_prompt = await render_memory_summary_prompt( + chunk_texts=chunk_texts, + json_schema=json_schema, + max_words=200, + language=language, + ) + + messages = [{"role": "user", "content": rendered_prompt}] + llm_client = _get_llm_client(end_user_id) - response = await llm_client.chat(messages=messages) - - summary = response.content.strip() - business_logger.info(f"成功生成chunk摘要,处理了 {len(chunks_to_process)} 个片段") - + + # Try structured output first; fall back to plain chat if unsupported + try: + response: MemorySummaryResponse = await llm_client.response_structured( + messages=messages, + response_model=MemorySummaryResponse, + ) + if response.summary: + summary = response.summary.strip() + elif response.statements: + summary = ";".join(s.statement for s in response.statements) + else: + summary = "暂无内容" + except Exception: + raw = await llm_client.chat(messages=messages) + summary = raw.content.strip() if raw and raw.content else "暂无内容" + + business_logger.info( + f"成功生成chunk摘要,处理了 {len(chunks_to_process)} 个片段" + ) return summary - + except Exception as e: business_logger.error(f"生成chunk摘要失败: {str(e)}") return "摘要生成失败" - - -async def generate_chunk_summary_batch(chunks_list: List[List[str]]) -> List[str]: - """ - Generate summaries for multiple chunk lists in batch. - - Args: - chunks_list: List of chunk lists - - Returns: - List of summaries - """ - tasks = [generate_chunk_summary(chunks) for chunks in chunks_list] - return await asyncio.gather(*tasks) - - -if __name__ == "__main__": - # 测试代码 - test_chunks = [ - "这是第一段测试内容,讲述了关于机器学习的基础知识。", - "第二段内容介绍了深度学习的应用场景和发展历史。", - "第三段讨论了自然语言处理技术的最新进展。" - ] - - print("开始生成chunk摘要...") - summary = asyncio.run(generate_chunk_summary(test_chunks)) - print(f"\n生成的摘要:\n{summary}") diff --git a/api/app/services/memory_dashboard_service.py b/api/app/services/memory_dashboard_service.py index 6559ef2f..cbe0bd80 100644 --- a/api/app/services/memory_dashboard_service.py +++ b/api/app/services/memory_dashboard_service.py @@ -740,13 +740,27 @@ async def get_chunk_insight( business_logger.warning(f"未找到chunk内容: end_user_id={end_user_id}") return {"insight": "暂无足够数据生成洞察报告"} - from app.core.rag_utils import generate_chunk_insight + from app.core.rag_utils import generate_chunk_insight_sections - insight = await generate_chunk_insight(chunks, max_chunks=limit, end_user_id=end_user_id) + sections = await generate_chunk_insight_sections(chunks, max_chunks=limit, end_user_id=end_user_id) + insight = sections.get("memory_insight") or sections.get("_raw", "") - # 写库缓存 + # 写库缓存(四维度全部入库) if end_user: - repo.update_rag_insight(end_user_id=end_user.id, memory_insight=insight) + from app.repositories.end_user_repository import EndUserRepository as _Repo + _repo = _Repo(db) + _repo.update_memory_insight( + end_user_id=end_user.id, + memory_insight=insight, + behavior_pattern=sections.get("behavior_pattern", ""), + key_findings=sections.get("key_findings", ""), + growth_trajectory=sections.get("growth_trajectory", ""), + ) + # 同时标记 storage_type 为 rag + db.query(end_user.__class__).filter( + end_user.__class__.id == end_user.id + ).update({"storage_type": "rag"}, synchronize_session=False) + db.commit() business_logger.info("成功获取chunk洞察") return {"insight": insight}