* [fix]The log retains genuine alerts and errors, while filtering out unnecessary noise. * [fix]Scenario English and Chinese, emotion specifications * [fix]Change the "no data" scenario from 0.0 to None * [fix]The emotional health indicators, emotional advice, and emotional distribution analysis are all linked together. * [fix]The emotional health indicators, emotional advice, and emotional distribution analysis are all linked together. * [fix]Separate expected errors from unexpected errors
248 lines
8.9 KiB
Python
248 lines
8.9 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""情绪数据仓储模块
|
||
|
||
本模块提供情绪数据的查询功能,用于情绪分析和统计。
|
||
|
||
Classes:
|
||
EmotionRepository: 情绪数据仓储,提供情绪标签、词云、健康指数等查询方法
|
||
"""
|
||
|
||
from typing import List, Dict, Optional, Any
|
||
from datetime import datetime, timedelta
|
||
import json
|
||
|
||
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
|
||
from app.core.logging_config import get_business_logger
|
||
|
||
logger = get_business_logger()
|
||
|
||
|
||
class EmotionRepository:
|
||
"""情绪数据仓储
|
||
|
||
提供情绪数据的查询和统计功能,包括:
|
||
- 情绪标签统计
|
||
- 情绪词云数据
|
||
- 时间范围内的情绪数据查询
|
||
|
||
Attributes:
|
||
connector: Neo4j连接器实例
|
||
"""
|
||
|
||
def __init__(self, connector: Neo4jConnector):
|
||
"""初始化情绪数据仓储
|
||
|
||
Args:
|
||
connector: Neo4j连接器实例
|
||
"""
|
||
self.connector = connector
|
||
logger.info("情绪数据仓储初始化完成")
|
||
|
||
async def get_emotion_tags(
|
||
self,
|
||
end_user_id: str,
|
||
emotion_type: Optional[str] = None,
|
||
start_date: Optional[str] = None,
|
||
end_date: Optional[str] = None,
|
||
limit: int = 10
|
||
) -> List[Dict[str, Any]]:
|
||
"""获取情绪标签统计
|
||
|
||
查询指定用户的情绪类型分布,包括计数、百分比和平均强度。
|
||
|
||
Args:
|
||
end_user_id: 用户组ID(宿主ID)
|
||
emotion_type: 可选的情绪类型过滤(joy/sadness/anger/fear/surprise/neutral)
|
||
start_date: 可选的开始日期(ISO格式字符串)
|
||
end_date: 可选的结束日期(ISO格式字符串)
|
||
limit: 返回结果的最大数量
|
||
|
||
Returns:
|
||
List[Dict]: 情绪标签列表,每个包含:
|
||
- emotion_type: 情绪类型
|
||
- count: 该类型的数量
|
||
- percentage: 占比百分比
|
||
- avg_intensity: 平均强度
|
||
"""
|
||
# 构建查询条件
|
||
where_clauses = ["s.end_user_id = $end_user_id", "s.emotion_type IS NOT NULL"]
|
||
params = {"end_user_id": end_user_id, "limit": limit}
|
||
|
||
if emotion_type:
|
||
where_clauses.append("s.emotion_type = $emotion_type")
|
||
params["emotion_type"] = emotion_type
|
||
|
||
if start_date:
|
||
where_clauses.append("s.created_at IS NOT NULL AND datetime(s.created_at) >= datetime($start_date)")
|
||
params["start_date"] = start_date
|
||
|
||
if end_date:
|
||
where_clauses.append("s.created_at IS NOT NULL AND datetime(s.created_at) <= datetime($end_date)")
|
||
params["end_date"] = end_date
|
||
|
||
where_str = " AND ".join(where_clauses)
|
||
|
||
# 优化的 Cypher 查询:使用索引,减少中间结果
|
||
query = f"""
|
||
MATCH (s:Statement)
|
||
WHERE {where_str}
|
||
WITH s.emotion_type as emotion_type,
|
||
count(*) as count,
|
||
avg(s.emotion_intensity) as avg_intensity
|
||
WITH collect({{emotion_type: emotion_type, count: count, avg_intensity: avg_intensity}}) as results,
|
||
sum(count) as total_count
|
||
UNWIND results as result
|
||
RETURN result.emotion_type as emotion_type,
|
||
result.count as count,
|
||
toFloat(result.count) / total_count * 100 as percentage,
|
||
result.avg_intensity as avg_intensity
|
||
ORDER BY count DESC
|
||
LIMIT $limit
|
||
"""
|
||
|
||
try:
|
||
results = await self.connector.execute_query(query, **params)
|
||
formatted_results = [
|
||
{
|
||
"emotion_type": record["emotion_type"],
|
||
"count": record["count"],
|
||
"percentage": round(record["percentage"], 2),
|
||
"avg_intensity": round(record["avg_intensity"], 3) if record["avg_intensity"] else 0.0
|
||
}
|
||
for record in results
|
||
]
|
||
|
||
return formatted_results
|
||
except Exception as e:
|
||
logger.error(f"查询情绪标签失败: {str(e)}", exc_info=True)
|
||
return []
|
||
|
||
async def get_emotion_wordcloud(
|
||
self,
|
||
end_user_id: str,
|
||
emotion_type: Optional[str] = None,
|
||
limit: int = 50
|
||
) -> List[Dict[str, Any]]:
|
||
"""获取情绪词云数据
|
||
|
||
查询情绪关键词及其频率,用于生成词云可视化。
|
||
|
||
Args:
|
||
end_user_id: 用户组ID(宿主ID)
|
||
emotion_type: 可选的情绪类型过滤
|
||
limit: 返回关键词的最大数量
|
||
|
||
Returns:
|
||
List[Dict]: 关键词列表,每个包含:
|
||
- keyword: 关键词
|
||
- frequency: 出现频率
|
||
- emotion_type: 关联的情绪类型
|
||
- avg_intensity: 平均强度
|
||
"""
|
||
# 构建查询条件
|
||
where_clauses = ["s.end_user_id = $end_user_id", "s.emotion_keywords IS NOT NULL"]
|
||
params = {"end_user_id": end_user_id, "limit": limit}
|
||
|
||
if emotion_type:
|
||
where_clauses.append("s.emotion_type = $emotion_type")
|
||
params["emotion_type"] = emotion_type
|
||
|
||
where_str = " AND ".join(where_clauses)
|
||
|
||
# 优化的 Cypher 查询:使用索引,减少不必要的计算
|
||
query = f"""
|
||
MATCH (s:Statement)
|
||
WHERE {where_str}
|
||
UNWIND s.emotion_keywords as keyword
|
||
WITH keyword,
|
||
s.emotion_type as emotion_type,
|
||
count(*) as frequency,
|
||
avg(s.emotion_intensity) as avg_intensity
|
||
WHERE keyword IS NOT NULL AND keyword <> ''
|
||
RETURN keyword,
|
||
frequency,
|
||
emotion_type,
|
||
avg_intensity
|
||
ORDER BY frequency DESC
|
||
LIMIT $limit
|
||
"""
|
||
|
||
try:
|
||
results = await self.connector.execute_query(query, **params)
|
||
formatted_results = [
|
||
{
|
||
"keyword": record["keyword"],
|
||
"frequency": record["frequency"],
|
||
"emotion_type": record["emotion_type"],
|
||
"avg_intensity": round(record["avg_intensity"], 3) if record["avg_intensity"] else 0.0
|
||
}
|
||
for record in results
|
||
]
|
||
|
||
return formatted_results
|
||
except Exception as e:
|
||
logger.error(f"查询情绪词云失败: {str(e)}", exc_info=True)
|
||
return []
|
||
|
||
async def get_emotions_in_range(
|
||
self,
|
||
end_user_id: str,
|
||
time_range: str = "30d"
|
||
) -> List[Dict[str, Any]]:
|
||
"""获取时间范围内的情绪数据
|
||
|
||
查询指定时间范围内的所有情绪数据,用于健康指数计算。
|
||
|
||
Args:
|
||
end_user_id: 用户组ID(宿主ID)
|
||
time_range: 时间范围(7d/30d/90d)
|
||
|
||
Returns:
|
||
List[Dict]: 情绪数据列表,每个包含:
|
||
- emotion_type: 情绪类型
|
||
- emotion_intensity: 情绪强度
|
||
- created_at: 创建时间
|
||
- statement_id: 陈述句ID
|
||
"""
|
||
# 解析时间范围
|
||
days_map = {"7d": 7, "30d": 30, "90d": 90}
|
||
days = days_map.get(time_range, 30)
|
||
|
||
# 计算起始日期(使用字符串比较,避免时区问题)
|
||
start_date = (datetime.now() - timedelta(days=days)).isoformat()
|
||
|
||
# 使用 datetime() 函数进行时间比较,与其他查询保持一致
|
||
query = """
|
||
MATCH (s:Statement)
|
||
WHERE s.end_user_id = $end_user_id
|
||
AND s.emotion_type IS NOT NULL
|
||
AND s.created_at IS NOT NULL
|
||
AND datetime(s.created_at) >= datetime($start_date)
|
||
RETURN s.id as statement_id,
|
||
s.emotion_type as emotion_type,
|
||
s.emotion_intensity as emotion_intensity,
|
||
s.created_at as created_at
|
||
ORDER BY datetime(s.created_at) ASC
|
||
"""
|
||
|
||
try:
|
||
results = await self.connector.execute_query(
|
||
query,
|
||
end_user_id=end_user_id,
|
||
start_date=start_date
|
||
)
|
||
formatted_results = [
|
||
{
|
||
"statement_id": record["statement_id"],
|
||
"emotion_type": record["emotion_type"],
|
||
"emotion_intensity": record["emotion_intensity"],
|
||
"created_at": record["created_at"].isoformat() if hasattr(record["created_at"], "isoformat") else str(record["created_at"])
|
||
}
|
||
for record in results
|
||
]
|
||
|
||
return formatted_results
|
||
except Exception as e:
|
||
logger.error(f"查询时间范围情绪数据失败: {str(e)}", exc_info=True)
|
||
return []
|