From bbba995ff7d815a977cc9f894f8d589dcc057591 Mon Sep 17 00:00:00 2001 From: lixinyue11 <94037597+lixinyue11@users.noreply.github.com> Date: Fri, 6 Feb 2026 15:26:59 +0800 Subject: [PATCH] Fix/develop memory bug (#346) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 遗漏的历史映射 * 遗漏的历史映射 * fix_timeline_memories * fix_timeline_memories * write_gragp/bug_fix * write_gragp/bug_fix * write_gragp/bug_fix * write_gragp/bug_fix * Multiple independent transactions - single transaction * memory_content ->memory_config_id * memory_content ->memory_config_id * memory_content ->memory_config_id * memory_content ->memory_config_id * memory_content ->memory_config_id * memory_content ->memory_config_id * memory_content ->memory_config_id * tasks/bug_fix/long --- api/app/tasks.py | 96 ------------------------------------------------ 1 file changed, 96 deletions(-) diff --git a/api/app/tasks.py b/api/app/tasks.py index 29b0e485..e2c295ab 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -1697,102 +1697,6 @@ def run_forgetting_cycle_task(self, config_id: Optional[uuid.UUID] = None) -> Di # Long-term Memory Storage Tasks (Batched Write Strategies) # ============================================================================= -@celery_app.task(name="app.core.memory.agent.long_term_storage.window", bind=True) -def long_term_storage_window_task( - self, - end_user_id: str, - langchain_messages: List[Dict[str, Any]], - config_id: str, - scope: int = 6 -) -> Dict[str, Any]: - """Celery task for window-based long-term memory storage. - - Accumulates messages in Redis buffer until window size (scope) is reached, - then writes batched messages to Neo4j. - - Args: - end_user_id: End user identifier - langchain_messages: List of messages [{"role": "user/assistant", "content": "..."}] - config_id: Memory configuration ID - scope: Window size (number of messages before triggering write) - - Returns: - Dict containing task status and metadata - """ - from app.core.logging_config import get_logger - logger = get_logger(__name__) - - logger.info(f"[LONG_TERM_WINDOW] Starting task - end_user_id={end_user_id}, scope={scope}") - start_time = time.time() - - async def _run() -> Dict[str, Any]: - from app.core.memory.agent.langgraph_graph.routing.write_router import window_dialogue - from app.core.memory.agent.langgraph_graph.tools.write_tool import chat_data_format - from app.core.memory.agent.utils.redis_tool import write_store - from app.services.memory_config_service import MemoryConfigService - - db = next(get_db()) - try: - # Save to Redis buffer first - write_store.save_session_write(end_user_id, await chat_data_format(langchain_messages)) - - # Load memory config - config_service = MemoryConfigService(db) - memory_config = config_service.load_memory_config( - config_id=config_id, - service_name="LongTermStorageTask" - ) - - # Execute window-based dialogue storage - await window_dialogue(end_user_id, langchain_messages, memory_config, scope) - - return {"status": "SUCCESS", "strategy": "window", "scope": scope} - finally: - db.close() - - try: - import nest_asyncio - nest_asyncio.apply() - except ImportError: - pass - - try: - loop = asyncio.get_event_loop() - if loop.is_closed(): - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - except RuntimeError: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - try: - result = loop.run_until_complete(_run()) - elapsed_time = time.time() - start_time - - logger.info(f"[LONG_TERM_WINDOW] Task completed - elapsed_time={elapsed_time:.2f}s") - - return { - **result, - "end_user_id": end_user_id, - "config_id": config_id, - "elapsed_time": elapsed_time, - "task_id": self.request.id - } - except Exception as e: - elapsed_time = time.time() - start_time - logger.error(f"[LONG_TERM_WINDOW] Task failed - error={str(e)}", exc_info=True) - - return { - "status": "FAILURE", - "strategy": "window", - "error": str(e), - "end_user_id": end_user_id, - "config_id": config_id, - "elapsed_time": elapsed_time, - "task_id": self.request.id - } - - # @celery_app.task(name="app.core.memory.agent.long_term_storage.time", bind=True) # def long_term_storage_time_task( # self,