refactor(memory): enhance extraction ontology and add assistant pruning graph support

- Expand entity type ontology with detailed definitions, examples, and notes
  (merged types: 地点设施, 物品设备, 产品服务, 软件平台, 角色职业, 知识能力, 偏好习惯目标, 称呼别名, 智能体)
- Add relation ontology taxonomy with 15 predicate categories and usage rules
- Strengthen reference resolution rules: resolve pronouns before extraction,
  skip unresolvable references entirely
- Add guidelines to avoid extracting abstract propositions, emotions, and
  low-value entities (effort/reward/success patterns)
- Add 7 new extraction examples covering edge cases
- Add AssistantOriginal/AssistantPruned node models and graph persistence
  (PRUNED_TO and BELONGS_TO_DIALOG edges, Neo4j indexes and constraints)
- Add graph_build_step.py for building graph nodes/edges from DialogData
- Update write_pipeline.py to pass assistant pruning nodes/edges to graph saver
- Update data_pruning.py with related preprocessing changes
This commit is contained in:
lanceyq
2026-04-28 13:32:29 +08:00
parent 2355536b44
commit 7747ed7ac1
11 changed files with 917 additions and 421 deletions

View File

@@ -46,6 +46,12 @@ async def create_fulltext_indexes():
OPTIONS { indexConfig: { `fulltext.analyzer`: 'cjk' } }
""")
# 创建 AssistantPruned 剪枝文本全文索引
await connector.execute_query("""
CREATE FULLTEXT INDEX assistantPrunedFulltext IF NOT EXISTS FOR (p:AssistantPruned) ON EACH [p.text]
OPTIONS { indexConfig: { `fulltext.analyzer`: 'cjk' } }
""")
finally:
await connector.close()
@@ -135,6 +141,17 @@ async def create_vector_indexes():
`vector.similarity_function`: 'cosine'
}}
""")
# AssistantPruned text embedding index (optional, for semantic search on pruned hints)
await connector.execute_query("""
CREATE VECTOR INDEX assistant_pruned_embedding_index IF NOT EXISTS
FOR (p:AssistantPruned)
ON p.text_embedding
OPTIONS {indexConfig: {
`vector.dimensions`: 1024,
`vector.similarity_function`: 'cosine'
}}
""")
finally:
await connector.close()
@@ -179,6 +196,22 @@ async def create_unique_constraints():
"""
)
# AssistantOriginal.id unique
await connector.execute_query(
"""
CREATE CONSTRAINT assistant_original_id_unique IF NOT EXISTS
FOR (o:AssistantOriginal) REQUIRE o.id IS UNIQUE
"""
)
# AssistantPruned.id unique
await connector.execute_query(
"""
CREATE CONSTRAINT assistant_pruned_id_unique IF NOT EXISTS
FOR (p:AssistantPruned) REQUIRE p.id IS UNIQUE
"""
)
finally:
await connector.close()

View File

@@ -1363,154 +1363,60 @@ ORDER BY score DESC
LIMIT $limit
"""
SEARCH_STATEMENTS_BY_KEYWORD = """
CALL db.index.fulltext.queryNodes("statementsFulltext", $query) YIELD node AS s, score
WHERE ($end_user_id IS NULL OR s.end_user_id = $end_user_id)
OPTIONAL MATCH (c:Chunk)-[:CONTAINS]->(s)
OPTIONAL MATCH (s)-[:REFERENCES_ENTITY]->(e:ExtractedEntity)
RETURN s.id AS id,
s.statement AS statement,
s.end_user_id AS end_user_id,
s.chunk_id AS chunk_id,
s.created_at AS created_at,
s.expired_at AS expired_at,
s.valid_at AS valid_at,
properties(s)['invalid_at'] AS invalid_at,
c.id AS chunk_id_from_rel,
collect(DISTINCT e.id) AS entity_ids,
COALESCE(s.activation_value, s.importance_score, 0.5) AS activation_value,
COALESCE(s.importance_score, 0.5) AS importance_score,
s.last_access_time AS last_access_time,
COALESCE(s.access_count, 0) AS access_count,
score
ORDER BY score DESC
LIMIT $limit
"""
SEARCH_ENTITIES_BY_NAME_OR_ALIAS = """
CALL db.index.fulltext.queryNodes("entitiesFulltext", $query) YIELD node AS e, score
WHERE ($end_user_id IS NULL OR e.end_user_id = $end_user_id)
WITH e, score
With collect({entity: e, score: score}) AS fulltextResults
# ── Assistant Pruning Nodes & Edges ──
OPTIONAL MATCH (ae:ExtractedEntity)
WHERE ($end_user_id IS NULL OR ae.end_user_id = $end_user_id)
AND ae.aliases IS NOT NULL
AND ANY(alias IN ae.aliases WHERE toLower(alias) CONTAINS toLower($query))
WITH fulltextResults, collect(ae) AS aliasEntities
UNWIND (fulltextResults + [x IN aliasEntities | {entity: x, score:
CASE
WHEN ANY(alias IN x.aliases WHERE toLower(alias) = toLower($query)) THEN 1.0
WHEN ANY(alias IN x.aliases WHERE toLower(alias) STARTS WITH toLower($query)) THEN 0.9
ELSE 0.8
END
}]) AS row
WITH row.entity AS e, row.score AS score
WITH DISTINCT e, MAX(score) AS score
OPTIONAL MATCH (s:Statement)-[:REFERENCES_ENTITY]->(e)
OPTIONAL MATCH (c:Chunk)-[:CONTAINS]->(s)
RETURN e.id AS id,
e.name AS name,
e.end_user_id AS end_user_id,
e.entity_type AS entity_type,
e.created_at AS created_at,
e.expired_at AS expired_at,
e.entity_idx AS entity_idx,
e.statement_id AS statement_id,
e.description AS description,
e.aliases AS aliases,
e.name_embedding AS name_embedding,
e.connect_strength AS connect_strength,
collect(DISTINCT s.id) AS statement_ids,
collect(DISTINCT c.id) AS chunk_ids,
COALESCE(e.activation_value, e.importance_score, 0.5) AS activation_value,
COALESCE(e.importance_score, 0.5) AS importance_score,
e.last_access_time AS last_access_time,
COALESCE(e.access_count, 0) AS access_count,
score
ORDER BY score DESC
LIMIT $limit
"""
SEARCH_CHUNKS_BY_CONTENT = """
CALL db.index.fulltext.queryNodes("chunksFulltext", $query) YIELD node AS c, score
WHERE ($end_user_id IS NULL OR c.end_user_id = $end_user_id)
OPTIONAL MATCH (c)-[:CONTAINS]->(s:Statement)
OPTIONAL MATCH (s)-[:REFERENCES_ENTITY]->(e:ExtractedEntity)
RETURN c.id AS id,
c.end_user_id AS end_user_id,
c.content AS content,
c.dialog_id AS dialog_id,
c.sequence_number AS sequence_number,
collect(DISTINCT s.id) AS statement_ids,
collect(DISTINCT e.id) AS entity_ids,
COALESCE(c.activation_value, 0.5) AS activation_value,
c.last_access_time AS last_access_time,
COALESCE(c.access_count, 0) AS access_count,
score
ORDER BY score DESC
LIMIT $limit
"""
# MemorySummary keyword search using fulltext index
SEARCH_MEMORY_SUMMARIES_BY_KEYWORD = """
CALL db.index.fulltext.queryNodes("summariesFulltext", $query) YIELD node AS m, score
WHERE ($end_user_id IS NULL OR m.end_user_id = $end_user_id)
OPTIONAL MATCH (m)-[:DERIVED_FROM_STATEMENT]->(s:Statement)
RETURN m.id AS id,
m.name AS name,
m.end_user_id AS end_user_id,
m.dialog_id AS dialog_id,
m.chunk_ids AS chunk_ids,
m.content AS content,
m.created_at AS created_at,
COALESCE(m.activation_value, m.importance_score, 0.5) AS activation_value,
COALESCE(m.importance_score, 0.5) AS importance_score,
m.last_access_time AS last_access_time,
COALESCE(m.access_count, 0) AS access_count,
score
ORDER BY score DESC
LIMIT $limit
"""
# Community keyword search: matches name or summary via fulltext index
SEARCH_COMMUNITIES_BY_KEYWORD = """
CALL db.index.fulltext.queryNodes("communitiesFulltext", $query) YIELD node AS c, score
WHERE ($end_user_id IS NULL OR c.end_user_id = $end_user_id)
RETURN c.community_id AS id,
c.name AS name,
c.summary AS content,
c.core_entities AS core_entities,
c.member_count AS member_count,
c.end_user_id AS end_user_id,
c.updated_at AS updated_at,
score
ORDER BY score DESC
LIMIT $limit
"""
FULLTEXT_QUERY_CYPHER_MAPPING = {
Neo4jNodeType.STATEMENT: SEARCH_STATEMENTS_BY_KEYWORD,
Neo4jNodeType.EXTRACTEDENTITY: SEARCH_ENTITIES_BY_NAME_OR_ALIAS,
Neo4jNodeType.CHUNK: SEARCH_CHUNKS_BY_CONTENT,
Neo4jNodeType.MEMORYSUMMARY: SEARCH_MEMORY_SUMMARIES_BY_KEYWORD,
Neo4jNodeType.COMMUNITY: SEARCH_COMMUNITIES_BY_KEYWORD,
Neo4jNodeType.PERCEPTUAL: SEARCH_PERCEPTUALS_BY_KEYWORD
ASSISTANT_ORIGINAL_NODE_SAVE = """
UNWIND $originals AS orig
MERGE (o:AssistantOriginal {id: orig.id})
SET o += {
end_user_id: orig.end_user_id,
run_id: orig.run_id,
dialog_id: orig.dialog_id,
pair_id: orig.pair_id,
text: orig.text,
created_at: orig.created_at,
expired_at: orig.expired_at
}
USER_ID_QUERY_CYPHER_MAPPING = {
Neo4jNodeType.STATEMENT: SEARCH_STATEMENTS_BY_USER_ID,
Neo4jNodeType.EXTRACTEDENTITY: SEARCH_ENTITIES_BY_USER_ID,
Neo4jNodeType.CHUNK: SEARCH_CHUNKS_BY_USER_ID,
Neo4jNodeType.MEMORYSUMMARY: SEARCH_MEMORY_SUMMARIES_BY_USER_ID,
Neo4jNodeType.COMMUNITY: SEARCH_COMMUNITIES_BY_USER_ID,
Neo4jNodeType.PERCEPTUAL: SEARCH_PERCEPTUAL_BY_USER_ID
}
NODE_ID_QUERY_CYPHER_MAPPING = {
Neo4jNodeType.STATEMENT: SEARCH_STATEMENTS_BY_IDS,
Neo4jNodeType.EXTRACTEDENTITY: SEARCH_ENTITIES_BY_IDS,
Neo4jNodeType.CHUNK: SEARCH_CHUNKS_BY_IDS,
Neo4jNodeType.MEMORYSUMMARY: SEARCH_MEMORY_SUMMARIES_BY_IDS,
Neo4jNodeType.COMMUNITY: SEARCH_COMMUNITIES_BY_IDS,
Neo4jNodeType.PERCEPTUAL: SEARCH_PERCEPTUAL_BY_IDS
RETURN o.id AS uuid
"""
ASSISTANT_PRUNED_NODE_SAVE = """
UNWIND $pruneds AS p
MERGE (pr:AssistantPruned {id: p.id})
SET pr += {
end_user_id: p.end_user_id,
run_id: p.run_id,
dialog_id: p.dialog_id,
pair_id: p.pair_id,
text: p.text,
memory_type: p.memory_type,
text_embedding: p.text_embedding,
created_at: p.created_at,
expired_at: p.expired_at
}
RETURN pr.id AS uuid
"""
ASSISTANT_PRUNED_EDGE_SAVE = """
UNWIND $edges AS edge
MATCH (o:AssistantOriginal {id: edge.source})
MATCH (p:AssistantPruned {id: edge.target})
MERGE (o)-[r:PRUNED_TO]->(p)
SET r.pair_id = edge.pair_id,
r.end_user_id = edge.end_user_id,
r.run_id = edge.run_id,
r.created_at = edge.created_at
RETURN elementId(r) AS uuid
"""
ASSISTANT_DIALOG_EDGE_SAVE = """
UNWIND $edges AS edge
MATCH (o:AssistantOriginal {id: edge.source})
MATCH (d:Dialogue {id: edge.target})
MERGE (o)-[r:BELONGS_TO_DIALOG]->(d)
SET r.end_user_id = edge.end_user_id,
r.run_id = edge.run_id,
r.created_at = edge.created_at
RETURN elementId(r) AS uuid
"""

View File

@@ -24,6 +24,10 @@ from app.core.memory.models.graph_models import (
EntityEntityEdge,
PerceptualNode,
PerceptualEdge,
AssistantOriginalNode,
AssistantPrunedNode,
AssistantPrunedEdge,
AssistantDialogEdge,
)
import logging
@@ -166,6 +170,10 @@ async def save_dialog_and_statements_to_neo4j(
statement_entity_edges: List[StatementEntityEdge],
perceptual_edges: List[PerceptualEdge],
connector: Neo4jConnector,
assistant_original_nodes: Optional[List[AssistantOriginalNode]] = None,
assistant_pruned_nodes: Optional[List[AssistantPrunedNode]] = None,
assistant_pruned_edges: Optional[List[AssistantPrunedEdge]] = None,
assistant_dialog_edges: Optional[List[AssistantDialogEdge]] = None,
) -> bool:
"""Save dialogue nodes, chunk nodes, statement nodes, entities, and all relationships to Neo4j using graph models.
@@ -368,6 +376,55 @@ async def save_dialog_and_statements_to_neo4j(
results['perceptual_chunk_edges'] = perceptual_edges_uuids
logger.info(f"Successfully saved {len(perceptual_edges_uuids)} perceptual-chunk edges to Neo4j")
# 8. Save assistant original nodes
if assistant_original_nodes:
from app.repositories.neo4j.cypher_queries import ASSISTANT_ORIGINAL_NODE_SAVE
original_data = [node.model_dump() for node in assistant_original_nodes]
result = await tx.run(ASSISTANT_ORIGINAL_NODE_SAVE, originals=original_data)
original_uuids = [record["uuid"] async for record in result]
results['assistant_originals'] = original_uuids
logger.info(f"Successfully saved {len(original_uuids)} assistant original nodes to Neo4j")
# 9. Save assistant pruned nodes
if assistant_pruned_nodes:
from app.repositories.neo4j.cypher_queries import ASSISTANT_PRUNED_NODE_SAVE
pruned_data = [node.model_dump() for node in assistant_pruned_nodes]
result = await tx.run(ASSISTANT_PRUNED_NODE_SAVE, pruneds=pruned_data)
pruned_uuids = [record["uuid"] async for record in result]
results['assistant_pruneds'] = pruned_uuids
logger.info(f"Successfully saved {len(pruned_uuids)} assistant pruned nodes to Neo4j")
# 10. Save PRUNED_TO edges (Original → Pruned)
if assistant_pruned_edges:
from app.repositories.neo4j.cypher_queries import ASSISTANT_PRUNED_EDGE_SAVE
edge_data = [{
"source": edge.source,
"target": edge.target,
"pair_id": edge.pair_id,
"end_user_id": edge.end_user_id,
"run_id": edge.run_id,
"created_at": edge.created_at.isoformat() if edge.created_at else None,
} for edge in assistant_pruned_edges]
result = await tx.run(ASSISTANT_PRUNED_EDGE_SAVE, edges=edge_data)
pruned_edge_uuids = [record["uuid"] async for record in result]
results['assistant_pruned_edges'] = pruned_edge_uuids
logger.info(f"Successfully saved {len(pruned_edge_uuids)} PRUNED_TO edges to Neo4j")
# 11. Save BELONGS_TO_DIALOG edges (Original → Dialogue)
if assistant_dialog_edges:
from app.repositories.neo4j.cypher_queries import ASSISTANT_DIALOG_EDGE_SAVE
edge_data = [{
"source": edge.source,
"target": edge.target,
"end_user_id": edge.end_user_id,
"run_id": edge.run_id,
"created_at": edge.created_at.isoformat() if edge.created_at else None,
} for edge in assistant_dialog_edges]
result = await tx.run(ASSISTANT_DIALOG_EDGE_SAVE, edges=edge_data)
dialog_edge_uuids = [record["uuid"] async for record in result]
results['assistant_dialog_edges'] = dialog_edge_uuids
logger.info(f"Successfully saved {len(dialog_edge_uuids)} BELONGS_TO_DIALOG edges to Neo4j")
return results
try: