Merge branch 'feature/20251219_lxc' into develop

This commit is contained in:
lixiangcheng1
2026-01-05 12:19:03 +08:00
11 changed files with 902 additions and 68 deletions

View File

@@ -4,14 +4,17 @@
认证方式: API Key
"""
from fastapi import APIRouter
from . import app_api_controller, rag_api_controller, memory_api_controller
from . import app_api_controller, rag_api_knowledge_controller, rag_api_document_controller, rag_api_file_controller, rag_api_chunk_controller, memory_api_controller
# 创建 V1 API 路由器
service_router = APIRouter()
# 注册子路由
service_router.include_router(app_api_controller.router)
service_router.include_router(rag_api_controller.router)
service_router.include_router(rag_api_knowledge_controller.router)
service_router.include_router(rag_api_document_controller.router)
service_router.include_router(rag_api_file_controller.router)
service_router.include_router(rag_api_chunk_controller.router)
service_router.include_router(memory_api_controller.router)
__all__ = ["service_router"]

View File

@@ -0,0 +1,221 @@
"""RAG 服务接口 - 基于 API Key 认证"""
from typing import Any, Optional, Union
import uuid
from fastapi import APIRouter, Body, Depends, Request, status, Query
from sqlalchemy.orm import Session
from app.controllers import chunk_controller
from app.core.api_key_auth import require_api_key
from app.core.logging_config import get_business_logger
from app.core.rag.models.chunk import QAChunk
from app.core.response_utils import success
from app.db import get_db
from app.schemas import chunk_schema
from app.schemas.api_key_schema import ApiKeyAuth
from app.schemas.response_schema import ApiResponse
from app.services import api_key_service
router = APIRouter(prefix="/chunks", tags=["V1 - RAG API"])
api_logger = get_business_logger()
@router.get("/{kb_id}/{document_id}/previewchunks", response_model=ApiResponse)
@require_api_key(scopes=["rag"])
async def get_preview_chunks(
kb_id: uuid.UUID,
document_id: uuid.UUID,
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
page: int = Query(1, gt=0), # Default: 1, which must be greater than 0
pagesize: int = Query(20, gt=0, le=100), # Default: 20 items per page, maximum: 100 items
keywords: Optional[str] = Query(None, description="The keywords used to match chunk content")
):
"""
Paged query document block preview list
- Support filtering by document_id
- Support keyword search for segmented content
- Return paging metadata + file list
"""
# 0. Obtain the creator of the api key
api_key = api_key_service.ApiKeyService.get_api_key(db, api_key_auth.api_key_id, api_key_auth.workspace_id)
current_user = api_key.creator
current_user.current_workspace_id = api_key_auth.workspace_id
return await chunk_controller.get_preview_chunks(kb_id=kb_id,
document_id=document_id,
page=page,
pagesize=pagesize,
keywords=keywords,
db=db,
current_user=current_user)
@router.get("/{kb_id}/{document_id}/chunks", response_model=ApiResponse)
@require_api_key(scopes=["rag"])
async def get_chunks(
kb_id: uuid.UUID,
document_id: uuid.UUID,
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
page: int = Query(1, gt=0), # Default: 1, which must be greater than 0
pagesize: int = Query(20, gt=0, le=100), # Default: 20 items per page, maximum: 100 items
keywords: Optional[str] = Query(None, description="The keywords used to match chunk content")
):
"""
Paged query document chunk list
- Support filtering by document_id
- Support keyword search for segmented content
- Return paging metadata + file list
"""
# 0. Obtain the creator of the api key
api_key = api_key_service.ApiKeyService.get_api_key(db, api_key_auth.api_key_id, api_key_auth.workspace_id)
current_user = api_key.creator
current_user.current_workspace_id = api_key_auth.workspace_id
return await chunk_controller.get_chunks(kb_id=kb_id,
document_id=document_id,
page=page,
pagesize=pagesize,
keywords=keywords,
db=db,
current_user=current_user)
@router.post("/{kb_id}/{document_id}/chunk", response_model=ApiResponse)
@require_api_key(scopes=["rag"])
async def create_chunk(
kb_id: uuid.UUID,
document_id: uuid.UUID,
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
content: Union[str, QAChunk] = Body(..., description="Content can be either a string or a QAChunk object"),
):
"""
create chunk
"""
body = await request.json()
create_data = chunk_schema.ChunkCreate(**body)
# 0. Obtain the creator of the api key
api_key = api_key_service.ApiKeyService.get_api_key(db, api_key_auth.api_key_id, api_key_auth.workspace_id)
current_user = api_key.creator
current_user.current_workspace_id = api_key_auth.workspace_id
return await chunk_controller.create_chunk(kb_id=kb_id,
document_id=document_id,
create_data=create_data,
db=db,
current_user=current_user)
@router.get("/{kb_id}/{document_id}/{doc_id}", response_model=ApiResponse)
@require_api_key(scopes=["rag"])
async def get_chunk(
kb_id: uuid.UUID,
document_id: uuid.UUID,
doc_id: str,
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
):
"""
Retrieve document chunk information based on doc_id
"""
# 0. Obtain the creator of the api key
api_key = api_key_service.ApiKeyService.get_api_key(db, api_key_auth.api_key_id, api_key_auth.workspace_id)
current_user = api_key.creator
current_user.current_workspace_id = api_key_auth.workspace_id
return await chunk_controller.get_chunk(kb_id=kb_id,
document_id=document_id,
doc_id=doc_id,
db=db,
current_user=current_user)
@router.put("/{kb_id}/{document_id}/{doc_id}", response_model=ApiResponse)
@require_api_key(scopes=["rag"])
async def update_chunk(
kb_id: uuid.UUID,
document_id: uuid.UUID,
doc_id: str,
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
content: Union[str, QAChunk] = Body(..., description="Content can be either a string or a QAChunk object"),
):
"""
Update document chunk content
"""
body = await request.json()
update_data = chunk_schema.ChunkUpdate(**body)
# 0. Obtain the creator of the api key
api_key = api_key_service.ApiKeyService.get_api_key(db, api_key_auth.api_key_id, api_key_auth.workspace_id)
current_user = api_key.creator
current_user.current_workspace_id = api_key_auth.workspace_id
return await chunk_controller.update_chunk(kb_id=kb_id,
document_id=document_id,
doc_id=doc_id,
update_data=update_data,
db=db,
current_user=current_user)
@router.delete("/{kb_id}/{document_id}/{doc_id}", response_model=ApiResponse)
@require_api_key(scopes=["rag"])
async def delete_chunk(
kb_id: uuid.UUID,
document_id: uuid.UUID,
doc_id: str,
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
):
"""
delete document chunk
"""
# 0. Obtain the creator of the api key
api_key = api_key_service.ApiKeyService.get_api_key(db, api_key_auth.api_key_id, api_key_auth.workspace_id)
current_user = api_key.creator
current_user.current_workspace_id = api_key_auth.workspace_id
return await chunk_controller.delete_chunk(kb_id=kb_id,
document_id=document_id,
doc_id=doc_id,
db=db,
current_user=current_user)
@router.get("/retrieve_type", response_model=ApiResponse)
def get_retrieve_types():
return success(msg="Successfully obtained the retrieval type", data=list(chunk_schema.RetrieveType))
@router.post("/retrieval", response_model=Any, status_code=status.HTTP_200_OK)
@require_api_key(scopes=["rag"])
async def retrieve_chunks(
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
query: str = Body(..., description="question"),
):
"""
retrieve chunk
"""
body = await request.json()
retrieve_data = chunk_schema.ChunkRetrieve(**body)
# 0. Obtain the creator of the api key
api_key = api_key_service.ApiKeyService.get_api_key(db, api_key_auth.api_key_id, api_key_auth.workspace_id)
current_user = api_key.creator
current_user.current_workspace_id = api_key_auth.workspace_id
return await chunk_controller.retrieve_chunks(retrieve_data=retrieve_data,
db=db,
current_user=current_user)

View File

@@ -1,16 +0,0 @@
"""RAG 服务接口 - 基于 API Key 认证"""
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from app.db import get_db
from app.core.response_utils import success
from app.core.logging_config import get_business_logger
router = APIRouter(prefix="/knowledge", tags=["V1 - RAG API"])
logger = get_business_logger()
@router.get("")
async def list_knowledge():
"""列出可访问的知识库(占位)"""
return success(data=[], msg="RAG API - Coming Soon")

View File

@@ -0,0 +1,172 @@
"""RAG 服务接口 - 基于 API Key 认证"""
from typing import Optional
import uuid
from fastapi import APIRouter, Body, Depends, Request, Query
from sqlalchemy.orm import Session
from app.controllers import document_controller
from app.core.api_key_auth import require_api_key
from app.core.logging_config import get_business_logger
from app.db import get_db
from app.schemas import document_schema
from app.schemas.api_key_schema import ApiKeyAuth
from app.schemas.response_schema import ApiResponse
from app.services import api_key_service
router = APIRouter(prefix="/documents", tags=["V1 - RAG API"])
api_logger = get_business_logger()
@router.get("/{kb_id}/documents", response_model=ApiResponse)
@require_api_key(scopes=["rag"])
async def get_documents(
kb_id: uuid.UUID,
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
parent_id: Optional[uuid.UUID] = Query(None, description="parent folder id when type is Folder"),
page: int = Query(1, gt=0), # Default: 1, which must be greater than 0
pagesize: int = Query(20, gt=0, le=100), # Default: 20 items per page, maximum: 100 items
orderby: Optional[str] = Query(None, description="Sort fields, such as: created_at,updated_at"),
desc: Optional[bool] = Query(False, description="Is it descending order"),
keywords: Optional[str] = Query(None, description="Search keywords (file name)"),
document_ids: Optional[str] = Query(None, description="document ids, separated by commas")
):
"""
Paged query document list
- Support filtering by kb_id and parent_id
- Support keyword search for file names
- Support dynamic sorting
- Return paging metadata + file list
"""
# 0. Obtain the creator of the api key
api_key = api_key_service.ApiKeyService.get_api_key(db, api_key_auth.api_key_id, api_key_auth.workspace_id)
current_user = api_key.creator
current_user.current_workspace_id = api_key_auth.workspace_id
return await document_controller.get_documents(kb_id=kb_id,
parent_id=parent_id,
page=page,
pagesize=pagesize,
orderby=orderby,
desc=desc,
keywords=keywords,
document_ids=document_ids,
db=db,
current_user=current_user)
@router.post("/document", response_model=ApiResponse)
@require_api_key(scopes=["rag"])
async def create_document(
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
kb_id: uuid.UUID = Body(..., description="kb id"),
file_name: str = Body(..., description="file name"),
):
"""
create document
"""
body = await request.json()
create_data = document_schema.DocumentCreate(**body)
# 0. Obtain the creator of the api key
api_key = api_key_service.ApiKeyService.get_api_key(db, api_key_auth.api_key_id, api_key_auth.workspace_id)
current_user = api_key.creator
current_user.current_workspace_id = api_key_auth.workspace_id
return await document_controller.create_document(create_data=create_data,
db=db,
current_user=current_user)
@router.get("/{document_id}", response_model=ApiResponse)
@require_api_key(scopes=["rag"])
async def get_document(
document_id: uuid.UUID,
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
):
"""
Retrieve document information based on document_id
"""
# 0. Obtain the creator of the api key
api_key = api_key_service.ApiKeyService.get_api_key(db, api_key_auth.api_key_id, api_key_auth.workspace_id)
current_user = api_key.creator
current_user.current_workspace_id = api_key_auth.workspace_id
return await document_controller.get_document(document_id=document_id,
db=db,
current_user=current_user)
@router.put("/{document_id}", response_model=ApiResponse)
@require_api_key(scopes=["rag"])
async def update_document(
document_id: uuid.UUID,
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
file_name: str = Body(None, description="file name (optional)"),
):
"""
Update document information
"""
body = await request.json()
update_data = document_schema.DocumentUpdate(**body)
# 0. Obtain the creator of the api key
api_key = api_key_service.ApiKeyService.get_api_key(db, api_key_auth.api_key_id, api_key_auth.workspace_id)
current_user = api_key.creator
current_user.current_workspace_id = api_key_auth.workspace_id
return await document_controller.update_document(document_id=document_id,
update_data=update_data,
db=db,
current_user=current_user)
@router.delete("/{document_id}", response_model=ApiResponse)
@require_api_key(scopes=["rag"])
async def delete_document(
document_id: uuid.UUID,
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
):
"""
Delete document
"""
# 0. Obtain the creator of the api key
api_key = api_key_service.ApiKeyService.get_api_key(db, api_key_auth.api_key_id, api_key_auth.workspace_id)
current_user = api_key.creator
current_user.current_workspace_id = api_key_auth.workspace_id
return await document_controller.delete_document(document_id=document_id,
db=db,
current_user=current_user)
@router.post("/{document_id}/chunks", response_model=ApiResponse)
@require_api_key(scopes=["rag"])
async def parse_documents(
document_id: uuid.UUID,
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
):
"""
parse document
"""
# 0. Obtain the creator of the api key
api_key = api_key_service.ApiKeyService.get_api_key(db, api_key_auth.api_key_id, api_key_auth.workspace_id)
current_user = api_key.creator
current_user.current_workspace_id = api_key_auth.workspace_id
return await document_controller.parse_documents(document_id=document_id,
db=db,
current_user=current_user)

View File

@@ -0,0 +1,198 @@
"""RAG 服务接口 - 基于 API Key 认证"""
from typing import Any, Optional
import uuid
from fastapi import APIRouter, Body, Depends, Request, Query, File, UploadFile
from sqlalchemy.orm import Session
from app.controllers import file_controller
from app.core.api_key_auth import require_api_key
from app.core.logging_config import get_business_logger
from app.db import get_db
from app.schemas import file_schema
from app.schemas.api_key_schema import ApiKeyAuth
from app.schemas.response_schema import ApiResponse
from app.services import api_key_service
router = APIRouter(prefix="/files", tags=["V1 - RAG API"])
api_logger = get_business_logger()
@router.get("/{kb_id}/{parent_id}/files", response_model=ApiResponse)
@require_api_key(scopes=["rag"])
async def get_files(
kb_id: uuid.UUID,
parent_id: uuid.UUID,
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
page: int = Query(1, gt=0), # Default: 1, which must be greater than 0
pagesize: int = Query(20, gt=0, le=100), # Default: 20 items per page, maximum: 100 items
orderby: Optional[str] = Query(None, description="Sort fields, such as: created_at"),
desc: Optional[bool] = Query(False, description="Is it descending order"),
keywords: Optional[str] = Query(None, description="Search keywords (file name)"),
):
"""
Paged query file list
- Support filtering by kb_id and parent_id
- Support keyword search for file names
- Support dynamic sorting
- Return paging metadata + file list
"""
# 0. Obtain the creator of the api key
api_key = api_key_service.ApiKeyService.get_api_key(db, api_key_auth.api_key_id, api_key_auth.workspace_id)
current_user = api_key.creator
current_user.current_workspace_id=api_key_auth.workspace_id
return await file_controller.get_files(kb_id=kb_id,
parent_id=parent_id,
page=page,
pagesize=pagesize,
orderby=orderby,
desc=desc,
keywords=keywords,
db=db,
current_user=current_user)
@router.post("/folder", response_model=ApiResponse)
@require_api_key(scopes=["rag"])
async def create_folder(
kb_id: uuid.UUID,
parent_id: uuid.UUID,
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
folder_name: str = '/'
):
"""
Create a new folder
"""
# 0. Obtain the creator of the api key
api_key = api_key_service.ApiKeyService.get_api_key(db, api_key_auth.api_key_id, api_key_auth.workspace_id)
current_user = api_key.creator
current_user.current_workspace_id = api_key_auth.workspace_id
return await file_controller.create_folder(kb_id=kb_id,
parent_id=parent_id,
folder_name=folder_name,
db=db,
current_user=current_user)
@router.post("/file", response_model=ApiResponse)
@require_api_key(scopes=["rag"])
async def upload_file(
kb_id: uuid.UUID,
parent_id: uuid.UUID,
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
file: UploadFile = File(...),
):
"""
upload file
"""
# 0. Obtain the creator of the api key
api_key = api_key_service.ApiKeyService.get_api_key(db, api_key_auth.api_key_id, api_key_auth.workspace_id)
current_user = api_key.creator
current_user.current_workspace_id = api_key_auth.workspace_id
return await file_controller.upload_file(kb_id=kb_id,
parent_id=parent_id,
file=file,
db=db,
current_user=current_user)
@router.post("/customtext", response_model=ApiResponse)
@require_api_key(scopes=["rag"])
async def custom_text(
kb_id: uuid.UUID,
parent_id: uuid.UUID,
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
title: str = Body(..., description="title"),
content: str = Body(..., description="content"),
):
"""
custom text
"""
body = await request.json()
create_data = file_schema.CustomTextFileCreate(**body)
# 0. Obtain the creator of the api key
api_key = api_key_service.ApiKeyService.get_api_key(db, api_key_auth.api_key_id, api_key_auth.workspace_id)
current_user = api_key.creator
current_user.current_workspace_id = api_key_auth.workspace_id
return await file_controller.custom_text(kb_id=kb_id,
parent_id=parent_id,
create_data=create_data,
db=db,
current_user=current_user)
@router.get("/{file_id}", response_model=Any)
async def get_file(
file_id: uuid.UUID,
db: Session = Depends(get_db)
) -> Any:
"""
Download the file based on the file_id
- Query file information from the database
- Construct the file path and check if it exists
- Return a FileResponse to download the file
"""
return await file_controller.get_file(file_id=file_id,
db=db)
@router.put("/{file_id}", response_model=ApiResponse)
@require_api_key(scopes=["rag"])
async def update_file(
file_id: uuid.UUID,
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
file_name: str = Body(None, description="file name (optional)"),
):
"""
Update file information (such as file name)
- Only specified fields such as file_name are allowed to be modified
"""
body = await request.json()
update_data = file_schema.FileUpdate(**body)
# 0. Obtain the creator of the api key
api_key = api_key_service.ApiKeyService.get_api_key(db, api_key_auth.api_key_id, api_key_auth.workspace_id)
current_user = api_key.creator
current_user.current_workspace_id = api_key_auth.workspace_id
return await file_controller.update_file(file_id=file_id,
update_data=update_data,
db=db,
current_user=current_user)
@router.delete("/{file_id}", response_model=ApiResponse)
@require_api_key(scopes=["rag"])
async def delete_file(
file_id: uuid.UUID,
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
):
"""
Delete a file or folder
"""
# 0. Obtain the creator of the api key
api_key = api_key_service.ApiKeyService.get_api_key(db, api_key_auth.api_key_id, api_key_auth.workspace_id)
current_user = api_key.creator
current_user.current_workspace_id = api_key_auth.workspace_id
return await file_controller.delete_file(file_id=file_id,
db=db,
current_user=current_user)

View File

@@ -0,0 +1,248 @@
"""RAG 服务接口 - 基于 API Key 认证"""
from typing import Optional, Dict
import uuid
from fastapi import APIRouter, Body, Depends, Request, Query
from sqlalchemy.orm import Session
from app.controllers import knowledge_controller
from app.core.api_key_auth import require_api_key
from app.core.logging_config import get_business_logger
from app.core.response_utils import success
from app.db import get_db
from app.models import knowledge_model
from app.schemas import knowledge_schema
from app.schemas.api_key_schema import ApiKeyAuth
from app.schemas.response_schema import ApiResponse
from app.services import api_key_service
router = APIRouter(prefix="/knowledges", tags=["V1 - RAG API"])
api_logger = get_business_logger()
@router.get("/knowledgetype", response_model=ApiResponse)
def get_knowledge_types():
return success(msg="Successfully obtained the knowledge type", data=list(knowledge_model.KnowledgeType))
@router.get("/permissiontype", response_model=ApiResponse)
def get_permission_types():
return success(msg="Successfully obtained the knowledge permission type", data=list(knowledge_model.PermissionType))
@router.get("/parsertype", response_model=ApiResponse)
def get_parser_types():
return success(msg="Successfully obtained the knowledge parser type", data=list(knowledge_model.ParserType))
@router.get("/knowledge_graph_entity_types", response_model=ApiResponse)
@require_api_key(scopes=["rag"])
async def get_knowledge_graph_entity_types(
llm_id: uuid.UUID,
scenario: str,
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
):
"""
get knowledge graph entity types based on llm_id
"""
# 0. Obtain the creator of the api key
api_key = api_key_service.ApiKeyService.get_api_key(db, api_key_auth.api_key_id, api_key_auth.workspace_id)
current_user = api_key.creator
current_user.current_workspace_id = api_key_auth.workspace_id
return await knowledge_controller.get_knowledge_graph_entity_types(llm_id=llm_id,
scenario=scenario,
db=db,
current_user=current_user)
@router.get("/knowledges", response_model=ApiResponse)
@require_api_key(scopes=["rag"])
async def get_knowledges(
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
parent_id: Optional[uuid.UUID] = Query(None, description="parent folder id"),
page: int = Query(1, gt=0), # Default: 1, which must be greater than 0
pagesize: int = Query(20, gt=0, le=100), # Default: 20 items per page, maximum: 100 items
orderby: Optional[str] = Query(None, description="Sort fields, such as: created_at,updated_at"),
desc: Optional[bool] = Query(False, description="Is it descending order"),
keywords: Optional[str] = Query(None, description="Search keywords (knowledge base name)"),
kb_ids: Optional[str] = Query(None, description="Knowledge base ids, separated by commas")
):
"""
Query the knowledge base list in pages
- Support filtering by parent_id
- Support keyword search for knowledge base names
- Support dynamic sorting
- Return paging metadata + file list
"""
# 0. Obtain the creator of the api key
api_key = api_key_service.ApiKeyService.get_api_key(db, api_key_auth.api_key_id, api_key_auth.workspace_id)
current_user = api_key.creator
current_user.current_workspace_id = api_key_auth.workspace_id
return await knowledge_controller.get_knowledges(parent_id=parent_id,
page=page,
pagesize=pagesize,
orderby=orderby,
desc=desc,
keywords=keywords,
kb_ids=kb_ids,
db=db,
current_user=current_user)
@router.post("/knowledge", response_model=ApiResponse)
@require_api_key(scopes=["rag"])
async def create_knowledge(
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
name: str = Body(..., description="KB name"),
):
"""
create knowledge
"""
body = await request.json()
create_data = knowledge_schema.KnowledgeCreate(**body)
# 0. Obtain the creator of the api key
api_key = api_key_service.ApiKeyService.get_api_key(db, api_key_auth.api_key_id, api_key_auth.workspace_id)
current_user = api_key.creator
current_user.current_workspace_id = api_key_auth.workspace_id
return await knowledge_controller.create_knowledge(create_data=create_data,
db=db,
current_user=current_user)
@router.get("/{knowledge_id}", response_model=ApiResponse)
@require_api_key(scopes=["rag"])
async def get_knowledge(
knowledge_id: uuid.UUID,
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
):
"""
Retrieve knowledge base information based on knowledge_id
"""
# 0. Obtain the creator of the api key
api_key = api_key_service.ApiKeyService.get_api_key(db, api_key_auth.api_key_id, api_key_auth.workspace_id)
current_user = api_key.creator
current_user.current_workspace_id = api_key_auth.workspace_id
return await knowledge_controller.get_knowledge(knowledge_id=knowledge_id,
db=db,
current_user=current_user)
@router.put("/{knowledge_id}", response_model=ApiResponse)
@require_api_key(scopes=["rag"])
async def update_knowledge(
knowledge_id: uuid.UUID,
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
name: str = Body(None, description="KB name (optional)"),
):
body = await request.json()
update_data = knowledge_schema.KnowledgeUpdate(**body)
# 0. Obtain the creator of the api key
api_key = api_key_service.ApiKeyService.get_api_key(db, api_key_auth.api_key_id, api_key_auth.workspace_id)
current_user = api_key.creator
current_user.current_workspace_id = api_key_auth.workspace_id
return await knowledge_controller.update_knowledge(knowledge_id=knowledge_id,
update_data=update_data,
db=db,
current_user=current_user)
@router.delete("/{knowledge_id}", response_model=ApiResponse)
@require_api_key(scopes=["rag"])
async def delete_knowledge(
knowledge_id: uuid.UUID,
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
):
"""
Soft-delete knowledge base
"""
# 0. Obtain the creator of the api key
api_key = api_key_service.ApiKeyService.get_api_key(db, api_key_auth.api_key_id, api_key_auth.workspace_id)
current_user = api_key.creator
current_user.current_workspace_id = api_key_auth.workspace_id
return await knowledge_controller.delete_knowledge(knowledge_id=knowledge_id,
db=db,
current_user=current_user)
@router.get("/{knowledge_id}/knowledge_graph", response_model=ApiResponse)
@require_api_key(scopes=["rag"])
async def get_knowledge_graph(
knowledge_id: uuid.UUID,
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
):
"""
Retrieve knowledge_graph base information based on knowledge_id
"""
# 0. Obtain the creator of the api key
api_key = api_key_service.ApiKeyService.get_api_key(db, api_key_auth.api_key_id, api_key_auth.workspace_id)
current_user = api_key.creator
current_user.current_workspace_id = api_key_auth.workspace_id
return await knowledge_controller.get_knowledge_graph(knowledge_id=knowledge_id,
db=db,
current_user=current_user)
@router.delete("/{knowledge_id}/knowledge_graph", response_model=ApiResponse)
@require_api_key(scopes=["rag"])
async def delete_knowledge_graph(
knowledge_id: uuid.UUID,
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
):
"""
delete knowledge graph
"""
# 0. Obtain the creator of the api key
api_key = api_key_service.ApiKeyService.get_api_key(db, api_key_auth.api_key_id, api_key_auth.workspace_id)
current_user = api_key.creator
current_user.current_workspace_id = api_key_auth.workspace_id
return await knowledge_controller.delete_knowledge_graph(knowledge_id=knowledge_id,
db=db,
current_user=current_user)
@router.post("/{knowledge_id}/knowledge_graph", response_model=ApiResponse)
@require_api_key(scopes=["rag"])
async def rebuild_knowledge_graph(
knowledge_id: uuid.UUID,
request: Request,
api_key_auth: ApiKeyAuth = None,
db: Session = Depends(get_db),
):
"""
rebuild knowledge graph
"""
# 0. Obtain the creator of the api key
api_key = api_key_service.ApiKeyService.get_api_key(db, api_key_auth.api_key_id, api_key_auth.workspace_id)
current_user = api_key.creator
current_user.current_workspace_id = api_key_auth.workspace_id
return await knowledge_controller.rebuild_knowledge_graph(knowledge_id=knowledge_id,
db=db,
current_user=current_user)