[changes] AI reviews and modifies the code

This commit is contained in:
lanceyq
2026-03-03 15:16:47 +08:00
parent 9675982555
commit 006c6cd159
9 changed files with 114 additions and 188 deletions

View File

@@ -2,10 +2,7 @@
Cache 缓存模块 Cache 缓存模块
提供各种缓存功能的统一入口 提供各种缓存功能的统一入口
注意隐性记忆和情绪建议已迁移到数据库存储不再使用Redis缓存
""" """
from .memory import EmotionMemoryCache, ImplicitMemoryCache
__all__ = [ __all__ = []
"EmotionMemoryCache",
"ImplicitMemoryCache",
]

View File

@@ -2,11 +2,7 @@
Memory 缓存模块 Memory 缓存模块
提供记忆系统相关的缓存功能 提供记忆系统相关的缓存功能
注意隐性记忆和情绪建议已迁移到数据库存储不再使用Redis缓存
""" """
from .emotion_memory import EmotionMemoryCache
from .implicit_memory import ImplicitMemoryCache
__all__ = [ __all__ = []
"EmotionMemoryCache",
"ImplicitMemoryCache",
]

View File

@@ -262,7 +262,6 @@ async def check_emotion_data_exists(
@router.post("/suggestions", response_model=ApiResponse) @router.post("/suggestions", response_model=ApiResponse)
async def get_emotion_suggestions( async def get_emotion_suggestions(
request: EmotionSuggestionsRequest, request: EmotionSuggestionsRequest,
language_type: str = Header(default=None, alias="X-Language-Type"),
db: Session = Depends(get_db), db: Session = Depends(get_db),
current_user: User = Depends(get_current_user), current_user: User = Depends(get_current_user),
): ):
@@ -277,9 +276,6 @@ async def get_emotion_suggestions(
存储的个性化情绪建议响应 存储的个性化情绪建议响应
""" """
try: try:
# 使用集中化的语言校验
language = get_language_from_header(language_type)
api_logger.info( api_logger.info(
f"用户 {current_user.username} 请求获取个性化情绪建议", f"用户 {current_user.username} 请求获取个性化情绪建议",
extra={ extra={
@@ -295,15 +291,13 @@ async def get_emotion_suggestions(
) )
if data is None: if data is None:
# 数据不存在,返回提示信息
api_logger.info( api_logger.info(
f"用户 {request.end_user_id} 的建议数据不存在", f"用户 {request.end_user_id} 的建议数据不存在",
extra={"end_user_id": request.end_user_id} extra={"end_user_id": request.end_user_id}
) )
return fail( return success(
BizCode.NOT_FOUND, data={"exists": False},
"情绪建议数据不存在,请点击右上角刷新进行初始化", msg="情绪建议数据不存在,请点击右上角刷新进行初始化"
""
) )
api_logger.info( api_logger.info(

View File

@@ -152,10 +152,9 @@ async def check_user_data_exists(
if cached_profile is None: if cached_profile is None:
api_logger.info(f"用户 {end_user_id} 的画像数据不存在") api_logger.info(f"用户 {end_user_id} 的画像数据不存在")
return fail( return success(
BizCode.NOT_FOUND, data={"exists": False},
"画像数据不存在,请点击右上角刷新进行初始化", msg="画像数据不存在,请点击右上角刷新进行初始化"
{"exists": False}
) )
api_logger.info(f"用户 {end_user_id} 的画像数据存在") api_logger.info(f"用户 {end_user_id} 的画像数据存在")
@@ -203,11 +202,7 @@ async def get_preference_tags(
if cached_profile is None: if cached_profile is None:
api_logger.info(f"用户 {end_user_id} 的画像数据不存在") api_logger.info(f"用户 {end_user_id} 的画像数据不存在")
return fail( return fail(BizCode.NOT_FOUND, "", "")
BizCode.NOT_FOUND,
"画像数据不存在,请点击右上角刷新进行初始化",
""
)
# Extract preferences from cache # Extract preferences from cache
preferences = cached_profile.get("preferences", []) preferences = cached_profile.get("preferences", [])
@@ -274,11 +269,7 @@ async def get_dimension_portrait(
if cached_profile is None: if cached_profile is None:
api_logger.info(f"用户 {end_user_id} 的画像数据不存在") api_logger.info(f"用户 {end_user_id} 的画像数据不存在")
return fail( return fail(BizCode.NOT_FOUND, "", "")
BizCode.NOT_FOUND,
"画像数据不存在,请点击右上角刷新进行初始化",
""
)
# Extract portrait from cache # Extract portrait from cache
portrait = cached_profile.get("portrait", {}) portrait = cached_profile.get("portrait", {})
@@ -322,11 +313,7 @@ async def get_interest_area_distribution(
if cached_profile is None: if cached_profile is None:
api_logger.info(f"用户 {end_user_id} 的画像数据不存在") api_logger.info(f"用户 {end_user_id} 的画像数据不存在")
return fail( return fail(BizCode.NOT_FOUND, "", "")
BizCode.NOT_FOUND,
"画像数据不存在,请点击右上角刷新进行初始化",
""
)
# Extract interest areas from cache # Extract interest areas from cache
interest_areas = cached_profile.get("interest_areas", {}) interest_areas = cached_profile.get("interest_areas", {})
@@ -374,11 +361,7 @@ async def get_behavior_habits(
if cached_profile is None: if cached_profile is None:
api_logger.info(f"用户 {end_user_id} 的画像数据不存在") api_logger.info(f"用户 {end_user_id} 的画像数据不存在")
return fail( return fail(BizCode.NOT_FOUND, "", "")
BizCode.NOT_FOUND,
"画像数据不存在,请点击右上角刷新进行初始化",
""
)
# Extract habits from cache # Extract habits from cache
habits = cached_profile.get("habits", []) habits = cached_profile.get("habits", [])

View File

@@ -19,8 +19,8 @@ class ImplicitEmotionsStorage(Base):
# 主键 # 主键
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, comment="主键ID") id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, comment="主键ID")
# 用户标识 # 用户标识unique=True会自动创建唯一索引
end_user_id = Column(String(255), nullable=False, unique=True, index=True, comment="终端用户ID") end_user_id = Column(String(255), nullable=False, unique=True, comment="终端用户ID")
# 隐性记忆画像数据JSON格式 # 隐性记忆画像数据JSON格式
implicit_profile = Column(JSONB, nullable=True, comment="隐性记忆用户画像数据") implicit_profile = Column(JSONB, nullable=True, comment="隐性记忆用户画像数据")
@@ -36,9 +36,8 @@ class ImplicitEmotionsStorage(Base):
implicit_generated_at = Column(DateTime, nullable=True, comment="隐性记忆画像生成时间") implicit_generated_at = Column(DateTime, nullable=True, comment="隐性记忆画像生成时间")
emotion_generated_at = Column(DateTime, nullable=True, comment="情绪建议生成时间") emotion_generated_at = Column(DateTime, nullable=True, comment="情绪建议生成时间")
# 索引 # 索引只为updated_at创建索引end_user_id的unique约束已自动创建索引
__table_args__ = ( __table_args__ = (
Index('idx_end_user_id', 'end_user_id'),
Index('idx_updated_at', 'updated_at'), Index('idx_updated_at', 'updated_at'),
) )

View File

@@ -2,10 +2,11 @@
Implicit Emotions Storage Repository Implicit Emotions Storage Repository
数据访问层:处理隐性记忆和情绪数据的数据库操作 数据访问层:处理隐性记忆和情绪数据的数据库操作
事务由调用方控制,仓储层只使用 flush/refresh
""" """
import logging import logging
from datetime import datetime from datetime import datetime
from typing import Optional, List from typing import Optional, Generator
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from sqlalchemy import select from sqlalchemy import select
@@ -21,149 +22,100 @@ class ImplicitEmotionsStorageRepository:
self.db = db self.db = db
def get_by_end_user_id(self, end_user_id: str) -> Optional[ImplicitEmotionsStorage]: def get_by_end_user_id(self, end_user_id: str) -> Optional[ImplicitEmotionsStorage]:
"""根据终端用户ID获取存储记录 """根据终端用户ID获取存储记录"""
Args:
end_user_id: 终端用户ID
Returns:
存储记录如果不存在返回None
"""
try: try:
stmt = select(ImplicitEmotionsStorage).where( stmt = select(ImplicitEmotionsStorage).where(
ImplicitEmotionsStorage.end_user_id == end_user_id ImplicitEmotionsStorage.end_user_id == end_user_id
) )
result = self.db.execute(stmt).scalar_one_or_none() return self.db.execute(stmt).scalar_one_or_none()
return result
except Exception as e: except Exception as e:
logger.error(f"获取用户存储记录失败: end_user_id={end_user_id}, error={e}") logger.error(f"获取用户存储记录失败: end_user_id={end_user_id}, error={e}")
return None return None
def create(self, end_user_id: str) -> ImplicitEmotionsStorage: def create(self, end_user_id: str) -> ImplicitEmotionsStorage:
"""创建新的存储记录 """创建新的存储记录(事务由调用方提交)"""
storage = ImplicitEmotionsStorage(
Args: end_user_id=end_user_id,
end_user_id: 终端用户ID created_at=datetime.utcnow(),
updated_at=datetime.utcnow()
Returns: )
新创建的存储记录 self.db.add(storage)
""" self.db.flush()
try: self.db.refresh(storage)
storage = ImplicitEmotionsStorage( logger.info(f"创建用户存储记录成功: end_user_id={end_user_id}")
end_user_id=end_user_id, return storage
created_at=datetime.utcnow(),
updated_at=datetime.utcnow()
)
self.db.add(storage)
self.db.commit()
self.db.refresh(storage)
logger.info(f"创建用户存储记录成功: end_user_id={end_user_id}")
return storage
except Exception as e:
self.db.rollback()
logger.error(f"创建用户存储记录失败: end_user_id={end_user_id}, error={e}")
raise
def update_implicit_profile( def update_implicit_profile(
self, self,
end_user_id: str, end_user_id: str,
profile_data: dict profile_data: dict
) -> Optional[ImplicitEmotionsStorage]: ) -> ImplicitEmotionsStorage:
"""更新隐性记忆画像数据 """更新隐性记忆画像数据(事务由调用方提交)"""
storage = self.get_by_end_user_id(end_user_id)
if storage is None:
storage = self.create(end_user_id)
Args: storage.implicit_profile = profile_data
end_user_id: 终端用户ID storage.implicit_generated_at = datetime.utcnow()
profile_data: 画像数据 storage.updated_at = datetime.utcnow()
Returns: self.db.flush()
更新后的存储记录 self.db.refresh(storage)
""" logger.info(f"更新隐性记忆画像成功: end_user_id={end_user_id}")
try: return storage
storage = self.get_by_end_user_id(end_user_id)
if storage is None:
# 如果记录不存在,创建新记录
storage = self.create(end_user_id)
storage.implicit_profile = profile_data
storage.implicit_generated_at = datetime.utcnow()
storage.updated_at = datetime.utcnow()
self.db.commit()
self.db.refresh(storage)
logger.info(f"更新隐性记忆画像成功: end_user_id={end_user_id}")
return storage
except Exception as e:
self.db.rollback()
logger.error(f"更新隐性记忆画像失败: end_user_id={end_user_id}, error={e}")
raise
def update_emotion_suggestions( def update_emotion_suggestions(
self, self,
end_user_id: str, end_user_id: str,
suggestions_data: dict suggestions_data: dict
) -> Optional[ImplicitEmotionsStorage]: ) -> ImplicitEmotionsStorage:
"""更新情绪建议数据 """更新情绪建议数据(事务由调用方提交)"""
storage = self.get_by_end_user_id(end_user_id)
if storage is None:
storage = self.create(end_user_id)
storage.emotion_suggestions = suggestions_data
storage.emotion_generated_at = datetime.utcnow()
storage.updated_at = datetime.utcnow()
self.db.flush()
self.db.refresh(storage)
logger.info(f"更新情绪建议成功: end_user_id={end_user_id}")
return storage
def get_all_user_ids(self, batch_size: int = 100) -> Generator[str, None, None]:
"""分批次获取所有已存储数据的用户ID避免大数据量内存溢出
Args: Args:
end_user_id: 终端用户ID batch_size: 每批次加载的数量默认100
suggestions_data: 建议数据
Returns: Yields:
更新后的存储记录 用户ID字符串
""" """
try: offset = 0
storage = self.get_by_end_user_id(end_user_id) while True:
try:
if storage is None: stmt = (
# 如果记录不存在,创建新记录 select(ImplicitEmotionsStorage.end_user_id)
storage = self.create(end_user_id) .order_by(ImplicitEmotionsStorage.end_user_id)
.limit(batch_size)
storage.emotion_suggestions = suggestions_data .offset(offset)
storage.emotion_generated_at = datetime.utcnow() )
storage.updated_at = datetime.utcnow() batch = self.db.execute(stmt).scalars().all()
if not batch:
self.db.commit() break
self.db.refresh(storage) yield from batch
logger.info(f"更新情绪建议成功: end_user_id={end_user_id}") offset += batch_size
return storage except Exception as e:
except Exception as e: logger.error(f"分批获取用户ID失败: offset={offset}, error={e}")
self.db.rollback() break
logger.error(f"更新情绪建议失败: end_user_id={end_user_id}, error={e}")
raise
def get_all_user_ids(self) -> List[str]:
"""获取所有已存储数据的用户ID列表
Returns:
用户ID列表
"""
try:
stmt = select(ImplicitEmotionsStorage.end_user_id)
result = self.db.execute(stmt).scalars().all()
return list(result)
except Exception as e:
logger.error(f"获取所有用户ID失败: error={e}")
return []
def delete_by_end_user_id(self, end_user_id: str) -> bool: def delete_by_end_user_id(self, end_user_id: str) -> bool:
"""删除用户的存储记录 """删除用户的存储记录(事务由调用方提交)"""
storage = self.get_by_end_user_id(end_user_id)
Args: if storage:
end_user_id: 终端用户ID self.db.delete(storage)
self.db.flush()
Returns: logger.info(f"删除用户存储记录成功: end_user_id={end_user_id}")
是否删除成功 return True
""" return False
try:
storage = self.get_by_end_user_id(end_user_id)
if storage:
self.db.delete(storage)
self.db.commit()
logger.info(f"删除用户存储记录成功: end_user_id={end_user_id}")
return True
return False
except Exception as e:
self.db.rollback()
logger.error(f"删除用户存储记录失败: end_user_id={end_user_id}, error={e}")
return False

View File

@@ -892,12 +892,12 @@ class EmotionAnalyticsService:
logger.info(f"保存建议到数据库: user={end_user_id}") logger.info(f"保存建议到数据库: user={end_user_id}")
# 保存到数据库
repo = ImplicitEmotionsStorageRepository(db) repo = ImplicitEmotionsStorageRepository(db)
repo.update_emotion_suggestions(end_user_id, suggestions_data) repo.update_emotion_suggestions(end_user_id, suggestions_data)
db.commit()
logger.info(f"建议保存成功: user={end_user_id}") logger.info(f"建议保存成功: user={end_user_id}")
except Exception as e: except Exception as e:
db.rollback()
logger.error(f"保存建议失败: {str(e)}", exc_info=True) logger.error(f"保存建议失败: {str(e)}", exc_info=True)
# 不抛出异常,存储失败不应影响主流程

View File

@@ -471,12 +471,12 @@ class ImplicitMemoryService:
logger.info(f"保存用户画像到数据库: user={end_user_id}") logger.info(f"保存用户画像到数据库: user={end_user_id}")
# 保存到数据库
repo = ImplicitEmotionsStorageRepository(db) repo = ImplicitEmotionsStorageRepository(db)
repo.update_implicit_profile(end_user_id, profile_data) repo.update_implicit_profile(end_user_id, profile_data)
db.commit()
logger.info(f"用户画像保存成功: user={end_user_id}") logger.info(f"用户画像保存成功: user={end_user_id}")
except Exception as e: except Exception as e:
db.rollback()
logger.error(f"保存用户画像失败: {str(e)}", exc_info=True) logger.error(f"保存用户画像失败: {str(e)}", exc_info=True)
# 不抛出异常,存储失败不应影响主流程

View File

@@ -1963,6 +1963,8 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
async def _run() -> Dict[str, Any]: async def _run() -> Dict[str, Any]:
from app.core.logging_config import get_logger from app.core.logging_config import get_logger
from app.repositories.implicit_emotions_storage_repository import ImplicitEmotionsStorageRepository from app.repositories.implicit_emotions_storage_repository import ImplicitEmotionsStorageRepository
from app.models.implicit_emotions_storage_model import ImplicitEmotionsStorage
from sqlalchemy import select, func
from app.services.implicit_memory_service import ImplicitMemoryService from app.services.implicit_memory_service import ImplicitMemoryService
from app.services.emotion_analytics_service import EmotionAnalyticsService from app.services.emotion_analytics_service import EmotionAnalyticsService
@@ -1977,15 +1979,18 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
with get_db_context() as db: with get_db_context() as db:
try: try:
# 获取所有已存储数据的用户ID # 获取所有已存储数据的用户ID(分批次处理)
repo = ImplicitEmotionsStorageRepository(db) repo = ImplicitEmotionsStorageRepository(db)
user_ids = repo.get_all_user_ids()
total_users = len(user_ids)
# 先统计总数用于日志
from sqlalchemy import func
total_users = db.execute(
select(func.count()).select_from(ImplicitEmotionsStorage)
).scalar() or 0
logger.info(f"找到 {total_users} 个需要更新的用户") logger.info(f"找到 {total_users} 个需要更新的用户")
# 遍历每个用户并更新数据 # 遍历每个用户并更新数据分批次避免一次性加载所有ID
for end_user_id in user_ids: for end_user_id in repo.get_all_user_ids(batch_size=100):
logger.info(f"开始处理用户: {end_user_id}") logger.info(f"开始处理用户: {end_user_id}")
user_start_time = time.time() user_start_time = time.time()