From cef14cda9e3819cd485458f4148a37f7582cb76b Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Sat, 7 Mar 2026 16:36:24 +0800 Subject: [PATCH] [add] Standardize time zones; Reuse a single Redis client; Use "mget" for batch writing requests --- .../implicit_emotions_storage_repository.py | 29 +++++++--- api/app/tasks.py | 58 +++++++++++-------- 2 files changed, 57 insertions(+), 30 deletions(-) diff --git a/api/app/repositories/implicit_emotions_storage_repository.py b/api/app/repositories/implicit_emotions_storage_repository.py index d1edf0ec..dfc7061b 100644 --- a/api/app/repositories/implicit_emotions_storage_repository.py +++ b/api/app/repositories/implicit_emotions_storage_repository.py @@ -142,17 +142,32 @@ class ImplicitEmotionsStorageRepository: if not batch: break - for end_user_id, updated_at in batch: - raw = redis_client.get(f"write_message:last_done:{end_user_id}") + # 批量获取当前批次所有用户的 last_done 时间戳(一次网络往返) + keys = [f"write_message:last_done:{end_user_id}" for end_user_id, _ in batch] + raw_values = redis_client.mget(keys) + + for (end_user_id, updated_at), raw in zip(batch, raw_values): if raw is None: - # 该用户从未有过 write_message 成功记录,跳过 continue try: + CST = timezone(timedelta(hours=8)) last_done = datetime.fromisoformat(raw) - # 统一去掉时区信息做 naive 比较 - if last_done.tzinfo is not None: - last_done = last_done.astimezone(timezone.utc).replace(tzinfo=None) - if updated_at is None or last_done > updated_at: + # 统一转为 CST naive 时间做比较 + if last_done.tzinfo is None: + last_done = last_done.replace(tzinfo=timezone.utc).astimezone(CST).replace(tzinfo=None) + else: + last_done = last_done.astimezone(CST).replace(tzinfo=None) + + if updated_at is None: + yield end_user_id + continue + # updated_at 同样转为 CST naive + if updated_at.tzinfo is None: + updated_at_cst = updated_at.replace(tzinfo=timezone.utc).astimezone(CST).replace(tzinfo=None) + else: + updated_at_cst = updated_at.astimezone(CST).replace(tzinfo=None) + + if last_done > updated_at_cst: yield end_user_id except Exception as e: logger.warning(f"解析 last_done 时间戳失败: end_user_id={end_user_id}, raw={raw}, error={e}") diff --git a/api/app/tasks.py b/api/app/tasks.py index d4afcc68..0c0fd01e 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -15,6 +15,29 @@ from uuid import UUID import redis import requests +# 模块级同步 Redis 客户端单例,供 Celery 任务共享使用(避免每次任务新建连接) +# 连接 CELERY_BACKEND DB,与 write_message:last_done 时间戳写入保持一致 +def _build_sync_redis_client(): + try: + return redis.StrictRedis( + host=settings.REDIS_HOST, + port=settings.REDIS_PORT, + db=settings.REDIS_DB_CELERY_BACKEND, + password=settings.REDIS_PASSWORD, + decode_responses=True, + ) + except Exception: + return None + +_sync_redis_client: redis.StrictRedis = None + +def get_sync_redis_client() -> redis.StrictRedis: + """获取模块级同步 Redis 客户端(懒初始化单例)""" + global _sync_redis_client + if _sync_redis_client is None: + _sync_redis_client = _build_sync_redis_client() + return _sync_redis_client + # Import a unified Celery instance from app.celery_app import celery_app from app.core.config import settings @@ -1090,22 +1113,18 @@ def write_message_task(self, end_user_id: str, message: list[dict], config_id: s logger.info( f"[CELERY WRITE] Task completed successfully - elapsed_time={elapsed_time:.2f}s, task_id={self.request.id}") - # 记录该用户最后一次 write_message 成功的时间,供 init_implicit_emotions_for_users 做时间轴筛选 + # 记录该用户最后一次 write_message 成功的时间,供时间轴筛选使用 try: - import redis as _redis - from urllib.parse import quote as _quote - _r = _redis.StrictRedis( - host=settings.REDIS_HOST, - port=settings.REDIS_PORT, - db=settings.REDIS_DB_CELERY_BACKEND, - password=settings.REDIS_PASSWORD, - decode_responses=True, - ) - _r.set( - f"write_message:last_done:{end_user_id}", - datetime.utcnow().isoformat(), - ex=86400 * 30, # 30天过期 - ) + _r = get_sync_redis_client() + if _r is not None: + from datetime import timezone as _tz, timedelta as _td + _CST = _tz(timedelta(hours=8)) + _now_cst = datetime.now(_CST).replace(tzinfo=None).isoformat() + _r.set( + f"write_message:last_done:{end_user_id}", + _now_cst, + ex=86400 * 30, + ) except Exception as _e: logger.warning(f"[CELERY WRITE] 写入 last_done 时间戳失败(不影响主流程): {_e}") @@ -2196,14 +2215,7 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: logger.info(f"表中存量用户总数: {total_users},开始时间轴筛选") # 构建 Redis 同步客户端,用于时间轴筛选 - import redis as _redis - _redis_client = _redis.StrictRedis( - host=settings.REDIS_HOST, - port=settings.REDIS_PORT, - db=settings.REDIS_DB_CELERY_BACKEND, - password=settings.REDIS_PASSWORD, - decode_responses=True, - ) + _redis_client = get_sync_redis_client() # 只处理 last_done > updated_at 的用户(有新记忆写入的用户) for end_user_id in repo.get_users_needing_refresh(_redis_client, batch_size=100):