"""Phase S 테스트: 각 스텝별 결과물을 폴더에 정리. 각 스텝을 순서대로 실행하고, 중간 산출물을 JSON + PNG로 저장. step1/ step2/ step3/ step4/ 폴더로 분리. 사용법: python scripts/test_phase_s.py [run_id] """ from __future__ import annotations import asyncio, json, sys, time, datetime, base64 from pathlib import Path ROOT = Path(__file__).parent.parent sys.path.insert(0, str(ROOT)) async def main(run_id: str): from src.html_generator import generate_slide_html from src.html_validator import validate_and_clean_html from src.content_verifier import generate_with_retry, verify_all_areas from src.renderer import render_slide_from_html from src.slide_measurer import measure_rendered_heights, capture_slide_screenshot from src.design_director import select_preset, LAYOUT_PRESETS from src.space_allocator import calculate_container_specs from src.image_utils import get_image_sizes run_dir = ROOT / "data" / "runs" / run_id timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") out_dir = ROOT / "data" / "runs" / f"{run_id}_phaseS_{timestamp}" out_dir.mkdir(parents=True, exist_ok=True) print(f"[Phase S 테스트]") print(f" 입력: {run_dir}") print(f" 출력: {out_dir}") print() # ── Step 1 결과 로딩 (기존 Kei 분석 재사용) ── step1_dir = out_dir / "step1_kei_analysis" step1_dir.mkdir(exist_ok=True) analysis = json.loads((run_dir / "step1_analysis.json").read_text(encoding="utf-8")) concepts = json.loads((run_dir / "step1b_concepts.json").read_text(encoding="utf-8")) # concepts 병합 concept_map = {c["id"]: c for c in concepts.get("concepts", [])} for topic in analysis.get("topics", []): tid = topic["id"] if tid in concept_map: topic["relation_type"] = concept_map[tid].get("relation_type", "none") topic["expression_hint"] = concept_map[tid].get("expression_hint", "") topic["source_data"] = concept_map[tid].get("source_data", "") _save(step1_dir, "analysis.json", analysis) _save(step1_dir, "concepts.json", concepts) topics = analysis["topics"] t0 = time.time() print(f"[Step 1] Kei 분석 결과 로딩 완료") print(f" 제목: {analysis.get('title', '')}") print(f" 핵심 메시지: {analysis.get('core_message', '')}") print(f" topics: {len(topics)}개") for t in topics: print(f" t{t['id']}: {t['title']} ({t.get('purpose', '')} / {t.get('relation_type', '')})") # 원본 콘텐츠 content = """# 건설산업 DX의 올바른 이해 ## 용어의 혼용 건설산업에서 DX(Digital Transformation)와 BIM(Building Information Modeling)이 동일 개념으로 인식되고 있다. 실질적으로 DX는 산업 전반의 프로세스를 혁신하는 상위개념이며, BIM은 3차원 모델 기반의 정보 관리 도구로서 DX의 하위 기술에 해당한다. 그러나 현장에서는 BIM 도입만으로 DX가 완성된 것으로 오인하는 사례가 빈번하다. ## 혼용 대표 사례 1. 스마트 건설 활성화 방안(2022.07): 추진과제를 건설산업 디지털화로 명시하면서 실행과제는 BIM 전면 도입, BIM 전문인력 양성에 국한 2. 제7차 건설기술진흥 기본계획(2023.12): 추진방향을 디지털 전환을 통한 스마트 건설 확산으로 제시하면서 추진과제는 BIM 도입으로 건설산업 디지털화로 한정 ## DX와 핵심기술의 올바른 관계 DX는 BIM, GIS, 디지털 트윈 등 핵심기술의 융합을 통해서만 실현 가능한 상위개념이다. - GIS: 지리적 데이터를 공간 분석하여 시각적으로 표현, 위치기반 정보 제공 - BIM: 시설물의 생애주기 동안 발생한 모든 정보를 3차원 모델 기반으로 통합·관리하는 정보 관리 도구 - 디지털 트윈: 현실 세계의 물리적 객체나 시스템을 디지털 환경에 동일하게 구현하는 기술 DX는 이들 기술을 통합하여 업무방식과 가치 창출 구조를 근본적으로 전환하는 과정 및 결과이다. ## 용어별 정의 - 건설산업: 부동산 개발, 설계, 시공, 유지보수를 포괄하는 종합산업으로, 광범위한 기술을 통합·융합하여 인프라를 만드는 산업 - BIM(Building Information Modeling): 형상정보와 속성정보가 포함된 3D 모델로, 시설물의 생애주기 동안 발생한 모든 정보를 3차원 모델 기반으로 통합·관리하는 정보 관리 도구. 건설 정보와 절차를 표준화된 방식으로 연계하고 디지털 협업이 가능하도록 하는 핵심 인프라 기술 - DX(Digital Transformation): 디지털 기술을 활용하여 업무방식과 가치 창출 구조를 전환하는 과정 및 결과. 단순한 기술 도입이 아닌, 고객 가치와 의사결정 방식의 근본적인 변화로 산업의 새로운 방향을 정립 ## 핵심 요약 BIM은 건설산업의 디지털전환(DX)을 수행하는 과정에서 가장 기초가 되는 일부분이다. 각 용어의 정의, 역할, 상호관계에 대한 체계적 정립이 필요하다. """ # ── Step 1.5: 컨테이너 계산 ── step1c_dir = out_dir / "step1c_containers" step1c_dir.mkdir(exist_ok=True) preset_name = select_preset(analysis) preset = LAYOUT_PRESETS[preset_name] container_specs = calculate_container_specs( analysis.get("page_structure", {}), topics, preset ) container_info = { role: {"height_px": s.height_px, "width_px": s.width_px, "topic_ids": s.topic_ids, "weight": s.weight} for role, s in container_specs.items() } _save(step1c_dir, "containers.json", container_info) _save(step1c_dir, "preset.json", {"name": preset_name, "grid_areas": preset.get("grid_areas", ""), "grid_columns": preset.get("grid_columns", "")}) print(f"\n[Step 1.5] 컨테이너 계산 완료 ({time.time()-t0:.0f}s)") for role, spec in container_specs.items(): print(f" {role}: {spec.height_px}px × {spec.width_px}px, topics={spec.topic_ids}") # 이미지 정보 # dx1.png를 본심(topic 3)에 사용 dx1_path = Path("D:/ad-hoc/cel/public/assets/images/dx1.png") slide_images = [] if dx1_path.exists(): from PIL import Image as PILImage img = PILImage.open(dx1_path) img_b64 = base64.b64encode(dx1_path.read_bytes()).decode() slide_images.append({ "path": str(dx1_path), "width": img.width, "height": img.height, "ratio": round(img.width / max(1, img.height), 2), "topic_id": 3, "b64": img_b64, }) print(f"\n 이미지: dx1.png ({img.width}×{img.height}px, topic 3)") # ══════════════════════════════════════ # ★ Step 2: Claude Sonnet HTML 생성 # ══════════════════════════════════════ step2_dir = out_dir / "step2_html_generation" step2_dir.mkdir(exist_ok=True) print(f"\n[Step 2] Claude Sonnet HTML 생성 + 검증 루프... ({time.time()-t0:.0f}s)") generated, verification = await generate_with_retry( content=content, analysis=analysis, container_specs=container_specs, preset=preset, images=slide_images, max_retries=2, ) _save(step2_dir, "generated_meta.json", { "body_html_length": len(generated.get("body_html", "")), "sidebar_html_length": len(generated.get("sidebar_html", "")), "footer_html_length": len(generated.get("footer_html", "")), "reasoning": generated.get("reasoning", ""), }) _save(step2_dir, "body.html", generated.get("body_html", "")) _save(step2_dir, "sidebar.html", generated.get("sidebar_html", "")) _save(step2_dir, "footer.html", generated.get("footer_html", "")) # 검증 결과 저장 step2b_dir = out_dir / "step2b_verification" step2b_dir.mkdir(exist_ok=True) for area_name, result in verification.items(): _save(step2b_dir, f"{area_name}.json", { "passed": result.passed, "score": result.score, "checks": result.checks, "errors": result.errors, "warnings": result.warnings, }) status = "✅ PASS" if result.passed else f"❌ FAIL" print(f" 검증 {area_name}: {status} (score={result.score:.0%}, errors={len(result.errors)})") for err in result.errors: print(f" ⚠ {err}") print(f" 완료 ({time.time()-t0:.0f}s)") print(f" body: {len(generated.get('body_html', ''))}자") print(f" sidebar: {len(generated.get('sidebar_html', ''))}자") print(f" footer: {len(generated.get('footer_html', ''))}자") # 각 영역별 개별 PNG 생성 for area_name, area_html in [("body", generated.get("body_html", "")), ("sidebar", generated.get("sidebar_html", "")), ("footer", generated.get("footer_html", ""))]: if not area_html: continue width = 767 if area_name == "body" else 380 if area_name == "sidebar" else 1088 area_wrapped = _wrap_area(area_html, width) s = await asyncio.to_thread(capture_slide_screenshot, area_wrapped) if s: (step2_dir / f"{area_name}.png").write_bytes(base64.b64decode(s)) print(f" {area_name}.png 저장 완료") # ══════════════════════════════════════ # ★ Step 3: 슬라이드 조립 + 렌더링 # ══════════════════════════════════════ step3_dir = out_dir / "step3_slide" step3_dir.mkdir(exist_ok=True) print(f"\n[Step 3] 슬라이드 조립 + 렌더링... ({time.time()-t0:.0f}s)") html = render_slide_from_html(generated, analysis, preset) _save(step3_dir, "slide.html", html) _save(out_dir, "final.html", html) s = await asyncio.to_thread(capture_slide_screenshot, html) if s: (step3_dir / "slide.png").write_bytes(base64.b64decode(s)) (out_dir / "screenshot.png").write_bytes(base64.b64decode(s)) print(f" slide.png 저장 완료") # ══════════════════════════════════════ # ★ Step 4: 측정 + 품질 검증 # ══════════════════════════════════════ step4_dir = out_dir / "step4_verification" step4_dir.mkdir(exist_ok=True) print(f"\n[Step 4] Selenium 측정... ({time.time()-t0:.0f}s)") measurement = await asyncio.to_thread(measure_rendered_heights, html) _save(step4_dir, "measurement.json", measurement) slide_data = measurement.get("slide", {}) print(f" slide: {slide_data.get('scrollHeight', 0)}px / 720px {'✅' if not slide_data.get('overflowed') else '❌'}") for zone_name, zone_data in measurement.get("zones", {}).items(): status = "✅" if not zone_data.get("overflowed") else f"❌ +{zone_data.get('excess_px', 0)}px" print(f" {zone_name}: {zone_data.get('scrollHeight', 0)}px / {zone_data.get('clientHeight', 0)}px {status}") total = time.time() - t0 print(f"\n{'='*60}") print(f"Phase S 테스트 완료: {total:.0f}초") print(f" 블록 선택: 없음") print(f" 슬롯 채우기: 없음") print(f" HTML 생성: Claude Sonnet 직접 생성") print(f" 결과: {out_dir}") print(f"") print(f" 폴더 구조:") print(f" step1_kei_analysis/ — Kei 분석 결과") print(f" step1c_containers/ — 컨테이너 계산") print(f" step2_html_generation/ — Claude 생성 HTML (영역별 JSON + PNG)") print(f" step3_slide/ — 조립된 슬라이드 (HTML + PNG)") print(f" step4_verification/ — Selenium 측정") print(f" final.html — 최종 결과물") print(f" screenshot.png — 최종 스크린샷") print(f"{'='*60}") def _wrap_area(inner_html: str, width: int) -> str: return f"""
""" def _save(out_dir, name, data): path = out_dir / name if isinstance(data, str): path.write_text(data, encoding="utf-8") else: path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") if __name__ == "__main__": import logging logging.basicConfig( level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s", datefmt="%H:%M:%S", ) logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("httpcore").setLevel(logging.WARNING) logging.getLogger("selenium").setLevel(logging.WARNING) logging.getLogger("urllib3").setLevel(logging.WARNING) run_id = sys.argv[1] if len(sys.argv) > 1 else "1774736083771" asyncio.run(main(run_id))