From abc27c837213df859f3a743edeffa81b5af99c20 Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Mon, 30 Mar 2026 21:17:21 +0800 Subject: [PATCH] [fix] Add the function for judging the event loop switch --- api/app/aioRedis.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/api/app/aioRedis.py b/api/app/aioRedis.py index f79ef0e1..357533ad 100644 --- a/api/app/aioRedis.py +++ b/api/app/aioRedis.py @@ -33,20 +33,24 @@ _thread_local = threading.local() def get_thread_safe_redis() -> redis.StrictRedis: - """Get a Redis client safe for the current execution context. - - Uses thread-local storage with PID checking to ensure: - - Each thread gets its own ConnectionPool (Celery --pool=threads) - - Pools are recreated after fork (Celery --pool=prefork) - - health_check_interval prevents stale connection errors - - Returns: - redis.StrictRedis: A Redis client with a thread/process-local pool. + """Return a Redis client whose connection pool is bound to the current + thread, process **and** event loop. + + The pool is recreated when: + - The PID changes (fork, Celery --pool=prefork) + - The thread has no pool yet (Celery --pool=threads) + - The previously-cached event loop has been closed (Celery tasks call + ``_shutdown_loop_gracefully`` which closes the loop after each run) """ current_pid = os.getpid() + cached_loop = getattr(_thread_local, "loop", None) + loop_stale = cached_loop is not None and cached_loop.is_closed() - if not hasattr(_thread_local, "pool") or getattr(_thread_local, "pid", None) != current_pid: + if not hasattr(_thread_local, "pool") \ + or getattr(_thread_local, "pid", None) != current_pid \ + or loop_stale: _thread_local.pid = current_pid + _thread_local.loop = asyncio.get_event_loop() _thread_local.pool = ConnectionPool.from_url( _REDIS_URL, db=settings.REDIS_DB,