From 30cdf229de27370613e465d6177e4327ffa037b1 Mon Sep 17 00:00:00 2001 From: Mark <348207283@qq.com> Date: Mon, 27 Apr 2026 16:05:27 +0800 Subject: [PATCH] [modify] rag file system --- api/app/controllers/chunk_controller.py | 40 +- api/app/controllers/document_controller.py | 37 +- api/app/controllers/file_controller.py | 420 ++++++------------ .../rag/prompts/vision_llm_describe_prompt.md | 1 + api/app/models/file_model.py | 1 + api/app/schemas/file_schema.py | 1 + api/app/services/file_storage_service.py | 34 +- api/app/tasks.py | 42 +- 8 files changed, 228 insertions(+), 348 deletions(-) diff --git a/api/app/controllers/chunk_controller.py b/api/app/controllers/chunk_controller.py index b2cc3695..f031efbb 100644 --- a/api/app/controllers/chunk_controller.py +++ b/api/app/controllers/chunk_controller.py @@ -82,19 +82,32 @@ async def get_preview_chunks( detail="The file does not exist or you do not have permission to access it" ) - # 5. Construct file path:/files/{kb_id}/{parent_id}/{file.id}{file.file_ext} - file_path = os.path.join( - settings.FILE_PATH, - str(db_file.kb_id), - str(db_file.parent_id), - f"{db_file.id}{db_file.file_ext}" - ) - - # 6. Check if the file exists - if not os.path.exists(file_path): + # 5. Get file content from storage backend + if not db_file.file_key: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, - detail="File not found (possibly deleted)" + detail="File has no storage key (legacy data not migrated)" + ) + + from app.services.file_storage_service import FileStorageService + import asyncio + storage_service = FileStorageService() + + async def _download(): + return await storage_service.download_file(db_file.file_key) + + try: + file_binary = asyncio.run(_download()) + except RuntimeError: + loop = asyncio.new_event_loop() + try: + file_binary = loop.run_until_complete(_download()) + finally: + loop.close() + except Exception as e: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"File not found in storage: {e}" ) # 7. Document parsing & segmentation @@ -104,11 +117,12 @@ async def get_preview_chunks( vision_model = QWenCV( key=db_knowledge.image2text.api_keys[0].api_key, model_name=db_knowledge.image2text.api_keys[0].model_name, - lang="Chinese", # Default to Chinese + lang="Chinese", base_url=db_knowledge.image2text.api_keys[0].api_base ) from app.core.rag.app.naive import chunk - res = chunk(filename=file_path, + res = chunk(filename=db_file.file_name, + binary=file_binary, from_page=0, to_page=5, callback=progress_callback, diff --git a/api/app/controllers/document_controller.py b/api/app/controllers/document_controller.py index 350acc0e..02e16943 100644 --- a/api/app/controllers/document_controller.py +++ b/api/app/controllers/document_controller.py @@ -305,38 +305,25 @@ async def parse_documents( detail="The file does not exist or you do not have permission to access it" ) - # 3. Construct file path:/files/{kb_id}/{parent_id}/{file.id}{file.file_ext} - file_path = os.path.join( - settings.FILE_PATH, - str(db_file.kb_id), - str(db_file.parent_id), - f"{db_file.id}{db_file.file_ext}" - ) - - # 4. Check if the file exists - api_logger.debug(f"Constructed file path: {file_path}") - api_logger.debug(f"File metadata - kb_id: {db_file.kb_id}, parent_id: {db_file.parent_id}, file_id: {db_file.id}, extension: {db_file.file_ext}") - if not os.path.exists(file_path): - api_logger.error(f"File not found (possibly deleted): file_path={file_path}, file_id={db_file.id}, document_id={document_id}") + # 3. Get file_key for storage backend + if not db_file.file_key: + api_logger.error(f"File has no storage key (legacy data not migrated): file_id={db_file.id}") raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, - detail="File not found (possibly deleted)" + detail="File has no storage key (legacy data not migrated)" ) - # 5. Obtain knowledge base information - api_logger.info( f"Obtain details of the knowledge base: knowledge_id={db_document.kb_id}") + # 4. Obtain knowledge base information + api_logger.info(f"Obtain details of the knowledge base: knowledge_id={db_document.kb_id}") db_knowledge = knowledge_service.get_knowledge_by_id(db, knowledge_id=db_document.kb_id, current_user=current_user) if not db_knowledge: - api_logger.warning(f"The knowledge base does not exist or access is denied: knowledge_id={db_document.kb_id}") - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="The knowledge base does not exist or access is denied" - ) + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Knowledge base not found") - # 6. Task: Document parsing, vectorization, and storage - # from app.tasks import parse_document - # parse_document(file_path, document_id) - task = celery_app.send_task("app.core.rag.tasks.parse_document", args=[file_path, document_id]) + # 5. Dispatch parse task with file_key (not file_path) + task = celery_app.send_task( + "app.core.rag.tasks.parse_document", + args=[db_file.file_key, document_id, db_file.file_name] + ) result = { "task_id": task.id } diff --git a/api/app/controllers/file_controller.py b/api/app/controllers/file_controller.py index f7bd0e7a..c213b6c6 100644 --- a/api/app/controllers/file_controller.py +++ b/api/app/controllers/file_controller.py @@ -1,12 +1,10 @@ import os -from pathlib import Path -import shutil from typing import Any, Optional import uuid from fastapi import APIRouter, Depends, HTTPException, status, File, UploadFile, Query from fastapi.encoders import jsonable_encoder -from fastapi.responses import FileResponse +from fastapi.responses import Response from sqlalchemy.orm import Session from app.core.config import settings @@ -19,9 +17,13 @@ from app.models.user_model import User from app.schemas import file_schema, document_schema from app.schemas.response_schema import ApiResponse from app.services import file_service, document_service +from app.services.knowledge_service import get_knowledge_by_id as get_kb_by_id +from app.services.file_storage_service import ( + FileStorageService, + generate_kb_file_key, + get_file_storage_service, +) - -# Obtain a dedicated API logger api_logger = get_api_logger() router = APIRouter( @@ -34,67 +36,37 @@ router = APIRouter( async def get_files( kb_id: uuid.UUID, parent_id: uuid.UUID, - page: int = Query(1, gt=0), # Default: 1, which must be greater than 0 - pagesize: int = Query(20, gt=0, le=100), # Default: 20 items per page, maximum: 100 items + page: int = Query(1, gt=0), + pagesize: int = Query(20, gt=0, le=100), orderby: Optional[str] = Query(None, description="Sort fields, such as: created_at"), desc: Optional[bool] = Query(False, description="Is it descending order"), keywords: Optional[str] = Query(None, description="Search keywords (file name)"), db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): - """ - Paged query file list - - Support filtering by kb_id and parent_id - - Support keyword search for file names - - Support dynamic sorting - - Return paging metadata + file list - """ - api_logger.info(f"Query file list: kb_id={kb_id}, parent_id={parent_id}, page={page}, pagesize={pagesize}, keywords={keywords}, username: {current_user.username}") - # 1. parameter validation - if page < 1 or pagesize < 1: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="The paging parameter must be greater than 0" - ) + """Paged query file list""" + api_logger.info(f"Query file list: kb_id={kb_id}, parent_id={parent_id}, page={page}, pagesize={pagesize}") - # 2. Construct query conditions - filters = [ - file_model.File.kb_id == kb_id - ] + if page < 1 or pagesize < 1: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="The paging parameter must be greater than 0") + + filters = [file_model.File.kb_id == kb_id] if parent_id: filters.append(file_model.File.parent_id == parent_id) - # Keyword search (fuzzy matching of file name) if keywords: filters.append(file_model.File.file_name.ilike(f"%{keywords}%")) - # 3. Execute paged query try: - api_logger.debug("Start executing file paging query") total, items = file_service.get_files_paginated( - db=db, - filters=filters, - page=page, - pagesize=pagesize, - orderby=orderby, - desc=desc, - current_user=current_user + db=db, filters=filters, page=page, pagesize=pagesize, + orderby=orderby, desc=desc, current_user=current_user ) - api_logger.info(f"File query successful: total={total}, returned={len(items)} records") except Exception as e: - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Query failed: {str(e)}" - ) + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Query failed: {str(e)}") - # 4. Return structured response result = { "items": items, - "page": { - "page": page, - "pagesize": pagesize, - "total": total, - "has_next": True if page * pagesize < total else False - } + "page": {"page": page, "pagesize": pagesize, "total": total, "has_next": page * pagesize < total} } return success(data=jsonable_encoder(result), msg="Query of file list succeeded") @@ -107,23 +79,14 @@ async def create_folder( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): - """ - Create a new folder - """ - api_logger.info(f"Create folder request: kb_id={kb_id}, parent_id={parent_id}, folder_name={folder_name}, username: {current_user.username}") - + """Create a new folder""" + api_logger.info(f"Create folder request: kb_id={kb_id}, parent_id={parent_id}, folder_name={folder_name}") try: - api_logger.debug(f"Start creating a folder: {folder_name}") - create_folder = file_schema.FileCreate( - kb_id=kb_id, - created_by=current_user.id, - parent_id=parent_id, - file_name=folder_name, - file_ext='folder', - file_size=0, + create_folder_data = file_schema.FileCreate( + kb_id=kb_id, created_by=current_user.id, parent_id=parent_id, + file_name=folder_name, file_ext='folder', file_size=0, ) - db_file = file_service.create_file(db=db, file=create_folder, current_user=current_user) - api_logger.info(f"Folder created successfully: {db_file.file_name} (ID: {db_file.id})") + db_file = file_service.create_file(db=db, file=create_folder_data, current_user=current_user) return success(data=jsonable_encoder(file_schema.File.model_validate(db_file)), msg="Folder creation successful") except Exception as e: api_logger.error(f"Folder creation failed: {folder_name} - {str(e)}") @@ -136,76 +99,58 @@ async def upload_file( parent_id: uuid.UUID, file: UploadFile = File(...), db: Session = Depends(get_db), - current_user: User = Depends(get_current_user) + current_user: User = Depends(get_current_user), + storage_service: FileStorageService = Depends(get_file_storage_service), ): - """ - upload file - """ - api_logger.info(f"upload file request: kb_id={kb_id}, parent_id={parent_id}, filename={file.filename}, username: {current_user.username}") + """Upload file to storage backend""" + api_logger.info(f"upload file request: kb_id={kb_id}, parent_id={parent_id}, filename={file.filename}") - # Read the contents of the file contents = await file.read() - # Check file size file_size = len(contents) - print(f"file size: {file_size} byte") if file_size == 0: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="The file is empty." - ) - # If the file size exceeds 50MB (50 * 1024 * 1024 bytes) + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="The file is empty.") if file_size > settings.MAX_FILE_SIZE: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=f"The file size exceeds the {settings.MAX_FILE_SIZE}byte limit" - ) + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"File size exceeds {settings.MAX_FILE_SIZE} byte limit") - # Extract the extension using `os.path.splitext` _, file_extension = os.path.splitext(file.filename) - upload_file = file_schema.FileCreate( - kb_id=kb_id, - created_by=current_user.id, - parent_id=parent_id, - file_name=file.filename, - file_ext=file_extension.lower(), - file_size=file_size, + file_ext = file_extension.lower() + + # Create File record + upload_file_data = file_schema.FileCreate( + kb_id=kb_id, created_by=current_user.id, parent_id=parent_id, + file_name=file.filename, file_ext=file_ext, file_size=file_size, ) - db_file = file_service.create_file(db=db, file=upload_file, current_user=current_user) + db_file = file_service.create_file(db=db, file=upload_file_data, current_user=current_user) - # Construct a save path:/files/{kb_id}/{parent_id}/{file.id}{file_extension} - save_dir = os.path.join(settings.FILE_PATH, str(kb_id), str(parent_id)) - Path(save_dir).mkdir(parents=True, exist_ok=True) # Ensure that the directory exists - save_path = os.path.join(save_dir, f"{db_file.id}{db_file.file_ext}") + # Upload to storage backend + file_key = generate_kb_file_key(kb_id=kb_id, file_id=db_file.id, file_ext=file_ext) + try: + await storage_service.storage.upload(file_key=file_key, content=contents, content_type=file.content_type) + except Exception as e: + api_logger.error(f"Storage upload failed: {e}") + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"File storage failed: {str(e)}") - # Save file - with open(save_path, "wb") as f: - f.write(contents) + # Save file_key + db_file.file_key = file_key + db.commit() + db.refresh(db_file) - # Verify whether the file has been saved successfully - if not os.path.exists(save_path): - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="File save failed" - ) + # Create document (inherit parser_config from knowledge base) + default_parser_config = { + "layout_recognize": "DeepDOC", "chunk_token_num": 128, "delimiter": "\n", + "auto_keywords": 0, "auto_questions": 0, "html4excel": "false" + } + try: + db_knowledge = get_kb_by_id(db, knowledge_id=kb_id, current_user=current_user) + if db_knowledge and db_knowledge.parser_config: + default_parser_config.update(dict(db_knowledge.parser_config)) + except Exception: + pass - # Create a document create_data = document_schema.DocumentCreate( - kb_id=kb_id, - created_by=current_user.id, - file_id=db_file.id, - file_name=db_file.file_name, - file_ext=db_file.file_ext, - file_size=db_file.file_size, - file_meta={}, - parser_id="naive", - parser_config={ - "layout_recognize": "DeepDOC", - "chunk_token_num": 128, - "delimiter": "\n", - "auto_keywords": 0, - "auto_questions": 0, - "html4excel": "false" - } + kb_id=kb_id, created_by=current_user.id, file_id=db_file.id, + file_name=db_file.file_name, file_ext=db_file.file_ext, file_size=db_file.file_size, + file_meta={}, parser_id="naive", parser_config=default_parser_config ) db_document = document_service.create_document(db=db, document=create_data, current_user=current_user) @@ -219,123 +164,73 @@ async def custom_text( parent_id: uuid.UUID, create_data: file_schema.CustomTextFileCreate, db: Session = Depends(get_db), - current_user: User = Depends(get_current_user) + current_user: User = Depends(get_current_user), + storage_service: FileStorageService = Depends(get_file_storage_service), ): - """ - custom text - """ - api_logger.info(f"custom text upload request: kb_id={kb_id}, parent_id={parent_id}, title={create_data.title}, content={create_data.content}, username: {current_user.username}") - - # Check file content size - # 将内容编码为字节(UTF-8) + """Custom text upload""" content_bytes = create_data.content.encode('utf-8') file_size = len(content_bytes) - print(f"file size: {file_size} byte") if file_size == 0: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="The content is empty." - ) - # If the file size exceeds 50MB (50 * 1024 * 1024 bytes) + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="The content is empty.") if file_size > settings.MAX_FILE_SIZE: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=f"The content size exceeds the {settings.MAX_FILE_SIZE}byte limit" - ) + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Content size exceeds {settings.MAX_FILE_SIZE} byte limit") - upload_file = file_schema.FileCreate( - kb_id=kb_id, - created_by=current_user.id, - parent_id=parent_id, - file_name=f"{create_data.title}.txt", - file_ext=".txt", - file_size=file_size, + upload_file_data = file_schema.FileCreate( + kb_id=kb_id, created_by=current_user.id, parent_id=parent_id, + file_name=f"{create_data.title}.txt", file_ext=".txt", file_size=file_size, ) - db_file = file_service.create_file(db=db, file=upload_file, current_user=current_user) + db_file = file_service.create_file(db=db, file=upload_file_data, current_user=current_user) - # Construct a save path:/files/{kb_id}/{parent_id}/{file.id}{file_extension} - save_dir = os.path.join(settings.FILE_PATH, str(kb_id), str(parent_id)) - Path(save_dir).mkdir(parents=True, exist_ok=True) # Ensure that the directory exists - save_path = os.path.join(save_dir, f"{db_file.id}.txt") + # Upload to storage backend + file_key = generate_kb_file_key(kb_id=kb_id, file_id=db_file.id, file_ext=".txt") + try: + await storage_service.storage.upload(file_key=file_key, content=content_bytes, content_type="text/plain") + except Exception as e: + api_logger.error(f"Storage upload failed: {e}") + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"File storage failed: {str(e)}") - # Save file - with open(save_path, "wb") as f: - f.write(content_bytes) + db_file.file_key = file_key + db.commit() + db.refresh(db_file) - # Verify whether the file has been saved successfully - if not os.path.exists(save_path): - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="File save failed" - ) - - # Create a document create_document_data = document_schema.DocumentCreate( - kb_id=kb_id, - created_by=current_user.id, - file_id=db_file.id, - file_name=db_file.file_name, - file_ext=db_file.file_ext, - file_size=db_file.file_size, - file_meta={}, - parser_id="naive", - parser_config={ - "layout_recognize": "DeepDOC", - "chunk_token_num": 128, - "delimiter": "\n", - "auto_keywords": 0, - "auto_questions": 0, - "html4excel": "false" - } + kb_id=kb_id, created_by=current_user.id, file_id=db_file.id, + file_name=db_file.file_name, file_ext=db_file.file_ext, file_size=db_file.file_size, + file_meta={}, parser_id="naive", + parser_config={"layout_recognize": "DeepDOC", "chunk_token_num": 128, "delimiter": "\n", + "auto_keywords": 0, "auto_questions": 0, "html4excel": "false"} ) db_document = document_service.create_document(db=db, document=create_document_data, current_user=current_user) - api_logger.info(f"custom text upload successfully: {create_data.title} (file_id: {db_file.id}, document_id: {db_document.id})") return success(data=jsonable_encoder(document_schema.Document.model_validate(db_document)), msg="custom text upload successful") @router.get("/{file_id}", response_model=Any) async def get_file( file_id: uuid.UUID, - db: Session = Depends(get_db) + db: Session = Depends(get_db), + storage_service: FileStorageService = Depends(get_file_storage_service), ) -> Any: - """ - Download the file based on the file_id - - Query file information from the database - - Construct the file path and check if it exists - - Return a FileResponse to download the file - """ - api_logger.info(f"Download the file based on the file_id: file_id={file_id}") - - # 1. Query file information from the database + """Download file by file_id""" db_file = file_service.get_file_by_id(db, file_id=file_id) if not db_file: - api_logger.warning(f"The file does not exist or you do not have permission to access it: file_id={file_id}") - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="The file does not exist or you do not have permission to access it" - ) + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="File not found") - # 2. Construct file path:/files/{kb_id}/{parent_id}/{file.id}{file.file_ext} - file_path = os.path.join( - settings.FILE_PATH, - str(db_file.kb_id), - str(db_file.parent_id), - f"{db_file.id}{db_file.file_ext}" - ) + if not db_file.file_key: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="File has no storage key (legacy data not migrated)") - # 3. Check if the file exists - if not os.path.exists(file_path): - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="File not found (possibly deleted)" - ) + try: + content = await storage_service.download_file(db_file.file_key) + except Exception as e: + api_logger.error(f"Storage download failed: {e}") + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="File not found in storage") - # 4.Return FileResponse (automatically handle download) - return FileResponse( - path=file_path, - filename=db_file.file_name, # Use original file name - media_type="application/octet-stream" # Universal binary stream type + import mimetypes + media_type = mimetypes.guess_type(db_file.file_name)[0] or "application/octet-stream" + return Response( + content=content, + media_type=media_type, + headers={"Content-Disposition": f'attachment; filename="{db_file.file_name}"'} ) @@ -346,50 +241,22 @@ async def update_file( db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): - """ - Update file information (such as file name) - - Only specified fields such as file_name are allowed to be modified - """ - api_logger.debug(f"Query the file to be updated: {file_id}") - - # 1. Check if the file exists + """Update file information (such as file name)""" db_file = file_service.get_file_by_id(db, file_id=file_id) - if not db_file: - api_logger.warning(f"The file does not exist or you do not have permission to access it: file_id={file_id}") - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="The file does not exist or you do not have permission to access it" - ) + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="File not found") - # 2. Update fields (only update non-null fields) - api_logger.debug(f"Start updating the file fields: {file_id}") - updated_fields = [] for field, value in update_data.dict(exclude_unset=True).items(): if hasattr(db_file, field): - old_value = getattr(db_file, field) - if old_value != value: - # update value - setattr(db_file, field, value) - updated_fields.append(f"{field}: {old_value} -> {value}") + setattr(db_file, field, value) - if updated_fields: - api_logger.debug(f"updated fields: {', '.join(updated_fields)}") - - # 3. Save to database try: db.commit() db.refresh(db_file) - api_logger.info(f"The file has been successfully updated: {db_file.file_name} (ID: {db_file.id})") except Exception as e: db.rollback() - api_logger.error(f"File update failed: file_id={file_id} - {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"File update failed: {str(e)}" - ) + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"File update failed: {str(e)}") - # 4. Return the updated file return success(data=jsonable_encoder(file_schema.File.model_validate(db_file)), msg="File information updated successfully") @@ -397,60 +264,43 @@ async def update_file( async def delete_file( file_id: uuid.UUID, db: Session = Depends(get_db), - current_user: User = Depends(get_current_user) + current_user: User = Depends(get_current_user), + storage_service: FileStorageService = Depends(get_file_storage_service), ): - """ - Delete a file or folder - """ - api_logger.info(f"Request to delete file: file_id={file_id}, username: {current_user.username}") - await _delete_file(db=db, file_id=file_id, current_user=current_user) + """Delete a file or folder""" + api_logger.info(f"Request to delete file: file_id={file_id}") + await _delete_file(db=db, file_id=file_id, current_user=current_user, storage_service=storage_service) return success(msg="File deleted successfully") + async def _delete_file( file_id: uuid.UUID, - db: Session = Depends(get_db), - current_user: User = Depends(get_current_user) + db: Session, + current_user: User, + storage_service: FileStorageService, ) -> None: - """ - Delete a file or folder - """ - # 1. Check if the file exists + """Delete a file or folder from storage and database""" db_file = file_service.get_file_by_id(db, file_id=file_id) - if not db_file: - api_logger.warning(f"The file does not exist or you do not have permission to access it: file_id={file_id}") - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="The file does not exist or you do not have permission to access it" - ) + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="File not found") - # 2. Construct physical path - file_path = Path( - settings.FILE_PATH, - str(db_file.kb_id), - str(db_file.id) - ) if db_file.file_ext == 'folder' else Path( - settings.FILE_PATH, - str(db_file.kb_id), - str(db_file.parent_id), - f"{db_file.id}{db_file.file_ext}" - ) - - # 3. Delete physical files/folders - try: - if file_path.exists(): - if db_file.file_ext == 'folder': - shutil.rmtree(file_path) # Recursively delete folders - else: - file_path.unlink() # Delete a single file - except Exception as e: - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Failed to delete physical file/folder: {str(e)}" - ) - - # 4.Delete db_file + # Delete from storage backend if db_file.file_ext == 'folder': + # For folders, delete all child files from storage first + child_files = db.query(file_model.File).filter(file_model.File.parent_id == db_file.id).all() + for child in child_files: + if child.file_key: + try: + await storage_service.delete_file(child.file_key) + except Exception as e: + api_logger.warning(f"Failed to delete child file from storage: {child.file_key} - {e}") db.query(file_model.File).filter(file_model.File.parent_id == db_file.id).delete() + else: + if db_file.file_key: + try: + await storage_service.delete_file(db_file.file_key) + except Exception as e: + api_logger.warning(f"Failed to delete file from storage: {db_file.file_key} - {e}") + db.delete(db_file) db.commit() diff --git a/api/app/core/rag/prompts/vision_llm_describe_prompt.md b/api/app/core/rag/prompts/vision_llm_describe_prompt.md index 8800703d..151e06b6 100644 --- a/api/app/core/rag/prompts/vision_llm_describe_prompt.md +++ b/api/app/core/rag/prompts/vision_llm_describe_prompt.md @@ -14,6 +14,7 @@ Transcribe the content from the provided PDF page image into clean Markdown form 6. Do NOT wrap the output in ```markdown or ``` blocks. 7. Only apply Markdown structure to headings, paragraphs, lists, and tables, strictly based on the layout of the image. Do NOT create tables unless an actual table exists in the image. 8. Preserve the original language, information, and order exactly as shown in the image. +9. Your output language MUST match the language of the content in the image. If the image contains Chinese text, output in Chinese. If English, output in English. Never translate. {% if page %} At the end of the transcription, add the page divider: `--- Page {{ page }} ---`. diff --git a/api/app/models/file_model.py b/api/app/models/file_model.py index 44a7d613..5f11b185 100644 --- a/api/app/models/file_model.py +++ b/api/app/models/file_model.py @@ -15,4 +15,5 @@ class File(Base): file_ext = Column(String, index=True, nullable=False, comment="file extension:folder|pdf") file_size = Column(Integer, default=0, comment="file size(byte)") file_url = Column(String, index=True, nullable=True, comment="file comes from a website url") + file_key = Column(String(512), nullable=True, index=True, comment="storage file key for FileStorageService") created_at = Column(DateTime, default=datetime.datetime.now) \ No newline at end of file diff --git a/api/app/schemas/file_schema.py b/api/app/schemas/file_schema.py index 7245671a..d01e8c77 100644 --- a/api/app/schemas/file_schema.py +++ b/api/app/schemas/file_schema.py @@ -11,6 +11,7 @@ class FileBase(BaseModel): file_ext: str file_size: int file_url: str | None = None + file_key: str | None = None created_at: datetime.datetime | None = None diff --git a/api/app/services/file_storage_service.py b/api/app/services/file_storage_service.py index 5897936b..22a864dc 100644 --- a/api/app/services/file_storage_service.py +++ b/api/app/services/file_storage_service.py @@ -34,26 +34,7 @@ def generate_file_key( Generate a unique file key for storage. The file key follows the format: {tenant_id}/{workspace_id}/{file_id}{file_ext} - - Args: - tenant_id: The tenant UUID. - workspace_id: The workspace UUID. - file_id: The file UUID. - file_ext: The file extension (e.g., '.pdf', '.txt'). - - Returns: - A unique file key string. - - Example: - >>> generate_file_key( - ... uuid.UUID('550e8400-e29b-41d4-a716-446655440000'), - ... uuid.UUID('660e8400-e29b-41d4-a716-446655440001'), - ... uuid.UUID('770e8400-e29b-41d4-a716-446655440002'), - ... '.pdf' - ... ) - '550e8400-e29b-41d4-a716-446655440000/660e8400-e29b-41d4-a716-446655440001/770e8400-e29b-41d4-a716-446655440002.pdf' """ - # Ensure file_ext starts with a dot if file_ext and not file_ext.startswith('.'): file_ext = f'.{file_ext}' if workspace_id: @@ -61,6 +42,21 @@ def generate_file_key( return f"{tenant_id}/{file_id}{file_ext}" +def generate_kb_file_key( + kb_id: uuid.UUID, + file_id: uuid.UUID, + file_ext: str, +) -> str: + """ + Generate a file key for knowledge base files. + + Format: kb/{kb_id}/{file_id}{file_ext} + """ + if file_ext and not file_ext.startswith('.'): + file_ext = f'.{file_ext}' + return f"kb/{kb_id}/{file_id}{file_ext}" + + class FileStorageService: """ High-level service for file storage operations. diff --git a/api/app/tasks.py b/api/app/tasks.py index 5a71066a..2e024255 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -210,9 +210,14 @@ def _build_vision_model(file_path: str, db_knowledge): @celery_app.task(name="app.core.rag.tasks.parse_document") -def parse_document(file_path: str, document_id: uuid.UUID): +def parse_document(file_key: str, document_id: uuid.UUID, file_name: str = ""): """ - Document parsing, vectorization, and storage + Document parsing, vectorization, and storage. + + Args: + file_key: Storage key for FileStorageService (e.g. "kb/{kb_id}/{file_id}.docx") + document_id: Document UUID + file_name: Original file name (used for extension detection in chunk()) """ db_document = None @@ -223,7 +228,6 @@ def parse_document(file_path: str, document_id: uuid.UUID): with get_db_context() as db: try: - # Celery JSON 序列化会将 UUID 转为字符串,需要确保类型正确 if not isinstance(document_id, uuid.UUID): document_id = uuid.UUID(str(document_id)) @@ -234,7 +238,11 @@ def parse_document(file_path: str, document_id: uuid.UUID): if db_knowledge is None: raise ValueError(f"Knowledge {db_document.kb_id} not found") - # 1. Document parsing & segmentation + # Use file_name from argument or fall back to document record + if not file_name: + file_name = db_document.file_name + + # 1. Download file from storage backend progress_lines.append(f"{datetime.now().strftime('%H:%M:%S')} Start to parse.") start_time = time.time() db_document.progress = 0.0 @@ -245,14 +253,36 @@ def parse_document(file_path: str, document_id: uuid.UUID): db.commit() db.refresh(db_document) + # Read file content from storage backend (no NFS dependency) + from app.services.file_storage_service import FileStorageService + import asyncio + storage_service = FileStorageService() + + async def _download(): + return await storage_service.download_file(file_key) + + try: + file_binary = asyncio.run(_download()) + except RuntimeError: + # If there's already a running loop (e.g. in some worker configurations) + loop = asyncio.new_event_loop() + try: + file_binary = loop.run_until_complete(_download()) + finally: + loop.close() + if not file_binary: + raise IOError(f"Downloaded empty file from storage: {file_key}") + logger.info(f"[ParseDoc] Downloaded {len(file_binary)} bytes from storage key: {file_key}") + def progress_callback(prog=None, msg=None): progress_lines.append(f"{datetime.now().strftime('%H:%M:%S')} parse progress: {prog} msg: {msg}.") # Prepare vision_model for parsing - vision_model = _build_vision_model(file_path, db_knowledge) + vision_model = _build_vision_model(file_name, db_knowledge) from app.core.rag.app.naive import chunk - res = chunk(filename=file_path, + res = chunk(filename=file_name, + binary=file_binary, from_page=0, to_page=DEFAULT_PARSE_TO_PAGE, callback=progress_callback,