From d270d25a99d00a8940748294308f0b020c6320ba Mon Sep 17 00:00:00 2001 From: Eternity <1533512157@qq.com> Date: Thu, 2 Apr 2026 13:57:22 +0800 Subject: [PATCH] refactor(tasks, redis_lock): improve Redis connection and lock handling - Increased max_connections for Redis pool from 10 to 100. - Extended socket_timeout from 5 to 10 seconds. - Added retry mechanism with exponential backoff for Redis operations in `RedisFairLock`. --- api/app/tasks.py | 6 ++-- api/app/utils/redis_lock.py | 58 ++++++++++++++++++++++++++++++++----- 2 files changed, 54 insertions(+), 10 deletions(-) diff --git a/api/app/tasks.py b/api/app/tasks.py index fa2fa55d..44bf7c90 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -61,9 +61,9 @@ def _get_or_create_redis_pool() -> redis.ConnectionPool | None: db=settings.REDIS_DB_CELERY_BACKEND, password=settings.REDIS_PASSWORD, decode_responses=True, - max_connections=10, + max_connections=100, socket_connect_timeout=5, - socket_timeout=5, + socket_timeout=10, retry_on_timeout=True, health_check_interval=30, ) @@ -1207,7 +1207,7 @@ def write_message_task( f"- elapsed_time={elapsed_time:.2f}s, task_id={self.request.id}") try: - _r = get_sync_redis_client() + _r = redis_client if _r is not None: from datetime import timezone as _tz _now_utc = datetime.now(_tz.utc).isoformat() diff --git a/api/app/utils/redis_lock.py b/api/app/utils/redis_lock.py index f517cbb5..b192c129 100644 --- a/api/app/utils/redis_lock.py +++ b/api/app/utils/redis_lock.py @@ -1,8 +1,14 @@ +import logging import threading import time import uuid import redis +from redis.exceptions import ( + ConnectionError, + TimeoutError, + RedisError, +) UNLOCK_SCRIPT = """ if redis.call("get", KEYS[1]) == ARGV[1] then @@ -68,6 +74,12 @@ def _ensure_str(val): class RedisFairLock: # ZOMBIE CLEAN BUFFER CLEANUP_BUFFER = 30 + # Redis 操作失败时的最大重试次数 + MAX_RETRIES = 3 + # 重试间隔基数(秒),实际间隔 = base * 2^attempt(指数退避) + RETRY_BACKOFF_BASE = 0.1 + + _logger = logging.getLogger(__name__) def __init__( self, @@ -90,18 +102,43 @@ class RedisFairLock: self._renew_thread = None self._stop_renew = threading.Event() + def _exec_with_retry(self, func, *args, raise_on_fail=True, **kwargs): + """ + 带指数退避重试的 Redis 操作执行器。 + + 对 ConnectionError / TimeoutError 自动重试,其他异常直接抛出。 + """ + last_err = None + for attempt in range(self.MAX_RETRIES): + try: + return func(*args, **kwargs) + except (ConnectionError, TimeoutError) as e: + last_err = e + wait = self.RETRY_BACKOFF_BASE * (2 ** attempt) + self._logger.warning( + f"[RedisFairLock] Redis error on attempt {attempt + 1}/{self.MAX_RETRIES} " + f"for key={self.key}: {e}, retrying in {wait:.2f}s" + ) + time.sleep(wait) + except RedisError: + raise + if raise_on_fail: + raise last_err + return None + def acquire(self): start = time.time() while True: - ok = self.redis.eval( + ok = self._exec_with_retry( + self.redis.eval, ACQUIRE_SCRIPT, 2, self.queue_key, self.key, self.value, str(self.expire), - str(self.timeout + self.CLEANUP_BUFFER) + str(self.timeout + self.CLEANUP_BUFFER), ) if ok == 1: @@ -111,7 +148,10 @@ class RedisFairLock: return True if time.time() - start > self.timeout: - self.redis.zrem(self.queue_key, self.value) + self._exec_with_retry( + self.redis.zrem, self.queue_key, self.value, + raise_on_fail=False, + ) return False time.sleep(self.retry_interval) @@ -122,12 +162,14 @@ class RedisFairLock: if self._stop_renew.is_set(): break - success = self.redis.eval( + success = self._exec_with_retry( + self.redis.eval, RENEW_SCRIPT, 1, self.key, self.value, - str(self.expire) + str(self.expire), + raise_on_fail=False, ) if not success: break @@ -149,7 +191,10 @@ class RedisFairLock: if self.auto_renewal: self._stop_renewal() - self.redis.eval(UNLOCK_SCRIPT, 1, self.key, self.value) + self._exec_with_retry( + self.redis.eval, UNLOCK_SCRIPT, 1, self.key, self.value, + raise_on_fail=False, + ) self._locked = False @@ -161,4 +206,3 @@ class RedisFairLock: def __exit__(self, exc_type, exc_val, exc_tb): self.release() -