modify documents

This commit is contained in:
김경종
2026-06-11 11:08:27 +09:00
parent 98eba54a12
commit 986cc9888e
35 changed files with 1984 additions and 169 deletions
+327
View File
@@ -0,0 +1,327 @@
#!/usr/bin/env python3
"""Compare externally generated Abaqus ODB-extracted CSV results."""
from __future__ import annotations
import argparse
import csv
import json
import math
import sys
from pathlib import Path
try:
from validate_reference_artifacts import validate_metadata
except ImportError:
from scripts.validate_reference_artifacts import validate_metadata
def load_csv_rows(path: Path) -> tuple[list[str], list[dict[str, str]]]:
with path.open(newline="", encoding="utf-8") as handle:
reader = csv.DictReader(handle)
return list(reader.fieldnames or []), list(reader)
def make_key(row: dict[str, str], key_columns: list[str]) -> tuple[str, ...]:
return tuple(row.get(column, "") for column in key_columns)
def _key_text(key: tuple[str, ...]) -> str:
return "|".join(key)
def _parse_finite(value: str) -> float:
parsed = float(value)
if not math.isfinite(parsed):
raise ValueError(f"nonfinite value: {value}")
return parsed
def failed_quantity(quantity: str, classification: str, message: str) -> dict:
return {
"quantity": quantity,
"result": "fail",
"classification": classification,
"message": message,
"compared_rows": 0,
"missing_rows": 0,
"extra_rows": 0,
"max_abs_error": None,
"max_rel_error": None,
"rms_error": None,
"worst_key": None,
"worst_component": None,
}
def validate_columns(headers: list[str], required_columns: list[str]) -> list[str]:
return [column for column in required_columns if column not in headers]
def duplicate_columns(headers: list[str]) -> list[str]:
seen: set[str] = set()
duplicates: list[str] = []
for header in headers:
if header in seen and header not in duplicates:
duplicates.append(header)
seen.add(header)
return duplicates
def _rows_by_key(rows: list[dict[str, str]], key_columns: list[str]) -> tuple[dict[tuple[str, ...], dict[str, str]], set[tuple[str, ...]]]:
keyed: dict[tuple[str, ...], dict[str, str]] = {}
duplicates: set[tuple[str, ...]] = set()
for row in rows:
key = make_key(row, key_columns)
if key in keyed:
duplicates.add(key)
keyed[key] = row
return keyed, duplicates
def validate_contract(contract: dict) -> list[str]:
required_keys = [
"reference_csv",
"actual_csv",
"required_columns",
"key_columns",
"value_column",
"tolerance",
]
missing = [key for key in required_keys if key not in contract]
if not isinstance(contract.get("tolerance", {}), dict):
missing.append("tolerance")
return sorted(set(missing))
def compare_quantity(quantity: str, contract: dict, reference_root: Path, actual_root: Path) -> dict:
contract_errors = validate_contract(contract)
if contract_errors:
return failed_quantity(
quantity,
"upstream-contract",
f"missing comparison contract keys: {', '.join(contract_errors)}",
)
reference_csv = reference_root / contract["reference_csv"]
actual_csv = actual_root / contract["actual_csv"]
if not reference_csv.exists():
return failed_quantity(quantity, "missing-reference-artifact", f"missing reference CSV: {reference_csv}")
if not actual_csv.exists():
return failed_quantity(quantity, "missing-generated-output", f"missing actual CSV: {actual_csv}")
reference_headers, reference_rows = load_csv_rows(reference_csv)
actual_headers, actual_rows = load_csv_rows(actual_csv)
required_columns = list(contract["required_columns"])
key_columns = list(contract["key_columns"])
value_column = contract["value_column"]
repeated_columns = duplicate_columns(reference_headers) + duplicate_columns(actual_headers)
if repeated_columns:
return failed_quantity(
quantity,
"schema-mismatch",
f"duplicate CSV header columns: {', '.join(sorted(set(repeated_columns)))}",
)
reference_missing_columns = validate_columns(reference_headers, required_columns)
actual_missing_columns = validate_columns(actual_headers, required_columns)
if reference_missing_columns or actual_missing_columns:
missing = sorted(set(reference_missing_columns + actual_missing_columns))
return failed_quantity(quantity, "schema-mismatch", f"missing required columns: {', '.join(missing)}")
reference_by_key, reference_duplicates = _rows_by_key(reference_rows, key_columns)
actual_by_key, actual_duplicates = _rows_by_key(actual_rows, key_columns)
duplicate_keys = reference_duplicates | actual_duplicates
if duplicate_keys:
return failed_quantity(quantity, "schema-mismatch", f"duplicate key rows: {_key_text(sorted(duplicate_keys)[0])}")
reference_keys = set(reference_by_key)
actual_keys = set(actual_by_key)
missing_keys = sorted(reference_keys - actual_keys)
extra_keys = sorted(actual_keys - reference_keys)
if missing_keys or extra_keys:
result = failed_quantity(quantity, "id-mismatch", "reference and actual row keys do not match")
result["missing_rows"] = len(missing_keys)
result["extra_rows"] = len(extra_keys)
result["worst_key"] = _key_text((missing_keys or extra_keys)[0])
return result
tolerance = contract.get("tolerance", {})
absolute = float(tolerance.get("absolute", 0.0))
relative = float(tolerance.get("relative", 0.0))
relative_floor = float(tolerance.get("relative_floor", 0.0))
unit_column = contract.get("unit_column")
coordinate_system_column = contract.get("coordinate_system_column")
compared_rows = 0
max_abs_error = 0.0
max_rel_error = 0.0
sum_square_error = 0.0
worst_key: tuple[str, ...] | None = None
tolerance_failed = False
for key in sorted(reference_keys):
reference_row = reference_by_key[key]
actual_row = actual_by_key[key]
try:
reference_value = _parse_finite(reference_row[value_column])
actual_value = _parse_finite(actual_row[value_column])
except (KeyError, ValueError) as exc:
return failed_quantity(quantity, "nonfinite-result", str(exc))
if unit_column and reference_row[unit_column] != actual_row[unit_column]:
result = failed_quantity(quantity, "unit-or-coordinate-mismatch", f"unit mismatch at {_key_text(key)}")
result["worst_key"] = _key_text(key)
return result
if coordinate_system_column and reference_row[coordinate_system_column] != actual_row[coordinate_system_column]:
result = failed_quantity(
quantity,
"unit-or-coordinate-mismatch",
f"coordinate system mismatch at {_key_text(key)}",
)
result["worst_key"] = _key_text(key)
return result
abs_error = abs(actual_value - reference_value)
rel_denominator = max(abs(reference_value), relative_floor)
rel_error = abs_error / rel_denominator if rel_denominator else 0.0
allowed_error = absolute + relative * rel_denominator
compared_rows += 1
sum_square_error += abs_error * abs_error
max_rel_error = max(max_rel_error, rel_error)
if worst_key is None or abs_error > max_abs_error:
max_abs_error = abs_error
worst_key = key
if abs_error > allowed_error:
tolerance_failed = True
rms_error = math.sqrt(sum_square_error / compared_rows) if compared_rows else 0.0
worst_key_text = _key_text(worst_key) if worst_key is not None else None
worst_component = worst_key[-1] if worst_key else None
result = "fail" if tolerance_failed else "pass"
classification = "tolerance-failure" if tolerance_failed else "N/A"
return {
"quantity": quantity,
"result": result,
"classification": classification,
"message": "",
"compared_rows": compared_rows,
"missing_rows": 0,
"extra_rows": 0,
"max_abs_error": max_abs_error,
"max_rel_error": max_rel_error,
"rms_error": rms_error,
"worst_key": worst_key_text,
"worst_component": worst_component,
}
def compare_metadata(
metadata_path: Path,
actual_root: Path,
*,
quantities: list[str] | None = None,
validate_artifacts: bool = True,
) -> dict:
payload = json.loads(metadata_path.read_text(encoding="utf-8"))
comparisons = payload.get("comparisons", {})
if not isinstance(comparisons, dict) or (not comparisons and quantities is None):
quantity_names = quantities if quantities is not None else ["metadata"]
results = [
failed_quantity(quantity, "upstream-contract", "missing comparison contracts")
for quantity in quantity_names
]
return {
"metadata": str(metadata_path),
"actual_root": str(actual_root),
"overall_result": "fail",
"quantities": results,
}
selected_quantities = quantities if quantities is not None else sorted(comparisons)
if validate_artifacts:
validation_errors = validate_metadata(metadata_path, _project_root_from_metadata(metadata_path))
if validation_errors:
message = "; ".join(validation_errors)
results = [
failed_quantity(quantity, "missing-reference-artifact", message)
for quantity in (selected_quantities or ["metadata"])
]
return {
"metadata": str(metadata_path),
"actual_root": str(actual_root),
"overall_result": "fail",
"quantities": results,
}
results = []
for quantity in selected_quantities:
contract = comparisons.get(quantity)
if contract is None:
results.append(failed_quantity(quantity, "upstream-contract", f"missing comparison contract: {quantity}"))
continue
results.append(compare_quantity(quantity, contract, metadata_path.parent, actual_root))
overall = "pass" if all(result["result"] == "pass" for result in results) else "fail"
return {
"metadata": str(metadata_path),
"actual_root": str(actual_root),
"overall_result": overall,
"quantities": results,
}
class _ArgumentParser(argparse.ArgumentParser):
def error(self, message: str) -> None:
raise ValueError(message)
def build_arg_parser() -> argparse.ArgumentParser:
parser = _ArgumentParser(description="Compare externally generated ODB-extracted CSV outputs.")
parser.add_argument("--metadata", required=True, type=Path, help="Reference metadata.json path.")
parser.add_argument("--actual-root", required=True, type=Path, help="Root directory containing actual extracted CSVs.")
parser.add_argument("--quantity", action="append", default=None, help="Quantity key to compare. May be repeated.")
parser.add_argument("--report-json", type=Path, default=None, help="Optional JSON report output path.")
return parser
def _format_summary(result: dict) -> str:
status = result["result"].upper()
parts = [
f"{status} {result['quantity']}",
f"rows={result['compared_rows']}",
f"max_abs_error={result['max_abs_error']}",
f"max_rel_error={result['max_rel_error']}",
f"rms_error={result['rms_error']}",
f"worst_key={result['worst_key']}",
]
if result["classification"] != "N/A":
parts.insert(2, f"classification={result['classification']}")
return " ".join(parts)
def _project_root_from_metadata(metadata_path: Path) -> Path:
resolved = metadata_path.resolve()
for parent in resolved.parents:
if parent.name == "references":
return parent.parent
return resolved.parent
def main(argv: list[str] | None = None) -> int:
parser = build_arg_parser()
try:
args = parser.parse_args(argv)
report = compare_metadata(args.metadata, args.actual_root, quantities=args.quantity)
except (OSError, ValueError, json.JSONDecodeError) as exc:
print(f"CSV comparison configuration failed: {exc}", file=sys.stderr)
return 2
if args.report_json is not None:
args.report_json.parent.mkdir(parents=True, exist_ok=True)
args.report_json.write_text(json.dumps(report, indent=2), encoding="utf-8")
for result in report["quantities"]:
print(_format_summary(result))
return 0 if report["overall_result"] == "pass" else 1
if __name__ == "__main__":
raise SystemExit(main())
+11 -4
View File
@@ -77,7 +77,7 @@ SKILLS = {
),
"abaqus-subroutine-validation": (
"Subroutine validation",
"HARNESS_ABAQUS_VALIDATION=run",
"externally generated",
"ready-for-comparison",
"source hash",
"msg/dat/log",
@@ -174,7 +174,7 @@ AGENT_REQUIRED_TERMS = {
"implementation-agent.toml": ("Fortran source", "Intel oneAPI", "RED -> GREEN -> VERIFY"),
"build-test-executor-agent.toml": (
"python scripts/validate_fortran.py",
"HARNESS_ABAQUS_VALIDATION=run",
"externally generated",
),
"correction-agent.toml": ("Fortran compile", "minimal correction"),
"reference-verification-agent.toml": (
@@ -263,14 +263,21 @@ class AbaqusSubroutineCodexConfigTests(unittest.TestCase):
checked_paths += [SKILLS_ROOT / name / "SKILL.md" for name in SKILLS]
checked_paths += list((SKILLS_ROOT / "harness-workflow").glob("SKILL.md"))
checked_paths += list((SKILLS_ROOT / "harness-review").glob("SKILL.md"))
checked_paths += list((ROOT / "docs").glob("**/README.md"))
forbidden_terms = (
"FESA",
"FESA solver",
"FESA FEM",
"FESA C++",
"Nastran",
"C++17/MSVC",
"C++/MSVC",
"CMake/CTest",
"HARNESS_ABAQUS_VALIDATION=run",
"opt-in Abaqus",
"Abaqus opt-in",
"Abaqus execution is valid",
"If explicitly configured, run",
)
for path in checked_paths:
with self.subTest(path=path):
@@ -287,7 +294,7 @@ class AbaqusSubroutineCodexConfigTests(unittest.TestCase):
"Fortran",
"python scripts/validate_fortran.py",
"python scripts/validate_reference_artifacts.py",
"HARNESS_ABAQUS_VALIDATION=run",
"externally generated",
):
self.assertIn(term, text)
+434
View File
@@ -0,0 +1,434 @@
import csv
import contextlib
import hashlib
import importlib.util
import io
import json
import tempfile
import unittest
from pathlib import Path
def load_compare_extracted_csv():
module_path = Path(__file__).resolve().parent / "compare_extracted_csv.py"
spec = importlib.util.spec_from_file_location("compare_extracted_csv", module_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
def write_json(path: Path, payload: dict):
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
def write_csv(path: Path, rows: list[dict[str, str]]):
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(handle, fieldnames=list(rows[0]))
writer.writeheader()
writer.writerows(rows)
def write_text(path: Path, text: str):
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(text, encoding="utf-8")
def metadata_payload() -> dict:
return {
"schema_version": "abaqus-user-subroutine-artifact-v1",
"feature_id": "umat",
"model_id": "single-element",
"artifact_status": "ready-for-comparison",
"abaqus": {"version": "2024", "precision": "double"},
"compiler": {"vendor": "Intel oneAPI", "name": "ifx", "version": "2024"},
"subroutine": {"entry_points": ["UMAT"], "source_files": []},
"input_file": "model.inp",
"outputs": {
"tails": {
"msg": "job.msg.tail.txt",
"dat": "job.dat.tail.txt",
"log": "job.log.tail.txt",
"sta": "job.sta.tail.txt",
},
"csv": {"stresses": "extracted/stresses.csv"},
},
"extraction": {
"source_odb": "job.odb",
"tool": "Abaqus Python",
"extracted_at": "2026-06-10T00:00:00+09:00",
"csv_directory": "extracted",
},
"comparisons": {
"stresses": {
"reference_csv": "extracted/stresses.csv",
"actual_csv": "extracted/stresses.csv",
"required_columns": [
"step",
"frame",
"instance",
"element_label",
"integration_point",
"section_point",
"output_position",
"component",
"coordinate_system",
"unit",
"value",
],
"key_columns": [
"step",
"frame",
"instance",
"element_label",
"integration_point",
"section_point",
"output_position",
"component",
],
"value_column": "value",
"unit_column": "unit",
"coordinate_system_column": "coordinate_system",
"tolerance": {"absolute": 1.0e-8, "relative": 1.0e-6, "relative_floor": 1.0e-12},
}
},
}
def stress_rows(value: str = "100.0") -> list[dict[str, str]]:
return [
{
"step": "Step-1",
"frame": "1",
"instance": "PART-1-1",
"element_label": "1",
"integration_point": "1",
"section_point": "",
"output_position": "INTEGRATION_POINT",
"component": "S11",
"coordinate_system": "GLOBAL",
"unit": "MPa",
"value": value,
}
]
def make_metadata_valid_for_artifact_validation(root: Path, model_dir: Path, payload: dict) -> dict:
source = root / "src" / "fortran" / "abaqus" / "UMAT.for"
source.parent.mkdir(parents=True, exist_ok=True)
source.write_text(" subroutine umat()\n end\n", encoding="utf-8")
source_hash = hashlib.sha256(source.read_bytes()).hexdigest()
payload["subroutine"]["source_files"] = [
{
"path": "src/fortran/abaqus/UMAT.for",
"language": "Fortran",
"sha256": source_hash,
}
]
for name in [
"model.inp",
"job.msg.tail.txt",
"job.dat.tail.txt",
"job.log.tail.txt",
"job.sta.tail.txt",
]:
path = model_dir / name
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text("ok\n", encoding="utf-8")
return payload
def compare_stresses(reference_rows: list[dict[str, str]], actual_rows: list[dict[str, str]]) -> dict:
compare = load_compare_extracted_csv()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
reference = root / "references" / "umat" / "single-element"
actual = root / "external-results" / "umat" / "single-element"
write_json(reference / "metadata.json", metadata_payload())
write_csv(reference / "extracted" / "stresses.csv", reference_rows)
write_csv(actual / "extracted" / "stresses.csv", actual_rows)
return compare.compare_metadata(
reference / "metadata.json",
actual,
quantities=["stresses"],
validate_artifacts=False,
)
def call_main_silently(compare, argv: list[str]) -> int:
with contextlib.redirect_stdout(io.StringIO()), contextlib.redirect_stderr(io.StringIO()):
return compare.main(argv)
class CompareExtractedCsvTests(unittest.TestCase):
def test_quantity_passes_when_schema_keys_units_and_values_match_within_tolerance(self):
compare = load_compare_extracted_csv()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
reference = root / "references" / "umat" / "single-element"
actual = root / "external-results" / "umat" / "single-element"
write_json(reference / "metadata.json", metadata_payload())
write_csv(reference / "extracted" / "stresses.csv", stress_rows("100.0"))
write_csv(actual / "extracted" / "stresses.csv", stress_rows("100.00000001"))
report = compare.compare_metadata(
reference / "metadata.json",
actual,
quantities=["stresses"],
validate_artifacts=False,
)
self.assertEqual(report["overall_result"], "pass")
self.assertEqual(report["quantities"][0]["result"], "pass")
self.assertEqual(report["quantities"][0]["classification"], "N/A")
self.assertEqual(report["quantities"][0]["compared_rows"], 1)
def test_missing_actual_csv_is_missing_generated_output(self):
compare = load_compare_extracted_csv()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
reference = root / "references" / "umat" / "single-element"
actual = root / "external-results" / "umat" / "single-element"
write_json(reference / "metadata.json", metadata_payload())
write_csv(reference / "extracted" / "stresses.csv", stress_rows("100.0"))
report = compare.compare_metadata(
reference / "metadata.json",
actual,
quantities=["stresses"],
validate_artifacts=False,
)
self.assertEqual(report["overall_result"], "fail")
self.assertEqual(report["quantities"][0]["classification"], "missing-generated-output")
def test_missing_required_column_is_schema_mismatch(self):
compare = load_compare_extracted_csv()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
reference = root / "references" / "umat" / "single-element"
actual = root / "external-results" / "umat" / "single-element"
write_json(reference / "metadata.json", metadata_payload())
row = stress_rows("100.0")[0]
write_csv(reference / "extracted" / "stresses.csv", [row])
actual_row = dict(row)
actual_row.pop("coordinate_system")
write_csv(actual / "extracted" / "stresses.csv", [actual_row])
report = compare.compare_metadata(
reference / "metadata.json",
actual,
quantities=["stresses"],
validate_artifacts=False,
)
self.assertEqual(report["overall_result"], "fail")
self.assertEqual(report["quantities"][0]["classification"], "schema-mismatch")
def test_missing_quantity_contract_is_upstream_contract(self):
compare = load_compare_extracted_csv()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
reference = root / "references" / "umat" / "single-element"
actual = root / "external-results" / "umat" / "single-element"
payload = metadata_payload()
payload["comparisons"].pop("stresses")
write_json(reference / "metadata.json", payload)
report = compare.compare_metadata(
reference / "metadata.json",
actual,
quantities=["stresses"],
validate_artifacts=False,
)
self.assertEqual(report["overall_result"], "fail")
self.assertEqual(report["quantities"][0]["classification"], "upstream-contract")
def test_missing_comparisons_block_is_upstream_contract(self):
compare = load_compare_extracted_csv()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
reference = root / "references" / "umat" / "single-element"
actual = root / "external-results" / "umat" / "single-element"
payload = metadata_payload()
payload.pop("comparisons")
write_json(reference / "metadata.json", payload)
report = compare.compare_metadata(reference / "metadata.json", actual, validate_artifacts=False)
self.assertEqual(report["overall_result"], "fail")
self.assertEqual(report["quantities"][0]["classification"], "upstream-contract")
def test_incomplete_quantity_contract_is_upstream_contract(self):
compare = load_compare_extracted_csv()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
reference = root / "references" / "umat" / "single-element"
actual = root / "external-results" / "umat" / "single-element"
payload = metadata_payload()
payload["comparisons"]["stresses"].pop("value_column")
write_json(reference / "metadata.json", payload)
report = compare.compare_metadata(
reference / "metadata.json",
actual,
quantities=["stresses"],
validate_artifacts=False,
)
self.assertEqual(report["overall_result"], "fail")
self.assertEqual(report["quantities"][0]["classification"], "upstream-contract")
def test_duplicate_header_is_schema_mismatch(self):
compare = load_compare_extracted_csv()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
reference = root / "references" / "umat" / "single-element"
actual = root / "external-results" / "umat" / "single-element"
write_json(reference / "metadata.json", metadata_payload())
write_csv(reference / "extracted" / "stresses.csv", stress_rows("100.0"))
header = ",".join(list(stress_rows("100.0")[0]) + ["value"])
row = ",".join(stress_rows("100.0")[0].values()) + ",100.0"
write_text(actual / "extracted" / "stresses.csv", f"{header}\n{row}\n")
report = compare.compare_metadata(
reference / "metadata.json",
actual,
quantities=["stresses"],
validate_artifacts=False,
)
self.assertEqual(report["overall_result"], "fail")
self.assertEqual(report["quantities"][0]["classification"], "schema-mismatch")
def test_changed_row_key_is_id_mismatch(self):
actual_row = dict(stress_rows("100.0")[0])
actual_row["element_label"] = "2"
report = compare_stresses(stress_rows("100.0"), [actual_row])
self.assertEqual(report["overall_result"], "fail")
self.assertEqual(report["quantities"][0]["classification"], "id-mismatch")
self.assertEqual(report["quantities"][0]["missing_rows"], 1)
self.assertEqual(report["quantities"][0]["extra_rows"], 1)
def test_unit_mismatch_is_unit_or_coordinate_mismatch(self):
actual_row = dict(stress_rows("100.0")[0])
actual_row["unit"] = "Pa"
report = compare_stresses(stress_rows("100.0"), [actual_row])
self.assertEqual(report["overall_result"], "fail")
self.assertEqual(report["quantities"][0]["classification"], "unit-or-coordinate-mismatch")
def test_coordinate_system_mismatch_is_unit_or_coordinate_mismatch(self):
actual_row = dict(stress_rows("100.0")[0])
actual_row["coordinate_system"] = "LOCAL-1"
report = compare_stresses(stress_rows("100.0"), [actual_row])
self.assertEqual(report["overall_result"], "fail")
self.assertEqual(report["quantities"][0]["classification"], "unit-or-coordinate-mismatch")
def test_nonfinite_value_is_nonfinite_result(self):
report = compare_stresses(stress_rows("100.0"), stress_rows("nan"))
self.assertEqual(report["overall_result"], "fail")
self.assertEqual(report["quantities"][0]["classification"], "nonfinite-result")
def test_value_outside_tolerance_is_tolerance_failure(self):
report = compare_stresses(stress_rows("100.0"), stress_rows("101.0"))
self.assertEqual(report["overall_result"], "fail")
self.assertEqual(report["quantities"][0]["classification"], "tolerance-failure")
self.assertEqual(report["quantities"][0]["max_abs_error"], 1.0)
self.assertEqual(report["quantities"][0]["result"], "fail")
def test_cli_writes_json_report_and_returns_zero_when_all_quantities_pass(self):
compare = load_compare_extracted_csv()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
reference = root / "references" / "umat" / "single-element"
actual = root / "external-results" / "umat" / "single-element"
report_json = root / "build" / "reference-verification" / "umat-single-element.json"
payload = make_metadata_valid_for_artifact_validation(root, reference, metadata_payload())
write_json(reference / "metadata.json", payload)
write_csv(reference / "extracted" / "stresses.csv", stress_rows("100.0"))
write_csv(actual / "extracted" / "stresses.csv", stress_rows("100.00000001"))
exit_code = call_main_silently(
compare,
[
"--metadata",
str(reference / "metadata.json"),
"--actual-root",
str(actual),
"--quantity",
"stresses",
"--report-json",
str(report_json),
],
)
report = json.loads(report_json.read_text(encoding="utf-8"))
self.assertEqual(exit_code, 0)
self.assertEqual(report["overall_result"], "pass")
def test_cli_writes_json_report_and_returns_one_when_a_quantity_fails(self):
compare = load_compare_extracted_csv()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
reference = root / "references" / "umat" / "single-element"
actual = root / "external-results" / "umat" / "single-element"
report_json = root / "build" / "reference-verification" / "umat-single-element.json"
payload = make_metadata_valid_for_artifact_validation(root, reference, metadata_payload())
write_json(reference / "metadata.json", payload)
write_csv(reference / "extracted" / "stresses.csv", stress_rows("100.0"))
write_csv(actual / "extracted" / "stresses.csv", stress_rows("101.0"))
exit_code = call_main_silently(
compare,
[
"--metadata",
str(reference / "metadata.json"),
"--actual-root",
str(actual),
"--quantity",
"stresses",
"--report-json",
str(report_json),
],
)
report = json.loads(report_json.read_text(encoding="utf-8"))
self.assertEqual(exit_code, 1)
self.assertEqual(report["overall_result"], "fail")
def test_cli_returns_two_for_invalid_arguments(self):
compare = load_compare_extracted_csv()
self.assertEqual(call_main_silently(compare, []), 2)
def test_default_comparison_validates_reference_artifact_metadata(self):
compare = load_compare_extracted_csv()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
reference = root / "references" / "umat" / "single-element"
actual = root / "external-results" / "umat" / "single-element"
write_json(reference / "metadata.json", metadata_payload())
write_csv(reference / "extracted" / "stresses.csv", stress_rows("100.0"))
write_csv(actual / "extracted" / "stresses.csv", stress_rows("100.0"))
report = compare.compare_metadata(reference / "metadata.json", actual)
self.assertEqual(report["overall_result"], "fail")
self.assertEqual(report["quantities"][0]["classification"], "missing-reference-artifact")
if __name__ == "__main__":
unittest.main()
+107 -37
View File
@@ -19,6 +19,69 @@ def write_json(path: Path, payload: dict):
path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
def write_text(path: Path, text: str = "ok\n"):
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(text, encoding="utf-8")
def ready_metadata(source_hash: str) -> dict:
return {
"schema_version": "abaqus-user-subroutine-artifact-v1",
"feature_id": "umat",
"model_id": "single-element",
"artifact_status": "ready-for-comparison",
"abaqus": {"version": "2024", "precision": "double"},
"compiler": {"vendor": "Intel oneAPI", "name": "ifx", "version": "2024"},
"subroutine": {
"entry_points": ["UMAT"],
"source_files": [
{
"path": "src/fortran/abaqus/UMAT.for",
"language": "Fortran",
"sha256": source_hash,
}
],
},
"input_file": "model.inp",
"outputs": {
"tails": {
"msg": "job.msg.tail.txt",
"dat": "job.dat.tail.txt",
"log": "job.log.tail.txt",
"sta": "job.sta.tail.txt",
},
"csv": {"stresses": "extracted/stresses.csv"},
},
"extraction": {
"source_odb": "job.odb",
"tool": "Abaqus Python",
"extracted_at": "2026-06-10T00:00:00+09:00",
"csv_directory": "extracted",
"script": "extraction/extract_odb_to_csv.py",
},
}
def create_ready_bundle(root: Path) -> tuple[Path, dict]:
source = root / "src" / "fortran" / "abaqus" / "UMAT.for"
write_text(source, " subroutine umat()\n end\n")
source_hash = hashlib.sha256(source.read_bytes()).hexdigest()
model_dir = root / "references" / "umat" / "single-element"
for name in [
"model.inp",
"job.msg.tail.txt",
"job.dat.tail.txt",
"job.log.tail.txt",
"job.sta.tail.txt",
"extracted/stresses.csv",
"extraction/extract_odb_to_csv.py",
]:
write_text(model_dir / name)
return model_dir, ready_metadata(source_hash)
class ValidateReferenceArtifactsTests(unittest.TestCase):
def test_missing_references_directory_is_valid(self):
validator = load_validate_reference_artifacts()
@@ -52,7 +115,7 @@ class ValidateReferenceArtifactsTests(unittest.TestCase):
"feature_id": "umat",
"model_id": "single-element",
"artifact_status": "ready-for-comparison",
"abaqus": {"version": "2024", "precision": "double", "command": "abaqus job=case user=UMAT.for"},
"abaqus": {"version": "2024", "precision": "double"},
"compiler": {"vendor": "Intel oneAPI", "name": "ifx", "version": "2024"},
"subroutine": {
"entry_points": ["UMAT"],
@@ -61,7 +124,7 @@ class ValidateReferenceArtifactsTests(unittest.TestCase):
"input_file": "model.inp",
"outputs": {
"tails": {"msg": "job.msg.tail.txt", "dat": "job.dat.tail.txt", "log": "job.log.tail.txt"},
"csv": {"stresses": "stresses.csv"},
"csv": {"stresses": "extracted/stresses.csv"},
},
},
)
@@ -69,53 +132,60 @@ class ValidateReferenceArtifactsTests(unittest.TestCase):
errors = validator.validate_root(root)
self.assertTrue(any("missing input_file" in error for error in errors))
self.assertTrue(any("missing output tail" in error for error in errors))
self.assertTrue(any("missing output tail sta" in error for error in errors))
self.assertTrue(any("missing ready-for-comparison key extraction" in error for error in errors))
self.assertTrue(any("missing csv output" in error for error in errors))
self.assertTrue(any("missing source file" in error for error in errors))
def test_ready_for_comparison_accepts_external_bundle_without_abaqus_command(self):
validator = load_validate_reference_artifacts()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
model_dir, payload = create_ready_bundle(root)
payload["extraction"]["odb_sha256_file"] = "result.odb.sha256"
write_text(model_dir / "result.odb.sha256", "0" * 64 + " job.odb\n")
write_json(model_dir / "metadata.json", payload)
self.assertEqual(validator.validate_root(root), [])
def test_ready_for_comparison_checks_source_sha256(self):
validator = load_validate_reference_artifacts()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
source = root / "src" / "fortran" / "abaqus" / "UMAT.for"
source.parent.mkdir(parents=True)
source.write_text(" subroutine umat()\n end\n", encoding="utf-8")
source_hash = hashlib.sha256(source.read_bytes()).hexdigest()
model_dir = root / "references" / "umat" / "single-element"
for name in ["model.inp", "job.msg.tail.txt", "job.dat.tail.txt", "job.log.tail.txt", "stresses.csv"]:
(model_dir / name).parent.mkdir(parents=True, exist_ok=True)
(model_dir / name).write_text("ok\n", encoding="utf-8")
write_json(
model_dir / "metadata.json",
{
"schema_version": "abaqus-user-subroutine-artifact-v1",
"feature_id": "umat",
"model_id": "single-element",
"artifact_status": "ready-for-comparison",
"abaqus": {"version": "2024", "precision": "double", "command": "abaqus job=case user=UMAT.for"},
"compiler": {"vendor": "Intel oneAPI", "name": "ifx", "version": "2024"},
"subroutine": {
"entry_points": ["UMAT"],
"source_files": [
{
"path": "src/fortran/abaqus/UMAT.for",
"language": "Fortran",
"sha256": "0" * len(source_hash),
}
],
},
"input_file": "model.inp",
"outputs": {
"tails": {"msg": "job.msg.tail.txt", "dat": "job.dat.tail.txt", "log": "job.log.tail.txt"},
"csv": {"stresses": "stresses.csv"},
},
},
)
model_dir, payload = create_ready_bundle(root)
payload["subroutine"]["source_files"][0]["sha256"] = "0" * 64
write_json(model_dir / "metadata.json", payload)
errors = validator.validate_root(root)
self.assertTrue(any("sha256 mismatch" in error for error in errors))
def test_ready_for_comparison_rejects_csv_outside_extracted_directory(self):
validator = load_validate_reference_artifacts()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
model_dir, payload = create_ready_bundle(root)
payload["outputs"]["csv"] = {"stresses": "stresses.csv"}
write_text(model_dir / "stresses.csv")
write_json(model_dir / "metadata.json", payload)
errors = validator.validate_root(root)
self.assertTrue(any("csv output stresses must match extracted/*.csv" in error for error in errors))
def test_ready_for_comparison_checks_optional_odb_sha256_file(self):
validator = load_validate_reference_artifacts()
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
model_dir, payload = create_ready_bundle(root)
payload["extraction"]["odb_sha256_file"] = "result.odb.sha256"
write_text(model_dir / "result.odb.sha256", "not-a-sha\n")
write_json(model_dir / "metadata.json", payload)
errors = validator.validate_root(root)
self.assertTrue(any("invalid odb_sha256_file" in error for error in errors))
if __name__ == "__main__":
unittest.main()
+73 -2
View File
@@ -5,6 +5,7 @@ from __future__ import annotations
import hashlib
import json
import re
import sys
from pathlib import Path
@@ -12,6 +13,7 @@ from pathlib import Path
SCHEMA_VERSION = "abaqus-user-subroutine-artifact-v1"
VALID_STATUSES = {"draft", "needs-reference-artifacts", "ready-for-comparison", "blocked"}
READY_STATUS = "ready-for-comparison"
SHA256_RE = re.compile(r"^[0-9a-fA-F]{64}$")
def sha256_file(path: Path) -> str:
@@ -64,6 +66,70 @@ def _require_ready_key(path: Path, payload: dict, *keys: str) -> list[str]:
return []
def _is_safe_relative_path(path_text: str) -> bool:
candidate = Path(path_text)
return not candidate.is_absolute() and ".." not in candidate.parts
def _is_extracted_csv_path(path_text: str) -> bool:
candidate = Path(path_text)
return (
_is_safe_relative_path(path_text)
and len(candidate.parts) == 2
and candidate.parts[0] == "extracted"
and candidate.suffix.lower() == ".csv"
)
def _validate_optional_sha256_file(path: Path, model_dir: Path, key: str, value: object) -> list[str]:
if value is None:
return []
if not isinstance(value, str) or not value:
return [f"{path}: invalid {key}"]
if not _is_safe_relative_path(value):
return [f"{path}: {key} must be a relative path inside the artifact bundle"]
sha_path = model_dir / value
if not sha_path.exists():
return [f"{path}: missing {key}: {value}"]
first_token = sha_path.read_text(encoding="utf-8").strip().split(maxsplit=1)[0]
if not SHA256_RE.match(first_token):
return [f"{path}: invalid {key}: {value}"]
return []
def _validate_extraction(path: Path, model_dir: Path, payload: dict) -> list[str]:
errors: list[str] = []
extraction = payload.get("extraction")
if not isinstance(extraction, dict):
return [f"{path}: extraction provenance must be an object"]
for key in ("source_odb", "tool", "extracted_at", "csv_directory"):
if not extraction.get(key):
errors.append(f"{path}: missing extraction provenance key {key}")
csv_directory = extraction.get("csv_directory")
if isinstance(csv_directory, str) and csv_directory != "extracted":
errors.append(f"{path}: extraction.csv_directory must be extracted")
script = extraction.get("script")
if script is not None:
if not isinstance(script, str) or not script:
errors.append(f"{path}: invalid extraction script")
elif not _is_safe_relative_path(script):
errors.append(f"{path}: extraction script must be a relative path inside the artifact bundle")
elif not (model_dir / script).exists():
errors.append(f"{path}: missing extraction script: {script}")
odb_sha256 = extraction.get("odb_sha256")
if odb_sha256 is not None and (not isinstance(odb_sha256, str) or not SHA256_RE.match(odb_sha256)):
errors.append(f"{path}: invalid odb_sha256")
errors.extend(_validate_optional_sha256_file(path, model_dir, "odb_sha256_file", extraction.get("odb_sha256_file")))
return errors
def _validate_ready_files(path: Path, root: Path, payload: dict) -> list[str]:
errors: list[str] = []
model_dir = path.parent
@@ -71,7 +137,6 @@ def _validate_ready_files(path: Path, root: Path, payload: dict) -> list[str]:
for keys in (
("abaqus", "version"),
("abaqus", "precision"),
("abaqus", "command"),
("compiler", "vendor"),
("compiler", "name"),
("compiler", "version"),
@@ -80,6 +145,7 @@ def _validate_ready_files(path: Path, root: Path, payload: dict) -> list[str]:
("input_file",),
("outputs", "tails"),
("outputs", "csv"),
("extraction",),
):
errors.extend(_require_ready_key(path, payload, *keys))
@@ -89,7 +155,7 @@ def _validate_ready_files(path: Path, root: Path, payload: dict) -> list[str]:
tails = _nested(payload, "outputs", "tails")
if isinstance(tails, dict):
for key in ("msg", "dat", "log"):
for key in ("msg", "dat", "log", "sta"):
tail_path = tails.get(key)
if not isinstance(tail_path, str) or not tail_path:
errors.append(f"{path}: missing output tail {key}")
@@ -103,9 +169,14 @@ def _validate_ready_files(path: Path, root: Path, payload: dict) -> list[str]:
for key, csv_path in csv_outputs.items():
if not isinstance(csv_path, str) or not csv_path:
errors.append(f"{path}: missing csv output {key}")
elif not _is_extracted_csv_path(csv_path):
errors.append(f"{path}: csv output {key} must match extracted/*.csv")
elif not (model_dir / csv_path).exists():
errors.append(f"{path}: missing csv output {key}: {csv_path}")
if "extraction" in payload:
errors.extend(_validate_extraction(path, model_dir, payload))
source_files = _nested(payload, "subroutine", "source_files")
if isinstance(source_files, list):
if not source_files: