refactor(memory): extract shared MemorySummary count query and replace magic number

- Move duplicated Neo4j MemorySummary count query into
  MemoryBaseService.get_valid_memory_summary_count()
- Introduce MIN_MEMORY_SUMMARY_COUNT constant to replace hardcoded 5
- Fix import ordering in implicit_emotions_storage_repository
- Use UTC consistently for date calculations (remove CST offset,
  datetime.now → datetime.utcnow)
This commit is contained in:
lanceyq
2026-04-13 18:47:56 +08:00
parent ef8c7093b5
commit 9470dd2f1e
4 changed files with 60 additions and 35 deletions

View File

@@ -5,16 +5,9 @@ Implicit Emotions Storage Repository
事务由调用方控制,仓储层只使用 flush/refresh
"""
import logging
from datetime import date, datetime, timezone
from datetime import datetime, timedelta, timezone
from typing import Generator, Optional
class TimeFilterUnavailableError(Exception):
"""redis_client 不可用,无法执行时间轴筛选。
调用方捕获此异常后可选择回退到 get_all_user_ids 进行全量处理。
"""
import redis
from sqlalchemy import exists, not_, select
from sqlalchemy.orm import Session
@@ -25,6 +18,13 @@ from app.models.implicit_emotions_storage_model import ImplicitEmotionsStorage
logger = logging.getLogger(__name__)
class TimeFilterUnavailableError(Exception):
"""redis_client 不可用,无法执行时间轴筛选。
调用方捕获此异常后可选择回退到 get_all_user_ids 进行全量处理。
"""
class ImplicitEmotionsStorageRepository:
"""隐性记忆和情绪存储仓储类"""
@@ -216,9 +216,7 @@ class ImplicitEmotionsStorageRepository:
"""
from sqlalchemy import String as SAString
from sqlalchemy import cast
CST = timezone(timedelta(hours=8))
now_cst = datetime.now(CST)
today_start = now_cst.replace(hour=0, minute=0, second=0, microsecond=0).astimezone(timezone.utc).replace(tzinfo=None)
today_start = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
tomorrow_start = today_start + timedelta(days=1)
offset = 0
while True:

View File

@@ -34,6 +34,7 @@ from app.schemas.implicit_memory_schema import (
UserMemorySummary,
)
from app.schemas.memory_config_schema import MemoryConfig
from app.services.memory_base_service import MIN_MEMORY_SUMMARY_COUNT
from sqlalchemy.orm import Session
logger = logging.getLogger(__name__)
@@ -381,7 +382,7 @@ class ImplicitMemoryService:
def _build_empty_profile(self) -> dict:
"""构建 MemorySummary 不足时返回的固定空白画像数据"""
now_ms = int(datetime.now().timestamp() * 1000)
now_ms = int(datetime.utcnow().timestamp() * 1000)
insufficient = "Insufficient data for analysis"
def _empty_dimension(name: str) -> dict:
@@ -442,17 +443,13 @@ class ImplicitMemoryService:
try:
# 前置检查:查询该用户有效的 MemorySummary 节点数量(排除孤立节点)
query = """
MATCH (n:MemorySummary)-[:DERIVED_FROM_STATEMENT]->(:Statement)
WHERE n.end_user_id = $end_user_id
RETURN count(DISTINCT n) as count
"""
result = await self.neo4j_connector.execute_query(query, end_user_id=user_id)
memory_summary_count = result[0]["count"] if result and len(result) > 0 else 0
from app.services.memory_base_service import MemoryBaseService
base_service = MemoryBaseService()
memory_summary_count = await base_service.get_valid_memory_summary_count(user_id)
logger.info(f"用户 MemorySummary 节点数量: {memory_summary_count} (user={user_id})")
if memory_summary_count < 5:
logger.info(f"MemorySummary 数量不足 5(当前 {memory_summary_count}),返回空白画像: user={user_id}")
if memory_summary_count < MIN_MEMORY_SUMMARY_COUNT:
logger.info(f"MemorySummary 数量不足 {MIN_MEMORY_SUMMARY_COUNT}(当前 {memory_summary_count}),返回空白画像: user={user_id}")
return self._build_empty_profile()
# 并行调用4个分析方法

View File

