Files
C.E.L_Slide_test2/src/pipeline.py
kyeongmin b0bcffc0f6 Phase N+O: 컨테이너 기반 레이아웃 + Step B 제거 + 전면 정리
- Phase N: catalog 개선, fallback 전면 제거, Kei API 무한 재시도, topic_id 버그 수정
- Phase O: 컨테이너 스펙 계산(비중→px), 블록 스펙 확정, 렌더러 container div
- Step B(Sonnet) 제거: Kei(A-2)+코드로 대체. STEP_B_PROMPT/fallback/DOWNGRADE_MAP 삭제
- Selenium: container div 감지 추가
- catalog.yaml: ref_chars 구조 변환 + FAISS 재빌드
- 문서 전면 갱신: README, PROGRESS, IMPROVEMENT, Phase I~O md

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-27 15:20:51 +09:00

728 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""DA-14: 전체 파이프라인 (5단계).
1. Kei 실장: 꼭지 추출 + 분석
2. 디자인 팀장: 레이아웃 설계
3. 텍스트 편집자: 텍스트 정리
4. 디자인 실무자: HTML 조립
5. 디자인 팀장: 전체 재검토
"""
from __future__ import annotations
import json
import logging
import re
import time
from pathlib import Path
from typing import Any, AsyncIterator
import anthropic
from src.kei_client import classify_content, refine_concepts, call_kei_overflow_judgment, call_kei_final_review
from src.design_director import create_layout_concept, LAYOUT_PRESETS, select_preset
from src.content_editor import fill_content
from src.renderer import render_slide
from src.image_utils import get_image_sizes, embed_images
from src.space_allocator import calculate_container_specs, finalize_block_specs, find_container_for_topic, calculate_trim_chars
from src.slide_measurer import measure_rendered_heights, format_measurement_for_kei, capture_slide_screenshot
from src.config import settings
logger = logging.getLogger(__name__)
# Kei API 재시도 간격(초). 제한 없음 — 성공할 때까지 무한 재시도.
KEI_RETRY_INTERVAL = 10
async def _retry_kei(fn, *args, **kwargs):
"""Kei API 호출을 성공할 때까지 무한 재시도한다.
Kei API는 필수 인프라. fallback 없음. 제한 없음.
10분이든 20분이든 Kei가 응답할 때까지 기다린다.
"""
import asyncio
attempt = 0
while True:
attempt += 1
result = await fn(*args, **kwargs)
if result is not None:
if attempt > 1:
logger.info(f"[Kei 재시도] {fn.__name__} 성공 ({attempt}번째 시도)")
return result
logger.warning(
f"[Kei 재시도] {fn.__name__} 실패 (시도 {attempt}). "
f"{KEI_RETRY_INTERVAL}초 후 재시도..."
)
await asyncio.sleep(KEI_RETRY_INTERVAL)
def _save_step(run_dir: Path, filename: str, data: Any) -> None:
"""스텝 결과를 JSON 또는 HTML로 저장한다. (K-1)"""
run_dir.mkdir(parents=True, exist_ok=True)
filepath = run_dir / filename
if filename.endswith(".html"):
filepath.write_text(data, encoding="utf-8")
else:
with open(filepath, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
logger.info(f"[중간 산출물] {filename} 저장 → {run_dir.name}/")
async def generate_slide(
content: str,
manual_layout: dict[str, Any] | None = None,
base_path: str = "",
) -> AsyncIterator[dict[str, str]]:
"""콘텐츠를 슬라이드 HTML로 변환하는 5단계 파이프라인.
Yields:
SSE 이벤트: progress / result / error
"""
# K-1: 중간 산출물 저장용 디렉토리
run_id = str(int(time.time() * 1000))
run_dir = Path("data/runs") / run_id
try:
# 1단계: Kei 실장 — 꼭지 추출 + 분석
yield {"event": "progress", "data": "1/5 Kei 실장이 꼭지를 추출 중..."}
if manual_layout:
analysis = manual_layout
else:
analysis = await _retry_kei(classify_content, content)
# _retry_kei는 무한 재시도. None이 올 수 없다.
topic_count = len(analysis.get("topics", []))
page_count = analysis.get("total_pages", 1)
logger.info(f"1단계-A 완료: {topic_count}개 꼭지, {page_count}페이지")
_save_step(run_dir, "step1_analysis.json", analysis)
# 1단계-B: 각 꼭지 컨셉 구체화
yield {"event": "progress", "data": "1.5/5 Kei 실장이 각 꼭지의 컨셉을 구체화 중..."}
analysis = await refine_concepts(content, analysis)
logger.info("1단계-B 완료: 컨셉 구체화")
_save_step(run_dir, "step1b_concepts.json", {
"concepts": [
{k: t.get(k) for k in ("id", "title", "purpose", "relation_type", "expression_hint", "source_data")}
for t in analysis.get("topics", [])
]
})
# I-6: 슬라이드 제목 ↔ 첫 꼭지 제목 중복 검증
from difflib import SequenceMatcher
title = analysis.get("title", "")
topics = analysis.get("topics", [])
if topics:
first_title = topics[0].get("title", "")
similarity = SequenceMatcher(None, title, first_title).ratio()
if similarity > 0.7:
purpose = topics[0].get("purpose", "문제제기")
topics[0]["title"] = f"{purpose}: {topics[0].get('summary', '')[:30]}"
logger.warning(
f"[제목 중복 교정] 유사도 {similarity:.0%} → 첫 꼭지 제목 변경"
)
# 이미지 크기 측정 (base_path 있을 때만)
image_sizes = get_image_sizes(content, base_path)
if image_sizes:
analysis["image_sizes"] = image_sizes
logger.info(f"이미지 측정: {len(image_sizes)}")
# ★ Phase O-1: 컨테이너 스펙 계산 (Kei 비중 → px 확정)
preset_name = select_preset(analysis)
preset = LAYOUT_PRESETS.get(preset_name, {})
page_struct = analysis.get("page_structure", {})
container_specs = calculate_container_specs(
page_structure=page_struct,
topics=analysis.get("topics", []),
preset=preset,
slide_width=settings.slide_width,
slide_height=settings.slide_height,
)
_save_step(run_dir, "step1c_containers.json", {
role: {
"height_px": spec.height_px,
"width_px": spec.width_px,
"max_height_cost": spec.max_height_cost,
"topic_ids": spec.topic_ids,
"weight": spec.weight,
"block_constraints": spec.block_constraints,
}
for role, spec in container_specs.items()
})
# 2단계: 디자인 팀장 — Step A(프리셋) + Step A-2(Kei 블록 확정) + Step B(zone 배치)
yield {"event": "progress", "data": "2/5 디자인 팀장이 레이아웃을 설계 중..."}
layout_concept = await create_layout_concept(content, analysis, container_specs=container_specs)
total_blocks = sum(
len(p.get("blocks", [])) for p in layout_concept.get("pages", [])
)
logger.info(
f"2단계 완료: {len(layout_concept.get('pages', []))}페이지, "
f"{total_blocks}개 블록"
)
_save_step(run_dir, "step2_layout.json", {
"preset": layout_concept.get("pages", [{}])[0].get("grid_areas", ""),
"blocks": [
{
"area": b.get("area"), "type": b.get("type"),
"topic_id": b.get("topic_id"), "purpose": b.get("purpose"),
"reason": b.get("reason", ""), "size": b.get("size", ""),
}
for p in layout_concept.get("pages", [])
for b in p.get("blocks", [])
],
"overflow": layout_concept.get("overflow", []),
})
# ★ Phase O-3: 블록 스펙 확정 (컨테이너 크기 → 항목수/글자수/폰트)
for page in layout_concept.get("pages", []):
finalize_block_specs(page.get("blocks", []), container_specs)
# 컨테이너 스펙을 layout_concept에 저장 (렌더러에서 사용)
layout_concept["_container_specs"] = container_specs
_save_step(run_dir, "step2c_block_specs.json", {
"blocks": [
{
"type": b.get("type"), "topic_id": b.get("topic_id"),
"area": b.get("area"),
"_container_height_px": b.get("_container_height_px"),
"_max_items": b.get("_max_items"),
"_max_chars_per_item": b.get("_max_chars_per_item"),
"_max_chars_total": b.get("_max_chars_total"),
"_font_size_px": b.get("_font_size_px"),
}
for p in layout_concept.get("pages", [])
for b in p.get("blocks", [])
]
})
# 3단계: 텍스트 편집자 — 텍스트 정리
yield {"event": "progress", "data": "3/5 텍스트 편집자가 핵심을 정리 중..."}
layout_concept = await fill_content(content, layout_concept, analysis)
logger.info("3단계 완료: 텍스트 정리")
_save_step(run_dir, "step3_filled_blocks.json", {
"blocks": [
{
"area": b.get("area"), "type": b.get("type"),
"topic_id": b.get("topic_id"), "purpose": b.get("purpose"),
"data": b.get("data", {}),
"char_count": len(json.dumps(b.get("data", {}), ensure_ascii=False)),
}
for p in layout_concept.get("pages", [])
for b in p.get("blocks", [])
]
})
# 4단계: 디자인 실무자 — 디자인 조정 + HTML 조립
yield {"event": "progress", "data": "4/5 디자인 실무자가 슬라이드를 조립 중..."}
layout_concept = await _adjust_design(layout_concept, analysis)
html = render_slide(layout_concept)
logger.info("4단계 완료: HTML 조립")
_save_step(run_dir, "step4_css_adjustment.json", {
"area_styles": layout_concept.get("pages", [{}])[0].get("area_styles", {})
})
_save_step(run_dir, "step4_rendered.html", html)
# Phase L: 렌더링 측정 + 피드백 루프 (최대 3회)
import asyncio
MAX_MEASURE_ROUNDS = 3
measurement = None
for measure_round in range(MAX_MEASURE_ROUNDS):
measurement = await asyncio.to_thread(measure_rendered_heights, html)
_save_step(run_dir, f"step4_measurement_round{measure_round + 1}.json", measurement)
# overflow 감지 — zone + container 양쪽 체크
has_overflow = False
for zone_name, zone_data in measurement.get("zones", {}).items():
if zone_data.get("overflowed"):
has_overflow = True
break
# Phase O: container 레벨 overflow도 체크
for cont_name, cont_data in measurement.get("containers", {}).items():
if cont_data.get("overflowed"):
has_overflow = True
logger.warning(
f"[측정] container-{cont_name}: "
f"scroll={cont_data.get('scrollHeight')}px > "
f"allocated={cont_data.get('allocatedHeight')}px "
f"(+{cont_data.get('excess_px')}px)"
)
break
if not has_overflow:
logger.info(f"[측정] 모든 zone/container 정상 (round {measure_round + 1})")
break
logger.warning(f"[측정] overflow 감지 (round {measure_round + 1})")
# 수학적 축약량 계산 → 편집자 재호출
adjusted = False
for zone_name, zone_data in measurement.get("zones", {}).items():
if not zone_data.get("overflowed"):
continue
excess = zone_data.get("excess_px", 0)
zone_info = preset.get("zones", {}).get(zone_name, {})
width_px = int(settings.slide_width * zone_info.get("width_pct", 100) / 100 * 0.85)
# Phase O: overflow 블록의 _max_chars_total 축소
for block_m in zone_data.get("blocks", []):
if block_m.get("overflowed"):
trim_chars = calculate_trim_chars(
block_m.get("excess_px", excess),
width_px,
)
for page in layout_concept.get("pages", []):
for block in page.get("blocks", []):
if block.get("area") == zone_name:
current_max = block.get("_max_chars_total", 400)
block["_max_chars_total"] = max(20, current_max - trim_chars)
if "data" in block:
del block["data"]
adjusted = True
logger.info(
f"[측정 조정] {zone_name}/{block_m.get('block_type')}: "
f"{block_m.get('excess_px')}px 초과 → "
f"_max_chars_total {current_max}{block['_max_chars_total']} ({trim_chars}자 축약)"
)
break
if not adjusted:
logger.info("[측정] 조정 대상 없음, 현재 결과 확정")
break
# 편집자 재호출 → 재렌더링
layout_concept = await fill_content(content, layout_concept, analysis)
layout_concept = await _adjust_design(layout_concept, analysis)
html = render_slide(layout_concept)
logger.info(f"[측정] round {measure_round + 1} 재렌더링 완료")
# 측정 결과 텍스트 (Kei 검수에 전달)
measurement_text = format_measurement_for_kei(measurement) if measurement else ""
# Phase N-4: 5단계 — Kei 실장 최종 검수 (스크린샷 기반, 최대 1회)
# overflow 없으면 skip (시간 절약)
has_any_overflow = False
if measurement:
for zone_data in measurement.get("zones", {}).values():
if zone_data.get("overflowed"):
has_any_overflow = True
break
if measurement.get("slide", {}).get("overflowed"):
has_any_overflow = True
MAX_REVIEW_ROUNDS = 1
screenshot_b64 = None
if not has_any_overflow:
logger.info("5단계 skip: overflow 없음. 검수 불필요.")
else:
yield {"event": "progress", "data": "5/5 Kei 실장이 최종 검수 중..."}
# 스크린샷 캡처 (Selenium)
screenshot_b64 = await asyncio.to_thread(capture_slide_screenshot, html)
if screenshot_b64:
_save_step(run_dir, "step5_screenshot.txt", f"base64 PNG, {len(screenshot_b64)} chars")
logger.info("[5단계] 스크린샷 캡처 완료 → Kei에게 전달")
for review_round in range(MAX_REVIEW_ROUNDS if has_any_overflow else 0):
review_result = await _review_balance(
html, layout_concept, content, analysis, measurement_text,
screenshot_b64=screenshot_b64,
)
if not review_result or not review_result.get("needs_adjustment"):
if review_round == 0:
logger.info("5단계 완료: 조정 불필요")
else:
logger.info(f"5단계 완료: {review_round}차 조정 후 균형 확인")
break
issues = review_result.get("issues", [])
logger.info(
f"5단계 ({review_round + 1}/{MAX_REVIEW_ROUNDS}): "
f"조정 필요 — {issues}"
)
_save_step(run_dir, f"step5_review_round{review_round + 1}.json", review_result)
# overflow_detected가 있으면 Kei에게 판단 요청 (Sonnet은 감지만, 판단은 Kei)
overflow_adjs = [
adj for adj in review_result.get("adjustments", [])
if adj.get("action") == "overflow_detected"
]
if overflow_adjs:
overflow_context = _build_overflow_context(
layout_concept, overflow_adjs
)
kei_judgment = await call_kei_overflow_judgment(
overflow_context, content, analysis
)
if kei_judgment is None:
# 넘침 판단도 Kei 필수 — 성공할 때까지 무한 재시도
kei_judgment = await _retry_kei(
call_kei_overflow_judgment, overflow_context, content, analysis
)
_convert_kei_judgment(review_result, kei_judgment)
logger.info(
f"[Kei 넘침 판단] decision={kei_judgment.get('decision')}"
)
layout_concept = await _apply_adjustments(
layout_concept, review_result, content
)
html = render_slide(layout_concept)
logger.info(f"5단계: {review_round + 1}차 조정 반영, 재검토 진행")
else:
# MAX_REVIEW_ROUNDS 초과
logger.warning(
f"5단계: 최대 재조정 횟수({MAX_REVIEW_ROUNDS}) 도달. 현재 결과로 확정."
)
# D-5: 이미지를 base64로 삽입 (다운로드 HTML에서도 보이도록)
if base_path:
html = embed_images(html, base_path)
logger.info("이미지 base64 삽입 완료")
_save_step(run_dir, "final.html", html)
yield {"event": "result", "data": html}
logger.info(f"슬라이드 생성 완료: {len(layout_concept.get('pages', []))}페이지, run={run_id}")
except Exception as e:
logger.exception(f"파이프라인 오류: {e}")
yield {"event": "error", "data": str(e)}
async def _adjust_design(
layout_concept: dict[str, Any],
analysis: dict[str, Any],
) -> dict[str, Any]:
"""4단계 전반: 디자인 실무자가 텍스트 양에 맞게 CSS를 조정한다.
각 area별 블록 수, 텍스트 총량, zone 예산을 계산하고,
Sonnet이 area별 CSS 변수 override를 결정한다.
블록 템플릿이 이미 CSS 변수(var(--font-body) 등)를 사용하므로,
area div에서 변수를 override하면 내부 블록이 자동 조정된다.
"""
try:
client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key)
# 프리셋 정보 가져오기
preset_name = select_preset(analysis)
preset = LAYOUT_PRESETS.get(preset_name, {})
zones = preset.get("zones", {})
for page in layout_concept.get("pages", []):
# area별 블록 수 + 텍스트 총량 집계
area_info = {}
for block in page.get("blocks", []):
area = block.get("area", "body")
if area not in area_info:
zone = zones.get(area, {})
area_info[area] = {
"block_count": 0,
"total_chars": 0,
"budget_px": zone.get("budget_px", 490),
"width_pct": zone.get("width_pct", 100),
"block_types": [],
}
data = block.get("data", {})
text_len = len(json.dumps(data, ensure_ascii=False))
area_info[area]["block_count"] += 1
area_info[area]["total_chars"] += text_len
area_info[area]["block_types"].append(block.get("type", ""))
# area 정보 텍스트 구성
area_lines = []
for area_name, info in area_info.items():
area_lines.append(
f"- {area_name} (예산 {info['budget_px']}px, 너비 {info['width_pct']}%): "
f"{info['block_count']}개 블록, 총 {info['total_chars']}\n"
f" 블록 타입: {', '.join(info['block_types'])}"
)
system = (
"당신은 디자인 실무자이다. 편집자가 정리한 텍스트가 각 영역에 잘 들어가도록 CSS를 조정한다.\n\n"
"## 원칙\n"
"- 텍스트를 자르지 않는다. 디자인이 텍스트에 맞춘다.\n"
"- 빈 공간을 방치하지 않는다.\n"
"- 텍스트가 많으면: 폰트/여백을 줄여서 맞춘다.\n"
"- 텍스트가 적으면: 폰트/여백을 늘려서 채운다.\n\n"
"## 조정 가능한 CSS 변수\n"
"- --font-body (기본 0.95rem): 본문 폰트 크기\n"
"- --font-subtitle (기본 1.25rem): 소제목 폰트 크기\n"
"- --font-caption (기본 0.8rem): 캡션 폰트 크기\n"
"- --spacing-inner (기본 16px): 블록 내부 여백\n"
"- --spacing-block (기본 20px): 블록 간 간격\n"
"- --spacing-small (기본 8px): 작은 여백\n\n"
"## 출력 형식 (JSON만. 설명 없이.)\n"
"각 area에 적용할 CSS 변수 override를 inline style 문자열로 반환.\n"
"조정 불필요한 area는 빈 문자열.\n"
'{"area_styles": {"body": "--font-body: 0.85rem; --spacing-inner: 10px;", "sidebar": "", "footer": ""}}'
)
user_prompt = (
f"## 각 영역 현황\n" + "\n".join(area_lines) +
f"\n\n위 영역별로 CSS 변수 조정이 필요한지 판단하여 JSON으로 반환해줘."
)
response = await client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=1024,
system=system,
messages=[{"role": "user", "content": user_prompt}],
)
result_text = response.content[0].text
result = _parse_json(result_text)
if result and "area_styles" in result:
page["area_styles"] = result["area_styles"]
logger.info(
f"디자인 조정: {', '.join(f'{k}={bool(v)}' for k, v in result['area_styles'].items())}"
)
else:
page["area_styles"] = {}
logger.info("디자인 조정: 조정 불필요 또는 파싱 실패")
except Exception as e:
logger.warning(f"디자인 조정 실패 (기존 스타일로 렌더링): {e}")
# 실패 시 area_styles 없음 → 기존과 동일하게 렌더링
for page in layout_concept.get("pages", []):
if "area_styles" not in page:
page["area_styles"] = {}
return layout_concept
async def _review_balance(
html: str,
layout_concept: dict[str, Any],
content: str,
analysis: dict[str, Any] | None = None,
measurement_text: str = "",
screenshot_b64: str | None = None,
) -> dict[str, Any] | None:
"""5단계: Kei 실장이 조립 결과를 최종 검수한다. (J-7 + Phase L)
Kei가 콘텐츠 관점 + 실제 렌더링 측정 결과 기반으로 검수:
- 핵심 메시지 전달 여부
- 콘텐츠 흐름 ↔ 블록 배치 일치
- 실제 px 기반 높이/비중 검증 (Phase L)
- 중요 내용 누락/축소 여부
"""
try:
# 블록별 텍스트 양 요약
block_summary = []
for page in layout_concept.get("pages", []):
for block in page.get("blocks", []):
data = block.get("data", {})
text_len = len(json.dumps(data, ensure_ascii=False))
block_summary.append(
f" {block.get('area')}/{block.get('type')}: "
f"데이터 {text_len}"
)
# zone 예산 정보 (analysis에서 프리셋 추출)
zone_budget_text = ""
overflow_hint_text = ""
if analysis:
preset_name = select_preset(analysis)
preset = LAYOUT_PRESETS.get(preset_name, {})
zone_lines = [
f"- {name}: ~{z['budget_px']}px (너비 {z['width_pct']}%)"
for name, z in preset.get("zones", {}).items()
]
zone_budget_text = (
"\n\n## zone별 높이 예산\n" + "\n".join(zone_lines)
)
# Stage 2에서 감지한 예상 overflow 힌트
overflow_hint = layout_concept.get("overflow", [])
if overflow_hint:
hint_lines = [
f"- {o['area']}: 예상 {o['total_px']}px > 예산 {o['budget_px']}px "
f"(+{o['overflow_px']}px 초과)"
for o in overflow_hint
]
overflow_hint_text = (
"\n\n## 높이 초과 힌트 (2단계 예상치, 참고용)\n"
+ "\n".join(hint_lines)
)
# Phase L: 렌더링 측정 결과를 overflow_hint에 추가 (실제 px 기반)
if measurement_text:
overflow_hint_text += f"\n\n{measurement_text}"
# Kei로 최종 검수 (Sonnet 절대 금지, 스크린샷 있으면 이미지 기반)
return await call_kei_final_review(
html, block_summary, zone_budget_text, overflow_hint_text, analysis,
screenshot_b64=screenshot_b64,
)
except Exception as e:
logger.warning(f"재검토 실패: {e}")
return None
async def _apply_adjustments(
layout_concept: dict[str, Any],
review: dict[str, Any],
content: str,
) -> dict[str, Any]:
"""재검토 결과에 따라 텍스트를 재편집한다."""
adjustments = review.get("adjustments", [])
if not adjustments:
return layout_concept
# 조정이 필요한 블록만 재편집
for adj in adjustments:
area = adj.get("block_area", "")
action = adj.get("action", "")
ratio = adj.get("target_ratio")
detail = adj.get("detail", "")
for page in layout_concept.get("pages", []):
for block in page.get("blocks", []):
if block.get("area") != area:
continue
if action == "expand" and ratio:
for key in block.get("char_guide", {}):
block["char_guide"][key] = int(
block["char_guide"][key] * ratio
)
logger.info(f"조정: {area} → expand ×{ratio} ({detail})")
elif action == "shrink" and ratio:
for key in block.get("char_guide", {}):
block["char_guide"][key] = int(
block["char_guide"][key] * ratio
)
logger.info(f"조정: {area} → shrink ×{ratio} ({detail})")
elif action == "rewrite":
if "data" in block:
del block["data"]
block["reason"] = f"재작성: {detail}"
logger.info(f"조정: {area} → rewrite ({detail})")
elif action == "kei_trim":
max_chars = adj.get("max_chars", 200)
if "char_guide" not in block:
block["char_guide"] = {}
for key in block.get("char_guide", {}):
block["char_guide"][key] = min(
block["char_guide"][key], max_chars
)
if not block["char_guide"]:
block["char_guide"] = {"text": max_chars}
logger.info(
f"조정: {area} → kei_trim max_chars={max_chars} "
f"({detail})"
)
elif action == "kei_restructure":
block["detail_target"] = True
if "data" in block:
del block["data"]
block["reason"] = f"재구성: {detail}"
logger.info(
f"조정: {area} → kei_restructure (detail_target)"
)
# 조정된 가이드로 재편집
layout_concept = await fill_content(content, layout_concept)
return layout_concept
def _build_overflow_context(
layout_concept: dict[str, Any],
overflow_adjs: list[dict],
) -> list[dict]:
"""Sonnet이 감지한 overflow_detected를 Kei에게 전달할 형태로 변환한다.
실제 채워진 블록 데이터(텍스트)를 포함하여 Kei가 판단할 수 있도록 한다.
"""
overflows = []
for adj in overflow_adjs:
area = adj.get("block_area", "")
# 해당 zone의 블록 정보 + 실제 텍스트 추출
area_blocks = []
for page in layout_concept.get("pages", []):
for block in page.get("blocks", []):
if block.get("area") == area:
data = block.get("data", {})
text_preview = json.dumps(data, ensure_ascii=False)[:300]
area_blocks.append({
"type": block.get("type", ""),
"purpose": block.get("purpose", ""),
"topic_id": block.get("topic_id"),
"text_preview": text_preview,
})
overflows.append({
"area": area,
"detail": adj.get("detail", ""),
"blocks": area_blocks,
})
return overflows
def _convert_kei_judgment(
review_result: dict[str, Any],
kei_judgment: dict[str, Any],
) -> None:
"""Kei의 trim/restructure 판단을 review_result.adjustments에 반영한다.
기존 overflow_detected 항목을 kei_trim 또는 kei_restructure로 교체.
"""
decision = kei_judgment.get("decision", "")
new_adjs = []
for adj in review_result.get("adjustments", []):
if adj.get("action") == "overflow_detected":
# overflow_detected → Kei 판단으로 교체
if decision == "trim":
for target in kei_judgment.get("trim_targets", []):
new_adjs.append({
"block_area": adj.get("block_area", ""),
"action": "kei_trim",
"max_chars": target.get("max_chars", 200),
"topic_id": target.get("topic_id"),
"detail": target.get("reason", ""),
})
elif decision == "restructure":
for tid in kei_judgment.get("detail_topics", []):
new_adjs.append({
"block_area": adj.get("block_area", ""),
"action": "kei_restructure",
"topic_id": tid,
"detail": kei_judgment.get("reason", ""),
})
else:
# 기존 expand/shrink/rewrite는 그대로 유지
new_adjs.append(adj)
review_result["adjustments"] = new_adjs
def _parse_json(text: str) -> dict[str, Any] | None:
"""텍스트에서 JSON을 추출한다."""
patterns = [
r"```json\s*(.*?)```",
r"```\s*(.*?)```",
r"(\{.*\})",
]
for pattern in patterns:
match = re.search(pattern, text, re.DOTALL)
if match:
try:
return json.loads(match.group(1).strip())
except json.JSONDecodeError:
continue
return None