diff --git a/api/app/core/quota_manager.py b/api/app/core/quota_manager.py index 28130dee..bf04059e 100644 --- a/api/app/core/quota_manager.py +++ b/api/app/core/quota_manager.py @@ -6,7 +6,6 @@ 2. 降级到 default_free_plan.py 配置文件(社区版兜底) """ import asyncio -import time from functools import wraps from typing import Optional, Callable, Dict, Any from uuid import UUID @@ -68,7 +67,9 @@ def _get_tenant_id_from_kwargs(db: Session, kwargs: dict): if share_record: app = db.query(App).filter(App.id == share_record.app_id, App.is_active.is_(True)).first() if app: - return app.workspace.tenant_id + workspace = db.query(Workspace).filter(Workspace.id == app.workspace_id).first() + if workspace: + return workspace.tenant_id return None @@ -597,7 +598,6 @@ async def get_quota_usage(db: Session, tenant_id: UUID) -> dict: from app.aioRedis import aio_redis as _aio_redis from app.models.api_key_model import ApiKey from app.models.workspace_model import Workspace - _now = time.time() # api_ops_rate_limit 限的是每个 api_key 每秒最高限额 # 展示当前最接近触发限流的 key 的 QPS(取最大值) api_key_ids = db.query(ApiKey.id).join( @@ -608,7 +608,8 @@ async def get_quota_usage(db: Session, tenant_id: UUID) -> dict: ).all() for (key_id,) in api_key_ids: _rk = API_KEY_QPS_REDIS_KEY.format(api_key_id=key_id) - count = int(await _aio_redis.zcount(_rk, _now - 1, "+inf") or 0) + val = await _aio_redis.get(_rk) + count = int(val) if val else 0 if count > api_ops_current: api_ops_current = count except Exception as e: diff --git a/api/app/services/api_key_service.py b/api/app/services/api_key_service.py index c1bbbdc8..e67d623e 100644 --- a/api/app/services/api_key_service.py +++ b/api/app/services/api_key_service.py @@ -283,39 +283,27 @@ class RateLimiterService: self.redis = aio_redis async def check_qps(self, api_key_id: uuid.UUID, limit: int) -> Tuple[bool, dict]: + """检查QPS限制 + + Returns: + (is_allowed, rate_limit_info) """ - 检查 API Key 自身 QPS 限制(1 秒滑动窗口)。 - 只有请求被允许时才计入窗口,超限请求不污染计数。 - """ - now = time.time() - window_start = now - 1 key = f"rate_limit:qps:{api_key_id}" async with self.redis.pipeline() as pipe: - pipe.zcount(key, window_start, "+inf") + pipe.incr(key) + pipe.expire(key, 1, nx=True) # 1 秒过期 results = await pipe.execute() current = results[0] + remaining = max(0, limit - current) + reset_time = int(time.time()) + 1 - if current >= limit: - return False, { - "limit": limit, - "current": current, - "remaining": 0, - "reset": int(now) + 1, - } - - member = f"{now}:{uuid.uuid4().hex}" - async with self.redis.pipeline() as pipe: - pipe.zadd(key, {member: now}) - pipe.expire(key, 2) - await pipe.execute() - - return True, { + return current <= limit, { "limit": limit, - "current": current + 1, - "remaining": max(0, limit - current - 1), - "reset": int(now) + 1, + "current": current, + "remaining": remaining, + "reset": reset_time, } async def check_daily_requests( @@ -363,25 +351,11 @@ class RateLimiterService: ) -> Tuple[bool, str, dict]: """ 检查所有限制,按以下顺序: - 1. API Key 自身 QPS(rate_limit 在创建时已保证不超过套餐 api_ops_rate_limit) - 2. 套餐降级保护:若套餐已降级导致 rate_limit > 套餐上限,用套餐上限二次卡控 - 3. API Key 日调用量 + 1. API Key QPS:取 api_key.rate_limit 与套餐 api_ops_rate_limit 的最小值作为限额 + 2. API Key 日调用量 """ - # 1. 检查 API Key 自身 QPS - qps_ok, qps_info = await self.check_qps( - api_key.id, - api_key.rate_limit - ) - if not qps_ok: - return False, "QPS limit exceeded", { - "X-RateLimit-Limit-QPS": str(qps_info["limit"]), - "X-RateLimit-Remaining-QPS": str(qps_info["remaining"]), - "X-RateLimit-Reset": str(qps_info["reset"]) - } - - # 2. 套餐降级保护 - # 套餐降级后已有 api_key 的 rate_limit 可能高于新套餐上限 - # 复用第1关已计好的 current,不重复写 Redis + # 1. 取套餐限额与 api_key 自身限额的最小值 + effective_limit = api_key.rate_limit if db is not None: try: from app.models.workspace_model import Workspace @@ -404,16 +378,21 @@ class RateLimiterService: else: tenant_limit = None - if tenant_limit and qps_info["current"] > tenant_limit: - return False, "QPS limit exceeded", { - "X-RateLimit-Limit-QPS": str(tenant_limit), - "X-RateLimit-Remaining-QPS": "0", - "X-RateLimit-Reset": str(qps_info["reset"]), - } + if tenant_limit: + effective_limit = min(api_key.rate_limit, tenant_limit) except Exception as e: - logger.warning(f"套餐降级保护检查失败,跳过: {e}") + logger.warning(f"获取套餐限额失败,使用 api_key 自身限额: {e}") - # 3. 检查日调用量 + # 用最终有效限额做 QPS 检查 + qps_ok, qps_info = await self.check_qps(api_key.id, effective_limit) + if not qps_ok: + return False, "QPS limit exceeded", { + "X-RateLimit-Limit-QPS": str(qps_info["limit"]), + "X-RateLimit-Remaining-QPS": str(qps_info["remaining"]), + "X-RateLimit-Reset": str(qps_info["reset"]) + } + + # 2. 检查日调用量 daily_ok, daily_info = await self.check_daily_requests( api_key.id, api_key.daily_request_limit