diff --git a/api/app/core/memory/memory_service.py b/api/app/core/memory/memory_service.py index 89762414..377eff66 100644 --- a/api/app/core/memory/memory_service.py +++ b/api/app/core/memory/memory_service.py @@ -16,7 +16,9 @@ import logging from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Optional if TYPE_CHECKING: + from app.core.memory.pipelines.pilot_write_pipeline import PilotWriteResult from app.core.memory.pipelines.write_pipeline import WriteResult + from app.core.memory.models.message_models import DialogData from app.schemas.memory_config_schema import MemoryConfig logger = logging.getLogger(__name__) @@ -83,6 +85,34 @@ class MemoryService: is_pilot_run=is_pilot_run, ) + async def pilot_write( + self, + chunked_dialogs: List[DialogData], + language: str = "zh", + progress_callback: Optional[ + Callable[[str, str, Optional[Dict[str, Any]]], Awaitable[None]] + ] = None, + ) -> PilotWriteResult: + """试运行写入:只执行萃取链路,不写入 Neo4j + + Args: + chunked_dialogs: 预处理 + 分块后的 DialogData 列表 + language: 语言 ("zh" | "en") + progress_callback: 可选的进度回调 + + Returns: + PilotWriteResult 包含萃取结果、图构建结果和去重结果 + """ + from app.core.memory.pipelines.pilot_write_pipeline import PilotWritePipeline + + pipeline = PilotWritePipeline( + memory_config=self.memory_config, + end_user_id=self.end_user_id, + language=language, + progress_callback=progress_callback, + ) + return await pipeline.run(chunked_dialogs) + async def read( self, query: str, history: list, search_switch: str ) -> dict: diff --git a/api/app/core/memory/models/__init__.py b/api/app/core/memory/models/__init__.py index 2a34159b..a08a9410 100644 --- a/api/app/core/memory/models/__init__.py +++ b/api/app/core/memory/models/__init__.py @@ -60,8 +60,6 @@ from app.core.memory.models.triplet_models import ( # User metadata models from app.core.memory.models.metadata_models import ( - UserMetadata, - UserMetadataProfile, MetadataExtractionResponse, MetadataFieldChange, ) @@ -132,8 +130,6 @@ __all__ = [ "Entity", "Triplet", "TripletExtractionResponse", - "UserMetadata", - "UserMetadataProfile", "MetadataExtractionResponse", "MetadataFieldChange", # Ontology models diff --git a/api/app/core/memory/models/graph_models.py b/api/app/core/memory/models/graph_models.py index cd44588d..37892e6f 100644 --- a/api/app/core/memory/models/graph_models.py +++ b/api/app/core/memory/models/graph_models.py @@ -464,6 +464,16 @@ class ExtractedEntityNode(Node): description="Whether this entity represents explicit/semantic memory (knowledge, concepts, definitions, theories, principles)" ) + # User Metadata Fields (populated by async metadata extraction after dedup) + core_facts: List[str] = Field(default_factory=list, description="Stable basic facts about the user") + traits: List[str] = Field(default_factory=list, description="Stable personality traits or behavioral tendencies") + relations: List[str] = Field(default_factory=list, description="Durable relationships with people/groups/entities") + goals: List[str] = Field(default_factory=list, description="Long-term goals or ongoing pursuits") + interests: List[str] = Field(default_factory=list, description="Stable interests, preferences, or hobbies") + beliefs_or_stances: List[str] = Field(default_factory=list, description="Stable beliefs, values, or stances") + anchors: List[str] = Field(default_factory=list, description="Personally meaningful objects or symbols") + events: List[str] = Field(default_factory=list, description="Durable personal experiences or milestones") + @field_validator('aliases', mode='before') @classmethod def validate_aliases_field(cls, v): # 字段验证器 自动清理和验证 aliases 字段 diff --git a/api/app/core/memory/models/metadata_models.py b/api/app/core/memory/models/metadata_models.py index e12c3d97..bfa29baf 100644 --- a/api/app/core/memory/models/metadata_models.py +++ b/api/app/core/memory/models/metadata_models.py @@ -2,6 +2,9 @@ Independent from triplet_models.py - these models are used by the standalone metadata extraction pipeline (post-dedup async Celery task). + +The field definitions align with the Jinja2 prompt template +``extract_user_metadata.jinja2``. """ from typing import List, Literal, Optional @@ -9,55 +12,69 @@ from typing import List, Literal, Optional from pydantic import BaseModel, ConfigDict, Field -class UserMetadataProfile(BaseModel): - """用户画像信息""" +class MetadataExtractionResponse(BaseModel): + """LLM 元数据提取响应结构。 + + 字段与 extract_user_metadata.jinja2 模板的输出 JSON 一一对应。 + 每个字段都是字符串数组,表示本次新增的元数据条目。 + """ model_config = ConfigDict(extra="ignore") - role: List[str] = Field(default_factory=list, description="用户职业或角色") - domain: List[str] = Field(default_factory=list, description="用户所在领域") - expertise: List[str] = Field( - default_factory=list, description="用户擅长的技能或工具" + + aliases: List[str] = Field( + default_factory=list, + description="用户别名、昵称、称呼", + ) + core_facts: List[str] = Field( + default_factory=list, + description="用户稳定的基础事实(身份、年龄、国籍、所在地等)", + ) + traits: List[str] = Field( + default_factory=list, + description="用户稳定的人格特质、风格、行为倾向", + ) + relations: List[str] = Field( + default_factory=list, + description="用户与他人/群体/宠物/重要对象之间的长期关系", + ) + goals: List[str] = Field( + default_factory=list, + description="用户明确、稳定的长期目标或计划", ) interests: List[str] = Field( - default_factory=list, description="用户关注的话题或领域标签" - ) - - -class UserMetadata(BaseModel): - """用户元数据顶层结构""" - - model_config = ConfigDict(extra="ignore") - profile: UserMetadataProfile = Field(default_factory=UserMetadataProfile) - - -class MetadataFieldChange(BaseModel): - """单个元数据字段的变更操作""" - - model_config = ConfigDict(extra="ignore") - field_path: str = Field( - description="字段路径,用点号分隔,如 'profile.role'、'profile.expertise'" - ) - action: Literal["set", "remove"] = Field( - description="操作类型:'set' 表示新增或修改,'remove' 表示移除" - ) - value: Optional[str] = Field( - default=None, - description="字段的新值(action='set' 时必填)。标量字段直接填值,列表字段填单个要新增的元素" - ) - - -class MetadataExtractionResponse(BaseModel): - """元数据提取 LLM 响应结构(增量模式)""" - - model_config = ConfigDict(extra="ignore") - metadata_changes: List[MetadataFieldChange] = Field( default_factory=list, - description="元数据的增量变更列表,每项描述一个字段的新增、修改或移除操作", + description="用户稳定的兴趣、偏好、长期爱好", ) - aliases_to_add: List[str] = Field( + beliefs_or_stances: List[str] = Field( default_factory=list, - description="本次新发现的用户别名(用户自我介绍或他人对用户的称呼)", + description="用户稳定的信念、价值立场", ) - aliases_to_remove: List[str] = Field( - default_factory=list, description="用户明确否认的别名(如'我不叫XX了')" + anchors: List[str] = Field( + default_factory=list, + description="对用户有长期意义的物品、收藏、纪念物", ) + events: List[str] = Field( + default_factory=list, + description="对用户画像有长期价值的个人经历、事件、里程碑", + ) + + # ── 便捷属性 ── + + METADATA_FIELDS: List[str] = [ + "core_facts", "traits", "relations", "goals", + "interests", "beliefs_or_stances", "anchors", "events", + ] + + def has_any_metadata(self) -> bool: + """是否提取到了任何元数据(不含 aliases)。""" + return any( + bool(getattr(self, field, [])) + for field in self.METADATA_FIELDS + ) + + def to_metadata_dict(self) -> dict: + """返回 8 个元数据字段的字典(不含 aliases),用于 Neo4j 回写。""" + return { + field: getattr(self, field, []) + for field in self.METADATA_FIELDS + } diff --git a/api/app/core/memory/pipelines/pilot_write_pipeline.py b/api/app/core/memory/pipelines/pilot_write_pipeline.py index 4c9e1750..2b75444c 100644 --- a/api/app/core/memory/pipelines/pilot_write_pipeline.py +++ b/api/app/core/memory/pipelines/pilot_write_pipeline.py @@ -1,17 +1,20 @@ """PilotWritePipeline — 试运行专用萃取流水线。 职责边界: -- 只执行“萃取相关”链路:statement -> triplet -> graph_build -> 第一层去重消歧 +- 只执行"萃取相关"链路:statement -> triplet -> graph_build -> 第一层去重消歧 - 不负责 Neo4j 写入、聚类、摘要、缓存更新 +- 自行管理客户端初始化和本体类型加载(与 WritePipeline 对齐) + +依赖方向:Facade → Pipeline → Engine → Repository(单向,不允许反向调用) """ from __future__ import annotations +import logging from dataclasses import dataclass -from typing import Any, Awaitable, Callable, Dict, List, Optional +from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Optional from app.core.memory.models.message_models import DialogData -from app.core.memory.models.variate_config import ExtractionPipelineConfig from app.core.memory.storage_services.extraction_engine.steps.dedup_step import ( DedupResult, run_dedup, @@ -24,6 +27,11 @@ from app.core.memory.storage_services.extraction_engine.steps.graph_build_step i build_graph_nodes_and_edges, ) +if TYPE_CHECKING: + from app.schemas.memory_config_schema import MemoryConfig + +logger = logging.getLogger(__name__) + @dataclass class PilotWriteResult: @@ -46,36 +54,54 @@ class PilotWriteResult: class PilotWritePipeline: - """重构后试运行专用流水线。""" + """重构后试运行专用流水线。 + + 构造函数只接收 memory_config,客户端初始化和本体加载在 run() 内部完成, + 与 WritePipeline 保持一致的生命周期管理模式。 + """ def __init__( self, - llm_client: Any, - embedder_client: Any, - pipeline_config: ExtractionPipelineConfig, - embedding_id: Optional[str], + memory_config: MemoryConfig, + end_user_id: str, language: str = "zh", - ontology_types: Any = None, progress_callback: Optional[ Callable[[str, str, Optional[Dict[str, Any]]], Awaitable[None]] ] = None, ) -> None: - self.llm_client = llm_client - self.embedder_client = embedder_client - self.pipeline_config = pipeline_config - self.embedding_id = embedding_id + """ + Args: + memory_config: 不可变的记忆配置对象(从数据库加载) + end_user_id: 终端用户 ID + language: 语言 ("zh" | "en") + progress_callback: 可选的进度回调 + """ + self.memory_config = memory_config + self.end_user_id = end_user_id self.language = language - self.ontology_types = ontology_types self.progress_callback = progress_callback + # 延迟初始化的客户端 + self._llm_client = None + self._embedder_client = None + async def run(self, dialog_data_list: List[DialogData]) -> PilotWriteResult: - """执行试运行萃取链路。""" + """执行试运行萃取链路。 + + 内部完成客户端初始化 → 本体加载 → 萃取 → 图构建 → 去重。 + """ + from app.core.memory.utils.config.config_utils import get_pipeline_config + + self._init_clients() + pipeline_config = get_pipeline_config(self.memory_config) + ontology_types = self._load_ontology_types() + orchestrator = NewExtractionOrchestrator( - llm_client=self.llm_client, - embedder_client=self.embedder_client, - config=self.pipeline_config, - embedding_id=self.embedding_id, - ontology_types=self.ontology_types, + llm_client=self._llm_client, + embedder_client=self._embedder_client, + config=pipeline_config, + embedding_id=str(self.memory_config.embedding_model_id), + ontology_types=ontology_types, language=self.language, is_pilot_run=True, progress_callback=self.progress_callback, @@ -84,7 +110,7 @@ class PilotWritePipeline: graph = await build_graph_nodes_and_edges( dialog_data_list=extracted_dialogs, - embedder_client=self.embedder_client, + embedder_client=self._embedder_client, progress_callback=self.progress_callback, ) @@ -93,9 +119,9 @@ class PilotWritePipeline: statement_entity_edges=graph.stmt_entity_edges, entity_entity_edges=graph.entity_entity_edges, dialog_data_list=extracted_dialogs, - pipeline_config=self.pipeline_config, + pipeline_config=pipeline_config, connector=None, # pilot: no layer-2 db dedup - llm_client=self.llm_client, + llm_client=self._llm_client, is_pilot_run=True, progress_callback=self.progress_callback, ) @@ -106,3 +132,50 @@ class PilotWritePipeline: dedup=dedup, ) + # ────────────────────────────────────────────── + # 辅助方法 + # ────────────────────────────────────────────── + + def _init_clients(self) -> None: + """从 MemoryConfig 构建 LLM 和 Embedding 客户端。""" + from app.core.memory.utils.llm.llm_utils import MemoryClientFactory + from app.db import get_db_context + + with get_db_context() as db: + factory = MemoryClientFactory(db) + self._llm_client = factory.get_llm_client_from_config(self.memory_config) + self._embedder_client = factory.get_embedder_client_from_config( + self.memory_config + ) + logger.info("Pilot pipeline: LLM and embedding clients constructed") + + def _load_ontology_types(self): + """加载本体类型配置(如果配置了 scene_id)。""" + if not self.memory_config.scene_id: + return None + + try: + from app.core.memory.ontology_services.ontology_type_loader import ( + load_ontology_types_for_scene, + ) + from app.db import get_db_context + + with get_db_context() as db: + ontology_types = load_ontology_types_for_scene( + scene_id=self.memory_config.scene_id, + workspace_id=self.memory_config.workspace_id, + db=db, + ) + if ontology_types: + logger.info( + f"Loaded {len(ontology_types.types)} ontology types " + f"for scene_id: {self.memory_config.scene_id}" + ) + return ontology_types + except Exception as e: + logger.warning( + f"Failed to load ontology types for scene_id " + f"{self.memory_config.scene_id}: {e}", + exc_info=True, + ) + return None diff --git a/api/app/core/memory/pipelines/write_pipeline.py b/api/app/core/memory/pipelines/write_pipeline.py index f2a55ae3..d50f555f 100644 --- a/api/app/core/memory/pipelines/write_pipeline.py +++ b/api/app/core/memory/pipelines/write_pipeline.py @@ -186,9 +186,12 @@ class WritePipeline: self._init_clients() self._init_neo4j_connector() - # 初始化 Snapshot(提前创建,供预处理阶段的剪枝使用) - from app.core.memory.utils.debug.pipeline_snapshot import PipelineSnapshot - self._snapshot = PipelineSnapshot("new") + # 初始化快照记录器(提前创建,供预处理阶段的剪枝使用) + from app.core.memory.utils.debug.write_snapshot_recorder import ( + WriteSnapshotRecorder, + ) + + self._recorder = WriteSnapshotRecorder("new") # Step 1: 预处理 - 消息分块 + AI消息语义剪枝 async with bear.step(1, 5, "预处理", "消息分块") as s: @@ -197,7 +200,9 @@ class WritePipeline: # Step 2: 萃取 - 知识提取 async with bear.step(2, 5, "萃取", "知识提取") as s: - extraction_result = await self._extract(chunked_dialogs, is_pilot_run) + extraction_result = await self._extract( + chunked_dialogs, is_pilot_run + ) stats = extraction_result.stats s.metadata( entities=stats["entity_count"], @@ -224,6 +229,9 @@ class WritePipeline: # Step 3.5: 异步情绪提取(fire-and-forget,需在 _store 之后确保 Statement 节点已存在) await self._extract_emotion(getattr(self, "_emotion_statements", [])) + # Step 3.6: 异步元数据提取(fire-and-forget,需在 _store 之后确保 Entity 节点已存在) + await self._extract_metadata(extraction_result) + # Step 4: 聚类 - 增量更新社区(异步,不阻塞) async with bear.step(4, 5, "聚类", "增量更新社区") as s: await self._cluster(extraction_result) @@ -264,7 +272,8 @@ class WritePipeline: """ from app.core.memory.agent.utils.get_dialogs import get_chunked_dialogs - snapshot = getattr(self, "_snapshot", None) + recorder = getattr(self, "_recorder", None) + snapshot = recorder.snapshot if recorder else None return await get_chunked_dialogs( chunker_strategy=self.memory_config.chunker_strategy, @@ -308,14 +317,16 @@ class WritePipeline: ) from app.core.memory.utils.config.config_utils import get_pipeline_config - from app.core.memory.utils.debug.pipeline_snapshot import PipelineSnapshot + from app.core.memory.utils.debug.write_snapshot_recorder import ( + WriteSnapshotRecorder, + ) pipeline_config = get_pipeline_config(self.memory_config) ontology_types = self._load_ontology_types() - # 复用 run() 中已创建的 snapshot(剪枝阶段已使用同一实例) - snapshot = getattr(self, "_snapshot", None) or PipelineSnapshot("new") - self._snapshot = snapshot + # 复用 run() 中已创建的 recorder(剪枝阶段已使用同一实例) + recorder = getattr(self, "_recorder", None) or WriteSnapshotRecorder("new") + self._recorder = recorder # ── 新编排器:LLM 萃取 + 数据赋值 ── new_orchestrator = NewExtractionOrchestrator( @@ -335,52 +346,8 @@ class WritePipeline: # 注意:实际 dispatch 在 _store 之后,确保 Statement 节点已写入 Neo4j self._emotion_statements = new_orchestrator.emotion_statements - # ── Snapshot: 各阶段萃取结果 ── TODO 乐力齐 重构流水线切换生产环境稳定后修改 - stage_outputs = new_orchestrator.last_stage_outputs - if stage_outputs: - stmt_results = stage_outputs.get("statement_results", {}) - stmt_snapshot = [] - for _did, chunk_stmts in stmt_results.items(): - for _cid, stmts in chunk_stmts.items(): - for s in stmts: - stmt_snapshot.append(s.model_dump()) - snapshot.save_stage("2_statement_outputs", stmt_snapshot) - - triplet_results = stage_outputs.get("triplet_results", {}) - triplet_snapshot = {} - for _did, stmt_triplets in triplet_results.items(): - for stmt_id, t_out in stmt_triplets.items(): - triplet_snapshot[stmt_id] = t_out.model_dump() - snapshot.save_stage("3_triplet_outputs", triplet_snapshot) - - emotion_results = stage_outputs.get("emotion_results", {}) - emotion_snapshot = {} - for stmt_id, emo in emotion_results.items(): - if hasattr(emo, "model_dump"): - emotion_snapshot[stmt_id] = emo.model_dump() - snapshot.save_stage("4_emotion_outputs", emotion_snapshot) - - emb_output = stage_outputs.get("embedding_output") - if emb_output and hasattr(emb_output, "model_dump"): - emb_data = emb_output.model_dump() - for key in ( - "statement_embeddings", - "chunk_embeddings", - "entity_embeddings", - ): - if key in emb_data and isinstance(emb_data[key], dict): - emb_data[key] = { - k: v[:5] if isinstance(v, list) else v - for k, v in emb_data[key].items() - } - if "dialog_embeddings" in emb_data and isinstance( - emb_data["dialog_embeddings"], list - ): - emb_data["dialog_embeddings"] = [ - v[:5] if isinstance(v, list) else v - for v in emb_data["dialog_embeddings"] - ] - snapshot.save_stage("5_embedding_outputs", emb_data) + # ── Snapshot: 各阶段萃取结果 ── + recorder.record_stage_outputs(new_orchestrator.last_stage_outputs) # step2: 构建图节点和边 graph = await build_graph_nodes_and_edges( @@ -389,34 +356,8 @@ class WritePipeline: progress_callback=self.progress_callback, ) - # region Snapshot: 图节点和边(去重前)Snapshot有关的内容在重构流水线切换生产环境之后修改 - snapshot.save_stage( - "6_nodes_edges_before_dedup", - { - "dialogue_nodes_count": len(graph.dialogue_nodes), - "chunk_nodes_count": len(graph.chunk_nodes), - "statement_nodes_count": len(graph.statement_nodes), - "entity_nodes": [ - { - "id": e.id, - "name": e.name, - "entity_type": e.entity_type, - "description": e.description, - } - for e in graph.entity_nodes - ], - "entity_entity_edges": [ - { - "source": e.source, - "target": e.target, - "relation_type": e.relation_type, - "statement": e.statement, - } - for e in graph.entity_entity_edges - ], - "stmt_entity_edges_count": len(graph.stmt_entity_edges), - }, - ) + # Snapshot: 图节点和边(去重前) + recorder.record_graph_before_dedup(graph) # step3: 两阶段去重消歧 dedup_result = await run_dedup( @@ -432,29 +373,7 @@ class WritePipeline: ) # Snapshot: 去重后 - snapshot.save_stage( - "7_after_dedup", - { - "entity_nodes": [ - { - "id": e.id, - "name": e.name, - "entity_type": e.entity_type, - "description": e.description, - } - for e in dedup_result.entity_nodes - ], - "entity_entity_edges": [ - { - "source": e.source, - "target": e.target, - "relation_type": e.relation_type, - "statement": e.statement, - } - for e in dedup_result.entity_entity_edges - ], - }, - ) + recorder.record_dedup_result(dedup_result) # step4: 构造最终结果 result = ExtractionResult( @@ -474,7 +393,7 @@ class WritePipeline: dialog_data_list=dialog_data_list, ) - snapshot.save_summary(result.stats) # TODO 乐力齐 snapshot需要改 + recorder.record_summary(result.stats) return result # ────────────────────────────────────────────── @@ -551,7 +470,10 @@ class WritePipeline: 同时在内存中同步更新 ExtractionResult.entity_nodes,保持内存与 Neo4j 一致。 失败不中断主流程。 """ - from app.repositories.neo4j.cypher_queries import MERGE_ALIAS_BELONGS_TO + from app.repositories.neo4j.cypher_queries import ( + MERGE_ALIAS_BELONGS_TO, + REDIRECT_ALIAS_EDGES, + ) ALIAS_PREDICATE = "别名属于" @@ -571,12 +493,17 @@ class WritePipeline: # ── 1. 在内存中同步更新 entity_nodes ── entity_map = {e.id: e for e in result.entity_nodes} + # 构建 alias_id → target_id 映射(别名节点 → 用户节点) + alias_to_target: dict[str, str] = {} + for edge in alias_edges: source_node = entity_map.get(edge.source) target_node = entity_map.get(edge.target) if not source_node or not target_node: continue + alias_to_target[edge.source] = edge.target + # 将 source.name 追加到 target.aliases(去重,忽略大小写) source_name = (source_node.name or "").strip() if source_name: @@ -595,11 +522,36 @@ class WritePipeline: f"{tgt_desc};{src_desc}" if tgt_desc else src_desc ) + # ── 1.1 内存中重定向指向别名节点的边到用户节点 ── + alias_ids = set(alias_to_target.keys()) + redirected_ee_count = 0 + redirected_se_count = 0 + + # 重定向 entity_entity_edges(排除"别名属于"边本身) + for edge in result.entity_entity_edges: + rel_type = getattr(edge, "relation_type", "") + if rel_type == ALIAS_PREDICATE: + continue + if edge.source in alias_ids: + edge.source = alias_to_target[edge.source] + redirected_ee_count += 1 + if edge.target in alias_ids: + edge.target = alias_to_target[edge.target] + redirected_ee_count += 1 + + # 重定向 stmt_entity_edges(陈述句 → 实体边) + for edge in result.stmt_entity_edges: + if edge.target in alias_ids: + edge.target = alias_to_target[edge.target] + redirected_se_count += 1 + logger.info( - f"[AliasMerge] 内存同步完成,处理 {len(alias_edges)} 条 '别名属于' 边" + f"[AliasMerge] 内存同步完成,处理 {len(alias_edges)} 条 '别名属于' 边," + f"重定向 entity_entity 边 {redirected_ee_count} 次," + f"重定向 stmt_entity 边 {redirected_se_count} 次" ) - # ── 2. 写入 Neo4j ── + # ── 2. 写入 Neo4j:别名属性归并 ── records = await self._neo4j_connector.execute_query( MERGE_ALIAS_BELONGS_TO, end_user_id=self.end_user_id, @@ -607,6 +559,16 @@ class WritePipeline: merged_count = len(records) if records else 0 logger.info(f"[AliasMerge] Neo4j 别名归并完成,影响 {merged_count} 条记录") + # ── 3. 写入 Neo4j:重定向指向别名节点的边到用户节点 ── + redirect_records = await self._neo4j_connector.execute_query( + REDIRECT_ALIAS_EDGES, + end_user_id=self.end_user_id, + ) + redirect_count = len(redirect_records) if redirect_records else 0 + logger.info( + f"[AliasMerge] Neo4j 边重定向完成,影响 {redirect_count} 条记录" + ) + except Exception as e: logger.warning( f"[AliasMerge] 别名归并失败(不影响主流程): {e}", exc_info=True @@ -691,10 +653,10 @@ class WritePipeline: return # 快照目录:仅在 PIPELINE_SNAPSHOT_ENABLED=true 时非空,供 worker 端落盘 - snapshot = getattr(self, "_snapshot", None) + recorder = getattr(self, "_recorder", None) snapshot_dir = ( - snapshot.directory - if snapshot is not None and getattr(snapshot, "enabled", False) + recorder.snapshot_dir + if recorder is not None and recorder.enabled else None ) @@ -723,6 +685,67 @@ class WritePipeline: exc_info=True, ) + # ────────────────────────────────────────────── + # Step 3.6: 异步元数据提取 + # fire-and-forget 提交 Celery 任务,不阻塞主流程 + # ────────────────────────────────────────────── + + async def _extract_metadata(self, result: ExtractionResult) -> None: + """提交异步元数据提取 Celery 任务。 + + 从去重后的用户实体 description 中提取结构化元数据, + 异步回写到 Neo4j ExtractedEntity 节点。失败不影响主流程。 + """ + from app.core.memory.storage_services.extraction_engine.knowledge_extraction.metadata_extractor import ( + collect_user_entities_for_metadata, + ) + + user_entities = collect_user_entities_for_metadata(result.entity_nodes) + + if not user_entities: + return + + llm_model_id = ( + str(self.memory_config.llm_model_id) + if self.memory_config.llm_model_id + else None + ) + if not llm_model_id: + logger.warning("[Metadata] 无法提交元数据提取任务:llm_model_id 为空") + return + + # 快照目录 + recorder = getattr(self, "_recorder", None) + snapshot_dir = ( + recorder.snapshot_dir + if recorder is not None and recorder.enabled + else None + ) + + try: + from app.celery_app import celery_app + + task_result = celery_app.send_task( + "app.tasks.extract_metadata_batch", + kwargs={ + "user_entities": user_entities, + "llm_model_id": llm_model_id, + "language": self.language, + "snapshot_dir": snapshot_dir, + }, + ) + logger.info( + f"[Metadata] 异步元数据提取任务已提交 - " + f"task_id = {task_result.id}, " + f"entity_count = {len(user_entities)}, " + f"snapshot_dir = {snapshot_dir}" + ) + except Exception as e: + logger.error( + f"[Metadata] 提交元数据提取任务失败(不影响主流程): {e}", + exc_info=True, + ) + # ────────────────────────────────────────────── # Step 5: 摘要 # (+ entity_description)+ meta_data部分在此提取 diff --git a/api/app/core/memory/storage_services/extraction_engine/deduplication/deduped_and_disamb.py b/api/app/core/memory/storage_services/extraction_engine/deduplication/deduped_and_disamb.py index 980f5130..d2063e0f 100644 --- a/api/app/core/memory/storage_services/extraction_engine/deduplication/deduped_and_disamb.py +++ b/api/app/core/memory/storage_services/extraction_engine/deduplication/deduped_and_disamb.py @@ -117,12 +117,18 @@ def _merge_attribute(canonical: ExtractedEntityNode, ent: ExtractedEntityNode): except Exception: pass - # 描述与事实摘要(保留更长者) + # 描述合并(去重拼接,分号分隔) try: - desc_a = getattr(canonical, "description", "") or "" - desc_b = getattr(ent, "description", "") or "" - if len(desc_b) > len(desc_a): - canonical.description = desc_b + desc_a = (getattr(canonical, "description", "") or "").strip() + desc_b = (getattr(ent, "description", "") or "").strip() + if desc_b and desc_b != desc_a: + if desc_a: + # 将已有 description 按分号拆分,检查新 description 是否已存在 + existing_parts = {p.strip() for p in desc_a.replace(";", ";").split(";") if p.strip()} + if desc_b not in existing_parts: + canonical.description = f"{desc_a};{desc_b}" + else: + canonical.description = desc_b # 合并事实摘要:统一保留一个“实体: name”行,来源行去重保序 # TODO: fact_summary 功能暂时禁用,待后续开发完善后启用 # fact_a = getattr(canonical, "fact_summary", "") or "" diff --git a/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py b/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py index 4c3be0c6..358374ee 100644 --- a/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py +++ b/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py @@ -311,53 +311,8 @@ class ExtractionOrchestrator: dialog_data_list, ) - # 步骤 7: 触发异步元数据和别名提取(仅正式模式) - if not is_pilot_run: - try: - from app.core.memory.storage_services.extraction_engine.knowledge_extraction.metadata_extractor import ( - MetadataExtractor, - ) - - metadata_extractor = MetadataExtractor( - llm_client=self.llm_client, language=self.language - ) - user_statements = ( - metadata_extractor.collect_user_related_statements( - entity_nodes, statement_nodes, statement_entity_edges - ) - ) - if user_statements: - end_user_id = ( - dialog_data_list[0].end_user_id - if dialog_data_list - else None - ) - config_id = ( - dialog_data_list[0].config_id - if dialog_data_list - and hasattr(dialog_data_list[0], "config_id") - else None - ) - if end_user_id: - from app.tasks import extract_user_metadata_task - - extract_user_metadata_task.delay( - end_user_id=str(end_user_id), - statements=user_statements, - config_id=str(config_id) if config_id else None, - language=self.language, - ) - logger.info( - f"已触发异步元数据提取任务,共 {len(user_statements)} 条用户相关 statement" - ) - else: - logger.info("未找到用户相关 statement,跳过元数据提取") - except Exception as e: - logger.error( - f"触发元数据提取任务失败(不影响主流程): {e}", exc_info=True - ) - - # 别名同步已迁移到 Celery 元数据提取任务中,不再在此处执行 + # 步骤 7: 元数据提取已迁移到新流水线(WritePipeline._extract_metadata), + # 旧编排器不再触发异步元数据提取任务。 logger.info(f"知识提取流水线运行完成({mode_str})") return ( diff --git a/api/app/core/memory/storage_services/extraction_engine/extraction_pipeline_orchestrator.py b/api/app/core/memory/storage_services/extraction_engine/extraction_pipeline_orchestrator.py index 16082804..7fc6b053 100644 --- a/api/app/core/memory/storage_services/extraction_engine/extraction_pipeline_orchestrator.py +++ b/api/app/core/memory/storage_services/extraction_engine/extraction_pipeline_orchestrator.py @@ -25,7 +25,7 @@ from app.core.memory.models.variate_config import ExtractionPipelineConfig from .steps.base import ExtractionStep, StepContext from .steps.embedding_step import EmbeddingStep -from .steps.sidecar_factory import SidecarStepFactory, SidecarTiming +from .sidecar_factory import SidecarStepFactory, SidecarTiming from .steps.statement_temporal_step import StatementTemporalExtractionStep from .steps.triplet_step import TripletExtractionStep from .steps.schema import ( diff --git a/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/memory_summary.py b/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/memory_summary.py index 5e39ba36..9bba7e51 100644 --- a/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/memory_summary.py +++ b/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/memory_summary.py @@ -142,7 +142,7 @@ async def generate_title_and_type_for_summary( f"已归一化为 '{episodic_type}'" ) - logger.info(f"成功生成标题和类型 (language={language}): title={title}, type={episodic_type}") + logger.debug(f"成功生成标题和类型 (language={language}): title={title}, type={episodic_type}") return (title, episodic_type) except json.JSONDecodeError: @@ -197,7 +197,7 @@ async def _process_chunk_summary( llm_client=llm_client, language=language ) - logger.info(f"Generated title and type for MemorySummary (language={language}): title={title}, type={episodic_type}") + logger.debug(f"Generated title and type for MemorySummary (language={language}): title={title}, type={episodic_type}") except Exception as e: logger.warning(f"Failed to generate title and type for chunk {chunk.id}: {e}") # Continue without title and type diff --git a/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/metadata_extractor.py b/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/metadata_extractor.py index 29f4e85b..514a4ebc 100644 --- a/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/metadata_extractor.py +++ b/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/metadata_extractor.py @@ -1,176 +1,69 @@ """ -Metadata extractor module. +Metadata extractor utilities. -Collects user-related statements from post-dedup graph data and -extracts user metadata via an independent LLM call. +Provides helper functions for identifying user entities from post-dedup +graph data. The actual LLM extraction logic lives in MetadataExtractionStep. """ import logging -from typing import List, Optional +from typing import Dict, List -from app.core.memory.models.graph_models import ( - ExtractedEntityNode, - StatementEntityEdge, - StatementNode, -) +from app.core.memory.models.graph_models import ExtractedEntityNode logger = logging.getLogger(__name__) -# Reuse the same user-entity detection logic from dedup module -_USER_NAMES = {"用户", "我", "user", "i"} -_CANONICAL_USER_TYPE = "用户" +# 用户实体判定常量 +USER_NAMES = {"用户", "我", "user", "i"} +CANONICAL_USER_TYPE = "用户" -def _is_user_entity(ent: ExtractedEntityNode) -> bool: - """判断实体是否为用户实体""" - name = (getattr(ent, "name", "") or "").strip().lower() - etype = (getattr(ent, "entity_type", "") or "").strip() - return name in _USER_NAMES or etype == _CANONICAL_USER_TYPE +def is_user_entity(entity: ExtractedEntityNode) -> bool: + """判断实体是否为用户实体。""" + name = (getattr(entity, "name", "") or "").strip().lower() + etype = (getattr(entity, "entity_type", "") or "").strip() + return name in USER_NAMES or etype == CANONICAL_USER_TYPE -class MetadataExtractor: - """Extracts user metadata from post-dedup graph data via independent LLM call.""" +def collect_user_entities_for_metadata( + entity_nodes: List[ExtractedEntityNode], +) -> List[Dict]: + """从去重后的实体列表中筛选用户实体,构造元数据提取的输入。 - def __init__(self, llm_client, language: Optional[str] = None): - self.llm_client = llm_client - self.language = language + 将每个用户实体的 description 按分号拆分为列表, + 作为 Celery 异步元数据提取任务的输入。 - @staticmethod - def detect_language(statements: List[str]) -> str: - """根据 statement 文本内容检测语言。 - 如果文本中包含中文字符则返回 "zh",否则返回 "en"。 - """ - import re + Args: + entity_nodes: 去重后的实体节点列表 - combined = " ".join(statements) - if re.search(r"[\u4e00-\u9fff]", combined): - return "zh" - return "en" + Returns: + 用户实体字典列表,每项包含 entity_id、entity_name、descriptions + """ + user_entities = [] + for entity in entity_nodes: + if not is_user_entity(entity): + continue - def collect_user_related_statements( - self, - entity_nodes: List[ExtractedEntityNode], - statement_nodes: List[StatementNode], - statement_entity_edges: List[StatementEntityEdge], - ) -> List[str]: - """ - 从去重后的数据中筛选与用户直接相关且由用户发言的 statement 文本。 + desc = (getattr(entity, "description", "") or "").strip() + if not desc: + continue - 筛选逻辑: - 1. 用户实体 → StatementEntityEdge → statement(直接关联) - 2. 只保留 speaker="user" 的 statement(过滤 assistant 回复的噪声) - - Returns: - 用户发言的 statement 文本列表 - """ - # Find user entity IDs - user_entity_ids = set() - for ent in entity_nodes: - if _is_user_entity(ent): - user_entity_ids.add(ent.id) - - if not user_entity_ids: - logger.debug("未找到用户实体节点,跳过 statement 收集") - return [] - - # 用户实体 → StatementEntityEdge → statement - target_stmt_ids = set() - for edge in statement_entity_edges: - if edge.target in user_entity_ids: - target_stmt_ids.add(edge.source) - - # Collect: only speaker="user" statements, preserving order - result = [] - seen = set() - total_associated = 0 - skipped_non_user = 0 - for stmt_node in statement_nodes: - if stmt_node.id in target_stmt_ids and stmt_node.id not in seen: - total_associated += 1 - speaker = getattr(stmt_node, "speaker", None) or "unknown" - if speaker == "user": - text = (stmt_node.statement or "").strip() - if text: - result.append(text) - else: - skipped_non_user += 1 - seen.add(stmt_node.id) + # 将分号分隔的 description 拆分为列表 + descriptions = [ + d.strip() for d in desc.replace(";", ";").split(";") + if d.strip() + ] + if descriptions: + user_entities.append({ + "entity_id": entity.id, + "entity_name": entity.name, + "descriptions": descriptions, + }) + if user_entities: logger.info( - f"收集到 {len(result)} 条用户发言 statement " - f"(直接关联: {total_associated}, speaker=user: {len(result)}, " - f"跳过非user: {skipped_non_user})" + f"收集到 {len(user_entities)} 个用户实体用于元数据提取" ) - if result: - for i, text in enumerate(result): - logger.info(f" [user statement {i + 1}] {text}") - if total_associated > 0 and len(result) == 0: - logger.warning( - f"有 {total_associated} 条直接关联 statement 但全部被 speaker 过滤," - f"可能本次写入不包含 user 消息" - ) - return result + else: + logger.debug("未找到用户实体,跳过元数据提取") - async def extract_metadata( - self, - statements: List[str], - existing_metadata: Optional[dict] = None, - existing_aliases: Optional[List[str]] = None, - ) -> Optional[tuple]: - """ - 对筛选后的 statement 列表调用 LLM 提取元数据增量变更和用户别名。 - - Args: - statements: 用户发言的 statement 文本列表 - existing_metadata: 数据库已有的元数据(可选) - existing_aliases: 数据库已有的用户别名列表(可选) - - Returns: - (List[MetadataFieldChange], List[str], List[str]) tuple: - (metadata_changes, aliases_to_add, aliases_to_remove) on success, None on failure - """ - if not statements: - return None - - try: - from app.core.memory.utils.prompt.prompt_utils import prompt_env - - if self.language: - detected_language = self.language - logger.info(f"元数据提取使用显式指定语言: {detected_language}") - else: - detected_language = self.detect_language(statements) - logger.info(f"元数据提取语言自动检测结果: {detected_language}") - - template = prompt_env.get_template("extract_user_metadata.jinja2") - prompt = template.render( - statements=statements, - language=detected_language, - existing_metadata=existing_metadata, - existing_aliases=existing_aliases, - json_schema="", - ) - - from app.core.memory.models.metadata_models import ( - MetadataExtractionResponse, - ) - - response = await self.llm_client.response_structured( - messages=[{"role": "user", "content": prompt}], - response_model=MetadataExtractionResponse, - ) - - if response: - changes = response.metadata_changes if response.metadata_changes else [] - to_add = response.aliases_to_add if response.aliases_to_add else [] - to_remove = ( - response.aliases_to_remove if response.aliases_to_remove else [] - ) - return changes, to_add, to_remove - - logger.warning("LLM 返回的响应为空") - return None - - except Exception as e: - logger.error(f"元数据提取 LLM 调用失败: {e}", exc_info=True) - return None + return user_entities diff --git a/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/ontology_extraction.py b/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/ontology_extraction.py index 4bf44e07..ebe5fac5 100644 --- a/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/ontology_extraction.py +++ b/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/ontology_extraction.py @@ -51,7 +51,7 @@ class OntologyExtractor: self.validator = OntologyValidator() self.owl_validator = OWLValidator() - logger.info("OntologyExtractor initialized") + logger.debug("OntologyExtractor initialized") async def extract_ontology_classes( self, diff --git a/api/app/core/memory/storage_services/extraction_engine/steps/sidecar_factory.py b/api/app/core/memory/storage_services/extraction_engine/sidecar_factory.py similarity index 98% rename from api/app/core/memory/storage_services/extraction_engine/steps/sidecar_factory.py rename to api/app/core/memory/storage_services/extraction_engine/sidecar_factory.py index 2f652ee6..2f48a0c7 100644 --- a/api/app/core/memory/storage_services/extraction_engine/steps/sidecar_factory.py +++ b/api/app/core/memory/storage_services/extraction_engine/sidecar_factory.py @@ -9,7 +9,7 @@ import logging from enum import Enum from typing import Any, Dict, List, Tuple, Type -from .base import ExtractionStep, StepContext +from .steps.base import ExtractionStep, StepContext logger = logging.getLogger(__name__) diff --git a/api/app/core/memory/storage_services/extraction_engine/steps/__init__.py b/api/app/core/memory/storage_services/extraction_engine/steps/__init__.py index cbd7b742..767f1a08 100644 --- a/api/app/core/memory/storage_services/extraction_engine/steps/__init__.py +++ b/api/app/core/memory/storage_services/extraction_engine/steps/__init__.py @@ -4,7 +4,7 @@ Importing this package triggers @register decorator self-registration for all sidecar (non-critical) steps via SidecarStepFactory. """ -from .sidecar_factory import SidecarStepFactory, SidecarTiming # noqa: F401 +from ..sidecar_factory import SidecarStepFactory, SidecarTiming # noqa: F401 # Step implementations — importing triggers @register self-registration. from .statement_temporal_step import StatementTemporalExtractionStep # noqa: F401 diff --git a/api/app/core/memory/storage_services/extraction_engine/steps/emotion_step.py b/api/app/core/memory/storage_services/extraction_engine/steps/emotion_step.py index 5dab791d..b267c2d9 100644 --- a/api/app/core/memory/storage_services/extraction_engine/steps/emotion_step.py +++ b/api/app/core/memory/storage_services/extraction_engine/steps/emotion_step.py @@ -12,7 +12,7 @@ from app.core.memory.models.emotion_models import EmotionExtraction from app.core.memory.utils.prompt.prompt_utils import render_emotion_extraction_prompt from .base import ExtractionStep, StepContext -from .sidecar_factory import SidecarStepFactory, SidecarTiming +from ..sidecar_factory import SidecarStepFactory, SidecarTiming from .schema import EmotionStepInput, EmotionStepOutput logger = logging.getLogger(__name__) diff --git a/api/app/core/memory/storage_services/extraction_engine/steps/graph_build_step.py b/api/app/core/memory/storage_services/extraction_engine/steps/graph_build_step.py index 33791f77..6a73e715 100644 --- a/api/app/core/memory/storage_services/extraction_engine/steps/graph_build_step.py +++ b/api/app/core/memory/storage_services/extraction_engine/steps/graph_build_step.py @@ -308,6 +308,7 @@ async def build_graph_nodes_and_edges( object_entity_id = entity_idx_to_id.get(triplet.object_id) if subject_entity_id and object_entity_id: + _tv = getattr(statement, "temporal_validity", None) entity_entity_edges.append( EntityEntityEdge( source=subject_entity_id, @@ -320,6 +321,8 @@ async def build_graph_nodes_and_edges( run_id=dialog_data.run_id, created_at=dialog_data.created_at, expired_at=dialog_data.expired_at, + valid_at=_tv.valid_at if _tv else None, + invalid_at=_tv.invalid_at if _tv else None, ) ) diff --git a/api/app/core/memory/storage_services/extraction_engine/steps/metadata_step.py b/api/app/core/memory/storage_services/extraction_engine/steps/metadata_step.py new file mode 100644 index 00000000..f3d37ffd --- /dev/null +++ b/api/app/core/memory/storage_services/extraction_engine/steps/metadata_step.py @@ -0,0 +1,89 @@ +"""MetadataExtractionStep — 用户实体元数据提取 step。 + +从用户实体的 description 中提取结构化元数据(core_facts、traits、relations 等), +通过 Celery 异步任务在去重消歧完成后执行,结果回写到 Neo4j ExtractedEntity 节点。 + +不注册为 SidecarStepFactory 的自动旁路(因为它在去重后异步执行,不在主萃取流程中), +而是由 Celery 任务直接实例化调用。 +""" + +import json +import logging +from typing import Any + +from .base import ExtractionStep, StepContext +from .schema import MetadataStepInput, MetadataStepOutput + +logger = logging.getLogger(__name__) + + +class MetadataExtractionStep(ExtractionStep[MetadataStepInput, MetadataStepOutput]): + """从用户实体 description 中提取结构化元数据。 + + 非 critical step — 失败返回空默认值,不中断流程。 + """ + + def __init__(self, context: StepContext) -> None: + super().__init__(context) + + @property + def name(self) -> str: + return "metadata_extraction" + + @property + def is_critical(self) -> bool: + return False + + @property + def max_retries(self) -> int: + return 1 + + async def render_prompt(self, input_data: MetadataStepInput) -> str: + """使用 Jinja2 模板渲染元数据提取 prompt。""" + from app.core.memory.utils.prompt.prompt_utils import prompt_env + + template = prompt_env.get_template("extract_user_metadata.jinja2") + + input_json = json.dumps( + { + "description": input_data.descriptions, + "existing_metadata": input_data.existing_metadata, + }, + ensure_ascii=False, + indent=2, + ) + + return template.render( + language=self.language, + input_json=input_json, + ) + + async def call_llm(self, prompt: Any) -> Any: + """调用 LLM 进行结构化输出。""" + from app.core.memory.models.metadata_models import MetadataExtractionResponse + + messages = [{"role": "user", "content": prompt}] + return await self.llm_client.response_structured( + messages, MetadataExtractionResponse + ) + + async def parse_response( + self, raw_response: Any, input_data: MetadataStepInput + ) -> MetadataStepOutput: + """将 LLM 响应解析为 MetadataStepOutput。""" + if raw_response is None: + return self.get_default_output() + + return MetadataStepOutput( + core_facts=getattr(raw_response, "core_facts", []) or [], + traits=getattr(raw_response, "traits", []) or [], + relations=getattr(raw_response, "relations", []) or [], + goals=getattr(raw_response, "goals", []) or [], + interests=getattr(raw_response, "interests", []) or [], + beliefs_or_stances=getattr(raw_response, "beliefs_or_stances", []) or [], + anchors=getattr(raw_response, "anchors", []) or [], + events=getattr(raw_response, "events", []) or [], + ) + + def get_default_output(self) -> MetadataStepOutput: + return MetadataStepOutput() diff --git a/api/app/core/memory/storage_services/extraction_engine/steps/schema/__init__.py b/api/app/core/memory/storage_services/extraction_engine/steps/schema/__init__.py index 0223b860..98351785 100644 --- a/api/app/core/memory/storage_services/extraction_engine/steps/schema/__init__.py +++ b/api/app/core/memory/storage_services/extraction_engine/steps/schema/__init__.py @@ -19,6 +19,8 @@ from .extraction_step_schema import ( from .sidecar_step_schema import ( EmotionStepInput, EmotionStepOutput, + MetadataStepInput, + MetadataStepOutput, ) __all__ = [ @@ -39,4 +41,7 @@ __all__ = [ # Sidecar — Emotion "EmotionStepInput", "EmotionStepOutput", + # Sidecar — Metadata + "MetadataStepInput", + "MetadataStepOutput", ] diff --git a/api/app/core/memory/storage_services/extraction_engine/steps/schema/sidecar_step_schema.py b/api/app/core/memory/storage_services/extraction_engine/steps/schema/sidecar_step_schema.py index 78cb0982..a79a99bf 100644 --- a/api/app/core/memory/storage_services/extraction_engine/steps/schema/sidecar_step_schema.py +++ b/api/app/core/memory/storage_services/extraction_engine/steps/schema/sidecar_step_schema.py @@ -24,3 +24,39 @@ class EmotionStepOutput(BaseModel): emotion_type: str = "neutral" emotion_intensity: float = 0.0 emotion_keywords: List[str] = Field(default_factory=list) + + +# ── Metadata extraction (async post-dedup) ── +class MetadataStepInput(BaseModel): + """Input for MetadataExtractionStep.""" + + entity_id: str + entity_name: str + descriptions: List[str] = Field( + default_factory=list, + description="用户实体的 description 列表(可能由分号分隔拆分而来)", + ) + existing_metadata: dict = Field( + default_factory=dict, + description="Neo4j 中已有的元数据,用于增量去重", + ) + + +class MetadataStepOutput(BaseModel): + """Output of MetadataExtractionStep.""" + + core_facts: List[str] = Field(default_factory=list) + traits: List[str] = Field(default_factory=list) + relations: List[str] = Field(default_factory=list) + goals: List[str] = Field(default_factory=list) + interests: List[str] = Field(default_factory=list) + beliefs_or_stances: List[str] = Field(default_factory=list) + anchors: List[str] = Field(default_factory=list) + events: List[str] = Field(default_factory=list) + + def has_any(self) -> bool: + """是否提取到了任何元数据。""" + return any([ + self.core_facts, self.traits, self.relations, self.goals, + self.interests, self.beliefs_or_stances, self.anchors, self.events, + ]) diff --git a/api/app/core/memory/utils/prompt/prompts/extract_user_metadata.jinja2 b/api/app/core/memory/utils/prompt/prompts/extract_user_metadata.jinja2 index 1c32d369..35b52f16 100644 --- a/api/app/core/memory/utils/prompt/prompts/extract_user_metadata.jinja2 +++ b/api/app/core/memory/utils/prompt/prompts/extract_user_metadata.jinja2 @@ -1,140 +1,616 @@ ===Task=== -Extract user metadata changes from the following conversation statements spoken by the user. - {% if language == "zh" %} -**"三度原则"判断标准:** -- 复用度:该信息是否会被多个功能模块使用? -- 约束度:该信息是否会影响系统行为? -- 时效性:该信息是长期稳定的还是临时的?仅提取长期稳定信息。 +你是一个用户画像 metadata 增量提取助手。你的任务是根据输入的用户 `description` 列表,提取值得长期保留、适合挂在“用户节点”下的新增 metadata。 -**提取规则:** -- **只提取关于"用户本人"的画像信息**,忽略用户提到的第三方人物(如朋友、同事、家人)的信息 -- 仅提取文本中明确提到的信息,不要推测 -- **输出语言必须与输入文本的语言一致**(输入中文则输出中文值,输入英文则输出英文值) +你会同时收到: -**增量模式(重要):** -你只需要输出**本次对话引起的变更操作**,不要输出完整的元数据。每个变更是一个对象,包含: -- `field_path`:字段路径,用点号分隔(如 `profile.role`、`profile.expertise`) -- `action`:操作类型 - * `set`:新增或修改一个字段的值 - * `remove`:移除一个字段的值 -- `value`:字段的新值(`action="set"` 时必填,`action="remove"` 时填要移除的元素值) - * 所有字段均为列表类型,每个元素一条变更记录 +- `description`: 一组待分析的描述字符串 +- `existing_metadata`: 用户当前已经存在的 metadata -**判断规则:** -- 用户提到新信息 → `action="set"`,填入新值 -- 用户明确否定已有信息(如"我不再做老师了"、"我已经不学Python了")→ `action="remove"`,`value` 填要移除的元素值 -- 如果本次对话没有任何可提取的变更,返回空的 `metadata_changes` 数组 `[]` -- **不要为未被提及的字段生成任何变更操作** +你的目标不是重建完整 metadata,而是只输出“新增内容”: -{% if existing_metadata %} -**已有元数据(仅供参考,用于判断是否需要变更):** -请对比已有数据和用户最新发言,只输出差异部分的变更操作。 -- 如果用户说的信息和已有数据一致,不需要输出变更 -- 如果用户否定了已有数据中的某个值,输出 `remove` 操作 -- 如果用户提到了新信息,输出 `set` 操作 -{% endif %} +- 只能输出从 `description` 中能够支持的新增 metadata +- 不要重复输出已经出现在 `existing_metadata` 里的内容 +- 不允许修改、重写、删除或纠正已有 metadata +- 所有字段一律输出为字符串数组 + {% else %} + You are an assistant for incremental user metadata extraction. Your task is to extract durable, user-node-level new metadata from the input `description` list. -**字段说明:** -- profile.role:用户的职业或角色(列表),如 教师、医生、后端工程师,一个人可以有多个角色 -- profile.domain:用户所在领域(列表),如 教育、医疗、软件开发,一个人可以涉及多个领域 -- profile.expertise:用户擅长的技能或工具(列表),如 Python、心理咨询、高中物理 -- profile.interests:用户主动表达兴趣的话题或领域标签(列表) +You will receive: -**用户别名变更(增量模式):** -- **aliases_to_add**:本次新发现的用户别名,包括: - * 用户主动自我介绍:如"我叫张三"、"我的名字是XX"、"我的网名是XX" - * 他人对用户的称呼:如"同事叫我陈哥"、"大家叫我小张"、"领导叫我老陈" - * 只提取原文中逐字出现的名字,严禁推测或创造 - * 禁止提取:用户给 AI 取的名字、第三方人物自身的名字、"用户"/"我" 等占位词 - * 如果没有新别名,返回空数组 `[]` -- **aliases_to_remove**:用户明确否认的别名,包括: - * 用户说"我不叫XX了"、"别叫我XX"、"我改名了,不叫XX" → 将 XX 放入此数组 - * **严格限制**:只将用户原文中**逐字提到**的被否认名字放入,不要推断关联的其他别名 - * 如果没有要移除的别名,返回空数组 `[]` -{% if existing_aliases %} -- 已有别名:{{ existing_aliases | tojson }}(仅供参考,不需要在输出中重复) -{% endif %} -{% else %} -**"Three-Degree Principle" criteria:** -- Reusability: Will this information be used by multiple functional modules? -- Constraint: Will this information affect system behavior? -- Timeliness: Is this information long-term stable or temporary? Only extract long-term stable information. +- `description`: a list of descriptions to analyze +- `existing_metadata`: the user's existing metadata -**Extraction rules:** -- **Only extract profile information about the user themselves**, ignore information about third parties (friends, colleagues, family) mentioned by the user -- Only extract information explicitly mentioned in the text, do not speculate -- **Output language must match the input text language** +Your goal is not to rebuild the full metadata. You must output only new metadata: -**Incremental mode (important):** -You should only output **the change operations caused by this conversation**, not the complete metadata. Each change is an object containing: -- `field_path`: Field path separated by dots (e.g. `profile.role`, `profile.expertise`) -- `action`: Operation type - * `set`: Add or update a field value - * `remove`: Remove a field value -- `value`: The new value for the field (required when `action="set"`, for `action="remove"` fill in the element value to remove) - * All fields are list types, one change record per element +- Output only metadata supported by `description` +- Do not repeat anything already present in `existing_metadata` +- Do not modify, rewrite, delete, or correct existing metadata +- Every field must be an array of strings + {% endif %} -**Decision rules:** -- User mentions new information → `action="set"`, fill in the new value -- User explicitly negates existing info (e.g. "I'm no longer a teacher", "I stopped learning Python") → `action="remove"`, `value` is the element to remove -- If this conversation has no extractable changes, return an empty `metadata_changes` array `[]` -- **Do NOT generate any change operations for fields not mentioned in the conversation** +===Inputs=== +{% if language == "zh" %} +输入 JSON 包含以下字段: -{% if existing_metadata %} -**Existing metadata (for reference only, to determine if changes are needed):** -Compare existing data with the user's latest statements, and only output change operations for the differences. -- If the user's statement matches existing data, no change is needed -- If the user negates a value in existing data, output a `remove` operation -- If the user mentions new information, output a `set` operation -{% endif %} +- `description`: 字符串数组,表示关于用户的一组描述 +- `existing_metadata`: 现有 metadata 对象,字段固定为: + - `aliases` + - `core_facts` + - `traits` + - `relations` + - `goals` + - `interests` + - `beliefs_or_stances` + - `anchors` + - `events` + {% else %} + The input JSON contains: +- `description`: an array of strings describing the user +- `existing_metadata`: an existing metadata object with these fixed fields: + - `aliases` + - `core_facts` + - `traits` + - `relations` + - `goals` + - `interests` + - `beliefs_or_stances` + - `anchors` + - `events` + {% endif %} -**Field descriptions:** -- profile.role: User's occupation or role (list), e.g. teacher, doctor, software engineer. A person can have multiple roles -- profile.domain: User's domain (list), e.g. education, healthcare, software development. A person can span multiple domains -- profile.expertise: User's skills or tools (list), e.g. Python, counseling, physics -- profile.interests: Topics or domain tags the user actively expressed interest in (list) +Input JSON: -**User alias changes (incremental mode):** -- **aliases_to_add**: Newly discovered user aliases from this conversation, including: - * User self-introductions: e.g. "I'm John", "My name is XX", "My username is XX" - * How others address the user: e.g. "My colleagues call me Johnny", "People call me Mike" - * Only extract names that appear VERBATIM in the text — never infer or fabricate - * Do NOT extract: names the user gives to the AI, third-party people's own names, placeholder words like "User"/"I" - * If no new aliases, return empty array `[]` -- **aliases_to_remove**: Aliases the user explicitly denies, including: - * User says "Don't call me XX anymore", "I'm not called XX", "I changed my name from XX" → put XX in this array - * **Strict rule**: Only include the exact name the user **verbatim mentions** as denied. Do NOT infer or remove related aliases - * If no aliases to remove, return empty array `[]` -{% if existing_aliases %} -- Existing aliases: {{ existing_aliases | tojson }} (for reference only, do not repeat in output) -{% endif %} -{% endif %} - -===User Statements=== -{% for stmt in statements %} -- {{ stmt }} -{% endfor %} - -{% if existing_metadata %} -===Existing User Metadata=== ```json -{{ existing_metadata | tojson }} +{{ input_json | default("{}") }} ``` + +===Field Definitions=== +{% if language == "zh" %} + +- `aliases` + - 用户的别名、昵称、称呼、英文名、稳定使用的另一个名字 +- `core_facts` + - 用户相对稳定的基础事实,如身份、年龄、国籍、所在地、关系状态、家庭状态、长期背景 +- `traits` + - 用户相对稳定的人格特质、风格、气质、行为倾向 +- `relations` + - 用户与他人/群体/宠物/重要对象之间值得长期记忆的关系 + - 保持字符串格式,可包含多个片段,常见格式如 `对象 | 关系/身份 | 补充信息` +- `goals` + - 用户明确、稳定、值得长期保留的人生目标、长期计划、持续追求 +- `interests` + - 用户稳定的兴趣、偏好、长期爱好 +- `beliefs_or_stances` + - 用户稳定的信念、价值立场、政治/宗教/社会议题立场 +- `anchors` + - 对用户有长期意义的物品、收藏、纪念物、象征物 + - 保持字符串格式,可包含多个片段,常见格式如 `对象 | 来源/关联 | 意义` +- `events` + - 对用户画像有长期价值的个人经历、事件、里程碑 + - 保持字符串格式,可包含多个片段,常见格式如 `事件 | 时间 | 补充说明` + {% else %} +- `aliases` + - aliases, nicknames, stable alternative names, English names, or regular forms of address +- `core_facts` + - stable basic facts such as identity, age, nationality, residence, relationship status, family status, or long-term background +- `traits` + - stable personality traits, style, temperament, or behavioral tendencies +- `relations` + - durable relationships between the user and people/groups/pets/important entities + - keep string format; common pattern: `entity | relation/identity | extra info` +- `goals` + - explicit, stable, long-term goals or ongoing pursuits worth remembering +- `interests` + - stable interests, preferences, or hobbies +- `beliefs_or_stances` + - stable beliefs, values, political/religious/social stances +- `anchors` + - personally meaningful objects, collections, keepsakes, or symbols + - keep string format; common pattern: `object | source/association | meaning` +- `events` + - durable personal experiences, milestones, or events worth preserving + - keep string format; common pattern: `event | time | extra note` + {% endif %} + +===Core Principles=== +{% if language == "zh" %} + +1. 只提取新增内容 + +- 如果某条信息已经在 `existing_metadata` 中出现,不能再次输出 +- 即使 `description` 只是换了一种说法表达已有信息,也不要重复输出 +- 如果只是对已有信息做轻微改写、近义改写、语序调整,也视为重复 + +2. 不修改已有内容 + +- 不要纠正已有 metadata 的措辞 +- 不要补全已有 metadata 的结构 +- 不要把已有 metadata 中的短字符串改写成更长版本后再输出 +- 不要因为 `description` 出现了更精确表达,就把已有内容“升级后重新输出” + +3. 只保留对用户画像有长期价值的信息 + +- 优先提取稳定身份、长期偏好、重要关系、重大目标、长期立场、重要锚点、关键事件 +- 不要提取纯闲聊、瞬时感受、一次性很弱的细节 +- 短暂情绪通常不单独提取,除非它是某个重要事件说明的一部分 + +4. 所有字段都必须是字符串数组 + +- 不允许输出对象数组 +- 不允许输出嵌套结构 +- 不允许把 `events` 拆成 event/time/note 对象 +- 不允许把 `relations` 拆成 subject/relation/object 对象 + +5. 可以保留多段信息在一个字符串里 + +- `relations`、`anchors`、`events` 可以使用 `|` 连接多个片段 +- 只有在确实有助于保留结构时才这样做 +- 不必强行补满固定片段数,宁可简洁准确 + +6. 证据边界 + +- 只能依据 `description` 提取新增 metadata +- `existing_metadata` 只用于去重和分类参考,不是新增内容来源 +- 不要从常识、推测或世界知识补充额外信息 + {% else %} + +1. Extract only new content + +- If something already appears in `existing_metadata`, do not output it again +- If a description merely paraphrases existing metadata, do not output it +- Minor wording changes, synonym swaps, or reordered phrasing still count as duplicates + +2. Do not modify existing content + +- Do not correct wording in existing metadata +- Do not expand existing metadata and re-output it +- Do not upgrade an existing item into a more detailed version and emit it as new + +3. Keep only durable user-profile information + +- Prioritize stable identity, long-term preferences, important relationships, major goals, durable stances, meaningful anchors, and key events +- Exclude casual chatter, fleeting states, and weak one-off details +- Temporary emotions should usually not be extracted unless they are part of an important event description + +4. Every field must be an array of strings + +- No object arrays +- No nested structure +- Do not split `events` into event/time/note objects +- Do not split `relations` into structured triples + +5. Multi-part strings are allowed + +- `relations`, `anchors`, and `events` may use `|` to join parts +- Do this only when it helps preserve useful structure +- Do not force a fixed number of parts + +6. Evidence boundary + +- Extract new metadata only from `description` +- Use `existing_metadata` only for deduplication and category reference +- Do not add unsupported information from world knowledge or inference beyond the text + {% endif %} + +===Deduplication Rules=== +{% if language == "zh" %} + +- 先理解 `description` 想表达的含义,再与 `existing_metadata` 做语义去重 +- 若以下任一情况成立,则视为“已存在”,不要输出: + - 完全相同 + - 近义表达 + - 更长或更短但语义相同 + - 只是把已有多段字符串拆开或重新组合 + - 只是把已有事件/关系中的时间或备注略作改写 +- 去重标准以“是否新增了值得保留的新事实”为准,而不是字面是否完全一致 + +去重示例: + +- 已有 `single`,新描述说 `not in a relationship`,不要输出 +- 已有 `from Sweden`,新描述说 `originally from Sweden`,不要输出 +- 已有 `art`,新描述说 `likes art a lot`,通常不要输出 +- 已有 `Oscar | pet guinea pig`,新描述说 `her guinea pig Oscar`,不要输出 + {% else %} +- First understand the meaning of the description, then deduplicate semantically against `existing_metadata` +- Treat an item as already existing if any of these holds: + - exact match + - close paraphrase + - longer or shorter wording with the same meaning + - just a split or recombination of an existing multi-part string + - a lightly reworded time/note variant of an existing event or relation +- The test is whether the item adds a genuinely new durable fact, not whether the wording is different + {% endif %} + +===Extraction Guidance By Field=== +{% if language == "zh" %} +`aliases` + +- 只收稳定名字,不收临时调侃 +- 职业、身份、评价词不算 alias + +`core_facts` + +- 放稳定基础事实 +- 不要放短暂状态、一次性动作、弱情绪 + +`traits` + +- 只收相对稳定的人格或行为风格 +- 不要因为一次行为就推断 trait + +`relations` + +- 只保留长期关系、有记忆价值的关系 +- 可以写成 `对象 | 关系/身份 | 补充信息` +- 不要收纯一次性互动 + +`goals` + +- 只收长期目标 +- 不要把一时愿望、泛化口号、普通期待当作 goal + +`interests` + +- 只收稳定兴趣 +- 短期尝试一次某事,通常不算 interest + +`beliefs_or_stances` + +- 收稳定信念、价值观、政治/宗教/社会议题立场 +- 不要收普通瞬时意见 + +`anchors` + +- 收具有象征意义、纪念意义、长期陪伴意义的对象 +- 可写来源与意义 + +`events` + +- 只收对用户画像有长期价值的事件或里程碑 +- 优先保留时间信息和事件意义 +- 普通日常小事通常不收,除非它明显揭示重要关系、目标推进或身份背景 + {% else %} + `aliases` +- only stable names, not playful one-off labels +- occupations, identities, and evaluations are not aliases + +`core_facts` + +- keep stable background facts +- exclude temporary states, one-off actions, and weak emotions + +`traits` + +- only relatively stable traits or behavioral style +- do not infer a trait from one isolated action + +`relations` + +- keep durable, memory-worthy relationships +- may use `entity | relation/identity | extra info` +- exclude one-off interactions + +`goals` + +- only long-term goals +- do not treat temporary wishes or generic aspirations as goals + +`interests` + +- only stable interests +- a one-time attempt usually does not qualify + +`beliefs_or_stances` + +- keep stable beliefs, values, or social/political/religious stances +- exclude ordinary fleeting opinions + +`anchors` + +- keep symbolic, commemorative, or personally meaningful objects +- source and meaning may be included + +`events` + +- keep only events or milestones with durable profile value +- preserve time and significance when useful +- exclude ordinary daily trivia unless it clearly advances an important goal, relationship, or identity arc + {% endif %} + +===Output Hard Constraints=== +{% if language == "zh" %} + +- 只输出新增 metadata,不要输出完整 metadata +- 结果必须包含全部 9 个字段 +- 每个字段都必须是数组 +- 即使某字段没有新增内容,也必须输出空数组 +- 每个数组元素必须是字符串 +- 不要输出 `null` +- 不要输出解释文字 +- 不要输出 markdown code fence +- 不要输出字段之外的任何额外键 +- 如果没有任何新增 metadata,也必须返回所有字段都为空数组的 JSON + {% else %} +- Output only new metadata, not the full metadata +- The result must include all 9 fields +- Every field must be an array +- Use empty arrays when there is no new content +- Every array element must be a string +- Do not output `null` +- Do not output explanation text +- Do not wrap the result in markdown code fences +- Do not output any keys beyond the required fields +- If there is no new metadata, still return the full JSON shape with empty arrays + {% endif %} + +===Examples=== +{% if language == "zh" %} +示例 1 +Input: + +- description: + - "She recently started volunteering for a trans youth hotline." +- existing_metadata: + - goals: ["pursue counseling / mental health work for transgender people"] + +Output: +{ + "aliases": [], + "core_facts": [], + "traits": [], + "relations": [], + "goals": [], + "interests": [], + "beliefs_or_stances": [], + "anchors": [], + "events": [ + "started volunteering for a trans youth hotline" + ] +} + +示例 2 +Input: + +- description: + - "She is originally from Sweden." + - "She is not dating anyone right now." +- existing_metadata: + - core_facts: ["from Sweden", "single"] + +Output: +{ + "aliases": [], + "core_facts": [], + "traits": [], + "relations": [], + "goals": [], + "interests": [], + "beliefs_or_stances": [], + "anchors": [], + "events": [] +} + +示例 3 +Input: + +- description: + - "Her sister Mia encouraged her to apply." +- existing_metadata: + - relations: ["grandma | grandmother | from Sweden"] + +Output: +{ + "aliases": [], + "core_facts": [], + "traits": [], + "relations": [ + "Mia | sister" + ], + "goals": [], + "interests": [], + "beliefs_or_stances": [], + "anchors": [], + "events": [] +} + +示例 4 +Input: + +- description: + - "She keeps a journal from her first year after moving." +- existing_metadata: + - anchors: [] + +Output: +{ + "aliases": [], + "core_facts": [], + "traits": [], + "relations": [], + "goals": [], + "interests": [], + "beliefs_or_stances": [], + "anchors": [ + "journal | from first year after moving" + ], + "events": [] +} + +示例 5 +Input: + +- description: + - "Last month she attended a workshop on trauma-informed care and felt it clarified her future direction." +- existing_metadata: + - goals: ["pursue counseling / mental health work for transgender people"] + +Output: +{ + "aliases": [], + "core_facts": [], + "traits": [], + "relations": [], + "goals": [], + "interests": [], + "beliefs_or_stances": [], + "anchors": [], + "events": [ + "attended workshop on trauma-informed care | last month | clarified future direction" + ] +} +{% else %} +Example 1 +Input: + +- description: + - "She recently started volunteering for a trans youth hotline." +- existing_metadata: + - goals: ["pursue counseling / mental health work for transgender people"] + +Output: +{ + "aliases": [], + "core_facts": [], + "traits": [], + "relations": [], + "goals": [], + "interests": [], + "beliefs_or_stances": [], + "anchors": [], + "events": [ + "started volunteering for a trans youth hotline" + ] +} + +Example 2 +Input: + +- description: + - "She is originally from Sweden." + - "She is not dating anyone right now." +- existing_metadata: + - core_facts: ["from Sweden", "single"] + +Output: +{ + "aliases": [], + "core_facts": [], + "traits": [], + "relations": [], + "goals": [], + "interests": [], + "beliefs_or_stances": [], + "anchors": [], + "events": [] +} + +Example 3 +Input: + +- description: + - "Her sister Mia encouraged her to apply." +- existing_metadata: + - relations: ["grandma | grandmother | from Sweden"] + +Output: +{ + "aliases": [], + "core_facts": [], + "traits": [], + "relations": [ + "Mia | sister" + ], + "goals": [], + "interests": [], + "beliefs_or_stances": [], + "anchors": [], + "events": [] +} + +Example 4 +Input: + +- description: + - "She keeps a journal from her first year after moving." +- existing_metadata: + - anchors: [] + +Output: +{ + "aliases": [], + "core_facts": [], + "traits": [], + "relations": [], + "goals": [], + "interests": [], + "beliefs_or_stances": [], + "anchors": [ + "journal | from first year after moving" + ], + "events": [] +} + +Example 5 +Input: + +- description: + - "Last month she attended a workshop on trauma-informed care and felt it clarified her future direction." +- existing_metadata: + - goals: ["pursue counseling / mental health work for transgender people"] + +Output: +{ + "aliases": [], + "core_facts": [], + "traits": [], + "relations": [], + "goals": [], + "interests": [], + "beliefs_or_stances": [], + "anchors": [], + "events": [ + "attended workshop on trauma-informed care | last month | clarified future direction" + ] +} {% endif %} ===Output Format=== -Return a JSON object with the following structure: +{% if language == "zh" %} +输出必须是严格可解析的 JSON 对象,结构固定如下: +{% else %} +Return a strict JSON object with this exact structure: +{% endif %} + ```json { - "metadata_changes": [ - {"field_path": "profile.role", "action": "set", "value": "后端工程师"}, - {"field_path": "profile.expertise", "action": "set", "value": "Python"}, - {"field_path": "profile.expertise", "action": "remove", "value": "Java"} - ], - "aliases_to_add": [], - "aliases_to_remove": [] + "aliases": ["string"], + "core_facts": ["string"], + "traits": ["string"], + "relations": ["string"], + "goals": ["string"], + "interests": ["string"], + "beliefs_or_stances": ["string"], + "anchors": ["string"], + "events": ["string"] } ``` -{{ json_schema }} +{% if language == "zh" %} +JSON 要求: + +- 使用标准 ASCII 双引号 `"` +- 不要使用中文引号 +- 不要在 JSON 外输出任何文字 +- 字符串内如果包含双引号,必须转义为 `\"` +- 不要遗漏字段 +- 不要输出尾逗号 + {% else %} + JSON requirements: +- Use standard ASCII double quotes `"` +- No smart quotes +- Output JSON only +- Escape internal quotes as `\"` +- Do not omit any field +- Do not emit trailing commas + {% endif %} \ No newline at end of file diff --git a/api/app/repositories/neo4j/cypher_queries.py b/api/app/repositories/neo4j/cypher_queries.py index 64ee5e24..e3473c08 100644 --- a/api/app/repositories/neo4j/cypher_queries.py +++ b/api/app/repositories/neo4j/cypher_queries.py @@ -139,6 +139,65 @@ SET e.name = CASE WHEN entity.name IS NOT NULL AND entity.name <> '' THEN entity RETURN e.id AS uuid """ +# ── 元数据增量回写:将 LLM 提取的元数据追加到用户实体节点 ── +ENTITY_METADATA_UPDATE = """ +MATCH (e:ExtractedEntity {id: $entity_id}) +SET e.core_facts = CASE + WHEN $core_facts IS NOT NULL AND size($core_facts) > 0 + THEN reduce(acc = coalesce(e.core_facts, []), item IN $core_facts | + CASE WHEN item IN acc THEN acc ELSE acc + item END) + ELSE coalesce(e.core_facts, []) END, + e.traits = CASE + WHEN $traits IS NOT NULL AND size($traits) > 0 + THEN reduce(acc = coalesce(e.traits, []), item IN $traits | + CASE WHEN item IN acc THEN acc ELSE acc + item END) + ELSE coalesce(e.traits, []) END, + e.relations = CASE + WHEN $relations IS NOT NULL AND size($relations) > 0 + THEN reduce(acc = coalesce(e.relations, []), item IN $relations | + CASE WHEN item IN acc THEN acc ELSE acc + item END) + ELSE coalesce(e.relations, []) END, + e.goals = CASE + WHEN $goals IS NOT NULL AND size($goals) > 0 + THEN reduce(acc = coalesce(e.goals, []), item IN $goals | + CASE WHEN item IN acc THEN acc ELSE acc + item END) + ELSE coalesce(e.goals, []) END, + e.interests = CASE + WHEN $interests IS NOT NULL AND size($interests) > 0 + THEN reduce(acc = coalesce(e.interests, []), item IN $interests | + CASE WHEN item IN acc THEN acc ELSE acc + item END) + ELSE coalesce(e.interests, []) END, + e.beliefs_or_stances = CASE + WHEN $beliefs_or_stances IS NOT NULL AND size($beliefs_or_stances) > 0 + THEN reduce(acc = coalesce(e.beliefs_or_stances, []), item IN $beliefs_or_stances | + CASE WHEN item IN acc THEN acc ELSE acc + item END) + ELSE coalesce(e.beliefs_or_stances, []) END, + e.anchors = CASE + WHEN $anchors IS NOT NULL AND size($anchors) > 0 + THEN reduce(acc = coalesce(e.anchors, []), item IN $anchors | + CASE WHEN item IN acc THEN acc ELSE acc + item END) + ELSE coalesce(e.anchors, []) END, + e.events = CASE + WHEN $events IS NOT NULL AND size($events) > 0 + THEN reduce(acc = coalesce(e.events, []), item IN $events | + CASE WHEN item IN acc THEN acc ELSE acc + item END) + ELSE coalesce(e.events, []) END +RETURN e.id AS uuid +""" + +# ── 查询用户实体已有的元数据(供增量提取时去重) ── +ENTITY_METADATA_QUERY = """ +MATCH (e:ExtractedEntity {id: $entity_id}) +RETURN e.core_facts AS core_facts, + e.traits AS traits, + e.relations AS relations, + e.goals AS goals, + e.interests AS interests, + e.beliefs_or_stances AS beliefs_or_stances, + e.anchors AS anchors, + e.events AS events +""" + # Add back ENTITY_RELATIONSHIP_SAVE to be used by graph_saver.save_entities_and_relationships ENTITY_RELATIONSHIP_SAVE = """ UNWIND $relationships AS rel @@ -1136,6 +1195,56 @@ SET target.aliases = new_aliases, RETURN source.name AS merged_alias, target.name AS target_name, new_aliases AS updated_aliases """ +# 边重定向:将指向别名节点("别名属于"关系的 source)的所有其他边,重定向到用户节点(target)。 +# 处理两类边: +# 1. EXTRACTED_RELATIONSHIP:其他实体 → 别名节点 或 别名节点 → 其他实体 +# 2. STATEMENT_ENTITY:陈述句 → 别名节点 +# 对于每条需要重定向的边,创建一条指向用户节点的新边(复制所有属性),然后删除旧边。 +REDIRECT_ALIAS_EDGES = """ +// 找到所有 别名→用户 的映射 +MATCH (alias:ExtractedEntity {end_user_id: $end_user_id})-[ar:EXTRACTED_RELATIONSHIP]->(user:ExtractedEntity {end_user_id: $end_user_id}) +WHERE ar.predicate = '别名属于' +WITH collect({alias_id: elementId(alias), user_id: elementId(user), alias_eid: alias.id, user_eid: user.id}) AS mappings + +// 1. 重定向 EXTRACTED_RELATIONSHIP 边:别名节点作为 target 的情况 +UNWIND mappings AS m +MATCH (other)-[r:EXTRACTED_RELATIONSHIP]->(alias:ExtractedEntity {end_user_id: $end_user_id}) +WHERE alias.id = m.alias_eid + AND r.predicate <> '别名属于' + AND other.id <> m.user_eid +WITH m, other, r, alias +MATCH (user:ExtractedEntity {id: m.user_eid, end_user_id: $end_user_id}) +CREATE (other)-[nr:EXTRACTED_RELATIONSHIP]->(user) +SET nr = properties(r) +DELETE r +WITH count(*) AS redirected_incoming + +// 2. 重定向 EXTRACTED_RELATIONSHIP 边:别名节点作为 source 的情况 +MATCH (alias:ExtractedEntity {end_user_id: $end_user_id})-[ar2:EXTRACTED_RELATIONSHIP]->(user2:ExtractedEntity {end_user_id: $end_user_id}) +WHERE ar2.predicate = '别名属于' +WITH alias, user2, redirected_incoming +MATCH (alias)-[r:EXTRACTED_RELATIONSHIP]->(other) +WHERE r.predicate <> '别名属于' + AND other.id <> user2.id +WITH user2, other, r, redirected_incoming +CREATE (user2)-[nr:EXTRACTED_RELATIONSHIP]->(other) +SET nr = properties(r) +DELETE r +WITH redirected_incoming, count(*) AS redirected_outgoing + +// 3. 重定向 STATEMENT_ENTITY 边:陈述句 → 别名节点 +MATCH (alias:ExtractedEntity {end_user_id: $end_user_id})-[ar3:EXTRACTED_RELATIONSHIP]->(user3:ExtractedEntity {end_user_id: $end_user_id}) +WHERE ar3.predicate = '别名属于' +WITH alias, user3, redirected_incoming, redirected_outgoing +MATCH (stmt)-[r:STATEMENT_ENTITY]->(alias) +WITH user3, stmt, r, redirected_incoming, redirected_outgoing +CREATE (stmt)-[nr:STATEMENT_ENTITY]->(user3) +SET nr = properties(r) +DELETE r + +RETURN redirected_incoming, redirected_outgoing, count(*) AS redirected_stmt +""" + CHECK_COMMUNITY_IS_COMPLETE_WITH_EMBEDDING = """ MATCH (c:Community {community_id: $community_id, end_user_id: $end_user_id}) RETURN ( diff --git a/api/app/services/pilot_run_service.py b/api/app/services/pilot_run_service.py index 5c7da40e..122e2181 100644 --- a/api/app/services/pilot_run_service.py +++ b/api/app/services/pilot_run_service.py @@ -12,7 +12,6 @@ from typing import Awaitable, Callable, Optional from app.core.config import settings from app.core.logging_config import get_memory_logger, log_time -from app.core.memory.pipelines.pilot_write_pipeline import PilotWritePipeline from app.core.memory.models.message_models import ( ConversationContext, ConversationMessage, @@ -306,14 +305,11 @@ async def run_pilot_extraction( logger.warning(f"Failed to load ontology types: {e}", exc_info=True) if use_refactored: - pilot_pipeline = PilotWritePipeline( - llm_client=llm_client, - embedder_client=embedder_client, - pipeline_config=get_pipeline_config(memory_config), - progress_callback=progress_callback, - embedding_id=str(memory_config.embedding_model_id), - language=language, - ontology_types=ontology_types, + from app.core.memory.memory_service import MemoryService + + memory_service = MemoryService( + memory_config=memory_config, + end_user_id=str(memory_config.workspace_id), ) log_time("Pilot Pipeline Initialization", time.time() - step_start, log_file) @@ -325,7 +321,11 @@ async def run_pilot_extraction( if progress_callback: await progress_callback("knowledge_extraction", "正在知识抽取...") - pilot_result = await pilot_pipeline.run(chunked_dialogs) + pilot_result = await memory_service.pilot_write( + chunked_dialogs=chunked_dialogs, + language=language, + progress_callback=progress_callback, + ) dialog_data_list = pilot_result.dialog_data_list graph = pilot_result.graph chunk_nodes = graph.chunk_nodes diff --git a/api/app/tasks.py b/api/app/tasks.py index 6b1b6ae2..6497f18d 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -1564,9 +1564,201 @@ def extract_emotion_batch_task( _shutdown_loop_gracefully(loop) +@celery_app.task( + bind=True, + name="app.tasks.extract_metadata_batch", + max_retries=2, + default_retry_delay=30, +) +def extract_metadata_batch_task( + self, + user_entities: List[Dict[str, Any]], + llm_model_id: str, + language: str = "zh", + snapshot_dir: Optional[str] = None, +) -> Dict[str, Any]: + """Celery task: 用户实体元数据提取 + Neo4j 回写。 + + 在主写入流水线完成后异步执行。从用户实体的 description 中提取 + 结构化元数据(core_facts、traits、relations 等),增量回写到 Neo4j。 + + Args: + user_entities: 用户实体列表,每项包含: + - entity_id: 实体 ID + - entity_name: 实体名称 + - descriptions: description 文本列表 + llm_model_id: LLM 模型 UUID 字符串 + language: 语言 ("zh" / "en") + snapshot_dir: 可选的快照目录路径(调试模式下使用) + """ + task_id = self.request.id + total = len(user_entities) + logger.info( + f"[Metadata] 开始用户元数据提取: " + f"entities={total}, llm_model_id={llm_model_id}, " + f"language={language}, task_id={task_id}" + ) + start_time = time.time() + + if not user_entities: + return {"status": "SUCCESS", "total": 0, "extracted": 0, "failed": 0, "task_id": task_id} + + async def _run() -> Dict[str, Any]: + from app.core.memory.models.variate_config import ExtractionPipelineConfig + from app.core.memory.storage_services.extraction_engine.steps.base import StepContext + from app.core.memory.storage_services.extraction_engine.steps.metadata_step import MetadataExtractionStep + from app.core.memory.storage_services.extraction_engine.steps.schema import ( + MetadataStepInput, + ) + from app.core.memory.utils.llm.llm_utils import MemoryClientFactory + from app.db import get_db_context + from app.repositories.neo4j.neo4j_connector import Neo4jConnector + from app.repositories.neo4j.cypher_queries import ENTITY_METADATA_UPDATE, ENTITY_METADATA_QUERY + + # Build LLM client + with get_db_context() as db: + factory = MemoryClientFactory(db) + llm_client = factory.get_llm_client(llm_model_id) + + pipeline_config = ExtractionPipelineConfig() + context = StepContext( + llm_client=llm_client, + language=language, + config=pipeline_config, + ) + step = MetadataExtractionStep(context) + + extracted = 0 + failed = 0 + snapshot_outputs: Dict[str, Any] = {} if snapshot_dir else None # type: ignore[assignment] + + connector = Neo4jConnector() + try: + for entity_dict in user_entities: + entity_id = entity_dict["entity_id"] + entity_name = entity_dict.get("entity_name", "") + descriptions = entity_dict.get("descriptions", []) + + if not descriptions: + logger.debug(f"[Metadata] 跳过无 description 的实体: {entity_id}") + continue + + try: + # 查询已有元数据用于增量去重 + existing_metadata = {} + try: + records = await connector.execute_query( + ENTITY_METADATA_QUERY, entity_id=entity_id + ) + if records: + rec = records[0] + for field in ( + "core_facts", "traits", "relations", "goals", + "interests", "beliefs_or_stances", "anchors", "events", + ): + val = rec.get(field) + existing_metadata[field] = val if val else [] + except Exception as e: + logger.warning(f"[Metadata] 查询已有元数据失败: {e}") + + inp = MetadataStepInput( + entity_id=entity_id, + entity_name=entity_name, + descriptions=descriptions, + existing_metadata=existing_metadata, + ) + result = await step.run(inp) + + if result.has_any(): + # 回写 Neo4j + await connector.execute_query( + ENTITY_METADATA_UPDATE, + entity_id=entity_id, + core_facts=result.core_facts, + traits=result.traits, + relations=result.relations, + goals=result.goals, + interests=result.interests, + beliefs_or_stances=result.beliefs_or_stances, + anchors=result.anchors, + events=result.events, + ) + extracted += 1 + logger.info( + f"[Metadata] 实体 {entity_name}({entity_id}) 元数据提取并回写成功" + ) + else: + logger.debug( + f"[Metadata] 实体 {entity_name}({entity_id}) 无新增元数据" + ) + + if snapshot_outputs is not None: + snapshot_outputs[entity_id] = { + "entity_name": entity_name, + "descriptions": descriptions, + "extracted_metadata": result.model_dump(), + } + + except Exception as e: + failed += 1 + if snapshot_outputs is not None: + snapshot_outputs[entity_id] = {"error": str(e)} + logger.warning( + f"[Metadata] 实体 {entity_id} 元数据提取失败: {e}" + ) + finally: + await connector.close() + + # 快照落盘 + if snapshot_outputs is not None and snapshot_dir: + try: + from pathlib import Path as _Path + import json as _json + + _dir = _Path(snapshot_dir) + _dir.mkdir(parents=True, exist_ok=True) + _path = _dir / "8_metadata_outputs.json" + with open(_path, "w", encoding="utf-8") as _f: + _json.dump(snapshot_outputs, _f, ensure_ascii=False, indent=2, default=str) + logger.info( + f"[Metadata][Snapshot] 已落盘 {len(snapshot_outputs)} 条元数据结果 → {_path}" + ) + except Exception as _e: + logger.warning( + f"[Metadata][Snapshot] 快照落盘失败(不影响主流程): {_e}" + ) + + return {"extracted": extracted, "failed": failed} + + loop = None + try: + loop = set_asyncio_event_loop() + result = loop.run_until_complete(_run()) + elapsed = time.time() - start_time + logger.info( + f"[Metadata] 任务完成: 提取={result['extracted']}, " + f"失败={result['failed']}, 耗时={elapsed:.2f}s, task_id={task_id}" + ) + return { + "status": "SUCCESS", + "total": total, + **result, + "elapsed_time": elapsed, + "task_id": task_id, + } + except Exception as e: + elapsed = time.time() - start_time + logger.error( + f"[Metadata] 任务失败: {e}, 耗时={elapsed:.2f}s", + exc_info=True, + ) + raise self.retry(exc=e) + finally: + if loop: + _shutdown_loop_gracefully(loop) + + # unused task -# @celery_app.task(name="app.core.memory.agent.health.check_read_service") -# def check_read_service_task() -> Dict[str, str]: # """Call read_service and write latest status to Redis. # Returns status data dict that gets written to Redis. @@ -3222,299 +3414,4 @@ def init_community_clustering_for_users(self, end_user_ids: List[str], workspace # ─── User Metadata Extraction Task ─────────────────────────────────────────── -def _update_timestamps(existing: dict, new: dict, updated_at: dict, now: str, prefix: str = "") -> None: - """对比新旧元数据,更新变更字段的 _updated_at 时间戳。""" - for key, new_val in new.items(): - if key == "_updated_at": - continue - path = f"{prefix}.{key}" if prefix else key - old_val = existing.get(key) - - if isinstance(new_val, dict) and isinstance(old_val, dict): - _update_timestamps(old_val, new_val, updated_at, now, prefix=path) - elif old_val != new_val: - updated_at[path] = now - -@celery_app.task( - bind=True, - name='app.tasks.extract_user_metadata', - ignore_result=False, - max_retries=0, - acks_late=True, - time_limit=300, - soft_time_limit=240, -) -def extract_user_metadata_task( - self, - end_user_id: str, - statements: List[str], - config_id: Optional[str] = None, - language: str = "zh", -) -> Dict[str, Any]: - """异步提取用户元数据并写入数据库。 - - 在去重消歧完成后由编排器触发,使用独立 LLM 调用提取元数据。 - LLM 配置优先使用 config_id 对应的应用配置,失败时回退到工作空间默认配置。 - - Args: - end_user_id: 终端用户 ID - statements: 用户相关的 statement 文本列表 - config_id: 应用配置 ID(可选) - language: 语言类型 ("zh" 中文, "en" 英文) - - Returns: - 包含任务执行结果的字典 - """ - start_time = time.time() - logger.info( - f"[CELERY METADATA] Starting metadata extraction - end_user_id={end_user_id}, " - f"statements_count={len(statements)}, config_id={config_id}, language={language}" - ) - - async def _run() -> Dict[str, Any]: - from app.core.memory.storage_services.extraction_engine.knowledge_extraction.metadata_extractor import MetadataExtractor - from app.repositories.end_user_info_repository import EndUserInfoRepository - from app.repositories.end_user_repository import EndUserRepository - from app.services.memory_config_service import MemoryConfigService - - # 1. 获取 LLM 配置(应用配置 → 工作空间配置兜底)并创建 LLM client - with get_db_context() as db: - end_user_uuid = uuid.UUID(end_user_id) - - # 获取 workspace_id from end_user - end_user = EndUserRepository(db).get_by_id(end_user_uuid) - if not end_user: - return {"status": "FAILURE", "error": f"End user not found: {end_user_id}"} - - workspace_id = end_user.workspace_id - - config_service = MemoryConfigService(db) - memory_config = config_service.get_config_with_fallback( - memory_config_id=uuid.UUID(config_id) if config_id else None, - workspace_id=workspace_id, - ) - if not memory_config: - return {"status": "FAILURE", "error": "No LLM config available (app + workspace fallback failed)"} - - # 2. 创建 LLM client - from app.core.memory.utils.llm.llm_utils import MemoryClientFactory - factory = MemoryClientFactory(db) - if not memory_config.llm_id: - return {"status": "FAILURE", "error": "Memory config has no LLM model configured"} - llm_client = factory.get_llm_client(memory_config.llm_id) - - # 2.5 读取已有元数据和别名,传给 extractor 作为上下文 - existing_metadata = None - existing_aliases = None - try: - info = EndUserInfoRepository(db).get_by_end_user_id(end_user_uuid) - if info: - if info.meta_data: - existing_metadata = info.meta_data - existing_aliases = info.aliases if info.aliases else [] - logger.info(f"[CELERY METADATA] 已读取已有元数据和别名(aliases={existing_aliases})") - except Exception as e: - logger.warning(f"[CELERY METADATA] 读取已有数据失败(继续无上下文提取): {e}") - - # 3. 提取元数据和别名(传入已有数据作为上下文) - extractor = MetadataExtractor(llm_client=llm_client, language=language) - extract_result = await extractor.extract_metadata( - statements, - existing_metadata=existing_metadata, - existing_aliases=existing_aliases, - ) - - if not extract_result: - logger.info(f"[CELERY METADATA] No metadata extracted for end_user_id={end_user_id}") - return {"status": "SUCCESS", "result": "no_metadata_extracted"} - - metadata_changes, aliases_to_add, aliases_to_remove = extract_result - logger.info( - f"[CELERY METADATA] LLM 元数据变更: {[c.model_dump() for c in metadata_changes]}, " - f"别名新增: {aliases_to_add}, 移除: {aliases_to_remove}" - ) - - from datetime import datetime as dt, timezone as tz - now = dt.now(tz.utc).isoformat() - - # 过滤别名中的占位名称,执行增量增删 - _PLACEHOLDER_NAMES = {"用户", "我", "user", "i"} - - def _filter_aliases(aliases_list): - seen = set() - result = [] - for a in aliases_list: - a_stripped = a.strip() - if a_stripped and a_stripped.lower() not in _PLACEHOLDER_NAMES and a_stripped.lower() not in seen: - result.append(a_stripped) - seen.add(a_stripped.lower()) - return result - - filtered_add = _filter_aliases(aliases_to_add) - filtered_remove = _filter_aliases(aliases_to_remove) - remove_lower = {a.lower() for a in filtered_remove} - - with get_db_context() as db: - end_user_uuid = uuid.UUID(end_user_id) - info = EndUserInfoRepository(db).get_by_end_user_id(end_user_uuid) - end_user = EndUserRepository(db).get_by_id(end_user_uuid) - - if info: - # 4. 元数据增量更新(按 LLM 输出的变更操作逐条执行,所有字段均为列表类型) - if metadata_changes: - # 深拷贝,确保 SQLAlchemy 能检测到变更 - import copy - existing_meta = copy.deepcopy(info.meta_data) if info.meta_data else {} - updated_at = dict(existing_meta.get("_updated_at", {})) - - for change in metadata_changes: - field_path = change.field_path - action = change.action - value = change.value - - if not value or not value.strip(): - continue - - # 定位到目标字段的父级节点 - parts = field_path.split(".") - target = existing_meta - for part in parts[:-1]: - target = target.setdefault(part, {}) - leaf = parts[-1] - - current_list = target.get(leaf, []) - - if action == "set": - if value not in current_list: - # 新值插入列表头部,保证按时间从新到旧排序 - current_list.insert(0, value) - target[leaf] = current_list - logger.info(f"[CELERY METADATA] set {field_path} = {value}") - - elif action == "remove": - if value in current_list: - current_list.remove(value) - target[leaf] = current_list - logger.info(f"[CELERY METADATA] remove {value} from {field_path}") - - updated_at[field_path] = now - - existing_meta["_updated_at"] = updated_at - # 赋值深拷贝后的新对象,SQLAlchemy 会检测到字段变更并写入 - info.meta_data = existing_meta - logger.info(f"[CELERY METADATA] 增量更新元数据完成: {json.dumps(existing_meta, ensure_ascii=False)}") - - # 别名增量增删:(已有 - remove) + add - old_aliases = info.aliases if info.aliases else [] - # 先移除 - merged = [a for a in old_aliases if a.strip().lower() not in remove_lower] - # 再追加(去重) - existing_lower = {a.strip().lower() for a in merged} - for a in filtered_add: - if a.lower() not in existing_lower: - merged.append(a) - existing_lower.add(a.lower()) - - if merged != old_aliases: - info.aliases = merged - # other_name 更新逻辑 - if merged and ( - not info.other_name - or info.other_name.strip().lower() in _PLACEHOLDER_NAMES - or info.other_name.strip().lower() in remove_lower - ): - info.other_name = merged[0] - if end_user and merged and ( - not end_user.other_name - or end_user.other_name.strip().lower() in _PLACEHOLDER_NAMES - or end_user.other_name.strip().lower() in remove_lower - ): - end_user.other_name = merged[0] - logger.info( - f"[CELERY METADATA] 别名增量更新: {old_aliases} - {filtered_remove} + {filtered_add} → {merged}" - ) - else: - # 没有 end_user_info 记录,创建一条 - from app.models.end_user_info_model import EndUserInfo - initial_aliases = filtered_add # 新记录只有 add,没有 remove - first_alias = initial_aliases[0] if initial_aliases else "" - - # 从变更操作构建初始元数据(所有字段均为列表类型) - initial_meta = {} - for change in metadata_changes: - if change.action == "set" and change.value is not None and change.value.strip(): - parts = change.field_path.split(".") - target = initial_meta - for part in parts[:-1]: - target = target.setdefault(part, {}) - leaf = parts[-1] - current_list = target.get(leaf, []) - if change.value not in current_list: - # 新值插入列表头部,保证按时间从新到旧排序 - current_list.insert(0, change.value) - target[leaf] = current_list - - if first_alias or initial_meta: - new_info = EndUserInfo( - end_user_id=end_user_uuid, - other_name=first_alias or "", - aliases=initial_aliases, - meta_data=initial_meta if initial_meta else None, - ) - db.add(new_info) - if end_user and first_alias and ( - not end_user.other_name or end_user.other_name.strip().lower() in _PLACEHOLDER_NAMES - ): - end_user.other_name = first_alias - logger.info(f"[CELERY METADATA] 创建 end_user_info: other_name={first_alias}, aliases={initial_aliases}") - else: - return {"status": "SUCCESS", "result": "no_data_to_write"} - - db.commit() - - # 同步 PgSQL aliases 到 Neo4j 用户实体(PgSQL 为权威源) - final_aliases = info.aliases if info else initial_aliases - if final_aliases: - try: - from app.repositories.neo4j.neo4j_connector import Neo4jConnector - neo4j_connector = Neo4jConnector() - cypher = """ - MATCH (e:ExtractedEntity) - WHERE e.end_user_id = $end_user_id AND e.name IN ['用户', '我', 'User', 'I'] - SET e.aliases = $aliases - """ - await neo4j_connector.execute_query( - cypher, end_user_id=end_user_id, aliases=final_aliases - ) - await neo4j_connector.close() - logger.info(f"[CELERY METADATA] Neo4j 用户实体 aliases 已同步: {final_aliases}") - except Exception as neo4j_err: - logger.warning(f"[CELERY METADATA] Neo4j aliases 同步失败(不影响主流程): {neo4j_err}") - - return {"status": "SUCCESS", "result": "metadata_and_aliases_written"} - - loop = None - try: - loop = set_asyncio_event_loop() - result = loop.run_until_complete(_run()) - elapsed = time.time() - start_time - result["elapsed_time"] = elapsed - result["task_id"] = self.request.id - logger.info(f"[CELERY METADATA] Task completed - elapsed={elapsed:.2f}s, result={result.get('result')}") - return result - - except Exception as e: - elapsed = time.time() - start_time - logger.error(f"[CELERY METADATA] Task failed - elapsed={elapsed:.2f}s, error={e}", exc_info=True) - return { - "status": "FAILURE", - "error": str(e), - "elapsed_time": elapsed, - "task_id": self.request.id, - } - finally: - if loop: - _shutdown_loop_gracefully(loop) - - # unused task \ No newline at end of file