"""Phase T: 11-Stage 파이프라인. Stage 0: MDX 표준화 (코드) Stage 1A: Kei 꼭지 추출 (AI) Stage 1B: 컨셉 구체화 (AI) Stage 1.5a: 폰트 위계 + 컨테이너 비율 역산 (코드) Stage 1.7: 참고 블록 선택 (코드) Stage 1.5b: 디자인 예산 재계산 (코드) Stage 2: HTML 생성 (AI — Claude Sonnet) Stage 3: 렌더링 조립 (코드) Stage 4: 품질 게이트 (Selenium + Opus Vision) Stage 5: 서빙 (코드) 이전 Phase S 파이프라인은 generate_slide_legacy()로 보존. """ from __future__ import annotations import asyncio import json import logging import re import time from pathlib import Path from typing import Any, AsyncIterator import anthropic from src.pipeline_context import ( PipelineContext, create_context, StageFailure, build_retry_feedback, Topic, NormalizedContent, Analysis, PageStructure, ContainerInfo, TextBudget, DesignBudget, FontHierarchy, BlockReference, ) from src.kei_client import classify_content, refine_concepts, generate_structured_text from src.design_director import LAYOUT_PRESETS, select_preset from src.image_utils import get_image_sizes, embed_images from src.space_allocator import calculate_container_specs from src.slide_measurer import measure_rendered_heights, capture_slide_screenshot from src.config import settings logger = logging.getLogger(__name__) # Kei API 재시도 설정 (P0 수정: 무한 루프 방지) KEI_RETRY_INTERVAL = 10 KEI_MAX_RETRY_ATTEMPTS = 30 # 최대 30회 (5분) KEI_MAX_RETRY_DURATION = 300 # 절대 제한 300초 # ══════════════════════════════════════ # T-0.B: run_stage() 공통 실행 패턴 # ══════════════════════════════════════ async def run_stage( stage_fn, context: PipelineContext, stage_name: str, max_retries: int = 0, ) -> PipelineContext: """모든 Stage의 공통 실행 패턴. transform(context) → validate(result) → update(context) → snapshot() Args: stage_fn: async def stage(context) -> dict. 성공 시 업데이트할 필드 dict 반환. 실패 시 dict에 "_errors" 키 포함. context: 현재 누적 컨텍스트 stage_name: 스냅샷 파일명 + 로그용 max_retries: RETRYABLE 에러 시 재시도 횟수. 코드 Stage는 0. """ for attempt in range(max_retries + 1): result = await stage_fn(context) errors = result.pop("_errors", []) if not errors: # 성공: 컨텍스트에 병합 + 스냅샷 context = context.model_copy(update=result) context.save_snapshot(stage_name) logger.info(f"[{stage_name}] 완료") return context # 에러 처리 severity = errors[0].get("severity", "RETRYABLE") if errors else "RETRYABLE" if severity == "FATAL": context.log_error(stage_name, errors, attempt, "FATAL") raise StageFailure(stage_name, errors) if severity == "ADJUSTABLE": # 코드로 자동 조정 — _adjustments 키에 조정된 값이 있으면 적용 adjustments = result.pop("_adjustments", {}) if adjustments: context = context.model_copy(update=adjustments) context.warnings.append(f"[{stage_name}] 자동 조정: {errors}") context.save_snapshot(stage_name) logger.warning(f"[{stage_name}] ADJUSTABLE 자동 조정 적용") return context # RETRYABLE context.log_error(stage_name, errors, attempt, "RETRYABLE") if attempt < max_retries: # Self-Refine 피드백 생성 context.retry_feedback = build_retry_feedback( stage_name, errors, context.raw_content[:500] ) logger.warning( f"[{stage_name}] 재시도 {attempt + 1}/{max_retries}: " f"{[e.get('localization', '') for e in errors]}" ) else: logger.error(f"[{stage_name}] 재시도 소진 ({max_retries}회)") raise StageFailure(stage_name, errors) # ══════════════════════════════════════ # T-0.C: Phase T 파이프라인 (11 Stage) # ══════════════════════════════════════ async def generate_slide( content: str, manual_layout: dict[str, Any] | None = None, base_path: str = "", ) -> AsyncIterator[dict[str, str]]: """Phase T 파이프라인: MDX → 슬라이드 HTML. 11 Stage를 순차 실행하며 SSE progress 이벤트를 yield. 각 Stage는 run_stage() 패턴을 따름. """ ctx = create_context(content, base_path) try: # ── Stage 0: MDX 표준화 ── yield {"event": "progress", "data": "0/7 MDX 표준화 중..."} async def stage_0(context: PipelineContext) -> dict: from src.mdx_normalizer import normalize_mdx_content, validate_stage0 result = normalize_mdx_content(context.raw_content) errors = validate_stage0(result, context.raw_content) if errors: return {"_errors": errors} # popup_id 부여 (Stage 0 시점) from src.pipeline_context import PopupItem raw_popups = result.get("popups", []) popup_items = [] for pi, rp in enumerate(raw_popups, 1): popup_items.append(PopupItem( popup_id=f"popup_{pi}", title=rp.get("title", ""), content=rp.get("content", ""), )) return { "normalized": NormalizedContent( clean_text=result["clean_text"], title=result["title"], images=result["images"], popups=popup_items, tables=result["tables"], sections=result["sections"], ), } ctx = await run_stage(stage_0, ctx, "stage_0", max_retries=0) # ── Stage 1A: Kei 꼭지 추출 ── yield {"event": "progress", "data": "1/7 Kei 실장이 꼭지를 추출 중..."} async def stage_1a(context: PipelineContext) -> dict: if manual_layout: analysis_raw = manual_layout else: # Stage 0에서 정규화된 텍스트를 Kei에게 전달 (JSX/frontmatter 제거됨) input_text = context.normalized.clean_text or context.raw_content analysis_raw = await _retry_kei(classify_content, input_text) topics_raw = analysis_raw.get("topics", []) # Kei 응답에 있는 키만 전달, 없는 건 Pydantic 기본값 사용 topics = [Topic(**{k: v for k, v in t.items() if k in Topic.model_fields}) for t in topics_raw] page_struct_raw = analysis_raw.get("page_structure", {}) page_structure = PageStructure(roles=page_struct_raw) # X'-1: 제목은 원본 MDX frontmatter에서 가져옴 (Kei가 바꾸지 않음) original_title = context.normalized.title or analysis_raw.get("title", "") analysis = Analysis( core_message=analysis_raw.get("core_message", ""), conclusion_text=analysis_raw.get("conclusion_text", ""), title=original_title, total_pages=analysis_raw.get("total_pages", 1), layout_template=analysis_raw.get("layout_template", "A"), ) # I-6: 슬라이드 제목 ↔ 첫 꼭지 제목 중복 검증 if topics: from difflib import SequenceMatcher similarity = SequenceMatcher( None, analysis.title, topics[0].title ).ratio() if similarity > 0.7: topics[0].title = f"{topics[0].purpose}: {topics[0].summary[:30]}" logger.warning(f"[제목 중복 교정] 유사도 {similarity:.0%}") # T-2: Stage 1A 검증 (Pydantic + 원본 대조) from src.validators import validate_stage_1a validation_errors = validate_stage_1a( analysis_raw, context.normalized.clean_text, ) if validation_errors: return {"_errors": validation_errors} # Phase Y: page_structure는 Kei가 만들지 않음. # Kei 응답에 page_structure가 있어도 무시. # 코드가 section_parser + 블록 매칭으로 생성 (Stage 1A 후 별도 단계) return { "analysis": analysis, "topics": topics, "page_structure": PageStructure(roles={}), # 빈 상태, 아래에서 채움 } ctx = await run_stage(stage_1a, ctx, "stage_1a", max_retries=2) # ── Phase Y: 영역 확정 (코드: normalized.sections 기반 + 블록 매칭) ── from src.section_parser import extract_major_sections, extract_conclusion_text, map_topics_to_sections, classify_group_relations, get_candidate_blocks_for_schema, detect_component_popups from src.block_reference import _match_by_tags, _load_catalog # source of truth = normalized.sections (Stage 0 산출물) norm_sections = ctx.normalized.sections if hasattr(ctx.normalized, 'sections') else [] if hasattr(norm_sections, '__iter__') and norm_sections: if hasattr(norm_sections[0], 'model_dump'): norm_sections = [s.model_dump() for s in norm_sections] elif not isinstance(norm_sections[0], dict): norm_sections = [dict(s) for s in norm_sections] major_sections = extract_major_sections(norm_sections) # popup 대상 sub_title 목록 (컴포넌트 태그가 있던 섹션) popup_sub_titles = [] component_tags = re.findall(r'<([A-Z]\w+)\s*/>', ctx.raw_content) if component_tags: raw_lines = ctx.raw_content.split("\n") for tag_name in component_tags: current_section = "" for line in raw_lines: if line.strip().startswith("### "): current_section = re.sub(r'^#{1,3}\s*\d*\.?\d*\s*', '', line.strip()).strip() if f"<{tag_name}" in line: if current_section: popup_sub_titles.append(current_section) break # Y-13b: group relation 분류 major_sections = classify_group_relations( major_sections, normalized_sections=norm_sections, popup_sub_titles=popup_sub_titles, ) # conclusion_text: raw MDX에서 추출 또는 기존 값 정제 conclusion_text = ctx.analysis.conclusion_text or "" if not conclusion_text: conclusion_text = extract_conclusion_text(ctx.raw_content) # 선행 불릿 마커 정제 (Kei가 * 포함해서 넣을 수 있음) if conclusion_text: conclusion_text = re.sub(r'^[\*•\-]\s*', '', conclusion_text).strip() conclusion_text = re.sub(r'\*+', '', conclusion_text).strip() if conclusion_text != (ctx.analysis.conclusion_text or ""): ctx = ctx.model_copy(update={ "analysis": ctx.analysis.model_copy(update={"conclusion_text": conclusion_text}), }) # 꼭지-대목차 매핑 topic_dicts = [t.model_dump() for t in ctx.topics] section_topic_map = map_topics_to_sections(topic_dicts, major_sections) # 대목차별 묶음으로 블록 tag 매칭 → 영역 확정 # 블록 매칭 기준: Kei topic 수가 아니라 normalized sub_titles 수 catalog = _load_catalog() page_struct_roles = {} zone_names = ["top", "bottom"] # major_sections를 title로 빠르게 찾기 major_sec_map = {s["title"]: s for s in major_sections} for i, (sec_title, tids) in enumerate(section_topic_map.items()): # sub_titles = normalized.sections에서 파싱된 소항목 목록 sec = major_sec_map.get(sec_title, {}) sub_titles = sec.get("sub_titles", []) # sidebar 판단: 모든 꼭지가 reference면 sidebar all_reference = all( t.role == "reference" for t in ctx.topics if t.id in tids ) if all_reference: zone = "sidebar" elif i < len(zone_names): zone = zone_names[i] else: zone = f"bottom_{i}" # 블록 매칭: sub_titles 수 기준 (Kei topic 수 아님) slot_count = len(sub_titles) if sub_titles else len(tids) tag_match = _match_by_tags(catalog, slot_count, sub_titles, 300, zone) if tag_match: logger.info(f"[Phase Y] '{sec_title}' → 블록 {tag_match['id']} (tag_match) 확정") else: # Y-13d: tag 매칭 실패 → group schema 후보로 블록 찾기 group_schema = sec.get("group_schema", "") schema_candidates = get_candidate_blocks_for_schema(group_schema) if schema_candidates: # catalog에서 첫 번째 존재하는 후보 선택 for cand_id in schema_candidates: cand = next((b for b in catalog if b.get("id") == cand_id), None) if cand: tag_match = cand # schema 기반 선택 logger.info(f"[Y-13d] '{sec_title}' → schema={group_schema} → 블록 {cand_id}") break if not tag_match: logger.warning(f"[Y-13d] '{sec_title}' → schema={group_schema} → 블록 매칭 실패") # weight: 콘텐츠 양(content 길이) 기반 sec_content_len = len(sec.get("content", "")) page_struct_roles[sec_title] = { "zone": zone, "topic_ids": tids, "weight": sec_content_len, "sub_titles": sub_titles, "sub_types": sec.get("sub_types", []), "group_schema": sec.get("group_schema", ""), } # weight를 비율로 변환 (합계 1.0) total_content = sum(info["weight"] for info in page_struct_roles.values()) if total_content > 0: for role in page_struct_roles: page_struct_roles[role]["weight"] = round(page_struct_roles[role]["weight"] / total_content, 2) else: # content가 없으면 균등 분배 n = len(page_struct_roles) for role in page_struct_roles: page_struct_roles[role]["weight"] = round(1.0 / n, 2) # Phase Y: layout_template도 코드가 결정 # sidebar zone이 있으면 Type A, 없으면 Type B has_sidebar = any( info.get("zone") == "sidebar" for info in page_struct_roles.values() ) determined_layout = "A" if has_sidebar else "B" logger.info(f"[Phase Y] 영역 확정: {list(page_struct_roles.keys())} → layout={determined_layout}") ctx = ctx.model_copy(update={ "page_structure": PageStructure(roles=page_struct_roles), "mdx_sections": major_sections, # normalized.sections 기반 대목차 (assembler용) "analysis": ctx.analysis.model_copy(update={"layout_template": determined_layout}), }) # Phase Y: page_structure 검증 (section_parser가 만든 결과) from src.validators import validate_page_structure ps_errors = validate_page_structure(page_struct_roles) if ps_errors: logger.warning(f"[Phase Y] page_structure 검증 경고: {ps_errors}") # Y-14: 컴포넌트 popup 감지 + target_role 확정 component_popups = detect_component_popups(ctx.raw_content, ctx.base_path or "samples/mdx") if component_popups: from src.pipeline_context import PopupItem existing_popups = list(ctx.normalized.popups or []) # target_role 결정: raw MDX에서 컴포넌트 태그가 어느 ## 섹션에 있었는지 raw_lines = ctx.raw_content.split("\n") for cp in component_popups: tag = cp.get("tag", f"<{cp['name']} />") target_role = None current_section = None for line in raw_lines: if line.strip().startswith("## "): # ## 번호 제거: "## 2. DX 기반..." → "DX 기반..." # "## 2. DX 기반..." → "DX 기반..." sec_title = re.sub(r'^#{1,3}\s*\d*\.?\s*', '', line.strip()).strip() # page_structure roles에서 매칭 for rname in page_struct_roles: if sec_title and len(sec_title) >= 3 and sec_title[:6] in rname: current_section = rname break if tag.replace(" ", "") in line.replace(" ", ""): target_role = current_section break existing_popups.append(PopupItem( popup_id=f"comp_{cp['name']}", title=f"상세: {cp['name']}", content=cp["content_html"], source=cp["source"], is_component=True, target_role=target_role, )) logger.info(f"[Y-14] 컴포넌트 popup: {cp['name']} → target_role='{target_role}'") ctx = ctx.model_copy(update={ "normalized": ctx.normalized.model_copy(update={"popups": existing_popups}), }) logger.info(f"[Y-14] 컴포넌트 popup {len(component_popups)}개 추가") # ── Stage 1B: 컨셉 구체화 ── yield {"event": "progress", "data": "1.5/7 컨셉 구체화 중..."} async def stage_1b(context: PipelineContext) -> dict: # manual_layout에서 이미 source_data/structured_text가 있으면 스킵 has_source = any(t.source_data for t in context.topics) if has_source: logger.info("[1단계-B] manual_layout에서 source_data 제공됨 — Kei 호출 스킵") return {"topics": list(context.topics)} # Kei에게 원본 + 1A 결과 전달 analysis_dict = { "topics": [t.model_dump() for t in context.topics], "page_structure": context.page_structure.roles, "core_message": context.analysis.core_message, "title": context.analysis.title, } input_text = context.normalized.clean_text or context.raw_content refined = await refine_concepts(input_text, analysis_dict) if refined is None: return {"_errors": [{"severity": "RETRYABLE", "localization": "refine_concepts 반환 None"}]} # 1B 결과를 topics에 병합 refined_topics = refined.get("topics", []) updated_topics = [] for t in context.topics: match = next((rt for rt in refined_topics if rt.get("id") == t.id), None) if match: updated = t.model_copy(update={ "relation_type": match.get("relation_type", t.relation_type), "expression_hint": match.get("expression_hint", t.expression_hint), "source_data": match.get("source_data", t.source_data), }) updated_topics.append(updated) else: updated_topics.append(t) # T-2: Stage 1B 검증 (모순 탐지 + 원본 대조) from src.validators import validate_stage_1b validation_errors = validate_stage_1b( [t.model_dump() for t in updated_topics], context.normalized.clean_text, raw_content=context.raw_content, layout_template=context.analysis.layout_template, ) if validation_errors: return {"_errors": validation_errors} return {"topics": updated_topics} ctx = await run_stage(stage_1b, ctx, "stage_1b", max_retries=2) # ── Stage 1B 보완: structured_text 생성 (별도 Kei 호출) ── has_structured = any(t.structured_text for t in ctx.topics) if has_structured: logger.info("[1단계-B-ST] structured_text 이미 있음 — Kei 호출 스킵") analysis_for_st = {"topics": [t.model_dump() for t in ctx.topics]} else: input_text = ctx.normalized.clean_text or ctx.raw_content analysis_for_st = { "topics": [t.model_dump() for t in ctx.topics], } analysis_for_st = await generate_structured_text(input_text, analysis_for_st) # structured_text를 topics에 병합 st_map = {t["id"]: t.get("structured_text", "") for t in analysis_for_st["topics"]} updated_topics = [] for t in ctx.topics: st = st_map.get(t.id, "") if st: updated_topics.append(t.model_copy(update={"structured_text": st})) else: updated_topics.append(t) ctx = ctx.model_copy(update={"topics": updated_topics}) ctx.save_snapshot("stage_1b") # structured_text 포함하여 다시 저장 logger.info(f"[1단계-B-ST] structured_text 병합 완료") # ── Stage 1.5a: 컨테이너 스펙 계산 ── yield {"event": "progress", "data": "2/7 컨테이너 계산 중..."} async def stage_1_5a(context: PipelineContext) -> dict: from src.space_allocator import calculate_font_hierarchy, calculate_dynamic_ratio # 이미지 크기 측정 image_sizes = get_image_sizes(context.raw_content, context.base_path) # 역할별 텍스트 양 측정 role_text_lengths = {} for role, info in context.page_structure.roles.items(): if not isinstance(info, dict): continue role_text = context.get_role_content(role) role_text_lengths[role] = len(role_text) # T-5: 폰트 위계 확정 (텍스트 양 기반, 역할 인식) font_hierarchy_dict = calculate_font_hierarchy(role_text_lengths) font_hierarchy = FontHierarchy( key_msg=font_hierarchy_dict.get("핵심", 14.0), core=font_hierarchy_dict.get("본심", 12.0), bg=font_hierarchy_dict.get("배경", 11.0), sidebar=font_hierarchy_dict.get("첨부", 10.0), ) # 프리셋 선택 (비율 계산보다 먼저 — 프리셋의 기본 비율을 fallback으로 사용) analysis_dict = { "topics": [t.model_dump() for t in context.topics], "page_structure": context.page_structure.roles, } preset_name = select_preset(analysis_dict) preset = LAYOUT_PRESETS.get(preset_name, {}) # T-5: 동적 비율 역산 (sidebar 텍스트 양 기반 + 프리셋 기본 비율) container_ratio = calculate_dynamic_ratio( role_text_lengths, font_hierarchy_dict, slide_width=settings.slide_width, slide_height=settings.slide_height, preset=preset, ) logger.info( f"[T-5] 폰트 위계: 핵심={font_hierarchy.key_msg}, 본심={font_hierarchy.core}, " f"배경={font_hierarchy.bg}, 첨부={font_hierarchy.sidebar} / " f"비율: body:sidebar={container_ratio[0]}:{container_ratio[1]}" ) # Phase X-B: 유형에 따라 컨테이너 생성 분기 if context.analysis.layout_template in ("B", "B'", "B''"): from src.space_allocator import build_containers_type_b container_specs = build_containers_type_b( page_structure=context.page_structure.roles, slide_width=settings.slide_width, slide_height=settings.slide_height, image_sizes=image_sizes if isinstance(image_sizes, list) else ( [{**v, "key": k} for k, v in image_sizes.items()] if image_sizes else None ), ) logger.info(f"[X-B] 유형 B 컨테이너 생성") else: # 유형 A: 기존 코드 그대로 container_specs = calculate_container_specs( page_structure=context.page_structure.roles, topics=[t.model_dump() for t in context.topics], preset=preset, slide_width=settings.slide_width, slide_height=settings.slide_height, ) # ContainerSpec → ContainerInfo 변환 containers = {} for role, spec in container_specs.items(): containers[role] = ContainerInfo( role=spec.role, zone=spec.zone, topic_ids=spec.topic_ids, weight=spec.weight, height_px=spec.height_px, width_px=spec.width_px, max_height_cost=spec.max_height_cost, block_constraints=spec.block_constraints, ) # 이미지 정보 구성 slide_images = [] if image_sizes: import base64 as b64_mod # image_sizes가 list[dict]이면 직접 순회, dict이면 items() img_items = image_sizes if isinstance(image_sizes, list) else [ {**v, "key": k} for k, v in image_sizes.items() ] for img_info in img_items: img_key = img_info.get("path", img_info.get("key", "")) img_path = Path(context.base_path) / Path(img_key).name if context.base_path else Path(img_key) img_b64 = "" if img_path.exists(): img_b64 = b64_mod.b64encode(img_path.read_bytes()).decode() slide_images.append({ "path": str(img_path), "width": img_info.get("width", 0), "height": img_info.get("height", 0), "ratio": round(img_info.get("width", 1) / max(1, img_info.get("height", 1)), 2), "topic_id": img_info.get("topic_id"), "b64": img_b64, }) updated_analysis = context.analysis.model_copy(update={ "image_sizes": image_sizes or {}, }) return { "preset_name": preset_name, "preset": preset, "containers": containers, "slide_images": slide_images, "analysis": updated_analysis, "font_hierarchy": font_hierarchy, "container_ratio": container_ratio, } ctx = await run_stage(stage_1_5a, ctx, "stage_1_5a", max_retries=0) # ── Stage 1.7: 참고 블록 선택 + 디자인 레퍼런스 ── yield {"event": "progress", "data": "2.5/7 참고 블록 선택 중..."} async def stage_1_7(context: PipelineContext) -> dict: from src.block_reference import select_and_generate_references references_raw = select_and_generate_references( topics=[t.model_dump() for t in context.topics], containers=context.containers, page_structure=context.page_structure.roles, ) # V-1: dict → list[BlockReference] 모델 변환 (꼭지별) references = {} for role, ref_list in references_raw.items(): references[role] = [ BlockReference( block_id=rd["block_id"], variant=rd["variant"], visual_type=rd["visual_type"], schema_info=rd["schema_info"], design_reference_html=rd["design_reference_html"], topic_id=rd.get("topic_id"), supporting_topic_ids=rd.get("supporting_topic_ids", []), is_hierarchical=rd.get("is_hierarchical", False), ) for rd in ref_list ] return {"references": references} ctx = await run_stage(stage_1_7, ctx, "stage_1_7", max_retries=0) # ── Stage 1.8: 콘텐츠-컨테이너 적합성 검증 + 재배분 + 보강 ── yield {"event": "progress", "data": "2.8/7 적합성 검증 중..."} async def stage_1_8(context: PipelineContext) -> dict: from src.fit_verifier import ( calculate_fit, redistribute, analyze_enhancements, apply_enhancements, build_escalation_report, build_enhancement_report, calculate_sub_layout, EnhancementAnalysis, ) from src.block_assembler import assemble_slide_html_final as assemble_slide_html from src.slide_measurer import measure_rendered_heights refs_dict = {} for role, ref_list in context.references.items(): refs_dict[role] = [r.model_dump() for r in ref_list] containers_dict = {} for role, ci in context.containers.items(): containers_dict[role] = { "height_px": ci.height_px, "width_px": ci.width_px, "zone": ci.zone, } font_h = context.font_hierarchy.model_dump() normalized = context.normalized.model_dump() core_message = context.analysis.core_message # ── before: 비중대로 배정된 컨테이너 (현재 context.containers) ── run_dir = context.get_run_dir() steps_dir = run_dir / "steps" steps_dir.mkdir(parents=True, exist_ok=True) # before 시각화 저장 (Stage 1.5a의 초기 배정 상태) from src.step_visualizer import _gen_stage_1_8_fit_before _gen_stage_1_8_fit_before(context, steps_dir) logger.info(f"[Stage 1.8] before: " + ", ".join( f"{r}={ci.height_px}px" for r, ci in context.containers.items() )) # ── filled 전 sub_layouts 계산 (이미지/텍스트/keymsg 배치 결정) ── initial_fit = calculate_fit( topics=[t.model_dump() for t in context.topics], page_structure=context.page_structure.roles, containers=containers_dict, references=refs_dict, font_hierarchy=font_h, normalized=normalized, core_message=core_message, ) empty_enhancements = EnhancementAnalysis() pre_sub_layouts = {} for role, rf in initial_fit.roles.items(): ci = context.containers.get(role) if not ci or not rf.topic_fits: continue layout = calculate_sub_layout( role=role, main_height_px=ci.height_px, main_width_px=ci.width_px, topic_fits=rf.topic_fits, enhancements=empty_enhancements, font_hierarchy=font_h, ) pre_sub_layouts[role] = { "main_height_px": layout.main_height_px, "main_width_px": layout.main_width_px, "sub_containers": [ {"name": sc.name, "width_px": sc.width_px, "height_px": sc.height_px, "align": sc.align} for sc in layout.sub_containers ], "table_rows": layout.table_rows, } # context에 sub_layouts 반영 후 filled 생성 context = context.model_copy(update={"sub_layouts": pre_sub_layouts}) # ── filled→측정→Kei 재판단 루프 (최대 3회) ── kei_decisions = [] updated_containers = dict(context.containers) fit_analysis = None filled_measurement = {} font_scale = 1.0 # fit 루프에서 축소 MAX_FIT_RETRIES = 3 for fit_round in range(MAX_FIT_RETRIES): # context에 현재 kei_decisions 반영 (2회차부터 popup 결정이 반영됨) if kei_decisions: context = context.model_copy(update={ "enhancement_result": { **(context.enhancement_result or {}), "kei_decisions": kei_decisions, }, "containers": updated_containers, }) # ── filled: 컨테이너에 블록+텍스트 채움 (측정용: overflow:auto) ── filled_html = assemble_slide_html(context, measure_mode=True, font_scale=font_scale) (steps_dir / f"stage_1_8_filled{'_r'+str(fit_round) if fit_round else ''}.html").write_text( filled_html.replace('', '\n' f'
' f'Stage 1.8: filled (round {fit_round+1}/{MAX_FIT_RETRIES})
\n' '
' 'before 컨테이너에 블록+텍스트를 채움. 넘치는 영역 확인.
\n', 1), encoding="utf-8", ) # ── Selenium 측정 ── filled_measurement = await asyncio.to_thread(measure_rendered_heights, filled_html) logger.info(f"[Stage 1.8] round {fit_round+1} 측정 완료") # ── overflow 확인 ── has_overflow = False for zone_name, zone_data in filled_measurement.get("zones", {}).items(): if zone_data.get("overflowed"): excess = zone_data.get("excess_px", 0) scroll_h = zone_data.get("scrollHeight", 0) has_overflow = True if zone_name == "sidebar": for role, ci in updated_containers.items(): if ci.zone == "sidebar": new_h = max(ci.height_px, scroll_h + 10) updated_containers[role] = ci.model_copy(update={"height_px": new_h}) logger.info(f"[Stage 1.8] sidebar 확장: {role} → {new_h}px") else: logger.info(f"[Stage 1.8] {zone_name} overflow +{excess}px") if not has_overflow: logger.info(f"[Stage 1.8] round {fit_round+1}: overflow 없음 — 완료") break # ── fit 계산 + 재배분 ── for role, ci in updated_containers.items(): containers_dict[role] = { "height_px": ci.height_px, "width_px": ci.width_px, "zone": ci.zone, } fit_analysis = calculate_fit( topics=[t.model_dump() for t in context.topics], page_structure=context.page_structure.roles, containers=containers_dict, references=refs_dict, font_hierarchy=font_h, normalized=normalized, core_message=core_message, ) fit_analysis = redistribute(fit_analysis, containers_dict) # Type B: Selenium 실측 기반 zone 간 재배분 if context.analysis.layout_template in ("B", "B'", "B''"): # Selenium 측정에서 실제 overflow/여유를 가져옴 zone_to_roles = {} for role, ci in updated_containers.items(): # Selenium CSS 클래스 매핑: bottom_left/bottom_right → bottom z = ci.zone if z in ("bottom_left", "bottom_right"): z = "bottom" if z not in zone_to_roles: zone_to_roles[z] = [] zone_to_roles[z].append(role) deficit_roles = [] surplus_roles = [] for zn, zd in filled_measurement.get("zones", {}).items(): excess = zd.get("excess_px", 0) scroll_h = zd.get("scrollHeight", 0) roles_in_zone = zone_to_roles.get(zn, []) if excess > 0: for r in roles_in_zone: deficit_roles.append((r, float(excess))) elif roles_in_zone: # 실제 콘텐츠(scrollHeight)와 할당 높이 차이로 여유 계산 allocated = sum(updated_containers[r].height_px for r in roles_in_zone if r in updated_containers) slack = allocated - scroll_h if slack > 8: for r in roles_in_zone: surplus_roles.append((r, float(slack))) if deficit_roles and surplus_roles: total_deficit = sum(d for _, d in deficit_roles) total_surplus = sum(s for _, s in surplus_roles) # surplus의 최대 50%만 이전 — 하단 최소 공간 보장 transferable = min(total_deficit, total_surplus * 0.5) if transferable > 0: for role, deficit in deficit_roles: share = transferable * (deficit / total_deficit) old = fit_analysis.redistribution.get(role, fit_analysis.roles[role].allocated_px) fit_analysis.redistribution[role] = old + share for role, surplus in surplus_roles: share = transferable * (surplus / total_surplus) old = fit_analysis.redistribution.get(role, fit_analysis.roles[role].allocated_px) fit_analysis.redistribution[role] = old - share logger.info(f"[Stage 1.8] zone 간 재배분: {transferable:.0f}px 이전") # 재배분된 컨테이너 크기 적용 for role, ci in updated_containers.items(): new_h = fit_analysis.redistribution.get(role, ci.height_px) if fit_analysis.redistribution else ci.height_px updated_containers[role] = ci.model_copy(update={"height_px": int(new_h)}) logger.info(f"[Stage 1.8] round {fit_round+1} after: " + ", ".join( f"{r}={ci.height_px}px" for r, ci in updated_containers.items() )) # ── Phase Y: font_scale 축소 (재배분만으로 부족할 때) ── # 재배분 후에도 여전히 overflow면 font를 줄임 font_scale = max(0.7, font_scale - 0.1) logger.info(f"[Stage 1.8] round {fit_round+1}: font_scale → {font_scale:.1f}") # ── Kei 에스컬레이션: overflow 있으면 팝업 분리 판단 요청 ── # calculate_fit의 needs_escalation 또는 Selenium 측정의 실제 overflow if fit_analysis.needs_escalation or has_overflow: from src.kei_client import call_kei_fit_escalation report = build_escalation_report(fit_analysis) # Selenium 실측 overflow 정보를 report에 추가 (calculate_fit과 실측이 다를 수 있음) selenium_overflow_lines = [] for zn, zd in filled_measurement.get("zones", {}).items(): if zd.get("overflowed"): selenium_overflow_lines.append( f" ❌ {zn} zone: 실측 {zd.get('scrollHeight')}px / 가용 {zd.get('clientHeight')}px → +{zd.get('excess_px', 0)}px 초과" ) if selenium_overflow_lines: report += "\n\nSelenium 실측 overflow:\n" + "\n".join(selenium_overflow_lines) logger.info(f"[Stage 1.8] round {fit_round+1} 에스컬레이션 필요") kei_result = await call_kei_fit_escalation( fit_report=report, topics=[t.model_dump() for t in context.topics], content_summary=context.raw_content[:1500], role_names=list(context.page_structure.roles.keys()), ) if kei_result: new_decisions = kei_result.get("decisions", []) kei_decisions.extend(new_decisions) logger.info(f"[Stage 1.8] Kei 결정 {len(new_decisions)}건 추가 (누적 {len(kei_decisions)}건)") for d in new_decisions: logger.info(f"[V-4] {d.get('role','')} → {d.get('action','')}: {d.get('detail','')[:60]}") else: logger.warning(f"[Stage 1.8] round {fit_round+1} Kei 응답 없음 — 루프 종료") break else: logger.info(f"[Stage 1.8] round {fit_round+1}: 재배분으로 해결됨") break # Step 4: 보강 제안 분석 (fit_analysis가 있을 때만) if fit_analysis: enhancements = analyze_enhancements( topics=[t.model_dump() for t in context.topics], page_structure=context.page_structure.roles, references=refs_dict, analysis=fit_analysis, normalized=normalized, core_message=core_message, ) else: enhancements = EnhancementAnalysis() logger.info("[Stage 1.8] overflow 없음 — 보강 분석 스킵") # Step 5: Kei에게 보강 제안 확인 요청 if enhancements.enhancements: from src.kei_client import call_kei_enhancement_review report = build_enhancement_report(enhancements) logger.info(f"[Stage 1.8] Kei 보강 검토 요청: {len(enhancements.enhancements)}건") kei_review = await call_kei_enhancement_review( enhancement_report=report, topics=[t.model_dump() for t in context.topics], core_message=core_message, ) # Kei 결정 반영: reject된 것 제거, modify된 것 수정 if kei_review and kei_review.get("decisions"): for decision in kei_review["decisions"]: action = decision.get("action", "approve") d_type = decision.get("type", "") d_role = decision.get("role", "") modification = decision.get("modification", "") if action == "reject": # 해당 enhancement 제거 enhancements.enhancements = [ e for e in enhancements.enhancements if not (e.type == d_type and e.role == d_role) ] logger.info(f"[V-5] {d_role}/{d_type} → 거부됨") elif action == "modify" and modification: # 해당 enhancement 수정 for e in enhancements.enhancements: if e.type == d_type and e.role == d_role: e.description = modification logger.info(f"[V-5] {d_role}/{d_type} → 수정: {modification[:50]}") else: logger.info(f"[V-5] {d_role}/{d_type} → 승인") # Step 6: 승인된 보강 적용 + fit 재검증 enhancements = apply_enhancements(enhancements, fit_analysis) # V-10: Kei가 문맥 기반으로 bold 키워드 판단 from src.kei_client import call_kei_bold_keywords kei_bold = await call_kei_bold_keywords( topics=[t.model_dump() for t in context.topics], page_structure=context.page_structure.roles, ) if kei_bold: enhancements.bold_keywords = kei_bold logger.info(f"[V-10] Kei bold 키워드: {kei_bold}") logger.info(f"[Stage 1.8] 보강 확정: 보충블록 {len(enhancements.supplement_blocks)}개, " f"강조 {len(enhancements.emphasis_blocks)}개, " f"bold {len(enhancements.bold_keywords)}개 역할") # 재배분된 컨테이너 크기 업데이트 updated_containers = {} for role, ci in context.containers.items(): if fit_analysis and fit_analysis.redistribution: new_h = fit_analysis.redistribution.get(role, ci.height_px) else: new_h = ci.height_px updated_containers[role] = ci.model_copy(update={ "height_px": int(new_h), }) # Step 7: 세부 컨테이너 배치 계산 sub_layouts = {} fit_roles = fit_analysis.roles if fit_analysis else {} for role, rf in fit_roles.items(): new_h = fit_analysis.redistribution.get(role, rf.allocated_px) if fit_analysis and fit_analysis.redistribution else rf.allocated_px ci = context.containers.get(role) if not ci or not rf.topic_fits: continue layout = calculate_sub_layout( role=role, main_height_px=int(new_h), main_width_px=ci.width_px, topic_fits=rf.topic_fits, enhancements=enhancements, font_hierarchy=font_h, ) sub_layouts[role] = { "main_height_px": layout.main_height_px, "main_width_px": layout.main_width_px, "sub_containers": [ {"name": sc.name, "width_px": sc.width_px, "height_px": sc.height_px, "align": sc.align} for sc in layout.sub_containers ], "table_rows": layout.table_rows, } # V'-2: 팝업 원본을 Kei가 공간에 맞게 요약 popup_summaries = {} popups = context.normalized.popups or [] if popups: from src.kei_client import call_kei_summarize_popup for role, role_sub in sub_layouts.items(): role_scs = role_sub.get("sub_containers", []) text_sc = next((sc for sc in role_scs if sc["name"] in ("text_and_table", "text")), None) if not text_sc: continue # 텍스트 줄 수 계산 role_font_map = {"배경": "bg", "본심": "core", "첨부": "sidebar", "결론": "key_msg"} fk = role_font_map.get(role, "core") fs = font_h.get(fk, 12) # structured_text에서 팝업 마커 찾기 ps_info = context.page_structure.roles.get(role, {}) tids = ps_info.get("topic_ids", []) if isinstance(ps_info, dict) else [] topic_map = {t.id: t for t in context.topics} for tid in tids: topic = topic_map.get(tid) if not topic: continue st_text = topic.structured_text or topic.source_data or "" import re as _re popup_refs = _re.findall(r'\[팝업:\s*([^\]]+)\]', st_text) for pr in popup_refs: # 팝업 원본 찾기 popup = next((p for p in popups if pr in (p.title if hasattr(p, 'title') else p.get("title", ""))), None) if not popup: continue # 공란 계산: V'-4 적용 후 높이 (결론 바로 위까지 채움) from src.fit_verifier import _load_design_tokens as _ldt_v2 _v2_tokens = _ldt_v2() _v2_slide_h = _v2_tokens.get("slide_height", 720) _v2_pad = _v2_tokens["spacing_page"] _v2_header_h = _v2_tokens.get("header_height", 66) _v2_gap = _v2_tokens["spacing_block"] _v2_gap_small = _v2_tokens["spacing_small"] _v2_concl_ci = updated_containers.get("결론") or next((ci for r, ci in updated_containers.items() if ci.zone == "footer"), None) _v2_concl_h = _v2_concl_ci.height_px if _v2_concl_ci else 53 _v2_ft_top = _v2_slide_h - _v2_pad - _v2_concl_h - _v2_gap _v2_column_bottom = _v2_ft_top - _v2_gap _v2_bg_ci = next((ci for r, ci in updated_containers.items() if ci.zone == "body" and r != role), None) _v2_bg_h = _v2_bg_ci.height_px if _v2_bg_ci else 0 _v2_core_top = _v2_pad + _v2_header_h + _v2_gap + _v2_bg_h + _v2_gap_small after_h = _v2_column_bottom - _v2_core_top # 결론 위까지의 실제 본심 높이 keymsg_h = 0 for sc in role_scs: if sc.get("name") == "keymsg": keymsg_h = float(sc.get("height_px", 0)) title_h = (fs + 1) * 1.5 + 4 # 이미지 높이: 실제 비율로 계산 svg_sc = next((sc for sc in role_scs if sc.get("name") == "svg"), None) img_h = 0 if svg_sc: svg_w = float(svg_sc.get("width_px", 200)) img_ratio = next((img.get("ratio", 1) for img in (context.slide_images or []) if img.get("b64")), 1) img_h = svg_w / img_ratio if img_ratio > 0 else float(svg_sc.get("height_px", 0)) text_lines = len([l for l in st_text.split("\n") if l.strip() and not l.strip().startswith("[팝업:") and not l.strip().startswith("[이미지:") and not l.strip().lstrip("• ").startswith("출처:")]) text_h_used = text_lines * fs * 1.55 # 표 공간 = 전체 - 제목 - max(이미지,텍스트) - keymsg - padding upper_h = max(img_h, text_h_used) available_h = after_h - 16 - title_h - upper_h - keymsg_h - 8 available_w = float(text_sc["width_px"]) if available_h < fs * 3: continue # 공간 부족하면 건너뜀 summary = await call_kei_summarize_popup( popup_title=pr, popup_content=popup.content if hasattr(popup, 'content') else popup.get("content", ""), available_width_px=available_w, available_height_px=available_h, font_size=fs, ) if summary: popup_summaries[pr] = summary logger.info(f"[V'-2] {pr}: format={summary.get('format')}") # X'-6: 본문 표 요약 (유형 B — normalized.tables가 있으면) table_summaries = {} norm_tables = context.normalized.tables or [] if norm_tables and context.analysis.layout_template in ("B", "B'", "B''"): from src.kei_client import call_kei_summarize_popup for ti, table_data in enumerate(norm_tables): headers = table_data.get("headers", []) rows = table_data.get("rows", []) if not headers or not rows: continue # 표를 마크다운 형태로 변환하여 Kei에게 전달 md_table = "| " + " | ".join(headers) + " |\n" md_table += "| " + " | ".join(["---"] * len(headers)) + " |\n" for row in rows: md_table += "| " + " | ".join(str(c) for c in row) + " |\n" # 하단 우측 공간 계산 bottom_roles = [r for r, ci in updated_containers.items() if ci.zone in ("bottom_left", "bottom_right")] if bottom_roles: br_ci = next((ci for r, ci in updated_containers.items() if ci.zone == "bottom_right"), None) if br_ci: available_h = br_ci.height_px - 30 # 제목 + padding available_w = br_ci.width_px fs = font_h.get("core", 12) summary = await call_kei_summarize_popup( popup_title=f"본문표{ti+1}", popup_content=md_table, available_width_px=available_w, available_height_px=available_h, font_size=fs, ) if summary: table_summaries[f"table_{ti}"] = summary logger.info(f"[X'-6] 본문표{ti+1}: format={summary.get('format')}") # 결과를 context에 저장 (Stage 2에서 사용) return { "containers": updated_containers, "sub_layouts": sub_layouts, "measurement": filled_measurement, "font_scale": font_scale, # Phase Y: fit 루프에서 확정된 font 축소 비율 "fit_result": { "roles": { role: { "fit_status": rf.fit_status, "total_required_px": rf.total_required_px, "allocated_px": rf.allocated_px, "shortfall_px": rf.shortfall_px, } for role, rf in (fit_analysis.roles.items() if fit_analysis else {}.items()) }, "redistribution": fit_analysis.redistribution if fit_analysis else {}, "needs_escalation": fit_analysis.needs_escalation if fit_analysis else False, }, "enhancement_result": { "kei_decisions": kei_decisions, "subordinate_treatments": [ {"role": e.role, "detail": e.detail} for e in enhancements.enhancements if e.type == "subordinate" ], "supplement_blocks": [ {"role": sb.role, "block_id": sb.block_id, "content_source": sb.content_source} for sb in enhancements.supplement_blocks ], "emphasis_blocks": enhancements.emphasis_blocks, "bold_keywords": enhancements.bold_keywords, "popup_summaries": popup_summaries, "table_summaries": table_summaries, }, } ctx = await run_stage(stage_1_8, ctx, "stage_1_8", max_retries=0) # ── Stage 1.5b: 디자인 예산 재계산 (블록 선택 후) ── async def stage_1_5b(context: PipelineContext) -> dict: from src.space_allocator import calculate_design_budget updated_containers = {} for role, ci in context.containers.items(): # V-1: 꼭지별 블록 리스트 → 첫 번째 블록의 schema를 대표로 사용 ref_list = context.references.get(role, []) schema_info = ref_list[0].schema_info if ref_list else {} font_size = getattr(context.font_hierarchy, { "본심": "core", "배경": "bg", "첨부": "sidebar", "결론": "core" }.get(role, "core"), 12.0) budget = calculate_design_budget( container_height_px=ci.height_px, container_width_px=ci.width_px, block_schema=schema_info, font_size=font_size, ) updated_ci = ci.model_copy(update={ "design_budget": DesignBudget( available_height_px=budget["available_height_px"], available_width_px=budget["available_width_px"], max_circle_diameter=budget["max_circle_diameter"], max_img_width=budget["max_img_width"], max_img_height=budget["max_img_height"], fits=budget["fits"], ), }) updated_containers[role] = updated_ci if not budget["fits"]: logger.warning( f"[T-6] {role}: 디자인 예산 음수 — " f"text={budget['text_height_px']}px > container={ci.height_px}px" ) return {"containers": updated_containers} ctx = await run_stage(stage_1_5b, ctx, "stage_1_5b", max_retries=0) # ── Stage 2: HTML 생성 (Claude Sonnet) ── yield {"event": "progress", "data": "3/7 슬라이드 HTML 생성 중..."} async def stage_2(context: PipelineContext) -> dict: # Phase X-BX': Type B는 code_assembled 직접 사용, Sonnet 재구성 스킵 if context.analysis.layout_template in ("B", "B'", "B''"): from src.block_assembler import assemble_slide_html_final fs = context.font_scale if hasattr(context, 'font_scale') else 1.0 generated = assemble_slide_html_final(context, font_scale=fs) logger.info(f"[Stage 2] Type B: slide-base + 블록 (font_scale={fs:.1f})") return {"generated_html": generated} # Type A: 기존 Sonnet 재구성 코드 그대로 from src.content_verifier import generate_with_retry # PipelineContext → 기존 함수 인터페이스로 변환 analysis_dict = { "topics": [t.model_dump() for t in context.topics], "page_structure": context.page_structure.roles, "core_message": context.analysis.core_message, "title": context.analysis.title, "total_pages": context.analysis.total_pages, "image_sizes": context.analysis.image_sizes, } container_specs_dict = {} for role, ci in context.containers.items(): from src.space_allocator import ContainerSpec as LegacyContainerSpec container_specs_dict[role] = LegacyContainerSpec( role=ci.role, zone=ci.zone, topic_ids=ci.topic_ids, weight=ci.weight, height_px=ci.height_px, width_px=ci.width_px, max_height_cost=ci.max_height_cost, block_constraints=ci.block_constraints, ) # T-7 + Phase V: context를 generate_slide_html에 전달 phase_t_context = { "font_hierarchy": context.font_hierarchy.model_dump(), "container_ratio": context.container_ratio, "references": { role: [r.model_dump() for r in ref_list] for role, ref_list in context.references.items() }, "design_budgets": { role: ci.design_budget.model_dump() if ci.design_budget else {} for role, ci in context.containers.items() }, # Phase V: Stage 1.8 결과 "sub_layouts": context.sub_layouts, "fit_result": context.fit_result, "enhancements": context.enhancement_result, } analysis_dict["phase_t"] = phase_t_context generated, verification = await generate_with_retry( content=context.raw_content, analysis=analysis_dict, container_specs=container_specs_dict, preset=context.preset, images=context.slide_images, ) return {"generated_html": generated} ctx = await run_stage(stage_2, ctx, "stage_2", max_retries=0) # ── Stage 3: 렌더링 조립 ── yield {"event": "progress", "data": "4/7 슬라이드 조립 중..."} async def stage_3(context: PipelineContext) -> dict: # Phase X-BX': Type B는 Stage 2에서 이미 완전한 HTML → renderer 스킵 if context.analysis.layout_template in ("B", "B'", "B''"): logger.info("[Stage 3] Type B: renderer 스킵 (generated_html 직접 사용)") return {"rendered_html": context.generated_html} # Type A: 기존 renderer 코드 그대로 from src.renderer import render_slide_from_html analysis_dict = { "topics": [t.model_dump() for t in context.topics], "page_structure": context.page_structure.roles, "core_message": context.analysis.core_message, "title": context.analysis.title, "_container_ratio": list(context.container_ratio), "_font_hierarchy": context.font_hierarchy.model_dump(), "_containers": { role: {"height_px": ci.height_px, "width_px": ci.width_px, "zone": ci.zone} for role, ci in context.containers.items() }, "_fit_redistribution": context.fit_result.get("redistribution", {}) if context.fit_result else {}, } html = render_slide_from_html(context.generated_html, analysis_dict, context.preset) return {"rendered_html": html} ctx = await run_stage(stage_3, ctx, "stage_3", max_retries=0) # ── Stage 4: 측정 + 품질 게이트 ── yield {"event": "progress", "data": "5/7 품질 검증 중..."} async def stage_4(context: PipelineContext) -> dict: from src.kei_client import vision_quality_gate # L4: Selenium 실측 measurement = await asyncio.to_thread( measure_rendered_heights, context.rendered_html ) has_overflow = False for zone_name, zone_data in measurement.get("zones", {}).items(): if zone_data.get("overflowed"): has_overflow = True excess = zone_data.get("excess_px", 0) logger.warning(f"[L4] {zone_name}: overflow +{excess}px") if has_overflow: logger.warning("[L4] overflow 감지됨 — 결과물에 경고 포함하여 진행") else: logger.info("[L4] overflow 없음") # L5: 스크린샷 + 비전 품질 게이트 screenshot_b64 = await asyncio.to_thread( capture_slide_screenshot, context.rendered_html ) quality_score = -1 # 비전 미평가 시 -1 (거짓 100점 방지) if screenshot_b64: analysis_dict = { "topics": [t.model_dump() for t in context.topics], "core_message": context.analysis.core_message, } quality_result = await vision_quality_gate(screenshot_b64, analysis_dict) if quality_result: quality_score = quality_result.get("score", 0) if quality_score < 30: return {"_errors": [{ "severity": "FATAL", "localization": f"품질 {quality_score}/100 < 30", "instruction": "출력 차단", }]} else: logger.warning("[Stage 4] 비전 품질 평가 실패 — quality_score=-1 (미평가)") return { "measurement": measurement, "quality_score": quality_score, "screenshot_b64": screenshot_b64 or "", } ctx = await run_stage(stage_4, ctx, "stage_4", max_retries=0) # ── Stage 5: 검증 통과 확인 → final.html 출력 ── yield {"event": "progress", "data": "6/7 최종 처리 중..."} # 검증 결과 확인 measurement = ctx.measurement or {} has_overflow = any( zd.get("overflowed", False) for zd in measurement.get("zones", {}).values() ) quality = ctx.quality_score if hasattr(ctx, "quality_score") else 100 if has_overflow: overflow_zones = [ f'{zn}(+{zd.get("excess_px", 0)}px)' for zn, zd in measurement.get("zones", {}).items() if zd.get("overflowed") ] logger.warning(f"[Stage 5] overflow 감지: {overflow_zones} — 결과물에 경고 포함") if quality < 0: # 비전 미평가: 차단하지 않고 경고만. Selenium overflow 검사는 통과한 상태. logger.warning(f"[Stage 5] 비전 미평가 (quality={quality}) — Selenium 측정만으로 통과") elif quality < 30: logger.error(f"[Stage 5] 품질 {quality}/100 < 30 — 출력 차단") yield {"event": "error", "data": f"품질 검증 미달 ({quality}/100). 출력 차단."} return html = ctx.rendered_html if ctx.base_path: html = embed_images(html, ctx.base_path) ctx = ctx.model_copy(update={"rendered_html": html}) # Stage 5: popup_file 확정 (save_snapshot 전에 완료) run_dir = ctx.get_run_dir() run_dir.mkdir(parents=True, exist_ok=True) popups = ctx.normalized.popups if popups: updated_popups = [] for i, popup in enumerate(popups, 1): popup_title = popup.title popup_content = popup.content pid = popup.popup_id or f"popup_{i}" safe_title = "".join(c for c in popup_title if c.isalnum() or c in "_ -").strip() popup_filename = f"첨부{i}_{safe_title}.html" # popup_file 확정 → 새 PopupItem으로 (Pydantic immutable 대응) updated_popups.append(popup.model_copy(update={"popup_file": popup_filename})) ctx = ctx.model_copy(update={ "normalized": ctx.normalized.model_copy(update={"popups": updated_popups}), }) popups = ctx.normalized.popups # 업데이트된 참조 ctx.save_snapshot("final") # stage_4 검증판을 final 시점 context로 재생성 (popup_file 등 반영) from src.step_visualizer import generate_step_html try: generate_step_html(ctx, "stage_4") except Exception as e: logger.warning(f"[Stage 5] stage_4 재생성 실패: {e}") # final.html 저장 (run_dir / "final.html").write_text(html, encoding="utf-8") # Phase T: 팝업(상세 내용)을 별도 HTML로 분리 저장 if popups: for i, popup in enumerate(popups, 1): popup_title = popup.title popup_content = popup.content popup_filename = popup.popup_file or f"첨부{i}.html" # TP-6: 첨부 HTML에 디자인 토큰 적용 import re as _re # JSX style={{}} 잔여 정리 clean_content = _re.sub(r'style=\{\{[^}]*\}\}', '', popup_content) clean_content = clean_content.replace('
', '
') # markdown bold → HTML bold clean_content = _re.sub(r'\*\*(.+?)\*\*', r'\1', clean_content) # 콘텐츠 유형 감지: 테이블 vs 리스트 has_table = " {popup_title} — 첨부 자료

{popup_title}

첨부 자료 {i} — 슬라이드 본문의 상세 내용
{clean_content}
본 자료는 슬라이드 "{ctx.analysis.title}"의 첨부 자료입니다.
""" (run_dir / popup_filename).write_text(popup_html, encoding="utf-8") logger.info(f"[Phase T] 첨부 HTML 저장: {popup_filename}") yield {"event": "result", "data": html} logger.info(f"슬라이드 생성 완료: run={ctx.run_id}") except StageFailure as e: logger.error(f"파이프라인 중단: {e.stage_name} — {e.errors}") yield {"event": "error", "data": f"Stage {e.stage_name} 실패: {e.errors}"} except Exception as e: logger.exception(f"파이프라인 오류: {e}") yield {"event": "error", "data": str(e)} # ══════════════════════════════════════ # 레거시 파이프라인 (Phase S) — 보존 # ══════════════════════════════════════ async def _retry_kei(fn, *args, **kwargs): """Kei API 호출을 성공할 때까지 재시도한다. Kei API는 필수 인프라. fallback 없음. 최대 30회 또는 300초까지 재시도 후 TimeoutError. """ import asyncio attempt = 0 start_time = time.time() while attempt < KEI_MAX_RETRY_ATTEMPTS: attempt += 1 elapsed = time.time() - start_time if elapsed > KEI_MAX_RETRY_DURATION: raise TimeoutError( f"Kei API 타임아웃: {fn.__name__} — " f"{elapsed:.0f}초 경과 (제한 {KEI_MAX_RETRY_DURATION}초)" ) result = await fn(*args, **kwargs) if result is not None: if attempt > 1: logger.info(f"[Kei 재시도] {fn.__name__} 성공 ({attempt}번째 시도)") return result logger.warning( f"[Kei 재시도] {fn.__name__} 실패 (시도 {attempt}/{KEI_MAX_RETRY_ATTEMPTS}). " f"{KEI_RETRY_INTERVAL}초 후 재시도..." ) await asyncio.sleep(KEI_RETRY_INTERVAL) raise RuntimeError( f"Kei API 최대 재시도 초과: {fn.__name__} — {attempt}회 시도" ) def _save_step(run_dir: Path, filename: str, data: Any) -> None: """스텝 결과를 JSON 또는 HTML로 저장한다. (K-1)""" run_dir.mkdir(parents=True, exist_ok=True) filepath = run_dir / filename if filename.endswith(".html"): filepath.write_text(data, encoding="utf-8") else: with open(filepath, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) logger.info(f"[중간 산출물] {filename} 저장 → {run_dir.name}/") async def generate_slide_legacy( content: str, manual_layout: dict[str, Any] | None = None, base_path: str = "", ) -> AsyncIterator[dict[str, str]]: """Phase S 레거시 파이프라인. Phase T 전환 전 보존용. Yields: SSE 이벤트: progress / result / error """ # K-1: 중간 산출물 저장용 디렉토리 run_id = str(int(time.time() * 1000)) run_dir = Path("data/runs") / run_id try: # 1단계: Kei 실장 — 꼭지 추출 + 분석 yield {"event": "progress", "data": "1/5 Kei 실장이 꼭지를 추출 중..."} if manual_layout: analysis = manual_layout else: analysis = await _retry_kei(classify_content, content) # _retry_kei는 무한 재시도. None이 올 수 없다. topic_count = len(analysis.get("topics", [])) page_count = analysis.get("total_pages", 1) logger.info(f"1단계-A 완료: {topic_count}개 꼭지, {page_count}페이지") _save_step(run_dir, "step1_analysis.json", analysis) # 1단계-B: 각 꼭지 컨셉 구체화 yield {"event": "progress", "data": "1.5/5 Kei 실장이 각 꼭지의 컨셉을 구체화 중..."} analysis = await refine_concepts(content, analysis) logger.info("1단계-B 완료: 컨셉 구체화") _save_step(run_dir, "step1b_concepts.json", { "concepts": [ {k: t.get(k) for k in ("id", "title", "purpose", "relation_type", "expression_hint", "source_data")} for t in analysis.get("topics", []) ] }) # I-6: 슬라이드 제목 ↔ 첫 꼭지 제목 중복 검증 from difflib import SequenceMatcher title = analysis.get("title", "") topics = analysis.get("topics", []) if topics: first_title = topics[0].get("title", "") similarity = SequenceMatcher(None, title, first_title).ratio() if similarity > 0.7: purpose = topics[0].get("purpose", "문제제기") topics[0]["title"] = f"{purpose}: {topics[0].get('summary', '')[:30]}" logger.warning( f"[제목 중복 교정] 유사도 {similarity:.0%} → 첫 꼭지 제목 변경" ) # 이미지 크기 측정 (base_path 있을 때만) image_sizes = get_image_sizes(content, base_path) if image_sizes: analysis["image_sizes"] = image_sizes logger.info(f"이미지 측정: {len(image_sizes)}개") # ★ Phase O-1: 컨테이너 스펙 계산 (Kei 비중 → px 확정) preset_name = select_preset(analysis) preset = LAYOUT_PRESETS.get(preset_name, {}) page_struct = analysis.get("page_structure", {}) container_specs = calculate_container_specs( page_structure=page_struct, topics=analysis.get("topics", []), preset=preset, slide_width=settings.slide_width, slide_height=settings.slide_height, ) _save_step(run_dir, "step1c_containers.json", { role: { "height_px": spec.height_px, "width_px": spec.width_px, "max_height_cost": spec.max_height_cost, "topic_ids": spec.topic_ids, "weight": spec.weight, "block_constraints": spec.block_constraints, } for role, spec in container_specs.items() }) # ★ Phase S: Claude Sonnet이 HTML 직접 생성 # 블록 선택 없음. 슬롯 채우기 없음. AI가 콘텐츠에 맞는 HTML 구조를 직접 만든다. yield {"event": "progress", "data": "2/4 슬라이드 HTML 생성 중..."} from src.content_verifier import generate_with_retry from src.renderer import render_slide_from_html from src.kei_client import vision_quality_gate import asyncio topics = analysis.get("topics", []) # 이미지 정보 구성 (base64 포함) slide_images = [] if image_sizes: import base64 as b64_mod for img_key, img_info in image_sizes.items(): img_path = Path(base_path) / img_key if base_path else Path(img_key) img_b64 = "" if img_path.exists(): img_b64 = b64_mod.b64encode(img_path.read_bytes()).decode() slide_images.append({ "path": str(img_path), "width": img_info.get("width", 0), "height": img_info.get("height", 0), "ratio": round(img_info.get("width", 1) / max(1, img_info.get("height", 1)), 2), "topic_id": img_info.get("topic_id"), "b64": img_b64, }) # Claude Sonnet HTML 생성 + 독립 검증 + 재시도 루프 generated, verification = await generate_with_retry( content=content, analysis=analysis, container_specs=container_specs, preset=preset, images=slide_images, ) _save_step(run_dir, "step2b_verification.json", { area: {"passed": r.passed, "score": r.score, "errors": r.errors} for area, r in verification.items() }) _save_step(run_dir, "step2_generated.json", { "body_html_length": len(generated.get("body_html", "")), "sidebar_html_length": len(generated.get("sidebar_html", "")), "footer_html_length": len(generated.get("footer_html", "")), "reasoning": generated.get("reasoning", ""), }) logger.info( f"[Phase S] HTML 생성 완료: body={len(generated.get('body_html', ''))}자, " f"sidebar={len(generated.get('sidebar_html', ''))}자, " f"footer={len(generated.get('footer_html', ''))}자" ) # 3단계: 렌더링 — AI 생성 HTML을 슬라이드 프레임에 삽입 yield {"event": "progress", "data": "3/4 슬라이드 조립 중..."} html = render_slide_from_html(generated, analysis, preset) logger.info("[Phase S] 슬라이드 조립 완료") _save_step(run_dir, "step3_rendered.html", html) # ★ Phase Q: 검증 렌더링 + 수학적 조정 + 비전 품질 게이트 measurement = await asyncio.to_thread(measure_rendered_heights, html) _save_step(run_dir, "step4_measurement.json", measurement) # Phase S: overflow 감지 has_overflow = False for zone_name, zone_data in measurement.get("zones", {}).items(): if zone_data.get("overflowed"): has_overflow = True logger.warning(f"[Phase S] {zone_name}: overflow +{zone_data.get('excess_px', 0)}px") if has_overflow: logger.warning("[Phase S] overflow 감지 — 결과물에 반영 (후속 품질 게이트에서 평가)") else: logger.info("[Phase S] overflow 없음") # Phase S: 비전 모델 품질 게이트 yield {"event": "progress", "data": "4/4 품질 검증 중..."} screenshot_b64 = await asyncio.to_thread(capture_slide_screenshot, html) quality_result = None if screenshot_b64: _save_step(run_dir, "step5_screenshot.txt", f"base64 PNG, {len(screenshot_b64)} chars") quality_result = await vision_quality_gate(screenshot_b64, analysis) if quality_result: _save_step(run_dir, "step5_quality_gate.json", quality_result) if not quality_result.get("passed", True): score = quality_result.get("score", 0) issues = quality_result.get("issues", []) logger.warning(f"[Q-6] 품질 게이트 FAIL: {score}/100 — {issues}") # Q-8: 심각한 품질 문제 시 출력 차단 if score < 30: logger.error(f"[Q-8] 출력 차단: 품질 {score}/100 < 30 최소 기준") yield {"event": "error", "data": f"슬라이드 품질 미달 ({score}/100). 재시도해 주세요."} return else: logger.info(f"[Q-6] 품질 게이트 PASS: {quality_result.get('score', 0)}/100") else: logger.warning("[Q-6] 스크린샷 캡처 실패 — 품질 게이트 스킵") # D-5: 이미지를 base64로 삽입 (다운로드 HTML에서도 보이도록) if base_path: html = embed_images(html, base_path) logger.info("이미지 base64 삽입 완료") _save_step(run_dir, "final.html", html) yield {"event": "result", "data": html} logger.info(f"슬라이드 생성 완료: run={run_id}") except Exception as e: logger.exception(f"파이프라인 오류: {e}") yield {"event": "error", "data": str(e)} async def _adjust_design( layout_concept: dict[str, Any], analysis: dict[str, Any], ) -> dict[str, Any]: """4단계 전반: 디자인 실무자가 텍스트 양에 맞게 CSS를 조정한다. 각 area별 블록 수, 텍스트 총량, zone 예산을 계산하고, Sonnet이 area별 CSS 변수 override를 결정한다. 블록 템플릿이 이미 CSS 변수(var(--font-body) 등)를 사용하므로, area div에서 변수를 override하면 내부 블록이 자동 조정된다. """ try: client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key) # 프리셋 정보 가져오기 preset_name = select_preset(analysis) preset = LAYOUT_PRESETS.get(preset_name, {}) zones = preset.get("zones", {}) for page in layout_concept.get("pages", []): # area별 블록 수 + 텍스트 총량 집계 area_info = {} for block in page.get("blocks", []): area = block.get("area", "body") if area not in area_info: zone = zones.get(area, {}) area_info[area] = { "block_count": 0, "total_chars": 0, "budget_px": zone.get("budget_px", 0), "width_pct": zone.get("width_pct", 100), "block_types": [], } data = block.get("data", {}) text_len = len(json.dumps(data, ensure_ascii=False)) area_info[area]["block_count"] += 1 area_info[area]["total_chars"] += text_len area_info[area]["block_types"].append(block.get("type", "")) # area 정보 텍스트 구성 area_lines = [] for area_name, info in area_info.items(): area_lines.append( f"- {area_name} (예산 {info['budget_px']}px, 너비 {info['width_pct']}%): " f"{info['block_count']}개 블록, 총 {info['total_chars']}자\n" f" 블록 타입: {', '.join(info['block_types'])}" ) system = ( "당신은 디자인 실무자이다. 편집자가 정리한 텍스트가 각 영역에 잘 들어가도록 CSS를 조정한다.\n\n" "## 원칙\n" "- 텍스트를 자르지 않는다. 디자인이 텍스트에 맞춘다.\n" "- 빈 공간을 방치하지 않는다.\n" "- 텍스트가 많으면: 폰트/여백을 줄여서 맞춘다.\n" "- 텍스트가 적으면: 폰트/여백을 늘려서 채운다.\n\n" "## 조정 가능한 CSS 변수\n" "- --font-body (기본 0.95rem): 본문 폰트 크기\n" "- --font-subtitle (기본 1.25rem): 소제목 폰트 크기\n" "- --font-caption (기본 0.8rem): 캡션 폰트 크기\n" "- --spacing-inner (기본 16px): 블록 내부 여백\n" "- --spacing-block (기본 20px): 블록 간 간격\n" "- --spacing-small (기본 8px): 작은 여백\n\n" "## 출력 형식 (JSON만. 설명 없이.)\n" "각 area에 적용할 CSS 변수 override를 inline style 문자열로 반환.\n" "조정 불필요한 area는 빈 문자열.\n" '{"area_styles": {"body": "--font-body: 0.85rem; --spacing-inner: 10px;", "sidebar": "", "footer": ""}}' ) user_prompt = ( f"## 각 영역 현황\n" + "\n".join(area_lines) + f"\n\n위 영역별로 CSS 변수 조정이 필요한지 판단하여 JSON으로 반환해줘." ) response = await client.messages.create( model="claude-sonnet-4-20250514", max_tokens=1024, system=system, messages=[{"role": "user", "content": user_prompt}], ) result_text = response.content[0].text result = _parse_json(result_text) if result and "area_styles" in result: page["area_styles"] = result["area_styles"] logger.info( f"디자인 조정: {', '.join(f'{k}={bool(v)}' for k, v in result['area_styles'].items())}" ) else: page["area_styles"] = {} logger.info("디자인 조정: 조정 불필요 또는 파싱 실패") except Exception as e: logger.warning(f"디자인 조정 실패 (기존 스타일로 렌더링): {e}") # 실패 시 area_styles 없음 → 기존과 동일하게 렌더링 for page in layout_concept.get("pages", []): if "area_styles" not in page: page["area_styles"] = {} return layout_concept async def _review_balance( html: str, layout_concept: dict[str, Any], content: str, analysis: dict[str, Any] | None = None, measurement_text: str = "", screenshot_b64: str | None = None, ) -> dict[str, Any] | None: """5단계: Kei 실장이 조립 결과를 최종 검수한다. (J-7 + Phase L) Kei가 콘텐츠 관점 + 실제 렌더링 측정 결과 기반으로 검수: - 핵심 메시지 전달 여부 - 콘텐츠 흐름 ↔ 블록 배치 일치 - 실제 px 기반 높이/비중 검증 (Phase L) - 중요 내용 누락/축소 여부 """ try: # 블록별 텍스트 양 요약 block_summary = [] for page in layout_concept.get("pages", []): for block in page.get("blocks", []): data = block.get("data", {}) text_len = len(json.dumps(data, ensure_ascii=False)) block_summary.append( f" {block.get('area')}/{block.get('type')}: " f"데이터 {text_len}자" ) # zone 예산 정보 (analysis에서 프리셋 추출) zone_budget_text = "" overflow_hint_text = "" if analysis: preset_name = select_preset(analysis) preset = LAYOUT_PRESETS.get(preset_name, {}) zone_lines = [ f"- {name}: ~{z['budget_px']}px (너비 {z['width_pct']}%)" for name, z in preset.get("zones", {}).items() ] zone_budget_text = ( "\n\n## zone별 높이 예산\n" + "\n".join(zone_lines) ) # Stage 2에서 감지한 예상 overflow 힌트 overflow_hint = layout_concept.get("overflow", []) if overflow_hint: hint_lines = [ f"- {o['area']}: 예상 {o['total_px']}px > 예산 {o['budget_px']}px " f"(+{o['overflow_px']}px 초과)" for o in overflow_hint ] overflow_hint_text = ( "\n\n## 높이 초과 힌트 (2단계 예상치, 참고용)\n" + "\n".join(hint_lines) ) # Phase L: 렌더링 측정 결과를 overflow_hint에 추가 (실제 px 기반) if measurement_text: overflow_hint_text += f"\n\n{measurement_text}" # Kei로 최종 검수 (레거시 — Phase S에서는 메인 흐름에서 미사용) from src.kei_client import call_kei_final_review return await call_kei_final_review( html, block_summary, zone_budget_text, overflow_hint_text, analysis, screenshot_b64=screenshot_b64, ) except Exception as e: logger.warning(f"재검토 실패: {e}") return None async def _apply_adjustments( layout_concept: dict[str, Any], review: dict[str, Any], content: str, ) -> dict[str, Any]: """재검토 결과에 따라 텍스트를 재편집한다.""" adjustments = review.get("adjustments", []) if not adjustments: return layout_concept # 조정이 필요한 블록만 재편집 for adj in adjustments: area = adj.get("block_area", "") action = adj.get("action", "") ratio = adj.get("target_ratio") detail = adj.get("detail", "") for page in layout_concept.get("pages", []): for block in page.get("blocks", []): if block.get("area") != area: continue if action == "expand" and ratio: for key in block.get("char_guide", {}): block["char_guide"][key] = int( block["char_guide"][key] * ratio ) logger.info(f"조정: {area} → expand ×{ratio} ({detail})") elif action == "shrink" and ratio: for key in block.get("char_guide", {}): block["char_guide"][key] = int( block["char_guide"][key] * ratio ) logger.info(f"조정: {area} → shrink ×{ratio} ({detail})") elif action == "rewrite": if "data" in block: del block["data"] block["reason"] = f"재작성: {detail}" logger.info(f"조정: {area} → rewrite ({detail})") elif action == "kei_trim": max_chars = adj.get("max_chars", 200) if "char_guide" not in block: block["char_guide"] = {} for key in block.get("char_guide", {}): block["char_guide"][key] = min( block["char_guide"][key], max_chars ) if not block["char_guide"]: block["char_guide"] = {"text": max_chars} logger.info( f"조정: {area} → kei_trim max_chars={max_chars} " f"({detail})" ) elif action == "kei_restructure": block["detail_target"] = True if "data" in block: del block["data"] block["reason"] = f"재구성: {detail}" logger.info( f"조정: {area} → kei_restructure (detail_target)" ) # 조정된 가이드로 재편집 (레거시 — Phase S에서는 미사용) from src.content_editor import fill_content layout_concept = await fill_content(content, layout_concept) return layout_concept def _build_overflow_context( layout_concept: dict[str, Any], overflow_adjs: list[dict], ) -> list[dict]: """Sonnet이 감지한 overflow_detected를 Kei에게 전달할 형태로 변환한다. 실제 채워진 블록 데이터(텍스트)를 포함하여 Kei가 판단할 수 있도록 한다. """ overflows = [] for adj in overflow_adjs: area = adj.get("block_area", "") # 해당 zone의 블록 정보 + 실제 텍스트 추출 area_blocks = [] for page in layout_concept.get("pages", []): for block in page.get("blocks", []): if block.get("area") == area: data = block.get("data", {}) text_preview = json.dumps(data, ensure_ascii=False)[:300] area_blocks.append({ "type": block.get("type", ""), "purpose": block.get("purpose", ""), "topic_id": block.get("topic_id"), "text_preview": text_preview, }) overflows.append({ "area": area, "detail": adj.get("detail", ""), "blocks": area_blocks, }) return overflows def _convert_kei_judgment( review_result: dict[str, Any], kei_judgment: dict[str, Any], ) -> None: """Kei의 trim/restructure 판단을 review_result.adjustments에 반영한다. 기존 overflow_detected 항목을 kei_trim 또는 kei_restructure로 교체. """ decision = kei_judgment.get("decision", "") new_adjs = [] for adj in review_result.get("adjustments", []): if adj.get("action") == "overflow_detected": # overflow_detected → Kei 판단으로 교체 if decision == "trim": for target in kei_judgment.get("trim_targets", []): new_adjs.append({ "block_area": adj.get("block_area", ""), "action": "kei_trim", "max_chars": target.get("max_chars", 200), "topic_id": target.get("topic_id"), "detail": target.get("reason", ""), }) elif decision == "restructure": for tid in kei_judgment.get("detail_topics", []): new_adjs.append({ "block_area": adj.get("block_area", ""), "action": "kei_restructure", "topic_id": tid, "detail": kei_judgment.get("reason", ""), }) else: # 기존 expand/shrink/rewrite는 그대로 유지 new_adjs.append(adj) review_result["adjustments"] = new_adjs def _parse_json(text: str) -> dict[str, Any] | None: """텍스트에서 JSON을 추출한다.""" patterns = [ r"```json\s*(.*?)```", r"```\s*(.*?)```", r"(\{.*\})", ] for pattern in patterns: match = re.search(pattern, text, re.DOTALL) if match: try: return json.loads(match.group(1).strip()) except json.JSONDecodeError: continue return None