Merge remote-tracking branch 'origin/release/v0.3.1' into fix/Timebomb_031

This commit is contained in:
Timebomb2018
2026-04-20 20:48:46 +08:00
23 changed files with 520 additions and 343 deletions

View File

@@ -51,6 +51,16 @@ class ApiKeyService:
if existing:
raise BusinessException(f"API Key 名称 {data.name} 已存在", BizCode.API_KEY_DUPLICATE_NAME)
# 若 rate_limit 超过租户套餐的 api_ops_rate_limit自动截断到套餐上限
from app.models.workspace_model import Workspace
from app.core.quota_manager import get_api_ops_rate_limit
workspace = db.query(Workspace).filter(Workspace.id == workspace_id).first()
if workspace:
tenant_api_ops_limit = get_api_ops_rate_limit(db, workspace.tenant_id)
if tenant_api_ops_limit and data.rate_limit > tenant_api_ops_limit:
data.rate_limit = tenant_api_ops_limit
# 生成 API Key
api_key = generate_api_key(data.type)
@@ -152,6 +162,17 @@ class ApiKeyService:
if existing:
raise BusinessException(f"API Key 名称 {data.name} 已存在", BizCode.API_KEY_DUPLICATE_NAME)
# 若 rate_limit 超过租户套餐的 api_ops_rate_limit自动截断到套餐上限
if data.rate_limit is not None:
from app.models.workspace_model import Workspace
from app.core.quota_manager import get_api_ops_rate_limit
workspace = db.query(Workspace).filter(Workspace.id == workspace_id).first()
if workspace:
tenant_api_ops_limit = get_api_ops_rate_limit(db, workspace.tenant_id)
if tenant_api_ops_limit and data.rate_limit > tenant_api_ops_limit:
data.rate_limit = tenant_api_ops_limit
update_data = data.model_dump(exclude_unset=True)
ApiKeyRepository.update(db, api_key_id, update_data)
db.commit()
@@ -248,42 +269,14 @@ class RateLimiterService:
def __init__(self):
self.redis = aio_redis
async def check_tenant_rate_limit(self, tenant_id: uuid.UUID, limit: int) -> Tuple[bool, dict]:
"""
按 tenant_id 做 1 秒滑动窗口限速,限制值来自套餐配额 api_ops_rate_limit
"""
now = time.time()
window_start = now - 1 # 1 秒窗口
key = f"rate_limit:tenant_qps:{tenant_id}"
async with self.redis.pipeline() as pipe:
# 清理 1 秒前的旧记录
pipe.zremrangebyscore(key, 0, window_start)
# 加入当前请求score=时间戳member=时间戳+随机数保证唯一)
pipe.zadd(key, {f"{now}:{uuid.uuid4().hex}": now})
# 统计窗口内请求数
pipe.zcard(key)
# 设置 key 过期2 秒后自动清理)
pipe.expire(key, 2)
results = await pipe.execute()
current = results[2]
remaining = max(0, limit - current)
reset_time = int(now) + 1
return current <= limit, {
"limit": limit,
"remaining": remaining,
"reset": reset_time,
}
async def check_qps(self, api_key_id: uuid.UUID, limit: int) -> Tuple[bool, dict]:
"""
检查QPS限制
"""检查QPS限制
Returns:
(is_allowed, rate_limit_info)
"""
key = f"rate_limit:qps:{api_key_id}"
async with self.redis.pipeline() as pipe:
pipe.incr(key)
pipe.expire(key, 1, nx=True) # 1 秒过期
@@ -295,8 +288,9 @@ class RateLimiterService:
return current <= limit, {
"limit": limit,
"current": current,
"remaining": remaining,
"reset": reset_time
"reset": reset_time,
}
async def check_daily_requests(
@@ -304,7 +298,9 @@ class RateLimiterService:
api_key_id: uuid.UUID,
limit: int
) -> Tuple[bool, dict]:
"""检查日调用量限制"""
"""检查日调用量限制
使用原子 INCR先写后判断极低概率下允许轻微超限并发场景下可接受
"""
today = datetime.now().strftime("%Y%m%d")
key = f"rate_limit:daily:{api_key_id}:{today}"
@@ -313,6 +309,7 @@ class RateLimiterService:
hour=0, minute=0, second=0, microsecond=0
)
expire_seconds = int((tomorrow_0 - now).total_seconds())
reset_time = int(tomorrow_0.timestamp())
async with self.redis.pipeline() as pipe:
pipe.incr(key)
@@ -320,36 +317,74 @@ class RateLimiterService:
results = await pipe.execute()
current = results[0]
remaining = max(0, limit - current)
reset_time = int(tomorrow_0.timestamp())
return current <= limit, {
if current > limit:
return False, {
"limit": limit,
"remaining": 0,
"reset": reset_time,
}
return True, {
"limit": limit,
"remaining": remaining,
"reset": reset_time
"remaining": max(0, limit - current),
"reset": reset_time,
}
async def check_all_limits(
self,
api_key: ApiKey
api_key: ApiKey,
db: Optional[Session] = None,
) -> Tuple[bool, str, dict]:
"""
检查所有限制
Returns:
(is_allowed, error_message, rate_limit_headers)
检查所有限制,按以下顺序:
1. API Key QPS取 api_key.rate_limit 与套餐 api_ops_rate_limit 的最小值作为限额
2. API Key 日调用量
"""
# Check QPS
qps_ok, qps_info = await self.check_qps(
api_key.id,
api_key.rate_limit
)
# 1. 取套餐限额与 api_key 自身限额的最小值
effective_limit = api_key.rate_limit
if db is not None:
try:
from app.models.workspace_model import Workspace
from app.core.quota_manager import get_api_ops_rate_limit
cache_key = f"tenant_api_ops_limit:{api_key.workspace_id}"
cached = await self.redis.get(cache_key)
if cached is not None:
try:
tenant_limit = int(cached) if cached != "0" else None
except (ValueError, TypeError):
cached = None
tenant_limit = None
if cached is None:
workspace = db.query(Workspace).filter(Workspace.id == api_key.workspace_id).first()
if workspace:
tenant_limit = get_api_ops_rate_limit(db, workspace.tenant_id)
await self.redis.set(cache_key, str(tenant_limit) if tenant_limit else "0", ex=60)
else:
tenant_limit = None
if tenant_limit:
effective_limit = min(api_key.rate_limit, tenant_limit)
except Exception as e:
logger.warning(f"获取套餐限额失败,使用 api_key 自身限额: {e}")
# 用最终有效限额做 QPS 检查
qps_ok, qps_info = await self.check_qps(api_key.id, effective_limit)
if not qps_ok:
return False, "QPS limit exceeded", {
# 判断是套餐限额触发还是 api_key 自身限额触发
if tenant_limit and effective_limit == tenant_limit and api_key.rate_limit > tenant_limit:
error_msg = "Tenant limit exceeded"
else:
error_msg = "QPS limit exceeded"
return False, error_msg, {
"X-RateLimit-Limit-QPS": str(qps_info["limit"]),
"X-RateLimit-Remaining-QPS": str(qps_info["remaining"]),
"X-RateLimit-Reset": str(qps_info["reset"])
}
# 2. 检查日调用量
daily_ok, daily_info = await self.check_daily_requests(
api_key.id,
api_key.daily_request_limit
@@ -361,14 +396,13 @@ class RateLimiterService:
"X-RateLimit-Reset": str(daily_info["reset"])
}
headers = {
return True, "", {
"X-RateLimit-Limit-QPS": str(qps_info["limit"]),
"X-RateLimit-Remaining-QPS": str(qps_info["remaining"]),
"X-RateLimit-Limit-Day": str(daily_info["limit"]),
"X-RateLimit-Remaining-Day": str(daily_info["remaining"]),
"X-RateLimit-Reset": str(daily_info["reset"])
"X-RateLimit-Reset": str(daily_info["reset"]),
}
return True, "", headers
class ApiKeyAuthService:

View File

@@ -1280,7 +1280,7 @@ def get_end_user_connected_config(end_user_id: str, db: Session) -> Dict[str, An
}
logger.info(
f"Successfully retrieved connected config: memory_config_id={memory_config_id}, workspace_id={app.workspace_id}")
f"Successfully retrieved connected config: memory_config_id={memory_config_id}, workspace_id={end_user.workspace_id}")
return result