Files
PDFToMD/src/pdf2md/doctor.py
T
2026-05-14 10:16:59 +09:00

495 lines
17 KiB
Python

"""Local setup diagnostics for pdf2md."""
from __future__ import annotations
import importlib
import os
import re
import shutil
import subprocess
import sys
from collections.abc import Callable, Mapping
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Literal, Protocol
from pdf2md.gpu import NVIDIA_SMI_QUERY, GpuInfo, parse_nvidia_smi_gpus, select_gpu
from pdf2md.math_render import default_mathjax_helper_path
from pdf2md.mineru_adapter import CommandResult, MinerUAdapter, MinerUVersionResult
from pdf2md.mineru_profile import resolve_mineru_profile
DoctorStatus = Literal["pass", "warn", "fail"]
CommandRunner = Callable[[tuple[str, ...]], "DoctorCommandResult"]
Which = Callable[[str], str | None]
ImportModule = Callable[[str], Any]
PathExists = Callable[[Path], bool]
TARGET_PYTHON = (3, 12)
TARGET_MINERU_VERSION = "3.1.0"
MODEL_CACHE_ENV_VARS = (
"MINERU_MODEL_SOURCE",
"MINERU_MODEL_DIR",
"MINERU_CACHE_DIR",
"MINERU_TOOLS_CONFIG_JSON",
"HF_HOME",
"HUGGINGFACE_HUB_CACHE",
"MODELSCOPE_CACHE",
)
class MinerUProbe(Protocol):
    """Structural type for any object that can report the MinerU CLI version."""

    def version(self) -> MinerUVersionResult:
        """Return the direct local MinerU CLI version result."""
        ...
@dataclass(frozen=True)
class DoctorCommandResult:
    """Immutable record of one external command run on behalf of the doctor."""

    # Argument vector that was executed.
    command: tuple[str, ...]
    # Exit status reported for the command.
    exit_code: int
    # Captured standard output; empty when nothing was captured.
    stdout: str = ""
    # Captured standard error; empty when nothing was captured.
    stderr: str = ""
@dataclass(frozen=True)
class DoctorCheck:
    """Immutable outcome of a single diagnostic check."""

    # Short identifier of the check, printed after the status tag.
    name: str
    # One of "pass", "warn" or "fail".
    status: DoctorStatus
    # One-line human-readable summary.
    message: str
    # Optional extra lines printed beneath the summary.
    details: tuple[str, ...] = ()
@dataclass(frozen=True)
class DoctorReport:
    """Ordered collection of check outcomes with an aggregated verdict."""

    checks: tuple[DoctorCheck, ...]

    @property
    def status(self) -> DoctorStatus:
        """Return the worst status among the checks: fail, then warn, then pass."""
        observed = [check.status for check in self.checks]
        if "fail" in observed:
            return "fail"
        if "warn" in observed:
            return "warn"
        return "pass"

    @property
    def exit_code(self) -> int:
        """Return 1 when the aggregated status is "fail", otherwise 0."""
        return int(self.status == "fail")
def run_doctor(
    *,
    python_version: tuple[int, int, int] | None = None,
    which: Which = shutil.which,
    run_command: CommandRunner | None = None,
    import_module: ImportModule = importlib.import_module,
    env: Mapping[str, str] | None = None,
    path_exists: PathExists | None = None,
    home: Path | None = None,
    mineru_probe: MinerUProbe | None = None,
) -> DoctorReport:
    """Execute every local setup check in a fixed order and collect the results.

    Every keyword argument is an optional substitute for a real collaborator
    (interpreter version, PATH lookup, command runner, importer, environment,
    filesystem probe, home directory, MinerU probe); omitted ones fall back to
    the live process environment.

    Returns:
        A DoctorReport holding one DoctorCheck per diagnostic, in run order.
    """

    def _exists_on_disk(path: Path) -> bool:
        return path.exists()

    # Falsy overrides fall back to the defaults, exactly like an `or` chain.
    runner = run_command if run_command else _run_command
    exists = path_exists if path_exists else _exists_on_disk
    version = python_version if python_version else sys.version_info[:3]
    environment = env if env is not None else os.environ
    home_path = Path.home() if home is None else home
    probe = mineru_probe if mineru_probe else _default_mineru_probe(which, runner)

    collected = [
        _check_python(version),
        _check_uv(which, runner),
        _check_mineru(probe),
        _check_gpu(which, runner),
        _check_pytorch(import_module),
        _check_model_cache(environment, exists, home_path),
        _check_mathjax_checker(which, runner, exists),
        _check_local_only_policy(),
    ]
    return DoctorReport(checks=tuple(collected))
