feat(llm): add json_output support for structured LLM responses

This commit is contained in:
Timebomb2018
2026-04-16 16:27:55 +08:00
parent 5ce0bdb0f5
commit 8c6b65db12
23 changed files with 304 additions and 110 deletions

View File

@@ -1,7 +1,7 @@
from __future__ import annotations
import os
from typing import Any, Dict, Optional, TypeVar
from typing import Any, Dict, List, Optional, TypeVar
from langchain_aws import ChatBedrock
from langchain_community.chat_models import ChatTongyi
@@ -9,7 +9,7 @@ from langchain_core.embeddings import Embeddings
from langchain_core.language_models import BaseLLM
from langchain_ollama import OllamaLLM
from langchain_openai import ChatOpenAI, OpenAI
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, model_validator
from app.core.error_codes import BizCode
from app.core.exceptions import BusinessException
@@ -25,10 +25,11 @@ class RedBearModelConfig(BaseModel):
provider: str
api_key: str
base_url: Optional[str] = None
capability: List[str] = Field(default_factory=list) # 模型能力列表,驱动所有能力开关
is_omni: bool = False # 是否为 Omni 模型
deep_thinking: bool = False # 是否启用深度思考模式
thinking_budget_tokens: Optional[int] = None # 深度思考 token 预算
support_thinking: bool = False # 模型是否支持 enable_thinking 参数capability 含 thinking
json_output: bool = False # 是否强制 JSON 输出
# 请求超时时间(秒)- 默认120秒以支持复杂的LLM调用可通过环境变量 LLM_TIMEOUT 配置
timeout: float = Field(default_factory=lambda: float(os.getenv("LLM_TIMEOUT", "120.0")))
# 最大重试次数 - 默认2次以避免过长等待可通过环境变量 LLM_MAX_RETRIES 配置
@@ -36,6 +37,29 @@ class RedBearModelConfig(BaseModel):
concurrency: int = 5 # 并发限流
extra_params: Dict[str, Any] = {}
@model_validator(mode="after")
def _resolve_capabilities(self) -> "RedBearModelConfig":
from app.core.logging_config import get_business_logger
logger = get_business_logger()
if self.deep_thinking and "thinking" not in self.capability:
logger.warning(
f"模型 {self.model_name} 不支持深度思考capability 中无 'thinking'),已自动关闭 deep_thinking"
)
self.deep_thinking = False
self.thinking_budget_tokens = None
if self.json_output and "json_output" not in self.capability:
logger.warning(
f"模型 {self.model_name} 不支持 JSON 输出capability 中无 'json_output'),已自动关闭 json_output"
)
self.json_output = False
if self.json_output and self.deep_thinking:
logger.warning(
f"模型 {self.model_name} json_output 与 deep_thinking 互斥,已自动关闭 deep_thinking"
)
self.deep_thinking = False
self.thinking_budget_tokens = None
return self
class RedBearModelFactory:
"""模型工厂类"""
@@ -74,17 +98,20 @@ class RedBearModelFactory:
is_streaming = bool(config.extra_params.get("streaming"))
if is_streaming:
params["stream_usage"] = True
# 只有支持 thinking 的模型传 enable_thinking
if config.support_thinking:
model_kwargs: Dict[str, Any] = config.extra_params.get("model_kwargs", {})
if is_streaming:
model_kwargs["enable_thinking"] = config.deep_thinking
if config.deep_thinking:
model_kwargs["incremental_output"] = True
if config.thinking_budget_tokens:
model_kwargs["thinking_budget"] = config.thinking_budget_tokens
else:
model_kwargs["enable_thinking"] = False
# 支持 thinking 的模型始终传 enable_thinking,关闭时显式传 False 避免模型默认开启思考
if "thinking" in config.capability:
extra_body = params.setdefault("extra_body", {})
if config.deep_thinking:
extra_body["enable_thinking"] = False
if is_streaming:
extra_body["enable_thinking"] = True
if config.thinking_budget_tokens:
extra_body["thinking_budget"] = config.thinking_budget_tokens
params["extra_body"] = extra_body
# JSON 输出模式
if config.json_output:
model_kwargs = params.setdefault("model_kwargs", {})
model_kwargs["response_format"] = {"type": "json_object"}
params["model_kwargs"] = model_kwargs
return params
@@ -108,27 +135,30 @@ class RedBearModelFactory:
**config.extra_params
}
# 流式模式下启用 stream_usage 以获取 token 统计
if config.extra_params.get("streaming"):
params["stream_usage"] = True
# 深度思考模式
is_streaming = bool(config.extra_params.get("streaming"))
if config.support_thinking:
if is_streaming and not config.is_omni:
if provider == ModelProvider.VOLCANO:
# 火山引擎深度思考仅流式调用支持,非流式时不传 thinking 参数
thinking_config: Dict[str, Any] = {
"type": "enabled" if config.deep_thinking else "disabled"
}
if config.deep_thinking and config.thinking_budget_tokens:
thinking_config["budget_tokens"] = config.thinking_budget_tokens
params["extra_body"] = {"thinking": thinking_config}
else:
# 始终显式传递 enable_thinking不支持该参数的模型如 DeepSeek-R1会直接忽略
model_kwargs: Dict[str, Any] = config.extra_params.get("model_kwargs", {})
model_kwargs["enable_thinking"] = config.deep_thinking
if config.deep_thinking and config.thinking_budget_tokens:
model_kwargs["thinking_budget"] = config.thinking_budget_tokens
params["model_kwargs"] = model_kwargs
if is_streaming:
params["stream_usage"] = True
# 支持 thinking 的模型始终传 enable_thinking关闭时显式传 False 避免模型默认开启思考
if "thinking" in config.capability:
# VOLCANO 深度思考仅流式支持
if provider == ModelProvider.VOLCANO:
thinking_config: Dict[str, Any] = {"type": "enabled" if config.deep_thinking else "disabled"}
if config.deep_thinking and config.thinking_budget_tokens:
thinking_config["budget_tokens"] = config.thinking_budget_tokens
params["extra_body"] = {"thinking": thinking_config}
else:
extra_body = params.setdefault("extra_body", {})
if config.deep_thinking:
extra_body["enable_thinking"] = False
if is_streaming:
extra_body["enable_thinking"] = True
if config.thinking_budget_tokens:
extra_body["thinking_budget"] = config.thinking_budget_tokens
params["extra_body"] = extra_body
# JSON 输出模式
if config.json_output:
params.setdefault("model_kwargs", {})
params["model_kwargs"]["response_format"] = {"type": "json_object"}
return params
elif provider == ModelProvider.DASHSCOPE:
params = {
@@ -137,18 +167,21 @@ class RedBearModelFactory:
"max_retries": config.max_retries,
**config.extra_params
}
# 只有支持 thinking 的模型传 enable_thinking
if config.support_thinking:
# 支持 thinking 的模型始终传 enable_thinking,关闭时显式传 False 避免模型默认开启思考
if "thinking" in config.capability:
is_streaming = bool(config.extra_params.get("streaming"))
model_kwargs: Dict[str, Any] = config.extra_params.get("model_kwargs", {})
if is_streaming:
model_kwargs["enable_thinking"] = config.deep_thinking
if config.deep_thinking:
model_kwargs["incremental_output"] = True
if config.thinking_budget_tokens:
model_kwargs["thinking_budget"] = config.thinking_budget_tokens
else:
model_kwargs = params.setdefault("model_kwargs", {})
if config.deep_thinking:
model_kwargs["enable_thinking"] = False
if is_streaming:
model_kwargs["enable_thinking"] = True
model_kwargs["incremental_output"] = True
if config.thinking_budget_tokens:
model_kwargs["thinking_budget"] = config.thinking_budget_tokens
params["model_kwargs"] = model_kwargs
if config.json_output:
model_kwargs = params.setdefault("model_kwargs", {})
model_kwargs["response_format"] = {"type": "json_object"}
params["model_kwargs"] = model_kwargs
return params
elif provider == ModelProvider.BEDROCK:
@@ -196,6 +229,10 @@ class RedBearModelFactory:
params["additional_model_request_fields"] = {
"thinking": {"type": "enabled", "budget_tokens": budget}
}
# JSON 输出模式
if config.json_output:
params.setdefault("model_kwargs", {})
params["model_kwargs"]["response_format"] = {"type": "json_object"}
return params
else:
raise BusinessException(f"不支持的提供商: {provider}", code=BizCode.PROVIDER_NOT_SUPPORTED)