diff --git a/api/app/controllers/chunk_controller.py b/api/app/controllers/chunk_controller.py index e1fdaa89..fee26669 100644 --- a/api/app/controllers/chunk_controller.py +++ b/api/app/controllers/chunk_controller.py @@ -271,6 +271,9 @@ async def create_chunk( "sort_id": sort_id, "status": 1, } + # QA chunk: 注入 chunk_type/question/answer 到 metadata + if create_data.is_qa: + metadata.update(create_data.qa_metadata) chunk = DocumentChunk(page_content=content, metadata=metadata) # 3. Segmented vector storage vector_service.add_chunks([chunk]) @@ -342,6 +345,9 @@ async def update_chunk( if total: chunk = items[0] chunk.page_content = content + # QA chunk: 更新 metadata 中的 question/answer + if update_data.is_qa: + chunk.metadata.update(update_data.qa_metadata) vector_service.update_by_segment(chunk) return success(data=jsonable_encoder(chunk), msg="The document chunk has been successfully updated") else: diff --git a/api/app/core/rag/graphrag/general/index.py b/api/app/core/rag/graphrag/general/index.py index 1bd826ca..1f1ee756 100644 --- a/api/app/core/rag/graphrag/general/index.py +++ b/api/app/core/rag/graphrag/general/index.py @@ -46,7 +46,10 @@ async def run_graphrag( start = trio.current_time() workspace_id, kb_id, document_id = row["workspace_id"], str(row["kb_id"]), row["document_id"] chunks = [] - for d in settings.retriever.chunk_list(document_id, workspace_id, [kb_id], fields=["page_content", "document_id"], sort_by_position=True): + for d in settings.retriever.chunk_list(document_id, workspace_id, [kb_id], fields=["page_content", "document_id", "chunk_type"], sort_by_position=True): + # 跳过 QA chunks,只用原文 chunks 构建图谱 + if d.get("chunk_type") == "qa": + continue chunks.append(d["page_content"]) with trio.fail_after(max(120, len(chunks) * 60 * 10) if enable_timeout_assertion else 10000000000): @@ -150,6 +153,9 @@ async def run_graphrag_for_kb( total, items = vector_service.search_by_segment(document_id=str(document_id), query=None, pagesize=9999, page=1, 
asc=True) for doc in items: + # 跳过 QA chunks,只用原文 chunks 构建图谱 + if (doc.metadata or {}).get("chunk_type") == "qa": + continue content = doc.page_content if num_tokens_from_string(current_chunk + content) < 1024: current_chunk += content diff --git a/api/app/core/rag/prompts/generator.py b/api/app/core/rag/prompts/generator.py index 642d0849..88e9171f 100644 --- a/api/app/core/rag/prompts/generator.py +++ b/api/app/core/rag/prompts/generator.py @@ -131,18 +131,43 @@ def keyword_extraction(chat_mdl, content, topn=3): def question_proposal(chat_mdl, content, topn=3): + """生成问题(向后兼容,返回纯文本问题列表)""" + pairs = qa_proposal(chat_mdl, content, topn) + if not pairs: + return "" + return "\n".join([p["question"] for p in pairs]) + + +def qa_proposal(chat_mdl, content, topn=3): + """生成 QA 对,返回 [{"question": ..., "answer": ...}, ...]""" template = PROMPT_JINJA_ENV.from_string(QUESTION_PROMPT_TEMPLATE) rendered_prompt = template.render(content=content, topn=topn) msg = [{"role": "system", "content": rendered_prompt}, {"role": "user", "content": "Output: "}] _, msg = message_fit_in(msg, getattr(chat_mdl, 'max_length', 8096)) - kwd = chat_mdl.chat(rendered_prompt, msg[1:], {"temperature": 0.2}) - if isinstance(kwd, tuple): - kwd = kwd[0] - kwd = re.sub(r"^.*", "", kwd, flags=re.DOTALL) - if kwd.find("**ERROR**") >= 0: - return "" - return kwd + raw = chat_mdl.chat(rendered_prompt, msg[1:], {"temperature": 0.2}) + if isinstance(raw, tuple): + raw = raw[0] + raw = re.sub(r"^.*", "", raw, flags=re.DOTALL) + if raw.find("**ERROR**") >= 0: + return [] + return parse_qa_pairs(raw) + + +def parse_qa_pairs(text: str) -> list: + """解析 LLM 返回的 QA 对文本,格式: Q: xxx A: xxx""" + pairs = [] + for line in text.strip().split("\n"): + line = line.strip() + if not line: + continue + # 匹配 Q: ... A: ... 
格式 + match = re.match(r'^Q:\s*(.+?)\s+A:\s*(.+)$', line, re.IGNORECASE) + if match: + q, a = match.group(1).strip(), match.group(2).strip() + if q and a: + pairs.append({"question": q, "answer": a}) + return pairs def graph_entity_types(chat_mdl, scenario): diff --git a/api/app/core/rag/prompts/question_prompt.md b/api/app/core/rag/prompts/question_prompt.md index ec9889fb..91e43d05 100644 --- a/api/app/core/rag/prompts/question_prompt.md +++ b/api/app/core/rag/prompts/question_prompt.md @@ -1,19 +1,24 @@ ## Role -You are a text analyzer. +You are a text analyzer and knowledge extraction expert. ## Task -Propose {{ topn }} questions about a given piece of text content. +Generate {{ topn }} question-answer pairs from the given text content. ## Requirements -- Understand and summarize the text content, and propose the top {{ topn }} important questions. +- Understand and summarize the text content, and generate the top {{ topn }} important question-answer pairs. +- Each question-answer pair MUST be on a single line, formatted as: "Q: " followed by the question, then " A: " followed by the answer (see Example Output). - The questions SHOULD NOT have overlapping meanings. - The questions SHOULD cover the main content of the text as much as possible. -- The questions MUST be in the same language as the given piece of text content. -- One question per line. -- Output questions ONLY. +- The answers MUST be concise, accurate, and directly derived from the text content. +- The answers SHOULD be self-contained and understandable without additional context. +- Both questions and answers MUST be in the same language as the given text content. +- Output question-answer pairs ONLY, no extra explanation. + +## Example Output +Q: What is the capital of France? A: The capital of France is Paris. +Q: When was the Eiffel Tower built? A: The Eiffel Tower was built in 1889. 
--- ## Text Content {{ content }} - diff --git a/api/app/core/rag/vdb/elasticsearch/elasticsearch_vector.py b/api/app/core/rag/vdb/elasticsearch/elasticsearch_vector.py index cc9ec120..3f64ad85 100644 --- a/api/app/core/rag/vdb/elasticsearch/elasticsearch_vector.py +++ b/api/app/core/rag/vdb/elasticsearch/elasticsearch_vector.py @@ -53,13 +53,30 @@ class ElasticSearchVector(BaseVector): return "elasticsearch" def add_chunks(self, chunks: list[DocumentChunk], **kwargs): - # 实现 Elasticsearch 保存向量 - texts = [chunk.page_content for chunk in chunks] + # QA chunks: embedding 只对 question 字段做;source chunks: 不做 embedding + texts_for_embedding = [] + for chunk in chunks: + chunk_type = (chunk.metadata or {}).get("chunk_type", "chunk") + if chunk_type == "source": + # source chunk 不需要向量索引 + texts_for_embedding.append("") + elif chunk_type == "qa": + # QA chunk: 用 question 字段做 embedding + texts_for_embedding.append((chunk.metadata or {}).get("question", chunk.page_content)) + else: + # 普通 chunk: 用 page_content 做 embedding + texts_for_embedding.append(chunk.page_content) + if self.is_multimodal_embedding: - # 火山引擎多模态 Embedding - embeddings = self.embeddings.embed_batch(texts) + embeddings = self.embeddings.embed_batch(texts_for_embedding) else: - embeddings = self.embeddings.embed_documents(list(texts)) + embeddings = self.embeddings.embed_documents(texts_for_embedding) + + # source chunk 的向量置空 + for i, chunk in enumerate(chunks): + if (chunk.metadata or {}).get("chunk_type") == "source": + embeddings[i] = None + self.create(chunks, embeddings, **kwargs) def create(self, chunks: list[DocumentChunk], embeddings: list[list[float]], **kwargs): @@ -72,13 +89,25 @@ class ElasticSearchVector(BaseVector): uuids = self._get_uuids(chunks) actions = [] for i, chunk in enumerate(chunks): + source = { + Field.CONTENT_KEY.value: chunk.page_content, + Field.METADATA_KEY.value: chunk.metadata or {}, + Field.VECTOR.value: embeddings[i] or None + } + # 写入 QA 相关字段 + meta = chunk.metadata or {} + 
if meta.get("chunk_type"): + source[Field.CHUNK_TYPE.value] = meta["chunk_type"] + if meta.get("question"): + source[Field.QUESTION.value] = meta["question"] + if meta.get("answer"): + source[Field.ANSWER.value] = meta["answer"] + if meta.get("source_chunk_id"): + source[Field.SOURCE_CHUNK_ID.value] = meta["source_chunk_id"] + action = { "_index": self._collection_name, - "_source": { - Field.CONTENT_KEY.value: chunk.page_content, - Field.METADATA_KEY.value: chunk.metadata or {}, - Field.VECTOR.value: embeddings[i] or None - } + "_source": source } actions.append(action) # using bulk mode @@ -241,10 +270,19 @@ class ElasticSearchVector(BaseVector): for res in result["hits"]["hits"]: source = res["_source"] page_content = source.get(Field.CONTENT_KEY.value) - # vector = source.get(Field.VECTOR.value) vector = None metadata = source.get(Field.METADATA_KEY.value, {}) + chunk_type = source.get(Field.CHUNK_TYPE.value) score = res["_score"] + + # 将 QA 字段注入 metadata 供前端展示 + if chunk_type: + metadata["chunk_type"] = chunk_type + if chunk_type == "qa": + metadata["question"] = source.get(Field.QUESTION.value, "") + metadata["answer"] = source.get(Field.ANSWER.value, "") + page_content = f"Q: {metadata['question']}\nA: {metadata['answer']}" + docs_and_scores.append((DocumentChunk(page_content=page_content, vector=vector, metadata=metadata), score)) docs = [] @@ -308,27 +346,43 @@ class ElasticSearchVector(BaseVector): Returns: updated count. 
""" - indices = kwargs.get("indices", self._collection_name) # Default single index, multi-index available,etc "index1,index2,index3" - if self.is_multimodal_embedding: - # 火山引擎多模态 Embedding - chunk.vector = self.embeddings.embed_text(chunk.page_content) + indices = kwargs.get("indices", self._collection_name) + chunk_type = (chunk.metadata or {}).get("chunk_type") + + # QA chunk: embedding 基于 question;source chunk: 不更新向量 + if chunk_type == "source": + embed_text = "" + elif chunk_type == "qa": + embed_text = (chunk.metadata or {}).get("question", chunk.page_content) else: - chunk.vector = self.embeddings.embed_query(chunk.page_content) + embed_text = chunk.page_content + + if chunk_type != "source": + if self.is_multimodal_embedding: + chunk.vector = self.embeddings.embed_text(embed_text) + else: + chunk.vector = self.embeddings.embed_query(embed_text) + + script_source = "ctx._source.page_content = params.new_content; ctx._source.vector = params.new_vector;" + params = { + "new_content": chunk.page_content, + "new_vector": chunk.vector if chunk_type != "source" else None + } + + # QA chunk: 同时更新 question/answer 字段 + if chunk_type == "qa": + script_source += " ctx._source.question = params.new_question; ctx._source.answer = params.new_answer;" + params["new_question"] = (chunk.metadata or {}).get("question", "") + params["new_answer"] = (chunk.metadata or {}).get("answer", "") body = { "script": { - "source": """ - ctx._source.page_content = params.new_content; - ctx._source.vector = params.new_vector; - """, - "params": { - "new_content": chunk.page_content, - "new_vector": chunk.vector - } + "source": script_source, + "params": params }, "query": { "term": { - Field.DOC_ID.value: chunk.metadata["doc_id"] # exact match doc_id + Field.DOC_ID.value: chunk.metadata["doc_id"] } } } @@ -336,9 +390,6 @@ class ElasticSearchVector(BaseVector): index=indices, body=body, ) - # Remove debug printing and use logging instead - # print(result) - # print(f"Update successful, 
number of affected documents: {result['updated']}") return result['updated'] def change_status_by_document_id(self, document_id: str, status: int, **kwargs) -> str: @@ -397,11 +448,11 @@ class ElasticSearchVector(BaseVector): } } }, - "filter": { # Add the filter condition of status=1 - "term": { - "metadata.status": 1 - } - } + "filter": [ + {"term": {"metadata.status": 1}}, + # 排除 source chunk(仅供 GraphRAG 使用,不参与检索) + {"bool": {"must_not": {"term": {Field.CHUNK_TYPE.value: "source"}}}} + ] } } # If file_names_filter is passed in, merge the filtering conditions @@ -415,22 +466,14 @@ class ElasticSearchVector(BaseVector): }, "script": { "source": f"cosineSimilarity(params.query_vector, '{Field.VECTOR.value}') + 1.0", - # The script_score query calculates the cosine similarity between the embedding field of each document and the query vector. The addition of +1.0 is to ensure that the scores returned by the script are non-negative, as the range of cosine similarity is [-1, 1] "params": {"query_vector": query_vector} } } }, "filter": [ - { - "term": { - "metadata.status": 1 - } - }, - { - "terms": { - "metadata.file_name": file_names_filter # Additional file_name filtering - } - } + {"term": {"metadata.status": 1}}, + {"terms": {"metadata.file_name": file_names_filter}}, + {"bool": {"must_not": {"term": {Field.CHUNK_TYPE.value: "source"}}}} ], } } @@ -451,8 +494,19 @@ class ElasticSearchVector(BaseVector): source = res["_source"] page_content = source.get(Field.CONTENT_KEY.value) metadata = source.get(Field.METADATA_KEY.value, {}) + chunk_type = source.get(Field.CHUNK_TYPE.value) score = res["_score"] score = score / 2 # Normalized [0-1] + + # QA chunk: 返回 Q+A 拼接作为上下文 + if chunk_type == "qa": + question = source.get(Field.QUESTION.value, "") + answer = source.get(Field.ANSWER.value, "") + page_content = f"Q: {question}\nA: {answer}" + metadata["chunk_type"] = "qa" + metadata["question"] = question + metadata["answer"] = answer + 
docs_and_scores.append((DocumentChunk(page_content=page_content, metadata=metadata), score)) docs = [] @@ -491,11 +545,10 @@ class ElasticSearchVector(BaseVector): } } }, - "filter": { # Add the filter condition of status=1 - "term": { - "metadata.status": 1 - } - } + "filter": [ + {"term": {"metadata.status": 1}}, + {"bool": {"must_not": {"term": {Field.CHUNK_TYPE.value: "source"}}}} + ] } } @@ -512,16 +565,9 @@ class ElasticSearchVector(BaseVector): } }, "filter": [ - { - "term": { - "metadata.status": 1 - } - }, - { - "terms": { - "metadata.file_name": file_names_filter # Additional file_name filtering - } - } + {"term": {"metadata.status": 1}}, + {"terms": {"metadata.file_name": file_names_filter}}, + {"bool": {"must_not": {"term": {Field.CHUNK_TYPE.value: "source"}}}} ], } } @@ -543,6 +589,17 @@ class ElasticSearchVector(BaseVector): source = res["_source"] page_content = source.get(Field.CONTENT_KEY.value) metadata = source.get(Field.METADATA_KEY.value, {}) + chunk_type = source.get(Field.CHUNK_TYPE.value) + + # QA chunk: 返回 Q+A 拼接作为上下文 + if chunk_type == "qa": + question = source.get(Field.QUESTION.value, "") + answer = source.get(Field.ANSWER.value, "") + page_content = f"Q: {question}\nA: {answer}" + metadata["chunk_type"] = "qa" + metadata["question"] = question + metadata["answer"] = answer + # Normalize the score to the [0,1] interval normalized_score = res["_score"] / max_score docs_and_scores.append((DocumentChunk(page_content=page_content, metadata=metadata), normalized_score)) diff --git a/api/app/core/rag/vdb/field.py b/api/app/core/rag/vdb/field.py index 99d872c2..5f20a21a 100644 --- a/api/app/core/rag/vdb/field.py +++ b/api/app/core/rag/vdb/field.py @@ -14,3 +14,8 @@ class Field(StrEnum): DOCUMENT_ID = "metadata.document_id" KNOWLEDGE_ID = "metadata.knowledge_id" SORT_ID = "metadata.sort_id" + # QA fields + CHUNK_TYPE = "chunk_type" # "chunk" | "source" | "qa" + QUESTION = "question" + ANSWER = "answer" + SOURCE_CHUNK_ID = "source_chunk_id" diff 
--git a/api/app/schemas/chunk_schema.py b/api/app/schemas/chunk_schema.py index ce8f70f2..b888c361 100644 --- a/api/app/schemas/chunk_schema.py +++ b/api/app/schemas/chunk_schema.py @@ -20,13 +20,26 @@ class ChunkCreate(BaseModel): @property def chunk_content(self) -> str: - """ - Get the actual content string regardless of input type - """ + """Get the actual content string regardless of input type""" if isinstance(self.content, QAChunk): - return f"question: {self.content.question} answer: {self.content.answer}" + return self.content.question # QA 模式下 page_content 存 question return self.content + @property + def is_qa(self) -> bool: + return isinstance(self.content, QAChunk) + + @property + def qa_metadata(self) -> dict: + """返回 QA 相关的 metadata 字段""" + if isinstance(self.content, QAChunk): + return { + "chunk_type": "qa", + "question": self.content.question, + "answer": self.content.answer, + } + return {} + class ChunkUpdate(BaseModel): content: Union[str, QAChunk] = Field( @@ -35,13 +48,26 @@ class ChunkUpdate(BaseModel): @property def chunk_content(self) -> str: - """ - Get the actual content string regardless of input type - """ + """Get the actual content string regardless of input type""" if isinstance(self.content, QAChunk): - return f"question: {self.content.question} answer: {self.content.answer}" + return self.content.question # QA 模式下 page_content 存 question return self.content + @property + def is_qa(self) -> bool: + return isinstance(self.content, QAChunk) + + @property + def qa_metadata(self) -> dict: + """返回 QA 相关的 metadata 字段""" + if isinstance(self.content, QAChunk): + return { + "chunk_type": "qa", + "question": self.content.question, + "answer": self.content.answer, + } + return {} + class ChunkRetrieve(BaseModel): query: str diff --git a/api/app/tasks.py b/api/app/tasks.py index 3ad1a0dd..ed961115 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -30,7 +30,7 @@ from app.core.rag.llm.cv_model import QWenCV from 
app.core.rag.llm.embedding_model import OpenAIEmbed from app.core.rag.llm.sequence2txt_model import QWenSeq2txt from app.core.rag.models.chunk import DocumentChunk -from app.core.rag.prompts.generator import question_proposal +from app.core.rag.prompts.generator import question_proposal, qa_proposal from app.core.rag.vdb.elasticsearch.elasticsearch_vector import ( ElasticSearchVectorFactory, ) @@ -323,57 +323,96 @@ def parse_document(file_key: str, document_id: uuid.UUID, file_name: str = ""): all_batch_chunks: list[list[DocumentChunk]] = [] if auto_questions_topn: - # auto_questions 开启:先并发生成所有 chunk 的问题,再按 batch 分组 - # 构建 (global_idx, item) 列表 + # QA 模式(FastGPT 方案): + # 1. 原 chunk 标记为 source(保留供 GraphRAG 使用,不参与检索) + # 2. LLM 生成 QA 对,每个 QA 对独立存储为 qa chunk indexed_items = list(enumerate(res)) - def _generate_question(idx_item: tuple[int, dict]) -> tuple[int, str]: - """为单个 chunk 生成问题(带缓存),返回 (global_idx, question_text)""" + def _generate_qa(idx_item: tuple[int, dict]) -> tuple[int, list]: + """为单个 chunk 生成 QA 对(带缓存),返回 (global_idx, qa_pairs)""" global_idx, item = idx_item content = item["content_with_weight"] - cached = get_llm_cache(chat_model.model_name, content, "question", + cached = get_llm_cache(chat_model.model_name, content, "qa", {"topn": auto_questions_topn}) if not cached: - cached = question_proposal(chat_model, content, auto_questions_topn) - set_llm_cache(chat_model.model_name, content, cached, "question", + pairs = qa_proposal(chat_model, content, auto_questions_topn) + cached = pairs + set_llm_cache(chat_model.model_name, content, cached, "qa", {"topn": auto_questions_topn}) + elif isinstance(cached, str): + # 兼容旧缓存格式(纯文本问题) + from app.core.rag.prompts.generator import parse_qa_pairs + cached = parse_qa_pairs(cached) if cached else [] return global_idx, cached - # 并发调用 LLM 生成问题 - question_map: dict[int, str] = {} + # 并发调用 LLM 生成 QA 对 + qa_map: dict[int, list] = {} with ThreadPoolExecutor(max_workers=AUTO_QUESTIONS_MAX_WORKERS) as q_executor: - 
futures = {q_executor.submit(_generate_question, item): item[0] + futures = {q_executor.submit(_generate_qa, item): item[0] for item in indexed_items} for future in futures: - global_idx, cached = future.result() - question_map[global_idx] = cached + global_idx, pairs = future.result() + qa_map[global_idx] = pairs progress_lines.append( - f"{datetime.now().strftime('%H:%M:%S')} Auto questions generated for {total_chunks} chunks " + f"{datetime.now().strftime('%H:%M:%S')} QA pairs generated for {total_chunks} chunks " f"(workers={AUTO_QUESTIONS_MAX_WORKERS}).") - # 按 batch 分组组装 DocumentChunk - for batch_start in range(0, total_chunks, EMBEDDING_BATCH_SIZE): - batch_end = min(batch_start + EMBEDDING_BATCH_SIZE, total_chunks) - chunks = [] - for global_idx in range(batch_start, batch_end): - item = res[global_idx] - metadata = { + # 组装 chunks:source chunks + qa chunks + source_chunks = [] + qa_chunks = [] + qa_sort_id = 0 + + for global_idx in range(total_chunks): + item = res[global_idx] + source_chunk_id = uuid.uuid4().hex + + # source chunk:保留原文,供 GraphRAG 使用,不参与向量检索 + source_meta = { + "doc_id": source_chunk_id, + "file_id": str(db_document.file_id), + "file_name": db_document.file_name, + "file_created_at": int(db_document.created_at.timestamp() * 1000), + "document_id": str(db_document.id), + "knowledge_id": str(db_document.kb_id), + "sort_id": global_idx, + "status": 1, + "chunk_type": "source", + } + source_chunks.append( + DocumentChunk(page_content=item["content_with_weight"], metadata=source_meta)) + + # qa chunks:每个 QA 对独立存储 + pairs = qa_map.get(global_idx, []) + for pair in pairs: + qa_meta = { "doc_id": uuid.uuid4().hex, "file_id": str(db_document.file_id), "file_name": db_document.file_name, "file_created_at": int(db_document.created_at.timestamp() * 1000), "document_id": str(db_document.id), "knowledge_id": str(db_document.kb_id), - "sort_id": global_idx, + "sort_id": qa_sort_id, "status": 1, + "chunk_type": "qa", + "question": pair["question"], + 
"answer": pair["answer"], + "source_chunk_id": source_chunk_id, } - cached = question_map[global_idx] - chunks.append( - DocumentChunk( - page_content=f"question: {cached} answer: {item['content_with_weight']}", - metadata=metadata)) - all_batch_chunks.append(chunks) + # page_content 存 question,用于向量索引 + qa_chunks.append( + DocumentChunk(page_content=pair["question"], metadata=qa_meta)) + qa_sort_id += 1 + + # 按 batch 分组(source + qa 一起) + all_chunks = source_chunks + qa_chunks + for batch_start in range(0, len(all_chunks), EMBEDDING_BATCH_SIZE): + batch_end = min(batch_start + EMBEDDING_BATCH_SIZE, len(all_chunks)) + all_batch_chunks.append(all_chunks[batch_start:batch_end]) + + progress_lines.append( + f"{datetime.now().strftime('%H:%M:%S')} QA mode: {len(source_chunks)} source chunks + " + f"{len(qa_chunks)} QA chunks prepared.") else: # 无 auto_questions:直接构建 chunks for batch_start in range(0, total_chunks, EMBEDDING_BATCH_SIZE):