diff --git a/api/app/core/storage/oss.py b/api/app/core/storage/oss.py index 1db86fef..c6c6ec48 100644 --- a/api/app/core/storage/oss.py +++ b/api/app/core/storage/oss.py @@ -44,6 +44,8 @@ class OSSStorage(StorageBackend): access_key_id: str, access_key_secret: str, bucket_name: str, + connect_timeout: int = 30, + multipart_threshold: int = 10 * 1024 * 1024, # 10MB ): """ Initialize the OSSStorage backend. @@ -53,6 +55,8 @@ class OSSStorage(StorageBackend): access_key_id: The Aliyun access key ID. access_key_secret: The Aliyun access key secret. bucket_name: The name of the OSS bucket. + connect_timeout: Connection timeout in seconds (default: 30). + multipart_threshold: File size threshold for multipart upload (default: 10MB). Raises: StorageConfigError: If any required configuration is missing. @@ -69,10 +73,17 @@ class OSSStorage(StorageBackend): self.endpoint = endpoint self.bucket_name = bucket_name + self.multipart_threshold = multipart_threshold try: auth = oss2.Auth(access_key_id, access_key_secret) - self.bucket = oss2.Bucket(auth, endpoint, bucket_name) + # 设置超时和重试 + self.bucket = oss2.Bucket( + auth, + endpoint, + bucket_name, + connect_timeout=connect_timeout + ) logger.info( f"OSSStorage initialized with endpoint: {endpoint}, bucket: {bucket_name}" ) @@ -108,21 +119,38 @@ class OSSStorage(StorageBackend): if content_type: headers["Content-Type"] = content_type - self.bucket.put_object(file_key, content, headers=headers if headers else None) + # 大文件使用分片上传 + if len(content) > self.multipart_threshold: + logger.info(f"Using multipart upload for large file: {file_key} ({len(content)} bytes)") + upload_id = self.bucket.init_multipart_upload(file_key, headers=headers if headers else None).upload_id + parts = [] + part_size = 5 * 1024 * 1024 # 5MB per part + part_num = 1 + + for offset in range(0, len(content), part_size): + chunk = content[offset:offset + part_size] + result = self.bucket.upload_part(file_key, upload_id, part_num, chunk) + parts.append(oss2.models.PartInfo(part_num, result.etag)) + part_num += 1 + + self.bucket.complete_multipart_upload(file_key, upload_id, parts) + else: + self.bucket.put_object(file_key, content, headers=headers if headers else None) + logger.info(f"File uploaded to OSS successfully: {file_key}") return file_key except OssError as e: logger.error(f"OSS error uploading file {file_key}: {e}") raise StorageUploadError( - message=f"Failed to upload file to OSS: {e.message}", + message=f"Failed to upload file to OSS: {str(e)}", file_key=file_key, cause=e, ) except Exception as e: logger.error(f"Failed to upload file to OSS {file_key}: {e}") raise StorageUploadError( - message=f"Failed to upload file to OSS: {e}", + message=f"Failed to upload file to OSS: {str(e)}", file_key=file_key, cause=e, ) @@ -135,28 +163,73 @@ class OSSStorage(StorageBackend): ) -> int: """Upload from async stream to OSS. Returns total bytes written.""" buf = io.BytesIO() + headers = {"Content-Type": content_type} if content_type else None + upload_id = None + try: + # 收集流数据 + total_size = 0 async for chunk in stream: + if not chunk: + continue buf.write(chunk) + total_size += len(chunk) + content = buf.getvalue() - headers = {"Content-Type": content_type} if content_type else None - self.bucket.put_object(file_key, content, headers=headers) - logger.info(f"File stream uploaded to OSS successfully: {file_key}") - return len(content) + + if not content: + raise StorageUploadError( + message="Empty stream content", + file_key=file_key, + ) + + # 大文件使用分片上传 + if len(content) > self.multipart_threshold: + logger.info(f"Using multipart upload for stream: {file_key} ({len(content)} bytes)") + upload_id = self.bucket.init_multipart_upload(file_key, headers=headers).upload_id + parts = [] + part_size = 5 * 1024 * 1024 # 5MB + part_num = 1 + + for offset in range(0, len(content), part_size): + chunk = content[offset:offset + part_size] + result = self.bucket.upload_part(file_key, upload_id, part_num, chunk) + parts.append(oss2.models.PartInfo(part_num, result.etag)) + part_num += 1 + + self.bucket.complete_multipart_upload(file_key, upload_id, parts) + else: + self.bucket.put_object(file_key, content, headers=headers) + + logger.info(f"File stream uploaded to OSS successfully: {file_key} ({total_size} bytes)") + return total_size + except OssError as e: + if upload_id: + try: + self.bucket.abort_multipart_upload(file_key, upload_id) + except: + pass logger.error(f"OSS error stream uploading file {file_key}: {e}") raise StorageUploadError( - message=f"Failed to stream upload file to OSS: {e.message}", + message=f"Failed to stream upload file to OSS: {str(e)}", file_key=file_key, cause=e, ) except Exception as e: + if upload_id: + try: + self.bucket.abort_multipart_upload(file_key, upload_id) + except: + pass logger.error(f"Failed to stream upload file to OSS {file_key}: {e}") raise StorageUploadError( - message=f"Failed to stream upload file to OSS: {e}", + message=f"Failed to stream upload file to OSS: {str(e)}", file_key=file_key, cause=e, ) + finally: + buf.close() async def download(self, file_key: str) -> bytes: """ @@ -182,14 +255,14 @@ class OSSStorage(StorageBackend): except OssError as e: logger.error(f"OSS error downloading file {file_key}: {e}") raise StorageDownloadError( - message=f"Failed to download file from OSS: {e.message}", + message=f"Failed to download file from OSS: {str(e)}", file_key=file_key, cause=e, ) except Exception as e: logger.error(f"Failed to download file from OSS {file_key}: {e}") raise StorageDownloadError( - message=f"Failed to download file from OSS: {e}", + message=f"Failed to download file from OSS: {str(e)}", file_key=file_key, cause=e, ) @@ -215,14 +288,14 @@ class OSSStorage(StorageBackend): except OssError as e: logger.error(f"OSS error deleting file {file_key}: {e}") raise StorageDeleteError( - message=f"Failed to delete file from OSS: {e.message}", + message=f"Failed to delete file from OSS: {str(e)}", file_key=file_key, cause=e, ) except Exception as e: logger.error(f"Failed to delete file from OSS {file_key}: {e}") raise StorageDeleteError( - message=f"Failed to delete file from OSS: {e}", + message=f"Failed to delete file from OSS: {str(e)}", file_key=file_key, cause=e, ) diff --git a/api/app/models/conversation_model.py b/api/app/models/conversation_model.py index 4011247f..4ae9034d 100644 --- a/api/app/models/conversation_model.py +++ b/api/app/models/conversation_model.py @@ -57,6 +57,12 @@ class Conversation(Base): workspace = relationship("Workspace") messages = relationship("Message", back_populates="conversation", cascade="all, delete-orphan") + @property + def is_first_user_message(self): + """判断当前是否是用户的第一条消息(无视开场白)""" + user_message_count = sum(1 for msg in self.messages if msg.role == "user") + return user_message_count == 1 + class ConversationDetail(Base): __tablename__ = "conversation_details" diff --git a/api/app/schemas/app_schema.py b/api/app/schemas/app_schema.py index e34945eb..f1e9132f 100644 --- a/api/app/schemas/app_schema.py +++ b/api/app/schemas/app_schema.py @@ -276,7 +276,7 @@ class AgentConfigCreate(BaseModel): # 记忆配置 memory: MemoryConfig = Field( - default_factory=lambda: MemoryConfig(enabled=True), + default_factory=lambda: MemoryConfig(enabled=False), description="对话历史记忆配置" ) diff --git a/api/app/services/app_chat_service.py b/api/app/services/app_chat_service.py index 90474428..bdccd787 100644 --- a/api/app/services/app_chat_service.py +++ b/api/app/services/app_chat_service.py @@ -140,13 +140,13 @@ class AppChatService: # 如果是新会话且有开场白,作为第一条 assistant 消息写入数据库 is_new_conversation = len(history) == 0 if is_new_conversation: - opening = self.agent_service._get_opening_statement(features_config, True, variables) + opening, suggested_questions = self.agent_service._get_opening_statement(features_config, True, variables) if opening: self.conversation_service.add_message( conversation_id=conversation_id, role="assistant", content=opening, - meta_data={} + meta_data={"suggested_questions": suggested_questions} ) # 重新加载历史(包含刚写入的开场白) history = await self.conversation_service.get_conversation_history( @@ -367,13 +367,13 @@ class AppChatService: # 如果是新会话且有开场白,作为第一条 assistant 消息写入数据库 is_new_conversation = len(history) == 0 if is_new_conversation: - opening = self.agent_service._get_opening_statement(features_config, True, variables) + opening, suggested_questions = self.agent_service._get_opening_statement(features_config, True, variables) if opening: self.conversation_service.add_message( conversation_id=conversation_id, role="assistant", content=opening, - meta_data={} + meta_data={"suggested_questions": suggested_questions} ) # 重新加载历史(包含刚写入的开场白) history = await self.conversation_service.get_conversation_history( diff --git a/api/app/services/app_service.py b/api/app/services/app_service.py index 736049e5..e1164206 100644 --- a/api/app/services/app_service.py +++ b/api/app/services/app_service.py @@ -1084,7 +1084,6 @@ class AppService: if not exists: cleaned["memory_config_id"] = None cleaned.pop("memory_content", None) - cleaned["enabled"] = False return cleaned exists = self.db.query( @@ -1096,7 +1095,6 @@ class AppService: if not exists: cleaned["memory_config_id"] = None cleaned.pop("memory_content", None) - cleaned["enabled"] = False return cleaned diff --git a/api/app/services/conversation_service.py b/api/app/services/conversation_service.py index 014d96b7..ecf316d9 100644 --- a/api/app/services/conversation_service.py +++ b/api/app/services/conversation_service.py @@ -214,14 +214,14 @@ class ConversationService: conversation.message_count += 1 - if conversation.message_count == 1 and role == "user": + self.db.commit() + self.db.refresh(message) + + if conversation.is_first_user_message and role == "user": conversation.title = ( content[:50] + ("..." if len(content) > 50 else "") ) - self.db.commit() - self.db.refresh(message) - logger.info( "Message added successfully", extra={ diff --git a/api/app/services/draft_run_service.py b/api/app/services/draft_run_service.py index e188872f..c658cf93 100644 --- a/api/app/services/draft_run_service.py +++ b/api/app/services/draft_run_service.py @@ -449,15 +449,16 @@ class AgentRunService: features_config: Dict[str, Any], is_new_conversation: bool, variables: Optional[Dict[str, Any]] = None - ) -> Optional[str]: + ) -> tuple[Any, Any]: """首轮对话时返回开场白文本(支持变量替换),否则返回 None""" if not is_new_conversation: - return None + return None, None opening = features_config.get("opening_statement", {}) if not (isinstance(opening, dict) and opening.get("enabled") and opening.get("statement")): - return None + return None, None statement = opening["statement"] + suggested_questions = opening["suggested_questions"] # 如果有变量,进行替换(仅支持 {{var_name}} 格式) if variables: @@ -465,7 +466,7 @@ class AgentRunService: placeholder = f"{{{{{var_name}}}}}" statement = statement.replace(placeholder, str(var_value)) - return statement + return statement, suggested_questions @staticmethod def _filter_citations( @@ -599,13 +600,16 @@ class AgentRunService: # 5. 处理会话ID(创建或验证),新会话时写入开场白 is_new_conversation = not conversation_id - opening = self._get_opening_statement(features_config, is_new_conversation, variables) + opening, suggested_questions = None, None + if not sub_agent: + opening, suggested_questions = self._get_opening_statement(features_config, is_new_conversation, variables) conversation_id = await self._ensure_conversation( conversation_id=conversation_id, app_id=agent_config.app_id, workspace_id=workspace_id, user_id=user_id, - opening_statement=opening + opening_statement=opening, + suggested_questions=suggested_questions ) model_info = ModelInfo( @@ -845,14 +849,17 @@ class AgentRunService: # 5. 处理会话ID(创建或验证),新会话时写入开场白 is_new_conversation = not conversation_id - opening = self._get_opening_statement(features_config, is_new_conversation, variables) + opening, suggested_questions = None, None + if not sub_agent: + opening, suggested_questions = self._get_opening_statement(features_config, is_new_conversation, variables) conversation_id = await self._ensure_conversation( conversation_id=conversation_id, app_id=agent_config.app_id, workspace_id=workspace_id, user_id=user_id, sub_agent=sub_agent, - opening_statement=opening + opening_statement=opening, + suggested_questions=suggested_questions ) model_info = ModelInfo( @@ -1061,7 +1068,8 @@ class AgentRunService: workspace_id: uuid.UUID, user_id: Optional[str], sub_agent: bool = False, - opening_statement: Optional[str] = None + opening_statement: Optional[str] = None, + suggested_questions: Optional[List[str]] = None ) -> str: """确保会话存在(创建或验证) @@ -1072,6 +1080,7 @@ class AgentRunService: user_id: 用户ID sub_agent: 是否为子代理 opening_statement: 开场白(新会话时作为第一条消息写入) + suggested_questions: 预设问题列表 Returns: str: 会话ID @@ -1115,7 +1124,7 @@ class AgentRunService: conversation_id=uuid.UUID(new_conv_id), role="assistant", content=opening_statement, - meta_data={} + meta_data={"suggested_questions": suggested_questions} ) logger.debug(f"已保存开场白到会话 {new_conv_id}")