125 lines
3.6 KiB
Python
125 lines
3.6 KiB
Python
"""
|
||
Recent Activity Stats Cache
|
||
|
||
记忆提取活动统计缓存模块
|
||
用于缓存每次记忆提取流程的统计数据,按 workspace_id 存储,24小时后释放
|
||
查询命令:cache:memory:activity_stats:by_workspace:7de31a97-40a6-4fc0-b8d3-15c89f523843
|
||
"""
|
||
import json
|
||
import logging
|
||
from typing import Optional, Dict, Any
|
||
from datetime import datetime
|
||
|
||
from app.aioRedis import aio_redis
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# 缓存过期时间:24小时
|
||
ACTIVITY_STATS_CACHE_EXPIRE = 86400
|
||
|
||
|
||
class ActivityStatsCache:
|
||
"""记忆提取活动统计缓存类"""
|
||
|
||
PREFIX = "cache:memory:activity_stats"
|
||
|
||
@classmethod
|
||
def _get_key(cls, workspace_id: str) -> str:
|
||
"""生成 Redis key
|
||
|
||
Args:
|
||
workspace_id: 工作空间ID
|
||
|
||
Returns:
|
||
完整的 Redis key
|
||
"""
|
||
return f"{cls.PREFIX}:by_workspace:{workspace_id}"
|
||
|
||
@classmethod
|
||
async def set_activity_stats(
|
||
cls,
|
||
workspace_id: str,
|
||
stats: Dict[str, Any],
|
||
expire: int = ACTIVITY_STATS_CACHE_EXPIRE,
|
||
) -> bool:
|
||
"""设置记忆提取活动统计缓存
|
||
|
||
Args:
|
||
workspace_id: 工作空间ID
|
||
stats: 统计数据,格式:
|
||
{
|
||
"chunk_count": int,
|
||
"statements_count": int,
|
||
"triplet_entities_count": int,
|
||
"triplet_relations_count": int,
|
||
"temporal_count": int,
|
||
}
|
||
expire: 过期时间(秒),默认24小时
|
||
|
||
Returns:
|
||
是否设置成功
|
||
"""
|
||
try:
|
||
key = cls._get_key(workspace_id)
|
||
payload = {
|
||
"stats": stats,
|
||
"generated_at": datetime.now().isoformat(),
|
||
"workspace_id": workspace_id,
|
||
"cached": True,
|
||
}
|
||
value = json.dumps(payload, ensure_ascii=False)
|
||
await aio_redis.set(key, value, ex=expire)
|
||
logger.info(f"设置活动统计缓存成功: {key}, 过期时间: {expire}秒")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"设置活动统计缓存失败: {e}", exc_info=True)
|
||
return False
|
||
|
||
@classmethod
|
||
async def get_activity_stats(
|
||
cls,
|
||
workspace_id: str,
|
||
) -> Optional[Dict[str, Any]]:
|
||
"""获取记忆提取活动统计缓存
|
||
|
||
Args:
|
||
workspace_id: 工作空间ID
|
||
|
||
Returns:
|
||
统计数据字典,缓存不存在或已过期返回 None
|
||
"""
|
||
try:
|
||
key = cls._get_key(workspace_id)
|
||
value = await aio_redis.get(key)
|
||
if value:
|
||
payload = json.loads(value)
|
||
logger.info(f"命中活动统计缓存: {key}")
|
||
return payload
|
||
logger.info(f"活动统计缓存不存在或已过期: {key}")
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"获取活动统计缓存失败: {e}", exc_info=True)
|
||
return None
|
||
|
||
@classmethod
|
||
async def delete_activity_stats(
|
||
cls,
|
||
workspace_id: str,
|
||
) -> bool:
|
||
"""删除记忆提取活动统计缓存
|
||
|
||
Args:
|
||
workspace_id: 工作空间ID
|
||
|
||
Returns:
|
||
是否删除成功
|
||
"""
|
||
try:
|
||
key = cls._get_key(workspace_id)
|
||
result = await aio_redis.delete(key)
|
||
logger.info(f"删除活动统计缓存: {key}, 结果: {result}")
|
||
return result > 0
|
||
except Exception as e:
|
||
logger.error(f"删除活动统计缓存失败: {e}", exc_info=True)
|
||
return False
|