[add] Recently, memory activities have adopted Redis caching.
This commit is contained in:
@@ -783,8 +783,37 @@ async def analytics_hot_memory_tags(
|
||||
await connector.close()
|
||||
|
||||
|
||||
async def analytics_recent_activity_stats() -> Dict[str, Any]:
|
||||
stats, _msg = get_recent_activity_stats()
|
||||
async def analytics_recent_activity_stats(workspace_id: Optional[str] = None) -> Dict[str, Any]:
|
||||
"""获取最近记忆提取活动统计。
|
||||
|
||||
优先从 Redis 缓存读取(按 workspace_id),缓存不存在时降级到日志文件解析。
|
||||
|
||||
Args:
|
||||
workspace_id: 工作空间ID,用于从 Redis 读取对应缓存
|
||||
|
||||
Returns:
|
||||
包含 total、stats、latest_relative、source 的统计字典
|
||||
"""
|
||||
stats = None
|
||||
source = "log"
|
||||
|
||||
# 优先从 Redis 读取
|
||||
if workspace_id:
|
||||
try:
|
||||
from app.cache.memory.activity_stats_cache import ActivityStatsCache
|
||||
cached = await ActivityStatsCache.get_activity_stats(workspace_id)
|
||||
if cached:
|
||||
stats = cached.get("stats", {})
|
||||
source = "redis"
|
||||
logger.info(f"[ANALYTICS] 从 Redis 读取活动统计: workspace_id={workspace_id}")
|
||||
except Exception as e:
|
||||
logger.warning(f"[ANALYTICS] 读取 Redis 活动统计失败,降级到日志: {e}")
|
||||
|
||||
# 降级:从日志文件解析
|
||||
if stats is None:
|
||||
stats, _msg = get_recent_activity_stats()
|
||||
source = "log"
|
||||
|
||||
total = (
|
||||
stats.get("chunk_count", 0)
|
||||
+ stats.get("statements_count", 0)
|
||||
@@ -792,26 +821,29 @@ async def analytics_recent_activity_stats() -> Dict[str, Any]:
|
||||
+ stats.get("triplet_relations_count", 0)
|
||||
+ stats.get("temporal_count", 0)
|
||||
)
|
||||
# 精简:仅提供“最新一次活动多久前”
|
||||
latest_relative = None
|
||||
try:
|
||||
info = stats.get("log_path", "")
|
||||
idx = info.rfind("最新:")
|
||||
if idx != -1:
|
||||
latest_path = info[idx + 3 :].strip()
|
||||
if latest_path and os.path.exists(latest_path):
|
||||
import time
|
||||
diff = max(0.0, time.time() - os.path.getmtime(latest_path))
|
||||
m = int(diff // 60)
|
||||
if m < 1:
|
||||
latest_relative = "刚刚"
|
||||
elif m < 60:
|
||||
latest_relative = "一会前"
|
||||
else:
|
||||
latest_relative = "较早前"
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
data = {"total": total, "stats": stats, "latest_relative": latest_relative}
|
||||
# 计算"最新一次活动多久前"(仅日志来源时有效)
|
||||
latest_relative = None
|
||||
if source == "log":
|
||||
try:
|
||||
info = stats.get("log_path", "")
|
||||
idx = info.rfind("最新:")
|
||||
if idx != -1:
|
||||
latest_path = info[idx + 3:].strip()
|
||||
if latest_path and os.path.exists(latest_path):
|
||||
import time
|
||||
diff = max(0.0, time.time() - os.path.getmtime(latest_path))
|
||||
m = int(diff // 60)
|
||||
if m < 1:
|
||||
latest_relative = "刚刚"
|
||||
elif m < 60:
|
||||
latest_relative = "一会前"
|
||||
else:
|
||||
latest_relative = "较早前"
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
data = {"total": total, "stats": stats, "latest_relative": latest_relative, "source": source}
|
||||
return data
|
||||
|
||||
|
||||
|
||||
@@ -326,6 +326,25 @@ async def run_pilot_extraction(
|
||||
|
||||
logger.info("Pilot run completed: Skipping Neo4j save")
|
||||
|
||||
# 将提取统计写入 Redis,按 workspace_id 存储
|
||||
try:
|
||||
from app.cache.memory.activity_stats_cache import ActivityStatsCache
|
||||
|
||||
stats_to_cache = {
|
||||
"chunk_count": len(chunk_nodes) if chunk_nodes else 0,
|
||||
"statements_count": len(statement_nodes) if statement_nodes else 0,
|
||||
"triplet_entities_count": len(entity_nodes) if entity_nodes else 0,
|
||||
"triplet_relations_count": len(entity_edges) if entity_edges else 0,
|
||||
"temporal_count": 0, # temporal 数据在日志中,此处暂置0
|
||||
}
|
||||
await ActivityStatsCache.set_activity_stats(
|
||||
workspace_id=str(memory_config.workspace_id),
|
||||
stats=stats_to_cache,
|
||||
)
|
||||
logger.info(f"[PILOT_RUN] 活动统计已写入 Redis: workspace_id={memory_config.workspace_id}")
|
||||
except Exception as cache_err:
|
||||
logger.warning(f"[PILOT_RUN] 写入活动统计缓存失败(不影响主流程): {cache_err}", exc_info=True)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Pilot run failed: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
Reference in New Issue
Block a user