Files
MemoryBear/api/app/repositories/release_share_repository.py

60 lines
2.2 KiB
Python

import uuid
from typing import Optional
from sqlalchemy.orm import Session
from sqlalchemy import select
from app.models import ReleaseShare
class ReleaseShareRepository:
"""发布版本分享仓储"""
def __init__(self, db: Session):
self.db = db
def create(self, release_share: ReleaseShare) -> ReleaseShare:
"""创建分享配置"""
self.db.add(release_share)
self.db.commit()
self.db.refresh(release_share)
return release_share
def get_by_id(self, share_id: uuid.UUID) -> Optional[ReleaseShare]:
"""根据 ID 获取分享配置"""
return self.db.get(ReleaseShare, share_id)
def get_by_release_id(self, release_id: uuid.UUID) -> Optional[ReleaseShare]:
"""根据发布版本 ID 获取分享配置"""
stmt = select(ReleaseShare).where(ReleaseShare.release_id == release_id)
return self.db.scalars(stmt).first()
def get_by_share_token(self, share_token: str) -> Optional[ReleaseShare]:
"""根据分享 token 获取分享配置"""
stmt = select(ReleaseShare).where(ReleaseShare.share_token == share_token)
return self.db.scalars(stmt).first()
def update(self, release_share: ReleaseShare) -> ReleaseShare:
"""更新分享配置"""
self.db.commit()
self.db.refresh(release_share)
return release_share
def delete(self, release_share: ReleaseShare) -> None:
"""删除分享配置"""
self.db.delete(release_share)
self.db.commit()
def token_exists(self, share_token: str) -> bool:
"""检查 token 是否已存在"""
stmt = select(ReleaseShare.id).where(ReleaseShare.share_token == share_token)
return self.db.scalars(stmt).first() is not None
def increment_view_count(self, share_id: uuid.UUID) -> None:
"""增加访问次数(异步更新,不阻塞)"""
from datetime import datetime
stmt = select(ReleaseShare).where(ReleaseShare.id == share_id)
share = self.db.scalars(stmt).first()
if share:
share.view_count += 1
share.last_accessed_at = datetime.now()
self.db.commit()