Merge branch 'develop' into fix/workflow

This commit is contained in:
Eternity
2026-01-14 10:58:13 +08:00
committed by GitHub
77 changed files with 3193 additions and 1889 deletions

View File

@@ -55,7 +55,7 @@ def create_long_term_memory_tool(memory_config: Dict[str, Any], end_user_id: str
长期记忆工具
"""
# search_switch = memory_config.get("search_switch", "2")
config_id= memory_config.get("memory_content",'17')
config_id= memory_config.get("memory_content",None)
logger.info(f"创建长期记忆工具,配置: end_user_id={end_user_id}, config_id={config_id}, storage_type={storage_type}")
@tool(args_schema=LongTermMemoryInput)
def long_term_memory(question: str) -> str:
@@ -94,7 +94,7 @@ def create_long_term_memory_tool(memory_config: Dict[str, Any], end_user_id: str
group_id=end_user_id,
message=question,
history=[],
search_switch="1",
search_switch="2",
config_id=config_id,
db=db,
storage_type=storage_type,

View File

@@ -1,6 +1,11 @@
import json
from pathlib import Path
from datetime import datetime, timedelta
from fastapi import HTTPException
from sqlalchemy.orm import Session
from uuid import UUID
from typing import Dict, Any
from app.repositories.home_page_repository import HomePageRepository
from app.schemas.home_page_schema import HomeStatistics, WorkspaceInfo
@@ -68,4 +73,69 @@ class HomePageService:
)
workspace_list.append(workspace_info)
return workspace_list
return workspace_list
@staticmethod
def load_version_introduction(version: str) -> Dict[str, Any]:
"""
从 JSON 文件加载对应版本的介绍
:param version: 系统版本号(如 "0.2.0"
:return: 对应版本的详细介绍
"""
# 1. 定义 JSON 文件路径(使用 Path 处理跨平台路径问题)
json_file_path = Path(__file__).parent.parent / "version_info.json"
# 转换为绝对路径,便于调试
json_abs_path = json_file_path.resolve()
try:
# 2. 读取 JSON 文件
if not json_abs_path.exists():
return {
"message": f"版本介绍文件不存在:{json_abs_path}",
"codeName": "",
"releaseDate": "",
"upgradePosition": "",
"coreUpgrades": []
}
with open(json_abs_path, "r", encoding="utf-8") as f:
changelogs = json.load(f)
# 3. 匹配对应版本的介绍,若版本不存在返回默认提示
if version not in changelogs:
return {
"message": f"暂未查询到 {version} 版本的详细介绍",
"codeName": "",
"releaseDate": "",
"upgradePosition": "",
"coreUpgrades": []
}
return changelogs[version]
except FileNotFoundError as e:
# 处理文件不存在异常
return {
"message": f"系统内部错误:{str(e)}",
"codeName": "",
"releaseDate": "",
"upgradePosition": "",
"coreUpgrades": []
}
except json.JSONDecodeError:
# 处理 JSON 格式错误
return {
"message": "版本介绍文件格式错误,无法解析 JSON",
"codeName": "",
"releaseDate": "",
"upgradePosition": "",
"coreUpgrades": []
}
except Exception as e:
# 处理其他未知异常
return {
"message": f"加载版本介绍失败:{str(e)}",
"codeName": "",
"releaseDate": "",
"upgradePosition": "",
"coreUpgrades": []
}

View File

@@ -4,7 +4,6 @@ Memory Agent Service
Handles business logic for memory agent operations including read/write services,
health checks, and message type classification.
"""
import datetime
import json
import os
import re
@@ -27,7 +26,7 @@ from app.db import get_db_context
from app.models.knowledge_model import Knowledge, KnowledgeType
from app.repositories.memory_short_repository import ShortTermMemoryRepository
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.schemas.memory_config_schema import ConfigurationError, MemoryConfig
from app.schemas.memory_config_schema import ConfigurationError
from app.services.memory_config_service import MemoryConfigService
from app.services.memory_konwledges_server import (
write_rag,
@@ -610,7 +609,7 @@ class MemoryAgentService:
reranked_results=raw_results.get('reranked_results',[])
try:
statements=[statement['statement'] for statement in reranked_results.get('statements', [])]
except Exception as e:
except Exception:
statements=[]
statements=list(set(statements))
retrieved_content.append({query:statements})
@@ -832,7 +831,6 @@ class MemoryAgentService:
# 获取当前空间下的所有宿主
from app.repositories import app_repository, end_user_repository
from app.schemas.app_schema import App as AppSchema
from app.schemas.end_user_schema import EndUser as EndUserSchema
# 查询应用并转换为 Pydantic 模型
apps_orm = app_repository.get_apps_by_workspace_id(db, current_workspace_id)
@@ -1175,19 +1173,21 @@ def get_end_user_connected_config(end_user_id: str, db: Session) -> Dict[str, An
1. 根据 end_user_id 获取用户的 app_id
2. 获取该应用的最新发布版本
3. 从发布版本的 config 字段中提取 memory_config_id
4. 根据 memory_config_id 查询配置名称
Args:
end_user_id: 终端用户ID
db: 数据库会话
Returns:
包含 memory_config_id 和相关信息的字典
包含 memory_config_id、config_name 和相关信息的字典
Raises:
ValueError: 当终端用户不存在或应用未发布时
"""
from app.models.app_release_model import AppRelease
from app.models.end_user_model import EndUser
from app.models.data_config_model import DataConfig
from sqlalchemy import select
logger.info(f"Getting connected config for end_user: {end_user_id}")
@@ -1220,13 +1220,158 @@ def get_end_user_connected_config(end_user_id: str, db: Session) -> Dict[str, An
memory_obj = config.get('memory', {})
memory_config_id = memory_obj.get('memory_content') if isinstance(memory_obj, dict) else None
# 4. 根据 memory_config_id 查询配置名称
config_name = None
if memory_config_id:
try:
# memory_config_id 可能是整数或字符串,需要转换
config_id = int(memory_config_id) if isinstance(memory_config_id, str) else memory_config_id
data_config = db.query(DataConfig).filter(DataConfig.config_id == config_id).first()
if data_config:
config_name = data_config.config_name
logger.debug(f"Found config_name: {config_name} for config_id: {config_id}")
else:
logger.warning(f"DataConfig not found for config_id: {config_id}")
except (ValueError, TypeError) as e:
logger.warning(f"Invalid memory_config_id format: {memory_config_id}, error: {str(e)}")
result = {
"end_user_id": str(end_user_id),
"app_id": str(app_id),
"release_id": str(latest_release.id),
"release_version": latest_release.version,
"memory_config_id": memory_config_id
"memory_config_id": memory_config_id,
"memory_config_name": config_name
}
logger.info(f"Successfully retrieved connected config: memory_config_id={memory_config_id}")
logger.info(f"Successfully retrieved connected config: memory_config_id={memory_config_id}, config_name={config_name}")
return result
def _memory_config_id_of(release) -> Any:
    """Return the raw ``memory.memory_content`` value of a release config, or None."""
    config = release.config or {}
    memory_obj = config.get('memory', {})
    return memory_obj.get('memory_content') if isinstance(memory_obj, dict) else None


def _as_config_key(memory_config_id) -> Any:
    """Turn a raw memory_config_id into the key used for the DataConfig lookup.

    Strings are converted with ``int()``; other truthy values are passed
    through unchanged. Returns None for empty values and for strings that
    are not valid integers.
    """
    if not memory_config_id:
        return None
    if isinstance(memory_config_id, str):
        try:
            return int(memory_config_id)
        except ValueError:
            return None
    return memory_config_id


def get_end_users_connected_configs_batch(end_user_ids: List[str], db: Session) -> Dict[str, Dict[str, Any]]:
    """
    Fetch the memory configuration linked to each of several end users.

    Uses three bulk queries instead of one set of queries per user:
    1. all requested EndUser rows (for their app_id)
    2. all active AppRelease rows of those apps
    3. all DataConfig rows referenced by the chosen releases

    Args:
        end_user_ids: end-user IDs to look up
        db: database session

    Returns:
        Dict keyed by end_user_id (as a string). Each value holds
        ``end_user_id``, ``memory_config_id`` and ``memory_config_name``;
        users that do not exist or whose app has no active release
        additionally carry an ``error`` field.
    """
    from app.models.app_release_model import AppRelease
    from app.models.data_config_model import DataConfig
    from app.models.end_user_model import EndUser

    logger.info(f"Batch getting connected configs for {len(end_user_ids)} end users")

    result: Dict[str, Dict[str, Any]] = {}

    # Nothing requested: skip the database entirely (avoids an empty IN () query).
    if not end_user_ids:
        logger.warning("No valid end users found")
        return result

    # 1. Load every requested end user in one query.
    end_users = db.query(EndUser).filter(EndUser.id.in_(end_user_ids)).all()
    known_ids = {str(user.id) for user in end_users}

    # Report the IDs that matched no row. IDs are compared as strings because
    # the found users are keyed by str(user.id); comparing raw values would
    # flag every non-string ID (e.g. UUID objects) as missing.
    for raw_id in end_user_ids:
        user_id = str(raw_id)
        if user_id not in known_ids:
            result[user_id] = {
                "end_user_id": user_id,
                "memory_config_id": None,
                "memory_config_name": None,
                "error": f"终端用户不存在: {user_id}"
            }

    if not end_users:
        logger.warning("No valid end users found")
        return result

    # 2. Load the active releases of all involved apps (each app_id sent once).
    app_ids = list(dict.fromkeys(user.app_id for user in end_users))
    # Multiple criteria passed to filter() are joined with AND.
    # NOTE(review): "latest" is decided by ordering on AppRelease.version
    # descending — confirm that column sorts correctly for multi-digit versions.
    releases = (
        db.query(AppRelease)
        .filter(AppRelease.app_id.in_(app_ids), AppRelease.is_active.is_(True))
        .order_by(AppRelease.app_id, AppRelease.version.desc())
        .all()
    )

    # Rows arrive newest-first per app, so the first one seen per app wins.
    latest_release_by_app: Dict[str, Any] = {}
    for release in releases:
        latest_release_by_app.setdefault(str(release.app_id), release)

    # 3. Collect the config IDs referenced by the chosen releases.
    raw_config_id_by_app = {
        app_id: _memory_config_id_of(release)
        for app_id, release in latest_release_by_app.items()
    }
    lookup_keys = [
        key
        for key in map(_as_config_key, raw_config_id_by_app.values())
        if key is not None
    ]

    # 4. Resolve all config names in one query.
    config_name_map = {}
    if lookup_keys:
        data_configs = db.query(DataConfig).filter(
            DataConfig.config_id.in_(lookup_keys)
        ).all()
        config_name_map = {config.config_id: config.config_name for config in data_configs}

    # 5. Assemble one entry per existing user.
    for user in end_users:
        user_id = str(user.id)
        app_id = str(user.app_id)

        if app_id not in latest_release_by_app:
            result[user_id] = {
                "end_user_id": user_id,
                "memory_config_id": None,
                "memory_config_name": None,
                "error": f"应用未发布: {app_id}"
            }
            continue

        memory_config_id = raw_config_id_by_app[app_id]
        lookup_key = _as_config_key(memory_config_id)
        config_name = config_name_map.get(lookup_key) if lookup_key is not None else None

        # The raw (unconverted) memory_config_id is returned, as stored in the release.
        result[user_id] = {
            "end_user_id": user_id,
            "memory_config_id": memory_config_id,
            "memory_config_name": config_name
        }

    logger.info(f"Successfully retrieved batch configs: total={len(result)}, with_config={sum(1 for v in result.values() if v.get('memory_config_id'))}")

    return result

View File

@@ -0,0 +1,297 @@
"""
Memory Base Service
提供记忆服务的基础功能和共享辅助方法。
"""
from datetime import datetime
from typing import Optional
from app.core.logging_config import get_logger
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.services.emotion_analytics_service import EmotionAnalyticsService
logger = get_logger(__name__)
class MemoryBaseService:
    """Base class for the memory services; owns the Neo4j connector and shared helpers."""

    def __init__(self):
        self.neo4j_connector = Neo4jConnector()

    @staticmethod
    def parse_timestamp(timestamp_value) -> Optional[int]:
        """
        Convert a timestamp into epoch milliseconds.

        Accepted inputs:
        - objects exposing ``to_native()`` (e.g. a Neo4j DateTime)
        - Python ``datetime`` objects
        - ISO-format strings (a "Z" suffix is rewritten to "+00:00")
        - any other value is stringified and parsed as an ISO-format string

        Args:
            timestamp_value: the value to convert

        Returns:
            Milliseconds since the epoch, or None when the value is empty or
            cannot be parsed.
        """
        if not timestamp_value:
            return None

        try:
            if hasattr(timestamp_value, 'to_native'):
                moment = timestamp_value.to_native()
            elif isinstance(timestamp_value, datetime):
                moment = timestamp_value
            else:
                # Strings and everything else share the same ISO parsing path.
                moment = datetime.fromisoformat(str(timestamp_value).replace("Z", "+00:00"))

            # NOTE: datetime.timestamp() interprets a naive datetime as local time.
            return int(moment.timestamp() * 1000)
        except (ValueError, TypeError, AttributeError) as e:
            logger.warning(f"无法解析时间戳: {timestamp_value}, error={str(e)}")
            return None

    async def _fetch_count(self, query: str, **params) -> int:
        """Run a Cypher query returning a single ``count`` column; 0 when no row comes back."""
        rows = await self.neo4j_connector.execute_query(query, **params)
        return rows[0]["count"] if rows else 0

    async def extract_episodic_emotion(
        self,
        summary_id: str,
        end_user_id: str
    ) -> Optional[str]:
        """
        Return the dominant emotion of an episodic memory.

        Looks at the Statement nodes the MemorySummary node is derived from and
        returns the emotion_type of the one with the highest emotion_intensity.

        Args:
            summary_id: element ID of the MemorySummary node
            end_user_id: end-user ID (stored as group_id on the node)

        Returns:
            The emotion_type with the highest emotion_intensity, or None when
            there is none or the query fails.
        """
        try:
            query = """
            MATCH (s:MemorySummary)
            WHERE elementId(s) = $summary_id AND s.group_id = $group_id
            MATCH (s)-[:DERIVED_FROM_STATEMENT]->(stmt:Statement)
            WHERE stmt.emotion_type IS NOT NULL
            AND stmt.emotion_intensity IS NOT NULL
            RETURN stmt.emotion_type AS emotion_type,
                   stmt.emotion_intensity AS emotion_intensity
            ORDER BY emotion_intensity DESC
            LIMIT 1
            """
            result = await self.neo4j_connector.execute_query(
                query,
                summary_id=summary_id,
                group_id=end_user_id
            )

            if not result:
                logger.info(f"summary_id={summary_id} 没有情绪信息")
                return None

            emotion_type = result[0].get("emotion_type")
            logger.info(f"成功提取 summary_id={summary_id} 的情绪: {emotion_type}")
            return emotion_type
        except Exception as e:
            # Best-effort lookup: a failure is logged and reported as "no emotion".
            logger.error(f"提取情景记忆情绪时出错: {str(e)}", exc_info=True)
            return None

    async def get_episodic_memory_count(
        self,
        end_user_id: Optional[str] = None
    ) -> int:
        """
        Count the episodic memories (MemorySummary nodes).

        Args:
            end_user_id: optional end-user ID; when given, only that user's
                nodes are counted

        Returns:
            Number of episodic memories; 0 when the query fails.
        """
        try:
            if end_user_id:
                count = await self._fetch_count(
                    """
                    MATCH (n:MemorySummary)
                    WHERE n.group_id = $group_id
                    RETURN count(n) as count
                    """,
                    group_id=end_user_id
                )
            else:
                count = await self._fetch_count(
                    """
                    MATCH (n:MemorySummary)
                    RETURN count(n) as count
                    """
                )

            logger.debug(f"情景记忆数量: {count} (end_user_id={end_user_id})")
            return count
        except Exception as e:
            logger.error(f"获取情景记忆数量时出错: {str(e)}", exc_info=True)
            return 0

    async def get_explicit_memory_count(
        self,
        end_user_id: Optional[str] = None
    ) -> int:
        """
        Count the explicit memories.

        Explicit memory = episodic memory (MemorySummary nodes) + semantic
        memory (ExtractedEntity nodes with is_explicit_memory = true).

        Args:
            end_user_id: optional end-user ID; when given, only that user's
                nodes are counted

        Returns:
            Number of explicit memories; 0 when the semantic query fails.
        """
        try:
            # 1. Episodic part (this call reports its own failures as 0).
            episodic_count = await self.get_episodic_memory_count(end_user_id)

            # 2. Semantic part.
            if end_user_id:
                semantic_count = await self._fetch_count(
                    """
                    MATCH (e:ExtractedEntity)
                    WHERE e.group_id = $group_id AND e.is_explicit_memory = true
                    RETURN count(e) as count
                    """,
                    group_id=end_user_id
                )
            else:
                semantic_count = await self._fetch_count(
                    """
                    MATCH (e:ExtractedEntity)
                    WHERE e.is_explicit_memory = true
                    RETURN count(e) as count
                    """
                )

            # 3. Total.
            explicit_count = episodic_count + semantic_count
            logger.debug(
                f"显性记忆数量: {explicit_count} "
                f"(情景={episodic_count}, 语义={semantic_count}, end_user_id={end_user_id})"
            )
            return explicit_count
        except Exception as e:
            logger.error(f"获取显性记忆数量时出错: {str(e)}", exc_info=True)
            return 0

    async def get_emotional_memory_count(
        self,
        end_user_id: Optional[str] = None,
        statement_count_fallback: int = 0
    ) -> int:
        """
        Count the emotional memories.

        Reads ``total_count`` from EmotionAnalyticsService.get_emotion_tags.
        When no end_user_id is given, or the lookup fails,
        ``statement_count_fallback`` is returned instead.

        Args:
            end_user_id: optional end-user ID
            statement_count_fallback: value returned when the real count is
                unavailable (typically the number of statement nodes)

        Returns:
            Number of emotional memories.
        """
        try:
            if not end_user_id:
                logger.debug(f"情绪记忆数量: {statement_count_fallback} (使用后备方案)")
                return statement_count_fallback

            emotion_data = await EmotionAnalyticsService().get_emotion_tags(
                end_user_id=end_user_id,
                emotion_type=None,
                start_date=None,
                end_date=None,
                limit=10
            )
            emotion_count = emotion_data.get("total_count", 0)
            logger.debug(f"情绪记忆数量: {emotion_count} (end_user_id={end_user_id})")
            return emotion_count
        except Exception as e:
            logger.warning(f"获取情绪记忆数量失败,使用后备方案: {str(e)}")
            return statement_count_fallback

    async def get_forget_memory_count(
        self,
        end_user_id: Optional[str] = None,
        forgetting_threshold: float = 0.3
    ) -> int:
        """
        Count the forgotten memories.

        Counts Statement, ExtractedEntity, MemorySummary and Chunk nodes whose
        activation_value is set and lower than the forgetting threshold.

        Args:
            end_user_id: optional end-user ID; when given, only that user's
                nodes are counted
            forgetting_threshold: activation threshold, 0.3 by default

        Returns:
            Number of nodes below the threshold; 0 when the query fails.
        """
        try:
            query = """
            MATCH (n)
            WHERE (n:Statement OR n:ExtractedEntity OR n:MemorySummary OR n:Chunk)
            """
            params = {'threshold': forgetting_threshold}

            if end_user_id:
                query += " AND n.group_id = $group_id"
                params['group_id'] = end_user_id

            query += """
            RETURN sum(CASE WHEN n.activation_value IS NOT NULL AND n.activation_value < $threshold THEN 1 ELSE 0 END) as low_activation_nodes
            """

            result = await self.neo4j_connector.execute_query(query, **params)

            # "or 0" also covers a row whose aggregate came back as None.
            forget_count = (result[0]['low_activation_nodes'] if result else 0) or 0

            logger.debug(
                f"遗忘记忆数量: {forget_count} "
                f"(threshold={forgetting_threshold}, end_user_id={end_user_id})"
            )
            return forget_count
        except Exception as e:
            logger.error(f"获取遗忘记忆数量时出错: {str(e)}", exc_info=True)
            return 0

View File

@@ -0,0 +1,405 @@
"""
Episodic Memory Service
处理情景记忆相关的业务逻辑,包括情景记忆总览、详情查询等。
"""
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple
import pytz
from app.core.logging_config import get_logger
from app.services.memory_base_service import MemoryBaseService
logger = get_logger(__name__)
class MemoryEpisodicService(MemoryBaseService):
    """Service for episodic memories (MemorySummary nodes): overview listing and detail lookup."""

    # English memory_type values mapped to the Chinese labels found in older
    # data, so a filter on the English value also matches those nodes.
    _LEGACY_TYPE_LABELS = {
        "conversation": "对话",
        "project_work": "项目/工作",
        "learning": "学习",
        "decision": "决策",
        "important_event": "重要事件"
    }

    # Type reported for nodes that carry no memory_type.
    _DEFAULT_TYPE = "其他"

    def __init__(self):
        super().__init__()
        logger.info("MemoryEpisodicService initialized")

    async def _get_title_and_type(
        self,
        summary_id: str,
        end_user_id: str
    ) -> Tuple[str, str]:
        """
        Read the title and type of an episodic memory.

        Only reads what is already stored (title from ``name``, type from
        ``memory_type``); nothing is generated.

        Args:
            summary_id: element ID of the MemorySummary node
            end_user_id: end-user ID (stored as group_id on the node)

        Returns:
            ``(title, type)``; default values when the node or a property is
            missing, or when the query fails.
        """
        try:
            query = """
            MATCH (s:MemorySummary)
            WHERE elementId(s) = $summary_id AND s.group_id = $group_id
            RETURN s.name AS title, s.memory_type AS type
            """
            result = await self.neo4j_connector.execute_query(
                query,
                summary_id=summary_id,
                group_id=end_user_id
            )

            if not result:
                logger.warning(f"未找到 summary_id={summary_id} 的节点")
                return ("未知标题", "其他")

            record = result[0]
            return (record.get("title") or "未命名", record.get("type") or "其他")
        except Exception as e:
            logger.error(f"读取标题和类型时出错: {str(e)}", exc_info=True)
            return ("错误", "其他")

    async def _extract_involved_objects(
        self,
        summary_id: str,
        end_user_id: str
    ) -> List[str]:
        """
        Return the names of the top three entities involved in an episodic memory.

        Follows MemorySummary -> Statement -> ExtractedEntity and keeps the
        three entities with the highest activation_value.

        Args:
            summary_id: element ID of the MemorySummary node
            end_user_id: end-user ID (stored as group_id on the node)

        Returns:
            Up to three entity names; an empty list when the query fails.
        """
        try:
            query = """
            MATCH (s:MemorySummary)
            WHERE elementId(s) = $summary_id AND s.group_id = $group_id
            MATCH (s)-[:DERIVED_FROM_STATEMENT]->(stmt:Statement)
            MATCH (stmt)-[:REFERENCES_ENTITY]->(entity:ExtractedEntity)
            WHERE entity.activation_value IS NOT NULL
            RETURN DISTINCT entity.name AS name, entity.activation_value AS activation
            ORDER BY activation DESC
            LIMIT 3
            """
            result = await self.neo4j_connector.execute_query(
                query,
                summary_id=summary_id,
                group_id=end_user_id
            )

            involved_objects = [record["name"] for record in result if record.get("name")]
            logger.info(f"成功提取 summary_id={summary_id} 的涉及对象: {involved_objects}")
            return involved_objects
        except Exception as e:
            logger.error(f"提取涉及对象时出错: {str(e)}", exc_info=True)
            return []

    async def _extract_content_records(
        self,
        summary_id: str,
        end_user_id: str
    ) -> List[str]:
        """
        Return the statement texts an episodic memory is derived from.

        Args:
            summary_id: element ID of the MemorySummary node
            end_user_id: end-user ID (stored as group_id on the node)

        Returns:
            The non-empty ``statement`` property of every linked Statement
            node; an empty list when the query fails.
        """
        try:
            query = """
            MATCH (s:MemorySummary)
            WHERE elementId(s) = $summary_id AND s.group_id = $group_id
            MATCH (s)-[:DERIVED_FROM_STATEMENT]->(stmt:Statement)
            WHERE stmt.statement IS NOT NULL AND stmt.statement <> ''
            RETURN stmt.statement AS statement
            """
            result = await self.neo4j_connector.execute_query(
                query,
                summary_id=summary_id,
                group_id=end_user_id
            )

            content_records = [record["statement"] for record in result if record.get("statement")]
            logger.info(f"成功提取 summary_id={summary_id} 的内容记录,共 {len(content_records)}")
            return content_records
        except Exception as e:
            logger.error(f"提取内容记录时出错: {str(e)}", exc_info=True)
            return []

    def _calculate_time_filter(self, time_range: str) -> Optional[str]:
        """
        Compute the start of the requested time range.

        Args:
            time_range: one of all / today / this_week / this_month

        Returns:
            ISO-format string of the range start in UTC, or None for "all"
            and for any unrecognised value.
        """
        if time_range == "all":
            return None

        now = datetime.now(pytz.UTC)
        midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)

        if time_range == "today":
            start_time = midnight
        elif time_range == "this_week":
            # weekday() is 0 on Monday, so this steps back to Monday 00:00:00.
            start_time = midnight - timedelta(days=now.weekday())
        elif time_range == "this_month":
            start_time = midnight.replace(day=1)
        else:
            return None

        return start_time.isoformat()

    async def get_episodic_memory_overview(
        self,
        end_user_id: str,
        time_range: str = "all",
        episodic_type: str = "all",
        title_keyword: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        List a user's episodic memories, optionally filtered.

        Args:
            end_user_id: end-user ID
            time_range: time-range filter (all/today/this_week/this_month)
            episodic_type: type filter; "all" disables it
            title_keyword: optional case-insensitive substring to match in the title

        Returns:
            Dict with ``total`` (number of entries after filtering),
            ``total_all`` (number of the user's episodic memories without any
            filter) and ``episodic_memories`` (list of dicts with ``id``,
            ``title``, ``type`` and ``created_at`` in epoch milliseconds).

        Raises:
            Exception: any query error is logged and re-raised.
        """
        try:
            logger.info(
                f"开始查询 end_user_id={end_user_id} 的情景记忆总览, "
                f"time_range={time_range}, episodic_type={episodic_type}, title_keyword={title_keyword}"
            )

            # 1. Unfiltered total for this user.
            total_all_query = """
            MATCH (s:MemorySummary)
            WHERE s.group_id = $group_id
            RETURN count(s) AS total_all
            """
            total_all_result = await self.neo4j_connector.execute_query(
                total_all_query,
                group_id=end_user_id
            )
            total_all = total_all_result[0]["total_all"] if total_all_result else 0

            # 2. Build the filtered query; each optional clause brings its parameter.
            time_filter = self._calculate_time_filter(time_range)
            params = {"group_id": end_user_id}
            query = """
            MATCH (s:MemorySummary)
            WHERE s.group_id = $group_id
            """
            if time_filter:
                query += " AND s.created_at >= $time_filter"
                params["time_filter"] = time_filter
            if title_keyword:
                query += " AND toLower(s.name) CONTAINS toLower($title_keyword)"
                params["title_keyword"] = title_keyword
            query += """
            RETURN elementId(s) AS id,
                   s.created_at AS created_at,
                   s.memory_type AS type,
                   s.name AS title
            ORDER BY s.created_at DESC
            """

            result = await self.neo4j_connector.execute_query(query, **params)

            # 3. Nothing matched.
            if not result:
                logger.info(f"end_user_id={end_user_id} 没有情景记忆数据")
                return {
                    "total": 0,
                    "total_all": total_all,
                    "episodic_memories": []
                }

            # 4. Work out once which stored type values satisfy the filter:
            # the requested value itself plus, for older data, its Chinese label.
            accepted_types = None
            if episodic_type != "all":
                accepted_types = {episodic_type}
                legacy_label = self._LEGACY_TYPE_LABELS.get(episodic_type)
                if legacy_label is not None:
                    accepted_types.add(legacy_label)

            # 5. Shape the rows and apply the type filter.
            episodic_memories = []
            for record in result:
                # The query always returns a "type" column, so a get() default
                # would never apply; "or" is what substitutes the default for
                # nodes without a memory_type.
                memory_type = record.get("type") or self._DEFAULT_TYPE
                if accepted_types is not None and memory_type not in accepted_types:
                    continue

                episodic_memories.append({
                    "id": record["id"],
                    "title": record.get("title") or "未命名",
                    "type": memory_type,
                    "created_at": self.parse_timestamp(record.get("created_at"))
                })

            logger.info(
                f"成功获取 end_user_id={end_user_id} 的情景记忆总览,"
                f"筛选后 {len(episodic_memories)} 条,总共 {total_all}"
            )

            return {
                "total": len(episodic_memories),
                "total_all": total_all,
                "episodic_memories": episodic_memories
            }
        except Exception as e:
            logger.error(f"获取情景记忆总览时出错: {str(e)}", exc_info=True)
            raise

    async def get_episodic_memory_details(
        self,
        end_user_id: str,
        summary_id: str
    ) -> Dict[str, Any]:
        """
        Return the details of a single episodic memory.

        Args:
            end_user_id: end-user ID
            summary_id: element ID of the MemorySummary node

        Returns:
            Dict with ``id``, ``created_at`` (epoch milliseconds),
            ``involved_objects``, ``episodic_type``, ``content_records`` and
            ``emotion``.

        Raises:
            ValueError: when no such node exists for this user.
            Exception: any other error is logged and re-raised.
        """
        try:
            logger.info(f"开始查询 end_user_id={end_user_id}, summary_id={summary_id} 的情景记忆详情")

            # 1. Make sure the node exists and read its creation time.
            query = """
            MATCH (s:MemorySummary)
            WHERE elementId(s) = $summary_id AND s.group_id = $group_id
            RETURN elementId(s) AS id, s.created_at AS created_at
            """
            result = await self.neo4j_connector.execute_query(
                query,
                summary_id=summary_id,
                group_id=end_user_id
            )

            if not result:
                logger.warning(f"未找到 summary_id={summary_id} 的情景记忆")
                raise ValueError(f"情景记忆不存在: summary_id={summary_id}")

            created_at_timestamp = self.parse_timestamp(result[0].get("created_at"))

            # 2. Gather the remaining parts.
            # NOTE(review): the title is read here but is not part of the
            # response — confirm whether the details payload should carry it.
            _title, episodic_type = await self._get_title_and_type(
                summary_id=summary_id,
                end_user_id=end_user_id
            )
            involved_objects = await self._extract_involved_objects(
                summary_id=summary_id,
                end_user_id=end_user_id
            )
            content_records = await self._extract_content_records(
                summary_id=summary_id,
                end_user_id=end_user_id
            )
            emotion = await self.extract_episodic_emotion(
                summary_id=summary_id,
                end_user_id=end_user_id
            )

            # 3. Assemble the response.
            details = {
                "id": summary_id,
                "created_at": created_at_timestamp,
                "involved_objects": involved_objects,
                "episodic_type": episodic_type,
                "content_records": content_records,
                "emotion": emotion
            }

            logger.info(f"成功获取 summary_id={summary_id} 的情景记忆详情")
            return details
        except ValueError:
            # "Not found" is passed on untouched (and unlogged) for the caller to handle.
            raise
        except Exception as e:
            logger.error(f"获取情景记忆详情时出错: {str(e)}", exc_info=True)
            raise
# Module-level service instance (for use by the controller layer)
memory_episodic_service = MemoryEpisodicService()

View File

@@ -0,0 +1,274 @@
"""
显性记忆服务
处理显性记忆相关的业务逻辑,包括情景记忆和语义记忆的查询。
"""
from typing import Any, Dict
from app.core.logging_config import get_logger
from app.services.memory_base_service import MemoryBaseService
logger = get_logger(__name__)
class MemoryExplicitService(MemoryBaseService):
    """Service for explicit memories: episodic (MemorySummary) plus semantic (ExtractedEntity)."""

    def __init__(self):
        super().__init__()
        logger.info("MemoryExplicitService initialized")

    async def get_explicit_memory_overview(
        self,
        end_user_id: str
    ) -> Dict[str, Any]:
        """
        Return an overview of a user's explicit memories.

        The overview has two parts:
        1. episodic_memories - from MemorySummary nodes, newest first
        2. semantic_memories - from ExtractedEntity nodes with
           is_explicit_memory = true, ordered by name

        Args:
            end_user_id: end-user ID

        Returns:
            {
                "total": int,
                "episodic_memories": [
                    {"id": str, "title": str, "content": str, "created_at": int}
                ],
                "semantic_memories": [
                    {"id": str, "name": str, "entity_type": str, "core_definition": str}
                ]
            }

        Raises:
            Exception: any query error is logged and re-raised.
        """
        try:
            logger.info(f"开始查询 end_user_id={end_user_id} 的显性记忆总览(情景记忆+语义记忆)")

            # ---- 1. Episodic memories (the overview carries no emotion field) ----
            episodic_rows = await self.neo4j_connector.execute_query(
                """
                MATCH (s:MemorySummary)
                WHERE s.group_id = $group_id
                RETURN elementId(s) AS id,
                       s.name AS title,
                       s.content AS content,
                       s.created_at AS created_at
                ORDER BY s.created_at DESC
                """,
                group_id=end_user_id
            )
            episodic_memories = [
                {
                    "id": row["id"],
                    "title": row.get("title") or "未命名",
                    "content": row.get("content") or "",
                    "created_at": self.parse_timestamp(row.get("created_at"))
                }
                for row in (episodic_rows or [])
            ]

            # ---- 2. Semantic memories (no detailed_notes / created_at here) ----
            semantic_rows = await self.neo4j_connector.execute_query(
                """
                MATCH (e:ExtractedEntity)
                WHERE e.group_id = $group_id
                AND e.is_explicit_memory = true
                RETURN elementId(e) AS id,
                       e.name AS name,
                       e.entity_type AS entity_type,
                       e.description AS core_definition
                ORDER BY e.name ASC
                """,
                group_id=end_user_id
            )
            semantic_memories = [
                {
                    "id": row["id"],
                    "name": row.get("name") or "未命名",
                    "entity_type": row.get("entity_type") or "未分类",
                    "core_definition": row.get("core_definition") or ""
                }
                for row in (semantic_rows or [])
            ]

            # ---- 3. Result ----
            total_count = len(episodic_memories) + len(semantic_memories)
            logger.info(
                f"成功获取 end_user_id={end_user_id} 的显性记忆总览,"
                f"情景记忆={len(episodic_memories)} 条,语义记忆={len(semantic_memories)} 条,"
                f"总计 {total_count}"
            )
            return {
                "total": total_count,
                "episodic_memories": episodic_memories,
                "semantic_memories": semantic_memories
            }
        except Exception as e:
            logger.error(f"获取显性记忆总览时出错: {str(e)}", exc_info=True)
            raise

    async def get_explicit_memory_details(
        self,
        end_user_id: str,
        memory_id: str
    ) -> Dict[str, Any]:
        """
        Return the details of one explicit memory.

        ``memory_id`` is first looked up as an episodic memory; only when no
        MemorySummary node matches is it looked up as a semantic memory.

        Args:
            end_user_id: end-user ID
            memory_id: element ID of either an episodic or a semantic memory

        Returns:
            For an episodic memory:
            {
                "memory_type": "episodic",
                "title": str,
                "content": str,
                "emotion": emotion_type of the strongest linked statement, or None,
                "created_at": int
            }

            For a semantic memory:
            {
                "memory_type": "semantic",
                "name": str,
                "core_definition": str,
                "detailed_notes": str,
                "created_at": int
            }

        Raises:
            ValueError: when neither kind of memory exists for this user.
            Exception: any other error is logged and re-raised.
        """
        try:
            logger.info(f"开始查询显性记忆详情: end_user_id={end_user_id}, memory_id={memory_id}")

            # ---- 1. Try the episodic lookup first ----
            episodic_rows = await self.neo4j_connector.execute_query(
                """
                MATCH (s:MemorySummary)
                WHERE elementId(s) = $memory_id AND s.group_id = $group_id
                RETURN s.name AS title,
                       s.content AS content,
                       s.created_at AS created_at
                """,
                memory_id=memory_id,
                group_id=end_user_id
            )
            if episodic_rows:
                row = episodic_rows[0]
                title = row.get("title") or "未命名"
                content = row.get("content") or ""
                created_at_timestamp = self.parse_timestamp(row.get("created_at"))
                emotion = await self.extract_episodic_emotion(
                    summary_id=memory_id,
                    end_user_id=end_user_id
                )

                logger.info(f"成功获取情景记忆详情: memory_id={memory_id}")
                return {
                    "memory_type": "episodic",
                    "title": title,
                    "content": content,
                    "emotion": emotion,
                    "created_at": created_at_timestamp
                }

            # ---- 2. Fall back to the semantic lookup ----
            semantic_rows = await self.neo4j_connector.execute_query(
                """
                MATCH (e:ExtractedEntity)
                WHERE elementId(e) = $memory_id
                AND e.group_id = $group_id
                AND e.is_explicit_memory = true
                RETURN e.name AS name,
                       e.description AS core_definition,
                       e.example AS detailed_notes,
                       e.created_at AS created_at
                """,
                memory_id=memory_id,
                group_id=end_user_id
            )
            if semantic_rows:
                row = semantic_rows[0]
                semantic_details = {
                    "memory_type": "semantic",
                    "name": row.get("name") or "未命名",
                    "core_definition": row.get("core_definition") or "",
                    "detailed_notes": row.get("detailed_notes") or "",
                    "created_at": self.parse_timestamp(row.get("created_at"))
                }

                logger.info(f"成功获取语义记忆详情: memory_id={memory_id}")
                return semantic_details

            # ---- 3. Neither lookup matched ----
            logger.warning(f"记忆不存在: memory_id={memory_id}, end_user_id={end_user_id}")
            raise ValueError(f"记忆不存在: memory_id={memory_id}")
        except ValueError:
            # "Not found" is passed on without the error-level log below.
            raise
        except Exception as e:
            logger.error(f"获取显性记忆详情时出错: {str(e)}", exc_info=True)
            raise

View File

@@ -15,6 +15,7 @@ from app.core.memory.utils.llm.llm_utils import MemoryClientFactory
from app.db import get_db_context
from app.repositories.end_user_repository import EndUserRepository
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.services.memory_base_service import MemoryBaseService
from app.services.memory_config_service import MemoryConfigService
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
@@ -883,866 +884,6 @@ class UserMemoryService:
"failed": failed,
"errors": errors + [{"error": f"批量处理失败: {str(e)}"}]
}
async def _get_title_and_type(
self,
summary_id: str,
end_user_id: str
) -> Tuple[str, str]:
"""
读取情景记忆的标题(title)和类型(type)
仅负责读取已存在的title和type不进行生成
title从name属性读取type从memory_type属性读取
Args:
summary_id: Summary节点的ID
end_user_id: 终端用户ID (group_id)
Returns:
(标题, 类型)元组,如果不存在则返回默认值
"""
try:
# 查询Summary节点的name(作为title)和memory_type(作为type)
query = """
MATCH (s:MemorySummary)
WHERE elementId(s) = $summary_id AND s.group_id = $group_id
RETURN s.name AS title, s.memory_type AS type
"""
result = await self.neo4j_connector.execute_query(
query,
summary_id=summary_id,
group_id=end_user_id
)
if not result or len(result) == 0:
logger.warning(f"未找到 summary_id={summary_id} 的节点")
return ("未知标题", "其他")
record = result[0]
title = record.get("title") or "未命名"
episodic_type = record.get("type") or "其他"
return (title, episodic_type)
except Exception as e:
logger.error(f"读取标题和类型时出错: {str(e)}", exc_info=True)
return ("错误", "其他")
@staticmethod
async def generate_title_and_type_for_summary(
    content: str,
    end_user_id: str
) -> Tuple[str, str]:
    """
    Generate a title and a type for a MemorySummary via the LLM.

    Intended to be called when a MemorySummary node is created.

    Args:
        content: Text content of the summary.
        end_user_id: End-user ID (group_id), used to pick the LLM client.

    Returns:
        A ``(title, type)`` tuple. ``type`` is always one of the valid English
        type identifiers; the default type is used when the LLM answer cannot
        be mapped. Placeholder titles are returned for empty content, an
        unparsable response, or any other failure.
    """
    from app.core.memory.utils.prompt.prompt_utils import render_episodic_title_and_type_prompt
    import json

    # Closed set of accepted type identifiers.
    VALID_TYPES = {
        "conversation",     # conversation
        "project_work",     # project / work
        "learning",         # learning
        "decision",         # decision
        "important_event"   # important event
    }
    DEFAULT_TYPE = "conversation"

    try:
        if not content:
            logger.warning("content为空无法生成标题和类型")
            return ("空内容", DEFAULT_TYPE)

        # 1. Render the Jinja2 prompt template.
        prompt = await render_episodic_title_and_type_prompt(content)

        # 2. Ask the LLM for a title and type.
        llm_client = _get_llm_client_for_user(end_user_id)
        messages = [
            {"role": "user", "content": prompt}
        ]
        response = await llm_client.chat(messages=messages)

        # 3. Flatten the response content (list / dict / scalar) into text.
        content_response = response.content
        if isinstance(content_response, list):
            if len(content_response) > 0:
                if isinstance(content_response[0], dict):
                    text = content_response[0].get('text', content_response[0].get('content', str(content_response[0])))
                    full_response = str(text)
                else:
                    full_response = str(content_response[0])
            else:
                full_response = ""
        elif isinstance(content_response, dict):
            full_response = str(content_response.get('text', content_response.get('content', str(content_response))))
        else:
            full_response = str(content_response) if content_response is not None else ""

        # 4. Parse the JSON payload.
        try:
            # Strip optional markdown code fences around the JSON.
            json_str = full_response.strip()
            if json_str.startswith("```json"):
                json_str = json_str[7:]
            if json_str.startswith("```"):
                json_str = json_str[3:]
            if json_str.endswith("```"):
                json_str = json_str[:-3]
            json_str = json_str.strip()

            result_data = json.loads(json_str)
            title = result_data.get("title", "未知标题")
            episodic_type_raw = result_data.get("type", DEFAULT_TYPE)

            # 5. Validate and normalise the type.
            episodic_type_normalized = str(episodic_type_raw).lower().strip()
            if episodic_type_normalized in VALID_TYPES:
                episodic_type = episodic_type_normalized
            else:
                # Map common Chinese labels onto the English identifiers.
                type_mapping = {
                    "对话": "conversation",
                    "项目": "project_work",
                    "工作": "project_work",
                    "项目/工作": "project_work",
                    "学习": "learning",
                    "决策": "decision",
                    "重要事件": "important_event",
                    "事件": "important_event"
                }
                # Look up the stripped string form rather than the raw value:
                # surrounding whitespace no longer defeats the mapping, and a
                # non-hashable value (list/dict) can no longer raise TypeError
                # and discard an otherwise valid title.
                episodic_type = type_mapping.get(str(episodic_type_raw).strip(), DEFAULT_TYPE)
                logger.warning(
                    f"LLM返回的类型 '{episodic_type_raw}' 不在有效集合中,"
                    f"已归一化为 '{episodic_type}'"
                )

            logger.info(f"成功生成标题和类型: title={title}, type={episodic_type}")
            return (title, episodic_type)
        except json.JSONDecodeError:
            logger.error(f"无法解析LLM响应为JSON: {full_response}")
            return ("解析失败", DEFAULT_TYPE)
    except Exception as e:
        # Best-effort generation: any failure yields placeholder values.
        logger.error(f"生成标题和类型时出错: {str(e)}", exc_info=True)
        return ("错误", DEFAULT_TYPE)
async def _extract_involved_objects(
    self,
    summary_id: str,
    end_user_id: str
) -> List[str]:
    """
    Collect the names of up to three entities tied to an episodic memory.

    Args:
        summary_id: Element ID of the MemorySummary node.
        end_user_id: End-user ID, matched against the node's ``group_id``.

    Returns:
        Entity names ordered by descending ``activation_value`` (at most three
        rows are requested). An empty list is returned on any error.
    """
    # Summary -> Statement -> ExtractedEntity, ranked by activation_value.
    cypher = """
        MATCH (s:MemorySummary)
        WHERE elementId(s) = $summary_id AND s.group_id = $group_id
        MATCH (s)-[:DERIVED_FROM_STATEMENT]->(stmt:Statement)
        MATCH (stmt)-[:REFERENCES_ENTITY]->(entity:ExtractedEntity)
        WHERE entity.activation_value IS NOT NULL
        RETURN DISTINCT entity.name AS name, entity.activation_value AS activation
        ORDER BY activation DESC
        LIMIT 3
    """
    try:
        rows = await self.neo4j_connector.execute_query(
            cypher,
            summary_id=summary_id,
            group_id=end_user_id
        )
        names = []
        for row in rows:
            # Rows with an empty/null name are dropped.
            if row.get("name"):
                names.append(row["name"])

        logger.info(f"成功提取 summary_id={summary_id} 的涉及对象: {names}")
        return names
    except Exception as e:
        logger.error(f"提取涉及对象时出错: {str(e)}", exc_info=True)
        return []
async def _extract_content_records(
    self,
    summary_id: str,
    end_user_id: str
) -> List[str]:
    """
    Collect the statement texts that an episodic memory was derived from.

    Args:
        summary_id: Element ID of the MemorySummary node.
        end_user_id: End-user ID, matched against the node's ``group_id``.

    Returns:
        The non-empty ``statement`` property of every Statement node linked to
        the summary. An empty list is returned on any error.
    """
    # All Statement nodes the summary points to, skipping blank statements.
    cypher = """
        MATCH (s:MemorySummary)
        WHERE elementId(s) = $summary_id AND s.group_id = $group_id
        MATCH (s)-[:DERIVED_FROM_STATEMENT]->(stmt:Statement)
        WHERE stmt.statement IS NOT NULL AND stmt.statement <> ''
        RETURN stmt.statement AS statement
    """
    try:
        rows = await self.neo4j_connector.execute_query(
            cypher,
            summary_id=summary_id,
            group_id=end_user_id
        )
        statements = []
        for row in rows:
            if row.get("statement"):
                statements.append(row["statement"])

        logger.info(f"成功提取 summary_id={summary_id} 的内容记录,共 {len(statements)}")
        return statements
    except Exception as e:
        logger.error(f"提取内容记录时出错: {str(e)}", exc_info=True)
        return []
async def _extract_episodic_emotion(
    self,
    summary_id: str,
    end_user_id: str
) -> Optional[str]:
    """
    Determine the dominant emotion of an episodic memory.

    Args:
        summary_id: Element ID of the MemorySummary node.
        end_user_id: End-user ID, matched against the node's ``group_id``.

    Returns:
        The ``emotion_type`` of the linked Statement with the highest
        ``emotion_intensity``, or ``None`` when no statement carries emotion
        data or the lookup fails.
    """
    # Only statements with both emotion fields set are considered; the single
    # most intense one is returned by the query.
    cypher = """
        MATCH (s:MemorySummary)
        WHERE elementId(s) = $summary_id AND s.group_id = $group_id
        MATCH (s)-[:DERIVED_FROM_STATEMENT]->(stmt:Statement)
        WHERE stmt.emotion_type IS NOT NULL
          AND stmt.emotion_intensity IS NOT NULL
        RETURN stmt.emotion_type AS emotion_type,
               stmt.emotion_intensity AS emotion_intensity
        ORDER BY emotion_intensity DESC
        LIMIT 1
    """
    try:
        rows = await self.neo4j_connector.execute_query(
            cypher,
            summary_id=summary_id,
            group_id=end_user_id
        )
        if not rows:
            logger.info(f"summary_id={summary_id} 没有情绪信息")
            return None

        dominant = rows[0].get("emotion_type")
        logger.info(f"成功提取 summary_id={summary_id} 的情绪: {dominant}")
        return dominant
    except Exception as e:
        logger.error(f"提取情景记忆情绪时出错: {str(e)}", exc_info=True)
        return None
async def get_episodic_memory_overview(
    self,
    db: Session,
    end_user_id: str,
    time_range: str = "all",
    episodic_type: str = "all",
    title_keyword: Optional[str] = None
) -> Dict[str, Any]:
    """
    Return an overview list of a user's episodic memories.

    Args:
        db: Database session (not used by this method).
        end_user_id: End-user ID, matched against ``group_id``.
        time_range: Time-range filter passed to ``_calculate_time_filter``.
        episodic_type: Type filter; ``"all"`` disables type filtering.
        title_keyword: Optional keyword for a case-insensitive title match.

    Returns:
        A dict with ``total`` (number of items after filtering), ``total_all``
        (number of MemorySummary nodes for the user, ignoring filters) and
        ``episodic_memories`` (list of ``id`` / ``title`` / ``type`` /
        ``created_at`` dicts, ``created_at`` in epoch milliseconds or None).

    Raises:
        Exception: Any error from the query layer is logged and re-raised.
    """
    from datetime import datetime

    # English type identifier -> legacy Chinese label, so that nodes stored
    # with the old Chinese labels still match an English filter value.
    legacy_type_labels = {
        "conversation": "对话",
        "project_work": "项目/工作",
        "learning": "学习",
        "decision": "决策",
        "important_event": "重要事件"
    }

    try:
        logger.info(
            f"开始查询 end_user_id={end_user_id} 的情景记忆总览, "
            f"time_range={time_range}, episodic_type={episodic_type}, title_keyword={title_keyword}"
        )

        # 1. Count every episodic memory, regardless of filters.
        total_all_query = """
            MATCH (s:MemorySummary)
            WHERE s.group_id = $group_id
            RETURN count(s) AS total_all
        """
        total_all_result = await self.neo4j_connector.execute_query(
            total_all_query,
            group_id=end_user_id
        )
        total_all = total_all_result[0]["total_all"] if total_all_result else 0

        # 2. Resolve the lower bound for the time-range filter.
        time_filter = self._calculate_time_filter(time_range)

        # 3. Build the filtered query and its parameters side by side.
        query = """
            MATCH (s:MemorySummary)
            WHERE s.group_id = $group_id
        """
        params = {"group_id": end_user_id}
        if time_filter:
            query += " AND s.created_at >= $time_filter"
            params["time_filter"] = time_filter
        if title_keyword:
            query += " AND toLower(s.name) CONTAINS toLower($title_keyword)"
            params["title_keyword"] = title_keyword
        query += """
            RETURN elementId(s) AS id,
                   s.created_at AS created_at,
                   s.memory_type AS type,
                   s.name AS title
            ORDER BY s.created_at DESC
        """

        result = await self.neo4j_connector.execute_query(query, **params)

        # 4. Nothing matched: return an empty list.
        if not result:
            logger.info(f"end_user_id={end_user_id} 没有情景记忆数据")
            return {
                "total": 0,
                "total_all": total_all,
                "episodic_memories": []
            }

        # 5. Apply the type filter and shape each record.
        accepted_types = (episodic_type, legacy_type_labels.get(episodic_type))
        episodic_memories = []
        for record in result:
            summary_id = record["id"]
            created_at_str = record.get("created_at")
            # The key is always present (null when the property is missing),
            # so a ``.get`` default would never apply; use ``or`` so untyped
            # nodes report the placeholder type instead of None.
            memory_type = record.get("type") or "其他"
            title = record.get("title") or "未命名"

            if episodic_type != "all" and memory_type not in accepted_types:
                continue

            # Convert the ISO timestamp to epoch milliseconds.
            created_at_timestamp = None
            if created_at_str:
                try:
                    dt_object = datetime.fromisoformat(created_at_str.replace("Z", "+00:00"))
                    created_at_timestamp = int(dt_object.timestamp() * 1000)
                except (ValueError, TypeError, AttributeError) as e:
                    logger.warning(f"无法解析时间戳: {created_at_str}, error={str(e)}")

            episodic_memories.append({
                "id": summary_id,
                "title": title,
                "type": memory_type,
                "created_at": created_at_timestamp
            })

        logger.info(
            f"成功获取 end_user_id={end_user_id} 的情景记忆总览,"
            f"筛选后 {len(episodic_memories)} 条,总共 {total_all}"
        )
        return {
            "total": len(episodic_memories),
            "total_all": total_all,
            "episodic_memories": episodic_memories
        }
    except Exception as e:
        logger.error(f"获取情景记忆总览时出错: {str(e)}", exc_info=True)
        raise
def _calculate_time_filter(self, time_range: str) -> Optional[str]:
    """
    Compute the start timestamp for a named time range.

    All boundaries are computed in UTC; weeks start on Monday.

    Args:
        time_range: One of ``all`` / ``today`` / ``this_week`` / ``this_month``.

    Returns:
        The range start as an ISO-8601 string with a ``+00:00`` offset, or
        ``None`` for ``"all"`` and for any unrecognised value (no filtering).
    """
    # The standard library's timezone.utc replaces the former pytz usage; the
    # resulting ISO string is identical.
    from datetime import datetime, timedelta, timezone

    if time_range == "all":
        return None

    midnight = datetime.now(timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0
    )

    if time_range == "today":
        start_time = midnight
    elif time_range == "this_week":
        # weekday() is 0 for Monday, so this steps back to Monday 00:00.
        start_time = midnight - timedelta(days=midnight.weekday())
    elif time_range == "this_month":
        start_time = midnight.replace(day=1)
    else:
        return None

    return start_time.isoformat()
async def get_episodic_memory_details(
    self,
    db: Session,
    end_user_id: str,
    summary_id: str
) -> Dict[str, Any]:
    """
    Return the detail view of a single episodic memory.

    Args:
        db: Database session (not used by this method).
        end_user_id: End-user ID, matched against the node's ``group_id``.
        summary_id: Element ID of the MemorySummary node.

    Returns:
        A dict with ``id``, ``created_at`` (epoch milliseconds or None),
        ``involved_objects``, ``episodic_type``, ``content_records`` and
        ``emotion``.

    Raises:
        ValueError: If no matching MemorySummary node exists.
        Exception: Any other error is logged and re-raised.
    """
    lookup = """
        MATCH (s:MemorySummary)
        WHERE elementId(s) = $summary_id AND s.group_id = $group_id
        RETURN elementId(s) AS id, s.created_at AS created_at
    """
    try:
        logger.info(f"开始查询 end_user_id={end_user_id}, summary_id={summary_id} 的情景记忆详情")

        # 1. Make sure the node exists for this user.
        rows = await self.neo4j_connector.execute_query(
            lookup,
            summary_id=summary_id,
            group_id=end_user_id
        )
        if not rows:
            logger.warning(f"未找到 summary_id={summary_id} 的情景记忆")
            raise ValueError(f"情景记忆不存在: summary_id={summary_id}")

        # 2. Convert the creation time to epoch milliseconds.
        raw_created = rows[0].get("created_at")
        created_ms = None
        if raw_created:
            try:
                from datetime import datetime
                parsed = datetime.fromisoformat(raw_created.replace("Z", "+00:00"))
                created_ms = int(parsed.timestamp() * 1000)
            except (ValueError, TypeError, AttributeError) as e:
                logger.warning(f"无法解析时间戳: {raw_created}, error={str(e)}")

        # 3. Gather the remaining pieces from the helper lookups. The title is
        #    read together with the type but is not part of the result.
        lookup_args = {"summary_id": summary_id, "end_user_id": end_user_id}
        _, episodic_type = await self._get_title_and_type(**lookup_args)
        involved_objects = await self._extract_involved_objects(**lookup_args)
        content_records = await self._extract_content_records(**lookup_args)
        emotion = await self._extract_episodic_emotion(**lookup_args)

        logger.info(f"成功获取 summary_id={summary_id} 的情景记忆详情")
        return {
            "id": summary_id,
            "created_at": created_ms,
            "involved_objects": involved_objects,
            "episodic_type": episodic_type,
            "content_records": content_records,
            "emotion": emotion
        }
    except ValueError:
        # "Not found" is passed through untouched for the caller to handle.
        raise
    except Exception as e:
        logger.error(f"获取情景记忆详情时出错: {str(e)}", exc_info=True)
        raise
async def get_explicit_memory_overview(
    self,
    db: Session,
    end_user_id: str
) -> Dict[str, Any]:
    """
    Return an overview of a user's explicit memories.

    The result has two parts:
    1. ``episodic_memories`` - built from MemorySummary nodes.
    2. ``semantic_memories`` - built from ExtractedEntity nodes whose
       ``is_explicit_memory`` flag is true.

    Args:
        db: Database session (not used by this method).
        end_user_id: End-user ID, matched against ``group_id``.

    Returns:
        {
            "total": int,  # episodic count + semantic count
            "episodic_memories": [
                {"id": str, "title": str, "content": str, "created_at": int}
            ],
            "semantic_memories": [
                {"id": str, "name": str, "entity_type": str,
                 "core_definition": str, "created_at": int}
            ]
        }
        ``created_at`` is epoch milliseconds, or None when missing or
        unparsable. The overview carries neither emotion nor detailed notes.

    Raises:
        Exception: Any error from the query layer is logged and re-raised.
    """
    def _to_epoch_ms(raw_value):
        # ISO-8601 string -> epoch milliseconds; None when absent or invalid.
        if not raw_value:
            return None
        try:
            from datetime import datetime
            parsed = datetime.fromisoformat(raw_value.replace("Z", "+00:00"))
            return int(parsed.timestamp() * 1000)
        except (ValueError, TypeError, AttributeError) as e:
            logger.warning(f"无法解析时间戳: {raw_value}, error={str(e)}")
            return None

    try:
        logger.info(f"开始查询 end_user_id={end_user_id} 的显性记忆总览(情景记忆+语义记忆)")

        # ---------- 1. Episodic memories (MemorySummary nodes) ----------
        episodic_query = """
            MATCH (s:MemorySummary)
            WHERE s.group_id = $group_id
            RETURN elementId(s) AS id,
                   s.name AS title,
                   s.content AS content,
                   s.created_at AS created_at
            ORDER BY s.created_at DESC
        """
        episodic_rows = await self.neo4j_connector.execute_query(
            episodic_query,
            group_id=end_user_id
        )
        episodic_memories = [
            {
                "id": row["id"],
                "title": row.get("title") or "未命名",
                "content": row.get("content") or "",
                "created_at": _to_epoch_ms(row.get("created_at"))
            }
            for row in (episodic_rows or [])
        ]

        # ---------- 2. Semantic memories (ExtractedEntity nodes) ----------
        semantic_query = """
            MATCH (e:ExtractedEntity)
            WHERE e.group_id = $group_id
              AND e.is_explicit_memory = true
            RETURN elementId(e) AS id,
                   e.name AS name,
                   e.entity_type AS entity_type,
                   e.description AS core_definition,
                   e.example AS detailed_notes,
                   e.created_at AS created_at
            ORDER BY e.created_at DESC
        """
        semantic_rows = await self.neo4j_connector.execute_query(
            semantic_query,
            group_id=end_user_id
        )
        semantic_memories = [
            {
                "id": row["id"],
                "name": row.get("name") or "未命名",
                "entity_type": row.get("entity_type") or "未分类",
                "core_definition": row.get("core_definition") or "",
                "created_at": _to_epoch_ms(row.get("created_at"))
            }
            for row in (semantic_rows or [])
        ]

        # ---------- 3. Assemble the response ----------
        total_count = len(episodic_memories) + len(semantic_memories)
        logger.info(
            f"成功获取 end_user_id={end_user_id} 的显性记忆总览,"
            f"情景记忆={len(episodic_memories)} 条,语义记忆={len(semantic_memories)} 条,"
            f"总计 {total_count}"
        )
        return {
            "total": total_count,
            "episodic_memories": episodic_memories,
            "semantic_memories": semantic_memories
        }
    except Exception as e:
        logger.error(f"获取显性记忆总览时出错: {str(e)}", exc_info=True)
        raise
async def get_explicit_memory_details(
    self,
    db: Session,
    end_user_id: str,
    memory_id: str
) -> Dict[str, Any]:
    """
    Return the details of one explicit memory.

    The ID is first looked up as an episodic memory (MemorySummary); only if
    that finds nothing is it looked up as a semantic memory (ExtractedEntity
    flagged ``is_explicit_memory``).

    Args:
        db: Database session (not used by this method).
        end_user_id: End-user ID, matched against ``group_id``.
        memory_id: Element ID of either kind of memory node.

    Returns:
        For an episodic memory:
            {"memory_type": "episodic", "title": str, "content": str,
             "emotion": Optional[str], "created_at": int}
        For a semantic memory:
            {"memory_type": "semantic", "name": str, "core_definition": str,
             "detailed_notes": str, "created_at": int}
        ``created_at`` is epoch milliseconds, or None when missing or
        unparsable.

    Raises:
        ValueError: If the ID matches neither kind of memory.
        Exception: Any other error is logged and re-raised.
    """
    def _to_epoch_ms(raw_value):
        # ISO-8601 string -> epoch milliseconds; None when absent or invalid.
        if not raw_value:
            return None
        try:
            from datetime import datetime
            parsed = datetime.fromisoformat(raw_value.replace("Z", "+00:00"))
            return int(parsed.timestamp() * 1000)
        except (ValueError, TypeError, AttributeError) as e:
            logger.warning(f"无法解析时间戳: {raw_value}, error={str(e)}")
            return None

    try:
        logger.info(f"开始查询显性记忆详情: end_user_id={end_user_id}, memory_id={memory_id}")

        # ---------- 1. Try the episodic lookup first ----------
        episodic_query = """
            MATCH (s:MemorySummary)
            WHERE elementId(s) = $memory_id AND s.group_id = $group_id
            RETURN s.name AS title,
                   s.content AS content,
                   s.created_at AS created_at
        """
        episodic_rows = await self.neo4j_connector.execute_query(
            episodic_query,
            memory_id=memory_id,
            group_id=end_user_id
        )
        if episodic_rows:
            hit = episodic_rows[0]
            title = hit.get("title") or "未命名"
            content = hit.get("content") or ""
            created_ms = _to_epoch_ms(hit.get("created_at"))

            # Dominant emotion of the statements behind this summary.
            emotion = await self._extract_episodic_emotion(
                summary_id=memory_id,
                end_user_id=end_user_id
            )

            logger.info(f"成功获取情景记忆详情: memory_id={memory_id}")
            return {
                "memory_type": "episodic",
                "title": title,
                "content": content,
                "emotion": emotion,
                "created_at": created_ms
            }

        # ---------- 2. Fall back to the semantic lookup ----------
        semantic_query = """
            MATCH (e:ExtractedEntity)
            WHERE elementId(e) = $memory_id
              AND e.group_id = $group_id
              AND e.is_explicit_memory = true
            RETURN e.name AS name,
                   e.description AS core_definition,
                   e.example AS detailed_notes,
                   e.created_at AS created_at
        """
        semantic_rows = await self.neo4j_connector.execute_query(
            semantic_query,
            memory_id=memory_id,
            group_id=end_user_id
        )
        if semantic_rows:
            hit = semantic_rows[0]
            name = hit.get("name") or "未命名"
            core_definition = hit.get("core_definition") or ""
            detailed_notes = hit.get("detailed_notes") or ""
            created_ms = _to_epoch_ms(hit.get("created_at"))

            logger.info(f"成功获取语义记忆详情: memory_id={memory_id}")
            return {
                "memory_type": "semantic",
                "name": name,
                "core_definition": core_definition,
                "detailed_notes": detailed_notes,
                "created_at": created_ms
            }

        # ---------- 3. Neither lookup matched ----------
        logger.warning(f"记忆不存在: memory_id={memory_id}, end_user_id={end_user_id}")
        raise ValueError(f"记忆不存在: memory_id={memory_id}")
    except ValueError:
        # "Not found" is passed through untouched for the caller to handle.
        raise
    except Exception as e:
        logger.error(f"获取显性记忆详情时出错: {str(e)}", exc_info=True)
        raise
# 独立的分析函数
@@ -2055,17 +1196,18 @@ async def analytics_memory_types(
end_user_id: Optional[str] = None
) -> List[Dict[str, Any]]:
"""
统计8种记忆类型的数量和百分比
统计9种记忆类型的数量和百分比
计算规则:
1. 感知记忆 (PERCEPTUAL_MEMORY) = statement + entity
2. 工作记忆 (WORKING_MEMORY) = chunk + entity
3. 短期记忆 (SHORT_TERM_MEMORY) = chunk
4. 长期记忆 (LONG_TERM_MEMORY) = entity
5. 显性记忆 (EXPLICIT_MEMORY) = 1/2 * entity
5. 显性记忆 (EXPLICIT_MEMORY) = 情景记忆 + 语义记忆(通过 MemoryBaseService.get_explicit_memory_count 获取)
6. 隐性记忆 (IMPLICIT_MEMORY) = 1/3 * entity
7. 情绪记忆 (EMOTIONAL_MEMORY) = statement
8. 情景记忆 (EPISODIC_MEMORY) = memory_summary
7. 情绪记忆 (EMOTIONAL_MEMORY) = 情绪标签统计总数(通过 MemoryBaseService.get_emotional_memory_count 获取)
8. 情景记忆 (EPISODIC_MEMORY) = memory_summary(通过 MemoryBaseService.get_episodic_memory_count 获取)
9. 遗忘记忆 (FORGET_MEMORY) = 激活值低于阈值的节点数(通过 MemoryBaseService.get_forget_memory_count 获取)
Args:
db: 数据库会话
@@ -2090,13 +1232,16 @@ async def analytics_memory_types(
- IMPLICIT_MEMORY: 隐性记忆
- EMOTIONAL_MEMORY: 情绪记忆
- EPISODIC_MEMORY: 情景记忆
- FORGET_MEMORY: 遗忘记忆
"""
# 定义需要查询的节点类型
# 初始化基础服务
base_service = MemoryBaseService()
# 定义需要查询的基础节点类型
node_types = {
"Statement": "Statement",
"Entity": "ExtractedEntity",
"Chunk": "Chunk",
"MemorySummary": "MemorySummary"
"Chunk": "Chunk"
}
# 存储每种节点类型的计数
@@ -2126,18 +1271,45 @@ async def analytics_memory_types(
statement_count = node_counts.get("Statement", 0)
entity_count = node_counts.get("Entity", 0)
chunk_count = node_counts.get("Chunk", 0)
memory_summary_count = node_counts.get("MemorySummary", 0)
# 按规则计算8种记忆类型的数量使用英文枚举作为key
# 获取用户的遗忘阈值配置
forgetting_threshold = 0.3 # 默认值
if end_user_id:
try:
from app.services.memory_agent_service import get_end_user_connected_config
from app.core.memory.storage_services.forgetting_engine.config_utils import load_actr_config_from_db
# 获取用户关联的 config_id
connected_config = get_end_user_connected_config(end_user_id, db)
config_id = connected_config.get('memory_config_id')
if config_id:
# 从数据库加载配置
config = load_actr_config_from_db(db, config_id)
forgetting_threshold = config.get('forgetting_threshold', 0.3)
logger.debug(f"使用用户配置的遗忘阈值: {forgetting_threshold} (end_user_id={end_user_id}, config_id={config_id})")
else:
logger.debug(f"用户未关联配置,使用默认遗忘阈值: {forgetting_threshold} (end_user_id={end_user_id})")
except Exception as e:
logger.warning(f"获取用户遗忘阈值配置失败,使用默认值 {forgetting_threshold}: {str(e)}")
# 使用 MemoryBaseService 的共享方法获取特殊记忆类型的数量
episodic_count = await base_service.get_episodic_memory_count(end_user_id)
explicit_count = await base_service.get_explicit_memory_count(end_user_id)
emotion_count = await base_service.get_emotional_memory_count(end_user_id, statement_count)
forget_count = await base_service.get_forget_memory_count(end_user_id, forgetting_threshold)
# 按规则计算9种记忆类型的数量使用英文枚举作为key
memory_counts = {
"PERCEPTUAL_MEMORY": statement_count + entity_count, # 感知记忆
"WORKING_MEMORY": chunk_count + entity_count, # 工作记忆
"SHORT_TERM_MEMORY": chunk_count, # 短期记忆
"LONG_TERM_MEMORY": entity_count, # 长期记忆
"EXPLICIT_MEMORY": entity_count // 2, # 显性记忆 (1/2 entity)
"EXPLICIT_MEMORY": explicit_count, # 显性记忆(情景记忆 + 语义记忆)
"IMPLICIT_MEMORY": entity_count // 3, # 隐性记忆 (1/3 entity)
"EMOTIONAL_MEMORY": statement_count, # 情绪记忆
"EPISODIC_MEMORY": memory_summary_count # 情景记忆
"EMOTIONAL_MEMORY": emotion_count, # 情绪记忆(使用情绪标签统计)
"EPISODIC_MEMORY": episodic_count, # 情景记忆
"FORGET_MEMORY": forget_count # 遗忘记忆(激活值低于阈值)
}
# 计算总数