add pdftomd

This commit is contained in:
김경종
2026-05-08 16:42:19 +09:00
parent 551ab50735
commit 88d6b92283
99 changed files with 47332 additions and 0 deletions
+785
View File
@@ -0,0 +1,785 @@
"""Conversion orchestration for local PDF-to-Markdown output."""
from __future__ import annotations
import hashlib
import json
import re
import shutil
import tempfile
from collections.abc import Callable
from dataclasses import dataclass, replace
from datetime import datetime, timezone
from pathlib import Path, PurePosixPath
from typing import Protocol
from pdf2md.ir import (
AssetRecord,
BlockRecord,
BlockType,
DocumentRecord,
PageRecord,
WarningCode,
WarningRecord,
WarningSeverity,
)
from pdf2md.markdown import normalize_markdown
from pdf2md.math_render import create_default_math_checker
from pdf2md.metadata import build_metadata
from pdf2md.mineru_adapter import (
ENGINE_NAME,
MinerUAdapter,
MinerUAdapterResult,
MinerUOptions,
StrictLocalViolationError,
)
from pdf2md.paths import DiscoveredPdf, PathLike, PlannedOutput, discover_pdfs, plan_outputs
from pdf2md.pdf_splitter import PdfChunkPlan, plan_pdf_chunks, write_pdf_chunk
from pdf2md.quality import MathChecker, QualityResult, check_asset_links, check_math_renderability, merge_quality_results
from pdf2md.report import FinalStatus, determine_final_status, render_report
# Zero-argument callable returning the current time. The public entry points
# accept one through their ``clock`` parameter so callers can inject a clock.
Clock = Callable[[], datetime]
# Default value of the ``gpu`` parameter of ``convert_pdf`` / ``convert_input``.
DEFAULT_GPU_DEVICE = "cuda:0"
# Default chunk size in pages. NOTE(review): not referenced in the visible part
# of this module; presumably consumed by callers such as a CLI -- confirm.
DEFAULT_CHUNK_PAGES = 20
class ConversionAdapter(Protocol):
    """Structural interface every conversion engine adapter must satisfy.

    Any object exposing a compatible ``convert`` method can be passed as the
    ``adapter`` argument of the conversion entry points in this module.
    """

    def convert(
        self,
        input_pdf: PathLike,
        work_dir: PathLike,
        options: MinerUOptions | None = None,
    ) -> MinerUAdapterResult:
        """Run the conversion engine into a local work directory."""
        ...
@dataclass(frozen=True)
class ConversionResult:
    """Immutable outcome of converting a single PDF (or a single PDF chunk)."""

    # PDF reported as the source; for chunked runs this is the original PDF.
    source_pdf: Path
    # Planned location of the Markdown output.
    markdown_path: Path
    # Planned location of the metadata JSON; ``None`` when metadata is disabled.
    metadata_path: Path | None
    # Planned location of the conversion report.
    report_path: Path
    # Directory that receives the copied asset files.
    assets_dir: Path
    # Directory holding raw engine output, taken from the output plan.
    raw_dir: Path | None
    # Engine name and version as reported by the adapter (with fallbacks).
    engine: str
    engine_version: str
    # Overall status; the string "failed" marks an unsuccessful conversion.
    final_status: FinalStatus
    # Number of entries in ``warnings``.
    warning_count: int
    warnings: tuple[WarningRecord, ...]
    # Pages processed; failed results report 0.
    pages_processed: int

    @property
    def succeeded(self) -> bool:
        """Return ``True`` unless ``final_status`` equals ``"failed"``."""
        return not (self.final_status == "failed")
@dataclass(frozen=True)
class BatchConversionResult:
    """Immutable collection of per-PDF (or per-chunk) conversion results."""

    # Individual results, in the order the conversions were run.
    results: tuple[ConversionResult, ...]

    @property
    def converted_count(self) -> int:
        """Number of results whose ``succeeded`` flag is true."""
        return sum(1 for item in self.results if item.succeeded)

    @property
    def failed_count(self) -> int:
        """Number of results whose ``succeeded`` flag is false."""
        return sum(1 for item in self.results if not item.succeeded)

    @property
    def warning_count(self) -> int:
        """Total of ``warning_count`` over all results."""
        total = 0
        for item in self.results:
            total += item.warning_count
        return total
@dataclass(frozen=True)
class _AssetMaterialization:
    """Outcome of copying adapter-produced asset files into the assets directory."""

    # One record per asset that was copied, built from its final Markdown link.
    records: tuple[AssetRecord, ...]
    # Warnings for assets that were skipped (missing file, path outside the
    # work or assets directory, duplicate destination).
    warnings: tuple[WarningRecord, ...]
    # Maps link spellings that may occur in the raw Markdown to the final link.
    link_map: dict[str, str]
@dataclass(frozen=True)
class _ConversionTask:
    """One planned conversion: an output plan plus optional chunk bookkeeping."""

    # Output locations planned for this task.
    output_plan: PlannedOutput
    # Chunk to extract before converting; ``None`` means the plan's source PDF
    # is converted as is.
    chunk_plan: PdfChunkPlan | None = None
    # PDF the chunk was taken from; ``None`` for unchunked tasks.
    original_source_pdf: Path | None = None
    # SHA-256 hex digest of ``original_source_pdf``; ``None`` for unchunked tasks.
    original_source_sha256: str | None = None
# Markdown image link ``![alt](target)``: ``alt`` may not contain ``]`` or a
# newline, ``target`` may not contain ``)`` or a newline.
_IMAGE_LINK_RE = re.compile(r"!\[(?P<alt>[^\]\n]*)\]\((?P<target>[^)\n]+)\)")
# Display math ``$$ ... $$`` whose delimiters are not preceded by a backslash;
# DOTALL lets the (non-greedy) body span several lines.
_DISPLAY_MATH_RE = re.compile(r"(?<!\\)\$\$(?P<body>.*?)(?<!\\)\$\$", re.DOTALL)
# Inline math ``$ ... $`` with unescaped delimiters; the body is non-empty and
# contains neither a newline nor another ``$``.
_INLINE_MATH_RE = re.compile(r"(?<!\\)\$(?P<body>[^\n$]+?)(?<!\\)\$")
def convert_pdf(
    input_path: PathLike,
    output_dir: PathLike,
    *,
    metadata: bool = True,
    keep_raw: bool = False,
    overwrite: bool = False,
    gpu: str | None = DEFAULT_GPU_DEVICE,
    strict_local: bool = True,
    adapter: ConversionAdapter | None = None,
    math_checker: MathChecker | None = None,
    chunk_pages: int | None = None,
    clock: Clock | None = None,
) -> ConversionResult | BatchConversionResult:
    """Convert one local PDF into Markdown, metadata, and report outputs.

    Args:
        input_path: Path of the PDF file to convert.
        output_dir: Directory the outputs are planned under.
        metadata: Whether the metadata JSON file is planned and written.
        keep_raw: Run the engine in the plan's raw directory instead of a
            temporary directory.
        overwrite: Clear the planned output paths before converting.
        gpu: Device string forwarded to the engine options.
        strict_local: Must be true; disabling it is rejected.
        adapter: Conversion engine; a ``MinerUAdapter`` is created when ``None``.
        math_checker: Optional checker used when the Markdown contains math.
        chunk_pages: When given, the PDF is planned in chunks of at most this
            many pages and a batch result is returned.
        clock: Source of the metadata timestamp; defaults to the current UTC time.

    Returns:
        A ``ConversionResult`` when ``chunk_pages`` is ``None``; otherwise a
        ``BatchConversionResult`` holding one result per planned task.

    Raises:
        StrictLocalViolationError: If ``strict_local`` is false.
        ValueError: If the input exists but is not a file, if discovery does
            not yield exactly one PDF, or if ``chunk_pages`` is not a positive
            integer.
    """
    _raise_if_strict_local_disabled(strict_local)
    candidate = Path(input_path).expanduser()
    if candidate.exists() and not candidate.is_file():
        raise ValueError("convert_pdf requires a PDF file input")
    discovered = discover_pdfs(input_path, recursive=False)
    if len(discovered) != 1:
        raise ValueError("convert_pdf requires a single PDF input")
    # Compare against None instead of relying on truthiness: an injected
    # adapter or clock that happens to be falsy must not be silently replaced
    # by the default engine.
    engine = MinerUAdapter() if adapter is None else adapter
    now = _utc_now if clock is None else clock
    if chunk_pages is None:
        plan = plan_outputs(discovered, output_dir, metadata=metadata, keep_raw=keep_raw, overwrite=overwrite)[0]
        return _convert_plan(
            plan,
            adapter=engine,
            clock=now,
            metadata_enabled=metadata,
            keep_raw=keep_raw,
            overwrite=overwrite,
            gpu=gpu,
            strict_local=strict_local,
            math_checker=math_checker,
        )
    tasks = _plan_conversion_tasks(
        discovered,
        output_dir,
        metadata=metadata,
        keep_raw=keep_raw,
        overwrite=overwrite,
        chunk_pages=chunk_pages,
    )
    return BatchConversionResult(
        _convert_tasks(
            tasks,
            adapter=engine,
            clock=now,
            metadata_enabled=metadata,
            keep_raw=keep_raw,
            overwrite=overwrite,
            gpu=gpu,
            strict_local=strict_local,
            math_checker=math_checker,
        )
    )
