Files
PDFToMD/tests/test_conversion.py
T
2026-05-11 02:08:46 +09:00

487 lines
20 KiB
Python

from __future__ import annotations
import hashlib
import json
from datetime import datetime, timezone
from pathlib import Path
import pytest
from pypdf import PdfWriter
import pdf2md.conversion as conversion_module
from pdf2md.conversion import BatchConversionResult, convert_input, convert_pdf, recheck_markdown
from pdf2md.ir import WarningCode, WarningRecord, WarningSeverity
from pdf2md.mineru_adapter import MinerUAdapterResult, StrictLocalViolationError
from pdf2md.paths import OutputConflictError
from pdf2md.quality import MathCheckResult
class FakeAdapter:
    """Configurable stand-in for the MinerU adapter used by these tests.

    Each ``convert`` call is recorded in ``calls`` as an
    ``(input_path, work_dir, options)`` tuple. The call also writes a
    ``raw.log`` file (and optionally one asset) into the work directory so
    tests can observe what the conversion does with that directory.
    """

    def __init__(
        self,
        *,
        raw_markdown: str = "# Title\n",
        raw_structured: object | None = None,
        succeeded: bool = True,
        warnings: tuple[WarningRecord, ...] = (),
        asset_name: str | None = None,
    ) -> None:
        """Store the canned values that ``convert`` reports back."""
        self.raw_markdown = raw_markdown
        self.raw_structured = raw_structured
        self.succeeded = succeeded
        self.warnings = warnings
        self.asset_name = asset_name
        self.calls: list[tuple[Path, Path, object]] = []

    def convert(self, input_pdf, work_dir, options=None) -> MinerUAdapterResult:
        """Create the work directory, record the call and return a canned result.

        ``raw_markdown`` is reported as ``None`` and ``exit_code`` as ``2``
        when the adapter was configured with ``succeeded=False``.
        """
        source = Path(input_pdf)
        work = Path(work_dir)
        work.mkdir(parents=True, exist_ok=True)
        (work / "raw.log").write_text("raw output", encoding="utf-8")
        self.calls.append((source, work, options))

        # Only materialise an asset file when the test asked for one.
        produced_assets: tuple[Path, ...] = ()
        if self.asset_name is not None:
            asset_file = work / "assets" / self.asset_name
            asset_file.parent.mkdir(parents=True, exist_ok=True)
            asset_file.write_bytes(b"asset")
            produced_assets = (asset_file,)

        if self.succeeded:
            markdown, exit_code = self.raw_markdown, 0
        else:
            markdown, exit_code = None, 2

        if options is None:
            engine_options = {"strict_local": True}
        else:
            engine_options = options.to_engine_options()

        return MinerUAdapterResult(
            succeeded=self.succeeded,
            command=("mineru", "-p", str(source), "-o", str(work)),
            input_pdf=source,
            work_dir=work,
            raw_markdown=markdown,
            raw_structured=self.raw_structured,
            asset_paths=produced_assets,
            warnings=self.warnings,
            engine="MinerU",
            engine_version="3.1.0",
            engine_options=engine_options,
            exit_code=exit_code,
            stdout="",
            stderr="",
        )
class SequencedAdapter:
    """Adapter double whose calls succeed or fail in a scripted order.

    ``outcomes`` is consumed front-to-back, one entry per ``convert`` call;
    the input path of every call is appended to ``calls``.
    """

    def __init__(self, outcomes: tuple[bool, ...]) -> None:
        """Copy the scripted outcomes into a mutable queue."""
        self.outcomes = list(outcomes)
        self.calls: list[Path] = []

    def convert(self, input_pdf, work_dir, options=None) -> MinerUAdapterResult:
        """Return a success or failure result according to the next outcome."""
        source = Path(input_pdf)
        work = Path(work_dir)
        work.mkdir(parents=True, exist_ok=True)
        self.calls.append(source)

        ok = self.outcomes.pop(0)
        failure = WarningRecord(WarningCode.MINERU_CLI_FAILED, WarningSeverity.ERROR, "MinerU failed.")

        if ok:
            markdown = f"# {source.stem}\n"
            reported_warnings = ()
            exit_code = 0
        else:
            markdown = None
            reported_warnings = (failure,)
            exit_code = 2

        if options is None:
            engine_options = {"strict_local": True}
        else:
            engine_options = options.to_engine_options()

        return MinerUAdapterResult(
            succeeded=ok,
            command=("mineru", "-p", str(source), "-o", str(work)),
            input_pdf=source,
            work_dir=work,
            raw_markdown=markdown,
            raw_structured={"pages": 1},
            asset_paths=(),
            warnings=reported_warnings,
            engine="MinerU",
            engine_version="3.1.0",
            engine_options=engine_options,
            exit_code=exit_code,
            stdout="",
            stderr="",
        )
