diff --git a/api/app/cache/memory/__init__.py b/api/app/cache/memory/__init__.py index 9a7fd225..551062ac 100644 --- a/api/app/cache/memory/__init__.py +++ b/api/app/cache/memory/__init__.py @@ -4,7 +4,9 @@ Memory 缓存模块 提供记忆系统相关的缓存功能 """ from .interest_memory import InterestMemoryCache +from .activity_stats_cache import ActivityStatsCache __all__ = [ "InterestMemoryCache", + "ActivityStatsCache", ] diff --git a/api/app/cache/memory/activity_stats_cache.py b/api/app/cache/memory/activity_stats_cache.py new file mode 100644 index 00000000..6b162cdd --- /dev/null +++ b/api/app/cache/memory/activity_stats_cache.py @@ -0,0 +1,124 @@ +""" +Recent Activity Stats Cache + +记忆提取活动统计缓存模块 +用于缓存每次记忆提取流程的统计数据,按 workspace_id 存储,24小时后释放 +查询命令:cache:memory:activity_stats:by_workspace:7de31a97-40a6-4fc0-b8d3-15c89f523843 +""" +import json +import logging +from typing import Optional, Dict, Any +from datetime import datetime + +from app.aioRedis import aio_redis + +logger = logging.getLogger(__name__) + +# 缓存过期时间:24小时 +ACTIVITY_STATS_CACHE_EXPIRE = 86400 + + +class ActivityStatsCache: + """记忆提取活动统计缓存类""" + + PREFIX = "cache:memory:activity_stats" + + @classmethod + def _get_key(cls, workspace_id: str) -> str: + """生成 Redis key + + Args: + workspace_id: 工作空间ID + + Returns: + 完整的 Redis key + """ + return f"{cls.PREFIX}:by_workspace:{workspace_id}" + + @classmethod + async def set_activity_stats( + cls, + workspace_id: str, + stats: Dict[str, Any], + expire: int = ACTIVITY_STATS_CACHE_EXPIRE, + ) -> bool: + """设置记忆提取活动统计缓存 + + Args: + workspace_id: 工作空间ID + stats: 统计数据,格式: + { + "chunk_count": int, + "statements_count": int, + "triplet_entities_count": int, + "triplet_relations_count": int, + "temporal_count": int, + } + expire: 过期时间(秒),默认24小时 + + Returns: + 是否设置成功 + """ + try: + key = cls._get_key(workspace_id) + payload = { + "stats": stats, + "generated_at": datetime.now().isoformat(), + "workspace_id": workspace_id, + "cached": True, + } + value = json.dumps(payload, ensure_ascii=False) + await aio_redis.set(key, value, ex=expire) + logger.info(f"设置活动统计缓存成功: {key}, 过期时间: {expire}秒") + return True + except Exception as e: + logger.error(f"设置活动统计缓存失败: {e}", exc_info=True) + return False + + @classmethod + async def get_activity_stats( + cls, + workspace_id: str, + ) -> Optional[Dict[str, Any]]: + """获取记忆提取活动统计缓存 + + Args: + workspace_id: 工作空间ID + + Returns: + 统计数据字典,缓存不存在或已过期返回 None + """ + try: + key = cls._get_key(workspace_id) + value = await aio_redis.get(key) + if value: + payload = json.loads(value) + logger.info(f"命中活动统计缓存: {key}") + return payload + logger.info(f"活动统计缓存不存在或已过期: {key}") + return None + except Exception as e: + logger.error(f"获取活动统计缓存失败: {e}", exc_info=True) + return None + + @classmethod + async def delete_activity_stats( + cls, + workspace_id: str, + ) -> bool: + """删除记忆提取活动统计缓存 + + Args: + workspace_id: 工作空间ID + + Returns: + 是否删除成功 + """ + try: + key = cls._get_key(workspace_id) + result = await aio_redis.delete(key) + logger.info(f"删除活动统计缓存: {key}, 结果: {result}") + return result > 0 + except Exception as e: + logger.error(f"删除活动统计缓存失败: {e}", exc_info=True) + return False diff --git a/api/app/controllers/memory_storage_controller.py b/api/app/controllers/memory_storage_controller.py index ee45fb83..d91dfc36 100644 --- a/api/app/controllers/memory_storage_controller.py +++ b/api/app/controllers/memory_storage_controller.py @@ -544,10 +544,11 @@ async def clear_hot_memory_tags_cache( @router.get("/analytics/recent_activity_stats", response_model=ApiResponse) async def get_recent_activity_stats_api( current_user: User = Depends(get_current_user), - ) -> dict: - api_logger.info("Recent activity stats requested") +) -> dict: + workspace_id = str(current_user.current_workspace_id) if current_user.current_workspace_id else None + api_logger.info(f"Recent activity stats requested: workspace_id={workspace_id}") try: - result = await analytics_recent_activity_stats() + result = await analytics_recent_activity_stats(workspace_id=workspace_id) return success(data=result, msg="查询成功") except Exception as e: api_logger.error(f"Recent activity stats failed: {str(e)}") diff --git a/api/app/core/memory/agent/utils/write_tools.py b/api/app/core/memory/agent/utils/write_tools.py index 93c6ef6f..22030278 100644 --- a/api/app/core/memory/agent/utils/write_tools.py +++ b/api/app/core/memory/agent/utils/write_tools.py @@ -225,5 +225,24 @@ async def write( with open(log_file, "a", encoding="utf-8") as f: f.write(f"=== Pipeline Run Completed: {timestamp} ===\n\n") + # 将提取统计写入 Redis,按 workspace_id 存储 + try: + from app.cache.memory.activity_stats_cache import ActivityStatsCache + + stats_to_cache = { + "chunk_count": len(all_chunk_nodes) if all_chunk_nodes else 0, + "statements_count": len(all_statement_nodes) if all_statement_nodes else 0, + "triplet_entities_count": len(all_entity_nodes) if all_entity_nodes else 0, + "triplet_relations_count": len(all_entity_entity_edges) if all_entity_entity_edges else 0, + "temporal_count": 0, + } + await ActivityStatsCache.set_activity_stats( + workspace_id=str(memory_config.workspace_id), + stats=stats_to_cache, + ) + logger.info(f"[WRITE] 活动统计已写入 Redis: workspace_id={memory_config.workspace_id}") + except Exception as cache_err: + logger.warning(f"[WRITE] 写入活动统计缓存失败(不影响主流程): {cache_err}", exc_info=True) + logger.info("=== Pipeline Complete ===") logger.info(f"Total execution time: {total_time:.2f} seconds") \ No newline at end of file diff --git a/api/app/services/memory_storage_service.py b/api/app/services/memory_storage_service.py index a83d6830..6e7c1ad4 100644 --- a/api/app/services/memory_storage_service.py +++ b/api/app/services/memory_storage_service.py @@ -783,8 +783,37 @@ async def analytics_hot_memory_tags( await connector.close() -async def analytics_recent_activity_stats() -> Dict[str, Any]: - stats, _msg = get_recent_activity_stats() +async def analytics_recent_activity_stats(workspace_id: Optional[str] = None) -> Dict[str, Any]: + """获取最近记忆提取活动统计。 + + 优先从 Redis 缓存读取(按 workspace_id),缓存不存在时降级到日志文件解析。 + + Args: + workspace_id: 工作空间ID,用于从 Redis 读取对应缓存 + + Returns: + 包含 total、stats、latest_relative、source 的统计字典 + """ + stats = None + source = "log" + + # 优先从 Redis 读取 + if workspace_id: + try: + from app.cache.memory.activity_stats_cache import ActivityStatsCache + cached = await ActivityStatsCache.get_activity_stats(workspace_id) + if cached: + stats = cached.get("stats", {}) + source = "redis" + logger.info(f"[ANALYTICS] 从 Redis 读取活动统计: workspace_id={workspace_id}") + except Exception as e: + logger.warning(f"[ANALYTICS] 读取 Redis 活动统计失败,降级到日志: {e}") + + # 降级:从日志文件解析 + if stats is None: + stats, _msg = get_recent_activity_stats() + source = "log" + total = ( stats.get("chunk_count", 0) + stats.get("statements_count", 0) @@ -792,26 +821,29 @@ async def analytics_recent_activity_stats() -> Dict[str, Any]: + stats.get("triplet_relations_count", 0) + stats.get("temporal_count", 0) ) - # 精简:仅提供“最新一次活动多久前” - latest_relative = None - try: - info = stats.get("log_path", "") - idx = info.rfind("最新:") - if idx != -1: - latest_path = info[idx + 3 :].strip() - if latest_path and os.path.exists(latest_path): - import time - diff = max(0.0, time.time() - os.path.getmtime(latest_path)) - m = int(diff // 60) - if m < 1: - latest_relative = "刚刚" - elif m < 60: - latest_relative = "一会前" - else: - latest_relative = "较早前" - except Exception: - pass - data = {"total": total, "stats": stats, "latest_relative": latest_relative} + # 计算"最新一次活动多久前"(仅日志来源时有效) + latest_relative = None + if source == "log": + try: + info = stats.get("log_path", "") + idx = info.rfind("最新:") + if idx != -1: + latest_path = info[idx + 3:].strip() + if latest_path and os.path.exists(latest_path): + import time + diff = max(0.0, time.time() - os.path.getmtime(latest_path)) + m = int(diff // 60) + if m < 1: + latest_relative = "刚刚" + elif m < 60: + latest_relative = "一会前" + else: + latest_relative = "较早前" + except Exception: + pass + + data = {"total": total, "stats": stats, "latest_relative": latest_relative, "source": source} return data + diff --git a/api/app/services/pilot_run_service.py b/api/app/services/pilot_run_service.py index 4d9cbb5e..5d00d8a5 100644 --- a/api/app/services/pilot_run_service.py +++ b/api/app/services/pilot_run_service.py @@ -326,6 +326,25 @@ async def run_pilot_extraction( logger.info("Pilot run completed: Skipping Neo4j save") + # 将提取统计写入 Redis,按 workspace_id 存储 + try: + from app.cache.memory.activity_stats_cache import ActivityStatsCache + + stats_to_cache = { + "chunk_count": len(chunk_nodes) if chunk_nodes else 0, + "statements_count": len(statement_nodes) if statement_nodes else 0, + "triplet_entities_count": len(entity_nodes) if entity_nodes else 0, + "triplet_relations_count": len(entity_edges) if entity_edges else 0, + "temporal_count": 0, # temporal 数据在日志中,此处暂置0 + } + await ActivityStatsCache.set_activity_stats( + workspace_id=str(memory_config.workspace_id), + stats=stats_to_cache, + ) + logger.info(f"[PILOT_RUN] 活动统计已写入 Redis: workspace_id={memory_config.workspace_id}") + except Exception as cache_err: + logger.warning(f"[PILOT_RUN] 写入活动统计缓存失败(不影响主流程): {cache_err}", exc_info=True) + except Exception as e: logger.error(f"Pilot run failed: {e}", exc_info=True) raise