""" Recent Activity Stats Cache 记忆提取活动统计缓存模块 用于缓存每次记忆提取流程的统计数据,按 workspace_id 存储,24小时后释放 查询命令:cache:memory:activity_stats:by_workspace:7de31a97-40a6-4fc0-b8d3-15c89f523843 """ import json import logging from typing import Optional, Dict, Any from datetime import datetime from app.aioRedis import get_thread_safe_redis logger = logging.getLogger(__name__) # 缓存过期时间:24小时 ACTIVITY_STATS_CACHE_EXPIRE = 86400 class ActivityStatsCache: """记忆提取活动统计缓存类""" PREFIX = "cache:memory:activity_stats" @classmethod def _get_key(cls, workspace_id: str) -> str: """生成 Redis key Args: workspace_id: 工作空间ID Returns: 完整的 Redis key """ return f"{cls.PREFIX}:by_workspace:{workspace_id}" @classmethod async def set_activity_stats( cls, workspace_id: str, stats: Dict[str, Any], expire: int = ACTIVITY_STATS_CACHE_EXPIRE, ) -> bool: """设置记忆提取活动统计缓存 Args: workspace_id: 工作空间ID stats: 统计数据,格式: { "chunk_count": int, "statements_count": int, "triplet_entities_count": int, "triplet_relations_count": int, "temporal_count": int, } expire: 过期时间(秒),默认24小时 Returns: 是否设置成功 """ try: key = cls._get_key(workspace_id) payload = { "stats": stats, "generated_at": datetime.now().isoformat(), "workspace_id": workspace_id, "cached": True, } value = json.dumps(payload, ensure_ascii=False) await get_thread_safe_redis().set(key, value, ex=expire) logger.info(f"设置活动统计缓存成功: {key}, 过期时间: {expire}秒") return True except Exception as e: logger.error(f"设置活动统计缓存失败: {e}", exc_info=True) return False @classmethod async def get_activity_stats( cls, workspace_id: str, ) -> Optional[Dict[str, Any]]: """获取记忆提取活动统计缓存 Args: workspace_id: 工作空间ID Returns: 统计数据字典,缓存不存在或已过期返回 None """ try: key = cls._get_key(workspace_id) value = await get_thread_safe_redis().get(key) if value: payload = json.loads(value) logger.info(f"命中活动统计缓存: {key}") return payload logger.info(f"活动统计缓存不存在或已过期: {key}") return None except Exception as e: logger.error(f"获取活动统计缓存失败: {e}", exc_info=True) return None @classmethod async def delete_activity_stats( cls, workspace_id: str, ) -> bool: """删除记忆提取活动统计缓存 Args: workspace_id: 工作空间ID Returns: 是否删除成功 """ try: key = cls._get_key(workspace_id) result = await get_thread_safe_redis().delete(key) logger.info(f"删除活动统计缓存: {key}, 结果: {result}") return result > 0 except Exception as e: logger.error(f"删除活动统计缓存失败: {e}", exc_info=True) return False