From 2dfc3b25d80c18c5306209aa9b7d227d2ee6df27 Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Tue, 31 Mar 2026 12:26:46 +0800 Subject: [PATCH 1/3] [feat] User list pagination function --- .../memory_dashboard_controller.py | 118 +++++++++--------- api/app/services/memory_dashboard_service.py | 113 ++++++++++++++--- 2 files changed, 160 insertions(+), 71 deletions(-) diff --git a/api/app/controllers/memory_dashboard_controller.py b/api/app/controllers/memory_dashboard_controller.py index fe4337d1..948154f1 100644 --- a/api/app/controllers/memory_dashboard_controller.py +++ b/api/app/controllers/memory_dashboard_controller.py @@ -1,3 +1,4 @@ +import asyncio from fastapi import APIRouter, Depends, HTTPException, status, Query from pydantic import BaseModel, Field from sqlalchemy.orm import Session @@ -47,62 +48,62 @@ def get_workspace_total_end_users( @router.get("/end_users", response_model=ApiResponse) async def get_workspace_end_users( + workspace_id: Optional[str] = Query(None, description="工作空间ID(可选,默认当前用户工作空间)"), + keyword: Optional[str] = Query(None, description="搜索关键词(同时模糊匹配 other_name 和 id)"), + page: int = Query(1, ge=1, description="页码,从1开始"), + pagesize: int = Query(10, ge=1, description="每页数量"), db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """ - 获取工作空间的宿主列表(高性能优化版本 v2) - - 优化策略: - 1. 批量查询 end_users(一次查询而非循环) - 2. 并发查询所有用户的记忆数量(Neo4j) - 3. RAG 模式使用批量查询(一次 SQL) - 4. 只返回必要字段减少数据传输 - 5. 添加短期缓存减少重复查询 - 6. 并发执行配置查询和记忆数量查询 - - 返回格式: - { - "end_user": {"id": "uuid", "other_name": "名称"}, - "memory_num": {"total": 数量}, - "memory_config": {"memory_config_id": "id", "memory_config_name": "名称"} - } + 获取工作空间的宿主列表(分页查询,支持模糊搜索) + + 返回工作空间下的宿主列表,支持分页查询和模糊搜索。 + 通过 keyword 参数同时模糊匹配 other_name 和 id 字段。 + + Args: + workspace_id: 工作空间ID(可选,默认当前用户工作空间) + keyword: 搜索关键词(可选,同时模糊匹配 other_name 和 id) + page: 页码(从1开始,默认1) + pagesize: 每页数量(默认10) + db: 数据库会话 + current_user: 当前用户 + + Returns: + ApiResponse: 包含宿主列表和分页信息 """ - import asyncio - import json - from app.aioRedis import aio_redis_get, aio_redis_set - - workspace_id = current_user.current_workspace_id - - # 尝试从缓存获取(30秒缓存) - cache_key = f"end_users:workspace:{workspace_id}" - try: - cached_data = await aio_redis_get(cache_key) - if cached_data: - api_logger.info(f"从缓存获取宿主列表: workspace_id={workspace_id}") - return success(data=json.loads(cached_data), msg="宿主列表获取成功") - except Exception as e: - api_logger.warning(f"Redis 缓存读取失败: {str(e)}") - + # 如果未提供 workspace_id,使用当前用户的工作空间 + if workspace_id is None: + workspace_id = current_user.current_workspace_id # 获取当前空间类型 current_workspace_type = memory_dashboard_service.get_current_workspace_type(db, workspace_id, current_user) - api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的宿主列表") - - # 获取 end_users(已优化为批量查询) - end_users = memory_dashboard_service.get_workspace_end_users( + api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的宿主列表: keyword={keyword}, page={page}, pagesize={pagesize}") + + # 获取分页的 end_users + end_users_result = memory_dashboard_service.get_workspace_end_users_paginated( db=db, workspace_id=workspace_id, - current_user=current_user + current_user=current_user, + page=page, + pagesize=pagesize, + keyword=keyword ) + + end_users = end_users_result.get("items", []) + total = end_users_result.get("total", 0) + if not end_users: - api_logger.info("工作空间下没有宿主") - # 缓存空结果,避免重复查询 - try: - await aio_redis_set(cache_key, json.dumps([]), expire=30) - except Exception as e: - api_logger.warning(f"Redis 缓存写入失败: {str(e)}") - return success(data=[], msg="宿主列表获取成功") - + api_logger.info(f"工作空间下没有宿主或当前页无数据: total={total}, page={page}") + return success(data={ + "items": [], + "page": { + "page": page, + "pagesize": pagesize, + "total": total, + "hasnext": (page * pagesize) < total + } + }, msg="宿主列表获取成功") + end_user_ids = [str(user.id) for user in end_users] # 并发执行两个独立的查询任务 @@ -170,13 +171,13 @@ async def get_workspace_end_users( get_memory_configs(), get_memory_nums() ) - - # 构建结果(优化:使用列表推导式) - result = [] + + # 构建结果列表 + items = [] for end_user in end_users: user_id = str(end_user.id) config_info = memory_configs_map.get(user_id, {}) - result.append({ + items.append({ 'end_user': { 'id': user_id, 'other_name': end_user.other_name @@ -187,12 +188,6 @@ async def get_workspace_end_users( "memory_config_name": config_info.get("memory_config_name") } }) - - # 写入缓存(30秒过期) - try: - await aio_redis_set(cache_key, json.dumps(result), expire=30) - except Exception as e: - api_logger.warning(f"Redis 缓存写入失败: {str(e)}") # 触发社区聚类补全任务(异步,不阻塞接口响应) try: @@ -202,7 +197,18 @@ async def get_workspace_end_users( except Exception as e: api_logger.warning(f"触发社区聚类补全任务失败(不影响主流程): {str(e)}") - api_logger.info(f"成功获取 {len(end_users)} 个宿主记录") + # 构建分页响应 + result = { + "items": items, + "page": { + "page": page, + "pagesize": pagesize, + "total": total, + "hasnext": (page * pagesize) < total + } + } + + api_logger.info(f"成功获取 {len(end_users)} 个宿主记录,总计 {total} 条") return success(data=result, msg="宿主列表获取成功") diff --git a/api/app/services/memory_dashboard_service.py b/api/app/services/memory_dashboard_service.py index d0078088..3ab54561 100644 --- a/api/app/services/memory_dashboard_service.py +++ b/api/app/services/memory_dashboard_service.py @@ -1,11 +1,12 @@ from sqlalchemy.orm import Session -from typing import List, Optional +from sqlalchemy import desc, nullslast, or_, and_, cast, String +from typing import List, Optional, Dict, Any import uuid from fastapi import HTTPException from app.models.user_model import User from app.models.app_model import App -from app.models.end_user_model import EndUser +from app.models.end_user_model import EndUser, EndUser as EndUserModel from app.models.memory_increment_model import MemoryIncrement from app.repositories import ( @@ -49,44 +50,40 @@ def get_current_workspace_type( def get_workspace_end_users( - db: Session, - workspace_id: uuid.UUID, + db: Session, + workspace_id: uuid.UUID, current_user: User ) -> List[EndUser]: """获取工作空间的所有宿主(优化版本:减少数据库查询次数) - 返回结果按 created_at 从新到旧排序(NULL 值排在最后) """ business_logger.info(f"获取工作空间宿主列表: workspace_id={workspace_id}, 操作者: {current_user.username}") - - try: + + try: # 查询应用(ORM) apps_orm = app_repository.get_apps_by_workspace_id(db, workspace_id) - + if not apps_orm: business_logger.info("工作空间下没有应用") return [] - + # 提取所有 app_id # app_ids = [app.id for app in apps_orm] - # 批量查询所有 end_users(一次查询而非循环查询) # 按 created_at 降序排序,NULL 值排在最后;id 作为次级排序键保证确定性 - from app.models.end_user_model import EndUser as EndUserModel - from sqlalchemy import desc, nullslast end_users_orm = db.query(EndUserModel).filter( EndUserModel.workspace_id == workspace_id ).order_by( nullslast(desc(EndUserModel.created_at)), desc(EndUserModel.id) ).all() - + # 转换为 Pydantic 模型(只在需要时转换) end_users = [EndUserSchema.model_validate(eu) for eu in end_users_orm] - + business_logger.info(f"成功获取 {len(end_users)} 个宿主记录") return end_users - + except HTTPException: raise except Exception as e: @@ -94,6 +91,92 @@ def get_workspace_end_users( raise +def get_workspace_end_users_paginated( + db: Session, + workspace_id: uuid.UUID, + current_user: User, + page: int, + pagesize: int, + keyword: Optional[str] = None +) -> Dict[str, Any]: + """获取工作空间的宿主列表(分页版本,支持模糊搜索) + + 返回结果按 created_at 从新到旧排序(NULL 值排在最后) + 支持通过 keyword 参数同时模糊搜索 other_name 和 id 字段 + + Args: + db: 数据库会话 + workspace_id: 工作空间ID + current_user: 当前用户 + page: 页码(从1开始) + pagesize: 每页数量 + keyword: 搜索关键词(可选,同时模糊匹配 other_name 和 id) + + Returns: + dict: 包含 items(宿主列表)和 total(总记录数)的字典 + """ + business_logger.info(f"获取工作空间宿主列表(分页): workspace_id={workspace_id}, keyword={keyword}, page={page}, pagesize={pagesize}, 操作者: {current_user.username}") + + try: + # 构建基础查询 + base_query = db.query(EndUserModel).filter( + EndUserModel.workspace_id == workspace_id + ) + + # 构建搜索条件(过滤空字符串和None) + keyword = keyword.strip() if keyword else None + + if keyword: + keyword_pattern = f"%{keyword}%" + # 优先匹配 other_name,如果 other_name 为空则匹配 id + # 使用 OR 条件:匹配 other_name 不为空的数据,或者 other_name 为空但 id 匹配的数据 + base_query = base_query.filter( + or_( + # 情况1:other_name 不为空且匹配 keyword + and_( + EndUserModel.other_name != "", + EndUserModel.other_name.isnot(None), + EndUserModel.other_name.ilike(keyword_pattern) + ), + # 情况2:other_name 为空或 None,但 id 匹配 keyword + and_( + or_( + EndUserModel.other_name == "", + EndUserModel.other_name.is_(None) + ), + cast(EndUserModel.id, String).ilike(keyword_pattern) + ) + ) + ) + business_logger.info(f"应用模糊搜索: keyword={keyword}(优先匹配 other_name,无 other_name 时匹配 id)") + + # 获取总记录数 + total = base_query.count() + + if total == 0: + business_logger.info("工作空间下没有宿主") + return {"items": [], "total": 0} + + # 分页查询 + # 按 created_at 降序排序,NULL 值排在最后;id 作为次级排序键保证确定性 + end_users_orm = base_query.order_by( + nullslast(desc(EndUserModel.created_at)), + desc(EndUserModel.id) + ).offset((page - 1) * pagesize).limit(pagesize).all() + + # 转换为 Pydantic 模型 + end_users = [EndUserSchema.model_validate(eu) for eu in end_users_orm] + + business_logger.info(f"成功获取 {len(end_users)} 个宿主记录,总计 {total} 条") + return {"items": end_users, "total": total} + + except HTTPException: + raise + except Exception as e: + business_logger.error(f"获取工作空间宿主列表(分页)失败: workspace_id={workspace_id} - {str(e)}") + raise + + def get_workspace_memory_increment( db: Session, workspace_id: uuid.UUID, From ab45b7abacf41dd090bc8aeca595ae47c4853e80 Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Tue, 31 Mar 2026 16:30:29 +0800 Subject: [PATCH 2/3] [feat] Optimize the performance of the /end_users interface and introduce performance monitoring tools --- .../memory_dashboard_controller.py | 34 +++++++---------- .../repositories/memory_config_repository.py | 9 +++++ api/app/services/memory_storage_service.py | 31 ++++++++++++++++ api/app/utils/performance_timer.py | 37 +++++++++++++++++++ 4 files changed, 90 insertions(+), 21 deletions(-) create mode 100644 api/app/utils/performance_timer.py diff --git a/api/app/controllers/memory_dashboard_controller.py b/api/app/controllers/memory_dashboard_controller.py index 948154f1..260ea670 100644 --- a/api/app/controllers/memory_dashboard_controller.py +++ b/api/app/controllers/memory_dashboard_controller.py @@ -77,7 +77,7 @@ async def get_workspace_end_users( workspace_id = current_user.current_workspace_id # 获取当前空间类型 current_workspace_type = memory_dashboard_service.get_current_workspace_type(db, workspace_id, current_user) - api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的宿主列表: keyword={keyword}, page={page}, pagesize={pagesize}") + api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的宿主列表, 类型: {current_workspace_type}") # 获取分页的 end_users end_users_result = memory_dashboard_service.get_workspace_end_users_paginated( @@ -105,7 +105,7 @@ async def get_workspace_end_users( }, msg="宿主列表获取成功") end_user_ids = [str(user.id) for user in end_users] - + # 并发执行两个独立的查询任务 async def get_memory_configs(): """获取记忆配置(在线程池中执行同步查询)""" @@ -117,7 +117,7 @@ async def get_workspace_end_users( except Exception as e: api_logger.error(f"批量获取记忆配置失败: {str(e)}") return {} - + async def get_memory_nums(): """获取记忆数量""" if current_workspace_type == "rag": @@ -131,26 +131,18 @@ async def get_workspace_end_users( except Exception as e: api_logger.error(f"批量获取 RAG chunk 数量失败: {str(e)}") return {uid: {"total": 0} for uid in end_user_ids} - + elif current_workspace_type == "neo4j": - # Neo4j 模式:并发查询(带并发限制) - # 使用信号量限制并发数,避免大量用户时压垮 Neo4j - MAX_CONCURRENT_QUERIES = 10 - semaphore = asyncio.Semaphore(MAX_CONCURRENT_QUERIES) - - async def get_neo4j_memory_num(end_user_id: str): - async with semaphore: - try: - return await memory_storage_service.search_all(end_user_id) - except Exception as e: - api_logger.error(f"获取用户 {end_user_id} Neo4j 记忆数量失败: {str(e)}") - return {"total": 0} - - memory_nums_list = await asyncio.gather(*[get_neo4j_memory_num(uid) for uid in end_user_ids]) - return {end_user_ids[i]: memory_nums_list[i] for i in range(len(end_user_ids))} - + # Neo4j 模式:批量查询(简化版本,只返回total) + try: + batch_result = await memory_storage_service.search_all_batch(end_user_ids) + return {uid: {"total": count} for uid, count in batch_result.items()} + except Exception as e: + api_logger.error(f"批量获取 Neo4j 记忆数量失败: {str(e)}") + return {uid: {"total": 0} for uid in end_user_ids} + return {uid: {"total": 0} for uid in end_user_ids} - + # 触发按需初始化:为 implicit_emotions_storage 中没有记录的用户异步生成数据 try: from app.celery_app import celery_app as _celery_app diff --git a/api/app/repositories/memory_config_repository.py b/api/app/repositories/memory_config_repository.py index e64d19a3..3139b851 100644 --- a/api/app/repositories/memory_config_repository.py +++ b/api/app/repositories/memory_config_repository.py @@ -78,6 +78,15 @@ class MemoryConfigRepository: OPTIONAL MATCH (n) WHERE n.end_user_id = $end_user_id RETURN 'ALL' AS Label, COUNT(n) AS Count """ + # 批量查询多个用户的记忆数量(简化版本,只返回total) + SEARCH_FOR_ALL_BATCH = """ + MATCH (n) WHERE n.end_user_id IN $end_user_ids + RETURN + n.end_user_id as user_id, + count(n) as total + ORDER BY user_id + """ + # Extracted entity details within group/app/user SEARCH_FOR_DETIALS = """ MATCH (n:ExtractedEntity) diff --git a/api/app/services/memory_storage_service.py b/api/app/services/memory_storage_service.py index 58f3e8bd..b3a66734 100644 --- a/api/app/services/memory_storage_service.py +++ b/api/app/services/memory_storage_service.py @@ -695,6 +695,37 @@ async def search_edges(end_user_id: Optional[str] = None) -> List[Dict[str, Any] return result +async def search_all_batch(end_user_ids: List[str]) -> Dict[str, int]: + """批量查询多个用户的记忆数量(简化版本,只返回total) + + Args: + end_user_ids: 用户ID列表 + + Returns: + Dict[str, int]: 以user_id为key的记忆数量字典 + 格式: {"user_id": total_count} + """ + if not end_user_ids: + return {} + + result = await _neo4j_connector.execute_query( + MemoryConfigRepository.SEARCH_FOR_ALL_BATCH, + end_user_ids=end_user_ids, + ) + + # 转换结果为字典格式,字典格式在查询中无需遍历结果集,直接返回 + data = {} + for row in result: + data[row["user_id"]] = row["total"] + + # 为没有数据的用户填充默认值,转换字典格式还为无数据填充默认值 + for user_id in end_user_ids: + if user_id not in data: + data[user_id] = 0 + + return data + + async def analytics_hot_memory_tags( db: Session, current_user: User, diff --git a/api/app/utils/performance_timer.py b/api/app/utils/performance_timer.py new file mode 100644 index 00000000..6b0ec5d6 --- /dev/null +++ b/api/app/utils/performance_timer.py @@ -0,0 +1,37 @@ +""" +性能监控工具模块 + +提供代码块执行时间统计功能,用于接口性能分析。 +如需再次启用性能监控,只需在 controller 中导入 from app.utils.performance_timer import timer 并添加 with timer(...) 包裹需要监控的代码块即可 +""" + +import time +from contextlib import contextmanager +from app.core.logging_config import get_api_logger + +# 获取API专用日志器 +api_logger = get_api_logger() + + +@contextmanager +def timer(label: str, user_count: int = 0): + """上下文管理器:用于测量代码块执行时间 + + Args: + label: 统计标签,用于标识被测量的代码块 + user_count: 用户数,可选参数,用于记录处理的用户数量 + + Usage: + with timer("获取用户列表"): + users = get_users() + + with timer("批量处理", user_count=len(user_ids)): + process_users(user_ids) + """ + start = time.perf_counter() + try: + yield + finally: + elapsed = (time.perf_counter() - start) * 1000 # 转换为毫秒 + extra_info = f", 用户数: {user_count}" if user_count > 0 else "" + api_logger.info(f"[性能统计] {label}: {elapsed:.2f}ms{extra_info}") From 6d6338eb06388aedddbf9951f0c2dc40dd8f0596 Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Wed, 1 Apr 2026 10:36:29 +0800 Subject: [PATCH 3/3] [changes] Modify the data format and improve the query logic. --- .../memory_dashboard_controller.py | 3 ++- api/app/services/memory_dashboard_service.py | 19 ++++++------------- 2 files changed, 8 insertions(+), 14 deletions(-) diff --git a/api/app/controllers/memory_dashboard_controller.py b/api/app/controllers/memory_dashboard_controller.py index 260ea670..bedee987 100644 --- a/api/app/controllers/memory_dashboard_controller.py +++ b/api/app/controllers/memory_dashboard_controller.py @@ -1,4 +1,5 @@ import asyncio +import uuid from fastapi import APIRouter, Depends, HTTPException, status, Query from pydantic import BaseModel, Field from sqlalchemy.orm import Session @@ -48,7 +49,7 @@ def get_workspace_total_end_users( @router.get("/end_users", response_model=ApiResponse) async def get_workspace_end_users( - workspace_id: Optional[str] = Query(None, description="工作空间ID(可选,默认当前用户工作空间)"), + workspace_id: Optional[uuid.UUID] = Query(None, description="工作空间ID(可选,默认当前用户工作空间)"), keyword: Optional[str] = Query(None, description="搜索关键词(同时模糊匹配 other_name 和 id)"), page: int = Query(1, ge=1, description="页码,从1开始"), pagesize: int = Query(10, ge=1, description="每页数量"), diff --git a/api/app/services/memory_dashboard_service.py b/api/app/services/memory_dashboard_service.py index 3ab54561..9fad5dfe 100644 --- a/api/app/services/memory_dashboard_service.py +++ b/api/app/services/memory_dashboard_service.py @@ -128,27 +128,20 @@ def get_workspace_end_users_paginated( if keyword: keyword_pattern = f"%{keyword}%" - # 优先匹配 other_name,如果 other_name 为空则匹配 id - # 使用 OR 条件:匹配 other_name 不为空的数据,或者 other_name 为空但 id 匹配的数据 + # other_name 匹配始终生效;id 匹配仅对 other_name 为空的记录生效 base_query = base_query.filter( or_( - # 情况1:other_name 不为空且匹配 keyword - and_( - EndUserModel.other_name != "", - EndUserModel.other_name.isnot(None), - EndUserModel.other_name.ilike(keyword_pattern) - ), - # 情况2:other_name 为空或 None,但 id 匹配 keyword + EndUserModel.other_name.ilike(keyword_pattern), and_( or_( + EndUserModel.other_name.is_(None), EndUserModel.other_name == "", - EndUserModel.other_name.is_(None) ), - cast(EndUserModel.id, String).ilike(keyword_pattern) - ) + cast(EndUserModel.id, String).ilike(keyword_pattern), + ), ) ) - business_logger.info(f"应用模糊搜索: keyword={keyword}(优先匹配 other_name,无 other_name 时匹配 id)") + business_logger.info(f"应用模糊搜索: keyword={keyword}(匹配 other_name;other_name 为空时匹配 id)") # 获取总记录数 total = base_query.count()