703 lines
28 KiB
Python
703 lines
28 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, status, Query
|
||
from pydantic import BaseModel, Field
|
||
from sqlalchemy.orm import Session
|
||
from typing import Optional
|
||
from app.core.response_utils import success
|
||
from app.db import get_db
|
||
from app.dependencies import get_current_user
|
||
from app.models.user_model import User
|
||
from app.schemas.response_schema import ApiResponse
|
||
|
||
from app.services import memory_dashboard_service, memory_storage_service, workspace_service
|
||
from app.services.memory_agent_service import get_end_users_connected_configs_batch
|
||
from app.services.app_statistics_service import AppStatisticsService
|
||
from app.core.logging_config import get_api_logger
|
||
|
||
# 获取API专用日志器
|
||
api_logger = get_api_logger()
|
||
|
||
router = APIRouter(
|
||
prefix="/dashboard",
|
||
tags=["Dashboard"],
|
||
dependencies=[Depends(get_current_user)] # Apply auth to all routes in this controller
|
||
)
|
||
|
||
|
||
@router.get("/total_end_users", response_model=ApiResponse)
|
||
def get_workspace_total_end_users(
|
||
db: Session = Depends(get_db),
|
||
current_user: User = Depends(get_current_user),
|
||
):
|
||
"""
|
||
获取用户列表的总用户数
|
||
"""
|
||
workspace_id = current_user.current_workspace_id
|
||
api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的宿主列表")
|
||
total_end_users = memory_dashboard_service.get_workspace_total_end_users(
|
||
db=db,
|
||
workspace_id=workspace_id,
|
||
current_user=current_user
|
||
)
|
||
api_logger.info(f"成功获取最新用户总数: total_num={total_end_users.get('total_num', 0)}")
|
||
return success(data=total_end_users, msg="用户数量获取成功")
|
||
|
||
|
||
|
||
|
||
|
||
@router.get("/end_users", response_model=ApiResponse)
|
||
async def get_workspace_end_users(
|
||
db: Session = Depends(get_db),
|
||
current_user: User = Depends(get_current_user),
|
||
):
|
||
"""
|
||
获取工作空间的宿主列表(高性能优化版本 v2)
|
||
|
||
优化策略:
|
||
1. 批量查询 end_users(一次查询而非循环)
|
||
2. 并发查询所有用户的记忆数量(Neo4j)
|
||
3. RAG 模式使用批量查询(一次 SQL)
|
||
4. 只返回必要字段减少数据传输
|
||
5. 添加短期缓存减少重复查询
|
||
6. 并发执行配置查询和记忆数量查询
|
||
|
||
返回格式:
|
||
{
|
||
"end_user": {"id": "uuid", "other_name": "名称"},
|
||
"memory_num": {"total": 数量},
|
||
"memory_config": {"memory_config_id": "id", "memory_config_name": "名称"}
|
||
}
|
||
"""
|
||
import asyncio
|
||
import json
|
||
from app.aioRedis import aio_redis_get, aio_redis_set
|
||
|
||
workspace_id = current_user.current_workspace_id
|
||
|
||
# 尝试从缓存获取(30秒缓存)
|
||
cache_key = f"end_users:workspace:{workspace_id}"
|
||
try:
|
||
cached_data = await aio_redis_get(cache_key)
|
||
if cached_data:
|
||
api_logger.info(f"从缓存获取宿主列表: workspace_id={workspace_id}")
|
||
return success(data=json.loads(cached_data), msg="宿主列表获取成功")
|
||
except Exception as e:
|
||
api_logger.warning(f"Redis 缓存读取失败: {str(e)}")
|
||
|
||
# 获取当前空间类型
|
||
current_workspace_type = memory_dashboard_service.get_current_workspace_type(db, workspace_id, current_user)
|
||
api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的宿主列表")
|
||
|
||
# 获取 end_users(已优化为批量查询)
|
||
end_users = memory_dashboard_service.get_workspace_end_users(
|
||
db=db,
|
||
workspace_id=workspace_id,
|
||
current_user=current_user
|
||
)
|
||
if not end_users:
|
||
api_logger.info("工作空间下没有宿主")
|
||
# 缓存空结果,避免重复查询
|
||
try:
|
||
await aio_redis_set(cache_key, json.dumps([]), expire=30)
|
||
except Exception as e:
|
||
api_logger.warning(f"Redis 缓存写入失败: {str(e)}")
|
||
return success(data=[], msg="宿主列表获取成功")
|
||
|
||
end_user_ids = [str(user.id) for user in end_users]
|
||
|
||
# 并发执行两个独立的查询任务
|
||
async def get_memory_configs():
|
||
"""获取记忆配置(在线程池中执行同步查询)"""
|
||
try:
|
||
return await asyncio.to_thread(
|
||
get_end_users_connected_configs_batch,
|
||
end_user_ids, db
|
||
)
|
||
except Exception as e:
|
||
api_logger.error(f"批量获取记忆配置失败: {str(e)}")
|
||
return {}
|
||
|
||
async def get_memory_nums():
|
||
"""获取记忆数量"""
|
||
if current_workspace_type == "rag":
|
||
# RAG 模式:批量查询
|
||
try:
|
||
chunk_map = await asyncio.to_thread(
|
||
memory_dashboard_service.get_users_total_chunk_batch,
|
||
end_user_ids, db, current_user
|
||
)
|
||
return {uid: {"total": count} for uid, count in chunk_map.items()}
|
||
except Exception as e:
|
||
api_logger.error(f"批量获取 RAG chunk 数量失败: {str(e)}")
|
||
return {uid: {"total": 0} for uid in end_user_ids}
|
||
|
||
elif current_workspace_type == "neo4j":
|
||
# Neo4j 模式:并发查询(带并发限制)
|
||
# 使用信号量限制并发数,避免大量用户时压垮 Neo4j
|
||
MAX_CONCURRENT_QUERIES = 10
|
||
semaphore = asyncio.Semaphore(MAX_CONCURRENT_QUERIES)
|
||
|
||
async def get_neo4j_memory_num(end_user_id: str):
|
||
async with semaphore:
|
||
try:
|
||
return await memory_storage_service.search_all(end_user_id)
|
||
except Exception as e:
|
||
api_logger.error(f"获取用户 {end_user_id} Neo4j 记忆数量失败: {str(e)}")
|
||
return {"total": 0}
|
||
|
||
memory_nums_list = await asyncio.gather(*[get_neo4j_memory_num(uid) for uid in end_user_ids])
|
||
return {end_user_ids[i]: memory_nums_list[i] for i in range(len(end_user_ids))}
|
||
|
||
return {uid: {"total": 0} for uid in end_user_ids}
|
||
|
||
# 触发按需初始化:为 implicit_emotions_storage 中没有记录的用户异步生成数据
|
||
try:
|
||
from app.celery_app import celery_app as _celery_app
|
||
_celery_app.send_task(
|
||
"app.tasks.init_implicit_emotions_for_users",
|
||
kwargs={"end_user_ids": end_user_ids},
|
||
)
|
||
_celery_app.send_task(
|
||
"app.tasks.init_interest_distribution_for_users",
|
||
kwargs={"end_user_ids": end_user_ids},
|
||
)
|
||
api_logger.info(f"已触发按需初始化任务,候选用户数: {len(end_user_ids)}")
|
||
except Exception as e:
|
||
api_logger.warning(f"触发按需初始化任务失败(不影响主流程): {e}")
|
||
|
||
# 并发执行配置查询和记忆数量查询
|
||
memory_configs_map, memory_nums_map = await asyncio.gather(
|
||
get_memory_configs(),
|
||
get_memory_nums()
|
||
)
|
||
|
||
# 构建结果(优化:使用列表推导式)
|
||
result = []
|
||
for end_user in end_users:
|
||
user_id = str(end_user.id)
|
||
config_info = memory_configs_map.get(user_id, {})
|
||
result.append({
|
||
'end_user': {
|
||
'id': user_id,
|
||
'other_name': end_user.other_name
|
||
},
|
||
'memory_num': memory_nums_map.get(user_id, {"total": 0}),
|
||
'memory_config': {
|
||
"memory_config_id": config_info.get("memory_config_id"),
|
||
"memory_config_name": config_info.get("memory_config_name")
|
||
}
|
||
})
|
||
|
||
# 写入缓存(30秒过期)
|
||
try:
|
||
await aio_redis_set(cache_key, json.dumps(result), expire=30)
|
||
except Exception as e:
|
||
api_logger.warning(f"Redis 缓存写入失败: {str(e)}")
|
||
|
||
# 触发社区聚类补全任务(异步,不阻塞接口响应)
|
||
# 对有 ExtractedEntity 但无 Community 节点的存量用户自动补跑全量聚类
|
||
try:
|
||
from app.tasks import init_community_clustering_for_users
|
||
init_community_clustering_for_users.delay(end_user_ids=end_user_ids)
|
||
api_logger.info(f"已触发社区聚类补全任务,候选用户数: {len(end_user_ids)}")
|
||
except Exception as e:
|
||
api_logger.warning(f"触发社区聚类补全任务失败(不影响主流程): {str(e)}")
|
||
|
||
api_logger.info(f"成功获取 {len(end_users)} 个宿主记录")
|
||
return success(data=result, msg="宿主列表获取成功")
|
||
|
||
|
||
@router.get("/memory_increment", response_model=ApiResponse)
|
||
def get_workspace_memory_increment(
|
||
limit: int = Query(7, description="返回记录数"),
|
||
db: Session = Depends(get_db),
|
||
current_user: User = Depends(get_current_user),
|
||
):
|
||
"""获取工作空间的记忆增量"""
|
||
workspace_id = current_user.current_workspace_id
|
||
api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的记忆增量")
|
||
memory_increment = memory_dashboard_service.get_workspace_memory_increment(
|
||
db=db,
|
||
workspace_id=workspace_id,
|
||
current_user=current_user,
|
||
limit=limit
|
||
)
|
||
api_logger.info(f"成功获取 {len(memory_increment)} 条记忆增量记录")
|
||
return success(data=memory_increment, msg="记忆增量获取成功")
|
||
|
||
|
||
@router.get("/api_increment", response_model=ApiResponse)
|
||
def get_workspace_api_increment(
|
||
db: Session = Depends(get_db),
|
||
current_user: User = Depends(get_current_user),
|
||
):
|
||
"""获取API调用趋势"""
|
||
workspace_id = current_user.current_workspace_id
|
||
api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的API调用增量")
|
||
api_increment = memory_dashboard_service.get_workspace_api_increment(
|
||
db=db,
|
||
workspace_id=workspace_id,
|
||
current_user=current_user
|
||
)
|
||
api_logger.info(f"成功获取 {api_increment} API调用增量")
|
||
return success(data=api_increment, msg="API调用增量获取成功")
|
||
|
||
|
||
@router.post("/total_memory", response_model=ApiResponse)
|
||
def write_workspace_total_memory(
|
||
db: Session = Depends(get_db),
|
||
current_user: User = Depends(get_current_user),
|
||
):
|
||
"""工作空间记忆总量的写入(异步任务)"""
|
||
workspace_id = current_user.current_workspace_id
|
||
api_logger.info(f"用户 {current_user.username} 请求写入工作空间 {workspace_id} 的记忆总量")
|
||
|
||
# 触发 Celery 异步任务
|
||
from app.celery_app import celery_app
|
||
task = celery_app.send_task(
|
||
"app.controllers.memory_storage_controller.search_all",
|
||
kwargs={"workspace_id": str(workspace_id)}
|
||
)
|
||
|
||
api_logger.info(f"已触发记忆总量统计任务,task_id: {task.id}")
|
||
return success(
|
||
data={"task_id": task.id, "workspace_id": str(workspace_id)},
|
||
msg="记忆总量统计任务已启动"
|
||
)
|
||
|
||
|
||
@router.get("/task_status/{task_id}", response_model=ApiResponse)
|
||
def get_task_status(
|
||
task_id: str,
|
||
current_user: User = Depends(get_current_user),
|
||
):
|
||
"""查询异步任务的执行状态和结果"""
|
||
api_logger.info(f"用户 {current_user.username} 查询任务状态: task_id={task_id}")
|
||
|
||
from app.celery_app import celery_app
|
||
from celery.result import AsyncResult
|
||
|
||
# 获取任务结果
|
||
task_result = AsyncResult(task_id, app=celery_app)
|
||
|
||
response_data = {
|
||
"task_id": task_id,
|
||
"status": task_result.state, # PENDING, STARTED, SUCCESS, FAILURE, RETRY, REVOKED
|
||
}
|
||
|
||
# 如果任务完成,返回结果
|
||
if task_result.ready():
|
||
if task_result.successful():
|
||
response_data["result"] = task_result.result
|
||
api_logger.info(f"任务 {task_id} 执行成功")
|
||
return success(data=response_data, msg="任务执行成功")
|
||
else:
|
||
# 任务失败
|
||
response_data["error"] = str(task_result.result)
|
||
api_logger.error(f"任务 {task_id} 执行失败: {task_result.result}")
|
||
return success(data=response_data, msg="任务执行失败")
|
||
else:
|
||
# 任务还在执行中
|
||
api_logger.info(f"任务 {task_id} 状态: {task_result.state}")
|
||
return success(data=response_data, msg=f"任务状态: {task_result.state}")
|
||
|
||
|
||
@router.get("/memory_list", response_model=ApiResponse)
|
||
def get_workspace_memory_list(
|
||
limit: int = Query(7, description="记忆增量返回记录数"),
|
||
db: Session = Depends(get_db),
|
||
current_user: User = Depends(get_current_user),
|
||
):
|
||
"""
|
||
用户记忆列表整合接口
|
||
|
||
整合以下三个接口的数据:
|
||
1. total_memory - 工作空间记忆总量
|
||
2. memory_increment - 工作空间记忆增量
|
||
3. hosts - 工作空间宿主列表
|
||
|
||
返回格式:
|
||
{
|
||
"total_memory": float,
|
||
"memory_increment": [
|
||
{"date": "2024-01-01", "count": 100},
|
||
...
|
||
],
|
||
"hosts": [
|
||
{"id": "uuid", "name": "宿主名", ...},
|
||
...
|
||
]
|
||
}
|
||
"""
|
||
workspace_id = current_user.current_workspace_id
|
||
api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的记忆列表")
|
||
memory_list = memory_dashboard_service.get_workspace_memory_list(
|
||
db=db,
|
||
workspace_id=workspace_id,
|
||
current_user=current_user,
|
||
limit=limit
|
||
)
|
||
api_logger.info("成功获取记忆列表")
|
||
return success(data=memory_list, msg="记忆列表获取成功")
|
||
|
||
|
||
@router.get("/total_memory_count", response_model=ApiResponse)
|
||
async def get_workspace_total_memory_count(
|
||
end_user_id: Optional[str] = Query(None, description="可选的用户ID"),
|
||
db: Session = Depends(get_db),
|
||
current_user: User = Depends(get_current_user),
|
||
):
|
||
"""
|
||
获取工作空间的记忆总量(通过聚合所有host的记忆数)
|
||
|
||
逻辑:
|
||
1. 从 memory_list 获取所有 host_id
|
||
2. 对每个 host_id 调用 search_all 获取 total
|
||
3. 将所有 total 求和返回
|
||
|
||
返回格式:
|
||
{
|
||
"total_memory_count": int,
|
||
"host_count": int,
|
||
"details": [
|
||
{"end_user_id": "uuid", "count": 100, "name": "用户名称"},
|
||
...
|
||
]
|
||
}
|
||
"""
|
||
workspace_id = current_user.current_workspace_id
|
||
api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的记忆总量")
|
||
total_memory_count = await memory_dashboard_service.get_workspace_total_memory_count(
|
||
db=db,
|
||
workspace_id=workspace_id,
|
||
current_user=current_user,
|
||
end_user_id=end_user_id
|
||
)
|
||
api_logger.info(f"成功获取记忆总量: {total_memory_count.get('total_memory_count', 0)}")
|
||
return success(data=total_memory_count, msg="记忆总量获取成功")
|
||
|
||
|
||
# ======== RAG 数据统计 ========
|
||
@router.get("/total_rag_count", response_model=ApiResponse)
|
||
def get_workspace_total_rag_count(
|
||
db: Session = Depends(get_db),
|
||
current_user: User = Depends(get_current_user)
|
||
):
|
||
"""
|
||
获取 rag 的总文档数、总chunk数、总知识库数量、总api调用数量
|
||
"""
|
||
total_documents = memory_dashboard_service.get_rag_total_doc(db, current_user)
|
||
total_chunk = memory_dashboard_service.get_rag_total_chunk(db, current_user)
|
||
total_kb = memory_dashboard_service.get_rag_total_kb(db, current_user)
|
||
data = {
|
||
'total_documents':total_documents,
|
||
'total_chunk':total_chunk,
|
||
'total_kb':total_kb,
|
||
'total_api':1024
|
||
}
|
||
return success(data=data, msg="RAG相关数据获取成功")
|
||
|
||
@router.get("/current_user_rag_total_num", response_model=ApiResponse)
|
||
def get_current_user_rag_total_num(
|
||
end_user_id: str = Query(..., description="宿主ID"),
|
||
db: Session = Depends(get_db),
|
||
current_user: User = Depends(get_current_user),
|
||
):
|
||
"""
|
||
获取当前宿主的 RAG 的总chunk数量
|
||
"""
|
||
total_chunk = memory_dashboard_service.get_current_user_total_chunk(end_user_id, db, current_user)
|
||
return success(data=total_chunk, msg="宿主RAG知识数据获取成功")
|
||
|
||
@router.get("/rag_content", response_model=ApiResponse)
|
||
def get_rag_content(
|
||
end_user_id: str = Query(..., description="宿主ID"),
|
||
page: int = Query(1, gt=0, description="页码,从1开始"),
|
||
pagesize: int = Query(15, gt=0, le=100, description="每页返回记录数"),
|
||
db: Session = Depends(get_db),
|
||
current_user: User = Depends(get_current_user),
|
||
):
|
||
"""
|
||
获取当前宿主知识库中的chunk内容(分页)
|
||
"""
|
||
data = memory_dashboard_service.get_rag_content(end_user_id, page, pagesize, db, current_user)
|
||
return success(data=data, msg="宿主RAGchunk数据获取成功")
|
||
|
||
|
||
@router.get("/chunk_summary_tag", response_model=ApiResponse)
|
||
async def get_chunk_summary_tag(
|
||
end_user_id: str = Query(..., description="宿主ID"),
|
||
limit: int = Query(15, description="返回记录数"),
|
||
max_tags: int = Query(10, description="最大标签数量"),
|
||
db: Session = Depends(get_db),
|
||
current_user: User = Depends(get_current_user),
|
||
):
|
||
"""
|
||
读取RAG摘要、标签和人物形象(纯读库,不触发生成)。
|
||
|
||
返回格式:
|
||
{
|
||
"summary": "用户摘要",
|
||
"tags": [{"tag": "标签1", "frequency": 5}, ...],
|
||
"personas": ["产品设计师", ...],
|
||
"generated": true/false // false表示尚未生产,请调用 /generate_rag_profile
|
||
}
|
||
"""
|
||
api_logger.info(f"用户 {current_user.username} 读取宿主 {end_user_id} 的RAG摘要/标签/人物形象")
|
||
|
||
data = await memory_dashboard_service.get_chunk_summary_and_tags(
|
||
end_user_id=end_user_id,
|
||
limit=limit,
|
||
max_tags=max_tags,
|
||
db=db,
|
||
current_user=current_user
|
||
)
|
||
|
||
return success(data=data, msg="获取成功")
|
||
|
||
|
||
@router.get("/chunk_insight", response_model=ApiResponse)
|
||
async def get_chunk_insight(
|
||
end_user_id: str = Query(..., description="宿主ID"),
|
||
limit: int = Query(15, description="返回记录数"),
|
||
db: Session = Depends(get_db),
|
||
current_user: User = Depends(get_current_user),
|
||
):
|
||
"""
|
||
读取RAG洞察报告(纯读库,不触发生成)。
|
||
|
||
返回格式:
|
||
{
|
||
"insight": "总体概述",
|
||
"behavior_pattern": "行为模式",
|
||
"key_findings": "关键发现",
|
||
"growth_trajectory": "成长轨迹",
|
||
"generated": true/false // false表示尚未生产,请调用 /generate_rag_profile
|
||
}
|
||
"""
|
||
api_logger.info(f"用户 {current_user.username} 读取宿主 {end_user_id} 的RAG洞察")
|
||
|
||
data = await memory_dashboard_service.get_chunk_insight(
|
||
end_user_id=end_user_id,
|
||
limit=limit,
|
||
db=db,
|
||
current_user=current_user
|
||
)
|
||
|
||
return success(data=data, msg="获取成功")
|
||
|
||
|
||
class GenerateRagProfileRequest(BaseModel):
|
||
end_user_id: str = Field(..., description="宿主ID")
|
||
limit: int = Field(15, description="参与生成的chunk数量上限")
|
||
max_tags: int = Field(10, description="最大标签数量")
|
||
|
||
|
||
@router.post("/generate_rag_profile", response_model=ApiResponse)
|
||
async def generate_rag_profile(
|
||
body: GenerateRagProfileRequest,
|
||
db: Session = Depends(get_db),
|
||
current_user: User = Depends(get_current_user),
|
||
):
|
||
"""
|
||
生产接口:为RAG存储模式的宿主全量重新生成完整画像并持久化到end_user表。
|
||
每次请求都会重新生成,覆盖已有数据。
|
||
"""
|
||
api_logger.info(f"用户 {current_user.username} 触发RAG画像生产: end_user_id={body.end_user_id}")
|
||
|
||
data = await memory_dashboard_service.generate_rag_profile(
|
||
end_user_id=body.end_user_id,
|
||
limit=body.limit,
|
||
max_tags=body.max_tags,
|
||
db=db,
|
||
current_user=current_user,
|
||
)
|
||
|
||
api_logger.info(f"RAG画像生产完成: {data}")
|
||
return success(data=data, msg="RAG画像生产完成")
|
||
|
||
|
||
@router.get("/dashboard_data", response_model=ApiResponse)
|
||
async def dashboard_data(
|
||
end_user_id: Optional[str] = Query(None, description="可选的用户ID"),
|
||
start_date: Optional[int] = Query(None, description="开始时间戳(毫秒)"),
|
||
end_date: Optional[int] = Query(None, description="结束时间戳(毫秒)"),
|
||
db: Session = Depends(get_db),
|
||
current_user: User = Depends(get_current_user),
|
||
):
|
||
"""
|
||
整合dashboard数据接口
|
||
|
||
整合以下接口的数据:
|
||
1. /dashboard/total_memory_count - 记忆总量
|
||
2. /dashboard/api_increment - API调用增量
|
||
3. /memory/stats/types - 知识库类型统计(只要total数据)
|
||
4. /dashboard/total_rag_count - RAG相关数据
|
||
|
||
根据 storage_type 判断调用不同的接口
|
||
|
||
返回格式:
|
||
{
|
||
"storage_type": str,
|
||
"neo4j_data": {
|
||
"total_memory": int,
|
||
"total_app": int,
|
||
"total_knowledge": int,
|
||
"total_api_call": int
|
||
} | null,
|
||
"rag_data": {
|
||
"total_memory": int,
|
||
"total_app": int,
|
||
"total_knowledge": int,
|
||
"total_api_call": int
|
||
} | null
|
||
}
|
||
"""
|
||
workspace_id = current_user.current_workspace_id
|
||
api_logger.info(f"用户 {current_user.username} 请求获取工作空间 {workspace_id} 的dashboard整合数据")
|
||
|
||
# 如果没有提供时间范围,默认使用最近30天
|
||
if start_date is None or end_date is None:
|
||
from datetime import datetime, timedelta
|
||
end_dt = datetime.now()
|
||
start_dt = end_dt - timedelta(days=30)
|
||
end_date = int(end_dt.timestamp() * 1000)
|
||
start_date = int(start_dt.timestamp() * 1000)
|
||
api_logger.info(f"使用默认时间范围: {start_dt} 到 {end_dt}")
|
||
|
||
# 获取 storage_type,如果为 None 则使用默认值
|
||
storage_type = workspace_service.get_workspace_storage_type(
|
||
db=db,
|
||
workspace_id=workspace_id,
|
||
user=current_user
|
||
)
|
||
if storage_type is None:
|
||
storage_type = 'neo4j'
|
||
|
||
|
||
# 根据 storage_type 决定返回哪个数据对象
|
||
# 如果是 'rag',neo4j_data 为 null;否则 rag_data 为 null
|
||
result = {
|
||
"storage_type": storage_type,
|
||
"neo4j_data": None,
|
||
"rag_data": None
|
||
}
|
||
|
||
try:
|
||
# 如果 storage_type 为 'neo4j' 或空,获取 neo4j_data
|
||
if storage_type == 'neo4j':
|
||
neo4j_data = {
|
||
"total_memory": None,
|
||
"total_app": None,
|
||
"total_knowledge": None,
|
||
"total_api_call": None
|
||
}
|
||
|
||
# 1. 获取记忆总量(total_memory)
|
||
try:
|
||
total_memory_data = await memory_dashboard_service.get_workspace_total_memory_count(
|
||
db=db,
|
||
workspace_id=workspace_id,
|
||
current_user=current_user,
|
||
end_user_id=end_user_id
|
||
)
|
||
neo4j_data["total_memory"] = total_memory_data.get("total_memory_count", 0)
|
||
# total_app: 统计当前空间下的所有app数量
|
||
from app.repositories import app_repository
|
||
apps_orm = app_repository.get_apps_by_workspace_id(db, workspace_id)
|
||
neo4j_data["total_app"] = len(apps_orm)
|
||
api_logger.info(f"成功获取记忆总量: {neo4j_data['total_memory']}, 应用数量: {neo4j_data['total_app']}")
|
||
except Exception as e:
|
||
api_logger.warning(f"获取记忆总量失败: {str(e)}")
|
||
|
||
# 2. 获取知识库类型统计(total_knowledge)
|
||
try:
|
||
from app.services.memory_agent_service import MemoryAgentService
|
||
memory_agent_service = MemoryAgentService()
|
||
knowledge_stats = await memory_agent_service.get_knowledge_type_stats(
|
||
end_user_id=end_user_id,
|
||
only_active=True,
|
||
current_workspace_id=workspace_id,
|
||
db=db
|
||
)
|
||
neo4j_data["total_knowledge"] = knowledge_stats.get("total", 0)
|
||
api_logger.info(f"成功获取知识库类型统计total: {neo4j_data['total_knowledge']}")
|
||
except Exception as e:
|
||
api_logger.warning(f"获取知识库类型统计失败: {str(e)}")
|
||
|
||
# 3. 获取API调用统计(total_api_call)
|
||
try:
|
||
# 使用 AppStatisticsService 获取真实的API调用统计
|
||
app_stats_service = AppStatisticsService(db)
|
||
api_stats = app_stats_service.get_workspace_api_statistics(
|
||
workspace_id=workspace_id,
|
||
start_date=start_date,
|
||
end_date=end_date
|
||
)
|
||
# 计算总调用次数
|
||
total_api_calls = sum(item.get("total_calls", 0) for item in api_stats)
|
||
neo4j_data["total_api_call"] = total_api_calls
|
||
api_logger.info(f"成功获取API调用统计: {neo4j_data['total_api_call']}")
|
||
except Exception as e:
|
||
api_logger.error(f"获取API调用统计失败: {str(e)}")
|
||
neo4j_data["total_api_call"] = 0
|
||
|
||
result["neo4j_data"] = neo4j_data
|
||
api_logger.info("成功获取neo4j_data")
|
||
|
||
# 如果 storage_type 为 'rag',获取 rag_data
|
||
elif storage_type == 'rag':
|
||
rag_data = {
|
||
"total_memory": None,
|
||
"total_app": None,
|
||
"total_knowledge": None,
|
||
"total_api_call": None
|
||
}
|
||
|
||
# 获取RAG相关数据
|
||
try:
|
||
# total_memory: 只统计用户知识库(permission_id='Memory')的chunk数
|
||
total_chunk = memory_dashboard_service.get_rag_user_kb_total_chunk(db, current_user)
|
||
rag_data["total_memory"] = total_chunk
|
||
|
||
# total_app: 统计当前空间下的所有app数量
|
||
from app.repositories import app_repository
|
||
apps_orm = app_repository.get_apps_by_workspace_id(db, workspace_id)
|
||
rag_data["total_app"] = len(apps_orm)
|
||
|
||
# total_knowledge: 使用 total_kb(总知识库数)
|
||
total_kb = memory_dashboard_service.get_rag_total_kb(db, current_user)
|
||
rag_data["total_knowledge"] = total_kb
|
||
|
||
# total_api_call: 使用 AppStatisticsService 获取真实的API调用统计
|
||
try:
|
||
app_stats_service = AppStatisticsService(db)
|
||
api_stats = app_stats_service.get_workspace_api_statistics(
|
||
workspace_id=workspace_id,
|
||
start_date=start_date,
|
||
end_date=end_date
|
||
)
|
||
# 计算总调用次数
|
||
total_api_calls = sum(item.get("total_calls", 0) for item in api_stats)
|
||
rag_data["total_api_call"] = total_api_calls
|
||
api_logger.info(f"成功获取RAG模式API调用统计: {rag_data['total_api_call']}")
|
||
except Exception as e:
|
||
api_logger.warning(f"获取RAG模式API调用统计失败,使用默认值: {str(e)}")
|
||
rag_data["total_api_call"] = 0
|
||
|
||
api_logger.info(f"成功获取RAG相关数据: memory={total_chunk}, app={len(apps_orm)}, knowledge={total_kb}, api_calls={rag_data['total_api_call']}")
|
||
except Exception as e:
|
||
api_logger.warning(f"获取RAG相关数据失败: {str(e)}")
|
||
|
||
result["rag_data"] = rag_data
|
||
api_logger.info("成功获取rag_data")
|
||
|
||
api_logger.info("成功获取dashboard整合数据")
|
||
return success(data=result, msg="Dashboard数据获取成功")
|
||
|
||
except Exception as e:
|
||
api_logger.error(f"获取dashboard整合数据失败: {str(e)}")
|
||
raise HTTPException(
|
||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||
detail=f"获取dashboard整合数据失败: {str(e)}"
|
||
) |