@@ -30,6 +30,24 @@ def _get_user_from_kwargs(kwargs: dict):
return None
def _get_workspace_id_from_kwargs ( kwargs : dict ) :
""" 从 kwargs 中获取 workspace_id """
# 优先从 kwargs['workspace_id'] 获取
workspace_id = kwargs . get ( " workspace_id " )
if workspace_id :
return workspace_id
# 从 user.current_workspace_id 获取
user = _get_user_from_kwargs ( kwargs )
if user :
ws_id = getattr ( user , ' current_workspace_id ' , None )
if ws_id :
return ws_id
logger . warning ( f " 无法获取 workspace_id, kwargs keys: { list ( kwargs . keys ( ) ) } " )
return None
def _get_tenant_id_from_kwargs ( db : Session , kwargs : dict ) :
""" 从 kwargs 中获取 tenant_id """
user = _get_user_from_kwargs ( kwargs )
@@ -136,15 +154,19 @@ class QuotaUsageRepository:
Workspace . is_active . is_ ( True )
) . count ( )
def count_apps ( self , tenant_id : UUID ) - > int :
def count_apps ( self , tenant_id : UUID , workspace_id : Optional [ UUID ] = None ) - > int :
from app . models . app_model import App
from app . models . workspace_model import Workspace
return self . db . query ( App ) . join (
query = self . db . query ( App ) . join (
Workspace , App . workspace_id == Workspace . id
) . filter (
Workspace . tenant_id == tenant_id ,
App . is_active . is_ ( True )
) . count ( )
)
if workspace_id :
query = query . filter ( App . workspace_id == workspace_id )
else :
query = query . filter ( Workspace . tenant_id == tenant_id )
return query . count ( )
def count_skills ( self , tenant_id : UUID ) - > int :
from app . models . skill_model import Skill
@@ -153,41 +175,50 @@ class QuotaUsageRepository:
Skill . is_active . is_ ( True )
) . count ( )
def sum_knowledge_capacity_gb ( self , tenant_id : UUID ) - > float :
def sum_knowledge_capacity_gb ( self , tenant_id : UUID , workspace_id : Optional [ UUID ] = None ) - > float :
from app . models . document_model import Document
from app . models . knowledge_model import Knowledge
from app . models . workspace_model import Workspace
result = self . db . query ( func . coalesce ( func . sum ( Document . file_size ) , 0 ) ) . join (
query = self . db . query ( func . coalesce ( func . sum ( Document . file_size ) , 0 ) ) . join (
Knowledge , Document . kb_id == Knowledge . id
) . join (
Workspace , Knowledge . workspace_id == Workspace . id
) . filter (
Workspace . tenant_id == tenant_id ,
Document . status == 1 ,
) . scalar ( )
)
if workspace_id :
query = query . filter ( Knowledge . workspace_id == workspace_id )
else :
query = query . filter ( Workspace . tenant_id == tenant_id )
result = query . scalar ( )
return float ( result ) / ( 1024 * * 3 ) if result else 0.0
def count_memory_engines ( self , tenant_id : UUID ) - > int :
def count_memory_engines ( self , tenant_id : UUID , workspace_id : Optional [ UUID ] = None ) - > int :
from app . models . memory_config_model import MemoryConfig
from app . models . workspace_model import Workspace
return self . db . query ( MemoryConfig ) . join (
query = self . db . query ( MemoryConfig ) . join (
Workspace , MemoryConfig . workspace_id == Workspace . id
) . filter (
W orkspace. tenant_id == tenant _id
) . count ( )
)
if w orkspace_id:
query = query . filter ( MemoryConfig . workspace_id == workspace_id )
else :
query = query . filter ( Workspace . tenant_id == tenant_id )
return query . count ( )
def count_end_users ( self , tenant_id : UUID ) - > int :
def count_end_users ( self , tenant_id : UUID , workspace_id : Optional [ UUID ] = None ) - > int :
from app . models . end_user_model import EndUser
from app . models . workspace_model import Workspace
from app . models . user_model import User
query = self . db . query ( EndUser ) . join (
Workspace , EndUser . workspace_id == Workspace . id
)
if workspace_id :
query = query . filter ( EndUser . workspace_id == workspace_id )
else :
query = query . filter ( Workspace . tenant_id == tenant_id )
trial_user_ids = [
str ( u . id ) for u in self . db . query ( User . id ) . filter ( User . tenant_id == tenant_id ) . all ( )
]
query = self . db . query ( EndUser ) . join (
Workspace , EndUser . workspace_id == Workspace . id
) . filter (
Workspace . tenant_id == tenant_id
)
if trial_user_ids :
query = query . filter ( ~ EndUser . other_id . in_ ( trial_user_ids ) )
return query . count ( )
@@ -196,19 +227,24 @@ class QuotaUsageRepository:
from app . models . models_model import ModelConfig
return self . db . query ( ModelConfig ) . filter (
ModelConfig . tenant_id == tenant_id ,
ModelConfig . is_active == True
ModelConfig . is_active == True ,
ModelConfig . is_composite == True
) . count ( )
def count_ontology_projects ( self , tenant_id : UUID ) - > int :
def count_ontology_projects ( self , tenant_id : UUID , workspace_id : Optional [ UUID ] = None ) - > int :
from app . models . ontology_scene import OntologyScene
from app . models . workspace_model import Workspace
if workspace_id :
return self . db . query ( OntologyScene ) . filter (
OntologyScene . workspace_id == workspace_id
) . count ( )
return self . db . query ( OntologyScene ) . join (
Workspace , OntologyScene . workspace_id == Workspace . id
) . filter (
Workspace . tenant_id == tenant_id
) . count ( )
def get_usage_by_quota_type ( self , tenant_id : UUID , quota_type : str ) :
def get_usage_by_quota_type ( self , tenant_id : UUID , quota_type : str , workspace_id : Optional [ UUID ] = None ):
""" 按配额类型分发,返回当前使用量 """
dispatch = {
" workspace_quota " : self . count_workspaces ,
@@ -221,6 +257,8 @@ class QuotaUsageRepository:
" ontology_project_quota " : self . count_ontology_projects ,
}
fn = dispatch . get ( quota_type )
if workspace_id :
return fn ( tenant_id , workspace_id ) if fn else 0
return fn ( tenant_id ) if fn else 0
@@ -230,6 +268,7 @@ def _check_quota(
quota_type : str ,
resource_name : str ,
usage_func : Optional [ Callable ] = None ,
workspace_id : Optional [ UUID ] = None ,
) - > None :
""" 核心配额检查逻辑:对比使用量和配额限制 """
try :
@@ -244,13 +283,13 @@ def _check_quota(
return
if usage_func :
current_usage = usage_func ( db , tenant_id )
current_usage = usage_func ( db , tenant_id , workspace_id ) if workspace_id else usage_func ( db , tenant_id )
else :
current_usage = QuotaUsageRepository ( db ) . get_usage_by_quota_type ( tenant_id , quota_type )
current_usage = QuotaUsageRepository ( db ) . get_usage_by_quota_type ( tenant_id , quota_type , workspace_id )
if current_usage > = quota_limit :
logger . warning (
f " 配额不足: tenant= { tenant_id } , type= { quota_type } , "
f " 配额不足: tenant= { tenant_id } , workspace= { workspace_id } , type= { quota_type } , "
f " usage= { current_usage } , limit= { quota_limit } "
)
raise QuotaExceededError (
@@ -260,7 +299,7 @@ def _check_quota(
)
logger . debug (
f " 配额检查通过: tenant= { tenant_id } , type= { quota_type } , "
f " 配额检查通过: tenant= { tenant_id } , workspace= { workspace_id } , type= { quota_type } , "
f " usage= { current_usage } , limit= { quota_limit } "
)
@@ -268,7 +307,7 @@ def _check_quota(
raise
except Exception as e :
logger . error (
f " 配额检查异常: tenant= { tenant_id } , type= { quota_type } , "
f " 配额检查异常: tenant= { tenant_id } , workspace= { workspace_id } , type= { quota_type } , "
f " error_type= { type ( e ) . __name__ } , error= { str ( e ) } " ,
exc_info = True ,
)
@@ -333,7 +372,11 @@ def check_app_quota(func: Callable) -> Callable:
if not db or not user :
logger . error ( f " 配额检查失败: { func . __name__ } 缺少 db 或 user 参数,拒绝请求 " )
raise InternalServerError ( )
_check_quota ( db , user . tenant_id , " app_quota " , " app " )
workspace_id = _get_workspace_id_from_kwargs ( kwargs )
if not workspace_id :
logger . error ( f " 配额检查失败: { func . __name__ } 无法获取 workspace_id, 拒绝请求 " )
raise InternalServerError ( )
_check_quota ( db , user . tenant_id , " app_quota " , " app " , workspace_id = workspace_id )
return await func ( * args , * * kwargs )
@wraps ( func )
@@ -343,7 +386,11 @@ def check_app_quota(func: Callable) -> Callable:
if not db or not user :
logger . error ( f " 配额检查失败: { func . __name__ } 缺少 db 或 user 参数,拒绝请求 " )
raise InternalServerError ( )
_check_quota ( db , user . tenant_id , " app_quota " , " app " )
workspace_id = _get_workspace_id_from_kwargs ( kwargs )
if not workspace_id :
logger . error ( f " 配额检查失败: { func . __name__ } 无法获取 workspace_id, 拒绝请求 " )
raise InternalServerError ( )
_check_quota ( db , user . tenant_id , " app_quota " , " app " , workspace_id = workspace_id )
return func ( * args , * * kwargs )
return async_wrapper if asyncio . iscoroutinefunction ( func ) else sync_wrapper
@@ -360,7 +407,11 @@ def check_knowledge_capacity_quota(func: Callable) -> Callable:
if not tenant_id :
logger . error ( f " 配额检查失败: { func . __name__ } 无法获取 tenant_id, 拒绝请求 " )
raise InternalServerError ( )
_check_quota ( db , tenant_id , " knowledge_capacity_quota " , " knowledge_capacity " )
workspace_id = _get_workspace_id_from_kwargs ( kwargs )
if not workspace_id :
logger . error ( f " 配额检查失败: { func . __name__ } 无法获取 workspace_id, 拒绝请求 " )
raise InternalServerError ( )
_check_quota ( db , tenant_id , " knowledge_capacity_quota " , " knowledge_capacity " , workspace_id = workspace_id )
return await func ( * args , * * kwargs )
@wraps ( func )
@@ -370,7 +421,11 @@ def check_knowledge_capacity_quota(func: Callable) -> Callable:
if not db or not user :
logger . error ( f " 配额检查失败: { func . __name__ } 缺少 db 或 user 参数,拒绝请求 " )
raise InternalServerError ( )
_check_quota ( db , user . tenant_id , " knowledge_capacity_quota " , " knowledge_capacity " )
workspace_id = _get_workspace_id_from_kwargs ( kwargs )
if not workspace_id :
logger . error ( f " 配额检查失败: { func . __name__ } 无法获取 workspace_id, 拒绝请求 " )
raise InternalServerError ( )
_check_quota ( db , user . tenant_id , " knowledge_capacity_quota " , " knowledge_capacity " , workspace_id = workspace_id )
return func ( * args , * * kwargs )
return async_wrapper if asyncio . iscoroutinefunction ( func ) else sync_wrapper
@@ -381,20 +436,30 @@ def check_memory_engine_quota(func: Callable) -> Callable:
async def async_wrapper ( * args , * * kwargs ) :
db : Session = kwargs . get ( " db " )
user = _get_user_from_kwargs ( kwargs )
logger . debug ( f " check_memory_engine_quota async_wrapper: db= { db is not None } , user= { user } , kwargs_keys= { list ( kwargs . keys ( ) ) } " )
if not db or not user :
logger . error ( f " 配额检查失败: { func . __name__ } 缺少 db 或 user 参数,拒绝请求 " )
raise InternalServerError ( )
_check_quota ( db , user . tenant_id , " memory_engine_quota " , " memory_engine " )
workspace_id = _get_workspace_id_from_kwargs ( kwargs )
if not workspace_id :
logger . error ( f " 配额检查失败: { func . __name__ } 无法获取 workspace_id, 拒绝请求 " )
raise InternalServerError ( )
_check_quota ( db , user . tenant_id , " memory_engine_quota " , " memory_engine " , workspace_id = workspace_id )
return await func ( * args , * * kwargs )
@wraps ( func )
def sync_wrapper ( * args , * * kwargs ) :
db : Session = kwargs . get ( " db " )
user = _get_user_from_kwargs ( kwargs )
logger . debug ( f " check_memory_engine_quota sync_wrapper: db= { db is not None } , user= { user } , kwargs_keys= { list ( kwargs . keys ( ) ) } " )
if not db or not user :
logger . error ( f " 配额检查失败: { func . __name__ } 缺少 db 或 user 参数,拒绝请求 " )
raise InternalServerError ( )
_check_quota ( db , user . tenant_id , " memory_engine_quota " , " memory_engine " )
workspace_id = _get_workspace_id_from_kwargs ( kwargs )
if not workspace_id :
logger . error ( f " 配额检查失败: { func . __name__ } 无法获取 workspace_id, 拒绝请求 " )
raise InternalServerError ( )
_check_quota ( db , user . tenant_id , " memory_engine_quota " , " memory_engine " , workspace_id = workspace_id )
return func ( * args , * * kwargs )
return async_wrapper if asyncio . iscoroutinefunction ( func ) else sync_wrapper
@@ -411,7 +476,11 @@ def check_end_user_quota(func: Callable) -> Callable:
if not tenant_id :
logger . error ( f " 配额检查失败: { func . __name__ } 无法获取 tenant_id, 拒绝请求 " )
raise InternalServerError ( )
_check_quota ( db , tenant_id , " end_user_quota " , " end_user " )
workspace_id = _get_workspace_id_from_kwargs ( kwargs )
if not workspace_id :
logger . error ( f " 配额检查失败: { func . __name__ } 无法获取 workspace_id, 拒绝请求 " )
raise InternalServerError ( )
_check_quota ( db , tenant_id , " end_user_quota " , " end_user " , workspace_id = workspace_id )
return await func ( * args , * * kwargs )
@wraps ( func )
@@ -424,7 +493,11 @@ def check_end_user_quota(func: Callable) -> Callable:
if not tenant_id :
logger . error ( f " 配额检查失败: { func . __name__ } 无法获取 tenant_id, 拒绝请求 " )
raise InternalServerError ( )
_check_quota ( db , tenant_id , " end_user_quota " , " end_user " )
workspace_id = _get_workspace_id_from_kwargs ( kwargs )
if not workspace_id :
logger . error ( f " 配额检查失败: { func . __name__ } 无法获取 workspace_id, 拒绝请求 " )
raise InternalServerError ( )
_check_quota ( db , tenant_id , " end_user_quota " , " end_user " , workspace_id = workspace_id )
return func ( * args , * * kwargs )
return async_wrapper if asyncio . iscoroutinefunction ( func ) else sync_wrapper
@@ -438,7 +511,11 @@ def check_ontology_project_quota(func: Callable) -> Callable:
if not db or not user :
logger . error ( f " 配额检查失败: { func . __name__ } 缺少 db 或 user 参数,拒绝请求 " )
raise InternalServerError ( )
_check_quota ( db , user . tenant_id , " ontology_project_quota " , " ontology_project " )
workspace_id = _get_workspace_id_from_kwargs ( kwargs )
if not workspace_id :
logger . error ( f " 配额检查失败: { func . __name__ } 无法获取 workspace_id, 拒绝请求 " )
raise InternalServerError ( )
_check_quota ( db , user . tenant_id , " ontology_project_quota " , " ontology_project " , workspace_id = workspace_id )
return await func ( * args , * * kwargs )
@wraps ( func )
@@ -448,7 +525,11 @@ def check_ontology_project_quota(func: Callable) -> Callable:
if not db or not user :
logger . error ( f " 配额检查失败: { func . __name__ } 缺少 db 或 user 参数,拒绝请求 " )
raise InternalServerError ( )
_check_quota ( db , user . tenant_id , " ontology_project_quota " , " ontology_project " )
workspace_id = _get_workspace_id_from_kwargs ( kwargs )
if not workspace_id :
logger . error ( f " 配额检查失败: { func . __name__ } 无法获取 workspace_id, 拒绝请求 " )
raise InternalServerError ( )
_check_quota ( db , user . tenant_id , " ontology_project_quota " , " ontology_project " , workspace_id = workspace_id )
return func ( * args , * * kwargs )
return async_wrapper if asyncio . iscoroutinefunction ( func ) else sync_wrapper
@@ -581,7 +662,14 @@ def check_quota(quota_type: str, resource_name: str, usage_func: Optional[Callab
# ─── 配额使用统计 ────────────────────────────────────────────────────────────
async def get_quota_usage ( db : Session , tenant_id : UUID ) - > dict :
""" 获取租户所有配额的使用情况 """
""" 获取租户所有配额的使用情况
对于 workspace 级别的配额( app/knowledge_capacity/memory_engine/end_user) :
- used: 租户汇总(所有空间加总)
- limit: quota × 活跃工作区数(有效总限额,使汇总数据自洽)
- per_workspace: 各空间明细,包含 workspace_id、workspace_name、used、limit、percentage
- 配额检查逻辑不变:仍按单个空间独立检查
"""
quota_config = _get_quota_config ( db , tenant_id )
if not quota_config :
return { }
@@ -600,11 +688,48 @@ async def get_quota_usage(db: Session, tenant_id: UUID) -> dict:
model_count = repo . count_models ( tenant_id )
ontology_count = repo . count_ontology_projects ( tenant_id )
# 获取租户下所有活跃工作区,用于按空间拆分明细
from app . models . workspace_model import Workspace
active_workspaces = db . query ( Workspace ) . filter (
Workspace . tenant_id == tenant_id ,
Workspace . is_active . is_ ( True )
) . all ( )
# 构建各空间的 workspace 级配额明细
def _build_per_workspace_detail ( count_func , per_unit_limit ) :
""" 为 workspace 级配额构建 per_workspace 明细列表 """
if not per_unit_limit or not active_workspaces :
return [ ]
details = [ ]
for ws in active_workspaces :
ws_used = count_func ( tenant_id , ws . id )
details . append ( {
" workspace_id " : str ( ws . id ) ,
" workspace_name " : ws . name ,
" used " : ws_used ,
" limit " : per_unit_limit ,
" percentage " : pct ( ws_used , per_unit_limit ) ,
} )
return details
# workspace 级配额的每空间限额
app_quota_per_ws = quota_config . get ( " app_quota " )
knowledge_quota_per_ws = quota_config . get ( " knowledge_capacity_quota " )
memory_quota_per_ws = quota_config . get ( " memory_engine_quota " )
end_user_quota_per_ws = quota_config . get ( " end_user_quota " )
ontology_quota_per_ws = quota_config . get ( " ontology_project_quota " )
# workspace 级配额的有效总限额 = 每空间限额 × 活跃工作区数
app_effective_limit = app_quota_per_ws * workspace_count if app_quota_per_ws is not None and workspace_count > 0 else app_quota_per_ws
knowledge_effective_limit = knowledge_quota_per_ws * workspace_count if knowledge_quota_per_ws is not None and workspace_count > 0 else knowledge_quota_per_ws
memory_effective_limit = memory_quota_per_ws * workspace_count if memory_quota_per_ws is not None and workspace_count > 0 else memory_quota_per_ws
end_user_effective_limit = end_user_quota_per_ws * workspace_count if end_user_quota_per_ws is not None and workspace_count > 0 else end_user_quota_per_ws
ontology_effective_limit = ontology_quota_per_ws * workspace_count if ontology_quota_per_ws is not None and workspace_count > 0 else ontology_quota_per_ws
api_ops_current = 0
try :
from app . aioRedis import aio_redis as _aio_redis
from app . models . api_key_model import ApiKey
from app . models . workspace_model import Workspace
# api_ops_rate_limit 限的是每个 api_key 每秒最高限额
# 展示当前最接近触发限流的 key 的 QPS( 取最大值)
api_key_ids = db . query ( ApiKey . id ) . join (
@@ -625,11 +750,37 @@ async def get_quota_usage(db: Session, tenant_id: UUID) -> dict:
return {
" workspace " : { " used " : workspace_count , " limit " : quota_config . get ( " workspace_quota " ) , " percentage " : pct ( workspace_count , quota_config . get ( " workspace_quota " ) ) } ,
" skill " : { " used " : skill_count , " limit " : quota_config . get ( " skill_quota " ) , " percentage " : pct ( skill_count , quota_config . get ( " skill_quota " ) ) } ,
" app " : { " used " : app_count , " limit " : quota_config . get ( " app_quota " ) , " percentage " : pct ( app_count , quota_config . get ( " app_quota " ) ) } ,
" knowledge_capacity " : { " used " : round ( knowledge_gb , 2 ) , " limit " : quota_config . get ( " knowledge_capacity_quota " ) , " percentage " : pct ( knowledge_gb , quota_config . get ( " knowledge_capacity_quota " ) ) , " unit " : " GB " } ,
" memory_engine " : { " used " : memory_count , " limit " : quota_config . get ( " memory_engine_quota " ) , " percentage " : pct ( memory_count , quota_config . get ( " memory_engine_quota " ) ) } ,
" end_user " : { " used " : end_user_count , " limit " : quota_config . get ( " end_user_quota " ) , " percentage " : pct ( end_user _count, quota_config . get ( " end_user_quota " ) ) } ,
" ontology_project " : { " used " : ontology_count , " limit " : quota_config . get ( " ontology_project_quota " ) , " percentage " : pct ( ontology_count , quota_config . get ( " ontology_project_quota " ) ) } ,
" app " : {
" used " : app_count ,
" limit " : app_effective_limit ,
" percentage " : pct ( app _count, app_effective_limit ) ,
" per_workspace " : _build_per_workspace_detail ( repo . count_apps , app_ quota_per_ws ) ,
} ,
" knowledge_capacity " : {
" used " : round ( knowledge_gb , 2 ) ,
" limit " : knowledge_effective_limit ,
" percentage " : pct ( knowledge_gb , knowledge_effective_limit ) ,
" unit " : " GB " ,
" per_workspace " : _build_per_workspace_detail ( repo . sum_knowledge_capacity_gb , knowledge_quota_per_ws ) ,
} ,
" memory_engine " : {
" used " : memory_count ,
" limit " : memory_effective_limit ,
" percentage " : pct ( memory_count , memory_effective_limit ) ,
" per_workspace " : _build_per_workspace_detail ( repo . count_memory_engines , memory_quota_per_ws ) ,
} ,
" end_user " : {
" used " : end_user_count ,
" limit " : end_user_effective_limit ,
" percentage " : pct ( end_user_count , end_user_effective_limit ) ,
" per_workspace " : _build_per_workspace_detail ( repo . count_end_users , end_user_quota_per_ws ) ,
} ,
" ontology_project " : {
" used " : ontology_count ,
" limit " : ontology_effective_limit ,
" percentage " : pct ( ontology_count , ontology_effective_limit ) ,
" per_workspace " : _build_per_workspace_detail ( repo . count_ontology_projects , ontology_quota_per_ws ) ,
} ,
" model " : { " used " : model_count , " limit " : quota_config . get ( " model_quota " ) , " percentage " : pct ( model_count , quota_config . get ( " model_quota " ) ) } ,
" api_ops_rate_limit " : { " current " : api_ops_current , " limit " : quota_config . get ( " api_ops_rate_limit " ) , " percentage " : None , " unit " : " 次/秒 " } ,
}