diff --git a/api/app/repositories/implicit_emotions_storage_repository.py b/api/app/repositories/implicit_emotions_storage_repository.py index aa62e07d..f0871b4b 100644 --- a/api/app/repositories/implicit_emotions_storage_repository.py +++ b/api/app/repositories/implicit_emotions_storage_repository.py @@ -8,6 +8,13 @@ import logging from datetime import date, datetime, timedelta, timezone from typing import Generator, Optional + +class TimeFilterUnavailableError(Exception): + """redis_client 不可用,无法执行时间轴筛选。 + + 调用方捕获此异常后可选择回退到 get_all_user_ids 进行全量处理。 + """ + import redis from sqlalchemy import exists, not_, select from sqlalchemy.orm import Session @@ -113,7 +120,7 @@ class ImplicitEmotionsStorageRepository: logger.error(f"分批获取用户ID失败: offset={offset}, error={e}") break - def get_users_needing_refresh(self, redis_client: Optional[redis.StrictRedis], batch_size: int = 100) -> Generator[str, None, None]: + def get_users_needing_refresh(self, redis_client: redis.StrictRedis, batch_size: int = 100) -> Generator[str, None, None]: """分批次获取需要刷新隐性记忆/情绪数据的存量用户ID。 筛选逻辑: @@ -123,32 +130,21 @@ class ImplicitEmotionsStorageRepository: - 若 last_done > updated_at,说明上次刷新后又有新记忆写入,需要刷新 - 若 last_done <= updated_at,说明已是最新,跳过 - 如果 redis_client 为 None,则降级为返回所有用户(禁用时间过滤)。 - Args: - redis_client: 同步 redis.StrictRedis 实例(连接 CELERY_BACKEND DB),为 None 时抛出 RuntimeError + redis_client: 同步 redis.StrictRedis 实例(连接 CELERY_BACKEND DB) batch_size: 每批次加载的数量 Raises: - RuntimeError: redis_client 为 None 时,调用方可捕获并回退到 get_all_user_ids + TimeFilterUnavailableError: redis_client 为 None 时抛出,调用方可捕获并回退到 get_all_user_ids Yields: 需要刷新的用户ID字符串 """ if redis_client is None: - raise RuntimeError("get_users_needing_refresh: redis_client 不可用,无法执行时间轴筛选") - from datetime import timezone + raise TimeFilterUnavailableError("redis_client 不可用,无法执行时间轴筛选") from redis.exceptions import RedisError - # 如果 Redis 不可用,降级为处理所有用户 - if redis_client is None: - logger.warning( - "Redis 客户端不可用,时间过滤已禁用,将处理所有存量用户" - ) - yield from self.get_all_user_ids(batch_size) - return - offset = 0 while True: try: diff --git a/api/app/tasks.py b/api/app/tasks.py index 101c66a5..6fd9c954 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -2228,6 +2228,7 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: from app.models.implicit_emotions_storage_model import ImplicitEmotionsStorage from app.repositories.implicit_emotions_storage_repository import ( ImplicitEmotionsStorageRepository, + TimeFilterUnavailableError, ) from app.services.emotion_analytics_service import EmotionAnalyticsService from app.services.implicit_memory_service import ImplicitMemoryService @@ -2259,7 +2260,7 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: # Redis 不可用时回退到全量处理 try: refresh_iter = repo.get_users_needing_refresh(_redis_client, batch_size=100) - except RuntimeError as e: + except TimeFilterUnavailableError as e: logger.warning(f"时间轴筛选不可用,回退到全量刷新: {e}") refresh_iter = repo.get_all_user_ids(batch_size=100)