Merge pull request #739 from SuanmoSuanyangTechnology/fix/python-GC

[fix] The "write_tools" module actively shuts down the client, and it…
This commit is contained in:
Ke Sun
2026-03-30 18:48:34 +08:00
committed by GitHub
2 changed files with 52 additions and 1 deletions

View File

@@ -274,5 +274,21 @@ async def write(
except Exception as cache_err:
logger.warning(f"[WRITE] 写入活动统计缓存失败(不影响主流程): {cache_err}", exc_info=True)
# Close LLM/Embedder underlying httpx clients to prevent
# 'RuntimeError: Event loop is closed' during garbage collection
for client_obj in (llm_client, embedder_client):
try:
underlying = getattr(client_obj, 'client', None) or getattr(client_obj, 'model', None)
if underlying is None:
continue
# Unwrap RedBearLLM / RedBearEmbeddings to get the LangChain model
inner = getattr(underlying, '_model', underlying)
# LangChain OpenAI models expose async_client (httpx.AsyncClient)
http_client = getattr(inner, 'async_client', None)
if http_client is not None and hasattr(http_client, 'aclose'):
await http_client.aclose()
except Exception:
pass
logger.info("=== Pipeline Complete ===")
logger.info(f"Total execution time: {total_time:.2f} seconds")

View File

@@ -101,7 +101,12 @@ def get_sync_redis_client() -> Optional[redis.StrictRedis]:
def set_asyncio_event_loop():
"""Set the asyncio event loop for the current thread."""
"""Ensure an open asyncio event loop exists for the current thread.
Reuses the existing event loop if one is available and still open.
Creates and installs a new event loop only when the current one is
closed or missing (e.g. after ``_shutdown_loop_gracefully``).
"""
try:
loop = asyncio.get_event_loop()
if loop.is_closed():
@@ -113,6 +118,30 @@ def set_asyncio_event_loop():
return loop
def _shutdown_loop_gracefully(loop: asyncio.AbstractEventLoop):
"""Gracefully shutdown pending async generators and tasks on the event loop.
This prevents 'RuntimeError: Event loop is closed' from httpx.AsyncClient.__del__
by giving pending aclose() coroutines a chance to run before the loop is discarded.
Note: This only tears down the given loop. Callers that need a fresh event
loop afterwards should use ``set_asyncio_event_loop()`` explicitly.
"""
try:
# Cancel and collect all remaining tasks
all_tasks = asyncio.all_tasks(loop)
if all_tasks:
for task in all_tasks:
task.cancel()
loop.run_until_complete(asyncio.gather(*all_tasks, return_exceptions=True))
# Shutdown async generators (triggers __aclose__ on httpx clients etc.)
loop.run_until_complete(loop.shutdown_asyncgens())
except Exception:
pass
finally:
loop.close()
@celery_app.task(name="tasks.process_item")
def process_item(item: dict):
"""
@@ -1221,6 +1250,12 @@ def write_message_task(
lock.release()
except Exception as e:
logger.warning(f"[CELERY WRITE] 释放锁失败: {e}")
# Gracefully shutdown the event loop to prevent
# 'RuntimeError: Event loop is closed' from httpx.AsyncClient.__del__
_shutdown_loop_gracefully(loop)
# unused task
# @celery_app.task(name="app.core.memory.agent.health.check_read_service")
# def check_read_service_task() -> Dict[str, str]:
# """Call read_service and write latest status to Redis.