Files
C.E.L_Slide_test2/src/phase_z2_mapper.py
kyeongmin 73a98b8ad1 IMP-04 F17 schema correction — paired_rows_4x2 + pill alternation + source-faithful theme
source = 8 atomic issues (4 paired rows × 2 cells per texts.md), 이전 strict-4
가 source 의 절반 누락. round 55~73 review-loop 의 calibration frame.

- contract : source_shape=top_bullets / layout_variant=paired_rows_4x2_alternating_pills
  / strict 8 (no pad/truncate) / role_order row_{1..4}_{left,right} / visual_hints
  pill_positions + row_gap_after / builder paired_rows_4x2_slots
- builder : new _build_paired_rows_4x2_slots — 2-axis (row × side) deterministic
  index mapping, strict 8 raises before render, quadrant_item parser 재사용
- partial : 4-row × 2-cell flex, pill alternation (row 1/3 top, row 2/4 bottom
  via column-reverse), row 2-3 visual gap, source-faithful color (rgb(204,82,0)
  →rgb(136,55,0) title + #60A451 row border + rgba(250,237,203,0.15) bg + #0c271e
  body + 2px dashed #60A451 cell 분할선), pill = CSS approximation (asset crop
  variant single-pass 비용 高 → fallback per Codex round 62/68 scope cap, pill
  shape + alternation + green/cream/brown theme 보존), no row headers (source
  부재, inference 금지)
- fixture : flat 8 top-bullet (texts.md 8 issues 그대로)
- smoke + R3 : PASS (11/11 self-check, 5535 chars partial, 8 units rendered,
  pill alternation 정합, row 2-3 gap, no invented row headers)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 15:13:46 +09:00

819 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Phase Z-2 contract-based generic mapper (v0).

Replaces per-frame hand-coded mappers: reads the source_shape / cardinality /
role_order / payload builder declared in the catalog `frame_contracts.yaml`
and converts an MdxSection into a slot_payload.

Principles:
- The frame <-> mapper binding is decided by the catalog (no hard-coded
  Python registry of frames).
- Cardinality / role_order / payload shape come from the catalog.
- Reusable primitives live in named registries: ITEM_PARSERS /
  COLUMN_BODY_PARSERS / PAYLOAD_BUILDERS.
- A strict cardinality violation raises FitError, which signals the fallback
  path (candidate for AI restructuring).

Dispatch model:
    contract.payload.builder = a named entry of PAYLOAD_BUILDERS
    the builder maps (section, units, contract) -> slot_payload dict
    inside the builder, sub-primitives such as ITEM_PARSERS /
    COLUMN_BODY_PARSERS are called

Frames registered in v0:
- F13 (three_parallel_requirements) -> builder=items_with_role / item_parser=pillar_item
- F29 (process_product_two_way) -> builder=process_product_pair / column body parsers
F16 is the next step.

NOTE(review): this list looks stale — the module also defines builders whose
docstrings describe F16-, F12-, F18- and F17-style frames. Confirm against the
catalog before relying on it.
"""
from __future__ import annotations
import re
from pathlib import Path
from typing import Callable
import yaml
# Directory one level above the directory that contains this file
# (presumably the project root — confirm against the repository layout).
PROJECT_ROOT = Path(__file__).parent.parent
# Path of the frame-contract catalog that load_frame_contracts() reads.
CATALOG_PATH = PROJECT_ROOT / "templates" / "phase_z2" / "catalog" / "frame_contracts.yaml"
class FitError(Exception):
    """Contract violation: the content does not fit the frame contract.

    Signals that processing should move on to the fallback path
    (AI restructuring). Raised for cardinality violations, source_shape
    mismatches and similar; the message states the reason for the violation.
    """
# ─── Catalog loading ──────────────────────────────────────────────
# Parsed catalog; stays None until load_frame_contracts() is first called.
_CATALOG_CACHE: dict | None = None


def load_frame_contracts() -> dict:
    """Return the parsed frame-contract catalog, reading it at most once.

    The first call reads CATALOG_PATH as UTF-8, parses it with
    ``yaml.safe_load`` and stores the result in the module-level cache; a
    falsy parse result (e.g. an empty file) is stored as ``{}``. Later calls
    return the cached object itself, not a copy.
    """
    global _CATALOG_CACHE
    if _CATALOG_CACHE is not None:
        return _CATALOG_CACHE
    catalog_text = CATALOG_PATH.read_text(encoding="utf-8")
    _CATALOG_CACHE = yaml.safe_load(catalog_text) or {}
    return _CATALOG_CACHE
def get_contract(template_id: str) -> dict | None:
    """Look up one contract by template id; None when the catalog has no such key."""
    catalog = load_frame_contracts()
    return catalog.get(template_id)
# ─── Source-shape splitters ──────────────────────────────────────
def _split_top_bullets(content: str) -> list[tuple[str, list[str]]]:
    """Group content into top-level bullets and the lines nested under each.

    A top-level bullet is a line that starts at column 0 with ``*`` or ``-``
    followed by whitespace. Lines that start with a space are attached to the
    most recent top-level bullet. Blank lines, and any other line, are dropped.

    Returns:
        ``[(top_line, nested_lines), ...]`` in document order, lines unmodified.
    """
    bullet_groups = []
    head = None
    children: list[str] = []
    for raw in content.splitlines():
        if raw.strip() == "":
            continue
        if re.match(r"^[\*\-]\s", raw) is not None:
            # A new top-level bullet closes the group that was being collected.
            if head is not None:
                bullet_groups.append((head, children))
            head = raw
            children = []
            continue
        if head is not None and raw.startswith(" "):
            children.append(raw)
    if head is not None:
        bullet_groups.append((head, children))
    return bullet_groups
def _split_h3_subsections(content: str) -> list[tuple[str, str]]:
    """Split content on ``### N(.N) TITLE`` headings.

    Returns:
        ``[(title, body), ...]`` where ``title`` is the heading text without
        its number and ``body`` is the stripped text between that heading and
        the next matching heading (or the end of the content). Text before
        the first heading is not returned.
    """
    heading_re = re.compile(r"^###\s+(\d+(?:\.\d+)?)\s+(.+?)$", re.MULTILINE)
    hits = list(heading_re.finditer(content))
    # Each body ends where the following heading starts; the last one runs to EOF.
    stops = [hit.start() for hit in hits[1:]] + [len(content)]
    return [
        (hit.group(2).strip(), content[hit.end():stop].strip())
        for hit, stop in zip(hits, stops)
    ]
def split_source(source_shape: str, content: str) -> list:
    """Split ``content`` into units using the splitter named by ``source_shape``.

    Raises:
        ValueError: ``source_shape`` is neither ``top_bullets`` nor
            ``h3_subsections``.
    """
    if source_shape == "h3_subsections":
        return _split_h3_subsections(content)
    if source_shape != "top_bullets":
        raise ValueError(
            f"Contract supports source_shape in (top_bullets, h3_subsections). "
            f"got '{source_shape}'."
        )
    return _split_top_bullets(content)
# ─── Shared text helpers ──────────────────────────────────────────
def _split_label_for_bar(label: str) -> tuple[str, str]:
    """Split a label with a trailing parenthesised part into two pieces.

    ``'기술(디지털)'`` becomes ``('기술', '(디지털)')``. A label without a
    trailing parenthesised part yields ``(stripped_label, '')``.
    """
    cleaned = label.strip()
    parts = re.match(r"^([^(]+?)\s*(\([^)]+\))\s*$", cleaned)
    if parts is None:
        return cleaned, ""
    main_part, paren_part = parts.groups()
    return main_part.strip(), paren_part.strip()
def _extract_bold_or_plain(top_line: str) -> str:
    """Return the first ``**bold**`` span of the line, else the plain text.

    Without a bold span, the line is stripped, its leading ``*`` / ``-``
    characters are removed, and it is stripped again.
    """
    bold_span = re.search(r"\*\*(.+?)\*\*", top_line)
    if bold_span is None:
        return top_line.strip().lstrip("*-").strip()
    return bold_span.group(1).strip()
def _text_lines_with_indent(nested_lines: list[str], base_indent: int = 0) -> list[dict]:
    """Convert nested bullet lines into ``{"text", "indent"}`` dicts.

    Only lines whose stripped form starts with ``*`` or ``-`` plus whitespace
    are kept; blank lines and the literals ``<br/>``, ``<br>``, ``---`` are
    skipped. ``indent`` is the leading-whitespace width beyond
    ``base_indent`` (floored at 0), integer-divided by 2. ``**bold**`` spans
    become ``<strong>`` tags.
    """
    ignored = ("<br/>", "<br>", "---")
    converted = []
    for raw in nested_lines:
        stripped = raw.strip()
        if not stripped or stripped in ignored:
            continue
        if re.match(r"^[\*\-]\s", stripped) is None:
            continue
        leading = len(raw) - len(raw.lstrip())
        level = max(0, leading - base_indent) // 2
        without_marker = re.sub(r"^[\*\-]\s+", "", stripped)
        converted.append({
            "text": re.sub(r"\*\*(.+?)\*\*", r"<strong>\1</strong>", without_marker),
            "indent": level,
        })
    return converted
def _extract_markdown_table(content: str) -> tuple[list[dict] | None, str]:
"""Markdown 표 → [{from, to}] (column 1 = from, column 3 = to).
AS-IS / TO-BE 형식의 3-column 표 (from | arrow | to) 를 transforms 로 변환.
Returns (transforms_or_None, content_without_table).
"""
pattern = re.compile(
r"(^[ \t]*\|[^\n]+\|\n[ \t]*\|[\s\-:|]+\|\n(?:[ \t]*\|[^\n]+\|\n?)+)",
re.MULTILINE,
)
m = pattern.search(content)
if not m:
return None, content
rows = [r.strip() for r in m.group(1).strip().splitlines() if r.strip()]
transforms = []
for r in rows[2:]:
cells = [c.strip() for c in r.strip("|").split("|")]
if len(cells) >= 3:
f = re.sub(r"\*\*(.+?)\*\*", r"\1", cells[0])
t = re.sub(r"\*\*(.+?)\*\*", r"\1", cells[2])
transforms.append({"from": f, "to": t})
remaining = content[:m.start()] + content[m.end():]
return (transforms or None), remaining
# ─── Item parser primitives (top-bullet 단위) ─────────────────────
def _parse_nested_pillar_sections(nested_lines: list[str]) -> list[dict]:
    """Turn a pillar's nested bullets into ``[{heading, text_lines}, ...]``.

    The first bullet, and every later bullet indented no deeper than the
    current heading, starts a new section; its heading is the first
    ``**bold**`` span or, failing that, the bullet text. Deeper bullets
    become ``{"text", "indent"}`` entries of the current section, with
    ``indent = max(0, (relative_indent - 2) // 2)``. Blank and non-bullet
    lines are ignored.
    """
    sections: list[dict] = []
    current: dict | None = None
    heading_indent: int | None = None
    for raw in nested_lines:
        stripped = raw.strip()
        if not stripped or re.match(r"^[\*\-]\s", stripped) is None:
            continue
        depth = len(raw) - len(raw.lstrip())
        if heading_indent is not None and depth > heading_indent:
            # Deeper than the heading: body line of the current section.
            body_text = re.sub(r"^[\*\-]\s+", "", stripped)
            body_text = re.sub(r"\*\*(.+?)\*\*", r"<strong>\1</strong>", body_text)
            current["text_lines"].append({
                "text": body_text,
                "indent": max(0, (depth - heading_indent - 2) // 2),
            })
            continue
        bold_span = re.search(r"\*\*(.+?)\*\*", stripped)
        if bold_span:
            heading = bold_span.group(1).strip()
        else:
            heading = stripped.lstrip("*-").strip()
        current = {"heading": heading, "text_lines": []}
        sections.append(current)
        heading_indent = depth
    return sections
def parse_pillar_item(unit: tuple[str, list[str]]) -> dict:
    """Parse one F13 pillar unit.

    The top line's bold (or plain) text is the label, which is also split
    into a main part and a parenthesised part; the nested lines become
    sections.

    Returns:
        ``{label, label_main, label_paren, sections}``.
    """
    top_line, nested_lines = unit
    full_label = _extract_bold_or_plain(top_line)
    main_part, paren_part = _split_label_for_bar(full_label)
    return {
        "label": full_label,
        "label_main": main_part,
        "label_paren": paren_part,
        "sections": _parse_nested_pillar_sections(nested_lines),
    }
def parse_quadrant_item(unit: tuple[str, list[str]]) -> dict:
    """Parse one F16 quadrant unit: bold text is the label, nested lines the body.

    Unlike ``parse_pillar_item``, which separates the nested lines into
    headings and text lines, all nested lines here go into a single flat
    text_lines list with no headings. Indent levels are measured relative to
    the shallowest non-blank nested line.

    Returns:
        ``{label, body: [{text, indent}, ...]}``.
    """
    top_line, nested_lines = unit
    label = _extract_bold_or_plain(top_line)
    indents = [len(ln) - len(ln.lstrip()) for ln in nested_lines if ln.strip()]
    shallowest = min(indents) if indents else 0
    return {
        "label": label,
        "body": _text_lines_with_indent(nested_lines, base_indent=shallowest),
    }
def parse_compare_row_2col_item(unit: tuple[str, list[str]]) -> dict:
    """Parse one F18-style compare row.

    The top bullet's bold text is the category label; the first two nested
    bullets are the col_a / col_b values (empty string when missing). Bullet
    markers are removed and ``**bold**`` becomes ``<strong>``. The parser
    does no prefix stripping; that is delegated to the builder's
    ``strip_col_prefix_aliases`` option.

    Returns:
        ``{label, col_a, col_b}``.
    """
    top_line, nested_lines = unit
    label = _extract_bold_or_plain(top_line)
    values = []
    for raw in nested_lines:
        stripped = raw.strip()
        if re.match(r"^[\*\-]\s", stripped) is None:
            continue
        value = re.sub(r"^[\*\-]\s+", "", stripped)
        values.append(re.sub(r"\*\*(.+?)\*\*", r"<strong>\1</strong>", value))
    # Pad so that missing bullets fall back to empty strings.
    col_a, col_b = (values + ["", ""])[:2]
    return {"label": label, "col_a": col_a, "col_b": col_b}
# Named registry of item parsers. The payload builders look an entry up by the
# name given in the contract's builder_options["item_parser"].
ITEM_PARSERS: dict[str, Callable] = {
"pillar_item": parse_pillar_item,
"quadrant_item": parse_quadrant_item,
"compare_row_2col_item": parse_compare_row_2col_item,
}
# ─── Column body parsers (h3 subsection body 단위) ────────────────
def _parse_column_sections(body: str, transform_first: bool) -> list[dict]:
    """Parse a column body into a list of sections, one per top-level bullet.

    With ``transform_first=True``, if the nested lines of the first top-level
    bullet contain a Markdown table, that section is emitted as
    ``{"title", "transforms"}`` (AS-IS / TO-BE pairs). Every other section is
    ``{"title", "text_lines"}``, with indents relative to the shallowest
    non-blank nested line.
    """
    parsed_sections = []
    for position, (top_line, nested_lines) in enumerate(_split_top_bullets(body)):
        title = _extract_bold_or_plain(top_line)
        if transform_first and position == 0:
            transforms, _rest = _extract_markdown_table("\n".join(nested_lines))
            if transforms:
                parsed_sections.append({"title": title, "transforms": transforms})
                continue
        indents = [len(ln) - len(ln.lstrip()) for ln in nested_lines if ln.strip()]
        shallowest = min(indents) if indents else 0
        parsed_sections.append({
            "title": title,
            "text_lines": _text_lines_with_indent(nested_lines, base_indent=shallowest),
        })
    return parsed_sections
def parse_column_with_transform(body: str) -> list[dict]:
    """Parse a column whose first top-level bullet may hold an AS-IS/TO-BE table (F29 process column)."""
    sections = _parse_column_sections(body, transform_first=True)
    return sections
def parse_column_plain(body: str) -> list[dict]:
    """Parse a column in which every section is a plain text_lines section (F29 product column)."""
    sections = _parse_column_sections(body, transform_first=False)
    return sections
# Named registry of column-body parsers. _build_process_product_pair looks an
# entry up by each column's "body_parser" name.
COLUMN_BODY_PARSERS: dict[str, Callable] = {
"column_with_transform": parse_column_with_transform,
"column_plain": parse_column_plain,
}
# ─── Payload builders (named registry — top-level dispatch) ───────
def _resolve_title(section, payload_spec: dict, contract: dict) -> dict:
    """Resolve ``payload.title.source`` into a payload fragment.

    v0 supports only ``section.title``.

    Returns:
        ``{"title": section.title}`` when the source is ``section.title``;
        ``{}`` when no title source is declared.

    Raises:
        ValueError: any other title source is declared.
    """
    title_spec = payload_spec.get("title", {}) or {}
    source = title_spec.get("source")
    if source == "section.title":
        return {"title": section.title}
    if source is not None:
        raise ValueError(
            f"Contract '{contract['template_id']}' has unsupported title source "
            f"'{source}'. v0 supports 'section.title' only."
        )
    return {}
def _build_items_with_role(section, units, contract) -> dict:
    """F13-style builder: each top-bullet unit becomes one item of an array.

    builder_options:
        item_parser : ITEM_PARSERS key used to parse each unit
        array_root  : payload key under which the item list is stored
        role_field  : optional item key that receives ``role_order[i]``
                      (only for indices that ``role_order`` covers)

    Raises:
        ValueError: ``item_parser`` names no ITEM_PARSERS entry.
    """
    options = contract["payload"]["builder_options"]
    parser_name = options["item_parser"]
    parse_unit = ITEM_PARSERS.get(parser_name)
    if parse_unit is None:
        raise ValueError(
            f"Contract '{contract['template_id']}' references item_parser='{parser_name}' "
            f"but ITEM_PARSERS has no such entry."
        )
    roles = contract.get("role_order", []) or []
    role_key = options.get("role_field")

    collected = []
    for position, unit in enumerate(units):
        entry = parse_unit(unit)
        has_role = bool(role_key) and position < len(roles)
        if has_role:
            entry[role_key] = roles[position]
        collected.append(entry)

    result: dict = dict(_resolve_title(section, contract["payload"], contract))
    result[options["array_root"]] = collected
    return result
def _build_process_product_pair(section, units, contract) -> dict:
    """F29-style builder: two h3 subsections become two named columns.

    builder_options:
        pad_sections_to : N, optional. Forces every column's section list to
                          length N: shorter lists are padded with empty
                          sections, longer lists are cut to N.
        columns         : list of column specs, each with
            title_to    : payload key that receives the subsection title
            body_to     : payload key that receives ``{"sections": [...]}``
            body_parser : COLUMN_BODY_PARSERS key
            pad_empty   : optional empty-section template
                          (default ``{"title": "", "text_lines": []}``)

    Units beyond ``len(columns)`` are ignored.

    Raises:
        FitError: fewer subsection units than declared columns.
        ValueError: a column names an unknown body_parser.
    """
    import copy  # local import: the module header does not import copy

    options = contract["payload"]["builder_options"]
    pad_to = options.get("pad_sections_to")
    cols = options["columns"]
    if len(units) < len(cols):
        raise FitError(
            f"Contract '{contract['template_id']}' builder process_product_pair needs "
            f"{len(cols)} subsection units, got {len(units)} in section "
            f"'{getattr(section, 'section_id', '?')}'."
        )
    payload: dict = {}
    payload.update(_resolve_title(section, contract["payload"], contract))
    for col, (sub_title, sub_body) in zip(cols, units):
        parser_name = col["body_parser"]
        parser = COLUMN_BODY_PARSERS.get(parser_name)
        if parser is None:
            raise ValueError(
                f"Contract '{contract['template_id']}' references column body_parser="
                f"'{parser_name}' but COLUMN_BODY_PARSERS has no such entry."
            )
        sections_list = parser(sub_body)
        if pad_to is not None:
            empty_template = col.get("pad_empty", {"title": "", "text_lines": []})
            while len(sections_list) < pad_to:
                # Deep copy: a shallow dict() copy would share the nested
                # text_lines list between every padded section, and a
                # catalog-provided template would be shared with the cached
                # catalog itself.
                sections_list.append(copy.deepcopy(empty_template))
            sections_list = sections_list[:pad_to]
        payload[col["title_to"]] = sub_title
        payload[col["body_to"]] = {"sections": sections_list}
    return payload
def _build_quadrant_flat_slots(section, units, contract) -> dict:
    """F16-style builder: each top-bullet unit fills flat keyed slots.

    The output is neither an array (F13) nor named columns (F29) but flat
    keys ``quadrant_N_label`` / ``quadrant_N_body``. Role/position is implied
    by the 1-based index (1=TL, 2=TR, 3=BL, 4=BR — decided by the partial
    template).

    builder_options:
        item_parser       : ITEM_PARSERS key (each unit -> {label, body})
        pad_to            : N (default 4). Exactly N slots are emitted;
                            missing ones are filled with empty values.
        truncate_at       : M (default = pad_to). Units after the M-th are
                            ignored.
        label_key_pattern : default "quadrant_{n}_label" (n is 1-based)
        body_key_pattern  : default "quadrant_{n}_body"
        empty_label       : label of a padded slot (default "")
        empty_body        : body of a padded slot (default [])

    ``_truncated_count`` records every unit that did not reach a slot. Only
    ``min(truncate_at, pad_to)`` units can be rendered, so units between
    ``pad_to`` and ``truncate_at`` are counted as well instead of being
    dropped silently.

    Raises:
        ValueError: ``item_parser`` names no ITEM_PARSERS entry.
    """
    import copy  # local import: the module header does not import copy

    options = contract["payload"]["builder_options"]
    parser_name = options["item_parser"]
    parser = ITEM_PARSERS.get(parser_name)
    if parser is None:
        raise ValueError(
            f"Contract '{contract['template_id']}' references item_parser='{parser_name}' "
            f"but ITEM_PARSERS has no such entry."
        )
    pad_to = options.get("pad_to", 4)
    truncate_at = options.get("truncate_at", pad_to)
    label_key = options.get("label_key_pattern", "quadrant_{n}_label")
    body_key = options.get("body_key_pattern", "quadrant_{n}_body")
    empty_label = options.get("empty_label", "")
    empty_body = options.get("empty_body", [])

    payload: dict = {}
    payload.update(_resolve_title(section, contract["payload"], contract))

    # Slots exist only for indices 1..pad_to, so that is a hard upper bound
    # on what can be rendered regardless of truncate_at.
    rendered_limit = min(truncate_at, pad_to)
    parsed = [parser(u) for u in units[:rendered_limit]]
    for n in range(1, pad_to + 1):
        if n <= len(parsed):
            payload[label_key.format(n=n)] = parsed[n - 1]["label"]
            payload[body_key.format(n=n)] = parsed[n - 1]["body"]
        else:
            payload[label_key.format(n=n)] = empty_label
            # Always a fresh object (list or dict), so padded slots never
            # share a reference with each other or with the catalog.
            payload[body_key.format(n=n)] = copy.deepcopy(empty_body)

    dropped = len(units) - rendered_limit
    if dropped > 0:
        payload["_truncated_count"] = dropped
    return payload
def _build_cycle_intersect_3(section, units, contract) -> dict:
    """F12-style builder (cycle-3way-intersection): labels only.

    Top-bullet units fill the flat keys ``circle_1_label`` /
    ``circle_2_label`` / ``circle_3_label``. The parsed body is ignored: the
    three main circles of this frame use labels only. The intersection text
    is a separate optional value (empty string by default).

    Compared with the F16 quadrant_flat_slots builder:
        F16 : N=4, body used (quadrant_N_label + quadrant_N_body)
        F12 : N=3, body unused (circle_N_label only), separate intersection

    builder_options:
        item_parser          : ITEM_PARSERS key (only "label" is used)
        pad_to               : N (default 3). Exactly N labels are emitted;
                               missing ones use ``empty_label``.
        truncate_at          : M (default = pad_to). Units after the M-th are
                               ignored.
        label_key_pattern    : default "circle_{n}_label" (n is 1-based)
        empty_label          : label of a padded slot (default "")
        intersection_default : intersection text (default "")

    ``_truncated_count`` records every unit that did not reach a slot. Only
    ``min(truncate_at, pad_to)`` units can be rendered, so units between
    ``pad_to`` and ``truncate_at`` are counted as well instead of being
    dropped silently.

    Raises:
        ValueError: ``item_parser`` names no ITEM_PARSERS entry.
    """
    options = contract["payload"]["builder_options"]
    parser_name = options["item_parser"]
    parser = ITEM_PARSERS.get(parser_name)
    if parser is None:
        raise ValueError(
            f"Contract '{contract['template_id']}' references item_parser='{parser_name}' "
            f"but ITEM_PARSERS has no such entry."
        )
    pad_to = options.get("pad_to", 3)
    truncate_at = options.get("truncate_at", pad_to)
    label_key = options.get("label_key_pattern", "circle_{n}_label")
    empty_label = options.get("empty_label", "")
    intersection = options.get("intersection_default", "")

    payload: dict = {}
    payload.update(_resolve_title(section, contract["payload"], contract))

    # Labels exist only for indices 1..pad_to, so that is a hard upper bound
    # on what can be rendered regardless of truncate_at.
    rendered_limit = min(truncate_at, pad_to)
    parsed = [parser(u) for u in units[:rendered_limit]]
    for n in range(1, pad_to + 1):
        if n <= len(parsed):
            payload[label_key.format(n=n)] = parsed[n - 1]["label"]
        else:
            payload[label_key.format(n=n)] = empty_label
    payload["intersection"] = intersection

    dropped = len(units) - rendered_limit
    if dropped > 0:
        payload["_truncated_count"] = dropped
    return payload
def _build_compare_table_2col(section, units, contract) -> dict:
    """F18-style builder: a compare table with 2 columns and N category rows.

    payload:
        title       : section.title (when the contract declares it)
        col_a_label : left column header (e.g. "BIM")
        col_b_label : right column header (e.g. "DX")
        rows        : list[{label, col_a, col_b}], one per top-bullet unit

    builder_options:
        item_parser              : ITEM_PARSERS key
                                   (e.g. ``compare_row_2col_item``)
        col_a_label_default      : col_a header (default "")
        col_b_label_default      : col_b header (default "")
        strip_col_prefix_aliases : list[str]; a leading ``<alias>:`` is
                                   removed from col_a / col_b values, e.g.
                                   ["BIM", "DX"]. Default [] (no stripping).
        max_rows                 : N (default 999). Units beyond N are
                                   dropped and counted in
                                   ``_truncated_count``.

    Raises:
        ValueError: ``item_parser`` names no ITEM_PARSERS entry.
    """
    options = contract["payload"]["builder_options"]
    parser_name = options["item_parser"]
    parse_row = ITEM_PARSERS.get(parser_name)
    if parse_row is None:
        raise ValueError(
            f"Contract '{contract['template_id']}' references item_parser='{parser_name}' "
            f"but ITEM_PARSERS has no such entry."
        )
    header_a = options.get("col_a_label_default", "")
    header_b = options.get("col_b_label_default", "")
    aliases = options.get("strip_col_prefix_aliases", []) or []
    row_limit = options.get("max_rows", 999)

    result: dict = {}
    result.update(_resolve_title(section, contract["payload"], contract))
    result["col_a_label"] = header_a
    result["col_b_label"] = header_b

    # One anchored pattern per alias, so only an exact "<alias>:" prefix is removed.
    alias_patterns = [
        re.compile(rf"^{re.escape(alias)}\s*[:]\s*(.+)$")
        for alias in aliases
    ]

    def _without_alias(value: str) -> str:
        # The first alias that matches wins; unmatched values pass through.
        for pattern in alias_patterns:
            hit = pattern.match(value)
            if hit:
                return hit.group(1).strip()
        return value

    table_rows = []
    for unit in units[:row_limit]:
        row = parse_row(unit)
        if alias_patterns:
            for column in ("col_a", "col_b"):
                row[column] = _without_alias(row.get(column, ""))
        table_rows.append(row)
    result["rows"] = table_rows

    overflow = len(units) - row_limit
    if overflow > 0:
        result["_truncated_count"] = overflow
    return result
def _build_paired_rows_4x2_slots(section, units, contract) -> dict:
    """F17-style builder (paired_rows_4x2_alternating_pills).

    Eight top-bullet units fill 2-axis keyed slots. Where quadrant_flat_slots
    maps index 1..4 onto ``quadrant_N_{label,body}``, this builder maps index
    1..8 onto ``row_R_SIDE_{label,body}`` with R = ceil(i / 2) and SIDE in
    left | right.

    Deterministic index mapping:
        unit 1 -> row_1_left    unit 2 -> row_1_right
        unit 3 -> row_2_left    unit 4 -> row_2_right
        unit 5 -> row_3_left    unit 6 -> row_3_right
        unit 7 -> row_4_left    unit 8 -> row_4_right

    Cardinality is strict: any count other than rows * len(sides) raises
    FitError before anything is rendered. There is no pad/truncate fallback
    that could hide a cardinality mismatch.

    builder_options:
        item_parser       : ITEM_PARSERS key (default "quadrant_item":
                            a label plus a heading-less body)
        label_key_pattern : default "row_{r}_{side}_label"
        body_key_pattern  : default "row_{r}_{side}_body"
        rows              : default 4
        sides             : default ["left", "right"]

    Raises:
        ValueError: ``item_parser`` names no ITEM_PARSERS entry.
        FitError: the unit count differs from rows * len(sides).
    """
    options = contract["payload"]["builder_options"]
    # Honour the documented default instead of failing with KeyError.
    parser_name = options.get("item_parser", "quadrant_item")
    parser = ITEM_PARSERS.get(parser_name)
    if parser is None:
        raise ValueError(
            f"Contract '{contract['template_id']}' references item_parser='{parser_name}' "
            f"but ITEM_PARSERS has no such entry."
        )
    label_key = options.get("label_key_pattern", "row_{r}_{side}_label")
    body_key = options.get("body_key_pattern", "row_{r}_{side}_body")
    rows = options.get("rows", 4)
    sides = options.get("sides", ["left", "right"])

    expected = rows * len(sides)
    if len(units) != expected:
        # A cardinality violation is a fit problem, so it must use the same
        # FitError signal as _check_cardinality to reach the fallback path.
        raise FitError(
            f"Contract '{contract['template_id']}' requires strict {expected} units "
            f"(rows={rows} × sides={len(sides)}), got {len(units)}. "
            f"silent pad/truncate is disabled for paired_rows_4x2_slots."
        )

    payload: dict = {}
    payload.update(_resolve_title(section, contract["payload"], contract))
    parsed = [parser(u) for u in units]
    # Row-major order: every side of row 1, then every side of row 2, ...
    slots = [(r, side) for r in range(1, rows + 1) for side in sides]
    for (r, side), item in zip(slots, parsed):
        payload[label_key.format(r=r, side=side)] = item["label"]
        payload[body_key.format(r=r, side=side)] = item["body"]
    return payload
# Named registry of payload builders. map_with_contract() dispatches on the
# contract's payload.builder name; each builder is called as
# builder(section, units, contract) and returns the slot_payload dict.
PAYLOAD_BUILDERS: dict[str, Callable] = {
"items_with_role": _build_items_with_role,
"process_product_pair": _build_process_product_pair,
"quadrant_flat_slots": _build_quadrant_flat_slots,
"cycle_intersect_3": _build_cycle_intersect_3,
"compare_table_2col": _build_compare_table_2col,
"paired_rows_4x2_slots": _build_paired_rows_4x2_slots,
}
# ─── Generic mapper (single dispatch via builder) ────────────────
def _check_cardinality(contract: dict, units: list, section) -> None:
    """Validate the unit count against the contract's ``cardinality`` block.

    Checks, in order: ``strict`` (exact count), ``min`` (lower bound),
    ``max`` (upper bound). Keys that are absent or None are not checked.

    Raises:
        FitError: on the first violated rule.
    """
    rules = contract.get("cardinality", {}) or {}
    count = len(units)

    def _section_id():
        # Resolved only when an error message is actually built.
        return getattr(section, 'section_id', '?')

    exact = rules.get("strict")
    if exact is not None and count != exact:
        raise FitError(
            f"Contract '{contract['template_id']}' expects strict {exact} units "
            f"(source_shape={contract['source_shape']}), got {count} "
            f"in section '{_section_id()}'. "
            f"overflow_policy={rules.get('overflow_policy', 'abort_or_review')}."
        )

    lower = rules.get("min")
    if lower is not None and count < lower:
        raise FitError(
            f"Contract '{contract['template_id']}' expects min {lower} units, got {count} "
            f"in section '{_section_id()}'."
        )

    upper = rules.get("max")
    if upper is not None and count > upper:
        raise FitError(
            f"Contract '{contract['template_id']}' expects max {upper} units, got {count} "
            f"in section '{_section_id()}'."
        )
def compute_capacity_fit(template_id: str, content: str) -> dict:
    """Compare the content's item count with the template contract's capacity.

    A planner-stage pre-check: it lets the automatic pipeline learn before
    rendering that a frame would cut this content or not fit it, and so
    serves as the input signal for avoiding silent truncation / FitError.

    Returns:
        dict with
            item_count      : number of units after splitting by source_shape
            source_shape    : the contract's source_shape
            capacity        : {strict, min, max, truncate_at, pad_to}
                              (absent keys are None)
            fit_status      : 'ok' / 'strict_mismatch' / 'exceeds_max' /
                              'below_min' / 'exceeds_truncate' /
                              'no_contract' / 'unknown_source_shape'
            mismatch_reason : str when fit_status != 'ok', otherwise None

    Fit rules (conservative, to prevent silent loss), first violation wins:
        1. strict, when set, must match exactly
        2. max, when set, must not be exceeded
        3. min, when set, must be reached
        4. truncate_at, when set, must not be exceeded (the builder would
           cut the excess, i.e. lose content)
        5. pad_to alone with too few items is not a mismatch (empty slots
           are filled; nothing is lost)
    """

    def _report(item_count, source_shape, capacity, fit_status, mismatch_reason) -> dict:
        # Single place that fixes the result's key set and key order.
        return {
            "item_count": item_count,
            "source_shape": source_shape,
            "capacity": capacity,
            "fit_status": fit_status,
            "mismatch_reason": mismatch_reason,
        }

    def _blank_capacity() -> dict:
        return {"strict": None, "min": None, "max": None,
                "truncate_at": None, "pad_to": None}

    contract = get_contract(template_id)
    if contract is None:
        return _report(
            None, None, _blank_capacity(), "no_contract",
            (
                f"no contract for template_id='{template_id}' — capacity check skipped. "
                f"이 candidate 는 catalog-only dispatch 의 ValueError 가 mapper 단계에서 발생할 것."
            ),
        )

    source_shape = contract.get("source_shape")
    try:
        units = split_source(source_shape, content)
    except ValueError:
        return _report(
            None, source_shape, _blank_capacity(), "unknown_source_shape",
            f"source_shape='{source_shape}' is not supported by split_source().",
        )

    item_count = len(units)
    cardinality = contract.get("cardinality") or {}
    builder_options = (contract.get("payload") or {}).get("builder_options") or {}
    strict = cardinality.get("strict")
    mn = cardinality.get("min")
    mx = cardinality.get("max")
    truncate_at = builder_options.get("truncate_at")
    capacity = {
        "strict": strict,
        "min": mn,
        "max": mx,
        "truncate_at": truncate_at,
        "pad_to": builder_options.get("pad_to"),
    }

    status, reason = "ok", None
    if strict is not None and item_count != strict:
        status = "strict_mismatch"
        reason = (
            f"strict cardinality {strict}, content has {item_count} items. "
            f"mapper 가 FitError 를 raise 할 것."
        )
    elif mx is not None and item_count > mx:
        status = "exceeds_max"
        reason = f"max cardinality {mx}, content has {item_count} items."
    elif mn is not None and item_count < mn:
        status = "below_min"
        reason = f"min cardinality {mn}, content has {item_count} items."
    elif truncate_at is not None and item_count > truncate_at:
        status = "exceeds_truncate"
        reason = (
            f"builder truncate_at {truncate_at}, content has {item_count} items "
            f"({item_count - truncate_at} would be silently dropped). "
            f"silent truncate 방지 위해 자동 선택 X."
        )
    return _report(item_count, source_shape, capacity, status, reason)
def map_with_contract(section, contract: dict) -> dict:
    """Map an MdxSection to a slot_payload via the contract's named builder.

    Steps:
        1. split ``section.raw_content`` into units according to source_shape
        2. check cardinality (a violation raises FitError)
        3. look up payload.builder in PAYLOAD_BUILDERS and call
           ``builder(section, units, contract)``

    Raises:
        FitError: the unit count violates the contract's cardinality.
        ValueError: source_shape is unsupported, or payload.builder is
            missing or names no PAYLOAD_BUILDERS entry.
    """
    units = split_source(contract["source_shape"], section.raw_content)
    _check_cardinality(contract, units, section)

    builder_name = contract["payload"].get("builder")
    if not builder_name:
        raise ValueError(
            f"Contract '{contract['template_id']}' missing payload.builder. "
            f"available: {sorted(PAYLOAD_BUILDERS.keys())}"
        )

    build_payload = PAYLOAD_BUILDERS.get(builder_name)
    if build_payload is not None:
        return build_payload(section, units, contract)
    raise ValueError(
        f"Contract '{contract['template_id']}' references payload.builder="
        f"'{builder_name}' but PAYLOAD_BUILDERS has no such entry. "
        f"available: {sorted(PAYLOAD_BUILDERS.keys())}"
    )