"""Tests for the ``pdf2md_ui.runner`` helpers.

Covers CLI resolution, construction of the doctor/convert/recheck command
lines, the child-process environment, streamed command execution and
process-tree termination.
"""

from __future__ import annotations

import subprocess
from pathlib import Path
from typing import Any

import pytest

from pdf2md_ui.runner import (
    CliResolutionError,
    CommandSpec,
    ResolvedCommand,
    RunningCommand,
    build_child_environment,
    build_convert_command,
    build_doctor_command,
    build_recheck_command,
    default_output_dir,
    resolve_cli_command,
    terminate_process_tree,
)


def test_resolves_pdf2md_from_path_before_uv(tmp_path: Path) -> None:
    """A ``pdf2md`` executable found by ``which`` wins over ``uv``."""
    (tmp_path / "pyproject.toml").write_text("[project]\nname='x'\n", encoding="utf-8")

    resolved = resolve_cli_command(
        project_root=tmp_path,
        which=lambda name: {"pdf2md": "pdf2md.exe", "uv": "uv.exe"}.get(name),
    )

    assert resolved == ResolvedCommand(("pdf2md.exe",), cwd=None, source="path")


def test_resolves_uv_run_with_project_root_when_pdf2md_missing(tmp_path: Path) -> None:
    """Without ``pdf2md`` available, resolution falls back to ``uv run pdf2md``."""
    (tmp_path / "pyproject.toml").write_text("[project]\nname='x'\n", encoding="utf-8")

    resolved = resolve_cli_command(
        project_root=tmp_path,
        which=lambda name: {"uv": "uv.exe"}.get(name),
    )

    assert resolved == ResolvedCommand(
        ("uv.exe", "run", "pdf2md"),
        cwd=tmp_path.resolve(),
        source="uv",
    )


def test_resolution_requires_project_root_for_uv() -> None:
    """Resolving through ``uv`` without a project root raises."""
    with pytest.raises(CliResolutionError):
        resolve_cli_command(which=lambda name: "uv.exe" if name == "uv" else None)


def test_configured_command_must_be_pdf2md() -> None:
    """A configured command that is not ``pdf2md`` is rejected."""
    with pytest.raises(CliResolutionError, match="pdf2md"):
        resolve_cli_command(configured_command="mineru.exe")


def test_builds_doctor_command() -> None:
    """The doctor command appends ``doctor`` and keeps the resolved cwd."""
    resolved = ResolvedCommand(("uv", "run", "pdf2md"), cwd=Path("repo"), source="uv")

    command = build_doctor_command(resolved)

    assert command == CommandSpec(("uv", "run", "pdf2md", "doctor"), cwd=Path("repo"))


def test_builds_convert_command_with_fixed_argument_list(tmp_path: Path) -> None:
    """The convert command emits its options in a fixed, known order."""
    resolved = ResolvedCommand(("pdf2md",), cwd=None, source="path")
    # NOTE(review): these literals look like a mis-encoded non-ASCII file
    # name; confirm the intended text and the encoding of this file.
    input_pdf = tmp_path / "?쇰Ц.pdf"
    output_dir = tmp_path / "outputs" / "?쇰Ц"

    command = build_convert_command(
        resolved,
        input_pdf,
        output_dir,
        overwrite=True,
        keep_raw=True,
        chunk_pages=20,
        gpu="cuda:0",
    )

    assert command.args == (
        "pdf2md",
        "convert",
        str(input_pdf),
        "--out",
        str(output_dir),
        "--overwrite",
        "--keep-raw",
        "--chunk-pages",
        "20",
        "--gpu",
        "cuda:0",
        "--mineru-profile",
        "auto",
    )


def test_builds_recheck_command(tmp_path: Path) -> None:
    """The recheck command is ``pdf2md recheck <markdown>``."""
    resolved = ResolvedCommand(("pdf2md",), cwd=None, source="path")
    markdown = tmp_path / "paper.md"

    command = build_recheck_command(resolved, markdown)

    assert command.args == ("pdf2md", "recheck", str(markdown))


def test_generated_commands_do_not_include_remote_or_api_options(tmp_path: Path) -> None:
    """A default convert command contains no remote/API related tokens."""
    resolved = ResolvedCommand(("pdf2md",), cwd=None, source="path")

    command = build_convert_command(resolved, tmp_path / "paper.pdf", tmp_path / "out")

    joined = " ".join(command.args).casefold()
    for token in ("--api-url", "http://", "https://", "router", "openai", "mineru-api"):
        assert token not in joined


def test_default_output_dir_uses_shared_output_root(tmp_path: Path) -> None:
    """The default output directory is ``<base_dir>/outputs``."""
    # NOTE(review): this literal looks like a mis-encoded non-ASCII file
    # name; confirm the intended text and the encoding of this file.
    pdf = tmp_path / "?섍뎄議곕Ъ.pdf"

    assert default_output_dir(pdf, base_dir=tmp_path) == tmp_path / "outputs"


