diff --git a/api/app/controllers/memory_dashboard_controller.py b/api/app/controllers/memory_dashboard_controller.py index fe4337d1..80eb7ea1 100644 --- a/api/app/controllers/memory_dashboard_controller.py +++ b/api/app/controllers/memory_dashboard_controller.py @@ -1,3 +1,5 @@ +import time +from contextlib import contextmanager from fastapi import APIRouter, Depends, HTTPException, status, Query from pydantic import BaseModel, Field from sqlalchemy.orm import Session @@ -16,6 +18,18 @@ from app.core.logging_config import get_api_logger # 获取API专用日志器 api_logger = get_api_logger() + +@contextmanager +def timer(label: str, user_count: int = 0): + """上下文管理器:用于测量代码块执行时间""" + start = time.perf_counter() + try: + yield + finally: + elapsed = (time.perf_counter() - start) * 1000 # 转换为毫秒 + extra_info = f", 用户数: {user_count}" if user_count > 0 else "" + api_logger.info(f"[性能统计] {label}: {elapsed:.2f}ms{extra_info}") + router = APIRouter( prefix="/dashboard", tags=["Dashboard"], @@ -52,7 +66,7 @@ async def get_workspace_end_users( ): """ 获取工作空间的宿主列表(高性能优化版本 v2) - + 优化策略: 1. 批量查询 end_users(一次查询而非循环) 2. 并发查询所有用户的记忆数量(Neo4j) @@ -60,7 +74,7 @@ async def get_workspace_end_users( 4. 只返回必要字段减少数据传输 5. 添加短期缓存减少重复查询 6. 并发执行配置查询和记忆数量查询 - + 返回格式: { "end_user": {"id": "uuid", "other_name": "名称"}, @@ -70,129 +84,149 @@ async def get_workspace_end_users( """ import asyncio import json - from app.aioRedis import aio_redis_get, aio_redis_set - + # from app.aioRedis import aio_redis_get, aio_redis_set + + # 总耗时统计 + total_start = time.perf_counter() + workspace_id = current_user.current_workspace_id - - # 尝试从缓存获取(30秒缓存) - cache_key = f"end_users:workspace:{workspace_id}" - try: - cached_data = await aio_redis_get(cache_key) - if cached_data: - api_logger.info(f"从缓存获取宿主列表: workspace_id={workspace_id}") - return success(data=json.loads(cached_data), msg="宿主列表获取成功") - except Exception as e: - api_logger.warning(f"Redis 缓存读取失败: {str(e)}") - + + # # 尝试从缓存获取(30秒缓存)- 暂时注释以便进行性能测试 + # with timer("Redis缓存读取"): + # cache_key = f"end_users:workspace:{workspace_id}" + # try: + # cached_data = await aio_redis_get(cache_key) + # if cached_data: + # api_logger.info(f"从缓存获取宿主列表: workspace_id={workspace_id}") + # return success(data=json.loads(cached_data), msg="宿主列表获取成功") + # except Exception as e: + # api_logger.warning(f"Redis 缓存读取失败: {str(e)}") + # 获取当前空间类型 - current_workspace_type = memory_dashboard_service.get_current_workspace_type(db, workspace_id, current_user) - api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的宿主列表") - + with timer("获取空间类型"): + current_workspace_type = memory_dashboard_service.get_current_workspace_type(db, workspace_id, current_user) + api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的宿主列表, 类型: {current_workspace_type}") + # 获取 end_users(已优化为批量查询) - end_users = memory_dashboard_service.get_workspace_end_users( - db=db, - workspace_id=workspace_id, - current_user=current_user - ) + with timer("获取用户列表"): + end_users = memory_dashboard_service.get_workspace_end_users( + db=db, + workspace_id=workspace_id, + current_user=current_user + ) if not end_users: api_logger.info("工作空间下没有宿主") - # 缓存空结果,避免重复查询 - try: - await aio_redis_set(cache_key, json.dumps([]), expire=30) - except Exception as e: - api_logger.warning(f"Redis 缓存写入失败: {str(e)}") + # # 缓存空结果,避免重复查询 - 暂时注释 + # try: + # await aio_redis_set(cache_key, json.dumps([]), expire=30) + # except Exception as e: + # api_logger.warning(f"Redis 缓存写入失败: {str(e)}") return success(data=[], msg="宿主列表获取成功") - + end_user_ids = [str(user.id) for user in end_users] - + user_count = len(end_user_ids) + api_logger.info(f"需要处理的用户数: {user_count}") + # 并发执行两个独立的查询任务 async def get_memory_configs(): """获取记忆配置(在线程池中执行同步查询)""" - try: - return await asyncio.to_thread( - get_end_users_connected_configs_batch, - end_user_ids, db - ) - except Exception as e: - api_logger.error(f"批量获取记忆配置失败: {str(e)}") - return {} - + with timer("功能模块-获取记忆配置", user_count): + try: + return await asyncio.to_thread( + get_end_users_connected_configs_batch, + end_user_ids, db + ) + except Exception as e: + api_logger.error(f"批量获取记忆配置失败: {str(e)}") + return {} + async def get_memory_nums(): """获取记忆数量""" - if current_workspace_type == "rag": - # RAG 模式:批量查询 - try: - chunk_map = await asyncio.to_thread( - memory_dashboard_service.get_users_total_chunk_batch, - end_user_ids, db, current_user - ) - return {uid: {"total": count} for uid, count in chunk_map.items()} - except Exception as e: - api_logger.error(f"批量获取 RAG chunk 数量失败: {str(e)}") - return {uid: {"total": 0} for uid in end_user_ids} - - elif current_workspace_type == "neo4j": - # Neo4j 模式:并发查询(带并发限制) - # 使用信号量限制并发数,避免大量用户时压垮 Neo4j - MAX_CONCURRENT_QUERIES = 10 - semaphore = asyncio.Semaphore(MAX_CONCURRENT_QUERIES) - - async def get_neo4j_memory_num(end_user_id: str): - async with semaphore: + with timer(f"功能模块-获取记忆数量[{current_workspace_type}]", user_count): + if current_workspace_type == "rag": + # RAG 模式:批量查询 + with timer(" - RAG批量查询chunks"): try: - return await memory_storage_service.search_all(end_user_id) + chunk_map = await asyncio.to_thread( + memory_dashboard_service.get_users_total_chunk_batch, + end_user_ids, db, current_user + ) + return {uid: {"total": count} for uid, count in chunk_map.items()} except Exception as e: - api_logger.error(f"获取用户 {end_user_id} Neo4j 记忆数量失败: {str(e)}") - return {"total": 0} - - memory_nums_list = await asyncio.gather(*[get_neo4j_memory_num(uid) for uid in end_user_ids]) - return {end_user_ids[i]: memory_nums_list[i] for i in range(len(end_user_ids))} - - return {uid: {"total": 0} for uid in end_user_ids} + api_logger.error(f"批量获取 RAG chunk 数量失败: {str(e)}") + return {uid: {"total": 0} for uid in end_user_ids} + + elif current_workspace_type == "neo4j": + # Neo4j 模式:并发查询(带并发限制) + # 使用信号量限制并发数,避免大量用户时压垮 Neo4j + MAX_CONCURRENT_QUERIES = 10 + semaphore = asyncio.Semaphore(MAX_CONCURRENT_QUERIES) + + async def get_neo4j_memory_num(end_user_id: str): + async with semaphore: + single_start = time.perf_counter() + try: + result = await memory_storage_service.search_all(end_user_id) + elapsed = (time.perf_counter() - single_start) * 1000 + api_logger.info(f" - Neo4j单用户查询[{end_user_id}]: {elapsed:.2f}ms") + return result + except Exception as e: + api_logger.error(f"获取用户 {end_user_id} Neo4j 记忆数量失败: {str(e)}") + return {"total": 0} + + with timer(" - Neo4j并发查询所有用户"): + memory_nums_list = await asyncio.gather(*[get_neo4j_memory_num(uid) for uid in end_user_ids]) + return {end_user_ids[i]: memory_nums_list[i] for i in range(len(end_user_ids))} + + return {uid: {"total": 0} for uid in end_user_ids} # 触发按需初始化:为 implicit_emotions_storage 中没有记录的用户异步生成数据 - try: - from app.celery_app import celery_app as _celery_app - _celery_app.send_task( - "app.tasks.init_implicit_emotions_for_users", - kwargs={"end_user_ids": end_user_ids}, - ) - _celery_app.send_task( - "app.tasks.init_interest_distribution_for_users", - kwargs={"end_user_ids": end_user_ids}, - ) - api_logger.info(f"已触发按需初始化任务,候选用户数: {len(end_user_ids)}") - except Exception as e: - api_logger.warning(f"触发按需初始化任务失败(不影响主流程): {e}") + with timer("触发Celery初始化任务"): + try: + from app.celery_app import celery_app as _celery_app + _celery_app.send_task( + "app.tasks.init_implicit_emotions_for_users", + kwargs={"end_user_ids": end_user_ids}, + ) + _celery_app.send_task( + "app.tasks.init_interest_distribution_for_users", + kwargs={"end_user_ids": end_user_ids}, + ) + api_logger.info(f"已触发按需初始化任务,候选用户数: {len(end_user_ids)}") + except Exception as e: + api_logger.warning(f"触发按需初始化任务失败(不影响主流程): {e}") # 并发执行配置查询和记忆数量查询 - memory_configs_map, memory_nums_map = await asyncio.gather( - get_memory_configs(), - get_memory_nums() - ) - + with timer("并发执行两个功能模块"): + memory_configs_map, memory_nums_map = await asyncio.gather( + get_memory_configs(), + get_memory_nums() + ) + # 构建结果(优化:使用列表推导式) - result = [] - for end_user in end_users: - user_id = str(end_user.id) - config_info = memory_configs_map.get(user_id, {}) - result.append({ - 'end_user': { - 'id': user_id, - 'other_name': end_user.other_name - }, - 'memory_num': memory_nums_map.get(user_id, {"total": 0}), - 'memory_config': { - "memory_config_id": config_info.get("memory_config_id"), - "memory_config_name": config_info.get("memory_config_name") - } - }) - - # 写入缓存(30秒过期) - try: - await aio_redis_set(cache_key, json.dumps(result), expire=30) - except Exception as e: - api_logger.warning(f"Redis 缓存写入失败: {str(e)}") + with timer("构建返回结果"): + result = [] + for end_user in end_users: + user_id = str(end_user.id) + config_info = memory_configs_map.get(user_id, {}) + result.append({ + 'end_user': { + 'id': user_id, + 'other_name': end_user.other_name + }, + 'memory_num': memory_nums_map.get(user_id, {"total": 0}), + 'memory_config': { + "memory_config_id": config_info.get("memory_config_id"), + "memory_config_name": config_info.get("memory_config_name") + } + }) + + # # 写入缓存(30秒过期)- 暂时注释以便进行性能测试 + # with timer("Redis缓存写入"): + # try: + # await aio_redis_set(cache_key, json.dumps(result), expire=30) + # except Exception as e: + # api_logger.warning(f"Redis 缓存写入失败: {str(e)}") # 触发社区聚类补全任务(异步,不阻塞接口响应) try: @@ -202,6 +236,8 @@ async def get_workspace_end_users( except Exception as e: api_logger.warning(f"触发社区聚类补全任务失败(不影响主流程): {str(e)}") + total_elapsed = (time.perf_counter() - total_start) * 1000 + api_logger.info(f"[性能统计] 接口总耗时: {total_elapsed:.2f}ms, 用户数: {user_count}") api_logger.info(f"成功获取 {len(end_users)} 个宿主记录") return success(data=result, msg="宿主列表获取成功")