Files
C.E.L_Slide_test2/scripts/test_phase_q.py
kyeongmin 29f56187c0 Phase P~S 전체 작업물: 검증 스크립트, 블록 템플릿, 설계 문서, 코드 수정
포함 내용:
- Phase P/Q/R/S 설계 문서 (IMPROVEMENT-PHASE-*.md)
- 영역별 검증 스크립트 (scripts/verify_*.py, test_*.py)
- 블록 템플릿 추가 (cards, emphasis 변형)
- 코드 수정: block_search, content_editor, design_director, slide_measurer
- catalog.yaml 블록 목록 업데이트
- CLAUDE.md, PROGRESS.md, README.md 업데이트

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 08:38:06 +09:00

347 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Phase Q 단독 테스트 스크립트.
기존 run의 step1 결과물(analysis, concepts)을 재사용하여
블록 선택 → 콘텐츠 채우기 → 렌더링만 실행한다.
Kei 분석(~13분)을 건너뛰고 Phase Q 로직만 검증.
사용법:
python scripts/test_phase_q.py [run_id]
python scripts/test_phase_q.py 1774736083771
"""
from __future__ import annotations
import asyncio
import json
import sys
import time
from pathlib import Path
# 프로젝트 루트를 sys.path에 추가
ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(ROOT))
async def run_phase_q_test(run_id: str):
"""기존 run의 step1 결과를 사용하여 Phase Q만 실행."""
from src.block_selector import select_block_candidates, select_fallback_candidates, load_catalog
from src.space_allocator import (
calculate_container_specs, finalize_block_specs, find_container_for_topic,
calculate_char_budget, calculate_budgets_for_candidates,
)
from src.design_director import select_preset, LAYOUT_PRESETS
from src.renderer import render_slide
from src.slide_measurer import measure_rendered_heights, capture_slide_screenshot
run_dir = ROOT / "data" / "runs" / run_id
# 매 실행마다 새 폴더 생성 (타임스탬프)
import datetime
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
out_dir = ROOT / "data" / "runs" / f"{run_id}_q_{timestamp}"
out_dir.mkdir(parents=True, exist_ok=True)
print(f"[Phase Q 테스트] run={run_id}")
print(f" 입력: {run_dir}")
print(f" 출력: {out_dir}")
print()
# ── Step 1 결과 로딩 (기존 것 재사용) ──
analysis = json.loads((run_dir / "step1_analysis.json").read_text(encoding="utf-8"))
concepts = json.loads((run_dir / "step1b_concepts.json").read_text(encoding="utf-8"))
# concepts에서 relation_type을 analysis topics에 병합
concept_map = {c["id"]: c for c in concepts.get("concepts", [])}
for topic in analysis.get("topics", []):
tid = topic["id"]
if tid in concept_map:
topic["relation_type"] = concept_map[tid].get("relation_type", "none")
topic["expression_hint"] = concept_map[tid].get("expression_hint", "")
topic["source_data"] = concept_map[tid].get("source_data", "")
# 원본 콘텐츠 (step1에 저장 안 되어 있으면 직접 입력)
content_file = run_dir / "input_content.txt"
if content_file.exists():
content = content_file.read_text(encoding="utf-8")
else:
content = """# 건설산업 DX의 올바른 이해
## 용어의 혼용
건설산업에서 DX(Digital Transformation)와 BIM(Building Information Modeling)이 동일 개념으로 인식되고 있다.
실질적으로 DX는 산업 전반의 프로세스를 혁신하는 상위개념이며, BIM은 3차원 모델 기반의 정보 관리 도구로서 DX의 하위 기술에 해당한다.
## 혼용 대표 사례
1. 스마트 건설 활성화 방안(2022.07): 추진과제를 건설산업 디지털화로 명시하면서 실행과제는 BIM 전면 도입에 국한
2. 제7차 건설기술진흥 기본계획(2023.12): 추진방향을 디지털 전환으로 제시하면서 추진과제는 BIM 도입으로 한정
## DX와 핵심기술의 올바른 관계
DX는 BIM, GIS, 디지털 트윈 등 핵심기술의 융합을 통해서만 실현 가능한 상위개념이다.
- GIS: 지리적 데이터를 공간 분석하여 시각적으로 표현
- BIM: 시설물 생애주기 정보를 3차원 모델로 통합 관리
- 디지털 트윈: 현실 객체를 디지털로 동일하게 구현
## 용어별 정의
- 건설산업: 광범위한 기술을 통합 융합하여 만드는 종합산업
- BIM: 3차원 모델 기반으로 통합 관리하는 정보 관리 도구
- DX: 업무방식과 가치 창출 구조를 전환하는 과정 및 결과
## 핵심 요약
BIM은 DX의 기초가 되는 일부분이다. 각 용어의 정의와 상호관계에 대한 체계적 정립이 필요하다.
"""
topics = analysis.get("topics", [])
page_structure = analysis.get("page_structure", {})
print(f" topics: {len(topics)}")
for t in topics:
print(f" t{t['id']}: {t['title']} (relation={t.get('relation_type', '?')}, purpose={t.get('purpose', '?')})")
print()
# ── 컨테이너 계산 ──
t0 = time.time()
preset_name = select_preset(analysis)
preset = LAYOUT_PRESETS.get(preset_name, {})
container_specs = calculate_container_specs(page_structure, topics, preset)
print(f"[{time.time()-t0:.1f}s] 컨테이너 계산 완료:")
for role, spec in container_specs.items():
print(f" {role}: {spec.height_px}px × {spec.width_px}px, topics={spec.topic_ids}")
_save(out_dir, "step1c_containers.json", {
role: {"height_px": s.height_px, "width_px": s.width_px, "topic_ids": s.topic_ids,
"max_height_cost": s.max_height_cost, "weight": s.weight}
for role, s in container_specs.items()
})
# ── Q-2: 블록 후보 필터링 (결정론적) ──
catalog = load_catalog()
used_blocks: set[str] = set()
candidates_per_topic: dict[int, list[dict]] = {}
budgets_per_topic: dict[int, dict[str, dict]] = {}
print(f"\n[{time.time()-t0:.1f}s] Q-2: 블록 후보 필터링")
for topic in topics:
tid = topic["id"]
spec = find_container_for_topic(tid, container_specs)
if not spec:
print(f" t{tid}: 컨테이너 없음!")
continue
candidates = select_block_candidates(topic, spec, used_blocks, catalog)
if not candidates:
candidates = select_fallback_candidates(spec, used_blocks, catalog)
print(f" t{tid}: fallback → {len(candidates)}")
candidates_per_topic[tid] = candidates
budgets_per_topic[tid] = calculate_budgets_for_candidates(candidates, spec)
per_topic_px = spec.height_px // max(1, len(spec.topic_ids))
print(f" t{tid} ({topic.get('relation_type', '?')}, {per_topic_px}px): "
f"{len(candidates)}개 → [{', '.join(c['id'] for c in candidates[:5])}]")
_save(out_dir, "step2_candidates.json", {
str(tid): [{"id": c["id"], "category": c.get("category")} for c in cs[:5]]
for tid, cs in candidates_per_topic.items()
})
# ── Q-4: Kei 블록 선택 (AI 1회) ──
print(f"\n[{time.time()-t0:.1f}s] Q-4: Kei 블록 선택 중... (AI 호출)")
from src.kei_client import select_block_for_topics
selections = None
for attempt in range(5):
selections = await select_block_for_topics(
topics, candidates_per_topic, budgets_per_topic,
container_specs, analysis
)
if selections:
break
print(f" 재시도 {attempt + 1}/5...")
await asyncio.sleep(10)
if not selections:
print(" ❌ Kei 블록 선택 실패")
return
print(f"[{time.time()-t0:.1f}s] 블록 선택 완료:")
selected_blocks: dict[int, dict] = {}
for topic in topics:
tid = topic["id"]
sel = selections.get(tid, {})
block_id = sel.get("block_id", "")
spec = find_container_for_topic(tid, container_specs)
if not block_id and candidates_per_topic.get(tid):
block_id = candidates_per_topic[tid][0]["id"]
used_blocks.add(block_id)
budget = budgets_per_topic.get(tid, {}).get(block_id, {})
variant = sel.get("variant", "default")
block = {
"type": block_id,
"_variant": variant,
"topic_id": tid,
"area": spec.zone if spec else "body",
"purpose": topic.get("purpose", ""),
"_char_budget": budget,
}
finalize_block_specs([block], container_specs)
selected_blocks[tid] = block
variant_label = f" [{variant}]" if variant != "default" else ""
print(f" t{tid}: {block_id}{variant_label} (예산: {budget.get('total_chars', '?')}자) — {sel.get('reason', '')[:50]}")
_save(out_dir, "step2_selection.json", {
str(tid): {"type": b["type"], "variant": b.get("_variant", "default"),
"area": b["area"], "budget": b.get("_char_budget", {}),
"reason": selections.get(tid, {}).get("reason", "")}
for tid, b in selected_blocks.items()
})
# ── layout_concept 조립 ──
final_blocks = []
# sidebar label
sidebar_tids = [tid for tid, b in selected_blocks.items() if b.get("area") == "sidebar"]
if sidebar_tids:
first_topic = next((t for t in topics if t["id"] == sidebar_tids[0]), {})
section_title = first_topic.get("section_title", "")
if not section_title:
purpose = first_topic.get("purpose", "")
section_title = {"용어정의": "용어 정의", "근거사례": "참고 자료"}.get(purpose, "")
if section_title:
final_blocks.append({
"area": "sidebar", "type": "divider-text",
"topic_id": None, "purpose": "_label",
"data": {"text": section_title}, "size": "compact",
})
role_order = ["배경", "본심", "첨부", "결론"]
for role in role_order:
spec = container_specs.get(role)
if not spec:
continue
for tid in spec.topic_ids:
block = selected_blocks.get(tid)
if block:
final_blocks.append(block)
layout_concept = {
"title": analysis.get("title", "슬라이드"),
"_container_specs": container_specs,
"pages": [{
"grid_areas": preset["grid_areas"],
"grid_columns": preset["grid_columns"],
"grid_rows": preset["grid_rows"],
"blocks": final_blocks,
}],
}
print(f"\n[{time.time()-t0:.1f}s] 레이아웃 조립: {len(final_blocks)}개 블록")
# ── Step 3: topic별 개별 호출 (Phase P fill_candidates 방식 복원) ──
print(f"[{time.time()-t0:.1f}s] Step 3: Kei 편집자 텍스트 채우기 중 (topic별 개별)...")
from src.content_editor import fill_candidates
for topic in topics:
tid = topic["id"]
block = selected_blocks.get(tid)
if not block:
continue
await fill_candidates(content, topic, [block], analysis)
has_data = bool(block.get("data"))
char_count = len(json.dumps(block.get("data", {}), ensure_ascii=False)) if has_data else 0
print(f" t{tid}: {block['type']}{'' if has_data else ''} ({char_count}자)")
blocks_with_data = [b for b in final_blocks if b.get("data") and b.get("topic_id") is not None]
blocks_without_data = [b for b in final_blocks if not b.get("data") and b.get("topic_id") is not None]
print(f"[{time.time()-t0:.1f}s] 텍스트 채우기 완료:")
print(f" 데이터 있음: {len(blocks_with_data)}개 — {[b['type'] for b in blocks_with_data]}")
if blocks_without_data:
print(f" 데이터 없음: {len(blocks_without_data)}개 — {[b['type'] for b in blocks_without_data]}")
_save(out_dir, "step3_fill_content.json", {
"filled": len(blocks_with_data),
"empty": len(blocks_without_data),
"blocks": [
{"type": b["type"], "topic_id": b.get("topic_id"),
"has_data": bool(b.get("data")),
"data_preview": str(b.get("data", {}))[:100]}
for b in final_blocks if b.get("topic_id") is not None
]
})
# ── Step 4: CSS 조정 + 렌더링 ──
print(f"\n[{time.time()-t0:.1f}s] Step 4: CSS 조정 + 렌더링...")
from src.pipeline import _adjust_design
layout_concept = await _adjust_design(layout_concept, analysis)
html = render_slide(layout_concept)
_save(out_dir, "step4_rendered.html", html)
print(f"[{time.time()-t0:.1f}s] HTML 생성: {len(html)}")
# ── 측정 ──
print(f"[{time.time()-t0:.1f}s] Selenium 측정 중...")
measurement = await asyncio.to_thread(measure_rendered_heights, html)
_save(out_dir, "step4_measurement.json", measurement)
has_overflow = False
for name, data in measurement.get("containers", {}).items():
status = "" if not data.get("overflowed") else ""
print(f" {name}: {data.get('scrollHeight', 0)}px / {data.get('allocatedHeight', 0)}px {status}")
if data.get("overflowed"):
has_overflow = True
slide_data = measurement.get("slide", {})
slide_status = "" if not slide_data.get("overflowed") else ""
print(f" slide: {slide_data.get('scrollHeight', 0)}px / 720px {slide_status}")
# ── 스크린샷 ──
screenshot_b64 = await asyncio.to_thread(capture_slide_screenshot, html)
if screenshot_b64:
import base64
png_path = out_dir / "screenshot.png"
png_path.write_bytes(base64.b64decode(screenshot_b64))
print(f"\n[{time.time()-t0:.1f}s] 스크린샷 저장: {png_path}")
# ── final.html 저장 ──
_save(out_dir, "final.html", html)
# ── 결과 요약 ──
total = time.time() - t0
print(f"\n{'='*50}")
print(f"Phase Q 테스트 완료: {total:.1f}")
print(f" 블록 다양성: {len(set(b['type'] for b in final_blocks))}종류")
print(f" 데이터 채움: {len(blocks_with_data)}/{len([b for b in final_blocks if b.get('topic_id') is not None])}")
print(f" overflow: {'없음 ✅' if not has_overflow else '있음 ❌'}")
print(f" 출력: {out_dir}")
print(f"{'='*50}")
def _save(out_dir: Path, filename: str, data):
path = out_dir / filename
if isinstance(data, str):
path.write_text(data, encoding="utf-8")
else:
path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
if __name__ == "__main__":
import logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(name)s %(levelname)s %(message)s",
datefmt="%H:%M:%S",
)
# 너무 시끄러운 로거 조용히
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logging.getLogger("selenium").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
run_id = sys.argv[1] if len(sys.argv) > 1 else "1774736083771"
asyncio.run(run_phase_q_test(run_id))