Files
C.E.L_Slide_test2/scripts/ocr_augment_texts.py
kyeongmin 85c680f02a docs + V4 catalog + samples + Phase Q legacy 보존
전체 26 files (20 추가 + 6 수정), 10507 insertions.

Phase Z 문서 :
- docs/architecture/PHASE-Z-CHANGE-LOG.md (신설) — axis-by-axis 의사결정 history
  (newest-on-top). Step 7-A 부터 6 entry 박힘 + 2026-05-08 / 2026-05-08 #2
  (compat 매트릭스 폐기 / 6-B 폐기 / F14 표현 정정 / label gate policy 분리).
- docs/architecture/PHASE-Z-PIPELINE-OVERVIEW.md (수정) — Step 5/6/9 Gap note
  append (구조 무변, append-only). 6-B 폐기 사실 + Refinement F.
- docs/architecture/PHASE-Z-PIPELINE-STATUS-BOARD.md (수정) — snapshot date
  2026-05-08 갱신. §3 핵심 missing item 5 (Step 5/6/9 boundary axis breakdown
  + 폐기 기록). §6 한 줄 갱신 — 다음 axis 후보 A~F.

Project root docs :
- PLAN.md / PROGRESS.md / README.md (수정) — 토큰 체계 / 폴더 구조 / 설계 문서 /
  역할 분리 반영.
