feat(memory): add async user metadata extraction pipeline

- Add MetadataExtractor to collect user-related statements post-dedup
  and extract profile/behavioral metadata via independent LLM call
- Add Celery task (extract_user_metadata) routed to memory_tasks queue
- Add metadata models (UserMetadata, UserMetadataProfile, etc.)
- Add metadata utility functions (clean, validate, merge with _op support)
- Add Jinja2 prompt template for metadata extraction (zh/en)
- Fix Lucene query parameter naming: rename `q` to `query` across all
  Cypher queries, graph_search functions, and callers
- Escape `/` in Lucene queries to prevent TokenMgrError
- Add `speaker` field to ChunkNode and persist it in Neo4j
- Remove unused imports (argparse, os, UUID) in search.py
- Fix unnecessary db context nesting in interest distribution task
This commit is contained in:
lanceyq
2026-04-09 11:01:56 +08:00
parent cfbf83f71e
commit f2d7479229
17 changed files with 714 additions and 78 deletions

View File

@@ -153,7 +153,7 @@ class PerceptualSearchService:
return []
try:
r = await search_perceptual(
connector=connector, q=escaped,
connector=connector, query=escaped,
end_user_id=self.end_user_id,
limit=limit * 5, # 多查一些以提高命中率
)
@@ -178,7 +178,7 @@ class PerceptualSearchService:
if not escaped.strip():
return []
r = await search_perceptual(
connector=connector, q=escaped,
connector=connector, query=escaped,
end_user_id=self.end_user_id, limit=limit,
)
return r.get("perceptuals", [])

View File

