From 2612abc9d07b6c152c12d9a3ec965af7cd80cd7a Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Sat, 7 Mar 2026 13:56:15 +0800 Subject: [PATCH 1/5] [add] Create a Celery task for checking the existence of the "implicit_emotions" data --- api/app/celery_app.py | 1 + .../memory_dashboard_controller.py | 11 + api/app/services/workspace_service.py | 237 +++++++++--------- api/app/tasks.py | 126 ++++++++++ api/docker-compose.yml | 14 ++ 5 files changed, 272 insertions(+), 117 deletions(-) diff --git a/api/app/celery_app.py b/api/app/celery_app.py index 0319e079..7ace1f9b 100644 --- a/api/app/celery_app.py +++ b/api/app/celery_app.py @@ -113,6 +113,7 @@ celery_app.conf.update( 'app.tasks.run_forgetting_cycle_task': {'queue': 'periodic_tasks'}, 'app.tasks.write_all_workspaces_memory_task': {'queue': 'periodic_tasks'}, 'app.tasks.update_implicit_emotions_storage': {'queue': 'periodic_tasks'}, + 'app.tasks.init_implicit_emotions_for_users': {'queue': 'ondemand_tasks'}, }, ) diff --git a/api/app/controllers/memory_dashboard_controller.py b/api/app/controllers/memory_dashboard_controller.py index 1b5b45fb..1c82b636 100644 --- a/api/app/controllers/memory_dashboard_controller.py +++ b/api/app/controllers/memory_dashboard_controller.py @@ -149,6 +149,17 @@ async def get_workspace_end_users( return {uid: {"total": 0} for uid in end_user_ids} + # 触发按需初始化:为 implicit_emotions_storage 中没有记录的用户异步生成数据 + try: + from app.celery_app import celery_app as _celery_app + _celery_app.send_task( + "app.tasks.init_implicit_emotions_for_users", + kwargs={"end_user_ids": end_user_ids}, + ) + api_logger.info(f"已触发隐性记忆按需初始化任务,候选用户数: {len(end_user_ids)}") + except Exception as e: + api_logger.warning(f"触发隐性记忆按需初始化任务失败(不影响主流程): {e}") + # 并发执行配置查询和记忆数量查询 memory_configs_map, memory_nums_map = await asyncio.gather( get_memory_configs(), diff --git a/api/app/services/workspace_service.py b/api/app/services/workspace_service.py index 74880410..7861ef62 100644 --- a/api/app/services/workspace_service.py +++ b/api/app/services/workspace_service.py @@ -130,6 +130,7 @@ def _create_workspace_only( business_logger.error(f"创建工作空间失败: {workspace.name} - {str(e)}") raise + def create_workspace( db: Session, workspace: WorkspaceCreate, user: User, language: str = "zh" ) -> Workspace: @@ -966,6 +967,125 @@ def update_workspace_models_configs( raise BusinessException(f"更新模型配置失败: {str(e)}", BizCode.INTERNAL_ERROR) +def _fill_workspace_configs_model_defaults( + db: Session, + workspace: Workspace +) -> None: + """Fill empty model fields for all memory configs in a workspace. + + Updates llm_id, embedding_id, rerank_id, reflection_model_id, and emotion_model_id + if they are None, using the corresponding workspace default models. + + Args: + db: Database session + workspace: The workspace containing default model settings + """ + from app.models.memory_config_model import MemoryConfig + + # Get all configs for this workspace + configs = db.query(MemoryConfig).filter( + MemoryConfig.workspace_id == workspace.id + ).all() + + if not configs: + return + + # Map of memory_config field -> workspace field + model_field_mappings = [ + ("llm_id", "llm"), + ("embedding_id", "embedding"), + ("rerank_id", "rerank"), + ("reflection_model_id", "llm"), # reflection uses LLM + ("emotion_model_id", "llm"), # emotion uses LLM + ] + + configs_updated = 0 + + for memory_config in configs: + updated_fields = [] + + for config_field, workspace_field in model_field_mappings: + config_value = getattr(memory_config, config_field, None) + workspace_value = getattr(workspace, workspace_field, None) + + if not config_value and workspace_value: + setattr(memory_config, config_field, workspace_value) + updated_fields.append(config_field) + + if updated_fields: + configs_updated += 1 + business_logger.debug( + f"Updated memory config {memory_config.config_id} fields: {updated_fields}" + ) + + if configs_updated > 0: + try: + db.commit() + business_logger.info( + f"Updated {configs_updated} memory configs in workspace {workspace.id} with default models" + ) + except Exception as e: + db.rollback() + business_logger.error( + f"Failed to update memory configs in workspace {workspace.id}: {str(e)}" + ) + + +def _create_default_memory_config( + db: Session, + workspace_id: uuid.UUID, + workspace_name: str, + llm_id: Optional[uuid.UUID] = None, + embedding_id: Optional[uuid.UUID] = None, + rerank_id: Optional[uuid.UUID] = None, + scene_id: Optional[uuid.UUID] = None, + pruning_scene_name: Optional[str] = None, +) -> None: + """Create a default memory config for a newly created workspace. + + Args: + db: Database session + workspace_id: The workspace ID + workspace_name: The workspace name (used for config naming) + llm_id: Optional LLM model ID + embedding_id: Optional embedding model ID + rerank_id: Optional rerank model ID + scene_id: Optional ontology scene ID (默认关联教育场景) + pruning_scene_name: Optional pruning scene name,取自 ontology_scene.scene_name + """ + from app.models.memory_config_model import MemoryConfig + + config_id = uuid.uuid4() + + default_config = MemoryConfig( + config_id=config_id, + config_name=f"{workspace_name} 默认配置", + config_desc="工作空间创建时自动生成的默认记忆配置", + workspace_id=workspace_id, + llm_id=str(llm_id) if llm_id else None, + embedding_id=str(embedding_id) if embedding_id else None, + rerank_id=str(rerank_id) if rerank_id else None, + scene_id=scene_id, # 关联本体场景ID(默认为"在线教育"场景) + pruning_scene=pruning_scene_name, # 语义剪枝场景直接使用 scene_name + state=True, # Active by default + is_default=True, # Mark as workspace default + ) + + db.add(default_config) + db.flush() # 使用 flush 而不是 commit,让调用者统一提交 + + business_logger.info( + "Created default memory config for workspace", + extra={ + "workspace_id": str(workspace_id), + "config_id": str(config_id), + "config_name": default_config.config_name, + "scene_id": str(scene_id) if scene_id else None, + } + ) + +# ==================== 检查配置相关服务 ==================== + def _ensure_default_memory_config(db: Session, workspace: Workspace) -> None: """Ensure a workspace has a default memory config, creating one if missing. @@ -1045,70 +1165,6 @@ def _ensure_default_memory_config(db: Session, workspace: Workspace) -> None: _fill_workspace_configs_model_defaults(db, workspace) -def _fill_workspace_configs_model_defaults( - db: Session, - workspace: Workspace -) -> None: - """Fill empty model fields for all memory configs in a workspace. - - Updates llm_id, embedding_id, rerank_id, reflection_model_id, and emotion_model_id - if they are None, using the corresponding workspace default models. - - Args: - db: Database session - workspace: The workspace containing default model settings - """ - from app.models.memory_config_model import MemoryConfig - - # Get all configs for this workspace - configs = db.query(MemoryConfig).filter( - MemoryConfig.workspace_id == workspace.id - ).all() - - if not configs: - return - - # Map of memory_config field -> workspace field - model_field_mappings = [ - ("llm_id", "llm"), - ("embedding_id", "embedding"), - ("rerank_id", "rerank"), - ("reflection_model_id", "llm"), # reflection uses LLM - ("emotion_model_id", "llm"), # emotion uses LLM - ] - - configs_updated = 0 - - for memory_config in configs: - updated_fields = [] - - for config_field, workspace_field in model_field_mappings: - config_value = getattr(memory_config, config_field, None) - workspace_value = getattr(workspace, workspace_field, None) - - if not config_value and workspace_value: - setattr(memory_config, config_field, workspace_value) - updated_fields.append(config_field) - - if updated_fields: - configs_updated += 1 - business_logger.debug( - f"Updated memory config {memory_config.config_id} fields: {updated_fields}" - ) - - if configs_updated > 0: - try: - db.commit() - business_logger.info( - f"Updated {configs_updated} memory configs in workspace {workspace.id} with default models" - ) - except Exception as e: - db.rollback() - business_logger.error( - f"Failed to update memory configs in workspace {workspace.id}: {str(e)}" - ) - - def _ensure_default_ontology_scenes(db: Session, workspace: Workspace) -> None: """Ensure a workspace has default ontology scenes, creating them if missing. @@ -1154,56 +1210,3 @@ def _ensure_default_ontology_scenes(db: Session, workspace: Workspace) -> None: f"为工作空间 {workspace.id} 补建默认本体场景异常: {str(e)}" ) - -def _create_default_memory_config( - db: Session, - workspace_id: uuid.UUID, - workspace_name: str, - llm_id: Optional[uuid.UUID] = None, - embedding_id: Optional[uuid.UUID] = None, - rerank_id: Optional[uuid.UUID] = None, - scene_id: Optional[uuid.UUID] = None, - pruning_scene_name: Optional[str] = None, -) -> None: - """Create a default memory config for a newly created workspace. - - Args: - db: Database session - workspace_id: The workspace ID - workspace_name: The workspace name (used for config naming) - llm_id: Optional LLM model ID - embedding_id: Optional embedding model ID - rerank_id: Optional rerank model ID - scene_id: Optional ontology scene ID (默认关联教育场景) - pruning_scene_name: Optional pruning scene name,取自 ontology_scene.scene_name - """ - from app.models.memory_config_model import MemoryConfig - - config_id = uuid.uuid4() - - default_config = MemoryConfig( - config_id=config_id, - config_name=f"{workspace_name} 默认配置", - config_desc="工作空间创建时自动生成的默认记忆配置", - workspace_id=workspace_id, - llm_id=str(llm_id) if llm_id else None, - embedding_id=str(embedding_id) if embedding_id else None, - rerank_id=str(rerank_id) if rerank_id else None, - scene_id=scene_id, # 关联本体场景ID(默认为"在线教育"场景) - pruning_scene=pruning_scene_name, # 语义剪枝场景直接使用 scene_name - state=True, # Active by default - is_default=True, # Mark as workspace default - ) - - db.add(default_config) - db.flush() # 使用 flush 而不是 commit,让调用者统一提交 - - business_logger.info( - "Created default memory config for workspace", - extra={ - "workspace_id": str(workspace_id), - "config_id": str(config_id), - "config_name": default_config.config_name, - "scene_id": str(scene_id) if scene_id else None, - } - ) diff --git a/api/app/tasks.py b/api/app/tasks.py index a6ebbb8e..82904b21 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -2416,3 +2416,129 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: "elapsed_time": elapsed_time, "task_id": self.request.id } + + +# ============================================================================= + +@celery_app.task( + name="app.tasks.init_implicit_emotions_for_users", + bind=True, + ignore_result=True, + max_retries=0, + acks_late=False, + time_limit=3600, + soft_time_limit=3300, +) +def init_implicit_emotions_for_users(self, end_user_ids: List[str]) -> Dict[str, Any]: + """按需初始化:为指定用户列表中尚未生成隐性记忆/情绪数据的用户执行首次生成。 + + 由 /dashboard/end_users 接口触发,仅处理 implicit_emotions_storage 表中不存在记录的用户。 + + Args: + end_user_ids: 需要检查并初始化的用户ID列表 + + Returns: + 包含任务执行结果的字典 + """ + start_time = time.time() + + async def _run() -> Dict[str, Any]: + from app.core.logging_config import get_logger + from app.repositories.implicit_emotions_storage_repository import ImplicitEmotionsStorageRepository + from app.services.implicit_memory_service import ImplicitMemoryService + from app.services.emotion_analytics_service import EmotionAnalyticsService + + logger = get_logger(__name__) + logger.info(f"开始按需初始化隐性记忆/情绪数据,候选用户数: {len(end_user_ids)}") + + initialized = 0 + failed = 0 + skipped = 0 + + with get_db_context() as db: + repo = ImplicitEmotionsStorageRepository(db) + + for end_user_id in end_user_ids: + # 幂等检查:已有记录则跳过 + existing = repo.get_by_end_user_id(end_user_id) + if existing is not None: + skipped += 1 + continue + + logger.info(f"用户 {end_user_id} 无隐性记忆数据,开始初始化") + implicit_ok = False + emotion_ok = False + + try: + try: + implicit_service = ImplicitMemoryService(db=db, end_user_id=end_user_id) + profile_data = await implicit_service.generate_complete_profile(user_id=end_user_id) + await implicit_service.save_profile_cache( + end_user_id=end_user_id, + profile_data=profile_data, + db=db + ) + implicit_ok = True + except Exception as e: + logger.error(f"用户 {end_user_id} 隐性记忆初始化失败: {e}") + + try: + emotion_service = EmotionAnalyticsService() + suggestions_data = await emotion_service.generate_emotion_suggestions( + end_user_id=end_user_id, + db=db, + language="zh" + ) + await emotion_service.save_suggestions_cache( + end_user_id=end_user_id, + suggestions_data=suggestions_data, + db=db + ) + emotion_ok = True + except Exception as e: + logger.error(f"用户 {end_user_id} 情绪建议初始化失败: {e}") + + if implicit_ok or emotion_ok: + initialized += 1 + else: + failed += 1 + + except Exception as e: + failed += 1 + logger.error(f"用户 {end_user_id} 初始化异常: {e}") + + logger.info(f"按需初始化完成: 初始化={initialized}, 跳过={skipped}, 失败={failed}") + return { + "status": "SUCCESS", + "initialized": initialized, + "skipped": skipped, + "failed": failed, + } + + try: + try: + import nest_asyncio + nest_asyncio.apply() + except ImportError: + pass + + try: + loop = asyncio.get_event_loop() + if loop.is_closed(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + result = loop.run_until_complete(_run()) + result["elapsed_time"] = time.time() - start_time + result["task_id"] = self.request.id + return result + except Exception as e: + return { + "status": "FAILURE", + "error": str(e), + "elapsed_time": time.time() - start_time, + "task_id": self.request.id, + } diff --git a/api/docker-compose.yml b/api/docker-compose.yml index 69763de2..1fcfc977 100644 --- a/api/docker-compose.yml +++ b/api/docker-compose.yml @@ -63,6 +63,20 @@ services: networks: - celery + # On-demand worker - API-triggered tasks (e.g. implicit emotions init) + worker-ondemand: + image: redbear-mem-open:latest + container_name: worker-ondemand + env_file: + - .env + volumes: + - ./files:/files + - /etc/localtime:/etc/localtime:ro + command: celery -A app.celery_worker.celery_app worker -E --loglevel=info --pool=prefork --concurrency=4 --queues=ondemand_tasks --max-tasks-per-child=50 -n ondemand_worker@%h + restart: unless-stopped + networks: + - celery + # Celery Beat - scheduler beat: image: redbear-mem-open:latest From c14f067afb5a50c26940b59d9f2d8ab78720042c Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Sat, 7 Mar 2026 16:23:59 +0800 Subject: [PATCH 2/5] [add] The "update-implicit-emotions-storage" task uses the timeline to filter the updated data users. --- .../implicit_emotions_storage_repository.py | 51 +++++++++ api/app/tasks.py | 107 ++++++++++-------- 2 files changed, 109 insertions(+), 49 deletions(-) diff --git a/api/app/repositories/implicit_emotions_storage_repository.py b/api/app/repositories/implicit_emotions_storage_repository.py index 97405ab6..d1edf0ec 100644 --- a/api/app/repositories/implicit_emotions_storage_repository.py +++ b/api/app/repositories/implicit_emotions_storage_repository.py @@ -111,6 +111,57 @@ class ImplicitEmotionsStorageRepository: logger.error(f"分批获取用户ID失败: offset={offset}, error={e}") break + def get_users_needing_refresh(self, redis_client, batch_size: int = 100) -> Generator[str, None, None]: + """分批次获取需要刷新隐性记忆/情绪数据的存量用户ID。 + + 筛选逻辑: + - 查询 implicit_emotions_storage 中所有用户的 end_user_id 和 updated_at + - 从 Redis 读取 write_message:last_done:{end_user_id} 的时间戳 + - 若 Redis 中无记录(该用户从未写入过记忆),跳过 + - 若 last_done > updated_at,说明上次刷新后又有新记忆写入,需要刷新 + - 若 last_done <= updated_at,说明已是最新,跳过 + + Args: + redis_client: 同步 redis.StrictRedis 实例(连接 CELERY_BACKEND DB) + batch_size: 每批次加载的数量 + + Yields: + 需要刷新的用户ID字符串 + """ + from datetime import timezone + offset = 0 + while True: + try: + stmt = ( + select(ImplicitEmotionsStorage.end_user_id, ImplicitEmotionsStorage.updated_at) + .order_by(ImplicitEmotionsStorage.end_user_id) + .limit(batch_size) + .offset(offset) + ) + batch = self.db.execute(stmt).all() + if not batch: + break + + for end_user_id, updated_at in batch: + raw = redis_client.get(f"write_message:last_done:{end_user_id}") + if raw is None: + # 该用户从未有过 write_message 成功记录,跳过 + continue + try: + last_done = datetime.fromisoformat(raw) + # 统一去掉时区信息做 naive 比较 + if last_done.tzinfo is not None: + last_done = last_done.astimezone(timezone.utc).replace(tzinfo=None) + if updated_at is None or last_done > updated_at: + yield end_user_id + except Exception as e: + logger.warning(f"解析 last_done 时间戳失败: end_user_id={end_user_id}, raw={raw}, error={e}") + + offset += batch_size + except Exception as e: + logger.error(f"get_users_needing_refresh 分批查询失败: offset={offset}, error={e}") + break + def get_new_user_ids_today(self, batch_size: int = 100) -> Generator[str, None, None]: """分批次获取当天新增的、尚未初始化隐性记忆和情绪建议数据的用户ID diff --git a/api/app/tasks.py b/api/app/tasks.py index 82904b21..d4afcc68 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -1090,6 +1090,25 @@ def write_message_task(self, end_user_id: str, message: list[dict], config_id: s logger.info( f"[CELERY WRITE] Task completed successfully - elapsed_time={elapsed_time:.2f}s, task_id={self.request.id}") + # 记录该用户最后一次 write_message 成功的时间,供 init_implicit_emotions_for_users 做时间轴筛选 + try: + import redis as _redis + from urllib.parse import quote as _quote + _r = _redis.StrictRedis( + host=settings.REDIS_HOST, + port=settings.REDIS_PORT, + db=settings.REDIS_DB_CELERY_BACKEND, + password=settings.REDIS_PASSWORD, + decode_responses=True, + ) + _r.set( + f"write_message:last_done:{end_user_id}", + datetime.utcnow().isoformat(), + ex=86400 * 30, # 30天过期 + ) + except Exception as _e: + logger.warning(f"[CELERY WRITE] 写入 last_done 时间戳失败(不影响主流程): {_e}") + return { "status": "SUCCESS", "result": result, @@ -2167,18 +2186,27 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: with get_db_context() as db: try: - # 获取所有已存储数据的用户ID(分批次处理) repo = ImplicitEmotionsStorageRepository(db) - + # 先统计总数用于日志 from sqlalchemy import func total_users = db.execute( select(func.count()).select_from(ImplicitEmotionsStorage) ).scalar() or 0 - logger.info(f"找到 {total_users} 个需要更新的用户") + logger.info(f"表中存量用户总数: {total_users},开始时间轴筛选") - # 遍历每个用户并更新数据(分批次,避免一次性加载所有ID) - for end_user_id in repo.get_all_user_ids(batch_size=100): + # 构建 Redis 同步客户端,用于时间轴筛选 + import redis as _redis + _redis_client = _redis.StrictRedis( + host=settings.REDIS_HOST, + port=settings.REDIS_PORT, + db=settings.REDIS_DB_CELERY_BACKEND, + password=settings.REDIS_PASSWORD, + decode_responses=True, + ) + + # 只处理 last_done > updated_at 的用户(有新记忆写入的用户) + for end_user_id in repo.get_users_needing_refresh(_redis_client, batch_size=100): logger.info(f"开始处理用户: {end_user_id}") user_start_time = time.time() @@ -2264,10 +2292,10 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: user_results.append(error_info) logger.error(f"处理用户 {end_user_id} 时出错: {str(e)}") - # ---- 处理增量用户(当天新增、尚未初始化的用户)---- + # ---- 当天新增用户兜底初始化 ---- new_users_initialized = 0 new_users_failed = 0 - logger.info("开始处理当天新增的增量用户初始化") + logger.info("开始处理当天新增用户的兜底初始化") for end_user_id in repo.get_new_user_ids_today(batch_size=100): logger.info(f"开始初始化新用户: {end_user_id}") @@ -2281,35 +2309,27 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: implicit_service = ImplicitMemoryService(db=db, end_user_id=end_user_id) profile_data = await implicit_service.generate_complete_profile(user_id=end_user_id) await implicit_service.save_profile_cache( - end_user_id=end_user_id, - profile_data=profile_data, - db=db + end_user_id=end_user_id, profile_data=profile_data, db=db ) implicit_success = True logger.info(f"成功初始化新用户 {end_user_id} 的隐性记忆画像") except Exception as e: - error_msg = f"隐性记忆初始化失败: {str(e)}" - errors.append(error_msg) - logger.error(f"新用户 {end_user_id} {error_msg}") + errors.append(f"隐性记忆初始化失败: {str(e)}") + logger.error(f"新用户 {end_user_id} 隐性记忆初始化失败: {e}") try: emotion_service = EmotionAnalyticsService() suggestions_data = await emotion_service.generate_emotion_suggestions( - end_user_id=end_user_id, - db=db, - language="zh" + end_user_id=end_user_id, db=db, language="zh" ) await emotion_service.save_suggestions_cache( - end_user_id=end_user_id, - suggestions_data=suggestions_data, - db=db + end_user_id=end_user_id, suggestions_data=suggestions_data, db=db ) emotion_success = True logger.info(f"成功初始化新用户 {end_user_id} 的情绪建议") except Exception as e: - error_msg = f"情绪建议初始化失败: {str(e)}" - errors.append(error_msg) - logger.error(f"新用户 {end_user_id} {error_msg}") + errors.append(f"情绪建议初始化失败: {str(e)}") + logger.error(f"新用户 {end_user_id} 情绪建议初始化失败: {e}") if implicit_success or emotion_success: new_users_initialized += 1 @@ -2319,7 +2339,7 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: user_elapsed = time.time() - user_start_time user_results.append({ "end_user_id": end_user_id, - "type": "init", + "type": "new_user_init", "implicit_success": implicit_success, "emotion_success": emotion_success, "errors": errors, @@ -2331,7 +2351,7 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: user_elapsed = time.time() - user_start_time user_results.append({ "end_user_id": end_user_id, - "type": "init", + "type": "new_user_init", "implicit_success": False, "emotion_success": False, "errors": [str(e)], @@ -2339,27 +2359,24 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: }) logger.error(f"初始化新用户 {end_user_id} 时出错: {str(e)}") - logger.info( - f"增量用户初始化完成: 成功={new_users_initialized}, 失败={new_users_failed}" - ) - # ---- 增量用户处理结束 ---- + logger.info(f"当天新增用户兜底初始化完成: 成功={new_users_initialized}, 失败={new_users_failed}") + # ---- 新增用户兜底初始化结束 ---- - # 记录总体统计信息 logger.info( f"隐性记忆和情绪数据更新定时任务完成: " f"存量用户总数={total_users}, " f"隐性记忆成功={successful_implicit}, " f"情绪建议成功={successful_emotion}, " f"存量失败={failed}, " - f"增量初始化成功={new_users_initialized}, " - f"增量初始化失败={new_users_failed}" + f"新增用户初始化成功={new_users_initialized}, " + f"新增用户初始化失败={new_users_failed}" ) return { "status": "SUCCESS", "message": ( f"存量用户 {total_users} 个,隐性记忆 {successful_implicit} 个成功,情绪建议 {successful_emotion} 个成功;" - f"增量新用户初始化 {new_users_initialized} 个成功,{new_users_failed} 个失败" + f"当天新增用户初始化 {new_users_initialized} 个成功,{new_users_failed} 个失败" ), "total_users": total_users, "successful_implicit": successful_implicit, @@ -2367,7 +2384,7 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: "failed": failed, "new_users_initialized": new_users_initialized, "new_users_failed": new_users_failed, - "user_results": user_results[:50] # 只保留前50个用户的详细结果 + "user_results": user_results[:50] } except Exception as e: @@ -2430,12 +2447,13 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: soft_time_limit=3300, ) def init_implicit_emotions_for_users(self, end_user_ids: List[str]) -> Dict[str, Any]: - """按需初始化:为指定用户列表中尚未生成隐性记忆/情绪数据的用户执行首次生成。 + """事件触发任务:对指定用户列表做存在性检查,无记录则执行首次初始化。 - 由 /dashboard/end_users 接口触发,仅处理 implicit_emotions_storage 表中不存在记录的用户。 + 由 /dashboard/end_users 接口触发,已有数据的用户直接跳过。 + 存量用户的数据刷新由定时任务 update_implicit_emotions_storage 负责。 Args: - end_user_ids: 需要检查并初始化的用户ID列表 + end_user_ids: 需要检查的用户ID列表 Returns: 包含任务执行结果的字典 @@ -2459,24 +2477,20 @@ def init_implicit_emotions_for_users(self, end_user_ids: List[str]) -> Dict[str, repo = ImplicitEmotionsStorageRepository(db) for end_user_id in end_user_ids: - # 幂等检查:已有记录则跳过 existing = repo.get_by_end_user_id(end_user_id) if existing is not None: skipped += 1 continue - logger.info(f"用户 {end_user_id} 无隐性记忆数据,开始初始化") + logger.info(f"用户 {end_user_id} 无记录,开始初始化") implicit_ok = False emotion_ok = False - try: try: implicit_service = ImplicitMemoryService(db=db, end_user_id=end_user_id) profile_data = await implicit_service.generate_complete_profile(user_id=end_user_id) await implicit_service.save_profile_cache( - end_user_id=end_user_id, - profile_data=profile_data, - db=db + end_user_id=end_user_id, profile_data=profile_data, db=db ) implicit_ok = True except Exception as e: @@ -2485,14 +2499,10 @@ def init_implicit_emotions_for_users(self, end_user_ids: List[str]) -> Dict[str, try: emotion_service = EmotionAnalyticsService() suggestions_data = await emotion_service.generate_emotion_suggestions( - end_user_id=end_user_id, - db=db, - language="zh" + end_user_id=end_user_id, db=db, language="zh" ) await emotion_service.save_suggestions_cache( - end_user_id=end_user_id, - suggestions_data=suggestions_data, - db=db + end_user_id=end_user_id, suggestions_data=suggestions_data, db=db ) emotion_ok = True except Exception as e: @@ -2502,7 +2512,6 @@ def init_implicit_emotions_for_users(self, end_user_ids: List[str]) -> Dict[str, initialized += 1 else: failed += 1 - except Exception as e: failed += 1 logger.error(f"用户 {end_user_id} 初始化异常: {e}") From cef14cda9e3819cd485458f4148a37f7582cb76b Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Sat, 7 Mar 2026 16:36:24 +0800 Subject: [PATCH 3/5] [add] Standardize time zones; Reuse a single Redis client; Use "mget" for batch writing requests --- .../implicit_emotions_storage_repository.py | 29 +++++++--- api/app/tasks.py | 58 +++++++++++-------- 2 files changed, 57 insertions(+), 30 deletions(-) diff --git a/api/app/repositories/implicit_emotions_storage_repository.py b/api/app/repositories/implicit_emotions_storage_repository.py index d1edf0ec..dfc7061b 100644 --- a/api/app/repositories/implicit_emotions_storage_repository.py +++ b/api/app/repositories/implicit_emotions_storage_repository.py @@ -142,17 +142,32 @@ class ImplicitEmotionsStorageRepository: if not batch: break - for end_user_id, updated_at in batch: - raw = redis_client.get(f"write_message:last_done:{end_user_id}") + # 批量获取当前批次所有用户的 last_done 时间戳(一次网络往返) + keys = [f"write_message:last_done:{end_user_id}" for end_user_id, _ in batch] + raw_values = redis_client.mget(keys) + + for (end_user_id, updated_at), raw in zip(batch, raw_values): if raw is None: - # 该用户从未有过 write_message 成功记录,跳过 continue try: + CST = timezone(timedelta(hours=8)) last_done = datetime.fromisoformat(raw) - # 统一去掉时区信息做 naive 比较 - if last_done.tzinfo is not None: - last_done = last_done.astimezone(timezone.utc).replace(tzinfo=None) - if updated_at is None or last_done > updated_at: + # 统一转为 CST naive 时间做比较 + if last_done.tzinfo is None: + last_done = last_done.replace(tzinfo=timezone.utc).astimezone(CST).replace(tzinfo=None) + else: + last_done = last_done.astimezone(CST).replace(tzinfo=None) + + if updated_at is None: + yield end_user_id + continue + # updated_at 同样转为 CST naive + if updated_at.tzinfo is None: + updated_at_cst = updated_at.replace(tzinfo=timezone.utc).astimezone(CST).replace(tzinfo=None) + else: + updated_at_cst = updated_at.astimezone(CST).replace(tzinfo=None) + + if last_done > updated_at_cst: yield end_user_id except Exception as e: logger.warning(f"解析 last_done 时间戳失败: end_user_id={end_user_id}, raw={raw}, error={e}") diff --git a/api/app/tasks.py b/api/app/tasks.py index d4afcc68..0c0fd01e 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -15,6 +15,29 @@ from uuid import UUID import redis import requests +# 模块级同步 Redis 客户端单例,供 Celery 任务共享使用(避免每次任务新建连接) +# 连接 CELERY_BACKEND DB,与 write_message:last_done 时间戳写入保持一致 +def _build_sync_redis_client(): + try: + return redis.StrictRedis( + host=settings.REDIS_HOST, + port=settings.REDIS_PORT, + db=settings.REDIS_DB_CELERY_BACKEND, + password=settings.REDIS_PASSWORD, + decode_responses=True, + ) + except Exception: + return None + +_sync_redis_client: redis.StrictRedis = None + +def get_sync_redis_client() -> redis.StrictRedis: + """获取模块级同步 Redis 客户端(懒初始化单例)""" + global _sync_redis_client + if _sync_redis_client is None: + _sync_redis_client = _build_sync_redis_client() + return _sync_redis_client + # Import a unified Celery instance from app.celery_app import celery_app from app.core.config import settings @@ -1090,22 +1113,18 @@ def write_message_task(self, end_user_id: str, message: list[dict], config_id: s logger.info( f"[CELERY WRITE] Task completed successfully - elapsed_time={elapsed_time:.2f}s, task_id={self.request.id}") - # 记录该用户最后一次 write_message 成功的时间,供 init_implicit_emotions_for_users 做时间轴筛选 + # 记录该用户最后一次 write_message 成功的时间,供时间轴筛选使用 try: - import redis as _redis - from urllib.parse import quote as _quote - _r = _redis.StrictRedis( - host=settings.REDIS_HOST, - port=settings.REDIS_PORT, - db=settings.REDIS_DB_CELERY_BACKEND, - password=settings.REDIS_PASSWORD, - decode_responses=True, - ) - _r.set( - f"write_message:last_done:{end_user_id}", - datetime.utcnow().isoformat(), - ex=86400 * 30, # 30天过期 - ) + _r = get_sync_redis_client() + if _r is not None: + from datetime import timezone as _tz, timedelta as _td + _CST = _tz(timedelta(hours=8)) + _now_cst = datetime.now(_CST).replace(tzinfo=None).isoformat() + _r.set( + f"write_message:last_done:{end_user_id}", + _now_cst, + ex=86400 * 30, + ) except Exception as _e: logger.warning(f"[CELERY WRITE] 写入 last_done 时间戳失败(不影响主流程): {_e}") @@ -2196,14 +2215,7 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: logger.info(f"表中存量用户总数: {total_users},开始时间轴筛选") # 构建 Redis 同步客户端,用于时间轴筛选 - import redis as _redis - _redis_client = _redis.StrictRedis( - host=settings.REDIS_HOST, - port=settings.REDIS_PORT, - db=settings.REDIS_DB_CELERY_BACKEND, - password=settings.REDIS_PASSWORD, - decode_responses=True, - ) + _redis_client = get_sync_redis_client() # 只处理 last_done > updated_at 的用户(有新记忆写入的用户) for end_user_id in repo.get_users_needing_refresh(_redis_client, batch_size=100): From 476632294fe69899eea0a30a464cc7874d1c0f60 Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Mon, 9 Mar 2026 14:02:23 +0800 Subject: [PATCH 4/5] [changes] Remove the "worker-ondemand" queue --- api/app/celery_app.py | 2 +- api/app/tasks.py | 2 ++ api/docker-compose.yml | 16 +--------------- 3 files changed, 4 insertions(+), 16 deletions(-) diff --git a/api/app/celery_app.py b/api/app/celery_app.py index 7ace1f9b..e6b239dd 100644 --- a/api/app/celery_app.py +++ b/api/app/celery_app.py @@ -113,7 +113,7 @@ celery_app.conf.update( 'app.tasks.run_forgetting_cycle_task': {'queue': 'periodic_tasks'}, 'app.tasks.write_all_workspaces_memory_task': {'queue': 'periodic_tasks'}, 'app.tasks.update_implicit_emotions_storage': {'queue': 'periodic_tasks'}, - 'app.tasks.init_implicit_emotions_for_users': {'queue': 'ondemand_tasks'}, + 'app.tasks.init_implicit_emotions_for_users': {'queue': 'periodic_tasks'}, }, ) diff --git a/api/app/tasks.py b/api/app/tasks.py index 0c0fd01e..65a0a091 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -2457,6 +2457,8 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: acks_late=False, time_limit=3600, soft_time_limit=3300, + # 触发型任务标识,区别于 periodic_tasks 队列中的定时任务 + triggered=True, ) def init_implicit_emotions_for_users(self, end_user_ids: List[str]) -> Dict[str, Any]: """事件触发任务:对指定用户列表做存在性检查,无记录则执行首次初始化。 diff --git a/api/docker-compose.yml b/api/docker-compose.yml index 1fcfc977..5d358f2c 100644 --- a/api/docker-compose.yml +++ b/api/docker-compose.yml @@ -49,7 +49,7 @@ services: networks: - celery - # Periodic worker - Scheduled/beat tasks (prefork, low concurrency) + # Periodic worker - Scheduled/beat tasks + API-triggered tasks (prefork, low concurrency) worker-periodic: image: redbear-mem-open:latest container_name: worker-periodic @@ -63,20 +63,6 @@ services: networks: - celery - # On-demand worker - API-triggered tasks (e.g. implicit emotions init) - worker-ondemand: - image: redbear-mem-open:latest - container_name: worker-ondemand - env_file: - - .env - volumes: - - ./files:/files - - /etc/localtime:/etc/localtime:ro - command: celery -A app.celery_worker.celery_app worker -E --loglevel=info --pool=prefork --concurrency=4 --queues=ondemand_tasks --max-tasks-per-child=50 -n ondemand_worker@%h - restart: unless-stopped - networks: - - celery - # Celery Beat - scheduler beat: image: redbear-mem-open:latest From c8065b0c602242cb4233c85d67dd255b744170cc Mon Sep 17 00:00:00 2001 From: Ke Sun Date: Mon, 9 Mar 2026 14:12:53 +0800 Subject: [PATCH 5/5] feat(implicit-emotions): add Redis resilience and connection pooling - Replace single Redis client with connection pool for better concurrency and auto-reconnection - Add graceful degradation when Redis is unavailable (None handling in get_users_needing_refresh) - Add RedisError exception handling with fallback to process all users on mget failures - Add type hints (Optional[redis.StrictRedis]) to Redis client parameters - Add health check and socket timeout configuration to connection pool - Add logging for Redis connection failures and degradation events - Reorganize imports alphabetically for consistency across both files - Update get_sync_redis_client to validate connection with ping() before returning --- .../implicit_emotions_storage_repository.py | 45 +++++++-- api/app/tasks.py | 92 +++++++++++++------ 2 files changed, 102 insertions(+), 35 deletions(-) diff --git a/api/app/repositories/implicit_emotions_storage_repository.py b/api/app/repositories/implicit_emotions_storage_repository.py index dfc7061b..58e98dfd 100644 --- a/api/app/repositories/implicit_emotions_storage_repository.py +++ b/api/app/repositories/implicit_emotions_storage_repository.py @@ -5,13 +5,15 @@ Implicit Emotions Storage Repository 事务由调用方控制,仓储层只使用 flush/refresh """ import logging -from datetime import datetime, date, timezone, timedelta -from typing import Optional, Generator -from sqlalchemy.orm import Session -from sqlalchemy import select, not_, exists +from datetime import date, datetime, timedelta, timezone +from typing import Generator, Optional + +import redis +from sqlalchemy import exists, not_, select +from sqlalchemy.orm import Session -from app.models.implicit_emotions_storage_model import ImplicitEmotionsStorage from app.models.end_user_model import EndUser +from app.models.implicit_emotions_storage_model import ImplicitEmotionsStorage logger = logging.getLogger(__name__) @@ -111,7 +113,7 @@ class ImplicitEmotionsStorageRepository: logger.error(f"分批获取用户ID失败: offset={offset}, error={e}") break - def get_users_needing_refresh(self, redis_client, batch_size: int = 100) -> Generator[str, None, None]: + def get_users_needing_refresh(self, redis_client: Optional[redis.StrictRedis], batch_size: int = 100) -> Generator[str, None, None]: """分批次获取需要刷新隐性记忆/情绪数据的存量用户ID。 筛选逻辑: @@ -120,15 +122,28 @@ class ImplicitEmotionsStorageRepository: - 若 Redis 中无记录(该用户从未写入过记忆),跳过 - 若 last_done > updated_at,说明上次刷新后又有新记忆写入,需要刷新 - 若 last_done <= updated_at,说明已是最新,跳过 + + 如果 redis_client 为 None,则降级为返回所有用户(禁用时间过滤)。 Args: - redis_client: 同步 redis.StrictRedis 实例(连接 CELERY_BACKEND DB) + redis_client: 同步 redis.StrictRedis 实例(连接 CELERY_BACKEND DB),如果为 None 则禁用时间过滤 batch_size: 每批次加载的数量 Yields: 需要刷新的用户ID字符串 """ from datetime import timezone + + from redis.exceptions import RedisError + + # 如果 Redis 不可用,降级为处理所有用户 + if redis_client is None: + logger.warning( + "Redis 客户端不可用,时间过滤已禁用,将处理所有存量用户" + ) + yield from self.get_all_user_ids(batch_size) + return + offset = 0 while True: try: @@ -144,7 +159,18 @@ class ImplicitEmotionsStorageRepository: # 批量获取当前批次所有用户的 last_done 时间戳(一次网络往返) keys = [f"write_message:last_done:{end_user_id}" for end_user_id, _ in batch] - raw_values = redis_client.mget(keys) + + try: + raw_values = redis_client.mget(keys) + except RedisError as e: + logger.error( + f"Redis mget 操作失败: {e},当前批次降级为处理所有用户", + extra={"offset": offset, "batch_size": len(batch)} + ) + # Redis 操作失败,降级为返回当前批次所有用户 + yield from (end_user_id for end_user_id, _ in batch) + offset += batch_size + continue for (end_user_id, updated_at), raw in zip(batch, raw_values): if raw is None: @@ -190,7 +216,8 @@ class ImplicitEmotionsStorageRepository: Yields: 用户ID字符串 """ - from sqlalchemy import cast, String as SAString + from sqlalchemy import String as SAString + from sqlalchemy import cast CST = timezone(timedelta(hours=8)) now_cst = datetime.now(CST) today_start = now_cst.replace(hour=0, minute=0, second=0, microsecond=0).astimezone(timezone.utc).replace(tzinfo=None) diff --git a/api/app/tasks.py b/api/app/tasks.py index 65a0a091..5958d77d 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -1,5 +1,6 @@ import asyncio import json +import logging import os import re import shutil @@ -14,29 +15,62 @@ from uuid import UUID import redis import requests +from redis.exceptions import RedisError -# 模块级同步 Redis 客户端单例,供 Celery 任务共享使用(避免每次任务新建连接) +logger = logging.getLogger(__name__) + +# 模块级同步 Redis 连接池,供 Celery 任务共享使用 # 连接 CELERY_BACKEND DB,与 write_message:last_done 时间戳写入保持一致 -def _build_sync_redis_client(): +# 使用连接池而非单例客户端,提供更好的并发性能和自动重连 +_sync_redis_pool: redis.ConnectionPool = None + +def _get_or_create_redis_pool() -> redis.ConnectionPool: + """获取或创建 Redis 连接池(懒初始化)""" + global _sync_redis_pool + if _sync_redis_pool is None: + try: + _sync_redis_pool = redis.ConnectionPool( + host=settings.REDIS_HOST, + port=settings.REDIS_PORT, + db=settings.REDIS_DB_CELERY_BACKEND, + password=settings.REDIS_PASSWORD, + decode_responses=True, + max_connections=10, + socket_connect_timeout=5, + socket_timeout=5, + retry_on_timeout=True, + health_check_interval=30, + ) + logger.info("Redis connection pool created for Celery tasks") + except Exception as e: + logger.error(f"Failed to create Redis connection pool: {e}", exc_info=True) + return None + return _sync_redis_pool + +def get_sync_redis_client() -> Optional[redis.StrictRedis]: + """获取同步 Redis 客户端(使用连接池) + + 使用连接池提供的客户端,支持自动重连和健康检查。 + 如果 Redis 不可用,返回 None,调用方应优雅降级。 + + Returns: + redis.StrictRedis: Redis 客户端实例,如果连接失败则返回 None + """ try: - return redis.StrictRedis( - host=settings.REDIS_HOST, - port=settings.REDIS_PORT, - db=settings.REDIS_DB_CELERY_BACKEND, - password=settings.REDIS_PASSWORD, - decode_responses=True, - ) - except Exception: + pool = _get_or_create_redis_pool() + if pool is None: + return None + + client = redis.StrictRedis(connection_pool=pool) + # 验证连接可用性 + client.ping() + return client + except RedisError as e: + logger.error(f"Redis connection failed: {e}", exc_info=True) + return None + except Exception as e: + logger.error(f"Unexpected error getting Redis client: {e}", exc_info=True) return None - -_sync_redis_client: redis.StrictRedis = None - -def get_sync_redis_client() -> redis.StrictRedis: - """获取模块级同步 Redis 客户端(懒初始化单例)""" - global _sync_redis_client - if _sync_redis_client is None: - _sync_redis_client = _build_sync_redis_client() - return _sync_redis_client # Import a unified Celery instance from app.celery_app import celery_app @@ -1117,8 +1151,9 @@ def write_message_task(self, end_user_id: str, message: list[dict], config_id: s try: _r = get_sync_redis_client() if _r is not None: - from datetime import timezone as _tz, timedelta as _td - _CST = _tz(timedelta(hours=8)) + from datetime import timedelta as _td + from datetime import timezone as _tz + _CST = _tz(_td(hours=8)) _now_cst = datetime.now(_CST).replace(tzinfo=None).isoformat() _r.set( f"write_message:last_done:{end_user_id}", @@ -2187,12 +2222,15 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: start_time = time.time() async def _run() -> Dict[str, Any]: + from sqlalchemy import func, select + from app.core.logging_config import get_logger - from app.repositories.implicit_emotions_storage_repository import ImplicitEmotionsStorageRepository from app.models.implicit_emotions_storage_model import ImplicitEmotionsStorage - from sqlalchemy import select, func - from app.services.implicit_memory_service import ImplicitMemoryService + from app.repositories.implicit_emotions_storage_repository import ( + ImplicitEmotionsStorageRepository, + ) from app.services.emotion_analytics_service import EmotionAnalyticsService + from app.services.implicit_memory_service import ImplicitMemoryService logger = get_logger(__name__) logger.info("开始执行隐性记忆和情绪数据更新定时任务") @@ -2476,9 +2514,11 @@ def init_implicit_emotions_for_users(self, end_user_ids: List[str]) -> Dict[str, async def _run() -> Dict[str, Any]: from app.core.logging_config import get_logger - from app.repositories.implicit_emotions_storage_repository import ImplicitEmotionsStorageRepository - from app.services.implicit_memory_service import ImplicitMemoryService + from app.repositories.implicit_emotions_storage_repository import ( + ImplicitEmotionsStorageRepository, + ) from app.services.emotion_analytics_service import EmotionAnalyticsService + from app.services.implicit_memory_service import ImplicitMemoryService logger = get_logger(__name__) logger.info(f"开始按需初始化隐性记忆/情绪数据,候选用户数: {len(end_user_ids)}")