Fix/memory bug fix (#162)

* 图谱数据量限制数量去掉

* 图谱数据量限制数量去掉

* 图谱数据量限制数量去掉

* 用户详情优化

* 用户详情优化

* 用户详情优化

* 用户详情优化

* 用户详情优化

* 用户详情优化

* 读取的接口,去掉全局锁

* 输出数组

* 反思优化1.0(优化隐私输出、时间检索)

* 反思优化1.0(优化隐私输出、时间检索)

* 反思优化1.0(优化隐私输出、时间检索)

* 反思优化测试接口

* 反思优化测试接口

* 读取接口内层嵌套BUG修复

* 读取接口内层嵌套BUG修复

* 读取接口内层嵌套BUG修复

* 读取接口内层嵌套BUG修复

* 读取接口内层嵌套BUG修复

* 新增中翻英功能(记忆时间线)(用户摘要)(兴趣分布接口)(查询核心档案)(记忆洞察)

* 新增中翻英功能(记忆时间线)(用户摘要)(兴趣分布接口)(查询核心档案)(记忆洞察)-接口添加翻译字段

* 新增中翻英功能(记忆时间线)(用户摘要)(兴趣分布接口)(查询核心档案)(记忆洞察)-接口添加翻译字段

* 新增中翻英功能(记忆时间线)(用户摘要)(兴趣分布接口)(查询核心档案)(记忆洞察)-接口添加翻译字段

* 新增中翻英功能(记忆时间线)(用户摘要)(兴趣分布接口)(查询核心档案)(记忆洞察)-接口添加翻译字段

* 新增中翻英功能(记忆时间线)(用户摘要)(兴趣分布接口)(查询核心档案)(记忆洞察)-接口添加翻译字段
This commit is contained in:
lixinyue11
2026-01-21 11:33:52 +08:00
committed by GitHub
parent 21ec923f24
commit 6920deef63
11 changed files with 511 additions and 57 deletions

View File

@@ -9,7 +9,7 @@ from app.db import get_db
from app.dependencies import cur_workspace_access_guard, get_current_user
from app.models import ModelApiKey
from app.models.user_model import User
from app.repositories import knowledge_repository
from app.repositories import knowledge_repository, WorkspaceRepository
from app.schemas.memory_agent_schema import UserInput, Write_UserInput
from app.schemas.response_schema import ApiResponse
from app.services import task_service, workspace_service
@@ -616,8 +616,10 @@ async def get_knowledge_type_stats_api(
@router.get("/analytics/hot_memory_tags/by_user", response_model=ApiResponse)
async def get_hot_memory_tags_by_user_api(
end_user_id: Optional[str] = Query(None, description="用户ID可选"),
language_type: Optional[str] ="zh",
limit: int = Query(20, description="返回标签数量限制"),
current_user: User = Depends(get_current_user)
current_user: User = Depends(get_current_user),
db: Session=Depends(get_db),
):
"""
获取指定用户的热门记忆标签
@@ -628,10 +630,22 @@ async def get_hot_memory_tags_by_user_api(
...
]
"""
workspace_id=current_user.current_workspace_id
workspace_repo = WorkspaceRepository(db)
workspace_models = workspace_repo.get_workspace_models_configs(workspace_id)
if workspace_models:
model_id = workspace_models.get("llm", None)
else:
model_id = None
api_logger.info(f"Hot memory tags by user requested: end_user_id={end_user_id}")
try:
result = await memory_agent_service.get_hot_memory_tags_by_user(
end_user_id=end_user_id,
language_type=language_type,
model_id=model_id,
limit=limit
)
return success(data=result, msg="获取热门记忆标签成功")

View File

@@ -20,6 +20,7 @@ router = APIRouter(
@router.get("/short_term")
async def short_term_configs(
end_user_id: str,
language_type:Optional[str] = "zh",
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):

View File

@@ -12,6 +12,7 @@ from app.core.logging_config import get_api_logger
from app.core.response_utils import success, fail
from app.core.error_codes import BizCode
from app.core.api_key_utils import timestamp_to_datetime
from app.services.memory_base_service import Translation_English
from app.services.user_memory_service import (
UserMemoryService,
analytics_memory_types,
@@ -20,7 +21,7 @@ from app.services.user_memory_service import (
from app.services.memory_entity_relationship_service import MemoryEntityService,MemoryEmotion,MemoryInteraction
from app.schemas.response_schema import ApiResponse
from app.schemas.memory_storage_schema import GenerateCacheRequest
from app.repositories.workspace_repository import WorkspaceRepository
from app.schemas.end_user_schema import (
EndUserProfileResponse,
EndUserProfileUpdate,
@@ -44,6 +45,7 @@ router = APIRouter(
@router.get("/analytics/memory_insight/report", response_model=ApiResponse)
async def get_memory_insight_report_api(
end_user_id: str,
language_type: str = "zh",
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
) -> dict:
@@ -53,10 +55,18 @@ async def get_memory_insight_report_api(
此接口仅查询数据库中已缓存的记忆洞察数据,不执行生成操作。
如需生成新的洞察报告,请使用专门的生成接口。
"""
workspace_id = current_user.current_workspace_id
workspace_repo = WorkspaceRepository(db)
workspace_models = workspace_repo.get_workspace_models_configs(workspace_id)
if workspace_models:
model_id = workspace_models.get("llm", None)
else:
model_id = None
api_logger.info(f"记忆洞察报告查询请求: end_user_id={end_user_id}, user={current_user.username}")
try:
# 调用服务层获取缓存数据
result = await user_memory_service.get_cached_memory_insight(db, end_user_id)
result = await user_memory_service.get_cached_memory_insight(db, end_user_id,model_id,language_type)
if result["is_cached"]:
api_logger.info(f"成功返回缓存的记忆洞察报告: end_user_id={end_user_id}")
@@ -72,6 +82,7 @@ async def get_memory_insight_report_api(
@router.get("/analytics/user_summary", response_model=ApiResponse)
async def get_user_summary_api(
end_user_id: str,
language_type: str="zh",
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
) -> dict:
@@ -81,10 +92,18 @@ async def get_user_summary_api(
此接口仅查询数据库中已缓存的用户摘要数据,不执行生成操作。
如需生成新的用户摘要,请使用专门的生成接口。
"""
workspace_id = current_user.current_workspace_id
workspace_repo = WorkspaceRepository(db)
workspace_models = workspace_repo.get_workspace_models_configs(workspace_id)
if workspace_models:
model_id = workspace_models.get("llm", None)
else:
model_id = None
api_logger.info(f"用户摘要查询请求: end_user_id={end_user_id}, user={current_user.username}")
try:
# 调用服务层获取缓存数据
result = await user_memory_service.get_cached_user_summary(db, end_user_id)
result = await user_memory_service.get_cached_user_summary(db, end_user_id,model_id,language_type)
if result["is_cached"]:
api_logger.info(f"成功返回缓存的用户摘要: end_user_id={end_user_id}")
@@ -253,7 +272,6 @@ async def get_graph_data_api(
depth=depth,
center_node_id=center_node_id
)
# 检查是否有错误消息
if "message" in result and result["statistics"]["total_nodes"] == 0:
api_logger.warning(f"图数据查询返回空结果: {result.get('message')}")
@@ -278,7 +296,13 @@ async def get_end_user_profile(
db: Session = Depends(get_db),
) -> dict:
workspace_id = current_user.current_workspace_id
workspace_repo = WorkspaceRepository(db)
workspace_models = workspace_repo.get_workspace_models_configs(workspace_id)
if workspace_models:
model_id = workspace_models.get("llm", None)
else:
model_id = None
# 检查用户是否已选择工作空间
if workspace_id is None:
api_logger.warning(f"用户 {current_user.username} 尝试查询用户信息但未选择工作空间")
@@ -296,7 +320,6 @@ async def get_end_user_profile(
if not end_user:
api_logger.warning(f"终端用户不存在: end_user_id={end_user_id}")
return fail(BizCode.INVALID_PARAMETER, "终端用户不存在", f"end_user_id={end_user_id}")
# 构建响应数据
profile_data = EndUserProfileResponse(
id=end_user.id,
@@ -396,12 +419,21 @@ async def update_end_user_profile(
return fail(BizCode.INTERNAL_ERROR, "用户信息更新失败", str(e))
@router.get("/memory_space/timeline_memories", response_model=ApiResponse)
async def memory_space_timeline_of_shared_memories(id: str, label: str,
async def memory_space_timeline_of_shared_memories(id: str, label: str,language_type: str="zh",
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
workspace_id=current_user.current_workspace_id
workspace_repo = WorkspaceRepository(db)
workspace_models = workspace_repo.get_workspace_models_configs(workspace_id)
if workspace_models:
model_id = workspace_models.get("llm", None)
else:
model_id = None
MemoryEntity = MemoryEntityService(id, label)
timeline_memories_result = await MemoryEntity.get_timeline_memories_server()
timeline_memories_result = await MemoryEntity.get_timeline_memories_server(model_id, language_type)
return success(data=timeline_memories_result, msg="共同记忆时间线")
@router.get("/memory_space/relationship_evolution", response_model=ApiResponse)
async def memory_space_relationship_evolution(id: str, label: str,

View File

@@ -11,6 +11,7 @@ class EmotionTagsRequest(BaseModel):
start_date: Optional[str] = Field(None, description="开始日期ISO格式2024-01-01")
end_date: Optional[str] = Field(None, description="结束日期ISO格式2024-12-31")
limit: int = Field(10, ge=1, le=100, description="返回数量限制")
language_type: Optional[str] = Field("zh", description="语言类型zh/en")
class EmotionWordcloudRequest(BaseModel):
@@ -18,20 +19,24 @@ class EmotionWordcloudRequest(BaseModel):
group_id: str = Field(..., description="组ID")
emotion_type: Optional[str] = Field(None, description="情绪类型过滤joy/sadness/anger/fear/surprise/neutral")
limit: int = Field(50, ge=1, le=200, description="返回词语数量")
language_type: Optional[str] = Field("zh", description="语言类型zh/en")
class EmotionHealthRequest(BaseModel):
"""获取情绪健康指数请求"""
group_id: str = Field(..., description="组ID")
time_range: str = Field("30d", description="时间范围7d/30d/90d")
language_type: Optional[str] = Field("zh", description="语言类型zh/en")
class EmotionSuggestionsRequest(BaseModel):
"""获取个性化情绪建议请求"""
group_id: str = Field(..., description="组ID")
config_id: Optional[int] = Field(None, description="配置ID用于指定LLM模型")
language_type: Optional[str] = Field("zh", description="语言类型zh/en")
class EmotionGenerateSuggestionsRequest(BaseModel):
"""生成个性化情绪建议请求"""
end_user_id: str = Field(..., description="终端用户ID")
language_type: Optional[str] = Field("zh", description="语言类型zh/en")

View File

@@ -44,6 +44,7 @@ class EndUserProfileResponse(BaseModel):
updatetime_profile: Optional[datetime.datetime] = Field(description="核心档案信息最后更新时间", default=None)
class EndUserProfileUpdate(BaseModel):
"""终端用户基本信息更新请求模型"""
end_user_id: str = Field(description="终端用户ID")

View File

@@ -51,6 +51,7 @@ class EpisodicMemoryOverviewRequest(BaseModel):
"""情景记忆总览查询请求"""
end_user_id: str = Field(..., description="终端用户ID")
language_type: Optional[str] = Field("zh", description="语言类型zh/en")
time_range: str = Field(
default="all",
description="时间范围筛选可选值all, today, this_week, this_month"
@@ -70,3 +71,4 @@ class EpisodicMemoryDetailsRequest(BaseModel):
end_user_id: str = Field(..., description="终端用户ID")
summary_id: str = Field(..., description="情景记忆摘要ID")
language_type: Optional[str] = Field("zh", description="语言类型zh/en")

View File

@@ -1,15 +1,19 @@
"""
显性记忆的请求和响应模型
"""
from typing import Optional
from pydantic import BaseModel, Field
class ExplicitMemoryOverviewRequest(BaseModel):
"""显性记忆总览查询请求"""
end_user_id: str = Field(..., description="终端用户ID")
language_type: Optional[str] = Field("zh", description="语言类型zh/en")
class ExplicitMemoryDetailsRequest(BaseModel):
"""显性记忆详情查询请求"""
end_user_id: str = Field(..., description="终端用户ID")
memory_id: str = Field(..., description="记忆ID情景记忆或语义记忆的ID")
language_type: Optional[str] = Field("zh", description="语言类型zh/en")

View File

@@ -26,6 +26,7 @@ from app.db import get_db_context
from app.models.knowledge_model import Knowledge, KnowledgeType
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.schemas.memory_config_schema import ConfigurationError
from app.services.memory_base_service import Translation_English
from app.services.memory_config_service import MemoryConfigService
from app.services.memory_konwledges_server import (
write_rag,
@@ -692,7 +693,9 @@ class MemoryAgentService:
async def get_hot_memory_tags_by_user(
self,
end_user_id: Optional[str] = None,
limit: int = 20
limit: int = 20,
model_id: Optional[str] = None,
language_type: Optional[str] = "zh"
) -> List[Dict[str, Any]]:
"""
获取指定用户的热门记忆标签
@@ -710,7 +713,13 @@ class MemoryAgentService:
try:
# by_user=False 表示按 group_id 查询在Neo4j中group_id就是用户维度
tags = await get_hot_memory_tags(end_user_id, limit=limit, by_user=False)
payload = [{"name": t, "frequency": f} for t, f in tags]
payload=[]
for tag, freq in tags:
if language_type!="zh":
tag=await Translation_English(model_id, tag)
payload.append({"name": tag, "frequency": freq})
else:
payload.append({"name": tag, "frequency": freq})
return payload
except Exception as e:
logger.error(f"热门记忆标签查询失败: {e}")

View File

@@ -3,17 +3,268 @@ Memory Base Service
提供记忆服务的基础功能和共享辅助方法。
"""
import asyncio
import re
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
from app.core.logging_config import get_logger
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.services.emotion_analytics_service import EmotionAnalyticsService
from app.core.memory.llm_tools.openai_client import OpenAIClient
from app.core.models.base import RedBearModelConfig
from app.services.memory_config_service import MemoryConfigService
from app.db import get_db_context
logger = get_logger(__name__)
class TranslationResponse(BaseModel):
"""翻译响应模型"""
data: str
class MemoryTransService:
"""记忆翻译服务,提供中英文翻译功能"""
def __init__(self, llm_client=None, model_id: Optional[str] = None):
"""
初始化翻译服务
Args:
llm_client: LLM客户端实例或模型ID字符串可选
model_id: 模型ID用于初始化LLM客户端可选
Note:
- 如果llm_client是字符串会被当作model_id使用
- 如果同时提供llm_client和model_id优先使用llm_client
"""
# 处理llm_client参数如果是字符串当作model_id
if isinstance(llm_client, str):
self.model_id = llm_client
self.llm_client = None
else:
self.llm_client = llm_client
self.model_id = model_id
self._initialized = False
def _ensure_llm_client(self):
"""确保LLM客户端已初始化"""
if self._initialized:
return
if self.llm_client is None:
if self.model_id:
with get_db_context() as db:
config_service = MemoryConfigService(db)
model_config = config_service.get_model_config(self.model_id)
extra_params = {
"temperature": 0.2,
"max_tokens": 400,
"top_p": 0.8,
"stream": False,
}
self.llm_client = OpenAIClient(
RedBearModelConfig(
model_name=model_config.get("model_name"),
provider=model_config.get("provider"),
api_key=model_config.get("api_key"),
base_url=model_config.get("base_url"),
timeout=model_config.get("timeout", 30),
max_retries=model_config.get("max_retries", 3),
extra_params=extra_params
),
type_=model_config.get("type")
)
else:
raise ValueError("必须提供 llm_client 或 model_id 之一")
self._initialized = True
async def translate_to_english(self, text: str) -> str:
"""
将中文翻译为英文
Args:
text: 要翻译的中文文本
Returns:
翻译后的英文文本
"""
self._ensure_llm_client()
translation_messages = [
{
"role": "user",
"content": f"{text}\n\n中文翻译为英文,输出格式为{{\"data\":\"翻译后的内容\"}}"
}
]
try:
response = await self.llm_client.response_structured(
messages=translation_messages,
response_model=TranslationResponse
)
return response.data
except Exception as e:
logger.error(f"翻译失败: {str(e)}")
return text # 翻译失败时返回原文
async def is_english(self, text: str) -> bool:
"""
检查文本是否为英文
Args:
text: 要检查的文本(必须是字符串)
Returns:
True 如果文本主要是英文False 否则
Note:
- 只接受字符串类型
- 检查是否主要由英文字母和常见标点组成
- 允许数字、空格和常见标点符号
"""
if not isinstance(text, str):
raise TypeError(f"is_english 只接受字符串类型,收到: {type(text).__name__}")
if not text.strip():
return True # 空字符串视为英文
# 更宽松的英文检查:允许字母、数字、空格和常见标点
# 如果文本中英文字符占比超过 80%,认为是英文
english_chars = sum(1 for c in text if c.isascii() and (c.isalnum() or c.isspace() or c in '.,!?;:\'"()-'))
total_chars = len(text)
if total_chars == 0:
return True
return (english_chars / total_chars) >= 0.8
async def Translate(self, text: str, target_language: str = "en") -> str:
"""
通用翻译方法(保持向后兼容)
Args:
text: 要翻译的文本
target_language: 目标语言,"en"表示英文,"zh"表示中文
Returns:
翻译后的文本
"""
if target_language == "en":
return await self.translate_to_english(text)
else:
logger.warning(f"不支持的目标语言: {target_language},返回原文")
return text
# 测试翻译服务
async def Translation_English(modid, text, fields=None):
"""
将数据翻译为英文(支持字段级翻译)
Args:
modid: 模型ID
text: 要翻译的数据(可以是字符串、字典或列表)
fields: 需要翻译的字段列表(可选)
如果为None默认翻译: ['content', 'summary', 'statement', 'description',
'name', 'aliases', 'caption', 'emotion_keywords']
Returns:
翻译后的数据,保持原有结构
Note:
- 对于字符串:直接翻译
- 对于列表:递归处理每个元素,保持列表长度和索引不变
- 对于字典只翻译指定字段fields参数
- 对于其他类型:原样返回
"""
trans_service = MemoryTransService(modid)
# 处理字符串类型
if isinstance(text, str):
# 空字符串直接返回
if not text.strip():
return text
try:
is_eng = await trans_service.is_english(text)
if not is_eng:
english_result = await trans_service.Translate(text)
return english_result
return text
except Exception as e:
logger.warning(f"翻译字符串失败: {e}")
return text
# 处理列表类型
elif isinstance(text, list):
english_result = []
for item in text:
# 递归处理列表中的每个元素
if isinstance(item, str):
# 字符串元素:检查是否需要翻译
if not item.strip():
english_result.append(item)
continue
try:
is_eng = await trans_service.is_english(item)
if not is_eng:
translated = await trans_service.Translate(item)
english_result.append(translated)
else:
# 保留英文项,不改变列表长度
english_result.append(item)
except Exception as e:
logger.warning(f"翻译列表项失败: {e}")
english_result.append(item)
elif isinstance(item, dict):
# 字典元素:递归调用自己处理字典
translated_dict = await Translation_English(modid, item, fields)
english_result.append(translated_dict)
elif isinstance(item, list):
# 嵌套列表:递归处理
translated_list = await Translation_English(modid, item, fields)
english_result.append(translated_list)
else:
# 其他类型(数字、布尔值等):原样保留
english_result.append(item)
return english_result
# 处理字典类型
elif isinstance(text, dict):
# 确定要翻译的字段
if fields is None:
# 默认翻译字段
fields = [
'content', 'summary', 'statement', 'description',
'name', 'aliases', 'caption', 'emotion_keywords',
'text', 'title', 'label', 'type' # 添加常用字段
]
# 创建副本,避免修改原始数据
result = text.copy()
for field in fields:
if field in result and result[field] is not None:
# 递归翻译字段值(可能是字符串、列表或嵌套字典)
try:
result[field] = await Translation_English(modid, result[field], fields)
except Exception as e:
logger.warning(f"翻译字段 {field} 失败: {e}")
# 翻译失败时保留原值
continue
return result
# 其他类型数字、布尔值、None等原样返回
else:
return text
class MemoryBaseService:
"""记忆服务基类,提供共享的辅助方法"""
@@ -294,4 +545,4 @@ class MemoryBaseService:
except Exception as e:
logger.error(f"获取遗忘记忆数量时出错: {str(e)}", exc_info=True)
return 0
return 0

View File

@@ -16,6 +16,7 @@ import json
from datetime import datetime
from app.schemas.memory_episodic_schema import EmotionType
from app.services.memory_base_service import Translation_English
logger = logging.getLogger(__name__)
@@ -24,7 +25,7 @@ class MemoryEntityService:
self.id = id
self.table = table
self.connector = Neo4jConnector()
async def get_timeline_memories_server(self):
async def get_timeline_memories_server(self,model_id, language_type):
"""
获取时间线记忆数据
@@ -48,10 +49,10 @@ class MemoryEntityService:
logger.info(f"获取时间线记忆数据 - ID: {self.id}, Table: {self.table}")
# 根据表类型选择查询
if self.table == 'Statement':
if self.table == 'Statement':
# Statement只需要输入ID使用简化查询
results = await self.connector.execute_query(Memory_Timeline_Statement, id=self.id)
elif self.table == 'ExtractedEntity':
elif self.table == 'ExtractedEntity':
# ExtractedEntity类型查询
results = await self.connector.execute_query(Memory_Timeline_ExtractedEntity, id=self.id)
else:
@@ -62,7 +63,7 @@ class MemoryEntityService:
logger.info(f"时间线查询结果类型: {type(results)}, 长度: {len(results) if isinstance(results, list) else 'N/A'}")
# 处理查询结果
timeline_data = self._process_timeline_results(results)
timeline_data =await self._process_timeline_results(results, model_id, language_type)
logger.info(f"成功获取时间线记忆数据: 总计 {len(timeline_data.get('timelines_memory', []))}")
@@ -71,12 +72,14 @@ class MemoryEntityService:
except Exception as e:
logger.error(f"获取时间线记忆数据失败: {str(e)}", exc_info=True)
return str(e)
def _process_timeline_results(self, results: List[Dict[str, Any]]) -> Dict[str, Any]:
async def _process_timeline_results(self, results: List[Dict[str, Any]], model_id: str, language_type: str) -> Dict[str, Any]:
"""
处理时间线查询结果
Args:
results: Neo4j查询结果
model_id: 模型ID用于翻译
language_type: 语言类型 ('zh' 或其他)
Returns:
处理后的时间线数据字典
@@ -104,19 +107,19 @@ class MemoryEntityService:
# 处理MemorySummary
summary = data.get('MemorySummary')
if summary is not None:
processed_summary = self._process_field_value(summary, "MemorySummary")
processed_summary = await self._process_field_value(summary, "MemorySummary")
memory_summary_list.extend(processed_summary)
# 处理Statement
statement = data.get('statement')
if statement is not None:
processed_statement = self._process_field_value(statement, "Statement")
processed_statement = await self._process_field_value(statement, "Statement")
statement_list.extend(processed_statement)
# 处理ExtractedEntity
extracted_entity = data.get('ExtractedEntity')
if extracted_entity is not None:
processed_entity = self._process_field_value(extracted_entity, "ExtractedEntity")
processed_entity = await self._process_field_value(extracted_entity, "ExtractedEntity")
extracted_entity_list.extend(processed_entity)
# 去重 - 现在处理的是字典列表,需要更智能的去重
@@ -128,6 +131,8 @@ class MemoryEntityService:
all_timeline_data = memory_summary_list + statement_list
all_timeline_data = self._merge_same_text_items(all_timeline_data)
# 如果需要翻译(非中文),对整个结果进行翻译
result = {
"MemorySummary": memory_summary_list,
"Statement": statement_list,
@@ -233,7 +238,7 @@ class MemoryEntityService:
except Exception:
return False
def _process_field_value(self, value: Any, field_name: str) -> List[Dict[str, Any]]:
async def _process_field_value(self, value: Any, field_name: str) -> List[Dict[str, Any]]:
"""
处理字段值,支持字符串、列表等类型
@@ -251,13 +256,13 @@ class MemoryEntityService:
# 如果是列表,处理每个元素
for item in value:
if self._is_valid_item(item):
processed_item = self._process_single_item(item)
processed_item = await self._process_single_item(item)
if processed_item:
processed_values.append(processed_item)
elif isinstance(value, dict):
# 如果是字典,直接处理
if self._is_valid_item(value):
processed_item = self._process_single_item(value)
processed_item = await self._process_single_item(value)
if processed_item:
processed_values.append(processed_item)
elif isinstance(value, str):
@@ -304,7 +309,7 @@ class MemoryEntityService:
return (str(item).strip() != '' and
"MemorySummaryChunk" not in str(item))
def _process_single_item(self, item: Dict[str, Any]) -> Optional[Dict[str, Any]]:
async def _process_single_item(self, item: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""
处理单个项目
@@ -369,6 +374,117 @@ class MemoryEntityService:
logger.warning(f"转换时间格式失败: {e}, 原始值: {dt}")
return str(dt) if dt is not None else None
async def _translate_list(
self,
data_list: List[Dict[str, Any]],
model_id: str,
fields: List[str]
) -> List[Dict[str, Any]]:
"""
翻译列表中每个字典的指定字段(并发有限度以降低整体延迟)
Args:
data_list: 要翻译的字典列表
model_id: 模型ID
fields: 需要翻译的字段列表
Returns:
翻译后的字典列表
"""
# 空列表或无字段时直接返回
if not data_list or not fields:
return data_list
import asyncio
# 并发限制,避免一次性发起过多请求
# 可根据实际情况调整(建议 5-10
concurrency_limit = 5
semaphore = asyncio.Semaphore(concurrency_limit)
async def translate_single_field(
index: int,
field: str,
value: Any,
) -> Optional[tuple]:
"""
翻译单个字段并返回 (索引, 字段名, 翻译结果)
Returns:
(index, field, translated_value) 或 None如果跳过
"""
# 跳过空值
if value is None or value == "":
return None
# 统一转成字符串再翻译,防止非字符串类型导致错误
text = str(value)
try:
async with semaphore:
# 调用 Translation_English 进行翻译
# 注意Translation_English 的参数顺序是 (model_id, text)
translated = await Translation_English(model_id, text)
# 如果翻译结果为空,保留原值
if translated is None or translated == "":
return None
return index, field, translated
except Exception as e:
logger.warning(f"翻译字段 {field} (索引 {index}) 失败: {e}")
return None
# 构造所有需要翻译的任务
tasks = []
for idx, item in enumerate(data_list):
# 防御性检查:确保 item 是字典
if not isinstance(item, dict):
continue
for field in fields:
if field not in item:
continue
value = item.get(field)
# 对于 None 或空字符串的值,直接跳过,不创建任务
if value is None or value == "":
continue
tasks.append(
asyncio.create_task(
translate_single_field(idx, field, value)
)
)
# 如果没有需要翻译的任务,直接返回原列表
if not tasks:
return data_list
# 使用 gather 并发执行翻译任务(受 semaphore 限制)
# return_exceptions=True 可以防止单个任务失败导致整体失败
results = await asyncio.gather(*tasks, return_exceptions=True)
# 创建深拷贝以避免修改原始数据
translated_list = [item.copy() if isinstance(item, dict) else item for item in data_list]
# 将翻译结果回填到列表
for result in results:
# 跳过 None 结果和异常
if result is None or isinstance(result, Exception):
if isinstance(result, Exception):
logger.warning(f"翻译任务异常: {result}")
continue
idx, field, translated = result
# 防御性检查索引范围
if 0 <= idx < len(translated_list) and isinstance(translated_list[idx], dict):
translated_list[idx][field] = translated
return translated_list
@@ -426,15 +542,19 @@ class MemoryEmotion:
# 如果解析失败,返回原始字符串
return iso_string
async def get_emotion(self) -> Dict[str, Any]:
async def get_emotion(self, model_id: str = None, language_type: str = 'zh') -> Dict[str, Any]:
"""
获取情绪随时间变化数据
Args:
model_id: 模型ID用于翻译
language_type: 语言类型 ('zh' 或其他)
Returns:
包含情绪数据的字典
"""
try:
logger.info(f"获取情绪数据 - ID: {self.id}, Table: {self.table}")
logger.info(f"获取情绪数据 - ID: {self.id}, Table: {self.table}, language_type={language_type}")
if self.table == 'Statement':
results = await self.connector.execute_query(Memory_Space_Emotion_Statement, id=self.id)
@@ -450,6 +570,10 @@ class MemoryEmotion:
# 转换Neo4j类型
final_data = self._convert_neo4j_types(emotion_data)
# 如果需要翻译(非中文)
if language_type != 'zh' and model_id and final_data:
final_data = await self._translate_emotion_data(final_data, model_id)
logger.info(f"成功获取 {len(final_data)} 条情绪数据")
return final_data
@@ -590,16 +714,14 @@ class MemoryInteraction:
"""
try:
logger.info(f"获取交互数据 - ID: {self.id}, Table: {self.table}")
ori_data= await self.connector.execute_query(Memory_Space_Entity, id=self.id)
if ori_data!=[]:
# name = ori_data[0]['name']
group_id = ori_data[0]['group_id']
group_id = [i['group_id'] for i in ori_data][0]
Space_User = await self.connector.execute_query(Memory_Space_User, group_id=group_id)
if not Space_User:
return []
user_id=Space_User[0]['id']
results = await self.connector.execute_query(Memory_Space_Associative, id=self.id,user_id=user_id)

View File

@@ -18,7 +18,7 @@ from app.repositories.end_user_repository import EndUserRepository
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.schemas.memory_episodic_schema import EmotionSubject, EmotionType, type_mapping
from app.services.implicit_memory_service import ImplicitMemoryService
from app.services.memory_base_service import MemoryBaseService
from app.services.memory_base_service import MemoryBaseService, MemoryTransService, Translation_English
from app.services.memory_config_service import MemoryConfigService
from app.services.memory_perceptual_service import MemoryPerceptualService
from app.services.memory_short_service import ShortService
@@ -360,7 +360,9 @@ class UserMemoryService:
async def get_cached_memory_insight(
self,
db: Session,
end_user_id: str
end_user_id: str,
model_id: str,
language_type: str
) -> Dict[str, Any]:
"""
从数据库获取缓存的记忆洞察(四个维度)
@@ -419,11 +421,18 @@ class UserMemoryService:
key_findings_array = []
logger.info(f"成功获取 end_user_id {end_user_id} 的缓存记忆洞察(四维度)")
memory_insight=end_user.memory_insight
behavior_pattern=end_user.behavior_pattern
growth_trajectory=end_user.growth_trajectory
if language_type!='zh':
memory_insight=await Translation_English(model_id,memory_insight)
behavior_pattern=await Translation_English(model_id,behavior_pattern)
growth_trajectory=await Translation_English(model_id,growth_trajectory)
return {
"memory_insight": end_user.memory_insight, # 总体概述存储在 memory_insight
"behavior_pattern": end_user.behavior_pattern,
"memory_insight":memory_insight, # 总体概述存储在 memory_insight
"behavior_pattern":behavior_pattern,
"key_findings": key_findings_array, # 返回数组
"growth_trajectory": end_user.growth_trajectory,
"growth_trajectory": growth_trajectory,
"updated_at": self._datetime_to_timestamp(end_user.memory_insight_updated_at),
"is_cached": True
}
@@ -457,7 +466,9 @@ class UserMemoryService:
async def get_cached_user_summary(
self,
db: Session,
end_user_id: str
end_user_id: str,
model_id:str,
language_type:str="zh"
) -> Dict[str, Any]:
"""
从数据库获取缓存的用户摘要(四个部分)
@@ -481,7 +492,6 @@ class UserMemoryService:
user_uuid = uuid.UUID(end_user_id)
repo = EndUserRepository(db)
end_user = repo.get_by_id(user_uuid)
if not end_user:
logger.warning(f"未找到 end_user_id 为 {end_user_id} 的用户")
return {
@@ -495,20 +505,29 @@ class UserMemoryService:
}
# 检查是否有缓存数据(至少有一个字段不为空)
user_summary=end_user.user_summary
personality_traits=end_user.personality_traits
core_values=end_user.core_values
one_sentence_summary=end_user.one_sentence_summary
if language_type!='zh':
user_summary=await Translation_English(model_id, user_summary)
personality_traits = await Translation_English(model_id, personality_traits)
core_values = await Translation_English(model_id, core_values)
one_sentence_summary = await Translation_English(model_id, one_sentence_summary)
has_cache = any([
end_user.user_summary,
end_user.personality_traits,
end_user.core_values,
end_user.one_sentence_summary
user_summary,
personality_traits,
core_values,
one_sentence_summary
])
if has_cache:
logger.info(f"成功获取 end_user_id {end_user_id} 的缓存用户摘要")
return {
"user_summary": end_user.user_summary,
"personality": end_user.personality_traits,
"core_values": end_user.core_values,
"one_sentence": end_user.one_sentence_summary,
"user_summary": user_summary,
"personality": personality_traits,
"core_values":core_values,
"one_sentence": one_sentence_summary,
"updated_at": self._datetime_to_timestamp(end_user.user_summary_updated_at),
"is_cached": True
}
@@ -1367,7 +1386,6 @@ async def analytics_memory_types(
return memory_types
async def analytics_graph_data(
db: Session,
end_user_id: str,
@@ -1557,7 +1575,7 @@ async def analytics_graph_data(
f"成功获取图数据: end_user_id={end_user_id}, "
f"nodes={len(nodes)}, edges={len(edges)}"
)
return {
"nodes": nodes,
"edges": edges,
@@ -1606,11 +1624,7 @@ async def _extract_node_properties(label: str, properties: Dict[str, Any],node_
# 获取该节点类型的白名单字段
allowed_fields = field_whitelist.get(label, [])
# 如果没有定义白名单,返回空字典(或者可以返回所有字段)
# if not allowed_fields:
# # 对于未定义的节点类型,只返回基本字段
# allowed_fields = ["name", "created_at", "caption"]
count_neo4j=f"""MATCH (n)-[r]-(m) WHERE elementId(n) ="{node_id}" RETURN count(r) AS rel_count;"""
node_results = await (_neo4j_connector.execute_query(count_neo4j))
# 提取白名单中的字段
@@ -1618,13 +1632,12 @@ async def _extract_node_properties(label: str, properties: Dict[str, Any],node_
for field in allowed_fields:
if field in properties:
value = properties[field]
if str(field) == 'entity_type':
if str(field) == 'entity_type':
value=type_mapping.get(value,'')
if str(field)=="emotion_type":
value=EmotionType.EMOTION_MAPPING.get(value)
if str(field)=="emotion_subject":
if str(field)=="emotion_subject":
value=EmotionSubject.SUBJECT_MAPPING.get(value)
# 清理 Neo4j 特殊类型
filtered_props[field] = _clean_neo4j_value(value)
filtered_props['associative_memory']=[i['rel_count'] for i in node_results][0]
return filtered_props