feat(implicit-emotions): add Redis resilience and connection pooling
- Replace the single Redis client with a connection pool for better concurrency and automatic reconnection
- Add graceful degradation when Redis is unavailable (`get_users_needing_refresh` now handles a `None` client)
- Add `RedisError` exception handling that falls back to processing all users when `mget` fails
- Add type hints (`Optional[redis.StrictRedis]`) to Redis client parameters
- Add health-check and socket-timeout configuration to the connection pool
- Add logging for Redis connection failures and degradation events
- Reorganize imports alphabetically for consistency across both files
- Update `get_sync_redis_client` to validate the connection with `ping()` before returning
This commit is contained in:
@@ -5,13 +5,15 @@ Implicit Emotions Storage Repository
|
|||||||
事务由调用方控制,仓储层只使用 flush/refresh
|
事务由调用方控制,仓储层只使用 flush/refresh
|
||||||
"""
|
"""
|
||||||
import logging
|
import logging
|
||||||
from datetime import datetime, date, timezone, timedelta
|
from datetime import date, datetime, timedelta, timezone
|
||||||
from typing import Optional, Generator
|
from typing import Generator, Optional
|
||||||
from sqlalchemy.orm import Session
|
|
||||||
from sqlalchemy import select, not_, exists
|
import redis
|
||||||
|
from sqlalchemy import exists, not_, select
|
||||||
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
from app.models.implicit_emotions_storage_model import ImplicitEmotionsStorage
|
|
||||||
from app.models.end_user_model import EndUser
|
from app.models.end_user_model import EndUser
|
||||||
|
from app.models.implicit_emotions_storage_model import ImplicitEmotionsStorage
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -111,7 +113,7 @@ class ImplicitEmotionsStorageRepository:
|
|||||||
logger.error(f"分批获取用户ID失败: offset={offset}, error={e}")
|
logger.error(f"分批获取用户ID失败: offset={offset}, error={e}")
|
||||||
break
|
break
|
||||||
|
|
||||||
def get_users_needing_refresh(self, redis_client, batch_size: int = 100) -> Generator[str, None, None]:
|
def get_users_needing_refresh(self, redis_client: Optional[redis.StrictRedis], batch_size: int = 100) -> Generator[str, None, None]:
|
||||||
"""分批次获取需要刷新隐性记忆/情绪数据的存量用户ID。
|
"""分批次获取需要刷新隐性记忆/情绪数据的存量用户ID。
|
||||||
|
|
||||||
筛选逻辑:
|
筛选逻辑:
|
||||||
@@ -121,14 +123,27 @@ class ImplicitEmotionsStorageRepository:
|
|||||||
- 若 last_done > updated_at,说明上次刷新后又有新记忆写入,需要刷新
|
- 若 last_done > updated_at,说明上次刷新后又有新记忆写入,需要刷新
|
||||||
- 若 last_done <= updated_at,说明已是最新,跳过
|
- 若 last_done <= updated_at,说明已是最新,跳过
|
||||||
|
|
||||||
|
如果 redis_client 为 None,则降级为返回所有用户(禁用时间过滤)。
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
redis_client: 同步 redis.StrictRedis 实例(连接 CELERY_BACKEND DB)
|
redis_client: 同步 redis.StrictRedis 实例(连接 CELERY_BACKEND DB),如果为 None 则禁用时间过滤
|
||||||
batch_size: 每批次加载的数量
|
batch_size: 每批次加载的数量
|
||||||
|
|
||||||
Yields:
|
Yields:
|
||||||
需要刷新的用户ID字符串
|
需要刷新的用户ID字符串
|
||||||
"""
|
"""
|
||||||
from datetime import timezone
|
from datetime import timezone
|
||||||
|
|
||||||
|
from redis.exceptions import RedisError
|
||||||
|
|
||||||
|
# 如果 Redis 不可用,降级为处理所有用户
|
||||||
|
if redis_client is None:
|
||||||
|
logger.warning(
|
||||||
|
"Redis 客户端不可用,时间过滤已禁用,将处理所有存量用户"
|
||||||
|
)
|
||||||
|
yield from self.get_all_user_ids(batch_size)
|
||||||
|
return
|
||||||
|
|
||||||
offset = 0
|
offset = 0
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
@@ -144,7 +159,18 @@ class ImplicitEmotionsStorageRepository:
|
|||||||
|
|
||||||
# 批量获取当前批次所有用户的 last_done 时间戳(一次网络往返)
|
# 批量获取当前批次所有用户的 last_done 时间戳(一次网络往返)
|
||||||
keys = [f"write_message:last_done:{end_user_id}" for end_user_id, _ in batch]
|
keys = [f"write_message:last_done:{end_user_id}" for end_user_id, _ in batch]
|
||||||
raw_values = redis_client.mget(keys)
|
|
||||||
|
try:
|
||||||
|
raw_values = redis_client.mget(keys)
|
||||||
|
except RedisError as e:
|
||||||
|
logger.error(
|
||||||
|
f"Redis mget 操作失败: {e},当前批次降级为处理所有用户",
|
||||||
|
extra={"offset": offset, "batch_size": len(batch)}
|
||||||
|
)
|
||||||
|
# Redis 操作失败,降级为返回当前批次所有用户
|
||||||
|
yield from (end_user_id for end_user_id, _ in batch)
|
||||||
|
offset += batch_size
|
||||||
|
continue
|
||||||
|
|
||||||
for (end_user_id, updated_at), raw in zip(batch, raw_values):
|
for (end_user_id, updated_at), raw in zip(batch, raw_values):
|
||||||
if raw is None:
|
if raw is None:
|
||||||
@@ -190,7 +216,8 @@ class ImplicitEmotionsStorageRepository:
|
|||||||
Yields:
|
Yields:
|
||||||
用户ID字符串
|
用户ID字符串
|
||||||
"""
|
"""
|
||||||
from sqlalchemy import cast, String as SAString
|
from sqlalchemy import String as SAString
|
||||||
|
from sqlalchemy import cast
|
||||||
CST = timezone(timedelta(hours=8))
|
CST = timezone(timedelta(hours=8))
|
||||||
now_cst = datetime.now(CST)
|
now_cst = datetime.now(CST)
|
||||||
today_start = now_cst.replace(hour=0, minute=0, second=0, microsecond=0).astimezone(timezone.utc).replace(tzinfo=None)
|
today_start = now_cst.replace(hour=0, minute=0, second=0, microsecond=0).astimezone(timezone.utc).replace(tzinfo=None)
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import json
|
import json
|
||||||
|
import logging
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
import shutil
|
import shutil
|
||||||
@@ -14,29 +15,62 @@ from uuid import UUID
|
|||||||
|
|
||||||
import redis
|
import redis
|
||||||
import requests
|
import requests
|
||||||
|
from redis.exceptions import RedisError
|
||||||
|
|
||||||
# 模块级同步 Redis 客户端单例,供 Celery 任务共享使用(避免每次任务新建连接)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# 模块级同步 Redis 连接池,供 Celery 任务共享使用
|
||||||
# 连接 CELERY_BACKEND DB,与 write_message:last_done 时间戳写入保持一致
|
# 连接 CELERY_BACKEND DB,与 write_message:last_done 时间戳写入保持一致
|
||||||
def _build_sync_redis_client():
|
# 使用连接池而非单例客户端,提供更好的并发性能和自动重连
|
||||||
|
_sync_redis_pool: redis.ConnectionPool = None
|
||||||
|
|
||||||
|
def _get_or_create_redis_pool() -> redis.ConnectionPool:
|
||||||
|
"""获取或创建 Redis 连接池(懒初始化)"""
|
||||||
|
global _sync_redis_pool
|
||||||
|
if _sync_redis_pool is None:
|
||||||
|
try:
|
||||||
|
_sync_redis_pool = redis.ConnectionPool(
|
||||||
|
host=settings.REDIS_HOST,
|
||||||
|
port=settings.REDIS_PORT,
|
||||||
|
db=settings.REDIS_DB_CELERY_BACKEND,
|
||||||
|
password=settings.REDIS_PASSWORD,
|
||||||
|
decode_responses=True,
|
||||||
|
max_connections=10,
|
||||||
|
socket_connect_timeout=5,
|
||||||
|
socket_timeout=5,
|
||||||
|
retry_on_timeout=True,
|
||||||
|
health_check_interval=30,
|
||||||
|
)
|
||||||
|
logger.info("Redis connection pool created for Celery tasks")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to create Redis connection pool: {e}", exc_info=True)
|
||||||
|
return None
|
||||||
|
return _sync_redis_pool
|
||||||
|
|
||||||
|
def get_sync_redis_client() -> Optional[redis.StrictRedis]:
|
||||||
|
"""获取同步 Redis 客户端(使用连接池)
|
||||||
|
|
||||||
|
使用连接池提供的客户端,支持自动重连和健康检查。
|
||||||
|
如果 Redis 不可用,返回 None,调用方应优雅降级。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
redis.StrictRedis: Redis 客户端实例,如果连接失败则返回 None
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
return redis.StrictRedis(
|
pool = _get_or_create_redis_pool()
|
||||||
host=settings.REDIS_HOST,
|
if pool is None:
|
||||||
port=settings.REDIS_PORT,
|
return None
|
||||||
db=settings.REDIS_DB_CELERY_BACKEND,
|
|
||||||
password=settings.REDIS_PASSWORD,
|
client = redis.StrictRedis(connection_pool=pool)
|
||||||
decode_responses=True,
|
# 验证连接可用性
|
||||||
)
|
client.ping()
|
||||||
except Exception:
|
return client
|
||||||
|
except RedisError as e:
|
||||||
|
logger.error(f"Redis connection failed: {e}", exc_info=True)
|
||||||
|
return None
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Unexpected error getting Redis client: {e}", exc_info=True)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
_sync_redis_client: redis.StrictRedis = None
|
|
||||||
|
|
||||||
def get_sync_redis_client() -> redis.StrictRedis:
|
|
||||||
"""获取模块级同步 Redis 客户端(懒初始化单例)"""
|
|
||||||
global _sync_redis_client
|
|
||||||
if _sync_redis_client is None:
|
|
||||||
_sync_redis_client = _build_sync_redis_client()
|
|
||||||
return _sync_redis_client
|
|
||||||
|
|
||||||
# Import a unified Celery instance
|
# Import a unified Celery instance
|
||||||
from app.celery_app import celery_app
|
from app.celery_app import celery_app
|
||||||
@@ -1117,8 +1151,9 @@ def write_message_task(self, end_user_id: str, message: list[dict], config_id: s
|
|||||||
try:
|
try:
|
||||||
_r = get_sync_redis_client()
|
_r = get_sync_redis_client()
|
||||||
if _r is not None:
|
if _r is not None:
|
||||||
from datetime import timezone as _tz, timedelta as _td
|
from datetime import timedelta as _td
|
||||||
_CST = _tz(timedelta(hours=8))
|
from datetime import timezone as _tz
|
||||||
|
_CST = _tz(_td(hours=8))
|
||||||
_now_cst = datetime.now(_CST).replace(tzinfo=None).isoformat()
|
_now_cst = datetime.now(_CST).replace(tzinfo=None).isoformat()
|
||||||
_r.set(
|
_r.set(
|
||||||
f"write_message:last_done:{end_user_id}",
|
f"write_message:last_done:{end_user_id}",
|
||||||
@@ -2187,12 +2222,15 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]:
|
|||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
|
|
||||||
async def _run() -> Dict[str, Any]:
|
async def _run() -> Dict[str, Any]:
|
||||||
|
from sqlalchemy import func, select
|
||||||
|
|
||||||
from app.core.logging_config import get_logger
|
from app.core.logging_config import get_logger
|
||||||
from app.repositories.implicit_emotions_storage_repository import ImplicitEmotionsStorageRepository
|
|
||||||
from app.models.implicit_emotions_storage_model import ImplicitEmotionsStorage
|
from app.models.implicit_emotions_storage_model import ImplicitEmotionsStorage
|
||||||
from sqlalchemy import select, func
|
from app.repositories.implicit_emotions_storage_repository import (
|
||||||
from app.services.implicit_memory_service import ImplicitMemoryService
|
ImplicitEmotionsStorageRepository,
|
||||||
|
)
|
||||||
from app.services.emotion_analytics_service import EmotionAnalyticsService
|
from app.services.emotion_analytics_service import EmotionAnalyticsService
|
||||||
|
from app.services.implicit_memory_service import ImplicitMemoryService
|
||||||
|
|
||||||
logger = get_logger(__name__)
|
logger = get_logger(__name__)
|
||||||
logger.info("开始执行隐性记忆和情绪数据更新定时任务")
|
logger.info("开始执行隐性记忆和情绪数据更新定时任务")
|
||||||
@@ -2476,9 +2514,11 @@ def init_implicit_emotions_for_users(self, end_user_ids: List[str]) -> Dict[str,
|
|||||||
|
|
||||||
async def _run() -> Dict[str, Any]:
|
async def _run() -> Dict[str, Any]:
|
||||||
from app.core.logging_config import get_logger
|
from app.core.logging_config import get_logger
|
||||||
from app.repositories.implicit_emotions_storage_repository import ImplicitEmotionsStorageRepository
|
from app.repositories.implicit_emotions_storage_repository import (
|
||||||
from app.services.implicit_memory_service import ImplicitMemoryService
|
ImplicitEmotionsStorageRepository,
|
||||||
|
)
|
||||||
from app.services.emotion_analytics_service import EmotionAnalyticsService
|
from app.services.emotion_analytics_service import EmotionAnalyticsService
|
||||||
|
from app.services.implicit_memory_service import ImplicitMemoryService
|
||||||
|
|
||||||
logger = get_logger(__name__)
|
logger = get_logger(__name__)
|
||||||
logger.info(f"开始按需初始化隐性记忆/情绪数据,候选用户数: {len(end_user_ids)}")
|
logger.info(f"开始按需初始化隐性记忆/情绪数据,候选用户数: {len(end_user_ids)}")
|
||||||
|
|||||||
Reference in New Issue
Block a user