# -*- coding: utf-8 -*-
"""
extract_1_v2.py

Module responsible for extracting text (Markdown) and images (PNG) from PDFs.
- Preserves the original folder structure
- Links extracted images with their captions (e.g. <그림 1>)
- Includes header/footer exclusion logic
- Optional OCR support (requires a Tesseract installation)
- Records JSON metadata (image path, caption, etc.)

NOTE: this file was reconstructed from a whitespace-collapsed patch of
converters/pipeline/step2_extract.py; indentation was re-derived from syntax.
"""

import fitz  # PyMuPDF
import os
import re
import json
import numpy as np
from bisect import bisect_right
from pathlib import Path
from datetime import datetime
from PIL import Image
import io

# ===== OCR configuration (optional) =====
try:
    import pytesseract
    import shutil
except ImportError:
    TESSERACT_AVAILABLE = False
    print("[INFO] pytesseract 미설치. 이미지 텍스트 분석 기능이 제한됩니다.")
else:
    tesseract_path = shutil.which("tesseract")
    if tesseract_path:
        pytesseract.pytesseract.tesseract_cmd = tesseract_path
    # OCR is only usable when the binary itself is on PATH; having the Python
    # wrapper installed is not enough.
    TESSERACT_AVAILABLE = tesseract_path is not None

# ===== Settings and constants =====
CAPTION_PATTERN = re.compile(
    r'^\s*(?:[<\[\(\{]\s*)?(그림|figure|fig)\s*\.?\s*(?:[<\[\(\{]\s*)?0*\d+(?:\s*[-~]\s*\d+)?',
    re.IGNORECASE
)


# ===== Core logic: image extraction and caption matching =====

def _is_text_between(r1, r2, text_blocks):
    """Return True when a substantial text block lies between two rects.

    A block counts when it has at least 20 non-blank characters, intersects
    the union of ``r1`` and ``r2`` and touches neither of them. Rects that
    already intersect each other never have text "between" them.
    """
    if r1.intersects(r2):
        return False
    union = r1 | r2
    for b in text_blocks:
        b_rect = fitz.Rect(b[:4])
        if len(b[4].strip()) < 20:
            continue
        if not b_rect.intersects(union):
            continue
        if b_rect.intersects(r1) or b_rect.intersects(r2):
            continue
        return True
    return False


def _cluster_distance(d_rect, r):
    """Return the clustering distance between a drawing rect and a cluster rect.

    0 when they intersect; otherwise the larger of the horizontal/vertical
    gaps plus 0.1 (so a touching-but-disjoint rect never ties with a real
    intersection), or infinity when either gap is 150 or more.
    """
    if d_rect.intersects(r):
        return 0

    x_dist = 0
    if d_rect.x1 < r.x0:
        x_dist = r.x0 - d_rect.x1
    elif d_rect.x0 > r.x1:
        x_dist = d_rect.x0 - r.x1

    y_dist = 0
    if d_rect.y1 < r.y0:
        y_dist = r.y0 - d_rect.y1
    elif d_rect.y0 > r.y1:
        y_dist = d_rect.y0 - r.y1

    if x_dist < 150 and y_dist < 150:
        return max(x_dist, y_dist) + 0.1
    return float('inf')


