diff --git a/api/app/core/memory/storage_services/extraction_engine/data_preprocessing/data_pruning.py b/api/app/core/memory/storage_services/extraction_engine/data_preprocessing/data_pruning.py index d932c542..0a913633 100644 --- a/api/app/core/memory/storage_services/extraction_engine/data_preprocessing/data_pruning.py +++ b/api/app/core/memory/storage_services/extraction_engine/data_preprocessing/data_pruning.py @@ -82,6 +82,10 @@ class SemanticPruner: self.language = language # 保存语言配置 self.max_concurrent = max_concurrent # 新增:最大并发数 + # 详细日志配置:限制逐条消息日志的数量 + self._detailed_prune_logging = True # 是否启用详细日志 + self._max_debug_msgs_per_dialog = 20 # 每个对话最多记录前N条消息的详细日志 + # 加载场景特定配置 self.scene_config: ScenePatterns = SceneConfigRegistry.get_config( self.config.pruning_scene, @@ -595,6 +599,11 @@ class SemanticPruner: unimportant_msgs = [] # 不重要消息(可删除) filler_msgs = [] # 填充消息(优先删除) + # 判断是否需要详细日志(仅对前N条消息记录) + should_log_details = self._detailed_prune_logging and original_count <= self._max_debug_msgs_per_dialog + if self._detailed_prune_logging and original_count > self._max_debug_msgs_per_dialog: + self._log(f" 对话[{d_idx}]消息数={original_count},仅采样前{self._max_debug_msgs_per_dialog}条进行详细日志") + for idx, m in enumerate(msgs): msg_text = m.msg.strip() @@ -607,15 +616,18 @@ class SemanticPruner: # 填充消息(寒暄、表情等) if self._is_filler_message(m): filler_msgs.append((idx, m)) - self._log(f" [{idx}] '{msg_text[:30]}...' → 填充") + if should_log_details or idx < self._max_debug_msgs_per_dialog: + self._log(f" [{idx}] '{msg_text[:30]}...' → 填充") # 重要信息(学号、成绩、时间、金额等) elif self._is_important_message(m): important_msgs.append((idx, m)) - self._log(f" [{idx}] '{msg_text[:30]}...' → 重要(场景规则)") + if should_log_details or idx < self._max_debug_msgs_per_dialog: + self._log(f" [{idx}] '{msg_text[:30]}...' → 重要(场景规则)") # 其他消息 else: unimportant_msgs.append((idx, m)) - self._log(f" [{idx}] '{msg_text[:30]}...' → 不重要") + if should_log_details or idx < self._max_debug_msgs_per_dialog: + self._log(f" [{idx}] '{msg_text[:30]}...' → 不重要") # 计算删除配额 delete_target = int(original_count * proportion) diff --git a/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py b/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py index 17bda0e4..1242e4e6 100644 --- a/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py +++ b/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py @@ -1932,17 +1932,17 @@ def preprocess_data( Returns: 经过清洗转换后的 DialogData 列表 """ - print("\n=== 数据预处理 ===") + logger.debug("=== 数据预处理 ===") from app.core.memory.storage_services.extraction_engine.data_preprocessing.data_preprocessor import ( DataPreprocessor, ) preprocessor = DataPreprocessor() try: cleaned_data = preprocessor.preprocess(input_path=input_path, output_path=output_path, skip_cleaning=skip_cleaning, indices=indices) - print(f"数据预处理完成!共处理了 {len(cleaned_data)} 条对话数据") + logger.debug(f"数据预处理完成!共处理了 {len(cleaned_data)} 条对话数据") return cleaned_data except Exception as e: - print(f"数据预处理过程中出现错误: {e}") + logger.error(f"数据预处理过程中出现错误: {e}") raise @@ -1961,7 +1961,7 @@ async def get_chunked_dialogs_from_preprocessed( Returns: 带 chunks 的 DialogData 列表 """ - print(f"\n=== 批量对话分块处理 (使用 {chunker_strategy}) ===") + logger.debug(f"=== 批量对话分块处理 (使用 {chunker_strategy}) ===") if not data: raise ValueError("预处理数据为空,无法进行分块") @@ -2006,7 +2006,7 @@ async def get_chunked_dialogs_with_preprocessing( Returns: 带 chunks 的 DialogData 列表 """ - print("\n=== 完整数据处理流程(包含预处理)===") + logger.debug("=== 完整数据处理流程(包含预处理)===") if input_data_path is None: input_data_path = os.path.join( @@ -2038,11 +2038,11 @@ async def get_chunked_dialogs_with_preprocessing( if pruning_config: # 使用传入的配置 config = PruningConfig(**pruning_config) - print(f"[剪枝] 使用传入配置: switch={config.pruning_switch}, scene={config.pruning_scene}, threshold={config.pruning_threshold}") + logger.debug(f"[剪枝] 使用传入配置: switch={config.pruning_switch}, scene={config.pruning_scene}, threshold={config.pruning_threshold}") else: # 使用默认配置(关闭剪枝) config = None - print("[剪枝] 未提供配置,使用默认配置(剪枝关闭)") + logger.debug("[剪枝] 未提供配置,使用默认配置(剪枝关闭)") pruner = SemanticPruner(config=config, llm_client=llm_client) @@ -2057,12 +2057,12 @@ async def get_chunked_dialogs_with_preprocessing( if len(preprocessed_data) == 1 and single_dialog_original_msgs is not None: remaining_msgs = len(preprocessed_data[0].context.msgs) if preprocessed_data[0].context else 0 deleted_msgs = max(0, single_dialog_original_msgs - remaining_msgs) - print( + logger.debug( f"语义剪枝完成!剩余 1 条对话!原始消息数:{single_dialog_original_msgs}," f"保留消息数:{remaining_msgs},删除 {deleted_msgs} 条。" ) else: - print(f"语义剪枝完成!剩余 {len(preprocessed_data)} 条对话") + logger.debug(f"语义剪枝完成!剩余 {len(preprocessed_data)} 条对话") # 保存剪枝后的数据 try: @@ -2073,9 +2073,9 @@ async def get_chunked_dialogs_with_preprocessing( dp = DataPreprocessor(output_file_path=pruned_output_path) dp.save_data(preprocessed_data, output_path=pruned_output_path) except Exception as se: - print(f"保存剪枝结果失败:{se}") + logger.error(f"保存剪枝结果失败:{se}") except Exception as e: - print(f"语义剪枝过程中出现错误,跳过剪枝: {e}") + logger.error(f"语义剪枝过程中出现错误,跳过剪枝: {e}") # 步骤3: 对话分块 return await get_chunked_dialogs_from_preprocessed( diff --git a/api/app/services/pilot_run_service.py b/api/app/services/pilot_run_service.py index c39d089e..4d9cbb5e 100644 --- a/api/app/services/pilot_run_service.py +++ b/api/app/services/pilot_run_service.py @@ -140,12 +140,22 @@ async def run_pilot_extraction( remaining_msg_count = len(remaining_messages) deleted_msg_count = original_msg_count - remaining_msg_count - # 找出被删除的消息(通过内容对比) - remaining_contents = {msg["content"] for msg in remaining_messages} + # 找出被删除的消息(基于索引精确匹配) + # 为剩余消息创建带索引的列表,用于精确追踪 + remaining_with_index = [] + remaining_idx = 0 + for orig_idx, orig_msg in enumerate(original_messages): + if remaining_idx < len(remaining_messages) and \ + orig_msg["role"] == remaining_messages[remaining_idx]["role"] and \ + orig_msg["content"] == remaining_messages[remaining_idx]["content"]: + remaining_with_index.append(orig_idx) + remaining_idx += 1 + + # 找出未在保留列表中的消息索引 deleted_messages = [ {"index": idx, "role": msg["role"], "content": msg["content"]} for idx, msg in enumerate(original_messages) - if msg["content"] not in remaining_contents + if idx not in remaining_with_index ] # 保存剪枝统计信息(用于最终汇总,只保留deleted_count)