Feature/actr forget (#55)

* [changes]Request to remove 'config_id' has been received.

* [add]Add the access history record table

* [changes]Request to remove 'config_id' has been received.

* [add]Add the access history record table

* [add]Obtain the record of the forgetting trend

* [changes]Based on the AI's suggestion, make the necessary modifications.
This commit is contained in:
乐力齐
2026-01-08 15:15:13 +08:00
committed by GitHub
parent 7871663cae
commit a4af0f7432
8 changed files with 542 additions and 33 deletions

View File

@@ -76,9 +76,28 @@ async def trigger_forgetting_cycle(
api_logger.warning(f"用户 {current_user.username} 尝试触发遗忘周期但未选择工作空间") api_logger.warning(f"用户 {current_user.username} 尝试触发遗忘周期但未选择工作空间")
return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None")
# 通过 group_id 获取关联的 config_id
try:
from app.services.memory_agent_service import get_end_user_connected_config
connected_config = get_end_user_connected_config(payload.group_id, db)
config_id = connected_config.get("memory_config_id")
if config_id is None:
api_logger.warning(f"终端用户 {payload.group_id} 未关联记忆配置")
return fail(BizCode.INVALID_PARAMETER, f"终端用户 {payload.group_id} 未关联记忆配置", "memory_config_id is None")
api_logger.debug(f"通过 group_id={payload.group_id} 获取到 config_id={config_id}")
except ValueError as e:
api_logger.warning(f"获取终端用户配置失败: {str(e)}")
return fail(BizCode.INVALID_PARAMETER, str(e), "ValueError")
except Exception as e:
api_logger.error(f"获取终端用户配置时发生错误: {str(e)}")
return fail(BizCode.INTERNAL_ERROR, "获取终端用户配置失败", str(e))
api_logger.info( api_logger.info(
f"用户 {current_user.username} 在工作空间 {workspace_id} 请求触发遗忘周期: " f"用户 {current_user.username} 在工作空间 {workspace_id} 请求触发遗忘周期: "
f"group_id={payload.group_id}, max_batch={payload.max_merge_batch_size}, " f"group_id={payload.group_id}, config_id={config_id}, max_batch={payload.max_merge_batch_size}, "
f"min_days={payload.min_days_since_access}" f"min_days={payload.min_days_since_access}"
) )
@@ -89,7 +108,7 @@ async def trigger_forgetting_cycle(
group_id=payload.group_id, group_id=payload.group_id,
max_merge_batch_size=payload.max_merge_batch_size, max_merge_batch_size=payload.max_merge_batch_size,
min_days_since_access=payload.min_days_since_access, min_days_since_access=payload.min_days_since_access,
config_id=payload.config_id config_id=config_id
) )
# 构建响应 # 构建响应
@@ -217,7 +236,6 @@ async def update_forgetting_config(
@router.get("/stats", response_model=ApiResponse) @router.get("/stats", response_model=ApiResponse)
async def get_forgetting_stats( async def get_forgetting_stats(
group_id: Optional[str] = None, group_id: Optional[str] = None,
config_id: Optional[int] = None,
current_user: User = Depends(get_current_user), current_user: User = Depends(get_current_user),
db: Session = Depends(get_db) db: Session = Depends(get_db)
): ):
@@ -227,8 +245,7 @@ async def get_forgetting_stats(
返回知识层节点统计、激活值分布等信息。 返回知识层节点统计、激活值分布等信息。
Args: Args:
group_id: 组ID可选 group_id: 组ID即 end_user_id可选)
config_id: 配置ID可选用于获取遗忘阈值
current_user: 当前用户 current_user: 当前用户
db: 数据库会话 db: 数据库会话
@@ -242,6 +259,27 @@ async def get_forgetting_stats(
api_logger.warning(f"用户 {current_user.username} 尝试获取遗忘引擎统计但未选择工作空间") api_logger.warning(f"用户 {current_user.username} 尝试获取遗忘引擎统计但未选择工作空间")
return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None")
# 如果提供了 group_id通过它获取 config_id
config_id = None
if group_id:
try:
from app.services.memory_agent_service import get_end_user_connected_config
connected_config = get_end_user_connected_config(group_id, db)
config_id = connected_config.get("memory_config_id")
if config_id is None:
api_logger.warning(f"终端用户 {group_id} 未关联记忆配置")
return fail(BizCode.INVALID_PARAMETER, f"终端用户 {group_id} 未关联记忆配置", "memory_config_id is None")
api_logger.debug(f"通过 group_id={group_id} 获取到 config_id={config_id}")
except ValueError as e:
api_logger.warning(f"获取终端用户配置失败: {str(e)}")
return fail(BizCode.INVALID_PARAMETER, str(e), "ValueError")
except Exception as e:
api_logger.error(f"获取终端用户配置时发生错误: {str(e)}")
return fail(BizCode.INTERNAL_ERROR, "获取终端用户配置失败", str(e))
api_logger.info( api_logger.info(
f"用户 {current_user.username} 在工作空间 {workspace_id} 请求获取遗忘引擎统计: " f"用户 {current_user.username} 在工作空间 {workspace_id} 请求获取遗忘引擎统计: "
f"group_id={group_id}, config_id={config_id}" f"group_id={group_id}, config_id={config_id}"

View File

@@ -37,12 +37,20 @@ def parse_historical_datetime(v):
此函数手动解析 ISO 8601 格式的日期字符串支持1-4位年份 此函数手动解析 ISO 8601 格式的日期字符串支持1-4位年份
Args: Args:
v: 日期值(可以是 None、datetime 对象或字符串) v: 日期值(可以是 None、datetime 对象、Neo4j DateTime 对象或字符串)
Returns: Returns:
datetime 对象或 None datetime 对象或 None
""" """
if v is None or isinstance(v, datetime): if v is None:
return v
# 处理 Neo4j DateTime 对象
if hasattr(v, 'to_native'):
return v.to_native()
# 处理 Python datetime 对象
if isinstance(v, datetime):
return v return v
if isinstance(v, str): if isinstance(v, str):

View File

@@ -0,0 +1,40 @@
"""
遗忘周期历史记录模型
用于存储每次遗忘周期执行的历史数据,支持趋势分析和可视化。
"""
import uuid
from datetime import datetime
from sqlalchemy import Column, Integer, String, Float, DateTime, Index
from sqlalchemy.dialects.postgresql import UUID
from app.db import Base
class ForgettingCycleHistory(Base):
"""遗忘周期历史记录表"""
__tablename__ = "forgetting_cycle_history"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, nullable=False, index=True, comment="主键ID")
end_user_id = Column(String(255), nullable=False, comment="终端用户ID")
execution_time = Column(DateTime, nullable=False, default=datetime.now, comment="执行时间")
merged_count = Column(Integer, default=0, comment="本次成功融合的节点对数")
failed_count = Column(Integer, default=0, comment="本次融合失败的节点对数")
average_activation_value = Column(Float, nullable=True, comment="平均激活值")
total_nodes = Column(Integer, default=0, comment="总节点数")
low_activation_nodes = Column(Integer, default=0, comment="低于遗忘阈值的节点总数(包含已融合、失败和待处理的)")
duration_seconds = Column(Float, nullable=True, comment="执行耗时(秒)")
trigger_type = Column(String(50), default="manual", comment="触发类型: manual/scheduled")
# 创建索引以优化查询
__table_args__ = (
Index('idx_end_user_time', 'end_user_id', 'execution_time'),
Index('idx_execution_time', 'execution_time'),
)
def __repr__(self):
return (
f"<ForgettingCycleHistory(id={self.id}, end_user_id={self.end_user_id}, "
f"merged_count={self.merged_count}, execution_time={self.execution_time})>"
)

View File

@@ -0,0 +1,105 @@
"""
遗忘周期历史记录仓储
提供遗忘周期历史记录的数据访问操作。
"""
from typing import List, Optional
from datetime import datetime, timedelta
from sqlalchemy.orm import Session
from sqlalchemy import desc, and_
from app.models.forgetting_cycle_history_model import ForgettingCycleHistory
class ForgettingCycleHistoryRepository:
"""遗忘周期历史记录仓储类"""
def create(
self,
db: Session,
end_user_id: str,
execution_time: datetime,
merged_count: int,
failed_count: int,
average_activation_value: Optional[float],
total_nodes: int,
low_activation_nodes: int,
duration_seconds: float,
trigger_type: str = "manual"
) -> ForgettingCycleHistory:
"""
创建历史记录
Args:
db: 数据库会话
end_user_id: 终端用户ID
execution_time: 执行时间
merged_count: 融合节点数
failed_count: 失败节点数
average_activation_value: 平均激活值
total_nodes: 总节点数
low_activation_nodes: 低激活值节点数
duration_seconds: 执行耗时
trigger_type: 触发类型
Returns:
ForgettingCycleHistory: 创建的历史记录
"""
history = ForgettingCycleHistory(
end_user_id=end_user_id,
execution_time=execution_time,
merged_count=merged_count,
failed_count=failed_count,
average_activation_value=average_activation_value,
total_nodes=total_nodes,
low_activation_nodes=low_activation_nodes,
duration_seconds=duration_seconds,
trigger_type=trigger_type
)
db.add(history)
db.commit()
db.refresh(history)
return history
def get_recent_by_end_user(
self,
db: Session,
end_user_id: str
) -> List[ForgettingCycleHistory]:
"""
获取指定终端用户的所有历史记录(按时间降序排列)
注意:此方法返回所有历史记录,调用方需要自行处理日期分组和数量限制。
Args:
db: 数据库会话
end_user_id: 终端用户ID
Returns:
List[ForgettingCycleHistory]: 历史记录列表,按时间降序排列
"""
return db.query(ForgettingCycleHistory).filter(
ForgettingCycleHistory.end_user_id == end_user_id
).order_by(ForgettingCycleHistory.execution_time.desc()).all()
def get_latest_by_end_user(
self,
db: Session,
end_user_id: str
) -> Optional[ForgettingCycleHistory]:
"""
获取指定终端用户的最新历史记录
Args:
db: 数据库会话
end_user_id: 终端用户ID
Returns:
Optional[ForgettingCycleHistory]: 最新历史记录
"""
return db.query(ForgettingCycleHistory).filter(
ForgettingCycleHistory.end_user_id == end_user_id
).order_by(desc(ForgettingCycleHistory.execution_time)).first()

View File

@@ -409,10 +409,9 @@ class ForgettingTriggerRequest(BaseModel):
"""手动触发遗忘周期请求模型""" """手动触发遗忘周期请求模型"""
model_config = ConfigDict(populate_by_name=True, extra="forbid") model_config = ConfigDict(populate_by_name=True, extra="forbid")
group_id: Optional[str] = Field(None, description="组ID可选,用于过滤特定组的节点") group_id: str = Field(..., description="组ID即终端用户ID必填")
max_merge_batch_size: int = Field(100, ge=1, le=1000, description="单次最大融合节点对数默认100") max_merge_batch_size: int = Field(100, ge=1, le=1000, description="单次最大融合节点对数默认100")
min_days_since_access: int = Field(30, ge=1, le=365, description="最小未访问天数默认30天") min_days_since_access: int = Field(30, ge=1, le=365, description="最小未访问天数默认30天")
config_id: Optional[int] = Field(None, description="配置ID可选用于指定遗忘引擎配置") # TODO 后续group_id更换成enduser_id自动与config_id关联 ,要删除此行
class ForgettingConfigResponse(BaseModel): class ForgettingConfigResponse(BaseModel):
@@ -450,15 +449,36 @@ class ForgettingConfigUpdateRequest(BaseModel):
forgetting_interval_hours: Optional[int] = Field(None, ge=1, le=168, description="遗忘周期间隔(小时)") forgetting_interval_hours: Optional[int] = Field(None, ge=1, le=168, description="遗忘周期间隔(小时)")
class ForgettingCycleHistoryPoint(BaseModel):
"""遗忘周期历史数据点模型(用于趋势图)"""
model_config = ConfigDict(populate_by_name=True, extra="forbid")
date: str = Field(..., description="日期(格式: '1/1', '1/2'")
merged_count: int = Field(..., description="每日融合节点数")
average_activation: Optional[float] = Field(None, description="平均激活值")
total_nodes: int = Field(..., description="总节点数")
execution_time: int = Field(..., description="执行时间Unix时间戳")
class PendingForgettingNode(BaseModel):
"""待遗忘节点模型"""
model_config = ConfigDict(populate_by_name=True, extra="forbid")
node_id: str = Field(..., description="节点ID")
node_type: str = Field(..., description="节点类型statement/entity/summary")
content_summary: str = Field(..., description="内容摘要")
activation_value: float = Field(..., description="激活值")
last_access_time: int = Field(..., description="最后访问时间Unix时间戳")
class ForgettingStatsResponse(BaseModel): class ForgettingStatsResponse(BaseModel):
"""遗忘引擎统计信息响应模型""" """遗忘引擎统计信息响应模型"""
model_config = ConfigDict(populate_by_name=True, extra="forbid") model_config = ConfigDict(populate_by_name=True, extra="forbid")
activation_metrics: Dict[str, Any] = Field(..., description="激活值相关指标") activation_metrics: Dict[str, Any] = Field(..., description="激活值相关指标")
node_distribution: Dict[str, int] = Field(..., description="节点类型分布") node_distribution: Dict[str, int] = Field(..., description="节点类型分布")
consistency_check: Optional[Dict[str, Any]] = Field(None, description="数据一致性检查结果") recent_trends: List[ForgettingCycleHistoryPoint] = Field(..., description="最近7个日期的遗忘趋势数据每天取最后一次执行")
nodes_merged_total: int = Field(..., description="累计融合节点对数") pending_nodes: List[PendingForgettingNode] = Field(..., description="待遗忘节点列表前20个满足遗忘条件的节点")
recent_cycles: List[Dict[str, Any]] = Field(..., description="最近的遗忘周期记录") timestamp: int = Field(..., description="统计时间(时间戳)")
timestamp: str = Field(..., description="统计时间ISO格式")
class ForgettingReportResponse(BaseModel): class ForgettingReportResponse(BaseModel):

View File

@@ -11,7 +11,7 @@
""" """
from typing import Optional, Dict, Any, Tuple from typing import Optional, Dict, Any, Tuple
from datetime import datetime from datetime import datetime, timezone
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
@@ -24,18 +24,54 @@ from app.core.memory.storage_services.forgetting_engine.config_utils import (
) )
from app.repositories.neo4j.neo4j_connector import Neo4jConnector from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.repositories.data_config_repository import DataConfigRepository from app.repositories.data_config_repository import DataConfigRepository
from app.repositories.forgetting_cycle_history_repository import ForgettingCycleHistoryRepository
# 获取API专用日志器 # 获取API专用日志器
api_logger = get_api_logger() api_logger = get_api_logger()
def convert_neo4j_datetime_to_python(value: Any) -> Optional[datetime]:
"""
将 Neo4j DateTime 对象转换为 Python datetime 对象
Args:
value: Neo4j DateTime 对象、Python datetime 对象或字符串
Returns:
Python datetime 对象或 None
"""
if value is None:
return None
try:
# Neo4j DateTime 对象
if hasattr(value, 'to_native'):
return value.to_native()
# Python datetime 对象
elif isinstance(value, datetime):
return value
# 字符串格式
elif isinstance(value, str):
if value.endswith('Z'):
return datetime.fromisoformat(value.replace('Z', '+00:00'))
else:
return datetime.fromisoformat(value)
# 其他类型,尝试转换为字符串
else:
return datetime.fromisoformat(str(value).replace('Z', '+00:00'))
except Exception as e:
api_logger.warning(f"转换时间失败: {value} (类型: {type(value).__name__}), 错误: {e}")
return None
class MemoryForgetService: class MemoryForgetService:
"""遗忘引擎服务类""" """遗忘引擎服务类"""
def __init__(self): def __init__(self):
"""初始化服务""" """初始化服务"""
self.config_repository = DataConfigRepository() self.config_repository = DataConfigRepository()
self.history_repository = ForgettingCycleHistoryRepository()
def _get_neo4j_connector(self) -> Neo4jConnector: def _get_neo4j_connector(self) -> Neo4jConnector:
""" """
@@ -161,10 +197,101 @@ class MemoryForgetService:
'low_activation_nodes': 0 'low_activation_nodes': 0
} }
async def _get_pending_forgetting_nodes(
self,
connector: Neo4jConnector,
group_id: str,
forgetting_threshold: float,
min_days_since_access: int,
limit: int = 20
) -> list[Dict[str, Any]]:
"""
获取待遗忘节点列表
查询满足遗忘条件的节点(激活值低于阈值且最后访问时间超过最小天数)
Args:
connector: Neo4j 连接器
group_id: 组ID
forgetting_threshold: 遗忘阈值
min_days_since_access: 最小未访问天数
limit: 返回节点数量限制
Returns:
list: 待遗忘节点列表
"""
from datetime import timedelta
# 计算最小访问时间ISO 8601 格式字符串,使用 UTC 时区)
min_access_time = datetime.now(timezone.utc) - timedelta(days=min_days_since_access)
min_access_time_str = min_access_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
query = """
MATCH (n)
WHERE (n:Statement OR n:ExtractedEntity OR n:MemorySummary)
AND n.group_id = $group_id
AND n.activation_value IS NOT NULL
AND n.activation_value < $threshold
AND n.last_access_time IS NOT NULL
AND datetime(n.last_access_time) < datetime($min_access_time_str)
RETURN
elementId(n) as node_id,
labels(n)[0] as node_type,
CASE
WHEN n:Statement THEN n.statement
WHEN n:ExtractedEntity THEN n.name
WHEN n:MemorySummary THEN n.content
ELSE ''
END as content_summary,
n.activation_value as activation_value,
n.last_access_time as last_access_time
ORDER BY n.activation_value ASC
LIMIT $limit
"""
params = {
'group_id': group_id,
'threshold': forgetting_threshold,
'min_access_time_str': min_access_time_str,
'limit': limit
}
results = await connector.execute_query(query, **params)
pending_nodes = []
for result in results:
# 将节点类型标签转换为小写
node_type_label = result['node_type'].lower()
if node_type_label == 'extractedentity':
node_type_label = 'entity'
elif node_type_label == 'memorysummary':
node_type_label = 'summary'
# 将 Neo4j DateTime 对象转换为时间戳
last_access_time = result['last_access_time']
last_access_dt = convert_neo4j_datetime_to_python(last_access_time)
# 确保 datetime 带有时区信息(假定为 UTC),避免 naive datetime 导致的时区偏差
if last_access_dt:
if last_access_dt.tzinfo is None:
last_access_dt = last_access_dt.replace(tzinfo=timezone.utc)
last_access_timestamp = int(last_access_dt.timestamp())
else:
last_access_timestamp = 0
pending_nodes.append({
'node_id': str(result['node_id']),
'node_type': node_type_label,
'content_summary': result['content_summary'] or '',
'activation_value': result['activation_value'],
'last_access_time': last_access_timestamp
})
return pending_nodes
async def trigger_forgetting_cycle( async def trigger_forgetting_cycle(
self, self,
db: Session, db: Session,
group_id: Optional[str] = None, group_id: str,
max_merge_batch_size: Optional[int] = None, max_merge_batch_size: Optional[int] = None,
min_days_since_access: Optional[int] = None, min_days_since_access: Optional[int] = None,
config_id: Optional[int] = None config_id: Optional[int] = None
@@ -176,10 +303,10 @@ class MemoryForgetService:
Args: Args:
db: 数据库会话 db: 数据库会话
group_id: 组ID可选 group_id: 组ID即终端用户ID必填
max_merge_batch_size: 最大融合批次大小(可选) max_merge_batch_size: 最大融合批次大小(可选)
min_days_since_access: 最小未访问天数(可选) min_days_since_access: 最小未访问天数(可选)
config_id: 配置ID可选 config_id: 配置ID必填,由控制器层通过 group_id 获取
Returns: Returns:
dict: 遗忘报告 dict: 遗忘报告
@@ -187,6 +314,9 @@ class MemoryForgetService:
# 获取遗忘引擎组件 # 获取遗忘引擎组件
_, _, forgetting_scheduler, config = await self._get_forgetting_components(db, config_id) _, _, forgetting_scheduler, config = await self._get_forgetting_components(db, config_id)
# 记录执行开始时间
execution_time = datetime.now()
# 运行遗忘周期LLM 客户端将在需要时由 forgetting_strategy 内部获取) # 运行遗忘周期LLM 客户端将在需要时由 forgetting_strategy 内部获取)
report = await forgetting_scheduler.run_forgetting_cycle( report = await forgetting_scheduler.run_forgetting_cycle(
group_id=group_id, group_id=group_id,
@@ -202,6 +332,58 @@ class MemoryForgetService:
f"耗时 {report['duration_seconds']:.2f}" f"耗时 {report['duration_seconds']:.2f}"
) )
# 获取当前的激活值统计(用于记录历史)
try:
connector = forgetting_scheduler.connector
stats_query = """
MATCH (n)
WHERE (n:Statement OR n:ExtractedEntity OR n:MemorySummary OR n:Chunk)
AND n.group_id = $group_id
RETURN
count(n) as total_nodes,
avg(n.activation_value) as average_activation,
sum(CASE WHEN n.activation_value IS NOT NULL AND n.activation_value < $threshold THEN 1 ELSE 0 END) as low_activation_nodes
"""
stats_results = await connector.execute_query(
stats_query,
group_id=group_id,
threshold=config['forgetting_threshold']
)
if stats_results:
stats = stats_results[0]
total_nodes = stats['total_nodes'] or 0
average_activation = stats['average_activation']
low_activation_nodes = stats['low_activation_nodes'] or 0
else:
total_nodes = 0
average_activation = None
low_activation_nodes = 0
# 保存历史记录到数据库
self.history_repository.create(
db=db,
end_user_id=group_id,
execution_time=execution_time,
merged_count=report['merged_count'],
failed_count=report['failed_count'],
average_activation_value=average_activation,
total_nodes=total_nodes,
low_activation_nodes=low_activation_nodes,
duration_seconds=report['duration_seconds'],
trigger_type='manual'
)
api_logger.info(
f"已保存遗忘周期历史记录: end_user_id={group_id}, "
f"merged_count={report['merged_count']}"
)
except Exception as e:
# 记录历史失败不应影响主流程
api_logger.error(f"保存遗忘周期历史记录失败: {str(e)}")
return report return report
def read_forgetting_config( def read_forgetting_config(
@@ -337,7 +519,8 @@ class MemoryForgetService:
'nodes_without_activation': result['nodes_without_activation'] or 0, 'nodes_without_activation': result['nodes_without_activation'] or 0,
'average_activation_value': result['average_activation'], 'average_activation_value': result['average_activation'],
'low_activation_nodes': result['low_activation_nodes'] or 0, 'low_activation_nodes': result['low_activation_nodes'] or 0,
'timestamp': datetime.now().isoformat() 'forgetting_threshold': forgetting_threshold,
'timestamp': int(datetime.now().timestamp())
} }
else: else:
activation_metrics = { activation_metrics = {
@@ -346,7 +529,8 @@ class MemoryForgetService:
'nodes_without_activation': 0, 'nodes_without_activation': 0,
'average_activation_value': None, 'average_activation_value': None,
'low_activation_nodes': 0, 'low_activation_nodes': 0,
'timestamp': datetime.now().isoformat() 'forgetting_threshold': forgetting_threshold,
'timestamp': int(datetime.now().timestamp())
} }
# 收集节点类型分布 # 收集节点类型分布
@@ -395,19 +579,95 @@ class MemoryForgetService:
'chunk_count': 0 'chunk_count': 0
} }
# 构建统计信息(不包含监控历史数据 # 获取最近7个日期的历史趋势数据每天取最后一次执行
recent_trends = []
try:
if group_id:
# 查询所有历史记录
history_records = self.history_repository.get_recent_by_end_user(
db=db,
end_user_id=group_id
)
# 按日期分组(一天可能有多次执行,取最后一次)
from collections import OrderedDict
daily_records = OrderedDict()
# 遍历记录(已按时间降序),每个日期只保留第一次遇到的(即最后一次执行)
for record in history_records:
# 提取日期(格式: "1/1", "1/2"- 跨平台兼容
month = record.execution_time.month
day = record.execution_time.day
date_str = f"{month}/{day}"
# 如果这个日期还没有记录,添加它(这是该日期最后一次执行)
if date_str not in daily_records:
daily_records[date_str] = record
# 如果已经有7个不同的日期停止
if len(daily_records) >= 7:
break
# 构建趋势数据点(按时间从旧到新排序)
sorted_dates = sorted(
daily_records.items(),
key=lambda x: x[1].execution_time
)
for date_str, record in sorted_dates:
recent_trends.append({
'date': date_str,
'merged_count': record.merged_count,
'average_activation': record.average_activation_value,
'total_nodes': record.total_nodes,
'execution_time': int(record.execution_time.timestamp())
})
api_logger.info(f"成功获取最近 {len(recent_trends)} 个日期的历史趋势数据")
except Exception as e:
api_logger.error(f"获取历史趋势数据失败: {str(e)}")
# 失败时返回空列表,不影响主流程
# 获取待遗忘节点列表前20个满足遗忘条件的节点
pending_nodes = []
try:
if group_id:
# 验证 min_days_since_access 配置值
min_days = config.get('min_days_since_access')
if min_days is None or not isinstance(min_days, (int, float)) or min_days < 0:
api_logger.warning(
f"min_days_since_access 配置无效: {min_days}, 使用默认值 7"
)
min_days = 7
pending_nodes = await self._get_pending_forgetting_nodes(
connector=connector,
group_id=group_id,
forgetting_threshold=forgetting_threshold,
min_days_since_access=int(min_days),
limit=20
)
api_logger.info(f"成功获取 {len(pending_nodes)} 个待遗忘节点")
except Exception as e:
api_logger.error(f"获取待遗忘节点失败: {str(e)}")
# 失败时返回空列表,不影响主流程
# 构建统计信息
stats = { stats = {
'activation_metrics': activation_metrics, 'activation_metrics': activation_metrics,
'node_distribution': node_distribution, 'node_distribution': node_distribution,
'consistency_check': None, # 不再提供一致性检查 'recent_trends': recent_trends,
'nodes_merged_total': 0, # 不再跟踪累计融合数 'pending_nodes': pending_nodes,
'recent_cycles': [], # 不再提供历史记录 'timestamp': int(datetime.now().timestamp())
'timestamp': datetime.now().isoformat()
} }
api_logger.info( api_logger.info(
f"成功获取遗忘引擎统计: total_nodes={stats['activation_metrics']['total_nodes']}, " f"成功获取遗忘引擎统计: total_nodes={stats['activation_metrics']['total_nodes']}, "
f"low_activation_nodes={stats['activation_metrics']['low_activation_nodes']}" f"low_activation_nodes={stats['activation_metrics']['low_activation_nodes']}, "
f"trend_days={len(recent_trends)}, pending_nodes={len(pending_nodes)}"
) )
return stats return stats

View File

@@ -89,11 +89,15 @@ class DataConfigService: # 数据配置服务类PostgreSQL
value = item[field] value = item[field]
dt = None dt = None
# 如果是 datetime 对象,直接使用 # 处理不同类型的时间值
if isinstance(value, datetime): if hasattr(value, 'to_native'):
# Neo4j DateTime 对象
dt = value.to_native()
elif isinstance(value, datetime):
# Python datetime 对象
dt = value dt = value
# 如果是字符串,先解析
elif isinstance(value, str): elif isinstance(value, str):
# 字符串格式
try: try:
dt = datetime.fromisoformat(value.replace('Z', '+00:00')) dt = datetime.fromisoformat(value.replace('Z', '+00:00'))
except Exception: except Exception:

View File

@@ -160,11 +160,20 @@ class MemoryInsightHelper:
month_counts = Counter() month_counts = Counter()
valid_dates_count = 0 valid_dates_count = 0
for record in records: for record in records:
creation_time_str = record.get("creation_time") creation_time = record.get("creation_time")
if not creation_time_str: if not creation_time:
continue continue
try: try:
dt_object = datetime.fromisoformat(creation_time_str.replace("Z", "+00:00")) # 处理 Neo4j DateTime 对象或字符串
if hasattr(creation_time, 'to_native'):
dt_object = creation_time.to_native()
elif isinstance(creation_time, str):
dt_object = datetime.fromisoformat(creation_time.replace("Z", "+00:00"))
elif isinstance(creation_time, datetime):
dt_object = creation_time
else:
dt_object = datetime.fromisoformat(str(creation_time).replace("Z", "+00:00"))
month_counts[dt_object.month] += 1 month_counts[dt_object.month] += 1
valid_dates_count += 1 valid_dates_count += 1
except (ValueError, TypeError, AttributeError): except (ValueError, TypeError, AttributeError):
@@ -225,8 +234,33 @@ class MemoryInsightHelper:
) )
start_year, end_year = "N/A", "N/A" start_year, end_year = "N/A", "N/A"
if time_records and time_records[0]["start_time"]: if time_records and time_records[0]["start_time"]:
start_year = datetime.fromisoformat(time_records[0]["start_time"].replace("Z", "+00:00")).year start_time = time_records[0]["start_time"]
end_year = datetime.fromisoformat(time_records[0]["end_time"].replace("Z", "+00:00")).year end_time = time_records[0]["end_time"]
# 处理 Neo4j DateTime 对象或字符串
try:
if hasattr(start_time, 'to_native'):
start_year = start_time.to_native().year
elif isinstance(start_time, str):
start_year = datetime.fromisoformat(start_time.replace("Z", "+00:00")).year
elif isinstance(start_time, datetime):
start_year = start_time.year
else:
start_year = datetime.fromisoformat(str(start_time).replace("Z", "+00:00")).year
except Exception:
start_year = "N/A"
try:
if hasattr(end_time, 'to_native'):
end_year = end_time.to_native().year
elif isinstance(end_time, str):
end_year = datetime.fromisoformat(end_time.replace("Z", "+00:00")).year
elif isinstance(end_time, datetime):
end_year = end_time.year
else:
end_year = datetime.fromisoformat(str(end_time).replace("Z", "+00:00")).year
except Exception:
end_year = "N/A"
return { return {
"user_id": most_connected_user, "user_id": most_connected_user,