"""Phase Q 단독 테스트 스크립트. 기존 run의 step1 결과물(analysis, concepts)을 재사용하여 블록 선택 → 콘텐츠 채우기 → 렌더링만 실행한다. Kei 분석(~13분)을 건너뛰고 Phase Q 로직만 검증. 사용법: python scripts/test_phase_q.py [run_id] python scripts/test_phase_q.py 1774736083771 """ from __future__ import annotations import asyncio import json import sys import time from pathlib import Path # 프로젝트 루트를 sys.path에 추가 ROOT = Path(__file__).parent.parent sys.path.insert(0, str(ROOT)) async def run_phase_q_test(run_id: str): """기존 run의 step1 결과를 사용하여 Phase Q만 실행.""" from src.block_selector import select_block_candidates, select_fallback_candidates, load_catalog from src.space_allocator import ( calculate_container_specs, finalize_block_specs, find_container_for_topic, calculate_char_budget, calculate_budgets_for_candidates, ) from src.design_director import select_preset, LAYOUT_PRESETS from src.renderer import render_slide from src.slide_measurer import measure_rendered_heights, capture_slide_screenshot run_dir = ROOT / "data" / "runs" / run_id # 매 실행마다 새 폴더 생성 (타임스탬프) import datetime timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") out_dir = ROOT / "data" / "runs" / f"{run_id}_q_{timestamp}" out_dir.mkdir(parents=True, exist_ok=True) print(f"[Phase Q 테스트] run={run_id}") print(f" 입력: {run_dir}") print(f" 출력: {out_dir}") print() # ── Step 1 결과 로딩 (기존 것 재사용) ── analysis = json.loads((run_dir / "step1_analysis.json").read_text(encoding="utf-8")) concepts = json.loads((run_dir / "step1b_concepts.json").read_text(encoding="utf-8")) # concepts에서 relation_type을 analysis topics에 병합 concept_map = {c["id"]: c for c in concepts.get("concepts", [])} for topic in analysis.get("topics", []): tid = topic["id"] if tid in concept_map: topic["relation_type"] = concept_map[tid].get("relation_type", "none") topic["expression_hint"] = concept_map[tid].get("expression_hint", "") topic["source_data"] = concept_map[tid].get("source_data", "") # 원본 콘텐츠 (step1에 저장 안 되어 있으면 직접 입력) content_file = run_dir / "input_content.txt" if content_file.exists(): content = content_file.read_text(encoding="utf-8") else: content = """# 건설산업 DX의 올바른 이해 ## 용어의 혼용 건설산업에서 DX(Digital Transformation)와 BIM(Building Information Modeling)이 동일 개념으로 인식되고 있다. 실질적으로 DX는 산업 전반의 프로세스를 혁신하는 상위개념이며, BIM은 3차원 모델 기반의 정보 관리 도구로서 DX의 하위 기술에 해당한다. ## 혼용 대표 사례 1. 스마트 건설 활성화 방안(2022.07): 추진과제를 건설산업 디지털화로 명시하면서 실행과제는 BIM 전면 도입에 국한 2. 제7차 건설기술진흥 기본계획(2023.12): 추진방향을 디지털 전환으로 제시하면서 추진과제는 BIM 도입으로 한정 ## DX와 핵심기술의 올바른 관계 DX는 BIM, GIS, 디지털 트윈 등 핵심기술의 융합을 통해서만 실현 가능한 상위개념이다. - GIS: 지리적 데이터를 공간 분석하여 시각적으로 표현 - BIM: 시설물 생애주기 정보를 3차원 모델로 통합 관리 - 디지털 트윈: 현실 객체를 디지털로 동일하게 구현 ## 용어별 정의 - 건설산업: 광범위한 기술을 통합 융합하여 만드는 종합산업 - BIM: 3차원 모델 기반으로 통합 관리하는 정보 관리 도구 - DX: 업무방식과 가치 창출 구조를 전환하는 과정 및 결과 ## 핵심 요약 BIM은 DX의 기초가 되는 일부분이다. 각 용어의 정의와 상호관계에 대한 체계적 정립이 필요하다. """ topics = analysis.get("topics", []) page_structure = analysis.get("page_structure", {}) print(f" topics: {len(topics)}개") for t in topics: print(f" t{t['id']}: {t['title']} (relation={t.get('relation_type', '?')}, purpose={t.get('purpose', '?')})") print() # ── 컨테이너 계산 ── t0 = time.time() preset_name = select_preset(analysis) preset = LAYOUT_PRESETS.get(preset_name, {}) container_specs = calculate_container_specs(page_structure, topics, preset) print(f"[{time.time()-t0:.1f}s] 컨테이너 계산 완료:") for role, spec in container_specs.items(): print(f" {role}: {spec.height_px}px × {spec.width_px}px, topics={spec.topic_ids}") _save(out_dir, "step1c_containers.json", { role: {"height_px": s.height_px, "width_px": s.width_px, "topic_ids": s.topic_ids, "max_height_cost": s.max_height_cost, "weight": s.weight} for role, s in container_specs.items() }) # ── Q-2: 블록 후보 필터링 (결정론적) ── catalog = load_catalog() used_blocks: set[str] = set() candidates_per_topic: dict[int, list[dict]] = {} budgets_per_topic: dict[int, dict[str, dict]] = {} print(f"\n[{time.time()-t0:.1f}s] Q-2: 블록 후보 필터링") for topic in topics: tid = topic["id"] spec = find_container_for_topic(tid, container_specs) if not spec: print(f" t{tid}: 컨테이너 없음!") continue candidates = select_block_candidates(topic, spec, used_blocks, catalog) if not candidates: candidates = select_fallback_candidates(spec, used_blocks, catalog) print(f" t{tid}: fallback → {len(candidates)}개") candidates_per_topic[tid] = candidates budgets_per_topic[tid] = calculate_budgets_for_candidates(candidates, spec) per_topic_px = spec.height_px // max(1, len(spec.topic_ids)) print(f" t{tid} ({topic.get('relation_type', '?')}, {per_topic_px}px): " f"{len(candidates)}개 → [{', '.join(c['id'] for c in candidates[:5])}]") _save(out_dir, "step2_candidates.json", { str(tid): [{"id": c["id"], "category": c.get("category")} for c in cs[:5]] for tid, cs in candidates_per_topic.items() }) # ── Q-4: Kei 블록 선택 (AI 1회) ── print(f"\n[{time.time()-t0:.1f}s] Q-4: Kei 블록 선택 중... (AI 호출)") from src.kei_client import select_block_for_topics selections = None for attempt in range(5): selections = await select_block_for_topics( topics, candidates_per_topic, budgets_per_topic, container_specs, analysis ) if selections: break print(f" 재시도 {attempt + 1}/5...") await asyncio.sleep(10) if not selections: print(" ❌ Kei 블록 선택 실패") return print(f"[{time.time()-t0:.1f}s] 블록 선택 완료:") selected_blocks: dict[int, dict] = {} for topic in topics: tid = topic["id"] sel = selections.get(tid, {}) block_id = sel.get("block_id", "") spec = find_container_for_topic(tid, container_specs) if not block_id and candidates_per_topic.get(tid): block_id = candidates_per_topic[tid][0]["id"] used_blocks.add(block_id) budget = budgets_per_topic.get(tid, {}).get(block_id, {}) variant = sel.get("variant", "default") block = { "type": block_id, "_variant": variant, "topic_id": tid, "area": spec.zone if spec else "body", "purpose": topic.get("purpose", ""), "_char_budget": budget, } finalize_block_specs([block], container_specs) selected_blocks[tid] = block variant_label = f" [{variant}]" if variant != "default" else "" print(f" t{tid}: {block_id}{variant_label} (예산: {budget.get('total_chars', '?')}자) — {sel.get('reason', '')[:50]}") _save(out_dir, "step2_selection.json", { str(tid): {"type": b["type"], "variant": b.get("_variant", "default"), "area": b["area"], "budget": b.get("_char_budget", {}), "reason": selections.get(tid, {}).get("reason", "")} for tid, b in selected_blocks.items() }) # ── layout_concept 조립 ── final_blocks = [] # sidebar label sidebar_tids = [tid for tid, b in selected_blocks.items() if b.get("area") == "sidebar"] if sidebar_tids: first_topic = next((t for t in topics if t["id"] == sidebar_tids[0]), {}) section_title = first_topic.get("section_title", "") if not section_title: purpose = first_topic.get("purpose", "") section_title = {"용어정의": "용어 정의", "근거사례": "참고 자료"}.get(purpose, "") if section_title: final_blocks.append({ "area": "sidebar", "type": "divider-text", "topic_id": None, "purpose": "_label", "data": {"text": section_title}, "size": "compact", }) role_order = ["배경", "본심", "첨부", "결론"] for role in role_order: spec = container_specs.get(role) if not spec: continue for tid in spec.topic_ids: block = selected_blocks.get(tid) if block: final_blocks.append(block) layout_concept = { "title": analysis.get("title", "슬라이드"), "_container_specs": container_specs, "pages": [{ "grid_areas": preset["grid_areas"], "grid_columns": preset["grid_columns"], "grid_rows": preset["grid_rows"], "blocks": final_blocks, }], } print(f"\n[{time.time()-t0:.1f}s] 레이아웃 조립: {len(final_blocks)}개 블록") # ── Step 3: topic별 개별 호출 (Phase P fill_candidates 방식 복원) ── print(f"[{time.time()-t0:.1f}s] Step 3: Kei 편집자 텍스트 채우기 중 (topic별 개별)...") from src.content_editor import fill_candidates for topic in topics: tid = topic["id"] block = selected_blocks.get(tid) if not block: continue await fill_candidates(content, topic, [block], analysis) has_data = bool(block.get("data")) char_count = len(json.dumps(block.get("data", {}), ensure_ascii=False)) if has_data else 0 print(f" t{tid}: {block['type']} → {'✅' if has_data else '❌'} ({char_count}자)") blocks_with_data = [b for b in final_blocks if b.get("data") and b.get("topic_id") is not None] blocks_without_data = [b for b in final_blocks if not b.get("data") and b.get("topic_id") is not None] print(f"[{time.time()-t0:.1f}s] 텍스트 채우기 완료:") print(f" 데이터 있음: {len(blocks_with_data)}개 — {[b['type'] for b in blocks_with_data]}") if blocks_without_data: print(f" 데이터 없음: {len(blocks_without_data)}개 — {[b['type'] for b in blocks_without_data]}") _save(out_dir, "step3_fill_content.json", { "filled": len(blocks_with_data), "empty": len(blocks_without_data), "blocks": [ {"type": b["type"], "topic_id": b.get("topic_id"), "has_data": bool(b.get("data")), "data_preview": str(b.get("data", {}))[:100]} for b in final_blocks if b.get("topic_id") is not None ] }) # ── Step 4: CSS 조정 + 렌더링 ── print(f"\n[{time.time()-t0:.1f}s] Step 4: CSS 조정 + 렌더링...") from src.pipeline import _adjust_design layout_concept = await _adjust_design(layout_concept, analysis) html = render_slide(layout_concept) _save(out_dir, "step4_rendered.html", html) print(f"[{time.time()-t0:.1f}s] HTML 생성: {len(html)}자") # ── 측정 ── print(f"[{time.time()-t0:.1f}s] Selenium 측정 중...") measurement = await asyncio.to_thread(measure_rendered_heights, html) _save(out_dir, "step4_measurement.json", measurement) has_overflow = False for name, data in measurement.get("containers", {}).items(): status = "✅" if not data.get("overflowed") else "❌" print(f" {name}: {data.get('scrollHeight', 0)}px / {data.get('allocatedHeight', 0)}px {status}") if data.get("overflowed"): has_overflow = True slide_data = measurement.get("slide", {}) slide_status = "✅" if not slide_data.get("overflowed") else "❌" print(f" slide: {slide_data.get('scrollHeight', 0)}px / 720px {slide_status}") # ── 스크린샷 ── screenshot_b64 = await asyncio.to_thread(capture_slide_screenshot, html) if screenshot_b64: import base64 png_path = out_dir / "screenshot.png" png_path.write_bytes(base64.b64decode(screenshot_b64)) print(f"\n[{time.time()-t0:.1f}s] 스크린샷 저장: {png_path}") # ── final.html 저장 ── _save(out_dir, "final.html", html) # ── 결과 요약 ── total = time.time() - t0 print(f"\n{'='*50}") print(f"Phase Q 테스트 완료: {total:.1f}초") print(f" 블록 다양성: {len(set(b['type'] for b in final_blocks))}종류") print(f" 데이터 채움: {len(blocks_with_data)}/{len([b for b in final_blocks if b.get('topic_id') is not None])}개") print(f" overflow: {'없음 ✅' if not has_overflow else '있음 ❌'}") print(f" 출력: {out_dir}") print(f"{'='*50}") def _save(out_dir: Path, filename: str, data): path = out_dir / filename if isinstance(data, str): path.write_text(data, encoding="utf-8") else: path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") if __name__ == "__main__": import logging logging.basicConfig( level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s", datefmt="%H:%M:%S", ) # 너무 시끄러운 로거 조용히 logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("httpcore").setLevel(logging.WARNING) logging.getLogger("selenium").setLevel(logging.WARNING) logging.getLogger("urllib3").setLevel(logging.WARNING) run_id = sys.argv[1] if len(sys.argv) > 1 else "1774736083771" asyncio.run(run_phase_q_test(run_id))