diff --git a/api/app/cache/__init__.py b/api/app/cache/__init__.py index a79d4cb2..748ce8ae 100644 --- a/api/app/cache/__init__.py +++ b/api/app/cache/__init__.py @@ -2,10 +2,7 @@ Cache 缓存模块 提供各种缓存功能的统一入口 +注意:隐性记忆和情绪建议已迁移到数据库存储,不再使用Redis缓存 """ -from .memory import EmotionMemoryCache, ImplicitMemoryCache -__all__ = [ - "EmotionMemoryCache", - "ImplicitMemoryCache", -] +__all__ = [] diff --git a/api/app/cache/memory/__init__.py b/api/app/cache/memory/__init__.py index 4ada3153..35f45aad 100644 --- a/api/app/cache/memory/__init__.py +++ b/api/app/cache/memory/__init__.py @@ -2,11 +2,7 @@ Memory 缓存模块 提供记忆系统相关的缓存功能 +注意:隐性记忆和情绪建议已迁移到数据库存储,不再使用Redis缓存 """ -from .emotion_memory import EmotionMemoryCache -from .implicit_memory import ImplicitMemoryCache -__all__ = [ - "EmotionMemoryCache", - "ImplicitMemoryCache", -] +__all__ = [] diff --git a/api/app/controllers/emotion_controller.py b/api/app/controllers/emotion_controller.py index 02ce7862..0a8b5fc8 100644 --- a/api/app/controllers/emotion_controller.py +++ b/api/app/controllers/emotion_controller.py @@ -262,7 +262,6 @@ async def check_emotion_data_exists( @router.post("/suggestions", response_model=ApiResponse) async def get_emotion_suggestions( request: EmotionSuggestionsRequest, - language_type: str = Header(default=None, alias="X-Language-Type"), db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): @@ -277,9 +276,6 @@ async def get_emotion_suggestions( 存储的个性化情绪建议响应 """ try: - # 使用集中化的语言校验 - language = get_language_from_header(language_type) - api_logger.info( f"用户 {current_user.username} 请求获取个性化情绪建议", extra={ @@ -295,15 +291,13 @@ async def get_emotion_suggestions( ) if data is None: - # 数据不存在,返回提示信息 api_logger.info( f"用户 {request.end_user_id} 的建议数据不存在", extra={"end_user_id": request.end_user_id} ) - return fail( - BizCode.NOT_FOUND, - "情绪建议数据不存在,请点击右上角刷新进行初始化", - "" + return success( + data={"exists": False}, + msg="情绪建议数据不存在,请点击右上角刷新进行初始化" ) api_logger.info( diff --git a/api/app/controllers/implicit_memory_controller.py b/api/app/controllers/implicit_memory_controller.py index 91e634c9..76a87c5f 100644 --- a/api/app/controllers/implicit_memory_controller.py +++ b/api/app/controllers/implicit_memory_controller.py @@ -152,10 +152,9 @@ async def check_user_data_exists( if cached_profile is None: api_logger.info(f"用户 {end_user_id} 的画像数据不存在") - return fail( - BizCode.NOT_FOUND, - "画像数据不存在,请点击右上角刷新进行初始化", - {"exists": False} + return success( + data={"exists": False}, + msg="画像数据不存在,请点击右上角刷新进行初始化" ) api_logger.info(f"用户 {end_user_id} 的画像数据存在") @@ -203,11 +202,7 @@ async def get_preference_tags( if cached_profile is None: api_logger.info(f"用户 {end_user_id} 的画像数据不存在") - return fail( - BizCode.NOT_FOUND, - "画像数据不存在,请点击右上角刷新进行初始化", - "" - ) + return fail(BizCode.NOT_FOUND, "", "") # Extract preferences from cache preferences = cached_profile.get("preferences", []) @@ -274,11 +269,7 @@ async def get_dimension_portrait( if cached_profile is None: api_logger.info(f"用户 {end_user_id} 的画像数据不存在") - return fail( - BizCode.NOT_FOUND, - "画像数据不存在,请点击右上角刷新进行初始化", - "" - ) + return fail(BizCode.NOT_FOUND, "", "") # Extract portrait from cache portrait = cached_profile.get("portrait", {}) @@ -322,11 +313,7 @@ async def get_interest_area_distribution( if cached_profile is None: api_logger.info(f"用户 {end_user_id} 的画像数据不存在") - return fail( - BizCode.NOT_FOUND, - "画像数据不存在,请点击右上角刷新进行初始化", - "" - ) + return fail(BizCode.NOT_FOUND, "", "") # Extract interest areas from cache interest_areas = cached_profile.get("interest_areas", {}) @@ -374,11 +361,7 @@ async def get_behavior_habits( if cached_profile is None: api_logger.info(f"用户 {end_user_id} 的画像数据不存在") - return fail( - BizCode.NOT_FOUND, - "画像数据不存在,请点击右上角刷新进行初始化", - "" - ) + return fail(BizCode.NOT_FOUND, "", "") # Extract habits from cache habits = cached_profile.get("habits", []) diff --git a/api/app/models/implicit_emotions_storage_model.py b/api/app/models/implicit_emotions_storage_model.py index 57c0fd61..cf654950 100644 --- a/api/app/models/implicit_emotions_storage_model.py +++ b/api/app/models/implicit_emotions_storage_model.py @@ -19,8 +19,8 @@ class ImplicitEmotionsStorage(Base): # 主键 id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, comment="主键ID") - # 用户标识 - end_user_id = Column(String(255), nullable=False, unique=True, index=True, comment="终端用户ID") + # 用户标识(unique=True会自动创建唯一索引) + end_user_id = Column(String(255), nullable=False, unique=True, comment="终端用户ID") # 隐性记忆画像数据(JSON格式) implicit_profile = Column(JSONB, nullable=True, comment="隐性记忆用户画像数据") @@ -36,9 +36,8 @@ class ImplicitEmotionsStorage(Base): implicit_generated_at = Column(DateTime, nullable=True, comment="隐性记忆画像生成时间") emotion_generated_at = Column(DateTime, nullable=True, comment="情绪建议生成时间") - # 索引 + # 索引(只为updated_at创建索引,end_user_id的unique约束已自动创建索引) __table_args__ = ( - Index('idx_end_user_id', 'end_user_id'), Index('idx_updated_at', 'updated_at'), ) diff --git a/api/app/repositories/implicit_emotions_storage_repository.py b/api/app/repositories/implicit_emotions_storage_repository.py index fd4b10ce..176012b7 100644 --- a/api/app/repositories/implicit_emotions_storage_repository.py +++ b/api/app/repositories/implicit_emotions_storage_repository.py @@ -2,10 +2,11 @@ Implicit Emotions Storage Repository 数据访问层:处理隐性记忆和情绪数据的数据库操作 +事务由调用方控制,仓储层只使用 flush/refresh """ import logging from datetime import datetime -from typing import Optional, List +from typing import Optional, Generator from sqlalchemy.orm import Session from sqlalchemy import select @@ -16,154 +17,105 @@ logger = logging.getLogger(__name__) class ImplicitEmotionsStorageRepository: """隐性记忆和情绪存储仓储类""" - + def __init__(self, db: Session): self.db = db - + def get_by_end_user_id(self, end_user_id: str) -> Optional[ImplicitEmotionsStorage]: - """根据终端用户ID获取存储记录 - - Args: - end_user_id: 终端用户ID - - Returns: - 存储记录,如果不存在返回None - """ + """根据终端用户ID获取存储记录""" try: stmt = select(ImplicitEmotionsStorage).where( ImplicitEmotionsStorage.end_user_id == end_user_id ) - result = self.db.execute(stmt).scalar_one_or_none() - return result + return self.db.execute(stmt).scalar_one_or_none() except Exception as e: logger.error(f"获取用户存储记录失败: end_user_id={end_user_id}, error={e}") return None - + def create(self, end_user_id: str) -> ImplicitEmotionsStorage: - """创建新的存储记录 - - Args: - end_user_id: 终端用户ID - - Returns: - 新创建的存储记录 - """ - try: - storage = ImplicitEmotionsStorage( - end_user_id=end_user_id, - created_at=datetime.utcnow(), - updated_at=datetime.utcnow() - ) - self.db.add(storage) - self.db.commit() - self.db.refresh(storage) - logger.info(f"创建用户存储记录成功: end_user_id={end_user_id}") - return storage - except Exception as e: - self.db.rollback() - logger.error(f"创建用户存储记录失败: end_user_id={end_user_id}, error={e}") - raise - + """创建新的存储记录(事务由调用方提交)""" + storage = ImplicitEmotionsStorage( + end_user_id=end_user_id, + created_at=datetime.utcnow(), + updated_at=datetime.utcnow() + ) + self.db.add(storage) + self.db.flush() + self.db.refresh(storage) + logger.info(f"创建用户存储记录成功: end_user_id={end_user_id}") + return storage + def update_implicit_profile( self, end_user_id: str, profile_data: dict - ) -> Optional[ImplicitEmotionsStorage]: - """更新隐性记忆画像数据 - - Args: - end_user_id: 终端用户ID - profile_data: 画像数据 - - Returns: - 更新后的存储记录 - """ - try: - storage = self.get_by_end_user_id(end_user_id) - - if storage is None: - # 如果记录不存在,创建新记录 - storage = self.create(end_user_id) - - storage.implicit_profile = profile_data - storage.implicit_generated_at = datetime.utcnow() - storage.updated_at = datetime.utcnow() - - self.db.commit() - self.db.refresh(storage) - logger.info(f"更新隐性记忆画像成功: end_user_id={end_user_id}") - return storage - except Exception as e: - self.db.rollback() - logger.error(f"更新隐性记忆画像失败: end_user_id={end_user_id}, error={e}") - raise - + ) -> ImplicitEmotionsStorage: + """更新隐性记忆画像数据(事务由调用方提交)""" + storage = self.get_by_end_user_id(end_user_id) + if storage is None: + storage = self.create(end_user_id) + + storage.implicit_profile = profile_data + storage.implicit_generated_at = datetime.utcnow() + storage.updated_at = datetime.utcnow() + + self.db.flush() + self.db.refresh(storage) + logger.info(f"更新隐性记忆画像成功: end_user_id={end_user_id}") + return storage + def update_emotion_suggestions( self, end_user_id: str, suggestions_data: dict - ) -> Optional[ImplicitEmotionsStorage]: - """更新情绪建议数据 - + ) -> ImplicitEmotionsStorage: + """更新情绪建议数据(事务由调用方提交)""" + storage = self.get_by_end_user_id(end_user_id) + if storage is None: + storage = self.create(end_user_id) + + storage.emotion_suggestions = suggestions_data + storage.emotion_generated_at = datetime.utcnow() + storage.updated_at = datetime.utcnow() + + self.db.flush() + self.db.refresh(storage) + logger.info(f"更新情绪建议成功: end_user_id={end_user_id}") + return storage + + def get_all_user_ids(self, batch_size: int = 100) -> Generator[str, None, None]: + """分批次获取所有已存储数据的用户ID(避免大数据量内存溢出) + Args: - end_user_id: 终端用户ID - suggestions_data: 建议数据 - - Returns: - 更新后的存储记录 + batch_size: 每批次加载的数量,默认100 + + Yields: + 用户ID字符串 """ - try: - storage = self.get_by_end_user_id(end_user_id) - - if storage is None: - # 如果记录不存在,创建新记录 - storage = self.create(end_user_id) - - storage.emotion_suggestions = suggestions_data - storage.emotion_generated_at = datetime.utcnow() - storage.updated_at = datetime.utcnow() - - self.db.commit() - self.db.refresh(storage) - logger.info(f"更新情绪建议成功: end_user_id={end_user_id}") - return storage - except Exception as e: - self.db.rollback() - logger.error(f"更新情绪建议失败: end_user_id={end_user_id}, error={e}") - raise - - def get_all_user_ids(self) -> List[str]: - """获取所有已存储数据的用户ID列表 - - Returns: - 用户ID列表 - """ - try: - stmt = select(ImplicitEmotionsStorage.end_user_id) - result = self.db.execute(stmt).scalars().all() - return list(result) - except Exception as e: - logger.error(f"获取所有用户ID失败: error={e}") - return [] - + offset = 0 + while True: + try: + stmt = ( + select(ImplicitEmotionsStorage.end_user_id) + .order_by(ImplicitEmotionsStorage.end_user_id) + .limit(batch_size) + .offset(offset) + ) + batch = self.db.execute(stmt).scalars().all() + if not batch: + break + yield from batch + offset += batch_size + except Exception as e: + logger.error(f"分批获取用户ID失败: offset={offset}, error={e}") + break + def delete_by_end_user_id(self, end_user_id: str) -> bool: - """删除用户的存储记录 - - Args: - end_user_id: 终端用户ID - - Returns: - 是否删除成功 - """ - try: - storage = self.get_by_end_user_id(end_user_id) - if storage: - self.db.delete(storage) - self.db.commit() - logger.info(f"删除用户存储记录成功: end_user_id={end_user_id}") - return True - return False - except Exception as e: - self.db.rollback() - logger.error(f"删除用户存储记录失败: end_user_id={end_user_id}, error={e}") - return False + """删除用户的存储记录(事务由调用方提交)""" + storage = self.get_by_end_user_id(end_user_id) + if storage: + self.db.delete(storage) + self.db.flush() + logger.info(f"删除用户存储记录成功: end_user_id={end_user_id}") + return True + return False diff --git a/api/app/services/emotion_analytics_service.py b/api/app/services/emotion_analytics_service.py index 099cbfb7..c226348e 100644 --- a/api/app/services/emotion_analytics_service.py +++ b/api/app/services/emotion_analytics_service.py @@ -892,12 +892,12 @@ class EmotionAnalyticsService: logger.info(f"保存建议到数据库: user={end_user_id}") - # 保存到数据库 repo = ImplicitEmotionsStorageRepository(db) repo.update_emotion_suggestions(end_user_id, suggestions_data) + db.commit() logger.info(f"建议保存成功: user={end_user_id}") except Exception as e: - logger.error(f"保存建议失败: {str(e)}", exc_info=True) - # 不抛出异常,存储失败不应影响主流程 \ No newline at end of file + db.rollback() + logger.error(f"保存建议失败: {str(e)}", exc_info=True) \ No newline at end of file diff --git a/api/app/services/implicit_memory_service.py b/api/app/services/implicit_memory_service.py index 534f138c..4bd11deb 100644 --- a/api/app/services/implicit_memory_service.py +++ b/api/app/services/implicit_memory_service.py @@ -471,12 +471,12 @@ class ImplicitMemoryService: logger.info(f"保存用户画像到数据库: user={end_user_id}") - # 保存到数据库 repo = ImplicitEmotionsStorageRepository(db) repo.update_implicit_profile(end_user_id, profile_data) + db.commit() logger.info(f"用户画像保存成功: user={end_user_id}") except Exception as e: + db.rollback() logger.error(f"保存用户画像失败: {str(e)}", exc_info=True) - # 不抛出异常,存储失败不应影响主流程 diff --git a/api/app/tasks.py b/api/app/tasks.py index 5a320c3f..1675f25d 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -1963,6 +1963,8 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: async def _run() -> Dict[str, Any]: from app.core.logging_config import get_logger from app.repositories.implicit_emotions_storage_repository import ImplicitEmotionsStorageRepository + from app.models.implicit_emotions_storage_model import ImplicitEmotionsStorage + from sqlalchemy import select, func from app.services.implicit_memory_service import ImplicitMemoryService from app.services.emotion_analytics_service import EmotionAnalyticsService @@ -1977,15 +1979,18 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: with get_db_context() as db: try: - # 获取所有已存储数据的用户ID + # 获取所有已存储数据的用户ID(分批次处理) repo = ImplicitEmotionsStorageRepository(db) - user_ids = repo.get_all_user_ids() - total_users = len(user_ids) + # 先统计总数用于日志 + from sqlalchemy import func + total_users = db.execute( + select(func.count()).select_from(ImplicitEmotionsStorage) + ).scalar() or 0 logger.info(f"找到 {total_users} 个需要更新的用户") - # 遍历每个用户并更新数据 - for end_user_id in user_ids: + # 遍历每个用户并更新数据(分批次,避免一次性加载所有ID) + for end_user_id in repo.get_all_user_ids(batch_size=100): logger.info(f"开始处理用户: {end_user_id}") user_start_time = time.time()