Merge branch 'feature/rag2' into develop
This commit is contained in:
@@ -41,7 +41,7 @@ def list_app_logs(
|
||||
|
||||
# 验证应用访问权限
|
||||
app_service = AppService(db)
|
||||
app_service.get_app(app_id, workspace_id)
|
||||
app = app_service.get_app(app_id, workspace_id)
|
||||
|
||||
# 使用 Service 层查询
|
||||
log_service = AppLogService(db)
|
||||
@@ -51,7 +51,8 @@ def list_app_logs(
|
||||
page=page,
|
||||
pagesize=pagesize,
|
||||
is_draft=is_draft,
|
||||
keyword=keyword
|
||||
keyword=keyword,
|
||||
app_type=app.type,
|
||||
)
|
||||
|
||||
items = [AppLogConversation.model_validate(c) for c in conversations]
|
||||
|
||||
@@ -82,19 +82,32 @@ async def get_preview_chunks(
|
||||
detail="The file does not exist or you do not have permission to access it"
|
||||
)
|
||||
|
||||
# 5. Construct file path:/files/{kb_id}/{parent_id}/{file.id}{file.file_ext}
|
||||
file_path = os.path.join(
|
||||
settings.FILE_PATH,
|
||||
str(db_file.kb_id),
|
||||
str(db_file.parent_id),
|
||||
f"{db_file.id}{db_file.file_ext}"
|
||||
)
|
||||
|
||||
# 6. Check if the file exists
|
||||
if not os.path.exists(file_path):
|
||||
# 5. Get file content from storage backend
|
||||
if not db_file.file_key:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail="File not found (possibly deleted)"
|
||||
detail="File has no storage key (legacy data not migrated)"
|
||||
)
|
||||
|
||||
from app.services.file_storage_service import FileStorageService
|
||||
import asyncio
|
||||
storage_service = FileStorageService()
|
||||
|
||||
async def _download():
|
||||
return await storage_service.download_file(db_file.file_key)
|
||||
|
||||
try:
|
||||
file_binary = asyncio.run(_download())
|
||||
except RuntimeError:
|
||||
loop = asyncio.new_event_loop()
|
||||
try:
|
||||
file_binary = loop.run_until_complete(_download())
|
||||
finally:
|
||||
loop.close()
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail=f"File not found in storage: {e}"
|
||||
)
|
||||
|
||||
# 7. Document parsing & segmentation
|
||||
@@ -104,11 +117,12 @@ async def get_preview_chunks(
|
||||
vision_model = QWenCV(
|
||||
key=db_knowledge.image2text.api_keys[0].api_key,
|
||||
model_name=db_knowledge.image2text.api_keys[0].model_name,
|
||||
lang="Chinese", # Default to Chinese
|
||||
lang="Chinese",
|
||||
base_url=db_knowledge.image2text.api_keys[0].api_base
|
||||
)
|
||||
from app.core.rag.app.naive import chunk
|
||||
res = chunk(filename=file_path,
|
||||
res = chunk(filename=db_file.file_name,
|
||||
binary=file_binary,
|
||||
from_page=0,
|
||||
to_page=5,
|
||||
callback=progress_callback,
|
||||
|
||||
@@ -20,6 +20,7 @@ from app.models.user_model import User
|
||||
from app.schemas import document_schema
|
||||
from app.schemas.response_schema import ApiResponse
|
||||
from app.services import document_service, file_service, knowledge_service
|
||||
from app.services.file_storage_service import FileStorageService, get_file_storage_service
|
||||
|
||||
|
||||
# Obtain a dedicated API logger
|
||||
@@ -231,7 +232,8 @@ async def update_document(
|
||||
async def delete_document(
|
||||
document_id: uuid.UUID,
|
||||
db: Session = Depends(get_db),
|
||||
current_user: User = Depends(get_current_user)
|
||||
current_user: User = Depends(get_current_user),
|
||||
storage_service: FileStorageService = Depends(get_file_storage_service),
|
||||
):
|
||||
"""
|
||||
Delete document
|
||||
@@ -257,7 +259,7 @@ async def delete_document(
|
||||
db.commit()
|
||||
|
||||
# 3. Delete file
|
||||
await file_controller._delete_file(db=db, file_id=file_id, current_user=current_user)
|
||||
await file_controller._delete_file(db=db, file_id=file_id, current_user=current_user, storage_service=storage_service)
|
||||
|
||||
# 4. Delete vector index
|
||||
db_knowledge = knowledge_service.get_knowledge_by_id(db, knowledge_id=db_document.kb_id, current_user=current_user)
|
||||
@@ -305,38 +307,25 @@ async def parse_documents(
|
||||
detail="The file does not exist or you do not have permission to access it"
|
||||
)
|
||||
|
||||
# 3. Construct file path:/files/{kb_id}/{parent_id}/{file.id}{file.file_ext}
|
||||
file_path = os.path.join(
|
||||
settings.FILE_PATH,
|
||||
str(db_file.kb_id),
|
||||
str(db_file.parent_id),
|
||||
f"{db_file.id}{db_file.file_ext}"
|
||||
)
|
||||
|
||||
# 4. Check if the file exists
|
||||
api_logger.debug(f"Constructed file path: {file_path}")
|
||||
api_logger.debug(f"File metadata - kb_id: {db_file.kb_id}, parent_id: {db_file.parent_id}, file_id: {db_file.id}, extension: {db_file.file_ext}")
|
||||
if not os.path.exists(file_path):
|
||||
api_logger.error(f"File not found (possibly deleted): file_path={file_path}, file_id={db_file.id}, document_id={document_id}")
|
||||
# 3. Get file_key for storage backend
|
||||
if not db_file.file_key:
|
||||
api_logger.error(f"File has no storage key (legacy data not migrated): file_id={db_file.id}")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail="File not found (possibly deleted)"
|
||||
detail="File has no storage key (legacy data not migrated)"
|
||||
)
|
||||
|
||||
# 5. Obtain knowledge base information
|
||||
# 4. Obtain knowledge base information
|
||||
api_logger.info(f"Obtain details of the knowledge base: knowledge_id={db_document.kb_id}")
|
||||
db_knowledge = knowledge_service.get_knowledge_by_id(db, knowledge_id=db_document.kb_id, current_user=current_user)
|
||||
if not db_knowledge:
|
||||
api_logger.warning(f"The knowledge base does not exist or access is denied: knowledge_id={db_document.kb_id}")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail="The knowledge base does not exist or access is denied"
|
||||
)
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Knowledge base not found")
|
||||
|
||||
# 6. Task: Document parsing, vectorization, and storage
|
||||
# from app.tasks import parse_document
|
||||
# parse_document(file_path, document_id)
|
||||
task = celery_app.send_task("app.core.rag.tasks.parse_document", args=[file_path, document_id])
|
||||
# 5. Dispatch parse task with file_key (not file_path)
|
||||
task = celery_app.send_task(
|
||||
"app.core.rag.tasks.parse_document",
|
||||
args=[db_file.file_key, document_id, db_file.file_name]
|
||||
)
|
||||
result = {
|
||||
"task_id": task.id
|
||||
}
|
||||
|
||||
@@ -1,12 +1,10 @@
|
||||
import os
|
||||
from pathlib import Path
|
||||
import shutil
|
||||
from typing import Any, Optional
|
||||
import uuid
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, status, File, UploadFile, Query
|
||||
from fastapi.encoders import jsonable_encoder
|
||||
from fastapi.responses import FileResponse
|
||||
from fastapi.responses import Response
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.core.config import settings
|
||||
@@ -19,10 +17,14 @@ from app.models.user_model import User
|
||||
from app.schemas import file_schema, document_schema
|
||||
from app.schemas.response_schema import ApiResponse
|
||||
from app.services import file_service, document_service
|
||||
from app.services.knowledge_service import get_knowledge_by_id as get_kb_by_id
|
||||
from app.services.file_storage_service import (
|
||||
FileStorageService,
|
||||
generate_kb_file_key,
|
||||
get_file_storage_service,
|
||||
)
|
||||
from app.core.quota_stub import check_knowledge_capacity_quota
|
||||
|
||||
|
||||
# Obtain a dedicated API logger
|
||||
api_logger = get_api_logger()
|
||||
|
||||
router = APIRouter(
|
||||
@@ -35,67 +37,37 @@ router = APIRouter(
|
||||
async def get_files(
|
||||
kb_id: uuid.UUID,
|
||||
parent_id: uuid.UUID,
|
||||
page: int = Query(1, gt=0), # Default: 1, which must be greater than 0
|
||||
pagesize: int = Query(20, gt=0, le=100), # Default: 20 items per page, maximum: 100 items
|
||||
page: int = Query(1, gt=0),
|
||||
pagesize: int = Query(20, gt=0, le=100),
|
||||
orderby: Optional[str] = Query(None, description="Sort fields, such as: created_at"),
|
||||
desc: Optional[bool] = Query(False, description="Is it descending order"),
|
||||
keywords: Optional[str] = Query(None, description="Search keywords (file name)"),
|
||||
db: Session = Depends(get_db),
|
||||
current_user: User = Depends(get_current_user)
|
||||
):
|
||||
"""
|
||||
Paged query file list
|
||||
- Support filtering by kb_id and parent_id
|
||||
- Support keyword search for file names
|
||||
- Support dynamic sorting
|
||||
- Return paging metadata + file list
|
||||
"""
|
||||
api_logger.info(f"Query file list: kb_id={kb_id}, parent_id={parent_id}, page={page}, pagesize={pagesize}, keywords={keywords}, username: {current_user.username}")
|
||||
# 1. parameter validation
|
||||
if page < 1 or pagesize < 1:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail="The paging parameter must be greater than 0"
|
||||
)
|
||||
"""Paged query file list"""
|
||||
api_logger.info(f"Query file list: kb_id={kb_id}, parent_id={parent_id}, page={page}, pagesize={pagesize}")
|
||||
|
||||
# 2. Construct query conditions
|
||||
filters = [
|
||||
file_model.File.kb_id == kb_id
|
||||
]
|
||||
if page < 1 or pagesize < 1:
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="The paging parameter must be greater than 0")
|
||||
|
||||
filters = [file_model.File.kb_id == kb_id]
|
||||
if parent_id:
|
||||
filters.append(file_model.File.parent_id == parent_id)
|
||||
# Keyword search (fuzzy matching of file name)
|
||||
if keywords:
|
||||
filters.append(file_model.File.file_name.ilike(f"%{keywords}%"))
|
||||
|
||||
# 3. Execute paged query
|
||||
try:
|
||||
api_logger.debug("Start executing file paging query")
|
||||
total, items = file_service.get_files_paginated(
|
||||
db=db,
|
||||
filters=filters,
|
||||
page=page,
|
||||
pagesize=pagesize,
|
||||
orderby=orderby,
|
||||
desc=desc,
|
||||
current_user=current_user
|
||||
db=db, filters=filters, page=page, pagesize=pagesize,
|
||||
orderby=orderby, desc=desc, current_user=current_user
|
||||
)
|
||||
api_logger.info(f"File query successful: total={total}, returned={len(items)} records")
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"Query failed: {str(e)}"
|
||||
)
|
||||
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Query failed: {str(e)}")
|
||||
|
||||
# 4. Return structured response
|
||||
result = {
|
||||
"items": items,
|
||||
"page": {
|
||||
"page": page,
|
||||
"pagesize": pagesize,
|
||||
"total": total,
|
||||
"has_next": True if page * pagesize < total else False
|
||||
}
|
||||
"page": {"page": page, "pagesize": pagesize, "total": total, "has_next": page * pagesize < total}
|
||||
}
|
||||
return success(data=jsonable_encoder(result), msg="Query of file list succeeded")
|
||||
|
||||
@@ -108,23 +80,14 @@ async def create_folder(
|
||||
db: Session = Depends(get_db),
|
||||
current_user: User = Depends(get_current_user),
|
||||
):
|
||||
"""
|
||||
Create a new folder
|
||||
"""
|
||||
api_logger.info(f"Create folder request: kb_id={kb_id}, parent_id={parent_id}, folder_name={folder_name}, username: {current_user.username}")
|
||||
|
||||
"""Create a new folder"""
|
||||
api_logger.info(f"Create folder request: kb_id={kb_id}, parent_id={parent_id}, folder_name={folder_name}")
|
||||
try:
|
||||
api_logger.debug(f"Start creating a folder: {folder_name}")
|
||||
create_folder = file_schema.FileCreate(
|
||||
kb_id=kb_id,
|
||||
created_by=current_user.id,
|
||||
parent_id=parent_id,
|
||||
file_name=folder_name,
|
||||
file_ext='folder',
|
||||
file_size=0,
|
||||
create_folder_data = file_schema.FileCreate(
|
||||
kb_id=kb_id, created_by=current_user.id, parent_id=parent_id,
|
||||
file_name=folder_name, file_ext='folder', file_size=0,
|
||||
)
|
||||
db_file = file_service.create_file(db=db, file=create_folder, current_user=current_user)
|
||||
api_logger.info(f"Folder created successfully: {db_file.file_name} (ID: {db_file.id})")
|
||||
db_file = file_service.create_file(db=db, file=create_folder_data, current_user=current_user)
|
||||
return success(data=jsonable_encoder(file_schema.File.model_validate(db_file)), msg="Folder creation successful")
|
||||
except Exception as e:
|
||||
api_logger.error(f"Folder creation failed: {folder_name} - {str(e)}")
|
||||
@@ -138,76 +101,58 @@ async def upload_file(
|
||||
parent_id: uuid.UUID,
|
||||
file: UploadFile = File(...),
|
||||
db: Session = Depends(get_db),
|
||||
current_user: User = Depends(get_current_user)
|
||||
current_user: User = Depends(get_current_user),
|
||||
storage_service: FileStorageService = Depends(get_file_storage_service),
|
||||
):
|
||||
"""
|
||||
upload file
|
||||
"""
|
||||
api_logger.info(f"upload file request: kb_id={kb_id}, parent_id={parent_id}, filename={file.filename}, username: {current_user.username}")
|
||||
"""Upload file to storage backend"""
|
||||
api_logger.info(f"upload file request: kb_id={kb_id}, parent_id={parent_id}, filename={file.filename}")
|
||||
|
||||
# Read the contents of the file
|
||||
contents = await file.read()
|
||||
# Check file size
|
||||
file_size = len(contents)
|
||||
print(f"file size: {file_size} byte")
|
||||
if file_size == 0:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail="The file is empty."
|
||||
)
|
||||
# If the file size exceeds 50MB (50 * 1024 * 1024 bytes)
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="The file is empty.")
|
||||
if file_size > settings.MAX_FILE_SIZE:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail=f"The file size exceeds the {settings.MAX_FILE_SIZE}byte limit"
|
||||
)
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"File size exceeds {settings.MAX_FILE_SIZE} byte limit")
|
||||
|
||||
# Extract the extension using `os.path.splitext`
|
||||
_, file_extension = os.path.splitext(file.filename)
|
||||
upload_file = file_schema.FileCreate(
|
||||
kb_id=kb_id,
|
||||
created_by=current_user.id,
|
||||
parent_id=parent_id,
|
||||
file_name=file.filename,
|
||||
file_ext=file_extension.lower(),
|
||||
file_size=file_size,
|
||||
file_ext = file_extension.lower()
|
||||
|
||||
# Create File record
|
||||
upload_file_data = file_schema.FileCreate(
|
||||
kb_id=kb_id, created_by=current_user.id, parent_id=parent_id,
|
||||
file_name=file.filename, file_ext=file_ext, file_size=file_size,
|
||||
)
|
||||
db_file = file_service.create_file(db=db, file=upload_file, current_user=current_user)
|
||||
db_file = file_service.create_file(db=db, file=upload_file_data, current_user=current_user)
|
||||
|
||||
# Construct a save path:/files/{kb_id}/{parent_id}/{file.id}{file_extension}
|
||||
save_dir = os.path.join(settings.FILE_PATH, str(kb_id), str(parent_id))
|
||||
Path(save_dir).mkdir(parents=True, exist_ok=True) # Ensure that the directory exists
|
||||
save_path = os.path.join(save_dir, f"{db_file.id}{db_file.file_ext}")
|
||||
# Upload to storage backend
|
||||
file_key = generate_kb_file_key(kb_id=kb_id, file_id=db_file.id, file_ext=file_ext)
|
||||
try:
|
||||
await storage_service.storage.upload(file_key=file_key, content=contents, content_type=file.content_type)
|
||||
except Exception as e:
|
||||
api_logger.error(f"Storage upload failed: {e}")
|
||||
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"File storage failed: {str(e)}")
|
||||
|
||||
# Save file
|
||||
with open(save_path, "wb") as f:
|
||||
f.write(contents)
|
||||
# Save file_key
|
||||
db_file.file_key = file_key
|
||||
db.commit()
|
||||
db.refresh(db_file)
|
||||
|
||||
# Verify whether the file has been saved successfully
|
||||
if not os.path.exists(save_path):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail="File save failed"
|
||||
)
|
||||
|
||||
# Create a document
|
||||
create_data = document_schema.DocumentCreate(
|
||||
kb_id=kb_id,
|
||||
created_by=current_user.id,
|
||||
file_id=db_file.id,
|
||||
file_name=db_file.file_name,
|
||||
file_ext=db_file.file_ext,
|
||||
file_size=db_file.file_size,
|
||||
file_meta={},
|
||||
parser_id="naive",
|
||||
parser_config={
|
||||
"layout_recognize": "DeepDOC",
|
||||
"chunk_token_num": 128,
|
||||
"delimiter": "\n",
|
||||
"auto_keywords": 0,
|
||||
"auto_questions": 0,
|
||||
"html4excel": "false"
|
||||
# Create document (inherit parser_config from knowledge base)
|
||||
default_parser_config = {
|
||||
"layout_recognize": "DeepDOC", "chunk_token_num": 128, "delimiter": "\n",
|
||||
"auto_keywords": 0, "auto_questions": 0, "html4excel": "false"
|
||||
}
|
||||
try:
|
||||
db_knowledge = get_kb_by_id(db, knowledge_id=kb_id, current_user=current_user)
|
||||
if db_knowledge and db_knowledge.parser_config:
|
||||
default_parser_config.update(dict(db_knowledge.parser_config))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
create_data = document_schema.DocumentCreate(
|
||||
kb_id=kb_id, created_by=current_user.id, file_id=db_file.id,
|
||||
file_name=db_file.file_name, file_ext=db_file.file_ext, file_size=db_file.file_size,
|
||||
file_meta={}, parser_id="naive", parser_config=default_parser_config
|
||||
)
|
||||
db_document = document_service.create_document(db=db, document=create_data, current_user=current_user)
|
||||
|
||||
@@ -221,123 +166,73 @@ async def custom_text(
|
||||
parent_id: uuid.UUID,
|
||||
create_data: file_schema.CustomTextFileCreate,
|
||||
db: Session = Depends(get_db),
|
||||
current_user: User = Depends(get_current_user)
|
||||
current_user: User = Depends(get_current_user),
|
||||
storage_service: FileStorageService = Depends(get_file_storage_service),
|
||||
):
|
||||
"""
|
||||
custom text
|
||||
"""
|
||||
api_logger.info(f"custom text upload request: kb_id={kb_id}, parent_id={parent_id}, title={create_data.title}, content={create_data.content}, username: {current_user.username}")
|
||||
|
||||
# Check file content size
|
||||
# 将内容编码为字节(UTF-8)
|
||||
"""Custom text upload"""
|
||||
content_bytes = create_data.content.encode('utf-8')
|
||||
file_size = len(content_bytes)
|
||||
print(f"file size: {file_size} byte")
|
||||
if file_size == 0:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail="The content is empty."
|
||||
)
|
||||
# If the file size exceeds 50MB (50 * 1024 * 1024 bytes)
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="The content is empty.")
|
||||
if file_size > settings.MAX_FILE_SIZE:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail=f"The content size exceeds the {settings.MAX_FILE_SIZE}byte limit"
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Content size exceeds {settings.MAX_FILE_SIZE} byte limit")
|
||||
|
||||
upload_file_data = file_schema.FileCreate(
|
||||
kb_id=kb_id, created_by=current_user.id, parent_id=parent_id,
|
||||
file_name=f"{create_data.title}.txt", file_ext=".txt", file_size=file_size,
|
||||
)
|
||||
db_file = file_service.create_file(db=db, file=upload_file_data, current_user=current_user)
|
||||
|
||||
upload_file = file_schema.FileCreate(
|
||||
kb_id=kb_id,
|
||||
created_by=current_user.id,
|
||||
parent_id=parent_id,
|
||||
file_name=f"{create_data.title}.txt",
|
||||
file_ext=".txt",
|
||||
file_size=file_size,
|
||||
)
|
||||
db_file = file_service.create_file(db=db, file=upload_file, current_user=current_user)
|
||||
# Upload to storage backend
|
||||
file_key = generate_kb_file_key(kb_id=kb_id, file_id=db_file.id, file_ext=".txt")
|
||||
try:
|
||||
await storage_service.storage.upload(file_key=file_key, content=content_bytes, content_type="text/plain")
|
||||
except Exception as e:
|
||||
api_logger.error(f"Storage upload failed: {e}")
|
||||
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"File storage failed: {str(e)}")
|
||||
|
||||
# Construct a save path:/files/{kb_id}/{parent_id}/{file.id}{file_extension}
|
||||
save_dir = os.path.join(settings.FILE_PATH, str(kb_id), str(parent_id))
|
||||
Path(save_dir).mkdir(parents=True, exist_ok=True) # Ensure that the directory exists
|
||||
save_path = os.path.join(save_dir, f"{db_file.id}.txt")
|
||||
db_file.file_key = file_key
|
||||
db.commit()
|
||||
db.refresh(db_file)
|
||||
|
||||
# Save file
|
||||
with open(save_path, "wb") as f:
|
||||
f.write(content_bytes)
|
||||
|
||||
# Verify whether the file has been saved successfully
|
||||
if not os.path.exists(save_path):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail="File save failed"
|
||||
)
|
||||
|
||||
# Create a document
|
||||
create_document_data = document_schema.DocumentCreate(
|
||||
kb_id=kb_id,
|
||||
created_by=current_user.id,
|
||||
file_id=db_file.id,
|
||||
file_name=db_file.file_name,
|
||||
file_ext=db_file.file_ext,
|
||||
file_size=db_file.file_size,
|
||||
file_meta={},
|
||||
parser_id="naive",
|
||||
parser_config={
|
||||
"layout_recognize": "DeepDOC",
|
||||
"chunk_token_num": 128,
|
||||
"delimiter": "\n",
|
||||
"auto_keywords": 0,
|
||||
"auto_questions": 0,
|
||||
"html4excel": "false"
|
||||
}
|
||||
kb_id=kb_id, created_by=current_user.id, file_id=db_file.id,
|
||||
file_name=db_file.file_name, file_ext=db_file.file_ext, file_size=db_file.file_size,
|
||||
file_meta={}, parser_id="naive",
|
||||
parser_config={"layout_recognize": "DeepDOC", "chunk_token_num": 128, "delimiter": "\n",
|
||||
"auto_keywords": 0, "auto_questions": 0, "html4excel": "false"}
|
||||
)
|
||||
db_document = document_service.create_document(db=db, document=create_document_data, current_user=current_user)
|
||||
|
||||
api_logger.info(f"custom text upload successfully: {create_data.title} (file_id: {db_file.id}, document_id: {db_document.id})")
|
||||
return success(data=jsonable_encoder(document_schema.Document.model_validate(db_document)), msg="custom text upload successful")
|
||||
|
||||
|
||||
@router.get("/{file_id}", response_model=Any)
|
||||
async def get_file(
|
||||
file_id: uuid.UUID,
|
||||
db: Session = Depends(get_db)
|
||||
db: Session = Depends(get_db),
|
||||
storage_service: FileStorageService = Depends(get_file_storage_service),
|
||||
) -> Any:
|
||||
"""
|
||||
Download the file based on the file_id
|
||||
- Query file information from the database
|
||||
- Construct the file path and check if it exists
|
||||
- Return a FileResponse to download the file
|
||||
"""
|
||||
api_logger.info(f"Download the file based on the file_id: file_id={file_id}")
|
||||
|
||||
# 1. Query file information from the database
|
||||
"""Download file by file_id"""
|
||||
db_file = file_service.get_file_by_id(db, file_id=file_id)
|
||||
if not db_file:
|
||||
api_logger.warning(f"The file does not exist or you do not have permission to access it: file_id={file_id}")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail="The file does not exist or you do not have permission to access it"
|
||||
)
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="File not found")
|
||||
|
||||
# 2. Construct file path:/files/{kb_id}/{parent_id}/{file.id}{file.file_ext}
|
||||
file_path = os.path.join(
|
||||
settings.FILE_PATH,
|
||||
str(db_file.kb_id),
|
||||
str(db_file.parent_id),
|
||||
f"{db_file.id}{db_file.file_ext}"
|
||||
)
|
||||
if not db_file.file_key:
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="File has no storage key (legacy data not migrated)")
|
||||
|
||||
# 3. Check if the file exists
|
||||
if not os.path.exists(file_path):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail="File not found (possibly deleted)"
|
||||
)
|
||||
try:
|
||||
content = await storage_service.download_file(db_file.file_key)
|
||||
except Exception as e:
|
||||
api_logger.error(f"Storage download failed: {e}")
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="File not found in storage")
|
||||
|
||||
# 4.Return FileResponse (automatically handle download)
|
||||
return FileResponse(
|
||||
path=file_path,
|
||||
filename=db_file.file_name, # Use original file name
|
||||
media_type="application/octet-stream" # Universal binary stream type
|
||||
import mimetypes
|
||||
media_type = mimetypes.guess_type(db_file.file_name)[0] or "application/octet-stream"
|
||||
return Response(
|
||||
content=content,
|
||||
media_type=media_type,
|
||||
headers={"Content-Disposition": f'attachment; filename="{db_file.file_name}"'}
|
||||
)
|
||||
|
||||
|
||||
@@ -348,50 +243,22 @@ async def update_file(
|
||||
db: Session = Depends(get_db),
|
||||
current_user: User = Depends(get_current_user)
|
||||
):
|
||||
"""
|
||||
Update file information (such as file name)
|
||||
- Only specified fields such as file_name are allowed to be modified
|
||||
"""
|
||||
api_logger.debug(f"Query the file to be updated: {file_id}")
|
||||
|
||||
# 1. Check if the file exists
|
||||
"""Update file information (such as file name)"""
|
||||
db_file = file_service.get_file_by_id(db, file_id=file_id)
|
||||
|
||||
if not db_file:
|
||||
api_logger.warning(f"The file does not exist or you do not have permission to access it: file_id={file_id}")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail="The file does not exist or you do not have permission to access it"
|
||||
)
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="File not found")
|
||||
|
||||
# 2. Update fields (only update non-null fields)
|
||||
api_logger.debug(f"Start updating the file fields: {file_id}")
|
||||
updated_fields = []
|
||||
for field, value in update_data.dict(exclude_unset=True).items():
|
||||
if hasattr(db_file, field):
|
||||
old_value = getattr(db_file, field)
|
||||
if old_value != value:
|
||||
# update value
|
||||
setattr(db_file, field, value)
|
||||
updated_fields.append(f"{field}: {old_value} -> {value}")
|
||||
|
||||
if updated_fields:
|
||||
api_logger.debug(f"updated fields: {', '.join(updated_fields)}")
|
||||
|
||||
# 3. Save to database
|
||||
try:
|
||||
db.commit()
|
||||
db.refresh(db_file)
|
||||
api_logger.info(f"The file has been successfully updated: {db_file.file_name} (ID: {db_file.id})")
|
||||
except Exception as e:
|
||||
db.rollback()
|
||||
api_logger.error(f"File update failed: file_id={file_id} - {str(e)}")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"File update failed: {str(e)}"
|
||||
)
|
||||
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"File update failed: {str(e)}")
|
||||
|
||||
# 4. Return the updated file
|
||||
return success(data=jsonable_encoder(file_schema.File.model_validate(db_file)), msg="File information updated successfully")
|
||||
|
||||
|
||||
@@ -399,60 +266,43 @@ async def update_file(
|
||||
async def delete_file(
|
||||
file_id: uuid.UUID,
|
||||
db: Session = Depends(get_db),
|
||||
current_user: User = Depends(get_current_user)
|
||||
current_user: User = Depends(get_current_user),
|
||||
storage_service: FileStorageService = Depends(get_file_storage_service),
|
||||
):
|
||||
"""
|
||||
Delete a file or folder
|
||||
"""
|
||||
api_logger.info(f"Request to delete file: file_id={file_id}, username: {current_user.username}")
|
||||
await _delete_file(db=db, file_id=file_id, current_user=current_user)
|
||||
"""Delete a file or folder"""
|
||||
api_logger.info(f"Request to delete file: file_id={file_id}")
|
||||
await _delete_file(db=db, file_id=file_id, current_user=current_user, storage_service=storage_service)
|
||||
return success(msg="File deleted successfully")
|
||||
|
||||
|
||||
async def _delete_file(
|
||||
file_id: uuid.UUID,
|
||||
db: Session = Depends(get_db),
|
||||
current_user: User = Depends(get_current_user)
|
||||
db: Session,
|
||||
current_user: User,
|
||||
storage_service: FileStorageService,
|
||||
) -> None:
|
||||
"""
|
||||
Delete a file or folder
|
||||
"""
|
||||
# 1. Check if the file exists
|
||||
"""Delete a file or folder from storage and database"""
|
||||
db_file = file_service.get_file_by_id(db, file_id=file_id)
|
||||
|
||||
if not db_file:
|
||||
api_logger.warning(f"The file does not exist or you do not have permission to access it: file_id={file_id}")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail="The file does not exist or you do not have permission to access it"
|
||||
)
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="File not found")
|
||||
|
||||
# 2. Construct physical path
|
||||
file_path = Path(
|
||||
settings.FILE_PATH,
|
||||
str(db_file.kb_id),
|
||||
str(db_file.id)
|
||||
) if db_file.file_ext == 'folder' else Path(
|
||||
settings.FILE_PATH,
|
||||
str(db_file.kb_id),
|
||||
str(db_file.parent_id),
|
||||
f"{db_file.id}{db_file.file_ext}"
|
||||
)
|
||||
|
||||
# 3. Delete physical files/folders
|
||||
# Delete from storage backend
|
||||
if db_file.file_ext == 'folder':
|
||||
# For folders, delete all child files from storage first
|
||||
child_files = db.query(file_model.File).filter(file_model.File.parent_id == db_file.id).all()
|
||||
for child in child_files:
|
||||
if child.file_key:
|
||||
try:
|
||||
if file_path.exists():
|
||||
if db_file.file_ext == 'folder':
|
||||
shutil.rmtree(file_path) # Recursively delete folders
|
||||
else:
|
||||
file_path.unlink() # Delete a single file
|
||||
await storage_service.delete_file(child.file_key)
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"Failed to delete physical file/folder: {str(e)}"
|
||||
)
|
||||
|
||||
# 4.Delete db_file
|
||||
if db_file.file_ext == 'folder':
|
||||
api_logger.warning(f"Failed to delete child file from storage: {child.file_key} - {e}")
|
||||
db.query(file_model.File).filter(file_model.File.parent_id == db_file.id).delete()
|
||||
else:
|
||||
if db_file.file_key:
|
||||
try:
|
||||
await storage_service.delete_file(db_file.file_key)
|
||||
except Exception as e:
|
||||
api_logger.warning(f"Failed to delete file from storage: {db_file.file_key} - {e}")
|
||||
|
||||
db.delete(db_file)
|
||||
db.commit()
|
||||
|
||||
@@ -173,6 +173,8 @@ async def delete_tool(
|
||||
return success(msg="工具删除成功")
|
||||
except ValueError as e:
|
||||
raise HTTPException(status_code=400, detail=str(e))
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
@@ -249,6 +251,8 @@ async def parse_openapi_schema(
|
||||
if result["success"] is False:
|
||||
raise HTTPException(status_code=400, detail=result["message"])
|
||||
return success(data=result, msg="Schema解析完成")
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@@ -14,6 +14,7 @@ Transcribe the content from the provided PDF page image into clean Markdown form
|
||||
6. Do NOT wrap the output in ```markdown or ``` blocks.
|
||||
7. Only apply Markdown structure to headings, paragraphs, lists, and tables, strictly based on the layout of the image. Do NOT create tables unless an actual table exists in the image.
|
||||
8. Preserve the original language, information, and order exactly as shown in the image.
|
||||
9. Your output language MUST match the language of the content in the image. If the image contains Chinese text, output in Chinese. If English, output in English. Never translate.
|
||||
|
||||
{% if page %}
|
||||
At the end of the transcription, add the page divider: `--- Page {{ page }} ---`.
|
||||
|
||||
@@ -16,6 +16,7 @@ from app.core.workflow.engine.runtime_schema import ExecutionContext
|
||||
from app.core.workflow.engine.state_manager import WorkflowStateManager
|
||||
from app.core.workflow.engine.stream_output_coordinator import StreamOutputCoordinator
|
||||
from app.core.workflow.engine.variable_pool import VariablePool, VariablePoolInitializer
|
||||
from app.core.workflow.nodes.base_node import NodeExecutionError
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -326,10 +327,43 @@ class WorkflowExecutor:
|
||||
|
||||
logger.error(f"Workflow execution failed: execution_id={self.execution_context.execution_id}, error={e}",
|
||||
exc_info=True)
|
||||
|
||||
# 1) 尝试从 checkpoint 回补已成功节点的 node_outputs
|
||||
recovered: dict[str, Any] = {}
|
||||
try:
|
||||
if self.graph is not None:
|
||||
recovered = self.graph.get_state(
|
||||
self.execution_context.checkpoint_config
|
||||
).values or {}
|
||||
except Exception as recover_err:
|
||||
logger.warning(
|
||||
f"Recover state on failure failed: {recover_err}, "
|
||||
f"execution_id={self.execution_context.execution_id}"
|
||||
)
|
||||
|
||||
if result is None:
|
||||
result = {"error": str(e)}
|
||||
result = dict(recovered) if recovered else {}
|
||||
else:
|
||||
# 已有 result 与 recovered 合并,node_outputs 深度合并
|
||||
for k, v in recovered.items():
|
||||
if k == "node_outputs" and isinstance(v, dict):
|
||||
existing = result.get("node_outputs") or {}
|
||||
result["node_outputs"] = {**v, **existing}
|
||||
else:
|
||||
result.setdefault(k, v)
|
||||
|
||||
# 2) 如果是节点抛出的 NodeExecutionError,把失败节点的 node_output 注入 node_outputs
|
||||
failed_node_id: str | None = None
|
||||
if isinstance(e, NodeExecutionError):
|
||||
failed_node_id = e.node_id
|
||||
node_outputs = result.setdefault("node_outputs", {})
|
||||
# 不覆盖已有(理论上不会有),保底写入失败节点记录
|
||||
node_outputs.setdefault(e.node_id, e.node_output)
|
||||
|
||||
result["error"] = str(e)
|
||||
if failed_node_id:
|
||||
result["error_node"] = failed_node_id
|
||||
|
||||
yield {
|
||||
"event": "workflow_end",
|
||||
"data": self.result_builder.build_final_output(
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
import uuid
|
||||
from abc import ABC, abstractmethod
|
||||
from datetime import datetime
|
||||
@@ -22,6 +23,20 @@ from app.services.multimodal_service import MultimodalService
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class NodeExecutionError(Exception):
|
||||
"""节点执行失败异常。
|
||||
|
||||
携带失败节点的完整 node_output,供 executor 兜底注入 node_outputs,
|
||||
保证 workflow_executions.output_data 里能看到失败节点的日志记录。
|
||||
"""
|
||||
|
||||
def __init__(self, node_id: str, node_output: dict[str, Any], error_message: str):
|
||||
super().__init__(f"Node {node_id} execution failed: {error_message}")
|
||||
self.node_id = node_id
|
||||
self.node_output = node_output
|
||||
self.error_message = error_message
|
||||
|
||||
|
||||
class BaseNode(ABC):
|
||||
"""Base class for workflow nodes.
|
||||
|
||||
@@ -396,6 +411,8 @@ class BaseNode(ABC):
|
||||
"elapsed_time": elapsed_time,
|
||||
"token_usage": token_usage,
|
||||
"error": None,
|
||||
# 单调递增序号,用于日志按执行顺序排序(JSONB 不保证 key 顺序)
|
||||
"execution_order": time.monotonic_ns(),
|
||||
**self._extract_extra_fields(business_result),
|
||||
}
|
||||
final_output = {
|
||||
@@ -444,7 +461,9 @@ class BaseNode(ABC):
|
||||
"output": None,
|
||||
"elapsed_time": elapsed_time,
|
||||
"token_usage": None,
|
||||
"error": error_message
|
||||
"error": error_message,
|
||||
# 单调递增序号,用于日志按执行顺序排序
|
||||
"execution_order": time.monotonic_ns(),
|
||||
}
|
||||
|
||||
# if error_edge:
|
||||
@@ -466,7 +485,12 @@ class BaseNode(ABC):
|
||||
**node_output
|
||||
})
|
||||
logger.error(f"Node {self.node_id} execution failed, stopping workflow: {error_message}")
|
||||
raise Exception(f"Node {self.node_id} execution failed: {error_message}")
|
||||
# 抛出自定义异常,把 node_output 带给 executor,供其写入 node_outputs
|
||||
raise NodeExecutionError(
|
||||
node_id=self.node_id,
|
||||
node_output=node_output,
|
||||
error_message=error_message,
|
||||
)
|
||||
|
||||
def _extract_input(self, state: WorkflowState, variable_pool: VariablePool) -> dict[str, Any]:
|
||||
"""Extracts the input data for this node (used for logging or audit).
|
||||
|
||||
@@ -174,6 +174,9 @@ class IterationRuntime:
|
||||
continue
|
||||
node_type = result.get("node_outputs", {}).get(node_name, {}).get("node_type")
|
||||
cycle_variable = {"item": item} if node_type == NodeType.CYCLE_START else None
|
||||
node_cfg = next(
|
||||
(n for n in self.cycle_nodes if n.get("id") == node_name), None
|
||||
)
|
||||
self.event_write({
|
||||
"type": "cycle_item",
|
||||
"data": {
|
||||
|
||||
@@ -255,9 +255,18 @@ class HttpRequestNode(BaseNode):
|
||||
case HttpContentType.NONE:
|
||||
return {}
|
||||
case HttpContentType.JSON:
|
||||
content["json"] = json.loads(self._render_template(
|
||||
rendered = self._render_template(
|
||||
self.typed_config.body.data, variable_pool
|
||||
))
|
||||
)
|
||||
if not rendered or not rendered.strip():
|
||||
# 第三方导入的工作流可能出现 content_type=json 但 data 为空的情况,视为无 body
|
||||
return {}
|
||||
try:
|
||||
content["json"] = json.loads(rendered)
|
||||
except json.JSONDecodeError as e:
|
||||
raise RuntimeError(
|
||||
f"Invalid JSON body for HTTP request node: {e.msg} (data={rendered!r})"
|
||||
)
|
||||
case HttpContentType.FROM_DATA:
|
||||
data = {}
|
||||
files = []
|
||||
|
||||
@@ -334,7 +334,8 @@ class KnowledgeRetrievalNode(BaseNode):
|
||||
for kb_config in knowledge_bases:
|
||||
db_knowledge = knowledge_repository.get_knowledge_by_id(db=db, knowledge_id=kb_config.kb_id)
|
||||
if not (db_knowledge and db_knowledge.chunk_num > 0 and db_knowledge.status == 1):
|
||||
raise RuntimeError("The knowledge base does not exist or access is denied.")
|
||||
logger.warning("The knowledge base does not exist or access is denied.")
|
||||
continue
|
||||
tasks.append(self.knowledge_retrieval(db, query, db_knowledge, kb_config))
|
||||
if tasks:
|
||||
result = await asyncio.gather(*tasks)
|
||||
|
||||
@@ -15,4 +15,5 @@ class File(Base):
|
||||
file_ext = Column(String, index=True, nullable=False, comment="file extension:folder|pdf")
|
||||
file_size = Column(Integer, default=0, comment="file size(byte)")
|
||||
file_url = Column(String, index=True, nullable=True, comment="file comes from a website url")
|
||||
file_key = Column(String(512), nullable=True, index=True, comment="storage file key for FileStorageService")
|
||||
created_at = Column(DateTime, default=datetime.datetime.now)
|
||||
@@ -1,13 +1,15 @@
|
||||
import uuid
|
||||
from typing import Optional
|
||||
|
||||
from sqlalchemy import select, desc, func
|
||||
from sqlalchemy import select, desc, func, or_, cast, Text
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.core.exceptions import ResourceNotFoundException
|
||||
from app.core.logging_config import get_db_logger
|
||||
from app.models import Conversation, Message
|
||||
from app.models.app_model import AppType
|
||||
from app.models.conversation_model import ConversationDetail
|
||||
from app.models.workflow_model import WorkflowExecution
|
||||
|
||||
logger = get_db_logger()
|
||||
|
||||
@@ -206,7 +208,8 @@ class ConversationRepository:
|
||||
is_draft: Optional[bool] = None,
|
||||
keyword: Optional[str] = None,
|
||||
page: int = 1,
|
||||
pagesize: int = 20
|
||||
pagesize: int = 20,
|
||||
app_type: Optional[str] = None,
|
||||
) -> tuple[list[Conversation], int]:
|
||||
"""
|
||||
查询应用日志会话列表(带分页和过滤)
|
||||
@@ -218,6 +221,9 @@ class ConversationRepository:
|
||||
keyword: 搜索关键词(匹配消息内容)
|
||||
page: 页码(从 1 开始)
|
||||
pagesize: 每页数量
|
||||
app_type: 应用类型。WORKFLOW 类型改用 workflow_executions 的
|
||||
input_data/output_data 做关键词过滤(因为失败的工作流不会写入 messages 表);
|
||||
其他类型仍走 messages 表。
|
||||
|
||||
Returns:
|
||||
Tuple[List[Conversation], int]: (会话列表,总数)
|
||||
@@ -234,10 +240,26 @@ class ConversationRepository:
|
||||
|
||||
# 如果有关键词搜索,通过子查询过滤包含该关键词的 conversation
|
||||
if keyword:
|
||||
# 查找包含关键词的 conversation_id 列表
|
||||
kw_pattern = f"%{keyword}%"
|
||||
if app_type == AppType.WORKFLOW:
|
||||
# 工作流:从 workflow_executions 的 input_data / output_data 匹配
|
||||
# (messages 表只存开场白 assistant 消息,失败的工作流也不会写入)
|
||||
keyword_stmt = (
|
||||
select(WorkflowExecution.conversation_id)
|
||||
.where(
|
||||
WorkflowExecution.conversation_id.is_not(None),
|
||||
or_(
|
||||
cast(WorkflowExecution.input_data, Text).ilike(kw_pattern),
|
||||
cast(WorkflowExecution.output_data, Text).ilike(kw_pattern),
|
||||
),
|
||||
)
|
||||
.distinct()
|
||||
)
|
||||
else:
|
||||
# Agent 等其他类型:仍走 messages 表(user + assistant 内容)
|
||||
keyword_stmt = (
|
||||
select(Message.conversation_id)
|
||||
.where(Message.content.ilike(f"%{keyword}%"))
|
||||
.where(Message.content.ilike(kw_pattern))
|
||||
.distinct()
|
||||
)
|
||||
base_stmt = base_stmt.where(Conversation.id.in_(keyword_stmt))
|
||||
|
||||
@@ -3,7 +3,7 @@ import uuid
|
||||
from typing import Optional, Any, List, Dict, Union
|
||||
from enum import Enum, StrEnum
|
||||
|
||||
from pydantic import BaseModel, Field, ConfigDict, field_serializer, field_validator
|
||||
from pydantic import BaseModel, Field, ConfigDict, field_serializer, field_validator, model_serializer
|
||||
|
||||
from app.schemas.workflow_schema import WorkflowConfigCreate
|
||||
|
||||
@@ -661,9 +661,11 @@ class DraftRunResponse(BaseModel):
|
||||
suggested_questions: List[str] = Field(default_factory=list, description="下一步建议问题")
|
||||
citations: List[Dict[str, Any]] = Field(default_factory=list, description="引用来源")
|
||||
audio_url: Optional[str] = Field(default=None, description="TTS 语音URL")
|
||||
audio_status: Optional[str] = Field(default=None, description="TTS 语音状态")
|
||||
|
||||
def model_dump(self, **kwargs):
|
||||
data = super().model_dump(**kwargs)
|
||||
@model_serializer(mode="wrap")
|
||||
def _serialize(self, handler):
|
||||
data = handler(self)
|
||||
if not data.get("reasoning_content"):
|
||||
data.pop("reasoning_content", None)
|
||||
return data
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
import uuid
|
||||
import datetime
|
||||
from typing import Optional, Dict, Any, List
|
||||
from pydantic import BaseModel, Field, ConfigDict, field_serializer
|
||||
from pydantic import BaseModel, Field, ConfigDict, field_serializer, model_serializer
|
||||
|
||||
# 导入 FileInput(用于体验运行)
|
||||
from app.schemas.app_schema import FileInput
|
||||
@@ -94,6 +94,18 @@ class ChatResponse(BaseModel):
|
||||
message_id: str
|
||||
usage: Optional[Dict[str, Any]] = None
|
||||
elapsed_time: Optional[float] = None
|
||||
reasoning_content: Optional[str] = None
|
||||
suggested_questions: Optional[List[str]] = None
|
||||
citations: Optional[List[Dict[str, Any]]] = None
|
||||
audio_url: Optional[str] = None
|
||||
audio_status: Optional[str] = None
|
||||
|
||||
@model_serializer(mode="wrap")
|
||||
def _serialize(self, handler):
|
||||
data = handler(self)
|
||||
if not data.get("reasoning_content"):
|
||||
data.pop("reasoning_content", None)
|
||||
return data
|
||||
|
||||
|
||||
# ---------- Conversation Summary Schemas ----------
|
||||
|
||||
@@ -11,6 +11,7 @@ class FileBase(BaseModel):
|
||||
file_ext: str
|
||||
file_size: int
|
||||
file_url: str | None = None
|
||||
file_key: str | None = None
|
||||
created_at: datetime.datetime | None = None
|
||||
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ from sqlalchemy.orm import Session
|
||||
from sqlalchemy import select
|
||||
|
||||
from app.aioRedis import aio_redis
|
||||
from app.models.api_key_model import ApiKey
|
||||
from app.models.api_key_model import ApiKey, ApiKeyType
|
||||
from app.repositories.api_key_repository import ApiKeyRepository, ApiKeyLogRepository
|
||||
from app.schemas import api_key_schema
|
||||
from app.schemas.response_schema import PageData, PageMeta
|
||||
@@ -65,7 +65,8 @@ class ApiKeyService:
|
||||
BizCode.BAD_REQUEST
|
||||
)
|
||||
|
||||
if data.resource_id:
|
||||
# SERVICE 类型的 resource_id 指向 workspace,非应用,跳过应用发布校验
|
||||
if data.resource_id and data.type != ApiKeyType.SERVICE.value:
|
||||
app = db.get(App, data.resource_id)
|
||||
if not app or not app.current_release_id:
|
||||
raise BusinessException("该应用未发布", BizCode.APP_NOT_PUBLISHED)
|
||||
@@ -452,9 +453,12 @@ class ApiKeyAuthService:
|
||||
def check_app_published(db: Session, api_key_obj: ApiKey) -> None:
|
||||
"""
|
||||
检查应用是否已发布,未发布则抛出异常
|
||||
SERVICE 类型的 api_key 不绑定应用(resource_id 指向 workspace),跳过校验
|
||||
"""
|
||||
if not api_key_obj.resource_id:
|
||||
return
|
||||
if api_key_obj.type == ApiKeyType.SERVICE.value:
|
||||
return
|
||||
app = db.get(App, api_key_obj.resource_id)
|
||||
if not app or not app.current_release_id:
|
||||
raise BusinessException("应用未发布,不可用", BizCode.APP_NOT_PUBLISHED)
|
||||
|
||||
@@ -317,7 +317,7 @@ class AppChatService:
|
||||
"suggested_questions": suggested_questions,
|
||||
"citations": filtered_citations,
|
||||
"audio_url": audio_url,
|
||||
"audio_status": "pending"
|
||||
"audio_status": "pending" if audio_url else None
|
||||
}
|
||||
|
||||
async def agnet_chat_stream(
|
||||
|
||||
@@ -32,6 +32,7 @@ class AppLogService:
|
||||
pagesize: int = 20,
|
||||
is_draft: Optional[bool] = None,
|
||||
keyword: Optional[str] = None,
|
||||
app_type: Optional[str] = None,
|
||||
) -> Tuple[list[Conversation], int]:
|
||||
"""
|
||||
查询应用日志会话列表
|
||||
@@ -43,6 +44,7 @@ class AppLogService:
|
||||
pagesize: 每页数量
|
||||
is_draft: 是否草稿会话(None表示返回全部)
|
||||
keyword: 搜索关键词(匹配消息内容)
|
||||
app_type: 应用类型(WORKFLOW 时关键词将从 workflow_executions 搜索)
|
||||
|
||||
Returns:
|
||||
Tuple[list[Conversation], int]: (会话列表,总数)
|
||||
@@ -55,7 +57,8 @@ class AppLogService:
|
||||
"page": page,
|
||||
"pagesize": pagesize,
|
||||
"is_draft": is_draft,
|
||||
"keyword": keyword
|
||||
"keyword": keyword,
|
||||
"app_type": app_type,
|
||||
}
|
||||
)
|
||||
|
||||
@@ -66,7 +69,8 @@ class AppLogService:
|
||||
is_draft=is_draft,
|
||||
keyword=keyword,
|
||||
page=page,
|
||||
pagesize=pagesize
|
||||
pagesize=pagesize,
|
||||
app_type=app_type,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
@@ -368,8 +372,16 @@ def _build_nodes_from_output_data(output_data: Optional[dict]) -> list[AppLogNod
|
||||
if not output_data:
|
||||
return []
|
||||
node_outputs: dict = output_data.get("node_outputs") or {}
|
||||
# 按 execution_order(节点执行时写入的单调递增序号)排序。
|
||||
# PostgreSQL JSONB 不保证 key 顺序,不能依赖 dict 插入顺序;
|
||||
# 缺失 execution_order 的历史数据退化到 0,保持在最前。
|
||||
ordered_items = sorted(
|
||||
node_outputs.items(),
|
||||
key=lambda kv: (kv[1] or {}).get("execution_order", 0)
|
||||
if isinstance(kv[1], dict) else 0
|
||||
)
|
||||
result = []
|
||||
for node_id, node_data in node_outputs.items():
|
||||
for node_id, node_data in ordered_items:
|
||||
if not isinstance(node_data, dict):
|
||||
continue
|
||||
output = dict(node_data)
|
||||
@@ -382,6 +394,8 @@ def _build_nodes_from_output_data(output_data: Optional[dict]) -> list[AppLogNod
|
||||
inp = output.pop("input", None)
|
||||
elapsed_time = output.pop("elapsed_time", None)
|
||||
token_usage = output.pop("token_usage", None)
|
||||
# execution_order 仅用于排序,不返回给前端
|
||||
output.pop("execution_order", None)
|
||||
result.append(AppLogNodeExecution(
|
||||
node_id=node_id,
|
||||
node_type=node_type,
|
||||
|
||||
@@ -754,7 +754,7 @@ class AgentRunService:
|
||||
) if not sub_agent else [],
|
||||
"citations": filtered_citations,
|
||||
"audio_url": audio_url,
|
||||
"audio_status": "pending"
|
||||
"audio_status": "pending" if audio_url else None
|
||||
}
|
||||
|
||||
logger.info(
|
||||
|
||||
@@ -34,26 +34,7 @@ def generate_file_key(
|
||||
Generate a unique file key for storage.
|
||||
|
||||
The file key follows the format: {tenant_id}/{workspace_id}/{file_id}{file_ext}
|
||||
|
||||
Args:
|
||||
tenant_id: The tenant UUID.
|
||||
workspace_id: The workspace UUID.
|
||||
file_id: The file UUID.
|
||||
file_ext: The file extension (e.g., '.pdf', '.txt').
|
||||
|
||||
Returns:
|
||||
A unique file key string.
|
||||
|
||||
Example:
|
||||
>>> generate_file_key(
|
||||
... uuid.UUID('550e8400-e29b-41d4-a716-446655440000'),
|
||||
... uuid.UUID('660e8400-e29b-41d4-a716-446655440001'),
|
||||
... uuid.UUID('770e8400-e29b-41d4-a716-446655440002'),
|
||||
... '.pdf'
|
||||
... )
|
||||
'550e8400-e29b-41d4-a716-446655440000/660e8400-e29b-41d4-a716-446655440001/770e8400-e29b-41d4-a716-446655440002.pdf'
|
||||
"""
|
||||
# Ensure file_ext starts with a dot
|
||||
if file_ext and not file_ext.startswith('.'):
|
||||
file_ext = f'.{file_ext}'
|
||||
if workspace_id:
|
||||
@@ -61,6 +42,21 @@ def generate_file_key(
|
||||
return f"{tenant_id}/{file_id}{file_ext}"
|
||||
|
||||
|
||||
def generate_kb_file_key(
|
||||
kb_id: uuid.UUID,
|
||||
file_id: uuid.UUID,
|
||||
file_ext: str,
|
||||
) -> str:
|
||||
"""
|
||||
Generate a file key for knowledge base files.
|
||||
|
||||
Format: kb/{kb_id}/{file_id}{file_ext}
|
||||
"""
|
||||
if file_ext and not file_ext.startswith('.'):
|
||||
file_ext = f'.{file_ext}'
|
||||
return f"kb/{kb_id}/{file_id}{file_ext}"
|
||||
|
||||
|
||||
class FileStorageService:
|
||||
"""
|
||||
High-level service for file storage operations.
|
||||
|
||||
@@ -95,7 +95,7 @@ class DashScopeFormatStrategy(MultimodalFormatStrategy):
|
||||
"""通义千问文档格式"""
|
||||
return True, {
|
||||
"type": "text",
|
||||
"text": f"<document name=\"{file_name}\">\n{text}\n</document>"
|
||||
"text": f"<document name=\"{file_name}\">\n文档内容:\n{text}\n</document>"
|
||||
}
|
||||
|
||||
async def format_audio(
|
||||
@@ -167,6 +167,7 @@ class BedrockFormatStrategy(MultimodalFormatStrategy):
|
||||
async def format_document(self, file_name: str, text: str) -> tuple[bool, Dict[str, Any]]:
|
||||
"""Bedrock/Anthropic 文档格式(需要 base64 编码)"""
|
||||
# Bedrock 文档需要 base64 编码
|
||||
text = f"文档内容:\n{text}\n"
|
||||
text_bytes = text.encode('utf-8')
|
||||
base64_text = base64.b64encode(text_bytes).decode('utf-8')
|
||||
|
||||
@@ -223,7 +224,7 @@ class OpenAIFormatStrategy(MultimodalFormatStrategy):
|
||||
"""OpenAI 文档格式"""
|
||||
return True, {
|
||||
"type": "text",
|
||||
"text": f"<document name=\"{file_name}\">\n{text}\n</document>"
|
||||
"text": f"<document name=\"{file_name}\">\n文档内容:\n{text}\n</document>"
|
||||
}
|
||||
|
||||
async def format_audio(
|
||||
@@ -395,7 +396,7 @@ class MultimodalService:
|
||||
ext = img_info.get("ext", "png")
|
||||
try:
|
||||
_, img_url = await self._save_doc_image_to_storage(img_info["bytes"], ext, tenant_id, workspace_id)
|
||||
placeholder = f"第{page}页 第{index + 1}张图片" if page > 0 else f"第{index + 1}张图片"
|
||||
placeholder = f"第{page}页 第{index + 1}张" if page > 0 else f"第{index + 1}张"
|
||||
# 在文本内容中追加图片位置标记
|
||||
if result and result[-1].get("type") in ("text", "document"):
|
||||
key = "text" if "text" in result[-1] else list(result[-1].keys())[-1]
|
||||
|
||||
@@ -210,9 +210,14 @@ def _build_vision_model(file_path: str, db_knowledge):
|
||||
|
||||
|
||||
@celery_app.task(name="app.core.rag.tasks.parse_document")
|
||||
def parse_document(file_path: str, document_id: uuid.UUID):
|
||||
def parse_document(file_key: str, document_id: uuid.UUID, file_name: str = ""):
|
||||
"""
|
||||
Document parsing, vectorization, and storage
|
||||
Document parsing, vectorization, and storage.
|
||||
|
||||
Args:
|
||||
file_key: Storage key for FileStorageService (e.g. "kb/{kb_id}/{file_id}.docx")
|
||||
document_id: Document UUID
|
||||
file_name: Original file name (used for extension detection in chunk())
|
||||
"""
|
||||
|
||||
db_document = None
|
||||
@@ -223,7 +228,6 @@ def parse_document(file_path: str, document_id: uuid.UUID):
|
||||
|
||||
with get_db_context() as db:
|
||||
try:
|
||||
# Celery JSON 序列化会将 UUID 转为字符串,需要确保类型正确
|
||||
if not isinstance(document_id, uuid.UUID):
|
||||
document_id = uuid.UUID(str(document_id))
|
||||
|
||||
@@ -234,7 +238,11 @@ def parse_document(file_path: str, document_id: uuid.UUID):
|
||||
if db_knowledge is None:
|
||||
raise ValueError(f"Knowledge {db_document.kb_id} not found")
|
||||
|
||||
# 1. Document parsing & segmentation
|
||||
# Use file_name from argument or fall back to document record
|
||||
if not file_name:
|
||||
file_name = db_document.file_name
|
||||
|
||||
# 1. Download file from storage backend
|
||||
progress_lines.append(f"{datetime.now().strftime('%H:%M:%S')} Start to parse.")
|
||||
start_time = time.time()
|
||||
db_document.progress = 0.0
|
||||
@@ -245,45 +253,36 @@ def parse_document(file_path: str, document_id: uuid.UUID):
|
||||
db.commit()
|
||||
db.refresh(db_document)
|
||||
|
||||
# Read file content from storage backend (no NFS dependency)
|
||||
from app.services.file_storage_service import FileStorageService
|
||||
import asyncio
|
||||
storage_service = FileStorageService()
|
||||
|
||||
async def _download():
|
||||
return await storage_service.download_file(file_key)
|
||||
|
||||
try:
|
||||
file_binary = asyncio.run(_download())
|
||||
except RuntimeError:
|
||||
# If there's already a running loop (e.g. in some worker configurations)
|
||||
loop = asyncio.new_event_loop()
|
||||
try:
|
||||
file_binary = loop.run_until_complete(_download())
|
||||
finally:
|
||||
loop.close()
|
||||
if not file_binary:
|
||||
raise IOError(f"Downloaded empty file from storage: {file_key}")
|
||||
logger.info(f"[ParseDoc] Downloaded {len(file_binary)} bytes from storage key: {file_key}")
|
||||
|
||||
def progress_callback(prog=None, msg=None):
|
||||
progress_lines.append(f"{datetime.now().strftime('%H:%M:%S')} parse progress: {prog} msg: {msg}.")
|
||||
|
||||
# Prepare vision_model for parsing
|
||||
vision_model = _build_vision_model(file_path, db_knowledge)
|
||||
|
||||
# 先将文件读入内存,避免解析过程中依赖 NFS 文件持续可访问
|
||||
# python-docx 等库在 binary=None 时会用路径直接打开文件,
|
||||
# 在 NFS/共享存储上可能因缓存失效导致 "Package not found"
|
||||
max_wait_seconds = 30
|
||||
wait_interval = 2
|
||||
waited = 0
|
||||
file_binary = None
|
||||
while waited <= max_wait_seconds:
|
||||
# os.listdir 强制 NFS 客户端刷新目录缓存
|
||||
parent_dir = os.path.dirname(file_path)
|
||||
try:
|
||||
os.listdir(parent_dir)
|
||||
except OSError:
|
||||
pass
|
||||
try:
|
||||
with open(file_path, "rb") as f:
|
||||
file_binary = f.read()
|
||||
if not file_binary:
|
||||
# NFS 上文件存在但内容为空(可能还在同步中)
|
||||
raise IOError(f"File is empty (0 bytes), NFS may still be syncing: {file_path}")
|
||||
break
|
||||
except (FileNotFoundError, IOError) as e:
|
||||
if waited >= max_wait_seconds:
|
||||
raise type(e)(
|
||||
f"File not accessible at '{file_path}' after waiting {max_wait_seconds}s: {e}"
|
||||
)
|
||||
logger.warning(f"File not ready on this node, retrying in {wait_interval}s: {file_path} ({e})")
|
||||
time.sleep(wait_interval)
|
||||
waited += wait_interval
|
||||
vision_model = _build_vision_model(file_name, db_knowledge)
|
||||
|
||||
from app.core.rag.app.naive import chunk
|
||||
logger.info(f"[ParseDoc] file_binary size={len(file_binary)} bytes, type={type(file_binary).__name__}, bool={bool(file_binary)}")
|
||||
res = chunk(filename=file_path,
|
||||
res = chunk(filename=file_name,
|
||||
binary=file_binary,
|
||||
from_page=0,
|
||||
to_page=DEFAULT_PARSE_TO_PAGE,
|
||||
|
||||
Reference in New Issue
Block a user