446 lines
18 KiB
Python
446 lines
18 KiB
Python
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import json
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
from pypdf import PdfWriter
|
|
|
|
import pdf2md.conversion as conversion_module
|
|
from pdf2md.conversion import BatchConversionResult, convert_input, convert_pdf, recheck_markdown
|
|
from pdf2md.ir import WarningCode, WarningRecord, WarningSeverity
|
|
from pdf2md.mineru_adapter import MinerUAdapterResult, StrictLocalViolationError
|
|
from pdf2md.paths import OutputConflictError
|
|
|
|
|
|
class FakeAdapter:
|
|
def __init__(
|
|
self,
|
|
*,
|
|
raw_markdown: str = "# Title\n",
|
|
raw_structured: object | None = None,
|
|
succeeded: bool = True,
|
|
warnings: tuple[WarningRecord, ...] = (),
|
|
asset_name: str | None = None,
|
|
) -> None:
|
|
self.raw_markdown = raw_markdown
|
|
self.raw_structured = raw_structured
|
|
self.succeeded = succeeded
|
|
self.warnings = warnings
|
|
self.asset_name = asset_name
|
|
self.calls: list[tuple[Path, Path, object]] = []
|
|
|
|
def convert(self, input_pdf, work_dir, options=None) -> MinerUAdapterResult:
|
|
input_path = Path(input_pdf)
|
|
output_dir = Path(work_dir)
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
(output_dir / "raw.log").write_text("raw output", encoding="utf-8")
|
|
self.calls.append((input_path, output_dir, options))
|
|
asset_paths: tuple[Path, ...] = ()
|
|
if self.asset_name is not None:
|
|
asset_path = output_dir / "assets" / self.asset_name
|
|
asset_path.parent.mkdir(parents=True, exist_ok=True)
|
|
asset_path.write_bytes(b"asset")
|
|
asset_paths = (asset_path,)
|
|
return MinerUAdapterResult(
|
|
succeeded=self.succeeded,
|
|
command=("mineru", "-p", str(input_path), "-o", str(output_dir)),
|
|
input_pdf=input_path,
|
|
work_dir=output_dir,
|
|
raw_markdown=self.raw_markdown if self.succeeded else None,
|
|
raw_structured=self.raw_structured,
|
|
asset_paths=asset_paths,
|
|
warnings=self.warnings,
|
|
engine="MinerU",
|
|
engine_version="3.1.0",
|
|
engine_options=options.to_engine_options() if options is not None else {"strict_local": True},
|
|
exit_code=0 if self.succeeded else 2,
|
|
stdout="",
|
|
stderr="",
|
|
)
|
|
|
|
|
|
class SequencedAdapter:
|
|
def __init__(self, outcomes: tuple[bool, ...]) -> None:
|
|
self.outcomes = list(outcomes)
|
|
self.calls: list[Path] = []
|
|
|
|
def convert(self, input_pdf, work_dir, options=None) -> MinerUAdapterResult:
|
|
input_path = Path(input_pdf)
|
|
output_dir = Path(work_dir)
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
self.calls.append(input_path)
|
|
succeeded = self.outcomes.pop(0)
|
|
warning = WarningRecord(WarningCode.MINERU_CLI_FAILED, WarningSeverity.ERROR, "MinerU failed.")
|
|
return MinerUAdapterResult(
|
|
succeeded=succeeded,
|
|
command=("mineru", "-p", str(input_path), "-o", str(output_dir)),
|
|
input_pdf=input_path,
|
|
work_dir=output_dir,
|
|
raw_markdown=f"# {input_path.stem}\n" if succeeded else None,
|
|
raw_structured={"pages": 1},
|
|
asset_paths=(),
|
|
warnings=() if succeeded else (warning,),
|
|
engine="MinerU",
|
|
engine_version="3.1.0",
|
|
engine_options=options.to_engine_options() if options is not None else {"strict_local": True},
|
|
exit_code=0 if succeeded else 2,
|
|
stdout="",
|
|
stderr="",
|
|
)
|
|
|
|
|
|
class NestedMinerUAssetAdapter:
|
|
def convert(self, input_pdf, work_dir, options=None) -> MinerUAdapterResult:
|
|
input_path = Path(input_pdf)
|
|
output_dir = Path(work_dir)
|
|
asset_path = output_dir / "paper" / "hybrid_auto" / "images" / "fig.png"
|
|
asset_path.parent.mkdir(parents=True, exist_ok=True)
|
|
asset_path.write_bytes(b"nested asset")
|
|
return MinerUAdapterResult(
|
|
succeeded=True,
|
|
command=("mineru", "-p", str(input_path), "-o", str(output_dir)),
|
|
input_pdf=input_path,
|
|
work_dir=output_dir,
|
|
raw_markdown="\n\n\\[x^2\\]\n",
|
|
raw_structured=[{"page_idx": 0}, {"page_idx": 12}],
|
|
asset_paths=(asset_path,),
|
|
warnings=(),
|
|
engine="MinerU",
|
|
engine_version="3.1.0",
|
|
engine_options=options.to_engine_options() if options is not None else {"strict_local": True},
|
|
exit_code=0,
|
|
stdout="",
|
|
stderr="",
|
|
)
|
|
|
|
|
|
def fixed_clock() -> datetime:
|
|
return datetime(2026, 5, 8, tzinfo=timezone.utc)
|
|
|
|
|
|
def make_pdf(tmp_path: Path, name: str = "paper.pdf") -> Path:
|
|
path = tmp_path / name
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
path.write_bytes(b"%PDF-1.7\nlocal fixture\n")
|
|
return path
|
|
|
|
|
|
def make_pdf_with_pages(tmp_path: Path, page_count: int, name: str = "paper.pdf") -> Path:
|
|
path = tmp_path / name
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
writer = PdfWriter()
|
|
for _ in range(page_count):
|
|
writer.add_blank_page(width=72, height=72)
|
|
with path.open("wb") as file:
|
|
writer.write(file)
|
|
return path
|
|
|
|
|
|
def test_convert_pdf_writes_markdown_metadata_report_and_assets(tmp_path: Path) -> None:
|
|
pdf = make_pdf(tmp_path)
|
|
adapter = FakeAdapter(
|
|
raw_markdown="# Title\n\nInline \\(x_i\\)\n\n\n",
|
|
raw_structured={"pages": [{}, {}]},
|
|
asset_name="fig.png",
|
|
)
|
|
|
|
result = convert_pdf(pdf, tmp_path / "out", adapter=adapter, math_checker=lambda _: True, clock=fixed_clock)
|
|
|
|
assert result.succeeded is True
|
|
assert result.final_status == "success"
|
|
assert result.pages_processed == 2
|
|
assert result.warning_count == 0
|
|
assert result.engine == "MinerU"
|
|
assert result.engine_version == "3.1.0"
|
|
assert result.markdown_path.read_text(encoding="utf-8") == "# Title\n\nInline $x_i$\n\n\n"
|
|
assert (tmp_path / "out" / "paper.assets" / "fig.png").read_bytes() == b"asset"
|
|
assert result.report_path.exists()
|
|
|
|
metadata = json.loads(result.metadata_path.read_text(encoding="utf-8"))
|
|
assert metadata["source_sha256"] == hashlib.sha256(pdf.read_bytes()).hexdigest()
|
|
assert metadata["created_at"] == "2026-05-08T00:00:00Z"
|
|
assert metadata["summary"]["pages_processed"] == 2
|
|
assert metadata["summary"]["inline_formula_count"] == 1
|
|
assert metadata["summary"]["asset_count"] == 1
|
|
assert metadata["assets"] == [{"relative_path": "paper.assets/fig.png"}]
|
|
assert "- Final status: `success`" in result.report_path.read_text(encoding="utf-8")
|
|
assert not adapter.calls[0][1].exists()
|
|
|
|
|
|
def test_convert_pdf_adapter_failure_returns_failed_result_without_fallback_or_outputs(tmp_path: Path) -> None:
|
|
pdf = make_pdf(tmp_path)
|
|
warning = WarningRecord(WarningCode.MINERU_CLI_FAILED, WarningSeverity.ERROR, "MinerU failed.")
|
|
adapter = FakeAdapter(succeeded=False, warnings=(warning,))
|
|
|
|
result = convert_pdf(pdf, tmp_path / "out", adapter=adapter, clock=fixed_clock)
|
|
|
|
assert result.succeeded is False
|
|
assert result.final_status == "failed"
|
|
assert result.warnings == (warning,)
|
|
assert len(adapter.calls) == 1
|
|
assert not result.markdown_path.exists()
|
|
assert not result.report_path.exists()
|
|
|
|
|
|
def test_convert_pdf_respects_output_conflicts_and_overwrite(tmp_path: Path) -> None:
|
|
pdf = make_pdf(tmp_path)
|
|
out = tmp_path / "out"
|
|
out.mkdir()
|
|
(out / "paper.md").write_text("old", encoding="utf-8")
|
|
|
|
with pytest.raises(OutputConflictError):
|
|
convert_pdf(pdf, out, adapter=FakeAdapter(), clock=fixed_clock)
|
|
|
|
result = convert_pdf(pdf, out, adapter=FakeAdapter(raw_markdown="new\n"), clock=fixed_clock, overwrite=True)
|
|
|
|
assert result.succeeded is True
|
|
assert result.markdown_path.read_text(encoding="utf-8") == "new\n"
|
|
|
|
|
|
def test_convert_pdf_can_skip_metadata_json_but_still_writes_report(tmp_path: Path) -> None:
|
|
pdf = make_pdf(tmp_path)
|
|
|
|
result = convert_pdf(pdf, tmp_path / "out", metadata=False, adapter=FakeAdapter(), clock=fixed_clock)
|
|
|
|
assert result.metadata_path is None
|
|
assert result.markdown_path.exists()
|
|
assert result.report_path.exists()
|
|
assert not (tmp_path / "out" / "paper.metadata.json").exists()
|
|
report = result.report_path.read_text(encoding="utf-8")
|
|
assert "Metadata JSON:" not in report
|
|
assert "Report Markdown:" in report
|
|
|
|
|
|
def test_convert_pdf_records_math_checker_failures_in_metadata_and_report(tmp_path: Path) -> None:
|
|
pdf = make_pdf(tmp_path)
|
|
adapter = FakeAdapter(raw_markdown="Inline \\(bad_math\\)\n")
|
|
|
|
result = convert_pdf(pdf, tmp_path / "out", adapter=adapter, math_checker=lambda _: False, clock=fixed_clock)
|
|
|
|
assert result.final_status == "partial"
|
|
assert [warning.code for warning in result.warnings] == [WarningCode.MATH_RENDER_FAILED]
|
|
metadata = json.loads(result.metadata_path.read_text(encoding="utf-8"))
|
|
assert metadata["summary"]["math_render_error_count"] == 1
|
|
assert metadata["warnings"][0]["code"] == "MATH_RENDER_FAILED"
|
|
report = result.report_path.read_text(encoding="utf-8")
|
|
assert "- Math render error count: 1" in report
|
|
assert "`MATH_RENDER_FAILED`" in report
|
|
|
|
|
|
def test_recheck_markdown_regenerates_metadata_and_report_from_current_markdown(tmp_path: Path) -> None:
|
|
pdf = make_pdf(tmp_path)
|
|
adapter = FakeAdapter(raw_markdown="Inline \\(bad_math\\)\n")
|
|
result = convert_pdf(pdf, tmp_path / "out", adapter=adapter, math_checker=lambda _: False, clock=fixed_clock)
|
|
|
|
result.markdown_path.write_text("Inline $x_i$\n", encoding="utf-8")
|
|
rechecked = recheck_markdown(result.markdown_path, math_checker=lambda _: True, clock=fixed_clock)
|
|
|
|
assert rechecked.final_status == "success"
|
|
assert rechecked.warning_count == 0
|
|
assert rechecked.markdown_path == result.markdown_path
|
|
assert rechecked.metadata_path == result.metadata_path
|
|
assert rechecked.report_path == result.report_path
|
|
metadata = json.loads(result.metadata_path.read_text(encoding="utf-8"))
|
|
assert metadata["source_sha256"] == hashlib.sha256(pdf.read_bytes()).hexdigest()
|
|
assert metadata["created_at"] == "2026-05-08T00:00:00Z"
|
|
assert metadata["summary"]["pages_processed"] == 1
|
|
assert metadata["summary"]["inline_formula_count"] == 1
|
|
assert metadata["summary"]["math_render_error_count"] == 0
|
|
assert metadata["summary"]["warning_count"] == 0
|
|
assert metadata["warnings"] == []
|
|
report = result.report_path.read_text(encoding="utf-8")
|
|
assert "- Final status: `success`" in report
|
|
assert "- Math render error count: 0" in report
|
|
assert "- None" in report
|
|
|
|
|
|
def test_convert_pdf_records_unavailable_math_checker_for_math_output(tmp_path: Path, monkeypatch) -> None:
|
|
pdf = make_pdf(tmp_path)
|
|
adapter = FakeAdapter(raw_markdown="Inline \\(x\\)\n")
|
|
monkeypatch.setattr(conversion_module, "create_default_math_checker", lambda: None)
|
|
|
|
result = convert_pdf(pdf, tmp_path / "out", adapter=adapter, clock=fixed_clock)
|
|
|
|
assert result.final_status == "partial"
|
|
assert result.warnings[0].code == WarningCode.MATH_RENDER_FAILED
|
|
assert result.warnings[0].severity == WarningSeverity.INFO
|
|
metadata = json.loads(result.metadata_path.read_text(encoding="utf-8"))
|
|
assert metadata["summary"]["warning_count"] == 1
|
|
assert metadata["summary"]["math_render_error_count"] == 0
|
|
report = result.report_path.read_text(encoding="utf-8")
|
|
assert "unavailable" in report
|
|
assert "- Math render error count: 0" in report
|
|
|
|
|
|
def test_convert_pdf_uses_default_math_checker_when_available(tmp_path: Path, monkeypatch) -> None:
|
|
class DefaultChecker:
|
|
def __init__(self) -> None:
|
|
self.bodies: list[str] = []
|
|
|
|
def check_expressions(self, expressions):
|
|
self.bodies = [expression.body for expression in expressions]
|
|
return (True,)
|
|
|
|
checker = DefaultChecker()
|
|
pdf = make_pdf(tmp_path)
|
|
adapter = FakeAdapter(raw_markdown="Inline \\(x\\)\n")
|
|
monkeypatch.setattr(conversion_module, "create_default_math_checker", lambda: checker)
|
|
|
|
result = convert_pdf(pdf, tmp_path / "out", adapter=adapter, clock=fixed_clock)
|
|
|
|
assert result.final_status == "success"
|
|
assert result.warning_count == 0
|
|
assert checker.bodies == ["x"]
|
|
|
|
|
|
def test_convert_pdf_keep_raw_preserves_adapter_work_directory(tmp_path: Path) -> None:
|
|
pdf = make_pdf(tmp_path)
|
|
|
|
result = convert_pdf(pdf, tmp_path / "out", keep_raw=True, adapter=FakeAdapter(), clock=fixed_clock)
|
|
|
|
assert result.raw_dir == tmp_path / "out" / "paper.raw"
|
|
assert (result.raw_dir / "raw.log").read_text(encoding="utf-8") == "raw output"
|
|
|
|
|
|
def test_convert_pdf_rejects_disabling_strict_local(tmp_path: Path) -> None:
|
|
pdf = make_pdf(tmp_path)
|
|
|
|
with pytest.raises(StrictLocalViolationError):
|
|
convert_pdf(pdf, tmp_path / "out", strict_local=False, adapter=FakeAdapter(), clock=fixed_clock)
|
|
|
|
|
|
def test_convert_pdf_passes_gpu_device_to_strict_local_options(tmp_path: Path) -> None:
|
|
pdf = make_pdf(tmp_path)
|
|
adapter = FakeAdapter()
|
|
|
|
convert_pdf(pdf, tmp_path / "out", gpu="cuda:0", adapter=adapter, clock=fixed_clock)
|
|
|
|
assert adapter.calls[0][2].to_engine_options() == {"strict_local": True, "gpu_device": "cuda:0"}
|
|
|
|
|
|
def test_convert_pdf_defaults_to_cuda_zero(tmp_path: Path) -> None:
|
|
pdf = make_pdf(tmp_path)
|
|
adapter = FakeAdapter()
|
|
|
|
convert_pdf(pdf, tmp_path / "out", adapter=adapter, clock=fixed_clock)
|
|
|
|
assert adapter.calls[0][2].to_engine_options() == {"strict_local": True, "gpu_device": "cuda:0"}
|
|
|
|
|
|
def test_convert_pdf_rewrites_nested_mineru_image_links_and_page_indexes(tmp_path: Path) -> None:
|
|
pdf = make_pdf(tmp_path)
|
|
|
|
result = convert_pdf(
|
|
pdf,
|
|
tmp_path / "out",
|
|
adapter=NestedMinerUAssetAdapter(),
|
|
math_checker=lambda _: True,
|
|
clock=fixed_clock,
|
|
)
|
|
|
|
assert result.final_status == "success"
|
|
assert result.pages_processed == 13
|
|
markdown = result.markdown_path.read_text(encoding="utf-8")
|
|
assert "" in markdown
|
|
assert "](images/fig.png)" not in markdown
|
|
copied_asset = tmp_path / "out" / "paper.assets" / "paper" / "hybrid_auto" / "images" / "fig.png"
|
|
assert copied_asset.read_bytes() == b"nested asset"
|
|
metadata = json.loads(result.metadata_path.read_text(encoding="utf-8"))
|
|
assert metadata["summary"]["pages_processed"] == 13
|
|
assert metadata["summary"]["warning_count"] == 0
|
|
|
|
|
|
def test_convert_input_batch_continues_after_per_file_failure(tmp_path: Path) -> None:
|
|
source = tmp_path / "pdfs"
|
|
make_pdf(source, "a.pdf")
|
|
make_pdf(source, "b.pdf")
|
|
make_pdf(source, "c.pdf")
|
|
adapter = SequencedAdapter((True, False, True))
|
|
|
|
batch = convert_input(source, tmp_path / "out", adapter=adapter, clock=fixed_clock)
|
|
|
|
assert [path.name for path in adapter.calls] == ["a.pdf", "b.pdf", "c.pdf"]
|
|
assert batch.converted_count == 2
|
|
assert batch.failed_count == 1
|
|
assert (tmp_path / "out" / "a.md").exists()
|
|
assert not (tmp_path / "out" / "b.md").exists()
|
|
assert (tmp_path / "out" / "c.md").exists()
|
|
|
|
|
|
def test_convert_pdf_chunk_mode_returns_batch_and_deletes_temporary_chunk_pdfs(tmp_path: Path) -> None:
|
|
pdf = make_pdf_with_pages(tmp_path, 41, "thesis.pdf")
|
|
adapter = FakeAdapter(raw_structured={"pages": 1})
|
|
|
|
batch = convert_pdf(
|
|
pdf,
|
|
tmp_path / "out",
|
|
adapter=adapter,
|
|
math_checker=lambda _: True,
|
|
chunk_pages=20,
|
|
clock=fixed_clock,
|
|
)
|
|
|
|
assert isinstance(batch, BatchConversionResult)
|
|
assert batch.converted_count == 3
|
|
assert [result.markdown_path.name for result in batch.results] == [
|
|
"thesis.part-001.pages-001-020.md",
|
|
"thesis.part-002.pages-021-040.md",
|
|
"thesis.part-003.pages-041-041.md",
|
|
]
|
|
assert [path.name for path, _, _ in adapter.calls] == [
|
|
"thesis.part-001.pages-001-020.pdf",
|
|
"thesis.part-002.pages-021-040.pdf",
|
|
"thesis.part-003.pages-041-041.pdf",
|
|
]
|
|
assert all(result.source_pdf == pdf.resolve() for result in batch.results)
|
|
assert all(not path.exists() for path, _, _ in adapter.calls)
|
|
|
|
metadata = json.loads((tmp_path / "out" / "thesis.part-002.pages-021-040.metadata.json").read_text(encoding="utf-8"))
|
|
assert metadata["source_pdf"] == str(pdf.resolve())
|
|
assert metadata["source_sha256"] == hashlib.sha256(pdf.read_bytes()).hexdigest()
|
|
assert metadata["engine_options"]["chunk"] == {
|
|
"chunk_index": 2,
|
|
"chunk_page_count": 20,
|
|
"chunk_pdf_name": "thesis.part-002.pages-021-040.pdf",
|
|
"original_source_pdf": str(pdf.resolve()),
|
|
"source_page_end": 40,
|
|
"source_page_start": 21,
|
|
"total_chunks": 3,
|
|
}
|
|
report = (tmp_path / "out" / "thesis.part-002.pages-021-040.report.md").read_text(encoding="utf-8")
|
|
assert "- Chunk: 2/3, source pages: 21-40" in report
|
|
|
|
|
|
def test_convert_pdf_chunk_mode_keeps_short_pdf_as_single_batch_result(tmp_path: Path) -> None:
|
|
pdf = make_pdf_with_pages(tmp_path, 3, "short.pdf")
|
|
adapter = FakeAdapter(raw_structured={"pages": 3})
|
|
|
|
batch = convert_pdf(pdf, tmp_path / "out", adapter=adapter, chunk_pages=20, clock=fixed_clock)
|
|
|
|
assert isinstance(batch, BatchConversionResult)
|
|
assert batch.converted_count == 1
|
|
assert batch.results[0].markdown_path.name == "short.md"
|
|
assert adapter.calls[0][0] == pdf.resolve()
|
|
assert adapter.calls[0][0].exists()
|
|
|
|
|
|
def test_convert_input_chunk_mode_continues_after_failed_chunk(tmp_path: Path) -> None:
|
|
pdf = make_pdf_with_pages(tmp_path, 41, "paper.pdf")
|
|
adapter = SequencedAdapter((True, False, True))
|
|
|
|
batch = convert_input(pdf, tmp_path / "out", adapter=adapter, chunk_pages=20, clock=fixed_clock)
|
|
|
|
assert batch.converted_count == 2
|
|
assert batch.failed_count == 1
|
|
assert [path.name for path in adapter.calls] == [
|
|
"paper.part-001.pages-001-020.pdf",
|
|
"paper.part-002.pages-021-040.pdf",
|
|
"paper.part-003.pages-041-041.pdf",
|
|
]
|
|
assert (tmp_path / "out" / "paper.part-001.pages-001-020.md").exists()
|
|
assert not (tmp_path / "out" / "paper.part-002.pages-021-040.md").exists()
|
|
assert (tmp_path / "out" / "paper.part-003.pages-041-041.md").exists()
|