diff --git a/api/app/celery_worker.py b/api/app/celery_worker.py index 4ea4fee1..e6c041f5 100644 --- a/api/app/celery_worker.py +++ b/api/app/celery_worker.py @@ -2,6 +2,8 @@ Celery Worker 入口点 用于启动 Celery Worker: celery -A app.celery_worker worker --loglevel=info """ +from celery.signals import worker_process_init + from app.celery_app import celery_app from app.core.logging_config import LoggingConfig, get_logger @@ -13,4 +15,19 @@ logger.info("Celery worker logging initialized") # 导入任务模块以注册任务 import app.tasks + +@worker_process_init.connect +def _reinit_db_pool(**kwargs): + """ + prefork 子进程启动时重建 SQLAlchemy 连接池。 + + fork() 后子进程继承了父进程的连接池和底层 TCP socket, + 多个子进程共享同一个 socket 会导致 PostgreSQL 连接损坏。 + dispose() 会关闭继承来的连接,后续操作会自动创建新连接。 + """ + from app.db import engine + engine.dispose() + logger.info("DB connection pool disposed for forked worker process") + + __all__ = ['celery_app'] diff --git a/api/app/tasks.py b/api/app/tasks.py index 8bbbdc6e..87bcaec9 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -251,8 +251,18 @@ def parse_document(file_path: str, document_id: uuid.UUID): # Prepare vision_model for parsing vision_model = _build_vision_model(file_path, db_knowledge) + # 先将文件读入内存,避免解析过程中依赖 NFS 文件持续可访问 + # python-docx 等库在 binary=None 时会用路径直接打开文件, + # 在 NFS/共享存储上可能因缓存失效导致 "Package not found" + try: + with open(file_path, "rb") as f: + file_binary = f.read() + except FileNotFoundError: + raise FileNotFoundError(f"File not found at '{file_path}'") + from app.core.rag.app.naive import chunk res = chunk(filename=file_path, + binary=file_binary, from_page=0, to_page=DEFAULT_PARSE_TO_PAGE, callback=progress_callback,