Merge pull request #774 from SuanmoSuanyangTechnology/feat/data-transformation
Feat/data transformation
This commit is contained in:
@@ -352,6 +352,7 @@ async def delete_knowledge(
|
|||||||
# 2. Soft-delete knowledge base
|
# 2. Soft-delete knowledge base
|
||||||
api_logger.debug(f"Perform a soft delete: {db_knowledge.name} (ID: {knowledge_id})")
|
api_logger.debug(f"Perform a soft delete: {db_knowledge.name} (ID: {knowledge_id})")
|
||||||
db_knowledge.status = 2
|
db_knowledge.status = 2
|
||||||
|
db_knowledge.updated_at = datetime.datetime.now()
|
||||||
db.commit()
|
db.commit()
|
||||||
api_logger.info(f"The knowledge base has been successfully deleted: {db_knowledge.name} (ID: {knowledge_id})")
|
api_logger.info(f"The knowledge base has been successfully deleted: {db_knowledge.name} (ID: {knowledge_id})")
|
||||||
return success(msg="The knowledge base has been successfully deleted")
|
return success(msg="The knowledge base has been successfully deleted")
|
||||||
|
|||||||
@@ -591,7 +591,7 @@ async def dashboard_data(
|
|||||||
"total_api_call": None
|
"total_api_call": None
|
||||||
}
|
}
|
||||||
|
|
||||||
# 1. 获取记忆总量(total_memory)
|
# 1. 获取记忆总量(total_memory)—— neo4j 独有逻辑:查询 neo4j 存储节点
|
||||||
try:
|
try:
|
||||||
total_memory_data = await memory_dashboard_service.get_workspace_total_memory_count(
|
total_memory_data = await memory_dashboard_service.get_workspace_total_memory_count(
|
||||||
db=db,
|
db=db,
|
||||||
@@ -600,48 +600,32 @@ async def dashboard_data(
|
|||||||
end_user_id=end_user_id
|
end_user_id=end_user_id
|
||||||
)
|
)
|
||||||
neo4j_data["total_memory"] = total_memory_data.get("total_memory_count", 0)
|
neo4j_data["total_memory"] = total_memory_data.get("total_memory_count", 0)
|
||||||
# total_app: 统计当前空间下的所有app数量
|
api_logger.info(f"成功获取记忆总量: {neo4j_data['total_memory']}")
|
||||||
# 包含自有app + 被分享给本工作空间的app
|
|
||||||
from app.services import app_service as _app_svc
|
|
||||||
_, total_app = _app_svc.AppService(db).list_apps(
|
|
||||||
workspace_id=workspace_id, include_shared=True, pagesize=1
|
|
||||||
)
|
|
||||||
neo4j_data["total_app"] = total_app
|
|
||||||
api_logger.info(f"成功获取记忆总量: {neo4j_data['total_memory']}, 应用数量: {neo4j_data['total_app']}")
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
api_logger.warning(f"获取记忆总量失败: {str(e)}")
|
api_logger.warning(f"获取记忆总量失败: {str(e)}")
|
||||||
|
|
||||||
# 2. 获取知识库类型统计(total_knowledge)
|
# 2. 获取共享统计数据(total_app、total_knowledge、total_api_call)
|
||||||
try:
|
common_stats = memory_dashboard_service.get_dashboard_common_stats(db, workspace_id)
|
||||||
from app.services.memory_agent_service import MemoryAgentService
|
neo4j_data.update(common_stats)
|
||||||
memory_agent_service = MemoryAgentService()
|
api_logger.info(f"成功获取共享统计: app={common_stats['total_app']}, knowledge={common_stats['total_knowledge']}, api_call={common_stats['total_api_call']}")
|
||||||
knowledge_stats = await memory_agent_service.get_knowledge_type_stats(
|
|
||||||
end_user_id=end_user_id,
|
|
||||||
only_active=True,
|
|
||||||
current_workspace_id=workspace_id,
|
|
||||||
db=db
|
|
||||||
)
|
|
||||||
neo4j_data["total_knowledge"] = knowledge_stats.get("total", 0)
|
|
||||||
api_logger.info(f"成功获取知识库类型统计total: {neo4j_data['total_knowledge']}")
|
|
||||||
except Exception as e:
|
|
||||||
api_logger.warning(f"获取知识库类型统计失败: {str(e)}")
|
|
||||||
|
|
||||||
# 3. 获取API调用统计(total_api_call)
|
# 计算昨日对比
|
||||||
try:
|
try:
|
||||||
# 使用 AppStatisticsService 获取真实的API调用统计
|
changes = memory_dashboard_service.get_dashboard_yesterday_changes(
|
||||||
app_stats_service = AppStatisticsService(db)
|
db=db,
|
||||||
api_stats = app_stats_service.get_workspace_api_statistics(
|
|
||||||
workspace_id=workspace_id,
|
workspace_id=workspace_id,
|
||||||
start_date=start_date,
|
storage_type=storage_type,
|
||||||
end_date=end_date
|
today_data=neo4j_data
|
||||||
)
|
)
|
||||||
# 计算总调用次数
|
neo4j_data.update(changes)
|
||||||
total_api_calls = sum(item.get("total_calls", 0) for item in api_stats)
|
|
||||||
neo4j_data["total_api_call"] = total_api_calls
|
|
||||||
api_logger.info(f"成功获取API调用统计: {neo4j_data['total_api_call']}")
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
api_logger.error(f"获取API调用统计失败: {str(e)}")
|
api_logger.warning(f"计算neo4j昨日对比失败: {str(e)}")
|
||||||
neo4j_data["total_api_call"] = 0
|
neo4j_data.update({
|
||||||
|
"total_memory_change": None,
|
||||||
|
"total_app_change": None,
|
||||||
|
"total_knowledge_change": None,
|
||||||
|
"total_api_call_change": None,
|
||||||
|
})
|
||||||
|
|
||||||
result["neo4j_data"] = neo4j_data
|
result["neo4j_data"] = neo4j_data
|
||||||
api_logger.info("成功获取neo4j_data")
|
api_logger.info("成功获取neo4j_data")
|
||||||
@@ -655,43 +639,36 @@ async def dashboard_data(
|
|||||||
"total_api_call": None
|
"total_api_call": None
|
||||||
}
|
}
|
||||||
|
|
||||||
# 获取RAG相关数据
|
# 1. 获取记忆总量(total_memory)—— rag 独有逻辑:查询 document 表的 chunk_num
|
||||||
try:
|
try:
|
||||||
# total_memory: 只统计用户知识库(permission_id='Memory')的chunk数
|
|
||||||
total_chunk = memory_dashboard_service.get_rag_user_kb_total_chunk(db, current_user)
|
total_chunk = memory_dashboard_service.get_rag_user_kb_total_chunk(db, current_user)
|
||||||
rag_data["total_memory"] = total_chunk
|
rag_data["total_memory"] = total_chunk
|
||||||
|
api_logger.info(f"成功获取RAG记忆总量: {total_chunk}")
|
||||||
# total_app: 统计当前空间下的所有app数量
|
|
||||||
# 包含自有app + 被分享给本工作空间的app
|
|
||||||
from app.services import app_service as _app_svc
|
|
||||||
_, total_app = _app_svc.AppService(db).list_apps(
|
|
||||||
workspace_id=workspace_id, include_shared=True, pagesize=1
|
|
||||||
)
|
|
||||||
rag_data["total_app"] = total_app
|
|
||||||
|
|
||||||
# total_knowledge: 使用 total_kb(总知识库数)
|
|
||||||
total_kb = memory_dashboard_service.get_rag_total_kb(db, current_user)
|
|
||||||
rag_data["total_knowledge"] = total_kb
|
|
||||||
|
|
||||||
# total_api_call: 使用 AppStatisticsService 获取真实的API调用统计
|
|
||||||
try:
|
|
||||||
app_stats_service = AppStatisticsService(db)
|
|
||||||
api_stats = app_stats_service.get_workspace_api_statistics(
|
|
||||||
workspace_id=workspace_id,
|
|
||||||
start_date=start_date,
|
|
||||||
end_date=end_date
|
|
||||||
)
|
|
||||||
# 计算总调用次数
|
|
||||||
total_api_calls = sum(item.get("total_calls", 0) for item in api_stats)
|
|
||||||
rag_data["total_api_call"] = total_api_calls
|
|
||||||
api_logger.info(f"成功获取RAG模式API调用统计: {rag_data['total_api_call']}")
|
|
||||||
except Exception as e:
|
|
||||||
api_logger.warning(f"获取RAG模式API调用统计失败,使用默认值: {str(e)}")
|
|
||||||
rag_data["total_api_call"] = 0
|
|
||||||
|
|
||||||
api_logger.info(f"成功获取RAG相关数据: memory={total_chunk}, app={total_app}, knowledge={total_kb}, api_calls={rag_data['total_api_call']}")
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
api_logger.warning(f"获取RAG相关数据失败: {str(e)}")
|
api_logger.warning(f"获取RAG记忆总量失败: {str(e)}")
|
||||||
|
|
||||||
|
# 2. 获取共享统计数据(total_app、total_knowledge、total_api_call)
|
||||||
|
common_stats = memory_dashboard_service.get_dashboard_common_stats(db, workspace_id)
|
||||||
|
rag_data.update(common_stats)
|
||||||
|
api_logger.info(f"成功获取共享统计: app={common_stats['total_app']}, knowledge={common_stats['total_knowledge']}, api_call={common_stats['total_api_call']}")
|
||||||
|
|
||||||
|
# 计算昨日对比
|
||||||
|
try:
|
||||||
|
changes = memory_dashboard_service.get_dashboard_yesterday_changes(
|
||||||
|
db=db,
|
||||||
|
workspace_id=workspace_id,
|
||||||
|
storage_type=storage_type,
|
||||||
|
today_data=rag_data
|
||||||
|
)
|
||||||
|
rag_data.update(changes)
|
||||||
|
except Exception as e:
|
||||||
|
api_logger.warning(f"计算RAG昨日对比失败: {str(e)}")
|
||||||
|
rag_data.update({
|
||||||
|
"total_memory_change": None,
|
||||||
|
"total_app_change": None,
|
||||||
|
"total_knowledge_change": None,
|
||||||
|
"total_api_call_change": None,
|
||||||
|
})
|
||||||
|
|
||||||
result["rag_data"] = rag_data
|
result["rag_data"] = rag_data
|
||||||
api_logger.info("成功获取rag_data")
|
api_logger.info("成功获取rag_data")
|
||||||
|
|||||||
@@ -26,7 +26,7 @@ from app.services.memory_storage_service import (
|
|||||||
analytics_hot_memory_tags,
|
analytics_hot_memory_tags,
|
||||||
analytics_recent_activity_stats,
|
analytics_recent_activity_stats,
|
||||||
kb_type_distribution,
|
kb_type_distribution,
|
||||||
search_all,
|
search_all_batch,
|
||||||
search_chunk,
|
search_chunk,
|
||||||
search_detials,
|
search_detials,
|
||||||
search_dialogue,
|
search_dialogue,
|
||||||
@@ -409,7 +409,10 @@ async def search_all_num(
|
|||||||
) -> dict:
|
) -> dict:
|
||||||
api_logger.info(f"Search all requested for end_user_id: {end_user_id}")
|
api_logger.info(f"Search all requested for end_user_id: {end_user_id}")
|
||||||
try:
|
try:
|
||||||
result = await search_all(end_user_id)
|
if not end_user_id:
|
||||||
|
return success(data={"total": 0}, msg="查询成功")
|
||||||
|
batch_result = await search_all_batch([end_user_id])
|
||||||
|
result = {"total": batch_result.get(end_user_id, 0)}
|
||||||
return success(data=result, msg="查询成功")
|
return success(data=result, msg="查询成功")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
api_logger.error(f"Search all failed: {str(e)}")
|
api_logger.error(f"Search all failed: {str(e)}")
|
||||||
|
|||||||
@@ -353,15 +353,13 @@ async def get_workspace_total_memory_count(
|
|||||||
"details": []
|
"details": []
|
||||||
}
|
}
|
||||||
|
|
||||||
# 2. 对每个 host_id 调用 search_all 获取 total
|
# 2. 使用 search_all_batch 批量查询所有宿主的记忆数量
|
||||||
from app.services import memory_storage_service
|
from app.services import memory_storage_service
|
||||||
|
|
||||||
total_count = 0
|
|
||||||
details = []
|
|
||||||
|
|
||||||
# 如果提供了 end_user_id,只查询该用户
|
# 如果提供了 end_user_id,只查询该用户
|
||||||
if end_user_id:
|
if end_user_id:
|
||||||
search_result = await memory_storage_service.search_all(end_user_id=end_user_id)
|
batch_result = await memory_storage_service.search_all_batch([end_user_id])
|
||||||
|
count = batch_result.get(end_user_id, 0)
|
||||||
# 查询用户名称
|
# 查询用户名称
|
||||||
from app.repositories.end_user_repository import EndUserRepository
|
from app.repositories.end_user_repository import EndUserRepository
|
||||||
repo = EndUserRepository(db)
|
repo = EndUserRepository(db)
|
||||||
@@ -369,42 +367,31 @@ async def get_workspace_total_memory_count(
|
|||||||
user_name = end_user.other_name if end_user else None
|
user_name = end_user.other_name if end_user else None
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"total_memory_count": search_result.get("total", 0),
|
"total_memory_count": count,
|
||||||
"host_count": 1,
|
"host_count": 1,
|
||||||
"details": [{
|
"details": [{
|
||||||
"end_user_id": end_user_id,
|
"end_user_id": end_user_id,
|
||||||
"count": search_result.get("total", 0),
|
"count": count,
|
||||||
"name": user_name
|
"name": user_name
|
||||||
}]
|
}]
|
||||||
}
|
}
|
||||||
|
|
||||||
for host in hosts:
|
# 批量查询所有宿主记忆数量(一次 Neo4j 查询)
|
||||||
try:
|
end_user_ids = [str(host.id) for host in hosts]
|
||||||
end_user_id_str = str(host.id)
|
batch_result = await memory_storage_service.search_all_batch(end_user_ids)
|
||||||
|
|
||||||
search_result = await memory_storage_service.search_all(
|
# 构建 host name 映射
|
||||||
end_user_id=end_user_id_str
|
host_name_map = {str(host.id): host.other_name for host in hosts}
|
||||||
)
|
|
||||||
|
|
||||||
host_total = search_result.get("total", 0)
|
total_count = sum(batch_result.values())
|
||||||
total_count += host_total
|
details = [
|
||||||
|
{
|
||||||
details.append({
|
"end_user_id": uid,
|
||||||
"end_user_id": end_user_id_str,
|
"count": batch_result.get(uid, 0),
|
||||||
"count": host_total,
|
"name": host_name_map.get(uid)
|
||||||
"name": host.other_name # 使用 other_name 字段
|
}
|
||||||
})
|
for uid in end_user_ids
|
||||||
|
]
|
||||||
business_logger.debug(f"EndUser {end_user_id_str} ({host.other_name}) 记忆数: {host_total}")
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
business_logger.warning(f"获取 end_user {host.id} 记忆数失败: {str(e)}")
|
|
||||||
# 失败的 host 记为 0
|
|
||||||
details.append({
|
|
||||||
"end_user_id": str(host.id),
|
|
||||||
"count": 0,
|
|
||||||
"name": host.other_name # 使用 other_name 字段
|
|
||||||
})
|
|
||||||
|
|
||||||
result = {
|
result = {
|
||||||
"total_memory_count": total_count,
|
"total_memory_count": total_count,
|
||||||
@@ -519,6 +506,206 @@ def get_rag_user_kb_total_chunk(
|
|||||||
business_logger.error(f"获取用户知识库总chunk数失败: workspace_id={workspace_id} - {str(e)}")
|
business_logger.error(f"获取用户知识库总chunk数失败: workspace_id={workspace_id} - {str(e)}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
def get_dashboard_yesterday_changes(
|
||||||
|
db: Session,
|
||||||
|
workspace_id: uuid.UUID,
|
||||||
|
storage_type: str,
|
||||||
|
today_data: dict
|
||||||
|
) -> dict:
|
||||||
|
"""
|
||||||
|
计算各指标相比昨天的变化量。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
db: 数据库会话
|
||||||
|
workspace_id: 工作空间ID
|
||||||
|
storage_type: 存储类型 'neo4j' | 'rag'
|
||||||
|
today_data: 当前数据,包含 total_memory, total_app, total_knowledge, total_api_call
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
{
|
||||||
|
"total_memory_change": int | None,
|
||||||
|
"total_app_change": int | None,
|
||||||
|
"total_knowledge_change": int | None,
|
||||||
|
"total_api_call_change": int | None
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from sqlalchemy import func
|
||||||
|
from app.models.api_key_model import ApiKey, ApiKeyLog
|
||||||
|
from app.models.knowledge_model import Knowledge
|
||||||
|
from app.models.app_model import App
|
||||||
|
from app.models.appshare_model import AppShare
|
||||||
|
|
||||||
|
business_logger.info(f"计算昨日对比: workspace_id={workspace_id}, storage_type={storage_type}")
|
||||||
|
|
||||||
|
now_local = datetime.now()
|
||||||
|
today_start = now_local.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||||
|
yesterday_start = today_start - timedelta(days=1)
|
||||||
|
|
||||||
|
changes = {
|
||||||
|
"total_memory_change": None,
|
||||||
|
"total_app_change": None,
|
||||||
|
"total_knowledge_change": None,
|
||||||
|
"total_api_call_change": None,
|
||||||
|
}
|
||||||
|
|
||||||
|
# --- total_api_call_change ---
|
||||||
|
try:
|
||||||
|
# 获取该workspace下所有api_key的id
|
||||||
|
api_key_ids = [
|
||||||
|
row[0] for row in db.query(ApiKey.id).filter(
|
||||||
|
ApiKey.workspace_id == workspace_id
|
||||||
|
).all()
|
||||||
|
]
|
||||||
|
if api_key_ids:
|
||||||
|
# 今日累计
|
||||||
|
today_api_count = db.query(func.count(ApiKeyLog.id)).filter(
|
||||||
|
ApiKeyLog.api_key_id.in_(api_key_ids),
|
||||||
|
ApiKeyLog.created_at >= today_start,
|
||||||
|
ApiKeyLog.created_at < now_local
|
||||||
|
).scalar() or 0
|
||||||
|
# 昨日全天
|
||||||
|
yesterday_api_count = db.query(func.count(ApiKeyLog.id)).filter(
|
||||||
|
ApiKeyLog.api_key_id.in_(api_key_ids),
|
||||||
|
ApiKeyLog.created_at >= yesterday_start,
|
||||||
|
ApiKeyLog.created_at < today_start
|
||||||
|
).scalar() or 0
|
||||||
|
changes["total_api_call_change"] = today_api_count - yesterday_api_count
|
||||||
|
else:
|
||||||
|
# 没有api_key,如果今日也是0则无对比意义
|
||||||
|
changes["total_api_call_change"] = None
|
||||||
|
except Exception as e:
|
||||||
|
business_logger.warning(f"计算API调用昨日对比失败: {str(e)}")
|
||||||
|
|
||||||
|
# --- total_knowledge_change ---
|
||||||
|
try:
|
||||||
|
# 今天有效总量:当前status=1的知识库总数,排除用户知识库(permission_id='Memory')
|
||||||
|
today_knowledge = db.query(func.count(Knowledge.id)).filter(
|
||||||
|
Knowledge.workspace_id == workspace_id,
|
||||||
|
Knowledge.status == 1,
|
||||||
|
Knowledge.permission_id != "Memory"
|
||||||
|
).scalar() or 0
|
||||||
|
# 昨日有效总量:昨天之前创建的、当前仍有效的知识库,排除用户知识库
|
||||||
|
yesterday_knowledge = db.query(func.count(Knowledge.id)).filter(
|
||||||
|
Knowledge.workspace_id == workspace_id,
|
||||||
|
Knowledge.status == 1,
|
||||||
|
Knowledge.permission_id != "Memory",
|
||||||
|
Knowledge.created_at < today_start
|
||||||
|
).scalar() or 0
|
||||||
|
# 今日软删:今天被软删的知识库(status=2 且 updated_at >= today_start),排除用户知识库
|
||||||
|
today_deleted_knowledge = db.query(func.count(Knowledge.id)).filter(
|
||||||
|
Knowledge.workspace_id == workspace_id,
|
||||||
|
Knowledge.status == 2,
|
||||||
|
Knowledge.permission_id != "Memory",
|
||||||
|
Knowledge.updated_at >= today_start
|
||||||
|
).scalar() or 0
|
||||||
|
|
||||||
|
if yesterday_knowledge == 0 and today_knowledge == 0 and today_deleted_knowledge == 0:
|
||||||
|
changes["total_knowledge_change"] = None
|
||||||
|
else:
|
||||||
|
# change = 今天有效总量 - 今日软删 - 昨日有效总量
|
||||||
|
changes["total_knowledge_change"] = today_knowledge - today_deleted_knowledge - yesterday_knowledge
|
||||||
|
except Exception as e:
|
||||||
|
business_logger.warning(f"计算知识库昨日对比失败: {str(e)}")
|
||||||
|
|
||||||
|
# --- total_app_change ---
|
||||||
|
try:
|
||||||
|
# === 自有app ===
|
||||||
|
# 今天有效总量
|
||||||
|
today_own_apps = db.query(func.count(App.id)).filter(
|
||||||
|
App.workspace_id == workspace_id,
|
||||||
|
App.is_active == True
|
||||||
|
).scalar() or 0
|
||||||
|
# 昨日有效总量
|
||||||
|
yesterday_own_apps = db.query(func.count(App.id)).filter(
|
||||||
|
App.workspace_id == workspace_id,
|
||||||
|
App.is_active == True,
|
||||||
|
App.created_at < today_start
|
||||||
|
).scalar() or 0
|
||||||
|
# 今日软删
|
||||||
|
today_deleted_own_apps = db.query(func.count(App.id)).filter(
|
||||||
|
App.workspace_id == workspace_id,
|
||||||
|
App.is_active == False,
|
||||||
|
App.updated_at >= today_start
|
||||||
|
).scalar() or 0
|
||||||
|
|
||||||
|
# === 被分享app ===
|
||||||
|
# 今天有效总量
|
||||||
|
today_shared_apps = db.query(func.count(AppShare.id)).filter(
|
||||||
|
AppShare.target_workspace_id == workspace_id,
|
||||||
|
AppShare.is_active == True
|
||||||
|
).scalar() or 0
|
||||||
|
# 昨日有效总量
|
||||||
|
yesterday_shared_apps = db.query(func.count(AppShare.id)).filter(
|
||||||
|
AppShare.target_workspace_id == workspace_id,
|
||||||
|
AppShare.is_active == True,
|
||||||
|
AppShare.created_at < today_start
|
||||||
|
).scalar() or 0
|
||||||
|
# 今日软删
|
||||||
|
today_deleted_shared_apps = db.query(func.count(AppShare.id)).filter(
|
||||||
|
AppShare.target_workspace_id == workspace_id,
|
||||||
|
AppShare.is_active == False,
|
||||||
|
AppShare.updated_at >= today_start
|
||||||
|
).scalar() or 0
|
||||||
|
|
||||||
|
today_total_app = today_own_apps + today_shared_apps
|
||||||
|
yesterday_total_app = yesterday_own_apps + yesterday_shared_apps
|
||||||
|
total_deleted = today_deleted_own_apps + today_deleted_shared_apps
|
||||||
|
|
||||||
|
if yesterday_total_app == 0 and today_total_app == 0 and total_deleted == 0:
|
||||||
|
changes["total_app_change"] = None
|
||||||
|
else:
|
||||||
|
# change = 今天有效总量 - 今日软删 - 昨日有效总量
|
||||||
|
changes["total_app_change"] = today_total_app - total_deleted - yesterday_total_app
|
||||||
|
except Exception as e:
|
||||||
|
business_logger.warning(f"计算应用数量昨日对比失败: {str(e)}")
|
||||||
|
|
||||||
|
# --- total_memory_change ---
|
||||||
|
try:
|
||||||
|
today_memory = today_data.get("total_memory")
|
||||||
|
if today_memory is None:
|
||||||
|
changes["total_memory_change"] = None
|
||||||
|
elif storage_type == "neo4j":
|
||||||
|
# 从 memory_increments 取最近一条 created_at < today_start 的记录
|
||||||
|
last_record = db.query(MemoryIncrement).filter(
|
||||||
|
MemoryIncrement.workspace_id == workspace_id,
|
||||||
|
MemoryIncrement.created_at < today_start
|
||||||
|
).order_by(desc(MemoryIncrement.created_at)).first()
|
||||||
|
if last_record is None:
|
||||||
|
changes["total_memory_change"] = None
|
||||||
|
else:
|
||||||
|
changes["total_memory_change"] = today_memory - last_record.total_num
|
||||||
|
elif storage_type == "rag":
|
||||||
|
# RAG: 查 documents 表中 created_at < today_start 的 chunk_num 之和
|
||||||
|
from app.models.document_model import Document
|
||||||
|
from app.models.end_user_model import EndUser as _EndUser
|
||||||
|
from app.models.app_model import App as _App
|
||||||
|
|
||||||
|
end_user_ids = [
|
||||||
|
str(eid) for (eid,) in db.query(_EndUser.id)
|
||||||
|
.join(_App, _EndUser.app_id == _App.id)
|
||||||
|
.filter(_App.workspace_id == workspace_id)
|
||||||
|
.all()
|
||||||
|
]
|
||||||
|
if not end_user_ids:
|
||||||
|
changes["total_memory_change"] = None
|
||||||
|
else:
|
||||||
|
file_names = [f"{uid}.txt" for uid in end_user_ids]
|
||||||
|
yesterday_chunk = db.query(func.sum(Document.chunk_num)).filter(
|
||||||
|
Document.file_name.in_(file_names),
|
||||||
|
Document.created_at < today_start
|
||||||
|
).scalar()
|
||||||
|
if yesterday_chunk is None:
|
||||||
|
changes["total_memory_change"] = None
|
||||||
|
else:
|
||||||
|
changes["total_memory_change"] = today_memory - int(yesterday_chunk)
|
||||||
|
except Exception as e:
|
||||||
|
business_logger.warning(f"计算记忆总量昨日对比失败: {str(e)}")
|
||||||
|
|
||||||
|
business_logger.info(f"昨日对比计算完成: {changes}")
|
||||||
|
return changes
|
||||||
|
|
||||||
|
|
||||||
def get_current_user_total_chunk(
|
def get_current_user_total_chunk(
|
||||||
end_user_id: str,
|
end_user_id: str,
|
||||||
db: Session,
|
db: Session,
|
||||||
@@ -882,3 +1069,65 @@ async def generate_rag_profile(
|
|||||||
"personas_count": len(personas),
|
"personas_count": len(personas),
|
||||||
"insight_generated": bool(insight_sections.get("memory_insight")),
|
"insight_generated": bool(insight_sections.get("memory_insight")),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def get_dashboard_common_stats(db: Session, workspace_id) -> dict:
|
||||||
|
"""
|
||||||
|
获取 dashboard 中 neo4j/rag 分支共享的统计数据:
|
||||||
|
total_app、total_knowledge、total_api_call
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict: {"total_app": int, "total_knowledge": int, "total_api_call": int}
|
||||||
|
"""
|
||||||
|
result = {"total_app": 0, "total_knowledge": 0, "total_api_call": 0}
|
||||||
|
|
||||||
|
# total_app: 统计当前空间下的所有app数量(包含自有 + 被分享给本工作空间的app)
|
||||||
|
try:
|
||||||
|
from app.services import app_service as _app_svc
|
||||||
|
_, total_app = _app_svc.AppService(db).list_apps(
|
||||||
|
workspace_id=workspace_id, include_shared=True, pagesize=1
|
||||||
|
)
|
||||||
|
result["total_app"] = total_app
|
||||||
|
except Exception as e:
|
||||||
|
business_logger.warning(f"获取应用数量失败: {e}")
|
||||||
|
|
||||||
|
# total_knowledge: 统计顶层知识库(parent_id = workspace_id)
|
||||||
|
try:
|
||||||
|
from sqlalchemy import func as _func
|
||||||
|
from app.models.knowledge_model import Knowledge as _Knowledge
|
||||||
|
total_knowledge = db.query(_func.count(_Knowledge.id)).filter(
|
||||||
|
_Knowledge.workspace_id == workspace_id,
|
||||||
|
_Knowledge.status == 1,
|
||||||
|
_Knowledge.parent_id == _Knowledge.workspace_id
|
||||||
|
).scalar() or 0
|
||||||
|
result["total_knowledge"] = total_knowledge
|
||||||
|
except Exception as e:
|
||||||
|
business_logger.warning(f"获取知识库数量失败: {e}")
|
||||||
|
|
||||||
|
# total_api_call: 仅统计当天 api_key_log 调用次数
|
||||||
|
try:
|
||||||
|
from datetime import datetime
|
||||||
|
from sqlalchemy import func as _api_func
|
||||||
|
from app.models.api_key_model import ApiKey as _ApiKey, ApiKeyLog as _ApiKeyLog
|
||||||
|
|
||||||
|
_now = datetime.now()
|
||||||
|
_today_start = _now.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||||
|
|
||||||
|
_api_key_ids = [
|
||||||
|
row[0] for row in db.query(_ApiKey.id).filter(
|
||||||
|
_ApiKey.workspace_id == workspace_id
|
||||||
|
).all()
|
||||||
|
]
|
||||||
|
if _api_key_ids:
|
||||||
|
total_api_calls = db.query(_api_func.count(_ApiKeyLog.id)).filter(
|
||||||
|
_ApiKeyLog.api_key_id.in_(_api_key_ids),
|
||||||
|
_ApiKeyLog.created_at >= _today_start,
|
||||||
|
_ApiKeyLog.created_at < _now
|
||||||
|
).scalar() or 0
|
||||||
|
else:
|
||||||
|
total_api_calls = 0
|
||||||
|
result["total_api_call"] = total_api_calls
|
||||||
|
except Exception as e:
|
||||||
|
business_logger.warning(f"获取API调用统计失败: {e}")
|
||||||
|
|
||||||
|
return result
|
||||||
|
|||||||
@@ -613,37 +613,6 @@ async def search_entity(end_user_id: Optional[str] = None) -> Dict[str, Any]:
|
|||||||
return data
|
return data
|
||||||
|
|
||||||
|
|
||||||
async def search_all(end_user_id: Optional[str] = None) -> Dict[str, Any]:
|
|
||||||
result = await _neo4j_connector.execute_query(
|
|
||||||
MemoryConfigRepository.SEARCH_FOR_ALL,
|
|
||||||
end_user_id=end_user_id,
|
|
||||||
)
|
|
||||||
|
|
||||||
# 检查结果是否为空或长度不足
|
|
||||||
if not result or len(result) < 4:
|
|
||||||
data = {
|
|
||||||
"total": 0,
|
|
||||||
"counts": {
|
|
||||||
"dialogue": 0,
|
|
||||||
"chunk": 0,
|
|
||||||
"statement": 0,
|
|
||||||
"entity": 0,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
return data
|
|
||||||
|
|
||||||
data = {
|
|
||||||
"total": result[-1]["Count"],
|
|
||||||
"counts": {
|
|
||||||
"dialogue": result[0]["Count"],
|
|
||||||
"chunk": result[1]["Count"],
|
|
||||||
"statement": result[2]["Count"],
|
|
||||||
"entity": result[3]["Count"],
|
|
||||||
},
|
|
||||||
}
|
|
||||||
return data
|
|
||||||
|
|
||||||
|
|
||||||
async def kb_type_distribution(end_user_id: Optional[str] = None) -> Dict[str, Any]:
|
async def kb_type_distribution(end_user_id: Optional[str] = None) -> Dict[str, Any]:
|
||||||
"""统一知识库类型分布接口。
|
"""统一知识库类型分布接口。
|
||||||
|
|
||||||
|
|||||||
@@ -1324,7 +1324,7 @@ def write_total_memory_task(workspace_id: str) -> Dict[str, Any]:
|
|||||||
from app.models.app_model import App
|
from app.models.app_model import App
|
||||||
from app.models.end_user_model import EndUser
|
from app.models.end_user_model import EndUser
|
||||||
from app.repositories.memory_increment_repository import write_memory_increment
|
from app.repositories.memory_increment_repository import write_memory_increment
|
||||||
from app.services.memory_storage_service import search_all
|
from app.services.memory_storage_service import search_all_batch
|
||||||
|
|
||||||
with get_db_context() as db:
|
with get_db_context() as db:
|
||||||
try:
|
try:
|
||||||
@@ -1358,27 +1358,15 @@ def write_total_memory_task(workspace_id: str) -> Dict[str, Any]:
|
|||||||
EndUser.workspace_id == workspace_id
|
EndUser.workspace_id == workspace_id
|
||||||
).distinct().all()
|
).distinct().all()
|
||||||
|
|
||||||
# 3. 遍历所有end_user,查询每个宿主的记忆总量并累加
|
# 3. 批量查询所有宿主的记忆总量
|
||||||
total_num = 0
|
end_user_id_list = [str(eid) for (eid,) in end_users]
|
||||||
end_user_details = []
|
batch_result = await search_all_batch(end_user_id_list)
|
||||||
|
|
||||||
for (end_user_id,) in end_users:
|
total_num = sum(batch_result.values())
|
||||||
try:
|
end_user_details = [
|
||||||
# 调用 search_all 接口查询该宿主的总量
|
{"end_user_id": uid, "total": batch_result.get(uid, 0)}
|
||||||
result = await search_all(str(end_user_id))
|
for uid in end_user_id_list
|
||||||
user_total = result.get("total", 0)
|
]
|
||||||
total_num += user_total
|
|
||||||
end_user_details.append({
|
|
||||||
"end_user_id": str(end_user_id),
|
|
||||||
"total": user_total
|
|
||||||
})
|
|
||||||
except Exception as e:
|
|
||||||
# 记录单个用户查询失败,但继续处理其他用户
|
|
||||||
end_user_details.append({
|
|
||||||
"end_user_id": str(end_user_id),
|
|
||||||
"total": 0,
|
|
||||||
"error": str(e)
|
|
||||||
})
|
|
||||||
|
|
||||||
# 4. 写入数据库
|
# 4. 写入数据库
|
||||||
memory_increment = write_memory_increment(
|
memory_increment = write_memory_increment(
|
||||||
@@ -1441,7 +1429,7 @@ def write_all_workspaces_memory_task(self) -> Dict[str, Any]:
|
|||||||
from app.models.end_user_model import EndUser
|
from app.models.end_user_model import EndUser
|
||||||
from app.models.workspace_model import Workspace
|
from app.models.workspace_model import Workspace
|
||||||
from app.repositories.memory_increment_repository import write_memory_increment
|
from app.repositories.memory_increment_repository import write_memory_increment
|
||||||
from app.services.memory_storage_service import search_all
|
from app.services.memory_storage_service import search_all_batch
|
||||||
|
|
||||||
with get_db_context() as db:
|
with get_db_context() as db:
|
||||||
try:
|
try:
|
||||||
@@ -1499,28 +1487,15 @@ def write_all_workspaces_memory_task(self) -> Dict[str, Any]:
|
|||||||
EndUser.workspace_id == workspace_id
|
EndUser.workspace_id == workspace_id
|
||||||
).distinct().all()
|
).distinct().all()
|
||||||
|
|
||||||
# 3. 遍历所有end_user,查询每个宿主的记忆总量并累加
|
# 3. 批量查询所有宿主的记忆总量
|
||||||
total_num = 0
|
end_user_id_list = [str(eid) for (eid,) in end_users]
|
||||||
end_user_details = []
|
batch_result = await search_all_batch(end_user_id_list)
|
||||||
|
|
||||||
for (end_user_id,) in end_users:
|
total_num = sum(batch_result.values())
|
||||||
try:
|
end_user_details = [
|
||||||
# 调用 search_all 接口查询该宿主的总量
|
{"end_user_id": uid, "total": batch_result.get(uid, 0)}
|
||||||
result = await search_all(str(end_user_id))
|
for uid in end_user_id_list
|
||||||
user_total = result.get("total", 0)
|
]
|
||||||
total_num += user_total
|
|
||||||
end_user_details.append({
|
|
||||||
"end_user_id": str(end_user_id),
|
|
||||||
"total": user_total
|
|
||||||
})
|
|
||||||
except Exception as e:
|
|
||||||
# 记录单个用户查询失败,但继续处理其他用户
|
|
||||||
logger.warning(f"查询用户 {end_user_id} 记忆失败: {str(e)}")
|
|
||||||
end_user_details.append({
|
|
||||||
"end_user_id": str(end_user_id),
|
|
||||||
"total": 0,
|
|
||||||
"error": str(e)
|
|
||||||
})
|
|
||||||
|
|
||||||
# 4. 写入数据库
|
# 4. 写入数据库
|
||||||
memory_increment = write_memory_increment(
|
memory_increment = write_memory_increment(
|
||||||
|
|||||||
@@ -6,13 +6,13 @@
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import time
|
import time
|
||||||
from contextlib import contextmanager
|
from contextlib import asynccontextmanager, contextmanager
|
||||||
from app.core.logging_config import get_api_logger
|
from app.core.logging_config import get_api_logger
|
||||||
|
|
||||||
# 获取API专用日志器
|
# 获取API专用日志器
|
||||||
api_logger = get_api_logger()
|
api_logger = get_api_logger()
|
||||||
|
|
||||||
|
# 同步的上下文管理器,使用@contextmanager修饰
|
||||||
@contextmanager
|
@contextmanager
|
||||||
def timer(label: str, user_count: int = 0):
|
def timer(label: str, user_count: int = 0):
|
||||||
"""上下文管理器:用于测量代码块执行时间
|
"""上下文管理器:用于测量代码块执行时间
|
||||||
@@ -35,3 +35,27 @@ def timer(label: str, user_count: int = 0):
|
|||||||
elapsed = (time.perf_counter() - start) * 1000 # 转换为毫秒
|
elapsed = (time.perf_counter() - start) * 1000 # 转换为毫秒
|
||||||
extra_info = f", 用户数: {user_count}" if user_count > 0 else ""
|
extra_info = f", 用户数: {user_count}" if user_count > 0 else ""
|
||||||
api_logger.info(f"[性能统计] {label}: {elapsed:.2f}ms{extra_info}")
|
api_logger.info(f"[性能统计] {label}: {elapsed:.2f}ms{extra_info}")
|
||||||
|
|
||||||
|
# 异步的上下文管理器,使用@asynccontextmanager装饰
|
||||||
|
@asynccontextmanager
|
||||||
|
async def async_timer(label: str, user_count: int = 0):
|
||||||
|
"""异步上下文管理器:用于测量包含 await 的异步代码块执行时间
|
||||||
|
|
||||||
|
Args:
|
||||||
|
label: 统计标签,用于标识被测量的代码块
|
||||||
|
user_count: 用户数,可选参数,用于记录处理的用户数量
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
async with async_timer("获取用户列表"):
|
||||||
|
users = await get_users()
|
||||||
|
|
||||||
|
async with async_timer("批量处理", user_count=len(user_ids)):
|
||||||
|
await process_users(user_ids)
|
||||||
|
"""
|
||||||
|
start = time.perf_counter()
|
||||||
|
try:
|
||||||
|
yield
|
||||||
|
finally:
|
||||||
|
elapsed = (time.perf_counter() - start) * 1000 # 转换为毫秒
|
||||||
|
extra_info = f", 用户数: {user_count}" if user_count > 0 else ""
|
||||||
|
api_logger.info(f"[性能统计] {label}: {elapsed:.2f}ms{extra_info}")
|
||||||
|
|||||||
Reference in New Issue
Block a user