Merge #21 into develop from feature/emotion-engine

feature/情绪引擎

* feature/emotion-engine: (7 commits squashed)

  - [feature]Emotion Engine Development

  - [feature]Emotion Engine Development

  - Merge branch 'feature/emotion-engine' of codeup.aliyun.com:redbearai/python/redbear-mem-open into feature/emotion-engine

  - [fix]1.Fix the front-end files;2.Cache Management Deletion;3.Delete "check_code.py"

  - [fix]1.Fix the front-end files;2.Cache Management Deletion;3.Delete "check_code.py"

  - Merge branch 'feature/emotion-engine' of codeup.aliyun.com:redbearai/python/redbear-mem-open into feature/emotion-engine

  - [fix]fix vite.config.ts

Signed-off-by: 乐力齐 <accounts_690c7b0af9007d7e338af636@mail.teambition.com>
Commented-by: aliyun6762716068 <accounts_68cb7c6b61f5dcc4200d6251@mail.teambition.com>
Commented-by: 乐力齐 <accounts_690c7b0af9007d7e338af636@mail.teambition.com>
Reviewed-by: aliyun6762716068 <accounts_68cb7c6b61f5dcc4200d6251@mail.teambition.com>
Merged-by: aliyun6762716068 <accounts_68cb7c6b61f5dcc4200d6251@mail.teambition.com>

CR-link: https://codeup.aliyun.com/redbearai/python/redbear-mem-open/change/21
This commit is contained in:
乐力齐
2025-12-20 07:02:46 +00:00
committed by 孙科
parent 1f0bb1f8af
commit 1f4524c28c
23 changed files with 2453 additions and 67 deletions

View File

