From 1faa258e233eb5712526ac1abe9086ac5a165035 Mon Sep 17 00:00:00 2001 From: wwq Date: Wed, 15 Apr 2026 18:48:09 +0800 Subject: [PATCH] feat(quota): implement unified quota management system and add community free plan - Add `default_free_plan.py` to define the configuration for the Community Free Plan. - Refactor `quota_stub.py` as a unified entry point, delegating checks to `core/quota_manager`. - Implement core logic in `quota_manager.py` to support retrieving quotas from the premium module or configuration files. - Update `tenant_subscription_controller` to return Community Free Plan information. --- api/app/config/default_free_plan.py | 30 ++ .../tenant_subscription_controller.py | 33 +- api/app/core/quota_manager.py | 473 ++++++++++++++++++ api/app/core/quota_stub.py | 74 ++- 4 files changed, 567 insertions(+), 43 deletions(-) create mode 100644 api/app/config/default_free_plan.py create mode 100644 api/app/core/quota_manager.py diff --git a/api/app/config/default_free_plan.py b/api/app/config/default_free_plan.py new file mode 100644 index 00000000..23a3a10e --- /dev/null +++ b/api/app/config/default_free_plan.py @@ -0,0 +1,30 @@ +""" +社区版默认免费套餐配置 +当无法从 SaaS 版获取 premium 模块时,使用此配置作为兜底 +""" + +DEFAULT_FREE_PLAN = { + "name": "记忆体验版", + "category": "saas_personal", + "tier_level": 0, + "version": "1.0", + "status": True, + "price": 0, + "billing_cycle": "permanent_free", + "core_value": "感受永久记忆", + "tech_support": "社群交流", + "sla_compliance": "无", + "page_customization": "无", + "theme_color": "#64748B", + "quotas": { + "workspace_quota": 1, + "skill_quota": 5, + "app_quota": 2, + "knowledge_capacity_quota": 0.3, + "memory_engine_quota": 1, + "end_user_quota": 1, + "ontology_project_quota": 3, + "model_quota": 1, + "api_ops_rate_limit": 50, + }, +} diff --git a/api/app/controllers/tenant_subscription_controller.py b/api/app/controllers/tenant_subscription_controller.py index 2629f7f1..c3fde572 100644 --- a/api/app/controllers/tenant_subscription_controller.py +++ b/api/app/controllers/tenant_subscription_controller.py @@ -1,6 +1,7 @@ """ 租户套餐查询接口(普通用户可访问) """ +import datetime from typing import Callable from fastapi import APIRouter, Depends @@ -46,8 +47,36 @@ async def get_my_tenant_subscription( return success(data=svc.build_response(sub)) except ModuleNotFoundError: - # 社区版无 premium 模块,返回空 - return success(data=None, msg="套餐功能未启用") + # 社区版无 premium 模块,从配置文件读取免费套餐 + if not current_user.tenant: + return JSONResponse(status_code=404, content=fail(code=404, msg="用户未关联租户")) + + from app.config.default_free_plan import DEFAULT_FREE_PLAN + + plan = DEFAULT_FREE_PLAN + response_data = { + "subscription_id": None, + "tenant_id": str(current_user.tenant.id), + "package_plan_id": None, + "package_version": plan["version"], + "package_plan": { + "id": None, + "name": plan["name"], + "version": plan["version"], + "category": plan["category"], + "tier_level": plan["tier_level"], + "price": float(plan["price"]), + "billing_cycle": plan["billing_cycle"], + }, + "started_at": None, + "expired_at": None, + "status": "active", + "quota": plan["quotas"], + "created_at": int(datetime.datetime.utcnow().timestamp() * 1000), + "updated_at": int(datetime.datetime.utcnow().timestamp() * 1000), + } + return success(data=response_data, msg="社区版免费套餐") + except Exception as e: logger.error(f"获取租户套餐信息失败: {e}", exc_info=True) return JSONResponse(status_code=500, content=fail(code=500, msg="获取套餐信息失败")) diff --git a/api/app/core/quota_manager.py b/api/app/core/quota_manager.py new file mode 100644 index 00000000..6c02ac7a --- /dev/null +++ b/api/app/core/quota_manager.py @@ -0,0 +1,473 @@ +""" +统一配额管理器 - 社区版和 SaaS 版共用 + +配额来源策略: +1. 优先从 premium 模块的 tenant_subscriptions 表读取(SaaS 版) +2. 降级到 default_free_plan.py 配置文件(社区版兜底) +""" +import asyncio +import time +from functools import wraps +from typing import Optional, Callable, Dict, Any +from uuid import UUID + +from sqlalchemy import func +from sqlalchemy.orm import Session + +from app.core.logging_config import get_auth_logger +from app.i18n.exceptions import QuotaExceededError + +logger = get_auth_logger() + + +def _get_user_from_kwargs(kwargs: dict): + """从 kwargs 中获取 user 对象""" + for key in ["user", "current_user"]: + if key in kwargs: + return kwargs[key] + return None + + +def _get_tenant_id_from_kwargs(db: Session, kwargs: dict): + """从 kwargs 中获取 tenant_id""" + user = _get_user_from_kwargs(kwargs) + if user and hasattr(user, 'tenant_id'): + return user.tenant_id + + workspace_id = kwargs.get("workspace_id") + if workspace_id: + from app.models.workspace_model import Workspace + workspace = db.query(Workspace).filter(Workspace.id == workspace_id).first() + if workspace: + return workspace.tenant_id + + api_key_auth = kwargs.get("api_key_auth") + if api_key_auth and hasattr(api_key_auth, 'workspace_id'): + from app.models.workspace_model import Workspace + workspace = db.query(Workspace).filter(Workspace.id == api_key_auth.workspace_id).first() + if workspace: + return workspace.tenant_id + + data = kwargs.get("data") or kwargs.get("body") or kwargs.get("payload") + if data and hasattr(data, "workspace_id"): + from app.models.workspace_model import Workspace + workspace = db.query(Workspace).filter(Workspace.id == data.workspace_id).first() + if workspace: + return workspace.tenant_id + + return None + + +def _get_quota_config(db: Session, tenant_id: UUID) -> Optional[Dict[str, Any]]: + """ + 获取租户的配额配置 + + 优先级: + 1. premium 模块的 tenant_subscriptions(SaaS 版) + 2. default_free_plan.py 配置文件(社区版兜底) + """ + # 尝试从 premium 模块获取 + try: + from premium.platform_admin.package_plan_service import TenantSubscriptionService + quota_config = TenantSubscriptionService(db).get_effective_quota(tenant_id) + if quota_config: + logger.debug(f"从 premium 模块获取租户 {tenant_id} 配额配置") + return quota_config + except (ModuleNotFoundError, ImportError, Exception) as e: + logger.debug(f"无法从 premium 模块获取配额配置: {e}") + + # 降级到配置文件 + try: + from app.config.default_free_plan import DEFAULT_FREE_PLAN + logger.info(f"使用配置文件中的免费套餐配额: tenant={tenant_id}") + return DEFAULT_FREE_PLAN.get("quotas") + except Exception as e: + logger.error(f"无法从配置文件获取配额: {e}") + return None + + +class QuotaUsageRepository: + """配额使用量数据访问层""" + + def __init__(self, db: Session): + self.db = db + + def count_workspaces(self, tenant_id: UUID) -> int: + from app.models.workspace_model import Workspace + return self.db.query(Workspace).filter( + Workspace.tenant_id == tenant_id, + Workspace.is_active.is_(True) + ).count() + + def count_apps(self, tenant_id: UUID) -> int: + from app.models.app_model import App + from app.models.workspace_model import Workspace + return self.db.query(App).join( + Workspace, App.workspace_id == Workspace.id + ).filter( + Workspace.tenant_id == tenant_id, + App.is_active.is_(True) + ).count() + + def count_skills(self, tenant_id: UUID) -> int: + from app.models.skill_model import Skill + return self.db.query(Skill).filter( + Skill.tenant_id == tenant_id, + Skill.is_active.is_(True) + ).count() + + def sum_knowledge_capacity_gb(self, tenant_id: UUID) -> float: + from app.models.document_model import Document + from app.models.knowledge_model import Knowledge + from app.models.workspace_model import Workspace + result = self.db.query(func.coalesce(func.sum(Document.file_size), 0)).join( + Knowledge, Document.kb_id == Knowledge.id + ).join( + Workspace, Knowledge.workspace_id == Workspace.id + ).filter( + Workspace.tenant_id == tenant_id, + Document.status == 1, + ).scalar() + return float(result) / (1024 ** 3) if result else 0.0 + + def count_memory_engines(self, tenant_id: UUID) -> int: + from app.models.memory_config_model import MemoryConfig + from app.models.workspace_model import Workspace + return self.db.query(MemoryConfig).join( + Workspace, MemoryConfig.workspace_id == Workspace.id + ).filter( + Workspace.tenant_id == tenant_id + ).count() + + def count_end_users(self, tenant_id: UUID) -> int: + from app.models.end_user_model import EndUser + from app.models.workspace_model import Workspace + return self.db.query(EndUser).join( + Workspace, EndUser.workspace_id == Workspace.id + ).filter( + Workspace.tenant_id == tenant_id + ).count() + + def count_models(self, tenant_id: UUID) -> int: + from app.models.models_model import ModelConfig + return self.db.query(ModelConfig).filter( + ModelConfig.tenant_id == tenant_id, + ModelConfig.is_active == True + ).count() + + def count_ontology_projects(self, tenant_id: UUID) -> int: + from app.models.ontology_scene import OntologyScene + from app.models.workspace_model import Workspace + return self.db.query(OntologyScene).join( + Workspace, OntologyScene.workspace_id == Workspace.id + ).filter( + Workspace.tenant_id == tenant_id + ).count() + + def get_usage_by_quota_type(self, tenant_id: UUID, quota_type: str): + """按配额类型分发,返回当前使用量""" + dispatch = { + "workspace_quota": self.count_workspaces, + "app_quota": self.count_apps, + "skill_quota": self.count_skills, + "knowledge_capacity_quota": self.sum_knowledge_capacity_gb, + "memory_engine_quota": self.count_memory_engines, + "end_user_quota": self.count_end_users, + "model_quota": self.count_models, + "ontology_project_quota": self.count_ontology_projects, + } + fn = dispatch.get(quota_type) + return fn(tenant_id) if fn else 0 + + +def _check_quota( + db: Session, + tenant_id: UUID, + quota_type: str, + resource_name: str, + usage_func: Optional[Callable] = None, +) -> None: + """核心配额检查逻辑:对比使用量和配额限制""" + try: + quota_config = _get_quota_config(db, tenant_id) + if not quota_config: + logger.warning(f"租户 {tenant_id} 无有效配额配置,跳过配额检查") + return + + quota_limit = quota_config.get(quota_type) + if quota_limit is None: + logger.warning(f"配额配置未包含 {quota_type},跳过配额检查") + return + + if usage_func: + current_usage = usage_func(db, tenant_id) + else: + current_usage = QuotaUsageRepository(db).get_usage_by_quota_type(tenant_id, quota_type) + + if current_usage >= quota_limit: + logger.warning( + f"配额不足: tenant={tenant_id}, type={quota_type}, " + f"usage={current_usage}, limit={quota_limit}" + ) + raise QuotaExceededError( + resource=resource_name, + current_usage=current_usage, + quota_limit=quota_limit, + ) + + logger.debug( + f"配额检查通过: tenant={tenant_id}, type={quota_type}, " + f"usage={current_usage}, limit={quota_limit}" + ) + + except QuotaExceededError: + raise + except Exception as e: + logger.error( + f"配额检查异常: tenant={tenant_id}, type={quota_type}, " + f"error_type={type(e).__name__}, error={str(e)}", + exc_info=True, + ) + raise + + +# ─── 具名装饰器 ──────────────────────────────────────────────────────────── + +def check_workspace_quota(func: Callable) -> Callable: + @wraps(func) + def wrapper(*args, **kwargs): + db: Session = kwargs.get("db") + user = _get_user_from_kwargs(kwargs) + if not db or not user: + logger.warning("配额检查失败:缺少 db 或 user 参数") + return func(*args, **kwargs) + _check_quota(db, user.tenant_id, "workspace_quota", "workspace") + return func(*args, **kwargs) + return wrapper + + +def check_skill_quota(func: Callable) -> Callable: + @wraps(func) + def wrapper(*args, **kwargs): + db: Session = kwargs.get("db") + user = _get_user_from_kwargs(kwargs) + if not db or not user: + logger.warning("配额检查失败:缺少 db 或 user 参数") + return func(*args, **kwargs) + _check_quota(db, user.tenant_id, "skill_quota", "skill") + return func(*args, **kwargs) + return wrapper + + +def check_app_quota(func: Callable) -> Callable: + @wraps(func) + def wrapper(*args, **kwargs): + db: Session = kwargs.get("db") + user = _get_user_from_kwargs(kwargs) + if not db or not user: + logger.warning("配额检查失败:缺少 db 或 user 参数") + return func(*args, **kwargs) + _check_quota(db, user.tenant_id, "app_quota", "app") + return func(*args, **kwargs) + return wrapper + + +def check_knowledge_capacity_quota(func: Callable) -> Callable: + @wraps(func) + async def async_wrapper(*args, **kwargs): + db: Session = kwargs.get("db") + if not db: + logger.warning("配额检查失败:缺少 db 参数") + return await func(*args, **kwargs) + tenant_id = _get_tenant_id_from_kwargs(db, kwargs) + if not tenant_id: + logger.warning("配额检查失败:无法获取 tenant_id") + return await func(*args, **kwargs) + _check_quota(db, tenant_id, "knowledge_capacity_quota", "knowledge_capacity") + return await func(*args, **kwargs) + + @wraps(func) + def sync_wrapper(*args, **kwargs): + db: Session = kwargs.get("db") + user = _get_user_from_kwargs(kwargs) + if not db or not user: + logger.warning("配额检查失败:缺少 db 或 user 参数") + return func(*args, **kwargs) + _check_quota(db, user.tenant_id, "knowledge_capacity_quota", "knowledge_capacity") + return func(*args, **kwargs) + + return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper + + +def check_memory_engine_quota(func: Callable) -> Callable: + @wraps(func) + def wrapper(*args, **kwargs): + db: Session = kwargs.get("db") + user = _get_user_from_kwargs(kwargs) + if not db or not user: + logger.warning("配额检查失败:缺少 db 或 user 参数") + return func(*args, **kwargs) + _check_quota(db, user.tenant_id, "memory_engine_quota", "memory_engine") + return func(*args, **kwargs) + return wrapper + + +def check_end_user_quota(func: Callable) -> Callable: + @wraps(func) + async def async_wrapper(*args, **kwargs): + db: Session = kwargs.get("db") + if not db: + logger.warning("配额检查失败:缺少 db 参数") + return await func(*args, **kwargs) + tenant_id = _get_tenant_id_from_kwargs(db, kwargs) + if not tenant_id: + logger.warning("配额检查失败:无法获取 tenant_id") + return await func(*args, **kwargs) + _check_quota(db, tenant_id, "end_user_quota", "end_user") + return await func(*args, **kwargs) + + @wraps(func) + def sync_wrapper(*args, **kwargs): + db: Session = kwargs.get("db") + if not db: + logger.warning("配额检查失败:缺少 db 参数") + return func(*args, **kwargs) + tenant_id = _get_tenant_id_from_kwargs(db, kwargs) + if not tenant_id: + logger.warning("配额检查失败:无法获取 tenant_id") + return func(*args, **kwargs) + _check_quota(db, tenant_id, "end_user_quota", "end_user") + return func(*args, **kwargs) + + return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper + + +def check_ontology_project_quota(func: Callable) -> Callable: + @wraps(func) + def wrapper(*args, **kwargs): + db: Session = kwargs.get("db") + user = _get_user_from_kwargs(kwargs) + if not db or not user: + logger.warning("配额检查失败:缺少 db 或 user 参数") + return func(*args, **kwargs) + _check_quota(db, user.tenant_id, "ontology_project_quota", "ontology_project") + return func(*args, **kwargs) + return wrapper + + +def check_model_quota(func: Callable) -> Callable: + @wraps(func) + def wrapper(*args, **kwargs): + db: Session = kwargs.get("db") + user = _get_user_from_kwargs(kwargs) + if not db or not user: + logger.warning("配额检查失败:缺少 db 或 user 参数") + return func(*args, **kwargs) + _check_quota(db, user.tenant_id, "model_quota", "model") + return func(*args, **kwargs) + return wrapper + + +def check_model_activation_quota(func: Callable) -> Callable: + """模型激活时的配额检查装饰器""" + @wraps(func) + def wrapper(*args, **kwargs): + db: Session = kwargs.get("db") + user = _get_user_from_kwargs(kwargs) + if not db or not user: + logger.warning("配额检查失败:缺少 db 或 user 参数") + return func(*args, **kwargs) + + model_id = kwargs.get("model_id") or (args[1] if len(args) > 1 else None) + model_data = kwargs.get("model_data") + + if not model_id or not model_data: + logger.warning("模型激活配额检查失败:缺少 model_id 或 model_data 参数") + return func(*args, **kwargs) + + if model_data.is_active is True: + try: + from app.models.models_model import ModelConfig + from app.services.model_service import ModelConfigService + + existing_model = ModelConfigService.get_model_by_id( + db=db, + model_id=model_id, + tenant_id=user.tenant_id + ) + + if not existing_model.is_active: + logger.info(f"模型激活操作,检查配额: model_id={model_id}, tenant_id={user.tenant_id}") + _check_quota(db, user.tenant_id, "model_quota", "model") + except Exception as e: + logger.error(f"模型激活配额检查异常: model_id={model_id}, error={str(e)}") + raise + + return func(*args, **kwargs) + return wrapper + + +def check_quota(quota_type: str, resource_name: str, usage_func: Optional[Callable] = None): + """通用配额检查装饰器,支持自定义使用量获取函数""" + def decorator(func: Callable) -> Callable: + @wraps(func) + def wrapper(*args, **kwargs): + db: Session = kwargs.get("db") + user = _get_user_from_kwargs(kwargs) + if not db or not user: + logger.warning("配额检查失败:缺少 db 或 user 参数") + return func(*args, **kwargs) + _check_quota(db, user.tenant_id, quota_type, resource_name, usage_func) + return func(*args, **kwargs) + return wrapper + return decorator + + +# ─── 配额使用统计 ──────────────────────────────────────────────────────────── + +def get_quota_usage(db: Session, tenant_id: UUID) -> dict: + """获取租户所有配额的使用情况""" + quota_config = _get_quota_config(db, tenant_id) + if not quota_config: + return {} + + repo = QuotaUsageRepository(db) + + def pct(used, limit): + return round(used / limit * 100, 1) if limit else None + + workspace_count = repo.count_workspaces(tenant_id) + skill_count = repo.count_skills(tenant_id) + app_count = repo.count_apps(tenant_id) + knowledge_gb = repo.sum_knowledge_capacity_gb(tenant_id) + memory_count = repo.count_memory_engines(tenant_id) + end_user_count = repo.count_end_users(tenant_id) + model_count = repo.count_models(tenant_id) + ontology_count = repo.count_ontology_projects(tenant_id) + + api_ops_current = 0 + try: + from app.core.config import settings + import redis + _now = time.time() + _rk = f"rate_limit:tenant_qps:{tenant_id}" + _r = redis.StrictRedis( + host=settings.REDIS_HOST, port=settings.REDIS_PORT, + db=settings.REDIS_DB, password=settings.REDIS_PASSWORD, + decode_responses=True + ) + api_ops_current = int(_r.zcount(_rk, _now - 1, "+inf")) + except Exception: + pass + + return { + "workspace": {"used": workspace_count, "limit": quota_config.get("workspace_quota"), "percentage": pct(workspace_count, quota_config.get("workspace_quota"))}, + "skill": {"used": skill_count, "limit": quota_config.get("skill_quota"), "percentage": pct(skill_count, quota_config.get("skill_quota"))}, + "app": {"used": app_count, "limit": quota_config.get("app_quota"), "percentage": pct(app_count, quota_config.get("app_quota"))}, + "knowledge_capacity": {"used": round(knowledge_gb, 2), "limit": quota_config.get("knowledge_capacity_quota"), "percentage": pct(knowledge_gb, quota_config.get("knowledge_capacity_quota")), "unit": "GB"}, + "memory_engine": {"used": memory_count, "limit": quota_config.get("memory_engine_quota"), "percentage": pct(memory_count, quota_config.get("memory_engine_quota"))}, + "end_user": {"used": end_user_count, "limit": quota_config.get("end_user_quota"), "percentage": pct(end_user_count, quota_config.get("end_user_quota"))}, + "ontology_project": {"used": ontology_count, "limit": quota_config.get("ontology_project_quota"), "percentage": pct(ontology_count, quota_config.get("ontology_project_quota"))}, + "model": {"used": model_count, "limit": quota_config.get("model_quota"), "percentage": pct(model_count, quota_config.get("model_quota"))}, + "api_ops_rate_limit": {"current": api_ops_current, "limit": quota_config.get("api_ops_rate_limit"), "percentage": None, "unit": "次/秒"}, + } diff --git a/api/app/core/quota_stub.py b/api/app/core/quota_stub.py index b8f82e75..577dfadb 100644 --- a/api/app/core/quota_stub.py +++ b/api/app/core/quota_stub.py @@ -1,44 +1,36 @@ """ -配额检查 stub - 社区版使用,所有检查直接放行。 -企业版通过 premium.platform_admin.quota_decorator 提供真实实现。 +配额检查 stub - 社区版和 SaaS 版统一使用 core.quota_manager 实现 + +所有配额检查逻辑统一在 core 层实现,两个版本共用: +- 社区版:从 default_free_plan.py 读取配额限制 +- SaaS 版:优先从 tenant_subscriptions 表读取,降级到配置文件 """ -from functools import wraps -from typing import Callable +from app.core.quota_manager import ( + check_workspace_quota, + check_skill_quota, + check_app_quota, + check_knowledge_capacity_quota, + check_memory_engine_quota, + check_end_user_quota, + check_ontology_project_quota, + check_model_quota, + check_model_activation_quota, + get_quota_usage, + _check_quota, + QuotaUsageRepository, +) - -def _noop_decorator(func: Callable) -> Callable: - """空装饰器,直接放行""" - return func - - -def _noop_check(*args, **kwargs): - """空检查函数,直接放行""" - pass - - -try: - from premium.platform_admin.quota_decorator import ( - check_workspace_quota, - check_skill_quota, - check_app_quota, - check_knowledge_capacity_quota, - check_memory_engine_quota, - check_end_user_quota, - check_ontology_project_quota, - check_model_quota, - check_model_activation_quota, - get_quota_usage, - _check_quota, - ) -except ModuleNotFoundError: - check_workspace_quota = _noop_decorator - check_skill_quota = _noop_decorator - check_app_quota = _noop_decorator - check_knowledge_capacity_quota = _noop_decorator - check_memory_engine_quota = _noop_decorator - check_end_user_quota = _noop_decorator - check_ontology_project_quota = _noop_decorator - check_model_quota = _noop_decorator - check_model_activation_quota = _noop_decorator - get_quota_usage = lambda db, tenant_id: {} - _check_quota = _noop_check +__all__ = [ + "check_workspace_quota", + "check_skill_quota", + "check_app_quota", + "check_knowledge_capacity_quota", + "check_memory_engine_quota", + "check_end_user_quota", + "check_ontology_project_quota", + "check_model_quota", + "check_model_activation_quota", + "get_quota_usage", + "_check_quota", + "QuotaUsageRepository", +]