246 lines
8.7 KiB
Python
246 lines
8.7 KiB
Python
#!/usr/bin/env python3
|
|
"""Validate stored Abaqus user-subroutine reference artifact metadata."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import json
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
|
|
# Schema identifier compared against the "schema_version" key of each metadata file.
SCHEMA_VERSION = "abaqus-user-subroutine-artifact-v1"

# Status whose presence triggers the on-disk artifact checks.
READY_STATUS = "ready-for-comparison"

# Every value accepted for the "artifact_status" key.
VALID_STATUSES = {"draft", "needs-reference-artifacts", READY_STATUS, "blocked"}

# Exactly 64 hexadecimal digits.  The end is anchored with \Z rather than $
# because $ also matches just before a trailing newline, which would let
# "<64 hex digits>\n" pass as a valid digest.
SHA256_RE = re.compile(r"^[0-9a-fA-F]{64}\Z")
|
|
|
|
|
|
def sha256_file(path: Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file at *path*.

    The file is opened in binary mode and consumed in 1 MiB blocks, so it is
    never loaded into memory as a whole.
    """
    block_size = 1024 * 1024
    hasher = hashlib.sha256()
    with path.open("rb") as stream:
        while True:
            block = stream.read(block_size)
            if not block:
                break
            hasher.update(block)
    return hasher.hexdigest()
|
|
|
|
|
|
def _nested(payload: dict, *keys: str):
    """Follow *keys* down through nested dicts and return the value reached.

    Returns ``None`` as soon as the current level is not a dict or does not
    contain the next key.  With no keys, *payload* itself is returned.
    """
    node = payload
    for name in keys:
        if isinstance(node, dict) and name in node:
            node = node[name]
        else:
            return None
    return node
|
|
|
|
|
|
def _load_metadata(path: Path) -> tuple[dict | None, list[str]]:
    """Read and parse the metadata file at *path*.

    Returns ``(payload, [])`` when the file holds a JSON object, otherwise
    ``(None, [message])`` with a single error message.  Read failures
    (unreadable file, bytes that are not valid UTF-8) are reported the same
    way as malformed JSON, so one bad file does not abort the whole run.
    """
    try:
        text = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        return None, [f"{path}: cannot read metadata: {exc}"]

    try:
        payload = json.loads(text)
    except json.JSONDecodeError as exc:
        return None, [f"{path}: invalid JSON: {exc}"]

    if not isinstance(payload, dict):
        return None, [f"{path}: metadata must be a JSON object"]
    return payload, []
|
|
|
|
|
|
def _validate_basic(path: Path, payload: dict) -> list[str]:
    """Check the keys every metadata file must carry, whatever its status.

    Reports each of ``schema_version``, ``feature_id``, ``model_id`` and
    ``artifact_status`` that is absent or falsy, a ``schema_version`` other
    than ``SCHEMA_VERSION``, and an ``artifact_status`` outside
    ``VALID_STATUSES``.  Returns the list of error messages (empty when the
    payload passes).
    """
    errors = [
        f"{path}: missing required key {key}"
        for key in ("schema_version", "feature_id", "model_id", "artifact_status")
        if not payload.get(key)
    ]

    schema_version = payload.get("schema_version")
    if schema_version and schema_version != SCHEMA_VERSION:
        errors.append(f"{path}: unsupported schema_version {schema_version}")

    status = payload.get("artifact_status")
    # Check the type before the set lookup: a JSON list or object is
    # unhashable, and testing it against a set raises TypeError instead of
    # yielding False.
    if status and not (isinstance(status, str) and status in VALID_STATUSES):
        errors.append(f"{path}: unsupported artifact_status {status}")
    return errors
|
|
|
|
|
|
def _require_ready_key(path: Path, payload: dict, *keys: str) -> list[str]:
    """Require a value at the nested *keys* path of *payload*.

    Returns an empty list when the lookup yields anything other than ``None``
    or the empty string; otherwise returns one error naming the dotted key
    path.
    """
    found = _nested(payload, *keys)
    if found is not None and found != "":
        return []
    return [f"{path}: missing ready-for-comparison key {'.'.join(keys)}"]
|
|
|
|
|
|
def _is_safe_relative_path(path_text: str) -> bool:
    """Return True when *path_text* is relative and has no ``..`` component."""
    candidate = Path(path_text)
    if candidate.is_absolute():
        return False
    return all(part != ".." for part in candidate.parts)
|
|
|
|
|
|
def _is_extracted_csv_path(path_text: str) -> bool:
    """Return True when *path_text* has the form ``extracted/<name>.csv``.

    The path must pass ``_is_safe_relative_path``, consist of exactly two
    components with ``extracted`` first, and carry a ``.csv`` suffix (compared
    case-insensitively).
    """
    candidate = Path(path_text)
    if not _is_safe_relative_path(path_text):
        return False
    components = candidate.parts
    if len(components) != 2 or components[0] != "extracted":
        return False
    return candidate.suffix.lower() == ".csv"
