feat(memory): implement quick search pipeline with Neo4j integration

This commit is contained in:
Eternity
2026-04-15 12:18:23 +08:00
parent dca3173ed9
commit 2716a55c7f
19 changed files with 899 additions and 574 deletions

View File

@@ -1,3 +1,4 @@
from app.core.memory.enums import Neo4jNodeType
DIALOGUE_NODE_SAVE = """
UNWIND $dialogues AS dialogue
@@ -147,57 +148,6 @@ SET r.predicate = rel.predicate,
RETURN elementId(r) AS uuid
"""
# 在 Neo4j 5及后续版本中id() 函数已被标记为弃用用elementId() 函数替代
# 保存弱关系实体,设置 e.is_weak = true不维护 e.relations 聚合字段
WEAK_ENTITY_NODE_SAVE = """
UNWIND $weak_entities AS entity
MERGE (e:ExtractedEntity {id: entity.id, run_id: entity.run_id})
SET e += {
name: entity.name,
end_user_id: entity.end_user_id,
run_id: entity.run_id,
description: entity.description,
chunk_id: entity.chunk_id,
dialog_id: entity.dialog_id
}
// Independent weak flag仅标记弱关系不再维护 relations 聚合字段
SET e.is_weak = true
RETURN e.id AS id
"""
# 为强关系三元组中的主语和宾语创建/更新实体节点,仅设置 e.is_strong = true不维护 e.relations 字段
SAVE_STRONG_TRIPLE_ENTITIES = """
UNWIND $items AS item
MERGE (s:ExtractedEntity {id: item.source_id, run_id: item.run_id})
SET s += {name: item.subject, end_user_id: item.end_user_id, run_id: item.run_id}
// Independent strong flag
SET s.is_strong = true
MERGE (o:ExtractedEntity {id: item.target_id, run_id: item.run_id})
SET o += {name: item.object, end_user_id: item.end_user_id, run_id: item.run_id}
// Independent strong flag
SET o.is_strong = true
"""
DIALOGUE_STATEMENT_EDGE_SAVE = """
UNWIND $dialogue_statement_edges AS edge
// 支持按 uuid 或 ref_id 连接到 Dialogue避免因来源 ID 不一致而断链
MATCH (dialogue:Dialogue)
WHERE dialogue.uuid = edge.source OR dialogue.ref_id = edge.source
MATCH (statement:Statement {id: edge.target})
// 仅按端点去重,关系属性可更新
MERGE (dialogue)-[e:MENTIONS]->(statement)
SET e.uuid = edge.id,
e.end_user_id = edge.end_user_id,
e.created_at = edge.created_at,
e.expired_at = edge.expired_at
RETURN e.uuid AS uuid
"""
# 在 Neo4j 5及后续版本中id() 函数已被标记为弃用用elementId() 函数替代
CHUNK_STATEMENT_EDGE_SAVE = """
UNWIND $chunk_statement_edges AS edge
MATCH (statement:Statement {id: edge.source, run_id: edge.run_id})
@@ -226,87 +176,6 @@ SET r.end_user_id = rel.end_user_id,
RETURN elementId(r) AS uuid
"""
ENTITY_EMBEDDING_SEARCH = """
CALL db.index.vector.queryNodes('entity_embedding_index', $limit * 100, $embedding)
YIELD node AS e, score
WHERE e.name_embedding IS NOT NULL
AND ($end_user_id IS NULL OR e.end_user_id = $end_user_id)
RETURN e.id AS id,
e.name AS name,
e.end_user_id AS end_user_id,
e.entity_type AS entity_type,
COALESCE(e.activation_value, e.importance_score, 0.5) AS activation_value,
COALESCE(e.importance_score, 0.5) AS importance_score,
e.last_access_time AS last_access_time,
COALESCE(e.access_count, 0) AS access_count,
score
ORDER BY score DESC
LIMIT $limit
"""
# Embedding-based search: cosine similarity on Statement.statement_embedding
STATEMENT_EMBEDDING_SEARCH = """
CALL db.index.vector.queryNodes('statement_embedding_index', $limit * 100, $embedding)
YIELD node AS s, score
WHERE s.statement_embedding IS NOT NULL
AND ($end_user_id IS NULL OR s.end_user_id = $end_user_id)
RETURN s.id AS id,
s.statement AS statement,
s.end_user_id AS end_user_id,
s.chunk_id AS chunk_id,
s.created_at AS created_at,
s.expired_at AS expired_at,
s.valid_at AS valid_at,
s.invalid_at AS invalid_at,
COALESCE(s.activation_value, s.importance_score, 0.5) AS activation_value,
COALESCE(s.importance_score, 0.5) AS importance_score,
s.last_access_time AS last_access_time,
COALESCE(s.access_count, 0) AS access_count,
score
ORDER BY score DESC
LIMIT $limit
"""
# Embedding-based search: cosine similarity on Chunk.chunk_embedding
CHUNK_EMBEDDING_SEARCH = """
CALL db.index.vector.queryNodes('chunk_embedding_index', $limit * 100, $embedding)
YIELD node AS c, score
WHERE c.chunk_embedding IS NOT NULL
AND ($end_user_id IS NULL OR c.end_user_id = $end_user_id)
RETURN c.id AS chunk_id,
c.end_user_id AS end_user_id,
c.content AS content,
c.dialog_id AS dialog_id,
COALESCE(c.activation_value, 0.5) AS activation_value,
c.last_access_time AS last_access_time,
COALESCE(c.access_count, 0) AS access_count,
score
ORDER BY score DESC
LIMIT $limit
"""
SEARCH_STATEMENTS_BY_KEYWORD = """
CALL db.index.fulltext.queryNodes("statementsFulltext", $query) YIELD node AS s, score
WHERE ($end_user_id IS NULL OR s.end_user_id = $end_user_id)
OPTIONAL MATCH (c:Chunk)-[:CONTAINS]->(s)
OPTIONAL MATCH (s)-[:REFERENCES_ENTITY]->(e:ExtractedEntity)
RETURN s.id AS id,
s.statement AS statement,
s.end_user_id AS end_user_id,
s.chunk_id AS chunk_id,
s.created_at AS created_at,
s.expired_at AS expired_at,
s.valid_at AS valid_at,
s.invalid_at AS invalid_at,
c.id AS chunk_id_from_rel,
collect(DISTINCT e.id) AS entity_ids,
COALESCE(s.activation_value, s.importance_score, 0.5) AS activation_value,
COALESCE(s.importance_score, 0.5) AS importance_score,
s.last_access_time AS last_access_time,
COALESCE(s.access_count, 0) AS access_count,
score
ORDER BY score DESC
LIMIT $limit
"""
# 查询实体名称包含指定字符串的实体
SEARCH_ENTITIES_BY_NAME = """
CALL db.index.fulltext.queryNodes("entitiesFulltext", $query) YIELD node AS e, score
@@ -338,73 +207,6 @@ ORDER BY score DESC
LIMIT $limit
"""
SEARCH_ENTITIES_BY_NAME_OR_ALIAS = """
CALL db.index.fulltext.queryNodes("entitiesFulltext", $query) YIELD node AS e, score
WHERE ($end_user_id IS NULL OR e.end_user_id = $end_user_id)
WITH e, score
With collect({entity: e, score: score}) AS fulltextResults
OPTIONAL MATCH (ae:ExtractedEntity)
WHERE ($end_user_id IS NULL OR ae.end_user_id = $end_user_id)
AND ae.aliases IS NOT NULL
AND ANY(alias IN ae.aliases WHERE toLower(alias) CONTAINS toLower($query))
WITH fulltextResults, collect(ae) AS aliasEntities
UNWIND (fulltextResults + [x IN aliasEntities | {entity: x, score:
CASE
WHEN ANY(alias IN x.aliases WHERE toLower(alias) = toLower($query)) THEN 1.0
WHEN ANY(alias IN x.aliases WHERE toLower(alias) STARTS WITH toLower($query)) THEN 0.9
ELSE 0.8
END
}]) AS row
WITH row.entity AS e, row.score AS score
WITH DISTINCT e, MAX(score) AS score
OPTIONAL MATCH (s:Statement)-[:REFERENCES_ENTITY]->(e)
OPTIONAL MATCH (c:Chunk)-[:CONTAINS]->(s)
RETURN e.id AS id,
e.name AS name,
e.end_user_id AS end_user_id,
e.entity_type AS entity_type,
e.created_at AS created_at,
e.expired_at AS expired_at,
e.entity_idx AS entity_idx,
e.statement_id AS statement_id,
e.description AS description,
e.aliases AS aliases,
e.name_embedding AS name_embedding,
e.connect_strength AS connect_strength,
collect(DISTINCT s.id) AS statement_ids,
collect(DISTINCT c.id) AS chunk_ids,
COALESCE(e.activation_value, e.importance_score, 0.5) AS activation_value,
COALESCE(e.importance_score, 0.5) AS importance_score,
e.last_access_time AS last_access_time,
COALESCE(e.access_count, 0) AS access_count,
score
ORDER BY score DESC
LIMIT $limit
"""
SEARCH_CHUNKS_BY_CONTENT = """
CALL db.index.fulltext.queryNodes("chunksFulltext", $query) YIELD node AS c, score
WHERE ($end_user_id IS NULL OR c.end_user_id = $end_user_id)
OPTIONAL MATCH (c)-[:CONTAINS]->(s:Statement)
OPTIONAL MATCH (s)-[:REFERENCES_ENTITY]->(e:ExtractedEntity)
RETURN c.id AS chunk_id,
c.end_user_id AS end_user_id,
c.content AS content,
c.dialog_id AS dialog_id,
c.sequence_number AS sequence_number,
collect(DISTINCT s.id) AS statement_ids,
collect(DISTINCT e.id) AS entity_ids,
COALESCE(c.activation_value, 0.5) AS activation_value,
c.last_access_time AS last_access_time,
COALESCE(c.access_count, 0) AS access_count,
score
ORDER BY score DESC
LIMIT $limit
"""
# 以下是关于第二层去重消歧与数据库进行检索的语句,在最近的规划中不再使用
# # 同组group_id下按“精确名字或别名+可选类型一致”来检索
@@ -677,49 +479,6 @@ MATCH (n:Statement {end_user_id: $end_user_id, id: $id})
SET n.invalid_at = $new_invalid_at
"""
# MemorySummary keyword search using fulltext index
SEARCH_MEMORY_SUMMARIES_BY_KEYWORD = """
CALL db.index.fulltext.queryNodes("summariesFulltext", $query) YIELD node AS m, score
WHERE ($end_user_id IS NULL OR m.end_user_id = $end_user_id)
OPTIONAL MATCH (m)-[:DERIVED_FROM_STATEMENT]->(s:Statement)
RETURN m.id AS id,
m.name AS name,
m.end_user_id AS end_user_id,
m.dialog_id AS dialog_id,
m.chunk_ids AS chunk_ids,
m.content AS content,
m.created_at AS created_at,
COALESCE(m.activation_value, m.importance_score, 0.5) AS activation_value,
COALESCE(m.importance_score, 0.5) AS importance_score,
m.last_access_time AS last_access_time,
COALESCE(m.access_count, 0) AS access_count,
score
ORDER BY score DESC
LIMIT $limit
"""
# Embedding-based search: cosine similarity on MemorySummary.summary_embedding
MEMORY_SUMMARY_EMBEDDING_SEARCH = """
CALL db.index.vector.queryNodes('summary_embedding_index', $limit * 100, $embedding)
YIELD node AS m, score
WHERE m.summary_embedding IS NOT NULL
AND ($end_user_id IS NULL OR m.end_user_id = $end_user_id)
RETURN m.id AS id,
m.name AS name,
m.end_user_id AS end_user_id,
m.dialog_id AS dialog_id,
m.chunk_ids AS chunk_ids,
m.content AS content,
m.created_at AS created_at,
COALESCE(m.activation_value, m.importance_score, 0.5) AS activation_value,
COALESCE(m.importance_score, 0.5) AS importance_score,
m.last_access_time AS last_access_time,
COALESCE(m.access_count, 0) AS access_count,
score
ORDER BY score DESC
LIMIT $limit
"""
MEMORY_SUMMARY_NODE_SAVE = """
UNWIND $summaries AS summary
MERGE (m:MemorySummary {id: summary.id})
@@ -1030,8 +789,6 @@ RETURN DISTINCT
e.statement AS statement;
"""
'''获取实体'''
Memory_Space_User = """
MATCH (n)-[r]->(m)
WHERE n.end_user_id = $end_user_id AND m.name="用户"
@@ -1363,22 +1120,6 @@ WHERE c.name IS NULL OR c.name = ''
RETURN c.community_id AS community_id
"""
# Community keyword search: matches name or summary via fulltext index
SEARCH_COMMUNITIES_BY_KEYWORD = """
CALL db.index.fulltext.queryNodes("communitiesFulltext", $query) YIELD node AS c, score
WHERE ($end_user_id IS NULL OR c.end_user_id = $end_user_id)
RETURN c.community_id AS id,
c.name AS name,
c.summary AS content,
c.core_entities AS core_entities,
c.member_count AS member_count,
c.end_user_id AS end_user_id,
c.updated_at AS updated_at,
score
ORDER BY score DESC
LIMIT $limit
"""
# Community 向量检索 ──────────────────────────────────────────────────
# Community embedding-based search: cosine similarity on Community.summary_embedding
COMMUNITY_EMBEDDING_SEARCH = """
@@ -1452,13 +1193,54 @@ ON CREATE SET r.end_user_id = edge.end_user_id,
RETURN elementId(r) AS uuid
"""
# -------------------
# search by user id
# -------------------
SEARCH_PERCEPTUAL_BY_USER_ID = """
MATCH (p:Perceptual)
WHERE p.end_user_id = $end_user_id
RETURN p.id AS id,
p.summary_embedding AS summary_embedding
p.summary_embedding AS embedding
"""
SEARCH_STATEMENTS_BY_USER_ID = """
MATCH (s:Statement)
WHERE s.end_user_id = $end_user_id
RETURN s.id AS id,
s.statement_embedding AS embedding
"""
SEARCH_ENTITIES_BY_USER_ID = """
MATCH (e:ExtractedEntity)
WHERE e.end_user_id = $end_user_id
RETURN e.id AS id,
e.name_embedding AS embedding
"""
SEARCH_CHUNKS_BY_USER_ID = """
MATCH (c:Chunk)
WHERE c.end_user_id = $end_user_id
RETURN c.id AS id,
c.chunk_embedding AS embedding
"""
SEARCH_MEMORY_SUMMARIES_BY_USER_ID = """
MATCH (s:MemorySummary)
WHERE s.end_user_id = $end_user_id
RETURN s.id AS id,
s.summary_embedding AS embedding
"""
SEARCH_COMMUNITIES_BY_USER_ID = """
MATCH (c:Community)
WHERE c.end_user_id = $end_user_id
RETURN c.id AS id,
c.summary_embedding AS embedding
"""
# -------------------
# search by id
# -------------------
SEARCH_PERCEPTUAL_BY_IDS = """
MATCH (p:Perceptual)
WHERE p.id IN $ids
@@ -1476,7 +1258,79 @@ RETURN p.id AS id,
p.file_type AS file_type
"""
SEARCH_PERCEPTUAL_BY_KEYWORD = """
SEARCH_STATEMENTS_BY_IDS = """
MATCH (s:Statement)
WHERE s.id IN $ids
RETURN s.id AS id,
s.statement AS statement,
s.end_user_id AS end_user_id,
s.chunk_id AS chunk_id,
s.created_at AS created_at,
s.expired_at AS expired_at,
s.valid_at AS valid_at,
properties(s)['invalid_at'] AS invalid_at,
COALESCE(s.activation_value, s.importance_score, 0.5) AS activation_value,
COALESCE(s.importance_score, 0.5) AS importance_score,
s.last_access_time AS last_access_time,
COALESCE(s.access_count, 0) AS access_count
"""
SEARCH_CHUNKS_BY_IDS = """
MATCH (c:Chunk)
WHERE c.id IN $ids
RETURN c.id AS id,
c.end_user_id AS end_user_id,
c.content AS content,
c.dialog_id AS dialog_id,
COALESCE(c.activation_value, 0.5) AS activation_value,
c.last_access_time AS last_access_time,
COALESCE(c.access_count, 0) AS access_count
"""
SEARCH_ENTITIES_BY_IDS = """
MATCH (e:ExtractedEntity)
WHERE e.id IN $ids
RETURN e.id AS id,
e.name AS name,
e.end_user_id AS end_user_id,
e.entity_type AS entity_type,
COALESCE(e.activation_value, e.importance_score, 0.5) AS activation_value,
COALESCE(e.importance_score, 0.5) AS importance_score,
e.last_access_time AS last_access_time,
COALESCE(e.access_count, 0) AS access_count
"""
SEARCH_MEMORY_SUMMARIES_BY_IDS = """
MATCH (m:MemorySummary)
WHERE m.id IN $ids
RETURN m.id AS id,
m.name AS name,
m.end_user_id AS end_user_id,
m.dialog_id AS dialog_id,
m.chunk_ids AS chunk_ids,
m.content AS content,
m.created_at AS created_at,
COALESCE(m.activation_value, m.importance_score, 0.5) AS activation_value,
COALESCE(m.importance_score, 0.5) AS importance_score,
m.last_access_time AS last_access_time,
COALESCE(m.access_count, 0) AS access_count
"""
SEARCH_COMMUNITIES_BY_IDS = """
MATCH (c:Community)
WHERE c.id IN $ids
RETURN c.id AS id,
c.name AS name,
c.summary AS content,
c.core_entities AS core_entities,
c.member_count AS member_count,
c.end_user_id AS end_user_id,
c.updated_at AS updated_at
"""
# -------------------
# search by fulltext
# -------------------
SEARCH_PERCEPTUALS_BY_KEYWORD = """
CALL db.index.fulltext.queryNodes("perceptualFulltext", $query) YIELD node AS p, score
WHERE p.end_user_id = $end_user_id
RETURN p.id AS id,
@@ -1495,3 +1349,155 @@ RETURN p.id AS id,
ORDER BY score DESC
LIMIT $limit
"""
SEARCH_STATEMENTS_BY_KEYWORD = """
CALL db.index.fulltext.queryNodes("statementsFulltext", $query) YIELD node AS s, score
WHERE ($end_user_id IS NULL OR s.end_user_id = $end_user_id)
OPTIONAL MATCH (c:Chunk)-[:CONTAINS]->(s)
OPTIONAL MATCH (s)-[:REFERENCES_ENTITY]->(e:ExtractedEntity)
RETURN s.id AS id,
s.statement AS statement,
s.end_user_id AS end_user_id,
s.chunk_id AS chunk_id,
s.created_at AS created_at,
s.expired_at AS expired_at,
s.valid_at AS valid_at,
properties(s)['invalid_at'] AS invalid_at,
c.id AS chunk_id_from_rel,
collect(DISTINCT e.id) AS entity_ids,
COALESCE(s.activation_value, s.importance_score, 0.5) AS activation_value,
COALESCE(s.importance_score, 0.5) AS importance_score,
s.last_access_time AS last_access_time,
COALESCE(s.access_count, 0) AS access_count,
score
ORDER BY score DESC
LIMIT $limit
"""
SEARCH_ENTITIES_BY_NAME_OR_ALIAS = """
CALL db.index.fulltext.queryNodes("entitiesFulltext", $query) YIELD node AS e, score
WHERE ($end_user_id IS NULL OR e.end_user_id = $end_user_id)
WITH e, score
With collect({entity: e, score: score}) AS fulltextResults
OPTIONAL MATCH (ae:ExtractedEntity)
WHERE ($end_user_id IS NULL OR ae.end_user_id = $end_user_id)
AND ae.aliases IS NOT NULL
AND ANY(alias IN ae.aliases WHERE toLower(alias) CONTAINS toLower($query))
WITH fulltextResults, collect(ae) AS aliasEntities
UNWIND (fulltextResults + [x IN aliasEntities | {entity: x, score:
CASE
WHEN ANY(alias IN x.aliases WHERE toLower(alias) = toLower($query)) THEN 1.0
WHEN ANY(alias IN x.aliases WHERE toLower(alias) STARTS WITH toLower($query)) THEN 0.9
ELSE 0.8
END
}]) AS row
WITH row.entity AS e, row.score AS score
WITH DISTINCT e, MAX(score) AS score
OPTIONAL MATCH (s:Statement)-[:REFERENCES_ENTITY]->(e)
OPTIONAL MATCH (c:Chunk)-[:CONTAINS]->(s)
RETURN e.id AS id,
e.name AS name,
e.end_user_id AS end_user_id,
e.entity_type AS entity_type,
e.created_at AS created_at,
e.expired_at AS expired_at,
e.entity_idx AS entity_idx,
e.statement_id AS statement_id,
e.description AS description,
e.aliases AS aliases,
e.name_embedding AS name_embedding,
e.connect_strength AS connect_strength,
collect(DISTINCT s.id) AS statement_ids,
collect(DISTINCT c.id) AS chunk_ids,
COALESCE(e.activation_value, e.importance_score, 0.5) AS activation_value,
COALESCE(e.importance_score, 0.5) AS importance_score,
e.last_access_time AS last_access_time,
COALESCE(e.access_count, 0) AS access_count,
score
ORDER BY score DESC
LIMIT $limit
"""
SEARCH_CHUNKS_BY_CONTENT = """
CALL db.index.fulltext.queryNodes("chunksFulltext", $query) YIELD node AS c, score
WHERE ($end_user_id IS NULL OR c.end_user_id = $end_user_id)
OPTIONAL MATCH (c)-[:CONTAINS]->(s:Statement)
OPTIONAL MATCH (s)-[:REFERENCES_ENTITY]->(e:ExtractedEntity)
RETURN c.id AS id,
c.end_user_id AS end_user_id,
c.content AS content,
c.dialog_id AS dialog_id,
c.sequence_number AS sequence_number,
collect(DISTINCT s.id) AS statement_ids,
collect(DISTINCT e.id) AS entity_ids,
COALESCE(c.activation_value, 0.5) AS activation_value,
c.last_access_time AS last_access_time,
COALESCE(c.access_count, 0) AS access_count,
score
ORDER BY score DESC
LIMIT $limit
"""
# MemorySummary keyword search using fulltext index
SEARCH_MEMORY_SUMMARIES_BY_KEYWORD = """
CALL db.index.fulltext.queryNodes("summariesFulltext", $query) YIELD node AS m, score
WHERE ($end_user_id IS NULL OR m.end_user_id = $end_user_id)
OPTIONAL MATCH (m)-[:DERIVED_FROM_STATEMENT]->(s:Statement)
RETURN m.id AS id,
m.name AS name,
m.end_user_id AS end_user_id,
m.dialog_id AS dialog_id,
m.chunk_ids AS chunk_ids,
m.content AS content,
m.created_at AS created_at,
COALESCE(m.activation_value, m.importance_score, 0.5) AS activation_value,
COALESCE(m.importance_score, 0.5) AS importance_score,
m.last_access_time AS last_access_time,
COALESCE(m.access_count, 0) AS access_count,
score
ORDER BY score DESC
LIMIT $limit
"""
# Community keyword search: matches name or summary via fulltext index
SEARCH_COMMUNITIES_BY_KEYWORD = """
CALL db.index.fulltext.queryNodes("communitiesFulltext", $query) YIELD node AS c, score
WHERE ($end_user_id IS NULL OR c.end_user_id = $end_user_id)
RETURN c.id AS id,
c.name AS name,
c.summary AS content,
c.core_entities AS core_entities,
c.member_count AS member_count,
c.end_user_id AS end_user_id,
c.updated_at AS updated_at,
score
ORDER BY score DESC
LIMIT $limit
"""
FULLTEXT_QUERY_CYPHER_MAPPING = {
Neo4jNodeType.STATEMENT: SEARCH_STATEMENTS_BY_KEYWORD,
Neo4jNodeType.EXTRACTEDENTITY: SEARCH_ENTITIES_BY_NAME_OR_ALIAS,
Neo4jNodeType.CHUNK: SEARCH_CHUNKS_BY_CONTENT,
Neo4jNodeType.MEMORYSUMMARY: SEARCH_MEMORY_SUMMARIES_BY_KEYWORD,
Neo4jNodeType.COMMUNITY: SEARCH_COMMUNITIES_BY_KEYWORD,
Neo4jNodeType.PERCEPTUAL: SEARCH_PERCEPTUALS_BY_KEYWORD
}
USER_ID_QUERY_CYPHER_MAPPING = {
Neo4jNodeType.STATEMENT: SEARCH_STATEMENTS_BY_USER_ID,
Neo4jNodeType.EXTRACTEDENTITY: SEARCH_ENTITIES_BY_USER_ID,
Neo4jNodeType.CHUNK: SEARCH_CHUNKS_BY_USER_ID,
Neo4jNodeType.MEMORYSUMMARY: SEARCH_MEMORY_SUMMARIES_BY_USER_ID,
Neo4jNodeType.COMMUNITY: SEARCH_COMMUNITIES_BY_USER_ID,
Neo4jNodeType.PERCEPTUAL: SEARCH_PERCEPTUAL_BY_USER_ID
}
NODE_ID_QUERY_CYPHER_MAPPING = {
Neo4jNodeType.STATEMENT: SEARCH_STATEMENTS_BY_IDS,
Neo4jNodeType.EXTRACTEDENTITY: SEARCH_ENTITIES_BY_IDS,
Neo4jNodeType.CHUNK: SEARCH_CHUNKS_BY_IDS,
Neo4jNodeType.MEMORYSUMMARY: SEARCH_MEMORY_SUMMARIES_BY_IDS,
Neo4jNodeType.COMMUNITY: SEARCH_COMMUNITIES_BY_IDS,
Neo4jNodeType.PERCEPTUAL: SEARCH_PERCEPTUAL_BY_IDS
}

