from __future__ import annotations
import argparse
import base64
import asyncio
import json
import re
import sys
from pathlib import Path
DESIGN_AGENT_ROOT = Path(r'D:\ad-hoc\kei\design_agent')
REPO_ROOT = Path(__file__).resolve().parent.parent
LOCAL_TEMPLATES_DIR = REPO_ROOT / 'templates'
LOCAL_COMPONENTS_DIR = REPO_ROOT / 'components'
DX_COMPONENT_FALLBACK = Path(r'D:\ad-hoc\cel\src\components\dx.astro')
if str(DESIGN_AGENT_ROOT) not in sys.path:
sys.path.insert(0, str(DESIGN_AGENT_ROOT))
import src.block_reference as block_reference_module
from src.block_reference import select_and_generate_references
from src.config import settings
from src.content_verifier import generate_with_retry
import src.design_director as design_director_module
import src.html_generator as html_generator
from src.design_director import LAYOUT_PRESETS, select_preset
from src.image_utils import embed_images, get_image_sizes
from src.mdx_normalizer import normalize_mdx_content
from src.pipeline_context import (
Analysis,
BlockReference,
ContainerInfo,
DesignBudget,
FontHierarchy,
NormalizedContent,
PageStructure,
PipelineContext,
Topic,
create_context,
)
import src.renderer as renderer_module
from src.renderer import render_slide_from_html
from src.slide_measurer import capture_slide_screenshot, measure_rendered_heights
if not hasattr(html_generator, 'SIDEBAR_PROMPT') and hasattr(html_generator, '_LEGACY_SIDEBAR_PROMPT'):
html_generator.SIDEBAR_PROMPT = html_generator._LEGACY_SIDEBAR_PROMPT
if not hasattr(html_generator, 'FOOTER_PROMPT') and hasattr(html_generator, '_LEGACY_FOOTER_PROMPT'):
html_generator.FOOTER_PROMPT = html_generator._LEGACY_FOOTER_PROMPT
if LOCAL_TEMPLATES_DIR.exists():
block_reference_module.TEMPLATES_DIR = LOCAL_TEMPLATES_DIR
block_reference_module._jinja_env = None
renderer_module.TEMPLATES_DIR = LOCAL_TEMPLATES_DIR
renderer_module.CATALOG_PATH = LOCAL_TEMPLATES_DIR / 'catalog.yaml'
renderer_module._CATALOG_MAP = None
renderer_module._CATALOG_VARIANT_MAP = None
renderer_module._env = None
if hasattr(design_director_module, '_CATALOG_CACHE'):
design_director_module._CATALOG_CACHE = None
if hasattr(design_director_module, '_BLOCK_IDS_CACHE'):
design_director_module._BLOCK_IDS_CACHE = None
from src.space_allocator import (
ContainerSpec as LegacyContainerSpec,
calculate_container_specs,
calculate_design_budget,
calculate_dynamic_ratio,
calculate_font_hierarchy,
)
def _load_json(path: Path) -> dict:
return json.loads(path.read_text(encoding='utf-8-sig'))
def _write_json(path: Path, data: dict) -> None:
path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding='utf-8')
def _load_retry_plan(stage1b_path: Path) -> dict:
retry_plan_path = stage1b_path.parent / 'retry-plan.json'
if retry_plan_path.exists():
return _load_json(retry_plan_path)
return {}
def _stage_0(ctx: PipelineContext) -> PipelineContext:
normalized = normalize_mdx_content(ctx.raw_content)
ctx.normalized = NormalizedContent(
clean_text=normalized['clean_text'],
title=normalized['title'],
images=normalized['images'],
popups=normalized['popups'],
tables=normalized['tables'],
sections=normalized['sections'],
)
ctx.save_snapshot('stage_0')
return ctx
def _stage_1a(ctx: PipelineContext, stage1a: dict) -> PipelineContext:
analysis_raw = stage1a['analysis']
ctx.analysis = Analysis(
core_message=analysis_raw['core_message'],
title=analysis_raw['title'],
total_pages=analysis_raw.get('total_pages', 1),
layout_template=analysis_raw.get('layout_template', 'A'),
)
ctx.page_structure = PageStructure(roles=stage1a['page_structure'])
ctx.topics = [Topic(**raw) for raw in stage1a['topics']]
ctx.save_snapshot('stage_1a')
return ctx
def _stage_1b(ctx: PipelineContext, stage1b: dict) -> PipelineContext:
refined_map = {item['topic_id']: item for item in stage1b['concepts']}
topics = []
for raw in ctx.topics:
merged = raw.model_dump()
if raw.id in refined_map:
refined = dict(refined_map[raw.id])
refined.pop('source_data', None)
merged.update(refined)
topics.append(Topic(**merged))
ctx.topics = topics
ctx.save_snapshot('stage_1b')
return ctx
def _stage_1_5a(ctx: PipelineContext) -> PipelineContext:
image_sizes = get_image_sizes(ctx.raw_content, ctx.base_path)
role_text_lengths = {}
for role, info in ctx.page_structure.roles.items():
if isinstance(info, dict):
role_text_lengths[role] = len(ctx.get_role_content(role))
font_hierarchy_dict = calculate_font_hierarchy(role_text_lengths)
ctx.font_hierarchy = FontHierarchy(
key_msg=font_hierarchy_dict.get('핵심', 14.0),
core=font_hierarchy_dict.get('본심', 12.0),
bg=font_hierarchy_dict.get('배경', 11.0),
sidebar=font_hierarchy_dict.get('첨부', 10.0),
)
ctx.container_ratio = calculate_dynamic_ratio(role_text_lengths, font_hierarchy_dict)
analysis_dict = {
'topics': [t.model_dump() for t in ctx.topics],
'page_structure': ctx.page_structure.roles,
}
preset_name = select_preset(analysis_dict)
ctx.preset_name = preset_name
ctx.preset = LAYOUT_PRESETS.get(preset_name, {})
if not _is_run001_style_document(ctx, ctx.raw_content or ''):
ctx.preset_name = 'sidebar-right'
ctx.preset = {
**LAYOUT_PRESETS.get('sidebar-right', {}),
'grid_columns': '1fr 0fr',
'zones': {
**LAYOUT_PRESETS.get('sidebar-right', {}).get('zones', {}),
'body': {**LAYOUT_PRESETS.get('sidebar-right', {}).get('zones', {}).get('body', {}), 'width_pct': 100},
'sidebar': {**LAYOUT_PRESETS.get('sidebar-right', {}).get('zones', {}).get('sidebar', {}), 'width_pct': 0, 'budget_px': 0},
},
}
container_specs = calculate_container_specs(
page_structure=ctx.page_structure.roles,
topics=[t.model_dump() for t in ctx.topics],
preset=ctx.preset,
slide_width=settings.slide_width,
slide_height=settings.slide_height,
)
ctx.containers = {
role: ContainerInfo(
role=spec.role,
zone=spec.zone,
topic_ids=spec.topic_ids,
weight=spec.weight,
height_px=spec.height_px,
width_px=spec.width_px,
max_height_cost=spec.max_height_cost,
block_constraints=spec.block_constraints,
)
for role, spec in container_specs.items()
}
slide_images = []
normalized_images = image_sizes or {}
if isinstance(normalized_images, list):
iterable = []
for item in normalized_images:
if not isinstance(item, dict):
continue
img_key = item.get('path') or item.get('src') or item.get('image_path') or ''
iterable.append((img_key, item))
else:
iterable = list(normalized_images.items())
for img_key, img_info in iterable:
if not img_key:
continue
cleaned_key = str(img_key).lstrip('/\\').replace('/', '\\')
img_path = Path(ctx.base_path) / cleaned_key if ctx.base_path else Path(img_key)
width = int(img_info.get('width', 0) or 0)
height = int(img_info.get('height', 0) or 0)
slide_images.append({
'path': str(img_path),
'width': width,
'height': height,
'ratio': round((width or 1) / max(1, height or 1), 2),
'topic_id': img_info.get('topic_id'),
'b64': '',
})
ctx.slide_images = slide_images
ctx.analysis = ctx.analysis.model_copy(update={'image_sizes': image_sizes or {}})
ctx.save_snapshot('stage_1_5a')
return ctx
def _stage_1_7(ctx: PipelineContext) -> PipelineContext:
refs_raw = select_and_generate_references(
topics=[t.model_dump() for t in ctx.topics],
containers=ctx.containers,
page_structure=ctx.page_structure.roles,
)
normalized: dict[str, list[BlockReference]] = {}
for role, ref in refs_raw.items():
ref_list = ref if isinstance(ref, list) else [ref]
normalized[role] = [
BlockReference(
block_id=item.get('block_id', ''),
variant=item.get('variant', ''),
visual_type=item.get('visual_type', ''),
schema_info=item.get('schema_info', {}),
design_reference_html=item.get('design_reference_html', ''),
)
for item in ref_list
if isinstance(item, dict)
]
ctx.references = normalized
ctx.save_snapshot('stage_1_7')
return ctx
def _stage_1_5b(ctx: PipelineContext) -> PipelineContext:
updated = {}
font_map = {'본심': 'core', '배경': 'bg', '첨부': 'sidebar', '결론': 'core'}
for role, ci in ctx.containers.items():
refs = ctx.references.get(role, [])
ref = refs[0] if refs else None
schema_info = ref.schema_info if ref else {}
font_size = getattr(ctx.font_hierarchy, font_map.get(role, 'core'), 12.0)
budget = calculate_design_budget(
container_height_px=ci.height_px,
container_width_px=ci.width_px,
block_schema=schema_info,
font_size=font_size,
)
updated[role] = ci.model_copy(update={
'design_budget': DesignBudget(
available_height_px=budget['available_height_px'],
available_width_px=budget['available_width_px'],
max_circle_diameter=budget['max_circle_diameter'],
max_img_width=budget['max_img_width'],
max_img_height=budget['max_img_height'],
fits=budget['fits'],
)
})
ctx.containers = updated
ctx.save_snapshot('stage_1_5b')
return ctx
def _topic(ctx: PipelineContext, topic_id: int) -> Topic | None:
return next((t for t in ctx.topics if t.id == topic_id), None)
def compact_text(text: str, max_len: int) -> str:
normalized = re.sub(r"\s+", " ", text).strip()
if len(normalized) <= max_len:
return normalized
cut = normalized[:max_len].rsplit(" ", 1)[0].strip()
return (cut or normalized[:max_len]).rstrip(" ,.;:") + "..."
def preserve_80_percent(text: str, floor: int = 80, ceiling: int = 180) -> int:
normalized = re.sub(r"\s+", " ", text).strip()
if not normalized:
return floor
return max(floor, min(ceiling, int(len(normalized) * 0.8)))
def _prefer_source_text(topic: Topic | None, fallback: str) -> str:
if not topic:
return fallback
source = re.sub(r"\s+", " ", (topic.source_data or "")).strip()
if source:
return source
summary = re.sub(r"\s+", " ", (topic.summary or "")).strip()
if summary:
return summary
return fallback
def _trim_visible_copy(text: str, floor: int = 180, ceiling: int = 520) -> str:
normalized = re.sub(r"\s+", " ", text).strip()
if not normalized:
return ""
max_len = preserve_80_percent(normalized, floor=floor, ceiling=ceiling)
return compact_text(normalized, max_len)
def _extract_sentence(text: str, keyword: str, fallback: str) -> str:
normalized = re.sub(r"\s+", " ", text).strip()
if not normalized:
return fallback
parts = re.split(r"(?<=[.!?])\s+", normalized)
for part in parts:
if keyword in part:
return part.strip()
return fallback
def _extract_multiple_sentences(text: str, keywords: list[str], fallback: str, limit: int = 2) -> str:
normalized = re.sub(r"\s+", " ", text).strip()
if not normalized:
return fallback
parts = [p.strip() for p in re.split(r"(?<=[.!?])\s+", normalized) if p.strip()]
selected: list[str] = []
for keyword in keywords:
for part in parts:
if keyword in part and part not in selected:
selected.append(part)
break
if len(selected) >= limit:
break
if selected:
return " ".join(selected[:limit])
return fallback
def _plain_text(value: str) -> str:
text = value or ''
text = re.sub(r' ', '\n', text, flags=re.I)
text = re.sub(r'<[^>]+>', ' ', text)
text = text.replace('**', '').replace('*', ' ')
text = text.replace('<', '<').replace('>', '>').replace('&', '&')
text = re.sub(r'!\[[^\]]*\]\([^\)]*\)', ' ', text)
text = re.sub(r'\[[^\]]+\]\([^\)]*\)', ' ', text)
text = re.sub(r'\s+', ' ', text).strip()
return text
def _bulletish_lines(text: str, limit: int = 6) -> list[str]:
normalized = re.sub(r"\s+", " ", text or "").strip()
if not normalized:
return []
parts = re.split(r"(?:•|\*\*[^*]+\*\*:?|\s+-\s+|\.\s+)", normalized)
cleaned = []
for part in parts:
item = re.sub(r"\s+", " ", part).strip(" -•")
if not item:
continue
if len(item) < 6:
continue
cleaned.append(item)
if cleaned:
return cleaned[:limit]
sentences = [s.strip() for s in re.split(r"(?<=[.!?])\s+", normalized) if s.strip()]
return sentences[:limit]
def _markdown_section(text: str, start_marker: str, end_marker: str | None = None) -> str:
start = text.find(start_marker)
if start == -1:
return ''
chunk = text[start + len(start_marker):]
if end_marker:
end = chunk.find(end_marker)
if end != -1:
chunk = chunk[:end]
return chunk.strip()
def _content_after_frontmatter(raw: str) -> str:
if raw.startswith('---'):
parts = raw.split('---', 2)
if len(parts) == 3:
return parts[2].strip()
return raw
def _content_after_frontmatter(raw: str) -> str:
if raw.startswith('---'):
parts = raw.split('---', 2)
if len(parts) == 3:
return parts[2].strip()
return raw
def _problem_bullets_from_raw(raw: str) -> list[str]:
content = _content_after_frontmatter(raw)
before_sep = content.split(' \n---', 1)[0]
bullets = []
for line in before_sep.splitlines():
stripped = line.strip()
if stripped.startswith('* ') and not stripped.startswith('* **'):
bullets.append(_plain_text(stripped[2:]))
return [b for b in bullets if b]
def _details_blocks(raw: str) -> list[str]:
return re.findall(r'(.*?) ', raw, flags=re.S)
def _popup_list_html(items: list[str], floor: int = 90, ceiling: int = 260) -> str:
if not items:
return '
?Details ?Details??.
'
lis = ''.join(
f'{_trim_visible_copy(item, floor=max(floor, min(len(re.sub(r"\s+", " ", item).strip()), 260)), ceiling=max(ceiling, 860))} '
for item in items
)
return f''
def _popup_comparison_table(rows: list[tuple[str, str, str]]) -> str:
if not rows:
return '?DetailsDetails??.
'
body = ''.join(
''
f'{_trim_visible_copy(dx, floor=220, ceiling=960)} '
f'{axis} '
f'{_trim_visible_copy(bim, floor=220, ceiling=960)} '
' '
for axis, dx, bim in rows
)
return (
''
'
'
''
'DX '
'?? '
'BIM '
' '
f'{body}
'
)
def _popup_button(button_id: str, label: str) -> str:
return (
f"'
f'{label} '
)
def _popup_overlay(popup_id: str, title: str, content_html: str) -> str:
return (
f''
)
def _evidence_bullets_from_raw(raw: str) -> list[str]:
blocks = _details_blocks(raw)
if not blocks:
return []
bullets = []
for line in blocks[0].splitlines():
stripped = line.strip()
if stripped.startswith('* '):
bullets.append(_plain_text(stripped[2:]))
return [b for b in bullets if b]
def _definition_sections_from_raw(raw: str) -> list[dict[str, str]]:
match = re.search(r'##\s*1\.[^\n]*\n(.*?)##\s*2\.', raw, flags=re.S)
block = match.group(1) if match else ''
sections: list[dict[str, str]] = []
current_title = None
current_lines: list[str] = []
for line in block.splitlines():
stripped = line.strip()
if stripped.startswith('* **'):
if current_title and current_lines:
sections.append({'title': _plain_text(current_title), 'body': _plain_text(' '.join(current_lines))})
current_title = stripped[2:]
current_lines = []
elif stripped.startswith('* '):
current_lines.append(stripped[2:])
if current_title and current_lines:
sections.append({'title': _plain_text(current_title), 'body': _plain_text(' '.join(current_lines))})
return sections
def _relation_bullets_from_raw(raw: str) -> list[str]:
start = re.search(r'##\s*2\.[^\n]*\n', raw)
if not start:
return []
block = raw[start.end():].split('', 1)[0]
bullets = []
for line in block.splitlines():
stripped = line.strip()
if stripped.startswith('* '):
content = _plain_text(stripped[2:])
if content and '[??' not in content:
bullets.append(content)
return bullets
def _extract_image_src_from_raw(raw: str) -> str:
m = re.search(r'!\[[^\]]*\]\(([^\)]+)\)', raw)
return m.group(1).strip() if m else ''
def _extract_caption_from_raw(raw: str) -> str:
m = re.search(r'\*\[[^\]]+\][^*]+\*', raw)
if m:
return _plain_text(m.group(0))
return 'relation diagram'
def _parse_comparison_rows_from_raw(raw: str) -> list[tuple[str, str, str]]:
blocks = _details_blocks(raw)
if len(blocks) < 2:
return []
rows: list[tuple[str, str, str]] = []
for line in blocks[1].splitlines():
stripped = line.strip()
if not stripped.startswith('|'):
continue
parts = [p.strip() for p in stripped.strip('|').split('|')]
if len(parts) != 3:
continue
if parts[0].startswith(':---') or parts[1].startswith(':---') or parts[2].startswith('---'):
continue
dx, axis, bim = (_plain_text(parts[0]), _plain_text(parts[1]), _plain_text(parts[2]))
if dx == 'DX' and bim == 'BIM':
continue
rows.append((axis, dx, bim))
return rows
def _conclusion_from_raw(raw: str) -> str:
m = re.search(r':::note\[[^\]]+\](.*?):::', raw, flags=re.S)
block = m.group(1) if m else ''
for line in block.splitlines():
stripped = line.strip()
if stripped.startswith('* '):
return _plain_text(stripped[2:])
return _plain_text(block)
def _relation_visual(image_src: str, caption: str) -> str:
if image_src:
return f' '
return (
''
''
' '
'Digital '
'DX '
' '
'BIM '
' '
'GIS '
' '
'Digital Twin '
'
'
)
def _is_run001_style_document(ctx: PipelineContext, raw: str) -> bool:
relation_types = {getattr(t, 'relation_type', '') for t in ctx.topics}
if {'hierarchy', 'comparison', 'definition', 'problem'} & relation_types:
return True
return all(keyword in raw for keyword in ['건설산업', 'BIM', 'DX']) and bool(_parse_comparison_rows_from_raw(raw))
def _section_card(title: str, lines: list[str], tone: str = 'blue') -> str:
palette = {
'orange': ('#fff7ed', '#fdba74', '#9a3412'),
'blue': ('#eff6ff', '#93c5fd', '#1e3a8a'),
'slate': ('#f8fafc', '#cbd5e1', '#334155'),
'green': ('#ecfdf5', '#86efac', '#166534'),
}
bg, border, text = palette.get(tone, palette['blue'])
items_html = ''.join(
f'{_trim_visible_copy(item, floor=160, ceiling=460)} '
for item in lines if item
)
return (
f''
)
def _component_placeholder(title: str, summary: str) -> str:
return (
''
f'
{title}
'
f'
{_trim_visible_copy(summary, floor=240, ceiling=560)}
'
'
'
)
def _type_b_body_shell(inner_html: str) -> str:
return (
''
f'{inner_html}'
'
'
)
def _insert_button_into_card(card_html: str, button_html: str) -> str:
idx = card_html.rfind('')
if idx == -1:
return card_html + button_html
return (
card_html[:idx]
+ f'{button_html}
'
+ card_html[idx:]
)
def _load_dx_effect_cards() -> list[tuple[str, list[str]]]:
candidates = [
LOCAL_COMPONENTS_DIR / 'dx.astro',
DX_COMPONENT_FALLBACK,
]
component_text = ''
for path in candidates:
if path.exists():
component_text = path.read_text(encoding='utf-8-sig')
break
if not component_text:
return []
headers = [
_plain_text(item)
for item in re.findall(r'', component_text)
]
if not headers:
return []
cards: dict[str, list[str]] = {header: [] for header in headers}
rows = re.findall(r'(.*?) ', component_text, flags=re.S)
for row in rows:
cells = re.findall(r']*>(.*?) ', row, flags=re.S)
if len(cells) < 4:
continue
category = _plain_text(cells[0]).strip()
for index, header in enumerate(headers):
bullets = re.findall(r']*>(.*?) ', cells[index + 1], flags=re.S)
for bullet in bullets:
item = _plain_text(bullet)
if not item:
continue
cards[header].append(f'{category}: {item}' if category else item)
return [(header, values[:3]) for header, values in cards.items() if values]
def _extract_heading_block(raw: str, keyword: str) -> str:
lines = raw.splitlines()
start = None
start_level = 0
for idx, line in enumerate(lines):
stripped = line.lstrip()
if stripped.startswith('#') and keyword in stripped:
start = idx + 1
start_level = len(stripped) - len(stripped.lstrip('#'))
break
if start is None:
return ''
end = len(lines)
for idx in range(start, len(lines)):
stripped = lines[idx].lstrip()
if stripped.startswith('#'):
level = len(stripped) - len(stripped.lstrip('#'))
if level <= start_level:
end = idx
break
return chr(10).join(lines[start:end]).strip()
def _extract_grouped_bullets(block: str, base_indent: int = 0) -> list[dict[str, list[str] | str]]:
groups: list[dict[str, list[str] | str]] = []
current: dict[str, list[str] | str] | None = None
for line in block.splitlines():
if not line.strip():
continue
indent = len(line) - len(line.lstrip(' '))
stripped = line.strip()
group_match = re.match(r'^[-*]\s+\*\*(.+?)\*\*(.*)$', stripped)
if group_match and indent == base_indent:
title = _plain_text(group_match.group(1))
tail = _plain_text(group_match.group(2).lstrip(' :'))
current = {'title': title, 'items': []}
if tail:
current['items'].append(tail)
groups.append(current)
continue
if current and re.match(r'^[-*]\s+', stripped):
item = _plain_text(re.sub(r'^[-*]\s+', '', stripped))
if item:
current['items'].append(item)
return groups
def _table_rows_from_block(block: str) -> list[tuple[str, str, str]]:
rows: list[tuple[str, str, str]] = []
for line in block.splitlines():
stripped = line.strip()
if not stripped.startswith('|'):
continue
parts = [p.strip() for p in stripped.strip('|').split('|')]
if len(parts) != 3:
continue
if parts[0].startswith(':---') or parts[1].startswith(':---') or parts[2].startswith('---'):
continue
left, axis, right = (_plain_text(parts[0]), _plain_text(parts[1]), _plain_text(parts[2]))
if left.startswith('As-is') and right.startswith('To-be'):
continue
rows.append((left, axis, right))
return rows
def _table_rows_as_lines(rows: list[tuple[str, str, str]], limit: int = 4) -> list[str]:
lines: list[str] = []
for left, axis, right in rows[:limit]:
merged = f'{left} ? {right}'
if axis:
merged = f'{axis}: {merged}'
lines.append(merged)
return lines
def _section_table_lines(raw: str, title: str, limit: int = 4) -> list[str]:
return _table_rows_as_lines(_table_rows_from_block(_extract_heading_block(raw, title)), limit=limit)
def _bullet_lines_from_block(block: str, limit: int = 8) -> list[str]:
lines: list[str] = []
for raw in block.splitlines():
stripped = raw.strip()
if not stripped.startswith(('-', '*')):
continue
cleaned = _plain_text(re.sub(r'^[-*]\s+', '', stripped))
if cleaned:
lines.append(cleaned)
deduped: list[str] = []
for line in lines:
if line not in deduped:
deduped.append(line)
return deduped[:limit]
def _group_card(title: str, groups: list[dict[str, list[str] | str]], tone: str = 'blue', max_items_per_group: int = 2, title_font: int = 11, body_font: float = 9.5) -> str:
palette = {
'blue': ('#eff6ff', '#93c5fd', '#1e3a8a'),
'slate': ('#f8fafc', '#cbd5e1', '#334155'),
'green': ('#ecfdf5', '#86efac', '#166534'),
}
bg, border, accent = palette.get(tone, palette['blue'])
group_html = ''
for group in groups:
items = [_plain_text(str(item)) for item in group.get('items', [])][:max_items_per_group]
if not items:
continue
group_html += (
f''
f'
{group["title"]}
'
f'
{_line_list_html(items, floor=180, ceiling=420, margin_bottom=3)} '
f'
'
)
return (
f''
f'
{title}
'
f'{group_html}'
f'
'
)
def _line_list_html(items: list[str], font_px: float = 10.0, line_height: float = 1.55, floor: int = 140, ceiling: int = 420, margin_bottom: int = 4) -> str:
return ''.join(
f'{_trim_visible_copy(item, floor=floor, ceiling=ceiling)} '
for item in items if item
)
def _flatten_group_items(groups: list[dict[str, list[str] | str]]) -> list[str]:
flattened: list[str] = []
for group in groups:
title = str(group.get('title', '')).strip()
for item in group.get('items', []):
text = _plain_text(str(item))
if text:
flattened.append(f'{title}: {text}' if title else text)
return flattened
def _detect_generic_layout_family(ctx: PipelineContext, raw: str) -> str:
template = getattr(getattr(ctx, 'analysis', None), 'layout_template', '') or ''
if template == 'B_GOAL':
return 'goal-image-stakeholder'
if template == 'B_RPP':
return 'requirements-process-product'
if template == 'B_STACK':
return 'section-stack'
relation_types = {getattr(t, 'relation_type', '') for t in ctx.topics}
if ' dict:
goal_topic = _topic(ctx, 1)
process_topic = _topic(ctx, 2)
support_topic = _topic(ctx, 3)
conclusion_topic = next((t for t in ctx.topics if getattr(t, 'layer', '') == 'conclusion'), ctx.topics[-1] if ctx.topics else None)
goal_title = goal_topic.title if goal_topic and goal_topic.title else ctx.analysis.title
process_title = process_topic.title if process_topic and process_topic.title else 'Process change'
support_title = support_topic.title if support_topic and support_topic.title else 'Stakeholder effects'
conclusion_text = _prefer_source_text(conclusion_topic, ctx.analysis.core_message if ctx.analysis else '')
goal_groups = _extract_grouped_bullets(_extract_heading_block(raw, goal_title), base_indent=0)[:3]
process_block_raw = _extract_heading_block(raw, process_title)
process_lines = _bullet_lines_from_block(process_block_raw, limit=10)
process_popup_lines = process_lines[:] or _flatten_group_items(_extract_grouped_bullets(process_block_raw, base_indent=0))
dx_cards = _load_dx_effect_cards()
stakeholder_popup_lines = [f'{title}: {line}' for title, lines in dx_cards for line in lines]
image_src = _extract_image_src_from_raw(raw)
if image_src and ctx.base_path:
candidate = Path(ctx.base_path) / image_src.lstrip('/').lstrip(chr(92)).replace('/', chr(92))
if not candidate.exists():
image_src = ''
else:
image_src = ''
image_caption = _extract_caption_from_raw(raw) or goal_title
goal_summary_strips = ''.join(
''.format(
color=color,
title=group['title'],
item=_trim_visible_copy(_plain_text(str(group.get('items', [''])[0])), floor=90, ceiling=180),
)
for group, color in zip(goal_groups, ['#c2410c', '#8b6b2e', '#166534'])
)
goal_popup = _popup_overlay('popup-goal', goal_title, _popup_list_html(_flatten_group_items(goal_groups), floor=220, ceiling=900)) if goal_groups else ''
process_popup = _popup_overlay('popup-process', process_title, _popup_list_html(process_popup_lines, floor=220, ceiling=900)) if process_popup_lines else ''
stakeholder_popup = _popup_overlay('popup-stakeholder', support_title, _popup_list_html(stakeholder_popup_lines, floor=220, ceiling=900)) if stakeholder_popup_lines else ''
visual_html = _relation_visual(image_src, image_caption).replace('height:220px', 'height:104px').replace('padding:10px', 'padding:4px') if image_src else _section_card('Goal visual', [_trim_visible_copy(_prefer_source_text(goal_topic, ''), floor=120, ceiling=260)], tone='blue')
goal_card = (
''
f'
{goal_title}
'
'
'
f'
{goal_summary_strips}
'
'
'
f'{visual_html}'
f'
{image_caption}
'
'
'
f'
{_popup_button("popup-goal", "Goal details")}
'
'
'
)
process_card = (
''
f'
{process_title}
'
f'
{_line_list_html(process_lines[:4], floor=160, ceiling=340, margin_bottom=2)} '
f'
{_popup_button("popup-process", "Process details")}
'
'
'
)
if dx_cards:
stakeholder_preview = ''.join(
''
f'
{title}
'
f'
{_trim_visible_copy(lines[0], floor=100, ceiling=180) if lines else ""}
'
'
'
for title, lines in dx_cards[:4]
)
else:
stakeholder_preview = ''.join(
f'{_trim_visible_copy(_prefer_source_text(support_topic, ""), floor=120, ceiling=240)}
'
)
stakeholder_card = (
''
f'
{support_title}
'
f'
{stakeholder_preview}
'
f'
{_popup_button("popup-stakeholder", "Stakeholder details")}
'
'
'
)
lower_block = '' + process_card + stakeholder_card + '
'
body_inner = f'{goal_card}{lower_block}{goal_popup}{process_popup}{stakeholder_popup}'
body_html = _type_b_body_shell(body_inner)
sidebar_html = '
'
footer_html = '' + f'
{_trim_visible_copy(conclusion_text, floor=140, ceiling=360)}
' + '
'
return {'body_html': body_html, 'sidebar_html': sidebar_html, 'footer_html': footer_html, 'reasoning': 'goal/effect Type B layout selected from document content traits'}
def _build_requirements_process_product_layout(ctx: PipelineContext, raw: str) -> dict:
req_topic = _topic(ctx, 1)
process_topic = _topic(ctx, 2)
product_topic = _topic(ctx, 3)
conclusion_topic = next((t for t in ctx.topics if getattr(t, 'layer', '') == 'conclusion'), ctx.topics[-1] if ctx.topics else None)
req_title = req_topic.title if req_topic and req_topic.title else ctx.analysis.title
process_title = process_topic.title if process_topic and process_topic.title else 'Process change'
product_title = product_topic.title if product_topic and product_topic.title else 'Product change'
conclusion_text = _prefer_source_text(conclusion_topic, ctx.analysis.core_message if ctx.analysis else '')
req_groups = _extract_grouped_bullets(_extract_heading_block(raw, req_title), base_indent=0)[:3]
process_groups = _extract_grouped_bullets(_extract_heading_block(raw, process_title), base_indent=0)[:3]
product_groups = _extract_grouped_bullets(_extract_heading_block(raw, product_title), base_indent=0)[:3]
process_table_lines = _section_table_lines(raw, process_title, limit=4)
req_popup = _popup_overlay('popup-req', req_title, _popup_list_html(_flatten_group_items(req_groups), floor=240, ceiling=940))
process_popup_lines = _flatten_group_items(process_groups) + process_table_lines
process_popup = _popup_overlay('popup-process', process_title, _popup_list_html(process_popup_lines, floor=240, ceiling=940))
product_popup = _popup_overlay('popup-product', product_title, _popup_list_html(_flatten_group_items(product_groups), floor=240, ceiling=940))
req_cards = ''.join(
''.format(
color=color,
title=group['title'],
items=_line_list_html([_plain_text(str(item)) for item in group.get('items', [])], floor=190, ceiling=460, margin_bottom=3),
)
for group, color in zip(req_groups, ['#2563eb', '#7c3aed', '#16a34a'])
)
requirements_block = (
''
f'
{req_title}
'
f'
{req_cards}
'
f'
{_popup_button("popup-req", "Details")}
'
'
'
)
process_left_groups = process_groups[:1]
process_right_groups = process_groups[1:]
process_left_title = process_left_groups[0]['title'] if process_left_groups else process_title
process_left_lines = process_table_lines or _flatten_group_items(process_left_groups)
process_left_card = (
''
f'
{process_left_title}
'
f'
{_line_list_html(process_left_lines[:4], floor=200, ceiling=500, margin_bottom=2)} '
'
'
)
process_right_cards = ''.join(
''.format(
title=group['title'],
items=_line_list_html([_plain_text(str(item)) for item in group.get('items', [])], floor=190, ceiling=420, margin_bottom=3),
)
for group in process_right_groups
)
process_card = (
''
f'
{process_title}
'
'
'
f'{process_left_card}'
f'
{process_right_cards}
'
'
'
f'
{_popup_button("popup-process", "Details")}
'
'
'
)
product_body = ''.join(
''
f'
{group["title"]}
'
f'
{_line_list_html([_plain_text(str(item)) for item in group.get("items", [])], floor=190, ceiling=480, margin_bottom=3)} '
'
'
for group in product_groups
)
product_card = (
''
f'
{product_title}
'
f'{product_body}'
f'
{_popup_button("popup-product", "Details")}
'
'
'
)
lower_block = '' + process_card + product_card + '
'
body_inner = f'{requirements_block}{lower_block}{req_popup}{process_popup}{product_popup}'
body_html = _type_b_body_shell(body_inner)
sidebar_html = '
'
footer_html = '' + f'
{_trim_visible_copy(conclusion_text, floor=150, ceiling=420)}
' + '
'
return {'body_html': body_html, 'sidebar_html': sidebar_html, 'footer_html': footer_html, 'reasoning': 'requirements-process-product layout selected from document content traits'}
def _build_stage2_retry_html(ctx: PipelineContext, retry_plan: dict) -> dict:
raw = ctx.raw_content or ''
is_run001_style = _is_run001_style_document(ctx, raw)
if is_run001_style:
problem_topic = _topic(ctx, 1)
definitions_topic = _topic(ctx, 2)
relation_topic = _topic(ctx, 3)
evidence_topic = _topic(ctx, 4)
comparison_topic = _topic(ctx, 5)
problem_title = problem_topic.title if problem_topic and problem_topic.title else '용어의 혼용'
definitions_title = definitions_topic.title if definitions_topic and definitions_topic.title else '용어 정의'
relation_title = relation_topic.title if relation_topic and relation_topic.title else '용어간 상호관계'
evidence_title = evidence_topic.title if evidence_topic and evidence_topic.title else '혼용 대표 사례'
comparison_title = comparison_topic.title if comparison_topic and comparison_topic.title else 'DX와 BIM의 구분'
problem_bullets = _problem_bullets_from_raw(raw)[:2]
all_evidence_bullets = _evidence_bullets_from_raw(raw)
evidence_bullets = all_evidence_bullets[:2]
definition_sections = _definition_sections_from_raw(raw)[:3]
relation_bullets = _relation_bullets_from_raw(raw)[:5]
comparison_rows = _parse_comparison_rows_from_raw(raw)
preferred_axes = ['범위', '프로세스', '성과품', '확장성']
picked_rows = [row for row in comparison_rows if row[0] in preferred_axes]
if len(picked_rows) < 4:
seen = {row[0] for row in picked_rows}
for row in comparison_rows:
if row[0] not in seen:
picked_rows.append(row)
seen.add(row[0])
if len(picked_rows) >= 4:
break
picked_rows = picked_rows[:4]
image_src = _extract_image_src_from_raw(raw)
if image_src and ctx.base_path:
candidate = Path(ctx.base_path) / image_src.lstrip('/\\').replace('/', '\\')
if not candidate.exists():
image_src = ''
else:
image_src = ''
image_caption = _extract_caption_from_raw(raw)
conclusion_text = _conclusion_from_raw(raw)
problem_items_html = ''.join(
f'{_trim_visible_copy(item, floor=130, ceiling=280)} '
for item in problem_bullets
)
evidence_items_html = ''.join(
f'{_trim_visible_copy(item, floor=140, ceiling=320)} '
for item in evidence_bullets
)
relation_items_html = ''.join(
f'{_trim_visible_copy(item, floor=120, ceiling=260)} '
for item in relation_bullets
)
definition_cards_html = ''
for idx, section in enumerate(definition_sections, start=1):
definition_cards_html += (
''
f'
{idx}
'
'
'
f'
{section["title"]}
'
f'
{_trim_visible_copy(section["body"], floor=220, ceiling=520)}
'
'
'
)
comparison_rows_html = ''
for axis, dx, bim in picked_rows:
comparison_rows_html += (
''
f'
{_trim_visible_copy(dx, floor=110, ceiling=220)}
'
f'
{axis}
'
f'
{_trim_visible_copy(bim, floor=110, ceiling=220)}
'
'
'
)
evidence_popup_html = _popup_overlay('popup-evidence', evidence_title, _popup_list_html(all_evidence_bullets, floor=220, ceiling=520))
comparison_popup_html = _popup_overlay('popup-comparison', comparison_title, _popup_comparison_table(comparison_rows))
intro_html = (
''
'
'
'
⚠
'
'
'
f'
{problem_title}
'
f'
'
f'
{evidence_title}
'
f'
'
f'{_popup_button("popup-evidence", "상세 사례")}'
'
→ 각 용어의 정의, 역할, 상호관계에 대한 체계적 정리 필요
'
'
'
)
relation_html = (
''
f'
{relation_title}
'
'
'
'
'
f'{_relation_visual(image_src, image_caption).replace("height:220px", "height:210px").replace("padding:10px", "padding:12px")}'
f'
{image_caption}
'
'
'
'
'
f'
'
'
'
'
'
f'{comparison_rows_html}'
'
'
f'{_popup_button("popup-comparison", "상세 비교 보기")}'
'
'
)
body_html = '' + intro_html + relation_html + evidence_popup_html + comparison_popup_html + '
'
sidebar_html = '' + f'
{definitions_title}
' + definition_cards_html + '
'
footer_html = '' + f'
{conclusion_text}
' + '
'
return {'body_html': body_html, 'sidebar_html': sidebar_html, 'footer_html': footer_html, 'reasoning': 'retry regrouping by content importance: grouped problem+evidence with popup details, relation block, visible comparison summary with full popup, numbered definition cards'}
layout_family = _detect_generic_layout_family(ctx, raw)
if layout_family == 'goal-image-stakeholder':
return _build_goal_image_stakeholder_layout(ctx, raw)
if layout_family == 'requirements-process-product':
return _build_requirements_process_product_layout(ctx, raw)
main_topics = [t for t in ctx.topics if getattr(t, 'layer', '') != 'conclusion']
intro_topic = main_topics[0] if len(main_topics) > 0 else None
body_topic = main_topics[1] if len(main_topics) > 1 else None
support_topic = main_topics[2] if len(main_topics) > 2 else None
conclusion_topic = next((t for t in ctx.topics if getattr(t, 'layer', '') == 'conclusion'), ctx.topics[-1] if ctx.topics else None)
intro_title = intro_topic.title if intro_topic and intro_topic.title else ctx.analysis.title
body_title = body_topic.title if body_topic and body_topic.title else '본문'
support_title = support_topic.title if support_topic and support_topic.title else '보조 정보'
conclusion_text = _prefer_source_text(conclusion_topic, ctx.analysis.core_message if ctx.analysis else '')
intro_full = _bulletish_lines(_prefer_source_text(intro_topic, ''), 12)
body_full = _bulletish_lines(_prefer_source_text(body_topic, ''), 14)
support_full = _bulletish_lines(_prefer_source_text(support_topic, ''), 12)
dx_cards = _load_dx_effect_cards() if ' len(intro_visible) else ''
body_popup = _popup_overlay('popup-body', body_title, _popup_list_html(body_full, floor=220, ceiling=680)) if len(body_full) > len(body_visible) else ''
support_popup = _popup_overlay('popup-support', support_title, _popup_list_html(support_full, floor=220, ceiling=640)) if len(support_full) > len(support_visible) else ''
image_src = _extract_image_src_from_raw(raw)
if image_src and ctx.base_path:
candidate = Path(ctx.base_path) / image_src.lstrip('/\\').replace('/', '\\')
if not candidate.exists():
image_src = ''
else:
image_src = ''
image_caption = _extract_caption_from_raw(raw) or body_title
intro_card = _section_card(intro_title, intro_visible, tone='orange')
if len(intro_full) > len(intro_visible):
intro_card = _insert_button_into_card(intro_card, _popup_button('popup-intro', '나머지 내용 보기'))
body_card = _section_card(body_title, body_visible, tone='blue')
if len(body_full) > len(body_visible):
body_card = _insert_button_into_card(body_card, _popup_button('popup-body', '상세 본문 보기'))
if image_src:
support_items_html = ''.join(
f'{_trim_visible_copy(item, floor=160, ceiling=360)} '
for item in support_visible
)
visual_block = (
''
f'{_relation_visual(image_src, image_caption).replace("height:220px", "height:176px")}'
f'
{image_caption}
'
f'
'
'
'
)
elif dx_cards:
summary_lines = [f'{title}: {lines[0]}' for title, lines in dx_cards if lines][:4]
visual_block = _section_card(support_title, summary_lines, tone='slate')
visual_block = _insert_button_into_card(visual_block, _popup_button('popup-support', '주체별 상세 보기'))
else:
visual_block = _section_card(support_title, support_visible, tone='slate')
if len(support_full) > len(support_visible):
visual_block = _insert_button_into_card(visual_block, _popup_button('popup-support', '상세 보조 내용 보기'))
sidebar_parts: list[str] = []
if dx_cards:
for title, lines in dx_cards[:3]:
sidebar_parts.append(_section_card(title, lines[:3], tone='slate'))
else:
if intro_extra:
sidebar_parts.append(_section_card(intro_title, intro_extra, tone='orange'))
if body_extra:
sidebar_parts.append(_section_card(body_title, body_extra, tone='blue'))
if support_visible:
support_sidebar = _section_card(support_title, support_visible, tone='slate')
if len(support_full) > len(support_visible):
support_sidebar = _insert_button_into_card(support_sidebar, _popup_button('popup-support', 'Details Details'))
sidebar_parts.append(support_sidebar)
if not sidebar_parts:
sidebar_parts.append(_component_placeholder(support_title, _prefer_source_text(support_topic, '보조 정보가 없음.')))
sidebar_inner = ''.join(sidebar_parts)
body_html = (
''
f'{intro_card}'
'
'
f'{body_card}'
f'{visual_block}'
'
'
f'{intro_popup}{body_popup}{support_popup}'
'
'
)
sidebar_html = '' + sidebar_inner + '
'
footer_html = '' + f'
{_trim_visible_copy(conclusion_text, floor=120, ceiling=320)}
' + '
'
return {'body_html': body_html, 'sidebar_html': sidebar_html, 'footer_html': footer_html, 'reasoning': 'generic retry layout for non-run001 documents: preserve section titles, keep visible summary blocks, and move overflow detail into popups'}
async def _stage_2(ctx: PipelineContext, retry_plan: dict | None = None) -> PipelineContext:
analysis_dict = {
'topics': [t.model_dump() for t in ctx.topics],
'page_structure': ctx.page_structure.roles,
'core_message': ctx.analysis.core_message,
'title': ctx.analysis.title,
'total_pages': ctx.analysis.total_pages,
'image_sizes': ctx.analysis.image_sizes,
}
container_specs_dict = {
role: LegacyContainerSpec(
role=ci.role,
zone=ci.zone,
topic_ids=ci.topic_ids,
weight=ci.weight,
height_px=ci.height_px,
width_px=ci.width_px,
max_height_cost=ci.max_height_cost,
block_constraints=ci.block_constraints,
)
for role, ci in ctx.containers.items()
}
analysis_dict['phase_t'] = {
'font_hierarchy': ctx.font_hierarchy.model_dump(),
'container_ratio': ctx.container_ratio,
'references': {role: [item.model_dump() for item in refs] for role, refs in ctx.references.items()},
'design_budgets': {
role: ci.design_budget.model_dump() if ci.design_budget else {}
for role, ci in ctx.containers.items()
},
}
generated, verification = await generate_with_retry(
content=ctx.raw_content,
analysis=analysis_dict,
container_specs=container_specs_dict,
preset=ctx.preset,
images=ctx.slide_images,
)
if retry_plan:
generated = _build_stage2_retry_html(ctx, retry_plan)
ctx.generated_html = generated
verification_path = ctx.get_run_dir() / 'stage_2_verification.json'
_write_json(verification_path, {
area: {
'passed': result.passed,
'score': result.score,
'errors': result.errors,
}
for area, result in verification.items()
})
ctx.save_snapshot('stage_2')
return ctx
def _stage_3(ctx: PipelineContext) -> PipelineContext:
analysis_dict = {
'topics': [t.model_dump() for t in ctx.topics],
'page_structure': ctx.page_structure.roles,
'core_message': ctx.analysis.core_message,
'title': ctx.analysis.title,
}
ctx.rendered_html = render_slide_from_html(ctx.generated_html, analysis_dict, ctx.preset)
if ctx.base_path:
ctx.rendered_html = embed_images(ctx.rendered_html, ctx.base_path)
ctx.save_snapshot('stage_3')
return ctx
def _stage_4(ctx: PipelineContext) -> PipelineContext:
ctx.measurement = measure_rendered_heights(ctx.rendered_html)
ctx.screenshot_b64 = capture_slide_screenshot(ctx.rendered_html) or ''
ctx.quality_score = 100 if not any(
zone.get('overflowed') for zone in ctx.measurement.get('zones', {}).values()
) else 60
ctx.save_snapshot('stage_4')
return ctx
async def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument('--input', required=True)
parser.add_argument('--stage1a', required=True)
parser.add_argument('--stage1b', required=True)
parser.add_argument('--base-path', default='')
parser.add_argument('--output-dir', required=True)
args = parser.parse_args()
content = Path(args.input).read_text(encoding='utf-8')
stage1a = _load_json(Path(args.stage1a))
stage1b_path = Path(args.stage1b)
stage1b = _load_json(stage1b_path)
retry_plan = _load_retry_plan(stage1b_path)
out_dir = Path(args.output_dir)
out_dir.mkdir(parents=True, exist_ok=True)
ctx = create_context(content, args.base_path)
ctx.run_dir = str(out_dir)
ctx = _stage_0(ctx)
ctx = _stage_1a(ctx, stage1a)
ctx = _stage_1b(ctx, stage1b)
ctx = _stage_1_5a(ctx)
ctx = _stage_1_7(ctx)
ctx = _stage_1_5b(ctx)
ctx = await _stage_2(ctx, retry_plan=retry_plan or None)
ctx = _stage_3(ctx)
ctx = _stage_4(ctx)
(out_dir / 'generated_html.json').write_text(
json.dumps(ctx.generated_html, ensure_ascii=False, indent=2),
encoding='utf-8',
)
(out_dir / 'final.html').write_text(ctx.rendered_html, encoding='utf-8')
(out_dir / 'measurement.json').write_text(
json.dumps(ctx.measurement, ensure_ascii=False, indent=2),
encoding='utf-8',
)
if ctx.screenshot_b64:
screenshot_bytes = base64.b64decode(ctx.screenshot_b64)
(out_dir / 'final-screenshot-current.png').write_bytes(screenshot_bytes)
(out_dir / 'final-screenshot.png').write_bytes(screenshot_bytes)
(out_dir / 'context.json').write_text(
ctx.model_dump_json(indent=2, exclude={'screenshot_b64', 'rendered_html'}),
encoding='utf-8',
)
(out_dir / 'final_context.json').write_text(
ctx.model_dump_json(indent=2, exclude={'screenshot_b64', 'rendered_html'}),
encoding='utf-8',
)
if __name__ == '__main__':
asyncio.run(main())