diff --git a/api/app/controllers/memory_dashboard_controller.py b/api/app/controllers/memory_dashboard_controller.py index e03c1846..6181c319 100644 --- a/api/app/controllers/memory_dashboard_controller.py +++ b/api/app/controllers/memory_dashboard_controller.py @@ -49,63 +49,135 @@ async def get_workspace_end_users( current_user: User = Depends(get_current_user), ): """ - 获取工作空间的宿主列表 + 获取工作空间的宿主列表(高性能优化版本 v2) - 返回格式与原 memory_list 接口中的 end_users 字段相同, - 并包含每个用户的记忆配置信息(memory_config_id 和 memory_config_name) + 优化策略: + 1. 批量查询 end_users(一次查询而非循环) + 2. 并发查询所有用户的记忆数量(Neo4j) + 3. RAG 模式使用批量查询(一次 SQL) + 4. 只返回必要字段减少数据传输 + 5. 添加短期缓存减少重复查询 + 6. 并发执行配置查询和记忆数量查询 + + 返回格式: + { + "end_user": {"id": "uuid", "other_name": "名称"}, + "memory_num": {"total": 数量}, + "memory_config": {"memory_config_id": "id", "memory_config_name": "名称"} + } """ + import asyncio + import json + from app.aioRedis import aio_redis_get, aio_redis_set + workspace_id = current_user.current_workspace_id + + # 尝试从缓存获取(30秒缓存) + cache_key = f"end_users:workspace:{workspace_id}" + try: + cached_data = await aio_redis_get(cache_key) + if cached_data: + api_logger.info(f"从缓存获取宿主列表: workspace_id={workspace_id}") + return success(data=json.loads(cached_data), msg="宿主列表获取成功") + except Exception as e: + api_logger.warning(f"Redis 缓存读取失败: {str(e)}") + # 获取当前空间类型 current_workspace_type = memory_dashboard_service.get_current_workspace_type(db, workspace_id, current_user) api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的宿主列表") + + # 获取 end_users(已优化为批量查询) end_users = memory_dashboard_service.get_workspace_end_users( db=db, workspace_id=workspace_id, current_user=current_user ) - # 批量获取所有用户的记忆配置信息(优化:一次查询而非 N 次) - end_user_ids = [str(user.id) for user in end_users] - memory_configs_map = {} - if end_user_ids: + if not end_users: + api_logger.info("工作空间下没有宿主") + # 缓存空结果,避免重复查询 try: - memory_configs_map = get_end_users_connected_configs_batch(end_user_ids, db) + await aio_redis_set(cache_key, json.dumps([]), expire=30) + except Exception as e: + api_logger.warning(f"Redis 缓存写入失败: {str(e)}") + return success(data=[], msg="宿主列表获取成功") + + end_user_ids = [str(user.id) for user in end_users] + + # 并发执行两个独立的查询任务 + async def get_memory_configs(): + """获取记忆配置(在线程池中执行同步查询)""" + try: + return await asyncio.to_thread( + get_end_users_connected_configs_batch, + end_user_ids, db + ) except Exception as e: api_logger.error(f"批量获取记忆配置失败: {str(e)}") - # 失败时使用空字典,不影响其他数据返回 + return {} + async def get_memory_nums(): + """获取记忆数量""" + if current_workspace_type == "rag": + # RAG 模式:批量查询 + try: + chunk_map = await asyncio.to_thread( + memory_dashboard_service.get_users_total_chunk_batch, + end_user_ids, db, current_user + ) + return {uid: {"total": count} for uid, count in chunk_map.items()} + except Exception as e: + api_logger.error(f"批量获取 RAG chunk 数量失败: {str(e)}") + return {uid: {"total": 0} for uid in end_user_ids} + + elif current_workspace_type == "neo4j": + # Neo4j 模式:并发查询(带并发限制) + # 使用信号量限制并发数,避免大量用户时压垮 Neo4j + MAX_CONCURRENT_QUERIES = 10 + semaphore = asyncio.Semaphore(MAX_CONCURRENT_QUERIES) + + async def get_neo4j_memory_num(end_user_id: str): + async with semaphore: + try: + return await memory_storage_service.search_all(end_user_id) + except Exception as e: + api_logger.error(f"获取用户 {end_user_id} Neo4j 记忆数量失败: {str(e)}") + return {"total": 0} + + memory_nums_list = await asyncio.gather(*[get_neo4j_memory_num(uid) for uid in end_user_ids]) + return {end_user_ids[i]: memory_nums_list[i] for i in range(len(end_user_ids))} + + return {uid: {"total": 0} for uid in end_user_ids} + + # 并发执行配置查询和记忆数量查询 + memory_configs_map, memory_nums_map = await asyncio.gather( + get_memory_configs(), + get_memory_nums() + ) + + # 构建结果(优化:使用列表推导式) result = [] for end_user in end_users: - memory_num = {} - if current_workspace_type == "neo4j": - # EndUser 是 Pydantic 模型,直接访问属性而不是使用 .get() - memory_num = await memory_storage_service.search_all(str(end_user.id)) - elif current_workspace_type == "rag": - memory_num = { - "total":memory_dashboard_service.get_current_user_total_chunk(str(end_user.id), db, current_user) - } - - # 从批量查询结果中获取配置信息 user_id = str(end_user.id) - memory_config_info = memory_configs_map.get(user_id, { - "memory_config_id": None, - "memory_config_name": None - }) - - # 只保留需要的字段,移除 error 字段(如果有) - memory_config = { - "memory_config_id": memory_config_info.get("memory_config_id"), - "memory_config_name": memory_config_info.get("memory_config_name") - } - - result.append( - { - 'end_user': end_user, - 'memory_num': memory_num, - 'memory_config': memory_config + config_info = memory_configs_map.get(user_id, {}) + result.append({ + 'end_user': { + 'id': user_id, + 'other_name': end_user.other_name + }, + 'memory_num': memory_nums_map.get(user_id, {"total": 0}), + 'memory_config': { + "memory_config_id": config_info.get("memory_config_id"), + "memory_config_name": config_info.get("memory_config_name") } - ) - + }) + + # 写入缓存(30秒过期) + try: + await aio_redis_set(cache_key, json.dumps(result), expire=30) + except Exception as e: + api_logger.warning(f"Redis 缓存写入失败: {str(e)}") + api_logger.info(f"成功获取 {len(end_users)} 个宿主记录") return success(data=result, msg="宿主列表获取成功") diff --git a/api/app/controllers/memory_storage_controller.py b/api/app/controllers/memory_storage_controller.py index f4175923..3722be3d 100644 --- a/api/app/controllers/memory_storage_controller.py +++ b/api/app/controllers/memory_storage_controller.py @@ -420,15 +420,95 @@ async def get_hot_memory_tags_api( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ) -> dict: - api_logger.info(f"Hot memory tags requested for current_user: {current_user.id}") + """ + 获取热门记忆标签(带Redis缓存) + + 缓存策略: + - 缓存键:workspace_id + limit + - 过期时间:5分钟(300秒) + - 缓存命中:~50ms + - 缓存未命中:~600-800ms(取决于LLM速度) + """ + workspace_id = current_user.current_workspace_id + + # 构建缓存键 + cache_key = f"hot_memory_tags:{workspace_id}:{limit}" + + api_logger.info(f"Hot memory tags requested for workspace: {workspace_id}, limit: {limit}") + try: + # 尝试从Redis缓存获取 + from app.aioRedis import aio_redis_get, aio_redis_set + import json + + cached_result = await aio_redis_get(cache_key) + if cached_result: + api_logger.info(f"Cache hit for key: {cache_key}") + try: + data = json.loads(cached_result) + return success(data=data, msg="查询成功(缓存)") + except json.JSONDecodeError: + api_logger.warning(f"Failed to parse cached data, will refresh") + + # 缓存未命中,执行查询 + api_logger.info(f"Cache miss for key: {cache_key}, executing query") result = await analytics_hot_memory_tags(db, current_user, limit) + + # 写入缓存(过期时间:5分钟) + # 注意:result是列表,需要转换为JSON字符串 + try: + cache_data = json.dumps(result, ensure_ascii=False) + await aio_redis_set(cache_key, cache_data, expire=300) + api_logger.info(f"Cached result for key: {cache_key}") + except Exception as cache_error: + # 缓存写入失败不影响主流程 + api_logger.warning(f"Failed to cache result: {str(cache_error)}") + return success(data=result, msg="查询成功") + except Exception as e: api_logger.error(f"Hot memory tags failed: {str(e)}") return fail(BizCode.INTERNAL_ERROR, "热门标签查询失败", str(e)) +@router.delete("/analytics/hot_memory_tags/cache", response_model=ApiResponse) +async def clear_hot_memory_tags_cache( + current_user: User = Depends(get_current_user), + ) -> dict: + """ + 清除热门标签缓存 + + 用于: + - 手动刷新数据 + - 调试和测试 + - 数据更新后立即生效 + """ + workspace_id = current_user.current_workspace_id + + api_logger.info(f"Clear hot memory tags cache requested for workspace: {workspace_id}") + + try: + from app.aioRedis import aio_redis_delete + + # 清除所有limit的缓存(常见的limit值) + cleared_count = 0 + for limit in [5, 10, 15, 20, 30, 50]: + cache_key = f"hot_memory_tags:{workspace_id}:{limit}" + result = await aio_redis_delete(cache_key) + if result: + cleared_count += 1 + api_logger.info(f"Cleared cache for key: {cache_key}") + + return success( + data={"cleared_count": cleared_count}, + msg=f"成功清除 {cleared_count} 个缓存" + ) + + except Exception as e: + api_logger.error(f"Clear cache failed: {str(e)}") + return fail(BizCode.INTERNAL_ERROR, "清除缓存失败", str(e)) + + @router.get("/analytics/recent_activity_stats", response_model=ApiResponse) async def get_recent_activity_stats_api( current_user: User = Depends(get_current_user), diff --git a/api/app/services/memory_dashboard_service.py b/api/app/services/memory_dashboard_service.py index a774647e..06a94060 100644 --- a/api/app/services/memory_dashboard_service.py +++ b/api/app/services/memory_dashboard_service.py @@ -53,18 +53,28 @@ def get_workspace_end_users( workspace_id: uuid.UUID, current_user: User ) -> List[EndUser]: - """获取工作空间的所有宿主""" + """获取工作空间的所有宿主(优化版本:减少数据库查询次数)""" business_logger.info(f"获取工作空间宿主列表: workspace_id={workspace_id}, 操作者: {current_user.username}") try: - # 查询应用(ORM)并转换为 Pydantic 模型 + # 查询应用(ORM) apps_orm = app_repository.get_apps_by_workspace_id(db, workspace_id) - apps = [AppSchema.model_validate(h) for h in apps_orm] - app_ids = [app.id for app in apps] - end_users = [] - for app_id in app_ids: - end_user_orm_list = end_user_repository.get_end_users_by_app_id(db, app_id) - end_users.extend([EndUserSchema.model_validate(h) for h in end_user_orm_list]) + + if not apps_orm: + business_logger.info("工作空间下没有应用") + return [] + + # 提取所有 app_id + app_ids = [app.id for app in apps_orm] + + # 批量查询所有 end_users(一次查询而非循环查询) + from app.models.end_user_model import EndUser as EndUserModel + end_users_orm = db.query(EndUserModel).filter( + EndUserModel.app_id.in_(app_ids) + ).all() + + # 转换为 Pydantic 模型(只在需要时转换) + end_users = [EndUserSchema.model_validate(eu) for eu in end_users_orm] business_logger.info(f"成功获取 {len(end_users)} 个宿主记录") return end_users @@ -414,6 +424,67 @@ def get_current_user_total_chunk( business_logger.error(f"获取用户总chunk数失败: end_user_id={end_user_id} - {str(e)}") raise + +def get_users_total_chunk_batch( + end_user_ids: List[str], + db: Session, + current_user: User +) -> dict: + """ + 批量获取多个用户的总chunk数(性能优化版本) + + Args: + end_user_ids: 用户ID列表 + db: 数据库会话 + current_user: 当前用户 + + Returns: + 字典,key为end_user_id,value为chunk总数 + 格式: {"user_id_1": 100, "user_id_2": 50, ...} + """ + business_logger.info(f"批量获取 {len(end_user_ids)} 个用户的总chunk数, 操作者: {current_user.username}") + + try: + from app.models.document_model import Document + from sqlalchemy import func, case + + if not end_user_ids: + return {} + + # 构造所有文件名 + file_names = [f"{user_id}.txt" for user_id in end_user_ids] + + # 一次查询获取所有用户的chunk总数 + # 使用 GROUP BY file_name 来分组统计 + results = db.query( + Document.file_name, + func.sum(Document.chunk_num).label('total_chunk') + ).filter( + Document.file_name.in_(file_names) + ).group_by( + Document.file_name + ).all() + + # 构建结果字典 + chunk_map = {} + for file_name, total_chunk in results: + # 从文件名中提取 end_user_id (去掉 .txt 后缀) + user_id = file_name.replace('.txt', '') + chunk_map[user_id] = int(total_chunk or 0) + + # 对于没有记录的用户,设置为0 + for user_id in end_user_ids: + if user_id not in chunk_map: + chunk_map[user_id] = 0 + + business_logger.info(f"成功批量获取 {len(chunk_map)} 个用户的总chunk数") + return chunk_map + + except Exception as e: + business_logger.error(f"批量获取用户总chunk数失败: {str(e)}") + raise + + def get_rag_content( end_user_id: str, limit: int, diff --git a/api/app/services/memory_storage_service.py b/api/app/services/memory_storage_service.py index 48c3abf1..c276f337 100644 --- a/api/app/services/memory_storage_service.py +++ b/api/app/services/memory_storage_service.py @@ -12,7 +12,11 @@ from datetime import datetime from typing import Any, AsyncGenerator, Dict, List, Optional from app.core.logging_config import get_config_logger, get_logger -from app.core.memory.analytics.hot_memory_tags import get_hot_memory_tags +from app.core.memory.analytics.hot_memory_tags import ( + get_hot_memory_tags, + get_raw_tags_from_db, + filter_tags_with_llm, +) from app.core.memory.analytics.recent_activity_stats import get_recent_activity_stats from app.models.user_model import User from app.repositories.data_config_repository import DataConfigRepository @@ -515,27 +519,79 @@ async def analytics_hot_memory_tags( ) -> List[Dict[str, Any]]: """ 获取热门记忆标签,按数量排序并返回前N个 + + 优化策略: + 1. 先从所有用户收集原始标签(不调用LLM) + 2. 聚合并合并相同标签的频率 + 3. 排序后取前N个 + 4. 只调用一次LLM进行筛选 """ workspace_id = current_user.current_workspace_id # 获取更多标签供LLM筛选(获取limit*4个标签) raw_limit = limit * 4 from app.services.memory_dashboard_service import get_workspace_end_users - end_users = get_workspace_end_users(db, workspace_id, current_user) + # 使用 asyncio.to_thread 避免阻塞事件循环 + end_users = await asyncio.to_thread(get_workspace_end_users, db, workspace_id, current_user) - tags = [] - for end_user in end_users: - tag = await get_hot_memory_tags(str(end_user.id), limit=raw_limit) - if tag: - # 将每个用户的标签列表展平到总列表中 - tags.extend(tag) - - # 按频率降序排序(虽然数据库已经排序,但为了确保正确性再次排序) - sorted_tags = sorted(tags, key=lambda x: x[1], reverse=True) + if not end_users: + return [] - # 只返回前limit个 - top_tags = sorted_tags[:limit] - - return [{"name": t, "frequency": f} for t, f in top_tags] + # 步骤1: 收集所有用户的原始标签(不调用LLM) + connector = Neo4jConnector() + try: + all_raw_tags = [] + for end_user in end_users: + raw_tags = await get_raw_tags_from_db( + connector, + str(end_user.id), + limit=raw_limit, + by_user=False + ) + if raw_tags: + all_raw_tags.extend(raw_tags) + + if not all_raw_tags: + return [] + + # 步骤2: 聚合相同标签的频率 + tag_frequency_map = {} + for tag_name, frequency in all_raw_tags: + if tag_name in tag_frequency_map: + tag_frequency_map[tag_name] += frequency + else: + tag_frequency_map[tag_name] = frequency + + # 步骤3: 按频率降序排序,取前raw_limit个 + sorted_tags = sorted( + tag_frequency_map.items(), + key=lambda x: x[1], + reverse=True + )[:raw_limit] + + if not sorted_tags: + return [] + + # 步骤4: 只调用一次LLM进行筛选 + tag_names = [tag for tag, _ in sorted_tags] + + # 使用第一个用户的group_id来获取LLM配置 + # 因为同一工作空间下的用户应该使用相同的配置 + first_end_user_id = str(end_users[0].id) + filtered_tag_names = await filter_tags_with_llm(tag_names, first_end_user_id) + + # 步骤5: 根据LLM筛选结果构建最终列表(保留频率) + final_tags = [] + for tag, freq in sorted_tags: + if tag in filtered_tag_names: + final_tags.append((tag, freq)) + + # 步骤6: 只返回前limit个 + top_tags = final_tags[:limit] + + return [{"name": t, "frequency": f} for t, f in top_tags] + + finally: + await connector.close() async def analytics_recent_activity_stats() -> Dict[str, Any]: