Merge pull request #619 from SuanmoSuanyangTechnology/release/v0.2.8

Release/v0.2.8
This commit is contained in:
Ke Sun
2026-03-19 15:54:20 +08:00
committed by GitHub
19 changed files with 625 additions and 170 deletions

View File

@@ -8,6 +8,7 @@ from app.core.response_utils import success
from app.db import get_db
from app.dependencies import get_current_user
from app.models import User
from app.schemas import conversation_schema
from app.schemas.response_schema import ApiResponse
from app.services.conversation_service import ConversationService
@@ -90,11 +91,7 @@ def get_messages(
conversation_id,
)
messages = [
{
"role": message.role,
"content": message.content,
"created_at": int(message.created_at.timestamp() * 1000),
}
conversation_schema.Message.model_validate(message)
for message in messages_obj
]
return success(data=messages, msg="get conversation history success")

View File

@@ -13,7 +13,6 @@ from app.core.logging_config import get_business_logger
from app.core.response_utils import success, fail
from app.db import get_db, get_db_read
from app.dependencies import get_share_user_id, ShareTokenData
from app.models.app_model import App
from app.models.app_model import AppType
from app.repositories import knowledge_repository
from app.repositories.end_user_repository import EndUserRepository
@@ -618,11 +617,11 @@ async def chat(
# 多 Agent 非流式返回
result = await app_chat_service.workflow_chat(
message=payload.message,
conversation_id=conversation.id, # 使用已创建的会话 ID
user_id=end_user_id, # 转换为字符串
variables=payload.variables,
files=payload.files,
config=config,
web_search=payload.web_search,
memory=payload.memory,

View File

@@ -280,6 +280,7 @@ async def chat(
memory=memory,
storage_type=storage_type,
user_rag_memory_id=user_rag_memory_id,
files=payload.files,
app_id=app.id,
workspace_id=workspace_id,
release_id=app.current_release.id

View File

@@ -7,7 +7,7 @@ file operations across different storage backends.
"""
from abc import ABC, abstractmethod
from typing import Optional
from typing import AsyncIterator, Optional
class StorageBackend(ABC):
@@ -42,6 +42,26 @@ class StorageBackend(ABC):
"""
pass
@abstractmethod
async def upload_stream(
self,
file_key: str,
stream: AsyncIterator[bytes],
content_type: Optional[str] = None,
) -> int:
"""
Upload a file from an async byte stream.
Args:
file_key: Unique identifier for the file.
stream: Async iterator yielding bytes chunks.
content_type: Optional MIME type of the file.
Returns:
Total bytes written.
"""
pass
@abstractmethod
async def download(self, file_key: str) -> bytes:
"""

View File

@@ -11,6 +11,7 @@ from typing import Optional
import aiofiles
import aiofiles.os
from typing import AsyncIterator
from app.core.storage.base import StorageBackend
from app.core.storage_exceptions import (
@@ -179,6 +180,36 @@ class LocalStorage(StorageBackend):
full_path = self._get_full_path(file_key)
return full_path.exists()
async def upload_stream(
    self,
    file_key: str,
    stream: AsyncIterator[bytes],
    content_type: Optional[str] = None,
) -> int:
    """
    Stream-write a file to the local file system.

    Args:
        file_key: Unique identifier for the file; resolved to a path
            via ``self._get_full_path``.
        stream: Async iterator yielding bytes chunks.
        content_type: Ignored for local storage; kept for interface parity.

    Returns:
        Total bytes written.

    Raises:
        StorageUploadError: If creating the directory or writing fails.
    """
    destination = self._get_full_path(file_key)
    try:
        # Make sure the parent directory hierarchy exists before opening.
        destination.parent.mkdir(parents=True, exist_ok=True)
        bytes_written = 0
        async with aiofiles.open(destination, "wb") as out_file:
            async for piece in stream:
                await out_file.write(piece)
                bytes_written += len(piece)
        logger.info(f"File stream uploaded successfully: {file_key}")
        return bytes_written
    except Exception as e:
        logger.error(f"Failed to stream upload file {file_key}: {e}")
        raise StorageUploadError(
            message=f"Failed to stream upload file: {e}",
            file_key=file_key,
            cause=e,
        )
async def get_url(self, file_key: str, expires: int = 3600) -> str:
"""
Get an access URL for the file.

View File

@@ -5,8 +5,9 @@ This module provides a storage backend that stores files on Aliyun Object
Storage Service (OSS) using the oss2 SDK.
"""
import io
import logging
from typing import Optional
from typing import AsyncIterator, Optional
import oss2
from oss2.exceptions import NoSuchKey, OssError
@@ -125,10 +126,39 @@ class OSSStorage(StorageBackend):
cause=e,
)
async def upload_stream(
    self,
    file_key: str,
    stream: AsyncIterator[bytes],
    content_type: Optional[str] = None,
) -> int:
    """
    Upload a file from an async byte stream to OSS.

    The oss2 SDK exposes no async streaming API, so the whole stream is
    buffered in memory and sent with a single ``put_object`` call.

    Returns:
        Total bytes written.

    Raises:
        StorageUploadError: On any OSS or unexpected failure.
    """
    pieces = []
    try:
        async for piece in stream:
            pieces.append(piece)
        payload = b"".join(pieces)
        extra_headers = {"Content-Type": content_type} if content_type else None
        self.bucket.put_object(file_key, payload, headers=extra_headers)
        logger.info(f"File stream uploaded to OSS successfully: {file_key}")
        return len(payload)
    except OssError as e:
        logger.error(f"OSS error stream uploading file {file_key}: {e}")
        raise StorageUploadError(
            message=f"Failed to stream upload file to OSS: {e.message}",
            file_key=file_key,
            cause=e,
        )
    except Exception as e:
        logger.error(f"Failed to stream upload file to OSS {file_key}: {e}")
        raise StorageUploadError(
            message=f"Failed to stream upload file to OSS: {e}",
            file_key=file_key,
            cause=e,
        )
async def download(self, file_key: str) -> bytes:
"""
Download a file from OSS.
Args:
file_key: Unique identifier for the file in the storage system.

View File

@@ -5,8 +5,9 @@ This module provides a storage backend that stores files on AWS S3
using the boto3 SDK.
"""
import io
import logging
from typing import Optional
from typing import AsyncIterator, Optional
import boto3
from botocore.exceptions import ClientError, NoCredentialsError, BotoCoreError
@@ -174,6 +175,62 @@ class S3Storage(StorageBackend):
cause=e,
)
async def upload_stream(
    self,
    file_key: str,
    stream: AsyncIterator[bytes],
    content_type: Optional[str] = None,
) -> int:
    """
    Upload from an async byte stream to S3 via multipart upload.

    Chunks are buffered until they reach the S3 minimum part size (5MB)
    and flushed as one part; the final part may be smaller. An empty
    stream is stored as a zero-byte object with ``put_object``, because
    a multipart upload cannot be completed with zero parts.

    Args:
        file_key: Object key in the bucket.
        stream: Async iterator yielding bytes chunks.
        content_type: Optional MIME type stored on the object.

    Returns:
        Total bytes written.

    Raises:
        StorageUploadError: If the upload fails; the multipart upload is
            aborted on a best-effort basis before raising.
    """
    extra_args = {"ContentType": content_type} if content_type else {}
    mpu = self.client.create_multipart_upload(
        Bucket=self.bucket_name, Key=file_key, **extra_args
    )
    upload_id = mpu["UploadId"]
    parts = []
    part_number = 1
    buf = io.BytesIO()
    total = 0
    min_part_size = 5 * 1024 * 1024  # S3 最小分片 5MB
    try:
        async for chunk in stream:
            buf.write(chunk)
            total += len(chunk)
            if buf.tell() >= min_part_size:
                resp = self.client.upload_part(
                    Bucket=self.bucket_name, Key=file_key,
                    UploadId=upload_id, PartNumber=part_number,
                    Body=buf.getvalue(),
                )
                parts.append({"PartNumber": part_number, "ETag": resp["ETag"]})
                part_number += 1
                buf = io.BytesIO()
        # 上传剩余数据(最后一片可小于 5MB)
        remaining = buf.getvalue()
        if remaining:
            resp = self.client.upload_part(
                Bucket=self.bucket_name, Key=file_key,
                UploadId=upload_id, PartNumber=part_number, Body=remaining
            )
            parts.append({"PartNumber": part_number, "ETag": resp["ETag"]})
        if parts:
            self.client.complete_multipart_upload(
                Bucket=self.bucket_name, Key=file_key,
                UploadId=upload_id,
                MultipartUpload={"Parts": parts}
            )
        else:
            # Empty stream: completing a multipart upload with zero parts
            # is rejected by S3, so abort it and store an empty object.
            self.client.abort_multipart_upload(
                Bucket=self.bucket_name, Key=file_key, UploadId=upload_id
            )
            self.client.put_object(
                Bucket=self.bucket_name, Key=file_key, Body=b"", **extra_args
            )
        logger.info(f"File stream uploaded to S3 successfully: {file_key}")
        return total
    except Exception as e:
        # Best-effort abort; never let cleanup mask the original error.
        try:
            self.client.abort_multipart_upload(
                Bucket=self.bucket_name, Key=file_key, UploadId=upload_id
            )
        except Exception:
            logger.warning(f"Failed to abort multipart upload for {file_key}")
        logger.error(f"Failed to stream upload file to S3 {file_key}: {e}")
        raise StorageUploadError(
            message=f"Failed to stream upload file to S3: {e}",
            file_key=file_key,
            cause=e,
        )
async def download(self, file_key: str) -> bytes:
"""
Download a file from S3.

View File

@@ -139,25 +139,25 @@ class FileUploadConfig(BaseModel):
image_enabled: bool = Field(default=False)
image_max_size_mb: int = Field(default=20)
image_allowed_extensions: List[str] = Field(
default=["png", "jpg", "jpeg", "gif", "webp"]
default=["png", "jpg", "jpeg"]
)
# 语音文件(MP3/WAV/M4A/OGG/FLAC),最大 50MB
audio_enabled: bool = Field(default=False)
audio_max_size_mb: int = Field(default=50)
audio_allowed_extensions: List[str] = Field(
default=["mp3", "wav", "m4a", "ogg", "flac"]
default=["mp3", "wav", "m4a"]
)
# 通用文件(PDF/DOCX/XLSX/TXT/CSV/JSON),最大 100MB
document_enabled: bool = Field(default=False)
document_max_size_mb: int = Field(default=100)
document_allowed_extensions: List[str] = Field(
default=["pdf", "docx", "xlsx", "txt", "csv", "json"]
default=["pdf", "docx", "xlsx", "txt", "csv", "json", "md"]
)
# 视频文件(MP4/MOV/AVI/WebM),最大 500MB
video_enabled: bool = Field(default=False)
video_max_size_mb: int = Field(default=500)
video_allowed_extensions: List[str] = Field(
default=["mp4", "mov", "avi", "webm"]
default=["mp4", "mov"]
)
# 最大文件数量
max_file_count: int = Field(default=5, ge=1, le=20)

View File

@@ -51,6 +51,10 @@ class Message(BaseModel):
def _serialize_created_at(self, dt: datetime.datetime):
return int(dt.timestamp() * 1000) if dt else None
@field_serializer("meta_data", when_used="json")
def _serialize_meta_data(self, data: Optional[Dict[str, Any]]):
return data or {}
class Conversation(BaseModel):
"""会话输出"""

View File

@@ -191,7 +191,7 @@ class AppChatService:
for f in files:
# url = await MultimodalService(self.db).get_file_url(f)
human_meta["files"].append({
"type": FileType.IMAGE,
"type": f.type,
"url": f.url
})
@@ -342,9 +342,17 @@ class AppChatService:
processed_files = await multimodal_service.process_files(user_id, files)
logger.info(f"处理了 {len(processed_files)} 个文件")
# 流式调用 Agent支持多模态
# 流式调用 Agent支持多模态,同时并行启动 TTS
full_content = ""
total_tokens = 0
text_queue: asyncio.Queue = asyncio.Queue()
stream_audio_url, tts_task = await self.agent_service._generate_tts_streaming(
features_config, api_key_obj,
text_queue=text_queue,
tenant_id=tenant_id, workspace_id=workspace_id
)
async for chunk in agent.chat_stream(
message=message,
history=history,
@@ -354,17 +362,20 @@ class AppChatService:
user_rag_memory_id=user_rag_memory_id,
config_id=config_id,
memory_flag=memory_flag,
files=processed_files # 传递处理后的文件
files=processed_files
):
if isinstance(chunk, int):
total_tokens = chunk
else:
full_content += chunk
# 发送消息块事件
yield f"event: message\ndata: {json.dumps({'content': chunk}, ensure_ascii=False)}\n\n"
if tts_task is not None:
await text_queue.put(chunk)
if tts_task is not None:
await text_queue.put(None)
elapsed_time = time.time() - start_time
ModelApiKeyService.record_api_key_usage(self.db, api_key_obj.id)
# 发送结束事件(包含 suggested_questions、tts、citations
@@ -376,12 +387,6 @@ class AppChatService:
{"model_name": api_key_obj.model_name, "api_key": api_key_obj.api_key,
"api_base": api_key_obj.api_base}, {}
)
stream_audio_url = await self.agent_service._generate_tts(
features_config, full_content,
{"model_name": api_key_obj.model_name, "api_key": api_key_obj.api_key,
"api_base": api_key_obj.api_base, "provider": api_key_obj.provider},
tenant_id=tenant_id, workspace_id=workspace_id
)
end_data["audio_url"] = stream_audio_url
end_data["citations"] = self.agent_service._filter_citations(features_config, [])
@@ -399,7 +404,7 @@ class AppChatService:
for f in files:
# url = await MultimodalService(self.db).get_file_url(f)
human_meta["files"].append({
"type": FileType.IMAGE,
"type": f.type,
"url": f.url
})
@@ -623,6 +628,7 @@ class AppChatService:
app_id: uuid.UUID,
release_id: uuid.UUID,
workspace_id: uuid.UUID,
files: Optional[List[FileInput]] = None,
user_id: Optional[str] = None,
variables: Optional[Dict[str, Any]] = None,
web_search: bool = False,
@@ -636,7 +642,8 @@ class AppChatService:
variables=variables,
conversation_id=str(conversation_id),
stream=True,
user_id=user_id
user_id=user_id,
files=files
)
return await self.workflow_service.run(
app_id=app_id,

View File

@@ -852,9 +852,18 @@ class AgentRunService:
# 兼容新旧字段名:优先使用 memory_config_id回退到 memory_content
config_id = memory_config_.get("memory_config_id") or memory_config_.get("memory_content", None)
# 9. 流式调用 Agent支持多模态
# 9. 流式调用 Agent支持多模态,同时并行启动 TTS
full_content = ""
total_tokens = 0
# 启动流式 TTS文本边输出边合成
text_queue: asyncio.Queue = asyncio.Queue()
stream_audio_url, tts_task = await self._generate_tts_streaming(
features_config, api_key_config,
text_queue=text_queue,
tenant_id=tenant_id, workspace_id=workspace_id
) if not sub_agent else (None, None)
async for chunk in agent.chat_stream(
message=message,
history=history,
@@ -864,31 +873,25 @@ class AgentRunService:
storage_type=storage_type,
user_rag_memory_id=user_rag_memory_id,
memory_flag=memory_flag,
files=processed_files # 传递处理后的文件
files=processed_files
):
if isinstance(chunk, int):
total_tokens = chunk
else:
full_content += chunk
# 发送消息块事件
yield self._format_sse_event("message", {
"content": chunk
})
yield self._format_sse_event("message", {"content": chunk})
if tts_task is not None:
await text_queue.put(chunk)
# 文本结束,通知 TTS
if tts_task is not None:
await text_queue.put(None)
elapsed_time = time.time() - start_time
ModelApiKeyService.record_api_key_usage(self.db, api_key_config.get("api_key_id"))
if sub_agent:
yield self._format_sse_event("sub_usage", {
"total_tokens": total_tokens
})
# 10. 生成 audio_url在保存消息前生成以便一并存入 meta_data
stream_audio_url = await self._generate_tts(
features_config, full_content, api_key_config,
tenant_id=tenant_id, workspace_id=workspace_id
) if not sub_agent else None
yield self._format_sse_event("sub_usage", {"total_tokens": total_tokens})
# 11. 保存会话消息
if not sub_agent:
@@ -1182,7 +1185,7 @@ class AgentRunService:
for f in files:
# url = await MultimodalService(self.db).get_file_url(f)
human_meta["files"].append({
"type": FileType.IMAGE,
"type": f.type,
"url": f.url
})
# 保存用户消息
@@ -1317,125 +1320,345 @@ class AgentRunService:
tenant_id: Optional[uuid.UUID] = None,
workspace_id: Optional[uuid.UUID] = None,
) -> Optional[str]:
"""根据 text_to_speech 配置生成语音,上传到存储并返回 URL"""
"""先注册文件元数据并返回 audio_url再后台流式写入音频内容"""
tts_config = features_config.get("text_to_speech", {})
if not isinstance(tts_config, dict) or not tts_config.get("enabled"):
return None
if not text or not text.strip():
return None
try:
from app.services.file_storage_service import FileStorageService
from app.models.file_metadata_model import FileMetadata
from app.services.file_storage_service import FileStorageService, generate_file_key
provider = api_key_config.get("provider", "openai")
api_key = api_key_config.get("api_key")
api_base = api_key_config.get("api_base")
voice = tts_config.get("voice")
provider = api_key_config.get("provider", "openai")
api_key = api_key_config.get("api_key")
api_base = api_key_config.get("api_base")
voice = tts_config.get("voice")
file_ext, content_type = ".mp3", "audio/mpeg"
if provider == "dashscope":
audio_bytes, file_ext, content_type = await self._tts_dashscope(
api_key=api_key,
text=text,
voice=voice or "longxiaochun", # 会根据 model 版本自动修正后缀
tts_config=tts_config,
)
else:
# OpenAI 兼容接口openai / xinference / gpustack 等)
audio_bytes, file_ext, content_type = await self._tts_openai(
api_key=api_key,
api_base=api_base,
text=text,
voice=voice or "alloy",
file_id = uuid.uuid4()
file_key = generate_file_key(tenant_id, workspace_id, file_id, file_ext)
# 先写入 pending 状态的元数据,立即返回 URL
db_file = FileMetadata(
id=file_id,
tenant_id=tenant_id,
workspace_id=workspace_id,
file_key=file_key,
file_name=f"tts_{file_id}{file_ext}",
file_ext=file_ext,
file_size=0,
content_type=content_type,
status="pending",
)
self.db.add(db_file)
self.db.commit()
server_url = settings.FILE_LOCAL_SERVER_URL
audio_url = f"{server_url}/storage/permanent/{file_id}"
# 后台任务:流式生成并写入存储,完成后更新状态
async def _stream_to_storage():
try:
storage_service = FileStorageService()
if provider == "dashscope":
stream = self._tts_dashscope_stream(
api_key=api_key,
text=text,
voice=voice or "longxiaochun",
tts_config=tts_config,
)
else:
stream = self._tts_openai_stream(
api_key=api_key,
api_base=api_base,
text=text,
voice=voice or "alloy",
)
total_size = await storage_service.upload_stream(
tenant_id=tenant_id,
workspace_id=workspace_id,
file_id=file_id,
file_ext=file_ext,
stream=stream,
content_type=content_type,
)
storage_service = FileStorageService()
file_id = uuid.uuid4()
file_key = await storage_service.upload_file(
tenant_id=tenant_id,
workspace_id=workspace_id,
file_id=file_id,
file_ext=file_ext,
content=audio_bytes,
content_type=content_type,
)
# 更新元数据状态
with get_db_context() as bg_db:
record = bg_db.get(FileMetadata, file_id)
if record:
record.status = "completed"
record.file_size = total_size
bg_db.commit()
logger.debug(f"TTS 流式写入完成provider={provider}, file_key={file_key}")
except Exception as e:
logger.warning(f"TTS 流式写入失败: {e}")
with get_db_context() as bg_db:
record = bg_db.get(FileMetadata, file_id)
if record:
record.status = "failed"
bg_db.commit()
# 保存文件元数据到数据库
from app.models.file_metadata_model import FileMetadata
db_file = FileMetadata(
id=file_id,
tenant_id=tenant_id,
workspace_id=workspace_id,
file_key=file_key,
file_name=f"tts_{file_id}{file_ext}",
file_ext=file_ext,
file_size=len(audio_bytes),
content_type=content_type,
status="completed",
)
self.db.add(db_file)
self.db.commit()
asyncio.create_task(_stream_to_storage())
return audio_url
server_url = settings.FILE_LOCAL_SERVER_URL
audio_url = f"{server_url}/storage/permanent/{file_id}"
logger.debug(f"TTS 生成成功provider={provider}, file_key={file_key}")
return audio_url
async def _generate_tts_streaming(
self,
features_config: Dict[str, Any],
api_key_config: Dict[str, Any],
text_queue: asyncio.Queue,
tenant_id: Optional[uuid.UUID] = None,
workspace_id: Optional[uuid.UUID] = None,
) -> tuple[Optional[str], Optional[asyncio.Task]]:
"""文本流式输入并行合成音频。
返回 (audio_url, task)audio_url 立即可用task 完成后文件内容就绪。
调用方向 text_queue put 文本 chunk结束时 put None。
"""
tts_config = features_config.get("text_to_speech", {})
if not isinstance(tts_config, dict) or not tts_config.get("enabled"):
return None, None
except Exception as e:
logger.warning(f"TTS 生成失败: {e}")
return None
from app.models.file_metadata_model import FileMetadata
from app.services.file_storage_service import FileStorageService, generate_file_key
provider = api_key_config.get("provider", "openai")
api_key = api_key_config.get("api_key")
api_base = api_key_config.get("api_base")
voice = tts_config.get("voice")
file_ext, content_type = ".mp3", "audio/mpeg"
file_id = uuid.uuid4()
file_key = generate_file_key(tenant_id, workspace_id, file_id, file_ext)
db_file = FileMetadata(
id=file_id,
tenant_id=tenant_id,
workspace_id=workspace_id,
file_key=file_key,
file_name=f"tts_{file_id}{file_ext}",
file_ext=file_ext,
file_size=0,
content_type=content_type,
status="pending",
)
self.db.add(db_file)
self.db.commit()
server_url = settings.FILE_LOCAL_SERVER_URL
audio_url = f"{server_url}/storage/permanent/{file_id}"
async def _run():
try:
storage_service = FileStorageService()
if provider == "dashscope":
audio_stream = self._tts_dashscope_stream_from_queue(
api_key=api_key,
voice=voice or "longxiaochun",
tts_config=tts_config,
text_queue=text_queue,
)
else:
audio_stream = self._tts_openai_stream_from_queue(
api_key=api_key,
api_base=api_base,
voice=voice or "alloy",
text_queue=text_queue,
)
total_size = await storage_service.upload_stream(
tenant_id=tenant_id,
workspace_id=workspace_id,
file_id=file_id,
file_ext=file_ext,
stream=audio_stream,
content_type=content_type,
)
with get_db_context() as bg_db:
record = bg_db.get(FileMetadata, file_id)
if record:
record.status = "completed"
record.file_size = total_size
bg_db.commit()
logger.debug(f"TTS 流式合成完成provider={provider}, file_key={file_key}")
except Exception as e:
logger.warning(f"TTS 流式合成失败: {e}")
with get_db_context() as bg_db:
record = bg_db.get(FileMetadata, file_id)
if record:
record.status = "failed"
bg_db.commit()
task = asyncio.create_task(_run())
return audio_url, task
@staticmethod
async def _tts_openai(
async def _tts_openai_stream_from_queue(
    api_key: str,
    api_base: Optional[str],
    voice: str,
    text_queue: asyncio.Queue,
):
    """OpenAI TTS: drain all text first, then synthesize with a streaming
    response (the OpenAI speech API has no incremental text input).

    Consumes text chunks from ``text_queue`` until the ``None`` sentinel,
    then yields audio byte chunks.
    """
    from openai import AsyncOpenAI

    # Drain the queue; the producer streams text in parallel, so the
    # wait here is usually short.
    collected = []
    while (piece := await text_queue.get()) is not None:
        collected.append(piece)
    full_text = "".join(collected)
    if not full_text.strip():
        return
    client = AsyncOpenAI(api_key=api_key, base_url=api_base)
    # The API caps input length; truncate to 4096 characters.
    async with client.audio.speech.with_streaming_response.create(
        model="tts-1",
        voice=voice,
        input=full_text[:4096],
    ) as response:
        async for audio_chunk in response.iter_bytes(chunk_size=4096):
            yield audio_chunk
@staticmethod
async def _tts_dashscope_stream_from_queue(
    api_key: str,
    voice: str,
    tts_config: Dict[str, Any],
    text_queue: asyncio.Queue,
):
    """DashScope TTS with streaming text input, so synthesis runs truly
    in parallel with text generation.

    Consumes text chunks from ``text_queue`` (terminated by a ``None``
    sentinel), feeds them sentence-by-sentence to the synthesizer, and
    yields MP3 byte chunks as they are produced.
    """
    import dashscope
    from dashscope.audio.tts_v2 import SpeechSynthesizer, AudioFormat, ResultCallback
    model = tts_config.get("model") or "cosyvoice-v2"
    is_v2 = model.endswith("-v2")
    # cosyvoice-v2 voice names carry a "_v2" suffix, v1 names do not;
    # auto-correct the suffix to match the selected model version.
    if is_v2 and not voice.endswith("_v2"):
        voice = voice + "_v2"
    elif not is_v2 and voice.endswith("_v2"):
        voice = voice[:-3]
    audio_queue: asyncio.Queue = asyncio.Queue()
    # get_event_loop() is deprecated inside coroutines; we are running,
    # so grab the running loop explicitly.
    loop = asyncio.get_running_loop()
    class _Callback(ResultCallback):
        # SDK callbacks fire on a worker thread; hand results back to
        # the event-loop thread safely.
        def on_data(self, data: bytes):
            if data:
                loop.call_soon_threadsafe(audio_queue.put_nowait, data)
        def on_complete(self):
            loop.call_soon_threadsafe(audio_queue.put_nowait, None)
        def on_error(self, message):
            loop.call_soon_threadsafe(audio_queue.put_nowait, RuntimeError(str(message)))
        def on_open(self): pass
        def on_close(self): pass
    dashscope.api_key = api_key
    synthesizer = SpeechSynthesizer(
        model=model,
        voice=voice,
        format=AudioFormat.MP3_22050HZ_MONO_256KBPS,
        callback=_Callback(),
    )
    async def _feed_text():
        """从 text_queue 取文本按句子切分后喂给 synthesizer"""
        import re
        buf = ""
        sentence_end = re.compile(r'[\u3002\uff01\uff1f\.!?\n]')
        while True:
            chunk = await text_queue.get()
            if chunk is None:
                if buf.strip():
                    await asyncio.to_thread(synthesizer.streaming_call, buf)
                await asyncio.to_thread(synthesizer.streaming_complete)
                break
            buf += chunk
            # 按句子切分喂入
            while (m := sentence_end.search(buf)) is not None:
                sentence = buf[:m.end()]
                buf = buf[m.end():]
                await asyncio.to_thread(synthesizer.streaming_call, sentence)
    # Keep a reference so the task cannot be garbage-collected mid-flight,
    # and forward feeder failures into audio_queue so the consumer below
    # raises instead of hanging forever waiting for the sentinel.
    feed_task = asyncio.create_task(_feed_text())
    feed_task.add_done_callback(
        lambda t: audio_queue.put_nowait(t.exception())
        if not t.cancelled() and t.exception() is not None
        else None
    )
    try:
        while True:
            item = await audio_queue.get()
            if item is None:
                break
            if isinstance(item, Exception):
                raise item
            yield item
    finally:
        # If the consumer is torn down early, stop feeding text.
        if not feed_task.done():
            feed_task.cancel()
@staticmethod
async def _tts_openai_stream(
api_key: str,
api_base: Optional[str],
text: str,
voice: str,
) -> tuple:
"""OpenAI 兼容 TTS,返回 (audio_bytes, file_ext, content_type)"""
):
"""OpenAI 兼容 TTS 流式生成yield bytes chunks"""
from openai import AsyncOpenAI
client = AsyncOpenAI(api_key=api_key, base_url=api_base)
response = await client.audio.speech.create(
async with client.audio.speech.with_streaming_response.create(
model="tts-1",
voice=voice,
input=text[:4096],
)
return response.content, ".mp3", "audio/mpeg"
) as response:
async for chunk in response.iter_bytes(chunk_size=4096):
yield chunk
@staticmethod
async def _tts_dashscope(
async def _tts_dashscope_stream(
api_key: str,
text: str,
voice: str,
tts_config: Dict[str, Any],
) -> tuple:
"""DashScope CosyVoice TTS返回 (audio_bytes, file_ext, content_type)"""
):
"""DashScope TTS 流式生成yield bytes chunks"""
import dashscope
from dashscope.audio.tts_v2 import SpeechSynthesizer, AudioFormat
from dashscope.audio.tts_v2 import SpeechSynthesizer, AudioFormat, ResultCallback
model = tts_config.get("model") or "cosyvoice-v2"
is_v2 = model.endswith("-v2")
# cosyvoice-v2 音色名带 _v2 后缀v1 不带
# 如果用户传入的 voice 不匹配当前模型版本,自动修正
if is_v2 and not voice.endswith("_v2"):
voice = voice + "_v2"
elif not is_v2 and voice.endswith("_v2"):
voice = voice[:-3] # 去掉 _v2
voice = voice[:-3]
def _sync_call() -> bytes:
queue: asyncio.Queue = asyncio.Queue()
loop = asyncio.get_event_loop()
class _Callback(ResultCallback):
def on_data(self, data: bytes):
if data:
loop.call_soon_threadsafe(queue.put_nowait, data)
def on_complete(self):
loop.call_soon_threadsafe(queue.put_nowait, None)
def on_error(self, message):
loop.call_soon_threadsafe(queue.put_nowait, RuntimeError(str(message)))
def on_open(self): pass
def on_close(self): pass
def _sync_stream():
dashscope.api_key = api_key
synthesizer = SpeechSynthesizer(
model=model,
voice=voice,
format=AudioFormat.MP3_22050HZ_MONO_256KBPS,
callback=_Callback(),
)
audio = synthesizer.call(text[:4096])
if audio is None:
raise RuntimeError("DashScope TTS 返回空音频")
return audio
synthesizer.streaming_call(text[:4096])
synthesizer.streaming_complete()
audio_bytes = await asyncio.to_thread(_sync_call)
return audio_bytes, ".mp3", "audio/mpeg"
asyncio.create_task(asyncio.to_thread(_sync_stream))
while True:
item = await queue.get()
if item is None:
break
if isinstance(item, Exception):
raise item
yield item
def _replace_variables(
self,

View File

@@ -9,7 +9,7 @@ and error handling.
import logging
import time
import uuid
from typing import Optional
from typing import AsyncIterator, Optional
from app.core.storage import StorageFactory, StorageBackend
from app.core.storage_exceptions import (
@@ -162,6 +162,31 @@ class FileStorageService:
cause=e,
)
async def upload_stream(
    self,
    tenant_id: uuid.UUID,
    workspace_id: uuid.UUID | None,
    file_id: uuid.UUID,
    file_ext: str,
    stream: AsyncIterator[bytes],
    content_type: Optional[str] = None,
) -> int:
    """
    Stream-upload a file to the configured storage backend.

    The storage key is derived from the tenant/workspace/file identifiers
    and the raw byte stream is handed off to the backend.

    Returns:
        Total bytes written.
    """
    file_key = generate_file_key(tenant_id, workspace_id, file_id, file_ext)
    logger.info(f"Starting stream upload: file_key={file_key}, content_type={content_type}")
    try:
        written = await self.storage.upload_stream(file_key, stream, content_type)
        logger.info(f"Stream upload successful: file_key={file_key}, size={written} bytes")
        return written
    except Exception as e:
        logger.error(f"Stream upload failed: file_key={file_key}, error={str(e)}")
        raise
async def download_file(self, file_key: str) -> bytes:
"""
Download a file from storage.

View File

@@ -1638,6 +1638,7 @@ class MultiAgentOrchestrator:
self.variables = config_data.get("variables", [])
self.tools = config_data.get("tools", {})
self.skills = config_data.get("skills", {})
self.features = config_data.get("features", {})
self.default_model_config_id = release.default_model_config_id
return AgentConfigProxy(release, app, config_data)

View File

@@ -636,30 +636,33 @@ class WorkflowService:
final_messages = result.get("messages", [])[init_message_length:]
human_message = ""
assistant_message = ""
human_meta = {
"files": []
}
for message in final_messages:
if message["role"] == "user":
if isinstance(message["content"], str):
human_message += message["content"]
elif isinstance(message["content"], list):
for file in message["content"]:
if file.get("type") == FileType.IMAGE:
human_message += f"![image]({file.get('url', '')})"
else:
human_message += f"[{file.get('type')}]({file.get('url', '')})"
human_meta["files"].append({
"type": file.get("type"),
"url": file.get("url")
})
if message["role"] == "assistant":
assistant_message = message["content"]
self.conversation_service.add_message(
conversation_id=conversation_id_uuid,
role="user",
content=human_message,
meta_data=None
meta_data=human_meta
)
self.conversation_service.add_message(
message_id=message_id,
conversation_id=conversation_id_uuid,
role="assistant",
content=assistant_message,
meta_data={"usage": token_usage}
meta_data={"usage": token_usage, "audio_url": None}
)
self.update_execution_status(
execution.execution_id,
@@ -802,30 +805,33 @@ class WorkflowService:
final_messages = event.get("data", {}).get("messages", [])[init_message_length:]
human_message = ""
assistant_message = ""
human_meta = {
"files": []
}
for message in final_messages:
if message["role"] == "user":
if isinstance(message["content"], str):
human_message += message["content"]
elif isinstance(message["content"], list):
for file in message["content"]:
if file.get("type") == FileType.IMAGE:
human_message += f"![image]({file.get('url', '')})"
else:
human_message += f"[{file.get('type')}]({file.get('url', '')})"
human_meta["files"].append({
"type": file.get("type"),
"url": file.get("url")
})
if message["role"] == "assistant":
assistant_message = message["content"]
self.conversation_service.add_message(
conversation_id=conversation_id_uuid,
role="user",
content=human_message,
meta_data=None
meta_data=human_meta
)
self.conversation_service.add_message(
message_id=message_id,
conversation_id=conversation_id_uuid,
role="assistant",
content=assistant_message,
meta_data={"usage": token_usage}
meta_data={"usage": token_usage, "audio_url": None}
)
self.update_execution_status(
execution.execution_id,

View File

@@ -2,7 +2,7 @@
* @Author: ZhaoYing
* @Date: 2026-02-02 15:01:59
* @Last Modified by: ZhaoYing
* @Last Modified time: 2026-03-17 15:35:34
* @Last Modified time: 2026-03-19 13:41:26
*/
/**
@@ -42,7 +42,8 @@ const ButtonCheckbox: FC<ButtonCheckboxProps> = ({
icon,
checkedIcon,
children,
cicle = false
cicle = false,
disabled,
}) => {
// Listen to value changes and trigger side effects via onValueChange callback
useEffect(() => {
@@ -70,6 +71,7 @@ const ButtonCheckbox: FC<ButtonCheckboxProps> = ({
"rb:bg-[rgba(21,94,239,0.06)] rb:border-[rgba(21,94,239,0.25)] rb:hover:bg-[rgba(21,94,239,0.06)] rb:text-[#155EEF]": checked,
// Unchecked state: gray border and dark text
"rb:border-[#DFE4ED] rb:text-[#212332]": !checked,
"rb:opacity-65 rb:cursor-not-allowed!": disabled
})}
onClick={handleChange}
>

View File

@@ -2,15 +2,20 @@
* @Author: ZhaoYing
* @Date: 2025-12-10 16:46:17
* @Last Modified by: ZhaoYing
* @Last Modified time: 2026-03-18 20:48:03
* @Last Modified time: 2026-03-19 13:38:20
*/
import { type FC, useRef, useEffect, useState } from 'react'
import clsx from 'clsx'
import Markdown from '@/components/Markdown'
import type { ChatContentProps } from './types'
import { Spin, Divider, Space } from 'antd'
import { Spin, Divider, Space, Image, Flex } from 'antd'
import { SoundOutlined } from '@ant-design/icons'
// Resolve a displayable URL for an uploaded file: prefer the thumbnail,
// fall back to the remote URL, finally build an object URL from the
// raw browser File when nothing else is available.
const getFileUrl = (file: any) => {
  if (file.thumbUrl) return file.thumbUrl
  if (file.url) return file.url
  return file.originFileObj ? URL.createObjectURL(file.originFileObj) : undefined
}
/**
* Chat Content Display Component
* Responsible for rendering chat message list, supports different role message styles and auto-scrolling
@@ -54,8 +59,8 @@ const ChatContent: FC<ChatContentProps> = ({
const handleScroll = () => {
if (scrollContainerRef.current) {
const { scrollTop, scrollHeight, clientHeight } = scrollContainerRef.current;
// Consider user is at bottom if within 20px of the bottom
isScrolledToBottomRef.current = scrollHeight - scrollTop - clientHeight < 20;
// Consider user is at bottom if within 100px of the bottom
isScrolledToBottomRef.current = scrollHeight - scrollTop - clientHeight < 100;
}
};
@@ -83,11 +88,16 @@ const ChatContent: FC<ChatContentProps> = ({
// Auto-scroll if data length changed OR user is currently at bottom
if (data.length !== prevDataLengthRef.current || isScrolledToBottomRef.current) {
scrollContainerRef.current.scrollTop = scrollContainerRef.current.scrollHeight;
isScrolledToBottomRef.current = true;
}
prevDataLengthRef.current = data.length;
}
}, 0);
}, [data])
// Open the file's resolved URL in a new tab so the browser handles
// preview/download natively.
const handleDownload = (file: any) => {
  const target = getFileUrl(file)
  window.open(target, '_blank')
}
return (
<div ref={scrollContainerRef} className={clsx("rb:relative rb:overflow-y-auto", classNames)}>
{data.length === 0
@@ -108,48 +118,44 @@ const ChatContent: FC<ChatContentProps> = ({
{labelFormat(item)}
</div>
}
{item.meta_data?.files && item.meta_data?.files.length > 0 && <div>
{item.meta_data?.files && item.meta_data?.files.length > 0 && <Flex gap={8} vertical align="end">
{item.meta_data?.files?.map((file) => {
if (file.type.includes('image')) {
return (
<div key={file.url || file.uid} className={`rb:inline-block rb:group rb:relative rb:rounded-lg ${contentClassNames}`}>
<img src={file.url} alt={file.name} className="rb:w-full rb:max-w-80 rb:rounded-lg rb:object-cover rb:cursor-pointer" />
<Image src={getFileUrl(file)} alt={file.name} className="rb:w-full rb:max-w-80 rb:rounded-lg rb:object-cover rb:cursor-pointer" />
</div>
)
}
if (file.type.includes('video')) {
return (
<div key={file.url || file.uid} className="rb:w-45 rb:h-16 rb:inline-block rb:group rb:relative rb:rounded-lg">
<video src={file.url} controls className="rb:w-45 rb:h-16 rb:rounded-lg rb:object-cover rb:cursor-pointer" />
<div key={file.url || file.uid} className="rb:inline-block rb:group rb:relative rb:rounded-lg">
<video src={getFileUrl(file)} controls className="rb:max-w-80 rb:rounded-lg rb:object-cover rb:cursor-pointer" />
</div>
)
}
if (file.type.includes('audio')) {
return (
<div key={file.url || file.uid} className="rb:w-45 rb:h-16 rb:inline-flex rb:items-center rb:group rb:relative rb:rounded-lg rb:bg-[#F0F3F8] rb:py-2 rb:px-2.5 rb:gap-2">
<audio src={file.url} controls className="rb:w-45 rb:h-16" />
<div key={file.url || file.uid} className="rb:inline-flex rb:items-center rb:group rb:relative rb:rounded-lg rb:bg-[#F0F3F8] rb:py-2 rb:px-2.5 rb:gap-2">
<audio src={getFileUrl(file)} controls className="rb:max-w-80" />
</div>
)
}
return (
<div key={file.url || file.uid} className="rb:w-45 rb:text-[12px] rb:gap-2.5 rb:flex rb:items-center rb:group rb:relative rb:rounded-lg rb:bg-[#F0F3F8] rb:py-2 rb:px-2.5">
<div key={file.url || file.uid} className="rb:relative rb:rounded-lg rb:bg-[#F0F3F8] rb:p-1! rb:cursor-pointer" onClick={() => handleDownload(file)}>
{(file.type.includes('doc') || file.type.includes('docx') || file.type.includes('word') || file.type.includes('wordprocessingml.document')) && <div
className="rb:size-5 rb:cursor-pointer rb:bg-cover rb:bg-[url('@/assets/images/conversation/word_disabled.svg')] rb:hover:bg-[url('@/assets/images/conversation/word.svg')]"
className="rb:size-10 rb:cursor-pointer rb:bg-cover rb:bg-[url('@/assets/images/conversation/word.svg')]"
></div>}
{(file.type.includes('pdf')) && <div
className="rb:size-5 rb:cursor-pointer rb:bg-cover rb:bg-[url('@/assets/images/conversation/pdf_disabled.svg')] rb:hover:bg-[url('@/assets/images/conversation/pdf.svg')]"
className="rb:size-10 rb:cursor-pointer rb:bg-cover rb:bg-[url('@/assets/images/conversation/pdf.svg')]"
></div>}
{(file.type.includes('excel') || file.type.includes('spreadsheetml.sheet') || file.type.includes('csv')) && <div
className="rb:size-5 rb:cursor-pointer rb:bg-cover rb:bg-[url('@/assets/images/conversation/excel_disabled.svg')] rb:hover:bg-[url('@/assets/images/conversation/excel.svg')]"
className="rb:size-10 rb:cursor-pointer rb:bg-cover rb:bg-[url('@/assets/images/conversation/excel.svg')]"
></div>}
<div className="rb:flex-1 rb:w-32.5">
<div className="rb:leading-4 rb:text-ellipsis rb:overflow-hidden rb:whitespace-nowrap">{file.name}</div>
<div className="rb:leading-3.5 rb:mt-0.5 rb:text-[#5B6167] rb:text-ellipsis rb:overflow-hidden rb:whitespace-nowrap">{file.type} · {file.size}</div>
</div>
</div>
)
})}
</div>}
</Flex>}
{/* Message bubble */}
<div className={clsx('rb:border rb:text-left rb:rounded-lg rb:mt-1.5 rb:leading-4.5 rb:p-[10px_12px_2px_12px] rb:inline-block rb:max-w-130 rb:wrap-break-word', contentClassNames, {
// Error message style (content is null and not assistant message)

View File

@@ -136,7 +136,7 @@ const RbMarkdown: FC<RbMarkdownProps> = ({
/** Sync edit content when external content changes */
useEffect(() => {
setEditContent(content)
setEditContent(prev => prev !== content ? content : prev)
}, [content])
/** Handle textarea content changes and trigger callback */

View File

@@ -2,7 +2,7 @@
* @Author: ZhaoYing
* @Date: 2026-03-05
* @Last Modified by: ZhaoYing
* @Last Modified time: 2026-03-18 20:29:28
* @Last Modified time: 2026-03-19 15:18:20
*/
import { forwardRef, useImperativeHandle, useState } from 'react';
import { Form, InputNumber, Flex, Switch, Row, Col, Radio } from 'antd';
@@ -27,22 +27,43 @@ const fileTypeOptions = [
{
type: 'document',
icon: <div className="rb:size-9 rb:bg-cover rb:bg-[url('@/assets/images/file/txt.svg')]"></div>,
formats: 'TXT, PDF, DOC, DOCX, XLSX, CSV, JSON',
formats: [
"pdf",
"docx",
"doc",
"xlsx",
"xls",
"txt",
"csv",
"json",
"md",
],
},
{
type: 'image',
icon: <div className="rb:size-9 rb:bg-cover rb:bg-[url('@/assets/images/file/image.svg')]"></div>,
formats: 'JPG, JPEG, PNG, GIF, WEBP',
formats: [
"png",
"jpg",
"jpeg"
],
},
{
type: 'audio',
icon: <div className="rb:size-9 rb:bg-cover rb:bg-[url('@/assets/images/file/audio.svg')]"></div>,
formats: 'MP3, M4A, WAV, OGG, FLAC',
formats: [
"mp3",
"wav",
"m4a",
],
},
{
type: 'video',
icon: <div className="rb:size-9 rb:bg-cover rb:bg-[url('@/assets/images/file/video.svg')]"></div>,
formats: 'MP4, MOV, AVI, WEBM',
formats: [
"mp4",
"mov",
],
},
];
@@ -50,16 +71,38 @@ const defaultValues: FileUpload = {
enabled: false,
image_enabled: false,
image_max_size_mb: 20,
image_allowed_extensions: ['png', 'jpg', 'jpeg', 'gif', 'webp'],
image_allowed_extensions: [
"png",
"jpg",
"jpeg"
],
audio_enabled: false,
audio_max_size_mb: 50,
audio_allowed_extensions: ['mp3', 'wav', 'm4a', 'ogg', 'flac'],
audio_allowed_extensions: [
"mp3",
"wav",
"m4a",
"ogg",
"flac"
],
document_enabled: false,
document_max_size_mb: 100,
document_allowed_extensions: ['pdf', 'docx', 'xlsx', 'txt', 'csv', 'json'],
document_allowed_extensions: [
"pdf",
"docx",
"xlsx",
"txt",
"csv",
"json"
],
video_enabled: false,
video_max_size_mb: 500,
video_allowed_extensions: ['mp4', 'mov', 'avi', 'webm'],
video_max_size_mb: 100,
video_allowed_extensions: [
"mp4",
"mov",
"avi",
"webm"
],
max_file_count: 5,
allowed_transfer_methods: 'both'
}
@@ -112,7 +155,6 @@ const FileUploadSettingModal = forwardRef<FileUploadSettingModalRef, FileUploadS
open={visible}
onCancel={handleClose}
onOk={handleSave}
width={600}
>
<Form form={form} layout="vertical" initialValues={defaultValues}>
<Form.Item
@@ -128,7 +170,7 @@ const FileUploadSettingModal = forwardRef<FileUploadSettingModalRef, FileUploadS
<div className="rb:text-[12px] rb:text-[#5B6167] rb:mb-1">{t('application.maxCount')}</div>
<Form.Item label={t('application.maxCount')} name="max_file_count">
<InputNumber min={1} max={100} precision={0} className="rb:w-full!" placeholder={t('common.pleaseEnter')} />
<InputNumber min={1} max={20} precision={0} className="rb:w-full!" placeholder={t('common.pleaseEnter')} />
</Form.Item>
<Form.Item label={t('application.supportedTypes')}>
@@ -150,7 +192,7 @@ const FileUploadSettingModal = forwardRef<FileUploadSettingModalRef, FileUploadS
<Flex align="center" justify="space-between">
<Flex vertical>
<div className="rb:font-medium">{t(`application.${option.type}`)}</div>
<div className="rb:text-[12px] rb:text-[#5B6167]">{option.formats}</div>
<div className="rb:text-[12px] rb:text-[#5B6167]">{option.formats.map(item => item.toUpperCase()).join(', ')}</div>
</Flex>
<Form.Item name={enabledKey} valuePropName="checked" noStyle>
<Switch />
@@ -162,7 +204,7 @@ const FileUploadSettingModal = forwardRef<FileUploadSettingModalRef, FileUploadS
<Flex align="center" gap={12} className="rb:mt-3! rb:pt-3! rb:border-t rb:border-[#DFE4ED]">
<div>{t('application.singleMaxSize')}: </div>
<Form.Item name={sizeKey} noStyle>
<InputNumber min={1} max={500} suffix="MB" className="rb:flex-1" />
<InputNumber min={1} max={100} suffix="MB" className="rb:flex-1" />
</Form.Item>
<Form.Item name={`${option.type}_allowed_extensions`} hidden />
</Flex>

View File

@@ -2,7 +2,7 @@
* @Author: ZhaoYing
* @Date: 2026-02-03 16:58:03
* @Last Modified by: ZhaoYing
* @Last Modified time: 2026-03-18 20:54:00
* @Last Modified time: 2026-03-19 12:30:41
*/
/**
* Conversation Page
@@ -63,6 +63,7 @@ const Conversation: FC = () => {
const [isHasMemory, setIsHasMemory] = useState(false)
const [memory, setMemory] = useState(true)
const [features, setFeatures] = useState<FeaturesConfigForm>({} as FeaturesConfigForm)
const [config, setConfig] = useState<Record<string, any>>({})
useEffect(() => {
const shareToken = localStorage.getItem(`shareToken_${token}`)
@@ -88,6 +89,7 @@ const Conversation: FC = () => {
.then(res => {
const response = res as { variables: Variable[]; features: FeaturesConfigForm; app_type: string; memory?: boolean; }
toolbarRef.current?.setVariables(response.variables || [])
setConfig(response)
setFeatures(response.features)
setIsHasMemory((response.app_type === 'workflow' && response.memory) || (response.app_type !== 'workflow'))
})
@@ -284,6 +286,7 @@ const Conversation: FC = () => {
}
const handleChangeMemory = (value: boolean) => {
if (config.app_type === 'workflow') return;
modal.confirm({
title: value ? t('memoryConversation.memoryTipTitle') : t('memoryConversation.memoryCancelTipTitle'),
okText: t('common.confirm'),
@@ -388,6 +391,7 @@ const Conversation: FC = () => {
icon={MemoryFunctionIcon}
checkedIcon={MemoryFunctionCheckedIcon}
checked={memory}
disabled={config.app_type === 'workflow'}
onChange={handleChangeMemory}
>
{t('memoryConversation.memory')}