From 48f3d9b10514fc349ed84b1aa147ecf28d2bae1e Mon Sep 17 00:00:00 2001 From: wwq Date: Mon, 20 Apr 2026 15:10:12 +0800 Subject: [PATCH 1/7] feat(quota): refactor quota management and rate limiting services - Add `API_KEY_RATE_LIMIT_EXCEEDED` error code. - Refactor `QuotaExceededError` to support resource type localization. - Optimize rate limiting service by implementing the sliding window algorithm. - Add rate limit validation for tenant plans. - Unify quota check decorator to support both synchronous and asynchronous operations. - Enhance quota usage statistics endpoints. --- api/app/core/api_key_auth.py | 36 +--- api/app/core/error_codes.py | 1 + api/app/core/quota_manager.py | 279 ++++++++++++++++++++-------- api/app/core/quota_stub.py | 2 + api/app/i18n/exceptions.py | 31 +++- api/app/locales/en/errors.json | 16 +- api/app/locales/zh/errors.json | 16 +- api/app/services/api_key_service.py | 193 +++++++++++++++---- 8 files changed, 421 insertions(+), 153 deletions(-) diff --git a/api/app/core/api_key_auth.py b/api/app/core/api_key_auth.py index 91d6bd8a..297e5082 100644 --- a/api/app/core/api_key_auth.py +++ b/api/app/core/api_key_auth.py @@ -96,40 +96,8 @@ def require_api_key( resource_id=api_key_obj.resource_id, ) - # ── Tenant 级别限速(来自套餐配额 api_ops_rate_limit)────────── - try: - from app.models.workspace_model import Workspace - from premium.platform_admin.package_plan_service import TenantSubscriptionService - - workspace = db.query(Workspace).filter( - Workspace.id == api_key_obj.workspace_id - ).first() - if workspace: - quota = TenantSubscriptionService(db).get_effective_quota(workspace.tenant_id) - tenant_qps_limit = quota.get("api_ops_rate_limit") if quota else None - if tenant_qps_limit: - rate_limiter = RateLimiterService() - tenant_ok, tenant_info = await rate_limiter.check_tenant_rate_limit( - workspace.tenant_id, tenant_qps_limit - ) - if not tenant_ok: - raise RateLimitException( - "租户 API 调用速率超限", - BizCode.API_KEY_QPS_LIMIT_EXCEEDED, - rate_headers={ - "X-RateLimit-Tenant-Limit": str(tenant_info["limit"]), - "X-RateLimit-Tenant-Remaining": str(tenant_info["remaining"]), - "X-RateLimit-Tenant-Reset": str(tenant_info["reset"]), - } - ) - except RateLimitException: - raise - except Exception as e: - logger.warning(f"Tenant 限速检查异常,跳过: {e}") - # ───────────────────────────────────────────────────────────── - rate_limiter = RateLimiterService() - is_allowed, error_msg, rate_headers = await rate_limiter.check_all_limits(api_key_obj) + is_allowed, error_msg, rate_headers = await rate_limiter.check_all_limits(api_key_obj, db=db) if not is_allowed: logger.warning("API Key 限流触发", extra={ "api_key_id": str(api_key_obj.id), @@ -142,6 +110,8 @@ def require_api_key( code = BizCode.API_KEY_QPS_LIMIT_EXCEEDED elif "Daily" in error_msg: code = BizCode.API_KEY_DAILY_LIMIT_EXCEEDED + elif "Tenant" in error_msg: + code = BizCode.API_KEY_QPS_LIMIT_EXCEEDED # 租户套餐速率超限,同属 QPS 类 else: code = BizCode.API_KEY_QUOTA_EXCEEDED diff --git a/api/app/core/error_codes.py b/api/app/core/error_codes.py index 01b6115d..a4a20cbb 100644 --- a/api/app/core/error_codes.py +++ b/api/app/core/error_codes.py @@ -31,6 +31,7 @@ class BizCode(IntEnum): API_KEY_QPS_LIMIT_EXCEEDED = 3014 API_KEY_DAILY_LIMIT_EXCEEDED = 3015 API_KEY_QUOTA_EXCEEDED = 3016 + API_KEY_RATE_LIMIT_EXCEEDED = 3017 # 资源(4xxx) NOT_FOUND = 4000 USER_NOT_FOUND = 4001 diff --git a/api/app/core/quota_manager.py b/api/app/core/quota_manager.py index 0e0053a0..43d8fa42 100644 --- a/api/app/core/quota_manager.py +++ b/api/app/core/quota_manager.py @@ -15,10 +15,13 @@ from sqlalchemy import func from sqlalchemy.orm import Session from app.core.logging_config import get_auth_logger -from app.i18n.exceptions import QuotaExceededError +from app.i18n.exceptions import QuotaExceededError, InternalServerError logger = get_auth_logger() +# Redis key 格式常量,与 RateLimiterService.check_tenant_rate_limit 保持一致 +TENANT_QPS_REDIS_KEY = "rate_limit:tenant_qps:{tenant_id}" + def _get_user_from_kwargs(kwargs: dict): """从 kwargs 中获取 user 对象""" @@ -73,31 +76,52 @@ def _get_tenant_id_from_kwargs(db: Session, kwargs: dict): def _get_quota_config(db: Session, tenant_id: UUID) -> Optional[Dict[str, Any]]: """ 获取租户的配额配置 - + 优先级: 1. premium 模块的 tenant_subscriptions(SaaS 版) 2. default_free_plan.py 配置文件(社区版兜底) """ - # 尝试从 premium 模块获取 + # 尝试从 premium 模块获取(SaaS 版) try: from premium.platform_admin.package_plan_service import TenantSubscriptionService + # premium 模块存在,运行时错误不应被静默降级,直接抛出 quota_config = TenantSubscriptionService(db).get_effective_quota(tenant_id) if quota_config: logger.debug(f"从 premium 模块获取租户 {tenant_id} 配额配置") return quota_config - except (ModuleNotFoundError, ImportError, Exception) as e: - logger.debug(f"无法从 premium 模块获取配额配置: {e}") + # premium 存在但该租户无订阅记录,降级到免费套餐 + logger.debug(f"租户 {tenant_id} 无 premium 订阅,降级到免费套餐") + except (ModuleNotFoundError, ImportError): + # 社区版:premium 包不存在,正常降级 + logger.debug("premium 模块不存在,使用社区版免费套餐配额") - # 降级到配置文件 + # 降级到社区版配置文件 try: from app.config.default_free_plan import DEFAULT_FREE_PLAN - logger.info(f"使用配置文件中的免费套餐配额: tenant={tenant_id}") + logger.debug(f"使用社区版免费套餐配额: tenant={tenant_id}") return DEFAULT_FREE_PLAN.get("quotas") except Exception as e: logger.error(f"无法从配置文件获取配额: {e}") return None +def get_api_ops_rate_limit(db: Session, tenant_id: UUID) -> Optional[int]: + """ + 获取租户套餐的 API 操作速率限制(QPS 上限) + + 该函数兼容社区版和 SaaS 版: + - SaaS 版:从 premium 模块的套餐配额读取 + - 社区版:从 default_free_plan.py 配置文件读取 + + Returns: + int: api_ops_rate_limit 值,如果未配置则返回 None + """ + quota_config = _get_quota_config(db, tenant_id) + if quota_config: + return quota_config.get("api_ops_rate_limit") + return None + + class QuotaUsageRepository: """配额使用量数据访问层""" @@ -247,41 +271,74 @@ def _check_quota( def check_workspace_quota(func: Callable) -> Callable: @wraps(func) - def wrapper(*args, **kwargs): + async def async_wrapper(*args, **kwargs): db: Session = kwargs.get("db") user = _get_user_from_kwargs(kwargs) if not db or not user: - logger.warning("配额检查失败:缺少 db 或 user 参数") - return func(*args, **kwargs) + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() + _check_quota(db, user.tenant_id, "workspace_quota", "workspace") + return await func(*args, **kwargs) + + @wraps(func) + def sync_wrapper(*args, **kwargs): + db: Session = kwargs.get("db") + user = _get_user_from_kwargs(kwargs) + if not db or not user: + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() _check_quota(db, user.tenant_id, "workspace_quota", "workspace") return func(*args, **kwargs) - return wrapper + + return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper def check_skill_quota(func: Callable) -> Callable: @wraps(func) - def wrapper(*args, **kwargs): + async def async_wrapper(*args, **kwargs): db: Session = kwargs.get("db") user = _get_user_from_kwargs(kwargs) if not db or not user: - logger.warning("配额检查失败:缺少 db 或 user 参数") - return func(*args, **kwargs) + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() + _check_quota(db, user.tenant_id, "skill_quota", "skill") + return await func(*args, **kwargs) + + @wraps(func) + def sync_wrapper(*args, **kwargs): + db: Session = kwargs.get("db") + user = _get_user_from_kwargs(kwargs) + if not db or not user: + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() _check_quota(db, user.tenant_id, "skill_quota", "skill") return func(*args, **kwargs) - return wrapper + + return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper def check_app_quota(func: Callable) -> Callable: @wraps(func) - def wrapper(*args, **kwargs): + async def async_wrapper(*args, **kwargs): db: Session = kwargs.get("db") user = _get_user_from_kwargs(kwargs) if not db or not user: - logger.warning("配额检查失败:缺少 db 或 user 参数") - return func(*args, **kwargs) + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() + _check_quota(db, user.tenant_id, "app_quota", "app") + return await func(*args, **kwargs) + + @wraps(func) + def sync_wrapper(*args, **kwargs): + db: Session = kwargs.get("db") + user = _get_user_from_kwargs(kwargs) + if not db or not user: + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() _check_quota(db, user.tenant_id, "app_quota", "app") return func(*args, **kwargs) - return wrapper + + return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper def check_knowledge_capacity_quota(func: Callable) -> Callable: @@ -289,12 +346,12 @@ def check_knowledge_capacity_quota(func: Callable) -> Callable: async def async_wrapper(*args, **kwargs): db: Session = kwargs.get("db") if not db: - logger.warning("配额检查失败:缺少 db 参数") - return await func(*args, **kwargs) + logger.error(f"配额检查失败:{func.__name__} 缺少 db 参数,拒绝请求") + raise InternalServerError() tenant_id = _get_tenant_id_from_kwargs(db, kwargs) if not tenant_id: - logger.warning("配额检查失败:无法获取 tenant_id") - return await func(*args, **kwargs) + logger.error(f"配额检查失败:{func.__name__} 无法获取 tenant_id,拒绝请求") + raise InternalServerError() _check_quota(db, tenant_id, "knowledge_capacity_quota", "knowledge_capacity") return await func(*args, **kwargs) @@ -303,8 +360,8 @@ def check_knowledge_capacity_quota(func: Callable) -> Callable: db: Session = kwargs.get("db") user = _get_user_from_kwargs(kwargs) if not db or not user: - logger.warning("配额检查失败:缺少 db 或 user 参数") - return func(*args, **kwargs) + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() _check_quota(db, user.tenant_id, "knowledge_capacity_quota", "knowledge_capacity") return func(*args, **kwargs) @@ -313,15 +370,26 @@ def check_knowledge_capacity_quota(func: Callable) -> Callable: def check_memory_engine_quota(func: Callable) -> Callable: @wraps(func) - def wrapper(*args, **kwargs): + async def async_wrapper(*args, **kwargs): db: Session = kwargs.get("db") user = _get_user_from_kwargs(kwargs) if not db or not user: - logger.warning("配额检查失败:缺少 db 或 user 参数") - return func(*args, **kwargs) + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() + _check_quota(db, user.tenant_id, "memory_engine_quota", "memory_engine") + return await func(*args, **kwargs) + + @wraps(func) + def sync_wrapper(*args, **kwargs): + db: Session = kwargs.get("db") + user = _get_user_from_kwargs(kwargs) + if not db or not user: + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() _check_quota(db, user.tenant_id, "memory_engine_quota", "memory_engine") return func(*args, **kwargs) - return wrapper + + return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper def check_end_user_quota(func: Callable) -> Callable: @@ -329,12 +397,12 @@ def check_end_user_quota(func: Callable) -> Callable: async def async_wrapper(*args, **kwargs): db: Session = kwargs.get("db") if not db: - logger.warning("配额检查失败:缺少 db 参数") - return await func(*args, **kwargs) + logger.error(f"配额检查失败:{func.__name__} 缺少 db 参数,拒绝请求") + raise InternalServerError() tenant_id = _get_tenant_id_from_kwargs(db, kwargs) if not tenant_id: - logger.warning("配额检查失败:无法获取 tenant_id") - return await func(*args, **kwargs) + logger.error(f"配额检查失败:{func.__name__} 无法获取 tenant_id,拒绝请求") + raise InternalServerError() _check_quota(db, tenant_id, "end_user_quota", "end_user") return await func(*args, **kwargs) @@ -342,12 +410,12 @@ def check_end_user_quota(func: Callable) -> Callable: def sync_wrapper(*args, **kwargs): db: Session = kwargs.get("db") if not db: - logger.warning("配额检查失败:缺少 db 参数") - return func(*args, **kwargs) + logger.error(f"配额检查失败:{func.__name__} 缺少 db 参数,拒绝请求") + raise InternalServerError() tenant_id = _get_tenant_id_from_kwargs(db, kwargs) if not tenant_id: - logger.warning("配额检查失败:无法获取 tenant_id") - return func(*args, **kwargs) + logger.error(f"配额检查失败:{func.__name__} 无法获取 tenant_id,拒绝请求") + raise InternalServerError() _check_quota(db, tenant_id, "end_user_quota", "end_user") return func(*args, **kwargs) @@ -356,88 +424,155 @@ def check_end_user_quota(func: Callable) -> Callable: def check_ontology_project_quota(func: Callable) -> Callable: @wraps(func) - def wrapper(*args, **kwargs): + async def async_wrapper(*args, **kwargs): db: Session = kwargs.get("db") user = _get_user_from_kwargs(kwargs) if not db or not user: - logger.warning("配额检查失败:缺少 db 或 user 参数") - return func(*args, **kwargs) + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() + _check_quota(db, user.tenant_id, "ontology_project_quota", "ontology_project") + return await func(*args, **kwargs) + + @wraps(func) + def sync_wrapper(*args, **kwargs): + db: Session = kwargs.get("db") + user = _get_user_from_kwargs(kwargs) + if not db or not user: + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() _check_quota(db, user.tenant_id, "ontology_project_quota", "ontology_project") return func(*args, **kwargs) - return wrapper + + return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper def check_model_quota(func: Callable) -> Callable: @wraps(func) - def wrapper(*args, **kwargs): + async def async_wrapper(*args, **kwargs): db: Session = kwargs.get("db") user = _get_user_from_kwargs(kwargs) if not db or not user: - logger.warning("配额检查失败:缺少 db 或 user 参数") - return func(*args, **kwargs) + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() + _check_quota(db, user.tenant_id, "model_quota", "model") + return await func(*args, **kwargs) + + @wraps(func) + def sync_wrapper(*args, **kwargs): + db: Session = kwargs.get("db") + user = _get_user_from_kwargs(kwargs) + if not db or not user: + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() _check_quota(db, user.tenant_id, "model_quota", "model") return func(*args, **kwargs) - return wrapper + + return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper def check_model_activation_quota(func: Callable) -> Callable: """模型激活时的配额检查装饰器""" @wraps(func) - def wrapper(*args, **kwargs): + async def async_wrapper(*args, **kwargs): db: Session = kwargs.get("db") user = _get_user_from_kwargs(kwargs) if not db or not user: - logger.warning("配额检查失败:缺少 db 或 user 参数") - return func(*args, **kwargs) - + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() + model_id = kwargs.get("model_id") or (args[1] if len(args) > 1 else None) model_data = kwargs.get("model_data") - + if not model_id or not model_data: logger.warning("模型激活配额检查失败:缺少 model_id 或 model_data 参数") - return func(*args, **kwargs) - + return await func(*args, **kwargs) + if model_data.is_active is True: try: - from app.models.models_model import ModelConfig from app.services.model_service import ModelConfigService - + existing_model = ModelConfigService.get_model_by_id( - db=db, - model_id=model_id, + db=db, + model_id=model_id, tenant_id=user.tenant_id ) - + if not existing_model.is_active: logger.info(f"模型激活操作,检查配额: model_id={model_id}, tenant_id={user.tenant_id}") _check_quota(db, user.tenant_id, "model_quota", "model") except Exception as e: logger.error(f"模型激活配额检查异常: model_id={model_id}, error={str(e)}") raise - + + return await func(*args, **kwargs) + + @wraps(func) + def sync_wrapper(*args, **kwargs): + db: Session = kwargs.get("db") + user = _get_user_from_kwargs(kwargs) + if not db or not user: + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() + + model_id = kwargs.get("model_id") or (args[1] if len(args) > 1 else None) + model_data = kwargs.get("model_data") + + if not model_id or not model_data: + logger.warning("模型激活配额检查失败:缺少 model_id 或 model_data 参数") + return func(*args, **kwargs) + + if model_data.is_active is True: + try: + from app.services.model_service import ModelConfigService + + existing_model = ModelConfigService.get_model_by_id( + db=db, + model_id=model_id, + tenant_id=user.tenant_id + ) + + if not existing_model.is_active: + logger.info(f"模型激活操作,检查配额: model_id={model_id}, tenant_id={user.tenant_id}") + _check_quota(db, user.tenant_id, "model_quota", "model") + except Exception as e: + logger.error(f"模型激活配额检查异常: model_id={model_id}, error={str(e)}") + raise + return func(*args, **kwargs) - return wrapper + + return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper def check_quota(quota_type: str, resource_name: str, usage_func: Optional[Callable] = None): """通用配额检查装饰器,支持自定义使用量获取函数""" def decorator(func: Callable) -> Callable: @wraps(func) - def wrapper(*args, **kwargs): + async def async_wrapper(*args, **kwargs): db: Session = kwargs.get("db") user = _get_user_from_kwargs(kwargs) if not db or not user: - logger.warning("配额检查失败:缺少 db 或 user 参数") - return func(*args, **kwargs) + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() + _check_quota(db, user.tenant_id, quota_type, resource_name, usage_func) + return await func(*args, **kwargs) + + @wraps(func) + def sync_wrapper(*args, **kwargs): + db: Session = kwargs.get("db") + user = _get_user_from_kwargs(kwargs) + if not db or not user: + logger.error(f"配额检查失败:{func.__name__} 缺少 db 或 user 参数,拒绝请求") + raise InternalServerError() _check_quota(db, user.tenant_id, quota_type, resource_name, usage_func) return func(*args, **kwargs) - return wrapper + + return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper return decorator # ─── 配额使用统计 ──────────────────────────────────────────────────────────── -def get_quota_usage(db: Session, tenant_id: UUID) -> dict: +async def get_quota_usage(db: Session, tenant_id: UUID) -> dict: """获取租户所有配额的使用情况""" quota_config = _get_quota_config(db, tenant_id) if not quota_config: @@ -459,18 +594,12 @@ def get_quota_usage(db: Session, tenant_id: UUID) -> dict: api_ops_current = 0 try: - from app.core.config import settings - import redis + from app.aioRedis import aio_redis as _aio_redis _now = time.time() - _rk = f"rate_limit:tenant_qps:{tenant_id}" - _r = redis.StrictRedis( - host=settings.REDIS_HOST, port=settings.REDIS_PORT, - db=settings.REDIS_DB, password=settings.REDIS_PASSWORD, - decode_responses=True - ) - api_ops_current = int(_r.zcount(_rk, _now - 1, "+inf")) - except Exception: - pass + _rk = TENANT_QPS_REDIS_KEY.format(tenant_id=tenant_id) + api_ops_current = int(await _aio_redis.zcount(_rk, _now - 1, "+inf") or 0) + except Exception as e: + logger.warning(f"获取 api_ops_current 失败,返回 0: {type(e).__name__}: {e}") return { "workspace": {"used": workspace_count, "limit": quota_config.get("workspace_quota"), "percentage": pct(workspace_count, quota_config.get("workspace_quota"))}, diff --git a/api/app/core/quota_stub.py b/api/app/core/quota_stub.py index 577dfadb..4a8f7cee 100644 --- a/api/app/core/quota_stub.py +++ b/api/app/core/quota_stub.py @@ -18,6 +18,7 @@ from app.core.quota_manager import ( get_quota_usage, _check_quota, QuotaUsageRepository, + TENANT_QPS_REDIS_KEY, ) __all__ = [ @@ -33,4 +34,5 @@ __all__ = [ "get_quota_usage", "_check_quota", "QuotaUsageRepository", + "TENANT_QPS_REDIS_KEY", ] diff --git a/api/app/i18n/exceptions.py b/api/app/i18n/exceptions.py index b81369ed..9a517925 100644 --- a/api/app/i18n/exceptions.py +++ b/api/app/i18n/exceptions.py @@ -482,14 +482,39 @@ class RateLimitExceededError(I18nException): ) -class QuotaExceededError(ForbiddenError): - """Quota exceeded error.""" +class QuotaExceededError(I18nException): + """Quota exceeded error (402).""" + + # resource key -> i18n display key + _RESOURCE_KEY_MAP = { + "workspace": "errors.quota_resources.workspace", + "app": "errors.quota_resources.app", + "skill": "errors.quota_resources.skill", + "knowledge_capacity": "errors.quota_resources.knowledge_capacity", + "memory_engine": "errors.quota_resources.memory_engine", + "end_user": "errors.quota_resources.end_user", + "model": "errors.quota_resources.model", + "ontology_project": "errors.quota_resources.ontology_project", + "api_ops_rate_limit": "errors.quota_resources.api_ops_rate_limit", + } def __init__(self, resource: Optional[str] = None, **params): + # Translate resource key to a localized display name before calling super() if resource: - params["resource"] = resource + resource_i18n_key = self._RESOURCE_KEY_MAP.get(resource) + if resource_i18n_key: + try: + from app.i18n.service import get_translation_service + from app.core.config import settings + _locale = _current_locale.get() or settings.I18N_DEFAULT_LANGUAGE + params["resource"] = get_translation_service().translate(resource_i18n_key, _locale) + except Exception: + params["resource"] = resource + else: + params["resource"] = resource super().__init__( error_key="errors.api.quota_exceeded", + status_code=402, error_code="QUOTA_EXCEEDED", **params ) diff --git a/api/app/locales/en/errors.json b/api/app/locales/en/errors.json index d0276dc9..2355954c 100644 --- a/api/app/locales/en/errors.json +++ b/api/app/locales/en/errors.json @@ -106,7 +106,7 @@ }, "api": { "rate_limit_exceeded": "API rate limit exceeded", - "quota_exceeded": "API quota exceeded", + "quota_exceeded": "{resource} quota exceeded", "invalid_api_key": "Invalid API key", "api_key_expired": "API key has expired", "api_key_revoked": "API key has been revoked", @@ -114,7 +114,8 @@ "method_not_allowed": "Method not allowed", "invalid_request": "Invalid request", "missing_parameter": "Missing required parameter: {param}", - "invalid_parameter": "Invalid parameter: {param}" + "invalid_parameter": "Invalid parameter: {param}", + "api_key_rate_limit_exceeded": "API Key rate limit ({rate_limit}) exceeds tenant plan limit ({limit})" }, "database": { "connection_failed": "Database connection failed", @@ -134,5 +135,16 @@ "invalid_format": "Invalid format: {field}", "invalid_value": "Invalid value: {field}", "out_of_range": "Value out of range: {field}" + }, + "quota_resources": { + "workspace": "Workspace", + "app": "App", + "skill": "Skill", + "knowledge_capacity": "Knowledge capacity", + "memory_engine": "Memory engine", + "end_user": "End user", + "model": "Model", + "ontology_project": "Ontology project", + "api_ops_rate_limit": "API ops rate limit" } } diff --git a/api/app/locales/zh/errors.json b/api/app/locales/zh/errors.json index eafadad4..8b7fdec0 100644 --- a/api/app/locales/zh/errors.json +++ b/api/app/locales/zh/errors.json @@ -106,7 +106,7 @@ }, "api": { "rate_limit_exceeded": "API调用频率超限", - "quota_exceeded": "API调用配额已用完", + "quota_exceeded": "{resource} 配额已超限", "invalid_api_key": "无效的API密钥", "api_key_expired": "API密钥已过期", "api_key_revoked": "API密钥已被撤销", @@ -114,7 +114,8 @@ "method_not_allowed": "不支持的请求方法", "invalid_request": "无效的请求", "missing_parameter": "缺少必需参数:{param}", - "invalid_parameter": "参数无效:{param}" + "invalid_parameter": "参数无效:{param}", + "api_key_rate_limit_exceeded": "API Key 的 QPS 限制({rate_limit})超过租户套餐上限({limit})" }, "database": { "connection_failed": "数据库连接失败", @@ -134,5 +135,16 @@ "invalid_format": "格式不正确:{field}", "invalid_value": "值无效:{field}", "out_of_range": "值超出范围:{field}" + }, + "quota_resources": { + "workspace": "工作空间", + "app": "应用", + "skill": "技能", + "knowledge_capacity": "知识库容量", + "memory_engine": "记忆引擎", + "end_user": "终端用户", + "model": "模型", + "ontology_project": "本体工程", + "api_ops_rate_limit": "API 操作速率" } } diff --git a/api/app/services/api_key_service.py b/api/app/services/api_key_service.py index 07d55198..7b6b1172 100644 --- a/api/app/services/api_key_service.py +++ b/api/app/services/api_key_service.py @@ -19,6 +19,7 @@ from app.core.exceptions import ( ) from app.core.error_codes import BizCode from app.core.logging_config import get_business_logger +from app.i18n.exceptions import I18nException logger = get_business_logger() @@ -51,6 +52,22 @@ class ApiKeyService: if existing: raise BusinessException(f"API Key 名称 {data.name} 已存在", BizCode.API_KEY_DUPLICATE_NAME) + # 校验 rate_limit 不能超过租户套餐的 api_ops_rate_limit + from app.models.workspace_model import Workspace + from app.core.quota_manager import get_api_ops_rate_limit + + workspace = db.query(Workspace).filter(Workspace.id == workspace_id).first() + if workspace: + tenant_api_ops_limit = get_api_ops_rate_limit(db, workspace.tenant_id) + if tenant_api_ops_limit and data.rate_limit > tenant_api_ops_limit: + raise I18nException( + error_key="errors.api.api_key_rate_limit_exceeded", + status_code=400, + error_code="API_KEY_RATE_LIMIT_EXCEEDED", + rate_limit=data.rate_limit, + limit=tenant_api_ops_limit, + ) + # 生成 API Key api_key = generate_api_key(data.type) @@ -152,6 +169,23 @@ class ApiKeyService: if existing: raise BusinessException(f"API Key 名称 {data.name} 已存在", BizCode.API_KEY_DUPLICATE_NAME) + # 校验 rate_limit 不能超过租户套餐的 api_ops_rate_limit + if data.rate_limit is not None: + from app.models.workspace_model import Workspace + from app.core.quota_manager import get_api_ops_rate_limit + + workspace = db.query(Workspace).filter(Workspace.id == workspace_id).first() + if workspace: + tenant_api_ops_limit = get_api_ops_rate_limit(db, workspace.tenant_id) + if tenant_api_ops_limit and data.rate_limit > tenant_api_ops_limit: + raise I18nException( + error_key="errors.api.api_key_rate_limit_exceeded", + status_code=400, + error_code="API_KEY_RATE_LIMIT_EXCEEDED", + rate_limit=data.rate_limit, + limit=tenant_api_ops_limit, + ) + update_data = data.model_dump(exclude_unset=True) ApiKeyRepository.update(db, api_key_id, update_data) db.commit() @@ -248,55 +282,75 @@ class RateLimiterService: def __init__(self): self.redis = aio_redis - async def check_tenant_rate_limit(self, tenant_id: uuid.UUID, limit: int) -> Tuple[bool, dict]: + async def check_tenant_rate_limit(self, window_id: uuid.UUID, limit: int) -> Tuple[bool, dict]: """ - 按 tenant_id 做 1 秒滑动窗口限速,限制值来自套餐配额 api_ops_rate_limit + 按 window_id(workspace_id)做 1 秒滑动窗口限速。 + 限制值来自套餐配额 api_ops_rate_limit。 + 只有请求被允许时才计入窗口,超限请求不污染计数。 """ now = time.time() window_start = now - 1 # 1 秒窗口 - key = f"rate_limit:tenant_qps:{tenant_id}" + key = f"rate_limit:tenant_qps:{window_id}" async with self.redis.pipeline() as pipe: - # 清理 1 秒前的旧记录 pipe.zremrangebyscore(key, 0, window_start) - # 加入当前请求(score=时间戳,member=时间戳+随机数保证唯一) - pipe.zadd(key, {f"{now}:{uuid.uuid4().hex}": now}) - # 统计窗口内请求数 pipe.zcard(key) - # 设置 key 过期(2 秒后自动清理) - pipe.expire(key, 2) results = await pipe.execute() - current = results[2] - remaining = max(0, limit - current) - reset_time = int(now) + 1 + current = results[1] - return current <= limit, { + if current >= limit: + return False, { + "limit": limit, + "remaining": 0, + "reset": int(now) + 1, + } + + member = f"{now}:{uuid.uuid4().hex}" + async with self.redis.pipeline() as pipe: + pipe.zadd(key, {member: now}) + pipe.expire(key, 2) + await pipe.execute() + + return True, { "limit": limit, - "remaining": remaining, - "reset": reset_time, + "remaining": max(0, limit - current - 1), + "reset": int(now) + 1, } async def check_qps(self, api_key_id: uuid.UUID, limit: int) -> Tuple[bool, dict]: """ - 检查QPS限制 - Returns: - (is_allowed, rate_limit_info) + 检查 API Key 自身 QPS 限制(1 秒滑动窗口)。 + 只有请求被允许时才计入窗口,超限请求不污染计数。 """ + now = time.time() + window_start = now - 1 key = f"rate_limit:qps:{api_key_id}" + async with self.redis.pipeline() as pipe: - pipe.incr(key) - pipe.expire(key, 1, nx=True) # 1 秒过期 + pipe.zremrangebyscore(key, 0, window_start) + pipe.zcard(key) results = await pipe.execute() - current = results[0] - remaining = max(0, limit - current) - reset_time = int(time.time()) + 1 + current = results[1] - return current <= limit, { + if current >= limit: + return False, { + "limit": limit, + "remaining": 0, + "reset": int(now) + 1, + } + + member = f"{now}:{uuid.uuid4().hex}" + async with self.redis.pipeline() as pipe: + pipe.zadd(key, {member: now}) + pipe.expire(key, 2) + await pipe.execute() + + return True, { "limit": limit, - "remaining": remaining, - "reset": reset_time + "remaining": max(0, limit - current - 1), + "reset": int(now) + 1, } async def check_daily_requests( @@ -304,7 +358,9 @@ class RateLimiterService: api_key_id: uuid.UUID, limit: int ) -> Tuple[bool, dict]: - """检查日调用量限制""" + """检查日调用量限制。 + 使用原子 INCR,先写后判断,极低概率下允许轻微超限(并发场景下可接受)。 + """ today = datetime.now().strftime("%Y%m%d") key = f"rate_limit:daily:{api_key_id}:{today}" @@ -313,6 +369,7 @@ class RateLimiterService: hour=0, minute=0, second=0, microsecond=0 ) expire_seconds = int((tomorrow_0 - now).total_seconds()) + reset_time = int(tomorrow_0.timestamp()) async with self.redis.pipeline() as pipe: pipe.incr(key) @@ -320,25 +377,37 @@ class RateLimiterService: results = await pipe.execute() current = results[0] - remaining = max(0, limit - current) - reset_time = int(tomorrow_0.timestamp()) - return current <= limit, { + if current > limit: + return False, { + "limit": limit, + "remaining": 0, + "reset": reset_time, + } + + return True, { "limit": limit, - "remaining": remaining, - "reset": reset_time + "remaining": max(0, limit - current), + "reset": reset_time, } async def check_all_limits( self, - api_key: ApiKey + api_key: ApiKey, + db: Optional[Session] = None, ) -> Tuple[bool, str, dict]: """ - 检查所有限制 - Returns: - (is_allowed, error_message, rate_limit_headers) + 检查所有限制,按以下顺序: + 1. API Key 自身 QPS(每个 key 独立,不超过其配置的 rate_limit) + 2. 租户套餐 api_ops_rate_limit(每个 key 独立受套餐限额约束) + 3. API Key 日调用量 + + 设计语义: + - 租户下 N 个 API Key 可以并发调用,互不影响 + - 每个 API Key 独立受自身 rate_limit 约束 + - 每个 API Key 独立受租户套餐 api_ops_rate_limit 约束(per-key 限额) """ - # Check QPS + # 1. 检查 API Key 自身 QPS qps_ok, qps_info = await self.check_qps( api_key.id, api_key.rate_limit @@ -350,6 +419,50 @@ class RateLimiterService: "X-RateLimit-Reset": str(qps_info["reset"]) } + # 2. 检查租户套餐 api_ops_rate_limit(运行时流量控制) + # 每个 API Key 的速率都不能超过租户套餐上限,无论 key 自身配置多少 + tenant_info = None + if db is not None: + try: + from app.models.workspace_model import Workspace + from app.core.quota_manager import get_api_ops_rate_limit + + # 从 Redis 缓存取 tenant_limit,避免每次请求都查数据库 + # TTL 60 秒,套餐变更后最多 1 分钟生效 + # 用 workspace_id 作为滑动窗口维度(一个 workspace 只属于一个 tenant,语义等价) + cache_key = f"tenant_api_ops_limit:{api_key.workspace_id}" + cached = await self.redis.get(cache_key) + if cached is not None: + try: + tenant_limit = int(cached) if cached != "0" else None + except (ValueError, TypeError): + # 缓存数据损坏,回源查数据库 + cached = None + tenant_limit = None + + if cached is None: + workspace = db.query(Workspace).filter(Workspace.id == api_key.workspace_id).first() + if workspace: + tenant_limit = get_api_ops_rate_limit(db, workspace.tenant_id) + await self.redis.set(cache_key, str(tenant_limit) if tenant_limit else "0", ex=60) + else: + tenant_limit = None + + if tenant_limit: + # 用 api_key.id 作为滑动窗口 key,每个 API Key 独立受 api_ops_rate_limit 约束 + tenant_ok, tenant_info = await self.check_tenant_rate_limit( + api_key.id, tenant_limit + ) + if not tenant_ok: + return False, "API ops rate limit exceeded", { + "X-RateLimit-Limit-QPS": str(tenant_limit), + "X-RateLimit-Remaining-QPS": str(tenant_info["remaining"]), + "X-RateLimit-Reset": str(tenant_info["reset"]), + } + except Exception as e: + logger.warning(f"租户 api_ops_rate_limit 运行时检查失败,跳过: {e}") + + # 3. 检查日调用量 daily_ok, daily_info = await self.check_daily_requests( api_key.id, api_key.daily_request_limit @@ -366,8 +479,12 @@ class RateLimiterService: "X-RateLimit-Remaining-QPS": str(qps_info["remaining"]), "X-RateLimit-Limit-Day": str(daily_info["limit"]), "X-RateLimit-Remaining-Day": str(daily_info["remaining"]), - "X-RateLimit-Reset": str(daily_info["reset"]) + "X-RateLimit-Reset": str(daily_info["reset"]), } + # 如果租户限速信息存在,补充到响应头 + if tenant_info: + headers["X-RateLimit-Limit-Tenant-QPS"] = str(tenant_info["limit"]) + headers["X-RateLimit-Remaining-Tenant-QPS"] = str(tenant_info["remaining"]) return True, "", headers From c448cf06602c0e72acef2cbedd78f544ffb9e0bd Mon Sep 17 00:00:00 2001 From: wwq Date: Mon, 20 Apr 2026 16:13:30 +0800 Subject: [PATCH 2/7] refactor(rate-limit): change rate limiting granularity from tenant to API Key - Refactor rate limiting mechanism to limit per API Key instead of per tenant (workspace). - Update error code logic and Redis key naming conventions. - Adjust quota usage statistics to display the QPS of the API Key closest to its limit. --- api/app/core/api_key_auth.py | 9 ++++----- api/app/core/quota_manager.py | 21 +++++++++++++++++---- api/app/services/api_key_service.py | 4 ++-- 3 files changed, 23 insertions(+), 11 deletions(-) diff --git a/api/app/core/api_key_auth.py b/api/app/core/api_key_auth.py index 297e5082..b7cacd21 100644 --- a/api/app/core/api_key_auth.py +++ b/api/app/core/api_key_auth.py @@ -106,12 +106,11 @@ def require_api_key( "error_msg": error_msg }) # 根据错误消息判断限流类型 - if "QPS" in error_msg: - code = BizCode.API_KEY_QPS_LIMIT_EXCEEDED - elif "Daily" in error_msg: + if "Daily" in error_msg: code = BizCode.API_KEY_DAILY_LIMIT_EXCEEDED - elif "Tenant" in error_msg: - code = BizCode.API_KEY_QPS_LIMIT_EXCEEDED # 租户套餐速率超限,同属 QPS 类 + elif "QPS" in error_msg or "ops rate limit" in error_msg: + # "QPS limit exceeded" 和 "API ops rate limit exceeded" 同属 QPS 类 + code = BizCode.API_KEY_QPS_LIMIT_EXCEEDED else: code = BizCode.API_KEY_QUOTA_EXCEEDED diff --git a/api/app/core/quota_manager.py b/api/app/core/quota_manager.py index 43d8fa42..28130dee 100644 --- a/api/app/core/quota_manager.py +++ b/api/app/core/quota_manager.py @@ -19,8 +19,8 @@ from app.i18n.exceptions import QuotaExceededError, InternalServerError logger = get_auth_logger() -# Redis key 格式常量,与 RateLimiterService.check_tenant_rate_limit 保持一致 -TENANT_QPS_REDIS_KEY = "rate_limit:tenant_qps:{tenant_id}" +# Redis key 格式常量,与 RateLimiterService.check_qps 保持一致(per api_key 独立计数) +API_KEY_QPS_REDIS_KEY = "rate_limit:qps:{api_key_id}" def _get_user_from_kwargs(kwargs: dict): @@ -595,9 +595,22 @@ async def get_quota_usage(db: Session, tenant_id: UUID) -> dict: api_ops_current = 0 try: from app.aioRedis import aio_redis as _aio_redis + from app.models.api_key_model import ApiKey + from app.models.workspace_model import Workspace _now = time.time() - _rk = TENANT_QPS_REDIS_KEY.format(tenant_id=tenant_id) - api_ops_current = int(await _aio_redis.zcount(_rk, _now - 1, "+inf") or 0) + # api_ops_rate_limit 限的是每个 api_key 每秒最高限额 + # 展示当前最接近触发限流的 key 的 QPS(取最大值) + api_key_ids = db.query(ApiKey.id).join( + Workspace, ApiKey.workspace_id == Workspace.id + ).filter( + Workspace.tenant_id == tenant_id, + ApiKey.is_active.is_(True) + ).all() + for (key_id,) in api_key_ids: + _rk = API_KEY_QPS_REDIS_KEY.format(api_key_id=key_id) + count = int(await _aio_redis.zcount(_rk, _now - 1, "+inf") or 0) + if count > api_ops_current: + api_ops_current = count except Exception as e: logger.warning(f"获取 api_ops_current 失败,返回 0: {type(e).__name__}: {e}") diff --git a/api/app/services/api_key_service.py b/api/app/services/api_key_service.py index 7b6b1172..4fe9d8b5 100644 --- a/api/app/services/api_key_service.py +++ b/api/app/services/api_key_service.py @@ -284,8 +284,8 @@ class RateLimiterService: async def check_tenant_rate_limit(self, window_id: uuid.UUID, limit: int) -> Tuple[bool, dict]: """ - 按 window_id(workspace_id)做 1 秒滑动窗口限速。 - 限制值来自套餐配额 api_ops_rate_limit。 + 按 window_id(api_key_id)做 1 秒滑动窗口限速。 + 限制值来自套餐配额 api_ops_rate_limit,每个 API Key 独立受此上限约束。 只有请求被允许时才计入窗口,超限请求不污染计数。 """ now = time.time() From b03300c804a8f60a5020468399b763bc041a77fb Mon Sep 17 00:00:00 2001 From: wwq Date: Mon, 20 Apr 2026 17:18:05 +0800 Subject: [PATCH 3/7] refactor(rate_limit): refactor API Key rate limiting and remove tenant-level QPS check - Streamline rate limit check flow by removing redundant tenant-level QPS checks. - Restrict checks to API Key QPS and plan degradation protection only. - Update constant naming and error message handling for consistency. --- api/app/core/api_key_auth.py | 3 +- api/app/core/quota_stub.py | 4 +- api/app/services/api_key_service.py | 83 ++++++----------------------- 3 files changed, 18 insertions(+), 72 deletions(-) diff --git a/api/app/core/api_key_auth.py b/api/app/core/api_key_auth.py index b7cacd21..1ded6f81 100644 --- a/api/app/core/api_key_auth.py +++ b/api/app/core/api_key_auth.py @@ -108,8 +108,7 @@ def require_api_key( # 根据错误消息判断限流类型 if "Daily" in error_msg: code = BizCode.API_KEY_DAILY_LIMIT_EXCEEDED - elif "QPS" in error_msg or "ops rate limit" in error_msg: - # "QPS limit exceeded" 和 "API ops rate limit exceeded" 同属 QPS 类 + elif "QPS" in error_msg: code = BizCode.API_KEY_QPS_LIMIT_EXCEEDED else: code = BizCode.API_KEY_QUOTA_EXCEEDED diff --git a/api/app/core/quota_stub.py b/api/app/core/quota_stub.py index 4a8f7cee..248d0875 100644 --- a/api/app/core/quota_stub.py +++ b/api/app/core/quota_stub.py @@ -18,7 +18,7 @@ from app.core.quota_manager import ( get_quota_usage, _check_quota, QuotaUsageRepository, - TENANT_QPS_REDIS_KEY, + API_KEY_QPS_REDIS_KEY, ) __all__ = [ @@ -34,5 +34,5 @@ __all__ = [ "get_quota_usage", "_check_quota", "QuotaUsageRepository", - "TENANT_QPS_REDIS_KEY", + "API_KEY_QPS_REDIS_KEY", ] diff --git a/api/app/services/api_key_service.py b/api/app/services/api_key_service.py index 4fe9d8b5..d710d4ee 100644 --- a/api/app/services/api_key_service.py +++ b/api/app/services/api_key_service.py @@ -282,42 +282,6 @@ class RateLimiterService: def __init__(self): self.redis = aio_redis - async def check_tenant_rate_limit(self, window_id: uuid.UUID, limit: int) -> Tuple[bool, dict]: - """ - 按 window_id(api_key_id)做 1 秒滑动窗口限速。 - 限制值来自套餐配额 api_ops_rate_limit,每个 API Key 独立受此上限约束。 - 只有请求被允许时才计入窗口,超限请求不污染计数。 - """ - now = time.time() - window_start = now - 1 # 1 秒窗口 - key = f"rate_limit:tenant_qps:{window_id}" - - async with self.redis.pipeline() as pipe: - pipe.zremrangebyscore(key, 0, window_start) - pipe.zcard(key) - results = await pipe.execute() - - current = results[1] - - if current >= limit: - return False, { - "limit": limit, - "remaining": 0, - "reset": int(now) + 1, - } - - member = f"{now}:{uuid.uuid4().hex}" - async with self.redis.pipeline() as pipe: - pipe.zadd(key, {member: now}) - pipe.expire(key, 2) - await pipe.execute() - - return True, { - "limit": limit, - "remaining": max(0, limit - current - 1), - "reset": int(now) + 1, - } - async def check_qps(self, api_key_id: uuid.UUID, limit: int) -> Tuple[bool, dict]: """ 检查 API Key 自身 QPS 限制(1 秒滑动窗口)。 @@ -337,6 +301,7 @@ class RateLimiterService: if current >= limit: return False, { "limit": limit, + "current": current, "remaining": 0, "reset": int(now) + 1, } @@ -349,6 +314,7 @@ class RateLimiterService: return True, { "limit": limit, + "current": current + 1, "remaining": max(0, limit - current - 1), "reset": int(now) + 1, } @@ -398,14 +364,9 @@ class RateLimiterService: ) -> Tuple[bool, str, dict]: """ 检查所有限制,按以下顺序: - 1. API Key 自身 QPS(每个 key 独立,不超过其配置的 rate_limit) - 2. 租户套餐 api_ops_rate_limit(每个 key 独立受套餐限额约束) + 1. API Key 自身 QPS(rate_limit 在创建时已保证不超过套餐 api_ops_rate_limit) + 2. 套餐降级保护:若套餐已降级导致 rate_limit > 套餐上限,用套餐上限二次卡控 3. API Key 日调用量 - - 设计语义: - - 租户下 N 个 API Key 可以并发调用,互不影响 - - 每个 API Key 独立受自身 rate_limit 约束 - - 每个 API Key 独立受租户套餐 api_ops_rate_limit 约束(per-key 限额) """ # 1. 检查 API Key 自身 QPS qps_ok, qps_info = await self.check_qps( @@ -419,24 +380,20 @@ class RateLimiterService: "X-RateLimit-Reset": str(qps_info["reset"]) } - # 2. 检查租户套餐 api_ops_rate_limit(运行时流量控制) - # 每个 API Key 的速率都不能超过租户套餐上限,无论 key 自身配置多少 - tenant_info = None + # 2. 套餐降级保护 + # 套餐降级后已有 api_key 的 rate_limit 可能高于新套餐上限 + # 复用第1关已计好的 current,不重复写 Redis if db is not None: try: from app.models.workspace_model import Workspace from app.core.quota_manager import get_api_ops_rate_limit - # 从 Redis 缓存取 tenant_limit,避免每次请求都查数据库 - # TTL 60 秒,套餐变更后最多 1 分钟生效 - # 用 workspace_id 作为滑动窗口维度(一个 workspace 只属于一个 tenant,语义等价) cache_key = f"tenant_api_ops_limit:{api_key.workspace_id}" cached = await self.redis.get(cache_key) if cached is not None: try: tenant_limit = int(cached) if cached != "0" else None except (ValueError, TypeError): - # 缓存数据损坏,回源查数据库 cached = None tenant_limit = None @@ -448,19 +405,14 @@ class RateLimiterService: else: tenant_limit = None - if tenant_limit: - # 用 api_key.id 作为滑动窗口 key,每个 API Key 独立受 api_ops_rate_limit 约束 - tenant_ok, tenant_info = await self.check_tenant_rate_limit( - api_key.id, tenant_limit - ) - if not tenant_ok: - return False, "API ops rate limit exceeded", { - "X-RateLimit-Limit-QPS": str(tenant_limit), - "X-RateLimit-Remaining-QPS": str(tenant_info["remaining"]), - "X-RateLimit-Reset": str(tenant_info["reset"]), - } + if tenant_limit and qps_info["current"] > tenant_limit: + return False, "QPS limit exceeded", { + "X-RateLimit-Limit-QPS": str(tenant_limit), + "X-RateLimit-Remaining-QPS": "0", + "X-RateLimit-Reset": str(qps_info["reset"]), + } except Exception as e: - logger.warning(f"租户 api_ops_rate_limit 运行时检查失败,跳过: {e}") + logger.warning(f"套餐降级保护检查失败,跳过: {e}") # 3. 检查日调用量 daily_ok, daily_info = await self.check_daily_requests( @@ -474,18 +426,13 @@ class RateLimiterService: "X-RateLimit-Reset": str(daily_info["reset"]) } - headers = { + return True, "", { "X-RateLimit-Limit-QPS": str(qps_info["limit"]), "X-RateLimit-Remaining-QPS": str(qps_info["remaining"]), "X-RateLimit-Limit-Day": str(daily_info["limit"]), "X-RateLimit-Remaining-Day": str(daily_info["remaining"]), "X-RateLimit-Reset": str(daily_info["reset"]), } - # 如果租户限速信息存在,补充到响应头 - if tenant_info: - headers["X-RateLimit-Limit-Tenant-QPS"] = str(tenant_info["limit"]) - headers["X-RateLimit-Remaining-Tenant-QPS"] = str(tenant_info["remaining"]) - return True, "", headers class ApiKeyAuthService: From 08b5c7bc8a09ac36fc20427cf795ece38dcdd99d Mon Sep 17 00:00:00 2001 From: wwq Date: Mon, 20 Apr 2026 17:46:05 +0800 Subject: [PATCH 4/7] =?UTF-8?q?perf(=E9=99=90=E6=B5=81=E6=9C=8D=E5=8A=A1):?= =?UTF-8?q?=20=E4=BC=98=E5=8C=96Redis=E6=9F=A5=E8=AF=A2=E4=BB=A5=E5=87=8F?= =?UTF-8?q?=E5=B0=91=E5=91=BD=E4=BB=A4=E6=95=B0=E9=87=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 使用zcount替代zremrangebyscore和zcard组合查询,减少一次Redis操作 --- api/app/services/api_key_service.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/api/app/services/api_key_service.py b/api/app/services/api_key_service.py index d710d4ee..c1bbbdc8 100644 --- a/api/app/services/api_key_service.py +++ b/api/app/services/api_key_service.py @@ -292,11 +292,10 @@ class RateLimiterService: key = f"rate_limit:qps:{api_key_id}" async with self.redis.pipeline() as pipe: - pipe.zremrangebyscore(key, 0, window_start) - pipe.zcard(key) + pipe.zcount(key, window_start, "+inf") results = await pipe.execute() - current = results[1] + current = results[0] if current >= limit: return False, { From 3227c25b07a805caab6d8c4c5fc71ddd7278e33c Mon Sep 17 00:00:00 2001 From: wwq Date: Mon, 20 Apr 2026 18:10:28 +0800 Subject: [PATCH 5/7] fix(quota): fix tenant ID retrieval and QPS counting logic - Fix issue where tenant ID lookup from shared records failed to query the workspace correctly. - Switch QPS counting from sliding window to simple counter to improve performance and simplify logic. - Remove unnecessary `time` module import. --- api/app/core/quota_manager.py | 9 ++-- api/app/services/api_key_service.py | 79 +++++++++++------------------ 2 files changed, 34 insertions(+), 54 deletions(-) diff --git a/api/app/core/quota_manager.py b/api/app/core/quota_manager.py index 28130dee..bf04059e 100644 --- a/api/app/core/quota_manager.py +++ b/api/app/core/quota_manager.py @@ -6,7 +6,6 @@ 2. 降级到 default_free_plan.py 配置文件(社区版兜底) """ import asyncio -import time from functools import wraps from typing import Optional, Callable, Dict, Any from uuid import UUID @@ -68,7 +67,9 @@ def _get_tenant_id_from_kwargs(db: Session, kwargs: dict): if share_record: app = db.query(App).filter(App.id == share_record.app_id, App.is_active.is_(True)).first() if app: - return app.workspace.tenant_id + workspace = db.query(Workspace).filter(Workspace.id == app.workspace_id).first() + if workspace: + return workspace.tenant_id return None @@ -597,7 +598,6 @@ async def get_quota_usage(db: Session, tenant_id: UUID) -> dict: from app.aioRedis import aio_redis as _aio_redis from app.models.api_key_model import ApiKey from app.models.workspace_model import Workspace - _now = time.time() # api_ops_rate_limit 限的是每个 api_key 每秒最高限额 # 展示当前最接近触发限流的 key 的 QPS(取最大值) api_key_ids = db.query(ApiKey.id).join( @@ -608,7 +608,8 @@ async def get_quota_usage(db: Session, tenant_id: UUID) -> dict: ).all() for (key_id,) in api_key_ids: _rk = API_KEY_QPS_REDIS_KEY.format(api_key_id=key_id) - count = int(await _aio_redis.zcount(_rk, _now - 1, "+inf") or 0) + val = await _aio_redis.get(_rk) + count = int(val) if val else 0 if count > api_ops_current: api_ops_current = count except Exception as e: diff --git a/api/app/services/api_key_service.py b/api/app/services/api_key_service.py index c1bbbdc8..e67d623e 100644 --- a/api/app/services/api_key_service.py +++ b/api/app/services/api_key_service.py @@ -283,39 +283,27 @@ class RateLimiterService: self.redis = aio_redis async def check_qps(self, api_key_id: uuid.UUID, limit: int) -> Tuple[bool, dict]: + """检查QPS限制 + + Returns: + (is_allowed, rate_limit_info) """ - 检查 API Key 自身 QPS 限制(1 秒滑动窗口)。 - 只有请求被允许时才计入窗口,超限请求不污染计数。 - """ - now = time.time() - window_start = now - 1 key = f"rate_limit:qps:{api_key_id}" async with self.redis.pipeline() as pipe: - pipe.zcount(key, window_start, "+inf") + pipe.incr(key) + pipe.expire(key, 1, nx=True) # 1 秒过期 results = await pipe.execute() current = results[0] + remaining = max(0, limit - current) + reset_time = int(time.time()) + 1 - if current >= limit: - return False, { - "limit": limit, - "current": current, - "remaining": 0, - "reset": int(now) + 1, - } - - member = f"{now}:{uuid.uuid4().hex}" - async with self.redis.pipeline() as pipe: - pipe.zadd(key, {member: now}) - pipe.expire(key, 2) - await pipe.execute() - - return True, { + return current <= limit, { "limit": limit, - "current": current + 1, - "remaining": max(0, limit - current - 1), - "reset": int(now) + 1, + "current": current, + "remaining": remaining, + "reset": reset_time, } async def check_daily_requests( @@ -363,25 +351,11 @@ class RateLimiterService: ) -> Tuple[bool, str, dict]: """ 检查所有限制,按以下顺序: - 1. API Key 自身 QPS(rate_limit 在创建时已保证不超过套餐 api_ops_rate_limit) - 2. 套餐降级保护:若套餐已降级导致 rate_limit > 套餐上限,用套餐上限二次卡控 - 3. API Key 日调用量 + 1. API Key QPS:取 api_key.rate_limit 与套餐 api_ops_rate_limit 的最小值作为限额 + 2. API Key 日调用量 """ - # 1. 检查 API Key 自身 QPS - qps_ok, qps_info = await self.check_qps( - api_key.id, - api_key.rate_limit - ) - if not qps_ok: - return False, "QPS limit exceeded", { - "X-RateLimit-Limit-QPS": str(qps_info["limit"]), - "X-RateLimit-Remaining-QPS": str(qps_info["remaining"]), - "X-RateLimit-Reset": str(qps_info["reset"]) - } - - # 2. 套餐降级保护 - # 套餐降级后已有 api_key 的 rate_limit 可能高于新套餐上限 - # 复用第1关已计好的 current,不重复写 Redis + # 1. 取套餐限额与 api_key 自身限额的最小值 + effective_limit = api_key.rate_limit if db is not None: try: from app.models.workspace_model import Workspace @@ -404,16 +378,21 @@ class RateLimiterService: else: tenant_limit = None - if tenant_limit and qps_info["current"] > tenant_limit: - return False, "QPS limit exceeded", { - "X-RateLimit-Limit-QPS": str(tenant_limit), - "X-RateLimit-Remaining-QPS": "0", - "X-RateLimit-Reset": str(qps_info["reset"]), - } + if tenant_limit: + effective_limit = min(api_key.rate_limit, tenant_limit) except Exception as e: - logger.warning(f"套餐降级保护检查失败,跳过: {e}") + logger.warning(f"获取套餐限额失败,使用 api_key 自身限额: {e}") - # 3. 检查日调用量 + # 用最终有效限额做 QPS 检查 + qps_ok, qps_info = await self.check_qps(api_key.id, effective_limit) + if not qps_ok: + return False, "QPS limit exceeded", { + "X-RateLimit-Limit-QPS": str(qps_info["limit"]), + "X-RateLimit-Remaining-QPS": str(qps_info["remaining"]), + "X-RateLimit-Reset": str(qps_info["reset"]) + } + + # 2. 检查日调用量 daily_ok, daily_info = await self.check_daily_requests( api_key.id, api_key.daily_request_limit From d59990d3260652a88d55773b47ae2e26e8a713cf Mon Sep 17 00:00:00 2001 From: wwq Date: Mon, 20 Apr 2026 18:25:39 +0800 Subject: [PATCH 6/7] fix(rate_limit): differentiate between tenant plan and API Key QPS limit errors - Add logic to detect tenant plan QPS limits and return a specific error message when triggered. - Simplify boolean check in model activation quota validation. --- api/app/core/api_key_auth.py | 2 ++ api/app/core/quota_manager.py | 4 ++-- api/app/services/api_key_service.py | 7 ++++++- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/api/app/core/api_key_auth.py b/api/app/core/api_key_auth.py index 1ded6f81..05bca945 100644 --- a/api/app/core/api_key_auth.py +++ b/api/app/core/api_key_auth.py @@ -108,6 +108,8 @@ def require_api_key( # 根据错误消息判断限流类型 if "Daily" in error_msg: code = BizCode.API_KEY_DAILY_LIMIT_EXCEEDED + elif "Tenant" in error_msg: + code = BizCode.API_KEY_QPS_LIMIT_EXCEEDED # 租户套餐速率超限,同属 QPS 类 elif "QPS" in error_msg: code = BizCode.API_KEY_QPS_LIMIT_EXCEEDED else: diff --git a/api/app/core/quota_manager.py b/api/app/core/quota_manager.py index bf04059e..534e1940 100644 --- a/api/app/core/quota_manager.py +++ b/api/app/core/quota_manager.py @@ -488,7 +488,7 @@ def check_model_activation_quota(func: Callable) -> Callable: logger.warning("模型激活配额检查失败:缺少 model_id 或 model_data 参数") return await func(*args, **kwargs) - if model_data.is_active is True: + if model_data.is_active: try: from app.services.model_service import ModelConfigService @@ -522,7 +522,7 @@ def check_model_activation_quota(func: Callable) -> Callable: logger.warning("模型激活配额检查失败:缺少 model_id 或 model_data 参数") return func(*args, **kwargs) - if model_data.is_active is True: + if model_data.is_active: try: from app.services.model_service import ModelConfigService diff --git a/api/app/services/api_key_service.py b/api/app/services/api_key_service.py index e67d623e..5595e93f 100644 --- a/api/app/services/api_key_service.py +++ b/api/app/services/api_key_service.py @@ -386,7 +386,12 @@ class RateLimiterService: # 用最终有效限额做 QPS 检查 qps_ok, qps_info = await self.check_qps(api_key.id, effective_limit) if not qps_ok: - return False, "QPS limit exceeded", { + # 判断是套餐限额触发还是 api_key 自身限额触发 + if tenant_limit and effective_limit == tenant_limit and api_key.rate_limit > tenant_limit: + error_msg = "Tenant QPS limit exceeded" + else: + error_msg = "QPS limit exceeded" + return False, error_msg, { "X-RateLimit-Limit-QPS": str(qps_info["limit"]), "X-RateLimit-Remaining-QPS": str(qps_info["remaining"]), "X-RateLimit-Reset": str(qps_info["reset"]) From 817aa78d0394d062e10423e0910acd063f3b524d Mon Sep 17 00:00:00 2001 From: wwq Date: Mon, 20 Apr 2026 18:34:18 +0800 Subject: [PATCH 7/7] fix(rate_limit): differentiate between tenant plan and API Key QPS limit errors - Add logic to detect tenant plan QPS limits and return a specific error message when triggered. - Simplify boolean check in model activation quota validation. --- api/app/services/api_key_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/app/services/api_key_service.py b/api/app/services/api_key_service.py index 5595e93f..53aad5ce 100644 --- a/api/app/services/api_key_service.py +++ b/api/app/services/api_key_service.py @@ -388,7 +388,7 @@ class RateLimiterService: if not qps_ok: # 判断是套餐限额触发还是 api_key 自身限额触发 if tenant_limit and effective_limit == tenant_limit and api_key.rate_limit > tenant_limit: - error_msg = "Tenant QPS limit exceeded" + error_msg = "Tenant limit exceeded" else: error_msg = "QPS limit exceeded" return False, error_msg, {