diff --git a/api/app/core/memory/agent/langgraph_graph/nodes/verification_nodes.py b/api/app/core/memory/agent/langgraph_graph/nodes/verification_nodes.py index fbfbe86d..dac7ea14 100644 --- a/api/app/core/memory/agent/langgraph_graph/nodes/verification_nodes.py +++ b/api/app/core/memory/agent/langgraph_graph/nodes/verification_nodes.py @@ -26,22 +26,33 @@ class VerificationNodeService(LLMServiceMixin): # 创建全局服务实例 verification_service = VerificationNodeService() -async def Verify_prompt(state: ReadState,messages_deal): +async def Verify_prompt(state: ReadState, messages_deal: VerificationResult): + """处理验证结果并生成输出格式""" storage_type = state.get('storage_type', '') user_rag_memory_id = state.get('user_rag_memory_id', '') data = state.get('data', '') + + # 将 VerificationItem 对象转换为字典列表 + verified_data = [] + if messages_deal.expansion_issue: + for item in messages_deal.expansion_issue: + if hasattr(item, 'model_dump'): + verified_data.append(item.model_dump()) + elif isinstance(item, dict): + verified_data.append(item) + Verify_result = { "status": messages_deal.split_result, - "verified_data": messages_deal.expansion_issue, + "verified_data": verified_data, "storage_type": storage_type, "user_rag_memory_id": user_rag_memory_id, "_intermediate": { "type": "verification", "title": "Data Verification", "result": messages_deal.split_result, - "reason": messages_deal.reason, - "query": data, - "verified_count": len(messages_deal.expansion_issue), + "reason": messages_deal.reason or "验证完成", + "query": messages_deal.query, + "verified_count": len(verified_data), "storage_type": storage_type, "user_rag_memory_id": user_rag_memory_id } @@ -71,11 +82,16 @@ async def Verify(state: ReadState): } logger.info("Verify: 开始渲染模板") + + # 生成 JSON schema 以指导 LLM 输出正确格式 + json_schema = VerificationResult.model_json_schema() + system_prompt = await verification_service.template_service.render_template( template_name='split_verify_prompt.jinja2', operation_name='split_verify_prompt', history=history, - sentence=messages + sentence=messages, + json_schema=json_schema ) logger.info(f"Verify: 模板渲染完成,prompt length={len(system_prompt)}") @@ -92,10 +108,11 @@ async def Verify(state: ReadState): system_prompt=system_prompt, response_model=VerificationResult, fallback_value={ - "query": content, # 添加必填的 query 字段 - "split_result": "fail", + "query": content, + "history": history if isinstance(history, list) else [], "expansion_issue": [], - "reason": "验证失败" + "split_result": "failed", + "reason": "验证失败或超时" } ), timeout=150.0 # 150秒超时 @@ -105,8 +122,9 @@ async def Verify(state: ReadState): logger.error("Verify: LLM 调用超时(150秒),使用 fallback 值") structured = VerificationResult( query=content, - split_result="fail", + history=history if isinstance(history, list) else [], expansion_issue=[], + split_result="failed", reason="LLM调用超时" ) diff --git a/api/app/core/memory/agent/models/verification_models.py b/api/app/core/memory/agent/models/verification_models.py index bd8896b3..abdce040 100644 --- a/api/app/core/memory/agent/models/verification_models.py +++ b/api/app/core/memory/agent/models/verification_models.py @@ -4,11 +4,29 @@ from typing import List, Optional, Dict, Any from pydantic import BaseModel, Field +class VerificationItem(BaseModel): + """Individual verification item for a query-answer pair.""" + + query_small: str = Field(..., description="子问题") + answer_small: str = Field(..., description="子问题的回答") + status: str = Field(..., description="验证状态:True 或 False") + query_answer: str = Field(..., description="问题的答案(与 answer_small 相同)") + + class VerificationResult(BaseModel): """Result model for verification operation.""" - query: str - expansion_issue: List[Dict[str, Any]] - split_result: str - reason: Optional[str] = None - history: List[Dict[str, Any]] = Field(default_factory=list) + query: str = Field(..., description="原始查询问题") + history: List[Dict[str, Any]] = Field(default_factory=list, description="历史对话记录") + expansion_issue: List[VerificationItem] = Field( + default_factory=list, + description="验证后的数据列表,包含所有通过验证的问答对" + ) + split_result: str = Field( + ..., + description="验证结果状态:success(expansion_issue 非空)或 failed(expansion_issue 为空)" + ) + reason: Optional[str] = Field( + None, + description="验证结果的说明和分析" + ) diff --git a/api/app/core/memory/agent/services/optimized_llm_service.py b/api/app/core/memory/agent/services/optimized_llm_service.py index fce1cd76..68919c4a 100644 --- a/api/app/core/memory/agent/services/optimized_llm_service.py +++ b/api/app/core/memory/agent/services/optimized_llm_service.py @@ -162,35 +162,22 @@ class OptimizedLLMService: return fallback_value elif isinstance(fallback_value, dict): return response_model(**fallback_value) - elif isinstance(fallback_value, list): - # 对于 RootModel[List[...]] 类型,直接传入列表 - if hasattr(response_model, 'model_fields') and 'root' in response_model.model_fields: - return response_model(root=fallback_value) - # 或者尝试直接传入(Pydantic v2 的 RootModel 支持) - return response_model(fallback_value) - + # 尝试创建空的响应模型 - # 检查是否是 RootModel 类型(通过检查 __pydantic_root_model__ 属性) - if hasattr(response_model, '__pydantic_root_model__') and response_model.__pydantic_root_model__: - # RootModel类型 - 传入空列表 - logger.debug(f"创建 RootModel 类型的空响应: {response_model.__name__}") + if hasattr(response_model, 'root'): + # RootModel类型 return response_model([]) else: - # 普通BaseModel类型 - 尝试无参数构造 - logger.debug(f"创建普通 BaseModel 类型的空响应: {response_model.__name__}") + # 普通BaseModel类型 return response_model() - + except Exception as e: - logger.error(f"创建降级响应失败: {e}", exc_info=True) + logger.error(f"创建降级响应失败: {e}") # 最后的降级策略 - try: - if hasattr(response_model, '__pydantic_root_model__') and response_model.__pydantic_root_model__: - return response_model([]) - else: - return response_model() - except Exception as final_error: - logger.error(f"最终降级策略也失败: {final_error}") - raise + if hasattr(response_model, 'root'): + return response_model([]) + else: + return response_model() def clear_cache(self): """清理客户端缓存""" diff --git a/api/app/core/memory/agent/utils/prompt/split_verify_prompt.jinja2 b/api/app/core/memory/agent/utils/prompt/split_verify_prompt.jinja2 index d6ad8cab..5d10304a 100644 --- a/api/app/core/memory/agent/utils/prompt/split_verify_prompt.jinja2 +++ b/api/app/core/memory/agent/utils/prompt/split_verify_prompt.jinja2 @@ -42,19 +42,33 @@ 如果状态是TRUE保留这条数据,否则需不需要这条数据 ### 第五步 输出格式 按照json的形式输出 -{"data":"Query":原来Query的字段,"history":原来的history字段, -"expansion_issue":以为列表的形式存储验证之后的数据比如[ -{"query_small": query_small, - "answer_small": answer_small,, - "status": 回答的结果是否符合query_small,填写状态, - "query_answer": answer_small}, +{"query":"原来Query的字段", +"history":"原来的history字段", +"expansion_issue":以列表的形式存储验证之后的数据比如[ { - "query_small": "张曼婷生日是什么时候?", - "answer_small": "张曼婷喜欢绘画。", - "status": "True", - "query_answer": "张曼 婷喜欢绘画。" - },{}......] -, - "split_result":如果expansion_issue是空的列表返回failed,不是空列表返回success, - "reason": 为以上分析完之后的结果给一个说明 - } \ No newline at end of file + "query_small": "子问题", + "answer_small": "子问题的回答", + "status": "True或False,表示回答是否符合query_small", + "query_answer": "问题的答案(与answer_small相同)" +}, +{ + "query_small": "张曼婷生日是什么时候?", + "answer_small": "张曼婷喜欢绘画。", + "status": "False", + "query_answer": "张曼婷喜欢绘画。" +} +], +"split_result":"如果expansion_issue是空的列表返回failed,不是空列表返回success", +"reason": "为以上分析完之后的结果给一个说明" +} + +**输出格式要求** +**CRITICAL JSON FORMATTING REQUIREMENTS:** +1. Use only standard ASCII double quotes (") for JSON structure - never use Chinese quotation marks ("") or other Unicode quotes +2. If the extracted statement text contains quotation marks, escape them properly using backslashes (\") +3. Ensure all JSON strings are properly closed and comma-separated +4. Do not include line breaks within JSON string values +5. The output language should always be the same as the input language + +**JSON Schema:** +{{ json_schema }} \ No newline at end of file diff --git a/api/app/core/memory/llm_tools/openai_client.py b/api/app/core/memory/llm_tools/openai_client.py index 93c0efd6..dce7b495 100644 --- a/api/app/core/memory/llm_tools/openai_client.py +++ b/api/app/core/memory/llm_tools/openai_client.py @@ -100,41 +100,6 @@ class OpenAIClient(LLMClient): logger.error(f"LLM 调用失败: {e}") raise LLMClientException(f"LLM 调用失败: {e}") from e - async def response(self, messages: List[Dict[str, str]], **kwargs) -> str: - """ - 简单响应接口实现(用于fallback机制) - - Args: - messages: 消息列表 - **kwargs: 额外参数 - - Returns: - LLM 响应文本 - - Raises: - LLMClientException: LLM 调用失败 - """ - try: - template = """{messages}""" - prompt = ChatPromptTemplate.from_template(template) - chain = prompt | self.client - - # 添加 Langfuse 回调(如果可用) - config = {} - if self.langfuse_handler: - config["callbacks"] = [self.langfuse_handler] - - response = await chain.ainvoke({"messages": messages}, config=config) - - # 提取文本内容 - if hasattr(response, "content"): - return str(response.content) - return str(response) - - except Exception as e: - logger.error(f"LLM 调用失败: {e}") - raise LLMClientException(f"LLM 调用失败: {e}") from e - async def response_structured( self, messages: List[Dict[str, str]], @@ -166,206 +131,44 @@ class OpenAIClient(LLMClient): if self.langfuse_handler: config["callbacks"] = [self.langfuse_handler] - template = """{question}""" - prompt = ChatPromptTemplate.from_template(template) - - # 对于 DashScope 等不支持 with_structured_output 的模型,优先使用手动JSON解析 - # 这样可以避免不必要的尝试和错误 - if self.provider: #.lower() == "dashscope" - logger.info("DashScope 模型,直接使用手动JSON解析方法") - try: - # 获取原始响应,添加超时保护 - chain = prompt | self.client - response = await asyncio.wait_for( - chain.ainvoke({"question": question_text}, config=config), - timeout=self.timeout - ) - - # 提取响应文本 - response_text = "" - if hasattr(response, "content"): - response_text = str(response.content) - else: - response_text = str(response) - - logger.debug(f"LLM原始响应长度: {len(response_text)}") - - # 尝试提取JSON内容 - json_text = response_text.strip() - - # 如果响应包含markdown代码块,提取其中的JSON - if "```json" in json_text: - json_text = json_text.split("```json")[1].split("```")[0].strip() - elif "```" in json_text: - json_text = json_text.split("```")[1].split("```")[0].strip() - - # 尝试修复常见的JSON格式问题 - # 1. 移除可能的BOM标记 - json_text = json_text.lstrip('\ufeff') - - # 2. 如果JSON被截断(缺少结尾的 ] 或 }),尝试修复 - if json_text.startswith('[') and not json_text.rstrip().endswith(']'): - logger.warning("检测到JSON数组被截断,尝试修复") - # 找到最后一个完整的对象 - last_complete_brace = json_text.rfind('}') - if last_complete_brace > 0: - json_text = json_text[:last_complete_brace + 1] + ']' - logger.info(f"修复后的JSON长度: {len(json_text)}") - elif json_text.startswith('{') and not json_text.rstrip().endswith('}'): - logger.warning("检测到JSON对象被截断,尝试修复") - # 找到最后一个完整的字段 - last_complete_brace = json_text.rfind('}') - if last_complete_brace > 0: - json_text = json_text[:last_complete_brace + 1] - logger.info(f"修复后的JSON长度: {len(json_text)}") - - # 解析JSON - try: - parsed_dict = json.loads(json_text) - logger.debug(f"JSON解析成功,类型: {type(parsed_dict)}") - - # 如果是列表,记录第一个元素的结构 - if isinstance(parsed_dict, list) and len(parsed_dict) > 0: - logger.debug(f"第一个元素的键: {list(parsed_dict[0].keys()) if isinstance(parsed_dict[0], dict) else 'not a dict'}") - - # 尝试字段映射转换(处理LLM返回格式不匹配的情况) - if isinstance(parsed_dict, list): - transformed_list = [] - for item in parsed_dict: - if isinstance(item, dict): - transformed_item = {} - - # 常见的字段映射规则 - field_mappings = { - 'question': ['extended_question', 'question', 'query'], - 'original_question': ['original_question', 'original', 'source_question'], - 'extended_question': ['extended_question', 'question', 'query', 'extended'], - 'type': ['type', 'category', 'question_type'], - 'reason': ['reason', 'explanation', 'rationale'], - 'query': ['query', 'question', 'text'], - 'split_result': ['split_result', 'result', 'status'], - 'expansion_issue': ['expansion_issue', 'issues', 'expansions'], - } - - # 对于每个期望的字段,尝试从多个可能的源字段中获取 - for target_field, source_fields in field_mappings.items(): - for source_field in source_fields: - if source_field in item: - transformed_item[target_field] = item[source_field] - break - - # 特殊处理:如果只有 'question' 但缺少 'original_question' 和 'extended_question' - if 'question' in item and 'original_question' not in transformed_item: - transformed_item['original_question'] = item['question'] - if 'question' in item and 'extended_question' not in transformed_item: - transformed_item['extended_question'] = item['question'] - - # 保留原始字段(如果没有被映射) - for key, value in item.items(): - if key not in transformed_item: - transformed_item[key] = value - - transformed_list.append(transformed_item) - else: - transformed_list.append(item) - - logger.info(f"字段映射完成,尝试重新验证") - logger.debug(f"转换后的数据: {transformed_list}") - - try: - return response_model.model_validate(transformed_list) - except Exception as retry_error: - logger.error(f"字段映射后仍然验证失败: {retry_error}") - logger.error(f"完整的LLM响应: {response_text}") - logger.error(f"原始解析字典: {parsed_dict}") - logger.error(f"转换后的字典: {transformed_list}") - raise - else: - # 非列表类型,记录并抛出原始错误 - logger.error(f"完整的LLM响应: {response_text}") - logger.error(f"解析后的字典: {parsed_dict}") - raise - except json.JSONDecodeError as je: - logger.error(f"JSON解析失败: {je}") - logger.error(f"问题位置附近的文本: {json_text[max(0, je.pos-50):min(len(json_text), je.pos+50)]}") - - # 尝试更激进的修复:逐行解析,找到有效的JSON部分 - logger.info("尝试逐行解析JSON") - lines = json_text.split('\n') - for i in range(len(lines), 0, -1): - try: - partial_json = '\n'.join(lines[:i]) - if partial_json.startswith('['): - partial_json = partial_json.rstrip().rstrip(',') + ']' - elif partial_json.startswith('{'): - partial_json = partial_json.rstrip().rstrip(',') + '}' - - parsed_dict = json.loads(partial_json) - logger.info(f"成功解析部分JSON(前{i}行)") - return response_model.model_validate(parsed_dict) - except: - continue - - # 如果所有尝试都失败,抛出原始错误 - raise LLMClientException(f"JSON解析失败: {je}") from je - - except asyncio.TimeoutError: - logger.error(f"LLM调用超时({self.timeout}秒)") - raise LLMClientException(f"LLM调用超时({self.timeout}秒)") - except LLMClientException: - raise - except Exception as e: - logger.error(f"手动JSON解析失败: {e}", exc_info=True) - raise LLMClientException(f"手动JSON解析失败: {e}") from e - - - - - - # 方法 1: 使用 PydanticOutputParser(适用于支持的模型) + # 方法 1: 使用 PydanticOutputParser if PydanticOutputParser is not None: try: parser = PydanticOutputParser(pydantic_object=response_model) format_instructions = parser.get_format_instructions() - prompt_with_instructions = ChatPromptTemplate.from_template( + prompt = ChatPromptTemplate.from_template( "{question}\n{format_instructions}" ) - chain = prompt_with_instructions | self.client | parser + chain = prompt | self.client | parser - parsed = await asyncio.wait_for( - chain.ainvoke( - { - "question": question_text, - "format_instructions": format_instructions, - }, - config=config - ), - timeout=self.timeout + parsed = await chain.ainvoke( + { + "question": question_text, + "format_instructions": format_instructions, + }, + config=config ) logger.debug(f"使用 PydanticOutputParser 解析成功") return parsed - except asyncio.TimeoutError: - logger.error(f"PydanticOutputParser 调用超时({self.timeout}秒)") - raise LLMClientException(f"LLM调用超时({self.timeout}秒)") except Exception as e: logger.warning( f"PydanticOutputParser 解析失败,尝试其他方法: {e}" ) - # 方法 2: 使用 LangChain 的 with_structured_output (如果支持) - with_so = getattr(self.client, "with_structured_output", None) - - if callable(with_so): - try: + # 方法 2: 使用 LangChain 的 with_structured_output + template = """{question}""" + prompt = ChatPromptTemplate.from_template(template) + + try: + with_so = getattr(self.client, "with_structured_output", None) + + if callable(with_so): structured_chain = prompt | with_so(response_model, strict=True) - parsed = await asyncio.wait_for( - structured_chain.ainvoke( - {"question": question_text}, - config=config - ), - timeout=self.timeout + parsed = await structured_chain.ainvoke( + {"question": question_text}, + config=config ) # 验证并返回结果 @@ -378,60 +181,14 @@ class OpenAIClient(LLMClient): # 尝试从 JSON 解析 return response_model.model_validate_json(json.dumps(parsed)) - except asyncio.TimeoutError: - logger.error(f"with_structured_output 调用超时({self.timeout}秒)") - raise LLMClientException(f"LLM调用超时({self.timeout}秒)") - except NotImplementedError: - logger.warning( - f"模型 {self.model_name} 不支持 with_structured_output,使用手动JSON解析" - ) - except Exception as e: - logger.warning(f"with_structured_output 失败: {e},尝试手动解析") - - # 方法 3: 手动JSON解析(fallback方法) - logger.info("使用手动JSON解析方法(fallback)") - try: - # 获取原始响应 - chain = prompt | self.client - response = await asyncio.wait_for( - chain.ainvoke({"question": question_text}, config=config), - timeout=self.timeout - ) - - # 提取响应文本 - response_text = "" - if hasattr(response, "content"): - response_text = str(response.content) - else: - response_text = str(response) - - logger.debug(f"LLM原始响应: {response_text[:500]}...") - - # 尝试提取JSON内容 - json_text = response_text.strip() - - # 如果响应包含markdown代码块,提取其中的JSON - if "```json" in json_text: - json_text = json_text.split("```json")[1].split("```")[0].strip() - elif "```" in json_text: - json_text = json_text.split("```")[1].split("```")[0].strip() - - # 解析JSON - parsed_dict = json.loads(json_text) - logger.debug(f"JSON解析成功: {parsed_dict}") - - # 验证并创建Pydantic模型 - return response_model.model_validate(parsed_dict) - - except asyncio.TimeoutError: - logger.error(f"手动JSON解析调用超时({self.timeout}秒)") - raise LLMClientException(f"LLM调用超时({self.timeout}秒)") - except json.JSONDecodeError as je: - logger.error(f"JSON解析失败: {je}, 原始文本: {json_text[:200]}...") - raise LLMClientException(f"JSON解析失败: {je}") from je except Exception as e: - logger.error(f"手动JSON解析失败: {e}") - raise LLMClientException(f"手动JSON解析失败: {e}") from e + logger.error(f"结构化输出失败: {e}") + raise LLMClientException(f"结构化输出失败: {e}") from e + + # 如果所有方法都失败,抛出异常 + raise LLMClientException( + "无法生成结构化输出,所有解析方法均失败" + ) except LLMClientException: raise