diff --git a/api/app/core/language_utils.py b/api/app/core/language_utils.py index 5ed2b9ea..39a81636 100644 --- a/api/app/core/language_utils.py +++ b/api/app/core/language_utils.py @@ -46,6 +46,10 @@ def validate_language(language: Optional[str]) -> str: if language is None: return DEFAULT_LANGUAGE + # 处理枚举类型:优先取 .value,避免 str(Language.ZH) → "Language.ZH" + if hasattr(language, "value"): + language = language.value + # 标准化:转小写并去除空白 lang = str(language).lower().strip() diff --git a/api/app/core/logging_config.py b/api/app/core/logging_config.py index d0dda84b..285342e3 100644 --- a/api/app/core/logging_config.py +++ b/api/app/core/logging_config.py @@ -130,6 +130,10 @@ class LoggingConfig: for neo4j_logger_name in ["neo4j", "neo4j.io", "neo4j.pool", "neo4j.notifications"]: neo4j_logger = logging.getLogger(neo4j_logger_name) neo4j_logger.addFilter(neo4j_filter) + + # 压制 httpx / httpcore 的请求级日志(大量 HTTP Request: POST ... 噪音) + for noisy_logger in ["httpx", "httpcore", "httpcore.http11", "httpcore.connection"]: + logging.getLogger(noisy_logger).setLevel(logging.WARNING) # 创建格式化器 formatter = logging.Formatter( diff --git a/api/app/core/memory/agent/langgraph_graph/write_graph.py b/api/app/core/memory/agent/langgraph_graph/write_graph.py index 32fc7d8a..a18b5c2a 100644 --- a/api/app/core/memory/agent/langgraph_graph/write_graph.py +++ b/api/app/core/memory/agent/langgraph_graph/write_graph.py @@ -40,8 +40,20 @@ async def long_term_storage( # 获取数据库会话 with get_db_context() as db_session: config_service = MemoryConfigService(db_session) + # 通过 end_user_id 获取 workspace_id,确保日志和 fallback 逻辑完整 + from app.services.memory_agent_service import get_end_user_connected_config + import uuid as _uuid + workspace_id = None + try: + connected = get_end_user_connected_config(end_user_id, db_session) + raw = connected.get("workspace_id") + if raw and raw != "None": + workspace_id = _uuid.UUID(str(raw)) + except Exception: + pass memory_config = config_service.load_memory_config( - config_id=memory_config_id, # 改为整数 + config_id=memory_config_id, + workspace_id=workspace_id, service_name="MemoryAgentService" ) if long_term_type == AgentMemory_Long_Term.STRATEGY_CHUNK: diff --git a/api/app/core/memory/agent/services/parameter_builder.py b/api/app/core/memory/agent/services/parameter_builder.py index 74382ade..adf41228 100644 --- a/api/app/core/memory/agent/services/parameter_builder.py +++ b/api/app/core/memory/agent/services/parameter_builder.py @@ -15,7 +15,7 @@ class ParameterBuilder: def __init__(self): """Initialize the parameter builder.""" - logger.info("ParameterBuilder initialized") + logger.debug("ParameterBuilder initialized") def build_tool_args( self, diff --git a/api/app/core/memory/agent/services/search_service.py b/api/app/core/memory/agent/services/search_service.py index 93d1ebee..b9e9c748 100644 --- a/api/app/core/memory/agent/services/search_service.py +++ b/api/app/core/memory/agent/services/search_service.py @@ -86,7 +86,7 @@ class SearchService: def __init__(self): """Initialize the search service.""" - logger.info("SearchService initialized") + logger.debug("SearchService initialized") def extract_content_from_result(self, result: dict, node_type: str = "") -> str: """ diff --git a/api/app/core/memory/agent/services/session_service.py b/api/app/core/memory/agent/services/session_service.py index f7389984..f890f715 100644 --- a/api/app/core/memory/agent/services/session_service.py +++ b/api/app/core/memory/agent/services/session_service.py @@ -24,7 +24,7 @@ class SessionService: store: Redis session store instance """ self.store = store - logger.info("SessionService initialized") + logger.debug("SessionService initialized") def resolve_user_id(self, session_string: str) -> str: """ diff --git a/api/app/core/memory/agent/services/template_service.py b/api/app/core/memory/agent/services/template_service.py index 1bf86375..2fe93df0 100644 --- a/api/app/core/memory/agent/services/template_service.py +++ b/api/app/core/memory/agent/services/template_service.py @@ -51,7 +51,7 @@ class TemplateService: loader=FileSystemLoader(template_root), autoescape=False # Disable autoescape for prompt templates ) - logger.info(f"TemplateService initialized with root: {template_root}") + logger.debug(f"TemplateService initialized with root: {template_root}") @lru_cache(maxsize=128) def _load_template(self, template_name: str) -> Template: diff --git a/api/app/core/memory/agent/utils/get_dialogs.py b/api/app/core/memory/agent/utils/get_dialogs.py index 1180f367..24f0774f 100644 --- a/api/app/core/memory/agent/utils/get_dialogs.py +++ b/api/app/core/memory/agent/utils/get_dialogs.py @@ -10,6 +10,7 @@ async def get_chunked_dialogs( messages: list = None, ref_id: str = "", config_id: str = None, + workspace_id=None, snapshot=None, ) -> List[DialogData]: """Generate chunks from structured messages using the specified chunker strategy. @@ -76,6 +77,7 @@ async def get_chunked_dialogs( config_service = MemoryConfigService(db) memory_config = config_service.load_memory_config( config_id=config_id, + workspace_id=workspace_id, service_name="semantic_pruning" ) @@ -107,6 +109,13 @@ async def get_chunked_dialogs( remaining_msg_count = len(dialog_data.context.msgs) deleted_count = original_msg_count - remaining_msg_count logger.info(f"[剪枝] 完成: 原始{original_msg_count}条 -> 保留{remaining_msg_count}条 (删除{deleted_count}条)") + + # 将剪枝记录挂到 metadata,供 graph_build_step 构建节点 + if pruner.pruning_records: + dialog_data.metadata["assistant_pruning_records"] = [ + r.model_dump() for r in pruner.pruning_records + ] + logger.info(f"[剪枝] 收集到 {len(pruner.pruning_records)} 条剪枝记录") else: logger.warning("[剪枝] prune_dataset 返回空列表") else: diff --git a/api/app/core/memory/agent/utils/write_tools.py b/api/app/core/memory/agent/utils/write_tools.py index d4eff79e..fd260130 100644 --- a/api/app/core/memory/agent/utils/write_tools.py +++ b/api/app/core/memory/agent/utils/write_tools.py @@ -389,7 +389,7 @@ async def write( workspace_id=str(memory_config.workspace_id), stats=stats_to_cache, ) - logger.info(f"[WRITE] 活动统计已写入 Redis: workspace_id={memory_config.workspace_id}") + logger.info(f"[WRITE] 活动统计已写入 Redis: workspace_id = {memory_config.workspace_id}") except Exception as cache_err: logger.warning(f"[WRITE] 写入活动统计缓存失败(不影响主流程): {cache_err}", exc_info=True) diff --git a/api/app/core/memory/analytics/implicit_memory/llm_client.py b/api/app/core/memory/analytics/implicit_memory/llm_client.py index f72e49ec..39bb2f69 100644 --- a/api/app/core/memory/analytics/implicit_memory/llm_client.py +++ b/api/app/core/memory/analytics/implicit_memory/llm_client.py @@ -64,7 +64,7 @@ class ImplicitMemoryLLMClient: self.default_model_id = default_model_id self._client_factory = MemoryClientFactory(db) - logger.info("ImplicitMemoryLLMClient initialized") + logger.debug("ImplicitMemoryLLMClient initialized") def _get_llm_client(self, model_id: Optional[str] = None): """Get LLM client instance. diff --git a/api/app/core/memory/pipelines/write_pipeline.py b/api/app/core/memory/pipelines/write_pipeline.py index a6927847..f2a55ae3 100644 --- a/api/app/core/memory/pipelines/write_pipeline.py +++ b/api/app/core/memory/pipelines/write_pipeline.py @@ -17,10 +17,11 @@ from __future__ import annotations import asyncio import logging -import time import uuid from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Optional +from app.core.memory.utils.log.bear_logger import BearLogger + from pydantic import BaseModel, Field, ConfigDict if TYPE_CHECKING: @@ -40,6 +41,7 @@ from app.core.memory.models.graph_models import ( ) logger = logging.getLogger(__name__) +bear = BearLogger("memory.pipeline") # ────────────────────────────────────────────── @@ -165,112 +167,82 @@ class WritePipeline: is_pilot_run: 试运行模式(只萃取不写入) Returns: - WriteResult 包含状态和统计信息 + WriteResult 包含状态和统计信息 """ if not ref_id: ref_id = uuid.uuid4().hex mode = "试运行" if is_pilot_run else "正式" - pipeline_start = time.time() - - logger.info( - f"[WritePipeline] 开始 ({mode}) " - f"config={self.memory_config.config_name}, " - f"end_user={self.end_user_id}" - ) + extraction_result = None try: - # 初始化客户端和连接 - self._init_clients() - self._init_neo4j_connector() + async with bear.pipeline( + "WritePipeline", + mode=mode, + config_name=self.memory_config.config_name, + end_user_id=self.end_user_id, + ): + # 初始化客户端和连接 + self._init_clients() + self._init_neo4j_connector() - # 初始化 Snapshot(提前创建,供预处理阶段的剪枝使用) - from app.core.memory.utils.debug.pipeline_snapshot import PipelineSnapshot - self._snapshot = PipelineSnapshot("new") + # 初始化 Snapshot(提前创建,供预处理阶段的剪枝使用) + from app.core.memory.utils.debug.pipeline_snapshot import PipelineSnapshot + self._snapshot = PipelineSnapshot("new") - # Step 1: 预处理 - 消息分块 + AI消息语义剪枝 - step_start = time.time() - chunked_dialogs = await self._preprocess(messages, ref_id) - chunks_count = sum(len(d.chunks) for d in chunked_dialogs) - logger.info( - f"[WritePipeline] [1/5] 预处理:消息分块 " - f"✔ {time.time() - step_start:.2f}s chunks={chunks_count}" - ) + # Step 1: 预处理 - 消息分块 + AI消息语义剪枝 + async with bear.step(1, 5, "预处理", "消息分块") as s: + chunked_dialogs = await self._preprocess(messages, ref_id) + s.metadata(chunks=sum(len(d.chunks) for d in chunked_dialogs)) - # Step 2: 萃取 - 知识提取 - step_start = time.time() - extraction_result = await self._extract(chunked_dialogs, is_pilot_run) - stats = extraction_result.stats - logger.info( - f"[WritePipeline] [2/5] 萃取:知识提取 " - f"✔ {time.time() - step_start:.2f}s " - f"entities={stats['entity_count']}, " - f"statements={stats['statement_count']}, " - f"relations={stats['relation_count']}" - ) + # Step 2: 萃取 - 知识提取 + async with bear.step(2, 5, "萃取", "知识提取") as s: + extraction_result = await self._extract(chunked_dialogs, is_pilot_run) + stats = extraction_result.stats + s.metadata( + entities=stats["entity_count"], + statements=stats["statement_count"], + relations=stats["relation_count"], + ) + + # 试运行模式到此结束 + if is_pilot_run: + return WriteResult( + status="pilot_complete", + extraction=extraction_result.stats, + elapsed_seconds=0.0, + ) + + # Step 3: 存储 - 写入 Neo4j + async with bear.step(3, 5, "存储", "写入 Neo4j"): + await self._store(extraction_result) + + # Step 3.2: 别名归并 + async with bear.step(3, 5, "别名归并", "处理别名属于关系"): + await self._merge_alias_belongs_to(extraction_result) + + # Step 3.5: 异步情绪提取(fire-and-forget,需在 _store 之后确保 Statement 节点已存在) + await self._extract_emotion(getattr(self, "_emotion_statements", [])) + + # Step 4: 聚类 - 增量更新社区(异步,不阻塞) + async with bear.step(4, 5, "聚类", "增量更新社区") as s: + await self._cluster(extraction_result) + s.metadata(mode="async") + + # Step 5: 摘要 - 生成情景记忆摘要 + async with bear.step(5, 5, "摘要", "生成情景记忆"): + await self._summarize(chunked_dialogs) + + # 更新活动统计缓存 + await self._update_stats_cache(extraction_result) - # 试运行模式到此结束 - if is_pilot_run: - elapsed = time.time() - pipeline_start - logger.info(f"[WritePipeline] 完成(试运行) ✔ {elapsed:.2f}s") return WriteResult( - status="pilot_complete", + status="success", extraction=extraction_result.stats, - elapsed_seconds=elapsed, + elapsed_seconds=0.0, ) - # Step 3: 存储 - 写入 Neo4j - step_start = time.time() - await self._store(extraction_result) - logger.info( - f"[WritePipeline] [3/5] 存储:写入 Neo4j " - f"✔ {time.time() - step_start:.2f}s" - ) - - # Step 3.2: 别名归并 - 处理 predicate="别名属于" 的关系 - step_start = time.time() - await self._merge_alias_belongs_to(extraction_result) - logger.info( - f"[WritePipeline] [3.2] 别名归并:处理别名属于关系 " - f"✔ {time.time() - step_start:.2f}s" - ) - - # Step 3.5: 异步情绪提取(fire-and-forget,需在 _store 之后确保 Statement 节点已存在) - await self._extract_emotion(getattr(self, "_emotion_statements", [])) - - # Step 4: 聚类 - 增量更新社区(异步,不阻塞) - step_start = time.time() - await self._cluster(extraction_result) - logger.info( - f"[WritePipeline] [4/5] 聚类:增量更新社区 " - f"✔ {time.time() - step_start:.2f}s mode=async" - ) - - # Step 5: 摘要 - 生成情景记忆摘要 - step_start = time.time() - await self._summarize(chunked_dialogs) - logger.info( - f"[WritePipeline] [5/5] 摘要:生成情景记忆 " - f"✔ {time.time() - step_start:.2f}s" - ) - - # 更新活动统计缓存 - await self._update_stats_cache(extraction_result) - - elapsed = time.time() - pipeline_start - logger.info(f"[WritePipeline] 完成 ✔ {elapsed:.2f}s") - return WriteResult( - status="success", - extraction=extraction_result.stats, - elapsed_seconds=elapsed, - ) - - except Exception as e: - elapsed = time.time() - pipeline_start - logger.error( - f"[WritePipeline] 失败 ✘ {elapsed:.2f}s error={e}", - exc_info=True, - ) + except Exception: raise finally: @@ -300,6 +272,7 @@ class WritePipeline: messages=messages, ref_id=ref_id, config_id=str(self.memory_config.config_id), + workspace_id=self.memory_config.workspace_id, snapshot=snapshot, ) @@ -409,7 +382,7 @@ class WritePipeline: ] snapshot.save_stage("5_embedding_outputs", emb_data) - # step2: 构建图节点和边 + # step2: 构建图节点和边 graph = await build_graph_nodes_and_edges( dialog_data_list=dialog_data_list, embedder_client=self._embedder_client, @@ -445,7 +418,7 @@ class WritePipeline: }, ) - # step3: 两阶段去重消歧 + # step3: 两阶段去重消歧 dedup_result = await run_dedup( entity_nodes=graph.entity_nodes, statement_entity_edges=graph.stmt_entity_edges, @@ -482,7 +455,7 @@ class WritePipeline: ], }, ) - + # step4: 构造最终结果 result = ExtractionResult( dialogue_nodes=graph.dialogue_nodes, @@ -501,7 +474,7 @@ class WritePipeline: dialog_data_list=dialog_data_list, ) - snapshot.save_summary(result.stats) # TODO 乐力齐 snapshot需要改 + snapshot.save_summary(result.stats) # TODO 乐力齐 snapshot需要改 return result # ────────────────────────────────────────────── @@ -545,7 +518,7 @@ class WritePipeline: assistant_dialog_edges=result.assistant_dialog_edges, ) if success: - logger.info("Successfully saved all data to Neo4j") + logger.debug("Successfully saved all data to Neo4j") return # 写入返回 False(部分失败) if attempt < max_retries - 1: @@ -584,7 +557,8 @@ class WritePipeline: # 筛选出所有 predicate="别名属于" 的边 alias_edges = [ - e for e in result.entity_entity_edges + e + for e in result.entity_entity_edges if getattr(e, "relation_type", "") == ALIAS_PREDICATE or getattr(e, "predicate", "") == ALIAS_PREDICATE ] @@ -608,14 +582,18 @@ class WritePipeline: if source_name: existing_lower = {a.lower() for a in (target_node.aliases or [])} if source_name.lower() not in existing_lower: - target_node.aliases = list(target_node.aliases or []) + [source_name] + target_node.aliases = list(target_node.aliases or []) + [ + source_name + ] # 将 source.description append 进 target.description(追加,分号分隔) src_desc = (source_node.description or "").strip() if src_desc: tgt_desc = (target_node.description or "").strip() if src_desc not in tgt_desc: - target_node.description = f"{tgt_desc};{src_desc}" if tgt_desc else src_desc + target_node.description = ( + f"{tgt_desc};{src_desc}" if tgt_desc else src_desc + ) logger.info( f"[AliasMerge] 内存同步完成,处理 {len(alias_edges)} 条 '别名属于' 边" @@ -627,12 +605,12 @@ class WritePipeline: end_user_id=self.end_user_id, ) merged_count = len(records) if records else 0 - logger.info( - f"[AliasMerge] Neo4j 别名归并完成,影响 {merged_count} 条记录" - ) + logger.info(f"[AliasMerge] Neo4j 别名归并完成,影响 {merged_count} 条记录") except Exception as e: - logger.warning(f"[AliasMerge] 别名归并失败(不影响主流程): {e}", exc_info=True) + logger.warning( + f"[AliasMerge] 别名归并失败(不影响主流程): {e}", exc_info=True + ) # ────────────────────────────────────────────── # Step 4: 聚类 @@ -675,8 +653,8 @@ class WritePipeline: ) logger.info( f"[Clustering] 增量聚类任务已提交 - " - f"task_id={task.id}, " - f"entity_count={len(new_entity_ids)}, " + f"task_id = {task.id}, " + f"entity_count = {len(new_entity_ids)}, " f"source=dedup" ) except Exception as e: @@ -734,9 +712,9 @@ class WritePipeline: ) logger.info( f"[Emotion] 异步情绪提取任务已提交 - " - f"task_id={result.id}, " - f"statement_count={len(emotion_statements)}, " - f"snapshot_dir={snapshot_dir}, " + f"task_id = {result.id}, " + f"statement_count = {len(emotion_statements)}, " + f"snapshot_dir = {snapshot_dir}, " f"source=async" ) except Exception as e: @@ -749,7 +727,7 @@ class WritePipeline: # Step 5: 摘要 # (+ entity_description)+ meta_data部分在此提取 # ────────────────────────────────────────────── -# TODO 乐力齐 需要做成异步celery任务 + # TODO 乐力齐 需要做成异步celery任务 async def _summarize(self, chunked_dialogs: List[DialogData]) -> None: """ 摘要:生成情景记忆摘要 → 写入 Neo4j。 @@ -877,8 +855,7 @@ class WritePipeline: external_assistant_aliases=neo4j_assistant_aliases, ) logger.info( - f"别名清洗完成,AI助手别名排除集大小: " - f"{len(neo4j_assistant_aliases)}" + f"别名清洗完成,AI助手别名排除集大小: {len(neo4j_assistant_aliases)}" ) except Exception as e: logger.warning(f"别名清洗失败(不影响主流程): {e}") @@ -911,8 +888,7 @@ class WritePipeline: stats=stats, ) logger.info( - f"活动统计已写入 Redis: " - f"workspace_id={self.memory_config.workspace_id}" + f"活动统计已写入 Redis: workspace_id={self.memory_config.workspace_id}" ) except Exception as e: logger.warning(f"写入活动统计缓存失败(不影响主流程): {e}") diff --git a/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py b/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py index 049c265f..4c3be0c6 100644 --- a/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py +++ b/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py @@ -1303,6 +1303,7 @@ class ExtractionOrchestrator: # 只有当两个实体ID都存在时才创建边 if subject_entity_id and object_entity_id: + _tv = getattr(statement, "temporal_validity", None) entity_entity_edge = EntityEntityEdge( source=subject_entity_id, target=object_entity_id, @@ -1314,6 +1315,8 @@ class ExtractionOrchestrator: run_id=dialog_data.run_id, # 使用 dialog_data 的 run_id created_at=dialog_data.created_at, expired_at=dialog_data.expired_at, + valid_at=_tv.valid_at if _tv else None, + invalid_at=_tv.invalid_at if _tv else None, ) entity_entity_edges.append(entity_entity_edge) diff --git a/api/app/core/memory/storage_services/extraction_engine/extraction_pipeline_orchestrator.py b/api/app/core/memory/storage_services/extraction_engine/extraction_pipeline_orchestrator.py index 5c158083..16082804 100644 --- a/api/app/core/memory/storage_services/extraction_engine/extraction_pipeline_orchestrator.py +++ b/api/app/core/memory/storage_services/extraction_engine/extraction_pipeline_orchestrator.py @@ -105,7 +105,7 @@ class NewExtractionOrchestrator: SidecarTiming.AFTER_TRIPLET ] - logger.info( + logger.debug( "NewExtractionOrchestrator initialised — " "after_statement sidecars: %d, after_triplet sidecars: %d", len(self.after_statement_sidecars), @@ -274,11 +274,11 @@ class NewExtractionOrchestrator: ) -> List[DialogData]: """Pilot mode: statement + triplet extraction only, no sidecars or embeddings.""" # Phase 1: Statement extraction (chunk-level parallel) - logger.info("Pilot phase 1/2: Statement extraction") + logger.debug("Pilot phase 1/2: Statement extraction") all_stmt_results = await self._extract_all_statements(dialog_data_list) # Phase 2: Triplet extraction (statement-level parallel) - logger.info("Pilot phase 2/2: Triplet extraction") + logger.debug("Pilot phase 2/2: Triplet extraction") all_triplet_results = await self._extract_all_triplets( dialog_data_list, all_stmt_results ) @@ -327,7 +327,7 @@ class NewExtractionOrchestrator: }, ) - logger.info("Pilot extraction complete") + logger.debug("Pilot extraction complete") return dialog_data_list # ── 3b. 正式模式:四阶段并发执行 ── @@ -338,7 +338,7 @@ class NewExtractionOrchestrator: """Full mode: all four phases with concurrent sidecars and embeddings.""" # ── Phase 1: Statement extraction + chunk/dialog embedding ── - logger.info("Phase 1/4: Statement extraction + chunk/dialog embedding") + logger.debug("Phase 1/4: Statement extraction + chunk/dialog embedding") chunk_dialog_emb_input = self._build_chunk_dialog_embedding_input( dialog_data_list ) @@ -367,7 +367,7 @@ class NewExtractionOrchestrator: logger.warning("Chunk/dialog embedding failed: %s", phase1_results[1]) # ── Phase 2: Triplet extraction + after_statement sidecars + statement embedding ── - logger.info( + logger.debug( "Phase 2/4: Triplet extraction + sidecars + statement embedding" ) stmt_emb_input = self._build_statement_embedding_input( @@ -403,7 +403,7 @@ class NewExtractionOrchestrator: ) # ── Phase 3: Entity embedding + after_triplet sidecars ── - logger.info("Phase 3/4: Entity embedding + after_triplet sidecars") + logger.debug("Phase 3/4: Entity embedding + after_triplet sidecars") entity_emb_input = self._build_entity_embedding_input(all_triplet_results) after_triplet_pairs: List[Tuple[ExtractionStep, Any]] = [] @@ -430,7 +430,7 @@ class NewExtractionOrchestrator: merged_emb = self._merge_embeddings(chunk_dialog_emb, stmt_emb, entity_emb) # ── Phase 4: Data assignment ── - logger.info("Phase 4/4: Data assignment") + logger.debug("Phase 4/4: Data assignment") self._assign_results( dialog_data_list, @@ -453,7 +453,7 @@ class NewExtractionOrchestrator: "embedding_output": merged_emb, } - logger.info("Full extraction pipeline complete") + logger.debug("Full extraction pipeline complete") return dialog_data_list @property diff --git a/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/chunk_extraction.py b/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/chunk_extraction.py index bbbf1c51..55316950 100644 --- a/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/chunk_extraction.py +++ b/api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/chunk_extraction.py @@ -53,7 +53,7 @@ class DialogueChunker: ) self.chunker_strategy = chunker_strategy - logger.info(f"Initializing DialogueChunker with strategy: {chunker_strategy}") + logger.debug(f"Initializing DialogueChunker with strategy: {chunker_strategy}") try: # Load and validate configuration @@ -71,7 +71,7 @@ class DialogueChunker: else: self.chunker_client = ChunkerClient(self.chunker_config) - logger.info(f"DialogueChunker initialized successfully with strategy: {chunker_strategy}") + logger.debug(f"DialogueChunker initialized successfully with strategy: {chunker_strategy}") except Exception as e: logger.error(f"Failed to initialize DialogueChunker: {e}", exc_info=True) @@ -101,7 +101,7 @@ class DialogueChunker: f"Messages: {len(dialogue.context.msgs) if dialogue.context else 0}" ) - logger.info( + logger.debug( f"Processing dialogue {dialogue.ref_id} with {len(dialogue.context.msgs)} messages " f"using strategy: {self.chunker_strategy}" ) @@ -121,7 +121,7 @@ class DialogueChunker: ) logger.info( - f"Successfully generated {len(chunks)} chunks for dialogue {dialogue.ref_id}. " + f"Successfully generated {len(chunks)} chunks for dialogue_id: {dialogue.ref_id}. " f"Total characters processed: {len(dialogue.content) if dialogue.content else 0}" ) diff --git a/api/app/core/memory/utils/log/audit_logger.py b/api/app/core/memory/utils/log/audit_logger.py index f80ad4d5..4afac614 100644 --- a/api/app/core/memory/utils/log/audit_logger.py +++ b/api/app/core/memory/utils/log/audit_logger.py @@ -142,7 +142,7 @@ class ConfigAuditLogger: result = "SUCCESS" if success else "FAILED" msg = ( f"{operation.upper()} config_id={config_id} " - f"group={end_user_id} result={result}" + f"end_user_id={end_user_id} result={result}" ) if duration is not None: msg += f" duration={duration:.2f}s" diff --git a/api/app/core/memory/utils/prompt/prompts/extract_triplet.jinja2 b/api/app/core/memory/utils/prompt/prompts/extract_triplet.jinja2 index fa868104..421b7381 100644 --- a/api/app/core/memory/utils/prompt/prompts/extract_triplet.jinja2 +++ b/api/app/core/memory/utils/prompt/prompts/extract_triplet.jinja2 @@ -12,6 +12,7 @@ Extract entities and knowledge triplets from the given statement. - 命名关系中新出现的称呼、别名、昵称、产品名保持原样,不做替换。 - `description` 使用中文。 - `type`、`predicate`、`type_description`、`predicate_description` 一律使用中文。 +- 每个 `triplet` 都必须携带 `valid_at` 和 `invalid_at`,并直接复制输入中的同名字段,不要自行改写或推断新的时间边界。 {% else %} Important: - Keep `name`, `subject_name`, and `object_name` in their original surface form from the source text by default. @@ -22,6 +23,7 @@ Extract entities and knowledge triplets from the given statement. - Newly introduced names in naming or alias expressions must stay in their original form. - Generate `description` in English. - Always generate `type`, `predicate`, `type_description`, and `predicate_description` in Chinese. +- Every `triplet` MUST include `valid_at` and `invalid_at`, copied directly from the input fields with the same names; do not rewrite or infer new temporal bounds. {% endif %} ===Inputs=== @@ -110,7 +112,8 @@ Primary statement to analyze: - 不要从 `supporting_context.msgs` 中单独抽取实体或关系。 - 如果某条信息只出现在 `supporting_context.msgs` 中,而没有出现在 `statement_text` 中,就不要输出它。 - 如果 `supporting_context.msgs` 中的 Assistant 消息包含总结、猜测、解释或改写,这些内容只能作为理解辅助,不能直接作为抽取来源。 -- `statement_type`、`temporal_type`、`valid_at`、`invalid_at` 是辅助理解字段,不是抽取目标。 +- `statement_type`、`temporal_type` 是辅助理解字段,不是抽取目标。 +- `valid_at`、`invalid_at` 不用于决定是否创建实体或关系,但如果产生 triplet,必须原样复制到每个 triplet 的同名字段中。 - 对 `statement_text` 中的用户自指表达,要统一规范成实体 `用户`。 - 对其他可稳定解析的代词或指示表达,要替换为具体指代实体名后再抽取。 - 对命名关系中新出现的称呼、别名、昵称、产品名,不要因为上下文可推断其所指而直接改写,它们应保持原样作为实体名。 @@ -120,7 +123,8 @@ Primary statement to analyze: - Do not extract any standalone entity or relation from `supporting_context.msgs`. - If some information appears only in `supporting_context.msgs` but not in `statement_text`, do not include it in the output. - If Assistant messages in `supporting_context.msgs` contain summary, guess, interpretation, or rephrasing, use them only as interpretive support and never as a direct extraction source. -- Treat `statement_type`, `temporal_type`, `valid_at`, and `invalid_at` as auxiliary context, not extraction targets. +- Treat `statement_type` and `temporal_type` as auxiliary context, not extraction targets. +- Do not use `valid_at` or `invalid_at` to decide whether to create entities or relations, but if any triplet is produced, copy them verbatim into every triplet field with the same names. - Normalize user self-reference in `statement_text` to the entity `用户`. - Replace other resolvable pronouns or demonstratives with their resolved entity names before extraction. - For newly introduced names in naming or alias expressions, do not rewrite them even if the context reveals who they refer to; keep them as entity names. @@ -229,7 +233,7 @@ Primary statement to analyze: - unresolved 或边界不稳的表达,不因“看起来像名词”就创建实体。 - 情绪、心理状态、金额、数量、普通时间、一次性动作短语,默认不作为独立实体类型抽取。 -- 抽象命题片段、泛化结果、价值判断,默认不创建实体;如有保留价值,应写入相关高价值实体的 `description`。 +- 抽象命题片段、泛化结果、价值判断,默认不创建实体;如有保留价值且适合作为实体的稳定描述,可写入相关高价值实体的 `description`。 实体类型选择原则: @@ -445,9 +449,9 @@ Do not let auxiliary fields drive the extraction process. - 不要为了表达一句带时间或地点的行动,而额外创造“任务”“计划”“事件”实体。 - 但如果动作明确把主体和某个稳定实体连接起来,可以保留该稳定实体,并抽取轻关系。例如“我去图书馆”“我去公司开会”“我去上课”“我去看演唱会”可以抽取 `前往`。 - 当句子只是在讨论一般道理、抽象规律、空泛结果或非个体化概念,而这些概念本身不构成可复用记忆时,不要创建实体。 -- 如果句子表达的是用户的观点、信念、判断、愿望或目标倾向,但其中抽象对象不值得作为独立实体保留,则只保留相关高价值实体,不要再创建这些低价值对象实体,并把未抽取的抽象内容压缩写入相关实体的 `description`;例如“用户认为努力就会有回报”应只保留 `用户`,并在 `description` 中体现“用户认为努力就会有回报”。 -- 对于未抽取的抽象实体、抽象命题片段或泛化结果,只要它们对理解该高价值实体有帮助,就应优先写入该实体的 `description`,而不是改用宽泛关系或补造弱实体。 -- 当前阶段同样不要把情绪或心理状态抽成实体;如果句子里出现“紧张”“开心”“难过”“焦虑”“放松”等,应写入相关高价值实体的 `description`,而不是把它们标成 `知识能力`、`偏好习惯目标` 或其他近似类型。 +- 如果句子表达的是用户的观点、信念、判断、愿望或目标倾向,但其中抽象对象不值得作为独立实体保留,则只保留相关高价值实体,不要再创建这些低价值对象实体;只有当未抽取内容适合作为该实体的稳定描述时,才写入相关实体的 `description`。例如“用户认为努力就会有回报”可只保留 `用户`,并在 `description` 中体现这一较稳定认知倾向。 +- 对于未抽取的抽象实体、抽象命题片段或泛化结果,不要默认全部写入 `description`;只有当它们适合作为该实体的稳定描述、且对后续区分或理解该实体有帮助时,才写入 `description`。 +- 当前阶段同样不要把情绪或心理状态抽成实体;如果句子里出现“紧张”“开心”“难过”“焦虑”“放松”等,只有在它们能被稳定概括为较持久的认知或态度时,才可间接体现在相关高价值实体的 `description` 中;短期情绪状态本身不要写入 `description`。 - 如果陈述里有值得保留的实体信息,但没有有效关系,可以只返回 `entities`,并把 `triplets` 设为 `[]`。 - `name` 默认保持原文中的表面形式,但用户自指必须写成 `用户`,可稳定解析的其他代词必须替换为具体指代实体名。 - `description` 必须使用中文。 @@ -461,9 +465,9 @@ Do not let auxiliary fields drive the extraction process. - Do not create extra "task", "plan", or "event" entities just to represent an action with time or location modifiers. - But if an action clearly connects the subject to a stable entity, keep that stable entity and use a light relation. For example, statements like "I go to the library", "I go to the office", "I go to class", or "I go to a concert" can use `前往`. - If the sentence is only about a generic principle, abstract outcome, or non-personalized concept that is not worth remembering on its own, do not create an entity for it. -- If a statement expresses the user's belief, judgment, opinion, wish, or goal tendency but the referenced abstract concepts are not worth keeping as standalone entities, keep only the relevant high-value entities, do not create those low-value concept entities, and compress the unextracted abstract content into the relevant entity `description`. For example, "the user believes effort brings reward" should keep only `用户` and reflect that belief in `description`. -- For abstract entities, proposition fragments, or generic outcomes that are not extracted, prefer writing them into the relevant retained entity's `description` when they help preserve the memory, instead of switching to a broad relation or inventing a weak entity. -- In the current stage, do not extract emotional or psychological states as entities. States such as nervousness, happiness, sadness, anxiety, or relief should be written into the relevant retained entity's `description` rather than mapped to `知识能力`, `偏好习惯目标`, or any other approximate type. +- If a statement expresses the user's belief, judgment, opinion, wish, or goal tendency but the referenced abstract concepts are not worth keeping as standalone entities, keep only the relevant high-value entities and do not create those low-value concept entities; write the unextracted content into an entity `description` only when it is suitable as a stable description of that entity. For example, "the user believes effort brings reward" may keep only `用户` and reflect that relatively stable belief in `description`. +- For abstract entities, proposition fragments, or generic outcomes that are not extracted, do not automatically write them into `description`; only do so when they are suitable as a stable description of the retained entity and help identify or understand it. +- In the current stage, do not extract emotional or psychological states as entities. States such as nervousness, happiness, sadness, anxiety, or relief should not be mapped to `知识能力`, `偏好习惯目标`, or any other approximate type, and short-lived emotional states should not be written into `description`. - If the statement contains entity-worthy content but no valid relation, it is acceptable to return `entities` with `triplets: []`. - Keep `name` in its original surface form by default, but write user self-reference as `用户` and replace other stably resolvable references with their resolved entity names. - `description` must be in English. @@ -487,11 +491,17 @@ Do not let auxiliary fields drive the extraction process. - `description` 应短、直白、与当前上下文相关,并能帮助区分实体。 - 优先描述实体在当前陈述和必要上下文中的身份、作用或关系。 +- `description` 只保留适合长期附着在该实体上的描述,例如稳定身份、稳定关系、长期偏好/兴趣/习惯、较稳定认知倾向或可用于区分实体的持久特征。 +- 不要把短期状态、一次性事件、临时计划、当前情绪、具体时间锚点,或只在当前句子里短暂成立的信息写进 `description`。 +- 如果实体应保留,但当前 statement 中没有适合长期附着在该实体上的稳定描述,则 `description` 允许为空字符串 `""`;不要为了填充 `description` 而写入短期状态或临时信息。 - 避免使用“陈述中提到的人物”“陈述中提到的组织”“陈述中提到的物品”这类低信息量模板。 - 不要补充识别实体所不需要的外部知识。 {% else %} - `description` should be short, context-grounded, and discriminative. - Prefer describing the entity's role, identity, or relation in the current statement and necessary supporting context. +- `description` should keep only information suitable to remain attached to the entity over time, such as stable identity, stable relations, long-term preferences/interests/habits, relatively stable beliefs, or persistent distinguishing traits. +- Do not put short-lived states, one-off events, temporary plans, current emotions, concrete time anchors, or information that only briefly holds in the current sentence into `description`. +- If an entity should be retained but the current statement does not provide any suitable stable description for it, `description` may be the empty string `""`; do not fill it with short-lived states or temporary information just to avoid emptiness. - Avoid low-information templates such as "the person mentioned in the statement" or "the organization mentioned in the statement". - Do not add extra world knowledge that is not needed for identifying the entity in context. {% endif %} @@ -518,6 +528,7 @@ Do not let auxiliary fields drive the extraction process. - 如果 `has_unsolved_reference` 是 `true`,不要抽取实体或 triplets。 - `subject_name` 和 `object_name` 默认保持原文中的表面形式,但用户自指必须写成 `用户`,可稳定解析的其他代词必须替换为具体指代实体名。 - `predicate_description` 必须直接复用对应 `predicate` 的中文定义。 +- 每个 triplet 都必须包含 `valid_at` 和 `invalid_at`,并直接复用输入中的同名字段值;如果输入是 `NULL`,这里也写 `NULL`。 - 不要把普通时间表达作为 triplet 的宾语。 - 不要为了表达一次性计划、安排、日程而强行构造关系。 - 当句子表达主体去某个地点、场所、组织、课程或活动时,只要该对象本身有记忆价值,就可以抽取 `前往`,即使句中同时带有时间信息。 @@ -537,6 +548,7 @@ Do not let auxiliary fields drive the extraction process. - If `has_unsolved_reference` is `true`, do not extract entities or triplets. - Keep `subject_name` and `object_name` in their original surface form by default, but write user self-reference as `用户` and replace other stably resolvable references with their resolved entity names. - `predicate_description` must directly reuse the corresponding Chinese definition of `predicate`. +- Every triplet must include `valid_at` and `invalid_at`, copied directly from the input fields with the same names; if the input is `NULL`, write `NULL` here as well. - Do not use ordinary time expressions as triplet objects. - Do not force relations just to encode one-off plans, schedules, or actions. - When the statement says that the subject goes to a place, venue, organization, class, or activity, you may extract `前往` as long as that destination itself is worth remembering, even if the statement also includes time information. @@ -589,10 +601,14 @@ Do not let auxiliary fields drive the extraction process. - 每个 triplet 中的 `subject_name` 必须与 `subject_id` 指向实体的 `name` 完全一致。 - 每个 triplet 中的 `object_name` 必须与 `object_id` 指向实体的 `name` 完全一致。 +- 每个 triplet 中的 `valid_at` 必须与输入中的 `valid_at` 完全一致。 +- 每个 triplet 中的 `invalid_at` 必须与输入中的 `invalid_at` 完全一致。 - 不要在 triplet 里使用与实体名不同的表面形式。 {% else %} - `subject_name` in each triplet MUST exactly match the `name` of the entity referenced by `subject_id`. - `object_name` in each triplet MUST exactly match the `name` of the entity referenced by `object_id`. +- `valid_at` in each triplet MUST exactly match the input `valid_at`. +- `invalid_at` in each triplet MUST exactly match the input `invalid_at`. - Do not use alternative surface forms inside triplets. {% endif %} @@ -603,7 +619,7 @@ Statement: "我住在巴黎。" Output: { "triplets": [ - {"subject_name": "用户", "subject_id": 0, "predicate": "居住于", "predicate_description": "人物居住在某地点", "object_name": "巴黎", "object_id": 1} + {"subject_name": "用户", "subject_id": 0, "predicate": "居住于", "predicate_description": "人物居住在某地点", "object_name": "巴黎", "object_id": 1, "valid_at": "NULL", "invalid_at": "NULL"} ], "entities": [ {"entity_idx": 0, "name": "用户", "type": "人物", "type_description": "可稳定指向、可被当作具体个体区分和归并的个人实体。", "description": "居住在巴黎的说话者", "is_explicit_memory": false}, @@ -618,7 +634,7 @@ Input condition: supporting context has already made it clear that “他” ref Output: { "triplets": [ - {"subject_name": "张明", "subject_id": 0, "predicate": "任职于", "predicate_description": "主体在某组织中工作或任职", "object_name": "腾讯", "object_id": 1} + {"subject_name": "张明", "subject_id": 0, "predicate": "任职于", "predicate_description": "主体在某组织中工作或任职", "object_name": "腾讯", "object_id": 1, "valid_at": "NULL", "invalid_at": "NULL"} ], "entities": [ {"entity_idx": 0, "name": "张明", "type": "人物", "type_description": "可稳定指向、可被当作具体个体区分和归并的个人实体。", "description": "在腾讯工作的人员", "is_explicit_memory": false}, @@ -632,11 +648,11 @@ Statement: "我常去图书馆学微积分。" Output: { "triplets": [ - {"subject_name": "用户", "subject_id": 0, "predicate": "前往", "predicate_description": "主体前往某个地点、场所、组织、课程或活动", "object_name": "图书馆", "object_id": 1}, - {"subject_name": "用户", "subject_id": 0, "predicate": "学习", "predicate_description": "主体正在学习某知识主题或技能", "object_name": "微积分", "object_id": 2} + {"subject_name": "用户", "subject_id": 0, "predicate": "前往", "predicate_description": "主体前往某个地点、场所、组织、课程或活动", "object_name": "图书馆", "object_id": 1, "valid_at": "NULL", "invalid_at": "NULL"}, + {"subject_name": "用户", "subject_id": 0, "predicate": "学习", "predicate_description": "主体正在学习某知识主题或技能", "object_name": "微积分", "object_id": 2, "valid_at": "NULL", "invalid_at": "NULL"} ], "entities": [ - {"entity_idx": 0, "name": "用户", "type": "人物", "type_description": "可稳定指向、可被当作具体个体区分和归并的个人实体。", "description": "经常去图书馆学习微积分的说话者", "is_explicit_memory": false}, + {"entity_idx": 0, "name": "用户", "type": "人物", "type_description": "可稳定指向、可被当作具体个体区分和归并的个人实体。", "description": "经常在图书馆学习微积分的说话者", "is_explicit_memory": false}, {"entity_idx": 1, "name": "图书馆", "type": "地点设施", "type_description": "具有地理意义或功能性空间意义的位置与场所。", "description": "用户经常前往学习的地点", "is_explicit_memory": false}, {"entity_idx": 2, "name": "微积分", "type": "知识能力", "type_description": "可学习、掌握、使用或讨论的知识主题、技能、学科或语言。", "description": "用户经常学习的主题", "is_explicit_memory": true} ] @@ -658,8 +674,8 @@ Statement: "我的朋友都叫我山哥。" Output: { "triplets": [ - {"subject_name": "山哥", "subject_id": 2, "predicate": "别名属于", "predicate_description": "别名指向其对应的规范实体", "object_name": "用户", "object_id": 0}, - {"subject_name": "我的朋友", "subject_id": 1, "predicate": "使用称呼", "predicate_description": "主体使用某个名字来称呼另一实体", "object_name": "山哥", "object_id": 2} + {"subject_name": "山哥", "subject_id": 2, "predicate": "别名属于", "predicate_description": "别名指向其对应的规范实体", "object_name": "用户", "object_id": 0, "valid_at": "NULL", "invalid_at": "NULL"}, + {"subject_name": "我的朋友", "subject_id": 1, "predicate": "使用称呼", "predicate_description": "主体使用某个名字来称呼另一实体", "object_name": "山哥", "object_id": 2, "valid_at": "NULL", "invalid_at": "NULL"} ], "entities": [ {"entity_idx": 0, "name": "用户", "type": "人物", "type_description": "可稳定指向、可被当作具体个体区分和归并的个人实体。", "description": "被朋友称作山哥的说话者", "is_explicit_memory": false}, @@ -697,7 +713,7 @@ Output: { "triplets": [], "entities": [ - {"entity_idx": 0, "name": "用户", "type": "人物", "type_description": "可稳定指向、可被当作具体个体区分和归并的个人实体。", "description": "最近有些紧张并认为这很正常的说话者", "is_explicit_memory": false} + {"entity_idx": 0, "name": "用户", "type": "人物", "type_description": "可稳定指向、可被当作具体个体区分和归并的个人实体。", "description": "", "is_explicit_memory": false} ] } @@ -707,7 +723,7 @@ Statement: "王教授是导师。" Output: { "triplets": [ - {"subject_name": "王教授", "subject_id": 0, "predicate": "担任角色", "predicate_description": "主体承担某个角色", "object_name": "导师", "object_id": 1} + {"subject_name": "王教授", "subject_id": 0, "predicate": "担任角色", "predicate_description": "主体承担某个角色", "object_name": "导师", "object_id": 1, "valid_at": "NULL", "invalid_at": "NULL"} ], "entities": [ {"entity_idx": 0, "name": "王教授", "type": "人物", "type_description": "可稳定指向、可被当作具体个体区分和归并的个人实体。", "description": "承担导师角色的具体个人", "is_explicit_memory": false}, @@ -721,8 +737,8 @@ Statement: "我的GitHub账号用户名是chen4。" Output: { "triplets": [ - {"subject_name": "用户", "subject_id": 0, "predicate": "拥有账号", "predicate_description": "实体具有某账号", "object_name": "GitHub账号", "object_id": 1}, - {"subject_name": "GitHub账号", "subject_id": 1, "predicate": "标识为", "predicate_description": "实体由某标识符标识", "object_name": "chen4", "object_id": 2} + {"subject_name": "用户", "subject_id": 0, "predicate": "拥有账号", "predicate_description": "实体具有某账号", "object_name": "GitHub账号", "object_id": 1, "valid_at": "NULL", "invalid_at": "NULL"}, + {"subject_name": "GitHub账号", "subject_id": 1, "predicate": "标识为", "predicate_description": "实体由某标识符标识", "object_name": "chen4", "object_id": 2, "valid_at": "NULL", "invalid_at": "NULL"} ], "entities": [ {"entity_idx": 0, "name": "用户", "type": "人物", "type_description": "可稳定指向、可被当作具体个体区分和归并的个人实体。", "description": "拥有该 GitHub 账号的说话者", "is_explicit_memory": false}, @@ -737,7 +753,7 @@ Statement: "机器人查票员和我沟通。" Output: { "triplets": [ - {"subject_name": "机器人查票员", "subject_id": 0, "predicate": "沟通于", "predicate_description": "两个实体之间发生沟通或交流", "object_name": "用户", "object_id": 1} + {"subject_name": "机器人查票员", "subject_id": 0, "predicate": "沟通于", "predicate_description": "两个实体之间发生沟通或交流", "object_name": "用户", "object_id": 1, "valid_at": "NULL", "invalid_at": "NULL"} ], "entities": [ {"entity_idx": 0, "name": "机器人查票员", "type": "智能体", "type_description": "具有行动、交互或执行能力的非人主体,如机器人、AI 或其他智慧体。", "description": "与用户发生沟通的机器人主体", "is_explicit_memory": false}, @@ -757,6 +773,7 @@ JSON 要求: - `name`、`subject_name`、`object_name` 默认保持原文中的表面形式,但用户自指必须规范成 `用户`,可稳定解析的其他代词必须替换为具体指代实体名 - `description` 必须使用中文 - `type`、`predicate`、`type_description`、`predicate_description` 必须使用上方预定义的中文标签和中文说明 +- 每个 triplet 都必须包含 `valid_at` 和 `invalid_at`,并与输入中的同名字段完全一致 - 如果 `has_unsolved_reference` 是 `true`,输出必须是 `{"entities": [], "triplets": []}` - 如果存在无法稳定解析的代词或指示表达,输出也必须是 `{"entities": [], "triplets": []}` - 如果没有有效 triplet,返回 `"triplets": []` @@ -769,6 +786,7 @@ JSON 要求: - `name`, `subject_name`, and `object_name` keep their original surface forms by default, but user self-reference must be normalized to `用户` and other stably resolvable references must be replaced by their resolved entity names - `description` must be in English - `type`, `predicate`, `type_description`, and `predicate_description` must use the predefined Chinese labels and Chinese definitions above +- Every triplet must include `valid_at` and `invalid_at`, exactly matching the input fields with the same names - If `has_unsolved_reference` is `true`, the output must be `{"entities": [], "triplets": []}` - If unresolved references still remain, the output must also be `{"entities": [], "triplets": []}` - If no valid triplet exists, return `"triplets": []` @@ -799,7 +817,9 @@ Output JSON structure: "predicate": "string", "predicate_description": "string", "object_name": "string", - "object_id": 0 + "object_id": 0, + "valid_at": "ISO 8601 | NULL", + "invalid_at": "ISO 8601 | NULL" } ] } diff --git a/api/app/core/sensitive_filter.py b/api/app/core/sensitive_filter.py index 9348325d..c469f75b 100644 --- a/api/app/core/sensitive_filter.py +++ b/api/app/core/sensitive_filter.py @@ -57,10 +57,8 @@ class SensitiveDataFilter: (re.compile(r'\beyJ[A-Za-z0-9_-]+\.eyJ[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+'), "[TOKEN]"), # JWT Token 部分匹配(只有header和payload,没有signature) (re.compile(r'\beyJ[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+(?:\.[A-Za-z0-9_-]*)?'), "[TOKEN]"), - # UUID格式的token或ID - (re.compile(r'\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b', re.IGNORECASE), "[UUID]"), - # API密钥格式(32位以上的字母数字组合) - (re.compile(r'\b[A-Za-z0-9]{32,}\b'), "[API_KEY]"), + # API密钥格式(64位以上的字母数字组合,避免误过滤普通业务字段) + (re.compile(r'\b[A-Za-z0-9]{64,}\b'), "[API_KEY]"), ] # 替换文本 diff --git a/api/app/repositories/neo4j/cypher_queries.py b/api/app/repositories/neo4j/cypher_queries.py index 03d7ed7a..64ee5e24 100644 --- a/api/app/repositories/neo4j/cypher_queries.py +++ b/api/app/repositories/neo4j/cypher_queries.py @@ -33,8 +33,8 @@ SET s += { temporal_info: statement.temporal_info, created_at: statement.created_at, expired_at: statement.expired_at, - valid_at: statement.valid_at, - invalid_at: statement.invalid_at, + valid_at: coalesce(statement.valid_at, ""), + invalid_at: coalesce(statement.invalid_at, ""), statement_embedding: statement.statement_embedding, relevence_info: statement.relevence_info, importance_score: statement.importance_score, @@ -152,8 +152,8 @@ SET r.predicate = rel.predicate, r.statement_id = rel.statement_id, r.value = rel.value, r.statement = rel.statement, - r.valid_at = rel.valid_at, - r.invalid_at = rel.invalid_at, + r.valid_at = coalesce(rel.valid_at, ""), + r.invalid_at = coalesce(rel.invalid_at, ""), r.created_at = rel.created_at, r.expired_at = rel.expired_at, r.run_id = rel.run_id, diff --git a/api/app/repositories/neo4j/graph_saver.py b/api/app/repositories/neo4j/graph_saver.py index 6109f189..a317e299 100644 --- a/api/app/repositories/neo4j/graph_saver.py +++ b/api/app/repositories/neo4j/graph_saver.py @@ -260,7 +260,7 @@ async def save_dialog_and_statements_to_neo4j( result = await tx.run(DIALOGUE_NODE_SAVE, dialogues=dialogue_data) dialogue_uuids = [record["uuid"] async for record in result] results['dialogues'] = dialogue_uuids - logger.info(f"Dialogues saved to Neo4j with UUIDs: {dialogue_uuids}") + logger.debug(f"Dialogues saved to Neo4j with UUIDs: {dialogue_uuids}") # 2. Save all chunk nodes in batch if chunk_nodes: @@ -269,7 +269,7 @@ async def save_dialog_and_statements_to_neo4j( result = await tx.run(CHUNK_NODE_SAVE, chunks=chunk_data) chunk_uuids = [record["uuid"] async for record in result] results['chunks'] = chunk_uuids - logger.info(f"Successfully saved {len(chunk_uuids)} chunk nodes to Neo4j") + logger.debug(f"Successfully saved {len(chunk_uuids)} chunk nodes to Neo4j") if perceptual_nodes: from app.repositories.neo4j.cypher_queries import PERCEPTUAL_NODE_SAVE @@ -277,7 +277,7 @@ async def save_dialog_and_statements_to_neo4j( result = await tx.run(PERCEPTUAL_NODE_SAVE, perceptuals=perceptual_data) perceptual_uuids = [record["uuid"] async for record in result] results["perceptuals"] = perceptual_uuids - logger.info(f"Successfully saved {len(perceptual_uuids)} perceptual nodes to Neo4j") + logger.debug(f"Successfully saved {len(perceptual_uuids)} perceptual nodes to Neo4j") # 3. Save all statement nodes in batch if statement_nodes: @@ -286,7 +286,7 @@ async def save_dialog_and_statements_to_neo4j( result = await tx.run(STATEMENT_NODE_SAVE, statements=statement_data) statement_uuids = [record["uuid"] async for record in result] results['statements'] = statement_uuids - logger.info(f"Successfully saved {len(statement_uuids)} statement nodes to Neo4j") + logger.debug(f"Successfully saved {len(statement_uuids)} statement nodes to Neo4j") # 4. Save entities if entity_nodes: @@ -295,7 +295,7 @@ async def save_dialog_and_statements_to_neo4j( result = await tx.run(EXTRACTED_ENTITY_NODE_SAVE, entities=entity_data) entity_uuids = [record["uuid"] async for record in result] results['entities'] = entity_uuids - logger.info(f"Successfully saved {len(entity_uuids)} entity nodes to Neo4j") + logger.debug(f"Successfully saved {len(entity_uuids)} entity nodes to Neo4j") # 5. Create entity relationships if entity_edges: @@ -320,7 +320,7 @@ async def save_dialog_and_statements_to_neo4j( result = await tx.run(ENTITY_RELATIONSHIP_SAVE, relationships=relationship_data) rel_uuids = [record["uuid"] async for record in result] results['entity_relationships'] = rel_uuids - logger.info(f"Successfully saved {len(rel_uuids)} entity relationships to Neo4j") + logger.debug(f"Successfully saved {len(rel_uuids)} entity relationships to Neo4j") # 6. Save statement-chunk edges if statement_chunk_edges: @@ -339,7 +339,7 @@ async def save_dialog_and_statements_to_neo4j( result = await tx.run(CHUNK_STATEMENT_EDGE_SAVE, chunk_statement_edges=sc_edge_data) sc_uuids = [record["uuid"] async for record in result] results['statement_chunk_edges'] = sc_uuids - logger.info(f"Successfully saved {len(sc_uuids)} statement-chunk edges to Neo4j") + logger.debug(f"Successfully saved {len(sc_uuids)} statement-chunk edges to Neo4j") # 7. Save statement-entity edges if statement_entity_edges: @@ -358,7 +358,7 @@ async def save_dialog_and_statements_to_neo4j( result = await tx.run(STATEMENT_ENTITY_EDGE_SAVE, relationships=se_edge_data) se_uuids = [record["uuid"] async for record in result] results['statement_entity_edges'] = se_uuids - logger.info(f"Successfully saved {len(se_uuids)} statement-entity edges to Neo4j") + logger.debug(f"Successfully saved {len(se_uuids)} statement-entity edges to Neo4j") if perceptual_edges: from app.repositories.neo4j.cypher_queries import PERCEPTUAL_CHUNK_EDGE_SAVE @@ -374,7 +374,7 @@ async def save_dialog_and_statements_to_neo4j( result = await tx.run(PERCEPTUAL_CHUNK_EDGE_SAVE, edges=perceptual_edge_data) perceptual_edges_uuids = [record["uuid"] async for record in result] results['perceptual_chunk_edges'] = perceptual_edges_uuids - logger.info(f"Successfully saved {len(perceptual_edges_uuids)} perceptual-chunk edges to Neo4j") + logger.debug(f"Successfully saved {len(perceptual_edges_uuids)} perceptual-chunk edges to Neo4j") # 8. Save assistant original nodes if assistant_original_nodes: @@ -383,7 +383,7 @@ async def save_dialog_and_statements_to_neo4j( result = await tx.run(ASSISTANT_ORIGINAL_NODE_SAVE, originals=original_data) original_uuids = [record["uuid"] async for record in result] results['assistant_originals'] = original_uuids - logger.info(f"Successfully saved {len(original_uuids)} assistant original nodes to Neo4j") + logger.debug(f"Successfully saved {len(original_uuids)} assistant original nodes to Neo4j") # 9. Save assistant pruned nodes if assistant_pruned_nodes: @@ -392,7 +392,7 @@ async def save_dialog_and_statements_to_neo4j( result = await tx.run(ASSISTANT_PRUNED_NODE_SAVE, pruneds=pruned_data) pruned_uuids = [record["uuid"] async for record in result] results['assistant_pruneds'] = pruned_uuids - logger.info(f"Successfully saved {len(pruned_uuids)} assistant pruned nodes to Neo4j") + logger.debug(f"Successfully saved {len(pruned_uuids)} assistant pruned nodes to Neo4j") # 10. Save PRUNED_TO edges (Original → Pruned) if assistant_pruned_edges: @@ -408,7 +408,7 @@ async def save_dialog_and_statements_to_neo4j( result = await tx.run(ASSISTANT_PRUNED_EDGE_SAVE, edges=edge_data) pruned_edge_uuids = [record["uuid"] async for record in result] results['assistant_pruned_edges'] = pruned_edge_uuids - logger.info(f"Successfully saved {len(pruned_edge_uuids)} PRUNED_TO edges to Neo4j") + logger.debug(f"Successfully saved {len(pruned_edge_uuids)} PRUNED_TO edges to Neo4j") # 11. Save BELONGS_TO_DIALOG edges (Original → Dialogue) if assistant_dialog_edges: @@ -423,7 +423,7 @@ async def save_dialog_and_statements_to_neo4j( result = await tx.run(ASSISTANT_DIALOG_EDGE_SAVE, edges=edge_data) dialog_edge_uuids = [record["uuid"] async for record in result] results['assistant_dialog_edges'] = dialog_edge_uuids - logger.info(f"Successfully saved {len(dialog_edge_uuids)} BELONGS_TO_DIALOG edges to Neo4j") + logger.debug(f"Successfully saved {len(dialog_edge_uuids)} BELONGS_TO_DIALOG edges to Neo4j") return results diff --git a/api/app/repositories/ontology_class_repository.py b/api/app/repositories/ontology_class_repository.py index 5be81ff7..444717ce 100644 --- a/api/app/repositories/ontology_class_repository.py +++ b/api/app/repositories/ontology_class_repository.py @@ -227,7 +227,7 @@ class OntologyClassRepository: ).all() logger.info( - f"Found {len(classes)} ontology classes in scene {scene_id}" + f"Found {len(classes)} ontology classes in scene_id: {scene_id}" ) return classes diff --git a/api/app/services/memory_agent_service.py b/api/app/services/memory_agent_service.py index ca207933..797a3aef 100644 --- a/api/app/services/memory_agent_service.py +++ b/api/app/services/memory_agent_service.py @@ -64,7 +64,7 @@ class MemoryAgentService: def writer_messages_deal(self, messages, start_time, end_user_id, config_id, message, context): duration = time.time() - start_time if str(messages) == 'success': - logger.info(f"Write operation successful for group {end_user_id} with config_id {config_id}") + logger.info(f"Write operation successful for end_id: {end_user_id} with config_id: {config_id}") # 记录成功的操作 audit_logger.log_operation(operation="WRITE", config_id=config_id, end_user_id=end_user_id, success=True, @@ -360,10 +360,21 @@ class MemoryAgentService: workspace_id = None try: connected_config = get_end_user_connected_config(end_user_id, db) - workspace_id = connected_config.get("workspace_id") + # get_end_user_connected_config 返回字符串,需转为 UUID + workspace_id_raw = connected_config.get("workspace_id") + if workspace_id_raw and workspace_id_raw != "None": + try: + workspace_id = uuid.UUID(str(workspace_id_raw)) + except (ValueError, AttributeError): + workspace_id = None if config_id is None: - config_id = connected_config.get("memory_config_id") - logger.info(f"Resolved config from end_user: config_id={config_id}, workspace_id={workspace_id}") + config_id_raw = connected_config.get("memory_config_id") + if config_id_raw and config_id_raw != "None": + try: + config_id = uuid.UUID(str(config_id_raw)) + except (ValueError, AttributeError): + config_id = None + logger.info(f"Resolved config from end_user: config_id = {config_id}, workspace_id = {workspace_id}") if config_id is None and workspace_id is None: raise ValueError( f"No memory configuration found for end_user {end_user_id}. " @@ -517,7 +528,7 @@ class MemoryAgentService: workspace_id = connected_config.get("workspace_id") if config_id is None: config_id = connected_config.get("memory_config_id") - logger.info(f"Resolved config from end_user: config_id={config_id}, workspace_id={workspace_id}") + logger.info(f"Resolved config from end_user: config_id = {config_id}, workspace_id = {workspace_id}") if config_id is None and workspace_id is None: raise ValueError( f"No memory configuration found for end_user {end_user_id}. Please ensure the user has a connected memory configuration.") @@ -529,7 +540,7 @@ class MemoryAgentService: raise ValueError(f"Unable to determine memory configuration for end_user {end_user_id}: {e}") # If config_id was provided, continue without workspace_id fallback - logger.info(f"Read operation for group {end_user_id} with config_id {config_id}") + logger.info(f"Read operation for end_user_id: {end_user_id} with config_id: {config_id}") config_load_start = time.time() try: @@ -840,16 +851,16 @@ class MemoryAgentService: workspace_id = connected_config.get('workspace_id') if config_id is None: config_id = connected_config.get('memory_config_id') - logger.info(f"Resolved config from end_user: config_id={config_id}, workspace_id={workspace_id}") + logger.info(f"Resolved config from end_user: config_id = {config_id}, workspace_id = {workspace_id}") if config_id is None and workspace_id is None: raise ValueError( - f"No memory configuration found for end_user {end_user_id}. Please ensure the user has a connected memory configuration.") + f"No memory configuration found for end_user_id {end_user_id}. Please ensure the user has a connected memory configuration.") except Exception as e: if "No memory configuration found" in str(e): raise # Re-raise our specific error - logger.error(f"Failed to get connected config for end_user {end_user_id}: {e}") + logger.error(f"Failed to get connected config for end_user_id {end_user_id}: {e}") if config_id is None: - raise ValueError(f"Unable to determine memory configuration for end_user {end_user_id}: {e}") + raise ValueError(f"Unable to determine memory configuration for end_user_id {end_user_id}: {e}") # If config_id was provided, continue without workspace_id fallback logger.info(f"Generating summary from retrieve info for query: {query[:50]}...") @@ -1250,7 +1261,7 @@ def get_end_user_connected_config(end_user_id: str, db: Session) -> Dict[str, An from app.models.end_user_model import EndUser from app.services.memory_config_service import MemoryConfigService - logger.info(f"Getting connected config for end_user: {end_user_id}") + logger.info(f"Getting connected config for end_user_id: {end_user_id}") # TODO: check sources for enduserid, should be one of these three: chat, draft, apikey # 1. 获取 end_user 及其 app_id @@ -1351,7 +1362,7 @@ def get_end_user_connected_config(end_user_id: str, db: Session) -> Dict[str, An } logger.info( - f"Successfully retrieved connected config: memory_config_id={memory_config_id}, workspace_id={end_user.workspace_id}") + f"Successfully retrieved connected config: memory_config_id = {memory_config_id}, workspace_id = {end_user.workspace_id}") return result diff --git a/api/app/tasks.py b/api/app/tasks.py index f697b5f3..6b1b6ae2 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -1279,7 +1279,7 @@ def write_message_task( with get_db_context() as db: logger.info( f"[CELERY WRITE] Executing MemoryAgentService.write_memory " - f"with config_id={actual_config_id} (type: {type(actual_config_id).__name__}), language={language}") + f"with config_id = {actual_config_id} (type: {type(actual_config_id).__name__}), language={language}") service = MemoryAgentService() result = await service.write_memory( WriteMemoryRequest(