Compare commits


1 Commit

Author    SHA1        Message                                                                     Date
lanceyq   82c6d1a90f  [feat] Context manager: Used to measure the execution time of code blocks  2026-03-31 14:56:26 +08:00


@@ -1,3 +1,5 @@
+import time
+from contextlib import contextmanager
 from fastapi import APIRouter, Depends, HTTPException, status, Query
 from pydantic import BaseModel, Field
 from sqlalchemy.orm import Session
@@ -16,6 +18,18 @@ from app.core.logging_config import get_api_logger
 # Get the API-specific logger
 api_logger = get_api_logger()
 
+@contextmanager
+def timer(label: str, user_count: int = 0):
+    """Context manager: measures the execution time of a code block."""
+    start = time.perf_counter()
+    try:
+        yield
+    finally:
+        elapsed = (time.perf_counter() - start) * 1000  # convert to milliseconds
+        extra_info = f", 用户数: {user_count}" if user_count > 0 else ""
+        api_logger.info(f"[性能统计] {label}: {elapsed:.2f}ms{extra_info}")
+
+
 router = APIRouter(
     prefix="/dashboard",
     tags=["Dashboard"],
@@ -52,7 +66,7 @@ async def get_workspace_end_users(
 ):
     """
     Get the list of end users (hosts) for the workspace (high-performance optimized version v2)
 
     Optimization strategy:
     1. Batch-query end_users (one query instead of a loop)
     2. Concurrently query the memory count for every user (Neo4j)
@@ -60,7 +74,7 @@ async def get_workspace_end_users(
     4. Return only the required fields (less data to transfer)
     5. Add a short-lived cache (fewer repeated queries)
     6. Run the config query and the memory-count query concurrently
 
     Return format:
     {
         "end_user": {"id": "uuid", "other_name": "name"},
@@ -70,129 +84,149 @@ async def get_workspace_end_users(
""" """
import asyncio import asyncio
import json import json
from app.aioRedis import aio_redis_get, aio_redis_set # from app.aioRedis import aio_redis_get, aio_redis_set
# 总耗时统计
total_start = time.perf_counter()
workspace_id = current_user.current_workspace_id workspace_id = current_user.current_workspace_id
# 尝试从缓存获取30秒缓存 # # 尝试从缓存获取30秒缓存- 暂时注释以便进行性能测试
cache_key = f"end_users:workspace:{workspace_id}" # with timer("Redis缓存读取"):
try: # cache_key = f"end_users:workspace:{workspace_id}"
cached_data = await aio_redis_get(cache_key) # try:
if cached_data: # cached_data = await aio_redis_get(cache_key)
api_logger.info(f"从缓存获取宿主列表: workspace_id={workspace_id}") # if cached_data:
return success(data=json.loads(cached_data), msg="宿主列表获取成功") # api_logger.info(f"从缓存获取宿主列表: workspace_id={workspace_id}")
except Exception as e: # return success(data=json.loads(cached_data), msg="宿主列表获取成功")
api_logger.warning(f"Redis 缓存读取失败: {str(e)}") # except Exception as e:
# api_logger.warning(f"Redis 缓存读取失败: {str(e)}")
# 获取当前空间类型 # 获取当前空间类型
current_workspace_type = memory_dashboard_service.get_current_workspace_type(db, workspace_id, current_user) with timer("获取空间类型"):
api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的宿主列表") current_workspace_type = memory_dashboard_service.get_current_workspace_type(db, workspace_id, current_user)
api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的宿主列表, 类型: {current_workspace_type}")
# 获取 end_users已优化为批量查询 # 获取 end_users已优化为批量查询
end_users = memory_dashboard_service.get_workspace_end_users( with timer("获取用户列表"):
db=db, end_users = memory_dashboard_service.get_workspace_end_users(
workspace_id=workspace_id, db=db,
current_user=current_user workspace_id=workspace_id,
) current_user=current_user
)
if not end_users: if not end_users:
api_logger.info("工作空间下没有宿主") api_logger.info("工作空间下没有宿主")
# 缓存空结果,避免重复查询 # # 缓存空结果,避免重复查询 - 暂时注释
try: # try:
await aio_redis_set(cache_key, json.dumps([]), expire=30) # await aio_redis_set(cache_key, json.dumps([]), expire=30)
except Exception as e: # except Exception as e:
api_logger.warning(f"Redis 缓存写入失败: {str(e)}") # api_logger.warning(f"Redis 缓存写入失败: {str(e)}")
return success(data=[], msg="宿主列表获取成功") return success(data=[], msg="宿主列表获取成功")
end_user_ids = [str(user.id) for user in end_users] end_user_ids = [str(user.id) for user in end_users]
user_count = len(end_user_ids)
api_logger.info(f"需要处理的用户数: {user_count}")
# 并发执行两个独立的查询任务 # 并发执行两个独立的查询任务
async def get_memory_configs(): async def get_memory_configs():
"""获取记忆配置(在线程池中执行同步查询)""" """获取记忆配置(在线程池中执行同步查询)"""
try: with timer("功能模块-获取记忆配置", user_count):
return await asyncio.to_thread( try:
get_end_users_connected_configs_batch, return await asyncio.to_thread(
end_user_ids, db get_end_users_connected_configs_batch,
) end_user_ids, db
except Exception as e: )
api_logger.error(f"批量获取记忆配置失败: {str(e)}") except Exception as e:
return {} api_logger.error(f"批量获取记忆配置失败: {str(e)}")
return {}
async def get_memory_nums(): async def get_memory_nums():
"""获取记忆数量""" """获取记忆数量"""
if current_workspace_type == "rag": with timer(f"功能模块-获取记忆数量[{current_workspace_type}]", user_count):
# RAG 模式:批量查询 if current_workspace_type == "rag":
try: # RAG 模式:批量查询
chunk_map = await asyncio.to_thread( with timer(" - RAG批量查询chunks"):
memory_dashboard_service.get_users_total_chunk_batch,
end_user_ids, db, current_user
)
return {uid: {"total": count} for uid, count in chunk_map.items()}
except Exception as e:
api_logger.error(f"批量获取 RAG chunk 数量失败: {str(e)}")
return {uid: {"total": 0} for uid in end_user_ids}
elif current_workspace_type == "neo4j":
# Neo4j 模式:并发查询(带并发限制)
# 使用信号量限制并发数,避免大量用户时压垮 Neo4j
MAX_CONCURRENT_QUERIES = 10
semaphore = asyncio.Semaphore(MAX_CONCURRENT_QUERIES)
async def get_neo4j_memory_num(end_user_id: str):
async with semaphore:
try: try:
return await memory_storage_service.search_all(end_user_id) chunk_map = await asyncio.to_thread(
memory_dashboard_service.get_users_total_chunk_batch,
end_user_ids, db, current_user
)
return {uid: {"total": count} for uid, count in chunk_map.items()}
except Exception as e: except Exception as e:
api_logger.error(f"获取用户 {end_user_id} Neo4j 记忆数量失败: {str(e)}") api_logger.error(f"批量获取 RAG chunk 数量失败: {str(e)}")
return {"total": 0} return {uid: {"total": 0} for uid in end_user_ids}
memory_nums_list = await asyncio.gather(*[get_neo4j_memory_num(uid) for uid in end_user_ids]) elif current_workspace_type == "neo4j":
return {end_user_ids[i]: memory_nums_list[i] for i in range(len(end_user_ids))} # Neo4j 模式:并发查询(带并发限制)
# 使用信号量限制并发数,避免大量用户时压垮 Neo4j
return {uid: {"total": 0} for uid in end_user_ids} MAX_CONCURRENT_QUERIES = 10
semaphore = asyncio.Semaphore(MAX_CONCURRENT_QUERIES)
async def get_neo4j_memory_num(end_user_id: str):
async with semaphore:
single_start = time.perf_counter()
try:
result = await memory_storage_service.search_all(end_user_id)
elapsed = (time.perf_counter() - single_start) * 1000
api_logger.info(f" - Neo4j单用户查询[{end_user_id}]: {elapsed:.2f}ms")
return result
except Exception as e:
api_logger.error(f"获取用户 {end_user_id} Neo4j 记忆数量失败: {str(e)}")
return {"total": 0}
with timer(" - Neo4j并发查询所有用户"):
memory_nums_list = await asyncio.gather(*[get_neo4j_memory_num(uid) for uid in end_user_ids])
return {end_user_ids[i]: memory_nums_list[i] for i in range(len(end_user_ids))}
return {uid: {"total": 0} for uid in end_user_ids}
# 触发按需初始化:为 implicit_emotions_storage 中没有记录的用户异步生成数据 # 触发按需初始化:为 implicit_emotions_storage 中没有记录的用户异步生成数据
try: with timer("触发Celery初始化任务"):
from app.celery_app import celery_app as _celery_app try:
_celery_app.send_task( from app.celery_app import celery_app as _celery_app
"app.tasks.init_implicit_emotions_for_users", _celery_app.send_task(
kwargs={"end_user_ids": end_user_ids}, "app.tasks.init_implicit_emotions_for_users",
) kwargs={"end_user_ids": end_user_ids},
_celery_app.send_task( )
"app.tasks.init_interest_distribution_for_users", _celery_app.send_task(
kwargs={"end_user_ids": end_user_ids}, "app.tasks.init_interest_distribution_for_users",
) kwargs={"end_user_ids": end_user_ids},
api_logger.info(f"已触发按需初始化任务,候选用户数: {len(end_user_ids)}") )
except Exception as e: api_logger.info(f"已触发按需初始化任务,候选用户数: {len(end_user_ids)}")
api_logger.warning(f"触发按需初始化任务失败(不影响主流程): {e}") except Exception as e:
api_logger.warning(f"触发按需初始化任务失败(不影响主流程): {e}")
# 并发执行配置查询和记忆数量查询 # 并发执行配置查询和记忆数量查询
memory_configs_map, memory_nums_map = await asyncio.gather( with timer("并发执行两个功能模块"):
get_memory_configs(), memory_configs_map, memory_nums_map = await asyncio.gather(
get_memory_nums() get_memory_configs(),
) get_memory_nums()
)
# 构建结果(优化:使用列表推导式) # 构建结果(优化:使用列表推导式)
result = [] with timer("构建返回结果"):
for end_user in end_users: result = []
user_id = str(end_user.id) for end_user in end_users:
config_info = memory_configs_map.get(user_id, {}) user_id = str(end_user.id)
result.append({ config_info = memory_configs_map.get(user_id, {})
'end_user': { result.append({
'id': user_id, 'end_user': {
'other_name': end_user.other_name 'id': user_id,
}, 'other_name': end_user.other_name
'memory_num': memory_nums_map.get(user_id, {"total": 0}), },
'memory_config': { 'memory_num': memory_nums_map.get(user_id, {"total": 0}),
"memory_config_id": config_info.get("memory_config_id"), 'memory_config': {
"memory_config_name": config_info.get("memory_config_name") "memory_config_id": config_info.get("memory_config_id"),
} "memory_config_name": config_info.get("memory_config_name")
}) }
})
# 写入缓存30秒过期
try: # # 写入缓存30秒过期- 暂时注释以便进行性能测试
await aio_redis_set(cache_key, json.dumps(result), expire=30) # with timer("Redis缓存写入"):
except Exception as e: # try:
api_logger.warning(f"Redis 缓存写入失败: {str(e)}") # await aio_redis_set(cache_key, json.dumps(result), expire=30)
# except Exception as e:
# api_logger.warning(f"Redis 缓存写入失败: {str(e)}")
# 触发社区聚类补全任务(异步,不阻塞接口响应) # 触发社区聚类补全任务(异步,不阻塞接口响应)
try: try:
@@ -202,6 +236,8 @@ async def get_workspace_end_users(
     except Exception as e:
         api_logger.warning(f"触发社区聚类补全任务失败(不影响主流程): {str(e)}")
 
+    total_elapsed = (time.perf_counter() - total_start) * 1000
+    api_logger.info(f"[性能统计] 接口总耗时: {total_elapsed:.2f}ms, 用户数: {user_count}")
     api_logger.info(f"成功获取 {len(end_users)} 个宿主记录")
     return success(data=result, msg="宿主列表获取成功")
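
As a quick reference for the pattern this commit introduces, the sketch below reproduces the timer context manager on its own. It keeps the same @contextmanager / time.perf_counter() structure as the diff, but it logs through the standard logging module instead of the project's api_logger, and the labels are demo values; those substitutions are assumptions made only so the snippet runs standalone.

# Minimal standalone sketch of the timing pattern added in this commit.
# Assumption: plain `logging` stands in for the project's `api_logger`.
import logging
import time
from contextlib import contextmanager

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("perf")

@contextmanager
def timer(label: str, user_count: int = 0):
    """Measure the wall-clock time of the enclosed block and log it in milliseconds."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = (time.perf_counter() - start) * 1000  # convert seconds to ms
        extra = f", users: {user_count}" if user_count > 0 else ""
        logger.info(f"[perf] {label}: {elapsed:.2f}ms{extra}")

# Usage: wrap any block whose duration should be logged.
with timer("demo sleep", user_count=3):
    time.sleep(0.05)

Because the elapsed time is computed in the finally clause, the duration is logged even when the wrapped block raises or returns early, which matches how the handlers in the diff use it around early-return query branches.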