diff --git a/api/app/core/memory/models/metadata_models.py b/api/app/core/memory/models/metadata_models.py index e3184879..a5c70ec6 100644 --- a/api/app/core/memory/models/metadata_models.py +++ b/api/app/core/memory/models/metadata_models.py @@ -12,8 +12,8 @@ from pydantic import BaseModel, ConfigDict, Field class UserMetadataProfile(BaseModel): """用户画像信息""" model_config = ConfigDict(extra='ignore') - role: str = Field(default="", description="用户职业或角色,如 teacher, doctor, software_engineer") - domain: str = Field(default="", description="用户所在领域,如 education, healthcare, software_development") + role: str = Field(default="", description="用户职业或角色") + domain: str = Field(default="", description="用户所在领域") expertise: List[str] = Field(default_factory=list, description="用户擅长的技能或工具") interests: List[str] = Field(default_factory=list, description="用户关注的话题或领域标签") diff --git a/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/metadata_extractor.py b/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/metadata_extractor.py index 5e763622..af3331b9 100644 --- a/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/metadata_extractor.py +++ b/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/metadata_extractor.py @@ -111,10 +111,15 @@ class MetadataExtractor: ) return result - async def extract_metadata(self, statements: List[str]) -> Optional[UserMetadata]: + async def extract_metadata(self, statements: List[str], existing_metadata: Optional[dict] = None) -> Optional[UserMetadata]: """ 对筛选后的 statement 列表调用 LLM 提取元数据。 语言根据 statement 内容自动检测,不依赖系统界面语言。 + 传入已有元数据作为上下文,让 LLM 能判断 replace/remove 操作。 + + Args: + statements: 用户发言的 statement 文本列表 + existing_metadata: 数据库已有的元数据(可选),用于 LLM 对比判断变更 Returns: UserMetadata on success, None on failure @@ -133,6 +138,7 @@ class MetadataExtractor: prompt = template.render( statements=statements, language=detected_language, + existing_metadata=existing_metadata, json_schema="", ) diff --git a/api/app/core/memory/utils/metadata_utils.py b/api/app/core/memory/utils/metadata_utils.py index ccdd1686..69bd8edf 100644 --- a/api/app/core/memory/utils/metadata_utils.py +++ b/api/app/core/memory/utils/metadata_utils.py @@ -1,10 +1,8 @@ """ -Metadata utility functions for cleaning, validating, aggregating, and merging -user metadata extracted from conversations. +Metadata utility functions for cleaning and validating user metadata. """ import logging -from datetime import datetime, timezone from typing import Optional from app.core.memory.models.metadata_models import UserMetadata @@ -34,7 +32,7 @@ def clean_metadata(raw: dict) -> dict: cleaned[key] = value return cleaned -# TODO 这个函数没有调用的地方 + def validate_metadata(raw: dict) -> Optional[UserMetadata]: """ Validate metadata structure using the Pydantic UserMetadata model. @@ -45,135 +43,3 @@ def validate_metadata(raw: dict) -> Optional[UserMetadata]: except Exception as e: logger.warning("Metadata validation failed: %s", e) return None - - -def merge_metadata(existing: dict, new: dict) -> dict: - """ - Merge new extracted metadata with existing database metadata. - - Scalar fields: new value overwrites old value - - Array fields: support _op marker (append/replace/remove) - - Missing top-level keys in new: preserve existing data - - Auto-update _updated_at timestamp dict with field paths and ISO timestamps - - When existing is None or {}: directly write new + _updated_at (no merge logic) - """ - now = datetime.now(timezone.utc).isoformat() - - if not existing: - # Direct write: new + _updated_at for all fields - result = dict(new) - updated_at = {} - _collect_field_paths(result, "", updated_at, now) - if updated_at: - result["_updated_at"] = updated_at - return result - - result = dict(existing) - updated_at: dict = dict(result.get("_updated_at", {})) - - for key, new_value in new.items(): - if key == "_updated_at": - continue - - old_value = result.get(key) - - if isinstance(new_value, dict) and isinstance(old_value, dict): - # Nested dict merge (e.g. profile, behavioral_hints) - _merge_nested(result, key, old_value, new_value, updated_at, now) - elif isinstance(new_value, list) or (isinstance(new_value, dict) and "_op" in new_value): - # Array field with possible _op - _merge_array_field(result, key, old_value, new_value, updated_at, now) - else: - # Scalar top-level field - if old_value != new_value: - result[key] = new_value - updated_at[key] = now - # If equal, no change needed - - result["_updated_at"] = updated_at - return result - -# TODO 考虑大函数包含小函数,因为只服务于大函数,实现代码文件的结构清楚 -def _collect_field_paths(data: dict, prefix: str, updated_at: dict, now: str) -> None: - """Collect all leaf field paths for _updated_at on direct write.""" - for key, value in data.items(): - if key == "_updated_at": - continue - path = f"{prefix}{key}" if not prefix else f"{prefix}.{key}" - if isinstance(value, dict): - _collect_field_paths(value, path, updated_at, now) - else: - updated_at[path] = now - - -def _merge_nested( - result: dict, key: str, old_dict: dict, new_dict: dict, - updated_at: dict, now: str -) -> None: - """Merge a nested dict (e.g. profile, behavioral_hints).""" - merged = dict(old_dict) - for field, new_val in new_dict.items(): - old_val = merged.get(field) - path = f"{key}.{field}" - - if isinstance(new_val, list) or (isinstance(new_val, dict) and "_op" in new_val): - _merge_array_field_inner(merged, field, old_val, new_val, updated_at, path, now) - else: - # Scalar field - if old_val != new_val: - merged[field] = new_val - updated_at[path] = now - result[key] = merged - - -def _merge_array_field( - result: dict, key: str, old_value, new_value, - updated_at: dict, now: str -) -> None: - """Merge a top-level array field with _op support.""" - _merge_array_field_inner(result, key, old_value, new_value, updated_at, key, now) - - -def _merge_array_field_inner( - container: dict, field: str, old_value, new_value, - updated_at: dict, path: str, now: str -) -> None: - """Core array merge logic with _op support.""" - # Determine op and items - if isinstance(new_value, dict) and "_op" in new_value: - op = new_value.get("_op", "append") - items = new_value.get(field, new_value.get("items", [])) - # If the dict has a key matching the field name, use it; otherwise look for list values - if not isinstance(items, list): - # Try to find the list value in the dict (excluding _op) - for k, v in new_value.items(): - if k != "_op" and isinstance(v, list): - items = v - break - else: - items = [] - elif isinstance(new_value, list): - op = "append" - items = new_value - else: - op = "append" - items = [] - - old_arr = old_value if isinstance(old_value, list) else [] - - if op == "replace": - new_arr = items - elif op == "remove": - new_arr = [x for x in old_arr if x not in items] - else: - # append (default): merge and deduplicate - seen = list(old_arr) - for item in items: - if item not in seen: - seen.append(item) - new_arr = seen - - if old_arr != new_arr: - container[field] = new_arr - updated_at[path] = now - else: - container[field] = new_arr diff --git a/api/app/core/memory/utils/prompt/prompts/extract_user_metadata.jinja2 b/api/app/core/memory/utils/prompt/prompts/extract_user_metadata.jinja2 index 9053e57d..c280e5f6 100644 --- a/api/app/core/memory/utils/prompt/prompts/extract_user_metadata.jinja2 +++ b/api/app/core/memory/utils/prompt/prompts/extract_user_metadata.jinja2 @@ -13,6 +13,16 @@ Extract user metadata from the following conversation statements spoken by the u - 如果文本中没有可提取的用户画像信息,返回空的 user_metadata 对象 - **输出语言必须与输入文本的语言一致**(输入中文则输出中文值,输入英文则输出英文值) +{% if existing_metadata %} +**重要:合并已有元数据** +下方提供了数据库中已有的用户元数据。请结合用户最新发言,输出**合并后的完整元数据**: +- 如果用户明确否定了已有信息(如"我不再教高中物理了"),在输出中**移除**该信息 +- 如果用户提到了新信息,**添加**到对应字段中 +- 如果已有信息未被用户否定,**保留**在输出中 +- 标量字段(如 role、domain):如果用户提到了新值,用新值替换;否则保留已有值 +- 最终输出应该是完整的、合并后的元数据,不是增量 +{% endif %} + **字段说明:** - profile.role:用户的职业或角色,如 教师、医生、后端工程师 - profile.domain:用户所在领域,如 教育、医疗、软件开发 @@ -34,6 +44,16 @@ Extract user metadata from the following conversation statements spoken by the u - If no user profile information can be extracted, return an empty user_metadata object - **Output language must match the input text language** +{% if existing_metadata %} +**Important: Merge with existing metadata** +Existing user metadata from the database is provided below. Combine with the user's latest statements to output the **complete merged metadata**: +- If the user explicitly negates existing info (e.g. "I no longer teach high school physics"), **remove** it from output +- If the user mentions new info, **add** it to the corresponding field +- If existing info is not negated by the user, **keep** it in the output +- Scalar fields (e.g. role, domain): replace with new value if user mentions one; otherwise keep existing +- The final output should be the complete, merged metadata — not an incremental update +{% endif %} + **Field descriptions:** - profile.role: User's occupation or role, e.g. teacher, doctor, software engineer - profile.domain: User's domain, e.g. education, healthcare, software development @@ -50,6 +70,13 @@ Extract user metadata from the following conversation statements spoken by the u - {{ stmt }} {% endfor %} +{% if existing_metadata %} +===Existing User Metadata=== +```json +{{ existing_metadata | tojson }} +``` +{% endif %} + ===Output Format=== Return a JSON object with the following structure: ```json diff --git a/api/app/tasks.py b/api/app/tasks.py index 4914e142..3eb1a52c 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -1,4 +1,5 @@ import asyncio +import json import os import re import shutil @@ -2916,6 +2917,20 @@ def init_community_clustering_for_users(self, end_user_ids: List[str], workspace # ─── User Metadata Extraction Task ─────────────────────────────────────────── + +def _update_timestamps(existing: dict, new: dict, updated_at: dict, now: str, prefix: str = "") -> None: + """对比新旧元数据,更新变更字段的 _updated_at 时间戳。""" + for key, new_val in new.items(): + if key == "_updated_at": + continue + path = f"{prefix}.{key}" if prefix else key + old_val = existing.get(key) + + if isinstance(new_val, dict) and isinstance(old_val, dict): + _update_timestamps(old_val, new_val, updated_at, now, prefix=path) + elif old_val != new_val: + updated_at[path] = now + @celery_app.task( bind=True, name='app.tasks.extract_user_metadata', @@ -2954,7 +2969,7 @@ def extract_user_metadata_task( async def _run() -> Dict[str, Any]: from app.core.memory.storage_services.extraction_engine.knowledge_extraction.metadata_extractor import MetadataExtractor - from app.core.memory.utils.metadata_utils import clean_metadata, merge_metadata, validate_metadata + from app.core.memory.utils.metadata_utils import clean_metadata, validate_metadata from app.repositories.end_user_info_repository import EndUserInfoRepository from app.repositories.end_user_repository import EndUserRepository from app.services.memory_config_service import MemoryConfigService @@ -2985,36 +3000,61 @@ def extract_user_metadata_task( return {"status": "FAILURE", "error": "Memory config has no LLM model configured"} llm_client = factory.get_llm_client(memory_config.llm_id) - # 3. 提取元数据 + # 2.5 读取已有元数据,传给 extractor 作为上下文 + existing_metadata = None + try: + info = EndUserInfoRepository(db).get_by_end_user_id(end_user_uuid) + if info and info.meta_data: + existing_metadata = info.meta_data + logger.info("[CELERY METADATA] 已读取数据库已有元数据作为 LLM 上下文") + except Exception as e: + logger.warning(f"[CELERY METADATA] 读取已有元数据失败(继续无上下文提取): {e}") + + # 3. 提取元数据(传入已有元数据作为上下文) extractor = MetadataExtractor(llm_client=llm_client, language=language) - user_metadata = await extractor.extract_metadata(statements) + user_metadata = await extractor.extract_metadata(statements, existing_metadata=existing_metadata) if not user_metadata: logger.info(f"[CELERY METADATA] No metadata extracted for end_user_id={end_user_id}") return {"status": "SUCCESS", "result": "no_metadata_extracted"} - # 4. 清洗、校验、合并、写入 - raw_dict = user_metadata.model_dump() + # 4. 清洗、校验、覆盖写入 + raw_dict = user_metadata.model_dump(exclude_none=True) + logger.info(f"[CELERY METADATA] LLM 输出完整元数据: {json.dumps(raw_dict, ensure_ascii=False)}") + cleaned = clean_metadata(raw_dict) if not cleaned: logger.info(f"[CELERY METADATA] Cleaned metadata is empty for end_user_id={end_user_id}") return {"status": "SUCCESS", "result": "empty_after_cleaning"} + logger.info(f"[CELERY METADATA] 清洗后元数据: {json.dumps(cleaned, ensure_ascii=False)}") + validated = validate_metadata(cleaned) if not validated: return {"status": "FAILURE", "error": "Metadata validation failed after cleaning"} + # 直接覆盖写入(LLM 已完成语义合并,输出的是完整结果) + # 保留 _updated_at 时间戳追踪 + from datetime import datetime as dt, timezone as tz + now = dt.now(tz.utc).isoformat() + with get_db_context() as db: end_user_uuid = uuid.UUID(end_user_id) info = EndUserInfoRepository(db).get_by_end_user_id(end_user_uuid) if info: existing_meta = info.meta_data if info.meta_data else {} - info.meta_data = merge_metadata(existing_meta, cleaned) - logger.info(f"[CELERY METADATA] Updated metadata for end_user_id={end_user_id}") + logger.info(f"[CELERY METADATA] 数据库已有元数据: {json.dumps(existing_meta, ensure_ascii=False)}") + + # 保留已有的 _updated_at,更新变更字段的时间戳 + updated_at = dict(existing_meta.get("_updated_at", {})) + _update_timestamps(existing_meta, cleaned, updated_at, now) + + final = dict(cleaned) + final["_updated_at"] = updated_at + info.meta_data = final + logger.info(f"[CELERY METADATA] 覆盖写入元数据: {json.dumps(final, ensure_ascii=False)}") else: - # No end_user_info record yet - metadata will be written when alias sync creates it, - # or we create a minimal record here logger.info( f"[CELERY METADATA] No end_user_info record for end_user_id={end_user_id}, " f"skipping metadata write (will be created by alias sync)"