class NestedMinerUAssetAdapter:
    """Adapter double that places its image asset in a nested sub-directory.

    The returned Markdown links the image as ``images/fig.png`` while the
    file itself is written under ``paper/hybrid_auto/images`` inside the
    work directory.
    """

    def convert(self, input_pdf, work_dir, options=None) -> MinerUAdapterResult:
        """Write the nested asset and return a successful canned result."""
        source = Path(input_pdf)
        work = Path(work_dir)

        nested_asset = work / "paper" / "hybrid_auto" / "images" / "fig.png"
        nested_asset.parent.mkdir(parents=True, exist_ok=True)
        nested_asset.write_bytes(b"nested asset")

        if options is None:
            engine_options = {"strict_local": True}
        else:
            engine_options = options.to_engine_options()

        return MinerUAdapterResult(
            succeeded=True,
            command=("mineru", "-p", str(source), "-o", str(work)),
            input_pdf=source,
            work_dir=work,
            raw_markdown="![fig](images/fig.png)\n\n\\[x^2\\]\n",
            raw_structured=[{"page_idx": 0}, {"page_idx": 12}],
            asset_paths=(nested_asset,),
            warnings=(),
            engine="MinerU",
            engine_version="3.1.0",
            engine_options=engine_options,
            exit_code=0,
            stdout="",
            stderr="",
        )
def fixed_clock() -> datetime:
    """Return a constant, timezone-aware instant (2026-05-08 00:00:00 UTC).

    Tests pass this as the ``clock`` argument so that timestamps written to
    the generated metadata are deterministic.
    """
    return datetime(2026, 5, 8, tzinfo=timezone.utc)
def make_pdf(tmp_path: Path, name: str = "paper.pdf") -> Path:
    """Write a small placeholder PDF fixture and return its path.

    The file content is only a ``%PDF-1.7`` header line followed by filler
    text. Missing parent directories of ``tmp_path / name`` are created.

    Args:
        tmp_path: Directory that should contain the fixture.
        name: File name of the fixture inside ``tmp_path``.

    Returns:
        The path of the written file.
    """
    path = tmp_path / name
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_bytes(b"%PDF-1.7\nlocal fixture\n")
    return path
def make_pdf_with_pages(tmp_path: Path, page_count: int, name: str = "paper.pdf") -> Path:
    """Write a PDF made of ``page_count`` blank 72x72 pages and return its path.

    Missing parent directories of ``tmp_path / name`` are created first.
    """
    target = tmp_path / name
    target.parent.mkdir(parents=True, exist_ok=True)

    pdf_writer = PdfWriter()
    for _page in range(page_count):
        pdf_writer.add_blank_page(width=72, height=72)

    with target.open("wb") as stream:
        pdf_writer.write(stream)
    return target
