refactor(memory): switch metadata extraction from full-replace to incremental changes
- Replace UserMetadata full-object overwrite with incremental MetadataFieldChange operations (set/remove per field path) - Convert profile.role and profile.domain from scalar strings to lists - Remove UserMetadataBehavioralHints and knowledge_tags fields - Update Jinja2 prompt to instruct LLM to output incremental changes - Update extract_user_metadata_task to apply changes via deep-copy and per-field mutation for proper SQLAlchemy change detection - Minor lint: remove unnecessary f-string prefixes in tasks.py
This commit is contained in:
104
api/app/tasks.py
104
api/app/tasks.py
@@ -455,7 +455,7 @@ def build_graphrag_for_kb(kb_id: uuid.UUID):
|
||||
db_knowledge = db.query(Knowledge).filter(Knowledge.id == kb_id).first()
|
||||
if db_knowledge is None:
|
||||
logger.error(f"[GraphRAG-KB] knowledge={kb_id} not found")
|
||||
return f"build knowledge graph failed: knowledge not found"
|
||||
return "build knowledge graph failed: knowledge not found"
|
||||
|
||||
if not (db_knowledge.parser_config and
|
||||
db_knowledge.parser_config.get("graphrag", {}).get("use_graphrag", False)):
|
||||
@@ -538,7 +538,7 @@ def build_graphrag_for_document(document_id: str, knowledge_id: str):
|
||||
db_knowledge = db.query(Knowledge).filter(Knowledge.id == uuid.UUID(knowledge_id)).first()
|
||||
if db_document is None or db_knowledge is None:
|
||||
logger.error(f"[GraphRAG] document={document_id} or knowledge={knowledge_id} not found")
|
||||
return f"build_graphrag_for_document failed: record not found"
|
||||
return "build_graphrag_for_document failed: record not found"
|
||||
|
||||
graphrag_conf = db_knowledge.parser_config.get("graphrag", {})
|
||||
with_resolution = graphrag_conf.get("resolution", False)
|
||||
@@ -617,7 +617,7 @@ def sync_knowledge_for_kb(kb_id: uuid.UUID):
|
||||
db_knowledge = db.query(Knowledge).filter(Knowledge.id == kb_id).first()
|
||||
if db_knowledge is None:
|
||||
logger.error(f"[SyncKB] knowledge={kb_id} not found")
|
||||
return f"sync knowledge failed: knowledge not found"
|
||||
return "sync knowledge failed: knowledge not found"
|
||||
|
||||
# 1. get vector_service
|
||||
vector_service = ElasticSearchVectorFactory().init_vector(knowledge=db_knowledge)
|
||||
@@ -3102,29 +3102,11 @@ def extract_user_metadata_task(
|
||||
logger.info(f"[CELERY METADATA] No metadata extracted for end_user_id={end_user_id}")
|
||||
return {"status": "SUCCESS", "result": "no_metadata_extracted"}
|
||||
|
||||
user_metadata, aliases_to_add, aliases_to_remove = extract_result
|
||||
logger.info(f"[CELERY METADATA] LLM 别名新增: {aliases_to_add}, 移除: {aliases_to_remove}")
|
||||
|
||||
# 4. 清洗元数据、覆盖写入元数据和别名
|
||||
def clean_metadata(raw: dict) -> dict:
|
||||
"""递归移除空字符串、空列表、空字典。"""
|
||||
result = {}
|
||||
for k, v in raw.items():
|
||||
if v == "" or v == []:
|
||||
continue
|
||||
if isinstance(v, dict):
|
||||
cleaned = clean_metadata(v)
|
||||
if cleaned:
|
||||
result[k] = cleaned
|
||||
else:
|
||||
result[k] = v
|
||||
return result
|
||||
|
||||
raw_dict = user_metadata.model_dump(exclude_none=True) if user_metadata else {}
|
||||
logger.info(f"[CELERY METADATA] LLM 输出完整元数据: {json.dumps(raw_dict, ensure_ascii=False)}")
|
||||
|
||||
cleaned = clean_metadata(raw_dict) if raw_dict else {}
|
||||
logger.info(f"[CELERY METADATA] 清洗后元数据: {json.dumps(cleaned, ensure_ascii=False)}")
|
||||
metadata_changes, aliases_to_add, aliases_to_remove = extract_result
|
||||
logger.info(
|
||||
f"[CELERY METADATA] LLM 元数据变更: {[c.model_dump() for c in metadata_changes]}, "
|
||||
f"别名新增: {aliases_to_add}, 移除: {aliases_to_remove}"
|
||||
)
|
||||
|
||||
from datetime import datetime as dt, timezone as tz
|
||||
now = dt.now(tz.utc).isoformat()
|
||||
@@ -3152,15 +3134,50 @@ def extract_user_metadata_task(
|
||||
end_user = EndUserRepository(db).get_by_id(end_user_uuid)
|
||||
|
||||
if info:
|
||||
# 元数据覆盖写入
|
||||
if cleaned:
|
||||
existing_meta = info.meta_data if info.meta_data else {}
|
||||
# 4. 元数据增量更新(按 LLM 输出的变更操作逐条执行,所有字段均为列表类型)
|
||||
if metadata_changes:
|
||||
# 深拷贝,确保 SQLAlchemy 能检测到变更
|
||||
import copy
|
||||
existing_meta = copy.deepcopy(info.meta_data) if info.meta_data else {}
|
||||
updated_at = dict(existing_meta.get("_updated_at", {}))
|
||||
_update_timestamps(existing_meta, cleaned, updated_at, now)
|
||||
final = dict(cleaned)
|
||||
final["_updated_at"] = updated_at
|
||||
info.meta_data = final
|
||||
logger.info("[CELERY METADATA] 覆盖写入元数据")
|
||||
|
||||
for change in metadata_changes:
|
||||
field_path = change.field_path
|
||||
action = change.action
|
||||
value = change.value
|
||||
|
||||
if not value or not value.strip():
|
||||
continue
|
||||
|
||||
# 定位到目标字段的父级节点
|
||||
parts = field_path.split(".")
|
||||
target = existing_meta
|
||||
for part in parts[:-1]:
|
||||
target = target.setdefault(part, {})
|
||||
leaf = parts[-1]
|
||||
|
||||
current_list = target.get(leaf, [])
|
||||
if not isinstance(current_list, list):
|
||||
current_list = []
|
||||
|
||||
if action == "set":
|
||||
if value not in current_list:
|
||||
current_list.append(value)
|
||||
target[leaf] = current_list
|
||||
logger.info(f"[CELERY METADATA] set {field_path} = {value}")
|
||||
|
||||
elif action == "remove":
|
||||
if value in current_list:
|
||||
current_list.remove(value)
|
||||
target[leaf] = current_list
|
||||
logger.info(f"[CELERY METADATA] remove {value} from {field_path}")
|
||||
|
||||
updated_at[field_path] = now
|
||||
|
||||
existing_meta["_updated_at"] = updated_at
|
||||
# 赋值深拷贝后的新对象,SQLAlchemy 会检测到字段变更并写入
|
||||
info.meta_data = existing_meta
|
||||
logger.info(f"[CELERY METADATA] 增量更新元数据完成: {json.dumps(existing_meta, ensure_ascii=False)}")
|
||||
|
||||
# 别名增量增删:(已有 - remove) + add
|
||||
old_aliases = info.aliases if info.aliases else []
|
||||
@@ -3196,12 +3213,27 @@ def extract_user_metadata_task(
|
||||
from app.models.end_user_info_model import EndUserInfo
|
||||
initial_aliases = filtered_add # 新记录只有 add,没有 remove
|
||||
first_alias = initial_aliases[0] if initial_aliases else ""
|
||||
if first_alias or cleaned:
|
||||
|
||||
# 从变更操作构建初始元数据(所有字段均为列表类型)
|
||||
initial_meta = {}
|
||||
for change in metadata_changes:
|
||||
if change.action == "set" and change.value is not None and change.value.strip():
|
||||
parts = change.field_path.split(".")
|
||||
target = initial_meta
|
||||
for part in parts[:-1]:
|
||||
target = target.setdefault(part, {})
|
||||
leaf = parts[-1]
|
||||
current_list = target.get(leaf, [])
|
||||
if change.value not in current_list:
|
||||
current_list.append(change.value)
|
||||
target[leaf] = current_list
|
||||
|
||||
if first_alias or initial_meta:
|
||||
new_info = EndUserInfo(
|
||||
end_user_id=end_user_uuid,
|
||||
other_name=first_alias or "",
|
||||
aliases=initial_aliases,
|
||||
meta_data=cleaned if cleaned else None,
|
||||
meta_data=initial_meta if initial_meta else None,
|
||||
)
|
||||
db.add(new_info)
|
||||
if end_user and first_alias and (
|
||||
|
||||
Reference in New Issue
Block a user