Merge pull request #918 from SuanmoSuanyangTechnology/fix/extract-metadata

refactor(memory): switch metadata extraction from full-replace to inc…
This commit is contained in:
Ke Sun
2026-04-17 10:58:51 +08:00
committed by GitHub
5 changed files with 150 additions and 108 deletions

View File

@@ -61,9 +61,9 @@ from app.core.memory.models.triplet_models import (
# User metadata models
from app.core.memory.models.metadata_models import (
UserMetadata,
UserMetadataBehavioralHints,
UserMetadataProfile,
MetadataExtractionResponse,
MetadataFieldChange,
)
# Ontology scenario models (LLM extracted from scenarios)
@@ -133,9 +133,9 @@ __all__ = [
"Triplet",
"TripletExtractionResponse",
"UserMetadata",
"UserMetadataBehavioralHints",
"UserMetadataProfile",
"MetadataExtractionResponse",
"MetadataFieldChange",
# Ontology models
"OntologyClass",
"OntologyExtractionResponse",

View File

@@ -4,7 +4,7 @@ Independent from triplet_models.py - these models are used by the
standalone metadata extraction pipeline (post-dedup async Celery task).
"""
from typing import List
from typing import List, Literal, Optional
from pydantic import BaseModel, ConfigDict, Field
@@ -13,8 +13,8 @@ class UserMetadataProfile(BaseModel):
"""用户画像信息"""
model_config = ConfigDict(extra="ignore")
role: str = Field(default="", description="用户职业或角色")
domain: str = Field(default="", description="用户所在领域")
role: List[str] = Field(default_factory=list, description="用户职业或角色")
domain: List[str] = Field(default_factory=list, description="用户所在领域")
expertise: List[str] = Field(
default_factory=list, description="用户擅长的技能或工具"
)
@@ -23,31 +23,37 @@ class UserMetadataProfile(BaseModel):
)
class UserMetadataBehavioralHints(BaseModel):
"""行为偏好"""
model_config = ConfigDict(extra="ignore")
learning_stage: str = Field(default="", description="学习阶段")
preferred_depth: str = Field(default="", description="偏好深度")
tone_preference: str = Field(default="", description="语气偏好")
class UserMetadata(BaseModel):
"""用户元数据顶层结构"""
model_config = ConfigDict(extra="ignore")
profile: UserMetadataProfile = Field(default_factory=UserMetadataProfile)
behavioral_hints: UserMetadataBehavioralHints = Field(
default_factory=UserMetadataBehavioralHints
class MetadataFieldChange(BaseModel):
"""单个元数据字段的变更操作"""
model_config = ConfigDict(extra="ignore")
field_path: str = Field(
description="字段路径,用点号分隔,如 'profile.role''profile.expertise'"
)
action: Literal["set", "remove"] = Field(
description="操作类型:'set' 表示新增或修改,'remove' 表示移除"
)
value: Optional[str] = Field(
default=None,
description="字段的新值action='set' 时必填)。标量字段直接填值,列表字段填单个要新增的元素"
)
knowledge_tags: List[str] = Field(default_factory=list, description="知识标签")
class MetadataExtractionResponse(BaseModel):
"""元数据提取 LLM 响应结构"""
"""元数据提取 LLM 响应结构(增量模式)"""
model_config = ConfigDict(extra="ignore")
user_metadata: UserMetadata = Field(default_factory=UserMetadata)
metadata_changes: List[MetadataFieldChange] = Field(
default_factory=list,
description="元数据的增量变更列表,每项描述一个字段的新增、修改或移除操作",
)
aliases_to_add: List[str] = Field(
default_factory=list,
description="本次新发现的用户别名(用户自我介绍或他人对用户的称呼)",

View File

@@ -118,7 +118,7 @@ class MetadataExtractor:
existing_aliases: Optional[List[str]] = None,
) -> Optional[tuple]:
"""
对筛选后的 statement 列表调用 LLM 提取元数据和用户别名。
对筛选后的 statement 列表调用 LLM 提取元数据增量变更和用户别名。
Args:
statements: 用户发言的 statement 文本列表
@@ -126,7 +126,8 @@ class MetadataExtractor:
existing_aliases: 数据库已有的用户别名列表(可选)
Returns:
(UserMetadata, List[str], List[str]) tuple: (metadata, aliases_to_add, aliases_to_remove) on success, None on failure
(List[MetadataFieldChange], List[str], List[str]) tuple:
(metadata_changes, aliases_to_add, aliases_to_remove) on success, None on failure
"""
if not statements:
return None
@@ -160,12 +161,12 @@ class MetadataExtractor:
)
if response:
metadata = response.user_metadata if response.user_metadata else None
changes = response.metadata_changes if response.metadata_changes else []
to_add = response.aliases_to_add if response.aliases_to_add else []
to_remove = (
response.aliases_to_remove if response.aliases_to_remove else []
)
return metadata, to_add, to_remove
return changes, to_add, to_remove
logger.warning("LLM 返回的响应为空")
return None

View File

@@ -1,5 +1,5 @@
===Task===
Extract user metadata from the following conversation statements spoken by the user.
Extract user metadata changes from the following conversation statements spoken by the user.
{% if language == "zh" %}
**"三度原则"判断标准:**
@@ -10,28 +10,36 @@ Extract user metadata from the following conversation statements spoken by the u
**提取规则:**
- **只提取关于"用户本人"的画像信息**,忽略用户提到的第三方人物(如朋友、同事、家人)的信息
- 仅提取文本中明确提到的信息,不要推测
- 如果文本中没有可提取的用户画像信息,返回空的 user_metadata 对象
- **输出语言必须与输入文本的语言一致**(输入中文则输出中文值,输入英文则输出英文值)
**增量模式(重要):**
你只需要输出**本次对话引起的变更操作**,不要输出完整的元数据。每个变更是一个对象,包含:
- `field_path`:字段路径,用点号分隔(如 `profile.role`、`profile.expertise`)
- `action`:操作类型
* `set`:新增或修改一个字段的值
* `remove`:移除一个字段的值
- `value`:字段的新值(`action="set"` 时必填,`action="remove"` 时填要移除的元素值)
* 所有字段均为列表类型,每个元素一条变更记录
**判断规则:**
- 用户提到新信息 → `action="set"`,填入新值
- 用户明确否定已有信息(如"我不再做老师了"、"我已经不学Python了")→ `action="remove"`,`value` 填要移除的元素值
- 如果本次对话没有任何可提取的变更,返回空的 `metadata_changes` 数组 `[]`
- **不要为未被提及的字段生成任何变更操作**
{% if existing_metadata %}
**重要:合并已有元数据**
下方提供了数据库中已有的用户元数据。请结合用户最新发言,输出**合并后的完整元数据**
- 如果用户明确否定了已有信息(如"我不再教高中物理了"),在输出中**移除**该信息
- 如果用户提到了新信息,**添加**到对应字段中
- 如果已有信息未被用户否定,**保留**在输出中
- 标量字段(如 role、domain):如果用户提到了新值,用新值替换;否则保留已有值
- 最终输出应该是完整的、合并后的元数据,不是增量
**已有元数据(仅供参考,用于判断是否需要变更):**
请对比已有数据和用户最新发言,输出差异部分的变更操作。
- 如果用户说的信息和已有数据一致,不需要输出变更
- 如果用户否定了已有数据中的某个值,输出 `remove` 操作
- 如果用户提到了新信息,输出 `set` 操作
{% endif %}
**字段说明:**
- profile.role:用户的职业或角色,如 教师、医生、后端工程师
- profile.domain:用户所在领域,如 教育、医疗、软件开发
- profile.expertise:用户擅长的技能或工具(通用,不限于编程),如 Python、心理咨询、高中物理
- profile.interests:用户主动表达兴趣的话题或领域标签
- behavioral_hints.learning_stage:学习阶段(初学者/中级/高级)
- behavioral_hints.preferred_depth:偏好深度(概览/技术细节/深入探讨)
- behavioral_hints.tone_preference:语气偏好(轻松随意/专业简洁/学术严谨)
- knowledge_tags:用户涉及的知识领域标签
- profile.role:用户的职业或角色(列表),如 教师、医生、后端工程师,一个人可以有多个角色
- profile.domain:用户所在领域(列表),如 教育、医疗、软件开发,一个人可以涉及多个领域
- profile.expertise:用户擅长的技能或工具(列表),如 Python、心理咨询、高中物理
- profile.interests:用户主动表达兴趣的话题或领域标签(列表)
**用户别名变更(增量模式):**
- **aliases_to_add**:本次新发现的用户别名,包括:
@@ -43,7 +51,6 @@ Extract user metadata from the following conversation statements spoken by the u
- **aliases_to_remove**:用户明确否认的别名,包括:
* 用户说"我不叫XX了"、"别叫我XX"、"我改名了不叫XX" → 将 XX 放入此数组
* **严格限制**:只将用户原文中**逐字提到**的被否认名字放入,不要推断关联的其他别名
* 例如:用户说"我不叫陈小刀了" → 只移除"陈小刀",不要移除"陈哥"、"老陈"等未被提及的别名
* 如果没有要移除的别名,返回空数组 `[]`
{% if existing_aliases %}
- 已有别名:{{ existing_aliases | tojson }}(仅供参考,不需要在输出中重复)
@@ -57,28 +64,36 @@ Extract user metadata from the following conversation statements spoken by the u
**Extraction rules:**
- **Only extract profile information about the user themselves**, ignore information about third parties (friends, colleagues, family) mentioned by the user
- Only extract information explicitly mentioned in the text, do not speculate
- If no user profile information can be extracted, return an empty user_metadata object
- **Output language must match the input text language**
**Incremental mode (important):**
You should only output **the change operations caused by this conversation**, not the complete metadata. Each change is an object containing:
- `field_path`: Field path separated by dots (e.g. `profile.role`, `profile.expertise`)
- `action`: Operation type
* `set`: Add or update a field value
* `remove`: Remove a field value
- `value`: The new value for the field (required when `action="set"`, for `action="remove"` fill in the element value to remove)
* All fields are list types, one change record per element
**Decision rules:**
- User mentions new information → `action="set"`, fill in the new value
- User explicitly negates existing info (e.g. "I'm no longer a teacher", "I stopped learning Python") → `action="remove"`, `value` is the element to remove
- If this conversation has no extractable changes, return an empty `metadata_changes` array `[]`
- **Do NOT generate any change operations for fields not mentioned in the conversation**
{% if existing_metadata %}
**Important: Merge with existing metadata**
Existing user metadata from the database is provided below. Combine with the user's latest statements to output the **complete merged metadata**:
- If the user explicitly negates existing info (e.g. "I no longer teach high school physics"), **remove** it from output
- If the user mentions new info, **add** it to the corresponding field
- If existing info is not negated by the user, **keep** it in the output
- Scalar fields (e.g. role, domain): replace with new value if user mentions one; otherwise keep existing
- The final output should be the complete, merged metadata — not an incremental update
**Existing metadata (for reference only, to determine if changes are needed):**
Compare existing data with the user's latest statements, and only output change operations for the differences.
- If the user's statement matches existing data, no change is needed
- If the user negates a value in existing data, output a `remove` operation
- If the user mentions new information, output a `set` operation
{% endif %}
**Field descriptions:**
- profile.role: User's occupation or role, e.g. teacher, doctor, software engineer
- profile.domain: User's domain, e.g. education, healthcare, software development
- profile.expertise: User's skills or tools (general, not limited to programming)
- profile.interests: Topics or domain tags the user actively expressed interest in
- behavioral_hints.learning_stage: Learning stage (beginner/intermediate/advanced)
- behavioral_hints.preferred_depth: Preferred depth (overview/detailed/deep dive)
- behavioral_hints.tone_preference: Tone preference (casual/professional/academic)
- knowledge_tags: Knowledge domain tags related to the user
- profile.role: User's occupation or role (list), e.g. teacher, doctor, software engineer. A person can have multiple roles
- profile.domain: User's domain (list), e.g. education, healthcare, software development. A person can span multiple domains
- profile.expertise: User's skills or tools (list), e.g. Python, counseling, physics
- profile.interests: Topics or domain tags the user actively expressed interest in (list)
**User alias changes (incremental mode):**
- **aliases_to_add**: Newly discovered user aliases from this conversation, including:
@@ -90,7 +105,6 @@ Existing user metadata from the database is provided below. Combine with the use
- **aliases_to_remove**: Aliases the user explicitly denies, including:
* User says "Don't call me XX anymore", "I'm not called XX", "I changed my name from XX" → put XX in this array
* **Strict rule**: Only include the exact name the user **verbatim mentions** as denied. Do NOT infer or remove related aliases
* Example: User says "I'm not called John anymore" → only remove "John", do NOT remove "Johnny", "J" or other related aliases not mentioned
* If no aliases to remove, return empty array `[]`
{% if existing_aliases %}
- Existing aliases: {{ existing_aliases | tojson }} (for reference only, do not repeat in output)
@@ -113,20 +127,11 @@ Existing user metadata from the database is provided below. Combine with the use
Return a JSON object with the following structure:
```json
{
"user_metadata": {
"profile": {
"role": "",
"domain": "",
"expertise": [],
"interests": []
},
"behavioral_hints": {
"learning_stage": "",
"preferred_depth": "",
"tone_preference": ""
},
"knowledge_tags": []
},
"metadata_changes": [
{"field_path": "profile.role", "action": "set", "value": "后端工程师"},
{"field_path": "profile.expertise", "action": "set", "value": "Python"},
{"field_path": "profile.expertise", "action": "remove", "value": "Java"}
],
"aliases_to_add": [],
"aliases_to_remove": []
}

View File

@@ -455,7 +455,7 @@ def build_graphrag_for_kb(kb_id: uuid.UUID):
db_knowledge = db.query(Knowledge).filter(Knowledge.id == kb_id).first()
if db_knowledge is None:
logger.error(f"[GraphRAG-KB] knowledge={kb_id} not found")
return f"build knowledge graph failed: knowledge not found"
return "build knowledge graph failed: knowledge not found"
if not (db_knowledge.parser_config and
db_knowledge.parser_config.get("graphrag", {}).get("use_graphrag", False)):
@@ -538,7 +538,7 @@ def build_graphrag_for_document(document_id: str, knowledge_id: str):
db_knowledge = db.query(Knowledge).filter(Knowledge.id == uuid.UUID(knowledge_id)).first()
if db_document is None or db_knowledge is None:
logger.error(f"[GraphRAG] document={document_id} or knowledge={knowledge_id} not found")
return f"build_graphrag_for_document failed: record not found"
return "build_graphrag_for_document failed: record not found"
graphrag_conf = db_knowledge.parser_config.get("graphrag", {})
with_resolution = graphrag_conf.get("resolution", False)
@@ -617,7 +617,7 @@ def sync_knowledge_for_kb(kb_id: uuid.UUID):
db_knowledge = db.query(Knowledge).filter(Knowledge.id == kb_id).first()
if db_knowledge is None:
logger.error(f"[SyncKB] knowledge={kb_id} not found")
return f"sync knowledge failed: knowledge not found"
return "sync knowledge failed: knowledge not found"
# 1. get vector_service
vector_service = ElasticSearchVectorFactory().init_vector(knowledge=db_knowledge)
@@ -3102,29 +3102,11 @@ def extract_user_metadata_task(
logger.info(f"[CELERY METADATA] No metadata extracted for end_user_id={end_user_id}")
return {"status": "SUCCESS", "result": "no_metadata_extracted"}
user_metadata, aliases_to_add, aliases_to_remove = extract_result
logger.info(f"[CELERY METADATA] LLM 别名新增: {aliases_to_add}, 移除: {aliases_to_remove}")
# 4. 清洗元数据、覆盖写入元数据和别名
def clean_metadata(raw: dict) -> dict:
"""递归移除空字符串、空列表、空字典。"""
result = {}
for k, v in raw.items():
if v == "" or v == []:
continue
if isinstance(v, dict):
cleaned = clean_metadata(v)
if cleaned:
result[k] = cleaned
else:
result[k] = v
return result
raw_dict = user_metadata.model_dump(exclude_none=True) if user_metadata else {}
logger.info(f"[CELERY METADATA] LLM 输出完整元数据: {json.dumps(raw_dict, ensure_ascii=False)}")
cleaned = clean_metadata(raw_dict) if raw_dict else {}
logger.info(f"[CELERY METADATA] 清洗后元数据: {json.dumps(cleaned, ensure_ascii=False)}")
metadata_changes, aliases_to_add, aliases_to_remove = extract_result
logger.info(
f"[CELERY METADATA] LLM 元数据变更: {[c.model_dump() for c in metadata_changes]}, "
f"别名新增: {aliases_to_add}, 移除: {aliases_to_remove}"
)
from datetime import datetime as dt, timezone as tz
now = dt.now(tz.utc).isoformat()
@@ -3152,15 +3134,48 @@ def extract_user_metadata_task(
end_user = EndUserRepository(db).get_by_id(end_user_uuid)
if info:
# 元数据覆盖写入
if cleaned:
existing_meta = info.meta_data if info.meta_data else {}
# 4. 元数据增量更新(按 LLM 输出的变更操作逐条执行,所有字段均为列表类型)
if metadata_changes:
# 深拷贝,确保 SQLAlchemy 能检测到变更
import copy
existing_meta = copy.deepcopy(info.meta_data) if info.meta_data else {}
updated_at = dict(existing_meta.get("_updated_at", {}))
_update_timestamps(existing_meta, cleaned, updated_at, now)
final = dict(cleaned)
final["_updated_at"] = updated_at
info.meta_data = final
logger.info("[CELERY METADATA] 覆盖写入元数据")
for change in metadata_changes:
field_path = change.field_path
action = change.action
value = change.value
if not value or not value.strip():
continue
# 定位到目标字段的父级节点
parts = field_path.split(".")
target = existing_meta
for part in parts[:-1]:
target = target.setdefault(part, {})
leaf = parts[-1]
current_list = target.get(leaf, [])
if action == "set":
if value not in current_list:
current_list.append(value)
target[leaf] = current_list
logger.info(f"[CELERY METADATA] set {field_path} = {value}")
elif action == "remove":
if value in current_list:
current_list.remove(value)
target[leaf] = current_list
logger.info(f"[CELERY METADATA] remove {value} from {field_path}")
updated_at[field_path] = now
existing_meta["_updated_at"] = updated_at
# 赋值深拷贝后的新对象SQLAlchemy 会检测到字段变更并写入
info.meta_data = existing_meta
logger.info(f"[CELERY METADATA] 增量更新元数据完成: {json.dumps(existing_meta, ensure_ascii=False)}")
# 别名增量增删:(已有 - remove) + add
old_aliases = info.aliases if info.aliases else []
@@ -3196,12 +3211,27 @@ def extract_user_metadata_task(
from app.models.end_user_info_model import EndUserInfo
initial_aliases = filtered_add # 新记录只有 add没有 remove
first_alias = initial_aliases[0] if initial_aliases else ""
if first_alias or cleaned:
# 从变更操作构建初始元数据(所有字段均为列表类型)
initial_meta = {}
for change in metadata_changes:
if change.action == "set" and change.value is not None and change.value.strip():
parts = change.field_path.split(".")
target = initial_meta
for part in parts[:-1]:
target = target.setdefault(part, {})
leaf = parts[-1]
current_list = target.get(leaf, [])
if change.value not in current_list:
current_list.append(change.value)
target[leaf] = current_list
if first_alias or initial_meta:
new_info = EndUserInfo(
end_user_id=end_user_uuid,
other_name=first_alias or "",
aliases=initial_aliases,
meta_data=cleaned if cleaned else None,
meta_data=initial_meta if initial_meta else None,
)
db.add(new_info)
if end_user and first_alias and (