from __future__ import annotations

import argparse
import base64
import asyncio
import json
import re
import sys
from pathlib import Path

# Absolute location of the external design-agent checkout that provides `src.*`.
DESIGN_AGENT_ROOT = Path(r'D:\ad-hoc\kei\design_agent')
# Two levels above this file.
REPO_ROOT = Path(__file__).resolve().parent.parent
LOCAL_TEMPLATES_DIR = REPO_ROOT / 'templates'
LOCAL_COMPONENTS_DIR = REPO_ROOT / 'components'
DX_COMPONENT_FALLBACK = Path(r'D:\ad-hoc\cel\src\components\dx.astro')

# The `src.*` imports below resolve only once DESIGN_AGENT_ROOT is on sys.path.
if str(DESIGN_AGENT_ROOT) not in sys.path:
    sys.path.insert(0, str(DESIGN_AGENT_ROOT))

import src.block_reference as block_reference_module
from src.block_reference import select_and_generate_references
from src.config import settings
from src.content_verifier import generate_with_retry
import src.design_director as design_director_module
import src.html_generator as html_generator
from src.design_director import LAYOUT_PRESETS, select_preset
from src.image_utils import embed_images, get_image_sizes
from src.mdx_normalizer import normalize_mdx_content
from src.pipeline_context import (
    Analysis,
    BlockReference,
    ContainerInfo,
    DesignBudget,
    FontHierarchy,
    NormalizedContent,
    PageStructure,
    PipelineContext,
    Topic,
    create_context,
)
import src.renderer as renderer_module
from src.renderer import render_slide_from_html
from src.slide_measurer import capture_slide_screenshot, measure_rendered_heights

# Alias the legacy prompt constants when html_generator only exposes the
# `_LEGACY_*` names.
if not hasattr(html_generator, 'SIDEBAR_PROMPT') and hasattr(html_generator, '_LEGACY_SIDEBAR_PROMPT'):
    html_generator.SIDEBAR_PROMPT = html_generator._LEGACY_SIDEBAR_PROMPT
if not hasattr(html_generator, 'FOOTER_PROMPT') and hasattr(html_generator, '_LEGACY_FOOTER_PROMPT'):
    html_generator.FOOTER_PROMPT = html_generator._LEGACY_FOOTER_PROMPT

# When this repository ships its own templates, point the imported modules at
# them and clear their module-level caches.
# NOTE(review): indentation is lost in this copy of the file; the renderer
# `_env` reset and the two design_director cache resets are assumed to belong
# to this `if` body - confirm against the canonical file.
if LOCAL_TEMPLATES_DIR.exists():
    block_reference_module.TEMPLATES_DIR = LOCAL_TEMPLATES_DIR
    block_reference_module._jinja_env = None
    renderer_module.TEMPLATES_DIR = LOCAL_TEMPLATES_DIR
    renderer_module.CATALOG_PATH = LOCAL_TEMPLATES_DIR / 'catalog.yaml'
    renderer_module._CATALOG_MAP = None
    renderer_module._CATALOG_VARIANT_MAP = None
    renderer_module._env = None
    if hasattr(design_director_module, '_CATALOG_CACHE'):
        design_director_module._CATALOG_CACHE = None
    if hasattr(design_director_module, '_BLOCK_IDS_CACHE'):
        design_director_module._BLOCK_IDS_CACHE = None

from src.space_allocator import (
    ContainerSpec as LegacyContainerSpec,
    calculate_container_specs,
    calculate_design_budget,
    calculate_dynamic_ratio,
    calculate_font_hierarchy,
)


def _load_json(path: Path) -> dict:
    """Read ``path`` as UTF-8 text (a leading BOM is tolerated) and parse it as JSON."""
    return json.loads(path.read_text(encoding='utf-8-sig'))


def _write_json(path: Path, data: dict) -> None:
    """Serialize ``data`` as indented, non-ASCII-escaped JSON and write it to ``path`` as UTF-8."""
    path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding='utf-8')


def _load_retry_plan(stage1b_path: Path) -> dict:
    """Load ``retry-plan.json`` from the directory containing ``stage1b_path``.

    Returns an empty dict when that file does not exist.
    """
    retry_plan_path = stage1b_path.parent / 'retry-plan.json'
    if retry_plan_path.exists():
        return _load_json(retry_plan_path)
    return {}


def _stage_0(ctx: PipelineContext) -> PipelineContext:
    """Normalize ``ctx.raw_content`` and store the result on ``ctx.normalized``.

    The dict returned by ``normalize_mdx_content`` must provide the keys
    ``clean_text``, ``title``, ``images``, ``popups``, ``tables`` and
    ``sections``; a missing key raises ``KeyError``. Saves the ``'stage_0'``
    snapshot and returns the same ``ctx``.
    """
    normalized = normalize_mdx_content(ctx.raw_content)
    ctx.normalized = NormalizedContent(
        clean_text=normalized['clean_text'],
        title=normalized['title'],
        images=normalized['images'],
        popups=normalized['popups'],
        tables=normalized['tables'],
        sections=normalized['sections'],
    )
    ctx.save_snapshot('stage_0')
    return ctx


def _stage_1a(ctx: PipelineContext, stage1a: dict) -> PipelineContext:
    """Populate analysis, page structure and topics from a stage-1a result dict.

    ``stage1a`` must contain ``analysis`` (with ``core_message`` and ``title``;
    ``total_pages`` defaults to 1), ``page_structure`` and ``topics`` (a list of
    keyword dicts accepted by ``Topic``). Saves the ``'stage_1a'`` snapshot and
    returns the same ``ctx``.
    """
    analysis_raw = stage1a['analysis']
    ctx.analysis = Analysis(
        core_message=analysis_raw['core_message'],
        title=analysis_raw['title'],
        total_pages=analysis_raw.get('total_pages', 1),
    )
    ctx.page_structure = PageStructure(roles=stage1a['page_structure'])
    ctx.topics = [Topic(**raw) for raw in stage1a['topics']]
    ctx.save_snapshot('stage_1a')
    return ctx


def _stage_1b(ctx: PipelineContext, stage1b: dict) -> PipelineContext:
    """Overlay the refined concepts in ``stage1b['concepts']`` onto ``ctx.topics``.

    Concepts are matched to topics through ``topic_id`` == ``Topic.id``. Topics
    without a matching concept are rebuilt unchanged. Saves the ``'stage_1b'``
    snapshot and returns the same ``ctx``.
    """
    refined_map = {item['topic_id']: item for item in stage1b['concepts']}
    topics = []
    for raw in ctx.topics:
        merged = raw.model_dump()
        if raw.id in refined_map:
            # Work on a copy so the caller's stage1b dict is not mutated.
            refined = dict(refined_map[raw.id])
            # The refinement never overrides the topic's own source_data.
            refined.pop('source_data', None)
            merged.update(refined)
        topics.append(Topic(**merged))
    ctx.topics = topics
    ctx.save_snapshot('stage_1b')
    return ctx


