feat(memory): sync user entity aliases and metadata to PostgreSQL

- Add `aliases` and `end_user_id` fields to user entity dicts in
  `collect_user_entities_for_metadata` so downstream tasks can write
  them to PostgreSQL
- Add `update_aliases_and_metadata` method to `EndUserInfoRepository`
  for incremental, case-insensitive dedup merge of aliases and
  structured metadata fields
- Add `_sync_end_user_info_pg` helper in tasks.py that writes aliases
  and extracted metadata to `end_user_info`, and back-fills
  `end_user.other_name` when empty
- Call `_sync_end_user_info_pg` from `extract_metadata_batch_task`
  after Neo4j write, and also when no new metadata but aliases exist
- Filter `meta_data` response in `UserMemoryService.get_end_user_info`
  to expose only four core fields: goals, traits, interests, core_facts
This commit is contained in:
lanceyq
2026-05-07 18:07:32 +08:00
parent d255f33f1f
commit e3ab19dd4f
4 changed files with 159 additions and 16 deletions

View File

@@ -1751,6 +1751,66 @@ def post_store_dedup_and_alias_merge_task(
_shutdown_loop_gracefully(loop)
def _sync_end_user_info_pg(
end_user_id: str,
aliases: List[str],
extracted_metadata: Optional[Dict[str, Any]],
) -> None:
"""将别名和元数据增量同步到 PostgreSQL end_user_info 表。
- aliases 合并到 end_user_info.aliases去重
- end_user_info.other_name 若为空则取 aliases[0]
- end_user.other_name 与 end_user_info.other_name 保持同步
- extracted_metadata 各字段列表合并到 end_user_info.meta_data去重
失败只记日志,不抛异常,不影响主流程。
"""
try:
import uuid as _uuid
from app.db import get_db_context
from app.repositories.end_user_info_repository import EndUserInfoRepository
from app.repositories.end_user_repository import EndUserRepository
eu_uuid = _uuid.UUID(end_user_id)
with get_db_context() as db:
info_repo = EndUserInfoRepository(db)
info = info_repo.update_aliases_and_metadata(
end_user_id=eu_uuid,
new_aliases=aliases or [],
new_metadata=extracted_metadata,
)
if info is None:
logger.warning(
f"[Metadata][PG] end_user_info 记录不存在,跳过同步: end_user_id={end_user_id}"
)
return
# 同步 end_user.other_name与 end_user_info.other_name 保持一致)
new_other_name = (info.other_name or "").strip()
if new_other_name:
eu_repo = EndUserRepository(db)
end_user = eu_repo.get_end_user_by_id(eu_uuid)
if end_user and not (end_user.other_name or "").strip():
end_user.other_name = new_other_name
db.commit()
logger.info(
f"[Metadata][PG] 同步 end_user.other_name={new_other_name}: "
f"end_user_id={end_user_id}"
)
logger.info(
f"[Metadata][PG] end_user_info 同步完成: end_user_id={end_user_id}, "
f"aliases_count={len(aliases or [])}"
)
except Exception as e:
logger.warning(
f"[Metadata][PG] 同步 end_user_info 失败(不影响主流程): "
f"end_user_id={end_user_id}, error={e}",
exc_info=True,
)
@celery_app.task(
bind=True,
name="app.tasks.extract_metadata_batch",
@@ -1764,16 +1824,19 @@ def extract_metadata_batch_task(
language: str = "zh",
snapshot_dir: Optional[str] = None,
) -> Dict[str, Any]:
"""Celery task: 用户实体元数据提取 + Neo4j 回写。
"""Celery task: 用户实体元数据提取 + Neo4j 回写 + PostgreSQL 同步
在主写入流水线完成后异步执行。从用户实体的 description 中提取
结构化元数据core_facts、traits、relations 等),增量回写到 Neo4j
结构化元数据core_facts、traits、relations 等),增量回写到 Neo4j
同时将 aliases 和 extracted_metadata 同步到 PostgreSQL end_user_info 表。
Args:
user_entities: 用户实体列表,每项包含:
- entity_id: 实体 ID
- entity_name: 实体名称
- descriptions: description 文本列表
- aliases: 实体别名列表(来自 "别名属于" 关系归并后的结果)
- end_user_id: 终端用户 ID用于写入 PostgreSQL
llm_model_id: LLM 模型 UUID 字符串
language: 语言 ("zh" / "en")
snapshot_dir: 可选的快照目录路径(调试模式下使用)
@@ -1825,6 +1888,8 @@ def extract_metadata_batch_task(
entity_id = entity_dict["entity_id"]
entity_name = entity_dict.get("entity_name", "")
descriptions = entity_dict.get("descriptions", [])
aliases = entity_dict.get("aliases", [])
end_user_id = entity_dict.get("end_user_id", "")
if not descriptions:
logger.debug(f"[Metadata] 跳过无 description 的实体: {entity_id}")
@@ -1874,7 +1939,22 @@ def extract_metadata_batch_task(
logger.info(
f"[Metadata] 实体 {entity_name}({entity_id}) 元数据提取并回写成功"
)
# 同步写入 PostgreSQL end_user_info
if end_user_id:
_sync_end_user_info_pg(
end_user_id=end_user_id,
aliases=aliases,
extracted_metadata=result.model_dump(),
)
else:
# 即使无新增元数据,也同步 aliases 到 PostgreSQL
if end_user_id and aliases:
_sync_end_user_info_pg(
end_user_id=end_user_id,
aliases=aliases,
extracted_metadata=None,
)
logger.debug(
f"[Metadata] 实体 {entity_name}({entity_id}) 无新增元数据"
)