Merge remote-tracking branch 'origin/develop' into develop

lixinyue
2026-01-22 12:11:17 +08:00
26 changed files with 3737 additions and 1520 deletions

View File

@@ -14,6 +14,7 @@ from . import (
emotion_config_controller,
emotion_controller,
file_controller,
file_storage_controller,
home_page_controller,
implicit_memory_controller,
knowledge_controller,
@@ -88,5 +89,6 @@ manager_router.include_router(home_page_controller.router)
manager_router.include_router(implicit_memory_controller.router)
manager_router.include_router(memory_perceptual_controller.router)
manager_router.include_router(memory_working_controller.router)
manager_router.include_router(file_storage_controller.router)
__all__ = ["manager_router"]

View File

@@ -24,7 +24,7 @@ from app.schemas.emotion_schema import (
)
from app.schemas.response_schema import ApiResponse
from app.services.emotion_analytics_service import EmotionAnalyticsService
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi import APIRouter, Depends, HTTPException, status, Header
from sqlalchemy.orm import Session
# Dedicated logger for the API layer
@@ -45,6 +45,7 @@ emotion_service = EmotionAnalyticsService()
@router.post("/tags", response_model=ApiResponse)
async def get_emotion_tags(
request: EmotionTagsRequest,
language_type: str = Header(default="zh", alias="X-Language-Type"),
current_user: User = Depends(get_current_user),
):
@@ -96,6 +97,7 @@ async def get_emotion_tags(
@router.post("/wordcloud", response_model=ApiResponse)
async def get_emotion_wordcloud(
request: EmotionWordcloudRequest,
language_type: str = Header(default="zh", alias="X-Language-Type"),
current_user: User = Depends(get_current_user),
):
@@ -142,6 +144,7 @@ async def get_emotion_wordcloud(
@router.post("/health", response_model=ApiResponse)
async def get_emotion_health(
request: EmotionHealthRequest,
language_type: str = Header(default="zh", alias="X-Language-Type"),
current_user: User = Depends(get_current_user),
):
@@ -196,6 +199,7 @@ async def get_emotion_health(
@router.post("/suggestions", response_model=ApiResponse)
async def get_emotion_suggestions(
request: EmotionSuggestionsRequest,
language_type: str = Header(default="zh", alias="X-Language-Type"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
@@ -232,7 +236,7 @@ async def get_emotion_suggestions(
)
return fail(
BizCode.NOT_FOUND,
"建议缓存不存在或已过期,请调用 /generate_suggestions 接口生成新建议",
"建议缓存不存在或已过期,请右上角刷新生成新建议",
""
)
@@ -261,6 +265,7 @@ async def get_emotion_suggestions(
@router.post("/generate_suggestions", response_model=ApiResponse)
async def generate_emotion_suggestions(
request: EmotionGenerateSuggestionsRequest,
language_type: str = Header(default="zh", alias="X-Language-Type"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
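
For reference, a minimal client-side sketch showing how the new X-Language-Type header reaches these endpoints (host, route prefix, payload fields, and token are assumptions, not part of the commit):

import httpx

resp = httpx.post(
    "http://localhost:8000/api/emotion/tags",  # assumed host and prefix
    json={"group_id": "demo-group", "limit": 10},
    headers={
        "X-Language-Type": "en",           # server falls back to "zh" when omitted
        "Authorization": "Bearer <token>", # get_current_user expects authentication
    },
)
print(resp.json())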

View File

@@ -0,0 +1,412 @@
"""
File storage controller module.
This module provides API endpoints for file storage operations using the
configurable storage backend. It is a new controller that does not modify
the existing file_controller.py.
Routes:
POST /storage/files - Upload a file
GET /storage/files/{file_id} - Download a file
DELETE /storage/files/{file_id} - Delete a file
"""
import os
import uuid
from typing import Any
from fastapi import APIRouter, Depends, File, HTTPException, UploadFile, status
from fastapi.responses import FileResponse, RedirectResponse
from sqlalchemy.orm import Session
from app.core.config import settings
from app.core.logging_config import get_api_logger
from app.core.response_utils import success
from app.core.storage import LocalStorage
from app.core.storage.url_signer import generate_signed_url, verify_signed_url
from app.core.storage_exceptions import (
StorageDeleteError,
StorageUploadError,
)
from app.db import get_db
from app.dependencies import get_current_user
from app.models.file_metadata_model import FileMetadata
from app.models.user_model import User
from app.schemas.response_schema import ApiResponse
from app.services.file_storage_service import (
FileStorageService,
generate_file_key,
get_file_storage_service,
)
api_logger = get_api_logger()
router = APIRouter(
prefix="/storage",
tags=["storage"]
)
@router.post("/files", response_model=ApiResponse)
async def upload_file(
file: UploadFile = File(...),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
storage_service: FileStorageService = Depends(get_file_storage_service),
):
"""
Upload a file to the configured storage backend.
"""
tenant_id = current_user.tenant_id
workspace_id = current_user.current_workspace_id
api_logger.info(
f"Storage upload request: tenant_id={tenant_id}, workspace_id={workspace_id}, "
f"filename={file.filename}, username={current_user.username}"
)
# Read file contents
contents = await file.read()
file_size = len(contents)
# Validate file size
if file_size == 0:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="The file is empty."
)
if file_size > settings.MAX_FILE_SIZE:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"The file size exceeds the {settings.MAX_FILE_SIZE} byte limit"
)
# Extract file extension
_, file_extension = os.path.splitext(file.filename)
file_ext = file_extension.lower()
# Generate file_id and file_key
file_id = uuid.uuid4()
file_key = generate_file_key(
tenant_id=tenant_id,
workspace_id=workspace_id,
file_id=file_id,
file_ext=file_ext,
)
# Create file metadata record with pending status
file_metadata = FileMetadata(
id=file_id,
tenant_id=tenant_id,
workspace_id=workspace_id,
file_key=file_key,
file_name=file.filename,
file_ext=file_ext,
file_size=file_size,
content_type=file.content_type,
status="pending",
)
db.add(file_metadata)
db.commit()
db.refresh(file_metadata)
# Upload file to storage backend
try:
await storage_service.upload_file(
tenant_id=tenant_id,
workspace_id=workspace_id,
file_id=file_id,
file_ext=file_ext,
content=contents,
content_type=file.content_type,
)
# Update status to completed
file_metadata.status = "completed"
db.commit()
api_logger.info(f"File uploaded to storage: file_key={file_key}")
except StorageUploadError as e:
# Update status to failed
file_metadata.status = "failed"
db.commit()
api_logger.error(f"Storage upload failed: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"File storage failed: {str(e)}"
)
api_logger.info(f"File upload successful: {file.filename} (file_id: {file_id})")
return success(
data={"file_id": str(file_id), "file_key": file_key},
msg="File upload successful"
)
@router.get("/files/{file_id}", response_model=Any)
async def download_file(
file_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
storage_service: FileStorageService = Depends(get_file_storage_service),
) -> Any:
"""
Download a file from the configured storage backend.
"""
api_logger.info(f"Storage download request: file_id={file_id}")
# Query file metadata from database
file_metadata = db.query(FileMetadata).filter(FileMetadata.id == file_id).first()
if not file_metadata:
api_logger.warning(f"File not found in database: file_id={file_id}")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="The file does not exist"
)
if file_metadata.status != "completed":
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"File upload not completed, status: {file_metadata.status}"
)
file_key = file_metadata.file_key
storage = storage_service.storage
if isinstance(storage, LocalStorage):
full_path = storage._get_full_path(file_key)
if not full_path.exists():
api_logger.warning(f"File not found on disk: file_key={file_key}")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="File not found (possibly deleted)"
)
api_logger.info(f"Serving local file: file_key={file_key}")
return FileResponse(
path=str(full_path),
filename=file_metadata.file_name,
media_type=file_metadata.content_type or "application/octet-stream"
)
else:
try:
presigned_url = await storage_service.get_file_url(file_key, expires=3600)
api_logger.info(f"Redirecting to presigned URL: file_key={file_key}")
return RedirectResponse(url=presigned_url, status_code=status.HTTP_302_FOUND)
except FileNotFoundError:
api_logger.warning(f"File not found in remote storage: file_key={file_key}")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="File not found in storage"
)
except Exception as e:
api_logger.error(f"Failed to get presigned URL: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to retrieve file: {str(e)}"
)
@router.delete("/files/{file_id}", response_model=ApiResponse)
async def delete_file(
file_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
storage_service: FileStorageService = Depends(get_file_storage_service),
):
"""
Delete a file from the configured storage backend.
"""
api_logger.info(
f"Storage delete request: file_id={file_id}, username={current_user.username}"
)
# Query file metadata from database
file_metadata = db.query(FileMetadata).filter(FileMetadata.id == file_id).first()
if not file_metadata:
api_logger.warning(f"File not found in database: file_id={file_id}")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="The file does not exist"
)
file_key = file_metadata.file_key
# Delete file from storage
try:
deleted = await storage_service.delete_file(file_key)
if deleted:
api_logger.info(f"File deleted from storage: file_key={file_key}")
else:
api_logger.info(f"File did not exist in storage: file_key={file_key}")
except StorageDeleteError as e:
api_logger.error(f"Storage delete failed: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to delete file from storage: {str(e)}"
)
# Delete database record
try:
db.delete(file_metadata)
db.commit()
api_logger.info(f"File record deleted from database: file_id={file_id}")
except Exception as e:
api_logger.error(f"Database delete failed: {e}")
db.rollback()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to delete file record: {str(e)}"
)
return success(msg="File deleted successfully")
@router.get("/files/{file_id}/url", response_model=ApiResponse)
async def get_file_url(
file_id: uuid.UUID,
expires: int = None,
db: Session = Depends(get_db),
storage_service: FileStorageService = Depends(get_file_storage_service),
):
"""
Get a temporary access URL for a file (no authentication required).
Args:
file_id: The UUID of the file.
expires: URL validity period in seconds (default from FILE_URL_EXPIRES env).
db: Database session.
storage_service: The file storage service.
Returns:
ApiResponse with the temporary access URL.
"""
if expires is None:
expires = settings.FILE_URL_EXPIRES
api_logger.info(f"Get file URL request: file_id={file_id}, expires={expires}")
# Query file metadata from database
file_metadata = db.query(FileMetadata).filter(FileMetadata.id == file_id).first()
if not file_metadata:
api_logger.warning(f"File not found in database: file_id={file_id}")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="The file does not exist"
)
if file_metadata.status != "completed":
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"File upload not completed, status: {file_metadata.status}"
)
file_key = file_metadata.file_key
storage = storage_service.storage
try:
if isinstance(storage, LocalStorage):
# For local storage, generate signed URL with expiration
url = generate_signed_url(str(file_id), expires)
else:
# For remote storage (OSS/S3), get presigned URL
url = await storage_service.get_file_url(file_key, expires=expires)
api_logger.info(f"Generated file URL: file_id={file_id}")
return success(
data={
"url": url,
"expires_in": expires,
"file_name": file_metadata.file_name,
},
msg="File URL generated successfully"
)
except Exception as e:
api_logger.error(f"Failed to generate file URL: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to generate file URL: {str(e)}"
)
@router.get("/public/{file_id}", response_model=Any)
async def public_download_file(
file_id: uuid.UUID,
expires: int = 0,
signature: str = "",
db: Session = Depends(get_db),
storage_service: FileStorageService = Depends(get_file_storage_service),
) -> Any:
"""
Public file download endpoint with signature verification.
This endpoint allows downloading files without authentication,
but requires a valid signature and non-expired timestamp.
Args:
file_id: The UUID of the file.
expires: Expiration timestamp.
signature: HMAC signature for verification.
db: Database session.
storage_service: The file storage service.
Returns:
FileResponse for the requested file.
"""
api_logger.info(f"Public download request: file_id={file_id}")
# Verify signature
is_valid, error_msg = verify_signed_url(str(file_id), expires, signature)
if not is_valid:
api_logger.warning(f"Invalid signed URL: file_id={file_id}, error={error_msg}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=error_msg
)
# Query file metadata from database
file_metadata = db.query(FileMetadata).filter(FileMetadata.id == file_id).first()
if not file_metadata:
api_logger.warning(f"File not found in database: file_id={file_id}")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="The file does not exist"
)
if file_metadata.status != "completed":
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"File upload not completed, status: {file_metadata.status}"
)
file_key = file_metadata.file_key
storage = storage_service.storage
if isinstance(storage, LocalStorage):
full_path = storage._get_full_path(file_key)
if not full_path.exists():
api_logger.warning(f"File not found on disk: file_key={file_key}")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="File not found"
)
api_logger.info(f"Serving public file: file_key={file_key}")
return FileResponse(
path=str(full_path),
filename=file_metadata.file_name,
media_type=file_metadata.content_type or "application/octet-stream"
)
else:
# For remote storage, redirect to presigned URL
try:
presigned_url = await storage_service.get_file_url(file_key, expires=3600)
return RedirectResponse(url=presigned_url, status_code=status.HTTP_302_FOUND)
except Exception as e:
api_logger.error(f"Failed to get presigned URL: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to retrieve file: {str(e)}"
)
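
A hedged end-to-end usage sketch for the new /storage endpoints (the base URL, and the assumption that the ApiResponse envelope nests the payload under "data", are illustrative):

import httpx

base = "http://localhost:8000/api"  # assumed host and prefix
headers = {"Authorization": "Bearer <token>"}

# Upload a file
with open("report.pdf", "rb") as f:
    r = httpx.post(f"{base}/storage/files", files={"file": f}, headers=headers)
file_id = r.json()["data"]["file_id"]

# Fetch a temporary, signature-protected URL (this endpoint itself needs no auth)
r = httpx.get(f"{base}/storage/files/{file_id}/url", params={"expires": 600})
print(r.json()["data"]["url"])

# Delete the file and its metadata record
httpx.delete(f"{base}/storage/files/{file_id}", headers=headers)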

View File

@@ -162,7 +162,7 @@ async def get_preference_tags(
api_logger.info(f"用户 {user_id} 的画像缓存不存在或已过期")
return fail(
BizCode.NOT_FOUND,
"画像缓存不存在或已过期,请调用 /generate_profile 接口生成新画像",
"画像缓存不存在或已过期,请右上角刷新生成新画像",
""
)
@@ -233,7 +233,7 @@ async def get_dimension_portrait(
api_logger.info(f"用户 {user_id} 的画像缓存不存在或已过期")
return fail(
BizCode.NOT_FOUND,
"画像缓存不存在或已过期,请调用 /generate_profile 接口生成新画像",
"画像缓存不存在或已过期,请右上角刷新生成新画像",
""
)
@@ -281,7 +281,7 @@ async def get_interest_area_distribution(
api_logger.info(f"用户 {user_id} 的画像缓存不存在或已过期")
return fail(
BizCode.NOT_FOUND,
"画像缓存不存在或已过期,请调用 /generate_profile 接口生成新画像",
"画像缓存不存在或已过期,请右上角刷新生成新画像",
""
)
@@ -333,7 +333,7 @@ async def get_behavior_habits(
api_logger.info(f"用户 {user_id} 的画像缓存不存在或已过期")
return fail(
BizCode.NOT_FOUND,
"画像缓存不存在或已过期,请调用 /generate_profile 接口生成新画像",
"画像缓存不存在或已过期,请右上角刷新生成新画像",
""
)

View File

@@ -18,7 +18,7 @@ from app.services import task_service, workspace_service
from app.services.memory_agent_service import MemoryAgentService
from app.services.model_service import ModelConfigService
from dotenv import load_dotenv
from fastapi import APIRouter, Depends, File, Form, Query, UploadFile
from fastapi import APIRouter, Depends, File, Form, Query, UploadFile, Header
from sqlalchemy.orm import Session
from starlette.responses import StreamingResponse
@@ -647,7 +647,7 @@ async def get_knowledge_type_stats_api(
@router.get("/analytics/hot_memory_tags/by_user", response_model=ApiResponse)
async def get_hot_memory_tags_by_user_api(
end_user_id: Optional[str] = Query(None, description="User ID (optional)"),
language_type: Optional[str] = "zh",
language_type: str = Header(default="zh", alias="X-Language-Type"),
limit: int = Query(20, description="Maximum number of tags to return"),
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),

View File

@@ -20,7 +20,7 @@ from app.services.memory_reflection_service import (
)
from app.services.model_service import ModelConfigService
from dotenv import load_dotenv
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi import APIRouter, Depends, HTTPException, status, Header
from sqlalchemy import text
from sqlalchemy.orm import Session
@@ -192,7 +192,7 @@ async def start_reflection_configs(
@router.get("/reflection/run")
async def reflection_run(
config_id: int,
language_type: str = "zh",
language_type: str = Header(default="zh", alias="X-Language-Type"),
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
) -> dict:

View File

@@ -1,4 +1,4 @@
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi import APIRouter, Depends, HTTPException, status, Header
from app.core.logging_config import get_api_logger
from app.core.response_utils import success
from app.db import get_db
@@ -20,7 +20,7 @@ router = APIRouter(
@router.get("/short_term")
async def short_term_configs(
end_user_id: str,
language_type: Optional[str] = "zh",
language_type: str = Header(default="zh", alias="X-Language-Type"),
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):

View File

@@ -5,7 +5,7 @@
from typing import Optional
import datetime
from sqlalchemy.orm import Session
from fastapi import APIRouter, Depends
from fastapi import APIRouter, Depends, Header
from app.db import get_db
from app.core.logging_config import get_api_logger
@@ -45,7 +45,7 @@ router = APIRouter(
@router.get("/analytics/memory_insight/report", response_model=ApiResponse)
async def get_memory_insight_report_api(
end_user_id: str,
language_type: str = "zh",
language_type: str = Header(default="zh", alias="X-Language-Type"),
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
) -> dict:
@@ -82,7 +82,7 @@ async def get_memory_insight_report_api(
@router.get("/analytics/user_summary", response_model=ApiResponse)
async def get_user_summary_api(
end_user_id: str,
language_type: str="zh",
language_type: str = Header(default="zh", alias="X-Language-Type"),
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
) -> dict:
@@ -385,7 +385,7 @@ async def update_end_user_profile(
return fail(BizCode.INTERNAL_ERROR, "Failed to update user information", error_msg)
@router.get("/memory_space/timeline_memories", response_model=ApiResponse)
async def memory_space_timeline_of_shared_memories(id: str, label: str, language_type: str = "zh",
async def memory_space_timeline_of_shared_memories(id: str, label: str, language_type: str = Header(default="zh", alias="X-Language-Type"),
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):

View File

@@ -76,6 +76,22 @@ class Settings:
# File Upload
MAX_FILE_SIZE: int = int(os.getenv("MAX_FILE_SIZE", "52428800"))
FILE_PATH: str = os.getenv("FILE_PATH", "/files")
FILE_URL_EXPIRES: int = int(os.getenv("FILE_URL_EXPIRES", "3600"))
# Storage Configuration
STORAGE_TYPE: str = os.getenv("STORAGE_TYPE", "local")
# Aliyun OSS Configuration
OSS_ENDPOINT: str = os.getenv("OSS_ENDPOINT", "")
OSS_ACCESS_KEY_ID: str = os.getenv("OSS_ACCESS_KEY_ID", "")
OSS_ACCESS_KEY_SECRET: str = os.getenv("OSS_ACCESS_KEY_SECRET", "")
OSS_BUCKET_NAME: str = os.getenv("OSS_BUCKET_NAME", "")
# AWS S3 Configuration
S3_REGION: str = os.getenv("S3_REGION", "")
S3_ACCESS_KEY_ID: str = os.getenv("S3_ACCESS_KEY_ID", "")
S3_SECRET_ACCESS_KEY: str = os.getenv("S3_SECRET_ACCESS_KEY", "")
S3_BUCKET_NAME: str = os.getenv("S3_BUCKET_NAME", "")
# VOLC ASR settings
VOLC_APP_KEY: str = os.getenv("VOLC_APP_KEY", "")

View File

@@ -0,0 +1,15 @@
"""Storage backend module."""
from app.core.storage.base import StorageBackend
from app.core.storage.factory import StorageFactory
from app.core.storage.local import LocalStorage
from app.core.storage.oss import OSSStorage
from app.core.storage.s3 import S3Storage
__all__ = [
"StorageBackend",
"LocalStorage",
"OSSStorage",
"S3Storage",
"StorageFactory",
]

View File

@@ -0,0 +1,103 @@
"""
Abstract base class for storage backends.
This module defines the StorageBackend abstract class that all storage
implementations must inherit from. It provides a unified interface for
file operations across different storage backends.
"""
from abc import ABC, abstractmethod
from typing import Optional
class StorageBackend(ABC):
"""
Abstract base class for storage backends.
All storage implementations (local, OSS, S3, etc.) must inherit from this
class and implement all abstract methods. All methods are async to support
non-blocking I/O operations.
"""
@abstractmethod
async def upload(
self,
file_key: str,
content: bytes,
content_type: Optional[str] = None,
) -> str:
"""
Upload a file to the storage backend.
Args:
file_key: Unique identifier for the file in the storage system.
content: File content as bytes.
content_type: Optional MIME type of the file.
Returns:
The storage path or URL of the uploaded file.
Raises:
StorageUploadError: If the upload operation fails.
"""
pass
@abstractmethod
async def download(self, file_key: str) -> bytes:
"""
Download a file from the storage backend.
Args:
file_key: Unique identifier for the file in the storage system.
Returns:
File content as bytes.
Raises:
FileNotFoundError: If the file does not exist.
StorageDownloadError: If the download operation fails.
"""
pass
@abstractmethod
async def delete(self, file_key: str) -> bool:
"""
Delete a file from the storage backend.
Args:
file_key: Unique identifier for the file in the storage system.
Returns:
True if the file was deleted successfully, False if file didn't exist.
Raises:
StorageDeleteError: If the delete operation fails.
"""
pass
@abstractmethod
async def exists(self, file_key: str) -> bool:
"""
Check if a file exists in the storage backend.
Args:
file_key: Unique identifier for the file in the storage system.
Returns:
True if the file exists, False otherwise.
"""
pass
@abstractmethod
async def get_url(self, file_key: str, expires: int = 3600) -> str:
"""
Get an access URL for the file.
Args:
file_key: Unique identifier for the file in the storage system.
expires: URL validity period in seconds (default: 1 hour).
Returns:
URL for accessing the file.
"""
pass
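
To illustrate the contract, a minimal in-memory backend sketch (illustrative only, not part of the commit) that satisfies every abstract method:

from typing import Optional

from app.core.storage.base import StorageBackend

class MemoryStorage(StorageBackend):
    """Toy backend keeping file bytes in a dict; handy in tests."""

    def __init__(self) -> None:
        self._files: dict[str, bytes] = {}

    async def upload(self, file_key: str, content: bytes,
                     content_type: Optional[str] = None) -> str:
        self._files[file_key] = content
        return file_key

    async def download(self, file_key: str) -> bytes:
        if file_key not in self._files:
            raise FileNotFoundError(file_key)
        return self._files[file_key]

    async def delete(self, file_key: str) -> bool:
        # True only if something was actually removed, as the interface requires
        return self._files.pop(file_key, None) is not None

    async def exists(self, file_key: str) -> bool:
        return file_key in self._files

    async def get_url(self, file_key: str, expires: int = 3600) -> str:
        return f"memory://{file_key}"  # no real URL for an in-memory store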

View File

@@ -0,0 +1,103 @@
"""
Storage backend factory module.
This module provides a factory class for creating storage backend instances
based on configuration. It implements the singleton pattern to ensure only
one storage backend instance exists throughout the application lifecycle.
"""
import logging
from pathlib import Path
from typing import Optional
from app.core.config import settings
from app.core.storage.base import StorageBackend
from app.core.storage_exceptions import StorageConfigError
logger = logging.getLogger(__name__)
class StorageFactory:
"""
Factory class for creating storage backend instances.
This class implements the singleton pattern to ensure only one storage
backend instance is created and reused throughout the application.
Attributes:
_instance: The singleton storage backend instance.
"""
_instance: Optional[StorageBackend] = None
@classmethod
def get_storage(cls) -> StorageBackend:
"""
Get the storage backend instance (singleton).
Returns:
The storage backend instance.
Raises:
ValueError: If the configured storage type is not supported.
StorageConfigError: If required configuration is missing.
"""
if cls._instance is None:
cls._instance = cls._create_storage()
return cls._instance
@classmethod
def _create_storage(cls) -> StorageBackend:
"""
Create a storage backend instance based on configuration.
Returns:
A new storage backend instance.
Raises:
ValueError: If the configured storage type is not supported.
StorageConfigError: If required configuration is missing.
"""
storage_type = settings.STORAGE_TYPE.lower()
logger.info(f"Creating storage backend of type: {storage_type}")
if storage_type == "local":
from app.core.storage.local import LocalStorage
base_path = Path("storage") / settings.FILE_PATH.lstrip("/")
return LocalStorage(base_path=str(base_path))
elif storage_type == "oss":
from app.core.storage.oss import OSSStorage
return OSSStorage(
endpoint=settings.OSS_ENDPOINT,
access_key_id=settings.OSS_ACCESS_KEY_ID,
access_key_secret=settings.OSS_ACCESS_KEY_SECRET,
bucket_name=settings.OSS_BUCKET_NAME,
)
elif storage_type == "s3":
from app.core.storage.s3 import S3Storage
return S3Storage(
region=settings.S3_REGION,
access_key_id=settings.S3_ACCESS_KEY_ID,
secret_access_key=settings.S3_SECRET_ACCESS_KEY,
bucket_name=settings.S3_BUCKET_NAME,
)
else:
logger.error(f"Unsupported storage type: {storage_type}")
raise ValueError(f"Unsupported storage type: {storage_type}")
@classmethod
def reset(cls) -> None:
"""
Reset the singleton instance.
This method is primarily used for testing purposes to allow
creating new storage instances with different configurations.
"""
cls._instance = None
logger.debug("StorageFactory singleton instance reset")

View File

@@ -0,0 +1,196 @@
"""
Local file system storage backend implementation.
This module provides a storage backend that stores files on the local
file system using async I/O operations via aiofiles.
"""
import logging
from pathlib import Path
from typing import Optional
import aiofiles
import aiofiles.os
from app.core.storage.base import StorageBackend
from app.core.storage_exceptions import (
StorageDeleteError,
StorageDownloadError,
StorageUploadError,
)
logger = logging.getLogger(__name__)
class LocalStorage(StorageBackend):
"""
Local file system storage implementation.
This class implements the StorageBackend interface for storing files
on the local file system. It uses aiofiles for async file operations.
Attributes:
base_path: The base directory path where files will be stored.
"""
def __init__(self, base_path: str):
"""
Initialize the LocalStorage backend.
Args:
base_path: The base directory path for file storage.
Will be created if it doesn't exist.
"""
self.base_path = Path(base_path)
self.base_path.mkdir(parents=True, exist_ok=True)
logger.info(f"LocalStorage initialized with base_path: {self.base_path}")
def _get_full_path(self, file_key: str) -> Path:
"""
Get the full file system path for a given file key.
Args:
file_key: The unique identifier for the file.
Returns:
The full Path object for the file.
"""
return self.base_path / file_key
async def upload(
self,
file_key: str,
content: bytes,
content_type: Optional[str] = None,
) -> str:
"""
Upload a file to the local file system.
Args:
file_key: Unique identifier for the file in the storage system.
content: File content as bytes.
content_type: Optional MIME type (not used for local storage).
Returns:
The full path of the uploaded file as a string.
Raises:
StorageUploadError: If the upload operation fails.
"""
full_path = self._get_full_path(file_key)
try:
# Create parent directories if they don't exist
full_path.parent.mkdir(parents=True, exist_ok=True)
async with aiofiles.open(full_path, "wb") as f:
await f.write(content)
logger.info(f"File uploaded successfully: {file_key}")
return str(full_path)
except Exception as e:
logger.error(f"Failed to upload file {file_key}: {e}")
raise StorageUploadError(
message=f"Failed to upload file: {e}",
file_key=file_key,
cause=e,
)
async def download(self, file_key: str) -> bytes:
"""
Download a file from the local file system.
Args:
file_key: Unique identifier for the file in the storage system.
Returns:
File content as bytes.
Raises:
FileNotFoundError: If the file does not exist.
StorageDownloadError: If the download operation fails.
"""
full_path = self._get_full_path(file_key)
if not full_path.exists():
logger.warning(f"File not found: {file_key}")
raise FileNotFoundError(f"File not found: {file_key}")
try:
async with aiofiles.open(full_path, "rb") as f:
content = await f.read()
logger.info(f"File downloaded successfully: {file_key}")
return content
except FileNotFoundError:
raise
except Exception as e:
logger.error(f"Failed to download file {file_key}: {e}")
raise StorageDownloadError(
message=f"Failed to download file: {e}",
file_key=file_key,
cause=e,
)
async def delete(self, file_key: str) -> bool:
"""
Delete a file from the local file system.
Args:
file_key: Unique identifier for the file in the storage system.
Returns:
True if the file was deleted, False if it didn't exist.
Raises:
StorageDeleteError: If the delete operation fails.
"""
full_path = self._get_full_path(file_key)
if not full_path.exists():
logger.info(f"File does not exist, nothing to delete: {file_key}")
return False
try:
await aiofiles.os.remove(full_path)
logger.info(f"File deleted successfully: {file_key}")
return True
except Exception as e:
logger.error(f"Failed to delete file {file_key}: {e}")
raise StorageDeleteError(
message=f"Failed to delete file: {e}",
file_key=file_key,
cause=e,
)
async def exists(self, file_key: str) -> bool:
"""
Check if a file exists in the local file system.
Args:
file_key: Unique identifier for the file in the storage system.
Returns:
True if the file exists, False otherwise.
"""
full_path = self._get_full_path(file_key)
return full_path.exists()
async def get_url(self, file_key: str, expires: int = 3600) -> str:
"""
Get an access URL for the file.
For local storage, this returns a relative path that can be used
by the API layer to serve the file.
Args:
file_key: Unique identifier for the file in the storage system.
expires: URL validity period in seconds (not used for local storage).
Returns:
A relative URL path for accessing the file.
"""
return f"/files/{file_key}"

api/app/core/storage/oss.py (new file)
View File

@@ -0,0 +1,233 @@
"""
Aliyun OSS storage backend implementation.
This module provides a storage backend that stores files on Aliyun Object
Storage Service (OSS) using the oss2 SDK.
"""
import logging
from typing import Optional
import oss2
from oss2.exceptions import NoSuchKey, OssError
from app.core.storage.base import StorageBackend
from app.core.storage_exceptions import (
StorageConfigError,
StorageConnectionError,
StorageDeleteError,
StorageDownloadError,
StorageUploadError,
)
logger = logging.getLogger(__name__)
class OSSStorage(StorageBackend):
"""
Aliyun OSS storage implementation.
This class implements the StorageBackend interface for storing files
on Aliyun Object Storage Service (OSS).
Attributes:
bucket: The oss2.Bucket instance for OSS operations.
bucket_name: The name of the OSS bucket.
endpoint: The OSS endpoint URL.
"""
def __init__(
self,
endpoint: str,
access_key_id: str,
access_key_secret: str,
bucket_name: str,
):
"""
Initialize the OSSStorage backend.
Args:
endpoint: The OSS endpoint URL (e.g., 'https://oss-cn-hangzhou.aliyuncs.com').
access_key_id: The Aliyun access key ID.
access_key_secret: The Aliyun access key secret.
bucket_name: The name of the OSS bucket.
Raises:
StorageConfigError: If any required configuration is missing.
"""
# Validate required configuration
if not endpoint:
raise StorageConfigError(message="OSS endpoint is required")
if not access_key_id:
raise StorageConfigError(message="OSS access_key_id is required")
if not access_key_secret:
raise StorageConfigError(message="OSS access_key_secret is required")
if not bucket_name:
raise StorageConfigError(message="OSS bucket_name is required")
self.endpoint = endpoint
self.bucket_name = bucket_name
try:
auth = oss2.Auth(access_key_id, access_key_secret)
self.bucket = oss2.Bucket(auth, endpoint, bucket_name)
logger.info(
f"OSSStorage initialized with endpoint: {endpoint}, bucket: {bucket_name}"
)
except Exception as e:
logger.error(f"Failed to initialize OSS client: {e}")
raise StorageConnectionError(
message=f"Failed to initialize OSS client: {e}",
cause=e,
)
async def upload(
self,
file_key: str,
content: bytes,
content_type: Optional[str] = None,
) -> str:
"""
Upload a file to OSS.
Args:
file_key: Unique identifier for the file in the storage system.
content: File content as bytes.
content_type: Optional MIME type of the file.
Returns:
The file key of the uploaded file.
Raises:
StorageUploadError: If the upload operation fails.
"""
try:
headers = {}
if content_type:
headers["Content-Type"] = content_type
self.bucket.put_object(file_key, content, headers=headers if headers else None)
logger.info(f"File uploaded to OSS successfully: {file_key}")
return file_key
except OssError as e:
logger.error(f"OSS error uploading file {file_key}: {e}")
raise StorageUploadError(
message=f"Failed to upload file to OSS: {e.message}",
file_key=file_key,
cause=e,
)
except Exception as e:
logger.error(f"Failed to upload file to OSS {file_key}: {e}")
raise StorageUploadError(
message=f"Failed to upload file to OSS: {e}",
file_key=file_key,
cause=e,
)
async def download(self, file_key: str) -> bytes:
"""
Download a file from OSS.
Args:
file_key: Unique identifier for the file in the storage system.
Returns:
File content as bytes.
Raises:
FileNotFoundError: If the file does not exist.
StorageDownloadError: If the download operation fails.
"""
try:
result = self.bucket.get_object(file_key)
content = result.read()
logger.info(f"File downloaded from OSS successfully: {file_key}")
return content
except NoSuchKey:
logger.warning(f"File not found in OSS: {file_key}")
raise FileNotFoundError(f"File not found: {file_key}")
except OssError as e:
logger.error(f"OSS error downloading file {file_key}: {e}")
raise StorageDownloadError(
message=f"Failed to download file from OSS: {e.message}",
file_key=file_key,
cause=e,
)
except Exception as e:
logger.error(f"Failed to download file from OSS {file_key}: {e}")
raise StorageDownloadError(
message=f"Failed to download file from OSS: {e}",
file_key=file_key,
cause=e,
)
async def delete(self, file_key: str) -> bool:
"""
Delete a file from OSS.
Args:
file_key: Unique identifier for the file in the storage system.
Returns:
True if the file was deleted successfully.
Raises:
StorageDeleteError: If the delete operation fails.
"""
try:
self.bucket.delete_object(file_key)
logger.info(f"File deleted from OSS successfully: {file_key}")
return True
except OssError as e:
logger.error(f"OSS error deleting file {file_key}: {e}")
raise StorageDeleteError(
message=f"Failed to delete file from OSS: {e.message}",
file_key=file_key,
cause=e,
)
except Exception as e:
logger.error(f"Failed to delete file from OSS {file_key}: {e}")
raise StorageDeleteError(
message=f"Failed to delete file from OSS: {e}",
file_key=file_key,
cause=e,
)
async def exists(self, file_key: str) -> bool:
"""
Check if a file exists in OSS.
Args:
file_key: Unique identifier for the file in the storage system.
Returns:
True if the file exists, False otherwise.
"""
try:
return self.bucket.object_exists(file_key)
except Exception as e:
logger.error(f"Failed to check file existence in OSS {file_key}: {e}")
return False
async def get_url(self, file_key: str, expires: int = 3600) -> str:
"""
Get a presigned URL for accessing the file.
Args:
file_key: Unique identifier for the file in the storage system.
expires: URL validity period in seconds (default: 1 hour).
Returns:
A presigned URL for accessing the file.
"""
try:
url = self.bucket.sign_url("GET", file_key, expires)
logger.debug(f"Generated presigned URL for {file_key}, expires in {expires}s")
return url
except Exception as e:
logger.error(f"Failed to generate presigned URL for {file_key}: {e}")
# Return a basic URL format as fallback
return f"https://{self.bucket_name}.{self.endpoint.replace('https://', '').replace('http://', '')}/{file_key}"

api/app/core/storage/s3.py (new file)
View File

@@ -0,0 +1,299 @@
"""
AWS S3 storage backend implementation.
This module provides a storage backend that stores files on AWS S3
using the boto3 SDK.
"""
import logging
from typing import Optional
import boto3
from botocore.exceptions import ClientError, NoCredentialsError, BotoCoreError
from app.core.storage.base import StorageBackend
from app.core.storage_exceptions import (
StorageConfigError,
StorageConnectionError,
StorageDeleteError,
StorageDownloadError,
StorageUploadError,
)
logger = logging.getLogger(__name__)
class S3Storage(StorageBackend):
"""
AWS S3 storage implementation.
This class implements the StorageBackend interface for storing files
on AWS Simple Storage Service (S3).
Attributes:
client: The boto3 S3 client instance.
bucket_name: The name of the S3 bucket.
region: The AWS region.
"""
def __init__(
self,
region: str,
access_key_id: str,
secret_access_key: str,
bucket_name: str,
):
"""
Initialize the S3Storage backend.
Args:
region: The AWS region (e.g., 'us-east-1').
access_key_id: The AWS access key ID.
secret_access_key: The AWS secret access key.
bucket_name: The name of the S3 bucket.
Raises:
StorageConfigError: If any required configuration is missing.
StorageConnectionError: If connection to S3 fails.
"""
# Validate required configuration
if not region:
raise StorageConfigError(message="S3 region is required")
if not access_key_id:
raise StorageConfigError(message="S3 access_key_id is required")
if not secret_access_key:
raise StorageConfigError(message="S3 secret_access_key is required")
if not bucket_name:
raise StorageConfigError(message="S3 bucket_name is required")
self.region = region
self.bucket_name = bucket_name
try:
self.client = boto3.client(
"s3",
region_name=region,
aws_access_key_id=access_key_id,
aws_secret_access_key=secret_access_key,
)
logger.info(
f"S3Storage initialized with region: {region}, bucket: {bucket_name}"
)
except NoCredentialsError as e:
logger.error(f"Invalid AWS credentials: {e}")
raise StorageConfigError(
message=f"Invalid AWS credentials: {e}",
cause=e,
)
except Exception as e:
logger.error(f"Failed to initialize S3 client: {e}")
raise StorageConnectionError(
message=f"Failed to initialize S3 client: {e}",
cause=e,
)
async def upload(
self,
file_key: str,
content: bytes,
content_type: Optional[str] = None,
) -> str:
"""
Upload a file to S3.
Args:
file_key: Unique identifier for the file in the storage system.
content: File content as bytes.
content_type: Optional MIME type of the file.
Returns:
The file key of the uploaded file.
Raises:
StorageUploadError: If the upload operation fails.
"""
try:
extra_args = {}
if content_type:
extra_args["ContentType"] = content_type
self.client.put_object(
Bucket=self.bucket_name,
Key=file_key,
Body=content,
**extra_args,
)
logger.info(f"File uploaded to S3 successfully: {file_key}")
return file_key
except ClientError as e:
error_code = e.response.get("Error", {}).get("Code", "Unknown")
error_message = e.response.get("Error", {}).get("Message", str(e))
logger.error(f"S3 ClientError uploading file {file_key}: {error_message}")
raise StorageUploadError(
message=f"Failed to upload file to S3 ({error_code}): {error_message}",
file_key=file_key,
cause=e,
)
except BotoCoreError as e:
logger.error(f"S3 BotoCoreError uploading file {file_key}: {e}")
raise StorageUploadError(
message=f"Failed to upload file to S3: {e}",
file_key=file_key,
cause=e,
)
except Exception as e:
logger.error(f"Failed to upload file to S3 {file_key}: {e}")
raise StorageUploadError(
message=f"Failed to upload file to S3: {e}",
file_key=file_key,
cause=e,
)
async def download(self, file_key: str) -> bytes:
"""
Download a file from S3.
Args:
file_key: Unique identifier for the file in the storage system.
Returns:
File content as bytes.
Raises:
FileNotFoundError: If the file does not exist.
StorageDownloadError: If the download operation fails.
"""
try:
response = self.client.get_object(
Bucket=self.bucket_name,
Key=file_key,
)
content = response["Body"].read()
logger.info(f"File downloaded from S3 successfully: {file_key}")
return content
except ClientError as e:
error_code = e.response.get("Error", {}).get("Code", "Unknown")
if error_code in ("NoSuchKey", "404"):
logger.warning(f"File not found in S3: {file_key}")
raise FileNotFoundError(f"File not found: {file_key}")
error_message = e.response.get("Error", {}).get("Message", str(e))
logger.error(f"S3 ClientError downloading file {file_key}: {error_message}")
raise StorageDownloadError(
message=f"Failed to download file from S3 ({error_code}): {error_message}",
file_key=file_key,
cause=e,
)
except BotoCoreError as e:
logger.error(f"S3 BotoCoreError downloading file {file_key}: {e}")
raise StorageDownloadError(
message=f"Failed to download file from S3: {e}",
file_key=file_key,
cause=e,
)
except Exception as e:
logger.error(f"Failed to download file from S3 {file_key}: {e}")
raise StorageDownloadError(
message=f"Failed to download file from S3: {e}",
file_key=file_key,
cause=e,
)
async def delete(self, file_key: str) -> bool:
"""
Delete a file from S3.
Args:
file_key: Unique identifier for the file in the storage system.
Returns:
True if the file was deleted successfully.
Raises:
StorageDeleteError: If the delete operation fails.
"""
try:
self.client.delete_object(
Bucket=self.bucket_name,
Key=file_key,
)
logger.info(f"File deleted from S3 successfully: {file_key}")
return True
except ClientError as e:
error_code = e.response.get("Error", {}).get("Code", "Unknown")
error_message = e.response.get("Error", {}).get("Message", str(e))
logger.error(f"S3 ClientError deleting file {file_key}: {error_message}")
raise StorageDeleteError(
message=f"Failed to delete file from S3 ({error_code}): {error_message}",
file_key=file_key,
cause=e,
)
except BotoCoreError as e:
logger.error(f"S3 BotoCoreError deleting file {file_key}: {e}")
raise StorageDeleteError(
message=f"Failed to delete file from S3: {e}",
file_key=file_key,
cause=e,
)
except Exception as e:
logger.error(f"Failed to delete file from S3 {file_key}: {e}")
raise StorageDeleteError(
message=f"Failed to delete file from S3: {e}",
file_key=file_key,
cause=e,
)
async def exists(self, file_key: str) -> bool:
"""
Check if a file exists in S3.
Args:
file_key: Unique identifier for the file in the storage system.
Returns:
True if the file exists, False otherwise.
"""
try:
self.client.head_object(
Bucket=self.bucket_name,
Key=file_key,
)
return True
except ClientError as e:
error_code = e.response.get("Error", {}).get("Code", "Unknown")
if error_code in ("404", "NoSuchKey"):
return False
logger.error(f"Failed to check file existence in S3 {file_key}: {e}")
return False
except Exception as e:
logger.error(f"Failed to check file existence in S3 {file_key}: {e}")
return False
async def get_url(self, file_key: str, expires: int = 3600) -> str:
"""
Get a presigned URL for accessing the file.
Args:
file_key: Unique identifier for the file in the storage system.
expires: URL validity period in seconds (default: 1 hour).
Returns:
A presigned URL for accessing the file.
"""
try:
url = self.client.generate_presigned_url(
"get_object",
Params={
"Bucket": self.bucket_name,
"Key": file_key,
},
ExpiresIn=expires,
)
logger.debug(f"Generated presigned URL for {file_key}, expires in {expires}s")
return url
except Exception as e:
logger.error(f"Failed to generate presigned URL for {file_key}: {e}")
# Return a basic URL format as fallback
return f"https://{self.bucket_name}.s3.{self.region}.amazonaws.com/{file_key}"

View File

@@ -0,0 +1,101 @@
"""
URL signing utilities for local file storage.
This module provides functions to generate and verify signed URLs
with expiration time for local file access.
"""
import hashlib
import hmac
import time
from typing import Optional, Tuple
from urllib.parse import parse_qs, urlencode, urlparse
from app.core.config import settings
def generate_signed_url(
file_id: str,
expires: int,
base_url: Optional[str] = None,
) -> str:
"""
Generate a signed URL for local file access.
Args:
file_id: The file UUID as string.
expires: URL validity period in seconds.
base_url: Base URL prefix (default: http://localhost:8000/api).
Returns:
A signed URL with expiration timestamp and signature.
Example:
>>> generate_signed_url("abc-123", 3600)
'http://localhost:8000/api/storage/public/abc-123?expires=1234567890&signature=xxx'
"""
if base_url is None:
# Use SERVER_IP or default to localhost
server_url = f"http://{settings.SERVER_IP}:8000/api"
base_url = server_url
# Calculate expiration timestamp
expires_at = int(time.time()) + expires
# Generate signature
signature = _generate_signature(file_id, expires_at)
# Build URL with query parameters
params = urlencode({
"expires": expires_at,
"signature": signature,
})
return f"{base_url}/storage/public/{file_id}?{params}"
def verify_signed_url(
file_id: str,
expires_at: int,
signature: str,
) -> Tuple[bool, Optional[str]]:
"""
Verify a signed URL.
Args:
file_id: The file UUID as string.
expires_at: The expiration timestamp.
signature: The signature to verify.
Returns:
A tuple of (is_valid, error_message).
If valid, error_message is None.
"""
# Check expiration
if time.time() > expires_at:
return False, "URL has expired"
# Verify signature
expected_signature = _generate_signature(file_id, expires_at)
if not hmac.compare_digest(signature, expected_signature):
return False, "Invalid signature"
return True, None
def _generate_signature(file_id: str, expires_at: int) -> str:
"""
Generate HMAC signature for URL parameters.
Args:
file_id: The file UUID as string.
expires_at: The expiration timestamp.
Returns:
Hex-encoded HMAC-SHA256 signature.
"""
secret_key = settings.SECRET_KEY.encode()
message = f"{file_id}:{expires_at}".encode()
signature = hmac.new(secret_key, message, hashlib.sha256).hexdigest()
return signature
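
A roundtrip sketch of the signing helpers (the UUID and base_url are illustrative):

from urllib.parse import parse_qs, urlparse

from app.core.storage.url_signer import generate_signed_url, verify_signed_url

file_id = "550e8400-e29b-41d4-a716-446655440000"
url = generate_signed_url(file_id, 600, base_url="http://localhost:8000/api")
qs = parse_qs(urlparse(url).query)
ok, err = verify_signed_url(file_id, int(qs["expires"][0]), qs["signature"][0])
assert ok and err is None  # valid until the 600-second window passes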

View File

@@ -0,0 +1,59 @@
"""
Custom exceptions for storage operations.
This module defines a hierarchy of exceptions for handling storage-related errors,
including configuration errors, connection errors, and operation-specific errors.
"""
class StorageError(Exception):
"""Base exception for all storage operations."""
def __init__(
self,
message: str,
file_key: str | None = None,
cause: Exception | None = None,
):
self.message = message
self.file_key = file_key
self.cause = cause
super().__init__(self.message)
def __str__(self) -> str:
parts = [self.message]
if self.file_key:
parts.append(f"file_key={self.file_key}")
if self.cause:
parts.append(f"cause={self.cause}")
return ", ".join(parts)
class StorageConfigError(StorageError):
"""Exception raised when storage configuration is invalid or missing."""
pass
class StorageConnectionError(StorageError):
"""Exception raised when connection to storage backend fails."""
pass
class StorageUploadError(StorageError):
"""Exception raised when file upload operation fails."""
pass
class StorageDownloadError(StorageError):
"""Exception raised when file download operation fails."""
pass
class StorageDeleteError(StorageError):
"""Exception raised when file delete operation fails."""
pass
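
Because every error type derives from StorageError, callers can catch the base class to handle any storage failure uniformly; a short sketch:

from app.core.storage_exceptions import StorageError, StorageUploadError

try:
    raise StorageUploadError(message="disk full", file_key="a/b/c.bin")
except StorageError as e:  # the base class catches all subtypes
    print(e)  # "disk full, file_key=a/b/c.bin"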

View File

@@ -4,6 +4,7 @@ from .workspace_model import Workspace, WorkspaceMember, WorkspaceRole
from .knowledge_model import Knowledge
from .document_model import Document
from .file_model import File
from .file_metadata_model import FileMetadata
from .generic_file_model import GenericFile
from .models_model import ModelConfig, ModelProvider, ModelType, ModelApiKey
from .memory_short_model import ShortTermMemory, LongTermMemory
@@ -37,6 +38,7 @@ __all__ = [
"Knowledge",
"Document",
"File",
"FileMetadata",
"GenericFile",
"ModelConfig",
"ModelProvider",

View File

@@ -0,0 +1,45 @@
"""
File metadata model for storing file storage information.
This model stores metadata about files uploaded to the storage backend,
including the storage key, content type, and other relevant information.
"""
import datetime
import uuid
from sqlalchemy import Column, DateTime, Integer, String
from sqlalchemy.dialects.postgresql import UUID
from app.db import Base
class FileMetadata(Base):
"""
Model for storing file metadata.
Attributes:
id: Primary key UUID.
tenant_id: The tenant that owns the file.
workspace_id: The workspace the file belongs to.
file_key: The unique storage key for the file.
file_name: Original file name.
file_ext: File extension (e.g., .pdf, .md).
file_size: File size in bytes.
content_type: MIME type of the file.
status: Upload status (pending, completed, failed).
created_at: Timestamp when the file was uploaded.
"""
__tablename__ = "file_metadata"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True)
tenant_id = Column(UUID(as_uuid=True), nullable=False, index=True, comment="Tenant ID")
workspace_id = Column(UUID(as_uuid=True), nullable=False, index=True, comment="Workspace ID")
file_key = Column(String(512), nullable=False, unique=True, index=True, comment="Storage file key")
file_name = Column(String(255), nullable=False, comment="Original file name")
file_ext = Column(String(32), nullable=False, comment="File extension")
file_size = Column(Integer, nullable=False, default=0, comment="File size in bytes")
content_type = Column(String(128), nullable=True, comment="MIME content type")
status = Column(String(16), nullable=False, default="pending", comment="Upload status: pending, completed, failed")
created_at = Column(DateTime, nullable=False, default=datetime.datetime.now)
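
A query sketch against this model (session handling and the helper name are assumptions): listing a workspace's completed uploads, newest first:

from sqlalchemy.orm import Session

from app.models.file_metadata_model import FileMetadata

def completed_files(db: Session, workspace_id) -> list[FileMetadata]:
    return (
        db.query(FileMetadata)
        .filter(
            FileMetadata.workspace_id == workspace_id,
            FileMetadata.status == "completed",
        )
        .order_by(FileMetadata.created_at.desc())
        .all()
    )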

View File

@@ -3,7 +3,6 @@
from typing import Optional
from pydantic import BaseModel, Field
class EmotionTagsRequest(BaseModel):
"""获取情绪标签统计请求"""
group_id: str = Field(..., description="组ID")
@@ -11,7 +10,6 @@ class EmotionTagsRequest(BaseModel):
start_date: Optional[str] = Field(None, description="Start date (ISO format, e.g. 2024-01-01)")
end_date: Optional[str] = Field(None, description="End date (ISO format, e.g. 2024-12-31)")
limit: int = Field(10, ge=1, le=100, description="Maximum number of results")
language_type: Optional[str] = Field("zh", description="Language type (zh/en)")
class EmotionWordcloudRequest(BaseModel):
@@ -19,24 +17,22 @@ class EmotionWordcloudRequest(BaseModel):
group_id: str = Field(..., description="Group ID")
emotion_type: Optional[str] = Field(None, description="Emotion type filter (joy/sadness/anger/fear/surprise/neutral)")
limit: int = Field(50, ge=1, le=200, description="Number of words to return")
language_type: Optional[str] = Field("zh", description="Language type (zh/en)")
class EmotionHealthRequest(BaseModel):
"""Request for the emotion health index."""
group_id: str = Field(..., description="Group ID")
time_range: str = Field("30d", description="Time range (7d/30d/90d)")
language_type: Optional[str] = Field("zh", description="Language type (zh/en)")
class EmotionSuggestionsRequest(BaseModel):
"""Request for personalized emotion suggestions."""
group_id: str = Field(..., description="Group ID")
config_id: Optional[int] = Field(None, description="Config ID (selects the LLM model)")
language_type: Optional[str] = Field("zh", description="Language type (zh/en)")
class EmotionGenerateSuggestionsRequest(BaseModel):
"""Request to generate personalized emotion suggestions."""
end_user_id: str = Field(..., description="End-user ID")
language_type: Optional[str] = Field("zh", description="Language type (zh/en)")

View File

@@ -0,0 +1,340 @@
"""
File storage service module.
This module provides a high-level service layer for file storage operations,
encapsulating the storage backend and providing file_key generation, logging,
and error handling.
"""
import logging
import time
import uuid
from typing import Optional
from app.core.storage import StorageFactory, StorageBackend
from app.core.storage_exceptions import (
StorageError,
StorageUploadError,
StorageDownloadError,
StorageDeleteError,
)
from app.core.logging_config import get_business_logger
# Obtain a dedicated logger for business logic
logger = get_business_logger()
def generate_file_key(
tenant_id: uuid.UUID,
workspace_id: uuid.UUID,
file_id: uuid.UUID,
file_ext: str,
) -> str:
"""
Generate a unique file key for storage.
The file key follows the format: {tenant_id}/{workspace_id}/{file_id}{file_ext}
Args:
tenant_id: The tenant UUID.
workspace_id: The workspace UUID.
file_id: The file UUID.
file_ext: The file extension (e.g., '.pdf', '.txt').
Returns:
A unique file key string.
Example:
>>> generate_file_key(
... uuid.UUID('550e8400-e29b-41d4-a716-446655440000'),
... uuid.UUID('660e8400-e29b-41d4-a716-446655440001'),
... uuid.UUID('770e8400-e29b-41d4-a716-446655440002'),
... '.pdf'
... )
'550e8400-e29b-41d4-a716-446655440000/660e8400-e29b-41d4-a716-446655440001/770e8400-e29b-41d4-a716-446655440002.pdf'
"""
# Ensure file_ext starts with a dot
if file_ext and not file_ext.startswith('.'):
file_ext = f'.{file_ext}'
return f"{tenant_id}/{workspace_id}/{file_id}{file_ext}"
class FileStorageService:
"""
High-level service for file storage operations.
This service encapsulates the storage backend and provides:
- File key generation
- Upload, download, delete operations
- Comprehensive logging
- Error handling with meaningful messages
"""
def __init__(self, storage: Optional[StorageBackend] = None):
"""
Initialize the file storage service.
Args:
storage: Optional storage backend instance. If not provided,
the default storage backend from StorageFactory is used.
"""
self._storage = storage
@property
def storage(self) -> StorageBackend:
"""
Get the storage backend instance (lazy initialization).
Returns:
The storage backend instance.
"""
if self._storage is None:
self._storage = StorageFactory.get_storage()
return self._storage
async def upload_file(
self,
tenant_id: uuid.UUID,
workspace_id: uuid.UUID,
file_id: uuid.UUID,
file_ext: str,
content: bytes,
content_type: Optional[str] = None,
) -> str:
"""
Upload a file to storage.
Args:
tenant_id: The tenant UUID.
workspace_id: The workspace UUID.
file_id: The file UUID.
file_ext: The file extension.
content: The file content as bytes.
content_type: Optional MIME type of the file.
Returns:
The file key of the uploaded file.
Raises:
StorageUploadError: If the upload operation fails.
"""
file_key = generate_file_key(tenant_id, workspace_id, file_id, file_ext)
start_time = time.time()
logger.info(
f"Starting file upload: file_key={file_key}, "
f"size={len(content)} bytes, content_type={content_type}"
)
try:
await self.storage.upload(file_key, content, content_type)
elapsed_time = time.time() - start_time
logger.info(
f"File upload successful: file_key={file_key}, "
f"elapsed_time={elapsed_time:.3f}s"
)
return file_key
except StorageError as e:
elapsed_time = time.time() - start_time
logger.error(
f"File upload failed: file_key={file_key}, "
f"elapsed_time={elapsed_time:.3f}s, error={str(e)}"
)
raise StorageUploadError(
message=f"Failed to upload file: {str(e)}",
file_key=file_key,
cause=e,
)
except Exception as e:
elapsed_time = time.time() - start_time
logger.error(
f"Unexpected error during file upload: file_key={file_key}, "
f"elapsed_time={elapsed_time:.3f}s, error={str(e)}"
)
raise StorageUploadError(
message=f"Unexpected error during upload: {str(e)}",
file_key=file_key,
cause=e,
)
async def download_file(self, file_key: str) -> bytes:
"""
Download a file from storage.
Args:
file_key: The file key of the file to download.
Returns:
The file content as bytes.
Raises:
FileNotFoundError: If the file does not exist.
StorageDownloadError: If the download operation fails.
"""
start_time = time.time()
logger.info(f"Starting file download: file_key={file_key}")
try:
content = await self.storage.download(file_key)
elapsed_time = time.time() - start_time
logger.info(
f"File download successful: file_key={file_key}, "
f"size={len(content)} bytes, elapsed_time={elapsed_time:.3f}s"
)
return content
except FileNotFoundError:
elapsed_time = time.time() - start_time
logger.warning(
f"File not found: file_key={file_key}, "
f"elapsed_time={elapsed_time:.3f}s"
)
raise
except StorageError as e:
elapsed_time = time.time() - start_time
logger.error(
f"File download failed: file_key={file_key}, "
f"elapsed_time={elapsed_time:.3f}s, error={str(e)}"
)
raise StorageDownloadError(
message=f"Failed to download file: {str(e)}",
file_key=file_key,
cause=e,
)
except Exception as e:
elapsed_time = time.time() - start_time
logger.error(
f"Unexpected error during file download: file_key={file_key}, "
f"elapsed_time={elapsed_time:.3f}s, error={str(e)}"
)
raise StorageDownloadError(
message=f"Unexpected error during download: {str(e)}",
file_key=file_key,
cause=e,
)

    async def delete_file(self, file_key: str) -> bool:
        """
        Delete a file from storage.

        Args:
            file_key: The file key of the file to delete.

        Returns:
            True if the file was deleted, False if it didn't exist.

        Raises:
            StorageDeleteError: If the delete operation fails.
        """
        start_time = time.time()
        logger.info(f"Starting file deletion: file_key={file_key}")
        try:
            result = await self.storage.delete(file_key)
            elapsed_time = time.time() - start_time
            if result:
                logger.info(
                    f"File deletion successful: file_key={file_key}, "
                    f"elapsed_time={elapsed_time:.3f}s"
                )
            else:
                logger.info(
                    f"File did not exist: file_key={file_key}, "
                    f"elapsed_time={elapsed_time:.3f}s"
                )
            return result
        except StorageError as e:
            elapsed_time = time.time() - start_time
            logger.error(
                f"File deletion failed: file_key={file_key}, "
                f"elapsed_time={elapsed_time:.3f}s, error={str(e)}"
            )
            raise StorageDeleteError(
                message=f"Failed to delete file: {str(e)}",
                file_key=file_key,
                cause=e,
            )
        except Exception as e:
            elapsed_time = time.time() - start_time
            logger.error(
                f"Unexpected error during file deletion: file_key={file_key}, "
                f"elapsed_time={elapsed_time:.3f}s, error={str(e)}"
            )
            raise StorageDeleteError(
                message=f"Unexpected error during deletion: {str(e)}",
                file_key=file_key,
                cause=e,
            )

    async def file_exists(self, file_key: str) -> bool:
        """
        Check if a file exists in storage.

        Args:
            file_key: The file key to check.

        Returns:
            True if the file exists, False otherwise.
        """
        logger.debug(f"Checking file existence: file_key={file_key}")
        try:
            exists = await self.storage.exists(file_key)
            logger.debug(f"File existence check: file_key={file_key}, exists={exists}")
            return exists
        except Exception as e:
            logger.error(
                f"Error checking file existence: file_key={file_key}, error={str(e)}"
            )
            raise

    async def get_file_url(self, file_key: str, expires: int = 3600) -> str:
        """
        Get an access URL for a file.

        Args:
            file_key: The file key.
            expires: URL validity period in seconds (default: 1 hour).

        Returns:
            URL for accessing the file.
        """
        logger.debug(f"Getting file URL: file_key={file_key}, expires={expires}s")
        try:
            url = await self.storage.get_url(file_key, expires)
            logger.debug(f"File URL generated: file_key={file_key}")
            return url
        except Exception as e:
            logger.error(
                f"Error getting file URL: file_key={file_key}, error={str(e)}"
            )
            raise


# Create a default instance for convenience
_default_service: Optional[FileStorageService] = None


def get_file_storage_service() -> FileStorageService:
    """
    Get the default file storage service instance.

    Returns:
        The default FileStorageService instance.
    """
    global _default_service
    if _default_service is None:
        _default_service = FileStorageService()
    return _default_service
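
Taken together, the methods above give the full round-trip surface. A minimal
usage sketch (not part of the diff; keyword names follow the docstrings above,
and the tenant/workspace IDs would normally come from the authenticated
request context):

import uuid


async def storage_round_trip(tenant_id: uuid.UUID, workspace_id: uuid.UUID) -> None:
    service = get_file_storage_service()
    # Upload, then read the same bytes back out of the configured backend.
    file_key = await service.upload_file(
        tenant_id=tenant_id,
        workspace_id=workspace_id,
        file_id=uuid.uuid4(),
        file_ext="txt",
        content=b"hello storage",
        content_type="text/plain",
    )
    assert await service.file_exists(file_key)
    assert await service.download_file(file_key) == b"hello storage"
    # Ten-minute URL, e.g. for a browser redirect to the backend.
    url = await service.get_file_url(file_key, expires=600)
    print(url)
    await service.delete_file(file_key)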

View File

@@ -75,6 +75,23 @@ ENABLE_SINGLE_SESSION=
MAX_FILE_SIZE=52428800 # 50MB: 50 * 1024 * 1024
FILE_PATH=/files

# Storage Backend Configuration
# Supported values: local, oss, s3
# Default: local
STORAGE_TYPE=local

# Aliyun OSS Configuration (required when STORAGE_TYPE=oss)
OSS_ENDPOINT=https://oss-cn-hangzhou.aliyuncs.com
OSS_ACCESS_KEY_ID=your_oss_access_key_id
OSS_ACCESS_KEY_SECRET=your_oss_access_key_secret
OSS_BUCKET_NAME=your_bucket_name

# AWS S3 Configuration (required when STORAGE_TYPE=s3)
S3_REGION=us-east-1
S3_ACCESS_KEY_ID=your_s3_access_key_id
S3_SECRET_ACCESS_KEY=your_s3_secret_access_key
S3_BUCKET_NAME=your_bucket_name

# RAG Setting
DOTNET_SYSTEM_GLOBALIZATION_INVARIANT=1
HF_ENDPOINT=https://hf-mirror.com
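
These variables drive backend selection at startup. As a hedged sketch (the
factory and its return values are illustrative, not this commit's actual
code), STORAGE_TYPE could be mapped onto the SDK clients; the oss2 and boto3
calls below are those SDKs' documented entry points:

# Illustrative only; the repository's real backend wrappers are not shown here.
import os


def create_storage_client():
    storage_type = os.getenv("STORAGE_TYPE", "local")
    if storage_type == "oss":
        import oss2
        auth = oss2.Auth(
            os.environ["OSS_ACCESS_KEY_ID"],
            os.environ["OSS_ACCESS_KEY_SECRET"],
        )
        return oss2.Bucket(
            auth, os.environ["OSS_ENDPOINT"], os.environ["OSS_BUCKET_NAME"]
        )
    if storage_type == "s3":
        import boto3
        return boto3.client(
            "s3",
            region_name=os.environ["S3_REGION"],
            aws_access_key_id=os.environ["S3_ACCESS_KEY_ID"],
            aws_secret_access_key=os.environ["S3_SECRET_ACCESS_KEY"],
        )
    # local: files are kept under FILE_PATH on the application host.
    return os.getenv("FILE_PATH", "/files")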

View File

@@ -0,0 +1,84 @@
"""202601221030
Revision ID: 9a936a9ebb20
Revises: 8cd790908f92
Create Date: 2026-01-22 10:27:10.840844
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision: str = '9a936a9ebb20'
down_revision: Union[str, None] = '8cd790908f92'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table('file_metadata',
    sa.Column('id', sa.UUID(), nullable=False),
    sa.Column('tenant_id', sa.UUID(), nullable=False, comment='Tenant ID'),
    sa.Column('workspace_id', sa.UUID(), nullable=False, comment='Workspace ID'),
    sa.Column('file_key', sa.String(length=512), nullable=False, comment='Storage file key'),
    sa.Column('file_name', sa.String(length=255), nullable=False, comment='Original file name'),
    sa.Column('file_ext', sa.String(length=32), nullable=False, comment='File extension'),
    sa.Column('file_size', sa.Integer(), nullable=False, comment='File size in bytes'),
    sa.Column('content_type', sa.String(length=128), nullable=True, comment='MIME content type'),
    sa.Column('status', sa.String(length=16), nullable=False, comment='Upload status: pending, completed, failed'),
    sa.Column('created_at', sa.DateTime(), nullable=False),
    sa.PrimaryKeyConstraint('id')
    )
    op.create_index(op.f('ix_file_metadata_file_key'), 'file_metadata', ['file_key'], unique=True)
    op.create_index(op.f('ix_file_metadata_id'), 'file_metadata', ['id'], unique=False)
    op.create_index(op.f('ix_file_metadata_tenant_id'), 'file_metadata', ['tenant_id'], unique=False)
    op.create_index(op.f('ix_file_metadata_workspace_id'), 'file_metadata', ['workspace_id'], unique=False)
    op.drop_index(op.f('ix_emotion_suggestions_cache_end_user_id'), table_name='emotion_suggestions_cache')
    op.drop_index(op.f('ix_emotion_suggestions_cache_id'), table_name='emotion_suggestions_cache')
    op.drop_table('emotion_suggestions_cache')
    op.drop_index(op.f('ix_implicit_memory_cache_end_user_id'), table_name='implicit_memory_cache')
    op.drop_index(op.f('ix_implicit_memory_cache_id'), table_name='implicit_memory_cache')
    op.drop_table('implicit_memory_cache')
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table('implicit_memory_cache',
    sa.Column('id', sa.UUID(), autoincrement=False, nullable=False),
    sa.Column('end_user_id', sa.VARCHAR(length=255), autoincrement=False, nullable=False, comment='终端用户ID'),
    sa.Column('preferences', postgresql.JSON(astext_type=sa.Text()), autoincrement=False, nullable=False, comment='偏好标签列表JSON格式'),
    sa.Column('portrait', postgresql.JSON(astext_type=sa.Text()), autoincrement=False, nullable=False, comment='四维画像对象JSON格式'),
    sa.Column('interest_areas', postgresql.JSON(astext_type=sa.Text()), autoincrement=False, nullable=False, comment='兴趣领域分布对象JSON格式'),
    sa.Column('habits', postgresql.JSON(astext_type=sa.Text()), autoincrement=False, nullable=False, comment='行为习惯列表JSON格式'),
    sa.Column('generated_at', postgresql.TIMESTAMP(), autoincrement=False, nullable=False, comment='生成时间'),
    sa.Column('expires_at', postgresql.TIMESTAMP(), autoincrement=False, nullable=True, comment='过期时间'),
    sa.Column('created_at', postgresql.TIMESTAMP(), autoincrement=False, nullable=True),
    sa.Column('updated_at', postgresql.TIMESTAMP(), autoincrement=False, nullable=True),
    sa.PrimaryKeyConstraint('id', name=op.f('implicit_memory_cache_pkey'))
    )
    op.create_index(op.f('ix_implicit_memory_cache_id'), 'implicit_memory_cache', ['id'], unique=False)
    op.create_index(op.f('ix_implicit_memory_cache_end_user_id'), 'implicit_memory_cache', ['end_user_id'], unique=True)
    op.create_table('emotion_suggestions_cache',
    sa.Column('id', sa.UUID(), autoincrement=False, nullable=False),
    sa.Column('end_user_id', sa.VARCHAR(length=255), autoincrement=False, nullable=False, comment='终端用户ID组ID'),
    sa.Column('health_summary', sa.TEXT(), autoincrement=False, nullable=False, comment='健康状态摘要'),
    sa.Column('suggestions', postgresql.JSON(astext_type=sa.Text()), autoincrement=False, nullable=False, comment='建议列表JSON格式'),
    sa.Column('generated_at', postgresql.TIMESTAMP(), autoincrement=False, nullable=False, comment='生成时间'),
    sa.Column('expires_at', postgresql.TIMESTAMP(), autoincrement=False, nullable=True, comment='过期时间'),
    sa.Column('created_at', postgresql.TIMESTAMP(), autoincrement=False, nullable=True),
    sa.Column('updated_at', postgresql.TIMESTAMP(), autoincrement=False, nullable=True),
    sa.PrimaryKeyConstraint('id', name=op.f('emotion_suggestions_cache_pkey'))
    )
    op.create_index(op.f('ix_emotion_suggestions_cache_id'), 'emotion_suggestions_cache', ['id'], unique=False)
    op.create_index(op.f('ix_emotion_suggestions_cache_end_user_id'), 'emotion_suggestions_cache', ['end_user_id'], unique=True)
    op.drop_index(op.f('ix_file_metadata_workspace_id'), table_name='file_metadata')
    op.drop_index(op.f('ix_file_metadata_tenant_id'), table_name='file_metadata')
    op.drop_index(op.f('ix_file_metadata_id'), table_name='file_metadata')
    op.drop_index(op.f('ix_file_metadata_file_key'), table_name='file_metadata')
    op.drop_table('file_metadata')
    # ### end Alembic commands ###
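
For reference, the upgrade above implies an ORM model roughly like the
following; this is a sketch only (module path, base class, and defaults in the
actual repository may differ), derived directly from the migration's columns:

import uuid
from datetime import datetime

from sqlalchemy import Column, DateTime, Integer, String
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class FileMetadata(Base):
    __tablename__ = "file_metadata"

    id = Column(UUID(as_uuid=True), primary_key=True, index=True, default=uuid.uuid4)
    tenant_id = Column(UUID(as_uuid=True), nullable=False, index=True, comment="Tenant ID")
    workspace_id = Column(UUID(as_uuid=True), nullable=False, index=True, comment="Workspace ID")
    file_key = Column(String(512), nullable=False, unique=True, index=True, comment="Storage file key")
    file_name = Column(String(255), nullable=False, comment="Original file name")
    file_ext = Column(String(32), nullable=False, comment="File extension")
    file_size = Column(Integer, nullable=False, comment="File size in bytes")
    content_type = Column(String(128), nullable=True, comment="MIME content type")
    status = Column(String(16), nullable=False, comment="Upload status: pending, completed, failed")
    created_at = Column(DateTime, nullable=False, default=datetime.utcnow)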

View File

@@ -13,7 +13,6 @@ dependencies = [
"bcrypt==5.0.0",
"billiard==4.2.2",
"celery==5.5.3",
"flower==2.0.1",
"cffi==2.0.0",
"click==8.3.0",
"click-didyoumean==0.3.1",
@@ -139,7 +138,9 @@ dependencies = [
"python-calamine>=0.4.0",
"xlrd==2.0.2",
"deprecated>=1.3.1",
"oss2>=2.19.1",
"flower>=2.0.1",
"aiofile>=3.9.0",
]
[tool.pytest.ini_options]

View File

@@ -6,7 +6,7 @@ async-timeout==5.0.1
bcrypt==5.0.0
billiard==4.2.2
celery==5.5.3
flower==2.0.1
flower>=2.0.1
cffi==2.0.0
click==8.3.0
click-didyoumean==0.3.1
@@ -132,3 +132,6 @@ markdown-to-json==2.1.1
valkey==6.0.2
python-calamine>=0.4.0
xlrd==2.0.2
oss2>=2.18.0
boto3>=1.28.0
aiofiles>=23.0.0

api/uv.lock (generated, 3177 lines changed)

File diff suppressed because it is too large