diff --git a/api/app/core/memory/models/metadata_models.py b/api/app/core/memory/models/metadata_models.py index f08d18ed..55c2359e 100644 --- a/api/app/core/memory/models/metadata_models.py +++ b/api/app/core/memory/models/metadata_models.py @@ -11,16 +11,22 @@ from pydantic import BaseModel, ConfigDict, Field class UserMetadataProfile(BaseModel): """用户画像信息""" - model_config = ConfigDict(extra='ignore') + + model_config = ConfigDict(extra="ignore") role: str = Field(default="", description="用户职业或角色") domain: str = Field(default="", description="用户所在领域") - expertise: List[str] = Field(default_factory=list, description="用户擅长的技能或工具") - interests: List[str] = Field(default_factory=list, description="用户关注的话题或领域标签") + expertise: List[str] = Field( + default_factory=list, description="用户擅长的技能或工具" + ) + interests: List[str] = Field( + default_factory=list, description="用户关注的话题或领域标签" + ) class UserMetadataBehavioralHints(BaseModel): """行为偏好""" - model_config = ConfigDict(extra='ignore') + + model_config = ConfigDict(extra="ignore") learning_stage: str = Field(default="", description="学习阶段") preferred_depth: str = Field(default="", description="偏好深度") tone_preference: str = Field(default="", description="语气偏好") @@ -28,21 +34,24 @@ class UserMetadataBehavioralHints(BaseModel): class UserMetadata(BaseModel): """用户元数据顶层结构""" - model_config = ConfigDict(extra='ignore') + + model_config = ConfigDict(extra="ignore") profile: UserMetadataProfile = Field(default_factory=UserMetadataProfile) - behavioral_hints: UserMetadataBehavioralHints = Field(default_factory=UserMetadataBehavioralHints) + behavioral_hints: UserMetadataBehavioralHints = Field( + default_factory=UserMetadataBehavioralHints + ) knowledge_tags: List[str] = Field(default_factory=list, description="知识标签") class MetadataExtractionResponse(BaseModel): """元数据提取 LLM 响应结构""" - model_config = ConfigDict(extra='ignore') + + model_config = ConfigDict(extra="ignore") user_metadata: UserMetadata = Field(default_factory=UserMetadata) aliases_to_add: List[str] = Field( default_factory=list, - description="本次新发现的用户别名(用户自我介绍或他人对用户的称呼)" + description="本次新发现的用户别名(用户自我介绍或他人对用户的称呼)", ) aliases_to_remove: List[str] = Field( - default_factory=list, - description="用户明确否认的别名(如'我不叫XX了')" + default_factory=list, description="用户明确否认的别名(如'我不叫XX了')" ) diff --git a/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py b/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py index b8a36e44..5636dcb5 100644 --- a/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py +++ b/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py @@ -314,28 +314,48 @@ class ExtractionOrchestrator: # 步骤 7: 触发异步元数据和别名提取(仅正式模式) if not is_pilot_run: try: - from app.core.memory.storage_services.extraction_engine.knowledge_extraction.metadata_extractor import MetadataExtractor - metadata_extractor = MetadataExtractor(llm_client=self.llm_client, language=self.language) - user_statements = metadata_extractor.collect_user_related_statements( - entity_nodes, statement_nodes, - statement_entity_edges + from app.core.memory.storage_services.extraction_engine.knowledge_extraction.metadata_extractor import ( + MetadataExtractor, + ) + + metadata_extractor = MetadataExtractor( + llm_client=self.llm_client, language=self.language + ) + user_statements = ( + metadata_extractor.collect_user_related_statements( + entity_nodes, statement_nodes, statement_entity_edges + ) ) if user_statements: - end_user_id = dialog_data_list[0].end_user_id if dialog_data_list else None - config_id = dialog_data_list[0].config_id if dialog_data_list and hasattr(dialog_data_list[0], 'config_id') else None + end_user_id = ( + dialog_data_list[0].end_user_id + if dialog_data_list + else None + ) + config_id = ( + dialog_data_list[0].config_id + if dialog_data_list + and hasattr(dialog_data_list[0], "config_id") + else None + ) if end_user_id: from app.tasks import extract_user_metadata_task + extract_user_metadata_task.delay( end_user_id=str(end_user_id), statements=user_statements, config_id=str(config_id) if config_id else None, language=self.language, ) - logger.info(f"已触发异步元数据提取任务,共 {len(user_statements)} 条用户相关 statement") + logger.info( + f"已触发异步元数据提取任务,共 {len(user_statements)} 条用户相关 statement" + ) else: logger.info("未找到用户相关 statement,跳过元数据提取") except Exception as e: - logger.error(f"触发元数据提取任务失败(不影响主流程): {e}", exc_info=True) + logger.error( + f"触发元数据提取任务失败(不影响主流程): {e}", exc_info=True + ) # 别名同步已迁移到 Celery 元数据提取任务中,不再在此处执行 @@ -1501,9 +1521,6 @@ class ExtractionOrchestrator: except Exception as e: logger.error(f"更新 end_user other_name 失败: {e}", exc_info=True) - - - # 用户实体占位名称,不允许作为 other_name 或出现在 aliases 中 # 复用 deduped_and_disamb 模块级常量,避免重复维护 USER_PLACEHOLDER_NAMES = _USER_PLACEHOLDER_NAMES @@ -1610,7 +1627,6 @@ class ExtractionOrchestrator: if candidate and candidate.lower() in self.USER_PLACEHOLDER_NAMES: return None return candidate - return None async def _run_dedup_and_write_summary( diff --git a/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/metadata_extractor.py b/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/metadata_extractor.py index cc8c6073..8b749c40 100644 --- a/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/metadata_extractor.py +++ b/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/metadata_extractor.py @@ -13,10 +13,6 @@ from app.core.memory.models.graph_models import ( StatementEntityEdge, StatementNode, ) -from app.core.memory.models.metadata_models import ( - UserMetadata, -) -from app.core.memory.models.message_models import DialogData logger = logging.getLogger(__name__) @@ -45,40 +41,12 @@ class MetadataExtractor: 如果文本中包含中文字符则返回 "zh",否则返回 "en"。 """ import re + combined = " ".join(statements) - if re.search(r'[\u4e00-\u9fff]', combined): + if re.search(r"[\u4e00-\u9fff]", combined): return "zh" return "en" - # def collect_user_raw_messages( - # self, - # dialog_data_list: List[DialogData], - # ) -> List[str]: - # """ - # 从原始对话数据中提取 speaker="user" 的消息原文。 - - # 直接使用用户的原始输入,不经过陈述句提取阶段的 LLM 改写, - # 避免第一人称被替换为第三人称导致元数据/别名提取错误。 - - # Returns: - # 用户原始消息文本列表 - # """ - # result = [] - # for dialog in dialog_data_list: - # if not dialog.context or not dialog.context.msgs: - # continue - # for msg in dialog.context.msgs: - # if getattr(msg, 'role', '') == 'user': - # text = (getattr(msg, 'msg', '') or '').strip() - # if text: - # result.append(text) - - # logger.info(f"收集到 {len(result)} 条用户原始消息") - # if result: - # for i, text in enumerate(result): - # logger.info(f" [user message {i+1}] {text[:200]}") - # return result - def collect_user_related_statements( self, entity_nodes: List[ExtractedEntityNode], @@ -119,7 +87,7 @@ class MetadataExtractor: for stmt_node in statement_nodes: if stmt_node.id in target_stmt_ids and stmt_node.id not in seen: total_associated += 1 - speaker = getattr(stmt_node, 'speaker', None) or 'unknown' + speaker = getattr(stmt_node, "speaker", None) or "unknown" if speaker == "user": text = (stmt_node.statement or "").strip() if text: @@ -135,7 +103,7 @@ class MetadataExtractor: ) if result: for i, text in enumerate(result): - logger.info(f" [user statement {i+1}] {text}") + logger.info(f" [user statement {i + 1}] {text}") if total_associated > 0 and len(result) == 0: logger.warning( f"有 {total_associated} 条直接关联 statement 但全部被 speaker 过滤," @@ -178,7 +146,10 @@ class MetadataExtractor: json_schema="", ) - from app.core.memory.models.metadata_models import MetadataExtractionResponse + from app.core.memory.models.metadata_models import ( + MetadataExtractionResponse, + ) + response = await self.llm_client.response_structured( messages=[{"role": "user", "content": prompt}], response_model=MetadataExtractionResponse, @@ -187,7 +158,9 @@ class MetadataExtractor: if response: metadata = response.user_metadata if response.user_metadata else None to_add = response.aliases_to_add if response.aliases_to_add else [] - to_remove = response.aliases_to_remove if response.aliases_to_remove else [] + to_remove = ( + response.aliases_to_remove if response.aliases_to_remove else [] + ) return metadata, to_add, to_remove logger.warning("LLM 返回的响应为空") diff --git a/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/statement_extraction.py b/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/statement_extraction.py index 684ad556..d90a49ba 100644 --- a/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/statement_extraction.py +++ b/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/statement_extraction.py @@ -1,6 +1,5 @@ import asyncio import logging -import os from datetime import datetime from typing import Any, Dict, List, Optional @@ -82,17 +81,6 @@ class StatementExtractor: logger.warning(f"Chunk {getattr(chunk, 'id', 'unknown')} has no speaker field or is empty") return None - @staticmethod - def _replace_first_person_with_user(text: str) -> str: - """将用户消息中的第一人称代词"我"替换为"用户"。 - - 替换规则: - - 所有独立的"我"都替换为"用户"(包括"我的"→"用户的"、"叫我"→"叫用户") - - 不替换"我们"中的"我"("我们"是复数,不指代用户个人) - """ - import re - result = re.sub(r'我(?!们)', '用户', text) - return result async def _extract_statements(self, chunk, end_user_id: Optional[str] = None, dialogue_content: str = None) -> List[Statement]: """Process a single chunk and return extracted statements @@ -106,18 +94,12 @@ class StatementExtractor: List of ExtractedStatement objects extracted from the chunk """ chunk_content = chunk.content - + chunk_speaker = self._get_speaker_from_chunk(chunk) + if not chunk_content or len(chunk_content.strip()) < 5: logger.warning(f"Chunk {chunk.id} content too short or empty, skipping") return [] - # 对 speaker="user" 的 chunk,将第一人称"我"替换为"用户", - # 避免 LLM 在陈述句提取时将"我"替换为具体名字(如"齐齐"), - # 导致下游元数据/别名提取无法识别第一人称语义。 - chunk_speaker = self._get_speaker_from_chunk(chunk) - if chunk_speaker == "user": - chunk_content = self._replace_first_person_with_user(chunk_content) - prompt_content = await render_statement_extraction_prompt( chunk_content=chunk_content, definitions=LABEL_DEFINITIONS, diff --git a/api/app/core/memory/utils/metadata_utils.py b/api/app/core/memory/utils/metadata_utils.py deleted file mode 100644 index 69bd8edf..00000000 --- a/api/app/core/memory/utils/metadata_utils.py +++ /dev/null @@ -1,45 +0,0 @@ -""" -Metadata utility functions for cleaning and validating user metadata. -""" - -import logging -from typing import Optional - -from app.core.memory.models.metadata_models import UserMetadata - -logger = logging.getLogger(__name__) - - -def clean_metadata(raw: dict) -> dict: - """ - Clean metadata by removing empty string values and empty array fields recursively. - Only keeps fields with actual content. If a nested dict becomes empty after cleaning, - it is removed too. - """ - cleaned = {} - for key, value in raw.items(): - if isinstance(value, dict): - nested = clean_metadata(value) - if nested: - cleaned[key] = nested - elif isinstance(value, list): - if len(value) > 0: - cleaned[key] = value - elif isinstance(value, str): - if value != "": - cleaned[key] = value - else: - cleaned[key] = value - return cleaned - - -def validate_metadata(raw: dict) -> Optional[UserMetadata]: - """ - Validate metadata structure using the Pydantic UserMetadata model. - Returns None and logs a WARNING on validation failure. - """ - try: - return UserMetadata.model_validate(raw) - except Exception as e: - logger.warning("Metadata validation failed: %s", e) - return None diff --git a/api/app/tasks.py b/api/app/tasks.py index 3641f438..6fd5f8d6 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -2969,7 +2969,6 @@ def extract_user_metadata_task( async def _run() -> Dict[str, Any]: from app.core.memory.storage_services.extraction_engine.knowledge_extraction.metadata_extractor import MetadataExtractor - from app.core.memory.utils.metadata_utils import clean_metadata, validate_metadata from app.repositories.end_user_info_repository import EndUserInfoRepository from app.repositories.end_user_repository import EndUserRepository from app.services.memory_config_service import MemoryConfigService @@ -3029,6 +3028,14 @@ def extract_user_metadata_task( logger.info(f"[CELERY METADATA] LLM 别名新增: {aliases_to_add}, 移除: {aliases_to_remove}") # 4. 清洗元数据、覆盖写入元数据和别名 + def clean_metadata(raw: dict) -> dict: + """递归移除空字符串、空列表、空字典。""" + return { + k: (cleaned if isinstance(v, dict) and (cleaned := clean_metadata(v)) else v) + for k, v in raw.items() + if not (v == "" or v == [] or (isinstance(v, dict) and not clean_metadata(v))) + } + raw_dict = user_metadata.model_dump(exclude_none=True) if user_metadata else {} logger.info(f"[CELERY METADATA] LLM 输出完整元数据: {json.dumps(raw_dict, ensure_ascii=False)}")