refactor(tasks, redis_lock): improve Redis connection and lock handling
- Increased max_connections for Redis pool from 10 to 100. - Extended socket_timeout from 5 to 10 seconds. - Added retry mechanism with exponential backoff for Redis operations in `RedisFairLock`.
This commit is contained in:
@@ -61,9 +61,9 @@ def _get_or_create_redis_pool() -> redis.ConnectionPool | None:
|
|||||||
db=settings.REDIS_DB_CELERY_BACKEND,
|
db=settings.REDIS_DB_CELERY_BACKEND,
|
||||||
password=settings.REDIS_PASSWORD,
|
password=settings.REDIS_PASSWORD,
|
||||||
decode_responses=True,
|
decode_responses=True,
|
||||||
max_connections=10,
|
max_connections=100,
|
||||||
socket_connect_timeout=5,
|
socket_connect_timeout=5,
|
||||||
socket_timeout=5,
|
socket_timeout=10,
|
||||||
retry_on_timeout=True,
|
retry_on_timeout=True,
|
||||||
health_check_interval=30,
|
health_check_interval=30,
|
||||||
)
|
)
|
||||||
@@ -1207,7 +1207,7 @@ def write_message_task(
|
|||||||
f"- elapsed_time={elapsed_time:.2f}s, task_id={self.request.id}")
|
f"- elapsed_time={elapsed_time:.2f}s, task_id={self.request.id}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
_r = get_sync_redis_client()
|
_r = redis_client
|
||||||
if _r is not None:
|
if _r is not None:
|
||||||
from datetime import timezone as _tz
|
from datetime import timezone as _tz
|
||||||
_now_utc = datetime.now(_tz.utc).isoformat()
|
_now_utc = datetime.now(_tz.utc).isoformat()
|
||||||
|
|||||||
@@ -1,8 +1,14 @@
|
|||||||
|
import logging
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
import redis
|
import redis
|
||||||
|
from redis.exceptions import (
|
||||||
|
ConnectionError,
|
||||||
|
TimeoutError,
|
||||||
|
RedisError,
|
||||||
|
)
|
||||||
|
|
||||||
UNLOCK_SCRIPT = """
|
UNLOCK_SCRIPT = """
|
||||||
if redis.call("get", KEYS[1]) == ARGV[1] then
|
if redis.call("get", KEYS[1]) == ARGV[1] then
|
||||||
@@ -68,6 +74,12 @@ def _ensure_str(val):
|
|||||||
class RedisFairLock:
|
class RedisFairLock:
|
||||||
# ZOMBIE CLEAN BUFFER
|
# ZOMBIE CLEAN BUFFER
|
||||||
CLEANUP_BUFFER = 30
|
CLEANUP_BUFFER = 30
|
||||||
|
# Redis 操作失败时的最大重试次数
|
||||||
|
MAX_RETRIES = 3
|
||||||
|
# 重试间隔基数(秒),实际间隔 = base * 2^attempt(指数退避)
|
||||||
|
RETRY_BACKOFF_BASE = 0.1
|
||||||
|
|
||||||
|
_logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
@@ -90,18 +102,43 @@ class RedisFairLock:
|
|||||||
self._renew_thread = None
|
self._renew_thread = None
|
||||||
self._stop_renew = threading.Event()
|
self._stop_renew = threading.Event()
|
||||||
|
|
||||||
|
def _exec_with_retry(self, func, *args, raise_on_fail=True, **kwargs):
|
||||||
|
"""
|
||||||
|
带指数退避重试的 Redis 操作执行器。
|
||||||
|
|
||||||
|
对 ConnectionError / TimeoutError 自动重试,其他异常直接抛出。
|
||||||
|
"""
|
||||||
|
last_err = None
|
||||||
|
for attempt in range(self.MAX_RETRIES):
|
||||||
|
try:
|
||||||
|
return func(*args, **kwargs)
|
||||||
|
except (ConnectionError, TimeoutError) as e:
|
||||||
|
last_err = e
|
||||||
|
wait = self.RETRY_BACKOFF_BASE * (2 ** attempt)
|
||||||
|
self._logger.warning(
|
||||||
|
f"[RedisFairLock] Redis error on attempt {attempt + 1}/{self.MAX_RETRIES} "
|
||||||
|
f"for key={self.key}: {e}, retrying in {wait:.2f}s"
|
||||||
|
)
|
||||||
|
time.sleep(wait)
|
||||||
|
except RedisError:
|
||||||
|
raise
|
||||||
|
if raise_on_fail:
|
||||||
|
raise last_err
|
||||||
|
return None
|
||||||
|
|
||||||
def acquire(self):
|
def acquire(self):
|
||||||
start = time.time()
|
start = time.time()
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
ok = self.redis.eval(
|
ok = self._exec_with_retry(
|
||||||
|
self.redis.eval,
|
||||||
ACQUIRE_SCRIPT,
|
ACQUIRE_SCRIPT,
|
||||||
2,
|
2,
|
||||||
self.queue_key,
|
self.queue_key,
|
||||||
self.key,
|
self.key,
|
||||||
self.value,
|
self.value,
|
||||||
str(self.expire),
|
str(self.expire),
|
||||||
str(self.timeout + self.CLEANUP_BUFFER)
|
str(self.timeout + self.CLEANUP_BUFFER),
|
||||||
)
|
)
|
||||||
|
|
||||||
if ok == 1:
|
if ok == 1:
|
||||||
@@ -111,7 +148,10 @@ class RedisFairLock:
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
if time.time() - start > self.timeout:
|
if time.time() - start > self.timeout:
|
||||||
self.redis.zrem(self.queue_key, self.value)
|
self._exec_with_retry(
|
||||||
|
self.redis.zrem, self.queue_key, self.value,
|
||||||
|
raise_on_fail=False,
|
||||||
|
)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
time.sleep(self.retry_interval)
|
time.sleep(self.retry_interval)
|
||||||
@@ -122,12 +162,14 @@ class RedisFairLock:
|
|||||||
if self._stop_renew.is_set():
|
if self._stop_renew.is_set():
|
||||||
break
|
break
|
||||||
|
|
||||||
success = self.redis.eval(
|
success = self._exec_with_retry(
|
||||||
|
self.redis.eval,
|
||||||
RENEW_SCRIPT,
|
RENEW_SCRIPT,
|
||||||
1,
|
1,
|
||||||
self.key,
|
self.key,
|
||||||
self.value,
|
self.value,
|
||||||
str(self.expire)
|
str(self.expire),
|
||||||
|
raise_on_fail=False,
|
||||||
)
|
)
|
||||||
if not success:
|
if not success:
|
||||||
break
|
break
|
||||||
@@ -149,7 +191,10 @@ class RedisFairLock:
|
|||||||
if self.auto_renewal:
|
if self.auto_renewal:
|
||||||
self._stop_renewal()
|
self._stop_renewal()
|
||||||
|
|
||||||
self.redis.eval(UNLOCK_SCRIPT, 1, self.key, self.value)
|
self._exec_with_retry(
|
||||||
|
self.redis.eval, UNLOCK_SCRIPT, 1, self.key, self.value,
|
||||||
|
raise_on_fail=False,
|
||||||
|
)
|
||||||
|
|
||||||
self._locked = False
|
self._locked = False
|
||||||
|
|
||||||
@@ -161,4 +206,3 @@ class RedisFairLock:
|
|||||||
|
|
||||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||||
self.release()
|
self.release()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user