From ac7c891ded676ebd90f2f49d84f2ac8989d21d05 Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Thu, 26 Mar 2026 20:44:55 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B4=BB=E5=8A=A8=E7=BB=9F=E8=AE=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/app/cache/memory/activity_stats_cache.py | 210 +++++++++--------- .../core/memory/agent/utils/write_tools.py | 38 ++-- 2 files changed, 124 insertions(+), 124 deletions(-) diff --git a/api/app/cache/memory/activity_stats_cache.py b/api/app/cache/memory/activity_stats_cache.py index 6b162cdd..35c702b1 100644 --- a/api/app/cache/memory/activity_stats_cache.py +++ b/api/app/cache/memory/activity_stats_cache.py @@ -1,124 +1,124 @@ -""" -Recent Activity Stats Cache +# """ +# Recent Activity Stats Cache -记忆提取活动统计缓存模块 -用于缓存每次记忆提取流程的统计数据,按 workspace_id 存储,24小时后释放 -查询命令:cache:memory:activity_stats:by_workspace:7de31a97-40a6-4fc0-b8d3-15c89f523843 -""" -import json -import logging -from typing import Optional, Dict, Any -from datetime import datetime +# 记忆提取活动统计缓存模块 +# 用于缓存每次记忆提取流程的统计数据,按 workspace_id 存储,24小时后释放 +# 查询命令:cache:memory:activity_stats:by_workspace:7de31a97-40a6-4fc0-b8d3-15c89f523843 +# """ +# import json +# import logging +# from typing import Optional, Dict, Any +# from datetime import datetime -from app.aioRedis import aio_redis +# from app.aioRedis import aio_redis -logger = logging.getLogger(__name__) +# logger = logging.getLogger(__name__) -# 缓存过期时间:24小时 -ACTIVITY_STATS_CACHE_EXPIRE = 86400 +# # 缓存过期时间:24小时 +# ACTIVITY_STATS_CACHE_EXPIRE = 86400 -class ActivityStatsCache: - """记忆提取活动统计缓存类""" +# class ActivityStatsCache: +# """记忆提取活动统计缓存类""" - PREFIX = "cache:memory:activity_stats" +# PREFIX = "cache:memory:activity_stats" - @classmethod - def _get_key(cls, workspace_id: str) -> str: - """生成 Redis key +# @classmethod +# def _get_key(cls, workspace_id: str) -> str: +# """生成 Redis key - Args: - workspace_id: 工作空间ID +# Args: +# workspace_id: 工作空间ID - Returns: - 完整的 Redis key - """ - return f"{cls.PREFIX}:by_workspace:{workspace_id}" +# Returns: +# 完整的 Redis key +# """ +# return f"{cls.PREFIX}:by_workspace:{workspace_id}" - @classmethod - async def set_activity_stats( - cls, - workspace_id: str, - stats: Dict[str, Any], - expire: int = ACTIVITY_STATS_CACHE_EXPIRE, - ) -> bool: - """设置记忆提取活动统计缓存 +# @classmethod +# async def set_activity_stats( +# cls, +# workspace_id: str, +# stats: Dict[str, Any], +# expire: int = ACTIVITY_STATS_CACHE_EXPIRE, +# ) -> bool: +# """设置记忆提取活动统计缓存 - Args: - workspace_id: 工作空间ID - stats: 统计数据,格式: - { - "chunk_count": int, - "statements_count": int, - "triplet_entities_count": int, - "triplet_relations_count": int, - "temporal_count": int, - } - expire: 过期时间(秒),默认24小时 +# Args: +# workspace_id: 工作空间ID +# stats: 统计数据,格式: +# { +# "chunk_count": int, +# "statements_count": int, +# "triplet_entities_count": int, +# "triplet_relations_count": int, +# "temporal_count": int, +# } +# expire: 过期时间(秒),默认24小时 - Returns: - 是否设置成功 - """ - try: - key = cls._get_key(workspace_id) - payload = { - "stats": stats, - "generated_at": datetime.now().isoformat(), - "workspace_id": workspace_id, - "cached": True, - } - value = json.dumps(payload, ensure_ascii=False) - await aio_redis.set(key, value, ex=expire) - logger.info(f"设置活动统计缓存成功: {key}, 过期时间: {expire}秒") - return True - except Exception as e: - logger.error(f"设置活动统计缓存失败: {e}", exc_info=True) - return False +# Returns: +# 是否设置成功 +# """ +# try: +# key = cls._get_key(workspace_id) +# payload = { +# "stats": stats, +# "generated_at": datetime.now().isoformat(), +# "workspace_id": workspace_id, +# "cached": True, +# } +# value = json.dumps(payload, ensure_ascii=False) +# await aio_redis.set(key, value, ex=expire) +# logger.info(f"设置活动统计缓存成功: {key}, 过期时间: {expire}秒") +# return True +# except Exception as e: +# logger.error(f"设置活动统计缓存失败: {e}", exc_info=True) +# return False - @classmethod - async def get_activity_stats( - cls, - workspace_id: str, - ) -> Optional[Dict[str, Any]]: - """获取记忆提取活动统计缓存 +# @classmethod +# async def get_activity_stats( +# cls, +# workspace_id: str, +# ) -> Optional[Dict[str, Any]]: +# """获取记忆提取活动统计缓存 - Args: - workspace_id: 工作空间ID +# Args: +# workspace_id: 工作空间ID - Returns: - 统计数据字典,缓存不存在或已过期返回 None - """ - try: - key = cls._get_key(workspace_id) - value = await aio_redis.get(key) - if value: - payload = json.loads(value) - logger.info(f"命中活动统计缓存: {key}") - return payload - logger.info(f"活动统计缓存不存在或已过期: {key}") - return None - except Exception as e: - logger.error(f"获取活动统计缓存失败: {e}", exc_info=True) - return None +# Returns: +# 统计数据字典,缓存不存在或已过期返回 None +# """ +# try: +# key = cls._get_key(workspace_id) +# value = await aio_redis.get(key) +# if value: +# payload = json.loads(value) +# logger.info(f"命中活动统计缓存: {key}") +# return payload +# logger.info(f"活动统计缓存不存在或已过期: {key}") +# return None +# except Exception as e: +# logger.error(f"获取活动统计缓存失败: {e}", exc_info=True) +# return None - @classmethod - async def delete_activity_stats( - cls, - workspace_id: str, - ) -> bool: - """删除记忆提取活动统计缓存 +# @classmethod +# async def delete_activity_stats( +# cls, +# workspace_id: str, +# ) -> bool: +# """删除记忆提取活动统计缓存 - Args: - workspace_id: 工作空间ID +# Args: +# workspace_id: 工作空间ID - Returns: - 是否删除成功 - """ - try: - key = cls._get_key(workspace_id) - result = await aio_redis.delete(key) - logger.info(f"删除活动统计缓存: {key}, 结果: {result}") - return result > 0 - except Exception as e: - logger.error(f"删除活动统计缓存失败: {e}", exc_info=True) - return False +# Returns: +# 是否删除成功 +# """ +# try: +# key = cls._get_key(workspace_id) +# result = await aio_redis.delete(key) +# logger.info(f"删除活动统计缓存: {key}, 结果: {result}") +# return result > 0 +# except Exception as e: +# logger.error(f"删除活动统计缓存失败: {e}", exc_info=True) +# return False diff --git a/api/app/core/memory/agent/utils/write_tools.py b/api/app/core/memory/agent/utils/write_tools.py index 55bcb8ba..c01a36d1 100644 --- a/api/app/core/memory/agent/utils/write_tools.py +++ b/api/app/core/memory/agent/utils/write_tools.py @@ -260,24 +260,24 @@ async def write( with open(log_file, "a", encoding="utf-8") as f: f.write(f"=== Pipeline Run Completed: {timestamp} ===\n\n") - # 将提取统计写入 Redis,按 workspace_id 存储 - try: - from app.cache.memory.activity_stats_cache import ActivityStatsCache + # # 将提取统计写入 Redis,按 workspace_id 存储 + # try: + # from app.cache.memory.activity_stats_cache import ActivityStatsCache - stats_to_cache = { - "chunk_count": len(all_chunk_nodes) if all_chunk_nodes else 0, - "statements_count": len(all_statement_nodes) if all_statement_nodes else 0, - "triplet_entities_count": len(all_entity_nodes) if all_entity_nodes else 0, - "triplet_relations_count": len(all_entity_entity_edges) if all_entity_entity_edges else 0, - "temporal_count": 0, - } - await ActivityStatsCache.set_activity_stats( - workspace_id=str(memory_config.workspace_id), - stats=stats_to_cache, - ) - logger.info(f"[WRITE] 活动统计已写入 Redis: workspace_id={memory_config.workspace_id}") - except Exception as cache_err: - logger.warning(f"[WRITE] 写入活动统计缓存失败(不影响主流程): {cache_err}", exc_info=True) + # stats_to_cache = { + # "chunk_count": len(all_chunk_nodes) if all_chunk_nodes else 0, + # "statements_count": len(all_statement_nodes) if all_statement_nodes else 0, + # "triplet_entities_count": len(all_entity_nodes) if all_entity_nodes else 0, + # "triplet_relations_count": len(all_entity_entity_edges) if all_entity_entity_edges else 0, + # "temporal_count": 0, + # } + # await ActivityStatsCache.set_activity_stats( + # workspace_id=str(memory_config.workspace_id), + # stats=stats_to_cache, + # ) + # logger.info(f"[WRITE] 活动统计已写入 Redis: workspace_id={memory_config.workspace_id}") + # except Exception as cache_err: + # logger.warning(f"[WRITE] 写入活动统计缓存失败(不影响主流程): {cache_err}", exc_info=True) - logger.info("=== Pipeline Complete ===") - logger.info(f"Total execution time: {total_time:.2f} seconds") + # logger.info("=== Pipeline Complete ===") + # logger.info(f"Total execution time: {total_time:.2f} seconds")