Files
MemoryBear/api/app/services/implicit_memory_service.py
2026-03-04 12:34:24 +08:00

483 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Implicit Memory Service
Main service orchestrating all implicit memory operations. This service coordinates
profile building, data extraction, and provides high-level methods for analyzing
user profiles from memory summaries.
"""
import logging
import asyncio
from datetime import datetime
from typing import List, Optional
from app.core.memory.analytics.implicit_memory.analyzers.dimension_analyzer import (
DimensionAnalyzer,
)
from app.core.memory.analytics.implicit_memory.analyzers.interest_analyzer import (
InterestAnalyzer,
)
from app.core.memory.analytics.implicit_memory.analyzers.preference_analyzer import (
PreferenceAnalyzer,
)
from app.core.memory.analytics.implicit_memory.data_source import MemoryDataSource
from app.core.memory.analytics.implicit_memory.habit_detector import HabitDetector
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.schemas.implicit_memory_schema import (
BehaviorHabit,
DateRange,
DimensionPortrait,
FrequencyPattern,
InterestAreaDistribution,
PreferenceTag,
TimeRange,
UserMemorySummary,
)
from app.schemas.memory_config_schema import MemoryConfig
from sqlalchemy.orm import Session
logger = logging.getLogger(__name__)
class ImplicitMemoryService:
"""Main service for implicit memory operations."""
def __init__(
self,
db: Session,
end_user_id: str
):
"""Initialize the implicit memory service.
Args:
db: Database session
end_user_id: End user ID to get connected memory configuration
"""
self.db = db
self.end_user_id = end_user_id
# Get connected memory configuration for the user
self.memory_config = self._get_user_memory_config()
# Extract LLM model ID from memory config
llm_model_id = str(self.memory_config.llm_model_id) if self.memory_config.llm_model_id else None
# Initialize Neo4j connector
self.neo4j_connector = Neo4jConnector()
# Initialize core components with LLM model ID
self.data_source = MemoryDataSource(db, self.neo4j_connector)
self.preference_analyzer = PreferenceAnalyzer(db, llm_model_id)
self.dimension_analyzer = DimensionAnalyzer(db, llm_model_id)
self.interest_analyzer = InterestAnalyzer(db, llm_model_id)
self.habit_detector = HabitDetector(db, llm_model_id)
logger.info(f"ImplicitMemoryService initialized for end_user: {end_user_id}")
def _get_user_memory_config(self) -> MemoryConfig:
"""Get memory configuration for the connected end user.
Returns:
MemoryConfig: User's connected memory configuration
Raises:
ValueError: If no memory configuration found for user
"""
try:
from app.services.memory_agent_service import get_end_user_connected_config
from app.services.memory_config_service import MemoryConfigService
# Get user's connected config
connected_config = get_end_user_connected_config(self.end_user_id, self.db)
config_id = connected_config.get("memory_config_id")
workspace_id = connected_config.get("workspace_id")
if config_id is None and workspace_id is None:
raise ValueError(f"No memory configuration found for end_user: {self.end_user_id}")
# Load the memory configuration with workspace fallback
config_service = MemoryConfigService(self.db)
memory_config = config_service.load_memory_config(
config_id=config_id,
workspace_id=workspace_id
)
logger.info(f"Loaded memory config {config_id} for end_user: {self.end_user_id}")
return memory_config
except Exception as e:
logger.error(f"Failed to get memory config for end_user {self.end_user_id}: {e}")
raise ValueError(f"Unable to get memory configuration for end_user {self.end_user_id}: {e}")
async def extract_user_summaries(
self,
user_id: str,
time_range: Optional[TimeRange] = None,
limit: Optional[int] = None
) -> List[UserMemorySummary]:
"""Extract user-specific memory summaries.
Args:
user_id: Target user ID
time_range: Optional time range to filter summaries
limit: Optional limit on number of summaries
Returns:
List of user-specific memory summaries
"""
logger.info(f"Extracting user summaries for user {user_id}")
try:
summaries = await self.data_source.get_user_summaries(
user_id=user_id,
time_range=time_range,
limit=limit or 1000
)
logger.info(f"Extracted {len(summaries)} summaries for user {user_id}")
return summaries
except Exception as e:
logger.error(f"Failed to extract user summaries for user {user_id}: {e}")
raise
async def get_preference_tags(
self,
user_id: str,
confidence_threshold: float = 0.5,
tag_category: Optional[str] = None,
date_range: Optional[DateRange] = None
) -> List[PreferenceTag]:
"""Retrieve user preference tags with filtering.
Args:
user_id: Target user ID
confidence_threshold: Minimum confidence score for tags
tag_category: Optional category filter
date_range: Optional date range filter
Returns:
List of filtered preference tags
"""
logger.info(f"Getting preference tags for user {user_id}")
try:
# Get user summaries for analysis
time_range = None
if date_range:
time_range = TimeRange(
start_date=date_range.start_date or datetime.min,
end_date=date_range.end_date or datetime.now()
)
user_summaries = await self.extract_user_summaries(
user_id=user_id,
time_range=time_range
)
if not user_summaries:
logger.warning(f"No summaries found for user {user_id}")
return []
# Analyze preferences
preference_tags = await self.preference_analyzer.analyze_preferences(
user_id=user_id,
user_summaries=user_summaries
)
# Apply filters
filtered_tags = []
for tag in preference_tags:
# Filter by confidence threshold
if tag.confidence_score < confidence_threshold:
continue
# Filter by category if specified
if tag_category and tag.category != tag_category:
continue
# Filter by date range if specified
if date_range:
if date_range.start_date and tag.created_at < date_range.start_date:
continue
if date_range.end_date and tag.created_at > date_range.end_date:
continue
filtered_tags.append(tag)
# Sort by confidence score and recency
filtered_tags.sort(
key=lambda x: (x.confidence_score, x.updated_at),
reverse=True
)
logger.info(f"Retrieved {len(filtered_tags)} preference tags for user {user_id}")
return filtered_tags
except Exception as e:
logger.error(f"Failed to get preference tags for user {user_id}: {e}")
raise
async def get_dimension_portrait(
self,
user_id: str,
include_history: bool = False
) -> DimensionPortrait:
"""Get user's four-dimension personality portrait.
Args:
user_id: Target user ID
include_history: Whether to include historical trends
Returns:
User's dimension portrait
"""
logger.info(f"Getting dimension portrait for user {user_id}")
try:
# Get user summaries
user_summaries = await self.extract_user_summaries(user_id=user_id)
if not user_summaries:
logger.warning(f"No summaries found for user {user_id}")
return self.dimension_analyzer._create_empty_portrait(user_id)
# Analyze dimensions
dimension_portrait = await self.dimension_analyzer.analyze_dimensions(
user_id=user_id,
user_summaries=user_summaries
)
# Include historical trends if requested
if include_history:
# In a full implementation, this would retrieve historical data
# For now, we'll leave historical_trends as None
pass
logger.info(f"Retrieved dimension portrait for user {user_id}")
return dimension_portrait
except Exception as e:
logger.error(f"Failed to get dimension portrait for user {user_id}: {e}")
raise
async def get_interest_area_distribution(
self,
user_id: str,
include_trends: bool = False
) -> InterestAreaDistribution:
"""Get user's interest area distribution across four areas.
Args:
user_id: Target user ID
include_trends: Whether to include trending information
Returns:
User's interest area distribution
"""
logger.info(f"Getting interest area distribution for user {user_id}")
try:
# Get user summaries
user_summaries = await self.extract_user_summaries(user_id=user_id)
if not user_summaries:
logger.warning(f"No summaries found for user {user_id}")
return self.interest_analyzer._create_empty_distribution(user_id)
# Analyze interests
interest_distribution = await self.interest_analyzer.analyze_interests(
user_id=user_id,
user_summaries=user_summaries
)
# Include trends if requested
if include_trends:
# In a full implementation, this would calculate trending directions
# For now, we'll leave trending_direction as None for each category
pass
logger.info(f"Retrieved interest area distribution for user {user_id}")
return interest_distribution
except Exception as e:
logger.error(f"Failed to get interest area distribution for user {user_id}: {e}")
raise
async def get_behavior_habits(
self,
user_id: str,
confidence_level: Optional[int] = None,
frequency_pattern: Optional[str] = None,
time_period: Optional[str] = None
) -> List[BehaviorHabit]:
"""Get user's behavioral habits with filtering.
Args:
user_id: Target user ID
confidence_level: Optional confidence level filter (0-100)
frequency_pattern: Optional frequency pattern filter
time_period: Optional time period filter ("current", "past")
Returns:
List of filtered behavioral habits
"""
logger.info(f"Getting behavior habits for user {user_id}")
try:
# Get user summaries
user_summaries = await self.extract_user_summaries(user_id=user_id)
if not user_summaries:
logger.warning(f"No summaries found for user {user_id}")
return []
# Detect habits
behavior_habits = await self.habit_detector.detect_habits(
user_id=user_id,
user_summaries=user_summaries
)
# Apply filters
filtered_habits = []
for habit in behavior_habits:
# Filter by confidence level
if confidence_level is not None:
if habit.confidence_level < confidence_level:
continue
# Filter by frequency pattern
if frequency_pattern:
try:
target_frequency = FrequencyPattern(frequency_pattern.lower())
if habit.frequency_pattern != target_frequency:
continue
except ValueError:
logger.warning(f"Invalid frequency pattern: {frequency_pattern}")
continue
# Filter by time period
if time_period:
if time_period.lower() == "current" and not habit.is_current:
continue
elif time_period.lower() == "past" and habit.is_current:
continue
filtered_habits.append(habit)
# Sort by confidence level and recency
filtered_habits.sort(
key=lambda x: (x.confidence_level, x.last_observed),
reverse=True
)
logger.info(f"Retrieved {len(filtered_habits)} behavior habits for user {user_id}")
return filtered_habits
except Exception as e:
logger.error(f"Failed to get behavior habits for user {user_id}: {e}")
raise
async def generate_complete_profile(
self,
user_id: str
) -> dict:
"""生成完整的用户画像包含所有4个模块
Args:
user_id: 用户ID
Returns:
Dict: 包含所有模块的完整画像数据
"""
logger.info(f"生成完整用户画像: user={user_id}")
try:
# 并行调用4个分析方法
preferences, portrait, interest_areas, habits = await asyncio.gather(
self.get_preference_tags(user_id=user_id),
self.get_dimension_portrait(user_id=user_id),
self.get_interest_area_distribution(user_id=user_id),
self.get_behavior_habits(user_id=user_id)
)
# 转换为可序列化的格式
profile_data = {
"preferences": [tag.model_dump(mode='json') for tag in preferences],
"portrait": portrait.model_dump(mode='json'),
"interest_areas": interest_areas.model_dump(mode='json'),
"habits": [habit.model_dump(mode='json') for habit in habits]
}
logger.info(f"完整用户画像生成完成: user={user_id}")
return profile_data
except Exception as e:
logger.error(f"生成完整用户画像失败: {str(e)}", exc_info=True)
raise
async def get_cached_profile(
self,
end_user_id: str,
db: Session
) -> Optional[dict]:
"""从数据库获取完整用户画像
Args:
end_user_id: 终端用户ID
db: 数据库会话
Returns:
Dict: 存储的画像数据,如果不存在返回 None
"""
try:
from app.repositories.implicit_emotions_storage_repository import ImplicitEmotionsStorageRepository
logger.info(f"尝试从数据库获取用户画像: user={end_user_id}")
# 从数据库获取存储记录
repo = ImplicitEmotionsStorageRepository(db)
storage = repo.get_by_end_user_id(end_user_id)
if storage is None or storage.implicit_profile is None:
logger.info(f"用户 {end_user_id} 的画像数据不存在")
return None
logger.info(f"成功从数据库获取用户画像: user={end_user_id}")
return storage.implicit_profile
except Exception as e:
logger.error(f"从数据库获取用户画像失败: {str(e)}", exc_info=True)
return None
async def save_profile_cache(
self,
end_user_id: str,
profile_data: dict,
db: Session,
expires_hours: int = 168 # 参数保留以保持接口兼容性
) -> None:
"""保存用户画像到数据库
Args:
end_user_id: 终端用户ID
profile_data: 画像数据
db: 数据库会话
expires_hours: 保留参数(兼容性)
"""
try:
from app.repositories.implicit_emotions_storage_repository import ImplicitEmotionsStorageRepository
logger.info(f"保存用户画像到数据库: user={end_user_id}")
repo = ImplicitEmotionsStorageRepository(db)
repo.update_implicit_profile(end_user_id, profile_data)
db.commit()
logger.info(f"用户画像保存成功: user={end_user_id}")
except Exception as e:
db.rollback()
logger.error(f"保存用户画像失败: {str(e)}", exc_info=True)