refactor(memory): delegate metadata merging to LLM instead of code-based merge

- Remove merge_metadata and its helper functions from metadata_utils.py
- Pass existing_metadata to MetadataExtractor.extract_metadata() as LLM context
- Add merge instructions to extract_user_metadata.jinja2 prompt (zh/en)
- Update Celery task to read existing metadata before extraction and overwrite
- Simplify field descriptions in UserMetadataProfile model
- Add _update_timestamps helper to track changed fields
This commit is contained in:
lanceyq
2026-04-09 15:10:29 +08:00
parent f2d7479229
commit e0546e01ef
5 changed files with 87 additions and 148 deletions

View File

@@ -12,8 +12,8 @@ from pydantic import BaseModel, ConfigDict, Field
class UserMetadataProfile(BaseModel):
    """User profile information extracted from conversation statements."""

    # Drop unknown/extra keys from LLM output instead of raising a validation error.
    model_config = ConfigDict(extra='ignore')

    # Occupation or role of the user (free-form string, empty when unknown)
    role: str = Field(default="", description="用户职业或角色")
    # Domain the user works in (free-form string, empty when unknown)
    domain: str = Field(default="", description="用户所在领域")
    # Skills or tools the user is proficient with
    expertise: List[str] = Field(default_factory=list, description="用户擅长的技能或工具")
    # Topics or domain tags the user pays attention to
    interests: List[str] = Field(default_factory=list, description="用户关注的话题或领域标签")

View File

