Files
MemoryBear/api/app/services/memory_entity_relationship_service.py
lixinyue11 009ceefa30 新增记忆空间详情 (#58)
* 新增记忆空间详情

* 新增记忆空间详情
2026-01-08 17:51:49 +08:00

465 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from app.repositories.neo4j.cypher_queries import (
Memory_Timeline_ExtractedEntity,
Memory_Timeline_MemorySummary,
Memory_Timeline_Statement,
Memory_Space_Emotion_Statement,
Memory_Space_Emotion_MemorySummary,
Memory_Space_Emotion_ExtractedEntity,
Memory_Space_Interaction_Statement,
Memory_Space_Interaction_ExtractedEntity,
Memory_Space_Interaction_Summary
)
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from typing import Dict, List, Any, Optional
import logging
from neo4j.time import DateTime as Neo4jDateTime
import json
from datetime import datetime
logger = logging.getLogger(__name__)
class MemoryEntityService:
def __init__(self, id: str, table: str):
self.id = id
self.table = table
self.connector = Neo4jConnector()
async def get_timeline_memories_server(self):
"""
获取时间线记忆数据
Args:
id: 节点ID
table: 节点类型/标签
Returns:
Dict包含
- success: 是否成功
- data: 时间线数据列表
- total: 数据总数
- error: 错误信息(如果有)
根据不同标签返回相应字段:
- MemorySummary: content字段
- Statement: statement字段
- ExtractedEntity: name字段
"""
try:
logger.info(f"获取时间线记忆数据 - ID: {self.id}, Table: {self.table}")
# 根据表类型选择查询
if self.table == 'Statement':
# Statement只需要输入ID使用简化查询
results = await self.connector.execute_query(Memory_Timeline_Statement, id=self.id)
elif self.table == 'ExtractedEntity':
# ExtractedEntity类型查询
results = await self.connector.execute_query(Memory_Timeline_ExtractedEntity, id=self.id)
else:
# MemorySummary类型查询
results = await self.connector.execute_query(Memory_Timeline_MemorySummary, id=self.id)
# 记录查询结果的类型和内容用于调试
logger.info(f"时间线查询结果类型: {type(results)}, 长度: {len(results) if isinstance(results, list) else 'N/A'}")
# 处理查询结果
timeline_data = self._process_timeline_results(results)
logger.info(f"成功获取时间线记忆数据: 总计 {len(timeline_data.get('timelines_memory', []))}")
return {
'success': True,
'data': timeline_data,
}
except Exception as e:
logger.error(f"获取时间线记忆数据失败: {str(e)}", exc_info=True)
return {
'success': False,
'error': str(e),
'data': {
"MemorySummary": [],
"Statement": [],
"ExtractedEntity": [],
"timelines_memory": []
},
'total': 0
}
def _process_timeline_results(self, results: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
处理时间线查询结果
Args:
results: Neo4j查询结果
Returns:
处理后的时间线数据字典
"""
# 检查results是否为空或不是列表
if not results or not isinstance(results, list):
logger.warning(f"时间线查询结果为空或格式不正确: {type(results)}")
return {
"MemorySummary": [],
"Statement": [],
"ExtractedEntity": [],
"timelines_memory": []
}
memory_summary_list = []
statement_list = []
extracted_entity_list = []
for data in results:
# 检查data是否为字典类型
if not isinstance(data, dict):
logger.warning(f"跳过非字典类型的记录: {type(data)} - {data}")
continue
# 处理MemorySummary
summary = data.get('MemorySummary')
if summary is not None:
processed_summary = self._process_field_value(summary, "MemorySummary")
memory_summary_list.extend(processed_summary)
# 处理Statement
statement = data.get('statement')
if statement is not None:
processed_statement = self._process_field_value(statement, "Statement")
statement_list.extend(processed_statement)
# 处理ExtractedEntity
extracted_entity = data.get('ExtractedEntity')
if extracted_entity is not None:
processed_entity = self._process_field_value(extracted_entity, "ExtractedEntity")
extracted_entity_list.extend(processed_entity)
# 去重
memory_summary_list = list(set(memory_summary_list))
statement_list = list(set(statement_list))
extracted_entity_list = list(set(extracted_entity_list))
# 合并所有数据
all_timeline_data = memory_summary_list + statement_list + extracted_entity_list
result = {
"MemorySummary": memory_summary_list,
"Statement": statement_list,
"ExtractedEntity": extracted_entity_list,
"timelines_memory": all_timeline_data
}
logger.info(f"时间线数据处理完成: MemorySummary={len(memory_summary_list)}, Statement={len(statement_list)}, ExtractedEntity={len(extracted_entity_list)}")
return result
def _process_field_value(self, value: Any, field_name: str) -> List[str]:
"""
处理字段值,支持字符串、列表等类型
Args:
value: 字段值
field_name: 字段名称(用于日志)
Returns:
处理后的字符串列表
"""
processed_values = []
try:
if isinstance(value, list):
# 如果是列表,处理每个元素
for item in value:
if item is not None and str(item).strip() != '' and "MemorySummaryChunk" not in str(item):
processed_values.append(str(item))
elif isinstance(value, str):
# 如果是字符串,直接处理
if value.strip() != '' and "MemorySummaryChunk" not in value:
processed_values.append(value)
elif value is not None:
# 其他类型转换为字符串
str_value = str(value)
if str_value.strip() != '' and "MemorySummaryChunk" not in str_value:
processed_values.append(str_value)
except Exception as e:
logger.warning(f"处理字段 {field_name} 的值时出错: {e}, 值类型: {type(value)}, 值: {value}")
return processed_values
async def close(self):
"""关闭数据库连接"""
await self.connector.close()
class MemoryEmotion:
def __init__(self, id: str, table: str):
self.id = id
self.table = table
self.connector = Neo4jConnector()
def _convert_neo4j_types(self, obj: Any) -> Any:
"""
递归转换Neo4j特殊类型为可序列化的Python类型
"""
if isinstance(obj, Neo4jDateTime):
# 转换为用户友好的日期格式
return self._format_datetime(obj.iso_format())
elif hasattr(obj, '__class__') and 'neo4j' in str(obj.__class__):
if hasattr(obj, 'iso_format'):
return self._format_datetime(obj.iso_format())
elif hasattr(obj, '__str__'):
return str(obj)
else:
return repr(obj)
elif isinstance(obj, dict):
return {k: self._convert_neo4j_types(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [self._convert_neo4j_types(item) for item in obj]
elif isinstance(obj, tuple):
return tuple(self._convert_neo4j_types(item) for item in obj)
else:
return obj
def _format_datetime(self, iso_string: str) -> str:
"""
将ISO格式的日期时间字符串转换为用户友好的格式
Args:
iso_string: ISO格式的日期时间字符串"2026-01-07T13:40:33.679530"
Returns:
格式化后的日期时间字符串,如 "2026-01-07 13:40:33"
"""
try:
# 解析ISO格式的日期时间
dt = datetime.fromisoformat(iso_string.replace('Z', '+00:00'))
# 返回用户友好的格式YYYY-MM-DD HH:MM:SS
return dt.strftime("%Y-%m-%d %H:%M:%S")
except (ValueError, AttributeError):
# 如果解析失败,返回原始字符串
return iso_string
async def get_emotion(self) -> Dict[str, Any]:
"""
获取情绪随时间变化数据
Returns:
包含情绪数据的字典
"""
try:
logger.info(f"获取情绪数据 - ID: {self.id}, Table: {self.table}")
if self.table == 'Statement':
results = await self.connector.execute_query(Memory_Space_Emotion_Statement, id=self.id)
elif self.table == 'ExtractedEntity':
results = await self.connector.execute_query(Memory_Space_Emotion_ExtractedEntity, id=self.id)
else:
# MemorySummary/Chunk类型查询
results = await self.connector.execute_query(Memory_Space_Emotion_MemorySummary, id=self.id)
# 处理查询结果
emotion_data = self._process_emotion_results(results)
# 转换Neo4j类型
final_data = self._convert_neo4j_types(emotion_data)
logger.info(f"成功获取 {len(final_data)} 条情绪数据")
return {
'success': True,
'data': final_data,
'total': len(final_data)
}
except Exception as e:
logger.error(f"获取情绪数据失败: {str(e)}")
return {
'success': False,
'error': str(e),
'data': [],
'total': 0
}
def _process_emotion_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
处理情绪查询结果
Args:
results: Neo4j查询结果
Returns:
处理后的情绪数据列表
"""
emotion_data = []
# 检查results是否为空或不是列表
if not results or not isinstance(results, list):
logger.warning(f"情绪查询结果为空或格式不正确: {type(results)}")
return emotion_data
for record in results:
# 检查record是否为字典类型
if not isinstance(record, dict):
logger.warning(f"跳过非字典类型的记录: {type(record)} - {record}")
continue
# 获取创建时间并格式化
created_at = record.get('created_at')
formatted_created_at = created_at
# 如果created_at是字符串格式尝试格式化
if isinstance(created_at, str):
formatted_created_at = self._format_datetime(created_at)
emotion_type = record.get('emotion_type')
emotion_intensity = record.get('emotion_intensity')
if emotion_type is not None and emotion_intensity is not None:
# 只保留情绪相关的字段
emotion_record = {
'emotion_intensity': emotion_intensity,
'emotion_type': emotion_type,
'created_at': formatted_created_at
}
emotion_data.append(emotion_record)
return emotion_data
async def close(self):
"""关闭数据库连接"""
await self.connector.close()
class MemoryInteraction:
def __init__(self, id: str, table: str):
self.id = id
self.table = table
self.connector = Neo4jConnector()
def _convert_neo4j_types(self, obj: Any) -> Any:
"""
递归转换Neo4j特殊类型为可序列化的Python类型
"""
if isinstance(obj, Neo4jDateTime):
# 转换为用户友好的日期格式
return self._format_datetime(obj.iso_format())
elif hasattr(obj, '__class__') and 'neo4j' in str(obj.__class__):
if hasattr(obj, 'iso_format'):
return self._format_datetime(obj.iso_format())
elif hasattr(obj, '__str__'):
return str(obj)
else:
return repr(obj)
elif isinstance(obj, dict):
return {k: self._convert_neo4j_types(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [self._convert_neo4j_types(item) for item in obj]
elif isinstance(obj, tuple):
return tuple(self._convert_neo4j_types(item) for item in obj)
else:
return obj
def _format_datetime(self, iso_string: str) -> str:
"""
将ISO格式的日期时间字符串转换为用户友好的格式
Args:
iso_string: ISO格式的日期时间字符串"2026-01-07T13:40:33.679530"
Returns:
格式化后的日期时间字符串,如 "2026-01-07 13:40:33"
"""
try:
# 解析ISO格式的日期时间
dt = datetime.fromisoformat(iso_string.replace('Z', '+00:00'))
# 返回用户友好的格式YYYY-MM-DD HH:MM:SS
return dt.strftime("%Y-%m-%d %H:%M:%S")
except (ValueError, AttributeError):
# 如果解析失败,返回原始字符串
return iso_string
async def get_interaction_frequency(self) -> Dict[str, Any]:
"""
获取交互频率数据
Returns:
包含交互数据的字典
"""
try:
logger.info(f"获取交互数据 - ID: {self.id}, Table: {self.table}")
if self.table == 'Statement':
results = await self.connector.execute_query(Memory_Space_Interaction_Statement, id=self.id)
elif self.table == 'ExtractedEntity':
results = await self.connector.execute_query(Memory_Space_Interaction_ExtractedEntity, id=self.id)
else:
# MemorySummary/Chunk类型查询
results = await self.connector.execute_query(Memory_Space_Interaction_Summary, id=self.id)
# 处理查询结果
interaction_data = self._process_interaction_results(results)
# 转换Neo4j类型
final_data = self._convert_neo4j_types(interaction_data)
logger.info(f"成功获取 {len(final_data)} 条交互数据")
return {
'success': True,
'data': final_data,
'total': len(final_data)
}
except Exception as e:
logger.error(f"获取交互数据失败: {str(e)}")
return {
'success': False,
'error': str(e),
'data': [],
'total': 0
}
def _process_interaction_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
处理交互查询结果
Args:
results: Neo4j查询结果
Returns:
处理后的交互数据列表
"""
interaction_data = []
# 检查results是否为空或不是列表
if not results or not isinstance(results, list):
logger.warning(f"交互查询结果为空或格式不正确: {type(results)}")
return interaction_data
for record in results:
# 检查record是否为字典类型
if not isinstance(record, dict):
logger.warning(f"跳过非字典类型的记录: {type(record)} - {record}")
continue
# 只保留交互相关的字段
name = record.get('name')
if name is not None:
interaction_record = {
'name': name,
'importance_score': record.get('importance_score', 0.0),
'interaction_count': record.get('interaction_count', 1) # 默认交互次数为1
}
interaction_data.append(interaction_record)
return interaction_data
async def close(self):
"""关闭数据库连接"""
await self.connector.close()