[commit] Refactor write pipeline

This commit is contained in:
lanceyq
2026-05-06 18:44:44 +08:00
parent 9dc9b7aee7
commit 6419dcd932
8 changed files with 445 additions and 28695 deletions

View File

@@ -21,7 +21,7 @@ from app.dependencies import cur_workspace_access_guard, get_current_user
from app.models import ModelApiKey
from app.models.user_model import User
from app.repositories import knowledge_repository
from app.schemas.memory_agent_schema import UserInput, Write_UserInput
from app.schemas.memory_agent_schema import StorageType, UserInput, Write_UserInput, WriteMemoryRequest
from app.schemas.response_schema import ApiResponse
from app.services import task_service, workspace_service
from app.services.memory_agent_service import MemoryAgentService
@@ -862,7 +862,7 @@ async def get_end_user_connected_config(
包含 memory_config_id 和相关信息的响应
"""
api_logger.info(f"Getting connected config for end_user: {end_user_id}")
api_logger.info(f"Getting connected config for end_user_id: {end_user_id}")
try:
result = get_config(end_user_id, db)

View File

@@ -301,11 +301,11 @@ class Settings:
# Prompt 中最大类型数量
MAX_ONTOLOGY_TYPES_IN_PROMPT: int = int(os.getenv("MAX_ONTOLOGY_TYPES_IN_PROMPT", "50"))
# 核心通用类型列表(逗号分隔)
# 核心通用类型列表(逗号分隔)—— 与 ontology.md Entity Ontology 保持一致的 13 类
CORE_GENERAL_TYPES: str = os.getenv(
"CORE_GENERAL_TYPES",
"Person,Organization,Company,GovernmentAgency,Place,Location,City,Country,Building,"
"Event,SportsEvent,SocialEvent,Work,Book,Film,Software,Concept,TopicalConcept,AcademicSubject"
"人物,组织,群体,角色职业,地点设施,物品设备,软件平台,识别联系信息,"
"文档媒体,知识能力,偏好习惯,具体目标,称呼别名"
)
# 实验模式开关(允许通过 API 动态切换本体配置)

View File

@@ -135,12 +135,12 @@ async def term_memory_save(end_user_id, strategy_type, scope):
chunk_data = data[:scope]
if len(chunk_data) == scope:
repo.upsert(end_user_id, chunk_data)
logger.info(f'---------写入短长期-----------')
logger.info('---------写入短长期-----------')
else:
long_time_data = write_store.find_user_recent_sessions(end_user_id, 5)
long_messages = await messages_parse(long_time_data)
repo.upsert(end_user_id, long_messages)
logger.info(f'写入短长期:')
logger.info('写入短长期:')
async def window_dialogue(end_user_id, langchain_messages, memory_config, scope):

File diff suppressed because it is too large Load Diff

View File

@@ -23,15 +23,12 @@ from app.core.memory.models.ontology_extraction_models import OntologyTypeInfo,
logger = logging.getLogger(__name__)
# 默认核心通用类型
# 默认核心通用类型 —— 与 ontology.md Entity Ontology 对齐的 13 类
DEFAULT_CORE_GENERAL_TYPES: Set[str] = {
"Person", "Organization", "Company", "GovernmentAgency",
"Place", "Location", "City", "Country", "Building",
"Event", "SportsEvent", "MusicEvent", "SocialEvent",
"Work", "Book", "Film", "Software", "Album",
"Concept", "TopicalConcept", "AcademicSubject",
"Device", "Food", "Drug", "ChemicalSubstance",
"TimePeriod", "Year",
"人物", "组织", "群体", "角色职业",
"地点设施", "物品设备", "软件平台", "识别联系信息",
"文档媒体", "知识能力", "偏好习惯", "具体目标",
"称呼别名",
}
@@ -129,10 +126,12 @@ class OntologyTypeMerger:
if type_name not in seen_names and remaining_slots > 0:
general_type = self.general_registry.get_type(type_name)
if general_type:
# 优先使用 rdfs:comment完整定义其次才是 label
# 对中文 13 类本体label 与 class_name 相同,单独展示无增益。
description = (
general_type.labels.get("zh") or
general_type.description or
general_type.get_label("en") or
general_type.description or
general_type.labels.get("zh") or
general_type.get_label("en") or
type_name
)
core_types_added.append(OntologyTypeInfo(
@@ -157,8 +156,8 @@ class OntologyTypeMerger:
parent_type = self.general_registry.get_type(parent_name)
if parent_type:
description = (
parent_type.labels.get("zh") or
parent_type.description or
parent_type.description or
parent_type.labels.get("zh") or
parent_name
)
related_types_added.append(OntologyTypeInfo(

View File

@@ -0,0 +1,120 @@
"""Pipeline stage snapshot — dump each extraction stage's output to JSON for comparison.
Usage:
snapshot = PipelineSnapshot("legacy") # or "new"
snapshot.save_stage("1_statements", data)
snapshot.save_stage("2_triplets", data)
...
Output structure:
logs/memory-output/snapshots/
legacy_20260422_123456/
1_statements.json
2_triplets.json
3_nodes_edges.json
4_dedup.json
new_20260422_123500/
1_statements.json
2_triplets.json
3_nodes_edges.json
4_dedup.json
Controlled by env var PIPELINE_SNAPSHOT_ENABLED (default: false).
"""
from __future__ import annotations
import json
import logging
import os
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
_ENABLED: Optional[bool] = None
def _is_enabled() -> bool:
global _ENABLED
if _ENABLED is None:
_ENABLED = os.getenv("PIPELINE_SNAPSHOT_ENABLED", "false").lower() == "true"
return _ENABLED
def _safe_serialize(obj: Any) -> Any:
"""Convert objects to JSON-serializable form."""
if obj is None:
return None
if isinstance(obj, (str, int, float, bool)):
return obj
if isinstance(obj, (list, tuple)):
return [_safe_serialize(item) for item in obj]
if isinstance(obj, dict):
return {str(k): _safe_serialize(v) for k, v in obj.items()}
if hasattr(obj, "model_dump"):
return obj.model_dump()
if hasattr(obj, "__dataclass_fields__"):
from dataclasses import asdict
return asdict(obj)
if hasattr(obj, "__dict__"):
return {k: _safe_serialize(v) for k, v in obj.__dict__.items()
if not k.startswith("_")}
return str(obj)
class PipelineSnapshot:
"""Dump each pipeline stage's output to a timestamped directory."""
def __init__(self, pipeline_name: str):
"""
Args:
pipeline_name: "legacy" or "new", used as directory prefix.
"""
self.enabled = _is_enabled()
self.pipeline_name = pipeline_name
self._dir: Optional[Path] = None
if self.enabled:
from app.core.config import settings
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
self._dir = Path(settings.MEMORY_OUTPUT_DIR) / "snapshots" / f"{pipeline_name}_{ts}"
self._dir.mkdir(parents=True, exist_ok=True)
logger.debug(f"[Snapshot] 已启用,输出目录: {self._dir}")
@property
def directory(self) -> Optional[str]:
"""Absolute path (str) of this snapshot's output directory, or None when disabled."""
return str(self._dir) if self._dir is not None else None
def save_stage(self, stage_name: str, data: Any) -> None:
"""Save a stage's output as JSON.
Args:
stage_name: e.g. "1_statements", "2_triplets"
data: Any serializable data (Pydantic models, dicts, lists, dataclasses)
"""
if not self.enabled or self._dir is None:
return
try:
path = self._dir / f"{stage_name}.json"
serialized = _safe_serialize(data)
with open(path, "w", encoding="utf-8") as f:
json.dump(serialized, f, ensure_ascii=False, indent=2, default=str)
logger.debug(f"[Snapshot] {stage_name}{path}")
except Exception as e:
logger.warning(f"[Snapshot] 保存 {stage_name} 失败: {e}")
def save_summary(self, stats: Dict[str, Any]) -> None:
"""Save a summary with pipeline metadata and stats."""
if not self.enabled or self._dir is None:
return
summary = {
"pipeline": self.pipeline_name,
"timestamp": datetime.now().isoformat(),
"stats": stats,
}
self.save_stage("0_summary", summary)

View File

@@ -0,0 +1,179 @@
"""WriteSnapshotRecorder — 写入流水线快照记录器。
将 WritePipeline 中所有 snapshot 序列化逻辑集中到此模块,
让 Pipeline 只做编排,不关心调试输出的数据格式。
Pipeline 侧调用示例:
recorder = WriteSnapshotRecorder()
recorder.record_stage_outputs(orchestrator.last_stage_outputs)
recorder.record_graph_before_dedup(graph)
recorder.record_dedup_result(dedup_result)
recorder.record_summary(extraction_result.stats)
"""
from __future__ import annotations
import logging
from typing import Any, Dict, List, Optional
from app.core.memory.utils.debug.pipeline_snapshot import PipelineSnapshot
logger = logging.getLogger(__name__)
class WriteSnapshotRecorder:
"""写入流水线各阶段的快照记录器。
内部持有一个 PipelineSnapshot 实例,对外暴露语义化方法,
每个方法对应流水线中的一个可观测阶段。
当 PIPELINE_SNAPSHOT_ENABLED=false 时所有方法均为空操作no-op
"""
def __init__(self, pipeline_name: str = "new"):
self._snapshot = PipelineSnapshot(pipeline_name)
# ── 属性 ──
@property
def enabled(self) -> bool:
return self._snapshot.enabled
@property
def snapshot_dir(self) -> Optional[str]:
"""快照输出目录的绝对路径,未启用时返回 None。"""
return self._snapshot.directory
@property
def snapshot(self) -> PipelineSnapshot:
"""暴露底层 PipelineSnapshot供需要直接传递的场景使用如 SemanticPruner"""
return self._snapshot
# ── Stage 2-5: 萃取阶段各步骤输出 ──
def record_stage_outputs(self, stage_outputs: Optional[Dict[str, Any]]) -> None:
"""记录 NewExtractionOrchestrator 各步骤的输出。
对应原 write_pipeline._extract() 中 stage_outputs 的序列化逻辑,
包括 statement / triplet / emotion / embedding 四个阶段。
"""
if not stage_outputs:
return
self._record_statements(stage_outputs.get("statement_results", {}))
self._record_triplets(stage_outputs.get("triplet_results", {}))
self._record_emotions(stage_outputs.get("emotion_results", {}))
self._record_embeddings(stage_outputs.get("embedding_output"))
# ── Stage 6: 图构建(去重前) ──
def record_graph_before_dedup(self, graph: Any) -> None:
"""记录 build_graph_nodes_and_edges 的输出(去重前)。"""
self._snapshot.save_stage(
"6_nodes_edges_before_dedup",
{
"dialogue_nodes_count": len(graph.dialogue_nodes),
"chunk_nodes_count": len(graph.chunk_nodes),
"statement_nodes_count": len(graph.statement_nodes),
"entity_nodes": [
{
"id": e.id,
"name": e.name,
"entity_type": e.entity_type,
"description": e.description,
}
for e in graph.entity_nodes
],
"entity_entity_edges": [
{
"source": e.source,
"target": e.target,
"relation_type": e.relation_type,
"statement": e.statement,
}
for e in graph.entity_entity_edges
],
"stmt_entity_edges_count": len(graph.stmt_entity_edges),
},
)
# ── Stage 7: 去重后 ──
def record_dedup_result(self, dedup_result: Any) -> None:
"""记录两阶段去重消歧后的实体和关系。"""
self._snapshot.save_stage(
"7_after_dedup",
{
"entity_nodes": [
{
"id": e.id,
"name": e.name,
"entity_type": e.entity_type,
"description": e.description,
}
for e in dedup_result.entity_nodes
],
"entity_entity_edges": [
{
"source": e.source,
"target": e.target,
"relation_type": e.relation_type,
"statement": e.statement,
}
for e in dedup_result.entity_entity_edges
],
},
)
# ── Stage 0: 汇总 ──
def record_summary(self, stats: Dict[str, int]) -> None:
"""记录流水线最终统计摘要。"""
self._snapshot.save_summary(stats)
# ── 内部方法 ──
def _record_statements(self, stmt_results: Dict) -> None:
snapshot_data: List[Dict] = []
for _did, chunk_stmts in stmt_results.items():
for _cid, stmts in chunk_stmts.items():
for s in stmts:
snapshot_data.append(s.model_dump())
self._snapshot.save_stage("2_statement_outputs", snapshot_data)
def _record_triplets(self, triplet_results: Dict) -> None:
snapshot_data: Dict[str, Any] = {}
for _did, stmt_triplets in triplet_results.items():
for stmt_id, t_out in stmt_triplets.items():
snapshot_data[stmt_id] = t_out.model_dump()
self._snapshot.save_stage("3_triplet_outputs", snapshot_data)
def _record_emotions(self, emotion_results: Dict) -> None:
snapshot_data: Dict[str, Any] = {}
for stmt_id, emo in emotion_results.items():
if hasattr(emo, "model_dump"):
snapshot_data[stmt_id] = emo.model_dump()
self._snapshot.save_stage("4_emotion_outputs", snapshot_data)
def _record_embeddings(self, emb_output: Any) -> None:
if not emb_output or not hasattr(emb_output, "model_dump"):
return
emb_data = emb_output.model_dump()
# 截断向量,只保留前 5 维用于调试
for key in ("statement_embeddings", "chunk_embeddings", "entity_embeddings"):
if key in emb_data and isinstance(emb_data[key], dict):
emb_data[key] = {
k: v[:5] if isinstance(v, list) else v
for k, v in emb_data[key].items()
}
if "dialog_embeddings" in emb_data and isinstance(
emb_data["dialog_embeddings"], list
):
emb_data["dialog_embeddings"] = [
v[:5] if isinstance(v, list) else v
for v in emb_data["dialog_embeddings"]
]
self._snapshot.save_stage("5_embedding_outputs", emb_data)

View File

@@ -155,12 +155,6 @@ Primary statement to analyze:
- negative_examples: `他们`、`一些人`、`一个朋友`
- notes: 只用于边界相对稳定的人群;边界不稳或 unresolved 的表达不要归入 `群体`。
- `智能体`
- definition: 具有行动、交互或执行能力的非人主体如机器人、AI 或其他智慧体。
- positive_examples: `机器人查票员`、`家务机器人`、`智能助手`
- negative_examples: `手机`、`电脑`、`机器人公司`
- notes: 如果对象只是普通设备,不归入 `智能体`;只有在叙述中被当作主体行动或交互时才使用。
- `角色职业`
- definition: 人物承担的社会角色、功能身份或职业身份。
- positive_examples: `导师`、`老师`、`学生`、`医生`、`程序员`
@@ -175,39 +169,21 @@ Primary statement to analyze:
- `物品设备`
- definition: 可被持有、使用、携带的具体物体、设备、工具或交通工具。
- positive_examples: `手机`、`电脑`、`相机`、`自行车`
- positive_examples: `手机`、`电脑`、`相机`、`自行车`、`机器人查票员`、`智能助手`
- negative_examples: `微信`、`GitHub`、`会员服务`
- notes: 交通工具当前并入此类;数字服务不归入本类。
- `产品服务`
- definition: 可被购买、使用、消费或订阅的产品或服务。
- positive_examples: `iPhone`、`健身课`、`会员服务`
- negative_examples: `微信`、`GitHub`、`手机`
- notes: 具体商品和服务当前一级合并;纯软件平台优先归入 `软件平台`。
- notes: 交通工具当前并入此类;数字服务不归入本类。极简版中,原本可单列为 `智能体` 的非人行动主体也暂并入本类。
- `软件平台`
- definition: 软件、应用、网站、在线平台或数字服务系统。
- positive_examples: `微信`、`GitHub`、`ChatGPT`、`飞书`
- negative_examples: `iPhone`、`会员服务`、`手机号`
- notes: 软件、网站、平台当前一级合并;如果语境强调的是账号本身,改用 `账号`。
- negative_examples: `iPhone`、`手机号`、`邮箱`
- notes: 软件、网站、平台当前一级合并;如果语境强调的是登录、识别或联系信息本身,改用 `识别联系信息`。
- `账号`
- definition: 账户、账号、用户档案类实体
- positive_examples: `GitHub账号`、`微信号`
- negative_examples: `用户名`、`工号`、`邮箱`
- notes: 与 `标识符`、`联系方式` 分开;账号是主体可持有的账户对象
- `标识符`
- definition: 用于识别实体的编号、ID、用户名、学号、工号等标识。
- positive_examples: `学号`、`工号`、`用户名`
- negative_examples: `GitHub账号`、`手机号`
- notes: 当前允许保留,但通常只有在存在明确识别关系时才值得抽取。
- `联系方式`
- definition: 可用于联系实体的电话、邮箱、社交联系地址。
- positive_examples: `手机号`、`邮箱`、`微信联系方式`
- negative_examples: `用户名`、`GitHub账号`
- notes: 当前允许保留,但通常只有在存在明确联系关系时才值得抽取。
- `识别联系信息`
- definition: 账号、用户名、编号、邮箱、手机号等与识别、联系或登录相关的信息对象
- positive_examples: `GitHub账号`、`微信号`、`学号`、`工号`、`用户名`、`手机号`、`邮箱`
- negative_examples: `微信`、`张三`、`简历`
- notes: 极简版中将原先的 `账号`、`标识符`、`联系方式` 合并为一类;如后续需要更细的图谱结构,可在下一层再拆分
- `文档媒体`
- definition: 文章、报告、表格、图片、音频、视频等内容载体。
@@ -221,11 +197,17 @@ Primary statement to analyze:
- negative_examples: `紧张`、`成功`、`意义`
- notes: 不包含情绪、心理状态、抽象结果或价值判断;这些应写入 `description`。
- `偏好习惯目标`
- definition: 用户稳定的偏好、重复习惯,以及具体、明确、用户特异且值得长期保留的目标
- positive_examples: `喜欢安静环境`、`晨跑`、`通过雅思`
- negative_examples: `紧张`、`开心`、`成功`、`回报`
- notes: 这是高风险类型;只允许稳定偏好重复习惯、具体目标,不允许抽象愿望或情绪状态
- `偏好习惯`
- definition: 用户稳定的偏好、重复习惯或长期行为倾向
- positive_examples: `喜欢安静环境`、`晨跑`、`偏好黑咖啡`、`每天记笔记`
- negative_examples: `紧张`、`开心`、`成功`、`通过雅思`
- notes: 只保留稳定偏好重复习惯,不包含情绪状态,也不包含具体目标结果
- `具体目标`
- definition: 用户具体、明确、可验证、可长期追踪的目标结果或目标性安排。
- positive_examples: `通过雅思`、`完成毕业论文`、`每周读两篇论文`
- negative_examples: `成功`、`更有意义`、`迎接新的挑战`、`紧张`
- notes: 只保留具体目标,不保留宽泛愿望、抽象追求或价值判断;宽泛内容应写入 `description`。
- `称呼别名`
- definition: 用于指代或称呼实体的名字。
@@ -244,8 +226,8 @@ Primary statement to analyze:
- 优先保留对用户画像、偏好、长期身份、稳定关系或持续兴趣有记忆价值的实体类型。
- 对于“努力”“回报”“意义”“成功”这类泛化概念、抽象命题片段或价值判断,默认不要仅因句中出现就创建实体。
- `群体` 只用于边界相对稳定、可被当作整体引用的人群;像“他们”“一些人”“一个朋友”这类边界不稳或 unresolved 的表达不要归入 `群体`。
- `偏好习惯目标` 只能用于稳定偏好、重复习惯或具体明确的用户目标,不能把抽象结果、泛因果终点、空泛愿望或情绪状态强行归入其中
- 当前阶段不抽取情绪状态实体;像“紧张”“开心”“难过”“焦虑”“放松”这类情绪或心理状态,不要归入 `知识能力`、`偏好习惯目标` 或其他现有类型。
- `偏好习惯` 只保留稳定偏好、重复习惯或长期行为倾向;`具体目标` 只保留具体、明确、可验证的目标结果。都不允许吸纳抽象愿望、泛因果终点或情绪状态
- 当前阶段不抽取情绪状态实体;像“紧张”“开心”“难过”“焦虑”“放松”这类情绪或心理状态,不要归入 `知识能力`、`偏好习惯`、`具体目标` 或其他现有类型。
===关系本体大类===
以下大类是当前 `predicate` 本体树的第一层,用于帮助理解和约束后面的具体关系白名单。输出具体 `predicate` 时仍然必须使用后文列出的细关系,而不是直接输出这些大类名称。
@@ -346,20 +328,12 @@ Primary statement to analyze:
- notes: 两端通常都应是可作为交互主体的实体。
- status: `enabled`
- `提及关系`
- definition: 表达主体或文本明确提到某实体的关系。
- covered_predicates: `提到`
- positive_examples: `用户 -> 提到 -> 腾讯`、`文档 -> 提到 -> 张三`
- negative_examples: `用户 -> 提到 -> 努力`、`用户 -> 提到 -> 回报`、`用户 -> 提到 -> 紧张`
- notes: 受限大类;不用于保留泛化概念、抽象命题片段、情绪状态或仅在句面上出现但没有记忆价值的对象
- status: `restricted`
- `一般关联关系`
- definition: 表达两个实体之间存在明确、稳定、值得保留,但当前无更精确谓词可用的关联关系。
- covered_predicates: `关联于`、`相关于`
- positive_examples: `项目 -> 关联于 -> 实验室`、`账号 -> 相关于 -> 平台`
- negative_examples: `努力 -> 相关于 -> 回报`、`用户 -> 关联于 -> 紧张`、`成功 -> 相关于 -> 意义`
- notes: 受限大类;不能作为失败兜底关系,不能用来连接抽象概念、口号式表达或无法成立的关系。
- `弱关联关系`
- definition: 表达两个实体之间存在明确、稳定、值得保留,但当前缺少更精确谓词可用的弱关联关系。
- covered_predicates: `提到`、`关联于`、`相关于`
- positive_examples: `项目 -> 关联于 -> 实验室`、`账号 -> 相关于 -> 平台`、`文档 -> 提到 -> 张三`
- negative_examples: `努力 -> 相关于 -> 回报`、`用户 -> 提到 -> 紧张`、`成功 -> 关联于 -> 意义`
- notes: 受限大类;不能作为失败兜底关系,不能用来连接抽象概念、口号式表达、情绪状态或无法成立的关系
- status: `restricted`
===预定义关系类型===
@@ -750,8 +724,8 @@ Output:
],
"entities": [
{"entity_idx": 0, "name": "用户", "type": "人物", "type_description": "可稳定指向、可被当作具体个体区分和归并的个人实体。", "description": "拥有该 GitHub 账号的说话者", "is_explicit_memory": false},
{"entity_idx": 1, "name": "GitHub账号", "type": "账号", "type_description": "账户、账号、用户档案类实体。", "description": "用户拥有的 GitHub 账号", "is_explicit_memory": false},
{"entity_idx": 2, "name": "chen4", "type": "标识符", "type_description": "用于识别实体的编号、ID、用户名、号、工号等标识。", "description": "该 GitHub 账号对应的用户名标识", "is_explicit_memory": false}
{"entity_idx": 1, "name": "GitHub账号", "type": "识别联系信息", "type_description": "账号、用户名、编号、邮箱、手机号等与识别、联系或登录相关的信息对象。", "description": "用户拥有的 GitHub 账号", "is_explicit_memory": false},
{"entity_idx": 2, "name": "chen4", "type": "识别联系信息", "type_description": "账号、用户名、号、邮箱、手机号等与识别、联系或登录相关的信息对象。", "description": "该 GitHub 账号对应的用户名标识", "is_explicit_memory": false}
]
}
@@ -764,7 +738,7 @@ Output:
{"subject_name": "机器人查票员", "subject_id": 0, "predicate": "沟通于", "predicate_description": "两个实体之间发生沟通或交流", "object_name": "用户", "object_id": 1, "valid_at": "NULL", "invalid_at": "NULL"}
],
"entities": [
{"entity_idx": 0, "name": "机器人查票员", "type": "智能体", "type_description": "具有行动、交互或执行能力的非人主体如机器人、AI 或其他智慧体。", "description": "与用户发生沟通的机器人主体", "is_explicit_memory": false},
{"entity_idx": 0, "name": "机器人查票员", "type": "物品设备", "type_description": "可被持有、使用、携带的具体物体、设备、工具或交通工具。", "description": "与用户发生沟通的机器人主体", "is_explicit_memory": false},
{"entity_idx": 1, "name": "用户", "type": "人物", "type_description": "可稳定指向、可被当作具体个体区分和归并的个人实体。", "description": "与机器人查票员沟通的说话者", "is_explicit_memory": false}
]
}