Feature/generate cache (#135)

* [feature]Generate emotions, implicit cache

* [feature]Generate emotions, implicit cache

* [changes]Improve the code based on AI review

* [changes]Improve the code based on AI review

* [changes]Improve the code

* [feature]Generate emotions, implicit cache

* [changes]Improve the code based on AI review

* [changes]Improve the code
This commit is contained in:
乐力齐
2026-01-16 12:33:37 +08:00
committed by GitHub
parent 7c1f040b7c
commit 935f3d54b3
13 changed files with 896 additions and 74 deletions

View File

@@ -18,6 +18,7 @@ from app.models.user_model import User
from app.schemas.emotion_schema import ( from app.schemas.emotion_schema import (
EmotionHealthRequest, EmotionHealthRequest,
EmotionSuggestionsRequest, EmotionSuggestionsRequest,
EmotionGenerateSuggestionsRequest,
EmotionTagsRequest, EmotionTagsRequest,
EmotionWordcloudRequest, EmotionWordcloudRequest,
) )
@@ -198,7 +199,7 @@ async def get_emotion_suggestions(
db: Session = Depends(get_db), db: Session = Depends(get_db),
current_user: User = Depends(get_current_user), current_user: User = Depends(get_current_user),
): ):
"""获取个性化情绪建议 """获取个性化情绪建议(从缓存读取)
Args: Args:
request: 包含 group_id 和可选的 config_id request: 包含 group_id 和可选的 config_id
@@ -206,7 +207,72 @@ async def get_emotion_suggestions(
current_user: 当前用户 current_user: 当前用户
Returns: Returns:
个性化情绪建议响应 缓存的个性化情绪建议响应
"""
try:
api_logger.info(
f"用户 {current_user.username} 请求获取个性化情绪建议(缓存)",
extra={
"group_id": request.group_id,
"config_id": request.config_id
}
)
# 从缓存获取建议
data = await emotion_service.get_cached_suggestions(
end_user_id=request.group_id,
db=db
)
if data is None:
# 缓存不存在或已过期
api_logger.info(
f"用户 {request.group_id} 的建议缓存不存在或已过期",
extra={"group_id": request.group_id}
)
return fail(
BizCode.RESOURCE_NOT_FOUND,
"建议缓存不存在或已过期,请调用 /generate_suggestions 接口生成新建议",
None
)
api_logger.info(
"个性化建议获取成功(缓存)",
extra={
"group_id": request.group_id,
"suggestions_count": len(data.get("suggestions", []))
}
)
return success(data=data, msg="个性化建议获取成功(缓存)")
except Exception as e:
api_logger.error(
f"获取个性化建议失败: {str(e)}",
extra={"group_id": request.group_id},
exc_info=True
)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"获取个性化建议失败: {str(e)}"
)
@router.post("/generate_suggestions", response_model=ApiResponse)
async def generate_emotion_suggestions(
request: EmotionGenerateSuggestionsRequest,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""生成个性化情绪建议调用LLM并缓存
Args:
request: 包含 group_id、可选的 config_id 和 force_refresh
db: 数据库会话
current_user: 当前用户
Returns:
新生成的个性化情绪建议响应
""" """
try: try:
# 验证 config_id如果提供 # 验证 config_id如果提供
@@ -234,36 +300,44 @@ async def get_emotion_suggestions(
return fail(BizCode.INVALID_PARAMETER, "配置ID验证失败", str(e)) return fail(BizCode.INVALID_PARAMETER, "配置ID验证失败", str(e))
api_logger.info( api_logger.info(
f"用户 {current_user.username} 请求获取个性化情绪建议", f"用户 {current_user.username} 请求生成个性化情绪建议",
extra={ extra={
"group_id": request.group_id, "group_id": request.group_id,
"config_id": config_id "config_id": config_id
} }
) )
# 调用服务层 # 调用服务层生成建议
data = await emotion_service.generate_emotion_suggestions( data = await emotion_service.generate_emotion_suggestions(
end_user_id=request.group_id, end_user_id=request.group_id,
db=db db=db
) )
# 保存到缓存
await emotion_service.save_suggestions_cache(
end_user_id=request.group_id,
suggestions_data=data,
db=db,
expires_hours=24
)
api_logger.info( api_logger.info(
"个性化建议获取成功", "个性化建议生成成功",
extra={ extra={
"group_id": request.group_id, "group_id": request.group_id,
"suggestions_count": len(data.get("suggestions", [])) "suggestions_count": len(data.get("suggestions", []))
} }
) )
return success(data=data, msg="个性化建议获取成功") return success(data=data, msg="个性化建议生成成功")
except Exception as e: except Exception as e:
api_logger.error( api_logger.error(
f"获取个性化建议失败: {str(e)}", f"生成个性化建议失败: {str(e)}",
extra={"group_id": request.group_id}, extra={"group_id": request.group_id},
exc_info=True exc_info=True
) )
raise HTTPException( raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"获取个性化建议失败: {str(e)}" detail=f"生成个性化建议失败: {str(e)}"
) )

View File

@@ -11,6 +11,7 @@ from app.dependencies import (
) )
from app.models.user_model import User from app.models.user_model import User
from app.schemas.response_schema import ApiResponse from app.schemas.response_schema import ApiResponse
from app.schemas.implicit_memory_schema import GenerateProfileRequest
from app.services.implicit_memory_service import ImplicitMemoryService from app.services.implicit_memory_service import ImplicitMemoryService
from fastapi import APIRouter, Depends, Query from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
@@ -133,7 +134,7 @@ async def get_preference_tags(
current_user: User = Depends(get_current_user) current_user: User = Depends(get_current_user)
) -> ApiResponse: ) -> ApiResponse:
""" """
Get user preference tags with filtering options. Get user preference tags from cache.
Args: Args:
user_id: Target user ID user_id: Target user ID
@@ -143,35 +144,56 @@ async def get_preference_tags(
end_date: Optional end date filter end_date: Optional end date filter
Returns: Returns:
List of preference tags matching the filters List of preference tags from cache
""" """
api_logger.info(f"Preference tags requested for user: {user_id}") api_logger.info(f"Preference tags requested for user: {user_id} (from cache)")
try: try:
# Validate inputs # Validate inputs
validate_user_id(user_id) validate_user_id(user_id)
validate_confidence_threshold(confidence_threshold)
validate_date_range(start_date, end_date)
# Create service with user-specific config # Create service with user-specific config
service = ImplicitMemoryService(db=db, end_user_id=user_id) service = ImplicitMemoryService(db=db, end_user_id=user_id)
# Build date range # Get cached profile
date_range = None cached_profile = await service.get_cached_profile(end_user_id=user_id, db=db)
if start_date and end_date:
from app.schemas.implicit_memory_schema import DateRange
date_range = DateRange(start_date=start_date, end_date=end_date)
# Get preference tags if cached_profile is None:
tags = await service.get_preference_tags( api_logger.info(f"用户 {user_id} 的画像缓存不存在或已过期")
user_id=user_id, return fail(
confidence_threshold=confidence_threshold, BizCode.RESOURCE_NOT_FOUND,
tag_category=tag_category, "画像缓存不存在或已过期,请调用 /generate_profile 接口生成新画像",
date_range=date_range None
) )
api_logger.info(f"Retrieved {len(tags)} preference tags for user: {user_id}") # Extract preferences from cache
return success(data=[tag.model_dump(mode='json') for tag in tags], msg="偏好标签获取成功") preferences = cached_profile.get("preferences", [])
# Apply filters (client-side filtering on cached data)
filtered_preferences = []
for pref in preferences:
# Filter by confidence threshold
if confidence_threshold is not None and pref.get("confidence_score", 0) < confidence_threshold:
continue
# Filter by category if specified
if tag_category and pref.get("category") != tag_category:
continue
# Filter by date range if specified
if start_date or end_date:
created_at_ts = pref.get("created_at")
if created_at_ts:
created_at = datetime.fromtimestamp(created_at_ts / 1000)
if start_date and created_at < start_date:
continue
if end_date and created_at > end_date:
continue
filtered_preferences.append(pref)
api_logger.info(f"Retrieved {len(filtered_preferences)} preference tags for user: {user_id} (from cache)")
return success(data=filtered_preferences, msg="偏好标签获取成功(缓存)")
except Exception as e: except Exception as e:
return handle_implicit_memory_error(e, "偏好标签获取", user_id) return handle_implicit_memory_error(e, "偏好标签获取", user_id)
@@ -186,16 +208,16 @@ async def get_dimension_portrait(
current_user: User = Depends(get_current_user) current_user: User = Depends(get_current_user)
) -> ApiResponse: ) -> ApiResponse:
""" """
Get user's four-dimension personality portrait. Get user's four-dimension personality portrait from cache.
Args: Args:
user_id: Target user ID user_id: Target user ID
include_history: Whether to include historical trend data include_history: Whether to include historical trend data (ignored for cached data)
Returns: Returns:
Four-dimension personality portrait with scores and evidence Four-dimension personality portrait from cache
""" """
api_logger.info(f"Dimension portrait requested for user: {user_id}") api_logger.info(f"Dimension portrait requested for user: {user_id} (from cache)")
try: try:
# Validate inputs # Validate inputs
@@ -204,13 +226,22 @@ async def get_dimension_portrait(
# Create service with user-specific config # Create service with user-specific config
service = ImplicitMemoryService(db=db, end_user_id=user_id) service = ImplicitMemoryService(db=db, end_user_id=user_id)
portrait = await service.get_dimension_portrait( # Get cached profile
user_id=user_id, cached_profile = await service.get_cached_profile(end_user_id=user_id, db=db)
include_history=include_history
)
api_logger.info(f"Dimension portrait retrieved for user: {user_id}") if cached_profile is None:
return success(data=portrait.model_dump(mode='json'), msg="四维画像获取成功") api_logger.info(f"用户 {user_id} 的画像缓存不存在或已过期")
return fail(
BizCode.RESOURCE_NOT_FOUND,
"画像缓存不存在或已过期,请调用 /generate_profile 接口生成新画像",
None
)
# Extract portrait from cache
portrait = cached_profile.get("portrait", {})
api_logger.info(f"Dimension portrait retrieved for user: {user_id} (from cache)")
return success(data=portrait, msg="四维画像获取成功(缓存)")
except Exception as e: except Exception as e:
return handle_implicit_memory_error(e, "四维画像获取", user_id) return handle_implicit_memory_error(e, "四维画像获取", user_id)
@@ -225,16 +256,16 @@ async def get_interest_area_distribution(
current_user: User = Depends(get_current_user) current_user: User = Depends(get_current_user)
) -> ApiResponse: ) -> ApiResponse:
""" """
Get user's interest area distribution across four areas. Get user's interest area distribution from cache.
Args: Args:
user_id: Target user ID user_id: Target user ID
include_trends: Whether to include trend analysis data include_trends: Whether to include trend analysis data (ignored for cached data)
Returns: Returns:
Interest area distribution with percentages and evidence Interest area distribution from cache
""" """
api_logger.info(f"Interest area distribution requested for user: {user_id}") api_logger.info(f"Interest area distribution requested for user: {user_id} (from cache)")
try: try:
# Validate inputs # Validate inputs
@@ -243,13 +274,22 @@ async def get_interest_area_distribution(
# Create service with user-specific config # Create service with user-specific config
service = ImplicitMemoryService(db=db, end_user_id=user_id) service = ImplicitMemoryService(db=db, end_user_id=user_id)
distribution = await service.get_interest_area_distribution( # Get cached profile
user_id=user_id, cached_profile = await service.get_cached_profile(end_user_id=user_id, db=db)
include_trends=include_trends
)
api_logger.info(f"Interest area distribution retrieved for user: {user_id}") if cached_profile is None:
return success(data=distribution.model_dump(mode='json'), msg="兴趣领域分布获取成功") api_logger.info(f"用户 {user_id} 的画像缓存不存在或已过期")
return fail(
BizCode.RESOURCE_NOT_FOUND,
"画像缓存不存在或已过期,请调用 /generate_profile 接口生成新画像",
None
)
# Extract interest areas from cache
interest_areas = cached_profile.get("interest_areas", {})
api_logger.info(f"Interest area distribution retrieved for user: {user_id} (from cache)")
return success(data=interest_areas, msg="兴趣领域分布获取成功(缓存)")
except Exception as e: except Exception as e:
return handle_implicit_memory_error(e, "兴趣领域分布获取", user_id) return handle_implicit_memory_error(e, "兴趣领域分布获取", user_id)
@@ -266,7 +306,7 @@ async def get_behavior_habits(
current_user: User = Depends(get_current_user) current_user: User = Depends(get_current_user)
) -> ApiResponse: ) -> ApiResponse:
""" """
Get user's behavioral habits with filtering options. Get user's behavioral habits from cache.
Args: Args:
user_id: Target user ID user_id: Target user ID
@@ -275,38 +315,117 @@ async def get_behavior_habits(
time_period: Filter by time period (current, past) time_period: Filter by time period (current, past)
Returns: Returns:
List of behavioral habits matching the filters List of behavioral habits from cache
""" """
api_logger.info(f"Behavior habits requested for user: {user_id}") api_logger.info(f"Behavior habits requested for user: {user_id} (from cache)")
try: try:
# Validate inputs # Validate inputs
validate_user_id(user_id) validate_user_id(user_id)
# Convert string confidence level to numerical
numerical_confidence = None
if confidence_level:
confidence_mapping = {
"high": 85,
"medium": 50,
"low": 20
}
numerical_confidence = confidence_mapping.get(confidence_level.lower())
# Create service with user-specific config # Create service with user-specific config
service = ImplicitMemoryService(db=db, end_user_id=user_id) service = ImplicitMemoryService(db=db, end_user_id=user_id)
habits = await service.get_behavior_habits( # Get cached profile
user_id=user_id, cached_profile = await service.get_cached_profile(end_user_id=user_id, db=db)
confidence_level=numerical_confidence,
frequency_pattern=frequency_pattern,
time_period=time_period
)
api_logger.info(f"Retrieved {len(habits)} behavior habits for user: {user_id}") if cached_profile is None:
return success(data=[habit.model_dump(mode='json') for habit in habits], msg="行为习惯获取成功") api_logger.info(f"用户 {user_id} 的画像缓存不存在或已过期")
return fail(
BizCode.RESOURCE_NOT_FOUND,
"画像缓存不存在或已过期,请调用 /generate_profile 接口生成新画像",
None
)
# Extract habits from cache
habits = cached_profile.get("habits", [])
# Apply filters (client-side filtering on cached data)
filtered_habits = []
for habit in habits:
# Filter by confidence level
if confidence_level:
confidence_mapping = {
"high": 85,
"medium": 50,
"low": 20
}
numerical_confidence = confidence_mapping.get(confidence_level.lower())
if habit.get("confidence_level", 0) < numerical_confidence:
continue
# Filter by frequency pattern
if frequency_pattern and habit.get("frequency_pattern") != frequency_pattern:
continue
# Filter by time period
if time_period:
is_current = habit.get("is_current", True)
if time_period.lower() == "current" and not is_current:
continue
elif time_period.lower() == "past" and is_current:
continue
filtered_habits.append(habit)
api_logger.info(f"Retrieved {len(filtered_habits)} behavior habits for user: {user_id} (from cache)")
return success(data=filtered_habits, msg="行为习惯获取成功(缓存)")
except Exception as e: except Exception as e:
return handle_implicit_memory_error(e, "行为习惯获取", user_id) return handle_implicit_memory_error(e, "行为习惯获取", user_id)
@router.post("/generate_profile", response_model=ApiResponse)
@cur_workspace_access_guard()
async def generate_implicit_memory_profile(
request: GenerateProfileRequest,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
) -> ApiResponse:
"""
Generate complete user profile (all 4 modules) and cache it.
Args:
request: Generate profile request with end_user_id
db: Database session
current_user: Current authenticated user
Returns:
Complete user profile with all modules
"""
end_user_id = request.end_user_id
api_logger.info(f"Generate profile requested for user: {end_user_id}")
try:
# Validate inputs
validate_user_id(end_user_id)
# Create service with user-specific config
service = ImplicitMemoryService(db=db, end_user_id=end_user_id)
# Generate complete profile (calls LLM for all 4 modules)
api_logger.info(f"开始生成完整用户画像: user={end_user_id}")
profile_data = await service.generate_complete_profile(user_id=end_user_id)
# Save to cache
await service.save_profile_cache(
end_user_id=end_user_id,
profile_data=profile_data,
db=db,
expires_hours=168 # 7 days
)
api_logger.info(f"用户画像生成并缓存成功: user={end_user_id}")
# Add metadata
profile_data["end_user_id"] = end_user_id
profile_data["cached"] = False
return success(data=profile_data, msg="用户画像生成成功")
except Exception as e:
api_logger.error(f"生成用户画像失败: user={end_user_id}, error={str(e)}", exc_info=True)
return handle_implicit_memory_error(e, "用户画像生成", end_user_id)

View File

@@ -27,6 +27,8 @@ from .tool_model import (
ToolExecution, ToolType, ToolStatus, AuthType, ExecutionStatus ToolExecution, ToolType, ToolStatus, AuthType, ExecutionStatus
) )
from .memory_perceptual_model import MemoryPerceptualModel from .memory_perceptual_model import MemoryPerceptualModel
from .emotion_suggestions_cache_model import EmotionSuggestionsCache
from .implicit_memory_cache_model import ImplicitMemoryCache
__all__ = [ __all__ = [
"Tenants", "Tenants",
@@ -76,5 +78,7 @@ __all__ = [
"ToolStatus", "ToolStatus",
"AuthType", "AuthType",
"ExecutionStatus", "ExecutionStatus",
"MemoryPerceptualModel" "MemoryPerceptualModel",
"EmotionSuggestionsCache",
"ImplicitMemoryCache"
] ]

View File

@@ -0,0 +1,24 @@
"""情绪建议缓存模型"""
import uuid
import datetime
from sqlalchemy import Column, String, Text, Integer, DateTime, JSON
from sqlalchemy.dialects.postgresql import UUID
from app.db import Base
class EmotionSuggestionsCache(Base):
"""情绪建议缓存表
用于缓存个性化情绪建议,减少 LLM 调用成本,提升响应速度。
"""
__tablename__ = "emotion_suggestions_cache"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True)
end_user_id = Column(String(255), nullable=False, unique=True, index=True, comment="终端用户ID组ID")
health_summary = Column(Text, nullable=False, comment="健康状态摘要")
suggestions = Column(JSON, nullable=False, comment="建议列表JSON格式")
generated_at = Column(DateTime, nullable=False, default=datetime.datetime.now, comment="生成时间")
expires_at = Column(DateTime, nullable=True, comment="过期时间")
created_at = Column(DateTime, default=datetime.datetime.now)
updated_at = Column(DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now)

View File

@@ -0,0 +1,27 @@
"""隐性记忆缓存模型"""
import uuid
import datetime
from sqlalchemy import Column, String, Integer, DateTime, JSON
from sqlalchemy.dialects.postgresql import UUID
from app.db import Base
class ImplicitMemoryCache(Base):
"""隐性记忆缓存表
用于缓存用户的完整隐性记忆画像,包括偏好标签、四维画像、兴趣领域和行为习惯。
减少 LLM 调用成本,提升响应速度。
"""
__tablename__ = "implicit_memory_cache"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True)
end_user_id = Column(String(255), nullable=False, unique=True, index=True, comment="终端用户ID")
preferences = Column(JSON, nullable=False, comment="偏好标签列表JSON格式")
portrait = Column(JSON, nullable=False, comment="四维画像对象JSON格式")
interest_areas = Column(JSON, nullable=False, comment="兴趣领域分布对象JSON格式")
habits = Column(JSON, nullable=False, comment="行为习惯列表JSON格式")
generated_at = Column(DateTime, nullable=False, default=datetime.datetime.now, comment="生成时间")
expires_at = Column(DateTime, nullable=True, comment="过期时间")
created_at = Column(DateTime, default=datetime.datetime.now)
updated_at = Column(DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now)

View File

@@ -81,7 +81,7 @@ class DataConfigRepository:
n.description AS description, n.description AS description,
n.entity_type AS entity_type, n.entity_type AS entity_type,
n.name AS name, n.name AS name,
n.fact_summary AS fact_summary, COALESCE(n.fact_summary, '') AS fact_summary,
n.group_id AS group_id, n.group_id AS group_id,
n.apply_id AS apply_id, n.apply_id AS apply_id,
n.user_id AS user_id, n.user_id AS user_id,
@@ -115,7 +115,7 @@ class DataConfigRepository:
description: n.description, description: n.description,
entity_type: n.entity_type, entity_type: n.entity_type,
name: n.name, name: n.name,
fact_summary: n.fact_summary, fact_summary: COALESCE(n.fact_summary, ''),
id: n.id id: n.id
} AS sourceNode, } AS sourceNode,
{ {
@@ -132,7 +132,7 @@ class DataConfigRepository:
description: m.description, description: m.description,
entity_type: m.entity_type, entity_type: m.entity_type,
name: m.name, name: m.name,
fact_summary: m.fact_summary, fact_summary: COALESCE(m.fact_summary, ''),
id: m.id id: m.id
} AS targetNode } AS targetNode
""" """

View File

@@ -0,0 +1,163 @@
"""情绪建议缓存仓储层"""
from sqlalchemy.orm import Session
from typing import Optional, Dict, Any
import datetime
from app.models.emotion_suggestions_cache_model import EmotionSuggestionsCache
from app.core.logging_config import get_db_logger
# 获取数据库专用日志器
db_logger = get_db_logger()
class EmotionSuggestionsCacheRepository:
"""情绪建议缓存仓储类"""
def __init__(self, db: Session):
self.db = db
def get_by_end_user_id(self, end_user_id: str) -> Optional[EmotionSuggestionsCache]:
"""根据终端用户ID获取缓存
Args:
end_user_id: 终端用户ID组ID
Returns:
缓存记录,如果不存在返回 None
"""
try:
cache = (
self.db.query(EmotionSuggestionsCache)
.filter(EmotionSuggestionsCache.end_user_id == end_user_id)
.first()
)
if cache:
db_logger.info(f"成功获取用户 {end_user_id} 的情绪建议缓存")
else:
db_logger.info(f"用户 {end_user_id} 的情绪建议缓存不存在")
return cache
except Exception as e:
db_logger.error(f"获取用户 {end_user_id} 的情绪建议缓存失败: {str(e)}")
raise
def create_or_update(
self,
end_user_id: str,
health_summary: str,
suggestions: list,
expires_hours: int = 24
) -> EmotionSuggestionsCache:
"""创建或更新缓存
Args:
end_user_id: 终端用户ID组ID
health_summary: 健康状态摘要
suggestions: 建议列表
expires_hours: 过期时间小时默认24小时
Returns:
缓存记录
"""
try:
# 查找现有记录
cache = self.get_by_end_user_id(end_user_id)
now = datetime.datetime.now()
expires_at = now + datetime.timedelta(hours=expires_hours)
if cache:
# 更新现有记录
cache.health_summary = health_summary
cache.suggestions = suggestions
cache.generated_at = now
cache.expires_at = expires_at
cache.updated_at = now
db_logger.info(f"更新用户 {end_user_id} 的情绪建议缓存")
else:
# 创建新记录
cache = EmotionSuggestionsCache(
end_user_id=end_user_id,
health_summary=health_summary,
suggestions=suggestions,
generated_at=now,
expires_at=expires_at,
created_at=now,
updated_at=now
)
self.db.add(cache)
db_logger.info(f"创建用户 {end_user_id} 的情绪建议缓存")
self.db.commit()
self.db.refresh(cache)
return cache
except Exception as e:
self.db.rollback()
db_logger.error(f"创建或更新用户 {end_user_id} 的情绪建议缓存失败: {str(e)}")
raise
def delete_by_end_user_id(self, end_user_id: str) -> bool:
"""删除缓存
Args:
end_user_id: 终端用户ID组ID
Returns:
是否删除成功
"""
try:
cache = self.get_by_end_user_id(end_user_id)
if cache:
self.db.delete(cache)
self.db.commit()
db_logger.info(f"删除用户 {end_user_id} 的情绪建议缓存")
return True
return False
except Exception as e:
self.db.rollback()
db_logger.error(f"删除用户 {end_user_id} 的情绪建议缓存失败: {str(e)}")
raise
@staticmethod
def is_expired(cache: EmotionSuggestionsCache) -> bool:
"""检查缓存是否过期
Args:
cache: 缓存记录
Returns:
是否过期
"""
if cache.expires_at is None:
return False
return datetime.datetime.now() > cache.expires_at
# 便捷函数
def get_cache_by_end_user_id(db: Session, end_user_id: str) -> Optional[EmotionSuggestionsCache]:
"""根据终端用户ID获取缓存"""
repo = EmotionSuggestionsCacheRepository(db)
return repo.get_by_end_user_id(end_user_id)
def create_or_update_cache(
db: Session,
end_user_id: str,
health_summary: str,
suggestions: list,
expires_hours: int = 24
) -> EmotionSuggestionsCache:
"""创建或更新缓存"""
repo = EmotionSuggestionsCacheRepository(db)
return repo.create_or_update(end_user_id, health_summary, suggestions, expires_hours)
def delete_cache_by_end_user_id(db: Session, end_user_id: str) -> bool:
"""删除缓存"""
repo = EmotionSuggestionsCacheRepository(db)
return repo.delete_by_end_user_id(end_user_id)
def is_cache_expired(cache: EmotionSuggestionsCache) -> bool:
"""检查缓存是否过期"""
return EmotionSuggestionsCacheRepository.is_expired(cache)

View File

@@ -0,0 +1,175 @@
"""隐性记忆缓存仓储层"""
from sqlalchemy.orm import Session
from typing import Optional, Dict, Any
import datetime
from app.models.implicit_memory_cache_model import ImplicitMemoryCache
from app.core.logging_config import get_db_logger
# 获取数据库专用日志器
db_logger = get_db_logger()
class ImplicitMemoryCacheRepository:
"""隐性记忆缓存仓储类"""
def __init__(self, db: Session):
self.db = db
def get_by_end_user_id(self, end_user_id: str) -> Optional[ImplicitMemoryCache]:
"""根据终端用户ID获取缓存
Args:
end_user_id: 终端用户ID
Returns:
缓存记录,如果不存在返回 None
"""
try:
cache = (
self.db.query(ImplicitMemoryCache)
.filter(ImplicitMemoryCache.end_user_id == end_user_id)
.first()
)
if cache:
db_logger.info(f"成功获取用户 {end_user_id} 的隐性记忆缓存")
else:
db_logger.info(f"用户 {end_user_id} 的隐性记忆缓存不存在")
return cache
except Exception as e:
db_logger.error(f"获取用户 {end_user_id} 的隐性记忆缓存失败: {str(e)}")
raise
def create_or_update(
self,
end_user_id: str,
preferences: list,
portrait: dict,
interest_areas: dict,
habits: list,
expires_hours: int = 168 # 默认7天
) -> ImplicitMemoryCache:
"""创建或更新缓存
Args:
end_user_id: 终端用户ID
preferences: 偏好标签列表
portrait: 四维画像对象
interest_areas: 兴趣领域分布对象
habits: 行为习惯列表
expires_hours: 过期时间小时默认168小时7天
Returns:
缓存记录
"""
try:
# 查找现有记录
cache = self.get_by_end_user_id(end_user_id)
now = datetime.datetime.now()
expires_at = now + datetime.timedelta(hours=expires_hours)
if cache:
# 更新现有记录
cache.preferences = preferences
cache.portrait = portrait
cache.interest_areas = interest_areas
cache.habits = habits
cache.generated_at = now
cache.expires_at = expires_at
cache.updated_at = now
db_logger.info(f"更新用户 {end_user_id} 的隐性记忆缓存")
else:
# 创建新记录
cache = ImplicitMemoryCache(
end_user_id=end_user_id,
preferences=preferences,
portrait=portrait,
interest_areas=interest_areas,
habits=habits,
generated_at=now,
expires_at=expires_at,
created_at=now,
updated_at=now
)
self.db.add(cache)
db_logger.info(f"创建用户 {end_user_id} 的隐性记忆缓存")
self.db.commit()
self.db.refresh(cache)
return cache
except Exception as e:
self.db.rollback()
db_logger.error(f"创建或更新用户 {end_user_id} 的隐性记忆缓存失败: {str(e)}")
raise
def delete_by_end_user_id(self, end_user_id: str) -> bool:
"""删除缓存
Args:
end_user_id: 终端用户ID
Returns:
是否删除成功
"""
try:
cache = self.get_by_end_user_id(end_user_id)
if cache:
self.db.delete(cache)
self.db.commit()
db_logger.info(f"删除用户 {end_user_id} 的隐性记忆缓存")
return True
return False
except Exception as e:
self.db.rollback()
db_logger.error(f"删除用户 {end_user_id} 的隐性记忆缓存失败: {str(e)}")
raise
@staticmethod
def is_expired(cache: ImplicitMemoryCache) -> bool:
"""检查缓存是否过期
Args:
cache: 缓存记录
Returns:
是否过期
"""
if cache.expires_at is None:
return False
return datetime.datetime.now() > cache.expires_at
# 便捷函数
def get_cache_by_end_user_id(db: Session, end_user_id: str) -> Optional[ImplicitMemoryCache]:
"""根据终端用户ID获取缓存"""
repo = ImplicitMemoryCacheRepository(db)
return repo.get_by_end_user_id(end_user_id)
def create_or_update_cache(
db: Session,
end_user_id: str,
preferences: list,
portrait: dict,
interest_areas: dict,
habits: list,
expires_hours: int = 168
) -> ImplicitMemoryCache:
"""创建或更新缓存"""
repo = ImplicitMemoryCacheRepository(db)
return repo.create_or_update(
end_user_id, preferences, portrait, interest_areas, habits, expires_hours
)
def delete_cache_by_end_user_id(db: Session, end_user_id: str) -> bool:
"""删除缓存"""
repo = ImplicitMemoryCacheRepository(db)
return repo.delete_by_end_user_id(end_user_id)
def is_cache_expired(cache: ImplicitMemoryCache) -> bool:
"""检查缓存是否过期"""
return ImplicitMemoryCacheRepository.is_expired(cache)

View File

@@ -332,7 +332,7 @@ RETURN e.id AS id,
e.description AS description, e.description AS description,
e.aliases AS aliases, e.aliases AS aliases,
e.name_embedding AS name_embedding, e.name_embedding AS name_embedding,
e.fact_summary AS fact_summary, COALESCE(e.fact_summary, '') AS fact_summary,
e.connect_strength AS connect_strength, e.connect_strength AS connect_strength,
collect(DISTINCT s.id) AS statement_ids, collect(DISTINCT s.id) AS statement_ids,
collect(DISTINCT c.id) AS chunk_ids, collect(DISTINCT c.id) AS chunk_ids,

View File

@@ -30,3 +30,9 @@ class EmotionSuggestionsRequest(BaseModel):
"""获取个性化情绪建议请求""" """获取个性化情绪建议请求"""
group_id: str = Field(..., description="组ID") group_id: str = Field(..., description="组ID")
config_id: Optional[int] = Field(None, description="配置ID用于指定LLM模型") config_id: Optional[int] = Field(None, description="配置ID用于指定LLM模型")
class EmotionGenerateSuggestionsRequest(BaseModel):
"""生成个性化情绪建议请求"""
group_id: str = Field(..., description="组ID")
config_id: Optional[int] = Field(None, description="配置ID用于指定LLM模型")

View File

@@ -262,3 +262,25 @@ InterestCategory = InterestCategoryResponse
InterestAreaDistribution = InterestAreaDistributionResponse InterestAreaDistribution = InterestAreaDistributionResponse
BehaviorHabit = BehaviorHabitResponse BehaviorHabit = BehaviorHabitResponse
UserProfile = UserProfileResponse UserProfile = UserProfileResponse
# Cache-related Schemas
class GenerateProfileRequest(BaseModel):
"""生成完整用户画像请求"""
end_user_id: str = Field(..., description="终端用户ID")
class CompleteProfileResponse(BaseModel):
"""完整用户画像响应(包含所有模块)"""
user_id: str
preferences: List[PreferenceTagResponse]
portrait: DimensionPortraitResponse
interest_areas: InterestAreaDistributionResponse
habits: List[BehaviorHabitResponse]
generated_at: datetime.datetime
cached: bool = Field(False, description="是否来自缓存")
@field_serializer("generated_at", when_used="json")
def _serialize_generated_at(self, dt: datetime.datetime):
return int(dt.timestamp() * 1000) if dt else None

View File

@@ -705,3 +705,85 @@ class EmotionAnalyticsService:
health_summary=summary, health_summary=summary,
suggestions=suggestions suggestions=suggestions
) )
async def get_cached_suggestions(
self,
end_user_id: str,
db: Session,
) -> Optional[Dict[str, Any]]:
"""从缓存获取个性化情绪建议
Args:
end_user_id: 宿主ID用户组ID
db: 数据库会话
Returns:
Dict: 缓存的建议数据,如果不存在或已过期返回 None
"""
try:
from app.repositories.emotion_suggestions_cache_repository import (
EmotionSuggestionsCacheRepository,
)
logger.info(f"尝试从缓存获取情绪建议: user={end_user_id}")
cache_repo = EmotionSuggestionsCacheRepository(db)
cache = cache_repo.get_by_end_user_id(end_user_id)
if cache is None:
logger.info(f"用户 {end_user_id} 的建议缓存不存在")
return None
# 检查是否过期
if cache_repo.is_expired(cache):
logger.info(f"用户 {end_user_id} 的建议缓存已过期")
return None
logger.info(f"成功从缓存获取建议: user={end_user_id}")
return {
"health_summary": cache.health_summary,
"suggestions": cache.suggestions,
"generated_at": cache.generated_at.isoformat(),
"cached": True
}
except Exception as e:
logger.error(f"从缓存获取建议失败: {str(e)}", exc_info=True)
return None
async def save_suggestions_cache(
self,
end_user_id: str,
suggestions_data: Dict[str, Any],
db: Session,
expires_hours: int = 24
) -> None:
"""保存建议到缓存
Args:
end_user_id: 宿主ID用户组ID
suggestions_data: 建议数据
db: 数据库会话
expires_hours: 过期时间(小时)
"""
try:
from app.repositories.emotion_suggestions_cache_repository import (
EmotionSuggestionsCacheRepository,
)
logger.info(f"保存建议到缓存: user={end_user_id}")
cache_repo = EmotionSuggestionsCacheRepository(db)
cache_repo.create_or_update(
end_user_id=end_user_id,
health_summary=suggestions_data["health_summary"],
suggestions=suggestions_data["suggestions"],
expires_hours=expires_hours
)
logger.info(f"建议缓存保存成功: user={end_user_id}")
except Exception as e:
logger.error(f"保存建议缓存失败: {str(e)}", exc_info=True)
# 不抛出异常,缓存失败不应影响主流程

View File

@@ -7,6 +7,7 @@ user profiles from memory summaries.
""" """
import logging import logging
import asyncio
from datetime import datetime from datetime import datetime
from typing import List, Optional from typing import List, Optional
@@ -372,4 +373,129 @@ class ImplicitMemoryService:
except Exception as e: except Exception as e:
logger.error(f"Failed to get behavior habits for user {user_id}: {e}") logger.error(f"Failed to get behavior habits for user {user_id}: {e}")
raise raise
async def generate_complete_profile(
self,
user_id: str
) -> dict:
"""生成完整的用户画像包含所有4个模块
Args:
user_id: 用户ID
Returns:
Dict: 包含所有模块的完整画像数据
"""
logger.info(f"生成完整用户画像: user={user_id}")
try:
# 并行调用4个分析方法
preferences, portrait, interest_areas, habits = await asyncio.gather(
self.get_preference_tags(user_id=user_id),
self.get_dimension_portrait(user_id=user_id),
self.get_interest_area_distribution(user_id=user_id),
self.get_behavior_habits(user_id=user_id)
)
# 转换为可序列化的格式
profile_data = {
"preferences": [tag.model_dump(mode='json') for tag in preferences],
"portrait": portrait.model_dump(mode='json'),
"interest_areas": interest_areas.model_dump(mode='json'),
"habits": [habit.model_dump(mode='json') for habit in habits]
}
logger.info(f"完整用户画像生成完成: user={user_id}")
return profile_data
except Exception as e:
logger.error(f"生成完整用户画像失败: {str(e)}", exc_info=True)
raise
async def get_cached_profile(
self,
end_user_id: str,
db: Session
) -> Optional[dict]:
"""从缓存获取完整用户画像
Args:
end_user_id: 终端用户ID
db: 数据库会话
Returns:
Dict: 缓存的画像数据,如果不存在或已过期返回 None
"""
try:
from app.repositories.implicit_memory_cache_repository import (
ImplicitMemoryCacheRepository,
)
logger.info(f"尝试从缓存获取用户画像: user={end_user_id}")
cache_repo = ImplicitMemoryCacheRepository(db)
cache = cache_repo.get_by_end_user_id(end_user_id)
if cache is None:
logger.info(f"用户 {end_user_id} 的画像缓存不存在")
return None
# 检查是否过期
if cache_repo.is_expired(cache):
logger.info(f"用户 {end_user_id} 的画像缓存已过期")
return None
logger.info(f"成功从缓存获取用户画像: user={end_user_id}")
return {
"end_user_id": cache.end_user_id,
"preferences": cache.preferences,
"portrait": cache.portrait,
"interest_areas": cache.interest_areas,
"habits": cache.habits,
"generated_at": cache.generated_at.isoformat(),
"cached": True
}
except Exception as e:
logger.error(f"从缓存获取用户画像失败: {str(e)}", exc_info=True)
return None
async def save_profile_cache(
self,
end_user_id: str,
profile_data: dict,
db: Session,
expires_hours: int = 168 # 默认7天
) -> None:
"""保存用户画像到缓存
Args:
end_user_id: 终端用户ID
profile_data: 画像数据
db: 数据库会话
expires_hours: 过期时间小时默认168小时7天
"""
try:
from app.repositories.implicit_memory_cache_repository import (
ImplicitMemoryCacheRepository,
)
logger.info(f"保存用户画像到缓存: user={end_user_id}")
cache_repo = ImplicitMemoryCacheRepository(db)
cache_repo.create_or_update(
end_user_id=end_user_id,
preferences=profile_data["preferences"],
portrait=profile_data["portrait"],
interest_areas=profile_data["interest_areas"],
habits=profile_data["habits"],
expires_hours=expires_hours
)
logger.info(f"用户画像缓存保存成功: user={end_user_id}")
except Exception as e:
logger.error(f"保存用户画像缓存失败: {str(e)}", exc_info=True)
# 不抛出异常,缓存失败不应影响主流程