diff --git a/api/app/controllers/__init__.py b/api/app/controllers/__init__.py index a45c701f..2fabbcc8 100644 --- a/api/app/controllers/__init__.py +++ b/api/app/controllers/__init__.py @@ -20,6 +20,8 @@ from . import ( knowledgeshare_controller, memory_agent_controller, memory_dashboard_controller, + memory_episodic_controller, + memory_explicit_controller, memory_forget_controller, memory_reflection_controller, memory_short_term_controller, @@ -67,6 +69,8 @@ manager_router.include_router(memory_agent_controller.router) manager_router.include_router(memory_dashboard_controller.router) manager_router.include_router(memory_storage_controller.router) manager_router.include_router(user_memory_controllers.router) +manager_router.include_router(memory_episodic_controller.router) +manager_router.include_router(memory_explicit_controller.router) manager_router.include_router(api_key_controller.router) manager_router.include_router(release_share_controller.router) manager_router.include_router(public_share_controller.router) # 公开路由(无需认证) diff --git a/api/app/controllers/memory_episodic_controller.py b/api/app/controllers/memory_episodic_controller.py new file mode 100644 index 00000000..331adfd3 --- /dev/null +++ b/api/app/controllers/memory_episodic_controller.py @@ -0,0 +1,125 @@ +""" +情景记忆相关的控制器 +包含情景记忆总览和详情查询接口 +""" + +from fastapi import APIRouter, Depends + +from app.core.error_codes import BizCode +from app.core.logging_config import get_api_logger +from app.core.response_utils import fail, success +from app.dependencies import get_current_user +from app.models.user_model import User +from app.schemas.response_schema import ApiResponse +from app.schemas.memory_episodic_schema import ( + EpisodicMemoryOverviewRequest, + EpisodicMemoryDetailsRequest, +) +from app.services.memory_episodic_service import memory_episodic_service + +# Get API logger +api_logger = get_api_logger() + +router = APIRouter( + prefix="/memory/episodic-memory", + tags=["Episodic Memory"], +) + + +@router.post("/overview", response_model=ApiResponse) +async def get_episodic_memory_overview_api( + request: EpisodicMemoryOverviewRequest, + current_user: User = Depends(get_current_user), +) -> dict: + """ + 获取情景记忆总览 + + 返回指定用户的所有情景记忆列表,包括标题和创建时间。 + 支持通过时间范围、情景类型和标题关键词进行筛选。 + + """ + workspace_id = current_user.current_workspace_id + + # 检查用户是否已选择工作空间 + if workspace_id is None: + api_logger.warning(f"用户 {current_user.username} 尝试查询情景记忆总览但未选择工作空间") + return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") + + # 验证参数 + valid_time_ranges = ["all", "today", "this_week", "this_month"] + valid_episodic_types = ["all", "conversation", "project_work", "learning", "decision", "important_event"] + + if request.time_range not in valid_time_ranges: + return fail(BizCode.INVALID_PARAMETER, f"无效的时间范围参数,可选值:{', '.join(valid_time_ranges)}") + + if request.episodic_type not in valid_episodic_types: + return fail(BizCode.INVALID_PARAMETER, f"无效的情景类型参数,可选值:{', '.join(valid_episodic_types)}") + + # 处理 title_keyword(去除首尾空格) + title_keyword = request.title_keyword.strip() if request.title_keyword else None + + api_logger.info( + f"情景记忆总览查询请求: end_user_id={request.end_user_id}, user={current_user.username}, " + f"workspace={workspace_id}, time_range={request.time_range}, episodic_type={request.episodic_type}, " + f"title_keyword={title_keyword}" + ) + + try: + # 调用Service层方法 + result = await memory_episodic_service.get_episodic_memory_overview( + request.end_user_id, request.time_range, request.episodic_type, title_keyword + ) + + api_logger.info( + f"成功获取情景记忆总览: end_user_id={request.end_user_id}, " + f"total={result['total']}" + ) + return success(data=result, msg="查询成功") + + except Exception as e: + api_logger.error(f"情景记忆总览查询失败: end_user_id={request.end_user_id}, error={str(e)}") + return fail(BizCode.INTERNAL_ERROR, "情景记忆总览查询失败", str(e)) + + +@router.post("/details", response_model=ApiResponse) +async def get_episodic_memory_details_api( + request: EpisodicMemoryDetailsRequest, + current_user: User = Depends(get_current_user), +) -> dict: + """ + 获取情景记忆详情 + + 返回指定情景记忆的详细信息,包括涉及对象、情景类型、内容记录和情绪。 + + """ + workspace_id = current_user.current_workspace_id + + # 检查用户是否已选择工作空间 + if workspace_id is None: + api_logger.warning(f"用户 {current_user.username} 尝试查询情景记忆详情但未选择工作空间") + return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") + + api_logger.info( + f"情景记忆详情查询请求: end_user_id={request.end_user_id}, summary_id={request.summary_id}, " + f"user={current_user.username}, workspace={workspace_id}" + ) + + try: + # 调用Service层方法 + result = await memory_episodic_service.get_episodic_memory_details( + end_user_id=request.end_user_id, + summary_id=request.summary_id + ) + + api_logger.info( + f"成功获取情景记忆详情: end_user_id={request.end_user_id}, summary_id={request.summary_id}" + ) + return success(data=result, msg="查询成功") + + except ValueError as e: + # 处理情景记忆不存在的情况 + api_logger.warning(f"情景记忆不存在: end_user_id={request.end_user_id}, summary_id={request.summary_id}, error={str(e)}") + return fail(BizCode.INVALID_PARAMETER, "情景记忆不存在", str(e)) + except Exception as e: + api_logger.error(f"情景记忆详情查询失败: end_user_id={request.end_user_id}, summary_id={request.summary_id}, error={str(e)}") + return fail(BizCode.INTERNAL_ERROR, "情景记忆详情查询失败", str(e)) diff --git a/api/app/controllers/memory_explicit_controller.py b/api/app/controllers/memory_explicit_controller.py new file mode 100644 index 00000000..c52f308c --- /dev/null +++ b/api/app/controllers/memory_explicit_controller.py @@ -0,0 +1,115 @@ +""" +显性记忆控制器 + +处理显性记忆相关的API接口,包括情景记忆和语义记忆的查询。 +""" + +from fastapi import APIRouter, Depends + +from app.core.logging_config import get_api_logger +from app.core.response_utils import success, fail +from app.core.error_codes import BizCode +from app.services.memory_explicit_service import MemoryExplicitService +from app.schemas.response_schema import ApiResponse +from app.schemas.memory_explicit_schema import ( + ExplicitMemoryOverviewRequest, + ExplicitMemoryDetailsRequest, +) +from app.dependencies import get_current_user +from app.models.user_model import User + +# Get API logger +api_logger = get_api_logger() + +# Initialize service +memory_explicit_service = MemoryExplicitService() + +router = APIRouter( + prefix="/memory/explicit-memory", + tags=["Explicit Memory"], +) + + +@router.post("/overview", response_model=ApiResponse) +async def get_explicit_memory_overview_api( + request: ExplicitMemoryOverviewRequest, + current_user: User = Depends(get_current_user), +) -> dict: + """ + 获取显性记忆总览 + + 返回指定用户的所有显性记忆列表,包括标题、完整内容、创建时间和情绪信息。 + """ + workspace_id = current_user.current_workspace_id + + # 检查用户是否已选择工作空间 + if workspace_id is None: + api_logger.warning(f"用户 {current_user.username} 尝试查询显性记忆总览但未选择工作空间") + return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") + + api_logger.info( + f"显性记忆总览查询请求: end_user_id={request.end_user_id}, user={current_user.username}, " + f"workspace={workspace_id}" + ) + + try: + # 调用Service层方法 + result = await memory_explicit_service.get_explicit_memory_overview( + request.end_user_id + ) + + api_logger.info( + f"成功获取显性记忆总览: end_user_id={request.end_user_id}, " + f"total={result['total']}" + ) + return success(data=result, msg="查询成功") + + except Exception as e: + api_logger.error(f"显性记忆总览查询失败: end_user_id={request.end_user_id}, error={str(e)}") + return fail(BizCode.INTERNAL_ERROR, "显性记忆总览查询失败", str(e)) + + +@router.post("/details", response_model=ApiResponse) +async def get_explicit_memory_details_api( + request: ExplicitMemoryDetailsRequest, + current_user: User = Depends(get_current_user), +) -> dict: + """ + 获取显性记忆详情 + + 根据 memory_id 返回情景记忆或语义记忆的详细信息。 + - 情景记忆:包括标题、内容、情绪、创建时间 + - 语义记忆:包括名称、核心定义、详细笔记、创建时间 + """ + workspace_id = current_user.current_workspace_id + + # 检查用户是否已选择工作空间 + if workspace_id is None: + api_logger.warning(f"用户 {current_user.username} 尝试查询显性记忆详情但未选择工作空间") + return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") + + api_logger.info( + f"显性记忆详情查询请求: end_user_id={request.end_user_id}, memory_id={request.memory_id}, " + f"user={current_user.username}, workspace={workspace_id}" + ) + + try: + # 调用Service层方法 + result = await memory_explicit_service.get_explicit_memory_details( + end_user_id=request.end_user_id, + memory_id=request.memory_id + ) + + api_logger.info( + f"成功获取显性记忆详情: end_user_id={request.end_user_id}, memory_id={request.memory_id}, " + f"memory_type={result.get('memory_type')}" + ) + return success(data=result, msg="查询成功") + + except ValueError as e: + # 处理记忆不存在的情况 + api_logger.warning(f"显性记忆不存在: end_user_id={request.end_user_id}, memory_id={request.memory_id}, error={str(e)}") + return fail(BizCode.INVALID_PARAMETER, "显性记忆不存在", str(e)) + except Exception as e: + api_logger.error(f"显性记忆详情查询失败: end_user_id={request.end_user_id}, memory_id={request.memory_id}, error={str(e)}") + return fail(BizCode.INTERNAL_ERROR, "显性记忆详情查询失败", str(e)) diff --git a/api/app/controllers/user_memory_controllers.py b/api/app/controllers/user_memory_controllers.py index 5fd9b841..a96c7a52 100644 --- a/api/app/controllers/user_memory_controllers.py +++ b/api/app/controllers/user_memory_controllers.py @@ -20,12 +20,6 @@ from app.services.user_memory_service import ( from app.services.memory_entity_relationship_service import MemoryEntityService,MemoryEmotion,MemoryInteraction from app.schemas.response_schema import ApiResponse from app.schemas.memory_storage_schema import GenerateCacheRequest -from app.schemas.user_memory_schema import ( - EpisodicMemoryOverviewRequest, - EpisodicMemoryDetailsRequest, - ExplicitMemoryOverviewRequest, - ExplicitMemoryDetailsRequest, -) from app.schemas.end_user_schema import ( EndUserProfileResponse, @@ -440,195 +434,3 @@ async def memory_space_relationship_evolution(id: str, label: str, except Exception as e: api_logger.error(f"关系演变查询失败: id={id}, table={label}, error={str(e)}", exc_info=True) return fail(BizCode.INTERNAL_ERROR, "关系演变查询失败", str(e)) - - -@router.post("/classifications/episodic-memory", response_model=ApiResponse) -async def get_episodic_memory_overview_api( - request: EpisodicMemoryOverviewRequest, - current_user: User = Depends(get_current_user), - db: Session = Depends(get_db), -) -> dict: - """ - 获取情景记忆总览 - - 返回指定用户的所有情景记忆列表,包括标题和创建时间。 - 支持通过时间范围、情景类型和标题关键词进行筛选。 - - """ - workspace_id = current_user.current_workspace_id - - # 检查用户是否已选择工作空间 - if workspace_id is None: - api_logger.warning(f"用户 {current_user.username} 尝试查询情景记忆总览但未选择工作空间") - return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") - - # 验证参数 - valid_time_ranges = ["all", "today", "this_week", "this_month"] - valid_episodic_types = ["all", "conversation", "project_work", "learning", "decision", "important_event"] - - if request.time_range not in valid_time_ranges: - return fail(BizCode.INVALID_PARAMETER, f"无效的时间范围参数,可选值:{', '.join(valid_time_ranges)}") - - if request.episodic_type not in valid_episodic_types: - return fail(BizCode.INVALID_PARAMETER, f"无效的情景类型参数,可选值:{', '.join(valid_episodic_types)}") - - # 处理 title_keyword(去除首尾空格) - title_keyword = request.title_keyword.strip() if request.title_keyword else None - - api_logger.info( - f"情景记忆总览查询请求: end_user_id={request.end_user_id}, user={current_user.username}, " - f"workspace={workspace_id}, time_range={request.time_range}, episodic_type={request.episodic_type}, " - f"title_keyword={title_keyword}" - ) - - try: - # 调用Service层方法 - result = await user_memory_service.get_episodic_memory_overview( - db, request.end_user_id, request.time_range, request.episodic_type, title_keyword - ) - - api_logger.info( - f"成功获取情景记忆总览: end_user_id={request.end_user_id}, " - f"total={result['total']}" - ) - return success(data=result, msg="查询成功") - - except Exception as e: - api_logger.error(f"情景记忆总览查询失败: end_user_id={request.end_user_id}, error={str(e)}") - return fail(BizCode.INTERNAL_ERROR, "情景记忆总览查询失败", str(e)) - - -@router.post("/classifications/episodic-memory-details", response_model=ApiResponse) -async def get_episodic_memory_details_api( - request: EpisodicMemoryDetailsRequest, - current_user: User = Depends(get_current_user), - db: Session = Depends(get_db), -) -> dict: - """ - 获取情景记忆详情 - - 返回指定情景记忆的详细信息,包括涉及对象、情景类型、内容记录和情绪。 - - """ - workspace_id = current_user.current_workspace_id - - # 检查用户是否已选择工作空间 - if workspace_id is None: - api_logger.warning(f"用户 {current_user.username} 尝试查询情景记忆详情但未选择工作空间") - return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") - - api_logger.info( - f"情景记忆详情查询请求: end_user_id={request.end_user_id}, summary_id={request.summary_id}, " - f"user={current_user.username}, workspace={workspace_id}" - ) - - try: - # 调用Service层方法 - result = await user_memory_service.get_episodic_memory_details( - db=db, - end_user_id=request.end_user_id, - summary_id=request.summary_id - ) - - api_logger.info( - f"成功获取情景记忆详情: end_user_id={request.end_user_id}, summary_id={request.summary_id}" - ) - return success(data=result, msg="查询成功") - - except ValueError as e: - # 处理情景记忆不存在的情况 - api_logger.warning(f"情景记忆不存在: end_user_id={request.end_user_id}, summary_id={request.summary_id}, error={str(e)}") - return fail(BizCode.INVALID_PARAMETER, "情景记忆不存在", str(e)) - except Exception as e: - api_logger.error(f"情景记忆详情查询失败: end_user_id={request.end_user_id}, summary_id={request.summary_id}, error={str(e)}") - return fail(BizCode.INTERNAL_ERROR, "情景记忆详情查询失败", str(e)) - - -@router.post("/classifications/explicit-memory", response_model=ApiResponse) -async def get_explicit_memory_overview_api( - request: ExplicitMemoryOverviewRequest, - current_user: User = Depends(get_current_user), - db: Session = Depends(get_db), -) -> dict: - """ - 获取显性记忆总览 - - 返回指定用户的所有显性记忆列表,包括标题、完整内容、创建时间和情绪信息。 - """ - workspace_id = current_user.current_workspace_id - - # 检查用户是否已选择工作空间 - if workspace_id is None: - api_logger.warning(f"用户 {current_user.username} 尝试查询显性记忆总览但未选择工作空间") - return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") - - api_logger.info( - f"显性记忆总览查询请求: end_user_id={request.end_user_id}, user={current_user.username}, " - f"workspace={workspace_id}" - ) - - try: - # 调用Service层方法 - result = await user_memory_service.get_explicit_memory_overview( - db, request.end_user_id - ) - - api_logger.info( - f"成功获取显性记忆总览: end_user_id={request.end_user_id}, " - f"total={result['total']}" - ) - return success(data=result, msg="查询成功") - - except Exception as e: - api_logger.error(f"显性记忆总览查询失败: end_user_id={request.end_user_id}, error={str(e)}") - return fail(BizCode.INTERNAL_ERROR, "显性记忆总览查询失败", str(e)) - - -@router.post("/classifications/explicit-memory-details", response_model=ApiResponse) -async def get_explicit_memory_details_api( - request: ExplicitMemoryDetailsRequest, - current_user: User = Depends(get_current_user), - db: Session = Depends(get_db), -) -> dict: - """ - 获取显性记忆详情 - - 根据 memory_id 返回情景记忆或语义记忆的详细信息。 - - 情景记忆:包括标题、内容、情绪、创建时间 - - 语义记忆:包括名称、核心定义、详细笔记、创建时间 - """ - workspace_id = current_user.current_workspace_id - - # 检查用户是否已选择工作空间 - if workspace_id is None: - api_logger.warning(f"用户 {current_user.username} 尝试查询显性记忆详情但未选择工作空间") - return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") - - api_logger.info( - f"显性记忆详情查询请求: end_user_id={request.end_user_id}, memory_id={request.memory_id}, " - f"user={current_user.username}, workspace={workspace_id}" - ) - - try: - # 调用Service层方法 - result = await user_memory_service.get_explicit_memory_details( - db=db, - end_user_id=request.end_user_id, - memory_id=request.memory_id - ) - - api_logger.info( - f"成功获取显性记忆详情: end_user_id={request.end_user_id}, memory_id={request.memory_id}, " - f"memory_type={result.get('memory_type')}" - ) - return success(data=result, msg="查询成功") - - except ValueError as e: - # 处理记忆不存在的情况 - api_logger.warning(f"显性记忆不存在: end_user_id={request.end_user_id}, memory_id={request.memory_id}, error={str(e)}") - return fail(BizCode.INVALID_PARAMETER, "显性记忆不存在", str(e)) - except Exception as e: - api_logger.error(f"显性记忆详情查询失败: end_user_id={request.end_user_id}, memory_id={request.memory_id}, error={str(e)}") - return fail(BizCode.INTERNAL_ERROR, "显性记忆详情查询失败", str(e)) - - diff --git a/api/app/schemas/user_memory_schema.py b/api/app/schemas/memory_episodic_schema.py similarity index 67% rename from api/app/schemas/user_memory_schema.py rename to api/app/schemas/memory_episodic_schema.py index 796ad72f..7b3f3d2d 100644 --- a/api/app/schemas/user_memory_schema.py +++ b/api/app/schemas/memory_episodic_schema.py @@ -1,5 +1,5 @@ """ -用户记忆相关的请求和响应模型 +情景记忆的请求和响应模型 """ from pydantic import BaseModel, Field from typing import Optional @@ -28,16 +28,3 @@ class EpisodicMemoryDetailsRequest(BaseModel): end_user_id: str = Field(..., description="终端用户ID") summary_id: str = Field(..., description="情景记忆摘要ID") - - -class ExplicitMemoryOverviewRequest(BaseModel): - """显性记忆总览查询请求""" - - end_user_id: str = Field(..., description="终端用户ID") - - -class ExplicitMemoryDetailsRequest(BaseModel): - """显性记忆详情查询请求""" - - end_user_id: str = Field(..., description="终端用户ID") - memory_id: str = Field(..., description="记忆ID(情景记忆或语义记忆的ID)") diff --git a/api/app/schemas/memory_explicit_schema.py b/api/app/schemas/memory_explicit_schema.py new file mode 100644 index 00000000..c2b51a81 --- /dev/null +++ b/api/app/schemas/memory_explicit_schema.py @@ -0,0 +1,15 @@ +""" +显性记忆的请求和响应模型 +""" +from pydantic import BaseModel, Field + +class ExplicitMemoryOverviewRequest(BaseModel): + """显性记忆总览查询请求""" + + end_user_id: str = Field(..., description="终端用户ID") + +class ExplicitMemoryDetailsRequest(BaseModel): + """显性记忆详情查询请求""" + + end_user_id: str = Field(..., description="终端用户ID") + memory_id: str = Field(..., description="记忆ID(情景记忆或语义记忆的ID)") diff --git a/api/app/services/memory_base_service.py b/api/app/services/memory_base_service.py new file mode 100644 index 00000000..8eae3c42 --- /dev/null +++ b/api/app/services/memory_base_service.py @@ -0,0 +1,111 @@ +""" +Memory Base Service + +提供记忆服务的基础功能和共享辅助方法。 +""" + +from datetime import datetime +from typing import Optional + +from app.core.logging_config import get_logger +from app.repositories.neo4j.neo4j_connector import Neo4jConnector + +logger = get_logger(__name__) + + +class MemoryBaseService: + """记忆服务基类,提供共享的辅助方法""" + + def __init__(self): + self.neo4j_connector = Neo4jConnector() + + @staticmethod + def parse_timestamp(timestamp_value) -> Optional[int]: + """ + 将时间戳转换为毫秒级时间戳 + + 支持多种输入格式: + - Neo4j DateTime 对象 + - ISO格式的时间戳字符串 + - Python datetime 对象 + + Args: + timestamp_value: 时间戳值(可以是多种类型) + + Returns: + 毫秒级时间戳,如果解析失败则返回None + """ + if not timestamp_value: + return None + + try: + # 处理 Neo4j DateTime 对象 + if hasattr(timestamp_value, 'to_native'): + dt_object = timestamp_value.to_native() + return int(dt_object.timestamp() * 1000) + + # 处理 Python datetime 对象 + if isinstance(timestamp_value, datetime): + return int(timestamp_value.timestamp() * 1000) + + # 处理字符串格式 + if isinstance(timestamp_value, str): + dt_object = datetime.fromisoformat(timestamp_value.replace("Z", "+00:00")) + return int(dt_object.timestamp() * 1000) + + # 其他情况尝试转换为字符串再解析 + dt_object = datetime.fromisoformat(str(timestamp_value).replace("Z", "+00:00")) + return int(dt_object.timestamp() * 1000) + + except (ValueError, TypeError, AttributeError) as e: + logger.warning(f"无法解析时间戳: {timestamp_value}, error={str(e)}") + return None + + async def extract_episodic_emotion( + self, + summary_id: str, + end_user_id: str + ) -> Optional[str]: + """ + 提取情景记忆的主要情绪 + + 查询MemorySummary节点关联的Statement节点, + 返回emotion_intensity最大的emotion_type。 + + Args: + summary_id: Summary节点的ID + end_user_id: 终端用户ID (group_id) + + Returns: + 最大emotion_intensity对应的emotion_type,如果没有则返回None + """ + try: + query = """ + MATCH (s:MemorySummary) + WHERE elementId(s) = $summary_id AND s.group_id = $group_id + MATCH (s)-[:DERIVED_FROM_STATEMENT]->(stmt:Statement) + WHERE stmt.emotion_type IS NOT NULL + AND stmt.emotion_intensity IS NOT NULL + RETURN stmt.emotion_type AS emotion_type, + stmt.emotion_intensity AS emotion_intensity + ORDER BY emotion_intensity DESC + LIMIT 1 + """ + + result = await self.neo4j_connector.execute_query( + query, + summary_id=summary_id, + group_id=end_user_id + ) + + if result and len(result) > 0: + emotion_type = result[0].get("emotion_type") + logger.info(f"成功提取 summary_id={summary_id} 的情绪: {emotion_type}") + return emotion_type + else: + logger.info(f"summary_id={summary_id} 没有情绪信息") + return None + + except Exception as e: + logger.error(f"提取情景记忆情绪时出错: {str(e)}", exc_info=True) + return None diff --git a/api/app/services/memory_episodic_service.py b/api/app/services/memory_episodic_service.py new file mode 100644 index 00000000..e8bb0bfc --- /dev/null +++ b/api/app/services/memory_episodic_service.py @@ -0,0 +1,405 @@ +""" +Episodic Memory Service + +处理情景记忆相关的业务逻辑,包括情景记忆总览、详情查询等。 +""" + +from datetime import datetime, timedelta +from typing import Any, Dict, List, Optional, Tuple + +import pytz +from app.core.logging_config import get_logger +from app.services.memory_base_service import MemoryBaseService + +logger = get_logger(__name__) + + +class MemoryEpisodicService(MemoryBaseService): + """情景记忆服务类""" + + def __init__(self): + super().__init__() + logger.info("MemoryEpisodicService initialized") + + async def _get_title_and_type( + self, + summary_id: str, + end_user_id: str + ) -> Tuple[str, str]: + """ + 读取情景记忆的标题(title)和类型(type) + + 仅负责读取已存在的title和type,不进行生成 + title从name属性读取,type从memory_type属性读取 + + Args: + summary_id: Summary节点的ID + end_user_id: 终端用户ID (group_id) + + Returns: + (标题, 类型)元组,如果不存在则返回默认值 + """ + try: + # 查询Summary节点的name(作为title)和memory_type(作为type) + query = """ + MATCH (s:MemorySummary) + WHERE elementId(s) = $summary_id AND s.group_id = $group_id + RETURN s.name AS title, s.memory_type AS type + """ + + result = await self.neo4j_connector.execute_query( + query, + summary_id=summary_id, + group_id=end_user_id + ) + + if not result or len(result) == 0: + logger.warning(f"未找到 summary_id={summary_id} 的节点") + return ("未知标题", "其他") + + record = result[0] + title = record.get("title") or "未命名" + episodic_type = record.get("type") or "其他" + + return (title, episodic_type) + + except Exception as e: + logger.error(f"读取标题和类型时出错: {str(e)}", exc_info=True) + return ("错误", "其他") + + async def _extract_involved_objects( + self, + summary_id: str, + end_user_id: str + ) -> List[str]: + """ + 提取情景记忆涉及的前3个最重要实体 + + Args: + summary_id: Summary节点的ID + end_user_id: 终端用户ID (group_id) + + Returns: + 前3个实体的name属性列表 + """ + try: + # 查询Summary节点指向的Statement节点,再查询Statement指向的ExtractedEntity节点 + # 按activation_value降序排序,返回前3个 + query = """ + MATCH (s:MemorySummary) + WHERE elementId(s) = $summary_id AND s.group_id = $group_id + MATCH (s)-[:DERIVED_FROM_STATEMENT]->(stmt:Statement) + MATCH (stmt)-[:REFERENCES_ENTITY]->(entity:ExtractedEntity) + WHERE entity.activation_value IS NOT NULL + RETURN DISTINCT entity.name AS name, entity.activation_value AS activation + ORDER BY activation DESC + LIMIT 3 + """ + + result = await self.neo4j_connector.execute_query( + query, + summary_id=summary_id, + group_id=end_user_id + ) + + # 提取实体名称 + involved_objects = [record["name"] for record in result if record.get("name")] + + logger.info(f"成功提取 summary_id={summary_id} 的涉及对象: {involved_objects}") + + return involved_objects + + except Exception as e: + logger.error(f"提取涉及对象时出错: {str(e)}", exc_info=True) + return [] + + async def _extract_content_records( + self, + summary_id: str, + end_user_id: str + ) -> List[str]: + """ + 提取情景记忆的内容记录 + + Args: + summary_id: Summary节点的ID + end_user_id: 终端用户ID (group_id) + + Returns: + 所有Statement节点的statement属性内容列表 + """ + try: + # 查询Summary节点指向的所有Statement节点 + query = """ + MATCH (s:MemorySummary) + WHERE elementId(s) = $summary_id AND s.group_id = $group_id + MATCH (s)-[:DERIVED_FROM_STATEMENT]->(stmt:Statement) + WHERE stmt.statement IS NOT NULL AND stmt.statement <> '' + RETURN stmt.statement AS statement + """ + + result = await self.neo4j_connector.execute_query( + query, + summary_id=summary_id, + group_id=end_user_id + ) + + # 提取statement内容 + content_records = [record["statement"] for record in result if record.get("statement")] + + logger.info(f"成功提取 summary_id={summary_id} 的内容记录,共 {len(content_records)} 条") + + return content_records + + except Exception as e: + logger.error(f"提取内容记录时出错: {str(e)}", exc_info=True) + return [] + + def _calculate_time_filter(self, time_range: str) -> Optional[str]: + """ + 根据时间范围计算过滤的起始时间 + + Args: + time_range: 时间范围 (all/today/this_week/this_month) + + Returns: + ISO格式的时间字符串,如果是"all"则返回None + """ + if time_range == "all": + return None + + # 获取当前时间(UTC) + now = datetime.now(pytz.UTC) + + if time_range == "today": + # 今天的开始时间(00:00:00) + start_time = now.replace(hour=0, minute=0, second=0, microsecond=0) + elif time_range == "this_week": + # 本周的开始时间(周一00:00:00) + days_since_monday = now.weekday() + start_time = (now - timedelta(days=days_since_monday)).replace( + hour=0, minute=0, second=0, microsecond=0 + ) + elif time_range == "this_month": + # 本月的开始时间(1号00:00:00) + start_time = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) + else: + return None + + # 返回ISO格式字符串 + return start_time.isoformat() + + async def get_episodic_memory_overview( + self, + end_user_id: str, + time_range: str = "all", + episodic_type: str = "all", + title_keyword: Optional[str] = None + ) -> Dict[str, Any]: + """ + 获取情景记忆总览信息 + + Args: + end_user_id: 终端用户ID + time_range: 时间范围筛选 + episodic_type: 情景类型筛选 + title_keyword: 标题关键词(可选,用于模糊搜索) + """ + try: + logger.info( + f"开始查询 end_user_id={end_user_id} 的情景记忆总览, " + f"time_range={time_range}, episodic_type={episodic_type}, title_keyword={title_keyword}" + ) + + # 1. 先查询所有情景记忆的总数(不受筛选条件限制) + total_all_query = """ + MATCH (s:MemorySummary) + WHERE s.group_id = $group_id + RETURN count(s) AS total_all + """ + total_all_result = await self.neo4j_connector.execute_query( + total_all_query, + group_id=end_user_id + ) + total_all = total_all_result[0]["total_all"] if total_all_result else 0 + + # 2. 计算时间范围的起始时间戳 + time_filter = self._calculate_time_filter(time_range) + + # 3. 构建Cypher查询 + query = """ + MATCH (s:MemorySummary) + WHERE s.group_id = $group_id + """ + + # 添加时间范围过滤 + if time_filter: + query += " AND s.created_at >= $time_filter" + + # 添加标题关键词过滤(如果提供了title_keyword) + if title_keyword: + query += " AND toLower(s.name) CONTAINS toLower($title_keyword)" + + query += """ + RETURN elementId(s) AS id, + s.created_at AS created_at, + s.memory_type AS type, + s.name AS title + ORDER BY s.created_at DESC + """ + + params = {"group_id": end_user_id} + if time_filter: + params["time_filter"] = time_filter + if title_keyword: + params["title_keyword"] = title_keyword + + result = await self.neo4j_connector.execute_query(query, **params) + + # 4. 如果没有数据,返回空列表 + if not result: + logger.info(f"end_user_id={end_user_id} 没有情景记忆数据") + return { + "total": 0, + "total_all": total_all, + "episodic_memories": [] + } + + # 5. 对每个节点读取标题和类型,并应用类型筛选 + episodic_memories = [] + for record in result: + summary_id = record["id"] + created_at_str = record.get("created_at") + memory_type = record.get("type", "其他") + title = record.get("title") or "未命名" # 直接从查询结果获取标题 + + # 应用情景类型筛选 + if episodic_type != "all": + # 检查类型是否匹配 + # 注意:Neo4j 中存储的 memory_type 现在应该是英文格式(如 "conversation", "project_work") + # 但为了兼容旧数据,我们也支持中文格式的匹配 + type_mapping = { + "conversation": "对话", + "project_work": "项目/工作", + "learning": "学习", + "decision": "决策", + "important_event": "重要事件" + } + + # 获取对应的中文类型(用于兼容旧数据) + chinese_type = type_mapping.get(episodic_type) + + # 检查类型是否匹配(支持新的英文格式和旧的中文格式) + if memory_type != episodic_type and memory_type != chinese_type: + continue + + # 使用基类方法转换时间戳 + created_at_timestamp = self.parse_timestamp(created_at_str) + + episodic_memories.append({ + "id": summary_id, + "title": title, + "type": memory_type, + "created_at": created_at_timestamp + }) + + logger.info( + f"成功获取 end_user_id={end_user_id} 的情景记忆总览," + f"筛选后 {len(episodic_memories)} 条,总共 {total_all} 条" + ) + + return { + "total": len(episodic_memories), + "total_all": total_all, + "episodic_memories": episodic_memories + } + + except Exception as e: + logger.error(f"获取情景记忆总览时出错: {str(e)}", exc_info=True) + raise + + async def get_episodic_memory_details( + self, + end_user_id: str, + summary_id: str + ) -> Dict[str, Any]: + """ + 获取单个情景记忆的详细信息 + + """ + try: + logger.info(f"开始查询 end_user_id={end_user_id}, summary_id={summary_id} 的情景记忆详情") + + # 1. 查询指定的MemorySummary节点 + query = """ + MATCH (s:MemorySummary) + WHERE elementId(s) = $summary_id AND s.group_id = $group_id + RETURN elementId(s) AS id, s.created_at AS created_at + """ + + result = await self.neo4j_connector.execute_query( + query, + summary_id=summary_id, + group_id=end_user_id + ) + + # 2. 如果节点不存在,返回错误 + if not result or len(result) == 0: + logger.warning(f"未找到 summary_id={summary_id} 的情景记忆") + raise ValueError(f"情景记忆不存在: summary_id={summary_id}") + + # 3. 获取基本信息 + record = result[0] + created_at_str = record.get("created_at") + + # 使用基类方法转换时间戳 + created_at_timestamp = self.parse_timestamp(created_at_str) + + # 4. 调用_get_title_and_type读取标题和类型 + title, episodic_type = await self._get_title_and_type( + summary_id=summary_id, + end_user_id=end_user_id + ) + + # 5. 调用_extract_involved_objects提取涉及对象 + involved_objects = await self._extract_involved_objects( + summary_id=summary_id, + end_user_id=end_user_id + ) + + # 6. 调用_extract_content_records提取内容记录 + content_records = await self._extract_content_records( + summary_id=summary_id, + end_user_id=end_user_id + ) + + # 7. 使用基类方法提取情绪 + emotion = await self.extract_episodic_emotion( + summary_id=summary_id, + end_user_id=end_user_id + ) + + # 8. 返回完整的详情信息 + details = { + "id": summary_id, + "created_at": created_at_timestamp, + "involved_objects": involved_objects, + "episodic_type": episodic_type, + "content_records": content_records, + "emotion": emotion + } + + logger.info(f"成功获取 summary_id={summary_id} 的情景记忆详情") + + return details + + except ValueError: + # 重新抛出ValueError,让Controller层处理 + raise + except Exception as e: + logger.error(f"获取情景记忆详情时出错: {str(e)}", exc_info=True) + raise + + +# 创建全局服务实例 +memory_episodic_service = MemoryEpisodicService() diff --git a/api/app/services/memory_explicit_service.py b/api/app/services/memory_explicit_service.py new file mode 100644 index 00000000..713215c3 --- /dev/null +++ b/api/app/services/memory_explicit_service.py @@ -0,0 +1,274 @@ +""" +显性记忆服务 + +处理显性记忆相关的业务逻辑,包括情景记忆和语义记忆的查询。 +""" + +from typing import Any, Dict + +from app.core.logging_config import get_logger +from app.services.memory_base_service import MemoryBaseService + +logger = get_logger(__name__) + + +class MemoryExplicitService(MemoryBaseService): + """显性记忆服务类""" + + def __init__(self): + super().__init__() + logger.info("MemoryExplicitService initialized") + + async def get_explicit_memory_overview( + self, + end_user_id: str + ) -> Dict[str, Any]: + """ + 获取显性记忆总览信息 + + 返回两部分: + 1. 情景记忆(episodic_memories)- 来自MemorySummary节点 + 2. 语义记忆(semantic_memories)- 来自ExtractedEntity节点(is_explicit_memory=true) + + Args: + end_user_id: 终端用户ID + + Returns: + { + "total": int, + "episodic_memories": [ + { + "id": str, + "title": str, + "content": str, + "created_at": int + } + ], + "semantic_memories": [ + { + "id": str, + "name": str, + "entity_type": str, + "core_definition": str + } + ] + } + """ + try: + logger.info(f"开始查询 end_user_id={end_user_id} 的显性记忆总览(情景记忆+语义记忆)") + + # ========== 1. 查询情景记忆(MemorySummary节点) ========== + episodic_query = """ + MATCH (s:MemorySummary) + WHERE s.group_id = $group_id + RETURN elementId(s) AS id, + s.name AS title, + s.content AS content, + s.created_at AS created_at + ORDER BY s.created_at DESC + """ + + episodic_result = await self.neo4j_connector.execute_query( + episodic_query, + group_id=end_user_id + ) + + # 处理情景记忆数据 + episodic_memories = [] + if episodic_result: + for record in episodic_result: + summary_id = record["id"] + title = record.get("title") or "未命名" + content = record.get("content") or "" + created_at_str = record.get("created_at") + + # 使用基类方法转换时间戳 + created_at_timestamp = self.parse_timestamp(created_at_str) + + # 注意:总览接口不返回 emotion 字段 + episodic_memories.append({ + "id": summary_id, + "title": title, + "content": content, + "created_at": created_at_timestamp + }) + + # ========== 2. 查询语义记忆(ExtractedEntity节点) ========== + semantic_query = """ + MATCH (e:ExtractedEntity) + WHERE e.group_id = $group_id + AND e.is_explicit_memory = true + RETURN elementId(e) AS id, + e.name AS name, + e.entity_type AS entity_type, + e.description AS core_definition + ORDER BY e.name ASC + """ + + semantic_result = await self.neo4j_connector.execute_query( + semantic_query, + group_id=end_user_id + ) + + # 处理语义记忆数据 + semantic_memories = [] + if semantic_result: + for record in semantic_result: + entity_id = record["id"] + name = record.get("name") or "未命名" + entity_type = record.get("entity_type") or "未分类" + core_definition = record.get("core_definition") or "" + + # 注意:总览接口不返回 detailed_notes 和 created_at 字段 + semantic_memories.append({ + "id": entity_id, + "name": name, + "entity_type": entity_type, + "core_definition": core_definition + }) + + # ========== 3. 返回结果 ========== + total_count = len(episodic_memories) + len(semantic_memories) + + logger.info( + f"成功获取 end_user_id={end_user_id} 的显性记忆总览," + f"情景记忆={len(episodic_memories)} 条,语义记忆={len(semantic_memories)} 条," + f"总计 {total_count} 条" + ) + + return { + "total": total_count, + "episodic_memories": episodic_memories, + "semantic_memories": semantic_memories + } + + except Exception as e: + logger.error(f"获取显性记忆总览时出错: {str(e)}", exc_info=True) + raise + + async def get_explicit_memory_details( + self, + end_user_id: str, + memory_id: str + ) -> Dict[str, Any]: + """ + 获取显性记忆详情 + + 根据 memory_id 查询情景记忆或语义记忆的详细信息。 + 先尝试查询情景记忆,如果找不到再查询语义记忆。 + + Args: + end_user_id: 终端用户ID + memory_id: 记忆ID(可以是情景记忆或语义记忆的ID) + + Returns: + 情景记忆返回: + { + "memory_type": "episodic", + "title": str, + "content": str, + "emotion": Dict, + "created_at": int + } + + 语义记忆返回: + { + "memory_type": "semantic", + "name": str, + "core_definition": str, + "detailed_notes": str, + "created_at": int + } + + Raises: + ValueError: 当记忆不存在时 + """ + try: + logger.info(f"开始查询显性记忆详情: end_user_id={end_user_id}, memory_id={memory_id}") + + # ========== 1. 先尝试查询情景记忆 ========== + episodic_query = """ + MATCH (s:MemorySummary) + WHERE elementId(s) = $memory_id AND s.group_id = $group_id + RETURN s.name AS title, + s.content AS content, + s.created_at AS created_at + """ + + episodic_result = await self.neo4j_connector.execute_query( + episodic_query, + memory_id=memory_id, + group_id=end_user_id + ) + + if episodic_result and len(episodic_result) > 0: + record = episodic_result[0] + title = record.get("title") or "未命名" + content = record.get("content") or "" + created_at_str = record.get("created_at") + + # 使用基类方法转换时间戳 + created_at_timestamp = self.parse_timestamp(created_at_str) + + # 使用基类方法获取情绪信息 + emotion = await self.extract_episodic_emotion( + summary_id=memory_id, + end_user_id=end_user_id + ) + + logger.info(f"成功获取情景记忆详情: memory_id={memory_id}") + return { + "memory_type": "episodic", + "title": title, + "content": content, + "emotion": emotion, + "created_at": created_at_timestamp + } + + # ========== 2. 如果不是情景记忆,尝试查询语义记忆 ========== + semantic_query = """ + MATCH (e:ExtractedEntity) + WHERE elementId(e) = $memory_id + AND e.group_id = $group_id + AND e.is_explicit_memory = true + RETURN e.name AS name, + e.description AS core_definition, + e.example AS detailed_notes, + e.created_at AS created_at + """ + + semantic_result = await self.neo4j_connector.execute_query( + semantic_query, + memory_id=memory_id, + group_id=end_user_id + ) + + if semantic_result and len(semantic_result) > 0: + record = semantic_result[0] + name = record.get("name") or "未命名" + core_definition = record.get("core_definition") or "" + detailed_notes = record.get("detailed_notes") or "" + created_at_str = record.get("created_at") + + # 使用基类方法转换时间戳 + created_at_timestamp = self.parse_timestamp(created_at_str) + + logger.info(f"成功获取语义记忆详情: memory_id={memory_id}") + return { + "memory_type": "semantic", + "name": name, + "core_definition": core_definition, + "detailed_notes": detailed_notes, + "created_at": created_at_timestamp + } + + # ========== 3. 两种记忆都找不到 ========== + logger.warning(f"记忆不存在: memory_id={memory_id}, end_user_id={end_user_id}") + raise ValueError(f"记忆不存在: memory_id={memory_id}") + + except ValueError: + # 重新抛出 ValueError(记忆不存在) + raise + except Exception as e: + logger.error(f"获取显性记忆详情时出错: {str(e)}", exc_info=True) + raise diff --git a/api/app/services/user_memory_service.py b/api/app/services/user_memory_service.py index b77a4ada..bfb05d47 100644 --- a/api/app/services/user_memory_service.py +++ b/api/app/services/user_memory_service.py @@ -883,866 +883,6 @@ class UserMemoryService: "failed": failed, "errors": errors + [{"error": f"批量处理失败: {str(e)}"}] } - - async def _get_title_and_type( - self, - summary_id: str, - end_user_id: str - ) -> Tuple[str, str]: - """ - 读取情景记忆的标题(title)和类型(type) - - 仅负责读取已存在的title和type,不进行生成 - title从name属性读取,type从memory_type属性读取 - - Args: - summary_id: Summary节点的ID - end_user_id: 终端用户ID (group_id) - - Returns: - (标题, 类型)元组,如果不存在则返回默认值 - """ - try: - # 查询Summary节点的name(作为title)和memory_type(作为type) - query = """ - MATCH (s:MemorySummary) - WHERE elementId(s) = $summary_id AND s.group_id = $group_id - RETURN s.name AS title, s.memory_type AS type - """ - - result = await self.neo4j_connector.execute_query( - query, - summary_id=summary_id, - group_id=end_user_id - ) - - if not result or len(result) == 0: - logger.warning(f"未找到 summary_id={summary_id} 的节点") - return ("未知标题", "其他") - - record = result[0] - title = record.get("title") or "未命名" - episodic_type = record.get("type") or "其他" - - return (title, episodic_type) - - except Exception as e: - logger.error(f"读取标题和类型时出错: {str(e)}", exc_info=True) - return ("错误", "其他") - - @staticmethod - async def generate_title_and_type_for_summary( - content: str, - end_user_id: str - ) -> Tuple[str, str]: - """ - 为MemorySummary生成标题和类型(静态方法,用于创建节点时调用) - - 此方法应该在创建MemorySummary节点时调用,生成title和type - - Args: - content: Summary的内容文本 - end_user_id: 终端用户ID (group_id) - - Returns: - (标题, 类型)元组 - """ - from app.core.memory.utils.prompt.prompt_utils import render_episodic_title_and_type_prompt - import json - - # 定义有效的类型集合 - VALID_TYPES = { - "conversation", # 对话 - "project_work", # 项目/工作 - "learning", # 学习 - "decision", # 决策 - "important_event" # 重要事件 - } - DEFAULT_TYPE = "conversation" # 默认类型 - - try: - if not content: - logger.warning("content为空,无法生成标题和类型") - return ("空内容", DEFAULT_TYPE) - - # 1. 渲染Jinja2提示词模板 - prompt = await render_episodic_title_and_type_prompt(content) - - # 2. 调用LLM生成标题和类型 - llm_client = _get_llm_client_for_user(end_user_id) - messages = [ - {"role": "user", "content": prompt} - ] - - response = await llm_client.chat(messages=messages) - - # 3. 解析LLM响应 - content_response = response.content - if isinstance(content_response, list): - if len(content_response) > 0: - if isinstance(content_response[0], dict): - text = content_response[0].get('text', content_response[0].get('content', str(content_response[0]))) - full_response = str(text) - else: - full_response = str(content_response[0]) - else: - full_response = "" - elif isinstance(content_response, dict): - full_response = str(content_response.get('text', content_response.get('content', str(content_response)))) - else: - full_response = str(content_response) if content_response is not None else "" - - # 4. 解析JSON响应 - try: - # 尝试从响应中提取JSON - # 移除可能的markdown代码块标记 - json_str = full_response.strip() - if json_str.startswith("```json"): - json_str = json_str[7:] - if json_str.startswith("```"): - json_str = json_str[3:] - if json_str.endswith("```"): - json_str = json_str[:-3] - json_str = json_str.strip() - - result_data = json.loads(json_str) - title = result_data.get("title", "未知标题") - episodic_type_raw = result_data.get("type", DEFAULT_TYPE) - - # 5. 校验和归一化类型 - # 将类型转换为小写并去除空格 - episodic_type_normalized = str(episodic_type_raw).lower().strip() - - # 检查是否在有效类型集合中 - if episodic_type_normalized in VALID_TYPES: - episodic_type = episodic_type_normalized - else: - # 尝试映射常见的中文类型到英文 - type_mapping = { - "对话": "conversation", - "项目": "project_work", - "工作": "project_work", - "项目/工作": "project_work", - "学习": "learning", - "决策": "decision", - "重要事件": "important_event", - "事件": "important_event" - } - episodic_type = type_mapping.get(episodic_type_raw, DEFAULT_TYPE) - logger.warning( - f"LLM返回的类型 '{episodic_type_raw}' 不在有效集合中," - f"已归一化为 '{episodic_type}'" - ) - - logger.info(f"成功生成标题和类型: title={title}, type={episodic_type}") - return (title, episodic_type) - - except json.JSONDecodeError: - logger.error(f"无法解析LLM响应为JSON: {full_response}") - return ("解析失败", DEFAULT_TYPE) - - except Exception as e: - logger.error(f"生成标题和类型时出错: {str(e)}", exc_info=True) - return ("错误", DEFAULT_TYPE) - - async def _extract_involved_objects( - self, - summary_id: str, - end_user_id: str - ) -> List[str]: - """ - 提取情景记忆涉及的前3个最重要实体 - - Args: - summary_id: Summary节点的ID - end_user_id: 终端用户ID (group_id) - - Returns: - 前3个实体的name属性列表 - """ - try: - # 查询Summary节点指向的Statement节点,再查询Statement指向的ExtractedEntity节点 - # 按activation_value降序排序,返回前3个 - query = """ - MATCH (s:MemorySummary) - WHERE elementId(s) = $summary_id AND s.group_id = $group_id - MATCH (s)-[:DERIVED_FROM_STATEMENT]->(stmt:Statement) - MATCH (stmt)-[:REFERENCES_ENTITY]->(entity:ExtractedEntity) - WHERE entity.activation_value IS NOT NULL - RETURN DISTINCT entity.name AS name, entity.activation_value AS activation - ORDER BY activation DESC - LIMIT 3 - """ - - result = await self.neo4j_connector.execute_query( - query, - summary_id=summary_id, - group_id=end_user_id - ) - - # 提取实体名称 - involved_objects = [record["name"] for record in result if record.get("name")] - - logger.info(f"成功提取 summary_id={summary_id} 的涉及对象: {involved_objects}") - - return involved_objects - - except Exception as e: - logger.error(f"提取涉及对象时出错: {str(e)}", exc_info=True) - return [] - - async def _extract_content_records( - self, - summary_id: str, - end_user_id: str - ) -> List[str]: - """ - 提取情景记忆的内容记录 - - Args: - summary_id: Summary节点的ID - end_user_id: 终端用户ID (group_id) - - Returns: - 所有Statement节点的statement属性内容列表 - """ - try: - # 查询Summary节点指向的所有Statement节点 - query = """ - MATCH (s:MemorySummary) - WHERE elementId(s) = $summary_id AND s.group_id = $group_id - MATCH (s)-[:DERIVED_FROM_STATEMENT]->(stmt:Statement) - WHERE stmt.statement IS NOT NULL AND stmt.statement <> '' - RETURN stmt.statement AS statement - """ - - result = await self.neo4j_connector.execute_query( - query, - summary_id=summary_id, - group_id=end_user_id - ) - - # 提取statement内容 - content_records = [record["statement"] for record in result if record.get("statement")] - - logger.info(f"成功提取 summary_id={summary_id} 的内容记录,共 {len(content_records)} 条") - - return content_records - - except Exception as e: - logger.error(f"提取内容记录时出错: {str(e)}", exc_info=True) - return [] - - async def _extract_episodic_emotion( - self, - summary_id: str, - end_user_id: str - ) -> Optional[str]: - """ - 提取情景记忆的主要情绪 - - Args: - summary_id: Summary节点的ID - end_user_id: 终端用户ID (group_id) - - Returns: - 最大emotion_intensity对应的emotion_type,如果没有则返回None - """ - try: - # 查询Summary节点指向的所有Statement节点 - # 筛选具有emotion_type属性的节点 - # 按emotion_intensity降序排序,返回第一个 - query = """ - MATCH (s:MemorySummary) - WHERE elementId(s) = $summary_id AND s.group_id = $group_id - MATCH (s)-[:DERIVED_FROM_STATEMENT]->(stmt:Statement) - WHERE stmt.emotion_type IS NOT NULL - AND stmt.emotion_intensity IS NOT NULL - RETURN stmt.emotion_type AS emotion_type, - stmt.emotion_intensity AS emotion_intensity - ORDER BY emotion_intensity DESC - LIMIT 1 - """ - - result = await self.neo4j_connector.execute_query( - query, - summary_id=summary_id, - group_id=end_user_id - ) - - # 提取emotion_type - if result and len(result) > 0: - emotion_type = result[0].get("emotion_type") - logger.info(f"成功提取 summary_id={summary_id} 的情绪: {emotion_type}") - return emotion_type - else: - logger.info(f"summary_id={summary_id} 没有情绪信息") - return None - - except Exception as e: - logger.error(f"提取情景记忆情绪时出错: {str(e)}", exc_info=True) - return None - - async def get_episodic_memory_overview( - self, - db: Session, - end_user_id: str, - time_range: str = "all", - episodic_type: str = "all", - title_keyword: Optional[str] = None - ) -> Dict[str, Any]: - """ - 获取情景记忆总览信息 - - Args: - db: 数据库会话 - end_user_id: 终端用户ID - time_range: 时间范围筛选 - episodic_type: 情景类型筛选 - title_keyword: 标题关键词(可选,用于模糊搜索) - """ - try: - logger.info( - f"开始查询 end_user_id={end_user_id} 的情景记忆总览, " - f"time_range={time_range}, episodic_type={episodic_type}, title_keyword={title_keyword}" - ) - - # 1. 先查询所有情景记忆的总数(不受筛选条件限制) - total_all_query = """ - MATCH (s:MemorySummary) - WHERE s.group_id = $group_id - RETURN count(s) AS total_all - """ - total_all_result = await self.neo4j_connector.execute_query( - total_all_query, - group_id=end_user_id - ) - total_all = total_all_result[0]["total_all"] if total_all_result else 0 - - # 2. 计算时间范围的起始时间戳 - time_filter = self._calculate_time_filter(time_range) - - # 3. 构建Cypher查询 - query = """ - MATCH (s:MemorySummary) - WHERE s.group_id = $group_id - """ - - # 添加时间范围过滤 - if time_filter: - query += " AND s.created_at >= $time_filter" - - # 添加标题关键词过滤(如果提供了title_keyword) - if title_keyword: - query += " AND toLower(s.name) CONTAINS toLower($title_keyword)" - - query += """ - RETURN elementId(s) AS id, - s.created_at AS created_at, - s.memory_type AS type, - s.name AS title - ORDER BY s.created_at DESC - """ - - params = {"group_id": end_user_id} - if time_filter: - params["time_filter"] = time_filter - if title_keyword: - params["title_keyword"] = title_keyword - - result = await self.neo4j_connector.execute_query(query, **params) - - # 4. 如果没有数据,返回空列表 - if not result: - logger.info(f"end_user_id={end_user_id} 没有情景记忆数据") - return { - "total": 0, - "total_all": total_all, - "episodic_memories": [] - } - - # 5. 对每个节点读取标题和类型,并应用类型筛选 - episodic_memories = [] - for record in result: - summary_id = record["id"] - created_at_str = record.get("created_at") - memory_type = record.get("type", "其他") - title = record.get("title") or "未命名" # 直接从查询结果获取标题 - - # 应用情景类型筛选 - if episodic_type != "all": - # 检查类型是否匹配 - # 注意:Neo4j 中存储的 memory_type 现在应该是英文格式(如 "conversation", "project_work") - # 但为了兼容旧数据,我们也支持中文格式的匹配 - type_mapping = { - "conversation": "对话", - "project_work": "项目/工作", - "learning": "学习", - "decision": "决策", - "important_event": "重要事件" - } - - # 获取对应的中文类型(用于兼容旧数据) - chinese_type = type_mapping.get(episodic_type) - - # 检查类型是否匹配(支持新的英文格式和旧的中文格式) - if memory_type != episodic_type and memory_type != chinese_type: - continue - - # 转换时间戳 - created_at_timestamp = None - if created_at_str: - try: - from datetime import datetime - dt_object = datetime.fromisoformat(created_at_str.replace("Z", "+00:00")) - created_at_timestamp = int(dt_object.timestamp() * 1000) - except (ValueError, TypeError, AttributeError) as e: - logger.warning(f"无法解析时间戳: {created_at_str}, error={str(e)}") - - episodic_memories.append({ - "id": summary_id, - "title": title, - "type": memory_type, - "created_at": created_at_timestamp - }) - - logger.info( - f"成功获取 end_user_id={end_user_id} 的情景记忆总览," - f"筛选后 {len(episodic_memories)} 条,总共 {total_all} 条" - ) - - return { - "total": len(episodic_memories), - "total_all": total_all, - "episodic_memories": episodic_memories - } - - except Exception as e: - logger.error(f"获取情景记忆总览时出错: {str(e)}", exc_info=True) - raise - - def _calculate_time_filter(self, time_range: str) -> Optional[str]: - """ - 根据时间范围计算过滤的起始时间 - - Args: - time_range: 时间范围 (all/today/this_week/this_month) - - Returns: - ISO格式的时间字符串,如果是"all"则返回None - """ - from datetime import datetime, timedelta - import pytz - - if time_range == "all": - return None - - # 获取当前时间(UTC) - now = datetime.now(pytz.UTC) - - if time_range == "today": - # 今天的开始时间(00:00:00) - start_time = now.replace(hour=0, minute=0, second=0, microsecond=0) - elif time_range == "this_week": - # 本周的开始时间(周一00:00:00) - days_since_monday = now.weekday() - start_time = (now - timedelta(days=days_since_monday)).replace( - hour=0, minute=0, second=0, microsecond=0 - ) - elif time_range == "this_month": - # 本月的开始时间(1号00:00:00) - start_time = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) - else: - return None - - # 返回ISO格式字符串 - return start_time.isoformat() - - async def get_episodic_memory_details( - self, - db: Session, - end_user_id: str, - summary_id: str - ) -> Dict[str, Any]: - """ - 获取单个情景记忆的详细信息 - - """ - try: - logger.info(f"开始查询 end_user_id={end_user_id}, summary_id={summary_id} 的情景记忆详情") - - # 1. 查询指定的MemorySummary节点 - query = """ - MATCH (s:MemorySummary) - WHERE elementId(s) = $summary_id AND s.group_id = $group_id - RETURN elementId(s) AS id, s.created_at AS created_at - """ - - result = await self.neo4j_connector.execute_query( - query, - summary_id=summary_id, - group_id=end_user_id - ) - - # 2. 如果节点不存在,返回错误 - if not result or len(result) == 0: - logger.warning(f"未找到 summary_id={summary_id} 的情景记忆") - raise ValueError(f"情景记忆不存在: summary_id={summary_id}") - - # 3. 获取基本信息 - record = result[0] - created_at_str = record.get("created_at") - - # 转换时间戳 - created_at_timestamp = None - if created_at_str: - try: - from datetime import datetime - dt_object = datetime.fromisoformat(created_at_str.replace("Z", "+00:00")) - created_at_timestamp = int(dt_object.timestamp() * 1000) - except (ValueError, TypeError, AttributeError) as e: - logger.warning(f"无法解析时间戳: {created_at_str}, error={str(e)}") - - # 4. 调用_get_title_and_type读取标题和类型 - title, episodic_type = await self._get_title_and_type( - summary_id=summary_id, - end_user_id=end_user_id - ) - - # 5. 调用_extract_involved_objects提取涉及对象 - involved_objects = await self._extract_involved_objects( - summary_id=summary_id, - end_user_id=end_user_id - ) - - # 6. 调用_extract_content_records提取内容记录 - content_records = await self._extract_content_records( - summary_id=summary_id, - end_user_id=end_user_id - ) - - # 7. 调用_extract_episodic_emotion提取情绪 - emotion = await self._extract_episodic_emotion( - summary_id=summary_id, - end_user_id=end_user_id - ) - - # 8. 返回完整的详情信息 - details = { - "id": summary_id, - "created_at": created_at_timestamp, - "involved_objects": involved_objects, - "episodic_type": episodic_type, - "content_records": content_records, - "emotion": emotion - } - - logger.info(f"成功获取 summary_id={summary_id} 的情景记忆详情") - - return details - - except ValueError: - # 重新抛出ValueError,让Controller层处理 - raise - except Exception as e: - logger.error(f"获取情景记忆详情时出错: {str(e)}", exc_info=True) - raise - - async def get_explicit_memory_overview( - self, - db: Session, - end_user_id: str - ) -> Dict[str, Any]: - """ - 获取显性记忆总览信息 - - 返回两部分: - 1. 情景记忆(episodic_memories)- 来自MemorySummary节点 - 2. 语义记忆(semantic_memories)- 来自ExtractedEntity节点(is_explicit_memory=true) - - Args: - db: 数据库会话 - end_user_id: 终端用户ID - - Returns: - { - "total": int, - "episodic_memories": [ - { - "id": str, - "title": str, - "content": str, - "created_at": int, - "emotion": Dict - } - ], - "semantic_memories": [ - { - "id": str, - "name": str, - "entity_type": str, - "core_definition": str, - "detailed_notes": str, - "created_at": int - } - ] - } - """ - try: - logger.info(f"开始查询 end_user_id={end_user_id} 的显性记忆总览(情景记忆+语义记忆)") - - # ========== 1. 查询情景记忆(MemorySummary节点) ========== - episodic_query = """ - MATCH (s:MemorySummary) - WHERE s.group_id = $group_id - RETURN elementId(s) AS id, - s.name AS title, - s.content AS content, - s.created_at AS created_at - ORDER BY s.created_at DESC - """ - - episodic_result = await self.neo4j_connector.execute_query( - episodic_query, - group_id=end_user_id - ) - - # 处理情景记忆数据 - episodic_memories = [] - if episodic_result: - for record in episodic_result: - summary_id = record["id"] - title = record.get("title") or "未命名" - content = record.get("content") or "" - created_at_str = record.get("created_at") - - # 转换时间戳 - created_at_timestamp = None - if created_at_str: - try: - from datetime import datetime - dt_object = datetime.fromisoformat(created_at_str.replace("Z", "+00:00")) - created_at_timestamp = int(dt_object.timestamp() * 1000) - except (ValueError, TypeError, AttributeError) as e: - logger.warning(f"无法解析时间戳: {created_at_str}, error={str(e)}") - - # 注意:总览接口不返回 emotion 字段 - episodic_memories.append({ - "id": summary_id, - "title": title, - "content": content, - "created_at": created_at_timestamp - }) - - # ========== 2. 查询语义记忆(ExtractedEntity节点) ========== - semantic_query = """ - MATCH (e:ExtractedEntity) - WHERE e.group_id = $group_id - AND e.is_explicit_memory = true - RETURN elementId(e) AS id, - e.name AS name, - e.entity_type AS entity_type, - e.description AS core_definition, - e.example AS detailed_notes, - e.created_at AS created_at - ORDER BY e.created_at DESC - """ - - semantic_result = await self.neo4j_connector.execute_query( - semantic_query, - group_id=end_user_id - ) - - # 处理语义记忆数据 - semantic_memories = [] - if semantic_result: - for record in semantic_result: - entity_id = record["id"] - name = record.get("name") or "未命名" - entity_type = record.get("entity_type") or "未分类" - core_definition = record.get("core_definition") or "" - created_at_str = record.get("created_at") - - # 转换时间戳 - created_at_timestamp = None - if created_at_str: - try: - from datetime import datetime - dt_object = datetime.fromisoformat(created_at_str.replace("Z", "+00:00")) - created_at_timestamp = int(dt_object.timestamp() * 1000) - except (ValueError, TypeError, AttributeError) as e: - logger.warning(f"无法解析时间戳: {created_at_str}, error={str(e)}") - - # 注意:总览接口不返回 detailed_notes 字段 - semantic_memories.append({ - "id": entity_id, - "name": name, - "entity_type": entity_type, - "core_definition": core_definition, - "created_at": created_at_timestamp - }) - - # ========== 3. 返回结果 ========== - total_count = len(episodic_memories) + len(semantic_memories) - - logger.info( - f"成功获取 end_user_id={end_user_id} 的显性记忆总览," - f"情景记忆={len(episodic_memories)} 条,语义记忆={len(semantic_memories)} 条," - f"总计 {total_count} 条" - ) - - return { - "total": total_count, - "episodic_memories": episodic_memories, - "semantic_memories": semantic_memories - } - - except Exception as e: - logger.error(f"获取显性记忆总览时出错: {str(e)}", exc_info=True) - raise - - async def get_explicit_memory_details( - self, - db: Session, - end_user_id: str, - memory_id: str - ) -> Dict[str, Any]: - """ - 获取显性记忆详情 - - 根据 memory_id 查询情景记忆或语义记忆的详细信息。 - 先尝试查询情景记忆,如果找不到再查询语义记忆。 - - Args: - db: 数据库会话 - end_user_id: 终端用户ID - memory_id: 记忆ID(可以是情景记忆或语义记忆的ID) - - Returns: - 情景记忆返回: - { - "memory_type": "episodic", - "title": str, - "content": str, - "emotion": Dict, - "created_at": int - } - - 语义记忆返回: - { - "memory_type": "semantic", - "name": str, - "core_definition": str, - "detailed_notes": str, - "created_at": int - } - - Raises: - ValueError: 当记忆不存在时 - """ - try: - logger.info(f"开始查询显性记忆详情: end_user_id={end_user_id}, memory_id={memory_id}") - - # ========== 1. 先尝试查询情景记忆 ========== - episodic_query = """ - MATCH (s:MemorySummary) - WHERE elementId(s) = $memory_id AND s.group_id = $group_id - RETURN s.name AS title, - s.content AS content, - s.created_at AS created_at - """ - - episodic_result = await self.neo4j_connector.execute_query( - episodic_query, - memory_id=memory_id, - group_id=end_user_id - ) - - if episodic_result and len(episodic_result) > 0: - record = episodic_result[0] - title = record.get("title") or "未命名" - content = record.get("content") or "" - created_at_str = record.get("created_at") - - # 转换时间戳 - created_at_timestamp = None - if created_at_str: - try: - from datetime import datetime - dt_object = datetime.fromisoformat(created_at_str.replace("Z", "+00:00")) - created_at_timestamp = int(dt_object.timestamp() * 1000) - except (ValueError, TypeError, AttributeError) as e: - logger.warning(f"无法解析时间戳: {created_at_str}, error={str(e)}") - - # 获取情绪信息 - emotion = await self._extract_episodic_emotion( - summary_id=memory_id, - end_user_id=end_user_id - ) - - logger.info(f"成功获取情景记忆详情: memory_id={memory_id}") - return { - "memory_type": "episodic", - "title": title, - "content": content, - "emotion": emotion, - "created_at": created_at_timestamp - } - - # ========== 2. 如果不是情景记忆,尝试查询语义记忆 ========== - semantic_query = """ - MATCH (e:ExtractedEntity) - WHERE elementId(e) = $memory_id - AND e.group_id = $group_id - AND e.is_explicit_memory = true - RETURN e.name AS name, - e.description AS core_definition, - e.example AS detailed_notes, - e.created_at AS created_at - """ - - semantic_result = await self.neo4j_connector.execute_query( - semantic_query, - memory_id=memory_id, - group_id=end_user_id - ) - - if semantic_result and len(semantic_result) > 0: - record = semantic_result[0] - name = record.get("name") or "未命名" - core_definition = record.get("core_definition") or "" - detailed_notes = record.get("detailed_notes") or "" - created_at_str = record.get("created_at") - - # 转换时间戳 - created_at_timestamp = None - if created_at_str: - try: - from datetime import datetime - dt_object = datetime.fromisoformat(created_at_str.replace("Z", "+00:00")) - created_at_timestamp = int(dt_object.timestamp() * 1000) - except (ValueError, TypeError, AttributeError) as e: - logger.warning(f"无法解析时间戳: {created_at_str}, error={str(e)}") - - logger.info(f"成功获取语义记忆详情: memory_id={memory_id}") - return { - "memory_type": "semantic", - "name": name, - "core_definition": core_definition, - "detailed_notes": detailed_notes, - "created_at": created_at_timestamp - } - - # ========== 3. 两种记忆都找不到 ========== - logger.warning(f"记忆不存在: memory_id={memory_id}, end_user_id={end_user_id}") - raise ValueError(f"记忆不存在: memory_id={memory_id}") - - except ValueError: - # 重新抛出 ValueError(记忆不存在) - raise - except Exception as e: - logger.error(f"获取显性记忆详情时出错: {str(e)}", exc_info=True) - raise # 独立的分析函数 diff --git a/api/migrations/versions/9ab9b6393f32_20261511.py b/api/migrations/versions/9ab9b6393f32_20261511.py index 8c4a5326..f8bc7418 100644 --- a/api/migrations/versions/9ab9b6393f32_20261511.py +++ b/api/migrations/versions/9ab9b6393f32_20261511.py @@ -27,4 +27,4 @@ def upgrade() -> None: def downgrade() -> None: # ### commands auto generated by Alembic - please adjust! ### op.create_foreign_key(op.f('tool_executions_user_id_fkey'), 'tool_executions', 'users', ['user_id'], ['id']) - # ### end Alembic commands ### + # ### end Alembic commands ### \ No newline at end of file