# Upper bound on redirect hops followed while probing a remote file.
_MAX_REDIRECTS = 10
# Status codes that are treated as a redirect when a Location header is present.
_REDIRECT_STATUSES = frozenset({301, 302, 303, 307, 308})


def _parse_content_length(raw_value):
    """Return a Content-Length header as a non-negative int, or None if absent/malformed."""
    if raw_value is None:
        return None
    try:
        size = int(raw_value.strip())
    except (TypeError, ValueError):
        return None
    return size if size >= 0 else None


def _filename_from_content_disposition(header_value):
    """Return the filename carried by a Content-Disposition header, or None.

    Uses the stdlib header parser so quoted values, trailing parameters
    (``filename="a.pdf"; size=10``) and RFC 2231 ``filename*=`` are handled.
    Any directory components are stripped from the result.
    """
    from email.message import Message

    if not header_value:
        return None
    carrier = Message()
    carrier["Content-Disposition"] = header_value
    candidate = carrier.get_filename()
    if not candidate:
        return None
    # A remote server must not be able to hand back a path, only a bare name.
    candidate = os.path.basename(candidate.replace("\\", "/")).strip()
    return candidate or None


async def _ensure_public_http_url(url):
    """Reject URLs that are not http(s) or that resolve to a non-public address.

    Raises:
        HTTPException: 400 when the URL is malformed, uses another scheme,
            cannot be resolved, or resolves to a non-global/multicast address.
    """
    import asyncio
    import ipaddress
    import socket

    try:
        parsed = urlparse(url)
        scheme, host, port = parsed.scheme, parsed.hostname, parsed.port
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid URL"
        ) from e

    if scheme not in ("http", "https") or not host:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Only absolute http(s) URLs are supported"
        )
    if port is None:
        port = 443 if scheme == "https" else 80

    try:
        resolved = await asyncio.get_running_loop().getaddrinfo(
            host, port, type=socket.SOCK_STREAM
        )
    except (socket.gaierror, UnicodeError) as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Unable to resolve URL host"
        ) from e

    for *_, sockaddr in resolved:
        # Drop an IPv6 scope id ("fe80::1%eth0") before parsing.
        address = ipaddress.ip_address(sockaddr[0].split("%", 1)[0])
        mapped = getattr(address, "ipv4_mapped", None)
        if mapped is not None:
            address = mapped
        if address.is_multicast or not address.is_global:
            api_logger.warning(f"Blocked file info request to non-public address: host={host}")
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="URL resolves to a non-public address"
            )


async def _probe_remote_file(client, url):
    """Return ``(status_code, headers)`` for ``url`` without downloading the body.

    Tries HEAD first and falls back to a streamed GET (headers only) when HEAD
    does not answer 200. Redirects are followed manually so that every hop is
    validated by ``_ensure_public_http_url``.
    NOTE(review): the host is resolved once for validation and again by the
    HTTP client, so DNS rebinding is not fully excluded.
    """
    from urllib.parse import urljoin

    async def _walk(method):
        current = url
        for _ in range(_MAX_REDIRECTS + 1):
            await _ensure_public_http_url(current)
            async with client.stream(method, current, follow_redirects=False) as resp:
                location = resp.headers.get("Location")
                if resp.status_code in _REDIRECT_STATUSES and location:
                    current = urljoin(current, location)
                    continue
                # Leaving the context closes the connection; the body is never read.
                return resp.status_code, resp.headers
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Unable to access file: too many redirects"
        )

    status_code, headers = await _walk("HEAD")
    if status_code != 200:
        # Some servers do not support HEAD.
        api_logger.info(f"HEAD request failed with {status_code}, trying GET request")
        status_code, headers = await _walk("GET")
    return status_code, headers