def get_figure_rects(page):
    """
    Identify figure regions from '<그림 N>'-style captions plus nearby vector
    drawings and embedded images.

    Args:
        page: PyMuPDF page object.

    Returns:
        A list of dicts with the keys 'rect' (fitz.Rect of the figure),
        'caption_index' (index of the caption in page.get_text("blocks")),
        'caption_rect' (fitz.Rect of the caption) and 'caption_text'
        (stripped caption text). Empty when the page has no caption.
    """
    drawings = page.get_drawings()

    blocks = page.get_text("blocks")
    captions = []

    for i, b in enumerate(blocks):
        text = b[4]
        if CAPTION_PATTERN.search(text):
            captions.append({'rect': fitz.Rect(b[:4]), 'index': i, 'text': text, 'drawings': []})

    if not captions:
        return []

    # Drop tall hairlines (column rules) and near page-wide drawings (frames,
    # header/footer rules); they are not figure content.
    filtered_drawings_rects = []
    for d in drawings:
        r = d["rect"]
        if r.height > page.rect.height / 3 and r.width < 5:
            continue
        if r.width > page.rect.width * 0.9:
            continue
        filtered_drawings_rects.append(r)

    # Embedded raster images covering at least 1% of the page also take part.
    page_area = page.rect.get_area()
    img_rects = []
    for b in page.get_text("dict")["blocks"]:
        if b.get("type") == 1:
            ir = fitz.Rect(b["bbox"])
            if ir.get_area() < page_area * 0.01:
                continue
            img_rects.append(ir)

    remaining_drawings = filtered_drawings_rects + img_rects
    # Each cluster starts with its caption rect; attached content follows it.
    caption_clusters = {cap['index']: [cap['rect']] for cap in captions}

    # Grow clusters until a full pass attaches nothing new: a drawing joins the
    # nearest cluster member that is not separated from it by body text.
    changed = True
    while changed:
        changed = False
        to_remove = []

        for d_rect in remaining_drawings:
            best_cluster_key = None
            min_dist = float('inf')

            for cap_index, cluster_rects in caption_clusters.items():
                for r in cluster_rects:
                    dist = _cluster_distance(d_rect, r)
                    if dist < min_dist and not _is_text_between(r, d_rect, blocks):
                        min_dist = dist
                        best_cluster_key = cap_index

                if min_dist == 0:
                    # Nothing can beat an intersection.
                    break

            if best_cluster_key is not None and min_dist < 150:
                caption_clusters[best_cluster_key].append(d_rect)
                to_remove.append(d_rect)
                changed = True

        for r in to_remove:
            remaining_drawings.remove(r)

    figure_regions = []

    for cap in captions:
        content_rects = caption_clusters[cap['index']][1:]
        if not content_rects:
            continue

        # Copy the first rect so the padding below never mutates the rect
        # object that came from the page's drawing list.
        union_rect = fitz.Rect(content_rects[0])
        for r in content_rects[1:]:
            union_rect = union_rect | r

        # 5-unit padding, clamped to the page size.
        union_rect.x0 = max(0, union_rect.x0 - 5)
        union_rect.x1 = min(page.rect.width, union_rect.x1 + 5)
        union_rect.y0 = max(0, union_rect.y0 - 5)
        union_rect.y1 = min(page.rect.height, union_rect.y1 + 5)

        cap_rect = cap['rect']

        # Trim the region so it does not overlap its own caption.
        if cap_rect.y0 + cap_rect.height / 2 < union_rect.y0 + union_rect.height / 2:
            # Caption centre is above the figure centre.
            if union_rect.y0 < cap_rect.y1:
                union_rect.y0 = cap_rect.y1 + 2
        else:
            if union_rect.y1 > cap_rect.y0:
                union_rect.y1 = cap_rect.y0 - 2

        if union_rect.get_area() < page_area * 0.01:
            continue

        # Long thin strips are separator lines rather than figures.
        if union_rect.height < 20 and union_rect.width > page.rect.width * 0.6:
            continue
        if union_rect.width < 20 and union_rect.height > page.rect.height * 0.6:
            continue

        # NOTE(review): the original counted text blocks inside the region and
        # skipped it when the count was < 0, which can never happen. That dead
        # check was removed; reinstate it with a real threshold if text-heavy
        # regions were meant to be rejected.

        figure_regions.append({
            'rect': union_rect,
            'caption_index': cap['index'],
            'caption_rect': cap['rect'],
            'caption_text': cap['text'].strip()  # keep the original caption text
        })

    return figure_regions


def pixmap_metrics(pix):
    """Compute simple content metrics for a rendered pixmap.

    Args:
        pix: PyMuPDF Pixmap; the buffer is read as 3 channels, or 4 when
            ``pix.alpha`` is set (alpha is ignored).

    Returns:
        (nonwhite_ratio, edge_ratio, var): share of pixels with luma <= 245,
        share of pixels whose summed horizontal+vertical luma difference
        exceeds 40, and the variance of the 8-bit luma.
    """
    arr = np.frombuffer(pix.samples, dtype=np.uint8)
    c = 4 if pix.alpha else 3
    arr = arr.reshape(pix.height, pix.width, c)[:, :, :3]
    gray = (0.299 * arr[:, :, 0] + 0.587 * arr[:, :, 1] + 0.114 * arr[:, :, 2]).astype(np.uint8)
    white = gray > 245
    nonwhite_ratio = float(1.0 - white.mean())
    # int16 so that differences of uint8 values cannot wrap around.
    gx = np.abs(np.diff(gray.astype(np.int16), axis=1))
    gy = np.abs(np.diff(gray.astype(np.int16), axis=0))
    edge = (gx[:-1, :] + gy[:, :-1]) > 40
    edge_ratio = float(edge.mean())
    var = float(gray.var())
    return nonwhite_ratio, edge_ratio, var


def keep_figure(pix):
    """Decide whether a rendered region has enough content to be kept.

    Returns:
        (keep, nonwhite_ratio, edge_ratio, var). ``keep`` is False for almost
        blank renders (nonwhite < 0.4%), and for faint, flat ones
        (nonwhite < 1.2%, edges < 0.4%, variance < 20).
    """
    nonwhite_ratio, edge_ratio, var = pixmap_metrics(pix)
    if nonwhite_ratio < 0.004:
        return False, nonwhite_ratio, edge_ratio, var
    if nonwhite_ratio < 0.012 and edge_ratio < 0.004 and var < 20:
        return False, nonwhite_ratio, edge_ratio, var
    return True, nonwhite_ratio, edge_ratio, var


# ===== Additional image filtering (v2.1) =====

def pix_to_pil(pix):
    """Convert a PyMuPDF Pixmap into a PIL Image.

    The mode follows ``pix.alpha`` (RGBA when set, RGB otherwise) so that the
    buffer length matches; the previous hard-coded "RGB" raised for pixmaps
    carrying an alpha channel.
    """
    mode = "RGBA" if pix.alpha else "RGB"
    return Image.frombytes(mode, (pix.width, pix.height), pix.samples)


def has_cut_text_at_boundary(pix, margin=5):
    """
    Check whether OCR finds text touching the image border, which suggests
    the crop sliced through a text box.

    Args:
        pix: PyMuPDF Pixmap
        margin: distance from the border, in pixels, treated as "touching"
            (default 5)

    Returns:
        bool: True when a word of two or more characters touches a border.
        False when OCR is unavailable or fails (best effort, conservative).
    """
    if not TESSERACT_AVAILABLE:
        return False  # no OCR: let the image pass

    try:
        img = pix_to_pil(pix)
        width, height = img.size

        # Word-level boxes from OCR.
        data = pytesseract.image_to_data(img, lang='kor+eng', output_type=pytesseract.Output.DICT)

        boxes = zip(data['text'], data['left'], data['top'], data['width'], data['height'])
        for text, x, y, w, h in boxes:
            if len(str(text).strip()) < 2:  # ignore very short tokens
                continue

            # Left / right border.
            if x <= margin or x + w >= width - margin:
                return True
            # Top border; the height limit excludes title-like large text.
            if y <= margin and h < height * 0.3:
                return True
            # Bottom border.
            if y + h >= height - margin:
                return True

        return False

    except Exception:
        # OCR failure: do not filter (conservative approach).
        return False


def is_decorative_background(pix, edge_threshold=0.02, color_var_threshold=500):
    """
    Check whether the image looks like a decorative background pattern:
    few edges (no complex shapes/photos) and low luma variance (flat fills,
    soft gradients).

    Args:
        pix: PyMuPDF Pixmap
        edge_threshold: edge-ratio threshold (default 0.02 = 2%)
        color_var_threshold: luma-variance threshold

    Returns:
        bool: True when both metrics are below their thresholds; False
        otherwise or when the metrics cannot be computed.
    """
    try:
        _, edge_ratio, var = pixmap_metrics(pix)
    except Exception:
        return False

    # NOTE(review): the original also ran a full OCR pass here, but every path
    # after this condition returned True, so OCR could not change the result.
    # The call (and its bare ``except:``) was dropped; reintroduce it only
    # together with a branch that actually uses the recognised text.
    return edge_ratio < edge_threshold and var < color_var_threshold


def is_header_footer_region(rect, page_rect, height_threshold=0.12):
    """
    Check whether an image sits in the header or footer band.

    Args:
        rect: image region (fitz.Rect)
        page_rect: full page region (fitz.Rect)
        height_threshold: header/footer band as a share of page height
            (default 12%)

    Returns:
        bool: True when the image is shorter than 15% of the page height and
        starts inside the top band or ends inside the bottom band.
    """
    page_height = page_rect.height

    # Only small images (rules, logos, ...) are treated as header/footer art.
    if rect.height >= page_height * 0.15:
        return False

    in_header = rect.y0 < page_height * height_threshold
    in_footer = rect.y1 > page_height * (1 - height_threshold)
    return in_header or in_footer


def should_filter_image(pix, rect, page_rect):
    """
    Combine the filtering rules to decide whether an image is discarded.

    Args:
        pix: PyMuPDF Pixmap
        rect: image region
        page_rect: full page region

    Returns:
        tuple: (filtered, reason) where reason is "header_footer",
        "cut_text", "decorative_background", or None when the image is kept.
    """
    # Cheapest check first; the OCR-based one only runs when needed.
    if is_header_footer_region(rect, page_rect):
        return True, "header_footer"

    if has_cut_text_at_boundary(pix):
        return True, "cut_text"

    if is_decorative_background(pix):
        return True, "decorative_background"

    return False, None


# ===== Page extraction helpers =====

def _inside_any_figure(block_rect, figures):
    """Return True when more than half of block_rect lies inside any figure."""
    for fig in figures:
        intersect = block_rect & fig["rect"]
        if intersect.get_area() > 0.5 * block_rect.get_area():
            return True
    return False


def _is_full_width_rect(r, page_rect):
    """Return True when r spans at least 78% of the page width."""
    return r.width >= page_rect.width * 0.78


def _figure_anchor_rect(fig, page_rect):
    """Return a 0.02-high rect used to place a figure in reading order.

    The anchor sits just above the caption when the caption is at or below
    the top of the figure, otherwise just below the caption.
    """
    cap = fig["caption_rect"]
    rect = fig["rect"]
    if cap.y0 >= rect.y0:
        y = max(0.0, cap.y0 - 0.02)
    else:
        y = min(page_rect.height - 0.02, cap.y1 + 0.02)
    return fitz.Rect(cap.x0, y, cap.x1, y + 0.02)


def _render_if_kept(page, rect, preview_dpi):
    """Render rect at 150 dpi, or return None when any filter rejects it.

    A low-resolution preview is checked with keep_figure() first so that
    blank regions never pay for the full render and the v2.1 filters.
    """
    preview = page.get_pixmap(clip=rect, dpi=preview_dpi, colorspace=fitz.csRGB)
    if not keep_figure(preview)[0]:
        return None

    pix = page.get_pixmap(clip=rect, dpi=150, colorspace=fitz.csRGB)
    should_filter, _reason = should_filter_image(pix, rect, page.rect)
    if should_filter:
        return None
    return pix


def _image_metadata(img_dir, img_name, kind, metadata, page_no, total_pages, caption, rect):
    """Build one image-metadata record (key order matches the JSON output)."""
    return {
        "image_file": img_name,
        "image_path": str(Path(img_dir) / img_name),
        "type": kind,
        "source_pdf": metadata['pdf_name'],
        "source_folder": metadata['relative_folder'],
        "full_path": metadata['full_path'],
        "page": page_no,
        "total_pages": total_pages,
        "caption": caption,
        "rect": {
            "x0": round(rect.x0, 2),
            "y0": round(rect.y0, 2),
            "x1": round(rect.x1, 2),
            "y1": round(rect.y1, 2)
        }
    }


