[add]The semantic pruning function is activated, removing the protection of question-answer pairs.

This commit is contained in:
lanceyq
2026-02-28 17:18:42 +08:00
parent f7aed9dd98
commit 3a0671c661
4 changed files with 619 additions and 291 deletions

View File

@@ -21,7 +21,7 @@ async def get_chunked_dialogs(
end_user_id: Group identifier
messages: Structured message list [{"role": "user", "content": "..."}, ...]
ref_id: Reference identifier
config_id: Configuration ID for processing
config_id: Configuration ID for processing (used to load pruning config)
Returns:
List of DialogData objects with generated chunks
@@ -57,6 +57,61 @@ async def get_chunked_dialogs(
end_user_id=end_user_id,
config_id=config_id
)
# 语义剪枝步骤(在分块之前)
try:
from app.core.memory.storage_services.extraction_engine.data_preprocessing.data_pruning import SemanticPruner
from app.core.memory.models.config_models import PruningConfig
from app.db import get_db_context
from app.services.memory_config_service import MemoryConfigService
from app.core.memory.utils.llm.llm_utils import MemoryClientFactory
# 加载剪枝配置
pruning_config = None
if config_id:
try:
with get_db_context() as db:
# 使用 MemoryConfigService 加载完整的 MemoryConfig 对象
config_service = MemoryConfigService(db)
memory_config = config_service.load_memory_config(
config_id=config_id,
service_name="semantic_pruning"
)
if memory_config:
pruning_config = PruningConfig(
pruning_switch=memory_config.pruning_enabled,
pruning_scene=memory_config.pruning_scene or "education",
pruning_threshold=memory_config.pruning_threshold
)
logger.info(f"[剪枝] 加载配置: switch={pruning_config.pruning_switch}, scene={pruning_config.pruning_scene}, threshold={pruning_config.pruning_threshold}")
# 获取LLM客户端用于剪枝
if pruning_config.pruning_switch:
factory = MemoryClientFactory(db)
llm_client = factory.get_llm_client_from_config(memory_config)
# 执行剪枝 - 使用 prune_dataset 支持消息级剪枝
pruner = SemanticPruner(config=pruning_config, llm_client=llm_client)
original_msg_count = len(dialog_data.context.msgs)
# 使用 prune_dataset 而不是 prune_dialog
# prune_dataset 会进行消息级剪枝,即使对话整体相关也会删除不重要消息
pruned_dialogs = await pruner.prune_dataset([dialog_data])
if pruned_dialogs:
dialog_data = pruned_dialogs[0]
remaining_msg_count = len(dialog_data.context.msgs)
deleted_count = original_msg_count - remaining_msg_count
logger.info(f"[剪枝] 完成: 原始{original_msg_count}条 -> 保留{remaining_msg_count}条 (删除{deleted_count}条)")
else:
logger.warning("[剪枝] prune_dataset 返回空列表")
else:
logger.info("[剪枝] 配置中剪枝开关关闭,跳过剪枝")
except Exception as e:
logger.warning(f"[剪枝] 加载配置失败,跳过剪枝: {e}", exc_info=True)
except Exception as e:
logger.warning(f"[剪枝] 执行失败,跳过剪枝: {e}", exc_info=True)
chunker = DialogueChunker(chunker_strategy)
extracted_chunks = await chunker.process_dialogue(dialog_data)

View File

@@ -22,6 +22,10 @@ from app.core.memory.models.message_models import DialogData, ConversationMessag
from app.core.memory.models.config_models import PruningConfig
from app.core.memory.utils.config.config_utils import get_pruning_config
from app.core.memory.utils.prompt.prompt_utils import prompt_env, log_prompt_rendering, log_template_rendering
from app.core.memory.storage_services.extraction_engine.data_preprocessing.scene_config import (
SceneConfigRegistry,
ScenePatterns
)
class DialogExtractionResponse(BaseModel):
@@ -78,6 +82,20 @@ class SemanticPruner:
self.language = language # 保存语言配置
self.max_concurrent = max_concurrent # 新增:最大并发数
# 加载场景特定配置
self.scene_config: ScenePatterns = SceneConfigRegistry.get_config(
self.config.pruning_scene,
fallback_to_generic=True
)
# 检查场景是否有专门支持
is_supported = SceneConfigRegistry.is_scene_supported(self.config.pruning_scene)
if is_supported:
self._log(f"[剪枝-初始化] 场景={self.config.pruning_scene} 使用专门配置")
else:
self._log(f"[剪枝-初始化] 场景={self.config.pruning_scene} 未预定义,使用通用配置(保守策略)")
self._log(f"[剪枝-初始化] 支持的场景: {SceneConfigRegistry.get_all_scenes()}")
# Load Jinja2 template
self.template = prompt_env.get_template("extracat_Pruning.jinja2")
@@ -87,108 +105,80 @@ class SemanticPruner:
# 运行日志:收集关键终端输出,便于写入 JSON
self.run_logs: List[str] = []
# 扩展的填充词库(包含表情符号和网络用语)
self._extended_fillers = [
# 基础寒暄
"你好", "您好", "在吗", "在的", "在呢", "", "嗯嗯", "", "哦哦",
"好的", "", "", "可以", "不可以", "谢谢", "多谢", "感谢",
"拜拜", "再见", "88", "", "回见",
# 口头禅
"哈哈", "呵呵", "哈哈哈", "嘿嘿", "嘻嘻", "hiahia",
"", "", "", "", "", "", "嗯哼",
# 确认词
"是的", "", "对的", "没错", "嗯嗯", "好嘞", "收到", "明白", "了解", "知道了",
# 标点和符号
"。。。", "...", "???", "", "!!!", "",
# 表情符号(文本形式)
"[微笑]", "[呲牙]", "[发呆]", "[得意]", "[流泪]", "[害羞]", "[闭嘴]",
"[睡]", "[大哭]", "[尴尬]", "[发怒]", "[调皮]", "[龇牙]", "[惊讶]",
"[难过]", "[酷]", "[冷汗]", "[抓狂]", "[吐]", "[偷笑]", "[可爱]",
"[白眼]", "[傲慢]", "[饥饿]", "[困]", "[惊恐]", "[流汗]", "[憨笑]",
# 网络用语
"hhh", "hhhh", "2333", "666", "gg", "ok", "OK", "okok",
"emmm", "emm", "em", "mmp", "wtf", "omg",
]
def _is_important_message(self, message: ConversationMessage) -> bool:
"""基于启发式规则识别重要信息消息,优先保留。
改进版:增强了规则覆盖范围,包括:
- 含日期/时间如YYYY-MM-DD、HH:MM、2024年11月10日、上午/下午)
- 含编号/ID/订单号/申请号/账号/电话/金额等关键字段
- 关键词:"时间""日期""编号""订单""流水""金额""""""电话""手机号""邮箱""地址"
- 新增:问句识别、决策性语句、承诺性语句
改进版:使用场景特定的模式进行识别
- 根据 pruning_scene 动态加载对应的识别规则
- 支持教育、在线服务、外呼三个场景的特定模式
"""
text = message.msg.strip()
if not text:
return False
patterns = [
# 原有模式
r"\d{4}-\d{1,2}-\d{1,2}", # 修复:移除 \b 边界,因为中文前后没有单词边界
r"\d{1,2}:\d{2}", # 修复:移除 \b
r"\d{4}\d{1,2}月\d{1,2}日",
r"上午|下午|AM|PM|今天|明天|后天|昨天|前天|本周|下周|上周|本月|下月|上月",
r"订单号|工单|申请号|编号|ID|账号|账户|流水号|单号",
r"电话|手机号|微信|QQ|邮箱|联系方式",
r"地址|地点|位置|门牌号",
r"金额|费用|价格|¥|¥|\d+元|人民币|美元|欧元",
r"时间|日期|有效期|截止|期限|到期",
# 新增模式
r"什么|为什么|怎么|如何|哪里|哪个|谁|多少|几点|何时", # 问句关键词
r"必须|一定|务必|需要|要求|规定|应该", # 决策性语句
r"承诺|保证|确保|负责|同意|答应", # 承诺性语句
r"\d{11}", # 11位手机号
r"\d{3,4}-\d{7,8}", # 固定电话
r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", # 邮箱
]
# 使用场景特定的模式
all_patterns = (
self.scene_config.high_priority_patterns +
self.scene_config.medium_priority_patterns +
self.scene_config.low_priority_patterns
)
for p in patterns:
if re.search(p, text, flags=re.IGNORECASE):
for pattern, _ in all_patterns:
if re.search(pattern, text, flags=re.IGNORECASE):
return True
# 检查是否为问句(以问号结尾或包含疑问词)
if text.endswith("") or text.endswith("?"):
return True
# 检查是否包含问句关键词
if any(keyword in text for keyword in self.scene_config.question_keywords):
return True
# 检查是否包含决策性关键词
if any(keyword in text for keyword in self.scene_config.decision_keywords):
return True
return False
def _importance_score(self, message: ConversationMessage) -> int:
"""为重要消息打分,用于在保留比例内优先保留更关键的内容。
改进版:更细致的评分体系0-10分
改进版:使用场景特定的权重体系0-10分
- 根据场景动态调整不同信息类型的权重
- 高优先级模式4-6分
- 中优先级模式2-3分
- 低优先级模式1分
"""
text = message.msg.strip()
score = 0
weights = [
# 高优先级4-5分
(r"订单号|工单|申请号|编号|ID|账号|账户", 5),
(r"金额|费用|价格|¥|¥|\d+元", 5),
(r"\d{11}", 4), # 手机号
(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", 4), # 邮箱
# 中优先级2-3分
(r"\d{4}-\d{1,2}-\d{1,2}", 3), # 修复:移除 \b
(r"\d{4}\d{1,2}月\d{1,2}日", 3),
(r"电话|手机号|微信|QQ|联系方式", 3),
(r"地址|地点|位置", 2),
(r"时间|日期|有效期|截止|明天|后天|下周|下月", 2), # 新增时间相关词
# 低优先级1分
(r"\d{1,2}:\d{2}", 1), # 修复:移除 \b
(r"上午|下午|AM|PM", 1),
]
# 使用场景特定的权重
for pattern, weight in self.scene_config.high_priority_patterns:
if re.search(pattern, text, flags=re.IGNORECASE):
score += weight
for p, w in weights:
if re.search(p, text, flags=re.IGNORECASE):
score += w
for pattern, weight in self.scene_config.medium_priority_patterns:
if re.search(pattern, text, flags=re.IGNORECASE):
score += weight
for pattern, weight in self.scene_config.low_priority_patterns:
if re.search(pattern, text, flags=re.IGNORECASE):
score += weight
# 问句加分
if text.endswith("") or text.endswith("?"):
score += 2
# 包含问句关键词加分
if any(keyword in text for keyword in self.scene_config.question_keywords):
score += 1
# 包含决策性关键词加分
if any(keyword in text for keyword in self.scene_config.decision_keywords):
score += 2
# 长度加分(较长的消息通常包含更多信息)
if len(text) > 50:
score += 1
@@ -198,20 +188,35 @@ class SemanticPruner:
return min(score, 10) # 最高10分
def _is_filler_message(self, message: ConversationMessage) -> bool:
"""检测典型寒暄/口头禅/确认类短消息用于跳过LLM分类以加速
"""检测典型寒暄/口头禅/确认类短消息。
改进版:扩展了填充词库,支持表情符号和网络用语
改进版:更严格的填充消息判断,避免误删场景相关内容
满足以下之一视为填充消息:
- 纯标点或长度很短(<= 4 个汉字或 <= 8 个字符)且不包含数字或关键实体
- 在扩展填充词库中
- 纯标点或空白
- 在场景特定填充词库中(精确匹配)
- 纯表情符号
- 常见寒暄(精确匹配短语)
注意:不再使用长度判断,避免误删短但重要的消息
"""
t = message.msg.strip()
if not t:
return True
# 检查是否在扩展填充词库中
if t in self._extended_fillers:
# 检查是否在场景特定填充词库中(精确匹配)
if t in self.scene_config.filler_phrases:
return True
# 常见寒暄和问候(精确匹配,避免误删)
common_greetings = {
"在吗", "在不在", "在呢", "在的",
"你好", "您好", "hello", "hi",
"拜拜", "再见", "", "88", "bye",
"好的", "", "", "可以", "", "", "",
"是的", "", "对的", "没错", "是啊",
"哈哈", "呵呵", "嘿嘿", "嗯嗯"
}
if t in common_greetings:
return True
# 检查是否为纯表情符号(方括号包裹)
@@ -232,13 +237,9 @@ class SemanticPruner:
if emoji_pattern.fullmatch(t):
return True
# 长度与字符类型判断
if len(t) <= 8:
# 非数字、无关键实体的短文本
if not re.search(r"[0-9]", t) and not self._is_important_message(message):
# 主要是标点或简单确认词
if re.fullmatch(r"[。!?,.!?…·\s]+", t):
return True
# 纯标点符号
if re.fullmatch(r"[。!?,.!?…·\s]+", t):
return True
return False
@@ -308,6 +309,8 @@ class SemanticPruner:
def _identify_qa_pairs(self, messages: List[ConversationMessage]) -> List[QAPair]:
"""识别对话中的问答对,用于保护问答结构的完整性。
改进版:使用场景特定的问句关键词,并排除寒暄类问句
Args:
messages: 消息列表
@@ -316,21 +319,39 @@ class SemanticPruner:
"""
qa_pairs = []
# 寒暄类问句,不应该被保护(这些不是真正的问答)
greeting_questions = {
"在吗", "在不在", "你好吗", "怎么样", "好吗",
"有空吗", "忙吗", "睡了吗", "起床了吗"
}
for i in range(len(messages) - 1):
current_msg = messages[i].msg.strip()
next_msg = messages[i + 1].msg.strip()
# 简单规则:如果当前消息是问句,下一条消息可能是答案
is_question = (
current_msg.endswith("") or
current_msg.endswith("?") or
any(word in current_msg for word in ["什么", "为什么", "怎么", "如何", "哪里", "哪个", "", "多少", "几点", "何时", ""])
)
# 排除寒暄类问句
if current_msg in greeting_questions:
continue
# 使用场景特定的问句关键词,但要求更严格
is_question = False
# 1. 以问号结尾
if current_msg.endswith("") or current_msg.endswith("?"):
is_question = True
# 2. 包含实质性问句关键词(排除"吗"这种太宽泛的)
elif any(word in current_msg for word in ["什么", "为什么", "怎么", "如何", "哪里", "哪个", "", "多少", "几点", "何时"]):
is_question = True
if is_question and next_msg:
# 检查下一条消息是否像答案(不是另一个问句)
# 检查下一条消息是否像答案(不是另一个问句,也不是寒暄
is_answer = not (next_msg.endswith("") or next_msg.endswith("?"))
# 排除寒暄类回复
greeting_answers = {"你好", "您好", "在呢", "在的", "", "", "好的"}
if next_msg in greeting_answers:
is_answer = False
if is_answer:
qa_pairs.append(QAPair(
question_idx=i,
@@ -533,10 +554,9 @@ class SemanticPruner:
"""数据集层面:全局消息级剪枝,保留所有对话。
改进版:
- 并发处理对话级相关性判断
- 问答对识别和保护
- 优化删除策略,保持上下文连贯性
- 仅在"不相关对话"的范围内执行消息剪枝;相关对话不动
- 消息级独立判断,每条消息根据场景规则独立评估
- 问答对保护已注释(暂不启用,留作观察)
- 优化删除策略:填充消息 → 不重要消息 → 低分重要消息
- 只删除"不重要的不相关消息",重要信息(时间、编号等)强制保留
- 保证每段对话至少保留1条消息不会删除整段对话
"""
@@ -553,209 +573,122 @@ class SemanticPruner:
proportion = 0.0
self._log(
f"[剪枝-数据集] 对话总数={len(dialogs)} 场景={self.config.pruning_scene} 删除比例={proportion} 开关={self.config.pruning_switch}"
f"[剪枝-数据集] 对话总数={len(dialogs)} 场景={self.config.pruning_scene} 删除比例={proportion} 开关={self.config.pruning_switch} 模式=消息级独立判断"
)
# 并发处理对话级相关性分类
semaphore = asyncio.Semaphore(self.max_concurrent)
async def classify_dialog(idx: int, dd: DialogData):
async with semaphore:
try:
ex = await self._extract_dialog_important(dd.content)
return {
"dialog": dd,
"is_related": bool(ex.is_related),
"index": idx,
"extraction": ex
}
except Exception as e:
self._log(f"[剪枝-并发] 对话 {idx} 分类失败: {str(e)[:100]}")
return {
"dialog": dd,
"is_related": True, # 失败时标记为相关,避免误删
"index": idx,
"extraction": None
}
# 并发执行所有对话的分类
tasks = [classify_dialog(idx, dd) for idx, dd in enumerate(dialogs)]
evaluated_dialogs = await asyncio.gather(*tasks)
# 统计相关 / 不相关对话
not_related_dialogs = [d for d in evaluated_dialogs if not d["is_related"]]
related_dialogs = [d for d in evaluated_dialogs if d["is_related"]]
self._log(
f"[剪枝-数据集] 相关对话数={len(related_dialogs)} 不相关对话数={len(not_related_dialogs)}"
)
# 简洁打印第几段对话相关/不相关索引基于1
def _fmt_indices(items, cap: int = 10):
inds = [i["index"] + 1 for i in items]
if len(inds) <= cap:
return inds
return inds[:cap] + ["...", f"{len(inds)}"]
rel_inds = _fmt_indices(related_dialogs)
nrel_inds = _fmt_indices(not_related_dialogs)
self._log(f"[剪枝-数据集] 相关对话:第{rel_inds}段;不相关对话:第{nrel_inds}")
result: List[DialogData] = []
if not_related_dialogs:
# 为每个不相关对话进行分析
per_dialog_info = {}
total_unrelated = 0
total_original_msgs = 0
total_deleted_msgs = 0
for d_idx, dd in enumerate(dialogs):
msgs = dd.context.msgs
original_count = len(msgs)
total_original_msgs += original_count
for d in not_related_dialogs:
dd = d["dialog"]
extraction = d.get("extraction")
if extraction is None:
extraction = await self._extract_dialog_important(dd.content)
# 合并所有重要标记
tokens = extraction.times + extraction.ids + extraction.amounts + extraction.contacts + extraction.addresses + extraction.keywords
msgs = dd.context.msgs
# 识别问答对
qa_pairs = self._identify_qa_pairs(msgs)
protected_indices = self._get_protected_indices(msgs, qa_pairs, window_size=1)
# 分类消息(考虑问答对保护)
imp_unrel_msgs = []
unimp_unrel_msgs = []
for idx, m in enumerate(msgs):
# 问答对中的消息自动标记为重要
if idx in protected_indices:
imp_unrel_msgs.append((idx, m))
elif self._msg_matches_tokens(m, tokens) or self._is_important_message(m):
imp_unrel_msgs.append((idx, m))
elif not self._is_filler_message(m):
unimp_unrel_msgs.append((idx, m))
# 填充消息不加入任何列表,优先删除
# 重要消息按重要性排序
imp_sorted = sorted(imp_unrel_msgs, key=lambda x: self._importance_score(x[1]))
imp_sorted_ids = [id(m) for _, m in imp_sorted]
info = {
"dialog": dd,
"total_msgs": len(msgs),
"unrelated_count": len(msgs),
"imp_ids_sorted": imp_sorted_ids,
"unimp_ids": [id(m) for _, m in unimp_unrel_msgs],
"protected_indices": protected_indices,
"qa_pairs_count": len(qa_pairs),
}
per_dialog_info[d["index"]] = info
total_unrelated += info["unrelated_count"]
# ========== 问答对保护(已注释,暂不启用,留作观察) ==========
# qa_pairs = self._identify_qa_pairs(msgs)
# protected_indices = self._get_protected_indices(msgs, qa_pairs, window_size=0)
# ========================================================
# 全局删除配额计算
global_delete = int(total_unrelated * proportion)
if proportion > 0 and total_unrelated > 0 and global_delete == 0:
global_delete = 1
# 消息级分类:每条消息独立判断
important_msgs = [] # 重要消息(保留)
unimportant_msgs = [] # 不重要消息(可删除)
filler_msgs = [] # 填充消息(优先删除)
# 每段的最大可删容量
capacities = []
for d in not_related_dialogs:
idx = d["index"]
info = per_dialog_info[idx]
imp_count = len(info["imp_ids_sorted"])
unimp_count = len(info["unimp_ids"])
imp_cap = int(imp_count * proportion)
cap = min(unimp_count + imp_cap, max(0, info["total_msgs"] - 1))
capacities.append(cap)
for idx, m in enumerate(msgs):
msg_text = m.msg.strip()
# ========== 问答对保护判断(已注释) ==========
# if idx in protected_indices:
# important_msgs.append((idx, m))
# self._log(f" [{idx}] '{msg_text[:30]}...' → 重要(问答对保护)")
# ==========================================
# 填充消息(寒暄、表情等)
if self._is_filler_message(m):
filler_msgs.append((idx, m))
self._log(f" [{idx}] '{msg_text[:30]}...' → 填充")
# 重要信息(学号、成绩、时间、金额等)
elif self._is_important_message(m):
important_msgs.append((idx, m))
self._log(f" [{idx}] '{msg_text[:30]}...' → 重要(场景规则)")
# 其他消息
else:
unimportant_msgs.append((idx, m))
self._log(f" [{idx}] '{msg_text[:30]}...' → 不重要")
total_capacity = sum(capacities)
if global_delete > total_capacity:
self._log(f"[剪枝-数据集] 不相关消息总数={total_unrelated},目标删除={global_delete},最大可删={total_capacity}。将按最大可删执行。")
global_delete = total_capacity
# 配额分配
alloc = []
for i, d in enumerate(not_related_dialogs):
idx = d["index"]
info = per_dialog_info[idx]
share = int(global_delete * (info["unrelated_count"] / total_unrelated)) if total_unrelated > 0 else 0
alloc.append(min(share, capacities[i]))
# 计算删除配额
delete_target = int(original_count * proportion)
if proportion > 0 and original_count > 0 and delete_target == 0:
delete_target = 1
allocated = sum(alloc)
rem = global_delete - allocated
turn = 0
while rem > 0 and turn < 100000:
progressed = False
for i in range(len(not_related_dialogs)):
if rem <= 0:
break
if alloc[i] < capacities[i]:
alloc[i] += 1
rem -= 1
progressed = True
if not progressed:
break
turn += 1
# 应用删除
total_deleted_confirm = 0
for d in evaluated_dialogs:
dd = d["dialog"]
msgs = dd.context.msgs
original = len(msgs)
if d["is_related"]:
result.append(dd)
continue
idx_in_unrel = next((k for k, x in enumerate(not_related_dialogs) if x["index"] == d["index"]), None)
if idx_in_unrel is None:
result.append(dd)
continue
quota = alloc[idx_in_unrel]
info = per_dialog_info[d["index"]]
# 计算删除ID
imp_count = len(info["imp_ids_sorted"])
imp_del_cap = int(imp_count * proportion)
unimp_delete_ids = set(info["unimp_ids"][:min(quota, len(info["unimp_ids"]))])
del_unimp = min(quota, len(unimp_delete_ids))
rem_quota = quota - del_unimp
imp_delete_ids = set(info["imp_ids_sorted"][:min(rem_quota, imp_del_cap)])
deleted_here = 0
actual_unimp_deleted = 0
actual_imp_deleted = 0
kept = []
for m in msgs:
mid = id(m)
if mid in unimp_delete_ids and actual_unimp_deleted < del_unimp:
actual_unimp_deleted += 1
deleted_here += 1
continue
if mid in imp_delete_ids and actual_imp_deleted < len(imp_delete_ids):
actual_imp_deleted += 1
deleted_here += 1
continue
kept.append(m)
if not kept and msgs:
kept = [msgs[0]]
dd.context.msgs = kept
total_deleted_confirm += deleted_here
qa_info = f",问答对={info['qa_pairs_count']}" if info['qa_pairs_count'] > 0 else ""
self._log(
f"[剪枝-对话] 对话 {d['index']+1} 总消息={original} 分配删除={quota} 实删={deleted_here} 保留={len(kept)}{qa_info}"
)
result.append(dd)
# 确保至少保留1条消息
max_deletable = max(0, original_count - 1)
delete_target = min(delete_target, max_deletable)
self._log(f"[剪枝-数据集] 全局消息级剪枝完成,总删除 {total_deleted_confirm} 条(保护问答对和上下文)。")
else:
result = [d["dialog"] for d in evaluated_dialogs]
# 删除策略:优先删除填充消息,再删除不重要消息
to_delete_indices = set()
deleted_details = [] # 记录删除的消息详情
# 第一步:删除填充消息
filler_to_delete = min(len(filler_msgs), delete_target)
for i in range(filler_to_delete):
idx, msg = filler_msgs[i]
to_delete_indices.add(idx)
deleted_details.append(f"[{idx}] 填充: '{msg.msg[:50]}'")
# 第二步:如果还需要删除,删除不重要消息
remaining_quota = delete_target - len(to_delete_indices)
if remaining_quota > 0:
unimp_to_delete = min(len(unimportant_msgs), remaining_quota)
for i in range(unimp_to_delete):
idx, msg = unimportant_msgs[i]
to_delete_indices.add(idx)
deleted_details.append(f"[{idx}] 不重要: '{msg.msg[:50]}'")
# 第三步:如果还需要删除,按重要性分数删除重要消息
remaining_quota = delete_target - len(to_delete_indices)
if remaining_quota > 0 and important_msgs:
# 按重要性分数排序(分数低的优先删除)
imp_sorted = sorted(important_msgs, key=lambda x: self._importance_score(x[1]))
imp_to_delete = min(len(imp_sorted), remaining_quota)
for i in range(imp_to_delete):
idx, msg = imp_sorted[i]
to_delete_indices.add(idx)
score = self._importance_score(msg)
deleted_details.append(f"[{idx}] 重要(分数{score}): '{msg.msg[:50]}'")
# 执行删除
kept_msgs = []
for idx, m in enumerate(msgs):
if idx not in to_delete_indices:
kept_msgs.append(m)
# 确保至少保留1条
if not kept_msgs and msgs:
kept_msgs = [msgs[0]]
dd.context.msgs = kept_msgs
deleted_count = original_count - len(kept_msgs)
total_deleted_msgs += deleted_count
# 输出删除详情
if deleted_details:
self._log(f"[剪枝-删除详情] 对话 {d_idx+1} 删除了以下消息:")
for detail in deleted_details:
self._log(f" {detail}")
# ========== 问答对统计(已注释) ==========
# qa_info = f",问答对={len(qa_pairs)}" if qa_pairs else ""
# ========================================
self._log(
f"[剪枝-对话] 对话 {d_idx+1} 总消息={original_count} "
f"(重要={len(important_msgs)} 不重要={len(unimportant_msgs)} 填充={len(filler_msgs)}) "
f"删除={deleted_count} 保留={len(kept_msgs)}"
)
result.append(dd)
self._log(f"[剪枝-数据集] 剩余对话数={len(result)}")

View File

@@ -0,0 +1,326 @@
"""
场景特定配置 - 为不同场景提供定制化的剪枝规则
功能:
- 场景特定的重要信息识别模式
- 场景特定的重要性评分权重
- 场景特定的填充词库
- 场景特定的问答对识别规则
"""
from typing import Dict, List, Set, Tuple
from dataclasses import dataclass, field
@dataclass
class ScenePatterns:
"""场景特定的识别模式"""
# 重要信息的正则模式(优先级从高到低)
high_priority_patterns: List[Tuple[str, int]] = field(default_factory=list) # (pattern, weight)
medium_priority_patterns: List[Tuple[str, int]] = field(default_factory=list)
low_priority_patterns: List[Tuple[str, int]] = field(default_factory=list)
# 填充词库(无意义对话)
filler_phrases: Set[str] = field(default_factory=set)
# 问句关键词(用于识别问答对)
question_keywords: Set[str] = field(default_factory=set)
# 决策性/承诺性关键词
decision_keywords: Set[str] = field(default_factory=set)
class SceneConfigRegistry:
"""场景配置注册表 - 管理所有场景的特定配置"""
# 基础通用模式(所有场景共享)
BASE_HIGH_PRIORITY = [
(r"订单号|工单|申请号|编号|ID|账号|账户", 5),
(r"金额|费用|价格|¥|¥|\d+元", 5),
(r"\d{11}", 4), # 手机号
(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", 4), # 邮箱
]
BASE_MEDIUM_PRIORITY = [
(r"\d{4}-\d{1,2}-\d{1,2}", 3), # 日期
(r"\d{4}\d{1,2}月\d{1,2}日", 3),
(r"电话|手机号|微信|QQ|联系方式", 3),
(r"地址|地点|位置", 2),
(r"时间|日期|有效期|截止", 2),
(r"今天|明天|后天|昨天|前天", 3), # 相对时间(提高权重)
(r"下周|下月|下年|上周|上月|上年|本周|本月|本年", 3),
(r"今年|去年|明年", 3),
]
BASE_LOW_PRIORITY = [
(r"\d{1,2}:\d{2}", 2), # 时间点 HH:MM
(r"\d{1,2}点\d{0,2}分?", 2), # 时间点 X点Y分 或 X点
(r"上午|下午|中午|晚上|早上|傍晚|凌晨", 2), # 时段(提高权重并扩充)
(r"AM|PM|am|pm", 1),
]
BASE_FILLERS = {
# 基础寒暄
"你好", "您好", "在吗", "在的", "在呢", "", "嗯嗯", "", "哦哦",
"好的", "", "", "可以", "不可以", "谢谢", "多谢", "感谢",
"拜拜", "再见", "88", "", "回见",
# 口头禅
"哈哈", "呵呵", "哈哈哈", "嘿嘿", "嘻嘻", "hiahia",
"", "", "", "", "", "", "嗯哼",
# 确认词
"是的", "", "对的", "没错", "嗯嗯", "好嘞", "收到", "明白", "了解", "知道了",
# 标点和符号
"。。。", "...", "???", "", "!!!", "",
# 表情符号
"[微笑]", "[呲牙]", "[发呆]", "[得意]", "[流泪]", "[害羞]", "[闭嘴]",
"[睡]", "[大哭]", "[尴尬]", "[发怒]", "[调皮]", "[龇牙]", "[惊讶]",
"[难过]", "[酷]", "[冷汗]", "[抓狂]", "[吐]", "[偷笑]", "[可爱]",
"[白眼]", "[傲慢]", "[饥饿]", "[困]", "[惊恐]", "[流汗]", "[憨笑]",
# 网络用语
"hhh", "hhhh", "2333", "666", "gg", "ok", "OK", "okok",
"emmm", "emm", "em", "mmp", "wtf", "omg",
}
BASE_QUESTION_KEYWORDS = {
"什么", "为什么", "怎么", "如何", "哪里", "哪个", "", "多少", "几点", "何时", ""
}
BASE_DECISION_KEYWORDS = {
"必须", "一定", "务必", "需要", "要求", "规定", "应该",
"承诺", "保证", "确保", "负责", "同意", "答应"
}
@classmethod
def get_education_config(cls) -> ScenePatterns:
"""教育场景配置"""
return ScenePatterns(
high_priority_patterns=cls.BASE_HIGH_PRIORITY + [
# 成绩相关(最高优先级)
(r"成绩|分数|得分|满分|及格|不及格", 6),
(r"GPA|绩点|学分|平均分", 6),
(r"\d+分|\d+\.?\d*分", 5), # 具体分数
(r"排名|名次|第.{1,3}名", 5), # 支持"第三名"、"第1名"等
# 学籍信息
(r"学号|学生证|教师工号|工号", 5),
(r"班级|年级|专业|院系", 4),
# 课程相关
(r"课程|科目|学科|必修|选修", 4),
(r"教材|课本|教科书|参考书", 4),
(r"章节|第.{1,3}章|第.{1,3}节", 3), # 支持"第三章"、"第1章"等
# 学科内容(新增)
(r"微积分|导数|积分|函数|极限|微分", 4),
(r"代数|几何|三角|概率|统计", 4),
(r"物理|化学|生物|历史|地理", 4),
(r"英语|语文|数学|政治|哲学", 4),
(r"定义|定理|公式|概念|原理|法则", 3),
(r"例题|解题|证明|推导|计算", 3),
],
medium_priority_patterns=cls.BASE_MEDIUM_PRIORITY + [
# 教学活动
(r"作业|练习|习题|题目", 3),
(r"考试|测验|测试|考核|期中|期末", 3),
(r"上课|下课|课堂|讲课", 2),
(r"提问|回答|发言|讨论", 2),
(r"问一下|请教|咨询|询问", 2), # 新增:问询相关
(r"理解|明白|懂|掌握|学会", 2), # 新增:学习状态
# 时间安排
(r"课表|课程表|时间表", 3),
(r"第.{1,3}节课|第.{1,3}周", 2), # 支持"第三节课"、"第1周"等
],
low_priority_patterns=cls.BASE_LOW_PRIORITY + [
(r"老师|教师|同学|学生", 1),
(r"教室|实验室|图书馆", 1),
],
filler_phrases=cls.BASE_FILLERS | {
# 教育场景特有填充词(移除了"明白了"、"懂了"、"不懂"等,这些在教育场景中有意义)
"老师好", "同学们好", "上课", "下课", "起立", "坐下",
"举手", "请坐", "很好", "不错", "继续",
"下一个", "下一题", "下一位", "还有吗", "还有问题吗",
},
question_keywords=cls.BASE_QUESTION_KEYWORDS | {
"为啥", "", "咋办", "怎样", "如何做",
"能不能", "可不可以", "行不行", "对不对", "是不是",
},
decision_keywords=cls.BASE_DECISION_KEYWORDS | {
"必考", "重点", "考点", "难点", "关键",
"记住", "背诵", "掌握", "理解", "复习",
}
)
@classmethod
def get_online_service_config(cls) -> ScenePatterns:
"""在线服务场景配置"""
return ScenePatterns(
high_priority_patterns=cls.BASE_HIGH_PRIORITY + [
# 工单相关(最高优先级)
(r"工单号|工单编号|ticket|TK\d+", 6),
(r"工单状态|处理中|已解决|已关闭|待处理", 5),
(r"优先级|紧急|高优先级|P0|P1|P2", 5),
# 产品信息
(r"产品型号|型号|SKU|产品编号", 5),
(r"序列号|SN|设备号", 5),
(r"版本号|软件版本|固件版本", 4),
# 问题描述
(r"故障|错误|异常|bug|问题", 4),
(r"错误代码|故障代码|error code", 5),
(r"无法|不能|失败|报错", 3),
],
medium_priority_patterns=cls.BASE_MEDIUM_PRIORITY + [
# 服务相关
(r"退款|退货|换货|补发", 4),
(r"发票|收据|凭证", 3),
(r"物流|快递|运单号", 3),
(r"保修|质保|售后", 3),
# 时效相关
(r"SLA|响应时间|处理时长", 4),
(r"超时|延迟|等待", 2),
],
low_priority_patterns=cls.BASE_LOW_PRIORITY + [
(r"客服|工程师|技术支持", 1),
(r"用户|客户|会员", 1),
],
filler_phrases=cls.BASE_FILLERS | {
# 在线服务特有填充词
"您好", "请问", "请稍等", "稍等", "马上", "立即",
"正在查询", "正在处理", "正在为您", "帮您查一下",
"还有其他问题吗", "还需要什么帮助", "很高兴为您服务",
"感谢您的耐心等待", "抱歉让您久等了",
"已记录", "已反馈", "已转接", "已升级",
"祝您生活愉快", "再见", "欢迎下次咨询",
},
question_keywords=cls.BASE_QUESTION_KEYWORDS | {
"能否", "可否", "是否", "有没有", "能不能",
"怎么办", "如何处理", "怎么解决",
},
decision_keywords=cls.BASE_DECISION_KEYWORDS | {
"立即处理", "马上解决", "尽快", "优先",
"升级", "转接", "派单", "跟进",
"补偿", "赔偿", "退款", "换货",
}
)
@classmethod
def get_outbound_config(cls) -> ScenePatterns:
"""外呼场景配置"""
return ScenePatterns(
high_priority_patterns=cls.BASE_HIGH_PRIORITY + [
# 意向相关(最高优先级)
(r"意向|意愿|兴趣|感兴趣", 6),
(r"A类|B类|C类|D类|高意向|低意向", 6),
(r"成交|签约|下单|购买|确认", 6),
# 联系信息(外呼场景中更重要)
(r"预约|约定|安排|确定时间", 5),
(r"下次联系|回访|跟进", 5),
(r"方便|有空|可以|时间", 4),
# 通话状态
(r"接通|未接通|占线|关机|停机", 4),
(r"通话时长|通话时间", 3),
],
medium_priority_patterns=cls.BASE_MEDIUM_PRIORITY + [
# 客户信息
(r"姓名|称呼|先生|女士", 3),
(r"公司|单位|职位|职务", 3),
(r"需求|要求|期望", 3),
# 跟进状态
(r"跟进状态|进展|进度", 3),
(r"已联系|待联系|联系中", 2),
(r"拒绝|不感兴趣|考虑|再说", 3),
],
low_priority_patterns=cls.BASE_LOW_PRIORITY + [
(r"销售|客户经理|业务员", 1),
(r"产品|服务|方案", 1),
],
filler_phrases=cls.BASE_FILLERS | {
# 外呼场景特有填充词
"您好", "", "hello", "打扰了", "不好意思",
"方便接电话吗", "现在方便吗", "占用您一点时间",
"我是", "我们是", "我们公司", "我们这边",
"了解一下", "介绍一下", "简单说一下",
"考虑考虑", "想一想", "再说", "再看看",
"不需要", "不感兴趣", "没兴趣", "不用了",
"好的", "", "可以", "没问题", "那就这样",
"再联系", "回头聊", "有需要再说",
},
question_keywords=cls.BASE_QUESTION_KEYWORDS | {
"有没有", "需不需要", "要不要", "考虑不考虑",
"了解吗", "知道吗", "听说过吗",
"方便吗", "有空吗", "在吗",
},
decision_keywords=cls.BASE_DECISION_KEYWORDS | {
"确定", "决定", "选择", "购买", "下单",
"预约", "安排", "约定", "确认",
"跟进", "回访", "联系", "沟通",
}
)
@classmethod
def get_config(cls, scene: str, fallback_to_generic: bool = True) -> ScenePatterns:
"""根据场景名称获取配置
Args:
scene: 场景名称 ('education', 'online_service', 'outbound' 或其他)
fallback_to_generic: 如果场景不存在,是否降级到通用配置
Returns:
对应场景的配置,如果场景不存在:
- fallback_to_generic=True: 返回通用配置(仅基础规则)
- fallback_to_generic=False: 抛出异常
"""
scene_map = {
'education': cls.get_education_config,
'online_service': cls.get_online_service_config,
'outbound': cls.get_outbound_config,
}
if scene in scene_map:
return scene_map[scene]()
if fallback_to_generic:
# 返回通用配置(仅包含基础规则,不包含场景特定规则)
return cls.get_generic_config()
else:
raise ValueError(f"不支持的场景: {scene},支持的场景: {list(scene_map.keys())}")
@classmethod
def get_generic_config(cls) -> ScenePatterns:
"""通用场景配置 - 仅包含基础规则,适用于未定义的场景
这是一个保守的配置,只使用最通用的规则,避免误删重要信息
"""
return ScenePatterns(
high_priority_patterns=cls.BASE_HIGH_PRIORITY,
medium_priority_patterns=cls.BASE_MEDIUM_PRIORITY,
low_priority_patterns=cls.BASE_LOW_PRIORITY,
filler_phrases=cls.BASE_FILLERS,
question_keywords=cls.BASE_QUESTION_KEYWORDS,
decision_keywords=cls.BASE_DECISION_KEYWORDS
)
@classmethod
def get_all_scenes(cls) -> List[str]:
"""获取所有预定义场景的列表"""
return ['education', 'online_service', 'outbound']
@classmethod
def is_scene_supported(cls, scene: str) -> bool:
"""检查场景是否有专门的配置支持
Args:
scene: 场景名称
Returns:
True: 有专门配置
False: 将使用通用配置
"""
return scene in cls.get_all_scenes()

View File

@@ -1988,6 +1988,7 @@ async def get_chunked_dialogs_with_preprocessing(
input_data_path: Optional[str] = None,
llm_client: Optional[Any] = None,
skip_cleaning: bool = True,
pruning_config: Optional[Dict] = None,
) -> List[DialogData]:
"""包含数据预处理步骤的完整分块流程
@@ -2000,6 +2001,7 @@ async def get_chunked_dialogs_with_preprocessing(
input_data_path: 输入数据路径
llm_client: LLM 客户端
skip_cleaning: 是否跳过数据清洗步骤默认False
pruning_config: 剪枝配置字典,包含 pruning_switch, pruning_scene, pruning_threshold
Returns:
带 chunks 的 DialogData 列表
@@ -2030,7 +2032,19 @@ async def get_chunked_dialogs_with_preprocessing(
from app.core.memory.storage_services.extraction_engine.data_preprocessing.data_pruning import (
SemanticPruner,
)
pruner = SemanticPruner(llm_client=llm_client)
from app.core.memory.models.config_models import PruningConfig
# 构建剪枝配置
if pruning_config:
# 使用传入的配置
config = PruningConfig(**pruning_config)
print(f"[剪枝] 使用传入配置: switch={config.pruning_switch}, scene={config.pruning_scene}, threshold={config.pruning_threshold}")
else:
# 使用默认配置(关闭剪枝)
config = None
print("[剪枝] 未提供配置,使用默认配置(剪枝关闭)")
pruner = SemanticPruner(config=config, llm_client=llm_client)
# 记录单对话场景下剪枝前的消息数量
single_dialog_original_msgs = None