feat(model and app):

1. Increase support for visual models and multimodal models;
2. The application and workflow can input various multimodal files such as images, documents, audio, and videos.
This commit is contained in:
Timebomb2018
2026-03-05 09:55:54 +08:00
parent 23bfdcefef
commit 590ec3a446
26 changed files with 958 additions and 233 deletions

View File

@@ -9,47 +9,100 @@
- OpenAI: 支持 URL 和 base64 格式
"""
import uuid
from typing import List, Dict, Any, Optional, Protocol
import httpx
import base64
from typing import List, Dict, Any, Optional
from abc import ABC, abstractmethod
from sqlalchemy.orm import Session
from docx import Document
import io
import PyPDF2
from app.core.logging_config import get_business_logger
from app.core.exceptions import BusinessException
from app.core.error_codes import BizCode
from app.schemas.app_schema import FileInput, FileType, TransferMethod
from app.models.generic_file_model import GenericFile
from app.models.file_metadata_model import FileMetadata
from app.core.config import settings
from app.services.audio_transcription_service import AudioTranscriptionService
logger = get_business_logger()
class ImageFormatStrategy(Protocol):
"""图片格式策略接口"""
class MultimodalFormatStrategy(ABC):
"""多模态格式策略基类"""
@abstractmethod
async def format_image(self, url: str) -> Dict[str, Any]:
"""格式化图片"""
pass
@abstractmethod
async def format_document(self, file_name: str, text: str) -> Dict[str, Any]:
"""格式化文档"""
pass
@abstractmethod
async def format_audio(self, file_type: str, url: str) -> Dict[str, Any]:
"""格式化音频"""
pass
@abstractmethod
async def format_video(self, url: str) -> Dict[str, Any]:
"""格式化视频"""
pass
class DashScopeFormatStrategy(MultimodalFormatStrategy):
"""通义千问策略"""
async def format_image(self, url: str) -> Dict[str, Any]:
"""将图片 URL 转换为特定 provider 的格式"""
...
class DashScopeImageStrategy:
"""通义千问图片格式策略"""
async def format_image(self, url: str) -> Dict[str, Any]:
"""通义千问格式: {"type": "image", "image": "url"}"""
"""通义千问图片格式:{"type": "image", "image": "url"}"""
return {
"type": "image",
"image": url
}
async def format_document(self, file_name: str, text: str) -> Dict[str, Any]:
"""通义千问文档格式"""
return {
"type": "text",
"text": f"<document name=\"{file_name}\">\n{text}\n</document>"
}
class BedrockImageStrategy:
"""Bedrock/Anthropic 图片格式策略"""
async def format_audio(self, file_type: str, url: str, transcription: Optional[str] = None) -> Dict[str, Any]:
"""
通义千问音频格式
- 原生支持: qwen-audio 系列
- 其他模型: 需要转录为文本
"""
if transcription:
return {
"type": "text",
"text": f"<audio url=\"{url}\">\n{transcription}\n</audio>"
}
# 通义千问音频格式:{"type": "audio", "audio": "url"}
return {
"type": "audio",
"audio": url
}
async def format_video(self, url: str) -> Dict[str, Any]:
"""通义千问视频格式qwen-vl 系列原生支持)"""
return {
"type": "video",
"video": url
}
class BedrockFormatStrategy(MultimodalFormatStrategy):
"""Bedrock/Anthropic 策略"""
async def format_image(self, url: str) -> Dict[str, Any]:
"""
Bedrock/Anthropic 格式: base64 编码
{"type": "image", "source": {"type": "base64", "media_type": "...", "data": "..."}}
"""
import httpx
import base64
from mimetypes import guess_type
logger.info(f"下载并编码图片: {url}")
@@ -84,9 +137,46 @@ class BedrockImageStrategy:
}
}
async def format_document(self, file_name: str, text: str) -> Dict[str, Any]:
"""Bedrock/Anthropic 文档格式(需要 base64 编码)"""
# Bedrock 文档需要 base64 编码
text_bytes = text.encode('utf-8')
base64_text = base64.b64encode(text_bytes).decode('utf-8')
class OpenAIImageStrategy:
"""OpenAI 图片格式策略"""
return {
"type": "document",
"source": {
"type": "base64",
"media_type": "text/plain",
"data": base64_text
}
}
async def format_audio(self, file_type: str, url: str, transcription: Optional[str] = None) -> Dict[str, Any]:
"""
Bedrock/Anthropic 音频格式
不支持原生音频,必须转录为文本
"""
if transcription:
return {
"type": "text",
"text": f"[音频转录]\n{transcription}"
}
return {
"type": "text",
"text": "[音频文件Bedrock 不支持原生音频,请启用音频转文本功能]"
}
async def format_video(self, url: str) -> Dict[str, Any]:
"""Bedrock/Anthropic 视频格式"""
return {
"type": "text",
"text": f"<video url=\"{url}\">\n[视频文件,当前 provider 暂不支持]\n</video>"
}
class OpenAIFormatStrategy(MultimodalFormatStrategy):
"""OpenAI 策略"""
async def format_image(self, url: str) -> Dict[str, Any]:
"""OpenAI 格式: {"type": "image_url", "image_url": {"url": "..."}}"""
@@ -97,29 +187,97 @@ class OpenAIImageStrategy:
}
}
async def format_document(self, file_name: str, text: str) -> Dict[str, Any]:
"""OpenAI 文档格式"""
return {
"type": "text",
"text": f"<document name=\"{file_name}\">\n{text}\n</document>"
}
async def format_audio(self, file_type: str, url: str, transcription: Optional[str] = None) -> Dict[str, Any]:
"""
OpenAI 音频格式
- gpt-4o-audio 系列支持原生音频(需要 base64 编码)
- 其他模型使用转录文本
"""
if transcription:
return {
"type": "text",
"text": f"<audio url=\"{url}\">\n{transcription}\n</audio>"
}
# OpenAI 音频需要 base64 编码
try:
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(url)
response.raise_for_status()
audio_data = response.content
base64_audio = base64.b64encode(audio_data).decode('utf-8')
# 1. 优先从 file_type (MIME) 取扩展名
file_ext = file_type.split('/')[-1] if file_type and '/' in file_type else None
# 2. 从响应头 content-type 取
if not file_ext:
ct = response.headers.get("content-type", "")
file_ext = ct.split('/')[-1].split(';')[0].strip() if '/' in ct else None
# 3. 从 URL 路径取扩展名
if not file_ext:
file_ext = url.split('?')[0].rsplit('.', 1)[-1].lower() or None
# 4. 默认 wav
# supported_ext = {"wav", "mp3", "mp4", "ogg", "flac", "webm", "m4a", "wave", "x-m4a"}
file_ext = "wav" if not file_ext else file_ext
return {
"type": "input_audio",
"input_audio": {
"data": f"data:;base64,{base64_audio}",
"format": file_ext
}
}
except Exception as e:
logger.error(f"下载音频失败: {e}")
return {
"type": "text",
"text": f"[音频处理失败: {str(e)}]"
}
async def format_video(self, url: str) -> Dict[str, Any]:
"""OpenAI 视频格式"""
return {
"type": "video_url",
"video_url": {
"url": url
}
}
# Provider 到策略的映射
PROVIDER_STRATEGIES = {
"dashscope": DashScopeImageStrategy,
"bedrock": BedrockImageStrategy,
"anthropic": BedrockImageStrategy,
"openai": OpenAIImageStrategy,
"dashscope": DashScopeFormatStrategy,
"bedrock": BedrockFormatStrategy,
"anthropic": BedrockFormatStrategy,
"openai": OpenAIFormatStrategy,
}
class MultimodalService:
"""多模态文件处理服务"""
def __init__(self, db: Session, provider: str = "dashscope"):
def __init__(self, db: Session, provider: str = "dashscope", api_key: Optional[str] = None, enable_audio_transcription: bool = False, is_omni: bool = False):
"""
初始化多模态服务
Args:
db: 数据库会话
provider: 模型提供商dashscope, bedrock, anthropic 等)
provider: 模型提供商dashscope, bedrock, anthropic, openai 等)
api_key: API 密钥(用于音频转文本)
enable_audio_transcription: 是否启用音频转文本
is_omni: 是否为 Omni 模型dashscope 的 omni 模型需要使用 OpenAI 兼容格式)
"""
self.db = db
self.provider = provider.lower()
self.api_key = api_key
self.enable_audio_transcription = enable_audio_transcription
self.is_omni = is_omni
async def process_files(
self,
@@ -137,20 +295,32 @@ class MultimodalService:
if not files:
return []
# 获取对应的策略
# dashscope 的 omni 模型使用 OpenAI 兼容格式
if self.provider == "dashscope" and self.is_omni:
strategy_class = OpenAIFormatStrategy
else:
strategy_class = PROVIDER_STRATEGIES.get(self.provider)
if not strategy_class:
logger.warning(f"未找到 provider '{self.provider}' 的策略,使用默认策略")
strategy_class = DashScopeFormatStrategy
strategy = strategy_class()
result = []
for idx, file in enumerate(files):
try:
if file.type == FileType.IMAGE:
content = await self._process_image(file)
content = await self._process_image(file, strategy)
result.append(content)
elif file.type == FileType.DOCUMENT:
content = await self._process_document(file)
content = await self._process_document(file, strategy)
result.append(content)
elif file.type == FileType.AUDIO:
content = await self._process_audio(file)
content = await self._process_audio(file, strategy)
result.append(content)
elif file.type == FileType.VIDEO:
content = await self._process_video(file)
content = await self._process_video(file, strategy)
result.append(content)
else:
logger.warning(f"不支持的文件类型: {file.type}")
@@ -172,55 +342,29 @@ class MultimodalService:
logger.info(f"成功处理 {len(result)}/{len(files)} 个文件provider={self.provider}")
return result
async def _process_image(self, file: FileInput) -> Dict[str, Any]:
async def _process_image(self, file: FileInput, strategy) -> Dict[str, Any]:
"""
处理图片文件
Args:
file: 图片文件输入
strategy: 格式化策略
Returns:
Dict: 根据 provider 返回不同格式
- Anthropic/Bedrock: {"type": "image", "source": {"type": "base64", "media_type": "...", "data": "..."}}
- 通义千问: {"type": "image", "image": "url"}
Dict: 根据 provider 返回不同格式的图片内容
"""
url = await self.get_file_url(file)
logger.debug(f"处理图片: {url}, provider={self.provider}")
# 根据 provider 返回不同格式
if self.provider in ["bedrock", "anthropic"]:
# Anthropic/Bedrock 只支持 base64 格式,需要下载并转换
try:
logger.info(f"开始下载并编码图片: {url}")
base64_data, media_type = await self._download_and_encode_image(url)
result = {
"type": "image",
"source": {
"type": "base64",
"media_type": media_type,
"data": base64_data[:100] + "..." # 只记录前100个字符
}
}
logger.info(f"图片编码完成: media_type={media_type}, data_length={len(base64_data)}")
# 返回完整数据
result["source"]["data"] = base64_data
return result
except Exception as e:
logger.error(f"下载并编码图片失败: {e}", exc_info=True)
# 返回错误提示
return {
"type": "text",
"text": f"[图片加载失败: {str(e)}]"
}
else:
# 通义千问等其他格式支持 URL
try:
url = await self.get_file_url(file)
return await strategy.format_image(url)
except Exception as e:
logger.error(f"处理图片失败: {e}", exc_info=True)
return {
"type": "image",
"image": url
"type": "text",
"text": f"[图片处理失败: {str(e)}]"
}
async def _download_and_encode_image(self, url: str) -> tuple[str, str]:
@staticmethod
async def _download_and_encode_image(url: str) -> tuple[str, str]:
"""
下载图片并转换为 base64
@@ -230,8 +374,6 @@ class MultimodalService:
Returns:
tuple: (base64_data, media_type)
"""
import httpx
import base64
from mimetypes import guess_type
# 下载图片
@@ -258,15 +400,16 @@ class MultimodalService:
return base64_data, media_type
async def _process_document(self, file: FileInput) -> Dict[str, Any]:
async def _process_document(self, file: FileInput, strategy) -> Dict[str, Any]:
"""
处理文档文件PDF、Word 等)
Args:
file: 文档文件输入
strategy: 格式化策略
Returns:
Dict: text 格式的内容(包含提取的文本)
Dict: 根据 provider 返回不同格式的文档内容
"""
if file.transfer_method == TransferMethod.REMOTE_URL:
# 远程文档暂不支持提取
@@ -277,48 +420,68 @@ class MultimodalService:
else:
# 本地文件,提取文本内容
text = await self._extract_document_text(file.upload_file_id)
generic_file = self.db.query(GenericFile).filter(
GenericFile.id == file.upload_file_id
file_metadata = self.db.query(FileMetadata).filter(
FileMetadata.id == file.upload_file_id
).first()
file_name = generic_file.file_name if generic_file else "unknown"
file_name = file_metadata.file_name if file_metadata else "unknown"
return {
"type": "text",
"text": f"<document name=\"{file_name}\">\n{text}\n</document>"
}
# 使用策略格式化文档
return await strategy.format_document(file_name, text)
async def _process_audio(self, file: FileInput) -> Dict[str, Any]:
async def _process_audio(self, file: FileInput, strategy) -> Dict[str, Any]:
"""
处理音频文件
Args:
file: 音频文件输入
strategy: 格式化策略
Returns:
Dict: 音频内容(暂时返回占位符)
Dict: 根据 provider 返回不同格式的音频内容
"""
# TODO: 实现音频转文字功能
return {
"type": "text",
"text": "[音频文件,暂不支持处理]"
}
try:
url = await self.get_file_url(file)
async def _process_video(self, file: FileInput) -> Dict[str, Any]:
# 如果启用音频转文本且有 API Key
transcription = None
if self.enable_audio_transcription and self.api_key:
logger.info(f"开始音频转文本: {url}")
if self.provider == "dashscope":
transcription = await AudioTranscriptionService.transcribe_dashscope(url, self.api_key)
elif self.provider == "openai":
transcription = await AudioTranscriptionService.transcribe_openai(url, self.api_key)
else:
logger.warning(f"Provider {self.provider} 不支持音频转文本")
return await strategy.format_audio(file.file_type, url, transcription)
except Exception as e:
logger.error(f"处理音频失败: {e}", exc_info=True)
return {
"type": "text",
"text": f"[音频处理失败: {str(e)}]"
}
async def _process_video(self, file: FileInput, strategy) -> Dict[str, Any]:
"""
处理视频文件
Args:
file: 视频文件输入
strategy: 格式化策略
Returns:
Dict: 视频内容(暂时返回占位符)
Dict: 根据 provider 返回不同格式的视频内容
"""
# TODO: 实现视频处理功能
return {
"type": "text",
"text": "[视频文件,暂不支持处理]"
}
try:
url = await self.get_file_url(file)
return await strategy.format_video(url)
except Exception as e:
logger.error(f"处理视频失败: {e}", exc_info=True)
return {
"type": "text",
"text": f"[视频处理失败: {str(e)}]"
}
async def get_file_url(self, file: FileInput) -> str:
"""
@@ -336,26 +499,22 @@ class MultimodalService:
if file.transfer_method == TransferMethod.REMOTE_URL:
return file.url
else:
# 本地文件,通过 file_storage 系统获取永久访问 URL
from app.models.file_metadata_model import FileMetadata
from app.core.config import settings
file_id = file.upload_file_id
print("="*50)
print("file_id",file_id)
# 查询 FileMetadata
file_metadata = self.db.query(FileMetadata).filter(
FileMetadata.id == file_id,
FileMetadata.status == "completed"
).first()
if not file_metadata:
raise BusinessException(
f"文件不存在或已删除: {file_id}",
BizCode.NOT_FOUND
)
# 返回永久URL
server_url = settings.FILE_LOCAL_SERVER_URL
return f"{server_url}/storage/permanent/{file_id}"
@@ -370,58 +529,79 @@ class MultimodalService:
Returns:
str: 提取的文本内容
"""
generic_file = self.db.query(GenericFile).filter(
GenericFile.id == file_id,
GenericFile.status == "active"
file_metadata = self.db.query(FileMetadata).filter(
FileMetadata.id == file_id,
FileMetadata.status == "completed"
).first()
if not generic_file:
if not file_metadata:
raise BusinessException(
f"文件不存在或已删除: {file_id}",
BizCode.NOT_FOUND
)
# TODO: 根据文件类型提取文本
# - PDF: 使用 PyPDF2 或 pdfplumber
# - Word: 使用 python-docx
# - TXT/MD: 直接读取
file_ext = generic_file.file_ext.lower()
file_ext = file_metadata.file_ext.lower()
server_url = settings.FILE_LOCAL_SERVER_URL
file_url = f"{server_url}/storage/permanent/{file_id}"
if file_ext in ['.txt', '.md', '.markdown']:
return await self._read_text_file(generic_file.storage_path)
return await self._read_text_file(file_url)
elif file_ext == '.pdf':
return await self._extract_pdf_text(generic_file.storage_path)
return await self._extract_pdf_text(file_url)
elif file_ext in ['.doc', '.docx']:
return await self._extract_word_text(generic_file.storage_path)
return await self._extract_word_text(file_url)
else:
return f"[不支持的文档格式: {file_ext}]"
async def _read_text_file(self, storage_path: str) -> str:
@staticmethod
async def _read_text_file(file_url: str) -> str:
"""读取纯文本文件"""
try:
with open(storage_path, 'r', encoding='utf-8') as f:
return f.read()
# 下载文件
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(file_url)
response.raise_for_status()
return response.text
except Exception as e:
logger.error(f"读取文本文件失败: {e}")
return f"[文件读取失败: {str(e)}]"
async def _extract_pdf_text(self, storage_path: str) -> str:
@staticmethod
async def _extract_pdf_text(file_url: str) -> str:
"""提取 PDF 文本"""
try:
# TODO: 实现 PDF 文本提取
# import PyPDF2 或 pdfplumber
return "[PDF 文本提取功能待实现]"
# 下载 PDF 文
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(file_url)
response.raise_for_status()
pdf_data = response.content
# 使用 BytesIO 读取 PDF
text_parts = []
pdf_file = io.BytesIO(pdf_data)
pdf_reader = PyPDF2.PdfReader(pdf_file)
for page in pdf_reader.pages:
text_parts.append(page.extract_text())
return '\n'.join(text_parts)
except Exception as e:
logger.error(f"提取 PDF 文本失败: {e}")
return f"[PDF 提取失败: {str(e)}]"
async def _extract_word_text(self, storage_path: str) -> str:
@staticmethod
async def _extract_word_text(file_url: str) -> str:
"""提取 Word 文档文本"""
try:
# TODO: 实现 Word 文本提取
# import docx
return "[Word 文本提取功能待实现]"
# 下载 Word 文
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(file_url)
response.raise_for_status()
word_data = response.content
# 使用 BytesIO 读取 Word 文档
word_file = io.BytesIO(word_data)
doc = Document(word_file)
text_parts = [paragraph.text for paragraph in doc.paragraphs]
return '\n'.join(text_parts)
except Exception as e:
logger.error(f"提取 Word 文本失败: {e}")
return f"[Word 提取失败: {str(e)}]"