diff --git a/api/app/controllers/memory_dashboard_controller.py b/api/app/controllers/memory_dashboard_controller.py index 948154f1..260ea670 100644 --- a/api/app/controllers/memory_dashboard_controller.py +++ b/api/app/controllers/memory_dashboard_controller.py @@ -77,7 +77,7 @@ async def get_workspace_end_users( workspace_id = current_user.current_workspace_id # 获取当前空间类型 current_workspace_type = memory_dashboard_service.get_current_workspace_type(db, workspace_id, current_user) - api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的宿主列表: keyword={keyword}, page={page}, pagesize={pagesize}") + api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的宿主列表, 类型: {current_workspace_type}") # 获取分页的 end_users end_users_result = memory_dashboard_service.get_workspace_end_users_paginated( @@ -105,7 +105,7 @@ async def get_workspace_end_users( }, msg="宿主列表获取成功") end_user_ids = [str(user.id) for user in end_users] - + # 并发执行两个独立的查询任务 async def get_memory_configs(): """获取记忆配置(在线程池中执行同步查询)""" @@ -117,7 +117,7 @@ async def get_workspace_end_users( except Exception as e: api_logger.error(f"批量获取记忆配置失败: {str(e)}") return {} - + async def get_memory_nums(): """获取记忆数量""" if current_workspace_type == "rag": @@ -131,26 +131,18 @@ async def get_workspace_end_users( except Exception as e: api_logger.error(f"批量获取 RAG chunk 数量失败: {str(e)}") return {uid: {"total": 0} for uid in end_user_ids} - + elif current_workspace_type == "neo4j": - # Neo4j 模式:并发查询(带并发限制) - # 使用信号量限制并发数,避免大量用户时压垮 Neo4j - MAX_CONCURRENT_QUERIES = 10 - semaphore = asyncio.Semaphore(MAX_CONCURRENT_QUERIES) - - async def get_neo4j_memory_num(end_user_id: str): - async with semaphore: - try: - return await memory_storage_service.search_all(end_user_id) - except Exception as e: - api_logger.error(f"获取用户 {end_user_id} Neo4j 记忆数量失败: {str(e)}") - return {"total": 0} - - memory_nums_list = await asyncio.gather(*[get_neo4j_memory_num(uid) for uid in end_user_ids]) - return {end_user_ids[i]: memory_nums_list[i] for i in range(len(end_user_ids))} - + # Neo4j 模式:批量查询(简化版本,只返回total) + try: + batch_result = await memory_storage_service.search_all_batch(end_user_ids) + return {uid: {"total": count} for uid, count in batch_result.items()} + except Exception as e: + api_logger.error(f"批量获取 Neo4j 记忆数量失败: {str(e)}") + return {uid: {"total": 0} for uid in end_user_ids} + return {uid: {"total": 0} for uid in end_user_ids} - + # 触发按需初始化:为 implicit_emotions_storage 中没有记录的用户异步生成数据 try: from app.celery_app import celery_app as _celery_app diff --git a/api/app/repositories/memory_config_repository.py b/api/app/repositories/memory_config_repository.py index e64d19a3..3139b851 100644 --- a/api/app/repositories/memory_config_repository.py +++ b/api/app/repositories/memory_config_repository.py @@ -78,6 +78,15 @@ class MemoryConfigRepository: OPTIONAL MATCH (n) WHERE n.end_user_id = $end_user_id RETURN 'ALL' AS Label, COUNT(n) AS Count """ + # 批量查询多个用户的记忆数量(简化版本,只返回total) + SEARCH_FOR_ALL_BATCH = """ + MATCH (n) WHERE n.end_user_id IN $end_user_ids + RETURN + n.end_user_id as user_id, + count(n) as total + ORDER BY user_id + """ + # Extracted entity details within group/app/user SEARCH_FOR_DETIALS = """ MATCH (n:ExtractedEntity) diff --git a/api/app/services/memory_storage_service.py b/api/app/services/memory_storage_service.py index 58f3e8bd..b3a66734 100644 --- a/api/app/services/memory_storage_service.py +++ b/api/app/services/memory_storage_service.py @@ -695,6 +695,37 @@ async def search_edges(end_user_id: Optional[str] = None) -> List[Dict[str, Any] return result +async def search_all_batch(end_user_ids: List[str]) -> Dict[str, int]: + """批量查询多个用户的记忆数量(简化版本,只返回total) + + Args: + end_user_ids: 用户ID列表 + + Returns: + Dict[str, int]: 以user_id为key的记忆数量字典 + 格式: {"user_id": total_count} + """ + if not end_user_ids: + return {} + + result = await _neo4j_connector.execute_query( + MemoryConfigRepository.SEARCH_FOR_ALL_BATCH, + end_user_ids=end_user_ids, + ) + + # 转换结果为字典格式,字典格式在查询中无需遍历结果集,直接返回 + data = {} + for row in result: + data[row["user_id"]] = row["total"] + + # 为没有数据的用户填充默认值,转换字典格式还为无数据填充默认值 + for user_id in end_user_ids: + if user_id not in data: + data[user_id] = 0 + + return data + + async def analytics_hot_memory_tags( db: Session, current_user: User, diff --git a/api/app/utils/performance_timer.py b/api/app/utils/performance_timer.py new file mode 100644 index 00000000..6b0ec5d6 --- /dev/null +++ b/api/app/utils/performance_timer.py @@ -0,0 +1,37 @@ +""" +性能监控工具模块 + +提供代码块执行时间统计功能,用于接口性能分析。 +如需再次启用性能监控,只需在 controller 中导入 from app.utils.performance_timer import timer 并添加 with timer(...) 包裹需要监控的代码块即可 +""" + +import time +from contextlib import contextmanager +from app.core.logging_config import get_api_logger + +# 获取API专用日志器 +api_logger = get_api_logger() + + +@contextmanager +def timer(label: str, user_count: int = 0): + """上下文管理器:用于测量代码块执行时间 + + Args: + label: 统计标签,用于标识被测量的代码块 + user_count: 用户数,可选参数,用于记录处理的用户数量 + + Usage: + with timer("获取用户列表"): + users = get_users() + + with timer("批量处理", user_count=len(user_ids)): + process_users(user_ids) + """ + start = time.perf_counter() + try: + yield + finally: + elapsed = (time.perf_counter() - start) * 1000 # 转换为毫秒 + extra_info = f", 用户数: {user_count}" if user_count > 0 else "" + api_logger.info(f"[性能统计] {label}: {elapsed:.2f}ms{extra_info}")