|
|
|
|
|
|
def _validate_optional_sha256_file(path: Path, model_dir: Path, key: str, value: object) -> list[str]:
    """Validate an optional reference to a file that stores a SHA-256 digest.

    *value* is the path declared under *key*, resolved against *model_dir*.
    ``None`` means the key is absent and is accepted.  Otherwise the value
    must be a non-empty, safe relative path to an existing file whose first
    whitespace-separated token matches ``SHA256_RE``.

    Returns a list with at most one error message, prefixed with *path*.
    """
    if value is None:
        return []
    if not isinstance(value, str) or not value:
        return [f"{path}: invalid {key}"]
    if not _is_safe_relative_path(value):
        return [f"{path}: {key} must be a relative path inside the artifact bundle"]

    sha_path = model_dir / value
    if not sha_path.exists():
        return [f"{path}: missing {key}: {value}"]

    invalid = [f"{path}: invalid {key}: {value}"]
    try:
        # split() with no separator discards leading whitespace, so the first
        # element (if any) is the first token of the file.
        tokens = sha_path.read_text(encoding="utf-8").split(maxsplit=1)
    except (OSError, UnicodeDecodeError):
        # A directory, an unreadable file or non-UTF-8 content cannot hold a
        # usable digest; report it instead of letting the exception escape.
        return invalid
    # An empty or whitespace-only file yields no tokens at all.
    if not tokens or not SHA256_RE.match(tokens[0]):
        return invalid
    return []
|
|
|
|
|
|
def _validate_extraction(path: Path, model_dir: Path, payload: dict) -> list[str]:
    """Validate the ``extraction`` provenance object of *payload*.

    Checks performed:

    * ``extraction`` is a JSON object (otherwise a single error is returned);
    * ``source_odb``, ``tool``, ``extracted_at`` and ``csv_directory`` are
      present and truthy;
    * ``csv_directory`` equals the string ``extracted``;
    * ``script``, when given, is a safe relative path that exists under
      *model_dir*;
    * ``odb_sha256``, when given, is a 64-digit hexadecimal string;
    * ``odb_sha256_file``, when given, passes
      ``_validate_optional_sha256_file``.

    Returns the list of error messages, each prefixed with *path*.
    """
    extraction = payload.get("extraction")
    if not isinstance(extraction, dict):
        return [f"{path}: extraction provenance must be an object"]

    errors = [
        f"{path}: missing extraction provenance key {key}"
        for key in ("source_odb", "tool", "extracted_at", "csv_directory")
        if not extraction.get(key)
    ]

    csv_directory = extraction.get("csv_directory")
    # Any truthy value other than the exact string is wrong, including
    # non-string JSON values such as a number or a list.  Falsy values are
    # already reported as missing above.
    if csv_directory and csv_directory != "extracted":
        errors.append(f"{path}: extraction.csv_directory must be extracted")

    script = extraction.get("script")
    if script is not None:
        if not isinstance(script, str) or not script:
            errors.append(f"{path}: invalid extraction script")
        elif not _is_safe_relative_path(script):
            errors.append(f"{path}: extraction script must be a relative path inside the artifact bundle")
        elif not (model_dir / script).exists():
            errors.append(f"{path}: missing extraction script: {script}")

    odb_sha256 = extraction.get("odb_sha256")
    # fullmatch requires the whole string to be consumed, so a digest followed
    # by a newline is rejected whichever end anchor the pattern uses.
    if odb_sha256 is not None and (not isinstance(odb_sha256, str) or not SHA256_RE.fullmatch(odb_sha256)):
        errors.append(f"{path}: invalid odb_sha256")

    errors.extend(_validate_optional_sha256_file(path, model_dir, "odb_sha256_file", extraction.get("odb_sha256_file")))
    return errors
|
|
|
|
|
|
# Nested key paths that must hold a value in a ready-for-comparison bundle.
_READY_REQUIRED_KEYS = (
    ("abaqus", "version"),
    ("abaqus", "precision"),
    ("compiler", "vendor"),
    ("compiler", "name"),
    ("compiler", "version"),
    ("subroutine", "entry_points"),
    ("subroutine", "source_files"),
    ("input_file",),
    ("outputs", "tails"),
    ("outputs", "csv"),
    ("extraction",),
)


def _validate_ready_tails(path: Path, model_dir: Path, tails: dict) -> list[str]:
    """Check that the msg, dat, log and sta tails are declared and exist under *model_dir*."""
    errors: list[str] = []
    for key in ("msg", "dat", "log", "sta"):
        tail_path = tails.get(key)
        if not isinstance(tail_path, str) or not tail_path:
            errors.append(f"{path}: missing output tail {key}")
        elif not (model_dir / tail_path).exists():
            errors.append(f"{path}: missing output tail {key}: {tail_path}")
    return errors


