Files
C.E.L._slide_test/scripts/run_from_artifacts.py

746 lines
29 KiB
Python

from __future__ import annotations
import argparse
import base64
import asyncio
import json
import re
import sys
from pathlib import Path
DESIGN_AGENT_ROOT = Path(r'D:\ad-hoc\kei\design_agent')
REPO_ROOT = Path(__file__).resolve().parent.parent
LOCAL_TEMPLATES_DIR = REPO_ROOT / 'templates'
if str(DESIGN_AGENT_ROOT) not in sys.path:
sys.path.insert(0, str(DESIGN_AGENT_ROOT))
import src.block_reference as block_reference_module
from src.block_reference import select_and_generate_references
from src.config import settings
from src.content_verifier import generate_with_retry
import src.design_director as design_director_module
import src.html_generator as html_generator
from src.design_director import LAYOUT_PRESETS, select_preset
from src.image_utils import embed_images, get_image_sizes
from src.mdx_normalizer import normalize_mdx_content
from src.pipeline_context import (
Analysis,
BlockReference,
ContainerInfo,
DesignBudget,
FontHierarchy,
NormalizedContent,
PageStructure,
PipelineContext,
Topic,
create_context,
)
import src.renderer as renderer_module
from src.renderer import render_slide_from_html
from src.slide_measurer import capture_slide_screenshot, measure_rendered_heights
if not hasattr(html_generator, 'SIDEBAR_PROMPT') and hasattr(html_generator, '_LEGACY_SIDEBAR_PROMPT'):
html_generator.SIDEBAR_PROMPT = html_generator._LEGACY_SIDEBAR_PROMPT
if not hasattr(html_generator, 'FOOTER_PROMPT') and hasattr(html_generator, '_LEGACY_FOOTER_PROMPT'):
html_generator.FOOTER_PROMPT = html_generator._LEGACY_FOOTER_PROMPT
if LOCAL_TEMPLATES_DIR.exists():
block_reference_module.TEMPLATES_DIR = LOCAL_TEMPLATES_DIR
block_reference_module._jinja_env = None
renderer_module.TEMPLATES_DIR = LOCAL_TEMPLATES_DIR
renderer_module.CATALOG_PATH = LOCAL_TEMPLATES_DIR / 'catalog.yaml'
renderer_module._CATALOG_MAP = None
renderer_module._CATALOG_VARIANT_MAP = None
renderer_module._env = None
if hasattr(design_director_module, '_CATALOG_CACHE'):
design_director_module._CATALOG_CACHE = None
if hasattr(design_director_module, '_BLOCK_IDS_CACHE'):
design_director_module._BLOCK_IDS_CACHE = None
from src.space_allocator import (
ContainerSpec as LegacyContainerSpec,
calculate_container_specs,
calculate_design_budget,
calculate_dynamic_ratio,
calculate_font_hierarchy,
)
def _load_json(path: Path) -> dict:
return json.loads(path.read_text(encoding='utf-8-sig'))
def _write_json(path: Path, data: dict) -> None:
path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding='utf-8')
def _load_retry_plan(stage1b_path: Path) -> dict:
retry_plan_path = stage1b_path.parent / 'retry-plan.json'
if retry_plan_path.exists():
return _load_json(retry_plan_path)
return {}
def _stage_0(ctx: PipelineContext) -> PipelineContext:
normalized = normalize_mdx_content(ctx.raw_content)
ctx.normalized = NormalizedContent(
clean_text=normalized['clean_text'],
title=normalized['title'],
images=normalized['images'],
popups=normalized['popups'],
tables=normalized['tables'],
sections=normalized['sections'],
)
ctx.save_snapshot('stage_0')
return ctx
def _stage_1a(ctx: PipelineContext, stage1a: dict) -> PipelineContext:
analysis_raw = stage1a['analysis']
ctx.analysis = Analysis(
core_message=analysis_raw['core_message'],
title=analysis_raw['title'],
total_pages=analysis_raw.get('total_pages', 1),
)
ctx.page_structure = PageStructure(roles=stage1a['page_structure'])
ctx.topics = [Topic(**raw) for raw in stage1a['topics']]
ctx.save_snapshot('stage_1a')
return ctx
def _stage_1b(ctx: PipelineContext, stage1b: dict) -> PipelineContext:
refined_map = {item['topic_id']: item for item in stage1b['concepts']}
topics = []
for raw in ctx.topics:
merged = raw.model_dump()
if raw.id in refined_map:
refined = dict(refined_map[raw.id])
refined.pop('source_data', None)
merged.update(refined)
topics.append(Topic(**merged))
ctx.topics = topics
ctx.save_snapshot('stage_1b')
return ctx
def _stage_1_5a(ctx: PipelineContext) -> PipelineContext:
image_sizes = get_image_sizes(ctx.raw_content, ctx.base_path)
role_text_lengths = {}
for role, info in ctx.page_structure.roles.items():
if isinstance(info, dict):
role_text_lengths[role] = len(ctx.get_role_content(role))
font_hierarchy_dict = calculate_font_hierarchy(role_text_lengths)
ctx.font_hierarchy = FontHierarchy(
key_msg=font_hierarchy_dict.get('핵심', 14.0),
core=font_hierarchy_dict.get('본심', 12.0),
bg=font_hierarchy_dict.get('배경', 11.0),
sidebar=font_hierarchy_dict.get('첨부', 10.0),
)
ctx.container_ratio = calculate_dynamic_ratio(role_text_lengths, font_hierarchy_dict)
analysis_dict = {
'topics': [t.model_dump() for t in ctx.topics],
'page_structure': ctx.page_structure.roles,
}
preset_name = select_preset(analysis_dict)
ctx.preset_name = preset_name
ctx.preset = LAYOUT_PRESETS.get(preset_name, {})
container_specs = calculate_container_specs(
page_structure=ctx.page_structure.roles,
topics=[t.model_dump() for t in ctx.topics],
preset=ctx.preset,
slide_width=settings.slide_width,
slide_height=settings.slide_height,
)
ctx.containers = {
role: ContainerInfo(
role=spec.role,
zone=spec.zone,
topic_ids=spec.topic_ids,
weight=spec.weight,
height_px=spec.height_px,
width_px=spec.width_px,
max_height_cost=spec.max_height_cost,
block_constraints=spec.block_constraints,
)
for role, spec in container_specs.items()
}
slide_images = []
for img_key, img_info in (image_sizes or {}).items():
img_path = Path(ctx.base_path) / img_key if ctx.base_path else Path(img_key)
slide_images.append({
'path': str(img_path),
'width': img_info.get('width', 0),
'height': img_info.get('height', 0),
'ratio': round(img_info.get('width', 1) / max(1, img_info.get('height', 1)), 2),
'topic_id': img_info.get('topic_id'),
'b64': '',
})
ctx.slide_images = slide_images
ctx.analysis = ctx.analysis.model_copy(update={'image_sizes': image_sizes or {}})
ctx.save_snapshot('stage_1_5a')
return ctx
def _stage_1_7(ctx: PipelineContext) -> PipelineContext:
refs_raw = select_and_generate_references(
topics=[t.model_dump() for t in ctx.topics],
containers=ctx.containers,
page_structure=ctx.page_structure.roles,
)
normalized: dict[str, list[BlockReference]] = {}
for role, ref in refs_raw.items():
ref_list = ref if isinstance(ref, list) else [ref]
normalized[role] = [
BlockReference(
block_id=item.get('block_id', ''),
variant=item.get('variant', ''),
visual_type=item.get('visual_type', ''),
schema_info=item.get('schema_info', {}),
design_reference_html=item.get('design_reference_html', ''),
)
for item in ref_list
if isinstance(item, dict)
]
ctx.references = normalized
ctx.save_snapshot('stage_1_7')
return ctx
def _stage_1_5b(ctx: PipelineContext) -> PipelineContext:
updated = {}
font_map = {'본심': 'core', '배경': 'bg', '첨부': 'sidebar', '결론': 'core'}
for role, ci in ctx.containers.items():
refs = ctx.references.get(role, [])
ref = refs[0] if refs else None
schema_info = ref.schema_info if ref else {}
font_size = getattr(ctx.font_hierarchy, font_map.get(role, 'core'), 12.0)
budget = calculate_design_budget(
container_height_px=ci.height_px,
container_width_px=ci.width_px,
block_schema=schema_info,
font_size=font_size,
)
updated[role] = ci.model_copy(update={
'design_budget': DesignBudget(
available_height_px=budget['available_height_px'],
available_width_px=budget['available_width_px'],
max_circle_diameter=budget['max_circle_diameter'],
max_img_width=budget['max_img_width'],
max_img_height=budget['max_img_height'],
fits=budget['fits'],
)
})
ctx.containers = updated
ctx.save_snapshot('stage_1_5b')
return ctx
def _topic(ctx: PipelineContext, topic_id: int) -> Topic | None:
return next((t for t in ctx.topics if t.id == topic_id), None)
def compact_text(text: str, max_len: int) -> str:
normalized = re.sub(r"\s+", " ", text).strip()
if len(normalized) <= max_len:
return normalized
cut = normalized[:max_len].rsplit(" ", 1)[0].strip()
return (cut or normalized[:max_len]).rstrip(" ,.;:") + "..."
def preserve_80_percent(text: str, floor: int = 80, ceiling: int = 180) -> int:
normalized = re.sub(r"\s+", " ", text).strip()
if not normalized:
return floor
return max(floor, min(ceiling, int(len(normalized) * 0.8)))
def _prefer_source_text(topic: Topic | None, fallback: str) -> str:
if not topic:
return fallback
source = re.sub(r"\s+", " ", (topic.source_data or "")).strip()
if source and len(source) >= max(80, len(fallback)):
return source
summary = re.sub(r"\s+", " ", (topic.summary or "")).strip()
if source and len(source) >= 40:
return source
if summary:
return summary
return fallback
def _trim_visible_copy(text: str, floor: int = 120, ceiling: int = 320) -> str:
normalized = re.sub(r"\s+", " ", text).strip()
if not normalized:
return ""
max_len = preserve_80_percent(normalized, floor=floor, ceiling=ceiling)
return compact_text(normalized, max_len)
def _extract_sentence(text: str, keyword: str, fallback: str) -> str:
normalized = re.sub(r"\s+", " ", text).strip()
if not normalized:
return fallback
parts = re.split(r"(?<=[.!?])\s+", normalized)
for part in parts:
if keyword in part:
return part.strip()
return fallback
def _extract_multiple_sentences(text: str, keywords: list[str], fallback: str, limit: int = 2) -> str:
normalized = re.sub(r"\s+", " ", text).strip()
if not normalized:
return fallback
parts = [p.strip() for p in re.split(r"(?<=[.!?])\s+", normalized) if p.strip()]
selected: list[str] = []
for keyword in keywords:
for part in parts:
if keyword in part and part not in selected:
selected.append(part)
break
if len(selected) >= limit:
break
if selected:
return " ".join(selected[:limit])
return fallback
def _plain_text(value: str) -> str:
text = value or ''
text = re.sub(r'<br\s*/?>', '\n', text, flags=re.I)
text = re.sub(r'<[^>]+>', ' ', text)
text = text.replace('**', '').replace('*', ' ')
text = text.replace('&lt;', '<').replace('&gt;', '>').replace('&amp;', '&')
text = re.sub(r'!\[[^\]]*\]\([^\)]*\)', ' ', text)
text = re.sub(r'\[[^\]]+\]\([^\)]*\)', ' ', text)
text = re.sub(r'\s+', ' ', text).strip()
return text
def _markdown_section(text: str, start_marker: str, end_marker: str | None = None) -> str:
start = text.find(start_marker)
if start == -1:
return ''
chunk = text[start + len(start_marker):]
if end_marker:
end = chunk.find(end_marker)
if end != -1:
chunk = chunk[:end]
return chunk.strip()
def _content_after_frontmatter(raw: str) -> str:
if raw.startswith('---'):
parts = raw.split('---', 2)
if len(parts) == 3:
return parts[2].strip()
return raw
def _content_after_frontmatter(raw: str) -> str:
if raw.startswith('---'):
parts = raw.split('---', 2)
if len(parts) == 3:
return parts[2].strip()
return raw
def _problem_bullets_from_raw(raw: str) -> list[str]:
content = _content_after_frontmatter(raw)
before_sep = content.split('<br/>\n---', 1)[0]
bullets = []
for line in before_sep.splitlines():
stripped = line.strip()
if stripped.startswith('* ') and not stripped.startswith('* **'):
bullets.append(_plain_text(stripped[2:]))
return [b for b in bullets if b]
def _details_blocks(raw: str) -> list[str]:
return re.findall(r'<details>(.*?)</details>', raw, flags=re.S)
def _evidence_bullets_from_raw(raw: str) -> list[str]:
blocks = _details_blocks(raw)
if not blocks:
return []
bullets = []
for line in blocks[0].splitlines():
stripped = line.strip()
if stripped.startswith('* '):
bullets.append(_plain_text(stripped[2:]))
return [b for b in bullets if b]
def _definition_sections_from_raw(raw: str) -> list[dict[str, str]]:
match = re.search(r'##\s*1\.[^\n]*\n(.*?)##\s*2\.', raw, flags=re.S)
block = match.group(1) if match else ''
sections: list[dict[str, str]] = []
current_title = None
current_lines: list[str] = []
for line in block.splitlines():
stripped = line.strip()
if stripped.startswith('* **'):
if current_title and current_lines:
sections.append({'title': _plain_text(current_title), 'body': _plain_text(' '.join(current_lines))})
current_title = stripped[2:]
current_lines = []
elif stripped.startswith('* '):
current_lines.append(stripped[2:])
if current_title and current_lines:
sections.append({'title': _plain_text(current_title), 'body': _plain_text(' '.join(current_lines))})
return sections
def _relation_bullets_from_raw(raw: str) -> list[str]:
start = re.search(r'##\s*2\.[^\n]*\n', raw)
if not start:
return []
block = raw[start.end():].split('<details>', 1)[0]
bullets = []
for line in block.splitlines():
stripped = line.strip()
if stripped.startswith('* '):
content = _plain_text(stripped[2:])
if content and '[??' not in content:
bullets.append(content)
return bullets
def _extract_image_src_from_raw(raw: str) -> str:
m = re.search(r'!\[[^\]]*\]\(([^\)]+)\)', raw)
return m.group(1).strip() if m else ''
def _extract_caption_from_raw(raw: str) -> str:
m = re.search(r'\*\[[^\]]+\][^*]+\*', raw)
if m:
return _plain_text(m.group(0))
return 'relation diagram'
def _parse_comparison_rows_from_raw(raw: str) -> list[tuple[str, str, str]]:
blocks = _details_blocks(raw)
if len(blocks) < 2:
return []
rows: list[tuple[str, str, str]] = []
for line in blocks[1].splitlines():
stripped = line.strip()
if not stripped.startswith('|'):
continue
parts = [p.strip() for p in stripped.strip('|').split('|')]
if len(parts) != 3:
continue
if parts[0].startswith(':---') or parts[1].startswith(':---') or parts[2].startswith('---'):
continue
dx, axis, bim = (_plain_text(parts[0]), _plain_text(parts[1]), _plain_text(parts[2]))
if dx == 'DX' and bim == 'BIM':
continue
rows.append((axis, dx, bim))
return rows
def _conclusion_from_raw(raw: str) -> str:
m = re.search(r':::note\[[^\]]+\](.*?):::', raw, flags=re.S)
block = m.group(1) if m else ''
for line in block.splitlines():
stripped = line.strip()
if stripped.startswith('* '):
return _plain_text(stripped[2:])
return _plain_text(block)
def _relation_visual(image_src: str, caption: str) -> str:
if image_src:
return f'<img src="{image_src}" alt="{caption}" style="width:100%; height:220px; object-fit:contain; border:1px solid #dbeafe; border-radius:12px; background:#f8fafc; padding:10px;" />'
return (
'<div style="width:100%; height:220px; border:1px solid #dbeafe; border-radius:12px; background:#f8fafc; padding:10px; display:flex; align-items:center; justify-content:center;">'
'<svg viewBox="0 0 320 220" width="100%" height="100%" aria-label="relation diagram">'
'<circle cx="150" cy="110" r="82" fill="#1d4ed8" opacity="0.95"></circle>'
'<text x="150" y="102" text-anchor="middle" font-size="12" fill="#dbeafe">Digital</text>'
'<text x="150" y="122" text-anchor="middle" font-size="28" font-weight="800" fill="#ffffff">DX</text>'
'<circle cx="92" cy="130" r="38" fill="#f59e0b" opacity="0.95"></circle>'
'<text x="92" y="136" text-anchor="middle" font-size="18" font-weight="800" fill="#ffffff">BIM</text>'
'<circle cx="205" cy="92" r="34" fill="#06b6d4" opacity="0.95"></circle>'
'<text x="205" y="98" text-anchor="middle" font-size="16" font-weight="800" fill="#ffffff">GIS</text>'
'<circle cx="196" cy="154" r="32" fill="#f97316" opacity="0.95"></circle>'
'<text x="196" y="158" text-anchor="middle" font-size="11" font-weight="800" fill="#ffffff">Digital Twin</text>'
'</svg></div>'
)
def _build_stage2_retry_html(ctx: PipelineContext, retry_plan: dict) -> dict:
raw = ctx.raw_content or ''
problem_topic = _topic(ctx, 1)
definitions_topic = _topic(ctx, 2)
relation_topic = _topic(ctx, 3)
evidence_topic = _topic(ctx, 4)
comparison_topic = _topic(ctx, 5)
problem_title = problem_topic.title if problem_topic and problem_topic.title else 'Problem'
definitions_title = definitions_topic.title if definitions_topic and definitions_topic.title else 'Definitions'
relation_title = relation_topic.title if relation_topic and relation_topic.title else 'Relationship'
evidence_title = evidence_topic.title if evidence_topic and evidence_topic.title else 'Evidence'
comparison_title = comparison_topic.title if comparison_topic and comparison_topic.title else 'Comparison'
problem_bullets = _problem_bullets_from_raw(raw)[:2]
evidence_bullets = _evidence_bullets_from_raw(raw)[:4]
definition_sections = _definition_sections_from_raw(raw)[:3]
relation_bullets = _relation_bullets_from_raw(raw)[:4]
comparison_rows = _parse_comparison_rows_from_raw(raw)
preferred_axes = ['??', '????', '???', '???']
picked_rows = [row for row in comparison_rows if row[0] in preferred_axes]
if len(picked_rows) < 4:
seen = {row[0] for row in picked_rows}
for row in comparison_rows:
if row[0] not in seen:
picked_rows.append(row)
seen.add(row[0])
if len(picked_rows) >= 4:
break
picked_rows = picked_rows[:4]
image_src = _extract_image_src_from_raw(raw)
if image_src and ctx.base_path:
candidate = Path(ctx.base_path) / image_src.lstrip('/\\').replace('/', '\\')
if not candidate.exists():
image_src = ''
else:
image_src = ''
image_caption = _extract_caption_from_raw(raw)
conclusion_text = _conclusion_from_raw(raw)
intro_len = sum(len(x) for x in problem_bullets + evidence_bullets)
defs_len = sum(len(s['body']) for s in definition_sections)
relation_len = sum(len(x) for x in relation_bullets)
sidebar_width = '34%' if defs_len >= relation_len else '31%'
main_width = '66%' if defs_len >= relation_len else '69%'
relation_visual_height = '210px' if intro_len > 320 else '230px'
problem_items_html = ''.join(
f'<li style="margin-left:16px; margin-bottom:4px;">{_trim_visible_copy(item, floor=90, ceiling=220)}</li>'
for item in problem_bullets
)
evidence_items_html = ''.join(
f'<li style="margin-left:16px; margin-bottom:4px;">{_trim_visible_copy(item, floor=80, ceiling=180)}</li>'
for item in evidence_bullets
)
relation_items_html = ''.join(
f'<li style="margin-left:16px; margin-bottom:4px;">{_trim_visible_copy(item, floor=80, ceiling=210)}</li>'
for item in relation_bullets
)
definition_cards_html = ''
for section in definition_sections:
definition_cards_html += (
'<div style="background:#ffffff; border:1px solid #cbd5e1; border-radius:10px; padding:10px 12px;">'
f'<div style="font-size:11px; font-weight:800; color:#0f172a; margin-bottom:4px;">{section["title"]}</div>'
f'<div style="font-size:9px; line-height:1.55; color:#334155;">{_trim_visible_copy(section["body"], floor=120, ceiling=250)}</div>'
'</div>'
)
comparison_rows_html = ''
for axis, dx, bim in picked_rows:
comparison_rows_html += (
'<tr>'
f'<td style="border:1px solid #bfdbfe; padding:6px 8px; font-size:8px; line-height:1.4; color:#1e3a8a; width:42%;">{_trim_visible_copy(dx, floor=55, ceiling=120)}</td>'
f'<td style="border:1px solid #bfdbfe; padding:6px 8px; font-size:8px; line-height:1.4; font-weight:800; color:#0f172a; width:16%; text-align:center; background:#eff6ff;">{axis}</td>'
f'<td style="border:1px solid #bfdbfe; padding:6px 8px; font-size:8px; line-height:1.4; color:#334155; width:42%;">{_trim_visible_copy(bim, floor=55, ceiling=120)}</td>'
'</tr>'
)
intro_html = (
'<div style="background:linear-gradient(135deg,#fff7ed 0%,#ffedd5 100%); border:1px solid #fdba74; border-radius:12px; padding:10px 12px; display:grid; grid-template-columns:1fr 1fr; gap:12px;">'
f'<div><div style="font-size:12px; font-weight:800; color:#c2410c; margin-bottom:6px;">{problem_title}</div><ul style="font-size:10px; line-height:1.6; color:#7c2d12; padding-left:0; margin:0; list-style:disc;">{problem_items_html}</ul></div>'
f'<div><div style="font-size:12px; font-weight:800; color:#9a3412; margin-bottom:6px;">{evidence_title}</div><ul style="font-size:9px; line-height:1.55; color:#7c2d12; padding-left:0; margin:0; list-style:disc;">{evidence_items_html}</ul></div>'
'</div>'
)
relation_html = (
f'<div style="background:#ffffff; border:1px solid #cbd5e1; border-radius:14px; padding:12px 14px; display:grid; grid-template-columns:280px 1fr; gap:12px;">'
'<div style="display:flex; flex-direction:column; gap:6px;">'
f'{_relation_visual(image_src, image_caption).replace("height:220px", f"height:{relation_visual_height}")}'
f'<div style="font-size:9px; line-height:1.4; color:#166534; background:#dcfce7; border:1px solid #86efac; border-radius:999px; padding:4px 8px; text-align:center;">{image_caption}</div>'
'</div>'
'<div style="display:flex; flex-direction:column; gap:8px;">'
f'<div style="font-size:12px; font-weight:800; color:#1e40af;">{relation_title}</div>'
f'<ul style="font-size:10px; line-height:1.6; color:#334155; padding-left:0; margin:0; list-style:disc;">{relation_items_html}</ul>'
'</div>'
'</div>'
)
comparison_html = (
'<div style="background:#eff6ff; border:1px solid #bfdbfe; border-radius:12px; padding:8px 10px;">'
f'<div style="font-size:11px; font-weight:800; color:#1d4ed8; margin-bottom:6px;">{comparison_title}</div>'
f'<table style="width:100%; border-collapse:collapse; table-layout:fixed;">{comparison_rows_html}</table>'
'</div>'
)
body_html = (
'<div style="width:100%; height:100%; box-sizing:border-box; font-family:\'Segoe UI\',sans-serif; color:#0f172a; display:flex; flex-direction:column; gap:8px;">'
f'{intro_html}'
f'{relation_html}'
f'{comparison_html}'
'</div>'
)
sidebar_html = (
'<div style="width:100%; height:100%; box-sizing:border-box; font-family:\'Segoe UI\',sans-serif; display:flex; flex-direction:column; gap:8px;">'
f'<div style="background:#ffffff; border:1px solid #cbd5e1; border-radius:12px; padding:10px 12px;"><div style="font-size:12px; font-weight:800; color:#1e293b; margin-bottom:8px;">{definitions_title}</div><div style="display:flex; flex-direction:column; gap:8px;">{definition_cards_html}</div></div>'
'</div>'
)
footer_html = (
'<div style="background:linear-gradient(135deg, #006aff 0%, #00aaff 100%); border-radius:10px; padding:10px 20px; text-align:center; color:#ffffff; width:100%; height:52px; display:flex; align-items:center; justify-content:center; box-sizing:border-box;">'
f'<div style="font-size:12px; font-weight:800; line-height:1.35;">{conclusion_text}</div>'
'</div>'
)
return {
'body_html': body_html,
'sidebar_html': sidebar_html,
'footer_html': footer_html,
'reasoning': f"retry regrouping by content importance: intro(problem+evidence), body(relation+comparison), sidebar(definitions), widths {main_width}/{sidebar_width}",
}
async def _stage_2(ctx: PipelineContext, retry_plan: dict | None = None) -> PipelineContext:
analysis_dict = {
'topics': [t.model_dump() for t in ctx.topics],
'page_structure': ctx.page_structure.roles,
'core_message': ctx.analysis.core_message,
'title': ctx.analysis.title,
'total_pages': ctx.analysis.total_pages,
'image_sizes': ctx.analysis.image_sizes,
}
container_specs_dict = {
role: LegacyContainerSpec(
role=ci.role,
zone=ci.zone,
topic_ids=ci.topic_ids,
weight=ci.weight,
height_px=ci.height_px,
width_px=ci.width_px,
max_height_cost=ci.max_height_cost,
block_constraints=ci.block_constraints,
)
for role, ci in ctx.containers.items()
}
analysis_dict['phase_t'] = {
'font_hierarchy': ctx.font_hierarchy.model_dump(),
'container_ratio': ctx.container_ratio,
'references': {role: [item.model_dump() for item in refs] for role, refs in ctx.references.items()},
'design_budgets': {
role: ci.design_budget.model_dump() if ci.design_budget else {}
for role, ci in ctx.containers.items()
},
}
generated, verification = await generate_with_retry(
content=ctx.raw_content,
analysis=analysis_dict,
container_specs=container_specs_dict,
preset=ctx.preset,
images=ctx.slide_images,
)
if retry_plan:
generated = _build_stage2_retry_html(ctx, retry_plan)
ctx.generated_html = generated
verification_path = ctx.get_run_dir() / 'stage_2_verification.json'
_write_json(verification_path, {
area: {
'passed': result.passed,
'score': result.score,
'errors': result.errors,
}
for area, result in verification.items()
})
ctx.save_snapshot('stage_2')
return ctx
def _stage_3(ctx: PipelineContext) -> PipelineContext:
analysis_dict = {
'topics': [t.model_dump() for t in ctx.topics],
'page_structure': ctx.page_structure.roles,
'core_message': ctx.analysis.core_message,
'title': ctx.analysis.title,
}
ctx.rendered_html = render_slide_from_html(ctx.generated_html, analysis_dict, ctx.preset)
if ctx.base_path:
ctx.rendered_html = embed_images(ctx.rendered_html, ctx.base_path)
ctx.save_snapshot('stage_3')
return ctx
def _stage_4(ctx: PipelineContext) -> PipelineContext:
ctx.measurement = measure_rendered_heights(ctx.rendered_html)
ctx.screenshot_b64 = capture_slide_screenshot(ctx.rendered_html) or ''
ctx.quality_score = 100 if not any(
zone.get('overflowed') for zone in ctx.measurement.get('zones', {}).values()
) else 60
ctx.save_snapshot('stage_4')
return ctx
async def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument('--input', required=True)
parser.add_argument('--stage1a', required=True)
parser.add_argument('--stage1b', required=True)
parser.add_argument('--base-path', default='')
parser.add_argument('--output-dir', required=True)
args = parser.parse_args()
content = Path(args.input).read_text(encoding='utf-8')
stage1a = _load_json(Path(args.stage1a))
stage1b_path = Path(args.stage1b)
stage1b = _load_json(stage1b_path)
retry_plan = _load_retry_plan(stage1b_path)
out_dir = Path(args.output_dir)
out_dir.mkdir(parents=True, exist_ok=True)
ctx = create_context(content, args.base_path)
ctx.run_dir = str(out_dir)
ctx = _stage_0(ctx)
ctx = _stage_1a(ctx, stage1a)
ctx = _stage_1b(ctx, stage1b)
ctx = _stage_1_5a(ctx)
ctx = _stage_1_7(ctx)
ctx = _stage_1_5b(ctx)
ctx = await _stage_2(ctx, retry_plan=retry_plan or None)
ctx = _stage_3(ctx)
ctx = _stage_4(ctx)
(out_dir / 'generated_html.json').write_text(
json.dumps(ctx.generated_html, ensure_ascii=False, indent=2),
encoding='utf-8',
)
(out_dir / 'final.html').write_text(ctx.rendered_html, encoding='utf-8')
(out_dir / 'measurement.json').write_text(
json.dumps(ctx.measurement, ensure_ascii=False, indent=2),
encoding='utf-8',
)
if ctx.screenshot_b64:
screenshot_bytes = base64.b64decode(ctx.screenshot_b64)
(out_dir / 'final-screenshot-current.png').write_bytes(screenshot_bytes)
(out_dir / 'final-screenshot.png').write_bytes(screenshot_bytes)
(out_dir / 'context.json').write_text(
ctx.model_dump_json(indent=2, exclude={'screenshot_b64', 'rendered_html'}),
encoding='utf-8',
)
(out_dir / 'final_context.json').write_text(
ctx.model_dump_json(indent=2, exclude={'screenshot_b64', 'rendered_html'}),
encoding='utf-8',
)
if __name__ == '__main__':
asyncio.run(main())