[add] Set up scheduled tasks for existing and new users

This commit is contained in:
lanceyq
2026-03-04 12:28:55 +08:00
parent 229eb5cc86
commit 94836ed9af
4 changed files with 155 additions and 11 deletions

View File

@@ -5,12 +5,13 @@ Implicit Emotions Storage Repository
事务由调用方控制,仓储层只使用 flush/refresh
"""
import logging
from datetime import datetime
from datetime import datetime, date
from typing import Optional, Generator
from sqlalchemy.orm import Session
from sqlalchemy import select
from sqlalchemy import select, not_, exists
from app.models.implicit_emotions_storage_model import ImplicitEmotionsStorage
from app.models.end_user_model import EndUser
logger = logging.getLogger(__name__)
@@ -110,6 +111,49 @@ class ImplicitEmotionsStorageRepository:
logger.error(f"分批获取用户ID失败: offset={offset}, error={e}")
break
def get_new_user_ids_today(self, batch_size: int = 100) -> Generator[str, None, None]:
"""分批次获取当天新增的、尚未初始化隐性记忆和情绪建议数据的用户ID
查询逻辑end_users 表中 created_at 为今天,且在 implicit_emotions_storage 中没有对应记录。
没有对应记录意味着隐性记忆画像和情绪建议均未初始化,需要对这批用户执行首次初始化。
end_users.idUUID转为字符串后与 implicit_emotions_storage.end_user_idString对比。
Args:
batch_size: 每批次加载的数量默认100
Yields:
用户ID字符串
"""
from sqlalchemy import cast, String as SAString
today_start = datetime.combine(date.today(), datetime.min.time())
offset = 0
while True:
try:
stmt = (
select(EndUser.id)
.where(
EndUser.created_at >= today_start,
not_(
exists(
select(ImplicitEmotionsStorage.end_user_id).where(
ImplicitEmotionsStorage.end_user_id == cast(EndUser.id, SAString)
)
)
)
)
.order_by(EndUser.id)
.limit(batch_size)
.offset(offset)
)
batch = self.db.execute(stmt).scalars().all()
if not batch:
break
yield from (str(uid) for uid in batch)
offset += batch_size
except Exception as e:
logger.error(f"分批获取当天新增用户ID失败: offset={offset}, error={e}")
break
def delete_by_end_user_id(self, end_user_id: str) -> bool:
"""删除用户的存储记录(事务由调用方提交)"""
storage = self.get_by_end_user_id(end_user_id)