diff --git a/api/app/repositories/implicit_emotions_storage_repository.py b/api/app/repositories/implicit_emotions_storage_repository.py index dfc7061b..bb869934 100644 --- a/api/app/repositories/implicit_emotions_storage_repository.py +++ b/api/app/repositories/implicit_emotions_storage_repository.py @@ -122,12 +122,17 @@ class ImplicitEmotionsStorageRepository: - 若 last_done <= updated_at,说明已是最新,跳过 Args: - redis_client: 同步 redis.StrictRedis 实例(连接 CELERY_BACKEND DB) + redis_client: 同步 redis.StrictRedis 实例(连接 CELERY_BACKEND DB),为 None 时抛出 RuntimeError batch_size: 每批次加载的数量 + Raises: + RuntimeError: redis_client 为 None 时,调用方可捕获并回退到 get_all_user_ids + Yields: 需要刷新的用户ID字符串 """ + if redis_client is None: + raise RuntimeError("get_users_needing_refresh: redis_client 不可用,无法执行时间轴筛选") from datetime import timezone offset = 0 while True: @@ -152,16 +157,14 @@ class ImplicitEmotionsStorageRepository: try: CST = timezone(timedelta(hours=8)) last_done = datetime.fromisoformat(raw) - # 统一转为 CST naive 时间做比较 - if last_done.tzinfo is None: - last_done = last_done.replace(tzinfo=timezone.utc).astimezone(CST).replace(tzinfo=None) - else: + # last_done 写入时已是 CST naive,直接使用,无需转换 + if last_done.tzinfo is not None: last_done = last_done.astimezone(CST).replace(tzinfo=None) if updated_at is None: yield end_user_id continue - # updated_at 同样转为 CST naive + # updated_at 数据库存的是 UTC naive,转为 CST naive 再比较 if updated_at.tzinfo is None: updated_at_cst = updated_at.replace(tzinfo=timezone.utc).astimezone(CST).replace(tzinfo=None) else: diff --git a/api/app/tasks.py b/api/app/tasks.py index 2387a9e4..c5e8a105 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -1117,7 +1117,7 @@ def write_message_task(self, end_user_id: str, message: list[dict], config_id: s try: _r = get_sync_redis_client() if _r is not None: - from datetime import timezone as _tz, timedelta as _td + from datetime import timezone as _tz _CST = _tz(timedelta(hours=8)) _now_cst = datetime.now(_CST).replace(tzinfo=None).isoformat() _r.set( @@ -2218,7 +2218,14 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: _redis_client = get_sync_redis_client() # 只处理 last_done > updated_at 的用户(有新记忆写入的用户) - for end_user_id in repo.get_users_needing_refresh(_redis_client, batch_size=100): + # Redis 不可用时回退到全量处理 + try: + refresh_iter = repo.get_users_needing_refresh(_redis_client, batch_size=100) + except RuntimeError as e: + logger.warning(f"时间轴筛选不可用,回退到全量刷新: {e}") + refresh_iter = repo.get_all_user_ids(batch_size=100) + + for end_user_id in refresh_iter: logger.info(f"开始处理用户: {end_user_id}") user_start_time = time.time()