From 48f3d9b10514fc349ed84b1aa147ecf28d2bae1e Mon Sep 17 00:00:00 2001 From: wwq Date: Mon, 20 Apr 2026 15:10:12 +0800 Subject: [PATCH] feat(quota): refactor quota management and rate limiting services - Add `API_KEY_RATE_LIMIT_EXCEEDED` error code. - Refactor `QuotaExceededError` to support resource type localization. - Optimize rate limiting service by implementing the sliding window algorithm. - Add rate limit validation for tenant plans. - Unify quota check decorator to support both synchronous and asynchronous operations. - Enhance quota usage statistics endpoints. --- api/app/core/api_key_auth.py | 36 +--- api/app/core/error_codes.py | 1 + api/app/core/quota_manager.py | 279 ++++++++++++++++++++-------- api/app/core/quota_stub.py | 2 + api/app/i18n/exceptions.py | 31 +++- api/app/locales/en/errors.json | 16 +- api/app/locales/zh/errors.json | 16 +- api/app/services/api_key_service.py | 193 +++++++++++++++---- 8 files changed, 421 insertions(+), 153 deletions(-) diff --git a/api/app/core/api_key_auth.py b/api/app/core/api_key_auth.py index 91d6bd8a..297e5082 100644 --- a/api/app/core/api_key_auth.py +++ b/api/app/core/api_key_auth.py @@ -96,40 +96,8 @@ def require_api_key( resource_id=api_key_obj.resource_id, ) - # ── Tenant 级别限速(来自套餐配额 api_ops_rate_limit)────────── - try: - from app.models.workspace_model import Workspace - from premium.platform_admin.package_plan_service import TenantSubscriptionService - - workspace = db.query(Workspace).filter( - Workspace.id == api_key_obj.workspace_id - ).first() - if workspace: - quota = TenantSubscriptionService(db).get_effective_quota(workspace.tenant_id) - tenant_qps_limit = quota.get("api_ops_rate_limit") if quota else None - if tenant_qps_limit: - rate_limiter = RateLimiterService() - tenant_ok, tenant_info = await rate_limiter.check_tenant_rate_limit( - workspace.tenant_id, tenant_qps_limit - ) - if not tenant_ok: - raise RateLimitException( - "租户 API 调用速率超限", - BizCode.API_KEY_QPS_LIMIT_EXCEEDED, - rate_headers={ - "X-RateLimit-Tenant-Limit": str(tenant_info["limit"]), - "X-RateLimit-Tenant-Remaining": str(tenant_info["remaining"]), - "X-RateLimit-Tenant-Reset": str(tenant_info["reset"]), - } - ) - except RateLimitException: - raise - except Exception as e: - logger.warning(f"Tenant 限速检查异常,跳过: {e}") - # ───────────────────────────────────────────────────────────── - rate_limiter = RateLimiterService() - is_allowed, error_msg, rate_headers = await rate_limiter.check_all_limits(api_key_obj) + is_allowed, error_msg, rate_headers = await rate_limiter.check_all_limits(api_key_obj, db=db) if not is_allowed: logger.warning("API Key 限流触发", extra={ "api_key_id": str(api_key_obj.id), @@ -142,6 +110,8 @@ def require_api_key( code = BizCode.API_KEY_QPS_LIMIT_EXCEEDED elif "Daily" in error_msg: code = BizCode.API_KEY_DAILY_LIMIT_EXCEEDED + elif "Tenant" in error_msg: + code = BizCode.API_KEY_QPS_LIMIT_EXCEEDED # 租户套餐速率超限,同属 QPS 类 else: code = BizCode.API_KEY_QUOTA_EXCEEDED diff --git a/api/app/core/error_codes.py b/api/app/core/error_codes.py index 01b6115d..a4a20cbb 100644 --- a/api/app/core/error_codes.py +++ b/api/app/core/error_codes.py @@ -31,6 +31,7 @@ class BizCode(IntEnum): API_KEY_QPS_LIMIT_EXCEEDED = 3014 API_KEY_DAILY_LIMIT_EXCEEDED = 3015 API_KEY_QUOTA_EXCEEDED = 3016 + API_KEY_RATE_LIMIT_EXCEEDED = 3017 # 资源(4xxx) NOT_FOUND = 4000 USER_NOT_FOUND = 4001 diff --git a/api/app/core/quota_manager.py b/api/app/core/quota_manager.py index 0e0053a0..43d8fa42 100644 --- a/api/app/core/quota_manager.py +++ b/api/app/core/quota_manager.py @@ -15,10 +15,13 @@ from sqlalchemy import func from sqlalchemy.orm import Session from app.core.logging_config import get_auth_logger -from app.i18n.exceptions import QuotaExceededError +from app.i18n.exceptions import QuotaExceededError, InternalServerError logger = get_auth_logger() +# Redis key 格式常量,与 RateLimiterService.check_tenant_rate_limit 保持一致 +TENANT_QPS_REDIS_KEY = "rate_limit:tenant_qps:{tenant_id}" + def _get_user_from_kwargs(kwargs: dict): """从 kwargs 中获取 user 对象""" @@ -73,31 +76,52 @@ def _get_tenant_id_from_kwargs(db: Session, kwargs: dict): def _get_quota_config(db: Session, tenant_id: UUID) -> Optional[Dict[str, Any]]: """ 获取租户的配额配置 - + 优先级: 1. premium 模块的 tenant_subscriptions(SaaS 版) 2. default_free_plan.py 配置文件(社区版兜底) """ - # 尝试从 premium 模块获取 + # 尝试从 premium 模块获取(SaaS 版) try: from premium.platform_admin.package_plan_service import TenantSubscriptionService + # premium 模块存在,运行时错误不应被静默降级,直接抛出 quota_config = TenantSubscriptionService(db).get_effective_quota(tenant_id) if quota_config: logger.debug(f"从 premium 模块获取租户 {tenant_id} 配额配置") return quota_config - except (ModuleNotFoundError, ImportError, Exception) as e: - logger.debug(f"无法从 premium 模块获取配额配置: {e}") + # premium 存在但该租户无订阅记录,降级到免费套餐 + logger.debug(f"租户 {tenant_id} 无 premium 订阅,降级到免费套餐") + except (ModuleNotFoundError, ImportError): + # 社区版:premium 包不存在,正常降级 + logger.debug("premium 模块不存在,使用社区版免费套餐配额") - # 降级到配置文件 + # 降级到社区版配置文件 try: from app.config.default_free_plan import DEFAULT_FREE_PLAN - logger.info(f"使用配置文件中的免费套餐配额: tenant={tenant_id}") + logger.debug(f"使用社区版免费套餐配额: tenant={tenant_id}") return DEFAULT_FREE_PLAN.get("quotas") except Exception as e: logger.error(f"无法从配置文件获取配额: {e}") return None +def get_api_ops_rate_limit(db: Session, tenant_id: UUID) -> Optional[int]: + """ + 获取租户套餐的 API 操作速率限制(QPS 上限) + + 该函数兼容社区版和 SaaS 版: + - SaaS 版:从 premium 模块的套餐配额读取 + - 社区版:从 default_free_plan.py 配置文件读取 + + Returns: + int: api_ops_rate_limit 值,如果未配置则返回 None + """ + quota_config = _get_quota_config(db, tenant_id) + if quota_config: + return quota_config.get("api_ops_rate_limit") + return None + + class QuotaUsageRepository: """配额使用量数据访问层""" @@ -247,41 +271,74 @@ def _check_quota( def check_workspace_quota(func: Callable) -> Callable: @wraps(func) - def wrapper(*args, **kwargs): + async def async_wrapper(*args, **kwargs): db: Session = kwargs.get("db") user = _get_user_from_kwargs(kwargs) if not db or not user: - logger.warning("配额检查失败:缺少 db 或 user 参数") - return func(*args, **kwargs) + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() + _check_quota(db, user.tenant_id, "workspace_quota", "workspace") + return await func(*args, **kwargs) + + @wraps(func) + def sync_wrapper(*args, **kwargs): + db: Session = kwargs.get("db") + user = _get_user_from_kwargs(kwargs) + if not db or not user: + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() _check_quota(db, user.tenant_id, "workspace_quota", "workspace") return func(*args, **kwargs) - return wrapper + + return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper def check_skill_quota(func: Callable) -> Callable: @wraps(func) - def wrapper(*args, **kwargs): + async def async_wrapper(*args, **kwargs): db: Session = kwargs.get("db") user = _get_user_from_kwargs(kwargs) if not db or not user: - logger.warning("配额检查失败:缺少 db 或 user 参数") - return func(*args, **kwargs) + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() + _check_quota(db, user.tenant_id, "skill_quota", "skill") + return await func(*args, **kwargs) + + @wraps(func) + def sync_wrapper(*args, **kwargs): + db: Session = kwargs.get("db") + user = _get_user_from_kwargs(kwargs) + if not db or not user: + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() _check_quota(db, user.tenant_id, "skill_quota", "skill") return func(*args, **kwargs) - return wrapper + + return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper def check_app_quota(func: Callable) -> Callable: @wraps(func) - def wrapper(*args, **kwargs): + async def async_wrapper(*args, **kwargs): db: Session = kwargs.get("db") user = _get_user_from_kwargs(kwargs) if not db or not user: - logger.warning("配额检查失败:缺少 db 或 user 参数") - return func(*args, **kwargs) + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() + _check_quota(db, user.tenant_id, "app_quota", "app") + return await func(*args, **kwargs) + + @wraps(func) + def sync_wrapper(*args, **kwargs): + db: Session = kwargs.get("db") + user = _get_user_from_kwargs(kwargs) + if not db or not user: + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() _check_quota(db, user.tenant_id, "app_quota", "app") return func(*args, **kwargs) - return wrapper + + return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper def check_knowledge_capacity_quota(func: Callable) -> Callable: @@ -289,12 +346,12 @@ def check_knowledge_capacity_quota(func: Callable) -> Callable: async def async_wrapper(*args, **kwargs): db: Session = kwargs.get("db") if not db: - logger.warning("配额检查失败:缺少 db 参数") - return await func(*args, **kwargs) + logger.error(f"配额检查失败:{func.__name__} 缺少 db 参数,拒绝请求") + raise InternalServerError() tenant_id = _get_tenant_id_from_kwargs(db, kwargs) if not tenant_id: - logger.warning("配额检查失败:无法获取 tenant_id") - return await func(*args, **kwargs) + logger.error(f"配额检查失败:{func.__name__} 无法获取 tenant_id,拒绝请求") + raise InternalServerError() _check_quota(db, tenant_id, "knowledge_capacity_quota", "knowledge_capacity") return await func(*args, **kwargs) @@ -303,8 +360,8 @@ def check_knowledge_capacity_quota(func: Callable) -> Callable: db: Session = kwargs.get("db") user = _get_user_from_kwargs(kwargs) if not db or not user: - logger.warning("配额检查失败:缺少 db 或 user 参数") - return func(*args, **kwargs) + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() _check_quota(db, user.tenant_id, "knowledge_capacity_quota", "knowledge_capacity") return func(*args, **kwargs) @@ -313,15 +370,26 @@ def check_knowledge_capacity_quota(func: Callable) -> Callable: def check_memory_engine_quota(func: Callable) -> Callable: @wraps(func) - def wrapper(*args, **kwargs): + async def async_wrapper(*args, **kwargs): db: Session = kwargs.get("db") user = _get_user_from_kwargs(kwargs) if not db or not user: - logger.warning("配额检查失败:缺少 db 或 user 参数") - return func(*args, **kwargs) + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() + _check_quota(db, user.tenant_id, "memory_engine_quota", "memory_engine") + return await func(*args, **kwargs) + + @wraps(func) + def sync_wrapper(*args, **kwargs): + db: Session = kwargs.get("db") + user = _get_user_from_kwargs(kwargs) + if not db or not user: + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() _check_quota(db, user.tenant_id, "memory_engine_quota", "memory_engine") return func(*args, **kwargs) - return wrapper + + return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper def check_end_user_quota(func: Callable) -> Callable: @@ -329,12 +397,12 @@ def check_end_user_quota(func: Callable) -> Callable: async def async_wrapper(*args, **kwargs): db: Session = kwargs.get("db") if not db: - logger.warning("配额检查失败:缺少 db 参数") - return await func(*args, **kwargs) + logger.error(f"配额检查失败:{func.__name__} 缺少 db 参数,拒绝请求") + raise InternalServerError() tenant_id = _get_tenant_id_from_kwargs(db, kwargs) if not tenant_id: - logger.warning("配额检查失败:无法获取 tenant_id") - return await func(*args, **kwargs) + logger.error(f"配额检查失败:{func.__name__} 无法获取 tenant_id,拒绝请求") + raise InternalServerError() _check_quota(db, tenant_id, "end_user_quota", "end_user") return await func(*args, **kwargs) @@ -342,12 +410,12 @@ def check_end_user_quota(func: Callable) -> Callable: def sync_wrapper(*args, **kwargs): db: Session = kwargs.get("db") if not db: - logger.warning("配额检查失败:缺少 db 参数") - return func(*args, **kwargs) + logger.error(f"配额检查失败:{func.__name__} 缺少 db 参数,拒绝请求") + raise InternalServerError() tenant_id = _get_tenant_id_from_kwargs(db, kwargs) if not tenant_id: - logger.warning("配额检查失败:无法获取 tenant_id") - return func(*args, **kwargs) + logger.error(f"配额检查失败:{func.__name__} 无法获取 tenant_id,拒绝请求") + raise InternalServerError() _check_quota(db, tenant_id, "end_user_quota", "end_user") return func(*args, **kwargs) @@ -356,88 +424,155 @@ def check_end_user_quota(func: Callable) -> Callable: def check_ontology_project_quota(func: Callable) -> Callable: @wraps(func) - def wrapper(*args, **kwargs): + async def async_wrapper(*args, **kwargs): db: Session = kwargs.get("db") user = _get_user_from_kwargs(kwargs) if not db or not user: - logger.warning("配额检查失败:缺少 db 或 user 参数") - return func(*args, **kwargs) + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() + _check_quota(db, user.tenant_id, "ontology_project_quota", "ontology_project") + return await func(*args, **kwargs) + + @wraps(func) + def sync_wrapper(*args, **kwargs): + db: Session = kwargs.get("db") + user = _get_user_from_kwargs(kwargs) + if not db or not user: + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() _check_quota(db, user.tenant_id, "ontology_project_quota", "ontology_project") return func(*args, **kwargs) - return wrapper + + return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper def check_model_quota(func: Callable) -> Callable: @wraps(func) - def wrapper(*args, **kwargs): + async def async_wrapper(*args, **kwargs): db: Session = kwargs.get("db") user = _get_user_from_kwargs(kwargs) if not db or not user: - logger.warning("配额检查失败:缺少 db 或 user 参数") - return func(*args, **kwargs) + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() + _check_quota(db, user.tenant_id, "model_quota", "model") + return await func(*args, **kwargs) + + @wraps(func) + def sync_wrapper(*args, **kwargs): + db: Session = kwargs.get("db") + user = _get_user_from_kwargs(kwargs) + if not db or not user: + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() _check_quota(db, user.tenant_id, "model_quota", "model") return func(*args, **kwargs) - return wrapper + + return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper def check_model_activation_quota(func: Callable) -> Callable: """模型激活时的配额检查装饰器""" @wraps(func) - def wrapper(*args, **kwargs): + async def async_wrapper(*args, **kwargs): db: Session = kwargs.get("db") user = _get_user_from_kwargs(kwargs) if not db or not user: - logger.warning("配额检查失败:缺少 db 或 user 参数") - return func(*args, **kwargs) - + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() + model_id = kwargs.get("model_id") or (args[1] if len(args) > 1 else None) model_data = kwargs.get("model_data") - + if not model_id or not model_data: logger.warning("模型激活配额检查失败:缺少 model_id 或 model_data 参数") - return func(*args, **kwargs) - + return await func(*args, **kwargs) + if model_data.is_active is True: try: - from app.models.models_model import ModelConfig from app.services.model_service import ModelConfigService - + existing_model = ModelConfigService.get_model_by_id( - db=db, - model_id=model_id, + db=db, + model_id=model_id, tenant_id=user.tenant_id ) - + if not existing_model.is_active: logger.info(f"模型激活操作,检查配额: model_id={model_id}, tenant_id={user.tenant_id}") _check_quota(db, user.tenant_id, "model_quota", "model") except Exception as e: logger.error(f"模型激活配额检查异常: model_id={model_id}, error={str(e)}") raise - + + return await func(*args, **kwargs) + + @wraps(func) + def sync_wrapper(*args, **kwargs): + db: Session = kwargs.get("db") + user = _get_user_from_kwargs(kwargs) + if not db or not user: + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() + + model_id = kwargs.get("model_id") or (args[1] if len(args) > 1 else None) + model_data = kwargs.get("model_data") + + if not model_id or not model_data: + logger.warning("模型激活配额检查失败:缺少 model_id 或 model_data 参数") + return func(*args, **kwargs) + + if model_data.is_active is True: + try: + from app.services.model_service import ModelConfigService + + existing_model = ModelConfigService.get_model_by_id( + db=db, + model_id=model_id, + tenant_id=user.tenant_id + ) + + if not existing_model.is_active: + logger.info(f"模型激活操作,检查配额: model_id={model_id}, tenant_id={user.tenant_id}") + _check_quota(db, user.tenant_id, "model_quota", "model") + except Exception as e: + logger.error(f"模型激活配额检查异常: model_id={model_id}, error={str(e)}") + raise + return func(*args, **kwargs) - return wrapper + + return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper def check_quota(quota_type: str, resource_name: str, usage_func: Optional[Callable] = None): """通用配额检查装饰器,支持自定义使用量获取函数""" def decorator(func: Callable) -> Callable: @wraps(func) - def wrapper(*args, **kwargs): + async def async_wrapper(*args, **kwargs): db: Session = kwargs.get("db") user = _get_user_from_kwargs(kwargs) if not db or not user: - logger.warning("配额检查失败:缺少 db 或 user 参数") - return func(*args, **kwargs) + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() + _check_quota(db, user.tenant_id, quota_type, resource_name, usage_func) + return await func(*args, **kwargs) + + @wraps(func) + def sync_wrapper(*args, **kwargs): + db: Session = kwargs.get("db") + user = _get_user_from_kwargs(kwargs) + if not db or not user: + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() _check_quota(db, user.tenant_id, quota_type, resource_name, usage_func) return func(*args, **kwargs) - return wrapper + + return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper return decorator # ─── 配额使用统计 ──────────────────────────────────────────────────────────── -def get_quota_usage(db: Session, tenant_id: UUID) -> dict: +async def get_quota_usage(db: Session, tenant_id: UUID) -> dict: """获取租户所有配额的使用情况""" quota_config = _get_quota_config(db, tenant_id) if not quota_config: @@ -459,18 +594,12 @@ def get_quota_usage(db: Session, tenant_id: UUID) -> dict: api_ops_current = 0 try: - from app.core.config import settings - import redis + from app.aioRedis import aio_redis as _aio_redis _now = time.time() - _rk = f"rate_limit:tenant_qps:{tenant_id}" - _r = redis.StrictRedis( - host=settings.REDIS_HOST, port=settings.REDIS_PORT, - db=settings.REDIS_DB, password=settings.REDIS_PASSWORD, - decode_responses=True - ) - api_ops_current = int(_r.zcount(_rk, _now - 1, "+inf")) - except Exception: - pass + _rk = TENANT_QPS_REDIS_KEY.format(tenant_id=tenant_id) + api_ops_current = int(await _aio_redis.zcount(_rk, _now - 1, "+inf") or 0) + except Exception as e: + logger.warning(f"获取 api_ops_current 失败,返回 0: {type(e).__name__}: {e}") return { "workspace": {"used": workspace_count, "limit": quota_config.get("workspace_quota"), "percentage": pct(workspace_count, quota_config.get("workspace_quota"))}, diff --git a/api/app/core/quota_stub.py b/api/app/core/quota_stub.py index 577dfadb..4a8f7cee 100644 --- a/api/app/core/quota_stub.py +++ b/api/app/core/quota_stub.py @@ -18,6 +18,7 @@ from app.core.quota_manager import ( get_quota_usage, _check_quota, QuotaUsageRepository, + TENANT_QPS_REDIS_KEY, ) __all__ = [ @@ -33,4 +34,5 @@ __all__ = [ "get_quota_usage", "_check_quota", "QuotaUsageRepository", + "TENANT_QPS_REDIS_KEY", ] diff --git a/api/app/i18n/exceptions.py b/api/app/i18n/exceptions.py index b81369ed..9a517925 100644 --- a/api/app/i18n/exceptions.py +++ b/api/app/i18n/exceptions.py @@ -482,14 +482,39 @@ class RateLimitExceededError(I18nException): ) -class QuotaExceededError(ForbiddenError): - """Quota exceeded error.""" +class QuotaExceededError(I18nException): + """Quota exceeded error (402).""" + + # resource key -> i18n display key + _RESOURCE_KEY_MAP = { + "workspace": "errors.quota_resources.workspace", + "app": "errors.quota_resources.app", + "skill": "errors.quota_resources.skill", + "knowledge_capacity": "errors.quota_resources.knowledge_capacity", + "memory_engine": "errors.quota_resources.memory_engine", + "end_user": "errors.quota_resources.end_user", + "model": "errors.quota_resources.model", + "ontology_project": "errors.quota_resources.ontology_project", + "api_ops_rate_limit": "errors.quota_resources.api_ops_rate_limit", + } def __init__(self, resource: Optional[str] = None, **params): + # Translate resource key to a localized display name before calling super() if resource: - params["resource"] = resource + resource_i18n_key = self._RESOURCE_KEY_MAP.get(resource) + if resource_i18n_key: + try: + from app.i18n.service import get_translation_service + from app.core.config import settings + _locale = _current_locale.get() or settings.I18N_DEFAULT_LANGUAGE + params["resource"] = get_translation_service().translate(resource_i18n_key, _locale) + except Exception: + params["resource"] = resource + else: + params["resource"] = resource super().__init__( error_key="errors.api.quota_exceeded", + status_code=402, error_code="QUOTA_EXCEEDED", **params ) diff --git a/api/app/locales/en/errors.json b/api/app/locales/en/errors.json index d0276dc9..2355954c 100644 --- a/api/app/locales/en/errors.json +++ b/api/app/locales/en/errors.json @@ -106,7 +106,7 @@ }, "api": { "rate_limit_exceeded": "API rate limit exceeded", - "quota_exceeded": "API quota exceeded", + "quota_exceeded": "{resource} quota exceeded", "invalid_api_key": "Invalid API key", "api_key_expired": "API key has expired", "api_key_revoked": "API key has been revoked", @@ -114,7 +114,8 @@ "method_not_allowed": "Method not allowed", "invalid_request": "Invalid request", "missing_parameter": "Missing required parameter: {param}", - "invalid_parameter": "Invalid parameter: {param}" + "invalid_parameter": "Invalid parameter: {param}", + "api_key_rate_limit_exceeded": "API Key rate limit ({rate_limit}) exceeds tenant plan limit ({limit})" }, "database": { "connection_failed": "Database connection failed", @@ -134,5 +135,16 @@ "invalid_format": "Invalid format: {field}", "invalid_value": "Invalid value: {field}", "out_of_range": "Value out of range: {field}" + }, + "quota_resources": { + "workspace": "Workspace", + "app": "App", + "skill": "Skill", + "knowledge_capacity": "Knowledge capacity", + "memory_engine": "Memory engine", + "end_user": "End user", + "model": "Model", + "ontology_project": "Ontology project", + "api_ops_rate_limit": "API ops rate limit" } } diff --git a/api/app/locales/zh/errors.json b/api/app/locales/zh/errors.json index eafadad4..8b7fdec0 100644 --- a/api/app/locales/zh/errors.json +++ b/api/app/locales/zh/errors.json @@ -106,7 +106,7 @@ }, "api": { "rate_limit_exceeded": "API调用频率超限", - "quota_exceeded": "API调用配额已用完", + "quota_exceeded": "{resource} 配额已超限", "invalid_api_key": "无效的API密钥", "api_key_expired": "API密钥已过期", "api_key_revoked": "API密钥已被撤销", @@ -114,7 +114,8 @@ "method_not_allowed": "不支持的请求方法", "invalid_request": "无效的请求", "missing_parameter": "缺少必需参数:{param}", - "invalid_parameter": "参数无效:{param}" + "invalid_parameter": "参数无效:{param}", + "api_key_rate_limit_exceeded": "API Key 的 QPS 限制({rate_limit})超过租户套餐上限({limit})" }, "database": { "connection_failed": "数据库连接失败", @@ -134,5 +135,16 @@ "invalid_format": "格式不正确:{field}", "invalid_value": "值无效:{field}", "out_of_range": "值超出范围:{field}" + }, + "quota_resources": { + "workspace": "工作空间", + "app": "应用", + "skill": "技能", + "knowledge_capacity": "知识库容量", + "memory_engine": "记忆引擎", + "end_user": "终端用户", + "model": "模型", + "ontology_project": "本体工程", + "api_ops_rate_limit": "API 操作速率" } } diff --git a/api/app/services/api_key_service.py b/api/app/services/api_key_service.py index 07d55198..7b6b1172 100644 --- a/api/app/services/api_key_service.py +++ b/api/app/services/api_key_service.py @@ -19,6 +19,7 @@ from app.core.exceptions import ( ) from app.core.error_codes import BizCode from app.core.logging_config import get_business_logger +from app.i18n.exceptions import I18nException logger = get_business_logger() @@ -51,6 +52,22 @@ class ApiKeyService: if existing: raise BusinessException(f"API Key 名称 {data.name} 已存在", BizCode.API_KEY_DUPLICATE_NAME) + # 校验 rate_limit 不能超过租户套餐的 api_ops_rate_limit + from app.models.workspace_model import Workspace + from app.core.quota_manager import get_api_ops_rate_limit + + workspace = db.query(Workspace).filter(Workspace.id == workspace_id).first() + if workspace: + tenant_api_ops_limit = get_api_ops_rate_limit(db, workspace.tenant_id) + if tenant_api_ops_limit and data.rate_limit > tenant_api_ops_limit: + raise I18nException( + error_key="errors.api.api_key_rate_limit_exceeded", + status_code=400, + error_code="API_KEY_RATE_LIMIT_EXCEEDED", + rate_limit=data.rate_limit, + limit=tenant_api_ops_limit, + ) + # 生成 API Key api_key = generate_api_key(data.type) @@ -152,6 +169,23 @@ class ApiKeyService: if existing: raise BusinessException(f"API Key 名称 {data.name} 已存在", BizCode.API_KEY_DUPLICATE_NAME) + # 校验 rate_limit 不能超过租户套餐的 api_ops_rate_limit + if data.rate_limit is not None: + from app.models.workspace_model import Workspace + from app.core.quota_manager import get_api_ops_rate_limit + + workspace = db.query(Workspace).filter(Workspace.id == workspace_id).first() + if workspace: + tenant_api_ops_limit = get_api_ops_rate_limit(db, workspace.tenant_id) + if tenant_api_ops_limit and data.rate_limit > tenant_api_ops_limit: + raise I18nException( + error_key="errors.api.api_key_rate_limit_exceeded", + status_code=400, + error_code="API_KEY_RATE_LIMIT_EXCEEDED", + rate_limit=data.rate_limit, + limit=tenant_api_ops_limit, + ) + update_data = data.model_dump(exclude_unset=True) ApiKeyRepository.update(db, api_key_id, update_data) db.commit() @@ -248,55 +282,75 @@ class RateLimiterService: def __init__(self): self.redis = aio_redis - async def check_tenant_rate_limit(self, tenant_id: uuid.UUID, limit: int) -> Tuple[bool, dict]: + async def check_tenant_rate_limit(self, window_id: uuid.UUID, limit: int) -> Tuple[bool, dict]: """ - 按 tenant_id 做 1 秒滑动窗口限速,限制值来自套餐配额 api_ops_rate_limit + 按 window_id(workspace_id)做 1 秒滑动窗口限速。 + 限制值来自套餐配额 api_ops_rate_limit。 + 只有请求被允许时才计入窗口,超限请求不污染计数。 """ now = time.time() window_start = now - 1 # 1 秒窗口 - key = f"rate_limit:tenant_qps:{tenant_id}" + key = f"rate_limit:tenant_qps:{window_id}" async with self.redis.pipeline() as pipe: - # 清理 1 秒前的旧记录 pipe.zremrangebyscore(key, 0, window_start) - # 加入当前请求(score=时间戳,member=时间戳+随机数保证唯一) - pipe.zadd(key, {f"{now}:{uuid.uuid4().hex}": now}) - # 统计窗口内请求数 pipe.zcard(key) - # 设置 key 过期(2 秒后自动清理) - pipe.expire(key, 2) results = await pipe.execute() - current = results[2] - remaining = max(0, limit - current) - reset_time = int(now) + 1 + current = results[1] - return current <= limit, { + if current >= limit: + return False, { + "limit": limit, + "remaining": 0, + "reset": int(now) + 1, + } + + member = f"{now}:{uuid.uuid4().hex}" + async with self.redis.pipeline() as pipe: + pipe.zadd(key, {member: now}) + pipe.expire(key, 2) + await pipe.execute() + + return True, { "limit": limit, - "remaining": remaining, - "reset": reset_time, + "remaining": max(0, limit - current - 1), + "reset": int(now) + 1, } async def check_qps(self, api_key_id: uuid.UUID, limit: int) -> Tuple[bool, dict]: """ - 检查QPS限制 - Returns: - (is_allowed, rate_limit_info) + 检查 API Key 自身 QPS 限制(1 秒滑动窗口)。 + 只有请求被允许时才计入窗口,超限请求不污染计数。 """ + now = time.time() + window_start = now - 1 key = f"rate_limit:qps:{api_key_id}" + async with self.redis.pipeline() as pipe: - pipe.incr(key) - pipe.expire(key, 1, nx=True) # 1 秒过期 + pipe.zremrangebyscore(key, 0, window_start) + pipe.zcard(key) results = await pipe.execute() - current = results[0] - remaining = max(0, limit - current) - reset_time = int(time.time()) + 1 + current = results[1] - return current <= limit, { + if current >= limit: + return False, { + "limit": limit, + "remaining": 0, + "reset": int(now) + 1, + } + + member = f"{now}:{uuid.uuid4().hex}" + async with self.redis.pipeline() as pipe: + pipe.zadd(key, {member: now}) + pipe.expire(key, 2) + await pipe.execute() + + return True, { "limit": limit, - "remaining": remaining, - "reset": reset_time + "remaining": max(0, limit - current - 1), + "reset": int(now) + 1, } async def check_daily_requests( @@ -304,7 +358,9 @@ class RateLimiterService: api_key_id: uuid.UUID, limit: int ) -> Tuple[bool, dict]: - """检查日调用量限制""" + """检查日调用量限制。 + 使用原子 INCR,先写后判断,极低概率下允许轻微超限(并发场景下可接受)。 + """ today = datetime.now().strftime("%Y%m%d") key = f"rate_limit:daily:{api_key_id}:{today}" @@ -313,6 +369,7 @@ class RateLimiterService: hour=0, minute=0, second=0, microsecond=0 ) expire_seconds = int((tomorrow_0 - now).total_seconds()) + reset_time = int(tomorrow_0.timestamp()) async with self.redis.pipeline() as pipe: pipe.incr(key) @@ -320,25 +377,37 @@ class RateLimiterService: results = await pipe.execute() current = results[0] - remaining = max(0, limit - current) - reset_time = int(tomorrow_0.timestamp()) - return current <= limit, { + if current > limit: + return False, { + "limit": limit, + "remaining": 0, + "reset": reset_time, + } + + return True, { "limit": limit, - "remaining": remaining, - "reset": reset_time + "remaining": max(0, limit - current), + "reset": reset_time, } async def check_all_limits( self, - api_key: ApiKey + api_key: ApiKey, + db: Optional[Session] = None, ) -> Tuple[bool, str, dict]: """ - 检查所有限制 - Returns: - (is_allowed, error_message, rate_limit_headers) + 检查所有限制,按以下顺序: + 1. API Key 自身 QPS(每个 key 独立,不超过其配置的 rate_limit) + 2. 租户套餐 api_ops_rate_limit(每个 key 独立受套餐限额约束) + 3. API Key 日调用量 + + 设计语义: + - 租户下 N 个 API Key 可以并发调用,互不影响 + - 每个 API Key 独立受自身 rate_limit 约束 + - 每个 API Key 独立受租户套餐 api_ops_rate_limit 约束(per-key 限额) """ - # Check QPS + # 1. 检查 API Key 自身 QPS qps_ok, qps_info = await self.check_qps( api_key.id, api_key.rate_limit @@ -350,6 +419,50 @@ class RateLimiterService: "X-RateLimit-Reset": str(qps_info["reset"]) } + # 2. 检查租户套餐 api_ops_rate_limit(运行时流量控制) + # 每个 API Key 的速率都不能超过租户套餐上限,无论 key 自身配置多少 + tenant_info = None + if db is not None: + try: + from app.models.workspace_model import Workspace + from app.core.quota_manager import get_api_ops_rate_limit + + # 从 Redis 缓存取 tenant_limit,避免每次请求都查数据库 + # TTL 60 秒,套餐变更后最多 1 分钟生效 + # 用 workspace_id 作为滑动窗口维度(一个 workspace 只属于一个 tenant,语义等价) + cache_key = f"tenant_api_ops_limit:{api_key.workspace_id}" + cached = await self.redis.get(cache_key) + if cached is not None: + try: + tenant_limit = int(cached) if cached != "0" else None + except (ValueError, TypeError): + # 缓存数据损坏,回源查数据库 + cached = None + tenant_limit = None + + if cached is None: + workspace = db.query(Workspace).filter(Workspace.id == api_key.workspace_id).first() + if workspace: + tenant_limit = get_api_ops_rate_limit(db, workspace.tenant_id) + await self.redis.set(cache_key, str(tenant_limit) if tenant_limit else "0", ex=60) + else: + tenant_limit = None + + if tenant_limit: + # 用 api_key.id 作为滑动窗口 key,每个 API Key 独立受 api_ops_rate_limit 约束 + tenant_ok, tenant_info = await self.check_tenant_rate_limit( + api_key.id, tenant_limit + ) + if not tenant_ok: + return False, "API ops rate limit exceeded", { + "X-RateLimit-Limit-QPS": str(tenant_limit), + "X-RateLimit-Remaining-QPS": str(tenant_info["remaining"]), + "X-RateLimit-Reset": str(tenant_info["reset"]), + } + except Exception as e: + logger.warning(f"租户 api_ops_rate_limit 运行时检查失败,跳过: {e}") + + # 3. 检查日调用量 daily_ok, daily_info = await self.check_daily_requests( api_key.id, api_key.daily_request_limit @@ -366,8 +479,12 @@ class RateLimiterService: "X-RateLimit-Remaining-QPS": str(qps_info["remaining"]), "X-RateLimit-Limit-Day": str(daily_info["limit"]), "X-RateLimit-Remaining-Day": str(daily_info["remaining"]), - "X-RateLimit-Reset": str(daily_info["reset"]) + "X-RateLimit-Reset": str(daily_info["reset"]), } + # 如果租户限速信息存在,补充到响应头 + if tenant_info: + headers["X-RateLimit-Limit-Tenant-QPS"] = str(tenant_info["limit"]) + headers["X-RateLimit-Remaining-Tenant-QPS"] = str(tenant_info["remaining"]) return True, "", headers