diff --git a/api/app/core/memory/models/metadata_models.py b/api/app/core/memory/models/metadata_models.py index a5c70ec6..f08d18ed 100644 --- a/api/app/core/memory/models/metadata_models.py +++ b/api/app/core/memory/models/metadata_models.py @@ -38,3 +38,11 @@ class MetadataExtractionResponse(BaseModel): """元数据提取 LLM 响应结构""" model_config = ConfigDict(extra='ignore') user_metadata: UserMetadata = Field(default_factory=UserMetadata) + aliases_to_add: List[str] = Field( + default_factory=list, + description="本次新发现的用户别名(用户自我介绍或他人对用户的称呼)" + ) + aliases_to_remove: List[str] = Field( + default_factory=list, + description="用户明确否认的别名(如'我不叫XX了')" + ) diff --git a/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py b/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py index 8f6d9853..b8a36e44 100644 --- a/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py +++ b/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py @@ -311,9 +311,8 @@ class ExtractionOrchestrator: dialog_data_list, ) - # 步骤 7: 同步用户别名到数据库表 + 触发异步元数据提取(仅正式模式) + # 步骤 7: 触发异步元数据和别名提取(仅正式模式) if not is_pilot_run: - # 收集用户相关 statement 并触发异步元数据提取 try: from app.core.memory.storage_services.extraction_engine.knowledge_extraction.metadata_extractor import MetadataExtractor metadata_extractor = MetadataExtractor(llm_client=self.llm_client, language=self.language) @@ -322,7 +321,6 @@ class ExtractionOrchestrator: statement_entity_edges ) if user_statements: - # 获取 end_user_id 和 config_id end_user_id = dialog_data_list[0].end_user_id if dialog_data_list else None config_id = dialog_data_list[0].config_id if dialog_data_list and hasattr(dialog_data_list[0], 'config_id') else None if end_user_id: @@ -339,9 +337,7 @@ class ExtractionOrchestrator: except Exception as e: logger.error(f"触发元数据提取任务失败(不影响主流程): {e}", exc_info=True) - # 同步用户别名到数据库表 - logger.info("步骤 7: 同步用户别名到 end_user 和 end_user_info 表") - await self._update_end_user_other_name(entity_nodes, dialog_data_list) + # 别名同步已迁移到 Celery 元数据提取任务中,不再在此处执行 logger.info(f"知识提取流水线运行完成({mode_str})") return ( diff --git a/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/metadata_extractor.py b/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/metadata_extractor.py index af3331b9..cc8c6073 100644 --- a/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/metadata_extractor.py +++ b/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/metadata_extractor.py @@ -14,9 +14,9 @@ from app.core.memory.models.graph_models import ( StatementNode, ) from app.core.memory.models.metadata_models import ( - MetadataExtractionResponse, UserMetadata, ) +from app.core.memory.models.message_models import DialogData logger = logging.getLogger(__name__) @@ -50,6 +50,35 @@ class MetadataExtractor: return "zh" return "en" + # def collect_user_raw_messages( + # self, + # dialog_data_list: List[DialogData], + # ) -> List[str]: + # """ + # 从原始对话数据中提取 speaker="user" 的消息原文。 + + # 直接使用用户的原始输入,不经过陈述句提取阶段的 LLM 改写, + # 避免第一人称被替换为第三人称导致元数据/别名提取错误。 + + # Returns: + # 用户原始消息文本列表 + # """ + # result = [] + # for dialog in dialog_data_list: + # if not dialog.context or not dialog.context.msgs: + # continue + # for msg in dialog.context.msgs: + # if getattr(msg, 'role', '') == 'user': + # text = (getattr(msg, 'msg', '') or '').strip() + # if text: + # result.append(text) + + # logger.info(f"收集到 {len(result)} 条用户原始消息") + # if result: + # for i, text in enumerate(result): + # logger.info(f" [user message {i+1}] {text[:200]}") + # return result + def collect_user_related_statements( self, entity_nodes: List[ExtractedEntityNode], @@ -104,6 +133,9 @@ class MetadataExtractor: f"(直接关联: {total_associated}, speaker=user: {len(result)}, " f"跳过非user: {skipped_non_user})" ) + if result: + for i, text in enumerate(result): + logger.info(f" [user statement {i+1}] {text}") if total_associated > 0 and len(result) == 0: logger.warning( f"有 {total_associated} 条直接关联 statement 但全部被 speaker 过滤," @@ -111,18 +143,22 @@ class MetadataExtractor: ) return result - async def extract_metadata(self, statements: List[str], existing_metadata: Optional[dict] = None) -> Optional[UserMetadata]: + async def extract_metadata( + self, + statements: List[str], + existing_metadata: Optional[dict] = None, + existing_aliases: Optional[List[str]] = None, + ) -> Optional[tuple]: """ - 对筛选后的 statement 列表调用 LLM 提取元数据。 - 语言根据 statement 内容自动检测,不依赖系统界面语言。 - 传入已有元数据作为上下文,让 LLM 能判断 replace/remove 操作。 + 对筛选后的 statement 列表调用 LLM 提取元数据和用户别名。 Args: statements: 用户发言的 statement 文本列表 - existing_metadata: 数据库已有的元数据(可选),用于 LLM 对比判断变更 + existing_metadata: 数据库已有的元数据(可选) + existing_aliases: 数据库已有的用户别名列表(可选) Returns: - UserMetadata on success, None on failure + (UserMetadata, List[str], List[str]) tuple: (metadata, aliases_to_add, aliases_to_remove) on success, None on failure """ if not statements: return None @@ -130,7 +166,6 @@ class MetadataExtractor: try: from app.core.memory.utils.prompt.prompt_utils import prompt_env - # 根据写入内容的语言自动检测,而非使用系统界面语言 detected_language = self.detect_language(statements) logger.info(f"元数据提取语言检测结果: {detected_language}") @@ -139,18 +174,23 @@ class MetadataExtractor: statements=statements, language=detected_language, existing_metadata=existing_metadata, + existing_aliases=existing_aliases, json_schema="", ) + from app.core.memory.models.metadata_models import MetadataExtractionResponse response = await self.llm_client.response_structured( messages=[{"role": "user", "content": prompt}], response_model=MetadataExtractionResponse, ) - if response and response.user_metadata: - return response.user_metadata + if response: + metadata = response.user_metadata if response.user_metadata else None + to_add = response.aliases_to_add if response.aliases_to_add else [] + to_remove = response.aliases_to_remove if response.aliases_to_remove else [] + return metadata, to_add, to_remove - logger.warning("LLM 返回的元数据为空") + logger.warning("LLM 返回的响应为空") return None except Exception as e: diff --git a/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/statement_extraction.py b/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/statement_extraction.py index b06bd70f..684ad556 100644 --- a/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/statement_extraction.py +++ b/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/statement_extraction.py @@ -82,6 +82,18 @@ class StatementExtractor: logger.warning(f"Chunk {getattr(chunk, 'id', 'unknown')} has no speaker field or is empty") return None + @staticmethod + def _replace_first_person_with_user(text: str) -> str: + """将用户消息中的第一人称代词"我"替换为"用户"。 + + 替换规则: + - 所有独立的"我"都替换为"用户"(包括"我的"→"用户的"、"叫我"→"叫用户") + - 不替换"我们"中的"我"("我们"是复数,不指代用户个人) + """ + import re + result = re.sub(r'我(?!们)', '用户', text) + return result + async def _extract_statements(self, chunk, end_user_id: Optional[str] = None, dialogue_content: str = None) -> List[Statement]: """Process a single chunk and return extracted statements @@ -99,6 +111,13 @@ class StatementExtractor: logger.warning(f"Chunk {chunk.id} content too short or empty, skipping") return [] + # 对 speaker="user" 的 chunk,将第一人称"我"替换为"用户", + # 避免 LLM 在陈述句提取时将"我"替换为具体名字(如"齐齐"), + # 导致下游元数据/别名提取无法识别第一人称语义。 + chunk_speaker = self._get_speaker_from_chunk(chunk) + if chunk_speaker == "user": + chunk_content = self._replace_first_person_with_user(chunk_content) + prompt_content = await render_statement_extraction_prompt( chunk_content=chunk_content, definitions=LABEL_DEFINITIONS, @@ -149,8 +168,6 @@ class StatementExtractor: relevence_info = RelevenceInfo[relevence_str] if relevence_str in RelevenceInfo.__members__ else RelevenceInfo.RELEVANT except (KeyError, ValueError): relevence_info = RelevenceInfo.RELEVANT - - chunk_speaker = self._get_speaker_from_chunk(chunk) chunk_statement = Statement( statement=extracted_stmt.statement, diff --git a/api/app/core/memory/utils/prompt/prompts/extract_statement.jinja2 b/api/app/core/memory/utils/prompt/prompts/extract_statement.jinja2 index 3cdb5fd0..611bd6df 100644 --- a/api/app/core/memory/utils/prompt/prompts/extract_statement.jinja2 +++ b/api/app/core/memory/utils/prompt/prompts/extract_statement.jinja2 @@ -43,8 +43,9 @@ Each statement must be labeled as per the criteria mentioned below. 对话上下文和共指消解: - 将每个陈述句归属于说出它的参与者。 -- 如果参与者列表为说话者提供了名称(例如,"李雪(用户)"),请在提取的陈述句中使用具体名称("李雪"),而不是通用角色("用户")。 -- 将所有代词解析为对话上下文中的具体人物或实体。 +- **对于用户的发言:必须使用"用户"作为主语**,禁止将"用户"或"我"替换为用户的真实姓名或别名。例如,用户说"我叫张三"应提取为"用户叫张三",而不是"张三叫张三"。 +- 对于 AI 助手的发言:使用"助手"或"AI助手"作为主语。 +- 将所有代词解析为对话上下文中的具体人物或实体,但"我"必须解析为"用户"。 - 识别并将抽象引用解析为其具体名称(如果提到)。 - 将缩写和首字母缩略词扩展为其完整形式。 {% else %} @@ -68,8 +69,9 @@ Context Resolution Requirements: Conversational Context & Co-reference Resolution: - Attribute every statement to the participant who uttered it. -- If the participant list provides a name for a speaker (e.g., "李雪 (用户)"), use the specific name ("李雪") in the extracted statement, not the generic role ("用户"). -- Resolve all pronouns to the specific person or entity from the conversation's context. +- **For user's statements: always use "用户" (User) as the subject**. Do NOT replace "用户" or "I" with the user's real name or alias. For example, if the user says "I'm John", extract as "用户 is John", not "John is John". +- For AI assistant's statements: use "助手" or "AI助手" as the subject. +- Resolve all pronouns to the specific person or entity from the conversation's context, but "I"/"我" must always resolve to "用户". - Identify and resolve abstract references to their specific names if mentioned. - Expand abbreviations and acronyms to their full form. {% endif %} @@ -139,13 +141,13 @@ AI: "水彩画很有趣!水彩颜料通常由颜料与阿拉伯树胶等粘合 示例输出: { "statements": [ { - "statement": "Sarah Chen 最近一直在尝试水彩画。", + "statement": "用户最近一直在尝试水彩画。", "statement_type": "FACT", "temporal_type": "DYNAMIC", "relevance": "RELEVANT" }, { - "statement": "Sarah Chen 画了一些花朵。", + "statement": "用户画了一些花朵。", "statement_type": "FACT", "temporal_type": "DYNAMIC", "relevance": "RELEVANT" @@ -157,13 +159,13 @@ AI: "水彩画很有趣!水彩颜料通常由颜料与阿拉伯树胶等粘合 "relevance": "IRRELEVANT" }, { - "statement": "Sarah Chen 认为她的水彩画中的色彩组合可以改进。", + "statement": "用户认为她的水彩画中的色彩组合可以改进。", "statement_type": "OPINION", "temporal_type": "STATIC", "relevance": "RELEVANT" }, { - "statement": "Sarah Chen 真的很喜欢玫瑰和百合。", + "statement": "用户真的很喜欢玫瑰和百合。", "statement_type": "FACT", "temporal_type": "STATIC", "relevance": "RELEVANT" @@ -186,13 +188,13 @@ AI: "水彩画很有趣!水彩颜料通常由颜料和阿拉伯树胶等粘合 示例输出: { "statements": [ { - "statement": "张曼婷最近在尝试水彩画。", + "statement": "用户最近在尝试水彩画。", "statement_type": "FACT", "temporal_type": "DYNAMIC", "relevance": "RELEVANT" }, { - "statement": "张曼婷画了一些花朵。", + "statement": "用户画了一些花朵。", "statement_type": "FACT", "temporal_type": "DYNAMIC", "relevance": "RELEVANT" @@ -204,13 +206,13 @@ AI: "水彩画很有趣!水彩颜料通常由颜料和阿拉伯树胶等粘合 "relevance": "IRRELEVANT" }, { - "statement": "张曼婷觉得水彩画的色彩搭配还有提升的空间。", + "statement": "用户觉得水彩画的色彩搭配还有提升的空间。", "statement_type": "OPINION", "temporal_type": "STATIC", "relevance": "RELEVANT" }, { - "statement": "张曼婷很喜欢玫瑰和百合。", + "statement": "用户很喜欢玫瑰和百合。", "statement_type": "FACT", "temporal_type": "STATIC", "relevance": "RELEVANT" @@ -233,13 +235,13 @@ User: "I think the color combinations could use some improvement, but I really l Example Output: { "statements": [ { - "statement": "Sarah Chen has been trying watercolor painting recently.", + "statement": "用户 has been trying watercolor painting recently.", "statement_type": "FACT", "temporal_type": "DYNAMIC", "relevance": "RELEVANT" }, { - "statement": "Sarah Chen painted some flowers.", + "statement": "用户 painted some flowers.", "statement_type": "FACT", "temporal_type": "DYNAMIC", "relevance": "RELEVANT" @@ -251,13 +253,13 @@ Example Output: { "relevance": "IRRELEVANT" }, { - "statement": "Sarah Chen thinks the color combinations in her watercolor paintings could use some improvement.", + "statement": "用户 thinks the color combinations in her watercolor paintings could use some improvement.", "statement_type": "OPINION", "temporal_type": "STATIC", "relevance": "RELEVANT" }, { - "statement": "Sarah Chen really likes roses and lilies.", + "statement": "用户 really likes roses and lilies.", "statement_type": "FACT", "temporal_type": "STATIC", "relevance": "RELEVANT" @@ -280,13 +282,13 @@ AI: "水彩画很有趣!水彩颜料通常由颜料和阿拉伯树胶等粘合 Example Output: { "statements": [ { - "statement": "张曼婷最近在尝试水彩画。", + "statement": "用户最近在尝试水彩画。", "statement_type": "FACT", "temporal_type": "DYNAMIC", "relevance": "RELEVANT" }, { - "statement": "张曼婷画了一些花朵。", + "statement": "用户画了一些花朵。", "statement_type": "FACT", "temporal_type": "DYNAMIC", "relevance": "RELEVANT" @@ -298,13 +300,13 @@ Example Output: { "relevance": "IRRELEVANT" }, { - "statement": "张曼婷觉得水彩画的色彩搭配还有提升的空间。", + "statement": "用户觉得水彩画的色彩搭配还有提升的空间。", "statement_type": "OPINION", "temporal_type": "STATIC", "relevance": "RELEVANT" }, { - "statement": "张曼婷很喜欢玫瑰和百合。", + "statement": "用户很喜欢玫瑰和百合。", "statement_type": "FACT", "temporal_type": "STATIC", "relevance": "RELEVANT" diff --git a/api/app/core/memory/utils/prompt/prompts/extract_user_metadata.jinja2 b/api/app/core/memory/utils/prompt/prompts/extract_user_metadata.jinja2 index c280e5f6..5d019b12 100644 --- a/api/app/core/memory/utils/prompt/prompts/extract_user_metadata.jinja2 +++ b/api/app/core/memory/utils/prompt/prompts/extract_user_metadata.jinja2 @@ -32,6 +32,22 @@ Extract user metadata from the following conversation statements spoken by the u - behavioral_hints.preferred_depth:偏好深度(概览/技术细节/深入探讨) - behavioral_hints.tone_preference:语气偏好(轻松随意/专业简洁/学术严谨) - knowledge_tags:用户涉及的知识领域标签 + +**用户别名变更(增量模式):** +- **aliases_to_add**:本次新发现的用户别名,包括: + * 用户主动自我介绍:如"我叫张三"、"我的名字是XX"、"我的网名是XX" + * 他人对用户的称呼:如"同事叫我陈哥"、"大家叫我小张"、"领导叫我老陈" + * 只提取原文中逐字出现的名字,严禁推测或创造 + * 禁止提取:用户给 AI 取的名字、第三方人物自身的名字、"用户"/"我" 等占位词 + * 如果没有新别名,返回空数组 `[]` +- **aliases_to_remove**:用户明确否认的别名,包括: + * 用户说"我不叫XX了"、"别叫我XX"、"我改名了,不叫XX" → 将 XX 放入此数组 + * **严格限制**:只将用户原文中**逐字提到**的被否认名字放入,不要推断关联的其他别名 + * 例如:用户说"我不叫陈小刀了" → 只移除"陈小刀",不要移除"陈哥"、"老陈"等未被提及的别名 + * 如果没有要移除的别名,返回空数组 `[]` +{% if existing_aliases %} +- 已有别名:{{ existing_aliases | tojson }}(仅供参考,不需要在输出中重复) +{% endif %} {% else %} **"Three-Degree Principle" criteria:** - Reusability: Will this information be used by multiple functional modules? @@ -63,6 +79,22 @@ Existing user metadata from the database is provided below. Combine with the use - behavioral_hints.preferred_depth: Preferred depth (overview/detailed/deep dive) - behavioral_hints.tone_preference: Tone preference (casual/professional/academic) - knowledge_tags: Knowledge domain tags related to the user + +**User alias changes (incremental mode):** +- **aliases_to_add**: Newly discovered user aliases from this conversation, including: + * User self-introductions: e.g. "I'm John", "My name is XX", "My username is XX" + * How others address the user: e.g. "My colleagues call me Johnny", "People call me Mike" + * Only extract names that appear VERBATIM in the text — never infer or fabricate + * Do NOT extract: names the user gives to the AI, third-party people's own names, placeholder words like "User"/"I" + * If no new aliases, return empty array `[]` +- **aliases_to_remove**: Aliases the user explicitly denies, including: + * User says "Don't call me XX anymore", "I'm not called XX", "I changed my name from XX" → put XX in this array + * **Strict rule**: Only include the exact name the user **verbatim mentions** as denied. Do NOT infer or remove related aliases + * Example: User says "I'm not called John anymore" → only remove "John", do NOT remove "Johnny", "J" or other related aliases not mentioned + * If no aliases to remove, return empty array `[]` +{% if existing_aliases %} +- Existing aliases: {{ existing_aliases | tojson }} (for reference only, do not repeat in output) +{% endif %} {% endif %} ===User Statements=== @@ -94,7 +126,9 @@ Return a JSON object with the following structure: "tone_preference": "" }, "knowledge_tags": [] - } + }, + "aliases_to_add": [], + "aliases_to_remove": [] } ``` diff --git a/api/app/repositories/neo4j/graph_saver.py b/api/app/repositories/neo4j/graph_saver.py index adc266fe..ae76b9f6 100644 --- a/api/app/repositories/neo4j/graph_saver.py +++ b/api/app/repositories/neo4j/graph_saver.py @@ -187,6 +187,58 @@ async def save_dialog_and_statements_to_neo4j( bool: True if successful, False otherwise """ + # 预处理:对特殊实体("用户"、"AI助手")复用 Neo4j 中已有节点的 ID, + # 确保同一个 end_user_id 下只有一个"用户"节点和一个"AI助手"节点。 + if entity_nodes: + _SPECIAL_NAMES = {"用户", "我", "user", "i", "ai助手", "助手", "ai assistant", "assistant"} + end_user_id = entity_nodes[0].end_user_id if entity_nodes else None + if end_user_id: + try: + # 查询已有的特殊实体 + cypher = """ + MATCH (e:ExtractedEntity) + WHERE e.end_user_id = $end_user_id AND toLower(e.name) IN $names + RETURN e.id AS id, e.name AS name + """ + existing = await connector.execute_query( + cypher, + end_user_id=end_user_id, + names=list(_SPECIAL_NAMES), + ) + # 建立 name(lower) → existing_id 映射 + existing_id_map = {} + for record in (existing or []): + name_lower = (record.get("name") or "").strip().lower() + if name_lower and record.get("id"): + existing_id_map[name_lower] = record["id"] + + if existing_id_map: + # 替换新实体的 ID 为已有 ID,同时更新所有引用该 ID 的边 + for ent in entity_nodes: + name_lower = (ent.name or "").strip().lower() + if name_lower in existing_id_map: + old_id = ent.id + new_id = existing_id_map[name_lower] + if old_id != new_id: + ent.id = new_id + # 更新 statement_entity_edges 中的引用 + for edge in statement_entity_edges: + if edge.target == old_id: + edge.target = new_id + if edge.source == old_id: + edge.source = new_id + # 更新 entity_edges 中的引用 + for edge in entity_edges: + if edge.source == old_id: + edge.source = new_id + if edge.target == old_id: + edge.target = new_id + logger.info( + f"特殊实体 '{ent.name}' ID 复用: {old_id[:8]}... → {new_id[:8]}..." + ) + except Exception as e: + logger.warning(f"特殊实体 ID 复用查询失败(不影响写入): {e}") + # 定义事务函数,将所有写操作放在一个事务中 async def _save_all_in_transaction(tx): """在单个事务中执行所有保存操作,避免死锁""" diff --git a/api/app/tasks.py b/api/app/tasks.py index 3eb1a52c..3641f438 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -3000,70 +3000,149 @@ def extract_user_metadata_task( return {"status": "FAILURE", "error": "Memory config has no LLM model configured"} llm_client = factory.get_llm_client(memory_config.llm_id) - # 2.5 读取已有元数据,传给 extractor 作为上下文 + # 2.5 读取已有元数据和别名,传给 extractor 作为上下文 existing_metadata = None + existing_aliases = None try: info = EndUserInfoRepository(db).get_by_end_user_id(end_user_uuid) - if info and info.meta_data: - existing_metadata = info.meta_data - logger.info("[CELERY METADATA] 已读取数据库已有元数据作为 LLM 上下文") + if info: + if info.meta_data: + existing_metadata = info.meta_data + existing_aliases = info.aliases if info.aliases else [] + logger.info(f"[CELERY METADATA] 已读取已有元数据和别名(aliases={existing_aliases})") except Exception as e: - logger.warning(f"[CELERY METADATA] 读取已有元数据失败(继续无上下文提取): {e}") + logger.warning(f"[CELERY METADATA] 读取已有数据失败(继续无上下文提取): {e}") - # 3. 提取元数据(传入已有元数据作为上下文) + # 3. 提取元数据和别名(传入已有数据作为上下文) extractor = MetadataExtractor(llm_client=llm_client, language=language) - user_metadata = await extractor.extract_metadata(statements, existing_metadata=existing_metadata) + extract_result = await extractor.extract_metadata( + statements, + existing_metadata=existing_metadata, + existing_aliases=existing_aliases, + ) - if not user_metadata: + if not extract_result: logger.info(f"[CELERY METADATA] No metadata extracted for end_user_id={end_user_id}") return {"status": "SUCCESS", "result": "no_metadata_extracted"} - # 4. 清洗、校验、覆盖写入 - raw_dict = user_metadata.model_dump(exclude_none=True) + user_metadata, aliases_to_add, aliases_to_remove = extract_result + logger.info(f"[CELERY METADATA] LLM 别名新增: {aliases_to_add}, 移除: {aliases_to_remove}") + + # 4. 清洗元数据、覆盖写入元数据和别名 + raw_dict = user_metadata.model_dump(exclude_none=True) if user_metadata else {} logger.info(f"[CELERY METADATA] LLM 输出完整元数据: {json.dumps(raw_dict, ensure_ascii=False)}") - cleaned = clean_metadata(raw_dict) - if not cleaned: - logger.info(f"[CELERY METADATA] Cleaned metadata is empty for end_user_id={end_user_id}") - return {"status": "SUCCESS", "result": "empty_after_cleaning"} - + cleaned = clean_metadata(raw_dict) if raw_dict else {} logger.info(f"[CELERY METADATA] 清洗后元数据: {json.dumps(cleaned, ensure_ascii=False)}") - validated = validate_metadata(cleaned) - if not validated: - return {"status": "FAILURE", "error": "Metadata validation failed after cleaning"} - - # 直接覆盖写入(LLM 已完成语义合并,输出的是完整结果) - # 保留 _updated_at 时间戳追踪 from datetime import datetime as dt, timezone as tz now = dt.now(tz.utc).isoformat() + # 过滤别名中的占位名称,执行增量增删 + _PLACEHOLDER_NAMES = {"用户", "我", "user", "i"} + + def _filter_aliases(aliases_list): + seen = set() + result = [] + for a in aliases_list: + a_stripped = a.strip() + if a_stripped and a_stripped.lower() not in _PLACEHOLDER_NAMES and a_stripped.lower() not in seen: + result.append(a_stripped) + seen.add(a_stripped.lower()) + return result + + filtered_add = _filter_aliases(aliases_to_add) + filtered_remove = _filter_aliases(aliases_to_remove) + remove_lower = {a.lower() for a in filtered_remove} + with get_db_context() as db: end_user_uuid = uuid.UUID(end_user_id) info = EndUserInfoRepository(db).get_by_end_user_id(end_user_uuid) + end_user = EndUserRepository(db).get_by_id(end_user_uuid) if info: - existing_meta = info.meta_data if info.meta_data else {} - logger.info(f"[CELERY METADATA] 数据库已有元数据: {json.dumps(existing_meta, ensure_ascii=False)}") + # 元数据覆盖写入 + if cleaned: + existing_meta = info.meta_data if info.meta_data else {} + updated_at = dict(existing_meta.get("_updated_at", {})) + _update_timestamps(existing_meta, cleaned, updated_at, now) + final = dict(cleaned) + final["_updated_at"] = updated_at + info.meta_data = final + logger.info("[CELERY METADATA] 覆盖写入元数据") - # 保留已有的 _updated_at,更新变更字段的时间戳 - updated_at = dict(existing_meta.get("_updated_at", {})) - _update_timestamps(existing_meta, cleaned, updated_at, now) + # 别名增量增删:(已有 - remove) + add + old_aliases = info.aliases if info.aliases else [] + # 先移除 + merged = [a for a in old_aliases if a.strip().lower() not in remove_lower] + # 再追加(去重) + existing_lower = {a.strip().lower() for a in merged} + for a in filtered_add: + if a.lower() not in existing_lower: + merged.append(a) + existing_lower.add(a.lower()) - final = dict(cleaned) - final["_updated_at"] = updated_at - info.meta_data = final - logger.info(f"[CELERY METADATA] 覆盖写入元数据: {json.dumps(final, ensure_ascii=False)}") + if merged != old_aliases: + info.aliases = merged + # other_name 更新逻辑 + if merged and ( + not info.other_name + or info.other_name.strip().lower() in _PLACEHOLDER_NAMES + or info.other_name.strip().lower() in remove_lower + ): + info.other_name = merged[0] + if end_user and merged and ( + not end_user.other_name + or end_user.other_name.strip().lower() in _PLACEHOLDER_NAMES + or end_user.other_name.strip().lower() in remove_lower + ): + end_user.other_name = merged[0] + logger.info( + f"[CELERY METADATA] 别名增量更新: {old_aliases} - {filtered_remove} + {filtered_add} → {merged}" + ) else: - logger.info( - f"[CELERY METADATA] No end_user_info record for end_user_id={end_user_id}, " - f"skipping metadata write (will be created by alias sync)" - ) - return {"status": "SUCCESS", "result": "no_info_record"} + # 没有 end_user_info 记录,创建一条 + from app.models.end_user_info_model import EndUserInfo + initial_aliases = filtered_add # 新记录只有 add,没有 remove + first_alias = initial_aliases[0] if initial_aliases else "" + if first_alias or cleaned: + new_info = EndUserInfo( + end_user_id=end_user_uuid, + other_name=first_alias or "", + aliases=initial_aliases, + meta_data=cleaned if cleaned else None, + ) + db.add(new_info) + if end_user and first_alias and ( + not end_user.other_name or end_user.other_name.strip().lower() in _PLACEHOLDER_NAMES + ): + end_user.other_name = first_alias + logger.info(f"[CELERY METADATA] 创建 end_user_info: other_name={first_alias}, aliases={initial_aliases}") + else: + return {"status": "SUCCESS", "result": "no_data_to_write"} db.commit() - return {"status": "SUCCESS", "result": "metadata_written"} + # 同步 PgSQL aliases 到 Neo4j 用户实体(PgSQL 为权威源) + final_aliases = info.aliases if info else initial_aliases + if final_aliases: + try: + from app.repositories.neo4j.neo4j_connector import Neo4jConnector + neo4j_connector = Neo4jConnector() + cypher = """ + MATCH (e:ExtractedEntity) + WHERE e.end_user_id = $end_user_id AND e.name IN ['用户', '我', 'User', 'I'] + SET e.aliases = $aliases + """ + await neo4j_connector.execute_query( + cypher, end_user_id=end_user_id, aliases=final_aliases + ) + await neo4j_connector.close() + logger.info(f"[CELERY METADATA] Neo4j 用户实体 aliases 已同步: {final_aliases}") + except Exception as neo4j_err: + logger.warning(f"[CELERY METADATA] Neo4j aliases 同步失败(不影响主流程): {neo4j_err}") + + return {"status": "SUCCESS", "result": "metadata_and_aliases_written"} loop = None try: