diff --git a/api/app/celery_app.py b/api/app/celery_app.py index e6b239dd..cac4eff1 100644 --- a/api/app/celery_app.py +++ b/api/app/celery_app.py @@ -114,6 +114,7 @@ celery_app.conf.update( 'app.tasks.write_all_workspaces_memory_task': {'queue': 'periodic_tasks'}, 'app.tasks.update_implicit_emotions_storage': {'queue': 'periodic_tasks'}, 'app.tasks.init_implicit_emotions_for_users': {'queue': 'periodic_tasks'}, + 'app.tasks.init_interest_distribution_for_users': {'queue': 'periodic_tasks'}, }, ) diff --git a/api/app/controllers/memory_dashboard_controller.py b/api/app/controllers/memory_dashboard_controller.py index 1c82b636..50e8ec8f 100644 --- a/api/app/controllers/memory_dashboard_controller.py +++ b/api/app/controllers/memory_dashboard_controller.py @@ -156,9 +156,13 @@ async def get_workspace_end_users( "app.tasks.init_implicit_emotions_for_users", kwargs={"end_user_ids": end_user_ids}, ) - api_logger.info(f"已触发隐性记忆按需初始化任务,候选用户数: {len(end_user_ids)}") + _celery_app.send_task( + "app.tasks.init_interest_distribution_for_users", + kwargs={"end_user_ids": end_user_ids}, + ) + api_logger.info(f"已触发按需初始化任务,候选用户数: {len(end_user_ids)}") except Exception as e: - api_logger.warning(f"触发隐性记忆按需初始化任务失败(不影响主流程): {e}") + api_logger.warning(f"触发按需初始化任务失败(不影响主流程): {e}") # 并发执行配置查询和记忆数量查询 memory_configs_map, memory_nums_map = await asyncio.gather( diff --git a/api/app/tasks.py b/api/app/tasks.py index 5958d77d..05bc022d 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -2605,3 +2605,110 @@ def init_implicit_emotions_for_users(self, end_user_ids: List[str]) -> Dict[str, "elapsed_time": time.time() - start_time, "task_id": self.request.id, } + + +# ============================================================================= + +@celery_app.task( + name="app.tasks.init_interest_distribution_for_users", + bind=True, + ignore_result=True, + max_retries=0, + acks_late=False, + time_limit=3600, + soft_time_limit=3300, +) +def init_interest_distribution_for_users(self, end_user_ids: List[str]) -> Dict[str, Any]: + """事件触发任务:检查指定用户列表的兴趣分布缓存,无缓存则生成并写入 Redis。 + + 由 /dashboard/end_users 接口触发,已有缓存的用户直接跳过。 + 默认生成中文(zh)兴趣分布数据。 + + Args: + end_user_ids: 需要检查的用户ID列表 + + Returns: + 包含任务执行结果的字典 + """ + start_time = time.time() + + async def _run() -> Dict[str, Any]: + from app.core.logging_config import get_logger + from app.cache.memory.interest_memory import InterestMemoryCache, INTEREST_CACHE_EXPIRE + from app.services.memory_agent_service import MemoryAgentService + + logger = get_logger(__name__) + logger.info(f"开始按需初始化兴趣分布缓存,候选用户数: {len(end_user_ids)}") + + initialized = 0 + failed = 0 + skipped = 0 + language = "zh" + + service = MemoryAgentService() + + with get_db_context() as db: + for end_user_id in end_user_ids: + # 存在性检查:缓存有数据则跳过 + cached = await InterestMemoryCache.get_interest_distribution( + end_user_id=end_user_id, + language=language, + ) + if cached is not None: + skipped += 1 + continue + + logger.info(f"用户 {end_user_id} 无兴趣分布缓存,开始生成") + try: + result = await service.get_interest_distribution_by_user( + end_user_id=end_user_id, + limit=5, + language=language, + ) + await InterestMemoryCache.set_interest_distribution( + end_user_id=end_user_id, + language=language, + data=result, + expire=INTEREST_CACHE_EXPIRE, + ) + initialized += 1 + logger.info(f"用户 {end_user_id} 兴趣分布缓存生成成功") + except Exception as e: + failed += 1 + logger.error(f"用户 {end_user_id} 兴趣分布缓存生成失败: {e}") + + logger.info(f"兴趣分布按需初始化完成: 初始化={initialized}, 跳过={skipped}, 失败={failed}") + return { + "status": "SUCCESS", + "initialized": initialized, + "skipped": skipped, + "failed": failed, + } + + try: + try: + import nest_asyncio + nest_asyncio.apply() + except ImportError: + pass + + try: + loop = asyncio.get_event_loop() + if loop.is_closed(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + result = loop.run_until_complete(_run()) + result["elapsed_time"] = time.time() - start_time + result["task_id"] = self.request.id + return result + except Exception as e: + return { + "status": "FAILURE", + "error": str(e), + "elapsed_time": time.time() - start_time, + "task_id": self.request.id, + }