#!/usr/bin/env python3 """Validate stored Abaqus user-subroutine reference artifact metadata.""" from __future__ import annotations import hashlib import json import sys from pathlib import Path SCHEMA_VERSION = "abaqus-user-subroutine-artifact-v1" VALID_STATUSES = {"draft", "needs-reference-artifacts", "ready-for-comparison", "blocked"} READY_STATUS = "ready-for-comparison" def sha256_file(path: Path) -> str: digest = hashlib.sha256() with path.open("rb") as handle: for chunk in iter(lambda: handle.read(1024 * 1024), b""): digest.update(chunk) return digest.hexdigest() def _nested(payload: dict, *keys: str): current = payload for key in keys: if not isinstance(current, dict) or key not in current: return None current = current[key] return current def _load_metadata(path: Path) -> tuple[dict | None, list[str]]: try: payload = json.loads(path.read_text(encoding="utf-8")) except json.JSONDecodeError as exc: return None, [f"{path}: invalid JSON: {exc}"] if not isinstance(payload, dict): return None, [f"{path}: metadata must be a JSON object"] return payload, [] def _validate_basic(path: Path, payload: dict) -> list[str]: errors: list[str] = [] for key in ("schema_version", "feature_id", "model_id", "artifact_status"): if not payload.get(key): errors.append(f"{path}: missing required key {key}") if payload.get("schema_version") and payload.get("schema_version") != SCHEMA_VERSION: errors.append(f"{path}: unsupported schema_version {payload.get('schema_version')}") status = payload.get("artifact_status") if status and status not in VALID_STATUSES: errors.append(f"{path}: unsupported artifact_status {status}") return errors def _require_ready_key(path: Path, payload: dict, *keys: str) -> list[str]: value = _nested(payload, *keys) dotted = ".".join(keys) if value is None or value == "": return [f"{path}: missing ready-for-comparison key {dotted}"] return [] def _validate_ready_files(path: Path, root: Path, payload: dict) -> list[str]: errors: list[str] = [] model_dir = path.parent for keys in ( ("abaqus", "version"), ("abaqus", "precision"), ("abaqus", "command"), ("compiler", "vendor"), ("compiler", "name"), ("compiler", "version"), ("subroutine", "entry_points"), ("subroutine", "source_files"), ("input_file",), ("outputs", "tails"), ("outputs", "csv"), ): errors.extend(_require_ready_key(path, payload, *keys)) input_file = payload.get("input_file") if isinstance(input_file, str) and not (model_dir / input_file).exists(): errors.append(f"{path}: missing input_file {input_file}") tails = _nested(payload, "outputs", "tails") if isinstance(tails, dict): for key in ("msg", "dat", "log"): tail_path = tails.get(key) if not isinstance(tail_path, str) or not tail_path: errors.append(f"{path}: missing output tail {key}") elif not (model_dir / tail_path).exists(): errors.append(f"{path}: missing output tail {key}: {tail_path}") csv_outputs = _nested(payload, "outputs", "csv") if isinstance(csv_outputs, dict): if not csv_outputs: errors.append(f"{path}: missing csv output declaration") for key, csv_path in csv_outputs.items(): if not isinstance(csv_path, str) or not csv_path: errors.append(f"{path}: missing csv output {key}") elif not (model_dir / csv_path).exists(): errors.append(f"{path}: missing csv output {key}: {csv_path}") source_files = _nested(payload, "subroutine", "source_files") if isinstance(source_files, list): if not source_files: errors.append(f"{path}: missing source file declaration") for item in source_files: if not isinstance(item, dict): errors.append(f"{path}: source_files entries must be objects") continue source_path_text = item.get("path") expected_hash = item.get("sha256") if not isinstance(source_path_text, str) or not source_path_text: errors.append(f"{path}: missing source file path") continue source_path = Path(source_path_text) if not source_path.is_absolute(): source_path = root / source_path if not source_path.exists(): errors.append(f"{path}: missing source file {source_path_text}") continue actual_hash = sha256_file(source_path) if expected_hash != actual_hash: errors.append(f"{path}: sha256 mismatch for {source_path_text}") return errors def validate_metadata(path: Path, root: Path) -> list[str]: payload, errors = _load_metadata(path) if payload is None: return errors errors.extend(_validate_basic(path, payload)) if errors: return errors if payload.get("artifact_status") == READY_STATUS: errors.extend(_validate_ready_files(path, root, payload)) return errors def validate_root(root: Path) -> list[str]: references_dir = root / "references" if not references_dir.exists(): return [] errors: list[str] = [] for metadata in sorted(references_dir.rglob("metadata.json")): errors.extend(validate_metadata(metadata, root)) return errors def main() -> int: root = Path(__file__).resolve().parent.parent errors = validate_root(root) if not errors: print("Reference artifact metadata validation succeeded.") return 0 print("Reference artifact metadata validation failed:", file=sys.stderr) for error in errors: print(f"- {error}", file=sys.stderr) return 1 if __name__ == "__main__": raise SystemExit(main())