[ADD] Third-party synchronization

1. Third-party website data access - Website synchronization
Build a knowledge base by crawling web pages in batches with a web crawler.
Website synchronization uses crawler technology to automatically capture all pages under the same domain name, starting from a single entry URL. Currently, it supports up to 200 subpages. For compliance and security reasons, only static sites can be crawled; the feature is mainly intended for quickly building knowledge bases from documentation sites.
2. Feishu Knowledge Base
By configuring Feishu document permissions, a knowledge base can be built using Feishu documents, and the documents will not undergo secondary storage
3. Yuque Knowledge Base
By configuring Yuque document permissions, a knowledge base can be built using Yuque documents, and the documents will not undergo secondary storage
This commit is contained in:
lixiangcheng1
2026-02-06 12:18:40 +08:00
parent c1941809e9
commit db46c186aa
30 changed files with 3422 additions and 1 deletions

View File

@@ -7,6 +7,8 @@ import uuid
from uuid import UUID
from datetime import datetime, timezone
from math import ceil
from pathlib import Path
import shutil
from typing import Any, Dict, List, Optional
import redis
@@ -16,8 +18,13 @@ import trio
# Import a unified Celery instance
from app.celery_app import celery_app
from app.core.config import settings
from app.core.rag.crawler.web_crawler import WebCrawler
from app.core.rag.graphrag.general.index import init_graphrag, run_graphrag_for_kb
from app.core.rag.graphrag.utils import get_llm_cache, set_llm_cache
from app.core.rag.integrations.feishu.client import FeishuAPIClient
from app.core.rag.integrations.feishu.models import FileInfo
from app.core.rag.integrations.yuque.client import YuqueAPIClient
from app.core.rag.integrations.yuque.models import YuqueDocInfo
from app.core.rag.llm.chat_model import Base
from app.core.rag.llm.cv_model import QWenCV
from app.core.rag.llm.embedding_model import OpenAIEmbed
@@ -29,7 +36,9 @@ from app.core.rag.vdb.elasticsearch.elasticsearch_vector import (
)
from app.db import get_db, get_db_context
from app.models.document_model import Document
from app.models.file_model import File
from app.models.knowledge_model import Knowledge
from app.schemas import file_schema, document_schema
from app.services.memory_agent_service import MemoryAgentService
@@ -382,6 +391,480 @@ def build_graphrag_for_kb(kb_id: uuid.UUID):
db.close()
@celery_app.task(name="app.core.rag.tasks.sync_knowledge_for_kb")
def sync_knowledge_for_kb(kb_id: uuid.UUID):
"""
sync knowledge document and Document parsing, vectorization, and storage
"""
db = next(get_db()) # Manually call the generator
db_knowledge = None
try:
db_knowledge = db.query(Knowledge).filter(Knowledge.id == kb_id).first()
# 1. get vector_service
vector_service = ElasticSearchVectorFactory().init_vector(knowledge=db_knowledge)
# 2. sync data
match db_knowledge.type:
case "Web": # Crawl webpages in batches through a web crawler
entry_url = db_knowledge.parser_config.get("entry_url", "")
max_pages = db_knowledge.parser_config.get("max_pages", 20)
delay_seconds = db_knowledge.parser_config.get("delay_seconds", 1.0)
timeout_seconds = db_knowledge.parser_config.get("timeout_seconds", 10)
user_agent = db_knowledge.parser_config.get("user_agent", "KnowledgeBaseCrawler/1.0")
# Create crawler
crawler = WebCrawler(
entry_url=entry_url,
max_pages=max_pages,
delay_seconds=delay_seconds,
timeout_seconds=timeout_seconds,
user_agent=user_agent
)
try:
# 初始化存储已爬取 URLs 的集合
file_urls = set()
# crawl entry_url by yield
for crawled_document in crawler.crawl():
file_urls.add(crawled_document.url)
db_file = db.query(File).filter(File.kb_id == db_knowledge.id,
File.file_url == crawled_document.url).first()
if db_file:
if db_file.file_size == crawled_document.content_length: # same
continue
else: # --update
if crawled_document.content_length:
# 1. update file
db_file.file_name = f"{crawled_document.title}.txt"
db_file.file_ext=".txt"
db_file.file_size=crawled_document.content_length
db.commit()
db.refresh(db_file)
# Construct a save path/files/{kb_id}/{parent_id}/{file.id}{file_extension}
save_dir = os.path.join(settings.FILE_PATH, str(db_knowledge.id), str(db_knowledge.parent_id))
Path(save_dir).mkdir(parents=True, exist_ok=True) # Ensure that the directory exists
save_path = os.path.join(save_dir, f"{db_file.id}{db_file.file_ext}")
# update file
if os.path.exists(save_path):
os.remove(save_path) # Delete a single file
content_bytes = crawled_document.content.encode('utf-8')
with open(save_path, "wb") as f:
f.write(content_bytes)
# 2. update a document
db_document = db.query(Document).filter(Document.kb_id == db_knowledge.id,
Document.file_id == db_file.id).first()
if db_document:
db_document.file_name = db_file.file_name
db_document.file_ext = db_file.file_ext
db_document.file_size = db_file.file_size
db_document.updated_at = datetime.now()
db.commit()
db.refresh(db_document)
# 3. Document parsing, vectorization, and storage
parse_document(file_path=save_path, document_id=db_document.id)
else: # --add
if crawled_document.content_length:
# 1. upload file
upload_file = file_schema.FileCreate(
kb_id=db_knowledge.id,
created_by=db_knowledge.created_by,
parent_id=db_knowledge.id,
file_name=f"{crawled_document.title}.txt",
file_ext=".txt",
file_size=crawled_document.content_length,
file_url=crawled_document.url,
)
db_file = File(**upload_file.model_dump())
db.add(db_file)
db.commit()
# Construct a save path/files/{kb_id}/{parent_id}/{file.id}{file_extension}
save_dir = os.path.join(settings.FILE_PATH, str(db_knowledge.id), str(db_knowledge.id))
Path(save_dir).mkdir(parents=True, exist_ok=True) # Ensure that the directory exists
save_path = os.path.join(save_dir, f"{db_file.id}{db_file.file_ext}")
# Save file
content_bytes = crawled_document.content.encode('utf-8')
with open(save_path, "wb") as f:
f.write(content_bytes)
# 2. Create a document
create_document_data = document_schema.DocumentCreate(
kb_id=db_knowledge.id,
created_by=db_knowledge.created_by,
file_id=db_file.id,
file_name=db_file.file_name,
file_ext=db_file.file_ext,
file_size=db_file.file_size,
file_meta={},
parser_id="naive",
parser_config={
"layout_recognize": "DeepDOC",
"chunk_token_num": 128,
"delimiter": "\n",
"auto_keywords": 0,
"auto_questions": 0,
"html4excel": "false"
}
)
db_document = Document(**create_document_data.model_dump())
db.add(db_document)
db.commit()
# 3. Document parsing, vectorization, and storage
parse_document(file_path=save_path, document_id=db_document.id)
db_files = db.query(File).filter(File.kb_id == db_knowledge.id, File.file_url.notin_(file_urls)).all()
if db_files: # --delete
for db_file in db_files:
db_document = db.query(Document).filter(Document.kb_id == db_knowledge.id,
Document.file_id == db_file.id).first()
if db_document:
# 1. Delete vector index
vector_service.delete_by_metadata_field(key="document_id", value=str(db_document.id))
# 2. Delete document
db.delete(db_document)
# 3. Delete file
file_path = Path(
settings.FILE_PATH,
str(db_file.kb_id),
str(db_file.parent_id),
f"{db_file.id}{db_file.file_ext}"
)
if file_path.exists():
file_path.unlink() # Delete a single file
db.delete(db_file)
# commit transaction
db.commit()
except Exception as e:
print(f"\n\nError during crawl: {e}")
case "Third-party": # Integration of knowledge bases from three parties
yuque_user_id = db_knowledge.parser_config.get("yuque_user_id", "")
feishu_app_id = db_knowledge.parser_config.get("feishu_app_id", "")
if yuque_user_id: # Yuque Knowledge Base
yuque_token = db_knowledge.parser_config.get("yuque_token", "")
# Create yuqueAPIClient
api_client = YuqueAPIClient(
user_id=yuque_user_id,
token=yuque_token
)
try:
# 初始化存储获取语雀 URLs 的集合
file_urls = set()
# Get all files from all repos
async def async_get_files(api_client: YuqueAPIClient):
async with api_client as client:
print("\n=== Fetching repositories ===")
repos = await client.get_user_repos()
print(f"Found {len(repos)} repositories:")
all_files = []
for repo in repos:
# Get documents from repository
print(f"\n=== Fetching documents from '{repo.name}' ===")
docs = await client.get_repo_docs(repo.id)
all_files.extend(docs)
return all_files
files = asyncio.run(async_get_files(api_client))
for doc in files:
file_urls.add(doc.slug)
db_file = db.query(File).filter(File.kb_id == db_knowledge.id,
File.file_url == doc.slug).first()
if db_file:
if db_file.created_at == doc.updated_at: # same
continue
else: # --update
# 1. update file
# Construct a save path/files/{kb_id}/{parent_id}/{file.id}{file_extension}
save_dir = os.path.join(settings.FILE_PATH, str(db_knowledge.id), str(db_knowledge.parent_id))
Path(save_dir).mkdir(parents=True, exist_ok=True) # Ensure that the directory exists
# download document from Feishu FileInfo
async def async_download_document(api_client: YuqueAPIClient, doc: YuqueDocInfo, save_dir: str):
async with api_client as client:
file_path = await client.download_document(doc, save_dir)
return file_path
file_path = asyncio.run(async_download_document(api_client, doc, save_dir))
save_path = os.path.join(save_dir, f"{db_file.id}{db_file.file_ext}")
# update file
if os.path.exists(save_path):
os.remove(save_path) # Delete a single file
shutil.copyfile(file_path, save_path)
# update db_file
file_name = os.path.basename(file_path)
_, file_extension = os.path.splitext(file_name)
file_size = os.path.getsize(file_path)
db_file.file_name = file_name
db_file.file_ext = file_extension.lower()
db_file.file_size = file_size
db_file.created_at = doc.updated_at
db.commit()
db.refresh(db_file)
# 2. update a document
db_document = db.query(Document).filter(Document.kb_id == db_knowledge.id,
Document.file_id == db_file.id).first()
if db_document:
db_document.file_name = db_file.file_name
db_document.file_ext = db_file.file_ext
db_document.file_size = db_file.file_size
db_document.created_at = db_file.created_at
db_document.updated_at = datetime.now()
db.commit()
db.refresh(db_document)
# 3. Document parsing, vectorization, and storage
parse_document(file_path=save_path, document_id=db_document.id)
else: # --add
# 1. update file
# Construct a save path/files/{kb_id}/{parent_id}/{file.id}{file_extension}
save_dir = os.path.join(settings.FILE_PATH, str(db_knowledge.id), str(db_knowledge.parent_id))
Path(save_dir).mkdir(parents=True, exist_ok=True) # Ensure that the directory exists
# download document from Feishu FileInfo
async def async_download_document(api_client: YuqueAPIClient, doc: YuqueDocInfo, save_dir: str):
async with api_client as client:
file_path = await client.download_document(doc, save_dir)
return file_path
file_path = asyncio.run(async_download_document(api_client, doc, save_dir))
# add db_file
file_name = os.path.basename(file_path)
_, file_extension = os.path.splitext(file_name)
file_size = os.path.getsize(file_path)
upload_file = file_schema.FileCreate(
kb_id=db_knowledge.id,
created_by=db_knowledge.created_by,
parent_id=db_knowledge.id,
file_name=file_name,
file_ext=file_extension.lower(),
file_size=file_size,
file_url=doc.slug,
created_at=doc.updated_at
)
db_file = File(**upload_file.model_dump())
db.add(db_file)
db.commit()
# Save file
save_path = os.path.join(save_dir, f"{db_file.id}{db_file.file_ext}")
# update file
if os.path.exists(save_path):
os.remove(save_path) # Delete a single file
shutil.copyfile(file_path, save_path)
# 2. Create a document
create_document_data = document_schema.DocumentCreate(
kb_id=db_knowledge.id,
created_by=db_knowledge.created_by,
file_id=db_file.id,
file_name=db_file.file_name,
file_ext=db_file.file_ext,
file_size=db_file.file_size,
file_meta={},
parser_id="naive",
parser_config={
"layout_recognize": "DeepDOC",
"chunk_token_num": 128,
"delimiter": "\n",
"auto_keywords": 0,
"auto_questions": 0,
"html4excel": "false"
}
)
db_document = Document(**create_document_data.model_dump())
db.add(db_document)
db.commit()
# 3. Document parsing, vectorization, and storage
parse_document(file_path=save_path, document_id=db_document.id)
db_files = db.query(File).filter(File.kb_id == db_knowledge.id,
File.file_url.notin_(file_urls)).all()
if db_files: # --delete
for db_file in db_files:
db_document = db.query(Document).filter(Document.kb_id == db_knowledge.id,
Document.file_id == db_file.id).first()
if db_document:
# 1. Delete vector index
vector_service.delete_by_metadata_field(key="document_id",
value=str(db_document.id))
# 2. Delete document
db.delete(db_document)
# 3. Delete file
file_path = Path(
settings.FILE_PATH,
str(db_file.kb_id),
str(db_file.parent_id),
f"{db_file.id}{db_file.file_ext}"
)
if file_path.exists():
file_path.unlink() # Delete a single file
db.delete(db_file)
# commit transaction
db.commit()
except Exception as e:
print(f"\n\nError during fetch feishu: {e}")
if feishu_app_id: # Feishu Knowledge Base
feishu_app_secret = db_knowledge.parser_config.get("feishu_app_secret", "")
feishu_folder_token = db_knowledge.parser_config.get("feishu_folder_token", "")
# Create feishuAPIClient
api_client = FeishuAPIClient(
app_id=feishu_app_id,
app_secret=feishu_app_secret
)
try:
# 初始化存储获取飞书 URLs 的集合
file_urls = set()
# Get all files from folder
async def async_get_files(api_client: FeishuAPIClient, feishu_folder_token: str):
async with api_client as client:
files = await client.list_all_folder_files(feishu_folder_token, recursive=True)
return files
files = asyncio.run(async_get_files(api_client, feishu_folder_token))
# Filter out folders, only sync documents
documents = [f for f in files if f.type in ["doc", "docx", "sheet", "bitable", "file"]]
for doc in documents:
file_urls.add(doc.url)
db_file = db.query(File).filter(File.kb_id == db_knowledge.id,
File.file_url == doc.url).first()
if db_file:
if db_file.created_at == doc.modified_time: # same
continue
else: # --update
# 1. update file
# Construct a save path/files/{kb_id}/{parent_id}/{file.id}{file_extension}
save_dir = os.path.join(settings.FILE_PATH, str(db_knowledge.id),
str(db_knowledge.parent_id))
Path(save_dir).mkdir(parents=True, exist_ok=True) # Ensure that the directory exists
# download document from Feishu FileInfo
async def async_download_document(api_client: FeishuAPIClient, doc: FileInfo, save_dir: str):
async with api_client as client:
file_path = await client.download_document(document=doc, save_dir=save_dir)
return file_path
file_path = asyncio.run(async_download_document(api_client, doc, save_dir))
save_path = os.path.join(save_dir, f"{db_file.id}{db_file.file_ext}")
# update file
if os.path.exists(save_path):
os.remove(save_path) # Delete a single file
shutil.copyfile(file_path, save_path)
# update db_file
file_name = os.path.basename(file_path)
_, file_extension = os.path.splitext(file_name)
file_size = os.path.getsize(file_path)
db_file.file_name = file_name
db_file.file_ext = file_extension.lower()
db_file.file_size = file_size
db_file.created_at = doc.modified_time
db.commit()
db.refresh(db_file)
# 2. update a document
db_document = db.query(Document).filter(Document.kb_id == db_knowledge.id,
Document.file_id == db_file.id).first()
if db_document:
db_document.file_name = db_file.file_name
db_document.file_ext = db_file.file_ext
db_document.file_size = db_file.file_size
db_document.created_at = db_file.created_at
db_document.updated_at = datetime.now()
db.commit()
db.refresh(db_document)
# 3. Document parsing, vectorization, and storage
parse_document(file_path=save_path, document_id=db_document.id)
else: # --add
# 1. update file
# Construct a save path/files/{kb_id}/{parent_id}/{file.id}{file_extension}
save_dir = os.path.join(settings.FILE_PATH, str(db_knowledge.id),
str(db_knowledge.parent_id))
Path(save_dir).mkdir(parents=True, exist_ok=True) # Ensure that the directory exists
# download document from Feishu FileInfo
async def async_download_document(api_client: FeishuAPIClient, doc: FileInfo, save_dir: str):
async with api_client as client:
file_path = await client.download_document(document=doc, save_dir=save_dir)
return file_path
file_path = asyncio.run(async_download_document(api_client, doc, save_dir))
# add db_file
file_name = os.path.basename(file_path)
_, file_extension = os.path.splitext(file_name)
file_size = os.path.getsize(file_path)
upload_file = file_schema.FileCreate(
kb_id=db_knowledge.id,
created_by=db_knowledge.created_by,
parent_id=db_knowledge.id,
file_name=file_name,
file_ext=file_extension.lower(),
file_size=file_size,
file_url=doc.url,
created_at = doc.modified_time
)
db_file = File(**upload_file.model_dump())
db.add(db_file)
db.commit()
# Save file
save_path = os.path.join(save_dir, f"{db_file.id}{db_file.file_ext}")
# update file
if os.path.exists(save_path):
os.remove(save_path) # Delete a single file
shutil.copyfile(file_path, save_path)
# 2. Create a document
create_document_data = document_schema.DocumentCreate(
kb_id=db_knowledge.id,
created_by=db_knowledge.created_by,
file_id=db_file.id,
file_name=db_file.file_name,
file_ext=db_file.file_ext,
file_size=db_file.file_size,
file_meta={},
parser_id="naive",
parser_config={
"layout_recognize": "DeepDOC",
"chunk_token_num": 128,
"delimiter": "\n",
"auto_keywords": 0,
"auto_questions": 0,
"html4excel": "false"
}
)
db_document = Document(**create_document_data.model_dump())
db.add(db_document)
db.commit()
# 3. Document parsing, vectorization, and storage
parse_document(file_path=save_path, document_id=db_document.id)
db_files = db.query(File).filter(File.kb_id == db_knowledge.id,
File.file_url.notin_(file_urls)).all()
if db_files: # --delete
for db_file in db_files:
db_document = db.query(Document).filter(Document.kb_id == db_knowledge.id,
Document.file_id == db_file.id).first()
if db_document:
# 1. Delete vector index
vector_service.delete_by_metadata_field(key="document_id",
value=str(db_document.id))
# 2. Delete document
db.delete(db_document)
# 3. Delete file
file_path = Path(
settings.FILE_PATH,
str(db_file.kb_id),
str(db_file.parent_id),
f"{db_file.id}{db_file.file_ext}"
)
if file_path.exists():
file_path.unlink() # Delete a single file
db.delete(db_file)
# commit transaction
db.commit()
except Exception as e:
print(f"\n\nError during fetch feishu: {e}")
case _: # General
print(f"General: No synchronization needed\n")
result = f"sync knowledge '{db_knowledge.name}' processed successfully."
return result
except Exception as e:
if 'db_knowledge' in locals():
print(f"Failed to sync knowledge:{str(e)}\n")
result = f"sync knowledge '{db_knowledge.name}' failed."
return result
finally:
db.close()
@celery_app.task(name="app.core.memory.agent.read_message", bind=True)
def read_message_task(self, end_user_id: str, message: str, history: List[Dict[str, Any]], search_switch: str, config_id: str, storage_type:str, user_rag_memory_id:str) -> Dict[str, Any]: