From 73fbc197471780a750b5298cb351fb3c3307bd01 Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Thu, 16 Apr 2026 17:14:30 +0800 Subject: [PATCH] refactor(memory): switch metadata extraction from full-replace to incremental changes - Replace UserMetadata full-object overwrite with incremental MetadataFieldChange operations (set/remove per field path) - Convert profile.role and profile.domain from scalar strings to lists - Remove UserMetadataBehavioralHints and knowledge_tags fields - Update Jinja2 prompt to instruct LLM to output incremental changes - Update extract_user_metadata_task to apply changes via deep-copy and per-field mutation for proper SQLAlchemy change detection - Minor lint: remove unnecessary f-string prefixes in tasks.py --- api/app/core/memory/models/__init__.py | 4 +- api/app/core/memory/models/metadata_models.py | 40 ++++--- .../metadata_extractor.py | 9 +- .../prompts/extract_user_metadata.jinja2 | 103 ++++++++--------- api/app/tasks.py | 104 ++++++++++++------ 5 files changed, 152 insertions(+), 108 deletions(-) diff --git a/api/app/core/memory/models/__init__.py b/api/app/core/memory/models/__init__.py index eed8e8c4..2a34159b 100644 --- a/api/app/core/memory/models/__init__.py +++ b/api/app/core/memory/models/__init__.py @@ -61,9 +61,9 @@ from app.core.memory.models.triplet_models import ( # User metadata models from app.core.memory.models.metadata_models import ( UserMetadata, - UserMetadataBehavioralHints, UserMetadataProfile, MetadataExtractionResponse, + MetadataFieldChange, ) # Ontology scenario models (LLM extracted from scenarios) @@ -133,9 +133,9 @@ __all__ = [ "Triplet", "TripletExtractionResponse", "UserMetadata", - "UserMetadataBehavioralHints", "UserMetadataProfile", "MetadataExtractionResponse", + "MetadataFieldChange", # Ontology models "OntologyClass", "OntologyExtractionResponse", diff --git a/api/app/core/memory/models/metadata_models.py b/api/app/core/memory/models/metadata_models.py index 55c2359e..1b490dbc 100644 --- a/api/app/core/memory/models/metadata_models.py +++ b/api/app/core/memory/models/metadata_models.py @@ -4,7 +4,7 @@ Independent from triplet_models.py - these models are used by the standalone metadata extraction pipeline (post-dedup async Celery task). """ -from typing import List +from typing import List, Optional from pydantic import BaseModel, ConfigDict, Field @@ -13,8 +13,8 @@ class UserMetadataProfile(BaseModel): """用户画像信息""" model_config = ConfigDict(extra="ignore") - role: str = Field(default="", description="用户职业或角色") - domain: str = Field(default="", description="用户所在领域") + role: List[str] = Field(default_factory=list, description="用户职业或角色") + domain: List[str] = Field(default_factory=list, description="用户所在领域") expertise: List[str] = Field( default_factory=list, description="用户擅长的技能或工具" ) @@ -23,31 +23,37 @@ class UserMetadataProfile(BaseModel): ) -class UserMetadataBehavioralHints(BaseModel): - """行为偏好""" - - model_config = ConfigDict(extra="ignore") - learning_stage: str = Field(default="", description="学习阶段") - preferred_depth: str = Field(default="", description="偏好深度") - tone_preference: str = Field(default="", description="语气偏好") - - class UserMetadata(BaseModel): """用户元数据顶层结构""" model_config = ConfigDict(extra="ignore") profile: UserMetadataProfile = Field(default_factory=UserMetadataProfile) - behavioral_hints: UserMetadataBehavioralHints = Field( - default_factory=UserMetadataBehavioralHints + + +class MetadataFieldChange(BaseModel): + """单个元数据字段的变更操作""" + + model_config = ConfigDict(extra="ignore") + field_path: str = Field( + description="字段路径,用点号分隔,如 'profile.role'、'knowledge_tags'、'behavioral_hints.tone_preference'" + ) + action: str = Field( + description="操作类型:'set' 表示新增或修改,'remove' 表示移除" + ) + value: Optional[str] = Field( + default=None, + description="字段的新值(action='set' 时必填)。标量字段直接填值,列表字段填单个要新增的元素" ) - knowledge_tags: List[str] = Field(default_factory=list, description="知识标签") class MetadataExtractionResponse(BaseModel): - """元数据提取 LLM 响应结构""" + """元数据提取 LLM 响应结构(增量模式)""" model_config = ConfigDict(extra="ignore") - user_metadata: UserMetadata = Field(default_factory=UserMetadata) + metadata_changes: List[MetadataFieldChange] = Field( + default_factory=list, + description="元数据的增量变更列表,每项描述一个字段的新增、修改或移除操作", + ) aliases_to_add: List[str] = Field( default_factory=list, description="本次新发现的用户别名(用户自我介绍或他人对用户的称呼)", diff --git a/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/metadata_extractor.py b/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/metadata_extractor.py index 19f1e533..29f4e85b 100644 --- a/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/metadata_extractor.py +++ b/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/metadata_extractor.py @@ -118,7 +118,7 @@ class MetadataExtractor: existing_aliases: Optional[List[str]] = None, ) -> Optional[tuple]: """ - 对筛选后的 statement 列表调用 LLM 提取元数据和用户别名。 + 对筛选后的 statement 列表调用 LLM 提取元数据增量变更和用户别名。 Args: statements: 用户发言的 statement 文本列表 @@ -126,7 +126,8 @@ class MetadataExtractor: existing_aliases: 数据库已有的用户别名列表(可选) Returns: - (UserMetadata, List[str], List[str]) tuple: (metadata, aliases_to_add, aliases_to_remove) on success, None on failure + (List[MetadataFieldChange], List[str], List[str]) tuple: + (metadata_changes, aliases_to_add, aliases_to_remove) on success, None on failure """ if not statements: return None @@ -160,12 +161,12 @@ class MetadataExtractor: ) if response: - metadata = response.user_metadata if response.user_metadata else None + changes = response.metadata_changes if response.metadata_changes else [] to_add = response.aliases_to_add if response.aliases_to_add else [] to_remove = ( response.aliases_to_remove if response.aliases_to_remove else [] ) - return metadata, to_add, to_remove + return changes, to_add, to_remove logger.warning("LLM 返回的响应为空") return None diff --git a/api/app/core/memory/utils/prompt/prompts/extract_user_metadata.jinja2 b/api/app/core/memory/utils/prompt/prompts/extract_user_metadata.jinja2 index 5d019b12..1c32d369 100644 --- a/api/app/core/memory/utils/prompt/prompts/extract_user_metadata.jinja2 +++ b/api/app/core/memory/utils/prompt/prompts/extract_user_metadata.jinja2 @@ -1,5 +1,5 @@ ===Task=== -Extract user metadata from the following conversation statements spoken by the user. +Extract user metadata changes from the following conversation statements spoken by the user. {% if language == "zh" %} **"三度原则"判断标准:** @@ -10,28 +10,36 @@ Extract user metadata from the following conversation statements spoken by the u **提取规则:** - **只提取关于"用户本人"的画像信息**,忽略用户提到的第三方人物(如朋友、同事、家人)的信息 - 仅提取文本中明确提到的信息,不要推测 -- 如果文本中没有可提取的用户画像信息,返回空的 user_metadata 对象 - **输出语言必须与输入文本的语言一致**(输入中文则输出中文值,输入英文则输出英文值) +**增量模式(重要):** +你只需要输出**本次对话引起的变更操作**,不要输出完整的元数据。每个变更是一个对象,包含: +- `field_path`:字段路径,用点号分隔(如 `profile.role`、`profile.expertise`) +- `action`:操作类型 + * `set`:新增或修改一个字段的值 + * `remove`:移除一个字段的值 +- `value`:字段的新值(`action="set"` 时必填,`action="remove"` 时填要移除的元素值) + * 所有字段均为列表类型,每个元素一条变更记录 + +**判断规则:** +- 用户提到新信息 → `action="set"`,填入新值 +- 用户明确否定已有信息(如"我不再做老师了"、"我已经不学Python了")→ `action="remove"`,`value` 填要移除的元素值 +- 如果本次对话没有任何可提取的变更,返回空的 `metadata_changes` 数组 `[]` +- **不要为未被提及的字段生成任何变更操作** + {% if existing_metadata %} -**重要:合并已有元数据** -下方提供了数据库中已有的用户元数据。请结合用户最新发言,输出**合并后的完整元数据**: -- 如果用户明确否定了已有信息(如"我不再教高中物理了"),在输出中**移除**该信息 -- 如果用户提到了新信息,**添加**到对应字段中 -- 如果已有信息未被用户否定,**保留**在输出中 -- 标量字段(如 role、domain):如果用户提到了新值,用新值替换;否则保留已有值 -- 最终输出应该是完整的、合并后的元数据,不是增量 +**已有元数据(仅供参考,用于判断是否需要变更):** +请对比已有数据和用户最新发言,只输出差异部分的变更操作。 +- 如果用户说的信息和已有数据一致,不需要输出变更 +- 如果用户否定了已有数据中的某个值,输出 `remove` 操作 +- 如果用户提到了新信息,输出 `set` 操作 {% endif %} **字段说明:** -- profile.role:用户的职业或角色,如 教师、医生、后端工程师 -- profile.domain:用户所在领域,如 教育、医疗、软件开发 -- profile.expertise:用户擅长的技能或工具(通用,不限于编程),如 Python、心理咨询、高中物理 -- profile.interests:用户主动表达兴趣的话题或领域标签 -- behavioral_hints.learning_stage:学习阶段(初学者/中级/高级) -- behavioral_hints.preferred_depth:偏好深度(概览/技术细节/深入探讨) -- behavioral_hints.tone_preference:语气偏好(轻松随意/专业简洁/学术严谨) -- knowledge_tags:用户涉及的知识领域标签 +- profile.role:用户的职业或角色(列表),如 教师、医生、后端工程师,一个人可以有多个角色 +- profile.domain:用户所在领域(列表),如 教育、医疗、软件开发,一个人可以涉及多个领域 +- profile.expertise:用户擅长的技能或工具(列表),如 Python、心理咨询、高中物理 +- profile.interests:用户主动表达兴趣的话题或领域标签(列表) **用户别名变更(增量模式):** - **aliases_to_add**:本次新发现的用户别名,包括: @@ -43,7 +51,6 @@ Extract user metadata from the following conversation statements spoken by the u - **aliases_to_remove**:用户明确否认的别名,包括: * 用户说"我不叫XX了"、"别叫我XX"、"我改名了,不叫XX" → 将 XX 放入此数组 * **严格限制**:只将用户原文中**逐字提到**的被否认名字放入,不要推断关联的其他别名 - * 例如:用户说"我不叫陈小刀了" → 只移除"陈小刀",不要移除"陈哥"、"老陈"等未被提及的别名 * 如果没有要移除的别名,返回空数组 `[]` {% if existing_aliases %} - 已有别名:{{ existing_aliases | tojson }}(仅供参考,不需要在输出中重复) @@ -57,28 +64,36 @@ Extract user metadata from the following conversation statements spoken by the u **Extraction rules:** - **Only extract profile information about the user themselves**, ignore information about third parties (friends, colleagues, family) mentioned by the user - Only extract information explicitly mentioned in the text, do not speculate -- If no user profile information can be extracted, return an empty user_metadata object - **Output language must match the input text language** +**Incremental mode (important):** +You should only output **the change operations caused by this conversation**, not the complete metadata. Each change is an object containing: +- `field_path`: Field path separated by dots (e.g. `profile.role`, `profile.expertise`) +- `action`: Operation type + * `set`: Add or update a field value + * `remove`: Remove a field value +- `value`: The new value for the field (required when `action="set"`, for `action="remove"` fill in the element value to remove) + * All fields are list types, one change record per element + +**Decision rules:** +- User mentions new information → `action="set"`, fill in the new value +- User explicitly negates existing info (e.g. "I'm no longer a teacher", "I stopped learning Python") → `action="remove"`, `value` is the element to remove +- If this conversation has no extractable changes, return an empty `metadata_changes` array `[]` +- **Do NOT generate any change operations for fields not mentioned in the conversation** + {% if existing_metadata %} -**Important: Merge with existing metadata** -Existing user metadata from the database is provided below. Combine with the user's latest statements to output the **complete merged metadata**: -- If the user explicitly negates existing info (e.g. "I no longer teach high school physics"), **remove** it from output -- If the user mentions new info, **add** it to the corresponding field -- If existing info is not negated by the user, **keep** it in the output -- Scalar fields (e.g. role, domain): replace with new value if user mentions one; otherwise keep existing -- The final output should be the complete, merged metadata — not an incremental update +**Existing metadata (for reference only, to determine if changes are needed):** +Compare existing data with the user's latest statements, and only output change operations for the differences. +- If the user's statement matches existing data, no change is needed +- If the user negates a value in existing data, output a `remove` operation +- If the user mentions new information, output a `set` operation {% endif %} **Field descriptions:** -- profile.role: User's occupation or role, e.g. teacher, doctor, software engineer -- profile.domain: User's domain, e.g. education, healthcare, software development -- profile.expertise: User's skills or tools (general, not limited to programming) -- profile.interests: Topics or domain tags the user actively expressed interest in -- behavioral_hints.learning_stage: Learning stage (beginner/intermediate/advanced) -- behavioral_hints.preferred_depth: Preferred depth (overview/detailed/deep dive) -- behavioral_hints.tone_preference: Tone preference (casual/professional/academic) -- knowledge_tags: Knowledge domain tags related to the user +- profile.role: User's occupation or role (list), e.g. teacher, doctor, software engineer. A person can have multiple roles +- profile.domain: User's domain (list), e.g. education, healthcare, software development. A person can span multiple domains +- profile.expertise: User's skills or tools (list), e.g. Python, counseling, physics +- profile.interests: Topics or domain tags the user actively expressed interest in (list) **User alias changes (incremental mode):** - **aliases_to_add**: Newly discovered user aliases from this conversation, including: @@ -90,7 +105,6 @@ Existing user metadata from the database is provided below. Combine with the use - **aliases_to_remove**: Aliases the user explicitly denies, including: * User says "Don't call me XX anymore", "I'm not called XX", "I changed my name from XX" → put XX in this array * **Strict rule**: Only include the exact name the user **verbatim mentions** as denied. Do NOT infer or remove related aliases - * Example: User says "I'm not called John anymore" → only remove "John", do NOT remove "Johnny", "J" or other related aliases not mentioned * If no aliases to remove, return empty array `[]` {% if existing_aliases %} - Existing aliases: {{ existing_aliases | tojson }} (for reference only, do not repeat in output) @@ -113,20 +127,11 @@ Existing user metadata from the database is provided below. Combine with the use Return a JSON object with the following structure: ```json { - "user_metadata": { - "profile": { - "role": "", - "domain": "", - "expertise": [], - "interests": [] - }, - "behavioral_hints": { - "learning_stage": "", - "preferred_depth": "", - "tone_preference": "" - }, - "knowledge_tags": [] - }, + "metadata_changes": [ + {"field_path": "profile.role", "action": "set", "value": "后端工程师"}, + {"field_path": "profile.expertise", "action": "set", "value": "Python"}, + {"field_path": "profile.expertise", "action": "remove", "value": "Java"} + ], "aliases_to_add": [], "aliases_to_remove": [] } diff --git a/api/app/tasks.py b/api/app/tasks.py index 5a71066a..2c2f50a9 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -455,7 +455,7 @@ def build_graphrag_for_kb(kb_id: uuid.UUID): db_knowledge = db.query(Knowledge).filter(Knowledge.id == kb_id).first() if db_knowledge is None: logger.error(f"[GraphRAG-KB] knowledge={kb_id} not found") - return f"build knowledge graph failed: knowledge not found" + return "build knowledge graph failed: knowledge not found" if not (db_knowledge.parser_config and db_knowledge.parser_config.get("graphrag", {}).get("use_graphrag", False)): @@ -538,7 +538,7 @@ def build_graphrag_for_document(document_id: str, knowledge_id: str): db_knowledge = db.query(Knowledge).filter(Knowledge.id == uuid.UUID(knowledge_id)).first() if db_document is None or db_knowledge is None: logger.error(f"[GraphRAG] document={document_id} or knowledge={knowledge_id} not found") - return f"build_graphrag_for_document failed: record not found" + return "build_graphrag_for_document failed: record not found" graphrag_conf = db_knowledge.parser_config.get("graphrag", {}) with_resolution = graphrag_conf.get("resolution", False) @@ -617,7 +617,7 @@ def sync_knowledge_for_kb(kb_id: uuid.UUID): db_knowledge = db.query(Knowledge).filter(Knowledge.id == kb_id).first() if db_knowledge is None: logger.error(f"[SyncKB] knowledge={kb_id} not found") - return f"sync knowledge failed: knowledge not found" + return "sync knowledge failed: knowledge not found" # 1. get vector_service vector_service = ElasticSearchVectorFactory().init_vector(knowledge=db_knowledge) @@ -3102,29 +3102,11 @@ def extract_user_metadata_task( logger.info(f"[CELERY METADATA] No metadata extracted for end_user_id={end_user_id}") return {"status": "SUCCESS", "result": "no_metadata_extracted"} - user_metadata, aliases_to_add, aliases_to_remove = extract_result - logger.info(f"[CELERY METADATA] LLM 别名新增: {aliases_to_add}, 移除: {aliases_to_remove}") - - # 4. 清洗元数据、覆盖写入元数据和别名 - def clean_metadata(raw: dict) -> dict: - """递归移除空字符串、空列表、空字典。""" - result = {} - for k, v in raw.items(): - if v == "" or v == []: - continue - if isinstance(v, dict): - cleaned = clean_metadata(v) - if cleaned: - result[k] = cleaned - else: - result[k] = v - return result - - raw_dict = user_metadata.model_dump(exclude_none=True) if user_metadata else {} - logger.info(f"[CELERY METADATA] LLM 输出完整元数据: {json.dumps(raw_dict, ensure_ascii=False)}") - - cleaned = clean_metadata(raw_dict) if raw_dict else {} - logger.info(f"[CELERY METADATA] 清洗后元数据: {json.dumps(cleaned, ensure_ascii=False)}") + metadata_changes, aliases_to_add, aliases_to_remove = extract_result + logger.info( + f"[CELERY METADATA] LLM 元数据变更: {[c.model_dump() for c in metadata_changes]}, " + f"别名新增: {aliases_to_add}, 移除: {aliases_to_remove}" + ) from datetime import datetime as dt, timezone as tz now = dt.now(tz.utc).isoformat() @@ -3152,15 +3134,50 @@ def extract_user_metadata_task( end_user = EndUserRepository(db).get_by_id(end_user_uuid) if info: - # 元数据覆盖写入 - if cleaned: - existing_meta = info.meta_data if info.meta_data else {} + # 4. 元数据增量更新(按 LLM 输出的变更操作逐条执行,所有字段均为列表类型) + if metadata_changes: + # 深拷贝,确保 SQLAlchemy 能检测到变更 + import copy + existing_meta = copy.deepcopy(info.meta_data) if info.meta_data else {} updated_at = dict(existing_meta.get("_updated_at", {})) - _update_timestamps(existing_meta, cleaned, updated_at, now) - final = dict(cleaned) - final["_updated_at"] = updated_at - info.meta_data = final - logger.info("[CELERY METADATA] 覆盖写入元数据") + + for change in metadata_changes: + field_path = change.field_path + action = change.action + value = change.value + + if not value or not value.strip(): + continue + + # 定位到目标字段的父级节点 + parts = field_path.split(".") + target = existing_meta + for part in parts[:-1]: + target = target.setdefault(part, {}) + leaf = parts[-1] + + current_list = target.get(leaf, []) + if not isinstance(current_list, list): + current_list = [] + + if action == "set": + if value not in current_list: + current_list.append(value) + target[leaf] = current_list + logger.info(f"[CELERY METADATA] set {field_path} = {value}") + + elif action == "remove": + if value in current_list: + current_list.remove(value) + target[leaf] = current_list + logger.info(f"[CELERY METADATA] remove {value} from {field_path}") + + updated_at[field_path] = now + + existing_meta["_updated_at"] = updated_at + # 赋值深拷贝后的新对象,SQLAlchemy 会检测到字段变更并写入 + info.meta_data = existing_meta + logger.info(f"[CELERY METADATA] 增量更新元数据完成: {json.dumps(existing_meta, ensure_ascii=False)}") # 别名增量增删:(已有 - remove) + add old_aliases = info.aliases if info.aliases else [] @@ -3196,12 +3213,27 @@ def extract_user_metadata_task( from app.models.end_user_info_model import EndUserInfo initial_aliases = filtered_add # 新记录只有 add,没有 remove first_alias = initial_aliases[0] if initial_aliases else "" - if first_alias or cleaned: + + # 从变更操作构建初始元数据(所有字段均为列表类型) + initial_meta = {} + for change in metadata_changes: + if change.action == "set" and change.value is not None and change.value.strip(): + parts = change.field_path.split(".") + target = initial_meta + for part in parts[:-1]: + target = target.setdefault(part, {}) + leaf = parts[-1] + current_list = target.get(leaf, []) + if change.value not in current_list: + current_list.append(change.value) + target[leaf] = current_list + + if first_alias or initial_meta: new_info = EndUserInfo( end_user_id=end_user_uuid, other_name=first_alias or "", aliases=initial_aliases, - meta_data=cleaned if cleaned else None, + meta_data=initial_meta if initial_meta else None, ) db.add(new_info) if end_user and first_alias and (