Merge branch 'feature/multimodal' into develop

This commit is contained in:
Mark
2026-02-03 12:07:49 +08:00
16 changed files with 2580 additions and 1618 deletions

View File

@@ -3,7 +3,7 @@ import asyncio
import json
import time
import uuid
from typing import Optional, Dict, Any, AsyncGenerator, Annotated
from typing import Optional, Dict, Any, AsyncGenerator, Annotated, List
from fastapi import Depends
from sqlalchemy.orm import Session
@@ -15,6 +15,7 @@ from app.core.logging_config import get_business_logger
from app.db import get_db, get_db_context
from app.models import MultiAgentConfig, AgentConfig, WorkflowConfig
from app.schemas import DraftRunRequest
from app.schemas.app_schema import FileInput
from app.services.tool_service import ToolService
from app.repositories.tool_repository import ToolRepository
from app.db import get_db
@@ -26,6 +27,7 @@ from app.services.draft_run_service import create_web_search_tool
from app.services.model_service import ModelApiKeyService
from app.services.multi_agent_orchestrator import MultiAgentOrchestrator
from app.services.workflow_service import WorkflowService
from app.services.multimodal_service import MultimodalService
logger = get_business_logger()
@@ -48,7 +50,8 @@ class AppChatService:
memory: bool = True,
storage_type: Optional[str] = None,
user_rag_memory_id: Optional[str] = None,
workspace_id: Optional[str] = None
workspace_id: Optional[str] = None,
files: Optional[List[FileInput]] = None # 新增:多模态文件
) -> Dict[str, Any]:
"""聊天(非流式)"""
@@ -155,7 +158,14 @@ class AppChatService:
for msg in messages
]
# 调用 Agent
# 处理多模态文件
processed_files = None
if files:
multimodal_service = MultimodalService(self.db)
processed_files = await multimodal_service.process_files(files)
logger.info(f"处理了 {len(processed_files)} 个文件")
# 调用 Agent支持多模态
result = await agent.chat(
message=message,
history=history,
@@ -164,7 +174,8 @@ class AppChatService:
storage_type=storage_type,
user_rag_memory_id=user_rag_memory_id,
config_id=config_id,
memory_flag=memory_flag
memory_flag=memory_flag,
files=processed_files # 传递处理后的文件
)
# 保存消息
@@ -206,6 +217,7 @@ class AppChatService:
storage_type: Optional[str] = None,
user_rag_memory_id: Optional[str] = None,
workspace_id: Optional[str] = None,
files: Optional[List[FileInput]] = None # 新增:多模态文件
) -> AsyncGenerator[str, None]:
"""聊天(流式)"""
@@ -312,10 +324,17 @@ class AppChatService:
for msg in messages
]
# 处理多模态文件
processed_files = None
if files:
multimodal_service = MultimodalService(self.db)
processed_files = await multimodal_service.process_files(files)
logger.info(f"处理了 {len(processed_files)} 个文件")
# 发送开始事件
yield f"event: start\ndata: {json.dumps({'conversation_id': str(conversation_id)}, ensure_ascii=False)}\n\n"
# 流式调用 Agent
# 流式调用 Agent(支持多模态)
full_content = ""
total_tokens = 0
async for chunk in agent.chat_stream(
@@ -326,7 +345,8 @@ class AppChatService:
storage_type=storage_type,
user_rag_memory_id=user_rag_memory_id,
config_id=config_id,
memory_flag=memory_flag
memory_flag=memory_flag,
files=processed_files # 传递处理后的文件
):
if isinstance(chunk, int):
total_tokens = chunk

View File

@@ -19,11 +19,13 @@ from app.models import AgentConfig, ModelApiKey, ModelConfig
from app.repositories.model_repository import ModelApiKeyRepository
from app.repositories.tool_repository import ToolRepository
from app.schemas.prompt_schema import PromptMessageRole, render_prompt_message
from app.schemas.app_schema import FileInput
from app.services import task_service
from app.services.langchain_tool_server import Search
from app.services.memory_agent_service import MemoryAgentService
from app.services.model_parameter_merger import ModelParameterMerger
from app.services.tool_service import ToolService
from app.services.multimodal_service import MultimodalService
from langchain.tools import tool
from pydantic import BaseModel, Field
from sqlalchemy import select
@@ -62,26 +64,23 @@ def create_long_term_memory_tool(memory_config: Dict[str, Any], end_user_id: str
@tool(args_schema=LongTermMemoryInput)
def long_term_memory(question: str) -> str:
"""
从用户的历史记忆中检索相关信息。这是一个强大的工具,可以帮助你了解用户的背景、偏好和历史对话内容。
从用户的历史记忆中检索相关信息。用于了解用户的背景、偏好和历史对话内容。
以下场景不需要使用此工具:
1. 情绪/社交问候场景(如"你好""谢谢""再见"等简单寒暄
2. 纯任务性场景(如"帮我写代码""翻译这段文字"等不需要历史上下文的任务
3. 处理外部内容时如用户提供的文本、代码、RAG数据等这些内容本身已经包含所需信息
**何时使用此工具:**
- 用户明确询问历史信息(如"我之前说过什么""上次我们聊了什么"
- 用户询问个人信息或偏好(如"我喜欢什么""我的习惯是什么"
- 需要基于历史上下文提供个性化建议
除上述场景外的所有其他情况都应该使用此工具,特别是:
- 用户询问个人信息或历史对话内容
- 需要了解用户偏好、习惯或背景
- 用户提到"之前""上次""记得"等涉及历史的词汇
- 需要个性化回复或基于历史上下文的建议
- 用户询问关于自己的任何信息
**何时不使用此工具:**
- 简单问候(如"你好""谢谢""再见"
- 纯任务性请求(如"写代码""翻译文字""分析图片"
- 用户已提供完整信息(如提供了文本、图片、文档等内容)
- 创作性任务(如"写诗""编故事""创作谜语"
**重要:如果用户的问题可以直接回答,不要调用此工具。只在确实需要历史信息时才使用。**
需要对question改写/优化:
需要重点关注一以下几点
- 相关的关键词,保持原问题的核心语义不变, 根据上下文,使问题更具体、更清晰,将模糊的表达转换为明确的搜索词
- 使用同义词或相关术语扩展查询
Args:
question: question改写之后的内容
question: 需要检索的问题(保持原问题的核心语义,使用清晰的关键词)
Returns:
检索到的历史记忆内容
@@ -124,6 +123,10 @@ def create_long_term_memory_tool(memory_config: Dict[str, Any], end_user_id: str
}
)
# 检查是否有有效内容
if not memory_content or str(memory_content).strip() == "" or "answer" in str(memory_content) and str(memory_content).count("''") > 0:
return "未找到相关的历史记忆。请直接回答用户的问题,不要再次调用此工具。"
return f"检索到以下历史记忆:\n\n{memory_content}"
except Exception as e:
logger.error("长期记忆检索失败", extra={"error": str(e), "error_type": type(e).__name__})
@@ -246,7 +249,8 @@ class DraftRunService:
user_rag_memory_id: Optional[str] = None,
web_search: bool = True,
memory: bool = True,
sub_agent: bool = False
sub_agent: bool = False,
files: Optional[List[FileInput]] = None # 新增:多模态文件
) -> Dict[str, Any]:
"""执行试运行(使用 LangChain Agent
@@ -406,7 +410,16 @@ class DraftRunService:
max_history=agent_config.memory.get("max_history", 10)
)
# 6. 知识库检索
# 6. 处理多模态文件
processed_files = None
if files:
# 获取 provider 信息
provider = api_key_config.get("provider", "openai")
multimodal_service = MultimodalService(self.db, provider=provider)
processed_files = await multimodal_service.process_files(files)
logger.info(f"处理了 {len(processed_files)} 个文件provider={provider}")
# 7. 知识库检索
context = None
logger.debug(
@@ -414,14 +427,15 @@ class DraftRunService:
extra={
"model": api_key_config["model_name"],
"has_history": bool(history),
"has_context": bool(context)
"has_context": bool(context),
"has_files": bool(processed_files)
}
)
memory_config_= agent_config.memory
config_id = memory_config_.get("memory_content") or memory_config_.get("memory_config",None)
# 7. 调用 Agent
# 8. 调用 Agent(支持多模态)
result = await agent.chat(
message=message,
history=history,
@@ -430,12 +444,13 @@ class DraftRunService:
config_id=config_id,
storage_type=storage_type,
user_rag_memory_id=user_rag_memory_id,
memory_flag=memory_flag
memory_flag=memory_flag,
files=processed_files # 传递处理后的文件
)
elapsed_time = time.time() - start_time
# 8. 保存会话消息
# 9. 保存会话消息
if not sub_agent and agent_config.memory and agent_config.memory.get("enabled"):
await self._save_conversation_message(
conversation_id=conversation_id,
@@ -493,7 +508,8 @@ class DraftRunService:
user_rag_memory_id: Optional[str] = None,
web_search: bool = True, # 布尔类型默认值
memory: bool = True, # 布尔类型默认值
sub_agent: bool = False # 是否是作为子Agent运行
sub_agent: bool = False, # 是否是作为子Agent运行
files: Optional[List[FileInput]] = None # 新增:多模态文件
) -> AsyncGenerator[str, None]:
"""执行试运行(流式返回,使用 LangChain Agent
@@ -642,6 +658,15 @@ class DraftRunService:
max_history=agent_config.memory.get("max_history", 10)
)
# 6. 处理多模态文件
processed_files = None
if files:
# 获取 provider 信息
provider = api_key_config.get("provider", "openai")
multimodal_service = MultimodalService(self.db, provider=provider)
processed_files = await multimodal_service.process_files(files)
logger.info(f"处理了 {len(processed_files)} 个文件provider={provider}")
# 7. 知识库检索
context = None
@@ -654,7 +679,7 @@ class DraftRunService:
memory_config_ = agent_config.memory
config_id = memory_config_.get("memory_content") or memory_config_.get("memory_config",None)
# 9. 流式调用 Agent
# 9. 流式调用 Agent(支持多模态)
full_content = ""
total_tokens = 0
async for chunk in agent.chat_stream(
@@ -665,7 +690,8 @@ class DraftRunService:
config_id=config_id,
storage_type=storage_type,
user_rag_memory_id=user_rag_memory_id,
memory_flag=memory_flag
memory_flag=memory_flag,
files=processed_files # 传递处理后的文件
):
if isinstance(chunk, int):
total_tokens = chunk

View File

@@ -0,0 +1,429 @@
"""
多模态文件处理服务
处理图片、文档等多模态文件,转换为 LLM 可用的格式
支持的 Provider:
- DashScope (通义千问): 支持 URL 格式
- Bedrock/Anthropic: 仅支持 base64 格式
- OpenAI: 支持 URL 和 base64 格式
"""
import uuid
from typing import List, Dict, Any, Optional, Protocol
from sqlalchemy.orm import Session
from app.core.logging_config import get_business_logger
from app.core.exceptions import BusinessException
from app.core.error_codes import BizCode
from app.schemas.app_schema import FileInput, FileType, TransferMethod
from app.models.generic_file_model import GenericFile
logger = get_business_logger()
class ImageFormatStrategy(Protocol):
"""图片格式策略接口"""
async def format_image(self, url: str) -> Dict[str, Any]:
"""将图片 URL 转换为特定 provider 的格式"""
...
class DashScopeImageStrategy:
"""通义千问图片格式策略"""
async def format_image(self, url: str) -> Dict[str, Any]:
"""通义千问格式: {"type": "image", "image": "url"}"""
return {
"type": "image",
"image": url
}
class BedrockImageStrategy:
"""Bedrock/Anthropic 图片格式策略"""
async def format_image(self, url: str) -> Dict[str, Any]:
"""
Bedrock/Anthropic 格式: base64 编码
{"type": "image", "source": {"type": "base64", "media_type": "...", "data": "..."}}
"""
import httpx
import base64
from mimetypes import guess_type
logger.info(f"下载并编码图片: {url}")
# 下载图片
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(url)
response.raise_for_status()
# 获取图片数据
image_data = response.content
# 确定 media type
content_type = response.headers.get("content-type")
if content_type and content_type.startswith("image/"):
media_type = content_type
else:
guessed_type, _ = guess_type(url)
media_type = guessed_type if guessed_type and guessed_type.startswith("image/") else "image/jpeg"
# 转换为 base64
base64_data = base64.b64encode(image_data).decode("utf-8")
logger.info(f"图片编码完成: media_type={media_type}, size={len(base64_data)}")
return {
"type": "image",
"source": {
"type": "base64",
"media_type": media_type,
"data": base64_data
}
}
class OpenAIImageStrategy:
"""OpenAI 图片格式策略"""
async def format_image(self, url: str) -> Dict[str, Any]:
"""OpenAI 格式: {"type": "image_url", "image_url": {"url": "..."}}"""
return {
"type": "image_url",
"image_url": {
"url": url
}
}
# Provider 到策略的映射
PROVIDER_STRATEGIES = {
"dashscope": DashScopeImageStrategy,
"bedrock": BedrockImageStrategy,
"anthropic": BedrockImageStrategy,
"openai": OpenAIImageStrategy,
}
class MultimodalService:
"""多模态文件处理服务"""
def __init__(self, db: Session, provider: str = "dashscope"):
"""
初始化多模态服务
Args:
db: 数据库会话
provider: 模型提供商dashscope, bedrock, anthropic 等)
"""
self.db = db
self.provider = provider.lower()
async def process_files(
self,
files: Optional[List[FileInput]]
) -> List[Dict[str, Any]]:
"""
处理文件列表,返回 LLM 可用的格式
Args:
files: 文件输入列表
Returns:
List[Dict]: LLM 可用的内容格式列表(根据 provider 返回不同格式)
"""
if not files:
return []
result = []
for idx, file in enumerate(files):
try:
if file.type == FileType.IMAGE:
content = await self._process_image(file)
result.append(content)
elif file.type == FileType.DOCUMENT:
content = await self._process_document(file)
result.append(content)
elif file.type == FileType.AUDIO:
content = await self._process_audio(file)
result.append(content)
elif file.type == FileType.VIDEO:
content = await self._process_video(file)
result.append(content)
else:
logger.warning(f"不支持的文件类型: {file.type}")
except Exception as e:
logger.error(
f"处理文件失败",
extra={
"file_index": idx,
"file_type": file.type,
"error": str(e)
}
)
# 继续处理其他文件,不中断整个流程
result.append({
"type": "text",
"text": f"[文件处理失败: {str(e)}]"
})
logger.info(f"成功处理 {len(result)}/{len(files)} 个文件provider={self.provider}")
return result
async def _process_image(self, file: FileInput) -> Dict[str, Any]:
"""
处理图片文件
Args:
file: 图片文件输入
Returns:
Dict: 根据 provider 返回不同格式
- Anthropic/Bedrock: {"type": "image", "source": {"type": "base64", "media_type": "...", "data": "..."}}
- 通义千问: {"type": "image", "image": "url"}
"""
if file.transfer_method == TransferMethod.REMOTE_URL:
url = file.url
else:
# 本地文件,获取访问 URL
url = await self._get_file_url(file.upload_file_id)
logger.debug(f"处理图片: {url}, provider={self.provider}")
# 根据 provider 返回不同格式
if self.provider in ["bedrock", "anthropic"]:
# Anthropic/Bedrock 只支持 base64 格式,需要下载并转换
try:
logger.info(f"开始下载并编码图片: {url}")
base64_data, media_type = await self._download_and_encode_image(url)
result = {
"type": "image",
"source": {
"type": "base64",
"media_type": media_type,
"data": base64_data[:100] + "..." # 只记录前100个字符
}
}
logger.info(f"图片编码完成: media_type={media_type}, data_length={len(base64_data)}")
# 返回完整数据
result["source"]["data"] = base64_data
return result
except Exception as e:
logger.error(f"下载并编码图片失败: {e}", exc_info=True)
# 返回错误提示
return {
"type": "text",
"text": f"[图片加载失败: {str(e)}]"
}
else:
# 通义千问等其他格式支持 URL
return {
"type": "image",
"image": url
}
async def _download_and_encode_image(self, url: str) -> tuple[str, str]:
"""
下载图片并转换为 base64
Args:
url: 图片 URL
Returns:
tuple: (base64_data, media_type)
"""
import httpx
import base64
from mimetypes import guess_type
# 下载图片
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(url)
response.raise_for_status()
# 获取图片数据
image_data = response.content
# 确定 media type
content_type = response.headers.get("content-type")
if content_type and content_type.startswith("image/"):
media_type = content_type
else:
# 从 URL 推断
guessed_type, _ = guess_type(url)
media_type = guessed_type if guessed_type and guessed_type.startswith("image/") else "image/jpeg"
# 转换为 base64
base64_data = base64.b64encode(image_data).decode("utf-8")
logger.debug(f"图片编码完成: media_type={media_type}, size={len(base64_data)}")
return base64_data, media_type
async def _process_document(self, file: FileInput) -> Dict[str, Any]:
"""
处理文档文件PDF、Word 等)
Args:
file: 文档文件输入
Returns:
Dict: text 格式的内容(包含提取的文本)
"""
if file.transfer_method == TransferMethod.REMOTE_URL:
# 远程文档暂不支持提取
return {
"type": "text",
"text": f"<document url=\"{file.url}\">\n[远程文档,暂不支持内容提取]\n</document>"
}
else:
# 本地文件,提取文本内容
text = await self._extract_document_text(file.upload_file_id)
generic_file = self.db.query(GenericFile).filter(
GenericFile.id == file.upload_file_id
).first()
file_name = generic_file.file_name if generic_file else "unknown"
return {
"type": "text",
"text": f"<document name=\"{file_name}\">\n{text}\n</document>"
}
async def _process_audio(self, file: FileInput) -> Dict[str, Any]:
"""
处理音频文件
Args:
file: 音频文件输入
Returns:
Dict: 音频内容(暂时返回占位符)
"""
# TODO: 实现音频转文字功能
return {
"type": "text",
"text": "[音频文件,暂不支持处理]"
}
async def _process_video(self, file: FileInput) -> Dict[str, Any]:
"""
处理视频文件
Args:
file: 视频文件输入
Returns:
Dict: 视频内容(暂时返回占位符)
"""
# TODO: 实现视频处理功能
return {
"type": "text",
"text": "[视频文件,暂不支持处理]"
}
async def _get_file_url(self, file_id: uuid.UUID) -> str:
"""
获取文件的访问 URL
Args:
file_id: 文件ID
Returns:
str: 文件访问 URL
Raises:
BusinessException: 文件不存在
"""
generic_file = self.db.query(GenericFile).filter(
GenericFile.id == file_id,
GenericFile.status == "active"
).first()
if not generic_file:
raise BusinessException(
f"文件不存在或已删除: {file_id}",
BizCode.NOT_FOUND
)
# 如果有 access_url直接返回
if generic_file.access_url:
return generic_file.access_url
# 否则,根据 storage_path 生成 URL
# TODO: 根据实际存储方式生成 URL本地存储、OSS 等)
# 这里暂时返回一个占位 URL
return f"/api/files/{file_id}/download"
async def _extract_document_text(self, file_id: uuid.UUID) -> str:
"""
提取文档文本内容
Args:
file_id: 文件ID
Returns:
str: 提取的文本内容
"""
generic_file = self.db.query(GenericFile).filter(
GenericFile.id == file_id,
GenericFile.status == "active"
).first()
if not generic_file:
raise BusinessException(
f"文件不存在或已删除: {file_id}",
BizCode.NOT_FOUND
)
# TODO: 根据文件类型提取文本
# - PDF: 使用 PyPDF2 或 pdfplumber
# - Word: 使用 python-docx
# - TXT/MD: 直接读取
file_ext = generic_file.file_ext.lower()
if file_ext in ['.txt', '.md', '.markdown']:
return await self._read_text_file(generic_file.storage_path)
elif file_ext == '.pdf':
return await self._extract_pdf_text(generic_file.storage_path)
elif file_ext in ['.doc', '.docx']:
return await self._extract_word_text(generic_file.storage_path)
else:
return f"[不支持的文档格式: {file_ext}]"
async def _read_text_file(self, storage_path: str) -> str:
"""读取纯文本文件"""
try:
with open(storage_path, 'r', encoding='utf-8') as f:
return f.read()
except Exception as e:
logger.error(f"读取文本文件失败: {e}")
return f"[文件读取失败: {str(e)}]"
async def _extract_pdf_text(self, storage_path: str) -> str:
"""提取 PDF 文本"""
try:
# TODO: 实现 PDF 文本提取
# import PyPDF2 或 pdfplumber
return "[PDF 文本提取功能待实现]"
except Exception as e:
logger.error(f"提取 PDF 文本失败: {e}")
return f"[PDF 提取失败: {str(e)}]"
async def _extract_word_text(self, storage_path: str) -> str:
"""提取 Word 文档文本"""
try:
# TODO: 实现 Word 文本提取
# import docx
return "[Word 文本提取功能待实现]"
except Exception as e:
logger.error(f"提取 Word 文本失败: {e}")
return f"[Word 提取失败: {str(e)}]"
def get_multimodal_service(db: Session) -> MultimodalService:
"""获取多模态服务实例(依赖注入)"""
return MultimodalService(db)