Feature/behavior analysis (#53)

* init behavior analysis

* init behavior analysis

* feat(implicit-memory): add implicit memory analytics system
This commit is contained in:
Ke Sun
2026-01-07 18:14:25 +08:00
committed by GitHub
parent 28eccd6ce9
commit b3f8de3062
20 changed files with 3447 additions and 22 deletions

View File

@@ -0,0 +1,6 @@
"""Implicit Memory Module
This module provides behavior analysis capabilities that build comprehensive user profiles
by analyzing memory summary nodes from Neo4j. It creates detailed user portraits across
multiple dimensions, tracks interest distributions, and identifies behavioral habits.
"""

View File

@@ -0,0 +1 @@
"""Analyzers package for implicit memory analysis components."""

View File

@@ -0,0 +1,264 @@
"""Dimension Analyzer for Implicit Memory System
This module implements LLM-based personality dimension analysis from user memory summaries.
It analyzes four key dimensions: creativity, aesthetic, technology, and literature,
providing percentage scores with evidence and reasoning.
"""
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional
from app.core.memory.analytics.implicit_memory.llm_client import ImplicitMemoryLLMClient
from app.core.memory.llm_tools.llm_client import LLMClientException
from app.schemas.implicit_memory_schema import (
ConfidenceLevel,
DimensionPortrait,
DimensionScore,
UserMemorySummary,
)
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
logger = logging.getLogger(__name__)
class DimensionData(BaseModel):
    """Individual dimension analysis data.

    Pydantic model for a single dimension entry; it is the value type of
    ``DimensionAnalysisResponse.dimensions``.
    """
    # Score for the dimension; validation restricts it to the range 0.0-100.0.
    percentage: float = Field(ge=0.0, le=100.0)
    # Supporting evidence strings; defaults to a fresh empty list.
    evidence: List[str] = Field(default_factory=list)
    # Free-text explanation of the score; empty string when not supplied.
    reasoning: str = ""
    # Confidence label kept as a plain string in this model; defaults to "medium".
    confidence_level: str = "medium"
class DimensionAnalysisResponse(BaseModel):
    """Response model for dimension analysis.

    Holds one ``DimensionData`` entry per dimension name.
    """
    # Keyed by dimension name (presumably the names listed in
    # DimensionAnalyzer.DIMENSIONS - confirm against the LLM client).
    dimensions: Dict[str, DimensionData] = Field(default_factory=dict)
class DimensionAnalyzer:
"""Analyzes user memory summaries to extract personality dimensions."""
# Define the four dimensions we analyze
DIMENSIONS = ["creativity", "aesthetic", "technology", "literature"]
def __init__(self, db: Session, llm_model_id: Optional[str] = None):
    """Store the session and model ID and build the LLM client wrapper.

    Args:
        db: Database session
        llm_model_id: Optional LLM model ID to use for analysis
    """
    self.db, self.llm_model_id = db, llm_model_id
    # The client wrapper is created from the same session and model ID.
    self._llm_client = ImplicitMemoryLLMClient(db, llm_model_id)
async def analyze_dimensions(
    self,
    user_id: str,
    user_summaries: List[UserMemorySummary],
    existing_portrait: Optional[DimensionPortrait] = None
) -> DimensionPortrait:
    """Analyze user summaries to extract personality dimensions.

    Each name in ``DIMENSIONS`` is looked up in the ``"dimensions"`` mapping
    of the LLM response. Dimensions that are missing (or whose entry is
    empty) get a default low-confidence score instead of failing the run.

    Args:
        user_id: Target user ID
        user_summaries: List of user-specific memory summaries
        existing_portrait: Optional existing portrait for incremental updates

    Returns:
        Dimension portrait with four personality dimensions

    Raises:
        LLMClientException: If LLM analysis fails
    """
    if not user_summaries:
        logger.warning(f"No summaries provided for user {user_id}")
        return self._create_empty_portrait(user_id)
    try:
        logger.info(f"Analyzing dimensions for user {user_id} with {len(user_summaries)} summaries")
        # Use the LLM client wrapper for analysis
        response = await self._llm_client.analyze_dimensions(
            user_summaries=user_summaries,
            user_id=user_id,
            model_id=self.llm_model_id
        )
        current_time = datetime.now()
        # Read the mapping once; `or {}` makes an explicit null behave like a
        # missing key, so every dimension falls back to its default score.
        dimensions_data = response.get("dimensions") or {}
        dimension_scores = {}
        for dimension_name in self.DIMENSIONS:
            dimension_data = dimensions_data.get(dimension_name)
            if dimension_data:
                # Validate and create dimension score
                dimension_scores[dimension_name] = self._create_dimension_score(
                    dimension_name=dimension_name,
                    dimension_data=dimension_data
                )
            else:
                # Create default score if missing
                logger.warning(f"Missing dimension data for {dimension_name}, using default")
                dimension_scores[dimension_name] = self._create_default_dimension_score(dimension_name)
        # The previous portrait, when given, is folded into the trend history.
        if existing_portrait:
            historical_trends = self._calculate_historical_trends(existing_portrait)
        else:
            historical_trends = None
        portrait = DimensionPortrait(
            user_id=user_id,
            creativity=dimension_scores["creativity"],
            aesthetic=dimension_scores["aesthetic"],
            technology=dimension_scores["technology"],
            literature=dimension_scores["literature"],
            analysis_timestamp=current_time,
            total_summaries_analyzed=len(user_summaries),
            historical_trends=historical_trends
        )
        logger.info(f"Created dimension portrait for user {user_id}")
        return portrait
    except LLMClientException:
        # Already the exception type callers expect; pass it through untouched.
        raise
    except Exception as e:
        logger.error(f"Dimension analysis failed for user {user_id}: {e}")
        raise LLMClientException(f"Dimension analysis failed: {e}") from e
def _create_dimension_score(
    self,
    dimension_name: str,
    dimension_data: dict
) -> DimensionScore:
    """Create a dimension score from analysis data.

    The input comes from LLM output, so every field is sanitised: the
    percentage is coerced to a float and clamped to 0-100 (unparseable or
    NaN values become 0.0), and empty evidence/reasoning get placeholders.

    Args:
        dimension_name: Name of the dimension
        dimension_data: Analysis data dictionary for the dimension

    Returns:
        DimensionScore object
    """
    import math

    raw_percentage = dimension_data.get("percentage", 0.0)
    try:
        percentage = float(raw_percentage)
    except (TypeError, ValueError):
        # A null or non-numeric value must not abort the whole portrait.
        logger.warning(
            f"Invalid percentage {raw_percentage!r} for {dimension_name}, using 0.0"
        )
        percentage = 0.0
    if math.isnan(percentage):
        # min()/max() would otherwise turn NaN into 100.0.
        percentage = 0.0
    percentage = max(0.0, min(100.0, percentage))
    # Validate confidence level
    confidence_level = self._validate_confidence_level(
        dimension_data.get("confidence_level", "low")
    )
    # Ensure evidence is a non-empty list
    evidence = dimension_data.get("evidence", [])
    if isinstance(evidence, str):
        # A bare string is treated as a single evidence item.
        evidence = [evidence] if evidence.strip() else []
    if not evidence:
        evidence = ["No specific evidence found"]
    # Ensure reasoning is not empty
    reasoning = dimension_data.get("reasoning", "")
    if not reasoning:
        reasoning = f"Analysis for {dimension_name} dimension"
    return DimensionScore(
        dimension_name=dimension_name,
        percentage=percentage,
        evidence=evidence,
        reasoning=reasoning,
        confidence_level=confidence_level
    )
def _create_default_dimension_score(self, dimension_name: str) -> DimensionScore:
    """Build the placeholder score used when a dimension has no analysis data.

    Args:
        dimension_name: Name of the dimension

    Returns:
        DimensionScore with 0.0 percent, placeholder evidence and LOW confidence
    """
    placeholder_fields = {
        "dimension_name": dimension_name,
        "percentage": 0.0,
        "evidence": ["Insufficient data for analysis"],
        "reasoning": f"No clear evidence found for {dimension_name} dimension",
        "confidence_level": ConfidenceLevel.LOW,
    }
    return DimensionScore(**placeholder_fields)
def _validate_confidence_level(self, confidence_str: str) -> ConfidenceLevel:
    """Map a confidence label to its ConfidenceLevel member.

    Args:
        confidence_str: Confidence level as string

    Returns:
        ConfidenceLevel enum value; MEDIUM for empty or unrecognised input
    """
    if not confidence_str:
        return ConfidenceLevel.MEDIUM
    confidence_str = str(confidence_str).lower().strip()
    # Accepted spellings; the keys are the literals the original chain compared against.
    known_levels = {
        "high": ConfidenceLevel.HIGH,
        "높음": ConfidenceLevel.HIGH,
        "medium": ConfidenceLevel.MEDIUM,
        "중간": ConfidenceLevel.MEDIUM,
        "low": ConfidenceLevel.LOW,
        "낮음": ConfidenceLevel.LOW,
    }
    if confidence_str not in known_levels:
        logger.warning(f"Unknown confidence level: {confidence_str}, defaulting to medium")
        return ConfidenceLevel.MEDIUM
    return known_levels[confidence_str]
def _create_empty_portrait(self, user_id: str) -> DimensionPortrait:
    """Build a portrait of default scores for a user with no summaries.

    Args:
        user_id: Target user ID

    Returns:
        DimensionPortrait with default scores, zero summaries and no trends
    """
    timestamp = datetime.now()
    # One default score per portrait field, created in the portrait's field order.
    default_scores = {
        name: self._create_default_dimension_score(name)
        for name in ("creativity", "aesthetic", "technology", "literature")
    }
    return DimensionPortrait(
        user_id=user_id,
        analysis_timestamp=timestamp,
        total_summaries_analyzed=0,
        historical_trends=None,
        **default_scores
    )
def _calculate_historical_trends(
self,
existing_portrait: DimensionPortrait
) -> List[Dict[str, Any]]:
"""Calculate historical trends from existing portrait.
Args:
existing_portrait: Previous dimension portrait
Returns:
List of historical trend data
"""
if not existing_portrait:
return []
# Create trend entry from existing portrait
trend_entry = {
"timestamp": existing_portrait.analysis_timestamp.isoformat(),
"creativity": existing_portrait.creativity.percentage,
"aesthetic": existing_portrait.aesthetic.percentage,
"technology": existing_portrait.technology.percentage,
"literature": existing_portrait.literature.percentage,
"total_summaries": existing_portrait.total_summaries_analyzed
}
# Combine with existing trends
existing_trends = existing_portrait.historical_trends or []
# Keep only recent trends (last 10 entries)
all_trends = existing_trends + [trend_entry]
return all_trends[-10:]

View File

@@ -0,0 +1,452 @@
"""Habit Analyzer for Implicit Memory System
This module implements LLM-based behavioral habit analysis from user memory summaries.
It identifies recurring behavioral patterns, temporal patterns, and consolidates
similar habits with confidence scoring.
"""
import logging
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from app.core.memory.analytics.implicit_memory.llm_client import ImplicitMemoryLLMClient
from app.core.memory.llm_tools.llm_client import LLMClientException
from app.schemas.implicit_memory_schema import (
BehaviorHabit,
ConfidenceLevel,
FrequencyPattern,
UserMemorySummary,
)
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
logger = logging.getLogger(__name__)
class HabitData(BaseModel):
    """Individual habit analysis data.

    Pydantic model for one habit entry; it is the element type of
    ``HabitAnalysisResponse.habits``. The first four fields are required.
    """
    # Free-text description of the habit.
    habit_description: str
    # Frequency label kept as a plain string in this model.
    frequency_pattern: str
    # Free-text description of when the habit occurs.
    time_context: str
    # Confidence label kept as a plain string in this model.
    confidence_level: str
    # Supporting summary references (presumably summary IDs - confirm);
    # defaults to a fresh empty list.
    supporting_summaries: List[str] = Field(default_factory=list)
    # Concrete examples of the habit; defaults to a fresh empty list.
    specific_examples: List[str] = Field(default_factory=list)
    # Whether the habit is considered current; defaults to True.
    is_current: bool = True
class HabitAnalysisResponse(BaseModel):
    """Response model for habit analysis.

    Wraps the list of ``HabitData`` entries.
    """
    # Habit entries; defaults to a fresh empty list.
    habits: List[HabitData] = Field(default_factory=list)
class HabitAnalyzer:
"""Analyzes user memory summaries to extract behavioral habits."""
def __init__(self, db: Session, llm_model_id: Optional[str] = None):
    """Keep the session and model ID, and create the LLM client wrapper.

    Args:
        db: Database session
        llm_model_id: Optional LLM model ID to use for analysis
    """
    self.db, self.llm_model_id = db, llm_model_id
    # The wrapper is constructed with the same session and model ID.
    self._llm_client = ImplicitMemoryLLMClient(db, llm_model_id)
async def analyze_habits(
    self,
    user_id: str,
    user_summaries: List[UserMemorySummary],
    existing_habits: Optional[List[BehaviorHabit]] = None
) -> List[BehaviorHabit]:
    """Analyze user summaries to extract behavioral habits.

    Each entry under ``"habits"`` in the LLM response is converted to a
    ``BehaviorHabit``. Entries that fail conversion or validation are logged
    and skipped so one bad entry does not discard the rest.

    Args:
        user_id: Target user ID
        user_summaries: List of user-specific memory summaries
        existing_habits: Optional existing habits for consolidation

    Returns:
        List of extracted behavioral habits, consolidated with
        ``existing_habits`` when given and sorted by priority. With no
        summaries, ``existing_habits`` (or an empty list) is returned as-is.

    Raises:
        LLMClientException: If LLM analysis fails
    """
    if not user_summaries:
        logger.warning(f"No summaries provided for user {user_id}")
        return existing_habits or []
    try:
        logger.info(f"Analyzing habits for user {user_id} with {len(user_summaries)} summaries")
        # Use the LLM client wrapper for analysis
        response = await self._llm_client.analyze_habits(
            user_summaries=user_summaries,
            user_id=user_id,
            model_id=self.llm_model_id
        )
        behavior_habits = []
        # `or []` also covers a response whose "habits" value is an explicit null.
        for habit_data in response.get("habits") or []:
            try:
                # Null lists are treated as empty; validation below rejects
                # habits that end up without supporting data.
                supporting_summaries = habit_data.get("supporting_summaries") or []
                specific_examples = habit_data.get("specific_examples") or []
                # Determine observation dates from summaries
                first_observed, last_observed = self._determine_observation_dates(
                    user_summaries, supporting_summaries
                )
                behavior_habit = BehaviorHabit(
                    habit_description=habit_data.get("habit_description", ""),
                    frequency_pattern=self._validate_frequency_pattern(
                        habit_data.get("frequency_pattern", "occasional")
                    ),
                    time_context=habit_data.get("time_context", ""),
                    confidence_level=self._validate_confidence_level(
                        habit_data.get("confidence_level", "medium")
                    ),
                    supporting_summaries=supporting_summaries,
                    specific_examples=specific_examples,
                    first_observed=first_observed,
                    last_observed=last_observed,
                    is_current=habit_data.get("is_current", True)
                )
                if self._is_valid_habit(behavior_habit):
                    behavior_habits.append(behavior_habit)
                else:
                    logger.warning(f"Invalid habit skipped: {behavior_habit.habit_description}")
            except Exception as e:
                # Best effort: drop the malformed entry and keep going.
                logger.error(f"Error creating behavior habit: {e}")
                continue
        # Consolidate with existing habits if provided
        if existing_habits:
            behavior_habits = self._consolidate_habits(
                new_habits=behavior_habits,
                existing_habits=existing_habits
            )
        # Sort habits by confidence and recency
        behavior_habits = self._sort_habits_by_priority(behavior_habits)
        logger.info(f"Extracted {len(behavior_habits)} habits for user {user_id}")
        return behavior_habits
    except LLMClientException:
        # Already the exception type callers expect; pass it through untouched.
        raise
    except Exception as e:
        logger.error(f"Habit analysis failed for user {user_id}: {e}")
        raise LLMClientException(f"Habit analysis failed: {e}") from e
def _validate_frequency_pattern(self, frequency_str: str) -> FrequencyPattern:
    """Validate and convert frequency pattern string.

    Matching ignores case and surrounding whitespace, and treats hyphens
    and spaces as underscores, so "event-triggered" and "Event Triggered"
    both map to EVENT_TRIGGERED.

    Args:
        frequency_str: Frequency pattern as string (may be None or a
            non-string value coming from LLM output)

    Returns:
        FrequencyPattern enum value; OCCASIONAL for empty or unknown input
    """
    if not frequency_str:
        # A null/empty label is treated like an unrecognised one.
        return FrequencyPattern.OCCASIONAL
    normalized = (
        str(frequency_str).lower().strip().replace("-", "_").replace(" ", "_")
    )
    frequency_mapping = {
        "daily": FrequencyPattern.DAILY,
        "weekly": FrequencyPattern.WEEKLY,
        "monthly": FrequencyPattern.MONTHLY,
        "seasonal": FrequencyPattern.SEASONAL,
        "occasional": FrequencyPattern.OCCASIONAL,
        "event_triggered": FrequencyPattern.EVENT_TRIGGERED,
    }
    return frequency_mapping.get(normalized, FrequencyPattern.OCCASIONAL)
def _validate_confidence_level(self, confidence_str: str) -> ConfidenceLevel:
    """Validate and convert confidence level string.

    Args:
        confidence_str: Confidence level as string (may be None or a
            non-string value coming from LLM output)

    Returns:
        ConfidenceLevel enum value; MEDIUM for empty or unknown input
    """
    if not confidence_str:
        # Null/empty labels fall back to MEDIUM instead of raising on .lower().
        return ConfidenceLevel.MEDIUM
    confidence_str = str(confidence_str).lower().strip()
    if confidence_str in ["high", "높음"]:
        return ConfidenceLevel.HIGH
    elif confidence_str in ["medium", "중간"]:
        return ConfidenceLevel.MEDIUM
    elif confidence_str in ["low", "낮음"]:
        return ConfidenceLevel.LOW
    else:
        logger.warning(f"Unknown confidence level: {confidence_str}, defaulting to medium")
        return ConfidenceLevel.MEDIUM
def _determine_observation_dates(
self,
user_summaries: List[UserMemorySummary],
supporting_summary_ids: List[str]
) -> tuple[datetime, datetime]:
"""Determine first and last observation dates for a habit.
Args:
user_summaries: List of user summaries
supporting_summary_ids: IDs of summaries supporting the habit
Returns:
Tuple of (first_observed, last_observed) dates
"""
from datetime import timezone
# Find summaries that support this habit
supporting_summaries = [
summary for summary in user_summaries
if summary.summary_id in supporting_summary_ids
]
if not supporting_summaries:
# Use all summaries if no specific supporting summaries found
supporting_summaries = user_summaries
if not supporting_summaries:
current_time = datetime.now(timezone.utc).replace(tzinfo=None)
return current_time, current_time
# Get date range from supporting summaries - normalize to naive datetimes
timestamps = []
for summary in supporting_summaries:
ts = summary.timestamp
# Convert to naive datetime if it's timezone-aware
if ts.tzinfo is not None:
ts = ts.replace(tzinfo=None)
timestamps.append(ts)
first_observed = min(timestamps)
last_observed = max(timestamps)
return first_observed, last_observed
def _is_valid_habit(self, habit: BehaviorHabit) -> bool:
"""Validate a behavioral habit.
Args:
habit: Behavioral habit to validate
Returns:
True if valid, False otherwise
"""
try:
# Check required fields
if not habit.habit_description or not habit.habit_description.strip():
return False
# Check time context
if not habit.time_context or not habit.time_context.strip():
return False
# Check supporting summaries
if not habit.supporting_summaries or len(habit.supporting_summaries) == 0:
return False
# Check specific examples
if not habit.specific_examples or len(habit.specific_examples) == 0:
return False
# Check observation dates
if habit.first_observed > habit.last_observed:
return False
return True
except Exception as e:
logger.error(f"Error validating habit: {e}")
return False
def _consolidate_habits(
self,
new_habits: List[BehaviorHabit],
existing_habits: List[BehaviorHabit],
similarity_threshold: float = 0.7
) -> List[BehaviorHabit]:
"""Consolidate new habits with existing ones.
Args:
new_habits: Newly extracted habits
existing_habits: Existing habits
similarity_threshold: Threshold for considering habits similar
Returns:
Consolidated list of habits
"""
consolidated = existing_habits.copy()
current_time = datetime.now()
for new_habit in new_habits:
# Find similar existing habit
similar_habit = self._find_similar_habit(
new_habit, existing_habits, similarity_threshold
)
if similar_habit:
# Update existing habit
updated_habit = self._merge_habits(similar_habit, new_habit, current_time)
# Replace in consolidated list
for i, habit in enumerate(consolidated):
if habit.habit_description == similar_habit.habit_description:
consolidated[i] = updated_habit
break
else:
# Add as new habit
consolidated.append(new_habit)
return consolidated
def _find_similar_habit(
self,
target_habit: BehaviorHabit,
existing_habits: List[BehaviorHabit],
threshold: float
) -> Optional[BehaviorHabit]:
"""Find similar habit in existing list.
Args:
target_habit: Habit to find similarity for
existing_habits: List of existing habits
threshold: Similarity threshold
Returns:
Similar habit if found, None otherwise
"""
target_desc = target_habit.habit_description.lower().strip()
for existing_habit in existing_habits:
existing_desc = existing_habit.habit_description.lower().strip()
# Check description similarity
desc_similarity = self._calculate_text_similarity(target_desc, existing_desc)
# Check frequency pattern match
frequency_match = (target_habit.frequency_pattern == existing_habit.frequency_pattern)
# Check time context similarity
time_similarity = self._calculate_text_similarity(
target_habit.time_context.lower(),
existing_habit.time_context.lower()
)
# Combined similarity score
combined_similarity = (desc_similarity * 0.6 + time_similarity * 0.4)
if frequency_match:
combined_similarity += 0.1 # Bonus for frequency match
if combined_similarity >= threshold:
return existing_habit
return None
def _calculate_text_similarity(self, text1: str, text2: str) -> float:
"""Calculate simple text similarity based on common words.
Args:
text1: First text
text2: Second text
Returns:
Similarity score between 0.0 and 1.0
"""
if not text1 or not text2:
return 0.0
# Simple word-based similarity
words1 = set(text1.lower().split())
words2 = set(text2.lower().split())
if not words1 or not words2:
return 0.0
intersection = words1.intersection(words2)
union = words1.union(words2)
return len(intersection) / len(union) if union else 0.0
def _merge_habits(
    self,
    existing_habit: BehaviorHabit,
    new_habit: BehaviorHabit,
    current_time: datetime
) -> BehaviorHabit:
    """Merge two similar habits.

    The description and frequency of the existing habit are kept. Supporting
    summaries and examples are combined without duplicates, in first-seen
    order (existing entries first), so repeated runs give the same ordering.

    Args:
        existing_habit: Existing habit
        new_habit: New habit to merge
        current_time: Current timestamp, used to decide whether the merged
            habit is still current

    Returns:
        Merged behavioral habit
    """
    # dict.fromkeys de-duplicates while preserving insertion order; a set
    # would return the entries in arbitrary order.
    combined_summaries = list(dict.fromkeys(
        existing_habit.supporting_summaries + new_habit.supporting_summaries
    ))
    combined_examples = list(dict.fromkeys(
        existing_habit.specific_examples + new_habit.specific_examples
    ))
    # Take the higher confidence; on a tie the existing habit's level wins.
    confidence_order = ["low", "medium", "high"]
    new_confidence = max(
        [existing_habit.confidence_level, new_habit.confidence_level],
        key=lambda level: confidence_order.index(level.value)
    )
    # Widen the observation window to cover both habits.
    first_observed = min(existing_habit.first_observed, new_habit.first_observed)
    last_observed = max(existing_habit.last_observed, new_habit.last_observed)
    # Determine if habit is current (observed within last 30 days)
    is_current = (current_time - last_observed).days <= 30
    # Combine time context, skipping text the existing context already contains.
    combined_time_context = existing_habit.time_context
    if new_habit.time_context and new_habit.time_context not in combined_time_context:
        if combined_time_context:
            combined_time_context += f"; {new_habit.time_context}"
        else:
            # Avoid a leading "; " when the existing context is empty.
            combined_time_context = new_habit.time_context
    return BehaviorHabit(
        habit_description=existing_habit.habit_description,  # Keep original description
        frequency_pattern=existing_habit.frequency_pattern,  # Keep original frequency
        time_context=combined_time_context,
        confidence_level=new_confidence,
        supporting_summaries=combined_summaries,
        specific_examples=combined_examples,
        first_observed=first_observed,
        last_observed=last_observed,
        is_current=is_current
    )
def _sort_habits_by_priority(self, habits: List[BehaviorHabit]) -> List[BehaviorHabit]:
"""Sort habits by confidence level and recency.
Args:
habits: List of habits to sort
Returns:
Sorted list of habits
"""
def priority_score(habit: BehaviorHabit) -> tuple:
# Confidence level score (high=3, medium=2, low=1)
confidence_score = {"high": 3, "medium": 2, "low": 1}.get(habit.confidence_level.value, 1)
# Recency score (more recent = higher score)
days_since_last = (datetime.now() - habit.last_observed).days
recency_score = max(0, 365 - days_since_last) # Max 365 days
# Current habit bonus
current_bonus = 100 if habit.is_current else 0
return (confidence_score, recency_score + current_bonus, habit.last_observed)
return sorted(habits, key=priority_score, reverse=True)

View File

@@ -0,0 +1,277 @@
"""Interest Analyzer for Implicit Memory System
This module implements LLM-based interest area analysis from user memory summaries.
It categorizes user interests into four areas: tech, lifestyle, music, and art,
providing percentage distribution that totals 100%.
"""
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional
from app.core.memory.analytics.implicit_memory.llm_client import ImplicitMemoryLLMClient
from app.core.memory.llm_tools.llm_client import LLMClientException
from app.schemas.implicit_memory_schema import (
InterestAreaDistribution,
InterestCategory,
UserMemorySummary,
)
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
logger = logging.getLogger(__name__)
class InterestData(BaseModel):
    """Individual interest category analysis data.

    Pydantic model for one interest category. Construction raises a
    validation error when ``percentage`` is outside 0.0-100.0.
    """
    # Share of interest for the category; validation restricts it to 0.0-100.0.
    percentage: float = Field(ge=0.0, le=100.0)
    # Supporting evidence strings; defaults to a fresh empty list.
    evidence: List[str] = Field(default_factory=list)
    # Optional trend label; None when not supplied.
    trending_direction: Optional[str] = None
class InterestAnalysisResponse(BaseModel):
    """Response model for interest analysis.

    Holds one ``InterestData`` entry per category name.
    """
    # Keyed by category name (presumably the names listed in
    # InterestAnalyzer.INTEREST_CATEGORIES - confirm against the LLM client).
    interest_distribution: Dict[str, InterestData] = Field(default_factory=dict)
class InterestAnalyzer:
"""Analyzes user memory summaries to extract interest area distribution."""
# Define the four interest categories we analyze
INTEREST_CATEGORIES = ["tech", "lifestyle", "music", "art"]
def __init__(self, db: Session, llm_model_id: Optional[str] = None):
    """Record the session and model ID, then build the LLM client wrapper.

    Args:
        db: Database session
        llm_model_id: Optional LLM model ID to use for analysis
    """
    self.db, self.llm_model_id = db, llm_model_id
    # Same session and model ID are handed to the client wrapper.
    self._llm_client = ImplicitMemoryLLMClient(db, llm_model_id)
async def analyze_interests(
    self,
    user_id: str,
    user_summaries: List[UserMemorySummary],
    existing_distribution: Optional[InterestAreaDistribution] = None
) -> InterestAreaDistribution:
    """Analyze user summaries to extract interest area distribution.

    Percentages reported by the LLM are coerced to floats and clamped to
    0-100 before use (unparseable or NaN values count as 0.0), so a single
    malformed value does not abort the analysis. The cleaned values are then
    normalized across the four categories.

    Args:
        user_id: Target user ID
        user_summaries: List of user-specific memory summaries
        existing_distribution: Optional existing distribution for trend tracking

    Returns:
        Interest area distribution across four categories

    Raises:
        LLMClientException: If LLM analysis fails
    """
    if not user_summaries:
        logger.warning(f"No summaries provided for user {user_id}")
        return self._create_empty_distribution(user_id)
    try:
        logger.info(f"Analyzing interests for user {user_id} with {len(user_summaries)} summaries")
        # Use the LLM client wrapper for analysis
        response = await self._llm_client.analyze_interests(
            user_summaries=user_summaries,
            user_id=user_id,
            model_id=self.llm_model_id
        )
        current_time = datetime.now()
        # `or {}` makes an explicit null behave like a missing key.
        interest_distribution = response.get("interest_distribution") or {}
        # Extract and validate interest data
        raw_interests = {}
        for category_name in self.INTEREST_CATEGORIES:
            interest_data_dict = interest_distribution.get(category_name)
            if interest_data_dict:
                raw_percentage = interest_data_dict.get("percentage", 0.0)
                try:
                    percentage = float(raw_percentage)
                except (TypeError, ValueError):
                    logger.warning(
                        f"Invalid percentage {raw_percentage!r} for {category_name}, using 0.0"
                    )
                    percentage = 0.0
                if percentage != percentage:
                    # NaN is the only value unequal to itself; count it as no signal.
                    percentage = 0.0
                raw_interests[category_name] = InterestData(
                    # Clamp so the model's 0-100 validation cannot reject LLM output.
                    percentage=max(0.0, min(100.0, percentage)),
                    evidence=interest_data_dict.get("evidence") or [],
                    trending_direction=interest_data_dict.get("trending_direction")
                )
            else:
                # Create default if missing
                logger.warning(f"Missing interest data for {category_name}, using default")
                raw_interests[category_name] = InterestData(
                    percentage=0.0,
                    evidence=["No specific evidence found"],
                    trending_direction=None
                )
        # Normalize percentages to ensure they sum to 100%
        normalized_interests = self._normalize_percentages(raw_interests)
        # Create interest category objects
        interest_categories = {}
        for category_name in self.INTEREST_CATEGORIES:
            interest_data = normalized_interests[category_name]
            if existing_distribution:
                # With a previous distribution the trend is computed from the
                # change; otherwise the LLM-reported trend is kept.
                trending_direction = self._calculate_trending_direction(
                    category_name=category_name,
                    current_percentage=interest_data.percentage,
                    existing_distribution=existing_distribution
                )
            else:
                trending_direction = interest_data.trending_direction
            interest_categories[category_name] = InterestCategory(
                category_name=category_name,
                percentage=interest_data.percentage,
                evidence=interest_data.evidence if interest_data.evidence else ["No specific evidence found"],
                trending_direction=trending_direction
            )
        # Create interest area distribution
        distribution = InterestAreaDistribution(
            user_id=user_id,
            tech=interest_categories["tech"],
            lifestyle=interest_categories["lifestyle"],
            music=interest_categories["music"],
            art=interest_categories["art"],
            analysis_timestamp=current_time,
            total_summaries_analyzed=len(user_summaries)
        )
        # Validate that percentages sum to 100%
        total_percentage = distribution.total_percentage
        if not (99.9 <= total_percentage <= 100.1):
            logger.warning(f"Interest percentages sum to {total_percentage}, expected ~100%")
        logger.info(f"Created interest distribution for user {user_id}")
        return distribution
    except LLMClientException:
        # Already the exception type callers expect; pass it through untouched.
        raise
    except Exception as e:
        logger.error(f"Interest analysis failed for user {user_id}: {e}")
        raise LLMClientException(f"Interest analysis failed: {e}") from e
def _normalize_percentages(self, raw_interests: Dict[str, InterestData]) -> Dict[str, InterestData]:
    """Normalize percentages to ensure they sum to 100%.

    Values are scaled proportionally and rounded to one decimal place. The
    largest category then absorbs whatever rounding left over, so the total
    is 100 rather than merely close to it. When every input is 0 the share
    is split equally between the entries that were passed in.

    Args:
        raw_interests: Raw interest data with potentially unnormalized percentages

    Returns:
        New mapping with normalized interest data; evidence and trending
        direction are carried over unchanged
    """
    if not raw_interests:
        return {}
    total = sum(interest.percentage for interest in raw_interests.values())
    if total == 0:
        # No signal at all: distribute equally over the entries provided.
        equal_percentage = 100.0 / len(raw_interests)
        return {
            category_name: InterestData(
                percentage=equal_percentage,
                evidence=interest_data.evidence,
                trending_direction=interest_data.trending_direction
            )
            for category_name, interest_data in raw_interests.items()
        }
    normalization_factor = 100.0 / total
    normalized = {
        category_name: InterestData(
            percentage=round(interest_data.percentage * normalization_factor, 1),
            evidence=interest_data.evidence,
            trending_direction=interest_data.trending_direction
        )
        for category_name, interest_data in raw_interests.items()
    }
    # Give the rounding remainder to the largest category. Deriving its value
    # from the others avoids a float-sensitive "is the drift big enough" test.
    largest_category = max(normalized, key=lambda name: normalized[name].percentage)
    others_total = sum(
        interest.percentage
        for name, interest in normalized.items()
        if name != largest_category
    )
    remainder = min(100.0, max(0.0, round(100.0 - others_total, 1)))
    if remainder != normalized[largest_category].percentage:
        normalized[largest_category] = InterestData(
            percentage=remainder,
            evidence=normalized[largest_category].evidence,
            trending_direction=normalized[largest_category].trending_direction
        )
    return normalized
def _calculate_trending_direction(
self,
category_name: str,
current_percentage: float,
existing_distribution: InterestAreaDistribution,
threshold: float = 5.0
) -> Optional[str]:
"""Calculate trending direction for an interest category.
Args:
category_name: Name of the interest category
current_percentage: Current percentage for the category
existing_distribution: Previous distribution for comparison
threshold: Minimum percentage change to consider a trend
Returns:
Trending direction: "increasing", "decreasing", "stable", or None
"""
try:
# Get previous percentage
previous_category = getattr(existing_distribution, category_name, None)
if not previous_category:
return None
previous_percentage = previous_category.percentage
change = current_percentage - previous_percentage
if abs(change) < threshold:
return "stable"
elif change > 0:
return "increasing"
else:
return "decreasing"
except Exception as e:
logger.error(f"Error calculating trending direction for {category_name}: {e}")
return None
def _create_empty_distribution(self, user_id: str) -> InterestAreaDistribution:
    """Build an even 25/25/25/25 distribution for a user with no summaries.

    Args:
        user_id: Target user ID

    Returns:
        InterestAreaDistribution with four equal placeholder categories and
        zero summaries analyzed
    """
    timestamp = datetime.now()
    # 100% / 4 categories
    equal_share = 25.0
    placeholders = {
        name: InterestCategory(
            category_name=name,
            percentage=equal_share,
            evidence=["Insufficient data for analysis"],
            trending_direction=None
        )
        for name in ("tech", "lifestyle", "music", "art")
    }
    return InterestAreaDistribution(
        user_id=user_id,
        analysis_timestamp=timestamp,
        total_summaries_analyzed=0,
        **placeholders
    )

View File

@@ -0,0 +1,302 @@
"""Preference Analyzer for Implicit Memory System
This module implements LLM-based preference extraction from user memory summaries.
It identifies implicit preferences, consolidates similar preferences, and calculates
confidence scores based on evidence strength.
"""
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional
from app.core.memory.analytics.implicit_memory.llm_client import ImplicitMemoryLLMClient
from app.core.memory.llm_tools.llm_client import LLMClientException
from app.schemas.implicit_memory_schema import (
PreferenceTag,
UserMemorySummary,
)
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
logger = logging.getLogger(__name__)
class PreferenceAnalysisResponse(BaseModel):
    """Response model for preference analysis.

    Preference entries are kept as free-form dictionaries; this model does
    not validate their keys.
    """
    # Preference entries; defaults to a fresh empty list.
    preferences: List[Dict[str, Any]] = Field(default_factory=list)
class PreferenceAnalyzer:
"""Analyzes user memory summaries to extract implicit preferences."""
def __init__(self, db: Session, llm_model_id: Optional[str] = None):
    """Hold on to the session and model ID and create the LLM client wrapper.

    Args:
        db: Database session
        llm_model_id: Optional LLM model ID to use for analysis
    """
    self.db, self.llm_model_id = db, llm_model_id
    # The client wrapper is built from the same session and model ID.
    self._llm_client = ImplicitMemoryLLMClient(db, llm_model_id)
async def analyze_preferences(
    self,
    user_id: str,
    user_summaries: List[UserMemorySummary],
    existing_preferences: Optional[List[PreferenceTag]] = None
) -> List[PreferenceTag]:
    """Analyze user summaries to extract preferences.

    Each entry under ``"preferences"`` in the LLM response is converted to a
    ``PreferenceTag``. Entries that fail conversion or validation are logged
    and skipped so one bad entry does not discard the rest.

    Args:
        user_id: Target user ID
        user_summaries: List of user-specific memory summaries
        existing_preferences: Optional existing preferences for consolidation

    Returns:
        List of extracted preference tags, consolidated with
        ``existing_preferences`` when given. With no summaries,
        ``existing_preferences`` (or an empty list) is returned unchanged,
        so known preferences are not dropped.

    Raises:
        LLMClientException: If LLM analysis fails
    """
    if not user_summaries:
        logger.warning(f"No summaries provided for user {user_id}")
        # Nothing new to analyse: keep what is already known.
        return existing_preferences or []
    try:
        logger.info(f"Analyzing preferences for user {user_id} with {len(user_summaries)} summaries")
        # Use the LLM client wrapper for analysis
        response = await self._llm_client.analyze_preferences(
            user_summaries=user_summaries,
            user_id=user_id,
            model_id=self.llm_model_id
        )
        preference_tags = []
        current_time = datetime.now()
        # Every tag references all analysed summaries, so the ID list is
        # built once instead of once per preference.
        conversation_refs = [s.summary_id for s in user_summaries]
        # `or []` also covers a response whose "preferences" value is an explicit null.
        for pref_data in response.get("preferences") or []:
            try:
                preference_tag = PreferenceTag(
                    tag_name=pref_data.get("tag_name", ""),
                    confidence_score=float(pref_data.get("confidence_score", 0.0)),
                    supporting_evidence=pref_data.get("supporting_evidence", []),
                    context_details=pref_data.get("context_details", ""),
                    category=pref_data.get("category"),
                    # Copy so tags never share one list object.
                    conversation_references=list(conversation_refs),
                    created_at=current_time,
                    updated_at=current_time
                )
                if self._is_valid_preference(preference_tag):
                    preference_tags.append(preference_tag)
                else:
                    logger.warning(f"Invalid preference tag skipped: {preference_tag.tag_name}")
            except Exception as e:
                # Best effort: drop the malformed entry and keep going.
                logger.error(f"Error creating preference tag: {e}")
                continue
        # Consolidate with existing preferences if provided
        if existing_preferences:
            preference_tags = self._consolidate_preferences(
                new_preferences=preference_tags,
                existing_preferences=existing_preferences
            )
        logger.info(f"Extracted {len(preference_tags)} preferences for user {user_id}")
        return preference_tags
    except LLMClientException:
        # Already the exception type callers expect; pass it through untouched.
        raise
    except Exception as e:
        logger.error(f"Preference analysis failed for user {user_id}: {e}")
        raise LLMClientException(f"Preference analysis failed: {e}") from e
def _is_valid_preference(self, preference: PreferenceTag) -> bool:
    """Check that a preference tag carries every field needed downstream.

    A tag is accepted only when it has a non-blank name, a confidence score
    within [0.0, 1.0], at least one piece of supporting evidence, and
    non-blank context details.

    Args:
        preference: Preference tag to validate

    Returns:
        True if valid, False otherwise (also False when inspecting the tag raises)
    """
    try:
        # Evaluated left to right with short-circuiting:
        # name -> score range -> evidence -> context details.
        passes_all_checks = (
            preference.tag_name
            and preference.tag_name.strip()
            and 0.0 <= preference.confidence_score <= 1.0
            and preference.supporting_evidence
            and preference.context_details
            and preference.context_details.strip()
        )
        return bool(passes_all_checks)
    except Exception as exc:
        logger.error(f"Error validating preference: {exc}")
        return False
def _consolidate_preferences(
    self,
    new_preferences: List[PreferenceTag],
    existing_preferences: List[PreferenceTag],
    similarity_threshold: float = 0.8
) -> List[PreferenceTag]:
    """Consolidate new preferences with existing ones.

    Each new preference is compared against the *working* consolidated list
    (existing preferences plus everything merged or appended so far). This
    means several new preferences that resemble the same tag accumulate into
    one entry instead of overwriting each other, and new preferences that
    resemble one another are not appended as duplicates.

    Args:
        new_preferences: Newly extracted preferences
        existing_preferences: Existing preferences (not mutated)
        similarity_threshold: Threshold for considering preferences similar

    Returns:
        Consolidated list of preferences
    """
    consolidated = list(existing_preferences)
    current_time = datetime.now()

    for new_pref in new_preferences:
        # Look in the working list so an entry updated earlier in this loop
        # is the one that gets merged again, not its stale original.
        match = self._find_similar_preference(
            new_pref, consolidated, similarity_threshold
        )
        if match is None:
            # Nothing similar yet: keep it as a new preference.
            consolidated.append(new_pref)
            continue

        merged = self._merge_preferences(match, new_pref, current_time)
        # Replace by identity rather than by tag name, so that two distinct
        # entries sharing a name can never be confused with each other.
        for index, candidate in enumerate(consolidated):
            if candidate is match:
                consolidated[index] = merged
                break

    return consolidated
def _find_similar_preference(
    self,
    target_preference: PreferenceTag,
    existing_preferences: List[PreferenceTag],
    threshold: float
) -> Optional[PreferenceTag]:
    """Return the first existing preference whose tag name resembles the target.

    Tag names are lower-cased and stripped before being compared with
    ``_calculate_text_similarity``.

    Args:
        target_preference: Preference to find similarity for
        existing_preferences: List of existing preferences, scanned in order
        threshold: Minimum similarity (inclusive) for a match

    Returns:
        Similar preference if found, None otherwise
    """
    wanted_name = target_preference.tag_name.lower().strip()
    matches = (
        candidate
        for candidate in existing_preferences
        if self._calculate_text_similarity(
            wanted_name, candidate.tag_name.lower().strip()
        ) >= threshold
    )
    # The generator is lazy, so scanning stops at the first hit.
    return next(matches, None)
def _calculate_text_similarity(self, text1: str, text2: str) -> float:
    """Score two texts by the overlap of their whitespace-separated words.

    The score is the size of the shared word set divided by the size of the
    combined word set, compared case-insensitively.

    Args:
        text1: First text
        text2: Second text

    Returns:
        Similarity score between 0.0 and 1.0 (0.0 when either text has no words)
    """
    if not (text1 and text2):
        return 0.0

    tokens_a = set(text1.lower().split())
    tokens_b = set(text2.lower().split())
    # Whitespace-only input yields an empty token set.
    if not (tokens_a and tokens_b):
        return 0.0

    shared = tokens_a & tokens_b
    combined = tokens_a | tokens_b
    return len(shared) / len(combined)
def _merge_preferences(
    self,
    existing_pref: PreferenceTag,
    new_pref: PreferenceTag,
    current_time: datetime
) -> PreferenceTag:
    """Merge two similar preferences into a single tag.

    Evidence and conversation references are unioned with duplicates removed
    while keeping first-seen order (existing entries first), so the result is
    deterministic across runs. The confidence score is the average of both
    scores weighted by how many evidence items each side supplies.

    Args:
        existing_pref: Existing preference; its name and created_at are kept
        new_pref: New preference to merge
        current_time: Current timestamp, stored as updated_at

    Returns:
        Merged preference tag
    """
    # dict.fromkeys de-duplicates like set() but preserves insertion order.
    combined_evidence = list(dict.fromkeys(
        existing_pref.supporting_evidence + new_pref.supporting_evidence
    ))
    combined_refs = list(dict.fromkeys(
        existing_pref.conversation_references + new_pref.conversation_references
    ))

    # Weighted average: each side counts in proportion to its evidence.
    existing_weight = len(existing_pref.supporting_evidence)
    new_weight = len(new_pref.supporting_evidence)
    total_weight = existing_weight + new_weight
    if total_weight > 0:
        new_confidence = (
            existing_pref.confidence_score * existing_weight
            + new_pref.confidence_score * new_weight
        ) / total_weight
    else:
        # Neither side has evidence, so there is nothing to weight by.
        new_confidence = max(existing_pref.confidence_score, new_pref.confidence_score)

    # Ensure confidence doesn't exceed 1.0
    new_confidence = min(new_confidence, 1.0)

    # Append the new context only if it is not already contained in the old one.
    combined_context = existing_pref.context_details
    if new_pref.context_details and new_pref.context_details not in combined_context:
        combined_context += f"; {new_pref.context_details}"

    return PreferenceTag(
        tag_name=existing_pref.tag_name,  # Keep original name
        confidence_score=new_confidence,
        supporting_evidence=combined_evidence,
        context_details=combined_context,
        category=existing_pref.category or new_pref.category,
        conversation_references=combined_refs,
        created_at=existing_pref.created_at,
        updated_at=current_time
    )

View File

@@ -0,0 +1,97 @@
"""
Memory Data Source
Handles retrieval and processing of memory data from Neo4j using direct Cypher queries.
"""
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional
from app.repositories.neo4j.memory_summary_repository import MemorySummaryRepository
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.schemas.implicit_memory_schema import TimeRange, UserMemorySummary
from sqlalchemy.orm import Session
logger = logging.getLogger(__name__)
class MemoryDataSource:
"""Retrieves processed memory data from Neo4j using direct Cypher queries."""
def __init__(
    self,
    db: Session,
    neo4j_connector: Optional[Neo4jConnector] = None
):
    """Wire up the database session and the Neo4j-backed summary repository.

    Args:
        db: Database session
        neo4j_connector: Optional Neo4j connector; a new Neo4jConnector is
            created when none (or a falsy value) is supplied
    """
    connector = neo4j_connector if neo4j_connector else Neo4jConnector()
    self.db = db
    self.neo4j_connector = connector
    # The repository shares the connector stored on this instance.
    self.memory_summary_repo = MemorySummaryRepository(connector)
def _parse_timestamp(self, timestamp: Any) -> datetime:
    """Parse timestamp from various formats.

    Args:
        timestamp: One of
            - None: the current local time is used
            - datetime: returned unchanged
            - str: ISO-8601 text; a trailing "Z" is treated as UTC
            - an object exposing ``to_native()`` (e.g. a Neo4j driver
              temporal value — NOTE(review): confirm what the repository
              actually returns for ``created_at``)

    Returns:
        The parsed datetime. Values of any other type are returned unchanged,
        as before, so existing callers keep working.

    Raises:
        ValueError: If a string is not valid ISO-8601
    """
    if timestamp is None:
        return datetime.now()
    if isinstance(timestamp, datetime):
        return timestamp
    if isinstance(timestamp, str):
        # "Z" is rewritten to an explicit offset so that fromisoformat also
        # accepts it on interpreters that do not understand the suffix.
        return datetime.fromisoformat(timestamp.strip().replace('Z', '+00:00'))
    # Driver-specific temporal types advertise a conversion to the stdlib type.
    to_native = getattr(timestamp, "to_native", None)
    if callable(to_native):
        return to_native()
    return timestamp
def _dict_to_user_summary(self, summary_dict: Dict, user_id: str) -> Optional[UserMemorySummary]:
    """Convert a Neo4j dict directly to UserMemorySummary.

    The text is read from "content", falling back to "summary"; the
    identifier is read from "id", falling back to "uuid". The fallbacks also
    apply when the primary key is present but holds None or an empty value,
    which a plain ``dict.get(key, default)`` would not cover.

    Args:
        summary_dict: Raw record describing one memory summary
        user_id: User the summary is attributed to

    Returns:
        The converted summary, or None when the record has no usable text or
        cannot be converted (a warning is logged in the latter case)
    """
    try:
        content = summary_dict.get("content") or summary_dict.get("summary") or ""
        if not content.strip():
            return None

        summary_id = summary_dict.get("id")
        if summary_id is None:
            summary_id = summary_dict.get("uuid", "")

        return UserMemorySummary(
            summary_id=summary_id,
            user_id=user_id,
            user_content=content,
            timestamp=self._parse_timestamp(summary_dict.get("created_at")),
            # Stored summaries carry no score of their own here; a fixed
            # 1.0 and a fixed type label are assigned.
            confidence_score=1.0,
            summary_type="memory_summary"
        )
    except Exception as e:
        logger.warning(f"Failed to parse summary {summary_dict.get('id', 'unknown')}: {e}")
        return None
async def get_user_summaries(
    self,
    user_id: str,
    time_range: Optional[TimeRange] = None,
    limit: int = 1000
) -> List[UserMemorySummary]:
    """Fetch a user's memory summaries and convert them to schema objects.

    Records that cannot be converted (or that have no text) are dropped.

    Args:
        user_id: Target user ID; passed to the repository as the group ID
        time_range: Optional time range filter
        limit: Maximum number of summaries

    Returns:
        List of user memory summaries

    Raises:
        Exception: Any repository error is logged and re-raised unchanged
    """
    try:
        if time_range:
            start_date, end_date = time_range.start_date, time_range.end_date
        else:
            start_date = end_date = None

        raw_records = await self.memory_summary_repo.find_by_group_id(
            group_id=user_id,
            limit=limit,
            start_date=start_date,
            end_date=end_date
        )

        converted = (
            self._dict_to_user_summary(record, user_id) for record in raw_records
        )
        summaries = [summary for summary in converted if summary]

        logger.info(f"Retrieved {len(summaries)} summaries for user {user_id}")
        return summaries
    except Exception as exc:
        logger.error(f"Failed to retrieve summaries for user {user_id}: {exc}")
        raise

View File

@@ -0,0 +1,234 @@
"""Habit Detector for Implicit Memory System
This module implements the HabitDetector class that specializes in identifying
and ranking behavioral habits from user memory summaries. It provides advanced
habit analysis with confidence scoring, recency weighting, and current vs past
habit distinction.
"""
import logging
from datetime import datetime, timedelta
from typing import List, Optional
from app.core.memory.analytics.implicit_memory.analyzers.habit_analyzer import (
HabitAnalyzer,
)
from app.core.memory.llm_tools.llm_client import LLMClientException
from app.schemas.implicit_memory_schema import (
BehaviorHabit,
ConfidenceLevel,
FrequencyPattern,
UserMemorySummary,
)
from sqlalchemy.orm import Session
logger = logging.getLogger(__name__)
class HabitDetector:
"""Detects and ranks behavioral habits from user memory summaries."""
def __init__(
    self,
    db: Session,
    llm_model_id: Optional[str] = None
):
    """Set up the detector and the analyzer it delegates extraction to.

    Args:
        db: Database session
        llm_model_id: Optional LLM model ID to use for analysis
    """
    # The analyzer is built from the same session and model ID.
    self.habit_analyzer = HabitAnalyzer(db, llm_model_id)
    self.llm_model_id = llm_model_id
    self.db = db
async def detect_habits(
    self,
    user_id: str,
    user_summaries: List[UserMemorySummary],
    existing_habits: Optional[List[BehaviorHabit]] = None
) -> List[BehaviorHabit]:
    """Extract, rank and categorize behavioral habits for one user.

    Extraction is delegated to the habit analyzer; the result is ranked by
    confidence and recency and then split into current and past habits.

    Args:
        user_id: Target user ID
        user_summaries: List of user-specific memory summaries
        existing_habits: Optional existing habits for consolidation

    Returns:
        List of detected and ranked behavioral habits. With no summaries,
        the existing habits (or an empty list) are returned as-is.

    Raises:
        LLMClientException: If habit analysis fails
    """
    if not user_summaries:
        logger.warning(f"No summaries provided for user {user_id}")
        return existing_habits or []

    logger.info(f"Detecting habits for user {user_id} with {len(user_summaries)} summaries")

    try:
        extracted = await self.habit_analyzer.analyze_habits(
            user_id=user_id,
            user_summaries=user_summaries,
            existing_habits=existing_habits
        )
        # Rank first; the current/past split then keeps that order within
        # each group.
        categorized_habits = self.distinguish_current_vs_past_habits(
            self.rank_habits_by_confidence_and_recency(extracted)
        )
        logger.info(f"Detected {len(categorized_habits)} habits for user {user_id}")
        return categorized_habits
    except LLMClientException:
        logger.error(f"Habit detection failed for user {user_id}")
        raise
    except Exception as exc:
        logger.error(f"Habit detection failed for user {user_id}: {exc}")
        raise LLMClientException(f"Habit detection failed: {exc}") from exc
def rank_habits_by_confidence_and_recency(
    self,
    habits: List[BehaviorHabit],
    confidence_weight: float = 0.6,
    recency_weight: float = 0.4
) -> List[BehaviorHabit]:
    """Rank habits by confidence level and recency.

    The score of a habit is
    ``confidence * confidence_weight + recency * recency_weight`` plus small
    bonuses for frequency pattern, amount of supporting evidence and being
    marked current, capped at 1.0. Habits with equal scores keep their
    input order.

    Args:
        habits: List of habits to rank
        confidence_weight: Weight for confidence score (0.0-1.0)
        recency_weight: Weight for recency score (0.0-1.0)

    Returns:
        List of habits ranked by combined score (highest first)
    """
    if not habits:
        return []

    logger.info(f"Ranking {len(habits)} habits by confidence and recency")

    # Lookup tables are built once instead of once per habit.
    confidence_scores = {
        ConfidenceLevel.HIGH: 1.0,
        ConfidenceLevel.MEDIUM: 0.6,
        ConfidenceLevel.LOW: 0.3
    }
    frequency_bonuses = {
        FrequencyPattern.DAILY: 0.2,
        FrequencyPattern.WEEKLY: 0.15,
        FrequencyPattern.MONTHLY: 0.1,
        FrequencyPattern.SEASONAL: 0.05,
        FrequencyPattern.OCCASIONAL: 0.0,
        FrequencyPattern.EVENT_TRIGGERED: 0.05
    }
    # Stepwise recency decay: (maximum age in days, score).
    # Anything older than the last step scores 0.1.
    recency_steps = ((7, 1.0), (30, 0.8), (90, 0.5), (180, 0.3))

    def calculate_ranking_score(habit: BehaviorHabit) -> float:
        """Calculate combined ranking score for a habit."""
        confidence_score = confidence_scores.get(habit.confidence_level, 0.3)

        # Take "now" in the same timezone awareness as last_observed;
        # subtracting an aware datetime from a naive one raises TypeError.
        current_time = datetime.now(habit.last_observed.tzinfo)
        days_since_last = (current_time - habit.last_observed).days
        recency_score = next(
            (score for max_days, score in recency_steps if days_since_last <= max_days),
            0.1
        )

        frequency_bonus = frequency_bonuses.get(habit.frequency_pattern, 0.0)
        # Evidence quality bonus, max 0.1
        evidence_bonus = min(len(habit.supporting_summaries) / 10.0, 0.1)
        current_bonus = 0.1 if habit.is_current else 0.0

        base_score = (confidence_score * confidence_weight +
                      recency_score * recency_weight)
        final_score = base_score + frequency_bonus + evidence_bonus + current_bonus
        return min(final_score, 1.0)  # Cap at 1.0

    # Score every habit exactly once; the stable sort keeps input order on ties.
    scored = [(calculate_ranking_score(habit), habit) for habit in habits]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    ranked_habits = [habit for _, habit in scored]

    logger.info(f"Ranked habits with scores: {[score for score, _ in scored[:5]]}")
    return ranked_habits
def distinguish_current_vs_past_habits(
    self,
    habits: List[BehaviorHabit],
    current_threshold_days: int = 30
) -> List[BehaviorHabit]:
    """Distinguish between current and past habits based on recency.

    A habit is current when it was last observed within
    ``current_threshold_days`` of now. Input habits are not modified; a new
    BehaviorHabit with the recomputed ``is_current`` flag is created for each.

    Args:
        habits: List of habits to categorize
        current_threshold_days: Days threshold for considering a habit current

    Returns:
        List of habits with updated is_current status: current habits first,
        then past habits, each group in its input order
    """
    if not habits:
        return []

    threshold = timedelta(days=current_threshold_days)
    current_habits = []
    past_habits = []

    for habit in habits:
        # Build the cutoff with the same timezone awareness as last_observed;
        # comparing naive and aware datetimes raises TypeError.
        cutoff_date = datetime.now(habit.last_observed.tzinfo) - threshold
        is_current = habit.last_observed >= cutoff_date

        updated_habit = BehaviorHabit(
            habit_description=habit.habit_description,
            frequency_pattern=habit.frequency_pattern,
            time_context=habit.time_context,
            confidence_level=habit.confidence_level,
            supporting_summaries=habit.supporting_summaries,
            specific_examples=habit.specific_examples,
            first_observed=habit.first_observed,
            last_observed=habit.last_observed,
            is_current=is_current
        )
        (current_habits if is_current else past_habits).append(updated_habit)

    # Return current habits first, then past habits
    categorized_habits = current_habits + past_habits

    logger.info(f"Categorized habits: {len(current_habits)} current, {len(past_habits)} past")
    return categorized_habits

View File

@@ -0,0 +1,321 @@
"""LLM Client Wrapper for Implicit Memory Analysis
This module provides a specialized LLM client wrapper that integrates with the
MemoryClientFactory to perform implicit memory analysis tasks including preference
extraction, personality dimension analysis, interest categorization, and habit detection.
"""
import logging
from typing import Any, Dict, List, Optional
from app.core.memory.analytics.implicit_memory.prompts import (
get_dimension_analysis_prompt,
get_habit_analysis_prompt,
get_interest_analysis_prompt,
get_preference_analysis_prompt,
)
from app.core.memory.llm_tools.llm_client import LLMClientException
from app.core.memory.utils.llm.llm_utils import MemoryClientFactory
from app.schemas.implicit_memory_schema import UserMemorySummary
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
logger = logging.getLogger(__name__)
# Response Models for LLM Analysis
class PreferenceAnalysisResponse(BaseModel):
    """Response model for preference analysis.

    Passed as the structured-output schema for the preference analysis LLM
    call; the caller receives its ``model_dump()`` as a plain dict.
    """
    # One dict per extracted preference. The preference prompt asks for the
    # keys tag_name, confidence_score, supporting_evidence, context_details
    # and category; the entries are not validated further by this model.
    preferences: List[Dict[str, Any]] = Field(default_factory=list)
class DimensionAnalysisResponse(BaseModel):
    """Response model for dimension analysis.

    Passed as the structured-output schema for the dimension analysis LLM
    call; the caller receives its ``model_dump()`` as a plain dict.
    """
    # Maps a dimension name to a free-form dict of its attributes (the
    # dimension prompt asks for a "percentage" per dimension). Defaults to
    # an empty mapping.
    dimensions: Dict[str, Dict[str, Any]] = Field(default_factory=dict)
class InterestAnalysisResponse(BaseModel):
    """Response model for interest analysis.

    Passed as the structured-output schema for the interest analysis LLM
    call; the caller receives its ``model_dump()`` as a plain dict.
    """
    # Maps an interest area name to a free-form dict of its attributes (the
    # interest prompt asks for percentage, evidence and trending_direction).
    # Defaults to an empty mapping.
    interest_distribution: Dict[str, Dict[str, Any]] = Field(default_factory=dict)
class HabitAnalysisResponse(BaseModel):
    """Response model for habit analysis.

    Passed as the structured-output schema for the habit analysis LLM call;
    the caller receives its ``model_dump()`` as a plain dict.
    """
    # One dict per identified habit; the entries are not validated further
    # by this model. Defaults to an empty list.
    habits: List[Dict[str, Any]] = Field(default_factory=list)
class ImplicitMemoryLLMClient:
"""LLM client wrapper for implicit memory analysis.
This class provides a high-level interface for performing LLM-based analysis
of user memory summaries to extract preferences, personality dimensions,
interests, and behavioral habits.
"""
def __init__(self, db: Session, default_model_id: Optional[str] = None):
    """Create the wrapper and the factory used to obtain LLM clients.

    Args:
        db: Database session for accessing model configurations
        default_model_id: Default LLM model ID to use if none specified
    """
    # Clients are created on demand, per call, through this factory.
    self._client_factory = MemoryClientFactory(db)
    self.default_model_id = default_model_id
    self.db = db

    logger.info("ImplicitMemoryLLMClient initialized")
def _get_llm_client(self, model_id: Optional[str] = None):
    """Obtain an LLM client for the requested or the default model.

    Args:
        model_id: LLM model ID to use, defaults to default_model_id

    Returns:
        LLM client instance

    Raises:
        ValueError: If no model ID is provided and no default is set
        LLMClientException: If client creation fails
    """
    chosen_model_id = model_id or self.default_model_id
    if not chosen_model_id:
        raise ValueError("No LLM model ID provided and no default model ID set")

    try:
        llm = self._client_factory.get_llm_client(chosen_model_id)
        logger.debug(f"Created LLM client for model: {chosen_model_id}")
    except Exception as exc:
        # Any factory failure is reported under one exception type.
        logger.error(f"Failed to create LLM client for model {chosen_model_id}: {exc}")
        raise LLMClientException(f"Failed to create LLM client: {exc}") from exc
    return llm
def _prepare_summaries_for_analysis(self, user_summaries: List[UserMemorySummary]) -> List[Dict[str, Any]]:
    """Flatten memory summaries into plain dicts for prompt rendering.

    Args:
        user_summaries: List of user memory summaries

    Returns:
        One dict per summary, in input order, with the keys summary_id,
        user_content, timestamp (ISO-8601 string), summary_type and
        confidence_score
    """
    formatted_summaries = [
        {
            'summary_id': item.summary_id,
            'user_content': item.user_content,
            # Serialized to text so the value can be embedded in a prompt.
            'timestamp': item.timestamp.isoformat(),
            'summary_type': item.summary_type,
            'confidence_score': item.confidence_score
        }
        for item in user_summaries
    ]

    logger.debug(f"Prepared {len(formatted_summaries)} summaries for analysis")
    return formatted_summaries
async def _run_structured_analysis(
    self,
    analysis_name: str,
    prompt_builder,
    response_model,
    user_summaries: List[UserMemorySummary],
    user_id: str,
    model_id: Optional[str] = None
) -> Dict[str, Any]:
    """Run one prompt -> structured LLM response round trip.

    Shared by the four public ``analyze_*`` methods, which differ only in the
    prompt builder, the response model and their log messages.

    Args:
        analysis_name: Capitalized label used in error messages, e.g. "Habit"
        prompt_builder: Callable taking (formatted_summaries, user_id) and
            returning the prompt text
        response_model: Pydantic model describing the structured response
        user_summaries: List of user memory summaries to analyze
        user_id: Target user ID for analysis
        model_id: Optional LLM model ID to use

    Returns:
        The structured response dumped to a plain dictionary

    Raises:
        LLMClientException: If any step fails. An LLMClientException raised
            by a lower layer is re-raised unchanged rather than wrapped a
            second time.
    """
    try:
        # Prepare summaries and get prompt
        formatted_summaries = self._prepare_summaries_for_analysis(user_summaries)
        prompt = prompt_builder(formatted_summaries, user_id)

        # Get LLM client and perform analysis
        llm_client = self._get_llm_client(model_id)
        messages = [{"role": "user", "content": prompt}]

        # Use structured output for reliable parsing
        response = await llm_client.response_structured(
            messages=messages,
            response_model=response_model
        )
        return response.model_dump()
    except LLMClientException:
        logger.error(f"{analysis_name} analysis failed for user {user_id}")
        raise
    except Exception as e:
        logger.error(f"{analysis_name} analysis failed for user {user_id}: {e}")
        raise LLMClientException(f"{analysis_name} analysis failed: {e}") from e

async def analyze_preferences(
    self,
    user_summaries: List[UserMemorySummary],
    user_id: str,
    model_id: Optional[str] = None
) -> Dict[str, Any]:
    """Analyze user preferences from memory summaries.

    Args:
        user_summaries: List of user memory summaries to analyze
        user_id: Target user ID for analysis
        model_id: Optional LLM model ID to use

    Returns:
        Dictionary containing extracted preferences under "preferences".
        With no summaries an empty result is returned without calling the LLM.

    Raises:
        LLMClientException: If LLM analysis fails
        ValueError: If summaries are given but user_id is empty
    """
    if not user_summaries:
        logger.warning(f"No summaries provided for preference analysis of user {user_id}")
        return {"preferences": []}

    if not user_id:
        raise ValueError("User ID is required for preference analysis")

    result = await self._run_structured_analysis(
        "Preference",
        get_preference_analysis_prompt,
        PreferenceAnalysisResponse,
        user_summaries,
        user_id,
        model_id
    )
    logger.info(f"Analyzed preferences for user {user_id}: found {len(result.get('preferences', []))} preferences")
    return result

async def analyze_dimensions(
    self,
    user_summaries: List[UserMemorySummary],
    user_id: str,
    model_id: Optional[str] = None
) -> Dict[str, Any]:
    """Analyze user personality dimensions from memory summaries.

    Args:
        user_summaries: List of user memory summaries to analyze
        user_id: Target user ID for analysis
        model_id: Optional LLM model ID to use

    Returns:
        Dictionary containing dimension scores and analysis under
        "dimensions". With no summaries an empty result is returned without
        calling the LLM.

    Raises:
        LLMClientException: If LLM analysis fails
        ValueError: If summaries are given but user_id is empty
    """
    if not user_summaries:
        logger.warning(f"No summaries provided for dimension analysis of user {user_id}")
        return {"dimensions": {}}

    if not user_id:
        raise ValueError("User ID is required for dimension analysis")

    result = await self._run_structured_analysis(
        "Dimension",
        get_dimension_analysis_prompt,
        DimensionAnalysisResponse,
        user_summaries,
        user_id,
        model_id
    )
    dimensions = result.get('dimensions', {})
    logger.info(f"Analyzed dimensions for user {user_id}: {list(dimensions.keys())}")
    return result

async def analyze_interests(
    self,
    user_summaries: List[UserMemorySummary],
    user_id: str,
    model_id: Optional[str] = None
) -> Dict[str, Any]:
    """Analyze user interest distribution from memory summaries.

    Args:
        user_summaries: List of user memory summaries to analyze
        user_id: Target user ID for analysis
        model_id: Optional LLM model ID to use

    Returns:
        Dictionary containing interest area distribution under
        "interest_distribution". With no summaries an empty result is
        returned without calling the LLM.

    Raises:
        LLMClientException: If LLM analysis fails
        ValueError: If summaries are given but user_id is empty
    """
    if not user_summaries:
        logger.warning(f"No summaries provided for interest analysis of user {user_id}")
        return {"interest_distribution": {}}

    if not user_id:
        raise ValueError("User ID is required for interest analysis")

    result = await self._run_structured_analysis(
        "Interest",
        get_interest_analysis_prompt,
        InterestAnalysisResponse,
        user_summaries,
        user_id,
        model_id
    )
    interest_dist = result.get('interest_distribution', {})
    logger.info(f"Analyzed interests for user {user_id}: {list(interest_dist.keys())}")
    return result

async def analyze_habits(
    self,
    user_summaries: List[UserMemorySummary],
    user_id: str,
    model_id: Optional[str] = None
) -> Dict[str, Any]:
    """Analyze user behavioral habits from memory summaries.

    Args:
        user_summaries: List of user memory summaries to analyze
        user_id: Target user ID for analysis
        model_id: Optional LLM model ID to use

    Returns:
        Dictionary containing identified behavioral habits under "habits".
        With no summaries an empty result is returned without calling the LLM.

    Raises:
        LLMClientException: If LLM analysis fails
        ValueError: If summaries are given but user_id is empty
    """
    if not user_summaries:
        logger.warning(f"No summaries provided for habit analysis of user {user_id}")
        return {"habits": []}

    if not user_id:
        raise ValueError("User ID is required for habit analysis")

    result = await self._run_structured_analysis(
        "Habit",
        get_habit_analysis_prompt,
        HabitAnalysisResponse,
        user_summaries,
        user_id,
        model_id
    )
    logger.info(f"Analyzed habits for user {user_id}: found {len(result.get('habits', []))} habits")
    return result

View File

@@ -0,0 +1,69 @@
"""LLM Prompt Templates for Implicit Memory Analysis
This module contains prompt rendering functions for analyzing user memory summaries
to extract preferences, personality dimensions, interests, and behavioral habits.
"""
import os
from typing import Any, Dict, List
from jinja2 import Environment, FileSystemLoader
# Setup Jinja2 environment
# Templates are loaded from the "prompts" directory that sits next to this
# module. The environment is created once, at import time, and shared by
# every render call below. No autoescape argument is passed, so Jinja2's
# default applies.
current_dir = os.path.dirname(os.path.abspath(__file__))
prompt_dir = os.path.join(current_dir, "prompts")
prompt_env = Environment(loader=FileSystemLoader(prompt_dir))
def _render_template(template_name: str, **kwargs) -> str:
    """Render the named template from the shared environment.

    Args:
        template_name: File name of the template inside the prompts directory
        **kwargs: Variables made available to the template

    Returns:
        The rendered text
    """
    return prompt_env.get_template(template_name).render(**kwargs)
def get_preference_analysis_prompt(
    memory_summaries: List[Dict[str, Any]],
    user_id: str
) -> str:
    """Build the preference analysis prompt from its Jinja2 template.

    Args:
        memory_summaries: Formatted summary dicts to list in the prompt
        user_id: Target user ID shown in the prompt

    Returns:
        The rendered prompt text
    """
    template_context = {"memory_summaries": memory_summaries, "user_id": user_id}
    return _render_template("preference_analysis.jinja2", **template_context)
def get_dimension_analysis_prompt(
    memory_summaries: List[Dict[str, Any]],
    user_id: str
) -> str:
    """Build the dimension analysis prompt from its Jinja2 template.

    Args:
        memory_summaries: Formatted summary dicts to list in the prompt
        user_id: Target user ID shown in the prompt

    Returns:
        The rendered prompt text
    """
    template_context = {"memory_summaries": memory_summaries, "user_id": user_id}
    return _render_template("dimension_analysis.jinja2", **template_context)
def get_interest_analysis_prompt(
    memory_summaries: List[Dict[str, Any]],
    user_id: str
) -> str:
    """Build the interest analysis prompt from its Jinja2 template.

    Args:
        memory_summaries: Formatted summary dicts to list in the prompt
        user_id: Target user ID shown in the prompt

    Returns:
        The rendered prompt text
    """
    template_context = {"memory_summaries": memory_summaries, "user_id": user_id}
    return _render_template("interest_analysis.jinja2", **template_context)
def get_habit_analysis_prompt(
    memory_summaries: List[Dict[str, Any]],
    user_id: str
) -> str:
    """Build the habit analysis prompt from its Jinja2 template.

    Args:
        memory_summaries: Formatted summary dicts to list in the prompt
        user_id: Target user ID shown in the prompt

    Returns:
        The rendered prompt text
    """
    template_context = {"memory_summaries": memory_summaries, "user_id": user_id}
    return _render_template("habit_analysis.jinja2", **template_context)

View File

@@ -0,0 +1,41 @@
You are an expert personality analyst. Analyze memory summaries to assess the user's personality across four dimensions.
## Memory Summaries
{% for summary in memory_summaries %}
Summary {{ loop.index }}:
{{ summary.content or summary.user_content or '' }}
---
{% endfor %}
## Target User ID
{{ user_id }}
## Dimensions to Analyze
1. **Creativity** (0-100%): Creative thinking, artistic interests, innovative ideas
2. **Aesthetic** (0-100%): Design preferences, visual interests, artistic appreciation
3. **Technology** (0-100%): Technical discussions, tool usage, programming interests
4. **Literature** (0-100%): Reading habits, writing style, literary references
## Instructions
1. Analyze the user's content for each dimension
2. Calculate percentage scores (0-100%)
## Output Format
{
"dimensions": {
"creativity": {"percentage": 0-100},
"aesthetic": {"percentage": 0-100},
"technology": {"percentage": 0-100},
"literature": {"percentage": 0-100}
}
}
## Example
{
"dimensions": {
"creativity": {"percentage": 75},
"aesthetic": {"percentage": 45},
"technology": {"percentage": 60},
"literature": {"percentage": 30}
}
}

View File

@@ -0,0 +1,70 @@
You are an expert at identifying behavioral patterns and habits from memory summaries.
## Memory Summaries
{% for summary in memory_summaries %}
Summary {{ loop.index }}:
{{ summary.content or summary.user_content or '' }}
---
{% endfor %}
## Target User ID
{{ user_id }}
## Instructions
1. Identify recurring behavioral patterns mentioned by the SPECIFIED USER
2. Focus on specific, concrete habits with temporal patterns
3. For each habit, provide:
- habit_description: Clear, specific description
- frequency_pattern: "daily", "weekly", "monthly", "seasonal", "occasional", "event_triggered"
- time_context: When it typically happens
- confidence_level: "high", "medium", "low"
- supporting_summaries: References to evidence
- specific_examples: Concrete examples from summaries
- is_current: true if current habit, false if past habit
4. Only include habits with medium or high confidence
5. **IMPORTANT: Output language MUST match the input language. If summaries are in Chinese, output in Chinese. If in English, output in English.**
## Output Format
{
"habits": [
{
"habit_description": "string",
"frequency_pattern": "daily|weekly|monthly|seasonal|occasional|event_triggered",
"time_context": "string",
"confidence_level": "high|medium|low",
"supporting_summaries": ["id1", "id2"],
"specific_examples": ["example1", "example2"],
"is_current": true|false
}
]
}
## Example (English input → English output)
{
"habits": [
{
"habit_description": "drinks coffee every morning",
"frequency_pattern": "daily",
"time_context": "morning routine",
"confidence_level": "high",
"supporting_summaries": ["s1", "s2"],
"specific_examples": ["needs coffee to start the day"],
"is_current": true
}
]
}
## Example (Chinese input → Chinese output)
{
"habits": [
{
"habit_description": "每天早上喝咖啡",
"frequency_pattern": "daily",
"time_context": "早晨日常",
"confidence_level": "high",
"supporting_summaries": ["s1", "s2"],
"specific_examples": ["需要咖啡来开始一天"],
"is_current": true
}
]
}

View File

@@ -0,0 +1,54 @@
You are an expert at analyzing user interests from memory summaries.
## Memory Summaries
{% for summary in memory_summaries %}
Summary {{ loop.index }}:
{{ summary.content or summary.user_content or '' }}
---
{% endfor %}
## Target User ID
{{ user_id }}
## Interest Categories
1. **Tech**: Programming, technology, software tools, hardware
2. **Lifestyle**: Daily routines, health, hobbies, social activities
3. **Music**: Music preferences, instruments, concerts
4. **Art**: Visual arts, creative projects, design, aesthetics
## Instructions
1. Categorize the user's interests into the four areas
2. Calculate percentage distribution (must total 100%)
3. Provide specific evidence for each interest area
4. Use "increasing", "decreasing", or "stable" for trending direction; use null when the summaries give no indication of a trend
5. **IMPORTANT: Output language MUST match the input language. If summaries are in Chinese, output in Chinese. If in English, output in English.**
## Output Format
{
"interest_distribution": {
"tech": {"percentage": 0-100, "evidence": [], "trending_direction": "increasing|decreasing|stable|null"},
"lifestyle": {"percentage": 0-100, "evidence": [], "trending_direction": "increasing|decreasing|stable|null"},
"music": {"percentage": 0-100, "evidence": [], "trending_direction": "increasing|decreasing|stable|null"},
"art": {"percentage": 0-100, "evidence": [], "trending_direction": "increasing|decreasing|stable|null"}
}
}
## Example (English input → English output)
{
"interest_distribution": {
"tech": {"percentage": 40, "evidence": ["discusses programming frequently"], "trending_direction": "increasing"},
"lifestyle": {"percentage": 35, "evidence": ["talks about fitness routine"], "trending_direction": "stable"},
"music": {"percentage": 15, "evidence": ["mentioned favorite bands"], "trending_direction": "stable"},
"art": {"percentage": 10, "evidence": ["visited art museum"], "trending_direction": "stable"}
}
}
## Example (Chinese input → Chinese output)
{
"interest_distribution": {
"tech": {"percentage": 40, "evidence": ["经常讨论编程"], "trending_direction": "increasing"},
"lifestyle": {"percentage": 35, "evidence": ["谈论健身日常"], "trending_direction": "stable"},
"music": {"percentage": 15, "evidence": ["提到喜欢的乐队"], "trending_direction": "stable"},
"art": {"percentage": 10, "evidence": ["参观了艺术博物馆"], "trending_direction": "stable"}
}
}

View File

@@ -0,0 +1,47 @@
You are an expert at analyzing user memory summaries to identify implicit preferences.
## Memory Summaries
{% for summary in memory_summaries %}
Summary {{ loop.index }}:
{{ summary.content or summary.user_content or '' }}
---
{% endfor %}
## Target User ID
{{ user_id }}
## Instructions
1. Focus ONLY on the specified user's preferences
2. Extract SHORT preference tags (1-3 words max), like: "音乐", "咖啡", "科幻", "设计", "古典", "吉他"
3. DO NOT use long phrases - use short nouns or noun phrases
4. Only include preferences with confidence_score >= 0.3
5. **IMPORTANT: Output language MUST match the input language. If summaries are in Chinese, output in Chinese. If in English, output in English.**
## Output Format
{
"preferences": [
{
"tag_name": "short tag",
"confidence_score": 0.0-1.0,
"supporting_evidence": ["evidence1", "evidence2"],
"context_details": "brief context",
"category": "category or null"
}
]
}
## Example (Chinese input → Chinese output)
{
"preferences": [
{"tag_name": "咖啡", "confidence_score": 0.8, "supporting_evidence": ["每天早上喝咖啡"], "context_details": "日常习惯", "category": "lifestyle"},
{"tag_name": "古典音乐", "confidence_score": 0.7, "supporting_evidence": ["喜欢听古典"], "context_details": "音乐偏好", "category": "music"}
]
}
## Example (English input → English output)
{
"preferences": [
{"tag_name": "coffee", "confidence_score": 0.8, "supporting_evidence": ["drinks coffee every morning"], "context_details": "daily routine", "category": "lifestyle"},
{"tag_name": "classical music", "confidence_score": 0.7, "supporting_evidence": ["enjoys classical"], "context_details": "music preference", "category": "music"}
]
}