读取接口内层嵌套BUG修复

This commit is contained in:
lixinyue
2026-01-20 19:11:40 +08:00
parent 398964c747
commit 78559d98eb
5 changed files with 117 additions and 323 deletions

View File

@@ -26,22 +26,33 @@ class VerificationNodeService(LLMServiceMixin):
# 创建全局服务实例
verification_service = VerificationNodeService()
async def Verify_prompt(state: ReadState,messages_deal):
async def Verify_prompt(state: ReadState, messages_deal: VerificationResult):
"""处理验证结果并生成输出格式"""
storage_type = state.get('storage_type', '')
user_rag_memory_id = state.get('user_rag_memory_id', '')
data = state.get('data', '')
# 将 VerificationItem 对象转换为字典列表
verified_data = []
if messages_deal.expansion_issue:
for item in messages_deal.expansion_issue:
if hasattr(item, 'model_dump'):
verified_data.append(item.model_dump())
elif isinstance(item, dict):
verified_data.append(item)
Verify_result = {
"status": messages_deal.split_result,
"verified_data": messages_deal.expansion_issue,
"verified_data": verified_data,
"storage_type": storage_type,
"user_rag_memory_id": user_rag_memory_id,
"_intermediate": {
"type": "verification",
"title": "Data Verification",
"result": messages_deal.split_result,
"reason": messages_deal.reason,
"query": data,
"verified_count": len(messages_deal.expansion_issue),
"reason": messages_deal.reason or "验证完成",
"query": messages_deal.query,
"verified_count": len(verified_data),
"storage_type": storage_type,
"user_rag_memory_id": user_rag_memory_id
}
@@ -71,11 +82,16 @@ async def Verify(state: ReadState):
}
logger.info("Verify: 开始渲染模板")
# 生成 JSON schema 以指导 LLM 输出正确格式
json_schema = VerificationResult.model_json_schema()
system_prompt = await verification_service.template_service.render_template(
template_name='split_verify_prompt.jinja2',
operation_name='split_verify_prompt',
history=history,
sentence=messages
sentence=messages,
json_schema=json_schema
)
logger.info(f"Verify: 模板渲染完成prompt length={len(system_prompt)}")
@@ -92,10 +108,11 @@ async def Verify(state: ReadState):
system_prompt=system_prompt,
response_model=VerificationResult,
fallback_value={
"query": content, # 添加必填的 query 字段
"split_result": "fail",
"query": content,
"history": history if isinstance(history, list) else [],
"expansion_issue": [],
"reason": "验证失败"
"split_result": "failed",
"reason": "验证失败或超时"
}
),
timeout=150.0 # 150秒超时
@@ -105,8 +122,9 @@ async def Verify(state: ReadState):
logger.error("Verify: LLM 调用超时150秒使用 fallback 值")
structured = VerificationResult(
query=content,
split_result="fail",
history=history if isinstance(history, list) else [],
expansion_issue=[],
split_result="failed",
reason="LLM调用超时"
)

View File

@@ -4,11 +4,29 @@ from typing import List, Optional, Dict, Any
from pydantic import BaseModel, Field
class VerificationItem(BaseModel):
    """Individual verification item for a query-answer pair.

    Element type of ``VerificationResult.expansion_issue``. Every field is
    required (declared with ``Field(...)``), so an item missing any of the
    four keys fails validation.
    """
    # NOTE(review): the `description` strings below are presumably surfaced to
    # the LLM through the model's JSON schema — confirm before rewording them.

    # The sub-question being verified.
    query_small: str = Field(..., description="子问题")
    # The answer given for that sub-question.
    answer_small: str = Field(..., description="子问题的回答")
    # Typed as a string, not a bool: the description asks for the text
    # "True" or "False".
    status: str = Field(..., description="验证状态True 或 False")
    # Described as identical to `answer_small`; kept as a separate key.
    query_answer: str = Field(..., description="问题的答案(与 answer_small 相同)")
