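"""Phase T: 11-stage pipeline.
Stage 0: MDX normalization (code)
Stage 1A: Kei topic extraction (AI)
Stage 1B: concept refinement (AI)
Stage 1.5a: font hierarchy + container ratio back-calculation (code)
Stage 1.7: reference block selection (code)
Stage 1.8: content-to-container fit verification, redistribution and enhancement (code + Kei)
Stage 1.5b: design budget recalculation (code)
Stage 2: HTML generation (AI — Claude Sonnet)
Stage 3: render assembly (code)
Stage 4: quality gate (Selenium + Opus Vision)
Stage 5: serving (code)
The previous Phase S pipeline is preserved as generate_slide_legacy().
"""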
from __future__ import annotations
import asyncio
import json
import logging
import re
import time
from pathlib import Path
from typing import Any, AsyncIterator
import anthropic
from src.pipeline_context import (
PipelineContext, create_context, StageFailure, build_retry_feedback,
Topic, NormalizedContent, Analysis, PageStructure,
ContainerInfo, TextBudget, DesignBudget, FontHierarchy, BlockReference,
)
from src.kei_client import classify_content, refine_concepts, generate_structured_text
from src.design_director import LAYOUT_PRESETS, select_preset
from src.image_utils import get_image_sizes, embed_images
from src.space_allocator import calculate_container_specs
from src.slide_measurer import measure_rendered_heights, capture_slide_screenshot
from src.config import settings
logger = logging.getLogger(__name__)
# Kei API retry settings (P0 fix: prevents an infinite retry loop).
# KEI_RETRY_INTERVAL is passed to asyncio.sleep() between attempts in _retry_kei.
KEI_RETRY_INTERVAL = 10
KEI_MAX_RETRY_ATTEMPTS = 30 # at most 30 attempts (5 minutes)
KEI_MAX_RETRY_DURATION = 300 # hard limit of 300 seconds
# ══════════════════════════════════════
# T-0.B: run_stage() 공통 실행 패턴
# ══════════════════════════════════════
async def run_stage(
    stage_fn,
    context: PipelineContext,
    stage_name: str,
    max_retries: int = 0,
) -> PipelineContext:
    """Run one stage through the shared transform → validate → update → snapshot pattern.

    Args:
        stage_fn: ``async def stage(context) -> dict``. On success the dict
            holds the context fields to update; on failure it carries an
            ``"_errors"`` list of error dicts (``severity``, ``localization``).
        context: The accumulated pipeline context.
        stage_name: Used as the snapshot name and in log lines.
        max_retries: Extra attempts granted for RETRYABLE errors. Code-only
            stages pass 0. Negative values are treated as 0.

    Returns:
        The updated context, either after a successful merge or after an
        ADJUSTABLE auto-adjustment.

    Raises:
        StageFailure: On a FATAL error, or once RETRYABLE attempts run out.
    """
    # A negative budget would make range() empty and the function would fall
    # through returning None; clamp so the stage always runs at least once.
    retry_budget = max(0, max_retries)

    for attempt in range(retry_budget + 1):
        result = await stage_fn(context)
        errors = result.pop("_errors", [])

        if not errors:
            # Success: merge the stage output into the context and snapshot it.
            context = context.model_copy(update=result)
            context.save_snapshot(stage_name)
            logger.info(f"[{stage_name}] 완료")
            return context

        # A FATAL error anywhere in the list must win; previously only the
        # first entry was inspected, so a FATAL listed after a RETRYABLE or
        # ADJUSTABLE one was retried or auto-adjusted away.
        if any(err.get("severity") == "FATAL" for err in errors):
            severity = "FATAL"
        else:
            severity = errors[0].get("severity", "RETRYABLE")

        if severity == "FATAL":
            context.log_error(stage_name, errors, attempt, "FATAL")
            raise StageFailure(stage_name, errors)

        if severity == "ADJUSTABLE":
            # Code-side auto adjustment: apply the values under "_adjustments"
            # when the stage supplied any, then continue with a warning.
            adjustments = result.pop("_adjustments", {})
            if adjustments:
                context = context.model_copy(update=adjustments)
            context.warnings.append(f"[{stage_name}] 자동 조정: {errors}")
            context.save_snapshot(stage_name)
            logger.warning(f"[{stage_name}] ADJUSTABLE 자동 조정 적용")
            return context

        # RETRYABLE
        context.log_error(stage_name, errors, attempt, "RETRYABLE")
        if attempt < retry_budget:
            # Self-Refine: hand the failure details to the next attempt.
            context.retry_feedback = build_retry_feedback(
                stage_name, errors, context.raw_content[:500]
            )
            logger.warning(
                f"[{stage_name}] 재시도 {attempt + 1}/{retry_budget}: "
                f"{[e.get('localization', '') for e in errors]}"
            )
        else:
            logger.error(f"[{stage_name}] 재시도 소진 ({retry_budget}회)")
            raise StageFailure(stage_name, errors)
# ══════════════════════════════════════
# T-0.C: Phase T 파이프라인 (11 Stage)
# ══════════════════════════════════════
async def generate_slide(
content: str,
manual_layout: dict[str, Any] | None = None,
base_path: str = "",
) -> AsyncIterator[dict[str, str]]:
"""Phase T 파이프라인: MDX → 슬라이드 HTML.
11 Stage를 순차 실행하며 SSE progress 이벤트를 yield.
각 Stage는 run_stage() 패턴을 따름.
"""
ctx = create_context(content, base_path)
try:
# ── Stage 0: MDX 표준화 ──
yield {"event": "progress", "data": "0/7 MDX 표준화 중..."}
async def stage_0(context: PipelineContext) -> dict:
    """Normalize the raw MDX and wrap the result in a NormalizedContent.

    Returns ``{"_errors": [...]}`` when validate_stage0 reports problems,
    otherwise ``{"normalized": NormalizedContent(...)}``.
    """
    from src.mdx_normalizer import normalize_mdx_content, validate_stage0

    normalized_raw = normalize_mdx_content(context.raw_content)
    problems = validate_stage0(normalized_raw, context.raw_content)
    if problems:
        return {"_errors": problems}

    # Each listed key of the normalizer output maps onto the model field of
    # the same name.
    field_names = ("clean_text", "title", "images", "popups", "tables", "sections")
    payload = {name: normalized_raw[name] for name in field_names}
    return {"normalized": NormalizedContent(**payload)}
ctx = await run_stage(stage_0, ctx, "stage_0", max_retries=0)
# ── Stage 1A: Kei 꼭지 추출 ──
yield {"event": "progress", "data": "1/7 Kei 실장이 꼭지를 추출 중..."}
async def stage_1a(context: PipelineContext) -> dict:
    """Extract topics, page structure and the analysis summary.

    Uses the enclosing ``manual_layout`` when one was supplied; otherwise
    sends the normalized text to Kei through ``_retry_kei``. Returns
    ``{"_errors": [...]}`` when validate_stage_1a reports problems.
    """
    if manual_layout:
        raw = manual_layout
    else:
        # Kei receives the Stage 0 normalized text (JSX/frontmatter removed),
        # falling back to the raw content when that text is empty.
        source_text = context.normalized.clean_text or context.raw_content
        raw = await _retry_kei(classify_content, source_text)

    # Forward only the keys Topic declares; anything absent falls back to the
    # Pydantic defaults.
    topics = []
    for entry in raw.get("topics", []):
        accepted = {
            key: value for key, value in entry.items() if key in Topic.model_fields
        }
        topics.append(Topic(**accepted))

    page_structure = PageStructure(roles=raw.get("page_structure", {}))
    analysis = Analysis(
        core_message=raw.get("core_message", ""),
        title=raw.get("title", ""),
        total_pages=raw.get("total_pages", 1),
    )

    # I-6: the slide title and the first topic title must not duplicate each other.
    if topics:
        from difflib import SequenceMatcher

        lead = topics[0]
        similarity = SequenceMatcher(None, analysis.title, lead.title).ratio()
        if similarity > 0.7:
            lead.title = f"{lead.purpose}: {lead.summary[:30]}"
            logger.warning(f"[제목 중복 교정] 유사도 {similarity:.0%}")

    # T-2: Stage 1A validation (Pydantic + comparison against the source text).
    from src.validators import validate_stage_1a

    problems = validate_stage_1a(raw, context.normalized.clean_text)
    if problems:
        return {"_errors": problems}

    return {
        "analysis": analysis,
        "topics": topics,
        "page_structure": page_structure,
    }
ctx = await run_stage(stage_1a, ctx, "stage_1a", max_retries=2)
# ── Stage 1B: 컨셉 구체화 ──
yield {"event": "progress", "data": "1.5/7 컨셉 구체화 중..."}
async def stage_1b(context: PipelineContext) -> dict:
    """Refine topic concepts through Kei and merge the result into the topics.

    Skips the Kei call when any topic already carries ``source_data``.
    Returns ``{"_errors": [...]}`` when refine_concepts yields None or
    validate_stage_1b reports problems.
    """
    # source_data already present (e.g. from manual_layout): nothing to refine.
    if any(topic.source_data for topic in context.topics):
        logger.info("[1단계-B] manual_layout에서 source_data 제공됨 — Kei 호출 스킵")
        return {"topics": list(context.topics)}

    # Kei gets the source text together with the Stage 1A result.
    analysis_dict = {
        "topics": [topic.model_dump() for topic in context.topics],
        "page_structure": context.page_structure.roles,
        "core_message": context.analysis.core_message,
        "title": context.analysis.title,
    }
    source_text = context.normalized.clean_text or context.raw_content
    refined = await refine_concepts(source_text, analysis_dict)
    if refined is None:
        return {"_errors": [{"severity": "RETRYABLE", "localization": "refine_concepts 반환 None"}]}

    # Merge the 1B output into the existing topics, matched by id
    # (the first refined entry with a matching id wins).
    refined_topics = refined.get("topics", [])
    merged_topics = []
    for topic in context.topics:
        match = None
        for candidate in refined_topics:
            if candidate.get("id") == topic.id:
                match = candidate
                break
        if not match:
            merged_topics.append(topic)
            continue
        overrides = {
            "relation_type": match.get("relation_type", topic.relation_type),
            "expression_hint": match.get("expression_hint", topic.expression_hint),
            "source_data": match.get("source_data", topic.source_data),
        }
        merged_topics.append(topic.model_copy(update=overrides))

    # T-2: Stage 1B validation (contradiction detection + comparison against the source).
    from src.validators import validate_stage_1b

    problems = validate_stage_1b(
        [topic.model_dump() for topic in merged_topics],
        context.normalized.clean_text,
        raw_content=context.raw_content,
    )
    if problems:
        return {"_errors": problems}
    return {"topics": merged_topics}
ctx = await run_stage(stage_1b, ctx, "stage_1b", max_retries=2)
# ── Stage 1B supplement: generate structured_text (separate Kei call) ──
# NOTE(review): this step runs inline rather than through run_stage(), so it
# gets none of that helper's "_errors" handling or retry logic.
has_structured = any(t.structured_text for t in ctx.topics)
if has_structured:
logger.info("[1단계-B-ST] structured_text 이미 있음 — Kei 호출 스킵")
analysis_for_st = {"topics": [t.model_dump() for t in ctx.topics]}
else:
input_text = ctx.normalized.clean_text or ctx.raw_content
analysis_for_st = {
"topics": [t.model_dump() for t in ctx.topics],
}
# NOTE(review): the result is subscripted below without a None check —
# confirm generate_structured_text always returns a dict with "topics".
analysis_for_st = await generate_structured_text(input_text, analysis_for_st)
# Merge structured_text into the topics, keyed by topic id.
st_map = {t["id"]: t.get("structured_text", "") for t in analysis_for_st["topics"]}
updated_topics = []
for t in ctx.topics:
st = st_map.get(t.id, "")
if st:
updated_topics.append(t.model_copy(update={"structured_text": st}))
else:
updated_topics.append(t)
ctx = ctx.model_copy(update={"topics": updated_topics})
ctx.save_snapshot("stage_1b") # save again, now including structured_text
logger.info(f"[1단계-B-ST] structured_text 병합 완료")
# ── Stage 1.5a: 컨테이너 스펙 계산 ──
yield {"event": "progress", "data": "2/7 컨테이너 계산 중..."}
async def stage_1_5a(context: PipelineContext) -> dict:
    """Derive font hierarchy, layout preset, container specs and image payloads.

    Returns the context fields ``preset_name``, ``preset``, ``containers``,
    ``slide_images``, ``analysis`` (with ``image_sizes`` attached),
    ``font_hierarchy`` and ``container_ratio``.
    """
    from src.space_allocator import calculate_font_hierarchy, calculate_dynamic_ratio

    # Measure image dimensions.
    image_sizes = get_image_sizes(context.raw_content, context.base_path)

    # Amount of text per role; non-dict role entries are ignored.
    role_text_lengths = {
        role: len(context.get_role_content(role))
        for role, info in context.page_structure.roles.items()
        if isinstance(info, dict)
    }

    # T-5: fix the font hierarchy from the text amounts (role-aware).
    font_hierarchy_dict = calculate_font_hierarchy(role_text_lengths)
    font_fields = (
        ("key_msg", "핵심", 14.0),
        ("core", "본심", 12.0),
        ("bg", "배경", 11.0),
        ("sidebar", "첨부", 10.0),
    )
    font_hierarchy = FontHierarchy(**{
        field: font_hierarchy_dict.get(label, fallback)
        for field, label, fallback in font_fields
    })

    # Pick the preset before the ratio so its default ratio can act as fallback.
    preset_name = select_preset({
        "topics": [topic.model_dump() for topic in context.topics],
        "page_structure": context.page_structure.roles,
    })
    preset = LAYOUT_PRESETS.get(preset_name, {})

    # T-5: back-calculate the dynamic ratio (sidebar text amount + preset default).
    container_ratio = calculate_dynamic_ratio(
        role_text_lengths,
        font_hierarchy_dict,
        slide_width=settings.slide_width,
        slide_height=settings.slide_height,
        preset=preset,
    )
    logger.info(
        f"[T-5] 폰트 위계: 핵심={font_hierarchy.key_msg}, 본심={font_hierarchy.core}, "
        f"배경={font_hierarchy.bg}, 첨부={font_hierarchy.sidebar} / "
        f"비율: body:sidebar={container_ratio[0]}:{container_ratio[1]}"
    )

    # Container specs via the existing space_allocator.
    container_specs = calculate_container_specs(
        page_structure=context.page_structure.roles,
        topics=[topic.model_dump() for topic in context.topics],
        preset=preset,
        slide_width=settings.slide_width,
        slide_height=settings.slide_height,
    )

    # ContainerSpec → ContainerInfo conversion.
    containers = {
        role: ContainerInfo(
            role=spec.role,
            zone=spec.zone,
            topic_ids=spec.topic_ids,
            weight=spec.weight,
            height_px=spec.height_px,
            width_px=spec.width_px,
            max_height_cost=spec.max_height_cost,
            block_constraints=spec.block_constraints,
        )
        for role, spec in container_specs.items()
    }

    # Image payloads, base64-encoded when the file exists on disk.
    slide_images = []
    if image_sizes:
        import base64 as b64_mod

        # A list[dict] is walked directly; a dict is flattened with its key attached.
        if isinstance(image_sizes, list):
            entries = image_sizes
        else:
            entries = [{**value, "key": key} for key, value in image_sizes.items()]

        for entry in entries:
            ref = entry.get("path", entry.get("key", ""))
            if context.base_path:
                img_path = Path(context.base_path) / Path(ref).name
            else:
                img_path = Path(ref)

            encoded = ""
            if img_path.exists():
                encoded = b64_mod.b64encode(img_path.read_bytes()).decode()

            slide_images.append({
                "path": str(img_path),
                "width": entry.get("width", 0),
                "height": entry.get("height", 0),
                "ratio": round(entry.get("width", 1) / max(1, entry.get("height", 1)), 2),
                "topic_id": entry.get("topic_id"),
                "b64": encoded,
            })

    updated_analysis = context.analysis.model_copy(
        update={"image_sizes": image_sizes or {}}
    )
    return {
        "preset_name": preset_name,
        "preset": preset,
        "containers": containers,
        "slide_images": slide_images,
        "analysis": updated_analysis,
        "font_hierarchy": font_hierarchy,
        "container_ratio": container_ratio,
    }
ctx = await run_stage(stage_1_5a, ctx, "stage_1_5a", max_retries=0)
# ── Stage 1.7: 참고 블록 선택 + 디자인 레퍼런스 ──
yield {"event": "progress", "data": "2.5/7 참고 블록 선택 중..."}
async def stage_1_7(context: PipelineContext) -> dict:
    """Select reference blocks per role and convert them to BlockReference models.

    Returns ``{"references": {role: [BlockReference, ...]}}``.
    """
    from src.block_reference import select_and_generate_references

    raw_refs = select_and_generate_references(
        topics=[topic.model_dump() for topic in context.topics],
        containers=context.containers,
        page_structure=context.page_structure.roles,
    )

    # V-1: dict → list[BlockReference] conversion (one list per role).
    references = {}
    for role, entries in raw_refs.items():
        models = []
        for entry in entries:
            models.append(
                BlockReference(
                    block_id=entry["block_id"],
                    variant=entry["variant"],
                    visual_type=entry["visual_type"],
                    schema_info=entry["schema_info"],
                    design_reference_html=entry["design_reference_html"],
                    topic_id=entry.get("topic_id"),
                    supporting_topic_ids=entry.get("supporting_topic_ids", []),
                    is_hierarchical=entry.get("is_hierarchical", False),
                )
            )
        references[role] = models
    return {"references": references}
ctx = await run_stage(stage_1_7, ctx, "stage_1_7", max_retries=0)
# ── Stage 1.8: 콘텐츠-컨테이너 적합성 검증 + 재배분 + 보강 ──
yield {"event": "progress", "data": "2.8/7 적합성 검증 중..."}
async def stage_1_8(context: PipelineContext) -> dict:
"""Verify that content fits its containers, then redistribute and enhance.

Steps, in order: save a "before" visualization; compute an initial fit and
sub layouts; assemble filled HTML and measure it with
measure_rendered_heights (run in a thread); grow sidebar containers that
overflowed; recompute the fit and redistribute; escalate to Kei when the fit
analysis asks for it; analyze, review (Kei) and apply enhancements; compute
the final sub layouts; and ask Kei to summarize popups that are referenced
by "[팝업: ...]" markers in the topic text.

Returns the context fields ``containers``, ``sub_layouts``, ``measurement``,
``fit_result`` and ``enhancement_result``.
"""
from src.fit_verifier import (
calculate_fit, redistribute, analyze_enhancements,
apply_enhancements, build_escalation_report,
build_enhancement_report, calculate_sub_layout,
EnhancementAnalysis,
)
from src.block_assembler import assemble_slide_html
from src.slide_measurer import measure_rendered_heights
# Plain-dict views of the references and containers for the fit_verifier calls.
refs_dict = {}
for role, ref_list in context.references.items():
refs_dict[role] = [r.model_dump() for r in ref_list]
containers_dict = {}
for role, ci in context.containers.items():
containers_dict[role] = {
"height_px": ci.height_px,
"width_px": ci.width_px,
"zone": ci.zone,
}
font_h = context.font_hierarchy.model_dump()
normalized = context.normalized.model_dump()
core_message = context.analysis.core_message
# ── before: containers as allocated by weight (current context.containers) ──
run_dir = context.get_run_dir()
steps_dir = run_dir / "steps"
steps_dir.mkdir(parents=True, exist_ok=True)
# Save the "before" visualization (initial allocation from Stage 1.5a).
from src.step_visualizer import _gen_stage_1_8_fit_before
_gen_stage_1_8_fit_before(context, steps_dir)
logger.info(f"[Stage 1.8] before: " + ", ".join(
f"{r}={ci.height_px}px" for r, ci in context.containers.items()
))
# ── sub_layouts before filling (decides image/text/keymsg placement) ──
initial_fit = calculate_fit(
topics=[t.model_dump() for t in context.topics],
page_structure=context.page_structure.roles,
containers=containers_dict,
references=refs_dict,
font_hierarchy=font_h,
normalized=normalized,
core_message=core_message,
)
empty_enhancements = EnhancementAnalysis()
pre_sub_layouts = {}
for role, rf in initial_fit.roles.items():
ci = context.containers.get(role)
if not ci or not rf.topic_fits:
continue
layout = calculate_sub_layout(
role=role,
main_height_px=ci.height_px,
main_width_px=ci.width_px,
topic_fits=rf.topic_fits,
enhancements=empty_enhancements,
font_hierarchy=font_h,
)
pre_sub_layouts[role] = {
"main_height_px": layout.main_height_px,
"main_width_px": layout.main_width_px,
"sub_containers": [
{"name": sc.name, "width_px": sc.width_px, "height_px": sc.height_px, "align": sc.align}
for sc in layout.sub_containers
],
"table_rows": layout.table_rows,
}
# Put the sub_layouts on a local copy of the context, then build the filled HTML.
context = context.model_copy(update={"sub_layouts": pre_sub_layouts})
# ── filled: fill the "before" containers with blocks + text → Selenium measurement ──
filled_html = assemble_slide_html(context)
(steps_dir / "stage_1_8_filled.html").write_text(
filled_html.replace('
', '\n'
''
'Stage 1.8: filled (블록+텍스트 채운 상태)
\n'
''
'before 컨테이너에 블록+텍스트를 채움. 넘치는 영역 확인.
\n', 1),
encoding="utf-8",
)
filled_measurement = await asyncio.to_thread(measure_rendered_heights, filled_html)
logger.info(f"[Stage 1.8] filled 측정 완료")
# ── decision: handle the zones that overflowed ──
updated_containers = dict(context.containers) # copy
for zone_name, zone_data in filled_measurement.get("zones", {}).items():
if zone_data.get("overflowed"):
excess = zone_data.get("excess_px", 0)
scroll_h = zone_data.get("scrollHeight", 0)
if zone_name == "sidebar":
# Sidebar exception: vertical growth is allowed.
for role, ci in updated_containers.items():
if ci.zone == "sidebar":
new_h = max(ci.height_px, scroll_h + 10) # 10px of slack
updated_containers[role] = ci.model_copy(update={"height_px": new_h})
logger.info(f"[Stage 1.8] sidebar 예외 확장: {role} {ci.height_px}px → {new_h}px")
elif zone_name == "body":
# Body: handled by the redistribution step further down.
logger.info(f"[Stage 1.8] body overflow +{excess}px — 재배분 필요")
# Refresh containers_dict so it reflects the sidebar expansion.
for role, ci in updated_containers.items():
containers_dict[role] = {
"height_px": ci.height_px,
"width_px": ci.width_px,
"zone": ci.zone,
}
# ── fit calculation + redistribution (based on the updated containers) ──
fit_analysis = calculate_fit(
topics=[t.model_dump() for t in context.topics],
page_structure=context.page_structure.roles,
containers=containers_dict,
references=refs_dict,
font_hierarchy=font_h,
normalized=normalized,
core_message=core_message,
)
fit_analysis = redistribute(fit_analysis, containers_dict)
# ── after: adjusted containers ──
for role, ci in updated_containers.items():
new_h = fit_analysis.redistribution.get(role, ci.height_px) if fit_analysis.redistribution else ci.height_px
updated_containers[role] = ci.model_copy(update={"height_px": int(new_h)})
logger.info(f"[Stage 1.8] after: " + ", ".join(
f"{r}={ci.height_px}px" for r, ci in updated_containers.items()
))
# Step 3: Kei escalation (when needed)
if fit_analysis.needs_escalation:
from src.kei_client import call_kei_fit_escalation
report = build_escalation_report(fit_analysis)
logger.info(f"[Stage 1.8] 에스컬레이션 필요:\n{report}")
kei_result = await call_kei_fit_escalation(
fit_report=report,
topics=[t.model_dump() for t in context.topics],
content_summary=context.raw_content[:1500],
)
kei_decisions = []
if kei_result:
kei_decisions = kei_result.get("decisions", [])
logger.info(f"[Stage 1.8] Kei 결정: {len(kei_decisions)}건")
for d in kei_decisions:
action = d.get("action", "")
target_role = d.get("role", "")
detail = d.get("detail", "")
logger.info(f"[V-4] {target_role} → {action}: {detail}")
if action == "restructure" and target_role in fit_analysis.roles:
# NOTE(review): redistribute() is called with the same arguments as
# above; the decision's detail is not passed in — confirm intended.
fit_analysis = redistribute(fit_analysis, containers_dict)
else:
kei_decisions = []
# Step 4: analyze enhancement proposals
enhancements = analyze_enhancements(
topics=[t.model_dump() for t in context.topics],
page_structure=context.page_structure.roles,
references=refs_dict,
analysis=fit_analysis,
normalized=normalized,
core_message=core_message,
)
# Step 5: ask Kei to review the enhancement proposals
if enhancements.enhancements:
from src.kei_client import call_kei_enhancement_review
report = build_enhancement_report(enhancements)
logger.info(f"[Stage 1.8] Kei 보강 검토 요청: {len(enhancements.enhancements)}건")
kei_review = await call_kei_enhancement_review(
enhancement_report=report,
topics=[t.model_dump() for t in context.topics],
core_message=core_message,
)
# Apply Kei's decisions: drop rejected items, edit modified ones.
if kei_review and kei_review.get("decisions"):
for decision in kei_review["decisions"]:
action = decision.get("action", "approve")
d_type = decision.get("type", "")
d_role = decision.get("role", "")
modification = decision.get("modification", "")
if action == "reject":
# Remove the matching enhancement(s).
enhancements.enhancements = [
e for e in enhancements.enhancements
if not (e.type == d_type and e.role == d_role)
]
logger.info(f"[V-5] {d_role}/{d_type} → 거부됨")
elif action == "modify" and modification:
# Rewrite the description of the matching enhancement(s).
for e in enhancements.enhancements:
if e.type == d_type and e.role == d_role:
e.description = modification
logger.info(f"[V-5] {d_role}/{d_type} → 수정: {modification[:50]}")
else:
logger.info(f"[V-5] {d_role}/{d_type} → 승인")
# Step 6: apply the approved enhancements + re-verify the fit
enhancements = apply_enhancements(enhancements, fit_analysis)
# V-10: Kei decides the bold keywords from context
from src.kei_client import call_kei_bold_keywords
kei_bold = await call_kei_bold_keywords(
topics=[t.model_dump() for t in context.topics],
page_structure=context.page_structure.roles,
)
if kei_bold:
enhancements.bold_keywords = kei_bold
logger.info(f"[V-10] Kei bold 키워드: {kei_bold}")
logger.info(f"[Stage 1.8] 보강 확정: 보충블록 {len(enhancements.supplement_blocks)}개, "
f"강조 {len(enhancements.emphasis_blocks)}개, "
f"bold {len(enhancements.bold_keywords)}개 역할")
# Update the container sizes with the redistributed heights.
# NOTE(review): this rebuild starts again from context.containers, so the
# sidebar expansion applied earlier survives only when
# fit_analysis.redistribution holds a value for that role — confirm intended.
updated_containers = {}
for role, ci in context.containers.items():
new_h = fit_analysis.redistribution.get(role, ci.height_px) if fit_analysis.redistribution else ci.height_px
updated_containers[role] = ci.model_copy(update={
"height_px": int(new_h),
})
# Step 7: compute the sub-container layout
sub_layouts = {}
for role, rf in fit_analysis.roles.items():
new_h = fit_analysis.redistribution.get(role, rf.allocated_px) if fit_analysis.redistribution else rf.allocated_px
ci = context.containers.get(role)
if not ci or not rf.topic_fits:
continue
layout = calculate_sub_layout(
role=role,
main_height_px=int(new_h),
main_width_px=ci.width_px,
topic_fits=rf.topic_fits,
enhancements=enhancements,
font_hierarchy=font_h,
)
sub_layouts[role] = {
"main_height_px": layout.main_height_px,
"main_width_px": layout.main_width_px,
"sub_containers": [
{"name": sc.name, "width_px": sc.width_px, "height_px": sc.height_px, "align": sc.align}
for sc in layout.sub_containers
],
"table_rows": layout.table_rows,
}
# V'-2: Kei summarizes each popup's original content to fit the available space
popup_summaries = {}
popups = context.normalized.popups or []
if popups:
from src.kei_client import call_kei_summarize_popup
for role, role_sub in sub_layouts.items():
role_scs = role_sub.get("sub_containers", [])
text_sc = next((sc for sc in role_scs if sc["name"] in ("text_and_table", "text")), None)
if not text_sc:
continue
# Font size for this role, used for the line-height estimates below.
role_font_map = {"배경": "bg", "본심": "core", "첨부": "sidebar", "결론": "key_msg"}
fk = role_font_map.get(role, "core")
fs = font_h.get(fk, 12)
# Look for popup markers in structured_text.
ps_info = context.page_structure.roles.get(role, {})
tids = ps_info.get("topic_ids", []) if isinstance(ps_info, dict) else []
topic_map = {t.id: t for t in context.topics}
for tid in tids:
topic = topic_map.get(tid)
if not topic:
continue
st_text = topic.structured_text or topic.source_data or ""
# NOTE(review): this import and the token loading inside the innermost
# loop below are loop-invariant and could be hoisted.
import re as _re
popup_refs = _re.findall(r'\[팝업:\s*([^\]]+)\]', st_text)
for pr in popup_refs:
# Find the popup's original content by title substring.
popup = next((p for p in popups if pr in p.get("title", "")), None)
if not popup:
continue
# Free-space calculation: height after V'-4 (filled up to just above the conclusion)
from src.fit_verifier import _load_design_tokens as _ldt_v2
_v2_tokens = _ldt_v2()
_v2_slide_h = _v2_tokens.get("slide_height", 720)
_v2_pad = _v2_tokens["spacing_page"]
_v2_header_h = _v2_tokens.get("header_height", 66)
_v2_gap = _v2_tokens["spacing_block"]
_v2_gap_small = _v2_tokens["spacing_small"]
_v2_concl_ci = updated_containers.get("결론") or next((ci for r, ci in updated_containers.items() if ci.zone == "footer"), None)
_v2_concl_h = _v2_concl_ci.height_px if _v2_concl_ci else 53
_v2_ft_top = _v2_slide_h - _v2_pad - _v2_concl_h - _v2_gap
_v2_column_bottom = _v2_ft_top - _v2_gap
_v2_bg_ci = next((ci for r, ci in updated_containers.items() if ci.zone == "body" and r != role), None)
_v2_bg_h = _v2_bg_ci.height_px if _v2_bg_ci else 0
_v2_core_top = _v2_pad + _v2_header_h + _v2_gap + _v2_bg_h + _v2_gap_small
after_h = _v2_column_bottom - _v2_core_top # actual core height up to just above the conclusion
keymsg_h = 0
for sc in role_scs:
if sc.get("name") == "keymsg":
keymsg_h = float(sc.get("height_px", 0))
title_h = (fs + 1) * 1.5 + 4
# Image height: computed from the real aspect ratio.
svg_sc = next((sc for sc in role_scs if sc.get("name") == "svg"), None)
img_h = 0
if svg_sc:
svg_w = float(svg_sc.get("width_px", 200))
img_ratio = next((img.get("ratio", 1) for img in (context.slide_images or []) if img.get("b64")), 1)
img_h = svg_w / img_ratio if img_ratio > 0 else float(svg_sc.get("height_px", 0))
# Count text lines, skipping blank lines, popup/image markers and source ("출처:") lines.
text_lines = len([l for l in st_text.split("\n") if l.strip() and not l.strip().startswith("[팝업:") and not l.strip().startswith("[이미지:") and not l.strip().lstrip("• ").startswith("출처:")])
text_h_used = text_lines * fs * 1.55
# Table space = total - title - max(image, text) - keymsg - padding
upper_h = max(img_h, text_h_used)
available_h = after_h - 16 - title_h - upper_h - keymsg_h - 8
available_w = float(text_sc["width_px"])
if available_h < fs * 3:
continue # skip when there is not enough room
summary = await call_kei_summarize_popup(
popup_title=pr,
popup_content=popup.get("content", ""),
available_width_px=available_w,
available_height_px=available_h,
font_size=fs,
)
if summary:
popup_summaries[pr] = summary
logger.info(f"[V'-2] {pr}: format={summary.get('format')}")
# Store the results on the context (used by Stage 2).
return {
"containers": updated_containers,
"sub_layouts": sub_layouts,
"measurement": filled_measurement,
"fit_result": {
"roles": {
role: {
"fit_status": rf.fit_status,
"total_required_px": rf.total_required_px,
"allocated_px": rf.allocated_px,
"shortfall_px": rf.shortfall_px,
}
for role, rf in fit_analysis.roles.items()
},
"redistribution": fit_analysis.redistribution,
"needs_escalation": fit_analysis.needs_escalation,
},
"enhancement_result": {
"kei_decisions": kei_decisions,
"subordinate_treatments": [
{"role": e.role, "detail": e.detail}
for e in enhancements.enhancements if e.type == "subordinate"
],
"supplement_blocks": [
{"role": sb.role, "block_id": sb.block_id, "content_source": sb.content_source}
for sb in enhancements.supplement_blocks
],
"emphasis_blocks": enhancements.emphasis_blocks,
"bold_keywords": enhancements.bold_keywords,
"popup_summaries": popup_summaries,
},
}
ctx = await run_stage(stage_1_8, ctx, "stage_1_8", max_retries=0)
# ── Stage 1.5b: 디자인 예산 재계산 (블록 선택 후) ──
async def stage_1_5b(context: PipelineContext) -> dict:
    """Recompute each container's design budget once blocks have been selected.

    Returns ``{"containers": {...}}`` where every ContainerInfo carries a
    fresh DesignBudget; a warning is logged for containers whose budget
    does not fit.
    """
    from src.space_allocator import calculate_design_budget

    refreshed = {}
    for role, ci in context.containers.items():
        # V-1: the first block's schema represents the role's block list.
        role_refs = context.references.get(role, [])
        if role_refs:
            schema_info = role_refs[0].schema_info
        else:
            schema_info = {}

        font_attr = {
            "본심": "core", "배경": "bg", "첨부": "sidebar", "결론": "core"
        }.get(role, "core")
        font_size = getattr(context.font_hierarchy, font_attr, 12.0)

        budget = calculate_design_budget(
            container_height_px=ci.height_px,
            container_width_px=ci.width_px,
            block_schema=schema_info,
            font_size=font_size,
        )
        design_budget = DesignBudget(
            available_height_px=budget["available_height_px"],
            available_width_px=budget["available_width_px"],
            max_circle_diameter=budget["max_circle_diameter"],
            max_img_width=budget["max_img_width"],
            max_img_height=budget["max_img_height"],
            fits=budget["fits"],
        )
        refreshed[role] = ci.model_copy(update={"design_budget": design_budget})

        if not budget["fits"]:
            logger.warning(
                f"[T-6] {role}: 디자인 예산 음수 — "
                f"text={budget['text_height_px']}px > container={ci.height_px}px"
            )
    return {"containers": refreshed}
ctx = await run_stage(stage_1_5b, ctx, "stage_1_5b", max_retries=0)
# ── Stage 2: HTML 생성 (Claude Sonnet) ──
yield {"event": "progress", "data": "3/7 슬라이드 HTML 생성 중..."}
async def stage_2(context: PipelineContext) -> dict:
    """Generate the slide HTML through generate_with_retry.

    Converts the PipelineContext into the dict/ContainerSpec shapes the
    existing generation function takes, attaches the Phase T / Phase V
    data under ``analysis["phase_t"]``, and returns
    ``{"generated_html": ...}``.
    """
    from src.content_verifier import generate_with_retry
    # Imported once here; it was previously re-imported on every loop iteration.
    from src.space_allocator import ContainerSpec as LegacyContainerSpec

    # PipelineContext → legacy function interface.
    analysis_dict = {
        "topics": [t.model_dump() for t in context.topics],
        "page_structure": context.page_structure.roles,
        "core_message": context.analysis.core_message,
        "title": context.analysis.title,
        "total_pages": context.analysis.total_pages,
        "image_sizes": context.analysis.image_sizes,
    }
    container_specs_dict = {
        role: LegacyContainerSpec(
            role=ci.role,
            zone=ci.zone,
            topic_ids=ci.topic_ids,
            weight=ci.weight,
            height_px=ci.height_px,
            width_px=ci.width_px,
            max_height_cost=ci.max_height_cost,
            block_constraints=ci.block_constraints,
        )
        for role, ci in context.containers.items()
    }

    # T-7 + Phase V: context handed through to the HTML generation.
    analysis_dict["phase_t"] = {
        "font_hierarchy": context.font_hierarchy.model_dump(),
        "container_ratio": context.container_ratio,
        "references": {
            role: [r.model_dump() for r in ref_list]
            for role, ref_list in context.references.items()
        },
        "design_budgets": {
            role: ci.design_budget.model_dump() if ci.design_budget else {}
            for role, ci in context.containers.items()
        },
        # Phase V: Stage 1.8 results
        "sub_layouts": context.sub_layouts,
        "fit_result": context.fit_result,
        "enhancements": context.enhancement_result,
    }

    # The second element of the result is not consumed by this stage.
    generated, _ = await generate_with_retry(
        content=context.raw_content,
        analysis=analysis_dict,
        container_specs=container_specs_dict,
        preset=context.preset,
        images=context.slide_images,
    )
    return {"generated_html": generated}
ctx = await run_stage(stage_2, ctx, "stage_2", max_retries=0)
# ── Stage 3: 렌더링 조립 ──
yield {"event": "progress", "data": "4/7 슬라이드 조립 중..."}
async def stage_3(context: PipelineContext) -> dict:
    """Assemble the final slide from the generated HTML.

    Builds the analysis dict render_slide_from_html takes (including the
    underscore-prefixed layout hints) and returns ``{"rendered_html": ...}``.
    """
    from src.renderer import render_slide_from_html

    render_inputs = {
        "topics": [topic.model_dump() for topic in context.topics],
        "page_structure": context.page_structure.roles,
        "core_message": context.analysis.core_message,
        "title": context.analysis.title,
    }
    render_inputs["_container_ratio"] = list(context.container_ratio)
    render_inputs["_font_hierarchy"] = context.font_hierarchy.model_dump()
    render_inputs["_containers"] = {
        role: {"height_px": ci.height_px, "width_px": ci.width_px, "zone": ci.zone}
        for role, ci in context.containers.items()
    }
    # A missing or empty fit_result yields an empty redistribution map.
    if context.fit_result:
        render_inputs["_fit_redistribution"] = context.fit_result.get("redistribution", {})
    else:
        render_inputs["_fit_redistribution"] = {}

    rendered = render_slide_from_html(
        context.generated_html, render_inputs, context.preset
    )
    return {"rendered_html": rendered}
ctx = await run_stage(stage_3, ctx, "stage_3", max_retries=0)
# ── Stage 4: 측정 + 품질 게이트 ──
yield {"event": "progress", "data": "5/7 품질 검증 중..."}
async def stage_4(context: PipelineContext) -> dict:
    """Measure the rendered slide and run the vision quality gate.

    Overflow is logged but does not fail the stage. A quality score below
    30 is returned as a FATAL ``_errors`` entry. Otherwise returns
    ``measurement``, ``quality_score`` and ``screenshot_b64``.
    """
    from src.kei_client import vision_quality_gate

    # L4: Selenium measurement, run in a worker thread.
    measurement = await asyncio.to_thread(
        measure_rendered_heights, context.rendered_html
    )
    overflow_found = False
    for zone_name, zone_data in measurement.get("zones", {}).items():
        if not zone_data.get("overflowed"):
            continue
        overflow_found = True
        excess = zone_data.get("excess_px", 0)
        logger.warning(f"[L4] {zone_name}: overflow +{excess}px")

    if not overflow_found:
        logger.info("[L4] overflow 없음")
    else:
        logger.warning("[L4] overflow 감지됨 — 결과물에 경고 포함하여 진행")

    # L5: screenshot + vision quality gate.
    screenshot_b64 = await asyncio.to_thread(
        capture_slide_screenshot, context.rendered_html
    )
    # The score stays at 100 when no screenshot or no gate result is available.
    quality_score = 100
    if screenshot_b64:
        gate_input = {
            "topics": [topic.model_dump() for topic in context.topics],
            "core_message": context.analysis.core_message,
        }
        quality_result = await vision_quality_gate(screenshot_b64, gate_input)
        if quality_result:
            quality_score = quality_result.get("score", 0)
            if quality_score < 30:
                fatal = {
                    "severity": "FATAL",
                    "localization": f"품질 {quality_score}/100 < 30",
                    "instruction": "출력 차단",
                }
                return {"_errors": [fatal]}

    return {
        "measurement": measurement,
        "quality_score": quality_score,
        "screenshot_b64": screenshot_b64 or "",
    }
ctx = await run_stage(stage_4, ctx, "stage_4", max_retries=0)
# ── Stage 5: confirm validation passed → write final.html ──
yield {"event": "progress", "data": "6/7 최종 처리 중..."}
# Check the validation results recorded by Stage 4.
measurement = ctx.measurement or {}
has_overflow = any(
zd.get("overflowed", False)
for zd in measurement.get("zones", {}).values()
)
quality = ctx.quality_score if hasattr(ctx, "quality_score") else 100
if has_overflow:
overflow_zones = [
f'{zn}(+{zd.get("excess_px", 0)}px)'
for zn, zd in measurement.get("zones", {}).items()
if zd.get("overflowed")
]
logger.warning(f"[Stage 5] overflow 감지: {overflow_zones} — 결과물에 경고 포함")
# NOTE(review): stage_4 already returns a FATAL error for scores below 30,
# so this looks like a second guard for the same condition — confirm.
if quality < 30:
logger.error(f"[Stage 5] 품질 {quality}/100 < 30 — 출력 차단")
yield {"event": "error", "data": f"품질 검증 미달 ({quality}/100). 출력 차단."}
return
html = ctx.rendered_html
if ctx.base_path:
html = embed_images(html, ctx.base_path)
ctx = ctx.model_copy(update={"rendered_html": html})
ctx.save_snapshot("final")
# Save final.html
run_dir = ctx.get_run_dir()
run_dir.mkdir(parents=True, exist_ok=True)
(run_dir / "final.html").write_text(html, encoding="utf-8")
# Phase T: save each popup (detailed content) as a separate HTML file
popups = ctx.normalized.popups
if popups:
for i, popup in enumerate(popups, 1):
popup_title = popup.get("title", f"첨부{i}")
popup_content = popup.get("content", "")
# Keep only alphanumerics, "_", space and "-" in the file name.
safe_title = "".join(c for c in popup_title if c.isalnum() or c in "_ -").strip()
popup_filename = f"첨부{i}_{safe_title}.html"
# TP-6: apply design tokens to the attachment HTML
import re as _re
# Strip leftover JSX style={{}} attributes.
clean_content = _re.sub(r'style=\{\{[^}]*\}\}', '', popup_content)
clean_content = clean_content.replace('
', '
')
# markdown bold → HTML bold
clean_content = _re.sub(r'\*\*(.+?)\*\*', r'\1', clean_content)
# NOTE(review): popup_title and clean_content are interpolated into the
# template as-is — confirm whether they need HTML escaping.
popup_html = f"""
{popup_title} — 첨부 자료
{popup_title}
첨부 자료 {i} — 슬라이드 본문의 상세 내용
{clean_content}
본 자료는 슬라이드 "{ctx.analysis.title}"의 첨부 자료입니다.
"""
(run_dir / popup_filename).write_text(popup_html, encoding="utf-8")
logger.info(f"[Phase T] 첨부 HTML 저장: {popup_filename}")
yield {"event": "result", "data": html}
logger.info(f"슬라이드 생성 완료: run={ctx.run_id}")
except StageFailure as e:
logger.error(f"파이프라인 중단: {e.stage_name} — {e.errors}")
yield {"event": "error", "data": f"Stage {e.stage_name} 실패: {e.errors}"}
except Exception as e:
logger.exception(f"파이프라인 오류: {e}")
yield {"event": "error", "data": str(e)}
# ══════════════════════════════════════
# Shared helpers (_retry_kei is also called by Phase T Stage 1A)
# and the legacy pipeline (Phase S) — preserved
# ══════════════════════════════════════
async def _retry_kei(fn, *args, **kwargs):
    """Call a Kei API coroutine function until it returns a non-None result.

    Kei is treated as required infrastructure, so there is no fallback value:
    the call is repeated until it succeeds or one of the limits is reached.

    Args:
        fn: Async callable to invoke; a ``None`` result counts as a failure.
        *args: Positional arguments forwarded to ``fn``.
        **kwargs: Keyword arguments forwarded to ``fn``.

    Returns:
        The first non-None value returned by ``fn``.

    Raises:
        TimeoutError: More than ``KEI_MAX_RETRY_DURATION`` seconds had already
            elapsed when the next attempt was about to start.
        RuntimeError: ``KEI_MAX_RETRY_ATTEMPTS`` attempts all returned ``None``.

    Exceptions raised by ``fn`` itself are not caught here and propagate to
    the caller.
    """
    # Callables such as functools.partial objects have no __name__.
    fn_name = getattr(fn, "__name__", repr(fn))
    # Monotonic clock: the elapsed time must not jump when the wall clock is adjusted.
    start_time = time.monotonic()
    attempt = 0
    while attempt < KEI_MAX_RETRY_ATTEMPTS:
        attempt += 1
        elapsed = time.monotonic() - start_time
        if elapsed > KEI_MAX_RETRY_DURATION:
            raise TimeoutError(
                f"Kei API 타임아웃: {fn_name} — "
                f"{elapsed:.0f}초 경과 (제한 {KEI_MAX_RETRY_DURATION}초)"
            )
        result = await fn(*args, **kwargs)
        if result is not None:
            if attempt > 1:
                logger.info(f"[Kei 재시도] {fn_name} 성공 ({attempt}번째 시도)")
            return result
        if attempt >= KEI_MAX_RETRY_ATTEMPTS:
            # No attempt is left: announcing a retry and sleeping would only
            # delay the failure by KEI_RETRY_INTERVAL seconds.
            break
        logger.warning(
            f"[Kei 재시도] {fn_name} 실패 (시도 {attempt}/{KEI_MAX_RETRY_ATTEMPTS}). "
            f"{KEI_RETRY_INTERVAL}초 후 재시도..."
        )
        await asyncio.sleep(KEI_RETRY_INTERVAL)
    raise RuntimeError(
        f"Kei API 최대 재시도 초과: {fn_name} — {attempt}회 시도"
    )
def _save_step(run_dir: Path, filename: str, data: Any) -> None:
    """Persist one intermediate pipeline artifact inside ``run_dir``. (K-1)

    Args:
        run_dir: Directory of the current run; created if it does not exist.
        filename: Target file name. Names ending in ``.html`` are written as
            raw text; every other name is written as indented JSON.
        data: Text for ``.html`` files, otherwise a JSON-serializable value.
    """
    run_dir.mkdir(parents=True, exist_ok=True)
    target = run_dir / filename
    is_html = filename.endswith(".html")
    if not is_html:
        # Non-ASCII text (e.g. Korean) is kept readable instead of \u-escaped.
        with target.open("w", encoding="utf-8") as out:
            json.dump(data, out, ensure_ascii=False, indent=2)
    else:
        target.write_text(data, encoding="utf-8")
    logger.info(f"[중간 산출물] {filename} 저장 → {run_dir.name}/")
async def generate_slide_legacy(
    content: str,
    manual_layout: dict[str, Any] | None = None,
    base_path: str = "",
) -> AsyncIterator[dict[str, str]]:
    """Run the legacy Phase S pipeline (kept for reference after the Phase T switch).

    The generator analyses ``content``, computes container specs, has the slide
    HTML generated and verified, assembles and measures the slide, runs the
    vision quality gate and finally emits the HTML. Intermediate artifacts are
    written to ``data/runs/<run_id>/`` through ``_save_step``.

    Args:
        content: Source content passed to the analysis and generation steps.
        manual_layout: When truthy, used as the analysis result instead of
            calling ``classify_content``.
            NOTE(review): later steps write into the analysis dict
            (``image_sizes``, first topic title); whether that reaches the
            caller's dict depends on what ``refine_concepts`` returns — confirm.
        base_path: Base directory used to resolve image paths. When empty,
            image keys are used as paths directly and ``embed_images`` is not
            called on the final HTML.

    Yields:
        SSE-style event dicts: ``event`` is ``progress``, ``result`` or
        ``error`` and ``data`` carries the payload.
    """
    # K-1: directory for this run's intermediate artifacts (run id = epoch milliseconds).
    run_id = str(int(time.time() * 1000))
    run_dir = Path("data/runs") / run_id
    try:
        # Step 1: Kei — topic extraction + analysis.
        yield {"event": "progress", "data": "1/5 Kei 실장이 꼭지를 추출 중..."}
        if manual_layout:
            analysis = manual_layout
        else:
            analysis = await _retry_kei(classify_content, content)
            # _retry_kei only returns non-None results; once its retry limits
            # are exceeded it raises instead, which the outer except reports.
        topic_count = len(analysis.get("topics", []))
        page_count = analysis.get("total_pages", 1)
        logger.info(f"1단계-A 완료: {topic_count}개 꼭지, {page_count}페이지")
        _save_step(run_dir, "step1_analysis.json", analysis)
        # Step 1-B: refine the concept of each topic.
        yield {"event": "progress", "data": "1.5/5 Kei 실장이 각 꼭지의 컨셉을 구체화 중..."}
        analysis = await refine_concepts(content, analysis)
        logger.info("1단계-B 완료: 컨셉 구체화")
        _save_step(run_dir, "step1b_concepts.json", {
            "concepts": [
                {k: t.get(k) for k in ("id", "title", "purpose", "relation_type", "expression_hint", "source_data")}
                for t in analysis.get("topics", [])
            ]
        })
        # I-6: guard against the slide title and the first topic title being near-duplicates.
        from difflib import SequenceMatcher
        title = analysis.get("title", "")
        topics = analysis.get("topics", [])
        if topics:
            first_title = topics[0].get("title", "")
            similarity = SequenceMatcher(None, title, first_title).ratio()
            if similarity > 0.7:
                # Replace the first topic title with "<purpose>: <first 30 chars of summary>".
                purpose = topics[0].get("purpose", "문제제기")
                topics[0]["title"] = f"{purpose}: {topics[0].get('summary', '')[:30]}"
                logger.warning(
                    f"[제목 중복 교정] 유사도 {similarity:.0%} → 첫 꼭지 제목 변경"
                )
        # Measure image sizes.
        # NOTE(review): the original comment says "only when base_path is set";
        # no such check is visible here — presumably handled inside get_image_sizes.
        image_sizes = get_image_sizes(content, base_path)
        if image_sizes:
            analysis["image_sizes"] = image_sizes
            logger.info(f"이미지 측정: {len(image_sizes)}개")
        # Phase O-1: compute container specs (Kei weights -> concrete px sizes).
        preset_name = select_preset(analysis)
        preset = LAYOUT_PRESETS.get(preset_name, {})
        page_struct = analysis.get("page_structure", {})
        container_specs = calculate_container_specs(
            page_structure=page_struct,
            topics=analysis.get("topics", []),
            preset=preset,
            slide_width=settings.slide_width,
            slide_height=settings.slide_height,
        )
        _save_step(run_dir, "step1c_containers.json", {
            role: {
                "height_px": spec.height_px,
                "width_px": spec.width_px,
                "max_height_cost": spec.max_height_cost,
                "topic_ids": spec.topic_ids,
                "weight": spec.weight,
                "block_constraints": spec.block_constraints,
            }
            for role, spec in container_specs.items()
        })
        # Phase S: Claude Sonnet generates the HTML directly.
        # No block selection and no slot filling: the AI builds an HTML structure that fits the content.
        yield {"event": "progress", "data": "2/4 슬라이드 HTML 생성 중..."}
        from src.content_verifier import generate_with_retry
        from src.renderer import render_slide_from_html
        from src.kei_client import vision_quality_gate
        import asyncio
        topics = analysis.get("topics", [])
        # Build the image descriptors (including base64 data) passed to the generator.
        slide_images = []
        if image_sizes:
            import base64 as b64_mod
            for img_key, img_info in image_sizes.items():
                img_path = Path(base_path) / img_key if base_path else Path(img_key)
                # Stays empty when the image file cannot be found on disk.
                img_b64 = ""
                if img_path.exists():
                    img_b64 = b64_mod.b64encode(img_path.read_bytes()).decode()
                slide_images.append({
                    "path": str(img_path),
                    "width": img_info.get("width", 0),
                    "height": img_info.get("height", 0),
                    # max(1, ...) avoids a division by zero for a zero height.
                    "ratio": round(img_info.get("width", 1) / max(1, img_info.get("height", 1)), 2),
                    "topic_id": img_info.get("topic_id"),
                    "b64": img_b64,
                })
        # Claude Sonnet HTML generation + independent verification + retry loop.
        generated, verification = await generate_with_retry(
            content=content,
            analysis=analysis,
            container_specs=container_specs,
            preset=preset,
            images=slide_images,
        )
        _save_step(run_dir, "step2b_verification.json", {
            area: {"passed": r.passed, "score": r.score, "errors": r.errors}
            for area, r in verification.items()
        })
        # Only the lengths and the reasoning are stored here, not the HTML itself.
        _save_step(run_dir, "step2_generated.json", {
            "body_html_length": len(generated.get("body_html", "")),
            "sidebar_html_length": len(generated.get("sidebar_html", "")),
            "footer_html_length": len(generated.get("footer_html", "")),
            "reasoning": generated.get("reasoning", ""),
        })
        logger.info(
            f"[Phase S] HTML 생성 완료: body={len(generated.get('body_html', ''))}자, "
            f"sidebar={len(generated.get('sidebar_html', ''))}자, "
            f"footer={len(generated.get('footer_html', ''))}자"
        )
        # Step 3: rendering — insert the AI-generated HTML into the slide frame.
        yield {"event": "progress", "data": "3/4 슬라이드 조립 중..."}
        html = render_slide_from_html(generated, analysis, preset)
        logger.info("[Phase S] 슬라이드 조립 완료")
        _save_step(run_dir, "step3_rendered.html", html)
        # Phase Q: verification render + vision quality gate.
        # The measurement runs in a worker thread via asyncio.to_thread.
        measurement = await asyncio.to_thread(measure_rendered_heights, html)
        _save_step(run_dir, "step4_measurement.json", measurement)
        # Phase S: overflow detection (logged only; the HTML is not changed here).
        has_overflow = False
        for zone_name, zone_data in measurement.get("zones", {}).items():
            if zone_data.get("overflowed"):
                has_overflow = True
                logger.warning(f"[Phase S] {zone_name}: overflow +{zone_data.get('excess_px', 0)}px")
        if has_overflow:
            logger.warning("[Phase S] overflow 감지 — 결과물에 반영 (후속 품질 게이트에서 평가)")
        else:
            logger.info("[Phase S] overflow 없음")
        # Phase S: vision-model quality gate.
        yield {"event": "progress", "data": "4/4 품질 검증 중..."}
        screenshot_b64 = await asyncio.to_thread(capture_slide_screenshot, html)
        quality_result = None
        if screenshot_b64:
            # Only a size note is stored, not the screenshot data itself.
            _save_step(run_dir, "step5_screenshot.txt", f"base64 PNG, {len(screenshot_b64)} chars")
            quality_result = await vision_quality_gate(screenshot_b64, analysis)
            if quality_result:
                _save_step(run_dir, "step5_quality_gate.json", quality_result)
                if not quality_result.get("passed", True):
                    score = quality_result.get("score", 0)
                    issues = quality_result.get("issues", [])
                    logger.warning(f"[Q-6] 품질 게이트 FAIL: {score}/100 — {issues}")
                    # Q-8: block the output on a severe quality problem.
                    if score < 30:
                        logger.error(f"[Q-8] 출력 차단: 품질 {score}/100 < 30 최소 기준")
                        yield {"event": "error", "data": f"슬라이드 품질 미달 ({score}/100). 재시도해 주세요."}
                        return
                else:
                    logger.info(f"[Q-6] 품질 게이트 PASS: {quality_result.get('score', 0)}/100")
        else:
            logger.warning("[Q-6] 스크린샷 캡처 실패 — 품질 게이트 스킵")
        # D-5: embed images as base64 so they also show in the downloaded HTML.
        if base_path:
            html = embed_images(html, base_path)
            logger.info("이미지 base64 삽입 완료")
        _save_step(run_dir, "final.html", html)
        yield {"event": "result", "data": html}
        logger.info(f"슬라이드 생성 완료: run={run_id}")
    except Exception as e:
        # Top-level boundary: any failure is logged with its traceback and
        # reported to the consumer as an error event.
        logger.exception(f"파이프라인 오류: {e}")
        yield {"event": "error", "data": str(e)}
async def _adjust_design(
    layout_concept: dict[str, Any],
    analysis: dict[str, Any],
) -> dict[str, Any]:
    """Stage 4 (first half): ask the model for per-area CSS variable overrides.

    For every page the block count, serialized data length and zone budget of
    each area are summarised and sent to the Sonnet model, which answers with
    inline-style strings of CSS variable overrides. The answer is stored on the
    page as ``page["area_styles"]``; an unusable answer stores an empty dict.

    Any exception is logged as a warning, after which every page that has no
    ``area_styles`` yet receives an empty dict, so rendering can continue with
    the existing styles.

    Args:
        layout_concept: Layout with a ``pages`` list; modified in place.
        analysis: Analysis result used to select the layout preset.

    Returns:
        The same ``layout_concept`` object.
    """
    # The system prompt does not depend on the page, so it is assembled once.
    system = (
        "당신은 디자인 실무자이다. 편집자가 정리한 텍스트가 각 영역에 잘 들어가도록 CSS를 조정한다.\n\n"
        "## 원칙\n"
        "- 텍스트를 자르지 않는다. 디자인이 텍스트에 맞춘다.\n"
        "- 빈 공간을 방치하지 않는다.\n"
        "- 텍스트가 많으면: 폰트/여백을 줄여서 맞춘다.\n"
        "- 텍스트가 적으면: 폰트/여백을 늘려서 채운다.\n\n"
        "## 조정 가능한 CSS 변수\n"
        "- --font-body (기본 0.95rem): 본문 폰트 크기\n"
        "- --font-subtitle (기본 1.25rem): 소제목 폰트 크기\n"
        "- --font-caption (기본 0.8rem): 캡션 폰트 크기\n"
        "- --spacing-inner (기본 16px): 블록 내부 여백\n"
        "- --spacing-block (기본 20px): 블록 간 간격\n"
        "- --spacing-small (기본 8px): 작은 여백\n\n"
        "## 출력 형식 (JSON만. 설명 없이.)\n"
        "각 area에 적용할 CSS 변수 override를 inline style 문자열로 반환.\n"
        "조정 불필요한 area는 빈 문자열.\n"
        '{"area_styles": {"body": "--font-body: 0.85rem; --spacing-inner: 10px;", "sidebar": "", "footer": ""}}'
    )
    try:
        client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key)
        # Zone budgets come from the preset selected for this analysis.
        zones = LAYOUT_PRESETS.get(select_preset(analysis), {}).get("zones", {})
        for page in layout_concept.get("pages", []):
            # Aggregate block count and serialized data length per area.
            stats: dict[str, dict[str, Any]] = {}
            for blk in page.get("blocks", []):
                area_key = blk.get("area", "body")
                entry = stats.get(area_key)
                if entry is None:
                    zone_cfg = zones.get(area_key, {})
                    entry = stats[area_key] = {
                        "block_count": 0,
                        "total_chars": 0,
                        "budget_px": zone_cfg.get("budget_px", 0),
                        "width_pct": zone_cfg.get("width_pct", 100),
                        "block_types": [],
                    }
                payload_len = len(json.dumps(blk.get("data", {}), ensure_ascii=False))
                entry["block_count"] += 1
                entry["total_chars"] += payload_len
                entry["block_types"].append(blk.get("type", ""))
            # One summary entry per area for the user prompt.
            area_lines = [
                f"- {area_name} (예산 {info['budget_px']}px, 너비 {info['width_pct']}%): "
                f"{info['block_count']}개 블록, 총 {info['total_chars']}자\n"
                f" 블록 타입: {', '.join(info['block_types'])}"
                for area_name, info in stats.items()
            ]
            user_prompt = "".join((
                "## 각 영역 현황\n",
                "\n".join(area_lines),
                "\n\n위 영역별로 CSS 변수 조정이 필요한지 판단하여 JSON으로 반환해줘.",
            ))
            response = await client.messages.create(
                model="claude-sonnet-4-20250514",
                max_tokens=1024,
                system=system,
                messages=[{"role": "user", "content": user_prompt}],
            )
            result = _parse_json(response.content[0].text)
            if result and "area_styles" in result:
                styles = result["area_styles"]
                page["area_styles"] = styles
                flags = ", ".join(f"{name}={bool(value)}" for name, value in styles.items())
                logger.info(f"디자인 조정: {flags}")
            else:
                page["area_styles"] = {}
                logger.info("디자인 조정: 조정 불필요 또는 파싱 실패")
    except Exception as e:
        logger.warning(f"디자인 조정 실패 (기존 스타일로 렌더링): {e}")
        # On failure, pages without area_styles get an empty dict so they
        # render exactly as before.
        for page in layout_concept.get("pages", []):
            page.setdefault("area_styles", {})
    return layout_concept
async def _review_balance(
    html: str,
    layout_concept: dict[str, Any],
    content: str,
    analysis: dict[str, Any] | None = None,
    measurement_text: str = "",
    screenshot_b64: str | None = None,
) -> dict[str, Any] | None:
    """Stage 5: have Kei do the final review of the assembled slide. (J-7 + Phase L)

    Builds three pieces of context and hands them to ``call_kei_final_review``:
    a per-block summary of serialized data length, the zone height budgets of
    the selected preset (only when ``analysis`` is given), and overflow hints
    (the Stage 2 estimates stored under ``layout_concept["overflow"]`` plus
    ``measurement_text`` when provided).

    Args:
        html: Assembled slide HTML.
        layout_concept: Layout with ``pages`` and an optional ``overflow`` list.
        content: Not used by this function.
        analysis: Analysis result used to pick the preset; may be None.
        measurement_text: Rendered-measurement text appended to the hints.
        screenshot_b64: Optional screenshot forwarded to the reviewer.

    Returns:
        Whatever ``call_kei_final_review`` returns, or None if anything raised
        (the error is logged as a warning).
    """
    try:
        # Lazy generator: blocks are visited in the same order as nested loops.
        all_blocks = (
            blk
            for page in layout_concept.get("pages", [])
            for blk in page.get("blocks", [])
        )
        block_summary = []
        for blk in all_blocks:
            size = len(json.dumps(blk.get("data", {}), ensure_ascii=False))
            block_summary.append(
                f" {blk.get('area')}/{blk.get('type')}: 데이터 {size}자"
            )
        # Zone budget section (the preset is derived from the analysis).
        zone_budget_text = ""
        overflow_hint_text = ""
        if analysis:
            zone_map = LAYOUT_PRESETS.get(select_preset(analysis), {}).get("zones", {})
            zone_budget_text = "\n\n## zone별 높이 예산\n" + "\n".join(
                f"- {zone_name}: ~{zone['budget_px']}px (너비 {zone['width_pct']}%)"
                for zone_name, zone in zone_map.items()
            )
        # Overflow estimates detected in Stage 2.
        predicted = layout_concept.get("overflow", [])
        if predicted:
            overflow_hint_text = "\n\n## 높이 초과 힌트 (2단계 예상치, 참고용)\n" + "\n".join(
                f"- {item['area']}: 예상 {item['total_px']}px > 예산 {item['budget_px']}px "
                f"(+{item['overflow_px']}px 초과)"
                for item in predicted
            )
        # Phase L: append the rendered measurement text to the hints.
        if measurement_text:
            overflow_hint_text = f"{overflow_hint_text}\n\n{measurement_text}"
        # Final review by Kei (legacy — not used by the Phase S main flow).
        from src.kei_client import call_kei_final_review
        return await call_kei_final_review(
            html, block_summary, zone_budget_text, overflow_hint_text, analysis,
            screenshot_b64=screenshot_b64,
        )
    except Exception as e:
        logger.warning(f"재검토 실패: {e}")
        return None
async def _apply_adjustments(
    layout_concept: dict[str, Any],
    review: dict[str, Any],
    content: str,
) -> dict[str, Any]:
    """Apply the review's adjustments to the layout and re-run content filling.

    Each adjustment targets every block whose ``area`` equals the adjustment's
    ``block_area``. Supported actions:

    - ``expand`` / ``shrink`` (with a truthy ``target_ratio``): scale every
      ``char_guide`` value by the ratio, truncated to int.
    - ``rewrite``: drop the block's ``data`` and record the reason.
    - ``kei_trim``: cap every ``char_guide`` value at ``max_chars`` (default
      200); an empty guide becomes ``{"text": max_chars}``.
    - ``kei_restructure``: flag the block as ``detail_target``, drop its
      ``data`` and record the reason.

    Args:
        layout_concept: Layout with a ``pages`` list; blocks are modified in place.
        review: Review result; its ``adjustments`` list drives the changes.
        content: Source content forwarded to ``fill_content``.

    Returns:
        ``layout_concept`` unchanged when there are no adjustments, otherwise
        the result of ``fill_content``.
    """
    requested = review.get("adjustments", [])
    if not requested:
        return layout_concept
    # Only blocks in the areas named by the adjustments are touched.
    for adj in requested:
        area = adj.get("block_area", "")
        action = adj.get("action", "")
        ratio = adj.get("target_ratio")
        detail = adj.get("detail", "")
        targets = (
            blk
            for page in layout_concept.get("pages", [])
            for blk in page.get("blocks", [])
            if blk.get("area") == area
        )
        for blk in targets:
            if action in ("expand", "shrink") and ratio:
                # Both actions scale the guide; only the logged label differs.
                guide = blk.get("char_guide", {})
                for key in guide:
                    guide[key] = int(guide[key] * ratio)
                logger.info(f"조정: {area} → {action} ×{ratio} ({detail})")
            elif action == "rewrite":
                blk.pop("data", None)
                blk["reason"] = f"재작성: {detail}"
                logger.info(f"조정: {area} → rewrite ({detail})")
            elif action == "kei_trim":
                max_chars = adj.get("max_chars", 200)
                guide = blk.setdefault("char_guide", {})
                for key in guide:
                    guide[key] = min(guide[key], max_chars)
                if not guide:
                    blk["char_guide"] = {"text": max_chars}
                logger.info(
                    f"조정: {area} → kei_trim max_chars={max_chars} "
                    f"({detail})"
                )
            elif action == "kei_restructure":
                blk["detail_target"] = True
                blk.pop("data", None)
                blk["reason"] = f"재구성: {detail}"
                logger.info(
                    f"조정: {area} → kei_restructure (detail_target)"
                )
    # Re-edit with the adjusted guides (legacy — not used in Phase S).
    from src.content_editor import fill_content
    layout_concept = await fill_content(content, layout_concept)
    return layout_concept
def _build_overflow_context(
layout_concept: dict[str, Any],
overflow_adjs: list[dict],
) -> list[dict]:
"""Sonnet이 감지한 overflow_detected를 Kei에게 전달할 형태로 변환한다.
실제 채워진 블록 데이터(텍스트)를 포함하여 Kei가 판단할 수 있도록 한다.
"""
overflows = []
for adj in overflow_adjs:
area = adj.get("block_area", "")
# 해당 zone의 블록 정보 + 실제 텍스트 추출
area_blocks = []
for page in layout_concept.get("pages", []):
for block in page.get("blocks", []):
if block.get("area") == area:
data = block.get("data", {})
text_preview = json.dumps(data, ensure_ascii=False)[:300]
area_blocks.append({
"type": block.get("type", ""),
"purpose": block.get("purpose", ""),
"topic_id": block.get("topic_id"),
"text_preview": text_preview,
})
overflows.append({
"area": area,
"detail": adj.get("detail", ""),
"blocks": area_blocks,
})
return overflows
def _convert_kei_judgment(
review_result: dict[str, Any],
kei_judgment: dict[str, Any],
) -> None:
"""Kei의 trim/restructure 판단을 review_result.adjustments에 반영한다.
기존 overflow_detected 항목을 kei_trim 또는 kei_restructure로 교체.
"""
decision = kei_judgment.get("decision", "")
new_adjs = []
for adj in review_result.get("adjustments", []):
if adj.get("action") == "overflow_detected":
# overflow_detected → Kei 판단으로 교체
if decision == "trim":
for target in kei_judgment.get("trim_targets", []):
new_adjs.append({
"block_area": adj.get("block_area", ""),
"action": "kei_trim",
"max_chars": target.get("max_chars", 200),
"topic_id": target.get("topic_id"),
"detail": target.get("reason", ""),
})
elif decision == "restructure":
for tid in kei_judgment.get("detail_topics", []):
new_adjs.append({
"block_area": adj.get("block_area", ""),
"action": "kei_restructure",
"topic_id": tid,
"detail": kei_judgment.get("reason", ""),
})
else:
# 기존 expand/shrink/rewrite는 그대로 유지
new_adjs.append(adj)
review_result["adjustments"] = new_adjs
def _parse_json(text: str) -> dict[str, Any] | None:
"""텍스트에서 JSON을 추출한다."""
patterns = [
r"```json\s*(.*?)```",
r"```\s*(.*?)```",
r"(\{.*\})",
]
for pattern in patterns:
match = re.search(pattern, text, re.DOTALL)
if match:
try:
return json.loads(match.group(1).strip())
except json.JSONDecodeError:
continue
return None