From 12ba3d473e4e23a4816f48c0019b2d1b7d7567e7 Mon Sep 17 00:00:00 2001
From: Timebomb2018 <18868801967@163.com>
Date: Wed, 25 Feb 2026 11:29:42 +0800
Subject: [PATCH] feat(user system): modifies the email address.
---
api/app/controllers/user_controller.py | 63 ++++++++-
api/app/core/config.py | 6 +
api/app/schemas/user_schema.py | 22 +++
api/app/services/email_service.py | 88 ++++++++++++
api/app/services/user_service.py | 177 +++++++++++++++++++++++++
api/env.example | 9 ++
6 files changed, 363 insertions(+), 2 deletions(-)
create mode 100644 api/app/services/email_service.py
diff --git a/api/app/controllers/user_controller.py b/api/app/controllers/user_controller.py
index 57495a7c..3c574c81 100644
--- a/api/app/controllers/user_controller.py
+++ b/api/app/controllers/user_controller.py
@@ -2,15 +2,23 @@ from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
import uuid
+from app.core.error_codes import BizCode
+from app.core.exceptions import BusinessException
from app.db import get_db
from app.dependencies import get_current_user, get_current_superuser
from app.models.user_model import User
from app.schemas import user_schema
-from app.schemas.user_schema import ChangePasswordRequest, AdminChangePasswordRequest
+from app.schemas.user_schema import (
+ ChangePasswordRequest,
+ AdminChangePasswordRequest,
+ SendEmailCodeRequest,
+ VerifyEmailCodeRequest,
+ VerifyPasswordRequest)
from app.schemas.response_schema import ApiResponse
from app.services import user_service
from app.core.logging_config import get_api_logger
from app.core.response_utils import success
+from app.core.security import verify_password
# 获取API专用日志器
api_logger = get_api_logger()
@@ -120,6 +128,7 @@ def get_tenant_superusers(
return success(data=superusers_schema, msg="租户超管列表获取成功")
+
@router.get("/{user_id}", response_model=ApiResponse)
def get_user_info_by_id(
user_id: uuid.UUID,
@@ -180,4 +189,54 @@ async def admin_change_password(
return success(msg="密码修改成功")
else:
api_logger.info(f"管理员密码重置成功: 用户 {request.user_id}, 随机密码已生成")
- return success(data=generated_password, msg="密码重置成功")
\ No newline at end of file
+ return success(data=generated_password, msg="密码重置成功")
+
+
+@router.post("/verify_pwd", response_model=ApiResponse)
+def verify_pwd(
+ request: VerifyPasswordRequest,
+ current_user: User = Depends(get_current_user),
+):
+ """验证当前用户密码"""
+ api_logger.info(f"用户验证密码请求: {current_user.username}")
+
+ is_valid = verify_password(request.password, current_user.hashed_password)
+ api_logger.info(f"用户密码验证结果: {current_user.username}, valid={is_valid}")
+ if not is_valid:
+ raise BusinessException("密码验证失败", code=BizCode.VALIDATION_FAILED)
+ return success(data={"valid": is_valid}, msg="验证完成")
+
+
+@router.post("/send-email-code", response_model=ApiResponse)
+async def send_email_code(
+ request: SendEmailCodeRequest,
+ db: Session = Depends(get_db),
+ current_user: User = Depends(get_current_user),
+):
+ """发送邮箱验证码"""
+ api_logger.info(f"用户请求发送邮箱验证码: {current_user.username}, email={request.email}")
+
+ await user_service.send_email_code_method(db=db, email=request.email, user_id=current_user.id)
+
+ api_logger.info(f"邮箱验证码已发送: {current_user.username}")
+ return success(msg="验证码已发送到您的邮箱,请查收")
+
+
+@router.put("/change-email", response_model=ApiResponse)
+async def change_email(
+ request: VerifyEmailCodeRequest,
+ db: Session = Depends(get_db),
+ current_user: User = Depends(get_current_user),
+):
+ """验证验证码并修改邮箱"""
+ api_logger.info(f"用户修改邮箱: {current_user.username}, new_email={request.new_email}")
+
+ await user_service.verify_and_change_email(
+ db=db,
+ user_id=current_user.id,
+ new_email=request.new_email,
+ code=request.code
+ )
+
+ api_logger.info(f"用户邮箱修改成功: {current_user.username}")
+ return success(msg="邮箱修改成功")
diff --git a/api/app/core/config.py b/api/app/core/config.py
index b1354b9f..3a0c97b4 100644
--- a/api/app/core/config.py
+++ b/api/app/core/config.py
@@ -193,6 +193,12 @@ class Settings:
CELERY_BROKER: int = int(os.getenv("CELERY_BROKER", "1"))
CELERY_BACKEND: int = int(os.getenv("CELERY_BACKEND", "2"))
+ # SMTP Email Configuration
+ SMTP_SERVER: str = os.getenv("SMTP_SERVER", "smtp.gmail.com")
+ SMTP_PORT: int = int(os.getenv("SMTP_PORT", "587"))
+ SMTP_USER: str = os.getenv("SMTP_USER", "")
+ SMTP_PASSWORD: str = os.getenv("SMTP_PASSWORD", "")
+
REFLECTION_INTERVAL_SECONDS: float = float(os.getenv("REFLECTION_INTERVAL_SECONDS", "300"))
HEALTH_CHECK_SECONDS: float = float(os.getenv("HEALTH_CHECK_SECONDS", "600"))
MEMORY_INCREMENT_INTERVAL_HOURS: float = float(os.getenv("MEMORY_INCREMENT_INTERVAL_HOURS", "24"))
diff --git a/api/app/schemas/user_schema.py b/api/app/schemas/user_schema.py
index 60f52aaf..7b9e201d 100644
--- a/api/app/schemas/user_schema.py
+++ b/api/app/schemas/user_schema.py
@@ -36,6 +36,28 @@ class AdminChangePasswordRequest(BaseModel):
new_password: Optional[str] = Field(None, min_length=6, description="新密码,至少6位。如果不提供则自动生成随机密码")
+class ChangeEmailRequest(BaseModel):
+ """修改邮箱请求"""
+ password: str = Field(..., description="当前密码")
+ new_email: EmailStr = Field(..., description="新邮箱地址")
+
+
+class SendEmailCodeRequest(BaseModel):
+ """发送邮箱验证码请求"""
+ email: EmailStr = Field(..., description="邮箱地址")
+
+
+class VerifyEmailCodeRequest(BaseModel):
+ """验证邮箱验证码并修改邮箱请求"""
+ new_email: EmailStr = Field(..., description="新邮箱地址")
+ code: str = Field(..., min_length=6, max_length=6, description="验证码")
+
+
+class VerifyPasswordRequest(BaseModel):
+ """验证密码请求"""
+ password: str = Field(..., description="密码")
+
+
class ChangePasswordResponse(BaseModel):
"""修改密码响应"""
message: str
diff --git a/api/app/services/email_service.py b/api/app/services/email_service.py
new file mode 100644
index 00000000..d7b255dc
--- /dev/null
+++ b/api/app/services/email_service.py
@@ -0,0 +1,88 @@
+import smtplib
+import re
+import asyncio
+from email.mime.text import MIMEText
+from email.mime.multipart import MIMEMultipart
+from email.header import Header
+from email.utils import formataddr
+from concurrent.futures import ThreadPoolExecutor
+
+from app.core.config import settings
+from app.core.error_codes import BizCode
+from app.core.exceptions import BusinessException
+from app.core.logging_config import get_business_logger
+
+business_logger = get_business_logger()
+
+
+def _send_email_sync(to_email: str, subject: str, html_content: str, text_content: str = None):
+ """同步发送邮件"""
+ smtp_server = settings.SMTP_SERVER
+ smtp_port = settings.SMTP_PORT
+ smtp_user = settings.SMTP_USER
+ smtp_password = settings.SMTP_PASSWORD
+
+ if not smtp_server or not smtp_user or not smtp_password:
+ raise BusinessException("邮件服务未配置", code=BizCode.SERVICE_UNAVAILABLE)
+
+ msg = MIMEMultipart('alternative')
+ msg['Subject'] = Header(subject, "utf-8")
+ from_name = "MemoryBear系统"
+ msg['From'] = formataddr((Header(from_name, 'utf-8').encode(), smtp_user))
+ msg['To'] = Header(to_email, "utf-8")
+
+ if not text_content:
+ text_content = html_content.replace('
', '\n').replace('
', '\n').replace('
', '\n') + text_content = re.sub(r'<.*?>', '', text_content) + text_part = MIMEText(text_content, 'plain', 'utf-8') + msg.attach(text_part) + + html_part = MIMEText(html_content, 'html', 'utf-8') + msg.attach(html_part) + + if smtp_port == 465: + with smtplib.SMTP_SSL(smtp_server, smtp_port, timeout=10) as server: + server.login(smtp_user, smtp_password) + server.send_message(msg) + else: + with smtplib.SMTP(smtp_server, smtp_port, timeout=10) as server: + server.starttls() + server.login(smtp_user, smtp_password) + server.send_message(msg) + + +async def send_email(to_email: str, subject: str, html_content: str, text_content: str = None): + """异步发送邮件""" + to_email = to_email.strip() + if not to_email or not re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', to_email): + err_msg = f"收件人邮箱格式无效: {to_email}" + business_logger.error(err_msg) + raise BusinessException(err_msg, code=BizCode.INVALID_PARAMETER) + + try: + loop = asyncio.get_event_loop() + with ThreadPoolExecutor() as executor: + await loop.run_in_executor( + executor, + _send_email_sync, + to_email, + subject, + html_content, + text_content + ) + business_logger.info(f"邮件发送成功: {to_email}") + except smtplib.SMTPAuthenticationError: + err_msg = "SMTP认证失败,请检查SMTP账号/密码是否正确" + business_logger.error(f"邮件发送失败: {to_email} - {err_msg}") + raise BusinessException(err_msg, code=BizCode.UNAUTHORIZED) + except smtplib.SMTPConnectError: + err_msg = "SMTP服务器连接失败,请检查服务器地址/端口是否正确" + business_logger.error(f"邮件发送失败: {to_email} - {err_msg}") + raise BusinessException(err_msg, code=BizCode.SERVICE_UNAVAILABLE) + except TimeoutError: + err_msg = "邮件发送超时,请检查SMTP服务器配置" + business_logger.error(f"邮件发送失败: {to_email} - {err_msg}") + raise BusinessException(err_msg, code=BizCode.BAD_REQUEST) + except Exception as e: + business_logger.error(f"邮件发送失败: {to_email} - {str(e)}") + raise BusinessException(f"邮件发送失败: {str(e)}", code=BizCode.SERVICE_UNAVAILABLE) diff --git a/api/app/services/user_service.py b/api/app/services/user_service.py index d97e2fb2..22dabed7 100644 --- a/api/app/services/user_service.py +++ b/api/app/services/user_service.py @@ -1,13 +1,18 @@ import datetime +import json import secrets import string + +from pydantic import EmailStr from sqlalchemy.orm import Session import uuid +from app.aioRedis import aio_redis_set, aio_redis_get, aio_redis_delete from app.models.user_model import User from app.repositories import user_repository from app.schemas.user_schema import UserCreate from app.schemas.tenant_schema import TenantCreate +from app.services.email_service import send_email from app.services.tenant_service import TenantService from app.services.session_service import SessionService from app.core.security import get_password_hash, verify_password @@ -563,3 +568,175 @@ def generate_random_password(length: int = 12) -> str: secrets.SystemRandom().shuffle(password) return ''.join(password) + + +def generate_email_code() -> str: + """生成6位数字验证码""" + return ''.join([str(secrets.randbelow(10)) for _ in range(6)]) + + +async def send_email_code_method(db: Session, email: EmailStr, user_id: uuid.UUID): + """发送邮箱验证码""" + business_logger.info(f"发送邮箱验证码: email={email}") + + # 检查发送间隔 + rate_limit_key = f"email_code_rate:{user_id}" + last_send = await aio_redis_get(rate_limit_key) + + if last_send: + raise BusinessException("请稍后再试,验证码发送间隔为1分钟", code=BizCode.RATE_LIMITED) + + # 检查新邮箱是否已被使用 + existing_user = user_repository.get_user_by_email(db=db, email=email) + if existing_user and existing_user.id != user_id: + raise BusinessException("邮箱已被使用", code=BizCode.DUPLICATE_NAME) + + if existing_user and existing_user.id == user_id: + raise BusinessException("新邮箱与当前邮箱相同", code=BizCode.DUPLICATE_NAME) + + # 生成验证码 + code = generate_email_code() + + # 存储到 Redis,5分钟过期 + cache_key = f"email_code:{user_id}:{email}" + await aio_redis_set(cache_key, json.dumps(code), expire=300) + + # 发送邮件 + await send_email( + email, + "邮箱验证码", + f'您的验证码是:{code}
验证码在5分钟内有效。
' + ) + + # 设置发送间隔限制,60秒 + await aio_redis_set(rate_limit_key, "1", expire=60) + + business_logger.info(f"邮箱验证码已发送: {email}") + + +async def verify_and_change_email(db: Session, user_id: uuid.UUID, new_email: EmailStr, code: str) -> User: + """验证验证码并修改邮箱""" + business_logger.info(f"验证并修改邮箱: user_id={user_id}, new_email={new_email}") + + db_user = user_repository.get_user_by_id(db=db, user_id=user_id) + if not db_user: + raise BusinessException("用户不存在", code=BizCode.USER_NOT_FOUND) + + # 验证验证码 + cache_key = f"email_code:{user_id}:{new_email}" + cached_code = await aio_redis_get(cache_key) + + if not cached_code: + raise BusinessException("验证码已过期", code=BizCode.VALIDATION_FAILED) + + if json.loads(cached_code) != code: + raise BusinessException("验证码错误", code=BizCode.VALIDATION_FAILED) + + # 修改邮箱 + db_user.email = new_email + db.commit() + db.refresh(db_user) + + # 删除验证码 + await aio_redis_delete(cache_key) + + # 使所有旧 tokens 失效 + # await SessionService.invalidate_all_user_tokens(str(user_id)) + + business_logger.info(f"用户邮箱修改成功: {db_user.username}, new_email={new_email}") + return db_user + + +# def generate_email_token(user_id: str, old_email: str, new_email: str) -> str: +# """生成邮箱修改token""" +# payload = { +# "user_id": user_id, +# "old_email": old_email, +# "new_email": new_email, +# "exp": datetime.datetime.now(datetime.timezone.utc) + timedelta(hours=24) +# } +# return jwt.encode(payload, settings.SECRET_KEY, algorithm=settings.ALGORITHM) +# +# +# def verify_email_token(token: str) -> dict: +# """验证邮箱修改token""" +# try: +# payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]) +# return payload +# except jwt.ExpiredSignatureError: +# raise BusinessException("链接已过期", code=BizCode.VALIDATION_FAILED) +# except jwt.InvalidTokenError: +# raise BusinessException("无效的链接", code=BizCode.VALIDATION_FAILED) +# +# +# async def request_change_email(db: Session, user_id: uuid.UUID, new_email: EmailStr, current_user: User): +# """请求修改邮箱,发送验证邮件""" +# business_logger.info(f"用户请求修改邮箱: user_id={user_id}, new_email={new_email}") +# +# if current_user.id != user_id: +# raise PermissionDeniedException("只能修改自己的邮箱") +# +# db_user = user_repository.get_user_by_id(db=db, user_id=user_id) +# if not db_user: +# raise BusinessException("用户不存在", code=BizCode.USER_NOT_FOUND) +# +# if db_user.email == new_email: +# raise BusinessException("新邮箱与当前邮箱相同", code=BizCode.VALIDATION_FAILED) +# +# existing_user = user_repository.get_user_by_email(db=db, email=new_email) +# if existing_user and existing_user.id != user_id: +# raise BusinessException("邮箱已被使用", code=BizCode.DUPLICATE_NAME) +# +# token = generate_email_token(str(user_id), db_user.email, new_email) +# +# # 发送确认邮件到旧邮箱 +# old_email_link = f"{settings.BASE_URL}/api/users/email/confirm-email-change?token={token}" +# await send_email( +# db_user.email, +# "确认修改邮箱", +# f'请点击以下链接确认修改邮箱:
确认修改' +# ) +# +# business_logger.info(f"邮箱修改确认邮件已发送到旧邮箱: {db_user.email}") +# +# +# async def confirm_email_change(db: Session, token: str): +# """确认修改邮箱(旧邮箱确认)""" +# payload = verify_email_token(token) +# user_id = uuid.UUID(payload["user_id"]) +# new_email = payload["new_email"] +# +# db_user = user_repository.get_user_by_id(db=db, user_id=user_id) +# if not db_user: +# raise BusinessException("用户不存在", code=BizCode.USER_NOT_FOUND) +# +# # 发送激活邮件到新邮箱 +# activate_link = f"{settings.BASE_URL}/api/users/email/activate-new-email?token={token}" +# await send_email( +# new_email, +# "激活新邮箱", +# f'请点击以下链接激活新邮箱:
激活邮箱' +# ) +# +# business_logger.info(f"新邮箱激活邮件已发送: {new_email}") +# +# +# async def activate_new_email(db: Session, token: str) -> User: +# """激活新邮箱""" +# payload = verify_email_token(token) +# user_id = uuid.UUID(payload["user_id"]) +# new_email = payload["new_email"] +# +# db_user = user_repository.get_user_by_id(db=db, user_id=user_id) +# if not db_user: +# raise BusinessException("用户不存在", code=BizCode.USER_NOT_FOUND) +# +# db_user.email = new_email +# db.commit() +# db.refresh(db_user) +# +# # 使所有旧 tokens 失效 +# await SessionService.invalidate_all_user_tokens(str(user_id)) +# +# business_logger.info(f"用户邮箱修改成功: {db_user.username}, new_email={new_email}") +# return db_user diff --git a/api/env.example b/api/env.example index dfb1ae61..e8074f82 100644 --- a/api/env.example +++ b/api/env.example @@ -64,6 +64,9 @@ LANGCHAIN_ENDPOINT= # Generate a new one with: openssl rand -hex 32 SECRET_KEY=your-secret-key-here-generate-with-openssl-rand-hex-32 +# official environment system version +SYSTEM_VERSION= + # JWT Token expiration settings ACCESS_TOKEN_EXPIRE_MINUTES=30 REFRESH_TOKEN_EXPIRE_DAYS=7 @@ -129,6 +132,12 @@ KB_image2text_id= config_id= reranker_id= +# Email Configuration +SMTP_SERVER= +SMTP_PORT= +SMTP_USER= +SMTP_PASSWORD= + # 本体类型融合配置 (记得写入env_example) GENERAL_ONTOLOGY_FILES=General_purpose_entity.ttl # 指定要加载的本体文件路径,多个文件用逗号分隔 ENABLE_GENERAL_ONTOLOGY_TYPES=true # 总开关,控制是否启用通用本体类型融合功能(false = 不使用任何本体类型指导)