"""32개 프레임 preview PNG에 EasyOCR + 이미지 전처리를 돌려, 기존 texts.md에 없는 '이미지 베이크 텍스트' 델타를 추출/보강. 흐름: 1. 원본 PNG 로드 2. 두 가지 변형을 OCR: (a) 원본 그대로 (b) 2배 업스케일 + 대비 강화 (녹색/저대비 장식 텍스트 잡기용) 3. 두 결과 합치고 confidence 컷 (low=0.15, high=0.5) 4. 오인식 교정 사전 적용 (SIW→S/W, 움합의→융합의 등) 5. 기존 texts.md 토큰과 비교하여 델타 추출 6. 프레임별 통계(감지 수, 델타 수, 누락 여부) 리포트 7. --apply 시 texts.md 파일들에 델타 추가 사용: python scripts/ocr_augment_texts.py # 드라이런 (리포트만) python scripts/ocr_augment_texts.py --apply # texts.md 수정 python scripts/ocr_augment_texts.py --only 1171281172 """ from __future__ import annotations import argparse import json import re import sys from datetime import datetime from pathlib import Path sys.path.insert(0, str(Path(__file__).parent.parent)) PREVIEW_DIR = Path("data/figma_previews") INDEX_PATH = PREVIEW_DIR / "index.json" BLOCKS_DIR = Path("figma_to_html_agent/blocks") APPEND_SECTION_HEADER = "## OCR 보강 (이미지 베이크 텍스트, 자동 추출)" APPEND_SECTION_MARKER = "" # conf 기준 CONF_HIGH = 0.5 # 이 이상은 그대로 채택 CONF_LOW = 0.15 # 이 이하는 버림. 사이 구간은 교정 사전 거쳐야 채택 # 자주 틀리는 오인식 → 올바른 표현 (정확히 일치 시만 치환) OCR_CORRECTIONS: dict[str, str] = { "siw": "S/W", "sw": "S/W", "hiw": "H/W", "hw": "H/W", "움합의": "융합의", "(직관지 역할": "직관지 역할", "패텔입": "패러다임", "|말": "개발", "대발": "개발", "Civil": "Civil", "I/W": "S/W", "l/w": "S/W", } # 버리고 싶은 노이즈 패턴 (OCR이 기호/잔여물 잡은 것) NOISE_PATTERNS = [ re.compile(r"^[\W_]+$"), # 기호만 re.compile(r"^\d{1,2}$"), # 숫자 1-2자리 re.compile(r"^.$"), # 한 글자 ] def is_noise(text: str) -> bool: for p in NOISE_PATTERNS: if p.match(text): return True return False def apply_corrections(text: str) -> str: """교정 사전 적용. 대소문자 무시 완전 일치만.""" key = text.strip().lower() if key in OCR_CORRECTIONS: return OCR_CORRECTIONS[key] # 부분 치환 (문구 안에 숨은 경우) result = text for bad, good in OCR_CORRECTIONS.items(): pattern = re.compile(re.escape(bad), re.IGNORECASE) result = pattern.sub(good, result) return result def normalize_for_compare(text: str) -> str: t = text.lower() t = re.sub(r"[^\w가-힣]+", "", t) return t def load_existing_tokens(texts_md: Path) -> set[str]: if not texts_md.exists(): return set() text = texts_md.read_text(encoding="utf-8") # 기존 OCR 섹션 제외 if APPEND_SECTION_MARKER in text: idx = text.find(APPEND_SECTION_MARKER) header_idx = text.rfind(APPEND_SECTION_HEADER, 0, idx) if header_idx >= 0: text = text[:header_idx] lines = [] for ln in text.splitlines(): s = ln.strip() if s.startswith("#") or s.startswith(">"): continue lines.append(ln) body = " ".join(lines) tokens: set[str] = set() for tok in re.split(r"[\s\|\-·•/,.()\[\]:;!?#`'\"*~_+=<>&]+", body): if not tok: continue norm = normalize_for_compare(tok) if norm and len(norm) >= 2: tokens.add(norm) return tokens def preprocess_upscale(png_path: Path, scale: float = 2.0, contrast: float = 1.4): """이미지를 업스케일 + 대비 강화해서 bytes 반환.""" from PIL import Image, ImageEnhance img = Image.open(png_path).convert("RGB") w, h = img.size img = img.resize((int(w * scale), int(h * scale)), Image.LANCZOS) img = ImageEnhance.Contrast(img).enhance(contrast) import io buf = io.BytesIO() img.save(buf, format="PNG") return buf.getvalue() def run_ocr_variants(reader, png_path: Path) -> list[tuple[str, float, tuple]]: """원본 + 업스케일 두 번 OCR. (text, conf, bbox_center) 리스트.""" import numpy as np from PIL import Image collected: list[tuple[str, float, tuple]] = [] # 1) 원본 res1 = reader.readtext(str(png_path), detail=1, paragraph=False) for bbox, text, conf in res1: xs = [p[0] for p in bbox] ys = [p[1] for p in bbox] center = ((min(xs) + max(xs)) / 2, (min(ys) + max(ys)) / 2) collected.append((text, float(conf), center)) # 2) 업스케일 + 대비 강화 enhanced_bytes = preprocess_upscale(png_path) img = np.array(Image.open(__import__("io").BytesIO(enhanced_bytes)).convert("RGB")) res2 = reader.readtext(img, detail=1, paragraph=False) for bbox, text, conf in res2: xs = [p[0] for p in bbox] ys = [p[1] for p in bbox] # 원본 좌표계로 환산 (÷2) center = ((min(xs) + max(xs)) / 4, (min(ys) + max(ys)) / 4) collected.append((text, float(conf), center)) return collected def dedupe_by_position(items: list[tuple[str, float, tuple]]) -> list[tuple[str, float, tuple]]: """같은 위치(±30px)에서 중복 감지된 것들을 confidence 높은 쪽으로 축약.""" result: list[tuple[str, float, tuple]] = [] for text, conf, center in sorted(items, key=lambda r: -r[1]): dupe = False for rt, rc, rcenter in result: if abs(rcenter[0] - center[0]) < 30 and abs(rcenter[1] - center[1]) < 30: # 텍스트 정규화 같으면 중복 if normalize_for_compare(rt) == normalize_for_compare(text): dupe = True break # 같은 위치에서 더 긴 버전이 이미 있으면 중복으로 간주 if normalize_for_compare(text) in normalize_for_compare(rt): dupe = True break if not dupe: result.append((text, conf, center)) return result def extract_accepted(items: list[tuple[str, float, tuple]]) -> list[tuple[str, float]]: """confidence + 교정 적용 후 최종 채택된 (text, conf) 리스트. 규칙: - 교정 사전에 명시된 오인식(예: '패텔입'→'패러다임')은 confidence 무관 채택 - 그 외 conf < CONF_LOW는 노이즈로 버림 - CONF_LOW ~ CONF_HIGH 사이: 한글 2자 이상 또는 교정 발생한 것만 - CONF_HIGH 이상: 그대로 채택 """ accepted: list[tuple[str, float]] = [] for text, conf, _ in items: if is_noise(text): continue corrected = apply_corrections(text) was_corrected = corrected != text if is_noise(corrected): continue if was_corrected: # 교정 사전 매칭 → conf 무관 채택 (신뢰도는 0.99로 덮어씀 — 사전 매칭 확신) accepted.append((corrected, max(conf, 0.99))) continue if conf < CONF_LOW: continue if conf < CONF_HIGH: if not re.search(r"[가-힣]{2,}", corrected): continue accepted.append((corrected, conf)) return accepted def find_delta(accepted: list[tuple[str, float]], existing: set[str]) -> list[tuple[str, float]]: delta: list[tuple[str, float]] = [] seen: set[str] = set() for phrase, conf in accepted: n = normalize_for_compare(phrase) if not n or len(n) < 2: continue if n in seen: continue if n in existing: continue words = [w for w in re.split(r"[\s\|\-·•/,.()\[\]:;!?#`'\"*~_+=<>&]+", phrase) if w] word_norms = [normalize_for_compare(w) for w in words] has_new = any(wn and len(wn) >= 2 and wn not in existing for wn in word_norms) if not has_new and n not in existing: continue seen.add(n) delta.append((phrase, conf)) return delta def strip_prev_ocr_section(text: str) -> str: marker = APPEND_SECTION_MARKER idx = text.find(marker) if idx < 0: return text header_idx = text.rfind(APPEND_SECTION_HEADER, 0, idx) cut = header_idx if header_idx >= 0 else idx return text[:cut].rstrip() + "\n" def append_delta(texts_md: Path, delta: list[tuple[str, float]]) -> str: original = texts_md.read_text(encoding="utf-8") if texts_md.exists() else "" cleaned = strip_prev_ocr_section(original) if not delta: return cleaned ts = datetime.now().strftime("%Y-%m-%d") lines = [ "", APPEND_SECTION_HEADER, "", f"> EasyOCR(2x 업스케일 + 대비강화) 자동 추출 ({ts}). 기존 텍스트 레이어에 없던 단어/문구만.", APPEND_SECTION_MARKER, "", ] for phrase, conf in delta: lines.append(f"- {phrase} _(conf={conf:.2f})_") lines.append("") return cleaned.rstrip() + "\n" + "\n".join(lines) def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("--apply", action="store_true", help="texts.md에 실제 반영") ap.add_argument("--only", type=str, default="") args = ap.parse_args() idx: dict[str, dict] = json.loads(INDEX_PATH.read_text(encoding="utf-8")) print("[init] EasyOCR 로딩 (한/영, CPU)...") import easyocr reader = easyocr.Reader(["ko", "en"], gpu=False, verbose=False) print("[init] OK") ts = datetime.now().strftime("%Y%m%d_%H%M%S") out_dir = Path("data/runs") / f"{ts}_ocr_augment" out_dir.mkdir(parents=True, exist_ok=True) numbers = sorted(idx.keys()) summary_rows: list[dict] = [] detail_lines: list[str] = [] for num in numbers: entry = idx[num] fid = entry["frame_id"] if args.only and fid != args.only: continue png = PREVIEW_DIR / f"{num}.png" texts_md = BLOCKS_DIR / fid / "texts.md" if not png.exists(): continue print(f"[{num}] {fid} OCR...", end="", flush=True) raw_items = run_ocr_variants(reader, png) deduped = dedupe_by_position(raw_items) accepted = extract_accepted(deduped) existing = load_existing_tokens(texts_md) delta = find_delta(accepted, existing) # 저신뢰 detection (잠재 누락 신호): conf < LOW 인데 위치 정보가 있는 것 개수 low_conf_count = sum(1 for _, c, _ in raw_items if c < CONF_LOW) print(f" 감지(중복제거) {len(deduped)}개 채택 {len(accepted)}개 델타 {len(delta)}개 " f"저신뢰잔여 {low_conf_count}개") summary_rows.append({ "num": num, "fid": fid, "raw": len(raw_items), "dedup": len(deduped), "accepted": len(accepted), "delta": len(delta), "low_conf": low_conf_count, "delta_items": delta, "low_conf_items": [(t, c) for t, c, _ in raw_items if c < CONF_LOW], }) detail_lines.append(f"\n### {num}. frame `{fid}`") detail_lines.append(f"- OCR 감지(중복제거 후): {len(deduped)}개") detail_lines.append(f"- 기존 texts.md 토큰: {len(existing)}개") detail_lines.append(f"- 채택(교정 후): {len(accepted)}개") detail_lines.append(f"- **델타(신규 보강): {len(delta)}개**") if delta: detail_lines.append("") detail_lines.append("| 신규 문구 | conf |") detail_lines.append("|---|---|") for p, c in delta: detail_lines.append(f"| {p} | {c:.2f} |") low = summary_rows[-1]["low_conf_items"] if low: detail_lines.append("") detail_lines.append(f"
저신뢰 잔여 {len(low)}개 (잠재 누락 단서)") detail_lines.append("") for t, c in sorted(low, key=lambda x: -x[1])[:20]: detail_lines.append(f"- `{t}` (conf={c:.3f})") if len(low) > 20: detail_lines.append(f"- ... 외 {len(low)-20}개") detail_lines.append("
") if args.apply: new_text = append_delta(texts_md, delta) texts_md.parent.mkdir(parents=True, exist_ok=True) texts_md.write_text(new_text, encoding="utf-8") # ─── summary ─── frames_with_delta = [r for r in summary_rows if r["delta"] > 0] frames_no_delta = [r for r in summary_rows if r["delta"] == 0] report = [ "# OCR 보강 리포트 (EasyOCR + 전처리 + 교정)", "", f"- 드라이런: {'적용됨 (--apply)' if args.apply else '드라이런 (texts.md 미수정)'}", f"- 대상 프레임: {len(summary_rows)}개", f"- **텍스트 누락(델타 > 0) 프레임: {len(frames_with_delta)}개**", f"- 델타 없음(보강 불필요) 프레임: {len(frames_no_delta)}개", "", "## 프레임별 요약", "", "| # | frame_id | 감지 | 채택 | **델타** | 저신뢰 | 델타 미리보기 |", "|---|---|---|---|---|---|---|", ] for r in summary_rows: preview = "; ".join(p for p, _ in r["delta_items"][:4]) if len(r["delta_items"]) > 4: preview += "…" mark = "🔴" if r["delta"] > 0 else "·" report.append( f"| {r['num']} | `{r['fid']}` | {r['dedup']} | {r['accepted']} | " f"{mark} **{r['delta']}** | {r['low_conf']} | {preview} |" ) report.append("\n## 텍스트 누락 프레임 리스트 (델타 > 0)\n") if frames_with_delta: for r in frames_with_delta: items = ", ".join(p for p, _ in r["delta_items"]) report.append(f"- **#{r['num']}** `{r['fid']}` — 델타 {r['delta']}개: {items}") else: report.append("_(누락 없음)_") report.append("\n## 상세") report.extend(detail_lines) out = out_dir / "report.md" out.write_text("\n".join(report), encoding="utf-8") print(f"\n[saved] {out}") if args.apply: print("[applied] texts.md 파일들 업데이트 완료") else: print("[dryrun] --apply 를 붙이면 texts.md 에 반영됩니다") return 0 if __name__ == "__main__": raise SystemExit(main())