Add Markdown recheck command

This commit is contained in:
NINI
2026-05-11 01:00:26 +09:00
parent b69c03c206
commit 03927a26a1
9 changed files with 276 additions and 11 deletions
+177 -1
View File
@@ -11,7 +11,7 @@ from collections.abc import Callable
from dataclasses import dataclass, replace
from datetime import datetime, timezone
from pathlib import Path, PurePosixPath
from typing import Protocol
from typing import Any, Protocol
from pdf2md.ir import (
AssetRecord,
@@ -104,6 +104,13 @@ class _ConversionTask:
# Matches a Markdown image link of the form ``![alt](target)``. The named
# groups ``alt`` and ``target`` capture the two parts; ``alt`` may be empty,
# ``target`` may not, and neither may contain a newline.
_IMAGE_LINK_RE = re.compile(r"!\[(?P<alt>[^\]\n]*)\]\((?P<target>[^)\n]+)\)")
# Matches ``$$ ... $$`` where neither delimiter is preceded by a backslash.
# ``re.DOTALL`` lets the lazily matched ``body`` group span several lines.
_DISPLAY_MATH_RE = re.compile(r"(?<!\\)\$\$(?P<body>.*?)(?<!\\)\$\$", re.DOTALL)
# Matches ``$ ... $`` where neither delimiter is preceded by a backslash and
# the non-empty ``body`` group contains no newline and no ``$``.
_INLINE_MATH_RE = re.compile(r"(?<!\\)\$(?P<body>[^\n$]+?)(?<!\\)\$")
# Warning codes that _preserved_metadata_warnings drops from previously stored
# metadata during a recheck.
# NOTE(review): presumably these are the codes _run_quality_checks re-emits,
# so keeping the stored copies would duplicate them — confirm.
_RECHECKED_WARNING_CODES = frozenset(
{
WarningCode.MATH_RENDER_FAILED,
WarningCode.ASSET_LINK_MISSING,
WarningCode.ASSET_LINK_INVALID,
}
)
def convert_pdf(
@@ -212,6 +219,175 @@ def convert_input(
)
def recheck_markdown(
    markdown_path: PathLike,
    *,
    math_checker: MathChecker | None = None,
    clock: Clock | None = None,
) -> ConversionResult:
    """Re-validate an already converted Markdown file and refresh its sidecars.

    The Markdown text is read from disk and run through the quality checks
    again. Warnings stored in the existing metadata are carried over, except
    those whose code is in ``_RECHECKED_WARNING_CODES``. The metadata JSON and
    the report next to the Markdown file are then overwritten.

    Args:
        markdown_path: Location of the Markdown output to recheck.
        math_checker: Optional checker forwarded to the quality checks.
        clock: Optional callable producing the timestamp for ``created_at``;
            ``_utc_now`` is used when it is omitted.

    Returns:
        A ``ConversionResult`` describing the refreshed outputs; its
        ``raw_dir`` is always ``None``.

    Raises:
        ValueError: If the Markdown file or its ``.metadata.json`` sidecar
            does not exist, or if a required metadata field is missing.
    """
    target = Path(markdown_path).expanduser().resolve()
    if not target.is_file():
        raise ValueError(f"Markdown output does not exist: {target}")

    # Every sidecar shares the Markdown file's stem and differs by suffix only.
    sidecar_metadata = target.with_suffix(".metadata.json")
    sidecar_report = target.with_suffix(".report.md")
    sidecar_assets = target.with_suffix(".assets")
    if not sidecar_metadata.is_file():
        raise ValueError(f"Existing metadata JSON is required for recheck: {sidecar_metadata}")

    previous = _read_metadata_json(sidecar_metadata)
    body = target.read_text(encoding="utf-8")
    known_assets = _assets_from_metadata(previous)
    checked = _run_quality_checks(
        body,
        markdown_dir=target.parent,
        asset_root=sidecar_assets,
        math_checker=math_checker,
    )
    merged_warnings = _preserved_metadata_warnings(previous) + checked.warnings

    original_pdf = Path(_metadata_text(previous, "source_pdf"))
    # No raw page data is available on recheck, so one ``None`` placeholder
    # per page is supplied instead.
    placeholder_pages = [None] * _metadata_page_count(previous)
    rebuilt = _build_document(
        source_pdf=original_pdf,
        markdown=body,
        assets=known_assets,
        warnings=merged_warnings,
        raw_structured={"pages": placeholder_pages},
    )

    current_time = _utc_now if not clock else clock
    refreshed = build_metadata(
        document=rebuilt,
        source_sha256=_metadata_text(previous, "source_sha256"),
        created_at=_format_timestamp(current_time()),
        engine=_metadata_text(previous, "engine"),
        engine_version=_metadata_text(previous, "engine_version"),
        engine_options=_metadata_engine_options(previous),
    )

    # Only the asset-link counters are forwarded to the report and status.
    link_quality = QualityResult(
        missing_asset_link_count=checked.missing_asset_link_count,
        invalid_asset_link_count=checked.invalid_asset_link_count,
    )
    rendered_report = render_report(
        refreshed,
        quality=link_quality,
        markdown_path=target,
        metadata_path=sidecar_metadata,
        report_path=sidecar_report,
    )
    status = determine_final_status(refreshed, link_quality)

    serialized = json.dumps(refreshed, indent=2, ensure_ascii=False, sort_keys=True) + "\n"
    _write_text(sidecar_metadata, serialized)
    _write_text(sidecar_report, rendered_report)

    return ConversionResult(
        source_pdf=Path(_metadata_text(refreshed, "source_pdf")),
        markdown_path=target,
        metadata_path=sidecar_metadata,
        report_path=sidecar_report,
        assets_dir=sidecar_assets,
        raw_dir=None,
        engine=_metadata_text(refreshed, "engine"),
        engine_version=_metadata_text(refreshed, "engine_version"),
        final_status=status,
        warning_count=len(merged_warnings),
        warnings=merged_warnings,
        pages_processed=int(refreshed["summary"]["pages_processed"]),
    )
def _read_metadata_json(path: Path) -> dict[str, Any]:
data = json.loads(path.read_text(encoding="utf-8"))
if not isinstance(data, dict):
raise ValueError(f"metadata JSON must contain an object: {path}")
return data
def _assets_from_metadata(metadata: dict[str, Any]) -> tuple[AssetRecord, ...]:
    """Rebuild asset records from the ``assets`` list of stored metadata.

    Entries that are not dicts, or whose ``relative_path`` is not a non-empty
    string, are skipped. An ``assets`` value that is missing or not a list
    yields an empty tuple.
    """
    entries = metadata.get("assets", ())
    if not isinstance(entries, list):
        return ()

    collected: list[AssetRecord] = []
    for entry in entries:
        location = entry.get("relative_path") if isinstance(entry, dict) else None
        if isinstance(location, str) and location:
            record = AssetRecord(
                location,
                page_index=_optional_page_index(entry.get("page_index")),
                bbox=_optional_bbox(entry.get("bbox")),
            )
            collected.append(record)
    return tuple(collected)
def _preserved_metadata_warnings(metadata: dict[str, Any]) -> tuple[WarningRecord, ...]:
    """Return stored warnings that should survive a recheck.

    Non-dict entries and entries that cannot be parsed are ignored, as are
    warnings whose code is listed in ``_RECHECKED_WARNING_CODES``. A
    ``warnings`` value that is missing or not a list yields an empty tuple.
    """
    stored = metadata.get("warnings", ())
    if not isinstance(stored, list):
        return ()

    # Lazy pipeline: each entry is parsed and filtered in its original order.
    parsed = (_warning_from_metadata(entry) for entry in stored if isinstance(entry, dict))
    return tuple(
        record
        for record in parsed
        if record is not None and record.code not in _RECHECKED_WARNING_CODES
    )
def _warning_from_metadata(item: dict[str, Any]) -> WarningRecord | None:
    """Convert one stored warning entry into a ``WarningRecord``.

    Args:
        item: A single element of the metadata ``warnings`` list.

    Returns:
        The reconstructed record, or ``None`` when the entry is unusable:
        ``code``, ``severity`` or ``message`` is not a string, ``message`` is
        empty, or ``code``/``severity`` is not a value the corresponding type
        accepts.
    """
    code = item.get("code")
    severity = item.get("severity")
    message = item.get("message")
    if not isinstance(code, str) or not isinstance(severity, str):
        return None
    if not isinstance(message, str) or not message:
        return None
    try:
        # Presumably both are Enum classes, whose by-value lookup raises
        # ValueError for unknown values (e.g. metadata written by a different
        # tool version). Treat that like any other malformed entry instead of
        # aborting the whole recheck.
        warning_code = WarningCode(code)
        warning_severity = WarningSeverity(severity)
    except ValueError:
        return None
    return WarningRecord(
        warning_code,
        warning_severity,
        message,
        page_index=_optional_page_index(item.get("page_index")),
        bbox=_optional_bbox(item.get("bbox")),
    )
def _metadata_text(metadata: dict[str, Any], field_name: str) -> str:
value = metadata.get(field_name)
if not isinstance(value, str) or not value:
raise ValueError(f"metadata field is required: {field_name}")
return value
def _metadata_engine_options(metadata: dict[str, Any]) -> dict[str, Any]:
value = metadata.get("engine_options", {})
return dict(value) if isinstance(value, dict) else {}
def _metadata_page_count(metadata: dict[str, Any]) -> int:
pages = metadata.get("pages")
if isinstance(pages, list) and pages:
return len(pages)
summary = metadata.get("summary")
if isinstance(summary, dict):
pages_processed = summary.get("pages_processed")
if isinstance(pages_processed, int) and pages_processed > 0:
return pages_processed
return 1
def _optional_page_index(value: object) -> int | None:
return value if isinstance(value, int) and value >= 0 else None
def _optional_bbox(value: object) -> tuple[float, float, float, float] | None:
if not isinstance(value, list | tuple) or len(value) != 4:
return None
if not all(isinstance(part, int | float) for part in value):
return None
return tuple(float(part) for part in value)
def _plan_conversion_tasks(
discovered: tuple[DiscoveredPdf, ...],
output_dir: PathLike,