"""Catalog ↔ partial ↔ builder invariant audit CLI (IMP-#85 u3a / u3b).

Offline audit of `templates/phase_z2/catalog/frame_contracts.yaml` against
the on-disk frame partials and the runtime `PAYLOAD_BUILDERS` registry.
Reports diff surface so first-fix iteration sees the entire catalog drift,
not just the first failure (matches the boot-time invariant's aggregation
behavior in `_check_catalog_builder_invariant`).

Invariants (scope-locked per Stage 2):

  I1 partial existence — `templates/phase_z2/families/{template_id}.html`
     must exist for live (non-VP) contracts.
  I2 builder declared — live contracts must declare a non-empty
     `payload.builder`.
  I3 builder registered — declared builders must be members of
     `src.phase_z2_mapper.PAYLOAD_BUILDERS`.
  I4 slot_payload refs — every key generated by the contract's builder
     must appear as a `slot_payload.{key}` reference in the partial.
     Direction A only (dead generated key). Skipped when the partial uses
     dynamic bracket access (`slot_payload[...]`) — those refs cannot be
     resolved statically; the relevant generated keys are presumed
     reachable via the dynamic form.

`visual_pending: true` contracts are skipped for I1–I4 (data-driven from
catalog, no hard-coded frame allow-list; matches u2 invariant scope).

Exit codes:
  0 — all invariants pass on live (non-VP) contracts.
  1 — one or more violations reported.

Usage::

    python scripts/audit_frame_invariants.py
    python scripts/audit_frame_invariants.py --catalog CATALOG_YAML --partials-dir PARTIALS_DIR
"""
from __future__ import annotations

import argparse
import re
import sys
from pathlib import Path
from typing import Iterable

# Make the repository root importable so `src.phase_z2_mapper` resolves when
# this file is executed directly as a script.
REPO_ROOT = Path(__file__).resolve().parent.parent
if str(REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(REPO_ROOT))

DEFAULT_CATALOG_PATH = (
    REPO_ROOT / "templates" / "phase_z2" / "catalog" / "frame_contracts.yaml"
)
DEFAULT_PARTIALS_DIR = REPO_ROOT / "templates" / "phase_z2" / "families"


def _format_path(path: Path) -> str:
    """Return *path* relative to the repo root when possible, else unchanged."""
    try:
        return str(path.relative_to(REPO_ROOT))
    except ValueError:
        # `relative_to` raises when *path* is not located under REPO_ROOT.
        return str(path)


def _is_visual_pending(contract: dict) -> bool:
    """True only when the contract sets `visual_pending` to the literal `True`."""
    return contract.get("visual_pending") is True


def _iter_live_contracts(catalog: dict) -> Iterable[tuple[str, dict]]:
    """Yield `(template_id, contract)` for every audited catalog entry.

    Entries whose value is not a dict, and entries flagged
    `visual_pending: true`, are skipped.
    """
    for template_id, contract in catalog.items():
        if not isinstance(contract, dict):
            continue
        if _is_visual_pending(contract):
            continue
        yield template_id, contract


def check_i1_partial_existence(
    catalog: dict, partials_dir: Path
) -> list[str]:
    """I1 — Live contracts must have `families/{template_id}.html` on disk.

    Args:
        catalog: Parsed catalog mapping template ids to contract dicts.
        partials_dir: Directory expected to contain `{template_id}.html`.

    Returns:
        One violation message per live contract whose partial is missing.
    """
    violations: list[str] = []
    for template_id, _contract in _iter_live_contracts(catalog):
        partial_path = partials_dir / f"{template_id}.html"
        if not partial_path.is_file():
            violations.append(
                f"I1 partial-missing: contract '{template_id}' has no "
                f"partial file at {_format_path(partial_path)}."
            )
    return violations


def check_i2_builder_declared(catalog: dict) -> list[str]:
    """I2 — Live contracts must declare a non-empty `payload.builder`.

    Args:
        catalog: Parsed catalog mapping template ids to contract dicts.

    Returns:
        One violation message per live contract whose `payload` is not a
        dict or whose `payload.builder` is missing/empty.
    """
    violations: list[str] = []
    for template_id, contract in _iter_live_contracts(catalog):
        # A missing or falsy payload is normalised to {} and then reported
        # below as a missing builder.
        payload = contract.get("payload") or {}
        if not isinstance(payload, dict):
            violations.append(
                f"I2 builder-undeclared: contract '{template_id}' has "
                f"non-dict payload (type={type(payload).__name__})."
            )
            continue
        builder_name = payload.get("builder")
        if not builder_name:
            violations.append(
                f"I2 builder-undeclared: contract '{template_id}' is "
                f"missing payload.builder."
            )
    return violations