@@ -265,12 +265,50 @@ async def Translation_English(modid, text, fields=None):
# 其他类型数字、布尔值、None等原样返回
else:
return text
# 隐性记忆画像生成所需的最低 MemorySummary 节点数量
MIN_MEMORY_SUMMARY_COUNT = 5
class MemoryBaseService:
"""记忆服务基类,提供共享的辅助方法"""
def __init__(self):
self.neo4j_connector = Neo4jConnector()
async def get_valid_memory_summary_count(
self,
end_user_id: str
) -> int:
"""获取用户有效的 MemorySummary 节点数量(排除孤立节点)。
只统计存在 DERIVED_FROM_STATEMENT 关系的 MemorySummary 节点。
Args:
end_user_id: 终端用户ID
Returns:
有效 MemorySummary 节点数量
"""
try:
query = """
MATCH (n:MemorySummary)-[:DERIVED_FROM_STATEMENT]->(:Statement)
WHERE n.end_user_id = $end_user_id
RETURN count(DISTINCT n) as count
"""
result = await self.neo4j_connector.execute_query(
query, end_user_id=end_user_id
)
count = result[0]["count"] if result and len(result) > 0 else 0
logger.debug(
f"有效 MemorySummary 节点数量: {count} (end_user_id={end_user_id})"
)
return count
except Exception as e:
logger.error(
f"获取有效 MemorySummary 数量失败: {str(e)}", exc_info=True
)
return 0
@staticmethod
def parse_timestamp(timestamp_value) -> Optional[int]:
"""

View File

@@ -21,7 +21,7 @@ from app.repositories.end_user_repository import EndUserRepository
from app.repositories.neo4j.cypher_queries import Graph_Node_query
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.schemas.memory_episodic_schema import EmotionSubject, EmotionType, type_mapping
from app.services.memory_base_service import MemoryBaseService
from app.services.memory_base_service import MemoryBaseService, MIN_MEMORY_SUMMARY_COUNT
from app.services.memory_config_service import MemoryConfigService
from app.services.memory_perceptual_service import MemoryPerceptualService
from app.services.memory_short_service import ShortService
@@ -1500,7 +1500,7 @@ async def analytics_memory_types(
2. 工作记忆 (WORKING_MEMORY) = 会话数量(通过 ConversationRepository.get_conversation_by_user_id 获取)
3. 短期记忆 (SHORT_TERM_MEMORY) = /short_term 接口返回的问答对数量
4. 显性记忆 (EXPLICIT_MEMORY) = 情景记忆 + 语义记忆(通过 MemoryBaseService.get_explicit_memory_count 获取)
5. 隐性记忆 (IMPLICIT_MEMORY) = MemorySummary 节点数量(需 >= 5 才显示,否则为 0
5. 隐性记忆 (IMPLICIT_MEMORY) = MemorySummary 节点数量(需 >= MIN_MEMORY_SUMMARY_COUNT 才显示,否则为 0
6. 情绪记忆 (EMOTIONAL_MEMORY) = 情绪标签统计总数(通过 MemoryBaseService.get_emotional_memory_count 获取)
7. 情景记忆 (EPISODIC_MEMORY) = memory_summary通过 MemoryBaseService.get_episodic_memory_count 获取)
8. 遗忘记忆 (FORGET_MEMORY) = 激活值低于阈值的节点数(通过 MemoryBaseService.get_forget_memory_count 获取)
@@ -1557,20 +1557,12 @@ async def analytics_memory_types(
logger.warning(f"获取会话数量失败工作记忆数量设为0: {str(e)}")
work_count = 0
# 获取隐性记忆数量(基于有关联关系的 MemorySummary 节点数量,需 >= 5 才计入)
# 获取隐性记忆数量(基于有关联关系的 MemorySummary 节点数量,需 >= MIN_MEMORY_SUMMARY_COUNT 才计入)
implicit_count = 0
if end_user_id:
try:
# 只统计有 DERIVED_FROM_STATEMENT 关系的 MemorySummary 节点,排除孤立节点
query = """
MATCH (n:MemorySummary)-[:DERIVED_FROM_STATEMENT]->(:Statement)
WHERE n.end_user_id = $end_user_id
RETURN count(DISTINCT n) as count
"""
result = await _neo4j_connector.execute_query(query, end_user_id=end_user_id)
memory_summary_count = result[0]["count"] if result and len(result) > 0 else 0
# 仅当 MemorySummary 节点数量 >= 5 时才显示数量,否则为 0
implicit_count = memory_summary_count if memory_summary_count >= 5 else 0
memory_summary_count = await base_service.get_valid_memory_summary_count(end_user_id)
implicit_count = memory_summary_count if memory_summary_count >= MIN_MEMORY_SUMMARY_COUNT else 0
logger.debug(f"隐性记忆数量有效MemorySummary节点数: {implicit_count} (有效MemorySummary总数={memory_summary_count}, end_user_id={end_user_id})")
except Exception as e:
logger.warning(f"获取MemorySummary数量失败隐性记忆数量设为0: {str(e)}")
@@ -1639,7 +1631,7 @@ async def analytics_memory_types(
"WORKING_MEMORY": work_count, # 工作记忆(基于会话数量)
"SHORT_TERM_MEMORY": short_term_count, # 短期记忆(基于问答对数量)
"EXPLICIT_MEMORY": explicit_count, # 显性记忆(情景记忆 + 语义记忆)
"IMPLICIT_MEMORY": implicit_count, # 隐性记忆MemorySummary节点数需>=5
"IMPLICIT_MEMORY": implicit_count, # 隐性记忆MemorySummary节点数需>=MIN_MEMORY_SUMMARY_COUNT
"EMOTIONAL_MEMORY": emotion_count, # 情绪记忆(使用情绪标签统计)
"EPISODIC_MEMORY": episodic_count, # 情景记忆
"FORGET_MEMORY": forget_count # 遗忘记忆(激活值低于阈值)