def convert_input(
    input_path: PathLike,
    output_dir: PathLike,
    *,
    metadata: bool = True,
    keep_raw: bool = False,
    recursive: bool = False,
    overwrite: bool = False,
    gpu: str | None = DEFAULT_GPU_DEVICE,
    strict_local: bool = True,
    adapter: ConversionAdapter | None = None,
    math_checker: MathChecker | None = None,
    chunk_pages: int | None = None,
    clock: Clock | None = None,
) -> BatchConversionResult:
    """Convert a local PDF or directory of PDFs.

    Args:
        input_path: PDF file or directory handed to PDF discovery.
        output_dir: Directory the outputs are planned under.
        metadata: Whether the metadata JSON files are planned and written.
        keep_raw: Run the engine in each plan's raw directory instead of a
            temporary directory.
        recursive: Forwarded to PDF discovery.
        overwrite: Clear the planned output paths before converting.
        gpu: Device string forwarded to the engine options.
        strict_local: Must be true; disabling it is rejected.
        adapter: Conversion engine; a ``MinerUAdapter`` is created when ``None``.
        math_checker: Optional checker used when the Markdown contains math.
        chunk_pages: When given, PDFs are planned in chunks of at most this
            many pages.
        clock: Source of the metadata timestamp; defaults to the current UTC time.

    Returns:
        A ``BatchConversionResult`` with one result per planned task.

    Raises:
        StrictLocalViolationError: If ``strict_local`` is false.
        ValueError: If ``chunk_pages`` is given but is not a positive integer.
    """
    _raise_if_strict_local_disabled(strict_local)
    discovered = discover_pdfs(input_path, recursive=recursive)
    tasks = _plan_conversion_tasks(
        discovered,
        output_dir,
        metadata=metadata,
        keep_raw=keep_raw,
        overwrite=overwrite,
        chunk_pages=chunk_pages,
    )
    # Compare against None instead of relying on truthiness: an injected
    # adapter or clock that happens to be falsy must not be silently replaced
    # by the default engine.
    engine = MinerUAdapter() if adapter is None else adapter
    now = _utc_now if clock is None else clock
    return BatchConversionResult(
        _convert_tasks(
            tasks,
            adapter=engine,
            clock=now,
            metadata_enabled=metadata,
            keep_raw=keep_raw,
            overwrite=overwrite,
            gpu=gpu,
            strict_local=strict_local,
            math_checker=math_checker,
        )
    )
def _plan_conversion_tasks(
discovered: tuple[DiscoveredPdf, ...],
output_dir: PathLike,
*,
metadata: bool,
keep_raw: bool,
overwrite: bool,
chunk_pages: int | None,
) -> tuple[_ConversionTask, ...]:
if chunk_pages is None:
plans = plan_outputs(discovered, output_dir, metadata=metadata, keep_raw=keep_raw, overwrite=overwrite)
return tuple(_ConversionTask(output_plan=plan) for plan in plans)
if not isinstance(chunk_pages, int) or chunk_pages < 1:
raise ValueError("chunk_pages must be a positive integer")
planned_inputs: list[DiscoveredPdf] = []
chunk_plans: list[PdfChunkPlan | None] = []
original_sources: list[Path | None] = []
source_hashes: dict[Path, str] = {}
for item in discovered:
chunks = plan_pdf_chunks(item.source_path, chunk_pages=chunk_pages)
if len(chunks) == 1:
planned_inputs.append(item)
chunk_plans.append(None)
original_sources.append(None)
continue
source_hashes[item.source_path] = _sha256(item.source_path)
for chunk in chunks:
planned_inputs.append(
DiscoveredPdf(
source_path=item.source_path.with_name(chunk.output_filename),
relative_parent=item.relative_parent,
)
)
chunk_plans.append(chunk)
original_sources.append(item.source_path)
plans = plan_outputs(planned_inputs, output_dir, metadata=metadata, keep_raw=keep_raw, overwrite=overwrite)
return tuple(
_ConversionTask(
output_plan=plan,
chunk_plan=chunk,
original_source_pdf=original,
original_source_sha256=source_hashes[original] if original is not None else None,
)
for plan, chunk, original in zip(plans, chunk_plans, original_sources, strict=True)
)
def _convert_tasks(
tasks: tuple[_ConversionTask, ...],
*,
adapter: ConversionAdapter,
clock: Clock,
metadata_enabled: bool,
keep_raw: bool,
overwrite: bool,
gpu: str | None,
strict_local: bool,
math_checker: MathChecker | None,
) -> tuple[ConversionResult, ...]:
if any(task.chunk_plan is not None for task in tasks):
with tempfile.TemporaryDirectory(prefix="pdf2md.chunks.") as chunk_directory:
return tuple(
_convert_task(
task,
chunk_directory=Path(chunk_directory),
adapter=adapter,
clock=clock,
metadata_enabled=metadata_enabled,
keep_raw=keep_raw,
overwrite=overwrite,
gpu=gpu,
strict_local=strict_local,
math_checker=math_checker,
)
for task in tasks
)
return tuple(
_convert_task(
task,
chunk_directory=None,
adapter=adapter,
clock=clock,
metadata_enabled=metadata_enabled,
keep_raw=keep_raw,
overwrite=overwrite,
gpu=gpu,
strict_local=strict_local,
math_checker=math_checker,
)
for task in tasks
)
def _convert_task(
task: _ConversionTask,
*,
chunk_directory: Path | None,
adapter: ConversionAdapter,
clock: Clock,
metadata_enabled: bool,
keep_raw: bool,
overwrite: bool,
gpu: str | None,
strict_local: bool,
math_checker: MathChecker | None,
) -> ConversionResult:
if task.chunk_plan is None:
return _convert_plan(
task.output_plan,
adapter=adapter,
clock=clock,
metadata_enabled=metadata_enabled,
keep_raw=keep_raw,
overwrite=overwrite,
gpu=gpu,
strict_local=strict_local,
math_checker=math_checker,
)
if chunk_directory is None:
raise ValueError("chunk directory is required for chunked conversion")
chunk_pdf = write_pdf_chunk(task.chunk_plan, chunk_directory / task.chunk_plan.output_filename)
chunk_output_plan = replace(task.output_plan, source_pdf=chunk_pdf)
return _convert_plan(
chunk_output_plan,
adapter=adapter,
clock=clock,
metadata_enabled=metadata_enabled,
keep_raw=keep_raw,
overwrite=overwrite,
gpu=gpu,
strict_local=strict_local,
math_checker=math_checker,
result_source_pdf=task.original_source_pdf,
metadata_source_pdf=task.original_source_pdf,
metadata_source_sha256=task.original_source_sha256,
engine_options_extra={"chunk": task.chunk_plan.metadata()},
)
def _convert_plan(
    plan: PlannedOutput,
    *,
    adapter: ConversionAdapter,
    clock: Clock,
    metadata_enabled: bool,
    keep_raw: bool,
    overwrite: bool,
    gpu: str | None,
    strict_local: bool,
    math_checker: MathChecker | None,
    result_source_pdf: Path | None = None,
    metadata_source_pdf: Path | None = None,
    metadata_source_sha256: str | None = None,
    engine_options_extra: dict[str, object] | None = None,
) -> ConversionResult:
    """Prepare the output location and run the conversion in a work directory.

    With ``keep_raw`` the engine works directly in the plan's raw directory;
    otherwise it works in a temporary directory created next to the Markdown
    output and removed afterwards.

    Raises:
        ValueError: If ``keep_raw`` is set but the plan has no raw directory.
    """
    if overwrite:
        _clear_planned_outputs(plan)
    plan.markdown_path.parent.mkdir(parents=True, exist_ok=True)
    options = MinerUOptions(strict_local=strict_local, gpu_device=gpu)

    def run_in(work_dir: Path) -> ConversionResult:
        return _convert_in_work_dir(
            plan,
            work_dir,
            adapter,
            options,
            clock,
            metadata_enabled,
            math_checker,
            result_source_pdf=result_source_pdf,
            metadata_source_pdf=metadata_source_pdf,
            metadata_source_sha256=metadata_source_sha256,
            engine_options_extra=engine_options_extra,
        )

    if not keep_raw:
        with tempfile.TemporaryDirectory(
            prefix=f"{plan.source_pdf.stem}.", dir=plan.markdown_path.parent
        ) as scratch:
            return run_in(Path(scratch))

    if plan.raw_dir is None:
        raise ValueError("raw output directory is required when keep_raw is enabled")
    plan.raw_dir.mkdir(parents=True, exist_ok=True)
    return run_in(plan.raw_dir)
def _convert_in_work_dir(
    plan: PlannedOutput,
    work_dir: Path,
    adapter: ConversionAdapter,
    options: MinerUOptions,
    clock: Clock,
    metadata_enabled: bool,
    math_checker: MathChecker | None,
    result_source_pdf: Path | None = None,
    metadata_source_pdf: Path | None = None,
    metadata_source_sha256: str | None = None,
    engine_options_extra: dict[str, object] | None = None,
) -> ConversionResult:
    """Run the adapter in ``work_dir`` and write Markdown, metadata, and report.

    Returns a failed result, without writing any output file, when the adapter
    raises ``StrictLocalViolationError``, reports failure, or produces no
    Markdown. Otherwise assets are copied, links rewritten, the Markdown is
    normalized and quality-checked, and the output files are written.
    """
    reported_source = result_source_pdf or plan.source_pdf
    described_source = metadata_source_pdf or reported_source

    try:
        outcome = adapter.convert(plan.source_pdf, work_dir, options)
    except StrictLocalViolationError as violation:
        return _failed_result(plan, warnings=(violation.warning,), source_pdf=reported_source)

    engine = outcome.engine or ENGINE_NAME
    engine_version = outcome.engine_version or "unknown"
    failure_context = {
        "engine": engine,
        "engine_version": engine_version,
        "source_pdf": reported_source,
    }

    if not outcome.succeeded:
        return _failed_result(plan, warnings=outcome.warnings, **failure_context)
    if outcome.raw_markdown is None:
        no_markdown = WarningRecord(
            WarningCode.MINERU_CLI_FAILED,
            WarningSeverity.ERROR,
            "MinerU produced structured output but no Markdown; no fallback engine was used.",
        )
        return _failed_result(plan, warnings=outcome.warnings + (no_markdown,), **failure_context)

    # Copy assets first so the Markdown links can be rewritten to final paths.
    assets = _materialize_assets(outcome.asset_paths, work_dir, plan.assets_dir)
    relinked_markdown = _rewrite_asset_links(outcome.raw_markdown, assets.link_map)
    markdown_dir = plan.markdown_path.parent
    normalized = normalize_markdown(
        relinked_markdown,
        markdown_dir=markdown_dir,
        asset_root=plan.assets_dir,
        check_assets=False,
    )
    quality = _run_quality_checks(
        normalized.markdown,
        markdown_dir=markdown_dir,
        asset_root=plan.assets_dir,
        math_checker=math_checker,
    )
    all_warnings = outcome.warnings + assets.warnings + normalized.warnings + quality.warnings

    document = _build_document(
        source_pdf=described_source,
        markdown=normalized.markdown,
        assets=assets.records,
        warnings=all_warnings,
        raw_structured=outcome.raw_structured,
    )
    engine_options = dict(outcome.engine_options)
    if engine_options_extra:
        engine_options.update(engine_options_extra)
    # A precomputed digest (chunked runs) avoids hashing the original again.
    source_digest = metadata_source_sha256 or _sha256(described_source)
    created_at = _format_timestamp(clock())
    metadata_data = build_metadata(
        document=document,
        source_sha256=source_digest,
        created_at=created_at,
        engine=engine,
        engine_version=engine_version,
        engine_options=engine_options,
    )

    # Only the asset-link counts are carried into the report quality.
    report_quality = QualityResult(
        missing_asset_link_count=quality.missing_asset_link_count,
        invalid_asset_link_count=quality.invalid_asset_link_count,
    )
    published_metadata_path = plan.metadata_path if metadata_enabled else None
    report_text = render_report(
        metadata_data,
        quality=report_quality,
        markdown_path=plan.markdown_path,
        metadata_path=published_metadata_path,
        report_path=plan.report_path,
    )
    final_status = determine_final_status(metadata_data, report_quality)

    _write_text(plan.markdown_path, normalized.markdown)
    if published_metadata_path is not None:
        serialized = json.dumps(metadata_data, indent=2, ensure_ascii=False, sort_keys=True)
        _write_text(published_metadata_path, serialized + "\n")
    _write_text(plan.report_path, report_text)

    return ConversionResult(
        source_pdf=reported_source,
        markdown_path=plan.markdown_path,
        metadata_path=published_metadata_path,
        report_path=plan.report_path,
        assets_dir=plan.assets_dir,
        raw_dir=plan.raw_dir,
        engine=engine,
        engine_version=engine_version,
        final_status=final_status,
        warning_count=len(all_warnings),
        warnings=all_warnings,
        pages_processed=int(metadata_data["summary"]["pages_processed"]),
    )
def _materialize_assets(asset_paths: tuple[Path, ...], work_dir: Path, assets_dir: Path) -> _AssetMaterialization:
    """Copy adapter asset files into ``assets_dir`` and collect link rewrites.

    An asset is skipped with a warning when its file is missing, when it lies
    outside ``work_dir``, when its destination would leave ``assets_dir``, or
    when an earlier asset already claimed the same destination.
    """
    copied_records: list[AssetRecord] = []
    problems: list[WarningRecord] = []
    link_map: dict[str, str] = {}
    claimed_destinations: set[str] = set()
    work_root = work_dir.resolve()

    for raw_source in asset_paths:
        source_path = Path(raw_source)
        if not (source_path.exists() and source_path.is_file()):
            problems.append(_warning(WarningCode.ASSET_LINK_MISSING, f"Adapter asset file does not exist: {source_path}"))
            continue

        try:
            source_relative = source_path.resolve().relative_to(work_root)
        except ValueError:
            problems.append(_warning(WarningCode.ASSET_LINK_INVALID, f"Adapter asset path is outside the work directory: {source_path}"))
            continue

        destination_relative = _destination_asset_relative(source_relative)
        destination = assets_dir / destination_relative
        try:
            # Containment check: the resolved destination must stay inside assets_dir.
            destination.resolve(strict=False).relative_to(assets_dir.resolve(strict=False))
        except ValueError:
            problems.append(_warning(WarningCode.ASSET_LINK_INVALID, f"Adapter asset destination is outside the assets directory: {source_path}"))
            continue

        destination_key = destination_relative.as_posix()
        if destination_key in claimed_destinations:
            problems.append(_warning(WarningCode.ASSET_LINK_INVALID, f"Duplicate adapter asset destination was skipped: {destination_key}"))
            continue

        destination.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(source_path, destination)
        claimed_destinations.add(destination_key)

        # Links are expressed relative to the Markdown file: "<assets dir name>/<relative path>".
        final_link = PurePosixPath(assets_dir.name, destination_relative).as_posix()
        copied_records.append(AssetRecord(final_link))
        _add_asset_link_keys(link_map, source_path, source_relative, destination_relative, final_link)

    return _AssetMaterialization(
        records=tuple(copied_records),
        warnings=tuple(problems),
        link_map=link_map,
    )
def _destination_asset_relative(source_relative: Path) -> PurePosixPath:
parts = PurePosixPath(source_relative.as_posix()).parts
if len(parts) > 1 and parts[0].casefold() in {"asset", "assets", "image", "images"}:
parts = parts[1:]
return PurePosixPath(*parts)
def _add_asset_link_keys(
    link_map: dict[str, str],
    source_path: Path,
    source_relative: Path,
    destination_relative: PurePosixPath,
    final_link: str,
) -> None:
    """Register every spelling under which the asset may be linked.

    Each spelling (relative paths, bare file name, full source path, and the
    suffixes starting at an asset/image directory) is stored in ``link_map``
    with backslashes normalized to forward slashes, pointing at ``final_link``.
    """
    spellings = [
        source_relative.as_posix(),
        destination_relative.as_posix(),
        source_path.name,
        str(source_path),
        source_path.as_posix(),
        *_asset_link_suffixes(source_relative),
        *_asset_link_suffixes(destination_relative),
    ]
    link_map.update({spelling.replace("\\", "/"): final_link for spelling in spellings})
def _asset_link_suffixes(path: Path | PurePosixPath) -> set[str]:
parts = PurePosixPath(path.as_posix()).parts
suffixes: set[str] = set()
for index, part in enumerate(parts):
if part.casefold() in {"asset", "assets", "image", "images"} and index + 1 < len(parts):
suffixes.add(PurePosixPath(*parts[index:]).as_posix())
return suffixes
def _rewrite_asset_links(markdown: str, link_map: dict[str, str]) -> str:
if not link_map:
return markdown
def replace(match: re.Match[str]) -> str:
alt = match.group("alt")
target = match.group("target").strip()
unwrapped = _unwrap_angle_target(target).replace("\\", "/")
replacement = link_map.get(unwrapped)
if replacement is None:
return match.group(0)
return f"![{alt}]({replacement})"
return _IMAGE_LINK_RE.sub(replace, markdown)
def _build_document(
    *,
    source_pdf: Path,
    markdown: str,
    assets: tuple[AssetRecord, ...],
    warnings: tuple[WarningRecord, ...],
    raw_structured: object | None,
) -> DocumentRecord:
    """Assemble the document record used for metadata.

    The page count comes from the engine's structured output; all blocks
    derived from the Markdown are attached to page 0 and the remaining pages
    carry no blocks.
    """
    total_pages = _page_count(raw_structured)
    first_page_blocks = _formula_blocks(markdown)
    pages = tuple(
        PageRecord(page_index=number, blocks=() if number else first_page_blocks)
        for number in range(total_pages)
    )
    return DocumentRecord(source_pdf=source_pdf, pages=pages, assets=assets, warnings=warnings)
def _run_quality_checks(
    markdown: str,
    *,
    markdown_dir: Path,
    asset_root: Path,
    math_checker: MathChecker | None,
) -> QualityResult:
    """Check asset links and, when math is present, math renderability.

    The default math checker is only created when the Markdown contains math
    and no checker was supplied.
    """
    asset_quality = check_asset_links(markdown, markdown_dir=markdown_dir, asset_root=asset_root)
    if _has_math(markdown):
        checker = math_checker if math_checker is not None else create_default_math_checker()
        return merge_quality_results(asset_quality, check_math_renderability(markdown, checker))
    return asset_quality
def _has_math(markdown: str) -> bool:
    """Return whether the Markdown contains display or inline math."""
    return any(
        pattern.search(markdown) is not None
        for pattern in (_DISPLAY_MATH_RE, _INLINE_MATH_RE)
    )
def _formula_blocks(markdown: str) -> tuple[BlockRecord, ...]:
    """Build block records for the formulas found in the Markdown.

    Display formulas come first, followed by inline formulas found in the text
    between them. Inline matches whose body is empty after stripping or starts
    with a digit are ignored. When nothing is found, a single paragraph block
    is returned instead.
    """
    display_spans = [match.span() for match in _DISPLAY_MATH_RE.finditer(markdown)]
    blocks = [
        BlockRecord(BlockType.DISPLAY_FORMULA, page_index=0, markdown_span=span)
        for span in display_spans
    ]

    # Collect the stretches of text that lie outside every display formula.
    gaps: list[tuple[int, int]] = []
    cursor = 0
    for span_start, span_end in display_spans:
        gaps.append((cursor, span_start))
        cursor = span_end
    gaps.append((cursor, len(markdown)))

    for gap_start, gap_end in gaps:
        # Search a slice (not the full string) so matches never reach into a
        # neighbouring display formula; spans are shifted back afterwards.
        for match in _INLINE_MATH_RE.finditer(markdown[gap_start:gap_end]):
            body = match.group("body").strip()
            if not body or body[0].isdigit():
                continue
            span = (gap_start + match.start(), gap_start + match.end())
            blocks.append(BlockRecord(BlockType.INLINE_FORMULA, page_index=0, markdown_span=span))

    if not blocks:
        return (BlockRecord(BlockType.PARAGRAPH, page_index=0),)
    return tuple(blocks)
def _page_count(raw_structured: object | None) -> int:
if isinstance(raw_structured, dict):
pages = raw_structured.get("pages")
if isinstance(pages, list):
return max(1, len(pages))
if isinstance(pages, int):
return max(1, pages)
if isinstance(pages, dict):
return max(1, len(pages))
pdf_info = raw_structured.get("pdf_info")
if isinstance(pdf_info, list):
return max(1, len(pdf_info))
page_info = raw_structured.get("page_info")
if isinstance(page_info, list):
return max(1, len(page_info))
page_indexes = tuple(_page_indexes(raw_structured))
if page_indexes:
return max(1, max(page_indexes) + 1)
return 1
def _page_indexes(value: object) -> tuple[int, ...]:
indexes: list[int] = []
if isinstance(value, dict):
for key in ("page_idx", "page_index"):
page_value = value.get(key)
if isinstance(page_value, int) and page_value >= 0:
indexes.append(page_value)
for item in value.values():
indexes.extend(_page_indexes(item))
elif isinstance(value, list):
for item in value:
indexes.extend(_page_indexes(item))
return tuple(indexes)
def _failed_result(
    plan: PlannedOutput,
    *,
    warnings: tuple[WarningRecord, ...],
    engine: str = ENGINE_NAME,
    engine_version: str = "unknown",
    source_pdf: Path | None = None,
) -> ConversionResult:
    """Build the result for a conversion that failed.

    Paths are taken from the plan, the status is ``"failed"`` and no pages are
    reported as processed. ``source_pdf`` overrides the plan's source PDF.
    """
    reported_source = source_pdf if source_pdf else plan.source_pdf
    fields = {
        "source_pdf": reported_source,
        "markdown_path": plan.markdown_path,
        "metadata_path": plan.metadata_path,
        "report_path": plan.report_path,
        "assets_dir": plan.assets_dir,
        "raw_dir": plan.raw_dir,
        "engine": engine,
        "engine_version": engine_version,
        "final_status": "failed",
        "warning_count": len(warnings),
        "warnings": warnings,
        "pages_processed": 0,
    }
    return ConversionResult(**fields)
def _clear_planned_outputs(plan: PlannedOutput) -> None:
for path in plan.planned_paths():
if path.is_dir():
shutil.rmtree(path)
elif path.exists():
path.unlink()
def _write_text(path: Path, text: str) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(text, encoding="utf-8")
def _sha256(path: Path) -> str:
digest = hashlib.sha256()
with path.open("rb") as file:
for chunk in iter(lambda: file.read(1024 * 1024), b""):
digest.update(chunk)
return digest.hexdigest()
def _format_timestamp(value: datetime) -> str:
if value.tzinfo is None:
value = value.replace(tzinfo=timezone.utc)
return value.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
def _utc_now() -> datetime:
return datetime.now(timezone.utc)
def _unwrap_angle_target(target: str) -> str:
if target.startswith("<") and target.endswith(">"):
return target[1:-1].strip()
return target
def _warning(code: WarningCode, message: str) -> WarningRecord:
    """Build a warning record with WARNING severity for the given code and message."""
    severity = WarningSeverity.WARNING
    return WarningRecord(code, severity, message)
def _raise_if_strict_local_disabled(strict_local: bool) -> None:
if not strict_local:
raise StrictLocalViolationError("strict-local execution cannot be disabled in v1.")