diff --git a/api/app/celery_app.py b/api/app/celery_app.py index 33fa1703..ba294651 100644 --- a/api/app/celery_app.py +++ b/api/app/celery_app.py @@ -4,6 +4,7 @@ from datetime import timedelta from urllib.parse import quote from celery import Celery +from celery.schedules import crontab from app.core.config import settings @@ -93,10 +94,12 @@ celery_app.autodiscover_tasks(['app']) # Celery Beat schedule for periodic tasks memory_increment_schedule = timedelta(hours=settings.MEMORY_INCREMENT_INTERVAL_HOURS) memory_cache_regeneration_schedule = timedelta(hours=settings.MEMORY_CACHE_REGENERATION_HOURS) -# 这个30秒的设计不合理 -workspace_reflection_schedule = timedelta(seconds=30) # 每30秒运行一次settings.REFLECTION_INTERVAL_TIME -forgetting_cycle_schedule = timedelta(hours=24) # 每24小时运行一次遗忘周期 -implicit_emotions_update_schedule = timedelta(hours=24) # 每24小时更新一次隐性记忆和情绪数据 +workspace_reflection_schedule = timedelta(seconds=settings.WORKSPACE_REFLECTION_INTERVAL_SECONDS) +forgetting_cycle_schedule = timedelta(hours=settings.FORGETTING_CYCLE_INTERVAL_HOURS) +implicit_emotions_update_schedule = crontab( + hour=settings.IMPLICIT_EMOTIONS_UPDATE_HOUR, + minute=settings.IMPLICIT_EMOTIONS_UPDATE_MINUTE, +) #构建定时任务配置 beat_schedule_config = { diff --git a/api/app/core/config.py b/api/app/core/config.py index 2e6b4136..7392d29a 100644 --- a/api/app/core/config.py +++ b/api/app/core/config.py @@ -207,7 +207,15 @@ class Settings: # Memory Cache Regeneration Configuration MEMORY_CACHE_REGENERATION_HOURS: int = int(os.getenv("MEMORY_CACHE_REGENERATION_HOURS", "24")) - # Memory Module Configuration (internal) + # Periodic Task Schedule Configuration + # workspace_reflection: 每隔多少秒执行一次 + WORKSPACE_REFLECTION_INTERVAL_SECONDS: int = int(os.getenv("WORKSPACE_REFLECTION_INTERVAL_SECONDS", "30")) + # forgetting_cycle: 每隔多少小时执行一次 + FORGETTING_CYCLE_INTERVAL_HOURS: int = int(os.getenv("FORGETTING_CYCLE_INTERVAL_HOURS", "24")) + # implicit_emotions_update: 每天几点执行(小时,0-23) + IMPLICIT_EMOTIONS_UPDATE_HOUR: int = int(os.getenv("IMPLICIT_EMOTIONS_UPDATE_HOUR", "2")) + # implicit_emotions_update: 每天几分执行(分钟,0-59) + IMPLICIT_EMOTIONS_UPDATE_MINUTE: int = int(os.getenv("IMPLICIT_EMOTIONS_UPDATE_MINUTE", "0")) # Memory Module Configuration (internal) MEMORY_OUTPUT_DIR: str = os.getenv("MEMORY_OUTPUT_DIR", "logs/memory-output") MEMORY_CONFIG_DIR: str = os.getenv("MEMORY_CONFIG_DIR", "app/core/memory") diff --git a/api/app/repositories/implicit_emotions_storage_repository.py b/api/app/repositories/implicit_emotions_storage_repository.py index 176012b7..97405ab6 100644 --- a/api/app/repositories/implicit_emotions_storage_repository.py +++ b/api/app/repositories/implicit_emotions_storage_repository.py @@ -5,12 +5,13 @@ Implicit Emotions Storage Repository 事务由调用方控制,仓储层只使用 flush/refresh """ import logging -from datetime import datetime +from datetime import datetime, date, timezone, timedelta from typing import Optional, Generator from sqlalchemy.orm import Session -from sqlalchemy import select +from sqlalchemy import select, not_, exists from app.models.implicit_emotions_storage_model import ImplicitEmotionsStorage +from app.models.end_user_model import EndUser logger = logging.getLogger(__name__) @@ -110,6 +111,53 @@ class ImplicitEmotionsStorageRepository: logger.error(f"分批获取用户ID失败: offset={offset}, error={e}") break + def get_new_user_ids_today(self, batch_size: int = 100) -> Generator[str, None, None]: + """分批次获取当天新增的、尚未初始化隐性记忆和情绪建议数据的用户ID + + 查询逻辑:end_users 表中 created_at 为今天,且在 implicit_emotions_storage 中没有对应记录。 + 没有对应记录意味着隐性记忆画像和情绪建议均未初始化,需要对这批用户执行首次初始化。 + end_users.id(UUID)转为字符串后与 implicit_emotions_storage.end_user_id(String)对比。 + + Args: + batch_size: 每批次加载的数量,默认100 + + Yields: + 用户ID字符串 + """ + from sqlalchemy import cast, String as SAString + CST = timezone(timedelta(hours=8)) + now_cst = datetime.now(CST) + today_start = now_cst.replace(hour=0, minute=0, second=0, microsecond=0).astimezone(timezone.utc).replace(tzinfo=None) + tomorrow_start = today_start + timedelta(days=1) + offset = 0 + while True: + try: + stmt = ( + select(EndUser.id) + .where( + EndUser.created_at >= today_start, + EndUser.created_at < tomorrow_start, + not_( + exists( + select(ImplicitEmotionsStorage.end_user_id).where( + ImplicitEmotionsStorage.end_user_id == cast(EndUser.id, SAString) + ) + ) + ) + ) + .order_by(EndUser.id) + .limit(batch_size) + .offset(offset) + ) + batch = self.db.execute(stmt).scalars().all() + if not batch: + break + yield from (str(uid) for uid in batch) + offset += batch_size + except Exception as e: + logger.error(f"分批获取当天新增用户ID失败: offset={offset}, error={e}") + break + def delete_by_end_user_id(self, end_user_id: str) -> bool: """删除用户的存储记录(事务由调用方提交)""" storage = self.get_by_end_user_id(end_user_id) diff --git a/api/app/tasks.py b/api/app/tasks.py index 877224b7..ae533489 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -2214,7 +2214,7 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: # 更新情绪建议 try: - emotion_service = EmotionAnalyticsService(db=db, end_user_id=end_user_id) + emotion_service = EmotionAnalyticsService() suggestions_data = await emotion_service.generate_emotion_suggestions( end_user_id=end_user_id, db=db, @@ -2273,22 +2273,109 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: user_results.append(error_info) logger.error(f"处理用户 {end_user_id} 时出错: {str(e)}") + # ---- 处理增量用户(当天新增、尚未初始化的用户)---- + new_users_initialized = 0 + new_users_failed = 0 + logger.info("开始处理当天新增的增量用户初始化") + + for end_user_id in repo.get_new_user_ids_today(batch_size=100): + logger.info(f"开始初始化新用户: {end_user_id}") + user_start_time = time.time() + implicit_success = False + emotion_success = False + errors = [] + + try: + try: + implicit_service = ImplicitMemoryService(db=db, end_user_id=end_user_id) + profile_data = await implicit_service.generate_complete_profile(user_id=end_user_id) + await implicit_service.save_profile_cache( + end_user_id=end_user_id, + profile_data=profile_data, + db=db + ) + implicit_success = True + logger.info(f"成功初始化新用户 {end_user_id} 的隐性记忆画像") + except Exception as e: + error_msg = f"隐性记忆初始化失败: {str(e)}" + errors.append(error_msg) + logger.error(f"新用户 {end_user_id} {error_msg}") + + try: + emotion_service = EmotionAnalyticsService() + suggestions_data = await emotion_service.generate_emotion_suggestions( + end_user_id=end_user_id, + db=db, + language="zh" + ) + await emotion_service.save_suggestions_cache( + end_user_id=end_user_id, + suggestions_data=suggestions_data, + db=db + ) + emotion_success = True + logger.info(f"成功初始化新用户 {end_user_id} 的情绪建议") + except Exception as e: + error_msg = f"情绪建议初始化失败: {str(e)}" + errors.append(error_msg) + logger.error(f"新用户 {end_user_id} {error_msg}") + + if implicit_success or emotion_success: + new_users_initialized += 1 + else: + new_users_failed += 1 + + user_elapsed = time.time() - user_start_time + user_results.append({ + "end_user_id": end_user_id, + "type": "init", + "implicit_success": implicit_success, + "emotion_success": emotion_success, + "errors": errors, + "elapsed_time": user_elapsed + }) + + except Exception as e: + new_users_failed += 1 + user_elapsed = time.time() - user_start_time + user_results.append({ + "end_user_id": end_user_id, + "type": "init", + "implicit_success": False, + "emotion_success": False, + "errors": [str(e)], + "elapsed_time": user_elapsed + }) + logger.error(f"初始化新用户 {end_user_id} 时出错: {str(e)}") + + logger.info( + f"增量用户初始化完成: 成功={new_users_initialized}, 失败={new_users_failed}" + ) + # ---- 增量用户处理结束 ---- + # 记录总体统计信息 logger.info( f"隐性记忆和情绪数据更新定时任务完成: " - f"总用户数={total_users}, " + f"存量用户总数={total_users}, " f"隐性记忆成功={successful_implicit}, " f"情绪建议成功={successful_emotion}, " - f"失败={failed}" + f"存量失败={failed}, " + f"增量初始化成功={new_users_initialized}, " + f"增量初始化失败={new_users_failed}" ) return { "status": "SUCCESS", - "message": f"成功处理 {total_users} 个用户,隐性记忆 {successful_implicit} 个成功,情绪建议 {successful_emotion} 个成功", + "message": ( + f"存量用户 {total_users} 个,隐性记忆 {successful_implicit} 个成功,情绪建议 {successful_emotion} 个成功;" + f"增量新用户初始化 {new_users_initialized} 个成功,{new_users_failed} 个失败" + ), "total_users": total_users, "successful_implicit": successful_implicit, "successful_emotion": successful_emotion, "failed": failed, + "new_users_initialized": new_users_initialized, + "new_users_failed": new_users_failed, "user_results": user_results[:50] # 只保留前50个用户的详细结果 } @@ -2301,6 +2388,8 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: "successful_implicit": successful_implicit, "successful_emotion": successful_emotion, "failed": failed, + "new_users_initialized": 0, + "new_users_failed": 0, "user_results": user_results[:50] }