Merge pull request #504 from SuanmoSuanyangTechnology/feature/activity-cache

[add] Recently, memory activities have adopted Redis caching.
This commit is contained in:
Mark
2026-03-06 18:48:26 +08:00
committed by GitHub
6 changed files with 222 additions and 25 deletions

View File

@@ -4,7 +4,9 @@ Memory 缓存模块
提供记忆系统相关的缓存功能
"""
from .interest_memory import InterestMemoryCache
from .activity_stats_cache import ActivityStatsCache
__all__ = [
"InterestMemoryCache",
"ActivityStatsCache",
]

View File

@@ -0,0 +1,124 @@
"""
Recent Activity Stats Cache
记忆提取活动统计缓存模块
用于缓存每次记忆提取流程的统计数据,按 workspace_id 存储24小时后释放
查询命令cache:memory:activity_stats:by_workspace:7de31a97-40a6-4fc0-b8d3-15c89f523843
"""
import json
import logging
from typing import Optional, Dict, Any
from datetime import datetime
from app.aioRedis import aio_redis
logger = logging.getLogger(__name__)
# 缓存过期时间24小时
ACTIVITY_STATS_CACHE_EXPIRE = 86400
class ActivityStatsCache:
"""记忆提取活动统计缓存类"""
PREFIX = "cache:memory:activity_stats"
@classmethod
def _get_key(cls, workspace_id: str) -> str:
"""生成 Redis key
Args:
workspace_id: 工作空间ID
Returns:
完整的 Redis key
"""
return f"{cls.PREFIX}:by_workspace:{workspace_id}"
@classmethod
async def set_activity_stats(
cls,
workspace_id: str,
stats: Dict[str, Any],
expire: int = ACTIVITY_STATS_CACHE_EXPIRE,
) -> bool:
"""设置记忆提取活动统计缓存
Args:
workspace_id: 工作空间ID
stats: 统计数据,格式:
{
"chunk_count": int,
"statements_count": int,
"triplet_entities_count": int,
"triplet_relations_count": int,
"temporal_count": int,
}
expire: 过期时间默认24小时
Returns:
是否设置成功
"""
try:
key = cls._get_key(workspace_id)
payload = {
"stats": stats,
"generated_at": datetime.now().isoformat(),
"workspace_id": workspace_id,
"cached": True,
}
value = json.dumps(payload, ensure_ascii=False)
await aio_redis.set(key, value, ex=expire)
logger.info(f"设置活动统计缓存成功: {key}, 过期时间: {expire}")
return True
except Exception as e:
logger.error(f"设置活动统计缓存失败: {e}", exc_info=True)
return False
@classmethod
async def get_activity_stats(
cls,
workspace_id: str,
) -> Optional[Dict[str, Any]]:
"""获取记忆提取活动统计缓存
Args:
workspace_id: 工作空间ID
Returns:
统计数据字典,缓存不存在或已过期返回 None
"""
try:
key = cls._get_key(workspace_id)
value = await aio_redis.get(key)
if value:
payload = json.loads(value)
logger.info(f"命中活动统计缓存: {key}")
return payload
logger.info(f"活动统计缓存不存在或已过期: {key}")
return None
except Exception as e:
logger.error(f"获取活动统计缓存失败: {e}", exc_info=True)
return None
@classmethod
async def delete_activity_stats(
cls,
workspace_id: str,
) -> bool:
"""删除记忆提取活动统计缓存
Args:
workspace_id: 工作空间ID
Returns:
是否删除成功
"""
try:
key = cls._get_key(workspace_id)
result = await aio_redis.delete(key)
logger.info(f"删除活动统计缓存: {key}, 结果: {result}")
return result > 0
except Exception as e:
logger.error(f"删除活动统计缓存失败: {e}", exc_info=True)
return False

View File

@@ -544,10 +544,11 @@ async def clear_hot_memory_tags_cache(
@router.get("/analytics/recent_activity_stats", response_model=ApiResponse)
async def get_recent_activity_stats_api(
current_user: User = Depends(get_current_user),
) -> dict:
api_logger.info("Recent activity stats requested")
) -> dict:
workspace_id = str(current_user.current_workspace_id) if current_user.current_workspace_id else None
api_logger.info(f"Recent activity stats requested: workspace_id={workspace_id}")
try:
result = await analytics_recent_activity_stats()
result = await analytics_recent_activity_stats(workspace_id=workspace_id)
return success(data=result, msg="查询成功")
except Exception as e:
api_logger.error(f"Recent activity stats failed: {str(e)}")

View File

@@ -225,5 +225,24 @@ async def write(
with open(log_file, "a", encoding="utf-8") as f:
f.write(f"=== Pipeline Run Completed: {timestamp} ===\n\n")
# 将提取统计写入 Redis按 workspace_id 存储
try:
from app.cache.memory.activity_stats_cache import ActivityStatsCache
stats_to_cache = {
"chunk_count": len(all_chunk_nodes) if all_chunk_nodes else 0,
"statements_count": len(all_statement_nodes) if all_statement_nodes else 0,
"triplet_entities_count": len(all_entity_nodes) if all_entity_nodes else 0,
"triplet_relations_count": len(all_entity_entity_edges) if all_entity_entity_edges else 0,
"temporal_count": 0,
}
await ActivityStatsCache.set_activity_stats(
workspace_id=str(memory_config.workspace_id),
stats=stats_to_cache,
)
logger.info(f"[WRITE] 活动统计已写入 Redis: workspace_id={memory_config.workspace_id}")
except Exception as cache_err:
logger.warning(f"[WRITE] 写入活动统计缓存失败(不影响主流程): {cache_err}", exc_info=True)
logger.info("=== Pipeline Complete ===")
logger.info(f"Total execution time: {total_time:.2f} seconds")

View File

@@ -783,8 +783,37 @@ async def analytics_hot_memory_tags(
await connector.close()
async def analytics_recent_activity_stats() -> Dict[str, Any]:
stats, _msg = get_recent_activity_stats()
async def analytics_recent_activity_stats(workspace_id: Optional[str] = None) -> Dict[str, Any]:
"""获取最近记忆提取活动统计。
优先从 Redis 缓存读取(按 workspace_id缓存不存在时降级到日志文件解析。
Args:
workspace_id: 工作空间ID用于从 Redis 读取对应缓存
Returns:
包含 total、stats、latest_relative、source 的统计字典
"""
stats = None
source = "log"
# 优先从 Redis 读取
if workspace_id:
try:
from app.cache.memory.activity_stats_cache import ActivityStatsCache
cached = await ActivityStatsCache.get_activity_stats(workspace_id)
if cached:
stats = cached.get("stats", {})
source = "redis"
logger.info(f"[ANALYTICS] 从 Redis 读取活动统计: workspace_id={workspace_id}")
except Exception as e:
logger.warning(f"[ANALYTICS] 读取 Redis 活动统计失败,降级到日志: {e}")
# 降级:从日志文件解析
if stats is None:
stats, _msg = get_recent_activity_stats()
source = "log"
total = (
stats.get("chunk_count", 0)
+ stats.get("statements_count", 0)
@@ -792,26 +821,29 @@ async def analytics_recent_activity_stats() -> Dict[str, Any]:
+ stats.get("triplet_relations_count", 0)
+ stats.get("temporal_count", 0)
)
# 精简:仅提供“最新一次活动多久前”
latest_relative = None
try:
info = stats.get("log_path", "")
idx = info.rfind("最新:")
if idx != -1:
latest_path = info[idx + 3 :].strip()
if latest_path and os.path.exists(latest_path):
import time
diff = max(0.0, time.time() - os.path.getmtime(latest_path))
m = int(diff // 60)
if m < 1:
latest_relative = "刚刚"
elif m < 60:
latest_relative = "一会前"
else:
latest_relative = "较早前"
except Exception:
pass
data = {"total": total, "stats": stats, "latest_relative": latest_relative}
# 计算"最新一次活动多久前"(仅日志来源时有效)
latest_relative = None
if source == "log":
try:
info = stats.get("log_path", "")
idx = info.rfind("最新:")
if idx != -1:
latest_path = info[idx + 3:].strip()
if latest_path and os.path.exists(latest_path):
import time
diff = max(0.0, time.time() - os.path.getmtime(latest_path))
m = int(diff // 60)
if m < 1:
latest_relative = "刚刚"
elif m < 60:
latest_relative = "一会前"
else:
latest_relative = "较早前"
except Exception:
pass
data = {"total": total, "stats": stats, "latest_relative": latest_relative, "source": source}
return data

View File

@@ -326,6 +326,25 @@ async def run_pilot_extraction(
logger.info("Pilot run completed: Skipping Neo4j save")
# 将提取统计写入 Redis按 workspace_id 存储
try:
from app.cache.memory.activity_stats_cache import ActivityStatsCache
stats_to_cache = {
"chunk_count": len(chunk_nodes) if chunk_nodes else 0,
"statements_count": len(statement_nodes) if statement_nodes else 0,
"triplet_entities_count": len(entity_nodes) if entity_nodes else 0,
"triplet_relations_count": len(entity_edges) if entity_edges else 0,
"temporal_count": 0, # temporal 数据在日志中此处暂置0
}
await ActivityStatsCache.set_activity_stats(
workspace_id=str(memory_config.workspace_id),
stats=stats_to_cache,
)
logger.info(f"[PILOT_RUN] 活动统计已写入 Redis: workspace_id={memory_config.workspace_id}")
except Exception as cache_err:
logger.warning(f"[PILOT_RUN] 写入活动统计缓存失败(不影响主流程): {cache_err}", exc_info=True)
except Exception as e:
logger.error(f"Pilot run failed: {e}", exc_info=True)
raise