from __future__ import annotations import argparse import base64 import asyncio import json import re import sys from pathlib import Path DESIGN_AGENT_ROOT = Path(r'D:\ad-hoc\kei\design_agent') REPO_ROOT = Path(__file__).resolve().parent.parent LOCAL_TEMPLATES_DIR = REPO_ROOT / 'templates' if str(DESIGN_AGENT_ROOT) not in sys.path: sys.path.insert(0, str(DESIGN_AGENT_ROOT)) import src.block_reference as block_reference_module from src.block_reference import select_and_generate_references from src.config import settings from src.content_verifier import generate_with_retry import src.design_director as design_director_module import src.html_generator as html_generator from src.design_director import LAYOUT_PRESETS, select_preset from src.image_utils import embed_images, get_image_sizes from src.mdx_normalizer import normalize_mdx_content from src.pipeline_context import ( Analysis, BlockReference, ContainerInfo, DesignBudget, FontHierarchy, NormalizedContent, PageStructure, PipelineContext, Topic, create_context, ) import src.renderer as renderer_module from src.renderer import render_slide_from_html from src.slide_measurer import capture_slide_screenshot, measure_rendered_heights if not hasattr(html_generator, 'SIDEBAR_PROMPT') and hasattr(html_generator, '_LEGACY_SIDEBAR_PROMPT'): html_generator.SIDEBAR_PROMPT = html_generator._LEGACY_SIDEBAR_PROMPT if not hasattr(html_generator, 'FOOTER_PROMPT') and hasattr(html_generator, '_LEGACY_FOOTER_PROMPT'): html_generator.FOOTER_PROMPT = html_generator._LEGACY_FOOTER_PROMPT if LOCAL_TEMPLATES_DIR.exists(): block_reference_module.TEMPLATES_DIR = LOCAL_TEMPLATES_DIR block_reference_module._jinja_env = None renderer_module.TEMPLATES_DIR = LOCAL_TEMPLATES_DIR renderer_module.CATALOG_PATH = LOCAL_TEMPLATES_DIR / 'catalog.yaml' renderer_module._CATALOG_MAP = None renderer_module._CATALOG_VARIANT_MAP = None renderer_module._env = None if hasattr(design_director_module, '_CATALOG_CACHE'): design_director_module._CATALOG_CACHE = None if hasattr(design_director_module, '_BLOCK_IDS_CACHE'): design_director_module._BLOCK_IDS_CACHE = None from src.space_allocator import ( ContainerSpec as LegacyContainerSpec, calculate_container_specs, calculate_design_budget, calculate_dynamic_ratio, calculate_font_hierarchy, ) def _load_json(path: Path) -> dict: return json.loads(path.read_text(encoding='utf-8-sig')) def _write_json(path: Path, data: dict) -> None: path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding='utf-8') def _load_retry_plan(stage1b_path: Path) -> dict: retry_plan_path = stage1b_path.parent / 'retry-plan.json' if retry_plan_path.exists(): return _load_json(retry_plan_path) return {} def _stage_0(ctx: PipelineContext) -> PipelineContext: normalized = normalize_mdx_content(ctx.raw_content) ctx.normalized = NormalizedContent( clean_text=normalized['clean_text'], title=normalized['title'], images=normalized['images'], popups=normalized['popups'], tables=normalized['tables'], sections=normalized['sections'], ) ctx.save_snapshot('stage_0') return ctx def _stage_1a(ctx: PipelineContext, stage1a: dict) -> PipelineContext: analysis_raw = stage1a['analysis'] ctx.analysis = Analysis( core_message=analysis_raw['core_message'], title=analysis_raw['title'], total_pages=analysis_raw.get('total_pages', 1), ) ctx.page_structure = PageStructure(roles=stage1a['page_structure']) ctx.topics = [Topic(**raw) for raw in stage1a['topics']] ctx.save_snapshot('stage_1a') return ctx def _stage_1b(ctx: PipelineContext, stage1b: dict) -> PipelineContext: refined_map = {item['topic_id']: item for item in stage1b['concepts']} topics = [] for raw in ctx.topics: merged = raw.model_dump() if raw.id in refined_map: refined = dict(refined_map[raw.id]) refined.pop('source_data', None) merged.update(refined) topics.append(Topic(**merged)) ctx.topics = topics ctx.save_snapshot('stage_1b') return ctx def _stage_1_5a(ctx: PipelineContext) -> PipelineContext: image_sizes = get_image_sizes(ctx.raw_content, ctx.base_path) role_text_lengths = {} for role, info in ctx.page_structure.roles.items(): if isinstance(info, dict): role_text_lengths[role] = len(ctx.get_role_content(role)) font_hierarchy_dict = calculate_font_hierarchy(role_text_lengths) ctx.font_hierarchy = FontHierarchy( key_msg=font_hierarchy_dict.get('핵심', 14.0), core=font_hierarchy_dict.get('본심', 12.0), bg=font_hierarchy_dict.get('배경', 11.0), sidebar=font_hierarchy_dict.get('첨부', 10.0), ) ctx.container_ratio = calculate_dynamic_ratio(role_text_lengths, font_hierarchy_dict) analysis_dict = { 'topics': [t.model_dump() for t in ctx.topics], 'page_structure': ctx.page_structure.roles, } preset_name = select_preset(analysis_dict) ctx.preset_name = preset_name ctx.preset = LAYOUT_PRESETS.get(preset_name, {}) container_specs = calculate_container_specs( page_structure=ctx.page_structure.roles, topics=[t.model_dump() for t in ctx.topics], preset=ctx.preset, slide_width=settings.slide_width, slide_height=settings.slide_height, ) ctx.containers = { role: ContainerInfo( role=spec.role, zone=spec.zone, topic_ids=spec.topic_ids, weight=spec.weight, height_px=spec.height_px, width_px=spec.width_px, max_height_cost=spec.max_height_cost, block_constraints=spec.block_constraints, ) for role, spec in container_specs.items() } slide_images = [] normalized_images = image_sizes or {} if isinstance(normalized_images, list): iterable = [] for item in normalized_images: if not isinstance(item, dict): continue img_key = item.get('path') or item.get('src') or item.get('image_path') or '' iterable.append((img_key, item)) else: iterable = list(normalized_images.items()) for img_key, img_info in iterable: if not img_key: continue cleaned_key = str(img_key).lstrip('/\\').replace('/', '\\') img_path = Path(ctx.base_path) / cleaned_key if ctx.base_path else Path(img_key) width = int(img_info.get('width', 0) or 0) height = int(img_info.get('height', 0) or 0) slide_images.append({ 'path': str(img_path), 'width': width, 'height': height, 'ratio': round((width or 1) / max(1, height or 1), 2), 'topic_id': img_info.get('topic_id'), 'b64': '', }) ctx.slide_images = slide_images ctx.analysis = ctx.analysis.model_copy(update={'image_sizes': image_sizes or {}}) ctx.save_snapshot('stage_1_5a') return ctx def _stage_1_7(ctx: PipelineContext) -> PipelineContext: refs_raw = select_and_generate_references( topics=[t.model_dump() for t in ctx.topics], containers=ctx.containers, page_structure=ctx.page_structure.roles, ) normalized: dict[str, list[BlockReference]] = {} for role, ref in refs_raw.items(): ref_list = ref if isinstance(ref, list) else [ref] normalized[role] = [ BlockReference( block_id=item.get('block_id', ''), variant=item.get('variant', ''), visual_type=item.get('visual_type', ''), schema_info=item.get('schema_info', {}), design_reference_html=item.get('design_reference_html', ''), ) for item in ref_list if isinstance(item, dict) ] ctx.references = normalized ctx.save_snapshot('stage_1_7') return ctx def _stage_1_5b(ctx: PipelineContext) -> PipelineContext: updated = {} font_map = {'본심': 'core', '배경': 'bg', '첨부': 'sidebar', '결론': 'core'} for role, ci in ctx.containers.items(): refs = ctx.references.get(role, []) ref = refs[0] if refs else None schema_info = ref.schema_info if ref else {} font_size = getattr(ctx.font_hierarchy, font_map.get(role, 'core'), 12.0) budget = calculate_design_budget( container_height_px=ci.height_px, container_width_px=ci.width_px, block_schema=schema_info, font_size=font_size, ) updated[role] = ci.model_copy(update={ 'design_budget': DesignBudget( available_height_px=budget['available_height_px'], available_width_px=budget['available_width_px'], max_circle_diameter=budget['max_circle_diameter'], max_img_width=budget['max_img_width'], max_img_height=budget['max_img_height'], fits=budget['fits'], ) }) ctx.containers = updated ctx.save_snapshot('stage_1_5b') return ctx def _topic(ctx: PipelineContext, topic_id: int) -> Topic | None: return next((t for t in ctx.topics if t.id == topic_id), None) def compact_text(text: str, max_len: int) -> str: normalized = re.sub(r"\s+", " ", text).strip() if len(normalized) <= max_len: return normalized cut = normalized[:max_len].rsplit(" ", 1)[0].strip() return (cut or normalized[:max_len]).rstrip(" ,.;:") + "..." def preserve_80_percent(text: str, floor: int = 80, ceiling: int = 180) -> int: normalized = re.sub(r"\s+", " ", text).strip() if not normalized: return floor return max(floor, min(ceiling, int(len(normalized) * 0.8))) def _prefer_source_text(topic: Topic | None, fallback: str) -> str: if not topic: return fallback source = re.sub(r"\s+", " ", (topic.source_data or "")).strip() if source and len(source) >= max(80, len(fallback)): return source summary = re.sub(r"\s+", " ", (topic.summary or "")).strip() if source and len(source) >= 40: return source if summary: return summary return fallback def _trim_visible_copy(text: str, floor: int = 120, ceiling: int = 320) -> str: normalized = re.sub(r"\s+", " ", text).strip() if not normalized: return "" max_len = preserve_80_percent(normalized, floor=floor, ceiling=ceiling) return compact_text(normalized, max_len) def _extract_sentence(text: str, keyword: str, fallback: str) -> str: normalized = re.sub(r"\s+", " ", text).strip() if not normalized: return fallback parts = re.split(r"(?<=[.!?])\s+", normalized) for part in parts: if keyword in part: return part.strip() return fallback def _extract_multiple_sentences(text: str, keywords: list[str], fallback: str, limit: int = 2) -> str: normalized = re.sub(r"\s+", " ", text).strip() if not normalized: return fallback parts = [p.strip() for p in re.split(r"(?<=[.!?])\s+", normalized) if p.strip()] selected: list[str] = [] for keyword in keywords: for part in parts: if keyword in part and part not in selected: selected.append(part) break if len(selected) >= limit: break if selected: return " ".join(selected[:limit]) return fallback def _plain_text(value: str) -> str: text = value or '' text = re.sub(r'', '\n', text, flags=re.I) text = re.sub(r'<[^>]+>', ' ', text) text = text.replace('**', '').replace('*', ' ') text = text.replace('<', '<').replace('>', '>').replace('&', '&') text = re.sub(r'!\[[^\]]*\]\([^\)]*\)', ' ', text) text = re.sub(r'\[[^\]]+\]\([^\)]*\)', ' ', text) text = re.sub(r'\s+', ' ', text).strip() return text def _markdown_section(text: str, start_marker: str, end_marker: str | None = None) -> str: start = text.find(start_marker) if start == -1: return '' chunk = text[start + len(start_marker):] if end_marker: end = chunk.find(end_marker) if end != -1: chunk = chunk[:end] return chunk.strip() def _content_after_frontmatter(raw: str) -> str: if raw.startswith('---'): parts = raw.split('---', 2) if len(parts) == 3: return parts[2].strip() return raw def _content_after_frontmatter(raw: str) -> str: if raw.startswith('---'): parts = raw.split('---', 2) if len(parts) == 3: return parts[2].strip() return raw def _problem_bullets_from_raw(raw: str) -> list[str]: content = _content_after_frontmatter(raw) before_sep = content.split('
\n---', 1)[0] bullets = [] for line in before_sep.splitlines(): stripped = line.strip() if stripped.startswith('* ') and not stripped.startswith('* **'): bullets.append(_plain_text(stripped[2:])) return [b for b in bullets if b] def _details_blocks(raw: str) -> list[str]: return re.findall(r'
(.*?)
', raw, flags=re.S) def _popup_list_html(items: list[str], floor: int = 90, ceiling: int = 260) -> str: if not items: return '
??? ?? ??? ????.
' lis = ''.join( f'
  • {_trim_visible_copy(item, floor=floor, ceiling=ceiling)}
  • ' for item in items ) return f'' def _popup_comparison_table(rows: list[tuple[str, str, str]]) -> str: if not rows: return '
    ??? ???? ????.
    ' body = ''.join( '' f'{_trim_visible_copy(dx, floor=160, ceiling=420)}' f'{axis}' f'{_trim_visible_copy(bim, floor=160, ceiling=420)}' '' for axis, dx, bim in rows ) return ( '
    ' '' '' '' '' '' '' f'{body}
    DX??BIM
    ' ) def _popup_button(button_id: str, label: str) -> str: return ( f"' ) def _popup_overlay(popup_id: str, title: str, content_html: str) -> str: return ( f'' ) def _evidence_bullets_from_raw(raw: str) -> list[str]: blocks = _details_blocks(raw) if not blocks: return [] bullets = [] for line in blocks[0].splitlines(): stripped = line.strip() if stripped.startswith('* '): bullets.append(_plain_text(stripped[2:])) return [b for b in bullets if b] def _definition_sections_from_raw(raw: str) -> list[dict[str, str]]: match = re.search(r'##\s*1\.[^\n]*\n(.*?)##\s*2\.', raw, flags=re.S) block = match.group(1) if match else '' sections: list[dict[str, str]] = [] current_title = None current_lines: list[str] = [] for line in block.splitlines(): stripped = line.strip() if stripped.startswith('* **'): if current_title and current_lines: sections.append({'title': _plain_text(current_title), 'body': _plain_text(' '.join(current_lines))}) current_title = stripped[2:] current_lines = [] elif stripped.startswith('* '): current_lines.append(stripped[2:]) if current_title and current_lines: sections.append({'title': _plain_text(current_title), 'body': _plain_text(' '.join(current_lines))}) return sections def _relation_bullets_from_raw(raw: str) -> list[str]: start = re.search(r'##\s*2\.[^\n]*\n', raw) if not start: return [] block = raw[start.end():].split('
    ', 1)[0] bullets = [] for line in block.splitlines(): stripped = line.strip() if stripped.startswith('* '): content = _plain_text(stripped[2:]) if content and '[??' not in content: bullets.append(content) return bullets def _extract_image_src_from_raw(raw: str) -> str: m = re.search(r'!\[[^\]]*\]\(([^\)]+)\)', raw) return m.group(1).strip() if m else '' def _extract_caption_from_raw(raw: str) -> str: m = re.search(r'\*\[[^\]]+\][^*]+\*', raw) if m: return _plain_text(m.group(0)) return 'relation diagram' def _parse_comparison_rows_from_raw(raw: str) -> list[tuple[str, str, str]]: blocks = _details_blocks(raw) if len(blocks) < 2: return [] rows: list[tuple[str, str, str]] = [] for line in blocks[1].splitlines(): stripped = line.strip() if not stripped.startswith('|'): continue parts = [p.strip() for p in stripped.strip('|').split('|')] if len(parts) != 3: continue if parts[0].startswith(':---') or parts[1].startswith(':---') or parts[2].startswith('---'): continue dx, axis, bim = (_plain_text(parts[0]), _plain_text(parts[1]), _plain_text(parts[2])) if dx == 'DX' and bim == 'BIM': continue rows.append((axis, dx, bim)) return rows def _conclusion_from_raw(raw: str) -> str: m = re.search(r':::note\[[^\]]+\](.*?):::', raw, flags=re.S) block = m.group(1) if m else '' for line in block.splitlines(): stripped = line.strip() if stripped.startswith('* '): return _plain_text(stripped[2:]) return _plain_text(block) def _relation_visual(image_src: str, caption: str) -> str: if image_src: return f'{caption}' return ( '
    ' '' '' 'Digital' 'DX' '' 'BIM' '' 'GIS' '' 'Digital Twin' '
    ' ) def _build_stage2_retry_html(ctx: PipelineContext, retry_plan: dict) -> dict: raw = ctx.raw_content or '' problem_topic = _topic(ctx, 1) definitions_topic = _topic(ctx, 2) relation_topic = _topic(ctx, 3) evidence_topic = _topic(ctx, 4) comparison_topic = _topic(ctx, 5) problem_title = problem_topic.title if problem_topic and problem_topic.title else '??? ??' definitions_title = definitions_topic.title if definitions_topic and definitions_topic.title else '?? ??' relation_title = relation_topic.title if relation_topic and relation_topic.title else '??? ????' evidence_title = evidence_topic.title if evidence_topic and evidence_topic.title else '?? ?? ??' comparison_title = comparison_topic.title if comparison_topic and comparison_topic.title else 'DX? BIM? ??' problem_bullets = _problem_bullets_from_raw(raw)[:2] all_evidence_bullets = _evidence_bullets_from_raw(raw) evidence_bullets = all_evidence_bullets[:2] definition_sections = _definition_sections_from_raw(raw)[:3] relation_bullets = _relation_bullets_from_raw(raw)[:5] comparison_rows = _parse_comparison_rows_from_raw(raw) preferred_axes = ['??', '????', '???', '???'] picked_rows = [row for row in comparison_rows if row[0] in preferred_axes] if len(picked_rows) < 4: seen = {row[0] for row in picked_rows} for row in comparison_rows: if row[0] not in seen: picked_rows.append(row) seen.add(row[0]) if len(picked_rows) >= 4: break picked_rows = picked_rows[:4] image_src = _extract_image_src_from_raw(raw) if image_src and ctx.base_path: candidate = Path(ctx.base_path) / image_src.lstrip('/\\').replace('/', '\\') if not candidate.exists(): image_src = '' else: image_src = '' image_caption = _extract_caption_from_raw(raw) conclusion_text = _conclusion_from_raw(raw) problem_items_html = ''.join( f'
  • {_trim_visible_copy(item, floor=130, ceiling=280)}
  • ' for item in problem_bullets ) evidence_items_html = ''.join( f'
  • {_trim_visible_copy(item, floor=140, ceiling=320)}
  • ' for item in evidence_bullets ) relation_items_html = ''.join( f'
  • {_trim_visible_copy(item, floor=120, ceiling=260)}
  • ' for item in relation_bullets ) definition_cards_html = '' for idx, section in enumerate(definition_sections, start=1): definition_cards_html += ( '
    ' f'
    {idx}
    ' '
    ' f'
    {section["title"]}
    ' f'
    {_trim_visible_copy(section["body"], floor=220, ceiling=520)}
    ' '
    ' '
    ' ) comparison_rows_html = '' for axis, dx, bim in picked_rows: comparison_rows_html += ( '
    ' f'
    {_trim_visible_copy(dx, floor=110, ceiling=220)}
    ' f'
    {axis}
    ' f'
    {_trim_visible_copy(bim, floor=110, ceiling=220)}
    ' '
    ' ) evidence_popup_html = _popup_overlay( 'popup-evidence', evidence_title, _popup_list_html(all_evidence_bullets, floor=220, ceiling=520), ) comparison_popup_html = _popup_overlay( 'popup-comparison', comparison_title, _popup_comparison_table(comparison_rows), ) intro_html = ( '
    ' '
    ' '
    ' '
    ' f'
    {problem_title}
    ' f'
      {problem_items_html}
    ' f'
    {evidence_title}
    ' f'
      {evidence_items_html}
    ' f'{_popup_button("popup-evidence", "?? ???")}' '
    → ? ??? ??, ??, ????? ?? ??? ?? ??
    ' '
    ' '
    ' '
    ' ) relation_html = ( '
    ' f'
    {relation_title}
    ' '
    ' '
    ' f'{_relation_visual(image_src, image_caption).replace("height:220px", "height:210px").replace("padding:10px", "padding:12px")}' f'
    {image_caption}
    ' '
    ' '
    ' f'
      {relation_items_html}
    ' '
    ' '
    ' '
    DX
    ' '
    ??
    ' '
    BIM
    ' '
    ' f'{comparison_rows_html}' '
    ' f'{_popup_button("popup-comparison", "??? ?? ??")}' '
    ' '
    ' '
    ' ) body_html = ( '
    ' f'{intro_html}' f'{relation_html}' f'{evidence_popup_html}' f'{comparison_popup_html}' '
    ' ) sidebar_html = ( '
    ' f'
    {definitions_title}
    ' f'{definition_cards_html}' '
    ' ) footer_html = ( '
    ' f'
    {conclusion_text}
    ' '
    ' ) return { 'body_html': body_html, 'sidebar_html': sidebar_html, 'footer_html': footer_html, 'reasoning': 'retry regrouping by content importance: grouped problem+evidence with popup details, relation block, visible comparison summary with full popup, numbered definition cards', } async def _stage_2(ctx: PipelineContext, retry_plan: dict | None = None) -> PipelineContext: analysis_dict = { 'topics': [t.model_dump() for t in ctx.topics], 'page_structure': ctx.page_structure.roles, 'core_message': ctx.analysis.core_message, 'title': ctx.analysis.title, 'total_pages': ctx.analysis.total_pages, 'image_sizes': ctx.analysis.image_sizes, } container_specs_dict = { role: LegacyContainerSpec( role=ci.role, zone=ci.zone, topic_ids=ci.topic_ids, weight=ci.weight, height_px=ci.height_px, width_px=ci.width_px, max_height_cost=ci.max_height_cost, block_constraints=ci.block_constraints, ) for role, ci in ctx.containers.items() } analysis_dict['phase_t'] = { 'font_hierarchy': ctx.font_hierarchy.model_dump(), 'container_ratio': ctx.container_ratio, 'references': {role: [item.model_dump() for item in refs] for role, refs in ctx.references.items()}, 'design_budgets': { role: ci.design_budget.model_dump() if ci.design_budget else {} for role, ci in ctx.containers.items() }, } generated, verification = await generate_with_retry( content=ctx.raw_content, analysis=analysis_dict, container_specs=container_specs_dict, preset=ctx.preset, images=ctx.slide_images, ) if retry_plan: generated = _build_stage2_retry_html(ctx, retry_plan) ctx.generated_html = generated verification_path = ctx.get_run_dir() / 'stage_2_verification.json' _write_json(verification_path, { area: { 'passed': result.passed, 'score': result.score, 'errors': result.errors, } for area, result in verification.items() }) ctx.save_snapshot('stage_2') return ctx def _stage_3(ctx: PipelineContext) -> PipelineContext: analysis_dict = { 'topics': [t.model_dump() for t in ctx.topics], 'page_structure': ctx.page_structure.roles, 'core_message': ctx.analysis.core_message, 'title': ctx.analysis.title, } ctx.rendered_html = render_slide_from_html(ctx.generated_html, analysis_dict, ctx.preset) if ctx.base_path: ctx.rendered_html = embed_images(ctx.rendered_html, ctx.base_path) ctx.save_snapshot('stage_3') return ctx def _stage_4(ctx: PipelineContext) -> PipelineContext: ctx.measurement = measure_rendered_heights(ctx.rendered_html) ctx.screenshot_b64 = capture_slide_screenshot(ctx.rendered_html) or '' ctx.quality_score = 100 if not any( zone.get('overflowed') for zone in ctx.measurement.get('zones', {}).values() ) else 60 ctx.save_snapshot('stage_4') return ctx async def main() -> None: parser = argparse.ArgumentParser() parser.add_argument('--input', required=True) parser.add_argument('--stage1a', required=True) parser.add_argument('--stage1b', required=True) parser.add_argument('--base-path', default='') parser.add_argument('--output-dir', required=True) args = parser.parse_args() content = Path(args.input).read_text(encoding='utf-8') stage1a = _load_json(Path(args.stage1a)) stage1b_path = Path(args.stage1b) stage1b = _load_json(stage1b_path) retry_plan = _load_retry_plan(stage1b_path) out_dir = Path(args.output_dir) out_dir.mkdir(parents=True, exist_ok=True) ctx = create_context(content, args.base_path) ctx.run_dir = str(out_dir) ctx = _stage_0(ctx) ctx = _stage_1a(ctx, stage1a) ctx = _stage_1b(ctx, stage1b) ctx = _stage_1_5a(ctx) ctx = _stage_1_7(ctx) ctx = _stage_1_5b(ctx) ctx = await _stage_2(ctx, retry_plan=retry_plan or None) ctx = _stage_3(ctx) ctx = _stage_4(ctx) (out_dir / 'generated_html.json').write_text( json.dumps(ctx.generated_html, ensure_ascii=False, indent=2), encoding='utf-8', ) (out_dir / 'final.html').write_text(ctx.rendered_html, encoding='utf-8') (out_dir / 'measurement.json').write_text( json.dumps(ctx.measurement, ensure_ascii=False, indent=2), encoding='utf-8', ) if ctx.screenshot_b64: screenshot_bytes = base64.b64decode(ctx.screenshot_b64) (out_dir / 'final-screenshot-current.png').write_bytes(screenshot_bytes) (out_dir / 'final-screenshot.png').write_bytes(screenshot_bytes) (out_dir / 'context.json').write_text( ctx.model_dump_json(indent=2, exclude={'screenshot_b64', 'rendered_html'}), encoding='utf-8', ) (out_dir / 'final_context.json').write_text( ctx.model_dump_json(indent=2, exclude={'screenshot_b64', 'rendered_html'}), encoding='utf-8', ) if __name__ == '__main__': asyncio.run(main())