diff --git a/api/app/celery_app.py b/api/app/celery_app.py index e6b239dd..cac4eff1 100644 --- a/api/app/celery_app.py +++ b/api/app/celery_app.py @@ -114,6 +114,7 @@ celery_app.conf.update( 'app.tasks.write_all_workspaces_memory_task': {'queue': 'periodic_tasks'}, 'app.tasks.update_implicit_emotions_storage': {'queue': 'periodic_tasks'}, 'app.tasks.init_implicit_emotions_for_users': {'queue': 'periodic_tasks'}, + 'app.tasks.init_interest_distribution_for_users': {'queue': 'periodic_tasks'}, }, ) diff --git a/api/app/controllers/memory_dashboard_controller.py b/api/app/controllers/memory_dashboard_controller.py index 1c82b636..50e8ec8f 100644 --- a/api/app/controllers/memory_dashboard_controller.py +++ b/api/app/controllers/memory_dashboard_controller.py @@ -156,9 +156,13 @@ async def get_workspace_end_users( "app.tasks.init_implicit_emotions_for_users", kwargs={"end_user_ids": end_user_ids}, ) - api_logger.info(f"已触发隐性记忆按需初始化任务,候选用户数: {len(end_user_ids)}") + _celery_app.send_task( + "app.tasks.init_interest_distribution_for_users", + kwargs={"end_user_ids": end_user_ids}, + ) + api_logger.info(f"已触发按需初始化任务,候选用户数: {len(end_user_ids)}") except Exception as e: - api_logger.warning(f"触发隐性记忆按需初始化任务失败(不影响主流程): {e}") + api_logger.warning(f"触发按需初始化任务失败(不影响主流程): {e}") # 并发执行配置查询和记忆数量查询 memory_configs_map, memory_nums_map = await asyncio.gather( diff --git a/api/app/repositories/implicit_emotions_storage_repository.py b/api/app/repositories/implicit_emotions_storage_repository.py index 58e98dfd..f0871b4b 100644 --- a/api/app/repositories/implicit_emotions_storage_repository.py +++ b/api/app/repositories/implicit_emotions_storage_repository.py @@ -8,6 +8,13 @@ import logging from datetime import date, datetime, timedelta, timezone from typing import Generator, Optional + +class TimeFilterUnavailableError(Exception): + """redis_client 不可用,无法执行时间轴筛选。 + + 调用方捕获此异常后可选择回退到 get_all_user_ids 进行全量处理。 + """ + import redis from sqlalchemy import exists, not_, select from sqlalchemy.orm import Session @@ -113,7 +120,7 @@ class ImplicitEmotionsStorageRepository: logger.error(f"分批获取用户ID失败: offset={offset}, error={e}") break - def get_users_needing_refresh(self, redis_client: Optional[redis.StrictRedis], batch_size: int = 100) -> Generator[str, None, None]: + def get_users_needing_refresh(self, redis_client: redis.StrictRedis, batch_size: int = 100) -> Generator[str, None, None]: """分批次获取需要刷新隐性记忆/情绪数据的存量用户ID。 筛选逻辑: @@ -123,27 +130,21 @@ class ImplicitEmotionsStorageRepository: - 若 last_done > updated_at,说明上次刷新后又有新记忆写入,需要刷新 - 若 last_done <= updated_at,说明已是最新,跳过 - 如果 redis_client 为 None,则降级为返回所有用户(禁用时间过滤)。 - Args: - redis_client: 同步 redis.StrictRedis 实例(连接 CELERY_BACKEND DB),如果为 None 则禁用时间过滤 + redis_client: 同步 redis.StrictRedis 实例(连接 CELERY_BACKEND DB) batch_size: 每批次加载的数量 + Raises: + TimeFilterUnavailableError: redis_client 为 None 时抛出,调用方可捕获并回退到 get_all_user_ids + Yields: 需要刷新的用户ID字符串 """ - from datetime import timezone + if redis_client is None: + raise TimeFilterUnavailableError("redis_client 不可用,无法执行时间轴筛选") from redis.exceptions import RedisError - # 如果 Redis 不可用,降级为处理所有用户 - if redis_client is None: - logger.warning( - "Redis 客户端不可用,时间过滤已禁用,将处理所有存量用户" - ) - yield from self.get_all_user_ids(batch_size) - return - offset = 0 while True: try: @@ -178,16 +179,14 @@ class ImplicitEmotionsStorageRepository: try: CST = timezone(timedelta(hours=8)) last_done = datetime.fromisoformat(raw) - # 统一转为 CST naive 时间做比较 - if last_done.tzinfo is None: - last_done = last_done.replace(tzinfo=timezone.utc).astimezone(CST).replace(tzinfo=None) - else: + # last_done 写入时已是 CST naive,直接使用,无需转换 + if last_done.tzinfo is not None: last_done = last_done.astimezone(CST).replace(tzinfo=None) if updated_at is None: yield end_user_id continue - # updated_at 同样转为 CST naive + # updated_at 数据库存的是 UTC naive,转为 CST naive 再比较 if updated_at.tzinfo is None: updated_at_cst = updated_at.replace(tzinfo=timezone.utc).astimezone(CST).replace(tzinfo=None) else: diff --git a/api/app/tasks.py b/api/app/tasks.py index 5958d77d..6fd9c954 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -2228,6 +2228,7 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: from app.models.implicit_emotions_storage_model import ImplicitEmotionsStorage from app.repositories.implicit_emotions_storage_repository import ( ImplicitEmotionsStorageRepository, + TimeFilterUnavailableError, ) from app.services.emotion_analytics_service import EmotionAnalyticsService from app.services.implicit_memory_service import ImplicitMemoryService @@ -2256,7 +2257,14 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: _redis_client = get_sync_redis_client() # 只处理 last_done > updated_at 的用户(有新记忆写入的用户) - for end_user_id in repo.get_users_needing_refresh(_redis_client, batch_size=100): + # Redis 不可用时回退到全量处理 + try: + refresh_iter = repo.get_users_needing_refresh(_redis_client, batch_size=100) + except TimeFilterUnavailableError as e: + logger.warning(f"时间轴筛选不可用,回退到全量刷新: {e}") + refresh_iter = repo.get_all_user_ids(batch_size=100) + + for end_user_id in refresh_iter: logger.info(f"开始处理用户: {end_user_id}") user_start_time = time.time() @@ -2605,3 +2613,110 @@ def init_implicit_emotions_for_users(self, end_user_ids: List[str]) -> Dict[str, "elapsed_time": time.time() - start_time, "task_id": self.request.id, } + + +# ============================================================================= + +@celery_app.task( + name="app.tasks.init_interest_distribution_for_users", + bind=True, + ignore_result=True, + max_retries=0, + acks_late=False, + time_limit=3600, + soft_time_limit=3300, +) +def init_interest_distribution_for_users(self, end_user_ids: List[str]) -> Dict[str, Any]: + """事件触发任务:检查指定用户列表的兴趣分布缓存,无缓存则生成并写入 Redis。 + + 由 /dashboard/end_users 接口触发,已有缓存的用户直接跳过。 + 默认生成中文(zh)兴趣分布数据。 + + Args: + end_user_ids: 需要检查的用户ID列表 + + Returns: + 包含任务执行结果的字典 + """ + start_time = time.time() + + async def _run() -> Dict[str, Any]: + from app.core.logging_config import get_logger + from app.cache.memory.interest_memory import InterestMemoryCache, INTEREST_CACHE_EXPIRE + from app.services.memory_agent_service import MemoryAgentService + + logger = get_logger(__name__) + logger.info(f"开始按需初始化兴趣分布缓存,候选用户数: {len(end_user_ids)}") + + initialized = 0 + failed = 0 + skipped = 0 + language = "zh" + + service = MemoryAgentService() + + with get_db_context() as db: + for end_user_id in end_user_ids: + # 存在性检查:缓存有数据则跳过 + cached = await InterestMemoryCache.get_interest_distribution( + end_user_id=end_user_id, + language=language, + ) + if cached is not None: + skipped += 1 + continue + + logger.info(f"用户 {end_user_id} 无兴趣分布缓存,开始生成") + try: + result = await service.get_interest_distribution_by_user( + end_user_id=end_user_id, + limit=5, + language=language, + ) + await InterestMemoryCache.set_interest_distribution( + end_user_id=end_user_id, + language=language, + data=result, + expire=INTEREST_CACHE_EXPIRE, + ) + initialized += 1 + logger.info(f"用户 {end_user_id} 兴趣分布缓存生成成功") + except Exception as e: + failed += 1 + logger.error(f"用户 {end_user_id} 兴趣分布缓存生成失败: {e}") + + logger.info(f"兴趣分布按需初始化完成: 初始化={initialized}, 跳过={skipped}, 失败={failed}") + return { + "status": "SUCCESS", + "initialized": initialized, + "skipped": skipped, + "failed": failed, + } + + try: + try: + import nest_asyncio + nest_asyncio.apply() + except ImportError: + pass + + try: + loop = asyncio.get_event_loop() + if loop.is_closed(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + result = loop.run_until_complete(_run()) + result["elapsed_time"] = time.time() - start_time + result["task_id"] = self.request.id + return result + except Exception as e: + return { + "status": "FAILURE", + "error": str(e), + "elapsed_time": time.time() - start_time, + "task_id": self.request.id, + }