From 811193dd756295c9ac89d3d54c3b03dac131c840 Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Tue, 14 Apr 2026 17:28:24 +0800 Subject: [PATCH 1/2] fix(memory): make PgSQL the single source of truth for user entity aliases MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Skip alias merging for user entities during dedup (_merge_attribute and _merge_entities_with_aliases) to prevent dirty data from overwriting PgSQL authoritative aliases - Add PgSQL→Neo4j alias sync after Neo4j write in write_tools to ensure Neo4j user entities always reflect the PgSQL source - Remove deduped_aliases (Neo4j history) from alias sync in extraction_orchestrator, only append newly extracted aliases to PgSQL - Guard Neo4j MERGE cypher to preserve existing aliases for user entities (name IN ['用户','我','User','I']) - Fix emotion_analytics_service query to use ExtractedEntity label and entity_type property --- .../core/memory/agent/utils/write_tools.py | 29 ++++- .../deduplication/deduped_and_disamb.py | 109 +++++------------- .../extraction_orchestrator.py | 44 ++----- api/app/repositories/neo4j/cypher_queries.py | 2 + api/app/services/emotion_analytics_service.py | 4 +- 5 files changed, 71 insertions(+), 117 deletions(-) diff --git a/api/app/core/memory/agent/utils/write_tools.py b/api/app/core/memory/agent/utils/write_tools.py index bae4643e..5e51beba 100644 --- a/api/app/core/memory/agent/utils/write_tools.py +++ b/api/app/core/memory/agent/utils/write_tools.py @@ -191,15 +191,34 @@ async def write( if success: logger.info("Successfully saved all data to Neo4j") - # 使用 Celery 异步任务触发聚类(不阻塞主流程) if all_entity_nodes: + end_user_id = all_entity_nodes[0].end_user_id + + # Neo4j 写入完成后,用 PgSQL 权威 aliases 覆盖 Neo4j 用户实体 + try: + from app.repositories.end_user_info_repository import EndUserInfoRepository + if end_user_id: + with get_db_context() as db_session: + info = EndUserInfoRepository(db_session).get_by_end_user_id(uuid.UUID(end_user_id)) + pg_aliases = info.aliases if info and info.aliases else [] + if pg_aliases: + await neo4j_connector.execute_query( + """ + MATCH (e:ExtractedEntity) + WHERE e.end_user_id = $end_user_id AND e.name IN ['用户', '我', 'User', 'I'] + SET e.aliases = $aliases + """, + end_user_id=end_user_id, aliases=pg_aliases, + ) + logger.info(f"[AliasSync] Neo4j 用户实体 aliases 已用 PgSQL 权威源覆盖: {pg_aliases}") + except Exception as sync_err: + logger.warning(f"[AliasSync] PgSQL→Neo4j aliases 同步失败(不影响主流程): {sync_err}") + + # 使用 Celery 异步任务触发聚类(不阻塞主流程) try: from app.tasks import run_incremental_clustering - end_user_id = all_entity_nodes[0].end_user_id new_entity_ids = [e.id for e in all_entity_nodes] - - # 异步提交 Celery 任务 task = run_incremental_clustering.apply_async( kwargs={ "end_user_id": end_user_id, @@ -207,7 +226,6 @@ async def write( "llm_model_id": str(memory_config.llm_model_id) if memory_config.llm_model_id else None, "embedding_model_id": str(memory_config.embedding_model_id) if memory_config.embedding_model_id else None, }, - # 设置任务优先级(低优先级,不影响主业务) priority=3, ) logger.info( @@ -215,7 +233,6 @@ async def write( f"task_id={task.id}, end_user_id={end_user_id}, entity_count={len(new_entity_ids)}" ) except Exception as e: - # 聚类任务提交失败不影响主流程 logger.error(f"[Clustering] 提交聚类任务失败(不影响主流程): {e}", exc_info=True) break diff --git a/api/app/core/memory/storage_services/extraction_engine/deduplication/deduped_and_disamb.py b/api/app/core/memory/storage_services/extraction_engine/deduplication/deduped_and_disamb.py index 7e0976fe..8f659a27 100644 --- a/api/app/core/memory/storage_services/extraction_engine/deduplication/deduped_and_disamb.py +++ b/api/app/core/memory/storage_services/extraction_engine/deduplication/deduped_and_disamb.py @@ -82,51 +82,35 @@ def _merge_attribute(canonical: ExtractedEntityNode, ent: ExtractedEntityNode): canonical.connect_strength = next(iter(pair)) # 别名合并(去重保序,使用标准化工具) + # 用户实体的 aliases 由 PgSQL end_user_info 作为唯一权威源,去重合并时不修改 try: canonical_name = (getattr(canonical, "name", "") or "").strip() - incoming_name = (getattr(ent, "name", "") or "").strip() - - # 收集所有需要合并的别名 - all_aliases = [] - - # 1. 添加canonical现有的别名 - existing = getattr(canonical, "aliases", []) or [] - all_aliases.extend(existing) - - # 2. 添加incoming实体的名称(如果不同于canonical的名称) - if incoming_name and incoming_name != canonical_name: - all_aliases.append(incoming_name) - - # 3. 添加incoming实体的所有别名 - incoming = getattr(ent, "aliases", []) or [] - all_aliases.extend(incoming) - - # 4. 标准化并去重(优先使用alias_utils工具函数) - try: - from app.core.memory.utils.alias_utils import normalize_aliases - canonical.aliases = normalize_aliases(canonical_name, all_aliases) - except Exception: - # 如果导入失败,使用增强的去重逻辑 - seen_normalized = set() - unique_aliases = [] + if canonical_name.lower() not in _USER_PLACEHOLDER_NAMES: + incoming_name = (getattr(ent, "name", "") or "").strip() - for alias in all_aliases: - if not alias: - continue - - alias_stripped = str(alias).strip() - if not alias_stripped or alias_stripped == canonical_name: - continue - - # 标准化:转小写用于去重判断 - alias_normalized = alias_stripped.lower() - - if alias_normalized not in seen_normalized: - seen_normalized.add(alias_normalized) - unique_aliases.append(alias_stripped) + # 收集所有需要合并的别名 + all_aliases = list(getattr(canonical, "aliases", []) or []) + if incoming_name and incoming_name != canonical_name: + all_aliases.append(incoming_name) + all_aliases.extend(getattr(ent, "aliases", []) or []) - # 排序并赋值 - canonical.aliases = sorted(unique_aliases) + try: + from app.core.memory.utils.alias_utils import normalize_aliases + canonical.aliases = normalize_aliases(canonical_name, all_aliases) + except Exception: + seen_normalized = set() + unique_aliases = [] + for alias in all_aliases: + if not alias: + continue + alias_stripped = str(alias).strip() + if not alias_stripped or alias_stripped == canonical_name: + continue + alias_normalized = alias_stripped.lower() + if alias_normalized not in seen_normalized: + seen_normalized.add(alias_normalized) + unique_aliases.append(alias_stripped) + canonical.aliases = sorted(unique_aliases) except Exception: pass @@ -733,66 +717,37 @@ def fuzzy_match( def _merge_entities_with_aliases(canonical: ExtractedEntityNode, losing: ExtractedEntityNode): - """ 模糊匹配中的实体合并。 + """模糊匹配中的实体合并(别名部分)。 - 合并策略: - 1. 保留canonical的主名称不变 - 2. 将losing的主名称添加为alias(如果不同) - 3. 合并两个实体的所有aliases - 4. 自动去重(case-insensitive)并排序 - - Args: - canonical: 规范实体(保留) - losing: 被合并实体(删除) - - Note: - 使用alias_utils.normalize_aliases进行标准化去重 + 用户实体的 aliases 由 PgSQL end_user_info 作为唯一权威源,跳过合并。 """ - # 获取规范实体的名称 canonical_name = (getattr(canonical, "name", "") or "").strip() + if canonical_name.lower() in _USER_PLACEHOLDER_NAMES: + return + losing_name = (getattr(losing, "name", "") or "").strip() - # 收集所有需要合并的别名 - all_aliases = [] - - # 1. 添加canonical现有的别名 - current_aliases = getattr(canonical, "aliases", []) or [] - all_aliases.extend(current_aliases) - - # 2. 添加losing实体的名称(如果不同于canonical的名称) + all_aliases = list(getattr(canonical, "aliases", []) or []) if losing_name and losing_name != canonical_name: all_aliases.append(losing_name) + all_aliases.extend(getattr(losing, "aliases", []) or []) - # 3. 添加losing实体的所有别名 - losing_aliases = getattr(losing, "aliases", []) or [] - all_aliases.extend(losing_aliases) - - # 4. 标准化并去重(使用标准化后的字符串进行去重) try: from app.core.memory.utils.alias_utils import normalize_aliases canonical.aliases = normalize_aliases(canonical_name, all_aliases) except Exception: - # 如果导入失败,使用增强的去重逻辑 - # 使用标准化后的字符串作为key进行去重 seen_normalized = set() unique_aliases = [] - for alias in all_aliases: if not alias: continue - alias_stripped = str(alias).strip() if not alias_stripped or alias_stripped == canonical_name: continue - - # 标准化:转小写用于去重判断 alias_normalized = alias_stripped.lower() - if alias_normalized not in seen_normalized: seen_normalized.add(alias_normalized) unique_aliases.append(alias_stripped) - - # 排序并赋值 canonical.aliases = sorted(unique_aliases) # ========== 主循环:遍历所有实体对进行模糊匹配 ========== diff --git a/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py b/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py index 5636dcb5..75fc87d2 100644 --- a/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py +++ b/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py @@ -1391,18 +1391,18 @@ class ExtractionOrchestrator: """ 将本轮提取的用户别名同步到 end_user 和 end_user_info 表。 - 注意:此方法在 Neo4j 写入之前调用,因此不能依赖 Neo4j 作为别名的权威数据源。 - 改为直接使用内存中去重后的 entity_nodes 的 aliases,与 PgSQL 已有的 aliases 合并。 + PgSQL end_user_info.aliases 是用户别名的唯一权威源。 + 此方法仅将本轮 LLM 从对话中新提取的别名增量追加到 PgSQL, + 不再从 Neo4j 二层去重合并历史别名,避免脏数据反向污染 PgSQL。 策略: - 1. 从内存中的 entity_nodes 提取本轮用户别名(current_aliases) - 2. 从去重后的 entity_nodes 中提取完整别名(含 Neo4j 二层去重合并的历史别名) - 3. 从 PgSQL end_user_info 读取已有的 aliases(db_aliases) - 4. 合并 db_aliases + deduped_aliases + current_aliases,去重保序 - 5. 写回 PgSQL + 1. 从本轮对话原始发言中提取用户别名(current_aliases) + 2. 从 PgSQL end_user_info 读取已有的 aliases(db_aliases) + 3. 合并 db_aliases + current_aliases,去重保序 + 4. 写回 PgSQL Args: - entity_nodes: 去重后的实体节点列表(内存中,含二层去重合并结果) + entity_nodes: 去重后的实体节点列表(内存中) dialog_data_list: 对话数据列表 """ try: @@ -1418,11 +1418,6 @@ class ExtractionOrchestrator: # 1. 提取本轮对话的用户别名(保持 LLM 提取的原始顺序,不排序) current_aliases = self._extract_current_aliases(entity_nodes, dialog_data_list) - # 1.5 从去重后的 entity_nodes 中提取完整别名 - # 二层去重会将 Neo4j 中已有的历史别名合并到 entity_nodes 中, - # 这里提取出来确保 PgSQL 与 Neo4j 的别名保持同步 - deduped_aliases = self._extract_deduped_entity_aliases(entity_nodes) - # 1.6 从 Neo4j 查询已有的 AI 助手别名,作为额外的排除源 # (防止 LLM 未提取出 AI 助手实体时,AI 别名泄漏到用户别名中) neo4j_assistant_aliases = await self._fetch_neo4j_assistant_aliases(end_user_id) @@ -1434,19 +1429,12 @@ class ExtractionOrchestrator: ] if len(current_aliases) < before_count: logger.info(f"通过 Neo4j AI 助手别名排除了 {before_count - len(current_aliases)} 个误归属别名") - # 同样过滤 deduped_aliases - deduped_aliases = [ - a for a in deduped_aliases - if a.strip().lower() not in neo4j_assistant_aliases - ] - if not current_aliases and not deduped_aliases: + if not current_aliases: logger.debug(f"本轮未提取到用户别名,跳过同步: end_user_id={end_user_id}") return logger.info(f"本轮对话提取的 aliases: {current_aliases}") - if deduped_aliases: - logger.info(f"去重后实体的完整 aliases(含历史): {deduped_aliases}") # 2. 同步到数据库 end_user_uuid = uuid.UUID(end_user_id) @@ -1457,21 +1445,15 @@ class ExtractionOrchestrator: logger.warning(f"未找到 end_user_id={end_user_id} 的用户记录") return - # 3. 从 PgSQL 读取已有 aliases 并与本轮合并 + # 3. 从 PgSQL 读取已有 aliases 并与本轮新增合并 info = EndUserInfoRepository(db).get_by_end_user_id(end_user_uuid) db_aliases = (info.aliases if info and info.aliases else []) # 过滤掉占位名称 db_aliases = [a for a in db_aliases if a.strip().lower() not in self.USER_PLACEHOLDER_NAMES] - # 合并:已有 + 去重后完整别名 + 本轮新增,去重保序 + # 合并:PgSQL 已有 + 本轮新增,去重保序(不再合并 Neo4j 历史别名) merged_aliases = list(db_aliases) seen_lower = {a.strip().lower() for a in merged_aliases} - # 先合并去重后实体的完整别名(含 Neo4j 历史别名) - for alias in deduped_aliases: - if alias.strip().lower() not in seen_lower: - merged_aliases.append(alias) - seen_lower.add(alias.strip().lower()) - # 再合并本轮新提取的别名 for alias in current_aliases: if alias.strip().lower() not in seen_lower: merged_aliases.append(alias) @@ -1505,9 +1487,7 @@ class ExtractionOrchestrator: info.aliases = merged_aliases logger.info(f"同步合并后 aliases 到 end_user_info: {merged_aliases}") else: - first_alias = current_aliases[0].strip() if current_aliases else ( - deduped_aliases[0].strip() if deduped_aliases else "" - ) + first_alias = current_aliases[0].strip() if current_aliases else "" # 确保 first_alias 不是占位名称 if first_alias and first_alias.lower() not in self.USER_PLACEHOLDER_NAMES: db.add(EndUserInfo( diff --git a/api/app/repositories/neo4j/cypher_queries.py b/api/app/repositories/neo4j/cypher_queries.py index 4b5273ac..daf04bcb 100644 --- a/api/app/repositories/neo4j/cypher_queries.py +++ b/api/app/repositories/neo4j/cypher_queries.py @@ -93,6 +93,8 @@ SET e.name = CASE WHEN entity.name IS NOT NULL AND entity.name <> '' THEN entity END, e.statement_id = CASE WHEN entity.statement_id IS NOT NULL AND entity.statement_id <> '' THEN entity.statement_id ELSE e.statement_id END, e.aliases = CASE + // 用户实体的 aliases 由 PgSQL end_user_info 作为唯一权威源,知识抽取完全不写入 + WHEN entity.name IN ['用户', '我', 'User', 'I'] THEN e.aliases WHEN entity.aliases IS NOT NULL AND size(entity.aliases) > 0 THEN CASE WHEN e.aliases IS NULL THEN entity.aliases diff --git a/api/app/services/emotion_analytics_service.py b/api/app/services/emotion_analytics_service.py index c226348e..9a215cd6 100644 --- a/api/app/services/emotion_analytics_service.py +++ b/api/app/services/emotion_analytics_service.py @@ -679,9 +679,9 @@ class EmotionAnalyticsService: # 查询用户的实体和标签 query = """ - MATCH (e:Entity) + MATCH (e:ExtractedEntity) WHERE e.end_user_id = $end_user_id - RETURN e.name as name, e.type as type + RETURN e.name as name, e.entity_type as type ORDER BY e.created_at DESC LIMIT 20 """ From 49e0801d1582c08bc7d746edd20d8314abc8bf29 Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Tue, 14 Apr 2026 18:06:56 +0800 Subject: [PATCH 2/2] refactor(memory): unify user placeholder names and harden alias sync logic - Replace hardcoded user placeholder name lists in write_tools and user_memory_service with shared _USER_PLACEHOLDER_NAMES constant - Filter user placeholder names during alias merging in _merge_attribute to prevent cross-role alias contamination on non-user entities - Use toLower() in Cypher query for case-insensitive name matching - Change PgSQL->Neo4j alias sync condition from 'if pg_aliases' to 'if info is not None' so empty aliases correctly clear stale data --- api/app/core/memory/agent/utils/write_tools.py | 8 ++++++-- .../deduplication/deduped_and_disamb.py | 9 ++++++--- api/app/services/user_memory_service.py | 3 ++- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/api/app/core/memory/agent/utils/write_tools.py b/api/app/core/memory/agent/utils/write_tools.py index 5e51beba..3b0ea1ee 100644 --- a/api/app/core/memory/agent/utils/write_tools.py +++ b/api/app/core/memory/agent/utils/write_tools.py @@ -14,6 +14,7 @@ from dotenv import load_dotenv from app.core.logging_config import get_agent_logger from app.core.memory.agent.utils.get_dialogs import get_chunked_dialogs +from app.core.memory.storage_services.extraction_engine.deduplication.deduped_and_disamb import _USER_PLACEHOLDER_NAMES from app.core.memory.storage_services.extraction_engine.extraction_orchestrator import ExtractionOrchestrator from app.core.memory.storage_services.extraction_engine.knowledge_extraction.memory_summary import \ memory_summary_generation @@ -201,14 +202,17 @@ async def write( with get_db_context() as db_session: info = EndUserInfoRepository(db_session).get_by_end_user_id(uuid.UUID(end_user_id)) pg_aliases = info.aliases if info and info.aliases else [] - if pg_aliases: + if info is not None: + # 将 Python 侧占位名集合作为参数传入,避免 Cypher 硬编码 + placeholder_names = list(_USER_PLACEHOLDER_NAMES) await neo4j_connector.execute_query( """ MATCH (e:ExtractedEntity) - WHERE e.end_user_id = $end_user_id AND e.name IN ['用户', '我', 'User', 'I'] + WHERE e.end_user_id = $end_user_id AND toLower(e.name) IN $placeholder_names SET e.aliases = $aliases """, end_user_id=end_user_id, aliases=pg_aliases, + placeholder_names=placeholder_names, ) logger.info(f"[AliasSync] Neo4j 用户实体 aliases 已用 PgSQL 权威源覆盖: {pg_aliases}") except Exception as sync_err: diff --git a/api/app/core/memory/storage_services/extraction_engine/deduplication/deduped_and_disamb.py b/api/app/core/memory/storage_services/extraction_engine/deduplication/deduped_and_disamb.py index 8f659a27..715f190c 100644 --- a/api/app/core/memory/storage_services/extraction_engine/deduplication/deduped_and_disamb.py +++ b/api/app/core/memory/storage_services/extraction_engine/deduplication/deduped_and_disamb.py @@ -88,11 +88,14 @@ def _merge_attribute(canonical: ExtractedEntityNode, ent: ExtractedEntityNode): if canonical_name.lower() not in _USER_PLACEHOLDER_NAMES: incoming_name = (getattr(ent, "name", "") or "").strip() - # 收集所有需要合并的别名 + # 收集所有需要合并的别名,过滤掉用户占位名避免污染非用户实体 all_aliases = list(getattr(canonical, "aliases", []) or []) - if incoming_name and incoming_name != canonical_name: + if incoming_name and incoming_name != canonical_name and incoming_name.lower() not in _USER_PLACEHOLDER_NAMES: all_aliases.append(incoming_name) - all_aliases.extend(getattr(ent, "aliases", []) or []) + all_aliases.extend( + a for a in (getattr(ent, "aliases", []) or []) + if a and a.strip().lower() not in _USER_PLACEHOLDER_NAMES + ) try: from app.core.memory.utils.alias_utils import normalize_aliases diff --git a/api/app/services/user_memory_service.py b/api/app/services/user_memory_service.py index cc18447e..9389ecfa 100644 --- a/api/app/services/user_memory_service.py +++ b/api/app/services/user_memory_service.py @@ -14,6 +14,7 @@ from pydantic import BaseModel, Field from sqlalchemy.orm import Session from app.core.logging_config import get_logger +from app.core.memory.storage_services.extraction_engine.deduplication.deduped_and_disamb import _USER_PLACEHOLDER_NAMES from app.core.memory.utils.llm.llm_utils import MemoryClientFactory from app.db import get_db_context from app.repositories.conversation_repository import ConversationRepository @@ -473,7 +474,7 @@ class UserMemoryService: allowed_fields = {'other_name', 'aliases', 'meta_data'} # 用户占位名称黑名单,不允许作为 other_name 或出现在 aliases 中 - _user_placeholder_names = {'用户', '我', 'User', 'I'} + _user_placeholder_names = _USER_PLACEHOLDER_NAMES # 过滤 other_name:不允许设置为占位名称 if 'other_name' in update_data and update_data['other_name'] and update_data['other_name'].strip() in _user_placeholder_names: