#!/usr/bin/env python3
"""
Harness Step Executor — runs the steps of a phase sequentially and self-corrects.

Usage:
    python scripts/execute.py <phase-dir> [--push]
"""

import argparse
import contextlib
import fnmatch
import json
import os
import re
import shutil
import subprocess
import sys
import threading
import time
import types
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Optional

ROOT = Path(__file__).resolve().parent.parent


def configure_output_encoding():
    """Switch stdout/stderr to UTF-8 (with replacement) when the stream supports it."""
    for stream in (sys.stdout, sys.stderr):
        if hasattr(stream, "reconfigure"):
            stream.reconfigure(encoding="utf-8", errors="replace")


@contextlib.contextmanager
def progress_indicator(label: str):
    """Terminal progress spinner written to stderr.

    Use it in a ``with`` statement; the yielded namespace exposes ``.elapsed``
    (seconds), which is filled in when the ``with`` block exits.
    """
    frames = "◐◓◑◒"
    stop = threading.Event()
    t0 = time.monotonic()

    def _animate():
        idx = 0
        # Event.wait doubles as the frame delay and the stop check.
        while not stop.wait(0.12):
            sec = int(time.monotonic() - t0)
            sys.stderr.write(f"\r{frames[idx % len(frames)]} {label} [{sec}s]")
            sys.stderr.flush()
            idx += 1
        # Blank out the spinner line before returning control to normal output.
        sys.stderr.write("\r" + " " * (len(label) + 20) + "\r")
        sys.stderr.flush()

    th = threading.Thread(target=_animate, daemon=True)
    th.start()
    info = types.SimpleNamespace(elapsed=0.0)
    try:
        yield info
    finally:
        stop.set()
        th.join()
        info.elapsed = time.monotonic() - t0


