324 lines
9.8 KiB
Python
324 lines
9.8 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""情绪分析控制器模块
|
||
|
||
本模块提供情绪分析相关的API端点,包括情绪标签、词云、健康指数和个性化建议。
|
||
|
||
Routes:
|
||
POST /emotion/tags - 获取情绪标签统计
|
||
POST /emotion/wordcloud - 获取情绪词云数据
|
||
POST /emotion/health - 获取情绪健康指数
|
||
POST /emotion/suggestions - 获取个性化情绪建议
|
||
"""
|
||
|
||
from app.core.error_codes import BizCode
|
||
from app.core.logging_config import get_api_logger
|
||
from app.core.response_utils import fail, success
|
||
from app.dependencies import get_current_user, get_db
|
||
from app.models.user_model import User
|
||
from app.schemas.emotion_schema import (
|
||
EmotionHealthRequest,
|
||
EmotionSuggestionsRequest,
|
||
EmotionGenerateSuggestionsRequest,
|
||
EmotionTagsRequest,
|
||
EmotionWordcloudRequest,
|
||
)
|
||
from app.schemas.response_schema import ApiResponse
|
||
from app.services.emotion_analytics_service import EmotionAnalyticsService
|
||
from fastapi import APIRouter, Depends, HTTPException, status,Header
|
||
from sqlalchemy.orm import Session
|
||
|
||
# 获取API专用日志器
|
||
api_logger = get_api_logger()
|
||
|
||
router = APIRouter(
|
||
prefix="/memory/emotion-memory",
|
||
tags=["Emotion Analysis"],
|
||
dependencies=[Depends(get_current_user)] # 所有路由都需要认证
|
||
)
|
||
|
||
|
||
# 初始化情绪分析服务uv
|
||
emotion_service = EmotionAnalyticsService()
|
||
|
||
|
||
|
||
@router.post("/tags", response_model=ApiResponse)
|
||
async def get_emotion_tags(
|
||
request: EmotionTagsRequest,
|
||
language_type: str = Header(default="zh", alias="X-Language-Type"),
|
||
current_user: User = Depends(get_current_user),
|
||
):
|
||
|
||
try:
|
||
api_logger.info(
|
||
f"用户 {current_user.username} 请求获取情绪标签统计",
|
||
extra={
|
||
"group_id": request.group_id,
|
||
"emotion_type": request.emotion_type,
|
||
"start_date": request.start_date,
|
||
"end_date": request.end_date,
|
||
"limit": request.limit
|
||
}
|
||
)
|
||
|
||
# 调用服务层
|
||
data = await emotion_service.get_emotion_tags(
|
||
end_user_id=request.group_id,
|
||
emotion_type=request.emotion_type,
|
||
start_date=request.start_date,
|
||
end_date=request.end_date,
|
||
limit=request.limit
|
||
)
|
||
|
||
api_logger.info(
|
||
"情绪标签统计获取成功",
|
||
extra={
|
||
"group_id": request.group_id,
|
||
"total_count": data.get("total_count", 0),
|
||
"tags_count": len(data.get("tags", []))
|
||
}
|
||
)
|
||
|
||
return success(data=data, msg="情绪标签获取成功")
|
||
|
||
except Exception as e:
|
||
api_logger.error(
|
||
f"获取情绪标签统计失败: {str(e)}",
|
||
extra={"group_id": request.group_id},
|
||
exc_info=True
|
||
)
|
||
raise HTTPException(
|
||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||
detail=f"获取情绪标签统计失败: {str(e)}"
|
||
)
|
||
|
||
|
||
|
||
@router.post("/wordcloud", response_model=ApiResponse)
|
||
async def get_emotion_wordcloud(
|
||
request: EmotionWordcloudRequest,
|
||
language_type: str = Header(default="zh", alias="X-Language-Type"),
|
||
current_user: User = Depends(get_current_user),
|
||
):
|
||
|
||
try:
|
||
api_logger.info(
|
||
f"用户 {current_user.username} 请求获取情绪词云数据",
|
||
extra={
|
||
"group_id": request.group_id,
|
||
"emotion_type": request.emotion_type,
|
||
"limit": request.limit
|
||
}
|
||
)
|
||
|
||
# 调用服务层
|
||
data = await emotion_service.get_emotion_wordcloud(
|
||
end_user_id=request.group_id,
|
||
emotion_type=request.emotion_type,
|
||
limit=request.limit
|
||
)
|
||
|
||
api_logger.info(
|
||
"情绪词云数据获取成功",
|
||
extra={
|
||
"group_id": request.group_id,
|
||
"total_keywords": data.get("total_keywords", 0)
|
||
}
|
||
)
|
||
|
||
return success(data=data, msg="情绪词云获取成功")
|
||
|
||
except Exception as e:
|
||
api_logger.error(
|
||
f"获取情绪词云数据失败: {str(e)}",
|
||
extra={"group_id": request.group_id},
|
||
exc_info=True
|
||
)
|
||
raise HTTPException(
|
||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||
detail=f"获取情绪词云数据失败: {str(e)}"
|
||
)
|
||
|
||
|
||
|
||
@router.post("/health", response_model=ApiResponse)
|
||
async def get_emotion_health(
|
||
request: EmotionHealthRequest,
|
||
language_type: str = Header(default="zh", alias="X-Language-Type"),
|
||
current_user: User = Depends(get_current_user),
|
||
):
|
||
|
||
try:
|
||
# 验证时间范围参数
|
||
if request.time_range not in ["7d", "30d", "90d"]:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_400_BAD_REQUEST,
|
||
detail="时间范围参数无效,必须是 7d、30d 或 90d"
|
||
)
|
||
|
||
api_logger.info(
|
||
f"用户 {current_user.username} 请求获取情绪健康指数",
|
||
extra={
|
||
"group_id": request.group_id,
|
||
"time_range": request.time_range
|
||
}
|
||
)
|
||
|
||
# 调用服务层
|
||
data = await emotion_service.calculate_emotion_health_index(
|
||
end_user_id=request.group_id,
|
||
time_range=request.time_range
|
||
)
|
||
|
||
api_logger.info(
|
||
"情绪健康指数获取成功",
|
||
extra={
|
||
"group_id": request.group_id,
|
||
"health_score": data.get("health_score", 0),
|
||
"level": data.get("level", "未知")
|
||
}
|
||
)
|
||
|
||
return success(data=data, msg="情绪健康指数获取成功")
|
||
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
api_logger.error(
|
||
f"获取情绪健康指数失败: {str(e)}",
|
||
extra={"group_id": request.group_id},
|
||
exc_info=True
|
||
)
|
||
raise HTTPException(
|
||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||
detail=f"获取情绪健康指数失败: {str(e)}"
|
||
)
|
||
|
||
|
||
|
||
@router.post("/suggestions", response_model=ApiResponse)
|
||
async def get_emotion_suggestions(
|
||
request: EmotionSuggestionsRequest,
|
||
language_type: str = Header(default="zh", alias="X-Language-Type"),
|
||
db: Session = Depends(get_db),
|
||
current_user: User = Depends(get_current_user),
|
||
):
|
||
"""获取个性化情绪建议(从缓存读取)
|
||
|
||
Args:
|
||
request: 包含 group_id 和可选的 config_id
|
||
db: 数据库会话
|
||
current_user: 当前用户
|
||
|
||
Returns:
|
||
缓存的个性化情绪建议响应
|
||
"""
|
||
try:
|
||
api_logger.info(
|
||
f"用户 {current_user.username} 请求获取个性化情绪建议(缓存)",
|
||
extra={
|
||
"group_id": request.group_id,
|
||
"config_id": request.config_id
|
||
}
|
||
)
|
||
|
||
# 从缓存获取建议
|
||
data = await emotion_service.get_cached_suggestions(
|
||
end_user_id=request.group_id,
|
||
db=db
|
||
)
|
||
|
||
if data is None:
|
||
# 缓存不存在或已过期
|
||
api_logger.info(
|
||
f"用户 {request.group_id} 的建议缓存不存在或已过期",
|
||
extra={"group_id": request.group_id}
|
||
)
|
||
return fail(
|
||
BizCode.NOT_FOUND,
|
||
"建议缓存不存在或已过期,请右上角刷新生成新建议",
|
||
""
|
||
)
|
||
|
||
api_logger.info(
|
||
"个性化建议获取成功(缓存)",
|
||
extra={
|
||
"group_id": request.group_id,
|
||
"suggestions_count": len(data.get("suggestions", []))
|
||
}
|
||
)
|
||
|
||
return success(data=data, msg="个性化建议获取成功(缓存)")
|
||
|
||
except Exception as e:
|
||
api_logger.error(
|
||
f"获取个性化建议失败: {str(e)}",
|
||
extra={"group_id": request.group_id},
|
||
exc_info=True
|
||
)
|
||
raise HTTPException(
|
||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||
detail=f"获取个性化建议失败: {str(e)}"
|
||
)
|
||
|
||
|
||
@router.post("/generate_suggestions", response_model=ApiResponse)
|
||
async def generate_emotion_suggestions(
|
||
request: EmotionGenerateSuggestionsRequest,
|
||
language_type: str = Header(default="zh", alias="X-Language-Type"),
|
||
db: Session = Depends(get_db),
|
||
current_user: User = Depends(get_current_user),
|
||
):
|
||
"""生成个性化情绪建议(调用LLM并缓存)
|
||
|
||
Args:
|
||
request: 包含 end_user_id
|
||
db: 数据库会话
|
||
current_user: 当前用户
|
||
|
||
Returns:
|
||
新生成的个性化情绪建议响应
|
||
"""
|
||
try:
|
||
api_logger.info(
|
||
f"用户 {current_user.username} 请求生成个性化情绪建议",
|
||
extra={
|
||
"end_user_id": request.end_user_id
|
||
}
|
||
)
|
||
|
||
# 调用服务层生成建议
|
||
data = await emotion_service.generate_emotion_suggestions(
|
||
end_user_id=request.end_user_id,
|
||
db=db
|
||
)
|
||
|
||
# 保存到缓存
|
||
await emotion_service.save_suggestions_cache(
|
||
end_user_id=request.end_user_id,
|
||
suggestions_data=data,
|
||
db=db,
|
||
expires_hours=24
|
||
)
|
||
|
||
api_logger.info(
|
||
"个性化建议生成成功",
|
||
extra={
|
||
"end_user_id": request.end_user_id,
|
||
"suggestions_count": len(data.get("suggestions", []))
|
||
}
|
||
)
|
||
|
||
return success(data=data, msg="个性化建议生成成功")
|
||
|
||
except Exception as e:
|
||
api_logger.error(
|
||
f"生成个性化建议失败: {str(e)}",
|
||
extra={"end_user_id": request.end_user_id},
|
||
exc_info=True
|
||
)
|
||
raise HTTPException(
|
||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||
detail=f"生成个性化建议失败: {str(e)}"
|
||
)
|