Merge pull request #851 from SuanmoSuanyangTechnology/feat/extract-metadata

Feat/extract metadata
This commit is contained in:
Ke Sun
2026-04-10 18:11:04 +08:00
committed by GitHub
19 changed files with 863 additions and 108 deletions

View File

@@ -111,6 +111,9 @@ celery_app.conf.update(
# Clustering tasks → memory_tasks queue (使用相同的 worker避免 macOS fork 问题) # Clustering tasks → memory_tasks queue (使用相同的 worker避免 macOS fork 问题)
'app.tasks.run_incremental_clustering': {'queue': 'memory_tasks'}, 'app.tasks.run_incremental_clustering': {'queue': 'memory_tasks'},
# Metadata extraction → memory_tasks queue
'app.tasks.extract_user_metadata': {'queue': 'memory_tasks'},
# Document tasks → document_tasks queue (prefork worker) # Document tasks → document_tasks queue (prefork worker)
'app.core.rag.tasks.parse_document': {'queue': 'document_tasks'}, 'app.core.rag.tasks.parse_document': {'queue': 'document_tasks'},
'app.core.rag.tasks.build_graphrag_for_kb': {'queue': 'document_tasks'}, 'app.core.rag.tasks.build_graphrag_for_kb': {'queue': 'document_tasks'},

View File

@@ -153,7 +153,7 @@ class PerceptualSearchService:
return [] return []
try: try:
r = await search_perceptual( r = await search_perceptual(
connector=connector, q=escaped, connector=connector, query=escaped,
end_user_id=self.end_user_id, end_user_id=self.end_user_id,
limit=limit * 5, # 多查一些以提高命中率 limit=limit * 5, # 多查一些以提高命中率
) )
@@ -178,7 +178,7 @@ class PerceptualSearchService:
if not escaped.strip(): if not escaped.strip():
return [] return []
r = await search_perceptual( r = await search_perceptual(
connector=connector, q=escaped, connector=connector, query=escaped,
end_user_id=self.end_user_id, limit=limit, end_user_id=self.end_user_id, limit=limit,
) )
return r.get("perceptuals", []) return r.get("perceptuals", [])

View File

@@ -58,6 +58,14 @@ from app.core.memory.models.triplet_models import (
TripletExtractionResponse, TripletExtractionResponse,
) )
# User metadata models
from app.core.memory.models.metadata_models import (
UserMetadata,
UserMetadataBehavioralHints,
UserMetadataProfile,
MetadataExtractionResponse,
)
# Ontology scenario models (LLM extracted from scenarios) # Ontology scenario models (LLM extracted from scenarios)
from app.core.memory.models.ontology_scenario_models import ( from app.core.memory.models.ontology_scenario_models import (
OntologyClass, OntologyClass,
@@ -124,6 +132,10 @@ __all__ = [
"Entity", "Entity",
"Triplet", "Triplet",
"TripletExtractionResponse", "TripletExtractionResponse",
"UserMetadata",
"UserMetadataBehavioralHints",
"UserMetadataProfile",
"MetadataExtractionResponse",
# Ontology models # Ontology models
"OntologyClass", "OntologyClass",
"OntologyExtractionResponse", "OntologyExtractionResponse",

View File

@@ -364,12 +364,14 @@ class ChunkNode(Node):
Attributes: Attributes:
dialog_id: ID of the parent dialog dialog_id: ID of the parent dialog
content: The text content of the chunk content: The text content of the chunk
speaker: Speaker identifier ('user' or 'assistant')
chunk_embedding: Optional embedding vector for the chunk chunk_embedding: Optional embedding vector for the chunk
sequence_number: Order of this chunk within the dialog sequence_number: Order of this chunk within the dialog
metadata: Additional chunk metadata as key-value pairs metadata: Additional chunk metadata as key-value pairs
""" """
dialog_id: str = Field(..., description="ID of the parent dialog") dialog_id: str = Field(..., description="ID of the parent dialog")
content: str = Field(..., description="The text content of the chunk") content: str = Field(..., description="The text content of the chunk")
speaker: Optional[str] = Field(None, description="Speaker identifier: 'user' for user messages, 'assistant' for AI responses")
chunk_embedding: Optional[List[float]] = Field(None, description="Chunk embedding vector") chunk_embedding: Optional[List[float]] = Field(None, description="Chunk embedding vector")
sequence_number: int = Field(..., description="Order of this chunk within the dialog") sequence_number: int = Field(..., description="Order of this chunk within the dialog")
metadata: dict = Field(default_factory=dict, description="Additional chunk metadata") metadata: dict = Field(default_factory=dict, description="Additional chunk metadata")

View File

@@ -0,0 +1,57 @@
"""Models for user metadata extraction.
Independent from triplet_models.py - these models are used by the
standalone metadata extraction pipeline (post-dedup async Celery task).
"""
from typing import List
from pydantic import BaseModel, ConfigDict, Field
class UserMetadataProfile(BaseModel):
    """用户画像信息"""

    # Profile section of the user metadata: occupation, domain, skills and
    # interests. The class docstring and the `description` strings are left
    # verbatim because pydantic surfaces them in the model's JSON schema.

    # Keys that are not declared below are ignored on input.
    model_config = ConfigDict(extra="ignore")

    # Scalar traits fall back to an empty string.
    role: str = Field("", description="用户职业或角色")
    domain: str = Field("", description="用户所在领域")

    # List traits fall back to a fresh empty list per instance.
    expertise: List[str] = Field(
        description="用户擅长的技能或工具",
        default_factory=list,
    )
    interests: List[str] = Field(
        description="用户关注的话题或领域标签",
        default_factory=list,
    )
class UserMetadataBehavioralHints(BaseModel):
    """行为偏好"""

    # Behavioural preferences of the user. The class docstring and the
    # `description` strings are left verbatim because pydantic surfaces them
    # in the model's JSON schema.

    # Keys that are not declared below are ignored on input.
    model_config = ConfigDict(extra="ignore")

    # Every hint is free text and falls back to an empty string.
    learning_stage: str = Field("", description="学习阶段")
    preferred_depth: str = Field("", description="偏好深度")
    tone_preference: str = Field("", description="语气偏好")
class UserMetadata(BaseModel):
    """用户元数据顶层结构"""

    # Top-level container combining the profile, the behavioural hints and
    # the knowledge tags. The class docstring and the `description` string
    # are left verbatim because pydantic surfaces them in the JSON schema.

    # Keys that are not declared below are ignored on input.
    model_config = ConfigDict(extra="ignore")

    # Nested sections default to freshly constructed, empty sub-models.
    profile: UserMetadataProfile = Field(
        default_factory=UserMetadataProfile,
    )
    behavioral_hints: UserMetadataBehavioralHints = Field(
        default_factory=UserMetadataBehavioralHints,
    )

    # Defaults to a fresh empty list per instance.
    knowledge_tags: List[str] = Field(
        description="知识标签",
        default_factory=list,
    )
class MetadataExtractionResponse(BaseModel):
    """元数据提取 LLM 响应结构"""

    # Structured response expected from the metadata-extraction LLM call:
    # the metadata itself plus alias additions and removals. The class
    # docstring and the `description` strings are left verbatim because
    # pydantic surfaces them in the model's JSON schema.

    # Keys that are not declared below are ignored on input.
    model_config = ConfigDict(extra="ignore")

    # Defaults to an empty UserMetadata when the key is absent.
    user_metadata: UserMetadata = Field(
        default_factory=UserMetadata,
    )

    # Alias names to add; empty list by default.
    aliases_to_add: List[str] = Field(
        description="本次新发现的用户别名(用户自我介绍或他人对用户的称呼)",
        default_factory=list,
    )

    # Alias names to remove; empty list by default.
    aliases_to_remove: List[str] = Field(
        description="用户明确否认的别名(如'我不叫XX了'",
        default_factory=list,
    )

View File

@@ -1,4 +1,3 @@
import argparse
import asyncio import asyncio
import json import json
import math import math
@@ -6,7 +5,6 @@ import os
import time import time
from datetime import datetime from datetime import datetime
from typing import TYPE_CHECKING, Any, Dict, List, Optional from typing import TYPE_CHECKING, Any, Dict, List, Optional
from uuid import UUID
if TYPE_CHECKING: if TYPE_CHECKING:
from app.schemas.memory_config_schema import MemoryConfig from app.schemas.memory_config_schema import MemoryConfig
@@ -23,7 +21,7 @@ from app.core.memory.utils.config.config_utils import (
) )
from app.core.memory.utils.data.text_utils import extract_plain_query from app.core.memory.utils.data.text_utils import extract_plain_query
from app.core.memory.utils.data.time_utils import normalize_date_safe from app.core.memory.utils.data.time_utils import normalize_date_safe
from app.core.memory.utils.llm.llm_utils import get_reranker_client # from app.core.memory.utils.llm.llm_utils import get_reranker_client
from app.core.models.base import RedBearModelConfig from app.core.models.base import RedBearModelConfig
from app.db import get_db_context from app.db import get_db_context
from app.repositories.neo4j.graph_search import ( from app.repositories.neo4j.graph_search import (
@@ -748,11 +746,10 @@ async def run_hybrid_search(
if search_type in ["keyword", "hybrid"]: if search_type in ["keyword", "hybrid"]:
# Keyword-based search # Keyword-based search
logger.info("[PERF] Starting keyword search...") logger.info("[PERF] Starting keyword search...")
keyword_start = time.time()
keyword_task = asyncio.create_task( keyword_task = asyncio.create_task(
search_graph( search_graph(
connector=connector, connector=connector,
q=query_text, query=query_text,
end_user_id=end_user_id, end_user_id=end_user_id,
limit=limit, limit=limit,
include=include include=include
@@ -762,7 +759,6 @@ async def run_hybrid_search(
if search_type in ["embedding", "hybrid"]: if search_type in ["embedding", "hybrid"]:
# Embedding-based search # Embedding-based search
logger.info("[PERF] Starting embedding search...") logger.info("[PERF] Starting embedding search...")
embedding_start = time.time()
# 从数据库读取嵌入器配置(按 ID并构建 RedBearModelConfig # 从数据库读取嵌入器配置(按 ID并构建 RedBearModelConfig
config_load_start = time.time() config_load_start = time.time()
@@ -904,10 +900,10 @@ async def run_hybrid_search(
else: else:
results["latency_metrics"] = latency_metrics results["latency_metrics"] = latency_metrics
logger.info(f"[PERF] ===== SEARCH PERFORMANCE SUMMARY =====") logger.info("[PERF] ===== SEARCH PERFORMANCE SUMMARY =====")
logger.info(f"[PERF] Total search completed in {total_latency:.4f}s") logger.info(f"[PERF] Total search completed in {total_latency:.4f}s")
logger.info(f"[PERF] Latency breakdown: {json.dumps(latency_metrics, indent=2)}") logger.info(f"[PERF] Latency breakdown: {json.dumps(latency_metrics, indent=2)}")
logger.info(f"[PERF] =========================================") logger.info("[PERF] =========================================")
# Sanitize results: drop large/unused fields # Sanitize results: drop large/unused fields
_remove_keys_recursive(results, ["name_embedding"]) # drop entity name embeddings from outputs _remove_keys_recursive(results, ["name_embedding"]) # drop entity name embeddings from outputs

View File

@@ -311,10 +311,53 @@ class ExtractionOrchestrator:
dialog_data_list, dialog_data_list,
) )
# 步骤 7: 同步用户别名到数据库表(仅正式模式) # 步骤 7: 触发异步元数据和别名提取(仅正式模式)
if not is_pilot_run: if not is_pilot_run:
logger.info("步骤 7: 同步用户别名到 end_user 和 end_user_info 表") try:
await self._update_end_user_other_name(entity_nodes, dialog_data_list) from app.core.memory.storage_services.extraction_engine.knowledge_extraction.metadata_extractor import (
MetadataExtractor,
)
metadata_extractor = MetadataExtractor(
llm_client=self.llm_client, language=self.language
)
user_statements = (
metadata_extractor.collect_user_related_statements(
entity_nodes, statement_nodes, statement_entity_edges
)
)
if user_statements:
end_user_id = (
dialog_data_list[0].end_user_id
if dialog_data_list
else None
)
config_id = (
dialog_data_list[0].config_id
if dialog_data_list
and hasattr(dialog_data_list[0], "config_id")
else None
)
if end_user_id:
from app.tasks import extract_user_metadata_task
extract_user_metadata_task.delay(
end_user_id=str(end_user_id),
statements=user_statements,
config_id=str(config_id) if config_id else None,
language=self.language,
)
logger.info(
f"已触发异步元数据提取任务,共 {len(user_statements)} 条用户相关 statement"
)
else:
logger.info("未找到用户相关 statement跳过元数据提取")
except Exception as e:
logger.error(
f"触发元数据提取任务失败(不影响主流程): {e}", exc_info=True
)
# 别名同步已迁移到 Celery 元数据提取任务中,不再在此处执行
logger.info(f"知识提取流水线运行完成({mode_str}") logger.info(f"知识提取流水线运行完成({mode_str}")
return ( return (
@@ -1107,6 +1150,7 @@ class ExtractionOrchestrator:
end_user_id=dialog_data.end_user_id, end_user_id=dialog_data.end_user_id,
run_id=dialog_data.run_id, # 使用 dialog_data 的 run_id run_id=dialog_data.run_id, # 使用 dialog_data 的 run_id
content=chunk.content, content=chunk.content,
speaker=getattr(chunk, 'speaker', None),
chunk_embedding=chunk.chunk_embedding, chunk_embedding=chunk.chunk_embedding,
sequence_number=chunk_idx, # 添加必需的 sequence_number 字段 sequence_number=chunk_idx, # 添加必需的 sequence_number 字段
created_at=dialog_data.created_at, created_at=dialog_data.created_at,
@@ -1342,7 +1386,7 @@ class ExtractionOrchestrator:
async def _update_end_user_other_name( async def _update_end_user_other_name(
self, self,
entity_nodes: List[ExtractedEntityNode], entity_nodes: List[ExtractedEntityNode],
dialog_data_list: List[DialogData] dialog_data_list: List[DialogData],
) -> None: ) -> None:
""" """
将本轮提取的用户别名同步到 end_user 和 end_user_info 表。 将本轮提取的用户别名同步到 end_user 和 end_user_info 表。
@@ -1470,7 +1514,6 @@ class ExtractionOrchestrator:
end_user_id=end_user_uuid, end_user_id=end_user_uuid,
other_name=first_alias, other_name=first_alias,
aliases=merged_aliases, aliases=merged_aliases,
meta_data={}
)) ))
logger.info(f"创建 end_user_info 记录other_name={first_alias}, aliases={merged_aliases}") logger.info(f"创建 end_user_info 记录other_name={first_alias}, aliases={merged_aliases}")
@@ -1478,9 +1521,6 @@ class ExtractionOrchestrator:
except Exception as e: except Exception as e:
logger.error(f"更新 end_user other_name 失败: {e}", exc_info=True) logger.error(f"更新 end_user other_name 失败: {e}", exc_info=True)
# 用户实体占位名称,不允许作为 other_name 或出现在 aliases 中 # 用户实体占位名称,不允许作为 other_name 或出现在 aliases 中
# 复用 deduped_and_disamb 模块级常量,避免重复维护 # 复用 deduped_and_disamb 模块级常量,避免重复维护
USER_PLACEHOLDER_NAMES = _USER_PLACEHOLDER_NAMES USER_PLACEHOLDER_NAMES = _USER_PLACEHOLDER_NAMES
@@ -1587,7 +1627,6 @@ class ExtractionOrchestrator:
if candidate and candidate.lower() in self.USER_PLACEHOLDER_NAMES: if candidate and candidate.lower() in self.USER_PLACEHOLDER_NAMES:
return None return None
return candidate return candidate
return None return None
async def _run_dedup_and_write_summary( async def _run_dedup_and_write_summary(

View File

@@ -0,0 +1,175 @@
"""
Metadata extractor module.
Collects user-related statements from post-dedup graph data and
extracts user metadata via an independent LLM call.
"""
import logging
from typing import List, Optional
from app.core.memory.models.graph_models import (
ExtractedEntityNode,
StatementEntityEdge,
StatementNode,
)
logger = logging.getLogger(__name__)
# User-entity detection constants, intended to match the logic used by the
# dedup module. Names are compared after strip() + lower(); the entity type
# is compared after strip() only.
# NOTE(review): the empty-string member of _USER_NAMES looks unintended
# (possibly a lost character) — confirm against the dedup module.
_USER_NAMES = {"用户", "", "user", "i"}
_CANONICAL_USER_TYPE = "用户"
def _is_user_entity(ent: ExtractedEntityNode) -> bool:
    """Return True when *ent* represents the end user.

    An entity qualifies when its stripped, lowercased ``name`` is one of the
    placeholders in ``_USER_NAMES``, or when its stripped ``entity_type``
    equals ``_CANONICAL_USER_TYPE``. Missing or ``None`` attributes are
    treated as empty strings.

    A blank name never matches by name, even if ``_USER_NAMES`` contains an
    empty string; otherwise every unnamed entity would be classified as the
    user and its statements would be fed into metadata extraction.
    """
    name = (getattr(ent, "name", "") or "").strip().lower()
    # Require a non-empty name before the membership test (see docstring).
    if name and name in _USER_NAMES:
        return True
    etype = (getattr(ent, "entity_type", "") or "").strip()
    return etype == _CANONICAL_USER_TYPE
class MetadataExtractor:
    """Extract user metadata from post-dedup graph data via a separate LLM call."""

    def __init__(self, llm_client, language: Optional[str] = None):
        """Store the LLM client and an optional explicit language.

        Args:
            llm_client: Client whose ``response_structured`` coroutine is
                awaited by ``extract_metadata``.
            language: Language code to use for the prompt; when falsy the
                language is detected from the statements instead.
        """
        self.llm_client = llm_client
        self.language = language

    @staticmethod
    def detect_language(statements: List[str]) -> str:
        """Guess the language of the given statements.

        Returns ``"zh"`` if any character in the range U+4E00 to U+9FFF
        occurs in the space-joined statements, otherwise ``"en"``.
        """
        import re

        joined = " ".join(statements)
        return "zh" if re.search(r"[\u4e00-\u9fff]", joined) else "en"

    def collect_user_related_statements(
        self,
        entity_nodes: List[ExtractedEntityNode],
        statement_nodes: List[StatementNode],
        statement_entity_edges: List[StatementEntityEdge],
    ) -> List[str]:
        """Collect texts of user-spoken statements linked to the user entity.

        Selection steps:
            1. Find the user entities among ``entity_nodes``.
            2. Take the ``source`` of every edge whose ``target`` is one of
               those entities as a candidate statement id.
            3. Walk ``statement_nodes`` in order and, once per id, keep the
               stripped text of candidates whose ``speaker`` is ``"user"``
               and whose text is non-empty.

        Returns:
            The kept statement texts; an empty list when no user entity
            exists or nothing passes the filters.
        """
        owner_ids = {ent.id for ent in entity_nodes if _is_user_entity(ent)}
        if not owner_ids:
            logger.debug("未找到用户实体节点,跳过 statement 收集")
            return []

        # Statement ids reachable from a user entity through an edge.
        linked_ids = {
            edge.source
            for edge in statement_entity_edges
            if edge.target in owner_ids
        }

        result = []
        handled = set()
        total_associated = 0
        skipped_non_user = 0
        for node in statement_nodes:
            node_id = node.id
            if node_id not in linked_ids or node_id in handled:
                continue
            handled.add(node_id)
            total_associated += 1

            # A missing or empty speaker counts as non-user.
            if (getattr(node, "speaker", None) or "unknown") != "user":
                skipped_non_user += 1
                continue

            text = (node.statement or "").strip()
            if text:
                result.append(text)

        logger.info(
            f"收集到 {len(result)} 条用户发言 statement "
            f"(直接关联: {total_associated}, speaker=user: {len(result)}, "
            f"跳过非user: {skipped_non_user})"
        )
        for i, text in enumerate(result):
            logger.info(f" [user statement {i + 1}] {text}")

        if total_associated and not result:
            logger.warning(
                f"{total_associated} 条直接关联 statement 但全部被 speaker 过滤,"
                f"可能本次写入不包含 user 消息"
            )
        return result

    async def extract_metadata(
        self,
        statements: List[str],
        existing_metadata: Optional[dict] = None,
        existing_aliases: Optional[List[str]] = None,
    ) -> Optional[tuple]:
        """Ask the LLM for user metadata and alias changes.

        Args:
            statements: Statement texts spoken by the user.
            existing_metadata: Metadata already stored for the user, passed
                to the prompt template (optional).
            existing_aliases: Aliases already stored for the user, passed to
                the prompt template (optional).

        Returns:
            ``(metadata, aliases_to_add, aliases_to_remove)`` on success.
            ``None`` when ``statements`` is empty, the LLM response is falsy,
            or any exception is raised (the exception is logged, not
            propagated).
        """
        if not statements:
            return None

        try:
            from app.core.memory.utils.prompt.prompt_utils import prompt_env

            # An explicitly configured language wins over auto-detection.
            detected_language = self.language
            if detected_language:
                logger.info(f"元数据提取使用显式指定语言: {detected_language}")
            else:
                detected_language = self.detect_language(statements)
                logger.info(f"元数据提取语言自动检测结果: {detected_language}")

            prompt = prompt_env.get_template("extract_user_metadata.jinja2").render(
                statements=statements,
                language=detected_language,
                existing_metadata=existing_metadata,
                existing_aliases=existing_aliases,
                json_schema="",
            )

            from app.core.memory.models.metadata_models import (
                MetadataExtractionResponse,
            )

            response = await self.llm_client.response_structured(
                messages=[{"role": "user", "content": prompt}],
                response_model=MetadataExtractionResponse,
            )
            if not response:
                logger.warning("LLM 返回的响应为空")
                return None

            # Falsy parts are normalised to None / empty lists.
            metadata = response.user_metadata or None
            to_add = response.aliases_to_add or []
            to_remove = response.aliases_to_remove or []
            return metadata, to_add, to_remove
        except Exception as e:
            logger.error(f"元数据提取 LLM 调用失败: {e}", exc_info=True)
            return None

View File

@@ -1,6 +1,5 @@
import asyncio import asyncio
import logging import logging
import os
from datetime import datetime from datetime import datetime
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
@@ -82,6 +81,7 @@ class StatementExtractor:
logger.warning(f"Chunk {getattr(chunk, 'id', 'unknown')} has no speaker field or is empty") logger.warning(f"Chunk {getattr(chunk, 'id', 'unknown')} has no speaker field or is empty")
return None return None
async def _extract_statements(self, chunk, end_user_id: Optional[str] = None, dialogue_content: str = None) -> List[Statement]: async def _extract_statements(self, chunk, end_user_id: Optional[str] = None, dialogue_content: str = None) -> List[Statement]:
"""Process a single chunk and return extracted statements """Process a single chunk and return extracted statements
@@ -94,6 +94,7 @@ class StatementExtractor:
List of ExtractedStatement objects extracted from the chunk List of ExtractedStatement objects extracted from the chunk
""" """
chunk_content = chunk.content chunk_content = chunk.content
chunk_speaker = self._get_speaker_from_chunk(chunk)
if not chunk_content or len(chunk_content.strip()) < 5: if not chunk_content or len(chunk_content.strip()) < 5:
logger.warning(f"Chunk {chunk.id} content too short or empty, skipping") logger.warning(f"Chunk {chunk.id} content too short or empty, skipping")
@@ -150,8 +151,6 @@ class StatementExtractor:
except (KeyError, ValueError): except (KeyError, ValueError):
relevence_info = RelevenceInfo.RELEVANT relevence_info = RelevenceInfo.RELEVANT
chunk_speaker = self._get_speaker_from_chunk(chunk)
chunk_statement = Statement( chunk_statement = Statement(
statement=extracted_stmt.statement, statement=extracted_stmt.statement,
stmt_type=stmt_type, stmt_type=stmt_type,

View File

@@ -1,4 +1,3 @@
import os
import asyncio import asyncio
from typing import List, Dict, Optional from typing import List, Dict, Optional

View File

@@ -5,7 +5,7 @@
使用Neo4j的全文索引进行高效的文本匹配。 使用Neo4j的全文索引进行高效的文本匹配。
""" """
from typing import List, Dict, Any, Optional from typing import List, Optional
from app.core.logging_config import get_memory_logger from app.core.logging_config import get_memory_logger
from app.repositories.neo4j.neo4j_connector import Neo4jConnector from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.core.memory.storage_services.search.search_strategy import SearchStrategy, SearchResult from app.core.memory.storage_services.search.search_strategy import SearchStrategy, SearchResult
@@ -74,7 +74,7 @@ class KeywordSearchStrategy(SearchStrategy):
# 调用底层的关键词搜索函数 # 调用底层的关键词搜索函数
results_dict = await search_graph( results_dict = await search_graph(
connector=self.connector, connector=self.connector,
q=query_text, query=query_text,
end_user_id=end_user_id, end_user_id=end_user_id,
limit=limit, limit=limit,
include=include_list include=include_list

View File

@@ -22,7 +22,9 @@ def escape_lucene_query(query: str) -> str:
s = s.replace("\r", " ").replace("\n", " ").strip() s = s.replace("\r", " ").replace("\n", " ").strip()
# Lucene reserved tokens/special characters # Lucene reserved tokens/special characters
specials = ['&&', '||', '\\', '+', '-', '!', '(', ')', '{', '}', '[', ']', '^', '"', '~', '*', '?', ':'] # NOTE: '/' is the regex delimiter in Lucene — must be escaped to prevent
# TokenMgrError when the query contains unmatched slashes.
specials = ['&&', '||', '\\', '+', '-', '!', '(', ')', '{', '}', '[', ']', '^', '"', '~', '*', '?', ':', '/']
# Replace longer tokens first to avoid partial double-escaping # Replace longer tokens first to avoid partial double-escaping
for token in sorted(specials, key=len, reverse=True): for token in sorted(specials, key=len, reverse=True):
s = s.replace(token, f"\\{token}") s = s.replace(token, f"\\{token}")

View File

@@ -43,8 +43,9 @@ Each statement must be labeled as per the criteria mentioned below.
对话上下文和共指消解: 对话上下文和共指消解:
- 将每个陈述句归属于说出它的参与者。 - 将每个陈述句归属于说出它的参与者。
- 如果参与者列表为说话者提供了名称(例如,"李雪(用户)"),请在提取的陈述句中使用具体名称("李雪"),而不是通用角色("用户" - **对于用户的发言:必须使用"用户"作为主语**,禁止将"用户"或"我"替换为用户的真实姓名或别名。例如,用户说"我叫张三"应提取为"用户叫张三",而不是"张三叫张三"
- 将所有代词解析为对话上下文中的具体人物或实体 - 对于 AI 助手的发言:使用"助手"或"AI助手"作为主语
- 将所有代词解析为对话上下文中的具体人物或实体,但"我"必须解析为"用户"。
- 识别并将抽象引用解析为其具体名称(如果提到)。 - 识别并将抽象引用解析为其具体名称(如果提到)。
- 将缩写和首字母缩略词扩展为其完整形式。 - 将缩写和首字母缩略词扩展为其完整形式。
{% else %} {% else %}
@@ -68,8 +69,9 @@ Context Resolution Requirements:
Conversational Context & Co-reference Resolution: Conversational Context & Co-reference Resolution:
- Attribute every statement to the participant who uttered it. - Attribute every statement to the participant who uttered it.
- If the participant list provides a name for a speaker (e.g., "李雪 (用户)"), use the specific name ("李雪") in the extracted statement, not the generic role ("用户"). - **For user's statements: always use "用户" (User) as the subject**. Do NOT replace "用户" or "I" with the user's real name or alias. For example, if the user says "I'm John", extract as "用户 is John", not "John is John".
- Resolve all pronouns to the specific person or entity from the conversation's context. - For AI assistant's statements: use "助手" or "AI助手" as the subject.
- Resolve all pronouns to the specific person or entity from the conversation's context, but "I"/"我" must always resolve to "用户".
- Identify and resolve abstract references to their specific names if mentioned. - Identify and resolve abstract references to their specific names if mentioned.
- Expand abbreviations and acronyms to their full form. - Expand abbreviations and acronyms to their full form.
{% endif %} {% endif %}
@@ -139,13 +141,13 @@ AI: "水彩画很有趣!水彩颜料通常由颜料与阿拉伯树胶等粘合
示例输出: { 示例输出: {
"statements": [ "statements": [
{ {
"statement": "Sarah Chen 最近一直在尝试水彩画。", "statement": "用户最近一直在尝试水彩画。",
"statement_type": "FACT", "statement_type": "FACT",
"temporal_type": "DYNAMIC", "temporal_type": "DYNAMIC",
"relevance": "RELEVANT" "relevance": "RELEVANT"
}, },
{ {
"statement": "Sarah Chen 画了一些花朵。", "statement": "用户画了一些花朵。",
"statement_type": "FACT", "statement_type": "FACT",
"temporal_type": "DYNAMIC", "temporal_type": "DYNAMIC",
"relevance": "RELEVANT" "relevance": "RELEVANT"
@@ -157,13 +159,13 @@ AI: "水彩画很有趣!水彩颜料通常由颜料与阿拉伯树胶等粘合
"relevance": "IRRELEVANT" "relevance": "IRRELEVANT"
}, },
{ {
"statement": "Sarah Chen 认为她的水彩画中的色彩组合可以改进。", "statement": "用户认为她的水彩画中的色彩组合可以改进。",
"statement_type": "OPINION", "statement_type": "OPINION",
"temporal_type": "STATIC", "temporal_type": "STATIC",
"relevance": "RELEVANT" "relevance": "RELEVANT"
}, },
{ {
"statement": "Sarah Chen 真的很喜欢玫瑰和百合。", "statement": "用户真的很喜欢玫瑰和百合。",
"statement_type": "FACT", "statement_type": "FACT",
"temporal_type": "STATIC", "temporal_type": "STATIC",
"relevance": "RELEVANT" "relevance": "RELEVANT"
@@ -186,13 +188,13 @@ AI: "水彩画很有趣!水彩颜料通常由颜料和阿拉伯树胶等粘合
示例输出: { 示例输出: {
"statements": [ "statements": [
{ {
"statement": "张曼婷最近在尝试水彩画。", "statement": "用户最近在尝试水彩画。",
"statement_type": "FACT", "statement_type": "FACT",
"temporal_type": "DYNAMIC", "temporal_type": "DYNAMIC",
"relevance": "RELEVANT" "relevance": "RELEVANT"
}, },
{ {
"statement": "张曼婷画了一些花朵。", "statement": "用户画了一些花朵。",
"statement_type": "FACT", "statement_type": "FACT",
"temporal_type": "DYNAMIC", "temporal_type": "DYNAMIC",
"relevance": "RELEVANT" "relevance": "RELEVANT"
@@ -204,13 +206,13 @@ AI: "水彩画很有趣!水彩颜料通常由颜料和阿拉伯树胶等粘合
"relevance": "IRRELEVANT" "relevance": "IRRELEVANT"
}, },
{ {
"statement": "张曼婷觉得水彩画的色彩搭配还有提升的空间。", "statement": "用户觉得水彩画的色彩搭配还有提升的空间。",
"statement_type": "OPINION", "statement_type": "OPINION",
"temporal_type": "STATIC", "temporal_type": "STATIC",
"relevance": "RELEVANT" "relevance": "RELEVANT"
}, },
{ {
"statement": "张曼婷很喜欢玫瑰和百合。", "statement": "用户很喜欢玫瑰和百合。",
"statement_type": "FACT", "statement_type": "FACT",
"temporal_type": "STATIC", "temporal_type": "STATIC",
"relevance": "RELEVANT" "relevance": "RELEVANT"
@@ -233,13 +235,13 @@ User: "I think the color combinations could use some improvement, but I really l
Example Output: { Example Output: {
"statements": [ "statements": [
{ {
"statement": "Sarah Chen has been trying watercolor painting recently.", "statement": "用户 has been trying watercolor painting recently.",
"statement_type": "FACT", "statement_type": "FACT",
"temporal_type": "DYNAMIC", "temporal_type": "DYNAMIC",
"relevance": "RELEVANT" "relevance": "RELEVANT"
}, },
{ {
"statement": "Sarah Chen painted some flowers.", "statement": "用户 painted some flowers.",
"statement_type": "FACT", "statement_type": "FACT",
"temporal_type": "DYNAMIC", "temporal_type": "DYNAMIC",
"relevance": "RELEVANT" "relevance": "RELEVANT"
@@ -251,13 +253,13 @@ Example Output: {
"relevance": "IRRELEVANT" "relevance": "IRRELEVANT"
}, },
{ {
"statement": "Sarah Chen thinks the color combinations in her watercolor paintings could use some improvement.", "statement": "用户 thinks the color combinations in her watercolor paintings could use some improvement.",
"statement_type": "OPINION", "statement_type": "OPINION",
"temporal_type": "STATIC", "temporal_type": "STATIC",
"relevance": "RELEVANT" "relevance": "RELEVANT"
}, },
{ {
"statement": "Sarah Chen really likes roses and lilies.", "statement": "用户 really likes roses and lilies.",
"statement_type": "FACT", "statement_type": "FACT",
"temporal_type": "STATIC", "temporal_type": "STATIC",
"relevance": "RELEVANT" "relevance": "RELEVANT"
@@ -280,13 +282,13 @@ AI: "水彩画很有趣!水彩颜料通常由颜料和阿拉伯树胶等粘合
Example Output: { Example Output: {
"statements": [ "statements": [
{ {
"statement": "张曼婷最近在尝试水彩画。", "statement": "用户最近在尝试水彩画。",
"statement_type": "FACT", "statement_type": "FACT",
"temporal_type": "DYNAMIC", "temporal_type": "DYNAMIC",
"relevance": "RELEVANT" "relevance": "RELEVANT"
}, },
{ {
"statement": "张曼婷画了一些花朵。", "statement": "用户画了一些花朵。",
"statement_type": "FACT", "statement_type": "FACT",
"temporal_type": "DYNAMIC", "temporal_type": "DYNAMIC",
"relevance": "RELEVANT" "relevance": "RELEVANT"
@@ -298,13 +300,13 @@ Example Output: {
"relevance": "IRRELEVANT" "relevance": "IRRELEVANT"
}, },
{ {
"statement": "张曼婷觉得水彩画的色彩搭配还有提升的空间。", "statement": "用户觉得水彩画的色彩搭配还有提升的空间。",
"statement_type": "OPINION", "statement_type": "OPINION",
"temporal_type": "STATIC", "temporal_type": "STATIC",
"relevance": "RELEVANT" "relevance": "RELEVANT"
}, },
{ {
"statement": "张曼婷很喜欢玫瑰和百合。", "statement": "用户很喜欢玫瑰和百合。",
"statement_type": "FACT", "statement_type": "FACT",
"temporal_type": "STATIC", "temporal_type": "STATIC",
"relevance": "RELEVANT" "relevance": "RELEVANT"

View File

@@ -406,4 +406,12 @@ Output:
- **⚠️ ALIASES ORDER: preserve temporal order of appearance** - **⚠️ ALIASES ORDER: preserve temporal order of appearance**
- **🚨 MANDATORY FIELD: EVERY entity MUST include "aliases" field, even if empty array []** - **🚨 MANDATORY FIELD: EVERY entity MUST include "aliases" field, even if empty array []**
**Output JSON structure:**
```json
{
"triplets": [...],
"entities": [...]
}
```
{{ json_schema }} {{ json_schema }}

View File

@@ -0,0 +1,135 @@
===Task===
Extract user metadata from the following conversation statements spoken by the user.
{% if language == "zh" %}
**"三度原则"判断标准:**
- 复用度:该信息是否会被多个功能模块使用?
- 约束度:该信息是否会影响系统行为?
- 时效性:该信息是长期稳定的还是临时的?仅提取长期稳定信息。
**提取规则:**
- **只提取关于"用户本人"的画像信息**,忽略用户提到的第三方人物(如朋友、同事、家人)的信息
- 仅提取文本中明确提到的信息,不要推测
- 如果文本中没有可提取的用户画像信息,返回空的 user_metadata 对象
- **输出语言必须与输入文本的语言一致**(输入中文则输出中文值,输入英文则输出英文值)
{% if existing_metadata %}
**重要:合并已有元数据**
下方提供了数据库中已有的用户元数据。请结合用户最新发言,输出**合并后的完整元数据**:
- 如果用户明确否定了已有信息(如"我不再教高中物理了"),在输出中**移除**该信息
- 如果用户提到了新信息,**添加**到对应字段中
- 如果已有信息未被用户否定,**保留**在输出中
- 标量字段(如 role、domain):如果用户提到了新值,用新值替换;否则保留已有值
- 最终输出应该是完整的、合并后的元数据,不是增量
{% endif %}
**字段说明:**
- profile.role:用户的职业或角色,如 教师、医生、后端工程师
- profile.domain:用户所在领域,如 教育、医疗、软件开发
- profile.expertise:用户擅长的技能或工具(通用,不限于编程),如 Python、心理咨询、高中物理
- profile.interests:用户主动表达兴趣的话题或领域标签
- behavioral_hints.learning_stage:学习阶段(初学者/中级/高级)
- behavioral_hints.preferred_depth:偏好深度(概览/技术细节/深入探讨)
- behavioral_hints.tone_preference:语气偏好(轻松随意/专业简洁/学术严谨)
- knowledge_tags:用户涉及的知识领域标签
**用户别名变更(增量模式):**
- **aliases_to_add**:本次新发现的用户别名,包括:
* 用户主动自我介绍:如"我叫张三"、"我的名字是XX"、"我的网名是XX"
* 他人对用户的称呼:如"同事叫我陈哥"、"大家叫我小张"、"领导叫我老陈"
* 只提取原文中逐字出现的名字,严禁推测或创造
* 禁止提取:用户给 AI 取的名字、第三方人物自身的名字、"用户"/"我" 等占位词
* 如果没有新别名,返回空数组 `[]`
- **aliases_to_remove**:用户明确否认的别名,包括:
* 用户说"我不叫XX了"、"别叫我XX"、"我改名了不叫XX" → 将 XX 放入此数组
* **严格限制**:只将用户原文中**逐字提到**的被否认名字放入,不要推断关联的其他别名
* 例如:用户说"我不叫陈小刀了" → 只移除"陈小刀",不要移除"陈哥"、"老陈"等未被提及的别名
* 如果没有要移除的别名,返回空数组 `[]`
{% if existing_aliases %}
- 已有别名:{{ existing_aliases | tojson }}(仅供参考,不需要在输出中重复)
{% endif %}
{% else %}
**"Three-Degree Principle" criteria:**
- Reusability: Will this information be used by multiple functional modules?
- Constraint: Will this information affect system behavior?
- Timeliness: Is this information long-term stable or temporary? Only extract long-term stable information.
**Extraction rules:**
- **Only extract profile information about the user themselves**, ignore information about third parties (friends, colleagues, family) mentioned by the user
- Only extract information explicitly mentioned in the text, do not speculate
- If no user profile information can be extracted, return an empty user_metadata object
- **Output language must match the input text language**
{% if existing_metadata %}
**Important: Merge with existing metadata**
Existing user metadata from the database is provided below. Combine with the user's latest statements to output the **complete merged metadata**:
- If the user explicitly negates existing info (e.g. "I no longer teach high school physics"), **remove** it from output
- If the user mentions new info, **add** it to the corresponding field
- If existing info is not negated by the user, **keep** it in the output
- Scalar fields (e.g. role, domain): replace with new value if user mentions one; otherwise keep existing
- The final output should be the complete, merged metadata — not an incremental update
{% endif %}
**Field descriptions:**
- profile.role: User's occupation or role, e.g. teacher, doctor, software engineer
- profile.domain: User's domain, e.g. education, healthcare, software development
- profile.expertise: User's skills or tools (general, not limited to programming)
- profile.interests: Topics or domain tags the user actively expressed interest in
- behavioral_hints.learning_stage: Learning stage (beginner/intermediate/advanced)
- behavioral_hints.preferred_depth: Preferred depth (overview/detailed/deep dive)
- behavioral_hints.tone_preference: Tone preference (casual/professional/academic)
- knowledge_tags: Knowledge domain tags related to the user
**User alias changes (incremental mode):**
- **aliases_to_add**: Newly discovered user aliases from this conversation, including:
* User self-introductions: e.g. "I'm John", "My name is XX", "My username is XX"
* How others address the user: e.g. "My colleagues call me Johnny", "People call me Mike"
* Only extract names that appear VERBATIM in the text — never infer or fabricate
* Do NOT extract: names the user gives to the AI, third-party people's own names, placeholder words like "User"/"I"
* If no new aliases, return empty array `[]`
- **aliases_to_remove**: Aliases the user explicitly denies, including:
* User says "Don't call me XX anymore", "I'm not called XX", "I changed my name from XX" → put XX in this array
* **Strict rule**: Only include the exact name the user **verbatim mentions** as denied. Do NOT infer or remove related aliases
* Example: User says "I'm not called John anymore" → only remove "John", do NOT remove "Johnny", "J" or other related aliases not mentioned
* If no aliases to remove, return empty array `[]`
{% if existing_aliases %}
- Existing aliases: {{ existing_aliases | tojson }} (for reference only, do not repeat in output)
{% endif %}
{% endif %}
===User Statements===
{% for stmt in statements %}
- {{ stmt }}
{% endfor %}
{% if existing_metadata %}
===Existing User Metadata===
```json
{{ existing_metadata | tojson }}
```
{% endif %}
===Output Format===
Return a JSON object with the following structure:
```json
{
"user_metadata": {
"profile": {
"role": "",
"domain": "",
"expertise": [],
"interests": []
},
"behavioral_hints": {
"learning_stage": "",
"preferred_depth": "",
"tone_preference": ""
},
"knowledge_tags": []
},
"aliases_to_add": [],
"aliases_to_remove": []
}
```
{{ json_schema }}

View File

@@ -23,6 +23,7 @@ SET s += {
end_user_id: statement.end_user_id, end_user_id: statement.end_user_id,
stmt_type: statement.stmt_type, stmt_type: statement.stmt_type,
statement: statement.statement, statement: statement.statement,
speaker: statement.speaker,
emotion_intensity: statement.emotion_intensity, emotion_intensity: statement.emotion_intensity,
emotion_target: statement.emotion_target, emotion_target: statement.emotion_target,
emotion_subject: statement.emotion_subject, emotion_subject: statement.emotion_subject,
@@ -56,6 +57,7 @@ SET c += {
expired_at: chunk.expired_at, expired_at: chunk.expired_at,
dialog_id: chunk.dialog_id, dialog_id: chunk.dialog_id,
content: chunk.content, content: chunk.content,
speaker: chunk.speaker,
chunk_embedding: chunk.chunk_embedding, chunk_embedding: chunk.chunk_embedding,
sequence_number: chunk.sequence_number, sequence_number: chunk.sequence_number,
start_index: chunk.start_index, start_index: chunk.start_index,
@@ -283,7 +285,7 @@ LIMIT $limit
""" """
SEARCH_STATEMENTS_BY_KEYWORD = """ SEARCH_STATEMENTS_BY_KEYWORD = """
CALL db.index.fulltext.queryNodes("statementsFulltext", $q) YIELD node AS s, score CALL db.index.fulltext.queryNodes("statementsFulltext", $query) YIELD node AS s, score
WHERE ($end_user_id IS NULL OR s.end_user_id = $end_user_id) WHERE ($end_user_id IS NULL OR s.end_user_id = $end_user_id)
OPTIONAL MATCH (c:Chunk)-[:CONTAINS]->(s) OPTIONAL MATCH (c:Chunk)-[:CONTAINS]->(s)
OPTIONAL MATCH (s)-[:REFERENCES_ENTITY]->(e:ExtractedEntity) OPTIONAL MATCH (s)-[:REFERENCES_ENTITY]->(e:ExtractedEntity)
@@ -307,7 +309,7 @@ LIMIT $limit
""" """
# 查询实体名称包含指定字符串的实体 # 查询实体名称包含指定字符串的实体
SEARCH_ENTITIES_BY_NAME = """ SEARCH_ENTITIES_BY_NAME = """
CALL db.index.fulltext.queryNodes("entitiesFulltext", $q) YIELD node AS e, score CALL db.index.fulltext.queryNodes("entitiesFulltext", $query) YIELD node AS e, score
WHERE ($end_user_id IS NULL OR e.end_user_id = $end_user_id) WHERE ($end_user_id IS NULL OR e.end_user_id = $end_user_id)
OPTIONAL MATCH (s:Statement)-[:REFERENCES_ENTITY]->(e) OPTIONAL MATCH (s:Statement)-[:REFERENCES_ENTITY]->(e)
OPTIONAL MATCH (c:Chunk)-[:CONTAINS]->(s) OPTIONAL MATCH (c:Chunk)-[:CONTAINS]->(s)
@@ -337,21 +339,21 @@ LIMIT $limit
""" """
SEARCH_ENTITIES_BY_NAME_OR_ALIAS = """ SEARCH_ENTITIES_BY_NAME_OR_ALIAS = """
CALL db.index.fulltext.queryNodes("entitiesFulltext", $q) YIELD node AS e, score CALL db.index.fulltext.queryNodes("entitiesFulltext", $query) YIELD node AS e, score
WHERE ($end_user_id IS NULL OR e.end_user_id = $end_user_id) WHERE ($end_user_id IS NULL OR e.end_user_id = $end_user_id)
WITH e, score WITH e, score
WITH collect({entity: e, score: score}) AS fulltextResults With collect({entity: e, score: score}) AS fulltextResults
OPTIONAL MATCH (ae:ExtractedEntity) OPTIONAL MATCH (ae:ExtractedEntity)
WHERE ($end_user_id IS NULL OR ae.end_user_id = $end_user_id) WHERE ($end_user_id IS NULL OR ae.end_user_id = $end_user_id)
AND ae.aliases IS NOT NULL AND ae.aliases IS NOT NULL
AND ANY(alias IN ae.aliases WHERE toLower(alias) CONTAINS toLower($q)) AND ANY(alias IN ae.aliases WHERE toLower(alias) CONTAINS toLower($query))
WITH fulltextResults, collect(ae) AS aliasEntities WITH fulltextResults, collect(ae) AS aliasEntities
UNWIND (fulltextResults + [x IN aliasEntities | {entity: x, score: UNWIND (fulltextResults + [x IN aliasEntities | {entity: x, score:
CASE CASE
WHEN ANY(alias IN x.aliases WHERE toLower(alias) = toLower($q)) THEN 1.0 WHEN ANY(alias IN x.aliases WHERE toLower(alias) = toLower($query)) THEN 1.0
WHEN ANY(alias IN x.aliases WHERE toLower(alias) STARTS WITH toLower($q)) THEN 0.9 WHEN ANY(alias IN x.aliases WHERE toLower(alias) STARTS WITH toLower($query)) THEN 0.9
ELSE 0.8 ELSE 0.8
END END
}]) AS row }]) AS row
@@ -384,7 +386,7 @@ LIMIT $limit
SEARCH_CHUNKS_BY_CONTENT = """ SEARCH_CHUNKS_BY_CONTENT = """
CALL db.index.fulltext.queryNodes("chunksFulltext", $q) YIELD node AS c, score CALL db.index.fulltext.queryNodes("chunksFulltext", $query) YIELD node AS c, score
WHERE ($end_user_id IS NULL OR c.end_user_id = $end_user_id) WHERE ($end_user_id IS NULL OR c.end_user_id = $end_user_id)
OPTIONAL MATCH (c)-[:CONTAINS]->(s:Statement) OPTIONAL MATCH (c)-[:CONTAINS]->(s:Statement)
OPTIONAL MATCH (s)-[:REFERENCES_ENTITY]->(e:ExtractedEntity) OPTIONAL MATCH (s)-[:REFERENCES_ENTITY]->(e:ExtractedEntity)
@@ -501,7 +503,7 @@ LIMIT $limit
""" """
SEARCH_STATEMENTS_BY_KEYWORD_TEMPORAL = """ SEARCH_STATEMENTS_BY_KEYWORD_TEMPORAL = """
CALL db.index.fulltext.queryNodes("statementsFulltext", $q) YIELD node AS s, score CALL db.index.fulltext.queryNodes("statementsFulltext", $query) YIELD node AS s, score
WHERE ($end_user_id IS NULL OR s.end_user_id = $end_user_id) WHERE ($end_user_id IS NULL OR s.end_user_id = $end_user_id)
AND ((($start_date IS NULL OR (s.created_at IS NOT NULL AND datetime(s.created_at) >= datetime($start_date))) AND ((($start_date IS NULL OR (s.created_at IS NOT NULL AND datetime(s.created_at) >= datetime($start_date)))
AND ($end_date IS NULL OR (s.created_at IS NOT NULL AND datetime(s.created_at) <= datetime($end_date)))) AND ($end_date IS NULL OR (s.created_at IS NOT NULL AND datetime(s.created_at) <= datetime($end_date))))
@@ -677,7 +679,7 @@ SET n.invalid_at = $new_invalid_at
# MemorySummary keyword search using fulltext index # MemorySummary keyword search using fulltext index
SEARCH_MEMORY_SUMMARIES_BY_KEYWORD = """ SEARCH_MEMORY_SUMMARIES_BY_KEYWORD = """
CALL db.index.fulltext.queryNodes("summariesFulltext", $q) YIELD node AS m, score CALL db.index.fulltext.queryNodes("summariesFulltext", $query) YIELD node AS m, score
WHERE ($end_user_id IS NULL OR m.end_user_id = $end_user_id) WHERE ($end_user_id IS NULL OR m.end_user_id = $end_user_id)
OPTIONAL MATCH (m)-[:DERIVED_FROM_STATEMENT]->(s:Statement) OPTIONAL MATCH (m)-[:DERIVED_FROM_STATEMENT]->(s:Statement)
RETURN m.id AS id, RETURN m.id AS id,
@@ -1363,7 +1365,7 @@ RETURN c.community_id AS community_id
# Community keyword search: matches name or summary via fulltext index # Community keyword search: matches name or summary via fulltext index
SEARCH_COMMUNITIES_BY_KEYWORD = """ SEARCH_COMMUNITIES_BY_KEYWORD = """
CALL db.index.fulltext.queryNodes("communitiesFulltext", $q) YIELD node AS c, score CALL db.index.fulltext.queryNodes("communitiesFulltext", $query) YIELD node AS c, score
WHERE ($end_user_id IS NULL OR c.end_user_id = $end_user_id) WHERE ($end_user_id IS NULL OR c.end_user_id = $end_user_id)
RETURN c.community_id AS id, RETURN c.community_id AS id,
c.name AS name, c.name AS name,
@@ -1451,7 +1453,7 @@ RETURN elementId(r) AS uuid
""" """
SEARCH_PERCEPTUAL_BY_KEYWORD = """ SEARCH_PERCEPTUAL_BY_KEYWORD = """
CALL db.index.fulltext.queryNodes("perceptualFulltext", $q) YIELD node AS p, score CALL db.index.fulltext.queryNodes("perceptualFulltext", $query) YIELD node AS p, score
WHERE p.end_user_id = $end_user_id WHERE p.end_user_id = $end_user_id
RETURN p.id AS id, RETURN p.id AS id,
p.end_user_id AS end_user_id, p.end_user_id AS end_user_id,

View File

@@ -186,6 +186,58 @@ async def save_dialog_and_statements_to_neo4j(
Returns: Returns:
bool: True if successful, False otherwise bool: True if successful, False otherwise
""" """
# TODO 需要在去重消歧节阶段,做以下逻辑的处理
# 预处理:对特殊实体("用户"、"AI助手")复用 Neo4j 中已有节点的 ID
# 确保同一个 end_user_id 下只有一个"用户"节点和一个"AI助手"节点。
if entity_nodes:
_SPECIAL_NAMES = {"用户", "", "user", "i", "ai助手", "助手", "ai assistant", "assistant"}
end_user_id = entity_nodes[0].end_user_id if entity_nodes else None
if end_user_id:
try:
# 查询已有的特殊实体
cypher = """
MATCH (e:ExtractedEntity)
WHERE e.end_user_id = $end_user_id AND toLower(e.name) IN $names
RETURN e.id AS id, e.name AS name
"""
existing = await connector.execute_query(
cypher,
end_user_id=end_user_id,
names=list(_SPECIAL_NAMES),
)
# 建立 name(lower) → existing_id 映射
existing_id_map = {}
for record in (existing or []):
name_lower = (record.get("name") or "").strip().lower()
if name_lower and record.get("id"):
existing_id_map[name_lower] = record["id"]
if existing_id_map:
# 替换新实体的 ID 为已有 ID同时更新所有引用该 ID 的边
for ent in entity_nodes:
name_lower = (ent.name or "").strip().lower()
if name_lower in existing_id_map:
old_id = ent.id
new_id = existing_id_map[name_lower]
if old_id != new_id:
ent.id = new_id
# 更新 statement_entity_edges 中的引用
for edge in statement_entity_edges:
if edge.target == old_id:
edge.target = new_id
if edge.source == old_id:
edge.source = new_id
# 更新 entity_edges 中的引用
for edge in entity_edges:
if edge.source == old_id:
edge.source = new_id
if edge.target == old_id:
edge.target = new_id
logger.info(
f"特殊实体 '{ent.name}' ID 复用: {old_id[:8]}... → {new_id[:8]}..."
)
except Exception as e:
logger.warning(f"特殊实体 ID 复用查询失败(不影响写入): {e}")
# 定义事务函数,将所有写操作放在一个事务中 # 定义事务函数,将所有写操作放在一个事务中
async def _save_all_in_transaction(tx): async def _save_all_in_transaction(tx):

View File

@@ -2,6 +2,7 @@ import asyncio
import logging import logging
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
from app.core.memory.utils.data.text_utils import escape_lucene_query
from app.repositories.neo4j.cypher_queries import ( from app.repositories.neo4j.cypher_queries import (
CHUNK_EMBEDDING_SEARCH, CHUNK_EMBEDDING_SEARCH,
COMMUNITY_EMBEDDING_SEARCH, COMMUNITY_EMBEDDING_SEARCH,
@@ -87,7 +88,7 @@ async def _update_activation_values_batch(
unique_node_ids.append(node_id) unique_node_ids.append(node_id)
if not unique_node_ids: if not unique_node_ids:
logger.warning(f"批量更新激活值没有有效的节点ID") logger.warning("批量更新激活值没有有效的节点ID")
return nodes return nodes
# 记录去重信息(仅针对具有有效 ID 的节点) # 记录去重信息(仅针对具有有效 ID 的节点)
@@ -223,7 +224,7 @@ async def _update_search_results_activation(
async def search_graph( async def search_graph(
connector: Neo4jConnector, connector: Neo4jConnector,
q: str, query: str,
end_user_id: Optional[str] = None, end_user_id: Optional[str] = None,
limit: int = 50, limit: int = 50,
include: List[str] = None, include: List[str] = None,
@@ -234,14 +235,14 @@ async def search_graph(
OPTIMIZED: Runs all queries in parallel using asyncio.gather() OPTIMIZED: Runs all queries in parallel using asyncio.gather()
INTEGRATED: Updates activation values for knowledge nodes before returning results INTEGRATED: Updates activation values for knowledge nodes before returning results
- Statements: matches s.statement CONTAINS q - Statements: matches s.statement CONTAINS query
- Entities: matches e.name CONTAINS q - Entities: matches e.name CONTAINS query
- Chunks: matches s.content CONTAINS q (from Statement nodes) - Chunks: matches s.content CONTAINS query (from Statement nodes)
- Summaries: matches ms.content CONTAINS q - Summaries: matches ms.content CONTAINS query
Args: Args:
connector: Neo4j connector connector: Neo4j connector
q: Query text query: Query text for full-text search
end_user_id: Optional group filter end_user_id: Optional group filter
limit: Max results per category limit: Max results per category
include: List of categories to search (default: all) include: List of categories to search (default: all)
@@ -252,6 +253,9 @@ async def search_graph(
if include is None: if include is None:
include = ["statements", "chunks", "entities", "summaries"] include = ["statements", "chunks", "entities", "summaries"]
# Escape Lucene special characters to prevent query parse errors
escaped_query = escape_lucene_query(query)
# Prepare tasks for parallel execution # Prepare tasks for parallel execution
tasks = [] tasks = []
task_keys = [] task_keys = []
@@ -260,7 +264,7 @@ async def search_graph(
tasks.append(connector.execute_query( tasks.append(connector.execute_query(
SEARCH_STATEMENTS_BY_KEYWORD, SEARCH_STATEMENTS_BY_KEYWORD,
json_format=True, json_format=True,
q=q, query=escaped_query,
end_user_id=end_user_id, end_user_id=end_user_id,
limit=limit, limit=limit,
)) ))
@@ -270,7 +274,7 @@ async def search_graph(
tasks.append(connector.execute_query( tasks.append(connector.execute_query(
SEARCH_ENTITIES_BY_NAME_OR_ALIAS, SEARCH_ENTITIES_BY_NAME_OR_ALIAS,
json_format=True, json_format=True,
q=q, query=escaped_query,
end_user_id=end_user_id, end_user_id=end_user_id,
limit=limit, limit=limit,
)) ))
@@ -280,7 +284,7 @@ async def search_graph(
tasks.append(connector.execute_query( tasks.append(connector.execute_query(
SEARCH_CHUNKS_BY_CONTENT, SEARCH_CHUNKS_BY_CONTENT,
json_format=True, json_format=True,
q=q, query=escaped_query,
end_user_id=end_user_id, end_user_id=end_user_id,
limit=limit, limit=limit,
)) ))
@@ -290,7 +294,7 @@ async def search_graph(
tasks.append(connector.execute_query( tasks.append(connector.execute_query(
SEARCH_MEMORY_SUMMARIES_BY_KEYWORD, SEARCH_MEMORY_SUMMARIES_BY_KEYWORD,
json_format=True, json_format=True,
q=q, query=escaped_query,
end_user_id=end_user_id, end_user_id=end_user_id,
limit=limit, limit=limit,
)) ))
@@ -300,7 +304,7 @@ async def search_graph(
tasks.append(connector.execute_query( tasks.append(connector.execute_query(
SEARCH_COMMUNITIES_BY_KEYWORD, SEARCH_COMMUNITIES_BY_KEYWORD,
json_format=True, json_format=True,
q=q, query=escaped_query,
end_user_id=end_user_id, end_user_id=end_user_id,
limit=limit, limit=limit,
)) ))
@@ -482,7 +486,7 @@ async def search_graph_by_embedding(
update_time = time.time() - update_start update_time = time.time() - update_start
logger.info(f"[PERF] Activation value updates took: {update_time:.4f}s") logger.info(f"[PERF] Activation value updates took: {update_time:.4f}s")
else: else:
logger.info(f"[PERF] Skipping activation updates (only summaries)") logger.info("[PERF] Skipping activation updates (only summaries)")
return results return results
@@ -520,7 +524,7 @@ async def get_dedup_candidates_for_entities( # 适配新版查询:使用全
# 全文索引按名称检索(包含 CONTAINS 语义) # 全文索引按名称检索(包含 CONTAINS 语义)
rows = await connector.execute_query( rows = await connector.execute_query(
SEARCH_ENTITIES_BY_NAME, SEARCH_ENTITIES_BY_NAME,
q=name, query=escape_lucene_query(name),
end_user_id=end_user_id, end_user_id=end_user_id,
limit=100, limit=100,
) )
@@ -544,7 +548,7 @@ async def get_dedup_candidates_for_entities( # 适配新版查询:使用全
try: try:
rows = await connector.execute_query( rows = await connector.execute_query(
SEARCH_ENTITIES_BY_NAME, SEARCH_ENTITIES_BY_NAME,
q=name.lower(), query=escape_lucene_query(name.lower()),
end_user_id=end_user_id, end_user_id=end_user_id,
limit=100, limit=100,
) )
@@ -593,11 +597,12 @@ async def search_graph_by_keyword_temporal(
- Returns up to 'limit' statements - Returns up to 'limit' statements
""" """
if not query_text: if not query_text:
logger.warning(f"query_text不能为空") logger.warning("query_text不能为空")
return {"statements": []} return {"statements": []}
escaped_query = escape_lucene_query(query_text)
statements = await connector.execute_query( statements = await connector.execute_query(
SEARCH_STATEMENTS_BY_KEYWORD_TEMPORAL, SEARCH_STATEMENTS_BY_KEYWORD_TEMPORAL,
q=query_text, query=escaped_query,
end_user_id=end_user_id, end_user_id=end_user_id,
start_date=start_date, start_date=start_date,
end_date=end_date, end_date=end_date,
@@ -671,7 +676,7 @@ async def search_graph_by_dialog_id(
- Returns up to 'limit' dialogues - Returns up to 'limit' dialogues
""" """
if not dialog_id: if not dialog_id:
logger.warning(f"dialog_id不能为空") logger.warning("dialog_id不能为空")
return {"dialogues": []} return {"dialogues": []}
dialogues = await connector.execute_query( dialogues = await connector.execute_query(
@@ -690,7 +695,7 @@ async def search_graph_by_chunk_id(
limit: int = 1, limit: int = 1,
) -> Dict[str, List[Dict[str, Any]]]: ) -> Dict[str, List[Dict[str, Any]]]:
if not chunk_id: if not chunk_id:
logger.warning(f"chunk_id不能为空") logger.warning("chunk_id不能为空")
return {"chunks": []} return {"chunks": []}
chunks = await connector.execute_query( chunks = await connector.execute_query(
SEARCH_CHUNK_BY_CHUNK_ID, SEARCH_CHUNK_BY_CHUNK_ID,
@@ -968,7 +973,7 @@ async def search_graph_l_valid_at(
async def search_perceptual( async def search_perceptual(
connector: Neo4jConnector, connector: Neo4jConnector,
q: str, query: str,
end_user_id: Optional[str] = None, end_user_id: Optional[str] = None,
limit: int = 10, limit: int = 10,
) -> Dict[str, List[Dict[str, Any]]]: ) -> Dict[str, List[Dict[str, Any]]]:
@@ -979,7 +984,7 @@ async def search_perceptual(
Args: Args:
connector: Neo4j connector connector: Neo4j connector
q: Query text query: Query text for full-text search
end_user_id: Optional user filter end_user_id: Optional user filter
limit: Max results limit: Max results
@@ -989,7 +994,7 @@ async def search_perceptual(
try: try:
perceptuals = await connector.execute_query( perceptuals = await connector.execute_query(
SEARCH_PERCEPTUAL_BY_KEYWORD, SEARCH_PERCEPTUAL_BY_KEYWORD,
q=q, query=escape_lucene_query(query),
end_user_id=end_user_id, end_user_id=end_user_id,
limit=limit, limit=limit,
) )

View File

@@ -1,4 +1,5 @@
import asyncio import asyncio
import json
import os import os
import re import re
import shutil import shutil
@@ -1001,7 +1002,7 @@ def sync_knowledge_for_kb(kb_id: uuid.UUID):
except Exception as e: except Exception as e:
print(f"\n\nError during fetch feishu: {e}") print(f"\n\nError during fetch feishu: {e}")
case _: # General case _: # General
print(f"General: No synchronization needed\n") print("General: No synchronization needed\n")
result = f"sync knowledge '{db_knowledge.name}' processed successfully." result = f"sync knowledge '{db_knowledge.name}' processed successfully."
return result return result
@@ -1510,6 +1511,7 @@ def write_all_workspaces_memory_task(self) -> Dict[str, Any]:
"status": "SUCCESS", "status": "SUCCESS",
"total_num": total_num, "total_num": total_num,
"end_user_count": len(end_users), "end_user_count": len(end_users),
"end_user_details": end_user_details,
"memory_increment_id": str(memory_increment.id), "memory_increment_id": str(memory_increment.id),
"created_at": memory_increment.created_at.isoformat(), "created_at": memory_increment.created_at.isoformat(),
}) })
@@ -2602,7 +2604,6 @@ def init_interest_distribution_for_users(self, end_user_ids: List[str]) -> Dict[
service = MemoryAgentService() service = MemoryAgentService()
with get_db_context() as db:
for end_user_id in end_user_ids: for end_user_id in end_user_ids:
# 存在性检查:缓存有数据则跳过 # 存在性检查:缓存有数据则跳过
cached = await InterestMemoryCache.get_interest_distribution( cached = await InterestMemoryCache.get_interest_distribution(
@@ -2914,4 +2915,270 @@ def init_community_clustering_for_users(self, end_user_ids: List[str], workspace
} }
# ─── User Metadata Extraction Task ───────────────────────────────────────────
def _update_timestamps(existing: dict, new: dict, updated_at: dict, now: str, prefix: str = "") -> None:
"""对比新旧元数据,更新变更字段的 _updated_at 时间戳。"""
for key, new_val in new.items():
if key == "_updated_at":
continue
path = f"{prefix}.{key}" if prefix else key
old_val = existing.get(key)
if isinstance(new_val, dict) and isinstance(old_val, dict):
_update_timestamps(old_val, new_val, updated_at, now, prefix=path)
elif old_val != new_val:
updated_at[path] = now
def _strip_empty_user_metadata(raw: dict) -> dict:
    """Recursively drop empty strings, empty lists and dicts that end up empty."""
    result = {}
    for key, value in raw.items():
        if value == "" or value == []:
            continue
        if isinstance(value, dict):
            nested = _strip_empty_user_metadata(value)
            if nested:
                result[key] = nested
        else:
            result[key] = value
    return result


def _normalize_user_aliases(aliases_list, placeholder_names) -> List[str]:
    """Strip aliases, drop placeholders and case-insensitive duplicates.

    Tolerates ``None`` and non-string items, which an LLM response may contain.
    """
    seen = set()
    result = []
    for alias in aliases_list or []:
        if not isinstance(alias, str):
            continue
        stripped = alias.strip()
        lowered = stripped.lower()
        if stripped and lowered not in placeholder_names and lowered not in seen:
            result.append(stripped)
            seen.add(lowered)
    return result


async def _sync_user_aliases_to_neo4j(end_user_id: str, final_aliases: List[str]) -> None:
    """Best-effort copy of the PgSQL alias list onto the Neo4j user entity.

    Failures are logged and swallowed so they never fail the task; the
    connector is closed even when the query raises.
    """
    neo4j_connector = None
    try:
        from app.repositories.neo4j.neo4j_connector import Neo4jConnector
        neo4j_connector = Neo4jConnector()
        cypher = """
        MATCH (e:ExtractedEntity)
        WHERE e.end_user_id = $end_user_id AND e.name IN ['用户', '', 'User', 'I']
        SET e.aliases = $aliases
        """
        await neo4j_connector.execute_query(
            cypher, end_user_id=end_user_id, aliases=final_aliases
        )
        logger.info(f"[CELERY METADATA] Neo4j 用户实体 aliases 已同步: {final_aliases}")
    except Exception as neo4j_err:
        logger.warning(f"[CELERY METADATA] Neo4j aliases 同步失败(不影响主流程): {neo4j_err}")
    finally:
        # Always release the connector, including when the query failed.
        if neo4j_connector is not None:
            try:
                await neo4j_connector.close()
            except Exception as close_err:
                logger.warning(f"[CELERY METADATA] Neo4j connector close failed: {close_err}")


@celery_app.task(
    bind=True,
    name='app.tasks.extract_user_metadata',
    ignore_result=False,
    max_retries=0,
    acks_late=True,
    time_limit=300,
    soft_time_limit=240,
)
def extract_user_metadata_task(
    self,
    end_user_id: str,
    statements: List[str],
    config_id: Optional[str] = None,
    language: str = "zh",
) -> Dict[str, Any]:
    """Extract user metadata and aliases with an LLM and persist them.

    Triggered by the orchestrator after de-duplication / disambiguation; it
    issues its own LLM call. The LLM configuration is resolved through
    ``MemoryConfigService.get_config_with_fallback`` (application config first,
    workspace default as fallback).

    Flow:
        1. Resolve the end user, memory config and LLM client; read the stored
           metadata and aliases to pass to the extractor as context.
        2. Run ``MetadataExtractor.extract_metadata`` on ``statements``.
        3. Clean the metadata, overwrite ``meta_data`` (tracking per-field
           ``_updated_at`` stamps) and apply alias additions/removals.
        4. Mirror the resulting alias list to the Neo4j user entity
           (best-effort; PgSQL is treated as the authoritative source).

    Args:
        end_user_id: End user ID (UUID string).
        statements: Statement texts related to the user.
        config_id: Optional application memory-config ID (UUID string).
        language: Language code ("zh" Chinese, "en" English).

    Returns:
        A dict with ``status`` ("SUCCESS"/"FAILURE") plus ``result`` or
        ``error``; ``elapsed_time`` and ``task_id`` are added once the
        coroutine has finished or failed. Exceptions are caught and reported
        as a FAILURE dict rather than raised.
    """
    start_time = time.time()
    logger.info(
        f"[CELERY METADATA] Starting metadata extraction - end_user_id={end_user_id}, "
        f"statements_count={len(statements)}, config_id={config_id}, language={language}"
    )

    async def _run() -> Dict[str, Any]:
        from app.core.memory.storage_services.extraction_engine.knowledge_extraction.metadata_extractor import MetadataExtractor
        from app.repositories.end_user_info_repository import EndUserInfoRepository
        from app.repositories.end_user_repository import EndUserRepository
        from app.services.memory_config_service import MemoryConfigService

        # 1. Resolve the LLM config (app config -> workspace fallback) and build the client.
        with get_db_context() as db:
            end_user_uuid = uuid.UUID(end_user_id)
            # workspace_id comes from the end user row
            end_user = EndUserRepository(db).get_by_id(end_user_uuid)
            if not end_user:
                return {"status": "FAILURE", "error": f"End user not found: {end_user_id}"}
            workspace_id = end_user.workspace_id

            config_service = MemoryConfigService(db)
            memory_config = config_service.get_config_with_fallback(
                memory_config_id=uuid.UUID(config_id) if config_id else None,
                workspace_id=workspace_id,
            )
            if not memory_config:
                return {"status": "FAILURE", "error": "No LLM config available (app + workspace fallback failed)"}

            # 2. Create the LLM client
            from app.core.memory.utils.llm.llm_utils import MemoryClientFactory
            factory = MemoryClientFactory(db)
            if not memory_config.llm_id:
                return {"status": "FAILURE", "error": "Memory config has no LLM model configured"}
            llm_client = factory.get_llm_client(memory_config.llm_id)

            # 2.5 Read stored metadata and aliases to give the extractor context.
            existing_metadata = None
            existing_aliases = None
            try:
                info = EndUserInfoRepository(db).get_by_end_user_id(end_user_uuid)
                if info:
                    if info.meta_data:
                        existing_metadata = info.meta_data
                    existing_aliases = info.aliases if info.aliases else []
                    logger.info(f"[CELERY METADATA] 已读取已有元数据和别名aliases={existing_aliases}")
            except Exception as e:
                logger.warning(f"[CELERY METADATA] 读取已有数据失败(继续无上下文提取): {e}")

        # 3. Extract metadata and alias changes (stored data passed as context).
        extractor = MetadataExtractor(llm_client=llm_client, language=language)
        extract_result = await extractor.extract_metadata(
            statements,
            existing_metadata=existing_metadata,
            existing_aliases=existing_aliases,
        )
        if not extract_result:
            logger.info(f"[CELERY METADATA] No metadata extracted for end_user_id={end_user_id}")
            return {"status": "SUCCESS", "result": "no_metadata_extracted"}

        user_metadata, aliases_to_add, aliases_to_remove = extract_result
        logger.info(f"[CELERY METADATA] LLM 别名新增: {aliases_to_add}, 移除: {aliases_to_remove}")

        # 4. Clean the metadata, then overwrite metadata and apply alias changes.
        raw_dict = user_metadata.model_dump(exclude_none=True) if user_metadata else {}
        logger.info(f"[CELERY METADATA] LLM 输出完整元数据: {json.dumps(raw_dict, ensure_ascii=False)}")
        cleaned = _strip_empty_user_metadata(raw_dict) if raw_dict else {}
        logger.info(f"[CELERY METADATA] 清洗后元数据: {json.dumps(cleaned, ensure_ascii=False)}")

        from datetime import datetime as dt, timezone as tz
        now = dt.now(tz.utc).isoformat()

        # Placeholder names are never accepted as aliases or display names.
        _PLACEHOLDER_NAMES = {"用户", "", "user", "i"}
        filtered_add = _normalize_user_aliases(aliases_to_add, _PLACEHOLDER_NAMES)
        filtered_remove = _normalize_user_aliases(aliases_to_remove, _PLACEHOLDER_NAMES)
        remove_lower = {a.lower() for a in filtered_remove}

        aliases_changed = False
        with get_db_context() as db:
            end_user_uuid = uuid.UUID(end_user_id)
            info = EndUserInfoRepository(db).get_by_end_user_id(end_user_uuid)
            end_user = EndUserRepository(db).get_by_id(end_user_uuid)

            if info:
                # Metadata: overwrite, carrying forward/refreshing per-field stamps.
                if cleaned:
                    existing_meta = info.meta_data if info.meta_data else {}
                    updated_at = dict(existing_meta.get("_updated_at", {}))
                    _update_timestamps(existing_meta, cleaned, updated_at, now)
                    final = dict(cleaned)
                    final["_updated_at"] = updated_at
                    info.meta_data = final
                    logger.info("[CELERY METADATA] 覆盖写入元数据")

                # Aliases: (existing - remove) + add
                old_aliases = info.aliases if info.aliases else []
                merged = [a for a in old_aliases if a.strip().lower() not in remove_lower]
                existing_lower = {a.strip().lower() for a in merged}
                for a in filtered_add:
                    if a.lower() not in existing_lower:
                        merged.append(a)
                        existing_lower.add(a.lower())
                if merged != old_aliases:
                    info.aliases = merged
                    aliases_changed = True

                # Replace other_name when it is missing, a placeholder, or was just removed.
                if merged and (
                    not info.other_name
                    or info.other_name.strip().lower() in _PLACEHOLDER_NAMES
                    or info.other_name.strip().lower() in remove_lower
                ):
                    info.other_name = merged[0]
                if end_user and merged and (
                    not end_user.other_name
                    or end_user.other_name.strip().lower() in _PLACEHOLDER_NAMES
                    or end_user.other_name.strip().lower() in remove_lower
                ):
                    end_user.other_name = merged[0]
                logger.info(
                    f"[CELERY METADATA] 别名增量更新: {old_aliases} - {filtered_remove} + {filtered_add} → {merged}"
                )
                # Plain copy taken before commit so nothing is read from the
                # ORM object once the session has expired/closed it.
                final_aliases = list(merged)
            else:
                # No end_user_info row yet: create one (only additions apply).
                from app.models.end_user_info_model import EndUserInfo
                initial_aliases = filtered_add
                first_alias = initial_aliases[0] if initial_aliases else ""
                if first_alias or cleaned:
                    initial_meta = None
                    if cleaned:
                        # Stamp fresh fields the same way the update path does.
                        initial_stamps: dict = {}
                        _update_timestamps({}, cleaned, initial_stamps, now)
                        initial_meta = dict(cleaned)
                        initial_meta["_updated_at"] = initial_stamps
                    new_info = EndUserInfo(
                        end_user_id=end_user_uuid,
                        other_name=first_alias or "",
                        aliases=initial_aliases,
                        meta_data=initial_meta,
                    )
                    db.add(new_info)
                    if end_user and first_alias and (
                        not end_user.other_name or end_user.other_name.strip().lower() in _PLACEHOLDER_NAMES
                    ):
                        end_user.other_name = first_alias
                    logger.info(f"[CELERY METADATA] 创建 end_user_info: other_name={first_alias}, aliases={initial_aliases}")
                else:
                    return {"status": "SUCCESS", "result": "no_data_to_write"}
                final_aliases = list(initial_aliases)

            db.commit()

        # Mirror PgSQL aliases to the Neo4j user entity (PgSQL is authoritative).
        # Also runs when the list became empty through a removal, so Neo4j does
        # not keep aliases the user has just rejected.
        if final_aliases or aliases_changed:
            await _sync_user_aliases_to_neo4j(end_user_id, final_aliases)

        return {"status": "SUCCESS", "result": "metadata_and_aliases_written"}

    loop = None
    try:
        loop = set_asyncio_event_loop()
        result = loop.run_until_complete(_run())
        elapsed = time.time() - start_time
        result["elapsed_time"] = elapsed
        result["task_id"] = self.request.id
        logger.info(f"[CELERY METADATA] Task completed - elapsed={elapsed:.2f}s, result={result.get('result')}")
        return result
    except Exception as e:
        elapsed = time.time() - start_time
        logger.error(f"[CELERY METADATA] Task failed - elapsed={elapsed:.2f}s, error={e}", exc_info=True)
        return {
            "status": "FAILURE",
            "error": str(e),
            "elapsed_time": elapsed,
            "task_id": self.request.id,
        }
    finally:
        if loop:
            _shutdown_loop_gracefully(loop)
# unused task # unused task