Files
MemoryBear/api/app/services/implicit_memory_service.py
Ke Sun 7167c2002f feat(implicit memory): upgrade pydantic v2 compatibility and confidence level handling
- Replace deprecated `.dict()` with `.model_dump(mode='json')` for pydantic v2 compatibility
- Convert confidence level from enum-based strings to numerical values (0-100 scale)
- Add confidence level mapping in controller (high: 85, medium: 50, low: 20)
- Update dimension analyzer to handle both string and numeric confidence inputs
- Refactor habit analyzer confidence level validation logic
- Remove ConfidenceLevel enum import and replace with integer-based approach
- Update memory config validators for numerical confidence level support
- Ensure all implicit memory schemas use model_dump for serialization
- Improve type consistency across memory analytics modules
2026-01-08 17:50:01 +08:00

375 lines
14 KiB
Python

"""
Implicit Memory Service
Main service orchestrating all implicit memory operations. This service coordinates
profile building, data extraction, and provides high-level methods for analyzing
user profiles from memory summaries.
"""
import logging
from datetime import datetime
from typing import List, Optional
from app.core.memory.analytics.implicit_memory.analyzers.dimension_analyzer import (
DimensionAnalyzer,
)
from app.core.memory.analytics.implicit_memory.analyzers.interest_analyzer import (
InterestAnalyzer,
)
from app.core.memory.analytics.implicit_memory.analyzers.preference_analyzer import (
PreferenceAnalyzer,
)
from app.core.memory.analytics.implicit_memory.data_source import MemoryDataSource
from app.core.memory.analytics.implicit_memory.habit_detector import HabitDetector
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.schemas.implicit_memory_schema import (
BehaviorHabit,
DateRange,
DimensionPortrait,
FrequencyPattern,
InterestAreaDistribution,
PreferenceTag,
TimeRange,
UserMemorySummary,
)
from app.schemas.memory_config_schema import MemoryConfig
from sqlalchemy.orm import Session
logger = logging.getLogger(__name__)
class ImplicitMemoryService:
"""Main service for implicit memory operations."""
def __init__(
self,
db: Session,
end_user_id: str
):
"""Initialize the implicit memory service.
Args:
db: Database session
end_user_id: End user ID to get connected memory configuration
"""
self.db = db
self.end_user_id = end_user_id
# Get connected memory configuration for the user
self.memory_config = self._get_user_memory_config()
# Extract LLM model ID from memory config
llm_model_id = str(self.memory_config.llm_model_id) if self.memory_config.llm_model_id else None
# Initialize Neo4j connector
self.neo4j_connector = Neo4jConnector()
# Initialize core components with LLM model ID
self.data_source = MemoryDataSource(db, self.neo4j_connector)
self.preference_analyzer = PreferenceAnalyzer(db, llm_model_id)
self.dimension_analyzer = DimensionAnalyzer(db, llm_model_id)
self.interest_analyzer = InterestAnalyzer(db, llm_model_id)
self.habit_detector = HabitDetector(db, llm_model_id)
logger.info(f"ImplicitMemoryService initialized for end_user: {end_user_id}")
def _get_user_memory_config(self) -> MemoryConfig:
"""Get memory configuration for the connected end user.
Returns:
MemoryConfig: User's connected memory configuration
Raises:
ValueError: If no memory configuration found for user
"""
try:
from app.services.memory_agent_service import get_end_user_connected_config
from app.services.memory_config_service import MemoryConfigService
# Get user's connected config
connected_config = get_end_user_connected_config(self.end_user_id, self.db)
config_id = connected_config.get("memory_config_id")
if config_id is None:
raise ValueError(f"No memory configuration found for end_user: {self.end_user_id}")
# Load the memory configuration
config_service = MemoryConfigService(self.db)
memory_config = config_service.load_memory_config(config_id)
logger.info(f"Loaded memory config {config_id} for end_user: {self.end_user_id}")
return memory_config
except Exception as e:
logger.error(f"Failed to get memory config for end_user {self.end_user_id}: {e}")
raise ValueError(f"Unable to get memory configuration for end_user {self.end_user_id}: {e}")
async def extract_user_summaries(
self,
user_id: str,
time_range: Optional[TimeRange] = None,
limit: Optional[int] = None
) -> List[UserMemorySummary]:
"""Extract user-specific memory summaries.
Args:
user_id: Target user ID
time_range: Optional time range to filter summaries
limit: Optional limit on number of summaries
Returns:
List of user-specific memory summaries
"""
logger.info(f"Extracting user summaries for user {user_id}")
try:
summaries = await self.data_source.get_user_summaries(
user_id=user_id,
time_range=time_range,
limit=limit or 1000
)
logger.info(f"Extracted {len(summaries)} summaries for user {user_id}")
return summaries
except Exception as e:
logger.error(f"Failed to extract user summaries for user {user_id}: {e}")
raise
async def get_preference_tags(
self,
user_id: str,
confidence_threshold: float = 0.5,
tag_category: Optional[str] = None,
date_range: Optional[DateRange] = None
) -> List[PreferenceTag]:
"""Retrieve user preference tags with filtering.
Args:
user_id: Target user ID
confidence_threshold: Minimum confidence score for tags
tag_category: Optional category filter
date_range: Optional date range filter
Returns:
List of filtered preference tags
"""
logger.info(f"Getting preference tags for user {user_id}")
try:
# Get user summaries for analysis
time_range = None
if date_range:
time_range = TimeRange(
start_date=date_range.start_date or datetime.min,
end_date=date_range.end_date or datetime.now()
)
user_summaries = await self.extract_user_summaries(
user_id=user_id,
time_range=time_range
)
if not user_summaries:
logger.warning(f"No summaries found for user {user_id}")
return []
# Analyze preferences
preference_tags = await self.preference_analyzer.analyze_preferences(
user_id=user_id,
user_summaries=user_summaries
)
# Apply filters
filtered_tags = []
for tag in preference_tags:
# Filter by confidence threshold
if tag.confidence_score < confidence_threshold:
continue
# Filter by category if specified
if tag_category and tag.category != tag_category:
continue
# Filter by date range if specified
if date_range:
if date_range.start_date and tag.created_at < date_range.start_date:
continue
if date_range.end_date and tag.created_at > date_range.end_date:
continue
filtered_tags.append(tag)
# Sort by confidence score and recency
filtered_tags.sort(
key=lambda x: (x.confidence_score, x.updated_at),
reverse=True
)
logger.info(f"Retrieved {len(filtered_tags)} preference tags for user {user_id}")
return filtered_tags
except Exception as e:
logger.error(f"Failed to get preference tags for user {user_id}: {e}")
raise
async def get_dimension_portrait(
self,
user_id: str,
include_history: bool = False
) -> DimensionPortrait:
"""Get user's four-dimension personality portrait.
Args:
user_id: Target user ID
include_history: Whether to include historical trends
Returns:
User's dimension portrait
"""
logger.info(f"Getting dimension portrait for user {user_id}")
try:
# Get user summaries
user_summaries = await self.extract_user_summaries(user_id=user_id)
if not user_summaries:
logger.warning(f"No summaries found for user {user_id}")
return self.dimension_analyzer._create_empty_portrait(user_id)
# Analyze dimensions
dimension_portrait = await self.dimension_analyzer.analyze_dimensions(
user_id=user_id,
user_summaries=user_summaries
)
# Include historical trends if requested
if include_history:
# In a full implementation, this would retrieve historical data
# For now, we'll leave historical_trends as None
pass
logger.info(f"Retrieved dimension portrait for user {user_id}")
return dimension_portrait
except Exception as e:
logger.error(f"Failed to get dimension portrait for user {user_id}: {e}")
raise
async def get_interest_area_distribution(
self,
user_id: str,
include_trends: bool = False
) -> InterestAreaDistribution:
"""Get user's interest area distribution across four areas.
Args:
user_id: Target user ID
include_trends: Whether to include trending information
Returns:
User's interest area distribution
"""
logger.info(f"Getting interest area distribution for user {user_id}")
try:
# Get user summaries
user_summaries = await self.extract_user_summaries(user_id=user_id)
if not user_summaries:
logger.warning(f"No summaries found for user {user_id}")
return self.interest_analyzer._create_empty_distribution(user_id)
# Analyze interests
interest_distribution = await self.interest_analyzer.analyze_interests(
user_id=user_id,
user_summaries=user_summaries
)
# Include trends if requested
if include_trends:
# In a full implementation, this would calculate trending directions
# For now, we'll leave trending_direction as None for each category
pass
logger.info(f"Retrieved interest area distribution for user {user_id}")
return interest_distribution
except Exception as e:
logger.error(f"Failed to get interest area distribution for user {user_id}: {e}")
raise
async def get_behavior_habits(
self,
user_id: str,
confidence_level: Optional[int] = None,
frequency_pattern: Optional[str] = None,
time_period: Optional[str] = None
) -> List[BehaviorHabit]:
"""Get user's behavioral habits with filtering.
Args:
user_id: Target user ID
confidence_level: Optional confidence level filter (0-100)
frequency_pattern: Optional frequency pattern filter
time_period: Optional time period filter ("current", "past")
Returns:
List of filtered behavioral habits
"""
logger.info(f"Getting behavior habits for user {user_id}")
try:
# Get user summaries
user_summaries = await self.extract_user_summaries(user_id=user_id)
if not user_summaries:
logger.warning(f"No summaries found for user {user_id}")
return []
# Detect habits
behavior_habits = await self.habit_detector.detect_habits(
user_id=user_id,
user_summaries=user_summaries
)
# Apply filters
filtered_habits = []
for habit in behavior_habits:
# Filter by confidence level
if confidence_level is not None:
if habit.confidence_level < confidence_level:
continue
# Filter by frequency pattern
if frequency_pattern:
try:
target_frequency = FrequencyPattern(frequency_pattern.lower())
if habit.frequency_pattern != target_frequency:
continue
except ValueError:
logger.warning(f"Invalid frequency pattern: {frequency_pattern}")
continue
# Filter by time period
if time_period:
if time_period.lower() == "current" and not habit.is_current:
continue
elif time_period.lower() == "past" and habit.is_current:
continue
filtered_habits.append(habit)
# Sort by confidence level and recency
filtered_habits.sort(
key=lambda x: (x.confidence_level, x.last_observed),
reverse=True
)
logger.info(f"Retrieved {len(filtered_habits)} behavior habits for user {user_id}")
return filtered_habits
except Exception as e:
logger.error(f"Failed to get behavior habits for user {user_id}: {e}")
raise