def _extract_captioned_figures(page, page_no, img_dir, img_rel_dir, metadata,
                               total_pages, image_metadata_list):
    """Save the captioned figures of a page as PNGs and return the kept ones.

    Each kept figure dict gains 'img_path' (Markdown-relative, forward
    slashes) and 'img_name'; a metadata record is appended per saved image.
    """
    kept_figures = []
    for fig in get_figure_rects(page):
        rect = fig['rect']
        pix = _render_if_kept(page, rect, preview_dpi=100)
        if pix is None:
            continue

        # Numbering counts kept figures only, so file names stay contiguous.
        img_name = f"p{page_no:03d}_fig{len(kept_figures):02d}.png"
        pix.save(os.path.join(img_dir, img_name))

        fig['img_path'] = os.path.join(img_rel_dir, img_name).replace("\\", "/")
        fig['img_name'] = img_name
        kept_figures.append(fig)

        image_metadata_list.append(_image_metadata(
            img_dir, img_name, "figure", metadata, page_no, total_pages,
            fig.get('caption_text', ''), rect
        ))
    return kept_figures


def _collect_page_items(page, page_no, figures, img_dir, img_rel_dir, metadata,
                        total_pages, image_metadata_list):
    """Collect the renderable items of a page: figures, text blocks, photos.

    Uncaptioned raster images ("photos") are only extracted on pages without
    any caption-like text block; they are saved and recorded as they are met.
    """
    caption_present = any(
        CAPTION_PATTERN.search((tb[4] or "")) for tb in page.get_text("blocks")
    )

    items = []
    for fig in figures:
        md = (
            f"\n![{fig.get('caption_text', 'Figure')}]({fig['img_path']})\n"
            f"*{fig.get('caption_text', '')}*\n\n"
        )
        items.append({
            "kind": "figure",
            "rect": _figure_anchor_rect(fig, page.rect),
            "kind_order": 0,
            "md": md,
        })

    page_area = page.rect.get_area()
    uncaptioned_idx = 0

    for block in page.get_text("dict")["blocks"]:
        block_type = block.get("type")
        if block_type not in (0, 1):
            continue

        block_rect = fitz.Rect(block["bbox"])
        # Content already covered by a saved figure is not emitted again.
        if _inside_any_figure(block_rect, figures):
            continue

        if block_type == 0:
            items.append({
                "kind": "text",
                "rect": block_rect,
                "kind_order": 2,
                "block": block,
            })
            continue

        # Raster image block.
        if caption_present:
            continue
        if block_rect.get_area() < page_area * 0.005:
            continue

        # Skip extreme aspect ratios (banners, rules).
        ratio = block_rect.width / max(1.0, block_rect.height)
        if ratio < 0.25 or ratio > 4.0:
            continue

        pix = _render_if_kept(page, block_rect, preview_dpi=80)
        if pix is None:
            continue

        img_name = f"p{page_no:03d}_photo{uncaptioned_idx:02d}.png"
        pix.save(os.path.join(img_dir, img_name))

        rel = os.path.join(img_rel_dir, img_name).replace("\\", "/")
        md = (
            f'\n![Photo]({rel})\n'
            f'*Page {page_no} Photo*\n\n'
        )
        items.append({
            "kind": "raster",
            "rect": block_rect,
            "kind_order": 1,
            "md": md,
        })

        image_metadata_list.append(_image_metadata(
            img_dir, img_name, "photo", metadata, page_no, total_pages,
            "", block_rect
        ))
        uncaptioned_idx += 1

    return items


