feat(quota): implement workspace-level quota enforcement and statistics

- Refactor quota management logic to support usage checks scoped by workspace.
- Update quota statistics API to return granular quota details for each workspace.
- Revise default configuration settings for terminal user and model limits.
- Remove quota check decorators from the model controller.
This commit is contained in:
wwq
2026-04-22 18:52:27 +08:00
parent 7193eed9e3
commit 4a4391a19c

View File

@@ -32,26 +32,13 @@ def _get_user_from_kwargs(kwargs: dict):
def _get_workspace_id_from_kwargs(kwargs: dict): def _get_workspace_id_from_kwargs(kwargs: dict):
"""从 kwargs 中获取 workspace_id""" """从 kwargs 中获取 workspace_id"""
# 优先从 kwargs['workspace_id'] 获取
workspace_id = kwargs.get("workspace_id") workspace_id = kwargs.get("workspace_id")
if workspace_id: if workspace_id:
logger.info(f"_get_workspace_id_from_kwargs: 从 kwargs['workspace_id'] 获取: {workspace_id}") logger.info(f"_get_workspace_id_from_kwargs: 从 kwargs['workspace_id'] 获取: {workspace_id}")
return workspace_id return workspace_id
data = kwargs.get("data") or kwargs.get("body") or kwargs.get("payload") # 从 current_user.current_workspace_id 获取(管理端接口)
if data and hasattr(data, "workspace_id"):
ws_id = data.workspace_id
if ws_id:
logger.info(f"_get_workspace_id_from_kwargs: 从 payload 获取: {ws_id}")
return ws_id
logger.info(f"_get_workspace_id_from_kwargs: payload.workspace_id 为 None继续尝试其他方式")
api_key_auth = kwargs.get("api_key_auth")
if api_key_auth and hasattr(api_key_auth, 'workspace_id'):
ws_id = api_key_auth.workspace_id
logger.info(f"_get_workspace_id_from_kwargs: 从 api_key_auth 获取: {ws_id}")
return ws_id
# 从 current_user.current_workspace_id 获取cur_workspace_access_guard 模式)
user = _get_user_from_kwargs(kwargs) user = _get_user_from_kwargs(kwargs)
logger.info(f"_get_workspace_id_from_kwargs: user={user}, type={type(user)}") logger.info(f"_get_workspace_id_from_kwargs: user={user}, type={type(user)}")
if user: if user:
@@ -60,25 +47,20 @@ def _get_workspace_id_from_kwargs(kwargs: dict):
if ws_id: if ws_id:
return ws_id return ws_id
# 如果 current_workspace_id 为空,尝试获取用户的第一个工作空间 # 从 payload/data/body 获取(仅当上述方式都失败时)
if hasattr(user, 'id'): data = kwargs.get("data") or kwargs.get("body") or kwargs.get("payload")
try: if data and hasattr(data, "workspace_id"):
from app.models.workspace_model import WorkspaceMember ws_id = data.workspace_id
db_session = kwargs.get("db") if ws_id:
logger.info(f"_get_workspace_id_from_kwargs: db_session={db_session is not None}, user.id={user.id}") logger.info(f"_get_workspace_id_from_kwargs: 从 payload 获取: {ws_id}")
if db_session: return ws_id
first_workspace = (
db_session.query(WorkspaceMember.workspace_id) # 从 api_key_auth 获取API Key 认证方式)
.filter(WorkspaceMember.user_id == user.id) api_key_auth = kwargs.get("api_key_auth")
.filter(WorkspaceMember.is_active.is_(True)) if api_key_auth and hasattr(api_key_auth, 'workspace_id'):
.first() ws_id = api_key_auth.workspace_id
) logger.info(f"_get_workspace_id_from_kwargs: 从 api_key_auth 获取: {ws_id}")
logger.info(f"_get_workspace_id_from_kwargs: first_workspace={first_workspace}") return ws_id
if first_workspace:
logger.info(f"用户 {user.username} 的 current_workspace_id 为空,使用第一个工作空间: {first_workspace.workspace_id}")
return first_workspace.workspace_id
except Exception as e:
logger.error(f"获取用户第一个工作空间失败: {e}", exc_info=True)
logger.warning(f"_get_workspace_id_from_kwargs: 无法从 kwargs 获取 workspace_id, keys={list(kwargs.keys())}") logger.warning(f"_get_workspace_id_from_kwargs: 无法从 kwargs 获取 workspace_id, keys={list(kwargs.keys())}")
return None return None