def _validate_ready_csv_outputs(path: Path, model_dir: Path, csv_outputs: dict) -> list[str]:
    """Check that at least one CSV is declared and each is an existing ``extracted/*.csv``."""
    errors: list[str] = []
    if not csv_outputs:
        errors.append(f"{path}: missing csv output declaration")
    for key, csv_path in csv_outputs.items():
        if not isinstance(csv_path, str) or not csv_path:
            errors.append(f"{path}: missing csv output {key}")
        elif not _is_extracted_csv_path(csv_path):
            errors.append(f"{path}: csv output {key} must match extracted/*.csv")
        elif not (model_dir / csv_path).exists():
            errors.append(f"{path}: missing csv output {key}: {csv_path}")
    return errors


def _validate_ready_source_files(path: Path, root: Path, source_files: list) -> list[str]:
    """Check that each declared source file exists and matches its recorded SHA-256."""
    errors: list[str] = []
    if not source_files:
        errors.append(f"{path}: missing source file declaration")
    for item in source_files:
        if not isinstance(item, dict):
            errors.append(f"{path}: source_files entries must be objects")
            continue
        source_path_text = item.get("path")
        expected_hash = item.get("sha256")
        if not isinstance(source_path_text, str) or not source_path_text:
            errors.append(f"{path}: missing source file path")
            continue
        # Relative source paths are resolved against the repository root;
        # absolute paths are used as given.
        source_path = Path(source_path_text)
        if not source_path.is_absolute():
            source_path = root / source_path
        # is_file() rather than exists(): a directory cannot be hashed and
        # would make sha256_file raise.
        if not source_path.is_file():
            errors.append(f"{path}: missing source file {source_path_text}")
            continue
        actual_hash = sha256_file(source_path)
        # hexdigest() is lowercase, so compare case-insensitively to accept a
        # digest recorded in upper-case hex.
        if not isinstance(expected_hash, str) or expected_hash.lower() != actual_hash:
            errors.append(f"{path}: sha256 mismatch for {source_path_text}")
    return errors


def _validate_ready_files(path: Path, root: Path, payload: dict) -> list[str]:
    """Run the checks that apply to a ready-for-comparison bundle.

    *path* is the metadata file; paths for the input file, output tails, CSV
    outputs and extraction artifacts are resolved against its parent
    directory, while relative source-file paths are resolved against *root*.

    Errors are collected in this order: required keys, input file, output
    tails, CSV outputs, extraction provenance, source files.  Sections whose
    value has an unexpected JSON type are skipped here; only a missing value
    is reported by the required-key pass.

    Returns the list of error messages, each prefixed with *path*.
    """
    model_dir = path.parent
    errors: list[str] = []

    for keys in _READY_REQUIRED_KEYS:
        errors.extend(_require_ready_key(path, payload, *keys))

    input_file = payload.get("input_file")
    if isinstance(input_file, str) and not (model_dir / input_file).exists():
        errors.append(f"{path}: missing input_file {input_file}")

    tails = _nested(payload, "outputs", "tails")
    if isinstance(tails, dict):
        errors.extend(_validate_ready_tails(path, model_dir, tails))

    csv_outputs = _nested(payload, "outputs", "csv")
    if isinstance(csv_outputs, dict):
        errors.extend(_validate_ready_csv_outputs(path, model_dir, csv_outputs))

    if "extraction" in payload:
        errors.extend(_validate_extraction(path, model_dir, payload))

    source_files = _nested(payload, "subroutine", "source_files")
    if isinstance(source_files, list):
        errors.extend(_validate_ready_source_files(path, root, source_files))

    return errors
|
|
|
|
|
|
def validate_metadata(path: Path, root: Path) -> list[str]:
    """Validate one metadata file and return its error messages.

    Stops after loading if the file cannot be parsed into a JSON object, and
    after the basic key checks if any of them failed.  The on-disk artifact
    checks run only when ``artifact_status`` equals ``READY_STATUS``.
    """
    payload, load_errors = _load_metadata(path)
    if payload is None:
        return load_errors

    problems = load_errors + _validate_basic(path, payload)
    if not problems and payload.get("artifact_status") == READY_STATUS:
        problems += _validate_ready_files(path, root, payload)
    return problems
|
|
|
|
|
|
def validate_root(root: Path) -> list[str]:
    """Validate every ``metadata.json`` found below ``<root>/references``.

    Files are visited in sorted path order and their errors concatenated.
    Returns an empty list when the ``references`` directory does not exist.
    """
    references_dir = root / "references"
    if not references_dir.exists():
        return []

    return [
        error
        for metadata in sorted(references_dir.rglob("metadata.json"))
        for error in validate_metadata(metadata, root)
    ]
|
|
|
|
|
|
def main() -> int:
    """Validate the tree two directory levels above this file.

    Prints a success line to stdout and returns 0 when there are no errors;
    otherwise prints a header and one ``- <error>`` line per error to stderr
    and returns 1.
    """
    root = Path(__file__).resolve().parent.parent
    errors = validate_root(root)
    if errors:
        print("Reference artifact metadata validation failed:", file=sys.stderr)
        for error in errors:
            print(f"- {error}", file=sys.stderr)
        return 1

    print("Reference artifact metadata validation succeeded.")
    return 0
|
|
|
|
|
|
# Script entry point: the return value of main() becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|