[ADD]Add functions related to knowledge base graph:

Add functions related to knowledge base graph:
1. Entity type generation,
2. Knowledge base graph acquisition,
3. Hard deletion of knowledge base graph,
4. Knowledge base graph reconstruction (asynchronous)
This commit is contained in:
lixiangcheng1
2025-12-27 13:53:10 +08:00
parent 06f64809c3
commit a0c362244e
35 changed files with 6267 additions and 143 deletions

View File

@@ -1,5 +1,6 @@
from typing import Optional
import datetime
import json
import uuid
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy import or_
@@ -13,8 +14,13 @@ from app.schemas import knowledge_schema
from app.schemas.response_schema import ApiResponse
from app.core.response_utils import success
from app.services import knowledge_service, document_service
from app.core.rag.llm.chat_model import Base
from app.core.rag.prompts.generator import graph_entity_types
from app.core.rag.vdb.elasticsearch.elasticsearch_vector import ElasticSearchVectorFactory
from app.core.logging_config import get_api_logger
from app.core.rag.nlp import rag_tokenizer, search
from app.core.rag.common import settings
from app.celery_app import celery_app
# Obtain a dedicated API logger
api_logger = get_api_logger()
@@ -306,3 +312,171 @@ async def delete_knowledge(
except Exception as e:
api_logger.error(f"Failed to delete from the knowledge base: knowledge_id={knowledge_id} - {str(e)}")
raise
@router.get("/{knowledge_id}/knowledge_graph", response_model=ApiResponse)
async def get_knowledge_graph(
knowledge_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
Retrieve knowledge_graph base information based on knowledge_id
"""
api_logger.info(f"Obtain details of the knowledge graph: knowledge_id={knowledge_id}, username: {current_user.username}")
try:
# 1. Query knowledge base information from the database
api_logger.debug(f"Query knowledge base: {knowledge_id}")
db_knowledge = knowledge_service.get_knowledge_by_id(db, knowledge_id=knowledge_id, current_user=current_user)
if not db_knowledge:
api_logger.warning(f"The knowledge base does not exist or access is denied: knowledge_id={knowledge_id}")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="The knowledge base does not exist or access is denied"
)
req = {
"kb_id": [str(db_knowledge.id)],
"knowledge_graph_kwd": ["graph"]
}
obj = {"graph": {}, "mind_map": {}}
if not settings.docStoreConn.indexExist(search.index_name(str(db_knowledge.workspace_id)), str(db_knowledge.id)):
return success(data=obj, msg="Successfully obtained knowledge graph information")
sres = settings.retriever.search(req, search.index_name(str(db_knowledge.workspace_id)), [str(db_knowledge.id)])
if not len(sres.ids):
return success(data=obj, msg="Successfully obtained knowledge graph information")
for id in sres.ids[:1]:
ty = sres.field[id]["knowledge_graph_kwd"]
try:
content_json = json.loads(sres.field[id]["page_content"])
except Exception:
continue
obj[ty] = content_json
if "nodes" in obj["graph"]:
obj["graph"]["nodes"] = sorted(obj["graph"]["nodes"], key=lambda x: x.get("pagerank", 0), reverse=True)[:256]
if "edges" in obj["graph"]:
node_id_set = {o["id"] for o in obj["graph"]["nodes"]}
filtered_edges = [o for o in obj["graph"]["edges"] if o["source"] != o["target"] and o["source"] in node_id_set and o["target"] in node_id_set]
obj["graph"]["edges"] = sorted(filtered_edges, key=lambda x: x.get("weight", 0), reverse=True)[:128]
return success(data=obj, msg="Successfully obtained knowledge graph information")
except HTTPException:
raise
except Exception as e:
api_logger.error(f"Knowledge graph query failed: knowledge_id={knowledge_id} - {str(e)}")
raise
@router.delete("/{knowledge_id}/knowledge_graph", response_model=ApiResponse)
async def delete_knowledge_graph(
knowledge_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
Soft-delete knowledge graph
"""
api_logger.info(f"Request to delete knowledge graph: knowledge_id={knowledge_id}, username: {current_user.username}")
try:
# 1. Check whether the knowledge base exists
api_logger.debug(f"Check whether the knowledge base exists: {knowledge_id}")
db_knowledge = knowledge_service.get_knowledge_by_id(db, knowledge_id=knowledge_id, current_user=current_user)
if not db_knowledge:
api_logger.warning(f"The knowledge base does not exist or you do not have permission to access it: knowledge_id={knowledge_id}")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="The knowledge base does not exist or you do not have permission to access it"
)
# 2. delete knowledge graph
settings.docStoreConn.delete({"knowledge_graph_kwd": ["graph", "subgraph", "entity", "relation"]}, search.index_name(str(db_knowledge.workspace_id)), str(db_knowledge.id))
api_logger.info(f"The knowledge graph has been successfully deleted: {db_knowledge.name} (ID: {knowledge_id})")
return success(msg="The knowledge graph has been successfully deleted")
except Exception as e:
api_logger.error(f"Failed to delete from the knowledge base: knowledge_id={knowledge_id} - {str(e)}")
raise
@router.post("/{knowledge_id}/knowledge_graph", response_model=ApiResponse)
async def rebuild_knowledge_graph(
knowledge_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
rebuild knowledge graph
"""
api_logger.info(f"Request to rebuild knowledge graph: knowledge_id={knowledge_id}, username: {current_user.username}")
try:
# 1. Check whether the knowledge base exists
api_logger.debug(f"Check whether the knowledge base exists: {knowledge_id}")
db_knowledge = knowledge_service.get_knowledge_by_id(db, knowledge_id=knowledge_id, current_user=current_user)
if not db_knowledge:
api_logger.warning(
f"The knowledge base does not exist or you do not have permission to access it: knowledge_id={knowledge_id}")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="The knowledge base does not exist or you do not have permission to access it"
)
# 2. delete knowledge graph
settings.docStoreConn.delete({"knowledge_graph_kwd": ["graph", "subgraph", "entity", "relation"]}, search.index_name(str(db_knowledge.workspace_id)), str(db_knowledge.id))
# 3. build knowledge graph
# from app.tasks import build_graphrag_for_kb
# build_graphrag_for_kb(kb_id)
task = celery_app.send_task("app.core.rag.tasks.build_graphrag_for_kb", args=[knowledge_id])
result = {
"task_id": task.id
}
return success(data=result, msg="Task accepted. rebuild knowledge graph is being processed in the background.")
except Exception as e:
api_logger.error(f"Failed to rebuild knowledge graph: knowledge_id={knowledge_id} - {str(e)}")
raise
@router.get("/{knowledge_id}/knowledge_graph_entity_types", response_model=ApiResponse)
async def get_knowledge_graph_entity_types(
knowledge_id: uuid.UUID,
scenario: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
get knowledge graph entity types based on knowledge_id
"""
api_logger.info(f"Obtain details of the knowledge graph: knowledge_id={knowledge_id}, username: {current_user.username}")
try:
# 1. Check whether the knowledge base exists
api_logger.debug(f"Check whether the knowledge base exists: {knowledge_id}")
db_knowledge = knowledge_service.get_knowledge_by_id(db, knowledge_id=knowledge_id, current_user=current_user)
if not db_knowledge:
api_logger.warning(
f"The knowledge base does not exist or you do not have permission to access it: knowledge_id={knowledge_id}")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="The knowledge base does not exist or you do not have permission to access it"
)
# 2. Prepare to configure chat_mdl information
chat_model = Base(
key=db_knowledge.llm.api_keys[0].api_key,
model_name=db_knowledge.llm.api_keys[0].model_name,
base_url=db_knowledge.llm.api_keys[0].api_base
)
response = graph_entity_types(chat_model, scenario)
return success(data=response, msg="Successfully obtained knowledge graph entity types")
except HTTPException:
raise
except Exception as e:
api_logger.error(f"get knowledge graph entity types failed: knowledge_id={knowledge_id} - {str(e)}")
raise