def test_convert_pdf_writes_markdown_metadata_report_and_assets(tmp_path: Path) -> None:
    """A successful conversion writes Markdown, metadata, a report and assets."""
    source_pdf = make_pdf(tmp_path)
    out_dir = tmp_path / "out"
    fake = FakeAdapter(
        raw_markdown="# Title\n\nInline \\(x_i\\)\n\n![fig](assets/fig.png)\n",
        raw_structured={"pages": [{}, {}]},
        asset_name="fig.png",
    )

    outcome = convert_pdf(source_pdf, out_dir, adapter=fake, math_checker=lambda _: True, clock=fixed_clock)

    # Result object.
    assert outcome.succeeded is True
    assert outcome.final_status == "success"
    assert outcome.pages_processed == 2
    assert outcome.warning_count == 0
    assert outcome.engine == "MinerU"
    assert outcome.engine_version == "3.1.0"

    # Markdown is normalised and the asset link points at the copied asset.
    written_markdown = outcome.markdown_path.read_text(encoding="utf-8")
    assert written_markdown == "# Title\n\nInline $x_i$\n\n![fig](paper.assets/fig.png)\n"
    assert (out_dir / "paper.assets" / "fig.png").read_bytes() == b"asset"
    assert outcome.report_path.exists()

    # Metadata JSON.
    meta = json.loads(outcome.metadata_path.read_text(encoding="utf-8"))
    assert meta["source_sha256"] == hashlib.sha256(source_pdf.read_bytes()).hexdigest()
    assert meta["created_at"] == "2026-05-08T00:00:00Z"
    summary = meta["summary"]
    assert summary["pages_processed"] == 2
    assert summary["inline_formula_count"] == 1
    assert summary["asset_count"] == 1
    assert meta["assets"] == [{"relative_path": "paper.assets/fig.png"}]

    assert "- Final status: `success`" in outcome.report_path.read_text(encoding="utf-8")

    # The adapter's work directory must be gone after the conversion.
    _, work_dir, _ = fake.calls[0]
    assert not work_dir.exists()
def test_convert_pdf_adapter_failure_returns_failed_result_without_fallback_or_outputs(tmp_path: Path) -> None:
    """A failing adapter is called once and no Markdown or report is written."""
    source_pdf = make_pdf(tmp_path)
    cli_failure = WarningRecord(WarningCode.MINERU_CLI_FAILED, WarningSeverity.ERROR, "MinerU failed.")
    failing = FakeAdapter(succeeded=False, warnings=(cli_failure,))

    outcome = convert_pdf(source_pdf, tmp_path / "out", adapter=failing, clock=fixed_clock)

    assert outcome.succeeded is False
    assert outcome.final_status == "failed"
    assert outcome.warnings == (cli_failure,)
    # Exactly one adapter call: no retry or fallback engine.
    assert len(failing.calls) == 1
    assert not outcome.markdown_path.exists()
    assert not outcome.report_path.exists()
def test_convert_pdf_respects_output_conflicts_and_overwrite(tmp_path: Path) -> None:
    """An existing output raises unless ``overwrite=True`` is passed."""
    source_pdf = make_pdf(tmp_path)
    out_dir = tmp_path / "out"
    out_dir.mkdir()
    existing = out_dir / "paper.md"
    existing.write_text("old", encoding="utf-8")

    with pytest.raises(OutputConflictError):
        convert_pdf(source_pdf, out_dir, adapter=FakeAdapter(), clock=fixed_clock)

    replacement = FakeAdapter(raw_markdown="new\n")
    outcome = convert_pdf(source_pdf, out_dir, adapter=replacement, clock=fixed_clock, overwrite=True)

    assert outcome.succeeded is True
    assert outcome.markdown_path.read_text(encoding="utf-8") == "new\n"
def test_convert_pdf_can_skip_metadata_json_but_still_writes_report(tmp_path: Path) -> None:
    """``metadata=False`` suppresses the JSON file but not the report."""
    source_pdf = make_pdf(tmp_path)
    out_dir = tmp_path / "out"

    outcome = convert_pdf(source_pdf, out_dir, metadata=False, adapter=FakeAdapter(), clock=fixed_clock)

    assert outcome.metadata_path is None
    assert outcome.markdown_path.exists()
    assert outcome.report_path.exists()
    assert not (out_dir / "paper.metadata.json").exists()

    report_text = outcome.report_path.read_text(encoding="utf-8")
    assert "Metadata JSON:" not in report_text
    assert "Report Markdown:" in report_text
