新增中翻英功能(记忆时间线)(用户摘要)(兴趣分布接口)(查询核心档案)(记忆洞察)

This commit is contained in:
lixinyue
2026-01-20 20:12:14 +08:00
parent 575190a96d
commit 29852ff0a5
7 changed files with 352 additions and 58 deletions

View File

@@ -16,6 +16,7 @@ import json
from datetime import datetime
from app.schemas.memory_episodic_schema import EmotionType
from app.services.memory_base_service import Translation_English
logger = logging.getLogger(__name__)
@@ -24,7 +25,7 @@ class MemoryEntityService:
self.id = id
self.table = table
self.connector = Neo4jConnector()
async def get_timeline_memories_server(self):
async def get_timeline_memories_server(self,model_id, language_type):
"""
获取时间线记忆数据
@@ -48,10 +49,10 @@ class MemoryEntityService:
logger.info(f"获取时间线记忆数据 - ID: {self.id}, Table: {self.table}")
# 根据表类型选择查询
if self.table == 'Statement':
if self.table == 'Statement':
# Statement只需要输入ID使用简化查询
results = await self.connector.execute_query(Memory_Timeline_Statement, id=self.id)
elif self.table == 'ExtractedEntity':
elif self.table == 'ExtractedEntity':
# ExtractedEntity类型查询
results = await self.connector.execute_query(Memory_Timeline_ExtractedEntity, id=self.id)
else:
@@ -62,7 +63,7 @@ class MemoryEntityService:
logger.info(f"时间线查询结果类型: {type(results)}, 长度: {len(results) if isinstance(results, list) else 'N/A'}")
# 处理查询结果
timeline_data = self._process_timeline_results(results)
timeline_data =await self._process_timeline_results(results, model_id, language_type)
logger.info(f"成功获取时间线记忆数据: 总计 {len(timeline_data.get('timelines_memory', []))}")
@@ -71,12 +72,14 @@ class MemoryEntityService:
except Exception as e:
logger.error(f"获取时间线记忆数据失败: {str(e)}", exc_info=True)
return str(e)
def _process_timeline_results(self, results: List[Dict[str, Any]]) -> Dict[str, Any]:
async def _process_timeline_results(self, results: List[Dict[str, Any]], model_id: str, language_type: str) -> Dict[str, Any]:
"""
处理时间线查询结果
Args:
results: Neo4j查询结果
model_id: 模型ID用于翻译
language_type: 语言类型 ('zh' 或其他)
Returns:
处理后的时间线数据字典
@@ -104,19 +107,19 @@ class MemoryEntityService:
# 处理MemorySummary
summary = data.get('MemorySummary')
if summary is not None:
processed_summary = self._process_field_value(summary, "MemorySummary")
processed_summary = await self._process_field_value(summary, "MemorySummary")
memory_summary_list.extend(processed_summary)
# 处理Statement
statement = data.get('statement')
if statement is not None:
processed_statement = self._process_field_value(statement, "Statement")
processed_statement = await self._process_field_value(statement, "Statement")
statement_list.extend(processed_statement)
# 处理ExtractedEntity
extracted_entity = data.get('ExtractedEntity')
if extracted_entity is not None:
processed_entity = self._process_field_value(extracted_entity, "ExtractedEntity")
processed_entity = await self._process_field_value(extracted_entity, "ExtractedEntity")
extracted_entity_list.extend(processed_entity)
# 去重 - 现在处理的是字典列表,需要更智能的去重
@@ -128,6 +131,21 @@ class MemoryEntityService:
all_timeline_data = memory_summary_list + statement_list
all_timeline_data = self._merge_same_text_items(all_timeline_data)
# 如果需要翻译(非中文),对整个结果进行翻译
if language_type != 'zh':
# 定义需要翻译的字段
fields_to_translate = ['text', 'type']
# 翻译各个列表
if memory_summary_list:
memory_summary_list = await self._translate_list(memory_summary_list, model_id, fields_to_translate)
if statement_list:
statement_list = await self._translate_list(statement_list, model_id, fields_to_translate)
if extracted_entity_list:
extracted_entity_list = await self._translate_list(extracted_entity_list, model_id, fields_to_translate)
if all_timeline_data:
all_timeline_data = await self._translate_list(all_timeline_data, model_id, fields_to_translate)
result = {
"MemorySummary": memory_summary_list,
"Statement": statement_list,
@@ -233,7 +251,7 @@ class MemoryEntityService:
except Exception:
return False
def _process_field_value(self, value: Any, field_name: str) -> List[Dict[str, Any]]:
async def _process_field_value(self, value: Any, field_name: str) -> List[Dict[str, Any]]:
"""
处理字段值,支持字符串、列表等类型
@@ -251,13 +269,13 @@ class MemoryEntityService:
# 如果是列表,处理每个元素
for item in value:
if self._is_valid_item(item):
processed_item = self._process_single_item(item)
processed_item = await self._process_single_item(item)
if processed_item:
processed_values.append(processed_item)
elif isinstance(value, dict):
# 如果是字典,直接处理
if self._is_valid_item(value):
processed_item = self._process_single_item(value)
processed_item = await self._process_single_item(value)
if processed_item:
processed_values.append(processed_item)
elif isinstance(value, str):
@@ -304,7 +322,7 @@ class MemoryEntityService:
return (str(item).strip() != '' and
"MemorySummaryChunk" not in str(item))
def _process_single_item(self, item: Dict[str, Any]) -> Optional[Dict[str, Any]]:
async def _process_single_item(self, item: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""
处理单个项目
@@ -369,6 +387,42 @@ class MemoryEntityService:
logger.warning(f"转换时间格式失败: {e}, 原始值: {dt}")
return str(dt) if dt is not None else None
async def _translate_list(self, data_list: List[Dict[str, Any]], model_id: str, fields: List[str]) -> List[Dict[str, Any]]:
"""
翻译列表中每个字典的指定字段
Args:
data_list: 要翻译的字典列表
model_id: 模型ID
fields: 需要翻译的字段列表
Returns:
翻译后的字典列表
"""
if not data_list:
return data_list
translated_list = []
for item in data_list:
if not isinstance(item, dict):
translated_list.append(item)
continue
translated_item = item.copy()
for field in fields:
if field in translated_item and translated_item[field]:
try:
# 调用Translation_English翻译单个字段
translated_value = await Translation_English(model_id, translated_item[field])
if translated_value:
translated_item[field] = translated_value
except Exception as e:
logger.warning(f"翻译字段 {field} 失败: {e}")
translated_list.append(translated_item)
return translated_list
@@ -426,15 +480,19 @@ class MemoryEmotion:
# 如果解析失败,返回原始字符串
return iso_string
async def get_emotion(self) -> Dict[str, Any]:
async def get_emotion(self, model_id: str = None, language_type: str = 'zh') -> Dict[str, Any]:
"""
获取情绪随时间变化数据
Args:
model_id: 模型ID用于翻译
language_type: 语言类型 ('zh' 或其他)
Returns:
包含情绪数据的字典
"""
try:
logger.info(f"获取情绪数据 - ID: {self.id}, Table: {self.table}")
logger.info(f"获取情绪数据 - ID: {self.id}, Table: {self.table}, language_type={language_type}")
if self.table == 'Statement':
results = await self.connector.execute_query(Memory_Space_Emotion_Statement, id=self.id)
@@ -450,6 +508,10 @@ class MemoryEmotion:
# 转换Neo4j类型
final_data = self._convert_neo4j_types(emotion_data)
# 如果需要翻译(非中文)
if language_type != 'zh' and model_id and final_data:
final_data = await self._translate_emotion_data(final_data, model_id)
logger.info(f"成功获取 {len(final_data)} 条情绪数据")
return final_data
@@ -590,16 +652,14 @@ class MemoryInteraction:
"""
try:
logger.info(f"获取交互数据 - ID: {self.id}, Table: {self.table}")
ori_data= await self.connector.execute_query(Memory_Space_Entity, id=self.id)
if ori_data!=[]:
# name = ori_data[0]['name']
group_id = ori_data[0]['group_id']
group_id = [i['group_id'] for i in ori_data][0]
Space_User = await self.connector.execute_query(Memory_Space_User, group_id=group_id)
if not Space_User:
return []
user_id=Space_User[0]['id']
results = await self.connector.execute_query(Memory_Space_Associative, id=self.id,user_id=user_id)