Some checks failed
Multi-MDX Regression (IMP-91) / multi-mdx-regression (push) Failing after 26s
- catalog (frame_contracts.yaml): F18 bim_dx_comparison_table col_a/col_b label_default_role=placeholder; F30 industry_current_status_three_col + F31 industry_characteristics_three_col col_a/col_b/col_c forward-compat placeholder; F33 engn_sw_three_types untouched (no label_default). - mapper (_build_compare_table_2col): generic _resolve_label_default(col_key) branches on <col>_label_default_role — placeholder -> '' (Figma placeholder suppressed at runtime), fallback -> catalog literal (legacy default), unknown -> ValueError with template_id + role_key + value. Absent role defaults to fallback (backward compat for contracts without discriminator). - tests (tests/phase_z2/test_imp40_label_default_role.py): u4 generic matrix (placeholder / fallback / absent / unknown / 3-col axis) + u5 F18-reuse non-BIM/DX synthetic rows asserting placeholder labels emit '' and BIM/DX literal tokens do not leak. - snapshot (tests/integration/__snapshots__/slot_payload.json): mdx 01 F18 string_slot_nonempty.col_a_label/col_b_label True -> False (u6 expected drift from u3 placeholder -> empty string flip). slot_names + rows + title preserved. Verification: - imp40_label_default_role: 6/6 PASSED - phase_z2 sweep: 608/608 PASSED - multi_mdx_regression: 50/50 PASSED - cross-suite sweep: 662/662 PASSED - BIM/DX literal grep on mapper + new test: 0 hits - No mdx-specific branches (mdx 03/04/05 grep on mapper: 0 hits) Guardrails: no MDX 03/04/05 hardcoding (catalog policy only); no spacing shrink; no auto frame swap on reject; no AI call at Step 12; F33 untouched. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
944 lines
37 KiB
Python
944 lines
37 KiB
Python
"""Phase Z-2 contract-based generic mapper (v0).
|
||
|
||
frame 별 hand-coded mapper 의 대체 — catalog `frame_contracts.yaml` 에 선언된
|
||
source_shape / cardinality / role_order / payload builder 를 읽고
|
||
MdxSection → slot_payload 변환.
|
||
|
||
원칙 :
|
||
- frame ↔ mapper 의 binding = catalog 가 결정 (Python registry hardcoded X)
|
||
- cardinality / role_order / payload 형태 = catalog
|
||
- reusable primitive : ITEM_PARSERS / COLUMN_BODY_PARSERS / PAYLOAD_BUILDERS named registry
|
||
- cardinality strict 위반 → FitError → fallback path 신호 (AI restructuring 후보)
|
||
|
||
dispatch 모델 :
|
||
contract.payload.builder = named entry of PAYLOAD_BUILDERS
|
||
builder 가 (section, units, contract) → slot_payload dict 산출
|
||
builder 내부에서 ITEM_PARSERS / COLUMN_BODY_PARSERS 등 sub-primitive 호출
|
||
|
||
v0 등록 frame :
|
||
- F13 (three_parallel_requirements) → builder=items_with_role / item_parser=pillar_item
|
||
- F29 (process_product_two_way) → builder=process_product_pair / column body parsers
|
||
F16 는 다음 step.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import re
|
||
from pathlib import Path
|
||
from typing import Callable
|
||
|
||
import yaml
|
||
|
||
|
||
PROJECT_ROOT = Path(__file__).parent.parent
|
||
CATALOG_PATH = PROJECT_ROOT / "templates" / "phase_z2" / "catalog" / "frame_contracts.yaml"
|
||
V4_FALLBACK_POLICY_PATH = PROJECT_ROOT / "templates" / "phase_z2" / "catalog" / "v4_fallback_policy.yaml"
|
||
|
||
|
||
class FitError(Exception):
|
||
"""Contract 위반 — fallback path (AI restructuring) 로 넘어가야 하는 신호.
|
||
|
||
cardinality 위반 / source_shape mismatch 등. message 에 위반 이유 명시.
|
||
"""
|
||
|
||
|
||
class BuilderMissingError(FitError):
|
||
"""Contract.payload.builder ↔ PAYLOAD_BUILDERS registry mismatch.
|
||
|
||
FitError subclass — pipeline 의 기존 `except FitError` 경로가 그대로
|
||
adapter_needed 로 라우팅 (mdx04 hard crash 차단, IMP-#85 u1).
|
||
"""
|
||
|
||
|
||
class CatalogInvariantError(Exception):
|
||
"""Catalog ↔ runtime registry drift detected at load time.
|
||
|
||
Boot-time invariant violation (IMP-#85 u2). Distinct from FitError:
|
||
runtime fallback 대상이 아니라 catalog wiring 결함 (fail-fast).
|
||
"""
|
||
|
||
|
||
# ─── Catalog loading ──────────────────────────────────────────────
|
||
|
||
_CATALOG_CACHE: dict | None = None
|
||
|
||
|
||
def load_frame_contracts() -> dict:
|
||
global _CATALOG_CACHE
|
||
if _CATALOG_CACHE is None:
|
||
catalog = yaml.safe_load(CATALOG_PATH.read_text(encoding="utf-8")) or {}
|
||
_check_catalog_builder_invariant(catalog)
|
||
_CATALOG_CACHE = catalog
|
||
return _CATALOG_CACHE
|
||
|
||
|
||
def get_contract(template_id: str) -> dict | None:
|
||
return load_frame_contracts().get(template_id)
|
||
|
||
|
||
# ─── V4 fallback policy loading (IMP-38) ──────────────────────────
|
||
|
||
_V4_FALLBACK_POLICY_CACHE: dict | None = None
|
||
|
||
_V4_FALLBACK_POLICY_DEFAULT: dict = {
|
||
"policy_type": "static",
|
||
"usable_threshold": 1,
|
||
"default_max_rank": 3,
|
||
"extended_max_rank": 3, # graceful: yaml 없을 시 확장 X (byte-identical to pre-IMP-38)
|
||
}
|
||
|
||
|
||
def load_v4_fallback_policy() -> dict:
|
||
"""IMP-38 V4 fallback policy loader (separate yaml, catalog 오염 방지).
|
||
|
||
Returns dict with keys: policy_type, usable_threshold, default_max_rank, extended_max_rank.
|
||
|
||
Codex #1 권장: frame_contracts.yaml top-level 오염 회피 (별 yaml).
|
||
Codex #3 LOCK: load_frame_contracts() shape 변경 X (이 함수는 별 cache).
|
||
|
||
Graceful fallback:
|
||
yaml 파일 없을 시 → _V4_FALLBACK_POLICY_DEFAULT (default_max_rank=3, extended=3)
|
||
→ backward compat byte-identical to pre-IMP-38 behavior.
|
||
|
||
Returns:
|
||
dict — 정책 키 (정책 yaml 의 superset 가능, 알 수 없는 키는 무시 권장).
|
||
"""
|
||
global _V4_FALLBACK_POLICY_CACHE
|
||
if _V4_FALLBACK_POLICY_CACHE is None:
|
||
if V4_FALLBACK_POLICY_PATH.exists():
|
||
loaded = yaml.safe_load(V4_FALLBACK_POLICY_PATH.read_text(encoding="utf-8")) or {}
|
||
# merge with default (yaml 키 부분 누락 시 default 로 fall through)
|
||
_V4_FALLBACK_POLICY_CACHE = {**_V4_FALLBACK_POLICY_DEFAULT, **loaded}
|
||
else:
|
||
_V4_FALLBACK_POLICY_CACHE = dict(_V4_FALLBACK_POLICY_DEFAULT)
|
||
return _V4_FALLBACK_POLICY_CACHE
|
||
|
||
|
||
# ─── Source-shape splitters ──────────────────────────────────────
|
||
|
||
def _split_top_bullets(content: str) -> list[tuple[str, list[str]]]:
|
||
"""top-level bullet groups → [(top_line, nested_lines), ...]."""
|
||
groups = []
|
||
cur_top, cur_nested = None, []
|
||
for line in content.splitlines():
|
||
if not line.strip():
|
||
continue
|
||
if re.match(r"^[\*\-]\s", line):
|
||
if cur_top is not None:
|
||
groups.append((cur_top, cur_nested))
|
||
cur_top, cur_nested = line, []
|
||
elif line.startswith(" ") and cur_top is not None:
|
||
cur_nested.append(line)
|
||
if cur_top is not None:
|
||
groups.append((cur_top, cur_nested))
|
||
return groups
|
||
|
||
|
||
def _split_h3_subsections(content: str) -> list[tuple[str, str]]:
|
||
"""### N(.N) TITLE 단위 split → [(title, body), ...].
|
||
|
||
body = subsection 내부 (### 다음 줄 ~ 다음 ### 직전).
|
||
"""
|
||
pattern = re.compile(r"^###\s+(\d+(?:\.\d+)?)\s+(.+?)$", re.MULTILINE)
|
||
matches = list(pattern.finditer(content))
|
||
units = []
|
||
for i, m in enumerate(matches):
|
||
title = m.group(2).strip()
|
||
start = m.end()
|
||
end = matches[i + 1].start() if i + 1 < len(matches) else len(content)
|
||
body = content[start:end].strip()
|
||
units.append((title, body))
|
||
return units
|
||
|
||
|
||
def split_source(source_shape: str, content: str) -> list:
|
||
if source_shape == "top_bullets":
|
||
return _split_top_bullets(content)
|
||
if source_shape == "h3_subsections":
|
||
return _split_h3_subsections(content)
|
||
raise ValueError(
|
||
f"Contract supports source_shape in (top_bullets, h3_subsections). "
|
||
f"got '{source_shape}'."
|
||
)
|
||
|
||
|
||
# ─── Shared text helpers ──────────────────────────────────────────
|
||
|
||
def _split_label_for_bar(label: str) -> tuple[str, str]:
|
||
"""'기술(디지털)' → ('기술', '(디지털)'). 괄호 없으면 (label, '')."""
|
||
m = re.match(r"^([^(]+?)\s*(\([^)]+\))\s*$", label.strip())
|
||
if m:
|
||
return m.group(1).strip(), m.group(2).strip()
|
||
return label.strip(), ""
|
||
|
||
|
||
def _extract_bold_or_plain(top_line: str) -> str:
|
||
bold = re.search(r"\*\*(.+?)\*\*", top_line)
|
||
if bold:
|
||
return bold.group(1).strip()
|
||
return top_line.strip().lstrip("*-").strip()
|
||
|
||
|
||
def _text_lines_with_indent(nested_lines: list[str], base_indent: int = 0) -> list[dict]:
|
||
text_lines = []
|
||
for line in nested_lines:
|
||
if not line.strip():
|
||
continue
|
||
s = line.strip()
|
||
if s in ("<br/>", "<br>", "---"):
|
||
continue
|
||
if not re.match(r"^[\*\-]\s", s):
|
||
continue
|
||
indent = len(line) - len(line.lstrip())
|
||
rel = max(0, indent - base_indent)
|
||
indent_level = max(0, rel // 2)
|
||
text = re.sub(r"^[\*\-]\s+", "", s)
|
||
text = re.sub(r"\*\*(.+?)\*\*", r"<strong>\1</strong>", text)
|
||
text_lines.append({"text": text, "indent": indent_level})
|
||
return text_lines
|
||
|
||
|
||
def _extract_markdown_table(content: str) -> tuple[list[dict] | None, str]:
|
||
"""Markdown 표 → [{from, to}] (column 1 = from, column 3 = to).
|
||
|
||
AS-IS / TO-BE 형식의 3-column 표 (from | arrow | to) 를 transforms 로 변환.
|
||
Returns (transforms_or_None, content_without_table).
|
||
"""
|
||
pattern = re.compile(
|
||
r"(^[ \t]*\|[^\n]+\|\n[ \t]*\|[\s\-:|]+\|\n(?:[ \t]*\|[^\n]+\|\n?)+)",
|
||
re.MULTILINE,
|
||
)
|
||
m = pattern.search(content)
|
||
if not m:
|
||
return None, content
|
||
rows = [r.strip() for r in m.group(1).strip().splitlines() if r.strip()]
|
||
transforms = []
|
||
for r in rows[2:]:
|
||
cells = [c.strip() for c in r.strip("|").split("|")]
|
||
if len(cells) >= 3:
|
||
f = re.sub(r"\*\*(.+?)\*\*", r"\1", cells[0])
|
||
t = re.sub(r"\*\*(.+?)\*\*", r"\1", cells[2])
|
||
transforms.append({"from": f, "to": t})
|
||
remaining = content[:m.start()] + content[m.end():]
|
||
return (transforms or None), remaining
|
||
|
||
|
||
# ─── Item parser primitives (top-bullet 단위) ─────────────────────
|
||
|
||
def _parse_nested_pillar_sections(nested_lines: list[str]) -> list[dict]:
|
||
"""Pillar nested → [{heading, text_lines}, ...]."""
|
||
sections = []
|
||
cur_heading = None
|
||
cur_text_lines: list[dict] = []
|
||
section_base_indent: int | None = None
|
||
|
||
for line in nested_lines:
|
||
if not line.strip():
|
||
continue
|
||
indent = len(line) - len(line.lstrip())
|
||
stripped = line.strip()
|
||
if not re.match(r"^[\*\-]\s", stripped):
|
||
continue
|
||
|
||
if section_base_indent is None or indent <= section_base_indent:
|
||
if cur_heading is not None:
|
||
sections.append({"heading": cur_heading, "text_lines": cur_text_lines})
|
||
bold = re.search(r"\*\*(.+?)\*\*", stripped)
|
||
cur_heading = (bold.group(1).strip() if bold
|
||
else stripped.lstrip("*-").strip())
|
||
cur_text_lines = []
|
||
section_base_indent = indent
|
||
else:
|
||
rel_indent = indent - section_base_indent
|
||
indent_level = max(0, (rel_indent - 2) // 2)
|
||
text = re.sub(r"^[\*\-]\s+", "", stripped)
|
||
text = re.sub(r"\*\*(.+?)\*\*", r"<strong>\1</strong>", text)
|
||
cur_text_lines.append({"text": text, "indent": indent_level})
|
||
|
||
if cur_heading is not None:
|
||
sections.append({"heading": cur_heading, "text_lines": cur_text_lines})
|
||
return sections
|
||
|
||
|
||
def parse_pillar_item(unit: tuple[str, list[str]]) -> dict:
|
||
"""F13 pillar — bold = label, label 분해, nested = sections."""
|
||
top_line, nested_lines = unit
|
||
label = _extract_bold_or_plain(top_line)
|
||
label_main, label_paren = _split_label_for_bar(label)
|
||
sections = _parse_nested_pillar_sections(nested_lines)
|
||
return {
|
||
"label": label,
|
||
"label_main": label_main,
|
||
"label_paren": label_paren,
|
||
"sections": sections,
|
||
}
|
||
|
||
|
||
def parse_quadrant_item(unit: tuple[str, list[str]]) -> dict:
|
||
"""F16 quadrant — bold = label, nested = body (text_lines flat list, no heading).
|
||
|
||
F13 pillar 와의 차이 :
|
||
- pillar_item 은 nested 안에서 heading + text_lines 계층 분리
|
||
- quadrant_item 은 nested 전체를 하나의 text_lines list 로 (heading 없음)
|
||
Returns:
|
||
{label, body: [{text, indent}, ...]}
|
||
"""
|
||
top_line, nested_lines = unit
|
||
label = _extract_bold_or_plain(top_line)
|
||
non_empty = [l for l in nested_lines if l.strip()]
|
||
base = min((len(l) - len(l.lstrip()) for l in non_empty), default=0)
|
||
body = _text_lines_with_indent(nested_lines, base_indent=base)
|
||
return {"label": label, "body": body}
|
||
|
||
|
||
def parse_compare_row_2col_item(unit: tuple[str, list[str]]) -> dict:
|
||
"""F18-style — bold = category label, nested 2 bullets = col_a / col_b values.
|
||
|
||
Pattern : top bullet = **카테고리**, nested = first 2 bullets.
|
||
*Parser 는 prefix stripping 안 함* (Codex round 43 §F1-b — narrow alias 정정).
|
||
Prefix stripping 은 *builder 의 strip_col_prefix_aliases option* 으로 위임.
|
||
|
||
Returns:
|
||
{label, col_a, col_b}
|
||
"""
|
||
top_line, nested_lines = unit
|
||
label = _extract_bold_or_plain(top_line)
|
||
# nested bullets — strip bullet marker, take first 2 (no prefix stripping)
|
||
nested = []
|
||
for l in nested_lines:
|
||
l_strip = l.strip()
|
||
if re.match(r"^[\*\-]\s", l_strip):
|
||
txt = re.sub(r"^[\*\-]\s+", "", l_strip)
|
||
txt = re.sub(r"\*\*(.+?)\*\*", r"<strong>\1</strong>", txt)
|
||
nested.append(txt)
|
||
col_a = nested[0] if len(nested) > 0 else ""
|
||
col_b = nested[1] if len(nested) > 1 else ""
|
||
return {"label": label, "col_a": col_a, "col_b": col_b}
|
||
|
||
|
||
ITEM_PARSERS: dict[str, Callable] = {
|
||
"pillar_item": parse_pillar_item,
|
||
"quadrant_item": parse_quadrant_item,
|
||
"compare_row_2col_item": parse_compare_row_2col_item,
|
||
}
|
||
|
||
|
||
# ─── Column body parsers (h3 subsection body 단위) ────────────────
|
||
|
||
def _parse_column_sections(body: str, transform_first: bool) -> list[dict]:
|
||
"""Column body → list of sections.
|
||
|
||
transform_first=True 면 첫 top-bullet 의 nested 안에 markdown table 이 있으면
|
||
text_lines 대신 transforms 로 산출 (AS-IS/TO-BE).
|
||
"""
|
||
groups = _split_top_bullets(body)
|
||
sections = []
|
||
for i, (top_line, nested_lines) in enumerate(groups):
|
||
title = _extract_bold_or_plain(top_line)
|
||
if i == 0 and transform_first:
|
||
nested_text = "\n".join(nested_lines)
|
||
transforms, _ = _extract_markdown_table(nested_text)
|
||
if transforms:
|
||
sections.append({"title": title, "transforms": transforms})
|
||
continue
|
||
non_empty = [l for l in nested_lines if l.strip()]
|
||
base = min((len(l) - len(l.lstrip()) for l in non_empty), default=0)
|
||
sections.append({
|
||
"title": title,
|
||
"text_lines": _text_lines_with_indent(nested_lines, base_indent=base),
|
||
})
|
||
return sections
|
||
|
||
|
||
def parse_column_with_transform(body: str) -> list[dict]:
|
||
"""첫 top-bullet 이 AS-IS/TO-BE 표 가능 (F29 process column)."""
|
||
return _parse_column_sections(body, transform_first=True)
|
||
|
||
|
||
def parse_column_plain(body: str) -> list[dict]:
|
||
"""모두 일반 text_lines section (F29 product column)."""
|
||
return _parse_column_sections(body, transform_first=False)
|
||
|
||
|
||
COLUMN_BODY_PARSERS: dict[str, Callable] = {
|
||
"column_with_transform": parse_column_with_transform,
|
||
"column_plain": parse_column_plain,
|
||
}
|
||
|
||
|
||
# ─── Payload builders (named registry — top-level dispatch) ───────
|
||
|
||
def _resolve_title(section, payload_spec: dict, contract: dict) -> dict:
|
||
"""payload.title.source 처리 — v0 = section.title 만 지원."""
|
||
title_spec = payload_spec.get("title", {}) or {}
|
||
src = title_spec.get("source")
|
||
if src is None:
|
||
return {}
|
||
if src == "section.title":
|
||
return {"title": section.title}
|
||
raise ValueError(
|
||
f"Contract '{contract['template_id']}' has unsupported title source "
|
||
f"'{src}'. v0 supports 'section.title' only."
|
||
)
|
||
|
||
|
||
def _build_items_with_role(section, units, contract) -> dict:
|
||
"""F13-style — top_bullets 각 → array item, role_order[i] 가 item.role_field 채움.
|
||
|
||
builder_options :
|
||
item_parser : ITEM_PARSERS key
|
||
array_root : payload[array_root] 에 list 부착
|
||
role_field : item dict 에 role 부착할 key (선택)
|
||
"""
|
||
options = contract["payload"]["builder_options"]
|
||
parser_name = options["item_parser"]
|
||
parser = ITEM_PARSERS.get(parser_name)
|
||
if parser is None:
|
||
raise ValueError(
|
||
f"Contract '{contract['template_id']}' references item_parser='{parser_name}' "
|
||
f"but ITEM_PARSERS has no such entry."
|
||
)
|
||
|
||
role_order = contract.get("role_order", []) or []
|
||
role_field = options.get("role_field")
|
||
|
||
items = []
|
||
for i, unit in enumerate(units):
|
||
item = parser(unit)
|
||
if role_field and i < len(role_order):
|
||
item[role_field] = role_order[i]
|
||
items.append(item)
|
||
|
||
payload: dict = {}
|
||
payload.update(_resolve_title(section, contract["payload"], contract))
|
||
payload[options["array_root"]] = items
|
||
return payload
|
||
|
||
|
||
def _build_process_product_pair(section, units, contract) -> dict:
|
||
"""F29-style — h3 subsections 2 개 = 2 명명 column.
|
||
|
||
builder_options :
|
||
pad_sections_to : N (sections list 길이 강제 — 미달 시 빈 section 으로 채움)
|
||
columns : list of
|
||
- title_to : subsection title → payload[title_to]
|
||
body_to : parsed sections → payload[body_to] = {"sections": [...]}
|
||
body_parser : COLUMN_BODY_PARSERS key
|
||
pad_empty : empty section template (선택, default = {"title": "", "text_lines": []})
|
||
"""
|
||
options = contract["payload"]["builder_options"]
|
||
pad_to = options.get("pad_sections_to")
|
||
cols = options["columns"]
|
||
|
||
if len(units) < len(cols):
|
||
raise FitError(
|
||
f"Contract '{contract['template_id']}' builder process_product_pair needs "
|
||
f"{len(cols)} subsection units, got {len(units)} in section "
|
||
f"'{getattr(section, 'section_id', '?')}'."
|
||
)
|
||
|
||
payload: dict = {}
|
||
payload.update(_resolve_title(section, contract["payload"], contract))
|
||
|
||
for i, col in enumerate(cols):
|
||
sub_title, sub_body = units[i]
|
||
parser_name = col["body_parser"]
|
||
parser = COLUMN_BODY_PARSERS.get(parser_name)
|
||
if parser is None:
|
||
raise ValueError(
|
||
f"Contract '{contract['template_id']}' references column body_parser="
|
||
f"'{parser_name}' but COLUMN_BODY_PARSERS has no such entry."
|
||
)
|
||
sections_list = parser(sub_body)
|
||
if pad_to is not None:
|
||
empty_template = col.get("pad_empty", {"title": "", "text_lines": []})
|
||
while len(sections_list) < pad_to:
|
||
sections_list.append(dict(empty_template))
|
||
sections_list = sections_list[:pad_to]
|
||
payload[col["title_to"]] = sub_title
|
||
payload[col["body_to"]] = {"sections": sections_list}
|
||
|
||
return payload
|
||
|
||
|
||
def _build_quadrant_flat_slots(section, units, contract) -> dict:
|
||
"""F16-style — top_bullets 각 → flat keyed slots (quadrant_N_label / quadrant_N_body).
|
||
|
||
F13/F29 와의 차이 = output shape 가 array 도 named columns 도 아닌 flat keyed.
|
||
role/position 은 index 1..N 으로 implicit (1=TL, 2=TR, 3=BL, 4=BR — partial template 결정).
|
||
|
||
builder_options :
|
||
item_parser : ITEM_PARSERS key (각 unit → {label, body} dict 산출)
|
||
pad_to : N (units 수 < N 이면 빈 slot 으로 채움)
|
||
truncate_at : M (units 수 > M 이면 M+1 부터 무시 + _truncated_count 기록)
|
||
label_key_pattern : "quadrant_{n}_label" (n = 1-based index)
|
||
body_key_pattern : "quadrant_{n}_body"
|
||
empty_label : pad slot 의 label 값 (default = "")
|
||
empty_body : pad slot 의 body 값 (default = [])
|
||
"""
|
||
options = contract["payload"]["builder_options"]
|
||
parser_name = options["item_parser"]
|
||
parser = ITEM_PARSERS.get(parser_name)
|
||
if parser is None:
|
||
raise ValueError(
|
||
f"Contract '{contract['template_id']}' references item_parser='{parser_name}' "
|
||
f"but ITEM_PARSERS has no such entry."
|
||
)
|
||
|
||
pad_to = options.get("pad_to", 4)
|
||
truncate_at = options.get("truncate_at", pad_to)
|
||
label_key = options.get("label_key_pattern", "quadrant_{n}_label")
|
||
body_key = options.get("body_key_pattern", "quadrant_{n}_body")
|
||
empty_label = options.get("empty_label", "")
|
||
empty_body = options.get("empty_body", [])
|
||
|
||
payload: dict = {}
|
||
payload.update(_resolve_title(section, contract["payload"], contract))
|
||
|
||
visible_units = list(units[:truncate_at])
|
||
parsed = [parser(u) for u in visible_units]
|
||
|
||
for i in range(pad_to):
|
||
n = i + 1
|
||
if i < len(parsed):
|
||
payload[label_key.format(n=n)] = parsed[i]["label"]
|
||
payload[body_key.format(n=n)] = parsed[i]["body"]
|
||
else:
|
||
payload[label_key.format(n=n)] = empty_label
|
||
# list / dict default 는 항상 새 객체 — shared reference 방지
|
||
payload[body_key.format(n=n)] = list(empty_body) if isinstance(empty_body, list) else empty_body
|
||
|
||
if len(units) > truncate_at:
|
||
payload["_truncated_count"] = len(units) - truncate_at
|
||
|
||
return payload
|
||
|
||
|
||
def _build_cycle_intersect_3(section, units, contract) -> dict:
|
||
"""F12-style — cycle-3way-intersection. top_bullets 3 items → flat keyed
|
||
circle_1_label / circle_2_label / circle_3_label. *body 무시* (label only —
|
||
이 frame 의 3 메인 원 visual 은 label 만 사용). intersection 텍스트는 별
|
||
optional (default 빈 문자).
|
||
|
||
F16 quadrant_flat_slots 와 비교 :
|
||
- F16 : N=4 + body 사용 (quadrant_N_label + quadrant_N_body)
|
||
- F12 : N=3 + body 미사용 (circle_N_label 만) + intersection text 별
|
||
|
||
builder_options :
|
||
item_parser : ITEM_PARSERS key (label 만 사용, body 무시)
|
||
pad_to : N (default=3) — units < N 이면 empty label 로 채움
|
||
truncate_at : M (default=3) — units > M 이면 무시 + _truncated_count
|
||
label_key_pattern : "circle_{n}_label" (n = 1-based)
|
||
empty_label : pad slot 의 label 값 (default = "")
|
||
intersection_default : intersection 텍스트 (slot optional — default 빈 문자)
|
||
"""
|
||
options = contract["payload"]["builder_options"]
|
||
parser_name = options["item_parser"]
|
||
parser = ITEM_PARSERS.get(parser_name)
|
||
if parser is None:
|
||
raise ValueError(
|
||
f"Contract '{contract['template_id']}' references item_parser='{parser_name}' "
|
||
f"but ITEM_PARSERS has no such entry."
|
||
)
|
||
|
||
pad_to = options.get("pad_to", 3)
|
||
truncate_at = options.get("truncate_at", pad_to)
|
||
label_key = options.get("label_key_pattern", "circle_{n}_label")
|
||
empty_label = options.get("empty_label", "")
|
||
intersection = options.get("intersection_default", "")
|
||
|
||
payload: dict = {}
|
||
payload.update(_resolve_title(section, contract["payload"], contract))
|
||
|
||
visible_units = list(units[:truncate_at])
|
||
parsed = [parser(u) for u in visible_units]
|
||
|
||
for i in range(pad_to):
|
||
n = i + 1
|
||
if i < len(parsed):
|
||
payload[label_key.format(n=n)] = parsed[i]["label"]
|
||
else:
|
||
payload[label_key.format(n=n)] = empty_label
|
||
|
||
payload["intersection"] = intersection
|
||
|
||
if len(units) > truncate_at:
|
||
payload["_truncated_count"] = len(units) - truncate_at
|
||
|
||
return payload
|
||
|
||
|
||
def _build_compare_table_2col(section, units, contract) -> dict:
|
||
"""F18-style — compare table with 2 columns + N category rows.
|
||
|
||
payload :
|
||
title : section.title
|
||
col_a_label : 좌 column header (예: "BIM")
|
||
col_b_label : 우 column header (예: "DX")
|
||
rows : list[{label, col_a, col_b}] — top_bullets 각각 → row
|
||
|
||
builder_options :
|
||
item_parser : ITEM_PARSERS key (예: `compare_row_2col_item`)
|
||
col_a_label_default : col_a header literal in catalog.
|
||
Semantics depend on col_a_label_default_role.
|
||
col_a_label_default_role : "placeholder" | "fallback" (IMP-40 #69).
|
||
placeholder = Figma visual placeholder; suppressed
|
||
at runtime → col_a_label emitted as "".
|
||
fallback = MDX 미명시 시 catalog literal 사용.
|
||
absent = legacy contracts default to fallback.
|
||
col_b_label_default : col_b header literal (same policy as col_a).
|
||
col_b_label_default_role : same role discriminator for col_b (IMP-40 #69).
|
||
strip_col_prefix_aliases : list[str] — col_a/col_b 값의 prefix `<alias>:`
|
||
를 strip (Codex round 43 §F1-b — narrow alias).
|
||
예 : ["BIM", "DX"]. default [] (no stripping).
|
||
max_rows : N (default 999 — practical 한계).
|
||
|
||
NOTE: MDX 측 col_a_label / col_b_label inflow 경로 없음
|
||
(compare_row_2col_item parser → {label,col_a,col_b}, _resolve_title → title only).
|
||
placeholder role 은 col_*_label 을 빈 문자열로 확정 — 정책 결정점은 catalog 한 곳뿐.
|
||
"""
|
||
options = contract["payload"]["builder_options"]
|
||
parser_name = options["item_parser"]
|
||
parser = ITEM_PARSERS.get(parser_name)
|
||
if parser is None:
|
||
raise ValueError(
|
||
f"Contract '{contract['template_id']}' references item_parser='{parser_name}' "
|
||
f"but ITEM_PARSERS has no such entry."
|
||
)
|
||
|
||
def _resolve_label_default(col_key: str) -> str:
|
||
default_key = f"{col_key}_label_default"
|
||
role_key = f"{col_key}_label_default_role"
|
||
role = options.get(role_key, "fallback")
|
||
if role == "placeholder":
|
||
return ""
|
||
if role == "fallback":
|
||
return options.get(default_key, "")
|
||
raise ValueError(
|
||
f"Contract '{contract['template_id']}' builder_options.{role_key}='{role}' "
|
||
f"is invalid; expected 'placeholder' or 'fallback' (IMP-40 #69)."
|
||
)
|
||
|
||
col_a_label = _resolve_label_default("col_a")
|
||
col_b_label = _resolve_label_default("col_b")
|
||
strip_aliases = options.get("strip_col_prefix_aliases", []) or []
|
||
max_rows = options.get("max_rows", 999)
|
||
|
||
payload: dict = {}
|
||
payload.update(_resolve_title(section, contract["payload"], contract))
|
||
payload["col_a_label"] = col_a_label
|
||
payload["col_b_label"] = col_b_label
|
||
|
||
# Compile precise prefix patterns per alias (Codex round 43 §F1-b narrow).
|
||
strip_patterns = [
|
||
re.compile(rf"^{re.escape(a)}\s*[::]\s*(.+)$")
|
||
for a in strip_aliases
|
||
]
|
||
|
||
def _strip_alias(value: str) -> str:
|
||
for pat in strip_patterns:
|
||
m = pat.match(value)
|
||
if m:
|
||
return m.group(1).strip()
|
||
return value
|
||
|
||
visible = list(units[:max_rows])
|
||
rows = []
|
||
for u in visible:
|
||
row = parser(u)
|
||
if strip_patterns:
|
||
row["col_a"] = _strip_alias(row.get("col_a", ""))
|
||
row["col_b"] = _strip_alias(row.get("col_b", ""))
|
||
rows.append(row)
|
||
payload["rows"] = rows
|
||
|
||
if len(units) > max_rows:
|
||
payload["_truncated_count"] = len(units) - max_rows
|
||
|
||
return payload
|
||
|
||
|
||
def _build_paired_rows_4x2_slots(section, units, contract) -> dict:
|
||
"""F17-style — paired_rows_4x2_alternating_pills. top_bullets 8 units → 2-axis keyed slots.
|
||
|
||
1-axis (quadrant_flat_slots = TL/TR/BL/BR) vs 2-axis (row × side) :
|
||
- quadrant : index 1..4 → quadrant_N_{label,body}
|
||
- paired_rows_4x2 : index 1..8 → row_R_SIDE_{label,body} where R = ceil(i/2), SIDE = left|right
|
||
|
||
deterministic index mapping per Codex round 60 §Q3 answer + round 70 §1 :
|
||
unit 1 → row_1_left unit 2 → row_1_right
|
||
unit 3 → row_2_left unit 4 → row_2_right
|
||
unit 5 → row_3_left unit 6 → row_3_right
|
||
unit 7 → row_4_left unit 8 → row_4_right
|
||
|
||
strict 8 : under/over → FitError before render (Codex round 60 §3, round 62 acceptance
|
||
criterion "no pad_to/truncate_at fallback hides cardinality mismatch").
|
||
parser = quadrant_item (label + body heading-less) — F17 atomic issue = single label + single body.
|
||
|
||
builder_options :
|
||
item_parser : ITEM_PARSERS key (default = "quadrant_item")
|
||
label_key_pattern : "row_{r}_{side}_label"
|
||
body_key_pattern : "row_{r}_{side}_body"
|
||
rows : 4
|
||
sides : ["left", "right"]
|
||
"""
|
||
options = contract["payload"]["builder_options"]
|
||
parser_name = options["item_parser"]
|
||
parser = ITEM_PARSERS.get(parser_name)
|
||
if parser is None:
|
||
raise ValueError(
|
||
f"Contract '{contract['template_id']}' references item_parser='{parser_name}' "
|
||
f"but ITEM_PARSERS has no such entry."
|
||
)
|
||
|
||
label_key = options.get("label_key_pattern", "row_{r}_{side}_label")
|
||
body_key = options.get("body_key_pattern", "row_{r}_{side}_body")
|
||
rows = options.get("rows", 4)
|
||
sides = options.get("sides", ["left", "right"])
|
||
expected = rows * len(sides)
|
||
|
||
if len(units) != expected:
|
||
raise ValueError(
|
||
f"Contract '{contract['template_id']}' requires strict {expected} units "
|
||
f"(rows={rows} × sides={len(sides)}), got {len(units)}. "
|
||
f"silent pad/truncate is disabled for paired_rows_4x2_slots."
|
||
)
|
||
|
||
payload: dict = {}
|
||
payload.update(_resolve_title(section, contract["payload"], contract))
|
||
|
||
parsed = [parser(u) for u in units]
|
||
idx = 0
|
||
for r in range(1, rows + 1):
|
||
for side in sides:
|
||
payload[label_key.format(r=r, side=side)] = parsed[idx]["label"]
|
||
payload[body_key.format(r=r, side=side)] = parsed[idx]["body"]
|
||
idx += 1
|
||
|
||
return payload
|
||
|
||
|
||
PAYLOAD_BUILDERS: dict[str, Callable] = {
|
||
"items_with_role": _build_items_with_role,
|
||
"process_product_pair": _build_process_product_pair,
|
||
"quadrant_flat_slots": _build_quadrant_flat_slots,
|
||
"cycle_intersect_3": _build_cycle_intersect_3,
|
||
"compare_table_2col": _build_compare_table_2col,
|
||
"paired_rows_4x2_slots": _build_paired_rows_4x2_slots,
|
||
}
|
||
|
||
|
||
# ─── Catalog builder invariant (IMP-#85 u2) ──────────────────────
|
||
|
||
def _check_catalog_builder_invariant(catalog: dict) -> None:
|
||
"""Every non-`visual_pending` contract must declare a registered builder.
|
||
|
||
`visual_pending: true` contracts are scaffolding records whose builders
|
||
are tracked as VP backlog (별 axis IMP-04b / #42) — skipped here so the
|
||
catalog can keep declaring them without breaking boot.
|
||
|
||
Violations are aggregated and raised together so first-fix iteration sees
|
||
the full drift surface, not just the first row.
|
||
|
||
Raises:
|
||
CatalogInvariantError — when one or more live (non-VP) contracts
|
||
either omit `payload.builder` or reference a name absent from
|
||
`PAYLOAD_BUILDERS`.
|
||
"""
|
||
violations: list[str] = []
|
||
for template_id, contract in catalog.items():
|
||
if not isinstance(contract, dict):
|
||
continue
|
||
if contract.get("visual_pending") is True:
|
||
continue
|
||
payload = contract.get("payload") or {}
|
||
builder_name = payload.get("builder") if isinstance(payload, dict) else None
|
||
if not builder_name:
|
||
violations.append(
|
||
f"Contract '{template_id}' (non-VP) missing payload.builder."
|
||
)
|
||
continue
|
||
if builder_name not in PAYLOAD_BUILDERS:
|
||
violations.append(
|
||
f"Contract '{template_id}' (non-VP) references payload.builder="
|
||
f"'{builder_name}' not in PAYLOAD_BUILDERS registry."
|
||
)
|
||
if violations:
|
||
raise CatalogInvariantError(
|
||
f"Catalog builder invariant violated "
|
||
f"({len(violations)} non-VP contract(s)):\n - "
|
||
+ "\n - ".join(violations)
|
||
+ f"\nRegistered builders: {sorted(PAYLOAD_BUILDERS.keys())}"
|
||
)
|
||
|
||
|
||
# ─── Generic mapper (single dispatch via builder) ────────────────
|
||
|
||
def _check_cardinality(contract: dict, units: list, section) -> None:
|
||
card = contract.get("cardinality", {}) or {}
|
||
n = len(units)
|
||
strict = card.get("strict")
|
||
if strict is not None and n != strict:
|
||
raise FitError(
|
||
f"Contract '{contract['template_id']}' expects strict {strict} units "
|
||
f"(source_shape={contract['source_shape']}), got {n} "
|
||
f"in section '{getattr(section, 'section_id', '?')}'. "
|
||
f"overflow_policy={card.get('overflow_policy', 'abort_or_review')}."
|
||
)
|
||
mn = card.get("min")
|
||
if mn is not None and n < mn:
|
||
raise FitError(
|
||
f"Contract '{contract['template_id']}' expects min {mn} units, got {n} "
|
||
f"in section '{getattr(section, 'section_id', '?')}'."
|
||
)
|
||
mx = card.get("max")
|
||
if mx is not None and n > mx:
|
||
raise FitError(
|
||
f"Contract '{contract['template_id']}' expects max {mx} units, got {n} "
|
||
f"in section '{getattr(section, 'section_id', '?')}'."
|
||
)
|
||
|
||
|
||
def compute_capacity_fit(template_id: str, content: str) -> dict:
|
||
"""Content 의 item_count vs template contract capacity 비교 (planner 단계 사전 검사).
|
||
|
||
목적 : 자동 파이프라인이 "이 frame 에 이 content 넣으면 잘린다 / 안 맞는다" 를
|
||
render 전에 미리 알도록. silent truncate / FitError 차단의 입력 신호.
|
||
|
||
Returns:
|
||
dict with :
|
||
item_count : source_shape 으로 split 한 unit 수
|
||
source_shape : contract 의 source_shape ('top_bullets' / 'h3_subsections' / ...)
|
||
capacity : {strict, min, max, truncate_at, pad_to} (없는 키는 None)
|
||
fit_status : 'ok' / 'strict_mismatch' / 'exceeds_max' / 'below_min' /
|
||
'exceeds_truncate' / 'no_contract' / 'unknown_source_shape'
|
||
mismatch_reason : str | None — fit_status != 'ok' 일 때 이유
|
||
|
||
fit 룰 (자동 파이프라인이 silent loss 방지하기 위한 보수적 규칙):
|
||
1. strict cardinality 가 있으면 정확히 일치해야 함
|
||
2. max 가 있으면 그 이하
|
||
3. min 이 있으면 그 이상
|
||
4. truncate_at 이 있으면 그 이하 (초과 시 builder 가 자르므로 = 콘텐츠 손실)
|
||
5. pad_to 만 있고 item_count 가 부족 → mismatch 아님 (빈 slot 으로 채워질 뿐, 손실 X)
|
||
"""
|
||
contract = get_contract(template_id)
|
||
if contract is None:
|
||
return {
|
||
"item_count": None,
|
||
"source_shape": None,
|
||
"capacity": {"strict": None, "min": None, "max": None,
|
||
"truncate_at": None, "pad_to": None},
|
||
"fit_status": "no_contract",
|
||
"mismatch_reason": (
|
||
f"no contract for template_id='{template_id}' — capacity check skipped. "
|
||
f"이 candidate 는 catalog-only dispatch 의 ValueError 가 mapper 단계에서 발생할 것."
|
||
),
|
||
}
|
||
|
||
source_shape = contract.get("source_shape")
|
||
try:
|
||
units = split_source(source_shape, content)
|
||
except ValueError:
|
||
return {
|
||
"item_count": None,
|
||
"source_shape": source_shape,
|
||
"capacity": {"strict": None, "min": None, "max": None,
|
||
"truncate_at": None, "pad_to": None},
|
||
"fit_status": "unknown_source_shape",
|
||
"mismatch_reason": f"source_shape='{source_shape}' is not supported by split_source().",
|
||
}
|
||
item_count = len(units)
|
||
|
||
cardinality = contract.get("cardinality") or {}
|
||
strict = cardinality.get("strict")
|
||
mn = cardinality.get("min")
|
||
mx = cardinality.get("max")
|
||
|
||
builder_options = (contract.get("payload") or {}).get("builder_options") or {}
|
||
truncate_at = builder_options.get("truncate_at")
|
||
pad_to = builder_options.get("pad_to")
|
||
|
||
capacity = {
|
||
"strict": strict,
|
||
"min": mn,
|
||
"max": mx,
|
||
"truncate_at": truncate_at,
|
||
"pad_to": pad_to,
|
||
}
|
||
|
||
if strict is not None and item_count != strict:
|
||
return {
|
||
"item_count": item_count,
|
||
"source_shape": source_shape,
|
||
"capacity": capacity,
|
||
"fit_status": "strict_mismatch",
|
||
"mismatch_reason": (
|
||
f"strict cardinality {strict}, content has {item_count} items. "
|
||
f"mapper 가 FitError 를 raise 할 것."
|
||
),
|
||
}
|
||
if mx is not None and item_count > mx:
|
||
return {
|
||
"item_count": item_count,
|
||
"source_shape": source_shape,
|
||
"capacity": capacity,
|
||
"fit_status": "exceeds_max",
|
||
"mismatch_reason": f"max cardinality {mx}, content has {item_count} items.",
|
||
}
|
||
if mn is not None and item_count < mn:
|
||
return {
|
||
"item_count": item_count,
|
||
"source_shape": source_shape,
|
||
"capacity": capacity,
|
||
"fit_status": "below_min",
|
||
"mismatch_reason": f"min cardinality {mn}, content has {item_count} items.",
|
||
}
|
||
if truncate_at is not None and item_count > truncate_at:
|
||
return {
|
||
"item_count": item_count,
|
||
"source_shape": source_shape,
|
||
"capacity": capacity,
|
||
"fit_status": "exceeds_truncate",
|
||
"mismatch_reason": (
|
||
f"builder truncate_at {truncate_at}, content has {item_count} items "
|
||
f"({item_count - truncate_at} would be silently dropped). "
|
||
f"silent truncate 방지 위해 자동 선택 X."
|
||
),
|
||
}
|
||
|
||
return {
|
||
"item_count": item_count,
|
||
"source_shape": source_shape,
|
||
"capacity": capacity,
|
||
"fit_status": "ok",
|
||
"mismatch_reason": None,
|
||
}
|
||
|
||
|
||
def map_with_contract(section, contract: dict) -> dict:
|
||
"""MdxSection + contract → slot_payload via named PAYLOAD_BUILDERS dispatch.
|
||
|
||
Steps :
|
||
1. source_shape 따라 raw_content split → units
|
||
2. cardinality check (위반 → FitError)
|
||
3. payload.builder 의 named entry 조회 → builder(section, units, contract)
|
||
"""
|
||
units = split_source(contract["source_shape"], section.raw_content)
|
||
_check_cardinality(contract, units, section)
|
||
|
||
payload_spec = contract["payload"]
|
||
builder_name = payload_spec.get("builder")
|
||
if not builder_name:
|
||
raise BuilderMissingError(
|
||
f"Contract '{contract['template_id']}' missing payload.builder. "
|
||
f"available: {sorted(PAYLOAD_BUILDERS.keys())}"
|
||
)
|
||
builder = PAYLOAD_BUILDERS.get(builder_name)
|
||
if builder is None:
|
||
raise BuilderMissingError(
|
||
f"Contract '{contract['template_id']}' references payload.builder="
|
||
f"'{builder_name}' but PAYLOAD_BUILDERS has no such entry. "
|
||
f"available: {sorted(PAYLOAD_BUILDERS.keys())}"
|
||
)
|
||
return builder(section, units, contract)
|