diff --git a/api/app/celery_worker.py b/api/app/celery_worker.py index e6c041f5..9fabe15b 100644 --- a/api/app/celery_worker.py +++ b/api/app/celery_worker.py @@ -19,15 +19,35 @@ import app.tasks @worker_process_init.connect def _reinit_db_pool(**kwargs): """ - prefork 子进程启动时重建 SQLAlchemy 连接池。 + prefork 子进程启动时重建被 fork 污染的资源。 - fork() 后子进程继承了父进程的连接池和底层 TCP socket, - 多个子进程共享同一个 socket 会导致 PostgreSQL 连接损坏。 - dispose() 会关闭继承来的连接,后续操作会自动创建新连接。 + fork() 后子进程继承了父进程的: + 1. SQLAlchemy 连接池 — 多进程共享 TCP socket 导致 DB 连接损坏 + 2. ThreadPoolExecutor — fork 后线程状态不确定,第二个任务会死锁 """ + # 重建 DB 连接池 from app.db import engine engine.dispose() logger.info("DB connection pool disposed for forked worker process") + # 重建模块级 ThreadPoolExecutor(fork 后线程池不可用) + try: + from app.core.rag.deepdoc.parser import figure_parser + from concurrent.futures import ThreadPoolExecutor + figure_parser.shared_executor = ThreadPoolExecutor(max_workers=10) + logger.info("figure_parser.shared_executor recreated") + except Exception as e: + logger.warning(f"Failed to recreate figure_parser.shared_executor: {e}") + + try: + from app.core.rag.utils import libre_office + from concurrent.futures import ThreadPoolExecutor + import os + max_workers = os.cpu_count() * 2 if os.cpu_count() else 4 + libre_office.executor = ThreadPoolExecutor(max_workers=max_workers) + logger.info("libre_office.executor recreated") + except Exception as e: + logger.warning(f"Failed to recreate libre_office.executor: {e}") + __all__ = ['celery_app'] diff --git a/api/app/core/rag/common/connection_utils.py b/api/app/core/rag/common/connection_utils.py index 349caa27..d5d0dc2a 100644 --- a/api/app/core/rag/common/connection_utils.py +++ b/api/app/core/rag/common/connection_utils.py @@ -33,18 +33,16 @@ def timeout(seconds: float | int | str = None, attempts: int = 2, *, exception: thread.daemon = True thread.start() + effective_timeout = seconds if seconds else 120 # 默认 120 秒超时 for a in range(attempts): try: - if os.environ.get("ENABLE_TIMEOUT_ASSERTION"): - result = result_queue.get(timeout=seconds) - else: - result = result_queue.get() + result = result_queue.get(timeout=effective_timeout) if isinstance(result, Exception): raise result return result except queue.Empty: pass - raise TimeoutError(f"Function '{func.__name__}' timed out after {seconds} seconds and {attempts} attempts.") + raise TimeoutError(f"Function '{func.__name__}' timed out after {effective_timeout} seconds and {attempts} attempts.") @wraps(func) async def async_wrapper(*args, **kwargs) -> Any: diff --git a/api/app/tasks.py b/api/app/tasks.py index e965a281..92843175 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -268,18 +268,21 @@ def parse_document(file_path: str, document_id: uuid.UUID): try: with open(file_path, "rb") as f: file_binary = f.read() + if not file_binary: + # NFS 上文件存在但内容为空(可能还在同步中) + raise IOError(f"File is empty (0 bytes), NFS may still be syncing: {file_path}") break - except FileNotFoundError: + except (FileNotFoundError, IOError) as e: if waited >= max_wait_seconds: - raise FileNotFoundError( - f"File not found at '{file_path}' after waiting {max_wait_seconds}s " - f"(NFS cache may be stale)" + raise type(e)( + f"File not accessible at '{file_path}' after waiting {max_wait_seconds}s: {e}" ) - logger.warning(f"File not visible yet on this node, retrying in {wait_interval}s: {file_path}") + logger.warning(f"File not ready on this node, retrying in {wait_interval}s: {file_path} ({e})") time.sleep(wait_interval) waited += wait_interval from app.core.rag.app.naive import chunk + logger.info(f"[ParseDoc] file_binary size={len(file_binary)} bytes, type={type(file_binary).__name__}, bool={bool(file_binary)}") res = chunk(filename=file_path, binary=file_binary, from_page=0,