From 8dd24533bfb0a7738528fcb5ea99b38ebf467947 Mon Sep 17 00:00:00 2001 From: Eternity <1533512157@qq.com> Date: Mon, 30 Mar 2026 16:00:49 +0800 Subject: [PATCH] fix(memory,task): add Redis fair lock for ordered memory writes --- .../core/memory/llm_tools/openai_client.py | 2 +- api/app/tasks.py | 40 ++++-- api/app/utils/redis_lock.py | 133 +++++++++++++++--- 3 files changed, 145 insertions(+), 30 deletions(-) diff --git a/api/app/core/memory/llm_tools/openai_client.py b/api/app/core/memory/llm_tools/openai_client.py index 43c2b445..c70fef5f 100644 --- a/api/app/core/memory/llm_tools/openai_client.py +++ b/api/app/core/memory/llm_tools/openai_client.py @@ -65,7 +65,7 @@ class OpenAIClient(LLMClient): type=type_ ) - logger.info(f"OpenAI 客户端初始化完成: type={type_}") + logger.debug(f"OpenAI 客户端初始化完成: type={type_}") async def chat(self, messages: List[Dict[str, str]], **kwargs) -> Any: """ diff --git a/api/app/tasks.py b/api/app/tasks.py index d5f09a29..0e909fcc 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -1,5 +1,4 @@ import asyncio -import hashlib import os import re import shutil @@ -38,12 +37,10 @@ from app.db import get_db, get_db_context from app.models import Document, File, Knowledge from app.models.end_user_model import EndUser from app.schemas import document_schema, file_schema -from app.schemas.model_schema import ModelInfo from app.services.memory_agent_service import MemoryAgentService, get_end_user_connected_config from app.services.memory_forget_service import MemoryForgetService -from app.services.memory_perceptual_service import MemoryPerceptualService from app.utils.config_utils import resolve_config_id -from app.utils.redis_lock import RedisLock +from app.utils.redis_lock import RedisFairLock logger = get_logger(__name__) @@ -1148,8 +1145,28 @@ def write_message_task( logger.info(f"[CELERY WRITE] Write completed successfully: {result}") return result + redis_client = get_sync_redis_client() + lock = None + if redis_client is not None: + lock = RedisFairLock( + key=f"memory_write:{end_user_id}", + redis_client=redis_client, + expire=120, + timeout=300, + auto_renewal=True, + ) + if not lock.acquire(): + logger.warning(f"[CELERY WRITE] 获取锁超时,跳过本次写入: end_user_id={end_user_id}") + return { + "status": "SKIPPED", + "error": "acquire lock timeout", + "end_user_id": end_user_id, + "config_id": str(config_id), + "elapsed_time": time.time() - start_time, + "task_id": self.request.id, + } + try: - # 尝试获取现有事件循环,如果不存在则创建新的 loop = set_asyncio_event_loop() result = loop.run_until_complete(_run()) @@ -1158,7 +1175,6 @@ def write_message_task( logger.info(f"[CELERY WRITE] Task completed successfully " f"- elapsed_time={elapsed_time:.2f}s, task_id={self.request.id}") - # 记录该用户最后一次 write_message 成功的时间,供时间轴筛选使用 try: _r = get_sync_redis_client() if _r is not None: @@ -1199,9 +1215,12 @@ def write_message_task( "elapsed_time": elapsed_time, "task_id": self.request.id } - - -# unused task + finally: + if lock is not None: + try: + lock.release() + except Exception as e: + logger.warning(f"[CELERY WRITE] 释放锁失败: {e}") # @celery_app.task(name="app.core.memory.agent.health.check_read_service") # def check_read_service_task() -> Dict[str, str]: # """Call read_service and write latest status to Redis. @@ -2879,3 +2898,6 @@ def init_community_clustering_for_users(self, end_user_ids: List[str], workspace "elapsed_time": time.time() - start_time, "task_id": self.request.id, } + + +# unused task \ No newline at end of file diff --git a/api/app/utils/redis_lock.py b/api/app/utils/redis_lock.py index 99f62d84..a86ba46e 100644 --- a/api/app/utils/redis_lock.py +++ b/api/app/utils/redis_lock.py @@ -1,6 +1,7 @@ import redis import uuid import time +import threading UNLOCK_SCRIPT = """ if redis.call("get", KEYS[1]) == ARGV[1] then @@ -10,45 +11,136 @@ else end """ +RENEW_SCRIPT = """ +if redis.call("get", KEYS[1]) == ARGV[1] then + return redis.call("expire", KEYS[1], ARGV[2]) +else + return 0 +end +""" -class RedisLock: +CLEANUP_DEAD_HEAD_SCRIPT = """ +local queue_key = KEYS[1] +local lock_key = KEYS[2] + +local first = redis.call("lindex", queue_key, 0) +if not first then + return 0 +end + +if redis.call("exists", lock_key) == 1 then + return 0 +end + +redis.call("lpop", queue_key) +return 1 +""" + +SAFE_RELEASE_QUEUE_SCRIPT = """ +local queue_key = KEYS[1] +local value = ARGV[1] + +local first = redis.call("lindex", queue_key, 0) +if first == value then + redis.call("lpop", queue_key) + return 1 +end +return 0 +""" + + +def _ensure_str(val): + """统一将 Redis 返回值转为 str,兼容 decode_responses=True/False""" + if val is None: + return None + if isinstance(val, bytes): + return val.decode("utf-8") + return str(val) + + +class RedisFairLock: def __init__( self, key: str, redis_client: redis.StrictRedis, - expire: int = 60, - retry_interval: float = 0.1, - timeout: float = 30 - + expire: int = 30, + retry_interval: float = 0.05, + timeout: float = 600, + auto_renewal: bool = True ): self.key = key - self.expire = expire + self.queue_key = f"{key}:queue" self.value = str(uuid.uuid4()) - self._locked = False + self.expire = expire self.retry_interval = retry_interval self.timeout = timeout - self.redis_client = redis_client + self.redis = redis_client + self._locked = False + self.auto_renewal = auto_renewal + self._renew_thread = None + self._stop_renew = threading.Event() - def acquire(self) -> bool: + def acquire(self): start = time.time() + + self.redis.rpush(self.queue_key, self.value) + while True: - ok = self.redis_client.set(self.key, self.value, ex=self.expire, nx=True) - if ok: - self._locked = True - return True - if time.time() - start >= self.timeout: + first = _ensure_str(self.redis.lindex(self.queue_key, 0)) + + if first == self.value: + ok = self.redis.set(self.key, self.value, nx=True, ex=self.expire) + if ok: + self._locked = True + + if self.auto_renewal: + self._start_renewal() + return True + + if first: + self.redis.eval(CLEANUP_DEAD_HEAD_SCRIPT, 2, self.queue_key, self.key) + + if time.time() - start > self.timeout: + self.redis.lrem(self.queue_key, 0, self.value) return False + time.sleep(self.retry_interval) + def _renewal_loop(self): + while not self._stop_renew.is_set(): + time.sleep(self.expire / 3) + if self._stop_renew.is_set(): + break + + self.redis.eval( + RENEW_SCRIPT, + 1, + self.key, + self.value, + str(self.expire) + ) + + def _start_renewal(self): + self._stop_renew = threading.Event() + self._renew_thread = threading.Thread(target=self._renewal_loop, daemon=True) + self._renew_thread.start() + + def _stop_renewal(self): + self._stop_renew.set() + if self._renew_thread: + self._renew_thread.join(timeout=1) + def release(self): if not self._locked: return - self.redis_client.eval( - UNLOCK_SCRIPT, - 1, - self.key, - self.value - ) + + if self.auto_renewal: + self._stop_renewal() + + self.redis.eval(UNLOCK_SCRIPT, 1, self.key, self.value) + + self.redis.eval(SAFE_RELEASE_QUEUE_SCRIPT, 1, self.queue_key, self.value) + self._locked = False def __enter__(self): @@ -59,3 +151,4 @@ class RedisLock: def __exit__(self, exc_type, exc_val, exc_tb): self.release() +