Merge branch 'release/v0.2.2'

# Conflicts:
#	api/app/repositories/memory_config_repository.py
#	api/app/services/emotion_analytics_service.py
#	api/app/utils/config_utils.py
This commit is contained in:
Mark
2026-01-31 15:55:58 +08:00
3 changed files with 202 additions and 195 deletions

View File

@@ -32,6 +32,8 @@ db_logger = get_db_logger()
config_logger = get_config_logger() config_logger = get_config_logger()
TABLE_NAME = "memory_config" TABLE_NAME = "memory_config"
class MemoryConfigRepository: class MemoryConfigRepository:
"""记忆配置Repository """记忆配置Repository
@@ -189,7 +191,6 @@ class MemoryConfigRepository:
raise RuntimeError("reflection config not found") raise RuntimeError("reflection config not found")
return memory_config return memory_config
@staticmethod @staticmethod
def build_select_all(workspace_id: uuid.UUID) -> Tuple[str, Dict]: def build_select_all(workspace_id: uuid.UUID) -> Tuple[str, Dict]:
"""构建查询所有配置的语句SQLAlchemy text() 命名参数) """构建查询所有配置的语句SQLAlchemy text() 命名参数)
@@ -289,7 +290,6 @@ class MemoryConfigRepository:
db_logger.error(f"更新记忆配置失败: config_id={update.config_id} - {str(e)}") db_logger.error(f"更新记忆配置失败: config_id={update.config_id} - {str(e)}")
raise raise
@staticmethod @staticmethod
def update_extracted(db: Session, update: ConfigUpdateExtracted) -> Optional[MemoryConfig]: def update_extracted(db: Session, update: ConfigUpdateExtracted) -> Optional[MemoryConfig]:
"""更新记忆萃取引擎配置 """更新记忆萃取引擎配置
@@ -412,7 +412,7 @@ class MemoryConfigRepository:
raise raise
@staticmethod @staticmethod
def get_extracted_config(db: Session, config_id: UUID |int) -> Optional[Dict]: def get_extracted_config(db: Session, config_id: UUID | int) -> Optional[Dict]:
"""获取萃取配置,通过主键查询某条配置 """获取萃取配置,通过主键查询某条配置
Args: Args:
@@ -422,7 +422,7 @@ class MemoryConfigRepository:
Returns: Returns:
Optional[Dict]: 萃取配置字典不存在则返回None Optional[Dict]: 萃取配置字典不存在则返回None
""" """
config_id=resolve_config_id(config_id,db) config_id = resolve_config_id(config_id, db)
db_logger.debug(f"查询萃取配置: config_id={config_id}") db_logger.debug(f"查询萃取配置: config_id={config_id}")
try: try:
db_config = db.query(MemoryConfig).filter(MemoryConfig.config_id == config_id).first() db_config = db.query(MemoryConfig).filter(MemoryConfig.config_id == config_id).first()
@@ -516,26 +516,27 @@ class MemoryConfigRepository:
except Exception as e: except Exception as e:
db_logger.error(f"根据ID查询记忆配置失败: config_id={config_id} - {str(e)}") db_logger.error(f"根据ID查询记忆配置失败: config_id={config_id} - {str(e)}")
raise raise
@staticmethod @staticmethod
def get_config_with_workspace(db: Session, config_id: uuid.UUID) -> Optional[tuple]: def get_config_with_workspace(db: Session, config_id: uuid.UUID | int | str) -> Optional[tuple]:
"""Get memory config and its associated workspace information """Get memory config and its associated workspace information
Args: Args:
db: Database session db: Database session
config_id: Configuration ID config_id: Configuration ID
Returns: Returns:
Optional[tuple]: (MemoryConfig, Workspace) tuple, None if not found Optional[tuple]: (MemoryConfig, Workspace) tuple, None if not found
Raises: Raises:
ValueError: Raised when config exists but workspace doesn't ValueError: Raised when config exists but workspace doesn't
""" """
import time import time
from app.models.workspace_model import Workspace from app.models.workspace_model import Workspace
start_time = time.time() start_time = time.time()
config_id = resolve_config_id(config_id, db)
# Log configuration loading start # Log configuration loading start
config_logger.info( config_logger.info(
"Loading configuration with workspace", "Loading configuration with workspace",
@@ -544,17 +545,16 @@ class MemoryConfigRepository:
"config_id": config_id "config_id": config_id
} }
) )
db_logger.debug(f"Querying memory config and workspace: config_id={config_id}") db_logger.debug(f"Querying memory config and workspace: config_id={config_id}")
try: try:
# Use join query to get both config and workspace # Use join query to get both config and workspace
result = db.query(MemoryConfig, Workspace).join( result = db.query(MemoryConfig, Workspace).join(
Workspace, MemoryConfig.workspace_id == Workspace.id Workspace, MemoryConfig.workspace_id == Workspace.id
).filter(MemoryConfig.config_id == config_id).first() ).filter(MemoryConfig.config_id == config_id).first()
elapsed_ms = (time.time() - start_time) * 1000 elapsed_ms = (time.time() - start_time) * 1000
if not result: if not result:
# Check if config exists but workspace is missing # Check if config exists but workspace is missing
config_only = db.query(MemoryConfig).filter(MemoryConfig.config_id == config_id).first() config_only = db.query(MemoryConfig).filter(MemoryConfig.config_id == config_id).first()
@@ -583,9 +583,11 @@ class MemoryConfigRepository:
"elapsed_ms": elapsed_ms "elapsed_ms": elapsed_ms
} }
) )
db_logger.error(f"Memory config {config_id} references non-existent workspace {config_only.workspace_id}") db_logger.error(
raise ValueError(f"Workspace {config_only.workspace_id} not found for configuration {config_id}") f"Memory config {config_id} references non-existent workspace {config_only.workspace_id}")
raise ValueError(
f"Workspace {config_only.workspace_id} not found for configuration {config_id}")
config_logger.debug( config_logger.debug(
"Configuration not found", "Configuration not found",
extra={ extra={
@@ -597,9 +599,9 @@ class MemoryConfigRepository:
) )
db_logger.debug(f"Memory config not found: config_id={config_id}") db_logger.debug(f"Memory config not found: config_id={config_id}")
return None return None
config, workspace = result config, workspace = result
# Log successful configuration loading # Log successful configuration loading
config_logger.info( config_logger.info(
"Configuration with workspace loaded successfully", "Configuration with workspace loaded successfully",
@@ -614,16 +616,17 @@ class MemoryConfigRepository:
"elapsed_ms": elapsed_ms "elapsed_ms": elapsed_ms
} }
) )
db_logger.debug(f"Memory config and workspace query successful: config={config.config_name}, workspace={workspace.name}") db_logger.debug(
f"Memory config and workspace query successful: config={config.config_name}, workspace={workspace.name}")
return (config, workspace) return (config, workspace)
except ValueError: except ValueError:
# Re-raise known business exceptions # Re-raise known business exceptions
raise raise
except Exception as e: except Exception as e:
elapsed_ms = (time.time() - start_time) * 1000 elapsed_ms = (time.time() - start_time) * 1000
config_logger.error( config_logger.error(
"Failed to load configuration with workspace", "Failed to load configuration with workspace",
extra={ extra={
@@ -636,9 +639,9 @@ class MemoryConfigRepository:
}, },
exc_info=True exc_info=True
) )
db_logger.error(f"Failed to query memory config and workspace: config_id={config_id} - {str(e)}") db_logger.error(f"Failed to query memory config and workspace: config_id={config_id} - {str(e)}")
raise raise
@staticmethod @staticmethod
def get_all(db: Session, workspace_id: Optional[uuid.UUID] = None) -> List[MemoryConfig]: def get_all(db: Session, workspace_id: Optional[uuid.UUID] = None) -> List[MemoryConfig]:
"""获取所有配置参数 """获取所有配置参数

