""" Implicit Memory Service Main service orchestrating all implicit memory operations. This service coordinates profile building, data extraction, and provides high-level methods for analyzing user profiles from memory summaries. """ import logging import asyncio from datetime import datetime from typing import List, Optional from app.core.memory.analytics.implicit_memory.analyzers.dimension_analyzer import ( DimensionAnalyzer, ) from app.core.memory.analytics.implicit_memory.analyzers.interest_analyzer import ( InterestAnalyzer, ) from app.core.memory.analytics.implicit_memory.analyzers.preference_analyzer import ( PreferenceAnalyzer, ) from app.core.memory.analytics.implicit_memory.data_source import MemoryDataSource from app.core.memory.analytics.implicit_memory.habit_detector import HabitDetector from app.repositories.neo4j.neo4j_connector import Neo4jConnector from app.schemas.implicit_memory_schema import ( BehaviorHabit, DateRange, DimensionPortrait, FrequencyPattern, InterestAreaDistribution, PreferenceTag, TimeRange, UserMemorySummary, ) from app.schemas.memory_config_schema import MemoryConfig from sqlalchemy.orm import Session logger = logging.getLogger(__name__) class ImplicitMemoryService: """Main service for implicit memory operations.""" def __init__( self, db: Session, end_user_id: str ): """Initialize the implicit memory service. Args: db: Database session end_user_id: End user ID to get connected memory configuration """ self.db = db self.end_user_id = end_user_id # Get connected memory configuration for the user self.memory_config = self._get_user_memory_config() # Extract LLM model ID from memory config llm_model_id = str(self.memory_config.llm_model_id) if self.memory_config.llm_model_id else None # Initialize Neo4j connector self.neo4j_connector = Neo4jConnector() # Initialize core components with LLM model ID self.data_source = MemoryDataSource(db, self.neo4j_connector) self.preference_analyzer = PreferenceAnalyzer(db, llm_model_id) self.dimension_analyzer = DimensionAnalyzer(db, llm_model_id) self.interest_analyzer = InterestAnalyzer(db, llm_model_id) self.habit_detector = HabitDetector(db, llm_model_id) logger.info(f"ImplicitMemoryService initialized for end_user: {end_user_id}") def _get_user_memory_config(self) -> MemoryConfig: """Get memory configuration for the connected end user. Returns: MemoryConfig: User's connected memory configuration Raises: ValueError: If no memory configuration found for user """ try: from app.services.memory_agent_service import get_end_user_connected_config from app.services.memory_config_service import MemoryConfigService # Get user's connected config connected_config = get_end_user_connected_config(self.end_user_id, self.db) config_id = connected_config.get("memory_config_id") if config_id is None: raise ValueError(f"No memory configuration found for end_user: {self.end_user_id}") # Load the memory configuration config_service = MemoryConfigService(self.db) memory_config = config_service.load_memory_config(config_id) logger.info(f"Loaded memory config {config_id} for end_user: {self.end_user_id}") return memory_config except Exception as e: logger.error(f"Failed to get memory config for end_user {self.end_user_id}: {e}") raise ValueError(f"Unable to get memory configuration for end_user {self.end_user_id}: {e}") async def extract_user_summaries( self, user_id: str, time_range: Optional[TimeRange] = None, limit: Optional[int] = None ) -> List[UserMemorySummary]: """Extract user-specific memory summaries. Args: user_id: Target user ID time_range: Optional time range to filter summaries limit: Optional limit on number of summaries Returns: List of user-specific memory summaries """ logger.info(f"Extracting user summaries for user {user_id}") try: summaries = await self.data_source.get_user_summaries( user_id=user_id, time_range=time_range, limit=limit or 1000 ) logger.info(f"Extracted {len(summaries)} summaries for user {user_id}") return summaries except Exception as e: logger.error(f"Failed to extract user summaries for user {user_id}: {e}") raise async def get_preference_tags( self, user_id: str, confidence_threshold: float = 0.5, tag_category: Optional[str] = None, date_range: Optional[DateRange] = None ) -> List[PreferenceTag]: """Retrieve user preference tags with filtering. Args: user_id: Target user ID confidence_threshold: Minimum confidence score for tags tag_category: Optional category filter date_range: Optional date range filter Returns: List of filtered preference tags """ logger.info(f"Getting preference tags for user {user_id}") try: # Get user summaries for analysis time_range = None if date_range: time_range = TimeRange( start_date=date_range.start_date or datetime.min, end_date=date_range.end_date or datetime.now() ) user_summaries = await self.extract_user_summaries( user_id=user_id, time_range=time_range ) if not user_summaries: logger.warning(f"No summaries found for user {user_id}") return [] # Analyze preferences preference_tags = await self.preference_analyzer.analyze_preferences( user_id=user_id, user_summaries=user_summaries ) # Apply filters filtered_tags = [] for tag in preference_tags: # Filter by confidence threshold if tag.confidence_score < confidence_threshold: continue # Filter by category if specified if tag_category and tag.category != tag_category: continue # Filter by date range if specified if date_range: if date_range.start_date and tag.created_at < date_range.start_date: continue if date_range.end_date and tag.created_at > date_range.end_date: continue filtered_tags.append(tag) # Sort by confidence score and recency filtered_tags.sort( key=lambda x: (x.confidence_score, x.updated_at), reverse=True ) logger.info(f"Retrieved {len(filtered_tags)} preference tags for user {user_id}") return filtered_tags except Exception as e: logger.error(f"Failed to get preference tags for user {user_id}: {e}") raise async def get_dimension_portrait( self, user_id: str, include_history: bool = False ) -> DimensionPortrait: """Get user's four-dimension personality portrait. Args: user_id: Target user ID include_history: Whether to include historical trends Returns: User's dimension portrait """ logger.info(f"Getting dimension portrait for user {user_id}") try: # Get user summaries user_summaries = await self.extract_user_summaries(user_id=user_id) if not user_summaries: logger.warning(f"No summaries found for user {user_id}") return self.dimension_analyzer._create_empty_portrait(user_id) # Analyze dimensions dimension_portrait = await self.dimension_analyzer.analyze_dimensions( user_id=user_id, user_summaries=user_summaries ) # Include historical trends if requested if include_history: # In a full implementation, this would retrieve historical data # For now, we'll leave historical_trends as None pass logger.info(f"Retrieved dimension portrait for user {user_id}") return dimension_portrait except Exception as e: logger.error(f"Failed to get dimension portrait for user {user_id}: {e}") raise async def get_interest_area_distribution( self, user_id: str, include_trends: bool = False ) -> InterestAreaDistribution: """Get user's interest area distribution across four areas. Args: user_id: Target user ID include_trends: Whether to include trending information Returns: User's interest area distribution """ logger.info(f"Getting interest area distribution for user {user_id}") try: # Get user summaries user_summaries = await self.extract_user_summaries(user_id=user_id) if not user_summaries: logger.warning(f"No summaries found for user {user_id}") return self.interest_analyzer._create_empty_distribution(user_id) # Analyze interests interest_distribution = await self.interest_analyzer.analyze_interests( user_id=user_id, user_summaries=user_summaries ) # Include trends if requested if include_trends: # In a full implementation, this would calculate trending directions # For now, we'll leave trending_direction as None for each category pass logger.info(f"Retrieved interest area distribution for user {user_id}") return interest_distribution except Exception as e: logger.error(f"Failed to get interest area distribution for user {user_id}: {e}") raise async def get_behavior_habits( self, user_id: str, confidence_level: Optional[int] = None, frequency_pattern: Optional[str] = None, time_period: Optional[str] = None ) -> List[BehaviorHabit]: """Get user's behavioral habits with filtering. Args: user_id: Target user ID confidence_level: Optional confidence level filter (0-100) frequency_pattern: Optional frequency pattern filter time_period: Optional time period filter ("current", "past") Returns: List of filtered behavioral habits """ logger.info(f"Getting behavior habits for user {user_id}") try: # Get user summaries user_summaries = await self.extract_user_summaries(user_id=user_id) if not user_summaries: logger.warning(f"No summaries found for user {user_id}") return [] # Detect habits behavior_habits = await self.habit_detector.detect_habits( user_id=user_id, user_summaries=user_summaries ) # Apply filters filtered_habits = [] for habit in behavior_habits: # Filter by confidence level if confidence_level is not None: if habit.confidence_level < confidence_level: continue # Filter by frequency pattern if frequency_pattern: try: target_frequency = FrequencyPattern(frequency_pattern.lower()) if habit.frequency_pattern != target_frequency: continue except ValueError: logger.warning(f"Invalid frequency pattern: {frequency_pattern}") continue # Filter by time period if time_period: if time_period.lower() == "current" and not habit.is_current: continue elif time_period.lower() == "past" and habit.is_current: continue filtered_habits.append(habit) # Sort by confidence level and recency filtered_habits.sort( key=lambda x: (x.confidence_level, x.last_observed), reverse=True ) logger.info(f"Retrieved {len(filtered_habits)} behavior habits for user {user_id}") return filtered_habits except Exception as e: logger.error(f"Failed to get behavior habits for user {user_id}: {e}") raise async def generate_complete_profile( self, user_id: str ) -> dict: """生成完整的用户画像(包含所有4个模块) Args: user_id: 用户ID Returns: Dict: 包含所有模块的完整画像数据 """ logger.info(f"生成完整用户画像: user={user_id}") try: # 并行调用4个分析方法 preferences, portrait, interest_areas, habits = await asyncio.gather( self.get_preference_tags(user_id=user_id), self.get_dimension_portrait(user_id=user_id), self.get_interest_area_distribution(user_id=user_id), self.get_behavior_habits(user_id=user_id) ) # 转换为可序列化的格式 profile_data = { "preferences": [tag.model_dump(mode='json') for tag in preferences], "portrait": portrait.model_dump(mode='json'), "interest_areas": interest_areas.model_dump(mode='json'), "habits": [habit.model_dump(mode='json') for habit in habits] } logger.info(f"完整用户画像生成完成: user={user_id}") return profile_data except Exception as e: logger.error(f"生成完整用户画像失败: {str(e)}", exc_info=True) raise async def get_cached_profile( self, end_user_id: str, db: Session ) -> Optional[dict]: """从 Redis 缓存获取完整用户画像 Args: end_user_id: 终端用户ID db: 数据库会话(保留参数以保持接口兼容性) Returns: Dict: 缓存的画像数据,如果不存在或已过期返回 None """ try: from app.cache.memory.implicit_memory import ImplicitMemoryCache logger.info(f"尝试从 Redis 缓存获取用户画像: user={end_user_id}") # 从 Redis 获取缓存 cached_data = await ImplicitMemoryCache.get_user_profile(end_user_id) if cached_data is None: logger.info(f"用户 {end_user_id} 的画像缓存不存在或已过期") return None logger.info(f"成功从 Redis 缓存获取用户画像: user={end_user_id}") return cached_data except Exception as e: logger.error(f"从 Redis 缓存获取用户画像失败: {str(e)}", exc_info=True) return None async def save_profile_cache( self, end_user_id: str, profile_data: dict, db: Session, expires_hours: int = 168 # 默认7天 ) -> None: """保存用户画像到 Redis 缓存 Args: end_user_id: 终端用户ID profile_data: 画像数据 db: 数据库会话(保留参数以保持接口兼容性) expires_hours: 过期时间(小时),默认168小时(7天) """ try: from app.cache.memory.implicit_memory import ImplicitMemoryCache logger.info(f"保存用户画像到 Redis 缓存: user={end_user_id}, expires={expires_hours}小时") # 计算过期时间(秒) expire_seconds = expires_hours * 3600 # 保存到 Redis success = await ImplicitMemoryCache.set_user_profile( user_id=end_user_id, profile_data=profile_data, expire=expire_seconds ) if success: logger.info(f"用户画像缓存保存成功: user={end_user_id}") else: logger.warning(f"用户画像缓存保存失败: user={end_user_id}") except Exception as e: logger.error(f"保存用户画像缓存失败: {str(e)}", exc_info=True) # 不抛出异常,缓存失败不应影响主流程