@@ -111,10 +111,15 @@ class MetadataExtractor:
)
return result
async def extract_metadata(self, statements: List[str]) -> Optional[UserMetadata]:
async def extract_metadata(self, statements: List[str], existing_metadata: Optional[dict] = None) -> Optional[UserMetadata]:
"""
对筛选后的 statement 列表调用 LLM 提取元数据。
语言根据 statement 内容自动检测,不依赖系统界面语言。
传入已有元数据作为上下文,让 LLM 能判断 replace/remove 操作。
Args:
statements: 用户发言的 statement 文本列表
existing_metadata: 数据库已有的元数据(可选),用于 LLM 对比判断变更
Returns:
UserMetadata on success, None on failure
@@ -133,6 +138,7 @@ class MetadataExtractor:
prompt = template.render(
statements=statements,
language=detected_language,
existing_metadata=existing_metadata,
json_schema="",
)

View File

@@ -1,10 +1,8 @@
"""
Metadata utility functions for cleaning, validating, aggregating, and merging
user metadata extracted from conversations.
Metadata utility functions for cleaning and validating user metadata.
"""
import logging
from datetime import datetime, timezone
from typing import Optional
from app.core.memory.models.metadata_models import UserMetadata
@@ -34,7 +32,7 @@ def clean_metadata(raw: dict) -> dict:
cleaned[key] = value
return cleaned
# TODO: this function has no callers anywhere — confirm and either wire it up or remove it
def validate_metadata(raw: dict) -> Optional[UserMetadata]:
"""
Validate metadata structure using the Pydantic UserMetadata model.
@@ -45,135 +43,3 @@ def validate_metadata(raw: dict) -> Optional[UserMetadata]:
except Exception as e:
logger.warning("Metadata validation failed: %s", e)
return None
def merge_metadata(existing: dict, new: dict) -> dict:
    """
    Merge new extracted metadata with existing database metadata.

    - Scalar fields: the new value overwrites the old value.
    - Array fields: support an ``_op`` marker (append / replace / remove).
    - Top-level keys missing from ``new`` keep their existing data.
    - A ``_updated_at`` dict maps changed field paths to ISO timestamps.
    - When ``existing`` is None or empty: direct write of ``new`` plus
      ``_updated_at`` for every leaf field, with no merge logic.

    Args:
        existing: Metadata currently stored in the database (may be falsy).
        new: Freshly extracted metadata to fold in.

    Returns:
        The merged metadata dict, including the ``_updated_at`` tracking map.
    """
    now = datetime.now(timezone.utc).isoformat()

    # The helpers below are nested because they serve only this function;
    # keeping them local keeps the module's public surface small.

    def _collect_paths(data: dict, prefix: str, stamps: dict) -> None:
        # Record every leaf field path so a direct write stamps all fields.
        for key, value in data.items():
            if key == "_updated_at":
                continue
            path = f"{prefix}.{key}" if prefix else key
            if isinstance(value, dict):
                _collect_paths(value, path, stamps)
            else:
                stamps[path] = now

    def _merge_array(container: dict, field, old_value, new_value,
                     stamps: dict, path: str) -> None:
        # Core array merge with _op support (append is the default op).
        if isinstance(new_value, dict) and "_op" in new_value:
            op = new_value.get("_op", "append")
            items = new_value.get(field, new_value.get("items", []))
            if not isinstance(items, list):
                # No list under the field name: fall back to the first
                # list-valued entry in the dict (excluding _op).
                for k, v in new_value.items():
                    if k != "_op" and isinstance(v, list):
                        items = v
                        break
                else:
                    items = []
        elif isinstance(new_value, list):
            op, items = "append", new_value
        else:
            op, items = "append", []

        old_arr = old_value if isinstance(old_value, list) else []
        if op == "replace":
            new_arr = items
        elif op == "remove":
            new_arr = [x for x in old_arr if x not in items]
        else:
            # append (default): union preserving order, deduplicated
            new_arr = list(old_arr)
            for item in items:
                if item not in new_arr:
                    new_arr.append(item)

        # The field is always written; the timestamp only on real change.
        container[field] = new_arr
        if old_arr != new_arr:
            stamps[path] = now

    def _merge_nested(result: dict, key: str, old_dict: dict,
                      new_dict: dict, stamps: dict) -> None:
        # Merge a nested dict field such as profile or behavioral_hints.
        merged = dict(old_dict)
        for field, new_val in new_dict.items():
            path = f"{key}.{field}"
            old_val = merged.get(field)
            if isinstance(new_val, list) or (isinstance(new_val, dict) and "_op" in new_val):
                _merge_array(merged, field, old_val, new_val, stamps, path)
            elif old_val != new_val:
                # Scalar field: overwrite and stamp only on change
                merged[field] = new_val
                stamps[path] = now
        result[key] = merged

    if not existing:
        # Direct write: new + _updated_at for all leaf fields
        result = dict(new)
        updated_at: dict = {}
        _collect_paths(result, "", updated_at)
        if updated_at:
            result["_updated_at"] = updated_at
        return result

    result = dict(existing)
    updated_at = dict(result.get("_updated_at", {}))
    for key, new_value in new.items():
        if key == "_updated_at":
            continue
        old_value = result.get(key)
        if isinstance(new_value, dict) and isinstance(old_value, dict):
            # Nested dict merge (e.g. profile, behavioral_hints); note this
            # takes precedence over the _op check, matching original behavior.
            _merge_nested(result, key, old_value, new_value, updated_at)
        elif isinstance(new_value, list) or (isinstance(new_value, dict) and "_op" in new_value):
            # Array field with possible _op marker
            _merge_array(result, key, old_value, new_value, updated_at, key)
        else:
            # Scalar top-level field: overwrite only on change
            if old_value != new_value:
                result[key] = new_value
                updated_at[key] = now
    result["_updated_at"] = updated_at
    return result
# TODO: consider nesting these small helpers inside the one large function they serve, to make the file structure clearer
def _collect_field_paths(data: dict, prefix: str, updated_at: dict, now: str) -> None:
"""Collect all leaf field paths for _updated_at on direct write."""
for key, value in data.items():
if key == "_updated_at":
continue
path = f"{prefix}{key}" if not prefix else f"{prefix}.{key}"
if isinstance(value, dict):
_collect_field_paths(value, path, updated_at, now)
else:
updated_at[path] = now
def _merge_nested(
    result: dict, key: str, old_dict: dict, new_dict: dict,
    updated_at: dict, now: str
) -> None:
    """Merge one nested dict field (e.g. profile, behavioral_hints) into result."""
    merged = dict(old_dict)
    for field, incoming in new_dict.items():
        dotted = f"{key}.{field}"
        current = merged.get(field)
        array_like = isinstance(incoming, list) or (
            isinstance(incoming, dict) and "_op" in incoming
        )
        if array_like:
            _merge_array_field_inner(merged, field, current, incoming, updated_at, dotted, now)
        elif current != incoming:
            # Scalar change: overwrite and stamp
            merged[field] = incoming
            updated_at[dotted] = now
    result[key] = merged
