feat(celery): add comprehensive logging to worker and write task

- Initialize logging system in Celery worker entry point with LoggingConfig
- Add logger instance and startup message to celery_worker.py
- Reorganize imports in tasks.py for better readability and consistency
- Add detailed logging to write_message_task for debugging and monitoring
- Log task start with group_id, config_id, and storage_type parameters
- Log service execution and completion status with results
- Add exception handling with error logging and stack trace capture
- Log task completion time and Celery task ID for performance tracking
- Improves observability and troubleshooting of async task execution
This commit is contained in:
Ke Sun
2026-01-19 19:01:51 +08:00
parent 547ce858e7
commit 63baf3bd40
2 changed files with 25 additions and 4 deletions

View File

@@ -3,6 +3,12 @@ Celery Worker 入口点
用于启动 Celery Worker: celery -A app.celery_worker worker --loglevel=info
"""
from app.celery_app import celery_app
from app.core.logging_config import LoggingConfig, get_logger
# Initialize logging system for Celery worker
LoggingConfig.setup_logging()
logger = get_logger(__name__)
logger.info("Celery worker logging initialized")
# 导入任务模块以注册任务
import app.tasks

View File

@@ -1,27 +1,27 @@
import asyncio
import trio
import json
import os
import re
import time
import uuid
from datetime import datetime, timezone
from math import ceil
from typing import Any, Dict, List, Optional
import re
import redis
import requests
import trio
# Import a unified Celery instance
from app.celery_app import celery_app
from app.core.config import settings
from app.core.rag.graphrag.general.index import init_graphrag, run_graphrag_for_kb
from app.core.rag.graphrag.utils import get_llm_cache, set_llm_cache
from app.core.rag.llm.chat_model import Base
from app.core.rag.llm.cv_model import QWenCV
from app.core.rag.llm.embedding_model import OpenAIEmbed
from app.core.rag.llm.sequence2txt_model import QWenSeq2txt
from app.core.rag.models.chunk import DocumentChunk
from app.core.rag.graphrag.general.index import init_graphrag, run_graphrag_for_kb
from app.core.rag.prompts.generator import question_proposal
from app.core.rag.vdb.elasticsearch.elasticsearch_vector import (
ElasticSearchVectorFactory,
@@ -486,6 +486,10 @@ def write_message_task(self, group_id: str, message: str, config_id: str,storage
Raises:
Exception on failure
"""
from app.core.logging_config import get_logger
logger = get_logger(__name__)
logger.info(f"[CELERY WRITE] Starting write task - group_id={group_id}, config_id={config_id}, storage_type={storage_type}")
start_time = time.time()
# Resolve config_id if None
@@ -506,8 +510,14 @@ def write_message_task(self, group_id: str, message: str, config_id: str,storage
async def _run() -> str:
db = next(get_db())
try:
logger.info(f"[CELERY WRITE] Executing MemoryAgentService.write_memory")
service = MemoryAgentService()
return await service.write_memory(group_id, message, actual_config_id, db, storage_type, user_rag_memory_id)
result = await service.write_memory(group_id, message, actual_config_id, db, storage_type, user_rag_memory_id)
logger.info(f"[CELERY WRITE] Write completed successfully: {result}")
return result
except Exception as e:
logger.error(f"[CELERY WRITE] Write failed: {e}", exc_info=True)
raise
finally:
db.close()
@@ -532,6 +542,8 @@ def write_message_task(self, group_id: str, message: str, config_id: str,storage
result = loop.run_until_complete(_run())
elapsed_time = time.time() - start_time
logger.info(f"[CELERY WRITE] Task completed successfully - elapsed_time={elapsed_time:.2f}s, task_id={self.request.id}")
return {
"status": "SUCCESS",
"result": result,
@@ -548,6 +560,9 @@ def write_message_task(self, group_id: str, message: str, config_id: str,storage
detailed_error = "; ".join(error_messages)
else:
detailed_error = str(e)
logger.error(f"[CELERY WRITE] Task failed - elapsed_time={elapsed_time:.2f}s, error={detailed_error}", exc_info=True)
return {
"status": "FAILURE",
"error": detailed_error,