Refactor/memory statistics (#99)

* [refactor]Reconstructing forgotten, emotional, situational, and explicit memory statistics

* [refactor]Reconstructing forgotten, emotional, situational, and explicit memory statistics

* [changes]Improve the code based on AI review
This commit is contained in:
乐力齐
2026-01-13 20:27:27 +08:00
committed by GitHub
parent 70cbda27eb
commit b71f67f7df
7 changed files with 371 additions and 26 deletions

View File

@@ -30,7 +30,7 @@ from sqlalchemy.orm import Session
api_logger = get_api_logger()
router = APIRouter(
prefix="/memory/emotion",
prefix="/memory/emotion-memory",
tags=["Emotion Analysis"],
dependencies=[Depends(get_current_user)] # 所有路由都需要认证
)

View File

@@ -39,7 +39,7 @@ from app.services.memory_forget_service import MemoryForgetService
api_logger = get_api_logger()
router = APIRouter(
prefix="/memory/forget",
prefix="/memory/forget-memory",
tags=["Memory Forgetting Engine"],
dependencies=[Depends(get_current_user)] # 所有路由都需要认证
)

View File

@@ -1,6 +1,7 @@
import asyncio
import json
from datetime import datetime
from typing import List, Optional
from typing import List, Optional, Tuple
from uuid import uuid4
from app.core.logging_config import get_memory_logger
@@ -28,6 +29,118 @@ class MemorySummaryResponse(RobustLLMResponse):
)
async def generate_title_and_type_for_summary(
content: str,
llm_client
) -> Tuple[str, str]:
"""
为MemorySummary生成标题和类型
此方法应该在创建MemorySummary节点时调用生成title和type
Args:
content: Summary的内容文本
llm_client: LLM客户端实例
Returns:
(标题, 类型)元组
"""
from app.core.memory.utils.prompt.prompt_utils import render_episodic_title_and_type_prompt
# 定义有效的类型集合
VALID_TYPES = {
"conversation", # 对话
"project_work", # 项目/工作
"learning", # 学习
"decision", # 决策
"important_event" # 重要事件
}
DEFAULT_TYPE = "conversation" # 默认类型
try:
if not content:
logger.warning("content为空无法生成标题和类型")
return ("空内容", DEFAULT_TYPE)
# 1. 渲染Jinja2提示词模板
prompt = await render_episodic_title_and_type_prompt(content)
# 2. 调用LLM生成标题和类型
messages = [
{"role": "user", "content": prompt}
]
response = await llm_client.chat(messages=messages)
# 3. 解析LLM响应
content_response = response.content
if isinstance(content_response, list):
if len(content_response) > 0:
if isinstance(content_response[0], dict):
text = content_response[0].get('text', content_response[0].get('content', str(content_response[0])))
full_response = str(text)
else:
full_response = str(content_response[0])
else:
full_response = ""
elif isinstance(content_response, dict):
full_response = str(content_response.get('text', content_response.get('content', str(content_response))))
else:
full_response = str(content_response) if content_response is not None else ""
# 4. 解析JSON响应
try:
# 尝试从响应中提取JSON
# 移除可能的markdown代码块标记
json_str = full_response.strip()
if json_str.startswith("```json"):
json_str = json_str[7:]
if json_str.startswith("```"):
json_str = json_str[3:]
if json_str.endswith("```"):
json_str = json_str[:-3]
json_str = json_str.strip()
result_data = json.loads(json_str)
title = result_data.get("title", "未知标题")
episodic_type_raw = result_data.get("type", DEFAULT_TYPE)
# 5. 校验和归一化类型
# 将类型转换为小写并去除空格
episodic_type_normalized = str(episodic_type_raw).lower().strip()
# 检查是否在有效类型集合中
if episodic_type_normalized in VALID_TYPES:
episodic_type = episodic_type_normalized
else:
# 尝试映射常见的中文类型到英文
type_mapping = {
"对话": "conversation",
"项目": "project_work",
"工作": "project_work",
"项目/工作": "project_work",
"学习": "learning",
"决策": "decision",
"重要事件": "important_event",
"事件": "important_event"
}
episodic_type = type_mapping.get(episodic_type_raw, DEFAULT_TYPE)
logger.warning(
f"LLM返回的类型 '{episodic_type_raw}' 不在有效集合中,"
f"已归一化为 '{episodic_type}'"
)
logger.info(f"成功生成标题和类型: title={title}, type={episodic_type}")
return (title, episodic_type)
except json.JSONDecodeError:
logger.error(f"无法解析LLM响应为JSON: {full_response}")
return ("解析失败", DEFAULT_TYPE)
except Exception as e:
logger.error(f"生成标题和类型时出错: {str(e)}", exc_info=True)
return ("错误", DEFAULT_TYPE)
async def _process_chunk_summary(
dialog: DialogData,
chunk,
@@ -63,10 +176,9 @@ async def _process_chunk_summary(
title = None
episodic_type = None
try:
from app.services.user_memory_service import UserMemoryService
title, episodic_type = await UserMemoryService.generate_title_and_type_for_summary(
title, episodic_type = await generate_title_and_type_for_summary(
content=summary_text,
end_user_id=dialog.group_id
llm_client=llm_client
)
logger.info(f"Generated title and type for MemorySummary: title={title}, type={episodic_type}")
except Exception as e:

View File

@@ -260,17 +260,32 @@ class ForgettingStrategy:
)
# 生成标题和类型使用LLM
from app.services.user_memory_service import UserMemoryService
from app.core.memory.storage_services.extraction_engine.knowledge_extraction.memory_summary import generate_title_and_type_for_summary
# 获取 LLM 客户端
llm_client = None
if config_id is not None and db is not None:
try:
llm_client = await self._get_llm_client(db, config_id)
except Exception as e:
logger.warning(f"获取 LLM 客户端失败: {str(e)}")
# 生成标题和类型
try:
title, episodic_type = await UserMemoryService.generate_title_and_type_for_summary(
content=summary_text,
end_user_id=group_id
)
logger.info(f"成功为MemorySummary生成标题和类型: title={title}, type={episodic_type}")
if llm_client is not None:
title, episodic_type = await generate_title_and_type_for_summary(
content=summary_text,
llm_client=llm_client
)
logger.info(f"成功为MemorySummary生成标题和类型: title={title}, type={episodic_type}")
else:
logger.warning("LLM 客户端不可用,使用默认标题和类型")
title = "未命名"
episodic_type = "conversation"
except Exception as e:
logger.error(f"生成标题和类型失败,使用默认值: {str(e)}")
title = "未命名"
episodic_type = "其他"
episodic_type = "conversation"
# 计算继承的激活值和重要性(取较高值)
inherited_activation = max(statement_activation, entity_activation)

View File

@@ -9,6 +9,7 @@ from typing import Optional
from app.core.logging_config import get_logger
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.services.emotion_analytics_service import EmotionAnalyticsService
logger = get_logger(__name__)
@@ -109,3 +110,188 @@ class MemoryBaseService:
except Exception as e:
logger.error(f"提取情景记忆情绪时出错: {str(e)}", exc_info=True)
return None
async def get_episodic_memory_count(
self,
end_user_id: Optional[str] = None
) -> int:
"""
获取情景记忆数量
查询 MemorySummary 节点的数量。
Args:
end_user_id: 可选的终端用户ID用于过滤特定用户的节点
Returns:
情景记忆的数量
"""
try:
if end_user_id:
query = """
MATCH (n:MemorySummary)
WHERE n.group_id = $group_id
RETURN count(n) as count
"""
result = await self.neo4j_connector.execute_query(query, group_id=end_user_id)
else:
query = """
MATCH (n:MemorySummary)
RETURN count(n) as count
"""
result = await self.neo4j_connector.execute_query(query)
count = result[0]["count"] if result and len(result) > 0 else 0
logger.debug(f"情景记忆数量: {count} (end_user_id={end_user_id})")
return count
except Exception as e:
logger.error(f"获取情景记忆数量时出错: {str(e)}", exc_info=True)
return 0
async def get_explicit_memory_count(
self,
end_user_id: Optional[str] = None
) -> int:
"""
获取显性记忆数量
显性记忆 = 情景记忆MemorySummary+ 语义记忆ExtractedEntity with is_explicit_memory=true
Args:
end_user_id: 可选的终端用户ID用于过滤特定用户的节点
Returns:
显性记忆的数量
"""
try:
# 1. 获取情景记忆数量
episodic_count = await self.get_episodic_memory_count(end_user_id)
# 2. 获取语义记忆数量ExtractedEntity 且 is_explicit_memory = true
if end_user_id:
semantic_query = """
MATCH (e:ExtractedEntity)
WHERE e.group_id = $group_id AND e.is_explicit_memory = true
RETURN count(e) as count
"""
semantic_result = await self.neo4j_connector.execute_query(
semantic_query,
group_id=end_user_id
)
else:
semantic_query = """
MATCH (e:ExtractedEntity)
WHERE e.is_explicit_memory = true
RETURN count(e) as count
"""
semantic_result = await self.neo4j_connector.execute_query(semantic_query)
semantic_count = semantic_result[0]["count"] if semantic_result and len(semantic_result) > 0 else 0
# 3. 计算总数
explicit_count = episodic_count + semantic_count
logger.debug(
f"显性记忆数量: {explicit_count} "
f"(情景={episodic_count}, 语义={semantic_count}, end_user_id={end_user_id})"
)
return explicit_count
except Exception as e:
logger.error(f"获取显性记忆数量时出错: {str(e)}", exc_info=True)
return 0
async def get_emotional_memory_count(
self,
end_user_id: Optional[str] = None,
statement_count_fallback: int = 0
) -> int:
"""
获取情绪记忆数量
通过 EmotionAnalyticsService 获取情绪标签统计总数。
如果获取失败或没有指定 end_user_id使用 statement_count_fallback 作为后备。
Args:
end_user_id: 可选的终端用户ID
statement_count_fallback: 后备方案的数量(通常是 statement 节点数量)
Returns:
情绪记忆的数量
"""
try:
if end_user_id:
emotion_service = EmotionAnalyticsService()
emotion_data = await emotion_service.get_emotion_tags(
end_user_id=end_user_id,
emotion_type=None,
start_date=None,
end_date=None,
limit=10
)
emotion_count = emotion_data.get("total_count", 0)
logger.debug(f"情绪记忆数量: {emotion_count} (end_user_id={end_user_id})")
return emotion_count
else:
# 如果没有指定 end_user_id使用后备方案
logger.debug(f"情绪记忆数量: {statement_count_fallback} (使用后备方案)")
return statement_count_fallback
except Exception as e:
logger.warning(f"获取情绪记忆数量失败,使用后备方案: {str(e)}")
return statement_count_fallback
async def get_forget_memory_count(
self,
end_user_id: Optional[str] = None,
forgetting_threshold: float = 0.3
) -> int:
"""
获取遗忘记忆数量
统计激活值低于遗忘阈值的节点数量low_activation_nodes
查询范围包括Statement、ExtractedEntity、MemorySummary、Chunk 节点。
Args:
end_user_id: 可选的终端用户ID用于过滤特定用户的节点
forgetting_threshold: 遗忘阈值,默认 0.3
Returns:
遗忘记忆的数量(激活值低于阈值的节点数)
"""
try:
# 构建查询语句
query = """
MATCH (n)
WHERE (n:Statement OR n:ExtractedEntity OR n:MemorySummary OR n:Chunk)
"""
if end_user_id:
query += " AND n.group_id = $group_id"
query += """
RETURN sum(CASE WHEN n.activation_value IS NOT NULL AND n.activation_value < $threshold THEN 1 ELSE 0 END) as low_activation_nodes
"""
# 设置查询参数
params = {'threshold': forgetting_threshold}
if end_user_id:
params['group_id'] = end_user_id
# 执行查询
result = await self.neo4j_connector.execute_query(query, **params)
# 提取结果
forget_count = result[0]['low_activation_nodes'] if result and len(result) > 0 else 0
forget_count = forget_count or 0 # 处理 None 值
logger.debug(
f"遗忘记忆数量: {forget_count} "
f"(threshold={forgetting_threshold}, end_user_id={end_user_id})"
)
return forget_count
except Exception as e:
logger.error(f"获取遗忘记忆数量时出错: {str(e)}", exc_info=True)
return 0

View File

@@ -401,5 +401,5 @@ class MemoryEpisodicService(MemoryBaseService):
raise
# 创建全局服务实例
# 创建全局服务实例(供控制器层使用)
memory_episodic_service = MemoryEpisodicService()

View File

@@ -15,6 +15,7 @@ from app.core.memory.utils.llm.llm_utils import MemoryClientFactory
from app.db import get_db_context
from app.repositories.end_user_repository import EndUserRepository
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.services.memory_base_service import MemoryBaseService
from app.services.memory_config_service import MemoryConfigService
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
@@ -1195,17 +1196,18 @@ async def analytics_memory_types(
end_user_id: Optional[str] = None
) -> List[Dict[str, Any]]:
"""
统计8种记忆类型的数量和百分比
统计9种记忆类型的数量和百分比
计算规则:
1. 感知记忆 (PERCEPTUAL_MEMORY) = statement + entity
2. 工作记忆 (WORKING_MEMORY) = chunk + entity
3. 短期记忆 (SHORT_TERM_MEMORY) = chunk
4. 长期记忆 (LONG_TERM_MEMORY) = entity
5. 显性记忆 (EXPLICIT_MEMORY) = 1/2 * entity
5. 显性记忆 (EXPLICIT_MEMORY) = 情景记忆 + 语义记忆(通过 MemoryBaseService.get_explicit_memory_count 获取)
6. 隐性记忆 (IMPLICIT_MEMORY) = 1/3 * entity
7. 情绪记忆 (EMOTIONAL_MEMORY) = statement
8. 情景记忆 (EPISODIC_MEMORY) = memory_summary
7. 情绪记忆 (EMOTIONAL_MEMORY) = 情绪标签统计总数(通过 MemoryBaseService.get_emotional_memory_count 获取)
8. 情景记忆 (EPISODIC_MEMORY) = memory_summary(通过 MemoryBaseService.get_episodic_memory_count 获取)
9. 遗忘记忆 (FORGET_MEMORY) = 激活值低于阈值的节点数(通过 MemoryBaseService.get_forget_memory_count 获取)
Args:
db: 数据库会话
@@ -1230,13 +1232,16 @@ async def analytics_memory_types(
- IMPLICIT_MEMORY: 隐性记忆
- EMOTIONAL_MEMORY: 情绪记忆
- EPISODIC_MEMORY: 情景记忆
- FORGET_MEMORY: 遗忘记忆
"""
# 定义需要查询的节点类型
# 初始化基础服务
base_service = MemoryBaseService()
# 定义需要查询的基础节点类型
node_types = {
"Statement": "Statement",
"Entity": "ExtractedEntity",
"Chunk": "Chunk",
"MemorySummary": "MemorySummary"
"Chunk": "Chunk"
}
# 存储每种节点类型的计数
@@ -1266,18 +1271,45 @@ async def analytics_memory_types(
statement_count = node_counts.get("Statement", 0)
entity_count = node_counts.get("Entity", 0)
chunk_count = node_counts.get("Chunk", 0)
memory_summary_count = node_counts.get("MemorySummary", 0)
# 按规则计算8种记忆类型的数量使用英文枚举作为key
# 获取用户的遗忘阈值配置
forgetting_threshold = 0.3 # 默认值
if end_user_id:
try:
from app.services.memory_agent_service import get_end_user_connected_config
from app.core.memory.storage_services.forgetting_engine.config_utils import load_actr_config_from_db
# 获取用户关联的 config_id
connected_config = get_end_user_connected_config(end_user_id, db)
config_id = connected_config.get('memory_config_id')
if config_id:
# 从数据库加载配置
config = load_actr_config_from_db(db, config_id)
forgetting_threshold = config.get('forgetting_threshold', 0.3)
logger.debug(f"使用用户配置的遗忘阈值: {forgetting_threshold} (end_user_id={end_user_id}, config_id={config_id})")
else:
logger.debug(f"用户未关联配置,使用默认遗忘阈值: {forgetting_threshold} (end_user_id={end_user_id})")
except Exception as e:
logger.warning(f"获取用户遗忘阈值配置失败,使用默认值 {forgetting_threshold}: {str(e)}")
# 使用 MemoryBaseService 的共享方法获取特殊记忆类型的数量
episodic_count = await base_service.get_episodic_memory_count(end_user_id)
explicit_count = await base_service.get_explicit_memory_count(end_user_id)
emotion_count = await base_service.get_emotional_memory_count(end_user_id, statement_count)
forget_count = await base_service.get_forget_memory_count(end_user_id, forgetting_threshold)
# 按规则计算9种记忆类型的数量使用英文枚举作为key
memory_counts = {
"PERCEPTUAL_MEMORY": statement_count + entity_count, # 感知记忆
"WORKING_MEMORY": chunk_count + entity_count, # 工作记忆
"SHORT_TERM_MEMORY": chunk_count, # 短期记忆
"LONG_TERM_MEMORY": entity_count, # 长期记忆
"EXPLICIT_MEMORY": entity_count // 2, # 显性记忆 (1/2 entity)
"EXPLICIT_MEMORY": explicit_count, # 显性记忆(情景记忆 + 语义记忆)
"IMPLICIT_MEMORY": entity_count // 3, # 隐性记忆 (1/3 entity)
"EMOTIONAL_MEMORY": statement_count, # 情绪记忆
"EPISODIC_MEMORY": memory_summary_count # 情景记忆
"EMOTIONAL_MEMORY": emotion_count, # 情绪记忆(使用情绪标签统计)
"EPISODIC_MEMORY": episodic_count, # 情景记忆
"FORGET_MEMORY": forget_count # 遗忘记忆(激活值低于阈值)
}
# 计算总数