Files
MemoryBear/api/app/controllers/knowledge_controller.py
lixiangcheng1 db46c186aa [ADD] Third-party synchronization
1. Third-party website data access - Website synchronization
Building a knowledge base by crawling web page data in batches through web crawlers
Web site synchronization utilizes crawler technology, which can automatically capture all websites under the same domain name through a single entry website. Currently, it supports up to 200 subpages. For compliance and security reasons, only static site crawling is supported, mainly used for quickly building knowledge bases on various document sites.
2. Feishu Knowledge Base
By configuring Feishu document permissions, a knowledge base can be built using Feishu documents, and the documents will not undergo secondary storage
3. Yuque Knowledge Base
By configuring Yuque document permissions, a knowledge base can be built from Yuque documents, and the documents will not undergo secondary storage
2026-02-06 12:18:40 +08:00

586 lines
25 KiB
Python

import datetime
import json
from typing import Optional
import uuid
from fastapi import APIRouter, Depends, HTTPException, status, Query
from fastapi.encoders import jsonable_encoder
from sqlalchemy import or_
from sqlalchemy.orm import Session
from app.celery_app import celery_app
from app.core.error_codes import BizCode
from app.core.logging_config import get_api_logger
from app.core.rag.common import settings
from app.core.rag.integrations.feishu.client import FeishuAPIClient
from app.core.rag.integrations.yuque.client import YuqueAPIClient
from app.core.rag.llm.chat_model import Base
from app.core.rag.nlp import rag_tokenizer, search
from app.core.rag.prompts.generator import graph_entity_types
from app.core.rag.vdb.elasticsearch.elasticsearch_vector import ElasticSearchVectorFactory
from app.core.response_utils import success, fail
from app.db import get_db
from app.dependencies import get_current_user
from app.models import knowledge_model
from app.models.user_model import User
from app.schemas import knowledge_schema
from app.schemas.response_schema import ApiResponse
from app.services import knowledge_service, document_service
from app.services.model_service import ModelConfigService
# Logger dedicated to API-layer events emitted by this controller.
api_logger = get_api_logger()

# Every route registered on this router lives under /knowledges, and the
# router-level dependency runs get_current_user for each of them.
router = APIRouter(
    prefix="/knowledges",
    tags=["knowledges"],
    dependencies=[Depends(get_current_user)],
)
@router.get("/knowledgetype", response_model=ApiResponse)
def get_knowledge_types():
    """Return all members of ``knowledge_model.KnowledgeType`` in a success response."""
    knowledge_types = list(knowledge_model.KnowledgeType)
    return success(
        msg="Successfully obtained the knowledge type",
        data=knowledge_types,
    )
@router.get("/permissiontype", response_model=ApiResponse)
def get_permission_types():
    """Return all members of ``knowledge_model.PermissionType`` in a success response."""
    permission_types = list(knowledge_model.PermissionType)
    return success(
        msg="Successfully obtained the knowledge permission type",
        data=permission_types,
    )
@router.get("/parsertype", response_model=ApiResponse)
def get_parser_types():
    """Return all members of ``knowledge_model.ParserType`` in a success response."""
    parser_types = list(knowledge_model.ParserType)
    return success(
        msg="Successfully obtained the knowledge parser type",
        data=parser_types,
    )
@router.get("/knowledge_graph_entity_types", response_model=ApiResponse)
async def get_knowledge_graph_entity_types(
    llm_id: uuid.UUID,
    scenario: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Get knowledge graph entity types for a scenario using the model ``llm_id``.

    Looks up the model configuration, builds a chat model from its first API
    key entry and passes it together with ``scenario`` to
    ``graph_entity_types``; the result is returned in a success response.

    Raises:
        HTTPException: 404 when no model configuration is found for
            ``llm_id``; 400 when the configuration has no API key entries.
    """
    api_logger.info(f"Obtain details of the knowledge graph: llm_id={llm_id}, username: {current_user.username}")
    try:
        # 1. Check whether the model exists
        api_logger.debug(f"Check whether the model exists: {llm_id}")
        config = ModelConfigService.get_model_by_id(db=db, model_id=llm_id)
        if not config:
            api_logger.warning(
                f"The model does not exist or you do not have permission to access it: llm_id={llm_id}")
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="The model does not exist or you do not have permission to access it"
            )
        # Without this guard an empty api_keys collection surfaces as a bare
        # IndexError (HTTP 500) from the [0] lookup below.
        if not config.api_keys:
            api_logger.warning(f"The model has no API key configured: llm_id={llm_id}")
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="The model has no API key configured"
            )
        # 2. Prepare to configure chat_mdl information (first API key entry only)
        primary_key = config.api_keys[0]
        chat_model = Base(
            key=primary_key.api_key,
            model_name=primary_key.model_name,
            base_url=primary_key.api_base
        )
        response = graph_entity_types(chat_model, scenario)
        return success(data=response, msg="Successfully obtained knowledge graph entity types")
    except HTTPException:
        # Expected client errors were already logged as warnings above.
        raise
    except Exception as e:
        api_logger.error(f"get knowledge graph entity types failed: llm_id={llm_id} - {str(e)}")
        raise
@router.get("/knowledges", response_model=ApiResponse)
async def get_knowledges(
parent_id: Optional[uuid.UUID] = Query(None, description="parent folder id"),
page: int = Query(1, gt=0), # Default: 1, which must be greater than 0
pagesize: int = Query(20, gt=0, le=100), # Default: 20 items per page, maximum: 100 items
orderby: Optional[str] = Query(None, description="Sort fields, such as: created_at,updated_at"),
desc: Optional[bool] = Query(False, description="Is it descending order"),
keywords: Optional[str] = Query(None, description="Search keywords (knowledge base name)"),
kb_ids: Optional[str] = Query(None, description="Knowledge base ids, separated by commas"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
Query the knowledge base list in pages.

- Always restricts results to the caller's current workspace
- Support filtering by parent_id
- Support keyword search (case-insensitive substring match on name or description)
- Support restricting to an explicit comma-separated id list (kb_ids)
- Support dynamic sorting (orderby / desc are forwarded to the service layer)
- Return paging metadata + item list

Raises HTTPException 400 for invalid paging values and HTTPException 500
(with the underlying error text in ``detail``) when the paged query fails.

NOTE(review): this copy of the file has lost its indentation, so the nesting
of the kb_ids / parent_id / permission filters below cannot be determined from
the text alone - confirm against the repository before restructuring.
"""
api_logger.info(f"Query knowledge base list: workspace_id={current_user.current_workspace_id}, page={page}, pagesize={pagesize}, keywords={keywords}, kb_ids={kb_ids}, username: {current_user.username}")
# 1. parameter validation
# The Query(..., gt=0) declarations in the signature already state the same lower bound.
if page < 1 or pagesize < 1:
api_logger.warning(f"Error in paging parameters: page={page}, pagesize={pagesize}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="The paging parameter must be greater than 0"
)
# 2. Construct query conditions
# Base condition: only knowledge bases in the caller's current workspace.
filters = [
knowledge_model.Knowledge.workspace_id == current_user.current_workspace_id
]
# Keyword search (fuzzy matching of knowledge base name or description)
if keywords:
api_logger.debug(f"Add keyword search criteria: {keywords}")
filters.append(
or_(
knowledge_model.Knowledge.name.ilike(f"%{keywords}%"),
knowledge_model.Knowledge.description.ilike(f"%{keywords}%")
)
)
# Knowledge base ids
# NOTE(review): the split ids are used as raw strings with no UUID validation
# or whitespace trimming - confirm malformed values are handled downstream.
if kb_ids:
filters.append(knowledge_model.Knowledge.id.in_(kb_ids.split(',')))
else:
# status 2 is the value delete_knowledge assigns when soft-deleting a record.
filters.append(knowledge_model.Knowledge.status != 2)
if parent_id:
filters.append(knowledge_model.Knowledge.parent_id == parent_id)
else:
# With no parent_id given, parent_id is compared against the current workspace id.
filters.append(knowledge_model.Knowledge.parent_id == current_user.current_workspace_id)
filters.append(knowledge_model.Knowledge.permission_id != knowledge_model.PermissionType.Memory)
# 3. Execute paged query
try:
api_logger.debug("Start executing knowledge base paging query")
total, items = knowledge_service.get_knowledges_paginated(
db=db,
filters=filters,
page=page,
pagesize=pagesize,
orderby=orderby,
desc=desc,
current_user=current_user
)
api_logger.info(f"Knowledge base query successful: total={total}, returned={len(items)} records")
except Exception as e:
api_logger.error(f"Knowledge base query failed: {str(e)}")
# NOTE(review): the raw exception text is returned to the client in the detail.
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Query failed: {str(e)}"
)
# 4. Return structured response
result = {
"items": items,
"page": {
"page": page,
"pagesize": pagesize,
"total": total,
# True while records remain beyond the end of the current page.
"has_next": True if page*pagesize < total else False
}
}
return success(data=jsonable_encoder(result), msg="Query of knowledge base list successful")
@router.post("/knowledge", response_model=ApiResponse)
async def create_knowledge(
    create_data: knowledge_schema.KnowledgeCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Create a knowledge base.

    Rejects the request when ``knowledge_service.get_knowledge_by_name`` finds
    an existing knowledge base with the requested name; otherwise creates it
    through ``knowledge_service.create_knowledge`` and returns the validated
    record in a success response.

    Raises:
        HTTPException: 400 when the knowledge base name already exists.
    """
    api_logger.info(f"Request to create a knowledge base: name={create_data.name}, workspace_id={current_user.current_workspace_id}, username: {current_user.username}")
    try:
        api_logger.debug(f"Start creating the knowledge base: {create_data.name}")
        # 1. Check if the knowledge base name already exists
        db_knowledge_exist = knowledge_service.get_knowledge_by_name(db, name=create_data.name, current_user=current_user)
        if db_knowledge_exist:
            api_logger.warning(f"The knowledge base name already exists: {create_data.name}")
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"The knowledge base name already exists: {create_data.name}"
            )
        # 2. Create the record and return it
        db_knowledge = knowledge_service.create_knowledge(db=db, knowledge=create_data, current_user=current_user)
        api_logger.info(f"The knowledge base has been successfully created: {db_knowledge.name} (ID: {db_knowledge.id})")
        return success(data=jsonable_encoder(knowledge_schema.Knowledge.model_validate(db_knowledge)), msg="The knowledge base has been successfully created")
    except HTTPException:
        # The duplicate-name rejection is already logged as a warning; passing
        # it through keeps it out of the error log, as the other handlers do.
        raise
    except Exception as e:
        api_logger.error(f"The creation of the knowledge base failed: {create_data.name} - {str(e)}")
        raise
@router.get("/{knowledge_id}", response_model=ApiResponse)
async def get_knowledge(
    knowledge_id: uuid.UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Return the knowledge base identified by ``knowledge_id``.

    The record is fetched through ``knowledge_service.get_knowledge_by_id``
    and serialised via ``knowledge_schema.Knowledge``.

    Raises:
        HTTPException: 404 when the lookup returns nothing.
    """
    api_logger.info(f"Obtain details of the knowledge base: knowledge_id={knowledge_id}, username: {current_user.username}")
    try:
        # Look the record up on behalf of the current user.
        api_logger.debug(f"Query knowledge base: {knowledge_id}")
        db_knowledge = knowledge_service.get_knowledge_by_id(
            db,
            knowledge_id=knowledge_id,
            current_user=current_user,
        )
        if db_knowledge:
            api_logger.info(f"Knowledge base query successful: {db_knowledge.name} (ID: {db_knowledge.id})")
            payload = jsonable_encoder(knowledge_schema.Knowledge.model_validate(db_knowledge))
            return success(data=payload, msg="Successfully obtained knowledge base information")

        # Nothing came back: report it as not found / not accessible.
        api_logger.warning(f"The knowledge base does not exist or access is denied: knowledge_id={knowledge_id}")
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="The knowledge base does not exist or access is denied",
        )
    except HTTPException:
        raise
    except Exception as e:
        api_logger.error(f"Knowledge base query failed: knowledge_id={knowledge_id} - {str(e)}")
        raise