- IMPROVEMENT-REDESIGN.md (신설) — Phase Z 설계 핵심 문서.
- PROCESS_OVERVIEW.html (신설) — 파이프라인 개요 시각.
- docs/tasks/* (신설) — Phase Z task 문서.

V4 catalog (Phase Z runtime 필수 의존성) :
- tests/matching/v4_full32_result.yaml (신설, 4888 줄) — V4 매칭 결과 32 frame
  × 10 MDX section. lookup_v4_match() / lookup_v4_candidates() 가 본 파일 read.
  Phase Z runtime 이 *없으면 즉시 abort* — clone 후 즉시 동작 가능 보장.

Samples :
- samples/mdx_batch/04.mdx (신설) — MDX04 기본 sample.
- samples/mdx/04. DX 지연 요인.mdx (신설) — MDX04 원본.

Phase Q legacy 보존 (별 axis "Phase Q audit & salvage" 영역) :
- src/block_matcher_tfidf.py / catalog_blocks.py / frame_extractor.py /
  pipeline_v2.py — Phase Q (옛 파이프라인) src 신규 untracked 파일들.
  Phase Z runtime 와 의존성 0. Phase Q audit axis 에서 검토 예정.
- scripts/eval_block_matcher.py / fetch_all_frame_screenshots.py /
  match_17_units_my_matcher.py / match_mdx_strict.py / match_mdx_to_frames_tfidf.py /
  ocr_augment_texts.py / run_pipeline_v2.py / previews/ — Phase Q 작업 시
  사용한 옛 script. 같이 보존.
- run_mdx03_pipeline.py (수정) — Phase Q 진입점 (no flag) + Phase Z 진입점
  (--phase-z2 flag) 동시 wrapper. Phase Z 만 사용 시 `python -m
  src.phase_z2_pipeline samples/mdx_batch/03.mdx <run_id>` 직접 호출.

비-scope :
- tests/matching/ (v4_full32_result.yaml 외 ~63MB) — V4 진화 history /
  reports / DECK / ATTACH. Phase Q audit axis 에서 검토.
- tests/pipeline/ (~15MB) — pipeline data. Phase Q audit 영역.
- templates/catalog/blocks.yaml — 옛 block catalog. Phase Q audit.
- templates/phase_z2/frames/ — 옛 frame partial 위치. Phase Q audit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 09:47:58 +09:00

399 lines
14 KiB
Python

"""32개 프레임 preview PNG에 EasyOCR + 이미지 전처리를 돌려,
기존 texts.md에 없는 '이미지 베이크 텍스트' 델타를 추출/보강.
흐름:
1. 원본 PNG 로드
2. 두 가지 변형을 OCR:
(a) 원본 그대로
(b) 2배 업스케일 + 대비 강화 (녹색/저대비 장식 텍스트 잡기용)
3. 두 결과 합치고 confidence 컷 (low=0.15, high=0.5)
4. 오인식 교정 사전 적용 (SIW→S/W, 움합의→융합의 등)
5. 기존 texts.md 토큰과 비교하여 델타 추출
6. 프레임별 통계(감지 수, 델타 수, 누락 여부) 리포트
7. --apply 시 texts.md 파일들에 델타 추가
사용:
python scripts/ocr_augment_texts.py # 드라이런 (리포트만)
python scripts/ocr_augment_texts.py --apply # texts.md 수정
python scripts/ocr_augment_texts.py --only 1171281172
"""
from __future__ import annotations
import argparse
import json
import re
import sys
from datetime import datetime
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
PREVIEW_DIR = Path("data/figma_previews")
INDEX_PATH = PREVIEW_DIR / "index.json"
BLOCKS_DIR = Path("figma_to_html_agent/blocks")
APPEND_SECTION_HEADER = "## OCR 보강 (이미지 베이크 텍스트, 자동 추출)"
APPEND_SECTION_MARKER = "<!-- OCR_AUGMENT_V1 -->"
# conf 기준
CONF_HIGH = 0.5 # 이 이상은 그대로 채택
CONF_LOW = 0.15 # 이 이하는 버림. 사이 구간은 교정 사전 거쳐야 채택
# 자주 틀리는 오인식 → 올바른 표현 (정확히 일치 시만 치환)
OCR_CORRECTIONS: dict[str, str] = {
"siw": "S/W",
"sw": "S/W",
"hiw": "H/W",
"hw": "H/W",
"움합의": "융합의",
"(직관지 역할": "직관지 역할",
"패텔입": "패러다임",
"|말": "개발",
"대발": "개발",
"Civil": "Civil",
"I/W": "S/W",
"l/w": "S/W",
}
# 버리고 싶은 노이즈 패턴 (OCR이 기호/잔여물 잡은 것)
NOISE_PATTERNS = [
re.compile(r"^[\W_]+$"), # 기호만
re.compile(r"^\d{1,2}$"), # 숫자 1-2자리
re.compile(r"^.$"), # 한 글자
]
def is_noise(text: str) -> bool:
for p in NOISE_PATTERNS:
if p.match(text):
return True
return False
def apply_corrections(text: str) -> str:
"""교정 사전 적용. 대소문자 무시 완전 일치만."""
key = text.strip().lower()
if key in OCR_CORRECTIONS:
return OCR_CORRECTIONS[key]
# 부분 치환 (문구 안에 숨은 경우)
result = text
for bad, good in OCR_CORRECTIONS.items():
pattern = re.compile(re.escape(bad), re.IGNORECASE)
result = pattern.sub(good, result)
return result
def normalize_for_compare(text: str) -> str:
t = text.lower()
t = re.sub(r"[^\w가-힣]+", "", t)
return t
def load_existing_tokens(texts_md: Path) -> set[str]:
if not texts_md.exists():
return set()
text = texts_md.read_text(encoding="utf-8")
# 기존 OCR 섹션 제외
if APPEND_SECTION_MARKER in text:
idx = text.find(APPEND_SECTION_MARKER)
header_idx = text.rfind(APPEND_SECTION_HEADER, 0, idx)
if header_idx >= 0:
text = text[:header_idx]
lines = []
for ln in text.splitlines():
s = ln.strip()
if s.startswith("#") or s.startswith(">"):
continue
lines.append(ln)
body = " ".join(lines)
tokens: set[str] = set()
for tok in re.split(r"[\s\|\-·•/,.()\[\]:;!?#`'\"*~_+=<>&]+", body):
if not tok:
continue
norm = normalize_for_compare(tok)
if norm and len(norm) >= 2:
tokens.add(norm)
return tokens
def preprocess_upscale(png_path: Path, scale: float = 2.0, contrast: float = 1.4):
"""이미지를 업스케일 + 대비 강화해서 bytes 반환."""
from PIL import Image, ImageEnhance
img = Image.open(png_path).convert("RGB")
w, h = img.size
img = img.resize((int(w * scale), int(h * scale)), Image.LANCZOS)
img = ImageEnhance.Contrast(img).enhance(contrast)
import io
buf = io.BytesIO()
img.save(buf, format="PNG")
return buf.getvalue()
def run_ocr_variants(reader, png_path: Path) -> list[tuple[str, float, tuple]]:
"""원본 + 업스케일 두 번 OCR. (text, conf, bbox_center) 리스트."""
import numpy as np
from PIL import Image
collected: list[tuple[str, float, tuple]] = []
# 1) 원본
res1 = reader.readtext(str(png_path), detail=1, paragraph=False)
for bbox, text, conf in res1:
xs = [p[0] for p in bbox]
ys = [p[1] for p in bbox]
center = ((min(xs) + max(xs)) / 2, (min(ys) + max(ys)) / 2)
collected.append((text, float(conf), center))
# 2) 업스케일 + 대비 강화
enhanced_bytes = preprocess_upscale(png_path)
img = np.array(Image.open(__import__("io").BytesIO(enhanced_bytes)).convert("RGB"))
res2 = reader.readtext(img, detail=1, paragraph=False)
for bbox, text, conf in res2:
xs = [p[0] for p in bbox]
ys = [p[1] for p in bbox]
# 원본 좌표계로 환산 (÷2)
center = ((min(xs) + max(xs)) / 4, (min(ys) + max(ys)) / 4)
collected.append((text, float(conf), center))
return collected
def dedupe_by_position(items: list[tuple[str, float, tuple]]) -> list[tuple[str, float, tuple]]:
"""같은 위치(±30px)에서 중복 감지된 것들을 confidence 높은 쪽으로 축약."""
result: list[tuple[str, float, tuple]] = []
for text, conf, center in sorted(items, key=lambda r: -r[1]):
dupe = False
for rt, rc, rcenter in result:
if abs(rcenter[0] - center[0]) < 30 and abs(rcenter[1] - center[1]) < 30:
# 텍스트 정규화 같으면 중복
if normalize_for_compare(rt) == normalize_for_compare(text):
dupe = True
break
# 같은 위치에서 더 긴 버전이 이미 있으면 중복으로 간주
if normalize_for_compare(text) in normalize_for_compare(rt):
dupe = True
break
if not dupe:
result.append((text, conf, center))
return result
def extract_accepted(items: list[tuple[str, float, tuple]]) -> list[tuple[str, float]]:
"""confidence + 교정 적용 후 최종 채택된 (text, conf) 리스트.
규칙:
- 교정 사전에 명시된 오인식(예: '패텔입''패러다임')은 confidence 무관 채택
- 그 외 conf < CONF_LOW는 노이즈로 버림
- CONF_LOW ~ CONF_HIGH 사이: 한글 2자 이상 또는 교정 발생한 것만
- CONF_HIGH 이상: 그대로 채택
"""
accepted: list[tuple[str, float]] = []
for text, conf, _ in items:
if is_noise(text):
continue
corrected = apply_corrections(text)
was_corrected = corrected != text
if is_noise(corrected):
continue
if was_corrected:
# 교정 사전 매칭 → conf 무관 채택 (신뢰도는 0.99로 덮어씀 — 사전 매칭 확신)
accepted.append((corrected, max(conf, 0.99)))
continue
if conf < CONF_LOW:
continue
if conf < CONF_HIGH:
if not re.search(r"[가-힣]{2,}", corrected):
continue
accepted.append((corrected, conf))
return accepted
def find_delta(accepted: list[tuple[str, float]], existing: set[str]) -> list[tuple[str, float]]:
delta: list[tuple[str, float]] = []
seen: set[str] = set()
for phrase, conf in accepted:
n = normalize_for_compare(phrase)
if not n or len(n) < 2:
continue
if n in seen:
continue
if n in existing:
continue
words = [w for w in re.split(r"[\s\|\-·•/,.()\[\]:;!?#`'\"*~_+=<>&]+", phrase) if w]
word_norms = [normalize_for_compare(w) for w in words]
has_new = any(wn and len(wn) >= 2 and wn not in existing for wn in word_norms)
if not has_new and n not in existing:
continue
seen.add(n)
delta.append((phrase, conf))
return delta
def strip_prev_ocr_section(text: str) -> str:
marker = APPEND_SECTION_MARKER
idx = text.find(marker)
if idx < 0:
return text
header_idx = text.rfind(APPEND_SECTION_HEADER, 0, idx)
cut = header_idx if header_idx >= 0 else idx
return text[:cut].rstrip() + "\n"
def append_delta(texts_md: Path, delta: list[tuple[str, float]]) -> str:
original = texts_md.read_text(encoding="utf-8") if texts_md.exists() else ""
cleaned = strip_prev_ocr_section(original)
if not delta:
return cleaned
ts = datetime.now().strftime("%Y-%m-%d")
lines = [
"",
APPEND_SECTION_HEADER,
"",
f"> EasyOCR(2x 업스케일 + 대비강화) 자동 추출 ({ts}). 기존 텍스트 레이어에 없던 단어/문구만.",
APPEND_SECTION_MARKER,
"",
]
for phrase, conf in delta:
lines.append(f"- {phrase} _(conf={conf:.2f})_")
lines.append("")
return cleaned.rstrip() + "\n" + "\n".join(lines)
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--apply", action="store_true", help="texts.md에 실제 반영")
ap.add_argument("--only", type=str, default="")
args = ap.parse_args()
idx: dict[str, dict] = json.loads(INDEX_PATH.read_text(encoding="utf-8"))
print("[init] EasyOCR 로딩 (한/영, CPU)...")
import easyocr
reader = easyocr.Reader(["ko", "en"], gpu=False, verbose=False)
print("[init] OK")
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
out_dir = Path("data/runs") / f"{ts}_ocr_augment"
out_dir.mkdir(parents=True, exist_ok=True)
numbers = sorted(idx.keys())
summary_rows: list[dict] = []
detail_lines: list[str] = []
for num in numbers:
entry = idx[num]
fid = entry["frame_id"]
if args.only and fid != args.only:
continue
png = PREVIEW_DIR / f"{num}.png"
texts_md = BLOCKS_DIR / fid / "texts.md"
if not png.exists():
continue
print(f"[{num}] {fid} OCR...", end="", flush=True)
raw_items = run_ocr_variants(reader, png)
deduped = dedupe_by_position(raw_items)
accepted = extract_accepted(deduped)
existing = load_existing_tokens(texts_md)
delta = find_delta(accepted, existing)
# 저신뢰 detection (잠재 누락 신호): conf < LOW 인데 위치 정보가 있는 것 개수
low_conf_count = sum(1 for _, c, _ in raw_items if c < CONF_LOW)
print(f" 감지(중복제거) {len(deduped)}개 채택 {len(accepted)}개 델타 {len(delta)}"
f"저신뢰잔여 {low_conf_count}")
summary_rows.append({
"num": num,
"fid": fid,
"raw": len(raw_items),
"dedup": len(deduped),
"accepted": len(accepted),
"delta": len(delta),
"low_conf": low_conf_count,
"delta_items": delta,
"low_conf_items": [(t, c) for t, c, _ in raw_items if c < CONF_LOW],
})
detail_lines.append(f"\n### {num}. frame `{fid}`")
detail_lines.append(f"- OCR 감지(중복제거 후): {len(deduped)}")
detail_lines.append(f"- 기존 texts.md 토큰: {len(existing)}")
detail_lines.append(f"- 채택(교정 후): {len(accepted)}")
detail_lines.append(f"- **델타(신규 보강): {len(delta)}개**")
if delta:
detail_lines.append("")
detail_lines.append("| 신규 문구 | conf |")
detail_lines.append("|---|---|")
for p, c in delta:
detail_lines.append(f"| {p} | {c:.2f} |")
low = summary_rows[-1]["low_conf_items"]
if low:
detail_lines.append("")
detail_lines.append(f"<details><summary>저신뢰 잔여 {len(low)}개 (잠재 누락 단서)</summary>")
detail_lines.append("")
for t, c in sorted(low, key=lambda x: -x[1])[:20]:
detail_lines.append(f"- `{t}` (conf={c:.3f})")
if len(low) > 20:
detail_lines.append(f"- ... 외 {len(low)-20}")
detail_lines.append("</details>")
if args.apply:
new_text = append_delta(texts_md, delta)
texts_md.parent.mkdir(parents=True, exist_ok=True)
texts_md.write_text(new_text, encoding="utf-8")
# ─── summary ───
frames_with_delta = [r for r in summary_rows if r["delta"] > 0]
frames_no_delta = [r for r in summary_rows if r["delta"] == 0]
report = [
"# OCR 보강 리포트 (EasyOCR + 전처리 + 교정)",
"",
f"- 드라이런: {'적용됨 (--apply)' if args.apply else '드라이런 (texts.md 미수정)'}",
f"- 대상 프레임: {len(summary_rows)}",
f"- **텍스트 누락(델타 > 0) 프레임: {len(frames_with_delta)}개**",
f"- 델타 없음(보강 불필요) 프레임: {len(frames_no_delta)}",
"",
"## 프레임별 요약",
"",
"| # | frame_id | 감지 | 채택 | **델타** | 저신뢰 | 델타 미리보기 |",
"|---|---|---|---|---|---|---|",
]
for r in summary_rows:
preview = "; ".join(p for p, _ in r["delta_items"][:4])
if len(r["delta_items"]) > 4:
preview += ""
mark = "🔴" if r["delta"] > 0 else "·"
report.append(
f"| {r['num']} | `{r['fid']}` | {r['dedup']} | {r['accepted']} | "
f"{mark} **{r['delta']}** | {r['low_conf']} | {preview} |"
)
report.append("\n## 텍스트 누락 프레임 리스트 (델타 > 0)\n")
if frames_with_delta:
for r in frames_with_delta:
items = ", ".join(p for p, _ in r["delta_items"])
report.append(f"- **#{r['num']}** `{r['fid']}` — 델타 {r['delta']}개: {items}")
else:
report.append("_(누락 없음)_")
report.append("\n## 상세")
report.extend(detail_lines)
out = out_dir / "report.md"
out.write_text("\n".join(report), encoding="utf-8")
print(f"\n[saved] {out}")
if args.apply:
print("[applied] texts.md 파일들 업데이트 완료")
else:
print("[dryrun] --apply 를 붙이면 texts.md 에 반영됩니다")
return 0
if __name__ == "__main__":
raise SystemExit(main())