Merge branch 'refs/heads/develop' into feature/agent-tool_xjn

This commit is contained in:
Timebomb2018
2026-04-28 12:07:50 +08:00
24 changed files with 398 additions and 416 deletions

View File

@@ -41,7 +41,7 @@ def list_app_logs(
# 验证应用访问权限 # 验证应用访问权限
app_service = AppService(db) app_service = AppService(db)
app_service.get_app(app_id, workspace_id) app = app_service.get_app(app_id, workspace_id)
# 使用 Service 层查询 # 使用 Service 层查询
log_service = AppLogService(db) log_service = AppLogService(db)
@@ -51,7 +51,8 @@ def list_app_logs(
page=page, page=page,
pagesize=pagesize, pagesize=pagesize,
is_draft=is_draft, is_draft=is_draft,
keyword=keyword keyword=keyword,
app_type=app.type,
) )
items = [AppLogConversation.model_validate(c) for c in conversations] items = [AppLogConversation.model_validate(c) for c in conversations]

View File

@@ -82,19 +82,32 @@ async def get_preview_chunks(
detail="The file does not exist or you do not have permission to access it" detail="The file does not exist or you do not have permission to access it"
) )
# 5. Construct file path/files/{kb_id}/{parent_id}/{file.id}{file.file_ext} # 5. Get file content from storage backend
file_path = os.path.join( if not db_file.file_key:
settings.FILE_PATH,
str(db_file.kb_id),
str(db_file.parent_id),
f"{db_file.id}{db_file.file_ext}"
)
# 6. Check if the file exists
if not os.path.exists(file_path):
raise HTTPException( raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, status_code=status.HTTP_404_NOT_FOUND,
detail="File not found (possibly deleted)" detail="File has no storage key (legacy data not migrated)"
)
from app.services.file_storage_service import FileStorageService
import asyncio
storage_service = FileStorageService()
async def _download():
return await storage_service.download_file(db_file.file_key)
try:
file_binary = asyncio.run(_download())
except RuntimeError:
loop = asyncio.new_event_loop()
try:
file_binary = loop.run_until_complete(_download())
finally:
loop.close()
except Exception as e:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"File not found in storage: {e}"
) )
# 7. Document parsing & segmentation # 7. Document parsing & segmentation
@@ -104,11 +117,12 @@ async def get_preview_chunks(
vision_model = QWenCV( vision_model = QWenCV(
key=db_knowledge.image2text.api_keys[0].api_key, key=db_knowledge.image2text.api_keys[0].api_key,
model_name=db_knowledge.image2text.api_keys[0].model_name, model_name=db_knowledge.image2text.api_keys[0].model_name,
lang="Chinese", # Default to Chinese lang="Chinese",
base_url=db_knowledge.image2text.api_keys[0].api_base base_url=db_knowledge.image2text.api_keys[0].api_base
) )
from app.core.rag.app.naive import chunk from app.core.rag.app.naive import chunk
res = chunk(filename=file_path, res = chunk(filename=db_file.file_name,
binary=file_binary,
from_page=0, from_page=0,
to_page=5, to_page=5,
callback=progress_callback, callback=progress_callback,

View File

@@ -20,6 +20,7 @@ from app.models.user_model import User
from app.schemas import document_schema from app.schemas import document_schema
from app.schemas.response_schema import ApiResponse from app.schemas.response_schema import ApiResponse
from app.services import document_service, file_service, knowledge_service from app.services import document_service, file_service, knowledge_service
from app.services.file_storage_service import FileStorageService, get_file_storage_service
# Obtain a dedicated API logger # Obtain a dedicated API logger
@@ -231,7 +232,8 @@ async def update_document(
async def delete_document( async def delete_document(
document_id: uuid.UUID, document_id: uuid.UUID,
db: Session = Depends(get_db), db: Session = Depends(get_db),
current_user: User = Depends(get_current_user) current_user: User = Depends(get_current_user),
storage_service: FileStorageService = Depends(get_file_storage_service),
): ):
""" """
Delete document Delete document
@@ -257,7 +259,7 @@ async def delete_document(
db.commit() db.commit()
# 3. Delete file # 3. Delete file
await file_controller._delete_file(db=db, file_id=file_id, current_user=current_user) await file_controller._delete_file(db=db, file_id=file_id, current_user=current_user, storage_service=storage_service)
# 4. Delete vector index # 4. Delete vector index
db_knowledge = knowledge_service.get_knowledge_by_id(db, knowledge_id=db_document.kb_id, current_user=current_user) db_knowledge = knowledge_service.get_knowledge_by_id(db, knowledge_id=db_document.kb_id, current_user=current_user)
@@ -305,38 +307,25 @@ async def parse_documents(
detail="The file does not exist or you do not have permission to access it" detail="The file does not exist or you do not have permission to access it"
) )
# 3. Construct file path/files/{kb_id}/{parent_id}/{file.id}{file.file_ext} # 3. Get file_key for storage backend
file_path = os.path.join( if not db_file.file_key:
settings.FILE_PATH, api_logger.error(f"File has no storage key (legacy data not migrated): file_id={db_file.id}")
str(db_file.kb_id),
str(db_file.parent_id),
f"{db_file.id}{db_file.file_ext}"
)
# 4. Check if the file exists
api_logger.debug(f"Constructed file path: {file_path}")
api_logger.debug(f"File metadata - kb_id: {db_file.kb_id}, parent_id: {db_file.parent_id}, file_id: {db_file.id}, extension: {db_file.file_ext}")
if not os.path.exists(file_path):
api_logger.error(f"File not found (possibly deleted): file_path={file_path}, file_id={db_file.id}, document_id={document_id}")
raise HTTPException( raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, status_code=status.HTTP_404_NOT_FOUND,
detail="File not found (possibly deleted)" detail="File has no storage key (legacy data not migrated)"
) )
# 5. Obtain knowledge base information # 4. Obtain knowledge base information
api_logger.info( f"Obtain details of the knowledge base: knowledge_id={db_document.kb_id}") api_logger.info(f"Obtain details of the knowledge base: knowledge_id={db_document.kb_id}")
db_knowledge = knowledge_service.get_knowledge_by_id(db, knowledge_id=db_document.kb_id, current_user=current_user) db_knowledge = knowledge_service.get_knowledge_by_id(db, knowledge_id=db_document.kb_id, current_user=current_user)
if not db_knowledge: if not db_knowledge:
api_logger.warning(f"The knowledge base does not exist or access is denied: knowledge_id={db_document.kb_id}") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Knowledge base not found")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="The knowledge base does not exist or access is denied"
)
# 6. Task: Document parsing, vectorization, and storage # 5. Dispatch parse task with file_key (not file_path)
# from app.tasks import parse_document task = celery_app.send_task(
# parse_document(file_path, document_id) "app.core.rag.tasks.parse_document",
task = celery_app.send_task("app.core.rag.tasks.parse_document", args=[file_path, document_id]) args=[db_file.file_key, document_id, db_file.file_name]
)
result = { result = {
"task_id": task.id "task_id": task.id
} }

View File

@@ -1,12 +1,10 @@
import os import os
from pathlib import Path
import shutil
from typing import Any, Optional from typing import Any, Optional
import uuid import uuid
from fastapi import APIRouter, Depends, HTTPException, status, File, UploadFile, Query from fastapi import APIRouter, Depends, HTTPException, status, File, UploadFile, Query
from fastapi.encoders import jsonable_encoder from fastapi.encoders import jsonable_encoder
from fastapi.responses import FileResponse from fastapi.responses import Response
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from app.core.config import settings from app.core.config import settings
@@ -19,10 +17,14 @@ from app.models.user_model import User
from app.schemas import file_schema, document_schema from app.schemas import file_schema, document_schema
from app.schemas.response_schema import ApiResponse from app.schemas.response_schema import ApiResponse
from app.services import file_service, document_service from app.services import file_service, document_service
from app.services.knowledge_service import get_knowledge_by_id as get_kb_by_id
from app.services.file_storage_service import (
FileStorageService,
generate_kb_file_key,
get_file_storage_service,
)
from app.core.quota_stub import check_knowledge_capacity_quota from app.core.quota_stub import check_knowledge_capacity_quota
# Obtain a dedicated API logger
api_logger = get_api_logger() api_logger = get_api_logger()
router = APIRouter( router = APIRouter(
@@ -35,67 +37,37 @@ router = APIRouter(
async def get_files( async def get_files(
kb_id: uuid.UUID, kb_id: uuid.UUID,
parent_id: uuid.UUID, parent_id: uuid.UUID,
page: int = Query(1, gt=0), # Default: 1, which must be greater than 0 page: int = Query(1, gt=0),
pagesize: int = Query(20, gt=0, le=100), # Default: 20 items per page, maximum: 100 items pagesize: int = Query(20, gt=0, le=100),
orderby: Optional[str] = Query(None, description="Sort fields, such as: created_at"), orderby: Optional[str] = Query(None, description="Sort fields, such as: created_at"),
desc: Optional[bool] = Query(False, description="Is it descending order"), desc: Optional[bool] = Query(False, description="Is it descending order"),
keywords: Optional[str] = Query(None, description="Search keywords (file name)"), keywords: Optional[str] = Query(None, description="Search keywords (file name)"),
db: Session = Depends(get_db), db: Session = Depends(get_db),
current_user: User = Depends(get_current_user) current_user: User = Depends(get_current_user)
): ):
""" """Paged query file list"""
Paged query file list api_logger.info(f"Query file list: kb_id={kb_id}, parent_id={parent_id}, page={page}, pagesize={pagesize}")
- Support filtering by kb_id and parent_id
- Support keyword search for file names
- Support dynamic sorting
- Return paging metadata + file list
"""
api_logger.info(f"Query file list: kb_id={kb_id}, parent_id={parent_id}, page={page}, pagesize={pagesize}, keywords={keywords}, username: {current_user.username}")
# 1. parameter validation
if page < 1 or pagesize < 1:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="The paging parameter must be greater than 0"
)
# 2. Construct query conditions if page < 1 or pagesize < 1:
filters = [ raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="The paging parameter must be greater than 0")
file_model.File.kb_id == kb_id
] filters = [file_model.File.kb_id == kb_id]
if parent_id: if parent_id:
filters.append(file_model.File.parent_id == parent_id) filters.append(file_model.File.parent_id == parent_id)
# Keyword search (fuzzy matching of file name)
if keywords: if keywords:
filters.append(file_model.File.file_name.ilike(f"%{keywords}%")) filters.append(file_model.File.file_name.ilike(f"%{keywords}%"))
# 3. Execute paged query
try: try:
api_logger.debug("Start executing file paging query")
total, items = file_service.get_files_paginated( total, items = file_service.get_files_paginated(
db=db, db=db, filters=filters, page=page, pagesize=pagesize,
filters=filters, orderby=orderby, desc=desc, current_user=current_user
page=page,
pagesize=pagesize,
orderby=orderby,
desc=desc,
current_user=current_user
) )
api_logger.info(f"File query successful: total={total}, returned={len(items)} records")
except Exception as e: except Exception as e:
raise HTTPException( raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Query failed: {str(e)}")
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Query failed: {str(e)}"
)
# 4. Return structured response
result = { result = {
"items": items, "items": items,
"page": { "page": {"page": page, "pagesize": pagesize, "total": total, "has_next": page * pagesize < total}
"page": page,
"pagesize": pagesize,
"total": total,
"has_next": True if page * pagesize < total else False
}
} }
return success(data=jsonable_encoder(result), msg="Query of file list succeeded") return success(data=jsonable_encoder(result), msg="Query of file list succeeded")
@@ -108,23 +80,14 @@ async def create_folder(
db: Session = Depends(get_db), db: Session = Depends(get_db),
current_user: User = Depends(get_current_user), current_user: User = Depends(get_current_user),
): ):
""" """Create a new folder"""
Create a new folder api_logger.info(f"Create folder request: kb_id={kb_id}, parent_id={parent_id}, folder_name={folder_name}")
"""
api_logger.info(f"Create folder request: kb_id={kb_id}, parent_id={parent_id}, folder_name={folder_name}, username: {current_user.username}")
try: try:
api_logger.debug(f"Start creating a folder: {folder_name}") create_folder_data = file_schema.FileCreate(
create_folder = file_schema.FileCreate( kb_id=kb_id, created_by=current_user.id, parent_id=parent_id,
kb_id=kb_id, file_name=folder_name, file_ext='folder', file_size=0,
created_by=current_user.id,
parent_id=parent_id,
file_name=folder_name,
file_ext='folder',
file_size=0,
) )
db_file = file_service.create_file(db=db, file=create_folder, current_user=current_user) db_file = file_service.create_file(db=db, file=create_folder_data, current_user=current_user)
api_logger.info(f"Folder created successfully: {db_file.file_name} (ID: {db_file.id})")
return success(data=jsonable_encoder(file_schema.File.model_validate(db_file)), msg="Folder creation successful") return success(data=jsonable_encoder(file_schema.File.model_validate(db_file)), msg="Folder creation successful")
except Exception as e: except Exception as e:
api_logger.error(f"Folder creation failed: {folder_name} - {str(e)}") api_logger.error(f"Folder creation failed: {folder_name} - {str(e)}")
@@ -138,76 +101,58 @@ async def upload_file(
parent_id: uuid.UUID, parent_id: uuid.UUID,
file: UploadFile = File(...), file: UploadFile = File(...),
db: Session = Depends(get_db), db: Session = Depends(get_db),
current_user: User = Depends(get_current_user) current_user: User = Depends(get_current_user),
storage_service: FileStorageService = Depends(get_file_storage_service),
): ):
""" """Upload file to storage backend"""
upload file api_logger.info(f"upload file request: kb_id={kb_id}, parent_id={parent_id}, filename={file.filename}")
"""
api_logger.info(f"upload file request: kb_id={kb_id}, parent_id={parent_id}, filename={file.filename}, username: {current_user.username}")
# Read the contents of the file
contents = await file.read() contents = await file.read()
# Check file size
file_size = len(contents) file_size = len(contents)
print(f"file size: {file_size} byte")
if file_size == 0: if file_size == 0:
raise HTTPException( raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="The file is empty.")
status_code=status.HTTP_400_BAD_REQUEST,
detail="The file is empty."
)
# If the file size exceeds 50MB (50 * 1024 * 1024 bytes)
if file_size > settings.MAX_FILE_SIZE: if file_size > settings.MAX_FILE_SIZE:
raise HTTPException( raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"File size exceeds {settings.MAX_FILE_SIZE} byte limit")
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"The file size exceeds the {settings.MAX_FILE_SIZE}byte limit"
)
# Extract the extension using `os.path.splitext`
_, file_extension = os.path.splitext(file.filename) _, file_extension = os.path.splitext(file.filename)
upload_file = file_schema.FileCreate( file_ext = file_extension.lower()
kb_id=kb_id,
created_by=current_user.id, # Create File record
parent_id=parent_id, upload_file_data = file_schema.FileCreate(
file_name=file.filename, kb_id=kb_id, created_by=current_user.id, parent_id=parent_id,
file_ext=file_extension.lower(), file_name=file.filename, file_ext=file_ext, file_size=file_size,
file_size=file_size,
) )
db_file = file_service.create_file(db=db, file=upload_file, current_user=current_user) db_file = file_service.create_file(db=db, file=upload_file_data, current_user=current_user)
# Construct a save path/files/{kb_id}/{parent_id}/{file.id}{file_extension} # Upload to storage backend
save_dir = os.path.join(settings.FILE_PATH, str(kb_id), str(parent_id)) file_key = generate_kb_file_key(kb_id=kb_id, file_id=db_file.id, file_ext=file_ext)
Path(save_dir).mkdir(parents=True, exist_ok=True) # Ensure that the directory exists try:
save_path = os.path.join(save_dir, f"{db_file.id}{db_file.file_ext}") await storage_service.storage.upload(file_key=file_key, content=contents, content_type=file.content_type)
except Exception as e:
api_logger.error(f"Storage upload failed: {e}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"File storage failed: {str(e)}")
# Save file # Save file_key
with open(save_path, "wb") as f: db_file.file_key = file_key
f.write(contents) db.commit()
db.refresh(db_file)
# Verify whether the file has been saved successfully # Create document (inherit parser_config from knowledge base)
if not os.path.exists(save_path): default_parser_config = {
raise HTTPException( "layout_recognize": "DeepDOC", "chunk_token_num": 128, "delimiter": "\n",
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, "auto_keywords": 0, "auto_questions": 0, "html4excel": "false"
detail="File save failed" }
) try:
db_knowledge = get_kb_by_id(db, knowledge_id=kb_id, current_user=current_user)
if db_knowledge and db_knowledge.parser_config:
default_parser_config.update(dict(db_knowledge.parser_config))
except Exception:
pass
# Create a document
create_data = document_schema.DocumentCreate( create_data = document_schema.DocumentCreate(
kb_id=kb_id, kb_id=kb_id, created_by=current_user.id, file_id=db_file.id,
created_by=current_user.id, file_name=db_file.file_name, file_ext=db_file.file_ext, file_size=db_file.file_size,
file_id=db_file.id, file_meta={}, parser_id="naive", parser_config=default_parser_config
file_name=db_file.file_name,
file_ext=db_file.file_ext,
file_size=db_file.file_size,
file_meta={},
parser_id="naive",
parser_config={
"layout_recognize": "DeepDOC",
"chunk_token_num": 128,
"delimiter": "\n",
"auto_keywords": 0,
"auto_questions": 0,
"html4excel": "false"
}
) )
db_document = document_service.create_document(db=db, document=create_data, current_user=current_user) db_document = document_service.create_document(db=db, document=create_data, current_user=current_user)
@@ -221,123 +166,73 @@ async def custom_text(
parent_id: uuid.UUID, parent_id: uuid.UUID,
create_data: file_schema.CustomTextFileCreate, create_data: file_schema.CustomTextFileCreate,
db: Session = Depends(get_db), db: Session = Depends(get_db),
current_user: User = Depends(get_current_user) current_user: User = Depends(get_current_user),
storage_service: FileStorageService = Depends(get_file_storage_service),
): ):
""" """Custom text upload"""
custom text
"""
api_logger.info(f"custom text upload request: kb_id={kb_id}, parent_id={parent_id}, title={create_data.title}, content={create_data.content}, username: {current_user.username}")
# Check file content size
# 将内容编码为字节UTF-8
content_bytes = create_data.content.encode('utf-8') content_bytes = create_data.content.encode('utf-8')
file_size = len(content_bytes) file_size = len(content_bytes)
print(f"file size: {file_size} byte")
if file_size == 0: if file_size == 0:
raise HTTPException( raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="The content is empty.")
status_code=status.HTTP_400_BAD_REQUEST,
detail="The content is empty."
)
# If the file size exceeds 50MB (50 * 1024 * 1024 bytes)
if file_size > settings.MAX_FILE_SIZE: if file_size > settings.MAX_FILE_SIZE:
raise HTTPException( raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Content size exceeds {settings.MAX_FILE_SIZE} byte limit")
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"The content size exceeds the {settings.MAX_FILE_SIZE}byte limit"
)
upload_file = file_schema.FileCreate( upload_file_data = file_schema.FileCreate(
kb_id=kb_id, kb_id=kb_id, created_by=current_user.id, parent_id=parent_id,
created_by=current_user.id, file_name=f"{create_data.title}.txt", file_ext=".txt", file_size=file_size,
parent_id=parent_id,
file_name=f"{create_data.title}.txt",
file_ext=".txt",
file_size=file_size,
) )
db_file = file_service.create_file(db=db, file=upload_file, current_user=current_user) db_file = file_service.create_file(db=db, file=upload_file_data, current_user=current_user)
# Construct a save path/files/{kb_id}/{parent_id}/{file.id}{file_extension} # Upload to storage backend
save_dir = os.path.join(settings.FILE_PATH, str(kb_id), str(parent_id)) file_key = generate_kb_file_key(kb_id=kb_id, file_id=db_file.id, file_ext=".txt")
Path(save_dir).mkdir(parents=True, exist_ok=True) # Ensure that the directory exists try:
save_path = os.path.join(save_dir, f"{db_file.id}.txt") await storage_service.storage.upload(file_key=file_key, content=content_bytes, content_type="text/plain")
except Exception as e:
api_logger.error(f"Storage upload failed: {e}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"File storage failed: {str(e)}")
# Save file db_file.file_key = file_key
with open(save_path, "wb") as f: db.commit()
f.write(content_bytes) db.refresh(db_file)
# Verify whether the file has been saved successfully
if not os.path.exists(save_path):
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="File save failed"
)
# Create a document
create_document_data = document_schema.DocumentCreate( create_document_data = document_schema.DocumentCreate(
kb_id=kb_id, kb_id=kb_id, created_by=current_user.id, file_id=db_file.id,
created_by=current_user.id, file_name=db_file.file_name, file_ext=db_file.file_ext, file_size=db_file.file_size,
file_id=db_file.id, file_meta={}, parser_id="naive",
file_name=db_file.file_name, parser_config={"layout_recognize": "DeepDOC", "chunk_token_num": 128, "delimiter": "\n",
file_ext=db_file.file_ext, "auto_keywords": 0, "auto_questions": 0, "html4excel": "false"}
file_size=db_file.file_size,
file_meta={},
parser_id="naive",
parser_config={
"layout_recognize": "DeepDOC",
"chunk_token_num": 128,
"delimiter": "\n",
"auto_keywords": 0,
"auto_questions": 0,
"html4excel": "false"
}
) )
db_document = document_service.create_document(db=db, document=create_document_data, current_user=current_user) db_document = document_service.create_document(db=db, document=create_document_data, current_user=current_user)
api_logger.info(f"custom text upload successfully: {create_data.title} (file_id: {db_file.id}, document_id: {db_document.id})")
return success(data=jsonable_encoder(document_schema.Document.model_validate(db_document)), msg="custom text upload successful") return success(data=jsonable_encoder(document_schema.Document.model_validate(db_document)), msg="custom text upload successful")
@router.get("/{file_id}", response_model=Any) @router.get("/{file_id}", response_model=Any)
async def get_file( async def get_file(
file_id: uuid.UUID, file_id: uuid.UUID,
db: Session = Depends(get_db) db: Session = Depends(get_db),
storage_service: FileStorageService = Depends(get_file_storage_service),
) -> Any: ) -> Any:
""" """Download file by file_id"""
Download the file based on the file_id
- Query file information from the database
- Construct the file path and check if it exists
- Return a FileResponse to download the file
"""
api_logger.info(f"Download the file based on the file_id: file_id={file_id}")
# 1. Query file information from the database
db_file = file_service.get_file_by_id(db, file_id=file_id) db_file = file_service.get_file_by_id(db, file_id=file_id)
if not db_file: if not db_file:
api_logger.warning(f"The file does not exist or you do not have permission to access it: file_id={file_id}") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="File not found")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="The file does not exist or you do not have permission to access it"
)
# 2. Construct file path/files/{kb_id}/{parent_id}/{file.id}{file.file_ext} if not db_file.file_key:
file_path = os.path.join( raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="File has no storage key (legacy data not migrated)")
settings.FILE_PATH,
str(db_file.kb_id),
str(db_file.parent_id),
f"{db_file.id}{db_file.file_ext}"
)
# 3. Check if the file exists try:
if not os.path.exists(file_path): content = await storage_service.download_file(db_file.file_key)
raise HTTPException( except Exception as e:
status_code=status.HTTP_404_NOT_FOUND, api_logger.error(f"Storage download failed: {e}")
detail="File not found (possibly deleted)" raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="File not found in storage")
)
# 4.Return FileResponse (automatically handle download) import mimetypes
return FileResponse( media_type = mimetypes.guess_type(db_file.file_name)[0] or "application/octet-stream"
path=file_path, return Response(
filename=db_file.file_name, # Use original file name content=content,
media_type="application/octet-stream" # Universal binary stream type media_type=media_type,
headers={"Content-Disposition": f'attachment; filename="{db_file.file_name}"'}
) )
@@ -348,50 +243,22 @@ async def update_file(
db: Session = Depends(get_db), db: Session = Depends(get_db),
current_user: User = Depends(get_current_user) current_user: User = Depends(get_current_user)
): ):
""" """Update file information (such as file name)"""
Update file information (such as file name)
- Only specified fields such as file_name are allowed to be modified
"""
api_logger.debug(f"Query the file to be updated: {file_id}")
# 1. Check if the file exists
db_file = file_service.get_file_by_id(db, file_id=file_id) db_file = file_service.get_file_by_id(db, file_id=file_id)
if not db_file: if not db_file:
api_logger.warning(f"The file does not exist or you do not have permission to access it: file_id={file_id}") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="File not found")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="The file does not exist or you do not have permission to access it"
)
# 2. Update fields (only update non-null fields)
api_logger.debug(f"Start updating the file fields: {file_id}")
updated_fields = []
for field, value in update_data.dict(exclude_unset=True).items(): for field, value in update_data.dict(exclude_unset=True).items():
if hasattr(db_file, field): if hasattr(db_file, field):
old_value = getattr(db_file, field) setattr(db_file, field, value)
if old_value != value:
# update value
setattr(db_file, field, value)
updated_fields.append(f"{field}: {old_value} -> {value}")
if updated_fields:
api_logger.debug(f"updated fields: {', '.join(updated_fields)}")
# 3. Save to database
try: try:
db.commit() db.commit()
db.refresh(db_file) db.refresh(db_file)
api_logger.info(f"The file has been successfully updated: {db_file.file_name} (ID: {db_file.id})")
except Exception as e: except Exception as e:
db.rollback() db.rollback()
api_logger.error(f"File update failed: file_id={file_id} - {str(e)}") raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"File update failed: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"File update failed: {str(e)}"
)
# 4. Return the updated file
return success(data=jsonable_encoder(file_schema.File.model_validate(db_file)), msg="File information updated successfully") return success(data=jsonable_encoder(file_schema.File.model_validate(db_file)), msg="File information updated successfully")
@@ -399,60 +266,43 @@ async def update_file(
async def delete_file( async def delete_file(
file_id: uuid.UUID, file_id: uuid.UUID,
db: Session = Depends(get_db), db: Session = Depends(get_db),
current_user: User = Depends(get_current_user) current_user: User = Depends(get_current_user),
storage_service: FileStorageService = Depends(get_file_storage_service),
): ):
""" """Delete a file or folder"""
Delete a file or folder api_logger.info(f"Request to delete file: file_id={file_id}")
""" await _delete_file(db=db, file_id=file_id, current_user=current_user, storage_service=storage_service)
api_logger.info(f"Request to delete file: file_id={file_id}, username: {current_user.username}")
await _delete_file(db=db, file_id=file_id, current_user=current_user)
return success(msg="File deleted successfully") return success(msg="File deleted successfully")
async def _delete_file( async def _delete_file(
file_id: uuid.UUID, file_id: uuid.UUID,
db: Session = Depends(get_db), db: Session,
current_user: User = Depends(get_current_user) current_user: User,
storage_service: FileStorageService,
) -> None: ) -> None:
""" """Delete a file or folder from storage and database"""
Delete a file or folder
"""
# 1. Check if the file exists
db_file = file_service.get_file_by_id(db, file_id=file_id) db_file = file_service.get_file_by_id(db, file_id=file_id)
if not db_file: if not db_file:
api_logger.warning(f"The file does not exist or you do not have permission to access it: file_id={file_id}") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="File not found")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="The file does not exist or you do not have permission to access it"
)
# 2. Construct physical path # Delete from storage backend
file_path = Path(
settings.FILE_PATH,
str(db_file.kb_id),
str(db_file.id)
) if db_file.file_ext == 'folder' else Path(
settings.FILE_PATH,
str(db_file.kb_id),
str(db_file.parent_id),
f"{db_file.id}{db_file.file_ext}"
)
# 3. Delete physical files/folders
try:
if file_path.exists():
if db_file.file_ext == 'folder':
shutil.rmtree(file_path) # Recursively delete folders
else:
file_path.unlink() # Delete a single file
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to delete physical file/folder: {str(e)}"
)
# 4.Delete db_file
if db_file.file_ext == 'folder': if db_file.file_ext == 'folder':
# For folders, delete all child files from storage first
child_files = db.query(file_model.File).filter(file_model.File.parent_id == db_file.id).all()
for child in child_files:
if child.file_key:
try:
await storage_service.delete_file(child.file_key)
except Exception as e:
api_logger.warning(f"Failed to delete child file from storage: {child.file_key} - {e}")
db.query(file_model.File).filter(file_model.File.parent_id == db_file.id).delete() db.query(file_model.File).filter(file_model.File.parent_id == db_file.id).delete()
else:
if db_file.file_key:
try:
await storage_service.delete_file(db_file.file_key)
except Exception as e:
api_logger.warning(f"Failed to delete file from storage: {db_file.file_key} - {e}")
db.delete(db_file) db.delete(db_file)
db.commit() db.commit()

