@@ -1,5 +1,3 @@
import time
from contextlib import contextmanager
from fastapi import APIRouter , Depends , HTTPException , status , Query
from pydantic import BaseModel , Field
from sqlalchemy . orm import Session
@@ -18,18 +16,6 @@ from app.core.logging_config import get_api_logger
# 获取API专用日志器
api_logger = get_api_logger ( )
@contextmanager
def timer(label: str, user_count: int = 0):
    """Context manager that logs how long the wrapped block took.

    The elapsed time is measured with ``time.perf_counter`` and written to
    ``api_logger`` at INFO level when the block exits. The log call sits in
    ``finally``, so it runs whether the block finished normally or raised;
    any exception from the block is not caught here and keeps propagating.

    Args:
        label: Name of the measured section, included in the log line.
        user_count: Optional number of users being processed. It is appended
            to the log line only when greater than zero.

    Yields:
        None. The managed block runs at the ``yield``.
    """
    start = time.perf_counter()
    try:
        yield
    finally:
        # perf_counter() returns seconds; the log line reports milliseconds.
        elapsed = (time.perf_counter() - start) * 1000
        extra_info = f", 用户数: {user_count}" if user_count > 0 else ""
        api_logger.info(f"[性能统计] {label}: {elapsed:.2f}ms{extra_info}")