class VerificationResult(BaseModel):
"""Result model for verification operation."""
query: str
expansion_issue: List[Dict[str, Any]]
split_result: str
reason: Optional[str] = None
history: List[Dict[str, Any]] = Field(default_factory=list)
query: str = Field(..., description="原始查询问题")
history: List[Dict[str, Any]] = Field(default_factory=list, description="历史对话记录")
expansion_issue: List[VerificationItem] = Field(
default_factory=list,
description="验证后的数据列表,包含所有通过验证的问答对"
)
split_result: str = Field(
...,
description="验证结果状态successexpansion_issue 非空)或 failedexpansion_issue 为空)"
)
reason: Optional[str] = Field(
None,
description="验证结果的说明和分析"
)

View File

@@ -162,35 +162,22 @@ class OptimizedLLMService:
return fallback_value
elif isinstance(fallback_value, dict):
return response_model(**fallback_value)
elif isinstance(fallback_value, list):
# 对于 RootModel[List[...]] 类型,直接传入列表
if hasattr(response_model, 'model_fields') and 'root' in response_model.model_fields:
return response_model(root=fallback_value)
# 或者尝试直接传入Pydantic v2 的 RootModel 支持)
return response_model(fallback_value)
# 尝试创建空的响应模型
# 检查是否是 RootModel 类型(通过检查 __pydantic_root_model__ 属性)
if hasattr(response_model, '__pydantic_root_model__') and response_model.__pydantic_root_model__:
# RootModel类型 - 传入空列表
logger.debug(f"创建 RootModel 类型的空响应: {response_model.__name__}")
if hasattr(response_model, 'root'):
# RootModel类型
return response_model([])
else:
# 普通BaseModel类型 - 尝试无参数构造
logger.debug(f"创建普通 BaseModel 类型的空响应: {response_model.__name__}")
# 普通BaseModel类型
return response_model()
except Exception as e:
logger.error(f"创建降级响应失败: {e}", exc_info=True)
logger.error(f"创建降级响应失败: {e}")
# 最后的降级策略
try:
if hasattr(response_model, '__pydantic_root_model__') and response_model.__pydantic_root_model__:
return response_model([])
else:
return response_model()
except Exception as final_error:
logger.error(f"最终降级策略也失败: {final_error}")
raise
if hasattr(response_model, 'root'):
return response_model([])
else:
return response_model()
def clear_cache(self):
"""清理客户端缓存"""

View File

