[fix] celery task
This commit is contained in:
@@ -19,15 +19,35 @@ import app.tasks
|
|||||||
@worker_process_init.connect
|
@worker_process_init.connect
|
||||||
def _reinit_db_pool(**kwargs):
|
def _reinit_db_pool(**kwargs):
|
||||||
"""
|
"""
|
||||||
prefork 子进程启动时重建 SQLAlchemy 连接池。
|
prefork 子进程启动时重建被 fork 污染的资源。
|
||||||
|
|
||||||
fork() 后子进程继承了父进程的连接池和底层 TCP socket,
|
fork() 后子进程继承了父进程的:
|
||||||
多个子进程共享同一个 socket 会导致 PostgreSQL 连接损坏。
|
1. SQLAlchemy 连接池 — 多进程共享 TCP socket 导致 DB 连接损坏
|
||||||
dispose() 会关闭继承来的连接,后续操作会自动创建新连接。
|
2. ThreadPoolExecutor — fork 后线程状态不确定,第二个任务会死锁
|
||||||
"""
|
"""
|
||||||
|
# 重建 DB 连接池
|
||||||
from app.db import engine
|
from app.db import engine
|
||||||
engine.dispose()
|
engine.dispose()
|
||||||
logger.info("DB connection pool disposed for forked worker process")
|
logger.info("DB connection pool disposed for forked worker process")
|
||||||
|
|
||||||
|
# 重建模块级 ThreadPoolExecutor(fork 后线程池不可用)
|
||||||
|
try:
|
||||||
|
from app.core.rag.deepdoc.parser import figure_parser
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
figure_parser.shared_executor = ThreadPoolExecutor(max_workers=10)
|
||||||
|
logger.info("figure_parser.shared_executor recreated")
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Failed to recreate figure_parser.shared_executor: {e}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
from app.core.rag.utils import libre_office
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
import os
|
||||||
|
max_workers = os.cpu_count() * 2 if os.cpu_count() else 4
|
||||||
|
libre_office.executor = ThreadPoolExecutor(max_workers=max_workers)
|
||||||
|
logger.info("libre_office.executor recreated")
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Failed to recreate libre_office.executor: {e}")
|
||||||
|
|
||||||
|
|
||||||
__all__ = ['celery_app']
|
__all__ = ['celery_app']
|
||||||
|
|||||||
@@ -33,18 +33,16 @@ def timeout(seconds: float | int | str = None, attempts: int = 2, *, exception:
|
|||||||
thread.daemon = True
|
thread.daemon = True
|
||||||
thread.start()
|
thread.start()
|
||||||
|
|
||||||
|
effective_timeout = seconds if seconds else 120 # 默认 120 秒超时
|
||||||
for a in range(attempts):
|
for a in range(attempts):
|
||||||
try:
|
try:
|
||||||
if os.environ.get("ENABLE_TIMEOUT_ASSERTION"):
|
result = result_queue.get(timeout=effective_timeout)
|
||||||
result = result_queue.get(timeout=seconds)
|
|
||||||
else:
|
|
||||||
result = result_queue.get()
|
|
||||||
if isinstance(result, Exception):
|
if isinstance(result, Exception):
|
||||||
raise result
|
raise result
|
||||||
return result
|
return result
|
||||||
except queue.Empty:
|
except queue.Empty:
|
||||||
pass
|
pass
|
||||||
raise TimeoutError(f"Function '{func.__name__}' timed out after {seconds} seconds and {attempts} attempts.")
|
raise TimeoutError(f"Function '{func.__name__}' timed out after {effective_timeout} seconds and {attempts} attempts.")
|
||||||
|
|
||||||
@wraps(func)
|
@wraps(func)
|
||||||
async def async_wrapper(*args, **kwargs) -> Any:
|
async def async_wrapper(*args, **kwargs) -> Any:
|
||||||
|
|||||||
@@ -268,18 +268,21 @@ def parse_document(file_path: str, document_id: uuid.UUID):
|
|||||||
try:
|
try:
|
||||||
with open(file_path, "rb") as f:
|
with open(file_path, "rb") as f:
|
||||||
file_binary = f.read()
|
file_binary = f.read()
|
||||||
|
if not file_binary:
|
||||||
|
# NFS 上文件存在但内容为空(可能还在同步中)
|
||||||
|
raise IOError(f"File is empty (0 bytes), NFS may still be syncing: {file_path}")
|
||||||
break
|
break
|
||||||
except FileNotFoundError:
|
except (FileNotFoundError, IOError) as e:
|
||||||
if waited >= max_wait_seconds:
|
if waited >= max_wait_seconds:
|
||||||
raise FileNotFoundError(
|
raise type(e)(
|
||||||
f"File not found at '{file_path}' after waiting {max_wait_seconds}s "
|
f"File not accessible at '{file_path}' after waiting {max_wait_seconds}s: {e}"
|
||||||
f"(NFS cache may be stale)"
|
|
||||||
)
|
)
|
||||||
logger.warning(f"File not visible yet on this node, retrying in {wait_interval}s: {file_path}")
|
logger.warning(f"File not ready on this node, retrying in {wait_interval}s: {file_path} ({e})")
|
||||||
time.sleep(wait_interval)
|
time.sleep(wait_interval)
|
||||||
waited += wait_interval
|
waited += wait_interval
|
||||||
|
|
||||||
from app.core.rag.app.naive import chunk
|
from app.core.rag.app.naive import chunk
|
||||||
|
logger.info(f"[ParseDoc] file_binary size={len(file_binary)} bytes, type={type(file_binary).__name__}, bool={bool(file_binary)}")
|
||||||
res = chunk(filename=file_path,
|
res = chunk(filename=file_path,
|
||||||
binary=file_binary,
|
binary=file_binary,
|
||||||
from_page=0,
|
from_page=0,
|
||||||
|
|||||||
Reference in New Issue
Block a user