def _stage_1_5a(ctx: PipelineContext) -> PipelineContext:
    """Derive fonts, container ratio, layout preset, containers and slide images.

    Reads ``ctx.raw_content``, ``ctx.base_path``, ``ctx.page_structure`` and
    ``ctx.topics``; writes ``font_hierarchy``, ``container_ratio``,
    ``preset_name``, ``preset``, ``containers``, ``slide_images`` and a copy of
    ``analysis`` that carries ``image_sizes``. Saves the ``'stage_1_5a'``
    snapshot and returns the same ``ctx``.
    """
    image_sizes = get_image_sizes(ctx.raw_content, ctx.base_path)
    # Only roles whose page-structure entry is a dict contribute a text length.
    role_text_lengths = {}
    for role, info in ctx.page_structure.roles.items():
        if isinstance(info, dict):
            role_text_lengths[role] = len(ctx.get_role_content(role))
    font_hierarchy_dict = calculate_font_hierarchy(role_text_lengths)
    ctx.font_hierarchy = FontHierarchy(
        key_msg=font_hierarchy_dict.get('핵심', 14.0),
        core=font_hierarchy_dict.get('본심', 12.0),
        bg=font_hierarchy_dict.get('배경', 11.0),
        sidebar=font_hierarchy_dict.get('첨부', 10.0),
    )
    ctx.container_ratio = calculate_dynamic_ratio(role_text_lengths, font_hierarchy_dict)
    analysis_dict = {
        'topics': [t.model_dump() for t in ctx.topics],
        'page_structure': ctx.page_structure.roles,
    }
    preset_name = select_preset(analysis_dict)
    ctx.preset_name = preset_name
    ctx.preset = LAYOUT_PRESETS.get(preset_name, {})
    # Documents that are not "run001 style" always use the 'sidebar-right'
    # preset with the sidebar collapsed to zero width and the body at 100 %.
    if not _is_run001_style_document(ctx, ctx.raw_content or ''):
        ctx.preset_name = 'sidebar-right'
        ctx.preset = {
            **LAYOUT_PRESETS.get('sidebar-right', {}),
            'grid_columns': '1fr 0fr',
            'zones': {
                **LAYOUT_PRESETS.get('sidebar-right', {}).get('zones', {}),
                'body': {**LAYOUT_PRESETS.get('sidebar-right', {}).get('zones', {}).get('body', {}), 'width_pct': 100},
                'sidebar': {**LAYOUT_PRESETS.get('sidebar-right', {}).get('zones', {}).get('sidebar', {}), 'width_pct': 0, 'budget_px': 0},
            },
        }
    container_specs = calculate_container_specs(
        page_structure=ctx.page_structure.roles,
        topics=[t.model_dump() for t in ctx.topics],
        preset=ctx.preset,
        slide_width=settings.slide_width,
        slide_height=settings.slide_height,
    )
    ctx.containers = {
        role: ContainerInfo(
            role=spec.role,
            zone=spec.zone,
            topic_ids=spec.topic_ids,
            weight=spec.weight,
            height_px=spec.height_px,
            width_px=spec.width_px,
            max_height_cost=spec.max_height_cost,
            block_constraints=spec.block_constraints,
        )
        for role, spec in container_specs.items()
    }
    slide_images = []
    normalized_images = image_sizes or {}
    # get_image_sizes may hand back either a list of dicts or a mapping keyed
    # by image path; both are reduced to (key, info) pairs here.
    if isinstance(normalized_images, list):
        iterable = []
        for item in normalized_images:
            if not isinstance(item, dict):
                continue
            img_key = item.get('path') or item.get('src') or item.get('image_path') or ''
            iterable.append((img_key, item))
    else:
        iterable = list(normalized_images.items())
    for img_key, img_info in iterable:
        if not img_key:
            continue
        # Drop leading slashes/backslashes and switch to backslash separators
        # before joining onto the base path.
        cleaned_key = str(img_key).lstrip('/\\').replace('/', '\\')
        img_path = Path(ctx.base_path) / cleaned_key if ctx.base_path else Path(img_key)
        width = int(img_info.get('width', 0) or 0)
        height = int(img_info.get('height', 0) or 0)
        slide_images.append({
            'path': str(img_path),
            'width': width,
            'height': height,
            # Zero or missing dimensions are treated as 1 to avoid dividing by zero.
            'ratio': round((width or 1) / max(1, height or 1), 2),
            'topic_id': img_info.get('topic_id'),
            'b64': '',
        })
    ctx.slide_images = slide_images
    ctx.analysis = ctx.analysis.model_copy(update={'image_sizes': image_sizes or {}})
    ctx.save_snapshot('stage_1_5a')
    return ctx


def _stage_1_7(ctx: PipelineContext) -> PipelineContext:
    """Select block references per role and store them on ``ctx.references``.

    Each value returned by ``select_and_generate_references`` may be a single
    item or a list; both are normalized to a list of ``BlockReference``.
    Entries that are not dicts are skipped. Saves the ``'stage_1_7'`` snapshot
    and returns the same ``ctx``.
    """
    refs_raw = select_and_generate_references(
        topics=[t.model_dump() for t in ctx.topics],
        containers=ctx.containers,
        page_structure=ctx.page_structure.roles,
    )
    normalized: dict[str, list[BlockReference]] = {}
    for role, ref in refs_raw.items():
        ref_list = ref if isinstance(ref, list) else [ref]
        normalized[role] = [
            BlockReference(
                block_id=item.get('block_id', ''),
                variant=item.get('variant', ''),
                visual_type=item.get('visual_type', ''),
                schema_info=item.get('schema_info', {}),
                design_reference_html=item.get('design_reference_html', ''),
            )
            for item in ref_list
            if isinstance(item, dict)
        ]
    ctx.references = normalized
    ctx.save_snapshot('stage_1_7')
    return ctx


def _stage_1_5b(ctx: PipelineContext) -> PipelineContext:
    # NOTE(review): this line of the collapsed source ends mid-loop; the budget
    # computation and the return statement live in the text that follows.
    updated = {}
    # Maps a container role to the FontHierarchy attribute holding its font size.
    font_map = {'본심': 'core', '배경': 'bg', '첨부': 'sidebar', '결론': 'core'}
    for role, ci in ctx.containers.items():
        refs = ctx.references.get(role, [])
        # Only the first reference of a role is consulted.
        ref = refs[0] if refs else None
        schema_info = ref.schema_info if ref else {}
        font_size = getattr(ctx.font_hierarchy, font_map.get(role, 'core'), 12.0)
