[changes] AI reviews and modifies the code

This commit is contained in:
lanceyq
2026-03-03 15:16:47 +08:00
parent 6033d37537
commit 7446241735
9 changed files with 114 additions and 188 deletions

View File

@@ -2,10 +2,7 @@
Cache 缓存模块
提供各种缓存功能的统一入口
注意隐性记忆和情绪建议已迁移到数据库存储不再使用Redis缓存
"""
from .memory import EmotionMemoryCache, ImplicitMemoryCache
__all__ = [
"EmotionMemoryCache",
"ImplicitMemoryCache",
]
__all__ = []

View File

@@ -2,11 +2,7 @@
Memory 缓存模块
提供记忆系统相关的缓存功能
注意隐性记忆和情绪建议已迁移到数据库存储不再使用Redis缓存
"""
from .emotion_memory import EmotionMemoryCache
from .implicit_memory import ImplicitMemoryCache
__all__ = [
"EmotionMemoryCache",
"ImplicitMemoryCache",
]
__all__ = []

View File

@@ -262,7 +262,6 @@ async def check_emotion_data_exists(
@router.post("/suggestions", response_model=ApiResponse)
async def get_emotion_suggestions(
request: EmotionSuggestionsRequest,
language_type: str = Header(default=None, alias="X-Language-Type"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
@@ -277,9 +276,6 @@ async def get_emotion_suggestions(
存储的个性化情绪建议响应
"""
try:
# 使用集中化的语言校验
language = get_language_from_header(language_type)
api_logger.info(
f"用户 {current_user.username} 请求获取个性化情绪建议",
extra={
@@ -295,15 +291,13 @@ async def get_emotion_suggestions(
)
if data is None:
# 数据不存在,返回提示信息
api_logger.info(
f"用户 {request.end_user_id} 的建议数据不存在",
extra={"end_user_id": request.end_user_id}
)
return fail(
BizCode.NOT_FOUND,
"情绪建议数据不存在,请点击右上角刷新进行初始化",
""
return success(
data={"exists": False},
msg="情绪建议数据不存在,请点击右上角刷新进行初始化"
)
api_logger.info(

View File

@@ -152,10 +152,9 @@ async def check_user_data_exists(
if cached_profile is None:
api_logger.info(f"用户 {end_user_id} 的画像数据不存在")
return fail(
BizCode.NOT_FOUND,
"画像数据不存在,请点击右上角刷新进行初始化",
{"exists": False}
return success(
data={"exists": False},
msg="画像数据不存在,请点击右上角刷新进行初始化"
)
api_logger.info(f"用户 {end_user_id} 的画像数据存在")
@@ -203,11 +202,7 @@ async def get_preference_tags(
if cached_profile is None:
api_logger.info(f"用户 {end_user_id} 的画像数据不存在")
return fail(
BizCode.NOT_FOUND,
"画像数据不存在,请点击右上角刷新进行初始化",
""
)
return fail(BizCode.NOT_FOUND, "", "")
# Extract preferences from cache
preferences = cached_profile.get("preferences", [])
@@ -274,11 +269,7 @@ async def get_dimension_portrait(
if cached_profile is None:
api_logger.info(f"用户 {end_user_id} 的画像数据不存在")
return fail(
BizCode.NOT_FOUND,
"画像数据不存在,请点击右上角刷新进行初始化",
""
)
return fail(BizCode.NOT_FOUND, "", "")
# Extract portrait from cache
portrait = cached_profile.get("portrait", {})
@@ -322,11 +313,7 @@ async def get_interest_area_distribution(
if cached_profile is None:
api_logger.info(f"用户 {end_user_id} 的画像数据不存在")
return fail(
BizCode.NOT_FOUND,
"画像数据不存在,请点击右上角刷新进行初始化",
""
)
return fail(BizCode.NOT_FOUND, "", "")
# Extract interest areas from cache
interest_areas = cached_profile.get("interest_areas", {})
@@ -374,11 +361,7 @@ async def get_behavior_habits(
if cached_profile is None:
api_logger.info(f"用户 {end_user_id} 的画像数据不存在")
return fail(
BizCode.NOT_FOUND,
"画像数据不存在,请点击右上角刷新进行初始化",
""
)
return fail(BizCode.NOT_FOUND, "", "")
# Extract habits from cache
habits = cached_profile.get("habits", [])

View File

@@ -19,8 +19,8 @@ class ImplicitEmotionsStorage(Base):
# 主键
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, comment="主键ID")
# 用户标识
end_user_id = Column(String(255), nullable=False, unique=True, index=True, comment="终端用户ID")
# 用户标识unique=True会自动创建唯一索引
end_user_id = Column(String(255), nullable=False, unique=True, comment="终端用户ID")
# 隐性记忆画像数据JSON格式
implicit_profile = Column(JSONB, nullable=True, comment="隐性记忆用户画像数据")
@@ -36,9 +36,8 @@ class ImplicitEmotionsStorage(Base):
implicit_generated_at = Column(DateTime, nullable=True, comment="隐性记忆画像生成时间")
emotion_generated_at = Column(DateTime, nullable=True, comment="情绪建议生成时间")
# 索引
# 索引只为updated_at创建索引end_user_id的unique约束已自动创建索引
__table_args__ = (
Index('idx_end_user_id', 'end_user_id'),
Index('idx_updated_at', 'updated_at'),
)

View File

@@ -2,10 +2,11 @@
Implicit Emotions Storage Repository
数据访问层:处理隐性记忆和情绪数据的数据库操作
事务由调用方控制,仓储层只使用 flush/refresh
"""
import logging
from datetime import datetime
from typing import Optional, List
from typing import Optional, Generator
from sqlalchemy.orm import Session
from sqlalchemy import select
@@ -16,154 +17,105 @@ logger = logging.getLogger(__name__)
class ImplicitEmotionsStorageRepository:
"""隐性记忆和情绪存储仓储类"""
def __init__(self, db: Session):
self.db = db
def get_by_end_user_id(self, end_user_id: str) -> Optional[ImplicitEmotionsStorage]:
"""根据终端用户ID获取存储记录
Args:
end_user_id: 终端用户ID
Returns:
存储记录如果不存在返回None
"""
"""根据终端用户ID获取存储记录"""
try:
stmt = select(ImplicitEmotionsStorage).where(
ImplicitEmotionsStorage.end_user_id == end_user_id
)
result = self.db.execute(stmt).scalar_one_or_none()
return result
return self.db.execute(stmt).scalar_one_or_none()
except Exception as e:
logger.error(f"获取用户存储记录失败: end_user_id={end_user_id}, error={e}")
return None
def create(self, end_user_id: str) -> ImplicitEmotionsStorage:
"""创建新的存储记录
Args:
end_user_id: 终端用户ID
Returns:
新创建的存储记录
"""
try:
storage = ImplicitEmotionsStorage(
end_user_id=end_user_id,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow()
)
self.db.add(storage)
self.db.commit()
self.db.refresh(storage)
logger.info(f"创建用户存储记录成功: end_user_id={end_user_id}")
return storage
except Exception as e:
self.db.rollback()
logger.error(f"创建用户存储记录失败: end_user_id={end_user_id}, error={e}")
raise
"""创建新的存储记录(事务由调用方提交)"""
storage = ImplicitEmotionsStorage(
end_user_id=end_user_id,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow()
)
self.db.add(storage)
self.db.flush()
self.db.refresh(storage)
logger.info(f"创建用户存储记录成功: end_user_id={end_user_id}")
return storage
def update_implicit_profile(
self,
end_user_id: str,
profile_data: dict
) -> Optional[ImplicitEmotionsStorage]:
"""更新隐性记忆画像数据
Args:
end_user_id: 终端用户ID
profile_data: 画像数据
Returns:
更新后的存储记录
"""
try:
storage = self.get_by_end_user_id(end_user_id)
if storage is None:
# 如果记录不存在,创建新记录
storage = self.create(end_user_id)
storage.implicit_profile = profile_data
storage.implicit_generated_at = datetime.utcnow()
storage.updated_at = datetime.utcnow()
self.db.commit()
self.db.refresh(storage)
logger.info(f"更新隐性记忆画像成功: end_user_id={end_user_id}")
return storage
except Exception as e:
self.db.rollback()
logger.error(f"更新隐性记忆画像失败: end_user_id={end_user_id}, error={e}")
raise
) -> ImplicitEmotionsStorage:
"""更新隐性记忆画像数据(事务由调用方提交)"""
storage = self.get_by_end_user_id(end_user_id)
if storage is None:
storage = self.create(end_user_id)
storage.implicit_profile = profile_data
storage.implicit_generated_at = datetime.utcnow()
storage.updated_at = datetime.utcnow()
self.db.flush()
self.db.refresh(storage)
logger.info(f"更新隐性记忆画像成功: end_user_id={end_user_id}")
return storage
def update_emotion_suggestions(
self,
end_user_id: str,
suggestions_data: dict
) -> Optional[ImplicitEmotionsStorage]:
"""更新情绪建议数据
) -> ImplicitEmotionsStorage:
"""更新情绪建议数据(事务由调用方提交)"""
storage = self.get_by_end_user_id(end_user_id)
if storage is None:
storage = self.create(end_user_id)
storage.emotion_suggestions = suggestions_data
storage.emotion_generated_at = datetime.utcnow()
storage.updated_at = datetime.utcnow()
self.db.flush()
self.db.refresh(storage)
logger.info(f"更新情绪建议成功: end_user_id={end_user_id}")
return storage
def get_all_user_ids(self, batch_size: int = 100) -> Generator[str, None, None]:
"""分批次获取所有已存储数据的用户ID避免大数据量内存溢出
Args:
end_user_id: 终端用户ID
suggestions_data: 建议数据
Returns:
更新后的存储记录
batch_size: 每批次加载的数量默认100
Yields:
用户ID字符串
"""
try:
storage = self.get_by_end_user_id(end_user_id)
if storage is None:
# 如果记录不存在,创建新记录
storage = self.create(end_user_id)
storage.emotion_suggestions = suggestions_data
storage.emotion_generated_at = datetime.utcnow()
storage.updated_at = datetime.utcnow()
self.db.commit()
self.db.refresh(storage)
logger.info(f"更新情绪建议成功: end_user_id={end_user_id}")
return storage
except Exception as e:
self.db.rollback()
logger.error(f"更新情绪建议失败: end_user_id={end_user_id}, error={e}")
raise
def get_all_user_ids(self) -> List[str]:
"""获取所有已存储数据的用户ID列表
Returns:
用户ID列表
"""
try:
stmt = select(ImplicitEmotionsStorage.end_user_id)
result = self.db.execute(stmt).scalars().all()
return list(result)
except Exception as e:
logger.error(f"获取所有用户ID失败: error={e}")
return []
offset = 0
while True:
try:
stmt = (
select(ImplicitEmotionsStorage.end_user_id)
.order_by(ImplicitEmotionsStorage.end_user_id)
.limit(batch_size)
.offset(offset)
)
batch = self.db.execute(stmt).scalars().all()
if not batch:
break
yield from batch
offset += batch_size
except Exception as e:
logger.error(f"分批获取用户ID失败: offset={offset}, error={e}")
break
def delete_by_end_user_id(self, end_user_id: str) -> bool:
"""删除用户的存储记录
Args:
end_user_id: 终端用户ID
Returns:
是否删除成功
"""
try:
storage = self.get_by_end_user_id(end_user_id)
if storage:
self.db.delete(storage)
self.db.commit()
logger.info(f"删除用户存储记录成功: end_user_id={end_user_id}")
return True
return False
except Exception as e:
self.db.rollback()
logger.error(f"删除用户存储记录失败: end_user_id={end_user_id}, error={e}")
return False
"""删除用户的存储记录(事务由调用方提交)"""
storage = self.get_by_end_user_id(end_user_id)
if storage:
self.db.delete(storage)
self.db.flush()
logger.info(f"删除用户存储记录成功: end_user_id={end_user_id}")
return True
return False

View File

@@ -892,12 +892,12 @@ class EmotionAnalyticsService:
logger.info(f"保存建议到数据库: user={end_user_id}")
# 保存到数据库
repo = ImplicitEmotionsStorageRepository(db)
repo.update_emotion_suggestions(end_user_id, suggestions_data)
db.commit()
logger.info(f"建议保存成功: user={end_user_id}")
except Exception as e:
logger.error(f"保存建议失败: {str(e)}", exc_info=True)
# 不抛出异常,存储失败不应影响主流程
db.rollback()
logger.error(f"保存建议失败: {str(e)}", exc_info=True)

View File

@@ -471,12 +471,12 @@ class ImplicitMemoryService:
logger.info(f"保存用户画像到数据库: user={end_user_id}")
# 保存到数据库
repo = ImplicitEmotionsStorageRepository(db)
repo.update_implicit_profile(end_user_id, profile_data)
db.commit()
logger.info(f"用户画像保存成功: user={end_user_id}")
except Exception as e:
db.rollback()
logger.error(f"保存用户画像失败: {str(e)}", exc_info=True)
# 不抛出异常,存储失败不应影响主流程

View File

@@ -2160,6 +2160,8 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
async def _run() -> Dict[str, Any]:
from app.core.logging_config import get_logger
from app.repositories.implicit_emotions_storage_repository import ImplicitEmotionsStorageRepository
from app.models.implicit_emotions_storage_model import ImplicitEmotionsStorage
from sqlalchemy import select, func
from app.services.implicit_memory_service import ImplicitMemoryService
from app.services.emotion_analytics_service import EmotionAnalyticsService
@@ -2174,15 +2176,18 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
with get_db_context() as db:
try:
# 获取所有已存储数据的用户ID
# 获取所有已存储数据的用户ID(分批次处理)
repo = ImplicitEmotionsStorageRepository(db)
user_ids = repo.get_all_user_ids()
total_users = len(user_ids)
# 先统计总数用于日志
from sqlalchemy import func
total_users = db.execute(
select(func.count()).select_from(ImplicitEmotionsStorage)
).scalar() or 0
logger.info(f"找到 {total_users} 个需要更新的用户")
# 遍历每个用户并更新数据
for end_user_id in user_ids:
# 遍历每个用户并更新数据分批次避免一次性加载所有ID
for end_user_id in repo.get_all_user_ids(batch_size=100):
logger.info(f"开始处理用户: {end_user_id}")
user_start_time = time.time()