feat(quota): refactor quota management and rate limiting services
- Add `API_KEY_RATE_LIMIT_EXCEEDED` error code. - Refactor `QuotaExceededError` to support resource type localization. - Optimize rate limiting service by implementing the sliding window algorithm. - Add rate limit validation for tenant plans. - Unify quota check decorator to support both synchronous and asynchronous operations. - Enhance quota usage statistics endpoints.
This commit is contained in:
@@ -19,6 +19,7 @@ from app.core.exceptions import (
|
||||
)
|
||||
from app.core.error_codes import BizCode
|
||||
from app.core.logging_config import get_business_logger
|
||||
from app.i18n.exceptions import I18nException
|
||||
|
||||
logger = get_business_logger()
|
||||
|
||||
@@ -51,6 +52,22 @@ class ApiKeyService:
|
||||
if existing:
|
||||
raise BusinessException(f"API Key 名称 {data.name} 已存在", BizCode.API_KEY_DUPLICATE_NAME)
|
||||
|
||||
# 校验 rate_limit 不能超过租户套餐的 api_ops_rate_limit
|
||||
from app.models.workspace_model import Workspace
|
||||
from app.core.quota_manager import get_api_ops_rate_limit
|
||||
|
||||
workspace = db.query(Workspace).filter(Workspace.id == workspace_id).first()
|
||||
if workspace:
|
||||
tenant_api_ops_limit = get_api_ops_rate_limit(db, workspace.tenant_id)
|
||||
if tenant_api_ops_limit and data.rate_limit > tenant_api_ops_limit:
|
||||
raise I18nException(
|
||||
error_key="errors.api.api_key_rate_limit_exceeded",
|
||||
status_code=400,
|
||||
error_code="API_KEY_RATE_LIMIT_EXCEEDED",
|
||||
rate_limit=data.rate_limit,
|
||||
limit=tenant_api_ops_limit,
|
||||
)
|
||||
|
||||
# 生成 API Key
|
||||
api_key = generate_api_key(data.type)
|
||||
|
||||
@@ -152,6 +169,23 @@ class ApiKeyService:
|
||||
if existing:
|
||||
raise BusinessException(f"API Key 名称 {data.name} 已存在", BizCode.API_KEY_DUPLICATE_NAME)
|
||||
|
||||
# 校验 rate_limit 不能超过租户套餐的 api_ops_rate_limit
|
||||
if data.rate_limit is not None:
|
||||
from app.models.workspace_model import Workspace
|
||||
from app.core.quota_manager import get_api_ops_rate_limit
|
||||
|
||||
workspace = db.query(Workspace).filter(Workspace.id == workspace_id).first()
|
||||
if workspace:
|
||||
tenant_api_ops_limit = get_api_ops_rate_limit(db, workspace.tenant_id)
|
||||
if tenant_api_ops_limit and data.rate_limit > tenant_api_ops_limit:
|
||||
raise I18nException(
|
||||
error_key="errors.api.api_key_rate_limit_exceeded",
|
||||
status_code=400,
|
||||
error_code="API_KEY_RATE_LIMIT_EXCEEDED",
|
||||
rate_limit=data.rate_limit,
|
||||
limit=tenant_api_ops_limit,
|
||||
)
|
||||
|
||||
update_data = data.model_dump(exclude_unset=True)
|
||||
ApiKeyRepository.update(db, api_key_id, update_data)
|
||||
db.commit()
|
||||
@@ -248,55 +282,75 @@ class RateLimiterService:
|
||||
def __init__(self):
|
||||
self.redis = aio_redis
|
||||
|
||||
async def check_tenant_rate_limit(self, tenant_id: uuid.UUID, limit: int) -> Tuple[bool, dict]:
|
||||
async def check_tenant_rate_limit(self, window_id: uuid.UUID, limit: int) -> Tuple[bool, dict]:
|
||||
"""
|
||||
按 tenant_id 做 1 秒滑动窗口限速,限制值来自套餐配额 api_ops_rate_limit
|
||||
按 window_id(workspace_id)做 1 秒滑动窗口限速。
|
||||
限制值来自套餐配额 api_ops_rate_limit。
|
||||
只有请求被允许时才计入窗口,超限请求不污染计数。
|
||||
"""
|
||||
now = time.time()
|
||||
window_start = now - 1 # 1 秒窗口
|
||||
key = f"rate_limit:tenant_qps:{tenant_id}"
|
||||
key = f"rate_limit:tenant_qps:{window_id}"
|
||||
|
||||
async with self.redis.pipeline() as pipe:
|
||||
# 清理 1 秒前的旧记录
|
||||
pipe.zremrangebyscore(key, 0, window_start)
|
||||
# 加入当前请求(score=时间戳,member=时间戳+随机数保证唯一)
|
||||
pipe.zadd(key, {f"{now}:{uuid.uuid4().hex}": now})
|
||||
# 统计窗口内请求数
|
||||
pipe.zcard(key)
|
||||
# 设置 key 过期(2 秒后自动清理)
|
||||
pipe.expire(key, 2)
|
||||
results = await pipe.execute()
|
||||
|
||||
current = results[2]
|
||||
remaining = max(0, limit - current)
|
||||
reset_time = int(now) + 1
|
||||
current = results[1]
|
||||
|
||||
return current <= limit, {
|
||||
if current >= limit:
|
||||
return False, {
|
||||
"limit": limit,
|
||||
"remaining": 0,
|
||||
"reset": int(now) + 1,
|
||||
}
|
||||
|
||||
member = f"{now}:{uuid.uuid4().hex}"
|
||||
async with self.redis.pipeline() as pipe:
|
||||
pipe.zadd(key, {member: now})
|
||||
pipe.expire(key, 2)
|
||||
await pipe.execute()
|
||||
|
||||
return True, {
|
||||
"limit": limit,
|
||||
"remaining": remaining,
|
||||
"reset": reset_time,
|
||||
"remaining": max(0, limit - current - 1),
|
||||
"reset": int(now) + 1,
|
||||
}
|
||||
|
||||
async def check_qps(self, api_key_id: uuid.UUID, limit: int) -> Tuple[bool, dict]:
|
||||
"""
|
||||
检查QPS限制
|
||||
Returns:
|
||||
(is_allowed, rate_limit_info)
|
||||
检查 API Key 自身 QPS 限制(1 秒滑动窗口)。
|
||||
只有请求被允许时才计入窗口,超限请求不污染计数。
|
||||
"""
|
||||
now = time.time()
|
||||
window_start = now - 1
|
||||
key = f"rate_limit:qps:{api_key_id}"
|
||||
|
||||
async with self.redis.pipeline() as pipe:
|
||||
pipe.incr(key)
|
||||
pipe.expire(key, 1, nx=True) # 1 秒过期
|
||||
pipe.zremrangebyscore(key, 0, window_start)
|
||||
pipe.zcard(key)
|
||||
results = await pipe.execute()
|
||||
|
||||
current = results[0]
|
||||
remaining = max(0, limit - current)
|
||||
reset_time = int(time.time()) + 1
|
||||
current = results[1]
|
||||
|
||||
return current <= limit, {
|
||||
if current >= limit:
|
||||
return False, {
|
||||
"limit": limit,
|
||||
"remaining": 0,
|
||||
"reset": int(now) + 1,
|
||||
}
|
||||
|
||||
member = f"{now}:{uuid.uuid4().hex}"
|
||||
async with self.redis.pipeline() as pipe:
|
||||
pipe.zadd(key, {member: now})
|
||||
pipe.expire(key, 2)
|
||||
await pipe.execute()
|
||||
|
||||
return True, {
|
||||
"limit": limit,
|
||||
"remaining": remaining,
|
||||
"reset": reset_time
|
||||
"remaining": max(0, limit - current - 1),
|
||||
"reset": int(now) + 1,
|
||||
}
|
||||
|
||||
async def check_daily_requests(
|
||||
@@ -304,7 +358,9 @@ class RateLimiterService:
|
||||
api_key_id: uuid.UUID,
|
||||
limit: int
|
||||
) -> Tuple[bool, dict]:
|
||||
"""检查日调用量限制"""
|
||||
"""检查日调用量限制。
|
||||
使用原子 INCR,先写后判断,极低概率下允许轻微超限(并发场景下可接受)。
|
||||
"""
|
||||
today = datetime.now().strftime("%Y%m%d")
|
||||
key = f"rate_limit:daily:{api_key_id}:{today}"
|
||||
|
||||
@@ -313,6 +369,7 @@ class RateLimiterService:
|
||||
hour=0, minute=0, second=0, microsecond=0
|
||||
)
|
||||
expire_seconds = int((tomorrow_0 - now).total_seconds())
|
||||
reset_time = int(tomorrow_0.timestamp())
|
||||
|
||||
async with self.redis.pipeline() as pipe:
|
||||
pipe.incr(key)
|
||||
@@ -320,25 +377,37 @@ class RateLimiterService:
|
||||
results = await pipe.execute()
|
||||
|
||||
current = results[0]
|
||||
remaining = max(0, limit - current)
|
||||
reset_time = int(tomorrow_0.timestamp())
|
||||
|
||||
return current <= limit, {
|
||||
if current > limit:
|
||||
return False, {
|
||||
"limit": limit,
|
||||
"remaining": 0,
|
||||
"reset": reset_time,
|
||||
}
|
||||
|
||||
return True, {
|
||||
"limit": limit,
|
||||
"remaining": remaining,
|
||||
"reset": reset_time
|
||||
"remaining": max(0, limit - current),
|
||||
"reset": reset_time,
|
||||
}
|
||||
|
||||
async def check_all_limits(
|
||||
self,
|
||||
api_key: ApiKey
|
||||
api_key: ApiKey,
|
||||
db: Optional[Session] = None,
|
||||
) -> Tuple[bool, str, dict]:
|
||||
"""
|
||||
检查所有限制
|
||||
Returns:
|
||||
(is_allowed, error_message, rate_limit_headers)
|
||||
检查所有限制,按以下顺序:
|
||||
1. API Key 自身 QPS(每个 key 独立,不超过其配置的 rate_limit)
|
||||
2. 租户套餐 api_ops_rate_limit(每个 key 独立受套餐限额约束)
|
||||
3. API Key 日调用量
|
||||
|
||||
设计语义:
|
||||
- 租户下 N 个 API Key 可以并发调用,互不影响
|
||||
- 每个 API Key 独立受自身 rate_limit 约束
|
||||
- 每个 API Key 独立受租户套餐 api_ops_rate_limit 约束(per-key 限额)
|
||||
"""
|
||||
# Check QPS
|
||||
# 1. 检查 API Key 自身 QPS
|
||||
qps_ok, qps_info = await self.check_qps(
|
||||
api_key.id,
|
||||
api_key.rate_limit
|
||||
@@ -350,6 +419,50 @@ class RateLimiterService:
|
||||
"X-RateLimit-Reset": str(qps_info["reset"])
|
||||
}
|
||||
|
||||
# 2. 检查租户套餐 api_ops_rate_limit(运行时流量控制)
|
||||
# 每个 API Key 的速率都不能超过租户套餐上限,无论 key 自身配置多少
|
||||
tenant_info = None
|
||||
if db is not None:
|
||||
try:
|
||||
from app.models.workspace_model import Workspace
|
||||
from app.core.quota_manager import get_api_ops_rate_limit
|
||||
|
||||
# 从 Redis 缓存取 tenant_limit,避免每次请求都查数据库
|
||||
# TTL 60 秒,套餐变更后最多 1 分钟生效
|
||||
# 用 workspace_id 作为滑动窗口维度(一个 workspace 只属于一个 tenant,语义等价)
|
||||
cache_key = f"tenant_api_ops_limit:{api_key.workspace_id}"
|
||||
cached = await self.redis.get(cache_key)
|
||||
if cached is not None:
|
||||
try:
|
||||
tenant_limit = int(cached) if cached != "0" else None
|
||||
except (ValueError, TypeError):
|
||||
# 缓存数据损坏,回源查数据库
|
||||
cached = None
|
||||
tenant_limit = None
|
||||
|
||||
if cached is None:
|
||||
workspace = db.query(Workspace).filter(Workspace.id == api_key.workspace_id).first()
|
||||
if workspace:
|
||||
tenant_limit = get_api_ops_rate_limit(db, workspace.tenant_id)
|
||||
await self.redis.set(cache_key, str(tenant_limit) if tenant_limit else "0", ex=60)
|
||||
else:
|
||||
tenant_limit = None
|
||||
|
||||
if tenant_limit:
|
||||
# 用 api_key.id 作为滑动窗口 key,每个 API Key 独立受 api_ops_rate_limit 约束
|
||||
tenant_ok, tenant_info = await self.check_tenant_rate_limit(
|
||||
api_key.id, tenant_limit
|
||||
)
|
||||
if not tenant_ok:
|
||||
return False, "API ops rate limit exceeded", {
|
||||
"X-RateLimit-Limit-QPS": str(tenant_limit),
|
||||
"X-RateLimit-Remaining-QPS": str(tenant_info["remaining"]),
|
||||
"X-RateLimit-Reset": str(tenant_info["reset"]),
|
||||
}
|
||||
except Exception as e:
|
||||
logger.warning(f"租户 api_ops_rate_limit 运行时检查失败,跳过: {e}")
|
||||
|
||||
# 3. 检查日调用量
|
||||
daily_ok, daily_info = await self.check_daily_requests(
|
||||
api_key.id,
|
||||
api_key.daily_request_limit
|
||||
@@ -366,8 +479,12 @@ class RateLimiterService:
|
||||
"X-RateLimit-Remaining-QPS": str(qps_info["remaining"]),
|
||||
"X-RateLimit-Limit-Day": str(daily_info["limit"]),
|
||||
"X-RateLimit-Remaining-Day": str(daily_info["remaining"]),
|
||||
"X-RateLimit-Reset": str(daily_info["reset"])
|
||||
"X-RateLimit-Reset": str(daily_info["reset"]),
|
||||
}
|
||||
# 如果租户限速信息存在,补充到响应头
|
||||
if tenant_info:
|
||||
headers["X-RateLimit-Limit-Tenant-QPS"] = str(tenant_info["limit"])
|
||||
headers["X-RateLimit-Remaining-Tenant-QPS"] = str(tenant_info["remaining"])
|
||||
return True, "", headers
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user