from __future__ import annotations
import argparse
import base64
import asyncio
import json
import re
import sys
from pathlib import Path
# Absolute location of the external design_agent checkout whose `src` package
# this script drives.  Hard-coded Windows path — adjust if the checkout moves.
DESIGN_AGENT_ROOT = Path(r'D:\ad-hoc\kei\design_agent')
# This repo's root (two levels above this file) and its local template overrides.
REPO_ROOT = Path(__file__).resolve().parent.parent
LOCAL_TEMPLATES_DIR = REPO_ROOT / 'templates'
# Make the design_agent package importable; prepend so it shadows any
# same-named package already on sys.path.
if str(DESIGN_AGENT_ROOT) not in sys.path:
    sys.path.insert(0, str(DESIGN_AGENT_ROOT))
import src.block_reference as block_reference_module
from src.block_reference import select_and_generate_references
from src.config import settings
from src.content_verifier import generate_with_retry
import src.design_director as design_director_module
import src.html_generator as html_generator
from src.design_director import LAYOUT_PRESETS, select_preset
from src.image_utils import embed_images, get_image_sizes
from src.mdx_normalizer import normalize_mdx_content
from src.pipeline_context import (
Analysis,
BlockReference,
ContainerInfo,
DesignBudget,
FontHierarchy,
NormalizedContent,
PageStructure,
PipelineContext,
Topic,
create_context,
)
import src.renderer as renderer_module
from src.renderer import render_slide_from_html
from src.slide_measurer import capture_slide_screenshot, measure_rendered_heights
# --- Import-time compatibility shims and template overrides -------------------
# Some html_generator builds only expose the prompts under _LEGACY_* names;
# alias them so the rest of this script can rely on the modern names.
if not hasattr(html_generator, 'SIDEBAR_PROMPT') and hasattr(html_generator, '_LEGACY_SIDEBAR_PROMPT'):
    html_generator.SIDEBAR_PROMPT = html_generator._LEGACY_SIDEBAR_PROMPT
if not hasattr(html_generator, 'FOOTER_PROMPT') and hasattr(html_generator, '_LEGACY_FOOTER_PROMPT'):
    html_generator.FOOTER_PROMPT = html_generator._LEGACY_FOOTER_PROMPT
# When this repo ships its own templates, point the pipeline modules at them
# and drop each module's cached state so the override actually takes effect.
# NOTE(review): the cache resets are assumed to belong inside this `exists()`
# branch — confirm against the original (pre-reformat) indentation.
if LOCAL_TEMPLATES_DIR.exists():
    block_reference_module.TEMPLATES_DIR = LOCAL_TEMPLATES_DIR
    block_reference_module._jinja_env = None  # force Jinja environment rebuild
    renderer_module.TEMPLATES_DIR = LOCAL_TEMPLATES_DIR
    renderer_module.CATALOG_PATH = LOCAL_TEMPLATES_DIR / 'catalog.yaml'
    renderer_module._CATALOG_MAP = None
    renderer_module._CATALOG_VARIANT_MAP = None
    renderer_module._env = None
    # These caches only exist in some design_director versions; guard with hasattr.
    if hasattr(design_director_module, '_CATALOG_CACHE'):
        design_director_module._CATALOG_CACHE = None
    if hasattr(design_director_module, '_BLOCK_IDS_CACHE'):
        design_director_module._BLOCK_IDS_CACHE = None
from src.space_allocator import (
ContainerSpec as LegacyContainerSpec,
calculate_container_specs,
calculate_design_budget,
calculate_dynamic_ratio,
calculate_font_hierarchy,
)
def _load_json(path: Path) -> dict:
return json.loads(path.read_text(encoding='utf-8-sig'))
def _write_json(path: Path, data: dict) -> None:
path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding='utf-8')
def _load_retry_plan(stage1b_path: Path) -> dict:
    """Load retry-plan.json from the stage-1b artifact's directory, or {} if absent."""
    candidate = stage1b_path.parent / 'retry-plan.json'
    return _load_json(candidate) if candidate.exists() else {}
def _stage_0(ctx: PipelineContext) -> PipelineContext:
    """Stage 0: normalize raw MDX into structured content and snapshot it."""
    fields = normalize_mdx_content(ctx.raw_content)
    keys = ('clean_text', 'title', 'images', 'popups', 'tables', 'sections')
    ctx.normalized = NormalizedContent(**{key: fields[key] for key in keys})
    ctx.save_snapshot('stage_0')
    return ctx