def _sort_items_by_layout(items, page_rect):
    """Sort items in place into reading order, with two-column detection.

    A page counts as two-column when at least three medium-width text blocks
    start left of the centre and at least three start right of it. Items are
    then ordered as: above the columns, the column area (split into segments
    by full-width text blocks; left column before right), below the columns.
    """
    text_items = [it for it in items if it["kind"] == "text"]
    page_w = page_rect.width
    mid = page_w / 2.0

    # Column candidates: neither full-width nor narrower than 20% of the page.
    candidates = [
        it for it in text_items
        if not _is_full_width_rect(it["rect"], page_rect)
        and it["rect"].width >= page_w * 0.2
    ]
    left = [it for it in candidates if it["rect"].x0 < mid * 0.95]
    right = [it for it in candidates if it["rect"].x0 > mid * 1.05]

    if not (len(left) >= 3 and len(right) >= 3):
        items.sort(key=lambda it: (it["rect"].y0, it["rect"].x0, it["kind_order"]))
        return

    columns = left + right
    col_y0 = min(it["rect"].y0 for it in columns)
    col_y1 = max(it["rect"].y1 for it in columns)

    # Full-width text blocks inside the column area split it into segments.
    seps = sorted({
        it["rect"].y0 for it in text_items
        if col_y0 < it["rect"].y0 < col_y1
        and _is_full_width_rect(it["rect"], page_rect)
    })

    def order_key(it):
        r = it["rect"]
        if r.y1 <= col_y0:
            return (0, r.y0, r.x0, it["kind_order"])
        if r.y0 >= col_y1:
            return (2, r.y0, r.x0, it["kind_order"])
        # Number of separators at or above this item.
        seg = bisect_right(seps, r.y0)
        if _is_full_width_rect(r, page_rect):
            # NOTE(review): a full-width block opens its own segment yet sorts
            # after both columns of that segment (col 2). This matches the
            # original ordering; confirm it is the intended reading order.
            col = 2
        else:
            col = 0 if r.x0 < mid else 1
        return (1, seg, col, r.y0, r.x0, it["kind_order"])

    items.sort(key=order_key)


def _write_items(md_file, items):
    """Write sorted items as Markdown: image snippets verbatim, text by line."""
    for it in items:
        if it["kind"] in ("figure", "raster"):
            md_file.write(it["md"])
            continue

        for line in it["block"].get("lines", []):
            for span in line.get("spans", []):
                md_file.write(span.get("text", "") + " ")
            md_file.write("\n")
        md_file.write("\n")


def extract_pdf_content(pdf_path, output_md_path, img_dir, metadata):
    """
    Main entry point for extracting one PDF.

    Writes a Markdown file (front matter, title, one section per page) and
    saves figure/photo PNGs into ``img_dir``.

    Args:
        pdf_path: path of the PDF
        output_md_path: path of the Markdown file to write
        img_dir: folder that receives the images (created when missing)
        metadata: dict with 'pdf_name', 'relative_folder' and 'full_path'

    Returns:
        image_metadata_list: list of metadata dicts, one per saved image
    """
    os.makedirs(img_dir, exist_ok=True)

    image_metadata_list = []
    img_rel_dir = os.path.basename(img_dir)

    # The context manager closes the document even when a page raises; the
    # batch driver catches per-file errors and keeps going, so a leaked
    # handle would otherwise accumulate for every failing PDF.
    with fitz.open(pdf_path) as doc:
        total_pages = len(doc)

        with open(output_md_path, "w", encoding="utf-8") as md_file:
            # Document-level front matter.
            md_file.write("---\n")
            md_file.write(f"source_pdf: {metadata['pdf_name']}\n")
            md_file.write(f"source_folder: {metadata['relative_folder']}\n")
            md_file.write(f"total_pages: {total_pages}\n")
            md_file.write(f"extracted_at: {datetime.now().isoformat()}\n")
            md_file.write("---\n\n")
            md_file.write(f"# {metadata['pdf_name']}\n\n")

            for page_num, page in enumerate(doc):
                page_no = page_num + 1
                md_file.write(f"\n## Page {page_no}\n\n")

                figures = _extract_captioned_figures(
                    page, page_no, img_dir, img_rel_dir, metadata,
                    total_pages, image_metadata_list
                )
                items = _collect_page_items(
                    page, page_no, figures, img_dir, img_rel_dir, metadata,
                    total_pages, image_metadata_list
                )
                _sort_items_by_layout(items, page.rect)
                _write_items(md_file, items)

    return image_metadata_list


