Merge branch 'develop' of github.com:SuanmoSuanyangTechnology/MemoryBear into develop
This commit is contained in:
385
api/app/services/implicit_memory_service.py
Normal file
385
api/app/services/implicit_memory_service.py
Normal file
@@ -0,0 +1,385 @@
|
||||
"""
|
||||
Implicit Memory Service
|
||||
|
||||
Main service orchestrating all implicit memory operations. This service coordinates
|
||||
profile building, data extraction, and provides high-level methods for analyzing
|
||||
user profiles from memory summaries.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from typing import List, Optional
|
||||
|
||||
from app.core.memory.analytics.implicit_memory.analyzers.dimension_analyzer import (
|
||||
DimensionAnalyzer,
|
||||
)
|
||||
from app.core.memory.analytics.implicit_memory.analyzers.interest_analyzer import (
|
||||
InterestAnalyzer,
|
||||
)
|
||||
from app.core.memory.analytics.implicit_memory.analyzers.preference_analyzer import (
|
||||
PreferenceAnalyzer,
|
||||
)
|
||||
from app.core.memory.analytics.implicit_memory.data_source import MemoryDataSource
|
||||
from app.core.memory.analytics.implicit_memory.habit_detector import HabitDetector
|
||||
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
|
||||
from app.schemas.implicit_memory_schema import (
|
||||
BehaviorHabit,
|
||||
ConfidenceLevel,
|
||||
DateRange,
|
||||
DimensionPortrait,
|
||||
FrequencyPattern,
|
||||
InterestAreaDistribution,
|
||||
PreferenceTag,
|
||||
TimeRange,
|
||||
UserMemorySummary,
|
||||
)
|
||||
from app.schemas.memory_config_schema import MemoryConfig
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ImplicitMemoryService:
|
||||
"""Main service for implicit memory operations."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
db: Session,
|
||||
end_user_id: str
|
||||
):
|
||||
"""Initialize the implicit memory service.
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
end_user_id: End user ID to get connected memory configuration
|
||||
"""
|
||||
self.db = db
|
||||
self.end_user_id = end_user_id
|
||||
|
||||
# Get connected memory configuration for the user
|
||||
self.memory_config = self._get_user_memory_config()
|
||||
|
||||
# Extract LLM model ID from memory config
|
||||
llm_model_id = str(self.memory_config.llm_model_id) if self.memory_config.llm_model_id else None
|
||||
|
||||
# Initialize Neo4j connector
|
||||
self.neo4j_connector = Neo4jConnector()
|
||||
|
||||
# Initialize core components with LLM model ID
|
||||
self.data_source = MemoryDataSource(db, self.neo4j_connector)
|
||||
self.preference_analyzer = PreferenceAnalyzer(db, llm_model_id)
|
||||
self.dimension_analyzer = DimensionAnalyzer(db, llm_model_id)
|
||||
self.interest_analyzer = InterestAnalyzer(db, llm_model_id)
|
||||
self.habit_detector = HabitDetector(db, llm_model_id)
|
||||
|
||||
logger.info(f"ImplicitMemoryService initialized for end_user: {end_user_id}")
|
||||
|
||||
def _get_user_memory_config(self) -> MemoryConfig:
|
||||
"""Get memory configuration for the connected end user.
|
||||
|
||||
Returns:
|
||||
MemoryConfig: User's connected memory configuration
|
||||
|
||||
Raises:
|
||||
ValueError: If no memory configuration found for user
|
||||
"""
|
||||
try:
|
||||
from app.services.memory_agent_service import get_end_user_connected_config
|
||||
from app.services.memory_config_service import MemoryConfigService
|
||||
|
||||
# Get user's connected config
|
||||
connected_config = get_end_user_connected_config(self.end_user_id, self.db)
|
||||
config_id = connected_config.get("memory_config_id")
|
||||
|
||||
if config_id is None:
|
||||
raise ValueError(f"No memory configuration found for end_user: {self.end_user_id}")
|
||||
|
||||
# Load the memory configuration
|
||||
config_service = MemoryConfigService(self.db)
|
||||
memory_config = config_service.load_memory_config(config_id)
|
||||
|
||||
logger.info(f"Loaded memory config {config_id} for end_user: {self.end_user_id}")
|
||||
return memory_config
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get memory config for end_user {self.end_user_id}: {e}")
|
||||
raise ValueError(f"Unable to get memory configuration for end_user {self.end_user_id}: {e}")
|
||||
|
||||
async def extract_user_summaries(
|
||||
self,
|
||||
user_id: str,
|
||||
time_range: Optional[TimeRange] = None,
|
||||
limit: Optional[int] = None
|
||||
) -> List[UserMemorySummary]:
|
||||
"""Extract user-specific memory summaries.
|
||||
|
||||
Args:
|
||||
user_id: Target user ID
|
||||
time_range: Optional time range to filter summaries
|
||||
limit: Optional limit on number of summaries
|
||||
|
||||
Returns:
|
||||
List of user-specific memory summaries
|
||||
"""
|
||||
logger.info(f"Extracting user summaries for user {user_id}")
|
||||
|
||||
try:
|
||||
summaries = await self.data_source.get_user_summaries(
|
||||
user_id=user_id,
|
||||
time_range=time_range,
|
||||
limit=limit or 1000
|
||||
)
|
||||
|
||||
logger.info(f"Extracted {len(summaries)} summaries for user {user_id}")
|
||||
return summaries
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to extract user summaries for user {user_id}: {e}")
|
||||
raise
|
||||
|
||||
async def get_preference_tags(
|
||||
self,
|
||||
user_id: str,
|
||||
confidence_threshold: float = 0.5,
|
||||
tag_category: Optional[str] = None,
|
||||
date_range: Optional[DateRange] = None
|
||||
) -> List[PreferenceTag]:
|
||||
"""Retrieve user preference tags with filtering.
|
||||
|
||||
Args:
|
||||
user_id: Target user ID
|
||||
confidence_threshold: Minimum confidence score for tags
|
||||
tag_category: Optional category filter
|
||||
date_range: Optional date range filter
|
||||
|
||||
Returns:
|
||||
List of filtered preference tags
|
||||
"""
|
||||
logger.info(f"Getting preference tags for user {user_id}")
|
||||
|
||||
try:
|
||||
# Get user summaries for analysis
|
||||
time_range = None
|
||||
if date_range:
|
||||
time_range = TimeRange(
|
||||
start_date=date_range.start_date or datetime.min,
|
||||
end_date=date_range.end_date or datetime.now()
|
||||
)
|
||||
|
||||
user_summaries = await self.extract_user_summaries(
|
||||
user_id=user_id,
|
||||
time_range=time_range
|
||||
)
|
||||
|
||||
if not user_summaries:
|
||||
logger.warning(f"No summaries found for user {user_id}")
|
||||
return []
|
||||
|
||||
# Analyze preferences
|
||||
preference_tags = await self.preference_analyzer.analyze_preferences(
|
||||
user_id=user_id,
|
||||
user_summaries=user_summaries
|
||||
)
|
||||
|
||||
# Apply filters
|
||||
filtered_tags = []
|
||||
for tag in preference_tags:
|
||||
# Filter by confidence threshold
|
||||
if tag.confidence_score < confidence_threshold:
|
||||
continue
|
||||
|
||||
# Filter by category if specified
|
||||
if tag_category and tag.category != tag_category:
|
||||
continue
|
||||
|
||||
# Filter by date range if specified
|
||||
if date_range:
|
||||
if date_range.start_date and tag.created_at < date_range.start_date:
|
||||
continue
|
||||
if date_range.end_date and tag.created_at > date_range.end_date:
|
||||
continue
|
||||
|
||||
filtered_tags.append(tag)
|
||||
|
||||
# Sort by confidence score and recency
|
||||
filtered_tags.sort(
|
||||
key=lambda x: (x.confidence_score, x.updated_at),
|
||||
reverse=True
|
||||
)
|
||||
|
||||
logger.info(f"Retrieved {len(filtered_tags)} preference tags for user {user_id}")
|
||||
return filtered_tags
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get preference tags for user {user_id}: {e}")
|
||||
raise
|
||||
|
||||
async def get_dimension_portrait(
|
||||
self,
|
||||
user_id: str,
|
||||
include_history: bool = False
|
||||
) -> DimensionPortrait:
|
||||
"""Get user's four-dimension personality portrait.
|
||||
|
||||
Args:
|
||||
user_id: Target user ID
|
||||
include_history: Whether to include historical trends
|
||||
|
||||
Returns:
|
||||
User's dimension portrait
|
||||
"""
|
||||
logger.info(f"Getting dimension portrait for user {user_id}")
|
||||
|
||||
try:
|
||||
# Get user summaries
|
||||
user_summaries = await self.extract_user_summaries(user_id=user_id)
|
||||
|
||||
if not user_summaries:
|
||||
logger.warning(f"No summaries found for user {user_id}")
|
||||
return self.dimension_analyzer._create_empty_portrait(user_id)
|
||||
|
||||
# Analyze dimensions
|
||||
dimension_portrait = await self.dimension_analyzer.analyze_dimensions(
|
||||
user_id=user_id,
|
||||
user_summaries=user_summaries
|
||||
)
|
||||
|
||||
# Include historical trends if requested
|
||||
if include_history:
|
||||
# In a full implementation, this would retrieve historical data
|
||||
# For now, we'll leave historical_trends as None
|
||||
pass
|
||||
|
||||
logger.info(f"Retrieved dimension portrait for user {user_id}")
|
||||
return dimension_portrait
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get dimension portrait for user {user_id}: {e}")
|
||||
raise
|
||||
|
||||
async def get_interest_area_distribution(
|
||||
self,
|
||||
user_id: str,
|
||||
include_trends: bool = False
|
||||
) -> InterestAreaDistribution:
|
||||
"""Get user's interest area distribution across four areas.
|
||||
|
||||
Args:
|
||||
user_id: Target user ID
|
||||
include_trends: Whether to include trending information
|
||||
|
||||
Returns:
|
||||
User's interest area distribution
|
||||
"""
|
||||
logger.info(f"Getting interest area distribution for user {user_id}")
|
||||
|
||||
try:
|
||||
# Get user summaries
|
||||
user_summaries = await self.extract_user_summaries(user_id=user_id)
|
||||
|
||||
if not user_summaries:
|
||||
logger.warning(f"No summaries found for user {user_id}")
|
||||
return self.interest_analyzer._create_empty_distribution(user_id)
|
||||
|
||||
# Analyze interests
|
||||
interest_distribution = await self.interest_analyzer.analyze_interests(
|
||||
user_id=user_id,
|
||||
user_summaries=user_summaries
|
||||
)
|
||||
|
||||
# Include trends if requested
|
||||
if include_trends:
|
||||
# In a full implementation, this would calculate trending directions
|
||||
# For now, we'll leave trending_direction as None for each category
|
||||
pass
|
||||
|
||||
logger.info(f"Retrieved interest area distribution for user {user_id}")
|
||||
return interest_distribution
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get interest area distribution for user {user_id}: {e}")
|
||||
raise
|
||||
|
||||
async def get_behavior_habits(
|
||||
self,
|
||||
user_id: str,
|
||||
confidence_level: Optional[str] = None,
|
||||
frequency_pattern: Optional[str] = None,
|
||||
time_period: Optional[str] = None
|
||||
) -> List[BehaviorHabit]:
|
||||
"""Get user's behavioral habits with filtering.
|
||||
|
||||
Args:
|
||||
user_id: Target user ID
|
||||
confidence_level: Optional confidence level filter ("high", "medium", "low")
|
||||
frequency_pattern: Optional frequency pattern filter
|
||||
time_period: Optional time period filter ("current", "past")
|
||||
|
||||
Returns:
|
||||
List of filtered behavioral habits
|
||||
"""
|
||||
logger.info(f"Getting behavior habits for user {user_id}")
|
||||
|
||||
try:
|
||||
# Get user summaries
|
||||
user_summaries = await self.extract_user_summaries(user_id=user_id)
|
||||
|
||||
if not user_summaries:
|
||||
logger.warning(f"No summaries found for user {user_id}")
|
||||
return []
|
||||
|
||||
# Detect habits
|
||||
behavior_habits = await self.habit_detector.detect_habits(
|
||||
user_id=user_id,
|
||||
user_summaries=user_summaries
|
||||
)
|
||||
|
||||
# Apply filters
|
||||
filtered_habits = []
|
||||
for habit in behavior_habits:
|
||||
# Filter by confidence level
|
||||
if confidence_level:
|
||||
try:
|
||||
target_confidence = ConfidenceLevel(confidence_level.lower())
|
||||
if habit.confidence_level != target_confidence:
|
||||
continue
|
||||
except ValueError:
|
||||
logger.warning(f"Invalid confidence level: {confidence_level}")
|
||||
continue
|
||||
|
||||
# Filter by frequency pattern
|
||||
if frequency_pattern:
|
||||
try:
|
||||
target_frequency = FrequencyPattern(frequency_pattern.lower())
|
||||
if habit.frequency_pattern != target_frequency:
|
||||
continue
|
||||
except ValueError:
|
||||
logger.warning(f"Invalid frequency pattern: {frequency_pattern}")
|
||||
continue
|
||||
|
||||
# Filter by time period
|
||||
if time_period:
|
||||
if time_period.lower() == "current" and not habit.is_current:
|
||||
continue
|
||||
elif time_period.lower() == "past" and habit.is_current:
|
||||
continue
|
||||
|
||||
filtered_habits.append(habit)
|
||||
|
||||
# Sort by confidence level and recency
|
||||
confidence_order = {"high": 3, "medium": 2, "low": 1}
|
||||
filtered_habits.sort(
|
||||
key=lambda x: (
|
||||
confidence_order.get(x.confidence_level.value, 0),
|
||||
x.last_observed
|
||||
),
|
||||
reverse=True
|
||||
)
|
||||
|
||||
logger.info(f"Retrieved {len(filtered_habits)} behavior habits for user {user_id}")
|
||||
return filtered_habits
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get behavior habits for user {user_id}: {e}")
|
||||
raise
|
||||
|
||||
Reference in New Issue
Block a user