feat(#76): IMP-47B reject-as-AI-adaptation activation (u1~u13 backend + tests)
- u1~u9: AI fallback infrastructure (router/prompts/schema/validator) + Step 12 hook - u10: e2e reject chain (writes final.html with AI-repaired slot, full coverage) - u11: frontend wiring deferred to follow-up commit (split from IMP-41 hunks) - u12: coverage_invariant guard - u13: cache save gate (visual_check PASS + user_approved/auto_cache) — Codex #22 verified Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -36,6 +36,7 @@ _ALLOWED_TOP_LEVEL: frozenset[str] = frozenset(
|
||||
"ast",
|
||||
"dataclasses",
|
||||
"enum",
|
||||
"hashlib",
|
||||
"json",
|
||||
"pathlib",
|
||||
"random",
|
||||
|
||||
@@ -1,32 +1,67 @@
|
||||
"""IMP-33 u6 — AI fallback cache gate tests.
|
||||
"""IMP-46 u2 — Persistent JSON cache backend tests.
|
||||
|
||||
Verifies the IMP-46 gate contract:
|
||||
* ``read_proposal`` is a stub (returns None until IMP-46).
|
||||
* ``save_proposal`` enforces both gates before any write attempt.
|
||||
* Storage itself raises NotImplementedError (IMP-46 marker).
|
||||
Scope (Stage 2 plan, u2):
|
||||
|
||||
* Replaced ``NotImplementedError`` marker with a real persistent backend
|
||||
at ``data/frame_cache/{frame_id}/{signature_hash}.json``.
|
||||
* Preserved IMP-33 u6 dual write gate: ``visual_check_passed`` AND
|
||||
``user_approved`` BOTH required (loud :class:`AiFallbackCacheGateError`
|
||||
before any filesystem touch).
|
||||
* Round-trip every :class:`ProposalKind`; round-trip ``slide_css`` None
|
||||
*and* set; missing or corrupt files miss silently.
|
||||
* Fingerprint *comparison* is u3; here we only check that the field is
|
||||
persisted.
|
||||
|
||||
All filesystem writes are scoped to ``tmp_path`` via
|
||||
``monkeypatch.setattr`` on the module-level :data:`CACHE_ROOT`, so the
|
||||
production directory is never touched by these tests.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import pathlib
|
||||
|
||||
import pytest
|
||||
|
||||
from src.phase_z2_ai_fallback import cache as cache_mod
|
||||
from src.phase_z2_ai_fallback.cache import (
|
||||
AiFallbackCacheGateError,
|
||||
KEY_DELIMITER,
|
||||
SCHEMA_VERSION,
|
||||
read_proposal,
|
||||
save_proposal,
|
||||
)
|
||||
from src.phase_z2_ai_fallback.schema import AiFallbackProposal, ProposalKind
|
||||
|
||||
|
||||
def _proposal() -> AiFallbackProposal:
|
||||
_FRAME_ID = "1171281190"
|
||||
_SIG_HASH = "a" * 64 # SHA256-shaped placeholder; cache is shape-agnostic.
|
||||
_KEY = f"{_FRAME_ID}{KEY_DELIMITER}{_SIG_HASH}"
|
||||
|
||||
|
||||
def _proposal(
|
||||
kind: ProposalKind = ProposalKind.BUILDER_OPTIONS_PATCH,
|
||||
payload: dict | None = None,
|
||||
) -> AiFallbackProposal:
|
||||
return AiFallbackProposal(
|
||||
proposal_kind=ProposalKind.BUILDER_OPTIONS_PATCH,
|
||||
payload={"item_parser": "bullet_v2"},
|
||||
rationale="u6-test",
|
||||
proposal_kind=kind,
|
||||
payload=payload if payload is not None else {"item_parser": "bullet_v2"},
|
||||
rationale="u2-test",
|
||||
)
|
||||
|
||||
|
||||
def test_read_proposal_returns_none_for_any_key():
|
||||
assert read_proposal("frame=foo|cardinality=3") is None
|
||||
@pytest.fixture(autouse=True)
|
||||
def _isolated_cache_root(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch):
|
||||
"""Redirect the cache root to an isolated tmp directory for every test."""
|
||||
monkeypatch.setattr(cache_mod, "CACHE_ROOT", tmp_path / "frame_cache")
|
||||
yield tmp_path / "frame_cache"
|
||||
|
||||
|
||||
# -- read_proposal --------------------------------------------------------
|
||||
|
||||
|
||||
def test_read_proposal_returns_none_for_missing_file():
|
||||
assert read_proposal(_KEY) is None
|
||||
|
||||
|
||||
def test_read_proposal_rejects_empty_key():
|
||||
@@ -34,10 +69,65 @@ def test_read_proposal_rejects_empty_key():
|
||||
read_proposal("")
|
||||
|
||||
|
||||
def test_read_proposal_rejects_non_string_key():
|
||||
with pytest.raises(ValueError):
|
||||
read_proposal(None) # type: ignore[arg-type]
|
||||
|
||||
|
||||
def test_read_proposal_returns_none_for_legacy_key_format():
|
||||
"""Router back-compat: pre-u4 cache_key (no '::') misses silently."""
|
||||
assert read_proposal("frame:1171281190:cardinality:many") is None
|
||||
|
||||
|
||||
def test_read_proposal_returns_none_for_corrupt_json(_isolated_cache_root: pathlib.Path):
|
||||
path = _isolated_cache_root / _FRAME_ID / f"{_SIG_HASH}.json"
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_text("{not valid json", encoding="utf-8")
|
||||
assert read_proposal(_KEY) is None
|
||||
|
||||
|
||||
def test_read_proposal_returns_none_for_non_dict_root(_isolated_cache_root: pathlib.Path):
|
||||
path = _isolated_cache_root / _FRAME_ID / f"{_SIG_HASH}.json"
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_text("[]", encoding="utf-8")
|
||||
assert read_proposal(_KEY) is None
|
||||
|
||||
|
||||
def test_read_proposal_returns_none_when_payload_proposal_missing(
|
||||
_isolated_cache_root: pathlib.Path,
|
||||
):
|
||||
path = _isolated_cache_root / _FRAME_ID / f"{_SIG_HASH}.json"
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_text(json.dumps({"schema_version": 1}), encoding="utf-8")
|
||||
assert read_proposal(_KEY) is None
|
||||
|
||||
|
||||
def test_read_proposal_returns_none_for_forbidden_proposal_kind(
|
||||
_isolated_cache_root: pathlib.Path,
|
||||
):
|
||||
path = _isolated_cache_root / _FRAME_ID / f"{_SIG_HASH}.json"
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_text(
|
||||
json.dumps(
|
||||
{
|
||||
"schema_version": 1,
|
||||
"proposal": {"proposal_kind": "mdx_text", "payload": {}, "rationale": ""},
|
||||
"slide_css": None,
|
||||
"fingerprints": {},
|
||||
}
|
||||
),
|
||||
encoding="utf-8",
|
||||
)
|
||||
assert read_proposal(_KEY) is None
|
||||
|
||||
|
||||
# -- save_proposal: write gates -------------------------------------------
|
||||
|
||||
|
||||
def test_save_rejects_when_visual_check_failed():
|
||||
with pytest.raises(AiFallbackCacheGateError) as exc:
|
||||
save_proposal(
|
||||
"k", _proposal(), visual_check_passed=False, user_approved=True
|
||||
_KEY, _proposal(), visual_check_passed=False, user_approved=True
|
||||
)
|
||||
assert "visual_check_passed" in str(exc.value)
|
||||
|
||||
@@ -45,7 +135,7 @@ def test_save_rejects_when_visual_check_failed():
|
||||
def test_save_rejects_when_user_not_approved():
|
||||
with pytest.raises(AiFallbackCacheGateError) as exc:
|
||||
save_proposal(
|
||||
"k", _proposal(), visual_check_passed=True, user_approved=False
|
||||
_KEY, _proposal(), visual_check_passed=True, user_approved=False
|
||||
)
|
||||
assert "user_approved" in str(exc.value)
|
||||
|
||||
@@ -53,16 +143,20 @@ def test_save_rejects_when_user_not_approved():
|
||||
def test_save_rejects_when_both_gates_false():
|
||||
with pytest.raises(AiFallbackCacheGateError):
|
||||
save_proposal(
|
||||
"k", _proposal(), visual_check_passed=False, user_approved=False
|
||||
_KEY, _proposal(), visual_check_passed=False, user_approved=False
|
||||
)
|
||||
|
||||
|
||||
def test_save_raises_not_implemented_when_both_gates_pass():
|
||||
with pytest.raises(NotImplementedError) as exc:
|
||||
def test_save_gate_violation_does_not_touch_filesystem(
|
||||
_isolated_cache_root: pathlib.Path,
|
||||
):
|
||||
with pytest.raises(AiFallbackCacheGateError):
|
||||
save_proposal(
|
||||
"k", _proposal(), visual_check_passed=True, user_approved=True
|
||||
_KEY, _proposal(), visual_check_passed=False, user_approved=True
|
||||
)
|
||||
assert "IMP-46" in str(exc.value)
|
||||
# Cache root may or may not exist depending on fixture order, but the
|
||||
# frame_id directory must NOT exist when the gate rejects the write.
|
||||
assert not (_isolated_cache_root / _FRAME_ID).exists()
|
||||
|
||||
|
||||
def test_save_rejects_empty_key():
|
||||
@@ -75,16 +169,340 @@ def test_save_rejects_empty_key():
|
||||
def test_save_rejects_non_proposal_object():
|
||||
with pytest.raises(TypeError):
|
||||
save_proposal(
|
||||
"k",
|
||||
_KEY,
|
||||
{"proposal_kind": "builder_options_patch"}, # type: ignore[arg-type]
|
||||
visual_check_passed=True,
|
||||
user_approved=True,
|
||||
)
|
||||
|
||||
|
||||
def test_gate_error_is_not_notimplementederror():
|
||||
with pytest.raises(AiFallbackCacheGateError):
|
||||
def test_save_rejects_legacy_key_format():
|
||||
"""Writes must use the structural ``frame_id::signature_hash`` form."""
|
||||
with pytest.raises(ValueError):
|
||||
save_proposal(
|
||||
"k", _proposal(), visual_check_passed=False, user_approved=True
|
||||
"frame:1171281190:cardinality:many",
|
||||
_proposal(),
|
||||
visual_check_passed=True,
|
||||
user_approved=True,
|
||||
)
|
||||
|
||||
|
||||
def test_save_rejects_slide_css_non_string():
|
||||
with pytest.raises(TypeError):
|
||||
save_proposal(
|
||||
_KEY,
|
||||
_proposal(),
|
||||
visual_check_passed=True,
|
||||
user_approved=True,
|
||||
slide_css=123, # type: ignore[arg-type]
|
||||
)
|
||||
|
||||
|
||||
def test_save_rejects_fingerprints_non_dict():
|
||||
with pytest.raises(TypeError):
|
||||
save_proposal(
|
||||
_KEY,
|
||||
_proposal(),
|
||||
visual_check_passed=True,
|
||||
user_approved=True,
|
||||
fingerprints=["contract_sha", "abc"], # type: ignore[arg-type]
|
||||
)
|
||||
|
||||
|
||||
def test_gate_error_is_not_notimplementederror():
|
||||
"""The persistent backend no longer raises ``NotImplementedError`` —
|
||||
callers must distinguish gate violation from absent persistence."""
|
||||
assert not issubclass(AiFallbackCacheGateError, NotImplementedError)
|
||||
|
||||
|
||||
# -- save_proposal: persistence + round-trip ------------------------------
|
||||
|
||||
|
||||
def test_save_creates_parent_directories(_isolated_cache_root: pathlib.Path):
|
||||
assert not (_isolated_cache_root / _FRAME_ID).exists()
|
||||
save_proposal(
|
||||
_KEY, _proposal(), visual_check_passed=True, user_approved=True
|
||||
)
|
||||
assert (_isolated_cache_root / _FRAME_ID / f"{_SIG_HASH}.json").is_file()
|
||||
|
||||
|
||||
def test_save_returns_resolved_path(_isolated_cache_root: pathlib.Path):
|
||||
path = save_proposal(
|
||||
_KEY, _proposal(), visual_check_passed=True, user_approved=True
|
||||
)
|
||||
assert path == _isolated_cache_root / _FRAME_ID / f"{_SIG_HASH}.json"
|
||||
|
||||
|
||||
def test_save_payload_includes_schema_version(_isolated_cache_root: pathlib.Path):
|
||||
path = save_proposal(
|
||||
_KEY, _proposal(), visual_check_passed=True, user_approved=True
|
||||
)
|
||||
data = json.loads(path.read_text(encoding="utf-8"))
|
||||
assert data["schema_version"] == SCHEMA_VERSION
|
||||
|
||||
|
||||
def test_save_payload_includes_proposal_dump(_isolated_cache_root: pathlib.Path):
|
||||
proposal = _proposal(payload={"item_parser": "pillar_item"})
|
||||
path = save_proposal(
|
||||
_KEY, proposal, visual_check_passed=True, user_approved=True
|
||||
)
|
||||
data = json.loads(path.read_text(encoding="utf-8"))
|
||||
assert data["proposal"] == proposal.model_dump(mode="json")
|
||||
|
||||
|
||||
def test_round_trip_default_slide_css_is_none(_isolated_cache_root: pathlib.Path):
|
||||
path = save_proposal(
|
||||
_KEY, _proposal(), visual_check_passed=True, user_approved=True
|
||||
)
|
||||
data = json.loads(path.read_text(encoding="utf-8"))
|
||||
assert data["slide_css"] is None
|
||||
assert data["fingerprints"] == {}
|
||||
|
||||
|
||||
def test_round_trip_with_slide_css_set(_isolated_cache_root: pathlib.Path):
|
||||
css = ".slide { padding: 40px; }"
|
||||
path = save_proposal(
|
||||
_KEY,
|
||||
_proposal(),
|
||||
visual_check_passed=True,
|
||||
user_approved=True,
|
||||
slide_css=css,
|
||||
)
|
||||
data = json.loads(path.read_text(encoding="utf-8"))
|
||||
assert data["slide_css"] == css
|
||||
|
||||
|
||||
def test_round_trip_with_fingerprints(_isolated_cache_root: pathlib.Path):
|
||||
fingerprints = {
|
||||
"contract_sha": "c" * 64,
|
||||
"partial_sha": "p" * 64,
|
||||
"catalog_sha": "x" * 64,
|
||||
}
|
||||
path = save_proposal(
|
||||
_KEY,
|
||||
_proposal(),
|
||||
visual_check_passed=True,
|
||||
user_approved=True,
|
||||
fingerprints=fingerprints,
|
||||
)
|
||||
data = json.loads(path.read_text(encoding="utf-8"))
|
||||
assert data["fingerprints"] == fingerprints
|
||||
|
||||
|
||||
def test_read_returns_proposal_after_save(_isolated_cache_root: pathlib.Path):
|
||||
original = _proposal(payload={"key": "value"})
|
||||
save_proposal(
|
||||
_KEY, original, visual_check_passed=True, user_approved=True
|
||||
)
|
||||
loaded = read_proposal(_KEY)
|
||||
assert loaded is not None
|
||||
assert loaded.proposal_kind == original.proposal_kind
|
||||
assert loaded.payload == original.payload
|
||||
assert loaded.rationale == original.rationale
|
||||
|
||||
|
||||
@pytest.mark.parametrize("kind", list(ProposalKind))
|
||||
def test_round_trip_all_proposal_kinds(
|
||||
kind: ProposalKind, _isolated_cache_root: pathlib.Path
|
||||
):
|
||||
"""Every whitelisted ProposalKind survives save → read unchanged."""
|
||||
if kind is ProposalKind.PARTIAL_OVERRIDES:
|
||||
payload = {"slots": {"pillar_1": "alpha"}}
|
||||
elif kind is ProposalKind.SLOT_MAPPING_PROPOSAL:
|
||||
payload = {"mapping": [{"from": "a", "to": "b"}]}
|
||||
else:
|
||||
payload = {"item_parser": "bullet_v2"}
|
||||
save_proposal(
|
||||
_KEY,
|
||||
_proposal(kind=kind, payload=payload),
|
||||
visual_check_passed=True,
|
||||
user_approved=True,
|
||||
)
|
||||
loaded = read_proposal(_KEY)
|
||||
assert loaded is not None
|
||||
assert loaded.proposal_kind is kind
|
||||
assert loaded.payload == payload
|
||||
|
||||
|
||||
def test_save_overwrites_existing_entry(_isolated_cache_root: pathlib.Path):
|
||||
save_proposal(
|
||||
_KEY,
|
||||
_proposal(payload={"v": 1}),
|
||||
visual_check_passed=True,
|
||||
user_approved=True,
|
||||
)
|
||||
save_proposal(
|
||||
_KEY,
|
||||
_proposal(payload={"v": 2}),
|
||||
visual_check_passed=True,
|
||||
user_approved=True,
|
||||
)
|
||||
loaded = read_proposal(_KEY)
|
||||
assert loaded is not None
|
||||
assert loaded.payload == {"v": 2}
|
||||
|
||||
|
||||
def test_file_layout_uses_frame_id_directory(_isolated_cache_root: pathlib.Path):
|
||||
"""Storage layout = ``frame_id/`` directory, ``signature_hash.json`` file."""
|
||||
other_frame_key = f"{_FRAME_ID}_other{KEY_DELIMITER}{_SIG_HASH}"
|
||||
save_proposal(
|
||||
_KEY, _proposal(), visual_check_passed=True, user_approved=True
|
||||
)
|
||||
save_proposal(
|
||||
other_frame_key,
|
||||
_proposal(),
|
||||
visual_check_passed=True,
|
||||
user_approved=True,
|
||||
)
|
||||
assert (_isolated_cache_root / _FRAME_ID / f"{_SIG_HASH}.json").is_file()
|
||||
assert (
|
||||
_isolated_cache_root / f"{_FRAME_ID}_other" / f"{_SIG_HASH}.json"
|
||||
).is_file()
|
||||
|
||||
|
||||
def test_different_signature_hashes_isolated(_isolated_cache_root: pathlib.Path):
|
||||
"""Two distinct signature hashes under the same frame_id never collide."""
|
||||
key_a = f"{_FRAME_ID}{KEY_DELIMITER}{'a' * 64}"
|
||||
key_b = f"{_FRAME_ID}{KEY_DELIMITER}{'b' * 64}"
|
||||
save_proposal(
|
||||
key_a,
|
||||
_proposal(payload={"sig": "a"}),
|
||||
visual_check_passed=True,
|
||||
user_approved=True,
|
||||
)
|
||||
save_proposal(
|
||||
key_b,
|
||||
_proposal(payload={"sig": "b"}),
|
||||
visual_check_passed=True,
|
||||
user_approved=True,
|
||||
)
|
||||
loaded_a = read_proposal(key_a)
|
||||
loaded_b = read_proposal(key_b)
|
||||
assert loaded_a is not None and loaded_a.payload == {"sig": "a"}
|
||||
assert loaded_b is not None and loaded_b.payload == {"sig": "b"}
|
||||
|
||||
|
||||
def test_parse_key_rejects_triple_delimiter():
|
||||
"""Two ``::`` markers (extra delimiter inside signature) is rejected."""
|
||||
assert (
|
||||
read_proposal(
|
||||
f"{_FRAME_ID}{KEY_DELIMITER}{_SIG_HASH}{KEY_DELIMITER}extra"
|
||||
)
|
||||
is None
|
||||
)
|
||||
|
||||
|
||||
# -- IMP-46 u5: auto_cache gate (2^3 truth table) -------------------------
|
||||
#
|
||||
# Three booleans: visual_check_passed (V), user_approved (U), auto_cache (A).
|
||||
# Contract: V=True AND (U=True OR A=True) -> persist; else gate-raise.
|
||||
# V is never bypassable; A=True only relaxes U=False.
|
||||
|
||||
_GATE_TRUTH_TABLE = [
|
||||
# (V, U, A, expect_persist)
|
||||
(False, False, False, False),
|
||||
(False, False, True, False),
|
||||
(False, True, False, False),
|
||||
(False, True, True, False),
|
||||
(True, False, False, False),
|
||||
(True, False, True, True),
|
||||
(True, True, False, True),
|
||||
(True, True, True, True),
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.parametrize("v,u,a,expect_persist", _GATE_TRUTH_TABLE)
|
||||
def test_save_gate_truth_table(
|
||||
v: bool,
|
||||
u: bool,
|
||||
a: bool,
|
||||
expect_persist: bool,
|
||||
_isolated_cache_root: pathlib.Path,
|
||||
) -> None:
|
||||
"""IMP-46 u5 — exhaustive 2^3 enumeration of (V, U, A) -> {persist, raise}."""
|
||||
if expect_persist:
|
||||
path = save_proposal(
|
||||
_KEY,
|
||||
_proposal(payload={"v": int(v), "u": int(u), "a": int(a)}),
|
||||
visual_check_passed=v,
|
||||
user_approved=u,
|
||||
auto_cache=a,
|
||||
)
|
||||
assert path.is_file(), f"truth row (V={v}, U={u}, A={a}) must persist"
|
||||
else:
|
||||
with pytest.raises(AiFallbackCacheGateError):
|
||||
save_proposal(
|
||||
_KEY,
|
||||
_proposal(),
|
||||
visual_check_passed=v,
|
||||
user_approved=u,
|
||||
auto_cache=a,
|
||||
)
|
||||
# Gate violations must never touch the filesystem (parent dir absent).
|
||||
assert not (_isolated_cache_root / _FRAME_ID).exists(), (
|
||||
f"truth row (V={v}, U={u}, A={a}) leaked a directory"
|
||||
)
|
||||
|
||||
|
||||
def test_auto_cache_default_off_preserves_dual_gate_semantics(
|
||||
_isolated_cache_root: pathlib.Path,
|
||||
) -> None:
|
||||
"""Calling save_proposal without ``auto_cache`` keeps the IMP-46 u2 behaviour."""
|
||||
with pytest.raises(AiFallbackCacheGateError) as exc:
|
||||
save_proposal(
|
||||
_KEY, _proposal(), visual_check_passed=True, user_approved=False
|
||||
)
|
||||
assert "user_approved" in str(exc.value)
|
||||
assert not (_isolated_cache_root / _FRAME_ID).exists()
|
||||
|
||||
|
||||
def test_auto_cache_cannot_bypass_visual_check() -> None:
|
||||
"""``visual_check_passed=False`` raises even with ``auto_cache=True``."""
|
||||
with pytest.raises(AiFallbackCacheGateError) as exc:
|
||||
save_proposal(
|
||||
_KEY,
|
||||
_proposal(),
|
||||
visual_check_passed=False,
|
||||
user_approved=True,
|
||||
auto_cache=True,
|
||||
)
|
||||
assert "visual_check_passed" in str(exc.value)
|
||||
|
||||
|
||||
def test_auto_cache_bypass_user_approved_persists(
|
||||
_isolated_cache_root: pathlib.Path,
|
||||
) -> None:
|
||||
"""``auto_cache=True`` with ``user_approved=False`` persists the proposal."""
|
||||
path = save_proposal(
|
||||
_KEY,
|
||||
_proposal(payload={"bypass": "user"}),
|
||||
visual_check_passed=True,
|
||||
user_approved=False,
|
||||
auto_cache=True,
|
||||
)
|
||||
assert path.is_file()
|
||||
loaded = read_proposal(_KEY)
|
||||
assert loaded is not None
|
||||
assert loaded.payload == {"bypass": "user"}
|
||||
|
||||
|
||||
def test_auto_cache_rejects_non_bool() -> None:
|
||||
"""``auto_cache`` must be a bool (loud TypeError, symmetric with other kwargs)."""
|
||||
with pytest.raises(TypeError):
|
||||
save_proposal(
|
||||
_KEY,
|
||||
_proposal(),
|
||||
visual_check_passed=True,
|
||||
user_approved=True,
|
||||
auto_cache="yes", # type: ignore[arg-type]
|
||||
)
|
||||
|
||||
|
||||
def test_auto_cache_is_keyword_only() -> None:
|
||||
"""``auto_cache`` must be passed by keyword (positional rejected)."""
|
||||
import inspect
|
||||
|
||||
sig = inspect.signature(save_proposal)
|
||||
param = sig.parameters["auto_cache"]
|
||||
assert param.kind is inspect.Parameter.KEYWORD_ONLY
|
||||
assert param.default is False
|
||||
|
||||
347
tests/phase_z2_ai_fallback/test_cache_invalidation.py
Normal file
347
tests/phase_z2_ai_fallback/test_cache_invalidation.py
Normal file
@@ -0,0 +1,347 @@
|
||||
"""IMP-46 u3 — Fingerprint-based cache invalidation tests.
|
||||
|
||||
Scope (Stage 2 plan, u3):
|
||||
|
||||
* ``save_proposal`` persists ``fingerprints`` verbatim (u2 already covers
|
||||
the round-trip; this suite re-asserts the read-side comparator).
|
||||
* ``read_proposal`` accepts an optional ``fingerprints`` kwarg. When
|
||||
supplied, the stored dict must equal the supplied dict EXACTLY (strict
|
||||
equality). Mismatch — including missing keys, extra keys, or value
|
||||
drift — returns ``None``.
|
||||
* Default ``fingerprints=None`` performs no comparison (back-compat for
|
||||
legacy callers).
|
||||
* Fingerprint *computation* stays outside ``cache.py`` — these tests
|
||||
treat the three declared shas (``contract_sha`` / ``partial_sha`` /
|
||||
``catalog_sha``) as opaque hex strings, never recomputing them. The
|
||||
cache layer is a content-addressed *comparator*, not a content
|
||||
*hasher*.
|
||||
|
||||
All filesystem writes are scoped to ``tmp_path`` via
|
||||
``monkeypatch.setattr`` on the module-level :data:`CACHE_ROOT`.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import pathlib
|
||||
|
||||
import pytest
|
||||
|
||||
from src.phase_z2_ai_fallback import cache as cache_mod
|
||||
from src.phase_z2_ai_fallback.cache import (
|
||||
KEY_DELIMITER,
|
||||
read_proposal,
|
||||
save_proposal,
|
||||
)
|
||||
from src.phase_z2_ai_fallback.schema import AiFallbackProposal, ProposalKind
|
||||
|
||||
|
||||
_FRAME_ID = "1171281190"
|
||||
_SIG_HASH = "f" * 64
|
||||
_KEY = f"{_FRAME_ID}{KEY_DELIMITER}{_SIG_HASH}"
|
||||
|
||||
_FINGERPRINTS_BASELINE: dict[str, str] = {
|
||||
"contract_sha": "c" * 64,
|
||||
"partial_sha": "p" * 64,
|
||||
"catalog_sha": "x" * 64,
|
||||
}
|
||||
|
||||
|
||||
def _proposal(payload: dict | None = None) -> AiFallbackProposal:
|
||||
return AiFallbackProposal(
|
||||
proposal_kind=ProposalKind.BUILDER_OPTIONS_PATCH,
|
||||
payload=payload if payload is not None else {"item_parser": "bullet_v2"},
|
||||
rationale="u3-test",
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _isolated_cache_root(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch):
|
||||
monkeypatch.setattr(cache_mod, "CACHE_ROOT", tmp_path / "frame_cache")
|
||||
yield tmp_path / "frame_cache"
|
||||
|
||||
|
||||
# -- save side: fingerprints persisted verbatim ---------------------------
|
||||
|
||||
|
||||
def test_save_persists_fingerprints_verbatim(
|
||||
_isolated_cache_root: pathlib.Path,
|
||||
):
|
||||
path = save_proposal(
|
||||
_KEY,
|
||||
_proposal(),
|
||||
visual_check_passed=True,
|
||||
user_approved=True,
|
||||
fingerprints=_FINGERPRINTS_BASELINE,
|
||||
)
|
||||
stored = json.loads(path.read_text(encoding="utf-8"))["fingerprints"]
|
||||
assert stored == _FINGERPRINTS_BASELINE
|
||||
|
||||
|
||||
# -- read side: back-compat (no fingerprints kwarg) -----------------------
|
||||
|
||||
|
||||
def test_read_without_fingerprints_kwarg_returns_proposal(
|
||||
_isolated_cache_root: pathlib.Path,
|
||||
):
|
||||
"""Legacy read path (no kwarg) skips invalidation — round-trip succeeds."""
|
||||
save_proposal(
|
||||
_KEY,
|
||||
_proposal(),
|
||||
visual_check_passed=True,
|
||||
user_approved=True,
|
||||
fingerprints=_FINGERPRINTS_BASELINE,
|
||||
)
|
||||
loaded = read_proposal(_KEY)
|
||||
assert loaded is not None
|
||||
assert loaded.payload == {"item_parser": "bullet_v2"}
|
||||
|
||||
|
||||
def test_read_without_fingerprints_kwarg_ignores_stored_mismatch(
|
||||
_isolated_cache_root: pathlib.Path,
|
||||
):
|
||||
"""A caller that has not adopted fingerprint-aware lookup must still
|
||||
see the proposal — invalidation only kicks in when explicitly asked."""
|
||||
save_proposal(
|
||||
_KEY,
|
||||
_proposal(),
|
||||
visual_check_passed=True,
|
||||
user_approved=True,
|
||||
fingerprints={"contract_sha": "old"},
|
||||
)
|
||||
loaded = read_proposal(_KEY)
|
||||
assert loaded is not None
|
||||
|
||||
|
||||
# -- read side: matching fingerprints -------------------------------------
|
||||
|
||||
|
||||
def test_read_with_matching_fingerprints_returns_proposal(
|
||||
_isolated_cache_root: pathlib.Path,
|
||||
):
|
||||
save_proposal(
|
||||
_KEY,
|
||||
_proposal(),
|
||||
visual_check_passed=True,
|
||||
user_approved=True,
|
||||
fingerprints=_FINGERPRINTS_BASELINE,
|
||||
)
|
||||
loaded = read_proposal(_KEY, fingerprints=dict(_FINGERPRINTS_BASELINE))
|
||||
assert loaded is not None
|
||||
assert loaded.proposal_kind is ProposalKind.BUILDER_OPTIONS_PATCH
|
||||
|
||||
|
||||
def test_read_with_empty_fingerprints_matches_empty_stored(
|
||||
_isolated_cache_root: pathlib.Path,
|
||||
):
|
||||
"""Both sides empty is an exact match, not a special-case None."""
|
||||
save_proposal(
|
||||
_KEY,
|
||||
_proposal(),
|
||||
visual_check_passed=True,
|
||||
user_approved=True,
|
||||
# default fingerprints=None → stored as {}
|
||||
)
|
||||
loaded = read_proposal(_KEY, fingerprints={})
|
||||
assert loaded is not None
|
||||
|
||||
|
||||
# -- read side: invalidation on mismatch ----------------------------------
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"drifted_axis",
|
||||
["contract_sha", "partial_sha", "catalog_sha"],
|
||||
)
|
||||
def test_read_invalidates_on_single_axis_drift(
|
||||
drifted_axis: str, _isolated_cache_root: pathlib.Path
|
||||
):
|
||||
save_proposal(
|
||||
_KEY,
|
||||
_proposal(),
|
||||
visual_check_passed=True,
|
||||
user_approved=True,
|
||||
fingerprints=_FINGERPRINTS_BASELINE,
|
||||
)
|
||||
supplied = dict(_FINGERPRINTS_BASELINE)
|
||||
supplied[drifted_axis] = "deadbeef" * 8 # 64-char distinct value
|
||||
assert read_proposal(_KEY, fingerprints=supplied) is None
|
||||
|
||||
|
||||
def test_read_invalidates_when_caller_supplies_extra_key(
|
||||
_isolated_cache_root: pathlib.Path,
|
||||
):
|
||||
"""Strict equality — extra key on caller side is a mismatch."""
|
||||
save_proposal(
|
||||
_KEY,
|
||||
_proposal(),
|
||||
visual_check_passed=True,
|
||||
user_approved=True,
|
||||
fingerprints=_FINGERPRINTS_BASELINE,
|
||||
)
|
||||
supplied = dict(_FINGERPRINTS_BASELINE)
|
||||
supplied["future_axis_sha"] = "z" * 64
|
||||
assert read_proposal(_KEY, fingerprints=supplied) is None
|
||||
|
||||
|
||||
def test_read_invalidates_when_caller_supplies_subset(
|
||||
_isolated_cache_root: pathlib.Path,
|
||||
):
|
||||
"""Strict equality — subset on caller side is a mismatch."""
|
||||
save_proposal(
|
||||
_KEY,
|
||||
_proposal(),
|
||||
visual_check_passed=True,
|
||||
user_approved=True,
|
||||
fingerprints=_FINGERPRINTS_BASELINE,
|
||||
)
|
||||
subset = {"contract_sha": _FINGERPRINTS_BASELINE["contract_sha"]}
|
||||
assert read_proposal(_KEY, fingerprints=subset) is None
|
||||
|
||||
|
||||
def test_read_invalidates_when_entry_saved_without_fingerprints(
|
||||
_isolated_cache_root: pathlib.Path,
|
||||
):
|
||||
"""A pre-invalidation cache entry (empty stored fingerprints) MUST NOT
|
||||
satisfy a fingerprint-aware lookup — caller demands proof of freshness."""
|
||||
save_proposal(
|
||||
_KEY,
|
||||
_proposal(),
|
||||
visual_check_passed=True,
|
||||
user_approved=True,
|
||||
# default fingerprints=None → stored as {}
|
||||
)
|
||||
assert read_proposal(_KEY, fingerprints=_FINGERPRINTS_BASELINE) is None
|
||||
|
||||
|
||||
def test_read_invalidates_when_stored_fingerprints_not_dict(
|
||||
_isolated_cache_root: pathlib.Path,
|
||||
):
|
||||
"""Hand-corrupted payload (fingerprints serialized as non-dict) → None."""
|
||||
path = _isolated_cache_root / _FRAME_ID / f"{_SIG_HASH}.json"
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_text(
|
||||
json.dumps(
|
||||
{
|
||||
"schema_version": 1,
|
||||
"proposal": _proposal().model_dump(mode="json"),
|
||||
"slide_css": None,
|
||||
"fingerprints": ["contract_sha", "c" * 64],
|
||||
}
|
||||
),
|
||||
encoding="utf-8",
|
||||
)
|
||||
assert read_proposal(_KEY, fingerprints=_FINGERPRINTS_BASELINE) is None
|
||||
|
||||
|
||||
def test_read_invalidates_when_stored_fingerprints_field_missing(
|
||||
_isolated_cache_root: pathlib.Path,
|
||||
):
|
||||
"""Legacy payload (no ``fingerprints`` field at all) → None when caller
|
||||
demands fingerprint comparison."""
|
||||
path = _isolated_cache_root / _FRAME_ID / f"{_SIG_HASH}.json"
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_text(
|
||||
json.dumps(
|
||||
{
|
||||
"schema_version": 1,
|
||||
"proposal": _proposal().model_dump(mode="json"),
|
||||
"slide_css": None,
|
||||
# fingerprints field deliberately omitted
|
||||
}
|
||||
),
|
||||
encoding="utf-8",
|
||||
)
|
||||
assert read_proposal(_KEY, fingerprints={"contract_sha": "c" * 64}) is None
|
||||
|
||||
|
||||
def test_read_with_matching_fingerprints_still_loses_to_missing_file():
|
||||
"""File missing takes precedence over fingerprint check — no false hit."""
|
||||
assert read_proposal(_KEY, fingerprints=_FINGERPRINTS_BASELINE) is None
|
||||
|
||||
|
||||
def test_read_with_matching_fingerprints_still_loses_to_corrupt_json(
|
||||
_isolated_cache_root: pathlib.Path,
|
||||
):
|
||||
path = _isolated_cache_root / _FRAME_ID / f"{_SIG_HASH}.json"
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_text("{not valid json", encoding="utf-8")
|
||||
assert read_proposal(_KEY, fingerprints=_FINGERPRINTS_BASELINE) is None
|
||||
|
||||
|
||||
# -- read side: input validation symmetry with save -----------------------
|
||||
|
||||
|
||||
def test_read_rejects_non_dict_fingerprints():
|
||||
with pytest.raises(TypeError):
|
||||
read_proposal(_KEY, fingerprints=["contract_sha", "c" * 64]) # type: ignore[arg-type]
|
||||
|
||||
|
||||
def test_read_rejects_non_dict_fingerprints_string():
|
||||
with pytest.raises(TypeError):
|
||||
read_proposal(_KEY, fingerprints="contract_sha=c" * 8) # type: ignore[arg-type]
|
||||
|
||||
|
||||
def test_read_rejects_non_dict_fingerprints_int():
|
||||
with pytest.raises(TypeError):
|
||||
read_proposal(_KEY, fingerprints=42) # type: ignore[arg-type]
|
||||
|
||||
|
||||
# -- isolation: cache.py never computes fingerprints ----------------------
|
||||
|
||||
|
||||
def test_cache_module_has_no_fingerprint_computer():
|
||||
"""Guardrail: cache.py is a *comparator*, not a *hasher*. The three
|
||||
declared shas are computed outside this module (step 12 / pipeline
|
||||
glue). Adding a fingerprint computer here would leak Phase Z runtime
|
||||
knowledge into the cache layer and violate AI isolation."""
|
||||
public_surface = [
|
||||
name
|
||||
for name in dir(cache_mod)
|
||||
if not name.startswith("_") and callable(getattr(cache_mod, name))
|
||||
]
|
||||
forbidden_substrings = ("hash", "sha", "fingerprint")
|
||||
leaks = [
|
||||
name
|
||||
for name in public_surface
|
||||
if any(sub in name.lower() for sub in forbidden_substrings)
|
||||
]
|
||||
assert leaks == [], (
|
||||
f"cache.py public surface leaks fingerprint computation: {leaks}; "
|
||||
"computation must live outside cache.py per IMP-46 u3 contract."
|
||||
)
|
||||
|
||||
|
||||
# -- isolation across distinct fingerprint sets ---------------------------
|
||||
|
||||
|
||||
def test_distinct_fingerprint_sets_isolated_per_signature(
|
||||
_isolated_cache_root: pathlib.Path,
|
||||
):
|
||||
"""Two entries under different signature hashes keep their own
|
||||
fingerprints; reading one with the other's fingerprints misses."""
|
||||
key_a = f"{_FRAME_ID}{KEY_DELIMITER}{'a' * 64}"
|
||||
key_b = f"{_FRAME_ID}{KEY_DELIMITER}{'b' * 64}"
|
||||
fps_a = {"contract_sha": "a" * 64}
|
||||
fps_b = {"contract_sha": "b" * 64}
|
||||
save_proposal(
|
||||
key_a,
|
||||
_proposal(payload={"sig": "a"}),
|
||||
visual_check_passed=True,
|
||||
user_approved=True,
|
||||
fingerprints=fps_a,
|
||||
)
|
||||
save_proposal(
|
||||
key_b,
|
||||
_proposal(payload={"sig": "b"}),
|
||||
visual_check_passed=True,
|
||||
user_approved=True,
|
||||
fingerprints=fps_b,
|
||||
)
|
||||
# Crossed lookups miss.
|
||||
assert read_proposal(key_a, fingerprints=fps_b) is None
|
||||
assert read_proposal(key_b, fingerprints=fps_a) is None
|
||||
# Aligned lookups hit.
|
||||
a_hit = read_proposal(key_a, fingerprints=fps_a)
|
||||
b_hit = read_proposal(key_b, fingerprints=fps_b)
|
||||
assert a_hit is not None and a_hit.payload == {"sig": "a"}
|
||||
assert b_hit is not None and b_hit.payload == {"sig": "b"}
|
||||
93
tests/phase_z2_ai_fallback/test_cache_repo_layout.py
Normal file
93
tests/phase_z2_ai_fallback/test_cache_repo_layout.py
Normal file
@@ -0,0 +1,93 @@
|
||||
"""IMP-46 u6 — repository layout coverage for the persistent frame cache.
|
||||
|
||||
This module is a *layout* contract test, not a runtime test. It asserts the
|
||||
files committed to source control that make ``data/frame_cache/`` exist on a
|
||||
fresh checkout while keeping cached JSON payloads ignored by git:
|
||||
|
||||
* ``data/frame_cache/.gitkeep`` is tracked (so the cache root exists for a
|
||||
fresh clone before any AI fallback run materialises payloads).
|
||||
* ``.gitignore`` ignores ``data/*`` broadly, re-includes the
|
||||
``data/frame_cache/`` directory, ignores its contents, and re-includes
|
||||
``data/frame_cache/.gitkeep`` so cache payloads under
|
||||
``data/frame_cache/{frame_id}/{signature_hash}.json`` remain ignored.
|
||||
|
||||
If somebody removes the ``.gitkeep`` marker, drops the negation lines from
|
||||
``.gitignore``, or commits a real cache payload, this test fails. The cache
|
||||
module surface (cache.py) is exercised by ``test_cache.py`` /
|
||||
``test_cache_invalidation.py`` and is intentionally *not* re-asserted here —
|
||||
this file is the layout-only lock that Stage 2 u6 declared.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
REPO_ROOT = Path(__file__).resolve().parents[2]
|
||||
GITIGNORE_PATH = REPO_ROOT / ".gitignore"
|
||||
CACHE_ROOT = REPO_ROOT / "data" / "frame_cache"
|
||||
GITKEEP_PATH = CACHE_ROOT / ".gitkeep"
|
||||
|
||||
|
||||
def _gitignore_lines() -> list[str]:
|
||||
assert GITIGNORE_PATH.is_file(), f".gitignore missing at {GITIGNORE_PATH}"
|
||||
text = GITIGNORE_PATH.read_text(encoding="utf-8")
|
||||
return [line.strip() for line in text.splitlines()]
|
||||
|
||||
|
||||
def test_frame_cache_root_directory_exists() -> None:
|
||||
"""``data/frame_cache/`` must exist on disk as the cache root."""
|
||||
assert CACHE_ROOT.is_dir(), (
|
||||
f"frame cache root missing: {CACHE_ROOT}. The directory must exist "
|
||||
"for save_proposal to write JSON payloads without first conjuring a "
|
||||
"parent on demand from outside the cache module."
|
||||
)
|
||||
|
||||
|
||||
def test_gitkeep_marker_is_tracked_file() -> None:
|
||||
"""``data/frame_cache/.gitkeep`` is the marker that keeps the dir tracked."""
|
||||
assert GITKEEP_PATH.is_file(), (
|
||||
f".gitkeep marker missing: {GITKEEP_PATH}. Without it the cache root "
|
||||
"would disappear on a fresh clone (everything under data/ is "
|
||||
"ignored by default)."
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"rule",
|
||||
[
|
||||
# Broad ignore for everything under data/ (cache payloads, runs/, etc.).
|
||||
"data/*",
|
||||
# Re-include the frame_cache directory itself so child negations work.
|
||||
"!data/frame_cache/",
|
||||
# Ignore everything inside frame_cache/ (cached JSON payloads).
|
||||
"data/frame_cache/*",
|
||||
# Re-include the .gitkeep marker only.
|
||||
"!data/frame_cache/.gitkeep",
|
||||
],
|
||||
)
|
||||
def test_gitignore_contains_frame_cache_exception(rule: str) -> None:
|
||||
"""The four ignore rules together pin the 'track marker only' contract."""
|
||||
lines = _gitignore_lines()
|
||||
assert rule in lines, (
|
||||
f".gitignore missing IMP-46 u6 rule: {rule!r}. The four-line block "
|
||||
"(data/*, !data/frame_cache/, data/frame_cache/*, "
|
||||
"!data/frame_cache/.gitkeep) together ensure the cache root is "
|
||||
"tracked while cached payloads remain ignored."
|
||||
)
|
||||
|
||||
|
||||
def test_gitignore_rule_order_keeps_payloads_ignored() -> None:
|
||||
"""Rule order matters: the ``data/frame_cache/*`` re-ignore must come
|
||||
AFTER the ``!data/frame_cache/`` directory re-include, otherwise the
|
||||
re-include would shadow it and cached JSON payloads would be tracked."""
|
||||
lines = _gitignore_lines()
|
||||
reinclude_dir = lines.index("!data/frame_cache/")
|
||||
reignore_contents = lines.index("data/frame_cache/*")
|
||||
reinclude_marker = lines.index("!data/frame_cache/.gitkeep")
|
||||
assert reinclude_dir < reignore_contents < reinclude_marker, (
|
||||
"gitignore IMP-46 u6 block out of order: expected "
|
||||
"'!data/frame_cache/' < 'data/frame_cache/*' < "
|
||||
"'!data/frame_cache/.gitkeep' so cached payloads stay ignored while "
|
||||
"only the marker is tracked."
|
||||
)
|
||||
184
tests/phase_z2_ai_fallback/test_signature.py
Normal file
184
tests/phase_z2_ai_fallback/test_signature.py
Normal file
@@ -0,0 +1,184 @@
|
||||
"""IMP-46 u1 — Frame cache signature builder tests.
|
||||
|
||||
Verifies:
|
||||
* Determinism — identical inputs yield the same SHA256 digest.
|
||||
* Axis-change sensitivity — every one of the 8 declared axes mutates the
|
||||
digest when changed in isolation.
|
||||
* Public surface — only the 8 declared axes are accepted (no
|
||||
sample/section identifier leakage).
|
||||
* char_count bucket boundaries (0-50, 51-150, 151-400, 401-1000, 1001+).
|
||||
* source_shape enum equivalence (string and SourceShape inputs match).
|
||||
* schema_version is part of the hashed payload (digest stable for fixture).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import inspect
|
||||
|
||||
import pytest
|
||||
|
||||
from src.phase_z2_ai_fallback.signature import (
|
||||
CHAR_COUNT_BUCKET_LABELS,
|
||||
SCHEMA_VERSION,
|
||||
SourceShape,
|
||||
bucket_char_count,
|
||||
build_signature,
|
||||
)
|
||||
|
||||
|
||||
def _base_kwargs() -> dict:
|
||||
return dict(
|
||||
frame_id="frame_03",
|
||||
v4_label="light_edit",
|
||||
cardinality=3,
|
||||
source_shape=SourceShape.BULLET,
|
||||
h3_count=2,
|
||||
char_count_bucket="51-150",
|
||||
layout_preset="sidebar-right",
|
||||
zone_position="top",
|
||||
)
|
||||
|
||||
|
||||
def test_schema_version_is_one() -> None:
|
||||
assert SCHEMA_VERSION == 1
|
||||
|
||||
|
||||
def test_bucket_labels_match_spec() -> None:
|
||||
assert CHAR_COUNT_BUCKET_LABELS == (
|
||||
"0-50",
|
||||
"51-150",
|
||||
"151-400",
|
||||
"401-1000",
|
||||
"1001+",
|
||||
)
|
||||
|
||||
|
||||
def test_signature_is_deterministic() -> None:
|
||||
a = build_signature(**_base_kwargs())
|
||||
b = build_signature(**_base_kwargs())
|
||||
assert a == b
|
||||
assert len(a) == 64
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"axis, new_value",
|
||||
[
|
||||
("frame_id", "frame_04"),
|
||||
("v4_label", "restructure"),
|
||||
("cardinality", 5),
|
||||
("source_shape", SourceShape.PARAGRAPH),
|
||||
("h3_count", 3),
|
||||
("char_count_bucket", "151-400"),
|
||||
("layout_preset", "two-column"),
|
||||
("zone_position", "bottom_l"),
|
||||
],
|
||||
)
|
||||
def test_signature_changes_for_each_axis(axis: str, new_value: object) -> None:
|
||||
base = build_signature(**_base_kwargs())
|
||||
kwargs = _base_kwargs()
|
||||
kwargs[axis] = new_value
|
||||
assert build_signature(**kwargs) != base
|
||||
|
||||
|
||||
def test_signature_accepts_string_source_shape() -> None:
|
||||
enum_sig = build_signature(**_base_kwargs())
|
||||
kwargs = _base_kwargs()
|
||||
kwargs["source_shape"] = "bullet"
|
||||
assert build_signature(**kwargs) == enum_sig
|
||||
|
||||
|
||||
def test_signature_rejects_unknown_source_shape() -> None:
|
||||
kwargs = _base_kwargs()
|
||||
kwargs["source_shape"] = "nonsense"
|
||||
with pytest.raises(ValueError):
|
||||
build_signature(**kwargs)
|
||||
|
||||
|
||||
def test_signature_rejects_unknown_char_count_bucket() -> None:
|
||||
kwargs = _base_kwargs()
|
||||
kwargs["char_count_bucket"] = "999-1234"
|
||||
with pytest.raises(ValueError):
|
||||
build_signature(**kwargs)
|
||||
|
||||
|
||||
def test_signature_handles_none_cardinality() -> None:
|
||||
kwargs = _base_kwargs()
|
||||
kwargs["cardinality"] = None
|
||||
sig = build_signature(**kwargs)
|
||||
assert len(sig) == 64
|
||||
kwargs2 = _base_kwargs()
|
||||
kwargs2["cardinality"] = 0
|
||||
assert build_signature(**kwargs2) != sig
|
||||
|
||||
|
||||
def test_signature_surface_only_8_declared_axes() -> None:
|
||||
params = set(inspect.signature(build_signature).parameters)
|
||||
expected = {
|
||||
"frame_id",
|
||||
"v4_label",
|
||||
"cardinality",
|
||||
"source_shape",
|
||||
"h3_count",
|
||||
"char_count_bucket",
|
||||
"layout_preset",
|
||||
"zone_position",
|
||||
}
|
||||
assert params == expected
|
||||
|
||||
|
||||
def test_bucket_boundaries() -> None:
|
||||
assert bucket_char_count(0) == "0-50"
|
||||
assert bucket_char_count(50) == "0-50"
|
||||
assert bucket_char_count(51) == "51-150"
|
||||
assert bucket_char_count(150) == "51-150"
|
||||
assert bucket_char_count(151) == "151-400"
|
||||
assert bucket_char_count(400) == "151-400"
|
||||
assert bucket_char_count(401) == "401-1000"
|
||||
assert bucket_char_count(1000) == "401-1000"
|
||||
assert bucket_char_count(1001) == "1001+"
|
||||
assert bucket_char_count(10_000) == "1001+"
|
||||
|
||||
|
||||
def test_bucket_rejects_negative() -> None:
|
||||
with pytest.raises(ValueError):
|
||||
bucket_char_count(-1)
|
||||
|
||||
|
||||
def test_bucket_rejects_non_int() -> None:
|
||||
with pytest.raises(TypeError):
|
||||
bucket_char_count(3.14) # type: ignore[arg-type]
|
||||
with pytest.raises(TypeError):
|
||||
bucket_char_count(True) # type: ignore[arg-type]
|
||||
|
||||
|
||||
def test_signature_stable_known_fixture() -> None:
|
||||
"""Lock the digest for a known fixture so a silent payload-shape change
|
||||
(e.g. a new axis sneaks in, or schema_version drifts) breaks this test.
|
||||
"""
|
||||
sig = build_signature(
|
||||
frame_id="frame_03",
|
||||
v4_label="light_edit",
|
||||
cardinality=3,
|
||||
source_shape=SourceShape.BULLET,
|
||||
h3_count=2,
|
||||
char_count_bucket="51-150",
|
||||
layout_preset="sidebar-right",
|
||||
zone_position="top",
|
||||
)
|
||||
import hashlib
|
||||
import json
|
||||
|
||||
expected_payload = {
|
||||
"schema_version": 1,
|
||||
"frame_id": "frame_03",
|
||||
"v4_label": "light_edit",
|
||||
"cardinality": 3,
|
||||
"source_shape": "bullet",
|
||||
"h3_count": 2,
|
||||
"char_count_bucket": "51-150",
|
||||
"layout_preset": "sidebar-right",
|
||||
"zone_position": "top",
|
||||
}
|
||||
expected = hashlib.sha256(
|
||||
json.dumps(expected_payload, sort_keys=True, ensure_ascii=False).encode("utf-8")
|
||||
).hexdigest()
|
||||
assert sig == expected
|
||||
@@ -1,12 +1,18 @@
|
||||
"""IMP-33 u8 — Step 12 AI repair wiring tests.
|
||||
"""IMP-33 u8 + IMP-46 u4 + IMP-47B u2 — Step 12 AI repair wiring tests.
|
||||
|
||||
Covers the two structural gates layered on top of the u7 router:
|
||||
Covers the structural gates layered on top of the u7 router:
|
||||
* IMP-30 provisional gate (only provisional units may invoke AI repair)
|
||||
* Reject gate (route_hint=design_reference_only NEVER calls AI)
|
||||
Plus the record-shape contract returned for downstream Step 12 artifacts.
|
||||
* Catch-all ``route_not_ai_adaptation:<hint>`` skip — every route_hint
|
||||
other than ``ai_adaptation_required`` (including the legacy
|
||||
``design_reference_only`` hint) falls through to a single uniform skip
|
||||
after the IMP-47B u2 removal of the bespoke reject gate.
|
||||
Plus the record-shape contract returned for downstream Step 12 artifacts
|
||||
and the IMP-46 u4 structural cache key + fingerprints contract.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
from unittest.mock import MagicMock
|
||||
@@ -24,6 +30,12 @@ class FakeUnit:
|
||||
source_section_ids: list[str] = field(default_factory=lambda: ["s1"])
|
||||
raw_content: str = "raw"
|
||||
v4_rank: int | None = 1
|
||||
cardinality: int | None = None
|
||||
layout_preset: str = ""
|
||||
zone_position: str = ""
|
||||
source_shape: str = "paragraph"
|
||||
h3_count: int = 0
|
||||
char_count: int = 0
|
||||
|
||||
|
||||
_ROUTE_HINTS: dict[str | None, str | None] = {
|
||||
@@ -64,6 +76,25 @@ def _call(
|
||||
return step12_mod.gather_step12_ai_repair_proposals(units, **kwargs)
|
||||
|
||||
|
||||
def _ai_unit(**overrides: Any) -> FakeUnit:
|
||||
"""Construct an AI-eligible FakeUnit (provisional + restructure) with sane defaults."""
|
||||
base: dict[str, Any] = dict(
|
||||
label="restructure",
|
||||
provisional=True,
|
||||
frame_template_id="tmpl_x",
|
||||
frame_id="fid_123",
|
||||
source_section_ids=["02-1"],
|
||||
layout_preset="single_column",
|
||||
zone_position="zone_a",
|
||||
source_shape="bullet",
|
||||
h3_count=3,
|
||||
char_count=200,
|
||||
cardinality=5,
|
||||
)
|
||||
base.update(overrides)
|
||||
return FakeUnit(**base)
|
||||
|
||||
|
||||
def test_non_provisional_unit_is_skipped_without_ai_call(monkeypatch):
|
||||
router = MagicMock()
|
||||
monkeypatch.setattr(step12_mod, "route_ai_fallback", router)
|
||||
@@ -75,13 +106,20 @@ def test_non_provisional_unit_is_skipped_without_ai_call(monkeypatch):
|
||||
router.assert_not_called()
|
||||
|
||||
|
||||
def test_reject_route_is_skipped_without_ai_call(monkeypatch):
|
||||
def test_design_reference_route_falls_through_to_route_not_ai_adaptation(monkeypatch):
|
||||
"""IMP-47B u2 — the bespoke 'design_reference_only_no_ai' skip is gone.
|
||||
|
||||
Any non-AI-adaptation route_hint (including the legacy
|
||||
``design_reference_only`` hint exercised here via the local test mapping
|
||||
of ``reject``) now flows into the single ``route_not_ai_adaptation:<hint>``
|
||||
catch-all. Production reject routing is exercised by u9.
|
||||
"""
|
||||
router = MagicMock()
|
||||
monkeypatch.setattr(step12_mod, "route_ai_fallback", router)
|
||||
units = [FakeUnit(label="reject", provisional=True)]
|
||||
records = _call(units)
|
||||
assert records[0]["ai_called"] is False
|
||||
assert records[0]["skip_reason"] == "design_reference_only_no_ai"
|
||||
assert records[0]["skip_reason"] == "route_not_ai_adaptation:design_reference_only"
|
||||
assert records[0]["route_hint"] == "design_reference_only"
|
||||
router.assert_not_called()
|
||||
|
||||
@@ -153,29 +191,206 @@ def test_mixed_units_each_independently_classified(monkeypatch):
|
||||
records = _call(units)
|
||||
assert [r["skip_reason"] for r in records] == [
|
||||
"not_provisional",
|
||||
"design_reference_only_no_ai",
|
||||
"route_not_ai_adaptation:design_reference_only",
|
||||
"router_short_circuit",
|
||||
"not_provisional",
|
||||
]
|
||||
assert router.call_count == 1
|
||||
|
||||
|
||||
def test_cache_key_includes_template_and_section_ids(monkeypatch):
|
||||
# ---------------------------------------------------------------------------
|
||||
# IMP-46 u4 — structural cache key + fingerprints
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_cache_key_format_is_frame_id_plus_sha256(monkeypatch):
|
||||
"""cache_key is '{frame_id}::{64-hex-sha256}', NOT template_id + section_ids."""
|
||||
router = MagicMock(return_value=None)
|
||||
monkeypatch.setattr(step12_mod, "route_ai_fallback", router)
|
||||
_call([_ai_unit()])
|
||||
cache_key = router.call_args.kwargs["cache_key"]
|
||||
assert "::" in cache_key
|
||||
frame_part, _, signature_part = cache_key.partition("::")
|
||||
assert frame_part == "fid_123"
|
||||
assert len(signature_part) == 64
|
||||
assert all(c in "0123456789abcdef" for c in signature_part)
|
||||
# The legacy "template_id::sorted(section_ids)" form is gone.
|
||||
assert "tmpl_x" not in cache_key
|
||||
assert "02-1" not in cache_key
|
||||
|
||||
|
||||
def test_cache_key_invariant_to_section_id_changes(monkeypatch):
|
||||
"""Same structural axes → same cache_key regardless of source_section_ids."""
|
||||
router = MagicMock(return_value=None)
|
||||
monkeypatch.setattr(step12_mod, "route_ai_fallback", router)
|
||||
_call([_ai_unit(source_section_ids=["02-1"])])
|
||||
key_a = router.call_args.kwargs["cache_key"]
|
||||
router.reset_mock()
|
||||
_call([_ai_unit(source_section_ids=["05-2", "07-3"])])
|
||||
key_b = router.call_args.kwargs["cache_key"]
|
||||
assert key_a == key_b
|
||||
|
||||
|
||||
def test_cache_key_invariant_to_template_id_changes(monkeypatch):
|
||||
"""frame_template_id is NOT part of the structural signature (frame_id is)."""
|
||||
router = MagicMock(return_value=None)
|
||||
monkeypatch.setattr(step12_mod, "route_ai_fallback", router)
|
||||
_call([_ai_unit(frame_template_id="tmpl_x")])
|
||||
key_a = router.call_args.kwargs["cache_key"]
|
||||
router.reset_mock()
|
||||
_call([_ai_unit(frame_template_id="tmpl_OTHER")])
|
||||
key_b = router.call_args.kwargs["cache_key"]
|
||||
assert key_a == key_b
|
||||
|
||||
|
||||
def test_cache_key_changes_when_any_signature_axis_changes(monkeypatch):
|
||||
"""Flipping any of the 7 unit-derived signature axes mutates cache_key."""
|
||||
router = MagicMock(return_value=None)
|
||||
monkeypatch.setattr(step12_mod, "route_ai_fallback", router)
|
||||
_call([_ai_unit()])
|
||||
base_key = router.call_args.kwargs["cache_key"]
|
||||
perturbations: dict[str, Any] = {
|
||||
"frame_id": "fid_OTHER",
|
||||
"label": "use_as_is", # v4_label axis change; still routed to AI via _ROUTE_HINTS? No.
|
||||
# ↑ "use_as_is" → "direct_render" → would skip. Use another ai-adaptation-mapped label.
|
||||
# Replace with frame_id-only diff to keep route stable. Drop this entry below.
|
||||
}
|
||||
# Rebuild perturbations restricted to axes that don't change routing.
|
||||
perturbations = {
|
||||
"frame_id": "fid_OTHER",
|
||||
"layout_preset": "two_column",
|
||||
"zone_position": "zone_b",
|
||||
"source_shape": "paragraph",
|
||||
"h3_count": 7,
|
||||
"char_count": 500, # bucket boundary crossing (151-400 → 401-1000)
|
||||
"cardinality": 4,
|
||||
}
|
||||
for axis, value in perturbations.items():
|
||||
router.reset_mock()
|
||||
_call([_ai_unit(**{axis: value})])
|
||||
new_key = router.call_args.kwargs["cache_key"]
|
||||
assert new_key != base_key, f"signature axis {axis!r} did not mutate cache_key"
|
||||
|
||||
|
||||
def test_char_count_bucket_collapses_within_bucket(monkeypatch):
|
||||
"""Different char_counts in the SAME bucket → identical cache_key."""
|
||||
router = MagicMock(return_value=None)
|
||||
monkeypatch.setattr(step12_mod, "route_ai_fallback", router)
|
||||
_call([_ai_unit(char_count=160)])
|
||||
key_low = router.call_args.kwargs["cache_key"]
|
||||
router.reset_mock()
|
||||
_call([_ai_unit(char_count=399)])
|
||||
key_high = router.call_args.kwargs["cache_key"]
|
||||
assert key_low == key_high # both fall in "151-400"
|
||||
router.reset_mock()
|
||||
_call([_ai_unit(char_count=401)])
|
||||
key_overflow = router.call_args.kwargs["cache_key"]
|
||||
assert key_overflow != key_low # crossed into "401-1000"
|
||||
|
||||
|
||||
def test_fingerprints_attached_to_ai_record(monkeypatch):
|
||||
"""AI-called records expose contract_sha + partial_sha + catalog_sha."""
|
||||
router = MagicMock(return_value=None)
|
||||
monkeypatch.setattr(step12_mod, "route_ai_fallback", router)
|
||||
contract = {"frame_id": "fid", "payload": {"x": 1}, "sub_zones": []}
|
||||
partial = {"some": "partial", "deeper": [1, 2, 3]}
|
||||
catalog_value = "deadbeef" * 8
|
||||
recs = _call(
|
||||
[_ai_unit()],
|
||||
get_contract_fn=lambda _t: contract,
|
||||
figma_partial_loader=lambda _t: partial,
|
||||
catalog_sha_loader=lambda: catalog_value,
|
||||
)
|
||||
fps = recs[0]["fingerprints"]
|
||||
assert isinstance(fps, dict)
|
||||
assert set(fps.keys()) == {"contract_sha", "partial_sha", "catalog_sha"}
|
||||
assert all(isinstance(v, str) for v in fps.values())
|
||||
assert fps["catalog_sha"] == catalog_value
|
||||
# contract_sha and partial_sha must be deterministic SHA256 over JSON-sorted payloads.
|
||||
expected_contract = hashlib.sha256(
|
||||
json.dumps(contract, sort_keys=True, ensure_ascii=False).encode("utf-8")
|
||||
).hexdigest()
|
||||
expected_partial = hashlib.sha256(
|
||||
json.dumps(partial, sort_keys=True, ensure_ascii=False).encode("utf-8")
|
||||
).hexdigest()
|
||||
assert fps["contract_sha"] == expected_contract
|
||||
assert fps["partial_sha"] == expected_partial
|
||||
|
||||
|
||||
def test_fingerprints_default_catalog_sha_is_empty_string(monkeypatch):
|
||||
"""No catalog_sha_loader → catalog_sha defaults to '' (sentinel, not missing key)."""
|
||||
router = MagicMock(return_value=None)
|
||||
monkeypatch.setattr(step12_mod, "route_ai_fallback", router)
|
||||
recs = _call([_ai_unit()])
|
||||
fps = recs[0]["fingerprints"]
|
||||
assert fps["catalog_sha"] == ""
|
||||
# contract_sha + partial_sha keys still present (always 3 keys).
|
||||
assert set(fps.keys()) == {"contract_sha", "partial_sha", "catalog_sha"}
|
||||
|
||||
|
||||
def test_fingerprints_change_when_contract_changes(monkeypatch):
|
||||
"""Different frame_contract → different contract_sha, partial_sha unchanged."""
|
||||
router = MagicMock(return_value=None)
|
||||
monkeypatch.setattr(step12_mod, "route_ai_fallback", router)
|
||||
fps_a = _call([_ai_unit()], get_contract_fn=lambda _t: {"a": 1})[0]["fingerprints"]
|
||||
fps_b = _call([_ai_unit()], get_contract_fn=lambda _t: {"a": 2})[0]["fingerprints"]
|
||||
assert fps_a["contract_sha"] != fps_b["contract_sha"]
|
||||
assert fps_a["partial_sha"] == fps_b["partial_sha"]
|
||||
|
||||
|
||||
def test_fingerprints_change_when_partial_changes(monkeypatch):
|
||||
"""Different figma_partial_json → different partial_sha, contract_sha unchanged."""
|
||||
router = MagicMock(return_value=None)
|
||||
monkeypatch.setattr(step12_mod, "route_ai_fallback", router)
|
||||
fps_a = _call(
|
||||
[_ai_unit()], figma_partial_loader=lambda _t: {"p": 1}
|
||||
)[0]["fingerprints"]
|
||||
fps_b = _call(
|
||||
[_ai_unit()], figma_partial_loader=lambda _t: {"p": 2}
|
||||
)[0]["fingerprints"]
|
||||
assert fps_a["partial_sha"] != fps_b["partial_sha"]
|
||||
assert fps_a["contract_sha"] == fps_b["contract_sha"]
|
||||
|
||||
|
||||
def test_v4_result_cardinality_uses_unit_value(monkeypatch):
|
||||
"""v4_result['cardinality'] mirrors the unit's cardinality (no longer hardcoded None)."""
|
||||
router = MagicMock(return_value=None)
|
||||
monkeypatch.setattr(step12_mod, "route_ai_fallback", router)
|
||||
_call([_ai_unit(cardinality=7)])
|
||||
assert router.call_args.kwargs["v4_result"]["cardinality"] == 7
|
||||
router.reset_mock()
|
||||
_call([_ai_unit(cardinality=None)])
|
||||
assert router.call_args.kwargs["v4_result"]["cardinality"] is None
|
||||
|
||||
|
||||
def test_skipped_records_have_no_cache_key_or_fingerprints(monkeypatch):
|
||||
"""Non-AI-eligible records keep cache_key and fingerprints as None."""
|
||||
monkeypatch.setattr(step12_mod, "route_ai_fallback", MagicMock(return_value=None))
|
||||
units = [
|
||||
FakeUnit(
|
||||
label="restructure",
|
||||
provisional=True,
|
||||
frame_template_id="tmpl_abc",
|
||||
source_section_ids=["02-1", "02-2"],
|
||||
)
|
||||
FakeUnit(label="restructure", provisional=False),
|
||||
FakeUnit(label="reject", provisional=True),
|
||||
FakeUnit(label="light_edit", provisional=True),
|
||||
]
|
||||
_call(units)
|
||||
assert router.call_args.kwargs["cache_key"] == "tmpl_abc::02-1,02-2"
|
||||
recs = _call(units)
|
||||
for rec in recs:
|
||||
assert rec["cache_key"] is None
|
||||
assert rec["fingerprints"] is None
|
||||
|
||||
|
||||
def test_record_shape_contract_is_stable(monkeypatch):
|
||||
def test_catalog_sha_loader_called_once_per_gather(monkeypatch):
|
||||
"""catalog_sha is computed once per gather call, not per unit."""
|
||||
router = MagicMock(return_value=None)
|
||||
monkeypatch.setattr(step12_mod, "route_ai_fallback", router)
|
||||
loader = MagicMock(return_value="cafefeed" * 8)
|
||||
_call(
|
||||
[_ai_unit(), _ai_unit(frame_id="fid_other"), _ai_unit(frame_id="fid_third")],
|
||||
catalog_sha_loader=loader,
|
||||
)
|
||||
loader.assert_called_once()
|
||||
|
||||
|
||||
def test_record_shape_contract_is_stable_with_u4_fields(monkeypatch):
|
||||
"""Record schema includes the IMP-46 u4 cache_key + fingerprints fields."""
|
||||
monkeypatch.setattr(step12_mod, "route_ai_fallback", MagicMock(return_value=None))
|
||||
units = [FakeUnit(label="reject", provisional=True)]
|
||||
rec = _call(units)[0]
|
||||
@@ -190,4 +405,98 @@ def test_record_shape_contract_is_stable(monkeypatch):
|
||||
"skip_reason",
|
||||
"proposal",
|
||||
"error",
|
||||
"cache_key",
|
||||
"fingerprints",
|
||||
}
|
||||
|
||||
|
||||
def test_cache_key_is_compatible_with_cache_parse_key(monkeypatch):
|
||||
"""cache_key produced here must round-trip through cache.py's _parse_key."""
|
||||
from src.phase_z2_ai_fallback.cache import KEY_DELIMITER, _parse_key
|
||||
|
||||
router = MagicMock(return_value=None)
|
||||
monkeypatch.setattr(step12_mod, "route_ai_fallback", router)
|
||||
_call([_ai_unit()])
|
||||
cache_key = router.call_args.kwargs["cache_key"]
|
||||
parsed = _parse_key(cache_key)
|
||||
assert parsed is not None
|
||||
frame_id, signature_hash = parsed
|
||||
assert frame_id == "fid_123"
|
||||
assert len(signature_hash) == 64
|
||||
assert KEY_DELIMITER not in signature_hash
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# IMP-47B u9 — Step 12 reject eligibility + normal-path AI=0 regression
|
||||
# ---------------------------------------------------------------------------
|
||||
# Locks the end-to-end Step 12 contract against the production route helper
|
||||
# `_imp05_route_hint`. The local `_ROUTE_HINTS` mapping above intentionally
|
||||
# preserves the legacy ``reject -> design_reference_only`` form to exercise
|
||||
# the catch-all fall-through branch; u9 instead drives gather with the real
|
||||
# production map (post-u1 flip) so reject provisional units reach the router
|
||||
# and normal-path labels stay AI=0.
|
||||
|
||||
|
||||
def test_production_reject_route_reaches_router_when_provisional(monkeypatch):
|
||||
"""Post-u1, provisional reject units must reach ``route_ai_fallback``."""
|
||||
from src.phase_z2_pipeline import _imp05_route_hint
|
||||
|
||||
router = MagicMock(return_value=None)
|
||||
monkeypatch.setattr(step12_mod, "route_ai_fallback", router)
|
||||
records = step12_mod.gather_step12_ai_repair_proposals(
|
||||
[FakeUnit(label="reject", provisional=True)],
|
||||
route_for_label=_imp05_route_hint,
|
||||
get_contract_fn=_get_contract,
|
||||
frame_visual_loader=_frame_visual,
|
||||
)
|
||||
assert records[0]["route_hint"] == "ai_adaptation_required"
|
||||
assert records[0]["skip_reason"] == "router_short_circuit"
|
||||
assert records[0]["ai_called"] is False
|
||||
router.assert_called_once()
|
||||
|
||||
|
||||
def test_production_normal_route_labels_never_reach_router(monkeypatch):
|
||||
"""Normal-path labels stay AI=0 even when the unit is provisional."""
|
||||
from src.phase_z2_pipeline import _imp05_route_hint
|
||||
|
||||
router = MagicMock(return_value=None)
|
||||
monkeypatch.setattr(step12_mod, "route_ai_fallback", router)
|
||||
units = [
|
||||
FakeUnit(label="use_as_is", provisional=True),
|
||||
FakeUnit(label="light_edit", provisional=True),
|
||||
FakeUnit(label=None, provisional=True),
|
||||
]
|
||||
records = step12_mod.gather_step12_ai_repair_proposals(
|
||||
units,
|
||||
route_for_label=_imp05_route_hint,
|
||||
get_contract_fn=_get_contract,
|
||||
frame_visual_loader=_frame_visual,
|
||||
)
|
||||
assert records[0]["skip_reason"] == "route_not_ai_adaptation:direct_render"
|
||||
assert records[1]["skip_reason"] == (
|
||||
"route_not_ai_adaptation:deterministic_minor_adjustment"
|
||||
)
|
||||
assert records[2]["skip_reason"] == "route_not_ai_adaptation:None"
|
||||
router.assert_not_called()
|
||||
|
||||
|
||||
def test_production_non_provisional_reject_skipped_before_route_gate(monkeypatch):
|
||||
"""The provisional gate fires before the route gate (production routing).
|
||||
|
||||
Even with reject routed to ``ai_adaptation_required`` (post-u1), a
|
||||
non-provisional reject unit must short-circuit at ``not_provisional``
|
||||
without ever consulting ``route_for_label`` for an AI dispatch.
|
||||
"""
|
||||
from src.phase_z2_pipeline import _imp05_route_hint
|
||||
|
||||
router = MagicMock(return_value=None)
|
||||
monkeypatch.setattr(step12_mod, "route_ai_fallback", router)
|
||||
records = step12_mod.gather_step12_ai_repair_proposals(
|
||||
[FakeUnit(label="reject", provisional=False)],
|
||||
route_for_label=_imp05_route_hint,
|
||||
get_contract_fn=_get_contract,
|
||||
frame_visual_loader=_frame_visual,
|
||||
)
|
||||
assert records[0]["skip_reason"] == "not_provisional"
|
||||
assert records[0]["ai_called"] is False
|
||||
router.assert_not_called()
|
||||
|
||||
Reference in New Issue
Block a user