diff --git a/api/app/controllers/__init__.py b/api/app/controllers/__init__.py index ddf534c6..27f65b1d 100644 --- a/api/app/controllers/__init__.py +++ b/api/app/controllers/__init__.py @@ -29,6 +29,8 @@ from . import ( public_share_controller, multi_agent_controller, workflow_controller, + emotion_controller, + emotion_config_controller, prompt_optimizer_controller, ) @@ -60,6 +62,8 @@ manager_router.include_router(public_share_controller.router) # 公开路由( manager_router.include_router(memory_dashboard_controller.router) manager_router.include_router(multi_agent_controller.router) manager_router.include_router(workflow_controller.router) +manager_router.include_router(emotion_controller.router) +manager_router.include_router(emotion_config_controller.router) manager_router.include_router(prompt_optimizer_controller.router) manager_router.include_router(memory_reflection_controller.router) __all__ = ["manager_router"] diff --git a/api/app/controllers/emotion_config_controller.py b/api/app/controllers/emotion_config_controller.py new file mode 100644 index 00000000..76450d8a --- /dev/null +++ b/api/app/controllers/emotion_config_controller.py @@ -0,0 +1,207 @@ +# -*- coding: utf-8 -*- +"""情绪配置控制器模块 + +本模块提供情绪引擎配置管理的API端点,包括获取和更新配置。 + +Routes: + GET /memory/config/emotion - 获取情绪引擎配置 + POST /memory/config/emotion - 更新情绪引擎配置 +""" + +from fastapi import APIRouter, Depends, Query, HTTPException, status +from pydantic import BaseModel, Field +from typing import Optional +from sqlalchemy.orm import Session + +from app.core.response_utils import success +from app.dependencies import get_current_user +from app.models.user_model import User +from app.schemas.response_schema import ApiResponse +from app.services.emotion_config_service import EmotionConfigService +from app.core.logging_config import get_api_logger +from app.db import get_db + +# 获取API专用日志器 +api_logger = get_api_logger() + +router = APIRouter( + prefix="/memory/emotion", + tags=["Emotion Config"], + dependencies=[Depends(get_current_user)] # 所有路由都需要认证 +) + +class EmotionConfigQuery(BaseModel): + """情绪配置查询请求模型""" + config_id: int = Field(..., description="配置ID") + +class EmotionConfigUpdate(BaseModel): + """情绪配置更新请求模型""" + config_id: int = Field(..., description="配置ID") + emotion_enabled: bool = Field(..., description="是否启用情绪提取") + emotion_model_id: Optional[str] = Field(None, description="情绪分析专用模型ID") + emotion_extract_keywords: bool = Field(..., description="是否提取情绪关键词") + emotion_min_intensity: float = Field(..., ge=0.0, le=1.0, description="最小情绪强度阈值(0.0-1.0)") + emotion_enable_subject: bool = Field(..., description="是否启用主体分类") + +@router.get("/read_config", response_model=ApiResponse) +def get_emotion_config( + config_id: int = Query(..., description="配置ID"), + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """获取情绪引擎配置 + + 查询指定配置ID的情绪相关配置字段。 + + Args: + config_id: 配置ID + + Returns: + ApiResponse: 包含情绪配置数据 + + Example Response: + { + "code": 2000, + "msg": "情绪配置获取成功", + "data": { + "config_id": 17, + "emotion_enabled": true, + "emotion_model_id": "gpt-4", + "emotion_extract_keywords": true, + "emotion_min_intensity": 0.1, + "emotion_enable_subject": true + } + } + """ + try: + api_logger.info( + f"用户 {current_user.username} 请求获取情绪配置", + extra={"config_id": config_id} + ) + + # 初始化服务 + config_service = EmotionConfigService(db) + + # 调用服务层 + data = config_service.get_emotion_config(config_id) + + api_logger.info( + "情绪配置获取成功", + extra={ + "config_id": config_id, + "emotion_enabled": data.get("emotion_enabled", False) + } + ) + + return success(data=data, msg="情绪配置获取成功") + + except ValueError as e: + api_logger.warning( + f"获取情绪配置失败: {str(e)}", + extra={"config_id": config_id} + ) + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=str(e) + ) + except Exception as e: + api_logger.error( + f"获取情绪配置失败: {str(e)}", + extra={"config_id": config_id}, + exc_info=True + ) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"获取情绪配置失败: {str(e)}" + ) + + + +@router.post("/updated_config", response_model=ApiResponse) +def update_emotion_config( + config: EmotionConfigUpdate, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """更新情绪引擎配置 + + 更新指定配置ID的情绪相关配置字段。 + + Args: + config: 配置更新数据(包含config_id) + + Returns: + ApiResponse: 包含更新后的情绪配置数据 + + Example Request: + { + "config_id": 2, + "emotion_enabled": true, + "emotion_model_id": "gpt-4", + "emotion_extract_keywords": true, + "emotion_min_intensity": 0.1, + "emotion_enable_subject": true + } + + Example Response: + { + "code": 2000, + "msg": "情绪配置更新成功", + "data": { + "config_id": 17, + "emotion_enabled": true, + "emotion_model_id": "gpt-4", + "emotion_extract_keywords": true, + "emotion_min_intensity": 0.2, + "emotion_enable_subject": true + } + } + """ + try: + api_logger.info( + f"用户 {current_user.username} 请求更新情绪配置", + extra={ + "config_id": config.config_id, + "emotion_enabled": config.emotion_enabled, + "emotion_min_intensity": config.emotion_min_intensity + } + ) + + # 初始化服务 + config_service = EmotionConfigService(db) + + # 转换为字典(排除config_id,因为它作为参数传递) + config_data = config.model_dump(exclude={'config_id'}) + + # 调用服务层 + data = config_service.update_emotion_config(config.config_id, config_data) + + api_logger.info( + "情绪配置更新成功", + extra={ + "config_id": config.config_id, + "emotion_enabled": data.get("emotion_enabled", False) + } + ) + + return success(data=data, msg="情绪配置更新成功") + + except ValueError as e: + api_logger.warning( + f"更新情绪配置失败: {str(e)}", + extra={"config_id": config.config_id} + ) + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=str(e) + ) + except Exception as e: + api_logger.error( + f"更新情绪配置失败: {str(e)}", + extra={"config_id": config.config_id}, + exc_info=True + ) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"更新情绪配置失败: {str(e)}" + ) diff --git a/api/app/controllers/emotion_controller.py b/api/app/controllers/emotion_controller.py new file mode 100644 index 00000000..2ed00c43 --- /dev/null +++ b/api/app/controllers/emotion_controller.py @@ -0,0 +1,255 @@ +# -*- coding: utf-8 -*- +"""情绪分析控制器模块 + +本模块提供情绪分析相关的API端点,包括情绪标签、词云、健康指数和个性化建议。 + +Routes: + POST /emotion/tags - 获取情绪标签统计 + POST /emotion/wordcloud - 获取情绪词云数据 + POST /emotion/health - 获取情绪健康指数 + POST /emotion/suggestions - 获取个性化情绪建议 +""" + +from fastapi import APIRouter, Depends, HTTPException, status +from sqlalchemy.orm import Session + +from app.core.response_utils import success, fail +from app.core.error_codes import BizCode +from app.dependencies import get_current_user, get_db +from app.models.user_model import User +from app.schemas.response_schema import ApiResponse +from app.schemas.emotion_schema import ( + EmotionTagsRequest, + EmotionWordcloudRequest, + EmotionHealthRequest, + EmotionSuggestionsRequest +) +from app.services.emotion_analytics_service import EmotionAnalyticsService +from app.core.logging_config import get_api_logger + +# 获取API专用日志器 +api_logger = get_api_logger() + +router = APIRouter( + prefix="/memory/emotion", + tags=["Emotion Analysis"], + dependencies=[Depends(get_current_user)] # 所有路由都需要认证 +) + + +# 初始化情绪分析服务uv +emotion_service = EmotionAnalyticsService() + + + +@router.post("/tags", response_model=ApiResponse) +async def get_emotion_tags( + request: EmotionTagsRequest, + current_user: User = Depends(get_current_user), +): + + try: + api_logger.info( + f"用户 {current_user.username} 请求获取情绪标签统计", + extra={ + "group_id": request.group_id, + "emotion_type": request.emotion_type, + "start_date": request.start_date, + "end_date": request.end_date, + "limit": request.limit + } + ) + + # 调用服务层 + data = await emotion_service.get_emotion_tags( + end_user_id=request.group_id, + emotion_type=request.emotion_type, + start_date=request.start_date, + end_date=request.end_date, + limit=request.limit + ) + + api_logger.info( + "情绪标签统计获取成功", + extra={ + "group_id": request.group_id, + "total_count": data.get("total_count", 0), + "tags_count": len(data.get("tags", [])) + } + ) + + return success(data=data, msg="情绪标签获取成功") + + except Exception as e: + api_logger.error( + f"获取情绪标签统计失败: {str(e)}", + extra={"group_id": request.group_id}, + exc_info=True + ) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"获取情绪标签统计失败: {str(e)}" + ) + + + +@router.post("/wordcloud", response_model=ApiResponse) +async def get_emotion_wordcloud( + request: EmotionWordcloudRequest, + current_user: User = Depends(get_current_user), +): + + try: + api_logger.info( + f"用户 {current_user.username} 请求获取情绪词云数据", + extra={ + "group_id": request.group_id, + "emotion_type": request.emotion_type, + "limit": request.limit + } + ) + + # 调用服务层 + data = await emotion_service.get_emotion_wordcloud( + end_user_id=request.group_id, + emotion_type=request.emotion_type, + limit=request.limit + ) + + api_logger.info( + "情绪词云数据获取成功", + extra={ + "group_id": request.group_id, + "total_keywords": data.get("total_keywords", 0) + } + ) + + return success(data=data, msg="情绪词云获取成功") + + except Exception as e: + api_logger.error( + f"获取情绪词云数据失败: {str(e)}", + extra={"group_id": request.group_id}, + exc_info=True + ) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"获取情绪词云数据失败: {str(e)}" + ) + + + +@router.post("/health", response_model=ApiResponse) +async def get_emotion_health( + request: EmotionHealthRequest, + current_user: User = Depends(get_current_user), +): + + try: + # 验证时间范围参数 + if request.time_range not in ["7d", "30d", "90d"]: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="时间范围参数无效,必须是 7d、30d 或 90d" + ) + + api_logger.info( + f"用户 {current_user.username} 请求获取情绪健康指数", + extra={ + "group_id": request.group_id, + "time_range": request.time_range + } + ) + + # 调用服务层 + data = await emotion_service.calculate_emotion_health_index( + end_user_id=request.group_id, + time_range=request.time_range + ) + + api_logger.info( + "情绪健康指数获取成功", + extra={ + "group_id": request.group_id, + "health_score": data.get("health_score", 0), + "level": data.get("level", "未知") + } + ) + + return success(data=data, msg="情绪健康指数获取成功") + + except HTTPException: + raise + except Exception as e: + api_logger.error( + f"获取情绪健康指数失败: {str(e)}", + extra={"group_id": request.group_id}, + exc_info=True + ) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"获取情绪健康指数失败: {str(e)}" + ) + + + +@router.post("/suggestions", response_model=ApiResponse) +async def get_emotion_suggestions( + request: EmotionSuggestionsRequest, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """获取个性化情绪建议 + + Args: + request: 包含 group_id 和可选的 config_id + db: 数据库会话 + current_user: 当前用户 + + Returns: + 个性化情绪建议响应 + """ + try: + # 验证 config_id(如果提供) + config_id = request.config_id + if config_id is not None: + from app.controllers.memory_agent_controller import validate_config_id + try: + config_id = validate_config_id(config_id, db) + except ValueError as e: + return fail(BizCode.INVALID_PARAMETER, "配置ID无效", str(e)) + + api_logger.info( + f"用户 {current_user.username} 请求获取个性化情绪建议", + extra={ + "group_id": request.group_id, + "config_id": config_id + } + ) + + # 调用服务层 + data = await emotion_service.generate_emotion_suggestions( + end_user_id=request.group_id, + config_id=config_id + ) + + api_logger.info( + "个性化建议获取成功", + extra={ + "group_id": request.group_id, + "suggestions_count": len(data.get("suggestions", [])) + } + ) + + return success(data=data, msg="个性化建议获取成功") + + except Exception as e: + api_logger.error( + f"获取个性化建议失败: {str(e)}", + extra={"group_id": request.group_id}, + exc_info=True + ) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"获取个性化建议失败: {str(e)}" + ) diff --git a/api/app/core/memory/agent/langgraph_graph/write_graph.py b/api/app/core/memory/agent/langgraph_graph/write_graph.py index dbdc51d6..cfcc1c4a 100644 --- a/api/app/core/memory/agent/langgraph_graph/write_graph.py +++ b/api/app/core/memory/agent/langgraph_graph/write_graph.py @@ -38,14 +38,53 @@ async def make_write_graph(user_id, tools, apply_id, group_id, config_id=None): messages = state["messages"] last_message = messages[-1] - result = await data_type_tool.ainvoke({ - "context": last_message[1] if isinstance(last_message, tuple) else last_message.content - }) - result=json.loads( result) + # 调用 Data_type_differentiation 工具 + try: + raw_result = await data_type_tool.ainvoke({ + "context": last_message[1] if isinstance(last_message, tuple) else last_message.content + }) + + # MCP工具返回的是列表格式,需要提取内容 + logger.debug(f"Data_type_differentiation raw result type: {type(raw_result)}, value: {raw_result}") + + # 处理不同的返回格式 + if isinstance(raw_result, list) and len(raw_result) > 0: + # MCP工具返回格式: [{"type": "text", "text": "..."}] + result_text = raw_result[0].get("text", "{}") if isinstance(raw_result[0], dict) else str(raw_result[0]) + elif isinstance(raw_result, str): + result_text = raw_result + else: + result_text = str(raw_result) + + # 解析JSON字符串 + try: + result = json.loads(result_text) + except json.JSONDecodeError as je: + logger.error(f"Failed to parse result as JSON: {result_text}, error: {je}") + return {"messages": [AIMessage(content=json.dumps({ + "status": "error", + "message": f"Invalid JSON response from Data_type_differentiation: {str(je)}" + }))]} + + # 检查是否有错误 + if isinstance(result, dict) and result.get("type") == "error": + error_msg = result.get("message", "Unknown error in Data_type_differentiation") + logger.error(f"Data_type_differentiation 返回错误: {error_msg}") + return {"messages": [AIMessage(content=json.dumps({ + "status": "error", + "message": error_msg + }))]} + + except Exception as e: + logger.error(f"调用 Data_type_differentiation 失败: {e}", exc_info=True) + return {"messages": [AIMessage(content=json.dumps({ + "status": "error", + "message": f"Data type differentiation failed: {str(e)}" + }))]} # 调用 Data_write,传递 config_id write_params = { - "content": result["context"], + "content": result.get("context", last_message.content if hasattr(last_message, 'content') else str(last_message)), "apply_id": apply_id, "group_id": group_id, "user_id": user_id @@ -56,14 +95,22 @@ async def make_write_graph(user_id, tools, apply_id, group_id, config_id=None): write_params["config_id"] = config_id logger.debug(f"传递 config_id 到 Data_write: {config_id}") - write_result = await data_write_tool.ainvoke(write_params) + try: + write_result = await data_write_tool.ainvoke(write_params) - if isinstance(write_result, dict): - content = write_result.get("data", str(write_result)) - else: - content = str(write_result) - logger.info("写入内容: %s", content) - return {"messages": [AIMessage(content=content)]} + if isinstance(write_result, dict): + content = write_result.get("data", str(write_result)) + else: + content = str(write_result) + logger.info("写入内容: %s", content) + return {"messages": [AIMessage(content=content)]} + + except Exception as e: + logger.error(f"调用 Data_write 失败: {e}", exc_info=True) + return {"messages": [AIMessage(content=json.dumps({ + "status": "error", + "message": f"Data write failed: {str(e)}" + }))]} workflow = StateGraph(WriteState) workflow.add_node("content_input", call_model) diff --git a/api/app/core/memory/agent/utils/write_tools.py b/api/app/core/memory/agent/utils/write_tools.py index ebfbcc6c..f792ea9d 100644 --- a/api/app/core/memory/agent/utils/write_tools.py +++ b/api/app/core/memory/agent/utils/write_tools.py @@ -39,6 +39,17 @@ async def write(content: str, user_id: str, apply_id: str, group_id: str, ref_id ref_id: 参考ID,默认为 "wyl20251027" config_id: 配置ID,用于标记数据处理配置 """ + # 如果提供了config_id,重新加载配置 + if config_id: + from app.core.memory.utils.config.definitions import reload_configuration_from_database + logger.info(f"Reloading configuration for config_id: {config_id}") + config_loaded = reload_configuration_from_database(config_id) + if not config_loaded: + error_msg = f"Failed to load configuration for config_id: {config_id}" + logger.error(error_msg) + raise ValueError(error_msg) + logger.info(f"Configuration reloaded successfully for config_id: {config_id}") + logger.info("=== MemSci Knowledge Extraction Pipeline ===") logger.info(f"Using model: {config_defs.SELECTED_LLM_NAME}") logger.info(f"Using LLM ID: {config_defs.SELECTED_LLM_ID}") diff --git a/api/app/core/memory/models/emotion_models.py b/api/app/core/memory/models/emotion_models.py new file mode 100644 index 00000000..f84165a7 --- /dev/null +++ b/api/app/core/memory/models/emotion_models.py @@ -0,0 +1,85 @@ +"""Emotion extraction models for LLM structured output. + +This module contains Pydantic models for emotion extraction from statements, +designed to be used with LLM structured output capabilities. + +Classes: + EmotionExtraction: Model for emotion extraction results from statements +""" + +from pydantic import BaseModel, Field, field_validator +from typing import List, Optional + + +class EmotionExtraction(BaseModel): + """Emotion extraction result model for LLM structured output. + + This model represents the structured emotion information extracted from + a statement using LLM. It includes emotion type, intensity, keywords, + subject classification, and optional target. + + Attributes: + emotion_type: Type of emotion (joy/sadness/anger/fear/surprise/neutral) + emotion_intensity: Intensity of emotion (0.0-1.0) + emotion_keywords: List of emotion keywords from the statement (max 3) + emotion_subject: Subject of emotion (self/other/object) + emotion_target: Optional target of emotion (person or object name) + """ + + emotion_type: str = Field( + ..., + description="Emotion type: joy/sadness/anger/fear/surprise/neutral" + ) + emotion_intensity: float = Field( + ..., + ge=0.0, + le=1.0, + description="Emotion intensity from 0.0 to 1.0" + ) + emotion_keywords: List[str] = Field( + default_factory=list, + description="Emotion keywords extracted from the statement (max 3)" + ) + emotion_subject: str = Field( + ..., + description="Emotion subject: self/other/object" + ) + emotion_target: Optional[str] = Field( + None, + description="Emotion target: person or object name" + ) + + @field_validator('emotion_type') + @classmethod + def validate_emotion_type(cls, v): + """Validate emotion type is one of the valid values.""" + valid_types = ['joy', 'sadness', 'anger', 'fear', 'surprise', 'neutral'] + if v not in valid_types: + raise ValueError(f"emotion_type must be one of {valid_types}, got {v}") + return v + + @field_validator('emotion_subject') + @classmethod + def validate_emotion_subject(cls, v): + """Validate emotion subject is one of the valid values.""" + valid_subjects = ['self', 'other', 'object'] + if v not in valid_subjects: + raise ValueError(f"emotion_subject must be one of {valid_subjects}, got {v}") + return v + + @field_validator('emotion_keywords') + @classmethod + def validate_emotion_keywords(cls, v): + """Validate and limit emotion keywords to max 3 items.""" + if not isinstance(v, list): + return [] + # Limit to max 3 keywords + return v[:3] + + @field_validator('emotion_intensity') + @classmethod + def validate_emotion_intensity(cls, v): + """Validate emotion intensity is within valid range.""" + if not (0.0 <= v <= 1.0): + raise ValueError(f"emotion_intensity must be between 0.0 and 1.0, got {v}") + return v diff --git a/api/app/core/memory/models/graph_models.py b/api/app/core/memory/models/graph_models.py index 58b8271c..a8c3f7b0 100644 --- a/api/app/core/memory/models/graph_models.py +++ b/api/app/core/memory/models/graph_models.py @@ -215,24 +215,58 @@ class StatementNode(Node): Attributes: chunk_id: ID of the parent chunk this statement belongs to stmt_type: Type of the statement (from ontology) - temporal_info: Temporal information extracted from the statement statement: The actual statement text content - connect_strength: Classification of connection strength ('Strong' or 'Weak') + emotion_intensity: Optional emotion intensity (0.0-1.0) - displayed on node + emotion_target: Optional emotion target (person or object name) + emotion_subject: Optional emotion subject (self/other/object) + emotion_type: Optional emotion type (joy/sadness/anger/fear/surprise/neutral) + emotion_keywords: Optional list of emotion keywords (max 3) + temporal_info: Temporal information extracted from the statement valid_at: Optional start date of temporal validity invalid_at: Optional end date of temporal validity statement_embedding: Optional embedding vector for the statement chunk_embedding: Optional embedding vector for the parent chunk + connect_strength: Classification of connection strength ('Strong' or 'Weak') config_id: Configuration ID used to process this statement """ + # Core fields (ordered as requested) chunk_id: str = Field(..., description="ID of the parent chunk") stmt_type: str = Field(..., description="Type of the statement") - temporal_info: TemporalInfo = Field(..., description="Temporal information") statement: str = Field(..., description="The statement text content") - connect_strength: str = Field(..., description="Strong VS Weak classification of this statement") + + # Emotion fields (ordered as requested, emotion_intensity first for display) + emotion_intensity: Optional[float] = Field( + None, + ge=0.0, + le=1.0, + description="Emotion intensity: 0.0-1.0 (displayed on node)" + ) + emotion_target: Optional[str] = Field( + None, + description="Emotion target: person or object name" + ) + emotion_subject: Optional[str] = Field( + None, + description="Emotion subject: self/other/object" + ) + emotion_type: Optional[str] = Field( + None, + description="Emotion type: joy/sadness/anger/fear/surprise/neutral" + ) + emotion_keywords: Optional[List[str]] = Field( + default_factory=list, + description="Emotion keywords list, max 3 items" + ) + + # Temporal fields + temporal_info: TemporalInfo = Field(..., description="Temporal information") valid_at: Optional[datetime] = Field(None, description="Temporal validity start") invalid_at: Optional[datetime] = Field(None, description="Temporal validity end") + + # Embedding and other fields statement_embedding: Optional[List[float]] = Field(None, description="Statement embedding vector") chunk_embedding: Optional[List[float]] = Field(None, description="Chunk embedding vector") + connect_strength: str = Field(..., description="Strong VS Weak classification of this statement") config_id: Optional[int | str] = Field(None, description="Configuration ID used to process this statement (integer or string)") @field_validator('valid_at', 'invalid_at', mode='before') @@ -240,6 +274,39 @@ class StatementNode(Node): def validate_datetime(cls, v): """使用通用的历史日期解析函数""" return parse_historical_datetime(v) + + @field_validator('emotion_type', mode='before') + @classmethod + def validate_emotion_type(cls, v): + """Validate emotion type is one of the valid values""" + if v is None: + return v + valid_types = ['joy', 'sadness', 'anger', 'fear', 'surprise', 'neutral'] + if v not in valid_types: + raise ValueError(f"emotion_type must be one of {valid_types}, got {v}") + return v + + @field_validator('emotion_subject', mode='before') + @classmethod + def validate_emotion_subject(cls, v): + """Validate emotion subject is one of the valid values""" + if v is None: + return v + valid_subjects = ['self', 'other', 'object'] + if v not in valid_subjects: + raise ValueError(f"emotion_subject must be one of {valid_subjects}, got {v}") + return v + + @field_validator('emotion_keywords', mode='before') + @classmethod + def validate_emotion_keywords(cls, v): + """Validate emotion keywords list has max 3 items""" + if v is None: + return [] + if not isinstance(v, list): + return [] + # Limit to max 3 keywords + return v[:3] class ChunkNode(Node): diff --git a/api/app/core/memory/models/message_models.py b/api/app/core/memory/models/message_models.py index 192816fd..199bdd75 100644 --- a/api/app/core/memory/models/message_models.py +++ b/api/app/core/memory/models/message_models.py @@ -64,6 +64,11 @@ class Statement(BaseModel): connect_strength: Optional connection strength ('Strong' or 'Weak') temporal_validity: Optional temporal validity range triplet_extraction_info: Optional triplet extraction results + emotion_type: Optional emotion type (joy/sadness/anger/fear/surprise/neutral) + emotion_intensity: Optional emotion intensity (0.0-1.0) + emotion_keywords: Optional list of emotion keywords + emotion_subject: Optional emotion subject (self/other/object) + emotion_target: Optional emotion target (person or object name) """ id: str = Field(default_factory=lambda: uuid4().hex, description="A unique identifier for the statement.") chunk_id: str = Field(..., description="ID of the parent chunk this statement belongs to.") @@ -80,6 +85,12 @@ class Statement(BaseModel): triplet_extraction_info: Optional[TripletExtractionResponse] = Field( None, description="The triplet extraction information of the statement." ) + # Emotion fields + emotion_type: Optional[str] = Field(None, description="Emotion type: joy/sadness/anger/fear/surprise/neutral") + emotion_intensity: Optional[float] = Field(None, ge=0.0, le=1.0, description="Emotion intensity: 0.0-1.0") + emotion_keywords: Optional[List[str]] = Field(default_factory=list, description="Emotion keywords, max 3") + emotion_subject: Optional[str] = Field(None, description="Emotion subject: self/other/object") + emotion_target: Optional[str] = Field(None, description="Emotion target: person or object name") class ConversationContext(BaseModel): diff --git a/api/app/core/memory/storage_services/extraction_engine/deduplication/entity_dedup_llm.py b/api/app/core/memory/storage_services/extraction_engine/deduplication/entity_dedup_llm.py index 2c784d42..734f7b69 100644 --- a/api/app/core/memory/storage_services/extraction_engine/deduplication/entity_dedup_llm.py +++ b/api/app/core/memory/storage_services/extraction_engine/deduplication/entity_dedup_llm.py @@ -480,7 +480,6 @@ async def llm_dedup_entities_iterative_blocks( # 迭代分块并发 LLM 去重 - global_redirect: dict losing_id -> canonical_id accumulated across rounds - records: textual logs including per-round/per-block summaries and per-pair decisions """ - import asyncio import random # 初始化全局日志和全局ID映射(存储所有轮次的结果) records: List[str] = [] diff --git a/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py b/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py index e00bcf0a..91529aa9 100644 --- a/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py +++ b/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py @@ -35,7 +35,6 @@ from app.core.memory.models.graph_models import ( from app.core.memory.utils.data.ontology import TemporalInfo from app.core.memory.models.variate_config import ( ExtractionPipelineConfig, - StatementExtractionConfig, ) from app.core.memory.llm_tools.openai_client import LLMClient from app.core.memory.llm_tools.openai_embedder import OpenAIEmbedderClient @@ -53,7 +52,6 @@ from app.core.memory.storage_services.extraction_engine.knowledge_extraction.tem ) from app.core.memory.storage_services.extraction_engine.knowledge_extraction.embedding_generation import ( embedding_generation, - embedding_generation_all, generate_entity_embeddings_from_triplets, ) from app.core.memory.storage_services.extraction_engine.deduplication.two_stage_dedup import ( @@ -179,24 +177,12 @@ class ExtractionOrchestrator: all_statements_list.extend(chunk.statements) total_statements = len(all_statements_list) - # 🔥 陈述句提取完成后,立即发送知识抽取完成消息 - if self.progress_callback: - extraction_stats = { - "statements_count": total_statements, - "entities_count": 0, # 暂时为0,后续会更新 - "triplets_count": 0, # 暂时为0,后续会更新 - "temporal_ranges_count": 0, # 暂时为0,后续会更新 - } - await self.progress_callback("knowledge_extraction_complete", "知识抽取完成", extraction_stats) - - # 🔥 立即发送下一阶段的开始消息,让前端知道进入了创建节点和边阶段 - await self.progress_callback("creating_nodes_edges", "正在创建节点和边...") - - # 步骤 2: 并行执行三元组提取、时间信息提取和基础嵌入生成(后台静默执行) - logger.info("步骤 2/6: 并行执行三元组提取、时间信息提取和嵌入生成(后台静默执行)") + # 步骤 2: 并行执行三元组提取、时间信息提取、情绪提取和基础嵌入生成 + logger.info("步骤 2/6: 并行执行三元组提取、时间信息提取、情绪提取和嵌入生成") ( triplet_maps, temporal_maps, + emotion_maps, statement_embedding_maps, chunk_embedding_maps, dialog_embeddings, @@ -225,6 +211,7 @@ class ExtractionOrchestrator: dialog_data_list, temporal_maps, triplet_maps, + emotion_maps, statement_embedding_maps, chunk_embedding_maps, dialog_embeddings, @@ -552,9 +539,108 @@ class ExtractionOrchestrator: return temporal_maps + async def _extract_emotions( + self, dialog_data_list: List[DialogData] + ) -> List[Dict[str, Any]]: + """ + 从对话中提取情绪信息(优化版:全局陈述句级并行) + + Args: + dialog_data_list: 对话数据列表 + + Returns: + 情绪信息映射列表,每个对话对应一个字典 + """ + logger.info("开始情绪信息提取(全局陈述句级并行)") + + # 收集所有陈述句及其配置 + all_statements = [] + statement_metadata = [] # (dialog_idx, statement_id) + + # 获取第一个对话的config_id来加载配置 + config_id = None + if dialog_data_list and hasattr(dialog_data_list[0], 'config_id'): + config_id = dialog_data_list[0].config_id + + # 加载DataConfig + data_config = None + if config_id: + try: + from app.db import SessionLocal + from app.repositories.data_config_repository import DataConfigRepository + + db = SessionLocal() + try: + data_config = DataConfigRepository.get_by_id(db, config_id) + finally: + db.close() + + if data_config and not data_config.emotion_enabled: + logger.info("情绪提取已在配置中禁用,跳过情绪提取") + return [{} for _ in dialog_data_list] + + except Exception as e: + logger.warning(f"加载DataConfig失败: {e},将跳过情绪提取") + return [{} for _ in dialog_data_list] + else: + logger.info("未找到config_id,跳过情绪提取") + return [{} for _ in dialog_data_list] + + # 如果配置未启用情绪提取,直接返回空映射 + if not data_config or not data_config.emotion_enabled: + logger.info("情绪提取未启用,跳过") + return [{} for _ in dialog_data_list] + + # 收集所有陈述句 + for d_idx, dialog in enumerate(dialog_data_list): + for chunk in dialog.chunks: + for statement in chunk.statements: + all_statements.append((statement, data_config)) + statement_metadata.append((d_idx, statement.id)) + + logger.info(f"收集到 {len(all_statements)} 个陈述句,开始全局并行提取情绪") + + # 初始化情绪提取服务 + from app.services.emotion_extraction_service import EmotionExtractionService + emotion_service = EmotionExtractionService( + llm_id=data_config.emotion_model_id if data_config.emotion_model_id else None + ) + + # 全局并行处理所有陈述句 + async def extract_for_statement(stmt_data): + statement, config = stmt_data + try: + return await emotion_service.extract_emotion(statement.statement, config) + except Exception as e: + logger.error(f"陈述句 {statement.id} 情绪提取失败: {e}") + return None + + tasks = [extract_for_statement(stmt_data) for stmt_data in all_statements] + results = await asyncio.gather(*tasks, return_exceptions=True) + + # 将结果组织成对话级别的映射 + emotion_maps = [{} for _ in dialog_data_list] + successful_extractions = 0 + + for i, result in enumerate(results): + d_idx, stmt_id = statement_metadata[i] + if isinstance(result, Exception): + logger.error(f"陈述句处理异常: {result}") + emotion_maps[d_idx][stmt_id] = None + else: + emotion_maps[d_idx][stmt_id] = result + if result is not None: + successful_extractions += 1 + + # 统计提取结果 + logger.info(f"情绪信息提取完成,共成功提取 {successful_extractions}/{len(all_statements)} 个情绪") + + return emotion_maps + async def _parallel_extract_and_embed( self, dialog_data_list: List[DialogData] ) -> Tuple[ + List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, List[float]]], @@ -562,35 +648,39 @@ class ExtractionOrchestrator: List[List[float]], ]: """ - 并行执行三元组提取、时间信息提取和基础嵌入生成 + 并行执行三元组提取、时间信息提取、情绪提取和基础嵌入生成 - 这三个任务都依赖陈述句提取的结果,但彼此独立,可以并行执行: + 这四个任务都依赖陈述句提取的结果,但彼此独立,可以并行执行: - 三元组提取:从陈述句中提取实体和关系 - 时间信息提取:从陈述句中提取时间范围 + - 情绪提取:从陈述句中提取情绪信息 - 嵌入生成:为陈述句、分块和对话生成向量(不依赖三元组) Args: dialog_data_list: 对话数据列表 Returns: - 五个列表的元组: + 六个列表的元组: - 三元组映射列表 - 时间信息映射列表 + - 情绪映射列表 - 陈述句嵌入映射列表 - 分块嵌入映射列表 - 对话嵌入列表 """ - logger.info("并行执行:三元组提取 + 时间信息提取 + 基础嵌入生成") + logger.info("并行执行:三元组提取 + 时间信息提取 + 情绪提取 + 基础嵌入生成") - # 创建三个并行任务 + # 创建四个并行任务 triplet_task = self._extract_triplets(dialog_data_list) temporal_task = self._extract_temporal(dialog_data_list) + emotion_task = self._extract_emotions(dialog_data_list) embedding_task = self._generate_basic_embeddings(dialog_data_list) # 并行执行 results = await asyncio.gather( triplet_task, temporal_task, + emotion_task, embedding_task, return_exceptions=True ) @@ -598,19 +688,21 @@ class ExtractionOrchestrator: # 解包结果 triplet_maps = results[0] if not isinstance(results[0], Exception) else [{} for _ in dialog_data_list] temporal_maps = results[1] if not isinstance(results[1], Exception) else [{} for _ in dialog_data_list] + emotion_maps = results[2] if not isinstance(results[2], Exception) else [{} for _ in dialog_data_list] - if isinstance(results[2], Exception): - logger.error(f"基础嵌入生成失败: {results[2]}") + if isinstance(results[3], Exception): + logger.error(f"基础嵌入生成失败: {results[3]}") statement_embedding_maps = [{} for _ in dialog_data_list] chunk_embedding_maps = [{} for _ in dialog_data_list] dialog_embeddings = [[] for _ in dialog_data_list] else: - statement_embedding_maps, chunk_embedding_maps, dialog_embeddings = results[2] + statement_embedding_maps, chunk_embedding_maps, dialog_embeddings = results[3] logger.info("并行任务执行完成") return ( triplet_maps, temporal_maps, + emotion_maps, statement_embedding_maps, chunk_embedding_maps, dialog_embeddings, @@ -727,6 +819,7 @@ class ExtractionOrchestrator: dialog_data_list: List[DialogData], temporal_maps: List[Dict[str, Any]], triplet_maps: List[Dict[str, Any]], + emotion_maps: List[Dict[str, Any]], statement_embedding_maps: List[Dict[str, List[float]]], chunk_embedding_maps: List[Dict[str, List[float]]], dialog_embeddings: List[List[float]], @@ -738,6 +831,7 @@ class ExtractionOrchestrator: dialog_data_list: 对话数据列表 temporal_maps: 时间信息映射列表 triplet_maps: 三元组映射列表 + emotion_maps: 情绪信息映射列表 statement_embedding_maps: 陈述句嵌入映射列表 chunk_embedding_maps: 分块嵌入映射列表 dialog_embeddings: 对话嵌入列表 @@ -752,6 +846,7 @@ class ExtractionOrchestrator: if ( len(temporal_maps) != expected_length or len(triplet_maps) != expected_length + or len(emotion_maps) != expected_length or len(statement_embedding_maps) != expected_length or len(chunk_embedding_maps) != expected_length or len(dialog_embeddings) != expected_length @@ -759,6 +854,7 @@ class ExtractionOrchestrator: logger.warning( f"数据大小不匹配 - 对话: {len(dialog_data_list)}, " f"时间映射: {len(temporal_maps)}, 三元组映射: {len(triplet_maps)}, " + f"情绪映射: {len(emotion_maps)}, " f"陈述句嵌入: {len(statement_embedding_maps)}, " f"分块嵌入: {len(chunk_embedding_maps)}, " f"对话嵌入: {len(dialog_embeddings)}" @@ -767,6 +863,7 @@ class ExtractionOrchestrator: total_statements = 0 assigned_temporal = 0 assigned_triplets = 0 + assigned_emotions = 0 assigned_statement_embeddings = 0 assigned_chunk_embeddings = 0 assigned_dialog_embeddings = 0 @@ -774,12 +871,13 @@ class ExtractionOrchestrator: # 处理每个对话 for i, dialog_data in enumerate(dialog_data_list): # 检查是否有缺失的数据 - if i >= len(temporal_maps) or i >= len(triplet_maps): + if i >= len(temporal_maps) or i >= len(triplet_maps) or i >= len(emotion_maps): logger.warning(f"对话 {dialog_data.id} 缺少提取数据,跳过赋值") continue temporal_map = temporal_maps[i] triplet_map = triplet_maps[i] + emotion_map = emotion_maps[i] statement_embedding_map = statement_embedding_maps[i] if i < len(statement_embedding_maps) else {} chunk_embedding_map = chunk_embedding_maps[i] if i < len(chunk_embedding_maps) else {} dialog_embedding = dialog_embeddings[i] if i < len(dialog_embeddings) else [] @@ -810,6 +908,18 @@ class ExtractionOrchestrator: statement.triplet_extraction_info = triplet_map[statement.id] assigned_triplets += 1 + # 赋值情绪信息 + if statement.id in emotion_map: + emotion_data = emotion_map[statement.id] + if emotion_data is not None: + # 将EmotionExtraction对象的字段赋值到Statement + statement.emotion_type = emotion_data.emotion_type + statement.emotion_intensity = emotion_data.emotion_intensity + statement.emotion_keywords = emotion_data.emotion_keywords + statement.emotion_subject = emotion_data.emotion_subject + statement.emotion_target = emotion_data.emotion_target + assigned_emotions += 1 + # 赋值陈述句嵌入 if statement.id in statement_embedding_map: statement.statement_embedding = statement_embedding_map[statement.id] @@ -818,6 +928,7 @@ class ExtractionOrchestrator: logger.info( f"数据赋值完成 - 总陈述句: {total_statements}, " f"时间信息: {assigned_temporal}, 三元组: {assigned_triplets}, " + f"情绪信息: {assigned_emotions}, " f"陈述句嵌入: {assigned_statement_embeddings}, " f"分块嵌入: {assigned_chunk_embeddings}, " f"对话嵌入: {assigned_dialog_embeddings}" @@ -927,6 +1038,12 @@ class ExtractionOrchestrator: created_at=dialog_data.created_at, expired_at=dialog_data.expired_at, config_id=dialog_data.config_id if hasattr(dialog_data, 'config_id') else None, + # Emotion fields + emotion_type=getattr(statement, 'emotion_type', None), + emotion_intensity=getattr(statement, 'emotion_intensity', None), + emotion_keywords=getattr(statement, 'emotion_keywords', None), + emotion_subject=getattr(statement, 'emotion_subject', None), + emotion_target=getattr(statement, 'emotion_target', None), ) statement_nodes.append(statement_node) @@ -1333,7 +1450,7 @@ class ExtractionOrchestrator: if match: entity1_name = match.group(1).strip() entity1_type = match.group(2) - entity2_name = match.group(3).strip() + match.group(3).strip() entity2_type = match.group(4) # 提取置信度和原因 @@ -1646,7 +1763,6 @@ async def get_chunked_dialogs( """ import json import re - import os # 加载测试数据 testdata_path = os.path.join(os.path.dirname(__file__), "../../data", "testdata.json") @@ -1822,7 +1938,6 @@ async def get_chunked_dialogs_with_preprocessing( Returns: 带 chunks 的 DialogData 列表 """ - import os print("\n=== 完整数据处理流程(包含预处理)===") if input_data_path is None: diff --git a/api/app/core/memory/utils/config/overrides.py b/api/app/core/memory/utils/config/overrides.py index e333bb29..0dd7b2d1 100644 --- a/api/app/core/memory/utils/config/overrides.py +++ b/api/app/core/memory/utils/config/overrides.py @@ -28,7 +28,6 @@ """ import os import json -import socket from typing import Optional, Dict, Any, Literal NetworkMode = Literal['internal', 'external'] @@ -105,7 +104,6 @@ def _make_pgsql_conn() -> Optional[object]: try: import psycopg2 # type: ignore - from psycopg2.extras import RealDictCursor # type: ignore port = int(port_str) if port_str else 5432 conn = psycopg2.connect( @@ -193,7 +191,7 @@ def _fetch_db_config_by_config_id(config_id: int | str) -> Optional[Dict[str, An # config_id 在数据库中是 Integer 类型,需要转换 try: config_id_int = int(config_id) - except (ValueError, TypeError) as e: + except (ValueError, TypeError): try: pass except Exception: @@ -207,7 +205,7 @@ def _fetch_db_config_by_config_id(config_id: int | str) -> Optional[Dict[str, An " statement_granularity, include_dialogue_context, max_context, " " \"offset\" AS offset, lambda_time, lambda_mem, " " pruning_enabled, pruning_scene, pruning_threshold, " - " llm_id, embedding_id " + " llm_id, embedding_id, rerank_id " "FROM data_config WHERE config_id = %s LIMIT 1" ) cur.execute(sql, (config_id_int,)) @@ -222,7 +220,7 @@ def _fetch_db_config_by_config_id(config_id: int | str) -> Optional[Dict[str, An pass return row if row else None - except Exception as e: + except Exception: pass return None finally: @@ -325,7 +323,7 @@ def _apply_overrides_from_db_row( _set_if_present(selections, tk, db_row, tk, str) # 特殊处理 UUID 字段,确保转换为字符串格式 - for uuid_field in ("llm_id", "embedding_id"): + for uuid_field in ("llm_id", "embedding_id", "rerank_id"): if uuid_field in db_row and db_row.get(uuid_field) is not None: try: value = db_row.get(uuid_field) @@ -370,7 +368,7 @@ def _apply_overrides_from_db_row( pass return runtime_cfg - except Exception as e: + except Exception: pass return runtime_cfg @@ -460,7 +458,7 @@ def apply_runtime_overrides_with_config_id( updated_cfg = _apply_overrides_from_db_row(runtime_cfg, db_row, selected_cid, "config_id") return updated_cfg, True - except Exception as e: + except Exception: pass return runtime_cfg, False @@ -570,7 +568,7 @@ def load_unified_config( try: with open(runtime_config_path, "r", encoding="utf-8") as f: runtime_cfg = json.load(f) - except (FileNotFoundError, json.JSONDecodeError) as e: + except (FileNotFoundError, json.JSONDecodeError): runtime_cfg = {"selections": {}} # 步骤 2: 尝试从 dbrun.json 读取 config_id 并应用数据库配置(最高优先级) @@ -603,7 +601,7 @@ def load_unified_config( pass return runtime_cfg - except Exception as e: + except Exception: return {"selections": {}} diff --git a/api/app/core/memory/utils/prompt/prompt_utils.py b/api/app/core/memory/utils/prompt/prompt_utils.py index 77a23e0f..c39a3f89 100644 --- a/api/app/core/memory/utils/prompt/prompt_utils.py +++ b/api/app/core/memory/utils/prompt/prompt_utils.py @@ -238,3 +238,81 @@ async def render_memory_summary_prompt( 'json_schema': 'MemorySummaryResponse.schema' }) return rendered_prompt + +async def render_emotion_extraction_prompt( + statement: str, + extract_keywords: bool, + enable_subject: bool +) -> str: + """ + Renders the emotion extraction prompt using the extract_emotion.jinja2 template. + + Args: + statement: The statement to analyze + extract_keywords: Whether to extract emotion keywords + enable_subject: Whether to enable subject classification + + Returns: + Rendered prompt content as string + """ + template = prompt_env.get_template("extract_emotion.jinja2") + rendered_prompt = template.render( + statement=statement, + extract_keywords=extract_keywords, + enable_subject=enable_subject + ) + + # 记录渲染结果到提示日志 + log_prompt_rendering('emotion extraction', rendered_prompt) + # 可选:记录模板渲染信息 + log_template_rendering('extract_emotion.jinja2', { + 'statement': 'str', + 'extract_keywords': extract_keywords, + 'enable_subject': enable_subject + }) + + return rendered_prompt + +async def render_emotion_suggestions_prompt( + health_data: dict, + patterns: dict, + user_profile: dict +) -> str: + """ + Renders the emotion suggestions generation prompt using the generate_emotion_suggestions.jinja2 template. + + Args: + health_data: 情绪健康数据 + patterns: 情绪模式分析结果 + user_profile: 用户画像数据 + + Returns: + Rendered prompt content as string + """ + import json + + # 预处理 emotion_distribution 为 JSON 字符串 + emotion_distribution_json = json.dumps( + health_data.get('emotion_distribution', {}), + ensure_ascii=False, + indent=2 + ) + + template = prompt_env.get_template("generate_emotion_suggestions.jinja2") + rendered_prompt = template.render( + health_data=health_data, + patterns=patterns, + user_profile=user_profile, + emotion_distribution_json=emotion_distribution_json + ) + + # 记录渲染结果到提示日志 + log_prompt_rendering('emotion suggestions', rendered_prompt) + # 可选:记录模板渲染信息 + log_template_rendering('generate_emotion_suggestions.jinja2', { + 'health_score': health_data.get('health_score'), + 'health_level': health_data.get('level'), + 'user_interests': user_profile.get('interests', []) + }) + + return rendered_prompt diff --git a/api/app/core/memory/utils/prompt/prompts/extract_emotion.jinja2 b/api/app/core/memory/utils/prompt/prompts/extract_emotion.jinja2 new file mode 100644 index 00000000..5e1e425f --- /dev/null +++ b/api/app/core/memory/utils/prompt/prompts/extract_emotion.jinja2 @@ -0,0 +1,57 @@ +你是一个专业的情绪分析专家。请分析以下陈述句的情绪信息。 + +陈述句:{{ statement }} + +请提取以下信息: + +1. emotion_type(情绪类型): + - joy: 喜悦、开心、高兴、满意、愉快 + - sadness: 悲伤、难过、失落、沮丧、遗憾 + - anger: 愤怒、生气、不满、恼火、烦躁 + - fear: 恐惧、害怕、担心、焦虑、紧张 + - surprise: 惊讶、意外、震惊、吃惊 + - neutral: 中性、客观陈述、无明显情绪 + +2. emotion_intensity(情绪强度): + - 0.0-0.3: 弱情绪 + - 0.3-0.7: 中等情绪 + - 0.7-1.0: 强情绪 + +{% if extract_keywords %} +3. emotion_keywords(情绪关键词): + - 原句中直接表达情绪的词语 + - 最多提取3个关键词 + - 如果没有明显的情绪词,返回空列表 +{% else %} +3. emotion_keywords(情绪关键词): + - 返回空列表 +{% endif %} + +{% if enable_subject %} +4. emotion_subject(情绪主体): + - self: 用户本人的情绪(包含"我"、"我们"、"咱们"等第一人称) + - other: 他人的情绪(包含人名、"他/她"等第三人称) + - object: 对事物的评价(针对产品、地点、事件等) + + 注意: + - 如果同时包含多个主体,优先识别用户本人(self) + - 如果无法明确判断主体,默认为 self + +5. emotion_target(情绪对象): + - 如果有明确的情绪对象,提取其名称 + - 如果没有明确对象,返回 null +{% else %} +4. emotion_subject(情绪主体): + - 默认为 self + +5. emotion_target(情绪对象): + - 返回 null +{% endif %} + +注意事项: +- 如果陈述句是客观事实陈述,无明显情绪,标记为 neutral +- 情绪强度要符合语境,不要过度解读 +- 情绪关键词要准确,不要添加原句中没有的词 +- 主体分类要准确,优先识别用户本人(self) + +请以 JSON 格式返回结果。 diff --git a/api/app/core/memory/utils/prompt/prompts/generate_emotion_suggestions.jinja2 b/api/app/core/memory/utils/prompt/prompts/generate_emotion_suggestions.jinja2 new file mode 100644 index 00000000..6a29edd9 --- /dev/null +++ b/api/app/core/memory/utils/prompt/prompts/generate_emotion_suggestions.jinja2 @@ -0,0 +1,63 @@ +你是一位专业的心理健康顾问。请根据以下用户的情绪健康数据和个人信息,生成3-5条个性化的情绪改善建议。 + +## 用户情绪健康数据 + +健康分数:{{ health_data.health_score }}/100 +健康等级:{{ health_data.level }} + +维度分析: +- 积极率:{{ health_data.dimensions.positivity_rate.score }}/100 + - 正面情绪:{{ health_data.dimensions.positivity_rate.positive_count }}次 + - 负面情绪:{{ health_data.dimensions.positivity_rate.negative_count }}次 + - 中性情绪:{{ health_data.dimensions.positivity_rate.neutral_count }}次 + +- 稳定性:{{ health_data.dimensions.stability.score }}/100 + - 标准差:{{ health_data.dimensions.stability.std_deviation }} + +- 恢复力:{{ health_data.dimensions.resilience.score }}/100 + - 恢复率:{{ health_data.dimensions.resilience.recovery_rate }} + +情绪分布: +{{ emotion_distribution_json }} + +## 情绪模式分析 + +主要负面情绪:{{ patterns.dominant_negative_emotion|default('无') }} +情绪波动性:{{ patterns.emotion_volatility|default('未知') }} +高强度情绪次数:{{ patterns.high_intensity_emotions|default([])|length }} + +## 用户兴趣 + +{{ user_profile.interests|default(['未知'])|join(', ') }} + +## 任务要求 + +请生成3-5条个性化建议,每条建议包含: +1. type: 建议类型(emotion_balance/activity_recommendation/social_connection/stress_management) +2. title: 建议标题(简短有力) +3. content: 建议内容(详细说明,50-100字) +4. priority: 优先级(high/medium/low) +5. actionable_steps: 3个可执行的具体步骤 + +同时提供一个health_summary(不超过50字),概括用户的整体情绪状态。 + +请以JSON格式返回,格式如下: +{ + "health_summary": "您的情绪健康状况...", + "suggestions": [ + { + "type": "emotion_balance", + "title": "建议标题", + "content": "建议内容...", + "priority": "high", + "actionable_steps": ["步骤1", "步骤2", "步骤3"] + } + ] +} + +注意事项: +- 建议要具体、可执行,避免空泛 +- 结合用户的兴趣爱好提供个性化建议 +- 针对主要问题(如主要负面情绪)提供针对性建议 +- 优先级要合理分配(至少1个high,1-2个medium,其余low) +- 每个建议的3个步骤要循序渐进、易于实施 diff --git a/api/app/models/data_config_model.py b/api/app/models/data_config_model.py index be43bd8d..870d46b2 100644 --- a/api/app/models/data_config_model.py +++ b/api/app/models/data_config_model.py @@ -64,7 +64,14 @@ class DataConfig(Base): lambda_time = Column("lambda_time", Float, default=0.5, comment="最低保持度,0-1 小数") lambda_mem = Column("lambda_mem", Float, default=0.5, comment="遗忘率,0-1 小数") offset = Column("offset", Float, default=0.0, comment="偏移度,0-1 小数") - + + # 情绪引擎配置 + emotion_enabled = Column(Boolean, default=True, comment="是否启用情绪提取") + emotion_model_id = Column(String, nullable=True, comment="情绪分析专用模型ID") + emotion_extract_keywords = Column(Boolean, default=True, comment="是否提取情绪关键词") + emotion_min_intensity = Column(Float, default=0.1, comment="最小情绪强度阈值") + emotion_enable_subject = Column(Boolean, default=True, comment="是否启用主体分类") + # 时间戳 created_at = Column(DateTime, default=datetime.datetime.now, comment="创建时间") updated_at = Column(DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now, comment="更新时间") diff --git a/api/app/repositories/neo4j/add_nodes.py b/api/app/repositories/neo4j/add_nodes.py index d339879f..ce4a6876 100644 --- a/api/app/repositories/neo4j/add_nodes.py +++ b/api/app/repositories/neo4j/add_nodes.py @@ -100,7 +100,13 @@ async def add_statement_nodes(statements: List[StatementNode], connector: Neo4jC # "triplets": [triplet.model_dump() for triplet in statement.triplet_extraction_info.triplets] if statement.triplet_extraction_info else [], # "entities": [entity.model_dump() for entity in statement.triplet_extraction_info.entities] if statement.triplet_extraction_info else [] # }) if statement.triplet_extraction_info else json.dumps({"triplets": [], "entities": []}), - "statement_embedding": statement.statement_embedding if statement.statement_embedding else None + "statement_embedding": statement.statement_embedding if statement.statement_embedding else None, + # 添加情绪字段处理 + "emotion_type": statement.emotion_type, + "emotion_intensity": statement.emotion_intensity, + "emotion_keywords": statement.emotion_keywords if statement.emotion_keywords else [], + "emotion_subject": statement.emotion_subject, + "emotion_target": statement.emotion_target } flattened_statements.append(flattened_statement) diff --git a/api/app/repositories/neo4j/cypher_queries.py b/api/app/repositories/neo4j/cypher_queries.py index 95e2ee03..0f6e32aa 100644 --- a/api/app/repositories/neo4j/cypher_queries.py +++ b/api/app/repositories/neo4j/cypher_queries.py @@ -20,20 +20,25 @@ UNWIND $statements AS statement MERGE (s:Statement {id: statement.id}) SET s += { id: statement.id, + run_id: statement.run_id, + chunk_id: statement.chunk_id, group_id: statement.group_id, user_id: statement.user_id, apply_id: statement.apply_id, - chunk_id: statement.chunk_id, - run_id: statement.run_id, + stmt_type: statement.stmt_type, + statement: statement.statement, + emotion_intensity: statement.emotion_intensity, + emotion_target: statement.emotion_target, + emotion_subject: statement.emotion_subject, + emotion_type: statement.emotion_type, + emotion_keywords: statement.emotion_keywords, + temporal_info: statement.temporal_info, created_at: statement.created_at, expired_at: statement.expired_at, - stmt_type: statement.stmt_type, - temporal_info: statement.temporal_info, - relevence_info: statement.relevence_info, - statement: statement.statement, valid_at: statement.valid_at, invalid_at: statement.invalid_at, - statement_embedding: statement.statement_embedding + statement_embedding: statement.statement_embedding, + relevence_info: statement.relevence_info } RETURN s.id AS uuid """ diff --git a/api/app/repositories/neo4j/emotion_repository.py b/api/app/repositories/neo4j/emotion_repository.py new file mode 100644 index 00000000..d445c8d4 --- /dev/null +++ b/api/app/repositories/neo4j/emotion_repository.py @@ -0,0 +1,246 @@ +# -*- coding: utf-8 -*- +"""情绪数据仓储模块 + +本模块提供情绪数据的查询功能,用于情绪分析和统计。 + +Classes: + EmotionRepository: 情绪数据仓储,提供情绪标签、词云、健康指数等查询方法 +""" + +from typing import List, Dict, Optional, Any +from datetime import datetime, timedelta +import json + +from app.repositories.neo4j.neo4j_connector import Neo4jConnector +from app.core.logging_config import get_business_logger + +logger = get_business_logger() + + +class EmotionRepository: + """情绪数据仓储 + + 提供情绪数据的查询和统计功能,包括: + - 情绪标签统计 + - 情绪词云数据 + - 时间范围内的情绪数据查询 + + Attributes: + connector: Neo4j连接器实例 + """ + + def __init__(self, connector: Neo4jConnector): + """初始化情绪数据仓储 + + Args: + connector: Neo4j连接器实例 + """ + self.connector = connector + logger.info("情绪数据仓储初始化完成") + + async def get_emotion_tags( + self, + group_id: str, + emotion_type: Optional[str] = None, + start_date: Optional[str] = None, + end_date: Optional[str] = None, + limit: int = 10 + ) -> List[Dict[str, Any]]: + """获取情绪标签统计 + + 查询指定用户的情绪类型分布,包括计数、百分比和平均强度。 + + Args: + group_id: 用户组ID(宿主ID) + emotion_type: 可选的情绪类型过滤(joy/sadness/anger/fear/surprise/neutral) + start_date: 可选的开始日期(ISO格式字符串) + end_date: 可选的结束日期(ISO格式字符串) + limit: 返回结果的最大数量 + + Returns: + List[Dict]: 情绪标签列表,每个包含: + - emotion_type: 情绪类型 + - count: 该类型的数量 + - percentage: 占比百分比 + - avg_intensity: 平均强度 + """ + # 构建查询条件 + where_clauses = ["s.group_id = $group_id", "s.emotion_type IS NOT NULL"] + params = {"group_id": group_id, "limit": limit} + + if emotion_type: + where_clauses.append("s.emotion_type = $emotion_type") + params["emotion_type"] = emotion_type + + if start_date: + where_clauses.append("s.created_at >= $start_date") + params["start_date"] = start_date + + if end_date: + where_clauses.append("s.created_at <= $end_date") + params["end_date"] = end_date + + where_str = " AND ".join(where_clauses) + + # 优化的 Cypher 查询:使用索引,减少中间结果 + query = f""" + MATCH (s:Statement) + WHERE {where_str} + WITH s.emotion_type as emotion_type, + count(*) as count, + avg(s.emotion_intensity) as avg_intensity + WITH collect({{emotion_type: emotion_type, count: count, avg_intensity: avg_intensity}}) as results, + sum(count) as total_count + UNWIND results as result + RETURN result.emotion_type as emotion_type, + result.count as count, + toFloat(result.count) / total_count * 100 as percentage, + result.avg_intensity as avg_intensity + ORDER BY count DESC + LIMIT $limit + """ + + try: + results = await self.connector.execute_query(query, **params) + formatted_results = [ + { + "emotion_type": record["emotion_type"], + "count": record["count"], + "percentage": round(record["percentage"], 2), + "avg_intensity": round(record["avg_intensity"], 3) if record["avg_intensity"] else 0.0 + } + for record in results + ] + + return formatted_results + except Exception as e: + logger.error(f"查询情绪标签失败: {str(e)}", exc_info=True) + return [] + + async def get_emotion_wordcloud( + self, + group_id: str, + emotion_type: Optional[str] = None, + limit: int = 50 + ) -> List[Dict[str, Any]]: + """获取情绪词云数据 + + 查询情绪关键词及其频率,用于生成词云可视化。 + + Args: + group_id: 用户组ID(宿主ID) + emotion_type: 可选的情绪类型过滤 + limit: 返回关键词的最大数量 + + Returns: + List[Dict]: 关键词列表,每个包含: + - keyword: 关键词 + - frequency: 出现频率 + - emotion_type: 关联的情绪类型 + - avg_intensity: 平均强度 + """ + # 构建查询条件 + where_clauses = ["s.group_id = $group_id", "s.emotion_keywords IS NOT NULL"] + params = {"group_id": group_id, "limit": limit} + + if emotion_type: + where_clauses.append("s.emotion_type = $emotion_type") + params["emotion_type"] = emotion_type + + where_str = " AND ".join(where_clauses) + + # 优化的 Cypher 查询:使用索引,减少不必要的计算 + query = f""" + MATCH (s:Statement) + WHERE {where_str} + UNWIND s.emotion_keywords as keyword + WITH keyword, + s.emotion_type as emotion_type, + count(*) as frequency, + avg(s.emotion_intensity) as avg_intensity + WHERE keyword IS NOT NULL AND keyword <> '' + RETURN keyword, + frequency, + emotion_type, + avg_intensity + ORDER BY frequency DESC + LIMIT $limit + """ + + try: + results = await self.connector.execute_query(query, **params) + formatted_results = [ + { + "keyword": record["keyword"], + "frequency": record["frequency"], + "emotion_type": record["emotion_type"], + "avg_intensity": round(record["avg_intensity"], 3) if record["avg_intensity"] else 0.0 + } + for record in results + ] + + return formatted_results + except Exception as e: + logger.error(f"查询情绪词云失败: {str(e)}", exc_info=True) + return [] + + async def get_emotions_in_range( + self, + group_id: str, + time_range: str = "30d" + ) -> List[Dict[str, Any]]: + """获取时间范围内的情绪数据 + + 查询指定时间范围内的所有情绪数据,用于健康指数计算。 + + Args: + group_id: 用户组ID(宿主ID) + time_range: 时间范围(7d/30d/90d) + + Returns: + List[Dict]: 情绪数据列表,每个包含: + - emotion_type: 情绪类型 + - emotion_intensity: 情绪强度 + - created_at: 创建时间 + - statement_id: 陈述句ID + """ + # 解析时间范围 + days_map = {"7d": 7, "30d": 30, "90d": 90} + days = days_map.get(time_range, 30) + + # 计算起始日期(使用字符串比较,避免时区问题) + start_date = (datetime.now() - timedelta(days=days)).isoformat() + + # 优化的 Cypher 查询:使用字符串比较避免时区问题 + query = """ + MATCH (s:Statement) + WHERE s.group_id = $group_id + AND s.emotion_type IS NOT NULL + AND s.created_at >= $start_date + RETURN s.id as statement_id, + s.emotion_type as emotion_type, + s.emotion_intensity as emotion_intensity, + s.created_at as created_at + ORDER BY s.created_at ASC + """ + + try: + results = await self.connector.execute_query( + query, + group_id=group_id, + start_date=start_date + ) + formatted_results = [ + { + "statement_id": record["statement_id"], + "emotion_type": record["emotion_type"], + "emotion_intensity": record["emotion_intensity"], + "created_at": record["created_at"].isoformat() if hasattr(record["created_at"], "isoformat") else str(record["created_at"]) + } + for record in results + ] + + return formatted_results + except Exception as e: + logger.error(f"查询时间范围情绪数据失败: {str(e)}", exc_info=True) + return [] diff --git a/api/app/repositories/neo4j/statement_repository.py b/api/app/repositories/neo4j/statement_repository.py index ec2d6660..34858444 100644 --- a/api/app/repositories/neo4j/statement_repository.py +++ b/api/app/repositories/neo4j/statement_repository.py @@ -58,11 +58,22 @@ class StatementRepository(BaseNeo4jRepository[StatementNode]): n['invalid_at'] = datetime.fromisoformat(n['invalid_at']) # 处理temporal_info字段 - if isinstance(n.get('temporal_info'), dict): + if isinstance(n.get('temporal_info'), str): + # 从字符串转换为枚举值 + n['temporal_info'] = TemporalInfo(n['temporal_info']) + elif isinstance(n.get('temporal_info'), dict): n['temporal_info'] = TemporalInfo(**n['temporal_info']) elif not n.get('temporal_info'): # 如果没有temporal_info,创建一个默认的 - n['temporal_info'] = TemporalInfo() + n['temporal_info'] = TemporalInfo.STATIC + + # 处理情绪字段 - 映射 Neo4j 节点属性到 StatementNode 模型 + # 处理空值情况,确保字段存在 + n['emotion_type'] = n.get('emotion_type') + n['emotion_intensity'] = n.get('emotion_intensity') + n['emotion_keywords'] = n.get('emotion_keywords', []) + n['emotion_subject'] = n.get('emotion_subject') + n['emotion_target'] = n.get('emotion_target') return StatementNode(**n) diff --git a/api/app/schemas/emotion_schema.py b/api/app/schemas/emotion_schema.py new file mode 100644 index 00000000..9f14884d --- /dev/null +++ b/api/app/schemas/emotion_schema.py @@ -0,0 +1,32 @@ +"""情绪分析相关的请求和响应模型""" + +from typing import Optional +from pydantic import BaseModel, Field + + +class EmotionTagsRequest(BaseModel): + """获取情绪标签统计请求""" + group_id: str = Field(..., description="组ID") + emotion_type: Optional[str] = Field(None, description="情绪类型过滤(joy/sadness/anger/fear/surprise/neutral)") + start_date: Optional[str] = Field(None, description="开始日期(ISO格式,如:2024-01-01)") + end_date: Optional[str] = Field(None, description="结束日期(ISO格式,如:2024-12-31)") + limit: int = Field(10, ge=1, le=100, description="返回数量限制") + + +class EmotionWordcloudRequest(BaseModel): + """获取情绪词云数据请求""" + group_id: str = Field(..., description="组ID") + emotion_type: Optional[str] = Field(None, description="情绪类型过滤(joy/sadness/anger/fear/surprise/neutral)") + limit: int = Field(50, ge=1, le=200, description="返回词语数量") + + +class EmotionHealthRequest(BaseModel): + """获取情绪健康指数请求""" + group_id: str = Field(..., description="组ID") + time_range: str = Field("30d", description="时间范围(7d/30d/90d)") + + +class EmotionSuggestionsRequest(BaseModel): + """获取个性化情绪建议请求""" + group_id: str = Field(..., description="组ID") + config_id: Optional[int] = Field(None, description="配置ID(用于指定LLM模型)") diff --git a/api/app/services/emotion_analytics_service.py b/api/app/services/emotion_analytics_service.py new file mode 100644 index 00000000..6952256e --- /dev/null +++ b/api/app/services/emotion_analytics_service.py @@ -0,0 +1,670 @@ +# -*- coding: utf-8 -*- +"""情绪分析服务模块 + +本模块提供情绪数据的分析和统计功能,包括情绪标签、词云、健康指数计算等。 + +Classes: + EmotionAnalyticsService: 情绪分析服务,提供各种情绪分析功能 +""" + +from typing import Dict, Any, Optional, List +import statistics +import json +from pydantic import BaseModel, Field + +from app.repositories.neo4j.emotion_repository import EmotionRepository +from app.repositories.neo4j.neo4j_connector import Neo4jConnector +from app.core.logging_config import get_business_logger + +logger = get_business_logger() + + +class EmotionSuggestion(BaseModel): + """情绪建议模型""" + type: str = Field(..., description="建议类型:emotion_balance/activity_recommendation/social_connection/stress_management") + title: str = Field(..., description="建议标题") + content: str = Field(..., description="建议内容") + priority: str = Field(..., description="优先级:high/medium/low") + actionable_steps: List[str] = Field(..., description="可执行步骤列表(3个)") + + +class EmotionSuggestionsResponse(BaseModel): + """情绪建议响应模型""" + health_summary: str = Field(..., description="健康状态摘要(不超过50字)") + suggestions: List[EmotionSuggestion] = Field(..., description="建议列表(3-5条)") + + +class EmotionAnalyticsService: + """情绪分析服务 + + 提供情绪数据的分析和统计功能,包括: + - 情绪标签统计 + - 情绪词云数据 + - 情绪健康指数计算 + - 个性化情绪建议生成 + + Attributes: + emotion_repo: 情绪数据仓储实例 + """ + + def __init__(self): + """初始化情绪分析服务""" + connector = Neo4jConnector() + self.emotion_repo = EmotionRepository(connector) + logger.info("情绪分析服务初始化完成") + + async def get_emotion_tags( + self, + end_user_id: str, + emotion_type: Optional[str] = None, + start_date: Optional[str] = None, + end_date: Optional[str] = None, + limit: int = 10 + ) -> Dict[str, Any]: + """获取情绪标签统计 + + 查询指定用户的情绪类型分布,包括计数、百分比和平均强度。 + + Args: + end_user_id: 宿主ID(用户组ID) + emotion_type: 可选的情绪类型过滤 + start_date: 可选的开始日期(ISO格式) + end_date: 可选的结束日期(ISO格式) + limit: 返回结果的最大数量 + + Returns: + Dict: 包含情绪标签统计的响应数据: + - tags: 情绪标签列表 + - total_count: 总情绪数量 + - time_range: 时间范围信息 + """ + try: + logger.info(f"获取情绪标签统计: user={end_user_id}, type={emotion_type}, " + f"start={start_date}, end={end_date}, limit={limit}") + + # 调用仓储层查询 + tags = await self.emotion_repo.get_emotion_tags( + group_id=end_user_id, + emotion_type=emotion_type, + start_date=start_date, + end_date=end_date, + limit=limit + ) + + # 计算总数 + total_count = sum(tag["count"] for tag in tags) + + # 构建时间范围信息 + time_range = {} + if start_date: + time_range["start_date"] = start_date + if end_date: + time_range["end_date"] = end_date + + # 格式化响应 + response = { + "tags": tags, + "total_count": total_count, + "time_range": time_range if time_range else None + } + + logger.info(f"情绪标签统计完成: total_count={total_count}, tags_count={len(tags)}") + return response + + except Exception as e: + logger.error(f"获取情绪标签统计失败: {str(e)}", exc_info=True) + raise + + async def get_emotion_wordcloud( + self, + end_user_id: str, + emotion_type: Optional[str] = None, + limit: int = 50 + ) -> Dict[str, Any]: + """获取情绪词云数据 + + 查询情绪关键词及其频率,用于生成词云可视化。 + + Args: + end_user_id: 宿主ID(用户组ID) + emotion_type: 可选的情绪类型过滤 + limit: 返回关键词的最大数量 + + Returns: + Dict: 包含情绪词云数据的响应: + - keywords: 关键词列表 + - total_keywords: 总关键词数量 + """ + try: + logger.info(f"获取情绪词云数据: user={end_user_id}, type={emotion_type}, limit={limit}") + + # 调用仓储层查询 + keywords = await self.emotion_repo.get_emotion_wordcloud( + group_id=end_user_id, + emotion_type=emotion_type, + limit=limit + ) + + # 计算总关键词数量 + total_keywords = len(keywords) + + # 格式化响应 + response = { + "keywords": keywords, + "total_keywords": total_keywords + } + + logger.info(f"情绪词云数据获取完成: total_keywords={total_keywords}") + return response + + except Exception as e: + logger.error(f"获取情绪词云数据失败: {str(e)}", exc_info=True) + raise + + def _calculate_positivity_rate(self, emotions: List[Dict[str, Any]]) -> Dict[str, Any]: + """计算积极率 + + 根据情绪类型分类正面、负面和中性情绪,计算积极率。 + 公式:(正面数 / (正面数 + 负面数)) * 100 + + Args: + emotions: 情绪数据列表,每个包含 emotion_type 字段 + + Returns: + Dict: 包含积极率计算结果: + - score: 积极率分数(0-100) + - positive_count: 正面情绪数量 + - negative_count: 负面情绪数量 + - neutral_count: 中性情绪数量 + """ + # 定义情绪分类 + positive_emotions = {'joy', 'surprise'} + negative_emotions = {'sadness', 'anger', 'fear'} + + # 统计各类情绪数量 + positive_count = sum(1 for e in emotions if e.get('emotion_type') in positive_emotions) + negative_count = sum(1 for e in emotions if e.get('emotion_type') in negative_emotions) + neutral_count = sum(1 for e in emotions if e.get('emotion_type') == 'neutral') + + # 计算积极率 + total_non_neutral = positive_count + negative_count + if total_non_neutral > 0: + score = (positive_count / total_non_neutral) * 100 + else: + score = 50.0 # 如果没有非中性情绪,默认为50 + + logger.debug(f"积极率计算: positive={positive_count}, negative={negative_count}, " + f"neutral={neutral_count}, score={score:.2f}") + + return { + "score": round(score, 2), + "positive_count": positive_count, + "negative_count": negative_count, + "neutral_count": neutral_count + } + + def _calculate_stability(self, emotions: List[Dict[str, Any]]) -> Dict[str, Any]: + """计算稳定性 + + 基于情绪强度的标准差计算情绪稳定性。 + 公式:(1 - min(std_deviation, 1.0)) * 100 + + Args: + emotions: 情绪数据列表,每个包含 emotion_intensity 字段 + + Returns: + Dict: 包含稳定性计算结果: + - score: 稳定性分数(0-100) + - std_deviation: 标准差 + """ + # 提取所有情绪强度 + intensities = [e.get('emotion_intensity', 0.0) for e in emotions if e.get('emotion_intensity') is not None] + + # 计算标准差 + if len(intensities) >= 2: + std_deviation = statistics.stdev(intensities) + elif len(intensities) == 1: + std_deviation = 0.0 # 只有一个数据点,标准差为0 + else: + std_deviation = 0.0 # 没有数据,标准差为0 + + # 计算稳定性分数 + # 标准差越小,稳定性越高 + score = (1 - min(std_deviation, 1.0)) * 100 + + logger.debug(f"稳定性计算: intensities_count={len(intensities)}, " + f"std_deviation={std_deviation:.3f}, score={score:.2f}") + + return { + "score": round(score, 2), + "std_deviation": round(std_deviation, 3) + } + + def _calculate_resilience(self, emotions: List[Dict[str, Any]]) -> Dict[str, Any]: + """计算恢复力 + + 分析情绪转换模式,统计从负面情绪恢复到正面情绪的能力。 + 公式:(负面到正面转换次数 / 总负面情绪数) * 100 + + Args: + emotions: 情绪数据列表,每个包含 emotion_type 和 created_at 字段 + 应该按时间顺序排列 + + Returns: + Dict: 包含恢复力计算结果: + - score: 恢复力分数(0-100) + - recovery_rate: 恢复率(转换次数/负面情绪数) + """ + # 定义情绪分类 + positive_emotions = {'joy', 'surprise'} + negative_emotions = {'sadness', 'anger', 'fear'} + + # 统计负面到正面的转换次数 + recovery_count = 0 + negative_count = 0 + + for i in range(len(emotions)): + current_emotion = emotions[i].get('emotion_type') + + # 统计负面情绪总数 + if current_emotion in negative_emotions: + negative_count += 1 + + # 检查下一个情绪是否为正面 + if i + 1 < len(emotions): + next_emotion = emotions[i + 1].get('emotion_type') + if next_emotion in positive_emotions: + recovery_count += 1 + + # 计算恢复力分数 + if negative_count > 0: + recovery_rate = recovery_count / negative_count + score = recovery_rate * 100 + else: + # 如果没有负面情绪,恢复力设为100(最佳状态) + recovery_rate = 1.0 + score = 100.0 + + logger.debug(f"恢复力计算: negative_count={negative_count}, " + f"recovery_count={recovery_count}, score={score:.2f}") + + return { + "score": round(score, 2), + "recovery_rate": round(recovery_rate, 3) + } + + async def calculate_emotion_health_index( + self, + end_user_id: str, + time_range: str = "30d" + ) -> Dict[str, Any]: + """计算情绪健康指数 + + 综合积极率、稳定性和恢复力计算情绪健康指数。 + + Args: + end_user_id: 宿主ID(用户组ID) + time_range: 时间范围(7d/30d/90d) + + Returns: + Dict: 包含情绪健康指数的完整响应: + - health_score: 综合健康分数(0-100) + - level: 健康等级(优秀/良好/一般/较差) + - dimensions: 各维度详细数据 + - positivity_rate: 积极率 + - stability: 稳定性 + - resilience: 恢复力 + - emotion_distribution: 情绪分布统计 + - time_range: 时间范围 + """ + try: + logger.info(f"计算情绪健康指数: user={end_user_id}, time_range={time_range}") + + # 获取时间范围内的情绪数据 + emotions = await self.emotion_repo.get_emotions_in_range( + group_id=end_user_id, + time_range=time_range + ) + + # 如果没有数据,返回默认值 + if not emotions: + logger.warning(f"用户 {end_user_id} 在时间范围 {time_range} 内没有情绪数据") + return { + "health_score": 0.0, + "level": "无数据", + "dimensions": { + "positivity_rate": {"score": 0.0, "positive_count": 0, "negative_count": 0, "neutral_count": 0}, + "stability": {"score": 0.0, "std_deviation": 0.0}, + "resilience": {"score": 0.0, "recovery_rate": 0.0} + }, + "emotion_distribution": {}, + "time_range": time_range + } + + # 计算各维度指标 + positivity_rate = self._calculate_positivity_rate(emotions) + stability = self._calculate_stability(emotions) + resilience = self._calculate_resilience(emotions) + + # 计算综合健康分数 + # 公式:positivity_rate * 0.4 + stability * 0.3 + resilience * 0.3 + health_score = ( + positivity_rate["score"] * 0.4 + + stability["score"] * 0.3 + + resilience["score"] * 0.3 + ) + + # 确定健康等级 + if health_score >= 80: + level = "优秀" + elif health_score >= 60: + level = "良好" + elif health_score >= 40: + level = "一般" + else: + level = "较差" + + # 统计情绪分布 + emotion_distribution = {} + for emotion_type in ['joy', 'sadness', 'anger', 'fear', 'surprise', 'neutral']: + count = sum(1 for e in emotions if e.get('emotion_type') == emotion_type) + emotion_distribution[emotion_type] = count + + # 格式化响应 + response = { + "health_score": round(health_score, 2), + "level": level, + "dimensions": { + "positivity_rate": positivity_rate, + "stability": stability, + "resilience": resilience + }, + "emotion_distribution": emotion_distribution, + "time_range": time_range + } + + logger.info(f"情绪健康指数计算完成: score={health_score:.2f}, level={level}") + return response + + except Exception as e: + logger.error(f"计算情绪健康指数失败: {str(e)}", exc_info=True) + raise + + def _analyze_emotion_patterns(self, emotions: List[Dict[str, Any]]) -> Dict[str, Any]: + """分析情绪模式 + + 识别主要负面情绪、情绪触发因素和波动时段。 + + Args: + emotions: 情绪数据列表,每个包含 emotion_type、emotion_intensity、created_at 字段 + + Returns: + Dict: 包含情绪模式分析结果: + - dominant_negative_emotion: 主要负面情绪类型 + - high_intensity_emotions: 高强度情绪列表 + - emotion_volatility: 情绪波动性(高/中/低) + """ + negative_emotions = {'sadness', 'anger', 'fear'} + + # 统计负面情绪分布 + negative_emotion_counts = {} + for emotion in emotions: + emotion_type = emotion.get('emotion_type') + if emotion_type in negative_emotions: + negative_emotion_counts[emotion_type] = negative_emotion_counts.get(emotion_type, 0) + 1 + + # 识别主要负面情绪 + dominant_negative_emotion = None + if negative_emotion_counts: + dominant_negative_emotion = max(negative_emotion_counts, key=negative_emotion_counts.get) + + # 识别高强度情绪(强度 >= 0.7) + high_intensity_emotions = [ + { + "type": e.get('emotion_type'), + "intensity": e.get('emotion_intensity'), + "created_at": e.get('created_at') + } + for e in emotions + if e.get('emotion_intensity', 0) >= 0.7 + ] + + # 评估情绪波动性 + intensities = [e.get('emotion_intensity', 0.0) for e in emotions if e.get('emotion_intensity') is not None] + if len(intensities) >= 2: + std_dev = statistics.stdev(intensities) + if std_dev > 0.3: + volatility = "高" + elif std_dev > 0.15: + volatility = "中" + else: + volatility = "低" + else: + volatility = "未知" + + logger.debug(f"情绪模式分析: dominant_negative={dominant_negative_emotion}, " + f"high_intensity_count={len(high_intensity_emotions)}, volatility={volatility}") + + return { + "dominant_negative_emotion": dominant_negative_emotion, + "high_intensity_emotions": high_intensity_emotions[:5], # 最多返回5个 + "emotion_volatility": volatility + } + + async def generate_emotion_suggestions( + self, + end_user_id: str, + config_id: Optional[int] = None + ) -> Dict[str, Any]: + """生成个性化情绪建议 + + 基于情绪健康数据和用户画像生成个性化建议。 + + Args: + end_user_id: 宿主ID(用户组ID) + config_id: 配置ID(可选,用于从数据库加载LLM配置) + + Returns: + Dict: 包含个性化建议的响应: + - health_summary: 健康状态摘要 + - suggestions: 建议列表(3-5条) + """ + try: + logger.info(f"生成个性化情绪建议: user={end_user_id}, config_id={config_id}") + + # 1. 如果提供了 config_id,从数据库加载配置 + if config_id is not None: + from app.core.memory.utils.config.definitions import reload_configuration_from_database + config_loaded = reload_configuration_from_database(config_id) + if not config_loaded: + logger.warning(f"无法加载配置 config_id={config_id},将使用默认配置") + + # 2. 获取情绪健康数据 + health_data = await self.calculate_emotion_health_index(end_user_id, time_range="30d") + + # 3. 获取情绪数据用于模式分析 + emotions = await self.emotion_repo.get_emotions_in_range( + group_id=end_user_id, + time_range="30d" + ) + + # 4. 分析情绪模式 + patterns = self._analyze_emotion_patterns(emotions) + + # 5. 获取用户画像数据(简化版,直接从Neo4j获取) + user_profile = await self._get_simple_user_profile(end_user_id) + + # 6. 构建LLM prompt + prompt = await self._build_suggestion_prompt(health_data, patterns, user_profile) + + # 7. 调用LLM生成建议(使用配置中的LLM) + from app.core.memory.utils.llm.llm_utils import get_llm_client + llm_client = get_llm_client() + + # 将 prompt 转换为 messages 格式 + messages = [ + {"role": "user", "content": prompt} + ] + + response = await llm_client.chat(messages=messages) + response_text = response.content.strip() + + # 8. 解析LLM响应 + try: + response_data = json.loads(response_text) + suggestions_response = EmotionSuggestionsResponse(**response_data) + except (json.JSONDecodeError, Exception) as e: + logger.error(f"解析LLM响应失败: {str(e)}, response={response_text}") + # 返回默认建议 + suggestions_response = self._get_default_suggestions(health_data) + + # 8. 验证建议数量(3-5条) + if len(suggestions_response.suggestions) < 3: + logger.warning(f"建议数量不足: {len(suggestions_response.suggestions)}") + suggestions_response = self._get_default_suggestions(health_data) + elif len(suggestions_response.suggestions) > 5: + logger.warning(f"建议数量过多: {len(suggestions_response.suggestions)}") + suggestions_response.suggestions = suggestions_response.suggestions[:5] + + # 9. 格式化响应 + response = { + "health_summary": suggestions_response.health_summary, + "suggestions": [ + { + "type": s.type, + "title": s.title, + "content": s.content, + "priority": s.priority, + "actionable_steps": s.actionable_steps + } + for s in suggestions_response.suggestions + ] + } + + logger.info(f"个性化建议生成完成: suggestions_count={len(response['suggestions'])}") + return response + + except Exception as e: + logger.error(f"生成个性化建议失败: {str(e)}", exc_info=True) + raise + + async def _get_simple_user_profile(self, end_user_id: str) -> Dict[str, Any]: + """获取简化的用户画像数据 + + Args: + end_user_id: 用户ID + + Returns: + Dict: 用户画像数据 + """ + try: + connector = Neo4jConnector() + + # 查询用户的实体和标签 + query = """ + MATCH (e:Entity) + WHERE e.group_id = $group_id + RETURN e.name as name, e.type as type + ORDER BY e.created_at DESC + LIMIT 20 + """ + + entities = await connector.execute_query(query, group_id=end_user_id) + + # 提取兴趣标签 + interests = [e["name"] for e in entities if e.get("type") in ["INTEREST", "HOBBY"]][:5] + # 后期会引入用户的习惯。。 + return { + "interests": interests if interests else ["未知"] + } + + except Exception as e: + logger.error(f"获取用户画像失败: {str(e)}") + return {"interests": ["未知"]} + + async def _build_suggestion_prompt( + self, + health_data: Dict[str, Any], + patterns: Dict[str, Any], + user_profile: Dict[str, Any] + ) -> str: + """构建情绪建议生成的prompt + + Args: + health_data: 情绪健康数据 + patterns: 情绪模式分析结果 + user_profile: 用户画像数据 + + Returns: + str: LLM prompt + """ + from app.core.memory.utils.prompt.prompt_utils import render_emotion_suggestions_prompt + + prompt = await render_emotion_suggestions_prompt( + health_data=health_data, + patterns=patterns, + user_profile=user_profile + ) + + return prompt + + def _get_default_suggestions(self, health_data: Dict[str, Any]) -> EmotionSuggestionsResponse: + """获取默认建议(当LLM调用失败时使用) + + Args: + health_data: 情绪健康数据 + + Returns: + EmotionSuggestionsResponse: 默认建议 + """ + health_score = health_data.get('health_score', 0) + + if health_score >= 80: + summary = "您的情绪健康状况优秀,请继续保持积极的生活态度。" + elif health_score >= 60: + summary = "您的情绪健康状况良好,可以通过一些调整进一步提升。" + elif health_score >= 40: + summary = "您的情绪健康需要关注,建议采取一些改善措施。" + else: + summary = "您的情绪健康需要重点关注,建议寻求专业帮助。" + + suggestions = [ + EmotionSuggestion( + type="emotion_balance", + title="保持情绪平衡", + content="通过正念冥想和深呼吸练习,帮助您更好地管理情绪波动,提升情绪稳定性。", + priority="high", + actionable_steps=[ + "每天早晨进行5-10分钟的正念冥想", + "感到情绪波动时,进行3次深呼吸", + "记录每天的情绪变化,识别触发因素" + ] + ), + EmotionSuggestion( + type="activity_recommendation", + title="增加户外活动", + content="适度的户外运动可以有效改善情绪,增强身心健康。建议每周进行3-4次户外活动。", + priority="medium", + actionable_steps=[ + "每周安排2-3次30分钟的散步", + "周末尝试户外运动如骑行或爬山", + "在户外活动时关注周围环境,放松心情" + ] + ), + EmotionSuggestion( + type="social_connection", + title="加强社交联系", + content="与朋友和家人保持良好的社交联系,可以提供情感支持,改善情绪健康。", + priority="medium", + actionable_steps=[ + "每周至少与一位朋友或家人深入交流", + "参加感兴趣的社交活动或兴趣小组", + "主动分享自己的感受和想法" + ] + ) + ] + + return EmotionSuggestionsResponse( + health_summary=summary, + suggestions=suggestions + ) diff --git a/api/app/services/emotion_config_service.py b/api/app/services/emotion_config_service.py new file mode 100644 index 00000000..37171640 --- /dev/null +++ b/api/app/services/emotion_config_service.py @@ -0,0 +1,212 @@ +# -*- coding: utf-8 -*- +"""情绪配置服务模块 + +本模块提供情绪引擎配置的管理功能,包括获取和更新配置。 + +Classes: + EmotionConfigService: 情绪配置服务,提供配置管理功能 +""" + +from typing import Dict, Any +from sqlalchemy.orm import Session + +from app.models.data_config_model import DataConfig +from app.core.logging_config import get_business_logger + +logger = get_business_logger() + + +class EmotionConfigService: + """情绪配置服务 + + 提供情绪引擎配置的管理功能,包括: + - 获取情绪配置 + - 更新情绪配置 + - 验证配置参数 + + Attributes: + db: 数据库会话 + """ + + def __init__(self, db: Session): + """初始化情绪配置服务 + + Args: + db: 数据库会话 + """ + self.db = db + logger.info("情绪配置服务初始化完成") + + def get_emotion_config(self, config_id: int) -> Dict[str, Any]: + """获取情绪引擎配置 + + 查询指定配置ID的情绪相关配置字段。 + + Args: + config_id: 配置ID + + Returns: + Dict: 包含情绪配置的响应数据: + - config_id: 配置ID + - emotion_enabled: 是否启用情绪提取 + - emotion_model_id: 情绪分析专用模型ID + - emotion_extract_keywords: 是否提取情绪关键词 + - emotion_min_intensity: 最小情绪强度阈值 + - emotion_enable_subject: 是否启用主体分类 + + Raises: + ValueError: 当配置不存在时 + """ + try: + logger.info(f"获取情绪配置: config_id={config_id}") + + # 查询配置 + config = self.db.query(DataConfig).filter( + DataConfig.config_id == config_id + ).first() + + if not config: + logger.error(f"配置不存在: config_id={config_id}") + raise ValueError(f"配置不存在: config_id={config_id}") + + # 提取情绪相关字段 + emotion_config = { + "config_id": config.config_id, + "emotion_enabled": config.emotion_enabled, + "emotion_model_id": config.emotion_model_id, + "emotion_extract_keywords": config.emotion_extract_keywords, + "emotion_min_intensity": config.emotion_min_intensity, + "emotion_enable_subject": config.emotion_enable_subject + } + + logger.info(f"情绪配置获取成功: config_id={config_id}") + return emotion_config + + except ValueError: + raise + except Exception as e: + logger.error(f"获取情绪配置失败: {str(e)}", exc_info=True) + raise + + def validate_emotion_config(self, config_data: Dict[str, Any]) -> bool: + """验证情绪配置参数 + + 验证配置参数的有效性,包括: + - emotion_min_intensity 在 [0.0, 1.0] 范围内 + - 布尔字段类型正确 + - emotion_model_id 格式有效(如果提供) + + Args: + config_data: 配置数据字典 + + Returns: + bool: 验证是否通过 + + Raises: + ValueError: 当配置参数无效时 + """ + try: + logger.debug(f"验证情绪配置参数: {config_data}") + + # 验证 emotion_min_intensity 范围 + if "emotion_min_intensity" in config_data: + min_intensity = config_data["emotion_min_intensity"] + if not isinstance(min_intensity, (int, float)): + raise ValueError("emotion_min_intensity 必须是数字类型") + if not (0.0 <= min_intensity <= 1.0): + raise ValueError("emotion_min_intensity 必须在 0.0 到 1.0 之间") + + # 验证布尔字段 + bool_fields = ["emotion_enabled", "emotion_extract_keywords", "emotion_enable_subject"] + for field in bool_fields: + if field in config_data: + value = config_data[field] + if not isinstance(value, bool): + raise ValueError(f"{field} 必须是布尔类型") + + # 验证 emotion_model_id(如果提供) + if "emotion_model_id" in config_data: + model_id = config_data["emotion_model_id"] + if model_id is not None and not isinstance(model_id, str): + raise ValueError("emotion_model_id 必须是字符串类型或 null") + if model_id is not None and len(model_id.strip()) == 0: + raise ValueError("emotion_model_id 不能为空字符串") + + logger.debug("情绪配置参数验证通过") + return True + + except ValueError as e: + logger.warning(f"配置参数验证失败: {str(e)}") + raise + except Exception as e: + logger.error(f"验证配置参数时发生错误: {str(e)}", exc_info=True) + raise ValueError(f"验证配置参数失败: {str(e)}") + + def update_emotion_config( + self, + config_id: int, + config_data: Dict[str, Any] + ) -> Dict[str, Any]: + """更新情绪引擎配置 + + 更新指定配置ID的情绪相关配置字段。 + + Args: + config_id: 配置ID + config_data: 要更新的配置数据,可包含以下字段: + - emotion_enabled: 是否启用情绪提取 + - emotion_model_id: 情绪分析专用模型ID + - emotion_extract_keywords: 是否提取情绪关键词 + - emotion_min_intensity: 最小情绪强度阈值 + - emotion_enable_subject: 是否启用主体分类 + + Returns: + Dict: 更新后的完整情绪配置 + + Raises: + ValueError: 当配置不存在或参数无效时 + """ + try: + logger.info(f"更新情绪配置: config_id={config_id}, data={config_data}") + + # 验证配置参数 + self.validate_emotion_config(config_data) + + # 查询配置 + config = self.db.query(DataConfig).filter( + DataConfig.config_id == config_id + ).first() + + if not config: + logger.error(f"配置不存在: config_id={config_id}") + raise ValueError(f"配置不存在: config_id={config_id}") + + # 更新字段 + if "emotion_enabled" in config_data: + config.emotion_enabled = config_data["emotion_enabled"] + if "emotion_model_id" in config_data: + config.emotion_model_id = config_data["emotion_model_id"] + if "emotion_extract_keywords" in config_data: + config.emotion_extract_keywords = config_data["emotion_extract_keywords"] + if "emotion_min_intensity" in config_data: + config.emotion_min_intensity = config_data["emotion_min_intensity"] + if "emotion_enable_subject" in config_data: + config.emotion_enable_subject = config_data["emotion_enable_subject"] + + # 提交更改 + self.db.commit() + self.db.refresh(config) + + # 返回更新后的配置 + updated_config = self.get_emotion_config(config_id) + + logger.info(f"情绪配置更新成功: config_id={config_id}") + return updated_config + + except ValueError: + self.db.rollback() + raise + except Exception as e: + self.db.rollback() + logger.error(f"更新情绪配置失败: {str(e)}", exc_info=True) + raise diff --git a/api/app/services/emotion_extraction_service.py b/api/app/services/emotion_extraction_service.py new file mode 100644 index 00000000..b3172df1 --- /dev/null +++ b/api/app/services/emotion_extraction_service.py @@ -0,0 +1,200 @@ +"""Emotion extraction service for analyzing emotions from statements. + +This service extracts emotion information from user statements using LLM, +including emotion type, intensity, keywords, subject classification, and target. + +Classes: + EmotionExtractionService: Service for extracting emotions from statements +""" + +import logging +from typing import Optional +from app.core.memory.models.emotion_models import EmotionExtraction +from app.models.data_config_model import DataConfig +from app.core.memory.utils.llm.llm_utils import get_llm_client +from app.core.memory.llm_tools.llm_client import LLMClientException + +logger = logging.getLogger(__name__) + + +class EmotionExtractionService: + """Service for extracting emotion information from statements. + + This service uses LLM to analyze statements and extract structured emotion + information including type, intensity, keywords, subject, and target. + It respects configuration settings for enabling/disabling extraction and + filtering by intensity threshold. + + Attributes: + llm_client: LLM client for making structured output calls + """ + + def __init__(self, llm_id: Optional[str] = None): + """Initialize the emotion extraction service. + + Args: + llm_id: Optional LLM model ID. If None, uses default from config. + """ + self.llm_client = None + self.llm_id = llm_id + logger.info(f"Initialized EmotionExtractionService with llm_id={llm_id}") + + def _get_llm_client(self, model_id: Optional[str] = None): + """Get or create LLM client instance. + + Args: + model_id: Optional model ID to use. If None, uses instance llm_id. + + Returns: + LLM client instance + """ + if self.llm_client is None or model_id: + effective_model_id = model_id or self.llm_id + self.llm_client = get_llm_client(effective_model_id) + return self.llm_client + + async def extract_emotion( + self, + statement: str, + config: DataConfig + ) -> Optional[EmotionExtraction]: + """Extract emotion information from a statement. + + This method checks if emotion extraction is enabled in the config, + builds an appropriate prompt, calls the LLM for structured output, + and applies intensity threshold filtering. + + Args: + statement: The statement text to analyze + config: Data configuration object containing emotion settings + + Returns: + EmotionExtraction object if extraction succeeds and passes threshold, + None if extraction is disabled, fails, or doesn't meet threshold + + Raises: + No exceptions are raised - failures are logged and return None + """ + # Check if emotion extraction is enabled + if not config.emotion_enabled: + logger.debug("Emotion extraction is disabled in config") + return None + + # Validate statement + if not statement or not statement.strip(): + logger.warning("Empty statement provided for emotion extraction") + return None + + try: + # Build the emotion extraction prompt + prompt = await self._build_emotion_prompt( + statement=statement, + extract_keywords=config.emotion_extract_keywords, + enable_subject=config.emotion_enable_subject + ) + + # Call LLM for structured output + emotion = await self._call_llm_structured( + prompt=prompt, + model_id=config.emotion_model_id + ) + + # Apply intensity threshold filtering + if emotion.emotion_intensity < config.emotion_min_intensity: + logger.debug( + f"Emotion intensity {emotion.emotion_intensity} below threshold " + f"{config.emotion_min_intensity}, skipping storage" + ) + return None + + logger.info( + f"Successfully extracted emotion: type={emotion.emotion_type}, " + f"intensity={emotion.emotion_intensity}, subject={emotion.emotion_subject}" + ) + + return emotion + + except Exception as e: + logger.error( + f"Emotion extraction failed for statement: {statement[:50]}..., " + f"error: {str(e)}", + exc_info=True + ) + return None + + async def _build_emotion_prompt( + self, + statement: str, + extract_keywords: bool, + enable_subject: bool + ) -> str: + """Build the emotion extraction prompt based on configuration. + + This method constructs a detailed prompt for the LLM that includes + instructions for emotion type classification, intensity assessment, + and optionally keyword extraction and subject classification. + + Args: + statement: The statement to analyze + extract_keywords: Whether to extract emotion keywords + enable_subject: Whether to enable subject classification + + Returns: + Formatted prompt string for LLM + """ + from app.core.memory.utils.prompt.prompt_utils import render_emotion_extraction_prompt + + prompt = await render_emotion_extraction_prompt( + statement=statement, + extract_keywords=extract_keywords, + enable_subject=enable_subject + ) + + return prompt + + async def _call_llm_structured( + self, + prompt: str, + model_id: Optional[str] = None + ) -> EmotionExtraction: + """Call LLM for structured emotion extraction output. + + This method uses the LLM client's response_structured method to get + a validated EmotionExtraction object from the LLM. + + Args: + prompt: The formatted prompt for emotion extraction + model_id: Optional model ID to use for this call + + Returns: + EmotionExtraction object with validated emotion data + + Raises: + LLMClientException: If LLM call fails or times out + ValidationError: If LLM response doesn't match expected schema + """ + try: + # Get LLM client + llm_client = self._get_llm_client(model_id) + + # Prepare messages + messages = [ + {"role": "user", "content": prompt} + ] + + # Call LLM with structured output + emotion = await llm_client.response_structured( + messages=messages, + response_model=EmotionExtraction, + temperature=0.3, + max_tokens=500 + ) + + return emotion + + except LLMClientException as e: + logger.error(f"LLM call failed: {str(e)}") + raise + except Exception as e: + logger.error(f"Unexpected error in LLM structured call: {str(e)}") + raise LLMClientException(f"Emotion extraction LLM call failed: {str(e)}")