#!/usr/bin/env python3 """ Report Harness Step Executor - run report phase steps sequentially with Codex and self-correction. Usage: python scripts/execute.py [--push] """ import argparse import contextlib import json import subprocess import sys import threading import time import types from datetime import datetime, timezone, timedelta from pathlib import Path from typing import Optional ROOT = Path(__file__).resolve().parent.parent @contextlib.contextmanager def progress_indicator(label: str): """터미널 진행 표시기. with 문으로 사용하며 .elapsed 로 경과 시간을 읽는다.""" frames = "◐◓◑◒" stop = threading.Event() t0 = time.monotonic() def _animate(): idx = 0 while not stop.wait(0.12): sec = int(time.monotonic() - t0) sys.stderr.write(f"\r{frames[idx % len(frames)]} {label} [{sec}s]") sys.stderr.flush() idx += 1 sys.stderr.write("\r" + " " * (len(label) + 20) + "\r") sys.stderr.flush() th = threading.Thread(target=_animate, daemon=True) th.start() info = types.SimpleNamespace(elapsed=0.0) try: yield info finally: stop.set() th.join() info.elapsed = time.monotonic() - t0 class StepExecutor: """Phase 디렉토리 안의 step들을 순차 실행하는 하네스.""" MAX_RETRIES = 3 DOCS_MSG = "docs({phase}): step {num} - {name}" CHORE_MSG = "chore({phase}): step {num} output" TZ = timezone(timedelta(hours=9)) def __init__(self, phase_dir_name: str, *, auto_push: bool = False): self._root = str(ROOT) self._phases_dir = ROOT / "phases" self._phase_dir = self._phases_dir / phase_dir_name self._phase_dir_name = phase_dir_name self._top_index_file = self._phases_dir / "index.json" self._auto_push = auto_push self._git_enabled: Optional[bool] = None if not self._phase_dir.is_dir(): print(f"ERROR: {self._phase_dir} not found") sys.exit(1) self._index_file = self._phase_dir / "index.json" if not self._index_file.exists(): print(f"ERROR: {self._index_file} not found") sys.exit(1) idx = self._read_json(self._index_file) self._project = idx.get("project", "project") self._phase_name = idx.get("phase", phase_dir_name) self._total = len(idx["steps"]) def run(self): self._print_header() self._check_blockers() self._checkout_branch() guardrails = self._load_guardrails() self._ensure_created_at() self._execute_all_steps(guardrails) self._finalize() # --- timestamps --- def _stamp(self) -> str: return datetime.now(self.TZ).strftime("%Y-%m-%dT%H:%M:%S%z") # --- JSON I/O --- @staticmethod def _read_json(p: Path) -> dict: return json.loads(p.read_text(encoding="utf-8")) @staticmethod def _write_json(p: Path, data: dict): p.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8") # --- git --- def _run_git(self, *args) -> subprocess.CompletedProcess: cmd = ["git"] + list(args) return subprocess.run(cmd, cwd=self._root, capture_output=True, text=True) def _ensure_git_enabled(self) -> bool: if self._git_enabled is not None: return self._git_enabled r = self._run_git("rev-parse", "--is-inside-work-tree") self._git_enabled = r.returncode == 0 and r.stdout.strip() == "true" if not self._git_enabled: print(" WARN: git 저장소가 아니므로 브랜치/커밋 관리를 건너뜁니다.") return self._git_enabled def _checkout_branch(self): if not self._ensure_git_enabled(): return branch = f"report-{self._phase_name}" r = self._run_git("rev-parse", "--abbrev-ref", "HEAD") if r.returncode != 0: print(f" ERROR: 현재 브랜치를 확인할 수 없습니다.") print(f" {r.stderr.strip()}") sys.exit(1) if r.stdout.strip() == branch: return r = self._run_git("rev-parse", "--verify", branch) r = self._run_git("checkout", branch) if r.returncode == 0 else self._run_git("checkout", "-b", branch) if r.returncode != 0: print(f" ERROR: 브랜치 '{branch}' checkout 실패.") print(f" {r.stderr.strip()}") print(f" Hint: 변경사항을 stash하거나 commit한 후 다시 시도하세요.") sys.exit(1) print(f" Branch: {branch}") def _commit_step(self, step_num: int, step_name: str): if not self._ensure_git_enabled(): return output_rel = f"phases/{self._phase_dir_name}/step{step_num}-output.json" index_rel = f"phases/{self._phase_dir_name}/index.json" self._run_git("add", "-A") self._run_git("reset", "HEAD", "--", output_rel) self._run_git("reset", "HEAD", "--", index_rel) if self._run_git("diff", "--cached", "--quiet").returncode != 0: msg = self.DOCS_MSG.format(phase=self._phase_name, num=step_num, name=step_name) r = self._run_git("commit", "-m", msg) if r.returncode == 0: print(f" Commit: {msg}") else: print(f" WARN: 보고서 산출물 커밋 실패: {r.stderr.strip()}") self._run_git("add", "-A") if self._run_git("diff", "--cached", "--quiet").returncode != 0: msg = self.CHORE_MSG.format(phase=self._phase_name, num=step_num) r = self._run_git("commit", "-m", msg) if r.returncode != 0: print(f" WARN: housekeeping 커밋 실패: {r.stderr.strip()}") # --- top-level index --- def _update_top_index(self, status: str): if not self._top_index_file.exists(): return top = self._read_json(self._top_index_file) ts = self._stamp() for phase in top.get("phases", []): if phase.get("dir") == self._phase_dir_name: phase["status"] = status ts_key = {"completed": "completed_at", "error": "failed_at", "blocked": "blocked_at"}.get(status) if ts_key: phase[ts_key] = ts break self._write_json(self._top_index_file, top) # --- guardrails & context --- def _load_guardrails(self) -> str: sections = [] agents_md = ROOT / "AGENTS.md" if agents_md.exists(): sections.append( f"## 프로젝트 규칙 (AGENTS.md)\n\n{agents_md.read_text(encoding='utf-8')}" ) docs_dir = ROOT / "docs" if docs_dir.is_dir(): for doc in sorted(docs_dir.glob("*.md")): sections.append(f"## {doc.stem}\n\n{doc.read_text(encoding='utf-8')}") return "\n\n---\n\n".join(sections) if sections else "" @staticmethod def _build_step_context(index: dict) -> str: lines = [ f"- Step {s['step']} ({s['name']}): {s['summary']}" for s in index["steps"] if s["status"] == "completed" and s.get("summary") ] if not lines: return "" return "## 이전 Step 산출물\n\n" + "\n".join(lines) + "\n\n" def _build_preamble(self, guardrails: str, step_context: str, prev_error: Optional[str] = None) -> str: retry_section = "" if prev_error: retry_section = ( f"\n## ⚠ 이전 시도 실패 — 아래 에러를 반드시 참고하여 수정하라\n\n" f"{prev_error}\n\n---\n\n" ) return ( f"당신은 {self._project} 보고서 프로젝트의 리서치/작성 에이전트입니다. 아래 step을 수행하세요.\n\n" f"{guardrails}\n\n---\n\n" f"{step_context}{retry_section}" f"## 작업 규칙\n\n" f"1. 이전 step 산출물과 `docs/*.md` 문서를 확인하고 보고서 맥락을 유지하라.\n" f"2. 이 step에 명시된 리서치, 출처 검토, 작성, 피드백 작업만 수행하라. 범위 밖 산출물을 만들지 마라.\n" f"3. 사실 주장, 수치, 최신 동향, 법/정책/시장 정보는 웹 검색으로 확인하고 출처를 `docs/RESEARCH_LOG.md`에 기록하라.\n" f"4. 출처가 부족한 핵심 주장은 본문에 단정하지 말고 `docs/RESEARCH_LOG.md`의 `검증 필요`에 남겨라.\n" f"5. AC(Acceptance Criteria) 검증을 직접 실행하라.\n" f"6. /phases/{self._phase_dir_name}/index.json의 해당 step status를 업데이트하라:\n" f" - AC 통과 → \"completed\" + \"summary\" 필드에 이 step의 산출물을 한 줄로 요약\n" f" - {self.MAX_RETRIES}회 수정 시도 후에도 실패 → \"error\" + \"error_message\" 기록\n" f" - 사용자 개입이 필요한 경우 (보고서 방향성 누락, 유료 자료 접근, 인증, 수동 판단 등) → \"blocked\" + \"blocked_reason\" 기록 후 즉시 중단\n" f"7. 변경사항은 워킹 트리에 남겨라. step 완료 후 커밋은 execute.py가 정리한다.\n\n---\n\n" ) # --- Codex 호출 --- def _invoke_codex(self, step: dict, preamble: str) -> dict: step_num, step_name = step["step"], step["name"] step_file = self._phase_dir / f"step{step_num}.md" if not step_file.exists(): print(f" ERROR: {step_file} not found") sys.exit(1) prompt = preamble + step_file.read_text(encoding="utf-8") last_message_path = self._phase_dir / f"step{step_num}-last-message.txt" result = subprocess.run( ["codex", "exec", "--full-auto", "--json", "-C", self._root, "-o", str(last_message_path)], cwd=self._root, input=prompt, capture_output=True, text=True, timeout=1800, ) if result.returncode != 0: print(f"\n WARN: Codex가 비정상 종료됨 (code {result.returncode})") if result.stderr: print(f" stderr: {result.stderr[:500]}") final_message = None if last_message_path.exists(): final_message = last_message_path.read_text(encoding="utf-8") output = { "step": step_num, "name": step_name, "exitCode": result.returncode, "finalMessage": final_message, "stdout": result.stdout, "stderr": result.stderr, } out_path = self._phase_dir / f"step{step_num}-output.json" with open(out_path, "w", encoding="utf-8") as f: json.dump(output, f, indent=2, ensure_ascii=False) return output # --- 헤더 & 검증 --- def _print_header(self): print(f"\n{'='*60}") print(f" Report Harness Step Executor") print(f" Phase: {self._phase_name} | Steps: {self._total}") if self._auto_push: print(f" Auto-push: enabled") print(f"{'='*60}") def _check_blockers(self): index = self._read_json(self._index_file) for s in reversed(index["steps"]): if s["status"] == "error": print(f"\n ✗ Step {s['step']} ({s['name']}) failed.") print(f" Error: {s.get('error_message', 'unknown')}") print(f" Fix and reset status to 'pending' to retry.") sys.exit(1) if s["status"] == "blocked": print(f"\n ⏸ Step {s['step']} ({s['name']}) blocked.") print(f" Reason: {s.get('blocked_reason', 'unknown')}") print(f" Resolve and reset status to 'pending' to retry.") sys.exit(2) if s["status"] != "pending": break def _ensure_created_at(self): index = self._read_json(self._index_file) if "created_at" not in index: index["created_at"] = self._stamp() self._write_json(self._index_file, index) # --- 실행 루프 --- def _execute_single_step(self, step: dict, guardrails: str) -> bool: """단일 step 실행 (재시도 포함). 완료되면 True, 실패/차단이면 False.""" step_num, step_name = step["step"], step["name"] done = sum(1 for s in self._read_json(self._index_file)["steps"] if s["status"] == "completed") prev_error = None for attempt in range(1, self.MAX_RETRIES + 1): index = self._read_json(self._index_file) step_context = self._build_step_context(index) preamble = self._build_preamble(guardrails, step_context, prev_error) tag = f"Step {step_num}/{self._total - 1} ({done} done): {step_name}" if attempt > 1: tag += f" [retry {attempt}/{self.MAX_RETRIES}]" with progress_indicator(tag) as pi: self._invoke_codex(step, preamble) elapsed = int(pi.elapsed) index = self._read_json(self._index_file) status = next((s.get("status", "pending") for s in index["steps"] if s["step"] == step_num), "pending") ts = self._stamp() if status == "completed": for s in index["steps"]: if s["step"] == step_num: s["completed_at"] = ts self._write_json(self._index_file, index) self._commit_step(step_num, step_name) print(f" ✓ Step {step_num}: {step_name} [{elapsed}s]") return True if status == "blocked": for s in index["steps"]: if s["step"] == step_num: s["blocked_at"] = ts self._write_json(self._index_file, index) reason = next((s.get("blocked_reason", "") for s in index["steps"] if s["step"] == step_num), "") print(f" ⏸ Step {step_num}: {step_name} blocked [{elapsed}s]") print(f" Reason: {reason}") self._update_top_index("blocked") sys.exit(2) err_msg = next( (s.get("error_message", "Step did not update status") for s in index["steps"] if s["step"] == step_num), "Step did not update status", ) if attempt < self.MAX_RETRIES: for s in index["steps"]: if s["step"] == step_num: s["status"] = "pending" s.pop("error_message", None) self._write_json(self._index_file, index) prev_error = err_msg print(f" ↻ Step {step_num}: retry {attempt}/{self.MAX_RETRIES} — {err_msg}") else: for s in index["steps"]: if s["step"] == step_num: s["status"] = "error" s["error_message"] = f"[{self.MAX_RETRIES}회 시도 후 실패] {err_msg}" s["failed_at"] = ts self._write_json(self._index_file, index) self._commit_step(step_num, step_name) print(f" ✗ Step {step_num}: {step_name} failed after {self.MAX_RETRIES} attempts [{elapsed}s]") print(f" Error: {err_msg}") self._update_top_index("error") sys.exit(1) return False # unreachable def _execute_all_steps(self, guardrails: str): while True: index = self._read_json(self._index_file) pending = next((s for s in index["steps"] if s["status"] == "pending"), None) if pending is None: print("\n All report steps completed!") return step_num = pending["step"] for s in index["steps"]: if s["step"] == step_num and "started_at" not in s: s["started_at"] = self._stamp() self._write_json(self._index_file, index) break self._execute_single_step(pending, guardrails) def _finalize(self): index = self._read_json(self._index_file) index["completed_at"] = self._stamp() self._write_json(self._index_file, index) self._update_top_index("completed") if self._ensure_git_enabled(): self._run_git("add", "-A") if self._run_git("diff", "--cached", "--quiet").returncode != 0: msg = f"chore({self._phase_name}): mark phase completed" r = self._run_git("commit", "-m", msg) if r.returncode == 0: print(f" ✓ {msg}") if self._auto_push: if not self._ensure_git_enabled(): print(" WARN: git 저장소가 아니므로 push를 건너뜁니다.") return branch = f"report-{self._phase_name}" r = self._run_git("push", "-u", "origin", branch) if r.returncode != 0: print(f"\n ERROR: git push 실패: {r.stderr.strip()}") sys.exit(1) print(f" ✓ Pushed to origin/{branch}") print(f"\n{'='*60}") print(f" Phase '{self._phase_name}' completed!") print(f"{'='*60}") def main(): parser = argparse.ArgumentParser(description="Harness Step Executor") parser.add_argument("phase_dir", help="Phase directory name (e.g. 0-mvp)") parser.add_argument("--push", action="store_true", help="Push branch after completion") args = parser.parse_args() StepExecutor(args.phase_dir, auto_push=args.push).run() if __name__ == "__main__": main()