Files
MemoryBear/api/app/services/implicit_memory_service.py
乐力齐 935f3d54b3 Feature/generate cache (#135)
* [feature]Generate emotions, implicit cache

* [feature]Generate emotions, implicit cache

* [changes]Improve the code based on AI review

* [changes]Improve the code based on AI review

* [changes]Improve the code

* [feature]Generate emotions, implicit cache

* [changes]Improve the code based on AI review

* [changes]Improve the code
2026-01-16 12:33:37 +08:00

502 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Implicit Memory Service
Main service orchestrating all implicit memory operations. This service coordinates
profile building, data extraction, and provides high-level methods for analyzing
user profiles from memory summaries.
"""
import logging
import asyncio
from datetime import datetime
from typing import List, Optional
from app.core.memory.analytics.implicit_memory.analyzers.dimension_analyzer import (
DimensionAnalyzer,
)
from app.core.memory.analytics.implicit_memory.analyzers.interest_analyzer import (
InterestAnalyzer,
)
from app.core.memory.analytics.implicit_memory.analyzers.preference_analyzer import (
PreferenceAnalyzer,
)
from app.core.memory.analytics.implicit_memory.data_source import MemoryDataSource
from app.core.memory.analytics.implicit_memory.habit_detector import HabitDetector
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.schemas.implicit_memory_schema import (
BehaviorHabit,
DateRange,
DimensionPortrait,
FrequencyPattern,
InterestAreaDistribution,
PreferenceTag,
TimeRange,
UserMemorySummary,
)
from app.schemas.memory_config_schema import MemoryConfig
from sqlalchemy.orm import Session
# Module-level logger named after this module (``__name__``); shared by the service below.
logger = logging.getLogger(__name__)
class ImplicitMemoryService:
    """Main service for implicit memory operations.

    Wires a ``MemoryDataSource`` (given a ``Neo4jConnector``) to the four
    analyzers -- preferences, personality dimensions, interest areas and
    behavior habits -- and exposes high-level query methods, plus helpers that
    read and write the per-end-user profile cache through
    ``ImplicitMemoryCacheRepository``.
    """

    def __init__(
        self,
        db: Session,
        end_user_id: str
    ):
        """Initialize the implicit memory service.

        Args:
            db: Database session handed to the data source and the analyzers.
            end_user_id: End user whose connected memory configuration decides
                which LLM model the analyzers use.

        Raises:
            ValueError: If the user's memory configuration cannot be loaded
                (raised by ``_get_user_memory_config``).
        """
        self.db = db
        self.end_user_id = end_user_id

        # The user's connected memory configuration selects the analyzers' LLM model.
        self.memory_config = self._get_user_memory_config()
        llm_model_id = (
            str(self.memory_config.llm_model_id)
            if self.memory_config.llm_model_id
            else None
        )

        self.neo4j_connector = Neo4jConnector()

        self.data_source = MemoryDataSource(db, self.neo4j_connector)
        self.preference_analyzer = PreferenceAnalyzer(db, llm_model_id)
        self.dimension_analyzer = DimensionAnalyzer(db, llm_model_id)
        self.interest_analyzer = InterestAnalyzer(db, llm_model_id)
        self.habit_detector = HabitDetector(db, llm_model_id)

        logger.info(f"ImplicitMemoryService initialized for end_user: {end_user_id}")

    def _get_user_memory_config(self) -> MemoryConfig:
        """Load the memory configuration connected to ``self.end_user_id``.

        Returns:
            MemoryConfig: The user's connected memory configuration.

        Raises:
            ValueError: If the user has no connected configuration or loading it
                fails for any reason. The underlying error is chained as
                ``__cause__`` so its traceback is preserved.
        """
        try:
            # Imported lazily -- presumably to avoid a circular import between
            # service modules (TODO confirm).
            from app.services.memory_agent_service import get_end_user_connected_config
            from app.services.memory_config_service import MemoryConfigService

            connected_config = get_end_user_connected_config(self.end_user_id, self.db)
            config_id = connected_config.get("memory_config_id")
            if config_id is None:
                raise ValueError(f"No memory configuration found for end_user: {self.end_user_id}")

            config_service = MemoryConfigService(self.db)
            memory_config = config_service.load_memory_config(config_id)
            logger.info(f"Loaded memory config {config_id} for end_user: {self.end_user_id}")
            return memory_config
        except Exception as e:
            logger.error(f"Failed to get memory config for end_user {self.end_user_id}: {e}")
            # Chain the cause so the original traceback is not lost.
            raise ValueError(
                f"Unable to get memory configuration for end_user {self.end_user_id}: {e}"
            ) from e

    async def extract_user_summaries(
        self,
        user_id: str,
        time_range: Optional[TimeRange] = None,
        limit: Optional[int] = None
    ) -> List[UserMemorySummary]:
        """Fetch the memory summaries belonging to ``user_id``.

        Args:
            user_id: Target user ID.
            time_range: Optional time range forwarded to the data source to
                filter summaries.
            limit: Maximum number of summaries; any falsy value (``None`` or
                ``0``) falls back to 1000.

        Returns:
            The user's memory summaries as returned by the data source.
        """
        logger.info(f"Extracting user summaries for user {user_id}")
        try:
            summaries = await self.data_source.get_user_summaries(
                user_id=user_id,
                time_range=time_range,
                limit=limit or 1000
            )
            logger.info(f"Extracted {len(summaries)} summaries for user {user_id}")
            return summaries
        except Exception as e:
            logger.error(f"Failed to extract user summaries for user {user_id}: {e}")
            raise

    async def get_preference_tags(
        self,
        user_id: str,
        confidence_threshold: float = 0.5,
        tag_category: Optional[str] = None,
        date_range: Optional[DateRange] = None
    ) -> List[PreferenceTag]:
        """Retrieve user preference tags with filtering.

        Args:
            user_id: Target user ID.
            confidence_threshold: Tags scoring below this are dropped.
            tag_category: If given, only tags of this category are kept.
            date_range: If given, restricts both the summaries analyzed and
                the tags returned (by ``created_at``).

        Returns:
            Filtered tags, highest confidence first, ties broken by most
            recent ``updated_at``.
        """
        logger.info(f"Getting preference tags for user {user_id}")
        try:
            time_range = None
            if date_range:
                # Open-ended bounds mean "from the beginning" / "until now".
                # NOTE(review): datetime.min / datetime.now() are naive; confirm
                # DateRange bounds are naive too, or comparisons will fail.
                time_range = TimeRange(
                    start_date=date_range.start_date or datetime.min,
                    end_date=date_range.end_date or datetime.now()
                )
            user_summaries = await self.extract_user_summaries(
                user_id=user_id,
                time_range=time_range
            )
            return await self._build_preference_tags(
                user_id,
                user_summaries,
                confidence_threshold=confidence_threshold,
                tag_category=tag_category,
                date_range=date_range,
            )
        except Exception as e:
            logger.error(f"Failed to get preference tags for user {user_id}: {e}")
            raise

    async def _build_preference_tags(
        self,
        user_id: str,
        user_summaries: List[UserMemorySummary],
        confidence_threshold: float = 0.5,
        tag_category: Optional[str] = None,
        date_range: Optional[DateRange] = None
    ) -> List[PreferenceTag]:
        """Analyze, filter and rank preference tags from already-loaded summaries.

        Returns an empty list (after logging a warning) when there are no summaries.
        """
        if not user_summaries:
            logger.warning(f"No summaries found for user {user_id}")
            return []

        preference_tags = await self.preference_analyzer.analyze_preferences(
            user_id=user_id,
            user_summaries=user_summaries
        )

        # Resolve the optional date bounds once rather than per tag.
        start_date = date_range.start_date if date_range else None
        end_date = date_range.end_date if date_range else None

        filtered_tags = []
        for tag in preference_tags:
            if tag.confidence_score < confidence_threshold:
                continue
            if tag_category and tag.category != tag_category:
                continue
            if start_date and tag.created_at < start_date:
                continue
            if end_date and tag.created_at > end_date:
                continue
            filtered_tags.append(tag)

        # Highest confidence first; among equal confidence, most recently updated first.
        filtered_tags.sort(
            key=lambda x: (x.confidence_score, x.updated_at),
            reverse=True
        )
        logger.info(f"Retrieved {len(filtered_tags)} preference tags for user {user_id}")
        return filtered_tags

    async def get_dimension_portrait(
        self,
        user_id: str,
        include_history: bool = False
    ) -> DimensionPortrait:
        """Get the user's four-dimension personality portrait.

        Args:
            user_id: Target user ID.
            include_history: Accepted for API compatibility; historical trends
                are not implemented yet, so this currently has no effect.

        Returns:
            The analyzer's portrait, or the analyzer's empty portrait when the
            user has no summaries.
        """
        # TODO: populate historical trends when include_history is set.
        logger.info(f"Getting dimension portrait for user {user_id}")
        try:
            user_summaries = await self.extract_user_summaries(user_id=user_id)
            return await self._build_dimension_portrait(user_id, user_summaries)
        except Exception as e:
            logger.error(f"Failed to get dimension portrait for user {user_id}: {e}")
            raise

    async def _build_dimension_portrait(
        self,
        user_id: str,
        user_summaries: List[UserMemorySummary]
    ) -> DimensionPortrait:
        """Run the dimension analyzer over already-loaded summaries."""
        if not user_summaries:
            logger.warning(f"No summaries found for user {user_id}")
            return self.dimension_analyzer._create_empty_portrait(user_id)

        dimension_portrait = await self.dimension_analyzer.analyze_dimensions(
            user_id=user_id,
            user_summaries=user_summaries
        )
        logger.info(f"Retrieved dimension portrait for user {user_id}")
        return dimension_portrait

    async def get_interest_area_distribution(
        self,
        user_id: str,
        include_trends: bool = False
    ) -> InterestAreaDistribution:
        """Get the user's interest distribution across the four interest areas.

        Args:
            user_id: Target user ID.
            include_trends: Accepted for API compatibility; trend calculation is
                not implemented yet, so this currently has no effect.

        Returns:
            The analyzer's distribution, or the analyzer's empty distribution
            when the user has no summaries.
        """
        # TODO: compute trending directions when include_trends is set.
        logger.info(f"Getting interest area distribution for user {user_id}")
        try:
            user_summaries = await self.extract_user_summaries(user_id=user_id)
            return await self._build_interest_area_distribution(user_id, user_summaries)
        except Exception as e:
            logger.error(f"Failed to get interest area distribution for user {user_id}: {e}")
            raise

    async def _build_interest_area_distribution(
        self,
        user_id: str,
        user_summaries: List[UserMemorySummary]
    ) -> InterestAreaDistribution:
        """Run the interest analyzer over already-loaded summaries."""
        if not user_summaries:
            logger.warning(f"No summaries found for user {user_id}")
            return self.interest_analyzer._create_empty_distribution(user_id)

        interest_distribution = await self.interest_analyzer.analyze_interests(
            user_id=user_id,
            user_summaries=user_summaries
        )
        logger.info(f"Retrieved interest area distribution for user {user_id}")
        return interest_distribution

    async def get_behavior_habits(
        self,
        user_id: str,
        confidence_level: Optional[int] = None,
        frequency_pattern: Optional[str] = None,
        time_period: Optional[str] = None
    ) -> List[BehaviorHabit]:
        """Get the user's behavioral habits with filtering.

        Args:
            user_id: Target user ID.
            confidence_level: Optional minimum confidence level (0-100).
            frequency_pattern: Optional ``FrequencyPattern`` value
                (case-insensitive). An unrecognised value matches no habit,
                so the result is empty.
            time_period: Optional ``"current"`` or ``"past"``
                (case-insensitive); other values apply no time filter.

        Returns:
            Filtered habits, highest confidence first, ties broken by most
            recent ``last_observed``.
        """
        logger.info(f"Getting behavior habits for user {user_id}")
        try:
            user_summaries = await self.extract_user_summaries(user_id=user_id)
            return await self._build_behavior_habits(
                user_id,
                user_summaries,
                confidence_level=confidence_level,
                frequency_pattern=frequency_pattern,
                time_period=time_period,
            )
        except Exception as e:
            logger.error(f"Failed to get behavior habits for user {user_id}: {e}")
            raise

    async def _build_behavior_habits(
        self,
        user_id: str,
        user_summaries: List[UserMemorySummary],
        confidence_level: Optional[int] = None,
        frequency_pattern: Optional[str] = None,
        time_period: Optional[str] = None
    ) -> List[BehaviorHabit]:
        """Detect, filter and rank habits from already-loaded summaries."""
        if not user_summaries:
            logger.warning(f"No summaries found for user {user_id}")
            return []

        # Parse the frequency filter once instead of once per habit. An unknown
        # pattern can match nothing, so skip the (LLM-backed) detection entirely.
        target_frequency = None
        if frequency_pattern:
            try:
                target_frequency = FrequencyPattern(frequency_pattern.lower())
            except ValueError:
                logger.warning(f"Invalid frequency pattern: {frequency_pattern}")
                return []

        period = time_period.lower() if time_period else None

        behavior_habits = await self.habit_detector.detect_habits(
            user_id=user_id,
            user_summaries=user_summaries
        )

        filtered_habits = []
        for habit in behavior_habits:
            if confidence_level is not None and habit.confidence_level < confidence_level:
                continue
            if target_frequency is not None and habit.frequency_pattern != target_frequency:
                continue
            if period == "current" and not habit.is_current:
                continue
            if period == "past" and habit.is_current:
                continue
            filtered_habits.append(habit)

        # Highest confidence first; among equal confidence, most recently observed first.
        filtered_habits.sort(
            key=lambda x: (x.confidence_level, x.last_observed),
            reverse=True
        )
        logger.info(f"Retrieved {len(filtered_habits)} behavior habits for user {user_id}")
        return filtered_habits

    async def generate_complete_profile(
        self,
        user_id: str
    ) -> dict:
        """Generate the complete user profile covering all four modules.

        The user's summaries are fetched once and shared by the four analyses,
        which then run concurrently with their default filters.

        Args:
            user_id: User ID.

        Returns:
            dict: JSON-serializable profile with keys ``"preferences"``,
            ``"portrait"``, ``"interest_areas"`` and ``"habits"``.
        """
        logger.info(f"生成完整用户画像: user={user_id}")
        try:
            # One query instead of one identical query per analysis.
            user_summaries = await self.extract_user_summaries(user_id=user_id)

            # Each analysis gets its own shallow copy of the list so an analyzer
            # that reorders or trims its input cannot affect the others.
            preferences, portrait, interest_areas, habits = await asyncio.gather(
                self._build_preference_tags(user_id, list(user_summaries)),
                self._build_dimension_portrait(user_id, list(user_summaries)),
                self._build_interest_area_distribution(user_id, list(user_summaries)),
                self._build_behavior_habits(user_id, list(user_summaries)),
            )

            # mode='json' makes the dump JSON-serializable (e.g. datetimes as strings).
            profile_data = {
                "preferences": [tag.model_dump(mode='json') for tag in preferences],
                "portrait": portrait.model_dump(mode='json'),
                "interest_areas": interest_areas.model_dump(mode='json'),
                "habits": [habit.model_dump(mode='json') for habit in habits]
            }
            logger.info(f"完整用户画像生成完成: user={user_id}")
            return profile_data
        except Exception as e:
            logger.error(f"生成完整用户画像失败: {str(e)}", exc_info=True)
            raise

    async def get_cached_profile(
        self,
        end_user_id: str,
        db: Session
    ) -> Optional[dict]:
        """Return the cached complete profile for ``end_user_id``, if usable.

        Args:
            end_user_id: End user ID.
            db: Database session used for the cache repository.

        Returns:
            The cached profile with ``"generated_at"`` as an ISO string and
            ``"cached": True``; ``None`` when no cache exists, it has expired,
            or any error occurs (errors are logged, never raised).
        """
        try:
            from app.repositories.implicit_memory_cache_repository import (
                ImplicitMemoryCacheRepository,
            )
            logger.info(f"尝试从缓存获取用户画像: user={end_user_id}")
            # NOTE(review): repository calls are made without await, so they run
            # synchronously inside this coroutine.
            cache_repo = ImplicitMemoryCacheRepository(db)
            cache = cache_repo.get_by_end_user_id(end_user_id)
            if cache is None:
                logger.info(f"用户 {end_user_id} 的画像缓存不存在")
                return None
            if cache_repo.is_expired(cache):
                logger.info(f"用户 {end_user_id} 的画像缓存已过期")
                return None
            logger.info(f"成功从缓存获取用户画像: user={end_user_id}")
            return {
                "end_user_id": cache.end_user_id,
                "preferences": cache.preferences,
                "portrait": cache.portrait,
                "interest_areas": cache.interest_areas,
                "habits": cache.habits,
                "generated_at": cache.generated_at.isoformat(),
                "cached": True
            }
        except Exception as e:
            logger.error(f"从缓存获取用户画像失败: {str(e)}", exc_info=True)
            return None

    async def save_profile_cache(
        self,
        end_user_id: str,
        profile_data: dict,
        db: Session,
        expires_hours: int = 168  # 7 days
    ) -> None:
        """Store a complete profile in the cache (best effort).

        Args:
            end_user_id: End user ID.
            profile_data: Profile as produced by ``generate_complete_profile``;
                must contain ``"preferences"``, ``"portrait"``,
                ``"interest_areas"`` and ``"habits"``.
            db: Database session used for the cache repository.
            expires_hours: Cache lifetime in hours (default 168 = 7 days).

        Errors are logged and swallowed: a cache failure must not break the
        main flow.
        """
        try:
            from app.repositories.implicit_memory_cache_repository import (
                ImplicitMemoryCacheRepository,
            )
            logger.info(f"保存用户画像到缓存: user={end_user_id}")
            cache_repo = ImplicitMemoryCacheRepository(db)
            cache_repo.create_or_update(
                end_user_id=end_user_id,
                preferences=profile_data["preferences"],
                portrait=profile_data["portrait"],
                interest_areas=profile_data["interest_areas"],
                habits=profile_data["habits"],
                expires_hours=expires_hours
            )
            logger.info(f"用户画像缓存保存成功: user={end_user_id}")
        except Exception as e:
            logger.error(f"保存用户画像缓存失败: {str(e)}", exc_info=True)
            # Deliberately not re-raised: caching is best effort.