fix(celery): resolve environment variable hijacking by Celery CLI
- Rename CELERY_BROKER and CELERY_BACKEND to REDIS_DB_CELERY_BROKER and REDIS_DB_CELERY_BACKEND to avoid Celery CLI prefix matching hijacking - Build canonical broker and backend URLs and force them into os.environ to prevent override by stray environment variables - Add logging for Celery app initialization with sanitized connection details - Update celery_app.py to use pre-built URL variables instead of inline construction - Add documentation reference to celery-env-bug-report.md explaining the environment variable naming convention - Prevents Celery CLI's Click framework from intercepting broker/backend configuration through environment variables
This commit is contained in:
@@ -1,27 +1,49 @@
|
|||||||
import os
|
import os
|
||||||
import platform
|
import platform
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
from celery.schedules import crontab
|
|
||||||
from urllib.parse import quote
|
from urllib.parse import quote
|
||||||
|
|
||||||
from celery import Celery
|
from celery import Celery
|
||||||
from celery.schedules import crontab
|
from celery.schedules import crontab
|
||||||
|
|
||||||
from app.core.config import settings
|
from app.core.config import settings
|
||||||
|
from app.core.logging_config import get_logger
|
||||||
|
|
||||||
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
# macOS fork() safety - must be set before any Celery initialization
|
# macOS fork() safety - must be set before any Celery initialization
|
||||||
if platform.system() == 'Darwin':
|
if platform.system() == 'Darwin':
|
||||||
os.environ.setdefault('OBJC_DISABLE_INITIALIZE_FORK_SAFETY', 'YES')
|
os.environ.setdefault('OBJC_DISABLE_INITIALIZE_FORK_SAFETY', 'YES')
|
||||||
|
|
||||||
# 创建 Celery 应用实例
|
# 创建 Celery 应用实例
|
||||||
# broker: 任务队列(使用 Redis DB 0)
|
# broker: 任务队列(使用 Redis DB,由 CELERY_BROKER_DB 指定)
|
||||||
# backend: 结果存储(使用 Redis DB 10)
|
# backend: 结果存储(使用 Redis DB,由 CELERY_BACKEND_DB 指定)
|
||||||
|
# NOTE: 不要在 .env 中设置 BROKER_URL / RESULT_BACKEND / CELERY_BROKER / CELERY_BACKEND,
|
||||||
|
# 这些名称会被 Celery CLI 的 Click 框架劫持,详见 docs/celery-env-bug-report.md
|
||||||
|
|
||||||
|
# Build canonical broker/backend URLs and force them into os.environ so that
|
||||||
|
# Celery's Settings.broker_url property (which checks CELERY_BROKER_URL first)
|
||||||
|
# cannot be overridden by stray env vars.
|
||||||
|
# See: https://github.com/celery/celery/issues/4284
|
||||||
|
_broker_url = f"redis://:{quote(settings.REDIS_PASSWORD)}@{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.REDIS_DB_CELERY_BROKER}"
|
||||||
|
_backend_url = f"redis://:{quote(settings.REDIS_PASSWORD)}@{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.REDIS_DB_CELERY_BACKEND}"
|
||||||
|
os.environ["CELERY_BROKER_URL"] = _broker_url
|
||||||
|
os.environ["CELERY_RESULT_BACKEND"] = _backend_url
|
||||||
|
os.environ.pop("BROKER_URL", None)
|
||||||
|
|
||||||
celery_app = Celery(
|
celery_app = Celery(
|
||||||
"redbear_tasks",
|
"redbear_tasks",
|
||||||
broker=f"redis://:{quote(settings.REDIS_PASSWORD)}@{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.CELERY_BROKER}",
|
broker=_broker_url,
|
||||||
backend=f"redis://:{quote(settings.REDIS_PASSWORD)}@{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.CELERY_BACKEND}",
|
backend=_backend_url,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"Celery app initialized",
|
||||||
|
extra={
|
||||||
|
"broker": _broker_url.replace(quote(settings.REDIS_PASSWORD), "***"),
|
||||||
|
"backend": _backend_url.replace(quote(settings.REDIS_PASSWORD), "***"),
|
||||||
|
},
|
||||||
|
)
|
||||||
# Default queue for unrouted tasks
|
# Default queue for unrouted tasks
|
||||||
celery_app.conf.task_default_queue = 'memory_tasks'
|
celery_app.conf.task_default_queue = 'memory_tasks'
|
||||||
|
|
||||||
|
|||||||
@@ -190,8 +190,10 @@ class Settings:
|
|||||||
LOG_FILE_MAX_SIZE_MB: int = int(os.getenv("LOG_FILE_MAX_SIZE_MB", "10")) # 10MB
|
LOG_FILE_MAX_SIZE_MB: int = int(os.getenv("LOG_FILE_MAX_SIZE_MB", "10")) # 10MB
|
||||||
|
|
||||||
# Celery configuration (internal)
|
# Celery configuration (internal)
|
||||||
CELERY_BROKER: int = int(os.getenv("CELERY_BROKER", "1"))
|
# NOTE: 变量名不以 CELERY_ 开头,避免被 Celery CLI 的前缀匹配机制劫持
|
||||||
CELERY_BACKEND: int = int(os.getenv("CELERY_BACKEND", "2"))
|
# 详见 docs/celery-env-bug-report.md
|
||||||
|
REDIS_DB_CELERY_BROKER: int = int(os.getenv("REDIS_DB_CELERY_BROKER", "1"))
|
||||||
|
REDIS_DB_CELERY_BACKEND: int = int(os.getenv("REDIS_DB_CELERY_BACKEND", "2"))
|
||||||
|
|
||||||
# SMTP Email Configuration
|
# SMTP Email Configuration
|
||||||
SMTP_SERVER: str = os.getenv("SMTP_SERVER", "smtp.gmail.com")
|
SMTP_SERVER: str = os.getenv("SMTP_SERVER", "smtp.gmail.com")
|
||||||
|
|||||||
Reference in New Issue
Block a user