Files
MemoryBear/api/app/repositories/home_page_repository.py

252 lines
9.0 KiB
Python

from datetime import datetime, timedelta
from sqlalchemy.orm import Session
from sqlalchemy import func
from uuid import UUID
from typing import Dict, Optional, Any
from app.models.end_user_model import EndUser
from app.models.user_model import User
from app.models.workspace_model import Workspace, WorkspaceMember
from app.models.models_model import ModelConfig
from app.models.app_model import App
class HomePageRepository:
@staticmethod
def get_model_statistics(db: Session, tenant_id: UUID, week_start: datetime) -> tuple[int, int, int, float]:
"""获取模型统计数据"""
total_models = db.query(ModelConfig).filter(
ModelConfig.tenant_id == tenant_id,
ModelConfig.is_active.is_(True)
).count()
total_llm = db.query(ModelConfig).filter(
ModelConfig.tenant_id == tenant_id,
ModelConfig.is_active.is_(True),
ModelConfig.type == "llm"
).count()
total_embedding = db.query(ModelConfig).filter(
ModelConfig.tenant_id == tenant_id,
ModelConfig.is_active.is_(True),
ModelConfig.type == "embedding"
).count()
new_models_this_week = db.query(ModelConfig).filter(
ModelConfig.tenant_id == tenant_id,
ModelConfig.is_active.is_(True),
ModelConfig.created_at >= week_start
).count()
if total_models == 0:
growth_rate = 0.0
elif new_models_this_week == 0:
growth_rate = 0.0
else:
last_week_total = total_models - new_models_this_week
if last_week_total == 0:
growth_rate = 100.0
else:
growth_rate = round((new_models_this_week / last_week_total) * 100, 2)
return total_models, total_llm, total_embedding, growth_rate
@staticmethod
def get_workspace_statistics(db: Session, tenant_id: UUID, week_start: datetime) -> tuple[int, int, float]:
"""获取工作空间统计数据"""
active_workspaces = db.query(Workspace).filter(
Workspace.tenant_id == tenant_id,
Workspace.is_active.is_(True)
).count()
new_workspaces_this_week = db.query(Workspace).filter(
Workspace.tenant_id == tenant_id,
Workspace.is_active.is_(True),
Workspace.created_at >= week_start
).count()
if active_workspaces == 0:
growth_rate = 0.0
elif new_workspaces_this_week == 0:
growth_rate = 0.0
else:
last_week_workspaces = active_workspaces - new_workspaces_this_week
if last_week_workspaces == 0:
growth_rate = 100.0
else:
growth_rate = round((new_workspaces_this_week / last_week_workspaces) * 100, 2)
return active_workspaces, new_workspaces_this_week, growth_rate
@staticmethod
def get_user_statistics(db: Session, tenant_id: UUID, week_start: datetime) -> tuple[int, int, float]:
"""获取用户统计数据"""
workspace_ids = db.query(Workspace.id).filter(
Workspace.tenant_id == tenant_id,
Workspace.is_active.is_(True)
).subquery()
total_users = db.query(EndUser).join(
App,
EndUser.app_id == App.id
).filter(
App.workspace_id.in_(workspace_ids),
App.is_active.is_(True),
App.status == "active"
).count()
new_users_this_week = db.query(EndUser).join(
App,
EndUser.app_id == App.id
).filter(
App.workspace_id.in_(workspace_ids),
App.is_active.is_(True),
App.status == "active",
EndUser.created_at >= week_start
).count()
if total_users == 0:
growth_rate = 0.0
elif new_users_this_week == 0:
growth_rate = 0.0
else:
last_week_users = total_users - new_users_this_week
if last_week_users == 0:
growth_rate = 100.0
else:
growth_rate = round((new_users_this_week / last_week_users) * 100, 2)
return total_users, new_users_this_week, growth_rate
@staticmethod
def get_app_statistics(db: Session, tenant_id: UUID, week_start: datetime) -> tuple[int, int, float]:
"""获取应用统计数据"""
workspace_ids = db.query(Workspace.id).filter(
Workspace.tenant_id == tenant_id,
Workspace.is_active.is_(True)
).subquery()
running_apps = db.query(App).filter(
App.workspace_id.in_(workspace_ids),
App.is_active.is_(True),
App.status == "active"
).count()
new_apps_this_week = db.query(App).filter(
App.workspace_id.in_(workspace_ids),
App.is_active.is_(True),
App.status == "active",
App.created_at >= week_start
).count()
if running_apps == 0:
growth_rate = 0.0
elif new_apps_this_week == 0:
growth_rate = 0.0
else:
last_week_apps = running_apps - new_apps_this_week
if last_week_apps == 0:
growth_rate = 100.0
else:
growth_rate = round((new_apps_this_week / last_week_apps) * 100, 2)
return running_apps, new_apps_this_week, growth_rate
@staticmethod
def get_workspaces_with_counts(db: Session, tenant_id: UUID) -> tuple[list[Workspace], Dict[UUID, int], Dict[UUID, int]]:
"""批量获取工作空间及其统计数据"""
# 获取工作空间列表
workspaces = db.query(Workspace).filter(
Workspace.tenant_id == tenant_id,
Workspace.is_active.is_(True)
).all()
workspace_ids = [ws.id for ws in workspaces]
# 批量获取应用数量
app_counts = db.query(
App.workspace_id,
func.count(App.id).label('count')
).filter(
App.workspace_id.in_(workspace_ids),
App.is_active.is_(True),
App.status == "active"
).group_by(App.workspace_id).all()
app_count_dict = {workspace_id: count for workspace_id, count in app_counts}
# 批量获取用户数量
user_counts = db.query(
App.workspace_id,
func.count(EndUser.id).label('count')
).join(
EndUser,
EndUser.app_id == App.id
).filter(
App.workspace_id.in_(workspace_ids),
App.is_active.is_(True),
App.status == "active"
).group_by(App.workspace_id).all()
user_count_dict = {workspace_id: count for workspace_id, count in user_counts}
return workspaces, app_count_dict, user_count_dict
@staticmethod
def get_version_introduction(db: Session, version: str) -> Optional[Dict[str, Any]]:
"""
从数据库获取版本说明(优先读取已发布的版本)
使用反射方式读取表结构,不依赖 premium 模型类
Args:
db: 数据库会话
version: 版本号,如 "v0.2.7"
Returns:
版本说明字典,格式与 version_info.json 一致
如果数据库中没有该版本,返回 None
"""
try:
from sqlalchemy import Table, MetaData
metadata = MetaData()
version_notes = Table('version_notes', metadata, autoload_with=db.engine)
version_note_items = Table('version_note_items', metadata, autoload_with=db.engine)
note = db.query(version_notes).filter(
version_notes.c.version == version,
version_notes.c.is_published == True
).first()
if not note:
return None
items = db.query(version_note_items).filter(
version_note_items.c.note_id == note.id
).order_by(version_note_items.c.sort_order).all()
core_upgrades = []
for item in items:
title = item.title
content = item.content
if content:
core_upgrades.append(f"{title}<br>{content}")
else:
core_upgrades.append(title)
return {
"introduction": {
"codeName": "",
"releaseDate": note.release_date.isoformat() if note.release_date else "",
"upgradePosition": "",
"coreUpgrades": core_upgrades
},
"introduction_en": {
"codeName": "",
"releaseDate": note.release_date.isoformat() if note.release_date else "",
"upgradePosition": "",
"coreUpgrades": core_upgrades
}
}
except Exception:
return None