Merge pull request #775 from SuanmoSuanyangTechnology/pref/redis-connections
refactor(tasks, redis_lock): improve Redis connection and lock handling
This commit is contained in:
@@ -61,9 +61,9 @@ def _get_or_create_redis_pool() -> redis.ConnectionPool | None:
|
||||
db=settings.REDIS_DB_CELERY_BACKEND,
|
||||
password=settings.REDIS_PASSWORD,
|
||||
decode_responses=True,
|
||||
max_connections=10,
|
||||
max_connections=100,
|
||||
socket_connect_timeout=5,
|
||||
socket_timeout=5,
|
||||
socket_timeout=10,
|
||||
retry_on_timeout=True,
|
||||
health_check_interval=30,
|
||||
)
|
||||
@@ -1207,7 +1207,7 @@ def write_message_task(
|
||||
f"- elapsed_time={elapsed_time:.2f}s, task_id={self.request.id}")
|
||||
|
||||
try:
|
||||
_r = get_sync_redis_client()
|
||||
_r = redis_client
|
||||
if _r is not None:
|
||||
from datetime import timezone as _tz
|
||||
_now_utc = datetime.now(_tz.utc).isoformat()
|
||||
|
||||
@@ -1,8 +1,14 @@
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
|
||||
import redis
|
||||
from redis.exceptions import (
|
||||
ConnectionError,
|
||||
TimeoutError,
|
||||
RedisError,
|
||||
)
|
||||
|
||||
UNLOCK_SCRIPT = """
|
||||
if redis.call("get", KEYS[1]) == ARGV[1] then
|
||||
@@ -68,6 +74,12 @@ def _ensure_str(val):
|
||||
class RedisFairLock:
|
||||
# ZOMBIE CLEAN BUFFER
|
||||
CLEANUP_BUFFER = 30
|
||||
# Redis 操作失败时的最大重试次数
|
||||
MAX_RETRIES = 3
|
||||
# 重试间隔基数(秒),实际间隔 = base * 2^attempt(指数退避)
|
||||
RETRY_BACKOFF_BASE = 0.1
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -90,18 +102,43 @@ class RedisFairLock:
|
||||
self._renew_thread = None
|
||||
self._stop_renew = threading.Event()
|
||||
|
||||
def _exec_with_retry(self, func, *args, raise_on_fail=True, **kwargs):
    """
    Run a Redis operation, retrying transient failures with exponential backoff.

    ``func(*args, **kwargs)`` is attempted up to ``MAX_RETRIES`` times, and
    always at least once, even if ``MAX_RETRIES`` is misconfigured to 0 or a
    negative number. ``ConnectionError`` / ``TimeoutError`` trigger a retry
    after ``RETRY_BACKOFF_BASE * 2 ** attempt`` seconds; every other
    exception propagates to the caller immediately.

    Args:
        func: Callable performing the Redis operation.
        *args: Positional arguments forwarded to ``func``.
        raise_on_fail: When True (default), re-raise the last connection or
            timeout error once all attempts are exhausted; when False,
            return None instead (best-effort mode).
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        The return value of ``func`` on the first successful attempt, or
        None if every attempt failed and ``raise_on_fail`` is False.

    Raises:
        ConnectionError, TimeoutError: The last transient error, when every
            attempt failed and ``raise_on_fail`` is True.
    """
    # Guarantee one attempt so ``last_err`` is never None when we reach the
    # final ``raise`` (``raise None`` would surface as a confusing TypeError).
    attempts = max(1, self.MAX_RETRIES)
    last_err = None

    for attempt in range(attempts):
        try:
            return func(*args, **kwargs)
        except (ConnectionError, TimeoutError) as e:
            # Only transient errors are handled here; anything else (other
            # RedisError subclasses included) propagates without an explicit
            # catch-and-re-raise clause.
            last_err = e

            if attempt + 1 >= attempts:
                # Last attempt: no retry follows, so do not sleep and do not
                # claim that we are retrying.
                self._logger.warning(
                    "[RedisFairLock] Redis error on attempt %d/%d "
                    "for key=%s: %s, giving up",
                    attempt + 1, attempts, self.key, e,
                )
                break

            wait = self.RETRY_BACKOFF_BASE * (2 ** attempt)
            self._logger.warning(
                "[RedisFairLock] Redis error on attempt %d/%d "
                "for key=%s: %s, retrying in %.2fs",
                attempt + 1, attempts, self.key, e, wait,
            )
            time.sleep(wait)

    if raise_on_fail:
        raise last_err
    return None
|
||||
|
||||
def acquire(self):
|
||||
start = time.time()
|
||||
|
||||
while True:
|
||||
ok = self.redis.eval(
|
||||
ok = self._exec_with_retry(
|
||||
self.redis.eval,
|
||||
ACQUIRE_SCRIPT,
|
||||
2,
|
||||
self.queue_key,
|
||||
self.key,
|
||||
self.value,
|
||||
str(self.expire),
|
||||
str(self.timeout + self.CLEANUP_BUFFER)
|
||||
str(self.timeout + self.CLEANUP_BUFFER),
|
||||
)
|
||||
|
||||
if ok == 1:
|
||||
@@ -111,7 +148,10 @@ class RedisFairLock:
|
||||
return True
|
||||
|
||||
if time.time() - start > self.timeout:
|
||||
self.redis.zrem(self.queue_key, self.value)
|
||||
self._exec_with_retry(
|
||||
self.redis.zrem, self.queue_key, self.value,
|
||||
raise_on_fail=False,
|
||||
)
|
||||
return False
|
||||
|
||||
time.sleep(self.retry_interval)
|
||||
@@ -122,12 +162,14 @@ class RedisFairLock:
|
||||
if self._stop_renew.is_set():
|
||||
break
|
||||
|
||||
success = self.redis.eval(
|
||||
success = self._exec_with_retry(
|
||||
self.redis.eval,
|
||||
RENEW_SCRIPT,
|
||||
1,
|
||||
self.key,
|
||||
self.value,
|
||||
str(self.expire)
|
||||
str(self.expire),
|
||||
raise_on_fail=False,
|
||||
)
|
||||
if not success:
|
||||
break
|
||||
@@ -149,7 +191,10 @@ class RedisFairLock:
|
||||
if self.auto_renewal:
|
||||
self._stop_renewal()
|
||||
|
||||
self.redis.eval(UNLOCK_SCRIPT, 1, self.key, self.value)
|
||||
self._exec_with_retry(
|
||||
self.redis.eval, UNLOCK_SCRIPT, 1, self.key, self.value,
|
||||
raise_on_fail=False,
|
||||
)
|
||||
|
||||
self._locked = False
|
||||
|
||||
@@ -161,4 +206,3 @@ class RedisFairLock:
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
self.release()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user