def _stage_1a(ctx: PipelineContext, stage1a: dict) -> PipelineContext:
    """Stage 1a: populate analysis, page structure, and topics from stage-1a JSON."""
    raw_analysis = stage1a['analysis']
    ctx.analysis = Analysis(
        core_message=raw_analysis['core_message'],
        title=raw_analysis['title'],
        # Single-page output unless the analysis explicitly says otherwise.
        total_pages=raw_analysis.get('total_pages', 1),
    )
    ctx.page_structure = PageStructure(roles=stage1a['page_structure'])
    ctx.topics = [Topic(**entry) for entry in stage1a['topics']]
    ctx.save_snapshot('stage_1a')
    return ctx
def _stage_1b(ctx: PipelineContext, stage1b: dict) -> PipelineContext:
    """Stage 1b: overlay refined concept fields onto existing topics by topic_id."""
    refinements = {entry['topic_id']: entry for entry in stage1b['concepts']}
    merged_topics: list[Topic] = []
    for topic in ctx.topics:
        data = topic.model_dump()
        refined = refinements.get(topic.id)
        if refined is not None:
            overlay = dict(refined)
            overlay.pop('source_data', None)  # never clobber the original source text
            data.update(overlay)
        merged_topics.append(Topic(**data))
    ctx.topics = merged_topics
    ctx.save_snapshot('stage_1b')
    return ctx
def _stage_1_5a(ctx: PipelineContext) -> PipelineContext:
    """Stage 1.5a: derive font sizes, layout preset, container specs, and
    per-image metadata from the analyzed content."""
    image_sizes = get_image_sizes(ctx.raw_content, ctx.base_path)
    # Text length per page role drives both font sizing and the container ratio.
    role_text_lengths = {}
    for role, info in ctx.page_structure.roles.items():
        if isinstance(info, dict):
            role_text_lengths[role] = len(ctx.get_role_content(role))
    font_hierarchy_dict = calculate_font_hierarchy(role_text_lengths)
    # Korean role keys: 핵심=key message, 본심=core, 배경=background, 첨부=sidebar;
    # the numeric fallbacks are the default font sizes when a role is absent.
    ctx.font_hierarchy = FontHierarchy(
        key_msg=font_hierarchy_dict.get('핵심', 14.0),
        core=font_hierarchy_dict.get('본심', 12.0),
        bg=font_hierarchy_dict.get('배경', 11.0),
        sidebar=font_hierarchy_dict.get('첨부', 10.0),
    )
    ctx.container_ratio = calculate_dynamic_ratio(role_text_lengths, font_hierarchy_dict)
    # Preset selection consumes plain dicts, not the pydantic models.
    analysis_dict = {
        'topics': [t.model_dump() for t in ctx.topics],
        'page_structure': ctx.page_structure.roles,
    }
    preset_name = select_preset(analysis_dict)
    ctx.preset_name = preset_name
    ctx.preset = LAYOUT_PRESETS.get(preset_name, {})
    container_specs = calculate_container_specs(
        page_structure=ctx.page_structure.roles,
        topics=[t.model_dump() for t in ctx.topics],
        preset=ctx.preset,
        slide_width=settings.slide_width,
        slide_height=settings.slide_height,
    )
    # Re-wrap the allocator's legacy ContainerSpec objects as pipeline
    # ContainerInfo models, field by field.
    ctx.containers = {
        role: ContainerInfo(
            role=spec.role,
            zone=spec.zone,
            topic_ids=spec.topic_ids,
            weight=spec.weight,
            height_px=spec.height_px,
            width_px=spec.width_px,
            max_height_cost=spec.max_height_cost,
            block_constraints=spec.block_constraints,
        )
        for role, spec in container_specs.items()
    }
    slide_images = []
    # get_image_sizes apparently returns either a {key: info} mapping or a list
    # of info dicts (TODO confirm which shapes actually occur); normalize both
    # to (key, info) pairs before iterating.
    normalized_images = image_sizes or {}
    if isinstance(normalized_images, list):
        iterable = []
        for item in normalized_images:
            if not isinstance(item, dict):
                continue
            img_key = item.get('path') or item.get('src') or item.get('image_path') or ''
            iterable.append((img_key, item))
    else:
        iterable = list(normalized_images.items())
    for img_key, img_info in iterable:
        if not img_key:
            continue
        # Normalize to a Windows-style relative path before joining with base_path.
        cleaned_key = str(img_key).lstrip('/\\').replace('/', '\\')
        img_path = Path(ctx.base_path) / cleaned_key if ctx.base_path else Path(img_key)
        width = int(img_info.get('width', 0) or 0)
        height = int(img_info.get('height', 0) or 0)
        slide_images.append({
            'path': str(img_path),
            'width': width,
            'height': height,
            # Guard against zero/missing dimensions when computing aspect ratio.
            'ratio': round((width or 1) / max(1, height or 1), 2),
            'topic_id': img_info.get('topic_id'),
            'b64': '',  # base64 payload left empty here — presumably filled later (embed_images?); TODO confirm
        })
    ctx.slide_images = slide_images
    ctx.analysis = ctx.analysis.model_copy(update={'image_sizes': image_sizes or {}})
    ctx.save_snapshot('stage_1_5a')
    return ctx
