diff --git a/api/app/controllers/memory_short_term_controller.py b/api/app/controllers/memory_short_term_controller.py index f21a00b6..64991f4d 100644 --- a/api/app/controllers/memory_short_term_controller.py +++ b/api/app/controllers/memory_short_term_controller.py @@ -40,5 +40,4 @@ async def short_term_configs( "long_term_number":len(long_result) } - return success(data=result, msg="短期记忆系统数据获取成功") - + return success(data=result, msg="短期记忆系统数据获取成功") \ No newline at end of file diff --git a/api/app/controllers/user_memory_controllers.py b/api/app/controllers/user_memory_controllers.py index da12cbf6..2f7667c1 100644 --- a/api/app/controllers/user_memory_controllers.py +++ b/api/app/controllers/user_memory_controllers.py @@ -17,6 +17,7 @@ from app.services.user_memory_service import ( analytics_memory_types, analytics_graph_data, ) +from app.services.memory_entity_relationship_service import MemoryEntityService,MemoryEmotion,MemoryInteraction from app.schemas.response_schema import ApiResponse from app.schemas.memory_storage_schema import GenerateCacheRequest from app.schemas.end_user_schema import ( @@ -47,7 +48,7 @@ async def get_memory_insight_report_api( ) -> dict: """ 获取缓存的记忆洞察报告 - + 此接口仅查询数据库中已缓存的记忆洞察数据,不执行生成操作。 如需生成新的洞察报告,请使用专门的生成接口。 """ @@ -55,7 +56,7 @@ async def get_memory_insight_report_api( try: # 调用服务层获取缓存数据 result = await user_memory_service.get_cached_memory_insight(db, end_user_id) - + if result["is_cached"]: api_logger.info(f"成功返回缓存的记忆洞察报告: end_user_id={end_user_id}") return success(data=result, msg="查询成功") @@ -75,7 +76,7 @@ async def get_user_summary_api( ) -> dict: """ 获取缓存的用户摘要 - + 此接口仅查询数据库中已缓存的用户摘要数据,不执行生成操作。 如需生成新的用户摘要,请使用专门的生成接口。 """ @@ -83,7 +84,7 @@ async def get_user_summary_api( try: # 调用服务层获取缓存数据 result = await user_memory_service.get_cached_user_summary(db, end_user_id) - + if result["is_cached"]: api_logger.info(f"成功返回缓存的用户摘要: end_user_id={end_user_id}") return success(data=result, msg="查询成功") @@ -103,35 +104,35 @@ async def generate_cache_api( ) -> dict: """ 手动触发缓存生成 - + - 如果提供 end_user_id,只为该用户生成 - 如果不提供,为当前工作空间的所有用户生成 """ workspace_id = current_user.current_workspace_id - + # 检查用户是否已选择工作空间 if workspace_id is None: api_logger.warning(f"用户 {current_user.username} 尝试生成缓存但未选择工作空间") return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") - + group_id = request.end_user_id - + api_logger.info( f"缓存生成请求: user={current_user.username}, workspace={workspace_id}, " f"end_user_id={group_id if group_id else '全部用户'}" ) - + try: if group_id: # 为单个用户生成 api_logger.info(f"开始为单个用户生成缓存: end_user_id={group_id}") - + # 生成记忆洞察 insight_result = await user_memory_service.generate_and_cache_insight(db, group_id, workspace_id) - + # 生成用户摘要 summary_result = await user_memory_service.generate_and_cache_summary(db, group_id, workspace_id) - + # 构建响应 result = { "end_user_id": group_id, @@ -139,7 +140,7 @@ async def generate_cache_api( "summary_success": summary_result["success"], "errors": [] } - + # 收集错误信息 if not insight_result["success"]: result["errors"].append({ @@ -151,29 +152,29 @@ async def generate_cache_api( "type": "summary", "error": summary_result.get("error") }) - + # 记录结果 if result["insight_success"] and result["summary_success"]: api_logger.info(f"成功为用户 {group_id} 生成缓存") else: api_logger.warning(f"用户 {group_id} 的缓存生成部分失败: {result['errors']}") - + return success(data=result, msg="生成完成") - + else: # 为整个工作空间生成 api_logger.info(f"开始为工作空间 {workspace_id} 批量生成缓存") - + result = await user_memory_service.generate_cache_for_workspace(db, workspace_id) - + # 记录统计信息 api_logger.info( f"工作空间 {workspace_id} 批量生成完成: " f"总数={result['total_users']}, 成功={result['successful']}, 失败={result['failed']}" ) - + return success(data=result, msg="批量生成完成") - + except Exception as e: api_logger.error(f"缓存生成失败: user={current_user.username}, error={str(e)}") return fail(BizCode.INTERNAL_ERROR, "缓存生成失败", str(e)) @@ -186,18 +187,18 @@ async def get_node_statistics_api( db: Session = Depends(get_db), ) -> dict: workspace_id = current_user.current_workspace_id - + # 检查用户是否已选择工作空间 if workspace_id is None: api_logger.warning(f"用户 {current_user.username} 尝试查询节点统计但未选择工作空间") return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") - + api_logger.info(f"记忆类型统计请求: end_user_id={end_user_id}, user={current_user.username}, workspace={workspace_id}") - + try: # 调用新的记忆类型统计函数 result = await analytics_memory_types(db, end_user_id) - + # 计算总数用于日志 total_count = sum(item["count"] for item in result) api_logger.info(f"成功获取记忆类型统计: end_user_id={end_user_id}, 总记忆数={total_count}, 类型数={len(result)}") @@ -217,31 +218,31 @@ async def get_graph_data_api( db: Session = Depends(get_db), ) -> dict: workspace_id = current_user.current_workspace_id - + # 检查用户是否已选择工作空间 if workspace_id is None: api_logger.warning(f"用户 {current_user.username} 尝试查询图数据但未选择工作空间") return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") - + # 参数验证 if limit > 1000: limit = 1000 api_logger.warning("limit 参数超过最大值,已调整为 1000") - + if depth > 3: depth = 3 api_logger.warning("depth 参数超过最大值,已调整为 3") - + # 解析 node_types 参数 node_types_list = None if node_types: node_types_list = [t.strip() for t in node_types.split(",") if t.strip()] - + api_logger.info( f"图数据查询请求: end_user_id={end_user_id}, user={current_user.username}, " f"workspace={workspace_id}, node_types={node_types_list}, limit={limit}, depth={depth}" ) - + try: result = await analytics_graph_data( db=db, @@ -251,19 +252,19 @@ async def get_graph_data_api( depth=depth, center_node_id=center_node_id ) - + # 检查是否有错误消息 if "message" in result and result["statistics"]["total_nodes"] == 0: api_logger.warning(f"图数据查询返回空结果: {result.get('message')}") return success(data=result, msg=result.get("message", "查询成功")) - + api_logger.info( f"成功获取图数据: end_user_id={end_user_id}, " f"nodes={result['statistics']['total_nodes']}, " f"edges={result['statistics']['total_edges']}" ) return success(data=result, msg="查询成功") - + except Exception as e: api_logger.error(f"图数据查询失败: end_user_id={end_user_id}, error={str(e)}") return fail(BizCode.INTERNAL_ERROR, "图数据查询失败", str(e)) @@ -276,25 +277,25 @@ async def get_end_user_profile( db: Session = Depends(get_db), ) -> dict: workspace_id = current_user.current_workspace_id - + # 检查用户是否已选择工作空间 if workspace_id is None: api_logger.warning(f"用户 {current_user.username} 尝试查询用户信息但未选择工作空间") return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") - + api_logger.info( f"用户信息查询请求: end_user_id={end_user_id}, user={current_user.username}, " f"workspace={workspace_id}" ) - + try: # 查询终端用户 end_user = db.query(EndUser).filter(EndUser.id == end_user_id).first() - + if not end_user: api_logger.warning(f"终端用户不存在: end_user_id={end_user_id}") return fail(BizCode.INVALID_PARAMETER, "终端用户不存在", f"end_user_id={end_user_id}") - + # 构建响应数据 profile_data = EndUserProfileResponse( id=end_user.id, @@ -306,10 +307,10 @@ async def get_end_user_profile( hire_date=end_user.hire_date, updatetime_profile=end_user.updatetime_profile ) - + api_logger.info(f"成功获取用户信息: end_user_id={end_user_id}") return success(data=UserMemoryService.convert_profile_to_dict_with_timestamp(profile_data), msg="查询成功") - + except Exception as e: api_logger.error(f"用户信息查询失败: end_user_id={end_user_id}, error={str(e)}") return fail(BizCode.INTERNAL_ERROR, "用户信息查询失败", str(e)) @@ -323,56 +324,56 @@ async def update_end_user_profile( ) -> dict: """ 更新终端用户的基本信息 - + 该接口可以更新用户的姓名、职位、部门、联系方式、电话和入职日期等信息。 所有字段都是可选的,只更新提供的字段。 - + """ workspace_id = current_user.current_workspace_id end_user_id = profile_update.end_user_id - + # 检查用户是否已选择工作空间 if workspace_id is None: api_logger.warning(f"用户 {current_user.username} 尝试更新用户信息但未选择工作空间") return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") - + api_logger.info( f"用户信息更新请求: end_user_id={end_user_id}, user={current_user.username}, " f"workspace={workspace_id}" ) - + try: # 查询终端用户 end_user = db.query(EndUser).filter(EndUser.id == end_user_id).first() - + if not end_user: api_logger.warning(f"终端用户不存在: end_user_id={end_user_id}") return fail(BizCode.INVALID_PARAMETER, "终端用户不存在", f"end_user_id={end_user_id}") - + # 更新字段(只更新提供的字段,排除 end_user_id) # 允许 None 值来重置字段(如 hire_date) update_data = profile_update.model_dump(exclude_unset=True, exclude={'end_user_id'}) - + # 特殊处理 hire_date:如果提供了时间戳,转换为 DateTime if 'hire_date' in update_data: hire_date_timestamp = update_data['hire_date'] if hire_date_timestamp is not None: update_data['hire_date'] = timestamp_to_datetime(hire_date_timestamp) # 如果是 None,保持 None(允许清空) - + for field, value in update_data.items(): setattr(end_user, field, value) - + # 更新 updated_at 时间戳 end_user.updated_at = datetime.datetime.now() - + # 更新 updatetime_profile 为当前时间 end_user.updatetime_profile = datetime.datetime.now() - + # 提交更改 db.commit() db.refresh(end_user) - + # 构建响应数据 profile_data = EndUserProfileResponse( id=end_user.id, @@ -384,11 +385,50 @@ async def update_end_user_profile( hire_date=end_user.hire_date, updatetime_profile=end_user.updatetime_profile ) - + api_logger.info(f"成功更新用户信息: end_user_id={end_user_id}, updated_fields={list(update_data.keys())}") return success(data=UserMemoryService.convert_profile_to_dict_with_timestamp(profile_data), msg="更新成功") - + except Exception as e: db.rollback() api_logger.error(f"用户信息更新失败: end_user_id={end_user_id}, error={str(e)}") return fail(BizCode.INTERNAL_ERROR, "用户信息更新失败", str(e)) +@router.get("/memory_space/timeline_memories", response_model=ApiResponse) +async def memory_space_timeline_of_shared_memories(id: str, label: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), + ): + MemoryEntity = MemoryEntityService(id, label) + timeline_memories_result = await MemoryEntity.get_timeline_memories_server() + return success(data=timeline_memories_result, msg="共同记忆时间线") +@router.get("/memory_space/relationship_evolution", response_model=ApiResponse) +async def memory_space_relationship_evolution(id: str, label: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), + ): + try: + api_logger.info(f"关系演变查询请求: id={id}, table={label}, user={current_user.username}") + + # 获取情绪数据 + emotion = MemoryEmotion(id, label) + emotion_result = await emotion.get_emotion() + + # 获取交互数据 + interaction = MemoryInteraction(id, label) + interaction_result = await interaction.get_interaction_frequency() + + # 关闭连接 + await emotion.close() + await interaction.close() + + result = { + "emotion": emotion_result, + "interaction": interaction_result + } + + api_logger.info(f"关系演变查询成功: id={id}, table={label}") + return success(data=result, msg="关系演变") + + except Exception as e: + api_logger.error(f"关系演变查询失败: id={id}, table={label}, error={str(e)}", exc_info=True) + return fail(BizCode.INTERNAL_ERROR, "关系演变查询失败", str(e)) diff --git a/api/app/repositories/neo4j/cypher_queries.py b/api/app/repositories/neo4j/cypher_queries.py index ed8f6100..127ee78c 100644 --- a/api/app/repositories/neo4j/cypher_queries.py +++ b/api/app/repositories/neo4j/cypher_queries.py @@ -862,3 +862,120 @@ neo4j_query_all = """ """ +'''针对当前节点下扩长的句子,实体和总结''' +Memory_Timeline_ExtractedEntity=""" +MATCH (n)-[r1]-(e)-[r2]-(ms) +WHERE elementId(n) =$id +AND (ms:ExtractedEntity OR ms:MemorySummary) +RETURN + collect(DISTINCT coalesce(ms.name, n.name, e.name)) AS ExtractedEntity, + collect(DISTINCT ms.content) AS MemorySummary, + collect(DISTINCT e.statement) AS statement; +""" +Memory_Timeline_MemorySummary=""" +MATCH (n)-[r1]-(e)-[r2]-(ms) +WHERE elementId(n) = $id + AND (ms:MemorySummary OR ms:ExtractedEntity) +RETURN + collect(DISTINCT coalesce(ms.name, n.name, e.name)) AS ExtractedEntity, + collect(DISTINCT ms.content) AS MemorySummary, + collect(DISTINCT e.statement) AS statement;""" +Memory_Timeline_Statement=""" +MATCH (n) +WHERE elementId(n) = "4:f6039a9b-d553-4ba2-9b1c-d9a18917801f:77154" + +CALL { + WITH n + MATCH (n)-[]-(m) + WHERE m:ExtractedEntity + AND NOT m:MemorySummary + AND NOT m:Chunk + RETURN collect(DISTINCT m.name) AS ExtractedEntity +} + +CALL { + WITH n + MATCH (n)-[]-(m) + WHERE m:MemorySummary + AND NOT m:Chunk + RETURN collect(DISTINCT m.content) AS MemorySummary +} + +RETURN + ExtractedEntity, + MemorySummary, + collect(DISTINCT n.statement) AS Statement; + +""" + +'''针对当前节点,主要获取更加完整的句子节点''' +Memory_Space_Emotion_Statement=""" +MATCH (n) +WHERE elementId(n) = $id +RETURN + n.emotion_intensity AS emotion_intensity, + n.created_at AS created_at, + n.emotion_type AS emotion_type, + n.statement AS statement; + +""" +Memory_Space_Emotion_MemorySummary=""" +MATCH (n)-[]-(e) +WHERE elementId(n) = "4:f6039a9b-d553-4ba2-9b1c-d9a18917801f:77019" + AND EXISTS { + MATCH (e)-[]-(ms) + WHERE ms:MemorySummary OR ms:ExtractedEntity + } +RETURN DISTINCT + e.emotion_intensity AS emotion_intensity, + e.created_at AS created_at, + e.emotion_type AS emotion_type, + e.statement AS statement; +""" +Memory_Space_Emotion_ExtractedEntity=""" +MATCH (n)-[]-(e) +WHERE elementId(n) = $id + AND EXISTS { + MATCH (e)-[]-(ms:ExtractedEntity) + } +RETURN DISTINCT + e.emotion_intensity AS emotion_intensity, + e.created_at AS created_at, + e.emotion_type AS emotion_type, + e.statement AS statement; +""" + +'''获取实体''' +Memory_Space_Interaction_Statement=""" +MATCH (n)-[]-(m) +WHERE elementId(n) = $id + AND m.entity_type = "Person" +RETURN + m.name AS name, + m.importance_score AS importance_score; + +""" + +Memory_Space_Interaction_ExtractedEntity=""" +MATCH (n)-[]-(e) +WHERE elementId(n) = $id + AND EXISTS { + MATCH (e)-[]-(ms:ExtractedEntity) + } +RETURN DISTINCT + e.name AS name, + e.importance_score AS importance_score; + +""" + +Memory_Space_Interaction_Summary=""" +MATCH (n)-[]-(e) +WHERE elementId(n) = $id + AND EXISTS { + MATCH (e)-[]-(ms:ExtractedEntity) + } +RETURN DISTINCT + e.name AS name, + e.importance_score AS importance_score; + +""" \ No newline at end of file diff --git a/api/app/services/memory_entity_relationship_service.py b/api/app/services/memory_entity_relationship_service.py new file mode 100644 index 00000000..9f0f6032 --- /dev/null +++ b/api/app/services/memory_entity_relationship_service.py @@ -0,0 +1,464 @@ + +from app.repositories.neo4j.cypher_queries import ( +Memory_Timeline_ExtractedEntity, +Memory_Timeline_MemorySummary, +Memory_Timeline_Statement, +Memory_Space_Emotion_Statement, +Memory_Space_Emotion_MemorySummary, +Memory_Space_Emotion_ExtractedEntity, +Memory_Space_Interaction_Statement, +Memory_Space_Interaction_ExtractedEntity, +Memory_Space_Interaction_Summary +) +from app.repositories.neo4j.neo4j_connector import Neo4jConnector +from typing import Dict, List, Any, Optional +import logging +from neo4j.time import DateTime as Neo4jDateTime +import json +from datetime import datetime + +logger = logging.getLogger(__name__) + +class MemoryEntityService: + def __init__(self, id: str, table: str): + self.id = id + self.table = table + self.connector = Neo4jConnector() + + + + async def get_timeline_memories_server(self): + """ + 获取时间线记忆数据 + + Args: + id: 节点ID + table: 节点类型/标签 + + Returns: + Dict包含: + - success: 是否成功 + - data: 时间线数据列表 + - total: 数据总数 + - error: 错误信息(如果有) + + 根据不同标签返回相应字段: + - MemorySummary: content字段 + - Statement: statement字段 + - ExtractedEntity: name字段 + """ + try: + logger.info(f"获取时间线记忆数据 - ID: {self.id}, Table: {self.table}") + + # 根据表类型选择查询 + if self.table == 'Statement': + # Statement只需要输入ID,使用简化查询 + results = await self.connector.execute_query(Memory_Timeline_Statement, id=self.id) + elif self.table == 'ExtractedEntity': + # ExtractedEntity类型查询 + results = await self.connector.execute_query(Memory_Timeline_ExtractedEntity, id=self.id) + else: + # MemorySummary类型查询 + results = await self.connector.execute_query(Memory_Timeline_MemorySummary, id=self.id) + + # 记录查询结果的类型和内容用于调试 + logger.info(f"时间线查询结果类型: {type(results)}, 长度: {len(results) if isinstance(results, list) else 'N/A'}") + + # 处理查询结果 + timeline_data = self._process_timeline_results(results) + + logger.info(f"成功获取时间线记忆数据: 总计 {len(timeline_data.get('timelines_memory', []))} 条") + + return { + 'success': True, + 'data': timeline_data, + } + + except Exception as e: + logger.error(f"获取时间线记忆数据失败: {str(e)}", exc_info=True) + return { + 'success': False, + 'error': str(e), + 'data': { + "MemorySummary": [], + "Statement": [], + "ExtractedEntity": [], + "timelines_memory": [] + }, + 'total': 0 + } + def _process_timeline_results(self, results: List[Dict[str, Any]]) -> Dict[str, Any]: + """ + 处理时间线查询结果 + + Args: + results: Neo4j查询结果 + + Returns: + 处理后的时间线数据字典 + """ + # 检查results是否为空或不是列表 + if not results or not isinstance(results, list): + logger.warning(f"时间线查询结果为空或格式不正确: {type(results)}") + return { + "MemorySummary": [], + "Statement": [], + "ExtractedEntity": [], + "timelines_memory": [] + } + + memory_summary_list = [] + statement_list = [] + extracted_entity_list = [] + + for data in results: + # 检查data是否为字典类型 + if not isinstance(data, dict): + logger.warning(f"跳过非字典类型的记录: {type(data)} - {data}") + continue + + # 处理MemorySummary + summary = data.get('MemorySummary') + if summary is not None: + processed_summary = self._process_field_value(summary, "MemorySummary") + memory_summary_list.extend(processed_summary) + + # 处理Statement + statement = data.get('statement') + if statement is not None: + processed_statement = self._process_field_value(statement, "Statement") + statement_list.extend(processed_statement) + + # 处理ExtractedEntity + extracted_entity = data.get('ExtractedEntity') + if extracted_entity is not None: + processed_entity = self._process_field_value(extracted_entity, "ExtractedEntity") + extracted_entity_list.extend(processed_entity) + + # 去重 + memory_summary_list = list(set(memory_summary_list)) + statement_list = list(set(statement_list)) + extracted_entity_list = list(set(extracted_entity_list)) + + # 合并所有数据 + all_timeline_data = memory_summary_list + statement_list + extracted_entity_list + + result = { + "MemorySummary": memory_summary_list, + "Statement": statement_list, + "ExtractedEntity": extracted_entity_list, + "timelines_memory": all_timeline_data + } + + logger.info(f"时间线数据处理完成: MemorySummary={len(memory_summary_list)}, Statement={len(statement_list)}, ExtractedEntity={len(extracted_entity_list)}") + + return result + + def _process_field_value(self, value: Any, field_name: str) -> List[str]: + """ + 处理字段值,支持字符串、列表等类型 + + Args: + value: 字段值 + field_name: 字段名称(用于日志) + + Returns: + 处理后的字符串列表 + """ + processed_values = [] + + try: + if isinstance(value, list): + # 如果是列表,处理每个元素 + for item in value: + if item is not None and str(item).strip() != '' and "MemorySummaryChunk" not in str(item): + processed_values.append(str(item)) + elif isinstance(value, str): + # 如果是字符串,直接处理 + if value.strip() != '' and "MemorySummaryChunk" not in value: + processed_values.append(value) + elif value is not None: + # 其他类型转换为字符串 + str_value = str(value) + if str_value.strip() != '' and "MemorySummaryChunk" not in str_value: + processed_values.append(str_value) + except Exception as e: + logger.warning(f"处理字段 {field_name} 的值时出错: {e}, 值类型: {type(value)}, 值: {value}") + + return processed_values + + + + + async def close(self): + """关闭数据库连接""" + await self.connector.close() + + + +class MemoryEmotion: + def __init__(self, id: str, table: str): + self.id = id + self.table = table + self.connector = Neo4jConnector() + + def _convert_neo4j_types(self, obj: Any) -> Any: + """ + 递归转换Neo4j特殊类型为可序列化的Python类型 + """ + if isinstance(obj, Neo4jDateTime): + # 转换为用户友好的日期格式 + return self._format_datetime(obj.iso_format()) + elif hasattr(obj, '__class__') and 'neo4j' in str(obj.__class__): + if hasattr(obj, 'iso_format'): + return self._format_datetime(obj.iso_format()) + elif hasattr(obj, '__str__'): + return str(obj) + else: + return repr(obj) + elif isinstance(obj, dict): + return {k: self._convert_neo4j_types(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [self._convert_neo4j_types(item) for item in obj] + elif isinstance(obj, tuple): + return tuple(self._convert_neo4j_types(item) for item in obj) + else: + return obj + + def _format_datetime(self, iso_string: str) -> str: + """ + 将ISO格式的日期时间字符串转换为用户友好的格式 + + Args: + iso_string: ISO格式的日期时间字符串,如 "2026-01-07T13:40:33.679530" + + Returns: + 格式化后的日期时间字符串,如 "2026-01-07 13:40:33" + """ + try: + # 解析ISO格式的日期时间 + dt = datetime.fromisoformat(iso_string.replace('Z', '+00:00')) + # 返回用户友好的格式:YYYY-MM-DD HH:MM:SS + return dt.strftime("%Y-%m-%d %H:%M:%S") + except (ValueError, AttributeError): + # 如果解析失败,返回原始字符串 + return iso_string + + async def get_emotion(self) -> Dict[str, Any]: + """ + 获取情绪随时间变化数据 + + Returns: + 包含情绪数据的字典 + """ + try: + logger.info(f"获取情绪数据 - ID: {self.id}, Table: {self.table}") + + if self.table == 'Statement': + results = await self.connector.execute_query(Memory_Space_Emotion_Statement, id=self.id) + elif self.table == 'ExtractedEntity': + results = await self.connector.execute_query(Memory_Space_Emotion_ExtractedEntity, id=self.id) + else: + # MemorySummary/Chunk类型查询 + results = await self.connector.execute_query(Memory_Space_Emotion_MemorySummary, id=self.id) + + # 处理查询结果 + emotion_data = self._process_emotion_results(results) + + # 转换Neo4j类型 + final_data = self._convert_neo4j_types(emotion_data) + + logger.info(f"成功获取 {len(final_data)} 条情绪数据") + + return { + 'success': True, + 'data': final_data, + 'total': len(final_data) + } + + except Exception as e: + logger.error(f"获取情绪数据失败: {str(e)}") + return { + 'success': False, + 'error': str(e), + 'data': [], + 'total': 0 + } + + def _process_emotion_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """ + 处理情绪查询结果 + + Args: + results: Neo4j查询结果 + + Returns: + 处理后的情绪数据列表 + """ + emotion_data = [] + + # 检查results是否为空或不是列表 + if not results or not isinstance(results, list): + logger.warning(f"情绪查询结果为空或格式不正确: {type(results)}") + return emotion_data + + for record in results: + # 检查record是否为字典类型 + if not isinstance(record, dict): + logger.warning(f"跳过非字典类型的记录: {type(record)} - {record}") + continue + + # 获取创建时间并格式化 + created_at = record.get('created_at') + formatted_created_at = created_at + + # 如果created_at是字符串格式,尝试格式化 + if isinstance(created_at, str): + formatted_created_at = self._format_datetime(created_at) + + emotion_type = record.get('emotion_type') + emotion_intensity = record.get('emotion_intensity') + + if emotion_type is not None and emotion_intensity is not None: + # 只保留情绪相关的字段 + emotion_record = { + 'emotion_intensity': emotion_intensity, + 'emotion_type': emotion_type, + 'created_at': formatted_created_at + } + emotion_data.append(emotion_record) + + return emotion_data + + async def close(self): + """关闭数据库连接""" + await self.connector.close() + + +class MemoryInteraction: + def __init__(self, id: str, table: str): + self.id = id + self.table = table + self.connector = Neo4jConnector() + + def _convert_neo4j_types(self, obj: Any) -> Any: + """ + 递归转换Neo4j特殊类型为可序列化的Python类型 + """ + if isinstance(obj, Neo4jDateTime): + # 转换为用户友好的日期格式 + return self._format_datetime(obj.iso_format()) + elif hasattr(obj, '__class__') and 'neo4j' in str(obj.__class__): + if hasattr(obj, 'iso_format'): + return self._format_datetime(obj.iso_format()) + elif hasattr(obj, '__str__'): + return str(obj) + else: + return repr(obj) + elif isinstance(obj, dict): + return {k: self._convert_neo4j_types(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [self._convert_neo4j_types(item) for item in obj] + elif isinstance(obj, tuple): + return tuple(self._convert_neo4j_types(item) for item in obj) + else: + return obj + + def _format_datetime(self, iso_string: str) -> str: + """ + 将ISO格式的日期时间字符串转换为用户友好的格式 + + Args: + iso_string: ISO格式的日期时间字符串,如 "2026-01-07T13:40:33.679530" + + Returns: + 格式化后的日期时间字符串,如 "2026-01-07 13:40:33" + """ + try: + # 解析ISO格式的日期时间 + dt = datetime.fromisoformat(iso_string.replace('Z', '+00:00')) + # 返回用户友好的格式:YYYY-MM-DD HH:MM:SS + return dt.strftime("%Y-%m-%d %H:%M:%S") + except (ValueError, AttributeError): + # 如果解析失败,返回原始字符串 + return iso_string + + async def get_interaction_frequency(self) -> Dict[str, Any]: + """ + 获取交互频率数据 + + Returns: + 包含交互数据的字典 + """ + try: + logger.info(f"获取交互数据 - ID: {self.id}, Table: {self.table}") + + if self.table == 'Statement': + results = await self.connector.execute_query(Memory_Space_Interaction_Statement, id=self.id) + elif self.table == 'ExtractedEntity': + results = await self.connector.execute_query(Memory_Space_Interaction_ExtractedEntity, id=self.id) + else: + # MemorySummary/Chunk类型查询 + results = await self.connector.execute_query(Memory_Space_Interaction_Summary, id=self.id) + + # 处理查询结果 + interaction_data = self._process_interaction_results(results) + + # 转换Neo4j类型 + final_data = self._convert_neo4j_types(interaction_data) + + logger.info(f"成功获取 {len(final_data)} 条交互数据") + + return { + 'success': True, + 'data': final_data, + 'total': len(final_data) + } + + except Exception as e: + logger.error(f"获取交互数据失败: {str(e)}") + return { + 'success': False, + 'error': str(e), + 'data': [], + 'total': 0 + } + + def _process_interaction_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """ + 处理交互查询结果 + + Args: + results: Neo4j查询结果 + + Returns: + 处理后的交互数据列表 + """ + interaction_data = [] + + # 检查results是否为空或不是列表 + if not results or not isinstance(results, list): + logger.warning(f"交互查询结果为空或格式不正确: {type(results)}") + return interaction_data + + for record in results: + # 检查record是否为字典类型 + if not isinstance(record, dict): + logger.warning(f"跳过非字典类型的记录: {type(record)} - {record}") + continue + + # 只保留交互相关的字段 + name = record.get('name') + if name is not None: + interaction_record = { + 'name': name, + 'importance_score': record.get('importance_score', 0.0), + 'interaction_count': record.get('interaction_count', 1) # 默认交互次数为1 + } + interaction_data.append(interaction_record) + + return interaction_data + + async def close(self): + """关闭数据库连接""" + await self.connector.close() diff --git a/api/app/services/memory_short_service.py b/api/app/services/memory_short_service.py index ac9f86e0..fa3870f0 100644 --- a/api/app/services/memory_short_service.py +++ b/api/app/services/memory_short_service.py @@ -27,7 +27,7 @@ class ShortService: for item in retrieved_content: if isinstance(item, dict): for key, values in item.items(): - retrieval_source.append({"query": key, "retrieval": values}) + retrieval_source.append({"query": key, "retrieval": values,"source":"上下文记忆"}) deep_expanded['retrieval'] = retrieval_source deep_expanded['message'] = messages # 修正拼写错误