465 lines
17 KiB
Python
465 lines
17 KiB
Python
|
||
from app.repositories.neo4j.cypher_queries import (
|
||
Memory_Timeline_ExtractedEntity,
|
||
Memory_Timeline_MemorySummary,
|
||
Memory_Timeline_Statement,
|
||
Memory_Space_Emotion_Statement,
|
||
Memory_Space_Emotion_MemorySummary,
|
||
Memory_Space_Emotion_ExtractedEntity,
|
||
Memory_Space_Interaction_Statement,
|
||
Memory_Space_Interaction_ExtractedEntity,
|
||
Memory_Space_Interaction_Summary
|
||
)
|
||
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
|
||
from typing import Dict, List, Any, Optional
|
||
import logging
|
||
from neo4j.time import DateTime as Neo4jDateTime
|
||
import json
|
||
from datetime import datetime
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
class MemoryEntityService:
|
||
def __init__(self, id: str, table: str):
|
||
self.id = id
|
||
self.table = table
|
||
self.connector = Neo4jConnector()
|
||
|
||
|
||
|
||
async def get_timeline_memories_server(self):
|
||
"""
|
||
获取时间线记忆数据
|
||
|
||
Args:
|
||
id: 节点ID
|
||
table: 节点类型/标签
|
||
|
||
Returns:
|
||
Dict包含:
|
||
- success: 是否成功
|
||
- data: 时间线数据列表
|
||
- total: 数据总数
|
||
- error: 错误信息(如果有)
|
||
|
||
根据不同标签返回相应字段:
|
||
- MemorySummary: content字段
|
||
- Statement: statement字段
|
||
- ExtractedEntity: name字段
|
||
"""
|
||
try:
|
||
logger.info(f"获取时间线记忆数据 - ID: {self.id}, Table: {self.table}")
|
||
|
||
# 根据表类型选择查询
|
||
if self.table == 'Statement':
|
||
# Statement只需要输入ID,使用简化查询
|
||
results = await self.connector.execute_query(Memory_Timeline_Statement, id=self.id)
|
||
elif self.table == 'ExtractedEntity':
|
||
# ExtractedEntity类型查询
|
||
results = await self.connector.execute_query(Memory_Timeline_ExtractedEntity, id=self.id)
|
||
else:
|
||
# MemorySummary类型查询
|
||
results = await self.connector.execute_query(Memory_Timeline_MemorySummary, id=self.id)
|
||
|
||
# 记录查询结果的类型和内容用于调试
|
||
logger.info(f"时间线查询结果类型: {type(results)}, 长度: {len(results) if isinstance(results, list) else 'N/A'}")
|
||
|
||
# 处理查询结果
|
||
timeline_data = self._process_timeline_results(results)
|
||
|
||
logger.info(f"成功获取时间线记忆数据: 总计 {len(timeline_data.get('timelines_memory', []))} 条")
|
||
|
||
return {
|
||
'success': True,
|
||
'data': timeline_data,
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"获取时间线记忆数据失败: {str(e)}", exc_info=True)
|
||
return {
|
||
'success': False,
|
||
'error': str(e),
|
||
'data': {
|
||
"MemorySummary": [],
|
||
"Statement": [],
|
||
"ExtractedEntity": [],
|
||
"timelines_memory": []
|
||
},
|
||
'total': 0
|
||
}
|
||
def _process_timeline_results(self, results: List[Dict[str, Any]]) -> Dict[str, Any]:
|
||
"""
|
||
处理时间线查询结果
|
||
|
||
Args:
|
||
results: Neo4j查询结果
|
||
|
||
Returns:
|
||
处理后的时间线数据字典
|
||
"""
|
||
# 检查results是否为空或不是列表
|
||
if not results or not isinstance(results, list):
|
||
logger.warning(f"时间线查询结果为空或格式不正确: {type(results)}")
|
||
return {
|
||
"MemorySummary": [],
|
||
"Statement": [],
|
||
"ExtractedEntity": [],
|
||
"timelines_memory": []
|
||
}
|
||
|
||
memory_summary_list = []
|
||
statement_list = []
|
||
extracted_entity_list = []
|
||
|
||
for data in results:
|
||
# 检查data是否为字典类型
|
||
if not isinstance(data, dict):
|
||
logger.warning(f"跳过非字典类型的记录: {type(data)} - {data}")
|
||
continue
|
||
|
||
# 处理MemorySummary
|
||
summary = data.get('MemorySummary')
|
||
if summary is not None:
|
||
processed_summary = self._process_field_value(summary, "MemorySummary")
|
||
memory_summary_list.extend(processed_summary)
|
||
|
||
# 处理Statement
|
||
statement = data.get('statement')
|
||
if statement is not None:
|
||
processed_statement = self._process_field_value(statement, "Statement")
|
||
statement_list.extend(processed_statement)
|
||
|
||
# 处理ExtractedEntity
|
||
extracted_entity = data.get('ExtractedEntity')
|
||
if extracted_entity is not None:
|
||
processed_entity = self._process_field_value(extracted_entity, "ExtractedEntity")
|
||
extracted_entity_list.extend(processed_entity)
|
||
|
||
# 去重
|
||
memory_summary_list = list(set(memory_summary_list))
|
||
statement_list = list(set(statement_list))
|
||
extracted_entity_list = list(set(extracted_entity_list))
|
||
|
||
# 合并所有数据
|
||
all_timeline_data = memory_summary_list + statement_list + extracted_entity_list
|
||
|
||
result = {
|
||
"MemorySummary": memory_summary_list,
|
||
"Statement": statement_list,
|
||
"ExtractedEntity": extracted_entity_list,
|
||
"timelines_memory": all_timeline_data
|
||
}
|
||
|
||
logger.info(f"时间线数据处理完成: MemorySummary={len(memory_summary_list)}, Statement={len(statement_list)}, ExtractedEntity={len(extracted_entity_list)}")
|
||
|
||
return result
|
||
|
||
def _process_field_value(self, value: Any, field_name: str) -> List[str]:
|
||
"""
|
||
处理字段值,支持字符串、列表等类型
|
||
|
||
Args:
|
||
value: 字段值
|
||
field_name: 字段名称(用于日志)
|
||
|
||
Returns:
|
||
处理后的字符串列表
|
||
"""
|
||
processed_values = []
|
||
|
||
try:
|
||
if isinstance(value, list):
|
||
# 如果是列表,处理每个元素
|
||
for item in value:
|
||
if item is not None and str(item).strip() != '' and "MemorySummaryChunk" not in str(item):
|
||
processed_values.append(str(item))
|
||
elif isinstance(value, str):
|
||
# 如果是字符串,直接处理
|
||
if value.strip() != '' and "MemorySummaryChunk" not in value:
|
||
processed_values.append(value)
|
||
elif value is not None:
|
||
# 其他类型转换为字符串
|
||
str_value = str(value)
|
||
if str_value.strip() != '' and "MemorySummaryChunk" not in str_value:
|
||
processed_values.append(str_value)
|
||
except Exception as e:
|
||
logger.warning(f"处理字段 {field_name} 的值时出错: {e}, 值类型: {type(value)}, 值: {value}")
|
||
|
||
return processed_values
|
||
|
||
|
||
|
||
|
||
async def close(self):
|
||
"""关闭数据库连接"""
|
||
await self.connector.close()
|
||
|
||
|
||
|
||
class MemoryEmotion:
|
||
def __init__(self, id: str, table: str):
|
||
self.id = id
|
||
self.table = table
|
||
self.connector = Neo4jConnector()
|
||
|
||
def _convert_neo4j_types(self, obj: Any) -> Any:
|
||
"""
|
||
递归转换Neo4j特殊类型为可序列化的Python类型
|
||
"""
|
||
if isinstance(obj, Neo4jDateTime):
|
||
# 转换为用户友好的日期格式
|
||
return self._format_datetime(obj.iso_format())
|
||
elif hasattr(obj, '__class__') and 'neo4j' in str(obj.__class__):
|
||
if hasattr(obj, 'iso_format'):
|
||
return self._format_datetime(obj.iso_format())
|
||
elif hasattr(obj, '__str__'):
|
||
return str(obj)
|
||
else:
|
||
return repr(obj)
|
||
elif isinstance(obj, dict):
|
||
return {k: self._convert_neo4j_types(v) for k, v in obj.items()}
|
||
elif isinstance(obj, list):
|
||
return [self._convert_neo4j_types(item) for item in obj]
|
||
elif isinstance(obj, tuple):
|
||
return tuple(self._convert_neo4j_types(item) for item in obj)
|
||
else:
|
||
return obj
|
||
|
||
def _format_datetime(self, iso_string: str) -> str:
|
||
"""
|
||
将ISO格式的日期时间字符串转换为用户友好的格式
|
||
|
||
Args:
|
||
iso_string: ISO格式的日期时间字符串,如 "2026-01-07T13:40:33.679530"
|
||
|
||
Returns:
|
||
格式化后的日期时间字符串,如 "2026-01-07 13:40:33"
|
||
"""
|
||
try:
|
||
# 解析ISO格式的日期时间
|
||
dt = datetime.fromisoformat(iso_string.replace('Z', '+00:00'))
|
||
# 返回用户友好的格式:YYYY-MM-DD HH:MM:SS
|
||
return dt.strftime("%Y-%m-%d %H:%M:%S")
|
||
except (ValueError, AttributeError):
|
||
# 如果解析失败,返回原始字符串
|
||
return iso_string
|
||
|
||
async def get_emotion(self) -> Dict[str, Any]:
|
||
"""
|
||
获取情绪随时间变化数据
|
||
|
||
Returns:
|
||
包含情绪数据的字典
|
||
"""
|
||
try:
|
||
logger.info(f"获取情绪数据 - ID: {self.id}, Table: {self.table}")
|
||
|
||
if self.table == 'Statement':
|
||
results = await self.connector.execute_query(Memory_Space_Emotion_Statement, id=self.id)
|
||
elif self.table == 'ExtractedEntity':
|
||
results = await self.connector.execute_query(Memory_Space_Emotion_ExtractedEntity, id=self.id)
|
||
else:
|
||
# MemorySummary/Chunk类型查询
|
||
results = await self.connector.execute_query(Memory_Space_Emotion_MemorySummary, id=self.id)
|
||
|
||
# 处理查询结果
|
||
emotion_data = self._process_emotion_results(results)
|
||
|
||
# 转换Neo4j类型
|
||
final_data = self._convert_neo4j_types(emotion_data)
|
||
|
||
logger.info(f"成功获取 {len(final_data)} 条情绪数据")
|
||
|
||
return {
|
||
'success': True,
|
||
'data': final_data,
|
||
'total': len(final_data)
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"获取情绪数据失败: {str(e)}")
|
||
return {
|
||
'success': False,
|
||
'error': str(e),
|
||
'data': [],
|
||
'total': 0
|
||
}
|
||
|
||
def _process_emotion_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
||
"""
|
||
处理情绪查询结果
|
||
|
||
Args:
|
||
results: Neo4j查询结果
|
||
|
||
Returns:
|
||
处理后的情绪数据列表
|
||
"""
|
||
emotion_data = []
|
||
|
||
# 检查results是否为空或不是列表
|
||
if not results or not isinstance(results, list):
|
||
logger.warning(f"情绪查询结果为空或格式不正确: {type(results)}")
|
||
return emotion_data
|
||
|
||
for record in results:
|
||
# 检查record是否为字典类型
|
||
if not isinstance(record, dict):
|
||
logger.warning(f"跳过非字典类型的记录: {type(record)} - {record}")
|
||
continue
|
||
|
||
# 获取创建时间并格式化
|
||
created_at = record.get('created_at')
|
||
formatted_created_at = created_at
|
||
|
||
# 如果created_at是字符串格式,尝试格式化
|
||
if isinstance(created_at, str):
|
||
formatted_created_at = self._format_datetime(created_at)
|
||
|
||
emotion_type = record.get('emotion_type')
|
||
emotion_intensity = record.get('emotion_intensity')
|
||
|
||
if emotion_type is not None and emotion_intensity is not None:
|
||
# 只保留情绪相关的字段
|
||
emotion_record = {
|
||
'emotion_intensity': emotion_intensity,
|
||
'emotion_type': emotion_type,
|
||
'created_at': formatted_created_at
|
||
}
|
||
emotion_data.append(emotion_record)
|
||
|
||
return emotion_data
|
||
|
||
async def close(self):
|
||
"""关闭数据库连接"""
|
||
await self.connector.close()
|
||
|
||
|
||
class MemoryInteraction:
|
||
def __init__(self, id: str, table: str):
|
||
self.id = id
|
||
self.table = table
|
||
self.connector = Neo4jConnector()
|
||
|
||
def _convert_neo4j_types(self, obj: Any) -> Any:
|
||
"""
|
||
递归转换Neo4j特殊类型为可序列化的Python类型
|
||
"""
|
||
if isinstance(obj, Neo4jDateTime):
|
||
# 转换为用户友好的日期格式
|
||
return self._format_datetime(obj.iso_format())
|
||
elif hasattr(obj, '__class__') and 'neo4j' in str(obj.__class__):
|
||
if hasattr(obj, 'iso_format'):
|
||
return self._format_datetime(obj.iso_format())
|
||
elif hasattr(obj, '__str__'):
|
||
return str(obj)
|
||
else:
|
||
return repr(obj)
|
||
elif isinstance(obj, dict):
|
||
return {k: self._convert_neo4j_types(v) for k, v in obj.items()}
|
||
elif isinstance(obj, list):
|
||
return [self._convert_neo4j_types(item) for item in obj]
|
||
elif isinstance(obj, tuple):
|
||
return tuple(self._convert_neo4j_types(item) for item in obj)
|
||
else:
|
||
return obj
|
||
|
||
def _format_datetime(self, iso_string: str) -> str:
|
||
"""
|
||
将ISO格式的日期时间字符串转换为用户友好的格式
|
||
|
||
Args:
|
||
iso_string: ISO格式的日期时间字符串,如 "2026-01-07T13:40:33.679530"
|
||
|
||
Returns:
|
||
格式化后的日期时间字符串,如 "2026-01-07 13:40:33"
|
||
"""
|
||
try:
|
||
# 解析ISO格式的日期时间
|
||
dt = datetime.fromisoformat(iso_string.replace('Z', '+00:00'))
|
||
# 返回用户友好的格式:YYYY-MM-DD HH:MM:SS
|
||
return dt.strftime("%Y-%m-%d %H:%M:%S")
|
||
except (ValueError, AttributeError):
|
||
# 如果解析失败,返回原始字符串
|
||
return iso_string
|
||
|
||
async def get_interaction_frequency(self) -> Dict[str, Any]:
|
||
"""
|
||
获取交互频率数据
|
||
|
||
Returns:
|
||
包含交互数据的字典
|
||
"""
|
||
try:
|
||
logger.info(f"获取交互数据 - ID: {self.id}, Table: {self.table}")
|
||
|
||
if self.table == 'Statement':
|
||
results = await self.connector.execute_query(Memory_Space_Interaction_Statement, id=self.id)
|
||
elif self.table == 'ExtractedEntity':
|
||
results = await self.connector.execute_query(Memory_Space_Interaction_ExtractedEntity, id=self.id)
|
||
else:
|
||
# MemorySummary/Chunk类型查询
|
||
results = await self.connector.execute_query(Memory_Space_Interaction_Summary, id=self.id)
|
||
|
||
# 处理查询结果
|
||
interaction_data = self._process_interaction_results(results)
|
||
|
||
# 转换Neo4j类型
|
||
final_data = self._convert_neo4j_types(interaction_data)
|
||
|
||
logger.info(f"成功获取 {len(final_data)} 条交互数据")
|
||
|
||
return {
|
||
'success': True,
|
||
'data': final_data,
|
||
'total': len(final_data)
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"获取交互数据失败: {str(e)}")
|
||
return {
|
||
'success': False,
|
||
'error': str(e),
|
||
'data': [],
|
||
'total': 0
|
||
}
|
||
|
||
def _process_interaction_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
||
"""
|
||
处理交互查询结果
|
||
|
||
Args:
|
||
results: Neo4j查询结果
|
||
|
||
Returns:
|
||
处理后的交互数据列表
|
||
"""
|
||
interaction_data = []
|
||
|
||
# 检查results是否为空或不是列表
|
||
if not results or not isinstance(results, list):
|
||
logger.warning(f"交互查询结果为空或格式不正确: {type(results)}")
|
||
return interaction_data
|
||
|
||
for record in results:
|
||
# 检查record是否为字典类型
|
||
if not isinstance(record, dict):
|
||
logger.warning(f"跳过非字典类型的记录: {type(record)} - {record}")
|
||
continue
|
||
|
||
# 只保留交互相关的字段
|
||
name = record.get('name')
|
||
if name is not None:
|
||
interaction_record = {
|
||
'name': name,
|
||
'importance_score': record.get('importance_score', 0.0),
|
||
'interaction_count': record.get('interaction_count', 1) # 默认交互次数为1
|
||
}
|
||
interaction_data.append(interaction_record)
|
||
|
||
return interaction_data
|
||
|
||
async def close(self):
|
||
"""关闭数据库连接"""
|
||
await self.connector.close()
|