@router.get("/files/info-by-url", response_model=ApiResponse)
async def get_file_info_by_url(
    url: str,
):
    """
    Get file information by network URL (no authentication required).

    Reads the remote file's response headers (HEAD, then a streamed GET as a
    fallback) and derives the file name, extension, size and content type.
    The remote body is never downloaded. Because the endpoint is
    unauthenticated, only http(s) URLs that resolve to public addresses are
    fetched, and each redirect hop is re-validated.

    Args:
        url: The network URL of the file.

    Returns:
        ApiResponse whose data holds ``url``, ``file_name``, ``file_ext``,
        ``file_size`` (None when the server sends no usable Content-Length)
        and ``content_type``.

    Raises:
        HTTPException: 400 when the URL is rejected, unreachable, or does not
            answer 200; 500 on any other unexpected failure.
    """
    api_logger.info(f"File info by URL request: url={url}")

    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            status_code, headers = await _probe_remote_file(client, url)

        if status_code != 200:
            api_logger.error(f"Failed to fetch file info: HTTP {status_code}")
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Unable to access file: HTTP {status_code}"
            )

        file_size = _parse_content_length(headers.get("Content-Length"))

        # Keep only the media type; drop charset and other parameters.
        content_type = headers.get("Content-Type", "application/octet-stream")
        content_type = content_type.split(';')[0].strip() or "application/octet-stream"

        # Prefer the server-declared name, then the last URL path segment.
        file_name = _filename_from_content_disposition(headers.get("Content-Disposition"))
        if not file_name:
            parsed_url = urlparse(url)
            file_name = unquote(os.path.basename(parsed_url.path)) or "unknown"

        _, file_ext = os.path.splitext(file_name)

        # No extension in the name: infer one from the content type.
        if not file_ext:
            ext = mimetypes.guess_extension(content_type)
            if ext:
                file_ext = ext
                file_name = f"{file_name}{file_ext}"

        api_logger.info(f"File info retrieved: name={file_name}, size={file_size}, type={content_type}")

        return success(
            data={
                "url": url,
                "file_name": file_name,
                "file_ext": file_ext.lower() if file_ext else "",
                "file_size": file_size,
                "content_type": content_type,
            },
            msg="File information retrieved successfully"
        )

    except HTTPException:
        raise
    except (httpx.HTTPError, httpx.InvalidURL) as e:
        # Timeouts, connection failures and bad URLs are the caller's problem, not ours.
        api_logger.error(f"Failed to fetch file info: {e}")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Unable to access file: request failed"
        ) from e
    except Exception as e:
        api_logger.error(f"Unexpected error: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to retrieve file information: {str(e)}"
        ) from e


@router.get("/files/{file_id}/status", response_model=ApiResponse)
async def get_file_status(
    file_id: uuid.UUID,
    db: Session = Depends(get_db),
):
    """
    Get file upload/processing status (no authentication required).

    Looks up the file's metadata row and returns its stored status together
    with name, size and content type. Intended for polling whether a file
    (e.g., TTS audio) is ready.

    Args:
        file_id: The UUID of the file.
        db: Database session.

    Returns:
        ApiResponse with file status and metadata.

    Raises:
        HTTPException: 404 when no metadata row exists for ``file_id``.
    """
    api_logger.info(f"File status request: file_id={file_id}")

    # Query file metadata from database
    file_metadata = db.query(FileMetadata).filter(FileMetadata.id == file_id).first()
    if not file_metadata:
        api_logger.warning(f"File not found in database: file_id={file_id}")
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="The file does not exist"
        )

    return success(
        data={
            "file_id": str(file_id),
            "status": file_metadata.status,
            "file_name": file_metadata.file_name,
            "file_size": file_metadata.file_size,
            "content_type": file_metadata.content_type,
        },
        msg="File status retrieved successfully"
    )
