feat(models): support reasoning_content streaming

This commit is contained in:
Timebomb2018
2026-04-01 15:47:43 +08:00
parent 9561578a2a
commit 264183cec2
28 changed files with 495 additions and 109 deletions

View File

@@ -14,6 +14,7 @@ from pydantic import BaseModel, Field
from app.core.error_codes import BizCode
from app.core.exceptions import BusinessException
from app.models.models_model import ModelProvider, ModelType
from app.core.models.volcano_chat import VolcanoChatOpenAI
T = TypeVar("T")
@@ -25,6 +26,9 @@ class RedBearModelConfig(BaseModel):
api_key: str
base_url: Optional[str] = None
is_omni: bool = False # 是否为 Omni 模型
deep_thinking: bool = False # 是否启用深度思考模式
thinking_budget_tokens: Optional[int] = None # 深度思考 token 预算
support_thinking: bool = False # 模型是否支持 enable_thinking 参数capability 含 thinking
# 请求超时时间(秒)- 默认120秒以支持复杂的LLM调用，可通过环境变量 LLM_TIMEOUT 配置
timeout: float = Field(default_factory=lambda: float(os.getenv("LLM_TIMEOUT", "120.0")))
# 最大重试次数 - 默认2次以避免过长等待，可通过环境变量 LLM_MAX_RETRIES 配置
@@ -44,7 +48,7 @@ class RedBearModelFactory:
# 打印供应商信息用于调试
from app.core.logging_config import get_business_logger
logger = get_business_logger()
logger.debug(f"获取模型参数 - Provider: {provider}, Model: {config.model_name}, is_omni: {config.is_omni}")
logger.debug(f"获取模型参数 - Provider: {provider}, Model: {config.model_name}, is_omni: {config.is_omni}, deep_thinking: {config.deep_thinking}")
# dashscope 的 omni 模型使用 OpenAI 兼容模式
if provider == ModelProvider.DASHSCOPE and config.is_omni:
@@ -58,7 +62,7 @@ class RedBearModelFactory:
write=60.0,
pool=10.0,
)
params = {
params: Dict[str, Any] = {
"model": config.model_name,
"base_url": config.base_url,
"api_key": config.api_key,
@@ -67,8 +71,19 @@ class RedBearModelFactory:
**config.extra_params
}
# 流式模式下启用 stream_usage 以获取 token 统计
if config.extra_params.get("streaming"):
is_streaming = bool(config.extra_params.get("streaming"))
if is_streaming:
params["stream_usage"] = True
# 只有支持 thinking 的模型才传 enable_thinking
if config.support_thinking:
model_kwargs: Dict[str, Any] = config.extra_params.get("model_kwargs", {})
if is_streaming:
model_kwargs["enable_thinking"] = config.deep_thinking
if config.deep_thinking and config.thinking_budget_tokens:
model_kwargs["thinking_budget"] = config.thinking_budget_tokens
else:
model_kwargs["enable_thinking"] = False
params["model_kwargs"] = model_kwargs
return params
if provider in [ModelProvider.OPENAI, ModelProvider.XINFERENCE, ModelProvider.GPUSTACK, ModelProvider.OLLAMA, ModelProvider.VOLCANO]:
@@ -82,7 +97,7 @@ class RedBearModelFactory:
write=60.0, # 写入超时60秒
pool=10.0, # 连接池超时10秒
)
params = {
params: Dict[str, Any] = {
"model": config.model_name,
"base_url": config.base_url,
"api_key": config.api_key,
@@ -93,17 +108,44 @@ class RedBearModelFactory:
# 流式模式下启用 stream_usage 以获取 token 统计
if config.extra_params.get("streaming"):
params["stream_usage"] = True
# 深度思考模式
is_streaming = bool(config.extra_params.get("streaming"))
if is_streaming:
if provider == ModelProvider.VOLCANO:
# 火山引擎深度思考仅流式调用支持,非流式时不传 thinking 参数
thinking_config: Dict[str, Any] = {
"type": "enabled" if config.deep_thinking else "disabled"
}
if config.deep_thinking and config.thinking_budget_tokens:
thinking_config["budget_tokens"] = config.thinking_budget_tokens
params["extra_body"] = {"thinking": thinking_config}
else:
# 始终显式传递 enable_thinking；不支持该参数的模型（如 DeepSeek-R1）会直接忽略
model_kwargs: Dict[str, Any] = config.extra_params.get("model_kwargs", {})
model_kwargs["enable_thinking"] = config.deep_thinking
if config.deep_thinking and config.thinking_budget_tokens:
model_kwargs["thinking_budget"] = config.thinking_budget_tokens
params["model_kwargs"] = model_kwargs
return params
elif provider == ModelProvider.DASHSCOPE:
# DashScope (通义千问) 使用自己的参数格式
# 注意: DashScopeEmbeddings 不支持 timeout 和 base_url 参数
# 只支持: model, dashscope_api_key, max_retries, client
return {
params = {
"model": config.model_name,
"dashscope_api_key": config.api_key,
"max_retries": config.max_retries,
**config.extra_params
}
# 只有支持 thinking 的模型才传 enable_thinking
if config.support_thinking:
is_streaming = bool(config.extra_params.get("streaming"))
model_kwargs: Dict[str, Any] = config.extra_params.get("model_kwargs", {})
if is_streaming:
model_kwargs["enable_thinking"] = config.deep_thinking
if config.deep_thinking and config.thinking_budget_tokens:
model_kwargs["thinking_budget"] = config.thinking_budget_tokens
else:
model_kwargs["enable_thinking"] = False
params["model_kwargs"] = model_kwargs
return params
elif provider == ModelProvider.BEDROCK:
# Bedrock 使用 AWS 凭证
# api_key 格式: "access_key_id:secret_access_key" 或只是 access_key_id
@@ -142,6 +184,13 @@ class RedBearModelFactory:
elif "region_name" not in params:
params["region_name"] = "us-east-1" # 默认区域
# 深度思考模式（Claude 3.7 Sonnet 等支持思考的模型）
# 通过 additional_model_request_fields 传递 thinking 块；关闭时不传（Bedrock 无 disabled 选项）
if config.deep_thinking:
budget = config.thinking_budget_tokens or 10000
params["additional_model_request_fields"] = {
"thinking": {"type": "enabled", "budget_tokens": budget}
}
return params
else:
raise BusinessException(f"不支持的提供商: {provider}", code=BizCode.PROVIDER_NOT_SUPPORTED)
@@ -168,7 +217,9 @@ def get_provider_llm_class(config: RedBearModelConfig, type: ModelType = ModelTy
# dashscope 的 omni 模型使用 OpenAI 兼容模式
if provider == ModelProvider.DASHSCOPE and config.is_omni:
return ChatOpenAI
if provider in [ModelProvider.OPENAI, ModelProvider.XINFERENCE, ModelProvider.GPUSTACK, ModelProvider.VOLCANO]:
if provider == ModelProvider.VOLCANO:
return VolcanoChatOpenAI
if provider in [ModelProvider.OPENAI, ModelProvider.XINFERENCE, ModelProvider.GPUSTACK]:
if type == ModelType.LLM:
return OpenAI
elif type == ModelType.CHAT: