Files
MemoryBear/api/app/services/file_storage_service.py
2026-04-27 16:05:27 +08:00

366 lines
11 KiB
Python

"""
File storage service module.
This module provides a high-level service layer for file storage operations,
encapsulating the storage backend and providing file_key generation, logging,
and error handling.
"""
import logging
import time
import uuid
from typing import AsyncIterator, Optional
from app.core.storage import StorageFactory, StorageBackend
from app.core.storage_exceptions import (
StorageError,
StorageUploadError,
StorageDownloadError,
StorageDeleteError,
)
from app.core.logging_config import get_business_logger
# Obtain a dedicated logger for business logic
logger = get_business_logger()
def generate_file_key(
tenant_id: uuid.UUID,
workspace_id: uuid.UUID | None,
file_id: uuid.UUID,
file_ext: str,
) -> str:
"""
Generate a unique file key for storage.
The file key follows the format: {tenant_id}/{workspace_id}/{file_id}{file_ext}
"""
if file_ext and not file_ext.startswith('.'):
file_ext = f'.{file_ext}'
if workspace_id:
return f"{tenant_id}/{workspace_id}/{file_id}{file_ext}"
return f"{tenant_id}/{file_id}{file_ext}"
def generate_kb_file_key(
kb_id: uuid.UUID,
file_id: uuid.UUID,
file_ext: str,
) -> str:
"""
Generate a file key for knowledge base files.
Format: kb/{kb_id}/{file_id}{file_ext}
"""
if file_ext and not file_ext.startswith('.'):
file_ext = f'.{file_ext}'
return f"kb/{kb_id}/{file_id}{file_ext}"
class FileStorageService:
"""
High-level service for file storage operations.
This service encapsulates the storage backend and provides:
- File key generation
- Upload, download, delete operations
- Comprehensive logging
- Error handling with meaningful messages
"""
def __init__(self, storage: Optional[StorageBackend] = None):
"""
Initialize the file storage service.
Args:
storage: Optional storage backend instance. If not provided,
the default storage backend from StorageFactory is used.
"""
self._storage = storage
@property
def storage(self) -> StorageBackend:
"""
Get the storage backend instance (lazy initialization).
Returns:
The storage backend instance.
"""
if self._storage is None:
self._storage = StorageFactory.get_storage()
return self._storage
async def upload_file(
self,
tenant_id: uuid.UUID,
workspace_id: uuid.UUID | None,
file_id: uuid.UUID,
file_ext: str,
content: bytes,
content_type: Optional[str] = None,
) -> str:
"""
Upload a file to storage.
Args:
tenant_id: The tenant UUID.
workspace_id: The workspace UUID.
file_id: The file UUID.
file_ext: The file extension.
content: The file content as bytes.
content_type: Optional MIME type of the file.
Returns:
The file key of the uploaded file.
Raises:
StorageUploadError: If the upload operation fails.
"""
file_key = generate_file_key(tenant_id, workspace_id, file_id, file_ext)
start_time = time.time()
logger.info(
f"Starting file upload: file_key={file_key}, "
f"size={len(content)} bytes, content_type={content_type}"
)
try:
await self.storage.upload(file_key, content, content_type)
elapsed_time = time.time() - start_time
logger.info(
f"File upload successful: file_key={file_key}, "
f"elapsed_time={elapsed_time:.3f}s"
)
return file_key
except StorageError as e:
elapsed_time = time.time() - start_time
logger.error(
f"File upload failed: file_key={file_key}, "
f"elapsed_time={elapsed_time:.3f}s, error={str(e)}"
)
raise StorageUploadError(
message=f"Failed to upload file: {str(e)}",
file_key=file_key,
cause=e,
)
except Exception as e:
elapsed_time = time.time() - start_time
logger.error(
f"Unexpected error during file upload: file_key={file_key}, "
f"elapsed_time={elapsed_time:.3f}s, error={str(e)}"
)
raise StorageUploadError(
message=f"Unexpected error during upload: {str(e)}",
file_key=file_key,
cause=e,
)
async def upload_stream(
self,
tenant_id: uuid.UUID,
workspace_id: uuid.UUID | None,
file_id: uuid.UUID,
file_ext: str,
stream: AsyncIterator[bytes],
content_type: Optional[str] = None,
) -> int:
"""
Upload a file from an async byte stream.
Returns:
Total bytes written.
"""
file_key = generate_file_key(tenant_id, workspace_id, file_id, file_ext)
logger.info(f"Starting stream upload: file_key={file_key}, content_type={content_type}")
try:
total = await self.storage.upload_stream(file_key, stream, content_type)
logger.info(f"Stream upload successful: file_key={file_key}, size={total} bytes")
return total
except Exception as e:
logger.error(f"Stream upload failed: file_key={file_key}, error={str(e)}")
raise
async def download_file(self, file_key: str) -> bytes:
"""
Download a file from storage.
Args:
file_key: The file key of the file to download.
Returns:
The file content as bytes.
Raises:
FileNotFoundError: If the file does not exist.
StorageDownloadError: If the download operation fails.
"""
start_time = time.time()
logger.info(f"Starting file download: file_key={file_key}")
try:
content = await self.storage.download(file_key)
elapsed_time = time.time() - start_time
logger.info(
f"File download successful: file_key={file_key}, "
f"size={len(content)} bytes, elapsed_time={elapsed_time:.3f}s"
)
return content
except FileNotFoundError:
elapsed_time = time.time() - start_time
logger.warning(
f"File not found: file_key={file_key}, "
f"elapsed_time={elapsed_time:.3f}s"
)
raise
except StorageError as e:
elapsed_time = time.time() - start_time
logger.error(
f"File download failed: file_key={file_key}, "
f"elapsed_time={elapsed_time:.3f}s, error={str(e)}"
)
raise StorageDownloadError(
message=f"Failed to download file: {str(e)}",
file_key=file_key,
cause=e,
)
except Exception as e:
elapsed_time = time.time() - start_time
logger.error(
f"Unexpected error during file download: file_key={file_key}, "
f"elapsed_time={elapsed_time:.3f}s, error={str(e)}"
)
raise StorageDownloadError(
message=f"Unexpected error during download: {str(e)}",
file_key=file_key,
cause=e,
)
async def delete_file(self, file_key: str) -> bool:
"""
Delete a file from storage.
Args:
file_key: The file key of the file to delete.
Returns:
True if the file was deleted, False if it didn't exist.
Raises:
StorageDeleteError: If the delete operation fails.
"""
start_time = time.time()
logger.info(f"Starting file deletion: file_key={file_key}")
try:
result = await self.storage.delete(file_key)
elapsed_time = time.time() - start_time
if result:
logger.info(
f"File deletion successful: file_key={file_key}, "
f"elapsed_time={elapsed_time:.3f}s"
)
else:
logger.info(
f"File did not exist: file_key={file_key}, "
f"elapsed_time={elapsed_time:.3f}s"
)
return result
except StorageError as e:
elapsed_time = time.time() - start_time
logger.error(
f"File deletion failed: file_key={file_key}, "
f"elapsed_time={elapsed_time:.3f}s, error={str(e)}"
)
raise StorageDeleteError(
message=f"Failed to delete file: {str(e)}",
file_key=file_key,
cause=e,
)
except Exception as e:
elapsed_time = time.time() - start_time
logger.error(
f"Unexpected error during file deletion: file_key={file_key}, "
f"elapsed_time={elapsed_time:.3f}s, error={str(e)}"
)
raise StorageDeleteError(
message=f"Unexpected error during deletion: {str(e)}",
file_key=file_key,
cause=e,
)
async def file_exists(self, file_key: str) -> bool:
"""
Check if a file exists in storage.
Args:
file_key: The file key to check.
Returns:
True if the file exists, False otherwise.
"""
logger.debug(f"Checking file existence: file_key={file_key}")
try:
exists = await self.storage.exists(file_key)
logger.debug(f"File existence check: file_key={file_key}, exists={exists}")
return exists
except Exception as e:
logger.error(
f"Error checking file existence: file_key={file_key}, error={str(e)}"
)
raise
async def get_file_url(
self,
file_key: str,
expires: int = 3600,
file_name: Optional[str] = None,
) -> str:
"""
Get an access URL for a file.
Args:
file_key: The file key.
expires: URL validity period in seconds (default: 1 hour).
file_name: If set, adds Content-Disposition: attachment to force download.
Returns:
URL for accessing the file.
"""
logger.debug(f"Getting file URL: file_key={file_key}, expires={expires}s")
try:
url = await self.storage.get_url(file_key, expires, file_name=file_name)
logger.debug(f"File URL generated: file_key={file_key}")
return url
except Exception as e:
logger.error(f"Error getting file URL: file_key={file_key}, error={str(e)}")
raise
# Create a default instance for convenience
_default_service: Optional[FileStorageService] = None
def get_file_storage_service() -> FileStorageService:
"""
Get the default file storage service instance.
Returns:
The default FileStorageService instance.
"""
global _default_service
if _default_service is None:
_default_service = FileStorageService()
return _default_service