From a96f20ee0542baadd5bd3a859b99eac89ac5ac08 Mon Sep 17 00:00:00 2001 From: Mark <348207283@qq.com> Date: Mon, 13 Apr 2026 10:40:58 +0800 Subject: [PATCH] [modify] parse document workflow, add graph queue hand build graph --- api/app/tasks.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/api/app/tasks.py b/api/app/tasks.py index c32b8ad9..98b413c3 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -55,7 +55,7 @@ VIDEO_IMAGE_PATTERN = re.compile( ) DEFAULT_PARSE_LANGUAGE = "Chinese" DEFAULT_PARSE_TO_PAGE = 100_000 -EMBEDDING_BATCH_SIZE = 100 +EMBEDDING_BATCH_SIZE = int(os.getenv("EMBEDDING_BATCH_SIZE", "20")) # Embedding 并发写入的最大线程数,需根据模型 API rate limit 调整 EMBEDDING_MAX_WORKERS = int(os.getenv("EMBEDDING_MAX_WORKERS", "3")) # auto_questions LLM 并发调用的最大线程数 @@ -369,22 +369,28 @@ def parse_document(file_path: str, document_id: uuid.UUID): try: vector_service.add_chunks(batch_chunks) except Exception as exc: - batch_errors[batch_idx] = exc + logger.warning(f"[ParseDoc] batch {batch_idx} failed, retrying: {exc}") + try: + vector_service.add_chunks(batch_chunks) + except Exception as retry_exc: + logger.error(f"[ParseDoc] batch {batch_idx} retry failed: {retry_exc}", exc_info=True) + batch_errors[batch_idx] = retry_exc with ThreadPoolExecutor(max_workers=EMBEDDING_MAX_WORKERS) as executor: futures = { executor.submit(_embed_and_store, i, batch_chunks): i for i, batch_chunks in enumerate(all_batch_chunks) } - # 按提交顺序收集结果,逐批更新进度 for future in futures: - future.result() # 等待完成(异常已在 _embed_and_store 内捕获) + future.result() # 如果有 batch 失败,汇总抛出 if batch_errors: - failed = ", ".join(str(i) for i in sorted(batch_errors)) - first_err = next(iter(batch_errors.values())) - raise RuntimeError(f"Embedding failed for batch(es) [{failed}]: {first_err}") from first_err + failed_detail = "; ".join( + f"batch {i}: {type(err).__name__}: {err}" + for i, err in sorted(batch_errors.items()) + ) + raise RuntimeError(f"Embedding failed for {len(batch_errors)}/{total_batches} batch(es). {failed_detail}") # 所有 batch 完成后一次性更新进度 db_document.progress = 0.8 + 0.2 # 直接到 1.0 前的状态