def _stage_1_5b(ctx: PipelineContext) -> PipelineContext:
    """Attach a ``DesignBudget`` to every container held by ``ctx``.

    For each role in ``ctx.containers`` the first block reference registered
    for that role (if any) supplies the schema info, and the role's font size
    is read from ``ctx.font_hierarchy``. The dict returned by
    ``calculate_design_budget`` is copied into a ``DesignBudget`` that is stored
    on a copy of the container.

    Args:
        ctx: Pipeline context. ``containers``, ``references`` and
            ``font_hierarchy`` are read; ``containers`` is replaced.

    Returns:
        The same ``ctx`` object, after saving the ``'stage_1_5b'`` snapshot.
    """
    updated = {}
    # Maps a container role to the FontHierarchy attribute holding its font size.
    font_map = {'본심': 'core', '배경': 'bg', '첨부': 'sidebar', '결론': 'core'}
    for role, ci in ctx.containers.items():
        refs = ctx.references.get(role, [])
        # Only the first reference of a role is consulted.
        ref = refs[0] if refs else None
        schema_info = ref.schema_info if ref else {}
        font_size = getattr(ctx.font_hierarchy, font_map.get(role, 'core'), 12.0)
        budget = calculate_design_budget(
            container_height_px=ci.height_px,
            container_width_px=ci.width_px,
            block_schema=schema_info,
            font_size=font_size,
        )
        updated[role] = ci.model_copy(update={
            'design_budget': DesignBudget(
                available_height_px=budget['available_height_px'],
                available_width_px=budget['available_width_px'],
                max_circle_diameter=budget['max_circle_diameter'],
                max_img_width=budget['max_img_width'],
                max_img_height=budget['max_img_height'],
                fits=budget['fits'],
            )
        })
    ctx.containers = updated
    ctx.save_snapshot('stage_1_5b')
    return ctx


def _topic(ctx: PipelineContext, topic_id: int) -> Topic | None:
    """Return the first topic in ``ctx.topics`` whose ``id`` equals ``topic_id``, else ``None``."""
    return next((t for t in ctx.topics if t.id == topic_id), None)


def _collapse_ws(text: str) -> str:
    """Replace every whitespace run in ``text`` with one space and trim both ends."""
    return re.sub(r"\s+", " ", text).strip()


def compact_text(text: str, max_len: int) -> str:
    """Shorten ``text`` to roughly ``max_len`` characters, ending with ``"..."``.

    Whitespace is collapsed first. Text that already fits is returned without
    an ellipsis. Otherwise the text is cut at ``max_len``, backed up to the
    last space when there is one, and trailing `` ,.;:`` characters are removed
    before the ellipsis is appended.
    """
    normalized = _collapse_ws(text)
    if len(normalized) <= max_len:
        return normalized
    head = normalized[:max_len]
    cut = head.rsplit(" ", 1)[0].strip()
    # `cut` is empty when the only space is at index 0; fall back to the hard cut.
    return (cut or head).rstrip(" ,.;:") + "..."


def preserve_80_percent(text: str, floor: int = 80, ceiling: int = 180) -> int:
    """Return 80 % of the whitespace-collapsed length of ``text``, clamped to ``[floor, ceiling]``.

    Empty or whitespace-only text yields ``floor``.
    """
    normalized = _collapse_ws(text)
    if not normalized:
        return floor
    return max(floor, min(ceiling, int(len(normalized) * 0.8)))


def _prefer_source_text(topic: Topic | None, fallback: str) -> str:
    """Pick the text to display for ``topic``.

    Order of preference: the topic's whitespace-collapsed ``source_data`` when
    it has at least 40 characters, then its non-empty ``summary``, then
    ``fallback``. A falsy ``topic`` yields ``fallback``.
    """
    if not topic:
        return fallback
    source = _collapse_ws(topic.source_data or "")
    # The former extra test `len(source) >= max(80, len(fallback))` could only
    # succeed when this one does as well, so a single threshold is sufficient.
    if len(source) >= 40:
        return source
    summary = _collapse_ws(topic.summary or "")
    return summary or fallback


def _trim_visible_copy(text: str, floor: int = 120, ceiling: int = 320) -> str:
    """Collapse whitespace in ``text`` and shorten it to its clamped 80 % length.

    The length limit comes from ``preserve_80_percent`` with the given
    ``floor``/``ceiling``. Blank input yields ``""``.
    """
    normalized = _collapse_ws(text)
    if not normalized:
        return ""
    max_len = preserve_80_percent(normalized, floor=floor, ceiling=ceiling)
    return compact_text(normalized, max_len)


def _extract_sentence(text: str, keyword: str, fallback: str) -> str:
    """Return the first sentence of ``text`` that contains ``keyword``.

    Sentences are split after ``.``, ``!`` or ``?`` followed by whitespace.
    ``fallback`` is returned for blank text or when no sentence matches.
    """
    normalized = _collapse_ws(text)
    if not normalized:
        return fallback
    for part in re.split(r"(?<=[.!?])\s+", normalized):
        if keyword in part:
            return part.strip()
    return fallback


def _extract_multiple_sentences(text: str, keywords: list[str], fallback: str, limit: int = 2) -> str:
    """Join up to ``limit`` sentences of ``text``, one per matching keyword.

    Keywords are tried in the order given. Each keyword contributes the first
    sentence containing it that has not already been selected. ``fallback`` is
    returned for blank text or when nothing matches.
    """
    normalized = _collapse_ws(text)
    if not normalized:
        return fallback
    parts = [p.strip() for p in re.split(r"(?<=[.!?])\s+", normalized) if p.strip()]
    selected: list[str] = []
    for keyword in keywords:
        match = next((p for p in parts if keyword in p and p not in selected), None)
        if match is not None:
            selected.append(match)
        if len(selected) >= limit:
            break
    if selected:
        return " ".join(selected[:limit])
    return fallback


def _plain_text(value: str) -> str:
    """Reduce an HTML/Markdown fragment to single-spaced plain text.

    Steps, in order: ``<br>`` tags become line breaks, remaining tags become
    spaces, ``**`` is removed and ``*`` becomes a space, the ``&lt;``, ``&gt;``
    and ``&amp;`` entities are decoded, Markdown images and links are dropped
    (including the link text), and whitespace is collapsed. A falsy ``value``
    yields ``""``.
    """
    text = value or ''
    # NOTE(review): this pattern was empty in the file as received. An empty
    # pattern matches between every pair of characters, so each character ended
    # up separated by a space. The `<br>` pattern is a reconstruction - confirm
    # against the canonical copy of the file.
    text = re.sub(r'<br\s*/?>', '\n', text, flags=re.I)
    text = re.sub(r'<[^>]+>', ' ', text)
    text = text.replace('**', '').replace('*', ' ')
    # NOTE(review): the entity names are likewise reconstructed; as received
    # the three replacements mapped each character onto itself.
    # '&amp;' is decoded last so that '&amp;lt;' yields '&lt;' rather than '<'.
    text = text.replace('&lt;', '<').replace('&gt;', '>').replace('&amp;', '&')
    text = re.sub(r'!\[[^\]]*\]\([^\)]*\)', ' ', text)
    text = re.sub(r'\[[^\]]+\]\([^\)]*\)', ' ', text)
    return _collapse_ws(text)


def _bulletish_lines(text: str, limit: int = 6) -> list[str]:
    """Split ``text`` into at most ``limit`` bullet-like fragments.

    The text is split on ``•``, on bold Markdown labels (``**label**`` with an
    optional colon), on `` - `` and on sentence-ending periods. Fragments
    shorter than six characters are discarded. When nothing survives, the text
    is split into sentences instead. Blank input yields ``[]``.
    """
    normalized = _collapse_ws(text or "")
    if not normalized:
        return []
    parts = re.split(r"(?:•|\*\*[^*]+\*\*:?|\s+-\s+|\.\s+)", normalized)
    cleaned = []
    for part in parts:
        item = re.sub(r"\s+", " ", part).strip(" -•")
        if len(item) >= 6:
            cleaned.append(item)
    if cleaned:
        return cleaned[:limit]
    sentences = [s.strip() for s in re.split(r"(?<=[.!?])\s+", normalized) if s.strip()]
    return sentences[:limit]


def _markdown_section(text: str, start_marker: str, end_marker: str | None = None) -> str:
    """Return the stripped text between ``start_marker`` and ``end_marker``.

    Returns ``''`` when ``start_marker`` is absent. When ``end_marker`` is not
    given or does not occur after the start marker, everything up to the end of
    ``text`` is returned.
    """
    start = text.find(start_marker)
    if start == -1:
        return ''
    chunk = text[start + len(start_marker):]
    if end_marker:
        end = chunk.find(end_marker)
        if end != -1:
            chunk = chunk[:end]
    return chunk.strip()


def _content_after_frontmatter(raw: str) -> str:
    """Return ``raw`` without a leading ``---`` … ``---`` front-matter block.

    The remainder is stripped. Input that does not start with ``---``, or that
    has no closing ``---``, is returned unchanged.
    """
    if raw.startswith('---'):
        parts = raw.split('---', 2)
        if len(parts) == 3:
            return parts[2].strip()
    return raw
raw.startswith('---'): parts = raw.split('---', 2) if len(parts) == 3: return parts[2].strip() return raw def _problem_bullets_from_raw(raw: str) -> list[str]: content = _content_after_frontmatter(raw) before_sep = content.split('
\n---', 1)[0] bullets = [] for line in before_sep.splitlines(): stripped = line.strip() if stripped.startswith('* ') and not stripped.startswith('* **'): bullets.append(_plain_text(stripped[2:])) return [b for b in bullets if b] def _details_blocks(raw: str) -> list[str]: return re.findall(r'
(.*?)
', raw, flags=re.S) def _popup_list_html(items: list[str], floor: int = 90, ceiling: int = 260) -> str: if not items: return '
??? ?? ??? ????.
' lis = ''.join( f'
  • {_trim_visible_copy(item, floor=floor, ceiling=ceiling)}
  • ' for item in items ) return f'' def _popup_comparison_table(rows: list[tuple[str, str, str]]) -> str: if not rows: return '
    ??? ???? ????.
    ' body = ''.join( '' f'{_trim_visible_copy(dx, floor=160, ceiling=420)}' f'{axis}' f'{_trim_visible_copy(bim, floor=160, ceiling=420)}' '' for axis, dx, bim in rows ) return ( '
    ' '' '' '' '' '' '' f'{body}
    DX??BIM
    ' ) def _popup_button(button_id: str, label: str) -> str: return ( f"' ) def _popup_overlay(popup_id: str, title: str, content_html: str) -> str: return ( f'' ) def _evidence_bullets_from_raw(raw: str) -> list[str]: blocks = _details_blocks(raw) if not blocks: return [] bullets = [] for line in blocks[0].splitlines(): stripped = line.strip() if stripped.startswith('* '): bullets.append(_plain_text(stripped[2:])) return [b for b in bullets if b] def _definition_sections_from_raw(raw: str) -> list[dict[str, str]]: match = re.search(r'##\s*1\.[^\n]*\n(.*?)##\s*2\.', raw, flags=re.S) block = match.group(1) if match else '' sections: list[dict[str, str]] = [] current_title = None current_lines: list[str] = [] for line in block.splitlines(): stripped = line.strip() if stripped.startswith('* **'): if current_title and current_lines: sections.append({'title': _plain_text(current_title), 'body': _plain_text(' '.join(current_lines))}) current_title = stripped[2:] current_lines = [] elif stripped.startswith('* '): current_lines.append(stripped[2:]) if current_title and current_lines: sections.append({'title': _plain_text(current_title), 'body': _plain_text(' '.join(current_lines))}) return sections def _relation_bullets_from_raw(raw: str) -> list[str]: start = re.search(r'##\s*2\.[^\n]*\n', raw) if not start: return [] block = raw[start.end():].split('
    ', 1)[0] bullets = [] for line in block.splitlines(): stripped = line.strip() if stripped.startswith('* '): content = _plain_text(stripped[2:]) if content and '[??' not in content: bullets.append(content) return bullets def _extract_image_src_from_raw(raw: str) -> str: m = re.search(r'!\[[^\]]*\]\(([^\)]+)\)', raw) return m.group(1).strip() if m else '' def _extract_caption_from_raw(raw: str) -> str: m = re.search(r'\*\[[^\]]+\][^*]+\*', raw) if m: return _plain_text(m.group(0)) return 'relation diagram' def _parse_comparison_rows_from_raw(raw: str) -> list[tuple[str, str, str]]: blocks = _details_blocks(raw) if len(blocks) < 2: return [] rows: list[tuple[str, str, str]] = [] for line in blocks[1].splitlines(): stripped = line.strip() if not stripped.startswith('|'): continue parts = [p.strip() for p in stripped.strip('|').split('|')] if len(parts) != 3: continue if parts[0].startswith(':---') or parts[1].startswith(':---') or parts[2].startswith('---'): continue dx, axis, bim = (_plain_text(parts[0]), _plain_text(parts[1]), _plain_text(parts[2])) if dx == 'DX' and bim == 'BIM': continue rows.append((axis, dx, bim)) return rows def _conclusion_from_raw(raw: str) -> str: m = re.search(r':::note\[[^\]]+\](.*?):::', raw, flags=re.S) block = m.group(1) if m else '' for line in block.splitlines(): stripped = line.strip() if stripped.startswith('* '): return _plain_text(stripped[2:]) return _plain_text(block) def _relation_visual(image_src: str, caption: str) -> str: if image_src: return f'{caption}' return ( '
    ' '' '' 'Digital' 'DX' '' 'BIM' '' 'GIS' '' 'Digital Twin' '
    ' ) def _is_run001_style_document(ctx: PipelineContext, raw: str) -> bool: relation_types = {getattr(t, 'relation_type', '') for t in ctx.topics} if {'hierarchy', 'comparison', 'definition', 'problem'} & relation_types: return True return all(keyword in raw for keyword in ['건설산업', 'BIM', 'DX']) and bool(_parse_comparison_rows_from_raw(raw)) def _section_card(title: str, lines: list[str], tone: str = 'blue') -> str: palette = { 'orange': ('#fff7ed', '#fdba74', '#9a3412'), 'blue': ('#eff6ff', '#93c5fd', '#1e3a8a'), 'slate': ('#f8fafc', '#cbd5e1', '#334155'), 'green': ('#ecfdf5', '#86efac', '#166534'), } bg, border, text = palette.get(tone, palette['blue']) items_html = ''.join( f'
  • {_trim_visible_copy(item, floor=160, ceiling=460)}
  • ' for item in lines if item ) return ( f'
    ' f'
    {title}
    ' f'
      {items_html}
    ' '
    ' ) def _component_placeholder(title: str, summary: str) -> str: return ( '
    ' f'
    {title}
    ' f'
    {_trim_visible_copy(summary, floor=240, ceiling=560)}
    ' '
    ' ) def _type_b_body_shell(inner_html: str) -> str: return ( '
    ' f'{inner_html}' '
    ' ) def _insert_button_into_card(card_html: str, button_html: str) -> str: idx = card_html.rfind('') if idx == -1: return card_html + button_html return ( card_html[:idx] + f'
    {button_html}
    ' + card_html[idx:] ) def _load_dx_effect_cards() -> list[tuple[str, list[str]]]: candidates = [ LOCAL_COMPONENTS_DIR / 'dx.astro', DX_COMPONENT_FALLBACK, ] component_text = '' for path in candidates: if path.exists(): component_text = path.read_text(encoding='utf-8-sig') break if not component_text: return [] headers = [ _plain_text(item) for item in re.findall(r'([^<]+)', component_text) ] if not headers: return [] cards: dict[str, list[str]] = {header: [] for header in headers} rows = re.findall(r'(.*?)', component_text, flags=re.S) for row in rows: cells = re.findall(r']*>(.*?)', row, flags=re.S) if len(cells) < 4: continue category = _plain_text(cells[0]).strip() for index, header in enumerate(headers): bullets = re.findall(r']*>(.*?)', cells[index + 1], flags=re.S) for bullet in bullets: item = _plain_text(bullet) if not item: continue cards[header].append(f'{category}: {item}' if category else item) return [(header, values[:3]) for header, values in cards.items() if values] def _extract_heading_block(raw: str, keyword: str) -> str: lines = raw.splitlines() start = None start_level = 0 for idx, line in enumerate(lines): stripped = line.lstrip() if stripped.startswith('#') and keyword in stripped: start = idx + 1 start_level = len(stripped) - len(stripped.lstrip('#')) break if start is None: return '' end = len(lines) for idx in range(start, len(lines)): stripped = lines[idx].lstrip() if stripped.startswith('#'): level = len(stripped) - len(stripped.lstrip('#')) if level <= start_level: end = idx break return chr(10).join(lines[start:end]).strip() def _extract_grouped_bullets(block: str, base_indent: int = 0) -> list[dict[str, list[str] | str]]: groups: list[dict[str, list[str] | str]] = [] current: dict[str, list[str] | str] | None = None for line in block.splitlines(): if not line.strip(): continue indent = len(line) - len(line.lstrip(' ')) stripped = line.strip() group_match = re.match(r'^[-*]\s+\*\*(.+?)\*\*(.*)$', stripped) if group_match and indent 
