from fastapi import APIRouter, Depends, HTTPException, status, Query from sqlalchemy.orm import Session from typing import Optional from app.core.response_utils import success from app.db import get_db from app.dependencies import get_current_user from app.models.user_model import User from app.schemas.response_schema import ApiResponse from app.services import memory_dashboard_service, memory_storage_service, workspace_service from app.services.memory_agent_service import get_end_users_connected_configs_batch from app.services.app_statistics_service import AppStatisticsService from app.core.logging_config import get_api_logger # 获取API专用日志器 api_logger = get_api_logger() router = APIRouter( prefix="/dashboard", tags=["Dashboard"], dependencies=[Depends(get_current_user)] # Apply auth to all routes in this controller ) @router.get("/total_end_users", response_model=ApiResponse) def get_workspace_total_end_users( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """ 获取用户列表的总用户数 """ workspace_id = current_user.current_workspace_id api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的宿主列表") total_end_users = memory_dashboard_service.get_workspace_total_end_users( db=db, workspace_id=workspace_id, current_user=current_user ) api_logger.info(f"成功获取最新用户总数: total_num={total_end_users.get('total_num', 0)}") return success(data=total_end_users, msg="用户数量获取成功") @router.get("/end_users", response_model=ApiResponse) async def get_workspace_end_users( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """ 获取工作空间的宿主列表(高性能优化版本 v2) 优化策略: 1. 批量查询 end_users(一次查询而非循环) 2. 并发查询所有用户的记忆数量(Neo4j) 3. RAG 模式使用批量查询(一次 SQL) 4. 只返回必要字段减少数据传输 5. 添加短期缓存减少重复查询 6. 并发执行配置查询和记忆数量查询 返回格式: { "end_user": {"id": "uuid", "other_name": "名称"}, "memory_num": {"total": 数量}, "memory_config": {"memory_config_id": "id", "memory_config_name": "名称"} } """ import asyncio import json from app.aioRedis import aio_redis_get, aio_redis_set workspace_id = current_user.current_workspace_id # 尝试从缓存获取(30秒缓存) cache_key = f"end_users:workspace:{workspace_id}" try: cached_data = await aio_redis_get(cache_key) if cached_data: api_logger.info(f"从缓存获取宿主列表: workspace_id={workspace_id}") return success(data=json.loads(cached_data), msg="宿主列表获取成功") except Exception as e: api_logger.warning(f"Redis 缓存读取失败: {str(e)}") # 获取当前空间类型 current_workspace_type = memory_dashboard_service.get_current_workspace_type(db, workspace_id, current_user) api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的宿主列表") # 获取 end_users(已优化为批量查询) end_users = memory_dashboard_service.get_workspace_end_users( db=db, workspace_id=workspace_id, current_user=current_user ) if not end_users: api_logger.info("工作空间下没有宿主") # 缓存空结果,避免重复查询 try: await aio_redis_set(cache_key, json.dumps([]), expire=30) except Exception as e: api_logger.warning(f"Redis 缓存写入失败: {str(e)}") return success(data=[], msg="宿主列表获取成功") end_user_ids = [str(user.id) for user in end_users] # 并发执行两个独立的查询任务 async def get_memory_configs(): """获取记忆配置(在线程池中执行同步查询)""" try: return await asyncio.to_thread( get_end_users_connected_configs_batch, end_user_ids, db ) except Exception as e: api_logger.error(f"批量获取记忆配置失败: {str(e)}") return {} async def get_memory_nums(): """获取记忆数量""" if current_workspace_type == "rag": # RAG 模式:批量查询 try: chunk_map = await asyncio.to_thread( memory_dashboard_service.get_users_total_chunk_batch, end_user_ids, db, current_user ) return {uid: {"total": count} for uid, count in chunk_map.items()} except Exception as e: api_logger.error(f"批量获取 RAG chunk 数量失败: {str(e)}") return {uid: {"total": 0} for uid in end_user_ids} elif current_workspace_type == "neo4j": # Neo4j 模式:并发查询(带并发限制) # 使用信号量限制并发数,避免大量用户时压垮 Neo4j MAX_CONCURRENT_QUERIES = 10 semaphore = asyncio.Semaphore(MAX_CONCURRENT_QUERIES) async def get_neo4j_memory_num(end_user_id: str): async with semaphore: try: return await memory_storage_service.search_all(end_user_id) except Exception as e: api_logger.error(f"获取用户 {end_user_id} Neo4j 记忆数量失败: {str(e)}") return {"total": 0} memory_nums_list = await asyncio.gather(*[get_neo4j_memory_num(uid) for uid in end_user_ids]) return {end_user_ids[i]: memory_nums_list[i] for i in range(len(end_user_ids))} return {uid: {"total": 0} for uid in end_user_ids} # 并发执行配置查询和记忆数量查询 memory_configs_map, memory_nums_map = await asyncio.gather( get_memory_configs(), get_memory_nums() ) # 构建结果(优化:使用列表推导式) result = [] for end_user in end_users: user_id = str(end_user.id) config_info = memory_configs_map.get(user_id, {}) result.append({ 'end_user': { 'id': user_id, 'other_name': end_user.other_name }, 'memory_num': memory_nums_map.get(user_id, {"total": 0}), 'memory_config': { "memory_config_id": config_info.get("memory_config_id"), "memory_config_name": config_info.get("memory_config_name") } }) # 写入缓存(30秒过期) try: await aio_redis_set(cache_key, json.dumps(result), expire=30) except Exception as e: api_logger.warning(f"Redis 缓存写入失败: {str(e)}") # 触发社区聚类补全任务(异步,不阻塞接口响应) # 对有 ExtractedEntity 但无 Community 节点的存量用户自动补跑全量聚类 try: from app.tasks import init_community_clustering_for_users init_community_clustering_for_users.apply_async( kwargs={"end_user_ids": end_user_ids}, queue="periodic_tasks", ) api_logger.info(f"已触发社区聚类补全任务,候选用户数: {len(end_user_ids)}") except Exception as e: api_logger.warning(f"触发社区聚类补全任务失败(不影响主流程): {str(e)}") api_logger.info(f"成功获取 {len(end_users)} 个宿主记录") return success(data=result, msg="宿主列表获取成功") @router.get("/memory_increment", response_model=ApiResponse) def get_workspace_memory_increment( limit: int = Query(7, description="返回记录数"), db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """获取工作空间的记忆增量""" workspace_id = current_user.current_workspace_id api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的记忆增量") memory_increment = memory_dashboard_service.get_workspace_memory_increment( db=db, workspace_id=workspace_id, current_user=current_user, limit=limit ) api_logger.info(f"成功获取 {len(memory_increment)} 条记忆增量记录") return success(data=memory_increment, msg="记忆增量获取成功") @router.get("/api_increment", response_model=ApiResponse) def get_workspace_api_increment( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """获取API调用趋势""" workspace_id = current_user.current_workspace_id api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的API调用增量") api_increment = memory_dashboard_service.get_workspace_api_increment( db=db, workspace_id=workspace_id, current_user=current_user ) api_logger.info(f"成功获取 {api_increment} API调用增量") return success(data=api_increment, msg="API调用增量获取成功") @router.post("/total_memory", response_model=ApiResponse) def write_workspace_total_memory( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """工作空间记忆总量的写入(异步任务)""" workspace_id = current_user.current_workspace_id api_logger.info(f"用户 {current_user.username} 请求写入工作空间 {workspace_id} 的记忆总量") # 触发 Celery 异步任务 from app.celery_app import celery_app task = celery_app.send_task( "app.controllers.memory_storage_controller.search_all", kwargs={"workspace_id": str(workspace_id)} ) api_logger.info(f"已触发记忆总量统计任务,task_id: {task.id}") return success( data={"task_id": task.id, "workspace_id": str(workspace_id)}, msg="记忆总量统计任务已启动" ) @router.get("/task_status/{task_id}", response_model=ApiResponse) def get_task_status( task_id: str, current_user: User = Depends(get_current_user), ): """查询异步任务的执行状态和结果""" api_logger.info(f"用户 {current_user.username} 查询任务状态: task_id={task_id}") from app.celery_app import celery_app from celery.result import AsyncResult # 获取任务结果 task_result = AsyncResult(task_id, app=celery_app) response_data = { "task_id": task_id, "status": task_result.state, # PENDING, STARTED, SUCCESS, FAILURE, RETRY, REVOKED } # 如果任务完成,返回结果 if task_result.ready(): if task_result.successful(): response_data["result"] = task_result.result api_logger.info(f"任务 {task_id} 执行成功") return success(data=response_data, msg="任务执行成功") else: # 任务失败 response_data["error"] = str(task_result.result) api_logger.error(f"任务 {task_id} 执行失败: {task_result.result}") return success(data=response_data, msg="任务执行失败") else: # 任务还在执行中 api_logger.info(f"任务 {task_id} 状态: {task_result.state}") return success(data=response_data, msg=f"任务状态: {task_result.state}") @router.get("/memory_list", response_model=ApiResponse) def get_workspace_memory_list( limit: int = Query(7, description="记忆增量返回记录数"), db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """ 用户记忆列表整合接口 整合以下三个接口的数据: 1. total_memory - 工作空间记忆总量 2. memory_increment - 工作空间记忆增量 3. hosts - 工作空间宿主列表 返回格式: { "total_memory": float, "memory_increment": [ {"date": "2024-01-01", "count": 100}, ... ], "hosts": [ {"id": "uuid", "name": "宿主名", ...}, ... ] } """ workspace_id = current_user.current_workspace_id api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的记忆列表") memory_list = memory_dashboard_service.get_workspace_memory_list( db=db, workspace_id=workspace_id, current_user=current_user, limit=limit ) api_logger.info("成功获取记忆列表") return success(data=memory_list, msg="记忆列表获取成功") @router.get("/total_memory_count", response_model=ApiResponse) async def get_workspace_total_memory_count( end_user_id: Optional[str] = Query(None, description="可选的用户ID"), db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """ 获取工作空间的记忆总量(通过聚合所有host的记忆数) 逻辑: 1. 从 memory_list 获取所有 host_id 2. 对每个 host_id 调用 search_all 获取 total 3. 将所有 total 求和返回 返回格式: { "total_memory_count": int, "host_count": int, "details": [ {"end_user_id": "uuid", "count": 100, "name": "用户名称"}, ... ] } """ workspace_id = current_user.current_workspace_id api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的记忆总量") total_memory_count = await memory_dashboard_service.get_workspace_total_memory_count( db=db, workspace_id=workspace_id, current_user=current_user, end_user_id=end_user_id ) api_logger.info(f"成功获取记忆总量: {total_memory_count.get('total_memory_count', 0)}") return success(data=total_memory_count, msg="记忆总量获取成功") # ======== RAG 数据统计 ======== @router.get("/total_rag_count", response_model=ApiResponse) def get_workspace_total_rag_count( db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """ 获取 rag 的总文档数、总chunk数、总知识库数量、总api调用数量 """ total_documents = memory_dashboard_service.get_rag_total_doc(db, current_user) total_chunk = memory_dashboard_service.get_rag_total_chunk(db, current_user) total_kb = memory_dashboard_service.get_rag_total_kb(db, current_user) data = { 'total_documents':total_documents, 'total_chunk':total_chunk, 'total_kb':total_kb, 'total_api':1024 } return success(data=data, msg="RAG相关数据获取成功") @router.get("/current_user_rag_total_num", response_model=ApiResponse) def get_current_user_rag_total_num( end_user_id: str = Query(..., description="宿主ID"), db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """ 获取当前宿主的 RAG 的总chunk数量 """ total_chunk = memory_dashboard_service.get_current_user_total_chunk(end_user_id, db, current_user) return success(data=total_chunk, msg="宿主RAG知识数据获取成功") @router.get("/rag_content", response_model=ApiResponse) def get_rag_content( end_user_id: str = Query(..., description="宿主ID"), limit: int = Query(15, description="返回记录数"), db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """ 获取当前宿主知识库中的chunk内容 """ data = memory_dashboard_service.get_rag_content(end_user_id, limit, db, current_user) return success(data=data, msg="宿主RAGchunk数据获取成功") @router.get("/chunk_summary_tag", response_model=ApiResponse) async def get_chunk_summary_tag( end_user_id: str = Query(..., description="宿主ID"), limit: int = Query(15, description="返回记录数"), max_tags: int = Query(10, description="最大标签数量"), db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """ 获取chunk总结、提取的标签和人物形象 返回格式: { "summary": "chunk内容的总结", "tags": [ {"tag": "标签1", "frequency": 5}, {"tag": "标签2", "frequency": 3}, ... ], "personas": [ "产品设计师", "旅行爱好者", "摄影发烧友", ... ] } """ api_logger.info(f"用户 {current_user.username} 请求获取宿主 {end_user_id} 的chunk摘要、标签和人物形象") data = await memory_dashboard_service.get_chunk_summary_and_tags( end_user_id=end_user_id, limit=limit, max_tags=max_tags, db=db, current_user=current_user ) api_logger.info(f"成功获取chunk摘要、{len(data.get('tags', []))} 个标签和 {len(data.get('personas', []))} 个人物形象") return success(data=data, msg="chunk摘要、标签和人物形象获取成功") @router.get("/chunk_insight", response_model=ApiResponse) async def get_chunk_insight( end_user_id: str = Query(..., description="宿主ID"), limit: int = Query(15, description="返回记录数"), db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """ 获取chunk的洞察内容 返回格式: { "insight": "对chunk内容的深度洞察分析" } """ api_logger.info(f"用户 {current_user.username} 请求获取宿主 {end_user_id} 的chunk洞察") data = await memory_dashboard_service.get_chunk_insight( end_user_id=end_user_id, limit=limit, db=db, current_user=current_user ) api_logger.info("成功获取chunk洞察") return success(data=data, msg="chunk洞察获取成功") @router.get("/dashboard_data", response_model=ApiResponse) async def dashboard_data( end_user_id: Optional[str] = Query(None, description="可选的用户ID"), start_date: Optional[int] = Query(None, description="开始时间戳(毫秒)"), end_date: Optional[int] = Query(None, description="结束时间戳(毫秒)"), db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """ 整合dashboard数据接口 整合以下接口的数据: 1. /dashboard/total_memory_count - 记忆总量 2. /dashboard/api_increment - API调用增量 3. /memory/stats/types - 知识库类型统计(只要total数据) 4. /dashboard/total_rag_count - RAG相关数据 根据 storage_type 判断调用不同的接口 返回格式: { "storage_type": str, "neo4j_data": { "total_memory": int, "total_app": int, "total_knowledge": int, "total_api_call": int } | null, "rag_data": { "total_memory": int, "total_app": int, "total_knowledge": int, "total_api_call": int } | null } """ workspace_id = current_user.current_workspace_id api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的dashboard整合数据") # 如果没有提供时间范围,默认使用最近30天 if start_date is None or end_date is None: from datetime import datetime, timedelta end_dt = datetime.now() start_dt = end_dt - timedelta(days=30) end_date = int(end_dt.timestamp() * 1000) start_date = int(start_dt.timestamp() * 1000) api_logger.info(f"使用默认时间范围: {start_dt} 到 {end_dt}") # 获取 storage_type,如果为 None 则使用默认值 storage_type = workspace_service.get_workspace_storage_type( db=db, workspace_id=workspace_id, user=current_user ) if storage_type is None: storage_type = 'neo4j' # 根据 storage_type 决定返回哪个数据对象 # 如果是 'rag',neo4j_data 为 null;否则 rag_data 为 null result = { "storage_type": storage_type, "neo4j_data": None, "rag_data": None } try: # 如果 storage_type 为 'neo4j' 或空,获取 neo4j_data if storage_type == 'neo4j': neo4j_data = { "total_memory": None, "total_app": None, "total_knowledge": None, "total_api_call": None } # 1. 获取记忆总量(total_memory) try: total_memory_data = await memory_dashboard_service.get_workspace_total_memory_count( db=db, workspace_id=workspace_id, current_user=current_user, end_user_id=end_user_id ) neo4j_data["total_memory"] = total_memory_data.get("total_memory_count", 0) # total_app: 统计当前空间下的所有app数量 from app.repositories import app_repository apps_orm = app_repository.get_apps_by_workspace_id(db, workspace_id) neo4j_data["total_app"] = len(apps_orm) api_logger.info(f"成功获取记忆总量: {neo4j_data['total_memory']}, 应用数量: {neo4j_data['total_app']}") except Exception as e: api_logger.warning(f"获取记忆总量失败: {str(e)}") # 2. 获取知识库类型统计(total_knowledge) try: from app.services.memory_agent_service import MemoryAgentService memory_agent_service = MemoryAgentService() knowledge_stats = await memory_agent_service.get_knowledge_type_stats( end_user_id=end_user_id, only_active=True, current_workspace_id=workspace_id, db=db ) neo4j_data["total_knowledge"] = knowledge_stats.get("total", 0) api_logger.info(f"成功获取知识库类型统计total: {neo4j_data['total_knowledge']}") except Exception as e: api_logger.warning(f"获取知识库类型统计失败: {str(e)}") # 3. 获取API调用统计(total_api_call) try: # 使用 AppStatisticsService 获取真实的API调用统计 app_stats_service = AppStatisticsService(db) api_stats = app_stats_service.get_workspace_api_statistics( workspace_id=workspace_id, start_date=start_date, end_date=end_date ) # 计算总调用次数 total_api_calls = sum(item.get("total_calls", 0) for item in api_stats) neo4j_data["total_api_call"] = total_api_calls api_logger.info(f"成功获取API调用统计: {neo4j_data['total_api_call']}") except Exception as e: api_logger.error(f"获取API调用统计失败: {str(e)}") neo4j_data["total_api_call"] = 0 result["neo4j_data"] = neo4j_data api_logger.info("成功获取neo4j_data") # 如果 storage_type 为 'rag',获取 rag_data elif storage_type == 'rag': rag_data = { "total_memory": None, "total_app": None, "total_knowledge": None, "total_api_call": None } # 获取RAG相关数据 try: # total_memory: 只统计用户知识库(permission_id='Memory')的chunk数 total_chunk = memory_dashboard_service.get_rag_user_kb_total_chunk(db, current_user) rag_data["total_memory"] = total_chunk # total_app: 统计当前空间下的所有app数量 from app.repositories import app_repository apps_orm = app_repository.get_apps_by_workspace_id(db, workspace_id) rag_data["total_app"] = len(apps_orm) # total_knowledge: 使用 total_kb(总知识库数) total_kb = memory_dashboard_service.get_rag_total_kb(db, current_user) rag_data["total_knowledge"] = total_kb # total_api_call: 使用 AppStatisticsService 获取真实的API调用统计 try: app_stats_service = AppStatisticsService(db) api_stats = app_stats_service.get_workspace_api_statistics( workspace_id=workspace_id, start_date=start_date, end_date=end_date ) # 计算总调用次数 total_api_calls = sum(item.get("total_calls", 0) for item in api_stats) rag_data["total_api_call"] = total_api_calls api_logger.info(f"成功获取RAG模式API调用统计: {rag_data['total_api_call']}") except Exception as e: api_logger.warning(f"获取RAG模式API调用统计失败,使用默认值: {str(e)}") rag_data["total_api_call"] = 0 api_logger.info(f"成功获取RAG相关数据: memory={total_chunk}, app={len(apps_orm)}, knowledge={total_kb}, api_calls={rag_data['total_api_call']}") except Exception as e: api_logger.warning(f"获取RAG相关数据失败: {str(e)}") result["rag_data"] = rag_data api_logger.info("成功获取rag_data") api_logger.info("成功获取dashboard整合数据") return success(data=result, msg="Dashboard数据获取成功") except Exception as e: api_logger.error(f"获取dashboard整合数据失败: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"获取dashboard整合数据失败: {str(e)}" )