@@ -58,6 +58,14 @@ from app.core.memory.models.triplet_models import (
TripletExtractionResponse,
)
# User metadata models
from app.core.memory.models.metadata_models import (
UserMetadata,
UserMetadataBehavioralHints,
UserMetadataProfile,
MetadataExtractionResponse,
)
# Ontology scenario models (LLM extracted from scenarios)
from app.core.memory.models.ontology_scenario_models import (
OntologyClass,
@@ -124,6 +132,10 @@ __all__ = [
"Entity",
"Triplet",
"TripletExtractionResponse",
"UserMetadata",
"UserMetadataBehavioralHints",
"UserMetadataProfile",
"MetadataExtractionResponse",
# Ontology models
"OntologyClass",
"OntologyExtractionResponse",

View File

@@ -364,12 +364,14 @@ class ChunkNode(Node):
Attributes:
dialog_id: ID of the parent dialog
content: The text content of the chunk
speaker: Speaker identifier ('user' or 'assistant')
chunk_embedding: Optional embedding vector for the chunk
sequence_number: Order of this chunk within the dialog
metadata: Additional chunk metadata as key-value pairs
"""
dialog_id: str = Field(..., description="ID of the parent dialog")
content: str = Field(..., description="The text content of the chunk")
speaker: Optional[str] = Field(None, description="Speaker identifier: 'user' for user messages, 'assistant' for AI responses")
chunk_embedding: Optional[List[float]] = Field(None, description="Chunk embedding vector")
sequence_number: int = Field(..., description="Order of this chunk within the dialog")
metadata: dict = Field(default_factory=dict, description="Additional chunk metadata")

View File

@@ -0,0 +1,40 @@
"""Models for user metadata extraction.
Independent from triplet_models.py - these models are used by the
standalone metadata extraction pipeline (post-dedup async Celery task).
"""
from typing import List
from pydantic import BaseModel, ConfigDict, Field
class UserMetadataProfile(BaseModel):
    """用户画像信息"""
    # User profile section of the extracted metadata (English gloss of the
    # docstring above). The docstring is kept verbatim because pydantic copies
    # a model's docstring into the description of its generated JSON schema.
    # Every field has an empty default, so a payload that omits a field still
    # validates.

    # extra='ignore': keys not declared on this model are dropped on validation.
    model_config = ConfigDict(extra='ignore')
    # Occupation or role of the user; "" when not provided.
    role: str = Field(default="", description="用户职业或角色,如 teacher, doctor, software_engineer")
    # Domain the user works in; "" when not provided.
    domain: str = Field(default="", description="用户所在领域,如 education, healthcare, software_development")
    # Skills or tools the user is good at; a fresh empty list when not provided.
    expertise: List[str] = Field(default_factory=list, description="用户擅长的技能或工具")
    # Topic / domain tags the user cares about; a fresh empty list when not provided.
    interests: List[str] = Field(default_factory=list, description="用户关注的话题或领域标签")
class UserMetadataBehavioralHints(BaseModel):
    """行为偏好"""
    # Behavioral preference section of the extracted metadata (English gloss of
    # the docstring above). The docstring is kept verbatim because pydantic
    # copies a model's docstring into the description of its JSON schema.
    # All three fields are free-form strings that default to "".

    # extra='ignore': keys not declared on this model are dropped on validation.
    model_config = ConfigDict(extra='ignore')
    # Learning stage of the user.
    learning_stage: str = Field(default="", description="学习阶段")
    # Level of detail the user prefers.
    preferred_depth: str = Field(default="", description="偏好深度")
    # Tone the user prefers in responses.
    tone_preference: str = Field(default="", description="语气偏好")
class UserMetadata(BaseModel):
    """用户元数据顶层结构"""
    # Top-level user metadata structure (English gloss of the docstring above).
    # The docstring is kept verbatim because pydantic copies a model's
    # docstring into the description of its JSON schema.
    # Each section is built from its own defaults when missing, so an empty
    # payload validates to a fully populated, all-empty object.

    # extra='ignore': keys not declared on this model are dropped on validation.
    model_config = ConfigDict(extra='ignore')
    # Profile section; defaults to an all-empty UserMetadataProfile.
    profile: UserMetadataProfile = Field(default_factory=UserMetadataProfile)
    # Behavioral hints section; defaults to an all-empty UserMetadataBehavioralHints.
    behavioral_hints: UserMetadataBehavioralHints = Field(default_factory=UserMetadataBehavioralHints)
    # Knowledge tags; a fresh empty list when not provided.
    knowledge_tags: List[str] = Field(default_factory=list, description="知识标签")
class MetadataExtractionResponse(BaseModel):
    """元数据提取 LLM 响应结构"""
    # Response envelope for the metadata-extraction LLM call (English gloss of
    # the docstring above). The docstring is kept verbatim because pydantic
    # copies a model's docstring into the description of its JSON schema.

    # extra='ignore': keys not declared on this model are dropped on validation.
    model_config = ConfigDict(extra='ignore')
    # Extracted metadata; defaults to an all-empty UserMetadata when the key is absent.
    user_metadata: UserMetadata = Field(default_factory=UserMetadata)

View File

@@ -1,4 +1,3 @@
import argparse
import asyncio
import json
import math
@@ -6,7 +5,6 @@ import os
import time
from datetime import datetime
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from uuid import UUID
if TYPE_CHECKING:
from app.schemas.memory_config_schema import MemoryConfig
@@ -23,7 +21,7 @@ from app.core.memory.utils.config.config_utils import (
)
from app.core.memory.utils.data.text_utils import extract_plain_query
from app.core.memory.utils.data.time_utils import normalize_date_safe
from app.core.memory.utils.llm.llm_utils import get_reranker_client
# from app.core.memory.utils.llm.llm_utils import get_reranker_client
from app.core.models.base import RedBearModelConfig
from app.db import get_db_context
from app.repositories.neo4j.graph_search import (
@@ -748,11 +746,10 @@ async def run_hybrid_search(
if search_type in ["keyword", "hybrid"]:
# Keyword-based search
logger.info("[PERF] Starting keyword search...")
keyword_start = time.time()
keyword_task = asyncio.create_task(
search_graph(
connector=connector,
q=query_text,
query=query_text,
end_user_id=end_user_id,
limit=limit,
include=include
@@ -762,7 +759,6 @@ async def run_hybrid_search(
if search_type in ["embedding", "hybrid"]:
# Embedding-based search
logger.info("[PERF] Starting embedding search...")
embedding_start = time.time()
# 从数据库读取嵌入器配置(按 ID并构建 RedBearModelConfig
config_load_start = time.time()
@@ -904,10 +900,10 @@ async def run_hybrid_search(
else:
results["latency_metrics"] = latency_metrics
logger.info(f"[PERF] ===== SEARCH PERFORMANCE SUMMARY =====")
logger.info("[PERF] ===== SEARCH PERFORMANCE SUMMARY =====")
logger.info(f"[PERF] Total search completed in {total_latency:.4f}s")
logger.info(f"[PERF] Latency breakdown: {json.dumps(latency_metrics, indent=2)}")
logger.info(f"[PERF] =========================================")
logger.info("[PERF] =========================================")
# Sanitize results: drop large/unused fields
_remove_keys_recursive(results, ["name_embedding"]) # drop entity name embeddings from outputs

View File

@@ -311,8 +311,35 @@ class ExtractionOrchestrator:
dialog_data_list,
)
# 步骤 7: 同步用户别名到数据库表(仅正式模式)
# 步骤 7: 同步用户别名到数据库表 + 触发异步元数据提取(仅正式模式)
if not is_pilot_run:
# 收集用户相关 statement 并触发异步元数据提取
try:
from app.core.memory.storage_services.extraction_engine.knowledge_extraction.metadata_extractor import MetadataExtractor
metadata_extractor = MetadataExtractor(llm_client=self.llm_client, language=self.language)
user_statements = metadata_extractor.collect_user_related_statements(
entity_nodes, statement_nodes,
statement_entity_edges
)
if user_statements:
# 获取 end_user_id 和 config_id
end_user_id = dialog_data_list[0].end_user_id if dialog_data_list else None
config_id = dialog_data_list[0].config_id if dialog_data_list and hasattr(dialog_data_list[0], 'config_id') else None
if end_user_id:
from app.tasks import extract_user_metadata_task
extract_user_metadata_task.delay(
end_user_id=str(end_user_id),
statements=user_statements,
config_id=str(config_id) if config_id else None,
language=self.language,
)
logger.info(f"已触发异步元数据提取任务,共 {len(user_statements)} 条用户相关 statement")
else:
logger.info("未找到用户相关 statement跳过元数据提取")
except Exception as e:
logger.error(f"触发元数据提取任务失败(不影响主流程): {e}", exc_info=True)
# 同步用户别名到数据库表
logger.info("步骤 7: 同步用户别名到 end_user 和 end_user_info 表")
await self._update_end_user_other_name(entity_nodes, dialog_data_list)
@@ -1107,6 +1134,7 @@ class ExtractionOrchestrator:
end_user_id=dialog_data.end_user_id,
run_id=dialog_data.run_id, # 使用 dialog_data 的 run_id
content=chunk.content,
speaker=getattr(chunk, 'speaker', None),
chunk_embedding=chunk.chunk_embedding,
sequence_number=chunk_idx, # 添加必需的 sequence_number 字段
created_at=dialog_data.created_at,
@@ -1342,7 +1370,7 @@ class ExtractionOrchestrator:
async def _update_end_user_other_name(
self,
entity_nodes: List[ExtractedEntityNode],
dialog_data_list: List[DialogData]
dialog_data_list: List[DialogData],
) -> None:
"""
将本轮提取的用户别名同步到 end_user 和 end_user_info 表。
@@ -1470,7 +1498,6 @@ class ExtractionOrchestrator:
end_user_id=end_user_uuid,
other_name=first_alias,
aliases=merged_aliases,
meta_data={}
))
logger.info(f"创建 end_user_info 记录other_name={first_alias}, aliases={merged_aliases}")

View File

@@ -0,0 +1,152 @@
"""
Metadata extractor module.
Collects user-related statements from post-dedup graph data and
extracts user metadata via an independent LLM call.
"""
import logging
from typing import List, Optional
from app.core.memory.models.graph_models import (
ExtractedEntityNode,
StatementEntityEdge,
StatementNode,
)
from app.core.memory.models.metadata_models import (
MetadataExtractionResponse,
UserMetadata,
)
logger = logging.getLogger(__name__)
# Reuse the same user-entity detection logic from dedup module
# NOTE(review): the empty-string member looks like a character that was lost
# somewhere (possibly a first-person pronoun) — confirm against the dedup module.
_USER_NAMES = {"用户", "", "user", "i"}
_CANONICAL_USER_TYPE = "用户"


def _is_user_entity(ent: "ExtractedEntityNode") -> bool:
    """Return True when *ent* represents the end user.

    An entity counts as the user when its ``entity_type`` (stripped) equals the
    canonical user type, or when its ``name`` (stripped, lower-cased) is a
    non-empty member of ``_USER_NAMES``.

    Missing or ``None`` attributes are treated as empty strings. A blank name
    never matches: without that guard the empty-string member of
    ``_USER_NAMES`` would classify every unnamed entity as the user.
    """
    name = (getattr(ent, "name", "") or "").strip().lower()
    etype = (getattr(ent, "entity_type", "") or "").strip()
    if etype == _CANONICAL_USER_TYPE:
        return True
    # bool(name) guards against the "" member of _USER_NAMES.
    return bool(name) and name in _USER_NAMES
