feat(memory): add alias invalidation support for entity alias management

Introduce the `别名失效` predicate to handle cases where an alias is
explicitly no longer applicable to an entity.

Changes:
- write_pipeline.py: extend _merge_alias_in_memory to process
  `别名失效` edges — removes invalidated alias names from target
  entity's aliases list in-memory before Neo4j write
- cypher_queries.py: add REMOVE_INVALID_ALIASES and DELETE_ALIAS_NODES
  queries; update REDIRECT_ALIAS_EDGES to handle both `别名属于` and
  `别名失效` predicates
- tasks.py: add step 1.5 in post_store_dedup_and_alias_merge_task to
  execute REMOVE_INVALID_ALIASES and sync removals to PostgreSQL;
  add step 3 to delete alias nodes after edge redirection; add
  snapshot step 3.5 for post-merge entity state; pass snapshot_dir
  to the task
- end_user_info_repository.py: add remove_aliases() method to remove
  specified aliases from end_user_info.aliases (case-insensitive)
- write_snapshot_recorder.py: add save_alias_merge_result() static
  method to write stage 8 snapshot after alias merge and deletion
- extract_triplet.jinja2: document `别名失效` predicate with usage
  rules — only use when conversation explicitly negates an alias
This commit is contained in:
lanceyq
2026-05-07 20:07:53 +08:00
parent e3ab19dd4f
commit aa9eb66668
6 changed files with 255 additions and 23 deletions

View File