def test_convert_pdf_records_math_checker_failures_in_metadata_and_report(tmp_path: Path) -> None:
    """A rejecting math checker yields a partial result with a recorded warning."""
    source_pdf = make_pdf(tmp_path)
    fake = FakeAdapter(raw_markdown="Inline \\(bad_math\\)\n")

    outcome = convert_pdf(source_pdf, tmp_path / "out", adapter=fake, math_checker=lambda _: False, clock=fixed_clock)

    assert outcome.final_status == "partial"
    codes = [record.code for record in outcome.warnings]
    assert codes == [WarningCode.MATH_RENDER_FAILED]

    meta = json.loads(outcome.metadata_path.read_text(encoding="utf-8"))
    assert meta["summary"]["math_render_error_count"] == 1
    assert meta["warnings"][0]["code"] == "MATH_RENDER_FAILED"

    report_text = outcome.report_path.read_text(encoding="utf-8")
    assert "- Math render error count: 1" in report_text
    assert "`MATH_RENDER_FAILED`" in report_text
def test_convert_pdf_repairs_math_render_failure_before_writing_outputs(tmp_path: Path) -> None:
    """A repairable double superscript is fixed before outputs are written."""

    class RepairAwareChecker:
        """Accepts an expression only if it contains the ``{} ^ {t}`` form."""

        def check_expressions(self, expressions):
            return tuple(MathCheckResult(ok=("{} ^ {t}" in item.body)) for item in expressions)

    source_pdf = make_pdf(tmp_path)
    fake = FakeAdapter(raw_markdown="\\[x ^ {i} ^ {t}\\]\n")

    outcome = convert_pdf(
        source_pdf,
        tmp_path / "out",
        adapter=fake,
        math_checker=RepairAwareChecker(),
        clock=fixed_clock,
    )

    assert outcome.final_status == "partial"
    assert outcome.markdown_path.read_text(encoding="utf-8") == "$$\nx ^ {i} {} ^ {t}\n$$"
    codes = [record.code for record in outcome.warnings]
    assert codes == [WarningCode.MATH_RENDER_REPAIRED]

    meta = json.loads(outcome.metadata_path.read_text(encoding="utf-8"))
    assert meta["summary"]["math_render_error_count"] == 0
    assert meta["warnings"][0]["code"] == "MATH_RENDER_REPAIRED"

    report_text = outcome.report_path.read_text(encoding="utf-8")
    assert "- Math render error count: 0" in report_text
    assert "`MATH_RENDER_REPAIRED`" in report_text
def test_recheck_markdown_regenerates_metadata_and_report_from_current_markdown(tmp_path: Path) -> None:
    """Rechecking edited Markdown rewrites metadata and report in place."""
    source_pdf = make_pdf(tmp_path)
    fake = FakeAdapter(raw_markdown="Inline \\(bad_math\\)\n")
    first = convert_pdf(source_pdf, tmp_path / "out", adapter=fake, math_checker=lambda _: False, clock=fixed_clock)

    # Simulate a manual fix of the Markdown, then recheck with a passing checker.
    first.markdown_path.write_text("Inline $x_i$\n", encoding="utf-8")
    second = recheck_markdown(first.markdown_path, math_checker=lambda _: True, clock=fixed_clock)

    assert second.final_status == "success"
    assert second.warning_count == 0
    assert second.markdown_path == first.markdown_path
    assert second.metadata_path == first.metadata_path
    assert second.report_path == first.report_path

    meta = json.loads(first.metadata_path.read_text(encoding="utf-8"))
    assert meta["source_sha256"] == hashlib.sha256(source_pdf.read_bytes()).hexdigest()
    assert meta["created_at"] == "2026-05-08T00:00:00Z"
    summary = meta["summary"]
    assert summary["pages_processed"] == 1
    assert summary["inline_formula_count"] == 1
    assert summary["math_render_error_count"] == 0
    assert summary["warning_count"] == 0
    assert meta["warnings"] == []

    report_text = first.report_path.read_text(encoding="utf-8")
    assert "- Final status: `success`" in report_text
    assert "- Math render error count: 0" in report_text
    assert "- None" in report_text
def test_recheck_markdown_repairs_math_render_failure(tmp_path: Path) -> None:
    """Rechecking repairs a double superscript in the Markdown file itself."""

    class RepairAwareChecker:
        """Accepts an expression only if it contains the ``{} ^ {t}`` form."""

        def check_expressions(self, expressions):
            return tuple(MathCheckResult(ok=("{} ^ {t}" in item.body)) for item in expressions)

    source_pdf = make_pdf(tmp_path)
    fake = FakeAdapter(raw_markdown="No formulas.\n")
    first = convert_pdf(source_pdf, tmp_path / "out", adapter=fake, math_checker=lambda _: True, clock=fixed_clock)

    # Introduce a failing formula after conversion, then recheck.
    first.markdown_path.write_text("$$\nx ^ {i} ^ {t}\n$$\n", encoding="utf-8")
    second = recheck_markdown(first.markdown_path, math_checker=RepairAwareChecker(), clock=fixed_clock)

    assert second.markdown_path.read_text(encoding="utf-8") == "$$\nx ^ {i} {} ^ {t}\n$$\n"
    codes = [record.code for record in second.warnings]
    assert codes == [WarningCode.MATH_RENDER_REPAIRED]

    meta = json.loads(first.metadata_path.read_text(encoding="utf-8"))
    assert meta["summary"]["math_render_error_count"] == 0
    assert meta["warnings"][0]["code"] == "MATH_RENDER_REPAIRED"
def test_convert_pdf_records_unavailable_math_checker_for_math_output(tmp_path: Path, monkeypatch) -> None:
    """With no default checker available, math output gets an INFO warning."""
    source_pdf = make_pdf(tmp_path)
    fake = FakeAdapter(raw_markdown="Inline \\(x\\)\n")
    # Make the default checker factory report that no checker is available.
    monkeypatch.setattr(conversion_module, "create_default_math_checker", lambda: None)

    outcome = convert_pdf(source_pdf, tmp_path / "out", adapter=fake, clock=fixed_clock)

    assert outcome.final_status == "partial"
    first_warning = outcome.warnings[0]
    assert first_warning.code == WarningCode.MATH_RENDER_FAILED
    assert first_warning.severity == WarningSeverity.INFO

    summary = json.loads(outcome.metadata_path.read_text(encoding="utf-8"))["summary"]
    assert summary["warning_count"] == 1
    assert summary["math_render_error_count"] == 0

    report_text = outcome.report_path.read_text(encoding="utf-8")
    assert "unavailable" in report_text
    assert "- Math render error count: 0" in report_text
def test_convert_pdf_uses_default_math_checker_when_available(tmp_path: Path, monkeypatch) -> None:
    """The checker from ``create_default_math_checker`` receives the formulas."""

    class DefaultChecker:
        """Records the bodies of the expressions it is asked to check."""

        def __init__(self) -> None:
            self.bodies: list[str] = []

        def check_expressions(self, expressions):
            self.bodies = [item.body for item in expressions]
            return (True,)

    recorder = DefaultChecker()
    source_pdf = make_pdf(tmp_path)
    fake = FakeAdapter(raw_markdown="Inline \\(x\\)\n")
    monkeypatch.setattr(conversion_module, "create_default_math_checker", lambda: recorder)

    outcome = convert_pdf(source_pdf, tmp_path / "out", adapter=fake, clock=fixed_clock)

    assert outcome.final_status == "success"
    assert outcome.warning_count == 0
    assert recorder.bodies == ["x"]
def test_convert_pdf_keep_raw_preserves_adapter_work_directory(tmp_path: Path) -> None:
    """``keep_raw=True`` leaves the adapter output under ``paper.raw``."""
    source_pdf = make_pdf(tmp_path)
    out_dir = tmp_path / "out"

    outcome = convert_pdf(source_pdf, out_dir, keep_raw=True, adapter=FakeAdapter(), clock=fixed_clock)

    assert outcome.raw_dir == out_dir / "paper.raw"
    raw_log = outcome.raw_dir / "raw.log"
    assert raw_log.read_text(encoding="utf-8") == "raw output"
def test_convert_pdf_rejects_disabling_strict_local(tmp_path: Path) -> None:
    """Passing ``strict_local=False`` raises ``StrictLocalViolationError``."""
    source_pdf = make_pdf(tmp_path)
    out_dir = tmp_path / "out"

    with pytest.raises(StrictLocalViolationError):
        convert_pdf(source_pdf, out_dir, strict_local=False, adapter=FakeAdapter(), clock=fixed_clock)
def test_convert_pdf_passes_gpu_device_to_strict_local_options(tmp_path: Path) -> None:
    """The ``gpu`` argument shows up as ``gpu_device`` in the engine options."""
    source_pdf = make_pdf(tmp_path)
    fake = FakeAdapter()

    # NOTE(review): "cuda:0" is also what test_convert_pdf_defaults_to_cuda_zero
    # expects when no gpu argument is given, so this test cannot tell a
    # forwarded value from the default. Consider a non-default device here
    # once it is confirmed convert_pdf accepts one.
    convert_pdf(source_pdf, tmp_path / "out", gpu="cuda:0", adapter=fake, clock=fixed_clock)

    _, _, received_options = fake.calls[0]
    assert received_options.to_engine_options() == {"strict_local": True, "gpu_device": "cuda:0"}
def test_convert_pdf_defaults_to_cuda_zero(tmp_path: Path) -> None:
    """Without a ``gpu`` argument the engine options name ``cuda:0``."""
    source_pdf = make_pdf(tmp_path)
    fake = FakeAdapter()

    convert_pdf(source_pdf, tmp_path / "out", adapter=fake, clock=fixed_clock)

    _, _, received_options = fake.calls[0]
    assert received_options.to_engine_options() == {"strict_local": True, "gpu_device": "cuda:0"}
def test_convert_pdf_rewrites_nested_mineru_image_links_and_page_indexes(tmp_path: Path) -> None:
    """Nested asset links are rewritten and pages derive from ``page_idx``."""
    source_pdf = make_pdf(tmp_path)
    out_dir = tmp_path / "out"

    outcome = convert_pdf(
        source_pdf,
        out_dir,
        adapter=NestedMinerUAssetAdapter(),
        math_checker=lambda _: True,
        clock=fixed_clock,
    )

    assert outcome.final_status == "success"
    # The adapter reports page_idx 0 and 12, which is expected to count as 13 pages.
    assert outcome.pages_processed == 13

    written_markdown = outcome.markdown_path.read_text(encoding="utf-8")
    assert "![fig](paper.assets/paper/hybrid_auto/images/fig.png)" in written_markdown
    assert "](images/fig.png)" not in written_markdown

    copied = out_dir / "paper.assets" / "paper" / "hybrid_auto" / "images" / "fig.png"
    assert copied.read_bytes() == b"nested asset"

    summary = json.loads(outcome.metadata_path.read_text(encoding="utf-8"))["summary"]
    assert summary["pages_processed"] == 13
    assert summary["warning_count"] == 0
def test_convert_input_batch_continues_after_per_file_failure(tmp_path: Path) -> None:
    """A failing file in a directory batch does not stop the later files."""
    pdf_dir = tmp_path / "pdfs"
    for file_name in ("a.pdf", "b.pdf", "c.pdf"):
        make_pdf(pdf_dir, file_name)
    out_dir = tmp_path / "out"
    # Second conversion fails, first and third succeed.
    scripted = SequencedAdapter((True, False, True))

    batch = convert_input(pdf_dir, out_dir, adapter=scripted, clock=fixed_clock)

    visited = [called.name for called in scripted.calls]
    assert visited == ["a.pdf", "b.pdf", "c.pdf"]
    assert batch.converted_count == 2
    assert batch.failed_count == 1
    assert (out_dir / "a.md").exists()
    assert not (out_dir / "b.md").exists()
    assert (out_dir / "c.md").exists()
def test_convert_pdf_chunk_mode_returns_batch_and_deletes_temporary_chunk_pdfs(tmp_path: Path) -> None:
    """A 41-page PDF with ``chunk_pages=20`` converts as three chunk results."""
    source_pdf = make_pdf_with_pages(tmp_path, 41, "thesis.pdf")
    out_dir = tmp_path / "out"
    fake = FakeAdapter(raw_structured={"pages": 1})

    batch = convert_pdf(
        source_pdf,
        out_dir,
        adapter=fake,
        math_checker=lambda _: True,
        chunk_pages=20,
        clock=fixed_clock,
    )

    assert isinstance(batch, BatchConversionResult)
    assert batch.converted_count == 3

    markdown_names = [item.markdown_path.name for item in batch.results]
    assert markdown_names == [
        "thesis.part-001.pages-001-020.md",
        "thesis.part-002.pages-021-040.md",
        "thesis.part-003.pages-041-041.md",
    ]

    # The adapter is handed one temporary chunk PDF per part.
    chunk_inputs = [call[0] for call in fake.calls]
    assert [chunk.name for chunk in chunk_inputs] == [
        "thesis.part-001.pages-001-020.pdf",
        "thesis.part-002.pages-021-040.pdf",
        "thesis.part-003.pages-041-041.pdf",
    ]
    resolved_source = source_pdf.resolve()
    assert all(item.source_pdf == resolved_source for item in batch.results)
    # Temporary chunk PDFs must not survive the conversion.
    assert all(not chunk.exists() for chunk in chunk_inputs)

    second_meta_path = out_dir / "thesis.part-002.pages-021-040.metadata.json"
    meta = json.loads(second_meta_path.read_text(encoding="utf-8"))
    assert meta["source_pdf"] == str(source_pdf.resolve())
    assert meta["source_sha256"] == hashlib.sha256(source_pdf.read_bytes()).hexdigest()
    assert meta["engine_options"]["chunk"] == {
        "chunk_index": 2,
        "chunk_page_count": 20,
        "chunk_pdf_name": "thesis.part-002.pages-021-040.pdf",
        "original_source_pdf": str(source_pdf.resolve()),
        "source_page_end": 40,
        "source_page_start": 21,
        "total_chunks": 3,
    }

    second_report_path = out_dir / "thesis.part-002.pages-021-040.report.md"
    report_text = second_report_path.read_text(encoding="utf-8")
    assert "- Chunk: 2/3, source pages: 21-40" in report_text
def test_convert_pdf_chunk_mode_keeps_short_pdf_as_single_batch_result(tmp_path: Path) -> None:
    """A PDF shorter than ``chunk_pages`` is converted directly, unsplit."""
    source_pdf = make_pdf_with_pages(tmp_path, 3, "short.pdf")
    fake = FakeAdapter(raw_structured={"pages": 3})

    batch = convert_pdf(source_pdf, tmp_path / "out", adapter=fake, chunk_pages=20, clock=fixed_clock)

    assert isinstance(batch, BatchConversionResult)
    assert batch.converted_count == 1
    only_result = batch.results[0]
    assert only_result.markdown_path.name == "short.md"

    # The adapter received the original file, and it still exists afterwards.
    adapter_input = fake.calls[0][0]
    assert adapter_input == source_pdf.resolve()
    assert fake.calls[0][0].exists()
def test_convert_input_chunk_mode_continues_after_failed_chunk(tmp_path: Path) -> None:
    """A failing middle chunk does not prevent the remaining chunk from running."""
    source_pdf = make_pdf_with_pages(tmp_path, 41, "paper.pdf")
    out_dir = tmp_path / "out"
    # Second chunk fails, first and third succeed.
    scripted = SequencedAdapter((True, False, True))

    batch = convert_input(source_pdf, out_dir, adapter=scripted, chunk_pages=20, clock=fixed_clock)

    assert batch.converted_count == 2
    assert batch.failed_count == 1
    visited = [called.name for called in scripted.calls]
    assert visited == [
        "paper.part-001.pages-001-020.pdf",
        "paper.part-002.pages-021-040.pdf",
        "paper.part-003.pages-041-041.pdf",
    ]
    assert (out_dir / "paper.part-001.pages-001-020.md").exists()
    assert not (out_dir / "paper.part-002.pages-021-040.md").exists()
    assert (out_dir / "paper.part-003.pages-041-041.md").exists()