def _stage_1_7(ctx: PipelineContext) -> PipelineContext:
    """Stage 1.7: pick design references per role and coerce them into models."""
    raw_refs = select_and_generate_references(
        topics=[topic.model_dump() for topic in ctx.topics],
        containers=ctx.containers,
        page_structure=ctx.page_structure.roles,
    )
    coerced: dict[str, list[BlockReference]] = {}
    for role, value in raw_refs.items():
        # A role may carry a single reference dict or a list of them.
        candidates = value if isinstance(value, list) else [value]
        role_refs = []
        for entry in candidates:
            if not isinstance(entry, dict):
                continue  # drop anything that is not a reference dict
            role_refs.append(BlockReference(
                block_id=entry.get('block_id', ''),
                variant=entry.get('variant', ''),
                visual_type=entry.get('visual_type', ''),
                schema_info=entry.get('schema_info', {}),
                design_reference_html=entry.get('design_reference_html', ''),
            ))
        coerced[role] = role_refs
    ctx.references = coerced
    ctx.save_snapshot('stage_1_7')
    return ctx
def _stage_1_5b(ctx: PipelineContext) -> PipelineContext:
    """Stage 1.5b: compute and attach a design budget for every container."""
    # Map Korean page roles onto FontHierarchy attribute names.
    role_to_font = {'본심': 'core', '배경': 'bg', '첨부': 'sidebar', '결론': 'core'}
    rebudgeted = {}
    for role, container in ctx.containers.items():
        role_refs = ctx.references.get(role, [])
        primary = role_refs[0] if role_refs else None
        schema = primary.schema_info if primary else {}
        # Unknown roles fall back to the core font size (12.0 pt default).
        size = getattr(ctx.font_hierarchy, role_to_font.get(role, 'core'), 12.0)
        budget = calculate_design_budget(
            container_height_px=container.height_px,
            container_width_px=container.width_px,
            block_schema=schema,
            font_size=size,
        )
        rebudgeted[role] = container.model_copy(update={
            'design_budget': DesignBudget(
                available_height_px=budget['available_height_px'],
                available_width_px=budget['available_width_px'],
                max_circle_diameter=budget['max_circle_diameter'],
                max_img_width=budget['max_img_width'],
                max_img_height=budget['max_img_height'],
                fits=budget['fits'],
            )
        })
    ctx.containers = rebudgeted
    ctx.save_snapshot('stage_1_5b')
    return ctx
def _topic(ctx: PipelineContext, topic_id: int) -> Topic | None:
return next((t for t in ctx.topics if t.id == topic_id), None)
def compact_text(text: str, max_len: int) -> str:
    """Collapse whitespace; if longer than *max_len*, cut at a word boundary
    and append an ellipsis (after stripping trailing punctuation)."""
    collapsed = re.sub(r"\s+", " ", text).strip()
    if len(collapsed) <= max_len:
        return collapsed
    head = collapsed[:max_len]
    boundary = head.rsplit(" ", 1)[0].strip()
    truncated = boundary if boundary else head
    return truncated.rstrip(" ,.;:") + "..."
def preserve_80_percent(text: str, floor: int = 80, ceiling: int = 180) -> int:
    """Return a character budget of ~80% of the collapsed text length,
    clamped to the [floor, ceiling] range (floor for empty text)."""
    collapsed = re.sub(r"\s+", " ", text).strip()
    if not collapsed:
        return floor
    budget = int(len(collapsed) * 0.8)
    return min(max(budget, floor), ceiling)
