diff --git a/api/app/core/memory/pipelines/write_pipeline.py b/api/app/core/memory/pipelines/write_pipeline.py index e24eef6d..e57ff177 100644 --- a/api/app/core/memory/pipelines/write_pipeline.py +++ b/api/app/core/memory/pipelines/write_pipeline.py @@ -455,16 +455,18 @@ class WritePipeline: # ────────────────────────────────────────────── def _merge_alias_in_memory(self, result: ExtractionResult) -> None: - """别名归并(内存侧):处理 predicate="别名属于" 的边。 + """别名归并(内存侧):处理 predicate="别名属于" 和 predicate="别名失效" 的边。 在写入 Neo4j 之前执行,确保写入的数据已经完成别名归并: - - 将别名实体的 name 追加到目标实体的 aliases - - 将别名实体的 description 拼接到目标实体的 description + - 别名属于:将别名实体的 name 追加到目标实体的 aliases + - 别名属于:将别名实体的 description 拼接到目标实体的 description + - 别名失效:从目标实体的 aliases 中移除对应的旧别名 - 重定向指向别名节点的边到目标节点 纯内存操作,不涉及 Neo4j。 """ ALIAS_PREDICATE = "别名属于" + ALIAS_INVALID_PREDICATE = "别名失效" alias_edges = [ e @@ -472,15 +474,22 @@ class WritePipeline: if getattr(e, "relation_type", "") == ALIAS_PREDICATE or getattr(e, "predicate", "") == ALIAS_PREDICATE ] + invalid_alias_edges = [ + e + for e in result.entity_entity_edges + if getattr(e, "relation_type", "") == ALIAS_INVALID_PREDICATE + or getattr(e, "predicate", "") == ALIAS_INVALID_PREDICATE + ] - if not alias_edges: - logger.debug("[AliasMerge] 无 '别名属于' 关系,跳过") + if not alias_edges and not invalid_alias_edges: + logger.debug("[AliasMerge] 无 '别名属于'/'别名失效' 关系,跳过") return try: entity_map = {e.id: e for e in result.entity_nodes} alias_to_target: dict[str, str] = {} + # ── 处理 别名属于:追加 aliases ── for edge in alias_edges: source_node = entity_map.get(edge.source) target_node = entity_map.get(edge.target) @@ -507,29 +516,52 @@ class WritePipeline: f"{tgt_desc};{src_desc}" if tgt_desc else src_desc ) + # ── 处理 别名失效:从 aliases 中移除旧别名 ── + invalid_alias_to_target: dict[str, str] = {} + for edge in invalid_alias_edges: + source_node = entity_map.get(edge.source) + target_node = entity_map.get(edge.target) + if not source_node or not target_node: + continue + + invalid_alias_to_target[edge.source] = edge.target + + # 从 target.aliases 中移除 source.name(忽略大小写) + invalid_name = (source_node.name or "").strip() + if invalid_name and target_node.aliases: + target_node.aliases = [ + a for a in target_node.aliases + if a.lower() != invalid_name.lower() + ] + logger.debug( + f"[AliasMerge] 从 '{target_node.name}' 的 aliases 中移除失效别名 '{invalid_name}'" + ) + # 重定向指向别名节点的边到目标节点 - alias_ids = set(alias_to_target.keys()) + alias_ids = set(alias_to_target.keys()) | set(invalid_alias_to_target.keys()) + all_alias_map = {**alias_to_target, **invalid_alias_to_target} redirected_ee_count = 0 redirected_se_count = 0 for edge in result.entity_entity_edges: rel_type = getattr(edge, "relation_type", "") - if rel_type == ALIAS_PREDICATE: + if rel_type in (ALIAS_PREDICATE, ALIAS_INVALID_PREDICATE): continue if edge.source in alias_ids: - edge.source = alias_to_target[edge.source] + edge.source = all_alias_map[edge.source] redirected_ee_count += 1 if edge.target in alias_ids: - edge.target = alias_to_target[edge.target] + edge.target = all_alias_map[edge.target] redirected_ee_count += 1 for edge in result.stmt_entity_edges: if edge.target in alias_ids: - edge.target = alias_to_target[edge.target] + edge.target = all_alias_map[edge.target] redirected_se_count += 1 logger.info( f"[AliasMerge] 内存归并完成,处理 {len(alias_edges)} 条 '别名属于' 边," + f"{len(invalid_alias_edges)} 条 '别名失效' 边," f"重定向 entity_entity 边 {redirected_ee_count} 次," f"重定向 stmt_entity 边 {redirected_se_count} 次" ) @@ -574,6 +606,7 @@ class WritePipeline: "end_user_id": self.end_user_id, "entity_ids": [e.id for e in result.entity_nodes], "llm_model_id": llm_model_id, + "snapshot_dir": snapshot_dir, }, ) diff --git a/api/app/core/memory/utils/debug/write_snapshot_recorder.py b/api/app/core/memory/utils/debug/write_snapshot_recorder.py index 79f010f9..6b3bac19 100644 --- a/api/app/core/memory/utils/debug/write_snapshot_recorder.py +++ b/api/app/core/memory/utils/debug/write_snapshot_recorder.py @@ -125,6 +125,44 @@ class WriteSnapshotRecorder: }, ) + # ── Stage 8: 别名归并后(异步,由 Celery PostStore 任务写入) ── + + @staticmethod + def save_alias_merge_result(snapshot_dir: str, entity_rows: List[Dict]) -> None: + """将别名归并+节点删除后的 Neo4j 实体状态写入 8_after_alias_merge.json。 + + 由 Celery post_store_dedup_and_alias_merge 任务在完成归并和删除后调用, + 直接写入已有的 snapshot 目录,无需重建 WriteSnapshotRecorder 实例。 + + Args: + snapshot_dir: 主流水线创建的 snapshot 目录绝对路径。 + entity_rows: 从 Neo4j 查询到的实体属性列表,每项包含 + id / name / entity_type / description / aliases 字段。 + """ + import json + from pathlib import Path + + try: + path = Path(snapshot_dir) / "8_after_alias_merge.json" + data = { + "entity_nodes": [ + { + "id": row.get("id"), + "name": row.get("name"), + "entity_type": row.get("entity_type"), + "description": row.get("description"), + "aliases": row.get("aliases", []), + } + for row in entity_rows + ], + "entity_count": len(entity_rows), + } + with open(path, "w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False, indent=2, default=str) + logger.debug(f"[Snapshot] 8_after_alias_merge → {path}") + except Exception as e: + logger.warning(f"[Snapshot] 保存 8_after_alias_merge 失败: {e}") + # ── Stage 0: 汇总 ── def record_summary(self, stats: Dict[str, int]) -> None: diff --git a/api/app/core/memory/utils/prompt/prompts/extract_triplet.jinja2 b/api/app/core/memory/utils/prompt/prompts/extract_triplet.jinja2 index 5cf8ccb0..f24f9328 100644 --- a/api/app/core/memory/utils/prompt/prompts/extract_triplet.jinja2 +++ b/api/app/core/memory/utils/prompt/prompts/extract_triplet.jinja2 @@ -234,11 +234,12 @@ Each relation class now keeps only one canonical `covered_predicates` value. Onc {% endif %} - `命名关系` - - definition: 表达实体名称、别名、称呼之间的对应关系。 - - covered_predicates: `别名属于` - - positive_examples: `山哥 -> 别名属于 -> 用户`、`多多 -> 别名属于 -> 用户的小狗` + - definition: 表达实体名称、别名、称呼之间的对应关系,包括新增别名和别名失效两种子语义。 + - covered_predicates: `别名属于`、`别名失效` + - positive_examples (`别名属于`): `山哥 -> 别名属于 -> 用户`、`多多 -> 别名属于 -> 用户的小狗` + - positive_examples (`别名失效`): `老陈 -> 别名失效 -> 用户`(当对话明确表达"不再叫X了"时使用) - negative_examples: `导师 -> 别名属于 -> 用户`、`好人 -> 别名属于 -> 用户` - - notes: 只处理名字性表达,不处理角色、职业、评价词。 + - notes: 只处理名字性表达,不处理角色、职业、评价词。`别名失效` 仅在对话**明确**表达某个别名已不再适用时使用,不要因为出现新别名就自动推断旧别名失效。 - status: `enabled` - `归属身份关系` @@ -341,6 +342,7 @@ Each relation class now keeps only one canonical `covered_predicates` value. Onc 只能使用以下中文关系类型。如果没有完全匹配的关系,请选择最接近的一项,不要发明新关系。 - `别名属于`: 别名指向其对应的规范实体 +- `别名失效`: 某个别名已不再适用于该实体(仅在对话明确表达"不再叫X"时使用) - `属于类型`: 实体属于某种类别、身份、职业、角色或归属对象 - `位于`: 实体位于某地点、场所或空间位置 - `前往`: 主体前往某个地点、场所、组织或活动对象 @@ -477,7 +479,7 @@ Each relation class now keeps only one canonical `covered_predicates` value. Onc - Do not use ordinary time expressions as triplet objects. {% endif %} -**Alias Relation (`别名属于`):** +**Alias Relation (`别名属于` / `别名失效`):** {% if language == "zh" %} - 当多个名字明确指向同一实体时,使用 `别名属于`。 @@ -488,6 +490,8 @@ Each relation class now keeps only one canonical `covered_predicates` value. Onc - 如果所有格同时明确表达持有关系,也应抽取 `X -> 拥有 -> X 的 Y`。 - 在用户自指场景中,规范实体应为已经规范化后的 `用户`。 - 不要把角色、职业、身份、类别、夸赞、评价或其他非名字性描述抽成 `别名属于`。 +- 当对话**明确**表达某个别名已不再适用时(如"不叫X了""X这个称呼已经不用了"),使用 `别名失效`,方向为 `旧别名 -> 别名失效 -> canonical entity`。 +- 不要因为出现新别名就自动推断旧别名失效;只有对话中有明确的否定/废弃表达时才使用 `别名失效`。 {% else %} - Use `别名属于` when multiple names clearly refer to the same entity. - Direction is always `alias -> 别名属于 -> canonical entity`. @@ -497,6 +501,8 @@ Each relation class now keeps only one canonical `covered_predicates` value. Onc - If the possessive phrase also explicitly expresses possession, also extract `X -> 拥有 -> X's Y`. - In user self-reference cases, the canonical entity should be the normalized user entity `用户`. - Do not use `别名属于` for roles, occupations, identities, categories, compliments, evaluations, or other non-name descriptions. +- Use `别名失效` when the conversation **explicitly** states that an alias is no longer applicable (e.g., "no longer called X", "X is not used anymore"). Direction is `old_alias -> 别名失效 -> canonical entity`. +- Do not infer alias invalidation just because a new alias appears; only use `别名失效` when there is an explicit negation or abandonment expression in the conversation. {% endif %} **subject_name / object_name Consistency:** diff --git a/api/app/repositories/end_user_info_repository.py b/api/app/repositories/end_user_info_repository.py index cda4212c..5517299d 100644 --- a/api/app/repositories/end_user_info_repository.py +++ b/api/app/repositories/end_user_info_repository.py @@ -138,3 +138,41 @@ class EndUserInfoRepository: f"aliases_count={len(end_user_info.aliases or [])}" ) return end_user_info + + def remove_aliases( + self, + end_user_id: uuid.UUID, + aliases_to_remove: List[str], + ) -> Optional["EndUserInfo"]: + """从用户别名列表中移除指定别名(忽略大小写)。 + + Args: + end_user_id: 终端用户 ID + aliases_to_remove: 需要移除的别名列表 + + Returns: + 更新后的 EndUserInfo,若记录不存在则返回 None + """ + if not aliases_to_remove: + return self.get_by_end_user_id(end_user_id) + + end_user_info = self.get_by_end_user_id(end_user_id) + if not end_user_info: + logger.warning(f"[EndUserInfo] 记录不存在,跳过别名移除: end_user_id={end_user_id}") + return None + + remove_lower = {a.strip().lower() for a in aliases_to_remove if a.strip()} + existing = list(end_user_info.aliases or []) + new_aliases = [a for a in existing if a.lower() not in remove_lower] + + if len(new_aliases) == len(existing): + return end_user_info + + end_user_info.aliases = new_aliases + self.db.commit() + self.db.refresh(end_user_info) + logger.info( + f"[EndUserInfo] 别名移除完成: end_user_id={end_user_id}, " + f"removed={aliases_to_remove}, remaining={new_aliases}" + ) + return end_user_info diff --git a/api/app/repositories/neo4j/cypher_queries.py b/api/app/repositories/neo4j/cypher_queries.py index d854e971..c2703b7d 100644 --- a/api/app/repositories/neo4j/cypher_queries.py +++ b/api/app/repositories/neo4j/cypher_queries.py @@ -1379,16 +1379,16 @@ RETURN source.name AS merged_alias, target.name AS target_name, new_aliases AS u # 2. STATEMENT_ENTITY:陈述句 → 别名节点 # 对于每条需要重定向的边,创建一条指向用户节点的新边(复制所有属性),然后删除旧边。 REDIRECT_ALIAS_EDGES = """ -// 找到所有 别名→用户 的映射 +// 找到所有 别名→用户 的映射(包含 别名属于 和 别名失效 两种 predicate) MATCH (alias:ExtractedEntity {end_user_id: $end_user_id})-[ar:EXTRACTED_RELATIONSHIP]->(user:ExtractedEntity {end_user_id: $end_user_id}) -WHERE ar.predicate = '别名属于' +WHERE ar.predicate IN ['别名属于', '别名失效'] WITH collect({alias_id: elementId(alias), user_id: elementId(user), alias_eid: alias.id, user_eid: user.id}) AS mappings // 1. 重定向 EXTRACTED_RELATIONSHIP 边:别名节点作为 target 的情况 UNWIND mappings AS m MATCH (other)-[r:EXTRACTED_RELATIONSHIP]->(alias:ExtractedEntity {end_user_id: $end_user_id}) WHERE alias.id = m.alias_eid - AND r.predicate <> '别名属于' + AND NOT (r.predicate IN ['别名属于', '别名失效']) AND other.id <> m.user_eid WITH m, other, r, alias MATCH (user:ExtractedEntity {id: m.user_eid, end_user_id: $end_user_id}) @@ -1399,10 +1399,10 @@ WITH count(*) AS redirected_incoming // 2. 重定向 EXTRACTED_RELATIONSHIP 边:别名节点作为 source 的情况 MATCH (alias:ExtractedEntity {end_user_id: $end_user_id})-[ar2:EXTRACTED_RELATIONSHIP]->(user2:ExtractedEntity {end_user_id: $end_user_id}) -WHERE ar2.predicate = '别名属于' +WHERE ar2.predicate IN ['别名属于', '别名失效'] WITH alias, user2, redirected_incoming MATCH (alias)-[r:EXTRACTED_RELATIONSHIP]->(other) -WHERE r.predicate <> '别名属于' +WHERE NOT (r.predicate IN ['别名属于', '别名失效']) AND other.id <> user2.id WITH user2, other, r, redirected_incoming CREATE (user2)-[nr:EXTRACTED_RELATIONSHIP]->(other) @@ -1412,7 +1412,7 @@ WITH redirected_incoming, count(*) AS redirected_outgoing // 3. 重定向 STATEMENT_ENTITY 边:陈述句 → 别名节点 MATCH (alias:ExtractedEntity {end_user_id: $end_user_id})-[ar3:EXTRACTED_RELATIONSHIP]->(user3:ExtractedEntity {end_user_id: $end_user_id}) -WHERE ar3.predicate = '别名属于' +WHERE ar3.predicate IN ['别名属于', '别名失效'] WITH alias, user3, redirected_incoming, redirected_outgoing MATCH (stmt)-[r:STATEMENT_ENTITY]->(alias) WITH user3, stmt, r, redirected_incoming, redirected_outgoing @@ -1423,6 +1423,32 @@ DELETE r RETURN redirected_incoming, redirected_outgoing, count(*) AS redirected_stmt """ +# 删除别名节点:在别名归并和边重定向完成后,删除所有 predicate="别名属于" 关系的 source 节点。 +# 此时这些节点的其他边已被 REDIRECT_ALIAS_EDGES 重定向完毕, +# 唯一剩余的边就是 (alias)-[:EXTRACTED_RELATIONSHIP {predicate:'别名属于'}]->(user), +# 使用 DETACH DELETE 一并删除节点和该关系。 +DELETE_ALIAS_NODES = """ +MATCH (alias:ExtractedEntity {end_user_id: $end_user_id})-[r:EXTRACTED_RELATIONSHIP]->(user:ExtractedEntity {end_user_id: $end_user_id}) +WHERE r.predicate IN ['别名属于', '别名失效'] +WITH alias, count(r) AS rel_count +DETACH DELETE alias +RETURN count(alias) AS deleted_count +""" + +# 失效别名处理:将 predicate="别名失效" 的 source.name 从 target.aliases 中移除。 +# 在 MERGE_ALIAS_BELONGS_TO(追加新别名)之后、DELETE_ALIAS_NODES(删除节点)之前执行。 +REMOVE_INVALID_ALIASES = """ +MATCH (source:ExtractedEntity {end_user_id: $end_user_id})-[r:EXTRACTED_RELATIONSHIP]->(target:ExtractedEntity {end_user_id: $end_user_id}) +WHERE r.predicate = '别名失效' +WITH source, target, + coalesce(target.aliases, []) AS existing_aliases, + source.name AS invalid_name + +SET target.aliases = [a IN existing_aliases WHERE toLower(a) <> toLower(invalid_name)] + +RETURN source.name AS removed_alias, target.name AS target_name +""" + CHECK_COMMUNITY_IS_COMPLETE_WITH_EMBEDDING = """ MATCH (c:Community {community_id: $community_id, end_user_id: $end_user_id}) RETURN ( diff --git a/api/app/tasks.py b/api/app/tasks.py index 0fe1d4ea..5e4a7862 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -1582,6 +1582,7 @@ def post_store_dedup_and_alias_merge_task( end_user_id: str, entity_ids: List[str], llm_model_id: Optional[str] = None, + snapshot_dir: Optional[str] = None, ) -> Dict[str, Any]: """Celery task: 写入后异步执行 Neo4j 别名归并 + 第二层去重。 @@ -1608,13 +1609,15 @@ def post_store_dedup_and_alias_merge_task( from app.repositories.neo4j.cypher_queries import ( MERGE_ALIAS_BELONGS_TO, REDIRECT_ALIAS_EDGES, + DELETE_ALIAS_NODES, + REMOVE_INVALID_ALIASES, ) connector = Neo4jConnector() result_info: Dict[str, Any] = {} try: - # ── 1. Neo4j 别名归并 ── + # ── 1. Neo4j 别名归并(追加新别名) ── try: records = await connector.execute_query( MERGE_ALIAS_BELONGS_TO, @@ -1627,6 +1630,28 @@ def post_store_dedup_and_alias_merge_task( logger.warning(f"[PostStore] Neo4j 别名归并失败: {e}") result_info["alias_merge_error"] = str(e) + # ── 1.5 Neo4j 失效别名移除(从 aliases 中删除旧别名) ── + try: + invalid_records = await connector.execute_query( + REMOVE_INVALID_ALIASES, + end_user_id=end_user_id, + ) + invalid_count = len(invalid_records) if invalid_records else 0 + result_info["invalid_aliases_removed"] = invalid_count + logger.info(f"[PostStore] 失效别名移除完成,影响 {invalid_count} 条记录") + + # 同步删除 PostgreSQL end_user_info.aliases 中的失效别名 + if invalid_records: + removed_names = [ + r.get("removed_alias") for r in invalid_records + if r.get("removed_alias") + ] + if removed_names: + _remove_invalid_aliases_pg(end_user_id, removed_names) + except Exception as e: + logger.warning(f"[PostStore] 失效别名移除失败: {e}") + result_info["invalid_alias_error"] = str(e) + # ── 2. Neo4j 边重定向 ── try: redirect_records = await connector.execute_query( @@ -1640,7 +1665,41 @@ def post_store_dedup_and_alias_merge_task( logger.warning(f"[PostStore] Neo4j 边重定向失败: {e}") result_info["redirect_error"] = str(e) - # ── 3. 第二层去重(与 Neo4j 已有实体联合去重) ── + # ── 3. 删除别名节点及"别名属于"关系 ── + try: + delete_records = await connector.execute_query( + DELETE_ALIAS_NODES, + end_user_id=end_user_id, + ) + deleted_count = delete_records[0].get("deleted_count", 0) if delete_records else 0 + result_info["alias_nodes_deleted"] = deleted_count + logger.info(f"[PostStore] 别名节点删除完成,删除 {deleted_count} 个节点") + except Exception as e: + logger.warning(f"[PostStore] 别名节点删除失败: {e}") + result_info["alias_delete_error"] = str(e) + + # ── 3.5 Snapshot: 别名归并+删除后的实体状态 ── + if snapshot_dir: + try: + snapshot_query = """ + UNWIND $entity_ids AS eid + MATCH (e:ExtractedEntity {id: eid}) + RETURN e.id AS id, e.name AS name, + e.entity_type AS entity_type, + e.description AS description, + coalesce(e.aliases, []) AS aliases + """ + snap_records = await connector.execute_query( + snapshot_query, entity_ids=entity_ids + ) + entity_rows = [dict(r) for r in snap_records] if snap_records else [] + from app.core.memory.utils.debug.write_snapshot_recorder import WriteSnapshotRecorder + WriteSnapshotRecorder.save_alias_merge_result(snapshot_dir, entity_rows) + logger.info(f"[PostStore] Snapshot 8_after_alias_merge 已写入,实体数={len(entity_rows)}") + except Exception as e: + logger.warning(f"[PostStore] Snapshot 写入失败(不影响主流程): {e}") + + # ── 4. 第二层去重(与 Neo4j 已有实体联合去重) ── try: from app.core.memory.storage_services.extraction_engine.deduplication.second_layer_dedup import ( second_layer_dedup_and_merge_with_neo4j, @@ -1811,6 +1870,38 @@ def _sync_end_user_info_pg( ) +def _remove_invalid_aliases_pg( + end_user_id: str, + aliases_to_remove: List[str], +) -> None: + """将失效别名从 PostgreSQL end_user_info.aliases 中移除。 + + 失败只记日志,不抛异常,不影响主流程。 + """ + try: + import uuid as _uuid + from app.db import get_db_context + from app.repositories.end_user_info_repository import EndUserInfoRepository + + eu_uuid = _uuid.UUID(end_user_id) + with get_db_context() as db: + info_repo = EndUserInfoRepository(db) + info_repo.remove_aliases( + end_user_id=eu_uuid, + aliases_to_remove=aliases_to_remove, + ) + logger.info( + f"[PostStore][PG] 失效别名已从 end_user_info 移除: " + f"end_user_id={end_user_id}, removed={aliases_to_remove}" + ) + except Exception as e: + logger.warning( + f"[PostStore][PG] 移除失效别名失败(不影响主流程): " + f"end_user_id={end_user_id}, error={e}", + exc_info=True, + ) + + @celery_app.task( bind=True, name="app.tasks.extract_metadata_batch",