Multiple independent transactions - single transaction
This commit is contained in:
@@ -4,6 +4,7 @@ Write Tools for Memory Knowledge Extraction Pipeline
|
|||||||
This module provides the main write function for executing the knowledge extraction
|
This module provides the main write function for executing the knowledge extraction
|
||||||
pipeline. Only MemoryConfig is needed - clients are constructed internally.
|
pipeline. Only MemoryConfig is needed - clients are constructed internally.
|
||||||
"""
|
"""
|
||||||
|
import asyncio
|
||||||
import time
|
import time
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
@@ -123,23 +124,48 @@ async def write(
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error creating indexes: {e}", exc_info=True)
|
logger.error(f"Error creating indexes: {e}", exc_info=True)
|
||||||
|
|
||||||
|
# 添加死锁重试机制
|
||||||
|
max_retries = 3
|
||||||
|
retry_delay = 1 # 秒
|
||||||
|
|
||||||
|
for attempt in range(max_retries):
|
||||||
|
try:
|
||||||
|
success = await save_dialog_and_statements_to_neo4j(
|
||||||
|
dialogue_nodes=all_dialogue_nodes,
|
||||||
|
chunk_nodes=all_chunk_nodes,
|
||||||
|
statement_nodes=all_statement_nodes,
|
||||||
|
entity_nodes=all_entity_nodes,
|
||||||
|
statement_chunk_edges=all_statement_chunk_edges,
|
||||||
|
statement_entity_edges=all_statement_entity_edges,
|
||||||
|
entity_edges=all_entity_entity_edges,
|
||||||
|
connector=neo4j_connector
|
||||||
|
)
|
||||||
|
if success:
|
||||||
|
logger.info("Successfully saved all data to Neo4j")
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
logger.warning("Failed to save some data to Neo4j")
|
||||||
|
if attempt < max_retries - 1:
|
||||||
|
logger.info(f"Retrying... (attempt {attempt + 2}/{max_retries})")
|
||||||
|
await asyncio.sleep(retry_delay * (attempt + 1)) # 指数退避
|
||||||
|
except Exception as e:
|
||||||
|
error_msg = str(e)
|
||||||
|
# 检查是否是死锁错误
|
||||||
|
if "DeadlockDetected" in error_msg or "deadlock" in error_msg.lower():
|
||||||
|
if attempt < max_retries - 1:
|
||||||
|
logger.warning(f"Deadlock detected, retrying... (attempt {attempt + 2}/{max_retries})")
|
||||||
|
await asyncio.sleep(retry_delay * (attempt + 1)) # 指数退避
|
||||||
|
else:
|
||||||
|
logger.error(f"Failed after {max_retries} attempts due to deadlock: {e}")
|
||||||
|
raise
|
||||||
|
else:
|
||||||
|
# 非死锁错误,直接抛出
|
||||||
|
raise
|
||||||
|
|
||||||
try:
|
try:
|
||||||
success = await save_dialog_and_statements_to_neo4j(
|
|
||||||
dialogue_nodes=all_dialogue_nodes,
|
|
||||||
chunk_nodes=all_chunk_nodes,
|
|
||||||
statement_nodes=all_statement_nodes,
|
|
||||||
entity_nodes=all_entity_nodes,
|
|
||||||
statement_chunk_edges=all_statement_chunk_edges,
|
|
||||||
statement_entity_edges=all_statement_entity_edges,
|
|
||||||
entity_edges=all_entity_entity_edges,
|
|
||||||
connector=neo4j_connector
|
|
||||||
)
|
|
||||||
if success:
|
|
||||||
logger.info("Successfully saved all data to Neo4j")
|
|
||||||
else:
|
|
||||||
logger.warning("Failed to save some data to Neo4j")
|
|
||||||
finally:
|
|
||||||
await neo4j_connector.close()
|
await neo4j_connector.close()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error closing Neo4j connector: {e}")
|
||||||
|
|
||||||
log_time("Neo4j Database Save", time.time() - step_start, log_file)
|
log_time("Neo4j Database Save", time.time() - step_start, log_file)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user