Merge pull request #731 from SuanmoSuanyangTechnology/fix/cypher-indexes

Fix/cypher indexes
This commit is contained in:
Ke Sun
2026-03-30 18:18:03 +08:00
committed by GitHub
3 changed files with 20 additions and 221 deletions

View File

@@ -151,11 +151,6 @@ async def write(
# Step 3: Save all data to Neo4j database # Step 3: Save all data to Neo4j database
step_start = time.time() step_start = time.time()
from app.repositories.neo4j.create_indexes import create_fulltext_indexes
try:
await create_fulltext_indexes()
except Exception as e:
logger.error(f"Error creating indexes: {e}", exc_info=True)
# 添加死锁重试机制 # 添加死锁重试机制
max_retries = 3 max_retries = 3

View File

@@ -1,5 +1,6 @@
import os import os
import subprocess import subprocess
from app.repositories.neo4j.create_indexes import create_all_indexes
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from fastapi import FastAPI, APIRouter from fastapi import FastAPI, APIRouter
@@ -60,8 +61,10 @@ async def lifespan(app: FastAPI):
logger.warning(f"加载预定义模型时出错: {str(e)}") logger.warning(f"加载预定义模型时出错: {str(e)}")
else: else:
logger.info("预定义模型加载已禁用 (LOAD_MODEL=false)") logger.info("预定义模型加载已禁用 (LOAD_MODEL=false)")
await create_all_indexes()
logger.info("应用程序启动完成") logger.info("应用程序启动完成")
yield yield
# 应用关闭事件 # 应用关闭事件
logger.info("应用程序正在关闭") logger.info("应用程序正在关闭")

View File

@@ -1,62 +1,47 @@
import asyncio
from app.repositories.neo4j.neo4j_connector import Neo4jConnector from app.repositories.neo4j.neo4j_connector import Neo4jConnector
async def create_fulltext_indexes(): async def create_fulltext_indexes():
"""Create full-text indexes for keyword search with BM25 scoring.""" """Create full-text indexes for keyword search with BM25 scoring."""
connector = Neo4jConnector() connector = Neo4jConnector()
try: try:
print("\n" + "=" * 70)
print("Creating Full-Text Indexes (for keyword search)")
print("=" * 70)
# 创建 Statements 索引 # 创建 Statements 索引
await connector.execute_query(""" await connector.execute_query("""
CREATE FULLTEXT INDEX statementsFulltext IF NOT EXISTS FOR (s:Statement) ON EACH [s.statement] CREATE FULLTEXT INDEX statementsFulltext IF NOT EXISTS FOR (s:Statement) ON EACH [s.statement]
OPTIONS { indexConfig: { `fulltext.analyzer`: 'cjk' } } OPTIONS { indexConfig: { `fulltext.analyzer`: 'cjk' } }
""") """)
print("✓ Created: statementsFulltext")
# # 创建 Dialogues 索引 # # 创建 Dialogues 索引
# await connector.execute_query(""" # await connector.execute_query("""
# CREATE FULLTEXT INDEX dialoguesFulltext IF NOT EXISTS FOR (d:Dialogue) ON EACH [d.content] # CREATE FULLTEXT INDEX dialoguesFulltext IF NOT EXISTS FOR (d:Dialogue) ON EACH [d.content]
# OPTIONS { indexConfig: { `fulltext.analyzer`: 'cjk' } } # OPTIONS { indexConfig: { `fulltext.analyzer`: 'cjk' } }
# """) # """)
# 创建 Entities 索引 # 创建 Entities 索引
await connector.execute_query(""" await connector.execute_query("""
CREATE FULLTEXT INDEX entitiesFulltext IF NOT EXISTS FOR (e:ExtractedEntity) ON EACH [e.name] CREATE FULLTEXT INDEX entitiesFulltext IF NOT EXISTS FOR (e:ExtractedEntity) ON EACH [e.name]
OPTIONS { indexConfig: { `fulltext.analyzer`: 'cjk' } } OPTIONS { indexConfig: { `fulltext.analyzer`: 'cjk' } }
""") """)
print("✓ Created: entitiesFulltext")
# 创建 Chunks 索引 # 创建 Chunks 索引
await connector.execute_query(""" await connector.execute_query("""
CREATE FULLTEXT INDEX chunksFulltext IF NOT EXISTS FOR (c:Chunk) ON EACH [c.content] CREATE FULLTEXT INDEX chunksFulltext IF NOT EXISTS FOR (c:Chunk) ON EACH [c.content]
OPTIONS { indexConfig: { `fulltext.analyzer`: 'cjk' } } OPTIONS { indexConfig: { `fulltext.analyzer`: 'cjk' } }
""") """)
print("✓ Created: chunksFulltext")
# 创建 MemorySummary 索引 # 创建 MemorySummary 索引
await connector.execute_query(""" await connector.execute_query("""
CREATE FULLTEXT INDEX summariesFulltext IF NOT EXISTS FOR (m:MemorySummary) ON EACH [m.content] CREATE FULLTEXT INDEX summariesFulltext IF NOT EXISTS FOR (m:MemorySummary) ON EACH [m.content]
OPTIONS { indexConfig: { `fulltext.analyzer`: 'cjk' } } OPTIONS { indexConfig: { `fulltext.analyzer`: 'cjk' } }
""") """)
print("✓ Created: summariesFulltext")
# 创建 Community 索引 # 创建 Community 索引
await connector.execute_query(""" await connector.execute_query("""
CREATE FULLTEXT INDEX communitiesFulltext IF NOT EXISTS FOR (c:Community) ON EACH [c.name, c.summary] CREATE FULLTEXT INDEX communitiesFulltext IF NOT EXISTS FOR (c:Community) ON EACH [c.name, c.summary]
OPTIONS { indexConfig: { `fulltext.analyzer`: 'cjk' } } OPTIONS { indexConfig: { `fulltext.analyzer`: 'cjk' } }
""") """)
print("✓ Created: communitiesFulltext")
print("\nFull-text indexes created successfully with BM25 support.")
except Exception as e:
print(f"✗ Error creating full-text indexes: {e}")
finally: finally:
await connector.close() await connector.close()
async def create_vector_indexes(): async def create_vector_indexes():
"""Create vector indexes for fast embedding similarity search. """Create vector indexes for fast embedding similarity search.
@@ -65,12 +50,7 @@ async def create_vector_indexes():
""" """
connector = Neo4jConnector() connector = Neo4jConnector()
try: try:
print("\n" + "=" * 70)
print("Creating Vector Indexes (for embedding search)")
print("=" * 70)
print("Note: Adjust vector.dimensions if using different embedding model")
print(" Current setting: 1024 dimensions (for bge-m3)")
print()
# Statement embedding index # Statement embedding index
await connector.execute_query(""" await connector.execute_query("""
@@ -82,7 +62,7 @@ async def create_vector_indexes():
`vector.similarity_function`: 'cosine' `vector.similarity_function`: 'cosine'
}} }}
""") """)
print("✓ Created: statement_embedding_index")
# Chunk embedding index # Chunk embedding index
await connector.execute_query(""" await connector.execute_query("""
@@ -94,7 +74,7 @@ async def create_vector_indexes():
`vector.similarity_function`: 'cosine' `vector.similarity_function`: 'cosine'
}} }}
""") """)
print("✓ Created: chunk_embedding_index")
# Entity name embedding index # Entity name embedding index
await connector.execute_query(""" await connector.execute_query("""
@@ -106,7 +86,7 @@ async def create_vector_indexes():
`vector.similarity_function`: 'cosine' `vector.similarity_function`: 'cosine'
}} }}
""") """)
print("✓ Created: entity_embedding_index")
# Memory summary embedding index # Memory summary embedding index
await connector.execute_query(""" await connector.execute_query("""
@@ -118,8 +98,7 @@ async def create_vector_indexes():
`vector.similarity_function`: 'cosine' `vector.similarity_function`: 'cosine'
}} }}
""") """)
print("✓ Created: summary_embedding_index")
# Community summary embedding index # Community summary embedding index
await connector.execute_query(""" await connector.execute_query("""
CREATE VECTOR INDEX community_summary_embedding_index IF NOT EXISTS CREATE VECTOR INDEX community_summary_embedding_index IF NOT EXISTS
@@ -129,8 +108,7 @@ async def create_vector_indexes():
`vector.dimensions`: 1024, `vector.dimensions`: 1024,
`vector.similarity_function`: 'cosine' `vector.similarity_function`: 'cosine'
}} }}
""") """)
print("✓ Created: community_summary_embedding_index")
# Dialogue embedding index (optional) # Dialogue embedding index (optional)
await connector.execute_query(""" await connector.execute_query("""
@@ -142,91 +120,15 @@ async def create_vector_indexes():
`vector.similarity_function`: 'cosine' `vector.similarity_function`: 'cosine'
}} }}
""") """)
print("✓ Created: dialogue_embedding_index")
# Community summary embedding index
await connector.execute_query("""
CREATE VECTOR INDEX community_summary_embedding_index IF NOT EXISTS
FOR (c:Community)
ON c.summary_embedding
OPTIONS {indexConfig: {
`vector.dimensions`: 1024,
`vector.similarity_function`: 'cosine'
}}
""")
print("✓ Created: community_summary_embedding_index")
print("\nVector indexes created successfully!")
print("\nExpected performance improvement:")
print(" Before: ~1.4s for embedding search")
print(" After: ~0.05-0.2s for embedding search (10-30x faster!)")
except Exception as e:
print(f"✗ Error creating vector indexes: {e}")
finally: finally:
await connector.close() await connector.close()
async def create_config_id_indexes():
"""Create indexes on config_id fields for improved query performance.
These indexes enable fast filtering of nodes by configuration ID,
which is essential for configuration isolation and multi-tenant scenarios.
"""
connector = Neo4jConnector()
try:
print("\n" + "=" * 70)
print("Creating Config ID Indexes")
print("=" * 70)
# Dialogue.config_id index
await connector.execute_query("""
CREATE INDEX dialogue_config_id_index IF NOT EXISTS
FOR (d:Dialogue) ON (d.config_id)
""")
print("✓ Created: dialogue_config_id_index")
# Statement.config_id index
await connector.execute_query("""
CREATE INDEX statement_config_id_index IF NOT EXISTS
FOR (s:Statement) ON (s.config_id)
""")
print("✓ Created: statement_config_id_index")
# ExtractedEntity.config_id index
await connector.execute_query("""
CREATE INDEX entity_config_id_index IF NOT EXISTS
FOR (e:ExtractedEntity) ON (e.config_id)
""")
print("✓ Created: entity_config_id_index")
# MemorySummary.config_id index
await connector.execute_query("""
CREATE INDEX summary_config_id_index IF NOT EXISTS
FOR (m:MemorySummary) ON (m.config_id)
""")
print("✓ Created: summary_config_id_index")
print("\nConfig ID indexes created successfully!")
print("These indexes enable fast filtering by configuration ID.")
except Exception as e:
print(f"✗ Error creating config_id indexes: {e}")
finally:
await connector.close()
async def create_unique_constraints(): async def create_unique_constraints():
"""Create uniqueness constraints for core node identifiers. """Create uniqueness constraints for core node identifiers.
Ensures concurrent MERGE operations remain safe and prevents duplicates. Ensures concurrent MERGE operations remain safe and prevents duplicates.
""" """
connector = Neo4jConnector() connector = Neo4jConnector()
try: try:
print("\n" + "=" * 70)
print("Creating Unique Constraints")
print("=" * 70)
# Dialogue.id unique # Dialogue.id unique
await connector.execute_query( await connector.execute_query(
""" """
@@ -234,8 +136,7 @@ async def create_unique_constraints():
FOR (d:Dialogue) REQUIRE d.id IS UNIQUE FOR (d:Dialogue) REQUIRE d.id IS UNIQUE
""" """
) )
print("✓ Created: dialog_id_unique")
# Statement.id unique # Statement.id unique
await connector.execute_query( await connector.execute_query(
""" """
@@ -243,8 +144,7 @@ async def create_unique_constraints():
FOR (s:Statement) REQUIRE s.id IS UNIQUE FOR (s:Statement) REQUIRE s.id IS UNIQUE
""" """
) )
print("✓ Created: statement_id_unique")
# Chunk.id unique # Chunk.id unique
await connector.execute_query( await connector.execute_query(
""" """
@@ -252,112 +152,13 @@ async def create_unique_constraints():
FOR (c:Chunk) REQUIRE c.id IS UNIQUE FOR (c:Chunk) REQUIRE c.id IS UNIQUE
""" """
) )
print("✓ Created: chunk_id_unique")
print("\nUnique constraints ensured for Dialogue, Statement, and Chunk.")
except Exception as e:
print(f"✗ Error creating unique constraints: {e}")
finally: finally:
await connector.close() await connector.close()
async def create_all_indexes(): async def create_all_indexes():
"""Create all indexes and constraints in one go.""" """Create all indexes and constraints in one go."""
print("\n" + "=" * 70)
print("Neo4j Index & Constraint Setup")
print("=" * 70)
print("This will create:")
print(" 1. Full-text indexes (for keyword/BM25 search)")
print(" 2. Vector indexes (for embedding similarity search)")
print(" 3. Config ID indexes (for configuration isolation)")
print(" 4. Unique constraints (for data integrity)")
print("=" * 70)
await create_fulltext_indexes() await create_fulltext_indexes()
await create_vector_indexes() await create_vector_indexes()
await create_config_id_indexes()
await create_unique_constraints() await create_unique_constraints()
print("\n" + "=" * 70)
print("✓ All indexes and constraints created successfully!") print("✓ All indexes and constraints created successfully!")
print("=" * 70)
print("\nTo verify, run in Neo4j Browser:")
print(" SHOW INDEXES")
print(" SHOW CONSTRAINTS")
print()
async def check_indexes():
"""Check what indexes currently exist."""
connector = Neo4jConnector()
try:
print("\n" + "=" * 70)
print("Checking Existing Indexes")
print("=" * 70)
query = "SHOW INDEXES"
result = await connector.execute_query(query)
fulltext_indexes = [idx for idx in result if idx.get('type') == 'FULLTEXT']
vector_indexes = [idx for idx in result if idx.get('type') == 'VECTOR']
range_indexes = [idx for idx in result if idx.get('type') == 'RANGE']
print(f"\nFull-text indexes: {len(fulltext_indexes)}")
for idx in fulltext_indexes:
print(f"{idx.get('name')}")
print(f"\nVector indexes: {len(vector_indexes)}")
for idx in vector_indexes:
print(f"{idx.get('name')}")
print(f"\nRange indexes (including config_id): {len(range_indexes)}")
for idx in range_indexes:
print(f"{idx.get('name')}")
if not vector_indexes:
print("\n⚠️ WARNING: No vector indexes found!")
print(" Embedding search will be VERY SLOW (~1.4s)")
print(" Run: python create_indexes.py")
# Check for config_id indexes
config_id_indexes = [idx for idx in range_indexes if 'config_id' in idx.get('name', '')]
if len(config_id_indexes) < 4:
print("\n⚠️ WARNING: Not all config_id indexes found!")
print(f" Expected 4, found {len(config_id_indexes)}")
print(" Run: python create_indexes.py config_id")
print("=" * 70)
finally:
await connector.close()
if __name__ == "__main__":
import asyncio
import sys
if len(sys.argv) > 1:
command = sys.argv[1]
if command == "check":
asyncio.run(check_indexes())
elif command == "fulltext":
asyncio.run(create_fulltext_indexes())
elif command == "vector":
asyncio.run(create_vector_indexes())
elif command == "config_id":
asyncio.run(create_config_id_indexes())
elif command == "constraints":
asyncio.run(create_unique_constraints())
else:
print(f"Unknown command: {command}")
print("\nUsage:")
print(" python create_indexes.py # Create all indexes")
print(" python create_indexes.py check # Check existing indexes")
print(" python create_indexes.py fulltext # Create only full-text indexes")
print(" python create_indexes.py vector # Create only vector indexes")
print(" python create_indexes.py config_id # Create only config_id indexes")
print(" python create_indexes.py constraints # Create only constraints")
else:
asyncio.run(create_all_indexes())