Merge pull request #449 from SuanmoSuanyangTechnology/fix/time_task

[add] Set up scheduled tasks for existing and new users
This commit is contained in:
Ke Sun
2026-03-04 13:54:42 +08:00
committed by GitHub
4 changed files with 159 additions and 11 deletions

View File

@@ -4,6 +4,7 @@ from datetime import timedelta
from urllib.parse import quote
from celery import Celery
from celery.schedules import crontab
from app.core.config import settings
@@ -93,10 +94,12 @@ celery_app.autodiscover_tasks(['app'])
# Celery Beat schedule for periodic tasks
memory_increment_schedule = timedelta(hours=settings.MEMORY_INCREMENT_INTERVAL_HOURS)
memory_cache_regeneration_schedule = timedelta(hours=settings.MEMORY_CACHE_REGENERATION_HOURS)
# 这个30秒的设计不合理
workspace_reflection_schedule = timedelta(seconds=30) # 每30秒运行一次settings.REFLECTION_INTERVAL_TIME
forgetting_cycle_schedule = timedelta(hours=24) # 每24小时运行一次遗忘周期
implicit_emotions_update_schedule = timedelta(hours=24) # 每24小时更新一次隐性记忆和情绪数据
workspace_reflection_schedule = timedelta(seconds=settings.WORKSPACE_REFLECTION_INTERVAL_SECONDS)
forgetting_cycle_schedule = timedelta(hours=settings.FORGETTING_CYCLE_INTERVAL_HOURS)
implicit_emotions_update_schedule = crontab(
hour=settings.IMPLICIT_EMOTIONS_UPDATE_HOUR,
minute=settings.IMPLICIT_EMOTIONS_UPDATE_MINUTE,
)
#构建定时任务配置
beat_schedule_config = {

View File

@@ -207,7 +207,15 @@ class Settings:
# Memory Cache Regeneration Configuration
MEMORY_CACHE_REGENERATION_HOURS: int = int(os.getenv("MEMORY_CACHE_REGENERATION_HOURS", "24"))
# Memory Module Configuration (internal)
# Periodic Task Schedule Configuration
# workspace_reflection: 每隔多少秒执行一次
WORKSPACE_REFLECTION_INTERVAL_SECONDS: int = int(os.getenv("WORKSPACE_REFLECTION_INTERVAL_SECONDS", "30"))
# forgetting_cycle: 每隔多少小时执行一次
FORGETTING_CYCLE_INTERVAL_HOURS: int = int(os.getenv("FORGETTING_CYCLE_INTERVAL_HOURS", "24"))
# implicit_emotions_update: 每天几点执行小时0-23
IMPLICIT_EMOTIONS_UPDATE_HOUR: int = int(os.getenv("IMPLICIT_EMOTIONS_UPDATE_HOUR", "2"))
# implicit_emotions_update: 每天几分执行分钟0-59
IMPLICIT_EMOTIONS_UPDATE_MINUTE: int = int(os.getenv("IMPLICIT_EMOTIONS_UPDATE_MINUTE", "0")) # Memory Module Configuration (internal)
MEMORY_OUTPUT_DIR: str = os.getenv("MEMORY_OUTPUT_DIR", "logs/memory-output")
MEMORY_CONFIG_DIR: str = os.getenv("MEMORY_CONFIG_DIR", "app/core/memory")

View File

@@ -5,12 +5,13 @@ Implicit Emotions Storage Repository
事务由调用方控制,仓储层只使用 flush/refresh
"""
import logging
from datetime import datetime
from datetime import datetime, date, timezone, timedelta
from typing import Optional, Generator
from sqlalchemy.orm import Session
from sqlalchemy import select
from sqlalchemy import select, not_, exists
from app.models.implicit_emotions_storage_model import ImplicitEmotionsStorage
from app.models.end_user_model import EndUser
logger = logging.getLogger(__name__)
@@ -110,6 +111,53 @@ class ImplicitEmotionsStorageRepository:
logger.error(f"分批获取用户ID失败: offset={offset}, error={e}")
break
def get_new_user_ids_today(self, batch_size: int = 100) -> Generator[str, None, None]:
"""分批次获取当天新增的、尚未初始化隐性记忆和情绪建议数据的用户ID
查询逻辑end_users 表中 created_at 为今天,且在 implicit_emotions_storage 中没有对应记录。
没有对应记录意味着隐性记忆画像和情绪建议均未初始化,需要对这批用户执行首次初始化。
end_users.idUUID转为字符串后与 implicit_emotions_storage.end_user_idString对比。
Args:
batch_size: 每批次加载的数量默认100
Yields:
用户ID字符串
"""
from sqlalchemy import cast, String as SAString
CST = timezone(timedelta(hours=8))
now_cst = datetime.now(CST)
today_start = now_cst.replace(hour=0, minute=0, second=0, microsecond=0).astimezone(timezone.utc).replace(tzinfo=None)
tomorrow_start = today_start + timedelta(days=1)
offset = 0
while True:
try:
stmt = (
select(EndUser.id)
.where(
EndUser.created_at >= today_start,
EndUser.created_at < tomorrow_start,
not_(
exists(
select(ImplicitEmotionsStorage.end_user_id).where(
ImplicitEmotionsStorage.end_user_id == cast(EndUser.id, SAString)
)
)
)
)
.order_by(EndUser.id)
.limit(batch_size)
.offset(offset)
)
batch = self.db.execute(stmt).scalars().all()
if not batch:
break
yield from (str(uid) for uid in batch)
offset += batch_size
except Exception as e:
logger.error(f"分批获取当天新增用户ID失败: offset={offset}, error={e}")
break
def delete_by_end_user_id(self, end_user_id: str) -> bool:
"""删除用户的存储记录(事务由调用方提交)"""
storage = self.get_by_end_user_id(end_user_id)

View File

@@ -2214,7 +2214,7 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
# 更新情绪建议
try:
emotion_service = EmotionAnalyticsService(db=db, end_user_id=end_user_id)
emotion_service = EmotionAnalyticsService()
suggestions_data = await emotion_service.generate_emotion_suggestions(
end_user_id=end_user_id,
db=db,
@@ -2273,22 +2273,109 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
user_results.append(error_info)
logger.error(f"处理用户 {end_user_id} 时出错: {str(e)}")
# ---- 处理增量用户(当天新增、尚未初始化的用户)----
new_users_initialized = 0
new_users_failed = 0
logger.info("开始处理当天新增的增量用户初始化")
for end_user_id in repo.get_new_user_ids_today(batch_size=100):
logger.info(f"开始初始化新用户: {end_user_id}")
user_start_time = time.time()
implicit_success = False
emotion_success = False
errors = []
try:
try:
implicit_service = ImplicitMemoryService(db=db, end_user_id=end_user_id)
profile_data = await implicit_service.generate_complete_profile(user_id=end_user_id)
await implicit_service.save_profile_cache(
end_user_id=end_user_id,
profile_data=profile_data,
db=db
)
implicit_success = True
logger.info(f"成功初始化新用户 {end_user_id} 的隐性记忆画像")
except Exception as e:
error_msg = f"隐性记忆初始化失败: {str(e)}"
errors.append(error_msg)
logger.error(f"新用户 {end_user_id} {error_msg}")
try:
emotion_service = EmotionAnalyticsService()
suggestions_data = await emotion_service.generate_emotion_suggestions(
end_user_id=end_user_id,
db=db,
language="zh"
)
await emotion_service.save_suggestions_cache(
end_user_id=end_user_id,
suggestions_data=suggestions_data,
db=db
)
emotion_success = True
logger.info(f"成功初始化新用户 {end_user_id} 的情绪建议")
except Exception as e:
error_msg = f"情绪建议初始化失败: {str(e)}"
errors.append(error_msg)
logger.error(f"新用户 {end_user_id} {error_msg}")
if implicit_success or emotion_success:
new_users_initialized += 1
else:
new_users_failed += 1
user_elapsed = time.time() - user_start_time
user_results.append({
"end_user_id": end_user_id,
"type": "init",
"implicit_success": implicit_success,
"emotion_success": emotion_success,
"errors": errors,
"elapsed_time": user_elapsed
})
except Exception as e:
new_users_failed += 1
user_elapsed = time.time() - user_start_time
user_results.append({
"end_user_id": end_user_id,
"type": "init",
"implicit_success": False,
"emotion_success": False,
"errors": [str(e)],
"elapsed_time": user_elapsed
})
logger.error(f"初始化新用户 {end_user_id} 时出错: {str(e)}")
logger.info(
f"增量用户初始化完成: 成功={new_users_initialized}, 失败={new_users_failed}"
)
# ---- 增量用户处理结束 ----
# 记录总体统计信息
logger.info(
f"隐性记忆和情绪数据更新定时任务完成: "
f"用户数={total_users}, "
f"存量用户数={total_users}, "
f"隐性记忆成功={successful_implicit}, "
f"情绪建议成功={successful_emotion}, "
f"失败={failed}"
f"存量失败={failed}, "
f"增量初始化成功={new_users_initialized}, "
f"增量初始化失败={new_users_failed}"
)
return {
"status": "SUCCESS",
"message": f"成功处理 {total_users} 个用户,隐性记忆 {successful_implicit} 个成功,情绪建议 {successful_emotion} 个成功",
"message": (
f"存量用户 {total_users} 个,隐性记忆 {successful_implicit} 个成功,情绪建议 {successful_emotion} 个成功;"
f"增量新用户初始化 {new_users_initialized} 个成功,{new_users_failed} 个失败"
),
"total_users": total_users,
"successful_implicit": successful_implicit,
"successful_emotion": successful_emotion,
"failed": failed,
"new_users_initialized": new_users_initialized,
"new_users_failed": new_users_failed,
"user_results": user_results[:50] # 只保留前50个用户的详细结果
}
@@ -2301,6 +2388,8 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
"successful_implicit": successful_implicit,
"successful_emotion": successful_emotion,
"failed": failed,
"new_users_initialized": 0,
"new_users_failed": 0,
"user_results": user_results[:50]
}