Merge pull request #755 from SuanmoSuanyangTechnology/feature/enduser-page

Feature/enduser page
This commit is contained in:
Ke Sun
2026-04-01 11:15:33 +08:00
committed by GitHub
5 changed files with 243 additions and 91 deletions

View File

@@ -1,3 +1,5 @@
import asyncio
import uuid
from fastapi import APIRouter, Depends, HTTPException, status, Query from fastapi import APIRouter, Depends, HTTPException, status, Query
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
@@ -47,64 +49,64 @@ def get_workspace_total_end_users(
@router.get("/end_users", response_model=ApiResponse) @router.get("/end_users", response_model=ApiResponse)
async def get_workspace_end_users( async def get_workspace_end_users(
workspace_id: Optional[uuid.UUID] = Query(None, description="工作空间ID可选默认当前用户工作空间"),
keyword: Optional[str] = Query(None, description="搜索关键词(同时模糊匹配 other_name 和 id"),
page: int = Query(1, ge=1, description="页码从1开始"),
pagesize: int = Query(10, ge=1, description="每页数量"),
db: Session = Depends(get_db), db: Session = Depends(get_db),
current_user: User = Depends(get_current_user), current_user: User = Depends(get_current_user),
): ):
""" """
获取工作空间的宿主列表(高性能优化版本 v2 获取工作空间的宿主列表(分页查询,支持模糊搜索
优化策略: 返回工作空间下的宿主列表,支持分页查询和模糊搜索。
1. 批量查询 end_users一次查询而非循环 通过 keyword 参数同时模糊匹配 other_name 和 id 字段。
2. 并发查询所有用户的记忆数量Neo4j
3. RAG 模式使用批量查询(一次 SQL Args:
4. 只返回必要字段减少数据传输 workspace_id: 工作空间ID可选默认当前用户工作空间
5. 添加短期缓存减少重复查询 keyword: 搜索关键词(可选,同时模糊匹配 other_name 和 id
6. 并发执行配置查询和记忆数量查询 page: 页码从1开始默认1
pagesize: 每页数量默认10
返回格式: db: 数据库会话
{ current_user: 当前用户
"end_user": {"id": "uuid", "other_name": "名称"},
"memory_num": {"total": 数量}, Returns:
"memory_config": {"memory_config_id": "id", "memory_config_name": "名称"} ApiResponse: 包含宿主列表和分页信息
}
""" """
import asyncio # 如果未提供 workspace_id使用当前用户的工作空间
import json if workspace_id is None:
from app.aioRedis import aio_redis_get, aio_redis_set workspace_id = current_user.current_workspace_id
workspace_id = current_user.current_workspace_id
# 尝试从缓存获取30秒缓存
cache_key = f"end_users:workspace:{workspace_id}"
try:
cached_data = await aio_redis_get(cache_key)
if cached_data:
api_logger.info(f"从缓存获取宿主列表: workspace_id={workspace_id}")
return success(data=json.loads(cached_data), msg="宿主列表获取成功")
except Exception as e:
api_logger.warning(f"Redis 缓存读取失败: {str(e)}")
# 获取当前空间类型 # 获取当前空间类型
current_workspace_type = memory_dashboard_service.get_current_workspace_type(db, workspace_id, current_user) current_workspace_type = memory_dashboard_service.get_current_workspace_type(db, workspace_id, current_user)
api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的宿主列表") api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的宿主列表, 类型: {current_workspace_type}")
# 获取 end_users(已优化为批量查询) # 获取分页的 end_users
end_users = memory_dashboard_service.get_workspace_end_users( end_users_result = memory_dashboard_service.get_workspace_end_users_paginated(
db=db, db=db,
workspace_id=workspace_id, workspace_id=workspace_id,
current_user=current_user current_user=current_user,
page=page,
pagesize=pagesize,
keyword=keyword
) )
end_users = end_users_result.get("items", [])
total = end_users_result.get("total", 0)
if not end_users: if not end_users:
api_logger.info("工作空间下没有宿主") api_logger.info(f"工作空间下没有宿主或当前页无数据: total={total}, page={page}")
# 缓存空结果,避免重复查询 return success(data={
try: "items": [],
await aio_redis_set(cache_key, json.dumps([]), expire=30) "page": {
except Exception as e: "page": page,
api_logger.warning(f"Redis 缓存写入失败: {str(e)}") "pagesize": pagesize,
return success(data=[], msg="宿主列表获取成功") "total": total,
"hasnext": (page * pagesize) < total
}
}, msg="宿主列表获取成功")
end_user_ids = [str(user.id) for user in end_users] end_user_ids = [str(user.id) for user in end_users]
# 并发执行两个独立的查询任务 # 并发执行两个独立的查询任务
async def get_memory_configs(): async def get_memory_configs():
"""获取记忆配置(在线程池中执行同步查询)""" """获取记忆配置(在线程池中执行同步查询)"""
@@ -116,7 +118,7 @@ async def get_workspace_end_users(
except Exception as e: except Exception as e:
api_logger.error(f"批量获取记忆配置失败: {str(e)}") api_logger.error(f"批量获取记忆配置失败: {str(e)}")
return {} return {}
async def get_memory_nums(): async def get_memory_nums():
"""获取记忆数量""" """获取记忆数量"""
if current_workspace_type == "rag": if current_workspace_type == "rag":
@@ -130,26 +132,18 @@ async def get_workspace_end_users(
except Exception as e: except Exception as e:
api_logger.error(f"批量获取 RAG chunk 数量失败: {str(e)}") api_logger.error(f"批量获取 RAG chunk 数量失败: {str(e)}")
return {uid: {"total": 0} for uid in end_user_ids} return {uid: {"total": 0} for uid in end_user_ids}
elif current_workspace_type == "neo4j": elif current_workspace_type == "neo4j":
# Neo4j 模式:并发查询(带并发限制 # Neo4j 模式:批量查询(简化版本只返回total
# 使用信号量限制并发数,避免大量用户时压垮 Neo4j try:
MAX_CONCURRENT_QUERIES = 10 batch_result = await memory_storage_service.search_all_batch(end_user_ids)
semaphore = asyncio.Semaphore(MAX_CONCURRENT_QUERIES) return {uid: {"total": count} for uid, count in batch_result.items()}
except Exception as e:
async def get_neo4j_memory_num(end_user_id: str): api_logger.error(f"批量获取 Neo4j 记忆数量失败: {str(e)}")
async with semaphore: return {uid: {"total": 0} for uid in end_user_ids}
try:
return await memory_storage_service.search_all(end_user_id)
except Exception as e:
api_logger.error(f"获取用户 {end_user_id} Neo4j 记忆数量失败: {str(e)}")
return {"total": 0}
memory_nums_list = await asyncio.gather(*[get_neo4j_memory_num(uid) for uid in end_user_ids])
return {end_user_ids[i]: memory_nums_list[i] for i in range(len(end_user_ids))}
return {uid: {"total": 0} for uid in end_user_ids} return {uid: {"total": 0} for uid in end_user_ids}
# 触发按需初始化:为 implicit_emotions_storage 中没有记录的用户异步生成数据 # 触发按需初始化:为 implicit_emotions_storage 中没有记录的用户异步生成数据
try: try:
from app.celery_app import celery_app as _celery_app from app.celery_app import celery_app as _celery_app
@@ -170,13 +164,13 @@ async def get_workspace_end_users(
get_memory_configs(), get_memory_configs(),
get_memory_nums() get_memory_nums()
) )
# 构建结果(优化:使用列表推导式) # 构建结果列表
result = [] items = []
for end_user in end_users: for end_user in end_users:
user_id = str(end_user.id) user_id = str(end_user.id)
config_info = memory_configs_map.get(user_id, {}) config_info = memory_configs_map.get(user_id, {})
result.append({ items.append({
'end_user': { 'end_user': {
'id': user_id, 'id': user_id,
'other_name': end_user.other_name 'other_name': end_user.other_name
@@ -187,12 +181,6 @@ async def get_workspace_end_users(
"memory_config_name": config_info.get("memory_config_name") "memory_config_name": config_info.get("memory_config_name")
} }
}) })
# 写入缓存30秒过期
try:
await aio_redis_set(cache_key, json.dumps(result), expire=30)
except Exception as e:
api_logger.warning(f"Redis 缓存写入失败: {str(e)}")
# 触发社区聚类补全任务(异步,不阻塞接口响应) # 触发社区聚类补全任务(异步,不阻塞接口响应)
try: try:
@@ -202,7 +190,18 @@ async def get_workspace_end_users(
except Exception as e: except Exception as e:
api_logger.warning(f"触发社区聚类补全任务失败(不影响主流程): {str(e)}") api_logger.warning(f"触发社区聚类补全任务失败(不影响主流程): {str(e)}")
api_logger.info(f"成功获取 {len(end_users)} 个宿主记录") # 构建分页响应
result = {
"items": items,
"page": {
"page": page,
"pagesize": pagesize,
"total": total,
"hasnext": (page * pagesize) < total
}
}
api_logger.info(f"成功获取 {len(end_users)} 个宿主记录,总计 {total}")
return success(data=result, msg="宿主列表获取成功") return success(data=result, msg="宿主列表获取成功")

View File

@@ -78,6 +78,15 @@ class MemoryConfigRepository:
OPTIONAL MATCH (n) WHERE n.end_user_id = $end_user_id RETURN 'ALL' AS Label, COUNT(n) AS Count OPTIONAL MATCH (n) WHERE n.end_user_id = $end_user_id RETURN 'ALL' AS Label, COUNT(n) AS Count
""" """
# Batch count for several end users in one round trip (simplified: total only).
# Matches every node whose end_user_id is in $end_user_ids and returns one
# (user_id, total) row per distinct end_user_id that matched at least one node;
# IDs with no matching node produce no row.
SEARCH_FOR_ALL_BATCH = """
MATCH (n) WHERE n.end_user_id IN $end_user_ids
RETURN
n.end_user_id as user_id,
count(n) as total
ORDER BY user_id
"""
# Extracted entity details within group/app/user # Extracted entity details within group/app/user
SEARCH_FOR_DETIALS = """ SEARCH_FOR_DETIALS = """
MATCH (n:ExtractedEntity) MATCH (n:ExtractedEntity)

View File

@@ -1,11 +1,12 @@
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from typing import List, Optional from sqlalchemy import desc, nullslast, or_, and_, cast, String
from typing import List, Optional, Dict, Any
import uuid import uuid
from fastapi import HTTPException from fastapi import HTTPException
from app.models.user_model import User from app.models.user_model import User
from app.models.app_model import App from app.models.app_model import App
from app.models.end_user_model import EndUser from app.models.end_user_model import EndUser, EndUser as EndUserModel
from app.models.memory_increment_model import MemoryIncrement from app.models.memory_increment_model import MemoryIncrement
from app.repositories import ( from app.repositories import (
@@ -49,44 +50,40 @@ def get_current_workspace_type(
def get_workspace_end_users( def get_workspace_end_users(
db: Session, db: Session,
workspace_id: uuid.UUID, workspace_id: uuid.UUID,
current_user: User current_user: User
) -> List[EndUser]: ) -> List[EndUser]:
"""获取工作空间的所有宿主(优化版本:减少数据库查询次数) """获取工作空间的所有宿主(优化版本:减少数据库查询次数)
返回结果按 created_at 从新到旧排序NULL 值排在最后) 返回结果按 created_at 从新到旧排序NULL 值排在最后)
""" """
business_logger.info(f"获取工作空间宿主列表: workspace_id={workspace_id}, 操作者: {current_user.username}") business_logger.info(f"获取工作空间宿主列表: workspace_id={workspace_id}, 操作者: {current_user.username}")
try: try:
# 查询应用ORM # 查询应用ORM
apps_orm = app_repository.get_apps_by_workspace_id(db, workspace_id) apps_orm = app_repository.get_apps_by_workspace_id(db, workspace_id)
if not apps_orm: if not apps_orm:
business_logger.info("工作空间下没有应用") business_logger.info("工作空间下没有应用")
return [] return []
# 提取所有 app_id # 提取所有 app_id
# app_ids = [app.id for app in apps_orm] # app_ids = [app.id for app in apps_orm]
# 批量查询所有 end_users一次查询而非循环查询 # 批量查询所有 end_users一次查询而非循环查询
# 按 created_at 降序排序NULL 值排在最后id 作为次级排序键保证确定性 # 按 created_at 降序排序NULL 值排在最后id 作为次级排序键保证确定性
from app.models.end_user_model import EndUser as EndUserModel
from sqlalchemy import desc, nullslast
end_users_orm = db.query(EndUserModel).filter( end_users_orm = db.query(EndUserModel).filter(
EndUserModel.workspace_id == workspace_id EndUserModel.workspace_id == workspace_id
).order_by( ).order_by(
nullslast(desc(EndUserModel.created_at)), nullslast(desc(EndUserModel.created_at)),
desc(EndUserModel.id) desc(EndUserModel.id)
).all() ).all()
# 转换为 Pydantic 模型(只在需要时转换) # 转换为 Pydantic 模型(只在需要时转换)
end_users = [EndUserSchema.model_validate(eu) for eu in end_users_orm] end_users = [EndUserSchema.model_validate(eu) for eu in end_users_orm]
business_logger.info(f"成功获取 {len(end_users)} 个宿主记录") business_logger.info(f"成功获取 {len(end_users)} 个宿主记录")
return end_users return end_users
except HTTPException: except HTTPException:
raise raise
except Exception as e: except Exception as e:
@@ -94,6 +91,85 @@ def get_workspace_end_users(
raise raise
# Escape character used for LIKE patterns built from user input.
_LIKE_ESCAPE_CHAR = "/"


def _escape_like_term(term: str, escape_char: str = _LIKE_ESCAPE_CHAR) -> str:
    """Escape LIKE wildcards in ``term`` so that it is matched literally."""
    # The escape character itself must be doubled first, otherwise the escapes
    # added for "%" and "_" would be escaped again.
    return (
        term.replace(escape_char, escape_char * 2)
        .replace("%", escape_char + "%")
        .replace("_", escape_char + "_")
    )


def get_workspace_end_users_paginated(
    db: Session,
    workspace_id: uuid.UUID,
    current_user: User,
    page: int,
    pagesize: int,
    keyword: Optional[str] = None
) -> Dict[str, Any]:
    """Return one page of a workspace's end users, optionally filtered by keyword.

    Rows are ordered by ``created_at`` descending with NULLs last; ``id``
    (descending) is the secondary key so the ordering is deterministic across
    pages.

    Keyword matching is a case-insensitive substring match. ``other_name`` is
    always searched; ``id`` (cast to text) is searched only for records whose
    ``other_name`` is NULL or an empty string. ``%`` and ``_`` in the keyword
    are escaped and therefore matched literally instead of acting as LIKE
    wildcards.

    Args:
        db: Database session.
        workspace_id: Workspace whose end users are listed.
        current_user: User performing the request (used for logging only).
        page: 1-based page number. Not validated here; the caller is expected
            to pass a value >= 1.
        pagesize: Number of records per page. Not validated here; the caller
            is expected to pass a value >= 1.
        keyword: Optional search keyword. Surrounding whitespace is stripped;
            ``None`` or a blank string disables filtering.

    Returns:
        dict: ``{"items": [...], "total": int}`` where ``items`` holds the
        end users of the requested page converted with
        ``EndUserSchema.model_validate`` and ``total`` is the number of
        records matching the filter across all pages.

    Raises:
        HTTPException: Re-raised unchanged.
        Exception: Any other error is logged and re-raised.
    """
    business_logger.info(f"获取工作空间宿主列表(分页): workspace_id={workspace_id}, keyword={keyword}, page={page}, pagesize={pagesize}, 操作者: {current_user.username}")

    try:
        # Base query: every end user that belongs to the workspace.
        base_query = db.query(EndUserModel).filter(
            EndUserModel.workspace_id == workspace_id
        )

        # Normalise the keyword: blank strings and None mean "no filter".
        keyword = keyword.strip() if keyword else None
        if keyword:
            # Escape user input so "%" and "_" cannot widen the match.
            keyword_pattern = f"%{_escape_like_term(keyword)}%"
            name_is_blank = or_(
                EndUserModel.other_name.is_(None),
                EndUserModel.other_name == "",
            )
            # other_name always participates; id is a fallback that only
            # applies to records without a usable other_name.
            base_query = base_query.filter(
                or_(
                    EndUserModel.other_name.ilike(
                        keyword_pattern, escape=_LIKE_ESCAPE_CHAR
                    ),
                    and_(
                        name_is_blank,
                        cast(EndUserModel.id, String).ilike(
                            keyword_pattern, escape=_LIKE_ESCAPE_CHAR
                        ),
                    ),
                )
            )
            business_logger.info(f"应用模糊搜索: keyword={keyword}(匹配 other_nameother_name 为空时匹配 id")

        # Total number of matching records (before pagination).
        total = base_query.count()
        if total == 0:
            business_logger.info("工作空间下没有宿主")
            return {"items": [], "total": 0}

        # Fetch the requested page with a deterministic ordering.
        end_users_orm = base_query.order_by(
            nullslast(desc(EndUserModel.created_at)),
            desc(EndUserModel.id)
        ).offset((page - 1) * pagesize).limit(pagesize).all()

        # Convert ORM rows to Pydantic models.
        end_users = [EndUserSchema.model_validate(eu) for eu in end_users_orm]

        business_logger.info(f"成功获取 {len(end_users)} 个宿主记录,总计 {total}")
        return {"items": end_users, "total": total}

    except HTTPException:
        raise
    except Exception as e:
        business_logger.error(f"获取工作空间宿主列表(分页)失败: workspace_id={workspace_id} - {str(e)}")
        raise
def get_workspace_memory_increment( def get_workspace_memory_increment(
db: Session, db: Session,
workspace_id: uuid.UUID, workspace_id: uuid.UUID,

View File

@@ -695,6 +695,37 @@ async def search_edges(end_user_id: Optional[str] = None) -> List[Dict[str, Any]
return result return result
async def search_all_batch(end_user_ids: List[str]) -> Dict[str, int]:
    """Count stored memory nodes for several end users with a single query.

    Args:
        end_user_ids: End-user IDs to look up.

    Returns:
        Dict[str, int]: Mapping of user ID to its count, in the form
        ``{"user_id": total_count}``. Every requested ID is present; IDs for
        which the query returned no row map to 0. An empty input returns an
        empty dict without running the query.
    """
    if not end_user_ids:
        return {}

    rows = await _neo4j_connector.execute_query(
        MemoryConfigRepository.SEARCH_FOR_ALL_BATCH,
        end_user_ids=end_user_ids,
    )

    # Index the returned rows by user ID.
    counts = {row["user_id"]: row["total"] for row in rows}

    # Requested users missing from the result set are reported as zero.
    for requested_id in end_user_ids:
        counts.setdefault(requested_id, 0)

    return counts
async def analytics_hot_memory_tags( async def analytics_hot_memory_tags(
db: Session, db: Session,
current_user: User, current_user: User,

View File

@@ -0,0 +1,37 @@
"""
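Performance monitoring utilities.
Provides execution-time measurement for code blocks, intended for API performance analysis.
To re-enable performance monitoring, add `from app.utils.performance_timer import timer` to the controller and wrap the code to be measured in `with timer(...)`.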
"""
import time
from contextlib import contextmanager
from app.core.logging_config import get_api_logger
# Obtain the API-specific logger
api_logger = get_api_logger()
@contextmanager
def timer(label: str, user_count: int = 0):
    """Context manager that logs how long the wrapped block took to run.

    The elapsed wall-clock time is written to the API logger in milliseconds
    when the block exits, including when it exits by raising.

    Args:
        label: Tag identifying the measured block in the log line.
        user_count: Optional number of users processed; appended to the log
            line only when greater than zero.

    Usage:
        with timer("fetch user list"):
            users = get_users()

        with timer("batch processing", user_count=len(user_ids)):
            process_users(user_ids)
    """
    began_at = time.perf_counter()
    try:
        yield
    finally:
        # perf_counter() reports seconds; convert to milliseconds.
        elapsed_ms = 1000 * (time.perf_counter() - began_at)
        if user_count > 0:
            suffix = f", 用户数: {user_count}"
        else:
            suffix = ""
        api_logger.info(f"[性能统计] {label}: {elapsed_ms:.2f}ms{suffix}")