[add] Generate user summaries and memory insights using Jinja2 tags

This commit is contained in:
lanceyq
2026-03-10 11:51:17 +08:00
parent e2f5fa87b1
commit fd556f9b00
4 changed files with 242 additions and 243 deletions

View File

@@ -4,11 +4,12 @@ RAG chunk analysis utilities.
from .chunk_summary import generate_chunk_summary
from .chunk_tags import extract_chunk_tags, extract_chunk_persona
from .chunk_insight import generate_chunk_insight
from .chunk_insight import generate_chunk_insight, generate_chunk_insight_sections
__all__ = [
"generate_chunk_summary",
"extract_chunk_tags",
"extract_chunk_persona",
"generate_chunk_insight",
"generate_chunk_insight_sections",
]

View File

@@ -1,24 +1,33 @@
"""
Generate insights from RAG chunks.
Generate memory insight report for RAG chunks using memory_insight.jinja2 prompt template.
This module provides functionality to analyze chunk content and generate insights using LLM.
The memory_insight.jinja2 template produces a four-section report:
【总体概述】 → memory_insight
【行为模式】 → behavior_pattern
【关键发现】 → key_findings
【成长轨迹】 → growth_trajectory
generate_chunk_insight() returns the full raw text (stored in end_user.memory_insight).
generate_chunk_insight_sections() returns a dict with all four fields for richer storage.
"""
import asyncio
import os
import re
from collections import Counter
from typing import Any, Dict, List, Optional
from typing import Dict, List, Optional
from app.core.logging_config import get_business_logger
from app.core.memory.utils.llm.llm_utils import MemoryClientFactory
from app.db import get_db_context
from pydantic import BaseModel, Field
business_logger = get_business_logger()
DEFAULT_LLM_ID = os.getenv("SELECTED_LLM_ID", "openai/qwen-plus")
# ── LLM client helper ────────────────────────────────────────────────────────
def _get_llm_client(end_user_id: Optional[str] = None):
"""Get LLM client, preferring user-connected config with fallback to default."""
with get_db_context() as db:
@@ -43,191 +52,156 @@ def _get_llm_client(end_user_id: Optional[str] = None):
return factory.get_llm_client(DEFAULT_LLM_ID)
class ChunkInsight(BaseModel):
"""Pydantic model for chunk insight."""
insight: str = Field(..., description="对chunk内容的深度洞察分析")
# ── Domain analysis helpers (kept for building prompt inputs) ─────────────────
async def _classify_domain(chunk: str, llm_client) -> str:
"""Classify a single chunk into a domain category."""
from pydantic import BaseModel, Field
class DomainClassification(BaseModel):
"""Pydantic model for domain classification."""
domain: str = Field(
...,
description="内容所属的领域分类",
examples=["技术", "商业", "教育", "生活", "娱乐", "健康", "其他"]
)
class _Domain(BaseModel):
domain: str = Field(..., description="领域分类")
async def classify_chunk_domain(chunk: str, end_user_id: Optional[str] = None) -> str:
"""
Classify a chunk into a specific domain.
Args:
chunk: Chunk content string
Returns:
Domain name
"""
try:
llm_client = _get_llm_client(end_user_id)
prompt = f"""请将以下文本内容归类到最合适的领域中。
可选领域及其关键词:
- 技术:编程、软件、硬件、算法、数据、网络、系统、开发、工程等
- 商业:市场、销售、管理、财务、投资、创业、营销、战略等
- 教育:学习、课程、培训、教学、知识、技能、考试、研究等
- 生活:日常、家庭、饮食、购物、旅行、休闲、娱乐等
- 娱乐:游戏、电影、音乐、体育、艺术、文化等
- 健康:医疗、养生、运动、心理、保健、疾病等
- 其他:无法归入以上类别的内容
文本内容: {chunk[:500]}...
请直接返回最合适的领域名称。"""
messages = [
{"role": "system", "content": "你是一个专业的文本分类助手。请仔细分析文本内容,选择最合适的领域分类。"},
{"role": "user", "content": prompt}
]
classification = await llm_client.response_structured(
messages=messages,
response_model=DomainClassification
prompt = (
"请将以下文本归类到最合适的领域(技术/商业/教育/生活/娱乐/健康/其他)。\n\n"
f"文本: {chunk[:500]}\n\n直接返回领域名称。"
)
return classification.domain if classification else "其他"
except Exception as e:
business_logger.error(f"分类chunk领域失败: {str(e)}")
result = await llm_client.response_structured(
messages=[{"role": "user", "content": prompt}],
response_model=_Domain,
)
return result.domain if result else "其他"
except Exception:
return "其他"
async def analyze_domain_distribution(chunks: List[str], max_chunks: int = 20, end_user_id: Optional[str] = None) -> Dict[str, float]:
async def _build_insight_inputs(
chunks: List[str],
max_chunks: int,
end_user_id: Optional[str],
) -> Dict[str, str]:
"""
Analyze the domain distribution of chunks.
Args:
chunks: List of chunk content strings
max_chunks: Maximum number of chunks to analyze
Returns:
Dictionary of domain -> percentage
Derive domain_distribution, active_periods, social_connections strings
to feed into the memory_insight.jinja2 template.
"""
if not chunks:
return {}
try:
# 限制分析的chunk数量
chunks_to_analyze = chunks[:max_chunks]
# 为每个chunk分类
domain_counts = Counter()
for chunk in chunks_to_analyze:
domain = await classify_chunk_domain(chunk, end_user_id)
domain_counts[domain] += 1
# 计算百分比
total = sum(domain_counts.values())
domain_distribution = {
domain: count / total
for domain, count in domain_counts.items()
}
# 按百分比降序排序
return dict(sorted(domain_distribution.items(), key=lambda x: x[1], reverse=True))
except Exception as e:
business_logger.error(f"分析领域分布失败: {str(e)}")
return {}
llm_client = _get_llm_client(end_user_id)
chunks_sample = chunks[:max_chunks]
# Domain distribution
domain_counts: Counter = Counter()
for chunk in chunks_sample:
domain = await _classify_domain(chunk, llm_client)
domain_counts[domain] += 1
total = sum(domain_counts.values()) or 1
domain_distribution = ", ".join(
f"{d}({c / total:.0%})" for d, c in domain_counts.most_common(3)
)
return {
"domain_distribution": domain_distribution,
"active_periods": None, # RAG模式暂无时间维度数据
"social_connections": None, # RAG模式暂无社交关联数据
}
async def generate_chunk_insight(chunks: List[str], max_chunks: int = 15, end_user_id: Optional[str] = None) -> str:
# ── Section parser ────────────────────────────────────────────────────────────
_ZH_SECTIONS = {
"memory_insight": r"【总体概述】(.*?)(?=【|$)",
"behavior_pattern": r"【行为模式】(.*?)(?=【|$)",
"key_findings": r"【关键发现】(.*?)(?=【|$)",
"growth_trajectory": r"【成长轨迹】(.*?)(?=【|$)",
}
_EN_SECTIONS = {
"memory_insight": r"【Overview】(.*?)(?=【|$)",
"behavior_pattern": r"【Behavior Pattern】(.*?)(?=【|$)",
"key_findings": r"【Key Findings】(.*?)(?=【|$)",
"growth_trajectory": r"【Growth Trajectory】(.*?)(?=【|$)",
}
def _parse_sections(text: str, language: str = "zh") -> Dict[str, str]:
"""Extract the four sections from the LLM output."""
patterns = _ZH_SECTIONS if language == "zh" else _EN_SECTIONS
result = {}
for key, pattern in patterns.items():
match = re.search(pattern, text, re.DOTALL)
result[key] = match.group(1).strip() if match else ""
return result
# ── Public API ────────────────────────────────────────────────────────────────
async def generate_chunk_insight(
chunks: List[str],
max_chunks: int = 15,
end_user_id: Optional[str] = None,
language: str = "zh",
) -> str:
"""
Generate insights from the given chunks.
Args:
chunks: List of chunk content strings
max_chunks: Maximum number of chunks to analyze
Returns:
A comprehensive insight report
Generate a memory insight report from RAG chunks.
Returns the full raw report text (suitable for end_user.memory_insight).
Use generate_chunk_insight_sections() when you need all four dimensions.
"""
sections = await generate_chunk_insight_sections(
chunks=chunks,
max_chunks=max_chunks,
end_user_id=end_user_id,
language=language,
)
return sections.get("memory_insight") or sections.get("_raw", "洞察生成失败")
async def generate_chunk_insight_sections(
chunks: List[str],
max_chunks: int = 15,
end_user_id: Optional[str] = None,
language: str = "zh",
) -> Dict[str, str]:
"""
Generate a four-section memory insight report from RAG chunks.
Returns a dict with keys:
memory_insight, behavior_pattern, key_findings, growth_trajectory
(plus '_raw' containing the full LLM output for debugging)
"""
if not chunks:
business_logger.warning("没有提供chunk内容用于生成洞察")
return "暂无足够数据生成洞察报告"
empty = {k: "" for k in ("memory_insight", "behavior_pattern", "key_findings", "growth_trajectory")}
empty["_raw"] = "暂无足够数据生成洞察报告"
return empty
try:
# 1. 分析领域分布
domain_dist = await analyze_domain_distribution(chunks, max_chunks=max_chunks, end_user_id=end_user_id)
# 2. 统计基本信息
total_chunks = len(chunks)
avg_length = sum(len(chunk) for chunk in chunks) / total_chunks if total_chunks > 0 else 0
# 3. 构建洞察prompt
prompt_parts = []
if domain_dist:
top_domains = ", ".join([f"{k}({v:.0%})" for k, v in list(domain_dist.items())[:3]])
prompt_parts.append(f"- 内容领域分布: {top_domains}")
prompt_parts.append(f"- 内容规模: 共{total_chunks}个知识片段,平均长度{avg_length:.0f}")
# 添加部分chunk内容作为参考
sample_chunks = chunks[:5]
sample_content = "\n".join([f"示例{i+1}: {chunk[:200]}..." for i, chunk in enumerate(sample_chunks)])
prompt_parts.append(f"\n内容示例:\n{sample_content}")
system_prompt = """你是一位专业的知识内容分析师。你的任务是根据提供的信息,生成一段简洁、有洞察力的分析报告。
from app.core.memory.utils.prompt.prompt_utils import render_memory_insight_prompt
重要规则:
1. 报告需要将所有要点流畅地串联成一个段落
2. 语言风格要专业、客观,同时易于理解
3. 不要添加任何额外的解释或标题,直接输出报告内容
4. 基于提供的数据和示例内容进行分析,不要编造信息
5. 重点关注内容的主题、特点和价值
6. 报告长度控制在150-200字
# Build template inputs from chunk analysis
inputs = await _build_insight_inputs(chunks, max_chunks, end_user_id)
例如,如果输入是:
- 内容领域分布: 技术(60%), 商业(25%), 教育(15%)
- 内容规模: 共50个知识片段平均长度320字
内容示例: [示例内容...]
rendered_prompt = await render_memory_insight_prompt(
domain_distribution=inputs["domain_distribution"],
active_periods=inputs["active_periods"],
social_connections=inputs["social_connections"],
language=language,
)
你的输出应该类似:
"该知识库主要聚焦于技术领域(60%),涵盖商业(25%)和教育(15%)相关内容。共包含50个知识片段平均每个片段约320字内容详实。从示例来看内容涉及[具体主题],体现了[特点],对[目标用户]具有较高的参考价值。"
"""
user_prompt = "\n".join(prompt_parts)
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
]
# 调用LLM生成洞察
messages = [{"role": "user", "content": rendered_prompt}]
llm_client = _get_llm_client(end_user_id)
response = await llm_client.chat(messages=messages)
insight = response.content.strip()
business_logger.info(f"成功生成chunk洞察分析了 {min(len(chunks), max_chunks)} 个片段")
return insight
raw_text = response.content.strip() if response and response.content else ""
sections = _parse_sections(raw_text, language=language)
sections["_raw"] = raw_text
business_logger.info(
f"成功生成chunk洞察四维度分析了 {min(len(chunks), max_chunks)} 个片段"
)
return sections
except Exception as e:
business_logger.error(f"生成chunk洞察失败: {str(e)}")
return "洞察生成失败"
if __name__ == "__main__":
# 测试代码
test_chunks = [
"Python是一种高级编程语言以其简洁的语法和强大的功能而闻名。它广泛应用于Web开发、数据分析、人工智能等领域。",
"机器学习算法可以从数据中自动学习模式,无需显式编程。常见的算法包括决策树、随机森林、神经网络等。",
"深度学习是机器学习的一个分支,使用多层神经网络来学习数据的层次化表示。它在图像识别、语音识别等任务中表现出色。",
"自然语言处理技术使计算机能够理解和生成人类语言。应用包括机器翻译、情感分析、文本摘要等。",
"数据科学结合了统计学、计算机科学和领域知识,用于从数据中提取有价值的洞察。"
]
print("开始生成chunk洞察...")
insight = asyncio.run(generate_chunk_insight(test_chunks))
print(f"\n生成的洞察:\n{insight}")
empty = {k: "" for k in ("memory_insight", "behavior_pattern", "key_findings", "growth_trajectory")}
empty["_raw"] = "洞察生成失败"
return empty

View File

@@ -1,12 +1,10 @@
"""
Generate summary for RAG chunks.
This module provides functionality to summarize chunk content using LLM.
Generate summary for RAG chunks using memory_summary.jinja2 prompt template.
"""
import asyncio
import os
from typing import Any, Dict, List, Optional
from typing import List, Optional
from app.core.logging_config import get_business_logger
from app.core.memory.utils.llm.llm_utils import MemoryClientFactory
@@ -18,6 +16,29 @@ business_logger = get_business_logger()
DEFAULT_LLM_ID = os.getenv("SELECTED_LLM_ID", "openai/qwen-plus")
# ── Schema ──────────────────────────────────────────────────────────────────
class MemorySummaryStatement(BaseModel):
"""Single labelled statement extracted by memory_summary.jinja2."""
statement: str = Field(..., description="提取的陈述内容")
label: Optional[str] = Field(None, description="陈述标签")
class MemorySummaryResponse(BaseModel):
"""
Structured output expected from memory_summary.jinja2.
The template asks for a JSON array of labelled statements;
we wrap it in an object so response_structured can parse it.
"""
statements: List[MemorySummaryStatement] = Field(
default_factory=list,
description="从chunk中提取的陈述列表"
)
summary: Optional[str] = Field(None, description="整体摘要文本(可选)")
# ── LLM client helper ────────────────────────────────────────────────────────
def _get_llm_client(end_user_id: Optional[str] = None):
"""Get LLM client, preferring user-connected config with fallback to default."""
with get_db_context() as db:
@@ -42,86 +63,75 @@ def _get_llm_client(end_user_id: Optional[str] = None):
return factory.get_llm_client(DEFAULT_LLM_ID)
class ChunkSummary(BaseModel):
"""Pydantic model for chunk summary."""
summary: str = Field(..., description="简洁的chunk内容摘要")
# ── Core function ─────────────────────────────────────────────────────────────
async def generate_chunk_summary(chunks: List[str], max_chunks: int = 10, end_user_id: Optional[str] = None) -> str:
async def generate_chunk_summary(
chunks: List[str],
max_chunks: int = 10,
end_user_id: Optional[str] = None,
language: str = "zh",
) -> str:
"""
Generate a summary for the given chunks.
Generate a user summary from RAG chunks using the memory_summary.jinja2 template.
The template extracts labelled statements from the chunks; we then join them
into a coherent summary string that can be stored in end_user.user_summary.
Args:
chunks: List of chunk content strings
max_chunks: Maximum number of chunks to process (default: 10)
max_chunks: Maximum number of chunks to process
end_user_id: Optional end-user ID for model selection
language: Output language ("zh" or "en")
Returns:
A concise summary of the chunks
Summary string (joined statements or fallback text)
"""
if not chunks:
business_logger.warning("没有提供chunk内容用于生成摘要")
return "暂无内容"
try:
# 限制处理的chunk数量避免token过多
from app.core.memory.utils.prompt.prompt_utils import render_memory_summary_prompt
chunks_to_process = chunks[:max_chunks]
# 合并chunk内容
combined_content = "\n\n".join([f"片段{i+1}: {chunk}" for i, chunk in enumerate(chunks_to_process)])
# 构建prompt
system_prompt = (
"你是一位专业的文本摘要助手。请基于提供的文本片段,生成简洁的摘要。要求:\n"
"- 摘要长度控制在100-150字\n"
"- 提取核心信息和关键要点;\n"
"- 使用客观、清晰的语言;\n"
"- 避免冗余和重复;\n"
"- 如果内容涉及多个主题,按重要性排序呈现。"
chunk_texts = "\n\n".join(
[f"片段{i + 1}: {chunk}" for i, chunk in enumerate(chunks_to_process)]
)
user_prompt = f"请为以下文本片段生成摘要:\n\n{combined_content}"
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
]
# 调用LLM生成摘要
json_schema = MemorySummaryResponse.model_json_schema()
rendered_prompt = await render_memory_summary_prompt(
chunk_texts=chunk_texts,
json_schema=json_schema,
max_words=200,
language=language,
)
messages = [{"role": "user", "content": rendered_prompt}]
llm_client = _get_llm_client(end_user_id)
response = await llm_client.chat(messages=messages)
summary = response.content.strip()
business_logger.info(f"成功生成chunk摘要处理了 {len(chunks_to_process)} 个片段")
# Try structured output first; fall back to plain chat if unsupported
try:
response: MemorySummaryResponse = await llm_client.response_structured(
messages=messages,
response_model=MemorySummaryResponse,
)
if response.summary:
summary = response.summary.strip()
elif response.statements:
summary = "".join(s.statement for s in response.statements)
else:
summary = "暂无内容"
except Exception:
raw = await llm_client.chat(messages=messages)
summary = raw.content.strip() if raw and raw.content else "暂无内容"
business_logger.info(
f"成功生成chunk摘要处理了 {len(chunks_to_process)} 个片段"
)
return summary
except Exception as e:
business_logger.error(f"生成chunk摘要失败: {str(e)}")
return "摘要生成失败"
async def generate_chunk_summary_batch(chunks_list: List[List[str]]) -> List[str]:
"""
Generate summaries for multiple chunk lists in batch.
Args:
chunks_list: List of chunk lists
Returns:
List of summaries
"""
tasks = [generate_chunk_summary(chunks) for chunks in chunks_list]
return await asyncio.gather(*tasks)
if __name__ == "__main__":
# 测试代码
test_chunks = [
"这是第一段测试内容,讲述了关于机器学习的基础知识。",
"第二段内容介绍了深度学习的应用场景和发展历史。",
"第三段讨论了自然语言处理技术的最新进展。"
]
print("开始生成chunk摘要...")
summary = asyncio.run(generate_chunk_summary(test_chunks))
print(f"\n生成的摘要:\n{summary}")