def format_doctor_report(report: DoctorReport) -> str:
    """Render a report as plain text.

    The first line carries the overall status; each check follows as a
    ``[STATUS] name: message`` line with its details listed beneath it.
    """
    rendered = [f"Doctor status: {report.status.upper()}"]
    for entry in report.checks:
        rendered.append(f"[{entry.status.upper()}] {entry.name}: {entry.message}")
        rendered.extend(f" - {note}" for note in entry.details)
    return "\n".join(rendered)
def _check_python(version: tuple[int, int, int]) -> DoctorCheck:
    """Fail unless the interpreter's major.minor matches TARGET_PYTHON."""
    dotted = ".".join(map(str, version))
    if version[:2] != TARGET_PYTHON:
        return DoctorCheck(
            "python",
            "fail",
            f"Python {dotted} is unsupported; use Python 3.12.x.",
        )
    return DoctorCheck("python", "pass", f"Python {dotted} is supported.")
def _check_uv(which: Which, run_command: CommandRunner) -> DoctorCheck:
    """Check that ``uv`` is on PATH and reports a version.

    Fails when the executable is missing; warns when ``uv --version`` exits
    non-zero or prints nothing; otherwise passes with the first reported line.
    """
    located = which("uv")
    if located is None:
        return DoctorCheck(
            "uv",
            "fail",
            "uv executable was not found on PATH.",
            ("Windows per-user uv installs commonly use C:\\Users\\user\\.local\\bin.",),
        )

    path_detail = f"path: {located}"
    outcome = run_command(("uv", "--version"))
    # Prefer stdout, but accept a version banner printed on stderr.
    reported = _first_non_empty_line(outcome.stdout) or _first_non_empty_line(outcome.stderr)

    if outcome.exit_code != 0:
        failure_details = (path_detail, f"exit code: {outcome.exit_code}", _trim_detail(outcome.stderr))
        return DoctorCheck("uv", "warn", "uv was found, but `uv --version` failed.", failure_details)
    if reported is None:
        return DoctorCheck("uv", "warn", "uv was found, but no version text was reported.", (path_detail,))
    return DoctorCheck("uv", "pass", reported, (path_detail,))
def _check_mineru(probe: MinerUProbe) -> DoctorCheck:
    """Check MinerU CLI availability and compare its version to the target.

    Fails when the CLI is unavailable; warns when the version is undetected or
    differs from TARGET_MINERU_VERSION; otherwise passes.
    """
    outcome = probe.version()
    invoked = f"command: {' '.join(outcome.command)}"

    if not outcome.available:
        return DoctorCheck("mineru", "fail", "MinerU CLI executable was not found.", (invoked,))

    probe_warnings = tuple(item.message for item in outcome.warnings)
    detected = outcome.version

    if detected is None:
        return DoctorCheck(
            "mineru",
            "warn",
            "MinerU CLI is available, but version could not be detected.",
            (invoked, f"exit code: {outcome.exit_code}", *probe_warnings, _trim_detail(outcome.stderr)),
        )
    if _has_target_mineru_version(detected):
        return DoctorCheck("mineru", "pass", f"MinerU {detected} CLI detected.", (invoked,))
    return DoctorCheck(
        "mineru",
        "warn",
        f"MinerU version is `{detected}`; project target is {TARGET_MINERU_VERSION}.",
        (invoked,),
    )
