236 lines
7.1 KiB
Python
236 lines
7.1 KiB
Python
from __future__ import annotations
|
|
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from pdf2md_ui.runner import (
|
|
CommandSpec,
|
|
ResolvedCommand,
|
|
RunningCommand,
|
|
build_child_environment,
|
|
build_convert_command,
|
|
build_doctor_command,
|
|
build_recheck_command,
|
|
default_output_dir,
|
|
resolve_cli_command,
|
|
terminate_process_tree,
|
|
)
|
|
from pdf2md_ui.runner import CliResolutionError
|
|
|
|
|
|
def test_resolves_pdf2md_from_path_before_uv(tmp_path: Path) -> None:
    """When ``which`` finds both ``pdf2md`` and ``uv``, the ``pdf2md`` hit is used."""
    pyproject = tmp_path / "pyproject.toml"
    pyproject.write_text("[project]\nname='x'\n", encoding="utf-8")
    discoverable = {"pdf2md": "pdf2md.exe", "uv": "uv.exe"}

    outcome = resolve_cli_command(
        project_root=tmp_path,
        which=lambda name: discoverable.get(name),
    )

    # The executable is used as-is, with no working directory attached.
    assert outcome == ResolvedCommand(("pdf2md.exe",), cwd=None, source="path")
|
|
|
|
|
|
def test_resolves_uv_run_with_project_root_when_pdf2md_missing(tmp_path: Path) -> None:
    """With only ``uv`` discoverable, resolution yields ``uv run pdf2md`` rooted at the project."""
    pyproject = tmp_path / "pyproject.toml"
    pyproject.write_text("[project]\nname='x'\n", encoding="utf-8")
    discoverable = {"uv": "uv.exe"}

    outcome = resolve_cli_command(
        project_root=tmp_path,
        which=lambda name: discoverable.get(name),
    )

    # The working directory is expected to be the resolved project root.
    assert outcome == ResolvedCommand(
        ("uv.exe", "run", "pdf2md"),
        cwd=tmp_path.resolve(),
        source="uv",
    )
|
|
|
|
|
|
def test_resolution_requires_project_root_for_uv() -> None:
    """Resolving with only ``uv`` discoverable and no project root raises ``CliResolutionError``."""

    def only_uv(name):
        # Pretend ``uv`` is the sole executable available.
        if name == "uv":
            return "uv.exe"
        return None

    with pytest.raises(CliResolutionError):
        resolve_cli_command(which=only_uv)
|
|
|
|
|
|
def test_configured_command_must_be_pdf2md() -> None:
    """A configured command of ``mineru.exe`` is rejected with an error mentioning ``pdf2md``."""
    rejection = pytest.raises(CliResolutionError, match="pdf2md")
    with rejection:
        resolve_cli_command(configured_command="mineru.exe")
|
|
|
|
|
|
def test_builds_doctor_command() -> None:
    """The doctor command appends ``doctor`` to the resolved prefix and keeps its ``cwd``."""
    repo_dir = Path("repo")
    base = ResolvedCommand(("uv", "run", "pdf2md"), cwd=repo_dir, source="uv")

    spec = build_doctor_command(base)

    assert spec == CommandSpec(("uv", "run", "pdf2md", "doctor"), cwd=Path("repo"))
|
|
|
|
|
|
def test_builds_convert_command_with_fixed_argument_list(tmp_path: Path) -> None:
    """The convert command is emitted as one exact, ordered argument tuple."""
    base = ResolvedCommand(("pdf2md",), cwd=None, source="path")
    # NOTE(review): these names look like mis-decoded non-ASCII text (possibly
    # Korean); kept byte-for-byte here -- confirm the intended file name.
    source_pdf = tmp_path / "?쇰Ц.pdf"
    target_dir = tmp_path / "outputs" / "?쇰Ц"

    spec = build_convert_command(
        base,
        source_pdf,
        target_dir,
        overwrite=True,
        keep_raw=True,
        chunk_pages=20,
        gpu="cuda:0",
    )

    # No profile is passed above, yet ``--mineru-profile auto`` is expected in the output.
    assert spec.args == (
        "pdf2md",
        "convert",
        str(source_pdf),
        "--out",
        str(target_dir),
        "--overwrite",
        "--keep-raw",
        "--chunk-pages",
        "20",
        "--gpu",
        "cuda:0",
        "--mineru-profile",
        "auto",
    )
