[add] Specify the error types and clearly define the downgrade conditions
This commit is contained in:
@@ -8,6 +8,13 @@ import logging
|
|||||||
from datetime import date, datetime, timedelta, timezone
|
from datetime import date, datetime, timedelta, timezone
|
||||||
from typing import Generator, Optional
|
from typing import Generator, Optional
|
||||||
|
|
||||||
|
|
||||||
|
class TimeFilterUnavailableError(Exception):
|
||||||
|
"""redis_client 不可用,无法执行时间轴筛选。
|
||||||
|
|
||||||
|
调用方捕获此异常后可选择回退到 get_all_user_ids 进行全量处理。
|
||||||
|
"""
|
||||||
|
|
||||||
import redis
|
import redis
|
||||||
from sqlalchemy import exists, not_, select
|
from sqlalchemy import exists, not_, select
|
||||||
from sqlalchemy.orm import Session
|
from sqlalchemy.orm import Session
|
||||||
@@ -113,7 +120,7 @@ class ImplicitEmotionsStorageRepository:
|
|||||||
logger.error(f"分批获取用户ID失败: offset={offset}, error={e}")
|
logger.error(f"分批获取用户ID失败: offset={offset}, error={e}")
|
||||||
break
|
break
|
||||||
|
|
||||||
def get_users_needing_refresh(self, redis_client: Optional[redis.StrictRedis], batch_size: int = 100) -> Generator[str, None, None]:
|
def get_users_needing_refresh(self, redis_client: redis.StrictRedis, batch_size: int = 100) -> Generator[str, None, None]:
|
||||||
"""分批次获取需要刷新隐性记忆/情绪数据的存量用户ID。
|
"""分批次获取需要刷新隐性记忆/情绪数据的存量用户ID。
|
||||||
|
|
||||||
筛选逻辑:
|
筛选逻辑:
|
||||||
@@ -123,32 +130,21 @@ class ImplicitEmotionsStorageRepository:
|
|||||||
- 若 last_done > updated_at,说明上次刷新后又有新记忆写入,需要刷新
|
- 若 last_done > updated_at,说明上次刷新后又有新记忆写入,需要刷新
|
||||||
- 若 last_done <= updated_at,说明已是最新,跳过
|
- 若 last_done <= updated_at,说明已是最新,跳过
|
||||||
|
|
||||||
如果 redis_client 为 None,则降级为返回所有用户(禁用时间过滤)。
|
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
redis_client: 同步 redis.StrictRedis 实例(连接 CELERY_BACKEND DB),为 None 时抛出 RuntimeError
|
redis_client: 同步 redis.StrictRedis 实例(连接 CELERY_BACKEND DB)
|
||||||
batch_size: 每批次加载的数量
|
batch_size: 每批次加载的数量
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
RuntimeError: redis_client 为 None 时,调用方可捕获并回退到 get_all_user_ids
|
TimeFilterUnavailableError: redis_client 为 None 时抛出,调用方可捕获并回退到 get_all_user_ids
|
||||||
|
|
||||||
Yields:
|
Yields:
|
||||||
需要刷新的用户ID字符串
|
需要刷新的用户ID字符串
|
||||||
"""
|
"""
|
||||||
if redis_client is None:
|
if redis_client is None:
|
||||||
raise RuntimeError("get_users_needing_refresh: redis_client 不可用,无法执行时间轴筛选")
|
raise TimeFilterUnavailableError("redis_client 不可用,无法执行时间轴筛选")
|
||||||
from datetime import timezone
|
|
||||||
|
|
||||||
from redis.exceptions import RedisError
|
from redis.exceptions import RedisError
|
||||||
|
|
||||||
# 如果 Redis 不可用,降级为处理所有用户
|
|
||||||
if redis_client is None:
|
|
||||||
logger.warning(
|
|
||||||
"Redis 客户端不可用,时间过滤已禁用,将处理所有存量用户"
|
|
||||||
)
|
|
||||||
yield from self.get_all_user_ids(batch_size)
|
|
||||||
return
|
|
||||||
|
|
||||||
offset = 0
|
offset = 0
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -2228,6 +2228,7 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
|
|||||||
from app.models.implicit_emotions_storage_model import ImplicitEmotionsStorage
|
from app.models.implicit_emotions_storage_model import ImplicitEmotionsStorage
|
||||||
from app.repositories.implicit_emotions_storage_repository import (
|
from app.repositories.implicit_emotions_storage_repository import (
|
||||||
ImplicitEmotionsStorageRepository,
|
ImplicitEmotionsStorageRepository,
|
||||||
|
TimeFilterUnavailableError,
|
||||||
)
|
)
|
||||||
from app.services.emotion_analytics_service import EmotionAnalyticsService
|
from app.services.emotion_analytics_service import EmotionAnalyticsService
|
||||||
from app.services.implicit_memory_service import ImplicitMemoryService
|
from app.services.implicit_memory_service import ImplicitMemoryService
|
||||||
@@ -2259,7 +2260,7 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
|
|||||||
# Redis 不可用时回退到全量处理
|
# Redis 不可用时回退到全量处理
|
||||||
try:
|
try:
|
||||||
refresh_iter = repo.get_users_needing_refresh(_redis_client, batch_size=100)
|
refresh_iter = repo.get_users_needing_refresh(_redis_client, batch_size=100)
|
||||||
except RuntimeError as e:
|
except TimeFilterUnavailableError as e:
|
||||||
logger.warning(f"时间轴筛选不可用,回退到全量刷新: {e}")
|
logger.warning(f"时间轴筛选不可用,回退到全量刷新: {e}")
|
||||||
refresh_iter = repo.get_all_user_ids(batch_size=100)
|
refresh_iter = repo.get_all_user_ids(batch_size=100)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user