Merge #105 into develop from feature/user-summary

[feature]用户记忆内容扩展

* feature/user-summary: (11 commits squashed)

  - [ADD]Support graph search

  - Merge #82 into develop from feature/20251219_myh
    
    fix: correct function naming for memory retrieval
    
    * feature/20251219_myh: (2 commits squashed)
    
      - perf(workflow): adjust default template to be compatible with frontend format
    
      - fix: correct function naming for memory retrieval
    
    Signed-off-by: Eternity <1533512157@qq.com>
    Reviewed-by: zhuwenhui5566@163.com <zhuwenhui5566@163.com>
    Reviewed-by: aliyun6762716068 <accounts_68cb7c6b61f5dcc4200d6251@mail.teambition.com>
    Merged-by: aliyun6762716068 <accounts_68cb7c6b61f5dcc4200d6251@mail.teambition.com>
    
    CR-link: https://codeup.aliyun.com/redbearai/python/redbear-mem-open/change/82

  - [fix]parsed excel document error:float division by zero

  - [fix]parsed excel document error:float division by zero

  - [fix]parsed excel document error:float division by zero

  - [fix]parsed excel document error:float division by zero

  - [changes]1.Fix the Neo4j alert;2.Separate the functions of &quot;insight&quot; and &quot;summary&quot;

  - [feature]Develop user summary

  - [feature]Developing Memory Insights

  - [changes]Modify the data types and processing procedures of the configuration parameters

  - [fix]fix

Signed-off-by: 乐力齐 <accounts_690c7b0af9007d7e338af636@mail.teambition.com>
Reviewed-by: aliyun6762716068 <accounts_68cb7c6b61f5dcc4200d6251@mail.teambition.com>
Merged-by: aliyun6762716068 <accounts_68cb7c6b61f5dcc4200d6251@mail.teambition.com>

CR-link: https://codeup.aliyun.com/redbearai/python/redbear-mem-open/change/105
This commit is contained in:
乐力齐
2026-01-05 04:34:12 +00:00
committed by 孙科
parent e8a5cfe7e3
commit 1fc81d1347
10 changed files with 881 additions and 249 deletions

View File

