[modify] parse document workflow, add graph queue and build graph

This commit is contained in:
Mark
2026-04-13 10:40:58 +08:00
parent 0f50537d7d
commit a96f20ee05

View File

@@ -55,7 +55,7 @@ VIDEO_IMAGE_PATTERN = re.compile(
)
DEFAULT_PARSE_LANGUAGE = "Chinese"
DEFAULT_PARSE_TO_PAGE = 100_000
EMBEDDING_BATCH_SIZE = 100
EMBEDDING_BATCH_SIZE = int(os.getenv("EMBEDDING_BATCH_SIZE", "20"))
# Embedding 并发写入的最大线程数,需根据模型 API rate limit 调整
EMBEDDING_MAX_WORKERS = int(os.getenv("EMBEDDING_MAX_WORKERS", "3"))
# auto_questions LLM 并发调用的最大线程数
@@ -369,22 +369,28 @@ def parse_document(file_path: str, document_id: uuid.UUID):
try:
vector_service.add_chunks(batch_chunks)
except Exception as exc:
batch_errors[batch_idx] = exc
logger.warning(f"[ParseDoc] batch {batch_idx} failed, retrying: {exc}")
try:
vector_service.add_chunks(batch_chunks)
except Exception as retry_exc:
logger.error(f"[ParseDoc] batch {batch_idx} retry failed: {retry_exc}", exc_info=True)
batch_errors[batch_idx] = retry_exc
with ThreadPoolExecutor(max_workers=EMBEDDING_MAX_WORKERS) as executor:
futures = {
executor.submit(_embed_and_store, i, batch_chunks): i
for i, batch_chunks in enumerate(all_batch_chunks)
}
# 按提交顺序收集结果,逐批更新进度
for future in futures:
future.result() # 等待完成(异常已在 _embed_and_store 内捕获)
future.result()
# 如果有 batch 失败,汇总抛出
if batch_errors:
failed = ", ".join(str(i) for i in sorted(batch_errors))
first_err = next(iter(batch_errors.values()))
raise RuntimeError(f"Embedding failed for batch(es) [{failed}]: {first_err}") from first_err
failed_detail = "; ".join(
f"batch {i}: {type(err).__name__}: {err}"
for i, err in sorted(batch_errors.items())
)
raise RuntimeError(f"Embedding failed for {len(batch_errors)}/{total_batches} batch(es). {failed_detail}")
# 所有 batch 完成后一次性更新进度
db_document.progress = 0.8 + 0.2 # 直接到 1.0 前的状态