Files
C.E.L_Slide_test2/src/pipeline_context.py

341 lines
13 KiB
Python

"""Phase T-0: 파이프라인 누적 컨텍스트 객체.
모든 Stage가 하나의 PipelineContext를 공유하며,
각 Stage가 transform → validate → update 패턴을 따른다.
Pydantic BaseModel 채택 이유 (T-0 조사 결과):
- model_dump_json()으로 스냅샷 직렬화 한 줄
- validate_assignment=True로 타입 오류 즉시 감지
- Path, Optional, list[dict] 자동 처리
- 프로젝트가 이미 Pydantic 사용 중 (config.py, FastAPI)
"""
from __future__ import annotations
import json
import time
from pathlib import Path
from typing import Any, Optional
from pydantic import BaseModel, Field, model_validator
# ──────────────────────────────────────
# 하위 모델
# ──────────────────────────────────────
class PopupItem(BaseModel):
"""팝업/첨부 항목.
생애주기:
Stage 0 → title, content 확정
Y-14 감지 → popup_id 확정, is_component, source
Stage 2 → popup_id로 참조 (popup_file은 아직 없음)
Stage 5 저장 → popup_file 확정 (run_dir + 파일명 정책)
"""
popup_id: str = "" # 감지 시점에 확정 (예: "popup_1", "comp_DxEffect")
title: str = ""
content: str = ""
source: str | None = None
is_component: bool = False
target_role: str | None = None # Y-14에서 확정: 이 popup이 속하는 role 이름
popup_file: str | None = None # Stage 5에서 확정
class NormalizedContent(BaseModel):
"""Stage 0 출력: MDX 정규화 결과."""
clean_text: str = ""
title: str = ""
images: list[dict[str, str]] = Field(default_factory=list)
popups: list[PopupItem] = Field(default_factory=list)
tables: list[dict[str, Any]] = Field(default_factory=list)
sections: list[dict[str, Any]] = Field(default_factory=list)
class Topic(BaseModel):
"""Stage 1A + 1B 출력: 개별 꼭지 정보.
weight는 여기에 없음 — page_structure의 역할별 속성임.
"""
id: int = 0
title: str = ""
purpose: str = ""
role: str = ""
layer: str = ""
source_hint: str = ""
# Stage 1B에서 병합
relation_type: str = "" # 7개 enum: hierarchy/cause_effect/comparison/sequence/definition/inclusion/none
expression_hint: str = ""
source_data: str = ""
structured_text: str = "" # Stage 1B: 원본 85% 보존 구조화 텍스트 (조립용)
summary: str = ""
class PageStructure(BaseModel):
"""Stage 1A 출력: 역할별 비중 구조."""
roles: dict[str, dict[str, Any]] = Field(default_factory=dict)
# 예: {"본심": {"topic_ids": [1,2], "weight": 0.6}, "배경": {...}, ...}
class Analysis(BaseModel):
"""Stage 1A 출력: Kei 분석 결과 전체."""
core_message: str = ""
conclusion_text: str = "" # Phase Y: slide-base footer에 들어갈 핵심요약 원본 텍스트
title: str = ""
total_pages: int = 1
layout_template: str = "A" # Phase X-B: Kei가 선택한 유형 (A 또는 B)
image_sizes: dict[str, dict[str, Any]] = Field(default_factory=dict)
# topics와 page_structure는 PipelineContext 최상위에 위치
class TextBudget(BaseModel):
"""Stage 1.5a 출력: 텍스트 예산."""
font_size: float = 12.0
chars_per_line: int = 0
max_lines: int = 0
max_chars: int = 0
source_chars: int = 0
needs_compression: bool = False
class DesignBudget(BaseModel):
"""Stage 1.5b 출력: 디자인 요소 예산."""
available_height_px: int = 0
available_width_px: int = 0
max_circle_diameter: int = 0
max_img_width: int = 0
max_img_height: int = 0
fits: bool = True
class ContainerInfo(BaseModel):
"""Stage 1.5a/1.5b 통합: 역할별 컨테이너 정보."""
role: str = ""
zone: str = ""
topic_ids: list[int] = Field(default_factory=list)
weight: float = 0.0
height_px: int = 0
width_px: int = 0
max_height_cost: str = "medium"
text_budget: Optional[TextBudget] = None
design_budget: Optional[DesignBudget] = None
block_constraints: dict[str, Any] = Field(default_factory=dict)
class FontHierarchy(BaseModel):
"""Stage 1.5a 출력: 확정된 폰트 위계."""
key_msg: float = 14.0 # 핵심 메시지 (가장 큼)
core: float = 12.0 # 본문
bg: float = 11.0 # 배경 (10-12 범위)
sidebar: float = 10.0 # 첨부 (9-11 범위)
@model_validator(mode="after")
def check_hierarchy(self):
"""폰트 위계 유지 검증: key_msg > core >= bg > sidebar."""
if not (self.key_msg > self.core >= self.bg > self.sidebar):
raise ValueError(
f"폰트 위계 위반: key_msg({self.key_msg}) > core({self.core}) "
f">= bg({self.bg}) > sidebar({self.sidebar}) 이어야 함"
)
return self
class BlockReference(BaseModel):
"""Stage 1.7 출력: 참고 블록 정보."""
block_id: str = ""
variant: str = "default"
visual_type: str = ""
schema_info: dict[str, Any] = Field(default_factory=dict)
design_reference_html: str = ""
topic_id: int | None = None
supporting_topic_ids: list[int] = Field(default_factory=list)
is_hierarchical: bool = False
class StageError(BaseModel):
"""Stage 실행 중 발생한 에러."""
stage: str = ""
attempt: int = 0
severity: str = "RETRYABLE" # FATAL / RETRYABLE / ADJUSTABLE
errors: list[dict[str, Any]] = Field(default_factory=list)
# ──────────────────────────────────────
# 메인 컨텍스트
# ──────────────────────────────────────
class PipelineContext(BaseModel):
"""파이프라인 전체를 관통하는 누적 컨텍스트.
각 Stage가 이 객체를 받아서 필요한 필드를 읽고,
결과를 model_copy(update=...)로 병합한다.
"""
model_config = {"validate_assignment": True, "arbitrary_types_allowed": True}
# ── 메타 ──
run_id: str = ""
run_dir: Optional[str] = None # Path를 str로 저장 (JSON 직렬화)
raw_content: str = "" # 원본 MDX (변경 불가 참조용)
base_path: str = "" # 이미지 기준 경로
# ── Stage 0 ──
normalized: NormalizedContent = Field(default_factory=NormalizedContent)
# ── Stage 1A ──
analysis: Analysis = Field(default_factory=Analysis)
topics: list[Topic] = Field(default_factory=list)
page_structure: PageStructure = Field(default_factory=PageStructure)
# ── Phase Y: MDX 원본 섹션 (## 파싱 결과) ──
mdx_sections: list[dict[str, Any]] = Field(default_factory=list) # [{title, content, level, is_intro}]
# ── Stage 1.5a ──
font_hierarchy: FontHierarchy = Field(default_factory=FontHierarchy)
container_ratio: tuple[int, int] = (0, 0) # Stage 1.5a에서 설정 (body_pct, sidebar_pct)
containers: dict[str, ContainerInfo] = Field(default_factory=dict)
# ── Stage 1.7 ──
references: dict[str, list[BlockReference]] = Field(default_factory=dict)
preset_name: str = ""
preset: dict[str, Any] = Field(default_factory=dict)
# ── Stage 1.8 ──
fit_result: dict[str, Any] = Field(default_factory=dict)
font_scale: float = 1.0 # Phase Y: fit 루프에서 확정된 font 축소 비율
enhancement_result: dict[str, Any] = Field(default_factory=dict)
sub_layouts: dict[str, Any] = Field(default_factory=dict) # role → ContainerLayout 직렬화
# ── Stage 2 ──
generated_html: dict[str, str] = Field(default_factory=dict) # body_html, sidebar_html, footer_html
# ── Stage 3 ──
rendered_html: str = ""
# ── Stage 4 ──
measurement: dict[str, Any] = Field(default_factory=dict)
quality_score: int = 0
screenshot_b64: str = ""
# ── 에러/경고 추적 ──
errors: list[StageError] = Field(default_factory=list)
warnings: list[str] = Field(default_factory=list)
retry_feedback: str = "" # 재시도 시 Self-Refine 피드백
# ── 이미지 ──
slide_images: list[dict[str, Any]] = Field(default_factory=list)
def get_run_dir(self) -> Path:
"""run_dir를 Path 객체로 반환."""
if self.run_dir:
return Path(self.run_dir)
p = Path("data/runs") / self.run_id
return p
def save_snapshot(self, stage_name: str) -> None:
"""디버깅용 스냅샷 저장. JSON + HTML 시각화."""
run_dir = self.get_run_dir()
run_dir.mkdir(parents=True, exist_ok=True)
# JSON
path = run_dir / f"{stage_name}_context.json"
path.write_text(
self.model_dump_json(indent=2, exclude={"screenshot_b64", "rendered_html"}),
encoding="utf-8",
)
# HTML 시각화
try:
from src.step_visualizer import generate_step_html
steps_dir = run_dir / "steps"
steps_dir.mkdir(exist_ok=True)
generate_step_html(stage_name, self, steps_dir)
except Exception as e:
pass # 시각화 실패해도 파이프라인은 계속
def log_error(self, stage: str, errors: list[dict], attempt: int = 0,
severity: str = "RETRYABLE") -> None:
"""에러를 컨텍스트에 기록."""
self.errors.append(StageError(
stage=stage,
attempt=attempt,
severity=severity,
errors=errors,
))
def get_role_content(self, role: str) -> str:
"""역할(본심/배경/첨부/결론)에 해당하는 원본 텍스트를 반환.
page_structure에서 topic_ids를 찾고,
해당 topics의 source_data를 합쳐서 반환.
source_data가 없으면 normalized.clean_text에서 source_hint로 매칭.
"""
role_info = self.page_structure.roles.get(role, {})
topic_ids = role_info.get("topic_ids", [])
texts = []
for t in self.topics:
if t.id in topic_ids:
if t.source_data:
texts.append(t.source_data)
elif t.source_hint and self.normalized.sections:
# source_hint로 섹션 매칭
for sec in self.normalized.sections:
if t.source_hint.lower() in sec.get("title", "").lower():
texts.append(sec.get("content", ""))
break
return "\n\n".join(texts) if texts else ""
# ──────────────────────────────────────
# Stage 실행 유틸리티
# ──────────────────────────────────────
class StageFailure(Exception):
"""Stage 실행 실패 (재시도 소진)."""
def __init__(self, stage_name: str, errors: list[dict]):
self.stage_name = stage_name
self.errors = errors
super().__init__(f"Stage {stage_name} 실패: {errors}")
def build_retry_feedback(stage_name: str, errors: list[dict],
original_text: str = "") -> str:
"""Self-Refine 패턴: localization + evidence + instruction.
NeurIPS 2023 Self-Refine + VASCAR Scorer/Suggester 분리 패턴.
"""
lines = [
f"## 이전 {stage_name} 결과의 검증 실패. 다음 문제를 수정하라.\n"
]
for i, err in enumerate(errors, 1):
lines.append(f"### 문제 {i}: {err.get('field', err.get('layer', ''))}")
if err.get("localization"):
lines.append(f"- 위치: {err['localization']}")
if err.get("current_value"):
lines.append(f"- 현재 값: {err['current_value']}")
if err.get("evidence"):
lines.append(f"- 원본 근거: \"{err['evidence']}\"")
if err.get("instruction"):
lines.append(f"- 수정 지시: {err['instruction']}")
lines.append("")
if original_text:
excerpt = original_text[:500]
lines.append(f"## 원본 텍스트 (참고)\n{excerpt}\n")
lines.append("위 문제들을 해결한 결과를 다시 생성하라. 원본에 없는 해석을 추가하지 마라.")
return "\n".join(lines)
def create_context(content: str, base_path: str = "") -> PipelineContext:
"""파이프라인 시작 시 초기 컨텍스트 생성."""
run_id = time.strftime("%Y%m%d_%H%M%S")
run_dir = str(Path("data/runs") / run_id)
return PipelineContext(
run_id=run_id,
run_dir=run_dir,
raw_content=content,
base_path=base_path,
)