Files
PDFToMD/tests/test_ui_runner.py
2026-05-14 10:16:59 +09:00

236 lines
7.1 KiB
Python

from __future__ import annotations
import subprocess
from pathlib import Path
import pytest
from pdf2md_ui.runner import (
CommandSpec,
ResolvedCommand,
RunningCommand,
build_child_environment,
build_convert_command,
build_doctor_command,
build_recheck_command,
default_output_dir,
resolve_cli_command,
terminate_process_tree,
)
from pdf2md_ui.runner import CliResolutionError
def test_resolves_pdf2md_from_path_before_uv(tmp_path: Path) -> None:
    """A ``pdf2md`` executable reported by ``which`` is chosen even when ``uv`` is also found."""
    # Both tools are discoverable, so the resolver has to pick between them.
    discoverable = {"pdf2md": "pdf2md.exe", "uv": "uv.exe"}
    manifest = tmp_path / "pyproject.toml"
    manifest.write_text("[project]\nname='x'\n", encoding="utf-8")

    outcome = resolve_cli_command(
        project_root=tmp_path,
        which=lambda name: discoverable.get(name),
    )

    expected = ResolvedCommand(("pdf2md.exe",), cwd=None, source="path")
    assert outcome == expected
def test_resolves_uv_run_with_project_root_when_pdf2md_missing(tmp_path: Path) -> None:
    """With only ``uv`` discoverable, resolution yields ``uv run pdf2md`` rooted at the project."""
    # Only ``uv`` is discoverable here; looking up ``pdf2md`` yields None.
    discoverable = {"uv": "uv.exe"}
    manifest = tmp_path / "pyproject.toml"
    manifest.write_text("[project]\nname='x'\n", encoding="utf-8")

    outcome = resolve_cli_command(
        project_root=tmp_path,
        which=lambda name: discoverable.get(name),
    )

    # The expected working directory is the resolved (absolute) project root.
    expected = ResolvedCommand(
        ("uv.exe", "run", "pdf2md"),
        cwd=tmp_path.resolve(),
        source="uv",
    )
    assert outcome == expected
def test_resolution_requires_project_root_for_uv() -> None:
    """Resolution raises ``CliResolutionError`` when only ``uv`` is found and no project root is given."""

    def only_uv(name):
        # Pretend ``uv`` is the sole executable that can be located.
        if name == "uv":
            return "uv.exe"
        return None

    with pytest.raises(CliResolutionError):
        resolve_cli_command(which=only_uv)
def test_configured_command_must_be_pdf2md() -> None:
    """A configured command of ``mineru.exe`` is rejected with an error that mentions ``pdf2md``."""
    foreign_executable = "mineru.exe"
    with pytest.raises(CliResolutionError, match="pdf2md"):
        resolve_cli_command(configured_command=foreign_executable)
def test_builds_doctor_command() -> None:
    """The doctor command appends ``doctor`` to the resolved argv and keeps its cwd."""
    working_dir = Path("repo")
    launcher = ResolvedCommand(("uv", "run", "pdf2md"), cwd=working_dir, source="uv")

    built = build_doctor_command(launcher)

    expected = CommandSpec(("uv", "run", "pdf2md", "doctor"), cwd=Path("repo"))
    assert built == expected
def test_builds_convert_command_with_fixed_argument_list(tmp_path: Path) -> None:
    """The convert command is emitted as a fixed, ordered argument tuple.

    Every option is passed explicitly except the MinerU profile, so the
    expected tuple also pins the ``--mineru-profile auto`` pair that appears
    when no profile is supplied. The input and output paths use a non-ASCII
    (Korean) name to check that such paths are forwarded via ``str()``
    unchanged.
    """
    # The previous literals were mojibake ("?쇰Ц"): the UTF-8 bytes of the
    # Korean word for "paper" decoded as CP949. The name is restored here as
    # \u escapes so the source stays ASCII-safe if it is re-encoded again.
    korean_stem = "\ub17c\ubb38"
    resolved = ResolvedCommand(("pdf2md",), cwd=None, source="path")
    input_pdf = tmp_path / f"{korean_stem}.pdf"
    output_dir = tmp_path / "outputs" / korean_stem

    command = build_convert_command(
        resolved,
        input_pdf,
        output_dir,
        overwrite=True,
        keep_raw=True,
        chunk_pages=20,
        gpu="cuda:0",
    )

    assert command.args == (
        "pdf2md",
        "convert",
        str(input_pdf),
        "--out",
        str(output_dir),
        "--overwrite",
        "--keep-raw",
        "--chunk-pages",
        "20",
        "--gpu",
        "cuda:0",
        "--mineru-profile",
        "auto",
    )
def test_builds_recheck_command(tmp_path: Path) -> None:
    """The recheck command is ``pdf2md recheck <markdown path>``."""
    markdown_path = tmp_path / "paper.md"
    launcher = ResolvedCommand(("pdf2md",), cwd=None, source="path")

    built = build_recheck_command(launcher, markdown_path)

    expected_args = ("pdf2md", "recheck", str(markdown_path))
    assert built.args == expected_args
def test_generated_commands_do_not_include_remote_or_api_options(tmp_path: Path) -> None:
    """A default convert command contains none of the remote/API-related tokens."""
    forbidden_tokens = ("--api-url", "http://", "https://", "router", "openai", "mineru-api")
    launcher = ResolvedCommand(("pdf2md",), cwd=None, source="path")

    built = build_convert_command(launcher, tmp_path / "paper.pdf", tmp_path / "out")

    # Compare case-insensitively against the whole space-joined argument list.
    rendered = " ".join(built.args).casefold()
    leaked = [token for token in forbidden_tokens if token in rendered]
    assert leaked == []
def test_default_output_dir_uses_shared_output_root(tmp_path: Path) -> None:
    """The default output directory is ``<base_dir>/outputs`` for the given PDF."""
    # NOTE(review): this file name looks like mis-decoded non-ASCII text
    # (probably a Korean name) — confirm the intended literal.
    source_pdf = tmp_path / "?섍뎄議곕Ъ.pdf"

    chosen = default_output_dir(source_pdf, base_dir=tmp_path)

    assert chosen == tmp_path / "outputs"
def test_convert_rejects_non_positive_chunk_pages(tmp_path: Path) -> None:
    """``chunk_pages=0`` raises ``ValueError`` whose message mentions ``positive``."""
    launcher = ResolvedCommand(("pdf2md",), cwd=None, source="path")
    source_pdf = tmp_path / "paper.pdf"
    target_dir = tmp_path / "out"

    with pytest.raises(ValueError, match="positive"):
        build_convert_command(launcher, source_pdf, target_dir, chunk_pages=0)
def test_convert_rejects_prohibited_gpu_tokens(tmp_path: Path) -> None:
    """A URL passed as ``gpu`` raises ``ValueError`` whose message mentions ``strict-local``."""
    launcher = ResolvedCommand(("pdf2md",), cwd=None, source="path")
    source_pdf = tmp_path / "paper.pdf"
    target_dir = tmp_path / "out"
    url_like_gpu = "https://example.test"

    with pytest.raises(ValueError, match="strict-local"):
        build_convert_command(launcher, source_pdf, target_dir, gpu=url_like_gpu)
def test_convert_rejects_unknown_mineru_profile(tmp_path: Path) -> None:
    """The profile ``"fast"`` raises ``ValueError`` whose message mentions ``mineru_profile``."""
    launcher = ResolvedCommand(("pdf2md",), cwd=None, source="path")
    source_pdf = tmp_path / "paper.pdf"
    target_dir = tmp_path / "out"

    with pytest.raises(ValueError, match="mineru_profile"):
        build_convert_command(launcher, source_pdf, target_dir, mineru_profile="fast")
def test_child_environment_defaults_mineru_model_source() -> None:
    """``MINERU_MODEL_SOURCE`` is ``"local"`` when the base environment does not set it."""
    base = {"PATH": "x"}
    child_env = build_child_environment(base)
    assert child_env["MINERU_MODEL_SOURCE"] == "local"
