diff --git a/api/app/celery_worker.py b/api/app/celery_worker.py index baecdb3d..7d3ee686 100644 --- a/api/app/celery_worker.py +++ b/api/app/celery_worker.py @@ -3,6 +3,12 @@ Celery Worker 入口点 用于启动 Celery Worker: celery -A app.celery_worker worker --loglevel=info """ from app.celery_app import celery_app +from app.core.logging_config import LoggingConfig, get_logger + +# Initialize logging system for Celery worker +LoggingConfig.setup_logging() +logger = get_logger(__name__) +logger.info("Celery worker logging initialized") # 导入任务模块以注册任务 import app.tasks diff --git a/api/app/tasks.py b/api/app/tasks.py index 28a882b7..fba9f290 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -1,27 +1,27 @@ import asyncio -import trio import json import os +import re import time import uuid from datetime import datetime, timezone from math import ceil from typing import Any, Dict, List, Optional -import re import redis import requests +import trio # Import a unified Celery instance from app.celery_app import celery_app from app.core.config import settings +from app.core.rag.graphrag.general.index import init_graphrag, run_graphrag_for_kb from app.core.rag.graphrag.utils import get_llm_cache, set_llm_cache from app.core.rag.llm.chat_model import Base from app.core.rag.llm.cv_model import QWenCV from app.core.rag.llm.embedding_model import OpenAIEmbed from app.core.rag.llm.sequence2txt_model import QWenSeq2txt from app.core.rag.models.chunk import DocumentChunk -from app.core.rag.graphrag.general.index import init_graphrag, run_graphrag_for_kb from app.core.rag.prompts.generator import question_proposal from app.core.rag.vdb.elasticsearch.elasticsearch_vector import ( ElasticSearchVectorFactory, @@ -486,6 +486,10 @@ def write_message_task(self, group_id: str, message: str, config_id: str,storage Raises: Exception on failure """ + from app.core.logging_config import get_logger + logger = get_logger(__name__) + + logger.info(f"[CELERY WRITE] Starting write task - group_id={group_id}, config_id={config_id}, storage_type={storage_type}") start_time = time.time() # Resolve config_id if None @@ -506,8 +510,14 @@ def write_message_task(self, group_id: str, message: str, config_id: str,storage async def _run() -> str: db = next(get_db()) try: + logger.info(f"[CELERY WRITE] Executing MemoryAgentService.write_memory") service = MemoryAgentService() - return await service.write_memory(group_id, message, actual_config_id, db, storage_type, user_rag_memory_id) + result = await service.write_memory(group_id, message, actual_config_id, db, storage_type, user_rag_memory_id) + logger.info(f"[CELERY WRITE] Write completed successfully: {result}") + return result + except Exception as e: + logger.error(f"[CELERY WRITE] Write failed: {e}", exc_info=True) + raise finally: db.close() @@ -532,6 +542,8 @@ def write_message_task(self, group_id: str, message: str, config_id: str,storage result = loop.run_until_complete(_run()) elapsed_time = time.time() - start_time + logger.info(f"[CELERY WRITE] Task completed successfully - elapsed_time={elapsed_time:.2f}s, task_id={self.request.id}") + return { "status": "SUCCESS", "result": result, @@ -548,6 +560,9 @@ def write_message_task(self, group_id: str, message: str, config_id: str,storage detailed_error = "; ".join(error_messages) else: detailed_error = str(e) + + logger.error(f"[CELERY WRITE] Task failed - elapsed_time={elapsed_time:.2f}s, error={detailed_error}", exc_info=True) + return { "status": "FAILURE", "error": detailed_error,