feat(memory): propagate temporal validity fields through extraction pipeline

- Add valid_at/invalid_at passthrough in triplet extraction prompt (both zh/en)
- Propagate temporal_validity to EntityEntityEdge in ExtractionOrchestrator
- Use coalesce() for valid_at/invalid_at in Neo4j cypher queries to handle NULLs
- Fix workspace_id/config_id UUID parsing in read_memory config resolution
- Downgrade verbose extraction pipeline logs from info to debug
- Remove UUID and short API key patterns from sensitive filter to reduce false positives
- Standardize log message format (put spaces around `=` in key = value pairs; label the user field `end_user_id` instead of `group`)
- Fix misindented TODO comment in write_pipeline.py
This commit is contained in:
lanceyq
2026-04-28 21:26:32 +08:00
parent 1f0c88a5f0
commit 4af9b02815
22 changed files with 229 additions and 192 deletions

View File

@@ -46,6 +46,10 @@ def validate_language(language: Optional[str]) -> str:
if language is None:
return DEFAULT_LANGUAGE
# 处理枚举类型:优先取 .value,避免 str(Language.ZH) → "Language.ZH"
if hasattr(language, "value"):
language = language.value
# 标准化:转小写并去除空白
lang = str(language).lower().strip()

View File

@@ -130,6 +130,10 @@ class LoggingConfig:
for neo4j_logger_name in ["neo4j", "neo4j.io", "neo4j.pool", "neo4j.notifications"]:
neo4j_logger = logging.getLogger(neo4j_logger_name)
neo4j_logger.addFilter(neo4j_filter)
# 压制 httpx / httpcore 的请求级日志(大量 HTTP Request: POST ... 噪音)
for noisy_logger in ["httpx", "httpcore", "httpcore.http11", "httpcore.connection"]:
logging.getLogger(noisy_logger).setLevel(logging.WARNING)
# 创建格式化器
formatter = logging.Formatter(

View File

@@ -40,8 +40,20 @@ async def long_term_storage(
# 获取数据库会话
with get_db_context() as db_session:
config_service = MemoryConfigService(db_session)
# 通过 end_user_id 获取 workspace_id,确保日志和 fallback 逻辑完整
from app.services.memory_agent_service import get_end_user_connected_config
import uuid as _uuid
workspace_id = None
try:
connected = get_end_user_connected_config(end_user_id, db_session)
raw = connected.get("workspace_id")
if raw and raw != "None":
workspace_id = _uuid.UUID(str(raw))
except Exception:
pass
memory_config = config_service.load_memory_config(
config_id=memory_config_id, # 改为整数
config_id=memory_config_id,
workspace_id=workspace_id,
service_name="MemoryAgentService"
)
if long_term_type == AgentMemory_Long_Term.STRATEGY_CHUNK:

View File

@@ -15,7 +15,7 @@ class ParameterBuilder:
def __init__(self):
"""Initialize the parameter builder."""
logger.info("ParameterBuilder initialized")
logger.debug("ParameterBuilder initialized")
def build_tool_args(
self,

View File

@@ -86,7 +86,7 @@ class SearchService:
def __init__(self):
"""Initialize the search service."""
logger.info("SearchService initialized")
logger.debug("SearchService initialized")
def extract_content_from_result(self, result: dict, node_type: str = "") -> str:
"""

View File

@@ -24,7 +24,7 @@ class SessionService:
store: Redis session store instance
"""
self.store = store
logger.info("SessionService initialized")
logger.debug("SessionService initialized")
def resolve_user_id(self, session_string: str) -> str:
"""

View File

@@ -51,7 +51,7 @@ class TemplateService:
loader=FileSystemLoader(template_root),
autoescape=False # Disable autoescape for prompt templates
)
logger.info(f"TemplateService initialized with root: {template_root}")
logger.debug(f"TemplateService initialized with root: {template_root}")
@lru_cache(maxsize=128)
def _load_template(self, template_name: str) -> Template:

View File

@@ -10,6 +10,7 @@ async def get_chunked_dialogs(
messages: list = None,
ref_id: str = "",
config_id: str = None,
workspace_id=None,
snapshot=None,
) -> List[DialogData]:
"""Generate chunks from structured messages using the specified chunker strategy.
@@ -76,6 +77,7 @@ async def get_chunked_dialogs(
config_service = MemoryConfigService(db)
memory_config = config_service.load_memory_config(
config_id=config_id,
workspace_id=workspace_id,
service_name="semantic_pruning"
)
@@ -107,6 +109,13 @@ async def get_chunked_dialogs(
remaining_msg_count = len(dialog_data.context.msgs)
deleted_count = original_msg_count - remaining_msg_count
logger.info(f"[剪枝] 完成: 原始{original_msg_count}条 -> 保留{remaining_msg_count}条 (删除{deleted_count}条)")
# 将剪枝记录挂到 metadata,供 graph_build_step 构建节点
if pruner.pruning_records:
dialog_data.metadata["assistant_pruning_records"] = [
r.model_dump() for r in pruner.pruning_records
]
logger.info(f"[剪枝] 收集到 {len(pruner.pruning_records)} 条剪枝记录")
else:
logger.warning("[剪枝] prune_dataset 返回空列表")
else:

View File

@@ -389,7 +389,7 @@ async def write(
workspace_id=str(memory_config.workspace_id),
stats=stats_to_cache,
)
logger.info(f"[WRITE] 活动统计已写入 Redis: workspace_id={memory_config.workspace_id}")
logger.info(f"[WRITE] 活动统计已写入 Redis: workspace_id = {memory_config.workspace_id}")
except Exception as cache_err:
logger.warning(f"[WRITE] 写入活动统计缓存失败(不影响主流程): {cache_err}", exc_info=True)

View File

@@ -64,7 +64,7 @@ class ImplicitMemoryLLMClient:
self.default_model_id = default_model_id
self._client_factory = MemoryClientFactory(db)
logger.info("ImplicitMemoryLLMClient initialized")
logger.debug("ImplicitMemoryLLMClient initialized")
def _get_llm_client(self, model_id: Optional[str] = None):
"""Get LLM client instance.

View File

@@ -17,10 +17,11 @@ from __future__ import annotations
import asyncio
import logging
import time
import uuid
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Optional
from app.core.memory.utils.log.bear_logger import BearLogger
from pydantic import BaseModel, Field, ConfigDict
if TYPE_CHECKING:
@@ -40,6 +41,7 @@ from app.core.memory.models.graph_models import (
)
logger = logging.getLogger(__name__)
bear = BearLogger("memory.pipeline")
# ──────────────────────────────────────────────
@@ -165,112 +167,82 @@ class WritePipeline:
is_pilot_run: 试运行模式(只萃取不写入)
Returns:
WriteResult 包含状态和统计信息
WriteResult 包含状态和统计信息
"""
if not ref_id:
ref_id = uuid.uuid4().hex
mode = "试运行" if is_pilot_run else "正式"
pipeline_start = time.time()
logger.info(
f"[WritePipeline] 开始 ({mode}) "
f"config={self.memory_config.config_name}, "
f"end_user={self.end_user_id}"
)
extraction_result = None
try:
# 初始化客户端和连接
self._init_clients()
self._init_neo4j_connector()
async with bear.pipeline(
"WritePipeline",
mode=mode,
config_name=self.memory_config.config_name,
end_user_id=self.end_user_id,
):
# 初始化客户端和连接
self._init_clients()
self._init_neo4j_connector()
# 初始化 Snapshot提前创建供预处理阶段的剪枝使用
from app.core.memory.utils.debug.pipeline_snapshot import PipelineSnapshot
self._snapshot = PipelineSnapshot("new")
# 初始化 Snapshot提前创建供预处理阶段的剪枝使用
from app.core.memory.utils.debug.pipeline_snapshot import PipelineSnapshot
self._snapshot = PipelineSnapshot("new")
# Step 1: 预处理 - 消息分块 + AI消息语义剪枝
step_start = time.time()
chunked_dialogs = await self._preprocess(messages, ref_id)
chunks_count = sum(len(d.chunks) for d in chunked_dialogs)
logger.info(
f"[WritePipeline] [1/5] 预处理:消息分块 "
f"{time.time() - step_start:.2f}s chunks={chunks_count}"
)
# Step 1: 预处理 - 消息分块 + AI消息语义剪枝
async with bear.step(1, 5, "预处理", "消息分块") as s:
chunked_dialogs = await self._preprocess(messages, ref_id)
s.metadata(chunks=sum(len(d.chunks) for d in chunked_dialogs))
# Step 2: 萃取 - 知识提取
step_start = time.time()
extraction_result = await self._extract(chunked_dialogs, is_pilot_run)
stats = extraction_result.stats
logger.info(
f"[WritePipeline] [2/5] 萃取:知识提取 "
f"{time.time() - step_start:.2f}s "
f"entities={stats['entity_count']}, "
f"statements={stats['statement_count']}, "
f"relations={stats['relation_count']}"
)
# Step 2: 萃取 - 知识提取
async with bear.step(2, 5, "萃取", "知识提取") as s:
extraction_result = await self._extract(chunked_dialogs, is_pilot_run)
stats = extraction_result.stats
s.metadata(
entities=stats["entity_count"],
statements=stats["statement_count"],
relations=stats["relation_count"],
)
# 试运行模式到此结束
if is_pilot_run:
return WriteResult(
status="pilot_complete",
extraction=extraction_result.stats,
elapsed_seconds=0.0,
)
# Step 3: 存储 - 写入 Neo4j
async with bear.step(3, 5, "存储", "写入 Neo4j"):
await self._store(extraction_result)
# Step 3.2: 别名归并
async with bear.step(3, 5, "别名归并", "处理别名属于关系"):
await self._merge_alias_belongs_to(extraction_result)
# Step 3.5: 异步情绪提取(fire-and-forget,需在 _store 之后确保 Statement 节点已存在)
await self._extract_emotion(getattr(self, "_emotion_statements", []))
# Step 4: 聚类 - 增量更新社区(异步,不阻塞)
async with bear.step(4, 5, "聚类", "增量更新社区") as s:
await self._cluster(extraction_result)
s.metadata(mode="async")
# Step 5: 摘要 - 生成情景记忆摘要
async with bear.step(5, 5, "摘要", "生成情景记忆"):
await self._summarize(chunked_dialogs)
# 更新活动统计缓存
await self._update_stats_cache(extraction_result)
# 试运行模式到此结束
if is_pilot_run:
elapsed = time.time() - pipeline_start
logger.info(f"[WritePipeline] 完成(试运行) ✔ {elapsed:.2f}s")
return WriteResult(
status="pilot_complete",
status="success",
extraction=extraction_result.stats,
elapsed_seconds=elapsed,
elapsed_seconds=0.0,
)
# Step 3: 存储 - 写入 Neo4j
step_start = time.time()
await self._store(extraction_result)
logger.info(
f"[WritePipeline] [3/5] 存储:写入 Neo4j "
f"{time.time() - step_start:.2f}s"
)
# Step 3.2: 别名归并 - 处理 predicate="别名属于" 的关系
step_start = time.time()
await self._merge_alias_belongs_to(extraction_result)
logger.info(
f"[WritePipeline] [3.2] 别名归并:处理别名属于关系 "
f"{time.time() - step_start:.2f}s"
)
# Step 3.5: 异步情绪提取fire-and-forget需在 _store 之后确保 Statement 节点已存在)
await self._extract_emotion(getattr(self, "_emotion_statements", []))
# Step 4: 聚类 - 增量更新社区(异步,不阻塞)
step_start = time.time()
await self._cluster(extraction_result)
logger.info(
f"[WritePipeline] [4/5] 聚类:增量更新社区 "
f"{time.time() - step_start:.2f}s mode=async"
)
# Step 5: 摘要 - 生成情景记忆摘要
step_start = time.time()
await self._summarize(chunked_dialogs)
logger.info(
f"[WritePipeline] [5/5] 摘要:生成情景记忆 "
f"{time.time() - step_start:.2f}s"
)
# 更新活动统计缓存
await self._update_stats_cache(extraction_result)
elapsed = time.time() - pipeline_start
logger.info(f"[WritePipeline] 完成 ✔ {elapsed:.2f}s")
return WriteResult(
status="success",
extraction=extraction_result.stats,
elapsed_seconds=elapsed,
)
except Exception as e:
elapsed = time.time() - pipeline_start
logger.error(
f"[WritePipeline] 失败 ✘ {elapsed:.2f}s error={e}",
exc_info=True,
)
except Exception:
raise
finally:
@@ -300,6 +272,7 @@ class WritePipeline:
messages=messages,
ref_id=ref_id,
config_id=str(self.memory_config.config_id),
workspace_id=self.memory_config.workspace_id,
snapshot=snapshot,
)
@@ -409,7 +382,7 @@ class WritePipeline:
]
snapshot.save_stage("5_embedding_outputs", emb_data)
# step2: 构建图节点和边
# step2: 构建图节点和边
graph = await build_graph_nodes_and_edges(
dialog_data_list=dialog_data_list,
embedder_client=self._embedder_client,
@@ -445,7 +418,7 @@ class WritePipeline:
},
)
# step3: 两阶段去重消歧
# step3: 两阶段去重消歧
dedup_result = await run_dedup(
entity_nodes=graph.entity_nodes,
statement_entity_edges=graph.stmt_entity_edges,
@@ -482,7 +455,7 @@ class WritePipeline:
],
},
)
# step4: 构造最终结果
result = ExtractionResult(
dialogue_nodes=graph.dialogue_nodes,
@@ -501,7 +474,7 @@ class WritePipeline:
dialog_data_list=dialog_data_list,
)
snapshot.save_summary(result.stats) # TODO 乐力齐 snapshot需要改
snapshot.save_summary(result.stats) # TODO 乐力齐 snapshot需要改
return result
# ──────────────────────────────────────────────
@@ -545,7 +518,7 @@ class WritePipeline:
assistant_dialog_edges=result.assistant_dialog_edges,
)
if success:
logger.info("Successfully saved all data to Neo4j")
logger.debug("Successfully saved all data to Neo4j")
return
# 写入返回 False部分失败
if attempt < max_retries - 1:
@@ -584,7 +557,8 @@ class WritePipeline:
# 筛选出所有 predicate="别名属于" 的边
alias_edges = [
e for e in result.entity_entity_edges
e
for e in result.entity_entity_edges
if getattr(e, "relation_type", "") == ALIAS_PREDICATE
or getattr(e, "predicate", "") == ALIAS_PREDICATE
]
@@ -608,14 +582,18 @@ class WritePipeline:
if source_name:
existing_lower = {a.lower() for a in (target_node.aliases or [])}
if source_name.lower() not in existing_lower:
target_node.aliases = list(target_node.aliases or []) + [source_name]
target_node.aliases = list(target_node.aliases or []) + [
source_name
]
# 将 source.description append 进 target.description追加分号分隔
src_desc = (source_node.description or "").strip()
if src_desc:
tgt_desc = (target_node.description or "").strip()
if src_desc not in tgt_desc:
target_node.description = f"{tgt_desc}{src_desc}" if tgt_desc else src_desc
target_node.description = (
f"{tgt_desc}{src_desc}" if tgt_desc else src_desc
)
logger.info(
f"[AliasMerge] 内存同步完成,处理 {len(alias_edges)}'别名属于'"
@@ -627,12 +605,12 @@ class WritePipeline:
end_user_id=self.end_user_id,
)
merged_count = len(records) if records else 0
logger.info(
f"[AliasMerge] Neo4j 别名归并完成,影响 {merged_count} 条记录"
)
logger.info(f"[AliasMerge] Neo4j 别名归并完成,影响 {merged_count} 条记录")
except Exception as e:
logger.warning(f"[AliasMerge] 别名归并失败(不影响主流程): {e}", exc_info=True)
logger.warning(
f"[AliasMerge] 别名归并失败(不影响主流程): {e}", exc_info=True
)
# ──────────────────────────────────────────────
# Step 4: 聚类
@@ -675,8 +653,8 @@ class WritePipeline:
)
logger.info(
f"[Clustering] 增量聚类任务已提交 - "
f"task_id={task.id}, "
f"entity_count={len(new_entity_ids)}, "
f"task_id = {task.id}, "
f"entity_count = {len(new_entity_ids)}, "
f"source=dedup"
)
except Exception as e:
@@ -734,9 +712,9 @@ class WritePipeline:
)
logger.info(
f"[Emotion] 异步情绪提取任务已提交 - "
f"task_id={result.id}, "
f"statement_count={len(emotion_statements)}, "
f"snapshot_dir={snapshot_dir}, "
f"task_id = {result.id}, "
f"statement_count = {len(emotion_statements)}, "
f"snapshot_dir = {snapshot_dir}, "
f"source=async"
)
except Exception as e:
@@ -749,7 +727,7 @@ class WritePipeline:
# Step 5: 摘要
# + entity_description+ meta_data部分在此提取
# ──────────────────────────────────────────────
# TODO 乐力齐 需要做成异步celery任务
# TODO 乐力齐 需要做成异步celery任务
async def _summarize(self, chunked_dialogs: List[DialogData]) -> None:
"""
摘要:生成情景记忆摘要 → 写入 Neo4j。
@@ -877,8 +855,7 @@ class WritePipeline:
external_assistant_aliases=neo4j_assistant_aliases,
)
logger.info(
f"别名清洗完成AI助手别名排除集大小: "
f"{len(neo4j_assistant_aliases)}"
f"别名清洗完成AI助手别名排除集大小: {len(neo4j_assistant_aliases)}"
)
except Exception as e:
logger.warning(f"别名清洗失败(不影响主流程): {e}")
@@ -911,8 +888,7 @@ class WritePipeline:
stats=stats,
)
logger.info(
f"活动统计已写入 Redis: "
f"workspace_id={self.memory_config.workspace_id}"
f"活动统计已写入 Redis: workspace_id={self.memory_config.workspace_id}"
)
except Exception as e:
logger.warning(f"写入活动统计缓存失败(不影响主流程): {e}")

View File

@@ -1303,6 +1303,7 @@ class ExtractionOrchestrator:
# 只有当两个实体ID都存在时才创建边
if subject_entity_id and object_entity_id:
_tv = getattr(statement, "temporal_validity", None)
entity_entity_edge = EntityEntityEdge(
source=subject_entity_id,
target=object_entity_id,
@@ -1314,6 +1315,8 @@ class ExtractionOrchestrator:
run_id=dialog_data.run_id, # 使用 dialog_data 的 run_id
created_at=dialog_data.created_at,
expired_at=dialog_data.expired_at,
valid_at=_tv.valid_at if _tv else None,
invalid_at=_tv.invalid_at if _tv else None,
)
entity_entity_edges.append(entity_entity_edge)

View File

@@ -105,7 +105,7 @@ class NewExtractionOrchestrator:
SidecarTiming.AFTER_TRIPLET
]
logger.info(
logger.debug(
"NewExtractionOrchestrator initialised — "
"after_statement sidecars: %d, after_triplet sidecars: %d",
len(self.after_statement_sidecars),
@@ -274,11 +274,11 @@ class NewExtractionOrchestrator:
) -> List[DialogData]:
"""Pilot mode: statement + triplet extraction only, no sidecars or embeddings."""
# Phase 1: Statement extraction (chunk-level parallel)
logger.info("Pilot phase 1/2: Statement extraction")
logger.debug("Pilot phase 1/2: Statement extraction")
all_stmt_results = await self._extract_all_statements(dialog_data_list)
# Phase 2: Triplet extraction (statement-level parallel)
logger.info("Pilot phase 2/2: Triplet extraction")
logger.debug("Pilot phase 2/2: Triplet extraction")
all_triplet_results = await self._extract_all_triplets(
dialog_data_list, all_stmt_results
)
@@ -327,7 +327,7 @@ class NewExtractionOrchestrator:
},
)
logger.info("Pilot extraction complete")
logger.debug("Pilot extraction complete")
return dialog_data_list
# ── 3b. 正式模式:四阶段并发执行 ──
@@ -338,7 +338,7 @@ class NewExtractionOrchestrator:
"""Full mode: all four phases with concurrent sidecars and embeddings."""
# ── Phase 1: Statement extraction + chunk/dialog embedding ──
logger.info("Phase 1/4: Statement extraction + chunk/dialog embedding")
logger.debug("Phase 1/4: Statement extraction + chunk/dialog embedding")
chunk_dialog_emb_input = self._build_chunk_dialog_embedding_input(
dialog_data_list
)
@@ -367,7 +367,7 @@ class NewExtractionOrchestrator:
logger.warning("Chunk/dialog embedding failed: %s", phase1_results[1])
# ── Phase 2: Triplet extraction + after_statement sidecars + statement embedding ──
logger.info(
logger.debug(
"Phase 2/4: Triplet extraction + sidecars + statement embedding"
)
stmt_emb_input = self._build_statement_embedding_input(
@@ -403,7 +403,7 @@ class NewExtractionOrchestrator:
)
# ── Phase 3: Entity embedding + after_triplet sidecars ──
logger.info("Phase 3/4: Entity embedding + after_triplet sidecars")
logger.debug("Phase 3/4: Entity embedding + after_triplet sidecars")
entity_emb_input = self._build_entity_embedding_input(all_triplet_results)
after_triplet_pairs: List[Tuple[ExtractionStep, Any]] = []
@@ -430,7 +430,7 @@ class NewExtractionOrchestrator:
merged_emb = self._merge_embeddings(chunk_dialog_emb, stmt_emb, entity_emb)
# ── Phase 4: Data assignment ──
logger.info("Phase 4/4: Data assignment")
logger.debug("Phase 4/4: Data assignment")
self._assign_results(
dialog_data_list,
@@ -453,7 +453,7 @@ class NewExtractionOrchestrator:
"embedding_output": merged_emb,
}
logger.info("Full extraction pipeline complete")
logger.debug("Full extraction pipeline complete")
return dialog_data_list
@property

View File

@@ -53,7 +53,7 @@ class DialogueChunker:
)
self.chunker_strategy = chunker_strategy
logger.info(f"Initializing DialogueChunker with strategy: {chunker_strategy}")
logger.debug(f"Initializing DialogueChunker with strategy: {chunker_strategy}")
try:
# Load and validate configuration
@@ -71,7 +71,7 @@ class DialogueChunker:
else:
self.chunker_client = ChunkerClient(self.chunker_config)
logger.info(f"DialogueChunker initialized successfully with strategy: {chunker_strategy}")
logger.debug(f"DialogueChunker initialized successfully with strategy: {chunker_strategy}")
except Exception as e:
logger.error(f"Failed to initialize DialogueChunker: {e}", exc_info=True)
@@ -101,7 +101,7 @@ class DialogueChunker:
f"Messages: {len(dialogue.context.msgs) if dialogue.context else 0}"
)
logger.info(
logger.debug(
f"Processing dialogue {dialogue.ref_id} with {len(dialogue.context.msgs)} messages "
f"using strategy: {self.chunker_strategy}"
)
@@ -121,7 +121,7 @@ class DialogueChunker:
)
logger.info(
f"Successfully generated {len(chunks)} chunks for dialogue {dialogue.ref_id}. "
f"Successfully generated {len(chunks)} chunks for dialogue_id: {dialogue.ref_id}. "
f"Total characters processed: {len(dialogue.content) if dialogue.content else 0}"
)

View File

@@ -142,7 +142,7 @@ class ConfigAuditLogger:
result = "SUCCESS" if success else "FAILED"
msg = (
f"{operation.upper()} config_id={config_id} "
f"group={end_user_id} result={result}"
f"end_user_id={end_user_id} result={result}"
)
if duration is not None:
msg += f" duration={duration:.2f}s"

View File

@@ -12,6 +12,7 @@ Extract entities and knowledge triplets from the given statement.
- 命名关系中新出现的称呼、别名、昵称、产品名保持原样,不做替换。
- `description` 使用中文。
- `type`、`predicate`、`type_description`、`predicate_description` 一律使用中文。
- 每个 `triplet` 都必须携带 `valid_at` 和 `invalid_at`,并直接复制输入中的同名字段,不要自行改写或推断新的时间边界。
{% else %}
Important:
- Keep `name`, `subject_name`, and `object_name` in their original surface form from the source text by default.
@@ -22,6 +23,7 @@ Extract entities and knowledge triplets from the given statement.
- Newly introduced names in naming or alias expressions must stay in their original form.
- Generate `description` in English.
- Always generate `type`, `predicate`, `type_description`, and `predicate_description` in Chinese.
- Every `triplet` MUST include `valid_at` and `invalid_at`, copied directly from the input fields with the same names; do not rewrite or infer new temporal bounds.
{% endif %}
===Inputs===
@@ -110,7 +112,8 @@ Primary statement to analyze:
- 不要从 `supporting_context.msgs` 中单独抽取实体或关系。
- 如果某条信息只出现在 `supporting_context.msgs` 中,而没有出现在 `statement_text` 中,就不要输出它。
- 如果 `supporting_context.msgs` 中的 Assistant 消息包含总结、猜测、解释或改写,这些内容只能作为理解辅助,不能直接作为抽取来源。
- `statement_type`、`temporal_type`、`valid_at`、`invalid_at` 是辅助理解字段,不是抽取目标。
- `statement_type`、`temporal_type` 是辅助理解字段,不是抽取目标。
- `valid_at`、`invalid_at` 不用于决定是否创建实体或关系,但如果产生 triplet,必须原样复制到每个 triplet 的同名字段中。
- 对 `statement_text` 中的用户自指表达,要统一规范成实体 `用户`。
- 对其他可稳定解析的代词或指示表达,要替换为具体指代实体名后再抽取。
- 对命名关系中新出现的称呼、别名、昵称、产品名,不要因为上下文可推断其所指而直接改写,它们应保持原样作为实体名。
@@ -120,7 +123,8 @@ Primary statement to analyze:
- Do not extract any standalone entity or relation from `supporting_context.msgs`.
- If some information appears only in `supporting_context.msgs` but not in `statement_text`, do not include it in the output.
- If Assistant messages in `supporting_context.msgs` contain summary, guess, interpretation, or rephrasing, use them only as interpretive support and never as a direct extraction source.
- Treat `statement_type`, `temporal_type`, `valid_at`, and `invalid_at` as auxiliary context, not extraction targets.
- Treat `statement_type` and `temporal_type` as auxiliary context, not extraction targets.
- Do not use `valid_at` or `invalid_at` to decide whether to create entities or relations, but if any triplet is produced, copy them verbatim into every triplet field with the same names.
- Normalize user self-reference in `statement_text` to the entity `用户`.
- Replace other resolvable pronouns or demonstratives with their resolved entity names before extraction.
- For newly introduced names in naming or alias expressions, do not rewrite them even if the context reveals who they refer to; keep them as entity names.
@@ -229,7 +233,7 @@ Primary statement to analyze:
- unresolved 或边界不稳的表达,不因“看起来像名词”就创建实体。
- 情绪、心理状态、金额、数量、普通时间、一次性动作短语,默认不作为独立实体类型抽取。
- 抽象命题片段、泛化结果、价值判断,默认不创建实体;如有保留价值,应写入相关高价值实体的 `description`。
- 抽象命题片段、泛化结果、价值判断,默认不创建实体;如有保留价值且适合作为实体的稳定描述,可写入相关高价值实体的 `description`。
实体类型选择原则:
@@ -445,9 +449,9 @@ Do not let auxiliary fields drive the extraction process.
- 不要为了表达一句带时间或地点的行动,而额外创造“任务”“计划”“事件”实体。
- 但如果动作明确把主体和某个稳定实体连接起来,可以保留该稳定实体,并抽取轻关系。例如“我去图书馆”“我去公司开会”“我去上课”“我去看演唱会”可以抽取 `前往`。
- 当句子只是在讨论一般道理、抽象规律、空泛结果或非个体化概念,而这些概念本身不构成可复用记忆时,不要创建实体。
- 如果句子表达的是用户的观点、信念、判断、愿望或目标倾向,但其中抽象对象不值得作为独立实体保留,则只保留相关高价值实体,不要再创建这些低价值对象实体,并把未抽取的抽象内容压缩写入相关实体的 `description`例如“用户认为努力就会有回报”只保留 `用户`,并在 `description` 中体现“用户认为努力就会有回报”
- 对于未抽取的抽象实体、抽象命题片段或泛化结果,只要它们对理解该高价值实体有帮助,就应优先写入该实体的 `description`,而不是改用宽泛关系或补造弱实体
- 当前阶段同样不要把情绪或心理状态抽成实体;如果句子里出现“紧张”“开心”“难过”“焦虑”“放松”等,应写入相关高价值实体的 `description`,而不是把它们标成 `知识能力`、`偏好习惯目标` 或其他近似类型
- 如果句子表达的是用户的观点、信念、判断、愿望或目标倾向,但其中抽象对象不值得作为独立实体保留,则只保留相关高价值实体,不要再创建这些低价值对象实体;只有当未抽取内容适合作为该实体的稳定描述时,才写入相关实体的 `description`。例如“用户认为努力就会有回报”只保留 `用户`,并在 `description` 中体现这一较稳定认知倾向。
- 对于未抽取的抽象实体、抽象命题片段或泛化结果,不要默认全部写入 `description`;只有当它们适合作为该实体的稳定描述、且对后续区分或理解该实体有帮助时,才写入 `description`。
- 当前阶段同样不要把情绪或心理状态抽成实体;如果句子里出现“紧张”“开心”“难过”“焦虑”“放松”等,只有在它们能被稳定概括为较持久的认知或态度时,才可间接体现在相关高价值实体的 `description` 中;短期情绪状态本身不要写入 `description`。
- 如果陈述里有值得保留的实体信息,但没有有效关系,可以只返回 `entities`,并把 `triplets` 设为 `[]`。
- `name` 默认保持原文中的表面形式,但用户自指必须写成 `用户`,可稳定解析的其他代词必须替换为具体指代实体名。
- `description` 必须使用中文。
@@ -461,9 +465,9 @@ Do not let auxiliary fields drive the extraction process.
- Do not create extra "task", "plan", or "event" entities just to represent an action with time or location modifiers.
- But if an action clearly connects the subject to a stable entity, keep that stable entity and use a light relation. For example, statements like "I go to the library", "I go to the office", "I go to class", or "I go to a concert" can use `前往`.
- If the sentence is only about a generic principle, abstract outcome, or non-personalized concept that is not worth remembering on its own, do not create an entity for it.
- If a statement expresses the user's belief, judgment, opinion, wish, or goal tendency but the referenced abstract concepts are not worth keeping as standalone entities, keep only the relevant high-value entities, do not create those low-value concept entities, and compress the unextracted abstract content into the relevant entity `description`. For example, "the user believes effort brings reward" should keep only `用户` and reflect that belief in `description`.
- For abstract entities, proposition fragments, or generic outcomes that are not extracted, prefer writing them into the relevant retained entity's `description` when they help preserve the memory, instead of switching to a broad relation or inventing a weak entity.
- In the current stage, do not extract emotional or psychological states as entities. States such as nervousness, happiness, sadness, anxiety, or relief should be written into the relevant retained entity's `description` rather than mapped to `知识能力`, `偏好习惯目标`, or any other approximate type.
- If a statement expresses the user's belief, judgment, opinion, wish, or goal tendency but the referenced abstract concepts are not worth keeping as standalone entities, keep only the relevant high-value entities and do not create those low-value concept entities; write the unextracted content into an entity `description` only when it is suitable as a stable description of that entity. For example, "the user believes effort brings reward" may keep only `用户` and reflect that relatively stable belief in `description`.
- For abstract entities, proposition fragments, or generic outcomes that are not extracted, do not automatically write them into `description`; only do so when they are suitable as a stable description of the retained entity and help identify or understand it.
- In the current stage, do not extract emotional or psychological states as entities. States such as nervousness, happiness, sadness, anxiety, or relief should not be mapped to `知识能力`, `偏好习惯目标`, or any other approximate type, and short-lived emotional states should not be written into `description`.
- If the statement contains entity-worthy content but no valid relation, it is acceptable to return `entities` with `triplets: []`.
- Keep `name` in its original surface form by default, but write user self-reference as `用户` and replace other stably resolvable references with their resolved entity names.
- `description` must be in English.
@@ -487,11 +491,17 @@ Do not let auxiliary fields drive the extraction process.
- `description` 应短、直白、与当前上下文相关,并能帮助区分实体。
- 优先描述实体在当前陈述和必要上下文中的身份、作用或关系。
- `description` 只保留适合长期附着在该实体上的描述,例如稳定身份、稳定关系、长期偏好/兴趣/习惯、较稳定认知倾向或可用于区分实体的持久特征。
- 不要把短期状态、一次性事件、临时计划、当前情绪、具体时间锚点,或只在当前句子里短暂成立的信息写进 `description`。
- 如果实体应保留,但当前 statement 中没有适合长期附着在该实体上的稳定描述,则 `description` 允许为空字符串 `""`;不要为了填充 `description` 而写入短期状态或临时信息。
- 避免使用“陈述中提到的人物”“陈述中提到的组织”“陈述中提到的物品”这类低信息量模板。
- 不要补充识别实体所不需要的外部知识。
{% else %}
- `description` should be short, context-grounded, and discriminative.
- Prefer describing the entity's role, identity, or relation in the current statement and necessary supporting context.
- `description` should keep only information suitable to remain attached to the entity over time, such as stable identity, stable relations, long-term preferences/interests/habits, relatively stable beliefs, or persistent distinguishing traits.
- Do not put short-lived states, one-off events, temporary plans, current emotions, concrete time anchors, or information that only briefly holds in the current sentence into `description`.
- If an entity should be retained but the current statement does not provide any suitable stable description for it, `description` may be the empty string `""`; do not fill it with short-lived states or temporary information just to avoid emptiness.
- Avoid low-information templates such as "the person mentioned in the statement" or "the organization mentioned in the statement".
- Do not add extra world knowledge that is not needed for identifying the entity in context.
{% endif %}
@@ -518,6 +528,7 @@ Do not let auxiliary fields drive the extraction process.
- 如果 `has_unsolved_reference` 是 `true`,不要抽取实体或 triplets。
- `subject_name` 和 `object_name` 默认保持原文中的表面形式,但用户自指必须写成 `用户`,可稳定解析的其他代词必须替换为具体指代实体名。
- `predicate_description` 必须直接复用对应 `predicate` 的中文定义。
- 每个 triplet 都必须包含 `valid_at` 和 `invalid_at`,并直接复用输入中的同名字段值;如果输入是 `NULL`,这里也写 `NULL`。
- 不要把普通时间表达作为 triplet 的宾语。
- 不要为了表达一次性计划、安排、日程而强行构造关系。
- 当句子表达主体去某个地点、场所、组织、课程或活动时,只要该对象本身有记忆价值,就可以抽取 `前往`,即使句中同时带有时间信息。
@@ -537,6 +548,7 @@ Do not let auxiliary fields drive the extraction process.
- If `has_unsolved_reference` is `true`, do not extract entities or triplets.
- Keep `subject_name` and `object_name` in their original surface form by default, but write user self-reference as `用户` and replace other stably resolvable references with their resolved entity names.
- `predicate_description` must directly reuse the corresponding Chinese definition of `predicate`.
- Every triplet must include `valid_at` and `invalid_at`, copied directly from the input fields with the same names; if the input is `NULL`, write `NULL` here as well.
- Do not use ordinary time expressions as triplet objects.
- Do not force relations just to encode one-off plans, schedules, or actions.
- When the statement says that the subject goes to a place, venue, organization, class, or activity, you may extract `前往` as long as that destination itself is worth remembering, even if the statement also includes time information.
@@ -589,10 +601,14 @@ Do not let auxiliary fields drive the extraction process.
- 每个 triplet 中的 `subject_name` 必须与 `subject_id` 指向实体的 `name` 完全一致。
- 每个 triplet 中的 `object_name` 必须与 `object_id` 指向实体的 `name` 完全一致。
- 每个 triplet 中的 `valid_at` 必须与输入中的 `valid_at` 完全一致。
- 每个 triplet 中的 `invalid_at` 必须与输入中的 `invalid_at` 完全一致。
- 不要在 triplet 里使用与实体名不同的表面形式。
{% else %}
- `subject_name` in each triplet MUST exactly match the `name` of the entity referenced by `subject_id`.
- `object_name` in each triplet MUST exactly match the `name` of the entity referenced by `object_id`.
- `valid_at` in each triplet MUST exactly match the input `valid_at`.
- `invalid_at` in each triplet MUST exactly match the input `invalid_at`.
- Do not use alternative surface forms inside triplets.
{% endif %}
@@ -603,7 +619,7 @@ Statement: "我住在巴黎。"
Output:
{
"triplets": [
{"subject_name": "用户", "subject_id": 0, "predicate": "居住于", "predicate_description": "人物居住在某地点", "object_name": "巴黎", "object_id": 1}
{"subject_name": "用户", "subject_id": 0, "predicate": "居住于", "predicate_description": "人物居住在某地点", "object_name": "巴黎", "object_id": 1, "valid_at": "NULL", "invalid_at": "NULL"}
],
"entities": [
{"entity_idx": 0, "name": "用户", "type": "人物", "type_description": "可稳定指向、可被当作具体个体区分和归并的个人实体。", "description": "居住在巴黎的说话者", "is_explicit_memory": false},
@@ -618,7 +634,7 @@ Input condition: supporting context has already made it clear that “他” ref
Output:
{
"triplets": [
{"subject_name": "张明", "subject_id": 0, "predicate": "任职于", "predicate_description": "主体在某组织中工作或任职", "object_name": "腾讯", "object_id": 1}
{"subject_name": "张明", "subject_id": 0, "predicate": "任职于", "predicate_description": "主体在某组织中工作或任职", "object_name": "腾讯", "object_id": 1, "valid_at": "NULL", "invalid_at": "NULL"}
],
"entities": [
{"entity_idx": 0, "name": "张明", "type": "人物", "type_description": "可稳定指向、可被当作具体个体区分和归并的个人实体。", "description": "在腾讯工作的人员", "is_explicit_memory": false},
@@ -632,11 +648,11 @@ Statement: "我常去图书馆学微积分。"
Output:
{
"triplets": [
{"subject_name": "用户", "subject_id": 0, "predicate": "前往", "predicate_description": "主体前往某个地点、场所、组织、课程或活动", "object_name": "图书馆", "object_id": 1},
{"subject_name": "用户", "subject_id": 0, "predicate": "学习", "predicate_description": "主体正在学习某知识主题或技能", "object_name": "微积分", "object_id": 2}
{"subject_name": "用户", "subject_id": 0, "predicate": "前往", "predicate_description": "主体前往某个地点、场所、组织、课程或活动", "object_name": "图书馆", "object_id": 1, "valid_at": "NULL", "invalid_at": "NULL"},
{"subject_name": "用户", "subject_id": 0, "predicate": "学习", "predicate_description": "主体正在学习某知识主题或技能", "object_name": "微积分", "object_id": 2, "valid_at": "NULL", "invalid_at": "NULL"}
],
"entities": [
{"entity_idx": 0, "name": "用户", "type": "人物", "type_description": "可稳定指向、可被当作具体个体区分和归并的个人实体。", "description": "经常图书馆学习微积分的说话者", "is_explicit_memory": false},
{"entity_idx": 0, "name": "用户", "type": "人物", "type_description": "可稳定指向、可被当作具体个体区分和归并的个人实体。", "description": "经常去图书馆学习微积分的说话者", "is_explicit_memory": false},
{"entity_idx": 1, "name": "图书馆", "type": "地点设施", "type_description": "具有地理意义或功能性空间意义的位置与场所。", "description": "用户经常前往学习的地点", "is_explicit_memory": false},
{"entity_idx": 2, "name": "微积分", "type": "知识能力", "type_description": "可学习、掌握、使用或讨论的知识主题、技能、学科或语言。", "description": "用户经常学习的主题", "is_explicit_memory": true}
]
@@ -658,8 +674,8 @@ Statement: "我的朋友都叫我山哥。"
Output:
{
"triplets": [
{"subject_name": "山哥", "subject_id": 2, "predicate": "别名属于", "predicate_description": "别名指向其对应的规范实体", "object_name": "用户", "object_id": 0},
{"subject_name": "我的朋友", "subject_id": 1, "predicate": "使用称呼", "predicate_description": "主体使用某个名字来称呼另一实体", "object_name": "山哥", "object_id": 2}
{"subject_name": "山哥", "subject_id": 2, "predicate": "别名属于", "predicate_description": "别名指向其对应的规范实体", "object_name": "用户", "object_id": 0, "valid_at": "NULL", "invalid_at": "NULL"},
{"subject_name": "我的朋友", "subject_id": 1, "predicate": "使用称呼", "predicate_description": "主体使用某个名字来称呼另一实体", "object_name": "山哥", "object_id": 2, "valid_at": "NULL", "invalid_at": "NULL"}
],
"entities": [
{"entity_idx": 0, "name": "用户", "type": "人物", "type_description": "可稳定指向、可被当作具体个体区分和归并的个人实体。", "description": "被朋友称作山哥的说话者", "is_explicit_memory": false},
@@ -697,7 +713,7 @@ Output:
{
"triplets": [],
"entities": [
{"entity_idx": 0, "name": "用户", "type": "人物", "type_description": "可稳定指向、可被当作具体个体区分和归并的个人实体。", "description": "最近有些紧张并认为这很正常的说话者", "is_explicit_memory": false}
{"entity_idx": 0, "name": "用户", "type": "人物", "type_description": "可稳定指向、可被当作具体个体区分和归并的个人实体。", "description": "", "is_explicit_memory": false}
]
}
@@ -707,7 +723,7 @@ Statement: "王教授是导师。"
Output:
{
"triplets": [
{"subject_name": "王教授", "subject_id": 0, "predicate": "担任角色", "predicate_description": "主体承担某个角色", "object_name": "导师", "object_id": 1}
{"subject_name": "王教授", "subject_id": 0, "predicate": "担任角色", "predicate_description": "主体承担某个角色", "object_name": "导师", "object_id": 1, "valid_at": "NULL", "invalid_at": "NULL"}
],
"entities": [
{"entity_idx": 0, "name": "王教授", "type": "人物", "type_description": "可稳定指向、可被当作具体个体区分和归并的个人实体。", "description": "承担导师角色的具体个人", "is_explicit_memory": false},
@@ -721,8 +737,8 @@ Statement: "我的GitHub账号用户名是chen4。"
Output:
{
"triplets": [
{"subject_name": "用户", "subject_id": 0, "predicate": "拥有账号", "predicate_description": "实体具有某账号", "object_name": "GitHub账号", "object_id": 1},
{"subject_name": "GitHub账号", "subject_id": 1, "predicate": "标识为", "predicate_description": "实体由某标识符标识", "object_name": "chen4", "object_id": 2}
{"subject_name": "用户", "subject_id": 0, "predicate": "拥有账号", "predicate_description": "实体具有某账号", "object_name": "GitHub账号", "object_id": 1, "valid_at": "NULL", "invalid_at": "NULL"},
{"subject_name": "GitHub账号", "subject_id": 1, "predicate": "标识为", "predicate_description": "实体由某标识符标识", "object_name": "chen4", "object_id": 2, "valid_at": "NULL", "invalid_at": "NULL"}
],
"entities": [
{"entity_idx": 0, "name": "用户", "type": "人物", "type_description": "可稳定指向、可被当作具体个体区分和归并的个人实体。", "description": "拥有该 GitHub 账号的说话者", "is_explicit_memory": false},
@@ -737,7 +753,7 @@ Statement: "机器人查票员和我沟通。"
Output:
{
"triplets": [
{"subject_name": "机器人查票员", "subject_id": 0, "predicate": "沟通于", "predicate_description": "两个实体之间发生沟通或交流", "object_name": "用户", "object_id": 1}
{"subject_name": "机器人查票员", "subject_id": 0, "predicate": "沟通于", "predicate_description": "两个实体之间发生沟通或交流", "object_name": "用户", "object_id": 1, "valid_at": "NULL", "invalid_at": "NULL"}
],
"entities": [
{"entity_idx": 0, "name": "机器人查票员", "type": "智能体", "type_description": "具有行动、交互或执行能力的非人主体如机器人、AI 或其他智慧体。", "description": "与用户发生沟通的机器人主体", "is_explicit_memory": false},
@@ -757,6 +773,7 @@ JSON 要求:
- `name`、`subject_name`、`object_name` 默认保持原文中的表面形式,但用户自指必须规范成 `用户`,可稳定解析的其他代词必须替换为具体指代实体名
- `description` 必须使用中文
- `type`、`predicate`、`type_description`、`predicate_description` 必须使用上方预定义的中文标签和中文说明
- 每个 triplet 都必须包含 `valid_at` 和 `invalid_at`,并与输入中的同名字段完全一致
- 如果 `has_unsolved_reference` 是 `true`,输出必须是 `{"entities": [], "triplets": []}`
- 如果存在无法稳定解析的代词或指示表达,输出也必须是 `{"entities": [], "triplets": []}`
- 如果没有有效 triplet，返回 `"triplets": []`
@@ -769,6 +786,7 @@ JSON 要求:
- `name`, `subject_name`, and `object_name` keep their original surface forms by default, but user self-reference must be normalized to `用户` and other stably resolvable references must be replaced by their resolved entity names
- `description` must be in English
- `type`, `predicate`, `type_description`, and `predicate_description` must use the predefined Chinese labels and Chinese definitions above
- Every triplet must include `valid_at` and `invalid_at`, exactly matching the input fields with the same names
- If `has_unsolved_reference` is `true`, the output must be `{"entities": [], "triplets": []}`
- If unresolved references still remain, the output must also be `{"entities": [], "triplets": []}`
- If no valid triplet exists, return `"triplets": []`
@@ -799,7 +817,9 @@ Output JSON structure:
"predicate": "string",
"predicate_description": "string",
"object_name": "string",
"object_id": 0
"object_id": 0,
"valid_at": "ISO 8601 | NULL",
"invalid_at": "ISO 8601 | NULL"
}
]
}

View File

@@ -57,10 +57,8 @@ class SensitiveDataFilter:
(re.compile(r'\beyJ[A-Za-z0-9_-]+\.eyJ[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+'), "[TOKEN]"),
# JWT Token 部分匹配（只有header和payload，没有signature）
(re.compile(r'\beyJ[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+(?:\.[A-Za-z0-9_-]*)?'), "[TOKEN]"),
# UUID格式的token或ID
(re.compile(r'\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b', re.IGNORECASE), "[UUID]"),
# API密钥格式32位以上的字母数字组合
(re.compile(r'\b[A-Za-z0-9]{32,}\b'), "[API_KEY]"),
# API密钥格式（64位以上的字母数字组合，避免误过滤普通业务字段）
(re.compile(r'\b[A-Za-z0-9]{64,}\b'), "[API_KEY]"),
]
# 替换文本

View File

@@ -33,8 +33,8 @@ SET s += {
temporal_info: statement.temporal_info,
created_at: statement.created_at,
expired_at: statement.expired_at,
valid_at: statement.valid_at,
invalid_at: statement.invalid_at,
valid_at: coalesce(statement.valid_at, ""),
invalid_at: coalesce(statement.invalid_at, ""),
statement_embedding: statement.statement_embedding,
relevence_info: statement.relevence_info,
importance_score: statement.importance_score,
@@ -152,8 +152,8 @@ SET r.predicate = rel.predicate,
r.statement_id = rel.statement_id,
r.value = rel.value,
r.statement = rel.statement,
r.valid_at = rel.valid_at,
r.invalid_at = rel.invalid_at,
r.valid_at = coalesce(rel.valid_at, ""),
r.invalid_at = coalesce(rel.invalid_at, ""),
r.created_at = rel.created_at,
r.expired_at = rel.expired_at,
r.run_id = rel.run_id,

View File

@@ -260,7 +260,7 @@ async def save_dialog_and_statements_to_neo4j(
result = await tx.run(DIALOGUE_NODE_SAVE, dialogues=dialogue_data)
dialogue_uuids = [record["uuid"] async for record in result]
results['dialogues'] = dialogue_uuids
logger.info(f"Dialogues saved to Neo4j with UUIDs: {dialogue_uuids}")
logger.debug(f"Dialogues saved to Neo4j with UUIDs: {dialogue_uuids}")
# 2. Save all chunk nodes in batch
if chunk_nodes:
@@ -269,7 +269,7 @@ async def save_dialog_and_statements_to_neo4j(
result = await tx.run(CHUNK_NODE_SAVE, chunks=chunk_data)
chunk_uuids = [record["uuid"] async for record in result]
results['chunks'] = chunk_uuids
logger.info(f"Successfully saved {len(chunk_uuids)} chunk nodes to Neo4j")
logger.debug(f"Successfully saved {len(chunk_uuids)} chunk nodes to Neo4j")
if perceptual_nodes:
from app.repositories.neo4j.cypher_queries import PERCEPTUAL_NODE_SAVE
@@ -277,7 +277,7 @@ async def save_dialog_and_statements_to_neo4j(
result = await tx.run(PERCEPTUAL_NODE_SAVE, perceptuals=perceptual_data)
perceptual_uuids = [record["uuid"] async for record in result]
results["perceptuals"] = perceptual_uuids
logger.info(f"Successfully saved {len(perceptual_uuids)} perceptual nodes to Neo4j")
logger.debug(f"Successfully saved {len(perceptual_uuids)} perceptual nodes to Neo4j")
# 3. Save all statement nodes in batch
if statement_nodes:
@@ -286,7 +286,7 @@ async def save_dialog_and_statements_to_neo4j(
result = await tx.run(STATEMENT_NODE_SAVE, statements=statement_data)
statement_uuids = [record["uuid"] async for record in result]
results['statements'] = statement_uuids
logger.info(f"Successfully saved {len(statement_uuids)} statement nodes to Neo4j")
logger.debug(f"Successfully saved {len(statement_uuids)} statement nodes to Neo4j")
# 4. Save entities
if entity_nodes:
@@ -295,7 +295,7 @@ async def save_dialog_and_statements_to_neo4j(
result = await tx.run(EXTRACTED_ENTITY_NODE_SAVE, entities=entity_data)
entity_uuids = [record["uuid"] async for record in result]
results['entities'] = entity_uuids
logger.info(f"Successfully saved {len(entity_uuids)} entity nodes to Neo4j")
logger.debug(f"Successfully saved {len(entity_uuids)} entity nodes to Neo4j")
# 5. Create entity relationships
if entity_edges:
@@ -320,7 +320,7 @@ async def save_dialog_and_statements_to_neo4j(
result = await tx.run(ENTITY_RELATIONSHIP_SAVE, relationships=relationship_data)
rel_uuids = [record["uuid"] async for record in result]
results['entity_relationships'] = rel_uuids
logger.info(f"Successfully saved {len(rel_uuids)} entity relationships to Neo4j")
logger.debug(f"Successfully saved {len(rel_uuids)} entity relationships to Neo4j")
# 6. Save statement-chunk edges
if statement_chunk_edges:
@@ -339,7 +339,7 @@ async def save_dialog_and_statements_to_neo4j(
result = await tx.run(CHUNK_STATEMENT_EDGE_SAVE, chunk_statement_edges=sc_edge_data)
sc_uuids = [record["uuid"] async for record in result]
results['statement_chunk_edges'] = sc_uuids
logger.info(f"Successfully saved {len(sc_uuids)} statement-chunk edges to Neo4j")
logger.debug(f"Successfully saved {len(sc_uuids)} statement-chunk edges to Neo4j")
# 7. Save statement-entity edges
if statement_entity_edges:
@@ -358,7 +358,7 @@ async def save_dialog_and_statements_to_neo4j(
result = await tx.run(STATEMENT_ENTITY_EDGE_SAVE, relationships=se_edge_data)
se_uuids = [record["uuid"] async for record in result]
results['statement_entity_edges'] = se_uuids
logger.info(f"Successfully saved {len(se_uuids)} statement-entity edges to Neo4j")
logger.debug(f"Successfully saved {len(se_uuids)} statement-entity edges to Neo4j")
if perceptual_edges:
from app.repositories.neo4j.cypher_queries import PERCEPTUAL_CHUNK_EDGE_SAVE
@@ -374,7 +374,7 @@ async def save_dialog_and_statements_to_neo4j(
result = await tx.run(PERCEPTUAL_CHUNK_EDGE_SAVE, edges=perceptual_edge_data)
perceptual_edges_uuids = [record["uuid"] async for record in result]
results['perceptual_chunk_edges'] = perceptual_edges_uuids
logger.info(f"Successfully saved {len(perceptual_edges_uuids)} perceptual-chunk edges to Neo4j")
logger.debug(f"Successfully saved {len(perceptual_edges_uuids)} perceptual-chunk edges to Neo4j")
# 8. Save assistant original nodes
if assistant_original_nodes:
@@ -383,7 +383,7 @@ async def save_dialog_and_statements_to_neo4j(
result = await tx.run(ASSISTANT_ORIGINAL_NODE_SAVE, originals=original_data)
original_uuids = [record["uuid"] async for record in result]
results['assistant_originals'] = original_uuids
logger.info(f"Successfully saved {len(original_uuids)} assistant original nodes to Neo4j")
logger.debug(f"Successfully saved {len(original_uuids)} assistant original nodes to Neo4j")
# 9. Save assistant pruned nodes
if assistant_pruned_nodes:
@@ -392,7 +392,7 @@ async def save_dialog_and_statements_to_neo4j(
result = await tx.run(ASSISTANT_PRUNED_NODE_SAVE, pruneds=pruned_data)
pruned_uuids = [record["uuid"] async for record in result]
results['assistant_pruneds'] = pruned_uuids
logger.info(f"Successfully saved {len(pruned_uuids)} assistant pruned nodes to Neo4j")
logger.debug(f"Successfully saved {len(pruned_uuids)} assistant pruned nodes to Neo4j")
# 10. Save PRUNED_TO edges (Original → Pruned)
if assistant_pruned_edges:
@@ -408,7 +408,7 @@ async def save_dialog_and_statements_to_neo4j(
result = await tx.run(ASSISTANT_PRUNED_EDGE_SAVE, edges=edge_data)
pruned_edge_uuids = [record["uuid"] async for record in result]
results['assistant_pruned_edges'] = pruned_edge_uuids
logger.info(f"Successfully saved {len(pruned_edge_uuids)} PRUNED_TO edges to Neo4j")
logger.debug(f"Successfully saved {len(pruned_edge_uuids)} PRUNED_TO edges to Neo4j")
# 11. Save BELONGS_TO_DIALOG edges (Original → Dialogue)
if assistant_dialog_edges:
@@ -423,7 +423,7 @@ async def save_dialog_and_statements_to_neo4j(
result = await tx.run(ASSISTANT_DIALOG_EDGE_SAVE, edges=edge_data)
dialog_edge_uuids = [record["uuid"] async for record in result]
results['assistant_dialog_edges'] = dialog_edge_uuids
logger.info(f"Successfully saved {len(dialog_edge_uuids)} BELONGS_TO_DIALOG edges to Neo4j")
logger.debug(f"Successfully saved {len(dialog_edge_uuids)} BELONGS_TO_DIALOG edges to Neo4j")
return results

View File

@@ -227,7 +227,7 @@ class OntologyClassRepository:
).all()
logger.info(
f"Found {len(classes)} ontology classes in scene {scene_id}"
f"Found {len(classes)} ontology classes in scene_id: {scene_id}"
)
return classes

View File

@@ -64,7 +64,7 @@ class MemoryAgentService:
def writer_messages_deal(self, messages, start_time, end_user_id, config_id, message, context):
duration = time.time() - start_time
if str(messages) == 'success':
logger.info(f"Write operation successful for group {end_user_id} with config_id {config_id}")
logger.info(f"Write operation successful for end_id: {end_user_id} with config_id {config_id}")
# 记录成功的操作
audit_logger.log_operation(operation="WRITE", config_id=config_id, end_user_id=end_user_id,
success=True,
@@ -360,10 +360,21 @@ class MemoryAgentService:
workspace_id = None
try:
connected_config = get_end_user_connected_config(end_user_id, db)
workspace_id = connected_config.get("workspace_id")
# get_end_user_connected_config 返回字符串,需转为 UUID
workspace_id_raw = connected_config.get("workspace_id")
if workspace_id_raw and workspace_id_raw != "None":
try:
workspace_id = uuid.UUID(str(workspace_id_raw))
except (ValueError, AttributeError):
workspace_id = None
if config_id is None:
config_id = connected_config.get("memory_config_id")
logger.info(f"Resolved config from end_user: config_id={config_id}, workspace_id={workspace_id}")
config_id_raw = connected_config.get("memory_config_id")
if config_id_raw and config_id_raw != "None":
try:
config_id = uuid.UUID(str(config_id_raw))
except (ValueError, AttributeError):
config_id = None
logger.info(f"Resolved config from end_user: config_id = {config_id}, workspace_id = {workspace_id}")
if config_id is None and workspace_id is None:
raise ValueError(
f"No memory configuration found for end_user {end_user_id}. "
@@ -517,7 +528,7 @@ class MemoryAgentService:
workspace_id = connected_config.get("workspace_id")
if config_id is None:
config_id = connected_config.get("memory_config_id")
logger.info(f"Resolved config from end_user: config_id={config_id}, workspace_id={workspace_id}")
logger.info(f"Resolved config from end_user: config_id = {config_id}, workspace_id = {workspace_id}")
if config_id is None and workspace_id is None:
raise ValueError(
f"No memory configuration found for end_user {end_user_id}. Please ensure the user has a connected memory configuration.")
@@ -529,7 +540,7 @@ class MemoryAgentService:
raise ValueError(f"Unable to determine memory configuration for end_user {end_user_id}: {e}")
# If config_id was provided, continue without workspace_id fallback
logger.info(f"Read operation for group {end_user_id} with config_id {config_id}")
logger.info(f"Read operation for end_user_id: {end_user_id} with config_id: {config_id}")
config_load_start = time.time()
try:
@@ -840,16 +851,16 @@ class MemoryAgentService:
workspace_id = connected_config.get('workspace_id')
if config_id is None:
config_id = connected_config.get('memory_config_id')
logger.info(f"Resolved config from end_user: config_id={config_id}, workspace_id={workspace_id}")
logger.info(f"Resolved config from end_user: config_id = {config_id}, workspace_id = {workspace_id}")
if config_id is None and workspace_id is None:
raise ValueError(
f"No memory configuration found for end_user {end_user_id}. Please ensure the user has a connected memory configuration.")
f"No memory configuration found for end_user_id {end_user_id}. Please ensure the user has a connected memory configuration.")
except Exception as e:
if "No memory configuration found" in str(e):
raise # Re-raise our specific error
logger.error(f"Failed to get connected config for end_user {end_user_id}: {e}")
logger.error(f"Failed to get connected config for end_user_id {end_user_id}: {e}")
if config_id is None:
raise ValueError(f"Unable to determine memory configuration for end_user {end_user_id}: {e}")
raise ValueError(f"Unable to determine memory configuration for end_user_id {end_user_id}: {e}")
# If config_id was provided, continue without workspace_id fallback
logger.info(f"Generating summary from retrieve info for query: {query[:50]}...")
@@ -1250,7 +1261,7 @@ def get_end_user_connected_config(end_user_id: str, db: Session) -> Dict[str, An
from app.models.end_user_model import EndUser
from app.services.memory_config_service import MemoryConfigService
logger.info(f"Getting connected config for end_user: {end_user_id}")
logger.info(f"Getting connected config for end_user_id: {end_user_id}")
# TODO: check sources for enduserid, should be one of these three: chat, draft, apikey
# 1. 获取 end_user 及其 app_id
@@ -1351,7 +1362,7 @@ def get_end_user_connected_config(end_user_id: str, db: Session) -> Dict[str, An
}
logger.info(
f"Successfully retrieved connected config: memory_config_id={memory_config_id}, workspace_id={end_user.workspace_id}")
f"Successfully retrieved connected config: memory_config_id = {memory_config_id}, workspace_id = {end_user.workspace_id}")
return result

View File

@@ -1279,7 +1279,7 @@ def write_message_task(
with get_db_context() as db:
logger.info(
f"[CELERY WRITE] Executing MemoryAgentService.write_memory "
f"with config_id={actual_config_id} (type: {type(actual_config_id).__name__}), language={language}")
f"with config_id = {actual_config_id} (type: {type(actual_config_id).__name__}), language={language}")
service = MemoryAgentService()
result = await service.write_memory(
WriteMemoryRequest(