processed_files = None @@ -206,7 +179,8 @@ class AppChatService: # 构建用户消息内容(含多模态文件) human_meta = { - "files": [] + "files": [], + "history_files": {} } assistant_meta = { "model": api_key_obj.model_name, @@ -221,6 +195,13 @@ class AppChatService: "url": f.url }) + if processed_files: + human_meta["history_files"] = { + "content": processed_files, + "provider": api_key_obj.provider, + "is_omni": api_key_obj.is_omni + } + # 保存消息 if audio_url: assistant_meta["audio_url"] = audio_url @@ -251,6 +232,7 @@ class AppChatService: "suggested_questions": suggested_questions, "citations": self.agent_service._filter_citations(features_config, result.get("citations", [])), "audio_url": audio_url, + "audio_status": "pending" } async def agnet_chat_stream( @@ -350,39 +332,12 @@ class AppChatService: ) # 加载历史消息 - messages = self.conversation_service.get_messages( + history = self.conversation_service.get_conversation_history( conversation_id=conversation_id, - limit=10 + max_history=10, + current_provider=api_key_obj.provider, + current_is_omni=api_key_obj.is_omni ) - history = [] - for msg in messages: - content = [{"type": "text", "text": msg.content}] - - # 处理 meta_data 中的 files - if msg.meta_data and msg.meta_data.get("files"): - history_files = msg.meta_data.get("files", []) - # 使用 MultimodalService 处理文件 - multimodal_service = MultimodalService(self.db, api_config=model_info) - - # 将 files 转换为 FileInput 格式 - file_inputs = [] - for file in history_files: - from app.schemas.app_schema import FileInput, TransferMethod - file_input = FileInput( - type=file.get("type"), - transfer_method=TransferMethod.REMOTE_URL, - url=file.get("url") - ) - file_inputs.append(file_input) - - history_processed_files = await multimodal_service.history_process_files(files=file_inputs) - - content.extend(history_processed_files) - - history.append({ - "role": msg.role, - "content": content - }) # 处理多模态文件 processed_files = None @@ -433,7 +388,7 @@ class AppChatService: elapsed_time = time.time() - 
async def get_conversation_history(
        self,
        conversation_id: uuid.UUID,
        max_history: Optional[int] = None,
        current_provider: Optional[str] = None,
        current_is_omni: Optional[bool] = None
) -> List[dict]:
    """
    Retrieve historical conversation messages formatted as dictionaries.

    Every stored message is returned with its text. For user messages, the
    pre-processed multimodal content cached in ``meta_data["history_files"]``
    is appended only when it was produced for the same provider and omni flag
    as the current model; otherwise the attachments are left out and the
    message text is still kept. This is a coroutine: callers must ``await`` it.

    Args:
        conversation_id (uuid.UUID): Conversation UUID.
        max_history (Optional[int]): Maximum number of messages to retrieve.
        current_provider (Optional[str]): Current provider for file handling.
        current_is_omni (Optional[bool]): Current omni flag for file handling.

    Returns:
        List[dict]: List of message dictionaries with keys 'role' and 'content'.
    """
    # NOTE(review): the fetch call sits between diff hunks; reconstructed from
    # the get_messages(conversation_id=..., limit=...) call sites - confirm.
    messages = self.get_messages(
        conversation_id=conversation_id,
        limit=max_history
    )

    history = []
    for msg in messages:
        msg_dict = {
            "role": msg.role,
            "content": [{"type": "text", "text": msg.content}]
        }

        # Multimodal attachments are only cached on user messages.
        if msg.role == "user" and msg.meta_data:
            history_files = msg.meta_data.get("history_files") or {}

            if history_files and current_provider and current_is_omni is not None:
                same_model_family = (
                    history_files.get("provider") == current_provider
                    and history_files.get("is_omni") == current_is_omni
                )
                # Cached content is provider-specific, so it is reused only on
                # an exact match. On a mismatch the attachments are omitted but
                # the message itself must stay in the history.
                if same_model_family:
                    msg_dict["content"].extend(history_files.get("content") or [])

        history.append(msg_dict)

    return history