@@ -455,16 +455,18 @@ class WritePipeline:
# ──────────────────────────────────────────────
def _merge_alias_in_memory(self, result: ExtractionResult) -> None:
"""别名归并(内存侧):处理 predicate="别名属于" 的边。
"""别名归并(内存侧):处理 predicate="别名属于" 和 predicate="别名失效" 的边。
在写入 Neo4j 之前执行,确保写入的数据已经完成别名归并:
- 将别名实体的 name 追加到目标实体的 aliases
- 将别名实体的 description 拼接到目标实体的 description
- 别名属于:将别名实体的 name 追加到目标实体的 aliases
- 别名属于:将别名实体的 description 拼接到目标实体的 description
- 别名失效:从目标实体的 aliases 中移除对应的旧别名
- 重定向指向别名节点的边到目标节点
纯内存操作,不涉及 Neo4j。
"""
ALIAS_PREDICATE = "别名属于"
ALIAS_INVALID_PREDICATE = "别名失效"
alias_edges = [
e
@@ -472,15 +474,22 @@ class WritePipeline:
if getattr(e, "relation_type", "") == ALIAS_PREDICATE
or getattr(e, "predicate", "") == ALIAS_PREDICATE
]
invalid_alias_edges = [
e
for e in result.entity_entity_edges
if getattr(e, "relation_type", "") == ALIAS_INVALID_PREDICATE
or getattr(e, "predicate", "") == ALIAS_INVALID_PREDICATE
]
if not alias_edges:
logger.debug("[AliasMerge] 无 '别名属于' 关系,跳过")
if not alias_edges and not invalid_alias_edges:
logger.debug("[AliasMerge] 无 '别名属于'/'别名失效' 关系,跳过")
return
try:
entity_map = {e.id: e for e in result.entity_nodes}
alias_to_target: dict[str, str] = {}
# ── 处理 别名属于:追加 aliases ──
for edge in alias_edges:
source_node = entity_map.get(edge.source)
target_node = entity_map.get(edge.target)
@@ -507,29 +516,52 @@ class WritePipeline:
f"{tgt_desc}{src_desc}" if tgt_desc else src_desc
)
# ── 处理 别名失效:从 aliases 中移除旧别名 ──
invalid_alias_to_target: dict[str, str] = {}
for edge in invalid_alias_edges:
source_node = entity_map.get(edge.source)
target_node = entity_map.get(edge.target)
if not source_node or not target_node:
continue
invalid_alias_to_target[edge.source] = edge.target
# 从 target.aliases 中移除 source.name忽略大小写
invalid_name = (source_node.name or "").strip()
if invalid_name and target_node.aliases:
target_node.aliases = [
a for a in target_node.aliases
if a.lower() != invalid_name.lower()
]
logger.debug(
f"[AliasMerge] 从 '{target_node.name}' 的 aliases 中移除失效别名 '{invalid_name}'"
)
# 重定向指向别名节点的边到目标节点
alias_ids = set(alias_to_target.keys())
alias_ids = set(alias_to_target.keys()) | set(invalid_alias_to_target.keys())
all_alias_map = {**alias_to_target, **invalid_alias_to_target}
redirected_ee_count = 0
redirected_se_count = 0
for edge in result.entity_entity_edges:
rel_type = getattr(edge, "relation_type", "")
if rel_type == ALIAS_PREDICATE:
if rel_type in (ALIAS_PREDICATE, ALIAS_INVALID_PREDICATE):
continue
if edge.source in alias_ids:
edge.source = alias_to_target[edge.source]
edge.source = all_alias_map[edge.source]
redirected_ee_count += 1
if edge.target in alias_ids:
edge.target = alias_to_target[edge.target]
edge.target = all_alias_map[edge.target]
redirected_ee_count += 1
for edge in result.stmt_entity_edges:
if edge.target in alias_ids:
edge.target = alias_to_target[edge.target]
edge.target = all_alias_map[edge.target]
redirected_se_count += 1
logger.info(
f"[AliasMerge] 内存归并完成,处理 {len(alias_edges)}'别名属于' 边,"
f"{len(invalid_alias_edges)}'别名失效' 边,"
f"重定向 entity_entity 边 {redirected_ee_count} 次,"
f"重定向 stmt_entity 边 {redirected_se_count}"
)
@@ -574,6 +606,7 @@ class WritePipeline:
"end_user_id": self.end_user_id,
"entity_ids": [e.id for e in result.entity_nodes],
"llm_model_id": llm_model_id,
"snapshot_dir": snapshot_dir,
},
)

View File

@@ -125,6 +125,44 @@ class WriteSnapshotRecorder:
},
)
# ── Stage 8: 别名归并后(异步,由 Celery PostStore 任务写入) ──
@staticmethod
def save_alias_merge_result(snapshot_dir: str, entity_rows: List[Dict]) -> None:
"""将别名归并+节点删除后的 Neo4j 实体状态写入 8_after_alias_merge.json。
由 Celery post_store_dedup_and_alias_merge 任务在完成归并和删除后调用,
直接写入已有的 snapshot 目录,无需重建 WriteSnapshotRecorder 实例。
Args:
snapshot_dir: 主流水线创建的 snapshot 目录绝对路径。
entity_rows: 从 Neo4j 查询到的实体属性列表,每项包含
id / name / entity_type / description / aliases 字段。
"""
import json
from pathlib import Path
try:
path = Path(snapshot_dir) / "8_after_alias_merge.json"
data = {
"entity_nodes": [
{
"id": row.get("id"),
"name": row.get("name"),
"entity_type": row.get("entity_type"),
"description": row.get("description"),
"aliases": row.get("aliases", []),
}
for row in entity_rows
],
"entity_count": len(entity_rows),
}
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2, default=str)
logger.debug(f"[Snapshot] 8_after_alias_merge → {path}")
except Exception as e:
logger.warning(f"[Snapshot] 保存 8_after_alias_merge 失败: {e}")
# ── Stage 0: 汇总 ──
def record_summary(self, stats: Dict[str, int]) -> None:

View File

@@ -234,11 +234,12 @@ Each relation class now keeps only one canonical `covered_predicates` value. Onc
{% endif %}
- `命名关系`
- definition: 表达实体名称、别名、称呼之间的对应关系。
- covered_predicates: `别名属于`
- positive_examples: `山哥 -> 别名属于 -> 用户`、`多多 -> 别名属于 -> 用户的小狗`
- definition: 表达实体名称、别名、称呼之间的对应关系,包括新增别名和别名失效两种子语义
- covered_predicates: `别名属于`、`别名失效`
- positive_examples (`别名属于`): `山哥 -> 别名属于 -> 用户`、`多多 -> 别名属于 -> 用户的小狗`
- positive_examples (`别名失效`): `老陈 -> 别名失效 -> 用户`(当对话明确表达"不再叫X了"时使用)
- negative_examples: `导师 -> 别名属于 -> 用户`、`好人 -> 别名属于 -> 用户`
- notes: 只处理名字性表达,不处理角色、职业、评价词。
- notes: 只处理名字性表达,不处理角色、职业、评价词。`别名失效` 仅在对话**明确**表达某个别名已不再适用时使用,不要因为出现新别名就自动推断旧别名失效。
- status: `enabled`
- `归属身份关系`
@@ -341,6 +342,7 @@ Each relation class now keeps only one canonical `covered_predicates` value. Onc
只能使用以下中文关系类型。如果没有完全匹配的关系,请选择最接近的一项,不要发明新关系。
- `别名属于`: 别名指向其对应的规范实体
- `别名失效`: 某个别名已不再适用于该实体(仅在对话明确表达"不再叫X"时使用)
- `属于类型`: 实体属于某种类别、身份、职业、角色或归属对象
- `位于`: 实体位于某地点、场所或空间位置
- `前往`: 主体前往某个地点、场所、组织或活动对象
@@ -477,7 +479,7 @@ Each relation class now keeps only one canonical `covered_predicates` value. Onc
- Do not use ordinary time expressions as triplet objects.
{% endif %}
**Alias Relation (`别名属于`):**
**Alias Relation (`别名属于` / `别名失效`):**
{% if language == "zh" %}
- 当多个名字明确指向同一实体时,使用 `别名属于`。
@@ -488,6 +490,8 @@ Each relation class now keeps only one canonical `covered_predicates` value. Onc
- 如果所有格同时明确表达持有关系,也应抽取 `X -> 拥有 -> X 的 Y`。
- 在用户自指场景中,规范实体应为已经规范化后的 `用户`。
- 不要把角色、职业、身份、类别、夸赞、评价或其他非名字性描述抽成 `别名属于`。
- 当对话**明确**表达某个别名已不再适用时(如"不叫X了""X这个称呼已经不用了"),使用 `别名失效`,方向为 `旧别名 -> 别名失效 -> canonical entity`。
- 不要因为出现新别名就自动推断旧别名失效;只有对话中有明确的否定/废弃表达时才使用 `别名失效`。
{% else %}
- Use `别名属于` when multiple names clearly refer to the same entity.
- Direction is always `alias -> 别名属于 -> canonical entity`.
@@ -497,6 +501,8 @@ Each relation class now keeps only one canonical `covered_predicates` value. Onc
- If the possessive phrase also explicitly expresses possession, also extract `X -> 拥有 -> X's Y`.
- In user self-reference cases, the canonical entity should be the normalized user entity `用户`.
- Do not use `别名属于` for roles, occupations, identities, categories, compliments, evaluations, or other non-name descriptions.
- Use `别名失效` when the conversation **explicitly** states that an alias is no longer applicable (e.g., "no longer called X", "X is not used anymore"). Direction is `old_alias -> 别名失效 -> canonical entity`.
- Do not infer alias invalidation just because a new alias appears; only use `别名失效` when there is an explicit negation or abandonment expression in the conversation.
{% endif %}
**subject_name / object_name Consistency:**

View File

@@ -138,3 +138,41 @@ class EndUserInfoRepository:
f"aliases_count={len(end_user_info.aliases or [])}"
)
return end_user_info
def remove_aliases(
self,
end_user_id: uuid.UUID,
aliases_to_remove: List[str],
) -> Optional["EndUserInfo"]:
"""从用户别名列表中移除指定别名(忽略大小写)。
Args:
end_user_id: 终端用户 ID
aliases_to_remove: 需要移除的别名列表
Returns:
更新后的 EndUserInfo若记录不存在则返回 None
"""
if not aliases_to_remove:
return self.get_by_end_user_id(end_user_id)
end_user_info = self.get_by_end_user_id(end_user_id)
if not end_user_info:
logger.warning(f"[EndUserInfo] 记录不存在,跳过别名移除: end_user_id={end_user_id}")
return None
remove_lower = {a.strip().lower() for a in aliases_to_remove if a.strip()}
existing = list(end_user_info.aliases or [])
new_aliases = [a for a in existing if a.lower() not in remove_lower]
if len(new_aliases) == len(existing):
return end_user_info
end_user_info.aliases = new_aliases
self.db.commit()
self.db.refresh(end_user_info)
logger.info(
f"[EndUserInfo] 别名移除完成: end_user_id={end_user_id}, "
f"removed={aliases_to_remove}, remaining={new_aliases}"
)
return end_user_info

View File

@@ -1379,16 +1379,16 @@ RETURN source.name AS merged_alias, target.name AS target_name, new_aliases AS u
# 2. STATEMENT_ENTITY陈述句 → 别名节点
# 对于每条需要重定向的边,创建一条指向用户节点的新边(复制所有属性),然后删除旧边。
REDIRECT_ALIAS_EDGES = """
// 找到所有 别名→用户 的映射
// 找到所有 别名→用户 的映射(包含 别名属于 和 别名失效 两种 predicate
MATCH (alias:ExtractedEntity {end_user_id: $end_user_id})-[ar:EXTRACTED_RELATIONSHIP]->(user:ExtractedEntity {end_user_id: $end_user_id})
WHERE ar.predicate = '别名属于'
WHERE ar.predicate IN ['别名属于', '别名失效']
WITH collect({alias_id: elementId(alias), user_id: elementId(user), alias_eid: alias.id, user_eid: user.id}) AS mappings
// 1. 重定向 EXTRACTED_RELATIONSHIP 边:别名节点作为 target 的情况
UNWIND mappings AS m
MATCH (other)-[r:EXTRACTED_RELATIONSHIP]->(alias:ExtractedEntity {end_user_id: $end_user_id})
WHERE alias.id = m.alias_eid
AND r.predicate <> '别名属于'
AND NOT (r.predicate IN ['别名属于', '别名失效'])
AND other.id <> m.user_eid
WITH m, other, r, alias
MATCH (user:ExtractedEntity {id: m.user_eid, end_user_id: $end_user_id})
@@ -1399,10 +1399,10 @@ WITH count(*) AS redirected_incoming
// 2. 重定向 EXTRACTED_RELATIONSHIP 边:别名节点作为 source 的情况
MATCH (alias:ExtractedEntity {end_user_id: $end_user_id})-[ar2:EXTRACTED_RELATIONSHIP]->(user2:ExtractedEntity {end_user_id: $end_user_id})
WHERE ar2.predicate = '别名属于'
WHERE ar2.predicate IN ['别名属于', '别名失效']
WITH alias, user2, redirected_incoming
MATCH (alias)-[r:EXTRACTED_RELATIONSHIP]->(other)
WHERE r.predicate <> '别名属于'
WHERE NOT (r.predicate IN ['别名属于', '别名失效'])
AND other.id <> user2.id
WITH user2, other, r, redirected_incoming
CREATE (user2)-[nr:EXTRACTED_RELATIONSHIP]->(other)
@@ -1412,7 +1412,7 @@ WITH redirected_incoming, count(*) AS redirected_outgoing
// 3. 重定向 STATEMENT_ENTITY 边:陈述句 → 别名节点
MATCH (alias:ExtractedEntity {end_user_id: $end_user_id})-[ar3:EXTRACTED_RELATIONSHIP]->(user3:ExtractedEntity {end_user_id: $end_user_id})
WHERE ar3.predicate = '别名属于'
WHERE ar3.predicate IN ['别名属于', '别名失效']
WITH alias, user3, redirected_incoming, redirected_outgoing
MATCH (stmt)-[r:STATEMENT_ENTITY]->(alias)
WITH user3, stmt, r, redirected_incoming, redirected_outgoing
@@ -1423,6 +1423,32 @@ DELETE r
RETURN redirected_incoming, redirected_outgoing, count(*) AS redirected_stmt
"""
# 删除别名节点:在别名归并和边重定向完成后,删除所有 predicate="别名属于" 关系的 source 节点。
# 此时这些节点的其他边已被 REDIRECT_ALIAS_EDGES 重定向完毕,
# 唯一剩余的边就是 (alias)-[:EXTRACTED_RELATIONSHIP {predicate:'别名属于'}]->(user)
# 使用 DETACH DELETE 一并删除节点和该关系。
DELETE_ALIAS_NODES = """
MATCH (alias:ExtractedEntity {end_user_id: $end_user_id})-[r:EXTRACTED_RELATIONSHIP]->(user:ExtractedEntity {end_user_id: $end_user_id})
WHERE r.predicate IN ['别名属于', '别名失效']
WITH alias, count(r) AS rel_count
DETACH DELETE alias
RETURN count(alias) AS deleted_count
"""
# 失效别名处理:将 predicate="别名失效" 的 source.name 从 target.aliases 中移除。
# 在 MERGE_ALIAS_BELONGS_TO追加新别名之后、DELETE_ALIAS_NODES删除节点之前执行。
REMOVE_INVALID_ALIASES = """
MATCH (source:ExtractedEntity {end_user_id: $end_user_id})-[r:EXTRACTED_RELATIONSHIP]->(target:ExtractedEntity {end_user_id: $end_user_id})
WHERE r.predicate = '别名失效'
WITH source, target,
coalesce(target.aliases, []) AS existing_aliases,
source.name AS invalid_name
SET target.aliases = [a IN existing_aliases WHERE toLower(a) <> toLower(invalid_name)]
RETURN source.name AS removed_alias, target.name AS target_name
"""
CHECK_COMMUNITY_IS_COMPLETE_WITH_EMBEDDING = """
MATCH (c:Community {community_id: $community_id, end_user_id: $end_user_id})
RETURN (

View File

@@ -1582,6 +1582,7 @@ def post_store_dedup_and_alias_merge_task(
end_user_id: str,
entity_ids: List[str],
llm_model_id: Optional[str] = None,
snapshot_dir: Optional[str] = None,
) -> Dict[str, Any]:
"""Celery task: 写入后异步执行 Neo4j 别名归并 + 第二层去重。
@@ -1608,13 +1609,15 @@ def post_store_dedup_and_alias_merge_task(
from app.repositories.neo4j.cypher_queries import (
MERGE_ALIAS_BELONGS_TO,
REDIRECT_ALIAS_EDGES,
DELETE_ALIAS_NODES,
REMOVE_INVALID_ALIASES,
)
connector = Neo4jConnector()
result_info: Dict[str, Any] = {}
try:
# ── 1. Neo4j 别名归并 ──
# ── 1. Neo4j 别名归并(追加新别名) ──
try:
records = await connector.execute_query(
MERGE_ALIAS_BELONGS_TO,
@@ -1627,6 +1630,28 @@ def post_store_dedup_and_alias_merge_task(
logger.warning(f"[PostStore] Neo4j 别名归并失败: {e}")
result_info["alias_merge_error"] = str(e)
# ── 1.5 Neo4j 失效别名移除(从 aliases 中删除旧别名) ──
try:
invalid_records = await connector.execute_query(
REMOVE_INVALID_ALIASES,
end_user_id=end_user_id,
)
invalid_count = len(invalid_records) if invalid_records else 0
result_info["invalid_aliases_removed"] = invalid_count
logger.info(f"[PostStore] 失效别名移除完成,影响 {invalid_count} 条记录")
# 同步删除 PostgreSQL end_user_info.aliases 中的失效别名
if invalid_records:
removed_names = [
r.get("removed_alias") for r in invalid_records
if r.get("removed_alias")
]
if removed_names:
_remove_invalid_aliases_pg(end_user_id, removed_names)
except Exception as e:
logger.warning(f"[PostStore] 失效别名移除失败: {e}")
result_info["invalid_alias_error"] = str(e)
# ── 2. Neo4j 边重定向 ──
try:
redirect_records = await connector.execute_query(
@@ -1640,7 +1665,41 @@ def post_store_dedup_and_alias_merge_task(
logger.warning(f"[PostStore] Neo4j 边重定向失败: {e}")
result_info["redirect_error"] = str(e)
# ── 3. 第二层去重(与 Neo4j 已有实体联合去重) ──
# ── 3. 删除别名节点及"别名属于"关系 ──
try:
delete_records = await connector.execute_query(
DELETE_ALIAS_NODES,
end_user_id=end_user_id,
)
deleted_count = delete_records[0].get("deleted_count", 0) if delete_records else 0
result_info["alias_nodes_deleted"] = deleted_count
logger.info(f"[PostStore] 别名节点删除完成,删除 {deleted_count} 个节点")
except Exception as e:
logger.warning(f"[PostStore] 别名节点删除失败: {e}")
result_info["alias_delete_error"] = str(e)
# ── 3.5 Snapshot: 别名归并+删除后的实体状态 ──
if snapshot_dir:
try:
snapshot_query = """
UNWIND $entity_ids AS eid
MATCH (e:ExtractedEntity {id: eid})
RETURN e.id AS id, e.name AS name,
e.entity_type AS entity_type,
e.description AS description,
coalesce(e.aliases, []) AS aliases
"""
snap_records = await connector.execute_query(
snapshot_query, entity_ids=entity_ids
)
entity_rows = [dict(r) for r in snap_records] if snap_records else []
from app.core.memory.utils.debug.write_snapshot_recorder import WriteSnapshotRecorder
WriteSnapshotRecorder.save_alias_merge_result(snapshot_dir, entity_rows)
logger.info(f"[PostStore] Snapshot 8_after_alias_merge 已写入,实体数={len(entity_rows)}")
except Exception as e:
logger.warning(f"[PostStore] Snapshot 写入失败(不影响主流程): {e}")
# ── 4. 第二层去重(与 Neo4j 已有实体联合去重) ──
try:
from app.core.memory.storage_services.extraction_engine.deduplication.second_layer_dedup import (
second_layer_dedup_and_merge_with_neo4j,
@@ -1811,6 +1870,38 @@ def _sync_end_user_info_pg(
)
def _remove_invalid_aliases_pg(
end_user_id: str,
aliases_to_remove: List[str],
) -> None:
"""将失效别名从 PostgreSQL end_user_info.aliases 中移除。
失败只记日志,不抛异常,不影响主流程。
"""
try:
import uuid as _uuid
from app.db import get_db_context
from app.repositories.end_user_info_repository import EndUserInfoRepository
eu_uuid = _uuid.UUID(end_user_id)
with get_db_context() as db:
info_repo = EndUserInfoRepository(db)
info_repo.remove_aliases(
end_user_id=eu_uuid,
aliases_to_remove=aliases_to_remove,
)
logger.info(
f"[PostStore][PG] 失效别名已从 end_user_info 移除: "
f"end_user_id={end_user_id}, removed={aliases_to_remove}"
)
except Exception as e:
logger.warning(
f"[PostStore][PG] 移除失效别名失败(不影响主流程): "
f"end_user_id={end_user_id}, error={e}",
exc_info=True,
)
@celery_app.task(
bind=True,
name="app.tasks.extract_metadata_batch",