def test_child_environment_preserves_existing_mineru_model_source() -> None:
    """A ``MINERU_MODEL_SOURCE`` already present in the base environment is kept."""
    base = {"MINERU_MODEL_SOURCE": "custom"}
    child_env = build_child_environment(base)
    assert child_env["MINERU_MODEL_SOURCE"] == "custom"
def test_running_command_uses_shell_false_and_streams_output() -> None:
    """``RunningCommand.run`` spawns without a shell and reports output as events.

    A stub process factory records how it was invoked; the test then checks
    the spawn arguments (argv tuple, ``shell=False``, stderr merged into
    stdout, ``MINERU_MODEL_SOURCE`` in the environment) and the exact
    start/output/exit event sequence.
    """
    popen_calls = []
    seen_events = []

    class StubProcess:
        pid = 123
        # Two lines of fake child output, newline-terminated.
        stdout = iter(["hello\n", "done\n"])

        def wait(self, timeout=None):
            return 0

        def poll(self):
            return 0

    def recording_popen(*args, **kwargs):
        popen_calls.append((args, kwargs))
        return StubProcess()

    spec = CommandSpec(("pdf2md", "doctor"))
    runner = RunningCommand(spec, seen_events.append, popen_factory=recording_popen, base_env={})
    exit_code = runner.run()
    assert exit_code == 0

    # Inspect the most recent factory invocation.
    positional, keywords = popen_calls[-1]
    assert positional == (("pdf2md", "doctor"),)
    assert keywords["shell"] is False
    assert keywords["stderr"] is subprocess.STDOUT
    assert keywords["env"]["MINERU_MODEL_SOURCE"] == "local"

    summary = [(event.kind, event.message, event.exit_code) for event in seen_events]
    assert summary == [
        ("start", "pdf2md doctor", None),
        ("output", "hello", None),
        ("output", "done", None),
        ("exit", "Command exited with code 0.", 0),
    ]
def test_cancel_uses_taskkill_after_windows_grace_timeout() -> None:
    """With ``os_name="nt"``, a process that outlives the grace wait is killed via ``taskkill``.

    The stub process times out on its first ``wait`` and returns on the next
    one; the test checks that ``terminate`` was called and that the taskkill
    runner received the expected ``/pid 456 /t /f`` argument list.
    """
    recorded_taskkills = []

    class SlowProcess:
        pid = 456

        def __init__(self) -> None:
            self.waits = 0
            self.terminated = False

        def poll(self):
            return None

        def terminate(self) -> None:
            self.terminated = True

        def wait(self, timeout=None):
            self.waits += 1
            if self.waits > 1:
                return 1
            # Only the first wait times out.
            raise subprocess.TimeoutExpired("pdf2md", timeout)

    def recording_taskkill(*args, **kwargs):
        recorded_taskkills.append((args, kwargs))
        return subprocess.CompletedProcess(args[0], 0)

    process = SlowProcess()
    stopped = terminate_process_tree(
        process,
        grace_seconds=0,
        taskkill_runner=recording_taskkill,
        os_name="nt",
    )

    assert stopped
    assert process.terminated
    first_args, _first_kwargs = recorded_taskkills[0]
    assert first_args[0] == ["taskkill", "/pid", "456", "/t", "/f"]
def test_cancel_does_not_taskkill_when_process_exits_promptly() -> None:
    """A process whose ``wait`` returns immediately is never passed to ``taskkill``.

    ``os_name="nt"`` is pinned, as in the slow-process cancellation test, so
    that the Windows branch is the one exercised on every host. Without it
    the "no taskkill" assertion could pass trivially on a non-Windows
    machine (presumably the default follows the host OS — confirm against
    ``terminate_process_tree``).
    """
    taskkill_calls = []

    class FastProcess:
        pid = 789

        def poll(self):
            return None

        def terminate(self) -> None:
            pass

        def wait(self, timeout=None):
            return 0

    def record_taskkill(*args, **kwargs):
        # Any invocation at all is a failure for this test; just record it.
        taskkill_calls.append(args)

    assert terminate_process_tree(
        FastProcess(),
        taskkill_runner=record_taskkill,
        os_name="nt",
    )
    assert taskkill_calls == []