View File

@@ -740,13 +740,27 @@ async def get_chunk_insight(
business_logger.warning(f"未找到chunk内容: end_user_id={end_user_id}")
return {"insight": "暂无足够数据生成洞察报告"}
from app.core.rag_utils import generate_chunk_insight
from app.core.rag_utils import generate_chunk_insight_sections
insight = await generate_chunk_insight(chunks, max_chunks=limit, end_user_id=end_user_id)
sections = await generate_chunk_insight_sections(chunks, max_chunks=limit, end_user_id=end_user_id)
insight = sections.get("memory_insight") or sections.get("_raw", "")
# 写库缓存
# 写库缓存(四维度全部入库)
if end_user:
repo.update_rag_insight(end_user_id=end_user.id, memory_insight=insight)
from app.repositories.end_user_repository import EndUserRepository as _Repo
_repo = _Repo(db)
_repo.update_memory_insight(
end_user_id=end_user.id,
memory_insight=insight,
behavior_pattern=sections.get("behavior_pattern", ""),
key_findings=sections.get("key_findings", ""),
growth_trajectory=sections.get("growth_trajectory", ""),
)
# 同时标记 storage_type 为 rag
db.query(end_user.__class__).filter(
end_user.__class__.id == end_user.id
).update({"storage_type": "rag"}, synchronize_session=False)
db.commit()
business_logger.info("成功获取chunk洞察")
return {"insight": insight}