[changes] Implicit and emotional memories are stored in a database.

This commit is contained in:
lanceyq
2026-03-03 15:33:17 +08:00
parent 1524d7b5ce
commit 6033d37537
11 changed files with 603 additions and 393 deletions

View File

@@ -2121,4 +2121,213 @@ def run_forgetting_cycle_task(self, config_id: Optional[uuid.UUID] = None) -> Di
# "config_id": config_id,
# "elapsed_time": elapsed_time,
# "task_id": self.request.id
# }
# }
# =============================================================================
# 隐性记忆和情绪数据更新定时任务
# =============================================================================
@celery_app.task(
name="app.tasks.update_implicit_emotions_storage",
bind=True,
ignore_result=True,
max_retries=0,
acks_late=False,
time_limit=7200, # 2小时硬超时
soft_time_limit=6900, # 1小时55分钟软超时
)
def update_implicit_emotions_storage(self) -> Dict[str, Any]:
"""定时任务:更新所有用户的隐性记忆画像和情绪建议数据
遍历数据库中所有已存在数据的用户,为每个用户重新生成隐性记忆画像和情绪建议。
实现错误隔离,单个用户失败不影响其他用户的处理。
Returns:
包含任务执行结果的字典,包括:
- status: 任务状态 (SUCCESS/FAILURE)
- message: 执行消息
- total_users: 总用户数
- successful_implicit: 成功更新隐性记忆的用户数
- successful_emotion: 成功更新情绪建议的用户数
- failed: 失败的用户数
- user_results: 每个用户的详细结果
- elapsed_time: 执行耗时(秒)
- task_id: 任务ID
"""
start_time = time.time()
async def _run() -> Dict[str, Any]:
from app.core.logging_config import get_logger
from app.repositories.implicit_emotions_storage_repository import ImplicitEmotionsStorageRepository
from app.services.implicit_memory_service import ImplicitMemoryService
from app.services.emotion_analytics_service import EmotionAnalyticsService
logger = get_logger(__name__)
logger.info("开始执行隐性记忆和情绪数据更新定时任务")
total_users = 0
successful_implicit = 0
successful_emotion = 0
failed = 0
user_results = []
with get_db_context() as db:
try:
# 获取所有已存储数据的用户ID
repo = ImplicitEmotionsStorageRepository(db)
user_ids = repo.get_all_user_ids()
total_users = len(user_ids)
logger.info(f"找到 {total_users} 个需要更新的用户")
# 遍历每个用户并更新数据
for end_user_id in user_ids:
logger.info(f"开始处理用户: {end_user_id}")
user_start_time = time.time()
implicit_success = False
emotion_success = False
errors = []
try:
# 更新隐性记忆画像
try:
implicit_service = ImplicitMemoryService(db=db, end_user_id=end_user_id)
profile_data = await implicit_service.generate_complete_profile(user_id=end_user_id)
await implicit_service.save_profile_cache(
end_user_id=end_user_id,
profile_data=profile_data,
db=db
)
implicit_success = True
logger.info(f"成功更新用户 {end_user_id} 的隐性记忆画像")
except Exception as e:
error_msg = f"隐性记忆更新失败: {str(e)}"
errors.append(error_msg)
logger.error(f"用户 {end_user_id} {error_msg}")
# 更新情绪建议
try:
emotion_service = EmotionAnalyticsService(db=db, end_user_id=end_user_id)
suggestions_data = await emotion_service.generate_emotion_suggestions(
end_user_id=end_user_id,
db=db,
language="zh"
)
await emotion_service.save_suggestions_cache(
end_user_id=end_user_id,
suggestions_data=suggestions_data,
db=db
)
emotion_success = True
logger.info(f"成功更新用户 {end_user_id} 的情绪建议")
except Exception as e:
error_msg = f"情绪建议更新失败: {str(e)}"
errors.append(error_msg)
logger.error(f"用户 {end_user_id} {error_msg}")
# 统计结果
if implicit_success:
successful_implicit += 1
if emotion_success:
successful_emotion += 1
if not implicit_success and not emotion_success:
failed += 1
user_elapsed = time.time() - user_start_time
# 记录用户处理结果
user_result = {
"end_user_id": end_user_id,
"implicit_success": implicit_success,
"emotion_success": emotion_success,
"errors": errors,
"elapsed_time": user_elapsed
}
user_results.append(user_result)
logger.info(
f"用户 {end_user_id} 处理完成: "
f"隐性记忆={'成功' if implicit_success else '失败'}, "
f"情绪建议={'成功' if emotion_success else '失败'}, "
f"耗时={user_elapsed:.2f}"
)
except Exception as e:
# 单个用户失败不影响其他用户(错误隔离)
failed += 1
user_elapsed = time.time() - user_start_time
error_info = {
"end_user_id": end_user_id,
"implicit_success": False,
"emotion_success": False,
"errors": [str(e)],
"elapsed_time": user_elapsed
}
user_results.append(error_info)
logger.error(f"处理用户 {end_user_id} 时出错: {str(e)}")
# 记录总体统计信息
logger.info(
f"隐性记忆和情绪数据更新定时任务完成: "
f"总用户数={total_users}, "
f"隐性记忆成功={successful_implicit}, "
f"情绪建议成功={successful_emotion}, "
f"失败={failed}"
)
return {
"status": "SUCCESS",
"message": f"成功处理 {total_users} 个用户,隐性记忆 {successful_implicit} 个成功,情绪建议 {successful_emotion} 个成功",
"total_users": total_users,
"successful_implicit": successful_implicit,
"successful_emotion": successful_emotion,
"failed": failed,
"user_results": user_results[:50] # 只保留前50个用户的详细结果
}
except Exception as e:
logger.error(f"隐性记忆和情绪数据更新定时任务执行失败: {str(e)}")
return {
"status": "FAILURE",
"error": str(e),
"total_users": total_users,
"successful_implicit": successful_implicit,
"successful_emotion": successful_emotion,
"failed": failed,
"user_results": user_results[:50]
}
try:
# 使用 nest_asyncio 来避免事件循环冲突
try:
import nest_asyncio
nest_asyncio.apply()
except ImportError:
pass
# 尝试获取现有事件循环,如果不存在则创建新的
try:
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(_run())
elapsed_time = time.time() - start_time
result["elapsed_time"] = elapsed_time
result["task_id"] = self.request.id
return result
except Exception as e:
elapsed_time = time.time() - start_time
return {
"status": "FAILURE",
"error": str(e),
"elapsed_time": elapsed_time,
"task_id": self.request.id
}