View File

@@ -1,26 +1,19 @@
import asyncio
import logging
from typing import Any, Dict, List, Optional
import time
from typing import Any, Dict, List, Optional, Coroutine
from app.core.memory.utils.data.text_utils import escape_lucene_query
import numpy as np
from app.core.memory.enums import Neo4jNodeType
from app.core.memory.llm_tools import OpenAIEmbedderClient
from app.core.memory.utils.data.text_utils import escape_lucene_query
from app.repositories.neo4j.cypher_queries import (
CHUNK_EMBEDDING_SEARCH,
COMMUNITY_EMBEDDING_SEARCH,
ENTITY_EMBEDDING_SEARCH,
EXPAND_COMMUNITY_STATEMENTS,
MEMORY_SUMMARY_EMBEDDING_SEARCH,
SEARCH_CHUNK_BY_CHUNK_ID,
SEARCH_CHUNKS_BY_CONTENT,
SEARCH_COMMUNITIES_BY_KEYWORD,
SEARCH_DIALOGUE_BY_DIALOG_ID,
SEARCH_ENTITIES_BY_NAME,
SEARCH_ENTITIES_BY_NAME_OR_ALIAS,
SEARCH_MEMORY_SUMMARIES_BY_KEYWORD,
SEARCH_STATEMENTS_BY_CREATED_AT,
SEARCH_STATEMENTS_BY_KEYWORD,
SEARCH_STATEMENTS_BY_KEYWORD_TEMPORAL,
SEARCH_STATEMENTS_BY_TEMPORAL,
SEARCH_STATEMENTS_BY_VALID_AT,
@@ -28,12 +21,14 @@ from app.repositories.neo4j.cypher_queries import (
SEARCH_STATEMENTS_G_VALID_AT,
SEARCH_STATEMENTS_L_CREATED_AT,
SEARCH_STATEMENTS_L_VALID_AT,
STATEMENT_EMBEDDING_SEARCH,
SEARCH_PERCEPTUAL_BY_KEYWORD,
SEARCH_PERCEPTUALS_BY_KEYWORD,
SEARCH_PERCEPTUAL_BY_IDS,
SEARCH_PERCEPTUAL_BY_USER_ID,
FULLTEXT_QUERY_CYPHER_MAPPING,
USER_ID_QUERY_CYPHER_MAPPING,
NODE_ID_QUERY_CYPHER_MAPPING
)
# 使用新的仓储层
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
logger = logging.getLogger(__name__)
@@ -52,7 +47,7 @@ def cosine_similarity_search(
query_norm = query / np.linalg.norm(query)
similarities = vectors_norm @ query_norm
similarities = (similarities + 1) / 2
similarities = np.clip(similarities, 0, 1)
top_k = min(limit, similarities.shape[0])
if top_k <= 0:
return {}
@@ -60,7 +55,7 @@ def cosine_similarity_search(
top_indices = top_indices[np.argsort(-similarities[top_indices])]
result = {}
for idx in top_indices:
result[idx] = similarities[idx]
result[idx] = float(similarities[idx])
return result
@@ -173,7 +168,10 @@ async def _update_search_results_activation(
knowledge_node_types = {
'statements': 'Statement',
'entities': 'ExtractedEntity',
'summaries': 'MemorySummary'
'summaries': 'MemorySummary',
Neo4jNodeType.STATEMENT: Neo4jNodeType.STATEMENT.value,
Neo4jNodeType.EXTRACTEDENTITY: Neo4jNodeType.EXTRACTEDENTITY.value,
Neo4jNodeType.MEMORYSUMMARY: Neo4jNodeType.MEMORYSUMMARY.value,
}
# 并行更新所有类型的节点
@@ -250,12 +248,147 @@ async def _update_search_results_activation(
return updated_results
async def search_perceptual_by_fulltext(
connector: Neo4jConnector,
query: str,
end_user_id: Optional[str] = None,
limit: int = 10,
) -> Dict[str, List[Dict[str, Any]]]:
try:
perceptuals = await connector.execute_query(
SEARCH_PERCEPTUALS_BY_KEYWORD,
query=escape_lucene_query(query),
end_user_id=end_user_id,
limit=limit,
)
except Exception as e:
logger.warning(f"search_perceptual: keyword search failed: {e}")
perceptuals = []
# Deduplicate
from app.core.memory.src.search import deduplicate_results
perceptuals = deduplicate_results(perceptuals)
return {"perceptuals": perceptuals}
async def search_perceptual_by_embedding(
connector: Neo4jConnector,
embedder_client: OpenAIEmbedderClient,
query_text: str,
end_user_id: Optional[str] = None,
limit: int = 10,
) -> Dict[str, List[Dict[str, Any]]]:
"""
Search Perceptual memory nodes using embedding-based semantic search.
Uses cosine similarity on summary_embedding via the perceptual_summary_embedding_index.
Args:
connector: Neo4j connector
embedder_client: Embedding client with async response() method
query_text: Query text to embed
end_user_id: Optional user filter
limit: Max results
Returns:
Dictionary with 'perceptuals' key containing matched perceptual memory nodes
"""
embeddings = await embedder_client.response([query_text])
if not embeddings or not embeddings[0]:
logger.warning(f"search_perceptual_by_embedding: embedding generation failed for '{query_text[:50]}'")
return {"perceptuals": []}
embedding = embeddings[0]
try:
perceptuals = await connector.execute_query(
SEARCH_PERCEPTUAL_BY_USER_ID,
end_user_id=end_user_id,
)
ids = [item['id'] for item in perceptuals]
vectors = [item['summary_embedding'] for item in perceptuals]
sim_res = cosine_similarity_search(embedding, vectors, limit=limit)
perceptual_res = {
ids[idx]: score
for idx, score in sim_res.items()
}
perceptuals = await connector.execute_query(
SEARCH_PERCEPTUAL_BY_IDS,
ids=list(perceptual_res.keys())
)
for perceptual in perceptuals:
perceptual["score"] = perceptual_res[perceptual["id"]]
except Exception as e:
logger.warning(f"search_perceptual_by_embedding: vector search failed: {e}")
perceptuals = []
from app.core.memory.src.search import deduplicate_results
perceptuals = deduplicate_results(perceptuals)
return {"perceptuals": perceptuals}
def search_by_fulltext(
connector: Neo4jConnector,
node_type: Neo4jNodeType,
end_user_id: str,
query: str,
limit: int = 10,
) -> Coroutine[Any, Any, list[dict[str, Any]]]:
cypher = FULLTEXT_QUERY_CYPHER_MAPPING[node_type]
return connector.execute_query(
cypher,
json_format=True,
end_user_id=end_user_id,
query=query,
limit=limit,
)
async def search_by_embedding(
connector: Neo4jConnector,
node_type: Neo4jNodeType,
end_user_id: str,
query_embedding: list[float],
limit: int = 10,
) -> list[dict[str, Any]]:
try:
records = await connector.execute_query(
USER_ID_QUERY_CYPHER_MAPPING[node_type],
end_user_id=end_user_id,
)
records = [record for record in records if record if record["embedding"] is not None]
ids = [item['id'] for item in records]
vectors = [item['embedding'] for item in records]
sim_res = cosine_similarity_search(query_embedding, vectors, limit=limit)
records_score_map = {
ids[idx]: score
for idx, score in sim_res.items()
}
records = await connector.execute_query(
NODE_ID_QUERY_CYPHER_MAPPING[node_type],
ids=list(records_score_map.keys()),
json_format=True
)
for record in records:
record["score"] = records_score_map[record["id"]]
except Exception as e:
logger.warning(f"search_graph_by_embedding: vector search failed: {e}, node_type:{node_type.value}",
exc_info=True)
records = []
from app.core.memory.src.search import deduplicate_results
records = deduplicate_results(records)
return records
async def search_graph(
connector: Neo4jConnector,
query: str,
end_user_id: Optional[str] = None,
limit: int = 50,
include: List[str] = None,
include: List[Neo4jNodeType] = None,
) -> Dict[str, List[Dict[str, Any]]]:
"""
Search across Statements, Entities, Chunks, and Summaries using a free-text query.
@@ -279,7 +412,13 @@ async def search_graph(
Dictionary with search results per category (with updated activation values)
"""
if include is None:
include = ["statements", "chunks", "entities", "summaries"]
include = [
Neo4jNodeType.STATEMENT,
Neo4jNodeType.CHUNK,
Neo4jNodeType.EXTRACTEDENTITY,
Neo4jNodeType.MEMORYSUMMARY,
Neo4jNodeType.PERCEPTUAL
]
# Escape Lucene special characters to prevent query parse errors
escaped_query = escape_lucene_query(query)
@@ -288,55 +427,9 @@ async def search_graph(
tasks = []
task_keys = []
if "statements" in include:
tasks.append(connector.execute_query(
SEARCH_STATEMENTS_BY_KEYWORD,
json_format=True,
query=escaped_query,
end_user_id=end_user_id,
limit=limit,
))
task_keys.append("statements")
if "entities" in include:
tasks.append(connector.execute_query(
SEARCH_ENTITIES_BY_NAME_OR_ALIAS,
json_format=True,
query=escaped_query,
end_user_id=end_user_id,
limit=limit,
))
task_keys.append("entities")
if "chunks" in include:
tasks.append(connector.execute_query(
SEARCH_CHUNKS_BY_CONTENT,
json_format=True,
query=escaped_query,
end_user_id=end_user_id,
limit=limit,
))
task_keys.append("chunks")
if "summaries" in include:
tasks.append(connector.execute_query(
SEARCH_MEMORY_SUMMARIES_BY_KEYWORD,
json_format=True,
query=escaped_query,
end_user_id=end_user_id,
limit=limit,
))
task_keys.append("summaries")
if "communities" in include:
tasks.append(connector.execute_query(
SEARCH_COMMUNITIES_BY_KEYWORD,
json_format=True,
query=escaped_query,
end_user_id=end_user_id,
limit=limit,
))
task_keys.append("communities")
for node_type in include:
tasks.append(search_by_fulltext(connector, node_type, end_user_id, escaped_query, limit))
task_keys.append(node_type.value)
# Execute all queries in parallel
task_results = await asyncio.gather(*tasks, return_exceptions=True)
@@ -352,16 +445,16 @@ async def search_graph(
# Deduplicate results before updating activation values
# This prevents duplicates from propagating through the pipeline
from app.core.memory.src.search import _deduplicate_results
from app.core.memory.src.search import deduplicate_results
for key in results:
if isinstance(results[key], list):
results[key] = _deduplicate_results(results[key])
results[key] = deduplicate_results(results[key])
# 更新知识节点的激活值Statement, ExtractedEntity, MemorySummary
# Skip activation updates if only searching summaries (optimization)
needs_activation_update = any(
key in include and key in results and results[key]
for key in ['statements', 'entities', 'chunks']
for key in [Neo4jNodeType.STATEMENT, Neo4jNodeType.EXTRACTEDENTITY, Neo4jNodeType.MEMORYSUMMARY]
)
if needs_activation_update:
@@ -378,7 +471,7 @@ async def search_graph_by_embedding(
connector: Neo4jConnector,
embedder_client,
query_text: str,
end_user_id: Optional[str] = None,
end_user_id: str,
limit: int = 50,
include=None,
) -> Dict[str, List[Dict[str, Any]]]:
@@ -394,96 +487,32 @@ async def search_graph_by_embedding(
- Returns up to 'limit' per included type
"""
if include is None:
include = ["statements", "chunks", "entities", "summaries"]
import time
include = [
Neo4jNodeType.STATEMENT,
Neo4jNodeType.CHUNK,
Neo4jNodeType.EXTRACTEDENTITY,
Neo4jNodeType.MEMORYSUMMARY,
Neo4jNodeType.PERCEPTUAL
]
# Get embedding for the query
embed_start = time.time()
embeddings = await embedder_client.response([query_text])
embed_time = time.time() - embed_start
logger.debug(f"[PERF] Embedding generation took: {embed_time:.4f}s")
if not embeddings or not embeddings[0]:
logger.warning(
f"search_graph_by_embedding: embedding 生成失败或为空,"
f"query='{query_text[:50]}', end_user_id={end_user_id},向量检索跳过"
)
return {"statements": [], "chunks": [], "entities": [], "summaries": [], "communities": []}
logger.warning(f"search_graph_by_embedding: embedding generation failed for '{query_text[:50]}'")
return {search_key: [] for search_key in include}
embedding = embeddings[0]
# Prepare tasks for parallel execution
tasks = []
task_keys = []
# Statements (embedding)
if "statements" in include:
tasks.append(connector.execute_query(
STATEMENT_EMBEDDING_SEARCH,
json_format=True,
embedding=embedding,
end_user_id=end_user_id,
limit=limit,
))
task_keys.append("statements")
for node_type in include:
tasks.append(search_by_embedding(connector, node_type, end_user_id, embedding, limit))
task_keys.append(node_type.value)
# Chunks (embedding)
if "chunks" in include:
tasks.append(connector.execute_query(
CHUNK_EMBEDDING_SEARCH,
json_format=True,
embedding=embedding,
end_user_id=end_user_id,
limit=limit,
))
task_keys.append("chunks")
# Entities
if "entities" in include:
tasks.append(connector.execute_query(
ENTITY_EMBEDDING_SEARCH,
json_format=True,
embedding=embedding,
end_user_id=end_user_id,
limit=limit,
))
task_keys.append("entities")
# Memory summaries
if "summaries" in include:
tasks.append(connector.execute_query(
MEMORY_SUMMARY_EMBEDDING_SEARCH,
json_format=True,
embedding=embedding,
end_user_id=end_user_id,
limit=limit,
))
task_keys.append("summaries")
# Communities (向量语义匹配)
if "communities" in include:
tasks.append(connector.execute_query(
COMMUNITY_EMBEDDING_SEARCH,
json_format=True,
embedding=embedding,
end_user_id=end_user_id,
limit=limit,
))
task_keys.append("communities")
# Execute all queries in parallel
query_start = time.time()
task_results = await asyncio.gather(*tasks, return_exceptions=True)
query_time = time.time() - query_start
logger.debug(f"[PERF] Neo4j queries (parallel) took: {query_time:.4f}s")
# Build results dictionary
results: Dict[str, List[Dict[str, Any]]] = {
"statements": [],
"chunks": [],
"entities": [],
"summaries": [],
"communities": [],
}
results: Dict[str, List[Dict[str, Any]]] = {}
for key, result in zip(task_keys, task_results):
if isinstance(result, Exception):
@@ -494,16 +523,16 @@ async def search_graph_by_embedding(
# Deduplicate results before updating activation values
# This prevents duplicates from propagating through the pipeline
from app.core.memory.src.search import _deduplicate_results
from app.core.memory.src.search import deduplicate_results
for key in results:
if isinstance(results[key], list):
results[key] = _deduplicate_results(results[key])
results[key] = deduplicate_results(results[key])
# 更新知识节点的激活值Statement, ExtractedEntity, MemorySummary
# Skip activation updates if only searching summaries (optimization)
needs_activation_update = any(
key in include and key in results and results[key]
for key in ['statements', 'entities', 'chunks']
for key in [Neo4jNodeType.STATEMENT, Neo4jNodeType.EXTRACTEDENTITY, Neo4jNodeType.MEMORYSUMMARY]
)
if needs_activation_update:
@@ -781,12 +810,12 @@ async def search_graph_community_expand(
expanded.extend(result)
# 按 activation_value 全局排序后去重
from app.core.memory.src.search import _deduplicate_results
from app.core.memory.src.search import deduplicate_results
expanded.sort(
key=lambda x: float(x.get("activation_value") or 0),
reverse=True,
)
expanded = _deduplicate_results(expanded)
expanded = deduplicate_results(expanded)
logger.info(f"社区展开检索完成: community_ids={community_ids}, 展开 statements={len(expanded)}")
return {"expanded_statements": expanded}
@@ -999,98 +1028,3 @@ async def search_graph_l_valid_at(
)
return results
async def search_perceptual(
connector: Neo4jConnector,
query: str,
end_user_id: Optional[str] = None,
limit: int = 10,
) -> Dict[str, List[Dict[str, Any]]]:
"""
Search Perceptual memory nodes using fulltext keyword search.
Matches against summary, topic, and domain fields via the perceptualFulltext index.
Args:
connector: Neo4j connector
query: Query text for full-text search
end_user_id: Optional user filter
limit: Max results
Returns:
Dictionary with 'perceptuals' key containing matched perceptual memory nodes
"""
try:
perceptuals = await connector.execute_query(
SEARCH_PERCEPTUAL_BY_KEYWORD,
query=escape_lucene_query(query),
end_user_id=end_user_id,
limit=limit,
)
except Exception as e:
logger.warning(f"search_perceptual: keyword search failed: {e}")
perceptuals = []
# Deduplicate
from app.core.memory.src.search import _deduplicate_results
perceptuals = _deduplicate_results(perceptuals)
return {"perceptuals": perceptuals}
async def search_perceptual_by_embedding(
connector: Neo4jConnector,
embedder_client: OpenAIEmbedderClient,
query_text: str,
end_user_id: Optional[str] = None,
limit: int = 10,
) -> Dict[str, List[Dict[str, Any]]]:
"""
Search Perceptual memory nodes using embedding-based semantic search.
Uses cosine similarity on summary_embedding via the perceptual_summary_embedding_index.
Args:
connector: Neo4j connector
embedder_client: Embedding client with async response() method
query_text: Query text to embed
end_user_id: Optional user filter
limit: Max results
Returns:
Dictionary with 'perceptuals' key containing matched perceptual memory nodes
"""
embeddings = await embedder_client.response([query_text])
if not embeddings or not embeddings[0]:
logger.warning(f"search_perceptual_by_embedding: embedding generation failed for '{query_text[:50]}'")
return {"perceptuals": []}
embedding = embeddings[0]
try:
perceptuals = await connector.execute_query(
SEARCH_PERCEPTUAL_BY_USER_ID,
end_user_id=end_user_id,
)
ids = [item['id'] for item in perceptuals]
vectors = [item['summary_embedding'] for item in perceptuals]
sim_res = cosine_similarity_search(embedding, vectors, limit=limit)
perceptual_res = {
ids[idx]: score
for idx, score in sim_res.items()
}
perceptuals = await connector.execute_query(
SEARCH_PERCEPTUAL_BY_IDS,
ids=list(perceptual_res.keys())
)
for perceptual in perceptuals:
perceptual["score"] = perceptual_res[perceptual["id"]]
except Exception as e:
logger.warning(f"search_perceptual_by_embedding: vector search failed: {e}")
perceptuals = []
from app.core.memory.src.search import _deduplicate_results
perceptuals = _deduplicate_results(perceptuals)
return {"perceptuals": perceptuals}

View File

@@ -70,6 +70,12 @@ class Neo4jConnector:
auth=basic_auth(username, password)
)
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
async def close(self):
"""关闭数据库连接
@@ -77,11 +83,11 @@ class Neo4jConnector:
"""
await self.driver.close()
async def execute_query(self, query: str, json_format=False, **kwargs: Any) -> List[Dict[str, Any]]:
async def execute_query(self, cypher: str, json_format=False, **kwargs: Any) -> List[Dict[str, Any]]:
"""执行Cypher查询
Args:
query: Cypher查询语句
cypher: Cypher查询语句
json_format: json格式化
**kwargs: 查询参数将作为参数传递给Cypher查询
@@ -92,7 +98,7 @@ class Neo4jConnector:
"""
result = await self.driver.execute_query(
query,
cypher,
database="neo4j",
**kwargs
)