from app.core.memory.enums import Neo4jNodeType DIALOGUE_NODE_SAVE = """ UNWIND $dialogues AS dialogue MERGE (n:Dialogue {id: dialogue.id}) SET n.uuid = coalesce(n.uuid, dialogue.id), n.end_user_id = dialogue.end_user_id, n.run_id = dialogue.run_id, n.ref_id = dialogue.ref_id, n.created_at = dialogue.created_at, n.expired_at = dialogue.expired_at, n.content = dialogue.content, n.dialog_embedding = dialogue.dialog_embedding RETURN n.id AS uuid """ STATEMENT_NODE_SAVE = """ UNWIND $statements AS statement MERGE (s:Statement {id: statement.id}) SET s += { id: statement.id, run_id: statement.run_id, chunk_id: statement.chunk_id, end_user_id: statement.end_user_id, stmt_type: statement.stmt_type, statement: statement.statement, speaker: statement.speaker, emotion_intensity: statement.emotion_intensity, emotion_target: statement.emotion_target, emotion_subject: statement.emotion_subject, emotion_type: statement.emotion_type, emotion_keywords: statement.emotion_keywords, temporal_info: statement.temporal_info, created_at: statement.created_at, expired_at: statement.expired_at, valid_at: statement.valid_at, invalid_at: statement.invalid_at, statement_embedding: statement.statement_embedding, relevence_info: statement.relevence_info, importance_score: statement.importance_score, activation_value: statement.activation_value, access_history: statement.access_history, last_access_time: statement.last_access_time, access_count: statement.access_count } RETURN s.id AS uuid """ CHUNK_NODE_SAVE = """ UNWIND $chunks AS chunk MERGE (c:Chunk {id: chunk.id}) SET c += { id: chunk.id, name: chunk.name, end_user_id: chunk.end_user_id, run_id: chunk.run_id, created_at: chunk.created_at, expired_at: chunk.expired_at, dialog_id: chunk.dialog_id, content: chunk.content, speaker: chunk.speaker, chunk_embedding: chunk.chunk_embedding, sequence_number: chunk.sequence_number, start_index: chunk.start_index, end_index: chunk.end_index } RETURN c.id AS uuid """ # bug修改点 EXTRACTED_ENTITY_NODE_SAVE = """ // Upsert entity nodes safely: preserve existing non-empty fields when incoming is empty UNWIND $entities AS entity MERGE (e:ExtractedEntity {id: entity.id}) SET e.name = CASE WHEN entity.name IS NOT NULL AND entity.name <> '' THEN entity.name ELSE e.name END, e.end_user_id = CASE WHEN entity.end_user_id IS NOT NULL AND entity.end_user_id <> '' THEN entity.end_user_id ELSE e.end_user_id END, e.run_id = CASE WHEN entity.run_id IS NOT NULL AND entity.run_id <> '' THEN entity.run_id ELSE e.run_id END, e.created_at = CASE WHEN entity.created_at IS NOT NULL AND (e.created_at IS NULL OR entity.created_at < e.created_at) THEN entity.created_at ELSE e.created_at END, e.expired_at = CASE WHEN entity.expired_at IS NOT NULL AND (e.expired_at IS NULL OR entity.expired_at > e.expired_at) THEN entity.expired_at ELSE e.expired_at END, e.entity_idx = CASE WHEN e.entity_idx IS NULL OR e.entity_idx = 0 THEN entity.entity_idx ELSE e.entity_idx END, e.entity_type = CASE WHEN entity.entity_type IS NOT NULL AND entity.entity_type <> '' THEN entity.entity_type ELSE e.entity_type END, e.description = CASE WHEN entity.description IS NOT NULL AND entity.description <> '' AND (e.description IS NULL OR size(e.description) = 0 OR size(entity.description) > size(e.description)) THEN entity.description ELSE e.description END, e.example = CASE WHEN entity.example IS NOT NULL AND entity.example <> '' THEN entity.example ELSE coalesce(e.example, '') END, e.statement_id = CASE WHEN entity.statement_id IS NOT NULL AND entity.statement_id <> '' THEN entity.statement_id ELSE e.statement_id END, e.aliases = CASE // 用户实体的 aliases 由 PgSQL end_user_info 作为唯一权威源,知识抽取完全不写入 WHEN entity.name IN ['用户', '我', 'User', 'I'] THEN e.aliases WHEN entity.aliases IS NOT NULL AND size(entity.aliases) > 0 THEN CASE WHEN e.aliases IS NULL THEN entity.aliases ELSE reduce(acc = [], alias IN (e.aliases + entity.aliases) | CASE WHEN alias IN acc THEN acc ELSE acc + alias END) END ELSE e.aliases END, e.name_embedding = CASE WHEN entity.name_embedding IS NOT NULL AND size(entity.name_embedding) > 0 THEN entity.name_embedding ELSE e.name_embedding END, // TODO: fact_summary 功能暂时禁用,待后续开发完善后启用 // e.fact_summary = CASE // WHEN entity.fact_summary IS NOT NULL AND entity.fact_summary <> '' // AND (e.fact_summary IS NULL OR size(e.fact_summary) = 0 OR size(entity.fact_summary) > size(e.fact_summary)) // THEN entity.fact_summary ELSE e.fact_summary END, e.connect_strength = CASE WHEN entity.connect_strength IS NULL OR entity.connect_strength = '' THEN e.connect_strength ELSE CASE WHEN e.connect_strength = 'strong' AND entity.connect_strength = 'weak' THEN 'both' WHEN e.connect_strength = 'weak' AND entity.connect_strength = 'strong' THEN 'both' WHEN e.connect_strength IS NULL OR e.connect_strength = '' THEN entity.connect_strength ELSE e.connect_strength END END, e.importance_score = CASE WHEN entity.importance_score IS NOT NULL THEN entity.importance_score ELSE coalesce(e.importance_score, 0.5) END, e.activation_value = CASE WHEN entity.activation_value IS NOT NULL THEN entity.activation_value ELSE e.activation_value END, e.access_history = CASE WHEN entity.access_history IS NOT NULL THEN entity.access_history ELSE coalesce(e.access_history, []) END, e.last_access_time = CASE WHEN entity.last_access_time IS NOT NULL THEN entity.last_access_time ELSE e.last_access_time END, e.access_count = CASE WHEN entity.access_count IS NOT NULL THEN entity.access_count ELSE coalesce(e.access_count, 0) END, e.is_explicit_memory = CASE WHEN entity.is_explicit_memory IS NOT NULL THEN entity.is_explicit_memory ELSE coalesce(e.is_explicit_memory, false) END RETURN e.id AS uuid """ # Add back ENTITY_RELATIONSHIP_SAVE to be used by graph_saver.save_entities_and_relationships ENTITY_RELATIONSHIP_SAVE = """ UNWIND $relationships AS rel // Match entities by stable id within end_user_id, do not constrain by run_id MATCH (subject:ExtractedEntity {id: rel.source_id, end_user_id: rel.end_user_id}) MATCH (object:ExtractedEntity {id: rel.target_id, end_user_id: rel.end_user_id}) // Avoid duplicate edges across runs for the same endpoints MERGE (subject)-[r:EXTRACTED_RELATIONSHIP]->(object) SET r.predicate = rel.predicate, r.statement_id = rel.statement_id, r.value = rel.value, r.statement = rel.statement, r.valid_at = rel.valid_at, r.invalid_at = rel.invalid_at, r.created_at = rel.created_at, r.expired_at = rel.expired_at, r.run_id = rel.run_id, r.end_user_id = rel.end_user_id RETURN elementId(r) AS uuid """ CHUNK_STATEMENT_EDGE_SAVE = """ UNWIND $chunk_statement_edges AS edge MATCH (statement:Statement {id: edge.source, run_id: edge.run_id}) MATCH (chunk:Chunk {id: edge.target, run_id: edge.run_id}) MERGE (chunk)-[e:CONTAINS {id: edge.id}]->(statement) SET e.end_user_id = edge.end_user_id, e.run_id = edge.run_id, e.created_at = edge.created_at, e.expired_at = edge.expired_at RETURN e.id AS uuid """ STATEMENT_ENTITY_EDGE_SAVE = """ UNWIND $relationships AS rel // Statement nodes are per-run; keep run_id constraint on statements MATCH (statement:Statement {id: rel.source, run_id: rel.run_id}) // Entities are shared across runs within end_user_id; do not constrain by run_id MATCH (entity:ExtractedEntity {id: rel.target, end_user_id: rel.end_user_id}) // Avoid duplicate edges across runs for same endpoints MERGE (statement)-[r:REFERENCES_ENTITY]->(entity) SET r.end_user_id = rel.end_user_id, r.run_id = rel.run_id, r.created_at = rel.created_at, r.expired_at = rel.expired_at, r.connect_strength = rel.connect_strength RETURN elementId(r) AS uuid """ # 查询实体名称包含指定字符串的实体 SEARCH_ENTITIES_BY_NAME = """ CALL db.index.fulltext.queryNodes("entitiesFulltext", $query) YIELD node AS e, score WHERE ($end_user_id IS NULL OR e.end_user_id = $end_user_id) OPTIONAL MATCH (s:Statement)-[:REFERENCES_ENTITY]->(e) OPTIONAL MATCH (c:Chunk)-[:CONTAINS]->(s) RETURN e.id AS id, e.name AS name, e.end_user_id AS end_user_id, e.entity_type AS entity_type, e.created_at AS created_at, e.expired_at AS expired_at, e.entity_idx AS entity_idx, e.statement_id AS statement_id, e.description AS description, e.aliases AS aliases, e.name_embedding AS name_embedding, // TODO: fact_summary 功能暂时禁用,待后续开发完善后启用 // COALESCE(e.fact_summary, '') AS fact_summary, e.connect_strength AS connect_strength, collect(DISTINCT s.id) AS statement_ids, collect(DISTINCT c.id) AS chunk_ids, COALESCE(e.activation_value, e.importance_score, 0.5) AS activation_value, COALESCE(e.importance_score, 0.5) AS importance_score, e.last_access_time AS last_access_time, COALESCE(e.access_count, 0) AS access_count, score ORDER BY score DESC LIMIT $limit """ # 以下是关于第二层去重消歧与数据库进行检索的语句,在最近的规划中不再使用 # # 同组group_id下按“精确名字或别名+可选类型一致”来检索 # SECOND_LAYER_CANDIDATE_MATCH_BATCH = """ # UNWIND $rows AS row # MATCH (e:ExtractedEntity) # WHERE e.group_id = row.group_id # AND (toLower(e.name) = toLower(row.name) OR any(a IN e.aliases WHERE toLower(a) = toLower(row.name))) # AND (row.entity_type IS NULL OR e.entity_type = row.entity_type) # RETURN row.id AS incoming_id, # e.id AS id, # e.name AS name, # e.group_id AS group_id, # e.entity_idx AS entity_idx, # e.entity_type AS entity_type, # e.description AS description, # e.statement_id AS statement_id, # e.aliases AS aliases, # e.name_embedding AS name_embedding, # e.fact_summary AS fact_summary, # e.connect_strength AS connect_strength, # e.created_at AS created_at, # e.expired_at AS expired_at # """ # # 同组group_id下按name contains召回补充 # SECOND_LAYER_CANDIDATE_CONTAINS_BATCH = """ # UNWIND $rows AS row # MATCH (e:ExtractedEntity) # WHERE e.group_id = row.group_id # AND toLower(e.name) CONTAINS toLower(row.name) # RETURN row.id AS incoming_id, # e.id AS id, # e.name AS name, # e.group_id AS group_id, # e.entity_idx AS entity_idx, # e.entity_type AS entity_type, # e.description AS description, # e.statement_id AS statement_id, # e.aliases AS aliases, # e.name_embedding AS name_embedding, # e.fact_summary AS fact_summary, # e.connect_strength AS connect_strength, # e.created_at AS created_at, # e.expired_at AS expired_at # """ SEARCH_DIALOGUE_BY_DIALOG_ID = """ MATCH (d:Dialogue) WHERE ($end_user_id IS NULL OR d.end_user_id = $end_user_id) AND d.id = $dialog_id RETURN d.id AS dialog_id, d.end_user_id AS end_user_id, d.content AS content, d.created_at AS created_at, d.expired_at AS expired_at ORDER BY d.created_at DESC LIMIT $limit """ SEARCH_CHUNK_BY_CHUNK_ID = """ MATCH (c:Chunk) WHERE ($end_user_id IS NULL OR c.end_user_id = $end_user_id) AND c.id = $chunk_id RETURN c.id AS chunk_id, c.end_user_id AS end_user_id, c.content AS content, c.dialog_id AS dialog_id, c.created_at AS created_at, c.expired_at AS expired_at, c.sequence_number AS sequence_number ORDER BY c.created_at DESC LIMIT $limit """ SEARCH_STATEMENTS_BY_TEMPORAL = """ MATCH (s:Statement) WHERE ($end_user_id IS NULL OR s.end_user_id = $end_user_id) AND ((($start_date IS NULL OR datetime(s.created_at) >= datetime($start_date)) AND ($end_date IS NULL OR datetime(s.created_at) <= datetime($end_date))) OR (($valid_date IS NULL OR (s.valid_at IS NOT NULL AND datetime(s.valid_at) >= datetime($valid_date))) AND ($invalid_date IS NULL OR (s.invalid_at IS NOT NULL AND datetime(s.invalid_at) <= datetime($invalid_date))))) RETURN s.id AS id, s.statement AS statement, s.end_user_id AS end_user_id, s.chunk_id AS chunk_id, s.created_at AS created_at, s.valid_at AS valid_at, s.invalid_at AS invalid_at, collect(DISTINCT s.id) AS statement_ids, COALESCE(s.activation_value, s.importance_score, 0.5) AS activation_value, COALESCE(s.importance_score, 0.5) AS importance_score, s.last_access_time AS last_access_time, COALESCE(s.access_count, 0) AS access_count ORDER BY datetime(s.created_at) DESC LIMIT $limit """ SEARCH_STATEMENTS_BY_KEYWORD_TEMPORAL = """ CALL db.index.fulltext.queryNodes("statementsFulltext", $query) YIELD node AS s, score WHERE ($end_user_id IS NULL OR s.end_user_id = $end_user_id) AND ((($start_date IS NULL OR (s.created_at IS NOT NULL AND datetime(s.created_at) >= datetime($start_date))) AND ($end_date IS NULL OR (s.created_at IS NOT NULL AND datetime(s.created_at) <= datetime($end_date)))) OR (($valid_date IS NULL OR (s.valid_at IS NOT NULL AND datetime(s.valid_at) >= datetime($valid_date))) AND ($invalid_date IS NULL OR (s.invalid_at IS NOT NULL AND datetime(s.invalid_at) <= datetime($invalid_date))))) OPTIONAL MATCH (c:Chunk)-[:CONTAINS]->(s) OPTIONAL MATCH (s)-[:REFERENCES_ENTITY]->(e:ExtractedEntity) RETURN s.id AS id, s.statement AS statement, s.end_user_id AS end_user_id, s.chunk_id AS chunk_id, s.created_at AS created_at, s.valid_at AS valid_at, s.invalid_at AS invalid_at, c.id AS chunk_id_from_rel, collect(DISTINCT e.id) AS entity_ids, COALESCE(s.activation_value, s.importance_score, 0.5) AS activation_value, COALESCE(s.importance_score, 0.5) AS importance_score, s.last_access_time AS last_access_time, COALESCE(s.access_count, 0) AS access_count, score ORDER BY s.created_at DESC, score DESC LIMIT $limit """ SEARCH_STATEMENTS_BY_CREATED_AT = """ MATCH (n:Statement) WHERE ($end_user_id IS NULL OR n.end_user_id = $end_user_id) AND ($created_at IS NOT NULL AND date(substring(n.created_at, 0, 10)) = date($created_at)) RETURN n.id AS id, n.statement AS statement, n.end_user_id AS end_user_id, n.chunk_id AS chunk_id, n.created_at AS created_at, n.valid_at AS valid_at, n.invalid_at AS invalid_at, collect(DISTINCT n.id) AS statement_ids ORDER BY n.created_at DESC LIMIT $limit """ SEARCH_STATEMENTS_BY_VALID_AT = """ MATCH (n:Statement) WHERE ($end_user_id IS NULL OR n.end_user_id = $end_user_id) AND ($valid_at IS NOT NULL AND date(substring(n.valid_at, 0, 10)) = date($valid_at)) RETURN n.id AS id, n.statement AS statement, n.end_user_id AS end_user_id, n.chunk_id AS chunk_id, n.created_at AS created_at, n.valid_at AS valid_at, n.invalid_at AS invalid_at, collect(DISTINCT n.id) AS statement_ids ORDER BY n.valid_at DESC LIMIT $limit """ SEARCH_STATEMENTS_G_CREATED_AT = """ MATCH (n:Statement) WHERE ($end_user_id IS NULL OR n.end_user_id = $end_user_id) AND ($created_at IS NOT NULL AND date(substring(n.created_at, 0, 19)) = date($created_at)) RETURN n.id AS id, n.statement AS statement, n.end_user_id AS end_user_id, n.chunk_id AS chunk_id, n.created_at AS created_at, n.valid_at AS valid_at, n.invalid_at AS invalid_at, collect(DISTINCT n.id) AS statement_ids ORDER BY n.created_at DESC LIMIT $limit """ SEARCH_STATEMENTS_L_CREATED_AT = """ MATCH (n:Statement) WHERE ($end_user_id IS NULL OR n.end_user_id = $end_user_id) AND ($created_at IS NOT NULL AND date(substring(n.created_at, 0, 19)) < date($created_at)) RETURN n.id AS id, n.statement AS statement, n.end_user_id AS end_user_id, n.chunk_id AS chunk_id, n.created_at AS created_at, n.valid_at AS valid_at, n.invalid_at AS invalid_at, collect(DISTINCT n.id) AS statement_ids ORDER BY n.created_at DESC LIMIT $limit """ SEARCH_STATEMENTS_G_VALID_AT = """ MATCH (n:Statement) WHERE ($end_user_id IS NULL OR n.end_user_id = $end_user_id) AND ($valid_at IS NOT NULL AND date(substring(n.valid_at, 0, 10)) > date($valid_at)) RETURN n.id AS id, n.statement AS statement, n.end_user_id AS end_user_id, n.chunk_id AS chunk_id, n.created_at AS created_at, n.valid_at AS valid_at, n.invalid_at AS invalid_at, collect(DISTINCT n.id) AS statement_ids ORDER BY n.valid_at DESC LIMIT $limit """ SEARCH_STATEMENTS_L_VALID_AT = """ MATCH (n:Statement) WHERE ($end_user_id IS NULL OR n.end_user_id = $end_user_id) AND ($valid_at IS NOT NULL AND date(substring(n.valid_at, 0, 10)) < date($valid_at)) RETURN n.id AS id, n.statement AS statement, n.end_user_id AS end_user_id, n.chunk_id AS chunk_id, n.created_at AS created_at, n.valid_at AS valid_at, n.invalid_at AS invalid_at, collect(DISTINCT n.id) AS statement_ids ORDER BY n.valid_at DESC LIMIT $limit """ # 以下是关于第二层去重消歧与数据库进行检索的语句,在最近的规划中不再使用 # # 同组group_id下按“精确名字或别名+可选类型一致”来检索 # SECOND_LAYER_CANDIDATE_MATCH_BATCH = """ # UNWIND $rows AS row # MATCH (e:ExtractedEntity) # WHERE e.group_id = row.group_id # AND (toLower(e.name) = toLower(row.name) OR any(a IN e.aliases WHERE toLower(a) = toLower(row.name))) # AND (row.entity_type IS NULL OR e.entity_type = row.entity_type) # RETURN row.id AS incoming_id, # e.id AS id, # e.name AS name, # e.group_id AS group_id, # e.entity_idx AS entity_idx, # e.entity_type AS entity_type, # e.description AS description, # e.statement_id AS statement_id, # e.aliases AS aliases, # e.name_embedding AS name_embedding, # e.fact_summary AS fact_summary, # e.connect_strength AS connect_strength, # e.created_at AS created_at, # e.expired_at AS expired_at # """ # # 同组group_id下按name contains召回补充 # SECOND_LAYER_CANDIDATE_CONTAINS_BATCH = """ # UNWIND $rows AS row # MATCH (e:ExtractedEntity) # WHERE e.group_id = row.group_id # AND toLower(e.name) CONTAINS toLower(row.name) # RETURN row.id AS incoming_id, # e.id AS id, # e.name AS name, # e.group_id AS group_id, # e.entity_idx AS entity_idx, # e.entity_type AS entity_type, # e.description AS description, # e.statement_id AS statement_id, # e.aliases AS aliases, # e.name_embedding AS name_embedding, # e.fact_summary AS fact_summary, # e.connect_strength AS connect_strength, # e.created_at AS created_at, # e.expired_at AS expired_at # """ # 根据id修改句子的invalid_at的值 UPDATE_STATEMENT_INVALID_AT = """ MATCH (n:Statement {end_user_id: $end_user_id, id: $id}) SET n.invalid_at = $new_invalid_at """ MEMORY_SUMMARY_NODE_SAVE = """ UNWIND $summaries AS summary MERGE (m:MemorySummary {id: summary.id}) SET m += { id: summary.id, name: summary.name, end_user_id: summary.end_user_id, run_id: summary.run_id, created_at: summary.created_at, expired_at: summary.expired_at, dialog_id: summary.dialog_id, chunk_ids: summary.chunk_ids, content: summary.content, memory_type: summary.memory_type, summary_embedding: summary.summary_embedding, config_id: summary.config_id, importance_score: CASE WHEN summary.importance_score IS NOT NULL THEN summary.importance_score ELSE coalesce(m.importance_score, 0.5) END, activation_value: CASE WHEN summary.activation_value IS NOT NULL THEN summary.activation_value ELSE m.activation_value END, access_history: CASE WHEN summary.access_history IS NOT NULL THEN summary.access_history ELSE coalesce(m.access_history, []) END, last_access_time: CASE WHEN summary.last_access_time IS NOT NULL THEN summary.last_access_time ELSE m.last_access_time END, access_count: CASE WHEN summary.access_count IS NOT NULL THEN summary.access_count ELSE coalesce(m.access_count, 0) END } RETURN m.id AS uuid """ MEMORY_SUMMARY_STATEMENT_EDGE_SAVE = """ UNWIND $edges AS e MATCH (ms:MemorySummary {id: e.summary_id, run_id: e.run_id}) MATCH (c:Chunk {id: e.chunk_id, run_id: e.run_id}) MATCH (c)-[:CONTAINS]->(s:Statement {run_id: e.run_id}) MERGE (ms)-[r:DERIVED_FROM_STATEMENT]->(s) SET r.end_user_id = e.end_user_id, r.run_id = e.run_id, r.created_at = e.created_at, r.expired_at = e.expired_at RETURN elementId(r) AS uuid """ # Entity Merge Query MERGE_ENTITIES = """ MATCH (canonical:ExtractedEntity {id: $canonical_id}) MATCH (losing:ExtractedEntity {id: $losing_id}) // 更新canonical实体的aliases SET canonical.aliases = $merged_aliases // 转移所有从losing出发的关系到canonical WITH canonical, losing OPTIONAL MATCH (losing)-[r]->(target) WHERE NOT (canonical)-[:RELATES_TO]->(target) FOREACH (rel IN CASE WHEN r IS NOT NULL THEN [r] ELSE [] END | CREATE (canonical)-[:RELATES_TO { id: rel.id, relation_type: rel.relation_type, relation_value: rel.relation_value, statement: rel.statement, source_statement_id: rel.source_statement_id, valid_at: rel.valid_at, invalid_at: rel.invalid_at, end_user_id: rel.end_user_id, user_id: rel.user_id, apply_id: rel.apply_id, run_id: rel.run_id, created_at: rel.created_at, expired_at: rel.expired_at }]->(target) ) // 转移所有指向losing的关系到canonical WITH canonical, losing OPTIONAL MATCH (source)-[r]->(losing) WHERE NOT (source)-[:RELATES_TO]->(canonical) FOREACH (rel IN CASE WHEN r IS NOT NULL THEN [r] ELSE [] END | CREATE (source)-[:RELATES_TO { id: rel.id, relation_type: rel.relation_type, relation_value: rel.relation_value, statement: rel.statement, source_statement_id: rel.source_statement_id, valid_at: rel.valid_at, invalid_at: rel.invalid_at, end_user_id: rel.end_user_id, user_id: rel.user_id, apply_id: rel.apply_id, run_id: rel.run_id, created_at: rel.created_at, expired_at: rel.expired_at }]->(canonical) ) // 删除losing实体及其所有关系 WITH losing DETACH DELETE losing RETURN count(losing) as deleted """ neo4j_statement_part = ''' MATCH (n:Statement) WHERE n.end_user_id = "{}" AND datetime(n.created_at) >= datetime() - duration('P3D') RETURN n.statement as statement_name, n.id as statement_id, n.created_at as statement_created_at ''' neo4j_statement_all = ''' MATCH (n:Statement) WHERE n.end_user_id = "{}" RETURN n.statement as statement_name, n.id as statement_id ''' neo4j_query_part = """ MATCH (n)-[r]-(m:ExtractedEntity) WHERE n.end_user_id = "{}" AND datetime(n.created_at) >= datetime() - duration('P3D') WITH DISTINCT m OPTIONAL MATCH (m)-[rel]-(other:ExtractedEntity) RETURN elementId(m) as id, m.name as entity1_name, m.description as description, m.statement_id as statement_id, m.created_at as created_at, m.expired_at as expired_at, CASE WHEN rel IS NULL THEN "NO_RELATIONSHIP" ELSE type(rel) END as relationship_type, elementId(rel) as rel_id, rel.predicate as predicate, rel.statement as relationship, rel.statement_id as relationship_statement_id, CASE WHEN other IS NULL THEN "ISOLATED_NODE" ELSE other.name END as entity2_name, other as entity2 """ neo4j_query_all = """ MATCH (n)-[r]-(m:ExtractedEntity) WHERE n.end_user_id = "{}" WITH DISTINCT m OPTIONAL MATCH (m)-[rel]-(other:ExtractedEntity) RETURN elementId(m) as id, m.name as entity1_name, m.description as description, m.statement_id as statement_id, m.created_at as created_at, m.expired_at as expired_at, CASE WHEN rel IS NULL THEN "NO_RELATIONSHIP" ELSE type(rel) END as relationship_type, elementId(rel) as rel_id, rel.predicate as predicate, rel.statement as relationship, rel.statement_id as relationship_statement_id, CASE WHEN other IS NULL THEN "ISOLATED_NODE" ELSE other.name END as entity2_name, other as entity2 """ '''针对当前节点下扩长的句子,实体和总结''' Memory_Timeline_ExtractedEntity = """ MATCH (n)-[r1]-(e)-[r2]-(ms) WHERE elementId(n) = $id AND (ms:ExtractedEntity OR ms:MemorySummary) RETURN collect( DISTINCT CASE WHEN ms:ExtractedEntity THEN { text: ms.name, created_at: ms.created_at, type: "情景记忆" } END ) AS ExtractedEntity, collect( DISTINCT CASE WHEN ms:MemorySummary THEN { text: ms.content, created_at: ms.created_at, type: "长期沉淀" } END ) AS MemorySummary, collect( DISTINCT { text: e.statement, created_at: e.created_at, type: "情绪记忆" } ) AS statement; """ Memory_Timeline_MemorySummary = """ MATCH (n)-[r1]-(e)-[r2]-(ms) WHERE elementId(n) =$id AND (ms:MemorySummary OR ms:ExtractedEntity) RETURN collect( DISTINCT CASE WHEN ms:ExtractedEntity THEN { text: ms.name, created_at: ms.created_at, type: "情景记忆" } END ) AS ExtractedEntity, collect( DISTINCT CASE WHEN n:MemorySummary THEN { text: n.content, created_at: n.created_at, type: "长期沉淀" } END ) AS MemorySummary, collect( DISTINCT { text: e.statement, created_at: e.created_at, type: "情绪记忆" } ) AS statement; """ Memory_Timeline_Statement = """ MATCH (n) WHERE elementId(n) = $id CALL { WITH n MATCH (n)-[]-(m:ExtractedEntity) WHERE NOT m:MemorySummary AND NOT m:Chunk RETURN collect( DISTINCT { text: m.name, created_at: m.created_at, type: "情景记忆" } ) AS ExtractedEntity } CALL { WITH n MATCH (n)-[]-(m:MemorySummary) WHERE NOT m:Chunk RETURN collect( DISTINCT { text: m.content, created_at: m.created_at, type: "长期沉淀" } ) AS MemorySummary } RETURN ExtractedEntity, MemorySummary, { text: n.statement, created_at: n.created_at, type: "情绪记忆" } AS statement; """ '''针对当前节点,主要获取更加完整的句子节点''' Memory_Space_Emotion_Statement = """ MATCH (n) WHERE elementId(n) = $id RETURN n.emotion_intensity AS emotion_intensity, n.created_at AS created_at, n.emotion_type AS emotion_type, n.statement AS statement; """ Memory_Space_Emotion_MemorySummary = """ MATCH (n)-[]-(e) WHERE elementId(n) = $id AND EXISTS { MATCH (e)-[]-(ms) WHERE ms:MemorySummary OR ms:ExtractedEntity } RETURN DISTINCT e.emotion_intensity AS emotion_intensity, e.created_at AS created_at, e.emotion_type AS emotion_type, e.statement AS statement; """ Memory_Space_Emotion_ExtractedEntity = """ MATCH (n)-[]-(e) WHERE elementId(n) = $id AND EXISTS { MATCH (e)-[]-(ms:ExtractedEntity) } RETURN DISTINCT e.emotion_intensity AS emotion_intensity, e.created_at AS created_at, e.emotion_type AS emotion_type, e.statement AS statement; """ Memory_Space_User = """ MATCH (n)-[r]->(m) WHERE n.end_user_id = $end_user_id AND m.name="用户" return DISTINCT elementId(m) as id """ Memory_Space_Entity = """ MATCH (n)-[]-(m) WHERE elementId(m) = $id AND m.entity_type = "Person" RETURN DISTINCT m.name as name,m.end_user_id as end_user_id """ Memory_Space_Associative = """ MATCH (u)-[]-(x)-[]-(h) WHERE elementId(u) = $user_id AND elementId(h) = $id RETURN DISTINCT x.statement as statement,x.created_at as created_at """ Graph_Node_query = """ MATCH (n:MemorySummary) WHERE n.end_user_id = $end_user_id RETURN elementId(n) AS id, labels(n) AS labels, properties(n) AS properties, 0 AS priority LIMIT $limit UNION ALL MATCH (n:Dialogue) WHERE n.end_user_id = $end_user_id RETURN elementId(n) AS id, labels(n) AS labels, properties(n) AS properties, 1 AS priority LIMIT 1 UNION ALL MATCH (n:Statement) WHERE n.end_user_id = $end_user_id RETURN elementId(n) AS id, labels(n) AS labels, properties(n) AS properties, 1 AS priority LIMIT $limit UNION ALL MATCH (n:ExtractedEntity) WHERE n.end_user_id = $end_user_id RETURN elementId(n) AS id, labels(n) AS labels, properties(n) AS properties, 2 AS priority LIMIT $limit UNION ALL MATCH (n:Chunk) WHERE n.end_user_id = $end_user_id RETURN elementId(n) AS id, labels(n) AS labels, properties(n) AS properties, 3 AS priority LIMIT $limit UNION ALL MATCH (n:Perceptual) WHERE n.end_user_id = $end_user_id RETURN elementId(n) AS id, labels(n) AS labels, properties(n) AS properties, 4 AS priority """ # ============================================================ # Community 节点 & BELONGS_TO_COMMUNITY 边 # ============================================================ # ─── Community 聚类相关 Cypher 模板 ─────────────────────────────────────────── COMMUNITY_NODE_UPSERT = """ MERGE (c:Community {community_id: $community_id}) ON CREATE SET c.id = $community_id SET c.end_user_id = $end_user_id, c.member_count = $member_count, c.updated_at = datetime() RETURN c.community_id AS community_id """ ENTITY_JOIN_COMMUNITY = """ MATCH (e:ExtractedEntity {id: $entity_id, end_user_id: $end_user_id}) MATCH (c:Community {community_id: $community_id, end_user_id: $end_user_id}) MERGE (e)-[:BELONGS_TO_COMMUNITY]->(c) SET c.updated_at = datetime() RETURN e.id AS entity_id, c.community_id AS community_id """ ENTITY_LEAVE_ALL_COMMUNITIES = """ MATCH (e:ExtractedEntity {id: $entity_id, end_user_id: $end_user_id}) MATCH (e)-[r:BELONGS_TO_COMMUNITY]->(:Community) DELETE r """ GET_ENTITY_NEIGHBORS = """ MATCH (e:ExtractedEntity {id: $entity_id, end_user_id: $end_user_id}) // 来源一:直接关系邻居(EXTRACTED_RELATIONSHIP 边) OPTIONAL MATCH (e)-[:EXTRACTED_RELATIONSHIP]-(nb1:ExtractedEntity {end_user_id: $end_user_id}) // 来源二:同 Statement 共现邻居(REFERENCES_ENTITY 边) OPTIONAL MATCH (s:Statement)-[:REFERENCES_ENTITY]->(e) OPTIONAL MATCH (s)-[:REFERENCES_ENTITY]->(nb2:ExtractedEntity {end_user_id: $end_user_id}) WHERE nb2.id <> e.id WITH collect(DISTINCT nb1) + collect(DISTINCT nb2) AS all_neighbors UNWIND all_neighbors AS nb WITH nb WHERE nb IS NOT NULL OPTIONAL MATCH (nb)-[:BELONGS_TO_COMMUNITY]->(c:Community) RETURN DISTINCT nb.id AS id, nb.name AS name, nb.name_embedding AS name_embedding, nb.activation_value AS activation_value, CASE WHEN c IS NOT NULL THEN c.community_id ELSE null END AS community_id """ GET_ALL_ENTITIES_FOR_USER = """ MATCH (e:ExtractedEntity {end_user_id: $end_user_id}) OPTIONAL MATCH (e)-[:BELONGS_TO_COMMUNITY]->(c:Community) RETURN e.id AS id, e.name AS name, e.name_embedding AS name_embedding, e.activation_value AS activation_value, CASE WHEN c IS NOT NULL THEN c.community_id ELSE null END AS community_id """ GET_ENTITY_COUNT_FOR_USER = """ MATCH (e:ExtractedEntity {end_user_id: $end_user_id}) RETURN count(e) AS entity_count """ GET_ALL_ENTITY_IDS_FOR_USER = """ MATCH (e:ExtractedEntity {end_user_id: $end_user_id}) RETURN e.id AS id """ GET_COMMUNITY_MEMBERS = """ MATCH (e:ExtractedEntity {end_user_id: $end_user_id})-[:BELONGS_TO_COMMUNITY]->(c:Community {community_id: $community_id}) RETURN e.id AS id, e.name AS name, e.entity_type AS entity_type, e.importance_score AS importance_score, e.activation_value AS activation_value, e.name_embedding AS name_embedding, e.aliases AS aliases, e.description AS description, e.example AS example ORDER BY coalesce(e.activation_value, 0) DESC """ GET_COMMUNITY_RELATIONSHIPS = """ MATCH (e1:ExtractedEntity {end_user_id: $end_user_id})-[:BELONGS_TO_COMMUNITY]->(c:Community {community_id: $community_id}) MATCH (e2:ExtractedEntity {end_user_id: $end_user_id})-[:BELONGS_TO_COMMUNITY]->(c) MATCH (e1)-[r:EXTRACTED_RELATIONSHIP]->(e2) RETURN e1.name AS subject, r.predicate AS predicate, e2.name AS object ORDER BY e1.name, r.predicate, e2.name LIMIT 20 """ GET_ALL_COMMUNITY_MEMBERS_BATCH = """ MATCH (e:ExtractedEntity {end_user_id: $end_user_id})-[:BELONGS_TO_COMMUNITY]->(c:Community) RETURN c.community_id AS community_id, e.id AS id, e.name AS name, e.entity_type AS entity_type, e.importance_score AS importance_score, e.activation_value AS activation_value, e.name_embedding AS name_embedding, e.aliases AS aliases, e.description AS description ORDER BY c.community_id, coalesce(e.activation_value, 0) DESC """ CHECK_USER_HAS_COMMUNITIES = """ MATCH (c:Community {end_user_id: $end_user_id}) RETURN count(c) AS community_count """ UPDATE_COMMUNITY_MEMBER_COUNT = """ MATCH (e:ExtractedEntity {end_user_id: $end_user_id})-[:BELONGS_TO_COMMUNITY]->(c:Community {community_id: $community_id}) WITH c, count(e) AS cnt SET c.member_count = cnt RETURN c.community_id AS community_id, cnt AS member_count """ UPDATE_COMMUNITY_METADATA = """ MATCH (c:Community {community_id: $community_id, end_user_id: $end_user_id}) SET c.id = coalesce(c.id, $community_id), c.name = $name, c.summary = $summary, c.core_entities = $core_entities, c.summary_embedding = $summary_embedding, c.updated_at = datetime() RETURN c.community_id AS community_id """ BATCH_UPDATE_COMMUNITY_METADATA = """ UNWIND $communities AS row MATCH (c:Community {community_id: row.community_id, end_user_id: row.end_user_id}) SET c.id = coalesce(c.id, row.community_id), c.name = row.name, c.summary = row.summary, c.core_entities = row.core_entities, c.summary_embedding = row.summary_embedding, c.updated_at = datetime() RETURN c.community_id AS community_id """ GET_ENTITIES_PAGE = """ MATCH (e:ExtractedEntity {end_user_id: $end_user_id}) OPTIONAL MATCH (e)-[:BELONGS_TO_COMMUNITY]->(c:Community) RETURN e.id AS id, e.name AS name, e.name_embedding AS name_embedding, e.activation_value AS activation_value, CASE WHEN c IS NOT NULL THEN c.community_id ELSE null END AS community_id ORDER BY e.id SKIP $skip LIMIT $limit """ GET_ENTITY_NEIGHBORS_BATCH_FOR_IDS = """ // 批量拉取指定实体列表的邻居(用于分批全量聚类) MATCH (e:ExtractedEntity {end_user_id: $end_user_id}) WHERE e.id IN $entity_ids OPTIONAL MATCH (e)-[:EXTRACTED_RELATIONSHIP]-(nb1:ExtractedEntity {end_user_id: $end_user_id}) OPTIONAL MATCH (s:Statement)-[:REFERENCES_ENTITY]->(e) OPTIONAL MATCH (s)-[:REFERENCES_ENTITY]->(nb2:ExtractedEntity {end_user_id: $end_user_id}) WHERE nb2.id <> e.id WITH e, collect(DISTINCT nb1) + collect(DISTINCT nb2) AS all_neighbors UNWIND all_neighbors AS nb WITH e, nb WHERE nb IS NOT NULL OPTIONAL MATCH (nb)-[:BELONGS_TO_COMMUNITY]->(c:Community) RETURN DISTINCT e.id AS entity_id, nb.id AS id, nb.name AS name, nb.name_embedding AS name_embedding, nb.activation_value AS activation_value, CASE WHEN c IS NOT NULL THEN c.community_id ELSE null END AS community_id """ GET_ALL_ENTITY_NEIGHBORS_BATCH = """ // 批量拉取某用户下所有实体的邻居(用于全量聚类预加载) MATCH (e:ExtractedEntity {end_user_id: $end_user_id}) // 来源一:直接关系邻居 OPTIONAL MATCH (e)-[:EXTRACTED_RELATIONSHIP]-(nb1:ExtractedEntity {end_user_id: $end_user_id}) // 来源二:同 Statement 共现邻居 OPTIONAL MATCH (s:Statement)-[:REFERENCES_ENTITY]->(e) OPTIONAL MATCH (s)-[:REFERENCES_ENTITY]->(nb2:ExtractedEntity {end_user_id: $end_user_id}) WHERE nb2.id <> e.id WITH e, collect(DISTINCT nb1) + collect(DISTINCT nb2) AS all_neighbors UNWIND all_neighbors AS nb WITH e, nb WHERE nb IS NOT NULL OPTIONAL MATCH (nb)-[:BELONGS_TO_COMMUNITY]->(c:Community) RETURN DISTINCT e.id AS entity_id, nb.id AS id, nb.name AS name, nb.name_embedding AS name_embedding, nb.activation_value AS activation_value, CASE WHEN c IS NOT NULL THEN c.community_id ELSE null END AS community_id """ GET_COMMUNITY_GRAPH_DATA = """ MATCH (c:Community {end_user_id: $end_user_id}) MATCH (e:ExtractedEntity {end_user_id: $end_user_id})-[b:BELONGS_TO_COMMUNITY]->(c) OPTIONAL MATCH (e)-[r:EXTRACTED_RELATIONSHIP]-(e2:ExtractedEntity {end_user_id: $end_user_id}) RETURN elementId(c) AS c_id, properties(c) AS c_props, elementId(e) AS e_id, properties(e) AS e_props, elementId(b) AS b_id, elementId(e2) AS e2_id, properties(e2) AS e2_props, elementId(r) AS r_id, type(r) AS r_type, properties(r) AS r_props, startNode(r) = e AS r_from_e """ CHECK_COMMUNITY_IS_COMPLETE = """ MATCH (c:Community {community_id: $community_id, end_user_id: $end_user_id}) RETURN ( c.name IS NOT NULL AND c.name <> '' AND c.summary IS NOT NULL AND c.summary <> '' AND c.core_entities IS NOT NULL ) AS is_complete """ CHECK_COMMUNITY_IS_COMPLETE_WITH_EMBEDDING = """ MATCH (c:Community {community_id: $community_id, end_user_id: $end_user_id}) RETURN ( c.name IS NOT NULL AND c.name <> '' AND c.summary IS NOT NULL AND c.summary <> '' AND c.core_entities IS NOT NULL AND c.summary_embedding IS NOT NULL ) AS is_complete """ GET_INCOMPLETE_COMMUNITIES = """ MATCH (c:Community {end_user_id: $end_user_id}) WHERE c.name IS NULL OR c.summary IS NULL OR c.core_entities IS NULL OR c.name = '' OR c.summary = '' RETURN c.community_id AS community_id """ GET_INCOMPLETE_COMMUNITIES_WITH_EMBEDDING = """ MATCH (c:Community {end_user_id: $end_user_id}) WHERE c.name IS NULL OR c.name = '' OR c.summary IS NULL OR c.summary = '' OR c.core_entities IS NULL OR (c.summary_embedding IS NULL AND c.summary IS NOT NULL AND c.summary <> '(empty)') RETURN c.community_id AS community_id """ # Community 向量检索 ────────────────────────────────────────────────── # Community embedding-based search: cosine similarity on Community.summary_embedding COMMUNITY_EMBEDDING_SEARCH = """ CALL db.index.vector.queryNodes('community_summary_embedding_index', $limit * 100, $embedding) YIELD node AS c, score WHERE c.summary_embedding IS NOT NULL AND ($end_user_id IS NULL OR c.end_user_id = $end_user_id) RETURN c.community_id AS id, c.name AS name, c.summary AS content, c.core_entities AS core_entities, c.member_count AS member_count, c.end_user_id AS end_user_id, c.updated_at AS updated_at, score ORDER BY score DESC LIMIT $limit """ # Community 展开检索 ────────────────────────────────────────────────── # 命中社区后,拉取该社区所有成员实体关联的 Statement 节点(主题→细节两级检索) EXPAND_COMMUNITY_STATEMENTS = """ MATCH (c:Community {community_id: $community_id}) MATCH (e:ExtractedEntity)-[:BELONGS_TO_COMMUNITY]->(c) MATCH (s:Statement)-[:REFERENCES_ENTITY]->(e) WHERE s.end_user_id = $end_user_id RETURN s.statement AS statement, s.id AS id, s.end_user_id AS end_user_id, s.created_at AS created_at, s.valid_at AS valid_at, s.invalid_at AS invalid_at, COALESCE(s.activation_value, s.importance_score, 0.5) AS activation_value, COALESCE(s.importance_score, 0.5) AS importance_score, e.name AS source_entity, c.name AS community_name ORDER BY COALESCE(s.activation_value, 0) DESC LIMIT $limit """ # 感知记忆节点保存 PERCEPTUAL_NODE_SAVE = """ UNWIND $perceptuals AS p MERGE (n:Perceptual {id: p.id}) SET n += { id: p.id, end_user_id: p.end_user_id, perceptual_type: p.perceptual_type, file_path: p.file_path, file_name: p.file_name, file_ext: p.file_ext, summary: p.summary, keywords: p.keywords, topic: p.topic, domain: p.domain, created_at: p.created_at, file_type: p.file_type, summary_embedding: p.summary_embedding } RETURN n.id AS uuid """ # 感知记忆与对话的关联边 PERCEPTUAL_CHUNK_EDGE_SAVE = """ UNWIND $edges AS edge MATCH (p:Perceptual {id: edge.perceptual_id, end_user_id: edge.end_user_id}) MATCH (c:Chunk {id: edge.chunk_id, end_user_id: edge.end_user_id}) MERGE (c)-[r:HAS_PERCEPTUAL]->(p) ON CREATE SET r.end_user_id = edge.end_user_id, r.created_at = edge.created_at RETURN elementId(r) AS uuid """ # ------------------- # search by user id # ------------------- SEARCH_PERCEPTUAL_BY_USER_ID = """ MATCH (p:Perceptual) WHERE p.end_user_id = $end_user_id RETURN p.id AS id, p.summary_embedding AS embedding """ SEARCH_STATEMENTS_BY_USER_ID = """ MATCH (s:Statement) WHERE s.end_user_id = $end_user_id RETURN s.id AS id, s.statement_embedding AS embedding """ SEARCH_ENTITIES_BY_USER_ID = """ MATCH (e:ExtractedEntity) WHERE e.end_user_id = $end_user_id RETURN e.id AS id, e.name_embedding AS embedding """ SEARCH_CHUNKS_BY_USER_ID = """ MATCH (c:Chunk) WHERE c.end_user_id = $end_user_id RETURN c.id AS id, c.chunk_embedding AS embedding """ SEARCH_MEMORY_SUMMARIES_BY_USER_ID = """ MATCH (s:MemorySummary) WHERE s.end_user_id = $end_user_id RETURN s.id AS id, s.summary_embedding AS embedding """ SEARCH_COMMUNITIES_BY_USER_ID = """ MATCH (c:Community) WHERE c.end_user_id = $end_user_id RETURN c.community_id AS id, c.summary_embedding AS embedding """ # ------------------- # search by id # ------------------- SEARCH_PERCEPTUAL_BY_IDS = """ MATCH (p:Perceptual) WHERE p.id IN $ids RETURN p.id AS id, p.end_user_id AS end_user_id, p.perceptual_type AS perceptual_type, p.file_path AS file_path, p.file_name AS file_name, p.file_ext AS file_ext, p.summary AS summary, p.keywords AS keywords, p.topic AS topic, p.domain AS domain, p.created_at AS created_at, p.file_type AS file_type """ SEARCH_STATEMENTS_BY_IDS = """ MATCH (s:Statement) WHERE s.id IN $ids RETURN s.id AS id, s.statement AS statement, s.end_user_id AS end_user_id, s.chunk_id AS chunk_id, s.created_at AS created_at, s.expired_at AS expired_at, s.valid_at AS valid_at, properties(s)['invalid_at'] AS invalid_at, COALESCE(s.activation_value, s.importance_score, 0.5) AS activation_value, COALESCE(s.importance_score, 0.5) AS importance_score, s.last_access_time AS last_access_time, COALESCE(s.access_count, 0) AS access_count """ SEARCH_CHUNKS_BY_IDS = """ MATCH (c:Chunk) WHERE c.id IN $ids RETURN c.id AS id, c.end_user_id AS end_user_id, c.content AS content, c.dialog_id AS dialog_id, COALESCE(c.activation_value, 0.5) AS activation_value, c.last_access_time AS last_access_time, COALESCE(c.access_count, 0) AS access_count """ SEARCH_ENTITIES_BY_IDS = """ MATCH (e:ExtractedEntity) WHERE e.id IN $ids RETURN e.id AS id, e.name AS name, e.end_user_id AS end_user_id, e.entity_type AS entity_type, COALESCE(e.activation_value, e.importance_score, 0.5) AS activation_value, COALESCE(e.importance_score, 0.5) AS importance_score, e.last_access_time AS last_access_time, COALESCE(e.access_count, 0) AS access_count """ SEARCH_MEMORY_SUMMARIES_BY_IDS = """ MATCH (m:MemorySummary) WHERE m.id IN $ids RETURN m.id AS id, m.name AS name, m.end_user_id AS end_user_id, m.dialog_id AS dialog_id, m.chunk_ids AS chunk_ids, m.content AS content, m.created_at AS created_at, COALESCE(m.activation_value, m.importance_score, 0.5) AS activation_value, COALESCE(m.importance_score, 0.5) AS importance_score, m.last_access_time AS last_access_time, COALESCE(m.access_count, 0) AS access_count """ SEARCH_COMMUNITIES_BY_IDS = """ MATCH (c:Community) WHERE c.id IN $ids RETURN c.id AS id, c.name AS name, c.summary AS content, c.core_entities AS core_entities, c.member_count AS member_count, c.end_user_id AS end_user_id, c.updated_at AS updated_at """ # ------------------- # search by fulltext # ------------------- SEARCH_PERCEPTUALS_BY_KEYWORD = """ CALL db.index.fulltext.queryNodes("perceptualFulltext", $query) YIELD node AS p, score WHERE p.end_user_id = $end_user_id RETURN p.id AS id, p.end_user_id AS end_user_id, p.perceptual_type AS perceptual_type, p.file_path AS file_path, p.file_name AS file_name, p.file_ext AS file_ext, p.summary AS summary, p.keywords AS keywords, p.topic AS topic, p.domain AS domain, p.created_at AS created_at, p.file_type AS file_type, score ORDER BY score DESC LIMIT $limit """ SEARCH_STATEMENTS_BY_KEYWORD = """ CALL db.index.fulltext.queryNodes("statementsFulltext", $query) YIELD node AS s, score WHERE ($end_user_id IS NULL OR s.end_user_id = $end_user_id) OPTIONAL MATCH (c:Chunk)-[:CONTAINS]->(s) OPTIONAL MATCH (s)-[:REFERENCES_ENTITY]->(e:ExtractedEntity) RETURN s.id AS id, s.statement AS statement, s.end_user_id AS end_user_id, s.chunk_id AS chunk_id, s.created_at AS created_at, s.expired_at AS expired_at, s.valid_at AS valid_at, properties(s)['invalid_at'] AS invalid_at, c.id AS chunk_id_from_rel, collect(DISTINCT e.id) AS entity_ids, COALESCE(s.activation_value, s.importance_score, 0.5) AS activation_value, COALESCE(s.importance_score, 0.5) AS importance_score, s.last_access_time AS last_access_time, COALESCE(s.access_count, 0) AS access_count, score ORDER BY score DESC LIMIT $limit """ SEARCH_ENTITIES_BY_NAME_OR_ALIAS = """ CALL db.index.fulltext.queryNodes("entitiesFulltext", $query) YIELD node AS e, score WHERE ($end_user_id IS NULL OR e.end_user_id = $end_user_id) WITH e, score With collect({entity: e, score: score}) AS fulltextResults OPTIONAL MATCH (ae:ExtractedEntity) WHERE ($end_user_id IS NULL OR ae.end_user_id = $end_user_id) AND ae.aliases IS NOT NULL AND ANY(alias IN ae.aliases WHERE toLower(alias) CONTAINS toLower($query)) WITH fulltextResults, collect(ae) AS aliasEntities UNWIND (fulltextResults + [x IN aliasEntities | {entity: x, score: CASE WHEN ANY(alias IN x.aliases WHERE toLower(alias) = toLower($query)) THEN 1.0 WHEN ANY(alias IN x.aliases WHERE toLower(alias) STARTS WITH toLower($query)) THEN 0.9 ELSE 0.8 END }]) AS row WITH row.entity AS e, row.score AS score WITH DISTINCT e, MAX(score) AS score OPTIONAL MATCH (s:Statement)-[:REFERENCES_ENTITY]->(e) OPTIONAL MATCH (c:Chunk)-[:CONTAINS]->(s) RETURN e.id AS id, e.name AS name, e.end_user_id AS end_user_id, e.entity_type AS entity_type, e.created_at AS created_at, e.expired_at AS expired_at, e.entity_idx AS entity_idx, e.statement_id AS statement_id, e.description AS description, e.aliases AS aliases, e.name_embedding AS name_embedding, e.connect_strength AS connect_strength, collect(DISTINCT s.id) AS statement_ids, collect(DISTINCT c.id) AS chunk_ids, COALESCE(e.activation_value, e.importance_score, 0.5) AS activation_value, COALESCE(e.importance_score, 0.5) AS importance_score, e.last_access_time AS last_access_time, COALESCE(e.access_count, 0) AS access_count, score ORDER BY score DESC LIMIT $limit """ SEARCH_CHUNKS_BY_CONTENT = """ CALL db.index.fulltext.queryNodes("chunksFulltext", $query) YIELD node AS c, score WHERE ($end_user_id IS NULL OR c.end_user_id = $end_user_id) OPTIONAL MATCH (c)-[:CONTAINS]->(s:Statement) OPTIONAL MATCH (s)-[:REFERENCES_ENTITY]->(e:ExtractedEntity) RETURN c.id AS id, c.end_user_id AS end_user_id, c.content AS content, c.dialog_id AS dialog_id, c.sequence_number AS sequence_number, collect(DISTINCT s.id) AS statement_ids, collect(DISTINCT e.id) AS entity_ids, COALESCE(c.activation_value, 0.5) AS activation_value, c.last_access_time AS last_access_time, COALESCE(c.access_count, 0) AS access_count, score ORDER BY score DESC LIMIT $limit """ # MemorySummary keyword search using fulltext index SEARCH_MEMORY_SUMMARIES_BY_KEYWORD = """ CALL db.index.fulltext.queryNodes("summariesFulltext", $query) YIELD node AS m, score WHERE ($end_user_id IS NULL OR m.end_user_id = $end_user_id) OPTIONAL MATCH (m)-[:DERIVED_FROM_STATEMENT]->(s:Statement) RETURN m.id AS id, m.name AS name, m.end_user_id AS end_user_id, m.dialog_id AS dialog_id, m.chunk_ids AS chunk_ids, m.content AS content, m.created_at AS created_at, COALESCE(m.activation_value, m.importance_score, 0.5) AS activation_value, COALESCE(m.importance_score, 0.5) AS importance_score, m.last_access_time AS last_access_time, COALESCE(m.access_count, 0) AS access_count, score ORDER BY score DESC LIMIT $limit """ # Community keyword search: matches name or summary via fulltext index SEARCH_COMMUNITIES_BY_KEYWORD = """ CALL db.index.fulltext.queryNodes("communitiesFulltext", $query) YIELD node AS c, score WHERE ($end_user_id IS NULL OR c.end_user_id = $end_user_id) RETURN c.community_id AS id, c.name AS name, c.summary AS content, c.core_entities AS core_entities, c.member_count AS member_count, c.end_user_id AS end_user_id, c.updated_at AS updated_at, score ORDER BY score DESC LIMIT $limit """ FULLTEXT_QUERY_CYPHER_MAPPING = { Neo4jNodeType.STATEMENT: SEARCH_STATEMENTS_BY_KEYWORD, Neo4jNodeType.EXTRACTEDENTITY: SEARCH_ENTITIES_BY_NAME_OR_ALIAS, Neo4jNodeType.CHUNK: SEARCH_CHUNKS_BY_CONTENT, Neo4jNodeType.MEMORYSUMMARY: SEARCH_MEMORY_SUMMARIES_BY_KEYWORD, Neo4jNodeType.COMMUNITY: SEARCH_COMMUNITIES_BY_KEYWORD, Neo4jNodeType.PERCEPTUAL: SEARCH_PERCEPTUALS_BY_KEYWORD } USER_ID_QUERY_CYPHER_MAPPING = { Neo4jNodeType.STATEMENT: SEARCH_STATEMENTS_BY_USER_ID, Neo4jNodeType.EXTRACTEDENTITY: SEARCH_ENTITIES_BY_USER_ID, Neo4jNodeType.CHUNK: SEARCH_CHUNKS_BY_USER_ID, Neo4jNodeType.MEMORYSUMMARY: SEARCH_MEMORY_SUMMARIES_BY_USER_ID, Neo4jNodeType.COMMUNITY: SEARCH_COMMUNITIES_BY_USER_ID, Neo4jNodeType.PERCEPTUAL: SEARCH_PERCEPTUAL_BY_USER_ID } NODE_ID_QUERY_CYPHER_MAPPING = { Neo4jNodeType.STATEMENT: SEARCH_STATEMENTS_BY_IDS, Neo4jNodeType.EXTRACTEDENTITY: SEARCH_ENTITIES_BY_IDS, Neo4jNodeType.CHUNK: SEARCH_CHUNKS_BY_IDS, Neo4jNodeType.MEMORYSUMMARY: SEARCH_MEMORY_SUMMARIES_BY_IDS, Neo4jNodeType.COMMUNITY: SEARCH_COMMUNITIES_BY_IDS, Neo4jNodeType.PERCEPTUAL: SEARCH_PERCEPTUAL_BY_IDS }