def check_i3_builder_registered(
    catalog: dict, registered_builders: set[str]
) -> list[str]:
    """I3 — Declared builders must be members of PAYLOAD_BUILDERS registry.

    Contracts with a non-dict payload or without a builder are skipped here;
    I2 reports those.

    Args:
        catalog: Parsed catalog mapping template ids to contract dicts.
        registered_builders: Names of the registered payload builders.

    Returns:
        One violation message per live contract whose declared builder is
        not a registered name.
    """
    violations: list[str] = []
    for template_id, contract in _iter_live_contracts(catalog):
        payload = contract.get("payload") or {}
        if not isinstance(payload, dict):
            continue
        builder_name = payload.get("builder")
        if not builder_name:
            continue
        # A non-string builder (e.g. a YAML list or mapping) can never match
        # a registry name. Check the type first: testing an unhashable value
        # for set membership raises TypeError, which would abort the audit
        # instead of reporting the drift.
        if (
            not isinstance(builder_name, str)
            or builder_name not in registered_builders
        ):
            violations.append(
                f"I3 builder-unregistered: contract '{template_id}' "
                f"references payload.builder='{builder_name}' not in "
                f"PAYLOAD_BUILDERS."
            )
    return violations


# Static attribute access, e.g. `slot_payload.title`; captures the key name.
_SLOT_PAYLOAD_DOT_RE = re.compile(r"slot_payload\.([A-Za-z_][A-Za-z0-9_]*)")
# Any bracket subscript, e.g. `slot_payload[key]`.
_SLOT_PAYLOAD_BRACKET_RE = re.compile(r"slot_payload\s*\[")


def extract_static_slot_refs(partial_text: str) -> set[str]:
    """Return the set of keys referenced as `slot_payload.{key}` dot access."""
    return set(_SLOT_PAYLOAD_DOT_RE.findall(partial_text))


def partial_uses_dynamic_slot_access(partial_text: str) -> bool:
    """True if the partial dereferences `slot_payload[...]` (dynamic key)."""
    return bool(_SLOT_PAYLOAD_BRACKET_RE.search(partial_text))


def expected_payload_keys(contract: dict) -> set[str]:
    """Statically compute the set of payload keys the contract's builder produces.

    Mirrors `src.phase_z2_mapper`'s registered builders (IMP-#85 u3b).
    Returns an empty set when the builder is unknown — I3 already flags
    that drift.

    Args:
        contract: A single contract dict from the catalog.

    Returns:
        The payload key names derived from `payload.title`,
        `payload.builder` and `payload.builder_options`.
    """
    payload = contract.get("payload") or {}
    if not isinstance(payload, dict):
        return set()

    keys: set[str] = set()

    # A title spec with a truthy `source` contributes the `title` key,
    # independently of which builder is declared.
    title_spec = payload.get("title")
    if isinstance(title_spec, dict) and title_spec.get("source"):
        keys.add("title")

    builder = payload.get("builder")
    options = payload.get("builder_options") or {}
    if not isinstance(options, dict):
        options = {}

    if builder == "items_with_role":
        array_root = options.get("array_root")
        if array_root:
            keys.add(array_root)
    elif builder == "process_product_pair":
        for col in options.get("columns") or []:
            if not isinstance(col, dict):
                continue
            if col.get("title_to"):
                keys.add(col["title_to"])
            if col.get("body_to"):
                keys.add(col["body_to"])
    elif builder == "quadrant_flat_slots":
        pad_to = int(options.get("pad_to", 4))
        label_key = options.get("label_key_pattern", "quadrant_{n}_label")
        body_key = options.get("body_key_pattern", "quadrant_{n}_body")
        for n in range(1, pad_to + 1):
            keys.add(label_key.format(n=n))
            keys.add(body_key.format(n=n))
    elif builder == "cycle_intersect_3":
        pad_to = int(options.get("pad_to", 3))
        label_key = options.get("label_key_pattern", "circle_{n}_label")
        for n in range(1, pad_to + 1):
            keys.add(label_key.format(n=n))
        keys.add("intersection")
    elif builder == "compare_table_2col":
        keys.update({"col_a_label", "col_b_label", "rows"})
    elif builder == "paired_rows_4x2_slots":
        label_key = options.get("label_key_pattern", "row_{r}_{side}_label")
        body_key = options.get("body_key_pattern", "row_{r}_{side}_body")
        rows = int(options.get("rows", 4))
        sides = options.get("sides", ["left", "right"]) or []
        for r in range(1, rows + 1):
            for side in sides:
                keys.add(label_key.format(r=r, side=side))
                keys.add(body_key.format(r=r, side=side))

    return keys


