# -*- coding: utf-8 -*- """情绪分析控制器模块 本模块提供情绪分析相关的API端点,包括情绪标签、词云、健康指数和个性化建议。 Routes: POST /emotion/tags - 获取情绪标签统计 POST /emotion/wordcloud - 获取情绪词云数据 POST /emotion/health - 获取情绪健康指数 POST /emotion/suggestions - 获取个性化情绪建议 """ from app.core.error_codes import BizCode from app.core.logging_config import get_api_logger from app.core.response_utils import fail, success from app.dependencies import get_current_user, get_db from app.models.user_model import User from app.schemas.emotion_schema import ( EmotionHealthRequest, EmotionSuggestionsRequest, EmotionGenerateSuggestionsRequest, EmotionTagsRequest, EmotionWordcloudRequest, ) from app.schemas.response_schema import ApiResponse from app.services.emotion_analytics_service import EmotionAnalyticsService from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.orm import Session # 获取API专用日志器 api_logger = get_api_logger() router = APIRouter( prefix="/memory/emotion-memory", tags=["Emotion Analysis"], dependencies=[Depends(get_current_user)] # 所有路由都需要认证 ) # 初始化情绪分析服务uv emotion_service = EmotionAnalyticsService() @router.post("/tags", response_model=ApiResponse) async def get_emotion_tags( request: EmotionTagsRequest, current_user: User = Depends(get_current_user), ): try: api_logger.info( f"用户 {current_user.username} 请求获取情绪标签统计", extra={ "group_id": request.group_id, "emotion_type": request.emotion_type, "start_date": request.start_date, "end_date": request.end_date, "limit": request.limit } ) # 调用服务层 data = await emotion_service.get_emotion_tags( end_user_id=request.group_id, emotion_type=request.emotion_type, start_date=request.start_date, end_date=request.end_date, limit=request.limit ) api_logger.info( "情绪标签统计获取成功", extra={ "group_id": request.group_id, "total_count": data.get("total_count", 0), "tags_count": len(data.get("tags", [])) } ) return success(data=data, msg="情绪标签获取成功") except Exception as e: api_logger.error( f"获取情绪标签统计失败: {str(e)}", extra={"group_id": request.group_id}, exc_info=True ) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"获取情绪标签统计失败: {str(e)}" ) @router.post("/wordcloud", response_model=ApiResponse) async def get_emotion_wordcloud( request: EmotionWordcloudRequest, current_user: User = Depends(get_current_user), ): try: api_logger.info( f"用户 {current_user.username} 请求获取情绪词云数据", extra={ "group_id": request.group_id, "emotion_type": request.emotion_type, "limit": request.limit } ) # 调用服务层 data = await emotion_service.get_emotion_wordcloud( end_user_id=request.group_id, emotion_type=request.emotion_type, limit=request.limit ) api_logger.info( "情绪词云数据获取成功", extra={ "group_id": request.group_id, "total_keywords": data.get("total_keywords", 0) } ) return success(data=data, msg="情绪词云获取成功") except Exception as e: api_logger.error( f"获取情绪词云数据失败: {str(e)}", extra={"group_id": request.group_id}, exc_info=True ) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"获取情绪词云数据失败: {str(e)}" ) @router.post("/health", response_model=ApiResponse) async def get_emotion_health( request: EmotionHealthRequest, current_user: User = Depends(get_current_user), ): try: # 验证时间范围参数 if request.time_range not in ["7d", "30d", "90d"]: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="时间范围参数无效,必须是 7d、30d 或 90d" ) api_logger.info( f"用户 {current_user.username} 请求获取情绪健康指数", extra={ "group_id": request.group_id, "time_range": request.time_range } ) # 调用服务层 data = await emotion_service.calculate_emotion_health_index( end_user_id=request.group_id, time_range=request.time_range ) api_logger.info( "情绪健康指数获取成功", extra={ "group_id": request.group_id, "health_score": data.get("health_score", 0), "level": data.get("level", "未知") } ) return success(data=data, msg="情绪健康指数获取成功") except HTTPException: raise except Exception as e: api_logger.error( f"获取情绪健康指数失败: {str(e)}", extra={"group_id": request.group_id}, exc_info=True ) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"获取情绪健康指数失败: {str(e)}" ) @router.post("/suggestions", response_model=ApiResponse) async def get_emotion_suggestions( request: EmotionSuggestionsRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """获取个性化情绪建议(从缓存读取) Args: request: 包含 group_id 和可选的 config_id db: 数据库会话 current_user: 当前用户 Returns: 缓存的个性化情绪建议响应 """ try: api_logger.info( f"用户 {current_user.username} 请求获取个性化情绪建议(缓存)", extra={ "group_id": request.group_id, "config_id": request.config_id } ) # 从缓存获取建议 data = await emotion_service.get_cached_suggestions( end_user_id=request.group_id, db=db ) if data is None: # 缓存不存在或已过期 api_logger.info( f"用户 {request.group_id} 的建议缓存不存在或已过期", extra={"group_id": request.group_id} ) return fail( BizCode.RESOURCE_NOT_FOUND, "建议缓存不存在或已过期,请调用 /generate_suggestions 接口生成新建议", None ) api_logger.info( "个性化建议获取成功(缓存)", extra={ "group_id": request.group_id, "suggestions_count": len(data.get("suggestions", [])) } ) return success(data=data, msg="个性化建议获取成功(缓存)") except Exception as e: api_logger.error( f"获取个性化建议失败: {str(e)}", extra={"group_id": request.group_id}, exc_info=True ) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"获取个性化建议失败: {str(e)}" ) @router.post("/generate_suggestions", response_model=ApiResponse) async def generate_emotion_suggestions( request: EmotionGenerateSuggestionsRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """生成个性化情绪建议(调用LLM并缓存) Args: request: 包含 group_id、可选的 config_id 和 force_refresh db: 数据库会话 current_user: 当前用户 Returns: 新生成的个性化情绪建议响应 """ try: # 验证 config_id(如果提供) # 获取终端用户关联的配置 config_id = request.config_id if config_id is None: # 如果没有提供 config_id,尝试获取用户关联的配置 try: from app.services.memory_agent_service import ( get_end_user_connected_config, ) connected_config = get_end_user_connected_config(request.group_id, db) config_id = connected_config.get("memory_config_id") except ValueError as e: return fail(BizCode.INVALID_PARAMETER, "无法获取用户关联的配置", str(e)) else: # 如果提供了 config_id,验证其有效性 from app.services.memory_config_service import MemoryConfigService try: config_service = MemoryConfigService(db) config = config_service.get_config_by_id(config_id) if not config: return fail(BizCode.INVALID_PARAMETER, "配置ID无效", f"配置 {config_id} 不存在") except Exception as e: return fail(BizCode.INVALID_PARAMETER, "配置ID验证失败", str(e)) api_logger.info( f"用户 {current_user.username} 请求生成个性化情绪建议", extra={ "group_id": request.group_id, "config_id": config_id } ) # 调用服务层生成建议 data = await emotion_service.generate_emotion_suggestions( end_user_id=request.group_id, db=db ) # 保存到缓存 await emotion_service.save_suggestions_cache( end_user_id=request.group_id, suggestions_data=data, db=db, expires_hours=24 ) api_logger.info( "个性化建议生成成功", extra={ "group_id": request.group_id, "suggestions_count": len(data.get("suggestions", [])) } ) return success(data=data, msg="个性化建议生成成功") except Exception as e: api_logger.error( f"生成个性化建议失败: {str(e)}", extra={"group_id": request.group_id}, exc_info=True ) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"生成个性化建议失败: {str(e)}" )