Files
C.E.L_Slide_test2/tests/integration/test_multi_mdx_regression.py
kyeongmin c59864eb9a
Some checks failed
Multi-MDX Regression (IMP-91) / multi-mdx-regression (push) Failing after 31s
feat(#91): IMP-91 u2~u15 multi-mdx regression CI suite + status-board auto-update
- u2~u5: tests/integration/test_multi_mdx_regression.py — MDX_SET=(01..05)
  cached integration runs + status/structural/visual snapshots +
  full_mdx_coverage assertion (9 snapshots populated for 01-05).
- u6~u11: F0 normalize / F1 V4 ranking / F2 slot_payload /
  F3 classifier-only AI / F4 layout / F5 final.html axis per MDX_SET.
- u12: pyproject.toml — pytest-json-report>=1.5 in dev extras.
- u13: .github/workflows/multi-mdx-regression.yml — pytest+artifact CI.
- u14: scripts/update_status_board.py + tests/scripts/test_update_status_board.py
  — idempotent JSON marker updater (3 unit tests pass).
- u15: PHASE-Z-PIPELINE-STATUS-BOARD.md — 30 F0-F5 × mdx01-05 markers
  initialized `?` + workflow wiring.

Stage 4 verify: 59/59 PASS targeted (smoke 6 + updater 3 + integration 50),
386/386 PASS regression umbrella, 0 failures.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-24 07:01:58 +09:00

574 lines
24 KiB
Python

"""IMP-#91 u2 — multi-mdx regression CI scaffold (mdx 01-05 acceptance set).
Session-scoped subprocess cache that runs each MDX acceptance fixture
exactly once. u3-u11 extend this module with per-axis assertions
(structural / visual / coverage / F0-F5). u2 alone pins the cache
contract: each mdx in ``MDX_SET`` produces a run directory under
``data/runs/<run_id>/phase_z2/`` containing the step JSONs and
``final.html`` that downstream parametrized tests will read.
[[feedback_validation_first_for_closed_issues]] — fresh subprocess per
session, no frozen artifacts. [[feedback_artifact_status_naming]] — the
overall status (PASS / RENDERED_WITH_VISUAL_REGRESSION /
PARTIAL_COVERAGE / EMPTY_SHELL_NO_CONTENT) is asserted in u3-u5; u2
only pins the artifact-production contract.
"""
from __future__ import annotations
import json
import re
import subprocess
import sys
import uuid
from pathlib import Path
from typing import Dict, List, NamedTuple
import pytest
REPO_ROOT = Path(__file__).resolve().parents[2]
SAMPLES_DIR = REPO_ROOT / "samples" / "mdx_batch"
RUNS_DIR = REPO_ROOT / "data" / "runs"
SNAPSHOTS_DIR = Path(__file__).resolve().parent / "__snapshots__"
MDX_SET = ("01", "02", "03", "04", "05")
class PipelineRun(NamedTuple):
mdx_id: str
run_id: str
returncode: int
stdout: str
stderr: str
run_dir: Path
@pytest.fixture(scope="session")
def multi_mdx_runs() -> Dict[str, PipelineRun]:
"""Run the Phase Z pipeline once per mdx in ``MDX_SET`` (session-cached)."""
cache: Dict[str, PipelineRun] = {}
for mdx_id in MDX_SET:
run_id = f"imp91_{mdx_id}_{uuid.uuid4().hex[:8]}"
cp = subprocess.run(
[
sys.executable,
"-m",
"src.phase_z2_pipeline",
str(SAMPLES_DIR / f"{mdx_id}.mdx"),
run_id,
],
capture_output=True,
text=True,
timeout=360,
cwd=str(REPO_ROOT),
)
cache[mdx_id] = PipelineRun(
mdx_id=mdx_id,
run_id=run_id,
returncode=cp.returncode,
stdout=cp.stdout,
stderr=cp.stderr,
run_dir=RUNS_DIR / run_id / "phase_z2",
)
return cache
@pytest.mark.integration
@pytest.mark.parametrize("mdx_id", MDX_SET)
def test_pipeline_run_produces_step20_status(
mdx_id: str, multi_mdx_runs: Dict[str, PipelineRun]
) -> None:
"""Cache contract: every mdx subprocess produces step20_slide_status.json."""
run = multi_mdx_runs[mdx_id]
status_path = run.run_dir / "steps" / "step20_slide_status.json"
assert status_path.is_file(), (
f"{mdx_id}.mdx run {run.run_id} did not produce {status_path} "
f"(returncode={run.returncode}); stderr tail: {run.stderr[-800:]}"
)
@pytest.mark.integration
@pytest.mark.parametrize("mdx_id", MDX_SET)
def test_structural_snapshot_matches(
mdx_id: str, multi_mdx_runs: Dict[str, PipelineRun]
) -> None:
"""u3 — pin observed overall + per-zone selected_template_id against snapshot."""
snapshot = json.loads((SNAPSHOTS_DIR / "structural.json").read_text(encoding="utf-8"))
expected = snapshot[mdx_id]
run = multi_mdx_runs[mdx_id]
status = json.loads(
(run.run_dir / "steps" / "step20_slide_status.json").read_text(encoding="utf-8")
)["data"]
frame_sel = json.loads(
(run.run_dir / "steps" / "step09_frame_selection.json").read_text(encoding="utf-8")
)["data"]
zones = frame_sel.get("per_zone", [])
actual_zones = [
{"position": z.get("position"), "selected_template_id": z.get("selected_template_id")}
for z in zones
]
assert status.get("overall") == expected["overall"], (
f"{mdx_id}.mdx overall drift: expected {expected['overall']!r}, "
f"got {status.get('overall')!r}"
)
assert len(actual_zones) == expected["zone_count"], (
f"{mdx_id}.mdx zone_count drift: expected {expected['zone_count']}, "
f"got {len(actual_zones)} (zones={actual_zones})"
)
assert actual_zones == expected["zones"], (
f"{mdx_id}.mdx zone topology drift: expected {expected['zones']}, "
f"got {actual_zones}"
)
@pytest.mark.integration
@pytest.mark.parametrize("mdx_id", MDX_SET)
def test_visual_snapshot_matches(
mdx_id: str, multi_mdx_runs: Dict[str, PipelineRun]
) -> None:
"""u4 — pin observed step14 visual_check overflow/clip against snapshot."""
snapshot = json.loads((SNAPSHOTS_DIR / "visual.json").read_text(encoding="utf-8"))
expected = snapshot[mdx_id]
run = multi_mdx_runs[mdx_id]
visual = json.loads(
(run.run_dir / "steps" / "step14_visual_check.json").read_text(encoding="utf-8")
)["data"]
slide_overflowed = visual.get("slide", {}).get("overflowed")
slide_body_overflowed = visual.get("slide_body", {}).get("overflowed")
visual_passed = visual.get("passed")
actual_zones = [
{
"position": z.get("position"),
"template_id": z.get("template_id"),
"overflowed": z.get("overflowed"),
"clipped_inner_count": len(z.get("clipped_inner") or []),
}
for z in visual.get("zones", [])
]
assert slide_overflowed == expected["slide_overflowed"], (
f"{mdx_id}.mdx slide.overflowed drift: expected {expected['slide_overflowed']}, "
f"got {slide_overflowed}"
)
assert slide_body_overflowed == expected["slide_body_overflowed"], (
f"{mdx_id}.mdx slide_body.overflowed drift: expected {expected['slide_body_overflowed']}, "
f"got {slide_body_overflowed}"
)
assert visual_passed == expected["passed"], (
f"{mdx_id}.mdx visual_check.passed drift: expected {expected['passed']}, "
f"got {visual_passed}"
)
assert actual_zones == expected["zones"], (
f"{mdx_id}.mdx zone visual drift: expected {expected['zones']}, "
f"got {actual_zones}"
)
@pytest.mark.integration
@pytest.mark.parametrize("mdx_id", MDX_SET)
def test_coverage_snapshot_matches(
mdx_id: str, multi_mdx_runs: Dict[str, PipelineRun]
) -> None:
"""u5 — pin observed full_mdx_coverage + section_id parity against snapshot."""
snapshot = json.loads((SNAPSHOTS_DIR / "coverage.json").read_text(encoding="utf-8"))
expected = snapshot[mdx_id]
run = multi_mdx_runs[mdx_id]
status = json.loads(
(run.run_dir / "steps" / "step20_slide_status.json").read_text(encoding="utf-8")
)["data"]
assert status.get("rendered") == expected["rendered"], (
f"{mdx_id}.mdx rendered drift: expected {expected['rendered']}, "
f"got {status.get('rendered')}"
)
assert status.get("visual_check_passed") == expected["visual_check_passed"], (
f"{mdx_id}.mdx visual_check_passed drift: expected {expected['visual_check_passed']}, "
f"got {status.get('visual_check_passed')}"
)
assert status.get("full_mdx_coverage") == expected["full_mdx_coverage"], (
f"{mdx_id}.mdx full_mdx_coverage drift: expected {expected['full_mdx_coverage']}, "
f"got {status.get('full_mdx_coverage')}"
)
assert sorted(status.get("aligned_section_ids") or []) == sorted(expected["aligned_section_ids"]), (
f"{mdx_id}.mdx aligned_section_ids drift: expected {expected['aligned_section_ids']}, "
f"got {status.get('aligned_section_ids')}"
)
assert sorted(status.get("covered_section_ids") or []) == sorted(expected["covered_section_ids"]), (
f"{mdx_id}.mdx covered_section_ids drift: expected {expected['covered_section_ids']}, "
f"got {status.get('covered_section_ids')}"
)
assert sorted(status.get("filtered_section_ids") or []) == sorted(expected["filtered_section_ids"]), (
f"{mdx_id}.mdx filtered_section_ids drift: expected {expected['filtered_section_ids']}, "
f"got {status.get('filtered_section_ids')}"
)
@pytest.mark.integration
@pytest.mark.parametrize("mdx_id", MDX_SET)
def test_normalize_snapshot_matches(
mdx_id: str, multi_mdx_runs: Dict[str, PipelineRun]
) -> None:
"""u6 — F0 normalize: pin observed step02_normalized shape per mdx."""
snapshot = json.loads((SNAPSHOTS_DIR / "normalize.json").read_text(encoding="utf-8"))
expected = snapshot[mdx_id]
run = multi_mdx_runs[mdx_id]
raw = json.loads(
(run.run_dir / "steps" / "step02_normalized.json").read_text(encoding="utf-8")
)
d = raw["data"]
diag = d.get("stage0_adapter_diagnostics", {}) or {}
assets = d.get("stage0_normalized_assets", {}) or {}
actual = {
"step_num": raw.get("step_num"),
"step_status": raw.get("step_status"),
"pipeline_path_connected": raw.get("pipeline_path_connected"),
"sections_count": d.get("sections_count"),
"section_ids": [s.get("section_id") for s in d.get("sections", [])],
"orphans_count": len(d.get("orphans") or []),
"details_count": len(d.get("details") or []),
"adapter_enabled": diag.get("enabled"),
"adapter_used": diag.get("used"),
"assets_popups_count": len(assets.get("popups") or []),
"assets_images_count": len(assets.get("images") or []),
"assets_tables_count": len(assets.get("tables") or []),
"slide_title_nonempty": bool(d.get("slide_title")),
"slide_footer_nonempty": bool(d.get("slide_footer")),
}
for key, want in expected.items():
got = actual[key]
assert got == want, (
f"{mdx_id}.mdx normalize.{key} drift: expected {want!r}, got {got!r}"
)
assert len(d.get("sections", [])) == expected["sections_count"], (
f"{mdx_id}.mdx sections list length mismatch with sections_count: "
f"sections_count={expected['sections_count']}, got len(sections)={len(d.get('sections', []))}"
)
for sect in d.get("sections", []):
assert (sect.get("raw_content_length") or 0) > 0, (
f"{mdx_id}.mdx section {sect.get('section_id')!r} has empty raw_content "
f"(length={sect.get('raw_content_length')!r}) — normalize lost content"
)
@pytest.mark.integration
@pytest.mark.parametrize("mdx_id", MDX_SET)
def test_v4_ranking_snapshot_matches(
mdx_id: str, multi_mdx_runs: Dict[str, PipelineRun]
) -> None:
"""u7 — F1 V4 ranking: pin observed step05_v4_evidence per mdx.
Pins ``v4_source`` (POSIX-normalized for cross-platform stability),
``aligned_section_ids``, and per-section
``{section_id, candidate_status, candidates: [{template_id, label, confidence}]}``
in pipeline-emitted order. Confidence stays at the current 4-decimal
rounding emitted by the V4 yaml; drift any axis fails loudly so a
re-baseline is a conscious commit, not a silent shift.
"""
snapshot = json.loads((SNAPSHOTS_DIR / "v4_ranking.json").read_text(encoding="utf-8"))
expected = snapshot[mdx_id]
run = multi_mdx_runs[mdx_id]
raw = json.loads(
(run.run_dir / "steps" / "step05_v4_evidence.json").read_text(encoding="utf-8")
)
data = raw["data"]
actual_v4_source = str(data.get("v4_source") or "").replace("\\", "/")
actual_sections = [
{
"section_id": ev.get("section_id"),
"candidate_status": ev.get("candidate_status"),
"candidates": [
{
"template_id": c.get("template_id"),
"label": c.get("label"),
"confidence": c.get("confidence"),
}
for c in (ev.get("v4_candidates") or [])
],
}
for ev in (data.get("evidence_per_section") or [])
]
assert actual_v4_source == expected["v4_source"], (
f"{mdx_id}.mdx v4_source drift: expected {expected['v4_source']!r}, "
f"got {actual_v4_source!r}"
)
assert data.get("aligned_section_ids") == expected["aligned_section_ids"], (
f"{mdx_id}.mdx aligned_section_ids drift: expected {expected['aligned_section_ids']}, "
f"got {data.get('aligned_section_ids')}"
)
assert actual_sections == expected["sections"], (
f"{mdx_id}.mdx V4 ranking drift: expected {expected['sections']}, "
f"got {actual_sections}"
)
def _slot_payload_zone_shape(zone: dict) -> dict:
"""Reduce a step12 per_zone entry to a content-agnostic structural shape.
Pins builder + slot names + per-slot list cardinality + dict sub-list
counts + string non-empty flags. MDX text edits don't drift this; a
builder swap, slot rename, missing slot, or list-cardinality change
does. Sub-dict shape pins ``sections`` length only — deeper field
pinning would require a fresh u8'-axis snapshot.
"""
sp = zone.get("slot_payload") or {}
slot_names = sorted(sp.keys())
list_slot_counts: dict = {}
dict_slot_sub_counts: dict = {}
string_slot_nonempty: dict = {}
for name in slot_names:
value = sp[name]
if isinstance(value, list):
list_slot_counts[name] = len(value)
elif isinstance(value, dict):
sub: dict = {}
for sub_key, sub_val in value.items():
if isinstance(sub_val, list):
sub[sub_key] = len(sub_val)
dict_slot_sub_counts[name] = sub
elif isinstance(value, str):
string_slot_nonempty[name] = bool(value.strip())
return {
"position": zone.get("position"),
"template_id": zone.get("template_id"),
"builder": zone.get("builder"),
"slot_names": slot_names,
"list_slot_counts": list_slot_counts,
"dict_slot_sub_counts": dict_slot_sub_counts,
"string_slot_nonempty": string_slot_nonempty,
}
_AI_UNIT_KEYS = (
"source_section_ids", "label", "route_hint", "provisional",
"ai_called", "skip_reason", "apply_status",
)
@pytest.mark.integration
@pytest.mark.parametrize("mdx_id", MDX_SET)
def test_ai_classifier_snapshot_matches(
mdx_id: str, multi_mdx_runs: Dict[str, PipelineRun]
) -> None:
"""u9 — F3 classifier-only AI: pin step12/15/16/18 classifier signals.
[[feedback_ai_isolation_contract]] / [[feedback_demo_env_toggle_policy]]
central invariant: ``ai_called`` MUST stay False per unit by default;
activation requires explicit .env toggle, never pipeline default.
"""
snapshot = json.loads((SNAPSHOTS_DIR / "ai_classifier.json").read_text(encoding="utf-8"))
expected = snapshot[mdx_id]
steps = multi_mdx_runs[mdx_id].run_dir / "steps"
ai = json.loads((steps / "step12_ai_repair.json").read_text(encoding="utf-8"))["data"]
fit = json.loads((steps / "step15_fit_classification.json").read_text(encoding="utf-8"))["data"]
router = json.loads((steps / "step16_router_decision.json").read_text(encoding="utf-8"))["data"]
failure = json.loads((steps / "step18_failure_classification.json").read_text(encoding="utf-8"))["data"]
units = [{k: u.get(k) for k in _AI_UNIT_KEYS} for u in (ai.get("per_unit") or [])]
actual = {
"units": units,
"coverage_invariant_status": (ai.get("coverage_invariant") or {}).get("status"),
"fit_visual_check_passed": fit.get("visual_check_passed"),
"fit_classifications_count": len(fit.get("classifications") or []),
"fit_categories_seen": fit.get("categories_seen") or [],
"router_active": router.get("router_active"),
"router_routed_count": router.get("routed_count"),
"router_v4_fallback_used_count": (router.get("v4_fallback_summary") or {}).get("fallback_used_count"),
"failure_type": failure.get("failure_type"),
}
for key, want in expected.items():
assert actual[key] == want, (
f"{mdx_id}.mdx ai_classifier.{key} drift: expected {want!r}, got {actual[key]!r}"
)
breaches = [u for u in units if u["ai_called"] is not False]
assert not breaches, (
f"{mdx_id}.mdx F3 AI-isolation breach (ai_called must be False by default): {breaches}"
)
def _layout_zone_shape(zone: dict) -> dict:
"""Reduce a step08 per_zone_plan entry to a content-agnostic F4 layout shape."""
sub_zones = zone.get("sub_zones_planned") or []
return {
"position": zone.get("position"),
"min_height_px": zone.get("min_height_px"),
"frame_cardinality_strict": zone.get("frame_cardinality_strict"),
"sub_zones_count": len(sub_zones),
"region_layout_candidates": zone.get("region_layout_candidates") or [],
}
@pytest.mark.integration
@pytest.mark.parametrize("mdx_id", MDX_SET)
def test_layout_snapshot_matches(
mdx_id: str, multi_mdx_runs: Dict[str, PipelineRun]
) -> None:
"""u10 — F4 layout: pin step07_layout + step08_zone_region_ratios per mdx.
Pins the layout decision path (``layout_preset`` /
``auto_layout_preset`` / ``layout_override_applied`` /
``layout_candidates`` / ``computation``) + planning geometry
(``heights_px`` / ``widths_px`` / ``ratios`` / ``width_ratios``) +
per-zone planning shape (``position`` / ``min_height_px`` /
``frame_cardinality_strict`` / ``sub_zones_count`` /
``region_layout_candidates``). ``step_status='partial'`` is the
Step 7/8 schema-lock marker (region-level ratio + count-based v0).
mdx 03 is the only ``layout_override_applied=True`` case (vertical-2
user override per project_mdx03_frame_lock 2026-05-15 lock); drift
here flips F4 layer-A axis. mdx 04 ``top`` zone pins ``None`` for
min_height_px + frame_cardinality_strict (no frame cardinality on
the top zone — observed current state, not invented).
"""
snapshot = json.loads((SNAPSHOTS_DIR / "layout.json").read_text(encoding="utf-8"))
expected = snapshot[mdx_id]
run = multi_mdx_runs[mdx_id]
s7 = json.loads(
(run.run_dir / "steps" / "step07_layout.json").read_text(encoding="utf-8")
)
s8 = json.loads(
(run.run_dir / "steps" / "step08_zone_region_ratios.json").read_text(encoding="utf-8")
)
d7 = s7.get("data") or {}
d8 = s8.get("data") or {}
css = d7.get("layout_css") or {}
actual = {
"step7_step_status": s7.get("step_status"),
"step7_pipeline_path_connected": s7.get("pipeline_path_connected"),
"layout_preset": d7.get("layout_preset"),
"auto_layout_preset": d7.get("auto_layout_preset"),
"layout_override_applied": d7.get("layout_override_applied"),
"zones_count": d7.get("zones_count"),
"unit_count": d7.get("unit_count"),
"layout_candidates": d7.get("layout_candidates") or [],
"computation": css.get("computation"),
"dynamic_rows": css.get("dynamic_rows"),
"dynamic_cols": css.get("dynamic_cols"),
"heights_px": css.get("heights_px"),
"widths_px": css.get("widths_px"),
"ratios": css.get("ratios"),
"width_ratios": css.get("width_ratios"),
"step8_step_status": s8.get("step_status"),
"step8_pipeline_path_connected": s8.get("pipeline_path_connected"),
"zone_heights_px_planned": d8.get("zone_heights_px_planned"),
"zone_widths_px_planned": d8.get("zone_widths_px_planned"),
"zone_col_ratios_planned": d8.get("zone_col_ratios_planned"),
"per_zone_layout_shape": [
_layout_zone_shape(z) for z in (d8.get("per_zone_plan") or [])
],
}
for key, want in expected.items():
got = actual[key]
assert got == want, (
f"{mdx_id}.mdx layout.{key} drift: expected {want!r}, got {got!r}"
)
@pytest.mark.integration
@pytest.mark.parametrize("mdx_id", MDX_SET)
def test_slot_payload_snapshot_matches(
mdx_id: str, multi_mdx_runs: Dict[str, PipelineRun]
) -> None:
"""u8 — F2 slot_payload: pin observed step12_slot_payload per_zone shape per mdx.
Snapshot pins content-agnostic structural shape (builder + slot
names + list cardinality + dict sub-list counts + string non-empty
flags), not literal payload text. MDX wording tweaks won't drift
this; builder swap, slot rename, slot count drift, or __empty__
transitions will. Empty zones must have ``builder is None`` and no
slots — this is the IMP-87 empty_shell honesty contract surface for
F2.
"""
snapshot = json.loads((SNAPSHOTS_DIR / "slot_payload.json").read_text(encoding="utf-8"))
expected = snapshot[mdx_id]
run = multi_mdx_runs[mdx_id]
raw = json.loads(
(run.run_dir / "steps" / "step12_slot_payload.json").read_text(encoding="utf-8")
)
per_zone = raw["data"].get("per_zone") or []
actual = [_slot_payload_zone_shape(z) for z in per_zone]
assert len(actual) == len(expected), (
f"{mdx_id}.mdx step12 zone_count drift: expected {len(expected)}, "
f"got {len(actual)} (positions={[z.get('position') for z in actual]})"
)
for idx, (act, exp) in enumerate(zip(actual, expected)):
assert act == exp, (
f"{mdx_id}.mdx step12 zone[{idx}] ({exp.get('position')!r}) shape drift: "
f"expected {exp}, got {act}"
)
_ZONE_TAG_RE = re.compile(
r'<div[^>]*\sdata-zone-position="([^"]+)"[^>]*\sdata-template-id="([^"]+)"',
re.IGNORECASE,
)
_SLIDE_ROOT_RE = re.compile(r'<div\s+class="slide"\s+data-page="1"')
_TITLE_RE = re.compile(r'<title>([^<]*)</title>', re.IGNORECASE)
def _extract_html_zone_topology(html: str) -> List[dict]:
"""Extract (position, template_id) pairs in document order from final.html."""
return [
{"position": m.group(1), "template_id": m.group(2)}
for m in _ZONE_TAG_RE.finditer(html)
]
@pytest.mark.integration
@pytest.mark.parametrize("mdx_id", MDX_SET)
def test_final_html_snapshot_matches(
mdx_id: str, multi_mdx_runs: Dict[str, PipelineRun]
) -> None:
"""u11 — F5 final.html extraction: pin step13_render metadata + on-disk HTML structure.
Cross-snapshot parity gate: ``html_zone_topology`` (extracted from
final.html via ``data-zone-position`` / ``data-template-id`` markers)
MUST equal step12 slot_payload (u8) ``(position, template_id)``
sequence — Jinja2 renders from step12, not step09, so this is the
correct upstream parity (step09 selection vs step12 ``__empty__``
collapse is intentional per IMP-87 honesty gate and surfaces in u8).
Drift between final.html and slot_payload = render pipeline
disconnect. ``final.html`` on-disk size also MUST equal step13's
reported ``final_html_size_bytes`` — byte parity proves no
truncation / no double-write race.
"""
snapshot = json.loads((SNAPSHOTS_DIR / "final_html.json").read_text(encoding="utf-8"))
expected = snapshot[mdx_id]
run = multi_mdx_runs[mdx_id]
raw13 = json.loads(
(run.run_dir / "steps" / "step13_render.json").read_text(encoding="utf-8")
)
d13 = raw13.get("data") or {}
ri = d13.get("render_inputs") or {}
final_path = run.run_dir / "final.html"
assert final_path.is_file(), f"{mdx_id}.mdx final.html missing at {final_path}"
html = final_path.read_text(encoding="utf-8")
title_match = _TITLE_RE.search(html)
html_title = title_match.group(1).strip() if title_match else ""
html_topology = _extract_html_zone_topology(html)
actual = {
"step13_status": raw13.get("step_status"),
"step13_pipeline_path_connected": raw13.get("pipeline_path_connected"),
"render_inputs_zones_count": ri.get("zones_count"),
"render_inputs_layout_preset": ri.get("layout_preset"),
"render_inputs_slide_title_nonempty": bool((ri.get("slide_title") or "").strip()),
"render_inputs_slide_footer_nonempty": bool((ri.get("slide_footer") or "").strip()),
"html_title_matches_render_input": html_title == (ri.get("slide_title") or "").strip(),
"html_slide_root_count": len(_SLIDE_ROOT_RE.findall(html)),
"html_slide_footer_present": '<div class="slide-footer">' in html,
"html_zone_count": len(html_topology),
"html_zone_topology": html_topology,
"final_html_size_matches_step13_reported": (
final_path.stat().st_size == d13.get("final_html_size_bytes")
),
}
for key, want in expected.items():
assert actual[key] == want, (
f"{mdx_id}.mdx final_html.{key} drift: expected {want!r}, got {actual[key]!r}"
)
slot_payload = json.loads(
(SNAPSHOTS_DIR / "slot_payload.json").read_text(encoding="utf-8")
)[mdx_id]
slot_topology = [
{"position": z["position"], "template_id": z["template_id"]}
for z in slot_payload
]
assert html_topology == slot_topology, (
f"{mdx_id}.mdx render pipeline disconnect: final.html zone topology "
f"{html_topology} does not match step12 slot_payload topology "
f"{slot_topology} (pinned in slot_payload.json u8)"
)