def _check_gpu(which: Which, run_command: CommandRunner) -> DoctorCheck:
    """Check NVIDIA GPU visibility by running the ``nvidia-smi`` query.

    Args:
        which: PATH lookup used to locate ``nvidia-smi``.
        run_command: Runner used to execute NVIDIA_SMI_QUERY.

    Returns:
        A "gpu" DoctorCheck. It warns when ``nvidia-smi`` is missing, the query
        fails, the output cannot be parsed, no GPU is listed, or any listed GPU
        has ``pre_turing_risk`` set; otherwise it passes.
    """
    nvidia_smi_path = which("nvidia-smi")
    if nvidia_smi_path is None:
        return DoctorCheck("gpu", "warn", "nvidia-smi was not found; NVIDIA GPU visibility could not be confirmed.")

    path_detail = f"path: {nvidia_smi_path}"
    result = run_command(NVIDIA_SMI_QUERY)
    if result.exit_code != 0:
        return DoctorCheck(
            "gpu",
            "warn",
            "nvidia-smi was found, but GPU query failed.",
            (path_detail, f"exit code: {result.exit_code}", _trim_detail(result.stderr)),
        )

    try:
        gpus = parse_nvidia_smi_gpus(result.stdout)
    except ValueError as error:
        # The unparsable text came from stdout, so label it as such instead of
        # reusing the stderr-prefixed helper. Same whitespace collapsing and
        # 240-character cap as the other detail lines.
        flattened = " ".join(result.stdout.split())
        stdout_detail = f"stdout: {flattened[:240]}" if flattened else "stdout: <empty>"
        return DoctorCheck(
            "gpu",
            "warn",
            "nvidia-smi output could not be parsed.",
            (path_detail, str(error), stdout_detail),
        )

    if not gpus:
        return DoctorCheck("gpu", "warn", "nvidia-smi reported no visible NVIDIA GPU.", (path_detail,))

    details = (path_detail, *_gpu_detail_lines(gpus), *_gpu_recommendation_details(gpus))
    if any(gpu.pre_turing_risk for gpu in gpus):
        return DoctorCheck(
            "gpu",
            "warn",
            "NVIDIA GPU is visible, but Pascal/pre-Turing compatibility risk was detected.",
            details,
        )
    return DoctorCheck("gpu", "pass", "NVIDIA GPU is visible.", details)
def _check_pytorch(import_module: ImportModule) -> DoctorCheck:
    """Check whether PyTorch is importable and reports CUDA as available.

    Warns when torch is missing or broken, when CUDA is unavailable, or when a
    device name or compute capability (below 7.0) suggests a Pascal/pre-Turing
    GPU. Passes otherwise. Details list the torch version, CUDA version,
    device count, then all device names, then all capabilities.
    """
    try:
        torch = import_module("torch")
    except ImportError:
        return DoctorCheck("pytorch", "warn", "PyTorch is not installed; CUDA visibility through torch cannot be checked.")
    except Exception as error:  # pragma: no cover - defensive for broken local torch installs.
        return DoctorCheck("pytorch", "warn", f"PyTorch import failed: {error}")

    version = str(getattr(torch, "__version__", "unknown"))
    cuda = getattr(torch, "cuda", None)
    if cuda is None or not hasattr(cuda, "is_available"):
        return DoctorCheck("pytorch", "warn", f"PyTorch {version} has no CUDA availability API.")

    try:
        cuda_ready = bool(cuda.is_available())
    except Exception as error:  # pragma: no cover - defensive for broken CUDA runtimes.
        return DoctorCheck("pytorch", "warn", f"PyTorch CUDA availability check failed: {error}", (f"torch: {version}",))
    if not cuda_ready:
        return DoctorCheck("pytorch", "warn", f"PyTorch {version} reports CUDA unavailable.")

    details = [f"torch: {version}"]
    cuda_version = getattr(getattr(torch, "version", None), "cuda", None)
    if cuda_version:
        details.append(f"torch cuda: {cuda_version}")

    flagged: list[str] = []
    device_total = _safe_int_call(getattr(cuda, "device_count", None))
    if device_total is not None:
        details.append(f"cuda devices: {device_total}")
        name_of = getattr(cuda, "get_device_name", None)
        capability_of = getattr(cuda, "get_device_capability", None)

        # First pass: device names, with a name-based architecture heuristic.
        if callable(name_of):
            for index in range(device_total):
                try:
                    label = str(name_of(index))
                except Exception:
                    details.append(f"device {index}: name unavailable")
                    continue
                details.append(f"device {index}: {label}")
                if _is_pascal_or_pre_turing(label):
                    flagged.append(f"device {index}: {label}")

        # Second pass: compute capabilities; anything below 7.0 is flagged.
        if callable(capability_of):
            for index in range(device_total):
                try:
                    capability = tuple(int(part) for part in capability_of(index))
                    dotted = f"{capability[0]}.{capability[1]}"
                except Exception:
                    details.append(f"device {index} capability: unavailable")
                    continue
                details.append(f"device {index} capability: {dotted}")
                if capability < (7, 0):
                    flagged.append(f"device {index}: compute capability {dotted}")

    if not flagged:
        return DoctorCheck("pytorch", "pass", f"PyTorch {version} reports CUDA available.", tuple(details))
    return DoctorCheck(
        "pytorch",
        "warn",
        f"PyTorch {version} reports CUDA available, but Pascal/pre-Turing compatibility risk was detected.",
        (*details, *flagged),
    )
def _gpu_detail_lines(gpus: tuple[GpuInfo, ...]) -> tuple[str, ...]:
return tuple(
f"gpu {gpu.index}: {gpu.name}, {gpu.memory_total_mib} MiB, driver {gpu.driver_version}"
for gpu in gpus
)
def _gpu_recommendation_details(gpus: tuple[GpuInfo, ...]) -> tuple[str, ...]:
    """Report the auto-selected GPU and the MinerU profile resolved for it.

    Returns an empty tuple when automatic GPU selection raises ValueError.
    """
    try:
        chosen = select_gpu(gpus, "auto")
    except ValueError:
        return ()

    card = chosen.gpu
    resolved = resolve_mineru_profile("auto", selected_gpu=card, cuda_requested=True)
    auto_line = f"auto gpu: {chosen.cuda_device} ({card.name}, {card.memory_total_mib} MiB)"
    profile_line = f"recommended MinerU profile: {resolved.applied_profile}"
    return (auto_line, profile_line)
def _check_model_cache(env: Mapping[str, str], path_exists: PathExists, home: Path) -> DoctorCheck:
    """Look for local MinerU model, cache or config locations.

    Each variable in MODEL_CACHE_ENV_VARS is read from ``env``. The value of
    MINERU_MODEL_SOURCE is only recorded; every other non-empty value is
    expanded to a path and probed with ``path_exists``. ``home / "mineru.json"``
    is probed as well. The check passes as soon as any probed location exists
    and warns in every other case.
    """
    seen: list[str] = []
    found: list[str] = []
    absent: list[str] = []

    for variable in MODEL_CACHE_ENV_VARS:
        setting = env.get(variable, "").strip()
        if setting and variable == "MINERU_MODEL_SOURCE":
            # A source selector, not a filesystem location: record it only.
            seen.append(f"{variable}={setting}")
        elif setting:
            candidate = _expand_path(setting)
            entry = f"{variable}={candidate}"
            seen.append(entry)
            (found if path_exists(candidate) else absent).append(entry)

    user_config = home / "mineru.json"
    if path_exists(user_config):
        found.append(f"user config={user_config}")

    if found:
        remainder = [entry for entry in seen if entry not in found]
        return DoctorCheck(
            "models",
            "pass",
            "Local MinerU model/cache/config path was detected.",
            tuple(found + remainder),
        )
    if absent:
        remainder = [entry for entry in seen if entry not in absent]
        return DoctorCheck(
            "models",
            "warn",
            "MinerU model/cache environment variables are set, but their paths were not found.",
            tuple(absent + remainder),
        )
    if seen:
        return DoctorCheck(
            "models",
            "warn",
            "MinerU model source/config is set, but no local model/cache path was detected.",
            tuple(seen),
        )

    checked_env = f"checked env: {', '.join(MODEL_CACHE_ENV_VARS)}"
    checked_config = f"checked config: {user_config}"
    return DoctorCheck(
        "models",
        "warn",
        "No MinerU model/cache/config path was detected; run explicit local MinerU model setup before offline conversion.",
        (checked_env, checked_config),
    )
def _check_mathjax_checker(which: Which, run_command: CommandRunner, path_exists: PathExists) -> DoctorCheck:
    """Check that Node.js and the MathJax helper script can run locally.

    Warns when ``node`` is missing, the helper script does not exist,
    ``node --version`` fails, or the helper's ``--health`` run fails.
    Passes otherwise, reporting the node version when one was printed.
    """
    node_path = which("node")
    helper_path = default_mathjax_helper_path()

    if node_path is None:
        return DoctorCheck(
            "mathjax",
            "warn",
            "Node.js executable was not found; MathJax render checker is unavailable.",
        )

    node_detail = f"node: {node_path}"
    if not path_exists(helper_path):
        return DoctorCheck(
            "mathjax",
            "warn",
            "MathJax helper script was not found.",
            (f"expected: {helper_path}", node_detail),
        )

    helper_detail = f"helper: {helper_path}"

    version_run = run_command((node_path, "--version"))
    if version_run.exit_code != 0:
        return DoctorCheck(
            "mathjax",
            "warn",
            "Node.js was found, but `node --version` failed.",
            (node_detail, f"exit code: {version_run.exit_code}", _trim_detail(version_run.stderr)),
        )

    health_run = run_command((node_path, str(helper_path), "--health"))
    if health_run.exit_code != 0:
        return DoctorCheck(
            "mathjax",
            "warn",
            "Local MathJax render checker is unavailable.",
            (
                node_detail,
                helper_detail,
                f"exit code: {health_run.exit_code}",
                _trim_detail(health_run.stderr),
            ),
        )

    summary = [node_detail, helper_detail]
    banner = _first_non_empty_line(version_run.stdout) or _first_non_empty_line(version_run.stderr)
    if banner is not None:
        summary.append(f"node version: {banner}")
    return DoctorCheck("mathjax", "pass", "Local MathJax render checker is available.", tuple(summary))