router = APIRouter (
prefix = " /dashboard " ,
tags = [ " Dashboard " ] ,
@@ -66,7 +52,7 @@ async def get_workspace_end_users(
) :
"""
获取工作空间的宿主列表(高性能优化版本 v2)
优化策略:
1. 批量查询 end_users( 一次查询而非循环)
2. 并发查询所有用户的记忆数量( Neo4j)
@@ -74,7 +60,7 @@ async def get_workspace_end_users(
4. 只返回必要字段减少数据传输
5. 添加短期缓存减少重复查询
6. 并发执行配置查询和记忆数量查询
返回格式:
{
" end_user " : { " id " : " uuid " , " other_name " : " 名称 " },
@@ -84,149 +70,129 @@ async def get_workspace_end_users(
"""
import asyncio
import json
# from app. aioRedis import aio_redis_get, aio_redis_set
# 总耗时统计
total_start = time . perf_counter ( )
from app . aioRedis import aio_redis_get, aio_redis_set
workspace_id = current_user . current_workspace_id
# # 尝试从缓存获取( 30秒缓存) - 暂时注释以便进行性能测试
# with timer("Redis缓存读取"):
# cache_key = f"end_users:workspace:{workspace_id}"
# try:
# cached_data = await aio_redis_get(cache_key)
# if cached_data:
# api_logger.info(f"从缓存获取宿主列表: workspace_id={workspace_id}" )
# return success(data=json.loads(cached_data), msg="宿主列表获取成功")
# except Exception as e:
# api_logger.warning(f"Redis 缓存读取失败: {str(e)}")
# 尝试从缓存获取( 30秒缓存)
cache_key = f " end_users:workspace: { workspace_id } "
try :
cached_data = await aio_redis_get ( cache_key )
if cached_data:
api_logger . info ( f " 从缓存获取宿主列表: workspace_id= { workspace_id } " )
return success ( data = json . loads ( cached_data ) , msg = " 宿主列表获取成功 " )
except Exception as e :
api_logger . warning ( f " Redis 缓存读取失败: { str ( e ) } " )
# 获取当前空间类型
with timer ( " 获取空间类型 " ) :
current_workspace_type = memory_dashboard_service . get_current_workspace_type ( db , workspace_id , current_user )
api_logger . info ( f " 用户 { current_user . username } 请求获取工作空间 { workspace_id } 的宿主列表, 类型: { current_workspace_type } " )
current_workspace_type = memory_dashboard_service . get_current_workspace_type ( db , workspace_id , current_user )
api_logger . info ( f " 用户 { current_user . username } 请求获取工作空间 { workspace_id } 的宿主列表 " )
# 获取 end_users( 已优化为批量查询)
with timer ( " 获取用户列表 " ) :
end_users = memory_dashboard_service . get_workspace_end_users (
db = db ,
workspace_id = workspace_id ,
current_user = current_user
)
end_users = memory_dashboard_service . get_workspace_end_users (
db = db ,
workspace_id = workspace_id ,
current_user = current_user
)
if not end_users :
api_logger . info ( " 工作空间下没有宿主 " )
# # 缓存空结果,避免重复查询 - 暂时注释
# try:
# await aio_redis_set( cache_key, json.dumps([]), expire=30 )
# except Exception as e :
# api_logger. warning(f" Redis 缓存写入失败: {str(e)}" )
# 缓存空结果,避免重复查询
try :
await aio_redis_set( cache_key, json . dumps ( [ ] ) , expire = 30 )
except Exception as e :
api_logger. warning( f " Redis 缓存写入失败: { str ( e ) } " )
return success ( data = [ ] , msg = " 宿主列表获取成功 " )
end_user_ids = [ str ( user . id ) for user in end_users ]
user_count = len ( end_user_ids )
api_logger . info ( f " 需要处理的用户数: { user_count } " )
# 并发执行两个独立的查询任务
async def get_memory_configs ( ) :
""" 获取记忆配置(在线程池中执行同步查询) """
with timer ( " 功能模块-获取记忆配置 " , user_count ) :
try :
return await asyncio . to_thread (
get_end_users_connected_configs_batch ,
end_user_ids , db
)
except Exception as e :
api_logger . error ( f " 批量获取记忆配置失败: { str ( e ) } " )
return { }
try :
return await asyncio . to_thread (
get_end_users_connected_configs_batch ,
end_user_ids , db
)
except Exception as e :
api_logger . error ( f " 批量获取记忆配置失败: { str ( e ) } " )
return { }
async def get_memory_nums ( ) :
""" 获取记忆数量 """
with timer ( f " 功能模块-获取记忆数量[ { current_workspace_type } ] " , user_count ) :
if current_workspace_type == " rag " :
# RAG 模式:批量查询
w ith timer ( " - RAG批量查询chunks " ) :
if current_workspace_type == " rag " :
# RAG 模式:批量查询
try :
chunk_map = awa it asyncio . to_thread (
memory_dashboard_service . get_users_total_chunk_batch ,
end_user_ids , db , current_user
)
return { uid : { " total " : count } for uid , count in chunk_map . items ( ) }
except Exception as e :
api_logger . error ( f " 批量获取 RAG chunk 数量失败: { str ( e ) } " )
return { uid : { " total " : 0 } for uid in end_user_ids }
elif current_workspace_type == " neo4j " :
# Neo4j 模式:并发查询(带并发限制)
# 使用信号量限制并发数,避免大量用户时压垮 Neo4j
MAX_CONCURRENT_QUERIES = 10
semaphore = asyncio . Semaphore ( MAX_CONCURRENT_QUERIES )
async def get_neo4j_memory_num ( end_user_id : str ) :
async with semaphore :
try :
chunk_map = await asyncio . to_threa d(
memory_dashboard_service . get_users_total_chunk_batch ,
end_user_ids , db , current_user
)
return { uid : { " total " : count } for uid , count in chunk_map . items ( ) }
return await memory_storage_service . search_all ( end_user_i d)
except Exception as e :
api_logger . error ( f " 批量获取 RAG chunk 数量失败: { str ( e ) } " )
return { uid : { " total " : 0 } for uid in end_user_ids }
elif current_workspace_type == " neo4j " :
# Neo4j 模式:并发查询(带并发限制)
# 使用信号量限制并发数,避免大量用户时压垮 Neo4j
MAX_CONCURRENT_QUERIES = 10
semaphore = asyncio . Semaphore ( MAX_CONCURRENT_QUERIES )
async def get_neo4j_memory_num ( end_user_id : str ) :
async with semaphore :
single_start = time . perf_counter ( )
try :
result = await memory_storage_service . search_all ( end_user_id )
elapsed = ( time . perf_counter ( ) - single_start ) * 1000
api_logger . info ( f " - Neo4j单用户查询[ { end_user_id } ]: { elapsed : .2f } ms " )
return result
except Exception as e :
api_logger . error ( f " 获取用户 { end_user_id } Neo4j 记忆数量失败: { str ( e ) } " )
return { " total " : 0 }
with timer ( " - Neo4j并发查询所有用户 " ) :
memory_nums_list = await asyncio . gather ( * [ get_neo4j_memory_num ( uid ) for uid in end_user_ids ] )
return { end_user_ids [ i ] : memory_nums_list [ i ] for i in range ( len ( end_user_ids ) ) }
return { uid : { " total " : 0 } for uid in end_user_ids }
api_logger . error ( f " 获取用户 { end_user_id } Neo4j 记忆 数量失败: { str ( e ) } " )
return { " total " : 0 }
memory_nums_list = await asyncio . gather ( * [ get_neo4j_memory_num ( uid ) for uid in end_user_ids ] )
return { end_user_ids [ i ] : memory_nums_list [ i ] for i in range ( len ( end_user_ids ) ) }
return { uid : { " total " : 0 } for uid in end_user_ids }
# 触发按需初始化:为 implicit_emotions_storage 中没有记录的用户异步生成数据
with timer ( " 触发Celery初始化任务 " ) :
try :
from app . celery_app import celery_app as _celery_app
_celery_app . send_task (
" app.tasks.init_implicit_emotions_for_users " ,
kwargs = { " end_user_ids " : end_user_ids } ,
)
_celery_app . send_task (
" app.tasks.init_interest_distribution_for_users " ,
kwargs = { " end_user_ids " : end_user_ids } ,
)
api_logger . info ( f " 已触发按需初始化任务,候选用户数: { len ( end_user_ids ) } " )
except Exception as e :
api_logger . warning ( f " 触发按需初始化任务失败(不影响主流程): { e } " )
try :
from app . celery_app import celery_app as _celery_app
_celery_app. send_task (
" app.tasks.init_implicit_emotions_for_users " ,
kwargs = { " end_user_ids " : end_user_ids } ,
)
_celery_app . send_task (
" app.tasks.init_interest_distribution_for_users " ,
kwargs = { " end_user_ids " : end_user_ids } ,
)
api_logger . info ( f " 已触发按需初始化任务,候选用户数: { len ( end_user_ids ) } " )
except Exception as e :
api_logger . warning ( f " 触发按需初始化任务失败(不影响主流程): { e } " )
# 并发执行配置查询和记忆数量查询
with timer ( " 并发执行两个功能模块 " ) :
memory_configs_map , memory_nums_map = await asyncio . gather (
get_memory_config s ( ) ,
get_memory_nums ( )
)
memory_configs_map , memory_nums_map = await asyncio . gather (
get_ memory_configs( ) ,
get_memory_num s ( )
)
# 构建结果(优化:使用列表推导式)
with timer ( " 构建返回结果 " ) :
result = [ ]
for end_user in end_users :
user_id = str ( end_user . id )
config_info = memory_configs_map . get ( user_id , { } )
result . append ( {
' end_user ' : {
' id ' : user_id ,
' other_name ' : end_user . other_name
} ,
' memory_num ' : memory_nums_map . get ( user_id , { " total " : 0 } ) ,
' memory_config' : {
" memory_config_id " : config_info . get ( " memory_config_id " ) ,
" memory_config_name " : config_info . get ( " memory_config_name " )
}
} )
# # 写入缓存( 30秒过期) - 暂时注释以便进行性能测试
# with timer("Redis缓存写入"):
# try :
# await aio_redis_set(cache_key, json.dumps(result), expire=30 )
# except Exception as e:
# api_logger.warning(f"Redis 缓存写入失败: {str(e)}")
result = [ ]
for end_user in end_users :
user_id = str ( end_user. id )
config_info = memory_configs_map . get ( user_id , { } )
result . append ( {
' end_user ' : {
' id ' : user_id ,
' other_name ' : end_user . other_name
} ,
' memory_num ' : memory_nums_map . get ( user_id , { " total " : 0 } ) ,
' memory_config ' : {
" memory_config_id " : config_info . get ( " memory_config_id " ) ,
" memory_config_name " : config_info . get ( " memory_config_name " )
}
} )
# 写入缓存( 30秒过期)
try :
await aio_redis_set ( cache_key , json . dumps ( result ) , expire = 30 )
except Exception as e :
api_logger . warning ( f " Redis 缓存写入失败: { str ( e ) } " )
# 触发社区聚类补全任务(异步,不阻塞接口响应)
try :
@@ -236,8 +202,6 @@ async def get_workspace_end_users(
except Exception as e :
api_logger . warning ( f " 触发社区聚类补全任务失败(不影响主流程): { str ( e ) } " )
total_elapsed = ( time . perf_counter ( ) - total_start ) * 1000
api_logger . info ( f " [性能统计] 接口总耗时: { total_elapsed : .2f } ms, 用户数: { user_count } " )
api_logger . info ( f " 成功获取 { len ( end_users ) } 个宿主记录 " )
return success ( data = result , msg = " 宿主列表获取成功 " )