fix(quota): fix tenant ID retrieval and QPS counting logic

- Fix issue where tenant ID lookup from shared records failed to query the workspace correctly.
- Switch QPS counting from sliding window to simple counter to improve performance and simplify logic.
- Remove unnecessary `time` module import.
This commit is contained in:
wwq
2026-04-20 18:10:28 +08:00
parent 08b5c7bc8a
commit 3227c25b07
2 changed files with 34 additions and 54 deletions

View File

@@ -6,7 +6,6 @@
2. 降级到 default_free_plan.py 配置文件(社区版兜底)
"""
import asyncio
import time
from functools import wraps
from typing import Optional, Callable, Dict, Any
from uuid import UUID
@@ -68,7 +67,9 @@ def _get_tenant_id_from_kwargs(db: Session, kwargs: dict):
if share_record:
app = db.query(App).filter(App.id == share_record.app_id, App.is_active.is_(True)).first()
if app:
return app.workspace.tenant_id
workspace = db.query(Workspace).filter(Workspace.id == app.workspace_id).first()
if workspace:
return workspace.tenant_id
return None
@@ -597,7 +598,6 @@ async def get_quota_usage(db: Session, tenant_id: UUID) -> dict:
from app.aioRedis import aio_redis as _aio_redis
from app.models.api_key_model import ApiKey
from app.models.workspace_model import Workspace
_now = time.time()
# api_ops_rate_limit 限的是每个 api_key 每秒最高限额
# 展示当前最接近触发限流的 key 的 QPS取最大值
api_key_ids = db.query(ApiKey.id).join(
@@ -608,7 +608,8 @@ async def get_quota_usage(db: Session, tenant_id: UUID) -> dict:
).all()
for (key_id,) in api_key_ids:
_rk = API_KEY_QPS_REDIS_KEY.format(api_key_id=key_id)
count = int(await _aio_redis.zcount(_rk, _now - 1, "+inf") or 0)
val = await _aio_redis.get(_rk)
count = int(val) if val else 0
if count > api_ops_current:
api_ops_current = count
except Exception as e:

View File

@@ -283,39 +283,27 @@ class RateLimiterService:
self.redis = aio_redis
async def check_qps(self, api_key_id: uuid.UUID, limit: int) -> Tuple[bool, dict]:
"""检查QPS限制
Returns:
(is_allowed, rate_limit_info)
"""
检查 API Key 自身 QPS 限制1 秒滑动窗口)。
只有请求被允许时才计入窗口,超限请求不污染计数。
"""
now = time.time()
window_start = now - 1
key = f"rate_limit:qps:{api_key_id}"
async with self.redis.pipeline() as pipe:
pipe.zcount(key, window_start, "+inf")
pipe.incr(key)
pipe.expire(key, 1, nx=True) # 1 秒过期
results = await pipe.execute()
current = results[0]
remaining = max(0, limit - current)
reset_time = int(time.time()) + 1
if current >= limit:
return False, {
"limit": limit,
"current": current,
"remaining": 0,
"reset": int(now) + 1,
}
member = f"{now}:{uuid.uuid4().hex}"
async with self.redis.pipeline() as pipe:
pipe.zadd(key, {member: now})
pipe.expire(key, 2)
await pipe.execute()
return True, {
return current <= limit, {
"limit": limit,
"current": current + 1,
"remaining": max(0, limit - current - 1),
"reset": int(now) + 1,
"current": current,
"remaining": remaining,
"reset": reset_time,
}
async def check_daily_requests(
@@ -363,25 +351,11 @@ class RateLimiterService:
) -> Tuple[bool, str, dict]:
"""
检查所有限制,按以下顺序:
1. API Key 自身 QPSrate_limit 在创建时已保证不超过套餐 api_ops_rate_limit
2. 套餐降级保护:若套餐已降级导致 rate_limit > 套餐上限,用套餐上限二次卡控
3. API Key 日调用量
1. API Key QPS取 api_key.rate_limit 套餐 api_ops_rate_limit 的最小值作为限额
2. API Key 日调用量
"""
# 1. 检查 API Key 自身 QPS
qps_ok, qps_info = await self.check_qps(
api_key.id,
api_key.rate_limit
)
if not qps_ok:
return False, "QPS limit exceeded", {
"X-RateLimit-Limit-QPS": str(qps_info["limit"]),
"X-RateLimit-Remaining-QPS": str(qps_info["remaining"]),
"X-RateLimit-Reset": str(qps_info["reset"])
}
# 2. 套餐降级保护
# 套餐降级后已有 api_key 的 rate_limit 可能高于新套餐上限
# 复用第1关已计好的 current不重复写 Redis
# 1. 取套餐限额与 api_key 自身限额的最小值
effective_limit = api_key.rate_limit
if db is not None:
try:
from app.models.workspace_model import Workspace
@@ -404,16 +378,21 @@ class RateLimiterService:
else:
tenant_limit = None
if tenant_limit and qps_info["current"] > tenant_limit:
return False, "QPS limit exceeded", {
"X-RateLimit-Limit-QPS": str(tenant_limit),
"X-RateLimit-Remaining-QPS": "0",
"X-RateLimit-Reset": str(qps_info["reset"]),
}
if tenant_limit:
effective_limit = min(api_key.rate_limit, tenant_limit)
except Exception as e:
logger.warning(f"套餐降级保护检查失败,跳过: {e}")
logger.warning(f"获取套餐限额失败,使用 api_key 自身限额: {e}")
# 3. 检查日调用量
# 用最终有效限额做 QPS 检查
qps_ok, qps_info = await self.check_qps(api_key.id, effective_limit)
if not qps_ok:
return False, "QPS limit exceeded", {
"X-RateLimit-Limit-QPS": str(qps_info["limit"]),
"X-RateLimit-Remaining-QPS": str(qps_info["remaining"]),
"X-RateLimit-Reset": str(qps_info["reset"])
}
# 2. 检查日调用量
daily_ok, daily_info = await self.check_daily_requests(
api_key.id,
api_key.daily_request_limit