diff --git a/api/app/repositories/implicit_emotions_storage_repository.py b/api/app/repositories/implicit_emotions_storage_repository.py index 97405ab6..d1edf0ec 100644 --- a/api/app/repositories/implicit_emotions_storage_repository.py +++ b/api/app/repositories/implicit_emotions_storage_repository.py @@ -111,6 +111,57 @@ class ImplicitEmotionsStorageRepository: logger.error(f"分批获取用户ID失败: offset={offset}, error={e}") break + def get_users_needing_refresh(self, redis_client, batch_size: int = 100) -> Generator[str, None, None]: + """分批次获取需要刷新隐性记忆/情绪数据的存量用户ID。 + + 筛选逻辑: + - 查询 implicit_emotions_storage 中所有用户的 end_user_id 和 updated_at + - 从 Redis 读取 write_message:last_done:{end_user_id} 的时间戳 + - 若 Redis 中无记录(该用户从未写入过记忆),跳过 + - 若 last_done > updated_at,说明上次刷新后又有新记忆写入,需要刷新 + - 若 last_done <= updated_at,说明已是最新,跳过 + + Args: + redis_client: 同步 redis.StrictRedis 实例(连接 CELERY_BACKEND DB) + batch_size: 每批次加载的数量 + + Yields: + 需要刷新的用户ID字符串 + """ + from datetime import timezone + offset = 0 + while True: + try: + stmt = ( + select(ImplicitEmotionsStorage.end_user_id, ImplicitEmotionsStorage.updated_at) + .order_by(ImplicitEmotionsStorage.end_user_id) + .limit(batch_size) + .offset(offset) + ) + batch = self.db.execute(stmt).all() + if not batch: + break + + for end_user_id, updated_at in batch: + raw = redis_client.get(f"write_message:last_done:{end_user_id}") + if raw is None: + # 该用户从未有过 write_message 成功记录,跳过 + continue + try: + last_done = datetime.fromisoformat(raw) + # 统一去掉时区信息做 naive 比较 + if last_done.tzinfo is not None: + last_done = last_done.astimezone(timezone.utc).replace(tzinfo=None) + if updated_at is None or last_done > updated_at: + yield end_user_id + except Exception as e: + logger.warning(f"解析 last_done 时间戳失败: end_user_id={end_user_id}, raw={raw}, error={e}") + + offset += batch_size + except Exception as e: + logger.error(f"get_users_needing_refresh 分批查询失败: offset={offset}, error={e}") + break + def get_new_user_ids_today(self, batch_size: int = 100) -> Generator[str, None, None]: """分批次获取当天新增的、尚未初始化隐性记忆和情绪建议数据的用户ID diff --git a/api/app/tasks.py b/api/app/tasks.py index 82904b21..d4afcc68 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -1090,6 +1090,25 @@ def write_message_task(self, end_user_id: str, message: list[dict], config_id: s logger.info( f"[CELERY WRITE] Task completed successfully - elapsed_time={elapsed_time:.2f}s, task_id={self.request.id}") + # 记录该用户最后一次 write_message 成功的时间,供 init_implicit_emotions_for_users 做时间轴筛选 + try: + import redis as _redis + from urllib.parse import quote as _quote + _r = _redis.StrictRedis( + host=settings.REDIS_HOST, + port=settings.REDIS_PORT, + db=settings.REDIS_DB_CELERY_BACKEND, + password=settings.REDIS_PASSWORD, + decode_responses=True, + ) + _r.set( + f"write_message:last_done:{end_user_id}", + datetime.utcnow().isoformat(), + ex=86400 * 30, # 30天过期 + ) + except Exception as _e: + logger.warning(f"[CELERY WRITE] 写入 last_done 时间戳失败(不影响主流程): {_e}") + return { "status": "SUCCESS", "result": result, @@ -2167,18 +2186,27 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: with get_db_context() as db: try: - # 获取所有已存储数据的用户ID(分批次处理) repo = ImplicitEmotionsStorageRepository(db) - + # 先统计总数用于日志 from sqlalchemy import func total_users = db.execute( select(func.count()).select_from(ImplicitEmotionsStorage) ).scalar() or 0 - logger.info(f"找到 {total_users} 个需要更新的用户") + logger.info(f"表中存量用户总数: {total_users},开始时间轴筛选") - # 遍历每个用户并更新数据(分批次,避免一次性加载所有ID) - for end_user_id in repo.get_all_user_ids(batch_size=100): + # 构建 Redis 同步客户端,用于时间轴筛选 + import redis as _redis + _redis_client = _redis.StrictRedis( + host=settings.REDIS_HOST, + port=settings.REDIS_PORT, + db=settings.REDIS_DB_CELERY_BACKEND, + password=settings.REDIS_PASSWORD, + decode_responses=True, + ) + + # 只处理 last_done > updated_at 的用户(有新记忆写入的用户) + for end_user_id in repo.get_users_needing_refresh(_redis_client, batch_size=100): logger.info(f"开始处理用户: {end_user_id}") user_start_time = time.time() @@ -2264,10 +2292,10 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: user_results.append(error_info) logger.error(f"处理用户 {end_user_id} 时出错: {str(e)}") - # ---- 处理增量用户(当天新增、尚未初始化的用户)---- + # ---- 当天新增用户兜底初始化 ---- new_users_initialized = 0 new_users_failed = 0 - logger.info("开始处理当天新增的增量用户初始化") + logger.info("开始处理当天新增用户的兜底初始化") for end_user_id in repo.get_new_user_ids_today(batch_size=100): logger.info(f"开始初始化新用户: {end_user_id}") @@ -2281,35 +2309,27 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: implicit_service = ImplicitMemoryService(db=db, end_user_id=end_user_id) profile_data = await implicit_service.generate_complete_profile(user_id=end_user_id) await implicit_service.save_profile_cache( - end_user_id=end_user_id, - profile_data=profile_data, - db=db + end_user_id=end_user_id, profile_data=profile_data, db=db ) implicit_success = True logger.info(f"成功初始化新用户 {end_user_id} 的隐性记忆画像") except Exception as e: - error_msg = f"隐性记忆初始化失败: {str(e)}" - errors.append(error_msg) - logger.error(f"新用户 {end_user_id} {error_msg}") + errors.append(f"隐性记忆初始化失败: {str(e)}") + logger.error(f"新用户 {end_user_id} 隐性记忆初始化失败: {e}") try: emotion_service = EmotionAnalyticsService() suggestions_data = await emotion_service.generate_emotion_suggestions( - end_user_id=end_user_id, - db=db, - language="zh" + end_user_id=end_user_id, db=db, language="zh" ) await emotion_service.save_suggestions_cache( - end_user_id=end_user_id, - suggestions_data=suggestions_data, - db=db + end_user_id=end_user_id, suggestions_data=suggestions_data, db=db ) emotion_success = True logger.info(f"成功初始化新用户 {end_user_id} 的情绪建议") except Exception as e: - error_msg = f"情绪建议初始化失败: {str(e)}" - errors.append(error_msg) - logger.error(f"新用户 {end_user_id} {error_msg}") + errors.append(f"情绪建议初始化失败: {str(e)}") + logger.error(f"新用户 {end_user_id} 情绪建议初始化失败: {e}") if implicit_success or emotion_success: new_users_initialized += 1 @@ -2319,7 +2339,7 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: user_elapsed = time.time() - user_start_time user_results.append({ "end_user_id": end_user_id, - "type": "init", + "type": "new_user_init", "implicit_success": implicit_success, "emotion_success": emotion_success, "errors": errors, @@ -2331,7 +2351,7 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: user_elapsed = time.time() - user_start_time user_results.append({ "end_user_id": end_user_id, - "type": "init", + "type": "new_user_init", "implicit_success": False, "emotion_success": False, "errors": [str(e)], @@ -2339,27 +2359,24 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: }) logger.error(f"初始化新用户 {end_user_id} 时出错: {str(e)}") - logger.info( - f"增量用户初始化完成: 成功={new_users_initialized}, 失败={new_users_failed}" - ) - # ---- 增量用户处理结束 ---- + logger.info(f"当天新增用户兜底初始化完成: 成功={new_users_initialized}, 失败={new_users_failed}") + # ---- 新增用户兜底初始化结束 ---- - # 记录总体统计信息 logger.info( f"隐性记忆和情绪数据更新定时任务完成: " f"存量用户总数={total_users}, " f"隐性记忆成功={successful_implicit}, " f"情绪建议成功={successful_emotion}, " f"存量失败={failed}, " - f"增量初始化成功={new_users_initialized}, " - f"增量初始化失败={new_users_failed}" + f"新增用户初始化成功={new_users_initialized}, " + f"新增用户初始化失败={new_users_failed}" ) return { "status": "SUCCESS", "message": ( f"存量用户 {total_users} 个,隐性记忆 {successful_implicit} 个成功,情绪建议 {successful_emotion} 个成功;" - f"增量新用户初始化 {new_users_initialized} 个成功,{new_users_failed} 个失败" + f"当天新增用户初始化 {new_users_initialized} 个成功,{new_users_failed} 个失败" ), "total_users": total_users, "successful_implicit": successful_implicit, @@ -2367,7 +2384,7 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: "failed": failed, "new_users_initialized": new_users_initialized, "new_users_failed": new_users_failed, - "user_results": user_results[:50] # 只保留前50个用户的详细结果 + "user_results": user_results[:50] } except Exception as e: @@ -2430,12 +2447,13 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: soft_time_limit=3300, ) def init_implicit_emotions_for_users(self, end_user_ids: List[str]) -> Dict[str, Any]: - """按需初始化:为指定用户列表中尚未生成隐性记忆/情绪数据的用户执行首次生成。 + """事件触发任务:对指定用户列表做存在性检查,无记录则执行首次初始化。 - 由 /dashboard/end_users 接口触发,仅处理 implicit_emotions_storage 表中不存在记录的用户。 + 由 /dashboard/end_users 接口触发,已有数据的用户直接跳过。 + 存量用户的数据刷新由定时任务 update_implicit_emotions_storage 负责。 Args: - end_user_ids: 需要检查并初始化的用户ID列表 + end_user_ids: 需要检查的用户ID列表 Returns: 包含任务执行结果的字典 @@ -2459,24 +2477,20 @@ def init_implicit_emotions_for_users(self, end_user_ids: List[str]) -> Dict[str, repo = ImplicitEmotionsStorageRepository(db) for end_user_id in end_user_ids: - # 幂等检查:已有记录则跳过 existing = repo.get_by_end_user_id(end_user_id) if existing is not None: skipped += 1 continue - logger.info(f"用户 {end_user_id} 无隐性记忆数据,开始初始化") + logger.info(f"用户 {end_user_id} 无记录,开始初始化") implicit_ok = False emotion_ok = False - try: try: implicit_service = ImplicitMemoryService(db=db, end_user_id=end_user_id) profile_data = await implicit_service.generate_complete_profile(user_id=end_user_id) await implicit_service.save_profile_cache( - end_user_id=end_user_id, - profile_data=profile_data, - db=db + end_user_id=end_user_id, profile_data=profile_data, db=db ) implicit_ok = True except Exception as e: @@ -2485,14 +2499,10 @@ def init_implicit_emotions_for_users(self, end_user_ids: List[str]) -> Dict[str, try: emotion_service = EmotionAnalyticsService() suggestions_data = await emotion_service.generate_emotion_suggestions( - end_user_id=end_user_id, - db=db, - language="zh" + end_user_id=end_user_id, db=db, language="zh" ) await emotion_service.save_suggestions_cache( - end_user_id=end_user_id, - suggestions_data=suggestions_data, - db=db + end_user_id=end_user_id, suggestions_data=suggestions_data, db=db ) emotion_ok = True except Exception as e: @@ -2502,7 +2512,6 @@ def init_implicit_emotions_for_users(self, end_user_ids: List[str]) -> Dict[str, initialized += 1 else: failed += 1 - except Exception as e: failed += 1 logger.error(f"用户 {end_user_id} 初始化异常: {e}")