def process_all_pdfs(input_dir, output_dir):
    """
    Process every PDF below ``input_dir`` sequentially and mirror the folder
    structure under ``output_dir``.

    Besides the per-PDF Markdown/image output, writes ``image_metadata.json``
    (all image records) and ``extraction_summary.json`` (statistics and
    failures) into ``output_dir``. A failing PDF is logged and skipped.

    Args:
        input_dir: root folder that is searched recursively for PDFs
        output_dir: root folder for all output (created when missing)
    """
    base_dir = Path(input_dir)
    output_base = Path(output_dir)

    output_base.mkdir(parents=True, exist_ok=True)

    all_image_metadata = []
    stats = {
        "total_pdfs": 0,
        "success": 0,
        "failed": 0,
        "total_images": 0
    }
    failed_files = []

    print("=" * 60)
    print("PDF 콘텐츠 추출 시작")
    print(f"소스 폴더: {base_dir}")
    print(f"출력 폴더: {output_base}")
    print("=" * 60)

    # One case-insensitive pass: globbing "*.pdf" and "*.PDF" separately lists
    # every file twice where matching ignores case (e.g. Windows) and misses
    # mixed-case suffixes elsewhere. Sorted for a reproducible order.
    pdf_files = sorted(
        p for p in base_dir.rglob("*")
        if p.name.lower().endswith(".pdf") and p.is_file()
    )
    stats["total_pdfs"] = len(pdf_files)

    print(f"발견된 PDF: {len(pdf_files)}개\n")

    for idx, pdf_path in enumerate(pdf_files, 1):
        try:
            relative_path = pdf_path.relative_to(base_dir)
            relative_folder = str(relative_path.parent)
            if relative_folder == ".":
                relative_folder = ""

            pdf_name = pdf_path.name
            pdf_stem = pdf_path.stem

            # Output location mirrors the source folder structure.
            output_folder = output_base / relative_path.parent
            output_folder.mkdir(parents=True, exist_ok=True)

            output_md = output_folder / f"{pdf_stem}.md"
            img_folder = output_folder / f"{pdf_stem}_img"

            metadata = {
                "pdf_name": pdf_name,
                "pdf_stem": pdf_stem,
                "relative_folder": relative_folder,
                "full_path": str(relative_path),
            }

            print(f"[{idx}/{len(pdf_files)}] {relative_path}")

            image_metas = extract_pdf_content(
                str(pdf_path),
                str(output_md),
                str(img_folder),
                metadata
            )

            all_image_metadata.extend(image_metas)
            stats["success"] += 1
            stats["total_images"] += len(image_metas)

            print(f" 완료 (약 {len(image_metas)}개 이미지 추출)")

        except Exception as e:
            # Batch boundary: record the failure and continue with the rest.
            stats["failed"] += 1
            failed_files.append({
                "file": str(pdf_path),
                "error": str(e)
            })
            print(f" 오류 발생: {e}")

    # Combined image metadata.
    meta_output_path = output_base / "image_metadata.json"
    with open(meta_output_path, "w", encoding="utf-8") as f:
        json.dump(all_image_metadata, f, ensure_ascii=False, indent=2)

    # Run summary.
    summary = {
        "processed_at": datetime.now().isoformat(),
        "source_dir": str(base_dir),
        "output_dir": str(output_base),
        "statistics": stats,
        "failed_files": failed_files
    }

    summary_path = output_base / "extraction_summary.json"
    with open(summary_path, "w", encoding="utf-8") as f:
        json.dump(summary, f, ensure_ascii=False, indent=2)

    print("\n" + "=" * 60)
    print("추출 작업 완료!")
    print("=" * 60)
    print(f"총 대상: {stats['total_pdfs']}개")
    print(f"성공: {stats['success']}개")
    print(f"실패: {stats['failed']}개")
    print(f"추출된 이미지: {stats['total_images']}개")
    print(f"\n이미지 메타데이터: {meta_output_path}")
    print(f"처리 요약: {summary_path}")

    if failed_files:
        # The original f-string printed the literal text "summary_path".
        print(f"\n실패한 파일 목록은 {summary_path}에서 확인 가능합니다.")