diff --git a/api/app/core/memory/agent/utils/write_tools.py b/api/app/core/memory/agent/utils/write_tools.py index 3af9326e..1f437973 100644 --- a/api/app/core/memory/agent/utils/write_tools.py +++ b/api/app/core/memory/agent/utils/write_tools.py @@ -274,5 +274,21 @@ async def write( except Exception as cache_err: logger.warning(f"[WRITE] 写入活动统计缓存失败(不影响主流程): {cache_err}", exc_info=True) + # Close LLM/Embedder underlying httpx clients to prevent + # 'RuntimeError: Event loop is closed' during garbage collection + for client_obj in (llm_client, embedder_client): + try: + underlying = getattr(client_obj, 'client', None) or getattr(client_obj, 'model', None) + if underlying is None: + continue + # Unwrap RedBearLLM / RedBearEmbeddings to get the LangChain model + inner = getattr(underlying, '_model', underlying) + # LangChain OpenAI models expose async_client (httpx.AsyncClient) + http_client = getattr(inner, 'async_client', None) + if http_client is not None and hasattr(http_client, 'aclose'): + await http_client.aclose() + except Exception: + pass + logger.info("=== Pipeline Complete ===") logger.info(f"Total execution time: {total_time:.2f} seconds") diff --git a/api/app/tasks.py b/api/app/tasks.py index 0e909fcc..b7826332 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -101,7 +101,11 @@ def get_sync_redis_client() -> Optional[redis.StrictRedis]: def set_asyncio_event_loop(): - """Set the asyncio event loop for the current thread.""" + """Set the asyncio event loop for the current thread. + + Always creates a fresh event loop to avoid 'Event loop is closed' errors + caused by stale httpx.AsyncClient objects from previous task runs. + """ try: loop = asyncio.get_event_loop() if loop.is_closed(): @@ -113,6 +117,30 @@ def set_asyncio_event_loop(): return loop +def _shutdown_loop_gracefully(loop: asyncio.AbstractEventLoop): + """Gracefully shutdown pending async generators and tasks on the event loop. + + This prevents 'RuntimeError: Event loop is closed' from httpx.AsyncClient.__del__ + by giving pending aclose() coroutines a chance to run before the loop is discarded. + """ + try: + # Cancel and collect all remaining tasks + all_tasks = asyncio.all_tasks(loop) + if all_tasks: + for task in all_tasks: + task.cancel() + loop.run_until_complete(asyncio.gather(*all_tasks, return_exceptions=True)) + # Shutdown async generators (triggers __aclose__ on httpx clients etc.) + loop.run_until_complete(loop.shutdown_asyncgens()) + except Exception: + pass + finally: + loop.close() + # Set a new event loop so subsequent tasks get a fresh one + new_loop = asyncio.new_event_loop() + asyncio.set_event_loop(new_loop) + + @celery_app.task(name="tasks.process_item") def process_item(item: dict): """ @@ -1216,11 +1244,12 @@ def write_message_task( "task_id": self.request.id } finally: - if lock is not None: - try: - lock.release() - except Exception as e: - logger.warning(f"[CELERY WRITE] 释放锁失败: {e}") + # Gracefully shutdown the event loop to prevent + # 'RuntimeError: Event loop is closed' from httpx.AsyncClient.__del__ + _shutdown_loop_gracefully(loop) + + +# unused task # @celery_app.task(name="app.core.memory.agent.health.check_read_service") # def check_read_service_task() -> Dict[str, str]: # """Call read_service and write latest status to Redis.