def _prefer_source_text(topic: Topic | None, fallback: str) -> str:
if not topic:
return fallback
source = re.sub(r"\s+", " ", (topic.source_data or "")).strip()
if source and len(source) >= max(80, len(fallback)):
return source
summary = re.sub(r"\s+", " ", (topic.summary or "")).strip()
if source and len(source) >= 40:
return source
if summary:
return summary
return fallback
def _trim_visible_copy(text: str, floor: int = 120, ceiling: int = 320) -> str:
    """Collapse whitespace and shorten the text to ~80% of its length,
    bounded by *floor* and *ceiling*; empty input yields ''. """
    collapsed = re.sub(r"\s+", " ", text).strip()
    if not collapsed:
        return ""
    budget = preserve_80_percent(collapsed, floor=floor, ceiling=ceiling)
    return compact_text(collapsed, budget)
def _extract_sentence(text: str, keyword: str, fallback: str) -> str:
normalized = re.sub(r"\s+", " ", text).strip()
if not normalized:
return fallback
parts = re.split(r"(?<=[.!?])\s+", normalized)
for part in parts:
if keyword in part:
return part.strip()
return fallback
def _extract_multiple_sentences(text: str, keywords: list[str], fallback: str, limit: int = 2) -> str:
normalized = re.sub(r"\s+", " ", text).strip()
if not normalized:
return fallback
parts = [p.strip() for p in re.split(r"(?<=[.!?])\s+", normalized) if p.strip()]
selected: list[str] = []
for keyword in keywords:
for part in parts:
if keyword in part and part not in selected:
selected.append(part)
break
if len(selected) >= limit:
break
if selected:
return " ".join(selected[:limit])
return fallback
def _plain_text(value: str) -> str:
text = value or ''
text = re.sub(r'
', '\n', text, flags=re.I)
text = re.sub(r'<[^>]+>', ' ', text)
text = text.replace('**', '').replace('*', ' ')
text = text.replace('<', '<').replace('>', '>').replace('&', '&')
text = re.sub(r'!\[[^\]]*\]\([^\)]*\)', ' ', text)
text = re.sub(r'\[[^\]]+\]\([^\)]*\)', ' ', text)
text = re.sub(r'\s+', ' ', text).strip()
return text
def _bulletish_lines(text: str, limit: int = 6) -> list[str]:
normalized = re.sub(r"\s+", " ", text or "").strip()
if not normalized:
return []
parts = re.split(r"(?:•|\*\*[^*]+\*\*:?|\s+-\s+|\.\s+)", normalized)
cleaned = []
for part in parts:
item = re.sub(r"\s+", " ", part).strip(" -•")
if not item:
continue
if len(item) < 6:
continue
cleaned.append(item)
if cleaned:
return cleaned[:limit]
sentences = [s.strip() for s in re.split(r"(?<=[.!?])\s+", normalized) if s.strip()]
return sentences[:limit]
def _markdown_section(text: str, start_marker: str, end_marker: str | None = None) -> str:
start = text.find(start_marker)
if start == -1:
return ''
chunk = text[start + len(start_marker):]
if end_marker:
end = chunk.find(end_marker)
if end != -1:
chunk = chunk[:end]
return chunk.strip()
def _content_after_frontmatter(raw: str) -> str:
if raw.startswith('---'):
parts = raw.split('---', 2)
if len(parts) == 3:
return parts[2].strip()
return raw
def _content_after_frontmatter(raw: str) -> str:
if raw.startswith('---'):
parts = raw.split('---', 2)
if len(parts) == 3:
return parts[2].strip()
return raw
def _problem_bullets_from_raw(raw: str) -> list[str]:
    """Collect top-level '* ' bullets that appear before the first '---'
    section separator, reduced to plain text.

    Bold lead-ins ('* **...') are deliberately skipped; empty results of
    the plain-text conversion are dropped.

    Fix: the separator literal contained a raw newline inside the string
    (a syntax error from a formatting mishap); restored to the intended
    '\\n---' section-separator split.
    """
    content = _content_after_frontmatter(raw)
    # Only look at the part of the document before the first '---' rule.
    before_sep = content.split('\n---', 1)[0]
    bullets = []
    for line in before_sep.splitlines():
        stripped = line.strip()
        if stripped.startswith('* ') and not stripped.startswith('* **'):
            bullets.append(_plain_text(stripped[2:]))
    return [b for b in bullets if b]
def _details_blocks(raw: str) -> list[str]:
return re.findall(r'