1472 lines
58 KiB
Python
1472 lines
58 KiB
Python
"""DA-12: 1단계 — Kei 실장 (꼭지 추출 + 분석).
|
||
|
||
Kei API를 통해 Kei persona가 사고하여 꼭지를 추출한다.
|
||
Kei API는 필수. fallback 없음. 성공할 때까지 무한 재시도.
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
import re
|
||
from typing import Any
|
||
|
||
import httpx
|
||
|
||
from src.config import settings
|
||
from src.sse_utils import stream_sse_tokens
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
KEI_PROMPT = (
|
||
"다음 콘텐츠로 슬라이드 1장을 만들려고 해. 어떤 스토리로 구성할지 설계해줘.\n\n"
|
||
"## 1단계: 핵심 메시지 파악\n"
|
||
"- 이 콘텐츠가 전달하려는 **핵심 메시지**를 한 줄로 파악해줘.\n"
|
||
"- 슬라이드를 본 사람이 기억해야 할 단 하나의 문장.\n"
|
||
"- core_message 필드에 기록.\n\n"
|
||
"## 2단계: 정보 구조 파악\n"
|
||
"- 본문 흐름(flow)과 참조 정보(reference)로 분리되는 구조인가?\n"
|
||
"- 독립적으로 참조되는 정보(용어 정의, 부록)가 있는가?\n"
|
||
"- info_structure 필드에 기술.\n\n"
|
||
"## 3단계: 슬라이드 스토리라인 설계\n"
|
||
"핵심 메시지를 전달하기 위한 **흐름**을 설계해줘.\n"
|
||
"각 꼭지에 purpose를 부여하고, topics 배열에 기록.\n\n"
|
||
"## 4단계: 꼭지별 성격 판단\n"
|
||
"각 꼭지에 대해 다음을 판단하라:\n\n"
|
||
"### sidebar 판단\n"
|
||
"- 이 꼭지의 내용이 **본문과 독립된 참조 정보**(용어 정의, 개념 비교, 참조 테이블)인가?\n"
|
||
"- 독립 참조 → role: 'reference' (sidebar 후보)\n"
|
||
"- 본문 흐름의 일부 → role: 'flow'\n\n"
|
||
"### 팝업 판단\n"
|
||
"- <details> 안에 있는 콘텐츠 → 팝업 처리 대상\n"
|
||
"- 너무 세부적인 내용 → 팝업으로 분리 가능\n\n"
|
||
"### 핵심요약\n"
|
||
"- :::note[핵심 요약] 등의 결론 텍스트가 있으면 **conclusion_text** 필드에 원본 그대로 기록\n"
|
||
"- conclusion_text는 슬라이드 하단 footer에 자동 배치됨\n\n"
|
||
"**주의: page_structure, zone, 영역 배치는 판단하지 마라.**\n"
|
||
"**영역과 zone은 코드가 블록 매칭을 통해 결정한다.**\n"
|
||
"**너는 꼭지 추출 + 각 꼭지의 성격(reference/flow, 팝업 여부)만 판단하라.**\n\n"
|
||
"## 원본 텍스트 보존 원칙 (절대 규칙)\n"
|
||
"- **제목(##, ###)은 원본 그대로 사용하라. 절대 바꾸지 마라.**\n"
|
||
" 원본이 '## 1. DX의 궁극적 목표'이면 꼭지 제목도 'DX의 궁극적 목표'.\n"
|
||
" 임의로 '핵심 목표', '전략 방향' 등으로 바꾸지 마라.\n"
|
||
"- **원본 텍스트(불릿, 설명)는 85% 이상 그대로 사용하라.**\n"
|
||
" 문장을 재작성하지 마라. 원본 문장을 그대로 가져와라.\n"
|
||
"- **결론 텍스트도 원본 그대로.** 임의로 만들지 마라.\n"
|
||
"- 원본에 있는 내용을 임의로 제거하거나 다른 의미로 바꾸지 마라.\n"
|
||
"- 텍스트 재구성이 허용되는 경우는 **빈 공간에 채울 요약(표, 팝업 요약)만**.\n"
|
||
"- 각 꼭지의 source_hint에 원본의 어떤 부분이 가는지 명시.\n\n"
|
||
"## 배치 규칙\n"
|
||
"- 참조 정보(용어 정의 등)는 role: 'reference'로 표시 → 사이드바 배치\n"
|
||
"- 본문 흐름은 role: 'flow' → 메인 영역 배치\n"
|
||
"- 결론/핵심요약은 conclusion_text 필드에 기록. page_structure에 넣지 마라.\n"
|
||
"- detail_target: true는 정말로 별도로 봐야 하는 상세 데이터에만 사용\n"
|
||
"- 이미지/표가 있으면 images[], tables[]에 기록\n"
|
||
"- 1페이지 적정 꼭지: 5개. 분량 적으면 1페이지로.\n"
|
||
"- **슬라이드 제목(title)과 첫 번째 꼭지 제목은 달라야 한다.** 슬라이드 제목은 전체 주제, 꼭지 제목은 해당 위치의 구체적 내용.\n\n"
|
||
"## 출력 형식 (JSON만)\n"
|
||
"**page_structure는 출력하지 마라. 영역/zone 배치는 코드가 결정한다.**\n\n"
|
||
"```json\n"
|
||
'{"title": "슬라이드 제목 (MDX title 또는 전체 주제)", '
|
||
'"core_message": "핵심 메시지", '
|
||
'"conclusion_text": "핵심 요약 원본 텍스트 (:::note 등에서 추출. 없으면 빈 문자열)", '
|
||
'"total_pages": 1, '
|
||
'"topics": ['
|
||
'{"id": 1, "title": "꼭지 제목 (원본 그대로)", "summary": "요약", '
|
||
'"purpose": "문제제기|근거사례|핵심전달|용어정의|결론강조|구조시각화", '
|
||
'"source_hint": "원본에서 이 꼭지에 해당하는 텍스트 범위 설명", '
|
||
'"layer": "intro|core|supporting|conclusion", '
|
||
'"role": "flow|reference", '
|
||
'"emphasis": true, "direction": "vertical|horizontal|flexible", '
|
||
'"content_type": "text|image|table|mixed", '
|
||
'"detail_target": false, "page": 1}], '
|
||
'"images": [{"topic_id": 1, "role": "key|supporting", "has_text": false, "description": "이미지 설명"}], '
|
||
'"tables": [{"topic_id": 2, "rows": 5, "cols": 3, "fits_single_page": true, "description": "표 설명"}]}\n'
|
||
"```\n\n"
|
||
"## 콘텐츠:\n"
|
||
)
|
||
|
||
|
||
def _detect_structure_hints(content: str) -> str:
|
||
"""MDX 구조에서 유형 판단 힌트를 자동 감지."""
|
||
hints = []
|
||
content_lower = content.lower()
|
||
|
||
# 용어 정의 섹션 감지
|
||
if re.search(r'##\s*\d*\.?\s*용어\s*정의', content):
|
||
hints.append("[구조 힌트] '용어 정의' 섹션 감지 → 유형 A 후보")
|
||
if re.search(r'##\s*\d*\.?\s*개념\s*비교', content):
|
||
hints.append("[구조 힌트] '개념 비교' 섹션 감지 → 유형 A 후보")
|
||
|
||
# sidebar 마크다운 감지
|
||
if 'sidebar:' in content[:200]:
|
||
pass # frontmatter의 sidebar는 Starlight 설정이므로 무시
|
||
|
||
# <details> 감지
|
||
if '<details>' in content:
|
||
hints.append("[구조 힌트] <details> 참고 사례 감지 → 팝업 처리 대상 (유형 선택과 무관)")
|
||
|
||
# 표 감지
|
||
if '|' in content and '---' in content:
|
||
hints.append("[구조 힌트] 표(테이블) 감지 → 비교 구조")
|
||
|
||
# 이미지 감지
|
||
if re.search(r'!\[.*?\]\(.*?\)', content):
|
||
hints.append("[구조 힌트] 이미지 감지 → 이미지 배치 필요")
|
||
|
||
# A 후보 힌트가 없으면 B 유력
|
||
if not any("유형 A 후보" in h for h in hints):
|
||
hints.append("[구조 힌트] 독립 참조 섹션 없음 → 유형 B 유력")
|
||
|
||
return "\n".join(hints) + "\n\n"
|
||
|
||
|
||
async def classify_content(content: str) -> dict[str, Any] | None:
|
||
"""1단계: Kei API를 통해 꼭지를 추출하고 분석한다.
|
||
|
||
Kei API만 사용. fallback 없음. 실패 시 None → pipeline에서 에러.
|
||
"""
|
||
# MDX 구조 힌트를 content 앞에 추가
|
||
hints = _detect_structure_hints(content)
|
||
result = await _call_kei_api(hints + content)
|
||
if result:
|
||
logger.info(
|
||
f"[Kei API] 꼭지 추출 완료: {result.get('title', '')}, "
|
||
f"{len(result.get('topics', []))}개 꼭지"
|
||
)
|
||
return result
|
||
|
||
logger.error("[Kei API] 꼭지 추출 실패. Kei API(localhost:8000) 확인 필요.")
|
||
return None
|
||
|
||
|
||
KEI_PROMPT_B = (
|
||
"아래는 슬라이드 스토리라인 설계 결과(1단계-A)와 원본 콘텐츠이다.\n"
|
||
"각 꼭지의 **컨셉을 구체화**해줘. 1회 응답으로 모든 꼭지를 한꺼번에 처리.\n\n"
|
||
"## 각 꼭지에 대해 판단할 것\n\n"
|
||
"1. **관계 성격 (relation_type)**:\n"
|
||
" - sequence: 시간/단계 순서 (A→B→C)\n"
|
||
" - inclusion: 포함/융합 관계 (A가 B,C를 포함하거나 B,C가 합쳐져 A를 이룸)\n"
|
||
" - comparison: 대등 비교 (A vs B)\n"
|
||
" - hierarchy: 상위-하위 (A > B > C)\n"
|
||
" - definition: 용어 정의 (나열)\n"
|
||
" - cause_effect: 원인-결과\n"
|
||
" - none: 관계 표현 불필요 (단순 텍스트)\n\n"
|
||
"2. **표현 성격 (expression_hint)** — 관계 성격을 설명. 구체 블록 이름은 쓰지 마라:\n"
|
||
" - 예: '기술 융합/포함 관계. 순서 아님. 구성요소 간 관계 표현 필요.'\n"
|
||
" - 예: '수단-목적 관계. 대등 비교가 아님. 역할 차이를 보여줘야 함.'\n"
|
||
" - 예: '용어 3개의 독립적 정의. 나열.'\n\n"
|
||
"3. **원본 데이터 핵심 항목 (source_data)**:\n"
|
||
" - 이 꼭지에 해당하는 원본의 핵심 항목들을 나열하라.\n"
|
||
" - 항목이 여러 개면 '이름(설명), 이름(설명)' 형태로 쉼표 구분.\n"
|
||
" - 원본에 팝업이 참조되면 반드시 [팝업: 제목] 마커를 포함하라.\n"
|
||
" - 원본에 이미지가 참조되면 반드시 [이미지: 제목] 마커를 포함하라.\n"
|
||
" - 출처가 있으면 포함하라.\n"
|
||
" - '활용 필요', '구체화 필요' 같은 지시사항을 쓰지 마라. 실제 콘텐츠 항목만 쓰라.\n"
|
||
" - 예시: '건설산업(종합산업, 기술 통합 융합), BIM(정보관리 도구, 출처: 국토교통부 2020)'\n"
|
||
" - 예시: '[이미지: DX와 핵심기술간 상호관계] 다이어그램, GIS 역할(공간 분석). [팝업: DX와 BIM의 구분] 비교표'\n\n"
|
||
"## 출력 형식 (JSON만)\n"
|
||
"```json\n"
|
||
'{"concepts": ['
|
||
'{"topic_id": 1, '
|
||
'"relation_type": "inclusion|sequence|comparison|hierarchy|definition|cause_effect|none", '
|
||
'"expression_hint": "관계 성격 설명 (블록 이름 쓰지 말 것)", '
|
||
'"source_data": "핵심 항목 나열 + [팝업:] [이미지:] 마커 + 출처"}]}\n'
|
||
"```\n\n"
|
||
)
|
||
|
||
|
||
async def refine_concepts(
|
||
content: str,
|
||
analysis: dict[str, Any],
|
||
) -> dict[str, Any]:
|
||
"""1단계-B: 각 꼭지의 컨셉을 구체화한다.
|
||
|
||
1단계-A 결과(topics)를 받아서, 각 꼭지의 관계 성격/표현 방법/원본 데이터를 판단.
|
||
Kei API만 사용. fallback 없음. 성공할 때까지 재시도.
|
||
1회 호출로 모든 꼭지를 한꺼번에 처리.
|
||
"""
|
||
import asyncio
|
||
|
||
topics = analysis.get("topics", [])
|
||
if not topics:
|
||
return analysis
|
||
|
||
# 꼭지 요약 텍스트
|
||
topics_text = "\n".join(
|
||
f"- 꼭지 {t.get('id', '?')}: {t.get('title', '?')} "
|
||
f"[purpose: {t.get('purpose', '?')}, summary: {t.get('summary', '')}]"
|
||
for t in topics
|
||
)
|
||
|
||
prompt = (
|
||
KEI_PROMPT_B
|
||
+ f"## 1단계-A 스토리라인 결과\n"
|
||
f"핵심 메시지: {analysis.get('core_message', '?')}\n"
|
||
f"꼭지 목록:\n{topics_text}\n\n"
|
||
f"## 원본 콘텐츠\n{content}\n"
|
||
)
|
||
|
||
kei_url = getattr(settings, "kei_api_url", "http://localhost:8000")
|
||
RETRY_INTERVAL = 10
|
||
attempt = 0
|
||
|
||
while True:
|
||
attempt += 1
|
||
try:
|
||
async with httpx.AsyncClient(timeout=None) as client:
|
||
async with client.stream(
|
||
"POST",
|
||
f"{kei_url}/api/direct",
|
||
json={
|
||
"message": prompt,
|
||
},
|
||
timeout=None,
|
||
) as response:
|
||
if response.status_code != 200:
|
||
logger.warning(f"[1단계-B] Kei API HTTP {response.status_code} (시도 {attempt})")
|
||
await asyncio.sleep(RETRY_INTERVAL)
|
||
continue
|
||
|
||
full_text = await stream_sse_tokens(response)
|
||
|
||
if not full_text:
|
||
logger.warning(f"[1단계-B] 응답 텍스트 없음 (시도 {attempt})")
|
||
await asyncio.sleep(RETRY_INTERVAL)
|
||
continue
|
||
|
||
result = _parse_json(full_text)
|
||
if result and "concepts" in result:
|
||
# topics에 concept 정보 병합
|
||
# Kei가 topic_id 또는 id로 응답할 수 있으므로 양쪽 다 체크
|
||
concept_map = {}
|
||
for c in result["concepts"]:
|
||
tid = c.get("topic_id") or c.get("id")
|
||
if tid is not None:
|
||
concept_map[tid] = c
|
||
for topic in topics:
|
||
concept = concept_map.get(topic.get("id"))
|
||
if concept:
|
||
topic["relation_type"] = concept.get("relation_type", "")
|
||
topic["expression_hint"] = concept.get("expression_hint", "")
|
||
topic["source_data"] = concept.get("source_data", "")
|
||
|
||
logger.info(f"[1단계-B] 컨셉 구체화 완료: {len(result['concepts'])}개")
|
||
return analysis
|
||
else:
|
||
logger.warning(f"[1단계-B] JSON 파싱 실패 (시도 {attempt}): {full_text[:200]}")
|
||
await asyncio.sleep(RETRY_INTERVAL)
|
||
continue
|
||
|
||
except Exception as e:
|
||
logger.warning(f"[1단계-B] Kei API 실패 (시도 {attempt}): {e}")
|
||
await asyncio.sleep(RETRY_INTERVAL)
|
||
continue
|
||
|
||
|
||
KEI_STRUCTURED_TEXT_PROMPT = (
|
||
"아래는 슬라이드 스토리라인의 꼭지 목록과 원본 콘텐츠이다.\n"
|
||
"각 꼭지에 해당하는 원본 텍스트를 **슬라이드에 넣을 형태로 구조화**하라.\n\n"
|
||
"## 절대 규칙\n"
|
||
"1. **원본 문장을 그대로 가져와라. 재작성하지 마라.**\n"
|
||
" 원본: '시설물의 요구 성능을 설계·시공·운영 전 과정에서 디지털로 검증하여 안전성 확보'\n"
|
||
" → 그대로: '• 시설물의 요구 성능을 설계·시공·운영 전 과정에서 디지털로 검증하여 안전성 확보'\n"
|
||
" ❌ 재작성 금지: '디지털 검증으로 안전성을 확보함'\n"
|
||
"2. 원본 내용의 85% 이상을 보존하라. 축약하지 마라.\n"
|
||
"3. **소제목(###)이 있으면 그대로 유지하라.** 삭제하거나 합치지 마라.\n"
|
||
" 원본: '### 안전과 품질' → structured_text에 '안전과 품질' 소제목 유지\n"
|
||
"4. 각 문장을 불릿(•)으로 구분하라.\n"
|
||
"5. 하위 항목이 있으면 들여쓰기 불릿( •)으로 구분하라.\n"
|
||
"6. 출처가 있으면 반드시 포함하라 (출처: ...).\n"
|
||
"7. 개조식 어미로 변환하라 (~있다→~있음, ~한다→~함, ~이다→삭제).\n"
|
||
"8. 팝업 참조([팝업: ...])는 그대로 유지하라.\n"
|
||
"9. 이미지 참조([이미지: ...])는 그대로 유지하라.\n\n"
|
||
"## 출력 형식 (JSON만. 설명 없이.)\n"
|
||
"```json\n"
|
||
'{"structured_texts": ['
|
||
'{"topic_id": 1, '
|
||
'"structured_text": "• 첫 번째 문장\\n• 두 번째 문장\\n • 하위 항목"}]}\n'
|
||
"```\n\n"
|
||
)
|
||
|
||
|
||
async def generate_structured_text(
|
||
content: str,
|
||
analysis: dict[str, Any],
|
||
) -> dict[str, Any]:
|
||
"""1단계-B 보완: 각 꼭지의 structured_text를 생성.
|
||
|
||
refine_concepts() 후 별도 호출. 원본 텍스트를 85% 보존하여 구조화.
|
||
"""
|
||
import asyncio
|
||
|
||
topics = analysis.get("topics", [])
|
||
if not topics:
|
||
return analysis
|
||
|
||
topics_text = "\n".join(
|
||
f"- 꼭지 {t.get('id', '?')}: {t.get('title', '?')} "
|
||
f"[purpose: {t.get('purpose', '?')}, source_hint: {t.get('source_hint', '')}]"
|
||
for t in topics
|
||
)
|
||
|
||
prompt = (
|
||
KEI_STRUCTURED_TEXT_PROMPT
|
||
+ f"## 꼭지 목록\n{topics_text}\n\n"
|
||
+ f"## 원본 콘텐츠\n{content}\n"
|
||
)
|
||
|
||
kei_url = getattr(settings, "kei_api_url", "http://localhost:8000")
|
||
RETRY_INTERVAL = 10
|
||
attempt = 0
|
||
|
||
while True:
|
||
attempt += 1
|
||
try:
|
||
async with httpx.AsyncClient(timeout=None) as client:
|
||
async with client.stream(
|
||
"POST",
|
||
f"{kei_url}/api/direct",
|
||
json={"message": prompt},
|
||
timeout=None,
|
||
) as response:
|
||
if response.status_code != 200:
|
||
logger.warning(f"[1단계-B-ST] Kei API HTTP {response.status_code} (시도 {attempt})")
|
||
await asyncio.sleep(RETRY_INTERVAL)
|
||
continue
|
||
full_text = await stream_sse_tokens(response)
|
||
|
||
if not full_text:
|
||
logger.warning(f"[1단계-B-ST] 응답 텍스트 없음 (시도 {attempt})")
|
||
await asyncio.sleep(RETRY_INTERVAL)
|
||
continue
|
||
|
||
result = _parse_json(full_text)
|
||
if result and "structured_texts" in result:
|
||
st_map = {}
|
||
for st in result["structured_texts"]:
|
||
tid = st.get("topic_id") or st.get("id")
|
||
if tid is not None:
|
||
st_map[tid] = st.get("structured_text", "")
|
||
|
||
for topic in topics:
|
||
text = st_map.get(topic.get("id"), "")
|
||
if text:
|
||
topic["structured_text"] = text
|
||
|
||
filled = sum(1 for t in topics if t.get("structured_text"))
|
||
logger.info(f"[1단계-B-ST] structured_text 생성 완료: {filled}/{len(topics)}개")
|
||
return analysis
|
||
else:
|
||
logger.warning(f"[1단계-B-ST] JSON 파싱 실패 (시도 {attempt}): {full_text[:200]}")
|
||
await asyncio.sleep(RETRY_INTERVAL)
|
||
continue
|
||
|
||
except Exception as e:
|
||
logger.warning(f"[1단계-B-ST] Kei API 실패 (시도 {attempt}): {e}")
|
||
await asyncio.sleep(RETRY_INTERVAL)
|
||
continue
|
||
|
||
|
||
async def _call_kei_api(content: str) -> dict[str, Any] | None:
|
||
"""Kei API를 통해 꼭지 추출. SSE 스트리밍으로 실시간 수신."""
|
||
kei_url = getattr(settings, "kei_api_url", "http://localhost:8000")
|
||
|
||
try:
|
||
async with httpx.AsyncClient(timeout=None) as client:
|
||
async with client.stream(
|
||
"POST",
|
||
f"{kei_url}/api/direct",
|
||
json={
|
||
"message": KEI_PROMPT + content,
|
||
},
|
||
timeout=None,
|
||
) as response:
|
||
if response.status_code != 200:
|
||
logger.warning(f"Kei API HTTP {response.status_code}")
|
||
return None
|
||
|
||
full_text = await stream_sse_tokens(response)
|
||
|
||
if not full_text:
|
||
logger.warning("Kei API 응답에서 텍스트 추출 실패")
|
||
return None
|
||
|
||
result = _parse_json(full_text)
|
||
if result and "topics" in result:
|
||
return result
|
||
|
||
logger.warning(f"Kei API JSON 파싱 실패. 텍스트: {full_text[:200]}")
|
||
return None
|
||
|
||
except Exception as e:
|
||
logger.warning(f"Kei API 호출 실패: {e}")
|
||
return None
|
||
|
||
|
||
|
||
|
||
# ──────────────────────────────────────
|
||
# Phase Q-4: 제약 기반 블록 선택 (Kei 1회 호출)
|
||
# ──────────────────────────────────────
|
||
|
||
BLOCK_SELECTION_PROMPT = """당신은 11년 경력의 기획 실장이다. 각 꼭지(topic)에 가장 적합한 블록과 변형(variant)을 선택하라.
|
||
|
||
## 판단 기준 (우선순위 순)
|
||
1. 콘텐츠의 **표현 의도(expression_hint)**를 가장 잘 시각화하는 블록+변형
|
||
2. 이 꼭지의 목적(purpose)에 부합하는 표현 방식
|
||
3. 블록에 변형(variant)이 있으면, 콘텐츠에 더 적합한 변형을 선택
|
||
4. 글자수 예산 내에서 의미 전달이 가능한 블록
|
||
5. 다른 꼭지와 같은 블록 타입이 되지 않도록 다양성 확보
|
||
|
||
## 변형(variant) 선택 규칙
|
||
- 블록에 변형이 여러 개 있으면, "when" 조건과 expression_hint를 비교하여 적합한 것 선택
|
||
- 기본(default)이 적합하면 variant를 "default"로 지정
|
||
- 변형의 "when"이 expression_hint와 맞으면 해당 variant 선택
|
||
|
||
## 출력 (JSON, 꼭지 수만큼)
|
||
{"selections": [{"topic_id": 1, "block_id": "블록 id", "variant": "default 또는 변형 id", "reason": "선택 근거 1문장"}, ...]}
|
||
"""
|
||
|
||
|
||
async def select_block_for_topics(
|
||
topics: list[dict[str, Any]],
|
||
candidates_per_topic: dict[int, list[dict]],
|
||
budgets_per_topic: dict[int, dict[str, dict]],
|
||
container_specs: dict[str, Any],
|
||
analysis: dict[str, Any],
|
||
) -> dict[int, dict] | None:
|
||
"""Phase Q-4: 필터링된 후보 목록에서 Kei가 topic별 블록을 1개씩 선택.
|
||
|
||
AI 1회 호출로 모든 topic의 블록을 동시 선택한다.
|
||
|
||
Args:
|
||
topics: 1단계 분석의 topic 리스트
|
||
candidates_per_topic: {topic_id: [후보 블록 리스트]}
|
||
budgets_per_topic: {topic_id: {block_id: budget_dict}}
|
||
container_specs: 역할별 ContainerSpec
|
||
analysis: 1단계 분석 결과
|
||
|
||
Returns:
|
||
{topic_id: {"block_id": "...", "reason": "..."}} 또는 None (실패)
|
||
"""
|
||
from src.block_selector import format_candidates_for_prompt
|
||
from src.space_allocator import find_container_for_topic
|
||
|
||
core_message = analysis.get("core_message", "")
|
||
|
||
# 프롬프트 구성
|
||
prompt_parts = [
|
||
BLOCK_SELECTION_PROMPT,
|
||
f"\n## 슬라이드 핵심 메시지\n{core_message}\n",
|
||
]
|
||
|
||
for topic in topics:
|
||
tid = topic.get("id")
|
||
candidates = candidates_per_topic.get(tid, [])
|
||
if not candidates:
|
||
continue
|
||
|
||
spec = find_container_for_topic(tid, container_specs)
|
||
per_topic_px = spec.height_px // max(1, len(spec.topic_ids)) if spec else 0
|
||
budget = budgets_per_topic.get(tid, {})
|
||
|
||
expression_hint = topic.get("expression_hint", "")
|
||
prompt_parts.append(
|
||
f"\n### 꼭지 {tid}: {topic.get('title', '')}\n"
|
||
f"- 목적: {topic.get('purpose', '')}\n"
|
||
f"- 관계 유형: {topic.get('relation_type', 'none')}\n"
|
||
f"- ★ 표현 의도: {expression_hint}\n"
|
||
f"- 컨테이너: {per_topic_px}px × {spec.width_px if spec else 700}px "
|
||
f"({spec.role if spec else '?'} {spec.zone if spec else '?'})\n"
|
||
f"- 후보 블록:\n"
|
||
f"{format_candidates_for_prompt(candidates, budget)}\n"
|
||
)
|
||
|
||
full_prompt = "\n".join(prompt_parts)
|
||
|
||
kei_url = getattr(settings, "kei_api_url", "http://localhost:8000")
|
||
|
||
try:
|
||
async with httpx.AsyncClient(timeout=None) as client:
|
||
async with client.stream(
|
||
"POST",
|
||
f"{kei_url}/api/direct",
|
||
json={
|
||
"message": full_prompt,
|
||
},
|
||
timeout=None,
|
||
) as response:
|
||
if response.status_code != 200:
|
||
logger.warning(f"[Q-4] Kei API HTTP {response.status_code}")
|
||
return None
|
||
|
||
full_text = await stream_sse_tokens(response)
|
||
|
||
if not full_text:
|
||
logger.warning("[Q-4] Kei API 응답 비어있음")
|
||
return None
|
||
|
||
result = _parse_json(full_text)
|
||
if not result or "selections" not in result:
|
||
logger.warning(f"[Q-4] JSON 파싱 실패: {full_text[:200]}")
|
||
return None
|
||
|
||
# 결과 → {topic_id: {"block_id": ..., "reason": ...}}
|
||
selections = {}
|
||
for sel in result["selections"]:
|
||
tid = sel.get("topic_id")
|
||
block_id = sel.get("block_id", "")
|
||
|
||
# catalog 존재 검증 (유령 블록 최종 차단)
|
||
candidates = candidates_per_topic.get(tid, [])
|
||
valid_ids = {c.get("id") for c in candidates}
|
||
if block_id not in valid_ids:
|
||
logger.warning(
|
||
f"[Q-4] topic {tid}: Kei가 '{block_id}' 선택했으나 후보에 없음. "
|
||
f"첫 번째 후보로 대체."
|
||
)
|
||
block_id = candidates[0]["id"] if candidates else block_id
|
||
|
||
selections[tid] = {
|
||
"block_id": block_id,
|
||
"variant": sel.get("variant", "default"),
|
||
"reason": sel.get("reason", ""),
|
||
}
|
||
|
||
logger.info(
|
||
f"[Q-4] 블록 선택 완료: "
|
||
+ ", ".join(f"t{tid}={s['block_id']}" for tid, s in selections.items())
|
||
)
|
||
return selections
|
||
|
||
except Exception as e:
|
||
logger.error(f"[Q-4] Kei 블록 선택 실패: {e}")
|
||
return None
|
||
|
||
|
||
# ──────────────────────────────────────
|
||
# Phase Q-6: 비전 모델 품질 게이트 (VASCAR식)
|
||
# ──────────────────────────────────────
|
||
|
||
VISION_QUALITY_PROMPT = """슬라이드 스크린샷을 평가하라.
|
||
|
||
## 체크리스트 (각 항목 1-5점)
|
||
1. 콘텐츠 겹침/잘림: 모든 텍스트가 컨테이너 안에 있는가?
|
||
2. 시각적 위계: 본심 영역이 가장 두드러지는가?
|
||
3. 가독성: 모든 폰트가 읽을 수 있는 크기인가? (10px 이상)
|
||
4. 블록 다양성: 서로 다른 블록 유형을 사용하고 있는가?
|
||
5. 전문성: 한국어 비즈니스 프레젠테이션으로 적절한가?
|
||
|
||
## 출력 (JSON)
|
||
{
|
||
"passed": true/false,
|
||
"score": 0-100,
|
||
"checks": {"겹침": 5, "위계": 4, "가독성": 5, "다양성": 3, "전문성": 4},
|
||
"issues": ["문제 설명 (있으면)"],
|
||
"fix_targets": [{"area": "body", "topic_id": 3, "action": "shrink|replace|rewrite", "detail": "구체적 지시"}]
|
||
}
|
||
"""
|
||
|
||
|
||
async def vision_quality_gate(
|
||
screenshot_b64: str,
|
||
analysis: dict[str, Any],
|
||
) -> dict[str, Any] | None:
|
||
"""Phase Q-6: 스크린샷 기반 시각 품질 평가.
|
||
|
||
VASCAR 논문 기반 — 렌더링 → 비전 모델 평가 → 교정 여부 결정.
|
||
|
||
Returns:
|
||
{"passed": bool, "score": int, "issues": [...], "fix_targets": [...]}
|
||
"""
|
||
import anthropic
|
||
import base64
|
||
|
||
core_message = analysis.get("core_message", "")
|
||
topic_summary = ", ".join(
|
||
f"{t.get('id')}:{t.get('title','')}" for t in analysis.get("topics", [])
|
||
)
|
||
|
||
try:
|
||
client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key)
|
||
|
||
response = await client.messages.create(
|
||
model="claude-opus-4-6-20250415",
|
||
max_tokens=2048,
|
||
messages=[{
|
||
"role": "user",
|
||
"content": [
|
||
{
|
||
"type": "image",
|
||
"source": {
|
||
"type": "base64",
|
||
"media_type": "image/png",
|
||
"data": screenshot_b64,
|
||
},
|
||
},
|
||
{
|
||
"type": "text",
|
||
"text": (
|
||
VISION_QUALITY_PROMPT +
|
||
f"\n\n## 컨텍스트\n"
|
||
f"핵심 메시지: {core_message}\n"
|
||
f"꼭지 구성: {topic_summary}\n"
|
||
),
|
||
},
|
||
],
|
||
}],
|
||
)
|
||
|
||
result_text = response.content[0].text if response.content else ""
|
||
result = _parse_json(result_text)
|
||
|
||
if result:
|
||
score = result.get("score", 0)
|
||
passed = result.get("passed", score >= 50)
|
||
result["passed"] = passed
|
||
logger.info(
|
||
f"[Q-6] 품질 게이트: {score}/100 → {'PASS' if passed else 'FAIL'}"
|
||
)
|
||
return result
|
||
|
||
logger.warning(f"[Q-6] JSON 파싱 실패: {result_text[:200]}")
|
||
return None
|
||
|
||
except Exception as e:
|
||
logger.error(f"[Q-6] 비전 품질 평가 실패: {e}")
|
||
return None
|
||
|
||
|
||
# ──────────────────────────────────────
|
||
# J-7: Kei 최종 검수
|
||
# ──────────────────────────────────────
|
||
|
||
KEI_REVIEW_PROMPT = """당신은 11년 경력의 기획 실장이다. 디자인 팀장이 조립한 슬라이드를 최종 검수한다.
|
||
|
||
## 검수 관점
|
||
1. 핵심 메시지(core_message)가 시각적으로 명확히 전달되는가?
|
||
2. 콘텐츠 흐름이 블록 배치와 일치하는가?
|
||
3. 각 블록이 해당 꼭지의 purpose에 적합한가?
|
||
4. 중요한 내용이 빠지거나 과도하게 축소되지 않았는가?
|
||
5. 높이 초과: 각 zone의 블록+텍스트가 예산을 초과하는가?
|
||
- 텍스트 축약으로 해결 가능 → shrink
|
||
- 콘텐츠가 본질적으로 큼 → overflow_detected
|
||
6. **핵심전달이 body에서 가장 큰 시각적 비중을 차지하는가?**
|
||
- 핵심전달 블록이 도입부(문제제기+근거사례)보다 작으면 → rewrite로 비중 조정
|
||
7. **문제제기가 간결한가? (100자 이내)**
|
||
- 초과 시 → shrink (target_ratio: 0.5)
|
||
8. **용어정의가 sidebar에 있는가?**
|
||
- body에 있으면 → 구조 문제 지적 (issues에 명시)
|
||
9. **핵심전달 블록이 화면 안에 보이는가?**
|
||
- 잘리면 → overflow_detected
|
||
|
||
## 조정 action
|
||
- expand: 텍스트 늘림 (target_ratio, 예: 1.3)
|
||
- shrink: 텍스트 줄임 (target_ratio, 예: 0.7)
|
||
- rewrite: 텍스트 재작성 (detail에 방향)
|
||
- overflow_detected: 높이 초과, 콘텐츠 판단 필요 (zone과 블록 명시)
|
||
|
||
## 출력 (JSON만. 설명 없이.)
|
||
{"needs_adjustment": true/false, "issues": ["이슈1"], "adjustments": [{"block_area": "...", "action": "expand|shrink|rewrite|overflow_detected", "target_ratio": 1.3, "detail": "..."}]}
|
||
"""
|
||
|
||
|
||
async def call_kei_final_review(
|
||
html: str,
|
||
block_summary: list[str],
|
||
zone_budget_text: str,
|
||
overflow_hint_text: str,
|
||
analysis: dict[str, Any] | None = None,
|
||
screenshot_b64: str | None = None,
|
||
) -> dict[str, Any] | None:
|
||
"""Phase N-4: Kei(Opus)가 스크린샷을 보고 최종 검수한다.
|
||
|
||
스크린샷이 있으면: Anthropic API 직접 호출 (Opus + 멀티모달)
|
||
스크린샷이 없으면: Kei API 경유 (텍스트 기반)
|
||
어느 경로든 Kei(Opus)가 판단. Sonnet 절대 금지.
|
||
"""
|
||
import anthropic
|
||
|
||
core_message = analysis.get("core_message", "") if analysis else ""
|
||
topics_summary = ""
|
||
if analysis:
|
||
topics_summary = "\n".join(
|
||
f"- 꼭지 {t.get('id')}: {t.get('title', '')} [{t.get('purpose', '')}]"
|
||
for t in analysis.get("topics", [])
|
||
)
|
||
|
||
review_text = (
|
||
f"## 핵심 메시지\n{core_message}\n\n"
|
||
f"## 꼭지 목록\n{topics_summary}\n\n"
|
||
f"## 블록별 데이터 양\n" + "\n".join(block_summary) +
|
||
zone_budget_text +
|
||
overflow_hint_text +
|
||
f"\n\n위 슬라이드를 검수하고 조정이 필요한지 판단해. JSON만."
|
||
)
|
||
|
||
# 스크린샷이 있으면: Opus 직접 호출 + 이미지 전달
|
||
if screenshot_b64:
|
||
try:
|
||
client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key)
|
||
response = await client.messages.create(
|
||
model="claude-opus-4-6-20250415",
|
||
max_tokens=4096,
|
||
system=KEI_REVIEW_PROMPT,
|
||
messages=[{
|
||
"role": "user",
|
||
"content": [
|
||
{
|
||
"type": "image",
|
||
"source": {
|
||
"type": "base64",
|
||
"media_type": "image/png",
|
||
"data": screenshot_b64,
|
||
},
|
||
},
|
||
{
|
||
"type": "text",
|
||
"text": review_text,
|
||
},
|
||
],
|
||
}],
|
||
)
|
||
|
||
result_text = response.content[0].text
|
||
result = _parse_json(result_text)
|
||
if result and "needs_adjustment" in result:
|
||
logger.info(
|
||
f"[Kei 최종 검수] 스크린샷 기반, needs_adjustment={result['needs_adjustment']}"
|
||
)
|
||
return result
|
||
logger.warning("[Kei 최종 검수] 스크린샷 기반 JSON 파싱 실패")
|
||
return None
|
||
|
||
except Exception as e:
|
||
logger.warning(f"Kei 최종 검수 (스크린샷) 실패: {e}")
|
||
return None
|
||
|
||
# 스크린샷 없으면: Kei API 경유 (텍스트 기반)
|
||
kei_url = getattr(settings, "kei_api_url", "http://localhost:8000")
|
||
prompt = (
|
||
KEI_REVIEW_PROMPT + "\n\n" + review_text +
|
||
f"\n\n## 조립 HTML (요약)\n{html[:3000]}"
|
||
)
|
||
|
||
try:
|
||
async with httpx.AsyncClient(timeout=None) as client:
|
||
async with client.stream(
|
||
"POST",
|
||
f"{kei_url}/api/direct",
|
||
json={
|
||
"message": prompt,
|
||
},
|
||
timeout=None,
|
||
) as response:
|
||
if response.status_code != 200:
|
||
logger.warning(f"Kei 최종 검수 HTTP {response.status_code}")
|
||
return None
|
||
full_text = await stream_sse_tokens(response)
|
||
|
||
if full_text:
|
||
result = _parse_json(full_text)
|
||
if result and "needs_adjustment" in result:
|
||
logger.info(
|
||
f"[Kei 최종 검수] 텍스트 기반, needs_adjustment={result['needs_adjustment']}"
|
||
)
|
||
return result
|
||
logger.warning("[Kei 최종 검수] JSON 파싱 실패")
|
||
return None
|
||
|
||
logger.warning("Kei 최종 검수 텍스트 추출 실패")
|
||
return None
|
||
|
||
except Exception as e:
|
||
logger.warning(f"Kei 최종 검수 실패: {e}")
|
||
return None
|
||
|
||
|
||
# ──────────────────────────────────────
|
||
# I-9: Kei 넘침 판단 호출
|
||
# ──────────────────────────────────────
|
||
|
||
KEI_OVERFLOW_PROMPT = """당신은 슬라이드 콘텐츠 전문가이다.
|
||
디자인 팀장이 배치한 블록들이 컨테이너(zone)의 높이 예산을 초과했다.
|
||
콘텐츠의 중요도와 전달 메시지를 기준으로 어떻게 처리할지 판단하라.
|
||
|
||
## 판단 기준
|
||
- 텍스트 분량만 줄이면 현재 블록 구조 안에서 해결되는가? → "trim"
|
||
- 콘텐츠 자체가 컨테이너에 담기엔 본질적으로 많은가? → "restructure"
|
||
- 중요도가 높은 콘텐츠를 무리하게 축소하면 안 된다
|
||
- 부가/상세 정보는 팝업(detail page)으로 분리할 수 있다
|
||
|
||
## 출력 (JSON만. 설명 없이.)
|
||
|
||
Option 1 (텍스트 축약):
|
||
{"decision": "trim", "trim_targets": [{"topic_id": 1, "max_chars": 200, "reason": "부연 설명 축약 가능"}]}
|
||
|
||
Option 2 (핵심 재구성 + 팝업 분리):
|
||
{"decision": "restructure", "core_topics": [1, 2], "detail_topics": [3], "reason": "12행 비교표는 팝업으로 분리"}
|
||
"""
|
||
|
||
|
||
async def call_kei_overflow_judgment(
|
||
overflows: list[dict],
|
||
content: str,
|
||
analysis: dict[str, Any],
|
||
) -> dict[str, Any] | None:
|
||
"""Kei API에 넘침 상황을 전달하고 판단을 받는다.
|
||
|
||
반드시 Kei API 경유. Anthropic 직접 호출 절대 금지.
|
||
실패 시 None → pipeline에서 무한 재시도.
|
||
"""
|
||
kei_url = getattr(settings, "kei_api_url", "http://localhost:8000")
|
||
|
||
overflow_desc = json.dumps(overflows, ensure_ascii=False, indent=2)
|
||
topics_desc = json.dumps(
|
||
[
|
||
{
|
||
"id": t.get("id"),
|
||
"title": t.get("title", ""),
|
||
"purpose": t.get("purpose", ""),
|
||
"summary": t.get("summary", "")[:100],
|
||
}
|
||
for t in analysis.get("topics", [])
|
||
],
|
||
ensure_ascii=False,
|
||
)
|
||
|
||
# I-8: 대형 콘텐츠 정보 포함
|
||
extra_info = ""
|
||
tables = analysis.get("tables", [])
|
||
if tables:
|
||
extra_info += f"\n\n## 테이블 정보\n{json.dumps(tables, ensure_ascii=False)}"
|
||
images = analysis.get("images", [])
|
||
if images:
|
||
extra_info += f"\n\n## 이미지 정보\n{json.dumps(images, ensure_ascii=False)}"
|
||
|
||
prompt = (
|
||
KEI_OVERFLOW_PROMPT + "\n\n"
|
||
f"## 넘침 현황\n{overflow_desc}\n\n"
|
||
f"## 꼭지 목록\n{topics_desc}"
|
||
f"{extra_info}\n\n"
|
||
f"## 원본 콘텐츠 요약\n{content[:2000]}"
|
||
)
|
||
|
||
try:
|
||
async with httpx.AsyncClient(timeout=None) as client:
|
||
async with client.stream(
|
||
"POST",
|
||
f"{kei_url}/api/direct",
|
||
json={
|
||
"message": prompt,
|
||
},
|
||
timeout=None,
|
||
) as response:
|
||
if response.status_code != 200:
|
||
logger.warning(f"Kei API (overflow) HTTP {response.status_code}")
|
||
return None
|
||
full_text = await stream_sse_tokens(response)
|
||
|
||
if full_text:
|
||
result = _parse_json(full_text)
|
||
if result and "decision" in result:
|
||
logger.info(f"[Kei 넘침 판단] decision={result['decision']}")
|
||
return result
|
||
logger.warning("[Kei 넘침 판단] JSON 파싱 실패 또는 decision 없음")
|
||
return None
|
||
|
||
logger.warning("Kei API (overflow) 텍스트 추출 실패")
|
||
return None
|
||
|
||
except Exception as e:
|
||
logger.warning(f"Kei API (overflow) 호출 실패: {e}")
|
||
return None
|
||
|
||
|
||
def _parse_json(text: str) -> dict[str, Any] | None:
|
||
"""텍스트에서 JSON을 추출한다.
|
||
|
||
Kei API가 마크다운 리스트 접두사(- )를 붙여 응답하는 경우에도 처리.
|
||
"""
|
||
# 전처리: 각 줄 앞의 마크다운 리스트 접두사(- ) 제거
|
||
# Kei API가 JSON을 마크다운 리스트로 감싸서 응답하는 경우 대응
|
||
lines = text.split("\n")
|
||
cleaned_lines = []
|
||
for line in lines:
|
||
stripped = line.lstrip()
|
||
if stripped.startswith("- "):
|
||
cleaned_lines.append(stripped[2:])
|
||
elif stripped.startswith("* "):
|
||
cleaned_lines.append(stripped[2:])
|
||
else:
|
||
cleaned_lines.append(stripped)
|
||
cleaned = "\n".join(cleaned_lines)
|
||
|
||
# 원본 + 클린 버전 둘 다 시도
|
||
for target in [text, cleaned]:
|
||
patterns = [
|
||
r"```json\s*(.*?)```",
|
||
r"```\s*(.*?)```",
|
||
r"(\{.*\})",
|
||
]
|
||
for pattern in patterns:
|
||
match = re.search(pattern, target, re.DOTALL)
|
||
if match:
|
||
try:
|
||
return json.loads(match.group(1).strip())
|
||
except json.JSONDecodeError:
|
||
continue
|
||
return None
|
||
|
||
|
||
async def select_best_candidate(
|
||
topic_results: list[dict[str, Any]],
|
||
analysis: dict[str, Any],
|
||
) -> dict[str, Any]:
|
||
"""Phase P: Kei가 스크린샷을 보고 topic별 최적 블록을 선택한다.
|
||
|
||
여러 topic을 묶어서 1회 호출. 각 topic별 후보 스크린샷을 Opus 멀티모달로 비교.
|
||
|
||
Args:
|
||
topic_results: [{
|
||
"topic_id": 1,
|
||
"topic_title": "...",
|
||
"purpose": "문제제기",
|
||
"candidates": [
|
||
{"index": 0, "type": "callout-warning", "screenshot_b64": "...", "overflowed": False},
|
||
{"index": 1, "type": "dark-bullet-list", "screenshot_b64": "...", "overflowed": False},
|
||
{"index": 2, "type": "quote-big-mark", "screenshot_b64": "...", "overflowed": True},
|
||
]
|
||
}, ...]
|
||
analysis: 1단계 분석 결과 (core_message 등)
|
||
|
||
Returns:
|
||
{"selections": [{"topic_id": 1, "selected_index": 0, "reason": "..."}]}
|
||
"""
|
||
import anthropic
|
||
|
||
core_message = analysis.get("core_message", "")
|
||
|
||
# 메시지 content 블록 구성 (텍스트 + 이미지들)
|
||
content_blocks = []
|
||
|
||
# 지시문
|
||
instruction = (
|
||
f"슬라이드의 핵심 메시지: {core_message}\n\n"
|
||
"아래 각 꼭지(topic)별로 후보 블록 스크린샷을 보여준다.\n"
|
||
"각 꼭지마다 **당초 목적에 가장 적합한 1개**를 선택하라.\n\n"
|
||
"판단 기준:\n"
|
||
"1. 당초 목적(purpose)에 적합한가? (문제 제기인데 비교 블록이면 부적합)\n"
|
||
"2. 콘텐츠 의미가 왜곡되지 않는가?\n"
|
||
"3. 컨테이너에 넘치지 않는가? (overflow 표시된 것은 감점)\n"
|
||
"4. 같은 블록이 다른 topic과 중복되면 피하라.\n\n"
|
||
"전부 안 맞으면 그나마 가장 나은 것을 선택하라.\n\n"
|
||
)
|
||
|
||
# 각 topic의 후보 스크린샷 추가
|
||
for tr in topic_results:
|
||
tid = tr["topic_id"]
|
||
purpose = tr.get("purpose", "")
|
||
instruction += f"## 꼭지 {tid}: {tr.get('topic_title', '')} (목적: {purpose})\n"
|
||
|
||
for cand in tr.get("candidates", []):
|
||
idx = cand["index"]
|
||
block_type = cand.get("type", "")
|
||
overflowed = cand.get("overflowed", False)
|
||
overflow_note = " ⚠️ OVERFLOW" if overflowed else ""
|
||
instruction += f" 후보 {idx}: {block_type}{overflow_note}\n"
|
||
|
||
content_blocks.append({"type": "text", "text": instruction})
|
||
|
||
# 스크린샷 이미지 추가
|
||
for tr in topic_results:
|
||
for cand in tr.get("candidates", []):
|
||
b64 = cand.get("screenshot_b64")
|
||
if b64:
|
||
content_blocks.append({
|
||
"type": "text",
|
||
"text": f"[꼭지 {tr['topic_id']} 후보 {cand['index']}: {cand.get('type', '')}]",
|
||
})
|
||
content_blocks.append({
|
||
"type": "image",
|
||
"source": {
|
||
"type": "base64",
|
||
"media_type": "image/png",
|
||
"data": b64,
|
||
},
|
||
})
|
||
|
||
content_blocks.append({
|
||
"type": "text",
|
||
"text": (
|
||
"\n## 출력 (JSON만)\n"
|
||
'{"selections": [{"topic_id": 1, "selected_index": 0, "reason": "선택 이유"}]}'
|
||
),
|
||
})
|
||
|
||
try:
|
||
client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key)
|
||
response = await client.messages.create(
|
||
model="claude-opus-4-6-20250415",
|
||
max_tokens=2048,
|
||
messages=[{"role": "user", "content": content_blocks}],
|
||
)
|
||
|
||
result_text = response.content[0].text
|
||
result = _parse_json(result_text)
|
||
if result and "selections" in result:
|
||
logger.info(
|
||
f"[Phase P] Kei 최종 선택: "
|
||
+ ", ".join(f"t{s['topic_id']}→후보{s['selected_index']}" for s in result["selections"])
|
||
)
|
||
return result
|
||
|
||
logger.warning(f"[Phase P] 선택 JSON 파싱 실패: {result_text[:200]}")
|
||
return {"selections": []}
|
||
|
||
except Exception as e:
|
||
logger.warning(f"[Phase P] Kei 최종 선택 실패: {e}")
|
||
return {"selections": []}
|
||
|
||
|
||
# ──────────────────────────────────────
|
||
# Phase V: 콘텐츠-컨테이너 적합성 에스컬레이션
|
||
# ──────────────────────────────────────
|
||
|
||
KEI_ENHANCEMENT_REVIEW_PROMPT = """당신은 슬라이드 설계 전문가이다.
|
||
아래는 슬라이드 콘텐츠 품질 강화를 위한 제안 목록이다.
|
||
각 제안을 검토하고, 승인/수정/거부를 결정하라.
|
||
|
||
## 판단 기준
|
||
- 핵심 메시지가 시각적으로 강조되는가?
|
||
- 빈 공간에 유의미한 콘텐츠를 추가할 수 있는가?
|
||
- 종속 꼭지의 처리 방식(인라인/하위블록)이 적절한가?
|
||
- bold 키워드가 핵심 용어인가?
|
||
|
||
## 출력 (JSON만. 설명 없이.)
|
||
|
||
```json
|
||
{
|
||
"decisions": [
|
||
{
|
||
"type": "subordinate|fill_space|emphasis|bold_keywords",
|
||
"role": "배경|본심|첨부|결론",
|
||
"action": "approve|modify|reject",
|
||
"modification": "수정 시 구체적 내용 (approve/reject면 빈 문자열)"
|
||
}
|
||
]
|
||
}
|
||
```
|
||
"""
|
||
|
||
|
||
async def call_kei_enhancement_review(
|
||
enhancement_report: str,
|
||
topics: list[dict],
|
||
core_message: str,
|
||
) -> dict[str, Any] | None:
|
||
"""Stage 1.8 Step 5: Kei에게 보강 제안을 보여주고 승인/수정/거부 결정을 받는다.
|
||
|
||
Kei API(/api/direct)만 사용.
|
||
"""
|
||
import asyncio
|
||
|
||
topics_text = "\n".join(
|
||
f"- 꼭지{t.get('id', '?')}: {t.get('title', '')} [{t.get('purpose', '')}]"
|
||
for t in topics
|
||
)
|
||
|
||
prompt = (
|
||
KEI_ENHANCEMENT_REVIEW_PROMPT + "\n\n"
|
||
f"## 핵심 메시지\n{core_message}\n\n"
|
||
f"## 꼭지 목록\n{topics_text}\n\n"
|
||
f"## 보강 제안\n{enhancement_report}\n"
|
||
)
|
||
|
||
kei_url = getattr(settings, "kei_api_url", "http://localhost:8000")
|
||
RETRY_INTERVAL = 10
|
||
attempt = 0
|
||
|
||
while True:
|
||
attempt += 1
|
||
try:
|
||
async with httpx.AsyncClient(timeout=None) as client:
|
||
async with client.stream(
|
||
"POST",
|
||
f"{kei_url}/api/direct",
|
||
json={"message": prompt},
|
||
timeout=None,
|
||
) as response:
|
||
if response.status_code != 200:
|
||
logger.warning(f"[V-5 Kei 보강 검토] HTTP {response.status_code} (시도 {attempt})")
|
||
await asyncio.sleep(RETRY_INTERVAL)
|
||
continue
|
||
full_text = await stream_sse_tokens(response)
|
||
|
||
if not full_text:
|
||
logger.warning(f"[V-5 Kei 보강 검토] 응답 없음 (시도 {attempt})")
|
||
await asyncio.sleep(RETRY_INTERVAL)
|
||
continue
|
||
|
||
result = _parse_json(full_text)
|
||
if result and "decisions" in result:
|
||
approved = sum(1 for d in result["decisions"] if d.get("action") == "approve")
|
||
total = len(result["decisions"])
|
||
logger.info(f"[V-5 Kei 보강 검토] {approved}/{total} 승인")
|
||
return result
|
||
else:
|
||
logger.warning(f"[V-5 Kei 보강 검토] JSON 파싱 실패 (시도 {attempt})")
|
||
await asyncio.sleep(RETRY_INTERVAL)
|
||
continue
|
||
|
||
except Exception as e:
|
||
logger.warning(f"[V-5 Kei 보강 검토] 실패 (시도 {attempt}): {e}")
|
||
await asyncio.sleep(RETRY_INTERVAL)
|
||
continue
|
||
|
||
|
||
async def call_kei_summarize_popup(
|
||
popup_title: str,
|
||
popup_content: str,
|
||
available_width_px: float,
|
||
available_height_px: float,
|
||
font_size: float,
|
||
) -> dict[str, Any] | None:
|
||
"""V'-2: 코드가 형태+크기를 결정하고, Kei가 텍스트만 채운다.
|
||
|
||
1. 코드: 팝업 원본에 표(|)가 있으면 table, 불릿(•)이면 bullets, 그 외 text
|
||
2. 코드: 공간 크기와 폰트를 고려하여 행/열 수 계산
|
||
3. Kei: 결정된 형태+크기에 맞게 원본 내용을 요약
|
||
|
||
Returns:
|
||
{
|
||
"format": "table" | "bullets" | "text",
|
||
"columns": [...], "data": [["셀", ...], ...], # table
|
||
"items": ["...", ...], # bullets
|
||
"summary": "...", # text
|
||
}
|
||
"""
|
||
import asyncio
|
||
|
||
header_h = font_size * 1.5 + font_size * 0.6
|
||
row_h = font_size * 1.5 + font_size * 0.6
|
||
bullet_h = font_size * 1.55
|
||
chars_per_col = int(available_width_px / (font_size * 0.6))
|
||
|
||
# 코드가 형태 판단
|
||
import re
|
||
has_table = popup_content.count("|") > 6 or "<table" in popup_content
|
||
has_bullets = popup_content.count("•") > 2
|
||
|
||
if has_table:
|
||
# 원본 표에서 열 수 추출 — <th> 태그 우선, 없으면 | 파싱
|
||
th_headers = re.findall(r'<th[^>]*>(.*?)</th>', popup_content)
|
||
# <strong> 등 태그 제거
|
||
th_headers = [re.sub(r'<[^>]+>', '', h).strip() for h in th_headers]
|
||
if th_headers:
|
||
# 첫 번째 <thead>의 열만 사용 (중복 테이블 헤더 제거)
|
||
orig_cols = th_headers[:3] if len(th_headers) > 3 else th_headers
|
||
col_count = len(orig_cols)
|
||
else:
|
||
table_lines = [l.strip() for l in popup_content.split("\n") if l.strip().startswith("|")]
|
||
if len(table_lines) >= 2:
|
||
orig_cols = [c.strip() for c in table_lines[0].split("|") if c.strip()]
|
||
col_count = len(orig_cols)
|
||
else:
|
||
orig_cols = []
|
||
col_count = 3
|
||
# 행 수: 공간에 맞게 계산 (제목행 제외, 데이터 행만)
|
||
space_rows = int((available_height_px - header_h) / row_h) if available_height_px > header_h else 1
|
||
# 원본 표 데이터 행 수
|
||
orig_data_rows = len(re.findall(r'<tr>', popup_content)) or len([l for l in popup_content.split("\n") if l.strip().startswith("|") and not l.strip().startswith("|--")]) - 1
|
||
orig_data_rows = max(1, orig_data_rows)
|
||
# 공간과 원본 중 작은 쪽, 최소 1 최대 5
|
||
max_rows = min(space_rows, orig_data_rows, 5)
|
||
max_rows = max(1, max_rows)
|
||
chars_per_col = int(available_width_px / col_count / (font_size * 0.6))
|
||
fmt = "table"
|
||
prompt_task = (
|
||
f"원본 표를 정확히 {col_count}열 × {max_rows}행으로 요약하라.\n"
|
||
f"열 이름: {orig_cols}\n"
|
||
f"data 배열에 정확히 {max_rows}개의 행을 넣어라. 1행만 넣지 마라.\n"
|
||
f"원본에서 가장 핵심적인 {max_rows}개 비교 항목(범위, S/W, 프로세스, 성과품, 활용 등)을 골라라.\n"
|
||
f"각 셀은 {chars_per_col}자 이내의 짧은 핵심 요약으로.\n"
|
||
f"JSON: {{\"columns\": [\"{orig_cols[0] if orig_cols else '열1'}\", ...], \"data\": [[\"셀\", ...], ...]}}"
|
||
)
|
||
elif has_bullets:
|
||
max_items = int(available_height_px / bullet_h)
|
||
max_items = max(1, max_items)
|
||
fmt = "bullets"
|
||
prompt_task = (
|
||
f"원본 불릿을 {max_items}개 이내로 요약하라.\n"
|
||
f"각 항목은 {chars_per_col}자 이내로.\n"
|
||
f"JSON: {{\"items\": [\"항목1\", ...]}}"
|
||
)
|
||
else:
|
||
max_lines = int(available_height_px / bullet_h)
|
||
fmt = "text"
|
||
prompt_task = (
|
||
f"원본을 {max_lines}줄 이내로 요약하라.\n"
|
||
f"JSON: {{\"summary\": \"요약 텍스트\"}}"
|
||
)
|
||
|
||
prompt = f"""당신은 슬라이드 콘텐츠 요약 전문가이다.
|
||
|
||
## 팝업 제목: {popup_title}
|
||
|
||
## 팝업 원본:
|
||
{popup_content}
|
||
|
||
## 요청
|
||
{prompt_task}
|
||
|
||
핵심만 남기되, 원본의 의미가 왜곡되지 않도록 하라. JSON만 응답하라."""
|
||
|
||
kei_url = getattr(settings, "kei_api_url", "http://localhost:8000")
|
||
attempt = 0
|
||
|
||
while attempt < 5:
|
||
attempt += 1
|
||
try:
|
||
async with httpx.AsyncClient(timeout=None) as client:
|
||
async with client.stream(
|
||
"POST",
|
||
f"{kei_url}/api/direct",
|
||
json={"message": prompt},
|
||
timeout=None,
|
||
) as response:
|
||
if response.status_code != 200:
|
||
logger.warning(f"[V'-2 Kei 요약] HTTP {response.status_code} (시도 {attempt})")
|
||
await asyncio.sleep(10)
|
||
continue
|
||
full_text = await stream_sse_tokens(response)
|
||
|
||
if not full_text:
|
||
await asyncio.sleep(10)
|
||
continue
|
||
|
||
result = _parse_json(full_text)
|
||
if result and isinstance(result, dict):
|
||
# 코드가 결정한 format을 주입 (Kei는 텍스트만 채움)
|
||
result["format"] = fmt
|
||
logger.info(f"[V'-2 Kei 요약] {popup_title}: format={fmt}")
|
||
return result
|
||
else:
|
||
logger.warning(f"[V'-2 Kei 요약] 파싱 실패 (시도 {attempt})")
|
||
await asyncio.sleep(10)
|
||
continue
|
||
|
||
except Exception as e:
|
||
logger.warning(f"[V'-2 Kei 요약] 실패 (시도 {attempt}): {e}")
|
||
await asyncio.sleep(10)
|
||
continue
|
||
|
||
return None
|
||
|
||
|
||
async def call_kei_bold_keywords(
|
||
topics: list[dict],
|
||
page_structure: dict,
|
||
) -> dict[str, list[str]]:
|
||
"""V-10: Kei가 문맥 기반으로 각 역할의 bold 키워드를 판단한다.
|
||
|
||
Returns:
|
||
{"배경": ["키워드1", ...], "본심": [...], ...}
|
||
"""
|
||
import asyncio
|
||
|
||
# 역할별 structured_text 정리
|
||
topic_map = {t.get("id"): t for t in topics}
|
||
role_texts = {}
|
||
for role, info in page_structure.items():
|
||
if not isinstance(info, dict):
|
||
continue
|
||
tids = info.get("topic_ids", [])
|
||
texts = []
|
||
for tid in tids:
|
||
topic = topic_map.get(tid, {})
|
||
st = topic.get("structured_text", "") or topic.get("source_data", "")
|
||
if st:
|
||
texts.append(f"[{topic.get('title', '')}]\n{st}")
|
||
if texts:
|
||
role_texts[role] = "\n".join(texts)
|
||
|
||
if not role_texts:
|
||
return {}
|
||
|
||
role_section = "\n\n".join(
|
||
f"## {role}\n{text}" for role, text in role_texts.items()
|
||
)
|
||
|
||
prompt = f"""당신은 슬라이드 디자인 전문가이다.
|
||
|
||
아래는 슬라이드의 각 영역별 콘텐츠이다. 각 영역에서 **문맥상 정말 강조되어야 할 키워드**를 골라라.
|
||
|
||
규칙:
|
||
- 개수를 정하지 마라. 문맥에 맞게 필요한 만큼만 골라라.
|
||
- 단순 명사 나열이 아니라, 읽는 사람이 "이것이 핵심이구나"라고 느낄 키워드여야 한다.
|
||
- 일반적인 단어(역할, 기술, 정의 등)는 강조 대상이 아니다.
|
||
- 고유명사, 핵심 개념명, 대비되는 용어 등이 강조 대상이다.
|
||
|
||
JSON으로 응답하라:
|
||
{{"배경": ["키워드1", ...], "본심": [...], "첨부": [...], "결론": [...]}}
|
||
빈 역할은 빈 리스트로.
|
||
|
||
{role_section}"""
|
||
|
||
kei_url = getattr(settings, "kei_api_url", "http://localhost:8000")
|
||
RETRY_INTERVAL = 10
|
||
attempt = 0
|
||
|
||
while attempt < 5:
|
||
attempt += 1
|
||
try:
|
||
async with httpx.AsyncClient(timeout=None) as client:
|
||
async with client.stream(
|
||
"POST",
|
||
f"{kei_url}/api/direct",
|
||
json={"message": prompt},
|
||
timeout=None,
|
||
) as response:
|
||
if response.status_code != 200:
|
||
logger.warning(f"[V-10 Kei bold] HTTP {response.status_code} (시도 {attempt})")
|
||
await asyncio.sleep(RETRY_INTERVAL)
|
||
continue
|
||
full_text = await stream_sse_tokens(response)
|
||
|
||
if not full_text:
|
||
await asyncio.sleep(RETRY_INTERVAL)
|
||
continue
|
||
|
||
result = _parse_json(full_text)
|
||
if result and isinstance(result, dict):
|
||
logger.info(f"[V-10 Kei bold] 결과: {result}")
|
||
return result
|
||
else:
|
||
logger.warning(f"[V-10 Kei bold] 파싱 실패 (시도 {attempt})")
|
||
await asyncio.sleep(RETRY_INTERVAL)
|
||
continue
|
||
|
||
except Exception as e:
|
||
logger.warning(f"[V-10 Kei bold] 실패 (시도 {attempt}): {e}")
|
||
await asyncio.sleep(RETRY_INTERVAL)
|
||
continue
|
||
|
||
logger.warning("[V-10 Kei bold] 최대 재시도 초과, 빈 결과 반환")
|
||
return {}
|
||
|
||
|
||
KEI_FIT_ESCALATION_PROMPT = """당신은 슬라이드 설계 전문가이다.
|
||
콘텐츠를 컨테이너에 배치하려 했으나, 일부 영역의 콘텐츠가 공간을 초과한다.
|
||
|
||
## 핵심 원칙
|
||
- **텍스트 원문은 절대 수정/삭제/요약하지 않는다.**
|
||
- 공간이 부족하면 **하위 불릿(상세 설명)만 팝업으로 분리.**
|
||
- **소제목(카드 제목)은 반드시 슬라이드에 유지.** 절대 팝업으로 빼지 않는다.
|
||
- 슬라이드에는 소제목 + "바로가기 →" 링크. 팝업에 하위 불릿 원문 전체.
|
||
- overflow가 없는 영역은 건드리지 않는다.
|
||
|
||
## 판단 기준
|
||
- **상단(top zone)은 핵심 내용이므로 팝업 대상에서 제외.** 상단이 넘치면 하단에서 공간을 확보.
|
||
- 하단(bottom zone)에서 중요도가 낮은 콘텐츠를 팝업으로 분리.
|
||
- 표 데이터가 큰 경우 → 팝업 분리 1순위.
|
||
- 소제목/카드 제목은 슬라이드에 남겨서 구조를 유지.
|
||
- 한 번에 1~2개 역할만 결정. 전부 다 팝업으로 빼지 않는다.
|
||
|
||
## 출력 (JSON만. 설명 없이.)
|
||
- role에는 반드시 아래 "역할 목록"에 있는 **정확한 역할명**을 사용하라.
|
||
- overflow가 발생한 역할만 포함. overflow 없는 역할은 포함하지 마라.
|
||
|
||
```json
|
||
{
|
||
"decisions": [
|
||
{
|
||
"role": "역할 목록에 있는 정확한 역할명",
|
||
"action": "popup",
|
||
"detail": "팝업으로 분리할 구체적 내용 (하위 불릿만. 소제목은 유지)",
|
||
"reason": "판단 근거 1문장"
|
||
}
|
||
]
|
||
}
|
||
```
|
||
|
||
action 종류:
|
||
- popup: 하위 불릿(상세 설명)을 팝업으로 분리. 소제목은 슬라이드에 유지.
|
||
"""
|
||
|
||
|
||
async def call_kei_fit_escalation(
|
||
fit_report: str,
|
||
topics: list[dict],
|
||
content_summary: str,
|
||
role_names: list[str] | None = None,
|
||
) -> dict[str, Any] | None:
|
||
"""Phase V: 적합성 검증 실패 시 Kei에게 판단 요청.
|
||
|
||
Kei API만 사용. Anthropic 직접 호출 절대 금지.
|
||
"""
|
||
kei_url = getattr(settings, "kei_api_url", "http://localhost:8000")
|
||
|
||
topics_desc = json.dumps(
|
||
[
|
||
{
|
||
"id": t.get("id"),
|
||
"title": t.get("title", ""),
|
||
"purpose": t.get("purpose", ""),
|
||
"source_data": t.get("source_data", "")[:200],
|
||
}
|
||
for t in topics
|
||
],
|
||
ensure_ascii=False,
|
||
indent=2,
|
||
)
|
||
|
||
# 실제 역할명 목록을 prompt에 명시 (Kei가 정확한 역할명을 사용하도록)
|
||
role_list_text = ""
|
||
if role_names:
|
||
role_list_text = f"\n## 역할 목록 (role에 반드시 아래 이름을 사용)\n" + "\n".join(f"- {r}" for r in role_names)
|
||
|
||
prompt = (
|
||
KEI_FIT_ESCALATION_PROMPT + "\n\n"
|
||
f"## 적합성 검증 결과\n{fit_report}\n\n"
|
||
f"## 꼭지 목록\n{topics_desc}"
|
||
f"{role_list_text}\n\n"
|
||
f"## 원본 콘텐츠 요약\n{content_summary[:1500]}"
|
||
)
|
||
|
||
try:
|
||
async with httpx.AsyncClient(timeout=None) as client:
|
||
async with client.stream(
|
||
"POST",
|
||
f"{kei_url}/api/direct",
|
||
json={
|
||
"message": prompt,
|
||
},
|
||
timeout=None,
|
||
) as response:
|
||
if response.status_code != 200:
|
||
logger.warning(f"Kei API (fit) HTTP {response.status_code}")
|
||
return None
|
||
full_text = await stream_sse_tokens(response)
|
||
|
||
if full_text:
|
||
result = _parse_json(full_text)
|
||
if result and "decisions" in result:
|
||
logger.info(
|
||
f"[V-4] Kei 적합성 판단: "
|
||
+ ", ".join(
|
||
f"{d['role']}→{d['action']}"
|
||
for d in result["decisions"]
|
||
)
|
||
)
|
||
return result
|
||
logger.warning("[V-4] Kei 적합성 판단 JSON 파싱 실패")
|
||
return None
|
||
|
||
logger.warning("Kei API (fit) 텍스트 추출 실패")
|
||
return None
|
||
|
||
except Exception as e:
|
||
logger.warning(f"Kei API (fit) 호출 실패: {e}")
|
||
return None
|