class MetadataExtractor:
    """Extract user metadata from post-dedup graph data via an independent LLM call.

    The extractor first narrows the graph to statements that are linked to a
    user entity and were spoken by the user, then sends those statements to the
    LLM with a dedicated prompt template.
    """

    def __init__(self, llm_client, language: str = "zh"):
        """Store the LLM client and the configured language."""
        self.llm_client = llm_client
        self.language = language

    @staticmethod
    def detect_language(statements: List[str]) -> str:
        """Detect the language of the statements.

        Returns "zh" when the joined text contains at least one character in
        the U+4E00..U+9FFF range, otherwise "en".
        """
        import re

        joined = " ".join(statements)
        has_chinese = re.search(r'[\u4e00-\u9fff]', joined) is not None
        return "zh" if has_chinese else "en"

    def collect_user_related_statements(
        self,
        entity_nodes: List[ExtractedEntityNode],
        statement_nodes: List[StatementNode],
        statement_entity_edges: List[StatementEntityEdge],
    ) -> List[str]:
        """Pick the statement texts that are about the user and spoken by the user.

        Selection:
        1. user entity -> StatementEntityEdge -> statement (direct link only);
        2. keep only statements whose speaker is "user", which filters out
           noise from assistant replies.

        Returns:
            Stripped, non-empty statement texts in the order of
            ``statement_nodes``; each statement id is considered once.
        """
        user_ids = {node.id for node in entity_nodes if _is_user_entity(node)}
        if not user_ids:
            logger.debug("未找到用户实体节点,跳过 statement 收集")
            return []

        # Statements reachable from a user entity through an edge.
        linked_ids = {
            edge.source for edge in statement_entity_edges if edge.target in user_ids
        }

        texts = []
        visited = set()
        linked_count = 0
        non_user_count = 0
        for node in statement_nodes:
            node_id = node.id
            if node_id not in linked_ids or node_id in visited:
                continue
            visited.add(node_id)
            linked_count += 1

            who = getattr(node, 'speaker', None) or 'unknown'
            if who != "user":
                non_user_count += 1
                continue

            body = (node.statement or "").strip()
            if body:
                texts.append(body)

        kept = len(texts)
        logger.info(
            f"收集到 {kept} 条用户发言 statement "
            f"(直接关联: {linked_count}, speaker=user: {kept}, "
            f"跳过非user: {non_user_count})"
        )
        if linked_count > 0 and kept == 0:
            logger.warning(
                f"{linked_count} 条直接关联 statement 但全部被 speaker 过滤,"
                f"可能本次写入不包含 user 消息"
            )
        return texts

    async def extract_metadata(self, statements: List[str]) -> Optional[UserMetadata]:
        """Ask the LLM to extract metadata from the selected statements.

        The prompt language is detected from the statement text itself rather
        than taken from ``self.language``.

        Returns:
            The ``UserMetadata`` from the LLM response on success; ``None``
            when ``statements`` is empty, the response carries no metadata, or
            any exception is raised (the exception is logged, not propagated).
        """
        if not statements:
            return None

        try:
            from app.core.memory.utils.prompt.prompt_utils import prompt_env

            # Follow the language of the written content, not the UI language.
            detected_language = self.detect_language(statements)
            logger.info(f"元数据提取语言检测结果: {detected_language}")

            rendered = prompt_env.get_template("extract_user_metadata.jinja2").render(
                statements=statements,
                language=detected_language,
                json_schema="",
            )
            llm_reply = await self.llm_client.response_structured(
                messages=[{"role": "user", "content": rendered}],
                response_model=MetadataExtractionResponse,
            )
            if llm_reply and llm_reply.user_metadata:
                return llm_reply.user_metadata
            logger.warning("LLM 返回的元数据为空")
        except Exception as exc:
            logger.error(f"元数据提取 LLM 调用失败: {exc}", exc_info=True)
        return None