== base_indent: title = _plain_text(group_match.group(1)) tail = _plain_text(group_match.group(2).lstrip(' :')) current = {'title': title, 'items': []} if tail: current['items'].append(tail) groups.append(current) continue if current and re.match(r'^[-*]\s+', stripped): item = _plain_text(re.sub(r'^[-*]\s+', '', stripped)) if item: current['items'].append(item) return groups def _flatten_group_items(groups: list[dict[str, list[str] | str]]) -> list[str]: flattened: list[str] = [] for group in groups: title = str(group.get('title', '')).strip() for item in group.get('items', []): text = _plain_text(str(item)) if text: flattened.append(f'{title}: {text}' if title else text) return flattened def _detect_generic_layout_family(ctx: PipelineContext, raw: str) -> str: relation_types = {getattr(t, 'relation_type', '') for t in ctx.topics} if ' dict: goal_topic = _topic(ctx, 1) process_topic = _topic(ctx, 2) support_topic = _topic(ctx, 3) conclusion_topic = next((t for t in ctx.topics if getattr(t, 'layer', '') == 'conclusion'), ctx.topics[-1] if ctx.topics else None) goal_title = goal_topic.title if goal_topic and goal_topic.title else ctx.analysis.title process_title = process_topic.title if process_topic and process_topic.title else 'Process change' support_title = support_topic.title if support_topic and support_topic.title else 'Stakeholder effects' conclusion_text = _prefer_source_text(conclusion_topic, ctx.analysis.core_message if ctx.analysis else '') goal_groups = _extract_grouped_bullets(_extract_heading_block(raw, goal_title), base_indent=0)[:3] goal_popup_lines = _flatten_group_items(goal_groups) process_groups = _extract_grouped_bullets(_extract_heading_block(raw, process_title), base_indent=2) or _extract_grouped_bullets(_extract_heading_block(raw, process_title), base_indent=0) process_popup_lines = _flatten_group_items(process_groups) dx_cards = _load_dx_effect_cards() stakeholder_popup_lines = [f'{title}: {line}' for title, lines in dx_cards for line in 
lines] image_src = _extract_image_src_from_raw(raw) if image_src and ctx.base_path: candidate = Path(ctx.base_path) / image_src.lstrip('/').lstrip(chr(92)).replace('/', chr(92)) if not candidate.exists(): image_src = '' else: image_src = '' image_caption = _extract_caption_from_raw(raw) or goal_title goal_sections_html = ''.join( '
    ' '
    {title}
    ' '
      {items}
    ' '
    '.format( color=color, title=group['title'], items=''.join( f'
  • {_trim_visible_copy(_plain_text(str(item)), floor=110, ceiling=240)}
  • ' for item in group.get('items', [])[:2] ), ) for group, color in zip(goal_groups, ['#c2410c', '#8b6b2e', '#166534']) ) goal_popup = _popup_overlay('popup-goal', goal_title, _popup_list_html(goal_popup_lines, floor=220, ceiling=680)) if goal_popup_lines else '' process_popup = _popup_overlay('popup-process', process_title, _popup_list_html(process_popup_lines, floor=220, ceiling=680)) if process_popup_lines else '' stakeholder_popup = _popup_overlay('popup-stakeholder', support_title, _popup_list_html(stakeholder_popup_lines, floor=220, ceiling=680)) if stakeholder_popup_lines else '' goal_card = ( '
    ' f'
    {goal_title}
    ' '
    ' f'
    {goal_sections_html}
    ' '
    ' f'{_relation_visual(image_src, image_caption).replace("height:220px", "height:250px")}' f'
    {image_caption}
    ' '
    ' f'
    {_popup_button("popup-goal", "Goal details")}
    ' '
    ' ) process_cards_html = ''.join( '
    ' '
    {title}
    ' '
      {items}
    ' '
    '.format( title=group['title'], items=''.join( f'
  • {_trim_visible_copy(_plain_text(str(item)), floor=110, ceiling=240)}
  • ' for item in group.get('items', [])[:2] ), ) for group in process_groups[:4] ) process_card = ( '
    ' f'
    {process_title}
    ' f'
    {process_cards_html}
    ' f'
    {_popup_button("popup-process", "Process details")}
    ' '
    ' ) if dx_cards: stakeholder_cards_html = ''.join( '
    ' f'
    {idx}
    ' '
    ' f'
    {title}
    ' f'
      {"".join(f"
    • {_trim_visible_copy(line, floor=120, ceiling=240)}
    • " for line in lines[:2])}
    ' '
    ' for idx, (title, lines) in enumerate(dx_cards[:3], start=1) ) else: stakeholder_cards_html = _component_placeholder(support_title, _prefer_source_text(support_topic, 'No stakeholder detail available.')) stakeholder_card = ( '
    ' f'
    {support_title}
    ' f'
    {stakeholder_cards_html}
    ' f'
    {_popup_button("popup-stakeholder", "Stakeholder details")}
    ' '
    ' ) body_inner = ( f'{goal_card}' f'{process_card}' f'{stakeholder_card}' f'{goal_popup}{process_popup}{stakeholder_popup}' ) body_html = _type_b_body_shell(body_inner) sidebar_html = '
    ' footer_html = '
    ' + f'
    {_trim_visible_copy(conclusion_text, floor=120, ceiling=320)}
    ' + '
    ' return {'body_html': body_html, 'sidebar_html': sidebar_html, 'footer_html': footer_html, 'reasoning': 'goal-image-stakeholder layout selected from document content traits'} def _build_requirements_process_product_layout(ctx: PipelineContext, raw: str) -> dict: req_topic = _topic(ctx, 1) process_topic = _topic(ctx, 2) product_topic = _topic(ctx, 3) conclusion_topic = next((t for t in ctx.topics if getattr(t, 'layer', '') == 'conclusion'), ctx.topics[-1] if ctx.topics else None) req_title = req_topic.title if req_topic and req_topic.title else ctx.analysis.title process_title = process_topic.title if process_topic and process_topic.title else 'Process change' product_title = product_topic.title if product_topic and product_topic.title else 'Product change' conclusion_text = _prefer_source_text(conclusion_topic, ctx.analysis.core_message if ctx.analysis else '') req_groups = _extract_grouped_bullets(_extract_heading_block(raw, req_title), base_indent=0)[:3] process_groups = _extract_grouped_bullets(_extract_heading_block(raw, process_title), base_indent=0)[:3] product_groups = _extract_grouped_bullets(_extract_heading_block(raw, product_title), base_indent=0)[:3] req_popup = _popup_overlay('popup-req', req_title, _popup_list_html(_flatten_group_items(req_groups), floor=220, ceiling=700)) process_popup = _popup_overlay('popup-process', process_title, _popup_list_html(_flatten_group_items(process_groups), floor=220, ceiling=700)) product_popup = _popup_overlay('popup-product', product_title, _popup_list_html(_flatten_group_items(product_groups), floor=220, ceiling=700)) req_cards = ''.join( '
    ' '
    {title}
    ' '
      {items}
    ' '
    '.format( color=color, title=group['title'], items=''.join( f'
  • {_trim_visible_copy(_plain_text(str(item)), floor=120, ceiling=260)}
  • ' for item in group.get('items', [])[:3] ), ) for group, color in zip(req_groups, ['#2563eb', '#7c3aed', '#16a34a']) ) requirements_block = ( '
    ' f'
    {req_title}
    ' f'
    {req_cards}
    ' f'
    {_popup_button("popup-req", "Requirements details")}
    ' '
    ' ) process_cards = ''.join( '
    ' '
    {title}
    ' '
      {items}
    ' '
    '.format( title=group['title'], items=''.join( f'
  • {_trim_visible_copy(_plain_text(str(item)), floor=120, ceiling=260)}
  • ' for item in group.get('items', [])[:2] ), ) for group in process_groups ) process_block = ( '
    ' f'
    {process_title}
    ' f'
    {process_cards}
    ' f'
    {_popup_button("popup-process", "Process details")}
    ' '
    ' ) product_cards = ''.join( '
    ' '
    {title}
    ' '
      {items}
    ' '
    '.format( title=group['title'], items=''.join( f'
  • {_trim_visible_copy(_plain_text(str(item)), floor=120, ceiling=260)}
  • ' for item in group.get('items', [])[:2] ), ) for group in product_groups ) product_block = ( '
    ' f'
    {product_title}
    ' f'
    {product_cards}
    ' f'
    {_popup_button("popup-product", "Product details")}
    ' '
    ' ) body_inner = ( f'{requirements_block}' f'{process_block}' f'{product_block}' f'{req_popup}{process_popup}{product_popup}' ) body_html = _type_b_body_shell(body_inner) sidebar_html = '
    ' footer_html = '
    ' + f'
    {_trim_visible_copy(conclusion_text, floor=120, ceiling=320)}
    ' + '
    ' return {'body_html': body_html, 'sidebar_html': sidebar_html, 'footer_html': footer_html, 'reasoning': 'requirements-process-product layout selected from document content traits'} def _build_stage2_retry_html(ctx: PipelineContext, retry_plan: dict) -> dict: raw = ctx.raw_content or '' is_run001_style = _is_run001_style_document(ctx, raw) if is_run001_style: problem_topic = _topic(ctx, 1) definitions_topic = _topic(ctx, 2) relation_topic = _topic(ctx, 3) evidence_topic = _topic(ctx, 4) comparison_topic = _topic(ctx, 5) problem_title = problem_topic.title if problem_topic and problem_topic.title else '용어의 혼용' definitions_title = definitions_topic.title if definitions_topic and definitions_topic.title else '용어 정의' relation_title = relation_topic.title if relation_topic and relation_topic.title else '용어간 상호관계' evidence_title = evidence_topic.title if evidence_topic and evidence_topic.title else '혼용 대표 사례' comparison_title = comparison_topic.title if comparison_topic and comparison_topic.title else 'DX와 BIM의 구분' problem_bullets = _problem_bullets_from_raw(raw)[:2] all_evidence_bullets = _evidence_bullets_from_raw(raw) evidence_bullets = all_evidence_bullets[:2] definition_sections = _definition_sections_from_raw(raw)[:3] relation_bullets = _relation_bullets_from_raw(raw)[:5] comparison_rows = _parse_comparison_rows_from_raw(raw) preferred_axes = ['범위', '프로세스', '성과품', '확장성'] picked_rows = [row for row in comparison_rows if row[0] in preferred_axes] if len(picked_rows) < 4: seen = {row[0] for row in picked_rows} for row in comparison_rows: if row[0] not in seen: picked_rows.append(row) seen.add(row[0]) if len(picked_rows) >= 4: break picked_rows = picked_rows[:4] image_src = _extract_image_src_from_raw(raw) if image_src and ctx.base_path: candidate = Path(ctx.base_path) / image_src.lstrip('/\\').replace('/', '\\') if not candidate.exists(): image_src = '' else: image_src = '' image_caption = _extract_caption_from_raw(raw) conclusion_text = _conclusion_from_raw(raw) 
problem_items_html = ''.join( f'
  • {_trim_visible_copy(item, floor=130, ceiling=280)}
  • ' for item in problem_bullets ) evidence_items_html = ''.join( f'
  • {_trim_visible_copy(item, floor=140, ceiling=320)}
  • ' for item in evidence_bullets ) relation_items_html = ''.join( f'
  • {_trim_visible_copy(item, floor=120, ceiling=260)}
  • ' for item in relation_bullets ) definition_cards_html = '' for idx, section in enumerate(definition_sections, start=1): definition_cards_html += ( '
    ' f'
    {idx}
    ' '
    ' f'
    {section["title"]}
    ' f'
    {_trim_visible_copy(section["body"], floor=220, ceiling=520)}
    ' '
    ' ) comparison_rows_html = '' for axis, dx, bim in picked_rows: comparison_rows_html += ( '
    ' f'
    {_trim_visible_copy(dx, floor=110, ceiling=220)}
    ' f'
    {axis}
    ' f'
    {_trim_visible_copy(bim, floor=110, ceiling=220)}
    ' '
    ' ) evidence_popup_html = _popup_overlay('popup-evidence', evidence_title, _popup_list_html(all_evidence_bullets, floor=220, ceiling=520)) comparison_popup_html = _popup_overlay('popup-comparison', comparison_title, _popup_comparison_table(comparison_rows)) intro_html = ( '
    ' '
    ' '
    ' '
    ' f'
    {problem_title}
    ' f'
      {problem_items_html}
    ' f'
    {evidence_title}
    ' f'
      {evidence_items_html}
    ' f'{_popup_button("popup-evidence", "상세 사례")}' '
    → 각 용어의 정의, 역할, 상호관계에 대한 체계적 정리 필요
    ' '
    ' ) relation_html = ( '
    ' f'
    {relation_title}
    ' '
    ' '
    ' f'{_relation_visual(image_src, image_caption).replace("height:220px", "height:210px").replace("padding:10px", "padding:12px")}' f'
    {image_caption}
    ' '
    ' '
    ' f'
      {relation_items_html}
    ' '
    ' '
    ' '
    DX
    ' '
    구분
    ' '
    BIM
    ' '
    ' f'{comparison_rows_html}' '
    ' f'{_popup_button("popup-comparison", "상세 비교 보기")}' '
    ' ) body_html = '
    ' + intro_html + relation_html + evidence_popup_html + comparison_popup_html + '
    ' sidebar_html = '
    ' + f'
    {definitions_title}
    ' + definition_cards_html + '
    ' footer_html = '
    ' + f'
    {conclusion_text}
    ' + '
    ' return {'body_html': body_html, 'sidebar_html': sidebar_html, 'footer_html': footer_html, 'reasoning': 'retry regrouping by content importance: grouped problem+evidence with popup details, relation block, visible comparison summary with full popup, numbered definition cards'} layout_family = _detect_generic_layout_family(ctx, raw) if layout_family == 'goal-image-stakeholder': return _build_goal_image_stakeholder_layout(ctx, raw) if layout_family == 'requirements-process-product': return _build_requirements_process_product_layout(ctx, raw) main_topics = [t for t in ctx.topics if getattr(t, 'layer', '') != 'conclusion'] intro_topic = main_topics[0] if len(main_topics) > 0 else None body_topic = main_topics[1] if len(main_topics) > 1 else None support_topic = main_topics[2] if len(main_topics) > 2 else None conclusion_topic = next((t for t in ctx.topics if getattr(t, 'layer', '') == 'conclusion'), ctx.topics[-1] if ctx.topics else None) intro_title = intro_topic.title if intro_topic and intro_topic.title else ctx.analysis.title body_title = body_topic.title if body_topic and body_topic.title else '본문' support_title = support_topic.title if support_topic and support_topic.title else '보조 정보' conclusion_text = _prefer_source_text(conclusion_topic, ctx.analysis.core_message if ctx.analysis else '') intro_full = _bulletish_lines(_prefer_source_text(intro_topic, ''), 12) body_full = _bulletish_lines(_prefer_source_text(body_topic, ''), 14) support_full = _bulletish_lines(_prefer_source_text(support_topic, ''), 12) dx_cards = _load_dx_effect_cards() if ' len(intro_visible) else '' body_popup = _popup_overlay('popup-body', body_title, _popup_list_html(body_full, floor=220, ceiling=680)) if len(body_full) > len(body_visible) else '' support_popup = _popup_overlay('popup-support', support_title, _popup_list_html(support_full, floor=220, ceiling=640)) if len(support_full) > len(support_visible) else '' image_src = _extract_image_src_from_raw(raw) if image_src and 
ctx.base_path: candidate = Path(ctx.base_path) / image_src.lstrip('/\\').replace('/', '\\') if not candidate.exists(): image_src = '' else: image_src = '' image_caption = _extract_caption_from_raw(raw) or body_title intro_card = _section_card(intro_title, intro_visible, tone='orange') if len(intro_full) > len(intro_visible): intro_card = _insert_button_into_card(intro_card, _popup_button('popup-intro', '나머지 내용 보기')) body_card = _section_card(body_title, body_visible, tone='blue') if len(body_full) > len(body_visible): body_card = _insert_button_into_card(body_card, _popup_button('popup-body', '상세 본문 보기')) if image_src: support_items_html = ''.join( f'
  • {_trim_visible_copy(item, floor=160, ceiling=360)}
  • ' for item in support_visible ) visual_block = ( '
    ' f'{_relation_visual(image_src, image_caption).replace("height:220px", "height:176px")}' f'
    {image_caption}
    ' f'
      {support_items_html}
    ' '
    ' ) elif dx_cards: summary_lines = [f'{title}: {lines[0]}' for title, lines in dx_cards if lines][:4] visual_block = _section_card(support_title, summary_lines, tone='slate') visual_block = _insert_button_into_card(visual_block, _popup_button('popup-support', '주체별 상세 보기')) else: visual_block = _section_card(support_title, support_visible, tone='slate') if len(support_full) > len(support_visible): visual_block = _insert_button_into_card(visual_block, _popup_button('popup-support', '상세 보조 내용 보기')) sidebar_parts: list[str] = [] if dx_cards: for title, lines in dx_cards[:3]: sidebar_parts.append(_section_card(title, lines[:3], tone='slate')) else: if intro_extra: sidebar_parts.append(_section_card(intro_title, intro_extra, tone='orange')) if body_extra: sidebar_parts.append(_section_card(body_title, body_extra, tone='blue')) if support_visible: support_sidebar = _section_card(support_title, support_visible, tone='slate') if len(support_full) > len(support_visible): support_sidebar = _insert_button_into_card(support_sidebar, _popup_button('popup-support', '?? ?? ?? ??')) sidebar_parts.append(support_sidebar) if not sidebar_parts: sidebar_parts.append(_component_placeholder(support_title, _prefer_source_text(support_topic, '보조 정보가 없음.'))) sidebar_inner = ''.join(sidebar_parts) body_html = ( '
    ' f'{intro_card}' '
    ' f'{body_card}' f'{visual_block}' '
    ' f'{intro_popup}{body_popup}{support_popup}' '
    ' ) sidebar_html = '
    ' + sidebar_inner + '
    ' footer_html = '
    ' + f'
    {_trim_visible_copy(conclusion_text, floor=120, ceiling=320)}
    ' + '
    ' return {'body_html': body_html, 'sidebar_html': sidebar_html, 'footer_html': footer_html, 'reasoning': 'generic retry layout for non-run001 documents: preserve section titles, keep visible summary blocks, and move overflow detail into popups'} async def _stage_2(ctx: PipelineContext, retry_plan: dict | None = None) -> PipelineContext: analysis_dict = { 'topics': [t.model_dump() for t in ctx.topics], 'page_structure': ctx.page_structure.roles, 'core_message': ctx.analysis.core_message, 'title': ctx.analysis.title, 'total_pages': ctx.analysis.total_pages, 'image_sizes': ctx.analysis.image_sizes, } container_specs_dict = { role: LegacyContainerSpec( role=ci.role, zone=ci.zone, topic_ids=ci.topic_ids, weight=ci.weight, height_px=ci.height_px, width_px=ci.width_px, max_height_cost=ci.max_height_cost, block_constraints=ci.block_constraints, ) for role, ci in ctx.containers.items() } analysis_dict['phase_t'] = { 'font_hierarchy': ctx.font_hierarchy.model_dump(), 'container_ratio': ctx.container_ratio, 'references': {role: [item.model_dump() for item in refs] for role, refs in ctx.references.items()}, 'design_budgets': { role: ci.design_budget.model_dump() if ci.design_budget else {} for role, ci in ctx.containers.items() }, } generated, verification = await generate_with_retry( content=ctx.raw_content, analysis=analysis_dict, container_specs=container_specs_dict, preset=ctx.preset, images=ctx.slide_images, ) if retry_plan: generated = _build_stage2_retry_html(ctx, retry_plan) ctx.generated_html = generated verification_path = ctx.get_run_dir() / 'stage_2_verification.json' _write_json(verification_path, { area: { 'passed': result.passed, 'score': result.score, 'errors': result.errors, } for area, result in verification.items() }) ctx.save_snapshot('stage_2') return ctx def _stage_3(ctx: PipelineContext) -> PipelineContext: analysis_dict = { 'topics': [t.model_dump() for t in ctx.topics], 'page_structure': ctx.page_structure.roles, 'core_message': 
ctx.analysis.core_message, 'title': ctx.analysis.title, } ctx.rendered_html = render_slide_from_html(ctx.generated_html, analysis_dict, ctx.preset) if ctx.base_path: ctx.rendered_html = embed_images(ctx.rendered_html, ctx.base_path) ctx.save_snapshot('stage_3') return ctx def _stage_4(ctx: PipelineContext) -> PipelineContext: ctx.measurement = measure_rendered_heights(ctx.rendered_html) ctx.screenshot_b64 = capture_slide_screenshot(ctx.rendered_html) or '' ctx.quality_score = 100 if not any( zone.get('overflowed') for zone in ctx.measurement.get('zones', {}).values() ) else 60 ctx.save_snapshot('stage_4') return ctx async def main() -> None: parser = argparse.ArgumentParser() parser.add_argument('--input', required=True) parser.add_argument('--stage1a', required=True) parser.add_argument('--stage1b', required=True) parser.add_argument('--base-path', default='') parser.add_argument('--output-dir', required=True) args = parser.parse_args() content = Path(args.input).read_text(encoding='utf-8') stage1a = _load_json(Path(args.stage1a)) stage1b_path = Path(args.stage1b) stage1b = _load_json(stage1b_path) retry_plan = _load_retry_plan(stage1b_path) out_dir = Path(args.output_dir) out_dir.mkdir(parents=True, exist_ok=True) ctx = create_context(content, args.base_path) ctx.run_dir = str(out_dir) ctx = _stage_0(ctx) ctx = _stage_1a(ctx, stage1a) ctx = _stage_1b(ctx, stage1b) ctx = _stage_1_5a(ctx) ctx = _stage_1_7(ctx) ctx = _stage_1_5b(ctx) ctx = await _stage_2(ctx, retry_plan=retry_plan or None) ctx = _stage_3(ctx) ctx = _stage_4(ctx) (out_dir / 'generated_html.json').write_text( json.dumps(ctx.generated_html, ensure_ascii=False, indent=2), encoding='utf-8', ) (out_dir / 'final.html').write_text(ctx.rendered_html, encoding='utf-8') (out_dir / 'measurement.json').write_text( json.dumps(ctx.measurement, ensure_ascii=False, indent=2), encoding='utf-8', ) if ctx.screenshot_b64: screenshot_bytes = base64.b64decode(ctx.screenshot_b64) (out_dir / 
'final-screenshot-current.png').write_bytes(screenshot_bytes) (out_dir / 'final-screenshot.png').write_bytes(screenshot_bytes) (out_dir / 'context.json').write_text( ctx.model_dump_json(indent=2, exclude={'screenshot_b64', 'rendered_html'}), encoding='utf-8', ) (out_dir / 'final_context.json').write_text( ctx.model_dump_json(indent=2, exclude={'screenshot_b64', 'rendered_html'}), encoding='utf-8', ) if __name__ == '__main__': asyncio.run(main())