@router.put("/{knowledge_id}", response_model=ApiResponse)
async def update_knowledge(
    knowledge_id: uuid.UUID,
    update_data: knowledge_schema.KnowledgeUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Update a knowledge base and return its new state.

    The update itself is delegated to ``_update_knowledge``; this handler only
    logs the request and wraps the validated result in a success response.
    """
    api_logger.info(f"Update knowledge base request: knowledge_id={knowledge_id}, username: {current_user.username}")
    db_knowledge = await _update_knowledge(
        knowledge_id=knowledge_id,
        update_data=update_data,
        db=db,
        current_user=current_user,
    )
    validated = knowledge_schema.Knowledge.model_validate(db_knowledge)
    return success(
        data=jsonable_encoder(validated),
        msg="The knowledge base information has been successfully updated",
    )
# NOTE(review): update_knowledge calls this helper directly and passes db and
# current_user explicitly; the Depends(...) defaults below are only resolved by
# FastAPI for route functions - confirm no caller relies on them.
async def _update_knowledge(
knowledge_id: uuid.UUID,
update_data: knowledge_schema.KnowledgeUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
) -> knowledge_schema.Knowledge:
"""
Update knowledge base information and return the refreshed record.

Steps visible in the code: look the knowledge base up, reject a rename that
collides with an existing name, handle a change of embedding_id (vector index
deletion and document progress reset), copy the explicitly-set fields onto the
record, commit and refresh.

Raises HTTPException 404 when the knowledge base is not found, 400 when the
new name already exists, and 500 (after a rollback) for any other failure.

NOTE(review): the value returned is the object obtained from
knowledge_service.get_knowledge_by_id, while the annotation says
knowledge_schema.Knowledge - confirm which is intended.
NOTE(review): this copy of the file has lost its indentation; the nesting of
reset_documents_progress_by_kb_id, the updated_at assignment and db.commit()
cannot be determined from the text alone - confirm against the repository.
"""
try:
# 1. Check whether the knowledge base exists
api_logger.debug(f"Query the knowledge base to be updated: {knowledge_id}")
db_knowledge = knowledge_service.get_knowledge_by_id(db, knowledge_id=knowledge_id, current_user=current_user)
if not db_knowledge:
api_logger.warning(f"The knowledge base does not exist or you do not have permission to access it: knowledge_id={knowledge_id}")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="The knowledge base does not exist or you do not have permission to access it"
)
# 2. Pre-update checks: reject duplicate names; if the embedding_id changes, delete the knowledge base vector index and reset document parsing progress
# NOTE(review): model_validate is used elsewhere in this file, which suggests
# Pydantic v2, where .dict() is deprecated in favour of model_dump() - confirm.
update_dict = update_data.dict(exclude_unset=True)
if "name" in update_dict:
name = update_dict["name"]
if name != db_knowledge.name:
# Check if the knowledge base name already exists
db_knowledge_exist = knowledge_service.get_knowledge_by_name(db, name=name, current_user=current_user)
if db_knowledge_exist:
api_logger.warning(f"The knowledge base name already exists: {name}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"The knowledge base name already exists: {name}"
)
if "embedding_id" in update_dict:
embedding_id = update_dict["embedding_id"]
if embedding_id != db_knowledge.embedding_id:
# The vector index is only deleted when both an embedding and a reranker are currently set.
if db_knowledge.embedding_id and db_knowledge.reranker_id:
vector_service = ElasticSearchVectorFactory().init_vector(knowledge=db_knowledge)
vector_service.delete()
document_service.reset_documents_progress_by_kb_id(db, kb_id=db_knowledge.id, current_user=current_user)
# 3. Update fields (only fields explicitly set in the request whose value differs)
api_logger.debug(f"Start updating the knowledge base fields: {knowledge_id}")
updated_fields = []
for field, value in update_data.dict(exclude_unset=True).items():
# Fields that do not exist on the record are skipped.
if hasattr(db_knowledge, field):
old_value = getattr(db_knowledge, field)
if old_value != value:
# update value
setattr(db_knowledge, field, value)
updated_fields.append(f"{field}: {old_value} -> {value}")
if updated_fields:
api_logger.debug(f"updated fields: {', '.join(updated_fields)}")
# NOTE(review): datetime.now() yields a naive local timestamp - confirm this matches the column's convention.
db_knowledge.updated_at = datetime.datetime.now()
# 4. Save to database
db.commit()
db.refresh(db_knowledge)
api_logger.info(f"The knowledge base has been successfully updated: {db_knowledge.name} (ID: {db_knowledge.id})")
# 5. Return the updated knowledge base
return db_knowledge
except HTTPException:
raise
except Exception as e:
# Undo the pending transaction before reporting the failure as HTTP 500.
db.rollback()
api_logger.error(f"Knowledge base update failed: knowledge_id={knowledge_id} - {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Knowledge base update failed: {str(e)}"
)
@router.delete("/{knowledge_id}", response_model=ApiResponse)
async def delete_knowledge(
    knowledge_id: uuid.UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Soft-delete a knowledge base by setting its ``status`` to 2.

    Raises:
        HTTPException: 404 when the knowledge base is not found through
            ``knowledge_service.get_knowledge_by_id``.
    """
    api_logger.info(f"Request to delete knowledge base: knowledge_id={knowledge_id}, username: {current_user.username}")
    try:
        # 1. Check whether the knowledge base exists
        api_logger.debug(f"Check whether the knowledge base exists: {knowledge_id}")
        db_knowledge = knowledge_service.get_knowledge_by_id(db, knowledge_id=knowledge_id, current_user=current_user)
        if not db_knowledge:
            api_logger.warning(f"The knowledge base does not exist or you do not have permission to access it: knowledge_id={knowledge_id}")
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="The knowledge base does not exist or you do not have permission to access it"
            )
        # 2. Soft-delete knowledge base (the row is kept; only the status changes)
        api_logger.debug(f"Perform a soft delete: {db_knowledge.name} (ID: {knowledge_id})")
        db_knowledge.status = 2
        db.commit()
        api_logger.info(f"The knowledge base has been successfully deleted: {db_knowledge.name} (ID: {knowledge_id})")
        return success(msg="The knowledge base has been successfully deleted")
    except HTTPException:
        # The 404 is already logged as a warning; pass it through so it is not
        # also reported as a deletion failure.
        raise
    except Exception as e:
        # Discard the failed transaction, as _update_knowledge does on errors.
        db.rollback()
        api_logger.error(f"Failed to delete from the knowledge base: knowledge_id={knowledge_id} - {str(e)}")
        raise
@router.get("/{knowledge_id}/knowledge_graph", response_model=ApiResponse)
async def get_knowledge_graph(
    knowledge_id: uuid.UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Return the stored knowledge graph of a knowledge base.

    The response data always has the keys ``graph`` and ``mind_map``. Both stay
    empty when the index does not exist or the search returns no ids. When a
    graph is found, its nodes are cut to the 256 highest ``pagerank`` values
    and its edges to the 128 highest ``weight`` values among edges that join
    two different retained nodes.

    Raises:
        HTTPException: 404 when the knowledge base lookup returns nothing.
    """
    api_logger.info(f"Obtain details of the knowledge graph: knowledge_id={knowledge_id}, username: {current_user.username}")
    try:
        # 1. Look the knowledge base up on behalf of the current user
        api_logger.debug(f"Query knowledge base: {knowledge_id}")
        db_knowledge = knowledge_service.get_knowledge_by_id(db, knowledge_id=knowledge_id, current_user=current_user)
        if not db_knowledge:
            api_logger.warning(f"The knowledge base does not exist or access is denied: knowledge_id={knowledge_id}")
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="The knowledge base does not exist or access is denied"
            )

        kb_id = str(db_knowledge.id)
        req = {
            "kb_id": [kb_id],
            "knowledge_graph_kwd": ["graph"]
        }
        obj = {"graph": {}, "mind_map": {}}
        ok_msg = "Successfully obtained knowledge graph information"

        # 2. No index for this knowledge base yet -> empty result
        if not settings.docStoreConn.indexExist(search.index_name(str(db_knowledge.workspace_id)), kb_id):
            return success(data=obj, msg=ok_msg)

        # 3. Search for the stored graph document; no hits -> empty result
        sres = settings.retriever.search(req, search.index_name(str(db_knowledge.workspace_id)), [kb_id])
        if len(sres.ids) == 0:
            return success(data=obj, msg=ok_msg)

        # 4. Only the first hit is read; unparsable content is skipped
        for hit_id in sres.ids[:1]:
            hit_fields = sres.field[hit_id]
            ty = hit_fields["knowledge_graph_kwd"]
            try:
                content_json = json.loads(hit_fields["page_content"])
            except Exception:
                continue
            obj[ty] = content_json

        # 5. Trim the graph to a bounded number of nodes and edges
        graph = obj["graph"]
        if "nodes" in graph:
            ranked_nodes = sorted(graph["nodes"], key=lambda node: node.get("pagerank", 0), reverse=True)
            graph["nodes"] = ranked_nodes[:256]
            if "edges" in graph:
                kept_ids = {node["id"] for node in graph["nodes"]}
                # Drop self-loops and edges touching a node that was trimmed away.
                kept_edges = [
                    edge for edge in graph["edges"]
                    if edge["source"] != edge["target"]
                    and edge["source"] in kept_ids
                    and edge["target"] in kept_ids
                ]
                kept_edges.sort(key=lambda edge: edge.get("weight", 0), reverse=True)
                graph["edges"] = kept_edges[:128]
        return success(data=obj, msg=ok_msg)
    except HTTPException:
        raise
    except Exception as e:
        api_logger.error(f"Knowledge graph query failed: knowledge_id={knowledge_id} - {str(e)}")
        raise
@router.delete("/{knowledge_id}/knowledge_graph", response_model=ApiResponse)
async def delete_knowledge_graph(
    knowledge_id: uuid.UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Delete the knowledge graph documents of a knowledge base.

    Removes the documents whose ``knowledge_graph_kwd`` is one of ``graph``,
    ``subgraph``, ``entity`` or ``relation`` from the document store for this
    knowledge base.

    Raises:
        HTTPException: 404 when the knowledge base lookup returns nothing.
    """
    api_logger.info(f"Request to delete knowledge graph: knowledge_id={knowledge_id}, username: {current_user.username}")
    try:
        # 1. Check whether the knowledge base exists
        api_logger.debug(f"Check whether the knowledge base exists: {knowledge_id}")
        db_knowledge = knowledge_service.get_knowledge_by_id(db, knowledge_id=knowledge_id, current_user=current_user)
        if not db_knowledge:
            api_logger.warning(f"The knowledge base does not exist or you do not have permission to access it: knowledge_id={knowledge_id}")
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="The knowledge base does not exist or you do not have permission to access it"
            )
        # 2. delete knowledge graph
        settings.docStoreConn.delete({"knowledge_graph_kwd": ["graph", "subgraph", "entity", "relation"]}, search.index_name(str(db_knowledge.workspace_id)), str(db_knowledge.id))
        api_logger.info(f"The knowledge graph has been successfully deleted: {db_knowledge.name} (ID: {knowledge_id})")
        return success(msg="The knowledge graph has been successfully deleted")
    except HTTPException:
        # The 404 is already logged as a warning; do not report it as a failure.
        raise
    except Exception as e:
        # The message names the knowledge graph: this handler does not delete
        # the knowledge base itself.
        api_logger.error(f"Failed to delete knowledge graph: knowledge_id={knowledge_id} - {str(e)}")
        raise
