[feat] User list pagination function

This commit is contained in:
lanceyq
2026-03-31 12:26:46 +08:00
parent 3ea42ac27f
commit 2dfc3b25d8
2 changed files with 160 additions and 71 deletions

View File

@@ -1,3 +1,4 @@
import asyncio
from fastapi import APIRouter, Depends, HTTPException, status, Query from fastapi import APIRouter, Depends, HTTPException, status, Query
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
@@ -47,62 +48,62 @@ def get_workspace_total_end_users(
@router.get("/end_users", response_model=ApiResponse) @router.get("/end_users", response_model=ApiResponse)
async def get_workspace_end_users( async def get_workspace_end_users(
workspace_id: Optional[str] = Query(None, description="工作空间ID可选默认当前用户工作空间"),
keyword: Optional[str] = Query(None, description="搜索关键词(同时模糊匹配 other_name 和 id"),
page: int = Query(1, ge=1, description="页码从1开始"),
pagesize: int = Query(10, ge=1, description="每页数量"),
db: Session = Depends(get_db), db: Session = Depends(get_db),
current_user: User = Depends(get_current_user), current_user: User = Depends(get_current_user),
): ):
""" """
获取工作空间的宿主列表(高性能优化版本 v2 获取工作空间的宿主列表(分页查询,支持模糊搜索
优化策略: 返回工作空间下的宿主列表,支持分页查询和模糊搜索。
1. 批量查询 end_users一次查询而非循环 通过 keyword 参数同时模糊匹配 other_name 和 id 字段。
2. 并发查询所有用户的记忆数量Neo4j
3. RAG 模式使用批量查询(一次 SQL Args:
4. 只返回必要字段减少数据传输 workspace_id: 工作空间ID可选默认当前用户工作空间
5. 添加短期缓存减少重复查询 keyword: 搜索关键词(可选,同时模糊匹配 other_name 和 id
6. 并发执行配置查询和记忆数量查询 page: 页码从1开始默认1
pagesize: 每页数量默认10
返回格式: db: 数据库会话
{ current_user: 当前用户
"end_user": {"id": "uuid", "other_name": "名称"},
"memory_num": {"total": 数量}, Returns:
"memory_config": {"memory_config_id": "id", "memory_config_name": "名称"} ApiResponse: 包含宿主列表和分页信息
}
""" """
import asyncio # 如果未提供 workspace_id使用当前用户的工作空间
import json if workspace_id is None:
from app.aioRedis import aio_redis_get, aio_redis_set workspace_id = current_user.current_workspace_id
workspace_id = current_user.current_workspace_id
# 尝试从缓存获取30秒缓存
cache_key = f"end_users:workspace:{workspace_id}"
try:
cached_data = await aio_redis_get(cache_key)
if cached_data:
api_logger.info(f"从缓存获取宿主列表: workspace_id={workspace_id}")
return success(data=json.loads(cached_data), msg="宿主列表获取成功")
except Exception as e:
api_logger.warning(f"Redis 缓存读取失败: {str(e)}")
# 获取当前空间类型 # 获取当前空间类型
current_workspace_type = memory_dashboard_service.get_current_workspace_type(db, workspace_id, current_user) current_workspace_type = memory_dashboard_service.get_current_workspace_type(db, workspace_id, current_user)
api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的宿主列表") api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的宿主列表: keyword={keyword}, page={page}, pagesize={pagesize}")
# 获取 end_users(已优化为批量查询) # 获取分页的 end_users
end_users = memory_dashboard_service.get_workspace_end_users( end_users_result = memory_dashboard_service.get_workspace_end_users_paginated(
db=db, db=db,
workspace_id=workspace_id, workspace_id=workspace_id,
current_user=current_user current_user=current_user,
page=page,
pagesize=pagesize,
keyword=keyword
) )
end_users = end_users_result.get("items", [])
total = end_users_result.get("total", 0)
if not end_users: if not end_users:
api_logger.info("工作空间下没有宿主") api_logger.info(f"工作空间下没有宿主或当前页无数据: total={total}, page={page}")
# 缓存空结果,避免重复查询 return success(data={
try: "items": [],
await aio_redis_set(cache_key, json.dumps([]), expire=30) "page": {
except Exception as e: "page": page,
api_logger.warning(f"Redis 缓存写入失败: {str(e)}") "pagesize": pagesize,
return success(data=[], msg="宿主列表获取成功") "total": total,
"hasnext": (page * pagesize) < total
}
}, msg="宿主列表获取成功")
end_user_ids = [str(user.id) for user in end_users] end_user_ids = [str(user.id) for user in end_users]
# 并发执行两个独立的查询任务 # 并发执行两个独立的查询任务
@@ -170,13 +171,13 @@ async def get_workspace_end_users(
get_memory_configs(), get_memory_configs(),
get_memory_nums() get_memory_nums()
) )
# 构建结果(优化:使用列表推导式) # 构建结果列表
result = [] items = []
for end_user in end_users: for end_user in end_users:
user_id = str(end_user.id) user_id = str(end_user.id)
config_info = memory_configs_map.get(user_id, {}) config_info = memory_configs_map.get(user_id, {})
result.append({ items.append({
'end_user': { 'end_user': {
'id': user_id, 'id': user_id,
'other_name': end_user.other_name 'other_name': end_user.other_name
@@ -187,12 +188,6 @@ async def get_workspace_end_users(
"memory_config_name": config_info.get("memory_config_name") "memory_config_name": config_info.get("memory_config_name")
} }
}) })
# 写入缓存30秒过期
try:
await aio_redis_set(cache_key, json.dumps(result), expire=30)
except Exception as e:
api_logger.warning(f"Redis 缓存写入失败: {str(e)}")
# 触发社区聚类补全任务(异步,不阻塞接口响应) # 触发社区聚类补全任务(异步,不阻塞接口响应)
try: try:
@@ -202,7 +197,18 @@ async def get_workspace_end_users(
except Exception as e: except Exception as e:
api_logger.warning(f"触发社区聚类补全任务失败(不影响主流程): {str(e)}") api_logger.warning(f"触发社区聚类补全任务失败(不影响主流程): {str(e)}")
api_logger.info(f"成功获取 {len(end_users)} 个宿主记录") # 构建分页响应
result = {
"items": items,
"page": {
"page": page,
"pagesize": pagesize,
"total": total,
"hasnext": (page * pagesize) < total
}
}
api_logger.info(f"成功获取 {len(end_users)} 个宿主记录,总计 {total}")
return success(data=result, msg="宿主列表获取成功") return success(data=result, msg="宿主列表获取成功")

View File

@@ -1,11 +1,12 @@
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from typing import List, Optional from sqlalchemy import desc, nullslast, or_, and_, cast, String
from typing import List, Optional, Dict, Any
import uuid import uuid
from fastapi import HTTPException from fastapi import HTTPException
from app.models.user_model import User from app.models.user_model import User
from app.models.app_model import App from app.models.app_model import App
from app.models.end_user_model import EndUser from app.models.end_user_model import EndUser, EndUser as EndUserModel
from app.models.memory_increment_model import MemoryIncrement from app.models.memory_increment_model import MemoryIncrement
from app.repositories import ( from app.repositories import (
@@ -49,44 +50,40 @@ def get_current_workspace_type(
def get_workspace_end_users( def get_workspace_end_users(
db: Session, db: Session,
workspace_id: uuid.UUID, workspace_id: uuid.UUID,
current_user: User current_user: User
) -> List[EndUser]: ) -> List[EndUser]:
"""获取工作空间的所有宿主(优化版本:减少数据库查询次数) """获取工作空间的所有宿主(优化版本:减少数据库查询次数)
返回结果按 created_at 从新到旧排序NULL 值排在最后) 返回结果按 created_at 从新到旧排序NULL 值排在最后)
""" """
business_logger.info(f"获取工作空间宿主列表: workspace_id={workspace_id}, 操作者: {current_user.username}") business_logger.info(f"获取工作空间宿主列表: workspace_id={workspace_id}, 操作者: {current_user.username}")
try: try:
# 查询应用ORM # 查询应用ORM
apps_orm = app_repository.get_apps_by_workspace_id(db, workspace_id) apps_orm = app_repository.get_apps_by_workspace_id(db, workspace_id)
if not apps_orm: if not apps_orm:
business_logger.info("工作空间下没有应用") business_logger.info("工作空间下没有应用")
return [] return []
# 提取所有 app_id # 提取所有 app_id
# app_ids = [app.id for app in apps_orm] # app_ids = [app.id for app in apps_orm]
# 批量查询所有 end_users一次查询而非循环查询 # 批量查询所有 end_users一次查询而非循环查询
# 按 created_at 降序排序NULL 值排在最后id 作为次级排序键保证确定性 # 按 created_at 降序排序NULL 值排在最后id 作为次级排序键保证确定性
from app.models.end_user_model import EndUser as EndUserModel
from sqlalchemy import desc, nullslast
end_users_orm = db.query(EndUserModel).filter( end_users_orm = db.query(EndUserModel).filter(
EndUserModel.workspace_id == workspace_id EndUserModel.workspace_id == workspace_id
).order_by( ).order_by(
nullslast(desc(EndUserModel.created_at)), nullslast(desc(EndUserModel.created_at)),
desc(EndUserModel.id) desc(EndUserModel.id)
).all() ).all()
# 转换为 Pydantic 模型(只在需要时转换) # 转换为 Pydantic 模型(只在需要时转换)
end_users = [EndUserSchema.model_validate(eu) for eu in end_users_orm] end_users = [EndUserSchema.model_validate(eu) for eu in end_users_orm]
business_logger.info(f"成功获取 {len(end_users)} 个宿主记录") business_logger.info(f"成功获取 {len(end_users)} 个宿主记录")
return end_users return end_users
except HTTPException: except HTTPException:
raise raise
except Exception as e: except Exception as e:
@@ -94,6 +91,92 @@ def get_workspace_end_users(
raise raise
def get_workspace_end_users_paginated(
db: Session,
workspace_id: uuid.UUID,
current_user: User,
page: int,
pagesize: int,
keyword: Optional[str] = None
) -> Dict[str, Any]:
"""获取工作空间的宿主列表(分页版本,支持模糊搜索)
返回结果按 created_at 从新到旧排序NULL 值排在最后)
支持通过 keyword 参数同时模糊搜索 other_name 和 id 字段
Args:
db: 数据库会话
workspace_id: 工作空间ID
current_user: 当前用户
page: 页码从1开始
pagesize: 每页数量
keyword: 搜索关键词(可选,同时模糊匹配 other_name 和 id
Returns:
dict: 包含 items宿主列表和 total总记录数的字典
"""
business_logger.info(f"获取工作空间宿主列表(分页): workspace_id={workspace_id}, keyword={keyword}, page={page}, pagesize={pagesize}, 操作者: {current_user.username}")
try:
# 构建基础查询
base_query = db.query(EndUserModel).filter(
EndUserModel.workspace_id == workspace_id
)
# 构建搜索条件过滤空字符串和None
keyword = keyword.strip() if keyword else None
if keyword:
keyword_pattern = f"%{keyword}%"
# 优先匹配 other_name如果 other_name 为空则匹配 id
# 使用 OR 条件:匹配 other_name 不为空的数据,或者 other_name 为空但 id 匹配的数据
base_query = base_query.filter(
or_(
# 情况1other_name 不为空且匹配 keyword
and_(
EndUserModel.other_name != "",
EndUserModel.other_name.isnot(None),
EndUserModel.other_name.ilike(keyword_pattern)
),
# 情况2other_name 为空或 None但 id 匹配 keyword
and_(
or_(
EndUserModel.other_name == "",
EndUserModel.other_name.is_(None)
),
cast(EndUserModel.id, String).ilike(keyword_pattern)
)
)
)
business_logger.info(f"应用模糊搜索: keyword={keyword}(优先匹配 other_name无 other_name 时匹配 id")
# 获取总记录数
total = base_query.count()
if total == 0:
business_logger.info("工作空间下没有宿主")
return {"items": [], "total": 0}
# 分页查询
# 按 created_at 降序排序NULL 值排在最后id 作为次级排序键保证确定性
end_users_orm = base_query.order_by(
nullslast(desc(EndUserModel.created_at)),
desc(EndUserModel.id)
).offset((page - 1) * pagesize).limit(pagesize).all()
# 转换为 Pydantic 模型
end_users = [EndUserSchema.model_validate(eu) for eu in end_users_orm]
business_logger.info(f"成功获取 {len(end_users)} 个宿主记录,总计 {total}")
return {"items": end_users, "total": total}
except HTTPException:
raise
except Exception as e:
business_logger.error(f"获取工作空间宿主列表(分页)失败: workspace_id={workspace_id} - {str(e)}")
raise
def get_workspace_memory_increment( def get_workspace_memory_increment(
db: Session, db: Session,
workspace_id: uuid.UUID, workspace_id: uuid.UUID,