refactor(rate_limit): refactor API Key rate limiting and remove tenant-level QPS check

- Streamline rate limit check flow by removing redundant tenant-level QPS checks.
- Restrict checks to API Key QPS and plan degradation protection only.
- Update constant naming and error message handling for consistency.
This commit is contained in:
wwq
2026-04-20 17:18:05 +08:00
parent c448cf0660
commit b03300c804
3 changed files with 18 additions and 72 deletions

View File

@@ -108,8 +108,7 @@ def require_api_key(
# 根据错误消息判断限流类型 # 根据错误消息判断限流类型
if "Daily" in error_msg: if "Daily" in error_msg:
code = BizCode.API_KEY_DAILY_LIMIT_EXCEEDED code = BizCode.API_KEY_DAILY_LIMIT_EXCEEDED
elif "QPS" in error_msg or "ops rate limit" in error_msg: elif "QPS" in error_msg:
# "QPS limit exceeded" 和 "API ops rate limit exceeded" 同属 QPS 类
code = BizCode.API_KEY_QPS_LIMIT_EXCEEDED code = BizCode.API_KEY_QPS_LIMIT_EXCEEDED
else: else:
code = BizCode.API_KEY_QUOTA_EXCEEDED code = BizCode.API_KEY_QUOTA_EXCEEDED

View File

@@ -18,7 +18,7 @@ from app.core.quota_manager import (
get_quota_usage, get_quota_usage,
_check_quota, _check_quota,
QuotaUsageRepository, QuotaUsageRepository,
TENANT_QPS_REDIS_KEY, API_KEY_QPS_REDIS_KEY,
) )
__all__ = [ __all__ = [
@@ -34,5 +34,5 @@ __all__ = [
"get_quota_usage", "get_quota_usage",
"_check_quota", "_check_quota",
"QuotaUsageRepository", "QuotaUsageRepository",
"TENANT_QPS_REDIS_KEY", "API_KEY_QPS_REDIS_KEY",
] ]

View File

@@ -282,42 +282,6 @@ class RateLimiterService:
def __init__(self): def __init__(self):
self.redis = aio_redis self.redis = aio_redis
async def check_tenant_rate_limit(self, window_id: uuid.UUID, limit: int) -> Tuple[bool, dict]:
"""
按 window_idapi_key_id做 1 秒滑动窗口限速。
限制值来自套餐配额 api_ops_rate_limit每个 API Key 独立受此上限约束。
只有请求被允许时才计入窗口,超限请求不污染计数。
"""
now = time.time()
window_start = now - 1 # 1 秒窗口
key = f"rate_limit:tenant_qps:{window_id}"
async with self.redis.pipeline() as pipe:
pipe.zremrangebyscore(key, 0, window_start)
pipe.zcard(key)
results = await pipe.execute()
current = results[1]
if current >= limit:
return False, {
"limit": limit,
"remaining": 0,
"reset": int(now) + 1,
}
member = f"{now}:{uuid.uuid4().hex}"
async with self.redis.pipeline() as pipe:
pipe.zadd(key, {member: now})
pipe.expire(key, 2)
await pipe.execute()
return True, {
"limit": limit,
"remaining": max(0, limit - current - 1),
"reset": int(now) + 1,
}
async def check_qps(self, api_key_id: uuid.UUID, limit: int) -> Tuple[bool, dict]: async def check_qps(self, api_key_id: uuid.UUID, limit: int) -> Tuple[bool, dict]:
""" """
检查 API Key 自身 QPS 限制1 秒滑动窗口)。 检查 API Key 自身 QPS 限制1 秒滑动窗口)。
@@ -337,6 +301,7 @@ class RateLimiterService:
if current >= limit: if current >= limit:
return False, { return False, {
"limit": limit, "limit": limit,
"current": current,
"remaining": 0, "remaining": 0,
"reset": int(now) + 1, "reset": int(now) + 1,
} }
@@ -349,6 +314,7 @@ class RateLimiterService:
return True, { return True, {
"limit": limit, "limit": limit,
"current": current + 1,
"remaining": max(0, limit - current - 1), "remaining": max(0, limit - current - 1),
"reset": int(now) + 1, "reset": int(now) + 1,
} }
@@ -398,14 +364,9 @@ class RateLimiterService:
) -> Tuple[bool, str, dict]: ) -> Tuple[bool, str, dict]:
""" """
检查所有限制,按以下顺序: 检查所有限制,按以下顺序:
1. API Key 自身 QPS每个 key 独立,不超过其配置的 rate_limit 1. API Key 自身 QPSrate_limit 在创建时已保证不超过套餐 api_ops_rate_limit
2. 租户套餐 api_ops_rate_limit每个 key 独立受套餐限额约束) 2. 套餐降级保护:若套餐已降级导致 rate_limit > 套餐上限,用套餐上限二次卡控
3. API Key 日调用量 3. API Key 日调用量
设计语义:
- 租户下 N 个 API Key 可以并发调用,互不影响
- 每个 API Key 独立受自身 rate_limit 约束
- 每个 API Key 独立受租户套餐 api_ops_rate_limit 约束per-key 限额)
""" """
# 1. 检查 API Key 自身 QPS # 1. 检查 API Key 自身 QPS
qps_ok, qps_info = await self.check_qps( qps_ok, qps_info = await self.check_qps(
@@ -419,24 +380,20 @@ class RateLimiterService:
"X-RateLimit-Reset": str(qps_info["reset"]) "X-RateLimit-Reset": str(qps_info["reset"])
} }
# 2. 检查租户套餐 api_ops_rate_limit运行时流量控制 # 2. 套餐降级保护
# 每个 API Key 的速率都不能超过租户套餐上限,无论 key 自身配置多少 # 套餐降级后已有 api_key 的 rate_limit 可能高于新套餐上限
tenant_info = None # 复用第1关已计好的 current不重复写 Redis
if db is not None: if db is not None:
try: try:
from app.models.workspace_model import Workspace from app.models.workspace_model import Workspace
from app.core.quota_manager import get_api_ops_rate_limit from app.core.quota_manager import get_api_ops_rate_limit
# 从 Redis 缓存取 tenant_limit避免每次请求都查数据库
# TTL 60 秒,套餐变更后最多 1 分钟生效
# 用 workspace_id 作为滑动窗口维度(一个 workspace 只属于一个 tenant语义等价
cache_key = f"tenant_api_ops_limit:{api_key.workspace_id}" cache_key = f"tenant_api_ops_limit:{api_key.workspace_id}"
cached = await self.redis.get(cache_key) cached = await self.redis.get(cache_key)
if cached is not None: if cached is not None:
try: try:
tenant_limit = int(cached) if cached != "0" else None tenant_limit = int(cached) if cached != "0" else None
except (ValueError, TypeError): except (ValueError, TypeError):
# 缓存数据损坏,回源查数据库
cached = None cached = None
tenant_limit = None tenant_limit = None
@@ -448,19 +405,14 @@ class RateLimiterService:
else: else:
tenant_limit = None tenant_limit = None
if tenant_limit: if tenant_limit and qps_info["current"] > tenant_limit:
# 用 api_key.id 作为滑动窗口 key每个 API Key 独立受 api_ops_rate_limit 约束 return False, "QPS limit exceeded", {
tenant_ok, tenant_info = await self.check_tenant_rate_limit( "X-RateLimit-Limit-QPS": str(tenant_limit),
api_key.id, tenant_limit "X-RateLimit-Remaining-QPS": "0",
) "X-RateLimit-Reset": str(qps_info["reset"]),
if not tenant_ok: }
return False, "API ops rate limit exceeded", {
"X-RateLimit-Limit-QPS": str(tenant_limit),
"X-RateLimit-Remaining-QPS": str(tenant_info["remaining"]),
"X-RateLimit-Reset": str(tenant_info["reset"]),
}
except Exception as e: except Exception as e:
logger.warning(f"租户 api_ops_rate_limit 运行时检查失败,跳过: {e}") logger.warning(f"套餐降级保护检查失败,跳过: {e}")
# 3. 检查日调用量 # 3. 检查日调用量
daily_ok, daily_info = await self.check_daily_requests( daily_ok, daily_info = await self.check_daily_requests(
@@ -474,18 +426,13 @@ class RateLimiterService:
"X-RateLimit-Reset": str(daily_info["reset"]) "X-RateLimit-Reset": str(daily_info["reset"])
} }
headers = { return True, "", {
"X-RateLimit-Limit-QPS": str(qps_info["limit"]), "X-RateLimit-Limit-QPS": str(qps_info["limit"]),
"X-RateLimit-Remaining-QPS": str(qps_info["remaining"]), "X-RateLimit-Remaining-QPS": str(qps_info["remaining"]),
"X-RateLimit-Limit-Day": str(daily_info["limit"]), "X-RateLimit-Limit-Day": str(daily_info["limit"]),
"X-RateLimit-Remaining-Day": str(daily_info["remaining"]), "X-RateLimit-Remaining-Day": str(daily_info["remaining"]),
"X-RateLimit-Reset": str(daily_info["reset"]), "X-RateLimit-Reset": str(daily_info["reset"]),
} }
# 如果租户限速信息存在,补充到响应头
if tenant_info:
headers["X-RateLimit-Limit-Tenant-QPS"] = str(tenant_info["limit"])
headers["X-RateLimit-Remaining-Tenant-QPS"] = str(tenant_info["remaining"])
return True, "", headers
class ApiKeyAuthService: class ApiKeyAuthService: