feat(memory): unify alias extraction into metadata pipeline and deduplicate user entity nodes

- Merge alias add/remove into MetadataExtractionResponse and Celery metadata task,
  removing the separate sync step from extraction_orchestrator
- Replace first-person pronouns ("我") with "用户" in statement extraction to
  preserve identity semantics for downstream metadata/alias extraction
- Update extract_statement.jinja2 prompt to enforce "用户" as subject for user
  statements instead of resolving to real names
- Add alias change instructions (aliases_to_add/aliases_to_remove) to
  extract_user_metadata.jinja2 with incremental merge logic
- Deduplicate special entities ("用户", "AI助手") in graph_saver by reusing
  existing Neo4j node IDs per end_user_id
- Sync final aliases from PgSQL to Neo4j user entity nodes after metadata write
This commit is contained in:
lanceyq
2026-04-09 21:55:59 +08:00
parent e0546e01ef
commit 15a863b41a
8 changed files with 304 additions and 76 deletions

View File

@@ -38,3 +38,11 @@ class MetadataExtractionResponse(BaseModel):
"""元数据提取 LLM 响应结构"""
model_config = ConfigDict(extra='ignore')
user_metadata: UserMetadata = Field(default_factory=UserMetadata)
aliases_to_add: List[str] = Field(
default_factory=list,
description="本次新发现的用户别名(用户自我介绍或他人对用户的称呼)"
)
aliases_to_remove: List[str] = Field(
default_factory=list,
description="用户明确否认的别名(如'我不叫XX了'"
)

View File

@@ -311,9 +311,8 @@ class ExtractionOrchestrator:
dialog_data_list,
)
# 步骤 7: 同步用户别名到数据库表 + 触发异步元数据提取(仅正式模式)
# 步骤 7: 触发异步元数据和别名提取(仅正式模式)
if not is_pilot_run:
# 收集用户相关 statement 并触发异步元数据提取
try:
from app.core.memory.storage_services.extraction_engine.knowledge_extraction.metadata_extractor import MetadataExtractor
metadata_extractor = MetadataExtractor(llm_client=self.llm_client, language=self.language)
@@ -322,7 +321,6 @@ class ExtractionOrchestrator:
statement_entity_edges
)
if user_statements:
# 获取 end_user_id 和 config_id
end_user_id = dialog_data_list[0].end_user_id if dialog_data_list else None
config_id = dialog_data_list[0].config_id if dialog_data_list and hasattr(dialog_data_list[0], 'config_id') else None
if end_user_id:
@@ -339,9 +337,7 @@ class ExtractionOrchestrator:
except Exception as e:
logger.error(f"触发元数据提取任务失败(不影响主流程): {e}", exc_info=True)
# 同步用户别名到数据库表
logger.info("步骤 7: 同步用户别名到 end_user 和 end_user_info 表")
await self._update_end_user_other_name(entity_nodes, dialog_data_list)
# 别名同步已迁移到 Celery 元数据提取任务中,不再在此处执行
logger.info(f"知识提取流水线运行完成({mode_str}")
return (

View File

@@ -14,9 +14,9 @@ from app.core.memory.models.graph_models import (
StatementNode,
)
from app.core.memory.models.metadata_models import (
MetadataExtractionResponse,
UserMetadata,
)
from app.core.memory.models.message_models import DialogData
logger = logging.getLogger(__name__)
@@ -50,6 +50,35 @@ class MetadataExtractor:
return "zh"
return "en"
# def collect_user_raw_messages(
# self,
# dialog_data_list: List[DialogData],
# ) -> List[str]:
# """
# 从原始对话数据中提取 speaker="user" 的消息原文。
# 直接使用用户的原始输入,不经过陈述句提取阶段的 LLM 改写,
# 避免第一人称被替换为第三人称导致元数据/别名提取错误。
# Returns:
# 用户原始消息文本列表
# """
# result = []
# for dialog in dialog_data_list:
# if not dialog.context or not dialog.context.msgs:
# continue
# for msg in dialog.context.msgs:
# if getattr(msg, 'role', '') == 'user':
# text = (getattr(msg, 'msg', '') or '').strip()
# if text:
# result.append(text)
# logger.info(f"收集到 {len(result)} 条用户原始消息")
# if result:
# for i, text in enumerate(result):
# logger.info(f" [user message {i+1}] {text[:200]}")
# return result
def collect_user_related_statements(
self,
entity_nodes: List[ExtractedEntityNode],
@@ -104,6 +133,9 @@ class MetadataExtractor:
f"(直接关联: {total_associated}, speaker=user: {len(result)}, "
f"跳过非user: {skipped_non_user})"
)
if result:
for i, text in enumerate(result):
logger.info(f" [user statement {i+1}] {text}")
if total_associated > 0 and len(result) == 0:
logger.warning(
f"{total_associated} 条直接关联 statement 但全部被 speaker 过滤,"
@@ -111,18 +143,22 @@ class MetadataExtractor:
)
return result
async def extract_metadata(self, statements: List[str], existing_metadata: Optional[dict] = None) -> Optional[UserMetadata]:
async def extract_metadata(
self,
statements: List[str],
existing_metadata: Optional[dict] = None,
existing_aliases: Optional[List[str]] = None,
) -> Optional[tuple]:
"""
对筛选后的 statement 列表调用 LLM 提取元数据。
语言根据 statement 内容自动检测,不依赖系统界面语言。
传入已有元数据作为上下文,让 LLM 能判断 replace/remove 操作。
对筛选后的 statement 列表调用 LLM 提取元数据和用户别名
Args:
statements: 用户发言的 statement 文本列表
existing_metadata: 数据库已有的元数据(可选),用于 LLM 对比判断变更
existing_metadata: 数据库已有的元数据(可选)
existing_aliases: 数据库已有的用户别名列表(可选)
Returns:
UserMetadata on success, None on failure
(UserMetadata, List[str], List[str]) tuple: (metadata, aliases_to_add, aliases_to_remove) on success, None on failure
"""
if not statements:
return None
@@ -130,7 +166,6 @@ class MetadataExtractor:
try:
from app.core.memory.utils.prompt.prompt_utils import prompt_env
# 根据写入内容的语言自动检测,而非使用系统界面语言
detected_language = self.detect_language(statements)
logger.info(f"元数据提取语言检测结果: {detected_language}")
@@ -139,18 +174,23 @@ class MetadataExtractor:
statements=statements,
language=detected_language,
existing_metadata=existing_metadata,
existing_aliases=existing_aliases,
json_schema="",
)
from app.core.memory.models.metadata_models import MetadataExtractionResponse
response = await self.llm_client.response_structured(
messages=[{"role": "user", "content": prompt}],
response_model=MetadataExtractionResponse,
)
if response and response.user_metadata:
return response.user_metadata
if response:
metadata = response.user_metadata if response.user_metadata else None
to_add = response.aliases_to_add if response.aliases_to_add else []
to_remove = response.aliases_to_remove if response.aliases_to_remove else []
return metadata, to_add, to_remove
logger.warning("LLM 返回的元数据为空")
logger.warning("LLM 返回的响应为空")
return None
except Exception as e:

View File

@@ -82,6 +82,18 @@ class StatementExtractor:
logger.warning(f"Chunk {getattr(chunk, 'id', 'unknown')} has no speaker field or is empty")
return None
@staticmethod
def _replace_first_person_with_user(text: str) -> str:
"""将用户消息中的第一人称代词""替换为"用户"
替换规则:
- 所有独立的""都替换为"用户"(包括"我的""用户的""叫我""叫用户"
- 不替换"我们"中的"""我们"是复数,不指代用户个人)
"""
import re
result = re.sub(r'我(?!们)', '用户', text)
return result
async def _extract_statements(self, chunk, end_user_id: Optional[str] = None, dialogue_content: str = None) -> List[Statement]:
"""Process a single chunk and return extracted statements
@@ -99,6 +111,13 @@ class StatementExtractor:
logger.warning(f"Chunk {chunk.id} content too short or empty, skipping")
return []
# 对 speaker="user" 的 chunk将第一人称"我"替换为"用户"
# 避免 LLM 在陈述句提取时将"我"替换为具体名字(如"齐齐"
# 导致下游元数据/别名提取无法识别第一人称语义。
chunk_speaker = self._get_speaker_from_chunk(chunk)
if chunk_speaker == "user":
chunk_content = self._replace_first_person_with_user(chunk_content)
prompt_content = await render_statement_extraction_prompt(
chunk_content=chunk_content,
definitions=LABEL_DEFINITIONS,
@@ -149,8 +168,6 @@ class StatementExtractor:
relevence_info = RelevenceInfo[relevence_str] if relevence_str in RelevenceInfo.__members__ else RelevenceInfo.RELEVANT
except (KeyError, ValueError):
relevence_info = RelevenceInfo.RELEVANT
chunk_speaker = self._get_speaker_from_chunk(chunk)
chunk_statement = Statement(
statement=extracted_stmt.statement,

View File

@@ -43,8 +43,9 @@ Each statement must be labeled as per the criteria mentioned below.
对话上下文和共指消解:
- 将每个陈述句归属于说出它的参与者。
- 如果参与者列表为说话者提供了名称(例如,"李雪(用户)"),请在提取的陈述句中使用具体名称("李雪"),而不是通用角色("用户"
- 将所有代词解析为对话上下文中的具体人物或实体
- **对于用户的发言:必须使用"用户"作为主语**,禁止将"用户"或"我"替换为用户的真实姓名或别名。例如,用户说"我叫张三"应提取为"用户叫张三",而不是"张三叫张三"
- 对于 AI 助手的发言:使用"助手"或"AI助手"作为主语
- 将所有代词解析为对话上下文中的具体人物或实体,但"我"必须解析为"用户"。
- 识别并将抽象引用解析为其具体名称(如果提到)。
- 将缩写和首字母缩略词扩展为其完整形式。
{% else %}
@@ -68,8 +69,9 @@ Context Resolution Requirements:
Conversational Context & Co-reference Resolution:
- Attribute every statement to the participant who uttered it.
- If the participant list provides a name for a speaker (e.g., "李雪 (用户)"), use the specific name ("李雪") in the extracted statement, not the generic role ("用户").
- Resolve all pronouns to the specific person or entity from the conversation's context.
- **For user's statements: always use "用户" (User) as the subject**. Do NOT replace "用户" or "I" with the user's real name or alias. For example, if the user says "I'm John", extract as "用户 is John", not "John is John".
- For AI assistant's statements: use "助手" or "AI助手" as the subject.
- Resolve all pronouns to the specific person or entity from the conversation's context, but "I"/"我" must always resolve to "用户".
- Identify and resolve abstract references to their specific names if mentioned.
- Expand abbreviations and acronyms to their full form.
{% endif %}
@@ -139,13 +141,13 @@ AI: "水彩画很有趣!水彩颜料通常由颜料与阿拉伯树胶等粘合
示例输出: {
"statements": [
{
"statement": "Sarah Chen 最近一直在尝试水彩画。",
"statement": "用户最近一直在尝试水彩画。",
"statement_type": "FACT",
"temporal_type": "DYNAMIC",
"relevance": "RELEVANT"
},
{
"statement": "Sarah Chen 画了一些花朵。",
"statement": "用户画了一些花朵。",
"statement_type": "FACT",
"temporal_type": "DYNAMIC",
"relevance": "RELEVANT"
@@ -157,13 +159,13 @@ AI: "水彩画很有趣!水彩颜料通常由颜料与阿拉伯树胶等粘合
"relevance": "IRRELEVANT"
},
{
"statement": "Sarah Chen 认为她的水彩画中的色彩组合可以改进。",
"statement": "用户认为她的水彩画中的色彩组合可以改进。",
"statement_type": "OPINION",
"temporal_type": "STATIC",
"relevance": "RELEVANT"
},
{
"statement": "Sarah Chen 真的很喜欢玫瑰和百合。",
"statement": "用户真的很喜欢玫瑰和百合。",
"statement_type": "FACT",
"temporal_type": "STATIC",
"relevance": "RELEVANT"
@@ -186,13 +188,13 @@ AI: "水彩画很有趣!水彩颜料通常由颜料和阿拉伯树胶等粘合
示例输出: {
"statements": [
{
"statement": "张曼婷最近在尝试水彩画。",
"statement": "用户最近在尝试水彩画。",
"statement_type": "FACT",
"temporal_type": "DYNAMIC",
"relevance": "RELEVANT"
},
{
"statement": "张曼婷画了一些花朵。",
"statement": "用户画了一些花朵。",
"statement_type": "FACT",
"temporal_type": "DYNAMIC",
"relevance": "RELEVANT"
@@ -204,13 +206,13 @@ AI: "水彩画很有趣!水彩颜料通常由颜料和阿拉伯树胶等粘合
"relevance": "IRRELEVANT"
},
{
"statement": "张曼婷觉得水彩画的色彩搭配还有提升的空间。",
"statement": "用户觉得水彩画的色彩搭配还有提升的空间。",
"statement_type": "OPINION",
"temporal_type": "STATIC",
"relevance": "RELEVANT"
},
{
"statement": "张曼婷很喜欢玫瑰和百合。",
"statement": "用户很喜欢玫瑰和百合。",
"statement_type": "FACT",
"temporal_type": "STATIC",
"relevance": "RELEVANT"
@@ -233,13 +235,13 @@ User: "I think the color combinations could use some improvement, but I really l
Example Output: {
"statements": [
{
"statement": "Sarah Chen has been trying watercolor painting recently.",
"statement": "用户 has been trying watercolor painting recently.",
"statement_type": "FACT",
"temporal_type": "DYNAMIC",
"relevance": "RELEVANT"
},
{
"statement": "Sarah Chen painted some flowers.",
"statement": "用户 painted some flowers.",
"statement_type": "FACT",
"temporal_type": "DYNAMIC",
"relevance": "RELEVANT"
@@ -251,13 +253,13 @@ Example Output: {
"relevance": "IRRELEVANT"
},
{
"statement": "Sarah Chen thinks the color combinations in her watercolor paintings could use some improvement.",
"statement": "用户 thinks the color combinations in her watercolor paintings could use some improvement.",
"statement_type": "OPINION",
"temporal_type": "STATIC",
"relevance": "RELEVANT"
},
{
"statement": "Sarah Chen really likes roses and lilies.",
"statement": "用户 really likes roses and lilies.",
"statement_type": "FACT",
"temporal_type": "STATIC",
"relevance": "RELEVANT"
@@ -280,13 +282,13 @@ AI: "水彩画很有趣!水彩颜料通常由颜料和阿拉伯树胶等粘合
Example Output: {
"statements": [
{
"statement": "张曼婷最近在尝试水彩画。",
"statement": "用户最近在尝试水彩画。",
"statement_type": "FACT",
"temporal_type": "DYNAMIC",
"relevance": "RELEVANT"
},
{
"statement": "张曼婷画了一些花朵。",
"statement": "用户画了一些花朵。",
"statement_type": "FACT",
"temporal_type": "DYNAMIC",
"relevance": "RELEVANT"
@@ -298,13 +300,13 @@ Example Output: {
"relevance": "IRRELEVANT"
},
{
"statement": "张曼婷觉得水彩画的色彩搭配还有提升的空间。",
"statement": "用户觉得水彩画的色彩搭配还有提升的空间。",
"statement_type": "OPINION",
"temporal_type": "STATIC",
"relevance": "RELEVANT"
},
{
"statement": "张曼婷很喜欢玫瑰和百合。",
"statement": "用户很喜欢玫瑰和百合。",
"statement_type": "FACT",
"temporal_type": "STATIC",
"relevance": "RELEVANT"

View File

@@ -32,6 +32,22 @@ Extract user metadata from the following conversation statements spoken by the u
- behavioral_hints.preferred_depth偏好深度概览/技术细节/深入探讨)
- behavioral_hints.tone_preference语气偏好轻松随意/专业简洁/学术严谨)
- knowledge_tags用户涉及的知识领域标签
**用户别名变更(增量模式):**
- **aliases_to_add**:本次新发现的用户别名,包括:
* 用户主动自我介绍:如"我叫张三"、"我的名字是XX"、"我的网名是XX"
* 他人对用户的称呼:如"同事叫我陈哥"、"大家叫我小张"、"领导叫我老陈"
* 只提取原文中逐字出现的名字,严禁推测或创造
* 禁止提取:用户给 AI 取的名字、第三方人物自身的名字、"用户"/"我" 等占位词
* 如果没有新别名,返回空数组 `[]`
- **aliases_to_remove**:用户明确否认的别名,包括:
* 用户说"我不叫XX了"、"别叫我XX"、"我改名了不叫XX" → 将 XX 放入此数组
* **严格限制**:只将用户原文中**逐字提到**的被否认名字放入,不要推断关联的其他别名
* 例如:用户说"我不叫陈小刀了" → 只移除"陈小刀",不要移除"陈哥"、"老陈"等未被提及的别名
* 如果没有要移除的别名,返回空数组 `[]`
{% if existing_aliases %}
- 已有别名:{{ existing_aliases | tojson }}(仅供参考,不需要在输出中重复)
{% endif %}
{% else %}
**"Three-Degree Principle" criteria:**
- Reusability: Will this information be used by multiple functional modules?
@@ -63,6 +79,22 @@ Existing user metadata from the database is provided below. Combine with the use
- behavioral_hints.preferred_depth: Preferred depth (overview/detailed/deep dive)
- behavioral_hints.tone_preference: Tone preference (casual/professional/academic)
- knowledge_tags: Knowledge domain tags related to the user
**User alias changes (incremental mode):**
- **aliases_to_add**: Newly discovered user aliases from this conversation, including:
* User self-introductions: e.g. "I'm John", "My name is XX", "My username is XX"
* How others address the user: e.g. "My colleagues call me Johnny", "People call me Mike"
* Only extract names that appear VERBATIM in the text — never infer or fabricate
* Do NOT extract: names the user gives to the AI, third-party people's own names, placeholder words like "User"/"I"
* If no new aliases, return empty array `[]`
- **aliases_to_remove**: Aliases the user explicitly denies, including:
* User says "Don't call me XX anymore", "I'm not called XX", "I changed my name from XX" → put XX in this array
* **Strict rule**: Only include the exact name the user **verbatim mentions** as denied. Do NOT infer or remove related aliases
* Example: User says "I'm not called John anymore" → only remove "John", do NOT remove "Johnny", "J" or other related aliases not mentioned
* If no aliases to remove, return empty array `[]`
{% if existing_aliases %}
- Existing aliases: {{ existing_aliases | tojson }} (for reference only, do not repeat in output)
{% endif %}
{% endif %}
===User Statements===
@@ -94,7 +126,9 @@ Return a JSON object with the following structure:
"tone_preference": ""
},
"knowledge_tags": []
}
},
"aliases_to_add": [],
"aliases_to_remove": []
}
```

View File

@@ -187,6 +187,58 @@ async def save_dialog_and_statements_to_neo4j(
bool: True if successful, False otherwise
"""
# 预处理:对特殊实体("用户"、"AI助手")复用 Neo4j 中已有节点的 ID
# 确保同一个 end_user_id 下只有一个"用户"节点和一个"AI助手"节点。
if entity_nodes:
_SPECIAL_NAMES = {"用户", "", "user", "i", "ai助手", "助手", "ai assistant", "assistant"}
end_user_id = entity_nodes[0].end_user_id if entity_nodes else None
if end_user_id:
try:
# 查询已有的特殊实体
cypher = """
MATCH (e:ExtractedEntity)
WHERE e.end_user_id = $end_user_id AND toLower(e.name) IN $names
RETURN e.id AS id, e.name AS name
"""
existing = await connector.execute_query(
cypher,
end_user_id=end_user_id,
names=list(_SPECIAL_NAMES),
)
# 建立 name(lower) → existing_id 映射
existing_id_map = {}
for record in (existing or []):
name_lower = (record.get("name") or "").strip().lower()
if name_lower and record.get("id"):
existing_id_map[name_lower] = record["id"]
if existing_id_map:
# 替换新实体的 ID 为已有 ID同时更新所有引用该 ID 的边
for ent in entity_nodes:
name_lower = (ent.name or "").strip().lower()
if name_lower in existing_id_map:
old_id = ent.id
new_id = existing_id_map[name_lower]
if old_id != new_id:
ent.id = new_id
# 更新 statement_entity_edges 中的引用
for edge in statement_entity_edges:
if edge.target == old_id:
edge.target = new_id
if edge.source == old_id:
edge.source = new_id
# 更新 entity_edges 中的引用
for edge in entity_edges:
if edge.source == old_id:
edge.source = new_id
if edge.target == old_id:
edge.target = new_id
logger.info(
f"特殊实体 '{ent.name}' ID 复用: {old_id[:8]}... → {new_id[:8]}..."
)
except Exception as e:
logger.warning(f"特殊实体 ID 复用查询失败(不影响写入): {e}")
# 定义事务函数,将所有写操作放在一个事务中
async def _save_all_in_transaction(tx):
"""在单个事务中执行所有保存操作,避免死锁"""

View File

@@ -3000,70 +3000,149 @@ def extract_user_metadata_task(
return {"status": "FAILURE", "error": "Memory config has no LLM model configured"}
llm_client = factory.get_llm_client(memory_config.llm_id)
# 2.5 读取已有元数据,传给 extractor 作为上下文
# 2.5 读取已有元数据和别名,传给 extractor 作为上下文
existing_metadata = None
existing_aliases = None
try:
info = EndUserInfoRepository(db).get_by_end_user_id(end_user_uuid)
if info and info.meta_data:
existing_metadata = info.meta_data
logger.info("[CELERY METADATA] 已读取数据库已有元数据作为 LLM 上下文")
if info:
if info.meta_data:
existing_metadata = info.meta_data
existing_aliases = info.aliases if info.aliases else []
logger.info(f"[CELERY METADATA] 已读取已有元数据和别名aliases={existing_aliases}")
except Exception as e:
logger.warning(f"[CELERY METADATA] 读取已有数据失败(继续无上下文提取): {e}")
logger.warning(f"[CELERY METADATA] 读取已有数据失败(继续无上下文提取): {e}")
# 3. 提取元数据(传入已有数据作为上下文)
# 3. 提取元数据和别名(传入已有数据作为上下文)
extractor = MetadataExtractor(llm_client=llm_client, language=language)
user_metadata = await extractor.extract_metadata(statements, existing_metadata=existing_metadata)
extract_result = await extractor.extract_metadata(
statements,
existing_metadata=existing_metadata,
existing_aliases=existing_aliases,
)
if not user_metadata:
if not extract_result:
logger.info(f"[CELERY METADATA] No metadata extracted for end_user_id={end_user_id}")
return {"status": "SUCCESS", "result": "no_metadata_extracted"}
# 4. 清洗、校验、覆盖写入
raw_dict = user_metadata.model_dump(exclude_none=True)
user_metadata, aliases_to_add, aliases_to_remove = extract_result
logger.info(f"[CELERY METADATA] LLM 别名新增: {aliases_to_add}, 移除: {aliases_to_remove}")
# 4. 清洗元数据、覆盖写入元数据和别名
raw_dict = user_metadata.model_dump(exclude_none=True) if user_metadata else {}
logger.info(f"[CELERY METADATA] LLM 输出完整元数据: {json.dumps(raw_dict, ensure_ascii=False)}")
cleaned = clean_metadata(raw_dict)
if not cleaned:
logger.info(f"[CELERY METADATA] Cleaned metadata is empty for end_user_id={end_user_id}")
return {"status": "SUCCESS", "result": "empty_after_cleaning"}
cleaned = clean_metadata(raw_dict) if raw_dict else {}
logger.info(f"[CELERY METADATA] 清洗后元数据: {json.dumps(cleaned, ensure_ascii=False)}")
validated = validate_metadata(cleaned)
if not validated:
return {"status": "FAILURE", "error": "Metadata validation failed after cleaning"}
# 直接覆盖写入LLM 已完成语义合并,输出的是完整结果)
# 保留 _updated_at 时间戳追踪
from datetime import datetime as dt, timezone as tz
now = dt.now(tz.utc).isoformat()
# 过滤别名中的占位名称,执行增量增删
_PLACEHOLDER_NAMES = {"用户", "", "user", "i"}
def _filter_aliases(aliases_list):
seen = set()
result = []
for a in aliases_list:
a_stripped = a.strip()
if a_stripped and a_stripped.lower() not in _PLACEHOLDER_NAMES and a_stripped.lower() not in seen:
result.append(a_stripped)
seen.add(a_stripped.lower())
return result
filtered_add = _filter_aliases(aliases_to_add)
filtered_remove = _filter_aliases(aliases_to_remove)
remove_lower = {a.lower() for a in filtered_remove}
with get_db_context() as db:
end_user_uuid = uuid.UUID(end_user_id)
info = EndUserInfoRepository(db).get_by_end_user_id(end_user_uuid)
end_user = EndUserRepository(db).get_by_id(end_user_uuid)
if info:
existing_meta = info.meta_data if info.meta_data else {}
logger.info(f"[CELERY METADATA] 数据库已有元数据: {json.dumps(existing_meta, ensure_ascii=False)}")
# 元数据覆盖写入
if cleaned:
existing_meta = info.meta_data if info.meta_data else {}
updated_at = dict(existing_meta.get("_updated_at", {}))
_update_timestamps(existing_meta, cleaned, updated_at, now)
final = dict(cleaned)
final["_updated_at"] = updated_at
info.meta_data = final
logger.info("[CELERY METADATA] 覆盖写入元数据")
# 保留已有的 _updated_at更新变更字段的时间戳
updated_at = dict(existing_meta.get("_updated_at", {}))
_update_timestamps(existing_meta, cleaned, updated_at, now)
# 别名增量增删:(已有 - remove) + add
old_aliases = info.aliases if info.aliases else []
# 先移除
merged = [a for a in old_aliases if a.strip().lower() not in remove_lower]
# 再追加(去重)
existing_lower = {a.strip().lower() for a in merged}
for a in filtered_add:
if a.lower() not in existing_lower:
merged.append(a)
existing_lower.add(a.lower())
final = dict(cleaned)
final["_updated_at"] = updated_at
info.meta_data = final
logger.info(f"[CELERY METADATA] 覆盖写入元数据: {json.dumps(final, ensure_ascii=False)}")
if merged != old_aliases:
info.aliases = merged
# other_name 更新逻辑
if merged and (
not info.other_name
or info.other_name.strip().lower() in _PLACEHOLDER_NAMES
or info.other_name.strip().lower() in remove_lower
):
info.other_name = merged[0]
if end_user and merged and (
not end_user.other_name
or end_user.other_name.strip().lower() in _PLACEHOLDER_NAMES
or end_user.other_name.strip().lower() in remove_lower
):
end_user.other_name = merged[0]
logger.info(
f"[CELERY METADATA] 别名增量更新: {old_aliases} - {filtered_remove} + {filtered_add}{merged}"
)
else:
logger.info(
f"[CELERY METADATA] No end_user_info record for end_user_id={end_user_id}, "
f"skipping metadata write (will be created by alias sync)"
)
return {"status": "SUCCESS", "result": "no_info_record"}
# 没有 end_user_info 记录,创建一条
from app.models.end_user_info_model import EndUserInfo
initial_aliases = filtered_add # 新记录只有 add没有 remove
first_alias = initial_aliases[0] if initial_aliases else ""
if first_alias or cleaned:
new_info = EndUserInfo(
end_user_id=end_user_uuid,
other_name=first_alias or "",
aliases=initial_aliases,
meta_data=cleaned if cleaned else None,
)
db.add(new_info)
if end_user and first_alias and (
not end_user.other_name or end_user.other_name.strip().lower() in _PLACEHOLDER_NAMES
):
end_user.other_name = first_alias
logger.info(f"[CELERY METADATA] 创建 end_user_info: other_name={first_alias}, aliases={initial_aliases}")
else:
return {"status": "SUCCESS", "result": "no_data_to_write"}
db.commit()
return {"status": "SUCCESS", "result": "metadata_written"}
# 同步 PgSQL aliases 到 Neo4j 用户实体PgSQL 为权威源)
final_aliases = info.aliases if info else initial_aliases
if final_aliases:
try:
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
neo4j_connector = Neo4jConnector()
cypher = """
MATCH (e:ExtractedEntity)
WHERE e.end_user_id = $end_user_id AND e.name IN ['用户', '', 'User', 'I']
SET e.aliases = $aliases
"""
await neo4j_connector.execute_query(
cypher, end_user_id=end_user_id, aliases=final_aliases
)
await neo4j_connector.close()
logger.info(f"[CELERY METADATA] Neo4j 用户实体 aliases 已同步: {final_aliases}")
except Exception as neo4j_err:
logger.warning(f"[CELERY METADATA] Neo4j aliases 同步失败(不影响主流程): {neo4j_err}")
return {"status": "SUCCESS", "result": "metadata_and_aliases_written"}
loop = None
try: