def update_aliases_and_metadata(
    self,
    end_user_id: uuid.UUID,
    new_aliases: Optional[List[str]] = None,
    new_metadata: Optional[dict] = None,
) -> Optional[EndUserInfo]:
    """Incrementally merge aliases and extracted metadata into an end-user info record.

    Stored data is only ever extended, never overwritten or removed:

    - aliases: every entry of ``new_aliases`` is stripped and appended unless
      it is blank, not a string, or already present (case-insensitive match).
    - other_name: when it is currently blank and the alias list is non-empty,
      it is set to the first alias.
    - meta_data: for each list-valued field of ``new_metadata``, values are
      appended to the stored list unless already present (compared through
      ``str(value).lower()``). Fields of ``new_metadata`` whose value is not
      a list are ignored.

    The session is committed and the record refreshed only when one of the
    steps above actually modified the record.

    Args:
        end_user_id: ID of the end user whose record is updated.
        new_aliases: Aliases discovered in this run.
        new_metadata: Extracted metadata dict (field name -> list of values).

    Returns:
        The (possibly updated) EndUserInfo, or None when no record exists
        for ``end_user_id``.
    """
    end_user_info = self.get_by_end_user_id(end_user_id)
    if not end_user_info:
        logger.warning(f"[EndUserInfo] 记录不存在,跳过更新: end_user_id={end_user_id}")
        return None

    changed = False

    # -- Merge aliases (de-duplicated, case-insensitive) --
    if new_aliases:
        merged_aliases = list(end_user_info.aliases or [])
        seen_aliases = {a.lower() for a in merged_aliases if isinstance(a, str)}
        stored_count = len(merged_aliases)
        for alias in new_aliases:
            # Skip None / non-string entries instead of raising on .strip().
            if not isinstance(alias, str):
                continue
            alias = alias.strip()
            if alias and alias.lower() not in seen_aliases:
                merged_aliases.append(alias)
                seen_aliases.add(alias.lower())
        # Only touch the attribute when something was appended, so a call that
        # brings no new alias does not trigger a pointless commit.
        if len(merged_aliases) != stored_count:
            # Rebind to a fresh list rather than mutating in place
            # (presumably required for the ORM to register the change).
            end_user_info.aliases = merged_aliases
            changed = True

    # -- Fill other_name from aliases[0] when it is still blank --
    if end_user_info.aliases and not (end_user_info.other_name or "").strip():
        end_user_info.other_name = end_user_info.aliases[0]
        changed = True

    # -- Merge meta_data (per-field list append with de-duplication) --
    if new_metadata:
        merged_meta = dict(end_user_info.meta_data or {})
        meta_changed = False
        for field, values in new_metadata.items():
            if not isinstance(values, list):
                continue
            stored = merged_meta.get(field)
            if isinstance(stored, list):
                field_values = list(stored)
            elif stored:
                # A stored scalar/dict is kept as a single element; calling
                # list() on it would explode a string into characters or
                # reduce a dict to its keys.
                field_values = [stored]
            else:
                field_values = []
            seen_values = {str(v).lower() for v in field_values}
            for value in values:
                key = str(value).lower()
                if key not in seen_values:
                    field_values.append(value)
                    seen_values.add(key)
            # Write the field when it is new or its content differs from what
            # is stored; an unchanged list is left alone.
            if field not in merged_meta or field_values != stored:
                merged_meta[field] = field_values
                meta_changed = True
        if meta_changed:
            end_user_info.meta_data = merged_meta
            changed = True

    if changed:
        self.db.commit()
        self.db.refresh(end_user_info)
        logger.info(
            f"[EndUserInfo] 更新完成: end_user_id={end_user_id}, "
            f"aliases_count={len(end_user_info.aliases or [])}"
        )
    return end_user_info
def _sync_end_user_info_pg(
    end_user_id: str,
    aliases: List[str],
    extracted_metadata: Optional[Dict[str, Any]],
) -> None:
    """Best-effort sync of aliases and extracted metadata into PostgreSQL.

    Steps performed inside a single ``get_db_context()`` session:

    - delegate the incremental merge of ``aliases`` and ``extracted_metadata``
      to ``EndUserInfoRepository.update_aliases_and_metadata``;
    - if the resulting ``end_user_info.other_name`` is non-blank and the
      matching ``end_user.other_name`` is still blank, copy it over and commit.

    Any exception (including a failing import or an invalid UUID string) is
    logged as a warning and swallowed, so the caller is never interrupted.

    Args:
        end_user_id: End user ID as a UUID string.
        aliases: Alias list to merge; a falsy value is treated as empty.
        extracted_metadata: Extracted metadata dict, or None to sync aliases only.
    """
    try:
        # Imports stay inside the try so that an import failure is also
        # covered by the best-effort handler below.
        import uuid as _uuid
        from app.db import get_db_context
        from app.repositories.end_user_info_repository import EndUserInfoRepository
        from app.repositories.end_user_repository import EndUserRepository

        user_uuid = _uuid.UUID(end_user_id)

        with get_db_context() as session:
            synced_info = EndUserInfoRepository(session).update_aliases_and_metadata(
                end_user_id=user_uuid,
                new_aliases=aliases or [],
                new_metadata=extracted_metadata,
            )
            if synced_info is None:
                logger.warning(
                    f"[Metadata][PG] end_user_info 记录不存在,跳过同步: end_user_id={end_user_id}"
                )
                return

            # Propagate other_name to end_user, but only when end_user has none yet.
            new_other_name = (synced_info.other_name or "").strip()
            if new_other_name:
                end_user = EndUserRepository(session).get_end_user_by_id(user_uuid)
                if end_user:
                    current_name = (end_user.other_name or "").strip()
                    if not current_name:
                        end_user.other_name = new_other_name
                        session.commit()
                        logger.info(
                            f"[Metadata][PG] 同步 end_user.other_name={new_other_name}: "
                            f"end_user_id={end_user_id}"
                        )

        logger.info(
            f"[Metadata][PG] end_user_info 同步完成: end_user_id={end_user_id}, "
            f"aliases_count={len(aliases or [])}"
        )
    except Exception as e:
        logger.warning(
            f"[Metadata][PG] 同步 end_user_info 失败(不影响主流程): "
            f"end_user_id={end_user_id}, error={e}",
            exc_info=True,
        )
元数据提取并回写成功" ) + + # 同步写入 PostgreSQL end_user_info + if end_user_id: + _sync_end_user_info_pg( + end_user_id=end_user_id, + aliases=aliases, + extracted_metadata=result.model_dump(), + ) else: + # 即使无新增元数据,也同步 aliases 到 PostgreSQL + if end_user_id and aliases: + _sync_end_user_info_pg( + end_user_id=end_user_id, + aliases=aliases, + extracted_metadata=None, + ) logger.debug( f"[Metadata] 实体 {entity_name}({entity_id}) 无新增元数据" )