"""IMP-#91 u2 — multi-mdx regression CI scaffold (mdx 01-05 acceptance set). Session-scoped subprocess cache that runs each MDX acceptance fixture exactly once. u3-u11 extend this module with per-axis assertions (structural / visual / coverage / F0-F5). u2 alone pins the cache contract: each mdx in ``MDX_SET`` produces a run directory under ``data/runs//phase_z2/`` containing the step JSONs and ``final.html`` that downstream parametrized tests will read. [[feedback_validation_first_for_closed_issues]] — fresh subprocess per session, no frozen artifacts. [[feedback_artifact_status_naming]] — the overall status (PASS / RENDERED_WITH_VISUAL_REGRESSION / PARTIAL_COVERAGE / EMPTY_SHELL_NO_CONTENT) is asserted in u3-u5; u2 only pins the artifact-production contract. """ from __future__ import annotations import json import re import subprocess import sys import uuid from pathlib import Path from typing import Dict, List, NamedTuple import pytest REPO_ROOT = Path(__file__).resolve().parents[2] SAMPLES_DIR = REPO_ROOT / "samples" / "mdx_batch" RUNS_DIR = REPO_ROOT / "data" / "runs" SNAPSHOTS_DIR = Path(__file__).resolve().parent / "__snapshots__" MDX_SET = ("01", "02", "03", "04", "05") class PipelineRun(NamedTuple): mdx_id: str run_id: str returncode: int stdout: str stderr: str run_dir: Path @pytest.fixture(scope="session") def multi_mdx_runs() -> Dict[str, PipelineRun]: """Run the Phase Z pipeline once per mdx in ``MDX_SET`` (session-cached).""" cache: Dict[str, PipelineRun] = {} for mdx_id in MDX_SET: run_id = f"imp91_{mdx_id}_{uuid.uuid4().hex[:8]}" cp = subprocess.run( [ sys.executable, "-m", "src.phase_z2_pipeline", str(SAMPLES_DIR / f"{mdx_id}.mdx"), run_id, ], capture_output=True, text=True, timeout=360, cwd=str(REPO_ROOT), ) cache[mdx_id] = PipelineRun( mdx_id=mdx_id, run_id=run_id, returncode=cp.returncode, stdout=cp.stdout, stderr=cp.stderr, run_dir=RUNS_DIR / run_id / "phase_z2", ) return cache @pytest.mark.integration @pytest.mark.parametrize("mdx_id", MDX_SET) def test_pipeline_run_produces_step20_status( mdx_id: str, multi_mdx_runs: Dict[str, PipelineRun] ) -> None: """Cache contract: every mdx subprocess produces step20_slide_status.json.""" run = multi_mdx_runs[mdx_id] status_path = run.run_dir / "steps" / "step20_slide_status.json" assert status_path.is_file(), ( f"{mdx_id}.mdx run {run.run_id} did not produce {status_path} " f"(returncode={run.returncode}); stderr tail: {run.stderr[-800:]}" ) @pytest.mark.integration @pytest.mark.parametrize("mdx_id", MDX_SET) def test_structural_snapshot_matches( mdx_id: str, multi_mdx_runs: Dict[str, PipelineRun] ) -> None: """u3 — pin observed overall + per-zone selected_template_id against snapshot.""" snapshot = json.loads((SNAPSHOTS_DIR / "structural.json").read_text(encoding="utf-8")) expected = snapshot[mdx_id] run = multi_mdx_runs[mdx_id] status = json.loads( (run.run_dir / "steps" / "step20_slide_status.json").read_text(encoding="utf-8") )["data"] frame_sel = json.loads( (run.run_dir / "steps" / "step09_frame_selection.json").read_text(encoding="utf-8") )["data"] zones = frame_sel.get("per_zone", []) actual_zones = [ {"position": z.get("position"), "selected_template_id": z.get("selected_template_id")} for z in zones ] assert status.get("overall") == expected["overall"], ( f"{mdx_id}.mdx overall drift: expected {expected['overall']!r}, " f"got {status.get('overall')!r}" ) assert len(actual_zones) == expected["zone_count"], ( f"{mdx_id}.mdx zone_count drift: expected {expected['zone_count']}, " f"got {len(actual_zones)} (zones={actual_zones})" ) assert actual_zones == expected["zones"], ( f"{mdx_id}.mdx zone topology drift: expected {expected['zones']}, " f"got {actual_zones}" ) @pytest.mark.integration @pytest.mark.parametrize("mdx_id", MDX_SET) def test_visual_snapshot_matches( mdx_id: str, multi_mdx_runs: Dict[str, PipelineRun] ) -> None: """u4 — pin observed step14 visual_check overflow/clip against snapshot.""" snapshot = json.loads((SNAPSHOTS_DIR / "visual.json").read_text(encoding="utf-8")) expected = snapshot[mdx_id] run = multi_mdx_runs[mdx_id] visual = json.loads( (run.run_dir / "steps" / "step14_visual_check.json").read_text(encoding="utf-8") )["data"] slide_overflowed = visual.get("slide", {}).get("overflowed") slide_body_overflowed = visual.get("slide_body", {}).get("overflowed") visual_passed = visual.get("passed") actual_zones = [ { "position": z.get("position"), "template_id": z.get("template_id"), "overflowed": z.get("overflowed"), "clipped_inner_count": len(z.get("clipped_inner") or []), } for z in visual.get("zones", []) ] assert slide_overflowed == expected["slide_overflowed"], ( f"{mdx_id}.mdx slide.overflowed drift: expected {expected['slide_overflowed']}, " f"got {slide_overflowed}" ) assert slide_body_overflowed == expected["slide_body_overflowed"], ( f"{mdx_id}.mdx slide_body.overflowed drift: expected {expected['slide_body_overflowed']}, " f"got {slide_body_overflowed}" ) assert visual_passed == expected["passed"], ( f"{mdx_id}.mdx visual_check.passed drift: expected {expected['passed']}, " f"got {visual_passed}" ) assert actual_zones == expected["zones"], ( f"{mdx_id}.mdx zone visual drift: expected {expected['zones']}, " f"got {actual_zones}" ) @pytest.mark.integration @pytest.mark.parametrize("mdx_id", MDX_SET) def test_coverage_snapshot_matches( mdx_id: str, multi_mdx_runs: Dict[str, PipelineRun] ) -> None: """u5 — pin observed full_mdx_coverage + section_id parity against snapshot.""" snapshot = json.loads((SNAPSHOTS_DIR / "coverage.json").read_text(encoding="utf-8")) expected = snapshot[mdx_id] run = multi_mdx_runs[mdx_id] status = json.loads( (run.run_dir / "steps" / "step20_slide_status.json").read_text(encoding="utf-8") )["data"] assert status.get("rendered") == expected["rendered"], ( f"{mdx_id}.mdx rendered drift: expected {expected['rendered']}, " f"got {status.get('rendered')}" ) assert status.get("visual_check_passed") == expected["visual_check_passed"], ( f"{mdx_id}.mdx visual_check_passed drift: expected {expected['visual_check_passed']}, " f"got {status.get('visual_check_passed')}" ) assert status.get("full_mdx_coverage") == expected["full_mdx_coverage"], ( f"{mdx_id}.mdx full_mdx_coverage drift: expected {expected['full_mdx_coverage']}, " f"got {status.get('full_mdx_coverage')}" ) assert sorted(status.get("aligned_section_ids") or []) == sorted(expected["aligned_section_ids"]), ( f"{mdx_id}.mdx aligned_section_ids drift: expected {expected['aligned_section_ids']}, " f"got {status.get('aligned_section_ids')}" ) assert sorted(status.get("covered_section_ids") or []) == sorted(expected["covered_section_ids"]), ( f"{mdx_id}.mdx covered_section_ids drift: expected {expected['covered_section_ids']}, " f"got {status.get('covered_section_ids')}" ) assert sorted(status.get("filtered_section_ids") or []) == sorted(expected["filtered_section_ids"]), ( f"{mdx_id}.mdx filtered_section_ids drift: expected {expected['filtered_section_ids']}, " f"got {status.get('filtered_section_ids')}" ) @pytest.mark.integration @pytest.mark.parametrize("mdx_id", MDX_SET) def test_normalize_snapshot_matches( mdx_id: str, multi_mdx_runs: Dict[str, PipelineRun] ) -> None: """u6 — F0 normalize: pin observed step02_normalized shape per mdx.""" snapshot = json.loads((SNAPSHOTS_DIR / "normalize.json").read_text(encoding="utf-8")) expected = snapshot[mdx_id] run = multi_mdx_runs[mdx_id] raw = json.loads( (run.run_dir / "steps" / "step02_normalized.json").read_text(encoding="utf-8") ) d = raw["data"] diag = d.get("stage0_adapter_diagnostics", {}) or {} assets = d.get("stage0_normalized_assets", {}) or {} actual = { "step_num": raw.get("step_num"), "step_status": raw.get("step_status"), "pipeline_path_connected": raw.get("pipeline_path_connected"), "sections_count": d.get("sections_count"), "section_ids": [s.get("section_id") for s in d.get("sections", [])], "orphans_count": len(d.get("orphans") or []), "details_count": len(d.get("details") or []), "adapter_enabled": diag.get("enabled"), "adapter_used": diag.get("used"), "assets_popups_count": len(assets.get("popups") or []), "assets_images_count": len(assets.get("images") or []), "assets_tables_count": len(assets.get("tables") or []), "slide_title_nonempty": bool(d.get("slide_title")), "slide_footer_nonempty": bool(d.get("slide_footer")), } for key, want in expected.items(): got = actual[key] assert got == want, ( f"{mdx_id}.mdx normalize.{key} drift: expected {want!r}, got {got!r}" ) assert len(d.get("sections", [])) == expected["sections_count"], ( f"{mdx_id}.mdx sections list length mismatch with sections_count: " f"sections_count={expected['sections_count']}, got len(sections)={len(d.get('sections', []))}" ) for sect in d.get("sections", []): assert (sect.get("raw_content_length") or 0) > 0, ( f"{mdx_id}.mdx section {sect.get('section_id')!r} has empty raw_content " f"(length={sect.get('raw_content_length')!r}) — normalize lost content" ) @pytest.mark.integration @pytest.mark.parametrize("mdx_id", MDX_SET) def test_v4_ranking_snapshot_matches( mdx_id: str, multi_mdx_runs: Dict[str, PipelineRun] ) -> None: """u7 — F1 V4 ranking: pin observed step05_v4_evidence per mdx. Pins ``v4_source`` (POSIX-normalized for cross-platform stability), ``aligned_section_ids``, and per-section ``{section_id, candidate_status, candidates: [{template_id, label, confidence}]}`` in pipeline-emitted order. Confidence stays at the current 4-decimal rounding emitted by the V4 yaml; drift any axis fails loudly so a re-baseline is a conscious commit, not a silent shift. """ snapshot = json.loads((SNAPSHOTS_DIR / "v4_ranking.json").read_text(encoding="utf-8")) expected = snapshot[mdx_id] run = multi_mdx_runs[mdx_id] raw = json.loads( (run.run_dir / "steps" / "step05_v4_evidence.json").read_text(encoding="utf-8") ) data = raw["data"] actual_v4_source = str(data.get("v4_source") or "").replace("\\", "/") actual_sections = [ { "section_id": ev.get("section_id"), "candidate_status": ev.get("candidate_status"), "candidates": [ { "template_id": c.get("template_id"), "label": c.get("label"), "confidence": c.get("confidence"), } for c in (ev.get("v4_candidates") or []) ], } for ev in (data.get("evidence_per_section") or []) ] assert actual_v4_source == expected["v4_source"], ( f"{mdx_id}.mdx v4_source drift: expected {expected['v4_source']!r}, " f"got {actual_v4_source!r}" ) assert data.get("aligned_section_ids") == expected["aligned_section_ids"], ( f"{mdx_id}.mdx aligned_section_ids drift: expected {expected['aligned_section_ids']}, " f"got {data.get('aligned_section_ids')}" ) assert actual_sections == expected["sections"], ( f"{mdx_id}.mdx V4 ranking drift: expected {expected['sections']}, " f"got {actual_sections}" ) def _slot_payload_zone_shape(zone: dict) -> dict: """Reduce a step12 per_zone entry to a content-agnostic structural shape. Pins builder + slot names + per-slot list cardinality + dict sub-list counts + string non-empty flags. MDX text edits don't drift this; a builder swap, slot rename, missing slot, or list-cardinality change does. Sub-dict shape pins ``sections`` length only — deeper field pinning would require a fresh u8'-axis snapshot. """ sp = zone.get("slot_payload") or {} slot_names = sorted(sp.keys()) list_slot_counts: dict = {} dict_slot_sub_counts: dict = {} string_slot_nonempty: dict = {} for name in slot_names: value = sp[name] if isinstance(value, list): list_slot_counts[name] = len(value) elif isinstance(value, dict): sub: dict = {} for sub_key, sub_val in value.items(): if isinstance(sub_val, list): sub[sub_key] = len(sub_val) dict_slot_sub_counts[name] = sub elif isinstance(value, str): string_slot_nonempty[name] = bool(value.strip()) return { "position": zone.get("position"), "template_id": zone.get("template_id"), "builder": zone.get("builder"), "slot_names": slot_names, "list_slot_counts": list_slot_counts, "dict_slot_sub_counts": dict_slot_sub_counts, "string_slot_nonempty": string_slot_nonempty, } _AI_UNIT_KEYS = ( "source_section_ids", "label", "route_hint", "provisional", "ai_called", "skip_reason", "apply_status", ) @pytest.mark.integration @pytest.mark.parametrize("mdx_id", MDX_SET) def test_ai_classifier_snapshot_matches( mdx_id: str, multi_mdx_runs: Dict[str, PipelineRun] ) -> None: """u9 — F3 classifier-only AI: pin step12/15/16/18 classifier signals. [[feedback_ai_isolation_contract]] / [[feedback_demo_env_toggle_policy]] central invariant: ``ai_called`` MUST stay False per unit by default; activation requires explicit .env toggle, never pipeline default. """ snapshot = json.loads((SNAPSHOTS_DIR / "ai_classifier.json").read_text(encoding="utf-8")) expected = snapshot[mdx_id] steps = multi_mdx_runs[mdx_id].run_dir / "steps" ai = json.loads((steps / "step12_ai_repair.json").read_text(encoding="utf-8"))["data"] fit = json.loads((steps / "step15_fit_classification.json").read_text(encoding="utf-8"))["data"] router = json.loads((steps / "step16_router_decision.json").read_text(encoding="utf-8"))["data"] failure = json.loads((steps / "step18_failure_classification.json").read_text(encoding="utf-8"))["data"] units = [{k: u.get(k) for k in _AI_UNIT_KEYS} for u in (ai.get("per_unit") or [])] actual = { "units": units, "coverage_invariant_status": (ai.get("coverage_invariant") or {}).get("status"), "fit_visual_check_passed": fit.get("visual_check_passed"), "fit_classifications_count": len(fit.get("classifications") or []), "fit_categories_seen": fit.get("categories_seen") or [], "router_active": router.get("router_active"), "router_routed_count": router.get("routed_count"), "router_v4_fallback_used_count": (router.get("v4_fallback_summary") or {}).get("fallback_used_count"), "failure_type": failure.get("failure_type"), } for key, want in expected.items(): assert actual[key] == want, ( f"{mdx_id}.mdx ai_classifier.{key} drift: expected {want!r}, got {actual[key]!r}" ) breaches = [u for u in units if u["ai_called"] is not False] assert not breaches, ( f"{mdx_id}.mdx F3 AI-isolation breach (ai_called must be False by default): {breaches}" ) def _layout_zone_shape(zone: dict) -> dict: """Reduce a step08 per_zone_plan entry to a content-agnostic F4 layout shape.""" sub_zones = zone.get("sub_zones_planned") or [] return { "position": zone.get("position"), "min_height_px": zone.get("min_height_px"), "frame_cardinality_strict": zone.get("frame_cardinality_strict"), "sub_zones_count": len(sub_zones), "region_layout_candidates": zone.get("region_layout_candidates") or [], } @pytest.mark.integration @pytest.mark.parametrize("mdx_id", MDX_SET) def test_layout_snapshot_matches( mdx_id: str, multi_mdx_runs: Dict[str, PipelineRun] ) -> None: """u10 — F4 layout: pin step07_layout + step08_zone_region_ratios per mdx. Pins the layout decision path (``layout_preset`` / ``auto_layout_preset`` / ``layout_override_applied`` / ``layout_candidates`` / ``computation``) + planning geometry (``heights_px`` / ``widths_px`` / ``ratios`` / ``width_ratios``) + per-zone planning shape (``position`` / ``min_height_px`` / ``frame_cardinality_strict`` / ``sub_zones_count`` / ``region_layout_candidates``). ``step_status='partial'`` is the Step 7/8 schema-lock marker (region-level ratio + count-based v0). mdx 03 is the only ``layout_override_applied=True`` case (vertical-2 user override per project_mdx03_frame_lock 2026-05-15 lock); drift here flips F4 layer-A axis. mdx 04 ``top`` zone pins ``None`` for min_height_px + frame_cardinality_strict (no frame cardinality on the top zone — observed current state, not invented). """ snapshot = json.loads((SNAPSHOTS_DIR / "layout.json").read_text(encoding="utf-8")) expected = snapshot[mdx_id] run = multi_mdx_runs[mdx_id] s7 = json.loads( (run.run_dir / "steps" / "step07_layout.json").read_text(encoding="utf-8") ) s8 = json.loads( (run.run_dir / "steps" / "step08_zone_region_ratios.json").read_text(encoding="utf-8") ) d7 = s7.get("data") or {} d8 = s8.get("data") or {} css = d7.get("layout_css") or {} actual = { "step7_step_status": s7.get("step_status"), "step7_pipeline_path_connected": s7.get("pipeline_path_connected"), "layout_preset": d7.get("layout_preset"), "auto_layout_preset": d7.get("auto_layout_preset"), "layout_override_applied": d7.get("layout_override_applied"), "zones_count": d7.get("zones_count"), "unit_count": d7.get("unit_count"), "layout_candidates": d7.get("layout_candidates") or [], "computation": css.get("computation"), "dynamic_rows": css.get("dynamic_rows"), "dynamic_cols": css.get("dynamic_cols"), "heights_px": css.get("heights_px"), "widths_px": css.get("widths_px"), "ratios": css.get("ratios"), "width_ratios": css.get("width_ratios"), "step8_step_status": s8.get("step_status"), "step8_pipeline_path_connected": s8.get("pipeline_path_connected"), "zone_heights_px_planned": d8.get("zone_heights_px_planned"), "zone_widths_px_planned": d8.get("zone_widths_px_planned"), "zone_col_ratios_planned": d8.get("zone_col_ratios_planned"), "per_zone_layout_shape": [ _layout_zone_shape(z) for z in (d8.get("per_zone_plan") or []) ], } for key, want in expected.items(): got = actual[key] assert got == want, ( f"{mdx_id}.mdx layout.{key} drift: expected {want!r}, got {got!r}" ) @pytest.mark.integration @pytest.mark.parametrize("mdx_id", MDX_SET) def test_slot_payload_snapshot_matches( mdx_id: str, multi_mdx_runs: Dict[str, PipelineRun] ) -> None: """u8 — F2 slot_payload: pin observed step12_slot_payload per_zone shape per mdx. Snapshot pins content-agnostic structural shape (builder + slot names + list cardinality + dict sub-list counts + string non-empty flags), not literal payload text. MDX wording tweaks won't drift this; builder swap, slot rename, slot count drift, or __empty__ transitions will. Empty zones must have ``builder is None`` and no slots — this is the IMP-87 empty_shell honesty contract surface for F2. """ snapshot = json.loads((SNAPSHOTS_DIR / "slot_payload.json").read_text(encoding="utf-8")) expected = snapshot[mdx_id] run = multi_mdx_runs[mdx_id] raw = json.loads( (run.run_dir / "steps" / "step12_slot_payload.json").read_text(encoding="utf-8") ) per_zone = raw["data"].get("per_zone") or [] actual = [_slot_payload_zone_shape(z) for z in per_zone] assert len(actual) == len(expected), ( f"{mdx_id}.mdx step12 zone_count drift: expected {len(expected)}, " f"got {len(actual)} (positions={[z.get('position') for z in actual]})" ) for idx, (act, exp) in enumerate(zip(actual, expected)): assert act == exp, ( f"{mdx_id}.mdx step12 zone[{idx}] ({exp.get('position')!r}) shape drift: " f"expected {exp}, got {act}" ) _ZONE_TAG_RE = re.compile( r']*\sdata-zone-position="([^"]+)"[^>]*\sdata-template-id="([^"]+)"', re.IGNORECASE, ) _SLIDE_ROOT_RE = re.compile(r'([^<]*)', re.IGNORECASE) def _extract_html_zone_topology(html: str) -> List[dict]: """Extract (position, template_id) pairs in document order from final.html.""" return [ {"position": m.group(1), "template_id": m.group(2)} for m in _ZONE_TAG_RE.finditer(html) ] @pytest.mark.integration @pytest.mark.parametrize("mdx_id", MDX_SET) def test_final_html_snapshot_matches( mdx_id: str, multi_mdx_runs: Dict[str, PipelineRun] ) -> None: """u11 — F5 final.html extraction: pin step13_render metadata + on-disk HTML structure. Cross-snapshot parity gate: ``html_zone_topology`` (extracted from final.html via ``data-zone-position`` / ``data-template-id`` markers) MUST equal step12 slot_payload (u8) ``(position, template_id)`` sequence — Jinja2 renders from step12, not step09, so this is the correct upstream parity (step09 selection vs step12 ``__empty__`` collapse is intentional per IMP-87 honesty gate and surfaces in u8). Drift between final.html and slot_payload = render pipeline disconnect. ``final.html`` on-disk size also MUST equal step13's reported ``final_html_size_bytes`` — byte parity proves no truncation / no double-write race. """ snapshot = json.loads((SNAPSHOTS_DIR / "final_html.json").read_text(encoding="utf-8")) expected = snapshot[mdx_id] run = multi_mdx_runs[mdx_id] raw13 = json.loads( (run.run_dir / "steps" / "step13_render.json").read_text(encoding="utf-8") ) d13 = raw13.get("data") or {} ri = d13.get("render_inputs") or {} final_path = run.run_dir / "final.html" assert final_path.is_file(), f"{mdx_id}.mdx final.html missing at {final_path}" html = final_path.read_text(encoding="utf-8") title_match = _TITLE_RE.search(html) html_title = title_match.group(1).strip() if title_match else "" html_topology = _extract_html_zone_topology(html) actual = { "step13_status": raw13.get("step_status"), "step13_pipeline_path_connected": raw13.get("pipeline_path_connected"), "render_inputs_zones_count": ri.get("zones_count"), "render_inputs_layout_preset": ri.get("layout_preset"), "render_inputs_slide_title_nonempty": bool((ri.get("slide_title") or "").strip()), "render_inputs_slide_footer_nonempty": bool((ri.get("slide_footer") or "").strip()), "html_title_matches_render_input": html_title == (ri.get("slide_title") or "").strip(), "html_slide_root_count": len(_SLIDE_ROOT_RE.findall(html)), "html_slide_footer_present": '