[add] Throw out explicit error messages; Using the CST time zone
This commit is contained in:
@@ -122,12 +122,17 @@ class ImplicitEmotionsStorageRepository:
|
|||||||
- 若 last_done <= updated_at,说明已是最新,跳过
|
- 若 last_done <= updated_at,说明已是最新,跳过
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
redis_client: 同步 redis.StrictRedis 实例(连接 CELERY_BACKEND DB)
|
redis_client: 同步 redis.StrictRedis 实例(连接 CELERY_BACKEND DB),为 None 时抛出 RuntimeError
|
||||||
batch_size: 每批次加载的数量
|
batch_size: 每批次加载的数量
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
RuntimeError: redis_client 为 None 时,调用方可捕获并回退到 get_all_user_ids
|
||||||
|
|
||||||
Yields:
|
Yields:
|
||||||
需要刷新的用户ID字符串
|
需要刷新的用户ID字符串
|
||||||
"""
|
"""
|
||||||
|
if redis_client is None:
|
||||||
|
raise RuntimeError("get_users_needing_refresh: redis_client 不可用,无法执行时间轴筛选")
|
||||||
from datetime import timezone
|
from datetime import timezone
|
||||||
offset = 0
|
offset = 0
|
||||||
while True:
|
while True:
|
||||||
@@ -152,16 +157,14 @@ class ImplicitEmotionsStorageRepository:
|
|||||||
try:
|
try:
|
||||||
CST = timezone(timedelta(hours=8))
|
CST = timezone(timedelta(hours=8))
|
||||||
last_done = datetime.fromisoformat(raw)
|
last_done = datetime.fromisoformat(raw)
|
||||||
# 统一转为 CST naive 时间做比较
|
# last_done 写入时已是 CST naive,直接使用,无需转换
|
||||||
if last_done.tzinfo is None:
|
if last_done.tzinfo is not None:
|
||||||
last_done = last_done.replace(tzinfo=timezone.utc).astimezone(CST).replace(tzinfo=None)
|
|
||||||
else:
|
|
||||||
last_done = last_done.astimezone(CST).replace(tzinfo=None)
|
last_done = last_done.astimezone(CST).replace(tzinfo=None)
|
||||||
|
|
||||||
if updated_at is None:
|
if updated_at is None:
|
||||||
yield end_user_id
|
yield end_user_id
|
||||||
continue
|
continue
|
||||||
# updated_at 同样转为 CST naive
|
# updated_at 数据库存的是 UTC naive,转为 CST naive 再比较
|
||||||
if updated_at.tzinfo is None:
|
if updated_at.tzinfo is None:
|
||||||
updated_at_cst = updated_at.replace(tzinfo=timezone.utc).astimezone(CST).replace(tzinfo=None)
|
updated_at_cst = updated_at.replace(tzinfo=timezone.utc).astimezone(CST).replace(tzinfo=None)
|
||||||
else:
|
else:
|
||||||
|
|||||||
@@ -1117,7 +1117,7 @@ def write_message_task(self, end_user_id: str, message: list[dict], config_id: s
|
|||||||
try:
|
try:
|
||||||
_r = get_sync_redis_client()
|
_r = get_sync_redis_client()
|
||||||
if _r is not None:
|
if _r is not None:
|
||||||
from datetime import timezone as _tz, timedelta as _td
|
from datetime import timezone as _tz
|
||||||
_CST = _tz(timedelta(hours=8))
|
_CST = _tz(timedelta(hours=8))
|
||||||
_now_cst = datetime.now(_CST).replace(tzinfo=None).isoformat()
|
_now_cst = datetime.now(_CST).replace(tzinfo=None).isoformat()
|
||||||
_r.set(
|
_r.set(
|
||||||
@@ -2218,7 +2218,14 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
|
|||||||
_redis_client = get_sync_redis_client()
|
_redis_client = get_sync_redis_client()
|
||||||
|
|
||||||
# 只处理 last_done > updated_at 的用户(有新记忆写入的用户)
|
# 只处理 last_done > updated_at 的用户(有新记忆写入的用户)
|
||||||
for end_user_id in repo.get_users_needing_refresh(_redis_client, batch_size=100):
|
# Redis 不可用时回退到全量处理
|
||||||
|
try:
|
||||||
|
refresh_iter = repo.get_users_needing_refresh(_redis_client, batch_size=100)
|
||||||
|
except RuntimeError as e:
|
||||||
|
logger.warning(f"时间轴筛选不可用,回退到全量刷新: {e}")
|
||||||
|
refresh_iter = repo.get_all_user_ids(batch_size=100)
|
||||||
|
|
||||||
|
for end_user_id in refresh_iter:
|
||||||
logger.info(f"开始处理用户: {end_user_id}")
|
logger.info(f"开始处理用户: {end_user_id}")
|
||||||
user_start_time = time.time()
|
user_start_time = time.time()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user