def _merge_array_field(
    result: dict, key: str, old_value, new_value,
    updated_at: dict, now: str
) -> None:
    """Merge a top-level array field; for top-level fields the path is the key itself."""
    _merge_array_field_inner(
        result, key, old_value, new_value, updated_at, path=key, now=now
    )
def _merge_array_field_inner(
container: dict, field: str, old_value, new_value,
updated_at: dict, path: str, now: str
) -> None:
"""Core array merge logic with _op support."""
# Determine op and items
if isinstance(new_value, dict) and "_op" in new_value:
op = new_value.get("_op", "append")
items = new_value.get(field, new_value.get("items", []))
# If the dict has a key matching the field name, use it; otherwise look for list values
if not isinstance(items, list):
# Try to find the list value in the dict (excluding _op)
for k, v in new_value.items():
if k != "_op" and isinstance(v, list):
items = v
break
else:
items = []
elif isinstance(new_value, list):
op = "append"
items = new_value
else:
op = "append"
items = []
old_arr = old_value if isinstance(old_value, list) else []
if op == "replace":
new_arr = items
elif op == "remove":
new_arr = [x for x in old_arr if x not in items]
else:
# append (default): merge and deduplicate
seen = list(old_arr)
for item in items:
if item not in seen:
seen.append(item)
new_arr = seen
if old_arr != new_arr:
container[field] = new_arr
updated_at[path] = now
else:
container[field] = new_arr

View File