@@ -0,0 +1,670 @@
# -*- coding: utf-8 -*-
"""情绪分析服务模块
本模块提供情绪数据的分析和统计功能,包括情绪标签、词云、健康指数计算等。
Classes:
EmotionAnalyticsService: 情绪分析服务,提供各种情绪分析功能
"""
from typing import Dict, Any, Optional, List
import statistics
import json
from pydantic import BaseModel, Field
from app.repositories.neo4j.emotion_repository import EmotionRepository
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.core.logging_config import get_business_logger
logger = get_business_logger()
class EmotionSuggestion(BaseModel):
"""情绪建议模型"""
type: str = Field(..., description="建议类型emotion_balance/activity_recommendation/social_connection/stress_management")
title: str = Field(..., description="建议标题")
content: str = Field(..., description="建议内容")
priority: str = Field(..., description="优先级high/medium/low")
actionable_steps: List[str] = Field(..., description="可执行步骤列表3个")
class EmotionSuggestionsResponse(BaseModel):
"""情绪建议响应模型"""
health_summary: str = Field(..., description="健康状态摘要不超过50字")
suggestions: List[EmotionSuggestion] = Field(..., description="建议列表3-5条")
class EmotionAnalyticsService:
"""情绪分析服务
提供情绪数据的分析和统计功能,包括:
- 情绪标签统计
- 情绪词云数据
- 情绪健康指数计算
- 个性化情绪建议生成
Attributes:
emotion_repo: 情绪数据仓储实例
"""
def __init__(self):
"""初始化情绪分析服务"""
connector = Neo4jConnector()
self.emotion_repo = EmotionRepository(connector)
logger.info("情绪分析服务初始化完成")
async def get_emotion_tags(
self,
end_user_id: str,
emotion_type: Optional[str] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
limit: int = 10
) -> Dict[str, Any]:
"""获取情绪标签统计
查询指定用户的情绪类型分布,包括计数、百分比和平均强度。
Args:
end_user_id: 宿主ID用户组ID
emotion_type: 可选的情绪类型过滤
start_date: 可选的开始日期ISO格式
end_date: 可选的结束日期ISO格式
limit: 返回结果的最大数量
Returns:
Dict: 包含情绪标签统计的响应数据:
- tags: 情绪标签列表
- total_count: 总情绪数量
- time_range: 时间范围信息
"""
try:
logger.info(f"获取情绪标签统计: user={end_user_id}, type={emotion_type}, "
f"start={start_date}, end={end_date}, limit={limit}")
# 调用仓储层查询
tags = await self.emotion_repo.get_emotion_tags(
group_id=end_user_id,
emotion_type=emotion_type,
start_date=start_date,
end_date=end_date,
limit=limit
)
# 计算总数
total_count = sum(tag["count"] for tag in tags)
# 构建时间范围信息
time_range = {}
if start_date:
time_range["start_date"] = start_date
if end_date:
time_range["end_date"] = end_date
# 格式化响应
response = {
"tags": tags,
"total_count": total_count,
"time_range": time_range if time_range else None
}
logger.info(f"情绪标签统计完成: total_count={total_count}, tags_count={len(tags)}")
return response
except Exception as e:
logger.error(f"获取情绪标签统计失败: {str(e)}", exc_info=True)
raise
async def get_emotion_wordcloud(
self,
end_user_id: str,
emotion_type: Optional[str] = None,
limit: int = 50
) -> Dict[str, Any]:
"""获取情绪词云数据
查询情绪关键词及其频率,用于生成词云可视化。
Args:
end_user_id: 宿主ID用户组ID
emotion_type: 可选的情绪类型过滤
limit: 返回关键词的最大数量
Returns:
Dict: 包含情绪词云数据的响应:
- keywords: 关键词列表
- total_keywords: 总关键词数量
"""
try:
logger.info(f"获取情绪词云数据: user={end_user_id}, type={emotion_type}, limit={limit}")
# 调用仓储层查询
keywords = await self.emotion_repo.get_emotion_wordcloud(
group_id=end_user_id,
emotion_type=emotion_type,
limit=limit
)
# 计算总关键词数量
total_keywords = len(keywords)
# 格式化响应
response = {
"keywords": keywords,
"total_keywords": total_keywords
}
logger.info(f"情绪词云数据获取完成: total_keywords={total_keywords}")
return response
except Exception as e:
logger.error(f"获取情绪词云数据失败: {str(e)}", exc_info=True)
raise
def _calculate_positivity_rate(self, emotions: List[Dict[str, Any]]) -> Dict[str, Any]:
"""计算积极率
根据情绪类型分类正面、负面和中性情绪,计算积极率。
公式:(正面数 / (正面数 + 负面数)) * 100
Args:
emotions: 情绪数据列表,每个包含 emotion_type 字段
Returns:
Dict: 包含积极率计算结果:
- score: 积极率分数0-100
- positive_count: 正面情绪数量
- negative_count: 负面情绪数量
- neutral_count: 中性情绪数量
"""
# 定义情绪分类
positive_emotions = {'joy', 'surprise'}
negative_emotions = {'sadness', 'anger', 'fear'}
# 统计各类情绪数量
positive_count = sum(1 for e in emotions if e.get('emotion_type') in positive_emotions)
negative_count = sum(1 for e in emotions if e.get('emotion_type') in negative_emotions)
neutral_count = sum(1 for e in emotions if e.get('emotion_type') == 'neutral')
# 计算积极率
total_non_neutral = positive_count + negative_count
if total_non_neutral > 0:
score = (positive_count / total_non_neutral) * 100
else:
score = 50.0 # 如果没有非中性情绪默认为50
logger.debug(f"积极率计算: positive={positive_count}, negative={negative_count}, "
f"neutral={neutral_count}, score={score:.2f}")
return {
"score": round(score, 2),
"positive_count": positive_count,
"negative_count": negative_count,
"neutral_count": neutral_count
}
def _calculate_stability(self, emotions: List[Dict[str, Any]]) -> Dict[str, Any]:
"""计算稳定性
基于情绪强度的标准差计算情绪稳定性。
公式:(1 - min(std_deviation, 1.0)) * 100
Args:
emotions: 情绪数据列表,每个包含 emotion_intensity 字段
Returns:
Dict: 包含稳定性计算结果:
- score: 稳定性分数0-100
- std_deviation: 标准差
"""
# 提取所有情绪强度
intensities = [e.get('emotion_intensity', 0.0) for e in emotions if e.get('emotion_intensity') is not None]
# 计算标准差
if len(intensities) >= 2:
std_deviation = statistics.stdev(intensities)
elif len(intensities) == 1:
std_deviation = 0.0 # 只有一个数据点标准差为0
else:
std_deviation = 0.0 # 没有数据标准差为0
# 计算稳定性分数
# 标准差越小,稳定性越高
score = (1 - min(std_deviation, 1.0)) * 100
logger.debug(f"稳定性计算: intensities_count={len(intensities)}, "
f"std_deviation={std_deviation:.3f}, score={score:.2f}")
return {
"score": round(score, 2),
"std_deviation": round(std_deviation, 3)
}
def _calculate_resilience(self, emotions: List[Dict[str, Any]]) -> Dict[str, Any]:
"""计算恢复力
分析情绪转换模式,统计从负面情绪恢复到正面情绪的能力。
公式:(负面到正面转换次数 / 总负面情绪数) * 100
Args:
emotions: 情绪数据列表,每个包含 emotion_type 和 created_at 字段
应该按时间顺序排列
Returns:
Dict: 包含恢复力计算结果:
- score: 恢复力分数0-100
- recovery_rate: 恢复率(转换次数/负面情绪数)
"""
# 定义情绪分类
positive_emotions = {'joy', 'surprise'}
negative_emotions = {'sadness', 'anger', 'fear'}
# 统计负面到正面的转换次数
recovery_count = 0
negative_count = 0
for i in range(len(emotions)):
current_emotion = emotions[i].get('emotion_type')
# 统计负面情绪总数
if current_emotion in negative_emotions:
negative_count += 1
# 检查下一个情绪是否为正面
if i + 1 < len(emotions):
next_emotion = emotions[i + 1].get('emotion_type')
if next_emotion in positive_emotions:
recovery_count += 1
# 计算恢复力分数
if negative_count > 0:
recovery_rate = recovery_count / negative_count
score = recovery_rate * 100
else:
# 如果没有负面情绪恢复力设为100最佳状态
recovery_rate = 1.0
score = 100.0
logger.debug(f"恢复力计算: negative_count={negative_count}, "
f"recovery_count={recovery_count}, score={score:.2f}")
return {
"score": round(score, 2),
"recovery_rate": round(recovery_rate, 3)
}
async def calculate_emotion_health_index(
self,
end_user_id: str,
time_range: str = "30d"
) -> Dict[str, Any]:
"""计算情绪健康指数
综合积极率、稳定性和恢复力计算情绪健康指数。
Args:
end_user_id: 宿主ID用户组ID
time_range: 时间范围7d/30d/90d
Returns:
Dict: 包含情绪健康指数的完整响应:
- health_score: 综合健康分数0-100
- level: 健康等级(优秀/良好/一般/较差)
- dimensions: 各维度详细数据
- positivity_rate: 积极率
- stability: 稳定性
- resilience: 恢复力
- emotion_distribution: 情绪分布统计
- time_range: 时间范围
"""
try:
logger.info(f"计算情绪健康指数: user={end_user_id}, time_range={time_range}")
# 获取时间范围内的情绪数据
emotions = await self.emotion_repo.get_emotions_in_range(
group_id=end_user_id,
time_range=time_range
)
# 如果没有数据,返回默认值
if not emotions:
logger.warning(f"用户 {end_user_id} 在时间范围 {time_range} 内没有情绪数据")
return {
"health_score": 0.0,
"level": "无数据",
"dimensions": {
"positivity_rate": {"score": 0.0, "positive_count": 0, "negative_count": 0, "neutral_count": 0},
"stability": {"score": 0.0, "std_deviation": 0.0},
"resilience": {"score": 0.0, "recovery_rate": 0.0}
},
"emotion_distribution": {},
"time_range": time_range
}
# 计算各维度指标
positivity_rate = self._calculate_positivity_rate(emotions)
stability = self._calculate_stability(emotions)
resilience = self._calculate_resilience(emotions)
# 计算综合健康分数
# 公式positivity_rate * 0.4 + stability * 0.3 + resilience * 0.3
health_score = (
positivity_rate["score"] * 0.4 +
stability["score"] * 0.3 +
resilience["score"] * 0.3
)
# 确定健康等级
if health_score >= 80:
level = "优秀"
elif health_score >= 60:
level = "良好"
elif health_score >= 40:
level = "一般"
else:
level = "较差"
# 统计情绪分布
emotion_distribution = {}
for emotion_type in ['joy', 'sadness', 'anger', 'fear', 'surprise', 'neutral']:
count = sum(1 for e in emotions if e.get('emotion_type') == emotion_type)
emotion_distribution[emotion_type] = count
# 格式化响应
response = {
"health_score": round(health_score, 2),
"level": level,
"dimensions": {
"positivity_rate": positivity_rate,
"stability": stability,
"resilience": resilience
},
"emotion_distribution": emotion_distribution,
"time_range": time_range
}
logger.info(f"情绪健康指数计算完成: score={health_score:.2f}, level={level}")
return response
except Exception as e:
logger.error(f"计算情绪健康指数失败: {str(e)}", exc_info=True)
raise
def _analyze_emotion_patterns(self, emotions: List[Dict[str, Any]]) -> Dict[str, Any]:
"""分析情绪模式
识别主要负面情绪、情绪触发因素和波动时段。
Args:
emotions: 情绪数据列表,每个包含 emotion_type、emotion_intensity、created_at 字段
Returns:
Dict: 包含情绪模式分析结果:
- dominant_negative_emotion: 主要负面情绪类型
- high_intensity_emotions: 高强度情绪列表
- emotion_volatility: 情绪波动性(高/中/低)
"""
negative_emotions = {'sadness', 'anger', 'fear'}
# 统计负面情绪分布
negative_emotion_counts = {}
for emotion in emotions:
emotion_type = emotion.get('emotion_type')
if emotion_type in negative_emotions:
negative_emotion_counts[emotion_type] = negative_emotion_counts.get(emotion_type, 0) + 1
# 识别主要负面情绪
dominant_negative_emotion = None
if negative_emotion_counts:
dominant_negative_emotion = max(negative_emotion_counts, key=negative_emotion_counts.get)
# 识别高强度情绪(强度 >= 0.7
high_intensity_emotions = [
{
"type": e.get('emotion_type'),
"intensity": e.get('emotion_intensity'),
"created_at": e.get('created_at')
}
for e in emotions
if e.get('emotion_intensity', 0) >= 0.7
]
# 评估情绪波动性
intensities = [e.get('emotion_intensity', 0.0) for e in emotions if e.get('emotion_intensity') is not None]
if len(intensities) >= 2:
std_dev = statistics.stdev(intensities)
if std_dev > 0.3:
volatility = ""
elif std_dev > 0.15:
volatility = ""
else:
volatility = ""
else:
volatility = "未知"
logger.debug(f"情绪模式分析: dominant_negative={dominant_negative_emotion}, "
f"high_intensity_count={len(high_intensity_emotions)}, volatility={volatility}")
return {
"dominant_negative_emotion": dominant_negative_emotion,
"high_intensity_emotions": high_intensity_emotions[:5], # 最多返回5个
"emotion_volatility": volatility
}
async def generate_emotion_suggestions(
self,
end_user_id: str,
config_id: Optional[int] = None
) -> Dict[str, Any]:
"""生成个性化情绪建议
基于情绪健康数据和用户画像生成个性化建议。
Args:
end_user_id: 宿主ID用户组ID
config_id: 配置ID可选用于从数据库加载LLM配置
Returns:
Dict: 包含个性化建议的响应:
- health_summary: 健康状态摘要
- suggestions: 建议列表3-5条
"""
try:
logger.info(f"生成个性化情绪建议: user={end_user_id}, config_id={config_id}")
# 1. 如果提供了 config_id从数据库加载配置
if config_id is not None:
from app.core.memory.utils.config.definitions import reload_configuration_from_database
config_loaded = reload_configuration_from_database(config_id)
if not config_loaded:
logger.warning(f"无法加载配置 config_id={config_id},将使用默认配置")
# 2. 获取情绪健康数据
health_data = await self.calculate_emotion_health_index(end_user_id, time_range="30d")
# 3. 获取情绪数据用于模式分析
emotions = await self.emotion_repo.get_emotions_in_range(
group_id=end_user_id,
time_range="30d"
)
# 4. 分析情绪模式
patterns = self._analyze_emotion_patterns(emotions)
# 5. 获取用户画像数据简化版直接从Neo4j获取
user_profile = await self._get_simple_user_profile(end_user_id)
# 6. 构建LLM prompt
prompt = await self._build_suggestion_prompt(health_data, patterns, user_profile)
# 7. 调用LLM生成建议使用配置中的LLM
from app.core.memory.utils.llm.llm_utils import get_llm_client
llm_client = get_llm_client()
# 将 prompt 转换为 messages 格式
messages = [
{"role": "user", "content": prompt}
]
response = await llm_client.chat(messages=messages)
response_text = response.content.strip()
# 8. 解析LLM响应
try:
response_data = json.loads(response_text)
suggestions_response = EmotionSuggestionsResponse(**response_data)
except (json.JSONDecodeError, Exception) as e:
logger.error(f"解析LLM响应失败: {str(e)}, response={response_text}")
# 返回默认建议
suggestions_response = self._get_default_suggestions(health_data)
# 8. 验证建议数量3-5条
if len(suggestions_response.suggestions) < 3:
logger.warning(f"建议数量不足: {len(suggestions_response.suggestions)}")
suggestions_response = self._get_default_suggestions(health_data)
elif len(suggestions_response.suggestions) > 5:
logger.warning(f"建议数量过多: {len(suggestions_response.suggestions)}")
suggestions_response.suggestions = suggestions_response.suggestions[:5]
# 9. 格式化响应
response = {
"health_summary": suggestions_response.health_summary,
"suggestions": [
{
"type": s.type,
"title": s.title,
"content": s.content,
"priority": s.priority,
"actionable_steps": s.actionable_steps
}
for s in suggestions_response.suggestions
]
}
logger.info(f"个性化建议生成完成: suggestions_count={len(response['suggestions'])}")
return response
except Exception as e:
logger.error(f"生成个性化建议失败: {str(e)}", exc_info=True)
raise
async def _get_simple_user_profile(self, end_user_id: str) -> Dict[str, Any]:
"""获取简化的用户画像数据
Args:
end_user_id: 用户ID
Returns:
Dict: 用户画像数据
"""
try:
connector = Neo4jConnector()
# 查询用户的实体和标签
query = """
MATCH (e:Entity)
WHERE e.group_id = $group_id
RETURN e.name as name, e.type as type
ORDER BY e.created_at DESC
LIMIT 20
"""
entities = await connector.execute_query(query, group_id=end_user_id)
# 提取兴趣标签
interests = [e["name"] for e in entities if e.get("type") in ["INTEREST", "HOBBY"]][:5]
# 后期会引入用户的习惯。。
return {
"interests": interests if interests else ["未知"]
}
except Exception as e:
logger.error(f"获取用户画像失败: {str(e)}")
return {"interests": ["未知"]}
async def _build_suggestion_prompt(
self,
health_data: Dict[str, Any],
patterns: Dict[str, Any],
user_profile: Dict[str, Any]
) -> str:
"""构建情绪建议生成的prompt
Args:
health_data: 情绪健康数据
patterns: 情绪模式分析结果
user_profile: 用户画像数据
Returns:
str: LLM prompt
"""
from app.core.memory.utils.prompt.prompt_utils import render_emotion_suggestions_prompt
prompt = await render_emotion_suggestions_prompt(
health_data=health_data,
patterns=patterns,
user_profile=user_profile
)
return prompt
def _get_default_suggestions(self, health_data: Dict[str, Any]) -> EmotionSuggestionsResponse:
"""获取默认建议当LLM调用失败时使用
Args:
health_data: 情绪健康数据
Returns:
EmotionSuggestionsResponse: 默认建议
"""
health_score = health_data.get('health_score', 0)
if health_score >= 80:
summary = "您的情绪健康状况优秀,请继续保持积极的生活态度。"
elif health_score >= 60:
summary = "您的情绪健康状况良好,可以通过一些调整进一步提升。"
elif health_score >= 40:
summary = "您的情绪健康需要关注,建议采取一些改善措施。"
else:
summary = "您的情绪健康需要重点关注,建议寻求专业帮助。"
suggestions = [
EmotionSuggestion(
type="emotion_balance",
title="保持情绪平衡",
content="通过正念冥想和深呼吸练习,帮助您更好地管理情绪波动,提升情绪稳定性。",
priority="high",
actionable_steps=[
"每天早晨进行5-10分钟的正念冥想",
"感到情绪波动时进行3次深呼吸",
"记录每天的情绪变化,识别触发因素"
]
),
EmotionSuggestion(
type="activity_recommendation",
title="增加户外活动",
content="适度的户外运动可以有效改善情绪增强身心健康。建议每周进行3-4次户外活动。",
priority="medium",
actionable_steps=[
"每周安排2-3次30分钟的散步",
"周末尝试户外运动如骑行或爬山",
"在户外活动时关注周围环境,放松心情"
]
),
EmotionSuggestion(
type="social_connection",
title="加强社交联系",
content="与朋友和家人保持良好的社交联系,可以提供情感支持,改善情绪健康。",
priority="medium",
actionable_steps=[
"每周至少与一位朋友或家人深入交流",
"参加感兴趣的社交活动或兴趣小组",
"主动分享自己的感受和想法"
]
)
]
return EmotionSuggestionsResponse(
health_summary=summary,
suggestions=suggestions
)

View File

@@ -0,0 +1,212 @@
# -*- coding: utf-8 -*-
"""情绪配置服务模块
本模块提供情绪引擎配置的管理功能,包括获取和更新配置。
Classes:
EmotionConfigService: 情绪配置服务,提供配置管理功能
"""
from typing import Dict, Any
from sqlalchemy.orm import Session
from app.models.data_config_model import DataConfig
from app.core.logging_config import get_business_logger
logger = get_business_logger()
class EmotionConfigService:
"""情绪配置服务
提供情绪引擎配置的管理功能,包括:
- 获取情绪配置
- 更新情绪配置
- 验证配置参数
Attributes:
db: 数据库会话
"""
def __init__(self, db: Session):
"""初始化情绪配置服务
Args:
db: 数据库会话
"""
self.db = db
logger.info("情绪配置服务初始化完成")
def get_emotion_config(self, config_id: int) -> Dict[str, Any]:
"""获取情绪引擎配置
查询指定配置ID的情绪相关配置字段。
Args:
config_id: 配置ID
Returns:
Dict: 包含情绪配置的响应数据:
- config_id: 配置ID
- emotion_enabled: 是否启用情绪提取
- emotion_model_id: 情绪分析专用模型ID
- emotion_extract_keywords: 是否提取情绪关键词
- emotion_min_intensity: 最小情绪强度阈值
- emotion_enable_subject: 是否启用主体分类
Raises:
ValueError: 当配置不存在时
"""
try:
logger.info(f"获取情绪配置: config_id={config_id}")
# 查询配置
config = self.db.query(DataConfig).filter(
DataConfig.config_id == config_id
).first()
if not config:
logger.error(f"配置不存在: config_id={config_id}")
raise ValueError(f"配置不存在: config_id={config_id}")
# 提取情绪相关字段
emotion_config = {
"config_id": config.config_id,
"emotion_enabled": config.emotion_enabled,
"emotion_model_id": config.emotion_model_id,
"emotion_extract_keywords": config.emotion_extract_keywords,
"emotion_min_intensity": config.emotion_min_intensity,
"emotion_enable_subject": config.emotion_enable_subject
}
logger.info(f"情绪配置获取成功: config_id={config_id}")
return emotion_config
except ValueError:
raise
except Exception as e:
logger.error(f"获取情绪配置失败: {str(e)}", exc_info=True)
raise
def validate_emotion_config(self, config_data: Dict[str, Any]) -> bool:
"""验证情绪配置参数
验证配置参数的有效性,包括:
- emotion_min_intensity 在 [0.0, 1.0] 范围内
- 布尔字段类型正确
- emotion_model_id 格式有效(如果提供)
Args:
config_data: 配置数据字典
Returns:
bool: 验证是否通过
Raises:
ValueError: 当配置参数无效时
"""
try:
logger.debug(f"验证情绪配置参数: {config_data}")
# 验证 emotion_min_intensity 范围
if "emotion_min_intensity" in config_data:
min_intensity = config_data["emotion_min_intensity"]
if not isinstance(min_intensity, (int, float)):
raise ValueError("emotion_min_intensity 必须是数字类型")
if not (0.0 <= min_intensity <= 1.0):
raise ValueError("emotion_min_intensity 必须在 0.0 到 1.0 之间")
# 验证布尔字段
bool_fields = ["emotion_enabled", "emotion_extract_keywords", "emotion_enable_subject"]
for field in bool_fields:
if field in config_data:
value = config_data[field]
if not isinstance(value, bool):
raise ValueError(f"{field} 必须是布尔类型")
# 验证 emotion_model_id如果提供
if "emotion_model_id" in config_data:
model_id = config_data["emotion_model_id"]
if model_id is not None and not isinstance(model_id, str):
raise ValueError("emotion_model_id 必须是字符串类型或 null")
if model_id is not None and len(model_id.strip()) == 0:
raise ValueError("emotion_model_id 不能为空字符串")
logger.debug("情绪配置参数验证通过")
return True
except ValueError as e:
logger.warning(f"配置参数验证失败: {str(e)}")
raise
except Exception as e:
logger.error(f"验证配置参数时发生错误: {str(e)}", exc_info=True)
raise ValueError(f"验证配置参数失败: {str(e)}")
def update_emotion_config(
self,
config_id: int,
config_data: Dict[str, Any]
) -> Dict[str, Any]:
"""更新情绪引擎配置
更新指定配置ID的情绪相关配置字段。
Args:
config_id: 配置ID
config_data: 要更新的配置数据,可包含以下字段:
- emotion_enabled: 是否启用情绪提取
- emotion_model_id: 情绪分析专用模型ID
- emotion_extract_keywords: 是否提取情绪关键词
- emotion_min_intensity: 最小情绪强度阈值
- emotion_enable_subject: 是否启用主体分类
Returns:
Dict: 更新后的完整情绪配置
Raises:
ValueError: 当配置不存在或参数无效时
"""
try:
logger.info(f"更新情绪配置: config_id={config_id}, data={config_data}")
# 验证配置参数
self.validate_emotion_config(config_data)
# 查询配置
config = self.db.query(DataConfig).filter(
DataConfig.config_id == config_id
).first()
if not config:
logger.error(f"配置不存在: config_id={config_id}")
raise ValueError(f"配置不存在: config_id={config_id}")
# 更新字段
if "emotion_enabled" in config_data:
config.emotion_enabled = config_data["emotion_enabled"]
if "emotion_model_id" in config_data:
config.emotion_model_id = config_data["emotion_model_id"]
if "emotion_extract_keywords" in config_data:
config.emotion_extract_keywords = config_data["emotion_extract_keywords"]
if "emotion_min_intensity" in config_data:
config.emotion_min_intensity = config_data["emotion_min_intensity"]
if "emotion_enable_subject" in config_data:
config.emotion_enable_subject = config_data["emotion_enable_subject"]
# 提交更改
self.db.commit()
self.db.refresh(config)
# 返回更新后的配置
updated_config = self.get_emotion_config(config_id)
logger.info(f"情绪配置更新成功: config_id={config_id}")
return updated_config
except ValueError:
self.db.rollback()
raise
except Exception as e:
self.db.rollback()
logger.error(f"更新情绪配置失败: {str(e)}", exc_info=True)
raise

View File

@@ -0,0 +1,200 @@
"""Emotion extraction service for analyzing emotions from statements.
This service extracts emotion information from user statements using LLM,
including emotion type, intensity, keywords, subject classification, and target.
Classes:
EmotionExtractionService: Service for extracting emotions from statements
"""
import logging
from typing import Optional
from app.core.memory.models.emotion_models import EmotionExtraction
from app.models.data_config_model import DataConfig
from app.core.memory.utils.llm.llm_utils import get_llm_client
from app.core.memory.llm_tools.llm_client import LLMClientException
logger = logging.getLogger(__name__)
class EmotionExtractionService:
"""Service for extracting emotion information from statements.
This service uses LLM to analyze statements and extract structured emotion
information including type, intensity, keywords, subject, and target.
It respects configuration settings for enabling/disabling extraction and
filtering by intensity threshold.
Attributes:
llm_client: LLM client for making structured output calls
"""
def __init__(self, llm_id: Optional[str] = None):
"""Initialize the emotion extraction service.
Args:
llm_id: Optional LLM model ID. If None, uses default from config.
"""
self.llm_client = None
self.llm_id = llm_id
logger.info(f"Initialized EmotionExtractionService with llm_id={llm_id}")
def _get_llm_client(self, model_id: Optional[str] = None):
"""Get or create LLM client instance.
Args:
model_id: Optional model ID to use. If None, uses instance llm_id.
Returns:
LLM client instance
"""
if self.llm_client is None or model_id:
effective_model_id = model_id or self.llm_id
self.llm_client = get_llm_client(effective_model_id)
return self.llm_client
async def extract_emotion(
self,
statement: str,
config: DataConfig
) -> Optional[EmotionExtraction]:
"""Extract emotion information from a statement.
This method checks if emotion extraction is enabled in the config,
builds an appropriate prompt, calls the LLM for structured output,
and applies intensity threshold filtering.
Args:
statement: The statement text to analyze
config: Data configuration object containing emotion settings
Returns:
EmotionExtraction object if extraction succeeds and passes threshold,
None if extraction is disabled, fails, or doesn't meet threshold
Raises:
No exceptions are raised - failures are logged and return None
"""
# Check if emotion extraction is enabled
if not config.emotion_enabled:
logger.debug("Emotion extraction is disabled in config")
return None
# Validate statement
if not statement or not statement.strip():
logger.warning("Empty statement provided for emotion extraction")
return None
try:
# Build the emotion extraction prompt
prompt = await self._build_emotion_prompt(
statement=statement,
extract_keywords=config.emotion_extract_keywords,
enable_subject=config.emotion_enable_subject
)
# Call LLM for structured output
emotion = await self._call_llm_structured(
prompt=prompt,
model_id=config.emotion_model_id
)
# Apply intensity threshold filtering
if emotion.emotion_intensity < config.emotion_min_intensity:
logger.debug(
f"Emotion intensity {emotion.emotion_intensity} below threshold "
f"{config.emotion_min_intensity}, skipping storage"
)
return None
logger.info(
f"Successfully extracted emotion: type={emotion.emotion_type}, "
f"intensity={emotion.emotion_intensity}, subject={emotion.emotion_subject}"
)
return emotion
except Exception as e:
logger.error(
f"Emotion extraction failed for statement: {statement[:50]}..., "
f"error: {str(e)}",
exc_info=True
)
return None
async def _build_emotion_prompt(
self,
statement: str,
extract_keywords: bool,
enable_subject: bool
) -> str:
"""Build the emotion extraction prompt based on configuration.
This method constructs a detailed prompt for the LLM that includes
instructions for emotion type classification, intensity assessment,
and optionally keyword extraction and subject classification.
Args:
statement: The statement to analyze
extract_keywords: Whether to extract emotion keywords
enable_subject: Whether to enable subject classification
Returns:
Formatted prompt string for LLM
"""
from app.core.memory.utils.prompt.prompt_utils import render_emotion_extraction_prompt
prompt = await render_emotion_extraction_prompt(
statement=statement,
extract_keywords=extract_keywords,
enable_subject=enable_subject
)
return prompt
async def _call_llm_structured(
self,
prompt: str,
model_id: Optional[str] = None
) -> EmotionExtraction:
"""Call LLM for structured emotion extraction output.
This method uses the LLM client's response_structured method to get
a validated EmotionExtraction object from the LLM.
Args:
prompt: The formatted prompt for emotion extraction
model_id: Optional model ID to use for this call
Returns:
EmotionExtraction object with validated emotion data
Raises:
LLMClientException: If LLM call fails or times out
ValidationError: If LLM response doesn't match expected schema
"""
try:
# Get LLM client
llm_client = self._get_llm_client(model_id)
# Prepare messages
messages = [
{"role": "user", "content": prompt}
]
# Call LLM with structured output
emotion = await llm_client.response_structured(
messages=messages,
response_model=EmotionExtraction,
temperature=0.3,
max_tokens=500
)
return emotion
except LLMClientException as e:
logger.error(f"LLM call failed: {str(e)}")
raise
except Exception as e:
logger.error(f"Unexpected error in LLM structured call: {str(e)}")
raise LLMClientException(f"Emotion extraction LLM call failed: {str(e)}")