Phase P~S 전체 작업물: 검증 스크립트, 블록 템플릿, 설계 문서, 코드 수정

포함 내용:
- Phase P/Q/R/S 설계 문서 (IMPROVEMENT-PHASE-*.md)
- 영역별 검증 스크립트 (scripts/verify_*.py, test_*.py)
- 블록 템플릿 추가 (cards, emphasis 변형)
- 코드 수정: block_search, content_editor, design_director, slide_measurer
- catalog.yaml 블록 목록 업데이트
- CLAUDE.md, PROGRESS.md, README.md 업데이트

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-31 08:38:06 +09:00
parent 0e4b8c091c
commit 29f56187c0
44 changed files with 9431 additions and 313 deletions

View File

@@ -175,6 +175,38 @@ def search_blocks_for_topics(
return _format_for_prompt(sorted_blocks)
def search_candidates_per_topic(
topics: list[dict],
top_k: int = 2,
) -> dict[int, list[dict]]:
"""Phase P: 각 topic별 FAISS 상위 후보를 반환한다.
Args:
topics: 1단계 꼭지 분석 결과
top_k: topic당 반환할 후보 수
Returns:
{topic_id: [블록 메타데이터 목록]} — 각 topic별 상위 top_k개
"""
if not _ensure_loaded():
return {}
result: dict[int, list[dict]] = {}
for topic in topics:
tid = topic.get("id")
if tid is None:
continue
query = _build_query(topic)
candidates = search_blocks(query, top_k=top_k + 2) # 여유분 확보 (중복 제거용)
result[tid] = candidates[:top_k]
logger.info(
f"[Phase P] topic별 FAISS 후보: "
+ ", ".join(f"t{tid}={[c['id'] for c in cs]}" for tid, cs in result.items())
)
return result
def _build_query(topic: dict) -> str:
"""꼭지 정보에서 검색 쿼리를 생성한다. (Phase M: 역할+관계+표현 추가)"""
parts = [

View File

@@ -28,18 +28,18 @@ EDITOR_PROMPT = """당신은 도메인 전문가이자 콘텐츠 편집자이다
## 핵심 원칙
- **원본 텍스트를 최대한 보존한다.** 슬라이드 공간에 맞게 약간만 축약한다.
- 의미를 바꾸거나 완전히 재작성하지 않는다.
- 팀장이 제시한 글자 가이드는 참고. 의미를 살리려면 가이드를 초과해도 된다.
- 글자수 예산(★ 표시)이 있으면 반드시 지킨다. 초과하면 overflow가 발생한다.
- 예산 내라면 원본을 최대한 보존. 예산 초과 시에만 뒤에서부터 축약.
- 디자인 실무자가 텍스트에 맞게 디자인을 조정할 것이므로, 텍스트를 억지로 자르지 않는다.
- **모든 슬롯을 빠짐없이 채운다. 빈 슬롯 금지.**
## 편집 규칙
- 전체 컨텍스트와 핵심 용어를 보존한다
- 원본 표현을 살리되, 슬라이드에 맞게 약간만 다듬는다
- 개조식(불릿, 번호)으로 작성한다. 줄글 금지.
- 각 블록의 **목적(purpose)**을 보고 해당 목적에 맞는 텍스트를 원본에서 가져온다
- **불릿 항목은 반드시 각각 별도 줄(\n)로 작성한다.** 한 줄에 여러 항목을 넣지 마라.
- 올바른 예: "• 추진과제: 건설산업 디지털화\n• 실행과제: BIM 전면 도입\n• 출처: 국토교통부"
- 잘못된 예: "• 추진과제: 건설산업 디지털화 • 실행과제: BIM 전면 도입 • 출처: 국토교통부"
- 올바른 예: "• 추진과제: 건설산업 디지털화\n• 실행과제: BIM 전면 도입"
- 잘못된 예: "• 추진과제: 건설산업 디지털화 • 실행과제: BIM 전면 도입"
- 출처가 있는 내용은 출처를 반드시 보존한다
- 출처가 없는 수치나 통계를 만들지 않는다
@@ -95,15 +95,31 @@ async def fill_content(
char_guide = block.get("char_guide", {})
topic_id = block.get("topic_id", i + 1)
# Phase Q: topic의 source_data를 찾아서 직접 전달
source_data_text = ""
if analysis:
for topic in analysis.get("topics", []):
if topic.get("id") == topic_id:
sd = topic.get("source_data", "")
if sd:
source_data_text = sd
break
req_text = (
f"블록 {i+1} ({block_type}, 영역: {block.get('area', '?')}, topic_id: {topic_id}):\n"
f" 목적(purpose): {block.get('purpose', '미지정')}\n"
f" 용도: {block.get('reason', '미지정')}\n"
f" 크기: {block.get('size', 'medium')}\n"
f" 필수 슬롯: {slots.get('required', [])}\n"
f" 선택 슬롯: {slots.get('optional', [])}"
)
# source_data를 최우선으로 전달
if source_data_text:
req_text += (
f"\n ★★ source_data (이 텍스트를 그대로 슬롯에 배치하라):\n"
f" {source_data_text}"
)
# I-5: 슬롯 의미 설명 전달 (slot_desc가 있으면)
slot_desc = slots.get("slot_desc", {})
if slot_desc:
@@ -114,9 +130,20 @@ async def fill_content(
guide_lines = [f" {k}: ~{v}" for k, v in char_guide.items()]
req_text += "\n 글자 수 가이드 (참고, 의미 우선):\n" + "\n".join(guide_lines)
# Phase O-4: 컨테이너 기반 블록 스펙 전달
# Phase Q-3: 글자수 예산 전달 (char_budget 우선, 없으면 Phase O 스펙)
char_budget = block.get("_char_budget", {})
container_h = block.get("_container_height_px")
if container_h:
if char_budget:
req_text += (
f"\n ★ 글자수 예산 (하드 제약 — 반드시 준수):"
f"\n - 최대 항목 수: {char_budget.get('max_items', '제한 없음')}"
f"\n - 항목당 최대 글자 수: {char_budget.get('chars_per_item', '제한 없음')}"
f"\n - 총 최대 글자 수: {char_budget.get('total_chars', '제한 없음')}"
f"\n - 폰트 크기: {char_budget.get('font_size_px', 15.2)}px"
f"\n 이 예산은 컨테이너 크기에서 수학적으로 도출됨. 초과 시 overflow 발생."
)
elif container_h:
max_items = block.get("_max_items", "제한 없음")
max_chars_item = block.get("_max_chars_per_item", "제한 없음")
max_chars_total = block.get("_max_chars_total", "제한 없음")
@@ -157,69 +184,102 @@ async def fill_content(
)
user_prompt = (
f"## 원본 콘텐츠\n{content}\n\n"
f"## 원본 콘텐츠 (참고용 — source_data가 있으면 source_data 우선)\n{content}\n\n"
f"## 블록 배치{page_label}\n"
+ "\n".join(slot_requirements)
+ source_section
+ "\n\n## 요청\n"
" 블록별로 슬롯에 들어갈 텍스트를 정리하여 JSON으로 반환해줘.\n"
"원본에서 추출하라. 재작성하지 마라. 축약만 허용.\n"
"자세히보기 대상 블록은 summary + detail 두 버전을 작성해.\n"
"각 블록의 ★★ source_data를 해당 블록 슬롯에 그대로 배치하라.\n"
"source_data의 텍스트를 축약/요약/재작성하지 마라. 그대로 넣어라.\n"
"글자수 예산 초과 시에만 뒤에서부터 잘라내라.\n"
"형식:\n"
'{"blocks": [{"area": "...", "type": "...", "topic_id": 1, "data": {슬롯 키-값}}]}'
)
try:
# Kei API만 사용. fallback 없음. 성공할 때까지 무한 재시도.
result_text = await _call_kei_editor_with_retry(user_prompt)
# Phase Q: 파싱 실패 시 재시도 (빈 data로 넘어가지 않는다)
import asyncio
MAX_FILL_RETRIES = 3
fill_success = False
filled = _parse_json(result_text)
for fill_attempt in range(MAX_FILL_RETRIES):
try:
result_text = await _call_kei_editor_with_retry(user_prompt)
if filled and "blocks" in filled:
for filled_block in filled["blocks"]:
matched = False
# 1차: topic_id로 정확 매칭
if filled_block.get("topic_id"):
for orig_block in blocks:
if orig_block.get("topic_id") == filled_block.get("topic_id"):
# data 덮어쓰되 column_override 등 기존 메타 보존 (J-6)
new_data = filled_block.get("data", {})
preserved = {}
if "data" in orig_block:
for k in ("column_override",):
if k in orig_block["data"]:
preserved[k] = orig_block["data"][k]
orig_block["data"] = {**new_data, **preserved}
matched = True
break
# 2차: area + type으로 매칭 (topic_id 없을 때)
if not matched:
for orig_block in blocks:
if (
orig_block.get("area") == filled_block.get("area")
and orig_block.get("type") == filled_block.get("type")
and "data" not in orig_block
):
# data 덮어쓰되 column_override 등 기존 메타 보존 (J-6)
new_data = filled_block.get("data", {})
preserved = {}
if "data" in orig_block:
for k in ("column_override",):
if k in orig_block["data"]:
preserved[k] = orig_block["data"][k]
orig_block["data"] = {**new_data, **preserved}
break
filled = _parse_json(result_text)
logger.info(
f"텍스트 정리 완료 (페이지 {page_idx + 1}): "
f"{len(filled['blocks'])}개 블록"
)
else:
logger.warning(f"텍스트 정리 파싱 실패 (페이지 {page_idx + 1}). 재시도 필요하지만 텍스트는 받았으므로 진행.")
if filled and "blocks" in filled:
filled_count = 0
for filled_block in filled["blocks"]:
matched = False
# 1차: topic_id로 정확 매칭
if filled_block.get("topic_id"):
for orig_block in blocks:
if orig_block.get("topic_id") == filled_block.get("topic_id"):
new_data = filled_block.get("data", {})
preserved = {}
if "data" in orig_block:
for k in ("column_override",):
if k in orig_block["data"]:
preserved[k] = orig_block["data"][k]
orig_block["data"] = {**new_data, **preserved}
matched = True
filled_count += 1
break
# 2차: area + type으로 매칭 (topic_id 없을 때)
if not matched:
for orig_block in blocks:
if (
orig_block.get("area") == filled_block.get("area")
and orig_block.get("type") == filled_block.get("type")
and "data" not in orig_block
):
new_data = filled_block.get("data", {})
preserved = {}
if "data" in orig_block:
for k in ("column_override",):
if k in orig_block["data"]:
preserved[k] = orig_block["data"][k]
orig_block["data"] = {**new_data, **preserved}
filled_count += 1
break
except Exception as e:
logger.error(f"텍스트 편집자 호출 실패: {e}", exc_info=True)
raise
logger.info(
f"텍스트 정리 완료 (페이지 {page_idx + 1}): "
f"{filled_count}/{len(filled['blocks'])}개 블록 매칭"
)
# 검증: data가 실제로 채워진 블록이 있는가?
blocks_with_data = [b for b in blocks if b.get("data") and b.get("topic_id") is not None]
if blocks_with_data:
fill_success = True
break
else:
logger.warning(
f"[fill_content] 파싱 성공했으나 매칭된 블록 0개 "
f"(시도 {fill_attempt + 1}/{MAX_FILL_RETRIES})"
)
else:
logger.warning(
f"[fill_content] JSON 파싱 실패 (시도 {fill_attempt + 1}/{MAX_FILL_RETRIES}). "
f"응답: {result_text[:200] if result_text else '(비어있음)'}"
)
except Exception as e:
logger.error(f"텍스트 편집자 호출 실패 (시도 {fill_attempt + 1}): {e}")
if fill_attempt == MAX_FILL_RETRIES - 1:
raise
# 재시도 전 대기
if fill_attempt < MAX_FILL_RETRIES - 1:
await asyncio.sleep(5)
if not fill_success:
# 최대 재시도 후에도 실패 — 에러 발생 (빈 data로 진행하지 않음)
empty_blocks = [b.get("type") for b in blocks if not b.get("data") and b.get("topic_id") is not None]
raise RuntimeError(
f"fill_content 최대 재시도({MAX_FILL_RETRIES}회) 후에도 "
f"데이터 채우기 실패. 빈 블록: {empty_blocks}"
)
return layout_concept
@@ -271,7 +331,115 @@ async def _call_kei_editor_with_retry(prompt: str) -> str:
# _apply_defaults 삭제됨 — Kei API 무한 재시도로 fallback 불필요.
async def fill_candidates(
content: str,
topic: dict[str, Any],
candidates: list[dict[str, Any]],
analysis: dict[str, Any] | None = None,
) -> list[dict[str, Any]]:
"""Phase P: 1개 topic의 후보 3개 블록을 한꺼번에 텍스트 편집한다.
Kei 편집자 1회 호출로 3개 블록 각각의 슬롯에 맞게 편집.
Args:
content: 원본 텍스트
topic: 해당 topic 정보 (id, title, purpose, source_hint 등)
candidates: 후보 블록 3개 (type, _container_height_px, _max_items 등 포함)
analysis: 1단계 분석 결과
Returns:
candidates 리스트에 data가 채워진 상태로 반환
"""
tid = topic.get("id", "?")
purpose = topic.get("purpose", "")
source_hint = topic.get("source_hint", "")
source_data = topic.get("source_data", "")
# 각 후보 블록의 슬롯 + 컨테이너 스펙 정리
block_sections = []
for i, block in enumerate(candidates):
block_type = block.get("type", "")
slots = BLOCK_SLOTS.get(block_type, {})
section = (
f"### 후보 {i+1}: {block_type}\n"
f" 필수 슬롯: {slots.get('required', [])}\n"
f" 선택 슬롯: {slots.get('optional', [])}"
)
slot_desc = slots.get("slot_desc", {})
if slot_desc:
desc_lines = [f" {k}: {v}" for k, v in slot_desc.items()]
section += "\n 슬롯 설명:\n" + "\n".join(desc_lines)
# Phase R: expression_hint + variant 전달
if topic.get("expression_hint"):
section += f"\n ★ 표현 의도: {topic['expression_hint']}"
variant = block.get("_variant", "default")
if variant != "default":
section += f"\n ★ 변형: {variant}"
# Phase Q: 글자수 예산 전달 (있으면 우선, 없으면 Phase O 스펙)
char_budget = block.get("_char_budget", {})
container_h = block.get("_container_height_px")
if char_budget:
section += (
f"\n ★ 글자수 예산 (하드 제약 — 초과 시 overflow):"
f"\n 총 글자: {char_budget.get('total_chars', '제한 없음')}"
f"\n 최대 항목: {char_budget.get('max_items', '제한 없음')}"
f"\n 항목당 글자: {char_budget.get('chars_per_item', '제한 없음')}"
)
elif container_h:
section += (
f"\n ★ 컨테이너 제약:"
f"\n 높이: {container_h}px"
f"\n 최대 항목: {block.get('_max_items', '제한 없음')}"
f"\n 항목당 글자: {block.get('_max_chars_per_item', '제한 없음')}"
f"\n 총 글자: {block.get('_max_chars_total', '제한 없음')}"
)
block_sections.append(section)
source_section = ""
if source_hint or source_data:
source_section = (
f"\n\n## 원본 데이터 (이 텍스트에서 추출하라. 재작성 금지.)\n"
f" source_hint: {source_hint}\n"
f" source_data: {source_data}"
)
prompt = (
f"## 원본 콘텐츠\n{content}\n\n"
f"## 꼭지 {tid}: {topic.get('title', '')}\n"
f" 목적: {purpose}\n\n"
f"## 후보 블록 3개 — 각각의 슬롯에 맞게 텍스트를 편집하라\n\n"
+ "\n\n".join(block_sections)
+ source_section
+ "\n\n## 요청\n"
"위 3개 후보 블록 각각에 맞는 텍스트를 JSON으로 반환해줘.\n"
"원본에서 추출하라. 재작성 금지. 축약만 허용.\n"
"형식:\n"
'{"candidates": [\n'
' {"candidate_index": 0, "type": "블록타입", "data": {슬롯 키-값}},\n'
' {"candidate_index": 1, "type": "블록타입", "data": {슬롯 키-값}},\n'
' {"candidate_index": 2, "type": "블록타입", "data": {슬롯 키-값}}\n'
']}'
)
result_text = await _call_kei_editor_with_retry(prompt)
filled = _parse_json(result_text)
if filled and "candidates" in filled:
for filled_item in filled["candidates"]:
idx = filled_item.get("candidate_index", -1)
if 0 <= idx < len(candidates):
candidates[idx]["data"] = filled_item.get("data", {})
logger.info(f"[Phase P] 꼭지 {tid}: 후보 {len(filled['candidates'])}개 텍스트 편집 완료")
else:
logger.warning(f"[Phase P] 꼭지 {tid}: 텍스트 편집 파싱 실패")
return candidates
def _parse_json(text: str) -> dict[str, Any] | None:

View File

@@ -450,6 +450,99 @@ def _load_catalog() -> str:
# Step B(Sonnet) 제거됨 — Phase O에서 Kei 확정 + 코드 검증으로 대체.
async def _opus_batch_recommend(
analysis: dict[str, Any],
faiss_candidates: dict[int, list[dict]],
container_specs: dict | None = None,
) -> dict[int, str]:
"""Phase P: 전체 topic을 한꺼번에 보여주고 topic별 Opus 추천 1개씩 받는다.
FAISS 후보 2개를 함께 보여주고, Opus가 도메인 지식으로 다른 1개를 추천.
1회 Kei API 호출로 전체 topic 처리.
Returns:
{topic_id: block_type} — 각 topic별 Opus 추천 블록
"""
import httpx
from src.sse_utils import stream_sse_tokens
kei_url = getattr(settings, "kei_api_url", "http://localhost:8000")
# 각 topic의 정보 + FAISS 후보 정리
topic_sections = []
for topic in analysis.get("topics", []):
tid = topic.get("id")
faiss_blocks = faiss_candidates.get(tid, [])
faiss_ids = [b["id"] for b in faiss_blocks]
# 컨테이너 제약 정보
container_info = ""
if container_specs:
from src.space_allocator import find_container_for_topic
spec = find_container_for_topic(tid, container_specs)
if spec:
per_topic = spec.height_px // max(1, len(spec.topic_ids))
container_info = f"컨테이너: {per_topic}px, 허용 height_cost: {spec.max_height_cost} 이하"
topic_sections.append(
f"- 꼭지 {tid}: {topic.get('title', '')}\n"
f" purpose: {topic.get('purpose', '')}\n"
f" relation_type: {topic.get('relation_type', '')}\n"
f" expression_hint: {topic.get('expression_hint', '')}\n"
f" FAISS 후보: {faiss_ids}\n"
f" {container_info}"
)
prompt = (
"아래 각 꼭지에 대해 FAISS가 추천한 블록 2개를 참고하되,\n"
"도메인 지식을 활용하여 **FAISS 후보에 없는 다른 블록 1개**를 추천해줘.\n"
"FAISS 후보와 중복되면 안 된다.\n"
"각 꼭지의 purpose, relation_type, expression_hint를 보고\n"
"**콘텐츠의 의미와 목적에 가장 적합한** 블록을 추천하라.\n"
"컨테이너 크기 제약도 반드시 고려하라.\n\n"
f"## 꼭지 목록\n" + "\n".join(topic_sections) +
"\n\n## 출력 (JSON만)\n"
'{"recommendations": [{"topic_id": 1, "block_type": "...", "reason": "..."}]}'
)
try:
async with httpx.AsyncClient(timeout=None) as client:
async with client.stream(
"POST",
f"{kei_url}/api/message",
json={
"message": prompt,
"session_id": "design-agent-p-recommend",
"mode_hint": "chat",
},
timeout=None,
) as response:
if response.status_code != 200:
logger.warning(f"[Phase P] Opus 배치 추천 HTTP {response.status_code}")
return {}
full_text = await stream_sse_tokens(response)
if not full_text:
return {}
result = _parse_json(full_text)
if result and "recommendations" in result:
mapping = {}
for rec in result["recommendations"]:
tid = rec.get("topic_id") or rec.get("id")
if tid is not None:
mapping[tid] = rec.get("block_type", "")
logger.info(f"[Phase P] Opus 배치 추천: {mapping}")
return mapping
logger.warning(f"[Phase P] Opus 배치 추천 JSON 파싱 실패: {full_text[:200]}")
return {}
except Exception as e:
logger.warning(f"[Phase P] Opus 배치 추천 실패: {e}")
return {}
async def _opus_block_recommendation(
analysis: dict[str, Any],
block_candidates: str,

View File

@@ -217,6 +217,76 @@ def format_measurement_for_kei(
return "\n".join(lines)
def measure_candidate_block(html: str) -> dict[str, Any]:
"""Phase P: 단일 후보 블록을 렌더링하여 높이 측정 + 스크린샷 캡처.
Args:
html: render_block_in_container()로 생성된 완전한 HTML
Returns:
{
"scrollHeight": 실제 콘텐츠 높이,
"containerHeight": 컨테이너 높이,
"overflowed": 넘침 여부,
"excess_px": 초과 px,
"screenshot_b64": base64 PNG 문자열
}
"""
options = Options()
options.add_argument("--headless=new")
options.add_argument("--disable-gpu")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--force-device-scale-factor=1")
options.add_argument("--window-size=1400,900")
driver = None
try:
driver = webdriver.Chrome(options=options)
import urllib.parse
encoded = urllib.parse.quote(html)
driver.get(f"data:text/html;charset=utf-8,{encoded}")
try:
driver.execute_script("return document.fonts.ready")
except Exception:
pass
result = driver.execute_script("""
var container = document.querySelector('.candidate-container');
if (!container) return {error: 'container not found'};
return {
scrollHeight: container.scrollHeight,
containerHeight: parseInt(container.style.height) || container.clientHeight,
overflowed: container.scrollHeight > container.clientHeight + 2,
excess_px: Math.max(0, container.scrollHeight - container.clientHeight)
};
""")
if not result or "error" in result:
return {"scrollHeight": 0, "containerHeight": 0, "overflowed": False, "excess_px": 0, "screenshot_b64": None}
# 스크린샷 캡처
from selenium.webdriver.common.by import By
container = driver.find_element(By.CSS_SELECTOR, ".candidate-container")
screenshot_b64 = container.screenshot_as_base64
result["screenshot_b64"] = screenshot_b64
return result
except Exception as e:
logger.warning(f"[Phase P] 후보 블록 측정 실패: {e}")
return {"scrollHeight": 0, "containerHeight": 0, "overflowed": False, "excess_px": 0, "screenshot_b64": None}
finally:
if driver:
try:
driver.quit()
except Exception:
pass
def capture_slide_screenshot(html: str) -> str | None:
"""Phase N-4: 렌더링된 슬라이드의 스크린샷을 base64 PNG로 캡처한다.