Add Phase Z runtime foundation

- add visual fit classifier, router, retry, and failure routing modules
- add composition planner and catalog-driven mapper
- add Phase Z pipeline orchestration and architecture docs
This commit is contained in:
2026-05-04 08:21:28 +09:00
parent 79f0c55745
commit e7848b602d
11 changed files with 5465 additions and 0 deletions

395
src/phase_z2_classifier.py Normal file
View File

@@ -0,0 +1,395 @@
"""Phase Z-2 fit_classifier v0 (A1 — 분류 layer 만).
Selenium visual_runtime_check 의 결과 (clipped_inner / zone overflow) 를
spec `docs/architecture/PHASE-Z-FIT-CLASSIFIER-ROUTER-SPEC.md` §3 taxonomy
의 *category* 로 분류하는 layer.
본 모듈은 ***분류만***. action / router / rerender / behavior 변경 X.
출력 = debug.json 의 `fit_classification` trace.
원칙 :
- className 이라는 raw 문자열 → semantic content_type 매핑은 *registry* 가 담당
- excess_y (px) → line_equivalent 환산은 content_type 별 line-height 기준
- category 결정은 spec §3.2 우선순위 그대로 적용 (frame_capacity_mismatch →
tabular → structural_major → layout_zone_mismatch → structural_minor →
moderate → minor → hard_visual_fail)
- 모든 결정은 trace 에 명시 — *어느 룰이 왜 적용됐는지* debug 로 검증 가능
다음 step (별도 — A2) :
overflow_router 가 본 module 의 category 를 받아 action 으로 매핑.
본 step 에서 router 는 X.
"""
from __future__ import annotations
import re
from typing import Optional
# ─── §2 className → semantic content_type registry ───────────────
# spec PHASE-Z-FIT-CLASSIFIER-ROUTER-SPEC.md §2 의 registry 그대로.
# 패턴은 *위에서 아래로* 첫 매칭 우선. 더 specific 한 패턴이 위에 와야 함.
CONTENT_TYPE_PATTERNS: list[tuple[str, str, str]] = [
    # Each entry is (regex pattern, semantic_content_type, description).
    # classify_content_type() scans this list top to bottom and returns on the
    # first pattern that matches a token, so more specific patterns must be
    # placed above more general ones.
    # transform-block / transform-row -> structural_unit
    # spec: "paired comparison (one AS-IS/TO-BE pair is a single semantic unit)"
    (r"^transform-block(__|$)", "structural_unit",
     "transform-block — paired comparison container"),
    (r"^transform-row(__|$)", "structural_unit",
     "transform-row — AS-IS/TO-BE pair row"),
    (r"^transform-rows$", "structural_unit",
     "transform-rows wrapper"),
    # tabular: a class containing "table" as a '-'/'_' delimited word
    (r"(^|[-_])table($|[-_])", "tabular",
     "table — tabular content"),
    # text-line family -> text_flow
    (r"^text-line(--|$)", "text_flow",
     "text-line — free-flowing text/bullet"),
    # frame internal cell (a unit cell inside a frame)
    (r"^f\d+b__cell(--|$)", "frame_internal_cell",
     "frame internal cell"),
    (r"^f\d+b__pillar(--|$)", "frame_internal_cell",
     "frame internal pillar"),
    (r"^f\d+b__quadrant(--|$)", "frame_internal_cell",
     "frame internal quadrant"),
    # frame label / title / banner / ribbon
    (r"^f\d+b__title$", "frame_label",
     "frame title"),
    (r"^f\d+b__section-title", "frame_label",
     "frame section-title"),
    (r"^f\d+b__banner", "frame_label",
     "frame banner"),
    (r"^f\d+b__ribbon", "frame_label",
     "frame ribbon"),
    # NOTE: unanchored, so any token containing "__label" matches here.
    (r"__label", "frame_label",
     "frame label"),
    # frame root (the bare f29b, f13b, f16b class itself)
    (r"^f\d+b$", "frame_internal",
     "frame family root"),
    # visual asset
    (r"__bg(\b|$)", "visual_asset", "background asset"),
    (r"^bg-", "visual_asset", "background asset"),
    (r"__icon(\b|$)", "visual_asset", "icon asset"),
    (r"^img-", "visual_asset", "image asset"),
]
def classify_content_type(class_name: str) -> tuple[str, str]:
    """Map a raw className string to ``(semantic_content_type, match_reason)``.

    ``class_name`` may contain several whitespace-separated class tokens.
    Tokens are examined left to right; for each token the
    CONTENT_TYPE_PATTERNS registry is scanned top to bottom and the first hit
    is returned immediately. Consequently an earlier token wins over a later
    one, even if the later token would match a pattern placed higher in the
    registry.

    Returns:
        ``("unknown", "")`` when ``class_name`` is empty/falsy,
        ``("unknown", "no pattern matched ...")`` when no token matches,
        otherwise the matched type plus a human-readable reason.

    Examples:
        'f29b__cell f29b__cell--left' -> ('frame_internal_cell', "...")
        'transform-block'             -> ('structural_unit', "...")
        'text-line text-line--bullet' -> ('text_flow', "...")
    """
    if not class_name:
        return ("unknown", "")

    tokens = class_name.strip().split()
    for token in tokens:
        # First registry entry whose regex matches this token, if any.
        hit = next(
            (entry for entry in CONTENT_TYPE_PATTERNS if re.search(entry[0], token)),
            None,
        )
        if hit is None:
            continue
        pattern, ctype, desc = hit
        return (ctype, f"token '{token}' matched pattern '{pattern}' ({desc})")

    return ("unknown", f"no pattern matched any of tokens {tokens}")
# ─── line_equivalent 환산 ─────────────────────────────────────────
# content_type 별 *대표 단위 height* — excess_y 를 줄(또는 단위) 단위로 환산.
# structural_unit / tabular 의 경우는 "1 단위" = transform-row 또는 table-row.
DEFAULT_UNIT_HEIGHTS: dict[str, float] = {
    # Representative height (px) of one "unit" per content_type; used by
    # compute_line_equivalent() to convert excess_y into a unit count.
    # transform-row: padding 3+3 + line-height 11×1.45=15.95 ≈ 21.95
    "structural_unit": 21.95,
    # text-line: font 11 × line-height 1.6 = 17.6
    "text_flow": 17.6,
    # tabular row: an estimate (to be calibrated once real table cases arrive)
    "tabular": 22.0,
    # frame label / title: font 13 × line-height 1.3 = 16.9
    "frame_label": 16.9,
    # frame_internal*: conservative default (same as text-line)
    "frame_internal": 17.6,
    "frame_internal_cell": 17.6,
    # visual asset: can be cropped, so the unit carries no meaning
    # (per the original note, line_eq is not used for this type)
    "visual_asset": 17.6,
    # unknown: falls back to the text-line default
    "unknown": 17.6,
}
def compute_line_equivalent(excess_y: float, content_type: str) -> float:
    """Convert an overflow in pixels into a number of lines / units.

    The divisor is the per-content_type height from DEFAULT_UNIT_HEIGHTS
    (17.6 when the type is not registered). A non-positive unit height yields
    0.0; otherwise the quotient is rounded to two decimal places.
    """
    unit_height = DEFAULT_UNIT_HEIGHTS.get(content_type, 17.6)
    # Guard against a zero/negative divisor in the table.
    return 0.0 if unit_height <= 0 else round(float(excess_y) / unit_height, 2)
# ─── §3 taxonomy classifier ──────────────────────────────────────
# spec §3.2 우선순위 :
# 1. frame_capacity_mismatch (composition 결과 우선)
# 2. tabular_overflow
# 3. structural_major_overflow
# 4. layout_zone_mismatch
# 5. structural_minor_overflow
# 6. moderate_overflow
# 7. minor_overflow
# 8. hard_visual_fail (fallback)
def classify_overflow(
    *,
    excess_y: float,
    excess_x: float,
    class_name: str,
    inner_content_signals: Optional[list[str]] = None,
    capacity_fit_status: Optional[str] = None,
) -> dict:
    """Classify one overflow event (clipped_inner or zone-self) into a spec §3 category.

    Args:
        excess_y / excess_x: overflow in px as measured by Selenium. Only
            ``excess_y`` feeds the line-equivalent computation; ``excess_x``
            is recorded in the trace.
        class_name: className string captured by Selenium (may hold several tokens).
        inner_content_signals: extra "inner content" hints reported by Selenium
            (e.g. ``['structural_unit']`` when a clipped cell contains a
            transform-block). Used to infer what kind of content actually
            overflowed when ``class_name`` only identifies a container.
        capacity_fit_status: ``capacity_fit.fit_status`` from the composition
            step; when it signals a mismatch it takes precedence over everything.

    Returns:
        dict with keys ``inputs``, ``derived``, ``category``, ``rule_applied``.
    """
    signals = list(inner_content_signals or [])
    raw_type, type_match = classify_content_type(class_name)

    # Selenium reports the overflow:hidden container (e.g. a cell), but what is
    # really being cut is the content inside it. When the className resolves to
    # a container type (or to nothing), let the inner signals decide, checking
    # tabular, then structural_unit, then text_flow.
    content_type = raw_type
    refined_via_inner = None
    if signals and raw_type in {"frame_internal_cell", "frame_internal", "unknown"}:
        for inner_type in ("tabular", "structural_unit", "text_flow"):
            if inner_type in signals:
                content_type = inner_type
                refined_via_inner = f"{inner_type} (inner_signal)"
                break

    line_equivalent = compute_line_equivalent(excess_y, content_type)

    trace = {
        "inputs": {
            "excess_y": float(excess_y),
            "excess_x": float(excess_x),
            "class_name": class_name,
            "inner_content_signals": signals,
            "capacity_fit_status": capacity_fit_status,
        },
        "derived": {
            "container_content_type": raw_type,  # from className alone
            "container_match": type_match,
            "content_type": content_type,  # final type after inner-signal refinement
            "content_type_refined_via_inner": refined_via_inner,
            "line_equivalent": line_equivalent,
            "unit_height_used": DEFAULT_UNIT_HEIGHTS.get(content_type, 17.6),
        },
    }

    def result(category: str, rule: str) -> dict:
        # Every outcome shares the same inputs/derived trace.
        return {**trace, "category": category, "rule_applied": rule}

    # 1. frame_capacity_mismatch: composition already flagged a mismatch.
    if capacity_fit_status in {"strict_mismatch", "exceeds_max", "below_min", "exceeds_truncate"}:
        return result(
            "frame_capacity_mismatch",
            f"capacity_fit_status='{capacity_fit_status}' — composition 단계의 "
            f"capacity_fit 가 이미 mismatch 신호 (spec §3.2 우선순위 1)",
        )

    # 2. tabular_overflow: tables are classified as such regardless of amount.
    if content_type == "tabular":
        return result(
            "tabular_overflow",
            "content_type=tabular — 표는 행 단위 자르면 의미 손실 (spec §3.2 우선순위 2)",
        )

    # 3 / 5. structural units: a whole unit lost (major) vs. a boundary spill (minor).
    # content_type is a single value, so grouping these around rule 4 does not
    # change which rule fires.
    if content_type == "structural_unit":
        if line_equivalent >= 1.0:
            return result(
                "structural_major_overflow",
                f"content_type=structural_unit AND line_equivalent={line_equivalent} >= 1.0 — "
                f"의미 단위 1+ 완전 잘림 (spec §3.2 우선순위 3)",
            )
        return result(
            "structural_minor_overflow",
            f"content_type=structural_unit AND line_equivalent={line_equivalent} < 1.0 — "
            f"boundary spill (부분 단위 잘림, 완전 단위 손실 아님) (spec §3.2 우선순위 5)",
        )

    # 4. layout_zone_mismatch: the frame root itself overflows.
    if content_type == "frame_internal":
        return result(
            "layout_zone_mismatch",
            "content_type=frame_internal — frame root 자체가 zone 안에 못 들어감 "
            "(spec §3.2 우선순위 4)",
        )

    # 6 / 7. text or label flow: moderate in (1.5, 4], minor at <= 1.5.
    # Anything above 4 lines falls through to the fallback below.
    if content_type in {"text_flow", "frame_label"}:
        if 1.5 < line_equivalent <= 4.0:
            return result(
                "moderate_overflow",
                f"content_type={content_type} AND line_equivalent={line_equivalent} ∈ (1.5, 4] "
                f"(spec §3.2 우선순위 6)",
            )
        if line_equivalent <= 1.5:
            return result(
                "minor_overflow",
                f"content_type={content_type} AND line_equivalent={line_equivalent} ≤ 1.5 "
                f"(spec §3.2 우선순위 7)",
            )

    # 8. hard_visual_fail: nothing above applied.
    return result(
        "hard_visual_fail",
        f"위 매핑 모두 미적용 (content_type={content_type}, line_equivalent="
        f"{line_equivalent}) — fallback (spec §3.2 우선순위 8)",
    )
# ─── visual_runtime_check 결과 → 전체 fit_classification trace ────
def classify_visual_runtime_check(overflow: dict, debug_zones: list[dict]) -> dict:
    """Build the full fit_classification trace from a Selenium overflow report.

    Every overflow event (zone-self overflow and each cell-level
    ``clipped_inner`` entry) is classified individually via classify_overflow().

    Args:
        overflow: result of the overflow check (reads ``passed``, ``zones``,
            ``slide``, ``slide_body``).
        debug_zones: per-zone debug records from the pipeline; ``position``,
            ``composition_rationale.capacity_fit.fit_status`` and
            ``v4_template_id`` are read. May be None/empty.

    Returns:
        dict with keys:
            visual_check_passed  : whether the Selenium check passed
            classifications      : list of per-event classification dicts
            summary              : short text summary
            categories_seen      : sorted unique categories
            unclassified_signals : slide / slide_body overflow, reported only
    """
    if overflow.get("passed", False):
        return {
            "visual_check_passed": True,
            "classifications": [],
            "summary": "visual check passed — no overflow to classify",
            "categories_seen": [],
            "unclassified_signals": [],
        }

    # zone position -> capacity_fit status / template id taken from debug_zones.
    capacity_status_by_position: dict[str, Optional[str]] = {}
    template_id_by_position: dict[str, Optional[str]] = {}
    for dz in (debug_zones or []):
        pos = dz.get("position")
        rationale = dz.get("composition_rationale") or {}
        # "capacity_fit" may be present with value None (no capacity check
        # ran); `or {}` keeps that from raising AttributeError.
        capacity_fit = rationale.get("capacity_fit") or {}
        capacity_status_by_position[pos] = capacity_fit.get("fit_status")
        template_id_by_position[pos] = dz.get("v4_template_id")

    classifications: list[dict] = []
    for z in overflow.get("zones") or []:
        zone_position = z.get("position", "?")
        zone_template_id = z.get("template_id") or template_id_by_position.get(zone_position)
        capacity_fit_status = capacity_status_by_position.get(zone_position)

        # zone-self overflow (the frame root itself)
        if z.get("overflowed"):
            if zone_template_id:
                # Synthesize a frame-root class name ("f<digits>b") so the
                # registry resolves it to frame_internal. Only the first two
                # digits found in the template id are used.
                digits = re.sub(r"[^0-9]", "", str(zone_template_id))[:2] or "0"
                root_class_name = f"f{digits}b"
            else:
                # No template id known: this placeholder does not match the
                # frame-root pattern, so it is classified from "unknown".
                root_class_name = "f?b"
            cls = classify_overflow(
                excess_y=z.get("excess_y") or 0,
                excess_x=z.get("excess_x") or 0,
                class_name=root_class_name,
                capacity_fit_status=capacity_fit_status,
            )
            cls["source"] = "zone_self_overflow"
            cls["zone_position"] = zone_position
            cls["zone_template_id"] = zone_template_id
            classifications.append(cls)

        # cell-level clipped_inner
        for c in z.get("clipped_inner") or []:
            cls = classify_overflow(
                excess_y=c.get("excess_y") or 0,
                excess_x=c.get("excess_x") or 0,
                class_name=c.get("class_name", ""),
                inner_content_signals=c.get("inner_content_signals") or [],
                capacity_fit_status=capacity_fit_status,
            )
            cls["source"] = "clipped_inner"
            cls["zone_position"] = zone_position
            cls["zone_template_id"] = zone_template_id
            cls["client_height"] = c.get("clientHeight")
            cls["scroll_height"] = c.get("scrollHeight")
            classifications.append(cls)

    # Slide-level / slide-body overflow is not classified; it is only reported.
    unclassified: list[dict] = []
    slide_m = overflow.get("slide") or {}
    if slide_m.get("overflowed"):
        unclassified.append({
            "level": "slide",
            "excess_y": slide_m.get("excess_y"),
            "excess_x": slide_m.get("excess_x"),
            "note": "slide-level overflow — 보통 zone 단위 분류로 충분, 미분류 보고만",
        })
    body_m = overflow.get("slide_body") or {}
    if body_m.get("overflowed"):
        unclassified.append({
            "level": "slide_body",
            "excess_y": body_m.get("excess_y"),
            "excess_x": body_m.get("excess_x"),
            "note": "slide_body overflow — 위와 같음",
        })

    categories = sorted({c["category"] for c in classifications})
    return {
        "visual_check_passed": False,
        "classifications": classifications,
        "summary": (
            f"{len(classifications)} overflow event(s) classified, "
            f"categories: {categories or 'none'}"
        ),
        "categories_seen": categories,
        "unclassified_signals": unclassified,
    }

571
src/phase_z2_composition.py Normal file
View File

@@ -0,0 +1,571 @@
"""Phase Z-2 Composition Planner v0.
Pipeline 의 빠진 layer = MDX 덩어리들을 *최종 zone unit* 으로 묶는 결정 layer.
위치 :
parse_mdx → align_sections_to_v4_granularity → [본 모듈] → render
원칙 (절대 룰) :
- 특정 MDX / frame / section 하드코딩 X (예: "04-2 면" / "F16 이면")
- 모든 결정 = catalog 메타 + V4 evidence parametric
- 같은 코드가 MDX 02/03/04/05/06... 모두 처리 — 결과는 케이스마다 다름
- drilling 결과 = 입력 (재료), composition planner 결과 = 출력 (zone units)
- slide-level layout = zone 까지만 나눔. zone 내부 분할은 frame partial 책임
8 layout preset vocabulary :
L1 single / L2 horizontal-2 / L3 vertical-2
L4 top-1-bottom-2 / L5 top-2-bottom-1
L6 left-1-right-2 / L7 left-2-right-1
L8 grid-2x2
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Optional
# ─── 8 Layout Preset Vocabulary ────────────────────────────────
# Preset name -> layout description. For every preset, "zones" equals the
# number of entries in "positions", and each position name appears in
# "css_areas". The css_* values look like CSS grid-template-areas / -columns /
# -rows strings — NOTE(review): confirm against the renderer that consumes them.
LAYOUT_PRESETS: dict[str, dict] = {
    # L1: one zone filling the slide body
    "single": {
        "zones": 1,
        "topology": "single",
        "positions": ["primary"],
        "css_areas": '"primary"',
        "css_cols": "1fr",
        "css_rows": "1fr",
    },
    # L2: two zones stacked as rows (top / bottom)
    "horizontal-2": {
        "zones": 2,
        "topology": "rows",
        "positions": ["top", "bottom"],
        "css_areas": '"top" "bottom"',
        "css_cols": "1fr",
        "css_rows": "1fr 1fr",
    },
    # L3: two zones side by side as columns (left / right)
    "vertical-2": {
        "zones": 2,
        "topology": "cols",
        "positions": ["left", "right"],
        "css_areas": '"left right"',
        "css_cols": "1fr 1fr",
        "css_rows": "1fr",
    },
    # L4: one full-width zone on top, two below
    "top-1-bottom-2": {
        "zones": 3,
        "topology": "T",
        "positions": ["top", "bottom-left", "bottom-right"],
        "css_areas": '"top top" "bottom-left bottom-right"',
        "css_cols": "1fr 1fr",
        "css_rows": "1fr 1fr",
    },
    # L5: two zones on top, one full-width zone below
    "top-2-bottom-1": {
        "zones": 3,
        "topology": "inverted-T",
        "positions": ["top-left", "top-right", "bottom"],
        "css_areas": '"top-left top-right" "bottom bottom"',
        "css_cols": "1fr 1fr",
        "css_rows": "1fr 1fr",
    },
    # L6: one full-height zone on the left, two stacked on the right
    "left-1-right-2": {
        "zones": 3,
        "topology": "side-T-left",
        "positions": ["left", "right-top", "right-bottom"],
        "css_areas": '"left right-top" "left right-bottom"',
        "css_cols": "1fr 1fr",
        "css_rows": "1fr 1fr",
    },
    # L7: two stacked zones on the left, one full-height zone on the right
    "left-2-right-1": {
        "zones": 3,
        "topology": "side-T-right",
        "positions": ["left-top", "right", "left-bottom"],
        "css_areas": '"left-top right" "left-bottom right"',
        "css_cols": "1fr 1fr",
        "css_rows": "1fr 1fr",
    },
    # L8: 2x2 grid
    "grid-2x2": {
        "zones": 4,
        "topology": "2x2",
        "positions": ["top-left", "top-right", "bottom-left", "bottom-right"],
        "css_areas": '"top-left top-right" "bottom-left bottom-right"',
        "css_cols": "1fr 1fr",
        "css_rows": "1fr 1fr",
    },
}
# ─── CompositionUnit ────────────────────────────────────────────
@dataclass
class CompositionUnit:
    """One zone candidate within a slide: MDX section(s) plus the matched frame.

    source_section_ids : one id = single section, two or more = merged
    merge_type :
        - "single"                 : a single section
        - "parent_merged"          : a parent V4 entry exists (v0)
        - "parent_merged_inferred" : no parent V4 entry; inferred from child
                                     evidence (v0.1)
    frame_* : copied as-is from the V4 evidence (no hard-coded catalog metadata)
    score : overall score (filled in by scoring)
    rationale : score breakdown kept for traceability
    auto_selectable : False excludes the candidate from automatic selection
    filter_reasons : why auto_selectable is False (e.g. C1 capacity mismatch,
        W1/W2/W3 weak inferred merge)
    notes : informational signals that do not affect auto_selectable
    """
    source_section_ids: list[str]
    merge_type: str
    frame_template_id: str
    frame_id: str
    frame_number: int
    confidence: float
    label: str  # use_as_is / light_edit / restructure / reject
    phase_z_status: str
    raw_content: str
    title: str
    score: float = 0.0
    rationale: dict = field(default_factory=dict)
    # Automatic-pipeline state (no review/UI concept here — currently only an
    # automatic decision plus an explicit record of why a candidate failed).
    # auto_selectable=False removes the candidate from the automatic selection
    # step; filter_reasons holds the reason(s).
    # Example: W1/W2/W3 for parent_merged_inferred (representative status /
    # all children reject / majority not auto-renderable).
    # User/AI review is handled by a separate layer (interactive editor); this
    # dataclass only carries the completed automatic decision.
    auto_selectable: bool = True
    filter_reasons: list[str] = field(default_factory=list)
    # Informational signals, independent of auto_selectable; reserved for
    # future scoring axes.
    # Example: "children disagree on rank-1 template_id" /
    # "minority of children non-auto-renderable"
    notes: list[str] = field(default_factory=list)
# ─── Heading Tree ──────────────────────────────────────────────
def derive_parent_id(section_id: str) -> Optional[str]:
    """Derive the parent section id from ``section_id`` (V4 key convention).

    The id is split at its first "-" into an MDX id and a suffix. If the
    suffix contains a ".", the parent is the MDX id plus everything in the
    suffix before the first "."; otherwise there is no parent.

    Examples (illustrative, not rules):
        - "04-2.1"   -> "04-2"  (decimal suffix stripped)
        - "04-2.1.3" -> "04-2"  (everything after the first "." is dropped)
        - "04-1"     -> None    (top-level, no parent)
        - "04"       -> None
    """
    mdx_id, dash, suffix = section_id.partition("-")
    if not dash:
        # No "-" at all: the id has no suffix part to derive a parent from.
        return None
    stem, dot, _ = suffix.partition(".")
    return f"{mdx_id}-{stem}" if dot else None
def build_heading_tree(sections) -> dict:
    """Turn a section list into ``{section_id: {"section": s, "children": [ids]}}``.

    A section is registered as a child only when its derived parent id is
    itself one of the given sections; otherwise it stays a root entry.
    """
    tree = {}
    for section in sections:
        tree[section.section_id] = {"section": section, "children": []}

    for section in sections:
        parent_id = derive_parent_id(section.section_id)
        if not parent_id or parent_id not in tree:
            continue
        tree[parent_id]["children"].append(section.section_id)
    return tree
# ─── Candidate Generation ──────────────────────────────────────
def _apply_capacity_fit(candidate: CompositionUnit, capacity_fit_fn) -> None:
    """Record the capacity check on ``candidate`` and demote it on mismatch.

    Does nothing when ``capacity_fit_fn`` is None. Otherwise the function is
    called with ``(frame_template_id, raw_content)`` and its result is stored
    under ``candidate.rationale["capacity_fit"]``.

    A fit_status of 'ok' / 'no_contract' / 'unknown_source_shape' leaves
    auto_selectable untouched. Any other status marks the candidate
    ``auto_selectable=False`` and appends a 'C1: ...' entry to
    ``filter_reasons``.
    """
    if capacity_fit_fn is None:
        return

    fit = capacity_fit_fn(candidate.frame_template_id, candidate.raw_content)
    candidate.rationale["capacity_fit"] = fit

    status = fit["fit_status"]
    if status not in {"ok", "no_contract", "unknown_source_shape"}:
        candidate.auto_selectable = False
        candidate.filter_reasons.append(
            f"C1: capacity mismatch ({status}) — {fit['mismatch_reason']}"
        )
def collect_candidates(sections, v4_lookup_fn, v4_label_to_status: dict,
                       auto_renderable_statuses: Optional[set[str]] = None,
                       capacity_fit_fn=None):
    """Generate composition candidates from the aligned sections.

    Candidate types, emitted in this order:
        1. single                 : one per section that has a V4 match
        2. parent_merged          : the parent id itself has a V4 match and at
                                    least two children exist
        3. parent_merged_inferred : the parent has no V4 match; a
                                    representative template is inferred from
                                    the children's V4 evidence (needs at least
                                    two matched children)

    No section id / template id / frame is hard-coded: decisions come from
    derive_parent_id(), the V4 evidence, the label -> status mapping and the
    injected functions.

    Args:
        sections: aligned sections (each exposes section_id, raw_content, title).
        v4_lookup_fn: ``(section_id) -> match | None``; a match exposes
            template_id, frame_id, frame_number, confidence, label.
        v4_label_to_status: V4 label -> Phase Z status mapping.
        auto_renderable_statuses: statuses allowed for automatic rendering
            (input to the W1/W3 checks). None is treated as an empty set.
        capacity_fit_fn: optional ``(template_id, content) -> fit dict``. When
            given it is applied to every candidate; a capacity mismatch sets
            auto_selectable=False.

    Returns:
        list[CompositionUnit]
    """
    if auto_renderable_statuses is None:
        auto_renderable_statuses = set()

    def _build(match, section_ids, merge_type, raw, title, **extra):
        # Shared constructor: frame fields are taken straight from the V4
        # match, then the optional capacity pre-check is applied.
        unit = CompositionUnit(
            source_section_ids=section_ids,
            merge_type=merge_type,
            frame_template_id=match.template_id,
            frame_id=match.frame_id,
            frame_number=match.frame_number,
            confidence=match.confidence,
            label=match.label,
            phase_z_status=v4_label_to_status.get(match.label, "unknown"),
            raw_content=raw,
            title=title,
            **extra,
        )
        _apply_capacity_fit(unit, capacity_fit_fn)
        return unit

    candidates = []

    # 1. single — one candidate per section with a V4 match
    for section in sections:
        match = v4_lookup_fn(section.section_id)
        if match is not None:
            candidates.append(
                _build(match, [section.section_id], "single",
                       section.raw_content, section.title)
            )

    # Group sections under their derived parent id.
    parent_to_children: dict[str, list] = {}
    for section in sections:
        parent_id = derive_parent_id(section.section_id)
        if parent_id:
            parent_to_children.setdefault(parent_id, []).append(section)

    # 2. parent_merged — the parent id itself is matched in V4
    for parent_id, children in parent_to_children.items():
        parent_match = v4_lookup_fn(parent_id)
        # No parent match is handled by step 3; a single child is not a merge.
        if parent_match is None or len(children) < 2:
            continue
        candidates.append(
            _build(
                parent_match,
                [child.section_id for child in children],
                "parent_merged",
                "\n\n".join(child.raw_content for child in children),
                parent_id,
            )
        )

    # 3. parent_merged_inferred (v0.1) — no parent match, use child evidence
    for parent_id, children in parent_to_children.items():
        if v4_lookup_fn(parent_id) is not None:
            continue  # already handled by step 2
        if len(children) < 2:
            continue

        # Only children that have a V4 match count as evidence.
        looked_up = ((child, v4_lookup_fn(child.section_id)) for child in children)
        evidence = [(child, m) for child, m in looked_up if m is not None]
        if len(evidence) < 2:
            continue  # at least two pieces of child evidence are required

        # Representative = the child match with the highest confidence
        # (simple v0.1.1 rule; top-k convergence, template family agreement,
        # cardinality_fit etc. are future axes).
        _, rep_match = max(evidence, key=lambda pair: pair[1].confidence)

        # auto_selectable defaults to True (strong inferred merge). Any of the
        # following weak signals turns it off and is recorded in filter_reasons:
        #   W1: representative status is not auto-renderable
        #   W2: every child is labeled 'reject'
        #   W3: non-auto-renderable child labels are the majority (>50%)
        # Informational notes (no effect on auto_selectable):
        #   N1: children disagree on their rank-1 template_id
        #   N2: some (not a majority of) children are non-auto-renderable
        rep_status = v4_label_to_status.get(rep_match.label, "unknown")
        child_labels = [m.label for _, m in evidence]
        template_ids = sorted({m.template_id for _, m in evidence})
        n_children = len(evidence)
        non_auto = [
            label for label in child_labels
            if v4_label_to_status.get(label) not in auto_renderable_statuses
        ]
        n_not_auto = len(non_auto)
        non_auto_unique = sorted(set(non_auto))

        filter_reasons: list[str] = []
        notes: list[str] = []

        if rep_status not in auto_renderable_statuses:
            filter_reasons.append(
                f"W1: representative status '{rep_status}' (label={rep_match.label}) "
                f"not in auto_renderable_statuses={sorted(auto_renderable_statuses)}."
            )
        if all(label == "reject" for label in child_labels):
            filter_reasons.append(
                "W2: all children labeled 'reject' — merge has no fit basis."
            )
        if n_children > 0 and n_not_auto * 2 > n_children:
            filter_reasons.append(
                f"W3: majority of children ({n_not_auto}/{n_children}) have "
                f"non-auto-renderable labels {non_auto_unique}."
            )

        if len(template_ids) > 1:
            notes.append(
                f"N1: children's rank-1 template_id differs ({template_ids}). "
                f"representative='{rep_match.template_id}' (highest child confidence). "
                f"top-k / family compatibility 평가는 future axis."
            )
        if 0 < n_not_auto <= n_children // 2:
            # NOTE(review): this text is emitted even when W1 fired above, in
            # which case "representative is auto-renderable" does not hold.
            notes.append(
                f"N2: minority ({n_not_auto}/{n_children}) of children non-auto-renderable "
                f"({non_auto_unique}). representative is auto-renderable, merge proceeds."
            )

        candidates.append(
            _build(
                rep_match,
                [child.section_id for child, _ in evidence],
                "parent_merged_inferred",
                "\n\n".join(child.raw_content for child, _ in evidence),
                parent_id,
                auto_selectable=not filter_reasons,
                filter_reasons=filter_reasons,
                notes=notes,
            )
        )

    return candidates
# ─── Scoring ───────────────────────────────────────────────────
# v0 label weights — V4 label → score multiplier.
# 향후 axes 추가 (cardinality_fit / hierarchy_coherence / density) 시 확장.
# V4 label -> multiplier applied to the match confidence in score_candidate().
# Labels missing from this table score 0.0 there.
V0_LABEL_WEIGHT = {
    "use_as_is": 1.0,
    "light_edit": 0.7,
    "restructure": 0.4,
    "reject": 0.0,
}
def score_candidate(c: CompositionUnit) -> CompositionUnit:
    """Score a candidate in place (v0: confidence × label weight) and return it.

    The label weight comes from V0_LABEL_WEIGHT (0.0 for unknown labels).
    ``c.score`` is overwritten and the breakdown is merged into
    ``c.rationale`` without dropping existing keys (e.g. a previously stored
    capacity_fit).

    Axes planned for later (only placeholders are written for now):
        - cardinality_fit     : item_count vs. frame ideal/min/max
        - hierarchy_coherence : suitability of the merge_type
        - density_score       : content density vs. zone size
    """
    weight = V0_LABEL_WEIGHT.get(c.label, 0.0)
    compat = c.confidence * weight
    c.score = compat

    breakdown = {
        "frame_compat": round(compat, 4),
        "confidence": c.confidence,
        "label": c.label,
        "label_weight": weight,
        "merge_type": c.merge_type,
        # placeholders for future axes
        "hierarchy_coherence": None,
        "density_score": None,
    }
    # update() keeps whatever other keys the rationale already holds.
    c.rationale.update(breakdown)
    return c
# ─── Selection ─────────────────────────────────────────────────
def select_composition_units(candidates, allowed_statuses: set[str]) -> list[CompositionUnit]:
    """Greedy, non-overlapping selection by score with a coverage tiebreak.

    Steps:
        1. score every candidate
        2. keep candidates whose phase_z_status is in ``allowed_statuses`` and
           whose auto_selectable flag is True
        3. order by (score desc, number of source sections desc) — on equal
           score the candidate covering more sections comes first
        4. walk the ordered list and take each candidate that shares no
           section with those already taken

    Candidates with auto_selectable=False are never chosen here; they remain
    visible only through whatever debug output the caller builds.
    """
    viable = []
    for candidate in candidates:
        scored = score_candidate(candidate)
        if scored.auto_selectable and scored.phase_z_status in allowed_statuses:
            viable.append(scored)

    ranked = sorted(
        viable,
        key=lambda u: (u.score, len(u.source_section_ids)),
        reverse=True,
    )

    selected = []
    covered = set()
    for unit in ranked:
        # Skip anything overlapping a section that is already covered.
        if not covered.isdisjoint(unit.source_section_ids):
            continue
        selected.append(unit)
        covered.update(unit.source_section_ids)
    return selected
# ─── Layout Preset Selection ───────────────────────────────────
def select_layout_preset(units: list[CompositionUnit]) -> Optional[str]:
    """Pick a layout preset purely from the number of units (v0).

        0 units -> None
        1 unit  -> single
        2 units -> horizontal-2   (vertical-2 would need an aspect signal)
        3 units -> top-1-bottom-2 (other 3-zone variants would need a
                                   content-weight signal)
        4 units -> grid-2x2

    v0 limitation: no aspect / content-weight signal is considered, so two
    units are always horizontal and three units are always top-1-bottom-2.

    Raises:
        ValueError: for more than four units.
    """
    count = len(units)
    if count == 0:
        return None

    preset_by_count = {
        1: "single",
        2: "horizontal-2",
        3: "top-1-bottom-2",
        4: "grid-2x2",
    }
    preset = preset_by_count.get(count)
    if preset is None:
        raise ValueError(
            f"Composition v0 : layout for {count} units not supported (max 4). "
            "Larger counts require split-into-multiple-slides decision (future)."
        )
    return preset
# ─── Public entry — composition pipeline ───────────────────────
def plan_composition(sections, v4_lookup_fn, v4_label_to_status: dict,
                     allowed_statuses: set[str],
                     capacity_fit_fn=None) -> tuple[list[CompositionUnit], Optional[str], dict]:
    """Composition planner v0.2 entry point.

    v0.2:
        - when ``capacity_fit_fn`` is given, every candidate gets a capacity
          pre-check; a mismatch sets auto_selectable=False with a 'C1: ...'
          filter reason.
    v0.1 / v0.1.1 behavior (kept):
        - parent_merged_inferred candidates are generated even without a
          parent V4 entry
        - no review concept; auto_selectable + filter_reasons alone drive the
          automatic decision
        - selection: score desc with a coverage tiebreak

    Args:
        sections, v4_lookup_fn, v4_label_to_status, capacity_fit_fn: forwarded
            to collect_candidates().
        allowed_statuses: statuses eligible for automatic selection; also
            passed to collect_candidates() as ``auto_renderable_statuses``.

    Returns:
        (units, layout_preset, debug):
            units         : automatically selected composition units
            layout_preset : preset name, or None when nothing was selected
            debug         : all candidates with capacity_fit, filter_reasons
                            and the preset rationale
    """
    candidates = collect_candidates(
        sections, v4_lookup_fn, v4_label_to_status,
        auto_renderable_statuses=allowed_statuses,
        capacity_fit_fn=capacity_fit_fn,
    )
    scored_all = [score_candidate(c) for c in candidates]
    units = select_composition_units(candidates, allowed_statuses)
    preset = select_layout_preset(units)

    # Compare by identity, not equality: CompositionUnit is a dataclass, so
    # `c in units` would use the generated field-by-field __eq__ and report a
    # distinct-but-equal candidate as selected.
    selected_ids = {id(u) for u in units}

    def _candidate_state(c: CompositionUnit) -> str:
        if id(c) in selected_ids:
            return "selected"
        if c.phase_z_status not in allowed_statuses:
            return "filtered_status"  # V4 label -> status not auto-renderable
        if not c.auto_selectable:
            # The filter_reasons prefix separates capacity from weak-merge causes.
            if any(r.startswith("C") for r in c.filter_reasons):
                return "filtered_capacity"  # C1 (capacity mismatch)
            return "filtered_weak"  # W1/W2/W3 (parent_merged_inferred only)
        return "filtered_lost"  # viable, but lost to an overlapping candidate

    candidates_summary = [
        {
            "source_section_ids": c.source_section_ids,
            "merge_type": c.merge_type,
            "template_id": c.frame_template_id,
            "label": c.label,
            "phase_z_status": c.phase_z_status,
            "score": c.score,
            "selection_state": _candidate_state(c),
            "auto_selectable": c.auto_selectable,
            "filter_reasons": list(c.filter_reasons),
            "notes": list(c.notes),
            "capacity_fit": c.rationale.get("capacity_fit"),
        }
        for c in scored_all
    ]
    merge_candidates = [
        s for s in candidates_summary
        if s["merge_type"] in {"parent_merged", "parent_merged_inferred"}
    ]
    capacity_mismatches = [
        s for s in candidates_summary
        if s["selection_state"] == "filtered_capacity"
    ]
    debug = {
        "planner_version": "v0.2",
        "selection_rule": (
            "score desc, then source_section_ids count desc (coverage tiebreak). "
            "filter = phase_z_status ∉ allowed_statuses OR auto_selectable=False. "
            "auto_selectable=False 사유 : C1 (capacity mismatch — silent truncate / FitError 차단), "
            "W1 (rep not auto-renderable), W2 (all children reject), W3 (majority children non-auto-renderable)."
        ),
        "candidates_total": len(scored_all),
        "candidates_viable_auto": len([
            c for c in scored_all
            if c.phase_z_status in allowed_statuses and c.auto_selectable
        ]),
        "candidates_summary": candidates_summary,
        "merge_candidates": merge_candidates,
        "capacity_mismatches": capacity_mismatches,
        "selected_units_count": len(units),
        "layout_preset": preset,
        "layout_preset_rationale": (
            f"v0 count-based: {len(units)} units → {preset}"
            if preset else "no viable units"
        ),
    }
    return units, preset, debug

View File

@@ -0,0 +1,237 @@
"""Phase Z-2 retry_failure_classifier + next_action_router (A4 — 분류 / 매핑만).
A3 (zone_ratio_retry) 의 결과 (retry_trace) 를 받아 :
1. **retry_failure_classifier** : 실패 type 을 4 종 중 하나로 분류
2. **next_action_router** : failure_type → next_proposed_action 매핑
본 module 은 ***분류 + 매핑까지만***. layout_adjust / frame_reselect / details_popup
실행 X. retry_trace 에 `failure_classification` + `next_action_proposal` 두 필드 추가.
**잠근 매핑** (사용자 잠금 — 2026-04-29) :
| failure_type | next_proposed_action |
|---|---|
| donor_slack_insufficient | layout_adjust |
| no_donor_candidates | layout_adjust |
| rerender_still_fails | frame_reselect |
| not_attempted | none |
**escalation 단계 hierarchy** (이번 기본 매핑이 따르는 원칙) :
```
layout_adjust (가장 가벼움 — zone 배치만 변경)
↓ 그래도 안 되면
frame_reselect (중간 — frame 자체 변경)
↓ 그래도 안 되면
details_popup_escalation (가장 invasive — content popup, 마지막 resort)
```
`details_popup_escalation` 은 본 매핑에 *없음* — tabular_overflow / structural_major_overflow /
frame_reselect 실패 이후 단계에서 다룸 (별 step).
"""
from __future__ import annotations
from typing import Optional
# ─── §A4-1 failure_type registry ──────────────────────────────────
# Human-readable description for each failure_type; looked up by
# enrich_retry_trace_with_failure_classification() when it builds the trace.
FAILURE_TYPE_DESCRIPTIONS: dict[str, str] = {
    "not_attempted": (
        "retry was not attempted (router_active=False or zone_ratio_retry "
        "not in proposed actions). 정상 path 의 일부 — 실패 X"
    ),
    "donor_slack_insufficient": (
        "primary donor 의 slack 이 target_added_px 보다 작음. 현재 layout 안 "
        "redistribution 한도 도달"
    ),
    "no_donor_candidates": (
        "donor 후보 자체 없음 — single layout / sibling visual fail / capacity "
        "mismatch / slack 0 등의 이유로 zone redistribution 불가"
    ),
    "rerender_still_fails": (
        "redistribution 실행 + rerender 까지 했는데도 visual_check 실패. "
        "현재 frame/zone 조합이 content 와 맞지 않음"
    ),
}
# ─── §A4-2 next_action mapping (locked by the user) ───────────────
# failure_type -> next proposed action; consulted by route_retry_failure().
NEXT_ACTION_BY_FAILURE: dict[str, str] = {
    "donor_slack_insufficient": "layout_adjust",
    "no_donor_candidates": "layout_adjust",
    "rerender_still_fails": "frame_reselect",
    "not_attempted": "none",
}
# failure_type -> explanation of why the mapped action was chosen (trace only).
NEXT_ACTION_RATIONALE: dict[str, str] = {
    "donor_slack_insufficient": (
        "현재 layout 안 redistribution 끝남 → 다른 layout topology 검토 "
        "(layout_adjust). frame 자체는 아직 의심 대상 X"
    ),
    "no_donor_candidates": (
        "donor 자체 없거나 모두 막힘 → layout topology 부터 재구성하여 "
        "sibling/space 다시 만들어 보는 게 우선 (layout_adjust). frame 변경은 그 다음"
    ),
    "rerender_still_fails": (
        "redistribution + rerender 까지 했는데도 visual fail → 현재 "
        "frame/zone 조합 자체 부적합, V4 top-k 의 다른 frame 평가 (frame_reselect). "
        "popup 직행은 아직 빠름 (tabular / structural_major 가 아닌 한)"
    ),
    "not_attempted": (
        "retry 시도 자체가 없었음 (visual ok 등) — escalation 불필요"
    ),
}
# Implementation status, in the *current code*, of the next actions that the
# mapping above points to. Keyed by action name, not by failure_type.
NEXT_ACTION_IMPLEMENTATION_STATUS: dict[str, str] = {
    "layout_adjust": "MISSING",
    "frame_reselect": "MISSING",
    "none": "n/a",
}
# ─── classifier ──────────────────────────────────────────────────
def classify_retry_failure(retry_trace: dict) -> Optional[dict]:
    """Classify why a zone_ratio_retry attempt did not succeed.

    Args:
        retry_trace: Trace dict of the retry step. Keys read here are
            ``retry_passed``, ``retry_attempted``, ``rerender_attempted`` and
            ``plan`` (of which ``feasible`` and ``failure_reason`` are used).

    Returns:
        ``None`` when ``retry_passed`` is truthy (there is no failure to
        classify); otherwise a dict with ``failure_type`` and
        ``classification_rule`` (the rule that produced the verdict).
    """

    def _verdict(failure_type: str, rule: str) -> dict:
        return {"failure_type": failure_type, "classification_rule": rule}

    # Case 0: the retry succeeded, so nothing failed.
    if retry_trace.get("retry_passed"):
        return None

    # Case 1: the retry never ran.
    if not retry_trace.get("retry_attempted"):
        return _verdict(
            "not_attempted",
            "retry_attempted=False — router_active=False or zone_ratio_retry "
            "not in proposed_actions",
        )

    # Case 2: planning failed, so no rerender happened. The sub-type is
    # recovered from the wording of the plan's failure_reason.
    plan = retry_trace.get("plan") or {}
    planning_failed = bool(plan) and not plan.get("feasible")
    if planning_failed:
        reason = plan.get("failure_reason") or ""
        lowered = reason.lower()
        slack_markers = ("primary donor", "slack", "target_added_px")
        if all(marker in lowered for marker in slack_markers):
            return _verdict(
                "donor_slack_insufficient",
                "plan.feasible=False AND failure_reason matches "
                "'primary donor ... slack ... target_added_px ...'",
            )
        if "no donor candidates" in lowered:
            return _verdict(
                "no_donor_candidates",
                "plan.feasible=False AND failure_reason matches "
                "'no donor candidates'",
            )
        # Unknown wording: conservatively treated as no_donor_candidates and
        # the raw reason is kept in the rule text for later inspection.
        return _verdict(
            "no_donor_candidates",
            f"plan.feasible=False, failure_reason did not match known patterns. "
            f"defaulting to 'no_donor_candidates'. raw failure_reason: {reason!r}",
        )

    # Case 3: a rerender was performed and the retry still did not pass
    # (retry_passed is known to be falsy at this point).
    if retry_trace.get("rerender_attempted"):
        return _verdict(
            "rerender_still_fails",
            "plan.feasible=True AND rerender_attempted=True AND retry_passed=False",
        )

    # Case 4 (defensive): the trace matched none of the expected shapes.
    return _verdict(
        "not_attempted",
        "no failure pattern matched (defensive fallback). retry_trace 구조 "
        "예상과 다름 — 검토 필요",
    )
# ─── router ──────────────────────────────────────────────────────
def route_retry_failure(failure_type: str) -> dict:
    """Map a failure_type to the next proposed escalation action.

    Args:
        failure_type: Key looked up in ``NEXT_ACTION_BY_FAILURE``.

    Returns:
        Dict with ``next_proposed_action``, ``next_action_rationale``,
        ``next_action_implementation_status`` and ``mapping_source``. For a
        failure_type without a mapping the action is ``None`` and the status
        is ``"unknown"``.
    """
    proposed = NEXT_ACTION_BY_FAILURE.get(failure_type)
    if proposed is not None:
        status = NEXT_ACTION_IMPLEMENTATION_STATUS.get(proposed, "unknown")
        return {
            "next_proposed_action": proposed,
            "next_action_rationale": NEXT_ACTION_RATIONALE.get(failure_type, ""),
            "next_action_implementation_status": status,
            "mapping_source": "A4 NEXT_ACTION_BY_FAILURE (사용자 잠금 2026-04-29)",
        }

    # Unmapped failure_type: report it explicitly instead of raising.
    return {
        "next_proposed_action": None,
        "next_action_rationale": (
            f"failure_type '{failure_type}' has no mapping in NEXT_ACTION_BY_FAILURE"
        ),
        "next_action_implementation_status": "unknown",
        "mapping_source": "no mapping (unknown failure_type)",
    }
# ─── enrichment wrapper ──────────────────────────────────────────
def enrich_retry_trace_with_failure_classification(retry_trace: dict) -> dict:
    """Attach ``failure_classification`` and ``next_action_proposal`` to a trace.

    The trace is mutated in place and also returned. When the retry passed
    (the classifier returns ``None``) both fields are set to ``None``.

    Args:
        retry_trace: Retry trace dict; passed to ``classify_retry_failure``.

    Returns:
        The same ``retry_trace`` object with the two fields added.
    """
    classification = classify_retry_failure(retry_trace)

    if classification is not None:
        kind = classification["failure_type"]
        # Both values are computed before the trace is touched.
        proposal = route_retry_failure(kind)
        summary = {
            "failure_type": kind,
            "failure_type_description": FAILURE_TYPE_DESCRIPTIONS.get(kind, ""),
            "classification_rule": classification["classification_rule"],
        }
    else:
        # Retry succeeded: nothing to classify and nothing to escalate.
        summary = None
        proposal = None

    retry_trace["failure_classification"] = summary
    retry_trace["next_action_proposal"] = proposal
    return retry_trace

609
src/phase_z2_mapper.py Normal file
View File

@@ -0,0 +1,609 @@
"""Phase Z-2 contract-based generic mapper (v0).
frame 별 hand-coded mapper 의 대체 — catalog `frame_contracts.yaml` 에 선언된
source_shape / cardinality / role_order / payload builder 를 읽고
MdxSection → slot_payload 변환.
원칙 :
- frame ↔ mapper 의 binding = catalog 가 결정 (Python registry hardcoded X)
- cardinality / role_order / payload 형태 = catalog
- reusable primitive : ITEM_PARSERS / COLUMN_BODY_PARSERS / PAYLOAD_BUILDERS named registry
- cardinality strict 위반 → FitError → fallback path 신호 (AI restructuring 후보)
dispatch 모델 :
contract.payload.builder = named entry of PAYLOAD_BUILDERS
builder 가 (section, units, contract) → slot_payload dict 산출
builder 내부에서 ITEM_PARSERS / COLUMN_BODY_PARSERS 등 sub-primitive 호출
v0 등록 frame :
- F13 (three_parallel_requirements) → builder=items_with_role / item_parser=pillar_item
- F29 (process_product_two_way) → builder=process_product_pair / column body parsers
F16 는 다음 step.
"""
from __future__ import annotations
import re
from pathlib import Path
from typing import Callable
import yaml
# Directory two levels above this file; used as the base for template paths.
PROJECT_ROOT = Path(__file__).parent.parent
# YAML catalog of frame contracts read by load_frame_contracts().
CATALOG_PATH = PROJECT_ROOT / "templates" / "phase_z2" / "catalog" / "frame_contracts.yaml"
class FitError(Exception):
    """Contract violation — signal to move on to the fallback path (AI restructuring).

    Raised for cardinality violations, source_shape mismatches and similar
    cases; the message states the reason for the violation.
    """
# ─── Catalog loading ──────────────────────────────────────────────
# Parsed catalog, filled on the first load_frame_contracts() call.
_CATALOG_CACHE: dict | None = None


def load_frame_contracts() -> dict:
    """Return the parsed frame-contract catalog, reading the YAML file once.

    The file at ``CATALOG_PATH`` is read as UTF-8 and parsed with
    ``yaml.safe_load``; a falsy parse result (e.g. an empty file) is replaced
    by an empty dict. The result is kept in ``_CATALOG_CACHE`` and the same
    object is returned on later calls.
    """
    global _CATALOG_CACHE
    if _CATALOG_CACHE is not None:
        return _CATALOG_CACHE
    raw_yaml = CATALOG_PATH.read_text(encoding="utf-8")
    parsed = yaml.safe_load(raw_yaml)
    _CATALOG_CACHE = parsed if parsed else {}
    return _CATALOG_CACHE
def get_contract(template_id: str) -> dict | None:
    """Return the catalog entry for *template_id*, or ``None`` if there is none."""
    catalog = load_frame_contracts()
    return catalog.get(template_id)
# ─── Source-shape splitters ──────────────────────────────────────
def _split_top_bullets(content: str) -> list[tuple[str, list[str]]]:
    """Group *content* into top-level bullets and the lines nested under them.

    A top-level bullet is a line that starts at column 0 with ``*`` or ``-``
    followed by whitespace. Each later line that starts with a space is
    attached (unmodified) to the most recent top-level bullet. Blank lines,
    and any other line, are ignored.

    Returns:
        ``[(top_line, nested_lines), ...]`` in document order.
    """
    top_bullet = re.compile(r"^[\*\-]\s")
    groups: list[tuple[str, list[str]]] = []
    for raw in content.splitlines():
        if not raw.strip():
            continue
        if top_bullet.match(raw):
            # Open a new group; nested lines are appended to its list below.
            groups.append((raw, []))
        elif groups and raw.startswith(" "):
            groups[-1][1].append(raw)
    return groups
def _split_h3_subsections(content: str) -> list[tuple[str, str]]:
    """Split *content* on ``### N(.N) TITLE`` headings.

    Returns:
        ``[(title, body), ...]`` where ``body`` is the stripped text between
        the end of a heading line and the start of the next heading (or the
        end of *content*). The numeric prefix is not part of ``title``.
    """
    heading_re = re.compile(r"^###\s+(\d+(?:\.\d+)?)\s+(.+?)$", re.MULTILINE)
    headings = list(heading_re.finditer(content))
    # Each body ends where the next heading begins; the last runs to the end.
    body_ends = [h.start() for h in headings[1:]] + [len(content)]
    return [
        (h.group(2).strip(), content[h.end():stop].strip())
        for h, stop in zip(headings, body_ends)
    ]
def split_source(source_shape: str, content: str) -> list:
    """Split *content* with the splitter registered for *source_shape*.

    Supported shapes are ``"top_bullets"`` and ``"h3_subsections"``.

    Raises:
        ValueError: If *source_shape* is neither of the supported shapes.
    """
    splitters = (
        ("top_bullets", _split_top_bullets),
        ("h3_subsections", _split_h3_subsections),
    )
    for shape, splitter in splitters:
        if source_shape == shape:
            return splitter(content)
    raise ValueError(
        f"Contract supports source_shape in (top_bullets, h3_subsections). "
        f"got '{source_shape}'."
    )
# ─── Shared text helpers ──────────────────────────────────────────
def _split_label_for_bar(label: str) -> tuple[str, str]:
    """Split a label into its main text and a trailing parenthesised part.

    ``'기술(디지털)'`` becomes ``('기술', '(디지털)')``. A label without a
    trailing ``(...)`` group yields ``(label.strip(), '')``.
    """
    text = label.strip()
    match = re.match(r"^([^(]+?)\s*(\([^)]+\))\s*$", text)
    if match is None:
        return text, ""
    main, paren = match.groups()
    return main.strip(), paren.strip()
def _extract_bold_or_plain(top_line: str) -> str:
    """Return the first ``**bold**`` span of *top_line*, stripped.

    Without a bold span, the whole line is returned with surrounding
    whitespace and leading ``*`` / ``-`` bullet characters removed.
    """
    match = re.search(r"\*\*(.+?)\*\*", top_line)
    if match is None:
        return top_line.strip().lstrip("*-").strip()
    return match.group(1).strip()
def _text_lines_with_indent(nested_lines: list[str], base_indent: int = 0) -> list[dict]:
    """Convert nested bullet lines into ``{"text", "indent"}`` entries.

    Only lines whose stripped form starts with ``*`` or ``-`` followed by
    whitespace are kept. ``indent`` is the leading-whitespace width relative
    to *base_indent*, floored at 0 and counted in steps of two characters.
    ``**bold**`` spans in the text become ``<strong>...</strong>``.

    Args:
        nested_lines: Raw lines, leading whitespace intact.
        base_indent: Leading-whitespace width that maps to indent level 0.
    """
    bullet_prefix = re.compile(r"^[\*\-]\s+")
    skip_markers = ("<br/>", "<br>", "---")
    entries: list[dict] = []
    for raw in nested_lines:
        stripped = raw.strip()
        if not stripped or stripped in skip_markers:
            continue
        prefix = bullet_prefix.match(stripped)
        if prefix is None:
            continue  # not a bullet line
        leading = len(raw) - len(raw.lstrip())
        level = max(0, max(0, leading - base_indent) // 2)
        body = stripped[prefix.end():]
        body = re.sub(r"\*\*(.+?)\*\*", r"<strong>\1</strong>", body)
        entries.append({"text": body, "indent": level})
    return entries
def _extract_markdown_table(content: str) -> tuple[list[dict] | None, str]:
    """Pull the first Markdown table out of *content* as from/to pairs.

    Intended for 3-column AS-IS / TO-BE tables (from | arrow | to): for each
    data row with at least three cells, column 1 becomes ``from`` and column
    3 becomes ``to``, with ``**`` bold markers removed. The header row and
    the separator row are skipped.

    Returns:
        ``(transforms, remaining)``. ``transforms`` is ``None`` when no table
        is found or no row has three cells. ``remaining`` is *content* with
        the matched table text removed (unchanged when no table is found).
    """
    table_re = re.compile(
        r"(^[ \t]*\|[^\n]+\|\n[ \t]*\|[\s\-:|]+\|\n(?:[ \t]*\|[^\n]+\|\n?)+)",
        re.MULTILINE,
    )
    found = table_re.search(content)
    if found is None:
        return None, content

    def _unbold(cell: str) -> str:
        return re.sub(r"\*\*(.+?)\*\*", r"\1", cell)

    table_lines = [ln.strip() for ln in found.group(1).strip().splitlines() if ln.strip()]
    transforms = []
    # table_lines[0] is the header and table_lines[1] the separator.
    for row in table_lines[2:]:
        cells = [cell.strip() for cell in row.strip("|").split("|")]
        if len(cells) < 3:
            continue
        transforms.append({"from": _unbold(cells[0]), "to": _unbold(cells[2])})

    without_table = content[:found.start()] + content[found.end():]
    return (transforms if transforms else None), without_table
# ─── Item parser primitives (top-bullet 단위) ─────────────────────
def _parse_nested_pillar_sections(nested_lines: list[str]) -> list[dict]:
    """Turn a pillar's nested bullets into ``[{heading, text_lines}, ...]``.

    Only bullet lines (``*`` / ``-`` followed by whitespace after stripping)
    are considered. The first bullet, and every later bullet indented no
    deeper than the current heading, starts a new section; its heading is the
    first ``**bold**`` span, or the bullet text when there is none. Deeper
    bullets become ``{"text", "indent"}`` entries of the current section,
    with ``**bold**`` rewritten to ``<strong>``.
    """
    sections: list[dict] = []
    current: dict | None = None
    heading_indent: int | None = None
    for raw in nested_lines:
        stripped = raw.strip()
        if not stripped or not re.match(r"^[\*\-]\s", stripped):
            continue
        depth = len(raw) - len(raw.lstrip())

        if heading_indent is None or depth <= heading_indent:
            bold = re.search(r"\*\*(.+?)\*\*", stripped)
            if bold:
                heading = bold.group(1).strip()
            else:
                heading = stripped.lstrip("*-").strip()
            current = {"heading": heading, "text_lines": []}
            sections.append(current)
            heading_indent = depth
            continue

        # A bullet one step (2 chars) below the heading is indent level 0.
        level = max(0, (depth - heading_indent - 2) // 2)
        body = re.sub(r"^[\*\-]\s+", "", stripped)
        body = re.sub(r"\*\*(.+?)\*\*", r"<strong>\1</strong>", body)
        current["text_lines"].append({"text": body, "indent": level})
    return sections
def parse_pillar_item(unit: tuple[str, list[str]]) -> dict:
    """Parse one top-bullet unit into an F13 pillar item.

    The label is the bold (or plain) text of the top line; it is also split
    into a main part and a trailing parenthesised part. The nested lines
    become heading/text_lines sections.

    Returns:
        ``{"label", "label_main", "label_paren", "sections"}``.
    """
    top_line, nested_lines = unit
    full_label = _extract_bold_or_plain(top_line)
    main_part, paren_part = _split_label_for_bar(full_label)
    return {
        "label": full_label,
        "label_main": main_part,
        "label_paren": paren_part,
        "sections": _parse_nested_pillar_sections(nested_lines),
    }
def parse_quadrant_item(unit: tuple[str, list[str]]) -> dict:
    """Parse one top-bullet unit into an F16 quadrant item.

    Unlike ``parse_pillar_item``, the nested lines are not split into
    heading + text_lines sections; they are flattened into a single list.
    Indent levels are measured from the shallowest non-blank nested line.

    Returns:
        ``{"label": str, "body": [{"text", "indent"}, ...]}``.
    """
    top_line, nested_lines = unit
    indents = [len(ln) - len(ln.lstrip()) for ln in nested_lines if ln.strip()]
    shallowest = min(indents, default=0)
    return {
        "label": _extract_bold_or_plain(top_line),
        "body": _text_lines_with_indent(nested_lines, base_indent=shallowest),
    }
# Named registry of item parsers; builders look up the parser by the
# ``item_parser`` name given in a contract's builder_options.
ITEM_PARSERS: dict[str, Callable] = {
    "pillar_item": parse_pillar_item,
    "quadrant_item": parse_quadrant_item,
}
# ─── Column body parsers (h3 subsection body 단위) ────────────────
def _parse_column_sections(body: str, transform_first: bool) -> list[dict]:
    """Parse a column body into a list of section dicts.

    Each top-level bullet of *body* becomes one section titled with its bold
    (or plain) text. Normally the section carries ``text_lines``. When
    *transform_first* is true and the first bullet's nested lines contain a
    Markdown table that yields from/to pairs, that first section carries
    ``transforms`` instead (AS-IS / TO-BE).
    """
    parsed: list[dict] = []
    for index, (top_line, nested_lines) in enumerate(_split_top_bullets(body)):
        title = _extract_bold_or_plain(top_line)

        transforms = None
        if transform_first and index == 0:
            transforms, _rest = _extract_markdown_table("\n".join(nested_lines))
        if transforms:
            parsed.append({"title": title, "transforms": transforms})
            continue

        # Indent levels are relative to the shallowest non-blank nested line.
        indents = [len(ln) - len(ln.lstrip()) for ln in nested_lines if ln.strip()]
        shallowest = min(indents, default=0)
        parsed.append({
            "title": title,
            "text_lines": _text_lines_with_indent(nested_lines, base_indent=shallowest),
        })
    return parsed
def parse_column_with_transform(body: str) -> list[dict]:
    """Parse a column whose first bullet may hold an AS-IS/TO-BE table (F29 process column)."""
    sections = _parse_column_sections(body, transform_first=True)
    return sections
def parse_column_plain(body: str) -> list[dict]:
    """Parse a column made only of ordinary text_lines sections (F29 product column)."""
    sections = _parse_column_sections(body, transform_first=False)
    return sections
# Named registry of column body parsers; _build_process_product_pair looks up
# each column's ``body_parser`` name here.
COLUMN_BODY_PARSERS: dict[str, Callable] = {
    "column_with_transform": parse_column_with_transform,
    "column_plain": parse_column_plain,
}
# ─── Payload builders (named registry — top-level dispatch) ───────
def _resolve_title(section, payload_spec: dict, contract: dict) -> dict:
    """Resolve ``payload.title.source`` into a payload fragment.

    v0 supports only ``"section.title"``.

    Args:
        section: Object whose ``title`` attribute is read for ``section.title``.
        payload_spec: The contract's ``payload`` mapping.
        contract: Full contract; ``template_id`` is used in the error message.

    Returns:
        ``{"title": section.title}``, or ``{}`` when no title source is declared.

    Raises:
        ValueError: If a title source other than ``"section.title"`` is declared.
    """
    title_spec = payload_spec.get("title", {}) or {}
    src = title_spec.get("source")
    if src == "section.title":
        return {"title": section.title}
    if src is None:
        return {}
    raise ValueError(
        f"Contract '{contract['template_id']}' has unsupported title source "
        f"'{src}'. v0 supports 'section.title' only."
    )
def _build_items_with_role(section, units, contract) -> dict:
    """F13-style builder: one array item per unit, optionally tagged with a role.

    builder_options read from ``contract["payload"]["builder_options"]``:
        item_parser : key into ITEM_PARSERS (required)
        array_root  : payload key that receives the item list (required)
        role_field  : item key that receives ``role_order[i]`` (optional)

    Roles come from ``contract["role_order"]``; items beyond its length get
    no role.

    Raises:
        ValueError: If ``item_parser`` names no ITEM_PARSERS entry.
    """
    options = contract["payload"]["builder_options"]
    parser_name = options["item_parser"]
    parser = ITEM_PARSERS.get(parser_name)
    if parser is None:
        raise ValueError(
            f"Contract '{contract['template_id']}' references item_parser='{parser_name}' "
            f"but ITEM_PARSERS has no such entry."
        )

    roles = contract.get("role_order", []) or []
    role_field = options.get("role_field")

    items = []
    for index, unit in enumerate(units):
        parsed = parser(unit)
        if role_field and index < len(roles):
            parsed[role_field] = roles[index]
        items.append(parsed)

    # Title (if any) goes first, then the item array.
    payload: dict = dict(_resolve_title(section, contract["payload"], contract))
    payload[options["array_root"]] = items
    return payload
def _build_process_product_pair(section, units, contract) -> dict:
    """F29-style builder: the h3 subsections become named columns.

    builder_options read from ``contract["payload"]["builder_options"]``:
        pad_sections_to : N — forces each column's section list to length N
                          (short lists are padded with empty sections, longer
                          lists are cut to the first N)
        columns         : list of column specs, each with
            title_to    : payload key that receives the subsection title
            body_to     : payload key that receives ``{"sections": [...]}``
            body_parser : key into COLUMN_BODY_PARSERS
            pad_empty   : template for padding sections
                          (optional, default ``{"title": "", "text_lines": []}``)

    Column ``i`` is built from ``units[i]``; units beyond the number of
    columns are not used.

    Raises:
        FitError: If there are fewer units than columns.
        ValueError: If a column's ``body_parser`` names no registry entry.
    """
    import copy  # local import: only the padding path needs it

    options = contract["payload"]["builder_options"]
    pad_to = options.get("pad_sections_to")
    cols = options["columns"]
    if len(units) < len(cols):
        raise FitError(
            f"Contract '{contract['template_id']}' builder process_product_pair needs "
            f"{len(cols)} subsection units, got {len(units)} in section "
            f"'{getattr(section, 'section_id', '?')}'."
        )
    payload: dict = {}
    payload.update(_resolve_title(section, contract["payload"], contract))
    for col, (sub_title, sub_body) in zip(cols, units):
        parser_name = col["body_parser"]
        parser = COLUMN_BODY_PARSERS.get(parser_name)
        if parser is None:
            raise ValueError(
                f"Contract '{contract['template_id']}' references column body_parser="
                f"'{parser_name}' but COLUMN_BODY_PARSERS has no such entry."
            )
        sections_list = parser(sub_body)
        if pad_to is not None:
            empty_template = col.get("pad_empty", {"title": "", "text_lines": []})
            while len(sections_list) < pad_to:
                # Deep copy: a shallow dict() copy would share the nested
                # text_lines list between every padded section and with the
                # contract's own pad_empty template.
                sections_list.append(copy.deepcopy(empty_template))
            sections_list = sections_list[:pad_to]
        payload[col["title_to"]] = sub_title
        payload[col["body_to"]] = {"sections": sections_list}
    return payload
def _build_quadrant_flat_slots(section, units, contract) -> dict:
    """F16-style builder: each unit fills a pair of flat keyed slots.

    The output is neither an array (F13) nor named columns (F29) but flat
    keys such as ``quadrant_1_label`` / ``quadrant_1_body``. Position is
    implied by the 1-based index (per the original note: 1=TL, 2=TR, 3=BL,
    4=BR, decided by the partial template).

    builder_options read from ``contract["payload"]["builder_options"]``:
        item_parser       : key into ITEM_PARSERS; must yield {label, body}
        pad_to            : N slots are always emitted (default 4); missing
                            units are filled with the empty values
        truncate_at       : units after the first M are ignored and counted
                            in ``_truncated_count`` (default = pad_to)
        label_key_pattern : default "quadrant_{n}_label"
        body_key_pattern  : default "quadrant_{n}_body"
        empty_label       : label for padded slots (default "")
        empty_body        : body for padded slots (default [])

    Raises:
        ValueError: If ``item_parser`` names no ITEM_PARSERS entry.
    """
    import copy  # local import: only the padding path needs it

    options = contract["payload"]["builder_options"]
    parser_name = options["item_parser"]
    parser = ITEM_PARSERS.get(parser_name)
    if parser is None:
        raise ValueError(
            f"Contract '{contract['template_id']}' references item_parser='{parser_name}' "
            f"but ITEM_PARSERS has no such entry."
        )
    pad_to = options.get("pad_to", 4)
    truncate_at = options.get("truncate_at", pad_to)
    label_key = options.get("label_key_pattern", "quadrant_{n}_label")
    body_key = options.get("body_key_pattern", "quadrant_{n}_body")
    empty_label = options.get("empty_label", "")
    empty_body = options.get("empty_body", [])

    payload: dict = {}
    payload.update(_resolve_title(section, contract["payload"], contract))

    visible_units = list(units[:truncate_at])
    parsed = [parser(u) for u in visible_units]
    # NOTE(review): only pad_to slots are emitted, so with truncate_at > pad_to
    # the units between the two are parsed but neither emitted nor counted in
    # _truncated_count — confirm contracts never configure that.
    for i in range(pad_to):
        n = i + 1
        if i < len(parsed):
            payload[label_key.format(n=n)] = parsed[i]["label"]
            payload[body_key.format(n=n)] = parsed[i]["body"]
        else:
            payload[label_key.format(n=n)] = empty_label
            # Every padded slot gets its own object (list, dict or nested),
            # so no slot shares state with another or with the contract.
            payload[body_key.format(n=n)] = copy.deepcopy(empty_body)

    if len(units) > truncate_at:
        payload["_truncated_count"] = len(units) - truncate_at
    return payload
# Named registry of payload builders; map_with_contract() dispatches on the
# contract's ``payload.builder`` name.
PAYLOAD_BUILDERS: dict[str, Callable] = {
    "items_with_role": _build_items_with_role,
    "process_product_pair": _build_process_product_pair,
    "quadrant_flat_slots": _build_quadrant_flat_slots,
}
# ─── Generic mapper (single dispatch via builder) ────────────────
def _check_cardinality(contract: dict, units: list, section) -> None:
    """Validate the unit count against the contract's ``cardinality`` rules.

    Checks, in order: ``strict`` (exact count), ``min``, ``max``. Rules that
    are absent (or ``None``) are not checked.

    Args:
        contract: Frame contract; ``cardinality`` is read, and ``template_id``
            / ``source_shape`` are used in error messages.
        units: Units produced by the source splitter.
        section: Source section; its ``section_id`` (if any) is reported.

    Raises:
        FitError: On the first violated rule.
    """
    card = contract.get("cardinality", {}) or {}
    n = len(units)
    strict, mn, mx = card.get("strict"), card.get("min"), card.get("max")

    if strict is not None and n != strict:
        raise FitError(
            f"Contract '{contract['template_id']}' expects strict {strict} units "
            f"(source_shape={contract['source_shape']}), got {n} "
            f"in section '{getattr(section, 'section_id', '?')}'. "
            f"overflow_policy={card.get('overflow_policy', 'abort_or_review')}."
        )

    too_few = mn is not None and n < mn
    if too_few:
        raise FitError(
            f"Contract '{contract['template_id']}' expects min {mn} units, got {n} "
            f"in section '{getattr(section, 'section_id', '?')}'."
        )

    too_many = mx is not None and n > mx
    if too_many:
        raise FitError(
            f"Contract '{contract['template_id']}' expects max {mx} units, got {n} "
            f"in section '{getattr(section, 'section_id', '?')}'."
        )
def compute_capacity_fit(template_id: str, content: str) -> dict:
    """Compare the content's item count with the template contract's capacity.

    This is a pre-render check for the planner: it reports up front whether
    the content would violate the contract or be truncated, so that silent
    truncation or a FitError can be avoided.

    Args:
        template_id: Contract key passed to ``get_contract``.
        content: Raw section content, split according to the contract's
            ``source_shape``.

    Returns:
        Dict with:
            item_count      : number of units after splitting (``None`` when
                              no split was possible)
            source_shape    : the contract's source_shape
            capacity        : ``{strict, min, max, truncate_at, pad_to}``
                              (missing values are ``None``)
            fit_status      : 'ok' / 'strict_mismatch' / 'exceeds_max' /
                              'below_min' / 'exceeds_truncate' /
                              'no_contract' / 'unknown_source_shape'
            mismatch_reason : explanation when fit_status is not 'ok',
                              else ``None``

    Rules, checked in this order (first hit wins):
        1. ``strict`` must match exactly.
        2. ``max`` must not be exceeded.
        3. ``min`` must be reached.
        4. ``truncate_at`` must not be exceeded (the builder would drop
           the excess, i.e. content loss).
        Having fewer items than ``pad_to`` is not a mismatch.
    """

    def _report(count, shape, capacity, status, reason) -> dict:
        return {
            "item_count": count,
            "source_shape": shape,
            "capacity": capacity,
            "fit_status": status,
            "mismatch_reason": reason,
        }

    def _blank_capacity() -> dict:
        return {"strict": None, "min": None, "max": None,
                "truncate_at": None, "pad_to": None}

    contract = get_contract(template_id)
    if contract is None:
        return _report(
            None, None, _blank_capacity(), "no_contract",
            f"no contract for template_id='{template_id}' — capacity check skipped. "
            f"이 candidate 는 catalog-only dispatch 의 ValueError 가 mapper 단계에서 발생할 것.",
        )

    source_shape = contract.get("source_shape")
    try:
        units = split_source(source_shape, content)
    except ValueError:
        return _report(
            None, source_shape, _blank_capacity(), "unknown_source_shape",
            f"source_shape='{source_shape}' is not supported by split_source().",
        )

    item_count = len(units)
    cardinality = contract.get("cardinality") or {}
    builder_options = (contract.get("payload") or {}).get("builder_options") or {}
    strict = cardinality.get("strict")
    mn = cardinality.get("min")
    mx = cardinality.get("max")
    truncate_at = builder_options.get("truncate_at")
    capacity = {
        "strict": strict,
        "min": mn,
        "max": mx,
        "truncate_at": truncate_at,
        "pad_to": builder_options.get("pad_to"),
    }

    if strict is not None and item_count != strict:
        status = "strict_mismatch"
        reason = (
            f"strict cardinality {strict}, content has {item_count} items. "
            f"mapper 가 FitError 를 raise 할 것."
        )
    elif mx is not None and item_count > mx:
        status = "exceeds_max"
        reason = f"max cardinality {mx}, content has {item_count} items."
    elif mn is not None and item_count < mn:
        status = "below_min"
        reason = f"min cardinality {mn}, content has {item_count} items."
    elif truncate_at is not None and item_count > truncate_at:
        status = "exceeds_truncate"
        reason = (
            f"builder truncate_at {truncate_at}, content has {item_count} items "
            f"({item_count - truncate_at} would be silently dropped). "
            f"silent truncate 방지 위해 자동 선택 X."
        )
    else:
        status, reason = "ok", None

    return _report(item_count, source_shape, capacity, status, reason)
def map_with_contract(section, contract: dict) -> dict:
    """Convert a section into a slot payload using the builder named by the contract.

    Steps:
        1. Split ``section.raw_content`` according to ``contract["source_shape"]``.
        2. Check cardinality (a violation raises FitError).
        3. Look up ``contract["payload"]["builder"]`` in PAYLOAD_BUILDERS and
           call it with ``(section, units, contract)``.

    Raises:
        FitError: From the cardinality check (or from the builder).
        ValueError: If the source_shape is unsupported, or ``payload.builder``
            is missing or names no registry entry.
    """
    units = split_source(contract["source_shape"], section.raw_content)
    _check_cardinality(contract, units, section)

    builder_name = contract["payload"].get("builder")
    if not builder_name:
        raise ValueError(
            f"Contract '{contract['template_id']}' missing payload.builder. "
            f"available: {sorted(PAYLOAD_BUILDERS.keys())}"
        )

    builder = PAYLOAD_BUILDERS.get(builder_name)
    if builder is not None:
        return builder(section, units, contract)
    raise ValueError(
        f"Contract '{contract['template_id']}' references payload.builder="
        f"'{builder_name}' but PAYLOAD_BUILDERS has no such entry. "
        f"available: {sorted(PAYLOAD_BUILDERS.keys())}"
    )

1227
src/phase_z2_pipeline.py Normal file

File diff suppressed because it is too large Load Diff

215
src/phase_z2_retry.py Normal file
View File

@@ -0,0 +1,215 @@
"""Phase Z-2 zone_ratio_retry action v0 (A3 — 실제 zone redistribution 구현).
router 가 *제안* 한 zone_ratio_retry action 의 **실행 layer**.
원칙 (A3 locked rules — 사용자 잠금 7+1) :
1. retry budget = 1 — 한 번만 시도
2. slide / slide-body / title / divider / footer / zone gap 모두 고정
(공통 spacing 깎기 금지)
3. 조정 대상 = router 가 지목한 *target zone* 만 height 증가
4. donor 선택 기준 :
- 같은 layout 의 sibling zone
- visual_check 통과 (이 zone 자체엔 overflow 없음)
- capacity_fit 가 ok
- 현재 height > min_height_px (slack > 0)
- donor min_height_px 아래로 줄지 X
- 여러 후보면 slack 가장 큰 것부터 (greedy)
- 부족 시 retry 실패
5. target_added_px = observed excess_y + safety_margin (small fixed)
— donor 가 min_height 아래로 가면 실패
6. retry 후 status :
- 성공 → PASS 가능
- 실패 → RENDERED_WITH_VISUAL_REGRESSION 유지 (CSS/padding/tolerance 보정 X)
7. debug trace 필수 (retry_attempted / target / donor / before/after / passed / reason)
8. revert 정책 ((b)) :
- redistribution check 실패 → rerender 안 함, original final.html 유지
- rerender 후 visual_check 실패 → original 로 revert (final.html 변경 X),
retried_candidate.html 은 *진단 artifact* 로만 별도 보관
- retry 성공 시에만 final.html = retried version
본 module 은 *plan + apply layer*. rerender / final.html 갱신 / revert 는 pipeline 이.
"""
from __future__ import annotations
import math
from typing import Optional
# Small fixed safety margin (experimental default). The value actually used
# is recorded in the plan as ``safety_margin_px_used``.
DEFAULT_SAFETY_MARGIN_PX = 4


def plan_zone_ratio_retry(
    *,
    debug_zones: list[dict],
    overflow: dict,
    fit_classification: dict,
    router_decision: dict,
    safety_margin_px: int = DEFAULT_SAFETY_MARGIN_PX,
) -> Optional[dict]:
    """Compute the redistribution plan for a zone_ratio_retry.

    This only plans; applying heights and rerendering are left to the caller.
    The target is the first classification whose ``proposed_action`` is
    ``"zone_ratio_retry"``. It grows by ``ceil(excess_y) + safety_margin_px``
    pixels, taken from the single donor zone with the most slack.

    Args:
        debug_zones: Zones of one layout. Keys read: ``position``,
            ``height_px``, ``min_height_px`` and
            ``composition_rationale.capacity_fit.fit_status``.
        overflow: Visual check result; ``zones[*]`` entries are read for
            ``position``, ``overflowed`` and ``clipped_inner``.
        fit_classification: Classifier output; ``classifications[*]`` entries
            are read for ``proposed_action``, ``zone_position`` and
            ``inputs.excess_y``.
        router_decision: Router output; only ``router_active`` is read.
        safety_margin_px: Extra pixels added on top of the observed excess.

    Returns:
        ``None`` when no retry is called for (router inactive, or no
        classification proposes zone_ratio_retry). Otherwise a plan dict with
        ``feasible``, ``zones_before``, ``zones_after``, the donor details
        and, when infeasible, ``failure_reason``. The wording of the two
        existing failure reasons is matched by the retry failure classifier
        and must stay stable.
    """
    if not router_decision.get("router_active"):
        return None

    # The first classification for which zone_ratio_retry was proposed.
    target_cls = None
    for cls in fit_classification.get("classifications", []) or []:
        if cls.get("proposed_action") == "zone_ratio_retry":
            target_cls = cls
            break
    if target_cls is None:
        return None  # another action (popup / reselect) — not for this retry

    target_zone_position = target_cls.get("zone_position")
    # A missing or None ``inputs`` / ``excess_y`` counts as 0 excess.
    target_excess_y = float((target_cls.get("inputs") or {}).get("excess_y") or 0)
    # Round up to whole pixels: a sub-pixel remainder could still overflow.
    target_added_px = int(math.ceil(target_excess_y)) + int(safety_margin_px)

    # Current and minimum heights of every zone that reports both.
    zones_before: dict[str, int] = {}
    zone_min_by_pos: dict[str, int] = {}
    for dz in debug_zones:
        pos = dz.get("position")
        if pos is None:
            continue
        h = dz.get("height_px")
        m = dz.get("min_height_px")
        if h is None or m is None:
            continue
        zones_before[pos] = int(h)
        zone_min_by_pos[pos] = int(m)

    # Visual-check status per zone position.
    overflow_zone_status: dict[str, dict] = {}
    for z in overflow.get("zones", []) or []:
        overflow_zone_status[z.get("position")] = z

    # Donor candidates (rule 4). Being in the same zones list already makes a
    # zone a sibling, because this function receives one layout's zones only.
    donor_candidates: list[dict] = []
    for dz in debug_zones:
        pos = dz.get("position")
        if pos is None or pos == target_zone_position:
            continue
        # Rule 4-(b): the donor itself must have passed the visual check.
        zinfo = overflow_zone_status.get(pos, {})
        if bool(zinfo.get("overflowed")) or bool(zinfo.get("clipped_inner")):
            continue
        # Rule 4-(c): capacity_fit must be 'ok'. 'no_contract' and a missing
        # status (None) are also accepted; any other status is excluded.
        rationale = dz.get("composition_rationale") or {}
        cap_status = (rationale.get("capacity_fit") or {}).get("fit_status")
        if cap_status not in {"ok", "no_contract", None}:
            continue
        # Rule 4-(d): the donor must have slack above its minimum height.
        height = zones_before.get(pos)
        min_h = zone_min_by_pos.get(pos)
        if height is None or min_h is None:
            continue
        slack = height - min_h
        if slack <= 0:
            continue
        donor_candidates.append({
            "position": pos,
            "current_height": height,
            "min_height": min_h,
            "slack": slack,
            "capacity_fit_status": cap_status,
        })
    # Rule 4-(f): largest slack first.
    donor_candidates.sort(key=lambda d: d["slack"], reverse=True)

    # Fields shared by the success plan and every failure plan.
    base_plan = {
        "target_zone_position": target_zone_position,
        "target_excess_y": target_excess_y,
        "target_added_px": target_added_px,
        "safety_margin_px_used": int(safety_margin_px),
        "donor_candidates_considered": donor_candidates,
        "zones_before": dict(zones_before),
    }

    if not donor_candidates:
        return {
            **base_plan,
            "feasible": False,
            "donor_zone_position": None,
            "donor_reduced_px": 0,
            "zones_after": dict(zones_before),
            "failure_reason": (
                f"no donor candidates eligible (sibling visual_check OK + "
                f"capacity_fit ok/no_contract + slack > 0)"
            ),
        }

    # A3 minimal: a single primary donor (multi-donor is future work).
    primary_donor = donor_candidates[0]
    if primary_donor["slack"] < target_added_px:
        return {
            **base_plan,
            "feasible": False,
            "donor_zone_position": primary_donor["position"],
            "donor_max_slack": primary_donor["slack"],
            "donor_reduced_px": 0,
            "zones_after": dict(zones_before),
            "failure_reason": (
                f"primary donor '{primary_donor['position']}' slack {primary_donor['slack']}px "
                f"< target_added_px {target_added_px}px (excess_y {target_excess_y} + "
                f"safety_margin {safety_margin_px}). multi-donor aggregation is future axis."
            ),
        }

    # The target's current height is needed to grow it. If the target zone is
    # unknown or reported no height_px / min_height_px, report an infeasible
    # plan instead of failing with a KeyError.
    if target_zone_position not in zones_before:
        return {
            **base_plan,
            "feasible": False,
            "donor_zone_position": primary_donor["position"],
            "donor_reduced_px": 0,
            "zones_after": dict(zones_before),
            "failure_reason": (
                f"target zone '{target_zone_position}' has no height_px / "
                f"min_height_px in debug_zones — cannot redistribute."
            ),
        }

    # Feasible: move target_added_px from the donor to the target.
    zones_after = dict(zones_before)
    zones_after[target_zone_position] = zones_before[target_zone_position] + target_added_px
    zones_after[primary_donor["position"]] = (
        zones_before[primary_donor["position"]] - target_added_px
    )
    return {
        **base_plan,
        "feasible": True,
        "donor_zone_position": primary_donor["position"],
        "donor_reduced_px": target_added_px,
        "zones_after": zones_after,
    }
def apply_retry_to_layout_css(layout_css: dict, plan: dict, zones_data: list[dict],
                              total_height: int, gap_px: int) -> dict:
    """Return a new layout_css that reflects the retry plan's ``zones_after``.

    The input ``layout_css`` is not mutated (its ``raw_zone_layout`` is
    copied before being flagged). Per the original note this applies to
    dynamic-row layouts such as horizontal-2; fr-default layouts are not
    retry targets.

    Args:
        layout_css: Current layout CSS description.
        plan: Retry plan; ``plan["zones_after"]`` maps position -> height px.
        zones_data: Zones in layout order; each ``position`` selects a height.
        total_height: Denominator for the per-zone ratios.
        gap_px: Not used by this function.

    Returns:
        A copy of ``layout_css`` with ``heights_px``, ``rows``, ``ratios``,
        ``computation``, ``dynamic_rows`` and ``raw_zone_layout`` overridden.
    """
    after = plan["zones_after"]
    heights = [after[zone["position"]] for zone in zones_data]
    row_template = " ".join(f"{px}px" for px in heights)
    ratios = [round(px / total_height, 3) for px in heights]

    # Copy before flagging so the caller's raw_zone_layout stays untouched.
    raw_layout = (layout_css.get("raw_zone_layout") or {}).copy()
    raw_layout["retry_applied"] = True

    return {
        **layout_css,
        "heights_px": heights,
        "rows": row_template,
        "ratios": ratios,
        "computation": "zone_ratio_retry override (A3)",
        "dynamic_rows": True,
        "raw_zone_layout": raw_layout,
    }

181
src/phase_z2_router.py Normal file
View File

@@ -0,0 +1,181 @@
"""Phase Z-2 overflow_router v0 (A2 — 정책 매핑 layer 만).
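Maps the fit_classifier output (category) to the *proposed_action* of spec §4.

This module does ***mapping only***. Actual action execution is a separate
step (A3+) that lives outside this module.

Output = proposed_action added to each classification + an overall router summary.

Principles:
- classifier = classifies facts (decides the category)
- router = decides policy (what to *propose* for that category)
- this stage only produces a *proposal trace*; the router itself does not
  change pipeline behavior / abort policy / rerender
- the router executes nothing; debug.json records *which action was proposed*.
  Actions marked MISSING in ACTION_IMPLEMENTATION_STATUS still end in the
  existing visual_check_passed=False → sys.exit(1) flow.

Follow-up step (separate — A3):
    the *actual implementation* of the zone_ratio_retry action — the most
    frequently triggered action in the spec §4 mapping. It is now marked
    IMPLEMENTED in ACTION_IMPLEMENTATION_STATUS and is run by the pipeline
    orchestrator, not by this module.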
"""
from __future__ import annotations
from typing import Optional
# ─── §4 mapping table (spec PHASE-Z-FIT-CLASSIFIER-ROUTER-SPEC §4) ──
# category → proposed_action (primary)
# Keys are the category names route_action() receives; values are action names
# that route_action() then looks up in ACTION_IMPLEMENTATION_STATUS.
ACTION_BY_CATEGORY: dict[str, str] = {
"minor_overflow": "zone_ratio_retry",
"moderate_overflow": "layout_adjust",
"structural_minor_overflow": "zone_ratio_retry",
"structural_major_overflow": "details_popup_escalation",
"tabular_overflow": "details_popup_escalation",
"frame_capacity_mismatch": "frame_reselect",
"layout_zone_mismatch": "layout_adjust",
"hard_visual_fail": "abort",
}
# Mapping rationale — *why* a given category leads to its action; used for trace output.
# The values are runtime strings (emitted as-is into the routing result), so they
# stay in their original language; the comment above each key is an English gloss.
ACTION_RATIONALE: dict[str, str] = {
# under 1.5 lines of text/label flow → can fit by yielding zone space / recomputing spacing
"minor_overflow":
"1.5 줄 미만 text/label flow → zone 양보 / spacing 재계산으로 fit 가능",
# 1.5–4 lines of text/label → layout/zone ratio redistribution needed
"moderate_overflow":
"1.5~4 줄 text/label → layout/zone ratio 재분배 필요",
# structural unit boundary spill (<1 unit drop) → fit by yielding zone space, never cut a unit
"structural_minor_overflow":
"structural unit boundary spill (<1 unit drop) → zone 양보로 fit, 단위 자르기 X",
# 1+ structural unit completely clipped → meaning is lost, escalate to a popup
"structural_major_overflow":
"1+ structural unit 완전 잘림 → 의미 손실, popup 으로 escalate",
# a table loses meaning when clipped by rows → popup escalate (or table-friendly frame reselect)
"tabular_overflow":
"표는 행 단위로 잘리면 의미 손실 → popup escalate (또는 table-friendly frame reselect)",
# composition capacity_fit already signals a mismatch → evaluate another frame from the V4 top-k
"frame_capacity_mismatch":
"composition capacity_fit 가 이미 mismatch 신호 → V4 top-k 의 다른 frame 평가",
# the frame root itself overflows → change the layout preset or enlarge the zone
"layout_zone_mismatch":
"frame root 자체 overflow → layout preset 변경 또는 zone 키움",
# none of the mappings above apply — last fallback (current code aborts via sys.exit)
"hard_visual_fail":
"위 매핑 모두 미적용 — 마지막 fallback (현재 코드는 sys.exit 으로 abort)",
}
# Implementation status of each action in the *current code* (as of 2026-04-29).
# Kept so the A2 stage can trace how far this mapping is handled automatically
# and where it gets blocked.
ACTION_IMPLEMENTATION_STATUS: dict[str, str] = {
"zone_ratio_retry": "IMPLEMENTED", # A3 (2026-04-29) phase_z2_retry.plan_zone_ratio_retry + pipeline orchestration
"layout_adjust": "MISSING",
"details_popup_escalation": "MISSING", # the <details> principle exists in CLAUDE.md; runtime not implemented
"frame_reselect": "MISSING", # V4 top-k data exists; the planner only uses rank-1
"adapter_needed": "PARTIAL", # mapper FitError catch in composition v0.1.1
"abort": "IMPLEMENTED", # sys.exit(1) — the pipeline's current default
}
# ─── single classification → routing result ──────────────────────
def route_action(category: str) -> dict:
    """Look up the proposed action for one classifier category.

    Args:
        category: Category name produced by the fit classifier.

    Returns:
        dict with the keys:
            proposed_action       : action name, or None when the category is unmapped
            rationale             : *why* this action is proposed
            implementation_status : value from ACTION_IMPLEMENTATION_STATUS
                                    ("IMPLEMENTED" / "PARTIAL" / "MISSING"),
                                    or the lowercase fallback "unknown"
            mapping_source        : "spec §4 ACTION_BY_CATEGORY", or a
                                    "no mapping" marker for unknown categories
    """
    action = ACTION_BY_CATEGORY.get(category)

    if action is not None:
        rationale = ACTION_RATIONALE.get(category, "")
        status = ACTION_IMPLEMENTATION_STATUS.get(action, "unknown")
        return dict(
            proposed_action=action,
            rationale=rationale,
            implementation_status=status,
            mapping_source="spec §4 ACTION_BY_CATEGORY",
        )

    # Unknown category: still return the full shape so callers can index freely.
    return dict(
        proposed_action=None,
        rationale=f"category '{category}' has no mapping in ACTION_BY_CATEGORY",
        implementation_status="unknown",
        mapping_source="no mapping (unknown category)",
    )
# ─── whole fit_classification → router decision ─────────────────
def route_fit_classification(fit_classification: dict) -> dict:
    """Attach a proposed action to every classification and summarise the routing.

    Each entry of ``fit_classification["classifications"]`` is updated *in
    place* with these extra fields (existing fields are kept):
        - proposed_action
        - proposed_action_rationale
        - proposed_action_implementation_status
        - proposed_action_mapping_source

    Args:
        fit_classification: Classifier output. ``visual_check_passed``
            defaults to True when absent; ``classifications`` may be missing
            or None and is then treated as empty.

    Returns:
        Router decision summary dict:
            router_active                 : True only when visual_check_passed is falsy
            proposed_actions_summary      : sorted list of the unique proposed actions
            implementation_status_summary : {status: count}
            routed_count                  : number of classifications processed
            routed_details                : per-classification routing trace
            missing_actions_pending_impl  : sorted actions whose status is "MISSING"
            note                          : explanatory text for the reader
    """
    if fit_classification.get("visual_check_passed", True):
        # Nothing overflowed, so there is nothing to route.
        return {
            "router_active": False,
            "proposed_actions_summary": [],
            "implementation_status_summary": {},
            "routed_count": 0,
            "routed_details": [],
            "missing_actions_pending_impl": [],
            "note": "visual check passed — no overflow to route",
        }

    trace = []
    for entry in fit_classification.get("classifications", []) or []:
        category = entry.get("category", "hard_visual_fail")
        decision = route_action(category)
        action = decision["proposed_action"]
        status = decision["implementation_status"]

        # Enrich the classification entry itself; existing fields are kept.
        entry.update({
            "proposed_action": action,
            "proposed_action_rationale": decision["rationale"],
            "proposed_action_implementation_status": status,
            "proposed_action_mapping_source": decision["mapping_source"],
        })
        trace.append({
            "source": entry.get("source"),
            "zone_position": entry.get("zone_position"),
            "category": category,
            "proposed_action": action,
            "implementation_status": status,
        })

    # Summary: unique non-None actions, status counts, and MISSING actions.
    unique_actions = {item["proposed_action"] for item in trace} - {None}
    status_counts = {}
    pending = set()
    for item in trace:
        status = item["implementation_status"]
        status_counts.setdefault(status, 0)
        status_counts[status] += 1
        if status == "MISSING":
            pending.add(item["proposed_action"])

    note = (
        "router 는 category → proposed_action 매핑까지 담당. 실제 action 실행은 "
        "pipeline 의 별도 orchestrator 가 처리 (예: zone_ratio_retry 는 "
        "_attempt_zone_ratio_retry 에서 실행). proposed_action 의 implementation_status "
        "가 IMPLEMENTED 이면 pipeline 이 시도하고 결과는 retry_trace 에 기록, "
        "MISSING 이면 그 action 은 실행 X 이고 기존 abort/status 흐름 (sys.exit(1)) 으로 종료."
    )
    return {
        "router_active": True,
        "proposed_actions_summary": sorted(unique_actions),
        "implementation_status_summary": status_counts,
        "routed_count": len(trace),
        "routed_details": trace,
        "missing_actions_pending_impl": sorted(pending),
        "note": note,
    }