@@ -42,19 +42,33 @@
如果状态是TRUE,保留这条数据;否则不需要这条数据
### 第五步 输出格式
按照json的形式输出
{"data":"Query":原来Query的字段"history":原来的history字段
"expansion_issue":以为列表的形式存储验证之后的数据比如[
{"query_small": query_small,
"answer_small": answer_small,,
"status": 回答的结果是否符合query_small填写状态,
"query_answer": answer_small},
{"query":"原来Query的字段",
"history":"原来的history字段",
"expansion_issue":以列表的形式存储验证之后的数据比如[
{
"query_small": "张曼婷生日是什么时候?",
"answer_small": "张曼婷喜欢绘画。",
"status": "True",
"query_answer": "张曼 婷喜欢绘画。"
},{}......]
,
"split_result":如果expansion_issue是空的列表返回failed不是空列表返回success,
"reason": 为以上分析完之后的结果给一个说明
}
"query_small": "子问题",
"answer_small": "子问题的回答",
"status": "True或False表示回答是否符合query_small",
"query_answer": "问题的答案与answer_small相同"
},
{
"query_small": "张曼婷生日是什么时候?",
"answer_small": "张曼婷喜欢绘画。",
"status": "False",
"query_answer": "张曼婷喜欢绘画。"
}
],
"split_result":"如果expansion_issue是空的列表返回failed,不是空列表返回success",
"reason": "为以上分析完之后的结果给一个说明"
}
**输出格式要求**
**CRITICAL JSON FORMATTING REQUIREMENTS:**
1. Use only standard ASCII double quotes (") for JSON structure - never use Chinese quotation marks ("") or other Unicode quotes
2. If the extracted statement text contains quotation marks, escape them properly using backslashes (\")
3. Ensure all JSON strings are properly closed and comma-separated
4. Do not include line breaks within JSON string values
5. The output language should always be the same as the input language
**JSON Schema:**
{{ json_schema }}

View File

@@ -100,41 +100,6 @@ class OpenAIClient(LLMClient):
logger.error(f"LLM 调用失败: {e}")
raise LLMClientException(f"LLM 调用失败: {e}") from e
async def response(self, messages: List[Dict[str, str]], **kwargs) -> str:
"""
简单响应接口实现用于fallback机制
Args:
messages: 消息列表
**kwargs: 额外参数
Returns:
LLM 响应文本
Raises:
LLMClientException: LLM 调用失败
"""
try:
template = """{messages}"""
prompt = ChatPromptTemplate.from_template(template)
chain = prompt | self.client
# 添加 Langfuse 回调(如果可用)
config = {}
if self.langfuse_handler:
config["callbacks"] = [self.langfuse_handler]
response = await chain.ainvoke({"messages": messages}, config=config)
# 提取文本内容
if hasattr(response, "content"):
return str(response.content)
return str(response)
except Exception as e:
logger.error(f"LLM 调用失败: {e}")
raise LLMClientException(f"LLM 调用失败: {e}") from e
async def response_structured(
self,
messages: List[Dict[str, str]],
@@ -166,206 +131,44 @@ class OpenAIClient(LLMClient):
if self.langfuse_handler:
config["callbacks"] = [self.langfuse_handler]
template = """{question}"""
prompt = ChatPromptTemplate.from_template(template)
# 对于 DashScope 等不支持 with_structured_output 的模型优先使用手动JSON解析
# 这样可以避免不必要的尝试和错误
if self.provider: #.lower() == "dashscope"
logger.info("DashScope 模型直接使用手动JSON解析方法")
try:
# 获取原始响应,添加超时保护
chain = prompt | self.client
response = await asyncio.wait_for(
chain.ainvoke({"question": question_text}, config=config),
timeout=self.timeout
)
# 提取响应文本
response_text = ""
if hasattr(response, "content"):
response_text = str(response.content)
else:
response_text = str(response)
logger.debug(f"LLM原始响应长度: {len(response_text)}")
# 尝试提取JSON内容
json_text = response_text.strip()
# 如果响应包含markdown代码块提取其中的JSON
if "```json" in json_text:
json_text = json_text.split("```json")[1].split("```")[0].strip()
elif "```" in json_text:
json_text = json_text.split("```")[1].split("```")[0].strip()
# 尝试修复常见的JSON格式问题
# 1. 移除可能的BOM标记
json_text = json_text.lstrip('\ufeff')
# 2. 如果JSON被截断缺少结尾的 ] 或 }),尝试修复
if json_text.startswith('[') and not json_text.rstrip().endswith(']'):
logger.warning("检测到JSON数组被截断尝试修复")
# 找到最后一个完整的对象
last_complete_brace = json_text.rfind('}')
if last_complete_brace > 0:
json_text = json_text[:last_complete_brace + 1] + ']'
logger.info(f"修复后的JSON长度: {len(json_text)}")
elif json_text.startswith('{') and not json_text.rstrip().endswith('}'):
logger.warning("检测到JSON对象被截断尝试修复")
# 找到最后一个完整的字段
last_complete_brace = json_text.rfind('}')
if last_complete_brace > 0:
json_text = json_text[:last_complete_brace + 1]
logger.info(f"修复后的JSON长度: {len(json_text)}")
# 解析JSON
try:
parsed_dict = json.loads(json_text)
logger.debug(f"JSON解析成功类型: {type(parsed_dict)}")
# 如果是列表,记录第一个元素的结构
if isinstance(parsed_dict, list) and len(parsed_dict) > 0:
logger.debug(f"第一个元素的键: {list(parsed_dict[0].keys()) if isinstance(parsed_dict[0], dict) else 'not a dict'}")
# 尝试字段映射转换处理LLM返回格式不匹配的情况
if isinstance(parsed_dict, list):
transformed_list = []
for item in parsed_dict:
if isinstance(item, dict):
transformed_item = {}
# 常见的字段映射规则
field_mappings = {
'question': ['extended_question', 'question', 'query'],
'original_question': ['original_question', 'original', 'source_question'],
'extended_question': ['extended_question', 'question', 'query', 'extended'],
'type': ['type', 'category', 'question_type'],
'reason': ['reason', 'explanation', 'rationale'],
'query': ['query', 'question', 'text'],
'split_result': ['split_result', 'result', 'status'],
'expansion_issue': ['expansion_issue', 'issues', 'expansions'],
}
# 对于每个期望的字段,尝试从多个可能的源字段中获取
for target_field, source_fields in field_mappings.items():
for source_field in source_fields:
if source_field in item:
transformed_item[target_field] = item[source_field]
break
# 特殊处理:如果只有 'question' 但缺少 'original_question' 和 'extended_question'
if 'question' in item and 'original_question' not in transformed_item:
transformed_item['original_question'] = item['question']
if 'question' in item and 'extended_question' not in transformed_item:
transformed_item['extended_question'] = item['question']
# 保留原始字段(如果没有被映射)
for key, value in item.items():
if key not in transformed_item:
transformed_item[key] = value
transformed_list.append(transformed_item)
else:
transformed_list.append(item)
logger.info(f"字段映射完成,尝试重新验证")
logger.debug(f"转换后的数据: {transformed_list}")
try:
return response_model.model_validate(transformed_list)
except Exception as retry_error:
logger.error(f"字段映射后仍然验证失败: {retry_error}")
logger.error(f"完整的LLM响应: {response_text}")
logger.error(f"原始解析字典: {parsed_dict}")
logger.error(f"转换后的字典: {transformed_list}")
raise
else:
# 非列表类型,记录并抛出原始错误
logger.error(f"完整的LLM响应: {response_text}")
logger.error(f"解析后的字典: {parsed_dict}")
raise
except json.JSONDecodeError as je:
logger.error(f"JSON解析失败: {je}")
logger.error(f"问题位置附近的文本: {json_text[max(0, je.pos-50):min(len(json_text), je.pos+50)]}")
# 尝试更激进的修复逐行解析找到有效的JSON部分
logger.info("尝试逐行解析JSON")
lines = json_text.split('\n')
for i in range(len(lines), 0, -1):
try:
partial_json = '\n'.join(lines[:i])
if partial_json.startswith('['):
partial_json = partial_json.rstrip().rstrip(',') + ']'
elif partial_json.startswith('{'):
partial_json = partial_json.rstrip().rstrip(',') + '}'
parsed_dict = json.loads(partial_json)
logger.info(f"成功解析部分JSON{i}行)")
return response_model.model_validate(parsed_dict)
except:
continue
# 如果所有尝试都失败,抛出原始错误
raise LLMClientException(f"JSON解析失败: {je}") from je
except asyncio.TimeoutError:
logger.error(f"LLM调用超时{self.timeout}秒)")
raise LLMClientException(f"LLM调用超时{self.timeout}秒)")
except LLMClientException:
raise
except Exception as e:
logger.error(f"手动JSON解析失败: {e}", exc_info=True)
raise LLMClientException(f"手动JSON解析失败: {e}") from e
# 方法 1: 使用 PydanticOutputParser适用于支持的模型
# 方法 1: 使用 PydanticOutputParser
if PydanticOutputParser is not None:
try:
parser = PydanticOutputParser(pydantic_object=response_model)
format_instructions = parser.get_format_instructions()
prompt_with_instructions = ChatPromptTemplate.from_template(
prompt = ChatPromptTemplate.from_template(
"{question}\n{format_instructions}"
)
chain = prompt_with_instructions | self.client | parser
chain = prompt | self.client | parser
parsed = await asyncio.wait_for(
chain.ainvoke(
{
"question": question_text,
"format_instructions": format_instructions,
},
config=config
),
timeout=self.timeout
parsed = await chain.ainvoke(
{
"question": question_text,
"format_instructions": format_instructions,
},
config=config
)
logger.debug(f"使用 PydanticOutputParser 解析成功")
return parsed
except asyncio.TimeoutError:
logger.error(f"PydanticOutputParser 调用超时({self.timeout}秒)")
raise LLMClientException(f"LLM调用超时{self.timeout}秒)")
except Exception as e:
logger.warning(
f"PydanticOutputParser 解析失败,尝试其他方法: {e}"
)
# 方法 2: 使用 LangChain 的 with_structured_output (如果支持)
with_so = getattr(self.client, "with_structured_output", None)
if callable(with_so):
try:
# 方法 2: 使用 LangChain 的 with_structured_output
template = """{question}"""
prompt = ChatPromptTemplate.from_template(template)
try:
with_so = getattr(self.client, "with_structured_output", None)
if callable(with_so):
structured_chain = prompt | with_so(response_model, strict=True)
parsed = await asyncio.wait_for(
structured_chain.ainvoke(
{"question": question_text},
config=config
),
timeout=self.timeout
parsed = await structured_chain.ainvoke(
{"question": question_text},
config=config
)
# 验证并返回结果
@@ -378,60 +181,14 @@ class OpenAIClient(LLMClient):
# 尝试从 JSON 解析
return response_model.model_validate_json(json.dumps(parsed))
except asyncio.TimeoutError:
logger.error(f"with_structured_output 调用超时({self.timeout}秒)")
raise LLMClientException(f"LLM调用超时{self.timeout}秒)")
except NotImplementedError:
logger.warning(
f"模型 {self.model_name} 不支持 with_structured_output使用手动JSON解析"
)
except Exception as e:
logger.warning(f"with_structured_output 失败: {e},尝试手动解析")
# 方法 3: 手动JSON解析fallback方法
logger.info("使用手动JSON解析方法fallback")
try:
# 获取原始响应
chain = prompt | self.client
response = await asyncio.wait_for(
chain.ainvoke({"question": question_text}, config=config),
timeout=self.timeout
)
# 提取响应文本
response_text = ""
if hasattr(response, "content"):
response_text = str(response.content)
else:
response_text = str(response)
logger.debug(f"LLM原始响应: {response_text[:500]}...")
# 尝试提取JSON内容
json_text = response_text.strip()
# 如果响应包含markdown代码块提取其中的JSON
if "```json" in json_text:
json_text = json_text.split("```json")[1].split("```")[0].strip()
elif "```" in json_text:
json_text = json_text.split("```")[1].split("```")[0].strip()
# 解析JSON
parsed_dict = json.loads(json_text)
logger.debug(f"JSON解析成功: {parsed_dict}")
# 验证并创建Pydantic模型
return response_model.model_validate(parsed_dict)
except asyncio.TimeoutError:
logger.error(f"手动JSON解析调用超时{self.timeout}秒)")
raise LLMClientException(f"LLM调用超时{self.timeout}秒)")
except json.JSONDecodeError as je:
logger.error(f"JSON解析失败: {je}, 原始文本: {json_text[:200]}...")
raise LLMClientException(f"JSON解析失败: {je}") from je
except Exception as e:
logger.error(f"手动JSON解析失败: {e}")
raise LLMClientException(f"手动JSON解析失败: {e}") from e
logger.error(f"结构化输出失败: {e}")
raise LLMClientException(f"结构化输出失败: {e}") from e
# 如果所有方法都失败,抛出异常
raise LLMClientException(
"无法生成结构化输出,所有解析方法均失败"
)
except LLMClientException:
raise