Compare commits
1 Commits
main
...
research/t
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
82c6d1a90f |
@@ -1,3 +1,5 @@
|
|||||||
|
import time
|
||||||
|
from contextlib import contextmanager
|
||||||
from fastapi import APIRouter, Depends, HTTPException, status, Query
|
from fastapi import APIRouter, Depends, HTTPException, status, Query
|
||||||
from pydantic import BaseModel, Field
|
from pydantic import BaseModel, Field
|
||||||
from sqlalchemy.orm import Session
|
from sqlalchemy.orm import Session
|
||||||
@@ -16,6 +18,18 @@ from app.core.logging_config import get_api_logger
|
|||||||
# 获取API专用日志器
|
# 获取API专用日志器
|
||||||
api_logger = get_api_logger()
|
api_logger = get_api_logger()
|
||||||
|
|
||||||
|
|
||||||
|
@contextmanager
def timer(label: str, user_count: int = 0):
    """Context manager that logs how long the wrapped block took to run.

    The elapsed time is measured with ``time.perf_counter`` and written to
    ``api_logger`` at INFO level in milliseconds. The log line is emitted
    from a ``finally`` clause, so it is produced whether the wrapped block
    finishes normally or raises.

    Args:
        label: Name of the measured step; appears in the log line.
        user_count: Optional number of users processed by the step. It is
            appended to the log line only when greater than zero.
    """
    began_at = time.perf_counter()
    try:
        yield
    finally:
        # perf_counter returns seconds; convert to milliseconds for the log.
        duration_ms = (time.perf_counter() - began_at) * 1000
        message = f"[性能统计] {label}: {duration_ms:.2f}ms"
        if user_count > 0:
            message += f", 用户数: {user_count}"
        api_logger.info(message)
