[add] Add cache to RAG storage
This commit is contained in:
@@ -51,6 +51,12 @@ class EndUser(Base):
|
|||||||
growth_trajectory = Column(Text, nullable=True, comment="成长轨迹")
|
growth_trajectory = Column(Text, nullable=True, comment="成长轨迹")
|
||||||
memory_insight_updated_at = Column(DateTime, nullable=True, comment="洞察报告最后更新时间")
|
memory_insight_updated_at = Column(DateTime, nullable=True, comment="洞察报告最后更新时间")
|
||||||
|
|
||||||
|
# RAG存储模式专用字段 - RAG Storage Mode Fields
|
||||||
|
storage_type = Column(String, nullable=True, default="neo4j", comment="存储模式类型: neo4j / rag")
|
||||||
|
rag_tags = Column(Text, nullable=True, comment="RAG模式下提取的标签列表(JSON格式)")
|
||||||
|
rag_personas = Column(Text, nullable=True, comment="RAG模式下提取的人物形象列表(JSON格式)")
|
||||||
|
rag_summary_updated_at = Column(DateTime, nullable=True, comment="RAG摘要/标签/人物形象最后更新时间")
|
||||||
|
|
||||||
# 与 App 的反向关系
|
# 与 App 的反向关系
|
||||||
app = relationship(
|
app = relationship(
|
||||||
"App",
|
"App",
|
||||||
|
|||||||
@@ -220,6 +220,90 @@ class EndUserRepository:
|
|||||||
db_logger.error(f"更新终端用户 {end_user_id} 的用户摘要缓存时出错: {str(e)}")
|
db_logger.error(f"更新终端用户 {end_user_id} 的用户摘要缓存时出错: {str(e)}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
def update_rag_summary_tags(
|
||||||
|
self,
|
||||||
|
end_user_id: uuid.UUID,
|
||||||
|
user_summary: str,
|
||||||
|
rag_tags: str,
|
||||||
|
rag_personas: str,
|
||||||
|
) -> bool:
|
||||||
|
"""更新RAG模式下的用户摘要、标签和人物形象缓存
|
||||||
|
|
||||||
|
Args:
|
||||||
|
end_user_id: 终端用户ID
|
||||||
|
user_summary: 用户摘要文本
|
||||||
|
rag_tags: 标签列表(JSON字符串)
|
||||||
|
rag_personas: 人物形象列表(JSON字符串)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: 更新成功返回True,否则返回False
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
updated_count = (
|
||||||
|
self.db.query(EndUser)
|
||||||
|
.filter(EndUser.id == end_user_id)
|
||||||
|
.update(
|
||||||
|
{
|
||||||
|
EndUser.user_summary: user_summary,
|
||||||
|
EndUser.rag_tags: rag_tags,
|
||||||
|
EndUser.rag_personas: rag_personas,
|
||||||
|
EndUser.storage_type: "rag",
|
||||||
|
EndUser.rag_summary_updated_at: datetime.datetime.now(),
|
||||||
|
},
|
||||||
|
synchronize_session=False
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.db.commit()
|
||||||
|
if updated_count > 0:
|
||||||
|
db_logger.info(f"成功更新终端用户 {end_user_id} 的RAG摘要/标签/人物形象缓存")
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
db_logger.warning(f"未找到终端用户 {end_user_id},无法更新RAG摘要缓存")
|
||||||
|
return False
|
||||||
|
except Exception as e:
|
||||||
|
self.db.rollback()
|
||||||
|
db_logger.error(f"更新终端用户 {end_user_id} 的RAG摘要缓存时出错: {str(e)}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
def update_rag_insight(
|
||||||
|
self,
|
||||||
|
end_user_id: uuid.UUID,
|
||||||
|
memory_insight: str,
|
||||||
|
) -> bool:
|
||||||
|
"""更新RAG模式下的记忆洞察缓存
|
||||||
|
|
||||||
|
Args:
|
||||||
|
end_user_id: 终端用户ID
|
||||||
|
memory_insight: 洞察文本
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: 更新成功返回True,否则返回False
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
updated_count = (
|
||||||
|
self.db.query(EndUser)
|
||||||
|
.filter(EndUser.id == end_user_id)
|
||||||
|
.update(
|
||||||
|
{
|
||||||
|
EndUser.memory_insight: memory_insight,
|
||||||
|
EndUser.storage_type: "rag",
|
||||||
|
EndUser.memory_insight_updated_at: datetime.datetime.now(),
|
||||||
|
},
|
||||||
|
synchronize_session=False
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.db.commit()
|
||||||
|
if updated_count > 0:
|
||||||
|
db_logger.info(f"成功更新终端用户 {end_user_id} 的RAG洞察缓存")
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
db_logger.warning(f"未找到终端用户 {end_user_id},无法更新RAG洞察缓存")
|
||||||
|
return False
|
||||||
|
except Exception as e:
|
||||||
|
self.db.rollback()
|
||||||
|
db_logger.error(f"更新终端用户 {end_user_id} 的RAG洞察缓存时出错: {str(e)}")
|
||||||
|
raise
|
||||||
|
|
||||||
def get_all_by_workspace(self, workspace_id: uuid.UUID) -> List[EndUser]:
|
def get_all_by_workspace(self, workspace_id: uuid.UUID) -> List[EndUser]:
|
||||||
"""获取工作空间的所有终端用户
|
"""获取工作空间的所有终端用户
|
||||||
|
|
||||||
|
|||||||
@@ -647,55 +647,63 @@ async def get_chunk_summary_and_tags(
|
|||||||
) -> dict:
|
) -> dict:
|
||||||
"""
|
"""
|
||||||
获取chunk的总结、标签和人物形象
|
获取chunk的总结、标签和人物形象
|
||||||
|
优先返回end_user表中的缓存,若无缓存则实时生成并写库
|
||||||
Args:
|
|
||||||
end_user_id: 宿主ID
|
|
||||||
limit: 返回的chunk数量限制
|
|
||||||
max_tags: 最大标签数量
|
|
||||||
db: 数据库会话
|
|
||||||
current_user: 当前用户
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
包含summary、tags和personas的字典
|
|
||||||
"""
|
"""
|
||||||
|
import json
|
||||||
|
from app.repositories.end_user_repository import EndUserRepository
|
||||||
|
|
||||||
business_logger.info(f"获取chunk摘要、标签和人物形象: end_user_id={end_user_id}, limit={limit}, 操作者: {current_user.username}")
|
business_logger.info(f"获取chunk摘要、标签和人物形象: end_user_id={end_user_id}, limit={limit}, 操作者: {current_user.username}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# 1. 获取chunk内容
|
repo = EndUserRepository(db)
|
||||||
|
end_user = repo.get_by_id(uuid.UUID(end_user_id))
|
||||||
|
|
||||||
|
# 读缓存:user_summary / rag_tags / rag_personas 均有值时直接返回
|
||||||
|
if (
|
||||||
|
end_user
|
||||||
|
and end_user.user_summary
|
||||||
|
and end_user.rag_tags
|
||||||
|
and end_user.rag_personas
|
||||||
|
):
|
||||||
|
business_logger.info(f"命中缓存,直接返回end_user {end_user_id} 的摘要/标签/人物形象")
|
||||||
|
return {
|
||||||
|
"summary": end_user.user_summary,
|
||||||
|
"tags": json.loads(end_user.rag_tags),
|
||||||
|
"personas": json.loads(end_user.rag_personas),
|
||||||
|
}
|
||||||
|
|
||||||
|
# 无缓存:实时生成
|
||||||
rag_content = get_rag_content(end_user_id, limit, db, current_user)
|
rag_content = get_rag_content(end_user_id, limit, db, current_user)
|
||||||
chunks = rag_content.get("contents", [])
|
chunks = rag_content.get("contents", [])
|
||||||
|
|
||||||
if not chunks:
|
if not chunks:
|
||||||
business_logger.warning(f"未找到chunk内容: end_user_id={end_user_id}")
|
business_logger.warning(f"未找到chunk内容: end_user_id={end_user_id}")
|
||||||
return {
|
return {"summary": "暂无内容", "tags": [], "personas": []}
|
||||||
"summary": "暂无内容",
|
|
||||||
"tags": [],
|
|
||||||
"personas": []
|
|
||||||
}
|
|
||||||
|
|
||||||
# 2. 导入RAG工具函数
|
|
||||||
from app.core.rag_utils import generate_chunk_summary, extract_chunk_tags, extract_chunk_persona
|
from app.core.rag_utils import generate_chunk_summary, extract_chunk_tags, extract_chunk_persona
|
||||||
|
|
||||||
# 3. 并发生成摘要、提取标签和人物形象
|
|
||||||
import asyncio
|
import asyncio
|
||||||
summary_task = generate_chunk_summary(chunks, max_chunks=limit, end_user_id=end_user_id)
|
|
||||||
tags_task = extract_chunk_tags(chunks, max_tags=max_tags, max_chunks=limit, end_user_id=end_user_id)
|
summary, tags_with_freq, personas = await asyncio.gather(
|
||||||
personas_task = extract_chunk_persona(chunks, max_personas=5, max_chunks=limit, end_user_id=end_user_id)
|
generate_chunk_summary(chunks, max_chunks=limit, end_user_id=end_user_id),
|
||||||
|
extract_chunk_tags(chunks, max_tags=max_tags, max_chunks=limit, end_user_id=end_user_id),
|
||||||
summary, tags_with_freq, personas = await asyncio.gather(summary_task, tags_task, personas_task)
|
extract_chunk_persona(chunks, max_personas=5, max_chunks=limit, end_user_id=end_user_id),
|
||||||
|
)
|
||||||
# 4. 格式化标签数据
|
|
||||||
tags = [{"tag": tag, "frequency": freq} for tag, freq in tags_with_freq]
|
tags = [{"tag": tag, "frequency": freq} for tag, freq in tags_with_freq]
|
||||||
|
|
||||||
result = {
|
# 写库缓存
|
||||||
"summary": summary,
|
if end_user:
|
||||||
"tags": tags,
|
repo.update_rag_summary_tags(
|
||||||
"personas": personas
|
end_user_id=end_user.id,
|
||||||
}
|
user_summary=summary,
|
||||||
|
rag_tags=json.dumps(tags, ensure_ascii=False),
|
||||||
|
rag_personas=json.dumps(personas, ensure_ascii=False),
|
||||||
|
)
|
||||||
|
|
||||||
|
result = {"summary": summary, "tags": tags, "personas": personas}
|
||||||
business_logger.info(f"成功获取chunk摘要、{len(tags)} 个标签和 {len(personas)} 个人物形象")
|
business_logger.info(f"成功获取chunk摘要、{len(tags)} 个标签和 {len(personas)} 个人物形象")
|
||||||
return result
|
return result
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
business_logger.error(f"获取chunk摘要、标签和人物形象失败: end_user_id={end_user_id} - {str(e)}")
|
business_logger.error(f"获取chunk摘要、标签和人物形象失败: end_user_id={end_user_id} - {str(e)}")
|
||||||
raise
|
raise
|
||||||
@@ -709,42 +717,40 @@ async def get_chunk_insight(
|
|||||||
) -> dict:
|
) -> dict:
|
||||||
"""
|
"""
|
||||||
获取chunk的洞察分析
|
获取chunk的洞察分析
|
||||||
|
优先返回end_user表中的缓存,若无缓存则实时生成并写库
|
||||||
Args:
|
|
||||||
end_user_id: 宿主ID
|
|
||||||
limit: 返回的chunk数量限制
|
|
||||||
db: 数据库会话
|
|
||||||
current_user: 当前用户
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
包含insight的字典
|
|
||||||
"""
|
"""
|
||||||
|
from app.repositories.end_user_repository import EndUserRepository
|
||||||
|
|
||||||
business_logger.info(f"获取chunk洞察: end_user_id={end_user_id}, limit={limit}, 操作者: {current_user.username}")
|
business_logger.info(f"获取chunk洞察: end_user_id={end_user_id}, limit={limit}, 操作者: {current_user.username}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# 1. 获取chunk内容
|
repo = EndUserRepository(db)
|
||||||
|
end_user = repo.get_by_id(uuid.UUID(end_user_id))
|
||||||
|
|
||||||
|
# 读缓存
|
||||||
|
if end_user and end_user.memory_insight:
|
||||||
|
business_logger.info(f"命中缓存,直接返回end_user {end_user_id} 的洞察")
|
||||||
|
return {"insight": end_user.memory_insight}
|
||||||
|
|
||||||
|
# 无缓存:实时生成
|
||||||
rag_content = get_rag_content(end_user_id, limit, db, current_user)
|
rag_content = get_rag_content(end_user_id, limit, db, current_user)
|
||||||
chunks = rag_content.get("contents", [])
|
chunks = rag_content.get("contents", [])
|
||||||
|
|
||||||
if not chunks:
|
if not chunks:
|
||||||
business_logger.warning(f"未找到chunk内容: end_user_id={end_user_id}")
|
business_logger.warning(f"未找到chunk内容: end_user_id={end_user_id}")
|
||||||
return {
|
return {"insight": "暂无足够数据生成洞察报告"}
|
||||||
"insight": "暂无足够数据生成洞察报告"
|
|
||||||
}
|
|
||||||
|
|
||||||
# 2. 导入RAG工具函数
|
|
||||||
from app.core.rag_utils import generate_chunk_insight
|
from app.core.rag_utils import generate_chunk_insight
|
||||||
|
|
||||||
# 3. 生成洞察
|
|
||||||
insight = await generate_chunk_insight(chunks, max_chunks=limit, end_user_id=end_user_id)
|
insight = await generate_chunk_insight(chunks, max_chunks=limit, end_user_id=end_user_id)
|
||||||
|
|
||||||
result = {
|
# 写库缓存
|
||||||
"insight": insight
|
if end_user:
|
||||||
}
|
repo.update_rag_insight(end_user_id=end_user.id, memory_insight=insight)
|
||||||
|
|
||||||
business_logger.info("成功获取chunk洞察")
|
business_logger.info("成功获取chunk洞察")
|
||||||
return result
|
return {"insight": insight}
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
business_logger.error(f"获取chunk洞察失败: end_user_id={end_user_id} - {str(e)}")
|
business_logger.error(f"获取chunk洞察失败: end_user_id={end_user_id} - {str(e)}")
|
||||||
raise
|
raise
|
||||||
Reference in New Issue
Block a user