from datetime import datetime, timedelta from sqlalchemy.orm import Session from sqlalchemy import func from uuid import UUID from typing import Dict from app.models.end_user_model import EndUser from app.models.user_model import User from app.models.workspace_model import Workspace, WorkspaceMember from app.models.models_model import ModelConfig from app.models.app_model import App class HomePageRepository: @staticmethod def get_model_statistics(db: Session, tenant_id: UUID, week_start: datetime) -> tuple[int, int, int, float]: """获取模型统计数据""" total_models = db.query(ModelConfig).filter( ModelConfig.tenant_id == tenant_id, ModelConfig.is_active.is_(True) ).count() total_llm = db.query(ModelConfig).filter( ModelConfig.tenant_id == tenant_id, ModelConfig.is_active.is_(True), ModelConfig.type == "llm" ).count() total_embedding = db.query(ModelConfig).filter( ModelConfig.tenant_id == tenant_id, ModelConfig.is_active.is_(True), ModelConfig.type == "embedding" ).count() new_models_this_week = db.query(ModelConfig).filter( ModelConfig.tenant_id == tenant_id, ModelConfig.is_active.is_(True), ModelConfig.created_at >= week_start ).count() if total_models == 0: growth_rate = 0.0 elif new_models_this_week == 0: growth_rate = 0.0 else: last_week_total = total_models - new_models_this_week if last_week_total == 0: growth_rate = 100.0 else: growth_rate = round((new_models_this_week / last_week_total) * 100, 2) return total_models, total_llm, total_embedding, growth_rate @staticmethod def get_workspace_statistics(db: Session, tenant_id: UUID, week_start: datetime) -> tuple[int, int, float]: """获取工作空间统计数据""" active_workspaces = db.query(Workspace).filter( Workspace.tenant_id == tenant_id, Workspace.is_active.is_(True) ).count() new_workspaces_this_week = db.query(Workspace).filter( Workspace.tenant_id == tenant_id, Workspace.is_active.is_(True), Workspace.created_at >= week_start ).count() if active_workspaces == 0: growth_rate = 0.0 elif new_workspaces_this_week == 0: growth_rate = 0.0 else: last_week_workspaces = active_workspaces - new_workspaces_this_week if last_week_workspaces == 0: growth_rate = 100.0 else: growth_rate = round((new_workspaces_this_week / last_week_workspaces) * 100, 2) return active_workspaces, new_workspaces_this_week, growth_rate @staticmethod def get_user_statistics(db: Session, tenant_id: UUID, week_start: datetime) -> tuple[int, int, float]: """获取用户统计数据""" workspace_ids = db.query(Workspace.id).filter( Workspace.tenant_id == tenant_id, Workspace.is_active.is_(True) ).subquery() total_users = db.query(EndUser).join( App, EndUser.app_id == App.id ).filter( App.workspace_id.in_(workspace_ids), App.is_active.is_(True), App.status == "active" ).count() new_users_this_week = db.query(EndUser).join( App, EndUser.app_id == App.id ).filter( App.workspace_id.in_(workspace_ids), App.is_active.is_(True), App.status == "active", EndUser.created_at >= week_start ).count() if total_users == 0: growth_rate = 0.0 elif new_users_this_week == 0: growth_rate = 0.0 else: last_week_users = total_users - new_users_this_week if last_week_users == 0: growth_rate = 100.0 else: growth_rate = round((new_users_this_week / last_week_users) * 100, 2) return total_users, new_users_this_week, growth_rate @staticmethod def get_app_statistics(db: Session, tenant_id: UUID, week_start: datetime) -> tuple[int, int, float]: """获取应用统计数据""" workspace_ids = db.query(Workspace.id).filter( Workspace.tenant_id == tenant_id, Workspace.is_active.is_(True) ).subquery() running_apps = db.query(App).filter( App.workspace_id.in_(workspace_ids), App.is_active.is_(True), App.status == "active" ).count() new_apps_this_week = db.query(App).filter( App.workspace_id.in_(workspace_ids), App.is_active.is_(True), App.status == "active", App.created_at >= week_start ).count() if running_apps == 0: growth_rate = 0.0 elif new_apps_this_week == 0: growth_rate = 0.0 else: last_week_apps = running_apps - new_apps_this_week if last_week_apps == 0: growth_rate = 100.0 else: growth_rate = round((new_apps_this_week / last_week_apps) * 100, 2) return running_apps, new_apps_this_week, growth_rate @staticmethod def get_workspaces_with_counts(db: Session, tenant_id: UUID) -> tuple[list[Workspace], Dict[UUID, int], Dict[UUID, int]]: """批量获取工作空间及其统计数据""" # 获取工作空间列表 workspaces = db.query(Workspace).filter( Workspace.tenant_id == tenant_id, Workspace.is_active.is_(True) ).all() workspace_ids = [ws.id for ws in workspaces] # 批量获取应用数量 app_counts = db.query( App.workspace_id, func.count(App.id).label('count') ).filter( App.workspace_id.in_(workspace_ids), App.is_active.is_(True), App.status == "active" ).group_by(App.workspace_id).all() app_count_dict = {workspace_id: count for workspace_id, count in app_counts} # 批量获取用户数量 user_counts = db.query( App.workspace_id, func.count(EndUser.id).label('count') ).join( EndUser, EndUser.app_id == App.id ).filter( App.workspace_id.in_(workspace_ids), App.is_active.is_(True), App.status == "active" ).group_by(App.workspace_id).all() user_count_dict = {workspace_id: count for workspace_id, count in user_counts} return workspaces, app_count_dict, user_count_dict