[fix] Set the page for the nodes to be forgotten

This commit is contained in:
lanceyq
2026-03-30 13:45:17 +08:00
parent 8f216db353
commit 5703fc0cb4
3 changed files with 240 additions and 28 deletions

View File

@@ -203,29 +203,36 @@ class MemoryForgetService:
connector: Neo4jConnector,
end_user_id: str,
forgetting_threshold: float,
min_days_since_access: int
) -> list[Dict[str, Any]]:
min_days_since_access: int,
page: Optional[int] = None,
pagesize: Optional[int] = None
) -> Dict[str, Any]:
"""
获取待遗忘节点列表
查询满足遗忘条件的节点(激活值低于阈值且最后访问时间超过最小天数)
查询满足遗忘条件的节点(激活值低于阈值且最后访问时间超过最小天数)。支持分页查询。
Args:
connector: Neo4j 连接器
end_user_id: 组ID
forgetting_threshold: 遗忘阈值
min_days_since_access: 最小未访问天数
page: 页码可选从1开始
pagesize: 每页数量(可选)
Returns:
list: 待遗忘节点列表
dict: 包含待遗忘节点列表和分页信息的字典
- items: 待遗忘节点列表
- page: 分页信息(分页时)
"""
from datetime import timedelta
# 计算最小访问时间ISO 8601 格式字符串,使用 UTC 时区)
min_access_time = datetime.now(timezone.utc) - timedelta(days=min_days_since_access)
min_access_time_str = min_access_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
query = """
# 基础查询(用于获取总数)
count_query = """
MATCH (n)
WHERE (n:Statement OR n:ExtractedEntity OR n:MemorySummary)
AND n.end_user_id = $end_user_id
@@ -233,10 +240,22 @@ class MemoryForgetService:
AND n.activation_value < $threshold
AND n.last_access_time IS NOT NULL
AND datetime(n.last_access_time) < datetime($min_access_time_str)
RETURN
RETURN count(n) as total
"""
# 数据查询
data_query = """
MATCH (n)
WHERE (n:Statement OR n:ExtractedEntity OR n:MemorySummary)
AND n.end_user_id = $end_user_id
AND n.activation_value IS NOT NULL
AND n.activation_value < $threshold
AND n.last_access_time IS NOT NULL
AND datetime(n.last_access_time) < datetime($min_access_time_str)
RETURN
elementId(n) as node_id,
labels(n)[0] as node_type,
CASE
CASE
WHEN n:Statement THEN n.statement
WHEN n:ExtractedEntity THEN n.name
WHEN n:MemorySummary THEN n.content
@@ -246,15 +265,31 @@ class MemoryForgetService:
n.last_access_time as last_access_time
ORDER BY n.activation_value ASC
"""
# 如果启用分页,添加 SKIP 和 LIMIT
if page is not None and pagesize is not None and page > 0 and pagesize > 0:
data_query += " SKIP $skip LIMIT $limit"
params = {
'end_user_id': end_user_id,
'threshold': forgetting_threshold,
'min_access_time_str': min_access_time_str
}
results = await connector.execute_query(query, **params)
# 获取总数(分页时需要)
total = 0
if page is not None and pagesize is not None and page > 0 and pagesize > 0:
count_results = await connector.execute_query(count_query, **params)
if count_results:
total = count_results[0]['total']
# 添加分页参数
if page is not None and pagesize is not None and page > 0 and pagesize > 0:
params['skip'] = (page - 1) * pagesize
params['limit'] = pagesize
results = await connector.execute_query(data_query, **params)
pending_nodes = []
for result in results:
# 将节点类型标签转换为小写
@@ -263,7 +298,7 @@ class MemoryForgetService:
node_type_label = 'entity'
elif node_type_label == 'memorysummary':
node_type_label = 'summary'
# 将 Neo4j DateTime 对象转换为时间戳(毫秒)
last_access_time = result['last_access_time']
last_access_dt = convert_neo4j_datetime_to_python(last_access_time)
@@ -274,7 +309,7 @@ class MemoryForgetService:
last_access_timestamp = int(last_access_dt.timestamp() * 1000)
else:
last_access_timestamp = 0
pending_nodes.append({
'node_id': str(result['node_id']),
'node_type': node_type_label,
@@ -282,8 +317,20 @@ class MemoryForgetService:
'activation_value': result['activation_value'],
'last_access_time': last_access_timestamp
})
return pending_nodes
# 构建返回结果
result: Dict[str, Any] = {'items': pending_nodes}
# 如果启用分页,添加分页信息
if page is not None and pagesize is not None and page > 0 and pagesize > 0:
result['page'] = {
'page': page,
'pagesize': pagesize,
'total': total,
'hasnext': (page * pagesize) < total
}
return result
async def trigger_forgetting_cycle(
self,
@@ -656,24 +703,79 @@ class MemoryForgetService:
except Exception as e:
api_logger.error(f"获取待遗忘节点失败: {str(e)}")
# 失败时返回空列表,不影响主流程
# 构建统计信息
# 构建统计信息(不包含 pending_nodes已分离到独立接口
stats = {
'activation_metrics': activation_metrics,
'node_distribution': node_distribution,
'recent_trends': recent_trends,
'pending_nodes': pending_nodes,
'timestamp': int(datetime.now().timestamp() * 1000)
}
api_logger.info(
f"成功获取遗忘引擎统计: total_nodes={stats['activation_metrics']['total_nodes']}, "
f"low_activation_nodes={stats['activation_metrics']['low_activation_nodes']}, "
f"trend_days={len(recent_trends)}, pending_nodes={len(pending_nodes)}"
f"trend_days={len(recent_trends)}"
)
return stats
async def get_pending_nodes(
self,
db: Session,
end_user_id: str,
config_id: Optional[UUID] = None,
page: int = 1,
pagesize: int = 10
) -> Dict[str, Any]:
"""
获取待遗忘节点列表(独立分页接口)
查询满足遗忘条件的节点(激活值低于阈值且最后访问时间超过最小天数)。
Args:
db: 数据库会话
end_user_id: 组ID必填
config_id: 配置ID可选用于获取遗忘阈值
page: 页码从1开始默认1
pagesize: 每页数量默认10
Returns:
dict: 包含待遗忘节点列表和分页信息的字典
- items: 待遗忘节点列表
- page: 分页信息
"""
# 获取遗忘引擎组件
_, _, forgetting_scheduler, config = await self._get_forgetting_components(db, config_id)
connector = forgetting_scheduler.connector
forgetting_threshold = config['forgetting_threshold']
# 验证 min_days_since_access 配置值
min_days = config.get('min_days_since_access')
if min_days is None or not isinstance(min_days, (int, float)) or min_days < 0:
api_logger.warning(
f"min_days_since_access 配置无效: {min_days}, 使用默认值 7"
)
min_days = 7
# 调用内部方法获取分页数据
pending_nodes_result = await self._get_pending_forgetting_nodes(
connector=connector,
end_user_id=end_user_id,
forgetting_threshold=forgetting_threshold,
min_days_since_access=int(min_days),
page=page,
pagesize=pagesize
)
api_logger.info(
f"成功获取待遗忘节点列表: end_user_id={end_user_id}, "
f"page={page}, pagesize={pagesize}, total={pending_nodes_result.get('page', {}).get('total', 0)}"
)
return pending_nodes_result
async def get_forgetting_curve(
self,
db: Session,