From 94836ed9af78e60c8417d7f174705822259b989c Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Wed, 4 Mar 2026 12:28:55 +0800 Subject: [PATCH 1/2] [add] Set up scheduled tasks for existing and new users --- api/app/celery_app.py | 11 ++- api/app/core/config.py | 10 +- .../implicit_emotions_storage_repository.py | 48 ++++++++- api/app/tasks.py | 97 ++++++++++++++++++- 4 files changed, 155 insertions(+), 11 deletions(-) diff --git a/api/app/celery_app.py b/api/app/celery_app.py index 33fa1703..ba294651 100644 --- a/api/app/celery_app.py +++ b/api/app/celery_app.py @@ -4,6 +4,7 @@ from datetime import timedelta from urllib.parse import quote from celery import Celery +from celery.schedules import crontab from app.core.config import settings @@ -93,10 +94,12 @@ celery_app.autodiscover_tasks(['app']) # Celery Beat schedule for periodic tasks memory_increment_schedule = timedelta(hours=settings.MEMORY_INCREMENT_INTERVAL_HOURS) memory_cache_regeneration_schedule = timedelta(hours=settings.MEMORY_CACHE_REGENERATION_HOURS) -# 这个30秒的设计不合理 -workspace_reflection_schedule = timedelta(seconds=30) # 每30秒运行一次settings.REFLECTION_INTERVAL_TIME -forgetting_cycle_schedule = timedelta(hours=24) # 每24小时运行一次遗忘周期 -implicit_emotions_update_schedule = timedelta(hours=24) # 每24小时更新一次隐性记忆和情绪数据 +workspace_reflection_schedule = timedelta(seconds=settings.WORKSPACE_REFLECTION_INTERVAL_SECONDS) +forgetting_cycle_schedule = timedelta(hours=settings.FORGETTING_CYCLE_INTERVAL_HOURS) +implicit_emotions_update_schedule = crontab( + hour=settings.IMPLICIT_EMOTIONS_UPDATE_HOUR, + minute=settings.IMPLICIT_EMOTIONS_UPDATE_MINUTE, +) #构建定时任务配置 beat_schedule_config = { diff --git a/api/app/core/config.py b/api/app/core/config.py index 3a0c97b4..dc993e24 100644 --- a/api/app/core/config.py +++ b/api/app/core/config.py @@ -208,7 +208,15 @@ class Settings: # Memory Cache Regeneration Configuration MEMORY_CACHE_REGENERATION_HOURS: int = int(os.getenv("MEMORY_CACHE_REGENERATION_HOURS", "24")) - # Memory Module Configuration (internal) + # Periodic Task Schedule Configuration + # workspace_reflection: 每隔多少秒执行一次 + WORKSPACE_REFLECTION_INTERVAL_SECONDS: int = int(os.getenv("WORKSPACE_REFLECTION_INTERVAL_SECONDS", "30")) + # forgetting_cycle: 每隔多少小时执行一次 + FORGETTING_CYCLE_INTERVAL_HOURS: int = int(os.getenv("FORGETTING_CYCLE_INTERVAL_HOURS", "24")) + # implicit_emotions_update: 每天几点执行(小时,0-23) + IMPLICIT_EMOTIONS_UPDATE_HOUR: int = int(os.getenv("IMPLICIT_EMOTIONS_UPDATE_HOUR", "2")) + # implicit_emotions_update: 每天几分执行(分钟,0-59) + IMPLICIT_EMOTIONS_UPDATE_MINUTE: int = int(os.getenv("IMPLICIT_EMOTIONS_UPDATE_MINUTE", "0")) # Memory Module Configuration (internal) MEMORY_OUTPUT_DIR: str = os.getenv("MEMORY_OUTPUT_DIR", "logs/memory-output") MEMORY_CONFIG_DIR: str = os.getenv("MEMORY_CONFIG_DIR", "app/core/memory") diff --git a/api/app/repositories/implicit_emotions_storage_repository.py b/api/app/repositories/implicit_emotions_storage_repository.py index 176012b7..1d11f89e 100644 --- a/api/app/repositories/implicit_emotions_storage_repository.py +++ b/api/app/repositories/implicit_emotions_storage_repository.py @@ -5,12 +5,13 @@ Implicit Emotions Storage Repository 事务由调用方控制,仓储层只使用 flush/refresh """ import logging -from datetime import datetime +from datetime import datetime, date from typing import Optional, Generator from sqlalchemy.orm import Session -from sqlalchemy import select +from sqlalchemy import select, not_, exists from app.models.implicit_emotions_storage_model import ImplicitEmotionsStorage +from app.models.end_user_model import EndUser logger = logging.getLogger(__name__) @@ -110,6 +111,49 @@ class ImplicitEmotionsStorageRepository: logger.error(f"分批获取用户ID失败: offset={offset}, error={e}") break + def get_new_user_ids_today(self, batch_size: int = 100) -> Generator[str, None, None]: + """分批次获取当天新增的、尚未初始化隐性记忆和情绪建议数据的用户ID + + 查询逻辑:end_users 表中 created_at 为今天,且在 implicit_emotions_storage 中没有对应记录。 + 没有对应记录意味着隐性记忆画像和情绪建议均未初始化,需要对这批用户执行首次初始化。 + end_users.id(UUID)转为字符串后与 implicit_emotions_storage.end_user_id(String)对比。 + + Args: + batch_size: 每批次加载的数量,默认100 + + Yields: + 用户ID字符串 + """ + from sqlalchemy import cast, String as SAString + today_start = datetime.combine(date.today(), datetime.min.time()) + offset = 0 + while True: + try: + stmt = ( + select(EndUser.id) + .where( + EndUser.created_at >= today_start, + not_( + exists( + select(ImplicitEmotionsStorage.end_user_id).where( + ImplicitEmotionsStorage.end_user_id == cast(EndUser.id, SAString) + ) + ) + ) + ) + .order_by(EndUser.id) + .limit(batch_size) + .offset(offset) + ) + batch = self.db.execute(stmt).scalars().all() + if not batch: + break + yield from (str(uid) for uid in batch) + offset += batch_size + except Exception as e: + logger.error(f"分批获取当天新增用户ID失败: offset={offset}, error={e}") + break + def delete_by_end_user_id(self, end_user_id: str) -> bool: """删除用户的存储记录(事务由调用方提交)""" storage = self.get_by_end_user_id(end_user_id) diff --git a/api/app/tasks.py b/api/app/tasks.py index 1675f25d..f30bbb81 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -2017,7 +2017,7 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: # 更新情绪建议 try: - emotion_service = EmotionAnalyticsService(db=db, end_user_id=end_user_id) + emotion_service = EmotionAnalyticsService() suggestions_data = await emotion_service.generate_emotion_suggestions( end_user_id=end_user_id, db=db, @@ -2076,22 +2076,109 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: user_results.append(error_info) logger.error(f"处理用户 {end_user_id} 时出错: {str(e)}") + # ---- 处理增量用户(当天新增、尚未初始化的用户)---- + new_users_initialized = 0 + new_users_failed = 0 + logger.info("开始处理当天新增的增量用户初始化") + + for end_user_id in repo.get_new_user_ids_today(batch_size=100): + logger.info(f"开始初始化新用户: {end_user_id}") + user_start_time = time.time() + implicit_success = False + emotion_success = False + errors = [] + + try: + try: + implicit_service = ImplicitMemoryService(db=db, end_user_id=end_user_id) + profile_data = await implicit_service.generate_complete_profile(user_id=end_user_id) + await implicit_service.save_profile_cache( + end_user_id=end_user_id, + profile_data=profile_data, + db=db + ) + implicit_success = True + logger.info(f"成功初始化新用户 {end_user_id} 的隐性记忆画像") + except Exception as e: + error_msg = f"隐性记忆初始化失败: {str(e)}" + errors.append(error_msg) + logger.error(f"新用户 {end_user_id} {error_msg}") + + try: + emotion_service = EmotionAnalyticsService() + suggestions_data = await emotion_service.generate_emotion_suggestions( + end_user_id=end_user_id, + db=db, + language="zh" + ) + await emotion_service.save_suggestions_cache( + end_user_id=end_user_id, + suggestions_data=suggestions_data, + db=db + ) + emotion_success = True + logger.info(f"成功初始化新用户 {end_user_id} 的情绪建议") + except Exception as e: + error_msg = f"情绪建议初始化失败: {str(e)}" + errors.append(error_msg) + logger.error(f"新用户 {end_user_id} {error_msg}") + + if implicit_success or emotion_success: + new_users_initialized += 1 + else: + new_users_failed += 1 + + user_elapsed = time.time() - user_start_time + user_results.append({ + "end_user_id": end_user_id, + "type": "init", + "implicit_success": implicit_success, + "emotion_success": emotion_success, + "errors": errors, + "elapsed_time": user_elapsed + }) + + except Exception as e: + new_users_failed += 1 + user_elapsed = time.time() - user_start_time + user_results.append({ + "end_user_id": end_user_id, + "type": "init", + "implicit_success": False, + "emotion_success": False, + "errors": [str(e)], + "elapsed_time": user_elapsed + }) + logger.error(f"初始化新用户 {end_user_id} 时出错: {str(e)}") + + logger.info( + f"增量用户初始化完成: 成功={new_users_initialized}, 失败={new_users_failed}" + ) + # ---- 增量用户处理结束 ---- + # 记录总体统计信息 logger.info( f"隐性记忆和情绪数据更新定时任务完成: " - f"总用户数={total_users}, " + f"存量用户总数={total_users}, " f"隐性记忆成功={successful_implicit}, " f"情绪建议成功={successful_emotion}, " - f"失败={failed}" + f"存量失败={failed}, " + f"增量初始化成功={new_users_initialized}, " + f"增量初始化失败={new_users_failed}" ) return { "status": "SUCCESS", - "message": f"成功处理 {total_users} 个用户,隐性记忆 {successful_implicit} 个成功,情绪建议 {successful_emotion} 个成功", + "message": ( + f"存量用户 {total_users} 个,隐性记忆 {successful_implicit} 个成功,情绪建议 {successful_emotion} 个成功;" + f"增量新用户初始化 {new_users_initialized} 个成功,{new_users_failed} 个失败" + ), "total_users": total_users, "successful_implicit": successful_implicit, "successful_emotion": successful_emotion, "failed": failed, + "new_users_initialized": new_users_initialized, + "new_users_failed": new_users_failed, "user_results": user_results[:50] # 只保留前50个用户的详细结果 } @@ -2104,6 +2191,8 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: "successful_implicit": successful_implicit, "successful_emotion": successful_emotion, "failed": failed, + "new_users_initialized": 0, + "new_users_failed": 0, "user_results": user_results[:50] } From a726a81224b7f5dca86ee6471c8741d74525c4dc Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Wed, 4 Mar 2026 13:39:21 +0800 Subject: [PATCH 2/2] [changes]Specifies the time zone divisions --- .../repositories/implicit_emotions_storage_repository.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/api/app/repositories/implicit_emotions_storage_repository.py b/api/app/repositories/implicit_emotions_storage_repository.py index 1d11f89e..97405ab6 100644 --- a/api/app/repositories/implicit_emotions_storage_repository.py +++ b/api/app/repositories/implicit_emotions_storage_repository.py @@ -5,7 +5,7 @@ Implicit Emotions Storage Repository 事务由调用方控制,仓储层只使用 flush/refresh """ import logging -from datetime import datetime, date +from datetime import datetime, date, timezone, timedelta from typing import Optional, Generator from sqlalchemy.orm import Session from sqlalchemy import select, not_, exists @@ -125,7 +125,10 @@ class ImplicitEmotionsStorageRepository: 用户ID字符串 """ from sqlalchemy import cast, String as SAString - today_start = datetime.combine(date.today(), datetime.min.time()) + CST = timezone(timedelta(hours=8)) + now_cst = datetime.now(CST) + today_start = now_cst.replace(hour=0, minute=0, second=0, microsecond=0).astimezone(timezone.utc).replace(tzinfo=None) + tomorrow_start = today_start + timedelta(days=1) offset = 0 while True: try: @@ -133,6 +136,7 @@ class ImplicitEmotionsStorageRepository: select(EndUser.id) .where( EndUser.created_at >= today_start, + EndUser.created_at < tomorrow_start, not_( exists( select(ImplicitEmotionsStorage.end_user_id).where(