From 305453c1f49ab9aa9ce1ba0b7016e6a6a65c08cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EC=9D=B4=EA=B2=BD=EB=AF=BC?= Date: Thu, 19 Mar 2026 14:01:37 +0900 Subject: [PATCH] =?UTF-8?q?Cleanup:=20Deleting=2003.Code/=EC=97=85?= =?UTF-8?q?=EB=A1=9C=EB=93=9C=EC=9A=A9/converters/pipeline/step2=5Fextract?= =?UTF-8?q?.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../converters/pipeline/step2_extract.py | 791 ------------------ 1 file changed, 791 deletions(-) delete mode 100644 03.Code/업로드용/converters/pipeline/step2_extract.py diff --git a/03.Code/업로드용/converters/pipeline/step2_extract.py b/03.Code/업로드용/converters/pipeline/step2_extract.py deleted file mode 100644 index 0f4c4f1..0000000 --- a/03.Code/업로드용/converters/pipeline/step2_extract.py +++ /dev/null @@ -1,791 +0,0 @@ -# -*- coding: utf-8 -*- -""" -extract_1_v2.py - -PDF에서 텍스트(md)와 이미지(png)를 추출 -- 하위 폴더 구조 유지 -- 이미지 메타데이터 JSON 생성 (폴더경로, 파일명, 페이지, 위치, 캡션 등) -""" - -import fitz # PyMuPDF -import os -import re -import json -import numpy as np -from pathlib import Path -from datetime import datetime -from PIL import Image -import io - -# ===== OCR 설정 (선택적) ===== -try: - import pytesseract - import shutil - tesseract_path = shutil.which("tesseract") - if tesseract_path: - pytesseract.pytesseract.tesseract_cmd = tesseract_path - TESSERACT_AVAILABLE = True -except ImportError: - TESSERACT_AVAILABLE = False - print("[INFO] pytesseract 미설치 - 텍스트 잘림 필터 비활성화") - - -CAPTION_PATTERN = re.compile( - r'^\s*(?:[<\[\(\{]\s*)?(그림|figure|fig)\s*\.?\s*(?:[<\[\(\{]\s*)?0*\d+(?:\s*[-–]\s*\d+)?', - re.IGNORECASE -) - - -def get_figure_rects(page): - """ - Identifies figure regions based on '<그림 N>' captions and vector drawings. - Returns a list of dicts: {'rect': fitz.Rect, 'caption_block': block_index} - """ - drawings = page.get_drawings() - - blocks = page.get_text("blocks") - captions = [] - - for i, b in enumerate(blocks): - text = b[4] - if CAPTION_PATTERN.search(text): - captions.append({'rect': fitz.Rect(b[:4]), 'index': i, 'text': text, 'drawings': []}) - - if not captions: - return [] - - filtered_drawings_rects = [] - for d in drawings: - r = d["rect"] - if r.height > page.rect.height / 3 and r.width < 5: - continue - if r.width > page.rect.width * 0.9: - continue - filtered_drawings_rects.append(r) - - page_area = page.rect.get_area() - img_rects = [] - for b in page.get_text("dict")["blocks"]: - if b.get("type") == 1: - ir = fitz.Rect(b["bbox"]) - if ir.get_area() < page_area * 0.01: - continue - img_rects.append(ir) - - remaining_drawings = filtered_drawings_rects + img_rects - caption_clusters = {cap['index']: [cap['rect']] for cap in captions} - - def is_text_between(r1, r2, text_blocks): - if r1.intersects(r2): - return False - union = r1 | r2 - for b in text_blocks: - b_rect = fitz.Rect(b[:4]) - text_content = b[4] - if len(text_content.strip()) < 20: - continue - if not b_rect.intersects(union): - continue - if b_rect.intersects(r1) or b_rect.intersects(r2): - continue - return True - return False - - changed = True - while changed: - changed = False - to_remove = [] - - for d_rect in remaining_drawings: - best_cluster_key = None - min_dist = float('inf') - - for cap_index, cluster_rects in caption_clusters.items(): - for r in cluster_rects: - dist = 0 - if d_rect.intersects(r): - dist = 0 - else: - x_dist = 0 - if d_rect.x1 < r.x0: x_dist = r.x0 - d_rect.x1 - elif d_rect.x0 > r.x1: x_dist = d_rect.x0 - r.x1 - - y_dist = 0 - if d_rect.y1 < r.y0: y_dist = r.y0 - d_rect.y1 - elif d_rect.y0 > r.y1: y_dist = d_rect.y0 - r.y1 - - if x_dist < 150 and y_dist < 150: - dist = max(x_dist, y_dist) + 0.1 - else: - dist = float('inf') - - if dist < min_dist: - if not is_text_between(r, d_rect, blocks): - min_dist = dist - best_cluster_key = cap_index - - if min_dist == 0: - break - - if best_cluster_key is not None and min_dist < 150: - caption_clusters[best_cluster_key].append(d_rect) - to_remove.append(d_rect) - changed = True - - for r in to_remove: - remaining_drawings.remove(r) - - figure_regions = [] - - for cap in captions: - cluster_rects = caption_clusters[cap['index']] - content_rects = cluster_rects[1:] - - if not content_rects: - continue - - union_rect = content_rects[0] - for r in content_rects[1:]: - union_rect = union_rect | r - - union_rect.x0 = max(0, union_rect.x0 - 5) - union_rect.x1 = min(page.rect.width, union_rect.x1 + 5) - union_rect.y0 = max(0, union_rect.y0 - 5) - union_rect.y1 = min(page.rect.height, union_rect.y1 + 5) - - cap_rect = cap['rect'] - - if cap_rect.y0 + cap_rect.height/2 < union_rect.y0 + union_rect.height/2: - if union_rect.y0 < cap_rect.y1: union_rect.y0 = cap_rect.y1 + 2 - else: - if union_rect.y1 > cap_rect.y0: union_rect.y1 = cap_rect.y0 - 2 - - area = union_rect.get_area() - page_area = page.rect.get_area() - - if area < page_area * 0.01: - continue - - if union_rect.height < 20 and union_rect.width > page.rect.width * 0.6: - continue - if union_rect.width < 20 and union_rect.height > page.rect.height * 0.6: - continue - - text_blocks = page.get_text("blocks") - text_count = 0 - - for b in text_blocks: - b_rect = fitz.Rect(b[:4]) - if not b_rect.intersects(union_rect): - continue - text = b[4].strip() - if len(text) < 5: - continue - text_count += 1 - - if text_count < 0: - continue - - figure_regions.append({ - 'rect': union_rect, - 'caption_index': cap['index'], - 'caption_rect': cap['rect'], - 'caption_text': cap['text'].strip() # ★ 캡션 텍스트 저장 - }) - - return figure_regions - - -def pixmap_metrics(pix): - arr = np.frombuffer(pix.samples, dtype=np.uint8) - c = 4 if pix.alpha else 3 - arr = arr.reshape(pix.height, pix.width, c)[:, :, :3] - gray = (0.299 * arr[:, :, 0] + 0.587 * arr[:, :, 1] + 0.114 * arr[:, :, 2]).astype(np.uint8) - white = gray > 245 - nonwhite_ratio = float(1.0 - white.mean()) - gx = np.abs(np.diff(gray.astype(np.int16), axis=1)) - gy = np.abs(np.diff(gray.astype(np.int16), axis=0)) - edge = (gx[:-1, :] + gy[:, :-1]) > 40 - edge_ratio = float(edge.mean()) - var = float(gray.var()) - return nonwhite_ratio, edge_ratio, var - - -def keep_figure(pix): - nonwhite_ratio, edge_ratio, var = pixmap_metrics(pix) - if nonwhite_ratio < 0.004: - return False, nonwhite_ratio, edge_ratio, var - if nonwhite_ratio < 0.012 and edge_ratio < 0.004 and var < 20: - return False, nonwhite_ratio, edge_ratio, var - return True, nonwhite_ratio, edge_ratio, var - - -# ===== 추가 이미지 필터 함수들 (v2.1) ===== - -def pix_to_pil(pix): - """PyMuPDF Pixmap을 PIL Image로 변환""" - img_data = pix.tobytes("png") - return Image.open(io.BytesIO(img_data)) - - -def has_cut_text_at_boundary(pix, margin=5): - """ - 이미지 경계에서 텍스트가 잘렸는지 감지 - - 이미지 테두리 근처에 텍스트 박스가 있으면 잘린 것으로 판단 - - Args: - pix: PyMuPDF Pixmap - margin: 경계로부터의 여유 픽셀 (기본 5px) - - Returns: - bool: 텍스트가 잘렸으면 True - """ - if not TESSERACT_AVAILABLE: - return False # OCR 없으면 필터 비활성화 - - try: - img = pix_to_pil(pix) - width, height = img.size - - # OCR로 텍스트 위치 추출 - data = pytesseract.image_to_data(img, lang='kor+eng', output_type=pytesseract.Output.DICT) - - for i, text in enumerate(data['text']): - text = str(text).strip() - if len(text) < 2: # 너무 짧은 텍스트는 무시 - continue - - x = data['left'][i] - y = data['top'][i] - w = data['width'][i] - h = data['height'][i] - - # 텍스트가 이미지 경계에 너무 가까우면 = 잘린 것 - # 왼쪽 경계 - if x <= margin: - return True - # 오른쪽 경계 - if x + w >= width - margin: - return True - # 상단 경계 (헤더 제외를 위해 좀 더 여유) - if y <= margin and h < height * 0.3: - return True - # 하단 경계 - if y + h >= height - margin: - return True - - return False - - except Exception as e: - # OCR 실패 시 필터 통과 (이미지 유지) - return False - - -def is_decorative_background(pix, edge_threshold=0.02, color_var_threshold=500): - """ - 배경 패턴 + 텍스트만 있는 장식용 이미지인지 감지 - - 엣지가 적고 (복잡한 도표/사진이 아님) - - 색상 다양성이 낮으면 (단순 그라데이션 배경) - - Args: - pix: PyMuPDF Pixmap - edge_threshold: 엣지 비율 임계값 (기본 0.02 = 2%) - color_var_threshold: 색상 분산 임계값 - - Returns: - bool: 장식용 배경이면 True - """ - try: - nonwhite_ratio, edge_ratio, var = pixmap_metrics(pix) - - # 엣지가 거의 없고 (단순한 이미지) - # 색상 분산도 낮으면 (배경 패턴) - if edge_ratio < edge_threshold and var < color_var_threshold: - # 추가 확인: 텍스트만 있는지 OCR로 체크 - if TESSERACT_AVAILABLE: - try: - img = pix_to_pil(pix) - text = pytesseract.image_to_string(img, lang='kor+eng').strip() - - # 텍스트가 있고, 이미지가 단순하면 = 텍스트 배경 - if len(text) > 3 and edge_ratio < 0.015: - return True - except: - pass - - return True - - return False - - except Exception: - return False - - -def is_header_footer_region(rect, page_rect, height_threshold=0.12): - """ - 헤더/푸터 영역에 있는 이미지인지 감지 - - 페이지 상단 12% 또는 하단 12%에 위치 - - 높이가 낮은 strip 형태 - - Args: - rect: 이미지 영역 (fitz.Rect) - page_rect: 페이지 전체 영역 (fitz.Rect) - height_threshold: 헤더/푸터 영역 비율 (기본 12%) - - Returns: - bool: 헤더/푸터 영역이면 True - """ - page_height = page_rect.height - img_height = rect.height - - # 상단 영역 체크 - if rect.y0 < page_height * height_threshold: - # 높이가 페이지의 15% 미만인 strip이면 헤더 - if img_height < page_height * 0.15: - return True - - # 하단 영역 체크 - if rect.y1 > page_height * (1 - height_threshold): - # 높이가 페이지의 15% 미만인 strip이면 푸터 - if img_height < page_height * 0.15: - return True - - return False - - -def should_filter_image(pix, rect, page_rect): - """ - 이미지를 필터링해야 하는지 종합 판단 - - Args: - pix: PyMuPDF Pixmap - rect: 이미지 영역 - page_rect: 페이지 전체 영역 - - Returns: - tuple: (필터링 여부, 필터링 사유) - """ - # 1. 헤더/푸터 영역 체크 - if is_header_footer_region(rect, page_rect): - return True, "header_footer" - - # 2. 텍스트 잘림 체크 - if has_cut_text_at_boundary(pix): - return True, "cut_text" - - # 3. 장식용 배경 체크 - if is_decorative_background(pix): - return True, "decorative_background" - - return False, None - - -def extract_pdf_content(pdf_path, output_md_path, img_dir, metadata): - """ - PDF 내용 추출 - - Args: - pdf_path: PDF 파일 경로 - output_md_path: 출력 MD 파일 경로 - img_dir: 이미지 저장 폴더 - metadata: 메타데이터 딕셔너리 (폴더 경로, 파일명 등) - - Returns: - image_metadata_list: 추출된 이미지들의 메타데이터 리스트 - """ - os.makedirs(img_dir, exist_ok=True) - - image_metadata_list = [] # ★ 이미지 메타데이터 수집 - - doc = fitz.open(pdf_path) - total_pages = len(doc) - - with open(output_md_path, "w", encoding="utf-8") as md_file: - # ★ 메타데이터 헤더 추가 - md_file.write(f"---\n") - md_file.write(f"source_pdf: {metadata['pdf_name']}\n") - md_file.write(f"source_folder: {metadata['relative_folder']}\n") - md_file.write(f"total_pages: {total_pages}\n") - md_file.write(f"extracted_at: {datetime.now().isoformat()}\n") - md_file.write(f"---\n\n") - md_file.write(f"# {metadata['pdf_name']}\n\n") - - for page_num, page in enumerate(doc): - md_file.write(f"\n## Page {page_num + 1}\n\n") - img_rel_dir = os.path.basename(img_dir) - - figure_regions = get_figure_rects(page) - - kept_figures = [] - for i, fig in enumerate(figure_regions): - rect = fig['rect'] - pix_preview = page.get_pixmap(clip=rect, dpi=100, colorspace=fitz.csRGB) - ok, nonwhite_ratio, edge_ratio, var = keep_figure(pix_preview) - if not ok: - continue - - pix = page.get_pixmap(clip=rect, dpi=150, colorspace=fitz.csRGB) - - # ★ 추가 필터 적용 (v2.1) - should_filter, filter_reason = should_filter_image(pix, rect, page.rect) - if should_filter: - continue - - img_name = f"p{page_num + 1:03d}_fig{len(kept_figures):02d}.png" - img_path = os.path.join(img_dir, img_name) - pix.save(img_path) - - fig['img_path'] = os.path.join(img_rel_dir, img_name).replace("\\", "/") - fig['img_name'] = img_name - kept_figures.append(fig) - - # ★ 이미지 메타데이터 수집 - image_metadata_list.append({ - "image_file": img_name, - "image_path": str(Path(img_dir) / img_name), - "type": "figure", - "source_pdf": metadata['pdf_name'], - "source_folder": metadata['relative_folder'], - "full_path": metadata['full_path'], - "page": page_num + 1, - "total_pages": total_pages, - "caption": fig.get('caption_text', ''), - "rect": { - "x0": round(rect.x0, 2), - "y0": round(rect.y0, 2), - "x1": round(rect.x1, 2), - "y1": round(rect.y1, 2) - } - }) - - figure_regions = kept_figures - - caption_present = any( - CAPTION_PATTERN.search((tb[4] or "")) for tb in page.get_text("blocks") - ) - uncaptioned_idx = 0 - - items = [] - - def inside_any_figure(block_rect, figures): - for fig in figures: - intersect = block_rect & fig["rect"] - if intersect.get_area() > 0.5 * block_rect.get_area(): - return True - return False - - def is_full_width_rect(r, page_rect): - return r.width >= page_rect.width * 0.78 - - def figure_anchor_rect(fig, page_rect): - cap = fig["caption_rect"] - rect = fig["rect"] - if cap.y0 >= rect.y0: - y = max(0.0, cap.y0 - 0.02) - else: - y = min(page_rect.height - 0.02, cap.y1 + 0.02) - return fitz.Rect(cap.x0, y, cap.x1, y + 0.02) - - for fig in figure_regions: - anchor = figure_anchor_rect(fig, page.rect) - md = ( - f"\n![{fig.get('caption_text', 'Figure')}]({fig['img_path']})\n" - f"*{fig.get('caption_text', '')}*\n\n" - ) - items.append({ - "kind": "figure", - "rect": anchor, - "kind_order": 0, - "md": md, - }) - - raw_blocks = page.get_text("dict")["blocks"] - - for block in raw_blocks: - block_rect = fitz.Rect(block["bbox"]) - - if block.get("type") == 0: - if inside_any_figure(block_rect, figure_regions): - continue - items.append({ - "kind": "text", - "rect": block_rect, - "kind_order": 2, - "block": block, - }) - continue - - if block.get("type") == 1: - if inside_any_figure(block_rect, figure_regions): - continue - if caption_present: - continue - - page_area = page.rect.get_area() - if block_rect.get_area() < page_area * 0.005: - continue - - ratio = block_rect.width / max(1.0, block_rect.height) - if ratio < 0.25 or ratio > 4.0: - continue - - pix_preview = page.get_pixmap( - clip=block_rect, dpi=80, colorspace=fitz.csRGB - ) - ok, nonwhite_ratio, edge_ratio, var = keep_figure(pix_preview) - if not ok: - continue - - pix = page.get_pixmap( - clip=block_rect, dpi=150, colorspace=fitz.csRGB - ) - - # ★ 추가 필터 적용 (v2.1) - should_filter, filter_reason = should_filter_image(pix, block_rect, page.rect) - if should_filter: - continue - - img_name = f"p{page_num + 1:03d}_photo{uncaptioned_idx:02d}.png" - img_path = os.path.join(img_dir, img_name) - pix.save(img_path) - - rel = os.path.join(img_rel_dir, img_name).replace("\\", "/") - r = block_rect - md = ( - f'\n![Photo]({rel})\n' - f'*Page {page_num + 1} Photo*\n\n' - ) - - items.append({ - "kind": "raster", - "rect": block_rect, - "kind_order": 1, - "md": md, - }) - - # ★ 캡션 없는 이미지 메타데이터 - image_metadata_list.append({ - "image_file": img_name, - "image_path": str(Path(img_dir) / img_name), - "type": "photo", - "source_pdf": metadata['pdf_name'], - "source_folder": metadata['relative_folder'], - "full_path": metadata['full_path'], - "page": page_num + 1, - "total_pages": total_pages, - "caption": "", - "rect": { - "x0": round(r.x0, 2), - "y0": round(r.y0, 2), - "x1": round(r.x1, 2), - "y1": round(r.y1, 2) - } - }) - - uncaptioned_idx += 1 - continue - - # 읽기 순서 정렬 - text_items = [it for it in items if it["kind"] == "text"] - page_w = page.rect.width - mid = page_w / 2.0 - - candidates = [] - for it in text_items: - r = it["rect"] - if is_full_width_rect(r, page.rect): - continue - if r.width < page_w * 0.2: - continue - candidates.append(it) - - left = [it for it in candidates if it["rect"].x0 < mid * 0.95] - right = [it for it in candidates if it["rect"].x0 > mid * 1.05] - two_cols = len(left) >= 3 and len(right) >= 3 - - col_y0 = None - col_y1 = None - seps = [] - - if two_cols and left and right: - col_y0 = min( - min(it["rect"].y0 for it in left), - min(it["rect"].y0 for it in right), - ) - col_y1 = max( - max(it["rect"].y1 for it in left), - max(it["rect"].y1 for it in right), - ) - for it in text_items: - r = it["rect"] - if col_y0 < r.y0 < col_y1 and is_full_width_rect(r, page.rect): - seps.append(r.y0) - seps = sorted(set(seps)) - - def seg_index(y0, separators): - if not separators: - return 0 - n = 0 - for s in separators: - if y0 >= s: - n += 1 - else: - break - return n - - def order_key(it): - r = it["rect"] - if not two_cols: - return (r.y0, r.x0, it["kind_order"]) - if col_y0 is not None and r.y1 <= col_y0: - return (0, r.y0, r.x0, it["kind_order"]) - if col_y1 is not None and r.y0 >= col_y1: - return (2, r.y0, r.x0, it["kind_order"]) - seg = seg_index(r.y0, seps) - if is_full_width_rect(r, page.rect): - col = 2 - else: - col = 0 if r.x0 < mid else 1 - return (1, seg, col, r.y0, r.x0, it["kind_order"]) - - items.sort(key=order_key) - - for it in items: - if it["kind"] in ("figure", "raster"): - md_file.write(it["md"]) - continue - - block = it["block"] - for line in block.get("lines", []): - for span in line.get("spans", []): - md_file.write(span.get("text", "") + " ") - md_file.write("\n") - md_file.write("\n") - - doc.close() - return image_metadata_list - - -def process_all_pdfs(input_dir, output_dir): - """ - BASE_DIR 하위의 모든 PDF를 재귀적으로 처리 - 폴더 구조를 유지하면서 OUTPUT_BASE에 저장 - """ - BASE_DIR = Path(input_dir) - OUTPUT_BASE = Path(output_dir) - # 출력 폴더 생성 - OUTPUT_BASE.mkdir(parents=True, exist_ok=True) - - # 전체 이미지 메타데이터 수집 - all_image_metadata = [] - - # 처리 통계 - stats = { - "total_pdfs": 0, - "success": 0, - "failed": 0, - "total_images": 0 - } - - # 실패 로그 - failed_files = [] - - print(f"=" * 60) - print(f"PDF 추출 시작") - print(f"원본 폴더: {BASE_DIR}") - print(f"출력 폴더: {OUTPUT_BASE}") - print(f"=" * 60) - - # 모든 PDF 파일 찾기 - pdf_files = list(BASE_DIR.rglob("*.pdf")) - stats["total_pdfs"] = len(pdf_files) - - print(f"\n총 {len(pdf_files)}개 PDF 발견\n") - - for idx, pdf_path in enumerate(pdf_files, 1): - try: - # 상대 경로 계산 - relative_path = pdf_path.relative_to(BASE_DIR) - relative_folder = str(relative_path.parent) - if relative_folder == ".": - relative_folder = "" - - pdf_name = pdf_path.name - pdf_stem = pdf_path.stem - - # 출력 경로 설정 (폴더 구조 유지) - output_folder = OUTPUT_BASE / relative_path.parent - output_folder.mkdir(parents=True, exist_ok=True) - - output_md = output_folder / f"{pdf_stem}.md" - img_folder = output_folder / f"{pdf_stem}_img" - - # 메타데이터 준비 - metadata = { - "pdf_name": pdf_name, - "pdf_stem": pdf_stem, - "relative_folder": relative_folder, - "full_path": str(relative_path), - } - - print(f"[{idx}/{len(pdf_files)}] {relative_path}") - - # PDF 처리 - image_metas = extract_pdf_content( - str(pdf_path), - str(output_md), - str(img_folder), - metadata - ) - - all_image_metadata.extend(image_metas) - stats["success"] += 1 - stats["total_images"] += len(image_metas) - - print(f" ✓ 완료 (이미지 {len(image_metas)}개)") - - except Exception as e: - stats["failed"] += 1 - failed_files.append({ - "file": str(pdf_path), - "error": str(e) - }) - print(f" ✗ 실패: {e}") - - # 전체 이미지 메타데이터 저장 - meta_output_path = OUTPUT_BASE / "image_metadata.json" - with open(meta_output_path, "w", encoding="utf-8") as f: - json.dump(all_image_metadata, f, ensure_ascii=False, indent=2) - - # 처리 요약 저장 - summary = { - "processed_at": datetime.now().isoformat(), - "source_dir": str(BASE_DIR), - "output_dir": str(OUTPUT_BASE), - "statistics": stats, - "failed_files": failed_files - } - - summary_path = OUTPUT_BASE / "extraction_summary.json" - with open(summary_path, "w", encoding="utf-8") as f: - json.dump(summary, f, ensure_ascii=False, indent=2) - - # 결과 출력 - print(f"\n" + "=" * 60) - print(f"추출 완료!") - print(f"=" * 60) - print(f"총 PDF: {stats['total_pdfs']}개") - print(f"성공: {stats['success']}개") - print(f"실패: {stats['failed']}개") - print(f"추출된 이미지: {stats['total_images']}개") - print(f"\n이미지 메타데이터: {meta_output_path}") - print(f"처리 요약: {summary_path}") - - if failed_files: - print(f"\n실패한 파일:") - for f in failed_files: - print(f" - {f['file']}: {f['error']}") - - -if __name__ == "__main__": - process_all_pdfs()