fix(redis_lock): refactor RedisFairLock to use ZSET for queue management and fix loop shutdown
- Replace list-based queue with sorted set for better dead client cleanup - Add zombie cleanup buffer to handle expired queue entries - Fix potential None loop reference in graceful shutdown - Add task start time to write_message_task result - Update lock acquisition script to use ZSET operations - Remove unused queue cleanup scripts - Ensure proper lock release and renewal failure handling
This commit is contained in:
@@ -1176,6 +1176,7 @@ def write_message_task(
|
||||
|
||||
redis_client = get_sync_redis_client()
|
||||
lock = None
|
||||
loop = None
|
||||
if redis_client is not None:
|
||||
lock = RedisFairLock(
|
||||
key=f"memory_write:{end_user_id}",
|
||||
@@ -1196,6 +1197,7 @@ def write_message_task(
|
||||
}
|
||||
|
||||
try:
|
||||
task_start_time = int(time.time())
|
||||
loop = set_asyncio_event_loop()
|
||||
|
||||
result = loop.run_until_complete(_run())
|
||||
@@ -1219,6 +1221,7 @@ def write_message_task(
|
||||
return {
|
||||
"status": "SUCCESS",
|
||||
"result": result,
|
||||
"start_at": task_start_time,
|
||||
"end_user_id": end_user_id,
|
||||
"config_id": config_id,
|
||||
"elapsed_time": elapsed_time,
|
||||
@@ -1252,7 +1255,8 @@ def write_message_task(
|
||||
logger.warning(f"[CELERY WRITE] 释放锁失败: {e}")
|
||||
# Gracefully shutdown the event loop to prevent
|
||||
# 'RuntimeError: Event loop is closed' from httpx.AsyncClient.__del__
|
||||
_shutdown_loop_gracefully(loop)
|
||||
if loop:
|
||||
_shutdown_loop_gracefully(loop)
|
||||
|
||||
|
||||
# unused task
|
||||
|
||||
Reference in New Issue
Block a user