@@ -13,6 +13,16 @@ Extract user metadata from the following conversation statements spoken by the u
- 如果文本中没有可提取的用户画像信息,返回空的 user_metadata 对象
- **输出语言必须与输入文本的语言一致**(输入中文则输出中文值,输入英文则输出英文值)
{% if existing_metadata %}
**重要:合并已有元数据**
下方提供了数据库中已有的用户元数据。请结合用户最新发言,输出**合并后的完整元数据**
- 如果用户明确否定了已有信息(如"我不再教高中物理了"),在输出中**移除**该信息
- 如果用户提到了新信息,**添加**到对应字段中
- 如果已有信息未被用户否定,**保留**在输出中
- 标量字段(如 role、domain):如果用户提到了新值,用新值替换;否则保留已有值
- 最终输出应该是完整的、合并后的元数据,不是增量
{% endif %}
**字段说明:**
- profile.role用户的职业或角色如 教师、医生、后端工程师
- profile.domain用户所在领域如 教育、医疗、软件开发
@@ -34,6 +44,16 @@ Extract user metadata from the following conversation statements spoken by the u
- If no user profile information can be extracted, return an empty user_metadata object
- **Output language must match the input text language**
{% if existing_metadata %}
**Important: Merge with existing metadata**
Existing user metadata from the database is provided below. Combine with the user's latest statements to output the **complete merged metadata**:
- If the user explicitly negates existing info (e.g. "I no longer teach high school physics"), **remove** it from output
- If the user mentions new info, **add** it to the corresponding field
- If existing info is not negated by the user, **keep** it in the output
- Scalar fields (e.g. role, domain): replace with new value if user mentions one; otherwise keep existing
- The final output should be the complete, merged metadata — not an incremental update
{% endif %}
**Field descriptions:**
- profile.role: User's occupation or role, e.g. teacher, doctor, software engineer
- profile.domain: User's domain, e.g. education, healthcare, software development
@@ -50,6 +70,13 @@ Extract user metadata from the following conversation statements spoken by the u
- {{ stmt }}
{% endfor %}
{% if existing_metadata %}
===Existing User Metadata===
```json
{{ existing_metadata | tojson }}
```
{% endif %}
===Output Format===
Return a JSON object with the following structure:
```json

View File

@@ -1,4 +1,5 @@
import asyncio
import json
import os
import re
import shutil
@@ -2916,6 +2917,20 @@ def init_community_clustering_for_users(self, end_user_ids: List[str], workspace
# ─── User Metadata Extraction Task ───────────────────────────────────────────
def _update_timestamps(existing: dict, new: dict, updated_at: dict, now: str, prefix: str = "") -> None:
"""对比新旧元数据,更新变更字段的 _updated_at 时间戳。"""
for key, new_val in new.items():
if key == "_updated_at":
continue
path = f"{prefix}.{key}" if prefix else key
old_val = existing.get(key)
if isinstance(new_val, dict) and isinstance(old_val, dict):
_update_timestamps(old_val, new_val, updated_at, now, prefix=path)
elif old_val != new_val:
updated_at[path] = now
@celery_app.task(
bind=True,
name='app.tasks.extract_user_metadata',
@@ -2954,7 +2969,7 @@ def extract_user_metadata_task(
async def _run() -> Dict[str, Any]:
from app.core.memory.storage_services.extraction_engine.knowledge_extraction.metadata_extractor import MetadataExtractor
from app.core.memory.utils.metadata_utils import clean_metadata, merge_metadata, validate_metadata
from app.core.memory.utils.metadata_utils import clean_metadata, validate_metadata
from app.repositories.end_user_info_repository import EndUserInfoRepository
from app.repositories.end_user_repository import EndUserRepository
from app.services.memory_config_service import MemoryConfigService
@@ -2985,36 +3000,61 @@ def extract_user_metadata_task(
return {"status": "FAILURE", "error": "Memory config has no LLM model configured"}
llm_client = factory.get_llm_client(memory_config.llm_id)
# 3. 提取元数据
# 2.5 读取已有元数据,传给 extractor 作为上下文
existing_metadata = None
try:
info = EndUserInfoRepository(db).get_by_end_user_id(end_user_uuid)
if info and info.meta_data:
existing_metadata = info.meta_data
logger.info("[CELERY METADATA] 已读取数据库已有元数据作为 LLM 上下文")
except Exception as e:
logger.warning(f"[CELERY METADATA] 读取已有元数据失败(继续无上下文提取): {e}")
# 3. 提取元数据(传入已有元数据作为上下文)
extractor = MetadataExtractor(llm_client=llm_client, language=language)
user_metadata = await extractor.extract_metadata(statements)
user_metadata = await extractor.extract_metadata(statements, existing_metadata=existing_metadata)
if not user_metadata:
logger.info(f"[CELERY METADATA] No metadata extracted for end_user_id={end_user_id}")
return {"status": "SUCCESS", "result": "no_metadata_extracted"}
# 4. 清洗、校验、合并、写入
raw_dict = user_metadata.model_dump()
# 4. 清洗、校验、覆盖写入
raw_dict = user_metadata.model_dump(exclude_none=True)
logger.info(f"[CELERY METADATA] LLM 输出完整元数据: {json.dumps(raw_dict, ensure_ascii=False)}")
cleaned = clean_metadata(raw_dict)
if not cleaned:
logger.info(f"[CELERY METADATA] Cleaned metadata is empty for end_user_id={end_user_id}")
return {"status": "SUCCESS", "result": "empty_after_cleaning"}
logger.info(f"[CELERY METADATA] 清洗后元数据: {json.dumps(cleaned, ensure_ascii=False)}")
validated = validate_metadata(cleaned)
if not validated:
return {"status": "FAILURE", "error": "Metadata validation failed after cleaning"}
# 直接覆盖写入LLM 已完成语义合并,输出的是完整结果)
# 保留 _updated_at 时间戳追踪
from datetime import datetime as dt, timezone as tz
now = dt.now(tz.utc).isoformat()
with get_db_context() as db:
end_user_uuid = uuid.UUID(end_user_id)
info = EndUserInfoRepository(db).get_by_end_user_id(end_user_uuid)
if info:
existing_meta = info.meta_data if info.meta_data else {}
info.meta_data = merge_metadata(existing_meta, cleaned)
logger.info(f"[CELERY METADATA] Updated metadata for end_user_id={end_user_id}")
logger.info(f"[CELERY METADATA] 数据库已有元数据: {json.dumps(existing_meta, ensure_ascii=False)}")
# 保留已有的 _updated_at更新变更字段的时间戳
updated_at = dict(existing_meta.get("_updated_at", {}))
_update_timestamps(existing_meta, cleaned, updated_at, now)
final = dict(cleaned)
final["_updated_at"] = updated_at
info.meta_data = final
logger.info(f"[CELERY METADATA] 覆盖写入元数据: {json.dumps(final, ensure_ascii=False)}")
else:
# No end_user_info record yet - metadata will be written when alias sync creates it,
# or we create a minimal record here
logger.info(
f"[CELERY METADATA] No end_user_info record for end_user_id={end_user_id}, "
f"skipping metadata write (will be created by alias sync)"