feat(user system): modifies the email address.

This commit is contained in:
Timebomb2018
2026-02-25 11:29:42 +08:00
parent 0b9cc0f068
commit 12ba3d473e
6 changed files with 363 additions and 2 deletions

View File

@@ -1,13 +1,18 @@
import datetime
import json
import secrets
import string
from pydantic import EmailStr
from sqlalchemy.orm import Session
import uuid
from app.aioRedis import aio_redis_set, aio_redis_get, aio_redis_delete
from app.models.user_model import User
from app.repositories import user_repository
from app.schemas.user_schema import UserCreate
from app.schemas.tenant_schema import TenantCreate
from app.services.email_service import send_email
from app.services.tenant_service import TenantService
from app.services.session_service import SessionService
from app.core.security import get_password_hash, verify_password
@@ -563,3 +568,175 @@ def generate_random_password(length: int = 12) -> str:
secrets.SystemRandom().shuffle(password)
return ''.join(password)
def generate_email_code() -> str:
"""生成6位数字验证码"""
return ''.join([str(secrets.randbelow(10)) for _ in range(6)])
async def send_email_code_method(db: Session, email: EmailStr, user_id: uuid.UUID):
"""发送邮箱验证码"""
business_logger.info(f"发送邮箱验证码: email={email}")
# 检查发送间隔
rate_limit_key = f"email_code_rate:{user_id}"
last_send = await aio_redis_get(rate_limit_key)
if last_send:
raise BusinessException("请稍后再试验证码发送间隔为1分钟", code=BizCode.RATE_LIMITED)
# 检查新邮箱是否已被使用
existing_user = user_repository.get_user_by_email(db=db, email=email)
if existing_user and existing_user.id != user_id:
raise BusinessException("邮箱已被使用", code=BizCode.DUPLICATE_NAME)
if existing_user and existing_user.id == user_id:
raise BusinessException("新邮箱与当前邮箱相同", code=BizCode.DUPLICATE_NAME)
# 生成验证码
code = generate_email_code()
# 存储到 Redis5分钟过期
cache_key = f"email_code:{user_id}:{email}"
await aio_redis_set(cache_key, json.dumps(code), expire=300)
# 发送邮件
await send_email(
email,
"邮箱验证码",
f'<p>您的验证码是:<strong>{code}</strong></p><p>验证码在5分钟内有效。</p>'
)
# 设置发送间隔限制60秒
await aio_redis_set(rate_limit_key, "1", expire=60)
business_logger.info(f"邮箱验证码已发送: {email}")
async def verify_and_change_email(db: Session, user_id: uuid.UUID, new_email: EmailStr, code: str) -> User:
"""验证验证码并修改邮箱"""
business_logger.info(f"验证并修改邮箱: user_id={user_id}, new_email={new_email}")
db_user = user_repository.get_user_by_id(db=db, user_id=user_id)
if not db_user:
raise BusinessException("用户不存在", code=BizCode.USER_NOT_FOUND)
# 验证验证码
cache_key = f"email_code:{user_id}:{new_email}"
cached_code = await aio_redis_get(cache_key)
if not cached_code:
raise BusinessException("验证码已过期", code=BizCode.VALIDATION_FAILED)
if json.loads(cached_code) != code:
raise BusinessException("验证码错误", code=BizCode.VALIDATION_FAILED)
# 修改邮箱
db_user.email = new_email
db.commit()
db.refresh(db_user)
# 删除验证码
await aio_redis_delete(cache_key)
# 使所有旧 tokens 失效
# await SessionService.invalidate_all_user_tokens(str(user_id))
business_logger.info(f"用户邮箱修改成功: {db_user.username}, new_email={new_email}")
return db_user
# def generate_email_token(user_id: str, old_email: str, new_email: str) -> str:
# """生成邮箱修改token"""
# payload = {
# "user_id": user_id,
# "old_email": old_email,
# "new_email": new_email,
# "exp": datetime.datetime.now(datetime.timezone.utc) + timedelta(hours=24)
# }
# return jwt.encode(payload, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
#
#
# def verify_email_token(token: str) -> dict:
# """验证邮箱修改token"""
# try:
# payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
# return payload
# except jwt.ExpiredSignatureError:
# raise BusinessException("链接已过期", code=BizCode.VALIDATION_FAILED)
# except jwt.InvalidTokenError:
# raise BusinessException("无效的链接", code=BizCode.VALIDATION_FAILED)
#
#
# async def request_change_email(db: Session, user_id: uuid.UUID, new_email: EmailStr, current_user: User):
# """请求修改邮箱,发送验证邮件"""
# business_logger.info(f"用户请求修改邮箱: user_id={user_id}, new_email={new_email}")
#
# if current_user.id != user_id:
# raise PermissionDeniedException("只能修改自己的邮箱")
#
# db_user = user_repository.get_user_by_id(db=db, user_id=user_id)
# if not db_user:
# raise BusinessException("用户不存在", code=BizCode.USER_NOT_FOUND)
#
# if db_user.email == new_email:
# raise BusinessException("新邮箱与当前邮箱相同", code=BizCode.VALIDATION_FAILED)
#
# existing_user = user_repository.get_user_by_email(db=db, email=new_email)
# if existing_user and existing_user.id != user_id:
# raise BusinessException("邮箱已被使用", code=BizCode.DUPLICATE_NAME)
#
# token = generate_email_token(str(user_id), db_user.email, new_email)
#
# # 发送确认邮件到旧邮箱
# old_email_link = f"{settings.BASE_URL}/api/users/email/confirm-email-change?token={token}"
# await send_email(
# db_user.email,
# "确认修改邮箱",
# f'<p>请点击以下链接确认修改邮箱:</p><a href="{old_email_link}">确认修改</a>'
# )
#
# business_logger.info(f"邮箱修改确认邮件已发送到旧邮箱: {db_user.email}")
#
#
# async def confirm_email_change(db: Session, token: str):
# """确认修改邮箱(旧邮箱确认)"""
# payload = verify_email_token(token)
# user_id = uuid.UUID(payload["user_id"])
# new_email = payload["new_email"]
#
# db_user = user_repository.get_user_by_id(db=db, user_id=user_id)
# if not db_user:
# raise BusinessException("用户不存在", code=BizCode.USER_NOT_FOUND)
#
# # 发送激活邮件到新邮箱
# activate_link = f"{settings.BASE_URL}/api/users/email/activate-new-email?token={token}"
# await send_email(
# new_email,
# "激活新邮箱",
# f'<p>请点击以下链接激活新邮箱:</p><a href="{activate_link}">激活邮箱</a>'
# )
#
# business_logger.info(f"新邮箱激活邮件已发送: {new_email}")
#
#
# async def activate_new_email(db: Session, token: str) -> User:
# """激活新邮箱"""
# payload = verify_email_token(token)
# user_id = uuid.UUID(payload["user_id"])
# new_email = payload["new_email"]
#
# db_user = user_repository.get_user_by_id(db=db, user_id=user_id)
# if not db_user:
# raise BusinessException("用户不存在", code=BizCode.USER_NOT_FOUND)
#
# db_user.email = new_email
# db.commit()
# db.refresh(db_user)
#
# # 使所有旧 tokens 失效
# await SessionService.invalidate_all_user_tokens(str(user_id))
#
# business_logger.info(f"用户邮箱修改成功: {db_user.username}, new_email={new_email}")
# return db_user