feat:(controllers & services) Changes in data from yesterday to today

This commit is contained in:
lanceyq
2026-04-01 18:17:25 +08:00
parent cf519738f4
commit 6eca5f6cdf
4 changed files with 320 additions and 59 deletions

View File

@@ -519,6 +519,206 @@ def get_rag_user_kb_total_chunk(
business_logger.error(f"获取用户知识库总chunk数失败: workspace_id={workspace_id} - {str(e)}")
raise
def get_dashboard_yesterday_changes(
db: Session,
workspace_id: uuid.UUID,
storage_type: str,
today_data: dict
) -> dict:
"""
计算各指标相比昨天的变化量。
Args:
db: 数据库会话
workspace_id: 工作空间ID
storage_type: 存储类型 'neo4j' | 'rag'
today_data: 当前数据,包含 total_memory, total_app, total_knowledge, total_api_call
Returns:
{
"total_memory_change": int | None,
"total_app_change": int | None,
"total_knowledge_change": int | None,
"total_api_call_change": int | None
}
"""
from datetime import datetime, timedelta
from sqlalchemy import func
from app.models.api_key_model import ApiKey, ApiKeyLog
from app.models.knowledge_model import Knowledge
from app.models.app_model import App
from app.models.appshare_model import AppShare
business_logger.info(f"计算昨日对比: workspace_id={workspace_id}, storage_type={storage_type}")
now_local = datetime.now()
today_start = now_local.replace(hour=0, minute=0, second=0, microsecond=0)
yesterday_start = today_start - timedelta(days=1)
changes = {
"total_memory_change": None,
"total_app_change": None,
"total_knowledge_change": None,
"total_api_call_change": None,
}
# --- total_api_call_change ---
try:
# 获取该workspace下所有api_key的id
api_key_ids = [
row[0] for row in db.query(ApiKey.id).filter(
ApiKey.workspace_id == workspace_id
).all()
]
if api_key_ids:
# 今日累计
today_api_count = db.query(func.count(ApiKeyLog.id)).filter(
ApiKeyLog.api_key_id.in_(api_key_ids),
ApiKeyLog.created_at >= today_start,
ApiKeyLog.created_at < now_local
).scalar() or 0
# 昨日全天
yesterday_api_count = db.query(func.count(ApiKeyLog.id)).filter(
ApiKeyLog.api_key_id.in_(api_key_ids),
ApiKeyLog.created_at >= yesterday_start,
ApiKeyLog.created_at < today_start
).scalar() or 0
changes["total_api_call_change"] = today_api_count - yesterday_api_count
else:
# 没有api_key如果今日也是0则无对比意义
changes["total_api_call_change"] = None
except Exception as e:
business_logger.warning(f"计算API调用昨日对比失败: {str(e)}")
# --- total_knowledge_change ---
try:
# 今天有效总量当前status=1的知识库总数排除用户知识库(permission_id='Memory')
today_knowledge = db.query(func.count(Knowledge.id)).filter(
Knowledge.workspace_id == workspace_id,
Knowledge.status == 1,
Knowledge.permission_id != "Memory"
).scalar() or 0
# 昨日有效总量:昨天之前创建的、当前仍有效的知识库,排除用户知识库
yesterday_knowledge = db.query(func.count(Knowledge.id)).filter(
Knowledge.workspace_id == workspace_id,
Knowledge.status == 1,
Knowledge.permission_id != "Memory",
Knowledge.created_at < today_start
).scalar() or 0
# 今日软删:今天被软删的知识库(status=2 且 updated_at >= today_start),排除用户知识库
today_deleted_knowledge = db.query(func.count(Knowledge.id)).filter(
Knowledge.workspace_id == workspace_id,
Knowledge.status == 2,
Knowledge.permission_id != "Memory",
Knowledge.updated_at >= today_start
).scalar() or 0
if yesterday_knowledge == 0 and today_knowledge == 0 and today_deleted_knowledge == 0:
changes["total_knowledge_change"] = None
else:
# change = 今天有效总量 - 今日软删 - 昨日有效总量
changes["total_knowledge_change"] = today_knowledge - today_deleted_knowledge - yesterday_knowledge
except Exception as e:
business_logger.warning(f"计算知识库昨日对比失败: {str(e)}")
# --- total_app_change ---
try:
# === 自有app ===
# 今天有效总量
today_own_apps = db.query(func.count(App.id)).filter(
App.workspace_id == workspace_id,
App.is_active == True
).scalar() or 0
# 昨日有效总量
yesterday_own_apps = db.query(func.count(App.id)).filter(
App.workspace_id == workspace_id,
App.is_active == True,
App.created_at < today_start
).scalar() or 0
# 今日软删
today_deleted_own_apps = db.query(func.count(App.id)).filter(
App.workspace_id == workspace_id,
App.is_active == False,
App.updated_at >= today_start
).scalar() or 0
# === 被分享app ===
# 今天有效总量
today_shared_apps = db.query(func.count(AppShare.id)).filter(
AppShare.target_workspace_id == workspace_id,
AppShare.is_active == True
).scalar() or 0
# 昨日有效总量
yesterday_shared_apps = db.query(func.count(AppShare.id)).filter(
AppShare.target_workspace_id == workspace_id,
AppShare.is_active == True,
AppShare.created_at < today_start
).scalar() or 0
# 今日软删
today_deleted_shared_apps = db.query(func.count(AppShare.id)).filter(
AppShare.target_workspace_id == workspace_id,
AppShare.is_active == False,
AppShare.updated_at >= today_start
).scalar() or 0
today_total_app = today_own_apps + today_shared_apps
yesterday_total_app = yesterday_own_apps + yesterday_shared_apps
total_deleted = today_deleted_own_apps + today_deleted_shared_apps
if yesterday_total_app == 0 and today_total_app == 0 and total_deleted == 0:
changes["total_app_change"] = None
else:
# change = 今天有效总量 - 今日软删 - 昨日有效总量
changes["total_app_change"] = today_total_app - total_deleted - yesterday_total_app
except Exception as e:
business_logger.warning(f"计算应用数量昨日对比失败: {str(e)}")
# --- total_memory_change ---
try:
today_memory = today_data.get("total_memory")
if today_memory is None:
changes["total_memory_change"] = None
elif storage_type == "neo4j":
# 从 memory_increments 取最近一条 created_at < today_start 的记录
last_record = db.query(MemoryIncrement).filter(
MemoryIncrement.workspace_id == workspace_id,
MemoryIncrement.created_at < today_start
).order_by(desc(MemoryIncrement.created_at)).first()
if last_record is None:
changes["total_memory_change"] = None
else:
changes["total_memory_change"] = today_memory - last_record.total_num
elif storage_type == "rag":
# RAG: 查 documents 表中 created_at < today_start 的 chunk_num 之和
from app.models.document_model import Document
from app.models.end_user_model import EndUser as _EndUser
from app.models.app_model import App as _App
end_user_ids = [
str(eid) for (eid,) in db.query(_EndUser.id)
.join(_App, _EndUser.app_id == _App.id)
.filter(_App.workspace_id == workspace_id)
.all()
]
if not end_user_ids:
changes["total_memory_change"] = None
else:
file_names = [f"{uid}.txt" for uid in end_user_ids]
yesterday_chunk = db.query(func.sum(Document.chunk_num)).filter(
Document.file_name.in_(file_names),
Document.created_at < today_start
).scalar()
if yesterday_chunk is None:
changes["total_memory_change"] = None
else:
changes["total_memory_change"] = today_memory - int(yesterday_chunk)
except Exception as e:
business_logger.warning(f"计算记忆总量昨日对比失败: {str(e)}")
business_logger.info(f"昨日对比计算完成: {changes}")
return changes
def get_current_user_total_chunk(
end_user_id: str,
db: Session,

View File

@@ -614,34 +614,30 @@ async def search_entity(end_user_id: Optional[str] = None) -> Dict[str, Any]:
async def search_all(end_user_id: Optional[str] = None) -> Dict[str, Any]:
"""查询用户的记忆总量简化版本只返回total
Args:
end_user_id: 用户ID
Returns:
Dict[str, Any]: {"total": int}
"""
if not end_user_id:
return {"total": 0}
result = await _neo4j_connector.execute_query(
MemoryConfigRepository.SEARCH_FOR_ALL,
end_user_id=end_user_id,
MemoryConfigRepository.SEARCH_FOR_ALL_BATCH,
end_user_ids=[end_user_id],
)
# 检查结果是否为空或长度不足
if not result or len(result) < 4:
data = {
"total": 0,
"counts": {
"dialogue": 0,
"chunk": 0,
"statement": 0,
"entity": 0,
},
}
return data
# 从批量查询结果中提取该用户的total
total = 0
for row in result:
if row["user_id"] == end_user_id:
total = row["total"]
break
data = {
"total": result[-1]["Count"],
"counts": {
"dialogue": result[0]["Count"],
"chunk": result[1]["Count"],
"statement": result[2]["Count"],
"entity": result[3]["Count"],
},
}
return data
return {"total": total}
async def kb_type_distribution(end_user_id: Optional[str] = None) -> Dict[str, Any]: