diff --git a/api/app/cache/memory/emotion_memory.py b/api/app/cache/memory/emotion_memory.py deleted file mode 100644 index 45ea90de..00000000 --- a/api/app/cache/memory/emotion_memory.py +++ /dev/null @@ -1,134 +0,0 @@ -""" -Emotion Suggestions Cache - -情绪个性化建议缓存模块 -用于缓存用户的情绪个性化建议数据 -""" -import json -import logging -from typing import Optional, Dict, Any -from datetime import datetime - -from app.aioRedis import aio_redis - -logger = logging.getLogger(__name__) - - -class EmotionMemoryCache: - """情绪建议缓存类""" - - # Key 前缀 - PREFIX = "cache:memory:emotion_memory" - - @classmethod - def _get_key(cls, *parts: str) -> str: - """生成 Redis key - - Args: - *parts: key 的各个部分 - - Returns: - 完整的 Redis key - """ - return ":".join([cls.PREFIX] + list(parts)) - - @classmethod - async def set_emotion_suggestions( - cls, - user_id: str, - suggestions_data: Dict[str, Any], - expire: int = 86400 - ) -> bool: - """设置用户情绪建议缓存 - - Args: - user_id: 用户ID(end_user_id) - suggestions_data: 建议数据字典,包含: - - health_summary: 健康状态摘要 - - suggestions: 建议列表 - - generated_at: 生成时间(可选) - expire: 过期时间(秒),默认24小时(86400秒) - - Returns: - 是否设置成功 - """ - try: - key = cls._get_key("suggestions", user_id) - - # 添加生成时间戳 - if "generated_at" not in suggestions_data: - suggestions_data["generated_at"] = datetime.now().isoformat() - - # 添加缓存标记 - suggestions_data["cached"] = True - - value = json.dumps(suggestions_data, ensure_ascii=False) - await aio_redis.set(key, value, ex=expire) - logger.info(f"设置情绪建议缓存成功: {key}, 过期时间: {expire}秒") - return True - except Exception as e: - logger.error(f"设置情绪建议缓存失败: {e}", exc_info=True) - return False - - @classmethod - async def get_emotion_suggestions(cls, user_id: str) -> Optional[Dict[str, Any]]: - """获取用户情绪建议缓存 - - Args: - user_id: 用户ID(end_user_id) - - Returns: - 建议数据字典,如果不存在或已过期返回 None - """ - try: - key = cls._get_key("suggestions", user_id) - value = await aio_redis.get(key) - - if value: - data = json.loads(value) - logger.info(f"成功获取情绪建议缓存: {key}") - return data - - logger.info(f"情绪建议缓存不存在或已过期: {key}") - return None - except Exception as e: - logger.error(f"获取情绪建议缓存失败: {e}", exc_info=True) - return None - - @classmethod - async def delete_emotion_suggestions(cls, user_id: str) -> bool: - """删除用户情绪建议缓存 - - Args: - user_id: 用户ID(end_user_id) - - Returns: - 是否删除成功 - """ - try: - key = cls._get_key("suggestions", user_id) - result = await aio_redis.delete(key) - logger.info(f"删除情绪建议缓存: {key}, 结果: {result}") - return result > 0 - except Exception as e: - logger.error(f"删除情绪建议缓存失败: {e}", exc_info=True) - return False - - @classmethod - async def get_suggestions_ttl(cls, user_id: str) -> int: - """获取情绪建议缓存的剩余过期时间 - - Args: - user_id: 用户ID(end_user_id) - - Returns: - 剩余秒数,-1表示永不过期,-2表示key不存在 - """ - try: - key = cls._get_key("suggestions", user_id) - ttl = await aio_redis.ttl(key) - logger.debug(f"情绪建议缓存TTL: {key} = {ttl}秒") - return ttl - except Exception as e: - logger.error(f"获取情绪建议缓存TTL失败: {e}") - return -2 diff --git a/api/app/cache/memory/implicit_memory.py b/api/app/cache/memory/implicit_memory.py deleted file mode 100644 index 21f08e9a..00000000 --- a/api/app/cache/memory/implicit_memory.py +++ /dev/null @@ -1,136 +0,0 @@ -""" -Implicit Memory Profile Cache - -隐式记忆用户画像缓存模块 -用于缓存用户的完整画像数据(偏好标签、四维画像、兴趣领域、行为习惯) -""" -import json -import logging -from typing import Optional, Dict, Any -from datetime import datetime - -from app.aioRedis import aio_redis - -logger = logging.getLogger(__name__) - - -class ImplicitMemoryCache: - """隐式记忆用户画像缓存类""" - - # Key 前缀 - PREFIX = "cache:memory:implicit_memory" - - @classmethod - def _get_key(cls, *parts: str) -> str: - """生成 Redis key - - Args: - *parts: key 的各个部分 - - Returns: - 完整的 Redis key - """ - return ":".join([cls.PREFIX] + list(parts)) - - @classmethod - async def set_user_profile( - cls, - user_id: str, - profile_data: Dict[str, Any], - expire: int = 86400 - ) -> bool: - """设置用户完整画像缓存 - - Args: - user_id: 用户ID(end_user_id) - profile_data: 画像数据字典,包含: - - preferences: 偏好标签列表 - - portrait: 四维画像对象 - - interest_areas: 兴趣领域分布对象 - - habits: 行为习惯列表 - - generated_at: 生成时间(可选) - expire: 过期时间(秒),默认24小时(86400秒) - - Returns: - 是否设置成功 - """ - try: - key = cls._get_key("profile", user_id) - - # 添加生成时间戳 - if "generated_at" not in profile_data: - profile_data["generated_at"] = datetime.now().isoformat() - - # 添加缓存标记 - profile_data["cached"] = True - - value = json.dumps(profile_data, ensure_ascii=False) - await aio_redis.set(key, value, ex=expire) - logger.info(f"设置用户画像缓存成功: {key}, 过期时间: {expire}秒") - return True - except Exception as e: - logger.error(f"设置用户画像缓存失败: {e}", exc_info=True) - return False - - @classmethod - async def get_user_profile(cls, user_id: str) -> Optional[Dict[str, Any]]: - """获取用户完整画像缓存 - - Args: - user_id: 用户ID(end_user_id) - - Returns: - 画像数据字典,如果不存在或已过期返回 None - """ - try: - key = cls._get_key("profile", user_id) - value = await aio_redis.get(key) - - if value: - data = json.loads(value) - logger.info(f"成功获取用户画像缓存: {key}") - return data - - logger.info(f"用户画像缓存不存在或已过期: {key}") - return None - except Exception as e: - logger.error(f"获取用户画像缓存失败: {e}", exc_info=True) - return None - - @classmethod - async def delete_user_profile(cls, user_id: str) -> bool: - """删除用户完整画像缓存 - - Args: - user_id: 用户ID(end_user_id) - - Returns: - 是否删除成功 - """ - try: - key = cls._get_key("profile", user_id) - result = await aio_redis.delete(key) - logger.info(f"删除用户画像缓存: {key}, 结果: {result}") - return result > 0 - except Exception as e: - logger.error(f"删除用户画像缓存失败: {e}", exc_info=True) - return False - - @classmethod - async def get_profile_ttl(cls, user_id: str) -> int: - """获取用户画像缓存的剩余过期时间 - - Args: - user_id: 用户ID(end_user_id) - - Returns: - 剩余秒数,-1表示永不过期,-2表示key不存在 - """ - try: - key = cls._get_key("profile", user_id) - ttl = await aio_redis.ttl(key) - logger.debug(f"用户画像缓存TTL: {key} = {ttl}秒") - return ttl - except Exception as e: - logger.error(f"获取用户画像缓存TTL失败: {e}") - return -2 diff --git a/api/app/celery_app.py b/api/app/celery_app.py index 8ef44975..e804d303 100644 --- a/api/app/celery_app.py +++ b/api/app/celery_app.py @@ -82,7 +82,8 @@ celery_app.conf.update( 'app.tasks.workspace_reflection_task': {'queue': 'periodic_tasks'}, 'app.tasks.regenerate_memory_cache': {'queue': 'periodic_tasks'}, 'app.tasks.run_forgetting_cycle_task': {'queue': 'periodic_tasks'}, - 'app.controllers.memory_storage_controller.search_all': {'queue': 'periodic_tasks'}, + 'app.tasks.write_all_workspaces_memory_task': {'queue': 'periodic_tasks'}, + 'app.tasks.update_implicit_emotions_storage': {'queue': 'periodic_tasks'}, }, ) @@ -95,6 +96,7 @@ memory_cache_regeneration_schedule = timedelta(hours=settings.MEMORY_CACHE_REGEN # 这个30秒的设计不合理 workspace_reflection_schedule = timedelta(seconds=30) # 每30秒运行一次settings.REFLECTION_INTERVAL_TIME forgetting_cycle_schedule = timedelta(hours=24) # 每24小时运行一次遗忘周期 +implicit_emotions_update_schedule = timedelta(hours=24) # 每24小时更新一次隐性记忆和情绪数据 #构建定时任务配置 beat_schedule_config = { @@ -122,9 +124,13 @@ if settings.DEFAULT_WORKSPACE_ID: beat_schedule_config["write-total-memory"] = { "task": "app.controllers.memory_storage_controller.search_all", "schedule": memory_increment_schedule, - "kwargs": { - "workspace_id": settings.DEFAULT_WORKSPACE_ID, - }, - } + "args": (), + }, + "update-implicit-emotions-storage": { + "task": "app.tasks.update_implicit_emotions_storage", + "schedule": implicit_emotions_update_schedule, + "args": (), + }, +} celery_app.conf.beat_schedule = beat_schedule_config diff --git a/api/app/controllers/emotion_controller.py b/api/app/controllers/emotion_controller.py index eb2436d2..02ce7862 100644 --- a/api/app/controllers/emotion_controller.py +++ b/api/app/controllers/emotion_controller.py @@ -208,6 +208,57 @@ async def get_emotion_health( +@router.post("/check-data", response_model=ApiResponse) +async def check_emotion_data_exists( + request: EmotionSuggestionsRequest, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """检查用户情绪建议数据是否存在 + + Args: + request: 包含 end_user_id + db: 数据库会话 + current_user: 当前用户 + + Returns: + 数据存在状态 + """ + try: + api_logger.info( + f"检查用户情绪建议数据是否存在: {request.end_user_id}", + extra={"end_user_id": request.end_user_id} + ) + + # 从数据库获取建议 + data = await emotion_service.get_cached_suggestions( + end_user_id=request.end_user_id, + db=db + ) + + if data is None: + api_logger.info(f"用户 {request.end_user_id} 的情绪建议数据不存在") + return fail( + BizCode.NOT_FOUND, + "情绪建议数据不存在,请点击右上角刷新进行初始化", + {"exists": False} + ) + + api_logger.info(f"用户 {request.end_user_id} 的情绪建议数据存在") + return success(data={"exists": True}, msg="情绪建议数据已存在") + + except Exception as e: + api_logger.error( + f"检查情绪建议数据失败: {str(e)}", + extra={"end_user_id": request.end_user_id}, + exc_info=True + ) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"检查情绪建议数据失败: {str(e)}" + ) + + @router.post("/suggestions", response_model=ApiResponse) async def get_emotion_suggestions( request: EmotionSuggestionsRequest, @@ -215,7 +266,7 @@ async def get_emotion_suggestions( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): - """获取个性化情绪建议(从缓存读取) + """获取个性化情绪建议(从数据库读取) Args: request: 包含 end_user_id 和可选的 config_id @@ -223,77 +274,47 @@ async def get_emotion_suggestions( current_user: 当前用户 Returns: - 缓存的个性化情绪建议响应 + 存储的个性化情绪建议响应 """ try: # 使用集中化的语言校验 language = get_language_from_header(language_type) api_logger.info( - f"用户 {current_user.username} 请求获取个性化情绪建议(缓存)", + f"用户 {current_user.username} 请求获取个性化情绪建议", extra={ "end_user_id": request.end_user_id, "config_id": request.config_id } ) - # 从缓存获取建议 + # 从数据库获取建议 data = await emotion_service.get_cached_suggestions( end_user_id=request.end_user_id, db=db ) if data is None: - # 缓存不存在或已过期,自动触发生成 + # 数据不存在,返回提示信息 api_logger.info( - f"用户 {request.end_user_id} 的建议缓存不存在或已过期,自动生成新建议", + f"用户 {request.end_user_id} 的建议数据不存在", extra={"end_user_id": request.end_user_id} ) - try: - data = await emotion_service.generate_emotion_suggestions( - end_user_id=request.end_user_id, - db=db, - language=language - ) - # 保存到缓存 - await emotion_service.save_suggestions_cache( - end_user_id=request.end_user_id, - suggestions_data=data, - db=db, - expires_hours=24 - ) - except (ValueError, KeyError) as gen_e: - # 预期内的业务异常:配置缺失、数据格式问题等 - api_logger.warning( - f"自动生成建议失败(业务异常): {str(gen_e)}", - extra={"end_user_id": request.end_user_id} - ) - return fail( - BizCode.NOT_FOUND, - f"自动生成建议失败: {str(gen_e)}", - "" - ) - except Exception as gen_e: - # 非预期异常:记录完整 traceback 便于排查 - api_logger.error( - f"自动生成建议时发生未预期异常: {str(gen_e)}", - extra={"end_user_id": request.end_user_id}, - exc_info=True - ) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"生成建议时发生内部错误: {str(gen_e)}" - ) + return fail( + BizCode.NOT_FOUND, + "情绪建议数据不存在,请点击右上角刷新进行初始化", + "" + ) api_logger.info( - "个性化建议获取成功(缓存)", + "个性化建议获取成功", extra={ "end_user_id": request.end_user_id, "suggestions_count": len(data.get("suggestions", [])) } ) - return success(data=data, msg="个性化建议获取成功(缓存)") + return success(data=data, msg="个性化建议获取成功") except Exception as e: api_logger.error( @@ -314,7 +335,7 @@ async def generate_emotion_suggestions( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): - """生成个性化情绪建议(调用LLM并缓存) + """生成个性化情绪建议(调用LLM并保存到数据库) Args: request: 包含 end_user_id @@ -342,12 +363,11 @@ async def generate_emotion_suggestions( language=language ) - # 保存到缓存 + # 保存到数据库 await emotion_service.save_suggestions_cache( end_user_id=request.end_user_id, suggestions_data=data, - db=db, - expires_hours=24 + db=db ) api_logger.info( diff --git a/api/app/controllers/implicit_memory_controller.py b/api/app/controllers/implicit_memory_controller.py index 96e437d6..91e634c9 100644 --- a/api/app/controllers/implicit_memory_controller.py +++ b/api/app/controllers/implicit_memory_controller.py @@ -122,6 +122,49 @@ def validate_confidence_threshold(threshold: float) -> None: raise ValueError("confidence_threshold must be between 0.0 and 1.0") +@router.get("/check-data/{end_user_id}", response_model=ApiResponse) +@cur_workspace_access_guard() +async def check_user_data_exists( + end_user_id: str, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user) +) -> ApiResponse: + """ + 检查用户画像数据是否存在 + + Args: + end_user_id: 目标用户ID + + Returns: + 数据存在状态 + """ + api_logger.info(f"检查用户画像数据是否存在: {end_user_id}") + + try: + # Validate inputs + validate_user_id(end_user_id) + + # Create service with user-specific config + service = ImplicitMemoryService(db=db, end_user_id=end_user_id) + + # Get cached profile + cached_profile = await service.get_cached_profile(end_user_id=end_user_id, db=db) + + if cached_profile is None: + api_logger.info(f"用户 {end_user_id} 的画像数据不存在") + return fail( + BizCode.NOT_FOUND, + "画像数据不存在,请点击右上角刷新进行初始化", + {"exists": False} + ) + + api_logger.info(f"用户 {end_user_id} 的画像数据存在") + return success(data={"exists": True}, msg="画像数据已存在") + + except Exception as e: + return handle_implicit_memory_error(e, "检查画像数据", end_user_id) + + @router.get("/preferences/{end_user_id}", response_model=ApiResponse) @cur_workspace_access_guard() async def get_preference_tags( @@ -159,10 +202,10 @@ async def get_preference_tags( cached_profile = await service.get_cached_profile(end_user_id=end_user_id, db=db) if cached_profile is None: - api_logger.info(f"用户 {end_user_id} 的画像缓存不存在或已过期") + api_logger.info(f"用户 {end_user_id} 的画像数据不存在") return fail( BizCode.NOT_FOUND, - "画像缓存不存在或已过期,请右上角刷新生成新画像", + "画像数据不存在,请点击右上角刷新进行初始化", "" ) @@ -230,10 +273,10 @@ async def get_dimension_portrait( cached_profile = await service.get_cached_profile(end_user_id=end_user_id, db=db) if cached_profile is None: - api_logger.info(f"用户 {end_user_id} 的画像缓存不存在或已过期") + api_logger.info(f"用户 {end_user_id} 的画像数据不存在") return fail( BizCode.NOT_FOUND, - "画像缓存不存在或已过期,请右上角刷新生成新画像", + "画像数据不存在,请点击右上角刷新进行初始化", "" ) @@ -278,10 +321,10 @@ async def get_interest_area_distribution( cached_profile = await service.get_cached_profile(end_user_id=end_user_id, db=db) if cached_profile is None: - api_logger.info(f"用户 {end_user_id} 的画像缓存不存在或已过期") + api_logger.info(f"用户 {end_user_id} 的画像数据不存在") return fail( BizCode.NOT_FOUND, - "画像缓存不存在或已过期,请右上角刷新生成新画像", + "画像数据不存在,请点击右上角刷新进行初始化", "" ) @@ -330,10 +373,10 @@ async def get_behavior_habits( cached_profile = await service.get_cached_profile(end_user_id=end_user_id, db=db) if cached_profile is None: - api_logger.info(f"用户 {end_user_id} 的画像缓存不存在或已过期") + api_logger.info(f"用户 {end_user_id} 的画像数据不存在") return fail( BizCode.NOT_FOUND, - "画像缓存不存在或已过期,请右上角刷新生成新画像", + "画像数据不存在,请点击右上角刷新进行初始化", "" ) diff --git a/api/app/models/__init__.py b/api/app/models/__init__.py index b1b723e9..c6098a6d 100644 --- a/api/app/models/__init__.py +++ b/api/app/models/__init__.py @@ -35,6 +35,7 @@ from .ontology_scene import OntologyScene from .ontology_class import OntologyClass from .ontology_scene import OntologyScene from .ontology_class import OntologyClass +from .implicit_emotions_storage_model import ImplicitEmotionsStorage __all__ = [ "Tenants", @@ -90,5 +91,6 @@ __all__ = [ "MemoryPerceptualModel", "ModelBase", "LoadBalanceStrategy", - "Skill" + "Skill", + "ImplicitEmotionsStorage" ] diff --git a/api/app/models/implicit_emotions_storage_model.py b/api/app/models/implicit_emotions_storage_model.py new file mode 100644 index 00000000..57c0fd61 --- /dev/null +++ b/api/app/models/implicit_emotions_storage_model.py @@ -0,0 +1,46 @@ +""" +Implicit Emotions Storage Model + +数据库模型:存储用户的隐性记忆画像和情绪建议数据 +替代原有的Redis缓存方式 +""" +import uuid +from datetime import datetime +from sqlalchemy import Column, String, Text, DateTime, Index +from sqlalchemy.dialects.postgresql import UUID, JSONB +from app.db import Base + + +class ImplicitEmotionsStorage(Base): + """隐性记忆和情绪存储表""" + + __tablename__ = "implicit_emotions_storage" + + # 主键 + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, comment="主键ID") + + # 用户标识 + end_user_id = Column(String(255), nullable=False, unique=True, index=True, comment="终端用户ID") + + # 隐性记忆画像数据(JSON格式) + implicit_profile = Column(JSONB, nullable=True, comment="隐性记忆用户画像数据") + + # 情绪建议数据(JSON格式) + emotion_suggestions = Column(JSONB, nullable=True, comment="情绪个性化建议数据") + + # 时间戳 + created_at = Column(DateTime, nullable=False, default=datetime.utcnow, comment="创建时间") + updated_at = Column(DateTime, nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow, comment="更新时间") + + # 数据生成时间(用于业务逻辑) + implicit_generated_at = Column(DateTime, nullable=True, comment="隐性记忆画像生成时间") + emotion_generated_at = Column(DateTime, nullable=True, comment="情绪建议生成时间") + + # 索引 + __table_args__ = ( + Index('idx_end_user_id', 'end_user_id'), + Index('idx_updated_at', 'updated_at'), + ) + + def __repr__(self): + return f"" diff --git a/api/app/repositories/implicit_emotions_storage_repository.py b/api/app/repositories/implicit_emotions_storage_repository.py new file mode 100644 index 00000000..fd4b10ce --- /dev/null +++ b/api/app/repositories/implicit_emotions_storage_repository.py @@ -0,0 +1,169 @@ +""" +Implicit Emotions Storage Repository + +数据访问层:处理隐性记忆和情绪数据的数据库操作 +""" +import logging +from datetime import datetime +from typing import Optional, List +from sqlalchemy.orm import Session +from sqlalchemy import select + +from app.models.implicit_emotions_storage_model import ImplicitEmotionsStorage + +logger = logging.getLogger(__name__) + + +class ImplicitEmotionsStorageRepository: + """隐性记忆和情绪存储仓储类""" + + def __init__(self, db: Session): + self.db = db + + def get_by_end_user_id(self, end_user_id: str) -> Optional[ImplicitEmotionsStorage]: + """根据终端用户ID获取存储记录 + + Args: + end_user_id: 终端用户ID + + Returns: + 存储记录,如果不存在返回None + """ + try: + stmt = select(ImplicitEmotionsStorage).where( + ImplicitEmotionsStorage.end_user_id == end_user_id + ) + result = self.db.execute(stmt).scalar_one_or_none() + return result + except Exception as e: + logger.error(f"获取用户存储记录失败: end_user_id={end_user_id}, error={e}") + return None + + def create(self, end_user_id: str) -> ImplicitEmotionsStorage: + """创建新的存储记录 + + Args: + end_user_id: 终端用户ID + + Returns: + 新创建的存储记录 + """ + try: + storage = ImplicitEmotionsStorage( + end_user_id=end_user_id, + created_at=datetime.utcnow(), + updated_at=datetime.utcnow() + ) + self.db.add(storage) + self.db.commit() + self.db.refresh(storage) + logger.info(f"创建用户存储记录成功: end_user_id={end_user_id}") + return storage + except Exception as e: + self.db.rollback() + logger.error(f"创建用户存储记录失败: end_user_id={end_user_id}, error={e}") + raise + + def update_implicit_profile( + self, + end_user_id: str, + profile_data: dict + ) -> Optional[ImplicitEmotionsStorage]: + """更新隐性记忆画像数据 + + Args: + end_user_id: 终端用户ID + profile_data: 画像数据 + + Returns: + 更新后的存储记录 + """ + try: + storage = self.get_by_end_user_id(end_user_id) + + if storage is None: + # 如果记录不存在,创建新记录 + storage = self.create(end_user_id) + + storage.implicit_profile = profile_data + storage.implicit_generated_at = datetime.utcnow() + storage.updated_at = datetime.utcnow() + + self.db.commit() + self.db.refresh(storage) + logger.info(f"更新隐性记忆画像成功: end_user_id={end_user_id}") + return storage + except Exception as e: + self.db.rollback() + logger.error(f"更新隐性记忆画像失败: end_user_id={end_user_id}, error={e}") + raise + + def update_emotion_suggestions( + self, + end_user_id: str, + suggestions_data: dict + ) -> Optional[ImplicitEmotionsStorage]: + """更新情绪建议数据 + + Args: + end_user_id: 终端用户ID + suggestions_data: 建议数据 + + Returns: + 更新后的存储记录 + """ + try: + storage = self.get_by_end_user_id(end_user_id) + + if storage is None: + # 如果记录不存在,创建新记录 + storage = self.create(end_user_id) + + storage.emotion_suggestions = suggestions_data + storage.emotion_generated_at = datetime.utcnow() + storage.updated_at = datetime.utcnow() + + self.db.commit() + self.db.refresh(storage) + logger.info(f"更新情绪建议成功: end_user_id={end_user_id}") + return storage + except Exception as e: + self.db.rollback() + logger.error(f"更新情绪建议失败: end_user_id={end_user_id}, error={e}") + raise + + def get_all_user_ids(self) -> List[str]: + """获取所有已存储数据的用户ID列表 + + Returns: + 用户ID列表 + """ + try: + stmt = select(ImplicitEmotionsStorage.end_user_id) + result = self.db.execute(stmt).scalars().all() + return list(result) + except Exception as e: + logger.error(f"获取所有用户ID失败: error={e}") + return [] + + def delete_by_end_user_id(self, end_user_id: str) -> bool: + """删除用户的存储记录 + + Args: + end_user_id: 终端用户ID + + Returns: + 是否删除成功 + """ + try: + storage = self.get_by_end_user_id(end_user_id) + if storage: + self.db.delete(storage) + self.db.commit() + logger.info(f"删除用户存储记录成功: end_user_id={end_user_id}") + return True + return False + except Exception as e: + self.db.rollback() + logger.error(f"删除用户存储记录失败: end_user_id={end_user_id}, error={e}") + return False diff --git a/api/app/services/emotion_analytics_service.py b/api/app/services/emotion_analytics_service.py index 89e3cab9..099cbfb7 100644 --- a/api/app/services/emotion_analytics_service.py +++ b/api/app/services/emotion_analytics_service.py @@ -843,32 +843,33 @@ class EmotionAnalyticsService: end_user_id: str, db: Session, ) -> Optional[Dict[str, Any]]: - """从 Redis 缓存获取个性化情绪建议 + """从数据库获取个性化情绪建议 Args: end_user_id: 宿主ID(用户组ID) - db: 数据库会话(保留参数以保持接口兼容性) + db: 数据库会话 Returns: - Dict: 缓存的建议数据,如果不存在或已过期返回 None + Dict: 存储的建议数据,如果不存在返回 None """ try: - from app.cache.memory.emotion_memory import EmotionMemoryCache + from app.repositories.implicit_emotions_storage_repository import ImplicitEmotionsStorageRepository - logger.info(f"尝试从 Redis 缓存获取情绪建议: user={end_user_id}") + logger.info(f"尝试从数据库获取情绪建议: user={end_user_id}") - # 从 Redis 获取缓存 - cached_data = await EmotionMemoryCache.get_emotion_suggestions(end_user_id) + # 从数据库获取存储记录 + repo = ImplicitEmotionsStorageRepository(db) + storage = repo.get_by_end_user_id(end_user_id) - if cached_data is None: - logger.info(f"用户 {end_user_id} 的建议缓存不存在或已过期") + if storage is None or storage.emotion_suggestions is None: + logger.info(f"用户 {end_user_id} 的建议数据不存在") return None - logger.info(f"成功从 Redis 缓存获取建议: user={end_user_id}") - return cached_data + logger.info(f"成功从数据库获取建议: user={end_user_id}") + return storage.emotion_suggestions except Exception as e: - logger.error(f"从 Redis 缓存获取建议失败: {str(e)}", exc_info=True) + logger.error(f"从数据库获取建议失败: {str(e)}", exc_info=True) return None async def save_suggestions_cache( @@ -876,36 +877,27 @@ class EmotionAnalyticsService: end_user_id: str, suggestions_data: Dict[str, Any], db: Session, - expires_hours: int = 24 + expires_hours: int = 24 # 参数保留以保持接口兼容性 ) -> None: - """保存建议到 Redis 缓存 + """保存建议到数据库 Args: end_user_id: 宿主ID(用户组ID) suggestions_data: 建议数据 - db: 数据库会话(保留参数以保持接口兼容性) - expires_hours: 过期时间(小时),默认24小时 + db: 数据库会话 + expires_hours: 保留参数(兼容性) """ try: - from app.cache.memory.emotion_memory import EmotionMemoryCache + from app.repositories.implicit_emotions_storage_repository import ImplicitEmotionsStorageRepository - logger.info(f"保存建议到 Redis 缓存: user={end_user_id}, expires={expires_hours}小时") + logger.info(f"保存建议到数据库: user={end_user_id}") - # 计算过期时间(秒) - expire_seconds = expires_hours * 3600 + # 保存到数据库 + repo = ImplicitEmotionsStorageRepository(db) + repo.update_emotion_suggestions(end_user_id, suggestions_data) - # 保存到 Redis - success = await EmotionMemoryCache.set_emotion_suggestions( - user_id=end_user_id, - suggestions_data=suggestions_data, - expire=expire_seconds - ) - - if success: - logger.info(f"建议缓存保存成功: user={end_user_id}") - else: - logger.warning(f"建议缓存保存失败: user={end_user_id}") + logger.info(f"建议保存成功: user={end_user_id}") except Exception as e: - logger.error(f"保存建议缓存失败: {str(e)}", exc_info=True) - # 不抛出异常,缓存失败不应影响主流程 \ No newline at end of file + logger.error(f"保存建议失败: {str(e)}", exc_info=True) + # 不抛出异常,存储失败不应影响主流程 \ No newline at end of file diff --git a/api/app/services/implicit_memory_service.py b/api/app/services/implicit_memory_service.py index 34ebe880..534f138c 100644 --- a/api/app/services/implicit_memory_service.py +++ b/api/app/services/implicit_memory_service.py @@ -422,32 +422,33 @@ class ImplicitMemoryService: end_user_id: str, db: Session ) -> Optional[dict]: - """从 Redis 缓存获取完整用户画像 + """从数据库获取完整用户画像 Args: end_user_id: 终端用户ID - db: 数据库会话(保留参数以保持接口兼容性) + db: 数据库会话 Returns: - Dict: 缓存的画像数据,如果不存在或已过期返回 None + Dict: 存储的画像数据,如果不存在返回 None """ try: - from app.cache.memory.implicit_memory import ImplicitMemoryCache + from app.repositories.implicit_emotions_storage_repository import ImplicitEmotionsStorageRepository - logger.info(f"尝试从 Redis 缓存获取用户画像: user={end_user_id}") + logger.info(f"尝试从数据库获取用户画像: user={end_user_id}") - # 从 Redis 获取缓存 - cached_data = await ImplicitMemoryCache.get_user_profile(end_user_id) + # 从数据库获取存储记录 + repo = ImplicitEmotionsStorageRepository(db) + storage = repo.get_by_end_user_id(end_user_id) - if cached_data is None: - logger.info(f"用户 {end_user_id} 的画像缓存不存在或已过期") + if storage is None or storage.implicit_profile is None: + logger.info(f"用户 {end_user_id} 的画像数据不存在") return None - logger.info(f"成功从 Redis 缓存获取用户画像: user={end_user_id}") - return cached_data + logger.info(f"成功从数据库获取用户画像: user={end_user_id}") + return storage.implicit_profile except Exception as e: - logger.error(f"从 Redis 缓存获取用户画像失败: {str(e)}", exc_info=True) + logger.error(f"从数据库获取用户画像失败: {str(e)}", exc_info=True) return None async def save_profile_cache( @@ -455,36 +456,27 @@ class ImplicitMemoryService: end_user_id: str, profile_data: dict, db: Session, - expires_hours: int = 168 # 默认7天 + expires_hours: int = 168 # 参数保留以保持接口兼容性 ) -> None: - """保存用户画像到 Redis 缓存 + """保存用户画像到数据库 Args: end_user_id: 终端用户ID profile_data: 画像数据 - db: 数据库会话(保留参数以保持接口兼容性) - expires_hours: 过期时间(小时),默认168小时(7天) + db: 数据库会话 + expires_hours: 保留参数(兼容性) """ try: - from app.cache.memory.implicit_memory import ImplicitMemoryCache + from app.repositories.implicit_emotions_storage_repository import ImplicitEmotionsStorageRepository - logger.info(f"保存用户画像到 Redis 缓存: user={end_user_id}, expires={expires_hours}小时") + logger.info(f"保存用户画像到数据库: user={end_user_id}") - # 计算过期时间(秒) - expire_seconds = expires_hours * 3600 + # 保存到数据库 + repo = ImplicitEmotionsStorageRepository(db) + repo.update_implicit_profile(end_user_id, profile_data) - # 保存到 Redis - success = await ImplicitMemoryCache.set_user_profile( - user_id=end_user_id, - profile_data=profile_data, - expire=expire_seconds - ) - - if success: - logger.info(f"用户画像缓存保存成功: user={end_user_id}") - else: - logger.warning(f"用户画像缓存保存失败: user={end_user_id}") + logger.info(f"用户画像保存成功: user={end_user_id}") except Exception as e: - logger.error(f"保存用户画像缓存失败: {str(e)}", exc_info=True) - # 不抛出异常,缓存失败不应影响主流程 + logger.error(f"保存用户画像失败: {str(e)}", exc_info=True) + # 不抛出异常,存储失败不应影响主流程 diff --git a/api/app/tasks.py b/api/app/tasks.py index d408a0da..5a320c3f 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -1924,4 +1924,213 @@ def run_forgetting_cycle_task(self, config_id: Optional[uuid.UUID] = None) -> Di # "config_id": config_id, # "elapsed_time": elapsed_time, # "task_id": self.request.id -# } \ No newline at end of file +# } + + +# ============================================================================= +# 隐性记忆和情绪数据更新定时任务 +# ============================================================================= + +@celery_app.task( + name="app.tasks.update_implicit_emotions_storage", + bind=True, + ignore_result=True, + max_retries=0, + acks_late=False, + time_limit=7200, # 2小时硬超时 + soft_time_limit=6900, # 1小时55分钟软超时 +) +def update_implicit_emotions_storage(self) -> Dict[str, Any]: + """定时任务:更新所有用户的隐性记忆画像和情绪建议数据 + + 遍历数据库中所有已存在数据的用户,为每个用户重新生成隐性记忆画像和情绪建议。 + 实现错误隔离,单个用户失败不影响其他用户的处理。 + + Returns: + 包含任务执行结果的字典,包括: + - status: 任务状态 (SUCCESS/FAILURE) + - message: 执行消息 + - total_users: 总用户数 + - successful_implicit: 成功更新隐性记忆的用户数 + - successful_emotion: 成功更新情绪建议的用户数 + - failed: 失败的用户数 + - user_results: 每个用户的详细结果 + - elapsed_time: 执行耗时(秒) + - task_id: 任务ID + """ + start_time = time.time() + + async def _run() -> Dict[str, Any]: + from app.core.logging_config import get_logger + from app.repositories.implicit_emotions_storage_repository import ImplicitEmotionsStorageRepository + from app.services.implicit_memory_service import ImplicitMemoryService + from app.services.emotion_analytics_service import EmotionAnalyticsService + + logger = get_logger(__name__) + logger.info("开始执行隐性记忆和情绪数据更新定时任务") + + total_users = 0 + successful_implicit = 0 + successful_emotion = 0 + failed = 0 + user_results = [] + + with get_db_context() as db: + try: + # 获取所有已存储数据的用户ID + repo = ImplicitEmotionsStorageRepository(db) + user_ids = repo.get_all_user_ids() + total_users = len(user_ids) + + logger.info(f"找到 {total_users} 个需要更新的用户") + + # 遍历每个用户并更新数据 + for end_user_id in user_ids: + logger.info(f"开始处理用户: {end_user_id}") + user_start_time = time.time() + + implicit_success = False + emotion_success = False + errors = [] + + try: + # 更新隐性记忆画像 + try: + implicit_service = ImplicitMemoryService(db=db, end_user_id=end_user_id) + profile_data = await implicit_service.generate_complete_profile(user_id=end_user_id) + await implicit_service.save_profile_cache( + end_user_id=end_user_id, + profile_data=profile_data, + db=db + ) + implicit_success = True + logger.info(f"成功更新用户 {end_user_id} 的隐性记忆画像") + except Exception as e: + error_msg = f"隐性记忆更新失败: {str(e)}" + errors.append(error_msg) + logger.error(f"用户 {end_user_id} {error_msg}") + + # 更新情绪建议 + try: + emotion_service = EmotionAnalyticsService(db=db, end_user_id=end_user_id) + suggestions_data = await emotion_service.generate_emotion_suggestions( + end_user_id=end_user_id, + db=db, + language="zh" + ) + await emotion_service.save_suggestions_cache( + end_user_id=end_user_id, + suggestions_data=suggestions_data, + db=db + ) + emotion_success = True + logger.info(f"成功更新用户 {end_user_id} 的情绪建议") + except Exception as e: + error_msg = f"情绪建议更新失败: {str(e)}" + errors.append(error_msg) + logger.error(f"用户 {end_user_id} {error_msg}") + + # 统计结果 + if implicit_success: + successful_implicit += 1 + if emotion_success: + successful_emotion += 1 + if not implicit_success and not emotion_success: + failed += 1 + + user_elapsed = time.time() - user_start_time + + # 记录用户处理结果 + user_result = { + "end_user_id": end_user_id, + "implicit_success": implicit_success, + "emotion_success": emotion_success, + "errors": errors, + "elapsed_time": user_elapsed + } + user_results.append(user_result) + + logger.info( + f"用户 {end_user_id} 处理完成: " + f"隐性记忆={'成功' if implicit_success else '失败'}, " + f"情绪建议={'成功' if emotion_success else '失败'}, " + f"耗时={user_elapsed:.2f}秒" + ) + + except Exception as e: + # 单个用户失败不影响其他用户(错误隔离) + failed += 1 + user_elapsed = time.time() - user_start_time + error_info = { + "end_user_id": end_user_id, + "implicit_success": False, + "emotion_success": False, + "errors": [str(e)], + "elapsed_time": user_elapsed + } + user_results.append(error_info) + logger.error(f"处理用户 {end_user_id} 时出错: {str(e)}") + + # 记录总体统计信息 + logger.info( + f"隐性记忆和情绪数据更新定时任务完成: " + f"总用户数={total_users}, " + f"隐性记忆成功={successful_implicit}, " + f"情绪建议成功={successful_emotion}, " + f"失败={failed}" + ) + + return { + "status": "SUCCESS", + "message": f"成功处理 {total_users} 个用户,隐性记忆 {successful_implicit} 个成功,情绪建议 {successful_emotion} 个成功", + "total_users": total_users, + "successful_implicit": successful_implicit, + "successful_emotion": successful_emotion, + "failed": failed, + "user_results": user_results[:50] # 只保留前50个用户的详细结果 + } + + except Exception as e: + logger.error(f"隐性记忆和情绪数据更新定时任务执行失败: {str(e)}") + return { + "status": "FAILURE", + "error": str(e), + "total_users": total_users, + "successful_implicit": successful_implicit, + "successful_emotion": successful_emotion, + "failed": failed, + "user_results": user_results[:50] + } + + try: + # 使用 nest_asyncio 来避免事件循环冲突 + try: + import nest_asyncio + nest_asyncio.apply() + except ImportError: + pass + + # 尝试获取现有事件循环,如果不存在则创建新的 + try: + loop = asyncio.get_event_loop() + if loop.is_closed(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + result = loop.run_until_complete(_run()) + elapsed_time = time.time() - start_time + result["elapsed_time"] = elapsed_time + result["task_id"] = self.request.id + + return result + except Exception as e: + elapsed_time = time.time() - start_time + return { + "status": "FAILURE", + "error": str(e), + "elapsed_time": elapsed_time, + "task_id": self.request.id + }