diff --git a/api/app/controllers/memory_forget_controller.py b/api/app/controllers/memory_forget_controller.py index d4a76f6f..705445fd 100644 --- a/api/app/controllers/memory_forget_controller.py +++ b/api/app/controllers/memory_forget_controller.py @@ -76,9 +76,28 @@ async def trigger_forgetting_cycle( api_logger.warning(f"用户 {current_user.username} 尝试触发遗忘周期但未选择工作空间") return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") + # 通过 group_id 获取关联的 config_id + try: + from app.services.memory_agent_service import get_end_user_connected_config + + connected_config = get_end_user_connected_config(payload.group_id, db) + config_id = connected_config.get("memory_config_id") + + if config_id is None: + api_logger.warning(f"终端用户 {payload.group_id} 未关联记忆配置") + return fail(BizCode.INVALID_PARAMETER, f"终端用户 {payload.group_id} 未关联记忆配置", "memory_config_id is None") + + api_logger.debug(f"通过 group_id={payload.group_id} 获取到 config_id={config_id}") + except ValueError as e: + api_logger.warning(f"获取终端用户配置失败: {str(e)}") + return fail(BizCode.INVALID_PARAMETER, str(e), "ValueError") + except Exception as e: + api_logger.error(f"获取终端用户配置时发生错误: {str(e)}") + return fail(BizCode.INTERNAL_ERROR, "获取终端用户配置失败", str(e)) + api_logger.info( f"用户 {current_user.username} 在工作空间 {workspace_id} 请求触发遗忘周期: " - f"group_id={payload.group_id}, max_batch={payload.max_merge_batch_size}, " + f"group_id={payload.group_id}, config_id={config_id}, max_batch={payload.max_merge_batch_size}, " f"min_days={payload.min_days_since_access}" ) @@ -89,7 +108,7 @@ async def trigger_forgetting_cycle( group_id=payload.group_id, max_merge_batch_size=payload.max_merge_batch_size, min_days_since_access=payload.min_days_since_access, - config_id=payload.config_id + config_id=config_id ) # 构建响应 @@ -217,7 +236,6 @@ async def update_forgetting_config( @router.get("/stats", response_model=ApiResponse) async def get_forgetting_stats( group_id: Optional[str] = None, - config_id: Optional[int] = None, current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): @@ -227,8 +245,7 @@ async def get_forgetting_stats( 返回知识层节点统计、激活值分布等信息。 Args: - group_id: 组ID(可选) - config_id: 配置ID(可选,用于获取遗忘阈值) + group_id: 组ID(即 end_user_id,可选) current_user: 当前用户 db: 数据库会话 @@ -242,6 +259,27 @@ async def get_forgetting_stats( api_logger.warning(f"用户 {current_user.username} 尝试获取遗忘引擎统计但未选择工作空间") return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") + # 如果提供了 group_id,通过它获取 config_id + config_id = None + if group_id: + try: + from app.services.memory_agent_service import get_end_user_connected_config + + connected_config = get_end_user_connected_config(group_id, db) + config_id = connected_config.get("memory_config_id") + + if config_id is None: + api_logger.warning(f"终端用户 {group_id} 未关联记忆配置") + return fail(BizCode.INVALID_PARAMETER, f"终端用户 {group_id} 未关联记忆配置", "memory_config_id is None") + + api_logger.debug(f"通过 group_id={group_id} 获取到 config_id={config_id}") + except ValueError as e: + api_logger.warning(f"获取终端用户配置失败: {str(e)}") + return fail(BizCode.INVALID_PARAMETER, str(e), "ValueError") + except Exception as e: + api_logger.error(f"获取终端用户配置时发生错误: {str(e)}") + return fail(BizCode.INTERNAL_ERROR, "获取终端用户配置失败", str(e)) + api_logger.info( f"用户 {current_user.username} 在工作空间 {workspace_id} 请求获取遗忘引擎统计: " f"group_id={group_id}, config_id={config_id}" diff --git a/api/app/core/memory/models/graph_models.py b/api/app/core/memory/models/graph_models.py index 4d4221a3..6ec1ca8e 100644 --- a/api/app/core/memory/models/graph_models.py +++ b/api/app/core/memory/models/graph_models.py @@ -37,12 +37,20 @@ def parse_historical_datetime(v): 此函数手动解析 ISO 8601 格式的日期字符串,支持1-4位年份 Args: - v: 日期值(可以是 None、datetime 对象或字符串) + v: 日期值(可以是 None、datetime 对象、Neo4j DateTime 对象或字符串) Returns: datetime 对象或 None """ - if v is None or isinstance(v, datetime): + if v is None: + return v + + # 处理 Neo4j DateTime 对象 + if hasattr(v, 'to_native'): + return v.to_native() + + # 处理 Python datetime 对象 + if isinstance(v, datetime): return v if isinstance(v, str): diff --git a/api/app/models/forgetting_cycle_history_model.py b/api/app/models/forgetting_cycle_history_model.py new file mode 100644 index 00000000..6c4f8208 --- /dev/null +++ b/api/app/models/forgetting_cycle_history_model.py @@ -0,0 +1,40 @@ +""" +遗忘周期历史记录模型 + +用于存储每次遗忘周期执行的历史数据,支持趋势分析和可视化。 +""" + +import uuid +from datetime import datetime +from sqlalchemy import Column, Integer, String, Float, DateTime, Index +from sqlalchemy.dialects.postgresql import UUID +from app.db import Base + + +class ForgettingCycleHistory(Base): + """遗忘周期历史记录表""" + + __tablename__ = "forgetting_cycle_history" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, nullable=False, index=True, comment="主键ID") + end_user_id = Column(String(255), nullable=False, comment="终端用户ID") + execution_time = Column(DateTime, nullable=False, default=datetime.now, comment="执行时间") + merged_count = Column(Integer, default=0, comment="本次成功融合的节点对数") + failed_count = Column(Integer, default=0, comment="本次融合失败的节点对数") + average_activation_value = Column(Float, nullable=True, comment="平均激活值") + total_nodes = Column(Integer, default=0, comment="总节点数") + low_activation_nodes = Column(Integer, default=0, comment="低于遗忘阈值的节点总数(包含已融合、失败和待处理的)") + duration_seconds = Column(Float, nullable=True, comment="执行耗时(秒)") + trigger_type = Column(String(50), default="manual", comment="触发类型: manual/scheduled") + + # 创建索引以优化查询 + __table_args__ = ( + Index('idx_end_user_time', 'end_user_id', 'execution_time'), + Index('idx_execution_time', 'execution_time'), + ) + + def __repr__(self): + return ( + f"" + ) diff --git a/api/app/repositories/forgetting_cycle_history_repository.py b/api/app/repositories/forgetting_cycle_history_repository.py new file mode 100644 index 00000000..9c84b859 --- /dev/null +++ b/api/app/repositories/forgetting_cycle_history_repository.py @@ -0,0 +1,105 @@ +""" +遗忘周期历史记录仓储 + +提供遗忘周期历史记录的数据访问操作。 +""" + +from typing import List, Optional +from datetime import datetime, timedelta +from sqlalchemy.orm import Session +from sqlalchemy import desc, and_ + +from app.models.forgetting_cycle_history_model import ForgettingCycleHistory + + +class ForgettingCycleHistoryRepository: + """遗忘周期历史记录仓储类""" + + def create( + self, + db: Session, + end_user_id: str, + execution_time: datetime, + merged_count: int, + failed_count: int, + average_activation_value: Optional[float], + total_nodes: int, + low_activation_nodes: int, + duration_seconds: float, + trigger_type: str = "manual" + ) -> ForgettingCycleHistory: + """ + 创建历史记录 + + Args: + db: 数据库会话 + end_user_id: 终端用户ID + execution_time: 执行时间 + merged_count: 融合节点数 + failed_count: 失败节点数 + average_activation_value: 平均激活值 + total_nodes: 总节点数 + low_activation_nodes: 低激活值节点数 + duration_seconds: 执行耗时 + trigger_type: 触发类型 + + Returns: + ForgettingCycleHistory: 创建的历史记录 + """ + history = ForgettingCycleHistory( + end_user_id=end_user_id, + execution_time=execution_time, + merged_count=merged_count, + failed_count=failed_count, + average_activation_value=average_activation_value, + total_nodes=total_nodes, + low_activation_nodes=low_activation_nodes, + duration_seconds=duration_seconds, + trigger_type=trigger_type + ) + + db.add(history) + db.commit() + db.refresh(history) + + return history + + def get_recent_by_end_user( + self, + db: Session, + end_user_id: str + ) -> List[ForgettingCycleHistory]: + """ + 获取指定终端用户的所有历史记录(按时间降序排列) + + 注意:此方法返回所有历史记录,调用方需要自行处理日期分组和数量限制。 + + Args: + db: 数据库会话 + end_user_id: 终端用户ID + + Returns: + List[ForgettingCycleHistory]: 历史记录列表,按时间降序排列 + """ + return db.query(ForgettingCycleHistory).filter( + ForgettingCycleHistory.end_user_id == end_user_id + ).order_by(ForgettingCycleHistory.execution_time.desc()).all() + + def get_latest_by_end_user( + self, + db: Session, + end_user_id: str + ) -> Optional[ForgettingCycleHistory]: + """ + 获取指定终端用户的最新历史记录 + + Args: + db: 数据库会话 + end_user_id: 终端用户ID + + Returns: + Optional[ForgettingCycleHistory]: 最新历史记录 + """ + return db.query(ForgettingCycleHistory).filter( + ForgettingCycleHistory.end_user_id == end_user_id + ).order_by(desc(ForgettingCycleHistory.execution_time)).first() diff --git a/api/app/schemas/memory_storage_schema.py b/api/app/schemas/memory_storage_schema.py index 24747c34..ca9b29de 100644 --- a/api/app/schemas/memory_storage_schema.py +++ b/api/app/schemas/memory_storage_schema.py @@ -409,10 +409,9 @@ class ForgettingTriggerRequest(BaseModel): """手动触发遗忘周期请求模型""" model_config = ConfigDict(populate_by_name=True, extra="forbid") - group_id: Optional[str] = Field(None, description="组ID(可选,用于过滤特定组的节点)") + group_id: str = Field(..., description="组ID(即终端用户ID,必填)") max_merge_batch_size: int = Field(100, ge=1, le=1000, description="单次最大融合节点对数(默认100)") min_days_since_access: int = Field(30, ge=1, le=365, description="最小未访问天数(默认30天)") - config_id: Optional[int] = Field(None, description="配置ID(可选,用于指定遗忘引擎配置)") # TODO 后续group_id更换成enduser_id,自动与config_id关联 ,要删除此行 class ForgettingConfigResponse(BaseModel): @@ -450,15 +449,36 @@ class ForgettingConfigUpdateRequest(BaseModel): forgetting_interval_hours: Optional[int] = Field(None, ge=1, le=168, description="遗忘周期间隔(小时)") +class ForgettingCycleHistoryPoint(BaseModel): + """遗忘周期历史数据点模型(用于趋势图)""" + model_config = ConfigDict(populate_by_name=True, extra="forbid") + + date: str = Field(..., description="日期(格式: '1/1', '1/2')") + merged_count: int = Field(..., description="每日融合节点数") + average_activation: Optional[float] = Field(None, description="平均激活值") + total_nodes: int = Field(..., description="总节点数") + execution_time: int = Field(..., description="执行时间(Unix时间戳,秒)") + + +class PendingForgettingNode(BaseModel): + """待遗忘节点模型""" + model_config = ConfigDict(populate_by_name=True, extra="forbid") + + node_id: str = Field(..., description="节点ID") + node_type: str = Field(..., description="节点类型:statement/entity/summary") + content_summary: str = Field(..., description="内容摘要") + activation_value: float = Field(..., description="激活值") + last_access_time: int = Field(..., description="最后访问时间(Unix时间戳,秒)") + + class ForgettingStatsResponse(BaseModel): """遗忘引擎统计信息响应模型""" model_config = ConfigDict(populate_by_name=True, extra="forbid") activation_metrics: Dict[str, Any] = Field(..., description="激活值相关指标") node_distribution: Dict[str, int] = Field(..., description="节点类型分布") - consistency_check: Optional[Dict[str, Any]] = Field(None, description="数据一致性检查结果") - nodes_merged_total: int = Field(..., description="累计融合节点对数") - recent_cycles: List[Dict[str, Any]] = Field(..., description="最近的遗忘周期记录") - timestamp: str = Field(..., description="统计时间(ISO格式)") + recent_trends: List[ForgettingCycleHistoryPoint] = Field(..., description="最近7个日期的遗忘趋势数据(每天取最后一次执行)") + pending_nodes: List[PendingForgettingNode] = Field(..., description="待遗忘节点列表(前20个满足遗忘条件的节点)") + timestamp: int = Field(..., description="统计时间(时间戳)") class ForgettingReportResponse(BaseModel): diff --git a/api/app/services/memory_forget_service.py b/api/app/services/memory_forget_service.py index 30a84b25..8979682d 100644 --- a/api/app/services/memory_forget_service.py +++ b/api/app/services/memory_forget_service.py @@ -11,7 +11,7 @@ """ from typing import Optional, Dict, Any, Tuple -from datetime import datetime +from datetime import datetime, timezone from sqlalchemy.orm import Session @@ -24,18 +24,54 @@ from app.core.memory.storage_services.forgetting_engine.config_utils import ( ) from app.repositories.neo4j.neo4j_connector import Neo4jConnector from app.repositories.data_config_repository import DataConfigRepository +from app.repositories.forgetting_cycle_history_repository import ForgettingCycleHistoryRepository # 获取API专用日志器 api_logger = get_api_logger() +def convert_neo4j_datetime_to_python(value: Any) -> Optional[datetime]: + """ + 将 Neo4j DateTime 对象转换为 Python datetime 对象 + + Args: + value: Neo4j DateTime 对象、Python datetime 对象或字符串 + + Returns: + Python datetime 对象或 None + """ + if value is None: + return None + + try: + # Neo4j DateTime 对象 + if hasattr(value, 'to_native'): + return value.to_native() + # Python datetime 对象 + elif isinstance(value, datetime): + return value + # 字符串格式 + elif isinstance(value, str): + if value.endswith('Z'): + return datetime.fromisoformat(value.replace('Z', '+00:00')) + else: + return datetime.fromisoformat(value) + # 其他类型,尝试转换为字符串 + else: + return datetime.fromisoformat(str(value).replace('Z', '+00:00')) + except Exception as e: + api_logger.warning(f"转换时间失败: {value} (类型: {type(value).__name__}), 错误: {e}") + return None + + class MemoryForgetService: """遗忘引擎服务类""" def __init__(self): """初始化服务""" self.config_repository = DataConfigRepository() + self.history_repository = ForgettingCycleHistoryRepository() def _get_neo4j_connector(self) -> Neo4jConnector: """ @@ -161,10 +197,101 @@ class MemoryForgetService: 'low_activation_nodes': 0 } + async def _get_pending_forgetting_nodes( + self, + connector: Neo4jConnector, + group_id: str, + forgetting_threshold: float, + min_days_since_access: int, + limit: int = 20 + ) -> list[Dict[str, Any]]: + """ + 获取待遗忘节点列表 + + 查询满足遗忘条件的节点(激活值低于阈值且最后访问时间超过最小天数) + + Args: + connector: Neo4j 连接器 + group_id: 组ID + forgetting_threshold: 遗忘阈值 + min_days_since_access: 最小未访问天数 + limit: 返回节点数量限制 + + Returns: + list: 待遗忘节点列表 + """ + from datetime import timedelta + + # 计算最小访问时间(ISO 8601 格式字符串,使用 UTC 时区) + min_access_time = datetime.now(timezone.utc) - timedelta(days=min_days_since_access) + min_access_time_str = min_access_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ') + + query = """ + MATCH (n) + WHERE (n:Statement OR n:ExtractedEntity OR n:MemorySummary) + AND n.group_id = $group_id + AND n.activation_value IS NOT NULL + AND n.activation_value < $threshold + AND n.last_access_time IS NOT NULL + AND datetime(n.last_access_time) < datetime($min_access_time_str) + RETURN + elementId(n) as node_id, + labels(n)[0] as node_type, + CASE + WHEN n:Statement THEN n.statement + WHEN n:ExtractedEntity THEN n.name + WHEN n:MemorySummary THEN n.content + ELSE '' + END as content_summary, + n.activation_value as activation_value, + n.last_access_time as last_access_time + ORDER BY n.activation_value ASC + LIMIT $limit + """ + + params = { + 'group_id': group_id, + 'threshold': forgetting_threshold, + 'min_access_time_str': min_access_time_str, + 'limit': limit + } + + results = await connector.execute_query(query, **params) + + pending_nodes = [] + for result in results: + # 将节点类型标签转换为小写 + node_type_label = result['node_type'].lower() + if node_type_label == 'extractedentity': + node_type_label = 'entity' + elif node_type_label == 'memorysummary': + node_type_label = 'summary' + + # 将 Neo4j DateTime 对象转换为时间戳 + last_access_time = result['last_access_time'] + last_access_dt = convert_neo4j_datetime_to_python(last_access_time) + # 确保 datetime 带有时区信息(假定为 UTC),避免 naive datetime 导致的时区偏差 + if last_access_dt: + if last_access_dt.tzinfo is None: + last_access_dt = last_access_dt.replace(tzinfo=timezone.utc) + last_access_timestamp = int(last_access_dt.timestamp()) + else: + last_access_timestamp = 0 + + pending_nodes.append({ + 'node_id': str(result['node_id']), + 'node_type': node_type_label, + 'content_summary': result['content_summary'] or '', + 'activation_value': result['activation_value'], + 'last_access_time': last_access_timestamp + }) + + return pending_nodes + async def trigger_forgetting_cycle( self, db: Session, - group_id: Optional[str] = None, + group_id: str, max_merge_batch_size: Optional[int] = None, min_days_since_access: Optional[int] = None, config_id: Optional[int] = None @@ -176,10 +303,10 @@ class MemoryForgetService: Args: db: 数据库会话 - group_id: 组ID(可选) + group_id: 组ID(即终端用户ID,必填) max_merge_batch_size: 最大融合批次大小(可选) min_days_since_access: 最小未访问天数(可选) - config_id: 配置ID(可选) + config_id: 配置ID(必填,由控制器层通过 group_id 获取) Returns: dict: 遗忘报告 @@ -187,6 +314,9 @@ class MemoryForgetService: # 获取遗忘引擎组件 _, _, forgetting_scheduler, config = await self._get_forgetting_components(db, config_id) + # 记录执行开始时间 + execution_time = datetime.now() + # 运行遗忘周期(LLM 客户端将在需要时由 forgetting_strategy 内部获取) report = await forgetting_scheduler.run_forgetting_cycle( group_id=group_id, @@ -202,6 +332,58 @@ class MemoryForgetService: f"耗时 {report['duration_seconds']:.2f} 秒" ) + # 获取当前的激活值统计(用于记录历史) + try: + connector = forgetting_scheduler.connector + stats_query = """ + MATCH (n) + WHERE (n:Statement OR n:ExtractedEntity OR n:MemorySummary OR n:Chunk) + AND n.group_id = $group_id + RETURN + count(n) as total_nodes, + avg(n.activation_value) as average_activation, + sum(CASE WHEN n.activation_value IS NOT NULL AND n.activation_value < $threshold THEN 1 ELSE 0 END) as low_activation_nodes + """ + + stats_results = await connector.execute_query( + stats_query, + group_id=group_id, + threshold=config['forgetting_threshold'] + ) + + if stats_results: + stats = stats_results[0] + total_nodes = stats['total_nodes'] or 0 + average_activation = stats['average_activation'] + low_activation_nodes = stats['low_activation_nodes'] or 0 + else: + total_nodes = 0 + average_activation = None + low_activation_nodes = 0 + + # 保存历史记录到数据库 + self.history_repository.create( + db=db, + end_user_id=group_id, + execution_time=execution_time, + merged_count=report['merged_count'], + failed_count=report['failed_count'], + average_activation_value=average_activation, + total_nodes=total_nodes, + low_activation_nodes=low_activation_nodes, + duration_seconds=report['duration_seconds'], + trigger_type='manual' + ) + + api_logger.info( + f"已保存遗忘周期历史记录: end_user_id={group_id}, " + f"merged_count={report['merged_count']}" + ) + + except Exception as e: + # 记录历史失败不应影响主流程 + api_logger.error(f"保存遗忘周期历史记录失败: {str(e)}") + return report def read_forgetting_config( @@ -337,7 +519,8 @@ class MemoryForgetService: 'nodes_without_activation': result['nodes_without_activation'] or 0, 'average_activation_value': result['average_activation'], 'low_activation_nodes': result['low_activation_nodes'] or 0, - 'timestamp': datetime.now().isoformat() + 'forgetting_threshold': forgetting_threshold, + 'timestamp': int(datetime.now().timestamp()) } else: activation_metrics = { @@ -346,7 +529,8 @@ class MemoryForgetService: 'nodes_without_activation': 0, 'average_activation_value': None, 'low_activation_nodes': 0, - 'timestamp': datetime.now().isoformat() + 'forgetting_threshold': forgetting_threshold, + 'timestamp': int(datetime.now().timestamp()) } # 收集节点类型分布 @@ -395,19 +579,95 @@ class MemoryForgetService: 'chunk_count': 0 } - # 构建统计信息(不包含监控历史数据) + # 获取最近7个日期的历史趋势数据(每天取最后一次执行) + recent_trends = [] + try: + if group_id: + # 查询所有历史记录 + history_records = self.history_repository.get_recent_by_end_user( + db=db, + end_user_id=group_id + ) + + # 按日期分组(一天可能有多次执行,取最后一次) + from collections import OrderedDict + daily_records = OrderedDict() + + # 遍历记录(已按时间降序),每个日期只保留第一次遇到的(即最后一次执行) + for record in history_records: + # 提取日期(格式: "1/1", "1/2")- 跨平台兼容 + month = record.execution_time.month + day = record.execution_time.day + date_str = f"{month}/{day}" + + # 如果这个日期还没有记录,添加它(这是该日期最后一次执行) + if date_str not in daily_records: + daily_records[date_str] = record + + # 如果已经有7个不同的日期,停止 + if len(daily_records) >= 7: + break + + # 构建趋势数据点(按时间从旧到新排序) + sorted_dates = sorted( + daily_records.items(), + key=lambda x: x[1].execution_time + ) + + for date_str, record in sorted_dates: + recent_trends.append({ + 'date': date_str, + 'merged_count': record.merged_count, + 'average_activation': record.average_activation_value, + 'total_nodes': record.total_nodes, + 'execution_time': int(record.execution_time.timestamp()) + }) + + api_logger.info(f"成功获取最近 {len(recent_trends)} 个日期的历史趋势数据") + + except Exception as e: + api_logger.error(f"获取历史趋势数据失败: {str(e)}") + # 失败时返回空列表,不影响主流程 + + # 获取待遗忘节点列表(前20个满足遗忘条件的节点) + pending_nodes = [] + try: + if group_id: + # 验证 min_days_since_access 配置值 + min_days = config.get('min_days_since_access') + if min_days is None or not isinstance(min_days, (int, float)) or min_days < 0: + api_logger.warning( + f"min_days_since_access 配置无效: {min_days}, 使用默认值 7" + ) + min_days = 7 + + pending_nodes = await self._get_pending_forgetting_nodes( + connector=connector, + group_id=group_id, + forgetting_threshold=forgetting_threshold, + min_days_since_access=int(min_days), + limit=20 + ) + + api_logger.info(f"成功获取 {len(pending_nodes)} 个待遗忘节点") + + except Exception as e: + api_logger.error(f"获取待遗忘节点失败: {str(e)}") + # 失败时返回空列表,不影响主流程 + + # 构建统计信息 stats = { 'activation_metrics': activation_metrics, 'node_distribution': node_distribution, - 'consistency_check': None, # 不再提供一致性检查 - 'nodes_merged_total': 0, # 不再跟踪累计融合数 - 'recent_cycles': [], # 不再提供历史记录 - 'timestamp': datetime.now().isoformat() + 'recent_trends': recent_trends, + 'pending_nodes': pending_nodes, + 'timestamp': int(datetime.now().timestamp()) } api_logger.info( f"成功获取遗忘引擎统计: total_nodes={stats['activation_metrics']['total_nodes']}, " - f"low_activation_nodes={stats['activation_metrics']['low_activation_nodes']}" + f"low_activation_nodes={stats['activation_metrics']['low_activation_nodes']}, " + f"trend_days={len(recent_trends)}, pending_nodes={len(pending_nodes)}" ) return stats diff --git a/api/app/services/memory_storage_service.py b/api/app/services/memory_storage_service.py index cf7ff74a..9cac26ec 100644 --- a/api/app/services/memory_storage_service.py +++ b/api/app/services/memory_storage_service.py @@ -89,11 +89,15 @@ class DataConfigService: # 数据配置服务类(PostgreSQL) value = item[field] dt = None - # 如果是 datetime 对象,直接使用 - if isinstance(value, datetime): + # 处理不同类型的时间值 + if hasattr(value, 'to_native'): + # Neo4j DateTime 对象 + dt = value.to_native() + elif isinstance(value, datetime): + # Python datetime 对象 dt = value - # 如果是字符串,先解析 elif isinstance(value, str): + # 字符串格式 try: dt = datetime.fromisoformat(value.replace('Z', '+00:00')) except Exception: diff --git a/api/app/services/user_memory_service.py b/api/app/services/user_memory_service.py index 25577cbf..a17e21f0 100644 --- a/api/app/services/user_memory_service.py +++ b/api/app/services/user_memory_service.py @@ -160,11 +160,20 @@ class MemoryInsightHelper: month_counts = Counter() valid_dates_count = 0 for record in records: - creation_time_str = record.get("creation_time") - if not creation_time_str: + creation_time = record.get("creation_time") + if not creation_time: continue try: - dt_object = datetime.fromisoformat(creation_time_str.replace("Z", "+00:00")) + # 处理 Neo4j DateTime 对象或字符串 + if hasattr(creation_time, 'to_native'): + dt_object = creation_time.to_native() + elif isinstance(creation_time, str): + dt_object = datetime.fromisoformat(creation_time.replace("Z", "+00:00")) + elif isinstance(creation_time, datetime): + dt_object = creation_time + else: + dt_object = datetime.fromisoformat(str(creation_time).replace("Z", "+00:00")) + month_counts[dt_object.month] += 1 valid_dates_count += 1 except (ValueError, TypeError, AttributeError): @@ -225,8 +234,33 @@ class MemoryInsightHelper: ) start_year, end_year = "N/A", "N/A" if time_records and time_records[0]["start_time"]: - start_year = datetime.fromisoformat(time_records[0]["start_time"].replace("Z", "+00:00")).year - end_year = datetime.fromisoformat(time_records[0]["end_time"].replace("Z", "+00:00")).year + start_time = time_records[0]["start_time"] + end_time = time_records[0]["end_time"] + + # 处理 Neo4j DateTime 对象或字符串 + try: + if hasattr(start_time, 'to_native'): + start_year = start_time.to_native().year + elif isinstance(start_time, str): + start_year = datetime.fromisoformat(start_time.replace("Z", "+00:00")).year + elif isinstance(start_time, datetime): + start_year = start_time.year + else: + start_year = datetime.fromisoformat(str(start_time).replace("Z", "+00:00")).year + except Exception: + start_year = "N/A" + + try: + if hasattr(end_time, 'to_native'): + end_year = end_time.to_native().year + elif isinstance(end_time, str): + end_year = datetime.fromisoformat(end_time.replace("Z", "+00:00")).year + elif isinstance(end_time, datetime): + end_year = end_time.year + else: + end_year = datetime.fromisoformat(str(end_time).replace("Z", "+00:00")).year + except Exception: + end_year = "N/A" return { "user_id": most_connected_user,