def check_i4_slot_payload_refs(
    catalog: dict,
    partials_dir: Path,
    registered_builders: set[str],
) -> list[str]:
    """I4 — every generated payload key must be referenced by the partial.

    Direction A only (dead key). Skipped when the partial uses dynamic
    bracket access (`slot_payload[...]`) — generated keys are presumed
    reached via the dynamic form and cannot be resolved statically.
    Contracts already failing I1 (missing partial) or I3 (unregistered
    builder) are skipped so the same drift is not double-reported.

    Args:
        catalog: Parsed catalog mapping template ids to contract dicts.
        partials_dir: Directory expected to contain `{template_id}.html`.
        registered_builders: Names of the registered payload builders.

    Returns:
        One violation message per generated key that the partial never
        references, in sorted key order within each contract.
    """
    violations: list[str] = []
    for template_id, contract in _iter_live_contracts(catalog):
        payload = contract.get("payload") or {}
        if not isinstance(payload, dict):
            continue
        builder_name = payload.get("builder")
        # Non-string builders are reported by I3; the type check also keeps
        # unhashable values away from the set-membership test below.
        if (
            not builder_name
            or not isinstance(builder_name, str)
            or builder_name not in registered_builders
        ):
            continue
        partial_path = partials_dir / f"{template_id}.html"
        if not partial_path.is_file():
            continue
        partial_text = partial_path.read_text(encoding="utf-8")
        if partial_uses_dynamic_slot_access(partial_text):
            continue
        static_refs = extract_static_slot_refs(partial_text)
        expected = expected_payload_keys(contract)
        orphans = sorted(expected - static_refs)
        for key in orphans:
            violations.append(
                f"I4 generated-key-orphan: contract '{template_id}' builder "
                f"'{builder_name}' produces payload key '{key}' but partial "
                f"never references slot_payload.{key}."
            )
    return violations


def run_audit(
    catalog_path: Path = DEFAULT_CATALOG_PATH,
    partials_dir: Path = DEFAULT_PARTIALS_DIR,
) -> list[str]:
    """Load catalog + registry and aggregate I1-I4 violations.

    Registry is imported here (not at module import) so the script can be
    inspected without triggering the boot-time catalog invariant. PyYAML is
    imported here as well, so the pure check helpers in this module stay
    importable in environments where PyYAML is not installed.

    Args:
        catalog_path: Path to the catalog YAML file.
        partials_dir: Directory containing the frame partials.

    Returns:
        All violation messages, grouped in I1, I2, I3, I4 order.

    Raises:
        ValueError: If the catalog's top-level YAML node is not a mapping.
    """
    import yaml

    from src.phase_z2_mapper import PAYLOAD_BUILDERS

    # An empty YAML document loads as None; treat it as an empty catalog.
    catalog = yaml.safe_load(catalog_path.read_text(encoding="utf-8")) or {}
    if not isinstance(catalog, dict):
        # Fail with an explicit message instead of an AttributeError on
        # `.items()` deep inside the first check.
        raise ValueError(
            f"Catalog {_format_path(catalog_path)} must contain a mapping of "
            f"template ids to contracts at its top level, got "
            f"{type(catalog).__name__}."
        )
    registered = set(PAYLOAD_BUILDERS.keys())

    violations: list[str] = []
    violations.extend(check_i1_partial_existence(catalog, partials_dir))
    violations.extend(check_i2_builder_declared(catalog))
    violations.extend(check_i3_builder_registered(catalog, registered))
    violations.extend(check_i4_slot_payload_refs(catalog, partials_dir, registered))
    return violations


def main(argv: list[str] | None = None) -> int:
    """Parse CLI arguments, run the audit and print a PASS/FAIL report.

    Args:
        argv: Argument list to parse; `None` makes argparse read `sys.argv`.

    Returns:
        Process exit code: 0 when no violations were found, 1 otherwise.
    """
    parser = argparse.ArgumentParser(
        description="Audit Phase Z-2 catalog ↔ partials ↔ builder registry."
    )
    parser.add_argument(
        "--catalog",
        type=Path,
        default=DEFAULT_CATALOG_PATH,
        help="Path to frame_contracts.yaml",
    )
    parser.add_argument(
        "--partials-dir",
        type=Path,
        default=DEFAULT_PARTIALS_DIR,
        help="Directory containing families/{template_id}.html partials",
    )
    args = parser.parse_args(argv)

    violations = run_audit(args.catalog, args.partials_dir)
    if not violations:
        print("audit_frame_invariants: PASS (I1-I4 clean on live contracts).")
        return 0

    print(
        f"audit_frame_invariants: FAIL ({len(violations)} violation(s)):"
    )
    for v in violations:
        print(f" - {v}")
    return 1


if __name__ == "__main__":
    sys.exit(main())