[fix] Add the function for judging the event loop switch

This commit is contained in:
lanceyq
2026-03-30 21:17:21 +08:00
parent 3419bb137a
commit abc27c8372

View File

@@ -33,20 +33,24 @@ _thread_local = threading.local()
def get_thread_safe_redis() -> redis.StrictRedis: def get_thread_safe_redis() -> redis.StrictRedis:
"""Get a Redis client safe for the current execution context. """Return a Redis client whose connection pool is bound to the current
thread, process **and** event loop.
Uses thread-local storage with PID checking to ensure:
- Each thread gets its own ConnectionPool (Celery --pool=threads) The pool is recreated when:
- Pools are recreated after fork (Celery --pool=prefork) - The PID changes (fork, Celery --pool=prefork)
- health_check_interval prevents stale connection errors - The thread has no pool yet (Celery --pool=threads)
- The previously-cached event loop has been closed (Celery tasks call
Returns: ``_shutdown_loop_gracefully`` which closes the loop after each run)
redis.StrictRedis: A Redis client with a thread/process-local pool.
""" """
current_pid = os.getpid() current_pid = os.getpid()
cached_loop = getattr(_thread_local, "loop", None)
loop_stale = cached_loop is not None and cached_loop.is_closed()
if not hasattr(_thread_local, "pool") or getattr(_thread_local, "pid", None) != current_pid: if not hasattr(_thread_local, "pool") \
or getattr(_thread_local, "pid", None) != current_pid \
or loop_stale:
_thread_local.pid = current_pid _thread_local.pid = current_pid
_thread_local.loop = asyncio.get_event_loop()
_thread_local.pool = ConnectionPool.from_url( _thread_local.pool = ConnectionPool.from_url(
_REDIS_URL, _REDIS_URL,
db=settings.REDIS_DB, db=settings.REDIS_DB,