Files
MemoryBear/api/app/schemas/api_key_schema.py

223 lines
7.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""API Key Schema"""
import datetime
import uuid
from pydantic import BaseModel, Field, ConfigDict, field_validator, field_serializer, computed_field
from typing import Optional, List
from app.models.api_key_model import ApiKeyType
from app.core.api_key_utils import timestamp_to_datetime, datetime_to_timestamp
class ApiKeyCreate(BaseModel):
"""创建 API Key"""
name: str = Field(..., description="API Key 名称", max_length=255)
description: Optional[str] = Field(None, description="描述")
type: ApiKeyType = Field(..., description="API Key 类型")
scopes: List[str] = Field(default_factory=list, description="权限范围列表")
resource_id: Optional[uuid.UUID] = Field(None, description="关联资源ID")
rate_limit: Optional[int] = Field(50, ge=1, le=1000, description="QPS限制请求/秒)")
daily_request_limit: Optional[int] = Field(100000, description="日请求限制", ge=1)
quota_limit: Optional[int] = Field(None, description="配额限制(总请求数)", ge=1)
expires_at: Optional[datetime.datetime] = Field(None, description="过期时间")
@computed_field
@property
def is_expired(self) -> bool:
"""检查API Key是否已过期"""
if not self.expires_at:
return False
return datetime.datetime.now() > self.expires_at
@field_validator('expires_at', mode='before')
@classmethod
def parse_expires_at(cls, v):
"""将时间戳转换为datetime"""
if isinstance(v, (int, float)):
return timestamp_to_datetime(v)
return v
@field_validator('scopes')
@classmethod
def validate_scopes(cls, v):
"""验证权限范围格式"""
if v is None:
return []
valid_scopes = ["app", "rag", "memory"]
for scope in v:
if scope not in valid_scopes:
raise ValueError(f"无效范围: {scope}")
return v
class ApiKeyUpdate(BaseModel):
"""更新 API Key配置"""
name: Optional[str] = Field(None, description="API Key 名称", max_length=255)
description: Optional[str] = Field(None, description="描述")
scopes: Optional[List[str]] = Field(None, description="权限范围列表")
rate_limit: Optional[int] = Field(None, description="速率限制(请求/分钟)", ge=1)
daily_request_limit: Optional[int] = Field(100000, description="每日请求数限制", ge=1)
quota_limit: Optional[int] = Field(None, description="配额限制(总请求数)", ge=1)
is_active: Optional[bool] = Field(None, description="是否激活")
expires_at: Optional[datetime.datetime] = Field(None, description="过期时间")
@computed_field
@property
def is_expired(self) -> bool:
"""检查API Key是否已过期"""
if not self.expires_at:
return False
return datetime.datetime.now() > self.expires_at
@field_validator('expires_at', mode='before')
@classmethod
def parse_expires_at(cls, v):
"""将时间戳转换为datetime"""
if isinstance(v, (int, float)):
return timestamp_to_datetime(v)
return v
@field_validator('scopes')
@classmethod
def validate_scopes(cls, v):
"""验证权限范围格式"""
if v is None:
return v
valid_scopes = ["app", "rag", "memory"]
for scope in v:
if scope not in valid_scopes:
raise ValueError(f"无效范围: {scope}")
return v
class ApiKeyResponse(BaseModel):
"""API Key 响应(创建时返回,包含明文 Key"""
model_config = ConfigDict(from_attributes=True)
id: uuid.UUID
name: str
description: Optional[str]
api_key: str
type: str
scopes: List[str]
resource_id: Optional[uuid.UUID]
rate_limit: int
daily_request_limit: int
quota_limit: Optional[int]
is_active: bool
expires_at: Optional[datetime.datetime]
created_at: datetime.datetime
@computed_field
@property
def is_expired(self) -> bool:
"""检查API Key是否已过期"""
if not self.expires_at:
return False
return datetime.datetime.now() > self.expires_at
@field_serializer('expires_at', 'created_at')
@classmethod
def serialize_datetime(cls, v: Optional[datetime.datetime]) -> Optional[int]:
"""将datetime转换为时间戳"""
return datetime_to_timestamp(v)
class ApiKey(BaseModel):
"""API Key 信息(不包含明文 Key"""
model_config = ConfigDict(from_attributes=True)
id: uuid.UUID
name: str
description: Optional[str]
api_key: str
type: str
scopes: List[str]
resource_id: Optional[uuid.UUID]
rate_limit: int
daily_request_limit: int
quota_limit: Optional[int]
quota_used: int
expires_at: Optional[datetime.datetime]
is_active: bool
last_used_at: Optional[datetime.datetime]
usage_count: int
workspace_id: uuid.UUID
created_by: uuid.UUID
created_at: datetime.datetime
updated_at: datetime.datetime
@computed_field
@property
def is_expired(self) -> bool:
"""检查API Key是否已过期"""
if not self.expires_at:
return False
return datetime.datetime.now() > self.expires_at
@field_serializer('expires_at', 'last_used_at', 'created_at', 'updated_at')
def serialize_datetime(self, v: Optional[datetime.datetime]) -> Optional[int]:
"""将datetime转换为时间戳"""
return datetime_to_timestamp(v)
class ApiKeyStats(BaseModel):
"""API Key 使用统计"""
total_requests: int = Field(..., description="总请求数")
requests_today: int = Field(..., description="今日请求数")
quota_used: int = Field(..., description="已使用配额")
quota_limit: Optional[int] = Field(None, description="配额限制")
last_used_at: Optional[datetime.datetime] = Field(None, description="最后使用时间")
avg_response_time: Optional[float] = Field(None, description="平均响应时间(毫秒)")
@field_serializer('last_used_at')
def serialize_datetime(self, v: Optional[datetime.datetime]) -> Optional[int]:
"""将datetime转换为时间戳"""
return datetime_to_timestamp(v)
class ApiKeyQuery(BaseModel):
"""API Key 查询参数"""
type: Optional[ApiKeyType] = Field(None, description="API Key 类型")
is_active: Optional[bool] = Field(None, description="是否激活")
resource_id: Optional[uuid.UUID] = Field(None, description="关联资源ID")
page: int = Field(1, ge=1, description="页码")
pagesize: int = Field(10, ge=1, le=100, description="每页数量")
class ApiKeyAuth(BaseModel):
"""API Key 认证信息"""
api_key_id: uuid.UUID
workspace_id: uuid.UUID
type: str
scopes: List[str]
resource_id: Optional[uuid.UUID]
class ApiKeyLog(BaseModel):
"""API Key 使用日志"""
model_config = ConfigDict(from_attributes=True)
id: uuid.UUID
api_key_id: uuid.UUID
# 请求信息
endpoint: str
method: str
ip_address: Optional[str]
user_agent: Optional[str]
# 响应信息
status_code: Optional[int]
response_time: Optional[int] # 毫秒
# 业务信息
tokens_used: Optional[int]
# 时间信息
created_at: datetime.datetime
@field_serializer('created_at')
def serialize_datetime(self, v: datetime.datetime) -> int:
"""将datetime转换为时间戳"""
return datetime_to_timestamp(v)