fix(memory,task): add Redis fair lock for ordered memory writes

This commit is contained in:
Eternity
2026-03-30 16:00:49 +08:00
parent 91b7f2a980
commit 8dd24533bf
3 changed files with 145 additions and 30 deletions

View File

@@ -1,5 +1,4 @@
import asyncio
import hashlib
import os
import re
import shutil
@@ -38,12 +37,10 @@ from app.db import get_db, get_db_context
from app.models import Document, File, Knowledge
from app.models.end_user_model import EndUser
from app.schemas import document_schema, file_schema
from app.schemas.model_schema import ModelInfo
from app.services.memory_agent_service import MemoryAgentService, get_end_user_connected_config
from app.services.memory_forget_service import MemoryForgetService
from app.services.memory_perceptual_service import MemoryPerceptualService
from app.utils.config_utils import resolve_config_id
from app.utils.redis_lock import RedisLock
from app.utils.redis_lock import RedisFairLock
logger = get_logger(__name__)
@@ -1148,8 +1145,28 @@ def write_message_task(
logger.info(f"[CELERY WRITE] Write completed successfully: {result}")
return result
redis_client = get_sync_redis_client()
lock = None
if redis_client is not None:
lock = RedisFairLock(
key=f"memory_write:{end_user_id}",
redis_client=redis_client,
expire=120,
timeout=300,
auto_renewal=True,
)
if not lock.acquire():
logger.warning(f"[CELERY WRITE] 获取锁超时,跳过本次写入: end_user_id={end_user_id}")
return {
"status": "SKIPPED",
"error": "acquire lock timeout",
"end_user_id": end_user_id,
"config_id": str(config_id),
"elapsed_time": time.time() - start_time,
"task_id": self.request.id,
}
try:
# 尝试获取现有事件循环,如果不存在则创建新的
loop = set_asyncio_event_loop()
result = loop.run_until_complete(_run())
@@ -1158,7 +1175,6 @@ def write_message_task(
logger.info(f"[CELERY WRITE] Task completed successfully "
f"- elapsed_time={elapsed_time:.2f}s, task_id={self.request.id}")
# 记录该用户最后一次 write_message 成功的时间,供时间轴筛选使用
try:
_r = get_sync_redis_client()
if _r is not None:
@@ -1199,9 +1215,12 @@ def write_message_task(
"elapsed_time": elapsed_time,
"task_id": self.request.id
}
# unused task
finally:
if lock is not None:
try:
lock.release()
except Exception as e:
logger.warning(f"[CELERY WRITE] 释放锁失败: {e}")
# @celery_app.task(name="app.core.memory.agent.health.check_read_service")
# def check_read_service_task() -> Dict[str, str]:
# """Call read_service and write latest status to Redis.
@@ -2879,3 +2898,6 @@ def init_community_clustering_for_users(self, end_user_ids: List[str], workspace
"elapsed_time": time.time() - start_time,
"task_id": self.request.id,
}
# unused task