fix(db): fix database connection handling

This commit is contained in:
mengyonghao
2025-12-24 12:22:59 +08:00
parent 38220006a6
commit 0a8c1be084

View File

@@ -4,7 +4,7 @@ from typing import Any
from app.core.rag.vdb.elasticsearch.elasticsearch_vector import ElasticSearchVectorFactory from app.core.rag.vdb.elasticsearch.elasticsearch_vector import ElasticSearchVectorFactory
from app.core.workflow.nodes.base_node import BaseNode, WorkflowState from app.core.workflow.nodes.base_node import BaseNode, WorkflowState
from app.core.workflow.nodes.knowledge import KnowledgeRetrievalNodeConfig from app.core.workflow.nodes.knowledge import KnowledgeRetrievalNodeConfig
from app.db import get_db from app.db import get_db_context
from app.models import knowledge_model, knowledgeshare_model from app.models import knowledge_model, knowledgeshare_model
from app.repositories import knowledge_repository from app.repositories import knowledge_repository
from app.schemas.chunk_schema import RetrieveType from app.schemas.chunk_schema import RetrieveType
@@ -20,74 +20,74 @@ class KnowledgeRetrievalNode(BaseNode):
async def execute(self, state: WorkflowState) -> Any: async def execute(self, state: WorkflowState) -> Any:
query = self._render_template(self.typed_config.query, state) query = self._render_template(self.typed_config.query, state)
db = next(get_db()) with get_db_context():
filters = [
knowledge_model.Knowledge.id.in_(self.typed_config.kb_ids),
knowledge_model.Knowledge.permission_id == knowledge_model.PermissionType.Private,
knowledge_model.Knowledge.chunk_num > 0,
knowledge_model.Knowledge.status == 1
]
existing_ids = knowledge_repository.get_chunked_knowledgeids(
db=db,
filters=filters
)
filters = [
knowledge_model.Knowledge.id.in_(self.typed_config.kb_ids),
knowledge_model.Knowledge.permission_id == knowledge_model.PermissionType.Share,
knowledge_model.Knowledge.chunk_num > 0,
knowledge_model.Knowledge.status == 1
]
share_ids = knowledge_service.knowledge_repository.get_chunked_knowledgeids(
db=db,
filters=filters
)
if share_ids:
filters = [ filters = [
knowledgeshare_model.KnowledgeShare.target_kb_id.in_(self.typed_config.kb_ids) knowledge_model.Knowledge.id.in_(self.typed_config.kb_ids),
knowledge_model.Knowledge.permission_id == knowledge_model.PermissionType.Private,
knowledge_model.Knowledge.chunk_num > 0,
knowledge_model.Knowledge.status == 1
] ]
items = knowledgeshare_service.knowledgeshare_repository.get_source_kb_ids_by_target_kb_id( existing_ids = knowledge_repository.get_chunked_knowledgeids(
db=db, db=db,
filters=filters filters=filters
) )
existing_ids.extend(items) filters = [
knowledge_model.Knowledge.id.in_(self.typed_config.kb_ids),
knowledge_model.Knowledge.permission_id == knowledge_model.PermissionType.Share,
knowledge_model.Knowledge.chunk_num > 0,
knowledge_model.Knowledge.status == 1
]
share_ids = knowledge_service.knowledge_repository.get_chunked_knowledgeids(
db=db,
filters=filters
)
if share_ids:
filters = [
knowledgeshare_model.KnowledgeShare.target_kb_id.in_(self.typed_config.kb_ids)
]
items = knowledgeshare_service.knowledgeshare_repository.get_source_kb_ids_by_target_kb_id(
db=db,
filters=filters
)
existing_ids.extend(items)
if not existing_ids: if not existing_ids:
raise RuntimeError("Knowledge base retrieval failed: the knowledge base does not exist.") raise RuntimeError("Knowledge base retrieval failed: the knowledge base does not exist.")
kb_id = existing_ids[0] kb_id = existing_ids[0]
uuid_strs = [f"Vector_index_{kb_id}_Node".lower() for kb_id in existing_ids] uuid_strs = [f"Vector_index_{kb_id}_Node".lower() for kb_id in existing_ids]
indices = ",".join(uuid_strs) indices = ",".join(uuid_strs)
db_knowledge = knowledge_repository.get_knowledge_by_id(db=db, knowledge_id=kb_id) db_knowledge = knowledge_repository.get_knowledge_by_id(db=db, knowledge_id=kb_id)
if not db_knowledge: if not db_knowledge:
raise RuntimeError("The knowledge base does not exist or access is denied.") raise RuntimeError("The knowledge base does not exist or access is denied.")
vector_service = ElasticSearchVectorFactory().init_vector(knowledge=db_knowledge) vector_service = ElasticSearchVectorFactory().init_vector(knowledge=db_knowledge)
match self.typed_config.retrieve_type: match self.typed_config.retrieve_type:
case RetrieveType.PARTICIPLE: case RetrieveType.PARTICIPLE:
rs = vector_service.search_by_full_text(query=query, top_k=self.typed_config.top_k, rs = vector_service.search_by_full_text(query=query, top_k=self.typed_config.top_k,
indices=indices, indices=indices,
score_threshold=self.typed_config.similarity_threshold) score_threshold=self.typed_config.similarity_threshold)
return [chunk.model_dump() for chunk in rs] return [chunk.model_dump() for chunk in rs]
case RetrieveType.SEMANTIC: case RetrieveType.SEMANTIC:
rs = vector_service.search_by_vector(query=query, top_k=self.typed_config.top_k, rs = vector_service.search_by_vector(query=query, top_k=self.typed_config.top_k,
indices=indices,
score_threshold=self.typed_config.vector_similarity_weight)
return [chunk.model_dump() for chunk in rs]
case _:
rs1 = vector_service.search_by_vector(query=query, top_k=self.typed_config.top_k,
indices=indices,
score_threshold=self.typed_config.vector_similarity_weight)
rs2 = vector_service.search_by_full_text(query=query, top_k=self.typed_config.top_k,
indices=indices, indices=indices,
score_threshold=self.typed_config.similarity_threshold) score_threshold=self.typed_config.vector_similarity_weight)
# Efficient deduplication return [chunk.model_dump() for chunk in rs]
seen_ids = set() case _:
unique_rs = [] rs1 = vector_service.search_by_vector(query=query, top_k=self.typed_config.top_k,
for doc in rs1 + rs2: indices=indices,
if doc.metadata["doc_id"] not in seen_ids: score_threshold=self.typed_config.vector_similarity_weight)
seen_ids.add(doc.metadata["doc_id"]) rs2 = vector_service.search_by_full_text(query=query, top_k=self.typed_config.top_k,
unique_rs.append(doc) indices=indices,
rs = vector_service.rerank(query=query, docs=unique_rs, top_k=self.typed_config.top_k) score_threshold=self.typed_config.similarity_threshold)
return [chunk.model_dump() for chunk in rs] # Efficient deduplication
seen_ids = set()
unique_rs = []
for doc in rs1 + rs2:
if doc.metadata["doc_id"] not in seen_ids:
seen_ids.add(doc.metadata["doc_id"])
unique_rs.append(doc)
rs = vector_service.rerank(query=query, docs=unique_rs, top_k=self.typed_config.top_k)
return [chunk.model_dump() for chunk in rs]