@router.post("/classifications/episodic-memory", response_model=ApiResponse)
async def get_episodic_memory_overview_api(
    request: EpisodicMemoryOverviewRequest,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
) -> dict:
    """
    Return the episodic-memory overview for one end user.

    The request body selects the end user and may narrow the result by time
    range, episodic type and a title keyword. The heavy lifting is delegated
    to ``user_memory_service.get_episodic_memory_overview``; this handler only
    validates the request and wraps the result in the standard API envelope.

    Args:
        request: Overview filters (``end_user_id``, ``time_range``,
            ``episodic_type``, ``title_keyword``).
        current_user: Authenticated caller; must have a current workspace.
        db: Database session forwarded to the service layer.

    Returns:
        ``success(...)`` with the service result, or ``fail(...)`` with
        ``INVALID_PARAMETER`` (no workspace / bad filter value) or
        ``INTERNAL_ERROR`` (service raised).
    """
    workspace_id = current_user.current_workspace_id

    # A workspace must be selected before any memory query is allowed.
    if workspace_id is None:
        api_logger.warning(f"用户 {current_user.username} 尝试查询情景记忆总览但未选择工作空间")
        return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None")

    # NOTE(review): end_user_id is taken from the request body and is not
    # checked against workspace_id in this handler — confirm the service or
    # repository layer enforces that the end user belongs to this workspace.

    # Closed sets of accepted filter values (order is preserved in messages).
    valid_time_ranges = ("all", "today", "this_week", "this_month")
    valid_episodic_types = (
        "all", "conversation", "project_work", "learning", "decision", "important_event"
    )

    if request.time_range not in valid_time_ranges:
        return fail(BizCode.INVALID_PARAMETER, f"无效的时间范围参数,可选值:{', '.join(valid_time_ranges)}")

    if request.episodic_type not in valid_episodic_types:
        return fail(BizCode.INVALID_PARAMETER, f"无效的情景类型参数,可选值:{', '.join(valid_episodic_types)}")

    # Trim the keyword; a missing or whitespace-only keyword means "no filter"
    # and is normalised to None so logs and the service see one representation.
    title_keyword = (request.title_keyword or "").strip() or None

    api_logger.info(
        f"情景记忆总览查询请求: end_user_id={request.end_user_id}, user={current_user.username}, "
        f"workspace={workspace_id}, time_range={request.time_range}, episodic_type={request.episodic_type}, "
        f"title_keyword={title_keyword}"
    )

    try:
        result = await user_memory_service.get_episodic_memory_overview(
            db, request.end_user_id, request.time_range, request.episodic_type, title_keyword
        )

        api_logger.info(
            f"成功获取情景记忆总览: end_user_id={request.end_user_id}, "
            f"total={result['total']}"
        )
        return success(data=result, msg="查询成功")

    except Exception as e:
        # exc_info=True keeps the traceback, matching the other handlers in
        # this controller; without it only the message text is recorded.
        api_logger.error(
            f"情景记忆总览查询失败: end_user_id={request.end_user_id}, error={str(e)}",
            exc_info=True,
        )
        return fail(BizCode.INTERNAL_ERROR, "情景记忆总览查询失败", str(e))
