refactor(memory): consolidate memory search services and update model client handling

- Consolidate memory search services by removing separate content_search.py and perceptual_search.py
- Update model client handling in base_pipeline.py to use ModelApiKeyService for LLM client initialization
- Add new prompt files and modify existing services to support consolidated search architecture
- Refactor memory read pipeline and related services to use updated model client approach
This commit is contained in:
Eternity
2026-04-16 13:46:39 +08:00
parent a01525e239
commit 749cf79581
4 changed files with 27 additions and 9 deletions

View File

@@ -12,8 +12,8 @@ from app.repositories.neo4j.neo4j_connector import Neo4jConnector
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
DEFAULT_ALPHA = 0.7 DEFAULT_ALPHA = 0.6
DEFAULT_FULLTEXT_SCORE_THRESHOLD = 1 DEFAULT_FULLTEXT_SCORE_THRESHOLD = 1.5
DEFAULT_COSINE_SCORE_THRESHOLD = 0.5 DEFAULT_COSINE_SCORE_THRESHOLD = 0.5
DEFAULT_CONTENT_SCORE_THRESHOLD = 0.5 DEFAULT_CONTENT_SCORE_THRESHOLD = 0.5
@@ -112,7 +112,7 @@ class Neo4jSearchService:
kw = float(combined[item_id].get("kw_score", 0) or 0) kw = float(combined[item_id].get("kw_score", 0) or 0)
emb = float(combined[item_id].get("embedding_score", 0) or 0) emb = float(combined[item_id].get("embedding_score", 0) or 0)
base = self.alpha * emb + (1 - self.alpha) * kw base = self.alpha * emb + (1 - self.alpha) * kw
combined[item_id]["content_score"] = base + min(1 - base, kw * emb) combined[item_id]["content_score"] = base + min(1 - base, 0.1 * kw * emb)
results = sorted(combined.values(), key=lambda x: x["content_score"], reverse=True) results = sorted(combined.values(), key=lambda x: x["content_score"], reverse=True)
# results = [ # results = [
# res for res in results # res for res in results

View File

@@ -61,14 +61,18 @@ class EntityBuilder(BaseBuilder):
def data(self) -> dict: def data(self) -> dict:
return { return {
"id": self.record.get("id"), "id": self.record.get("id"),
"content": self.record.get("name"), "name": self.record.get("name"),
"description": self.record.get("description"),
"kw_score": self.record.get("kw_score", 0.0), "kw_score": self.record.get("kw_score", 0.0),
"emb_score": self.record.get("embedding_score", 0.0) "emb_score": self.record.get("embedding_score", 0.0)
} }
@property @property
def content(self) -> str: def content(self) -> str:
return self.record.get("name") return (f"<entity>"
f"<name>{self.record.get("name")}<name>"
f"<description>{self.record.get("description")}</description>"
f"</entity>")
class SummaryBuilder(BaseBuilder): class SummaryBuilder(BaseBuilder):

View File

@@ -19,7 +19,8 @@ async def create_fulltext_indexes():
# """) # """)
# 创建 Entities 索引 # 创建 Entities 索引
await connector.execute_query(""" await connector.execute_query("""
CREATE FULLTEXT INDEX entitiesFulltext IF NOT EXISTS FOR (e:ExtractedEntity) ON EACH [e.name] CREATE FULLTEXT INDEX entitiesFulltext IF NOT EXISTS
FOR (e:ExtractedEntity) ON EACH [e.name, e.description, e.aliases]
OPTIONS { indexConfig: { `fulltext.analyzer`: 'cjk' } } OPTIONS { indexConfig: { `fulltext.analyzer`: 'cjk' } }
""") """)
@@ -139,6 +140,16 @@ async def create_vector_indexes():
await connector.close() await connector.close()
async def create_user_indexes():
connector = Neo4jConnector()
await connector.execute_query(
"""
CREATE INDEX user_perceptual IF NOT EXISTS
FOR (p:Perceptual) ON (p.end_user_id);
"""
)
async def create_unique_constraints(): async def create_unique_constraints():
"""Create uniqueness constraints for core node identifiers. """Create uniqueness constraints for core node identifiers.
Ensures concurrent MERGE operations remain safe and prevents duplicates. Ensures concurrent MERGE operations remain safe and prevents duplicates.

View File

@@ -45,14 +45,17 @@ def cosine_similarity_search(
vectors: np.ndarray = np.array(vectors, dtype=np.float32) vectors: np.ndarray = np.array(vectors, dtype=np.float32)
vectors_norm = vectors / np.linalg.norm(vectors, axis=1, keepdims=True) vectors_norm = vectors / np.linalg.norm(vectors, axis=1, keepdims=True)
query: np.ndarray = np.array(query, dtype=np.float32) query: np.ndarray = np.array(query, dtype=np.float32)
query_norm = query / np.linalg.norm(query) norm = np.linalg.norm(query)
if norm == 0:
return {}
query_norm = query / norm
similarities = vectors_norm @ query_norm similarities = vectors_norm @ query_norm
similarities = np.clip(similarities, 0, 1) similarities = np.clip(similarities, 0, 1)
top_k = min(limit, similarities.shape[0]) top_k = min(limit, similarities.shape[0])
if top_k <= 0: if top_k <= 0:
return {} return {}
top_indices = np.argpartition(-similarities, top_k - 1)[-top_k:] top_indices = np.argpartition(-similarities, top_k - 1)[:top_k]
top_indices = top_indices[np.argsort(-similarities[top_indices])] top_indices = top_indices[np.argsort(-similarities[top_indices])]
result = {} result = {}
for idx in top_indices: for idx in top_indices:
@@ -510,7 +513,7 @@ async def search_graph_by_embedding(
task_keys = [] task_keys = []
for node_type in include: for node_type in include:
tasks.append(search_by_embedding(connector, node_type, end_user_id, embedding, limit)) tasks.append(search_by_embedding(connector, node_type, end_user_id, embedding, limit*2))
task_keys.append(node_type.value) task_keys.append(node_type.value)
task_results = await asyncio.gather(*tasks, return_exceptions=True) task_results = await asyncio.gather(*tasks, return_exceptions=True)