From c90b58bbcd891e95ab2b5ddeb38819c9dfe132ab Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Mon, 30 Mar 2026 17:05:59 +0800 Subject: [PATCH 1/3] [fix] The "write_tools" module actively shuts down the client, and it closes before the task event loop is completed. --- .../core/memory/agent/utils/write_tools.py | 16 ++++++++ api/app/tasks.py | 41 ++++++++++++++++--- 2 files changed, 51 insertions(+), 6 deletions(-) diff --git a/api/app/core/memory/agent/utils/write_tools.py b/api/app/core/memory/agent/utils/write_tools.py index 3af9326e..1f437973 100644 --- a/api/app/core/memory/agent/utils/write_tools.py +++ b/api/app/core/memory/agent/utils/write_tools.py @@ -274,5 +274,21 @@ async def write( except Exception as cache_err: logger.warning(f"[WRITE] 写入活动统计缓存失败(不影响主流程): {cache_err}", exc_info=True) + # Close LLM/Embedder underlying httpx clients to prevent + # 'RuntimeError: Event loop is closed' during garbage collection + for client_obj in (llm_client, embedder_client): + try: + underlying = getattr(client_obj, 'client', None) or getattr(client_obj, 'model', None) + if underlying is None: + continue + # Unwrap RedBearLLM / RedBearEmbeddings to get the LangChain model + inner = getattr(underlying, '_model', underlying) + # LangChain OpenAI models expose async_client (httpx.AsyncClient) + http_client = getattr(inner, 'async_client', None) + if http_client is not None and hasattr(http_client, 'aclose'): + await http_client.aclose() + except Exception: + pass + logger.info("=== Pipeline Complete ===") logger.info(f"Total execution time: {total_time:.2f} seconds") diff --git a/api/app/tasks.py b/api/app/tasks.py index 0e909fcc..b7826332 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -101,7 +101,11 @@ def get_sync_redis_client() -> Optional[redis.StrictRedis]: def set_asyncio_event_loop(): - """Set the asyncio event loop for the current thread.""" + """Set the asyncio event loop for the current thread. + + Always creates a fresh event loop to avoid 'Event loop is closed' errors + caused by stale httpx.AsyncClient objects from previous task runs. + """ try: loop = asyncio.get_event_loop() if loop.is_closed(): @@ -113,6 +117,30 @@ def set_asyncio_event_loop(): return loop +def _shutdown_loop_gracefully(loop: asyncio.AbstractEventLoop): + """Gracefully shutdown pending async generators and tasks on the event loop. + + This prevents 'RuntimeError: Event loop is closed' from httpx.AsyncClient.__del__ + by giving pending aclose() coroutines a chance to run before the loop is discarded. + """ + try: + # Cancel and collect all remaining tasks + all_tasks = asyncio.all_tasks(loop) + if all_tasks: + for task in all_tasks: + task.cancel() + loop.run_until_complete(asyncio.gather(*all_tasks, return_exceptions=True)) + # Shutdown async generators (triggers __aclose__ on httpx clients etc.) + loop.run_until_complete(loop.shutdown_asyncgens()) + except Exception: + pass + finally: + loop.close() + # Set a new event loop so subsequent tasks get a fresh one + new_loop = asyncio.new_event_loop() + asyncio.set_event_loop(new_loop) + + @celery_app.task(name="tasks.process_item") def process_item(item: dict): """ @@ -1216,11 +1244,12 @@ def write_message_task( "task_id": self.request.id } finally: - if lock is not None: - try: - lock.release() - except Exception as e: - logger.warning(f"[CELERY WRITE] 释放锁失败: {e}") + # Gracefully shutdown the event loop to prevent + # 'RuntimeError: Event loop is closed' from httpx.AsyncClient.__del__ + _shutdown_loop_gracefully(loop) + + +# unused task # @celery_app.task(name="app.core.memory.agent.health.check_read_service") # def check_read_service_task() -> Dict[str, str]: # """Call read_service and write latest status to Redis. From 0c677701c0f37f8ad6b3b008b10cf05c12b9fd25 Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Mon, 30 Mar 2026 18:29:17 +0800 Subject: [PATCH 2/3] [fix] iron release --- api/app/tasks.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/api/app/tasks.py b/api/app/tasks.py index b7826332..73b001e2 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -1244,6 +1244,11 @@ def write_message_task( "task_id": self.request.id } finally: + if lock is not None: + try: + lock.release() + except Exception as e: + logger.warning(f"[CELERY WRITE] 释放锁失败: {e}") # Gracefully shutdown the event loop to prevent # 'RuntimeError: Event loop is closed' from httpx.AsyncClient.__del__ _shutdown_loop_gracefully(loop) From 6e7c641fd4965045f051ae9b375c5eab4da66f63 Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Mon, 30 Mar 2026 18:46:25 +0800 Subject: [PATCH 3/3] [fix] Remove duplicate creations --- api/app/tasks.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/api/app/tasks.py b/api/app/tasks.py index 73b001e2..4928ca7f 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -101,10 +101,11 @@ def get_sync_redis_client() -> Optional[redis.StrictRedis]: def set_asyncio_event_loop(): - """Set the asyncio event loop for the current thread. + """Ensure an open asyncio event loop exists for the current thread. - Always creates a fresh event loop to avoid 'Event loop is closed' errors - caused by stale httpx.AsyncClient objects from previous task runs. + Reuses the existing event loop if one is available and still open. + Creates and installs a new event loop only when the current one is + closed or missing (e.g. after ``_shutdown_loop_gracefully``). """ try: loop = asyncio.get_event_loop() @@ -122,6 +123,9 @@ def _shutdown_loop_gracefully(loop: asyncio.AbstractEventLoop): This prevents 'RuntimeError: Event loop is closed' from httpx.AsyncClient.__del__ by giving pending aclose() coroutines a chance to run before the loop is discarded. + + Note: This only tears down the given loop. Callers that need a fresh event + loop afterwards should use ``set_asyncio_event_loop()`` explicitly. """ try: # Cancel and collect all remaining tasks @@ -136,9 +140,6 @@ def _shutdown_loop_gracefully(loop: asyncio.AbstractEventLoop): pass finally: loop.close() - # Set a new event loop so subsequent tasks get a fresh one - new_loop = asyncio.new_event_loop() - asyncio.set_event_loop(new_loop) @celery_app.task(name="tasks.process_item")