initial commit

This commit is contained in:
김경종
2026-06-09 12:27:22 +09:00
commit f186160e44
79 changed files with 6915 additions and 0 deletions
+417
View File
@@ -0,0 +1,417 @@
#!/usr/bin/env python3
"""
Harness Step Executor — phase 내 step을 순차 실행하고 자가 교정한다.
Usage:
python scripts/execute.py <phase-dir> [--push]
"""
import argparse
import contextlib
import json
import os
import subprocess
import sys
import threading
import time
import types
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Optional
ROOT = Path(__file__).resolve().parent.parent
@contextlib.contextmanager
def progress_indicator(label: str):
"""터미널 진행 표시기. with 문으로 사용하며 .elapsed 로 경과 시간을 읽는다."""
frames = "◐◓◑◒"
stop = threading.Event()
t0 = time.monotonic()
def _animate():
idx = 0
while not stop.wait(0.12):
sec = int(time.monotonic() - t0)
sys.stderr.write(f"\r{frames[idx % len(frames)]} {label} [{sec}s]")
sys.stderr.flush()
idx += 1
sys.stderr.write("\r" + " " * (len(label) + 20) + "\r")
sys.stderr.flush()
th = threading.Thread(target=_animate, daemon=True)
th.start()
info = types.SimpleNamespace(elapsed=0.0)
try:
yield info
finally:
stop.set()
th.join()
info.elapsed = time.monotonic() - t0
class StepExecutor:
"""Phase 디렉토리 안의 step들을 순차 실행하는 하네스."""
MAX_RETRIES = 3
FEAT_MSG = "feat({phase}): step {num}{name}"
CHORE_MSG = "chore({phase}): step {num} output"
TZ = timezone(timedelta(hours=9))
def __init__(self, phase_dir_name: str, *, auto_push: bool = False):
self._root = str(ROOT)
self._phases_dir = ROOT / "phases"
self._phase_dir = self._phases_dir / phase_dir_name
self._phase_dir_name = phase_dir_name
self._top_index_file = self._phases_dir / "index.json"
self._auto_push = auto_push
if not self._phase_dir.is_dir():
print(f"ERROR: {self._phase_dir} not found")
sys.exit(1)
self._index_file = self._phase_dir / "index.json"
if not self._index_file.exists():
print(f"ERROR: {self._index_file} not found")
sys.exit(1)
idx = self._read_json(self._index_file)
self._project = idx.get("project", "project")
self._phase_name = idx.get("phase", phase_dir_name)
self._total = len(idx["steps"])
def run(self):
self._print_header()
self._check_blockers()
self._checkout_branch()
guardrails = self._load_guardrails()
self._ensure_created_at()
self._execute_all_steps(guardrails)
self._finalize()
# --- timestamps ---
def _stamp(self) -> str:
return datetime.now(self.TZ).strftime("%Y-%m-%dT%H:%M:%S%z")
# --- JSON I/O ---
@staticmethod
def _read_json(p: Path) -> dict:
return json.loads(p.read_text(encoding="utf-8"))
@staticmethod
def _write_json(p: Path, data: dict):
p.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")
# --- git ---
def _run_git(self, *args) -> subprocess.CompletedProcess:
cmd = ["git"] + list(args)
return subprocess.run(cmd, cwd=self._root, capture_output=True, text=True)
def _checkout_branch(self):
branch = f"feat-{self._phase_name}"
r = self._run_git("rev-parse", "--abbrev-ref", "HEAD")
if r.returncode != 0:
print(f" ERROR: git을 사용할 수 없거나 git repo가 아닙니다.")
print(f" {r.stderr.strip()}")
sys.exit(1)
if r.stdout.strip() == branch:
return
r = self._run_git("rev-parse", "--verify", branch)
r = self._run_git("checkout", branch) if r.returncode == 0 else self._run_git("checkout", "-b", branch)
if r.returncode != 0:
print(f" ERROR: 브랜치 '{branch}' checkout 실패.")
print(f" {r.stderr.strip()}")
print(f" Hint: 변경사항을 stash하거나 commit한 후 다시 시도하세요.")
sys.exit(1)
print(f" Branch: {branch}")
def _commit_step(self, step_num: int, step_name: str):
output_rel = f"phases/{self._phase_dir_name}/step{step_num}-output.json"
index_rel = f"phases/{self._phase_dir_name}/index.json"
self._run_git("add", "-A")
self._run_git("reset", "HEAD", "--", output_rel)
self._run_git("reset", "HEAD", "--", index_rel)
if self._run_git("diff", "--cached", "--quiet").returncode != 0:
msg = self.FEAT_MSG.format(phase=self._phase_name, num=step_num, name=step_name)
r = self._run_git("commit", "-m", msg)
if r.returncode == 0:
print(f" Commit: {msg}")
else:
print(f" WARN: 코드 커밋 실패: {r.stderr.strip()}")
self._run_git("add", "-A")
if self._run_git("diff", "--cached", "--quiet").returncode != 0:
msg = self.CHORE_MSG.format(phase=self._phase_name, num=step_num)
r = self._run_git("commit", "-m", msg)
if r.returncode != 0:
print(f" WARN: housekeeping 커밋 실패: {r.stderr.strip()}")
# --- top-level index ---
def _update_top_index(self, status: str):
if not self._top_index_file.exists():
return
top = self._read_json(self._top_index_file)
ts = self._stamp()
for phase in top.get("phases", []):
if phase.get("dir") == self._phase_dir_name:
phase["status"] = status
ts_key = {"completed": "completed_at", "error": "failed_at", "blocked": "blocked_at"}.get(status)
if ts_key:
phase[ts_key] = ts
break
self._write_json(self._top_index_file, top)
# --- guardrails & context ---
def _load_guardrails(self) -> str:
sections = []
agents_md = ROOT / "AGENTS.md"
if agents_md.exists():
sections.append(f"## 프로젝트 규칙 (AGENTS.md)\n\n{agents_md.read_text(encoding='utf-8')}")
docs_dir = ROOT / "docs"
if docs_dir.is_dir():
for doc in sorted(docs_dir.glob("*.md")):
sections.append(f"## {doc.stem}\n\n{doc.read_text(encoding='utf-8')}")
return "\n\n---\n\n".join(sections) if sections else ""
@staticmethod
def _build_step_context(index: dict) -> str:
lines = [
f"- Step {s['step']} ({s['name']}): {s['summary']}"
for s in index["steps"]
if s["status"] == "completed" and s.get("summary")
]
if not lines:
return ""
return "## 이전 Step 산출물\n\n" + "\n".join(lines) + "\n\n"
def _build_preamble(self, guardrails: str, step_context: str,
prev_error: Optional[str] = None) -> str:
commit_example = self.FEAT_MSG.format(
phase=self._phase_name, num="N", name="<step-name>"
)
retry_section = ""
if prev_error:
retry_section = (
f"\n## ⚠ 이전 시도 실패 — 아래 에러를 반드시 참고하여 수정하라\n\n"
f"{prev_error}\n\n---\n\n"
)
return (
f"당신은 {self._project} 프로젝트의 개발자입니다. 아래 step을 수행하세요.\n\n"
f"{guardrails}\n\n---\n\n"
f"{step_context}{retry_section}"
f"## 작업 규칙\n\n"
f"1. 이전 step에서 작성된 코드를 확인하고 일관성을 유지하라.\n"
f"2. 이 step에 명시된 작업만 수행하라. 추가 기능이나 파일을 만들지 마라.\n"
f"3. 기존 테스트를 깨뜨리지 마라.\n"
f"4. AC(Acceptance Criteria) 검증을 직접 실행하라.\n"
f"5. /phases/{self._phase_dir_name}/index.json의 해당 step status를 업데이트하라:\n"
f" - AC 통과 → \"completed\" + \"summary\" 필드에 이 step의 산출물을 한 줄로 요약\n"
f" - {self.MAX_RETRIES}회 수정 시도 후에도 실패 → \"error\" + \"error_message\" 기록\n"
f" - 사용자 개입이 필요한 경우 (API 키, 인증, 수동 설정 등) → \"blocked\" + \"blocked_reason\" 기록 후 즉시 중단\n"
f"6. 모든 변경사항을 커밋하라:\n"
f" {commit_example}\n\n---\n\n"
)
# --- Codex 호출 ---
def _invoke_codex(self, step: dict, preamble: str) -> dict:
step_num, step_name = step["step"], step["name"]
step_file = self._phase_dir / f"step{step_num}.md"
if not step_file.exists():
print(f" ERROR: {step_file} not found")
sys.exit(1)
prompt = preamble + step_file.read_text(encoding="utf-8")
result = subprocess.run(
["codex", "exec", "--dangerously-bypass-approvals-and-sandbox", "--json", prompt],
cwd=self._root, capture_output=True, text=True, timeout=1800,
)
if result.returncode != 0:
print(f"\n WARN: Codex가 비정상 종료됨 (code {result.returncode})")
if result.stderr:
print(f" stderr: {result.stderr[:500]}")
output = {
"step": step_num, "name": step_name,
"exitCode": result.returncode,
"stdout": result.stdout, "stderr": result.stderr,
}
out_path = self._phase_dir / f"step{step_num}-output.json"
with open(out_path, "w", encoding="utf-8") as f:
json.dump(output, f, indent=2, ensure_ascii=False)
return output
# --- 헤더 & 검증 ---
def _print_header(self):
print(f"\n{'='*60}")
print(f" Harness Step Executor")
print(f" Phase: {self._phase_name} | Steps: {self._total}")
if self._auto_push:
print(f" Auto-push: enabled")
print(f"{'='*60}")
def _check_blockers(self):
index = self._read_json(self._index_file)
for s in reversed(index["steps"]):
if s["status"] == "error":
print(f"\n ✗ Step {s['step']} ({s['name']}) failed.")
print(f" Error: {s.get('error_message', 'unknown')}")
print(f" Fix and reset status to 'pending' to retry.")
sys.exit(1)
if s["status"] == "blocked":
print(f"\n ⏸ Step {s['step']} ({s['name']}) blocked.")
print(f" Reason: {s.get('blocked_reason', 'unknown')}")
print(f" Resolve and reset status to 'pending' to retry.")
sys.exit(2)
if s["status"] != "pending":
break
def _ensure_created_at(self):
index = self._read_json(self._index_file)
if "created_at" not in index:
index["created_at"] = self._stamp()
self._write_json(self._index_file, index)
# --- 실행 루프 ---
def _execute_single_step(self, step: dict, guardrails: str) -> bool:
"""단일 step 실행 (재시도 포함). 완료되면 True, 실패/차단이면 False."""
step_num, step_name = step["step"], step["name"]
done = sum(1 for s in self._read_json(self._index_file)["steps"] if s["status"] == "completed")
prev_error = None
for attempt in range(1, self.MAX_RETRIES + 1):
index = self._read_json(self._index_file)
step_context = self._build_step_context(index)
preamble = self._build_preamble(guardrails, step_context, prev_error)
tag = f"Step {step_num}/{self._total - 1} ({done} done): {step_name}"
if attempt > 1:
tag += f" [retry {attempt}/{self.MAX_RETRIES}]"
with progress_indicator(tag) as pi:
self._invoke_codex(step, preamble)
elapsed = int(pi.elapsed)
index = self._read_json(self._index_file)
status = next((s.get("status", "pending") for s in index["steps"] if s["step"] == step_num), "pending")
ts = self._stamp()
if status == "completed":
for s in index["steps"]:
if s["step"] == step_num:
s["completed_at"] = ts
self._write_json(self._index_file, index)
self._commit_step(step_num, step_name)
print(f" ✓ Step {step_num}: {step_name} [{elapsed}s]")
return True
if status == "blocked":
for s in index["steps"]:
if s["step"] == step_num:
s["blocked_at"] = ts
self._write_json(self._index_file, index)
reason = next((s.get("blocked_reason", "") for s in index["steps"] if s["step"] == step_num), "")
print(f" ⏸ Step {step_num}: {step_name} blocked [{elapsed}s]")
print(f" Reason: {reason}")
self._update_top_index("blocked")
sys.exit(2)
err_msg = next(
(s.get("error_message", "Step did not update status") for s in index["steps"] if s["step"] == step_num),
"Step did not update status",
)
if attempt < self.MAX_RETRIES:
for s in index["steps"]:
if s["step"] == step_num:
s["status"] = "pending"
s.pop("error_message", None)
self._write_json(self._index_file, index)
prev_error = err_msg
print(f" ↻ Step {step_num}: retry {attempt}/{self.MAX_RETRIES}{err_msg}")
else:
for s in index["steps"]:
if s["step"] == step_num:
s["status"] = "error"
s["error_message"] = f"[{self.MAX_RETRIES}회 시도 후 실패] {err_msg}"
s["failed_at"] = ts
self._write_json(self._index_file, index)
self._commit_step(step_num, step_name)
print(f" ✗ Step {step_num}: {step_name} failed after {self.MAX_RETRIES} attempts [{elapsed}s]")
print(f" Error: {err_msg}")
self._update_top_index("error")
sys.exit(1)
return False # unreachable
def _execute_all_steps(self, guardrails: str):
while True:
index = self._read_json(self._index_file)
pending = next((s for s in index["steps"] if s["status"] == "pending"), None)
if pending is None:
print("\n All steps completed!")
return
step_num = pending["step"]
for s in index["steps"]:
if s["step"] == step_num and "started_at" not in s:
s["started_at"] = self._stamp()
self._write_json(self._index_file, index)
break
self._execute_single_step(pending, guardrails)
def _finalize(self):
index = self._read_json(self._index_file)
index["completed_at"] = self._stamp()
self._write_json(self._index_file, index)
self._update_top_index("completed")
self._run_git("add", "-A")
if self._run_git("diff", "--cached", "--quiet").returncode != 0:
msg = f"chore({self._phase_name}): mark phase completed"
r = self._run_git("commit", "-m", msg)
if r.returncode == 0:
print(f"{msg}")
if self._auto_push:
branch = f"feat-{self._phase_name}"
r = self._run_git("push", "-u", "origin", branch)
if r.returncode != 0:
print(f"\n ERROR: git push 실패: {r.stderr.strip()}")
sys.exit(1)
print(f" ✓ Pushed to origin/{branch}")
print(f"\n{'='*60}")
print(f" Phase '{self._phase_name}' completed!")
print(f"{'='*60}")
def main():
parser = argparse.ArgumentParser(description="Harness Step Executor")
parser.add_argument("phase_dir", help="Phase directory name (e.g. 0-mvp)")
parser.add_argument("--push", action="store_true", help="Push branch after completion")
args = parser.parse_args()
StepExecutor(args.phase_dir, auto_push=args.push).run()
if __name__ == "__main__":
main()
+81
View File
@@ -0,0 +1,81 @@
#!/usr/bin/env python3
"""Intel oneAPI Fortran discovery helpers for no-Abaqus validation."""
from __future__ import annotations
import os
import shutil
import subprocess
from pathlib import Path
from typing import Iterable
COMPILER_CANDIDATES = ("ifx", "ifort")
ONEAPI_VARS_CANDIDATES = [
Path(r"C:\Program Files (x86)\Intel\oneAPI\compiler\latest\env\vars.bat"),
Path(r"C:\Program Files\Intel\oneAPI\compiler\latest\env\vars.bat"),
Path(r"C:\Program Files (x86)\Intel\oneAPI\setvars.bat"),
Path(r"C:\Program Files\Intel\oneAPI\setvars.bat"),
]
class FortranToolchain:
def __init__(self, *, name: str, executable: str, env_script: Path | None):
self.name = name
self.executable = executable
self.env_script = env_script
def compiler_candidates(preference: str | None = None) -> list[str]:
preference = (preference or os.environ.get("HARNESS_FORTRAN_COMPILER") or "auto").lower()
if preference == "auto":
return list(COMPILER_CANDIDATES)
if preference not in COMPILER_CANDIDATES:
raise ValueError(f"Unsupported HARNESS_FORTRAN_COMPILER: {preference}")
return [preference]
def discover_oneapi_env_script(env: dict[str, str] | None = None) -> Path | None:
env = env or os.environ
configured = env.get("HARNESS_ONEAPI_VARS_BAT")
if configured:
path = Path(configured)
return path if path.exists() else None
for path in ONEAPI_VARS_CANDIDATES:
if path.exists():
return path
return None
def resolve_toolchain(env: dict[str, str] | None = None) -> FortranToolchain | None:
env = env or os.environ
candidates = compiler_candidates(env.get("HARNESS_FORTRAN_COMPILER"))
for compiler in candidates:
resolved = shutil.which(compiler)
if resolved:
return FortranToolchain(name=compiler, executable=resolved, env_script=None)
env_script = discover_oneapi_env_script(env)
if env_script is None:
return None
compiler = candidates[0]
return FortranToolchain(name=compiler, executable=compiler, env_script=env_script)
def shell_join(args: Iterable[str | Path]) -> str:
return subprocess.list2cmdline([str(arg) for arg in args])
def quote_path(path: str | Path) -> str:
return shell_join([path])
def wrap_command(toolchain: FortranToolchain, args: list[str | Path]) -> str:
command = shell_join(args)
if toolchain.env_script is None:
return command
env_script = quote_path(toolchain.env_script)
return f'cmd /d /s /c "call {env_script} intel64 >nul && {command}"'
@@ -0,0 +1,310 @@
import unittest
from pathlib import Path
try:
import tomllib
except ModuleNotFoundError: # pragma: no cover
import tomli as tomllib
ROOT = Path(__file__).resolve().parents[1]
AGENTS_ROOT = ROOT / ".codex" / "agents"
SKILLS_ROOT = ROOT / ".codex" / "skills"
DESIGN_DOC = ROOT / "docs" / "ABAQUS_SUBROUTINE_AGENT_DESIGN.md"
COMMON_SKILL_SECTIONS = (
"## Inputs",
"## Workflow",
"## Output Contract",
"## Boundaries",
"## Quality Gate",
"## Handoff",
)
SKILLS = {
"abaqus-subroutine-requirements": (
"Subroutine requirements",
"ABAQUS-USUB-REQ-<FEATURE>-###",
"Requirement Verification Matrix",
"UMAT | VUMAT | UEL",
"Do not implement Fortran code.",
),
"abaqus-subroutine-research": (
"Research evidence",
"official Abaqus documentation",
"books, papers, and benchmark sources",
"Separate verified facts from inference.",
"Abaqus User Subroutines Reference Guide",
),
"abaqus-subroutine-formulation": (
"Finite element formulation",
"stress update",
"consistent tangent",
"state variables",
"Do not design Fortran source layout.",
),
"abaqus-subroutine-numerical-review": (
"Numerical review",
"finite-difference tangent check",
"state variable update",
"stability risks",
"pass-for-interface-definition",
),
"abaqus-subroutine-interface": (
"Abaqus ABI Contract",
"UMAT",
"VUMAT",
"UEL",
"DDSDDE",
"STATEV",
),
"abaqus-subroutine-test-models": (
"TDD test model",
"no-Abaqus",
"tests/fortran/manifest.json",
"references/<feature-id>/<model-id>/",
"source hash",
"msg/dat/log tail",
),
"abaqus-fortran-tdd": (
"Fortran TDD",
"RED -> GREEN -> VERIFY",
"python scripts/validate_fortran.py",
"Intel oneAPI",
"ifx",
"ifort",
),
"abaqus-subroutine-validation": (
"Subroutine validation",
"HARNESS_ABAQUS_VALIDATION=run",
"ready-for-comparison",
"source hash",
"msg/dat/log",
"Do not change tolerance policies.",
),
"abaqus-subroutine-physics-sanity": (
"Physics sanity",
"global equilibrium",
"reaction consistency",
"stress/strain",
"state variable",
"energy/residual",
),
"abaqus-subroutine-readiness": (
"Readiness audit",
"Gate Evidence Inventory",
"Known Limitations",
"Validation Evidence",
"Do not publish, deploy, package, tag, commit, or externally release anything unless the user explicitly asks.",
),
}
AGENT_SKILL_REFERENCES = {
"coordinator-agent.toml": (
"abaqus-subroutine-requirements",
"abaqus-subroutine-test-models",
"abaqus-subroutine-readiness",
),
"requirement-agent.toml": ("abaqus-subroutine-requirements",),
"research-agent.toml": ("abaqus-subroutine-research", "fem-theory-query"),
"formulation-agent.toml": ("abaqus-subroutine-formulation", "fem-theory-query"),
"numerical-review-agent.toml": (
"abaqus-subroutine-numerical-review",
"fem-theory-query",
),
"io-definition-agent.toml": (
"abaqus-subroutine-interface",
"fem-theory-query",
),
"reference-model-agent.toml": (
"abaqus-subroutine-test-models",
"fem-theory-query",
),
"implementation-planning-agent.toml": (
"abaqus-subroutine-formulation",
"abaqus-subroutine-test-models",
"abaqus-fortran-tdd",
"fem-theory-query",
),
"implementation-agent.toml": ("abaqus-fortran-tdd",),
"build-test-executor-agent.toml": (
"abaqus-fortran-tdd",
"abaqus-subroutine-validation",
),
"correction-agent.toml": ("abaqus-fortran-tdd",),
"reference-verification-agent.toml": (
"abaqus-subroutine-validation",
"abaqus-subroutine-interface",
),
"physics-evaluation-agent.toml": (
"abaqus-subroutine-physics-sanity",
"fem-theory-query",
),
"release-agent.toml": ("abaqus-subroutine-readiness",),
}
AGENT_REQUIRED_TERMS = {
"coordinator-agent.toml": (
"1. Subroutine requirements analysis",
"2. Research evidence",
"3. Finite element formulation",
"4. Abaqus subroutine interface",
"5. TDD test models",
"6. Fortran implementation",
"7. Subroutine validation",
),
"requirement-agent.toml": ("Subroutine requirements analysis", "ABAQUS-USUB-REQ"),
"research-agent.toml": ("books, papers, official Abaqus manuals", "source reliability"),
"formulation-agent.toml": ("stress update", "consistent tangent", "state variables"),
"numerical-review-agent.toml": (
"finite-difference tangent check",
"algorithmic consistency",
),
"io-definition-agent.toml": ("Abaqus ABI Contract", "STRESS", "DDSDDE", "STATEV"),
"reference-model-agent.toml": (
"tests/fortran/manifest.json",
"references/<feature-id>/<model-id>/",
),
"implementation-planning-agent.toml": (
"Fortran source",
"no-Abaqus driver",
"RED -> GREEN -> VERIFY",
),
"implementation-agent.toml": ("Fortran source", "Intel oneAPI", "RED -> GREEN -> VERIFY"),
"build-test-executor-agent.toml": (
"python scripts/validate_fortran.py",
"HARNESS_ABAQUS_VALIDATION=run",
),
"correction-agent.toml": ("Fortran compile", "minimal correction"),
"reference-verification-agent.toml": (
"metadata.json",
"source hash",
"Abaqus version",
"compiler version",
),
"physics-evaluation-agent.toml": ("global equilibrium", "stress/strain", "state variable"),
"release-agent.toml": ("Gate Evidence Inventory", "Known Limitations"),
}
def parse_frontmatter(text):
lines = text.splitlines()
if not lines or lines[0] != "---":
raise AssertionError("SKILL.md must start with YAML frontmatter")
fields = {}
for line in lines[1:]:
if line == "---":
return fields
key, sep, value = line.partition(":")
if not sep:
raise AssertionError(f"Invalid frontmatter line: {line}")
fields[key.strip()] = value.strip()
raise AssertionError("SKILL.md frontmatter must be closed")
class AbaqusSubroutineCodexConfigTests(unittest.TestCase):
def test_abaqus_subroutine_skill_files_exist_with_metadata(self):
for skill_name, body_terms in SKILLS.items():
with self.subTest(skill=skill_name):
skill_path = SKILLS_ROOT / skill_name / "SKILL.md"
self.assertTrue(skill_path.exists(), f"{skill_name} SKILL.md is missing")
text = skill_path.read_text(encoding="utf-8")
fields = parse_frontmatter(text)
self.assertEqual(set(fields), {"name", "description"})
self.assertEqual(fields["name"], skill_name)
self.assertIn("Use when", fields["description"])
self.assertIn("Abaqus", fields["description"])
self.assertIn("User Subroutine", fields["description"])
for section in COMMON_SKILL_SECTIONS:
self.assertIn(section, text)
self.assertIn("AGENTS.md", text)
self.assertIn("docs/ABAQUS_SUBROUTINE_AGENT_DESIGN.md", text)
for term in body_terms:
self.assertIn(term, text)
def test_abaqus_subroutine_skills_have_ui_metadata(self):
for skill_name in SKILLS:
with self.subTest(skill=skill_name):
metadata = SKILLS_ROOT / skill_name / "agents" / "openai.yaml"
self.assertTrue(metadata.exists(), f"{skill_name} openai.yaml is missing")
text = metadata.read_text(encoding="utf-8")
self.assertIn("interface:", text)
self.assertIn("display_name:", text)
self.assertIn("short_description:", text)
self.assertIn("default_prompt:", text)
self.assertIn(f"${skill_name}", text)
def test_deprecated_fesa_skill_directories_are_removed(self):
deprecated = sorted(p.name for p in SKILLS_ROOT.iterdir() if p.name.startswith("fesa-"))
self.assertEqual([], deprecated)
def test_agents_reference_abaqus_subroutine_skills(self):
for agent_file, skill_names in AGENT_SKILL_REFERENCES.items():
with self.subTest(agent=agent_file):
text = (AGENTS_ROOT / agent_file).read_text(encoding="utf-8")
data = tomllib.loads(text)
self.assertEqual(data["name"], agent_file.removesuffix(".toml"))
self.assertEqual(data["model_reasoning_effort"], "extra high")
self.assertIn("Abaqus User Subroutine", data["description"])
instructions = data["developer_instructions"]
self.assertIn("Skill references:", instructions)
self.assertIn("Abaqus User Subroutine", instructions)
for skill_name in skill_names:
self.assertIn(f"${skill_name}", instructions)
for term in AGENT_REQUIRED_TERMS[agent_file]:
self.assertIn(term, instructions)
def test_agent_and_skill_text_no_longer_targets_fesa_cpp_solver_work(self):
checked_paths = list(AGENTS_ROOT.glob("*.toml"))
checked_paths += [SKILLS_ROOT / name / "SKILL.md" for name in SKILLS]
checked_paths += list((SKILLS_ROOT / "harness-workflow").glob("SKILL.md"))
checked_paths += list((SKILLS_ROOT / "harness-review").glob("SKILL.md"))
forbidden_terms = (
"FESA solver",
"FESA FEM",
"FESA C++",
"C++17/MSVC",
"C++/MSVC",
"CMake/CTest",
)
for path in checked_paths:
with self.subTest(path=path):
text = path.read_text(encoding="utf-8")
for forbidden in forbidden_terms:
self.assertNotIn(forbidden, text)
def test_harness_skills_target_fortran_subroutine_workflow(self):
for skill_name in ("harness-workflow", "harness-review"):
with self.subTest(skill=skill_name):
text = (SKILLS_ROOT / skill_name / "SKILL.md").read_text(encoding="utf-8")
for term in (
"Abaqus User Subroutine",
"Fortran",
"python scripts/validate_fortran.py",
"python scripts/validate_reference_artifacts.py",
"HARNESS_ABAQUS_VALIDATION=run",
):
self.assertIn(term, text)
def test_design_doc_captures_user_subroutine_process(self):
text = DESIGN_DOC.read_text(encoding="utf-8")
for term in (
"Abaqus User Subroutine development process",
"1. Subroutine requirements analysis",
"2. Books, papers, and research evidence",
"3. Finite element formulation for implementation",
"4. Subroutine input/output parameter definition",
"5. TDD test model design",
"6. Fortran code implementation",
"7. Subroutine validation",
):
self.assertIn(term, text)
if __name__ == "__main__":
unittest.main()
+76
View File
@@ -0,0 +1,76 @@
import importlib.util
import os
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch
def load_fortran_toolchain():
module_path = Path(__file__).resolve().parent / "fortran_toolchain.py"
spec = importlib.util.spec_from_file_location("fortran_toolchain", module_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
class FortranToolchainTests(unittest.TestCase):
def test_auto_prefers_ifx_over_ifort_when_both_are_on_path(self):
fortran_toolchain = load_fortran_toolchain()
def fake_which(name):
return f"C:\\oneapi\\{name}.exe" if name in {"ifx", "ifort"} else None
with patch.object(fortran_toolchain.shutil, "which", side_effect=fake_which):
with patch.dict(os.environ, {}, clear=True):
toolchain = fortran_toolchain.resolve_toolchain()
self.assertEqual(toolchain.name, "ifx")
self.assertEqual(toolchain.executable, "C:\\oneapi\\ifx.exe")
self.assertIsNone(toolchain.env_script)
def test_auto_uses_oneapi_env_script_when_compiler_is_not_on_path(self):
fortran_toolchain = load_fortran_toolchain()
with tempfile.TemporaryDirectory() as tmp:
vars_bat = Path(tmp) / "setvars.bat"
vars_bat.write_text("@echo off\n", encoding="utf-8")
with patch.object(fortran_toolchain.shutil, "which", return_value=None):
with patch.object(fortran_toolchain, "ONEAPI_VARS_CANDIDATES", [vars_bat]):
with patch.dict(os.environ, {}, clear=True):
toolchain = fortran_toolchain.resolve_toolchain()
self.assertEqual(toolchain.name, "ifx")
self.assertEqual(toolchain.executable, "ifx")
self.assertEqual(toolchain.env_script, vars_bat)
def test_explicit_compiler_preference_is_honored(self):
fortran_toolchain = load_fortran_toolchain()
def fake_which(name):
return f"C:\\oneapi\\{name}.exe"
with patch.object(fortran_toolchain.shutil, "which", side_effect=fake_which):
with patch.dict(os.environ, {"HARNESS_FORTRAN_COMPILER": "ifort"}, clear=True):
toolchain = fortran_toolchain.resolve_toolchain()
self.assertEqual(toolchain.name, "ifort")
self.assertEqual(toolchain.executable, "C:\\oneapi\\ifort.exe")
def test_wrap_command_calls_oneapi_env_before_compiler(self):
fortran_toolchain = load_fortran_toolchain()
toolchain = fortran_toolchain.FortranToolchain(
name="ifx",
executable="ifx",
env_script=Path(r"C:\Program Files (x86)\Intel\oneAPI\setvars.bat"),
)
command = fortran_toolchain.wrap_command(toolchain, ["ifx", "/nologo", "test.f90"])
self.assertIn("cmd /d /s /c", command)
self.assertIn('call "C:\\Program Files (x86)\\Intel\\oneAPI\\setvars.bat" intel64', command)
self.assertIn("ifx /nologo test.f90", command)
if __name__ == "__main__":
unittest.main()
+41
View File
@@ -0,0 +1,41 @@
import importlib.util
import sys
import tempfile
import unittest
from pathlib import Path
def load_pre_commit_checks():
module_path = Path(__file__).resolve().parent.parent / ".codex" / "hooks" / "pre_commit_checks.py"
spec = importlib.util.spec_from_file_location("pre_commit_checks", module_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
class PreCommitChecksTests(unittest.TestCase):
def test_git_commit_runs_python_self_tests_and_workspace_validation(self):
pre_commit_checks = load_pre_commit_checks()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
commands = pre_commit_checks._build_pre_commit_commands(root)
self.assertEqual(
commands,
[
[sys.executable, "-m", "unittest", "discover", "-s", "scripts", "-p", "test_*.py"],
[sys.executable, "scripts/validate_workspace.py"],
],
)
self.assertFalse(any("npm" in part.lower() for command in commands for part in command))
def test_only_git_commit_commands_trigger_checks(self):
pre_commit_checks = load_pre_commit_checks()
self.assertTrue(pre_commit_checks._is_git_commit('git commit -m "change"'))
self.assertTrue(pre_commit_checks._is_git_commit('git -c core.editor=true commit -m "change"'))
self.assertFalse(pre_commit_checks._is_git_commit("git status --short"))
self.assertFalse(pre_commit_checks._is_git_commit("echo git commit"))
if __name__ == "__main__":
unittest.main()
+101
View File
@@ -0,0 +1,101 @@
import importlib.util
import tempfile
import unittest
from pathlib import Path
def load_tdd_guard():
module_path = Path(__file__).resolve().parent.parent / ".codex" / "hooks" / "tdd-guard.py"
spec = importlib.util.spec_from_file_location("tdd_guard", module_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
class CppTddGuardTests(unittest.TestCase):
def test_cpp_production_file_without_related_test_is_blocked(self):
tdd_guard = load_tdd_guard()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
source = root / "include" / "fesa" / "Core" / "DofManager.hpp"
source.parent.mkdir(parents=True)
source.write_text("#pragma once\n", encoding="utf-8")
self.assertEqual(tdd_guard._guarded_paths([str(source)], root, root), ["DofManager"])
def test_cpp_production_file_with_module_test_is_allowed(self):
tdd_guard = load_tdd_guard()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
source = root / "include" / "fesa" / "Core" / "DofManager.hpp"
source.parent.mkdir(parents=True)
source.write_text("#pragma once\n", encoding="utf-8")
tests_dir = root / "tests"
tests_dir.mkdir()
(tests_dir / "test_core_module_includes.cpp").write_text("int main() { return 0; }\n", encoding="utf-8")
self.assertEqual(tdd_guard._guarded_paths([str(source)], root, root), [])
def test_cpp_production_file_with_basename_test_in_same_patch_is_allowed(self):
tdd_guard = load_tdd_guard()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
source = root / "src" / "Math" / "DenseMatrix.cpp"
source.parent.mkdir(parents=True)
source.write_text("void f() {}\n", encoding="utf-8")
test_path = root / "tests" / "test_dense_matrix.cpp"
self.assertEqual(tdd_guard._guarded_paths([str(source), str(test_path)], root, root), [])
def test_fortran_subroutine_without_related_test_is_blocked(self):
tdd_guard = load_tdd_guard()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
source = root / "src" / "fortran" / "abaqus" / "UMAT.for"
source.parent.mkdir(parents=True)
source.write_text(" subroutine umat()\n end\n", encoding="utf-8")
self.assertEqual(tdd_guard._guarded_paths([str(source)], root, root), ["UMAT"])
def test_fortran_subroutine_with_related_driver_test_is_allowed(self):
tdd_guard = load_tdd_guard()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
source = root / "src" / "fortran" / "abaqus" / "UMAT.for"
source.parent.mkdir(parents=True)
source.write_text(" subroutine umat()\n end\n", encoding="utf-8")
test_path = root / "tests" / "fortran" / "test_umat_linear_elastic.f90"
self.assertEqual(tdd_guard._guarded_paths([str(source), str(test_path)], root, root), [])
def test_reference_artifacts_are_exempt(self):
tdd_guard = load_tdd_guard()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
self.assertEqual(
tdd_guard._guarded_paths(
[
"references/umat/single-element/model.inp",
"references/umat/single-element/stresses.csv",
"references/umat/single-element/job.msg.tail.txt",
"references/umat/single-element/job.dat.tail.txt",
"references/umat/single-element/job.log.tail.txt",
],
root,
root,
),
[],
)
def test_cmake_and_docs_are_exempt(self):
tdd_guard = load_tdd_guard()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
self.assertEqual(
tdd_guard._guarded_paths(["CMakeLists.txt", "docs/ARCHITECTURE.md", "cmake/toolchain.cmake"], root, root),
[],
)
if __name__ == "__main__":
unittest.main()
+86
View File
@@ -0,0 +1,86 @@
import importlib.util
import json
import os
import tempfile
import unittest
from pathlib import Path
def load_validate_fortran():
module_path = Path(__file__).resolve().parent / "validate_fortran.py"
spec = importlib.util.spec_from_file_location("validate_fortran", module_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
class ValidateFortranTests(unittest.TestCase):
def test_auto_mode_has_no_commands_when_manifest_is_missing(self):
validate_fortran = load_validate_fortran()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
with unittest.mock.patch.dict(os.environ, {}, clear=True):
commands = validate_fortran.discover_commands(root)
self.assertEqual(commands, [])
def test_off_mode_skips_manifest(self):
validate_fortran = load_validate_fortran()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
manifest = root / "tests" / "fortran" / "manifest.json"
manifest.parent.mkdir(parents=True)
manifest.write_text(json.dumps({"tests": []}), encoding="utf-8")
with unittest.mock.patch.dict(os.environ, {"HARNESS_FORTRAN_VALIDATION": "off"}, clear=True):
commands = validate_fortran.discover_commands(root)
self.assertEqual(commands, [])
def test_manifest_requires_available_compiler(self):
validate_fortran = load_validate_fortran()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
manifest = root / "tests" / "fortran" / "manifest.json"
manifest.parent.mkdir(parents=True)
manifest.write_text(json.dumps({"tests": [{"name": "case", "sources": ["a.f90"]}]}), encoding="utf-8")
with unittest.mock.patch.dict(os.environ, {}, clear=True):
with unittest.mock.patch.object(validate_fortran, "resolve_toolchain", return_value=None):
with self.assertRaises(validate_fortran.FortranValidationError):
validate_fortran.discover_commands(root)
def test_manifest_generates_compile_and_run_commands(self):
validate_fortran = load_validate_fortran()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
manifest = root / "tests" / "fortran" / "manifest.json"
manifest.parent.mkdir(parents=True)
manifest.write_text(
json.dumps(
{
"tests": [
{
"name": "umat_linear_elastic_kernel",
"sources": [
"src/fortran/kernels/linear_elastic.f90",
"tests/fortran/test_umat_linear_elastic.f90",
],
}
]
}
),
encoding="utf-8",
)
toolchain = validate_fortran.FortranToolchain(name="ifx", executable="ifx", env_script=None)
with unittest.mock.patch.dict(os.environ, {}, clear=True):
with unittest.mock.patch.object(validate_fortran, "resolve_toolchain", return_value=toolchain):
commands = validate_fortran.discover_commands(root)
self.assertEqual(len(commands), 2)
self.assertIn("ifx /nologo", commands[0])
self.assertIn("src\\fortran\\kernels\\linear_elastic.f90", commands[0])
self.assertIn("/exe:", commands[0])
self.assertTrue(commands[1].endswith("umat_linear_elastic_kernel.exe"))
if __name__ == "__main__":
unittest.main()
@@ -0,0 +1,121 @@
import hashlib
import importlib.util
import json
import tempfile
import unittest
from pathlib import Path
def load_validate_reference_artifacts():
module_path = Path(__file__).resolve().parent / "validate_reference_artifacts.py"
spec = importlib.util.spec_from_file_location("validate_reference_artifacts", module_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
def write_json(path: Path, payload: dict):
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
class ValidateReferenceArtifactsTests(unittest.TestCase):
def test_missing_references_directory_is_valid(self):
validator = load_validate_reference_artifacts()
with tempfile.TemporaryDirectory() as tmp:
self.assertEqual(validator.validate_root(Path(tmp)), [])
def test_draft_metadata_with_minimal_provenance_is_valid(self):
validator = load_validate_reference_artifacts()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
write_json(
root / "references" / "umat" / "single-element" / "metadata.json",
{
"schema_version": "abaqus-user-subroutine-artifact-v1",
"feature_id": "umat",
"model_id": "single-element",
"artifact_status": "draft",
},
)
self.assertEqual(validator.validate_root(root), [])
def test_ready_for_comparison_requires_declared_files(self):
validator = load_validate_reference_artifacts()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
write_json(
root / "references" / "umat" / "single-element" / "metadata.json",
{
"schema_version": "abaqus-user-subroutine-artifact-v1",
"feature_id": "umat",
"model_id": "single-element",
"artifact_status": "ready-for-comparison",
"abaqus": {"version": "2024", "precision": "double", "command": "abaqus job=case user=UMAT.for"},
"compiler": {"vendor": "Intel oneAPI", "name": "ifx", "version": "2024"},
"subroutine": {
"entry_points": ["UMAT"],
"source_files": [{"path": "src/fortran/abaqus/UMAT.for", "language": "Fortran", "sha256": "abc"}],
},
"input_file": "model.inp",
"outputs": {
"tails": {"msg": "job.msg.tail.txt", "dat": "job.dat.tail.txt", "log": "job.log.tail.txt"},
"csv": {"stresses": "stresses.csv"},
},
},
)
errors = validator.validate_root(root)
self.assertTrue(any("missing input_file" in error for error in errors))
self.assertTrue(any("missing output tail" in error for error in errors))
self.assertTrue(any("missing csv output" in error for error in errors))
self.assertTrue(any("missing source file" in error for error in errors))
def test_ready_for_comparison_checks_source_sha256(self):
validator = load_validate_reference_artifacts()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
source = root / "src" / "fortran" / "abaqus" / "UMAT.for"
source.parent.mkdir(parents=True)
source.write_text(" subroutine umat()\n end\n", encoding="utf-8")
source_hash = hashlib.sha256(source.read_bytes()).hexdigest()
model_dir = root / "references" / "umat" / "single-element"
for name in ["model.inp", "job.msg.tail.txt", "job.dat.tail.txt", "job.log.tail.txt", "stresses.csv"]:
(model_dir / name).parent.mkdir(parents=True, exist_ok=True)
(model_dir / name).write_text("ok\n", encoding="utf-8")
write_json(
model_dir / "metadata.json",
{
"schema_version": "abaqus-user-subroutine-artifact-v1",
"feature_id": "umat",
"model_id": "single-element",
"artifact_status": "ready-for-comparison",
"abaqus": {"version": "2024", "precision": "double", "command": "abaqus job=case user=UMAT.for"},
"compiler": {"vendor": "Intel oneAPI", "name": "ifx", "version": "2024"},
"subroutine": {
"entry_points": ["UMAT"],
"source_files": [
{
"path": "src/fortran/abaqus/UMAT.for",
"language": "Fortran",
"sha256": "0" * len(source_hash),
}
],
},
"input_file": "model.inp",
"outputs": {
"tails": {"msg": "job.msg.tail.txt", "dat": "job.dat.tail.txt", "log": "job.log.tail.txt"},
"csv": {"stresses": "stresses.csv"},
},
},
)
errors = validator.validate_root(root)
self.assertTrue(any("sha256 mismatch" in error for error in errors))
if __name__ == "__main__":
unittest.main()
+153
View File
@@ -0,0 +1,153 @@
import importlib.util
import io
import os
import subprocess
import tempfile
import unittest
from contextlib import redirect_stderr, redirect_stdout
from pathlib import Path
from unittest.mock import patch
def load_validate_workspace():
module_path = Path(__file__).resolve().parent / "validate_workspace.py"
spec = importlib.util.spec_from_file_location("validate_workspace", module_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
class ValidateWorkspaceTests(unittest.TestCase):
def _script_command(self, validate_workspace, script_name: str) -> str:
return validate_workspace.python_script_command(script_name)
def test_env_commands_override_cmake_detection(self):
validate_workspace = load_validate_workspace()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
(root / "CMakeLists.txt").write_text("cmake_minimum_required(VERSION 3.20)\n", encoding="utf-8")
with patch.dict(os.environ, {"HARNESS_VALIDATION_COMMANDS": "echo first\n echo second \n"}, clear=True):
self.assertEqual(validate_workspace.discover_commands(root), ["echo first", "echo second"])
def test_env_commands_override_abaqus_validation_config_in_main(self):
validate_workspace = load_validate_workspace()
env = {
"HARNESS_VALIDATION_COMMANDS": "echo override",
"HARNESS_ABAQUS_VALIDATION": "run",
}
with patch.dict(os.environ, env, clear=True):
with patch.object(
validate_workspace,
"run_command",
return_value=subprocess.CompletedProcess("echo override", 0, "", ""),
):
with redirect_stdout(io.StringIO()), redirect_stderr(io.StringIO()):
self.assertEqual(validate_workspace.main(), 0)
def test_msvc_debug_cmake_commands_are_default_for_cmake_project(self):
validate_workspace = load_validate_workspace()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
(root / "CMakeLists.txt").write_text("cmake_minimum_required(VERSION 3.20)\n", encoding="utf-8")
build_dir = root / "build" / "msvc-debug"
with patch.dict(os.environ, {}, clear=True):
self.assertEqual(
validate_workspace.discover_commands(root),
[
self._script_command(validate_workspace, "validate_reference_artifacts.py"),
self._script_command(validate_workspace, "validate_fortran.py"),
f'cmake -S "{root}" -B "{build_dir}" -G "Visual Studio 17 2022" -A x64',
f'cmake --build "{build_dir}" --config Debug',
f'ctest --test-dir "{build_dir}" --output-on-failure -C Debug',
],
)
def test_msvc_debug_configure_preset_is_preferred_when_present(self):
validate_workspace = load_validate_workspace()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
(root / "CMakeLists.txt").write_text("cmake_minimum_required(VERSION 3.20)\n", encoding="utf-8")
(root / "CMakePresets.json").write_text(
"""
{
"version": 3,
"configurePresets": [
{
"name": "msvc-debug",
"generator": "Visual Studio 17 2022",
"binaryDir": "${sourceDir}/out/msvc-debug"
}
]
}
""",
encoding="utf-8",
)
with patch.dict(os.environ, {}, clear=True):
self.assertEqual(
validate_workspace.discover_commands(root),
[
self._script_command(validate_workspace, "validate_reference_artifacts.py"),
self._script_command(validate_workspace, "validate_fortran.py"),
"cmake --preset msvc-debug",
f'cmake --build "{root / "out" / "msvc-debug"}" --config Debug',
f'ctest --test-dir "{root / "out" / "msvc-debug"}" --output-on-failure -C Debug',
],
)
def test_no_cmake_project_has_no_validation_commands(self):
validate_workspace = load_validate_workspace()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
with patch.dict(os.environ, {}, clear=True):
self.assertEqual(
validate_workspace.discover_commands(root),
[
self._script_command(validate_workspace, "validate_reference_artifacts.py"),
self._script_command(validate_workspace, "validate_fortran.py"),
],
)
def test_abaqus_detect_mode_reports_without_running_jobs(self):
validate_workspace = load_validate_workspace()
with patch.dict(os.environ, {"HARNESS_ABAQUS_VALIDATION": "detect"}, clear=True):
validation = validate_workspace.discover_abaqus_validation(which=lambda _name: None)
self.assertEqual(validation.mode, "detect")
self.assertEqual(validation.commands, [])
self.assertFalse(validation.required)
def test_abaqus_run_mode_requires_executable_and_commands(self):
validate_workspace = load_validate_workspace()
with patch.dict(os.environ, {"HARNESS_ABAQUS_VALIDATION": "run"}, clear=True):
with self.assertRaises(validate_workspace.ValidationConfigError):
validate_workspace.discover_abaqus_validation(which=lambda _name: None)
def test_abaqus_run_mode_replaces_abaqus_token(self):
validate_workspace = load_validate_workspace()
env = {
"HARNESS_ABAQUS_VALIDATION": "run",
"HARNESS_ABAQUS_VALIDATION_COMMANDS": "{abaqus} job=case user=UMAT.for",
"HARNESS_ABAQUS_USE_ONEAPI_ENV": "off",
}
with patch.dict(os.environ, env, clear=True):
validation = validate_workspace.discover_abaqus_validation(which=lambda _name: r"C:\SIMULIA\abaqus.bat")
self.assertTrue(validation.required)
self.assertEqual(validation.commands, [r"C:\SIMULIA\abaqus.bat job=case user=UMAT.for"])
def test_common_cmake_install_path_is_prepended_when_cmake_is_not_on_path(self):
validate_workspace = load_validate_workspace()
with tempfile.TemporaryDirectory() as tmp:
common_bin = Path(tmp) / "CMake" / "bin"
common_bin.mkdir(parents=True)
(common_bin / "cmake.exe").write_text("", encoding="utf-8")
(common_bin / "ctest.exe").write_text("", encoding="utf-8")
with patch.object(validate_workspace, "COMMON_CMAKE_BIN", common_bin):
with patch.object(validate_workspace.shutil, "which", return_value=None):
env = validate_workspace.validation_environment({"PATH": "C:\\Windows\\System32"})
self.assertTrue(env["PATH"].startswith(str(common_bin)))
if __name__ == "__main__":
unittest.main()
+131
View File
@@ -0,0 +1,131 @@
#!/usr/bin/env python3
"""Run no-Abaqus Intel Fortran manifest tests."""
from __future__ import annotations
import json
import os
import subprocess
import sys
from pathlib import Path
try:
from fortran_toolchain import FortranToolchain, resolve_toolchain, wrap_command
except ModuleNotFoundError:
from scripts.fortran_toolchain import FortranToolchain, resolve_toolchain, wrap_command
MANIFEST_PATH = Path("tests/fortran/manifest.json")
BUILD_ROOT = Path("build/fortran-tests")
VALIDATION_MODES = {"off", "detect", "auto", "compile"}
class FortranValidationError(RuntimeError):
pass
def validation_mode(env: dict[str, str] | None = None) -> str:
env = env or os.environ
mode = env.get("HARNESS_FORTRAN_VALIDATION", "auto").lower()
if mode not in VALIDATION_MODES:
raise FortranValidationError(f"Unsupported HARNESS_FORTRAN_VALIDATION: {mode}")
return mode
def load_manifest(root: Path) -> dict | None:
manifest = root / MANIFEST_PATH
if not manifest.exists():
return None
try:
payload = json.loads(manifest.read_text(encoding="utf-8"))
except json.JSONDecodeError as exc:
raise FortranValidationError(f"Invalid Fortran manifest JSON: {manifest}: {exc}") from exc
if not isinstance(payload, dict) or not isinstance(payload.get("tests", []), list):
raise FortranValidationError(f"Fortran manifest must contain a tests array: {manifest}")
return payload
def _validate_test_record(record: dict) -> tuple[str, list[str]]:
if not isinstance(record, dict):
raise FortranValidationError("Fortran manifest test record must be an object.")
name = record.get("name")
sources = record.get("sources")
if not isinstance(name, str) or not name:
raise FortranValidationError("Fortran manifest test record is missing name.")
if not isinstance(sources, list) or not all(isinstance(source, str) and source for source in sources):
raise FortranValidationError(f"Fortran manifest test {name} must define source paths.")
return name, sources
def build_test_commands(root: Path, manifest: dict, toolchain: FortranToolchain) -> list[str]:
commands: list[str] = []
for record in manifest.get("tests", []):
name, sources = _validate_test_record(record)
build_dir = root / BUILD_ROOT / name
exe_path = build_dir / f"{name}.exe"
source_paths = [root / source for source in sources]
compile_args: list[str | Path] = [
toolchain.executable,
"/nologo",
*source_paths,
f"/exe:{exe_path}",
]
commands.append(wrap_command(toolchain, compile_args))
commands.append(str(exe_path))
return commands
def discover_commands(root: Path, env: dict[str, str] | None = None) -> list[str]:
env = env or os.environ
mode = validation_mode(env)
if mode == "off":
return []
manifest = load_manifest(root)
if manifest is None:
return []
toolchain = resolve_toolchain(env)
if toolchain is None:
raise FortranValidationError("Fortran validation manifest exists, but no Intel oneAPI Fortran compiler was found.")
if mode == "detect":
return []
return build_test_commands(root, manifest, toolchain)
def run_command(command: str, root: Path) -> subprocess.CompletedProcess:
return subprocess.run(command, cwd=root, shell=True, capture_output=True, text=True, encoding="utf-8", errors="replace")
def main() -> int:
root = Path(__file__).resolve().parent.parent
try:
commands = discover_commands(root)
except FortranValidationError as exc:
print(f"Fortran validation failed: {exc}", file=sys.stderr)
return 2
if not commands:
print("No Fortran validation commands configured.")
return 0
for command in commands:
print(f"$ {command}")
result = run_command(command, root)
if result.stdout.strip():
print("[stdout]")
print(result.stdout.strip())
if result.stderr.strip():
print("[stderr]", file=sys.stderr)
print(result.stderr.strip(), file=sys.stderr)
if result.returncode != 0:
print(f"Fortran validation failed: {command}", file=sys.stderr)
return result.returncode
print("Fortran validation succeeded.")
return 0
if __name__ == "__main__":
raise SystemExit(main())
+174
View File
@@ -0,0 +1,174 @@
#!/usr/bin/env python3
"""Validate stored Abaqus user-subroutine reference artifact metadata."""
from __future__ import annotations
import hashlib
import json
import sys
from pathlib import Path
SCHEMA_VERSION = "abaqus-user-subroutine-artifact-v1"
VALID_STATUSES = {"draft", "needs-reference-artifacts", "ready-for-comparison", "blocked"}
READY_STATUS = "ready-for-comparison"
def sha256_file(path: Path) -> str:
digest = hashlib.sha256()
with path.open("rb") as handle:
for chunk in iter(lambda: handle.read(1024 * 1024), b""):
digest.update(chunk)
return digest.hexdigest()
def _nested(payload: dict, *keys: str):
current = payload
for key in keys:
if not isinstance(current, dict) or key not in current:
return None
current = current[key]
return current
def _load_metadata(path: Path) -> tuple[dict | None, list[str]]:
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except json.JSONDecodeError as exc:
return None, [f"{path}: invalid JSON: {exc}"]
if not isinstance(payload, dict):
return None, [f"{path}: metadata must be a JSON object"]
return payload, []
def _validate_basic(path: Path, payload: dict) -> list[str]:
errors: list[str] = []
for key in ("schema_version", "feature_id", "model_id", "artifact_status"):
if not payload.get(key):
errors.append(f"{path}: missing required key {key}")
if payload.get("schema_version") and payload.get("schema_version") != SCHEMA_VERSION:
errors.append(f"{path}: unsupported schema_version {payload.get('schema_version')}")
status = payload.get("artifact_status")
if status and status not in VALID_STATUSES:
errors.append(f"{path}: unsupported artifact_status {status}")
return errors
def _require_ready_key(path: Path, payload: dict, *keys: str) -> list[str]:
value = _nested(payload, *keys)
dotted = ".".join(keys)
if value is None or value == "":
return [f"{path}: missing ready-for-comparison key {dotted}"]
return []
def _validate_ready_files(path: Path, root: Path, payload: dict) -> list[str]:
errors: list[str] = []
model_dir = path.parent
for keys in (
("abaqus", "version"),
("abaqus", "precision"),
("abaqus", "command"),
("compiler", "vendor"),
("compiler", "name"),
("compiler", "version"),
("subroutine", "entry_points"),
("subroutine", "source_files"),
("input_file",),
("outputs", "tails"),
("outputs", "csv"),
):
errors.extend(_require_ready_key(path, payload, *keys))
input_file = payload.get("input_file")
if isinstance(input_file, str) and not (model_dir / input_file).exists():
errors.append(f"{path}: missing input_file {input_file}")
tails = _nested(payload, "outputs", "tails")
if isinstance(tails, dict):
for key in ("msg", "dat", "log"):
tail_path = tails.get(key)
if not isinstance(tail_path, str) or not tail_path:
errors.append(f"{path}: missing output tail {key}")
elif not (model_dir / tail_path).exists():
errors.append(f"{path}: missing output tail {key}: {tail_path}")
csv_outputs = _nested(payload, "outputs", "csv")
if isinstance(csv_outputs, dict):
if not csv_outputs:
errors.append(f"{path}: missing csv output declaration")
for key, csv_path in csv_outputs.items():
if not isinstance(csv_path, str) or not csv_path:
errors.append(f"{path}: missing csv output {key}")
elif not (model_dir / csv_path).exists():
errors.append(f"{path}: missing csv output {key}: {csv_path}")
source_files = _nested(payload, "subroutine", "source_files")
if isinstance(source_files, list):
if not source_files:
errors.append(f"{path}: missing source file declaration")
for item in source_files:
if not isinstance(item, dict):
errors.append(f"{path}: source_files entries must be objects")
continue
source_path_text = item.get("path")
expected_hash = item.get("sha256")
if not isinstance(source_path_text, str) or not source_path_text:
errors.append(f"{path}: missing source file path")
continue
source_path = Path(source_path_text)
if not source_path.is_absolute():
source_path = root / source_path
if not source_path.exists():
errors.append(f"{path}: missing source file {source_path_text}")
continue
actual_hash = sha256_file(source_path)
if expected_hash != actual_hash:
errors.append(f"{path}: sha256 mismatch for {source_path_text}")
return errors
def validate_metadata(path: Path, root: Path) -> list[str]:
payload, errors = _load_metadata(path)
if payload is None:
return errors
errors.extend(_validate_basic(path, payload))
if errors:
return errors
if payload.get("artifact_status") == READY_STATUS:
errors.extend(_validate_ready_files(path, root, payload))
return errors
def validate_root(root: Path) -> list[str]:
references_dir = root / "references"
if not references_dir.exists():
return []
errors: list[str] = []
for metadata in sorted(references_dir.rglob("metadata.json")):
errors.extend(validate_metadata(metadata, root))
return errors
def main() -> int:
root = Path(__file__).resolve().parent.parent
errors = validate_root(root)
if not errors:
print("Reference artifact metadata validation succeeded.")
return 0
print("Reference artifact metadata validation failed:", file=sys.stderr)
for error in errors:
print(f"- {error}", file=sys.stderr)
return 1
if __name__ == "__main__":
raise SystemExit(main())
+268
View File
@@ -0,0 +1,268 @@
#!/usr/bin/env python3
"""Run C++/MSVC Harness validation commands."""
from __future__ import annotations
import json
import os
import shutil
import subprocess
import sys
from pathlib import Path
DEFAULT_GENERATOR = "Visual Studio 17 2022"
DEFAULT_PLATFORM = "x64"
DEFAULT_CONFIG = "Debug"
DEFAULT_BUILD_DIR = "build/msvc-debug"
PRESET_NAME = "msvc-debug"
COMMON_CMAKE_BIN = Path(r"C:\Program Files\CMake\bin")
ABAQUS_VALIDATION_MODES = {"off", "detect", "run"}
class ValidationConfigError(RuntimeError):
pass
class AbaqusValidation:
def __init__(self, *, mode: str, executable: str | None, commands: list[str], required: bool):
self.mode = mode
self.executable = executable
self.commands = commands
self.required = required
def load_env_commands() -> list[str]:
raw = os.environ.get("HARNESS_VALIDATION_COMMANDS", "")
return [line.strip() for line in raw.splitlines() if line.strip()]
def _load_multiline_env(name: str) -> list[str]:
raw = os.environ.get(name, "")
return [line.strip() for line in raw.splitlines() if line.strip()]
def python_script_command(script_name: str) -> str:
return subprocess.list2cmdline([sys.executable, str(Path("scripts") / script_name)])
def _cmake_config() -> tuple[str, str, str, Path]:
generator = os.environ.get("HARNESS_CMAKE_GENERATOR", DEFAULT_GENERATOR)
platform = os.environ.get("HARNESS_CMAKE_PLATFORM", DEFAULT_PLATFORM)
config = os.environ.get("HARNESS_CMAKE_CONFIG", DEFAULT_CONFIG)
build_dir = Path(os.environ.get("HARNESS_BUILD_DIR", DEFAULT_BUILD_DIR))
return generator, platform, config, build_dir
def _read_presets(root: Path) -> dict:
presets_file = root / "CMakePresets.json"
if not presets_file.exists():
return {}
try:
return json.loads(presets_file.read_text(encoding="utf-8"))
except json.JSONDecodeError:
return {}
def _preset_binary_dir(root: Path, preset: dict) -> Path:
binary_dir = str(preset.get("binaryDir") or DEFAULT_BUILD_DIR)
binary_dir = binary_dir.replace("${sourceDir}", str(root))
binary_dir = binary_dir.replace("$sourceDir", str(root))
path = Path(binary_dir)
return path if path.is_absolute() else root / path
def load_preset_commands(root: Path) -> list[str]:
payload = _read_presets(root)
config = os.environ.get("HARNESS_CMAKE_CONFIG", DEFAULT_CONFIG)
for preset in payload.get("configurePresets", []):
if isinstance(preset, dict) and preset.get("name") == PRESET_NAME:
build_dir = _preset_binary_dir(root, preset)
return [
f"cmake --preset {PRESET_NAME}",
f'cmake --build "{build_dir}" --config {config}',
f'ctest --test-dir "{build_dir}" --output-on-failure -C {config}',
]
return []
def load_cmake_commands(root: Path) -> list[str]:
if not (root / "CMakeLists.txt").exists():
return []
generator, platform, config, build_dir = _cmake_config()
if not build_dir.is_absolute():
build_dir = root / build_dir
return [
f'cmake -S "{root}" -B "{build_dir}" -G "{generator}" -A {platform}',
f'cmake --build "{build_dir}" --config {config}',
f'ctest --test-dir "{build_dir}" --output-on-failure -C {config}',
]
def load_harness_commands(root: Path) -> list[str]:
return [
python_script_command("validate_reference_artifacts.py"),
python_script_command("validate_fortran.py"),
]
def _quote(value: str | Path) -> str:
return subprocess.list2cmdline([str(value)])
def _resolve_executable(command: str, which=shutil.which) -> str | None:
path = Path(command)
if path.exists():
return str(path)
return which(command)
def _discover_oneapi_env_script() -> Path | None:
configured = os.environ.get("HARNESS_ONEAPI_VARS_BAT")
if configured:
path = Path(configured)
return path if path.exists() else None
for candidate in (
Path(r"C:\Program Files (x86)\Intel\oneAPI\compiler\latest\env\vars.bat"),
Path(r"C:\Program Files\Intel\oneAPI\compiler\latest\env\vars.bat"),
Path(r"C:\Program Files (x86)\Intel\oneAPI\setvars.bat"),
Path(r"C:\Program Files\Intel\oneAPI\setvars.bat"),
):
if candidate.exists():
return candidate
return None
def _maybe_wrap_oneapi(command: str) -> str:
mode = os.environ.get("HARNESS_ABAQUS_USE_ONEAPI_ENV", "auto").lower()
if mode not in {"auto", "on", "off"}:
raise ValidationConfigError(f"Unsupported HARNESS_ABAQUS_USE_ONEAPI_ENV: {mode}")
if mode == "off":
return command
env_script = _discover_oneapi_env_script()
if env_script is None:
if mode == "on":
raise ValidationConfigError("HARNESS_ABAQUS_USE_ONEAPI_ENV=on but no oneAPI vars.bat was found.")
return command
return f'cmd /d /s /c "call {_quote(env_script)} intel64 >nul && {command}"'
def discover_abaqus_validation(which=shutil.which) -> AbaqusValidation:
mode = os.environ.get("HARNESS_ABAQUS_VALIDATION", "off").lower()
if mode not in ABAQUS_VALIDATION_MODES:
raise ValidationConfigError(f"Unsupported HARNESS_ABAQUS_VALIDATION: {mode}")
if mode == "off":
return AbaqusValidation(mode=mode, executable=None, commands=[], required=False)
command_name = os.environ.get("HARNESS_ABAQUS_COMMAND", "abaqus")
executable = _resolve_executable(command_name, which=which)
if mode == "detect":
return AbaqusValidation(mode=mode, executable=executable, commands=[], required=False)
if executable is None:
raise ValidationConfigError(f"Abaqus executable not found: {command_name}")
commands = _load_multiline_env("HARNESS_ABAQUS_VALIDATION_COMMANDS")
if not commands:
raise ValidationConfigError("HARNESS_ABAQUS_VALIDATION=run requires HARNESS_ABAQUS_VALIDATION_COMMANDS.")
quoted_executable = _quote(executable)
resolved_commands = [_maybe_wrap_oneapi(command.replace("{abaqus}", quoted_executable)) for command in commands]
return AbaqusValidation(mode=mode, executable=executable, commands=resolved_commands, required=True)
def discover_commands(root: Path) -> list[str]:
env_commands = load_env_commands()
if env_commands:
return env_commands
commands = load_harness_commands(root)
preset_commands = load_preset_commands(root)
if preset_commands:
commands.extend(preset_commands)
else:
commands.extend(load_cmake_commands(root))
commands.extend(discover_abaqus_validation().commands)
return commands
def run_command(command: str, root: Path) -> subprocess.CompletedProcess:
return subprocess.run(
command,
cwd=root,
shell=True,
capture_output=True,
text=True,
encoding="utf-8",
errors="replace",
env=validation_environment(os.environ),
)
def validation_environment(base_env: os._Environ | dict[str, str]) -> dict[str, str]:
env = dict(base_env)
if shutil.which("cmake") is not None:
return env
cmake_exe = COMMON_CMAKE_BIN / "cmake.exe"
if not cmake_exe.exists():
return env
current_path = env.get("PATH", "")
paths = [part for part in current_path.split(os.pathsep) if part]
common_bin_text = str(COMMON_CMAKE_BIN)
if not any(part.lower() == common_bin_text.lower() for part in paths):
env["PATH"] = common_bin_text + (os.pathsep + current_path if current_path else "")
return env
def emit_stream(prefix: str, content: str, *, stream) -> None:
text = (content or "").strip()
if not text:
return
print(prefix, file=stream)
print(text, file=stream)
def main() -> int:
root = Path(__file__).resolve().parent.parent
try:
commands = discover_commands(root)
abaqus_validation = (
AbaqusValidation(mode="off", executable=None, commands=[], required=False)
if load_env_commands()
else discover_abaqus_validation()
)
except ValidationConfigError as exc:
print(f"Validation configuration failed: {exc}", file=sys.stderr)
return 2
if abaqus_validation.mode == "detect":
if abaqus_validation.executable:
print(f"Abaqus executable detected: {abaqus_validation.executable}")
else:
print("Abaqus executable not detected.")
if not commands:
print("No validation commands configured.")
print("Add CMakeLists.txt or set HARNESS_VALIDATION_COMMANDS.")
return 0
for command in commands:
print(f"$ {command}")
result = run_command(command, root)
emit_stream("[stdout]", result.stdout, stream=sys.stdout)
emit_stream("[stderr]", result.stderr, stream=sys.stderr)
if result.returncode != 0:
print(f"Validation failed: {command}", file=sys.stderr)
return result.returncode
print("Validation succeeded.")
return 0
if __name__ == "__main__":
raise SystemExit(main())