@@ -4,20 +4,25 @@ User Memory Service
处理用户记忆相关的业务逻辑,包括记忆洞察、用户摘要、节点统计和图数据等。
"""
import os
import uuid
from typing import Any, Dict, List, Optional
from collections import Counter
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
from app.core.logging_config import get_logger
from app.core.memory.analytics.memory_insight import MemoryInsight
from app.core.memory.analytics.user_summary import generate_user_summary
from app.core.memory.utils.llm.llm_utils import MemoryClientFactory
from app.db import get_db_context
from app.repositories.end_user_repository import EndUserRepository
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.services.memory_config_service import MemoryConfigService
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
logger = get_logger(__name__)
# Neo4j connector instance
_neo4j_connector = Neo4jConnector()
# Neo4j connector instan
class UserMemoryService:
@@ -26,13 +31,42 @@ class UserMemoryService:
def __init__(self):
logger.info("UserMemoryService initialized")
@staticmethod
def _datetime_to_timestamp(dt: Optional[Any]) -> Optional[int]:
"""将 DateTime 对象转换为时间戳(毫秒)"""
if dt is None:
return None
if hasattr(dt, 'timestamp'):
return int(dt.timestamp() * 1000)
return None
@staticmethod
def convert_profile_to_dict_with_timestamp(profile_data: Any) -> dict:
"""
将 Pydantic 模型转换为字典,自动转换所有 DateTime 字段为时间戳(毫秒)
Args:
profile_data: Pydantic 模型对象
Returns:
包含时间戳的字典
"""
data = profile_data.model_dump()
# 自动转换所有 datetime 类型的字段
for key, value in data.items():
if hasattr(profile_data, key):
original_value = getattr(profile_data, key)
if hasattr(original_value, 'timestamp'):
data[key] = UserMemoryService._datetime_to_timestamp(original_value)
return data
async def get_cached_memory_insight(
self,
db: Session,
end_user_id: str
) -> Dict[str, Any]:
"""
从数据库获取缓存的记忆洞察
从数据库获取缓存的记忆洞察(四个维度)
Args:
db: 数据库会话
@@ -40,8 +74,11 @@ class UserMemoryService:
Returns:
{
"report": str,
"updated_at": datetime,
"memory_insight": str, # 总体概述
"behavior_pattern": str, # 行为模式
"key_findings": List[str], # 关键发现(数组)
"growth_trajectory": str, # 成长轨迹
"updated_at": int, # 时间戳(毫秒)
"is_cached": bool
}
"""
@@ -54,24 +91,52 @@ class UserMemoryService:
if not end_user:
logger.warning(f"未找到 end_user_id 为 {end_user_id} 的用户")
return {
"report": None,
"memory_insight": None,
"behavior_pattern": None,
"key_findings": None,
"growth_trajectory": None,
"updated_at": None,
"is_cached": False,
"message": "用户不存在"
}
# 检查是否有缓存数据
if end_user.memory_insight:
logger.info(f"成功获取 end_user_id {end_user_id} 的缓存记忆洞察")
# 检查是否有缓存数据(至少有一个字段不为空)
has_cache = any([
end_user.memory_insight,
end_user.behavior_pattern,
end_user.key_findings,
end_user.growth_trajectory
])
if has_cache:
# 反序列化 key_findings从 JSON 字符串转为数组)
key_findings_value = end_user.key_findings
if key_findings_value:
try:
import json
key_findings_array = json.loads(key_findings_value)
except (json.JSONDecodeError, TypeError):
# 如果解析失败,尝试按 • 分割(兼容旧数据)
key_findings_array = [item.strip() for item in key_findings_value.split('') if item.strip()]
else:
key_findings_array = []
logger.info(f"成功获取 end_user_id {end_user_id} 的缓存记忆洞察(四维度)")
return {
"report": end_user.memory_insight,
"updated_at": end_user.memory_insight_updated_at,
"memory_insight": end_user.memory_insight, # 总体概述存储在 memory_insight
"behavior_pattern": end_user.behavior_pattern,
"key_findings": key_findings_array, # 返回数组
"growth_trajectory": end_user.growth_trajectory,
"updated_at": self._datetime_to_timestamp(end_user.memory_insight_updated_at),
"is_cached": True
}
else:
logger.info(f"end_user_id {end_user_id} 的记忆洞察缓存为空")
return {
"report": None,
"memory_insight": None,
"behavior_pattern": None,
"key_findings": None,
"growth_trajectory": None,
"updated_at": None,
"is_cached": False,
"message": "数据尚未生成,请稍后重试或联系管理员"
@@ -80,7 +145,10 @@ class UserMemoryService:
except ValueError:
logger.error(f"无效的 end_user_id 格式: {end_user_id}")
return {
"report": None,
"memory_insight": None,
"behavior_pattern": None,
"key_findings": None,
"growth_trajectory": None,
"updated_at": None,
"is_cached": False,
"message": "无效的用户ID格式"
@@ -95,7 +163,7 @@ class UserMemoryService:
end_user_id: str
) -> Dict[str, Any]:
"""
从数据库获取缓存的用户摘要
从数据库获取缓存的用户摘要(四个部分)
Args:
db: 数据库会话
@@ -103,7 +171,10 @@ class UserMemoryService:
Returns:
{
"summary": str,
"user_summary": str,
"personality": str,
"core_values": str,
"one_sentence": str,
"updated_at": datetime,
"is_cached": bool
}
@@ -117,24 +188,40 @@ class UserMemoryService:
if not end_user:
logger.warning(f"未找到 end_user_id 为 {end_user_id} 的用户")
return {
"summary": None,
"user_summary": None,
"personality": None,
"core_values": None,
"one_sentence": None,
"updated_at": None,
"is_cached": False,
"message": "用户不存在"
}
# 检查是否有缓存数据
if end_user.user_summary:
# 检查是否有缓存数据(至少有一个字段不为空)
has_cache = any([
end_user.user_summary,
end_user.personality_traits,
end_user.core_values,
end_user.one_sentence_summary
])
if has_cache:
logger.info(f"成功获取 end_user_id {end_user_id} 的缓存用户摘要")
return {
"summary": end_user.user_summary,
"updated_at": end_user.user_summary_updated_at,
"user_summary": end_user.user_summary,
"personality": end_user.personality_traits,
"core_values": end_user.core_values,
"one_sentence": end_user.one_sentence_summary,
"updated_at": self._datetime_to_timestamp(end_user.user_summary_updated_at),
"is_cached": True
}
else:
logger.info(f"end_user_id {end_user_id} 的用户摘要缓存为空")
return {
"summary": None,
"user_summary": None,
"personality": None,
"core_values": None,
"one_sentence": None,
"updated_at": None,
"is_cached": False,
"message": "数据尚未生成,请稍后重试或联系管理员"
@@ -143,7 +230,10 @@ class UserMemoryService:
except ValueError:
logger.error(f"无效的 end_user_id 格式: {end_user_id}")
return {
"summary": None,
"user_summary": None,
"personality": None,
"core_values": None,
"one_sentence": None,
"updated_at": None,
"is_cached": False,
"message": "无效的用户ID格式"
@@ -151,7 +241,8 @@ class UserMemoryService:
except Exception as e:
logger.error(f"获取缓存用户摘要时出错: {str(e)}")
raise
# for user
async def generate_and_cache_insight(
self,
db: Session,
@@ -169,7 +260,10 @@ class UserMemoryService:
Returns:
{
"success": bool,
"report": str,
"memory_insight": str,
"behavior_pattern": str,
"key_findings": List[str], # 数组格式
"growth_trajectory": str,
"error": Optional[str]
}
"""
@@ -185,7 +279,10 @@ class UserMemoryService:
logger.error(f"end_user_id {end_user_id} 不存在")
return {
"success": False,
"report": None,
"memory_insight": None,
"behavior_pattern": None,
"key_findings": None,
"growth_trajectory": None,
"error": "用户不存在"
}
@@ -193,31 +290,55 @@ class UserMemoryService:
try:
logger.info(f"使用 end_user_id={end_user_id} 生成记忆洞察")
result = await analytics_memory_insight_report(end_user_id)
report = result.get("report", "")
if not report:
memory_insight = result.get("memory_insight", "")
behavior_pattern = result.get("behavior_pattern", "")
key_findings_array = result.get("key_findings", []) # 现在是数组
growth_trajectory = result.get("growth_trajectory", "")
# 将 key_findings 数组序列化为 JSON 字符串以存储到数据库
import json
key_findings_json = json.dumps(key_findings_array, ensure_ascii=False) if key_findings_array else ""
if not any([memory_insight, behavior_pattern, key_findings_array, growth_trajectory]):
logger.warning(f"end_user_id {end_user_id} 的记忆洞察生成结果为空")
return {
"success": False,
"report": None,
"memory_insight": None,
"behavior_pattern": None,
"key_findings": None,
"growth_trajectory": None,
"error": "生成的洞察报告为空,可能Neo4j中没有该用户的数据"
}
# 更新数据库缓存
success = repo.update_memory_insight(user_uuid, report)
# 更新数据库缓存(四个维度)
# 注意key_findings 存储为 JSON 字符串
success = repo.update_memory_insight(
user_uuid,
memory_insight,
behavior_pattern,
key_findings_json, # 存储 JSON 字符串
growth_trajectory
)
if success:
logger.info(f"成功为 end_user_id {end_user_id} 生成并缓存记忆洞察")
logger.info(f"成功为 end_user_id {end_user_id} 生成并缓存记忆洞察(四维度)")
return {
"success": True,
"report": report,
"memory_insight": memory_insight,
"behavior_pattern": behavior_pattern,
"key_findings": key_findings_array, # 返回数组
"growth_trajectory": growth_trajectory,
"error": None
}
else:
logger.error(f"更新 end_user_id {end_user_id} 的记忆洞察缓存失败")
return {
"success": False,
"report": report,
"memory_insight": memory_insight,
"behavior_pattern": behavior_pattern,
"key_findings": key_findings_array, # 返回数组
"growth_trajectory": growth_trajectory,
"error": "数据库更新失败"
}
@@ -225,7 +346,10 @@ class UserMemoryService:
logger.error(f"调用分析函数生成记忆洞察时出错: {str(e)}")
return {
"success": False,
"report": None,
"memory_insight": None,
"behavior_pattern": None,
"key_findings": None,
"growth_trajectory": None,
"error": f"Neo4j或LLM服务不可用: {str(e)}"
}
@@ -233,14 +357,20 @@ class UserMemoryService:
logger.error(f"无效的 end_user_id 格式: {end_user_id}")
return {
"success": False,
"report": None,
"memory_insight": None,
"behavior_pattern": None,
"key_findings": None,
"growth_trajectory": None,
"error": "无效的用户ID格式"
}
except Exception as e:
logger.error(f"生成并缓存记忆洞察时出错: {str(e)}")
return {
"success": False,
"report": None,
"memory_insight": None,
"behavior_pattern": None,
"key_findings": None,
"growth_trajectory": None,
"error": str(e)
}
@@ -251,7 +381,7 @@ class UserMemoryService:
workspace_id: Optional[uuid.UUID] = None
) -> Dict[str, Any]:
"""
生成并缓存用户摘要
生成并缓存用户摘要(四个部分)
Args:
db: 数据库会话
@@ -261,7 +391,10 @@ class UserMemoryService:
Returns:
{
"success": bool,
"summary": str,
"user_summary": str,
"personality": str,
"core_values": str,
"one_sentence": str,
"error": Optional[str]
}
"""
@@ -277,38 +410,61 @@ class UserMemoryService:
logger.error(f"end_user_id {end_user_id} 不存在")
return {
"success": False,
"summary": None,
"user_summary": None,
"personality": None,
"core_values": None,
"one_sentence": None,
"error": "用户不存在"
}
# 使用 end_user_id 调用分析函数
try:
logger.info(f"使用 end_user_id={end_user_id} 生成用户摘要")
summary = await generate_user_summary(end_user_id)
result = await analytics_user_summary(end_user_id)
if not summary:
user_summary = result.get("user_summary", "")
personality = result.get("personality", "")
core_values = result.get("core_values", "")
one_sentence = result.get("one_sentence", "")
if not any([user_summary, personality, core_values, one_sentence]):
logger.warning(f"end_user_id {end_user_id} 的用户摘要生成结果为空")
return {
"success": False,
"summary": None,
"user_summary": None,
"personality": None,
"core_values": None,
"one_sentence": None,
"error": "生成的用户摘要为空,可能Neo4j中没有该用户的数据"
}
# 更新数据库缓存
success = repo.update_user_summary(user_uuid, summary)
success = repo.update_user_summary(
user_uuid,
user_summary,
personality,
core_values,
one_sentence
)
if success:
logger.info(f"成功为 end_user_id {end_user_id} 生成并缓存用户摘要")
return {
"success": True,
"summary": summary,
"user_summary": user_summary,
"personality": personality,
"core_values": core_values,
"one_sentence": one_sentence,
"error": None
}
else:
logger.error(f"更新 end_user_id {end_user_id} 的用户摘要缓存失败")
return {
"success": False,
"summary": summary,
"user_summary": user_summary,
"personality": personality,
"core_values": core_values,
"one_sentence": one_sentence,
"error": "数据库更新失败"
}
@@ -316,7 +472,10 @@ class UserMemoryService:
logger.error(f"调用分析函数生成用户摘要时出错: {str(e)}")
return {
"success": False,
"summary": None,
"user_summary": None,
"personality": None,
"core_values": None,
"one_sentence": None,
"error": f"Neo4j或LLM服务不可用: {str(e)}"
}
@@ -324,17 +483,24 @@ class UserMemoryService:
logger.error(f"无效的 end_user_id 格式: {end_user_id}")
return {
"success": False,
"summary": None,
"user_summary": None,
"personality": None,
"core_values": None,
"one_sentence": None,
"error": "无效的用户ID格式"
}
except Exception as e:
logger.error(f"生成并缓存用户摘要时出错: {str(e)}")
return {
"success": False,
"summary": None,
"user_summary": None,
"personality": None,
"core_values": None,
"one_sentence": None,
"error": str(e)
}
# for workspace
async def generate_cache_for_workspace(
self,
db: Session,
@@ -432,34 +598,212 @@ class UserMemoryService:
async def analytics_memory_insight_report(end_user_id: Optional[str] = None) -> Dict[str, Any]:
"""
生成记忆洞察报告
生成记忆洞察报告(四个维度)
这个函数包含完整的业务逻辑:
1. 使用 MemoryInsight 工具类获取基础数据(领域分布、活跃时段、社交关联)
2. 使用 Jinja2 模板渲染提示词
3. 调用 LLM 生成四个维度的自然语言报告
4. 解析并返回四个部分
Args:
end_user_id: 可选的终端用户ID
Returns:
包含报告的字典
包含四个维度报告的字典: {
"memory_insight": str, # 总体概述
"behavior_pattern": str, # 行为模式
"key_findings": List[str], # 关键发现(数组)
"growth_trajectory": str # 成长轨迹
}
"""
from app.core.memory.utils.prompt.prompt_utils import render_memory_insight_prompt
import re
insight = MemoryInsight(end_user_id)
report = await insight.generate_insight_report()
await insight.close()
data = {"report": report}
return data
try:
# 1. 并行获取三个维度的数据
import asyncio
domain_dist, active_periods, social_conn = await asyncio.gather(
insight.get_domain_distribution(),
insight.get_active_periods(),
insight.get_social_connections(),
)
# 2. 构建数据字符串
domain_distribution_str = None
if domain_dist:
top_domains = ", ".join([f"{k}({v:.0%})" for k, v in list(domain_dist.items())[:3]])
domain_distribution_str = f"用户的记忆主要集中在 {top_domains}"
active_periods_str = None
if active_periods:
months_str = "".join(map(str, active_periods))
active_periods_str = f"用户在每年的 {months_str} 月最为活跃"
social_connections_str = None
if social_conn:
social_connections_str = f"与用户\"{social_conn['user_id']}\"拥有最多共同记忆({social_conn['common_memories_count']}条),时间范围主要在 {social_conn['time_range']}"
# 3. 如果没有足够数据,返回默认消息
if not any([domain_distribution_str, active_periods_str, social_connections_str]):
return {
"memory_insight": "暂无足够数据生成洞察报告。",
"behavior_pattern": "",
"key_findings": "",
"growth_trajectory": ""
}
# 4. 使用 Jinja2 模板渲染提示词
user_prompt = await render_memory_insight_prompt(
domain_distribution=domain_distribution_str,
active_periods=active_periods_str,
social_connections=social_connections_str
)
messages = [
{"role": "user", "content": user_prompt}
]
# 5. 调用 LLM 生成报告
response = await insight.llm_client.chat(messages=messages)
# 6. 处理 LLM 响应,确保返回字符串类型
content = response.content
if isinstance(content, list):
if len(content) > 0:
if isinstance(content[0], dict):
text = content[0].get('text', content[0].get('content', str(content[0])))
full_response = str(text)
else:
full_response = str(content[0])
else:
full_response = ""
elif isinstance(content, dict):
full_response = str(content.get('text', content.get('content', str(content))))
else:
full_response = str(content) if content is not None else ""
# 7. 解析四个部分
# 使用正则表达式提取四个部分
memory_insight_match = re.search(r'【总体概述】\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
behavior_match = re.search(r'【行为模式】\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
findings_match = re.search(r'【关键发现】\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
trajectory_match = re.search(r'【成长轨迹】\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
memory_insight = memory_insight_match.group(1).strip() if memory_insight_match else ""
behavior_pattern = behavior_match.group(1).strip() if behavior_match else ""
key_findings_text = findings_match.group(1).strip() if findings_match else ""
growth_trajectory = trajectory_match.group(1).strip() if trajectory_match else ""
# 将 key_findings 从文本转换为数组
# 按 • 符号分割,并清理每个条目
key_findings_array = []
if key_findings_text:
# 分割并清理每个条目
items = [item.strip() for item in key_findings_text.split('') if item.strip()]
key_findings_array = items
return {
"memory_insight": memory_insight,
"behavior_pattern": behavior_pattern,
"key_findings": key_findings_array, # 返回数组而不是字符串
"growth_trajectory": growth_trajectory
}
finally:
# 确保关闭连接
await insight.close()
async def analytics_user_summary(end_user_id: Optional[str] = None) -> Dict[str, Any]:
"""
生成用户摘要
生成用户摘要(包含四个部分)
这个函数包含完整的业务逻辑:
1. 使用 UserSummary 工具类获取基础数据(实体、语句)
2. 使用 prompt_utils 渲染提示词
3. 调用 LLM 生成四部分内容:基本介绍、性格特点、核心价值观、一句话总结
Args:
end_user_id: 可选的终端用户ID
Returns:
包含摘要的字典
包含四部分摘要的字典: {
"user_summary": str,
"personality": str,
"core_values": str,
"one_sentence": str
}
"""
summary = await generate_user_summary(end_user_id)
data = {"summary": summary}
return data
from app.core.memory.analytics.user_summary import UserSummary
from app.core.memory.utils.prompt.prompt_utils import render_user_summary_prompt
import re
# 创建 UserSummary 实例
user_summary_tool = UserSummary(end_user_id or os.getenv("SELECTED_GROUP_ID", "group_123"))
try:
# 1) 收集上下文数据
entities = await user_summary_tool._get_top_entities(limit=40)
statements = await user_summary_tool._get_recent_statements(limit=100)
entity_lines = [f"{name} ({freq})" for name, freq in entities][:20]
statement_samples = [s.statement.strip() for s in statements if (s.statement or '').strip()][:20]
# 2) 使用 prompt_utils 渲染提示词
user_prompt = await render_user_summary_prompt(
user_id=user_summary_tool.user_id,
entities=", ".join(entity_lines) if entity_lines else "(空)",
statements=" | ".join(statement_samples) if statement_samples else "(空)"
)
messages = [
{"role": "user", "content": user_prompt},
]
# 3) 调用 LLM 生成摘要
response = await user_summary_tool.llm.chat(messages=messages)
# 4) 处理 LLM 响应,确保返回字符串类型
content = response.content
if isinstance(content, list):
if len(content) > 0:
if isinstance(content[0], dict):
text = content[0].get('text', content[0].get('content', str(content[0])))
full_response = str(text)
else:
full_response = str(content[0])
else:
full_response = ""
elif isinstance(content, dict):
full_response = str(content.get('text', content.get('content', str(content))))
else:
full_response = str(content) if content is not None else ""
# 5) 解析四个部分
# 使用正则表达式提取四个部分
user_summary_match = re.search(r'【基本介绍】\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
personality_match = re.search(r'【性格特点】\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
core_values_match = re.search(r'【核心价值观】\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
one_sentence_match = re.search(r'【一句话总结】\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL)
user_summary = user_summary_match.group(1).strip() if user_summary_match else ""
personality = personality_match.group(1).strip() if personality_match else ""
core_values = core_values_match.group(1).strip() if core_values_match else ""
one_sentence = one_sentence_match.group(1).strip() if one_sentence_match else ""
return {
"user_summary": user_summary,
"personality": personality,
"core_values": core_values,
"one_sentence": one_sentence
}
finally:
# 确保关闭连接
await user_summary_tool.close()
async def analytics_node_statistics(