|
||||||
|
|
||||||
router = APIRouter(
|
router = APIRouter(
|
||||||
prefix="/dashboard",
|
prefix="/dashboard",
|
||||||
tags=["Dashboard"],
|
tags=["Dashboard"],
|
||||||
@@ -52,7 +66,7 @@ async def get_workspace_end_users(
|
|||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
获取工作空间的宿主列表(高性能优化版本 v2)
|
获取工作空间的宿主列表(高性能优化版本 v2)
|
||||||
|
|
||||||
优化策略:
|
优化策略:
|
||||||
1. 批量查询 end_users(一次查询而非循环)
|
1. 批量查询 end_users(一次查询而非循环)
|
||||||
2. 并发查询所有用户的记忆数量(Neo4j)
|
2. 并发查询所有用户的记忆数量(Neo4j)
|
||||||
@@ -60,7 +74,7 @@ async def get_workspace_end_users(
|
|||||||
4. 只返回必要字段减少数据传输
|
4. 只返回必要字段减少数据传输
|
||||||
5. 添加短期缓存减少重复查询
|
5. 添加短期缓存减少重复查询
|
||||||
6. 并发执行配置查询和记忆数量查询
|
6. 并发执行配置查询和记忆数量查询
|
||||||
|
|
||||||
返回格式:
|
返回格式:
|
||||||
{
|
{
|
||||||
"end_user": {"id": "uuid", "other_name": "名称"},
|
"end_user": {"id": "uuid", "other_name": "名称"},
|
||||||
@@ -70,129 +84,149 @@ async def get_workspace_end_users(
|
|||||||
"""
|
"""
|
||||||
import asyncio
|
import asyncio
|
||||||
import json
|
import json
|
||||||
from app.aioRedis import aio_redis_get, aio_redis_set
|
# from app.aioRedis import aio_redis_get, aio_redis_set
|
||||||
|
|
||||||
|
# 总耗时统计
|
||||||
|
total_start = time.perf_counter()
|
||||||
|
|
||||||
workspace_id = current_user.current_workspace_id
|
workspace_id = current_user.current_workspace_id
|
||||||
|
|
||||||
# 尝试从缓存获取(30秒缓存)
|
# # 尝试从缓存获取(30秒缓存)- 暂时注释以便进行性能测试
|
||||||
cache_key = f"end_users:workspace:{workspace_id}"
|
# with timer("Redis缓存读取"):
|
||||||
try:
|
# cache_key = f"end_users:workspace:{workspace_id}"
|
||||||
cached_data = await aio_redis_get(cache_key)
|
# try:
|
||||||
if cached_data:
|
# cached_data = await aio_redis_get(cache_key)
|
||||||
api_logger.info(f"从缓存获取宿主列表: workspace_id={workspace_id}")
|
# if cached_data:
|
||||||
return success(data=json.loads(cached_data), msg="宿主列表获取成功")
|
# api_logger.info(f"从缓存获取宿主列表: workspace_id={workspace_id}")
|
||||||
except Exception as e:
|
# return success(data=json.loads(cached_data), msg="宿主列表获取成功")
|
||||||
api_logger.warning(f"Redis 缓存读取失败: {str(e)}")
|
# except Exception as e:
|
||||||
|
# api_logger.warning(f"Redis 缓存读取失败: {str(e)}")
|
||||||
|
|
||||||
# 获取当前空间类型
|
# 获取当前空间类型
|
||||||
current_workspace_type = memory_dashboard_service.get_current_workspace_type(db, workspace_id, current_user)
|
with timer("获取空间类型"):
|
||||||
api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的宿主列表")
|
current_workspace_type = memory_dashboard_service.get_current_workspace_type(db, workspace_id, current_user)
|
||||||
|
api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的宿主列表, 类型: {current_workspace_type}")
|
||||||
|
|
||||||
# 获取 end_users(已优化为批量查询)
|
# 获取 end_users(已优化为批量查询)
|
||||||
end_users = memory_dashboard_service.get_workspace_end_users(
|
with timer("获取用户列表"):
|
||||||
db=db,
|
end_users = memory_dashboard_service.get_workspace_end_users(
|
||||||
workspace_id=workspace_id,
|
db=db,
|
||||||
current_user=current_user
|
workspace_id=workspace_id,
|
||||||
)
|
current_user=current_user
|
||||||
|
)
|
||||||
if not end_users:
|
if not end_users:
|
||||||
api_logger.info("工作空间下没有宿主")
|
api_logger.info("工作空间下没有宿主")
|
||||||
# 缓存空结果,避免重复查询
|
# # 缓存空结果,避免重复查询 - 暂时注释
|
||||||
try:
|
# try:
|
||||||
await aio_redis_set(cache_key, json.dumps([]), expire=30)
|
# await aio_redis_set(cache_key, json.dumps([]), expire=30)
|
||||||
except Exception as e:
|
# except Exception as e:
|
||||||
api_logger.warning(f"Redis 缓存写入失败: {str(e)}")
|
# api_logger.warning(f"Redis 缓存写入失败: {str(e)}")
|
||||||
return success(data=[], msg="宿主列表获取成功")
|
return success(data=[], msg="宿主列表获取成功")
|
||||||
|
|
||||||
end_user_ids = [str(user.id) for user in end_users]
|
end_user_ids = [str(user.id) for user in end_users]
|
||||||
|
user_count = len(end_user_ids)
|
||||||
|
api_logger.info(f"需要处理的用户数: {user_count}")
|
||||||
|
|
||||||
# 并发执行两个独立的查询任务
|
# 并发执行两个独立的查询任务
|
||||||
async def get_memory_configs():
|
async def get_memory_configs():
|
||||||
"""获取记忆配置(在线程池中执行同步查询)"""
|
"""获取记忆配置(在线程池中执行同步查询)"""
|
||||||
try:
|
with timer("功能模块-获取记忆配置", user_count):
|
||||||
return await asyncio.to_thread(
|
try:
|
||||||
get_end_users_connected_configs_batch,
|
return await asyncio.to_thread(
|
||||||
end_user_ids, db
|
get_end_users_connected_configs_batch,
|
||||||
)
|
end_user_ids, db
|
||||||
except Exception as e:
|
)
|
||||||
api_logger.error(f"批量获取记忆配置失败: {str(e)}")
|
except Exception as e:
|
||||||
return {}
|
api_logger.error(f"批量获取记忆配置失败: {str(e)}")
|
||||||
|
return {}
|
||||||
|
|
||||||
async def get_memory_nums():
|
async def get_memory_nums():
|
||||||
"""获取记忆数量"""
|
"""获取记忆数量"""
|
||||||
if current_workspace_type == "rag":
|
with timer(f"功能模块-获取记忆数量[{current_workspace_type}]", user_count):
|
||||||
# RAG 模式:批量查询
|
if current_workspace_type == "rag":
|
||||||
try:
|
# RAG 模式:批量查询
|
||||||
chunk_map = await asyncio.to_thread(
|
with timer(" - RAG批量查询chunks"):
|
||||||
memory_dashboard_service.get_users_total_chunk_batch,
|
|
||||||
end_user_ids, db, current_user
|
|
||||||
)
|
|
||||||
return {uid: {"total": count} for uid, count in chunk_map.items()}
|
|
||||||
except Exception as e:
|
|
||||||
api_logger.error(f"批量获取 RAG chunk 数量失败: {str(e)}")
|
|
||||||
return {uid: {"total": 0} for uid in end_user_ids}
|
|
||||||
|
|
||||||
elif current_workspace_type == "neo4j":
|
|
||||||
# Neo4j 模式:并发查询(带并发限制)
|
|
||||||
# 使用信号量限制并发数,避免大量用户时压垮 Neo4j
|
|
||||||
MAX_CONCURRENT_QUERIES = 10
|
|
||||||
semaphore = asyncio.Semaphore(MAX_CONCURRENT_QUERIES)
|
|
||||||
|
|
||||||
async def get_neo4j_memory_num(end_user_id: str):
|
|
||||||
async with semaphore:
|
|
||||||
try:
|
try:
|
||||||
return await memory_storage_service.search_all(end_user_id)
|
chunk_map = await asyncio.to_thread(
|
||||||
|
memory_dashboard_service.get_users_total_chunk_batch,
|
||||||
|
end_user_ids, db, current_user
|
||||||
|
)
|
||||||
|
return {uid: {"total": count} for uid, count in chunk_map.items()}
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
api_logger.error(f"获取用户 {end_user_id} Neo4j 记忆数量失败: {str(e)}")
|
api_logger.error(f"批量获取 RAG chunk 数量失败: {str(e)}")
|
||||||
return {"total": 0}
|
return {uid: {"total": 0} for uid in end_user_ids}
|
||||||
|
|
||||||
memory_nums_list = await asyncio.gather(*[get_neo4j_memory_num(uid) for uid in end_user_ids])
|
elif current_workspace_type == "neo4j":
|
||||||
return {end_user_ids[i]: memory_nums_list[i] for i in range(len(end_user_ids))}
|
# Neo4j 模式:并发查询(带并发限制)
|
||||||
|
# 使用信号量限制并发数,避免大量用户时压垮 Neo4j
|
||||||
return {uid: {"total": 0} for uid in end_user_ids}
|
MAX_CONCURRENT_QUERIES = 10
|
||||||
|
semaphore = asyncio.Semaphore(MAX_CONCURRENT_QUERIES)
|
||||||
|
|
||||||
|
async def get_neo4j_memory_num(end_user_id: str):
|
||||||
|
async with semaphore:
|
||||||
|
single_start = time.perf_counter()
|
||||||
|
try:
|
||||||
|
result = await memory_storage_service.search_all(end_user_id)
|
||||||
|
elapsed = (time.perf_counter() - single_start) * 1000
|
||||||
|
api_logger.info(f" - Neo4j单用户查询[{end_user_id}]: {elapsed:.2f}ms")
|
||||||
|
return result
|
||||||
|
except Exception as e:
|
||||||
|
api_logger.error(f"获取用户 {end_user_id} Neo4j 记忆数量失败: {str(e)}")
|
||||||
|
return {"total": 0}
|
||||||
|
|
||||||
|
with timer(" - Neo4j并发查询所有用户"):
|
||||||
|
memory_nums_list = await asyncio.gather(*[get_neo4j_memory_num(uid) for uid in end_user_ids])
|
||||||
|
return {end_user_ids[i]: memory_nums_list[i] for i in range(len(end_user_ids))}
|
||||||
|
|
||||||
|
return {uid: {"total": 0} for uid in end_user_ids}
|
||||||
|
|
||||||
# 触发按需初始化:为 implicit_emotions_storage 中没有记录的用户异步生成数据
|
# 触发按需初始化:为 implicit_emotions_storage 中没有记录的用户异步生成数据
|
||||||
try:
|
with timer("触发Celery初始化任务"):
|
||||||
from app.celery_app import celery_app as _celery_app
|
try:
|
||||||
_celery_app.send_task(
|
from app.celery_app import celery_app as _celery_app
|
||||||
"app.tasks.init_implicit_emotions_for_users",
|
_celery_app.send_task(
|
||||||
kwargs={"end_user_ids": end_user_ids},
|
"app.tasks.init_implicit_emotions_for_users",
|
||||||
)
|
kwargs={"end_user_ids": end_user_ids},
|
||||||
_celery_app.send_task(
|
)
|
||||||
"app.tasks.init_interest_distribution_for_users",
|
_celery_app.send_task(
|
||||||
kwargs={"end_user_ids": end_user_ids},
|
"app.tasks.init_interest_distribution_for_users",
|
||||||
)
|
kwargs={"end_user_ids": end_user_ids},
|
||||||
api_logger.info(f"已触发按需初始化任务,候选用户数: {len(end_user_ids)}")
|
)
|
||||||
except Exception as e:
|
api_logger.info(f"已触发按需初始化任务,候选用户数: {len(end_user_ids)}")
|
||||||
api_logger.warning(f"触发按需初始化任务失败(不影响主流程): {e}")
|
except Exception as e:
|
||||||
|
api_logger.warning(f"触发按需初始化任务失败(不影响主流程): {e}")
|
||||||
|
|
||||||
# 并发执行配置查询和记忆数量查询
|
# 并发执行配置查询和记忆数量查询
|
||||||
memory_configs_map, memory_nums_map = await asyncio.gather(
|
with timer("并发执行两个功能模块"):
|
||||||
get_memory_configs(),
|
memory_configs_map, memory_nums_map = await asyncio.gather(
|
||||||
get_memory_nums()
|
get_memory_configs(),
|
||||||
)
|
get_memory_nums()
|
||||||
|
)
|
||||||
|
|
||||||
# 构建结果(优化:使用列表推导式)
|
# 构建结果(优化:使用列表推导式)
|
||||||
result = []
|
with timer("构建返回结果"):
|
||||||
for end_user in end_users:
|
result = []
|
||||||
user_id = str(end_user.id)
|
for end_user in end_users:
|
||||||
config_info = memory_configs_map.get(user_id, {})
|
user_id = str(end_user.id)
|
||||||
result.append({
|
config_info = memory_configs_map.get(user_id, {})
|
||||||
'end_user': {
|
result.append({
|
||||||
'id': user_id,
|
'end_user': {
|
||||||
'other_name': end_user.other_name
|
'id': user_id,
|
||||||
},
|
'other_name': end_user.other_name
|
||||||
'memory_num': memory_nums_map.get(user_id, {"total": 0}),
|
},
|
||||||
'memory_config': {
|
'memory_num': memory_nums_map.get(user_id, {"total": 0}),
|
||||||
"memory_config_id": config_info.get("memory_config_id"),
|
'memory_config': {
|
||||||
"memory_config_name": config_info.get("memory_config_name")
|
"memory_config_id": config_info.get("memory_config_id"),
|
||||||
}
|
"memory_config_name": config_info.get("memory_config_name")
|
||||||
})
|
}
|
||||||
|
})
|
||||||
# 写入缓存(30秒过期)
|
|
||||||
try:
|
# # 写入缓存(30秒过期)- 暂时注释以便进行性能测试
|
||||||
await aio_redis_set(cache_key, json.dumps(result), expire=30)
|
# with timer("Redis缓存写入"):
|
||||||
except Exception as e:
|
# try:
|
||||||
api_logger.warning(f"Redis 缓存写入失败: {str(e)}")
|
# await aio_redis_set(cache_key, json.dumps(result), expire=30)
|
||||||
|
# except Exception as e:
|
||||||
|
# api_logger.warning(f"Redis 缓存写入失败: {str(e)}")
|
||||||
|
|
||||||
# 触发社区聚类补全任务(异步,不阻塞接口响应)
|
# 触发社区聚类补全任务(异步,不阻塞接口响应)
|
||||||
try:
|
try:
|
||||||
@@ -202,6 +236,8 @@ async def get_workspace_end_users(
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
api_logger.warning(f"触发社区聚类补全任务失败(不影响主流程): {str(e)}")
|
api_logger.warning(f"触发社区聚类补全任务失败(不影响主流程): {str(e)}")
|
||||||
|
|
||||||
|
total_elapsed = (time.perf_counter() - total_start) * 1000
|
||||||
|
api_logger.info(f"[性能统计] 接口总耗时: {total_elapsed:.2f}ms, 用户数: {user_count}")
|
||||||
api_logger.info(f"成功获取 {len(end_users)} 个宿主记录")
|
api_logger.info(f"成功获取 {len(end_users)} 个宿主记录")
|
||||||
return success(data=result, msg="宿主列表获取成功")
|
return success(data=result, msg="宿主列表获取成功")
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user