From 7c1f62279754c5611c535bcf1c1630fde7da678f Mon Sep 17 00:00:00 2001 From: lixinyue <2569494688@qq.com> Date: Wed, 4 Feb 2026 20:11:05 +0800 Subject: [PATCH 1/4] Multiple independent transactions - single transaction --- api/app/core/memory/agent/langgraph_graph/tools/write_tool.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/api/app/core/memory/agent/langgraph_graph/tools/write_tool.py b/api/app/core/memory/agent/langgraph_graph/tools/write_tool.py index d0be8e5c..9ce581ee 100644 --- a/api/app/core/memory/agent/langgraph_graph/tools/write_tool.py +++ b/api/app/core/memory/agent/langgraph_graph/tools/write_tool.py @@ -1,8 +1,6 @@ import json from langchain_core.messages import HumanMessage, AIMessage - - async def format_parsing(messages: list,type:str='string'): """ 格式化解析消息列表 From 3f906d81cbf9eafbbe14d59ea3167b96bbdc9661 Mon Sep 17 00:00:00 2001 From: lixinyue <2569494688@qq.com> Date: Wed, 4 Feb 2026 20:19:04 +0800 Subject: [PATCH 2/4] Multiple independent transactions - single transaction --- api/app/repositories/neo4j/graph_saver.py | 144 +++++++++++++++++----- 1 file changed, 112 insertions(+), 32 deletions(-) diff --git a/api/app/repositories/neo4j/graph_saver.py b/api/app/repositories/neo4j/graph_saver.py index 1575315f..f8aa7cdb 100644 --- a/api/app/repositories/neo4j/graph_saver.py +++ b/api/app/repositories/neo4j/graph_saver.py @@ -147,14 +147,14 @@ async def save_statement_entity_edges( async def save_dialog_and_statements_to_neo4j( - dialogue_nodes: List[DialogueNode], - chunk_nodes: List[ChunkNode], - statement_nodes: List[StatementNode], - entity_nodes: List[ExtractedEntityNode], - entity_edges: List[EntityEntityEdge], - statement_chunk_edges: List[StatementChunkEdge], - statement_entity_edges: List[StatementEntityEdge], - connector: Neo4jConnector + dialogue_nodes: List[DialogueNode], + chunk_nodes: List[ChunkNode], + statement_nodes: List[StatementNode], + entity_nodes: List[ExtractedEntityNode], + entity_edges: List[EntityEntityEdge], + statement_chunk_edges: List[StatementChunkEdge], + statement_entity_edges: List[StatementEntityEdge], + connector: Neo4jConnector ) -> bool: """Save dialogue nodes, chunk nodes, statement nodes, entities, and all relationships to Neo4j using graph models. @@ -171,40 +171,120 @@ async def save_dialog_and_statements_to_neo4j( Returns: bool: True if successful, False otherwise """ - try: - # Save all dialogue nodes in batch - dialogue_uuids = await add_dialogue_nodes(dialogue_nodes, connector) - if dialogue_uuids: + + # 定义事务函数,将所有写操作放在一个事务中 + async def _save_all_in_transaction(tx): + """在单个事务中执行所有保存操作,避免死锁""" + results = {} + + # 1. Save all dialogue nodes in batch + if dialogue_nodes: + from app.repositories.neo4j.cypher_queries import DIALOGUE_NODE_SAVE + dialogue_data = [node.model_dump() for node in dialogue_nodes] + result = await tx.run(DIALOGUE_NODE_SAVE, dialogues=dialogue_data) + dialogue_uuids = [record["uuid"] async for record in result] + results['dialogues'] = dialogue_uuids print(f"Dialogues saved to Neo4j with UUIDs: {dialogue_uuids}") - else: - print("Failed to save dialogues to Neo4j") - return False - # Save all chunk nodes in batch - await save_chunk_nodes(chunk_nodes, connector) + # 2. Save all chunk nodes in batch + if chunk_nodes: + from app.repositories.neo4j.cypher_queries import CHUNK_NODE_SAVE + chunk_data = [node.model_dump() for node in chunk_nodes] + result = await tx.run(CHUNK_NODE_SAVE, chunks=chunk_data) + chunk_uuids = [record["uuid"] async for record in result] + results['chunks'] = chunk_uuids + print(f"Successfully saved {len(chunk_uuids)} chunk nodes to Neo4j") - # Save all statement nodes in batch + # 3. Save all statement nodes in batch if statement_nodes: - statement_uuids = await add_statement_nodes(statement_nodes, connector) - if statement_uuids: - print(f"Successfully saved {len(statement_uuids)} statement nodes to Neo4j") - else: - print("Failed to save statement nodes to Neo4j") - return False - else: - print("No statement nodes to save") + from app.repositories.neo4j.cypher_queries import STATEMENT_NODE_SAVE + statement_data = [node.model_dump() for node in statement_nodes] + result = await tx.run(STATEMENT_NODE_SAVE, statements=statement_data) + statement_uuids = [record["uuid"] async for record in result] + results['statements'] = statement_uuids + print(f"Successfully saved {len(statement_uuids)} statement nodes to Neo4j") - # Save entities and relationships - await save_entities_and_relationships(entity_nodes, entity_edges, connector) - print("Successfully saved entities and relationships to Neo4j") + # 4. Save entities + if entity_nodes: + from app.repositories.neo4j.cypher_queries import EXTRACTED_ENTITY_NODE_SAVE + entity_data = [entity.model_dump() for entity in entity_nodes] + result = await tx.run(EXTRACTED_ENTITY_NODE_SAVE, entities=entity_data) + entity_uuids = [record["uuid"] async for record in result] + results['entities'] = entity_uuids + print(f"Successfully saved {len(entity_uuids)} entity nodes to Neo4j") - # Save new edges - await save_statement_chunk_edges(statement_chunk_edges, connector) - await save_statement_entity_edges(statement_entity_edges, connector) + # 5. Create entity relationships + if entity_edges: + from app.repositories.neo4j.cypher_queries import ENTITY_RELATIONSHIP_SAVE + relationship_data = [] + for edge in entity_edges: + relationship_data.append({ + 'source_id': edge.source, + 'target_id': edge.target, + 'predicate': edge.relation_type, + 'statement_id': edge.source_statement_id, + 'value': edge.relation_value, + 'statement': edge.statement, + 'valid_at': edge.valid_at.isoformat() if edge.valid_at else None, + 'invalid_at': edge.invalid_at.isoformat() if edge.invalid_at else None, + 'created_at': edge.created_at.isoformat(), + 'expired_at': edge.expired_at.isoformat(), + 'run_id': edge.run_id, + 'end_user_id': edge.end_user_id, + }) + result = await tx.run(ENTITY_RELATIONSHIP_SAVE, relationships=relationship_data) + rel_uuids = [record["uuid"] async for record in result] + results['entity_relationships'] = rel_uuids + print(f"Successfully saved {len(rel_uuids)} entity relationships to Neo4j") + # 6. Save statement-chunk edges + if statement_chunk_edges: + from app.repositories.neo4j.cypher_queries import STATEMENT_CHUNK_EDGE_SAVE + sc_edge_data = [] + for edge in statement_chunk_edges: + sc_edge_data.append({ + "id": edge.id, + "source": edge.source, + "target": edge.target, + "created_at": edge.created_at.isoformat(), + "expired_at": edge.expired_at.isoformat(), + "run_id": edge.run_id, + "end_user_id": edge.end_user_id, + }) + result = await tx.run(STATEMENT_CHUNK_EDGE_SAVE, edges=sc_edge_data) + sc_uuids = [record["uuid"] async for record in result] + results['statement_chunk_edges'] = sc_uuids + print(f"Successfully saved {len(sc_uuids)} statement-chunk edges to Neo4j") + + # 7. Save statement-entity edges + if statement_entity_edges: + from app.repositories.neo4j.cypher_queries import STATEMENT_ENTITY_EDGE_SAVE + se_edge_data = [] + for edge in statement_entity_edges: + se_edge_data.append({ + "id": edge.id, + "source": edge.source, + "target": edge.target, + "created_at": edge.created_at.isoformat(), + "expired_at": edge.expired_at.isoformat(), + "run_id": edge.run_id, + "end_user_id": edge.end_user_id, + }) + result = await tx.run(STATEMENT_ENTITY_EDGE_SAVE, edges=se_edge_data) + se_uuids = [record["uuid"] async for record in result] + results['statement_entity_edges'] = se_uuids + print(f"Successfully saved {len(se_uuids)} statement-entity edges to Neo4j") + + return results + + try: + # 使用显式写事务执行所有操作,避免死锁 + results = await connector.execute_write_transaction(_save_all_in_transaction) + print("Successfully saved all data to Neo4j in a single transaction") return True except Exception as e: print(f"Neo4j integration error: {e}") print("Continuing without database storage...") return False + From 3735bdde194ff44ba7b625118098f07e83fc3737 Mon Sep 17 00:00:00 2001 From: lixinyue <2569494688@qq.com> Date: Wed, 4 Feb 2026 20:20:45 +0800 Subject: [PATCH 3/4] Multiple independent transactions - single transaction --- .../core/memory/agent/utils/write_tools.py | 56 ++++++++++++++----- 1 file changed, 41 insertions(+), 15 deletions(-) diff --git a/api/app/core/memory/agent/utils/write_tools.py b/api/app/core/memory/agent/utils/write_tools.py index 446ab86a..aa66014c 100644 --- a/api/app/core/memory/agent/utils/write_tools.py +++ b/api/app/core/memory/agent/utils/write_tools.py @@ -4,6 +4,7 @@ Write Tools for Memory Knowledge Extraction Pipeline This module provides the main write function for executing the knowledge extraction pipeline. Only MemoryConfig is needed - clients are constructed internally. """ +import asyncio import time from datetime import datetime @@ -123,23 +124,48 @@ async def write( except Exception as e: logger.error(f"Error creating indexes: {e}", exc_info=True) + # 添加死锁重试机制 + max_retries = 3 + retry_delay = 1 # 秒 + + for attempt in range(max_retries): + try: + success = await save_dialog_and_statements_to_neo4j( + dialogue_nodes=all_dialogue_nodes, + chunk_nodes=all_chunk_nodes, + statement_nodes=all_statement_nodes, + entity_nodes=all_entity_nodes, + statement_chunk_edges=all_statement_chunk_edges, + statement_entity_edges=all_statement_entity_edges, + entity_edges=all_entity_entity_edges, + connector=neo4j_connector + ) + if success: + logger.info("Successfully saved all data to Neo4j") + break + else: + logger.warning("Failed to save some data to Neo4j") + if attempt < max_retries - 1: + logger.info(f"Retrying... (attempt {attempt + 2}/{max_retries})") + await asyncio.sleep(retry_delay * (attempt + 1)) # 指数退避 + except Exception as e: + error_msg = str(e) + # 检查是否是死锁错误 + if "DeadlockDetected" in error_msg or "deadlock" in error_msg.lower(): + if attempt < max_retries - 1: + logger.warning(f"Deadlock detected, retrying... (attempt {attempt + 2}/{max_retries})") + await asyncio.sleep(retry_delay * (attempt + 1)) # 指数退避 + else: + logger.error(f"Failed after {max_retries} attempts due to deadlock: {e}") + raise + else: + # 非死锁错误,直接抛出 + raise + try: - success = await save_dialog_and_statements_to_neo4j( - dialogue_nodes=all_dialogue_nodes, - chunk_nodes=all_chunk_nodes, - statement_nodes=all_statement_nodes, - entity_nodes=all_entity_nodes, - statement_chunk_edges=all_statement_chunk_edges, - statement_entity_edges=all_statement_entity_edges, - entity_edges=all_entity_entity_edges, - connector=neo4j_connector - ) - if success: - logger.info("Successfully saved all data to Neo4j") - else: - logger.warning("Failed to save some data to Neo4j") - finally: await neo4j_connector.close() + except Exception as e: + logger.error(f"Error closing Neo4j connector: {e}") log_time("Neo4j Database Save", time.time() - step_start, log_file) From 657d48a5f9b6321df8d67fa85ed5d3d89ccb8c9b Mon Sep 17 00:00:00 2001 From: lixinyue <2569494688@qq.com> Date: Wed, 4 Feb 2026 20:25:45 +0800 Subject: [PATCH 4/4] Multiple independent transactions - single transaction --- api/app/repositories/neo4j/graph_saver.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/api/app/repositories/neo4j/graph_saver.py b/api/app/repositories/neo4j/graph_saver.py index f8aa7cdb..1866fdb7 100644 --- a/api/app/repositories/neo4j/graph_saver.py +++ b/api/app/repositories/neo4j/graph_saver.py @@ -21,7 +21,8 @@ from app.core.memory.models.graph_models import ( ExtractedEntityNode, EntityEntityEdge, ) - +import logging +logger = logging.getLogger(__name__) async def save_entities_and_relationships( entity_nodes: List[ExtractedEntityNode], entity_entity_edges: List[EntityEntityEdge], @@ -193,7 +194,7 @@ async def save_dialog_and_statements_to_neo4j( result = await tx.run(CHUNK_NODE_SAVE, chunks=chunk_data) chunk_uuids = [record["uuid"] async for record in result] results['chunks'] = chunk_uuids - print(f"Successfully saved {len(chunk_uuids)} chunk nodes to Neo4j") + logger.info(f"Successfully saved {len(chunk_uuids)} chunk nodes to Neo4j") # 3. Save all statement nodes in batch if statement_nodes: @@ -202,7 +203,7 @@ async def save_dialog_and_statements_to_neo4j( result = await tx.run(STATEMENT_NODE_SAVE, statements=statement_data) statement_uuids = [record["uuid"] async for record in result] results['statements'] = statement_uuids - print(f"Successfully saved {len(statement_uuids)} statement nodes to Neo4j") + logger.info(f"Successfully saved {len(statement_uuids)} statement nodes to Neo4j") # 4. Save entities if entity_nodes: @@ -211,7 +212,7 @@ async def save_dialog_and_statements_to_neo4j( result = await tx.run(EXTRACTED_ENTITY_NODE_SAVE, entities=entity_data) entity_uuids = [record["uuid"] async for record in result] results['entities'] = entity_uuids - print(f"Successfully saved {len(entity_uuids)} entity nodes to Neo4j") + logger.info(f"Successfully saved {len(entity_uuids)} entity nodes to Neo4j") # 5. Create entity relationships if entity_edges: @@ -235,7 +236,7 @@ async def save_dialog_and_statements_to_neo4j( result = await tx.run(ENTITY_RELATIONSHIP_SAVE, relationships=relationship_data) rel_uuids = [record["uuid"] async for record in result] results['entity_relationships'] = rel_uuids - print(f"Successfully saved {len(rel_uuids)} entity relationships to Neo4j") + logger.info(f"Successfully saved {len(rel_uuids)} entity relationships to Neo4j") # 6. Save statement-chunk edges if statement_chunk_edges: @@ -254,7 +255,7 @@ async def save_dialog_and_statements_to_neo4j( result = await tx.run(STATEMENT_CHUNK_EDGE_SAVE, edges=sc_edge_data) sc_uuids = [record["uuid"] async for record in result] results['statement_chunk_edges'] = sc_uuids - print(f"Successfully saved {len(sc_uuids)} statement-chunk edges to Neo4j") + logger.info(f"Successfully saved {len(sc_uuids)} statement-chunk edges to Neo4j") # 7. Save statement-entity edges if statement_entity_edges: @@ -273,7 +274,7 @@ async def save_dialog_and_statements_to_neo4j( result = await tx.run(STATEMENT_ENTITY_EDGE_SAVE, edges=se_edge_data) se_uuids = [record["uuid"] async for record in result] results['statement_entity_edges'] = se_uuids - print(f"Successfully saved {len(se_uuids)} statement-entity edges to Neo4j") + logger.info(f"Successfully saved {len(se_uuids)} statement-entity edges to Neo4j") return results