summary_id={request.summary_id}" + ) + return success(data=result, msg="查询成功") + + except ValueError as e: + # 处理情景记忆不存在的情况 + api_logger.warning(f"情景记忆不存在: end_user_id={request.end_user_id}, summary_id={request.summary_id}, error={str(e)}") + return fail(BizCode.INVALID_PARAMETER, "情景记忆不存在", str(e)) + except Exception as e: + api_logger.error(f"情景记忆详情查询失败: end_user_id={request.end_user_id}, summary_id={request.summary_id}, error={str(e)}") + return fail(BizCode.INTERNAL_ERROR, "情景记忆详情查询失败", str(e)) diff --git a/api/app/core/memory/models/graph_models.py b/api/app/core/memory/models/graph_models.py index 6ec1ca8e..1254388b 100644 --- a/api/app/core/memory/models/graph_models.py +++ b/api/app/core/memory/models/graph_models.py @@ -474,6 +474,8 @@ class MemorySummaryNode(Node): dialog_id: ID of the parent dialog chunk_ids: List of chunk IDs used to generate this summary content: Summary text content + name: Title/name of the memory summary (generated by LLM, used as title in API) + memory_type: Type/category of the episodic memory (e.g., Conversation, Project/Work, Learning, Decision, Important Event) summary_embedding: Optional embedding vector for the summary metadata: Additional metadata for the summary config_id: Configuration ID used to process this summary @@ -492,6 +494,7 @@ class MemorySummaryNode(Node): dialog_id: str = Field(..., description="ID of the parent dialog") chunk_ids: List[str] = Field(default_factory=list, description="List of chunk IDs used in the summary") content: str = Field(..., description="Summary text content") + memory_type: Optional[str] = Field(None, description="Type/category of the episodic memory") summary_embedding: Optional[List[float]] = Field(None, description="Embedding vector for the summary") metadata: dict = Field(default_factory=dict, description="Additional metadata for the summary") config_id: Optional[int | str] = Field(None, description="Configuration ID used to process this summary (integer or string)") diff --git 
a/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/memory_summary.py b/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/memory_summary.py index 70c1ceb3..c72b9a1f 100644 --- a/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/memory_summary.py +++ b/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/memory_summary.py @@ -59,13 +59,28 @@ async def _process_chunk_summary( ) summary_text = structured.summary.strip() + # Generate title and type for the summary + title = None + episodic_type = None + try: + from app.services.user_memory_service import UserMemoryService + title, episodic_type = await UserMemoryService.generate_title_and_type_for_summary( + content=summary_text, + end_user_id=dialog.group_id + ) + logger.info(f"Generated title and type for MemorySummary: title={title}, type={episodic_type}") + except Exception as e: + logger.warning(f"Failed to generate title and type for chunk {chunk.id}: {e}") + # Continue without title and type + # Embed the summary embedding = (await embedder.response([summary_text]))[0] # Build node per chunk + # Note: title is stored in the 'name' field, type is stored in 'memory_type' field node = MemorySummaryNode( id=uuid4().hex, - name=f"MemorySummaryChunk_{chunk.id}", + name=title if title else f"MemorySummaryChunk_{chunk.id}", group_id=dialog.group_id, user_id=dialog.user_id, apply_id=dialog.apply_id, @@ -75,6 +90,7 @@ async def _process_chunk_summary( dialog_id=dialog.id, chunk_ids=[chunk.id], content=summary_text, + memory_type=episodic_type, summary_embedding=embedding, metadata={"ref_id": dialog.ref_id}, config_id=dialog.config_id, # 添加 config_id diff --git a/api/app/core/memory/storage_services/forgetting_engine/forgetting_strategy.py b/api/app/core/memory/storage_services/forgetting_engine/forgetting_strategy.py index 5e1e35da..f1802166 100644 --- 
a/api/app/core/memory/storage_services/forgetting_engine/forgetting_strategy.py +++ b/api/app/core/memory/storage_services/forgetting_engine/forgetting_strategy.py @@ -247,6 +247,9 @@ class ForgettingStrategy: entity_activation = entity_node['entity_activation'] entity_importance = entity_node['entity_importance'] + # 获取 group_id(从 statement 或 entity 节点) + group_id = statement_node.get('group_id') or entity_node.get('group_id') + # 生成摘要内容 summary_text = await self._generate_summary( statement_text=statement_text, @@ -256,6 +259,19 @@ class ForgettingStrategy: db=db ) + # 生成标题和类型(使用LLM) + from app.services.user_memory_service import UserMemoryService + try: + title, episodic_type = await UserMemoryService.generate_title_and_type_for_summary( + content=summary_text, + end_user_id=group_id + ) + logger.info(f"成功为MemorySummary生成标题和类型: title={title}, type={episodic_type}") + except Exception as e: + logger.error(f"生成标题和类型失败,使用默认值: {str(e)}") + title = "未命名" + episodic_type = "其他" + # 计算继承的激活值和重要性(取较高值) inherited_activation = max(statement_activation, entity_activation) inherited_importance = max(statement_importance, entity_importance) @@ -268,9 +284,6 @@ class ForgettingStrategy: import uuid summary_id = f"summary_{uuid.uuid4().hex[:16]}" - # 获取 group_id(从 statement 或 entity 节点) - group_id = statement_node.get('group_id') or entity_node.get('group_id') - # 使用事务创建 MemorySummary 并删除原节点 async def merge_transaction(tx, **params): """事务函数:创建摘要节点并删除原节点""" @@ -287,6 +300,8 @@ class ForgettingStrategy: CREATE (ms:MemorySummary { id: $summary_id, summary: $summary_text, + name: $title, + memory_type: $episodic_type, original_statement_id: $statement_id, original_entity_id: $entity_id, activation_value: $inherited_activation, @@ -386,6 +401,8 @@ class ForgettingStrategy: params = { 'summary_id': summary_id, 'summary_text': summary_text, + 'title': title, + 'episodic_type': episodic_type, 'statement_id': statement_id, 'entity_id': entity_id, 'inherited_activation': 
async def render_episodic_title_and_type_prompt(content: str) -> str:
    """
    Render the prompt that asks the LLM for an episodic title and category.

    The text is produced from the ``episodic_type_classification.jinja2``
    template with ``content`` as its only variable. Both the rendered prompt
    and the template inputs (content length only) are written to the prompt
    logs.

    Args:
        content: Episodic memory summary text to be titled and classified.

    Returns:
        The rendered prompt string.
    """
    template_name = "episodic_type_classification.jinja2"
    prompt_text = prompt_env.get_template(template_name).render(content=content)

    # Record the rendered prompt in the prompt log.
    log_prompt_rendering('episodic title and type classification', prompt_text)

    # Record template inputs; only the length is logged, not the content.
    content_length = len(content) if content else 0
    log_template_rendering(template_name, {'content_len': content_length})

    return prompt_text
**conversation**: Daily communication, chat, discussion, and social interactions + - Keywords: chat, communication, discussion, dialogue, exchange + +2. **project_work**: Work-related tasks, projects, meetings, and collaboration + - Keywords: project, task, work, meeting, collaboration, business, client + +3. **learning**: Acquiring new knowledge, skill development, reading, and research + - Keywords: learning, reading, research, knowledge, skill, course, training + +4. **decision**: Making important decisions, choices, and planning + - Keywords: decision, choice, planning, consideration, evaluation, weighing + +5. **important_event**: Major events, milestones, and special experiences + - Keywords: important, major, milestone, special, memorable, celebration + +=== Analysis Steps === +1. Read the episodic memory content carefully +2. Identify the core theme and context +3. Extract a concise title +4. Compare against category definitions and keywords +5. Select the best matching category +6. If multiple categories apply, choose the primary one + +=== Output Schema === +**CRITICAL JSON FORMATTING REQUIREMENTS:** +1. Use only standard ASCII double quotes (") for JSON structure +2. Escape any quotation marks within string values using backslashes (\") +3. Ensure all JSON strings are properly closed and comma-separated +4. 
Do not include line breaks within JSON string values + +Return only a JSON object with title and type fields: +{ + "title": "Generated title here", + "type": "Category type here" +} + +The type field must be exactly one of: +- conversation +- project_work +- learning +- decision +- important_event + diff --git a/api/app/repositories/neo4j/add_nodes.py b/api/app/repositories/neo4j/add_nodes.py index 79466fa0..1e24eeae 100644 --- a/api/app/repositories/neo4j/add_nodes.py +++ b/api/app/repositories/neo4j/add_nodes.py @@ -211,6 +211,7 @@ async def add_memory_summary_nodes(summaries: List[MemorySummaryNode], connector "dialog_id": s.dialog_id, "chunk_ids": s.chunk_ids, "content": s.content, + "memory_type": s.memory_type, # 添加 memory_type 字段 "summary_embedding": s.summary_embedding if s.summary_embedding else None, "config_id": s.config_id, # 添加 config_id }) diff --git a/api/app/repositories/neo4j/cypher_queries.py b/api/app/repositories/neo4j/cypher_queries.py index 3bbd6b01..8c86cc4d 100644 --- a/api/app/repositories/neo4j/cypher_queries.py +++ b/api/app/repositories/neo4j/cypher_queries.py @@ -721,6 +721,7 @@ SET m += { dialog_id: summary.dialog_id, chunk_ids: summary.chunk_ids, content: summary.content, + memory_type: summary.memory_type, summary_embedding: summary.summary_embedding, config_id: summary.config_id, importance_score: CASE WHEN summary.importance_score IS NOT NULL THEN summary.importance_score ELSE coalesce(m.importance_score, 0.5) END, diff --git a/api/app/schemas/user_memory_schema.py b/api/app/schemas/user_memory_schema.py new file mode 100644 index 00000000..27a458b6 --- /dev/null +++ b/api/app/schemas/user_memory_schema.py @@ -0,0 +1,30 @@ +""" +用户记忆相关的请求和响应模型 +""" +from pydantic import BaseModel, Field +from typing import Optional + + +class EpisodicMemoryOverviewRequest(BaseModel): + """情景记忆总览查询请求""" + + end_user_id: str = Field(..., description="终端用户ID") + time_range: str = Field( + default="all", + description="时间范围筛选,可选值:all, today, this_week, 
class EpisodicMemoryDetailsRequest(BaseModel):
    """Request body for fetching the details of one episodic memory."""

    # Both fields are required: the Ellipsis default marks them as mandatory.
    end_user_id: str = Field(default=..., description="终端用户ID")
    summary_id: str = Field(default=..., description="情景记忆摘要ID")
@staticmethod
async def generate_title_and_type_for_summary(
    content: str,
    end_user_id: str
) -> Tuple[str, str]:
    """
    Generate a title and an episodic type for a MemorySummary via the LLM.

    Intended to be called when a MemorySummary node is created. The method
    never raises for LLM or parsing problems: every failure path returns a
    placeholder title together with the default type, so callers always get
    a ``(str, str)`` tuple.

    Args:
        content: Summary text to be titled and classified.
        end_user_id: End-user id (group_id) used to pick the LLM client.

    Returns:
        ``(title, episodic_type)`` where ``episodic_type`` is always one of
        ``conversation``, ``project_work``, ``learning``, ``decision`` or
        ``important_event``. Placeholders: ``("空内容", default)`` for empty
        content, ``("解析失败", default)`` when the reply is not a JSON
        object, ``("错误", default)`` for any other exception.
    """
    from app.core.memory.utils.prompt.prompt_utils import render_episodic_title_and_type_prompt
    import json

    # Closed set of category identifiers accepted downstream.
    VALID_TYPES = {
        "conversation",
        "project_work",
        "learning",
        "decision",
        "important_event",
    }
    DEFAULT_TYPE = "conversation"
    DEFAULT_TITLE = "未知标题"

    # Chinese labels the LLM sometimes returns instead of the identifiers.
    CHINESE_TYPE_MAPPING = {
        "对话": "conversation",
        "项目": "project_work",
        "工作": "project_work",
        "项目/工作": "project_work",
        "学习": "learning",
        "决策": "decision",
        "重要事件": "important_event",
        "事件": "important_event",
    }

    def _load_json_object(text: str):
        """Return the JSON object found in ``text``, or None if there is none."""
        candidate = text.strip()
        # Drop a surrounding markdown code fence if the model added one.
        if candidate.startswith("```json"):
            candidate = candidate[7:]
        elif candidate.startswith("```"):
            candidate = candidate[3:]
        if candidate.endswith("```"):
            candidate = candidate[:-3]
        candidate = candidate.strip()

        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            # Fall back to the outermost {...} span so a reply that wraps the
            # JSON in extra prose is still usable.
            start, end = candidate.find("{"), candidate.rfind("}")
            if start == -1 or end <= start:
                return None
            try:
                parsed = json.loads(candidate[start:end + 1])
            except json.JSONDecodeError:
                return None

        # Only a JSON object can carry the title/type fields.
        return parsed if isinstance(parsed, dict) else None

    try:
        if not content:
            logger.warning("content为空,无法生成标题和类型")
            return ("空内容", DEFAULT_TYPE)

        # 1. Render the prompt and query the LLM.
        prompt = await render_episodic_title_and_type_prompt(content)
        llm_client = _get_llm_client_for_user(end_user_id)
        response = await llm_client.chat(messages=[{"role": "user", "content": prompt}])

        # 2. Flatten the response content (str, dict, or list of either)
        #    into a single text string.
        payload = response.content
        if isinstance(payload, list):
            payload = payload[0] if payload else ""
        if isinstance(payload, dict):
            payload = payload.get('text', payload.get('content', str(payload)))
        full_response = str(payload) if payload is not None else ""

        # 3. Parse the JSON object out of the reply.
        result_data = _load_json_object(full_response)
        if result_data is None:
            logger.error(f"无法解析LLM响应为JSON: {full_response}")
            return ("解析失败", DEFAULT_TYPE)

        # 4. Title: always a non-empty string, even for null / non-string JSON.
        raw_title = result_data.get("title")
        title = str(raw_title).strip() if raw_title is not None else ""
        if not title:
            title = DEFAULT_TITLE

        # 5. Type: validate and normalise.
        episodic_type_raw = result_data.get("type", DEFAULT_TYPE)
        stripped_type = str(episodic_type_raw).strip()
        lowered_type = stripped_type.lower()
        # "Important Event", "project/work", "project-work" -> identifier form.
        canonical_type = "_".join(
            lowered_type.replace("/", " ").replace("-", " ").split()
        )

        if lowered_type in VALID_TYPES:
            episodic_type = lowered_type
        elif canonical_type in VALID_TYPES:
            episodic_type = canonical_type
        else:
            # Look up the stripped value so surrounding whitespace in the
            # reply does not defeat the Chinese-label mapping.
            episodic_type = CHINESE_TYPE_MAPPING.get(stripped_type, DEFAULT_TYPE)
            logger.warning(
                f"LLM返回的类型 '{episodic_type_raw}' 不在有效集合中,"
                f"已归一化为 '{episodic_type}'"
            )

        logger.info(f"成功生成标题和类型: title={title}, type={episodic_type}")
        return (title, episodic_type)

    except Exception as e:
        # Best-effort by design: summary creation must not fail because the
        # title/type generation did.
        logger.error(f"生成标题和类型时出错: {str(e)}", exc_info=True)
        return ("错误", DEFAULT_TYPE)
async def _extract_content_records(
    self,
    summary_id: str,
    end_user_id: str
) -> List[str]:
    """
    Collect the statement texts linked to one episodic memory summary.

    The MemorySummary node is matched by element id and group id, then its
    ``DERIVED_FROM_STATEMENT`` relationships are followed to Statement nodes
    whose ``statement`` property is present and non-empty.

    Args:
        summary_id: Element id of the MemorySummary node.
        end_user_id: End-user id, matched against the node's ``group_id``.

    Returns:
        The ``statement`` values of the linked Statement nodes. Any error is
        logged and an empty list is returned instead of raising.
    """
    cypher = """
        MATCH (s:MemorySummary)
        WHERE elementId(s) = $summary_id AND s.group_id = $group_id
        MATCH (s)-[:DERIVED_FROM_STATEMENT]->(stmt:Statement)
        WHERE stmt.statement IS NOT NULL AND stmt.statement <> ''
        RETURN stmt.statement AS statement
    """

    try:
        rows = await self.neo4j_connector.execute_query(
            cypher,
            summary_id=summary_id,
            group_id=end_user_id
        )

        # Keep only rows that actually carry statement text.
        statements = []
        for row in rows:
            if row.get("statement"):
                statements.append(row["statement"])

        logger.info(f"成功提取 summary_id={summary_id} 的内容记录,共 {len(statements)} 条")
        return statements

    except Exception as exc:
        logger.error(f"提取内容记录时出错: {str(exc)}", exc_info=True)
        return []
(s)-[:DERIVED_FROM_STATEMENT]->(stmt:Statement) + WHERE stmt.emotion_type IS NOT NULL + AND stmt.emotion_intensity IS NOT NULL + RETURN stmt.emotion_type AS emotion_type, + stmt.emotion_intensity AS emotion_intensity + ORDER BY emotion_intensity DESC + LIMIT 1 + """ + + result = await self.neo4j_connector.execute_query( + query, + summary_id=summary_id, + group_id=end_user_id + ) + + # 提取emotion_type + if result and len(result) > 0: + emotion_type = result[0].get("emotion_type") + logger.info(f"成功提取 summary_id={summary_id} 的情绪: {emotion_type}") + return emotion_type + else: + logger.info(f"summary_id={summary_id} 没有情绪信息") + return None + + except Exception as e: + logger.error(f"提取情景记忆情绪时出错: {str(e)}", exc_info=True) + return None + + async def get_episodic_memory_overview( + self, + db: Session, + end_user_id: str, + time_range: str = "all", + episodic_type: str = "all", + title_keyword: Optional[str] = None + ) -> Dict[str, Any]: + """ + 获取情景记忆总览信息 + + Args: + db: 数据库会话 + end_user_id: 终端用户ID + time_range: 时间范围筛选 + episodic_type: 情景类型筛选 + title_keyword: 标题关键词(可选,用于模糊搜索) + """ + try: + logger.info( + f"开始查询 end_user_id={end_user_id} 的情景记忆总览, " + f"time_range={time_range}, episodic_type={episodic_type}, title_keyword={title_keyword}" + ) + + # 1. 先查询所有情景记忆的总数(不受筛选条件限制) + total_all_query = """ + MATCH (s:MemorySummary) + WHERE s.group_id = $group_id + RETURN count(s) AS total_all + """ + total_all_result = await self.neo4j_connector.execute_query( + total_all_query, + group_id=end_user_id + ) + total_all = total_all_result[0]["total_all"] if total_all_result else 0 + + # 2. 计算时间范围的起始时间戳 + time_filter = self._calculate_time_filter(time_range) + + # 3. 
构建Cypher查询 + query = """ + MATCH (s:MemorySummary) + WHERE s.group_id = $group_id + """ + + # 添加时间范围过滤 + if time_filter: + query += " AND s.created_at >= $time_filter" + + # 添加标题关键词过滤(如果提供了title_keyword) + if title_keyword: + query += " AND toLower(s.name) CONTAINS toLower($title_keyword)" + + query += """ + RETURN elementId(s) AS id, + s.created_at AS created_at, + s.memory_type AS type, + s.name AS title + ORDER BY s.created_at DESC + """ + + params = {"group_id": end_user_id} + if time_filter: + params["time_filter"] = time_filter + if title_keyword: + params["title_keyword"] = title_keyword + + result = await self.neo4j_connector.execute_query(query, **params) + + # 4. 如果没有数据,返回空列表 + if not result: + logger.info(f"end_user_id={end_user_id} 没有情景记忆数据") + return { + "total": 0, + "total_all": total_all, + "episodic_memories": [] + } + + # 5. 对每个节点读取标题和类型,并应用类型筛选 + episodic_memories = [] + for record in result: + summary_id = record["id"] + created_at_str = record.get("created_at") + memory_type = record.get("type", "其他") + title = record.get("title") or "未命名" # 直接从查询结果获取标题 + + # 应用情景类型筛选 + if episodic_type != "all": + # 检查类型是否匹配 + # 注意:Neo4j 中存储的 memory_type 现在应该是英文格式(如 "conversation", "project_work") + # 但为了兼容旧数据,我们也支持中文格式的匹配 + type_mapping = { + "conversation": "对话", + "project_work": "项目/工作", + "learning": "学习", + "decision": "决策", + "important_event": "重要事件" + } + + # 获取对应的中文类型(用于兼容旧数据) + chinese_type = type_mapping.get(episodic_type) + + # 检查类型是否匹配(支持新的英文格式和旧的中文格式) + if memory_type != episodic_type and memory_type != chinese_type: + continue + + # 转换时间戳 + created_at_timestamp = None + if created_at_str: + try: + from datetime import datetime + dt_object = datetime.fromisoformat(created_at_str.replace("Z", "+00:00")) + created_at_timestamp = int(dt_object.timestamp() * 1000) + except (ValueError, TypeError, AttributeError) as e: + logger.warning(f"无法解析时间戳: {created_at_str}, error={str(e)}") + + episodic_memories.append({ + "id": summary_id, + "title": title, + 
"type": memory_type, + "created_at": created_at_timestamp + }) + + logger.info( + f"成功获取 end_user_id={end_user_id} 的情景记忆总览," + f"筛选后 {len(episodic_memories)} 条,总共 {total_all} 条" + ) + + return { + "total": len(episodic_memories), + "total_all": total_all, + "episodic_memories": episodic_memories + } + + except Exception as e: + logger.error(f"获取情景记忆总览时出错: {str(e)}", exc_info=True) + raise + + def _calculate_time_filter(self, time_range: str) -> Optional[str]: + """ + 根据时间范围计算过滤的起始时间 + + Args: + time_range: 时间范围 (all/today/this_week/this_month) + + Returns: + ISO格式的时间字符串,如果是"all"则返回None + """ + from datetime import datetime, timedelta + import pytz + + if time_range == "all": + return None + + # 获取当前时间(UTC) + now = datetime.now(pytz.UTC) + + if time_range == "today": + # 今天的开始时间(00:00:00) + start_time = now.replace(hour=0, minute=0, second=0, microsecond=0) + elif time_range == "this_week": + # 本周的开始时间(周一00:00:00) + days_since_monday = now.weekday() + start_time = (now - timedelta(days=days_since_monday)).replace( + hour=0, minute=0, second=0, microsecond=0 + ) + elif time_range == "this_month": + # 本月的开始时间(1号00:00:00) + start_time = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) + else: + return None + + # 返回ISO格式字符串 + return start_time.isoformat() + + async def get_episodic_memory_details( + self, + db: Session, + end_user_id: str, + summary_id: str + ) -> Dict[str, Any]: + """ + 获取单个情景记忆的详细信息 + + """ + try: + logger.info(f"开始查询 end_user_id={end_user_id}, summary_id={summary_id} 的情景记忆详情") + + # 1. 查询指定的MemorySummary节点 + query = """ + MATCH (s:MemorySummary) + WHERE elementId(s) = $summary_id AND s.group_id = $group_id + RETURN elementId(s) AS id, s.created_at AS created_at + """ + + result = await self.neo4j_connector.execute_query( + query, + summary_id=summary_id, + group_id=end_user_id + ) + + # 2. 
如果节点不存在,返回错误 + if not result or len(result) == 0: + logger.warning(f"未找到 summary_id={summary_id} 的情景记忆") + raise ValueError(f"情景记忆不存在: summary_id={summary_id}") + + # 3. 获取基本信息 + record = result[0] + created_at_str = record.get("created_at") + + # 转换时间戳 + created_at_timestamp = None + if created_at_str: + try: + from datetime import datetime + dt_object = datetime.fromisoformat(created_at_str.replace("Z", "+00:00")) + created_at_timestamp = int(dt_object.timestamp() * 1000) + except (ValueError, TypeError, AttributeError) as e: + logger.warning(f"无法解析时间戳: {created_at_str}, error={str(e)}") + + # 4. 调用_get_title_and_type读取标题和类型 + title, episodic_type = await self._get_title_and_type( + summary_id=summary_id, + end_user_id=end_user_id + ) + + # 5. 调用_extract_involved_objects提取涉及对象 + involved_objects = await self._extract_involved_objects( + summary_id=summary_id, + end_user_id=end_user_id + ) + + # 6. 调用_extract_content_records提取内容记录 + content_records = await self._extract_content_records( + summary_id=summary_id, + end_user_id=end_user_id + ) + + # 7. 调用_extract_episodic_emotion提取情绪 + emotion = await self._extract_episodic_emotion( + summary_id=summary_id, + end_user_id=end_user_id + ) + + # 8. 返回完整的详情信息 + details = { + "id": summary_id, + "created_at": created_at_timestamp, + "involved_objects": involved_objects, + "episodic_type": episodic_type, + "content_records": content_records, + "emotion": emotion + } + + logger.info(f"成功获取 summary_id={summary_id} 的情景记忆详情") + + return details + + except ValueError as e: + # 重新抛出ValueError,让Controller层处理 + raise + except Exception as e: + logger.error(f"获取情景记忆详情时出错: {str(e)}", exc_info=True) + raise # 独立的分析函数