[add] celery support rbmq
This commit is contained in:
@@ -1,5 +1,6 @@
|
|||||||
import os
|
import os
|
||||||
import platform
|
import platform
|
||||||
|
import re
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
from urllib.parse import quote
|
from urllib.parse import quote
|
||||||
|
|
||||||
@@ -11,21 +12,24 @@ from app.core.logging_config import get_logger
|
|||||||
|
|
||||||
logger = get_logger(__name__)
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def _mask_url(url: str) -> str:
|
||||||
|
"""隐藏 URL 中的密码部分,适用于 redis:// 和 amqp:// 等协议"""
|
||||||
|
return re.sub(r'(://[^:]*:)[^@]+(@)', r'\1***\2', url)
|
||||||
|
|
||||||
# macOS fork() safety - must be set before any Celery initialization
|
# macOS fork() safety - must be set before any Celery initialization
|
||||||
if platform.system() == 'Darwin':
|
if platform.system() == 'Darwin':
|
||||||
os.environ.setdefault('OBJC_DISABLE_INITIALIZE_FORK_SAFETY', 'YES')
|
os.environ.setdefault('OBJC_DISABLE_INITIALIZE_FORK_SAFETY', 'YES')
|
||||||
|
|
||||||
# 创建 Celery 应用实例
|
# 创建 Celery 应用实例
|
||||||
# broker: 任务队列(使用 Redis DB,由 CELERY_BROKER_DB 指定)
|
# broker: 优先使用环境变量 CELERY_BROKER_URL(支持 amqp:// 等任意协议),
|
||||||
# backend: 结果存储(使用 Redis DB,由 CELERY_BACKEND_DB 指定)
|
# 未配置则回退到 Redis 方案
|
||||||
|
# backend: 结果存储(使用 Redis)
|
||||||
# NOTE: 不要在 .env 中设置 BROKER_URL / RESULT_BACKEND / CELERY_BROKER / CELERY_BACKEND,
|
# NOTE: 不要在 .env 中设置 BROKER_URL / RESULT_BACKEND / CELERY_BROKER / CELERY_BACKEND,
|
||||||
# 这些名称会被 Celery CLI 的 Click 框架劫持,详见 docs/celery-env-bug-report.md
|
# 这些名称会被 Celery CLI 的 Click 框架劫持,详见 docs/celery-env-bug-report.md
|
||||||
|
|
||||||
# Build canonical broker/backend URLs and force them into os.environ so that
|
_broker_url = os.getenv("CELERY_BROKER_URL") or \
|
||||||
# Celery's Settings.broker_url property (which checks CELERY_BROKER_URL first)
|
f"redis://:{quote(settings.REDIS_PASSWORD)}@{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.REDIS_DB_CELERY_BROKER}"
|
||||||
# cannot be overridden by stray env vars.
|
|
||||||
# See: https://github.com/celery/celery/issues/4284
|
|
||||||
_broker_url = f"redis://:{quote(settings.REDIS_PASSWORD)}@{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.REDIS_DB_CELERY_BROKER}"
|
|
||||||
_backend_url = f"redis://:{quote(settings.REDIS_PASSWORD)}@{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.REDIS_DB_CELERY_BACKEND}"
|
_backend_url = f"redis://:{quote(settings.REDIS_PASSWORD)}@{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.REDIS_DB_CELERY_BACKEND}"
|
||||||
os.environ["CELERY_BROKER_URL"] = _broker_url
|
os.environ["CELERY_BROKER_URL"] = _broker_url
|
||||||
os.environ["CELERY_RESULT_BACKEND"] = _backend_url
|
os.environ["CELERY_RESULT_BACKEND"] = _backend_url
|
||||||
@@ -45,8 +49,8 @@ celery_app = Celery(
|
|||||||
logger.info(
|
logger.info(
|
||||||
"Celery app initialized",
|
"Celery app initialized",
|
||||||
extra={
|
extra={
|
||||||
"broker": _broker_url.replace(quote(settings.REDIS_PASSWORD), "***"),
|
"broker": _mask_url(_broker_url),
|
||||||
"backend": _backend_url.replace(quote(settings.REDIS_PASSWORD), "***"),
|
"backend": _mask_url(_backend_url),
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
# Default queue for unrouted tasks
|
# Default queue for unrouted tasks
|
||||||
|
|||||||
@@ -231,8 +231,8 @@ class Settings:
|
|||||||
# Celery configuration (internal)
|
# Celery configuration (internal)
|
||||||
# NOTE: 变量名不以 CELERY_ 开头,避免被 Celery CLI 的前缀匹配机制劫持
|
# NOTE: 变量名不以 CELERY_ 开头,避免被 Celery CLI 的前缀匹配机制劫持
|
||||||
# 详见 docs/celery-env-bug-report.md
|
# 详见 docs/celery-env-bug-report.md
|
||||||
# 默认使用 Redis DB 3 (broker) 和 DB 4 (backend),与业务缓存 (DB 1/2) 隔离
|
# 默认使用 Redis 作为 broker 和 backend,与业务缓存隔离
|
||||||
# 多人共用同一 Redis 时,每位开发者应在 .env 中配置不同的 DB 编号避免任务互相干扰
|
# 如需使用 RabbitMQ,在 .env 中设置 CELERY_BROKER_URL=amqp://user:pass@host:5672/vhost
|
||||||
REDIS_DB_CELERY_BROKER: int = int(os.getenv("REDIS_DB_CELERY_BROKER", "3"))
|
REDIS_DB_CELERY_BROKER: int = int(os.getenv("REDIS_DB_CELERY_BROKER", "3"))
|
||||||
REDIS_DB_CELERY_BACKEND: int = int(os.getenv("REDIS_DB_CELERY_BACKEND", "4"))
|
REDIS_DB_CELERY_BACKEND: int = int(os.getenv("REDIS_DB_CELERY_BACKEND", "4"))
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user