refactor(memory): restructure memory search architecture

- Replace storage_services/search with new read_services/memory_search structure
- Implement content_search and perceptual_search strategies
- Add query_preprocessor for search optimization
- Create memory_service as unified interface
- Update celery_app and graph_search for new architecture
- Add enums for memory operations
- Implement base_pipeline and memory_read pipeline patterns
This commit is contained in:
Eternity
2026-04-10 17:42:57 +08:00
parent 5eaedaad77
commit dca3173ed9
17 changed files with 463 additions and 994 deletions

View File

@@ -1452,6 +1452,30 @@ ON CREATE SET r.end_user_id = edge.end_user_id,
RETURN elementId(r) AS uuid
"""
# Fetch the id and stored summary embedding of every Perceptual node that
# belongs to one end user ($end_user_id).
# Nodes without a summary embedding are excluded: a NULL value cannot be
# stacked into a numeric matrix next to real vectors, so a single such row
# would otherwise break similarity ranking over the whole result set.
SEARCH_PERCEPTUAL_BY_USER_ID = """
MATCH (p:Perceptual)
WHERE p.end_user_id = $end_user_id
  AND p.summary_embedding IS NOT NULL
RETURN p.id AS id,
       p.summary_embedding AS summary_embedding
"""
# Load the descriptive fields of the Perceptual nodes whose id is listed in
# $ids. The query has no ORDER BY, no end_user_id filter and no score column,
# so any ranking or scoring has to be attached by the caller.
SEARCH_PERCEPTUAL_BY_IDS = """
MATCH (p:Perceptual)
WHERE p.id IN $ids
RETURN p.id AS id,
p.end_user_id AS end_user_id,
p.perceptual_type AS perceptual_type,
p.file_path AS file_path,
p.file_name AS file_name,
p.file_ext AS file_ext,
p.summary AS summary,
p.keywords AS keywords,
p.topic AS topic,
p.domain AS domain,
p.created_at AS created_at,
p.file_type AS file_type
"""
SEARCH_PERCEPTUAL_BY_KEYWORD = """
CALL db.index.fulltext.queryNodes("perceptualFulltext", $query) YIELD node AS p, score
WHERE p.end_user_id = $end_user_id
@@ -1471,24 +1495,3 @@ RETURN p.id AS id,
ORDER BY score DESC
LIMIT $limit
"""
# Vector-index search over Perceptual summary embeddings.
# The index 'perceptual_summary_embedding_index' is asked for $limit * 100
# candidates; the end_user_id / non-null filter is applied to those candidates
# afterwards, and the survivors are ordered by score and cut to $limit.
# NOTE(review): the 100x over-fetch is presumably there so that enough rows
# remain after the per-user filter -- confirm before changing the factor.
PERCEPTUAL_EMBEDDING_SEARCH = """
CALL db.index.vector.queryNodes('perceptual_summary_embedding_index', $limit * 100, $embedding)
YIELD node AS p, score
WHERE p.summary_embedding IS NOT NULL AND p.end_user_id = $end_user_id
RETURN p.id AS id,
p.end_user_id AS end_user_id,
p.perceptual_type AS perceptual_type,
p.file_path AS file_path,
p.file_name AS file_name,
p.file_ext AS file_ext,
p.summary AS summary,
p.keywords AS keywords,
p.topic AS topic,
p.domain AS domain,
p.created_at AS created_at,
p.file_type AS file_type,
score
ORDER BY score DESC
LIMIT $limit
"""

View File

@@ -3,13 +3,15 @@ import logging
from typing import Any, Dict, List, Optional
from app.core.memory.utils.data.text_utils import escape_lucene_query
import numpy as np
from app.core.memory.llm_tools import OpenAIEmbedderClient
from app.repositories.neo4j.cypher_queries import (
CHUNK_EMBEDDING_SEARCH,
COMMUNITY_EMBEDDING_SEARCH,
ENTITY_EMBEDDING_SEARCH,
EXPAND_COMMUNITY_STATEMENTS,
MEMORY_SUMMARY_EMBEDDING_SEARCH,
PERCEPTUAL_EMBEDDING_SEARCH,
SEARCH_CHUNK_BY_CHUNK_ID,
SEARCH_CHUNKS_BY_CONTENT,
SEARCH_COMMUNITIES_BY_KEYWORD,
@@ -17,7 +19,6 @@ from app.repositories.neo4j.cypher_queries import (
SEARCH_ENTITIES_BY_NAME,
SEARCH_ENTITIES_BY_NAME_OR_ALIAS,
SEARCH_MEMORY_SUMMARIES_BY_KEYWORD,
SEARCH_PERCEPTUAL_BY_KEYWORD,
SEARCH_STATEMENTS_BY_CREATED_AT,
SEARCH_STATEMENTS_BY_KEYWORD,
SEARCH_STATEMENTS_BY_KEYWORD_TEMPORAL,
@@ -28,14 +29,41 @@ from app.repositories.neo4j.cypher_queries import (
SEARCH_STATEMENTS_L_CREATED_AT,
SEARCH_STATEMENTS_L_VALID_AT,
STATEMENT_EMBEDDING_SEARCH,
SEARCH_PERCEPTUAL_BY_KEYWORD,
SEARCH_PERCEPTUAL_BY_IDS,
SEARCH_PERCEPTUAL_BY_USER_ID,
)
# 使用新的仓储层
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
logger = logging.getLogger(__name__)
def cosine_similarity_search(
        query: list[float],
        vectors: list[list[float]],
        limit: int
) -> dict[int, float]:
    """Rank ``vectors`` by cosine similarity to ``query`` and keep the best ones.

    Args:
        query: Query embedding.
        vectors: Candidate embeddings; each must have the same dimension as
            ``query``.
        limit: Maximum number of matches to return.

    Returns:
        A dict mapping the position of each selected vector inside
        ``vectors`` to its score, inserted from best to worst match. Scores
        are cosine similarities rescaled from [-1, 1] to [0, 1]. The dict is
        empty when ``vectors`` is empty or ``limit`` is not positive.
    """
    if not vectors:
        return {}

    matrix = np.asarray(vectors, dtype=np.float32)
    target = np.asarray(query, dtype=np.float32)

    top_k = min(limit, matrix.shape[0])
    if top_k <= 0:
        return {}

    # A zero-length vector has no direction. Dividing by 1 instead of 0 keeps
    # its cosine at 0 (score 0.5) rather than poisoning the ranking with NaN.
    row_norms = np.linalg.norm(matrix, axis=1, keepdims=True)
    row_norms[row_norms == 0] = 1.0
    target_norm = np.linalg.norm(target)
    if target_norm == 0:
        target_norm = 1.0

    similarities = (matrix / row_norms) @ (target / target_norm)
    similarities = (similarities + 1) / 2

    # argpartition on the negated scores moves the top_k best matches into
    # the FIRST top_k slots (in arbitrary order); sort just those afterwards.
    top_indices = np.argpartition(-similarities, top_k - 1)[:top_k]
    top_indices = top_indices[np.argsort(-similarities[top_indices])]

    # Plain Python int/float so the result matches the annotation and can be
    # serialised without numpy-specific handling.
    return {int(idx): float(similarities[idx]) for idx in top_indices}
async def _update_activation_values_batch(
connector: Neo4jConnector,
nodes: List[Dict[str, Any]],
@@ -352,7 +380,7 @@ async def search_graph_by_embedding(
query_text: str,
end_user_id: Optional[str] = None,
limit: int = 50,
include: List[str] = ["statements", "chunks", "entities", "summaries"],
include=None,
) -> Dict[str, List[Dict[str, Any]]]:
"""
Embedding-based semantic search across Statements, Chunks, and Entities.
@@ -365,6 +393,8 @@ async def search_graph_by_embedding(
- Filters by end_user_id if provided
- Returns up to 'limit' per included type
"""
if include is None:
include = ["statements", "chunks", "entities", "summaries"]
import time
# Get embedding for the query
@@ -1011,7 +1041,7 @@ async def search_perceptual(
async def search_perceptual_by_embedding(
connector: Neo4jConnector,
embedder_client,
embedder_client: OpenAIEmbedderClient,
query_text: str,
end_user_id: Optional[str] = None,
limit: int = 10,
@@ -1040,11 +1070,22 @@ async def search_perceptual_by_embedding(
try:
perceptuals = await connector.execute_query(
PERCEPTUAL_EMBEDDING_SEARCH,
embedding=embedding,
SEARCH_PERCEPTUAL_BY_USER_ID,
end_user_id=end_user_id,
limit=limit,
)
ids = [item['id'] for item in perceptuals]
vectors = [item['summary_embedding'] for item in perceptuals]
sim_res = cosine_similarity_search(embedding, vectors, limit=limit)
perceptual_res = {
ids[idx]: score
for idx, score in sim_res.items()
}
perceptuals = await connector.execute_query(
SEARCH_PERCEPTUAL_BY_IDS,
ids=list(perceptual_res.keys())
)
for perceptual in perceptuals:
perceptual["score"] = perceptual_res[perceptual["id"]]
except Exception as e:
logger.warning(f"search_perceptual_by_embedding: vector search failed: {e}")
perceptuals = []