From 935f3d54b31976efb2a54daec495f7c0c9d1e18b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B9=90=E5=8A=9B=E9=BD=90?= <162269739+lanceyq@users.noreply.github.com> Date: Fri, 16 Jan 2026 12:33:37 +0800 Subject: [PATCH] Feature/generate cache (#135) * [feature]Generate emotions, implicit cache * [feature]Generate emotions, implicit cache * [changes]Improve the code based on AI review * [changes]Improve the code based on AI review * [changes]Improve the code * [feature]Generate emotions, implicit cache * [changes]Improve the code based on AI review * [changes]Improve the code --- api/app/controllers/emotion_controller.py | 90 ++++++- .../controllers/implicit_memory_controller.py | 239 +++++++++++++----- api/app/models/__init__.py | 6 +- .../models/emotion_suggestions_cache_model.py | 24 ++ api/app/models/implicit_memory_cache_model.py | 27 ++ .../repositories/data_config_repository.py | 6 +- .../emotion_suggestions_cache_repository.py | 163 ++++++++++++ .../implicit_memory_cache_repository.py | 175 +++++++++++++ api/app/repositories/neo4j/cypher_queries.py | 2 +- api/app/schemas/emotion_schema.py | 6 + api/app/schemas/implicit_memory_schema.py | 22 ++ api/app/services/emotion_analytics_service.py | 82 ++++++ api/app/services/implicit_memory_service.py | 128 +++++++++- 13 files changed, 896 insertions(+), 74 deletions(-) create mode 100644 api/app/models/emotion_suggestions_cache_model.py create mode 100644 api/app/models/implicit_memory_cache_model.py create mode 100644 api/app/repositories/emotion_suggestions_cache_repository.py create mode 100644 api/app/repositories/implicit_memory_cache_repository.py diff --git a/api/app/controllers/emotion_controller.py b/api/app/controllers/emotion_controller.py index 7f0cb91b..c92c04d5 100644 --- a/api/app/controllers/emotion_controller.py +++ b/api/app/controllers/emotion_controller.py @@ -18,6 +18,7 @@ from app.models.user_model import User from app.schemas.emotion_schema import ( EmotionHealthRequest, EmotionSuggestionsRequest, + EmotionGenerateSuggestionsRequest, EmotionTagsRequest, EmotionWordcloudRequest, ) @@ -198,7 +199,7 @@ async def get_emotion_suggestions( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): - """获取个性化情绪建议 + """获取个性化情绪建议(从缓存读取) Args: request: 包含 group_id 和可选的 config_id @@ -206,7 +207,72 @@ async def get_emotion_suggestions( current_user: 当前用户 Returns: - 个性化情绪建议响应 + 缓存的个性化情绪建议响应 + """ + try: + api_logger.info( + f"用户 {current_user.username} 请求获取个性化情绪建议(缓存)", + extra={ + "group_id": request.group_id, + "config_id": request.config_id + } + ) + + # 从缓存获取建议 + data = await emotion_service.get_cached_suggestions( + end_user_id=request.group_id, + db=db + ) + + if data is None: + # 缓存不存在或已过期 + api_logger.info( + f"用户 {request.group_id} 的建议缓存不存在或已过期", + extra={"group_id": request.group_id} + ) + return fail( + BizCode.RESOURCE_NOT_FOUND, + "建议缓存不存在或已过期,请调用 /generate_suggestions 接口生成新建议", + None + ) + + api_logger.info( + "个性化建议获取成功(缓存)", + extra={ + "group_id": request.group_id, + "suggestions_count": len(data.get("suggestions", [])) + } + ) + + return success(data=data, msg="个性化建议获取成功(缓存)") + + except Exception as e: + api_logger.error( + f"获取个性化建议失败: {str(e)}", + extra={"group_id": request.group_id}, + exc_info=True + ) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"获取个性化建议失败: {str(e)}" + ) + + +@router.post("/generate_suggestions", response_model=ApiResponse) +async def generate_emotion_suggestions( + request: EmotionGenerateSuggestionsRequest, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """生成个性化情绪建议(调用LLM并缓存) + + Args: + request: 包含 group_id、可选的 config_id 和 force_refresh + db: 数据库会话 + current_user: 当前用户 + + Returns: + 新生成的个性化情绪建议响应 """ try: # 验证 config_id(如果提供) @@ -234,36 +300,44 @@ async def get_emotion_suggestions( return fail(BizCode.INVALID_PARAMETER, "配置ID验证失败", str(e)) api_logger.info( - f"用户 {current_user.username} 请求获取个性化情绪建议", + f"用户 {current_user.username} 请求生成个性化情绪建议", extra={ "group_id": request.group_id, "config_id": config_id } ) - # 调用服务层 + # 调用服务层生成建议 data = await emotion_service.generate_emotion_suggestions( end_user_id=request.group_id, db=db ) + # 保存到缓存 + await emotion_service.save_suggestions_cache( + end_user_id=request.group_id, + suggestions_data=data, + db=db, + expires_hours=24 + ) + api_logger.info( - "个性化建议获取成功", + "个性化建议生成成功", extra={ "group_id": request.group_id, "suggestions_count": len(data.get("suggestions", [])) } ) - return success(data=data, msg="个性化建议获取成功") + return success(data=data, msg="个性化建议生成成功") except Exception as e: api_logger.error( - f"获取个性化建议失败: {str(e)}", + f"生成个性化建议失败: {str(e)}", extra={"group_id": request.group_id}, exc_info=True ) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"获取个性化建议失败: {str(e)}" + detail=f"生成个性化建议失败: {str(e)}" ) diff --git a/api/app/controllers/implicit_memory_controller.py b/api/app/controllers/implicit_memory_controller.py index 6ef39929..eb7037ff 100644 --- a/api/app/controllers/implicit_memory_controller.py +++ b/api/app/controllers/implicit_memory_controller.py @@ -11,6 +11,7 @@ from app.dependencies import ( ) from app.models.user_model import User from app.schemas.response_schema import ApiResponse +from app.schemas.implicit_memory_schema import GenerateProfileRequest from app.services.implicit_memory_service import ImplicitMemoryService from fastapi import APIRouter, Depends, Query from sqlalchemy.orm import Session @@ -133,7 +134,7 @@ async def get_preference_tags( current_user: User = Depends(get_current_user) ) -> ApiResponse: """ - Get user preference tags with filtering options. + Get user preference tags from cache. Args: user_id: Target user ID @@ -143,35 +144,56 @@ async def get_preference_tags( end_date: Optional end date filter Returns: - List of preference tags matching the filters + List of preference tags from cache """ - api_logger.info(f"Preference tags requested for user: {user_id}") + api_logger.info(f"Preference tags requested for user: {user_id} (from cache)") try: # Validate inputs validate_user_id(user_id) - validate_confidence_threshold(confidence_threshold) - validate_date_range(start_date, end_date) # Create service with user-specific config service = ImplicitMemoryService(db=db, end_user_id=user_id) - # Build date range - date_range = None - if start_date and end_date: - from app.schemas.implicit_memory_schema import DateRange - date_range = DateRange(start_date=start_date, end_date=end_date) + # Get cached profile + cached_profile = await service.get_cached_profile(end_user_id=user_id, db=db) - # Get preference tags - tags = await service.get_preference_tags( - user_id=user_id, - confidence_threshold=confidence_threshold, - tag_category=tag_category, - date_range=date_range - ) + if cached_profile is None: + api_logger.info(f"用户 {user_id} 的画像缓存不存在或已过期") + return fail( + BizCode.RESOURCE_NOT_FOUND, + "画像缓存不存在或已过期,请调用 /generate_profile 接口生成新画像", + None + ) - api_logger.info(f"Retrieved {len(tags)} preference tags for user: {user_id}") - return success(data=[tag.model_dump(mode='json') for tag in tags], msg="偏好标签获取成功") + # Extract preferences from cache + preferences = cached_profile.get("preferences", []) + + # Apply filters (client-side filtering on cached data) + filtered_preferences = [] + for pref in preferences: + # Filter by confidence threshold + if confidence_threshold is not None and pref.get("confidence_score", 0) < confidence_threshold: + continue + + # Filter by category if specified + if tag_category and pref.get("category") != tag_category: + continue + + # Filter by date range if specified + if start_date or end_date: + created_at_ts = pref.get("created_at") + if created_at_ts: + created_at = datetime.fromtimestamp(created_at_ts / 1000) + if start_date and created_at < start_date: + continue + if end_date and created_at > end_date: + continue + + filtered_preferences.append(pref) + + api_logger.info(f"Retrieved {len(filtered_preferences)} preference tags for user: {user_id} (from cache)") + return success(data=filtered_preferences, msg="偏好标签获取成功(缓存)") except Exception as e: return handle_implicit_memory_error(e, "偏好标签获取", user_id) @@ -186,16 +208,16 @@ async def get_dimension_portrait( current_user: User = Depends(get_current_user) ) -> ApiResponse: """ - Get user's four-dimension personality portrait. + Get user's four-dimension personality portrait from cache. Args: user_id: Target user ID - include_history: Whether to include historical trend data + include_history: Whether to include historical trend data (ignored for cached data) Returns: - Four-dimension personality portrait with scores and evidence + Four-dimension personality portrait from cache """ - api_logger.info(f"Dimension portrait requested for user: {user_id}") + api_logger.info(f"Dimension portrait requested for user: {user_id} (from cache)") try: # Validate inputs @@ -204,13 +226,22 @@ async def get_dimension_portrait( # Create service with user-specific config service = ImplicitMemoryService(db=db, end_user_id=user_id) - portrait = await service.get_dimension_portrait( - user_id=user_id, - include_history=include_history - ) + # Get cached profile + cached_profile = await service.get_cached_profile(end_user_id=user_id, db=db) - api_logger.info(f"Dimension portrait retrieved for user: {user_id}") - return success(data=portrait.model_dump(mode='json'), msg="四维画像获取成功") + if cached_profile is None: + api_logger.info(f"用户 {user_id} 的画像缓存不存在或已过期") + return fail( + BizCode.RESOURCE_NOT_FOUND, + "画像缓存不存在或已过期,请调用 /generate_profile 接口生成新画像", + None + ) + + # Extract portrait from cache + portrait = cached_profile.get("portrait", {}) + + api_logger.info(f"Dimension portrait retrieved for user: {user_id} (from cache)") + return success(data=portrait, msg="四维画像获取成功(缓存)") except Exception as e: return handle_implicit_memory_error(e, "四维画像获取", user_id) @@ -225,16 +256,16 @@ async def get_interest_area_distribution( current_user: User = Depends(get_current_user) ) -> ApiResponse: """ - Get user's interest area distribution across four areas. + Get user's interest area distribution from cache. Args: user_id: Target user ID - include_trends: Whether to include trend analysis data + include_trends: Whether to include trend analysis data (ignored for cached data) Returns: - Interest area distribution with percentages and evidence + Interest area distribution from cache """ - api_logger.info(f"Interest area distribution requested for user: {user_id}") + api_logger.info(f"Interest area distribution requested for user: {user_id} (from cache)") try: # Validate inputs @@ -243,13 +274,22 @@ async def get_interest_area_distribution( # Create service with user-specific config service = ImplicitMemoryService(db=db, end_user_id=user_id) - distribution = await service.get_interest_area_distribution( - user_id=user_id, - include_trends=include_trends - ) + # Get cached profile + cached_profile = await service.get_cached_profile(end_user_id=user_id, db=db) - api_logger.info(f"Interest area distribution retrieved for user: {user_id}") - return success(data=distribution.model_dump(mode='json'), msg="兴趣领域分布获取成功") + if cached_profile is None: + api_logger.info(f"用户 {user_id} 的画像缓存不存在或已过期") + return fail( + BizCode.RESOURCE_NOT_FOUND, + "画像缓存不存在或已过期,请调用 /generate_profile 接口生成新画像", + None + ) + + # Extract interest areas from cache + interest_areas = cached_profile.get("interest_areas", {}) + + api_logger.info(f"Interest area distribution retrieved for user: {user_id} (from cache)") + return success(data=interest_areas, msg="兴趣领域分布获取成功(缓存)") except Exception as e: return handle_implicit_memory_error(e, "兴趣领域分布获取", user_id) @@ -266,7 +306,7 @@ async def get_behavior_habits( current_user: User = Depends(get_current_user) ) -> ApiResponse: """ - Get user's behavioral habits with filtering options. + Get user's behavioral habits from cache. Args: user_id: Target user ID @@ -275,38 +315,117 @@ async def get_behavior_habits( time_period: Filter by time period (current, past) Returns: - List of behavioral habits matching the filters + List of behavioral habits from cache """ - api_logger.info(f"Behavior habits requested for user: {user_id}") + api_logger.info(f"Behavior habits requested for user: {user_id} (from cache)") try: # Validate inputs validate_user_id(user_id) - # Convert string confidence level to numerical - numerical_confidence = None - if confidence_level: - confidence_mapping = { - "high": 85, - "medium": 50, - "low": 20 - } - numerical_confidence = confidence_mapping.get(confidence_level.lower()) - # Create service with user-specific config service = ImplicitMemoryService(db=db, end_user_id=user_id) - habits = await service.get_behavior_habits( - user_id=user_id, - confidence_level=numerical_confidence, - frequency_pattern=frequency_pattern, - time_period=time_period - ) + # Get cached profile + cached_profile = await service.get_cached_profile(end_user_id=user_id, db=db) - api_logger.info(f"Retrieved {len(habits)} behavior habits for user: {user_id}") - return success(data=[habit.model_dump(mode='json') for habit in habits], msg="行为习惯获取成功") + if cached_profile is None: + api_logger.info(f"用户 {user_id} 的画像缓存不存在或已过期") + return fail( + BizCode.RESOURCE_NOT_FOUND, + "画像缓存不存在或已过期,请调用 /generate_profile 接口生成新画像", + None + ) + + # Extract habits from cache + habits = cached_profile.get("habits", []) + + # Apply filters (client-side filtering on cached data) + filtered_habits = [] + for habit in habits: + # Filter by confidence level + if confidence_level: + confidence_mapping = { + "high": 85, + "medium": 50, + "low": 20 + } + numerical_confidence = confidence_mapping.get(confidence_level.lower()) + if habit.get("confidence_level", 0) < numerical_confidence: + continue + + # Filter by frequency pattern + if frequency_pattern and habit.get("frequency_pattern") != frequency_pattern: + continue + + # Filter by time period + if time_period: + is_current = habit.get("is_current", True) + if time_period.lower() == "current" and not is_current: + continue + elif time_period.lower() == "past" and is_current: + continue + + filtered_habits.append(habit) + + api_logger.info(f"Retrieved {len(filtered_habits)} behavior habits for user: {user_id} (from cache)") + return success(data=filtered_habits, msg="行为习惯获取成功(缓存)") except Exception as e: return handle_implicit_memory_error(e, "行为习惯获取", user_id) + + + +@router.post("/generate_profile", response_model=ApiResponse) +@cur_workspace_access_guard() +async def generate_implicit_memory_profile( + request: GenerateProfileRequest, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user) +) -> ApiResponse: + """ + Generate complete user profile (all 4 modules) and cache it. + + Args: + request: Generate profile request with end_user_id + db: Database session + current_user: Current authenticated user + + Returns: + Complete user profile with all modules + """ + end_user_id = request.end_user_id + api_logger.info(f"Generate profile requested for user: {end_user_id}") + + try: + # Validate inputs + validate_user_id(end_user_id) + + # Create service with user-specific config + service = ImplicitMemoryService(db=db, end_user_id=end_user_id) + + # Generate complete profile (calls LLM for all 4 modules) + api_logger.info(f"开始生成完整用户画像: user={end_user_id}") + profile_data = await service.generate_complete_profile(user_id=end_user_id) + + # Save to cache + await service.save_profile_cache( + end_user_id=end_user_id, + profile_data=profile_data, + db=db, + expires_hours=168 # 7 days + ) + + api_logger.info(f"用户画像生成并缓存成功: user={end_user_id}") + + # Add metadata + profile_data["end_user_id"] = end_user_id + profile_data["cached"] = False + + return success(data=profile_data, msg="用户画像生成成功") + + except Exception as e: + api_logger.error(f"生成用户画像失败: user={end_user_id}, error={str(e)}", exc_info=True) + return handle_implicit_memory_error(e, "用户画像生成", end_user_id) diff --git a/api/app/models/__init__.py b/api/app/models/__init__.py index 189876a5..f45991cd 100644 --- a/api/app/models/__init__.py +++ b/api/app/models/__init__.py @@ -27,6 +27,8 @@ from .tool_model import ( ToolExecution, ToolType, ToolStatus, AuthType, ExecutionStatus ) from .memory_perceptual_model import MemoryPerceptualModel +from .emotion_suggestions_cache_model import EmotionSuggestionsCache +from .implicit_memory_cache_model import ImplicitMemoryCache __all__ = [ "Tenants", @@ -76,5 +78,7 @@ __all__ = [ "ToolStatus", "AuthType", "ExecutionStatus", - "MemoryPerceptualModel" + "MemoryPerceptualModel", + "EmotionSuggestionsCache", + "ImplicitMemoryCache" ] diff --git a/api/app/models/emotion_suggestions_cache_model.py b/api/app/models/emotion_suggestions_cache_model.py new file mode 100644 index 00000000..9b32f424 --- /dev/null +++ b/api/app/models/emotion_suggestions_cache_model.py @@ -0,0 +1,24 @@ +"""情绪建议缓存模型""" + +import uuid +import datetime +from sqlalchemy import Column, String, Text, Integer, DateTime, JSON +from sqlalchemy.dialects.postgresql import UUID +from app.db import Base + + +class EmotionSuggestionsCache(Base): + """情绪建议缓存表 + + 用于缓存个性化情绪建议,减少 LLM 调用成本,提升响应速度。 + """ + __tablename__ = "emotion_suggestions_cache" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True) + end_user_id = Column(String(255), nullable=False, unique=True, index=True, comment="终端用户ID(组ID)") + health_summary = Column(Text, nullable=False, comment="健康状态摘要") + suggestions = Column(JSON, nullable=False, comment="建议列表(JSON格式)") + generated_at = Column(DateTime, nullable=False, default=datetime.datetime.now, comment="生成时间") + expires_at = Column(DateTime, nullable=True, comment="过期时间") + created_at = Column(DateTime, default=datetime.datetime.now) + updated_at = Column(DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now) diff --git a/api/app/models/implicit_memory_cache_model.py b/api/app/models/implicit_memory_cache_model.py new file mode 100644 index 00000000..32defbab --- /dev/null +++ b/api/app/models/implicit_memory_cache_model.py @@ -0,0 +1,27 @@ +"""隐性记忆缓存模型""" + +import uuid +import datetime +from sqlalchemy import Column, String, Integer, DateTime, JSON +from sqlalchemy.dialects.postgresql import UUID +from app.db import Base + + +class ImplicitMemoryCache(Base): + """隐性记忆缓存表 + + 用于缓存用户的完整隐性记忆画像,包括偏好标签、四维画像、兴趣领域和行为习惯。 + 减少 LLM 调用成本,提升响应速度。 + """ + __tablename__ = "implicit_memory_cache" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True) + end_user_id = Column(String(255), nullable=False, unique=True, index=True, comment="终端用户ID") + preferences = Column(JSON, nullable=False, comment="偏好标签列表(JSON格式)") + portrait = Column(JSON, nullable=False, comment="四维画像对象(JSON格式)") + interest_areas = Column(JSON, nullable=False, comment="兴趣领域分布对象(JSON格式)") + habits = Column(JSON, nullable=False, comment="行为习惯列表(JSON格式)") + generated_at = Column(DateTime, nullable=False, default=datetime.datetime.now, comment="生成时间") + expires_at = Column(DateTime, nullable=True, comment="过期时间") + created_at = Column(DateTime, default=datetime.datetime.now) + updated_at = Column(DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now) diff --git a/api/app/repositories/data_config_repository.py b/api/app/repositories/data_config_repository.py index 7843acc2..135c0063 100644 --- a/api/app/repositories/data_config_repository.py +++ b/api/app/repositories/data_config_repository.py @@ -81,7 +81,7 @@ class DataConfigRepository: n.description AS description, n.entity_type AS entity_type, n.name AS name, - n.fact_summary AS fact_summary, + COALESCE(n.fact_summary, '') AS fact_summary, n.group_id AS group_id, n.apply_id AS apply_id, n.user_id AS user_id, @@ -115,7 +115,7 @@ class DataConfigRepository: description: n.description, entity_type: n.entity_type, name: n.name, - fact_summary: n.fact_summary, + fact_summary: COALESCE(n.fact_summary, ''), id: n.id } AS sourceNode, { @@ -132,7 +132,7 @@ class DataConfigRepository: description: m.description, entity_type: m.entity_type, name: m.name, - fact_summary: m.fact_summary, + fact_summary: COALESCE(m.fact_summary, ''), id: m.id } AS targetNode """ diff --git a/api/app/repositories/emotion_suggestions_cache_repository.py b/api/app/repositories/emotion_suggestions_cache_repository.py new file mode 100644 index 00000000..1c0430d5 --- /dev/null +++ b/api/app/repositories/emotion_suggestions_cache_repository.py @@ -0,0 +1,163 @@ +"""情绪建议缓存仓储层""" + +from sqlalchemy.orm import Session +from typing import Optional, Dict, Any +import datetime + +from app.models.emotion_suggestions_cache_model import EmotionSuggestionsCache +from app.core.logging_config import get_db_logger + +# 获取数据库专用日志器 +db_logger = get_db_logger() + + +class EmotionSuggestionsCacheRepository: + """情绪建议缓存仓储类""" + + def __init__(self, db: Session): + self.db = db + + def get_by_end_user_id(self, end_user_id: str) -> Optional[EmotionSuggestionsCache]: + """根据终端用户ID获取缓存 + + Args: + end_user_id: 终端用户ID(组ID) + + Returns: + 缓存记录,如果不存在返回 None + """ + try: + cache = ( + self.db.query(EmotionSuggestionsCache) + .filter(EmotionSuggestionsCache.end_user_id == end_user_id) + .first() + ) + if cache: + db_logger.info(f"成功获取用户 {end_user_id} 的情绪建议缓存") + else: + db_logger.info(f"用户 {end_user_id} 的情绪建议缓存不存在") + return cache + except Exception as e: + db_logger.error(f"获取用户 {end_user_id} 的情绪建议缓存失败: {str(e)}") + raise + + def create_or_update( + self, + end_user_id: str, + health_summary: str, + suggestions: list, + expires_hours: int = 24 + ) -> EmotionSuggestionsCache: + """创建或更新缓存 + + Args: + end_user_id: 终端用户ID(组ID) + health_summary: 健康状态摘要 + suggestions: 建议列表 + expires_hours: 过期时间(小时),默认24小时 + + Returns: + 缓存记录 + """ + try: + # 查找现有记录 + cache = self.get_by_end_user_id(end_user_id) + + now = datetime.datetime.now() + expires_at = now + datetime.timedelta(hours=expires_hours) + + if cache: + # 更新现有记录 + cache.health_summary = health_summary + cache.suggestions = suggestions + cache.generated_at = now + cache.expires_at = expires_at + cache.updated_at = now + db_logger.info(f"更新用户 {end_user_id} 的情绪建议缓存") + else: + # 创建新记录 + cache = EmotionSuggestionsCache( + end_user_id=end_user_id, + health_summary=health_summary, + suggestions=suggestions, + generated_at=now, + expires_at=expires_at, + created_at=now, + updated_at=now + ) + self.db.add(cache) + db_logger.info(f"创建用户 {end_user_id} 的情绪建议缓存") + + self.db.commit() + self.db.refresh(cache) + return cache + except Exception as e: + self.db.rollback() + db_logger.error(f"创建或更新用户 {end_user_id} 的情绪建议缓存失败: {str(e)}") + raise + + def delete_by_end_user_id(self, end_user_id: str) -> bool: + """删除缓存 + + Args: + end_user_id: 终端用户ID(组ID) + + Returns: + 是否删除成功 + """ + try: + cache = self.get_by_end_user_id(end_user_id) + if cache: + self.db.delete(cache) + self.db.commit() + db_logger.info(f"删除用户 {end_user_id} 的情绪建议缓存") + return True + return False + except Exception as e: + self.db.rollback() + db_logger.error(f"删除用户 {end_user_id} 的情绪建议缓存失败: {str(e)}") + raise + + @staticmethod + def is_expired(cache: EmotionSuggestionsCache) -> bool: + """检查缓存是否过期 + + Args: + cache: 缓存记录 + + Returns: + 是否过期 + """ + if cache.expires_at is None: + return False + return datetime.datetime.now() > cache.expires_at + + +# 便捷函数 +def get_cache_by_end_user_id(db: Session, end_user_id: str) -> Optional[EmotionSuggestionsCache]: + """根据终端用户ID获取缓存""" + repo = EmotionSuggestionsCacheRepository(db) + return repo.get_by_end_user_id(end_user_id) + + +def create_or_update_cache( + db: Session, + end_user_id: str, + health_summary: str, + suggestions: list, + expires_hours: int = 24 +) -> EmotionSuggestionsCache: + """创建或更新缓存""" + repo = EmotionSuggestionsCacheRepository(db) + return repo.create_or_update(end_user_id, health_summary, suggestions, expires_hours) + + +def delete_cache_by_end_user_id(db: Session, end_user_id: str) -> bool: + """删除缓存""" + repo = EmotionSuggestionsCacheRepository(db) + return repo.delete_by_end_user_id(end_user_id) + + +def is_cache_expired(cache: EmotionSuggestionsCache) -> bool: + """检查缓存是否过期""" + return EmotionSuggestionsCacheRepository.is_expired(cache) diff --git a/api/app/repositories/implicit_memory_cache_repository.py b/api/app/repositories/implicit_memory_cache_repository.py new file mode 100644 index 00000000..65356980 --- /dev/null +++ b/api/app/repositories/implicit_memory_cache_repository.py @@ -0,0 +1,175 @@ +"""隐性记忆缓存仓储层""" + +from sqlalchemy.orm import Session +from typing import Optional, Dict, Any +import datetime + +from app.models.implicit_memory_cache_model import ImplicitMemoryCache +from app.core.logging_config import get_db_logger + +# 获取数据库专用日志器 +db_logger = get_db_logger() + + +class ImplicitMemoryCacheRepository: + """隐性记忆缓存仓储类""" + + def __init__(self, db: Session): + self.db = db + + def get_by_end_user_id(self, end_user_id: str) -> Optional[ImplicitMemoryCache]: + """根据终端用户ID获取缓存 + + Args: + end_user_id: 终端用户ID + + Returns: + 缓存记录,如果不存在返回 None + """ + try: + cache = ( + self.db.query(ImplicitMemoryCache) + .filter(ImplicitMemoryCache.end_user_id == end_user_id) + .first() + ) + if cache: + db_logger.info(f"成功获取用户 {end_user_id} 的隐性记忆缓存") + else: + db_logger.info(f"用户 {end_user_id} 的隐性记忆缓存不存在") + return cache + except Exception as e: + db_logger.error(f"获取用户 {end_user_id} 的隐性记忆缓存失败: {str(e)}") + raise + + def create_or_update( + self, + end_user_id: str, + preferences: list, + portrait: dict, + interest_areas: dict, + habits: list, + expires_hours: int = 168 # 默认7天 + ) -> ImplicitMemoryCache: + """创建或更新缓存 + + Args: + end_user_id: 终端用户ID + preferences: 偏好标签列表 + portrait: 四维画像对象 + interest_areas: 兴趣领域分布对象 + habits: 行为习惯列表 + expires_hours: 过期时间(小时),默认168小时(7天) + + Returns: + 缓存记录 + """ + try: + # 查找现有记录 + cache = self.get_by_end_user_id(end_user_id) + + now = datetime.datetime.now() + expires_at = now + datetime.timedelta(hours=expires_hours) + + if cache: + # 更新现有记录 + cache.preferences = preferences + cache.portrait = portrait + cache.interest_areas = interest_areas + cache.habits = habits + cache.generated_at = now + cache.expires_at = expires_at + cache.updated_at = now + db_logger.info(f"更新用户 {end_user_id} 的隐性记忆缓存") + else: + # 创建新记录 + cache = ImplicitMemoryCache( + end_user_id=end_user_id, + preferences=preferences, + portrait=portrait, + interest_areas=interest_areas, + habits=habits, + generated_at=now, + expires_at=expires_at, + created_at=now, + updated_at=now + ) + self.db.add(cache) + db_logger.info(f"创建用户 {end_user_id} 的隐性记忆缓存") + + self.db.commit() + self.db.refresh(cache) + return cache + except Exception as e: + self.db.rollback() + db_logger.error(f"创建或更新用户 {end_user_id} 的隐性记忆缓存失败: {str(e)}") + raise + + def delete_by_end_user_id(self, end_user_id: str) -> bool: + """删除缓存 + + Args: + end_user_id: 终端用户ID + + Returns: + 是否删除成功 + """ + try: + cache = self.get_by_end_user_id(end_user_id) + if cache: + self.db.delete(cache) + self.db.commit() + db_logger.info(f"删除用户 {end_user_id} 的隐性记忆缓存") + return True + return False + except Exception as e: + self.db.rollback() + db_logger.error(f"删除用户 {end_user_id} 的隐性记忆缓存失败: {str(e)}") + raise + + @staticmethod + def is_expired(cache: ImplicitMemoryCache) -> bool: + """检查缓存是否过期 + + Args: + cache: 缓存记录 + + Returns: + 是否过期 + """ + if cache.expires_at is None: + return False + return datetime.datetime.now() > cache.expires_at + + +# 便捷函数 +def get_cache_by_end_user_id(db: Session, end_user_id: str) -> Optional[ImplicitMemoryCache]: + """根据终端用户ID获取缓存""" + repo = ImplicitMemoryCacheRepository(db) + return repo.get_by_end_user_id(end_user_id) + + +def create_or_update_cache( + db: Session, + end_user_id: str, + preferences: list, + portrait: dict, + interest_areas: dict, + habits: list, + expires_hours: int = 168 +) -> ImplicitMemoryCache: + """创建或更新缓存""" + repo = ImplicitMemoryCacheRepository(db) + return repo.create_or_update( + end_user_id, preferences, portrait, interest_areas, habits, expires_hours + ) + + +def delete_cache_by_end_user_id(db: Session, end_user_id: str) -> bool: + """删除缓存""" + repo = ImplicitMemoryCacheRepository(db) + return repo.delete_by_end_user_id(end_user_id) + + +def is_cache_expired(cache: ImplicitMemoryCache) -> bool: + """检查缓存是否过期""" + return ImplicitMemoryCacheRepository.is_expired(cache) diff --git a/api/app/repositories/neo4j/cypher_queries.py b/api/app/repositories/neo4j/cypher_queries.py index fb28b81e..c91c2e80 100644 --- a/api/app/repositories/neo4j/cypher_queries.py +++ b/api/app/repositories/neo4j/cypher_queries.py @@ -332,7 +332,7 @@ RETURN e.id AS id, e.description AS description, e.aliases AS aliases, e.name_embedding AS name_embedding, - e.fact_summary AS fact_summary, + COALESCE(e.fact_summary, '') AS fact_summary, e.connect_strength AS connect_strength, collect(DISTINCT s.id) AS statement_ids, collect(DISTINCT c.id) AS chunk_ids, diff --git a/api/app/schemas/emotion_schema.py b/api/app/schemas/emotion_schema.py index 9f14884d..37e9a2e3 100644 --- a/api/app/schemas/emotion_schema.py +++ b/api/app/schemas/emotion_schema.py @@ -30,3 +30,9 @@ class EmotionSuggestionsRequest(BaseModel): """获取个性化情绪建议请求""" group_id: str = Field(..., description="组ID") config_id: Optional[int] = Field(None, description="配置ID(用于指定LLM模型)") + + +class EmotionGenerateSuggestionsRequest(BaseModel): + """生成个性化情绪建议请求""" + group_id: str = Field(..., description="组ID") + config_id: Optional[int] = Field(None, description="配置ID(用于指定LLM模型)") diff --git a/api/app/schemas/implicit_memory_schema.py b/api/app/schemas/implicit_memory_schema.py index e1770b18..ced50b92 100644 --- a/api/app/schemas/implicit_memory_schema.py +++ b/api/app/schemas/implicit_memory_schema.py @@ -262,3 +262,25 @@ InterestCategory = InterestCategoryResponse InterestAreaDistribution = InterestAreaDistributionResponse BehaviorHabit = BehaviorHabitResponse UserProfile = UserProfileResponse + + +# Cache-related Schemas + +class GenerateProfileRequest(BaseModel): + """生成完整用户画像请求""" + end_user_id: str = Field(..., description="终端用户ID") + + +class CompleteProfileResponse(BaseModel): + """完整用户画像响应(包含所有模块)""" + user_id: str + preferences: List[PreferenceTagResponse] + portrait: DimensionPortraitResponse + interest_areas: InterestAreaDistributionResponse + habits: List[BehaviorHabitResponse] + generated_at: datetime.datetime + cached: bool = Field(False, description="是否来自缓存") + + @field_serializer("generated_at", when_used="json") + def _serialize_generated_at(self, dt: datetime.datetime): + return int(dt.timestamp() * 1000) if dt else None diff --git a/api/app/services/emotion_analytics_service.py b/api/app/services/emotion_analytics_service.py index f2532557..50773b91 100644 --- a/api/app/services/emotion_analytics_service.py +++ b/api/app/services/emotion_analytics_service.py @@ -705,3 +705,85 @@ class EmotionAnalyticsService: health_summary=summary, suggestions=suggestions ) + + async def get_cached_suggestions( + self, + end_user_id: str, + db: Session, + ) -> Optional[Dict[str, Any]]: + """从缓存获取个性化情绪建议 + + Args: + end_user_id: 宿主ID(用户组ID) + db: 数据库会话 + + Returns: + Dict: 缓存的建议数据,如果不存在或已过期返回 None + """ + try: + from app.repositories.emotion_suggestions_cache_repository import ( + EmotionSuggestionsCacheRepository, + ) + + logger.info(f"尝试从缓存获取情绪建议: user={end_user_id}") + + cache_repo = EmotionSuggestionsCacheRepository(db) + cache = cache_repo.get_by_end_user_id(end_user_id) + + if cache is None: + logger.info(f"用户 {end_user_id} 的建议缓存不存在") + return None + + # 检查是否过期 + if cache_repo.is_expired(cache): + logger.info(f"用户 {end_user_id} 的建议缓存已过期") + return None + + logger.info(f"成功从缓存获取建议: user={end_user_id}") + + return { + "health_summary": cache.health_summary, + "suggestions": cache.suggestions, + "generated_at": cache.generated_at.isoformat(), + "cached": True + } + + except Exception as e: + logger.error(f"从缓存获取建议失败: {str(e)}", exc_info=True) + return None + + async def save_suggestions_cache( + self, + end_user_id: str, + suggestions_data: Dict[str, Any], + db: Session, + expires_hours: int = 24 + ) -> None: + """保存建议到缓存 + + Args: + end_user_id: 宿主ID(用户组ID) + suggestions_data: 建议数据 + db: 数据库会话 + expires_hours: 过期时间(小时) + """ + try: + from app.repositories.emotion_suggestions_cache_repository import ( + EmotionSuggestionsCacheRepository, + ) + + logger.info(f"保存建议到缓存: user={end_user_id}") + + cache_repo = EmotionSuggestionsCacheRepository(db) + cache_repo.create_or_update( + end_user_id=end_user_id, + health_summary=suggestions_data["health_summary"], + suggestions=suggestions_data["suggestions"], + expires_hours=expires_hours + ) + + logger.info(f"建议缓存保存成功: user={end_user_id}") + + except Exception as e: + logger.error(f"保存建议缓存失败: {str(e)}", exc_info=True) + # 不抛出异常,缓存失败不应影响主流程 \ No newline at end of file diff --git a/api/app/services/implicit_memory_service.py b/api/app/services/implicit_memory_service.py index 8155b7a1..c98f14bc 100644 --- a/api/app/services/implicit_memory_service.py +++ b/api/app/services/implicit_memory_service.py @@ -7,6 +7,7 @@ user profiles from memory summaries. """ import logging +import asyncio from datetime import datetime from typing import List, Optional @@ -372,4 +373,129 @@ class ImplicitMemoryService: except Exception as e: logger.error(f"Failed to get behavior habits for user {user_id}: {e}") raise - \ No newline at end of file + + + async def generate_complete_profile( + self, + user_id: str + ) -> dict: + """生成完整的用户画像(包含所有4个模块) + + Args: + user_id: 用户ID + + Returns: + Dict: 包含所有模块的完整画像数据 + """ + logger.info(f"生成完整用户画像: user={user_id}") + + try: + # 并行调用4个分析方法 + preferences, portrait, interest_areas, habits = await asyncio.gather( + self.get_preference_tags(user_id=user_id), + self.get_dimension_portrait(user_id=user_id), + self.get_interest_area_distribution(user_id=user_id), + self.get_behavior_habits(user_id=user_id) + ) + + # 转换为可序列化的格式 + profile_data = { + "preferences": [tag.model_dump(mode='json') for tag in preferences], + "portrait": portrait.model_dump(mode='json'), + "interest_areas": interest_areas.model_dump(mode='json'), + "habits": [habit.model_dump(mode='json') for habit in habits] + } + + logger.info(f"完整用户画像生成完成: user={user_id}") + return profile_data + + except Exception as e: + logger.error(f"生成完整用户画像失败: {str(e)}", exc_info=True) + raise + + async def get_cached_profile( + self, + end_user_id: str, + db: Session + ) -> Optional[dict]: + """从缓存获取完整用户画像 + + Args: + end_user_id: 终端用户ID + db: 数据库会话 + + Returns: + Dict: 缓存的画像数据,如果不存在或已过期返回 None + """ + try: + from app.repositories.implicit_memory_cache_repository import ( + ImplicitMemoryCacheRepository, + ) + + logger.info(f"尝试从缓存获取用户画像: user={end_user_id}") + + cache_repo = ImplicitMemoryCacheRepository(db) + cache = cache_repo.get_by_end_user_id(end_user_id) + + if cache is None: + logger.info(f"用户 {end_user_id} 的画像缓存不存在") + return None + + # 检查是否过期 + if cache_repo.is_expired(cache): + logger.info(f"用户 {end_user_id} 的画像缓存已过期") + return None + + logger.info(f"成功从缓存获取用户画像: user={end_user_id}") + + return { + "end_user_id": cache.end_user_id, + "preferences": cache.preferences, + "portrait": cache.portrait, + "interest_areas": cache.interest_areas, + "habits": cache.habits, + "generated_at": cache.generated_at.isoformat(), + "cached": True + } + + except Exception as e: + logger.error(f"从缓存获取用户画像失败: {str(e)}", exc_info=True) + return None + + async def save_profile_cache( + self, + end_user_id: str, + profile_data: dict, + db: Session, + expires_hours: int = 168 # 默认7天 + ) -> None: + """保存用户画像到缓存 + + Args: + end_user_id: 终端用户ID + profile_data: 画像数据 + db: 数据库会话 + expires_hours: 过期时间(小时),默认168小时(7天) + """ + try: + from app.repositories.implicit_memory_cache_repository import ( + ImplicitMemoryCacheRepository, + ) + + logger.info(f"保存用户画像到缓存: user={end_user_id}") + + cache_repo = ImplicitMemoryCacheRepository(db) + cache_repo.create_or_update( + end_user_id=end_user_id, + preferences=profile_data["preferences"], + portrait=profile_data["portrait"], + interest_areas=profile_data["interest_areas"], + habits=profile_data["habits"], + expires_hours=expires_hours + ) + + logger.info(f"用户画像缓存保存成功: user={end_user_id}") + + except Exception as e: + logger.error(f"保存用户画像缓存失败: {str(e)}", exc_info=True) + # 不抛出异常,缓存失败不应影响主流程