From 9470dd2f1e882d218da46d09b4cb717a30591a62 Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Mon, 13 Apr 2026 18:47:56 +0800 Subject: [PATCH] refactor(memory): extract shared MemorySummary count query and replace magic number MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Move duplicated Neo4j MemorySummary count query into MemoryBaseService.get_valid_memory_summary_count() - Introduce MIN_MEMORY_SUMMARY_COUNT constant to replace hardcoded 5 - Fix import ordering in implicit_emotions_storage_repository - Use UTC consistently for date calculations (remove CST offset, datetime.now → datetime.utcnow) --- .../implicit_emotions_storage_repository.py | 20 +++++----- api/app/services/implicit_memory_service.py | 17 ++++----- api/app/services/memory_base_service.py | 38 +++++++++++++++++++ api/app/services/user_memory_service.py | 20 +++------- 4 files changed, 60 insertions(+), 35 deletions(-) diff --git a/api/app/repositories/implicit_emotions_storage_repository.py b/api/app/repositories/implicit_emotions_storage_repository.py index b6c40b40..b665924d 100644 --- a/api/app/repositories/implicit_emotions_storage_repository.py +++ b/api/app/repositories/implicit_emotions_storage_repository.py @@ -5,16 +5,9 @@ Implicit Emotions Storage Repository 事务由调用方控制,仓储层只使用 flush/refresh """ import logging -from datetime import date, datetime, timezone +from datetime import datetime, timedelta, timezone from typing import Generator, Optional - -class TimeFilterUnavailableError(Exception): - """redis_client 不可用,无法执行时间轴筛选。 - - 调用方捕获此异常后可选择回退到 get_all_user_ids 进行全量处理。 - """ - import redis from sqlalchemy import exists, not_, select from sqlalchemy.orm import Session @@ -25,6 +18,13 @@ from app.models.implicit_emotions_storage_model import ImplicitEmotionsStorage logger = logging.getLogger(__name__) +class TimeFilterUnavailableError(Exception): + """redis_client 不可用,无法执行时间轴筛选。 + + 调用方捕获此异常后可选择回退到 get_all_user_ids 进行全量处理。 + """ + + class ImplicitEmotionsStorageRepository: """隐性记忆和情绪存储仓储类""" @@ -216,9 +216,7 @@ class ImplicitEmotionsStorageRepository: """ from sqlalchemy import String as SAString from sqlalchemy import cast - CST = timezone(timedelta(hours=8)) - now_cst = datetime.now(CST) - today_start = now_cst.replace(hour=0, minute=0, second=0, microsecond=0).astimezone(timezone.utc).replace(tzinfo=None) + today_start = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) tomorrow_start = today_start + timedelta(days=1) offset = 0 while True: diff --git a/api/app/services/implicit_memory_service.py b/api/app/services/implicit_memory_service.py index 10504fe7..7a186f33 100644 --- a/api/app/services/implicit_memory_service.py +++ b/api/app/services/implicit_memory_service.py @@ -34,6 +34,7 @@ from app.schemas.implicit_memory_schema import ( UserMemorySummary, ) from app.schemas.memory_config_schema import MemoryConfig +from app.services.memory_base_service import MIN_MEMORY_SUMMARY_COUNT from sqlalchemy.orm import Session logger = logging.getLogger(__name__) @@ -381,7 +382,7 @@ class ImplicitMemoryService: def _build_empty_profile(self) -> dict: """构建 MemorySummary 不足时返回的固定空白画像数据""" - now_ms = int(datetime.now().timestamp() * 1000) + now_ms = int(datetime.utcnow().timestamp() * 1000) insufficient = "Insufficient data for analysis" def _empty_dimension(name: str) -> dict: @@ -442,17 +443,13 @@ class ImplicitMemoryService: try: # 前置检查:查询该用户有效的 MemorySummary 节点数量(排除孤立节点) - query = """ - MATCH (n:MemorySummary)-[:DERIVED_FROM_STATEMENT]->(:Statement) - WHERE n.end_user_id = $end_user_id - RETURN count(DISTINCT n) as count - """ - result = await self.neo4j_connector.execute_query(query, end_user_id=user_id) - memory_summary_count = result[0]["count"] if result and len(result) > 0 else 0 + from app.services.memory_base_service import MemoryBaseService + base_service = MemoryBaseService() + memory_summary_count = await base_service.get_valid_memory_summary_count(user_id) logger.info(f"用户 MemorySummary 节点数量: {memory_summary_count} (user={user_id})") - if memory_summary_count < 5: - logger.info(f"MemorySummary 数量不足 5(当前 {memory_summary_count}),返回空白画像: user={user_id}") + if memory_summary_count < MIN_MEMORY_SUMMARY_COUNT: + logger.info(f"MemorySummary 数量不足 {MIN_MEMORY_SUMMARY_COUNT}(当前 {memory_summary_count}),返回空白画像: user={user_id}") return self._build_empty_profile() # 并行调用4个分析方法 diff --git a/api/app/services/memory_base_service.py b/api/app/services/memory_base_service.py index bc647752..e615af8b 100644 --- a/api/app/services/memory_base_service.py +++ b/api/app/services/memory_base_service.py @@ -265,12 +265,50 @@ async def Translation_English(modid, text, fields=None): # 其他类型(数字、布尔值、None等):原样返回 else: return text +# 隐性记忆画像生成所需的最低 MemorySummary 节点数量 +MIN_MEMORY_SUMMARY_COUNT = 5 + + class MemoryBaseService: """记忆服务基类,提供共享的辅助方法""" def __init__(self): self.neo4j_connector = Neo4jConnector() + async def get_valid_memory_summary_count( + self, + end_user_id: str + ) -> int: + """获取用户有效的 MemorySummary 节点数量(排除孤立节点)。 + + 只统计存在 DERIVED_FROM_STATEMENT 关系的 MemorySummary 节点。 + + Args: + end_user_id: 终端用户ID + + Returns: + 有效 MemorySummary 节点数量 + """ + try: + query = """ + MATCH (n:MemorySummary)-[:DERIVED_FROM_STATEMENT]->(:Statement) + WHERE n.end_user_id = $end_user_id + RETURN count(DISTINCT n) as count + """ + result = await self.neo4j_connector.execute_query( + query, end_user_id=end_user_id + ) + count = result[0]["count"] if result and len(result) > 0 else 0 + logger.debug( + f"有效 MemorySummary 节点数量: {count} (end_user_id={end_user_id})" + ) + return count + except Exception as e: + logger.error( + f"获取有效 MemorySummary 数量失败: {str(e)}", exc_info=True + ) + return 0 + @staticmethod def parse_timestamp(timestamp_value) -> Optional[int]: """ diff --git a/api/app/services/user_memory_service.py b/api/app/services/user_memory_service.py index dcbedba6..cc18447e 100644 --- a/api/app/services/user_memory_service.py +++ b/api/app/services/user_memory_service.py @@ -21,7 +21,7 @@ from app.repositories.end_user_repository import EndUserRepository from app.repositories.neo4j.cypher_queries import Graph_Node_query from app.repositories.neo4j.neo4j_connector import Neo4jConnector from app.schemas.memory_episodic_schema import EmotionSubject, EmotionType, type_mapping -from app.services.memory_base_service import MemoryBaseService +from app.services.memory_base_service import MemoryBaseService, MIN_MEMORY_SUMMARY_COUNT from app.services.memory_config_service import MemoryConfigService from app.services.memory_perceptual_service import MemoryPerceptualService from app.services.memory_short_service import ShortService @@ -1500,7 +1500,7 @@ async def analytics_memory_types( 2. 工作记忆 (WORKING_MEMORY) = 会话数量(通过 ConversationRepository.get_conversation_by_user_id 获取) 3. 短期记忆 (SHORT_TERM_MEMORY) = /short_term 接口返回的问答对数量 4. 显性记忆 (EXPLICIT_MEMORY) = 情景记忆 + 语义记忆(通过 MemoryBaseService.get_explicit_memory_count 获取) - 5. 隐性记忆 (IMPLICIT_MEMORY) = MemorySummary 节点数量(需 >= 5 才显示,否则为 0) + 5. 隐性记忆 (IMPLICIT_MEMORY) = MemorySummary 节点数量(需 >= MIN_MEMORY_SUMMARY_COUNT 才显示,否则为 0) 6. 情绪记忆 (EMOTIONAL_MEMORY) = 情绪标签统计总数(通过 MemoryBaseService.get_emotional_memory_count 获取) 7. 情景记忆 (EPISODIC_MEMORY) = memory_summary(通过 MemoryBaseService.get_episodic_memory_count 获取) 8. 遗忘记忆 (FORGET_MEMORY) = 激活值低于阈值的节点数(通过 MemoryBaseService.get_forget_memory_count 获取) @@ -1557,20 +1557,12 @@ async def analytics_memory_types( logger.warning(f"获取会话数量失败,工作记忆数量设为0: {str(e)}") work_count = 0 - # 获取隐性记忆数量(基于有关联关系的 MemorySummary 节点数量,需 >= 5 才计入) + # 获取隐性记忆数量(基于有关联关系的 MemorySummary 节点数量,需 >= MIN_MEMORY_SUMMARY_COUNT 才计入) implicit_count = 0 if end_user_id: try: - # 只统计有 DERIVED_FROM_STATEMENT 关系的 MemorySummary 节点,排除孤立节点 - query = """ - MATCH (n:MemorySummary)-[:DERIVED_FROM_STATEMENT]->(:Statement) - WHERE n.end_user_id = $end_user_id - RETURN count(DISTINCT n) as count - """ - result = await _neo4j_connector.execute_query(query, end_user_id=end_user_id) - memory_summary_count = result[0]["count"] if result and len(result) > 0 else 0 - # 仅当 MemorySummary 节点数量 >= 5 时才显示数量,否则为 0 - implicit_count = memory_summary_count if memory_summary_count >= 5 else 0 + memory_summary_count = await base_service.get_valid_memory_summary_count(end_user_id) + implicit_count = memory_summary_count if memory_summary_count >= MIN_MEMORY_SUMMARY_COUNT else 0 logger.debug(f"隐性记忆数量(有效MemorySummary节点数): {implicit_count} (有效MemorySummary总数={memory_summary_count}, end_user_id={end_user_id})") except Exception as e: logger.warning(f"获取MemorySummary数量失败,隐性记忆数量设为0: {str(e)}") @@ -1639,7 +1631,7 @@ async def analytics_memory_types( "WORKING_MEMORY": work_count, # 工作记忆(基于会话数量) "SHORT_TERM_MEMORY": short_term_count, # 短期记忆(基于问答对数量) "EXPLICIT_MEMORY": explicit_count, # 显性记忆(情景记忆 + 语义记忆) - "IMPLICIT_MEMORY": implicit_count, # 隐性记忆(MemorySummary节点数,需>=5) + "IMPLICIT_MEMORY": implicit_count, # 隐性记忆(MemorySummary节点数,需>=MIN_MEMORY_SUMMARY_COUNT) "EMOTIONAL_MEMORY": emotion_count, # 情绪记忆(使用情绪标签统计) "EPISODIC_MEMORY": episodic_count, # 情景记忆 "FORGET_MEMORY": forget_count # 遗忘记忆(激活值低于阈值)