|
|
|
|
|
|
def test_builds_recheck_command(tmp_path: Path) -> None:
    """The recheck command is ``pdf2md recheck <markdown path>``."""
    md_path = tmp_path / "paper.md"
    base = ResolvedCommand(("pdf2md",), cwd=None, source="path")

    spec = build_recheck_command(base, md_path)

    assert spec.args == ("pdf2md", "recheck", str(md_path))
|
|
|
|
|
|
def test_generated_commands_do_not_include_remote_or_api_options(tmp_path: Path) -> None:
    """A default convert command contains none of the remote/API-related tokens."""
    forbidden = ("--api-url", "http://", "https://", "router", "openai", "mineru-api")
    base = ResolvedCommand(("pdf2md",), cwd=None, source="path")

    spec = build_convert_command(base, tmp_path / "paper.pdf", tmp_path / "out")

    # Compare case-insensitively against the whole command line at once.
    flattened = " ".join(spec.args).casefold()
    leaked = [token for token in forbidden if token in flattened]
    assert not leaked
|
|
|
|
|
|
def test_default_output_dir_uses_shared_output_root(tmp_path: Path) -> None:
    """``default_output_dir`` returns ``<base_dir>/outputs`` for the given PDF."""
    # NOTE(review): this name looks like mis-decoded non-ASCII text (possibly
    # Korean); kept byte-for-byte here -- confirm the intended file name.
    source_pdf = tmp_path / "?섍뎄議곕Ъ.pdf"

    chosen = default_output_dir(source_pdf, base_dir=tmp_path)

    assert chosen == tmp_path / "outputs"
|
|
|
|
|
|
def test_convert_rejects_non_positive_chunk_pages(tmp_path: Path) -> None:
    """``chunk_pages=0`` raises ``ValueError`` with a message matching ``positive``."""
    base = ResolvedCommand(("pdf2md",), cwd=None, source="path")
    source_pdf = tmp_path / "paper.pdf"
    target_dir = tmp_path / "out"

    with pytest.raises(ValueError, match="positive"):
        build_convert_command(base, source_pdf, target_dir, chunk_pages=0)
|
|
|
|
|
|
def test_convert_rejects_prohibited_gpu_tokens(tmp_path: Path) -> None:
    """A URL passed as ``gpu`` raises ``ValueError`` with a message matching ``strict-local``."""
    base = ResolvedCommand(("pdf2md",), cwd=None, source="path")
    source_pdf = tmp_path / "paper.pdf"
    target_dir = tmp_path / "out"

    with pytest.raises(ValueError, match="strict-local"):
        build_convert_command(base, source_pdf, target_dir, gpu="https://example.test")
|
|
|
|
|
|
def test_convert_rejects_unknown_mineru_profile(tmp_path: Path) -> None:
    """``mineru_profile="fast"`` raises ``ValueError`` with a message matching ``mineru_profile``."""
    base = ResolvedCommand(("pdf2md",), cwd=None, source="path")
    source_pdf = tmp_path / "paper.pdf"
    target_dir = tmp_path / "out"

    with pytest.raises(ValueError, match="mineru_profile"):
        build_convert_command(base, source_pdf, target_dir, mineru_profile="fast")
|
|
|
|
|
|
def test_child_environment_defaults_mineru_model_source() -> None:
    """Without ``MINERU_MODEL_SOURCE`` in the base mapping, the child env gets ``local``."""
    parent_env = {"PATH": "x"}

    child_env = build_child_environment(parent_env)

    assert child_env["MINERU_MODEL_SOURCE"] == "local"
|
|
|
|
|
|
def test_child_environment_preserves_existing_mineru_model_source() -> None:
    """A ``MINERU_MODEL_SOURCE`` already present in the base mapping is kept unchanged."""
    parent_env = {"MINERU_MODEL_SOURCE": "custom"}

    child_env = build_child_environment(parent_env)

    assert child_env["MINERU_MODEL_SOURCE"] == "custom"
|
|
|
|
|
|
def test_running_command_uses_shell_false_and_streams_output() -> None:
    """``RunningCommand.run`` launches via the injected factory and reports each output line.

    Checks the arguments handed to the process factory (``shell``, ``stderr``,
    ``env``) and the exact sequence of events delivered to the callback.
    """
    popen_calls = []
    events = []

    class StubProcess:
        # Minimal process stand-in: two output lines, then a clean exit.
        pid = 123
        stdout = iter(["hello\n", "done\n"])

        def wait(self, timeout=None):
            return 0

        def poll(self):
            return 0

    def recording_popen(*args, **kwargs):
        # Remember how the process was launched so it can be inspected below.
        popen_calls.append((args, kwargs))
        return StubProcess()

    spec = CommandSpec(("pdf2md", "doctor"))
    runner = RunningCommand(spec, events.append, popen_factory=recording_popen, base_env={})

    assert runner.run() == 0

    launch_args, launch_kwargs = popen_calls[-1]
    assert launch_args == (("pdf2md", "doctor"),)
    assert launch_kwargs["shell"] is False
    assert launch_kwargs["stderr"] is subprocess.STDOUT
    assert launch_kwargs["env"]["MINERU_MODEL_SOURCE"] == "local"

    observed = [(event.kind, event.message, event.exit_code) for event in events]
    assert observed == [
        ("start", "pdf2md doctor", None),
        ("output", "hello", None),
        ("output", "done", None),
        ("exit", "Command exited with code 0.", 0),
    ]
|
|
|
|
|
|
def test_cancel_uses_taskkill_after_windows_grace_timeout() -> None:
    """With ``os_name="nt"``, a process that times out on its first wait is sent to ``taskkill``."""
    recorded_kills = []

    class StubbornProcess:
        # Stand-in that reports "still running" and times out on the first wait.
        pid = 456

        def __init__(self) -> None:
            self.wait_count = 0
            self.terminated = False

        def poll(self):
            return None

        def terminate(self) -> None:
            self.terminated = True

        def wait(self, timeout=None):
            self.wait_count += 1
            if self.wait_count != 1:
                return 1
            # Only the very first wait times out.
            raise subprocess.TimeoutExpired("pdf2md", timeout)

    def recording_taskkill(*args, **kwargs):
        recorded_kills.append((args, kwargs))
        return subprocess.CompletedProcess(args[0], 0)

    victim = StubbornProcess()

    assert terminate_process_tree(
        victim,
        grace_seconds=0,
        taskkill_runner=recording_taskkill,
        os_name="nt",
    )
    assert victim.terminated
    # First positional argument of the first taskkill call is the command list.
    assert recorded_kills[0][0][0] == ["taskkill", "/pid", "456", "/t", "/f"]
|
|
|
|
|
|
def test_cancel_does_not_taskkill_when_process_exits_promptly() -> None:
    """If the process's ``wait`` returns immediately, the taskkill runner is never invoked."""
    recorded_kills = []

    class PromptProcess:
        # Stand-in whose wait() returns right away with exit code 0.
        pid = 789

        def poll(self):
            return None

        def terminate(self) -> None:
            pass

        def wait(self, timeout=None):
            return 0

    def recording_taskkill(*args, **kwargs):
        # Records positional args only and returns None.
        recorded_kills.append(args)

    assert terminate_process_tree(PromptProcess(), taskkill_runner=recording_taskkill)
    assert recorded_kills == []
|