View File

@@ -173,6 +173,8 @@ async def delete_tool(
return success(msg="工具删除成功") return success(msg="工具删除成功")
except ValueError as e: except ValueError as e:
raise HTTPException(status_code=400, detail=str(e)) raise HTTPException(status_code=400, detail=str(e))
except HTTPException:
raise
except Exception as e: except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@@ -249,6 +251,8 @@ async def parse_openapi_schema(
if result["success"] is False: if result["success"] is False:
raise HTTPException(status_code=400, detail=result["message"]) raise HTTPException(status_code=400, detail=result["message"])
return success(data=result, msg="Schema解析完成") return success(data=result, msg="Schema解析完成")
except HTTPException:
raise
except Exception as e: except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))

View File

@@ -94,9 +94,9 @@ async def write(
# ) # )
scheduler.push_task( scheduler.push_task(
"app.core.memory.agent.write_message", "app.core.memory.agent.write_message",
actual_end_user_id, str(actual_end_user_id),
{ {
"end_user_id": actual_end_user_id, "end_user_id": str(actual_end_user_id),
"message": structured_messages, "message": structured_messages,
"config_id": str(actual_config_id), "config_id": str(actual_config_id),
"storage_type": storage_type, "storage_type": storage_type,
@@ -177,11 +177,11 @@ async def window_dialogue(end_user_id, langchain_messages, memory_config, scope)
scheduler.push_task( scheduler.push_task(
"app.core.memory.agent.write_message", "app.core.memory.agent.write_message",
end_user_id, str(end_user_id),
{ {
"end_user_id": end_user_id, "end_user_id": str(end_user_id),
"message": redis_messages, "message": redis_messages,
"config_id": config_id, "config_id": str(config_id),
"storage_type": AgentMemory_Long_Term.STORAGE_NEO4J, "storage_type": AgentMemory_Long_Term.STORAGE_NEO4J,
"user_rag_memory_id": "" "user_rag_memory_id": ""
} }

View File

@@ -14,6 +14,7 @@ Transcribe the content from the provided PDF page image into clean Markdown form
6. Do NOT wrap the output in ```markdown or ``` blocks. 6. Do NOT wrap the output in ```markdown or ``` blocks.
7. Only apply Markdown structure to headings, paragraphs, lists, and tables, strictly based on the layout of the image. Do NOT create tables unless an actual table exists in the image. 7. Only apply Markdown structure to headings, paragraphs, lists, and tables, strictly based on the layout of the image. Do NOT create tables unless an actual table exists in the image.
8. Preserve the original language, information, and order exactly as shown in the image. 8. Preserve the original language, information, and order exactly as shown in the image.
9. Your output language MUST match the language of the content in the image. If the image contains Chinese text, output in Chinese. If English, output in English. Never translate.
{% if page %} {% if page %}
At the end of the transcription, add the page divider: `--- Page {{ page }} ---`. At the end of the transcription, add the page divider: `--- Page {{ page }} ---`.

View File

@@ -16,6 +16,7 @@ from app.core.workflow.engine.runtime_schema import ExecutionContext
from app.core.workflow.engine.state_manager import WorkflowStateManager from app.core.workflow.engine.state_manager import WorkflowStateManager
from app.core.workflow.engine.stream_output_coordinator import StreamOutputCoordinator from app.core.workflow.engine.stream_output_coordinator import StreamOutputCoordinator
from app.core.workflow.engine.variable_pool import VariablePool, VariablePoolInitializer from app.core.workflow.engine.variable_pool import VariablePool, VariablePoolInitializer
from app.core.workflow.nodes.base_node import NodeExecutionError
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -326,10 +327,43 @@ class WorkflowExecutor:
logger.error(f"Workflow execution failed: execution_id={self.execution_context.execution_id}, error={e}", logger.error(f"Workflow execution failed: execution_id={self.execution_context.execution_id}, error={e}",
exc_info=True) exc_info=True)
# 1) 尝试从 checkpoint 回补已成功节点的 node_outputs
recovered: dict[str, Any] = {}
try:
if self.graph is not None:
recovered = self.graph.get_state(
self.execution_context.checkpoint_config
).values or {}
except Exception as recover_err:
logger.warning(
f"Recover state on failure failed: {recover_err}, "
f"execution_id={self.execution_context.execution_id}"
)
if result is None: if result is None:
result = {"error": str(e)} result = dict(recovered) if recovered else {}
else: else:
result["error"] = str(e) # 已有 result 与 recovered 合并node_outputs 深度合并
for k, v in recovered.items():
if k == "node_outputs" and isinstance(v, dict):
existing = result.get("node_outputs") or {}
result["node_outputs"] = {**v, **existing}
else:
result.setdefault(k, v)
# 2) 如果是节点抛出的 NodeExecutionError把失败节点的 node_output 注入 node_outputs
failed_node_id: str | None = None
if isinstance(e, NodeExecutionError):
failed_node_id = e.node_id
node_outputs = result.setdefault("node_outputs", {})
# 不覆盖已有(理论上不会有),保底写入失败节点记录
node_outputs.setdefault(e.node_id, e.node_output)
result["error"] = str(e)
if failed_node_id:
result["error_node"] = failed_node_id
yield { yield {
"event": "workflow_end", "event": "workflow_end",
"data": self.result_builder.build_final_output( "data": self.result_builder.build_final_output(

View File

@@ -1,5 +1,6 @@
import asyncio import asyncio
import logging import logging
import time
import uuid import uuid
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from datetime import datetime from datetime import datetime
@@ -22,6 +23,20 @@ from app.services.multimodal_service import MultimodalService
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class NodeExecutionError(Exception):
"""节点执行失败异常。
携带失败节点的完整 node_output供 executor 兜底注入 node_outputs
保证 workflow_executions.output_data 里能看到失败节点的日志记录。
"""
def __init__(self, node_id: str, node_output: dict[str, Any], error_message: str):
super().__init__(f"Node {node_id} execution failed: {error_message}")
self.node_id = node_id
self.node_output = node_output
self.error_message = error_message
class BaseNode(ABC): class BaseNode(ABC):
"""Base class for workflow nodes. """Base class for workflow nodes.
@@ -396,6 +411,8 @@ class BaseNode(ABC):
"elapsed_time": elapsed_time, "elapsed_time": elapsed_time,
"token_usage": token_usage, "token_usage": token_usage,
"error": None, "error": None,
# 单调递增序号用于日志按执行顺序排序JSONB 不保证 key 顺序)
"execution_order": time.monotonic_ns(),
**self._extract_extra_fields(business_result), **self._extract_extra_fields(business_result),
} }
final_output = { final_output = {
@@ -444,7 +461,9 @@ class BaseNode(ABC):
"output": None, "output": None,
"elapsed_time": elapsed_time, "elapsed_time": elapsed_time,
"token_usage": None, "token_usage": None,
"error": error_message "error": error_message,
# 单调递增序号,用于日志按执行顺序排序
"execution_order": time.monotonic_ns(),
} }
# if error_edge: # if error_edge:
@@ -466,7 +485,12 @@ class BaseNode(ABC):
**node_output **node_output
}) })
logger.error(f"Node {self.node_id} execution failed, stopping workflow: {error_message}") logger.error(f"Node {self.node_id} execution failed, stopping workflow: {error_message}")
raise Exception(f"Node {self.node_id} execution failed: {error_message}") # 抛出自定义异常,把 node_output 带给 executor供其写入 node_outputs
raise NodeExecutionError(
node_id=self.node_id,
node_output=node_output,
error_message=error_message,
)
def _extract_input(self, state: WorkflowState, variable_pool: VariablePool) -> dict[str, Any]: def _extract_input(self, state: WorkflowState, variable_pool: VariablePool) -> dict[str, Any]:
"""Extracts the input data for this node (used for logging or audit). """Extracts the input data for this node (used for logging or audit).

View File

@@ -174,6 +174,9 @@ class IterationRuntime:
continue continue
node_type = result.get("node_outputs", {}).get(node_name, {}).get("node_type") node_type = result.get("node_outputs", {}).get(node_name, {}).get("node_type")
cycle_variable = {"item": item} if node_type == NodeType.CYCLE_START else None cycle_variable = {"item": item} if node_type == NodeType.CYCLE_START else None
node_cfg = next(
(n for n in self.cycle_nodes if n.get("id") == node_name), None
)
self.event_write({ self.event_write({
"type": "cycle_item", "type": "cycle_item",
"data": { "data": {

View File

@@ -255,9 +255,18 @@ class HttpRequestNode(BaseNode):
case HttpContentType.NONE: case HttpContentType.NONE:
return {} return {}
case HttpContentType.JSON: case HttpContentType.JSON:
content["json"] = json.loads(self._render_template( rendered = self._render_template(
self.typed_config.body.data, variable_pool self.typed_config.body.data, variable_pool
)) )
if not rendered or not rendered.strip():
# 第三方导入的工作流可能出现 content_type=json 但 data 为空的情况,视为无 body
return {}
try:
content["json"] = json.loads(rendered)
except json.JSONDecodeError as e:
raise RuntimeError(
f"Invalid JSON body for HTTP request node: {e.msg} (data={rendered!r})"
)
case HttpContentType.FROM_DATA: case HttpContentType.FROM_DATA:
data = {} data = {}
files = [] files = []

View File

@@ -334,7 +334,8 @@ class KnowledgeRetrievalNode(BaseNode):
for kb_config in knowledge_bases: for kb_config in knowledge_bases:
db_knowledge = knowledge_repository.get_knowledge_by_id(db=db, knowledge_id=kb_config.kb_id) db_knowledge = knowledge_repository.get_knowledge_by_id(db=db, knowledge_id=kb_config.kb_id)
if not (db_knowledge and db_knowledge.chunk_num > 0 and db_knowledge.status == 1): if not (db_knowledge and db_knowledge.chunk_num > 0 and db_knowledge.status == 1):
raise RuntimeError("The knowledge base does not exist or access is denied.") logger.warning("The knowledge base does not exist or access is denied.")
continue
tasks.append(self.knowledge_retrieval(db, query, db_knowledge, kb_config)) tasks.append(self.knowledge_retrieval(db, query, db_knowledge, kb_config))
if tasks: if tasks:
result = await asyncio.gather(*tasks) result = await asyncio.gather(*tasks)

View File

@@ -15,4 +15,5 @@ class File(Base):
file_ext = Column(String, index=True, nullable=False, comment="file extension:folder|pdf") file_ext = Column(String, index=True, nullable=False, comment="file extension:folder|pdf")
file_size = Column(Integer, default=0, comment="file size(byte)") file_size = Column(Integer, default=0, comment="file size(byte)")
file_url = Column(String, index=True, nullable=True, comment="file comes from a website url") file_url = Column(String, index=True, nullable=True, comment="file comes from a website url")
file_key = Column(String(512), nullable=True, index=True, comment="storage file key for FileStorageService")
created_at = Column(DateTime, default=datetime.datetime.now) created_at = Column(DateTime, default=datetime.datetime.now)

View File

@@ -1,13 +1,15 @@
import uuid import uuid
from typing import Optional from typing import Optional
from sqlalchemy import select, desc, func from sqlalchemy import select, desc, func, or_, cast, Text
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from app.core.exceptions import ResourceNotFoundException from app.core.exceptions import ResourceNotFoundException
from app.core.logging_config import get_db_logger from app.core.logging_config import get_db_logger
from app.models import Conversation, Message from app.models import Conversation, Message
from app.models.app_model import AppType
from app.models.conversation_model import ConversationDetail from app.models.conversation_model import ConversationDetail
from app.models.workflow_model import WorkflowExecution
logger = get_db_logger() logger = get_db_logger()
@@ -206,7 +208,8 @@ class ConversationRepository:
is_draft: Optional[bool] = None, is_draft: Optional[bool] = None,
keyword: Optional[str] = None, keyword: Optional[str] = None,
page: int = 1, page: int = 1,
pagesize: int = 20 pagesize: int = 20,
app_type: Optional[str] = None,
) -> tuple[list[Conversation], int]: ) -> tuple[list[Conversation], int]:
""" """
查询应用日志会话列表(带分页和过滤) 查询应用日志会话列表(带分页和过滤)
@@ -218,6 +221,9 @@ class ConversationRepository:
keyword: 搜索关键词(匹配消息内容) keyword: 搜索关键词(匹配消息内容)
page: 页码(从 1 开始) page: 页码(从 1 开始)
pagesize: 每页数量 pagesize: 每页数量
app_type: 应用类型。WORKFLOW 类型改用 workflow_executions 的
input_data/output_data 做关键词过滤(因为失败的工作流不会写入 messages 表);
其他类型仍走 messages 表。
Returns: Returns:
Tuple[List[Conversation], int]: (会话列表,总数) Tuple[List[Conversation], int]: (会话列表,总数)
@@ -234,12 +240,28 @@ class ConversationRepository:
# 如果有关键词搜索,通过子查询过滤包含该关键词的 conversation # 如果有关键词搜索,通过子查询过滤包含该关键词的 conversation
if keyword: if keyword:
# 查找包含关键词的 conversation_id 列表 kw_pattern = f"%{keyword}%"
keyword_stmt = ( if app_type == AppType.WORKFLOW:
select(Message.conversation_id) # 工作流:从 workflow_executions 的 input_data / output_data 匹配
.where(Message.content.ilike(f"%{keyword}%")) # messages 表只存开场白 assistant 消息,失败的工作流也不会写入)
.distinct() keyword_stmt = (
) select(WorkflowExecution.conversation_id)
.where(
WorkflowExecution.conversation_id.is_not(None),
or_(
cast(WorkflowExecution.input_data, Text).ilike(kw_pattern),
cast(WorkflowExecution.output_data, Text).ilike(kw_pattern),
),
)
.distinct()
)
else:
# Agent 等其他类型:仍走 messages 表user + assistant 内容)
keyword_stmt = (
select(Message.conversation_id)
.where(Message.content.ilike(kw_pattern))
.distinct()
)
base_stmt = base_stmt.where(Conversation.id.in_(keyword_stmt)) base_stmt = base_stmt.where(Conversation.id.in_(keyword_stmt))
# Calculate total number of records # Calculate total number of records

View File

@@ -3,7 +3,7 @@ import uuid
from typing import Optional, Any, List, Dict, Union from typing import Optional, Any, List, Dict, Union
from enum import Enum, StrEnum from enum import Enum, StrEnum
from pydantic import BaseModel, Field, ConfigDict, field_serializer, field_validator from pydantic import BaseModel, Field, ConfigDict, field_serializer, field_validator, model_serializer
from app.schemas.workflow_schema import WorkflowConfigCreate from app.schemas.workflow_schema import WorkflowConfigCreate
@@ -661,9 +661,11 @@ class DraftRunResponse(BaseModel):
suggested_questions: List[str] = Field(default_factory=list, description="下一步建议问题") suggested_questions: List[str] = Field(default_factory=list, description="下一步建议问题")
citations: List[Dict[str, Any]] = Field(default_factory=list, description="引用来源") citations: List[Dict[str, Any]] = Field(default_factory=list, description="引用来源")
audio_url: Optional[str] = Field(default=None, description="TTS 语音URL") audio_url: Optional[str] = Field(default=None, description="TTS 语音URL")
audio_status: Optional[str] = Field(default=None, description="TTS 语音状态")
def model_dump(self, **kwargs): @model_serializer(mode="wrap")
data = super().model_dump(**kwargs) def _serialize(self, handler):
data = handler(self)
if not data.get("reasoning_content"): if not data.get("reasoning_content"):
data.pop("reasoning_content", None) data.pop("reasoning_content", None)
return data return data

View File

@@ -2,7 +2,7 @@
import uuid import uuid
import datetime import datetime
from typing import Optional, Dict, Any, List from typing import Optional, Dict, Any, List
from pydantic import BaseModel, Field, ConfigDict, field_serializer from pydantic import BaseModel, Field, ConfigDict, field_serializer, model_serializer
# 导入 FileInput用于体验运行 # 导入 FileInput用于体验运行
from app.schemas.app_schema import FileInput from app.schemas.app_schema import FileInput
@@ -94,6 +94,18 @@ class ChatResponse(BaseModel):
message_id: str message_id: str
usage: Optional[Dict[str, Any]] = None usage: Optional[Dict[str, Any]] = None
elapsed_time: Optional[float] = None elapsed_time: Optional[float] = None
reasoning_content: Optional[str] = None
suggested_questions: Optional[List[str]] = None
citations: Optional[List[Dict[str, Any]]] = None
audio_url: Optional[str] = None
audio_status: Optional[str] = None
@model_serializer(mode="wrap")
def _serialize(self, handler):
data = handler(self)
if not data.get("reasoning_content"):
data.pop("reasoning_content", None)
return data
# ---------- Conversation Summary Schemas ---------- # ---------- Conversation Summary Schemas ----------

View File

@@ -11,6 +11,7 @@ class FileBase(BaseModel):
file_ext: str file_ext: str
file_size: int file_size: int
file_url: str | None = None file_url: str | None = None
file_key: str | None = None
created_at: datetime.datetime | None = None created_at: datetime.datetime | None = None

View File

@@ -9,7 +9,7 @@ from sqlalchemy.orm import Session
from sqlalchemy import select from sqlalchemy import select
from app.aioRedis import aio_redis from app.aioRedis import aio_redis
from app.models.api_key_model import ApiKey from app.models.api_key_model import ApiKey, ApiKeyType
from app.repositories.api_key_repository import ApiKeyRepository, ApiKeyLogRepository from app.repositories.api_key_repository import ApiKeyRepository, ApiKeyLogRepository
from app.schemas import api_key_schema from app.schemas import api_key_schema
from app.schemas.response_schema import PageData, PageMeta from app.schemas.response_schema import PageData, PageMeta
@@ -65,7 +65,8 @@ class ApiKeyService:
BizCode.BAD_REQUEST BizCode.BAD_REQUEST
) )
if data.resource_id: # SERVICE 类型的 resource_id 指向 workspace非应用跳过应用发布校验
if data.resource_id and data.type != ApiKeyType.SERVICE.value:
app = db.get(App, data.resource_id) app = db.get(App, data.resource_id)
if not app or not app.current_release_id: if not app or not app.current_release_id:
raise BusinessException("该应用未发布", BizCode.APP_NOT_PUBLISHED) raise BusinessException("该应用未发布", BizCode.APP_NOT_PUBLISHED)
@@ -452,9 +453,12 @@ class ApiKeyAuthService:
def check_app_published(db: Session, api_key_obj: ApiKey) -> None: def check_app_published(db: Session, api_key_obj: ApiKey) -> None:
""" """
检查应用是否已发布,未发布则抛出异常 检查应用是否已发布,未发布则抛出异常
SERVICE 类型的 api_key 不绑定应用resource_id 指向 workspace跳过校验
""" """
if not api_key_obj.resource_id: if not api_key_obj.resource_id:
return return
if api_key_obj.type == ApiKeyType.SERVICE.value:
return
app = db.get(App, api_key_obj.resource_id) app = db.get(App, api_key_obj.resource_id)
if not app or not app.current_release_id: if not app or not app.current_release_id:
raise BusinessException("应用未发布,不可用", BizCode.APP_NOT_PUBLISHED) raise BusinessException("应用未发布,不可用", BizCode.APP_NOT_PUBLISHED)

View File

@@ -317,7 +317,7 @@ class AppChatService:
"suggested_questions": suggested_questions, "suggested_questions": suggested_questions,
"citations": filtered_citations, "citations": filtered_citations,
"audio_url": audio_url, "audio_url": audio_url,
"audio_status": "pending" "audio_status": "pending" if audio_url else None
} }
async def agnet_chat_stream( async def agnet_chat_stream(

View File

@@ -32,6 +32,7 @@ class AppLogService:
pagesize: int = 20, pagesize: int = 20,
is_draft: Optional[bool] = None, is_draft: Optional[bool] = None,
keyword: Optional[str] = None, keyword: Optional[str] = None,
app_type: Optional[str] = None,
) -> Tuple[list[Conversation], int]: ) -> Tuple[list[Conversation], int]:
""" """
查询应用日志会话列表 查询应用日志会话列表
@@ -43,6 +44,7 @@ class AppLogService:
pagesize: 每页数量 pagesize: 每页数量
is_draft: 是否草稿会话None表示返回全部 is_draft: 是否草稿会话None表示返回全部
keyword: 搜索关键词(匹配消息内容) keyword: 搜索关键词(匹配消息内容)
app_type: 应用类型WORKFLOW 时关键词将从 workflow_executions 搜索)
Returns: Returns:
Tuple[list[Conversation], int]: (会话列表,总数) Tuple[list[Conversation], int]: (会话列表,总数)
@@ -55,7 +57,8 @@ class AppLogService:
"page": page, "page": page,
"pagesize": pagesize, "pagesize": pagesize,
"is_draft": is_draft, "is_draft": is_draft,
"keyword": keyword "keyword": keyword,
"app_type": app_type,
} }
) )
@@ -66,7 +69,8 @@ class AppLogService:
is_draft=is_draft, is_draft=is_draft,
keyword=keyword, keyword=keyword,
page=page, page=page,
pagesize=pagesize pagesize=pagesize,
app_type=app_type,
) )
logger.info( logger.info(
@@ -368,8 +372,16 @@ def _build_nodes_from_output_data(output_data: Optional[dict]) -> list[AppLogNod
if not output_data: if not output_data:
return [] return []
node_outputs: dict = output_data.get("node_outputs") or {} node_outputs: dict = output_data.get("node_outputs") or {}
# 按 execution_order节点执行时写入的单调递增序号排序。
# PostgreSQL JSONB 不保证 key 顺序,不能依赖 dict 插入顺序;
# 缺失 execution_order 的历史数据退化到 0保持在最前。
ordered_items = sorted(
node_outputs.items(),
key=lambda kv: (kv[1] or {}).get("execution_order", 0)
if isinstance(kv[1], dict) else 0
)
result = [] result = []
for node_id, node_data in node_outputs.items(): for node_id, node_data in ordered_items:
if not isinstance(node_data, dict): if not isinstance(node_data, dict):
continue continue
output = dict(node_data) output = dict(node_data)
@@ -382,6 +394,8 @@ def _build_nodes_from_output_data(output_data: Optional[dict]) -> list[AppLogNod
inp = output.pop("input", None) inp = output.pop("input", None)
elapsed_time = output.pop("elapsed_time", None) elapsed_time = output.pop("elapsed_time", None)
token_usage = output.pop("token_usage", None) token_usage = output.pop("token_usage", None)
# execution_order 仅用于排序,不返回给前端
output.pop("execution_order", None)
result.append(AppLogNodeExecution( result.append(AppLogNodeExecution(
node_id=node_id, node_id=node_id,
node_type=node_type, node_type=node_type,

View File

@@ -754,7 +754,7 @@ class AgentRunService:
) if not sub_agent else [], ) if not sub_agent else [],
"citations": filtered_citations, "citations": filtered_citations,
"audio_url": audio_url, "audio_url": audio_url,
"audio_status": "pending" "audio_status": "pending" if audio_url else None
} }
logger.info( logger.info(

View File

@@ -34,26 +34,7 @@ def generate_file_key(
Generate a unique file key for storage. Generate a unique file key for storage.
The file key follows the format: {tenant_id}/{workspace_id}/{file_id}{file_ext} The file key follows the format: {tenant_id}/{workspace_id}/{file_id}{file_ext}
Args:
tenant_id: The tenant UUID.
workspace_id: The workspace UUID.
file_id: The file UUID.
file_ext: The file extension (e.g., '.pdf', '.txt').
Returns:
A unique file key string.
Example:
>>> generate_file_key(
... uuid.UUID('550e8400-e29b-41d4-a716-446655440000'),
... uuid.UUID('660e8400-e29b-41d4-a716-446655440001'),
... uuid.UUID('770e8400-e29b-41d4-a716-446655440002'),
... '.pdf'
... )
'550e8400-e29b-41d4-a716-446655440000/660e8400-e29b-41d4-a716-446655440001/770e8400-e29b-41d4-a716-446655440002.pdf'
""" """
# Ensure file_ext starts with a dot
if file_ext and not file_ext.startswith('.'): if file_ext and not file_ext.startswith('.'):
file_ext = f'.{file_ext}' file_ext = f'.{file_ext}'
if workspace_id: if workspace_id:
@@ -61,6 +42,21 @@ def generate_file_key(
return f"{tenant_id}/{file_id}{file_ext}" return f"{tenant_id}/{file_id}{file_ext}"
def generate_kb_file_key(
kb_id: uuid.UUID,
file_id: uuid.UUID,
file_ext: str,
) -> str:
"""
Generate a file key for knowledge base files.
Format: kb/{kb_id}/{file_id}{file_ext}
"""
if file_ext and not file_ext.startswith('.'):
file_ext = f'.{file_ext}'
return f"kb/{kb_id}/{file_id}{file_ext}"
class FileStorageService: class FileStorageService:
""" """
High-level service for file storage operations. High-level service for file storage operations.

View File

@@ -95,7 +95,7 @@ class DashScopeFormatStrategy(MultimodalFormatStrategy):
"""通义千问文档格式""" """通义千问文档格式"""
return True, { return True, {
"type": "text", "type": "text",
"text": f"<document name=\"{file_name}\">\n{text}\n</document>" "text": f"<document name=\"{file_name}\">\n文档内容:\n{text}\n</document>"
} }
async def format_audio( async def format_audio(
@@ -167,6 +167,7 @@ class BedrockFormatStrategy(MultimodalFormatStrategy):
async def format_document(self, file_name: str, text: str) -> tuple[bool, Dict[str, Any]]: async def format_document(self, file_name: str, text: str) -> tuple[bool, Dict[str, Any]]:
"""Bedrock/Anthropic 文档格式(需要 base64 编码)""" """Bedrock/Anthropic 文档格式(需要 base64 编码)"""
# Bedrock 文档需要 base64 编码 # Bedrock 文档需要 base64 编码
text = f"文档内容:\n{text}\n"
text_bytes = text.encode('utf-8') text_bytes = text.encode('utf-8')
base64_text = base64.b64encode(text_bytes).decode('utf-8') base64_text = base64.b64encode(text_bytes).decode('utf-8')
@@ -223,7 +224,7 @@ class OpenAIFormatStrategy(MultimodalFormatStrategy):
"""OpenAI 文档格式""" """OpenAI 文档格式"""
return True, { return True, {
"type": "text", "type": "text",
"text": f"<document name=\"{file_name}\">\n{text}\n</document>" "text": f"<document name=\"{file_name}\">\n文档内容:\n{text}\n</document>"
} }
async def format_audio( async def format_audio(
@@ -395,7 +396,7 @@ class MultimodalService:
ext = img_info.get("ext", "png") ext = img_info.get("ext", "png")
try: try:
_, img_url = await self._save_doc_image_to_storage(img_info["bytes"], ext, tenant_id, workspace_id) _, img_url = await self._save_doc_image_to_storage(img_info["bytes"], ext, tenant_id, workspace_id)
placeholder = f"{page}页 第{index + 1}图片" if page > 0 else f"{index + 1}图片" placeholder = f"{page}页 第{index + 1}" if page > 0 else f"{index + 1}"
# 在文本内容中追加图片位置标记 # 在文本内容中追加图片位置标记
if result and result[-1].get("type") in ("text", "document"): if result and result[-1].get("type") in ("text", "document"):
key = "text" if "text" in result[-1] else list(result[-1].keys())[-1] key = "text" if "text" in result[-1] else list(result[-1].keys())[-1]

View File

@@ -210,9 +210,14 @@ def _build_vision_model(file_path: str, db_knowledge):
@celery_app.task(name="app.core.rag.tasks.parse_document") @celery_app.task(name="app.core.rag.tasks.parse_document")
def parse_document(file_path: str, document_id: uuid.UUID): def parse_document(file_key: str, document_id: uuid.UUID, file_name: str = ""):
""" """
Document parsing, vectorization, and storage Document parsing, vectorization, and storage.
Args:
file_key: Storage key for FileStorageService (e.g. "kb/{kb_id}/{file_id}.docx")
document_id: Document UUID
file_name: Original file name (used for extension detection in chunk())
""" """
db_document = None db_document = None
@@ -223,7 +228,6 @@ def parse_document(file_path: str, document_id: uuid.UUID):
with get_db_context() as db: with get_db_context() as db:
try: try:
# Celery JSON 序列化会将 UUID 转为字符串,需要确保类型正确
if not isinstance(document_id, uuid.UUID): if not isinstance(document_id, uuid.UUID):
document_id = uuid.UUID(str(document_id)) document_id = uuid.UUID(str(document_id))
@@ -234,7 +238,11 @@ def parse_document(file_path: str, document_id: uuid.UUID):
if db_knowledge is None: if db_knowledge is None:
raise ValueError(f"Knowledge {db_document.kb_id} not found") raise ValueError(f"Knowledge {db_document.kb_id} not found")
# 1. Document parsing & segmentation # Use file_name from argument or fall back to document record
if not file_name:
file_name = db_document.file_name
# 1. Download file from storage backend
progress_lines.append(f"{datetime.now().strftime('%H:%M:%S')} Start to parse.") progress_lines.append(f"{datetime.now().strftime('%H:%M:%S')} Start to parse.")
start_time = time.time() start_time = time.time()
db_document.progress = 0.0 db_document.progress = 0.0
@@ -245,45 +253,36 @@ def parse_document(file_path: str, document_id: uuid.UUID):
db.commit() db.commit()
db.refresh(db_document) db.refresh(db_document)
# Read file content from storage backend (no NFS dependency)
from app.services.file_storage_service import FileStorageService
import asyncio
storage_service = FileStorageService()
async def _download():
return await storage_service.download_file(file_key)
try:
file_binary = asyncio.run(_download())
except RuntimeError:
# If there's already a running loop (e.g. in some worker configurations)
loop = asyncio.new_event_loop()
try:
file_binary = loop.run_until_complete(_download())
finally:
loop.close()
if not file_binary:
raise IOError(f"Downloaded empty file from storage: {file_key}")
logger.info(f"[ParseDoc] Downloaded {len(file_binary)} bytes from storage key: {file_key}")
def progress_callback(prog=None, msg=None): def progress_callback(prog=None, msg=None):
progress_lines.append(f"{datetime.now().strftime('%H:%M:%S')} parse progress: {prog} msg: {msg}.") progress_lines.append(f"{datetime.now().strftime('%H:%M:%S')} parse progress: {prog} msg: {msg}.")
# Prepare vision_model for parsing # Prepare vision_model for parsing
vision_model = _build_vision_model(file_path, db_knowledge) vision_model = _build_vision_model(file_name, db_knowledge)
# 先将文件读入内存,避免解析过程中依赖 NFS 文件持续可访问
# python-docx 等库在 binary=None 时会用路径直接打开文件,
# 在 NFS/共享存储上可能因缓存失效导致 "Package not found"
max_wait_seconds = 30
wait_interval = 2
waited = 0
file_binary = None
while waited <= max_wait_seconds:
# os.listdir 强制 NFS 客户端刷新目录缓存
parent_dir = os.path.dirname(file_path)
try:
os.listdir(parent_dir)
except OSError:
pass
try:
with open(file_path, "rb") as f:
file_binary = f.read()
if not file_binary:
# NFS 上文件存在但内容为空(可能还在同步中)
raise IOError(f"File is empty (0 bytes), NFS may still be syncing: {file_path}")
break
except (FileNotFoundError, IOError) as e:
if waited >= max_wait_seconds:
raise type(e)(
f"File not accessible at '{file_path}' after waiting {max_wait_seconds}s: {e}"
)
logger.warning(f"File not ready on this node, retrying in {wait_interval}s: {file_path} ({e})")
time.sleep(wait_interval)
waited += wait_interval
from app.core.rag.app.naive import chunk from app.core.rag.app.naive import chunk
logger.info(f"[ParseDoc] file_binary size={len(file_binary)} bytes, type={type(file_binary).__name__}, bool={bool(file_binary)}") logger.info(f"[ParseDoc] file_binary size={len(file_binary)} bytes, type={type(file_binary).__name__}, bool={bool(file_binary)}")
res = chunk(filename=file_path, res = chunk(filename=file_name,
binary=file_binary, binary=file_binary,
from_page=0, from_page=0,
to_page=DEFAULT_PARSE_TO_PAGE, to_page=DEFAULT_PARSE_TO_PAGE,