[fix] Remove duplicate creations
This commit is contained in:
@@ -101,10 +101,11 @@ def get_sync_redis_client() -> Optional[redis.StrictRedis]:
|
|||||||
|
|
||||||
|
|
||||||
def set_asyncio_event_loop():
|
def set_asyncio_event_loop():
|
||||||
"""Set the asyncio event loop for the current thread.
|
"""Ensure an open asyncio event loop exists for the current thread.
|
||||||
|
|
||||||
Always creates a fresh event loop to avoid 'Event loop is closed' errors
|
Reuses the existing event loop if one is available and still open.
|
||||||
caused by stale httpx.AsyncClient objects from previous task runs.
|
Creates and installs a new event loop only when the current one is
|
||||||
|
closed or missing (e.g. after ``_shutdown_loop_gracefully``).
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
@@ -122,6 +123,9 @@ def _shutdown_loop_gracefully(loop: asyncio.AbstractEventLoop):
|
|||||||
|
|
||||||
This prevents 'RuntimeError: Event loop is closed' from httpx.AsyncClient.__del__
|
This prevents 'RuntimeError: Event loop is closed' from httpx.AsyncClient.__del__
|
||||||
by giving pending aclose() coroutines a chance to run before the loop is discarded.
|
by giving pending aclose() coroutines a chance to run before the loop is discarded.
|
||||||
|
|
||||||
|
Note: This only tears down the given loop. Callers that need a fresh event
|
||||||
|
loop afterwards should use ``set_asyncio_event_loop()`` explicitly.
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
# Cancel and collect all remaining tasks
|
# Cancel and collect all remaining tasks
|
||||||
@@ -136,9 +140,6 @@ def _shutdown_loop_gracefully(loop: asyncio.AbstractEventLoop):
|
|||||||
pass
|
pass
|
||||||
finally:
|
finally:
|
||||||
loop.close()
|
loop.close()
|
||||||
# Set a new event loop so subsequent tasks get a fresh one
|
|
||||||
new_loop = asyncio.new_event_loop()
|
|
||||||
asyncio.set_event_loop(new_loop)
|
|
||||||
|
|
||||||
|
|
||||||
@celery_app.task(name="tasks.process_item")
|
@celery_app.task(name="tasks.process_item")
|
||||||
|
|||||||
Reference in New Issue
Block a user