""" Celery Worker 入口点 用于启动 Celery Worker: celery -A app.celery_worker worker --loglevel=info """ from celery.signals import worker_process_init from app.celery_app import celery_app from app.core.logging_config import LoggingConfig, get_logger # Initialize logging system for Celery worker LoggingConfig.setup_logging() logger = get_logger(__name__) logger.info("Celery worker logging initialized") # 导入任务模块以注册任务 import app.tasks @worker_process_init.connect def _reinit_db_pool(**kwargs): """ prefork 子进程启动时重建被 fork 污染的资源。 fork() 后子进程继承了父进程的: 1. SQLAlchemy 连接池 — 多进程共享 TCP socket 导致 DB 连接损坏 2. ThreadPoolExecutor — fork 后线程状态不确定,第二个任务会死锁 """ # 重建 DB 连接池 from app.db import engine engine.dispose() logger.info("DB connection pool disposed for forked worker process") # 重建模块级 ThreadPoolExecutor(fork 后线程池不可用) try: from app.core.rag.deepdoc.parser import figure_parser from concurrent.futures import ThreadPoolExecutor figure_parser.shared_executor = ThreadPoolExecutor(max_workers=10) logger.info("figure_parser.shared_executor recreated") except Exception as e: logger.warning(f"Failed to recreate figure_parser.shared_executor: {e}") try: from app.core.rag.utils import libre_office from concurrent.futures import ThreadPoolExecutor import os max_workers = os.cpu_count() * 2 if os.cpu_count() else 4 libre_office.executor = ThreadPoolExecutor(max_workers=max_workers) logger.info("libre_office.executor recreated") except Exception as e: logger.warning(f"Failed to recreate libre_office.executor: {e}") __all__ = ['celery_app']