* refactor(celery): optimize task routing and worker configuration - Simplify Celery queue configuration with single default 'io_tasks' queue - Implement task routing strategy separating IO-bound and CPU-bound tasks - Add Flower monitoring support with task event tracking enabled - Add summary node search optimization to only retrieve summary nodes - Clean up unused imports and reorganize import statements for consistency - Update docker-compose configuration to support multi-queue worker setup * chore(celery): simplify flower configuration and add gevent dependency * chore(dependencies): add gevent dependency to requirements - Add gevent==24.11.1 to api/requirements.txt - Gevent is required for async worker support in Celery - Complements existing flower and celery configuration * refactor(celery): simplify async event loop handling and reorganize task queues - Replace complex nest_asyncio and manual event loop management with asyncio.run() in read_message_task, write_message_task, regenerate_memory_cache, and workspace_reflection_task - Rename task queues from io_tasks/cpu_tasks to memory_tasks/document_tasks for better semantic clarity - Update task routing configuration to reflect new queue names for memory agent tasks and document processing tasks - Remove redundant exception handling comments and simplify error handling logic - Update README with improved community support section including GitHub Issues, Pull Requests, Discussions, and WeChat community links - Simplifies event loop management by leveraging asyncio.run() which handles loop creation and cleanup automatically, reducing code complexity and potential race conditions
119 lines
4.1 KiB
Python
119 lines
4.1 KiB
Python
import os
|
||
import platform
|
||
from datetime import timedelta
|
||
from urllib.parse import quote
|
||
|
||
from app.core.config import settings
|
||
from celery import Celery
|
||
|
||
# Create the Celery application instance.
#   broker:  task queue; the Redis DB index is taken from settings.CELERY_BROKER.
#   backend: result store; the Redis DB index is taken from settings.CELERY_BACKEND.
#
# The password is percent-encoded with safe="" because quote() leaves "/"
# unescaped by default, and a "/" inside the password would otherwise be parsed
# as the start of the URL path. A missing (None) password is treated as empty
# instead of raising TypeError at import time.
_redis_password = quote(settings.REDIS_PASSWORD or "", safe="")
_redis_base_url = f"redis://:{_redis_password}@{settings.REDIS_HOST}:{settings.REDIS_PORT}"

celery_app = Celery(
    "redbear_tasks",
    broker=f"{_redis_base_url}/{settings.CELERY_BROKER}",
    backend=f"{_redis_base_url}/{settings.CELERY_BACKEND}",
)

# Default queue for tasks that have no explicit entry in task_routes.
celery_app.conf.task_default_queue = 'memory_tasks'

# macOS compatibility: on Darwin, make sure OBJC_DISABLE_INITIALIZE_FORK_SAFETY
# is present in the environment. setdefault() keeps any value the operator has
# already exported and only falls back to 'YES' when the variable is unset.
# NOTE(review): presumably needed so forked worker processes do not abort in
# the Objective-C runtime — confirm against the worker pool actually used.
if platform.system() == 'Darwin':
    os.environ.setdefault('OBJC_DISABLE_INITIALIZE_FORK_SAFETY', 'YES')

# Celery configuration.
celery_app.conf.update(
    # Serialization: JSON only, for task payloads and for results.
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',

    # Time zone.
    timezone='Asia/Shanghai',
    enable_utc=True,

    # Task tracking.
    task_track_started=True,
    task_ignore_result=False,

    # Time limits (seconds).
    task_time_limit=1800,  # 30-minute hard limit
    task_soft_time_limit=1500,  # 25-minute soft limit

    # Worker settings (per-worker settings are in the docker-compose command line).
    worker_prefetch_multiplier=1,  # Don't hoard tasks, fairer distribution

    # Result expiry (seconds).
    result_expires=3600,  # keep results for 1 hour

    # Task acknowledgement settings.
    task_acks_late=True,
    task_reject_on_worker_lost=True,
    worker_disable_rate_limits=True,

    # Flower monitoring: emit task events.
    worker_send_task_events=True,
    task_send_sent_event=True,

    # Task routing.
    task_routes={
        # Memory tasks → memory_tasks queue (threads worker)
        'app.core.memory.agent.read_message_priority': {'queue': 'memory_tasks'},
        'app.core.memory.agent.read_message': {'queue': 'memory_tasks'},
        'app.core.memory.agent.write_message': {'queue': 'memory_tasks'},

        # Document tasks → document_tasks queue (prefork worker)
        'app.core.rag.tasks.parse_document': {'queue': 'document_tasks'},
        'app.core.rag.tasks.build_graphrag_for_kb': {'queue': 'document_tasks'},

        # Beat/periodic tasks → document_tasks queue (prefork worker)
        'app.tasks.workspace_reflection_task': {'queue': 'document_tasks'},
        'app.tasks.regenerate_memory_cache': {'queue': 'document_tasks'},
        'app.tasks.run_forgetting_cycle_task': {'queue': 'document_tasks'},
        'app.controllers.memory_storage_controller.search_all': {'queue': 'document_tasks'},
    },
)

# Auto-discover task modules in the listed packages.
celery_app.autodiscover_tasks(['app'])

# Celery Beat schedule intervals for the periodic tasks.
memory_increment_schedule = timedelta(hours=settings.MEMORY_INCREMENT_INTERVAL_HOURS)
memory_cache_regeneration_schedule = timedelta(hours=settings.MEMORY_CACHE_REGENERATION_HOURS)
# Runs every 30 seconds.
# NOTE(review): the original comment mentions settings.REFLECTION_INTERVAL_TIME,
# so this interval was presumably meant to be configurable — confirm before
# replacing the hard-coded value.
workspace_reflection_schedule = timedelta(seconds=30)
forgetting_cycle_schedule = timedelta(hours=24)  # run the forgetting cycle every 24 hours

# Build the periodic-task (Celery Beat) configuration.
# Each entry maps a schedule name to the task path, its interval and its arguments.
beat_schedule_config = {
    "run-workspace-reflection": {
        "task": "app.tasks.workspace_reflection_task",
        "schedule": workspace_reflection_schedule,
        "args": (),
    },
    "regenerate-memory-cache": {
        "task": "app.tasks.regenerate_memory_cache",
        "schedule": memory_cache_regeneration_schedule,
        "args": (),
    },
    "run-forgetting-cycle": {
        "task": "app.tasks.run_forgetting_cycle_task",
        "schedule": forgetting_cycle_schedule,
        "kwargs": {
            # None presumably selects the default configuration (the original
            # comment says it can be configured via environment variables) —
            # confirm against the task implementation.
            "config_id": None,
        },
    },
}

# If a default workspace ID is configured, also schedule the total-memory
# statistics task for that workspace.
if settings.DEFAULT_WORKSPACE_ID:
    beat_schedule_config["write-total-memory"] = {
        "task": "app.controllers.memory_storage_controller.search_all",
        "schedule": memory_increment_schedule,
        "kwargs": {
            "workspace_id": settings.DEFAULT_WORKSPACE_ID,
        },
    }

# Register the assembled schedule with Celery Beat.
celery_app.conf.beat_schedule = beat_schedule_config