"""IMP-48 (#77) u6 — Unit tests for ``resplit_all_reject_merges`` helper. Scope (this slice — Stage 2 plan u6): The helper ``resplit_all_reject_merges`` in ``src/phase_z2_composition.py`` is a deterministic Step 6 post-pass that decomposes a merged ``parent_merged`` / ``parent_merged_inferred`` unit carrying ``label="reject"`` into per-section singles. This file exercises the helper directly with synthetic stub V4 matches + stub sections; it does NOT touch the pipeline hook (that is u7/u8/u9's regression scope). u6 cases covered (Stage 2 plan): 1. **Detection** — merged-reject is detected when ``merge_type ∈ {"parent_merged", "parent_merged_inferred"}``, ``label == "reject"``, and ``len(source_section_ids) >= 2``. Singles / non-reject merges / one-child merges are ignored. 2. **Beneficial split** — at least one rebuilt single with ``label != "reject"`` → ``applied=True``, merged replaced by per-section singles tagged ``selection_path="resplit_from_merge"``. 3. **Non-beneficial keep-merged** — all rebuilt singles are reject → ``applied=False``, merged kept, ``skipped_units[0].reason == "no_beneficial_split"``. 4. **Layout-cap keep-merged** — projected post-split count > 4 → EVERY would-be split aborts with ``reason="layout_cap_exceeded"`` (Stage 2 Q2 default — no partial split; v0 ``select_layout_preset`` supports 1~4 units only). 5. **Override skip** — ``section_assignment_override=True`` short- circuits before detection with ``skipped_reason= "section_assignment_override"`` (IMP-06 #6 zoneSections stays ground truth). 6. **Coverage invariant** — missing section / missing V4 match records ``skipped_units[*].reason == "incomplete_rebuild"`` with the missing section ids surfaced. Merged unit is preserved. 7. **Idempotent re-entry** — calling the helper again on its own output is a no-op (singles are excluded by ``merge_type=="single"``). 8. **Audit shape invariants** — Stage 1 schema (``applied``, ``split_units``, ``skipped_units``, ``post_split_unit_count``, ``post_split_layout_preset``) is always present. ★ AI=0 throughout — PZ-1 deterministic code path only. ★ No-hardcoding (RULE_7) — stubs use MOCK_ prefixed identifiers; no real catalog template_id / frame_id / MDX sample identifier leaks. """ from __future__ import annotations from dataclasses import dataclass, field from typing import Optional from src.phase_z2_composition import ( CompositionUnit, resplit_all_reject_merges, ) # ─── Synthetic stubs (MOCK_ prefix mandatory — IMP-30 u3 convention) ─── @dataclass class _StubV4Match: template_id: str frame_id: str frame_number: int confidence: float label: str v4_rank: Optional[int] = None selection_path: str = "rank_1" fallback_reason: Optional[str] = None provisional: bool = False @dataclass class _StubSection: section_id: str title: str = "" raw_content: str = "" _LABEL_TO_STATUS = { "use_as_is": "matched_zone", "light_edit": "adapt_matched_zone", "restructure": "extract_matched_zone", "reject": "fallback_candidate", } _ALLOWED_STATUSES = {"matched_zone", "adapt_matched_zone"} def _make_lookup(matches: dict[str, _StubV4Match]): """Build a (section_id) -> V4Match | None lookup over the given map.""" def _fn(section_id: str) -> Optional[_StubV4Match]: return matches.get(section_id) return _fn def _make_merged_unit( *, merge_type: str, source_section_ids: list[str], label: str = "reject", template_id: str = "MOCK_TMPL_PARENT", ) -> CompositionUnit: """Construct a merged CompositionUnit shaped like collect_candidates output.""" return CompositionUnit( source_section_ids=list(source_section_ids), merge_type=merge_type, frame_template_id=template_id, frame_id="MOCK_FRM_PARENT", frame_number=99, confidence=0.10, label=label, phase_z_status=_LABEL_TO_STATUS.get(label, "unknown"), raw_content="MERGED RAW CONTENT (joined string from children)", title="MOCK_PARENT", ) def _make_single_unit( section_id: str, *, label: str = "use_as_is", template_id: Optional[str] = None, ) -> CompositionUnit: """Construct a single CompositionUnit shaped like collect_candidates output.""" return CompositionUnit( source_section_ids=[section_id], merge_type="single", frame_template_id=template_id or f"MOCK_TMPL_{section_id}", frame_id=f"MOCK_FRM_{section_id}", frame_number=hash(section_id) % 32, confidence=0.80, label=label, phase_z_status=_LABEL_TO_STATUS.get(label, "unknown"), raw_content=f"section {section_id} content", title=section_id, ) # ─── Case 1 : Detection — what counts as a merged-reject ───────────── def test_detection_ignores_single_units(): """``merge_type="single"`` units never enter detection (idempotency anchor).""" units = [_make_single_unit("MOCK_S1", label="reject")] sections = [_StubSection("MOCK_S1", raw_content="single reject")] lookup = _make_lookup({ "MOCK_S1": _StubV4Match("MOCK_TMPL_S1", "MOCK_FRM_S1", 1, 0.1, "reject"), }) out_units, audit = resplit_all_reject_merges( units, sections, lookup, _LABEL_TO_STATUS, _ALLOWED_STATUSES, ) assert out_units == units assert audit["applied"] is False assert audit["detected_units"] == [] assert audit["skipped_reason"] == "no_detection" def test_detection_ignores_non_reject_merge(): """A merged unit with ``label != "reject"`` is not in scope.""" units = [_make_merged_unit( merge_type="parent_merged", source_section_ids=["MOCK_S1", "MOCK_S2"], label="light_edit", )] sections = [ _StubSection("MOCK_S1", raw_content="c1"), _StubSection("MOCK_S2", raw_content="c2"), ] lookup = _make_lookup({ "MOCK_S1": _StubV4Match("MOCK_TMPL_S1", "MOCK_FRM_S1", 1, 0.9, "use_as_is"), "MOCK_S2": _StubV4Match("MOCK_TMPL_S2", "MOCK_FRM_S2", 2, 0.9, "use_as_is"), }) out_units, audit = resplit_all_reject_merges( units, sections, lookup, _LABEL_TO_STATUS, _ALLOWED_STATUSES, ) assert out_units == units assert audit["applied"] is False assert audit["detected_units"] == [] assert audit["skipped_reason"] == "no_detection" def test_detection_ignores_one_child_merge(): """``len(source_section_ids) < 2`` excludes from detection.""" units = [_make_merged_unit( merge_type="parent_merged", source_section_ids=["MOCK_S1"], label="reject", )] sections = [_StubSection("MOCK_S1", raw_content="c1")] lookup = _make_lookup({ "MOCK_S1": _StubV4Match("MOCK_TMPL_S1", "MOCK_FRM_S1", 1, 0.1, "reject"), }) out_units, audit = resplit_all_reject_merges( units, sections, lookup, _LABEL_TO_STATUS, _ALLOWED_STATUSES, ) assert out_units == units assert audit["applied"] is False assert audit["detected_units"] == [] def test_detection_picks_parent_merged_reject(): """``parent_merged`` + reject + ≥2 sids → detected.""" units = [_make_merged_unit( merge_type="parent_merged", source_section_ids=["MOCK_S1", "MOCK_S2"], label="reject", )] sections = [ _StubSection("MOCK_S1", raw_content="c1"), _StubSection("MOCK_S2", raw_content="c2"), ] # All children also reject → detection only; gating skipped via no_beneficial. lookup = _make_lookup({ "MOCK_S1": _StubV4Match("MOCK_TMPL_S1", "MOCK_FRM_S1", 1, 0.1, "reject"), "MOCK_S2": _StubV4Match("MOCK_TMPL_S2", "MOCK_FRM_S2", 2, 0.1, "reject"), }) _, audit = resplit_all_reject_merges( units, sections, lookup, _LABEL_TO_STATUS, _ALLOWED_STATUSES, ) assert len(audit["detected_units"]) == 1 assert audit["detected_units"][0]["merge_type"] == "parent_merged" assert audit["detected_units"][0]["label"] == "reject" assert audit["detected_units"][0]["source_section_ids"] == ["MOCK_S1", "MOCK_S2"] def test_detection_picks_parent_merged_inferred_reject(): """``parent_merged_inferred`` + reject + ≥2 sids → detected.""" units = [_make_merged_unit( merge_type="parent_merged_inferred", source_section_ids=["MOCK_S1", "MOCK_S2"], label="reject", )] sections = [ _StubSection("MOCK_S1", raw_content="c1"), _StubSection("MOCK_S2", raw_content="c2"), ] lookup = _make_lookup({ "MOCK_S1": _StubV4Match("MOCK_TMPL_S1", "MOCK_FRM_S1", 1, 0.1, "reject"), "MOCK_S2": _StubV4Match("MOCK_TMPL_S2", "MOCK_FRM_S2", 2, 0.1, "reject"), }) _, audit = resplit_all_reject_merges( units, sections, lookup, _LABEL_TO_STATUS, _ALLOWED_STATUSES, ) assert len(audit["detected_units"]) == 1 assert audit["detected_units"][0]["merge_type"] == "parent_merged_inferred" # ─── Case 2 : Beneficial split — applied path ──────────────────────── def test_beneficial_split_applied_when_one_child_non_reject(): """≥1 rebuilt single with ``label != "reject"`` → apply the split.""" merged = _make_merged_unit( merge_type="parent_merged", source_section_ids=["MOCK_S1", "MOCK_S2"], label="reject", template_id="MOCK_TMPL_PARENT_DISCARDED", ) units = [merged] sections = [ _StubSection("MOCK_S1", title="S1", raw_content="MDX raw of S1"), _StubSection("MOCK_S2", title="S2", raw_content="MDX raw of S2"), ] lookup = _make_lookup({ "MOCK_S1": _StubV4Match("MOCK_TMPL_S1", "MOCK_FRM_S1", 1, 0.9, "use_as_is"), "MOCK_S2": _StubV4Match("MOCK_TMPL_S2", "MOCK_FRM_S2", 2, 0.1, "reject"), }) out_units, audit = resplit_all_reject_merges( units, sections, lookup, _LABEL_TO_STATUS, _ALLOWED_STATUSES, ) assert audit["applied"] is True # Merged removed, two singles inserted in order. assert len(out_units) == 2 assert [u.merge_type for u in out_units] == ["single", "single"] assert [u.source_section_ids for u in out_units] == [["MOCK_S1"], ["MOCK_S2"]] # ★ feedback_ai_isolation_contract — singles use their OWN rank-1 V4 evidence, # not the discarded merged parent's template_id. assert out_units[0].frame_template_id == "MOCK_TMPL_S1" assert out_units[1].frame_template_id == "MOCK_TMPL_S2" assert merged.frame_template_id not in {out_units[0].frame_template_id, out_units[1].frame_template_id} # ★ MDX_raw_content_invariant — singles use per-section raw_content (not the joined merged string). assert out_units[0].raw_content == "MDX raw of S1" assert out_units[1].raw_content == "MDX raw of S2" # ★ Stage 1 Q3 YES — selection_path tag applied only to split-produced singles. assert out_units[0].selection_path == "resplit_from_merge" assert out_units[1].selection_path == "resplit_from_merge" # Audit shape. assert len(audit["split_units"]) == 1 split = audit["split_units"][0] assert split["merged_source_section_ids"] == ["MOCK_S1", "MOCK_S2"] assert split["non_reject_count"] == 1 assert {s["section_id"] for s in split["split_singles"]} == {"MOCK_S1", "MOCK_S2"} assert audit["skipped_units"] == [] assert audit["post_split_unit_count"] == 2 assert audit["post_split_layout_preset"] == "horizontal-2" # ``skipped_reason`` removed when applied=True. assert "skipped_reason" not in audit def test_beneficial_split_preserves_full_coverage(): """Coverage invariant — split increases unit count, never reduces section coverage.""" merged = _make_merged_unit( merge_type="parent_merged_inferred", source_section_ids=["MOCK_S1", "MOCK_S2", "MOCK_S3"], label="reject", ) units = [merged] sections = [ _StubSection("MOCK_S1", raw_content="c1"), _StubSection("MOCK_S2", raw_content="c2"), _StubSection("MOCK_S3", raw_content="c3"), ] lookup = _make_lookup({ "MOCK_S1": _StubV4Match("MOCK_TMPL_S1", "MOCK_FRM_S1", 1, 0.9, "use_as_is"), "MOCK_S2": _StubV4Match("MOCK_TMPL_S2", "MOCK_FRM_S2", 2, 0.8, "light_edit"), "MOCK_S3": _StubV4Match("MOCK_TMPL_S3", "MOCK_FRM_S3", 3, 0.1, "reject"), }) out_units, audit = resplit_all_reject_merges( units, sections, lookup, _LABEL_TO_STATUS, _ALLOWED_STATUSES, ) assert audit["applied"] is True covered = {sid for u in out_units for sid in u.source_section_ids} assert covered == set(merged.source_section_ids) # ★ dropped_zero_invariant # ─── Case 3 : Non-beneficial keep-merged ───────────────────────────── def test_non_beneficial_split_keeps_merged_when_all_children_reject(): """All rebuilt singles are reject → split is not beneficial; merged kept.""" merged = _make_merged_unit( merge_type="parent_merged", source_section_ids=["MOCK_S1", "MOCK_S2"], label="reject", ) units = [merged] sections = [ _StubSection("MOCK_S1", raw_content="c1"), _StubSection("MOCK_S2", raw_content="c2"), ] lookup = _make_lookup({ "MOCK_S1": _StubV4Match("MOCK_TMPL_S1", "MOCK_FRM_S1", 1, 0.1, "reject"), "MOCK_S2": _StubV4Match("MOCK_TMPL_S2", "MOCK_FRM_S2", 2, 0.1, "reject"), }) out_units, audit = resplit_all_reject_merges( units, sections, lookup, _LABEL_TO_STATUS, _ALLOWED_STATUSES, ) assert audit["applied"] is False # Merged preserved by identity (IMP-47B #76 handles it directly). assert out_units == [merged] assert audit["split_units"] == [] assert len(audit["skipped_units"]) == 1 skip = audit["skipped_units"][0] assert skip["reason"] == "no_beneficial_split" assert skip["merged_source_section_ids"] == ["MOCK_S1", "MOCK_S2"] assert audit["post_split_unit_count"] == 1 assert audit["post_split_layout_preset"] is None # ─── Case 4 : Layout-cap keep-merged ───────────────────────────────── def test_layout_cap_aborts_split_when_projected_count_exceeds_four(): """Projected post-split count > 4 → ALL would-be splits aborted. Setup: 1 single (non-target) + 1 merged-reject of 4 sections. Initial unit count = 2. If split applied, post-split = 5 (> 4 cap). Stage 2 Q2 default — keep merged, no partial split. """ other_single = _make_single_unit("MOCK_OTHER", label="use_as_is") merged = _make_merged_unit( merge_type="parent_merged", source_section_ids=["MOCK_S1", "MOCK_S2", "MOCK_S3", "MOCK_S4"], label="reject", ) units = [other_single, merged] sections = [ _StubSection("MOCK_OTHER", raw_content="other"), _StubSection("MOCK_S1", raw_content="c1"), _StubSection("MOCK_S2", raw_content="c2"), _StubSection("MOCK_S3", raw_content="c3"), _StubSection("MOCK_S4", raw_content="c4"), ] lookup = _make_lookup({ "MOCK_OTHER": _StubV4Match("MOCK_TMPL_O", "MOCK_FRM_O", 0, 0.9, "use_as_is"), # Beneficial in principle (some non-reject), but cap aborts. "MOCK_S1": _StubV4Match("MOCK_TMPL_S1", "MOCK_FRM_S1", 1, 0.9, "use_as_is"), "MOCK_S2": _StubV4Match("MOCK_TMPL_S2", "MOCK_FRM_S2", 2, 0.8, "light_edit"), "MOCK_S3": _StubV4Match("MOCK_TMPL_S3", "MOCK_FRM_S3", 3, 0.1, "reject"), "MOCK_S4": _StubV4Match("MOCK_TMPL_S4", "MOCK_FRM_S4", 4, 0.1, "reject"), }) out_units, audit = resplit_all_reject_merges( units, sections, lookup, _LABEL_TO_STATUS, _ALLOWED_STATUSES, ) assert audit["applied"] is False assert out_units == units # byte-identical fallback for IMP-47B handoff assert audit["split_units"] == [] assert len(audit["skipped_units"]) == 1 skip = audit["skipped_units"][0] assert skip["reason"] == "layout_cap_exceeded" assert skip["projected_post_split_count"] == 5 assert audit["post_split_unit_count"] == 2 assert audit["post_split_layout_preset"] is None # ─── Case 5 : Override skip ────────────────────────────────────────── def test_override_skip_short_circuits_before_detection(): """``section_assignment_override=True`` (IMP-06 #6) makes the helper a no-op.""" merged = _make_merged_unit( merge_type="parent_merged", source_section_ids=["MOCK_S1", "MOCK_S2"], label="reject", ) units = [merged] sections = [ _StubSection("MOCK_S1", raw_content="c1"), _StubSection("MOCK_S2", raw_content="c2"), ] lookup = _make_lookup({ "MOCK_S1": _StubV4Match("MOCK_TMPL_S1", "MOCK_FRM_S1", 1, 0.9, "use_as_is"), "MOCK_S2": _StubV4Match("MOCK_TMPL_S2", "MOCK_FRM_S2", 2, 0.8, "light_edit"), }) out_units, audit = resplit_all_reject_merges( units, sections, lookup, _LABEL_TO_STATUS, _ALLOWED_STATUSES, section_assignment_override=True, ) assert out_units == units # byte-identical assert audit["applied"] is False assert audit["skipped_reason"] == "section_assignment_override" # Override is upstream of detection — never enumerates. assert audit["detected_units"] == [] assert audit["split_units"] == [] assert audit["skipped_units"] == [] # ─── Case 6 : Coverage invariant — incomplete rebuild ──────────────── def test_incomplete_rebuild_keeps_merged_when_section_missing(): """A merged unit referencing a section absent from ``sections`` → ``incomplete_rebuild`` skip with the missing id surfaced. """ merged = _make_merged_unit( merge_type="parent_merged", source_section_ids=["MOCK_S1", "MOCK_MISSING"], label="reject", ) units = [merged] sections = [_StubSection("MOCK_S1", raw_content="c1")] # MOCK_MISSING absent lookup = _make_lookup({ "MOCK_S1": _StubV4Match("MOCK_TMPL_S1", "MOCK_FRM_S1", 1, 0.9, "use_as_is"), }) out_units, audit = resplit_all_reject_merges( units, sections, lookup, _LABEL_TO_STATUS, _ALLOWED_STATUSES, ) assert audit["applied"] is False assert out_units == [merged] assert audit["split_units"] == [] assert len(audit["skipped_units"]) == 1 skip = audit["skipped_units"][0] assert skip["reason"] == "incomplete_rebuild" assert skip["missing_section_ids"] == ["MOCK_MISSING"] def test_incomplete_rebuild_keeps_merged_when_v4_match_missing(): """A merged unit referencing a section without V4 evidence → ``incomplete_rebuild`` skip. """ merged = _make_merged_unit( merge_type="parent_merged_inferred", source_section_ids=["MOCK_S1", "MOCK_S2"], label="reject", ) units = [merged] sections = [ _StubSection("MOCK_S1", raw_content="c1"), _StubSection("MOCK_S2", raw_content="c2"), ] lookup = _make_lookup({ # MOCK_S2 deliberately omitted to simulate missing V4 evidence. "MOCK_S1": _StubV4Match("MOCK_TMPL_S1", "MOCK_FRM_S1", 1, 0.9, "use_as_is"), }) out_units, audit = resplit_all_reject_merges( units, sections, lookup, _LABEL_TO_STATUS, _ALLOWED_STATUSES, ) assert audit["applied"] is False assert out_units == [merged] skip = audit["skipped_units"][0] assert skip["reason"] == "incomplete_rebuild" assert skip["missing_section_ids"] == ["MOCK_S2"] # ─── Case 7 : Idempotent re-entry ──────────────────────────────────── def test_idempotent_re_entry_is_noop_after_split(): """Running the helper a second time on its own output detects nothing (singles are excluded by construction). max_retry=1 (Stage 2 lock). """ merged = _make_merged_unit( merge_type="parent_merged", source_section_ids=["MOCK_S1", "MOCK_S2"], label="reject", ) units = [merged] sections = [ _StubSection("MOCK_S1", raw_content="c1"), _StubSection("MOCK_S2", raw_content="c2"), ] lookup = _make_lookup({ "MOCK_S1": _StubV4Match("MOCK_TMPL_S1", "MOCK_FRM_S1", 1, 0.9, "use_as_is"), "MOCK_S2": _StubV4Match("MOCK_TMPL_S2", "MOCK_FRM_S2", 2, 0.8, "light_edit"), }) first_out, first_audit = resplit_all_reject_merges( units, sections, lookup, _LABEL_TO_STATUS, _ALLOWED_STATUSES, ) assert first_audit["applied"] is True # Second pass over the post-resplit list — should be a no-op. second_out, second_audit = resplit_all_reject_merges( first_out, sections, lookup, _LABEL_TO_STATUS, _ALLOWED_STATUSES, ) assert second_audit["applied"] is False assert second_audit["detected_units"] == [] assert second_audit["skipped_reason"] == "no_detection" assert second_out == first_out # output byte-identical # ─── Case 8 : Audit shape invariants ───────────────────────────────── def test_audit_payload_always_has_stage_1_keys(): """Every return path must include the Stage 1 schema keys (additive only).""" required = { "applied", "split_units", "skipped_units", "post_split_unit_count", "post_split_layout_preset", } # 1) override skip _, audit_override = resplit_all_reject_merges( [], [], _make_lookup({}), _LABEL_TO_STATUS, _ALLOWED_STATUSES, section_assignment_override=True, ) assert required.issubset(audit_override) # 2) no detection (empty units) _, audit_empty = resplit_all_reject_merges( [], [], _make_lookup({}), _LABEL_TO_STATUS, _ALLOWED_STATUSES, ) assert required.issubset(audit_empty) assert audit_empty["post_split_unit_count"] == 0 assert audit_empty["post_split_layout_preset"] is None assert audit_empty["skipped_reason"] == "no_detection" # 3) applied path — see test_beneficial_split_applied_when_one_child_non_reject # already asserts the full applied shape.