@router.post("/{knowledge_id}/knowledge_graph", response_model=ApiResponse)
async def rebuild_knowledge_graph(
    knowledge_id: uuid.UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Rebuild the knowledge graph of a knowledge base in the background.

    Deletes the existing knowledge graph documents and then sends the Celery
    task ``app.core.rag.tasks.build_graphrag_for_kb``; the response carries
    the id of the submitted task.

    Raises:
        HTTPException: 404 when the knowledge base lookup returns nothing.
    """
    api_logger.info(f"Request to rebuild knowledge graph: knowledge_id={knowledge_id}, username: {current_user.username}")
    try:
        # 1. Check whether the knowledge base exists
        api_logger.debug(f"Check whether the knowledge base exists: {knowledge_id}")
        db_knowledge = knowledge_service.get_knowledge_by_id(db, knowledge_id=knowledge_id, current_user=current_user)
        if not db_knowledge:
            api_logger.warning(
                f"The knowledge base does not exist or you do not have permission to access it: knowledge_id={knowledge_id}")
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="The knowledge base does not exist or you do not have permission to access it"
            )
        # 2. delete knowledge graph
        settings.docStoreConn.delete({"knowledge_graph_kwd": ["graph", "subgraph", "entity", "relation"]}, search.index_name(str(db_knowledge.workspace_id)), str(db_knowledge.id))
        # 3. build knowledge graph: the task is dispatched by name through
        #    Celery instead of being called in-process
        task = celery_app.send_task("app.core.rag.tasks.build_graphrag_for_kb", args=[knowledge_id])
        result = {
            "task_id": task.id
        }
        return success(data=result, msg="Task accepted. rebuild knowledge graph is being processed in the background.")
    except HTTPException:
        # The 404 is already logged as a warning; do not report it as a failure.
        raise
    except Exception as e:
        api_logger.error(f"Failed to rebuild knowledge graph: knowledge_id={knowledge_id} - {str(e)}")
        raise
@router.get("/check/yuque/auth", response_model=ApiResponse)
async def check_yuque_auth(
    yuque_user_id: str,
    yuque_token: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Verify Yuque credentials by listing the user's repositories.

    A non-empty repository list is returned as a success response; an empty
    result is reported as ``BizCode.UNAUTHORIZED``.
    """
    api_logger.info(f"check yuque auth info, username: {current_user.username}")
    try:
        # The client is used as an async context manager for the single call.
        async with YuqueAPIClient(user_id=yuque_user_id, token=yuque_token) as client:
            repos = await client.get_user_repos()
            if repos:
                return success(data=repos, msg="Successfully auth yuque info")
        return fail(
            BizCode.UNAUTHORIZED,
            msg="auth yuque info failed",
            error="user_id or token is incorrect",
        )
    except HTTPException:
        raise
    except Exception as e:
        api_logger.error(f"auth yuque info failed: {str(e)}")
        raise
@router.get("/check/feishu/auth", response_model=ApiResponse)
async def check_feishu_auth(
    feishu_app_id: str,
    feishu_app_secret: str,
    feishu_folder_token: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Verify Feishu credentials by listing the files of the given folder.

    The folder is listed recursively. A non-empty file list is returned as a
    success response; an empty result is reported as ``BizCode.UNAUTHORIZED``.

    This handler was previously also named ``check_yuque_auth``, which rebound
    the module-level name of the Yuque handler and labelled this endpoint as a
    Yuque check. The route path and query parameters are unchanged.
    """
    api_logger.info(f"check feishu auth info, username: {current_user.username}")
    try:
        api_client = FeishuAPIClient(
            app_id=feishu_app_id,
            app_secret=feishu_app_secret
        )
        async with api_client as client:
            files = await client.list_all_folder_files(feishu_folder_token, recursive=True)
            if files:
                return success(data=files, msg="Successfully auth feishu info")
        return fail(BizCode.UNAUTHORIZED, msg="auth feishu info failed", error="app_id or app_secret or feishu_folder_token is incorrect")
    except HTTPException:
        raise
    except Exception as e:
        api_logger.error(f"auth feishu info failed: {str(e)}")
        raise
@router.post("/{knowledge_id}/sync", response_model=ApiResponse)
async def sync_knowledge(
    knowledge_id: uuid.UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Start a background synchronisation of a knowledge base.

    After confirming the knowledge base exists, sends the Celery task
    ``app.core.rag.tasks.sync_knowledge_for_kb`` and returns the id of the
    submitted task.

    Raises:
        HTTPException: 404 when the knowledge base lookup returns nothing.
    """
    # The entry log names the sync request; the earlier text was copied from
    # the detail endpoint and described it as "Obtain details".
    api_logger.info(f"Request to sync knowledge base: knowledge_id={knowledge_id}, username: {current_user.username}")
    try:
        # 1. Query knowledge base information from the database
        api_logger.debug(f"Query knowledge base: {knowledge_id}")
        db_knowledge = knowledge_service.get_knowledge_by_id(db, knowledge_id=knowledge_id, current_user=current_user)
        if not db_knowledge:
            api_logger.warning(f"The knowledge base does not exist or access is denied: knowledge_id={knowledge_id}")
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="The knowledge base does not exist or access is denied"
            )
        # 2. sync knowledge: the task is dispatched by name through Celery
        #    instead of being called in-process
        task = celery_app.send_task("app.core.rag.tasks.sync_knowledge_for_kb", args=[knowledge_id])
        result = {
            "task_id": task.id
        }
        return success(data=result, msg="Task accepted. sync knowledge is being processed in the background.")
    except HTTPException:
        raise
    except Exception as e:
        api_logger.error(f"Failed to sync knowledge: knowledge_id={knowledge_id} - {str(e)}")
        raise