def _check_local_only_policy() -> DoctorCheck:
    """Return the always-passing check that states the local-only runtime policy."""
    allowed = "allowed: mineru CLI without --api-url, including its temporary local mineru-api process"
    prohibited = "prohibited: --api-url, remote APIs, router mode, HTTP client backends, remote OpenAI-compatible backends"
    return DoctorCheck(
        name="local-only",
        status="pass",
        message="Runtime conversion is restricted to direct local mineru CLI execution.",
        details=(allowed, prohibited),
    )
def _default_mineru_probe(which: Which, run_command: CommandRunner) -> MinerUAdapter:
    """Build a MinerUAdapter that executes commands through the doctor's runner."""

    def _bridge(command: tuple[str, ...]) -> CommandResult:
        # Copy the doctor's result, field by field, into the adapter's result type.
        raw = run_command(command)
        return CommandResult(
            command=raw.command,
            exit_code=raw.exit_code,
            stdout=raw.stdout,
            stderr=raw.stderr,
        )

    return MinerUAdapter(which=which, runner=_bridge)
def _run_command(command: tuple[str, ...]) -> DoctorCommandResult:
    """Run ``command`` with captured text output and a 20 second timeout.

    Launch failures and timeouts are converted into results instead of raised,
    so one broken tool cannot abort the remaining checks.

    Args:
        command: Argument vector to execute (no shell is involved).

    Returns:
        The command's exit code and captured output. Synthetic exit codes are
        used when the process never completed: 127 if the executable was not
        found, 126 for any other launch error (for example a permission
        error), and 124 on timeout.
    """
    import locale

    def _as_text(captured: object) -> str:
        # On timeout the captured output is bytes even with text=True, and may
        # be None when nothing was read; normalise every shape to str.
        if isinstance(captured, str):
            return captured
        if isinstance(captured, (bytes, bytearray)):
            return bytes(captured).decode(locale.getpreferredencoding(False), errors="replace")
        return ""

    try:
        completed = subprocess.run(
            command,
            check=False,
            capture_output=True,
            text=True,
            timeout=20,
        )
    except FileNotFoundError as error:
        return DoctorCommandResult(command=command, exit_code=127, stderr=str(error))
    except OSError as error:
        # The executable exists but could not be launched.
        return DoctorCommandResult(command=command, exit_code=126, stderr=str(error))
    except subprocess.TimeoutExpired as error:
        partial_stdout = _as_text(error.stdout)
        partial_stderr = _as_text(error.stderr)
        # Keep the timeout marker even when partial stderr is available.
        stderr = f"command timed out: {partial_stderr}" if partial_stderr else "command timed out"
        return DoctorCommandResult(command=command, exit_code=124, stdout=partial_stdout, stderr=stderr)
    return DoctorCommandResult(
        command=command,
        exit_code=completed.returncode,
        stdout=completed.stdout,
        stderr=completed.stderr,
    )
def _first_non_empty_line(value: str) -> str | None:
for line in value.splitlines():
stripped = line.strip()
if stripped:
return stripped
return None
def _has_target_mineru_version(value: str) -> bool:
    """Return True when ``value`` contains TARGET_MINERU_VERSION as a whole version.

    The match is rejected when the target is embedded in a longer dotted
    number: a digit or dot directly before it (``13.1.0``, ``2.3.1.0``), or a
    digit or ``.<digit>`` directly after it (``3.1.01``, ``3.1.0.1``).
    Non-numeric suffixes and a trailing sentence period are still accepted.
    """
    pattern = rf"(?<![\d.]){re.escape(TARGET_MINERU_VERSION)}(?!\d|\.\d)"
    return re.search(pattern, value) is not None
def _trim_detail(value: str) -> str:
stripped = " ".join(value.split())
if not stripped:
return "stderr: <empty>"
return f"stderr: {stripped[:240]}"
def _is_pascal_or_pre_turing(value: str) -> bool:
normalized = value.casefold()
risky_tokens = (
"gtx 10",
"gtx 9",
"gtx 8",
"gtx 7",
"gtx 6",
"gtx 5",
"tesla p",
"quadro p",
"pascal",
)
return any(token in normalized for token in risky_tokens)
def _safe_int_call(function: object) -> int | None:
if not callable(function):
return None
try:
return int(function())
except Exception:
return None
def _expand_path(value: str) -> Path:
return Path(os.path.expandvars(value)).expanduser()