Files
MemoryBear/api/app/repositories/neo4j/emotion_repository.py
乐力齐 3ca3e8e023 Fix/bug en zh (#385)
* [fix]The log retains genuine alerts and errors, while filtering out unnecessary noise.

* [fix]Scenario English and Chinese, emotion specifications

* [fix]Change the "no data" scenario from 0.0 to None

* [fix]The emotional health indicators, emotional advice, and emotional distribution analysis are all linked together.

* [fix]The emotional health indicators, emotional advice, and emotional distribution analysis are all linked together.

* [fix]Separate expected errors from unexpected errors
2026-02-10 13:46:09 +08:00

248 lines
8.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""情绪数据仓储模块
本模块提供情绪数据的查询功能,用于情绪分析和统计。
Classes:
EmotionRepository: 情绪数据仓储,提供情绪标签、词云、健康指数等查询方法
"""
from typing import List, Dict, Optional, Any
from datetime import datetime, timedelta
import json
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.core.logging_config import get_business_logger
logger = get_business_logger()
class EmotionRepository:
"""情绪数据仓储
提供情绪数据的查询和统计功能,包括:
- 情绪标签统计
- 情绪词云数据
- 时间范围内的情绪数据查询
Attributes:
connector: Neo4j连接器实例
"""
def __init__(self, connector: Neo4jConnector):
"""初始化情绪数据仓储
Args:
connector: Neo4j连接器实例
"""
self.connector = connector
logger.info("情绪数据仓储初始化完成")
async def get_emotion_tags(
self,
end_user_id: str,
emotion_type: Optional[str] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
limit: int = 10
) -> List[Dict[str, Any]]:
"""获取情绪标签统计
查询指定用户的情绪类型分布,包括计数、百分比和平均强度。
Args:
end_user_id: 用户组ID宿主ID
emotion_type: 可选的情绪类型过滤joy/sadness/anger/fear/surprise/neutral
start_date: 可选的开始日期ISO格式字符串
end_date: 可选的结束日期ISO格式字符串
limit: 返回结果的最大数量
Returns:
List[Dict]: 情绪标签列表,每个包含:
- emotion_type: 情绪类型
- count: 该类型的数量
- percentage: 占比百分比
- avg_intensity: 平均强度
"""
# 构建查询条件
where_clauses = ["s.end_user_id = $end_user_id", "s.emotion_type IS NOT NULL"]
params = {"end_user_id": end_user_id, "limit": limit}
if emotion_type:
where_clauses.append("s.emotion_type = $emotion_type")
params["emotion_type"] = emotion_type
if start_date:
where_clauses.append("s.created_at IS NOT NULL AND datetime(s.created_at) >= datetime($start_date)")
params["start_date"] = start_date
if end_date:
where_clauses.append("s.created_at IS NOT NULL AND datetime(s.created_at) <= datetime($end_date)")
params["end_date"] = end_date
where_str = " AND ".join(where_clauses)
# 优化的 Cypher 查询:使用索引,减少中间结果
query = f"""
MATCH (s:Statement)
WHERE {where_str}
WITH s.emotion_type as emotion_type,
count(*) as count,
avg(s.emotion_intensity) as avg_intensity
WITH collect({{emotion_type: emotion_type, count: count, avg_intensity: avg_intensity}}) as results,
sum(count) as total_count
UNWIND results as result
RETURN result.emotion_type as emotion_type,
result.count as count,
toFloat(result.count) / total_count * 100 as percentage,
result.avg_intensity as avg_intensity
ORDER BY count DESC
LIMIT $limit
"""
try:
results = await self.connector.execute_query(query, **params)
formatted_results = [
{
"emotion_type": record["emotion_type"],
"count": record["count"],
"percentage": round(record["percentage"], 2),
"avg_intensity": round(record["avg_intensity"], 3) if record["avg_intensity"] else 0.0
}
for record in results
]
return formatted_results
except Exception as e:
logger.error(f"查询情绪标签失败: {str(e)}", exc_info=True)
return []
async def get_emotion_wordcloud(
self,
end_user_id: str,
emotion_type: Optional[str] = None,
limit: int = 50
) -> List[Dict[str, Any]]:
"""获取情绪词云数据
查询情绪关键词及其频率,用于生成词云可视化。
Args:
end_user_id: 用户组ID宿主ID
emotion_type: 可选的情绪类型过滤
limit: 返回关键词的最大数量
Returns:
List[Dict]: 关键词列表,每个包含:
- keyword: 关键词
- frequency: 出现频率
- emotion_type: 关联的情绪类型
- avg_intensity: 平均强度
"""
# 构建查询条件
where_clauses = ["s.end_user_id = $end_user_id", "s.emotion_keywords IS NOT NULL"]
params = {"end_user_id": end_user_id, "limit": limit}
if emotion_type:
where_clauses.append("s.emotion_type = $emotion_type")
params["emotion_type"] = emotion_type
where_str = " AND ".join(where_clauses)
# 优化的 Cypher 查询:使用索引,减少不必要的计算
query = f"""
MATCH (s:Statement)
WHERE {where_str}
UNWIND s.emotion_keywords as keyword
WITH keyword,
s.emotion_type as emotion_type,
count(*) as frequency,
avg(s.emotion_intensity) as avg_intensity
WHERE keyword IS NOT NULL AND keyword <> ''
RETURN keyword,
frequency,
emotion_type,
avg_intensity
ORDER BY frequency DESC
LIMIT $limit
"""
try:
results = await self.connector.execute_query(query, **params)
formatted_results = [
{
"keyword": record["keyword"],
"frequency": record["frequency"],
"emotion_type": record["emotion_type"],
"avg_intensity": round(record["avg_intensity"], 3) if record["avg_intensity"] else 0.0
}
for record in results
]
return formatted_results
except Exception as e:
logger.error(f"查询情绪词云失败: {str(e)}", exc_info=True)
return []
async def get_emotions_in_range(
self,
end_user_id: str,
time_range: str = "30d"
) -> List[Dict[str, Any]]:
"""获取时间范围内的情绪数据
查询指定时间范围内的所有情绪数据,用于健康指数计算。
Args:
end_user_id: 用户组ID宿主ID
time_range: 时间范围7d/30d/90d
Returns:
List[Dict]: 情绪数据列表,每个包含:
- emotion_type: 情绪类型
- emotion_intensity: 情绪强度
- created_at: 创建时间
- statement_id: 陈述句ID
"""
# 解析时间范围
days_map = {"7d": 7, "30d": 30, "90d": 90}
days = days_map.get(time_range, 30)
# 计算起始日期(使用字符串比较,避免时区问题)
start_date = (datetime.now() - timedelta(days=days)).isoformat()
# 使用 datetime() 函数进行时间比较,与其他查询保持一致
query = """
MATCH (s:Statement)
WHERE s.end_user_id = $end_user_id
AND s.emotion_type IS NOT NULL
AND s.created_at IS NOT NULL
AND datetime(s.created_at) >= datetime($start_date)
RETURN s.id as statement_id,
s.emotion_type as emotion_type,
s.emotion_intensity as emotion_intensity,
s.created_at as created_at
ORDER BY datetime(s.created_at) ASC
"""
try:
results = await self.connector.execute_query(
query,
end_user_id=end_user_id,
start_date=start_date
)
formatted_results = [
{
"statement_id": record["statement_id"],
"emotion_type": record["emotion_type"],
"emotion_intensity": record["emotion_intensity"],
"created_at": record["created_at"].isoformat() if hasattr(record["created_at"], "isoformat") else str(record["created_at"])
}
for record in results
]
return formatted_results
except Exception as e:
logger.error(f"查询时间范围情绪数据失败: {str(e)}", exc_info=True)
return []