import os from typing import Any, Optional from pathlib import Path import shutil import uuid from fastapi import APIRouter, Depends, HTTPException, status, File, UploadFile, Query from fastapi.responses import FileResponse from sqlalchemy.orm import Session from app.core.config import settings from app.db import get_db from app.dependencies import get_current_user from app.models.user_model import User from app.models import file_model from app.schemas import file_schema, document_schema from app.schemas.response_schema import ApiResponse from app.core.response_utils import success from app.services import file_service, document_service from app.core.logging_config import get_api_logger # Obtain a dedicated API logger api_logger = get_api_logger() router = APIRouter( prefix="/files", tags=["files"] ) @router.get("/{kb_id}/{parent_id}/files", response_model=ApiResponse) async def get_files( kb_id: uuid.UUID, parent_id: uuid.UUID, page: int = Query(1, gt=0), # Default: 1, which must be greater than 0 pagesize: int = Query(20, gt=0, le=100), # Default: 20 items per page, maximum: 100 items orderby: Optional[str] = Query(None, description="Sort fields, such as: created_at"), desc: Optional[bool] = Query(False, description="Is it descending order"), keywords: Optional[str] = Query(None, description="Search keywords (file name)"), db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """ Paged query file list - Support filtering by kb_id and parent_id - Support keyword search for file names - Support dynamic sorting - Return paging metadata + file list """ api_logger.info(f"Query file list: kb_id={kb_id}, parent_id={parent_id}, page={page}, pagesize={pagesize}, keywords={keywords}, username: {current_user.username}") # 1. parameter validation if page < 1 or pagesize < 1: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="The paging parameter must be greater than 0" ) # 2. Construct query conditions filters = [ file_model.File.kb_id == kb_id ] if parent_id: filters.append(file_model.File.parent_id == parent_id) # Keyword search (fuzzy matching of file name) if keywords: filters.append(file_model.File.file_name.ilike(f"%{keywords}%")) # 3. Execute paged query try: api_logger.debug("Start executing file paging query") total, items = file_service.get_files_paginated( db=db, filters=filters, page=page, pagesize=pagesize, orderby=orderby, desc=desc, current_user=current_user ) api_logger.info(f"File query successful: total={total}, returned={len(items)} records") except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Query failed: {str(e)}" ) # 4. Return structured response result = { "items": items, "page": { "page": page, "pagesize": pagesize, "total": total, "has_next": True if page * pagesize < total else False } } return success(data=result, msg="Query of file list succeeded") @router.post("/folder", response_model=ApiResponse) def create_folder( kb_id: uuid.UUID, parent_id: uuid.UUID, folder_name: str = '/', db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """ Create a new folder """ api_logger.info(f"Create folder request: kb_id={kb_id}, parent_id={parent_id}, folder_name={folder_name}, username: {current_user.username}") try: api_logger.debug(f"Start creating a folder: {folder_name}") create_folder = file_schema.FileCreate( kb_id=kb_id, created_by=current_user.id, parent_id=parent_id, file_name=folder_name, file_ext='folder', file_size=0, ) db_file = file_service.create_file(db=db, file=create_folder, current_user=current_user) api_logger.info(f"Folder created successfully: {db_file.file_name} (ID: {db_file.id})") return success(data=file_schema.File.model_validate(db_file), msg="Folder creation successful") except Exception as e: api_logger.error(f"Folder creation failed: {folder_name} - {str(e)}") raise @router.post("/file", response_model=ApiResponse) async def upload_file( kb_id: uuid.UUID, parent_id: uuid.UUID, file: UploadFile = File(...), db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """ upload file """ api_logger.info(f"upload file request: kb_id={kb_id}, parent_id={parent_id}, filename={file.filename}, username: {current_user.username}") # Read the contents of the file contents = await file.read() # Check file size file_size = len(contents) print(f"file size: {file_size} byte") if file_size == 0: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="The file is empty." ) # If the file size exceeds 50MB (50 * 1024 * 1024 bytes) if file_size > settings.MAX_FILE_SIZE: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"The file size exceeds the {settings.MAX_FILE_SIZE}byte limit" ) # Extract the extension using `os.path.splitext` _, file_extension = os.path.splitext(file.filename) upload_file = file_schema.FileCreate( kb_id=kb_id, created_by=current_user.id, parent_id=parent_id, file_name=file.filename, file_ext=file_extension.lower(), file_size=file_size, ) db_file = file_service.create_file(db=db, file=upload_file, current_user=current_user) # Construct a save path:/files/{kb_id}/{parent_id}/{file.id}{file_extension} save_dir = os.path.join(settings.FILE_PATH, str(kb_id), str(parent_id)) Path(save_dir).mkdir(parents=True, exist_ok=True) # Ensure that the directory exists save_path = os.path.join(save_dir, f"{db_file.id}{db_file.file_ext}") # Save file with open(save_path, "wb") as f: f.write(contents) # Verify whether the file has been saved successfully if not os.path.exists(save_path): raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="File save failed" ) # Create a document create_data = document_schema.DocumentCreate( kb_id=kb_id, created_by=current_user.id, file_id=db_file.id, file_name=db_file.file_name, file_ext=db_file.file_ext, file_size=db_file.file_size, file_meta={}, parser_id="naive", parser_config={ "layout_recognize": "DeepDOC", "chunk_token_num": 128, "delimiter": "\n", "auto_keywords": 0, "auto_questions": 0, "html4excel": "false" } ) db_document = document_service.create_document(db=db, document=create_data, current_user=current_user) api_logger.info(f"File upload successfully: {file.filename} (file_id: {db_file.id}, document_id: {db_document.id})") return success(data=document_schema.Document.model_validate(db_document), msg="File upload successful") @router.post("/customtext", response_model=ApiResponse) async def custom_text( kb_id: uuid.UUID, parent_id: uuid.UUID, create_data: file_schema.CustomTextFileCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """ custom text """ api_logger.info(f"custom text upload request: kb_id={kb_id}, parent_id={parent_id}, title={create_data.title}, content={create_data.content}, username: {current_user.username}") # Check file content size # 将内容编码为字节(UTF-8) content_bytes = create_data.content.encode('utf-8') file_size = len(content_bytes) print(f"file size: {file_size} byte") if file_size == 0: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="The content is empty." ) # If the file size exceeds 50MB (50 * 1024 * 1024 bytes) if file_size > settings.MAX_FILE_SIZE: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"The content size exceeds the {settings.MAX_FILE_SIZE}byte limit" ) upload_file = file_schema.FileCreate( kb_id=kb_id, created_by=current_user.id, parent_id=parent_id, file_name=f"{create_data.title}.txt", file_ext=".txt", file_size=file_size, ) db_file = file_service.create_file(db=db, file=upload_file, current_user=current_user) # Construct a save path:/files/{kb_id}/{parent_id}/{file.id}{file_extension} save_dir = os.path.join(settings.FILE_PATH, str(kb_id), str(parent_id)) Path(save_dir).mkdir(parents=True, exist_ok=True) # Ensure that the directory exists save_path = os.path.join(save_dir, f"{db_file.id}.txt") # Save file with open(save_path, "wb") as f: f.write(content_bytes) # Verify whether the file has been saved successfully if not os.path.exists(save_path): raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="File save failed" ) # Create a document create_document_data = document_schema.DocumentCreate( kb_id=kb_id, created_by=current_user.id, file_id=db_file.id, file_name=db_file.file_name, file_ext=db_file.file_ext, file_size=db_file.file_size, file_meta={}, parser_id="naive", parser_config={ "layout_recognize": "DeepDOC", "chunk_token_num": 128, "delimiter": "\n", "auto_keywords": 0, "auto_questions": 0, "html4excel": "false" } ) db_document = document_service.create_document(db=db, document=create_document_data, current_user=current_user) api_logger.info(f"custom text upload successfully: {create_data.title} (file_id: {db_file.id}, document_id: {db_document.id})") return success(data=document_schema.Document.model_validate(db_document), msg="custom text upload successful") @router.get("/{file_id}", response_model=Any) async def get_file( file_id: uuid.UUID, db: Session = Depends(get_db) ) -> Any: """ Download the file based on the file_id - Query file information from the database - Construct the file path and check if it exists - Return a FileResponse to download the file """ api_logger.info(f"Download the file based on the file_id: file_id={file_id}") # 1. Query file information from the database db_file = file_service.get_file_by_id(db, file_id=file_id) if not db_file: api_logger.warning(f"The file does not exist or you do not have permission to access it: file_id={file_id}") raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="The file does not exist or you do not have permission to access it" ) # 2. Construct file path:/files/{kb_id}/{parent_id}/{file.id}{file.file_ext} file_path = os.path.join( settings.FILE_PATH, str(db_file.kb_id), str(db_file.parent_id), f"{db_file.id}{db_file.file_ext}" ) # 3. Check if the file exists if not os.path.exists(file_path): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="File not found (possibly deleted)" ) # 4.Return FileResponse (automatically handle download) return FileResponse( path=file_path, filename=db_file.file_name, # Use original file name media_type="application/octet-stream" # Universal binary stream type ) @router.put("/{file_id}", response_model=ApiResponse) async def update_file( file_id: uuid.UUID, update_data: file_schema.FileUpdate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """ Update file information (such as file name) - Only specified fields such as file_name are allowed to be modified """ api_logger.debug(f"Query the file to be updated: {file_id}") # 1. Check if the file exists db_file = file_service.get_file_by_id(db, file_id=file_id) if not db_file: api_logger.warning(f"The file does not exist or you do not have permission to access it: file_id={file_id}") raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="The file does not exist or you do not have permission to access it" ) # 2. Update fields (only update non-null fields) api_logger.debug(f"Start updating the file fields: {file_id}") updated_fields = [] for field, value in update_data.items(): if hasattr(db_file, field): old_value = getattr(db_file, field) if old_value != value: # update value setattr(db_file, field, value) updated_fields.append(f"{field}: {old_value} -> {value}") if updated_fields: api_logger.debug(f"updated fields: {', '.join(updated_fields)}") # 3. Save to database try: db.commit() db.refresh(db_file) api_logger.info(f"The file has been successfully updated: {db_file.file_name} (ID: {db_file.id})") except Exception as e: db.rollback() api_logger.error(f"File update failed: file_id={file_id} - {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"File update failed: {str(e)}" ) # 4. Return the updated file return success(data=file_schema.File.model_validate(db_file), msg="File information updated successfully") @router.delete("/{file_id}", response_model=ApiResponse) async def delete_file( file_id: uuid.UUID, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """ Delete a file or folder """ api_logger.info(f"Request to delete file: file_id={file_id}, username: {current_user.username}") await _delete_file(db=db, file_id=file_id, current_user=current_user) return success(msg="File deleted successfully") async def _delete_file( file_id: uuid.UUID, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ) -> None: """ Delete a file or folder """ # 1. Check if the file exists db_file = file_service.get_file_by_id(db, file_id=file_id) if not db_file: api_logger.warning(f"The file does not exist or you do not have permission to access it: file_id={file_id}") raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="The file does not exist or you do not have permission to access it" ) # 2. Construct physical path file_path = Path( settings.FILE_PATH, str(db_file.kb_id), str(db_file.id) ) if db_file.file_ext == 'folder' else Path( settings.FILE_PATH, str(db_file.kb_id), str(db_file.parent_id), f"{db_file.id}{db_file.file_ext}" ) # 3. Delete physical files/folders try: if file_path.exists(): if db_file.file_ext == 'folder': shutil.rmtree(file_path) # Recursively delete folders else: file_path.unlink() # Delete a single file except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to delete physical file/folder: {str(e)}" ) # 4.Delete db_file if db_file.file_ext == 'folder': db.query(file_model.File).filter(file_model.File.parent_id == db_file.id).delete() db.delete(db_file) db.commit()