Files
C.E.L_Slide_test2/tests/test_phase_z2_composition_imp48.py
kyeongmin ee97f4fc78 feat(#77): IMP-48 composition planner re-split on all-reject (u1~u9)
Add resplit_all_reject_merges() helper in phase_z2_composition.py that
detects parent_merged / parent_merged_inferred units with label=reject
and rebuilds them as per-section single units using each section's own
rank-1 V4 evidence (no frame swap, MDX raw_content preserved).

Pipeline hook fires once after Step 6 settling chain (u12/u4/empty-shell)
and section_assignment_plan resolution, before Step 6 artifact write.
Guards: beneficial-split rule (>=1 non-reject), coverage equality, layout
cap (>4 abort), max_retry=1, section_assignment_override short-circuit.

Audit: comp_debug["imp48_resplit"] additive payload (applied, split_units,
skipped_units, post_split_unit_count, post_split_layout_preset);
selection_path="resplit_from_merge" telemetry on rebuilt singles;
layout_preset re-derived via select_layout_preset(new_units).

Tests: 39/39 PASS (composition u1~u6: 14 cases; pipeline u7~u9: 25 cases).
Scoped regression 720/6 with 6 failures isolated as pre-existing on
baseline 79f9ea5 (independent of IMP-48). mdx03 golden lock preserved.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 05:00:07 +09:00

588 lines
22 KiB
Python

"""IMP-48 (#77) u6 — Unit tests for ``resplit_all_reject_merges`` helper.
Scope (this slice — Stage 2 plan u6):
The helper ``resplit_all_reject_merges`` in
``src/phase_z2_composition.py`` is a deterministic Step 6 post-pass that
decomposes a merged ``parent_merged`` / ``parent_merged_inferred`` unit
carrying ``label="reject"`` into per-section singles. This file exercises
the helper directly with synthetic stub V4 matches + stub sections; it
does NOT touch the pipeline hook (that is u7/u8/u9's regression scope).
u6 cases covered (Stage 2 plan):
1. **Detection** — merged-reject is detected when
``merge_type ∈ {"parent_merged", "parent_merged_inferred"}``,
``label == "reject"``, and ``len(source_section_ids) >= 2``.
Singles / non-reject merges / one-child merges are ignored.
2. **Beneficial split** — at least one rebuilt single with
``label != "reject"`` → ``applied=True``, merged replaced by
per-section singles tagged ``selection_path="resplit_from_merge"``.
3. **Non-beneficial keep-merged** — all rebuilt singles are reject →
``applied=False``, merged kept, ``skipped_units[0].reason ==
"no_beneficial_split"``.
4. **Layout-cap keep-merged** — projected post-split count > 4 →
EVERY would-be split aborts with ``reason="layout_cap_exceeded"``
(Stage 2 Q2 default — no partial split; v0 ``select_layout_preset``
supports 1~4 units only).
5. **Override skip** — ``section_assignment_override=True`` short-
circuits before detection with ``skipped_reason=
"section_assignment_override"`` (IMP-06 #6 zoneSections stays
ground truth).
6. **Coverage invariant** — missing section / missing V4 match
records ``skipped_units[*].reason == "incomplete_rebuild"`` with
the missing section ids surfaced. Merged unit is preserved.
7. **Idempotent re-entry** — calling the helper again on its own
output is a no-op (singles are excluded by ``merge_type=="single"``).
8. **Audit shape invariants** — Stage 1 schema (``applied``,
``split_units``, ``skipped_units``, ``post_split_unit_count``,
``post_split_layout_preset``) is always present.
★ AI=0 throughout — PZ-1 deterministic code path only.
★ No-hardcoding (RULE_7) — stubs use MOCK_ prefixed identifiers; no
real catalog template_id / frame_id / MDX sample identifier leaks.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Optional
from src.phase_z2_composition import (
CompositionUnit,
resplit_all_reject_merges,
)
# ─── Synthetic stubs (MOCK_ prefix mandatory — IMP-30 u3 convention) ───
@dataclass
class _StubV4Match:
template_id: str
frame_id: str
frame_number: int
confidence: float
label: str
v4_rank: Optional[int] = None
selection_path: str = "rank_1"
fallback_reason: Optional[str] = None
provisional: bool = False
@dataclass
class _StubSection:
section_id: str
title: str = ""
raw_content: str = ""
_LABEL_TO_STATUS = {
"use_as_is": "matched_zone",
"light_edit": "adapt_matched_zone",
"restructure": "extract_matched_zone",
"reject": "fallback_candidate",
}
_ALLOWED_STATUSES = {"matched_zone", "adapt_matched_zone"}
def _make_lookup(matches: dict[str, _StubV4Match]):
"""Build a (section_id) -> V4Match | None lookup over the given map."""
def _fn(section_id: str) -> Optional[_StubV4Match]:
return matches.get(section_id)
return _fn
def _make_merged_unit(
*,
merge_type: str,
source_section_ids: list[str],
label: str = "reject",
template_id: str = "MOCK_TMPL_PARENT",
) -> CompositionUnit:
"""Construct a merged CompositionUnit shaped like collect_candidates output."""
return CompositionUnit(
source_section_ids=list(source_section_ids),
merge_type=merge_type,
frame_template_id=template_id,
frame_id="MOCK_FRM_PARENT",
frame_number=99,
confidence=0.10,
label=label,
phase_z_status=_LABEL_TO_STATUS.get(label, "unknown"),
raw_content="MERGED RAW CONTENT (joined string from children)",
title="MOCK_PARENT",
)
def _make_single_unit(
section_id: str,
*,
label: str = "use_as_is",
template_id: Optional[str] = None,
) -> CompositionUnit:
"""Construct a single CompositionUnit shaped like collect_candidates output."""
return CompositionUnit(
source_section_ids=[section_id],
merge_type="single",
frame_template_id=template_id or f"MOCK_TMPL_{section_id}",
frame_id=f"MOCK_FRM_{section_id}",
frame_number=hash(section_id) % 32,
confidence=0.80,
label=label,
phase_z_status=_LABEL_TO_STATUS.get(label, "unknown"),
raw_content=f"section {section_id} content",
title=section_id,
)
# ─── Case 1 : Detection — what counts as a merged-reject ─────────────
def test_detection_ignores_single_units():
"""``merge_type="single"`` units never enter detection (idempotency anchor)."""
units = [_make_single_unit("MOCK_S1", label="reject")]
sections = [_StubSection("MOCK_S1", raw_content="single reject")]
lookup = _make_lookup({
"MOCK_S1": _StubV4Match("MOCK_TMPL_S1", "MOCK_FRM_S1", 1, 0.1, "reject"),
})
out_units, audit = resplit_all_reject_merges(
units, sections, lookup, _LABEL_TO_STATUS, _ALLOWED_STATUSES,
)
assert out_units == units
assert audit["applied"] is False
assert audit["detected_units"] == []
assert audit["skipped_reason"] == "no_detection"
def test_detection_ignores_non_reject_merge():
"""A merged unit with ``label != "reject"`` is not in scope."""
units = [_make_merged_unit(
merge_type="parent_merged",
source_section_ids=["MOCK_S1", "MOCK_S2"],
label="light_edit",
)]
sections = [
_StubSection("MOCK_S1", raw_content="c1"),
_StubSection("MOCK_S2", raw_content="c2"),
]
lookup = _make_lookup({
"MOCK_S1": _StubV4Match("MOCK_TMPL_S1", "MOCK_FRM_S1", 1, 0.9, "use_as_is"),
"MOCK_S2": _StubV4Match("MOCK_TMPL_S2", "MOCK_FRM_S2", 2, 0.9, "use_as_is"),
})
out_units, audit = resplit_all_reject_merges(
units, sections, lookup, _LABEL_TO_STATUS, _ALLOWED_STATUSES,
)
assert out_units == units
assert audit["applied"] is False
assert audit["detected_units"] == []
assert audit["skipped_reason"] == "no_detection"
def test_detection_ignores_one_child_merge():
"""``len(source_section_ids) < 2`` excludes from detection."""
units = [_make_merged_unit(
merge_type="parent_merged",
source_section_ids=["MOCK_S1"],
label="reject",
)]
sections = [_StubSection("MOCK_S1", raw_content="c1")]
lookup = _make_lookup({
"MOCK_S1": _StubV4Match("MOCK_TMPL_S1", "MOCK_FRM_S1", 1, 0.1, "reject"),
})
out_units, audit = resplit_all_reject_merges(
units, sections, lookup, _LABEL_TO_STATUS, _ALLOWED_STATUSES,
)
assert out_units == units
assert audit["applied"] is False
assert audit["detected_units"] == []
def test_detection_picks_parent_merged_reject():
"""``parent_merged`` + reject + ≥2 sids → detected."""
units = [_make_merged_unit(
merge_type="parent_merged",
source_section_ids=["MOCK_S1", "MOCK_S2"],
label="reject",
)]
sections = [
_StubSection("MOCK_S1", raw_content="c1"),
_StubSection("MOCK_S2", raw_content="c2"),
]
# All children also reject → detection only; gating skipped via no_beneficial.
lookup = _make_lookup({
"MOCK_S1": _StubV4Match("MOCK_TMPL_S1", "MOCK_FRM_S1", 1, 0.1, "reject"),
"MOCK_S2": _StubV4Match("MOCK_TMPL_S2", "MOCK_FRM_S2", 2, 0.1, "reject"),
})
_, audit = resplit_all_reject_merges(
units, sections, lookup, _LABEL_TO_STATUS, _ALLOWED_STATUSES,
)
assert len(audit["detected_units"]) == 1
assert audit["detected_units"][0]["merge_type"] == "parent_merged"
assert audit["detected_units"][0]["label"] == "reject"
assert audit["detected_units"][0]["source_section_ids"] == ["MOCK_S1", "MOCK_S2"]
def test_detection_picks_parent_merged_inferred_reject():
"""``parent_merged_inferred`` + reject + ≥2 sids → detected."""
units = [_make_merged_unit(
merge_type="parent_merged_inferred",
source_section_ids=["MOCK_S1", "MOCK_S2"],
label="reject",
)]
sections = [
_StubSection("MOCK_S1", raw_content="c1"),
_StubSection("MOCK_S2", raw_content="c2"),
]
lookup = _make_lookup({
"MOCK_S1": _StubV4Match("MOCK_TMPL_S1", "MOCK_FRM_S1", 1, 0.1, "reject"),
"MOCK_S2": _StubV4Match("MOCK_TMPL_S2", "MOCK_FRM_S2", 2, 0.1, "reject"),
})
_, audit = resplit_all_reject_merges(
units, sections, lookup, _LABEL_TO_STATUS, _ALLOWED_STATUSES,
)
assert len(audit["detected_units"]) == 1
assert audit["detected_units"][0]["merge_type"] == "parent_merged_inferred"
# ─── Case 2 : Beneficial split — applied path ────────────────────────
def test_beneficial_split_applied_when_one_child_non_reject():
"""≥1 rebuilt single with ``label != "reject"`` → apply the split."""
merged = _make_merged_unit(
merge_type="parent_merged",
source_section_ids=["MOCK_S1", "MOCK_S2"],
label="reject",
template_id="MOCK_TMPL_PARENT_DISCARDED",
)
units = [merged]
sections = [
_StubSection("MOCK_S1", title="S1", raw_content="MDX raw of S1"),
_StubSection("MOCK_S2", title="S2", raw_content="MDX raw of S2"),
]
lookup = _make_lookup({
"MOCK_S1": _StubV4Match("MOCK_TMPL_S1", "MOCK_FRM_S1", 1, 0.9, "use_as_is"),
"MOCK_S2": _StubV4Match("MOCK_TMPL_S2", "MOCK_FRM_S2", 2, 0.1, "reject"),
})
out_units, audit = resplit_all_reject_merges(
units, sections, lookup, _LABEL_TO_STATUS, _ALLOWED_STATUSES,
)
assert audit["applied"] is True
# Merged removed, two singles inserted in order.
assert len(out_units) == 2
assert [u.merge_type for u in out_units] == ["single", "single"]
assert [u.source_section_ids for u in out_units] == [["MOCK_S1"], ["MOCK_S2"]]
# ★ feedback_ai_isolation_contract — singles use their OWN rank-1 V4 evidence,
# not the discarded merged parent's template_id.
assert out_units[0].frame_template_id == "MOCK_TMPL_S1"
assert out_units[1].frame_template_id == "MOCK_TMPL_S2"
assert merged.frame_template_id not in {out_units[0].frame_template_id,
out_units[1].frame_template_id}
# ★ MDX_raw_content_invariant — singles use per-section raw_content (not the joined merged string).
assert out_units[0].raw_content == "MDX raw of S1"
assert out_units[1].raw_content == "MDX raw of S2"
# ★ Stage 1 Q3 YES — selection_path tag applied only to split-produced singles.
assert out_units[0].selection_path == "resplit_from_merge"
assert out_units[1].selection_path == "resplit_from_merge"
# Audit shape.
assert len(audit["split_units"]) == 1
split = audit["split_units"][0]
assert split["merged_source_section_ids"] == ["MOCK_S1", "MOCK_S2"]
assert split["non_reject_count"] == 1
assert {s["section_id"] for s in split["split_singles"]} == {"MOCK_S1", "MOCK_S2"}
assert audit["skipped_units"] == []
assert audit["post_split_unit_count"] == 2
assert audit["post_split_layout_preset"] == "horizontal-2"
# ``skipped_reason`` removed when applied=True.
assert "skipped_reason" not in audit
def test_beneficial_split_preserves_full_coverage():
"""Coverage invariant — split increases unit count, never reduces section coverage."""
merged = _make_merged_unit(
merge_type="parent_merged_inferred",
source_section_ids=["MOCK_S1", "MOCK_S2", "MOCK_S3"],
label="reject",
)
units = [merged]
sections = [
_StubSection("MOCK_S1", raw_content="c1"),
_StubSection("MOCK_S2", raw_content="c2"),
_StubSection("MOCK_S3", raw_content="c3"),
]
lookup = _make_lookup({
"MOCK_S1": _StubV4Match("MOCK_TMPL_S1", "MOCK_FRM_S1", 1, 0.9, "use_as_is"),
"MOCK_S2": _StubV4Match("MOCK_TMPL_S2", "MOCK_FRM_S2", 2, 0.8, "light_edit"),
"MOCK_S3": _StubV4Match("MOCK_TMPL_S3", "MOCK_FRM_S3", 3, 0.1, "reject"),
})
out_units, audit = resplit_all_reject_merges(
units, sections, lookup, _LABEL_TO_STATUS, _ALLOWED_STATUSES,
)
assert audit["applied"] is True
covered = {sid for u in out_units for sid in u.source_section_ids}
assert covered == set(merged.source_section_ids) # ★ dropped_zero_invariant
# ─── Case 3 : Non-beneficial keep-merged ─────────────────────────────
def test_non_beneficial_split_keeps_merged_when_all_children_reject():
"""All rebuilt singles are reject → split is not beneficial; merged kept."""
merged = _make_merged_unit(
merge_type="parent_merged",
source_section_ids=["MOCK_S1", "MOCK_S2"],
label="reject",
)
units = [merged]
sections = [
_StubSection("MOCK_S1", raw_content="c1"),
_StubSection("MOCK_S2", raw_content="c2"),
]
lookup = _make_lookup({
"MOCK_S1": _StubV4Match("MOCK_TMPL_S1", "MOCK_FRM_S1", 1, 0.1, "reject"),
"MOCK_S2": _StubV4Match("MOCK_TMPL_S2", "MOCK_FRM_S2", 2, 0.1, "reject"),
})
out_units, audit = resplit_all_reject_merges(
units, sections, lookup, _LABEL_TO_STATUS, _ALLOWED_STATUSES,
)
assert audit["applied"] is False
# Merged preserved by identity (IMP-47B #76 handles it directly).
assert out_units == [merged]
assert audit["split_units"] == []
assert len(audit["skipped_units"]) == 1
skip = audit["skipped_units"][0]
assert skip["reason"] == "no_beneficial_split"
assert skip["merged_source_section_ids"] == ["MOCK_S1", "MOCK_S2"]
assert audit["post_split_unit_count"] == 1
assert audit["post_split_layout_preset"] is None
# ─── Case 4 : Layout-cap keep-merged ─────────────────────────────────
def test_layout_cap_aborts_split_when_projected_count_exceeds_four():
"""Projected post-split count > 4 → ALL would-be splits aborted.
Setup: 1 single (non-target) + 1 merged-reject of 4 sections.
Initial unit count = 2. If split applied, post-split = 5 (> 4 cap).
Stage 2 Q2 default — keep merged, no partial split.
"""
other_single = _make_single_unit("MOCK_OTHER", label="use_as_is")
merged = _make_merged_unit(
merge_type="parent_merged",
source_section_ids=["MOCK_S1", "MOCK_S2", "MOCK_S3", "MOCK_S4"],
label="reject",
)
units = [other_single, merged]
sections = [
_StubSection("MOCK_OTHER", raw_content="other"),
_StubSection("MOCK_S1", raw_content="c1"),
_StubSection("MOCK_S2", raw_content="c2"),
_StubSection("MOCK_S3", raw_content="c3"),
_StubSection("MOCK_S4", raw_content="c4"),
]
lookup = _make_lookup({
"MOCK_OTHER": _StubV4Match("MOCK_TMPL_O", "MOCK_FRM_O", 0, 0.9, "use_as_is"),
# Beneficial in principle (some non-reject), but cap aborts.
"MOCK_S1": _StubV4Match("MOCK_TMPL_S1", "MOCK_FRM_S1", 1, 0.9, "use_as_is"),
"MOCK_S2": _StubV4Match("MOCK_TMPL_S2", "MOCK_FRM_S2", 2, 0.8, "light_edit"),
"MOCK_S3": _StubV4Match("MOCK_TMPL_S3", "MOCK_FRM_S3", 3, 0.1, "reject"),
"MOCK_S4": _StubV4Match("MOCK_TMPL_S4", "MOCK_FRM_S4", 4, 0.1, "reject"),
})
out_units, audit = resplit_all_reject_merges(
units, sections, lookup, _LABEL_TO_STATUS, _ALLOWED_STATUSES,
)
assert audit["applied"] is False
assert out_units == units # byte-identical fallback for IMP-47B handoff
assert audit["split_units"] == []
assert len(audit["skipped_units"]) == 1
skip = audit["skipped_units"][0]
assert skip["reason"] == "layout_cap_exceeded"
assert skip["projected_post_split_count"] == 5
assert audit["post_split_unit_count"] == 2
assert audit["post_split_layout_preset"] is None
# ─── Case 5 : Override skip ──────────────────────────────────────────
def test_override_skip_short_circuits_before_detection():
"""``section_assignment_override=True`` (IMP-06 #6) makes the helper a no-op."""
merged = _make_merged_unit(
merge_type="parent_merged",
source_section_ids=["MOCK_S1", "MOCK_S2"],
label="reject",
)
units = [merged]
sections = [
_StubSection("MOCK_S1", raw_content="c1"),
_StubSection("MOCK_S2", raw_content="c2"),
]
lookup = _make_lookup({
"MOCK_S1": _StubV4Match("MOCK_TMPL_S1", "MOCK_FRM_S1", 1, 0.9, "use_as_is"),
"MOCK_S2": _StubV4Match("MOCK_TMPL_S2", "MOCK_FRM_S2", 2, 0.8, "light_edit"),
})
out_units, audit = resplit_all_reject_merges(
units, sections, lookup, _LABEL_TO_STATUS, _ALLOWED_STATUSES,
section_assignment_override=True,
)
assert out_units == units # byte-identical
assert audit["applied"] is False
assert audit["skipped_reason"] == "section_assignment_override"
# Override is upstream of detection — never enumerates.
assert audit["detected_units"] == []
assert audit["split_units"] == []
assert audit["skipped_units"] == []
# ─── Case 6 : Coverage invariant — incomplete rebuild ────────────────
def test_incomplete_rebuild_keeps_merged_when_section_missing():
"""A merged unit referencing a section absent from ``sections`` →
``incomplete_rebuild`` skip with the missing id surfaced.
"""
merged = _make_merged_unit(
merge_type="parent_merged",
source_section_ids=["MOCK_S1", "MOCK_MISSING"],
label="reject",
)
units = [merged]
sections = [_StubSection("MOCK_S1", raw_content="c1")] # MOCK_MISSING absent
lookup = _make_lookup({
"MOCK_S1": _StubV4Match("MOCK_TMPL_S1", "MOCK_FRM_S1", 1, 0.9, "use_as_is"),
})
out_units, audit = resplit_all_reject_merges(
units, sections, lookup, _LABEL_TO_STATUS, _ALLOWED_STATUSES,
)
assert audit["applied"] is False
assert out_units == [merged]
assert audit["split_units"] == []
assert len(audit["skipped_units"]) == 1
skip = audit["skipped_units"][0]
assert skip["reason"] == "incomplete_rebuild"
assert skip["missing_section_ids"] == ["MOCK_MISSING"]
def test_incomplete_rebuild_keeps_merged_when_v4_match_missing():
"""A merged unit referencing a section without V4 evidence →
``incomplete_rebuild`` skip.
"""
merged = _make_merged_unit(
merge_type="parent_merged_inferred",
source_section_ids=["MOCK_S1", "MOCK_S2"],
label="reject",
)
units = [merged]
sections = [
_StubSection("MOCK_S1", raw_content="c1"),
_StubSection("MOCK_S2", raw_content="c2"),
]
lookup = _make_lookup({
# MOCK_S2 deliberately omitted to simulate missing V4 evidence.
"MOCK_S1": _StubV4Match("MOCK_TMPL_S1", "MOCK_FRM_S1", 1, 0.9, "use_as_is"),
})
out_units, audit = resplit_all_reject_merges(
units, sections, lookup, _LABEL_TO_STATUS, _ALLOWED_STATUSES,
)
assert audit["applied"] is False
assert out_units == [merged]
skip = audit["skipped_units"][0]
assert skip["reason"] == "incomplete_rebuild"
assert skip["missing_section_ids"] == ["MOCK_S2"]
# ─── Case 7 : Idempotent re-entry ────────────────────────────────────
def test_idempotent_re_entry_is_noop_after_split():
"""Running the helper a second time on its own output detects nothing
(singles are excluded by construction). max_retry=1 (Stage 2 lock).
"""
merged = _make_merged_unit(
merge_type="parent_merged",
source_section_ids=["MOCK_S1", "MOCK_S2"],
label="reject",
)
units = [merged]
sections = [
_StubSection("MOCK_S1", raw_content="c1"),
_StubSection("MOCK_S2", raw_content="c2"),
]
lookup = _make_lookup({
"MOCK_S1": _StubV4Match("MOCK_TMPL_S1", "MOCK_FRM_S1", 1, 0.9, "use_as_is"),
"MOCK_S2": _StubV4Match("MOCK_TMPL_S2", "MOCK_FRM_S2", 2, 0.8, "light_edit"),
})
first_out, first_audit = resplit_all_reject_merges(
units, sections, lookup, _LABEL_TO_STATUS, _ALLOWED_STATUSES,
)
assert first_audit["applied"] is True
# Second pass over the post-resplit list — should be a no-op.
second_out, second_audit = resplit_all_reject_merges(
first_out, sections, lookup, _LABEL_TO_STATUS, _ALLOWED_STATUSES,
)
assert second_audit["applied"] is False
assert second_audit["detected_units"] == []
assert second_audit["skipped_reason"] == "no_detection"
assert second_out == first_out # output byte-identical
# ─── Case 8 : Audit shape invariants ─────────────────────────────────
def test_audit_payload_always_has_stage_1_keys():
"""Every return path must include the Stage 1 schema keys (additive only)."""
required = {
"applied",
"split_units",
"skipped_units",
"post_split_unit_count",
"post_split_layout_preset",
}
# 1) override skip
_, audit_override = resplit_all_reject_merges(
[], [], _make_lookup({}), _LABEL_TO_STATUS, _ALLOWED_STATUSES,
section_assignment_override=True,
)
assert required.issubset(audit_override)
# 2) no detection (empty units)
_, audit_empty = resplit_all_reject_merges(
[], [], _make_lookup({}), _LABEL_TO_STATUS, _ALLOWED_STATUSES,
)
assert required.issubset(audit_empty)
assert audit_empty["post_split_unit_count"] == 0
assert audit_empty["post_split_layout_preset"] is None
assert audit_empty["skipped_reason"] == "no_detection"
# 3) applied path — see test_beneficial_split_applied_when_one_child_non_reject
# already asserts the full applied shape.