From 3a0671c661baee7000c1ab3ae43daf69bb11d811 Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Sat, 28 Feb 2026 17:18:42 +0800 Subject: [PATCH] [add]The semantic pruning function is activated, removing the protection of question-answer pairs. --- .../core/memory/agent/utils/get_dialogs.py | 57 +- .../data_preprocessing/data_pruning.py | 511 ++++++++---------- .../data_preprocessing/scene_config.py | 326 +++++++++++ .../extraction_orchestrator.py | 16 +- 4 files changed, 619 insertions(+), 291 deletions(-) create mode 100644 api/app/core/memory/storage_services/extraction_engine/data_preprocessing/scene_config.py diff --git a/api/app/core/memory/agent/utils/get_dialogs.py b/api/app/core/memory/agent/utils/get_dialogs.py index bfb0f675..22555fff 100644 --- a/api/app/core/memory/agent/utils/get_dialogs.py +++ b/api/app/core/memory/agent/utils/get_dialogs.py @@ -21,7 +21,7 @@ async def get_chunked_dialogs( end_user_id: Group identifier messages: Structured message list [{"role": "user", "content": "..."}, ...] ref_id: Reference identifier - config_id: Configuration ID for processing + config_id: Configuration ID for processing (used to load pruning config) Returns: List of DialogData objects with generated chunks @@ -57,6 +57,61 @@ async def get_chunked_dialogs( end_user_id=end_user_id, config_id=config_id ) + + # 语义剪枝步骤(在分块之前) + try: + from app.core.memory.storage_services.extraction_engine.data_preprocessing.data_pruning import SemanticPruner + from app.core.memory.models.config_models import PruningConfig + from app.db import get_db_context + from app.services.memory_config_service import MemoryConfigService + from app.core.memory.utils.llm.llm_utils import MemoryClientFactory + + # 加载剪枝配置 + pruning_config = None + if config_id: + try: + with get_db_context() as db: + # 使用 MemoryConfigService 加载完整的 MemoryConfig 对象 + config_service = MemoryConfigService(db) + memory_config = config_service.load_memory_config( + config_id=config_id, + service_name="semantic_pruning" + ) + + if memory_config: + pruning_config = PruningConfig( + pruning_switch=memory_config.pruning_enabled, + pruning_scene=memory_config.pruning_scene or "education", + pruning_threshold=memory_config.pruning_threshold + ) + logger.info(f"[剪枝] 加载配置: switch={pruning_config.pruning_switch}, scene={pruning_config.pruning_scene}, threshold={pruning_config.pruning_threshold}") + + # 获取LLM客户端用于剪枝 + if pruning_config.pruning_switch: + factory = MemoryClientFactory(db) + llm_client = factory.get_llm_client_from_config(memory_config) + + # 执行剪枝 - 使用 prune_dataset 支持消息级剪枝 + pruner = SemanticPruner(config=pruning_config, llm_client=llm_client) + original_msg_count = len(dialog_data.context.msgs) + + # 使用 prune_dataset 而不是 prune_dialog + # prune_dataset 会进行消息级剪枝,即使对话整体相关也会删除不重要消息 + pruned_dialogs = await pruner.prune_dataset([dialog_data]) + + if pruned_dialogs: + dialog_data = pruned_dialogs[0] + remaining_msg_count = len(dialog_data.context.msgs) + deleted_count = original_msg_count - remaining_msg_count + logger.info(f"[剪枝] 完成: 原始{original_msg_count}条 -> 保留{remaining_msg_count}条 (删除{deleted_count}条)") + else: + logger.warning("[剪枝] prune_dataset 返回空列表") + else: + logger.info("[剪枝] 配置中剪枝开关关闭,跳过剪枝") + except Exception as e: + logger.warning(f"[剪枝] 加载配置失败,跳过剪枝: {e}", exc_info=True) + except Exception as e: + logger.warning(f"[剪枝] 执行失败,跳过剪枝: {e}", exc_info=True) chunker = DialogueChunker(chunker_strategy) extracted_chunks = await chunker.process_dialogue(dialog_data) diff --git a/api/app/core/memory/storage_services/extraction_engine/data_preprocessing/data_pruning.py b/api/app/core/memory/storage_services/extraction_engine/data_preprocessing/data_pruning.py index 2d0142c6..d932c542 100644 --- a/api/app/core/memory/storage_services/extraction_engine/data_preprocessing/data_pruning.py +++ b/api/app/core/memory/storage_services/extraction_engine/data_preprocessing/data_pruning.py @@ -22,6 +22,10 @@ from app.core.memory.models.message_models import DialogData, ConversationMessag from app.core.memory.models.config_models import PruningConfig from app.core.memory.utils.config.config_utils import get_pruning_config from app.core.memory.utils.prompt.prompt_utils import prompt_env, log_prompt_rendering, log_template_rendering +from app.core.memory.storage_services.extraction_engine.data_preprocessing.scene_config import ( + SceneConfigRegistry, + ScenePatterns +) class DialogExtractionResponse(BaseModel): @@ -78,6 +82,20 @@ class SemanticPruner: self.language = language # 保存语言配置 self.max_concurrent = max_concurrent # 新增:最大并发数 + # 加载场景特定配置 + self.scene_config: ScenePatterns = SceneConfigRegistry.get_config( + self.config.pruning_scene, + fallback_to_generic=True + ) + + # 检查场景是否有专门支持 + is_supported = SceneConfigRegistry.is_scene_supported(self.config.pruning_scene) + if is_supported: + self._log(f"[剪枝-初始化] 场景={self.config.pruning_scene} 使用专门配置") + else: + self._log(f"[剪枝-初始化] 场景={self.config.pruning_scene} 未预定义,使用通用配置(保守策略)") + self._log(f"[剪枝-初始化] 支持的场景: {SceneConfigRegistry.get_all_scenes()}") + # Load Jinja2 template self.template = prompt_env.get_template("extracat_Pruning.jinja2") @@ -87,108 +105,80 @@ class SemanticPruner: # 运行日志:收集关键终端输出,便于写入 JSON self.run_logs: List[str] = [] - - # 扩展的填充词库(包含表情符号和网络用语) - self._extended_fillers = [ - # 基础寒暄 - "你好", "您好", "在吗", "在的", "在呢", "嗯", "嗯嗯", "哦", "哦哦", - "好的", "好", "行", "可以", "不可以", "谢谢", "多谢", "感谢", - "拜拜", "再见", "88", "拜", "回见", - # 口头禅 - "哈哈", "呵呵", "哈哈哈", "嘿嘿", "嘻嘻", "hiahia", - "额", "呃", "啊", "诶", "唉", "哎", "嗯哼", - # 确认词 - "是的", "对", "对的", "没错", "嗯嗯", "好嘞", "收到", "明白", "了解", "知道了", - # 标点和符号 - "。。。", "...", "???", "???", "!!!", "!!!", - # 表情符号(文本形式) - "[微笑]", "[呲牙]", "[发呆]", "[得意]", "[流泪]", "[害羞]", "[闭嘴]", - "[睡]", "[大哭]", "[尴尬]", "[发怒]", "[调皮]", "[龇牙]", "[惊讶]", - "[难过]", "[酷]", "[冷汗]", "[抓狂]", "[吐]", "[偷笑]", "[可爱]", - "[白眼]", "[傲慢]", "[饥饿]", "[困]", "[惊恐]", "[流汗]", "[憨笑]", - # 网络用语 - "hhh", "hhhh", "2333", "666", "gg", "ok", "OK", "okok", - "emmm", "emm", "em", "mmp", "wtf", "omg", - ] def _is_important_message(self, message: ConversationMessage) -> bool: """基于启发式规则识别重要信息消息,优先保留。 - 改进版:增强了规则覆盖范围,包括: - - 含日期/时间(如YYYY-MM-DD、HH:MM、2024年11月10日、上午/下午) - - 含编号/ID/订单号/申请号/账号/电话/金额等关键字段 - - 关键词:"时间"、"日期"、"编号"、"订单"、"流水"、"金额"、"¥"、"元"、"电话"、"手机号"、"邮箱"、"地址" - - 新增:问句识别、决策性语句、承诺性语句 + 改进版:使用场景特定的模式进行识别 + - 根据 pruning_scene 动态加载对应的识别规则 + - 支持教育、在线服务、外呼三个场景的特定模式 """ text = message.msg.strip() if not text: return False - patterns = [ - # 原有模式 - r"\d{4}-\d{1,2}-\d{1,2}", # 修复:移除 \b 边界,因为中文前后没有单词边界 - r"\d{1,2}:\d{2}", # 修复:移除 \b - r"\d{4}年\d{1,2}月\d{1,2}日", - r"上午|下午|AM|PM|今天|明天|后天|昨天|前天|本周|下周|上周|本月|下月|上月", - r"订单号|工单|申请号|编号|ID|账号|账户|流水号|单号", - r"电话|手机号|微信|QQ|邮箱|联系方式", - r"地址|地点|位置|门牌号", - r"金额|费用|价格|¥|¥|\d+元|人民币|美元|欧元", - r"时间|日期|有效期|截止|期限|到期", - # 新增模式 - r"什么|为什么|怎么|如何|哪里|哪个|谁|多少|几点|何时", # 问句关键词 - r"必须|一定|务必|需要|要求|规定|应该", # 决策性语句 - r"承诺|保证|确保|负责|同意|答应", # 承诺性语句 - r"\d{11}", # 11位手机号 - r"\d{3,4}-\d{7,8}", # 固定电话 - r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", # 邮箱 - ] + # 使用场景特定的模式 + all_patterns = ( + self.scene_config.high_priority_patterns + + self.scene_config.medium_priority_patterns + + self.scene_config.low_priority_patterns + ) - for p in patterns: - if re.search(p, text, flags=re.IGNORECASE): + for pattern, _ in all_patterns: + if re.search(pattern, text, flags=re.IGNORECASE): return True # 检查是否为问句(以问号结尾或包含疑问词) if text.endswith("?") or text.endswith("?"): return True + + # 检查是否包含问句关键词 + if any(keyword in text for keyword in self.scene_config.question_keywords): + return True + + # 检查是否包含决策性关键词 + if any(keyword in text for keyword in self.scene_config.decision_keywords): + return True return False def _importance_score(self, message: ConversationMessage) -> int: """为重要消息打分,用于在保留比例内优先保留更关键的内容。 - 改进版:更细致的评分体系(0-10分) + 改进版:使用场景特定的权重体系(0-10分) + - 根据场景动态调整不同信息类型的权重 + - 高优先级模式:4-6分 + - 中优先级模式:2-3分 + - 低优先级模式:1分 """ text = message.msg.strip() score = 0 - weights = [ - # 高优先级(4-5分) - (r"订单号|工单|申请号|编号|ID|账号|账户", 5), - (r"金额|费用|价格|¥|¥|\d+元", 5), - (r"\d{11}", 4), # 手机号 - (r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", 4), # 邮箱 - - # 中优先级(2-3分) - (r"\d{4}-\d{1,2}-\d{1,2}", 3), # 修复:移除 \b - (r"\d{4}年\d{1,2}月\d{1,2}日", 3), - (r"电话|手机号|微信|QQ|联系方式", 3), - (r"地址|地点|位置", 2), - (r"时间|日期|有效期|截止|明天|后天|下周|下月", 2), # 新增时间相关词 - - # 低优先级(1分) - (r"\d{1,2}:\d{2}", 1), # 修复:移除 \b - (r"上午|下午|AM|PM", 1), - ] + # 使用场景特定的权重 + for pattern, weight in self.scene_config.high_priority_patterns: + if re.search(pattern, text, flags=re.IGNORECASE): + score += weight - for p, w in weights: - if re.search(p, text, flags=re.IGNORECASE): - score += w + for pattern, weight in self.scene_config.medium_priority_patterns: + if re.search(pattern, text, flags=re.IGNORECASE): + score += weight + + for pattern, weight in self.scene_config.low_priority_patterns: + if re.search(pattern, text, flags=re.IGNORECASE): + score += weight # 问句加分 if text.endswith("?") or text.endswith("?"): score += 2 + # 包含问句关键词加分 + if any(keyword in text for keyword in self.scene_config.question_keywords): + score += 1 + + # 包含决策性关键词加分 + if any(keyword in text for keyword in self.scene_config.decision_keywords): + score += 2 + # 长度加分(较长的消息通常包含更多信息) if len(text) > 50: score += 1 @@ -198,20 +188,35 @@ class SemanticPruner: return min(score, 10) # 最高10分 def _is_filler_message(self, message: ConversationMessage) -> bool: - """检测典型寒暄/口头禅/确认类短消息,用于跳过LLM分类以加速。 + """检测典型寒暄/口头禅/确认类短消息。 - 改进版:扩展了填充词库,支持表情符号和网络用语 + 改进版:更严格的填充消息判断,避免误删场景相关内容 满足以下之一视为填充消息: - - 纯标点或长度很短(<= 4 个汉字或 <= 8 个字符)且不包含数字或关键实体 - - 在扩展填充词库中 + - 纯标点或空白 + - 在场景特定填充词库中(精确匹配) - 纯表情符号 + - 常见寒暄(精确匹配短语) + + 注意:不再使用长度判断,避免误删短但重要的消息 """ t = message.msg.strip() if not t: return True - # 检查是否在扩展填充词库中 - if t in self._extended_fillers: + # 检查是否在场景特定填充词库中(精确匹配) + if t in self.scene_config.filler_phrases: + return True + + # 常见寒暄和问候(精确匹配,避免误删) + common_greetings = { + "在吗", "在不在", "在呢", "在的", + "你好", "您好", "hello", "hi", + "拜拜", "再见", "拜", "88", "bye", + "好的", "好", "行", "可以", "嗯", "哦", "啊", + "是的", "对", "对的", "没错", "是啊", + "哈哈", "呵呵", "嘿嘿", "嗯嗯" + } + if t in common_greetings: return True # 检查是否为纯表情符号(方括号包裹) @@ -232,13 +237,9 @@ class SemanticPruner: if emoji_pattern.fullmatch(t): return True - # 长度与字符类型判断 - if len(t) <= 8: - # 非数字、无关键实体的短文本 - if not re.search(r"[0-9]", t) and not self._is_important_message(message): - # 主要是标点或简单确认词 - if re.fullmatch(r"[。!?,.!?…·\s]+", t): - return True + # 纯标点符号 + if re.fullmatch(r"[。!?,.!?…·\s]+", t): + return True return False @@ -308,6 +309,8 @@ class SemanticPruner: def _identify_qa_pairs(self, messages: List[ConversationMessage]) -> List[QAPair]: """识别对话中的问答对,用于保护问答结构的完整性。 + 改进版:使用场景特定的问句关键词,并排除寒暄类问句 + Args: messages: 消息列表 @@ -316,21 +319,39 @@ class SemanticPruner: """ qa_pairs = [] + # 寒暄类问句,不应该被保护(这些不是真正的问答) + greeting_questions = { + "在吗", "在不在", "你好吗", "怎么样", "好吗", + "有空吗", "忙吗", "睡了吗", "起床了吗" + } + for i in range(len(messages) - 1): current_msg = messages[i].msg.strip() next_msg = messages[i + 1].msg.strip() - # 简单规则:如果当前消息是问句,下一条消息可能是答案 - is_question = ( - current_msg.endswith("?") or - current_msg.endswith("?") or - any(word in current_msg for word in ["什么", "为什么", "怎么", "如何", "哪里", "哪个", "谁", "多少", "几点", "何时", "吗"]) - ) + # 排除寒暄类问句 + if current_msg in greeting_questions: + continue + + # 使用场景特定的问句关键词,但要求更严格 + is_question = False + + # 1. 以问号结尾 + if current_msg.endswith("?") or current_msg.endswith("?"): + is_question = True + # 2. 包含实质性问句关键词(排除"吗"这种太宽泛的) + elif any(word in current_msg for word in ["什么", "为什么", "怎么", "如何", "哪里", "哪个", "谁", "多少", "几点", "何时"]): + is_question = True if is_question and next_msg: - # 检查下一条消息是否像答案(不是另一个问句) + # 检查下一条消息是否像答案(不是另一个问句,也不是寒暄) is_answer = not (next_msg.endswith("?") or next_msg.endswith("?")) + # 排除寒暄类回复 + greeting_answers = {"你好", "您好", "在呢", "在的", "嗯", "哦", "好的"} + if next_msg in greeting_answers: + is_answer = False + if is_answer: qa_pairs.append(QAPair( question_idx=i, @@ -533,10 +554,9 @@ class SemanticPruner: """数据集层面:全局消息级剪枝,保留所有对话。 改进版: - - 并发处理对话级相关性判断 - - 问答对识别和保护 - - 优化删除策略,保持上下文连贯性 - - 仅在"不相关对话"的范围内执行消息剪枝;相关对话不动 + - 消息级独立判断,每条消息根据场景规则独立评估 + - 问答对保护已注释(暂不启用,留作观察) + - 优化删除策略:填充消息 → 不重要消息 → 低分重要消息 - 只删除"不重要的不相关消息",重要信息(时间、编号等)强制保留 - 保证每段对话至少保留1条消息,不会删除整段对话 """ @@ -553,209 +573,122 @@ class SemanticPruner: proportion = 0.0 self._log( - f"[剪枝-数据集] 对话总数={len(dialogs)} 场景={self.config.pruning_scene} 删除比例={proportion} 开关={self.config.pruning_switch}" + f"[剪枝-数据集] 对话总数={len(dialogs)} 场景={self.config.pruning_scene} 删除比例={proportion} 开关={self.config.pruning_switch} 模式=消息级独立判断" ) - # 并发处理对话级相关性分类 - semaphore = asyncio.Semaphore(self.max_concurrent) - - async def classify_dialog(idx: int, dd: DialogData): - async with semaphore: - try: - ex = await self._extract_dialog_important(dd.content) - return { - "dialog": dd, - "is_related": bool(ex.is_related), - "index": idx, - "extraction": ex - } - except Exception as e: - self._log(f"[剪枝-并发] 对话 {idx} 分类失败: {str(e)[:100]}") - return { - "dialog": dd, - "is_related": True, # 失败时标记为相关,避免误删 - "index": idx, - "extraction": None - } - - # 并发执行所有对话的分类 - tasks = [classify_dialog(idx, dd) for idx, dd in enumerate(dialogs)] - evaluated_dialogs = await asyncio.gather(*tasks) - - # 统计相关 / 不相关对话 - not_related_dialogs = [d for d in evaluated_dialogs if not d["is_related"]] - related_dialogs = [d for d in evaluated_dialogs if d["is_related"]] - self._log( - f"[剪枝-数据集] 相关对话数={len(related_dialogs)} 不相关对话数={len(not_related_dialogs)}" - ) - - # 简洁打印第几段对话相关/不相关(索引基于1) - def _fmt_indices(items, cap: int = 10): - inds = [i["index"] + 1 for i in items] - if len(inds) <= cap: - return inds - return inds[:cap] + ["...", f"共{len(inds)}个"] - - rel_inds = _fmt_indices(related_dialogs) - nrel_inds = _fmt_indices(not_related_dialogs) - self._log(f"[剪枝-数据集] 相关对话:第{rel_inds}段;不相关对话:第{nrel_inds}段") - result: List[DialogData] = [] - if not_related_dialogs: - # 为每个不相关对话进行分析 - per_dialog_info = {} - total_unrelated = 0 + total_original_msgs = 0 + total_deleted_msgs = 0 + + for d_idx, dd in enumerate(dialogs): + msgs = dd.context.msgs + original_count = len(msgs) + total_original_msgs += original_count - for d in not_related_dialogs: - dd = d["dialog"] - extraction = d.get("extraction") - if extraction is None: - extraction = await self._extract_dialog_important(dd.content) - - # 合并所有重要标记 - tokens = extraction.times + extraction.ids + extraction.amounts + extraction.contacts + extraction.addresses + extraction.keywords - msgs = dd.context.msgs - - # 识别问答对 - qa_pairs = self._identify_qa_pairs(msgs) - protected_indices = self._get_protected_indices(msgs, qa_pairs, window_size=1) - - # 分类消息(考虑问答对保护) - imp_unrel_msgs = [] - unimp_unrel_msgs = [] - - for idx, m in enumerate(msgs): - # 问答对中的消息自动标记为重要 - if idx in protected_indices: - imp_unrel_msgs.append((idx, m)) - elif self._msg_matches_tokens(m, tokens) or self._is_important_message(m): - imp_unrel_msgs.append((idx, m)) - elif not self._is_filler_message(m): - unimp_unrel_msgs.append((idx, m)) - # 填充消息不加入任何列表,优先删除 - - # 重要消息按重要性排序 - imp_sorted = sorted(imp_unrel_msgs, key=lambda x: self._importance_score(x[1])) - imp_sorted_ids = [id(m) for _, m in imp_sorted] - - info = { - "dialog": dd, - "total_msgs": len(msgs), - "unrelated_count": len(msgs), - "imp_ids_sorted": imp_sorted_ids, - "unimp_ids": [id(m) for _, m in unimp_unrel_msgs], - "protected_indices": protected_indices, - "qa_pairs_count": len(qa_pairs), - } - per_dialog_info[d["index"]] = info - total_unrelated += info["unrelated_count"] + # ========== 问答对保护(已注释,暂不启用,留作观察) ========== + # qa_pairs = self._identify_qa_pairs(msgs) + # protected_indices = self._get_protected_indices(msgs, qa_pairs, window_size=0) + # ======================================================== - # 全局删除配额计算 - global_delete = int(total_unrelated * proportion) - if proportion > 0 and total_unrelated > 0 and global_delete == 0: - global_delete = 1 + # 消息级分类:每条消息独立判断 + important_msgs = [] # 重要消息(保留) + unimportant_msgs = [] # 不重要消息(可删除) + filler_msgs = [] # 填充消息(优先删除) - # 每段的最大可删容量 - capacities = [] - for d in not_related_dialogs: - idx = d["index"] - info = per_dialog_info[idx] - imp_count = len(info["imp_ids_sorted"]) - unimp_count = len(info["unimp_ids"]) - imp_cap = int(imp_count * proportion) - cap = min(unimp_count + imp_cap, max(0, info["total_msgs"] - 1)) - capacities.append(cap) + for idx, m in enumerate(msgs): + msg_text = m.msg.strip() + + # ========== 问答对保护判断(已注释) ========== + # if idx in protected_indices: + # important_msgs.append((idx, m)) + # self._log(f" [{idx}] '{msg_text[:30]}...' → 重要(问答对保护)") + # ========================================== + + # 填充消息(寒暄、表情等) + if self._is_filler_message(m): + filler_msgs.append((idx, m)) + self._log(f" [{idx}] '{msg_text[:30]}...' → 填充") + # 重要信息(学号、成绩、时间、金额等) + elif self._is_important_message(m): + important_msgs.append((idx, m)) + self._log(f" [{idx}] '{msg_text[:30]}...' → 重要(场景规则)") + # 其他消息 + else: + unimportant_msgs.append((idx, m)) + self._log(f" [{idx}] '{msg_text[:30]}...' → 不重要") - total_capacity = sum(capacities) - if global_delete > total_capacity: - self._log(f"[剪枝-数据集] 不相关消息总数={total_unrelated},目标删除={global_delete},最大可删={total_capacity}。将按最大可删执行。") - global_delete = total_capacity - - # 配额分配 - alloc = [] - for i, d in enumerate(not_related_dialogs): - idx = d["index"] - info = per_dialog_info[idx] - share = int(global_delete * (info["unrelated_count"] / total_unrelated)) if total_unrelated > 0 else 0 - alloc.append(min(share, capacities[i])) + # 计算删除配额 + delete_target = int(original_count * proportion) + if proportion > 0 and original_count > 0 and delete_target == 0: + delete_target = 1 - allocated = sum(alloc) - rem = global_delete - allocated - turn = 0 - while rem > 0 and turn < 100000: - progressed = False - for i in range(len(not_related_dialogs)): - if rem <= 0: - break - if alloc[i] < capacities[i]: - alloc[i] += 1 - rem -= 1 - progressed = True - if not progressed: - break - turn += 1 - - # 应用删除 - total_deleted_confirm = 0 - for d in evaluated_dialogs: - dd = d["dialog"] - msgs = dd.context.msgs - original = len(msgs) - - if d["is_related"]: - result.append(dd) - continue - - idx_in_unrel = next((k for k, x in enumerate(not_related_dialogs) if x["index"] == d["index"]), None) - if idx_in_unrel is None: - result.append(dd) - continue - - quota = alloc[idx_in_unrel] - info = per_dialog_info[d["index"]] - - # 计算删除ID - imp_count = len(info["imp_ids_sorted"]) - imp_del_cap = int(imp_count * proportion) - - unimp_delete_ids = set(info["unimp_ids"][:min(quota, len(info["unimp_ids"]))]) - del_unimp = min(quota, len(unimp_delete_ids)) - rem_quota = quota - del_unimp - - imp_delete_ids = set(info["imp_ids_sorted"][:min(rem_quota, imp_del_cap)]) - - deleted_here = 0 - actual_unimp_deleted = 0 - actual_imp_deleted = 0 - kept = [] - - for m in msgs: - mid = id(m) - if mid in unimp_delete_ids and actual_unimp_deleted < del_unimp: - actual_unimp_deleted += 1 - deleted_here += 1 - continue - if mid in imp_delete_ids and actual_imp_deleted < len(imp_delete_ids): - actual_imp_deleted += 1 - deleted_here += 1 - continue - kept.append(m) - - if not kept and msgs: - kept = [msgs[0]] - - dd.context.msgs = kept - total_deleted_confirm += deleted_here - - qa_info = f",问答对={info['qa_pairs_count']}" if info['qa_pairs_count'] > 0 else "" - self._log( - f"[剪枝-对话] 对话 {d['index']+1} 总消息={original} 分配删除={quota} 实删={deleted_here} 保留={len(kept)}{qa_info}" - ) - result.append(dd) + # 确保至少保留1条消息 + max_deletable = max(0, original_count - 1) + delete_target = min(delete_target, max_deletable) - self._log(f"[剪枝-数据集] 全局消息级剪枝完成,总删除 {total_deleted_confirm} 条(保护问答对和上下文)。") - else: - result = [d["dialog"] for d in evaluated_dialogs] + # 删除策略:优先删除填充消息,再删除不重要消息 + to_delete_indices = set() + deleted_details = [] # 记录删除的消息详情 + + # 第一步:删除填充消息 + filler_to_delete = min(len(filler_msgs), delete_target) + for i in range(filler_to_delete): + idx, msg = filler_msgs[i] + to_delete_indices.add(idx) + deleted_details.append(f"[{idx}] 填充: '{msg.msg[:50]}'") + + # 第二步:如果还需要删除,删除不重要消息 + remaining_quota = delete_target - len(to_delete_indices) + if remaining_quota > 0: + unimp_to_delete = min(len(unimportant_msgs), remaining_quota) + for i in range(unimp_to_delete): + idx, msg = unimportant_msgs[i] + to_delete_indices.add(idx) + deleted_details.append(f"[{idx}] 不重要: '{msg.msg[:50]}'") + + # 第三步:如果还需要删除,按重要性分数删除重要消息 + remaining_quota = delete_target - len(to_delete_indices) + if remaining_quota > 0 and important_msgs: + # 按重要性分数排序(分数低的优先删除) + imp_sorted = sorted(important_msgs, key=lambda x: self._importance_score(x[1])) + imp_to_delete = min(len(imp_sorted), remaining_quota) + for i in range(imp_to_delete): + idx, msg = imp_sorted[i] + to_delete_indices.add(idx) + score = self._importance_score(msg) + deleted_details.append(f"[{idx}] 重要(分数{score}): '{msg.msg[:50]}'") + + # 执行删除 + kept_msgs = [] + for idx, m in enumerate(msgs): + if idx not in to_delete_indices: + kept_msgs.append(m) + + # 确保至少保留1条 + if not kept_msgs and msgs: + kept_msgs = [msgs[0]] + + dd.context.msgs = kept_msgs + deleted_count = original_count - len(kept_msgs) + total_deleted_msgs += deleted_count + + # 输出删除详情 + if deleted_details: + self._log(f"[剪枝-删除详情] 对话 {d_idx+1} 删除了以下消息:") + for detail in deleted_details: + self._log(f" {detail}") + + # ========== 问答对统计(已注释) ========== + # qa_info = f",问答对={len(qa_pairs)}" if qa_pairs else "" + # ======================================== + + self._log( + f"[剪枝-对话] 对话 {d_idx+1} 总消息={original_count} " + f"(重要={len(important_msgs)} 不重要={len(unimportant_msgs)} 填充={len(filler_msgs)}) " + f"删除={deleted_count} 保留={len(kept_msgs)}" + ) + + result.append(dd) self._log(f"[剪枝-数据集] 剩余对话数={len(result)}") diff --git a/api/app/core/memory/storage_services/extraction_engine/data_preprocessing/scene_config.py b/api/app/core/memory/storage_services/extraction_engine/data_preprocessing/scene_config.py new file mode 100644 index 00000000..ed9592af --- /dev/null +++ b/api/app/core/memory/storage_services/extraction_engine/data_preprocessing/scene_config.py @@ -0,0 +1,326 @@ +""" +场景特定配置 - 为不同场景提供定制化的剪枝规则 + +功能: +- 场景特定的重要信息识别模式 +- 场景特定的重要性评分权重 +- 场景特定的填充词库 +- 场景特定的问答对识别规则 +""" + +from typing import Dict, List, Set, Tuple +from dataclasses import dataclass, field + + +@dataclass +class ScenePatterns: + """场景特定的识别模式""" + + # 重要信息的正则模式(优先级从高到低) + high_priority_patterns: List[Tuple[str, int]] = field(default_factory=list) # (pattern, weight) + medium_priority_patterns: List[Tuple[str, int]] = field(default_factory=list) + low_priority_patterns: List[Tuple[str, int]] = field(default_factory=list) + + # 填充词库(无意义对话) + filler_phrases: Set[str] = field(default_factory=set) + + # 问句关键词(用于识别问答对) + question_keywords: Set[str] = field(default_factory=set) + + # 决策性/承诺性关键词 + decision_keywords: Set[str] = field(default_factory=set) + + +class SceneConfigRegistry: + """场景配置注册表 - 管理所有场景的特定配置""" + + # 基础通用模式(所有场景共享) + BASE_HIGH_PRIORITY = [ + (r"订单号|工单|申请号|编号|ID|账号|账户", 5), + (r"金额|费用|价格|¥|¥|\d+元", 5), + (r"\d{11}", 4), # 手机号 + (r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", 4), # 邮箱 + ] + + BASE_MEDIUM_PRIORITY = [ + (r"\d{4}-\d{1,2}-\d{1,2}", 3), # 日期 + (r"\d{4}年\d{1,2}月\d{1,2}日", 3), + (r"电话|手机号|微信|QQ|联系方式", 3), + (r"地址|地点|位置", 2), + (r"时间|日期|有效期|截止", 2), + (r"今天|明天|后天|昨天|前天", 3), # 相对时间(提高权重) + (r"下周|下月|下年|上周|上月|上年|本周|本月|本年", 3), + (r"今年|去年|明年", 3), + ] + + BASE_LOW_PRIORITY = [ + (r"\d{1,2}:\d{2}", 2), # 时间点 HH:MM + (r"\d{1,2}点\d{0,2}分?", 2), # 时间点 X点Y分 或 X点 + (r"上午|下午|中午|晚上|早上|傍晚|凌晨", 2), # 时段(提高权重并扩充) + (r"AM|PM|am|pm", 1), + ] + + BASE_FILLERS = { + # 基础寒暄 + "你好", "您好", "在吗", "在的", "在呢", "嗯", "嗯嗯", "哦", "哦哦", + "好的", "好", "行", "可以", "不可以", "谢谢", "多谢", "感谢", + "拜拜", "再见", "88", "拜", "回见", + # 口头禅 + "哈哈", "呵呵", "哈哈哈", "嘿嘿", "嘻嘻", "hiahia", + "额", "呃", "啊", "诶", "唉", "哎", "嗯哼", + # 确认词 + "是的", "对", "对的", "没错", "嗯嗯", "好嘞", "收到", "明白", "了解", "知道了", + # 标点和符号 + "。。。", "...", "???", "???", "!!!", "!!!", + # 表情符号 + "[微笑]", "[呲牙]", "[发呆]", "[得意]", "[流泪]", "[害羞]", "[闭嘴]", + "[睡]", "[大哭]", "[尴尬]", "[发怒]", "[调皮]", "[龇牙]", "[惊讶]", + "[难过]", "[酷]", "[冷汗]", "[抓狂]", "[吐]", "[偷笑]", "[可爱]", + "[白眼]", "[傲慢]", "[饥饿]", "[困]", "[惊恐]", "[流汗]", "[憨笑]", + # 网络用语 + "hhh", "hhhh", "2333", "666", "gg", "ok", "OK", "okok", + "emmm", "emm", "em", "mmp", "wtf", "omg", + } + + BASE_QUESTION_KEYWORDS = { + "什么", "为什么", "怎么", "如何", "哪里", "哪个", "谁", "多少", "几点", "何时", "吗" + } + + BASE_DECISION_KEYWORDS = { + "必须", "一定", "务必", "需要", "要求", "规定", "应该", + "承诺", "保证", "确保", "负责", "同意", "答应" + } + + @classmethod + def get_education_config(cls) -> ScenePatterns: + """教育场景配置""" + return ScenePatterns( + high_priority_patterns=cls.BASE_HIGH_PRIORITY + [ + # 成绩相关(最高优先级) + (r"成绩|分数|得分|满分|及格|不及格", 6), + (r"GPA|绩点|学分|平均分", 6), + (r"\d+分|\d+\.?\d*分", 5), # 具体分数 + (r"排名|名次|第.{1,3}名", 5), # 支持"第三名"、"第1名"等 + + # 学籍信息 + (r"学号|学生证|教师工号|工号", 5), + (r"班级|年级|专业|院系", 4), + + # 课程相关 + (r"课程|科目|学科|必修|选修", 4), + (r"教材|课本|教科书|参考书", 4), + (r"章节|第.{1,3}章|第.{1,3}节", 3), # 支持"第三章"、"第1章"等 + + # 学科内容(新增) + (r"微积分|导数|积分|函数|极限|微分", 4), + (r"代数|几何|三角|概率|统计", 4), + (r"物理|化学|生物|历史|地理", 4), + (r"英语|语文|数学|政治|哲学", 4), + (r"定义|定理|公式|概念|原理|法则", 3), + (r"例题|解题|证明|推导|计算", 3), + ], + medium_priority_patterns=cls.BASE_MEDIUM_PRIORITY + [ + # 教学活动 + (r"作业|练习|习题|题目", 3), + (r"考试|测验|测试|考核|期中|期末", 3), + (r"上课|下课|课堂|讲课", 2), + (r"提问|回答|发言|讨论", 2), + (r"问一下|请教|咨询|询问", 2), # 新增:问询相关 + (r"理解|明白|懂|掌握|学会", 2), # 新增:学习状态 + + # 时间安排 + (r"课表|课程表|时间表", 3), + (r"第.{1,3}节课|第.{1,3}周", 2), # 支持"第三节课"、"第1周"等 + ], + low_priority_patterns=cls.BASE_LOW_PRIORITY + [ + (r"老师|教师|同学|学生", 1), + (r"教室|实验室|图书馆", 1), + ], + filler_phrases=cls.BASE_FILLERS | { + # 教育场景特有填充词(移除了"明白了"、"懂了"、"不懂"等,这些在教育场景中有意义) + "老师好", "同学们好", "上课", "下课", "起立", "坐下", + "举手", "请坐", "很好", "不错", "继续", + "下一个", "下一题", "下一位", "还有吗", "还有问题吗", + }, + question_keywords=cls.BASE_QUESTION_KEYWORDS | { + "为啥", "咋", "咋办", "怎样", "如何做", + "能不能", "可不可以", "行不行", "对不对", "是不是", + }, + decision_keywords=cls.BASE_DECISION_KEYWORDS | { + "必考", "重点", "考点", "难点", "关键", + "记住", "背诵", "掌握", "理解", "复习", + } + ) + + @classmethod + def get_online_service_config(cls) -> ScenePatterns: + """在线服务场景配置""" + return ScenePatterns( + high_priority_patterns=cls.BASE_HIGH_PRIORITY + [ + # 工单相关(最高优先级) + (r"工单号|工单编号|ticket|TK\d+", 6), + (r"工单状态|处理中|已解决|已关闭|待处理", 5), + (r"优先级|紧急|高优先级|P0|P1|P2", 5), + + # 产品信息 + (r"产品型号|型号|SKU|产品编号", 5), + (r"序列号|SN|设备号", 5), + (r"版本号|软件版本|固件版本", 4), + + # 问题描述 + (r"故障|错误|异常|bug|问题", 4), + (r"错误代码|故障代码|error code", 5), + (r"无法|不能|失败|报错", 3), + ], + medium_priority_patterns=cls.BASE_MEDIUM_PRIORITY + [ + # 服务相关 + (r"退款|退货|换货|补发", 4), + (r"发票|收据|凭证", 3), + (r"物流|快递|运单号", 3), + (r"保修|质保|售后", 3), + + # 时效相关 + (r"SLA|响应时间|处理时长", 4), + (r"超时|延迟|等待", 2), + ], + low_priority_patterns=cls.BASE_LOW_PRIORITY + [ + (r"客服|工程师|技术支持", 1), + (r"用户|客户|会员", 1), + ], + filler_phrases=cls.BASE_FILLERS | { + # 在线服务特有填充词 + "您好", "请问", "请稍等", "稍等", "马上", "立即", + "正在查询", "正在处理", "正在为您", "帮您查一下", + "还有其他问题吗", "还需要什么帮助", "很高兴为您服务", + "感谢您的耐心等待", "抱歉让您久等了", + "已记录", "已反馈", "已转接", "已升级", + "祝您生活愉快", "再见", "欢迎下次咨询", + }, + question_keywords=cls.BASE_QUESTION_KEYWORDS | { + "能否", "可否", "是否", "有没有", "能不能", + "怎么办", "如何处理", "怎么解决", + }, + decision_keywords=cls.BASE_DECISION_KEYWORDS | { + "立即处理", "马上解决", "尽快", "优先", + "升级", "转接", "派单", "跟进", + "补偿", "赔偿", "退款", "换货", + } + ) + + @classmethod + def get_outbound_config(cls) -> ScenePatterns: + """外呼场景配置""" + return ScenePatterns( + high_priority_patterns=cls.BASE_HIGH_PRIORITY + [ + # 意向相关(最高优先级) + (r"意向|意愿|兴趣|感兴趣", 6), + (r"A类|B类|C类|D类|高意向|低意向", 6), + (r"成交|签约|下单|购买|确认", 6), + + # 联系信息(外呼场景中更重要) + (r"预约|约定|安排|确定时间", 5), + (r"下次联系|回访|跟进", 5), + (r"方便|有空|可以|时间", 4), + + # 通话状态 + (r"接通|未接通|占线|关机|停机", 4), + (r"通话时长|通话时间", 3), + ], + medium_priority_patterns=cls.BASE_MEDIUM_PRIORITY + [ + # 客户信息 + (r"姓名|称呼|先生|女士", 3), + (r"公司|单位|职位|职务", 3), + (r"需求|要求|期望", 3), + + # 跟进状态 + (r"跟进状态|进展|进度", 3), + (r"已联系|待联系|联系中", 2), + (r"拒绝|不感兴趣|考虑|再说", 3), + ], + low_priority_patterns=cls.BASE_LOW_PRIORITY + [ + (r"销售|客户经理|业务员", 1), + (r"产品|服务|方案", 1), + ], + filler_phrases=cls.BASE_FILLERS | { + # 外呼场景特有填充词 + "您好", "喂", "hello", "打扰了", "不好意思", + "方便接电话吗", "现在方便吗", "占用您一点时间", + "我是", "我们是", "我们公司", "我们这边", + "了解一下", "介绍一下", "简单说一下", + "考虑考虑", "想一想", "再说", "再看看", + "不需要", "不感兴趣", "没兴趣", "不用了", + "好的", "行", "可以", "没问题", "那就这样", + "再联系", "回头聊", "有需要再说", + }, + question_keywords=cls.BASE_QUESTION_KEYWORDS | { + "有没有", "需不需要", "要不要", "考虑不考虑", + "了解吗", "知道吗", "听说过吗", + "方便吗", "有空吗", "在吗", + }, + decision_keywords=cls.BASE_DECISION_KEYWORDS | { + "确定", "决定", "选择", "购买", "下单", + "预约", "安排", "约定", "确认", + "跟进", "回访", "联系", "沟通", + } + ) + + @classmethod + def get_config(cls, scene: str, fallback_to_generic: bool = True) -> ScenePatterns: + """根据场景名称获取配置 + + Args: + scene: 场景名称 ('education', 'online_service', 'outbound' 或其他) + fallback_to_generic: 如果场景不存在,是否降级到通用配置 + + Returns: + 对应场景的配置,如果场景不存在: + - fallback_to_generic=True: 返回通用配置(仅基础规则) + - fallback_to_generic=False: 抛出异常 + """ + scene_map = { + 'education': cls.get_education_config, + 'online_service': cls.get_online_service_config, + 'outbound': cls.get_outbound_config, + } + + if scene in scene_map: + return scene_map[scene]() + + if fallback_to_generic: + # 返回通用配置(仅包含基础规则,不包含场景特定规则) + return cls.get_generic_config() + else: + raise ValueError(f"不支持的场景: {scene},支持的场景: {list(scene_map.keys())}") + + @classmethod + def get_generic_config(cls) -> ScenePatterns: + """通用场景配置 - 仅包含基础规则,适用于未定义的场景 + + 这是一个保守的配置,只使用最通用的规则,避免误删重要信息 + """ + return ScenePatterns( + high_priority_patterns=cls.BASE_HIGH_PRIORITY, + medium_priority_patterns=cls.BASE_MEDIUM_PRIORITY, + low_priority_patterns=cls.BASE_LOW_PRIORITY, + filler_phrases=cls.BASE_FILLERS, + question_keywords=cls.BASE_QUESTION_KEYWORDS, + decision_keywords=cls.BASE_DECISION_KEYWORDS + ) + + @classmethod + def get_all_scenes(cls) -> List[str]: + """获取所有预定义场景的列表""" + return ['education', 'online_service', 'outbound'] + + @classmethod + def is_scene_supported(cls, scene: str) -> bool: + """检查场景是否有专门的配置支持 + + Args: + scene: 场景名称 + + Returns: + True: 有专门配置 + False: 将使用通用配置 + """ + return scene in cls.get_all_scenes() diff --git a/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py b/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py index a47497da..17bda0e4 100644 --- a/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py +++ b/api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py @@ -1988,6 +1988,7 @@ async def get_chunked_dialogs_with_preprocessing( input_data_path: Optional[str] = None, llm_client: Optional[Any] = None, skip_cleaning: bool = True, + pruning_config: Optional[Dict] = None, ) -> List[DialogData]: """包含数据预处理步骤的完整分块流程 @@ -2000,6 +2001,7 @@ async def get_chunked_dialogs_with_preprocessing( input_data_path: 输入数据路径 llm_client: LLM 客户端 skip_cleaning: 是否跳过数据清洗步骤(默认False) + pruning_config: 剪枝配置字典,包含 pruning_switch, pruning_scene, pruning_threshold Returns: 带 chunks 的 DialogData 列表 @@ -2030,7 +2032,19 @@ async def get_chunked_dialogs_with_preprocessing( from app.core.memory.storage_services.extraction_engine.data_preprocessing.data_pruning import ( SemanticPruner, ) - pruner = SemanticPruner(llm_client=llm_client) + from app.core.memory.models.config_models import PruningConfig + + # 构建剪枝配置 + if pruning_config: + # 使用传入的配置 + config = PruningConfig(**pruning_config) + print(f"[剪枝] 使用传入配置: switch={config.pruning_switch}, scene={config.pruning_scene}, threshold={config.pruning_threshold}") + else: + # 使用默认配置(关闭剪枝) + config = None + print("[剪枝] 未提供配置,使用默认配置(剪枝关闭)") + + pruner = SemanticPruner(config=config, llm_client=llm_client) # 记录单对话场景下剪枝前的消息数量 single_dialog_original_msgs = None