[fix] The "write_tools" module actively shuts down the client, and it closes before the task event loop is completed.
This commit is contained in:
@@ -274,5 +274,21 @@ async def write(
|
|||||||
except Exception as cache_err:
|
except Exception as cache_err:
|
||||||
logger.warning(f"[WRITE] 写入活动统计缓存失败(不影响主流程): {cache_err}", exc_info=True)
|
logger.warning(f"[WRITE] 写入活动统计缓存失败(不影响主流程): {cache_err}", exc_info=True)
|
||||||
|
|
||||||
|
# Close LLM/Embedder underlying httpx clients to prevent
|
||||||
|
# 'RuntimeError: Event loop is closed' during garbage collection
|
||||||
|
for client_obj in (llm_client, embedder_client):
|
||||||
|
try:
|
||||||
|
underlying = getattr(client_obj, 'client', None) or getattr(client_obj, 'model', None)
|
||||||
|
if underlying is None:
|
||||||
|
continue
|
||||||
|
# Unwrap RedBearLLM / RedBearEmbeddings to get the LangChain model
|
||||||
|
inner = getattr(underlying, '_model', underlying)
|
||||||
|
# LangChain OpenAI models expose async_client (httpx.AsyncClient)
|
||||||
|
http_client = getattr(inner, 'async_client', None)
|
||||||
|
if http_client is not None and hasattr(http_client, 'aclose'):
|
||||||
|
await http_client.aclose()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
logger.info("=== Pipeline Complete ===")
|
logger.info("=== Pipeline Complete ===")
|
||||||
logger.info(f"Total execution time: {total_time:.2f} seconds")
|
logger.info(f"Total execution time: {total_time:.2f} seconds")
|
||||||
|
|||||||
@@ -101,7 +101,11 @@ def get_sync_redis_client() -> Optional[redis.StrictRedis]:
|
|||||||
|
|
||||||
|
|
||||||
def set_asyncio_event_loop():
|
def set_asyncio_event_loop():
|
||||||
"""Set the asyncio event loop for the current thread."""
|
"""Set the asyncio event loop for the current thread.
|
||||||
|
|
||||||
|
Always creates a fresh event loop to avoid 'Event loop is closed' errors
|
||||||
|
caused by stale httpx.AsyncClient objects from previous task runs.
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
if loop.is_closed():
|
if loop.is_closed():
|
||||||
@@ -113,6 +117,30 @@ def set_asyncio_event_loop():
|
|||||||
return loop
|
return loop
|
||||||
|
|
||||||
|
|
||||||
|
def _shutdown_loop_gracefully(loop: asyncio.AbstractEventLoop):
|
||||||
|
"""Gracefully shutdown pending async generators and tasks on the event loop.
|
||||||
|
|
||||||
|
This prevents 'RuntimeError: Event loop is closed' from httpx.AsyncClient.__del__
|
||||||
|
by giving pending aclose() coroutines a chance to run before the loop is discarded.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# Cancel and collect all remaining tasks
|
||||||
|
all_tasks = asyncio.all_tasks(loop)
|
||||||
|
if all_tasks:
|
||||||
|
for task in all_tasks:
|
||||||
|
task.cancel()
|
||||||
|
loop.run_until_complete(asyncio.gather(*all_tasks, return_exceptions=True))
|
||||||
|
# Shutdown async generators (triggers __aclose__ on httpx clients etc.)
|
||||||
|
loop.run_until_complete(loop.shutdown_asyncgens())
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
loop.close()
|
||||||
|
# Set a new event loop so subsequent tasks get a fresh one
|
||||||
|
new_loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(new_loop)
|
||||||
|
|
||||||
|
|
||||||
@celery_app.task(name="tasks.process_item")
|
@celery_app.task(name="tasks.process_item")
|
||||||
def process_item(item: dict):
|
def process_item(item: dict):
|
||||||
"""
|
"""
|
||||||
@@ -1216,11 +1244,12 @@ def write_message_task(
|
|||||||
"task_id": self.request.id
|
"task_id": self.request.id
|
||||||
}
|
}
|
||||||
finally:
|
finally:
|
||||||
if lock is not None:
|
# Gracefully shutdown the event loop to prevent
|
||||||
try:
|
# 'RuntimeError: Event loop is closed' from httpx.AsyncClient.__del__
|
||||||
lock.release()
|
_shutdown_loop_gracefully(loop)
|
||||||
except Exception as e:
|
|
||||||
logger.warning(f"[CELERY WRITE] 释放锁失败: {e}")
|
|
||||||
|
# unused task
|
||||||
# @celery_app.task(name="app.core.memory.agent.health.check_read_service")
|
# @celery_app.task(name="app.core.memory.agent.health.check_read_service")
|
||||||
# def check_read_service_task() -> Dict[str, str]:
|
# def check_read_service_task() -> Dict[str, str]:
|
||||||
# """Call read_service and write latest status to Redis.
|
# """Call read_service and write latest status to Redis.
|
||||||
|
|||||||
Reference in New Issue
Block a user