modify documents
This commit is contained in:
@@ -0,0 +1,426 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Codex Harness Step Executor — phase 내 step을 순차 실행하고 자가 교정한다.
|
||||
|
||||
Usage:
|
||||
python3 scripts/execute.py <phase-dir> [--push]
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import contextlib
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import types
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
ROOT = Path(__file__).resolve().parent.parent
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def progress_indicator(label: str):
|
||||
"""터미널 진행 표시기. with 문으로 사용하며 .elapsed 로 경과 시간을 읽는다."""
|
||||
frames = "◐◓◑◒"
|
||||
stop = threading.Event()
|
||||
t0 = time.monotonic()
|
||||
|
||||
def _animate():
|
||||
idx = 0
|
||||
while not stop.wait(0.12):
|
||||
sec = int(time.monotonic() - t0)
|
||||
sys.stderr.write(f"\r{frames[idx % len(frames)]} {label} [{sec}s]")
|
||||
sys.stderr.flush()
|
||||
idx += 1
|
||||
sys.stderr.write("\r" + " " * (len(label) + 20) + "\r")
|
||||
sys.stderr.flush()
|
||||
|
||||
th = threading.Thread(target=_animate, daemon=True)
|
||||
th.start()
|
||||
info = types.SimpleNamespace(elapsed=0.0)
|
||||
try:
|
||||
yield info
|
||||
finally:
|
||||
stop.set()
|
||||
th.join()
|
||||
info.elapsed = time.monotonic() - t0
|
||||
|
||||
|
||||
class StepExecutor:
|
||||
"""Phase 디렉토리 안의 step들을 Codex로 순차 실행하는 하네스."""
|
||||
|
||||
MAX_RETRIES = 3
|
||||
FEAT_MSG = "feat({phase}): step {num} — {name}"
|
||||
CHORE_MSG = "chore({phase}): step {num} output"
|
||||
TZ = timezone(timedelta(hours=9))
|
||||
|
||||
def __init__(self, phase_dir_name: str, *, auto_push: bool = False):
|
||||
self._root = str(ROOT)
|
||||
self._phases_dir = ROOT / "phases"
|
||||
self._phase_dir = self._phases_dir / phase_dir_name
|
||||
self._phase_dir_name = phase_dir_name
|
||||
self._top_index_file = self._phases_dir / "index.json"
|
||||
self._auto_push = auto_push
|
||||
|
||||
if not self._phase_dir.is_dir():
|
||||
print(f"ERROR: {self._phase_dir} not found")
|
||||
sys.exit(1)
|
||||
|
||||
self._index_file = self._phase_dir / "index.json"
|
||||
if not self._index_file.exists():
|
||||
print(f"ERROR: {self._index_file} not found")
|
||||
sys.exit(1)
|
||||
|
||||
idx = self._read_json(self._index_file)
|
||||
self._project = idx.get("project", "project")
|
||||
self._phase_name = idx.get("phase", phase_dir_name)
|
||||
self._total = len(idx["steps"])
|
||||
|
||||
def run(self):
|
||||
self._print_header()
|
||||
self._check_blockers()
|
||||
self._checkout_branch()
|
||||
guardrails = self._load_guardrails()
|
||||
self._ensure_created_at()
|
||||
self._execute_all_steps(guardrails)
|
||||
self._finalize()
|
||||
|
||||
# --- timestamps ---
|
||||
|
||||
def _stamp(self) -> str:
|
||||
return datetime.now(self.TZ).strftime("%Y-%m-%dT%H:%M:%S%z")
|
||||
|
||||
# --- JSON I/O ---
|
||||
|
||||
@staticmethod
|
||||
def _read_json(p: Path) -> dict:
|
||||
return json.loads(p.read_text(encoding="utf-8"))
|
||||
|
||||
@staticmethod
|
||||
def _write_json(p: Path, data: dict):
|
||||
p.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")
|
||||
|
||||
# --- git ---
|
||||
|
||||
def _run_git(self, *args) -> subprocess.CompletedProcess:
|
||||
cmd = ["git"] + list(args)
|
||||
return subprocess.run(cmd, cwd=self._root, capture_output=True, text=True)
|
||||
|
||||
def _checkout_branch(self):
|
||||
branch = f"feat-{self._phase_name}"
|
||||
|
||||
r = self._run_git("rev-parse", "--abbrev-ref", "HEAD")
|
||||
if r.returncode != 0:
|
||||
print(f" ERROR: git을 사용할 수 없거나 git repo가 아닙니다.")
|
||||
print(f" {r.stderr.strip()}")
|
||||
sys.exit(1)
|
||||
|
||||
if r.stdout.strip() == branch:
|
||||
return
|
||||
|
||||
r = self._run_git("rev-parse", "--verify", branch)
|
||||
r = self._run_git("checkout", branch) if r.returncode == 0 else self._run_git("checkout", "-b", branch)
|
||||
|
||||
if r.returncode != 0:
|
||||
print(f" ERROR: 브랜치 '{branch}' checkout 실패.")
|
||||
print(f" {r.stderr.strip()}")
|
||||
print(f" Hint: 변경사항을 stash하거나 commit한 후 다시 시도하세요.")
|
||||
sys.exit(1)
|
||||
|
||||
print(f" Branch: {branch}")
|
||||
|
||||
def _commit_step(self, step_num: int, step_name: str):
|
||||
output_rel = f"phases/{self._phase_dir_name}/step{step_num}-output.json"
|
||||
index_rel = f"phases/{self._phase_dir_name}/index.json"
|
||||
|
||||
self._run_git("add", "-A")
|
||||
self._run_git("reset", "HEAD", "--", output_rel)
|
||||
self._run_git("reset", "HEAD", "--", index_rel)
|
||||
|
||||
if self._run_git("diff", "--cached", "--quiet").returncode != 0:
|
||||
msg = self.FEAT_MSG.format(phase=self._phase_name, num=step_num, name=step_name)
|
||||
r = self._run_git("commit", "-m", msg)
|
||||
if r.returncode == 0:
|
||||
print(f" Commit: {msg}")
|
||||
else:
|
||||
print(f" WARN: 코드 커밋 실패: {r.stderr.strip()}")
|
||||
|
||||
self._run_git("add", "-A")
|
||||
if self._run_git("diff", "--cached", "--quiet").returncode != 0:
|
||||
msg = self.CHORE_MSG.format(phase=self._phase_name, num=step_num)
|
||||
r = self._run_git("commit", "-m", msg)
|
||||
if r.returncode != 0:
|
||||
print(f" WARN: housekeeping 커밋 실패: {r.stderr.strip()}")
|
||||
|
||||
# --- top-level index ---
|
||||
|
||||
def _update_top_index(self, status: str):
|
||||
if not self._top_index_file.exists():
|
||||
return
|
||||
top = self._read_json(self._top_index_file)
|
||||
ts = self._stamp()
|
||||
for phase in top.get("phases", []):
|
||||
if phase.get("dir") == self._phase_dir_name:
|
||||
phase["status"] = status
|
||||
ts_key = {"completed": "completed_at", "error": "failed_at", "blocked": "blocked_at"}.get(status)
|
||||
if ts_key:
|
||||
phase[ts_key] = ts
|
||||
break
|
||||
self._write_json(self._top_index_file, top)
|
||||
|
||||
# --- guardrails & context ---
|
||||
|
||||
def _load_guardrails(self) -> str:
|
||||
sections = []
|
||||
agents_md = ROOT / "AGENTS.md"
|
||||
if agents_md.exists():
|
||||
sections.append(f"## 프로젝트 규칙 (AGENTS.md)\n\n{agents_md.read_text(encoding='utf-8')}")
|
||||
docs_dir = ROOT / "docs"
|
||||
if docs_dir.is_dir():
|
||||
for doc in sorted(docs_dir.glob("*.md")):
|
||||
sections.append(f"## {doc.stem}\n\n{doc.read_text(encoding='utf-8')}")
|
||||
return "\n\n---\n\n".join(sections) if sections else ""
|
||||
|
||||
@staticmethod
|
||||
def _build_step_context(index: dict) -> str:
|
||||
lines = [
|
||||
f"- Step {s['step']} ({s['name']}): {s['summary']}"
|
||||
for s in index["steps"]
|
||||
if s["status"] == "completed" and s.get("summary")
|
||||
]
|
||||
if not lines:
|
||||
return ""
|
||||
return "## 이전 Step 산출물\n\n" + "\n".join(lines) + "\n\n"
|
||||
|
||||
def _build_preamble(self, guardrails: str, step_context: str,
|
||||
prev_error: Optional[str] = None) -> str:
|
||||
retry_section = ""
|
||||
if prev_error:
|
||||
retry_section = (
|
||||
f"\n## ⚠ 이전 시도 실패 — 아래 에러를 반드시 참고하여 수정하라\n\n"
|
||||
f"{prev_error}\n\n---\n\n"
|
||||
)
|
||||
return (
|
||||
f"당신은 {self._project} 프로젝트의 Codex 문서 작성 에이전트입니다. 아래 step을 수행하세요.\n\n"
|
||||
f"{guardrails}\n\n---\n\n"
|
||||
f"{step_context}{retry_section}"
|
||||
f"## 작업 규칙\n\n"
|
||||
f"1. 이전 step에서 작성된 문서와 메모를 확인하고 일관성을 유지하라.\n"
|
||||
f"2. 이 step에 명시된 작업만 수행하라. 추가 산출물이나 임의 요구사항을 만들지 마라.\n"
|
||||
f"3. 기존 문서 구조와 피드백 기록을 깨뜨리지 마라.\n"
|
||||
f"4. AC(Acceptance Criteria) 검증을 직접 실행하라.\n"
|
||||
f"5. /phases/{self._phase_dir_name}/index.json의 해당 step status를 업데이트하라:\n"
|
||||
f" - AC 통과 → \"completed\" + \"summary\" 필드에 이 step의 산출물을 한 줄로 요약\n"
|
||||
f" - {self.MAX_RETRIES}회 수정 시도 후에도 실패 → \"error\" + \"error_message\" 기록\n"
|
||||
f" - 사용자 개입이 필요한 경우 (API 키, 인증, 수동 설정 등) → \"blocked\" + \"blocked_reason\" 기록 후 즉시 중단\n"
|
||||
f"6. 직접 git commit하지 마라. commit은 scripts/execute.py가 step 완료 후 수행한다.\n"
|
||||
f"7. 병렬 조사나 독립 리뷰가 필요하고 step에서 허용했다면 .codex/agents의 custom agent 역할을 활용하라.\n\n---\n\n"
|
||||
)
|
||||
|
||||
# --- Codex 호출 ---
|
||||
|
||||
def _invoke_codex(self, step: dict, preamble: str) -> dict:
|
||||
step_num, step_name = step["step"], step["name"]
|
||||
step_file = self._phase_dir / f"step{step_num}.md"
|
||||
|
||||
if not step_file.exists():
|
||||
print(f" ERROR: {step_file} not found")
|
||||
sys.exit(1)
|
||||
|
||||
prompt = preamble + step_file.read_text(encoding="utf-8")
|
||||
cmd = ["codex", "exec", "--skip-git-repo-check", "--full-auto", "--json", "-"]
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
cwd=self._root,
|
||||
input=prompt,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
encoding="utf-8",
|
||||
errors="replace",
|
||||
timeout=1800,
|
||||
)
|
||||
except FileNotFoundError:
|
||||
print("\n ERROR: Codex CLI를 찾을 수 없습니다. `codex --version`이 실행되는지 확인하세요.")
|
||||
sys.exit(1)
|
||||
|
||||
if result.returncode != 0:
|
||||
print(f"\n WARN: Codex가 비정상 종료됨 (code {result.returncode})")
|
||||
if result.stderr:
|
||||
print(f" stderr: {result.stderr[:500]}")
|
||||
|
||||
output = {
|
||||
"step": step_num, "name": step_name,
|
||||
"exitCode": result.returncode,
|
||||
"stdout": result.stdout, "stderr": result.stderr,
|
||||
}
|
||||
out_path = self._phase_dir / f"step{step_num}-output.json"
|
||||
with open(out_path, "w", encoding="utf-8") as f:
|
||||
json.dump(output, f, indent=2, ensure_ascii=False)
|
||||
|
||||
return output
|
||||
|
||||
# --- 헤더 & 검증 ---
|
||||
|
||||
def _print_header(self):
|
||||
print(f"\n{'='*60}")
|
||||
print(f" Codex Harness Step Executor")
|
||||
print(f" Phase: {self._phase_name} | Steps: {self._total}")
|
||||
if self._auto_push:
|
||||
print(f" Auto-push: enabled")
|
||||
print(f"{'='*60}")
|
||||
|
||||
def _check_blockers(self):
|
||||
index = self._read_json(self._index_file)
|
||||
for s in reversed(index["steps"]):
|
||||
if s["status"] == "error":
|
||||
print(f"\n ✗ Step {s['step']} ({s['name']}) failed.")
|
||||
print(f" Error: {s.get('error_message', 'unknown')}")
|
||||
print(f" Fix and reset status to 'pending' to retry.")
|
||||
sys.exit(1)
|
||||
if s["status"] == "blocked":
|
||||
print(f"\n ⏸ Step {s['step']} ({s['name']}) blocked.")
|
||||
print(f" Reason: {s.get('blocked_reason', 'unknown')}")
|
||||
print(f" Resolve and reset status to 'pending' to retry.")
|
||||
sys.exit(2)
|
||||
if s["status"] != "pending":
|
||||
break
|
||||
|
||||
def _ensure_created_at(self):
|
||||
index = self._read_json(self._index_file)
|
||||
if "created_at" not in index:
|
||||
index["created_at"] = self._stamp()
|
||||
self._write_json(self._index_file, index)
|
||||
|
||||
# --- 실행 루프 ---
|
||||
|
||||
def _execute_single_step(self, step: dict, guardrails: str) -> bool:
|
||||
"""단일 step 실행 (재시도 포함). 완료되면 True, 실패/차단이면 False."""
|
||||
step_num, step_name = step["step"], step["name"]
|
||||
done = sum(1 for s in self._read_json(self._index_file)["steps"] if s["status"] == "completed")
|
||||
prev_error = None
|
||||
|
||||
for attempt in range(1, self.MAX_RETRIES + 1):
|
||||
index = self._read_json(self._index_file)
|
||||
step_context = self._build_step_context(index)
|
||||
preamble = self._build_preamble(guardrails, step_context, prev_error)
|
||||
|
||||
tag = f"Step {step_num}/{self._total - 1} ({done} done): {step_name}"
|
||||
if attempt > 1:
|
||||
tag += f" [retry {attempt}/{self.MAX_RETRIES}]"
|
||||
|
||||
with progress_indicator(tag) as pi:
|
||||
self._invoke_codex(step, preamble)
|
||||
elapsed = int(pi.elapsed)
|
||||
|
||||
index = self._read_json(self._index_file)
|
||||
status = next((s.get("status", "pending") for s in index["steps"] if s["step"] == step_num), "pending")
|
||||
ts = self._stamp()
|
||||
|
||||
if status == "completed":
|
||||
for s in index["steps"]:
|
||||
if s["step"] == step_num:
|
||||
s["completed_at"] = ts
|
||||
self._write_json(self._index_file, index)
|
||||
self._commit_step(step_num, step_name)
|
||||
print(f" ✓ Step {step_num}: {step_name} [{elapsed}s]")
|
||||
return True
|
||||
|
||||
if status == "blocked":
|
||||
for s in index["steps"]:
|
||||
if s["step"] == step_num:
|
||||
s["blocked_at"] = ts
|
||||
self._write_json(self._index_file, index)
|
||||
reason = next((s.get("blocked_reason", "") for s in index["steps"] if s["step"] == step_num), "")
|
||||
print(f" ⏸ Step {step_num}: {step_name} blocked [{elapsed}s]")
|
||||
print(f" Reason: {reason}")
|
||||
self._update_top_index("blocked")
|
||||
sys.exit(2)
|
||||
|
||||
err_msg = next(
|
||||
(s.get("error_message", "Step did not update status") for s in index["steps"] if s["step"] == step_num),
|
||||
"Step did not update status",
|
||||
)
|
||||
|
||||
if attempt < self.MAX_RETRIES:
|
||||
for s in index["steps"]:
|
||||
if s["step"] == step_num:
|
||||
s["status"] = "pending"
|
||||
s.pop("error_message", None)
|
||||
self._write_json(self._index_file, index)
|
||||
prev_error = err_msg
|
||||
print(f" ↻ Step {step_num}: retry {attempt}/{self.MAX_RETRIES} — {err_msg}")
|
||||
else:
|
||||
for s in index["steps"]:
|
||||
if s["step"] == step_num:
|
||||
s["status"] = "error"
|
||||
s["error_message"] = f"[{self.MAX_RETRIES}회 시도 후 실패] {err_msg}"
|
||||
s["failed_at"] = ts
|
||||
self._write_json(self._index_file, index)
|
||||
self._commit_step(step_num, step_name)
|
||||
print(f" ✗ Step {step_num}: {step_name} failed after {self.MAX_RETRIES} attempts [{elapsed}s]")
|
||||
print(f" Error: {err_msg}")
|
||||
self._update_top_index("error")
|
||||
sys.exit(1)
|
||||
|
||||
return False # unreachable
|
||||
|
||||
def _execute_all_steps(self, guardrails: str):
|
||||
while True:
|
||||
index = self._read_json(self._index_file)
|
||||
pending = next((s for s in index["steps"] if s["status"] == "pending"), None)
|
||||
if pending is None:
|
||||
print("\n All steps completed!")
|
||||
return
|
||||
|
||||
step_num = pending["step"]
|
||||
for s in index["steps"]:
|
||||
if s["step"] == step_num and "started_at" not in s:
|
||||
s["started_at"] = self._stamp()
|
||||
self._write_json(self._index_file, index)
|
||||
break
|
||||
|
||||
self._execute_single_step(pending, guardrails)
|
||||
|
||||
def _finalize(self):
|
||||
index = self._read_json(self._index_file)
|
||||
index["completed_at"] = self._stamp()
|
||||
self._write_json(self._index_file, index)
|
||||
self._update_top_index("completed")
|
||||
|
||||
self._run_git("add", "-A")
|
||||
if self._run_git("diff", "--cached", "--quiet").returncode != 0:
|
||||
msg = f"chore({self._phase_name}): mark phase completed"
|
||||
r = self._run_git("commit", "-m", msg)
|
||||
if r.returncode == 0:
|
||||
print(f" ✓ {msg}")
|
||||
|
||||
if self._auto_push:
|
||||
branch = f"feat-{self._phase_name}"
|
||||
r = self._run_git("push", "-u", "origin", branch)
|
||||
if r.returncode != 0:
|
||||
print(f"\n ERROR: git push 실패: {r.stderr.strip()}")
|
||||
sys.exit(1)
|
||||
print(f" ✓ Pushed to origin/{branch}")
|
||||
|
||||
print(f"\n{'='*60}")
|
||||
print(f" Phase '{self._phase_name}' completed!")
|
||||
print(f"{'='*60}")
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Codex Harness Step Executor")
|
||||
parser.add_argument("phase_dir", help="Phase directory name (e.g. 0-mvp)")
|
||||
parser.add_argument("--push", action="store_true", help="Push branch after completion")
|
||||
args = parser.parse_args()
|
||||
|
||||
StepExecutor(args.phase_dir, auto_push=args.push).run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,560 @@
|
||||
"""
|
||||
execute.py 리팩터링 안전망 테스트.
|
||||
리팩터링 전후 동작이 동일한지 검증한다.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import textwrap
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
import execute as ex
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.fixture
|
||||
def tmp_project(tmp_path):
|
||||
"""phases/, AGENTS.md, docs/ 를 갖춘 임시 프로젝트 구조."""
|
||||
phases_dir = tmp_path / "phases"
|
||||
phases_dir.mkdir()
|
||||
|
||||
agents_md = tmp_path / "AGENTS.md"
|
||||
agents_md.write_text("# Rules\n- rule one\n- rule two", encoding="utf-8")
|
||||
|
||||
docs_dir = tmp_path / "docs"
|
||||
docs_dir.mkdir()
|
||||
(docs_dir / "arch.md").write_text("# Architecture\nSome content", encoding="utf-8")
|
||||
(docs_dir / "guide.md").write_text("# Guide\nAnother doc", encoding="utf-8")
|
||||
|
||||
return tmp_path
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def phase_dir(tmp_project):
|
||||
"""step 3개를 가진 phase 디렉토리."""
|
||||
d = tmp_project / "phases" / "0-mvp"
|
||||
d.mkdir()
|
||||
|
||||
index = {
|
||||
"project": "TestProject",
|
||||
"phase": "mvp",
|
||||
"steps": [
|
||||
{"step": 0, "name": "setup", "status": "completed", "summary": "프로젝트 초기화 완료"},
|
||||
{"step": 1, "name": "core", "status": "completed", "summary": "핵심 로직 구현"},
|
||||
{"step": 2, "name": "ui", "status": "pending"},
|
||||
],
|
||||
}
|
||||
(d / "index.json").write_text(json.dumps(index, indent=2, ensure_ascii=False), encoding="utf-8")
|
||||
(d / "step2.md").write_text("# Step 2: UI\n\nUI를 구현하세요.", encoding="utf-8")
|
||||
|
||||
return d
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def top_index(tmp_project):
|
||||
"""phases/index.json (top-level)."""
|
||||
top = {
|
||||
"phases": [
|
||||
{"dir": "0-mvp", "status": "pending"},
|
||||
{"dir": "1-polish", "status": "pending"},
|
||||
]
|
||||
}
|
||||
p = tmp_project / "phases" / "index.json"
|
||||
p.write_text(json.dumps(top, indent=2), encoding="utf-8")
|
||||
return p
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def executor(tmp_project, phase_dir):
|
||||
"""테스트용 StepExecutor 인스턴스. git 호출은 별도 mock 필요."""
|
||||
with patch.object(ex, "ROOT", tmp_project):
|
||||
inst = ex.StepExecutor("0-mvp")
|
||||
# 내부 경로를 tmp_project 기준으로 재설정
|
||||
inst._root = str(tmp_project)
|
||||
inst._phases_dir = tmp_project / "phases"
|
||||
inst._phase_dir = phase_dir
|
||||
inst._phase_dir_name = "0-mvp"
|
||||
inst._index_file = phase_dir / "index.json"
|
||||
inst._top_index_file = tmp_project / "phases" / "index.json"
|
||||
return inst
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _stamp (= 이전 now_iso)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestStamp:
|
||||
def test_returns_kst_timestamp(self, executor):
|
||||
result = executor._stamp()
|
||||
assert "+0900" in result
|
||||
|
||||
def test_format_is_iso(self, executor):
|
||||
result = executor._stamp()
|
||||
dt = datetime.strptime(result, "%Y-%m-%dT%H:%M:%S%z")
|
||||
assert dt.tzinfo is not None
|
||||
|
||||
def test_is_current_time(self, executor):
|
||||
before = datetime.now(ex.StepExecutor.TZ).replace(microsecond=0)
|
||||
result = executor._stamp()
|
||||
after = datetime.now(ex.StepExecutor.TZ).replace(microsecond=0) + timedelta(seconds=1)
|
||||
parsed = datetime.strptime(result, "%Y-%m-%dT%H:%M:%S%z")
|
||||
assert before <= parsed <= after
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _read_json / _write_json
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestJsonHelpers:
|
||||
def test_roundtrip(self, tmp_path):
|
||||
data = {"key": "값", "nested": [1, 2, 3]}
|
||||
p = tmp_path / "test.json"
|
||||
ex.StepExecutor._write_json(p, data)
|
||||
loaded = ex.StepExecutor._read_json(p)
|
||||
assert loaded == data
|
||||
|
||||
def test_save_ensures_ascii_false(self, tmp_path):
|
||||
p = tmp_path / "test.json"
|
||||
ex.StepExecutor._write_json(p, {"한글": "테스트"})
|
||||
raw = p.read_text(encoding="utf-8")
|
||||
assert "한글" in raw
|
||||
assert "\\u" not in raw
|
||||
|
||||
def test_save_indented(self, tmp_path):
|
||||
p = tmp_path / "test.json"
|
||||
ex.StepExecutor._write_json(p, {"a": 1})
|
||||
raw = p.read_text(encoding="utf-8")
|
||||
assert "\n" in raw
|
||||
|
||||
def test_load_nonexistent_raises(self, tmp_path):
|
||||
with pytest.raises(FileNotFoundError):
|
||||
ex.StepExecutor._read_json(tmp_path / "nope.json")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _load_guardrails
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestLoadGuardrails:
|
||||
def test_loads_agents_md_and_docs(self, executor, tmp_project):
|
||||
with patch.object(ex, "ROOT", tmp_project):
|
||||
result = executor._load_guardrails()
|
||||
assert "# Rules" in result
|
||||
assert "rule one" in result
|
||||
assert "# Architecture" in result
|
||||
assert "# Guide" in result
|
||||
|
||||
def test_sections_separated_by_divider(self, executor, tmp_project):
|
||||
with patch.object(ex, "ROOT", tmp_project):
|
||||
result = executor._load_guardrails()
|
||||
assert "---" in result
|
||||
|
||||
def test_docs_sorted_alphabetically(self, executor, tmp_project):
|
||||
with patch.object(ex, "ROOT", tmp_project):
|
||||
result = executor._load_guardrails()
|
||||
arch_pos = result.index("arch")
|
||||
guide_pos = result.index("guide")
|
||||
assert arch_pos < guide_pos
|
||||
|
||||
def test_no_agents_md(self, executor, tmp_project):
|
||||
(tmp_project / "AGENTS.md").unlink()
|
||||
with patch.object(ex, "ROOT", tmp_project):
|
||||
result = executor._load_guardrails()
|
||||
assert "AGENTS.md" not in result
|
||||
assert "Architecture" in result
|
||||
|
||||
def test_no_docs_dir(self, executor, tmp_project):
|
||||
import shutil
|
||||
shutil.rmtree(tmp_project / "docs")
|
||||
with patch.object(ex, "ROOT", tmp_project):
|
||||
result = executor._load_guardrails()
|
||||
assert "Rules" in result
|
||||
assert "Architecture" not in result
|
||||
|
||||
def test_empty_project(self, tmp_path):
|
||||
with patch.object(ex, "ROOT", tmp_path):
|
||||
# executor가 필요 없는 static-like 동작이므로 임시 인스턴스
|
||||
phases_dir = tmp_path / "phases" / "dummy"
|
||||
phases_dir.mkdir(parents=True)
|
||||
idx = {"project": "T", "phase": "t", "steps": []}
|
||||
(phases_dir / "index.json").write_text(json.dumps(idx), encoding="utf-8")
|
||||
inst = ex.StepExecutor.__new__(ex.StepExecutor)
|
||||
result = inst._load_guardrails()
|
||||
assert result == ""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _build_step_context
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestBuildStepContext:
|
||||
def test_includes_completed_with_summary(self, phase_dir):
|
||||
index = json.loads((phase_dir / "index.json").read_text(encoding="utf-8"))
|
||||
result = ex.StepExecutor._build_step_context(index)
|
||||
assert "Step 0 (setup): 프로젝트 초기화 완료" in result
|
||||
assert "Step 1 (core): 핵심 로직 구현" in result
|
||||
|
||||
def test_excludes_pending(self, phase_dir):
|
||||
index = json.loads((phase_dir / "index.json").read_text(encoding="utf-8"))
|
||||
result = ex.StepExecutor._build_step_context(index)
|
||||
assert "ui" not in result
|
||||
|
||||
def test_excludes_completed_without_summary(self, phase_dir):
|
||||
index = json.loads((phase_dir / "index.json").read_text(encoding="utf-8"))
|
||||
del index["steps"][0]["summary"]
|
||||
result = ex.StepExecutor._build_step_context(index)
|
||||
assert "setup" not in result
|
||||
assert "core" in result
|
||||
|
||||
def test_empty_when_no_completed(self):
|
||||
index = {"steps": [{"step": 0, "name": "a", "status": "pending"}]}
|
||||
result = ex.StepExecutor._build_step_context(index)
|
||||
assert result == ""
|
||||
|
||||
def test_has_header(self, phase_dir):
|
||||
index = json.loads((phase_dir / "index.json").read_text(encoding="utf-8"))
|
||||
result = ex.StepExecutor._build_step_context(index)
|
||||
assert result.startswith("## 이전 Step 산출물")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _build_preamble
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestBuildPreamble:
|
||||
def test_includes_project_name(self, executor):
|
||||
result = executor._build_preamble("", "")
|
||||
assert "TestProject" in result
|
||||
|
||||
def test_includes_guardrails(self, executor):
|
||||
result = executor._build_preamble("GUARD_CONTENT", "")
|
||||
assert "GUARD_CONTENT" in result
|
||||
|
||||
def test_includes_step_context(self, executor):
|
||||
ctx = "## 이전 Step 산출물\n\n- Step 0: done"
|
||||
result = executor._build_preamble("", ctx)
|
||||
assert "이전 Step 산출물" in result
|
||||
|
||||
def test_tells_agent_not_to_commit_directly(self, executor):
|
||||
result = executor._build_preamble("", "")
|
||||
assert "직접 git commit하지 마라" in result
|
||||
|
||||
def test_includes_rules(self, executor):
|
||||
result = executor._build_preamble("", "")
|
||||
assert "작업 규칙" in result
|
||||
assert "AC" in result
|
||||
|
||||
def test_no_retry_section_by_default(self, executor):
|
||||
result = executor._build_preamble("", "")
|
||||
assert "이전 시도 실패" not in result
|
||||
|
||||
def test_retry_section_with_prev_error(self, executor):
|
||||
result = executor._build_preamble("", "", prev_error="타입 에러 발생")
|
||||
assert "이전 시도 실패" in result
|
||||
assert "타입 에러 발생" in result
|
||||
|
||||
def test_includes_max_retries(self, executor):
|
||||
result = executor._build_preamble("", "")
|
||||
assert str(ex.StepExecutor.MAX_RETRIES) in result
|
||||
|
||||
def test_includes_index_path(self, executor):
|
||||
result = executor._build_preamble("", "")
|
||||
assert "/phases/0-mvp/index.json" in result
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _update_top_index
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestUpdateTopIndex:
|
||||
def test_completed(self, executor, top_index):
|
||||
executor._top_index_file = top_index
|
||||
executor._update_top_index("completed")
|
||||
data = json.loads(top_index.read_text(encoding="utf-8"))
|
||||
mvp = next(p for p in data["phases"] if p["dir"] == "0-mvp")
|
||||
assert mvp["status"] == "completed"
|
||||
assert "completed_at" in mvp
|
||||
|
||||
def test_error(self, executor, top_index):
|
||||
executor._top_index_file = top_index
|
||||
executor._update_top_index("error")
|
||||
data = json.loads(top_index.read_text(encoding="utf-8"))
|
||||
mvp = next(p for p in data["phases"] if p["dir"] == "0-mvp")
|
||||
assert mvp["status"] == "error"
|
||||
assert "failed_at" in mvp
|
||||
|
||||
def test_blocked(self, executor, top_index):
|
||||
executor._top_index_file = top_index
|
||||
executor._update_top_index("blocked")
|
||||
data = json.loads(top_index.read_text(encoding="utf-8"))
|
||||
mvp = next(p for p in data["phases"] if p["dir"] == "0-mvp")
|
||||
assert mvp["status"] == "blocked"
|
||||
assert "blocked_at" in mvp
|
||||
|
||||
def test_other_phases_unchanged(self, executor, top_index):
|
||||
executor._top_index_file = top_index
|
||||
executor._update_top_index("completed")
|
||||
data = json.loads(top_index.read_text(encoding="utf-8"))
|
||||
polish = next(p for p in data["phases"] if p["dir"] == "1-polish")
|
||||
assert polish["status"] == "pending"
|
||||
|
||||
def test_nonexistent_dir_is_noop(self, executor, top_index):
|
||||
executor._top_index_file = top_index
|
||||
executor._phase_dir_name = "no-such-dir"
|
||||
original = json.loads(top_index.read_text(encoding="utf-8"))
|
||||
executor._update_top_index("completed")
|
||||
after = json.loads(top_index.read_text(encoding="utf-8"))
|
||||
for p_before, p_after in zip(original["phases"], after["phases"]):
|
||||
assert p_before["status"] == p_after["status"]
|
||||
|
||||
def test_no_top_index_file(self, executor, tmp_path):
|
||||
executor._top_index_file = tmp_path / "nonexistent.json"
|
||||
executor._update_top_index("completed") # should not raise
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _checkout_branch (mocked)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCheckoutBranch:
|
||||
def _mock_git(self, executor, responses):
|
||||
call_idx = {"i": 0}
|
||||
def fake_git(*args):
|
||||
idx = call_idx["i"]
|
||||
call_idx["i"] += 1
|
||||
if idx < len(responses):
|
||||
return responses[idx]
|
||||
return MagicMock(returncode=0, stdout="", stderr="")
|
||||
executor._run_git = fake_git
|
||||
|
||||
def test_already_on_branch(self, executor):
|
||||
self._mock_git(executor, [
|
||||
MagicMock(returncode=0, stdout="feat-mvp\n", stderr=""),
|
||||
])
|
||||
executor._checkout_branch() # should return without checkout
|
||||
|
||||
def test_branch_exists_checkout(self, executor):
|
||||
self._mock_git(executor, [
|
||||
MagicMock(returncode=0, stdout="main\n", stderr=""),
|
||||
MagicMock(returncode=0, stdout="", stderr=""),
|
||||
MagicMock(returncode=0, stdout="", stderr=""),
|
||||
])
|
||||
executor._checkout_branch()
|
||||
|
||||
def test_branch_not_exists_create(self, executor):
|
||||
self._mock_git(executor, [
|
||||
MagicMock(returncode=0, stdout="main\n", stderr=""),
|
||||
MagicMock(returncode=1, stdout="", stderr="not found"),
|
||||
MagicMock(returncode=0, stdout="", stderr=""),
|
||||
])
|
||||
executor._checkout_branch()
|
||||
|
||||
def test_checkout_fails_exits(self, executor):
|
||||
self._mock_git(executor, [
|
||||
MagicMock(returncode=0, stdout="main\n", stderr=""),
|
||||
MagicMock(returncode=1, stdout="", stderr=""),
|
||||
MagicMock(returncode=1, stdout="", stderr="dirty tree"),
|
||||
])
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
executor._checkout_branch()
|
||||
assert exc_info.value.code == 1
|
||||
|
||||
def test_no_git_exits(self, executor):
|
||||
self._mock_git(executor, [
|
||||
MagicMock(returncode=1, stdout="", stderr="not a git repo"),
|
||||
])
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
executor._checkout_branch()
|
||||
assert exc_info.value.code == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _commit_step (mocked)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCommitStep:
|
||||
def test_two_phase_commit(self, executor):
|
||||
calls = []
|
||||
def fake_git(*args):
|
||||
calls.append(args)
|
||||
if args[:2] == ("diff", "--cached"):
|
||||
return MagicMock(returncode=1)
|
||||
return MagicMock(returncode=0, stdout="", stderr="")
|
||||
executor._run_git = fake_git
|
||||
|
||||
executor._commit_step(2, "ui")
|
||||
|
||||
commit_calls = [c for c in calls if c[0] == "commit"]
|
||||
assert len(commit_calls) == 2
|
||||
assert "feat(mvp):" in commit_calls[0][2]
|
||||
assert "chore(mvp):" in commit_calls[1][2]
|
||||
|
||||
def test_no_code_changes_skips_feat_commit(self, executor):
|
||||
call_count = {"diff": 0}
|
||||
calls = []
|
||||
def fake_git(*args):
|
||||
calls.append(args)
|
||||
if args[:2] == ("diff", "--cached"):
|
||||
call_count["diff"] += 1
|
||||
if call_count["diff"] == 1:
|
||||
return MagicMock(returncode=0)
|
||||
return MagicMock(returncode=1)
|
||||
return MagicMock(returncode=0, stdout="", stderr="")
|
||||
executor._run_git = fake_git
|
||||
|
||||
executor._commit_step(2, "ui")
|
||||
|
||||
commit_msgs = [c[2] for c in calls if c[0] == "commit"]
|
||||
assert len(commit_msgs) == 1
|
||||
assert "chore" in commit_msgs[0]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _invoke_codex (mocked)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestInvokeCodex:
|
||||
def test_invokes_codex_with_correct_args(self, executor):
|
||||
mock_result = MagicMock(returncode=0, stdout='{"result": "ok"}', stderr="")
|
||||
step = {"step": 2, "name": "ui"}
|
||||
preamble = "PREAMBLE\n"
|
||||
|
||||
with patch("subprocess.run", return_value=mock_result) as mock_run:
|
||||
output = executor._invoke_codex(step, preamble)
|
||||
|
||||
cmd = mock_run.call_args[0][0]
|
||||
assert cmd[:2] == ["codex", "exec"]
|
||||
assert "--skip-git-repo-check" in cmd
|
||||
assert "--full-auto" in cmd
|
||||
assert "--json" in cmd
|
||||
assert cmd[-1] == "-"
|
||||
assert "PREAMBLE" in mock_run.call_args[1]["input"]
|
||||
assert "UI를 구현하세요" in mock_run.call_args[1]["input"]
|
||||
|
||||
def test_saves_output_json(self, executor):
|
||||
mock_result = MagicMock(returncode=0, stdout='{"ok": true}', stderr="")
|
||||
step = {"step": 2, "name": "ui"}
|
||||
|
||||
with patch("subprocess.run", return_value=mock_result):
|
||||
executor._invoke_codex(step, "preamble")
|
||||
|
||||
output_file = executor._phase_dir / "step2-output.json"
|
||||
assert output_file.exists()
|
||||
data = json.loads(output_file.read_text(encoding="utf-8"))
|
||||
assert data["step"] == 2
|
||||
assert data["name"] == "ui"
|
||||
assert data["exitCode"] == 0
|
||||
|
||||
def test_nonexistent_step_file_exits(self, executor):
|
||||
step = {"step": 99, "name": "nonexistent"}
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
executor._invoke_codex(step, "preamble")
|
||||
assert exc_info.value.code == 1
|
||||
|
||||
def test_timeout_is_1800(self, executor):
|
||||
mock_result = MagicMock(returncode=0, stdout="{}", stderr="")
|
||||
step = {"step": 2, "name": "ui"}
|
||||
|
||||
with patch("subprocess.run", return_value=mock_result) as mock_run:
|
||||
executor._invoke_codex(step, "preamble")
|
||||
|
||||
assert mock_run.call_args[1]["timeout"] == 1800
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# progress_indicator (= 이전 Spinner)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestProgressIndicator:
|
||||
def test_context_manager(self):
|
||||
import time
|
||||
with ex.progress_indicator("test") as pi:
|
||||
time.sleep(0.15)
|
||||
assert pi.elapsed >= 0.1
|
||||
|
||||
def test_elapsed_increases(self):
|
||||
import time
|
||||
with ex.progress_indicator("test") as pi:
|
||||
time.sleep(0.2)
|
||||
assert pi.elapsed > 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# main() CLI 파싱 (mocked)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestMainCli:
|
||||
def test_no_args_exits(self):
|
||||
with patch("sys.argv", ["execute.py"]):
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
ex.main()
|
||||
assert exc_info.value.code == 2 # argparse exits with 2
|
||||
|
||||
def test_invalid_phase_dir_exits(self):
|
||||
with patch("sys.argv", ["execute.py", "nonexistent"]):
|
||||
with patch.object(ex, "ROOT", Path("/tmp/fake_nonexistent")):
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
ex.main()
|
||||
assert exc_info.value.code == 1
|
||||
|
||||
def test_missing_index_exits(self, tmp_project):
|
||||
(tmp_project / "phases" / "empty").mkdir()
|
||||
with patch("sys.argv", ["execute.py", "empty"]):
|
||||
with patch.object(ex, "ROOT", tmp_project):
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
ex.main()
|
||||
assert exc_info.value.code == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _check_blockers (= 이전 main() error/blocked 체크)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCheckBlockers:
|
||||
def _make_executor_with_steps(self, tmp_project, steps):
|
||||
d = tmp_project / "phases" / "test-phase"
|
||||
d.mkdir(exist_ok=True)
|
||||
index = {"project": "T", "phase": "test", "steps": steps}
|
||||
(d / "index.json").write_text(json.dumps(index), encoding="utf-8")
|
||||
|
||||
with patch.object(ex, "ROOT", tmp_project):
|
||||
inst = ex.StepExecutor.__new__(ex.StepExecutor)
|
||||
inst._root = str(tmp_project)
|
||||
inst._phases_dir = tmp_project / "phases"
|
||||
inst._phase_dir = d
|
||||
inst._phase_dir_name = "test-phase"
|
||||
inst._index_file = d / "index.json"
|
||||
inst._top_index_file = tmp_project / "phases" / "index.json"
|
||||
inst._phase_name = "test"
|
||||
inst._total = len(steps)
|
||||
return inst
|
||||
|
||||
def test_error_step_exits_1(self, tmp_project):
|
||||
steps = [
|
||||
{"step": 0, "name": "ok", "status": "completed"},
|
||||
{"step": 1, "name": "bad", "status": "error", "error_message": "fail"},
|
||||
]
|
||||
inst = self._make_executor_with_steps(tmp_project, steps)
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
inst._check_blockers()
|
||||
assert exc_info.value.code == 1
|
||||
|
||||
def test_blocked_step_exits_2(self, tmp_project):
|
||||
steps = [
|
||||
{"step": 0, "name": "ok", "status": "completed"},
|
||||
{"step": 1, "name": "stuck", "status": "blocked", "blocked_reason": "API key"},
|
||||
]
|
||||
inst = self._make_executor_with_steps(tmp_project, steps)
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
inst._check_blockers()
|
||||
assert exc_info.value.code == 2
|
||||
@@ -0,0 +1,196 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Basic validation for the Markdown document harness template.
|
||||
|
||||
This check is intentionally lightweight: it verifies that the template files
|
||||
exist and keep the sections that later Harness steps depend on.
|
||||
"""
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
import sys
|
||||
|
||||
try:
|
||||
import tomllib
|
||||
except ModuleNotFoundError: # pragma: no cover - Python < 3.11 compatibility
|
||||
tomllib = None
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parent.parent
|
||||
|
||||
REQUIRED_FILES = [
|
||||
"README.md",
|
||||
"AGENTS.md",
|
||||
"docs/PRD.md",
|
||||
"docs/ResearchNote.md",
|
||||
"docs/DraftFeedback.md",
|
||||
"docs/FinalFeedback.md",
|
||||
"docs/ARCHITECTURE.md",
|
||||
"docs/ADR.md",
|
||||
"docs/UI_GUIDE.md",
|
||||
".agents/skills/document-harness/SKILL.md",
|
||||
".agents/skills/document-harness/references/phase-templates.md",
|
||||
".agents/skills/document-review/SKILL.md",
|
||||
".codex/config.toml",
|
||||
".codex/hooks.json",
|
||||
".codex/hooks/pre_tool_guard.py",
|
||||
".codex/hooks/stop_validate.py",
|
||||
".codex/agents/doc_researcher.toml",
|
||||
".codex/agents/doc_drafter.toml",
|
||||
".codex/agents/doc_reviewer.toml",
|
||||
".codex/agents/evidence_checker.toml",
|
||||
]
|
||||
|
||||
REQUIRED_DIRS = [
|
||||
"docs",
|
||||
"scripts",
|
||||
".agents",
|
||||
".agents/skills",
|
||||
".agents/skills/document-harness",
|
||||
".agents/skills/document-review",
|
||||
".codex",
|
||||
".codex/hooks",
|
||||
".codex/agents",
|
||||
]
|
||||
|
||||
REQUIRED_SECTIONS = {
|
||||
"README.md": [
|
||||
"## 핵심 아이디어",
|
||||
"## Codex 구성",
|
||||
"## 빠른 시작",
|
||||
"## 자동 실행 방식",
|
||||
"## 피드백 게이트",
|
||||
"## 검증",
|
||||
],
|
||||
"docs/PRD.md": [
|
||||
"## 문서 목적",
|
||||
"## 대상 독자",
|
||||
"## 최종 산출물",
|
||||
"## 문서 개요",
|
||||
"## 중요 키워드",
|
||||
"## 핵심 질문",
|
||||
"## 범위",
|
||||
"## 톤과 스타일",
|
||||
"## 조사 요구사항",
|
||||
"## 사용자 피드백 방식",
|
||||
],
|
||||
"docs/ResearchNote.md": [
|
||||
"## 조사 범위",
|
||||
"## 조사 일시",
|
||||
"## 검색어",
|
||||
"## 핵심 결론",
|
||||
"## 출처 목록",
|
||||
"## 쟁점과 상반된 주장",
|
||||
"## 확인 필요",
|
||||
],
|
||||
"AGENTS.md": [
|
||||
"## 목적",
|
||||
"## Codex 구성",
|
||||
"## 기본 산출물",
|
||||
"## 문서 작성 규칙",
|
||||
"## Codex 작업 규칙",
|
||||
"## 권장 워크플로우",
|
||||
"## 명령어",
|
||||
],
|
||||
".agents/skills/document-harness/SKILL.md": [
|
||||
"# Document Harness Skill",
|
||||
"## Operating Rules",
|
||||
"## Staged Workflow",
|
||||
"## Validation",
|
||||
],
|
||||
".agents/skills/document-review/SKILL.md": [
|
||||
"# Document Review Skill",
|
||||
"## Read First",
|
||||
"## Review Checklist",
|
||||
"## Output Format",
|
||||
],
|
||||
}
|
||||
|
||||
REQUIRED_JSON_FILES = [
|
||||
".codex/hooks.json",
|
||||
]
|
||||
|
||||
REQUIRED_TOML_FILES = [
|
||||
".codex/config.toml",
|
||||
".codex/agents/doc_researcher.toml",
|
||||
".codex/agents/doc_drafter.toml",
|
||||
".codex/agents/doc_reviewer.toml",
|
||||
".codex/agents/evidence_checker.toml",
|
||||
]
|
||||
|
||||
|
||||
def first_nonempty_line(path: Path) -> str:
|
||||
for line in path.read_text(encoding="utf-8").splitlines():
|
||||
if line.strip():
|
||||
return line.strip()
|
||||
return ""
|
||||
|
||||
|
||||
def markdown_file_has_valid_start(path: Path) -> bool:
|
||||
first = first_nonempty_line(path)
|
||||
if first.startswith("# "):
|
||||
return True
|
||||
if first == "---" and path.name == "SKILL.md":
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def main() -> int:
|
||||
errors: list[str] = []
|
||||
|
||||
for rel in REQUIRED_DIRS:
|
||||
path = ROOT / rel
|
||||
if not path.is_dir():
|
||||
errors.append(f"missing directory: {rel}")
|
||||
|
||||
for rel in REQUIRED_FILES:
|
||||
path = ROOT / rel
|
||||
if not path.is_file():
|
||||
errors.append(f"missing file: {rel}")
|
||||
continue
|
||||
|
||||
if path.suffix == ".md":
|
||||
if not markdown_file_has_valid_start(path):
|
||||
errors.append(f"markdown file must start with a level-1 heading or Skill frontmatter: {rel}")
|
||||
|
||||
for rel, sections in REQUIRED_SECTIONS.items():
|
||||
path = ROOT / rel
|
||||
if not path.is_file():
|
||||
continue
|
||||
|
||||
text = path.read_text(encoding="utf-8")
|
||||
for section in sections:
|
||||
if section not in text:
|
||||
errors.append(f"missing section in {rel}: {section}")
|
||||
|
||||
for rel in REQUIRED_JSON_FILES:
|
||||
path = ROOT / rel
|
||||
if not path.is_file():
|
||||
continue
|
||||
try:
|
||||
json.loads(path.read_text(encoding="utf-8"))
|
||||
except json.JSONDecodeError as exc:
|
||||
errors.append(f"invalid JSON in {rel}: {exc}")
|
||||
|
||||
if tomllib is not None:
|
||||
for rel in REQUIRED_TOML_FILES:
|
||||
path = ROOT / rel
|
||||
if not path.is_file():
|
||||
continue
|
||||
try:
|
||||
tomllib.loads(path.read_text(encoding="utf-8"))
|
||||
except tomllib.TOMLDecodeError as exc:
|
||||
errors.append(f"invalid TOML in {rel}: {exc}")
|
||||
|
||||
if errors:
|
||||
print("Document harness validation failed:")
|
||||
for error in errors:
|
||||
print(f"- {error}")
|
||||
return 1
|
||||
|
||||
print("Document harness validation passed.")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
Reference in New Issue
Block a user