[changes] Simultaneously create the "end_user_info" data to ensure that the interface modification takes effect immediately.

This commit is contained in:
lanceyq
2026-03-25 16:26:41 +08:00
parent 7c0743eb8f
commit 38c6c7f053
5 changed files with 438 additions and 93 deletions

View File

@@ -454,6 +454,7 @@ class UserMemoryService:
"""
try:
from app.models.end_user_info_model import EndUserInfo
from app.models.end_user_model import EndUser
from app.core.api_key_utils import datetime_to_timestamp
# 转换为UUID并查询
@@ -471,6 +472,12 @@ class UserMemoryService:
# 定义允许更新的字段白名单
allowed_fields = {'other_name', 'aliases', 'meta_data'}
# 检查是否更新了 aliases 字段
aliases_updated = 'aliases' in update_data and update_data['aliases'] != end_user_info_record.aliases
# 检查是否更新了 other_name 字段
other_name_updated = 'other_name' in update_data and update_data['other_name'] != end_user_info_record.other_name
# 更新字段(仅允许白名单中的字段)
for field, value in update_data.items():
if field in allowed_fields:
@@ -479,10 +486,30 @@ class UserMemoryService:
# 更新时间戳
end_user_info_record.updated_at = datetime.now()
# 如果 other_name 被更新,同步更新 end_user 表
if other_name_updated:
end_user_record = db.query(EndUser).filter(EndUser.id == user_uuid).first()
if end_user_record:
end_user_record.other_name = update_data['other_name']
end_user_record.updated_at = datetime.now()
logger.info(f"同步更新 end_user 表的 other_name: end_user_id={end_user_id}, other_name={update_data['other_name']}")
else:
logger.warning(f"未找到对应的 end_user 记录: end_user_id={end_user_id}")
# 提交更改
db.commit()
db.refresh(end_user_info_record)
# 如果 aliases 被更新,同步到 Neo4j
if aliases_updated:
try:
import asyncio
asyncio.create_task(self._sync_aliases_to_neo4j(end_user_id, update_data['aliases']))
logger.info(f"已触发 aliases 同步到 Neo4j: end_user_id={end_user_id}, aliases={update_data['aliases']}")
except Exception as sync_error:
logger.error(f"触发同步 aliases 到 Neo4j 失败: {sync_error}", exc_info=True)
# 不影响主流程,只记录错误
# 构建响应数据(转换时间为毫秒时间戳)
response_data = {
"end_user_info_id": str(end_user_info_record.id),
@@ -518,6 +545,42 @@ class UserMemoryService:
"error": str(e)
}
async def _sync_aliases_to_neo4j(self, end_user_id: str, aliases: List[str]) -> None:
"""
将 aliases 同步到 Neo4j 中的用户实体
Args:
end_user_id: 终端用户ID
aliases: 别名列表
"""
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
# Cypher 查询:更新用户实体的 aliases
cypher_query = """
MATCH (e:ExtractedEntity)
WHERE e.end_user_id = $end_user_id
AND e.name IN ['用户', '', 'User', 'I']
SET e.aliases = $aliases
RETURN e.id AS entity_id, e.name AS entity_name, e.aliases AS updated_aliases
"""
connector = Neo4jConnector()
try:
result = await connector.execute_query(
cypher_query,
end_user_id=end_user_id,
aliases=aliases
)
if result:
logger.info(f"成功同步 aliases 到 Neo4j: end_user_id={end_user_id}, 更新了 {len(result)} 个实体节点")
else:
logger.warning(f"未找到需要更新的用户实体节点: end_user_id={end_user_id}")
except Exception as e:
logger.error(f"同步 aliases 到 Neo4j 失败: {e}", exc_info=True)
raise
async def get_cached_memory_insight(
self,
db: Session,