View File

@@ -1,4 +1,3 @@
import os
import asyncio
from typing import List, Dict, Optional

View File

@@ -5,7 +5,7 @@
使用Neo4j的全文索引进行高效的文本匹配。
"""
from typing import List, Dict, Any, Optional
from typing import List, Optional
from app.core.logging_config import get_memory_logger
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.core.memory.storage_services.search.search_strategy import SearchStrategy, SearchResult
@@ -74,7 +74,7 @@ class KeywordSearchStrategy(SearchStrategy):
# 调用底层的关键词搜索函数
results_dict = await search_graph(
connector=self.connector,
q=query_text,
query=query_text,
end_user_id=end_user_id,
limit=limit,
include=include_list

View File

@@ -22,7 +22,9 @@ def escape_lucene_query(query: str) -> str:
s = s.replace("\r", " ").replace("\n", " ").strip()
# Lucene reserved tokens/special characters
specials = ['&&', '||', '\\', '+', '-', '!', '(', ')', '{', '}', '[', ']', '^', '"', '~', '*', '?', ':']
# NOTE: '/' is the regex delimiter in Lucene — must be escaped to prevent
# TokenMgrError when the query contains unmatched slashes.
specials = ['&&', '||', '\\', '+', '-', '!', '(', ')', '{', '}', '[', ']', '^', '"', '~', '*', '?', ':', '/']
# Replace longer tokens first to avoid partial double-escaping
for token in sorted(specials, key=len, reverse=True):
s = s.replace(token, f"\\{token}")

View File

@@ -0,0 +1,179 @@
"""
Metadata utility functions for cleaning, validating, aggregating, and merging
user metadata extracted from conversations.
"""
import logging
from datetime import datetime, timezone
from typing import Optional
from app.core.memory.models.metadata_models import UserMetadata
logger = logging.getLogger(__name__)
def clean_metadata(raw: dict) -> dict:
    """Return a copy of *raw* with empty strings, lists and dicts removed.

    Nested dicts are cleaned recursively and dropped when nothing is left in
    them. Values of any other type (``None``, numbers, booleans, ...) are kept
    untouched.
    """
    kept = {}
    for name, item in raw.items():
        if isinstance(item, dict):
            # Clean the nested section first; it may become empty.
            item = clean_metadata(item)
        if isinstance(item, (dict, list, str)) and not item:
            continue
        kept[name] = item
    return kept
# TODO 这个函数没有调用的地方
def validate_metadata(raw: dict) -> Optional[UserMetadata]:
    """Validate *raw* against the ``UserMetadata`` pydantic model.

    Returns:
        The validated ``UserMetadata`` instance, or ``None`` when validation
        raises; the failure is logged at WARNING level.
    """
    try:
        validated = UserMetadata.model_validate(raw)
    except Exception as exc:
        logger.warning("Metadata validation failed: %s", exc)
        return None
    return validated
def merge_metadata(existing: dict, new: dict) -> dict:
    """Merge newly extracted metadata into the metadata already stored.

    Rules:
    - Scalar fields: the new value overwrites the old one.
    - Array fields: merged according to an optional ``_op`` marker
      (append / replace / remove); a bare list is appended with de-duplication.
    - Nested sections (plain dict values such as ``profile``): merged field by
      field. This also applies when the section is missing from ``existing``,
      so ``_op`` wrappers inside it are resolved and timestamps are recorded
      per field rather than for the section as a whole.
    - Top-level keys missing from ``new`` keep their existing value.
    - ``_updated_at`` maps the path of every changed field to an ISO-8601 UTC
      timestamp; an incoming ``_updated_at`` key in ``new`` is ignored.
    - When ``existing`` is ``None`` or empty, ``new`` is written as-is and
      every leaf path is stamped (no merge logic).

    Args:
        existing: Metadata currently stored; may be ``None`` or empty.
        new: Freshly extracted metadata.

    Returns:
        A new top-level dict; ``existing`` and ``new`` are not modified at the
        top level.
    """
    now = datetime.now(timezone.utc).isoformat()

    if not existing:
        # Direct write: new + _updated_at for all leaf fields.
        result = dict(new)
        stamped: dict = {}
        _collect_field_paths(result, "", stamped, now)
        if stamped:
            result["_updated_at"] = stamped
        return result

    result = dict(existing)
    # Tolerate a stored "_updated_at" that is None or not a dict.
    previous = result.get("_updated_at")
    updated_at: dict = dict(previous) if isinstance(previous, dict) else {}

    for key, new_value in new.items():
        if key == "_updated_at":
            continue

        old_value = result.get(key)
        new_is_dict = isinstance(new_value, dict)
        has_op = new_is_dict and "_op" in new_value

        if new_is_dict and isinstance(old_value, dict):
            # Nested dict merge (e.g. profile, behavioral_hints).
            _merge_nested(result, key, old_value, new_value, updated_at, now)
        elif isinstance(new_value, list) or has_op:
            # Array field with possible _op.
            _merge_array_field(result, key, old_value, new_value, updated_at, now)
        elif new_is_dict:
            # Nested section appearing for the first time (or replacing a
            # non-dict value): merge against an empty section so its fields go
            # through the same per-field logic as an existing section.
            _merge_nested(result, key, {}, new_value, updated_at, now)
        elif old_value != new_value:
            # Scalar top-level field that actually changed.
            result[key] = new_value
            updated_at[key] = now

    result["_updated_at"] = updated_at
    return result
# TODO 考虑大函数包含小函数,因为只服务于大函数,实现代码文件的结构清楚
def _collect_field_paths(data: dict, prefix: str, updated_at: dict, now: str) -> None:
    """Stamp every leaf field path of *data* with *now* in *updated_at*.

    Paths are dot-joined key names below *prefix*. Dict values are descended
    into; any other value is a leaf. Keys named ``_updated_at`` are skipped at
    every level, and an empty nested dict contributes no path.
    """
    for name, item in data.items():
        if name == "_updated_at":
            continue
        dotted = f"{prefix}.{name}" if prefix else f"{name}"
        if not isinstance(item, dict):
            updated_at[dotted] = now
            continue
        _collect_field_paths(item, dotted, updated_at, now)
def _merge_nested(
    result: dict, key: str, old_dict: dict, new_dict: dict,
    updated_at: dict, now: str
) -> None:
    """Merge one nested section (e.g. profile, behavioral_hints) into *result*.

    Works on a copy of *old_dict*: list values and ``_op`` wrappers go through
    the array merge, every other value overwrites the old one when it differs.
    Changed fields are stamped in *updated_at* under ``"<key>.<field>"`` and
    the merged section is stored at ``result[key]``.
    """
    section = {**old_dict}
    for name, incoming in new_dict.items():
        dotted = f"{key}.{name}"
        current = section.get(name)
        array_like = isinstance(incoming, list) or (
            isinstance(incoming, dict) and "_op" in incoming
        )
        if array_like:
            _merge_array_field_inner(section, name, current, incoming, updated_at, dotted, now)
            continue
        # Scalar field: only write and stamp on an actual change.
        if current != incoming:
            section[name] = incoming
            updated_at[dotted] = now
    result[key] = section
def _merge_array_field(
    result: dict, key: str, old_value, new_value,
    updated_at: dict, now: str
) -> None:
    """Merge a top-level array field, honoring an optional ``_op`` marker.

    Delegates to the shared array merge with the field's own key as its
    ``_updated_at`` path.
    """
    _merge_array_field_inner(
        container=result,
        field=key,
        old_value=old_value,
        new_value=new_value,
        updated_at=updated_at,
        path=key,
        now=now,
    )
def _merge_array_field_inner(
    container: dict, field: str, old_value, new_value,
    updated_at: dict, path: str, now: str
) -> None:
    """Merge an array value into ``container[field]`` with ``_op`` support.

    *new_value* may be a bare list (treated as append) or a dict carrying
    ``_op`` plus the items, looked up under the field's own name, then under
    ``"items"``; when that lookup yields a non-list, the first list value in
    the dict is used. Anything else contributes no items.

    Operations: ``replace`` stores the items as given, ``remove`` drops them
    from the old list, and any other value appends the items that are not
    already present. ``container[field]`` is always written; *updated_at* is
    stamped at *path* only when the resulting list differs from the old one.
    """
    op = "append"
    incoming = []
    if isinstance(new_value, list):
        incoming = new_value
    elif isinstance(new_value, dict) and "_op" in new_value:
        op = new_value["_op"]
        candidate = new_value.get(field, new_value.get("items", []))
        if isinstance(candidate, list):
            incoming = candidate
        else:
            # Fall back to the first list value in the wrapper (excluding _op).
            incoming = next(
                (v for k, v in new_value.items() if k != "_op" and isinstance(v, list)),
                [],
            )

    current = old_value if isinstance(old_value, list) else []

    if op == "replace":
        merged = incoming
    elif op == "remove":
        merged = [entry for entry in current if entry not in incoming]
    else:
        # append (default): keep the old order, add unseen items at the end.
        merged = list(current)
        for entry in incoming:
            if entry not in merged:
                merged.append(entry)

    container[field] = merged
    if current != merged:
        updated_at[path] = now

View File

@@ -406,4 +406,12 @@ Output:
- **⚠️ ALIASES ORDER: preserve temporal order of appearance**
- **🚨 MANDATORY FIELD: EVERY entity MUST include "aliases" field, even if empty array []**
**Output JSON structure:**
```json
{
"triplets": [...],
"entities": [...]
}
```
{{ json_schema }}

View File

@@ -0,0 +1,74 @@
===Task===
Extract user metadata from the following conversation statements spoken by the user.
{% if language == "zh" %}
**"三度原则"判断标准:**
- 复用度:该信息是否会被多个功能模块使用?
- 约束度:该信息是否会影响系统行为?
- 时效性:该信息是长期稳定的还是临时的?仅提取长期稳定信息。
**提取规则:**
- **只提取关于"用户本人"的画像信息**,忽略用户提到的第三方人物(如朋友、同事、家人)的信息
- 仅提取文本中明确提到的信息,不要推测
- 如果文本中没有可提取的用户画像信息,返回空的 user_metadata 对象
- **输出语言必须与输入文本的语言一致**(输入中文则输出中文值,输入英文则输出英文值)
**字段说明:**
- profile.role:用户的职业或角色,如 教师、医生、后端工程师
- profile.domain:用户所在领域,如 教育、医疗、软件开发
- profile.expertise:用户擅长的技能或工具(通用,不限于编程),如 Python、心理咨询、高中物理
- profile.interests:用户主动表达兴趣的话题或领域标签
- behavioral_hints.learning_stage:学习阶段(初学者/中级/高级)
- behavioral_hints.preferred_depth:偏好深度(概览/技术细节/深入探讨)
- behavioral_hints.tone_preference:语气偏好(轻松随意/专业简洁/学术严谨)
- knowledge_tags:用户涉及的知识领域标签
{% else %}
**"Three-Degree Principle" criteria:**
- Reusability: Will this information be used by multiple functional modules?
- Constraint: Will this information affect system behavior?
- Timeliness: Is this information long-term stable or temporary? Only extract long-term stable information.
**Extraction rules:**
- **Only extract profile information about the user themselves**, ignore information about third parties (friends, colleagues, family) mentioned by the user
- Only extract information explicitly mentioned in the text, do not speculate
- If no user profile information can be extracted, return an empty user_metadata object
- **Output language must match the input text language**
**Field descriptions:**
- profile.role: User's occupation or role, e.g. teacher, doctor, software engineer
- profile.domain: User's domain, e.g. education, healthcare, software development
- profile.expertise: User's skills or tools (general, not limited to programming)
- profile.interests: Topics or domain tags the user actively expressed interest in
- behavioral_hints.learning_stage: Learning stage (beginner/intermediate/advanced)
- behavioral_hints.preferred_depth: Preferred depth (overview/detailed/deep dive)
- behavioral_hints.tone_preference: Tone preference (casual/professional/academic)
- knowledge_tags: Knowledge domain tags related to the user
{% endif %}
===User Statements===
{% for stmt in statements %}
- {{ stmt }}
{% endfor %}
===Output Format===
Return a JSON object with the following structure:
```json
{
"user_metadata": {
"profile": {
"role": "",
"domain": "",
"expertise": [],
"interests": []
},
"behavioral_hints": {
"learning_stage": "",
"preferred_depth": "",
"tone_preference": ""
},
"knowledge_tags": []
}
}
```
{{ json_schema }}