class StepExecutor:
    """Harness that runs the steps found in a phase directory one after another."""

    MAX_RETRIES = 3
    VALIDATION_COMMANDS = (
        [sys.executable, "-m", "unittest", "discover", "-s", "scripts", "-p", "test_*.py"],
        [sys.executable, "scripts/validate_workspace.py"],
    )
    FEAT_MSG = "feat({phase}): step {num} — {name}"
    CHORE_MSG = "chore({phase}): step {num} output"
    TZ = timezone(timedelta(hours=9))

    def __init__(self, phase_dir_name: str, *, auto_push: bool = False):
        """Bind the executor to ``phases/<phase_dir_name>`` under ROOT.

        Exits the process with status 1 when the phase directory or its
        ``index.json`` does not exist.
        """
        self._root = str(ROOT)
        self._phases_dir = ROOT / "phases"
        self._phase_dir = self._phases_dir / phase_dir_name
        self._phase_dir_name = phase_dir_name
        self._top_index_file = self._phases_dir / "index.json"
        self._auto_push = auto_push

        if not self._phase_dir.is_dir():
            print(f"ERROR: {self._phase_dir} not found")
            sys.exit(1)

        self._index_file = self._phase_dir / "index.json"
        if not self._index_file.exists():
            print(f"ERROR: {self._index_file} not found")
            sys.exit(1)

        idx = self._read_json(self._index_file)
        self._project = idx.get("project", "project")
        self._phase_name = idx.get("phase", phase_dir_name)
        self._total = len(idx["steps"])

    def run(self):
        """Run the whole phase: checks, branch checkout, every pending step, finalization."""
        self._print_header()
        self._check_blockers()
        self._assert_clean_worktree("before branch checkout")
        self._checkout_branch()
        guardrails = self._load_guardrails()
        self._ensure_created_at()
        self._execute_all_steps(guardrails)
        self._finalize()

    # --- timestamps ---

    def _stamp(self) -> str:
        """Return the current time in ``TZ`` formatted as ``%Y-%m-%dT%H:%M:%S%z``."""
        return datetime.now(self.TZ).strftime("%Y-%m-%dT%H:%M:%S%z")

    # --- JSON I/O ---

    @staticmethod
    def _read_json(p: Path) -> dict:
        """Parse the UTF-8 JSON file at ``p``."""
        return json.loads(p.read_text(encoding="utf-8"))

    @staticmethod
    def _write_json(p: Path, data: dict):
        """Write ``data`` to ``p`` as indented UTF-8 JSON without ASCII escaping."""
        p.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")

    # --- git ---

    def _run_git(self, *args) -> subprocess.CompletedProcess:
        """Run ``git <args>`` in the repository root, capturing text output."""
        cmd = ["git"] + list(args)
        return subprocess.run(cmd, cwd=self._root, capture_output=True, text=True)

    def _validate_before_commit(self, commit_message: str):
        """Run every VALIDATION_COMMANDS entry; exit with status 1 on the first failure.

        ``commit_message`` is only used for the progress line that is printed.
        """
        print(f" Validation before commit: {commit_message}")
        for cmd in self.VALIDATION_COMMANDS:
            r = subprocess.run(cmd, cwd=self._root, capture_output=True, text=True)
            if r.returncode != 0:
                print(f" ERROR: validation failed before commit: {' '.join(cmd)}")
                # Only the tail of each stream is shown to keep the report short.
                if r.stdout:
                    print(r.stdout[-2000:])
                if r.stderr:
                    print(r.stderr[-2000:])
                sys.exit(1)

    def _branch_name(self) -> str:
        """Derive the ``codex/<slug>`` branch name from the phase name.

        Falls back to the phase directory name when the slug ends up empty.
        """
        slug = re.sub(r"[^A-Za-z0-9._-]+", "-", self._phase_name.strip())
        slug = slug.strip("/.-")
        if not slug:
            slug = self._phase_dir_name
        return f"codex/{slug}"

    def _assert_clean_worktree(self, context: str):
        """Exit with status 1 when ``git status --porcelain`` fails or reports changes."""
        r = self._run_git("status", "--porcelain")
        if r.returncode != 0:
            print(" ERROR: git status failed.")
            print(f" {r.stderr.strip()}")
            sys.exit(1)
        dirty = r.stdout.strip()
        if dirty:
            print(f" ERROR: dirty worktree detected {context}.")
            print(" Commit, stash, or remove these changes before running scripts/execute.py:")
            for line in dirty.splitlines():
                print(f" {line}")
            sys.exit(1)

    @staticmethod
    def _normalize_rel_path(path: str) -> str:
        """Return ``path`` with forward slashes and without leading ``./`` or ``/``.

        Only whole ``./`` segments and leading slashes are removed.  A plain
        ``lstrip("./")`` strips a *set of characters* and would turn
        ``.gitignore`` into ``gitignore`` and ``../x`` into ``x``; dotfiles and
        parent-directory references are therefore preserved here.
        """
        rel = path.replace("\\", "/")
        while True:
            if rel.startswith("./"):
                rel = rel[2:]
            elif rel.startswith("/"):
                rel = rel[1:]
            else:
                break
        # A bare "." names the current directory; treat it like an empty path.
        return "" if rel == "." else rel

    def _path_allowed(self, path: str, patterns: list[str]) -> bool:
        """Return True when ``path`` matches any allowlist pattern.

        A pattern matches as a directory prefix (when it ends with ``/``), as an
        ``fnmatch`` glob (when it contains ``*``, ``?`` or ``[``), or by exact
        equality.  Patterns that normalize to an empty string are ignored.
        """
        rel = self._normalize_rel_path(path)
        for raw in patterns:
            pattern = self._normalize_rel_path(str(raw))
            if not pattern:
                continue
            if pattern.endswith("/") and rel.startswith(pattern):
                return True
            if any(ch in pattern for ch in "*?[") and fnmatch.fnmatchcase(rel, pattern):
                return True
            if rel == pattern:
                return True
        return False

    def _validate_step_allowlist(self, step: dict):
        """Exit with status 1 unless ``allowed_paths`` is a non-empty list of non-blank strings."""
        allowed = step.get("allowed_paths")
        if (
            not isinstance(allowed, list)
            or not allowed
            or not all(isinstance(p, str) and p.strip() for p in allowed)
        ):
            print(f" ERROR: Step {step.get('step')} must define non-empty allowed_paths.")
            sys.exit(1)

    def _changed_paths(self) -> list[str]:
        """Return the sorted, de-duplicated set of unstaged, staged and untracked paths.

        Exits with status 1 when any of the underlying git commands fails.
        """
        paths: list[str] = []

        tracked = self._run_git("diff", "--name-only")
        if tracked.returncode != 0:
            print(" ERROR: git diff --name-only failed.")
            print(f" {tracked.stderr.strip()}")
            sys.exit(1)
        paths.extend(tracked.stdout.splitlines())

        staged = self._run_git("diff", "--cached", "--name-only")
        if staged.returncode != 0:
            print(" ERROR: git diff --cached --name-only failed.")
            print(f" {staged.stderr.strip()}")
            sys.exit(1)
        paths.extend(staged.stdout.splitlines())

        untracked = self._run_git("ls-files", "--others", "--exclude-standard")
        if untracked.returncode != 0:
            print(" ERROR: git ls-files --others failed.")
            print(f" {untracked.stderr.strip()}")
            sys.exit(1)
        paths.extend(untracked.stdout.splitlines())

        return sorted({self._normalize_rel_path(p) for p in paths if p.strip()})

    def _housekeeping_paths(self, step_num: int) -> set[str]:
        """Return the bookkeeping files the harness itself writes for ``step_num``."""
        return {
            f"phases/{self._phase_dir_name}/index.json",
            f"phases/{self._phase_dir_name}/step{step_num}-output.json",
            "phases/index.json",
        }

    def _classify_step_changes(
        self, step_num: int, step: dict, changed_paths: list[str]
    ) -> tuple[list[str], list[str], list[str]]:
        """Split ``changed_paths`` into ``(allowed, housekeeping, disallowed)`` lists.

        Housekeeping membership is checked first, then the step's
        ``allowed_paths``; everything else is disallowed.
        """
        allowed_patterns = step.get("allowed_paths", [])
        housekeeping_set = self._housekeeping_paths(step_num)
        allowed: list[str] = []
        housekeeping: list[str] = []
        disallowed: list[str] = []
        for path in changed_paths:
            rel = self._normalize_rel_path(path)
            if rel in housekeeping_set:
                housekeeping.append(rel)
            elif self._path_allowed(rel, allowed_patterns):
                allowed.append(rel)
            else:
                disallowed.append(rel)
        return allowed, housekeeping, disallowed

    def _checkout_branch(self):
        """Switch to the phase branch, creating it when it does not exist yet.

        Does nothing when HEAD is already on that branch.  Exits with status 1
        when git is unusable or the checkout fails.
        """
        branch = self._branch_name()

        r = self._run_git("rev-parse", "--abbrev-ref", "HEAD")
        if r.returncode != 0:
            print(" ERROR: git을 사용할 수 없거나 git repo가 아닙니다.")
            print(f" {r.stderr.strip()}")
            sys.exit(1)

        if r.stdout.strip() == branch:
            return

        # `rev-parse --verify` succeeds only when the branch name resolves.
        r = self._run_git("rev-parse", "--verify", branch)
        if r.returncode == 0:
            r = self._run_git("checkout", branch)
        else:
            r = self._run_git("checkout", "-b", branch)

        if r.returncode != 0:
            print(f" ERROR: 브랜치 '{branch}' checkout 실패.")
            print(f" {r.stderr.strip()}")
            print(" Hint: 변경사항을 stash하거나 commit한 후 다시 시도하세요.")
            sys.exit(1)

        print(f" Branch: {branch}")

    def _stage_paths(self, paths: list[str]):
        """``git add`` the given paths; exit with status 1 when that fails."""
        if not paths:
            return
        r = self._run_git("add", "--", *paths)
        if r.returncode != 0:
            print(" ERROR: git add failed.")
            print(f" {r.stderr.strip()}")
            sys.exit(1)

    def _commit_step(self, step: dict, step_name: str):
        """Commit a step's changes as a code commit plus a housekeeping commit.

        Exits with status 1 when files outside the step's allowlist changed, or
        when validation, staging or committing fails.
        """
        step_num = step["step"]
        changed = self._changed_paths()
        allowed, housekeeping, disallowed = self._classify_step_changes(step_num, step, changed)

        if disallowed:
            print(f" ERROR: Step {step_num} modified files outside allowed_paths:")
            for path in disallowed:
                print(f" {path}")
            sys.exit(1)

        if allowed:
            msg = self.FEAT_MSG.format(phase=self._phase_name, num=step_num, name=step_name)
            self._validate_before_commit(msg)
            self._stage_paths(allowed)
            # `diff --cached --quiet` exits non-zero when something is staged.
            if self._run_git("diff", "--cached", "--quiet").returncode != 0:
                r = self._run_git("commit", "-m", msg)
                if r.returncode != 0:
                    print(f" ERROR: code commit failed: {r.stderr.strip()}")
                    sys.exit(1)
                print(f" Commit: {msg}")

        if housekeeping:
            msg = self.CHORE_MSG.format(phase=self._phase_name, num=step_num)
            self._validate_before_commit(msg)
            self._stage_paths(housekeeping)
            if self._run_git("diff", "--cached", "--quiet").returncode != 0:
                r = self._run_git("commit", "-m", msg)
                if r.returncode != 0:
                    print(f" ERROR: housekeeping commit failed: {r.stderr.strip()}")
                    sys.exit(1)

    # --- top-level index ---

    def _update_top_index(self, status: str):
        """Record ``status`` (and a matching timestamp) for this phase in ``phases/index.json``.

        Does nothing when the top-level index file does not exist.
        """
        if not self._top_index_file.exists():
            return
        top = self._read_json(self._top_index_file)
        ts = self._stamp()
        for phase in top.get("phases", []):
            if phase.get("dir") == self._phase_dir_name:
                phase["status"] = status
                ts_key = {
                    "completed": "completed_at",
                    "error": "failed_at",
                    "blocked": "blocked_at",
                }.get(status)
                if ts_key:
                    phase[ts_key] = ts
                break
        self._write_json(self._top_index_file, top)

    # --- guardrails & context ---

    def _load_guardrails(self) -> str:
        """Concatenate ``AGENTS.md`` and every ``docs/*.md`` file into one prompt section."""
        sections = []
        agents_md = ROOT / "AGENTS.md"
        if agents_md.exists():
            sections.append(f"## 프로젝트 규칙 (AGENTS.md)\n\n{agents_md.read_text(encoding='utf-8')}")
        docs_dir = ROOT / "docs"
        if docs_dir.is_dir():
            for doc in sorted(docs_dir.glob("*.md")):
                sections.append(f"## {doc.stem}\n\n{doc.read_text(encoding='utf-8')}")
        return "\n\n---\n\n".join(sections) if sections else ""

    @staticmethod
    def _build_step_context(index: dict) -> str:
        """Summarize completed steps that carry a ``summary``; empty string when there are none."""
        lines = [
            f"- Step {s['step']} ({s['name']}): {s['summary']}"
            for s in index["steps"]
            if s["status"] == "completed" and s.get("summary")
        ]
        if not lines:
            return ""
        return "## 이전 Step 산출물\n\n" + "\n".join(lines) + "\n\n"

    def _build_preamble(self, guardrails: str, step_context: str, allowed_paths: list[str],
                        prev_error: Optional[str] = None) -> str:
        """Build the prompt text that precedes the step file's own content.

        When ``prev_error`` is given, a retry section quoting it is included.
        """
        commit_example = self.FEAT_MSG.format(
            phase=self._phase_name, num="N", name=""
        )
        retry_section = ""
        if prev_error:
            retry_section = (
                f"\n## ⚠ 이전 시도 실패 — 아래 에러를 반드시 참고하여 수정하라\n\n"
                f"{prev_error}\n\n---\n\n"
            )
        return (
            f"당신은 {self._project} 프로젝트의 개발자입니다. 아래 step을 수행하세요.\n\n"
            f"{guardrails}\n\n---\n\n"
            f"{step_context}{retry_section}"
            f"## Step file allowlist\n\n"
            f"This step may modify only these repository-relative paths:\n"
            f"{chr(10).join(f'- {p}' for p in allowed_paths)}\n\n"
            f"## 작업 규칙\n\n"
            f"1. 이전 step에서 작성된 코드를 확인하고 일관성을 유지하라.\n"
            f"2. 이 step에 명시된 작업만 수행하라. 추가 기능이나 파일을 만들지 마라.\n"
            f"3. 기존 테스트를 깨뜨리지 마라.\n"
            f"4. AC(Acceptance Criteria) 검증을 직접 실행하라.\n"
            f"5. /phases/{self._phase_dir_name}/index.json의 해당 step status를 업데이트하라:\n"
            f" - AC 통과 → \"completed\" + \"summary\" 필드에 이 step의 산출물을 한 줄로 요약\n"
            f" - {self.MAX_RETRIES}회 수정 시도 후에도 실패 → \"error\" + \"error_message\" 기록\n"
            f" - 사용자 개입이 필요한 경우 (API 키, 인증, 수동 설정 등) → \"blocked\" + \"blocked_reason\" 기록 후 즉시 중단\n"
            f"6. 모든 변경사항을 커밋하라:\n"
            f" {commit_example}\n\n---\n\n"
        )

    # --- Codex invocation ---

    def _invoke_codex(self, step: dict, preamble: str) -> dict:
        """Run Codex on ``preamble`` + the step file and save the result.

        The prompt is passed on stdin.  The exit code and captured streams are
        written to ``step<N>-output.json`` in the phase directory and returned.
        Exits with status 1 when the step file is missing.
        """
        step_num, step_name = step["step"], step["name"]
        step_file = self._phase_dir / f"step{step_num}.md"

        if not step_file.exists():
            print(f" ERROR: {step_file} not found")
            sys.exit(1)

        prompt = preamble + step_file.read_text(encoding="utf-8")
        result = subprocess.run(
            self._codex_exec_command(),
            cwd=self._root,
            capture_output=True,
            text=True,
            input=prompt,
            encoding="utf-8",
            errors="replace",
            timeout=1800,
        )

        if result.returncode != 0:
            print(f"\n WARN: Codex가 비정상 종료됨 (code {result.returncode})")
            if result.stderr:
                print(f" stderr: {result.stderr[:500]}")

        output = {
            "step": step_num,
            "name": step_name,
            "exitCode": result.returncode,
            "stdout": result.stdout,
            "stderr": result.stderr,
        }
        out_path = self._phase_dir / f"step{step_num}-output.json"
        self._write_json(out_path, output)

        return output

    # --- header & checks ---

    @staticmethod
    def _codex_command() -> str:
        """Resolve the Codex executable.

        ``HARNESS_CODEX_COMMAND`` wins when set; otherwise the first of
        ``codex.cmd``, ``codex.exe``, ``codex`` found on PATH, else ``"codex"``.
        """
        override = os.environ.get("HARNESS_CODEX_COMMAND", "").strip()
        if override:
            return override
        return (
            shutil.which("codex.cmd")
            or shutil.which("codex.exe")
            or shutil.which("codex")
            or "codex"
        )

    def _codex_exec_command(self) -> list[str]:
        """Build the ``codex exec`` argv, adding ``-m <model>`` when ``HARNESS_CODEX_MODEL`` is set."""
        cmd = [self._codex_command(), "exec"]
        model = os.environ.get("HARNESS_CODEX_MODEL", "").strip()
        if model:
            cmd.extend(["-m", model])
        cmd.extend(["--dangerously-bypass-approvals-and-sandbox", "--json", "-"])
        return cmd

    def _print_header(self):
        """Print the run banner (phase name, step count, auto-push flag)."""
        print(f"\n{'='*60}")
        print(" Harness Step Executor")
        print(f" Phase: {self._phase_name} | Steps: {self._total}")
        if self._auto_push:
            print(" Auto-push: enabled")
        print(f"{'='*60}")

    def _check_blockers(self):
        """Refuse to run while a trailing step is in ``error`` or ``blocked`` state.

        Steps are scanned from last to first; the scan stops at the first step
        that is neither pending, errored nor blocked.  Exits with status 1 for
        an errored step and status 2 for a blocked one.
        """
        index = self._read_json(self._index_file)
        for s in reversed(index["steps"]):
            if s["status"] == "error":
                print(f"\n ✗ Step {s['step']} ({s['name']}) failed.")
                print(f" Error: {s.get('error_message', 'unknown')}")
                print(" Fix and reset status to 'pending' to retry.")
                sys.exit(1)
            if s["status"] == "blocked":
                print(f"\n ⏸ Step {s['step']} ({s['name']}) blocked.")
                print(f" Reason: {s.get('blocked_reason', 'unknown')}")
                print(" Resolve and reset status to 'pending' to retry.")
                sys.exit(2)
            if s["status"] != "pending":
                break

    def _ensure_created_at(self):
        """Stamp ``created_at`` on the phase index if it is not set yet."""
        index = self._read_json(self._index_file)
        if "created_at" not in index:
            index["created_at"] = self._stamp()
            self._write_json(self._index_file, index)

    # --- execution loop ---

    def _execute_single_step(self, step: dict, guardrails: str) -> bool:
        """Run one step, retrying up to MAX_RETRIES times.

        Returns True once the step reports ``completed``.  A ``blocked`` step
        exits the process with status 2; exhausting the retries exits with
        status 1.
        """
        step_num, step_name = step["step"], step["name"]
        done = sum(1 for s in self._read_json(self._index_file)["steps"] if s["status"] == "completed")
        prev_error = None

        for attempt in range(1, self.MAX_RETRIES + 1):
            index = self._read_json(self._index_file)
            step_context = self._build_step_context(index)
            preamble = self._build_preamble(
                guardrails, step_context, step.get("allowed_paths", []), prev_error
            )

            tag = f"Step {step_num}/{self._total - 1} ({done} done): {step_name}"
            if attempt > 1:
                tag += f" [retry {attempt}/{self.MAX_RETRIES}]"

            with progress_indicator(tag) as pi:
                self._invoke_codex(step, preamble)
            elapsed = int(pi.elapsed)

            # Codex reports its outcome by editing index.json, so re-read it.
            index = self._read_json(self._index_file)
            status = next(
                (s.get("status", "pending") for s in index["steps"] if s["step"] == step_num),
                "pending",
            )
            ts = self._stamp()

            if status == "completed":
                for s in index["steps"]:
                    if s["step"] == step_num:
                        s["completed_at"] = ts
                self._write_json(self._index_file, index)
                self._commit_step(step, step_name)
                print(f" ✓ Step {step_num}: {step_name} [{elapsed}s]")
                return True

            if status == "blocked":
                for s in index["steps"]:
                    if s["step"] == step_num:
                        s["blocked_at"] = ts
                self._write_json(self._index_file, index)
                reason = next(
                    (s.get("blocked_reason", "") for s in index["steps"] if s["step"] == step_num),
                    "",
                )
                print(f" ⏸ Step {step_num}: {step_name} blocked [{elapsed}s]")
                print(f" Reason: {reason}")
                self._update_top_index("blocked")
                sys.exit(2)

            err_msg = next(
                (s.get("error_message", "Step did not update status")
                 for s in index["steps"] if s["step"] == step_num),
                "Step did not update status",
            )

            if attempt < self.MAX_RETRIES:
                # Reset the step so the next attempt starts from "pending".
                for s in index["steps"]:
                    if s["step"] == step_num:
                        s["status"] = "pending"
                        s.pop("error_message", None)
                self._write_json(self._index_file, index)
                prev_error = err_msg
                print(f" ↻ Step {step_num}: retry {attempt}/{self.MAX_RETRIES} — {err_msg}")
            else:
                for s in index["steps"]:
                    if s["step"] == step_num:
                        s["status"] = "error"
                        s["error_message"] = f"[{self.MAX_RETRIES}회 시도 후 실패] {err_msg}"
                        s["failed_at"] = ts
                self._write_json(self._index_file, index)
                self._commit_step(step, step_name)
                print(f" ✗ Step {step_num}: {step_name} failed after {self.MAX_RETRIES} attempts [{elapsed}s]")
                print(f" Error: {err_msg}")
                self._update_top_index("error")
                sys.exit(1)

        return False  # unreachable

    def _execute_all_steps(self, guardrails: str):
        """Run pending steps in index order until none is left."""
        while True:
            index = self._read_json(self._index_file)
            pending = next((s for s in index["steps"] if s["status"] == "pending"), None)
            if pending is None:
                print("\n All steps completed!")
                return

            self._validate_step_allowlist(pending)
            step_num = pending["step"]
            # Stamp started_at only the first time the step is picked up.
            for s in index["steps"]:
                if s["step"] == step_num and "started_at" not in s:
                    s["started_at"] = self._stamp()
                    self._write_json(self._index_file, index)
                    break

            self._execute_single_step(pending, guardrails)

    def _finalize(self):
        """Mark the phase completed, commit the index files and optionally push.

        Exits with status 1 when validation, the commit or the push fails.
        """
        index = self._read_json(self._index_file)
        index["completed_at"] = self._stamp()
        self._write_json(self._index_file, index)
        self._update_top_index("completed")

        final_paths = [f"phases/{self._phase_dir_name}/index.json"]
        if self._top_index_file.exists():
            final_paths.append("phases/index.json")

        msg = f"chore({self._phase_name}): mark phase completed"
        self._validate_before_commit(msg)
        self._stage_paths(final_paths)
        if self._run_git("diff", "--cached", "--quiet").returncode != 0:
            r = self._run_git("commit", "-m", msg)
            if r.returncode != 0:
                print(f" ERROR: phase completion commit failed: {r.stderr.strip()}")
                sys.exit(1)
            print(f" ✓ {msg}")

        if self._auto_push:
            branch = self._branch_name()
            r = self._run_git("push", "-u", "origin", branch)
            if r.returncode != 0:
                print(f"\n ERROR: git push 실패: {r.stderr.strip()}")
                sys.exit(1)
            print(f" ✓ Pushed to origin/{branch}")

        print(f"\n{'='*60}")
        print(f" Phase '{self._phase_name}' completed!")
        print(f"{'='*60}")


def main():
    """Parse the command line and run the executor for the requested phase."""
    configure_output_encoding()
    parser = argparse.ArgumentParser(description="Harness Step Executor")
    parser.add_argument("phase_dir", help="Phase directory name (e.g. 0-mvp)")
    parser.add_argument("--push", action="store_true", help="Push branch after completion")
    args = parser.parse_args()

    StepExecutor(args.phase_dir, auto_push=args.push).run()


if __name__ == "__main__":
    main()