View File

@@ -17,12 +17,15 @@ from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from app.utils.config_utils import resolve_config_id
logger = get_business_logger() logger = get_business_logger()
class EmotionSuggestion(BaseModel): class EmotionSuggestion(BaseModel):
"""情绪建议模型""" """情绪建议模型"""
type: str = Field(..., description="建议类型emotion_balance/activity_recommendation/social_connection/stress_management") type: str = Field(...,
description="建议类型emotion_balance/activity_recommendation/social_connection/stress_management")
title: str = Field(..., description="建议标题") title: str = Field(..., description="建议标题")
content: str = Field(..., description="建议内容") content: str = Field(..., description="建议内容")
priority: str = Field(..., description="优先级high/medium/low") priority: str = Field(..., description="优先级high/medium/low")
@@ -37,33 +40,33 @@ class EmotionSuggestionsResponse(BaseModel):
class EmotionAnalyticsService: class EmotionAnalyticsService:
"""情绪分析服务 """情绪分析服务
提供情绪数据的分析和统计功能,包括: 提供情绪数据的分析和统计功能,包括:
- 情绪标签统计 - 情绪标签统计
- 情绪词云数据 - 情绪词云数据
- 情绪健康指数计算 - 情绪健康指数计算
- 个性化情绪建议生成 - 个性化情绪建议生成
Attributes: Attributes:
emotion_repo: 情绪数据仓储实例 emotion_repo: 情绪数据仓储实例
""" """
def __init__(self): def __init__(self):
"""初始化情绪分析服务""" """初始化情绪分析服务"""
connector = Neo4jConnector() connector = Neo4jConnector()
self.emotion_repo = EmotionRepository(connector) self.emotion_repo = EmotionRepository(connector)
logger.info("情绪分析服务初始化完成") logger.info("情绪分析服务初始化完成")
async def get_emotion_tags( async def get_emotion_tags(
self, self,
end_user_id: str, end_user_id: str,
emotion_type: Optional[str] = None, emotion_type: Optional[str] = None,
start_date: Optional[str] = None, start_date: Optional[str] = None,
end_date: Optional[str] = None, end_date: Optional[str] = None,
limit: int = 10 limit: int = 10
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""获取情绪标签统计 """获取情绪标签统计
查询指定用户的情绪类型分布,包括计数、百分比和平均强度。 查询指定用户的情绪类型分布,包括计数、百分比和平均强度。
确保返回所有6个情绪维度joy、sadness、anger、fear、surprise、neutral 确保返回所有6个情绪维度joy、sadness、anger、fear、surprise、neutral
即使某些维度没有数据也会返回count=0的记录。 即使某些维度没有数据也会返回count=0的记录。
@@ -71,8 +74,8 @@ class EmotionAnalyticsService:
""" """
try: try:
logger.info(f"获取情绪标签统计: user={end_user_id}, type={emotion_type}, " logger.info(f"获取情绪标签统计: user={end_user_id}, type={emotion_type}, "
f"start={start_date}, end={end_date}, limit={limit}") f"start={start_date}, end={end_date}, limit={limit}")
# 调用仓储层查询 # 调用仓储层查询
tags = await self.emotion_repo.get_emotion_tags( tags = await self.emotion_repo.get_emotion_tags(
end_user_id=end_user_id, end_user_id=end_user_id,
@@ -81,13 +84,13 @@ class EmotionAnalyticsService:
end_date=end_date, end_date=end_date,
limit=limit limit=limit
) )
# 定义所有6个情绪维度 # 定义所有6个情绪维度
all_emotion_types = ['joy', 'sadness', 'anger', 'fear', 'surprise', 'neutral'] all_emotion_types = ['joy', 'sadness', 'anger', 'fear', 'surprise', 'neutral']
# 将查询结果转换为字典,方便查找 # 将查询结果转换为字典,方便查找
tags_dict = {tag["emotion_type"]: tag for tag in tags} tags_dict = {tag["emotion_type"]: tag for tag in tags}
# 补全缺失的情绪维度 # 补全缺失的情绪维度
complete_tags = [] complete_tags = []
for emotion in all_emotion_types: for emotion in all_emotion_types:
@@ -101,52 +104,52 @@ class EmotionAnalyticsService:
"percentage": 0.0, "percentage": 0.0,
"avg_intensity": 0.0 "avg_intensity": 0.0
}) })
# 计算总数 # 计算总数
total_count = sum(tag["count"] for tag in complete_tags) total_count = sum(tag["count"] for tag in complete_tags)
# 如果有数据重新计算百分比因为补全了0值项 # 如果有数据重新计算百分比因为补全了0值项
if total_count > 0: if total_count > 0:
for tag in complete_tags: for tag in complete_tags:
if tag["count"] > 0: if tag["count"] > 0:
tag["percentage"] = round((tag["count"] / total_count) * 100, 2) tag["percentage"] = round((tag["count"] / total_count) * 100, 2)
# 构建时间范围信息 # 构建时间范围信息
time_range = {} time_range = {}
if start_date: if start_date:
time_range["start_date"] = start_date time_range["start_date"] = start_date
if end_date: if end_date:
time_range["end_date"] = end_date time_range["end_date"] = end_date
# 格式化响应 # 格式化响应
response = { response = {
"tags": complete_tags, "tags": complete_tags,
"total_count": total_count, "total_count": total_count,
"time_range": time_range if time_range else None "time_range": time_range if time_range else None
} }
logger.info(f"情绪标签统计完成: total_count={total_count}, tags_count={len(complete_tags)}") logger.info(f"情绪标签统计完成: total_count={total_count}, tags_count={len(complete_tags)}")
return response return response
except Exception as e: except Exception as e:
logger.error(f"获取情绪标签统计失败: {str(e)}", exc_info=True) logger.error(f"获取情绪标签统计失败: {str(e)}", exc_info=True)
raise raise
async def get_emotion_wordcloud( async def get_emotion_wordcloud(
self, self,
end_user_id: str, end_user_id: str,
emotion_type: Optional[str] = None, emotion_type: Optional[str] = None,
limit: int = 50 limit: int = 50
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""获取情绪词云数据 """获取情绪词云数据
查询情绪关键词及其频率,用于生成词云可视化。 查询情绪关键词及其频率,用于生成词云可视化。
Args: Args:
end_user_id: 宿主ID用户组ID end_user_id: 宿主ID用户组ID
emotion_type: 可选的情绪类型过滤 emotion_type: 可选的情绪类型过滤
limit: 返回关键词的最大数量 limit: 返回关键词的最大数量
Returns: Returns:
Dict: 包含情绪词云数据的响应: Dict: 包含情绪词云数据的响应:
- keywords: 关键词列表 - keywords: 关键词列表
@@ -154,39 +157,39 @@ class EmotionAnalyticsService:
""" """
try: try:
logger.info(f"获取情绪词云数据: user={end_user_id}, type={emotion_type}, limit={limit}") logger.info(f"获取情绪词云数据: user={end_user_id}, type={emotion_type}, limit={limit}")
# 调用仓储层查询 # 调用仓储层查询
keywords = await self.emotion_repo.get_emotion_wordcloud( keywords = await self.emotion_repo.get_emotion_wordcloud(
end_user_id=end_user_id, end_user_id=end_user_id,
emotion_type=emotion_type, emotion_type=emotion_type,
limit=limit limit=limit
) )
# 计算总关键词数量 # 计算总关键词数量
total_keywords = len(keywords) total_keywords = len(keywords)
# 格式化响应 # 格式化响应
response = { response = {
"keywords": keywords, "keywords": keywords,
"total_keywords": total_keywords "total_keywords": total_keywords
} }
logger.info(f"情绪词云数据获取完成: total_keywords={total_keywords}") logger.info(f"情绪词云数据获取完成: total_keywords={total_keywords}")
return response return response
except Exception as e: except Exception as e:
logger.error(f"获取情绪词云数据失败: {str(e)}", exc_info=True) logger.error(f"获取情绪词云数据失败: {str(e)}", exc_info=True)
raise raise
def _calculate_positivity_rate(self, emotions: List[Dict[str, Any]]) -> Dict[str, Any]: def _calculate_positivity_rate(self, emotions: List[Dict[str, Any]]) -> Dict[str, Any]:
"""计算积极率 """计算积极率
根据情绪类型分类正面、负面和中性情绪,计算积极率。 根据情绪类型分类正面、负面和中性情绪,计算积极率。
公式:(正面数 / (正面数 + 负面数)) * 100 公式:(正面数 / (正面数 + 负面数)) * 100
Args: Args:
emotions: 情绪数据列表,每个包含 emotion_type 字段 emotions: 情绪数据列表,每个包含 emotion_type 字段
Returns: Returns:
Dict: 包含积极率计算结果: Dict: 包含积极率计算结果:
- score: 积极率分数0-100 - score: 积极率分数0-100
@@ -197,38 +200,38 @@ class EmotionAnalyticsService:
# 定义情绪分类 # 定义情绪分类
positive_emotions = {'joy', 'surprise'} positive_emotions = {'joy', 'surprise'}
negative_emotions = {'sadness', 'anger', 'fear'} negative_emotions = {'sadness', 'anger', 'fear'}
# 统计各类情绪数量 # 统计各类情绪数量
positive_count = sum(1 for e in emotions if e.get('emotion_type') in positive_emotions) positive_count = sum(1 for e in emotions if e.get('emotion_type') in positive_emotions)
negative_count = sum(1 for e in emotions if e.get('emotion_type') in negative_emotions) negative_count = sum(1 for e in emotions if e.get('emotion_type') in negative_emotions)
neutral_count = sum(1 for e in emotions if e.get('emotion_type') == 'neutral') neutral_count = sum(1 for e in emotions if e.get('emotion_type') == 'neutral')
# 计算积极率 # 计算积极率
total_non_neutral = positive_count + negative_count total_non_neutral = positive_count + negative_count
if total_non_neutral > 0: if total_non_neutral > 0:
score = (positive_count / total_non_neutral) * 100 score = (positive_count / total_non_neutral) * 100
else: else:
score = 50.0 # 如果没有非中性情绪默认为50 score = 50.0 # 如果没有非中性情绪默认为50
logger.debug(f"积极率计算: positive={positive_count}, negative={negative_count}, " logger.debug(f"积极率计算: positive={positive_count}, negative={negative_count}, "
f"neutral={neutral_count}, score={score:.2f}") f"neutral={neutral_count}, score={score:.2f}")
return { return {
"score": round(score, 2), "score": round(score, 2),
"positive_count": positive_count, "positive_count": positive_count,
"negative_count": negative_count, "negative_count": negative_count,
"neutral_count": neutral_count "neutral_count": neutral_count
} }
def _calculate_stability(self, emotions: List[Dict[str, Any]]) -> Dict[str, Any]: def _calculate_stability(self, emotions: List[Dict[str, Any]]) -> Dict[str, Any]:
"""计算稳定性 """计算稳定性
基于情绪强度的标准差计算情绪稳定性。 基于情绪强度的标准差计算情绪稳定性。
公式:(1 - min(std_deviation, 1.0)) * 100 公式:(1 - min(std_deviation, 1.0)) * 100
Args: Args:
emotions: 情绪数据列表,每个包含 emotion_intensity 字段 emotions: 情绪数据列表,每个包含 emotion_intensity 字段
Returns: Returns:
Dict: 包含稳定性计算结果: Dict: 包含稳定性计算结果:
- score: 稳定性分数0-100 - score: 稳定性分数0-100
@@ -236,7 +239,7 @@ class EmotionAnalyticsService:
""" """
# 提取所有情绪强度 # 提取所有情绪强度
intensities = [e.get('emotion_intensity', 0.0) for e in emotions if e.get('emotion_intensity') is not None] intensities = [e.get('emotion_intensity', 0.0) for e in emotions if e.get('emotion_intensity') is not None]
# 计算标准差 # 计算标准差
if len(intensities) >= 2: if len(intensities) >= 2:
std_deviation = statistics.stdev(intensities) std_deviation = statistics.stdev(intensities)
@@ -244,29 +247,29 @@ class EmotionAnalyticsService:
std_deviation = 0.0 # 只有一个数据点标准差为0 std_deviation = 0.0 # 只有一个数据点标准差为0
else: else:
std_deviation = 0.0 # 没有数据标准差为0 std_deviation = 0.0 # 没有数据标准差为0
# 计算稳定性分数 # 计算稳定性分数
# 标准差越小,稳定性越高 # 标准差越小,稳定性越高
score = (1 - min(std_deviation, 1.0)) * 100 score = (1 - min(std_deviation, 1.0)) * 100
logger.debug(f"稳定性计算: intensities_count={len(intensities)}, " logger.debug(f"稳定性计算: intensities_count={len(intensities)}, "
f"std_deviation={std_deviation:.3f}, score={score:.2f}") f"std_deviation={std_deviation:.3f}, score={score:.2f}")
return { return {
"score": round(score, 2), "score": round(score, 2),
"std_deviation": round(std_deviation, 3) "std_deviation": round(std_deviation, 3)
} }
def _calculate_resilience(self, emotions: List[Dict[str, Any]]) -> Dict[str, Any]: def _calculate_resilience(self, emotions: List[Dict[str, Any]]) -> Dict[str, Any]:
"""计算恢复力 """计算恢复力
分析情绪转换模式,统计从负面情绪恢复到正面情绪的能力。 分析情绪转换模式,统计从负面情绪恢复到正面情绪的能力。
公式:(负面到正面转换次数 / 总负面情绪数) * 100 公式:(负面到正面转换次数 / 总负面情绪数) * 100
Args: Args:
emotions: 情绪数据列表,每个包含 emotion_type 和 created_at 字段 emotions: 情绪数据列表,每个包含 emotion_type 和 created_at 字段
应该按时间顺序排列 应该按时间顺序排列
Returns: Returns:
Dict: 包含恢复力计算结果: Dict: 包含恢复力计算结果:
- score: 恢复力分数0-100 - score: 恢复力分数0-100
@@ -275,24 +278,24 @@ class EmotionAnalyticsService:
# 定义情绪分类 # 定义情绪分类
positive_emotions = {'joy', 'surprise'} positive_emotions = {'joy', 'surprise'}
negative_emotions = {'sadness', 'anger', 'fear'} negative_emotions = {'sadness', 'anger', 'fear'}
# 统计负面到正面的转换次数 # 统计负面到正面的转换次数
recovery_count = 0 recovery_count = 0
negative_count = 0 negative_count = 0
for i in range(len(emotions)): for i in range(len(emotions)):
current_emotion = emotions[i].get('emotion_type') current_emotion = emotions[i].get('emotion_type')
# 统计负面情绪总数 # 统计负面情绪总数
if current_emotion in negative_emotions: if current_emotion in negative_emotions:
negative_count += 1 negative_count += 1
# 检查下一个情绪是否为正面 # 检查下一个情绪是否为正面
if i + 1 < len(emotions): if i + 1 < len(emotions):
next_emotion = emotions[i + 1].get('emotion_type') next_emotion = emotions[i + 1].get('emotion_type')
if next_emotion in positive_emotions: if next_emotion in positive_emotions:
recovery_count += 1 recovery_count += 1
# 计算恢复力分数 # 计算恢复力分数
if negative_count > 0: if negative_count > 0:
recovery_rate = recovery_count / negative_count recovery_rate = recovery_count / negative_count
@@ -301,28 +304,28 @@ class EmotionAnalyticsService:
# 如果没有负面情绪恢复力设为100最佳状态 # 如果没有负面情绪恢复力设为100最佳状态
recovery_rate = 1.0 recovery_rate = 1.0
score = 100.0 score = 100.0
logger.debug(f"恢复力计算: negative_count={negative_count}, " logger.debug(f"恢复力计算: negative_count={negative_count}, "
f"recovery_count={recovery_count}, score={score:.2f}") f"recovery_count={recovery_count}, score={score:.2f}")
return { return {
"score": round(score, 2), "score": round(score, 2),
"recovery_rate": round(recovery_rate, 3) "recovery_rate": round(recovery_rate, 3)
} }
async def calculate_emotion_health_index( async def calculate_emotion_health_index(
self, self,
end_user_id: str, end_user_id: str,
time_range: str = "30d" time_range: str = "30d"
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""计算情绪健康指数 """计算情绪健康指数
综合积极率、稳定性和恢复力计算情绪健康指数。 综合积极率、稳定性和恢复力计算情绪健康指数。
Args: Args:
end_user_id: 宿主ID用户组ID end_user_id: 宿主ID用户组ID
time_range: 时间范围7d/30d/90d time_range: 时间范围7d/30d/90d
Returns: Returns:
Dict: 包含情绪健康指数的完整响应: Dict: 包含情绪健康指数的完整响应:
- health_score: 综合健康分数0-100 - health_score: 综合健康分数0-100
@@ -336,13 +339,13 @@ class EmotionAnalyticsService:
""" """
try: try:
logger.info(f"计算情绪健康指数: user={end_user_id}, time_range={time_range}") logger.info(f"计算情绪健康指数: user={end_user_id}, time_range={time_range}")
# 获取时间范围内的情绪数据 # 获取时间范围内的情绪数据
emotions = await self.emotion_repo.get_emotions_in_range( emotions = await self.emotion_repo.get_emotions_in_range(
end_user_id=end_user_id, end_user_id=end_user_id,
time_range=time_range time_range=time_range
) )
# 如果没有数据,返回默认值 # 如果没有数据,返回默认值
if not emotions: if not emotions:
logger.warning(f"用户 {end_user_id} 在时间范围 {time_range} 内没有情绪数据") logger.warning(f"用户 {end_user_id} 在时间范围 {time_range} 内没有情绪数据")
@@ -357,20 +360,20 @@ class EmotionAnalyticsService:
"emotion_distribution": {}, "emotion_distribution": {},
"time_range": time_range "time_range": time_range
} }
# 计算各维度指标 # 计算各维度指标
positivity_rate = self._calculate_positivity_rate(emotions) positivity_rate = self._calculate_positivity_rate(emotions)
stability = self._calculate_stability(emotions) stability = self._calculate_stability(emotions)
resilience = self._calculate_resilience(emotions) resilience = self._calculate_resilience(emotions)
# 计算综合健康分数 # 计算综合健康分数
# 公式positivity_rate * 0.4 + stability * 0.3 + resilience * 0.3 # 公式positivity_rate * 0.4 + stability * 0.3 + resilience * 0.3
health_score = ( health_score = (
positivity_rate["score"] * 0.4 + positivity_rate["score"] * 0.4 +
stability["score"] * 0.3 + stability["score"] * 0.3 +
resilience["score"] * 0.3 resilience["score"] * 0.3
) )
# 确定健康等级 # 确定健康等级
if health_score >= 80: if health_score >= 80:
level = "优秀" level = "优秀"
@@ -380,13 +383,13 @@ class EmotionAnalyticsService:
level = "一般" level = "一般"
else: else:
level = "较差" level = "较差"
# 统计情绪分布 # 统计情绪分布
emotion_distribution = {} emotion_distribution = {}
for emotion_type in ['joy', 'sadness', 'anger', 'fear', 'surprise', 'neutral']: for emotion_type in ['joy', 'sadness', 'anger', 'fear', 'surprise', 'neutral']:
count = sum(1 for e in emotions if e.get('emotion_type') == emotion_type) count = sum(1 for e in emotions if e.get('emotion_type') == emotion_type)
emotion_distribution[emotion_type] = count emotion_distribution[emotion_type] = count
# 格式化响应 # 格式化响应
response = { response = {
"health_score": round(health_score, 2), "health_score": round(health_score, 2),
@@ -399,22 +402,22 @@ class EmotionAnalyticsService:
"emotion_distribution": emotion_distribution, "emotion_distribution": emotion_distribution,
"time_range": time_range "time_range": time_range
} }
logger.info(f"情绪健康指数计算完成: score={health_score:.2f}, level={level}") logger.info(f"情绪健康指数计算完成: score={health_score:.2f}, level={level}")
return response return response
except Exception as e: except Exception as e:
logger.error(f"计算情绪健康指数失败: {str(e)}", exc_info=True) logger.error(f"计算情绪健康指数失败: {str(e)}", exc_info=True)
raise raise
def _analyze_emotion_patterns(self, emotions: List[Dict[str, Any]]) -> Dict[str, Any]: def _analyze_emotion_patterns(self, emotions: List[Dict[str, Any]]) -> Dict[str, Any]:
"""分析情绪模式 """分析情绪模式
识别主要负面情绪、情绪触发因素和波动时段。 识别主要负面情绪、情绪触发因素和波动时段。
Args: Args:
emotions: 情绪数据列表,每个包含 emotion_type、emotion_intensity、created_at 字段 emotions: 情绪数据列表,每个包含 emotion_type、emotion_intensity、created_at 字段
Returns: Returns:
Dict: 包含情绪模式分析结果: Dict: 包含情绪模式分析结果:
- dominant_negative_emotion: 主要负面情绪类型 - dominant_negative_emotion: 主要负面情绪类型
@@ -422,19 +425,19 @@ class EmotionAnalyticsService:
- emotion_volatility: 情绪波动性(高/中/低) - emotion_volatility: 情绪波动性(高/中/低)
""" """
negative_emotions = {'sadness', 'anger', 'fear'} negative_emotions = {'sadness', 'anger', 'fear'}
# 统计负面情绪分布 # 统计负面情绪分布
negative_emotion_counts = {} negative_emotion_counts = {}
for emotion in emotions: for emotion in emotions:
emotion_type = emotion.get('emotion_type') emotion_type = emotion.get('emotion_type')
if emotion_type in negative_emotions: if emotion_type in negative_emotions:
negative_emotion_counts[emotion_type] = negative_emotion_counts.get(emotion_type, 0) + 1 negative_emotion_counts[emotion_type] = negative_emotion_counts.get(emotion_type, 0) + 1
# 识别主要负面情绪 # 识别主要负面情绪
dominant_negative_emotion = None dominant_negative_emotion = None
if negative_emotion_counts: if negative_emotion_counts:
dominant_negative_emotion = max(negative_emotion_counts, key=negative_emotion_counts.get) dominant_negative_emotion = max(negative_emotion_counts, key=negative_emotion_counts.get)
# 识别高强度情绪(强度 >= 0.7 # 识别高强度情绪(强度 >= 0.7
high_intensity_emotions = [ high_intensity_emotions = [
{ {
@@ -445,7 +448,7 @@ class EmotionAnalyticsService:
for e in emotions for e in emotions
if e.get('emotion_intensity', 0) >= 0.7 if e.get('emotion_intensity', 0) >= 0.7
] ]
# 评估情绪波动性 # 评估情绪波动性
intensities = [e.get('emotion_intensity', 0.0) for e in emotions if e.get('emotion_intensity') is not None] intensities = [e.get('emotion_intensity', 0.0) for e in emotions if e.get('emotion_intensity') is not None]
if len(intensities) >= 2: if len(intensities) >= 2:
@@ -458,29 +461,29 @@ class EmotionAnalyticsService:
volatility = "" volatility = ""
else: else:
volatility = "未知" volatility = "未知"
logger.debug(f"情绪模式分析: dominant_negative={dominant_negative_emotion}, " logger.debug(f"情绪模式分析: dominant_negative={dominant_negative_emotion}, "
f"high_intensity_count={len(high_intensity_emotions)}, volatility={volatility}") f"high_intensity_count={len(high_intensity_emotions)}, volatility={volatility}")
return { return {
"dominant_negative_emotion": dominant_negative_emotion, "dominant_negative_emotion": dominant_negative_emotion,
"high_intensity_emotions": high_intensity_emotions[:5], # 最多返回5个 "high_intensity_emotions": high_intensity_emotions[:5], # 最多返回5个
"emotion_volatility": volatility "emotion_volatility": volatility
} }
async def generate_emotion_suggestions( async def generate_emotion_suggestions(
self, self,
end_user_id: str, end_user_id: str,
db: Session, db: Session,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""生成个性化情绪建议 """生成个性化情绪建议
基于情绪健康数据和用户画像生成个性化建议。 基于情绪健康数据和用户画像生成个性化建议。
Args: Args:
end_user_id: 宿主ID用户组ID end_user_id: 宿主ID用户组ID
db: 数据库会话 db: 数据库会话
Returns: Returns:
Dict: 包含个性化建议的响应: Dict: 包含个性化建议的响应:
- health_summary: 健康状态摘要 - health_summary: 健康状态摘要
@@ -488,17 +491,17 @@ class EmotionAnalyticsService:
""" """
try: try:
logger.info(f"生成个性化情绪建议: user={end_user_id}") logger.info(f"生成个性化情绪建议: user={end_user_id}")
# 1. 从 end_user_id 获取关联的 memory_config_id # 1. 从 end_user_id 获取关联的 memory_config_id
llm_client = None llm_client = None
try: try:
from app.services.memory_agent_service import ( from app.services.memory_agent_service import (
get_end_user_connected_config, get_end_user_connected_config,
) )
connected_config = get_end_user_connected_config(end_user_id, db) connected_config = get_end_user_connected_config(end_user_id, db)
config_id = connected_config.get("memory_config_id") config_id = connected_config.get("memory_config_id")
config_id = resolve_config_id(config_id, db)
if config_id is not None: if config_id is not None:
from app.services.memory_config_service import ( from app.services.memory_config_service import (
MemoryConfigService, MemoryConfigService,
@@ -513,35 +516,35 @@ class EmotionAnalyticsService:
llm_client = factory.get_llm_client(str(memory_config.llm_model_id)) llm_client = factory.get_llm_client(str(memory_config.llm_model_id))
except Exception as e: except Exception as e:
logger.warning(f"无法获取 end_user {end_user_id} 的配置,将使用默认配置: {e}") logger.warning(f"无法获取 end_user {end_user_id} 的配置,将使用默认配置: {e}")
# 2. 获取情绪健康数据 # 2. 获取情绪健康数据
health_data = await self.calculate_emotion_health_index(end_user_id, time_range="30d") health_data = await self.calculate_emotion_health_index(end_user_id, time_range="30d")
# 3. 获取情绪数据用于模式分析 # 3. 获取情绪数据用于模式分析
emotions = await self.emotion_repo.get_emotions_in_range( emotions = await self.emotion_repo.get_emotions_in_range(
end_user_id=end_user_id, end_user_id=end_user_id,
time_range="30d" time_range="30d"
) )
# 4. 分析情绪模式 # 4. 分析情绪模式
patterns = self._analyze_emotion_patterns(emotions) patterns = self._analyze_emotion_patterns(emotions)
# 5. 获取用户画像数据简化版直接从Neo4j获取 # 5. 获取用户画像数据简化版直接从Neo4j获取
user_profile = await self._get_simple_user_profile(end_user_id) user_profile = await self._get_simple_user_profile(end_user_id)
# 6. 构建LLM prompt # 6. 构建LLM prompt
prompt = await self._build_suggestion_prompt(health_data, patterns, user_profile) prompt = await self._build_suggestion_prompt(health_data, patterns, user_profile)
# 7. 调用LLM生成建议使用配置中的LLM # 7. 调用LLM生成建议使用配置中的LLM
if llm_client is None: if llm_client is None:
# 无法获取配置时,抛出错误而不是使用默认配置 # 无法获取配置时,抛出错误而不是使用默认配置
raise ValueError("无法获取LLM配置请确保end_user关联了有效的memory_config") raise ValueError("无法获取LLM配置请确保end_user关联了有效的memory_config")
# 将 prompt 转换为 messages 格式 # 将 prompt 转换为 messages 格式
messages = [ messages = [
{"role": "user", "content": prompt} {"role": "user", "content": prompt}
] ]
# 8. 使用结构化输出直接获取 Pydantic 模型 # 8. 使用结构化输出直接获取 Pydantic 模型
try: try:
suggestions_response = await llm_client.response_structured( suggestions_response = await llm_client.response_structured(
@@ -552,7 +555,7 @@ class EmotionAnalyticsService:
logger.error(f"LLM 结构化输出失败: {str(e)}") logger.error(f"LLM 结构化输出失败: {str(e)}")
# 返回默认建议 # 返回默认建议
suggestions_response = self._get_default_suggestions(health_data) suggestions_response = self._get_default_suggestions(health_data)
# 8. 验证建议数量3-5条 # 8. 验证建议数量3-5条
if len(suggestions_response.suggestions) < 3: if len(suggestions_response.suggestions) < 3:
logger.warning(f"建议数量不足: {len(suggestions_response.suggestions)}") logger.warning(f"建议数量不足: {len(suggestions_response.suggestions)}")
@@ -560,7 +563,7 @@ class EmotionAnalyticsService:
elif len(suggestions_response.suggestions) > 5: elif len(suggestions_response.suggestions) > 5:
logger.warning(f"建议数量过多: {len(suggestions_response.suggestions)}") logger.warning(f"建议数量过多: {len(suggestions_response.suggestions)}")
suggestions_response.suggestions = suggestions_response.suggestions[:5] suggestions_response.suggestions = suggestions_response.suggestions[:5]
# 9. 格式化响应 # 9. 格式化响应
response = { response = {
"health_summary": suggestions_response.health_summary, "health_summary": suggestions_response.health_summary,
@@ -575,26 +578,26 @@ class EmotionAnalyticsService:
for s in suggestions_response.suggestions for s in suggestions_response.suggestions
] ]
} }
logger.info(f"个性化建议生成完成: suggestions_count={len(response['suggestions'])}") logger.info(f"个性化建议生成完成: suggestions_count={len(response['suggestions'])}")
return response return response
except Exception as e: except Exception as e:
logger.error(f"生成个性化建议失败: {str(e)}", exc_info=True) logger.error(f"生成个性化建议失败: {str(e)}", exc_info=True)
raise raise
async def _get_simple_user_profile(self, end_user_id: str) -> Dict[str, Any]: async def _get_simple_user_profile(self, end_user_id: str) -> Dict[str, Any]:
"""获取简化的用户画像数据 """获取简化的用户画像数据
Args: Args:
end_user_id: 用户ID end_user_id: 用户ID
Returns: Returns:
Dict: 用户画像数据 Dict: 用户画像数据
""" """
try: try:
connector = Neo4jConnector() connector = Neo4jConnector()
# 查询用户的实体和标签 # 查询用户的实体和标签
query = """ query = """
MATCH (e:Entity) MATCH (e:Entity)
@@ -603,59 +606,59 @@ class EmotionAnalyticsService:
ORDER BY e.created_at DESC ORDER BY e.created_at DESC
LIMIT 20 LIMIT 20
""" """
entities = await connector.execute_query(query, end_user_id=end_user_id) entities = await connector.execute_query(query, end_user_id=end_user_id)
# 提取兴趣标签 # 提取兴趣标签
interests = [e["name"] for e in entities if e.get("type") in ["INTEREST", "HOBBY"]][:5] interests = [e["name"] for e in entities if e.get("type") in ["INTEREST", "HOBBY"]][:5]
# 后期会引入用户的习惯。。 # 后期会引入用户的习惯。。
return { return {
"interests": interests if interests else ["未知"] "interests": interests if interests else ["未知"]
} }
except Exception as e: except Exception as e:
logger.error(f"获取用户画像失败: {str(e)}") logger.error(f"获取用户画像失败: {str(e)}")
return {"interests": ["未知"]} return {"interests": ["未知"]}
async def _build_suggestion_prompt( async def _build_suggestion_prompt(
self, self,
health_data: Dict[str, Any], health_data: Dict[str, Any],
patterns: Dict[str, Any], patterns: Dict[str, Any],
user_profile: Dict[str, Any] user_profile: Dict[str, Any]
) -> str: ) -> str:
"""构建情绪建议生成的prompt """构建情绪建议生成的prompt
Args: Args:
health_data: 情绪健康数据 health_data: 情绪健康数据
patterns: 情绪模式分析结果 patterns: 情绪模式分析结果
user_profile: 用户画像数据 user_profile: 用户画像数据
Returns: Returns:
str: LLM prompt str: LLM prompt
""" """
from app.core.memory.utils.prompt.prompt_utils import ( from app.core.memory.utils.prompt.prompt_utils import (
render_emotion_suggestions_prompt, render_emotion_suggestions_prompt,
) )
prompt = await render_emotion_suggestions_prompt( prompt = await render_emotion_suggestions_prompt(
health_data=health_data, health_data=health_data,
patterns=patterns, patterns=patterns,
user_profile=user_profile user_profile=user_profile
) )
return prompt return prompt
def _get_default_suggestions(self, health_data: Dict[str, Any]) -> EmotionSuggestionsResponse: def _get_default_suggestions(self, health_data: Dict[str, Any]) -> EmotionSuggestionsResponse:
"""获取默认建议当LLM调用失败时使用 """获取默认建议当LLM调用失败时使用
Args: Args:
health_data: 情绪健康数据 health_data: 情绪健康数据
Returns: Returns:
EmotionSuggestionsResponse: 默认建议 EmotionSuggestionsResponse: 默认建议
""" """
health_score = health_data.get('health_score', 0) health_score = health_data.get('health_score', 0)
if health_score >= 80: if health_score >= 80:
summary = "您的情绪健康状况优秀,请继续保持积极的生活态度。" summary = "您的情绪健康状况优秀,请继续保持积极的生活态度。"
elif health_score >= 60: elif health_score >= 60:
@@ -664,7 +667,7 @@ class EmotionAnalyticsService:
summary = "您的情绪健康需要关注,建议采取一些改善措施。" summary = "您的情绪健康需要关注,建议采取一些改善措施。"
else: else:
summary = "您的情绪健康需要重点关注,建议寻求专业帮助。" summary = "您的情绪健康需要重点关注,建议寻求专业帮助。"
suggestions = [ suggestions = [
EmotionSuggestion( EmotionSuggestion(
type="emotion_balance", type="emotion_balance",
@@ -700,54 +703,54 @@ class EmotionAnalyticsService:
] ]
) )
] ]
return EmotionSuggestionsResponse( return EmotionSuggestionsResponse(
health_summary=summary, health_summary=summary,
suggestions=suggestions suggestions=suggestions
) )
async def get_cached_suggestions( async def get_cached_suggestions(
self, self,
end_user_id: str, end_user_id: str,
db: Session, db: Session,
) -> Optional[Dict[str, Any]]: ) -> Optional[Dict[str, Any]]:
"""从 Redis 缓存获取个性化情绪建议 """从 Redis 缓存获取个性化情绪建议
Args: Args:
end_user_id: 宿主ID用户组ID end_user_id: 宿主ID用户组ID
db: 数据库会话(保留参数以保持接口兼容性) db: 数据库会话(保留参数以保持接口兼容性)
Returns: Returns:
Dict: 缓存的建议数据,如果不存在或已过期返回 None Dict: 缓存的建议数据,如果不存在或已过期返回 None
""" """
try: try:
from app.cache.memory.emotion_memory import EmotionMemoryCache from app.cache.memory.emotion_memory import EmotionMemoryCache
logger.info(f"尝试从 Redis 缓存获取情绪建议: user={end_user_id}") logger.info(f"尝试从 Redis 缓存获取情绪建议: user={end_user_id}")
# 从 Redis 获取缓存 # 从 Redis 获取缓存
cached_data = await EmotionMemoryCache.get_emotion_suggestions(end_user_id) cached_data = await EmotionMemoryCache.get_emotion_suggestions(end_user_id)
if cached_data is None: if cached_data is None:
logger.info(f"用户 {end_user_id} 的建议缓存不存在或已过期") logger.info(f"用户 {end_user_id} 的建议缓存不存在或已过期")
return None return None
logger.info(f"成功从 Redis 缓存获取建议: user={end_user_id}") logger.info(f"成功从 Redis 缓存获取建议: user={end_user_id}")
return cached_data return cached_data
except Exception as e: except Exception as e:
logger.error(f"从 Redis 缓存获取建议失败: {str(e)}", exc_info=True) logger.error(f"从 Redis 缓存获取建议失败: {str(e)}", exc_info=True)
return None return None
async def save_suggestions_cache( async def save_suggestions_cache(
self, self,
end_user_id: str, end_user_id: str,
suggestions_data: Dict[str, Any], suggestions_data: Dict[str, Any],
db: Session, db: Session,
expires_hours: int = 24 expires_hours: int = 24
) -> None: ) -> None:
"""保存建议到 Redis 缓存 """保存建议到 Redis 缓存
Args: Args:
end_user_id: 宿主ID用户组ID end_user_id: 宿主ID用户组ID
suggestions_data: 建议数据 suggestions_data: 建议数据
@@ -756,24 +759,24 @@ class EmotionAnalyticsService:
""" """
try: try:
from app.cache.memory.emotion_memory import EmotionMemoryCache from app.cache.memory.emotion_memory import EmotionMemoryCache
logger.info(f"保存建议到 Redis 缓存: user={end_user_id}, expires={expires_hours}小时") logger.info(f"保存建议到 Redis 缓存: user={end_user_id}, expires={expires_hours}小时")
# 计算过期时间(秒) # 计算过期时间(秒)
expire_seconds = expires_hours * 3600 expire_seconds = expires_hours * 3600
# 保存到 Redis # 保存到 Redis
success = await EmotionMemoryCache.set_emotion_suggestions( success = await EmotionMemoryCache.set_emotion_suggestions(
user_id=end_user_id, user_id=end_user_id,
suggestions_data=suggestions_data, suggestions_data=suggestions_data,
expire=expire_seconds expire=expire_seconds
) )
if success: if success:
logger.info(f"建议缓存保存成功: user={end_user_id}") logger.info(f"建议缓存保存成功: user={end_user_id}")
else: else:
logger.warning(f"建议缓存保存失败: user={end_user_id}") logger.warning(f"建议缓存保存失败: user={end_user_id}")
except Exception as e: except Exception as e:
logger.error(f"保存建议缓存失败: {str(e)}", exc_info=True) logger.error(f"保存建议缓存失败: {str(e)}", exc_info=True)
# 不抛出异常,缓存失败不应影响主流程 # 不抛出异常,缓存失败不应影响主流程

View File

@@ -7,30 +7,31 @@ from uuid import UUID
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
def resolve_config_id(config_id: UUID | int, db: Session) -> UUID: def resolve_config_id(config_id: UUID | int|str, db: Session) -> UUID:
""" """
解析 config_id如果是整数则通过 config_id_old 查找对应的 UUID 解析 config_id如果是整数则通过 config_id_old 查找对应的 UUID
Args: Args:
config_id: 配置IDUUID 或整数) config_id: 配置IDUUID 或整数)
db: 数据库会话 db: 数据库会话
Returns: Returns:
UUID: 解析后的配置ID UUID: 解析后的配置ID
Raises: Raises:
ValueError: 当找不到对应的配置时 ValueError: 当找不到对应的配置时
""" """
from app.models.memory_config_model import MemoryConfig from app.models.memory_config_model import MemoryConfig
if isinstance(config_id, UUID): if isinstance(config_id, UUID):
return config_id return config_id
if isinstance(config_id, str) and len(config_id)<=6: if isinstance(config_id, str) and len(config_id)<=6:
memory_config = db.query(MemoryConfig).filter( memory_config = db.query(MemoryConfig).filter(
MemoryConfig.config_id_old == config_id MemoryConfig.config_id_old == int(config_id)
).first() ).first()
print(memory_config)
if not memory_config: if not memory_config:
raise ValueError(f"未找到 config_id_old={config_id} 对应的配置") raise ValueError(f"STR 未找到 config_id_old={config_id} 对应的配置")
return memory_config.config_id return memory_config.config_id
if isinstance(config_id, int): if isinstance(config_id, int):
memory_config = db.query(MemoryConfig).filter( memory_config = db.query(MemoryConfig).filter(
@@ -38,7 +39,7 @@ def resolve_config_id(config_id: UUID | int, db: Session) -> UUID:
).first() ).first()
if not memory_config: if not memory_config:
raise ValueError(f"未找到 config_id_old={config_id} 对应的配置") raise ValueError(f"INT 未找到 config_id_old={config_id} 对应的配置")
return memory_config.config_id return memory_config.config_id