Merge pull request #524 from SuanmoSuanyangTechnology/feature/details-memory

Feature/details memory
This commit is contained in:
Ke Sun
2026-03-10 14:42:18 +08:00
committed by GitHub
8 changed files with 567 additions and 375 deletions

View File

@@ -1,4 +1,5 @@
from fastapi import APIRouter, Depends, HTTPException, status, Query from fastapi import APIRouter, Depends, HTTPException, status, Query
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from typing import Optional from typing import Optional
from app.core.response_utils import success from app.core.response_utils import success
@@ -422,26 +423,18 @@ async def get_chunk_summary_tag(
current_user: User = Depends(get_current_user), current_user: User = Depends(get_current_user),
): ):
""" """
获取chunk总结、提取的标签和人物形象 读取RAG摘要、标签和人物形象纯读库不触发生成
返回格式: 返回格式:
{ {
"summary": "chunk内容的总结", "summary": "用户摘要",
"tags": [ "tags": [{"tag": "标签1", "frequency": 5}, ...],
{"tag": "标签1", "frequency": 5}, "personas": ["产品设计师", ...],
{"tag": "标签2", "frequency": 3}, "generated": true/false // false表示尚未生产请调用 /generate_rag_profile
...
],
"personas": [
"产品设计师",
"旅行爱好者",
"摄影发烧友",
...
]
} }
""" """
api_logger.info(f"用户 {current_user.username} 请求获取宿主 {end_user_id}chunk摘要标签人物形象") api_logger.info(f"用户 {current_user.username} 取宿主 {end_user_id}RAG摘要/标签/人物形象")
data = await memory_dashboard_service.get_chunk_summary_and_tags( data = await memory_dashboard_service.get_chunk_summary_and_tags(
end_user_id=end_user_id, end_user_id=end_user_id,
limit=limit, limit=limit,
@@ -449,9 +442,8 @@ async def get_chunk_summary_tag(
db=db, db=db,
current_user=current_user current_user=current_user
) )
api_logger.info(f"成功获取chunk摘要、{len(data.get('tags', []))} 个标签和 {len(data.get('personas', []))} 个人物形象") return success(data=data, msg="获取成功")
return success(data=data, msg="chunk摘要、标签和人物形象获取成功")
@router.get("/chunk_insight", response_model=ApiResponse) @router.get("/chunk_insight", response_model=ApiResponse)
@@ -462,24 +454,57 @@ async def get_chunk_insight(
current_user: User = Depends(get_current_user), current_user: User = Depends(get_current_user),
): ):
""" """
获取chunk的洞察内容 读取RAG洞察报告纯读库不触发生成
返回格式: 返回格式:
{ {
"insight": "对chunk内容的深度洞察分析" "insight": "总体概述",
"behavior_pattern": "行为模式",
"key_findings": "关键发现",
"growth_trajectory": "成长轨迹",
"generated": true/false // false表示尚未生产请调用 /generate_rag_profile
} }
""" """
api_logger.info(f"用户 {current_user.username} 请求获取宿主 {end_user_id}chunk洞察") api_logger.info(f"用户 {current_user.username} 取宿主 {end_user_id}RAG洞察")
data = await memory_dashboard_service.get_chunk_insight( data = await memory_dashboard_service.get_chunk_insight(
end_user_id=end_user_id, end_user_id=end_user_id,
limit=limit, limit=limit,
db=db, db=db,
current_user=current_user current_user=current_user
) )
api_logger.info("成功获取chunk洞察") return success(data=data, msg="获取成功")
return success(data=data, msg="chunk洞察获取成功")
class GenerateRagProfileRequest(BaseModel):
end_user_id: str = Field(..., description="宿主ID")
limit: int = Field(15, description="参与生成的chunk数量上限")
max_tags: int = Field(10, description="最大标签数量")
@router.post("/generate_rag_profile", response_model=ApiResponse)
async def generate_rag_profile(
body: GenerateRagProfileRequest,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
生产接口为RAG存储模式的宿主全量重新生成完整画像并持久化到end_user表。
每次请求都会重新生成,覆盖已有数据。
"""
api_logger.info(f"用户 {current_user.username} 触发RAG画像生产: end_user_id={body.end_user_id}")
data = await memory_dashboard_service.generate_rag_profile(
end_user_id=body.end_user_id,
limit=body.limit,
max_tags=body.max_tags,
db=db,
current_user=current_user,
)
api_logger.info(f"RAG画像生产完成: {data}")
return success(data=data, msg="RAG画像生产完成")
@router.get("/dashboard_data", response_model=ApiResponse) @router.get("/dashboard_data", response_model=ApiResponse)

View File

@@ -4,11 +4,12 @@ RAG chunk analysis utilities.
from .chunk_summary import generate_chunk_summary from .chunk_summary import generate_chunk_summary
from .chunk_tags import extract_chunk_tags, extract_chunk_persona from .chunk_tags import extract_chunk_tags, extract_chunk_persona
from .chunk_insight import generate_chunk_insight from .chunk_insight import generate_chunk_insight, generate_chunk_insight_sections
__all__ = [ __all__ = [
"generate_chunk_summary", "generate_chunk_summary",
"extract_chunk_tags", "extract_chunk_tags",
"extract_chunk_persona", "extract_chunk_persona",
"generate_chunk_insight", "generate_chunk_insight",
"generate_chunk_insight_sections",
] ]

View File

@@ -1,213 +1,207 @@
""" """
Generate insights from RAG chunks. Generate memory insight report for RAG chunks using memory_insight.jinja2 prompt template.
This module provides functionality to analyze chunk content and generate insights using LLM. The memory_insight.jinja2 template produces a four-section report:
【总体概述】 → memory_insight
【行为模式】 → behavior_pattern
【关键发现】 → key_findings
【成长轨迹】 → growth_trajectory
generate_chunk_insight() returns the full raw text (stored in end_user.memory_insight).
generate_chunk_insight_sections() returns a dict with all four fields for richer storage.
""" """
import asyncio import asyncio
import os
import re
from collections import Counter from collections import Counter
from typing import Any, Dict, List from typing import Dict, List, Optional
from app.core.logging_config import get_business_logger from app.core.logging_config import get_business_logger
from app.core.memory.utils.llm.llm_utils import MemoryClientFactory from app.core.memory.utils.llm.llm_utils import MemoryClientFactory
from app.db import get_db_context from app.db import get_db_context
from pydantic import BaseModel, Field
business_logger = get_business_logger() business_logger = get_business_logger()
DEFAULT_LLM_ID = os.getenv("SELECTED_LLM_ID", "openai/qwen-plus")
def _get_llm_client():
"""Get LLM client using db context.""" # ── LLM client helper ────────────────────────────────────────────────────────
def _get_llm_client(end_user_id: Optional[str] = None):
"""Get LLM client, preferring user-connected config with fallback to default."""
with get_db_context() as db: with get_db_context() as db:
try:
if end_user_id:
from app.services.memory_agent_service import get_end_user_connected_config
from app.services.memory_config_service import MemoryConfigService
connected_config = get_end_user_connected_config(end_user_id, db)
config_id = connected_config.get("memory_config_id")
workspace_id = connected_config.get("workspace_id")
if config_id or workspace_id:
config_service = MemoryConfigService(db)
memory_config = config_service.load_memory_config(
config_id=config_id,
workspace_id=workspace_id
)
factory = MemoryClientFactory(db)
return factory.get_llm_client(memory_config.llm_model_id)
except Exception as e:
business_logger.warning(f"Failed to get user connected config, using default LLM: {e}")
factory = MemoryClientFactory(db) factory = MemoryClientFactory(db)
return factory.get_llm_client(None) # Uses default LLM return factory.get_llm_client(DEFAULT_LLM_ID)
class ChunkInsight(BaseModel): # ── Domain analysis helpers (kept for building prompt inputs) ─────────────────
"""Pydantic model for chunk insight."""
insight: str = Field(..., description="对chunk内容的深度洞察分析")
async def _classify_domain(chunk: str, llm_client) -> str:
"""Classify a single chunk into a domain category."""
from pydantic import BaseModel, Field
class DomainClassification(BaseModel): class _Domain(BaseModel):
"""Pydantic model for domain classification.""" domain: str = Field(..., description="领域分类")
domain: str = Field(
...,
description="内容所属的领域分类",
examples=["技术", "商业", "教育", "生活", "娱乐", "健康", "其他"]
)
async def classify_chunk_domain(chunk: str) -> str:
"""
Classify a chunk into a specific domain.
Args:
chunk: Chunk content string
Returns:
Domain name
"""
try: try:
llm_client = _get_llm_client() prompt = (
"请将以下文本归类到最合适的领域(技术/商业/教育/生活/娱乐/健康/其他)。\n\n"
prompt = f"""请将以下文本内容归类到最合适的领域中。 f"文本: {chunk[:500]}\n\n直接返回领域名称。"
可选领域及其关键词:
- 技术:编程、软件、硬件、算法、数据、网络、系统、开发、工程等
- 商业:市场、销售、管理、财务、投资、创业、营销、战略等
- 教育:学习、课程、培训、教学、知识、技能、考试、研究等
- 生活:日常、家庭、饮食、购物、旅行、休闲、娱乐等
- 娱乐:游戏、电影、音乐、体育、艺术、文化等
- 健康:医疗、养生、运动、心理、保健、疾病等
- 其他:无法归入以上类别的内容
文本内容: {chunk[:500]}...
请直接返回最合适的领域名称。"""
messages = [
{"role": "system", "content": "你是一个专业的文本分类助手。请仔细分析文本内容,选择最合适的领域分类。"},
{"role": "user", "content": prompt}
]
classification = await llm_client.response_structured(
messages=messages,
response_model=DomainClassification
) )
result = await llm_client.response_structured(
return classification.domain if classification else "其他" messages=[{"role": "user", "content": prompt}],
response_model=_Domain,
except Exception as e: )
business_logger.error(f"分类chunk领域失败: {str(e)}") return result.domain if result else "其他"
except Exception:
return "其他" return "其他"
async def analyze_domain_distribution(chunks: List[str], max_chunks: int = 20) -> Dict[str, float]: async def _build_insight_inputs(
chunks: List[str],
max_chunks: int,
end_user_id: Optional[str],
) -> Dict[str, Optional[str]]:
""" """
Analyze the domain distribution of chunks. Derive domain_distribution, active_periods, social_connections strings
to feed into the memory_insight.jinja2 template.
Args:
chunks: List of chunk content strings
max_chunks: Maximum number of chunks to analyze
Returns:
Dictionary of domain -> percentage
""" """
if not chunks: llm_client = _get_llm_client(end_user_id)
return {} chunks_sample = chunks[:max_chunks]
try: # Domain distribution
# 限制分析的chunk数量 domain_counts: Counter = Counter()
chunks_to_analyze = chunks[:max_chunks] for chunk in chunks_sample:
domain = await _classify_domain(chunk, llm_client)
# 为每个chunk分类 domain_counts[domain] += 1
domain_counts = Counter()
for chunk in chunks_to_analyze: total = sum(domain_counts.values()) or 1
domain = await classify_chunk_domain(chunk) domain_distribution = ", ".join(
domain_counts[domain] += 1 f"{d}({c / total:.0%})" for d, c in domain_counts.most_common(3)
)
# 计算百分比
total = sum(domain_counts.values()) return {
domain_distribution = { "domain_distribution": domain_distribution,
domain: count / total "active_periods": None, # RAG模式暂无时间维度数据
for domain, count in domain_counts.items() "social_connections": None, # RAG模式暂无社交关联数据
} }
# 按百分比降序排序
return dict(sorted(domain_distribution.items(), key=lambda x: x[1], reverse=True))
except Exception as e:
business_logger.error(f"分析领域分布失败: {str(e)}")
return {}
async def generate_chunk_insight(chunks: List[str], max_chunks: int = 15) -> str: # ── Section parser ────────────────────────────────────────────────────────────
_ZH_SECTIONS = {
"memory_insight": r"【总体概述】(.*?)(?=【|$)",
"behavior_pattern": r"【行为模式】(.*?)(?=【|$)",
"key_findings": r"【关键发现】(.*?)(?=【|$)",
"growth_trajectory": r"【成长轨迹】(.*?)(?=【|$)",
}
_EN_SECTIONS = {
"memory_insight": r"【Overview】(.*?)(?=【|$)",
"behavior_pattern": r"【Behavior Pattern】(.*?)(?=【|$)",
"key_findings": r"【Key Findings】(.*?)(?=【|$)",
"growth_trajectory": r"【Growth Trajectory】(.*?)(?=【|$)",
}
def _parse_sections(text: str, language: str = "zh") -> Dict[str, str]:
"""Extract the four sections from the LLM output."""
patterns = _ZH_SECTIONS if language == "zh" else _EN_SECTIONS
result = {}
for key, pattern in patterns.items():
match = re.search(pattern, text, re.DOTALL)
result[key] = match.group(1).strip() if match else ""
return result
# ── Public API ────────────────────────────────────────────────────────────────
async def generate_chunk_insight(
chunks: List[str],
max_chunks: int = 15,
end_user_id: Optional[str] = None,
language: str = "zh",
) -> str:
""" """
Generate insights from the given chunks. Generate a memory insight report from RAG chunks.
Args: Returns the full raw report text (suitable for end_user.memory_insight).
chunks: List of chunk content strings Use generate_chunk_insight_sections() when you need all four dimensions.
max_chunks: Maximum number of chunks to analyze """
sections = await generate_chunk_insight_sections(
Returns: chunks=chunks,
A comprehensive insight report max_chunks=max_chunks,
end_user_id=end_user_id,
language=language,
)
return sections.get("memory_insight") or sections.get("_raw", "洞察生成失败")
async def generate_chunk_insight_sections(
chunks: List[str],
max_chunks: int = 15,
end_user_id: Optional[str] = None,
language: str = "zh",
) -> Dict[str, str]:
"""
Generate a four-section memory insight report from RAG chunks.
Returns a dict with keys:
memory_insight, behavior_pattern, key_findings, growth_trajectory
(plus '_raw' containing the full LLM output for debugging)
""" """
if not chunks: if not chunks:
business_logger.warning("没有提供chunk内容用于生成洞察") business_logger.warning("没有提供chunk内容用于生成洞察")
return "暂无足够数据生成洞察报告" empty = {k: "" for k in ("memory_insight", "behavior_pattern", "key_findings", "growth_trajectory")}
empty["_raw"] = "暂无足够数据生成洞察报告"
return empty
try: try:
# 1. 分析领域分布 from app.core.memory.utils.prompt.prompt_utils import render_memory_insight_prompt
domain_dist = await analyze_domain_distribution(chunks, max_chunks=max_chunks)
# 2. 统计基本信息
total_chunks = len(chunks)
avg_length = sum(len(chunk) for chunk in chunks) / total_chunks if total_chunks > 0 else 0
# 3. 构建洞察prompt
prompt_parts = []
if domain_dist:
top_domains = ", ".join([f"{k}({v:.0%})" for k, v in list(domain_dist.items())[:3]])
prompt_parts.append(f"- 内容领域分布: {top_domains}")
prompt_parts.append(f"- 内容规模: 共{total_chunks}个知识片段,平均长度{avg_length:.0f}")
# 添加部分chunk内容作为参考
sample_chunks = chunks[:5]
sample_content = "\n".join([f"示例{i+1}: {chunk[:200]}..." for i, chunk in enumerate(sample_chunks)])
prompt_parts.append(f"\n内容示例:\n{sample_content}")
system_prompt = """你是一位专业的知识内容分析师。你的任务是根据提供的信息,生成一段简洁、有洞察力的分析报告。
重要规则: # Build template inputs from chunk analysis
1. 报告需要将所有要点流畅地串联成一个段落 inputs = await _build_insight_inputs(chunks, max_chunks, end_user_id)
2. 语言风格要专业、客观,同时易于理解
3. 不要添加任何额外的解释或标题,直接输出报告内容
4. 基于提供的数据和示例内容进行分析,不要编造信息
5. 重点关注内容的主题、特点和价值
6. 报告长度控制在150-200字
例如,如果输入是: rendered_prompt = await render_memory_insight_prompt(
- 内容领域分布: 技术(60%), 商业(25%), 教育(15%) domain_distribution=inputs["domain_distribution"],
- 内容规模: 共50个知识片段平均长度320字 active_periods=inputs["active_periods"],
内容示例: [示例内容...] social_connections=inputs["social_connections"],
language=language,
)
你的输出应该类似: messages = [{"role": "user", "content": rendered_prompt}]
"该知识库主要聚焦于技术领域(60%),涵盖商业(25%)和教育(15%)相关内容。共包含50个知识片段平均每个片段约320字内容详实。从示例来看内容涉及[具体主题],体现了[特点],对[目标用户]具有较高的参考价值。" llm_client = _get_llm_client(end_user_id)
"""
user_prompt = "\n".join(prompt_parts)
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
]
# 调用LLM生成洞察
llm_client = _get_llm_client()
response = await llm_client.chat(messages=messages) response = await llm_client.chat(messages=messages)
raw_text = response.content.strip() if response and response.content else ""
insight = response.content.strip()
business_logger.info(f"成功生成chunk洞察分析了 {min(len(chunks), max_chunks)} 个片段") sections = _parse_sections(raw_text, language=language)
sections["_raw"] = raw_text
return insight
business_logger.info(
f"成功生成chunk洞察四维度分析了 {min(len(chunks), max_chunks)} 个片段"
)
return sections
except Exception as e: except Exception as e:
business_logger.error(f"生成chunk洞察失败: {str(e)}") business_logger.error(f"生成chunk洞察失败: {str(e)}")
return "洞察生成失败" empty = {k: "" for k in ("memory_insight", "behavior_pattern", "key_findings", "growth_trajectory")}
empty["_raw"] = "洞察生成失败"
return empty
if __name__ == "__main__":
# 测试代码
test_chunks = [
"Python是一种高级编程语言以其简洁的语法和强大的功能而闻名。它广泛应用于Web开发、数据分析、人工智能等领域。",
"机器学习算法可以从数据中自动学习模式,无需显式编程。常见的算法包括决策树、随机森林、神经网络等。",
"深度学习是机器学习的一个分支,使用多层神经网络来学习数据的层次化表示。它在图像识别、语音识别等任务中表现出色。",
"自然语言处理技术使计算机能够理解和生成人类语言。应用包括机器翻译、情感分析、文本摘要等。",
"数据科学结合了统计学、计算机科学和领域知识,用于从数据中提取有价值的洞察。"
]
print("开始生成chunk洞察...")
insight = asyncio.run(generate_chunk_insight(test_chunks))
print(f"\n生成的洞察:\n{insight}")

View File

@@ -1,11 +1,10 @@
""" """
Generate summary for RAG chunks. Generate summary for RAG chunks using memory_summary.jinja2 prompt template.
This module provides functionality to summarize chunk content using LLM.
""" """
import asyncio import asyncio
from typing import Any, Dict, List import os
from typing import List, Optional
from app.core.logging_config import get_business_logger from app.core.logging_config import get_business_logger
from app.core.memory.utils.llm.llm_utils import MemoryClientFactory from app.core.memory.utils.llm.llm_utils import MemoryClientFactory
@@ -14,94 +13,135 @@ from pydantic import BaseModel, Field
business_logger = get_business_logger() business_logger = get_business_logger()
DEFAULT_LLM_ID = os.getenv("SELECTED_LLM_ID", "openai/qwen-plus")
def _get_llm_client():
"""Get LLM client using db context."""
with get_db_context() as db:
factory = MemoryClientFactory(db)
return factory.get_llm_client(None) # Uses default LLM
class ChunkSummary(BaseModel): # ── Schema ──────────────────────────────────────────────────────────────────
"""Pydantic model for chunk summary."""
summary: str = Field(..., description="简洁的chunk内容摘要") class MemorySummaryStatement(BaseModel):
"""Single labelled statement extracted by memory_summary.jinja2."""
statement: str = Field(..., description="提取的陈述内容")
label: Optional[str] = Field(None, description="陈述标签")
async def generate_chunk_summary(chunks: List[str], max_chunks: int = 10) -> str: class MemorySummaryResponse(BaseModel):
""" """
Generate a summary for the given chunks. Structured output expected from memory_summary.jinja2.
The template asks for a JSON array of labelled statements;
we wrap it in an object so response_structured can parse it.
"""
statements: List[MemorySummaryStatement] = Field(
default_factory=list,
description="从chunk中提取的陈述列表"
)
summary: Optional[str] = Field(None, description="整体摘要文本(可选)")
# ── LLM client helper ────────────────────────────────────────────────────────
def _get_llm_client(end_user_id: Optional[str] = None):
"""Get LLM client, preferring user-connected config with fallback to default."""
with get_db_context() as db:
try:
if end_user_id:
from app.services.memory_agent_service import get_end_user_connected_config
from app.services.memory_config_service import MemoryConfigService
connected_config = get_end_user_connected_config(end_user_id, db)
config_id = connected_config.get("memory_config_id")
workspace_id = connected_config.get("workspace_id")
if config_id or workspace_id:
config_service = MemoryConfigService(db)
memory_config = config_service.load_memory_config(
config_id=config_id,
workspace_id=workspace_id
)
factory = MemoryClientFactory(db)
return factory.get_llm_client(memory_config.llm_model_id)
except Exception as e:
business_logger.warning(f"Failed to get user connected config, using default LLM: {e}")
factory = MemoryClientFactory(db)
return factory.get_llm_client(DEFAULT_LLM_ID)
# ── Core function ─────────────────────────────────────────────────────────────
async def generate_chunk_summary(
chunks: List[str],
max_chunks: int = 10,
end_user_id: Optional[str] = None,
language: str = "zh",
) -> str:
"""
Generate a user summary from RAG chunks using the memory_summary.jinja2 template.
The template extracts labelled statements from the chunks; we then join them
into a coherent summary string that can be stored in end_user.user_summary.
Args: Args:
chunks: List of chunk content strings chunks: List of chunk content strings
max_chunks: Maximum number of chunks to process (default: 10) max_chunks: Maximum number of chunks to process
end_user_id: Optional end-user ID for model selection
language: Output language ("zh" or "en")
Returns: Returns:
A concise summary of the chunks Summary string (joined statements or fallback text)
""" """
if not chunks: if not chunks:
business_logger.warning("没有提供chunk内容用于生成摘要") business_logger.warning("没有提供chunk内容用于生成摘要")
return "暂无内容" return "暂无内容"
try: try:
# 限制处理的chunk数量避免token过多 from app.core.memory.utils.prompt.prompt_utils import render_memory_summary_prompt
chunks_to_process = chunks[:max_chunks] chunks_to_process = chunks[:max_chunks]
chunk_texts = "\n\n".join(
# 合并chunk内容 [f"片段{i + 1}: {chunk}" for i, chunk in enumerate(chunks_to_process)]
combined_content = "\n\n".join([f"片段{i+1}: {chunk}" for i, chunk in enumerate(chunks_to_process)]) )
# 构建prompt json_schema = MemorySummaryResponse.model_json_schema()
system_prompt = (
"你是一位专业的文本摘要助手。请基于提供的文本片段,生成简洁的摘要。要求:\n" rendered_prompt = await render_memory_summary_prompt(
"- 摘要长度控制在100-150字\n" chunk_texts=chunk_texts,
"- 提取核心信息和关键要点;\n" json_schema=json_schema,
"- 使用客观、清晰的语言;\n" max_words=200,
"- 避免冗余和重复;\n" language=language,
"- 如果内容涉及多个主题,按重要性排序呈现。" )
messages = [{"role": "user", "content": rendered_prompt}]
llm_client = _get_llm_client(end_user_id)
# Try structured output; fall back to plain chat only for LLMClientException
# (indicates the model/provider doesn't support structured output).
# All other exceptions are re-raised so config/schema errors stay visible.
try:
response: MemorySummaryResponse = await llm_client.response_structured(
messages=messages,
response_model=MemorySummaryResponse,
)
if response.summary:
summary = response.summary.strip()
elif response.statements:
summary = "".join(s.statement for s in response.statements)
else:
summary = "暂无内容"
except Exception as e:
from app.core.memory.llm_tools.llm_client import LLMClientException
if isinstance(e, LLMClientException):
business_logger.warning(
f"结构化输出不可用,降级为普通对话: end_user_id={end_user_id}, reason={e}"
)
raw = await llm_client.chat(messages=messages)
summary = raw.content.strip() if raw and raw.content else "暂无内容"
else:
business_logger.error(f"生成摘要时发生非预期异常: {e}")
raise
business_logger.info(
f"成功生成chunk摘要处理了 {len(chunks_to_process)} 个片段"
) )
user_prompt = f"请为以下文本片段生成摘要:\n\n{combined_content}"
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
]
# 调用LLM生成摘要
llm_client = _get_llm_client()
response = await llm_client.chat(messages=messages)
summary = response.content.strip()
business_logger.info(f"成功生成chunk摘要处理了 {len(chunks_to_process)} 个片段")
return summary return summary
except Exception as e: except Exception as e:
business_logger.error(f"生成chunk摘要失败: {str(e)}") business_logger.error(f"生成chunk摘要失败: {str(e)}")
return "摘要生成失败" return "摘要生成失败"
async def generate_chunk_summary_batch(chunks_list: List[List[str]]) -> List[str]:
"""
Generate summaries for multiple chunk lists in batch.
Args:
chunks_list: List of chunk lists
Returns:
List of summaries
"""
tasks = [generate_chunk_summary(chunks) for chunks in chunks_list]
return await asyncio.gather(*tasks)
if __name__ == "__main__":
# 测试代码
test_chunks = [
"这是第一段测试内容,讲述了关于机器学习的基础知识。",
"第二段内容介绍了深度学习的应用场景和发展历史。",
"第三段讨论了自然语言处理技术的最新进展。"
]
print("开始生成chunk摘要...")
summary = asyncio.run(generate_chunk_summary(test_chunks))
print(f"\n生成的摘要:\n{summary}")

View File

@@ -5,8 +5,9 @@ This module provides functionality to extract meaningful tags from chunk content
""" """
import asyncio import asyncio
import os
from collections import Counter from collections import Counter
from typing import List, Tuple from typing import List, Optional, Tuple
from app.core.logging_config import get_business_logger from app.core.logging_config import get_business_logger
from app.core.memory.utils.llm.llm_utils import MemoryClientFactory from app.core.memory.utils.llm.llm_utils import MemoryClientFactory
@@ -15,12 +16,31 @@ from pydantic import BaseModel, Field
business_logger = get_business_logger() business_logger = get_business_logger()
DEFAULT_LLM_ID = os.getenv("SELECTED_LLM_ID", "openai/qwen-plus")
def _get_llm_client():
"""Get LLM client using db context.""" def _get_llm_client(end_user_id: Optional[str] = None):
"""Get LLM client, preferring user-connected config with fallback to default."""
with get_db_context() as db: with get_db_context() as db:
try:
if end_user_id:
from app.services.memory_agent_service import get_end_user_connected_config
from app.services.memory_config_service import MemoryConfigService
connected_config = get_end_user_connected_config(end_user_id, db)
config_id = connected_config.get("memory_config_id")
workspace_id = connected_config.get("workspace_id")
if config_id or workspace_id:
config_service = MemoryConfigService(db)
memory_config = config_service.load_memory_config(
config_id=config_id,
workspace_id=workspace_id
)
factory = MemoryClientFactory(db)
return factory.get_llm_client(memory_config.llm_model_id)
except Exception as e:
business_logger.warning(f"Failed to get user connected config, using default LLM: {e}")
factory = MemoryClientFactory(db) factory = MemoryClientFactory(db)
return factory.get_llm_client(None) # Uses default LLM return factory.get_llm_client(DEFAULT_LLM_ID)
class ExtractedTags(BaseModel): class ExtractedTags(BaseModel):
@@ -33,7 +53,7 @@ class ExtractedPersona(BaseModel):
personas: List[str] = Field(..., description="从文本中提取的人物形象列表,如'产品设计师''旅行爱好者'") personas: List[str] = Field(..., description="从文本中提取的人物形象列表,如'产品设计师''旅行爱好者'")
async def extract_chunk_tags(chunks: List[str], max_tags: int = 10, max_chunks: int = 10) -> List[Tuple[str, int]]: async def extract_chunk_tags(chunks: List[str], max_tags: int = 10, max_chunks: int = 10, end_user_id: Optional[str] = None) -> List[Tuple[str, int]]:
""" """
Extract meaningful tags from the given chunks. Extract meaningful tags from the given chunks.
@@ -64,7 +84,7 @@ async def extract_chunk_tags(chunks: List[str], max_tags: int = 10, max_chunks:
"标签应该是名词或名词短语,能够准确概括文本的核心内容。" "标签应该是名词或名词短语,能够准确概括文本的核心内容。"
) )
llm_client = _get_llm_client() llm_client = _get_llm_client(end_user_id)
# 为每个chunk单独提取标签然后统计频率 # 为每个chunk单独提取标签然后统计频率
all_tags = [] all_tags = []
@@ -116,7 +136,7 @@ async def extract_chunk_tags_with_frequency(chunks: List[str], max_tags: int = 1
return await extract_chunk_tags(chunks, max_tags=max_tags, max_chunks=len(chunks)) return await extract_chunk_tags(chunks, max_tags=max_tags, max_chunks=len(chunks))
async def extract_chunk_persona(chunks: List[str], max_personas: int = 5, max_chunks: int = 20) -> List[str]: async def extract_chunk_persona(chunks: List[str], max_personas: int = 5, max_chunks: int = 20, end_user_id: Optional[str] = None) -> List[str]:
""" """
Extract persona (人物形象) from the given chunks. Extract persona (人物形象) from the given chunks.
@@ -159,7 +179,7 @@ async def extract_chunk_persona(chunks: List[str], max_personas: int = 5, max_ch
] ]
# 调用LLM提取人物形象 # 调用LLM提取人物形象
llm_client = _get_llm_client() llm_client = _get_llm_client(end_user_id)
structured_response = await llm_client.response_structured( structured_response = await llm_client.response_structured(
messages=messages, messages=messages,
response_model=ExtractedPersona response_model=ExtractedPersona

View File

@@ -51,6 +51,12 @@ class EndUser(Base):
growth_trajectory = Column(Text, nullable=True, comment="成长轨迹") growth_trajectory = Column(Text, nullable=True, comment="成长轨迹")
memory_insight_updated_at = Column(DateTime, nullable=True, comment="洞察报告最后更新时间") memory_insight_updated_at = Column(DateTime, nullable=True, comment="洞察报告最后更新时间")
# RAG存储模式专用字段 - RAG Storage Mode Fields
storage_type = Column(String, nullable=True, default="neo4j", comment="存储模式类型: neo4j / rag")
rag_tags = Column(Text, nullable=True, comment="RAG模式下提取的标签列表JSON格式")
rag_personas = Column(Text, nullable=True, comment="RAG模式下提取的人物形象列表JSON格式")
rag_summary_updated_at = Column(DateTime, nullable=True, comment="RAG摘要/标签/人物形象最后更新时间")
# 与 App 的反向关系 # 与 App 的反向关系
app = relationship( app = relationship(
"App", "App",

View File

@@ -220,6 +220,90 @@ class EndUserRepository:
db_logger.error(f"更新终端用户 {end_user_id} 的用户摘要缓存时出错: {str(e)}") db_logger.error(f"更新终端用户 {end_user_id} 的用户摘要缓存时出错: {str(e)}")
raise raise
def update_rag_summary_tags(
self,
end_user_id: uuid.UUID,
user_summary: str,
rag_tags: str,
rag_personas: str,
) -> bool:
"""更新RAG模式下的用户摘要、标签和人物形象缓存
Args:
end_user_id: 终端用户ID
user_summary: 用户摘要文本
rag_tags: 标签列表JSON字符串
rag_personas: 人物形象列表JSON字符串
Returns:
bool: 更新成功返回True否则返回False
"""
try:
updated_count = (
self.db.query(EndUser)
.filter(EndUser.id == end_user_id)
.update(
{
EndUser.user_summary: user_summary,
EndUser.rag_tags: rag_tags,
EndUser.rag_personas: rag_personas,
EndUser.storage_type: "rag",
EndUser.rag_summary_updated_at: datetime.datetime.now(),
},
synchronize_session=False
)
)
self.db.commit()
if updated_count > 0:
db_logger.info(f"成功更新终端用户 {end_user_id} 的RAG摘要/标签/人物形象缓存")
return True
else:
db_logger.warning(f"未找到终端用户 {end_user_id}无法更新RAG摘要缓存")
return False
except Exception as e:
self.db.rollback()
db_logger.error(f"更新终端用户 {end_user_id} 的RAG摘要缓存时出错: {str(e)}")
raise
def update_rag_insight(
self,
end_user_id: uuid.UUID,
memory_insight: str,
) -> bool:
"""更新RAG模式下的记忆洞察缓存
Args:
end_user_id: 终端用户ID
memory_insight: 洞察文本
Returns:
bool: 更新成功返回True否则返回False
"""
try:
updated_count = (
self.db.query(EndUser)
.filter(EndUser.id == end_user_id)
.update(
{
EndUser.memory_insight: memory_insight,
EndUser.storage_type: "rag",
EndUser.memory_insight_updated_at: datetime.datetime.now(),
},
synchronize_session=False
)
)
self.db.commit()
if updated_count > 0:
db_logger.info(f"成功更新终端用户 {end_user_id} 的RAG洞察缓存")
return True
else:
db_logger.warning(f"未找到终端用户 {end_user_id}无法更新RAG洞察缓存")
return False
except Exception as e:
self.db.rollback()
db_logger.error(f"更新终端用户 {end_user_id} 的RAG洞察缓存时出错: {str(e)}")
raise
def get_all_by_workspace(self, workspace_id: uuid.UUID) -> List[EndUser]: def get_all_by_workspace(self, workspace_id: uuid.UUID) -> List[EndUser]:
"""获取工作空间的所有终端用户 """获取工作空间的所有终端用户

View File

@@ -646,59 +646,26 @@ async def get_chunk_summary_and_tags(
current_user: User current_user: User
) -> dict: ) -> dict:
""" """
获取chunk的总结、标签和人物形象 纯读库从end_user表返回RAG摘要、标签和人物形象缓存。
无数据时返回空结构不触发LLM生成。
Args:
end_user_id: 宿主ID
limit: 返回的chunk数量限制
max_tags: 最大标签数量
db: 数据库会话
current_user: 当前用户
Returns:
包含summary、tags和personas的字典
""" """
business_logger.info(f"获取chunk摘要、标签和人物形象: end_user_id={end_user_id}, limit={limit}, 操作者: {current_user.username}") import json
from app.repositories.end_user_repository import EndUserRepository
try:
# 1. 获取chunk内容 business_logger.info(f"读取chunk摘要/标签/人物形象缓存: end_user_id={end_user_id}")
rag_content = get_rag_content(end_user_id, limit, db, current_user)
chunks = rag_content.get("contents", []) repo = EndUserRepository(db)
end_user = repo.get_by_id(uuid.UUID(end_user_id))
if not chunks:
business_logger.warning(f"未找到chunk内容: end_user_id={end_user_id}") if not end_user:
return { return {"summary": "", "tags": [], "personas": [], "generated": False}
"summary": "暂无内容",
"tags": [], return {
"personas": [] "summary": end_user.user_summary or "",
} "tags": json.loads(end_user.rag_tags) if end_user.rag_tags else [],
"personas": json.loads(end_user.rag_personas) if end_user.rag_personas else [],
# 2. 导入RAG工具函数 "generated": bool(end_user.user_summary),
from app.core.rag_utils import generate_chunk_summary, extract_chunk_tags, extract_chunk_persona }
# 3. 并发生成摘要、提取标签和人物形象
import asyncio
summary_task = generate_chunk_summary(chunks, max_chunks=limit)
tags_task = extract_chunk_tags(chunks, max_tags=max_tags, max_chunks=limit)
personas_task = extract_chunk_persona(chunks, max_personas=5, max_chunks=limit)
summary, tags_with_freq, personas = await asyncio.gather(summary_task, tags_task, personas_task)
# 4. 格式化标签数据
tags = [{"tag": tag, "frequency": freq} for tag, freq in tags_with_freq]
result = {
"summary": summary,
"tags": tags,
"personas": personas
}
business_logger.info(f"成功获取chunk摘要、{len(tags)} 个标签和 {len(personas)} 个人物形象")
return result
except Exception as e:
business_logger.error(f"获取chunk摘要、标签和人物形象失败: end_user_id={end_user_id} - {str(e)}")
raise
async def get_chunk_insight( async def get_chunk_insight(
@@ -708,43 +675,98 @@ async def get_chunk_insight(
current_user: User current_user: User
) -> dict: ) -> dict:
""" """
获取chunk的洞察分析 纯读库从end_user表返回RAG洞察缓存。
无数据时返回空结构不触发LLM生成。
Args:
end_user_id: 宿主ID
limit: 返回的chunk数量限制
db: 数据库会话
current_user: 当前用户
Returns:
包含insight的字典
""" """
business_logger.info(f"获取chunk洞察: end_user_id={end_user_id}, limit={limit}, 操作者: {current_user.username}") from app.repositories.end_user_repository import EndUserRepository
try: business_logger.info(f"读取chunk洞察缓存: end_user_id={end_user_id}")
# 1. 获取chunk内容
rag_content = get_rag_content(end_user_id, limit, db, current_user) repo = EndUserRepository(db)
chunks = rag_content.get("contents", []) end_user = repo.get_by_id(uuid.UUID(end_user_id))
if not chunks: if not end_user:
business_logger.warning(f"未找到chunk内容: end_user_id={end_user_id}") return {"insight": "", "behavior_pattern": "", "key_findings": "", "growth_trajectory": "", "generated": False}
return {
"insight": "暂无足够数据生成洞察报告" return {
} "insight": end_user.memory_insight or "",
"behavior_pattern": end_user.behavior_pattern or "",
# 2. 导入RAG工具函数 "key_findings": end_user.key_findings or "",
from app.core.rag_utils import generate_chunk_insight "growth_trajectory": end_user.growth_trajectory or "",
"generated": bool(end_user.memory_insight),
# 3. 生成洞察 }
insight = await generate_chunk_insight(chunks, max_chunks=limit)
result = { async def generate_rag_profile(
"insight": insight end_user_id: str,
} limit: int,
max_tags: int,
business_logger.info("成功获取chunk洞察") db: Session,
return result current_user: User,
) -> dict:
except Exception as e: """
business_logger.error(f"获取chunk洞察失败: end_user_id={end_user_id} - {str(e)}") 生产接口为RAG存储模式的end_user全量重新生成并持久化完整画像数据。
raise 每次调用都会重新生成,覆盖已有数据。
生成内容:
- user_summary / rag_tags / rag_personas
- memory_insight / behavior_pattern / key_findings / growth_trajectory
"""
import json
import asyncio
from app.repositories.end_user_repository import EndUserRepository
from app.core.rag_utils import (
generate_chunk_summary,
extract_chunk_tags,
extract_chunk_persona,
generate_chunk_insight_sections,
)
business_logger.info(f"开始生产RAG画像: end_user_id={end_user_id}, 操作者: {current_user.username}")
repo = EndUserRepository(db)
end_user = repo.get_by_id(uuid.UUID(end_user_id))
if not end_user:
raise ValueError(f"end_user {end_user_id} 不存在")
rag_content = get_rag_content(end_user_id, limit, db, current_user)
chunks = rag_content.get("contents", [])
if not chunks:
business_logger.warning(f"未找到chunk内容无法生产RAG画像: end_user_id={end_user_id}")
raise ValueError("暂无chunk内容无法生成画像")
summary, tags_with_freq, personas, insight_sections = await asyncio.gather(
generate_chunk_summary(chunks, max_chunks=limit, end_user_id=end_user_id),
extract_chunk_tags(chunks, max_tags=max_tags, max_chunks=limit, end_user_id=end_user_id),
extract_chunk_persona(chunks, max_personas=5, max_chunks=limit, end_user_id=end_user_id),
generate_chunk_insight_sections(chunks, max_chunks=limit, end_user_id=end_user_id),
)
tags = [{"tag": tag, "frequency": freq} for tag, freq in tags_with_freq]
repo.update_rag_summary_tags(
end_user_id=end_user.id,
user_summary=summary,
rag_tags=json.dumps(tags, ensure_ascii=False),
rag_personas=json.dumps(personas, ensure_ascii=False),
)
repo.update_memory_insight(
end_user_id=end_user.id,
memory_insight=insight_sections.get("memory_insight", ""),
behavior_pattern=insight_sections.get("behavior_pattern", ""),
key_findings=insight_sections.get("key_findings", ""),
growth_trajectory=insight_sections.get("growth_trajectory", ""),
)
business_logger.info(f"RAG画像生产完成: end_user_id={end_user_id}, tags={len(tags)}, personas={len(personas)}")
return {
"end_user_id": end_user_id,
"summary_length": len(summary),
"tags_count": len(tags),
"personas_count": len(personas),
"insight_generated": bool(insight_sections.get("memory_insight")),
}