Merge pull request #743 from SuanmoSuanyangTechnology/fix/redis-event
[fix] Add the function for judging the event loop switch
This commit is contained in:
@@ -33,20 +33,29 @@ _thread_local = threading.local()
|
|||||||
|
|
||||||
|
|
||||||
def get_thread_safe_redis() -> redis.StrictRedis:
|
def get_thread_safe_redis() -> redis.StrictRedis:
|
||||||
"""Get a Redis client safe for the current execution context.
|
"""Return a Redis client whose connection pool is bound to the current
|
||||||
|
thread, process **and** event loop.
|
||||||
Uses thread-local storage with PID checking to ensure:
|
|
||||||
- Each thread gets its own ConnectionPool (Celery --pool=threads)
|
The pool is recreated when:
|
||||||
- Pools are recreated after fork (Celery --pool=prefork)
|
- The PID changes (fork, Celery --pool=prefork)
|
||||||
- health_check_interval prevents stale connection errors
|
- The thread has no pool yet (Celery --pool=threads)
|
||||||
|
- The previously-cached event loop has been closed (Celery tasks call
|
||||||
Returns:
|
``_shutdown_loop_gracefully`` which closes the loop after each run)
|
||||||
redis.StrictRedis: A Redis client with a thread/process-local pool.
|
|
||||||
"""
|
"""
|
||||||
current_pid = os.getpid()
|
current_pid = os.getpid()
|
||||||
|
cached_loop = getattr(_thread_local, "loop", None)
|
||||||
|
loop_stale = cached_loop is not None and cached_loop.is_closed()
|
||||||
|
|
||||||
if not hasattr(_thread_local, "pool") or getattr(_thread_local, "pid", None) != current_pid:
|
if not hasattr(_thread_local, "pool") \
|
||||||
|
or getattr(_thread_local, "pid", None) != current_pid \
|
||||||
|
or loop_stale:
|
||||||
_thread_local.pid = current_pid
|
_thread_local.pid = current_pid
|
||||||
|
# Python 3.10+: get_event_loop() raises RuntimeError in threads
|
||||||
|
# where no loop has been set yet (e.g. Celery --pool=threads).
|
||||||
|
try:
|
||||||
|
_thread_local.loop = asyncio.get_event_loop()
|
||||||
|
except RuntimeError:
|
||||||
|
_thread_local.loop = None
|
||||||
_thread_local.pool = ConnectionPool.from_url(
|
_thread_local.pool = ConnectionPool.from_url(
|
||||||
_REDIS_URL,
|
_REDIS_URL,
|
||||||
db=settings.REDIS_DB,
|
db=settings.REDIS_DB,
|
||||||
|
|||||||
Reference in New Issue
Block a user