处理多模态文件 @@ -661,7 +662,10 @@ class AgentRunService: }) }, files=files, - audio_url=audio_url + processed_files=processed_files, + audio_url=audio_url, + provider=api_key_config.get("provider"), + is_omni=api_key_config.get("is_omni", False) ) response = { @@ -678,6 +682,7 @@ class AgentRunService: ) if not sub_agent else [], "citations": self._filter_citations(features_config, result.get("citations", [])), "audio_url": audio_url, + "audio_status": "pending" } logger.info( @@ -830,8 +835,9 @@ class AgentRunService: # 6. 加载历史消息 history = await self._load_conversation_history( conversation_id=conversation_id, - api_config=model_info, - max_history=memory_config.get("max_history", 10) + max_history=memory_config.get("max_history", 10), + current_provider=api_key_config.get("provider"), + current_is_omni=api_key_config.get("is_omni", False) ) # 6. 处理多模态文件 @@ -909,10 +915,13 @@ class AgentRunService: "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": total_tokens} }, files=files, - audio_url=stream_audio_url + processed_files=processed_files, + audio_url=stream_audio_url, + provider=api_key_config.get("provider"), + is_omni=api_key_config.get("is_omni", False) ) - # 12. 发送结束事件(包含 suggested_questions 和 tts) + # 12. 
发送结束事件(包含 suggested_questions、audio_url 和 audio_status) end_data: Dict[str, Any] = { "conversation_id": conversation_id, "elapsed_time": elapsed_time, @@ -923,6 +932,17 @@ class AgentRunService: features_config, full_content, api_key_config, effective_params ) end_data["audio_url"] = stream_audio_url + # 检查TTS是否已完成(非阻塞,不取消任务) + audio_status = "pending" + if tts_task is not None and tts_task.done(): + # 任务已完成,检查是否有异常 + try: + tts_task.result() + audio_status = "completed" + except Exception as e: + logger.warning(f"TTS任务异常: {e}") + audio_status = "failed" + end_data["audio_status"] = audio_status if stream_audio_url else None end_data["citations"] = self._filter_citations(features_config, []) yield self._format_sse_event("end", end_data) @@ -1119,14 +1139,17 @@ class AgentRunService: async def _load_conversation_history( self, conversation_id: str, - api_config: ModelInfo | None = None, - max_history: int = 10 + max_history: int = 10, + current_provider: Optional[str] = None, + current_is_omni: Optional[bool] = None ) -> List[Dict[str, str]]: - """加载会话历史消息 + """加载会话历史消息,并根据当前模型配置处理多模态文件 Args: conversation_id: 会话ID max_history: 最大历史消息数量 + current_provider: 当前模型的provider + current_is_omni: 当前模型的is_omni Returns: List[Dict]: 历史消息列表 @@ -1138,7 +1161,8 @@ class AgentRunService: history = await conversation_service.get_conversation_history( conversation_id=uuid.UUID(conversation_id), max_history=max_history, - api_config=api_config + current_provider=current_provider, + current_is_omni=current_is_omni ) logger.debug( @@ -1166,7 +1190,10 @@ class AgentRunService: app_id: Optional[uuid.UUID] = None, user_id: Optional[str] = None, files: Optional[List[FileInput]] = None, - audio_url: Optional[str] = None + processed_files: Optional[List[Dict[str, Any]]] = None, + audio_url: Optional[str] = None, + provider: Optional[str] = None, + is_omni: Optional[bool] = None ) -> None: """保存会话消息(会话已通过 _ensure_conversation 确保存在) @@ -1177,6 +1204,11 @@ class AgentRunService: app_id: 
应用ID(未使用,保留用于兼容性) user_id: 用户ID(未使用,保留用于兼容性) meta_data: token消耗 + files: 原始文件输入 + processed_files: 处理后的文件 + audio_url: 音频URL + provider: 模型供应商 + is_omni: 是否为全模态模型 """ try: from app.services.conversation_service import ConversationService @@ -1186,15 +1218,24 @@ class AgentRunService: # 保存消息(会话已经存在) human_meta = { - "files": [] + "files": [], + "history_files": {} } if files: for f in files: - # url = await MultimodalService(self.db).get_file_url(f) human_meta["files"].append({ "type": f.type, "url": f.url }) + + # 保存 history_files,包含 provider 和 is_omni 信息 + if processed_files: + human_meta["history_files"] = { + "content": processed_files, + "provider": provider, + "is_omni": is_omni + } + # 保存用户消息 conversation_service.add_message( conversation_id=conv_uuid, @@ -1420,8 +1461,9 @@ class AgentRunService: workspace_id: Optional[uuid.UUID] = None, ) -> tuple[Optional[str], Optional[asyncio.Task]]: """文本流式输入并行合成音频。 - 返回 (audio_url, task),audio_url 立即可用,task 完成后文件内容就绪。 + 返回 (audio_url, task),audio_url 立即可用(pending状态),task 完成后文件内容就绪。 调用方向 text_queue put 文本 chunk,结束时 put None。 + 前端可通过 GET /storage/files/{file_id}/status 轮询检查音频是否就绪。 """ tts_config = features_config.get("text_to_speech", {}) if not isinstance(tts_config, dict) or not tts_config.get("enabled"): @@ -1808,6 +1850,7 @@ class AgentRunService: ), "cost_estimate": self._estimate_cost(usage, model_info["model_config"]), "audio_url": result.get("audio_url"), + "audio_status": result.get("audio_status"), "citations": result.get("citations", []), "suggested_questions": result.get("suggested_questions", []), "error": None @@ -1885,6 +1928,7 @@ class AgentRunService: "results": [{ **r, "audio_url": r.get("audio_url"), + "audio_status": r.get("audio_status"), "citations": r.get("citations", []), "suggested_questions": r.get("suggested_questions", []), } for r in results], @@ -2016,6 +2060,7 @@ class AgentRunService: full_content = "" returned_conversation_id = model_conversation_id audio_url = None + audio_status = None 
citations = [] suggested_questions = [] @@ -2074,6 +2119,7 @@ class AgentRunService: # 从 end 事件中提取 features 输出字段 if event_type == "end" and event_data: audio_url = event_data.get("audio_url") + audio_status = event_data.get("audio_status") citations = event_data.get("citations", []) suggested_questions = event_data.get("suggested_questions", []) @@ -2103,6 +2149,7 @@ class AgentRunService: "message": full_content, "elapsed_time": elapsed, "audio_url": audio_url, + "audio_status": audio_status, "citations": citations, "suggested_questions": suggested_questions, "error": None @@ -2117,6 +2164,7 @@ class AgentRunService: "elapsed_time": elapsed, "message_length": len(full_content), "audio_url": audio_url, + "audio_status": audio_status, "citations": citations, "suggested_questions": suggested_questions, "timestamp": time.time() @@ -2253,6 +2301,7 @@ class AgentRunService: "message": r.get("message"), "elapsed_time": r.get("elapsed_time", 0), "audio_url": r.get("audio_url"), + "audio_status": r.get("audio_status"), "citations": r.get("citations", []), "suggested_questions": r.get("suggested_questions", []), "error": r.get("error")