def test_convert_rejects_non_positive_chunk_pages(tmp_path: Path) -> None:
    """``chunk_pages=0`` raises ``ValueError`` mentioning "positive"."""
    resolved = ResolvedCommand(("pdf2md",), cwd=None, source="path")

    with pytest.raises(ValueError, match="positive"):
        build_convert_command(
            resolved,
            tmp_path / "paper.pdf",
            tmp_path / "out",
            chunk_pages=0,
        )


def test_convert_rejects_prohibited_gpu_tokens(tmp_path: Path) -> None:
    """A URL passed as ``gpu`` raises ``ValueError`` mentioning "strict-local"."""
    resolved = ResolvedCommand(("pdf2md",), cwd=None, source="path")

    with pytest.raises(ValueError, match="strict-local"):
        build_convert_command(
            resolved,
            tmp_path / "paper.pdf",
            tmp_path / "out",
            gpu="https://example.test",
        )


def test_convert_rejects_unknown_mineru_profile(tmp_path: Path) -> None:
    """An unrecognised ``mineru_profile`` raises ``ValueError``."""
    resolved = ResolvedCommand(("pdf2md",), cwd=None, source="path")

    with pytest.raises(ValueError, match="mineru_profile"):
        build_convert_command(
            resolved,
            tmp_path / "paper.pdf",
            tmp_path / "out",
            mineru_profile="fast",
        )


def test_child_environment_defaults_mineru_model_source() -> None:
    """``MINERU_MODEL_SOURCE`` defaults to ``local`` when it is not set."""
    environment = build_child_environment({"PATH": "x"})

    assert environment["MINERU_MODEL_SOURCE"] == "local"


def test_child_environment_preserves_existing_mineru_model_source() -> None:
    """An existing ``MINERU_MODEL_SOURCE`` value is left untouched."""
    environment = build_child_environment({"MINERU_MODEL_SOURCE": "custom"})

    assert environment["MINERU_MODEL_SOURCE"] == "custom"


def test_running_command_uses_shell_false_and_streams_output() -> None:
    """``RunningCommand.run`` spawns without a shell and emits one event per line."""
    # ``Any`` values: the captured kwargs are indexed two levels deep below.
    captured: dict[str, Any] = {}
    events: list[Any] = []

    class FakeProcess:
        """Stand-in for a finished process that produced two lines of output."""

        pid = 123
        stdout = iter(["hello\n", "done\n"])

        def wait(self, timeout=None):
            return 0

        def poll(self):
            return 0

    def fake_popen(*args, **kwargs):
        # Record the exact call so the assertions can inspect it.
        captured["args"] = args
        captured["kwargs"] = kwargs
        return FakeProcess()

    runner = RunningCommand(
        CommandSpec(("pdf2md", "doctor")),
        events.append,
        popen_factory=fake_popen,
        base_env={},
    )

    assert runner.run() == 0
    assert captured["args"] == (("pdf2md", "doctor"),)
    assert captured["kwargs"]["shell"] is False
    assert captured["kwargs"]["stderr"] is subprocess.STDOUT
    assert captured["kwargs"]["env"]["MINERU_MODEL_SOURCE"] == "local"
    assert [(event.kind, event.message, event.exit_code) for event in events] == [
        ("start", "pdf2md doctor", None),
        ("output", "hello", None),
        ("output", "done", None),
        ("exit", "Command exited with code 0.", 0),
    ]


def test_cancel_uses_taskkill_after_windows_grace_timeout() -> None:
    """With ``os_name="nt"``, a process that outlives the grace wait is taskkilled."""
    taskkill_calls: list[Any] = []

    class SlowProcess:
        """Process whose first ``wait`` times out and whose second one returns."""

        pid = 456

        def __init__(self) -> None:
            self.wait_count = 0
            self.terminated = False

        def poll(self):
            return None

        def terminate(self) -> None:
            self.terminated = True

        def wait(self, timeout=None):
            self.wait_count += 1
            if self.wait_count == 1:
                raise subprocess.TimeoutExpired("pdf2md", timeout)
            return 1

    def fake_taskkill(*args, **kwargs):
        taskkill_calls.append((args, kwargs))
        return subprocess.CompletedProcess(args[0], 0)

    process = SlowProcess()

    assert terminate_process_tree(
        process,
        grace_seconds=0,
        taskkill_runner=fake_taskkill,
        os_name="nt",
    )
    assert process.terminated
    assert taskkill_calls[0][0][0] == ["taskkill", "/pid", "456", "/t", "/f"]


def test_cancel_does_not_taskkill_when_process_exits_promptly() -> None:
    """A process that exits within the grace wait is never taskkilled."""
    taskkill_calls: list[Any] = []

    class FastProcess:
        """Process whose ``wait`` returns immediately after ``terminate``."""

        pid = 789

        def poll(self):
            return None

        def terminate(self) -> None:
            pass

        def wait(self, timeout=None):
            return 0

    # Pin os_name like the sibling test above so the result does not depend on
    # the host OS. NOTE(review): presumably taskkill is only reachable when
    # os_name == "nt"; confirm against terminate_process_tree.
    assert terminate_process_tree(
        FastProcess(),
        taskkill_runner=lambda *args, **kwargs: taskkill_calls.append(args),
        os_name="nt",
    )
    assert taskkill_calls == []