314 lines
12 KiB
Python
314 lines
12 KiB
Python
from fastapi import APIRouter, Depends
|
||
from sqlalchemy.orm import Session
|
||
import uuid
|
||
from typing import Callable
|
||
|
||
from app.core.error_codes import BizCode
|
||
from app.core.exceptions import BusinessException
|
||
from app.db import get_db
|
||
from app.dependencies import get_current_user, get_current_superuser
|
||
from app.models.user_model import User
|
||
from app.schemas import user_schema
|
||
from app.schemas.user_schema import (
|
||
ChangePasswordRequest,
|
||
AdminChangePasswordRequest,
|
||
SendEmailCodeRequest,
|
||
VerifyEmailCodeRequest,
|
||
VerifyPasswordRequest)
|
||
from app.schemas.response_schema import ApiResponse
|
||
from app.services import user_service
|
||
from app.core.logging_config import get_api_logger
|
||
from app.core.response_utils import success
|
||
from app.core.security import verify_password
|
||
from app.i18n.dependencies import get_translator
|
||
|
||
# 获取API专用日志器
|
||
api_logger = get_api_logger()
|
||
|
||
router = APIRouter(
|
||
prefix="/users",
|
||
tags=["Users"],
|
||
)
|
||
|
||
|
||
@router.post("/superuser", response_model=ApiResponse)
|
||
def create_superuser(
|
||
user: user_schema.UserCreate,
|
||
db: Session = Depends(get_db),
|
||
current_superuser: User = Depends(get_current_superuser),
|
||
t: Callable = Depends(get_translator)
|
||
):
|
||
"""创建超级管理员(仅超级管理员可访问)"""
|
||
api_logger.info(f"超级管理员创建请求: {user.username}, email: {user.email}")
|
||
|
||
result = user_service.create_superuser(db=db, user=user, current_user=current_superuser)
|
||
api_logger.info(f"超级管理员创建成功: {result.username} (ID: {result.id})")
|
||
|
||
result_schema = user_schema.User.model_validate(result)
|
||
return success(data=result_schema, msg=t("users.create.superuser_success"))
|
||
|
||
|
||
@router.delete("/{user_id}", response_model=ApiResponse)
|
||
def delete_user(
|
||
user_id: uuid.UUID,
|
||
db: Session = Depends(get_db),
|
||
current_user: User = Depends(get_current_user),
|
||
t: Callable = Depends(get_translator)
|
||
):
|
||
"""停用用户(软删除)"""
|
||
api_logger.info(f"用户停用请求: user_id={user_id}, 操作者: {current_user.username}")
|
||
result = user_service.deactivate_user(
|
||
db=db, user_id_to_deactivate=user_id, current_user=current_user
|
||
)
|
||
api_logger.info(f"用户停用成功: {result.username} (ID: {result.id})")
|
||
return success(msg=t("users.delete.deactivate_success"))
|
||
|
||
@router.post("/{user_id}/activate", response_model=ApiResponse)
|
||
def activate_user(
|
||
user_id: uuid.UUID,
|
||
db: Session = Depends(get_db),
|
||
current_user: User = Depends(get_current_user),
|
||
t: Callable = Depends(get_translator)
|
||
):
|
||
"""激活用户"""
|
||
api_logger.info(f"用户激活请求: user_id={user_id}, 操作者: {current_user.username}")
|
||
|
||
result = user_service.activate_user(
|
||
db=db, user_id_to_activate=user_id, current_user=current_user
|
||
)
|
||
api_logger.info(f"用户激活成功: {result.username} (ID: {result.id})")
|
||
|
||
result_schema = user_schema.User.model_validate(result)
|
||
return success(data=result_schema, msg=t("users.activate.success"))
|
||
|
||
|
||
@router.get("", response_model=ApiResponse)
|
||
def get_current_user_info(
|
||
db: Session = Depends(get_db),
|
||
current_user: User = Depends(get_current_user),
|
||
t: Callable = Depends(get_translator)
|
||
):
|
||
"""获取当前用户信息"""
|
||
api_logger.info(f"当前用户信息请求: {current_user.username}")
|
||
|
||
result = user_service.get_user(
|
||
db=db, user_id=current_user.id, current_user=current_user
|
||
)
|
||
|
||
result_schema = user_schema.User.model_validate(result)
|
||
|
||
# 设置当前工作空间的角色和名称
|
||
if current_user.current_workspace_id:
|
||
from app.repositories.workspace_repository import WorkspaceRepository
|
||
workspace_repo = WorkspaceRepository(db)
|
||
current_workspace = workspace_repo.get_workspace_by_id(current_user.current_workspace_id)
|
||
if current_workspace:
|
||
result_schema.current_workspace_name = current_workspace.name
|
||
|
||
for ws in result.workspaces:
|
||
if ws.workspace_id == current_user.current_workspace_id and ws.is_active:
|
||
result_schema.role = ws.role
|
||
break
|
||
|
||
api_logger.info(f"当前用户信息获取成功: {result.username}, 角色: {result_schema.role}, 工作空间: {result_schema.current_workspace_name}")
|
||
|
||
# 设置权限:如果用户来自 SSO Source,则使用该 Source 的 permissions;否则返回 "all" 表示拥有所有权限
|
||
if current_user.external_source:
|
||
from premium.sso.models import SSOSource
|
||
source = db.query(SSOSource).filter(SSOSource.source_code == current_user.external_source).first()
|
||
if source and source.permissions:
|
||
result_schema.permissions = source.permissions
|
||
else:
|
||
result_schema.permissions = []
|
||
else:
|
||
result_schema.permissions = ["all"]
|
||
|
||
return success(data=result_schema, msg=t("users.info.get_success"))
|
||
|
||
|
||
@router.get("/superusers", response_model=ApiResponse)
|
||
def get_tenant_superusers(
|
||
include_inactive: bool = False,
|
||
db: Session = Depends(get_db),
|
||
current_user: User = Depends(get_current_superuser),
|
||
t: Callable = Depends(get_translator)
|
||
):
|
||
"""获取当前租户下的超管账号列表(仅超级管理员可访问)"""
|
||
api_logger.info(f"获取租户超管列表请求: {current_user.username}")
|
||
|
||
superusers = user_service.get_tenant_superusers(
|
||
db=db,
|
||
current_user=current_user,
|
||
include_inactive=include_inactive
|
||
)
|
||
api_logger.info(f"租户超管列表获取成功: count={len(superusers)}")
|
||
|
||
superusers_schema = [user_schema.User.model_validate(u) for u in superusers]
|
||
return success(data=superusers_schema, msg=t("users.list.superusers_success"))
|
||
|
||
|
||
@router.get("/{user_id}", response_model=ApiResponse)
|
||
def get_user_info_by_id(
|
||
user_id: uuid.UUID,
|
||
db: Session = Depends(get_db),
|
||
current_user: User = Depends(get_current_user),
|
||
t: Callable = Depends(get_translator)
|
||
):
|
||
"""根据用户ID获取用户信息"""
|
||
api_logger.info(f"获取用户信息请求: user_id={user_id}, 操作者: {current_user.username}")
|
||
|
||
result = user_service.get_user(
|
||
db=db, user_id=user_id, current_user=current_user
|
||
)
|
||
api_logger.info(f"用户信息获取成功: {result.username}")
|
||
|
||
result_schema = user_schema.User.model_validate(result)
|
||
return success(data=result_schema, msg=t("users.info.get_success"))
|
||
|
||
|
||
@router.put("/change-password", response_model=ApiResponse)
|
||
async def change_password(
|
||
request: ChangePasswordRequest,
|
||
db: Session = Depends(get_db),
|
||
current_user: User = Depends(get_current_user),
|
||
t: Callable = Depends(get_translator)
|
||
):
|
||
"""修改当前用户密码"""
|
||
api_logger.info(f"用户密码修改请求: {current_user.username}")
|
||
|
||
await user_service.change_password(
|
||
db=db,
|
||
user_id=current_user.id,
|
||
old_password=request.old_password,
|
||
new_password=request.new_password,
|
||
current_user=current_user
|
||
)
|
||
api_logger.info(f"用户密码修改成功: {current_user.username}")
|
||
return success(msg=t("auth.password.change_success"))
|
||
|
||
|
||
@router.put("/admin/change-password", response_model=ApiResponse)
|
||
async def admin_change_password(
|
||
request: AdminChangePasswordRequest,
|
||
db: Session = Depends(get_db),
|
||
current_user: User = Depends(get_current_superuser),
|
||
t: Callable = Depends(get_translator)
|
||
):
|
||
"""超级管理员修改指定用户的密码"""
|
||
api_logger.info(f"管理员密码修改请求: 管理员 {current_user.username} 修改用户 {request.user_id}")
|
||
|
||
user, generated_password = await user_service.admin_change_password(
|
||
db=db,
|
||
target_user_id=request.user_id,
|
||
new_password=request.new_password,
|
||
current_user=current_user
|
||
)
|
||
|
||
# 根据是否生成了随机密码来构造响应
|
||
if request.new_password:
|
||
api_logger.info(f"管理员密码修改成功: 用户 {request.user_id}")
|
||
return success(msg=t("auth.password.change_success"))
|
||
else:
|
||
api_logger.info(f"管理员密码重置成功: 用户 {request.user_id}, 随机密码已生成")
|
||
return success(data=generated_password, msg=t("auth.password.reset_success"))
|
||
|
||
|
||
@router.post("/verify_pwd", response_model=ApiResponse)
|
||
def verify_pwd(
|
||
request: VerifyPasswordRequest,
|
||
current_user: User = Depends(get_current_user),
|
||
t: Callable = Depends(get_translator)
|
||
):
|
||
"""验证当前用户密码"""
|
||
api_logger.info(f"用户验证密码请求: {current_user.username}")
|
||
|
||
is_valid = verify_password(request.password, current_user.hashed_password)
|
||
api_logger.info(f"用户密码验证结果: {current_user.username}, valid={is_valid}")
|
||
if not is_valid:
|
||
raise BusinessException(t("users.errors.password_verification_failed"), code=BizCode.VALIDATION_FAILED)
|
||
return success(data={"valid": is_valid}, msg=t("common.success.retrieved"))
|
||
|
||
|
||
@router.post("/send-email-code", response_model=ApiResponse)
|
||
async def send_email_code(
|
||
request: SendEmailCodeRequest,
|
||
db: Session = Depends(get_db),
|
||
current_user: User = Depends(get_current_user),
|
||
t: Callable = Depends(get_translator)
|
||
):
|
||
"""发送邮箱验证码"""
|
||
api_logger.info(f"用户请求发送邮箱验证码: {current_user.username}, email={request.email}")
|
||
|
||
await user_service.send_email_code_method(db=db, email=request.email, user_id=current_user.id)
|
||
|
||
api_logger.info(f"邮箱验证码已发送: {current_user.username}")
|
||
return success(msg=t("users.email.code_sent"))
|
||
|
||
|
||
@router.put("/change-email", response_model=ApiResponse)
|
||
async def change_email(
|
||
request: VerifyEmailCodeRequest,
|
||
db: Session = Depends(get_db),
|
||
current_user: User = Depends(get_current_user),
|
||
t: Callable = Depends(get_translator)
|
||
):
|
||
"""验证验证码并修改邮箱"""
|
||
api_logger.info(f"用户修改邮箱: {current_user.username}, new_email={request.new_email}")
|
||
|
||
await user_service.verify_and_change_email(
|
||
db=db,
|
||
user_id=current_user.id,
|
||
new_email=request.new_email,
|
||
code=request.code
|
||
)
|
||
|
||
api_logger.info(f"用户邮箱修改成功: {current_user.username}")
|
||
return success(msg=t("users.email.change_success"))
|
||
|
||
|
||
|
||
@router.get("/me/language", response_model=ApiResponse)
|
||
def get_current_user_language(
|
||
db: Session = Depends(get_db),
|
||
current_user: User = Depends(get_current_user),
|
||
t: Callable = Depends(get_translator)
|
||
):
|
||
"""获取当前用户的语言偏好"""
|
||
api_logger.info(f"获取用户语言偏好: {current_user.username}")
|
||
|
||
language = user_service.get_user_language_preference(
|
||
db=db,
|
||
user_id=current_user.id,
|
||
current_user=current_user
|
||
)
|
||
|
||
api_logger.info(f"用户语言偏好获取成功: {current_user.username}, language={language}")
|
||
return success(
|
||
data=user_schema.LanguagePreferenceResponse(language=language),
|
||
msg=t("users.language.get_success")
|
||
)
|
||
|
||
|
||
@router.put("/me/language", response_model=ApiResponse)
|
||
def update_current_user_language(
|
||
request: user_schema.LanguagePreferenceRequest,
|
||
db: Session = Depends(get_db),
|
||
current_user: User = Depends(get_current_user),
|
||
t: Callable = Depends(get_translator)
|
||
):
|
||
"""设置当前用户的语言偏好"""
|
||
api_logger.info(f"更新用户语言偏好: {current_user.username}, language={request.language}")
|
||
|
||
updated_user = user_service.update_user_language_preference(
|
||
db=db,
|
||
user_id=current_user.id,
|
||
language=request.language,
|
||
current_user=current_user
|
||
)
|
||
|
||
api_logger.info(f"用户语言偏好更新成功: {current_user.username}, language={request.language}")
|
||
return success(
|
||
data=user_schema.LanguagePreferenceResponse(language=updated_user.preferred_language),
|
||
msg=t("users.language.update_success")
|
||
)
|