[add] The "update-implicit-emotions-storage" task uses the timeline to filter the updated data users.

This commit is contained in:
lanceyq
2026-03-07 16:23:59 +08:00
parent 2612abc9d0
commit c14f067afb
2 changed files with 109 additions and 49 deletions

View File

@@ -111,6 +111,57 @@ class ImplicitEmotionsStorageRepository:
logger.error(f"分批获取用户ID失败: offset={offset}, error={e}")
break
def get_users_needing_refresh(self, redis_client, batch_size: int = 100) -> Generator[str, None, None]:
"""分批次获取需要刷新隐性记忆/情绪数据的存量用户ID。
筛选逻辑:
- 查询 implicit_emotions_storage 中所有用户的 end_user_id 和 updated_at
- 从 Redis 读取 write_message:last_done:{end_user_id} 的时间戳
- 若 Redis 中无记录(该用户从未写入过记忆),跳过
- 若 last_done > updated_at说明上次刷新后又有新记忆写入需要刷新
- 若 last_done <= updated_at说明已是最新跳过
Args:
redis_client: 同步 redis.StrictRedis 实例(连接 CELERY_BACKEND DB
batch_size: 每批次加载的数量
Yields:
需要刷新的用户ID字符串
"""
from datetime import timezone
offset = 0
while True:
try:
stmt = (
select(ImplicitEmotionsStorage.end_user_id, ImplicitEmotionsStorage.updated_at)
.order_by(ImplicitEmotionsStorage.end_user_id)
.limit(batch_size)
.offset(offset)
)
batch = self.db.execute(stmt).all()
if not batch:
break
for end_user_id, updated_at in batch:
raw = redis_client.get(f"write_message:last_done:{end_user_id}")
if raw is None:
# 该用户从未有过 write_message 成功记录,跳过
continue
try:
last_done = datetime.fromisoformat(raw)
# 统一去掉时区信息做 naive 比较
if last_done.tzinfo is not None:
last_done = last_done.astimezone(timezone.utc).replace(tzinfo=None)
if updated_at is None or last_done > updated_at:
yield end_user_id
except Exception as e:
logger.warning(f"解析 last_done 时间戳失败: end_user_id={end_user_id}, raw={raw}, error={e}")
offset += batch_size
except Exception as e:
logger.error(f"get_users_needing_refresh 分批查询失败: offset={offset}, error={e}")
break
def get_new_user_ids_today(self, batch_size: int = 100) -> Generator[str, None, None]:
"""分批次获取当天新增的、尚未初始化隐性记忆和情绪建议数据的用户ID

View File

@@ -1090,6 +1090,25 @@ def write_message_task(self, end_user_id: str, message: list[dict], config_id: s
logger.info(
f"[CELERY WRITE] Task completed successfully - elapsed_time={elapsed_time:.2f}s, task_id={self.request.id}")
# 记录该用户最后一次 write_message 成功的时间,供 init_implicit_emotions_for_users 做时间轴筛选
try:
import redis as _redis
from urllib.parse import quote as _quote
_r = _redis.StrictRedis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DB_CELERY_BACKEND,
password=settings.REDIS_PASSWORD,
decode_responses=True,
)
_r.set(
f"write_message:last_done:{end_user_id}",
datetime.utcnow().isoformat(),
ex=86400 * 30, # 30天过期
)
except Exception as _e:
logger.warning(f"[CELERY WRITE] 写入 last_done 时间戳失败(不影响主流程): {_e}")
return {
"status": "SUCCESS",
"result": result,
@@ -2167,18 +2186,27 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
with get_db_context() as db:
try:
# 获取所有已存储数据的用户ID分批次处理
repo = ImplicitEmotionsStorageRepository(db)
# 先统计总数用于日志
from sqlalchemy import func
total_users = db.execute(
select(func.count()).select_from(ImplicitEmotionsStorage)
).scalar() or 0
logger.info(f"找到 {total_users} 个需要更新的用户")
logger.info(f"表中存量用户总数: {total_users},开始时间轴筛选")
# 遍历每个用户并更新数据分批次避免一次性加载所有ID
for end_user_id in repo.get_all_user_ids(batch_size=100):
# 构建 Redis 同步客户端,用于时间轴筛选
import redis as _redis
_redis_client = _redis.StrictRedis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DB_CELERY_BACKEND,
password=settings.REDIS_PASSWORD,
decode_responses=True,
)
# 只处理 last_done > updated_at 的用户(有新记忆写入的用户)
for end_user_id in repo.get_users_needing_refresh(_redis_client, batch_size=100):
logger.info(f"开始处理用户: {end_user_id}")
user_start_time = time.time()
@@ -2264,10 +2292,10 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
user_results.append(error_info)
logger.error(f"处理用户 {end_user_id} 时出错: {str(e)}")
# ---- 处理增量用户(当天新增、尚未初始化的用户)----
# ---- 当天新增用户兜底初始化 ----
new_users_initialized = 0
new_users_failed = 0
logger.info("开始处理当天新增的增量用户初始化")
logger.info("开始处理当天新增用户的兜底初始化")
for end_user_id in repo.get_new_user_ids_today(batch_size=100):
logger.info(f"开始初始化新用户: {end_user_id}")
@@ -2281,35 +2309,27 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
implicit_service = ImplicitMemoryService(db=db, end_user_id=end_user_id)
profile_data = await implicit_service.generate_complete_profile(user_id=end_user_id)
await implicit_service.save_profile_cache(
end_user_id=end_user_id,
profile_data=profile_data,
db=db
end_user_id=end_user_id, profile_data=profile_data, db=db
)
implicit_success = True
logger.info(f"成功初始化新用户 {end_user_id} 的隐性记忆画像")
except Exception as e:
error_msg = f"隐性记忆初始化失败: {str(e)}"
errors.append(error_msg)
logger.error(f"新用户 {end_user_id} {error_msg}")
errors.append(f"隐性记忆初始化失败: {str(e)}")
logger.error(f"新用户 {end_user_id} 隐性记忆初始化失败: {e}")
try:
emotion_service = EmotionAnalyticsService()
suggestions_data = await emotion_service.generate_emotion_suggestions(
end_user_id=end_user_id,
db=db,
language="zh"
end_user_id=end_user_id, db=db, language="zh"
)
await emotion_service.save_suggestions_cache(
end_user_id=end_user_id,
suggestions_data=suggestions_data,
db=db
end_user_id=end_user_id, suggestions_data=suggestions_data, db=db
)
emotion_success = True
logger.info(f"成功初始化新用户 {end_user_id} 的情绪建议")
except Exception as e:
error_msg = f"情绪建议初始化失败: {str(e)}"
errors.append(error_msg)
logger.error(f"新用户 {end_user_id} {error_msg}")
errors.append(f"情绪建议初始化失败: {str(e)}")
logger.error(f"新用户 {end_user_id} 情绪建议初始化失败: {e}")
if implicit_success or emotion_success:
new_users_initialized += 1
@@ -2319,7 +2339,7 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
user_elapsed = time.time() - user_start_time
user_results.append({
"end_user_id": end_user_id,
"type": "init",
"type": "new_user_init",
"implicit_success": implicit_success,
"emotion_success": emotion_success,
"errors": errors,
@@ -2331,7 +2351,7 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
user_elapsed = time.time() - user_start_time
user_results.append({
"end_user_id": end_user_id,
"type": "init",
"type": "new_user_init",
"implicit_success": False,
"emotion_success": False,
"errors": [str(e)],
@@ -2339,27 +2359,24 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
})
logger.error(f"初始化新用户 {end_user_id} 时出错: {str(e)}")
logger.info(
f"增量用户初始化完成: 成功={new_users_initialized}, 失败={new_users_failed}"
)
# ---- 增量用户处理结束 ----
logger.info(f"当天新增用户兜底初始化完成: 成功={new_users_initialized}, 失败={new_users_failed}")
# ---- 新增用户兜底初始化结束 ----
# 记录总体统计信息
logger.info(
f"隐性记忆和情绪数据更新定时任务完成: "
f"存量用户总数={total_users}, "
f"隐性记忆成功={successful_implicit}, "
f"情绪建议成功={successful_emotion}, "
f"存量失败={failed}, "
f"增量初始化成功={new_users_initialized}, "
f"增量初始化失败={new_users_failed}"
f"新增用户初始化成功={new_users_initialized}, "
f"新增用户初始化失败={new_users_failed}"
)
return {
"status": "SUCCESS",
"message": (
f"存量用户 {total_users} 个,隐性记忆 {successful_implicit} 个成功,情绪建议 {successful_emotion} 个成功;"
f"量新用户初始化 {new_users_initialized} 个成功,{new_users_failed} 个失败"
f"当天新增用户初始化 {new_users_initialized} 个成功,{new_users_failed} 个失败"
),
"total_users": total_users,
"successful_implicit": successful_implicit,
@@ -2367,7 +2384,7 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
"failed": failed,
"new_users_initialized": new_users_initialized,
"new_users_failed": new_users_failed,
"user_results": user_results[:50] # 只保留前50个用户的详细结果
"user_results": user_results[:50]
}
except Exception as e:
@@ -2430,12 +2447,13 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
soft_time_limit=3300,
)
def init_implicit_emotions_for_users(self, end_user_ids: List[str]) -> Dict[str, Any]:
"""按需初始化:为指定用户列表中尚未生成隐性记忆/情绪数据的用户执行首次生成
"""事件触发任务:对指定用户列表做存在性检查,无记录则执行首次初始化
由 /dashboard/end_users 接口触发,仅处理 implicit_emotions_storage 表中不存在记录的用户
由 /dashboard/end_users 接口触发,已有数据的用户直接跳过
存量用户的数据刷新由定时任务 update_implicit_emotions_storage 负责。
Args:
end_user_ids: 需要检查并初始化的用户ID列表
end_user_ids: 需要检查的用户ID列表
Returns:
包含任务执行结果的字典
@@ -2459,24 +2477,20 @@ def init_implicit_emotions_for_users(self, end_user_ids: List[str]) -> Dict[str,
repo = ImplicitEmotionsStorageRepository(db)
for end_user_id in end_user_ids:
# 幂等检查:已有记录则跳过
existing = repo.get_by_end_user_id(end_user_id)
if existing is not None:
skipped += 1
continue
logger.info(f"用户 {end_user_id}隐性记忆数据,开始初始化")
logger.info(f"用户 {end_user_id}记录,开始初始化")
implicit_ok = False
emotion_ok = False
try:
try:
implicit_service = ImplicitMemoryService(db=db, end_user_id=end_user_id)
profile_data = await implicit_service.generate_complete_profile(user_id=end_user_id)
await implicit_service.save_profile_cache(
end_user_id=end_user_id,
profile_data=profile_data,
db=db
end_user_id=end_user_id, profile_data=profile_data, db=db
)
implicit_ok = True
except Exception as e:
@@ -2485,14 +2499,10 @@ def init_implicit_emotions_for_users(self, end_user_ids: List[str]) -> Dict[str,
try:
emotion_service = EmotionAnalyticsService()
suggestions_data = await emotion_service.generate_emotion_suggestions(
end_user_id=end_user_id,
db=db,
language="zh"
end_user_id=end_user_id, db=db, language="zh"
)
await emotion_service.save_suggestions_cache(
end_user_id=end_user_id,
suggestions_data=suggestions_data,
db=db
end_user_id=end_user_id, suggestions_data=suggestions_data, db=db
)
emotion_ok = True
except Exception as e:
@@ -2502,7 +2512,6 @@ def init_implicit_emotions_for_users(self, end_user_ids: List[str]) -> Dict[str,
initialized += 1
else:
failed += 1
except Exception as e:
failed += 1
logger.error(f"用户 {end_user_id} 初始化异常: {e}")