# -*- coding: utf-8 -*-
"""
extract_1_v2.py

Extract text (Markdown) and images (PNG) from PDF files.
- Preserves the sub-folder structure of the source tree.
- Generates image metadata JSON (folder path, file name, page, position,
  caption, ...).
"""
import io
import json
import os
import re
from bisect import bisect_left
from datetime import datetime
from pathlib import Path

import fitz  # PyMuPDF
import numpy as np
from PIL import Image

# ===== OCR setup (optional) =====
try:
    import pytesseract
    pytesseract.pytesseract.tesseract_cmd = r"C:\Program Files\Tesseract-OCR\tesseract.exe"
    TESSERACT_AVAILABLE = True
except ImportError:
    TESSERACT_AVAILABLE = False
    print("[INFO] pytesseract 미설치 - 텍스트 잘림 필터 비활성화")

# ===== Path configuration =====
BASE_DIR = Path(r"D:\for python\survey_test\extract")  # location of the source PDFs
OUTPUT_BASE = Path(r"D:\for python\survey_test\process")  # output location

# Matches caption openers such as "<그림 3>", "Figure 2-1", "fig. 04".
CAPTION_PATTERN = re.compile(
    r'^\s*(?:[<\[\(\{]\s*)?(그림|figure|fig)\s*\.?\s*(?:[<\[\(\{]\s*)?0*\d+(?:\s*[-–]\s*\d+)?',
    re.IGNORECASE
)

# Maximum gap (in page units) between a drawing and a caption cluster for the
# drawing to be absorbed into that cluster.
_CLUSTER_MAX_GAP = 150

# Resolution used for the images that are actually written to disk.
_RENDER_DPI = 150


def _rect_gap(a, b, limit):
    """Return the clustering distance between rects *a* and *b*.

    The result is 0 when the rects intersect. Otherwise it is the larger of
    the horizontal and vertical gaps plus 0.1 (so a disjoint rect always ranks
    behind an intersecting one), or infinity when either gap is *limit* or
    more.
    """
    if a.intersects(b):
        return 0

    x_gap = 0
    if a.x1 < b.x0:
        x_gap = b.x0 - a.x1
    elif a.x0 > b.x1:
        x_gap = a.x0 - b.x1

    y_gap = 0
    if a.y1 < b.y0:
        y_gap = b.y0 - a.y1
    elif a.y0 > b.y1:
        y_gap = a.y0 - b.y1

    if x_gap < limit and y_gap < limit:
        return max(x_gap, y_gap) + 0.1
    return float('inf')


def _is_text_between(r1, r2, text_blocks):
    """Return True if a substantial text block lies between *r1* and *r2*.

    A block counts when its stripped text has at least 20 characters, it
    intersects the union of the two rects, and it touches neither rect itself.
    Intersecting rects never have text "between" them.
    """
    if r1.intersects(r2):
        return False
    union = r1 | r2
    for b in text_blocks:
        if len(b[4].strip()) < 20:
            continue
        b_rect = fitz.Rect(b[:4])
        if not b_rect.intersects(union):
            continue
        if b_rect.intersects(r1) or b_rect.intersects(r2):
            continue
        return True
    return False


def get_figure_rects(page):
    """Identify figure regions from caption text blocks and nearby graphics.

    Text blocks matching ``CAPTION_PATTERN`` seed one cluster each. Vector
    drawings and sufficiently large image blocks are then repeatedly absorbed
    into the nearest cluster (within ``_CLUSTER_MAX_GAP``, with no substantial
    text block in between) until nothing changes. The union of each cluster's
    graphics, padded by 5 units and trimmed so it does not overlap the
    caption, becomes the figure rectangle.

    Args:
        page: PyMuPDF page object.

    Returns:
        A list of dicts with the keys ``'rect'`` (fitz.Rect of the figure),
        ``'caption_index'`` (index of the caption in
        ``page.get_text("blocks")``), ``'caption_rect'`` and
        ``'caption_text'``. Empty when the page has no caption.
    """
    blocks = page.get_text("blocks")
    captions = [
        {'rect': fitz.Rect(b[:4]), 'index': i, 'text': b[4]}
        for i, b in enumerate(blocks)
        if CAPTION_PATTERN.search(b[4])
    ]
    if not captions:
        return []

    page_rect = page.rect
    page_area = page_rect.get_area()

    # Drop tall thin rules and near-page-wide drawings before clustering.
    remaining = []
    for d in page.get_drawings():
        r = d["rect"]
        if r.height > page_rect.height / 3 and r.width < 5:
            continue
        if r.width > page_rect.width * 0.9:
            continue
        remaining.append(r)

    # Raster image blocks take part in clustering too, unless they are tiny.
    for b in page.get_text("dict")["blocks"]:
        if b.get("type") != 1:
            continue
        image_rect = fitz.Rect(b["bbox"])
        if image_rect.get_area() < page_area * 0.01:
            continue
        remaining.append(image_rect)

    # Element 0 of every cluster is the caption rect itself.
    clusters = {cap['index']: [cap['rect']] for cap in captions}

    changed = True
    while changed:
        changed = False
        absorbed = []
        for d_rect in remaining:
            best_key = None
            best_dist = float('inf')
            for cap_index, cluster_rects in clusters.items():
                for r in cluster_rects:
                    dist = _rect_gap(d_rect, r, _CLUSTER_MAX_GAP)
                    if dist < best_dist and not _is_text_between(r, d_rect, blocks):
                        best_dist = dist
                        best_key = cap_index
                if best_dist == 0:
                    # Nothing can beat an intersecting match.
                    break
            if best_key is not None and best_dist < _CLUSTER_MAX_GAP:
                clusters[best_key].append(d_rect)
                absorbed.append(d_rect)
                changed = True
        if absorbed:
            absorbed_ids = {id(r) for r in absorbed}
            remaining = [r for r in remaining if id(r) not in absorbed_ids]

    figure_regions = []
    for cap in captions:
        content_rects = clusters[cap['index']][1:]
        if not content_rects:
            continue

        # Copy first: the rect is modified below and must not alias the
        # cluster's stored rect.
        union_rect = fitz.Rect(content_rects[0])
        for r in content_rects[1:]:
            union_rect = union_rect | r

        union_rect.x0 = max(0, union_rect.x0 - 5)
        union_rect.x1 = min(page_rect.width, union_rect.x1 + 5)
        union_rect.y0 = max(0, union_rect.y0 - 5)
        union_rect.y1 = min(page_rect.height, union_rect.y1 + 5)

        # Trim the side facing the caption so the caption is not rendered
        # into the image.
        cap_rect = cap['rect']
        caption_is_above = (
            cap_rect.y0 + cap_rect.height / 2
            < union_rect.y0 + union_rect.height / 2
        )
        if caption_is_above:
            if union_rect.y0 < cap_rect.y1:
                union_rect.y0 = cap_rect.y1 + 2
        elif union_rect.y1 > cap_rect.y0:
            union_rect.y1 = cap_rect.y0 - 2

        if union_rect.get_area() < page_area * 0.01:
            continue
        # Skip long thin strips (horizontal or vertical rules).
        if union_rect.height < 20 and union_rect.width > page_rect.width * 0.6:
            continue
        if union_rect.width < 20 and union_rect.height > page_rect.height * 0.6:
            continue

        # NOTE: an earlier text-block count filter here compared the count
        # with `< 0`, which can never be true; it was removed as dead code.
        figure_regions.append({
            'rect': union_rect,
            'caption_index': cap['index'],
            'caption_rect': cap['rect'],
            'caption_text': cap['text'].strip(),
        })
    return figure_regions


def pixmap_metrics(pix):
    """Compute simple content statistics for a rendered pixmap.

    Args:
        pix: PyMuPDF Pixmap with 3 colour channels plus optional alpha.

    Returns:
        Tuple ``(nonwhite_ratio, edge_ratio, var)``: the fraction of pixels
        whose grey value is 245 or below, the fraction of pixels whose summed
        horizontal and vertical grey difference exceeds 40, and the variance
        of the grey values.
    """
    channels = 4 if pix.alpha else 3
    arr = np.frombuffer(pix.samples, dtype=np.uint8)
    arr = arr.reshape(pix.height, pix.width, channels)[:, :, :3]
    gray = (
        0.299 * arr[:, :, 0] + 0.587 * arr[:, :, 1] + 0.114 * arr[:, :, 2]
    ).astype(np.uint8)

    nonwhite_ratio = float(1.0 - (gray > 245).mean())

    signed = gray.astype(np.int16)
    gx = np.abs(np.diff(signed, axis=1))
    gy = np.abs(np.diff(signed, axis=0))
    edge = (gx[:-1, :] + gy[:, :-1]) > 40
    # A pixmap one pixel wide or high has no gradient samples; report "no
    # edges" instead of the NaN that the mean of an empty array would give.
    edge_ratio = float(edge.mean()) if edge.size else 0.0

    return nonwhite_ratio, edge_ratio, float(gray.var())


def keep_figure(pix):
    """Decide whether a rendered region has enough content to keep.

    Returns:
        Tuple ``(keep, nonwhite_ratio, edge_ratio, var)``. ``keep`` is False
        for almost entirely white renders, and for nearly white renders that
        also have very few edges and very low variance.
    """
    nonwhite_ratio, edge_ratio, var = pixmap_metrics(pix)
    almost_blank = nonwhite_ratio < 0.004
    faint_and_flat = nonwhite_ratio < 0.012 and edge_ratio < 0.004 and var < 20
    keep = not (almost_blank or faint_and_flat)
    return keep, nonwhite_ratio, edge_ratio, var


# ===== Additional image filter functions (v2.1) =====

def pix_to_pil(pix):
    """Convert a PyMuPDF Pixmap to a PIL Image (via an in-memory PNG)."""
    return Image.open(io.BytesIO(pix.tobytes("png")))


def has_cut_text_at_boundary(pix, margin=5):
    """Detect whether text appears to be cut off at the image boundary.

    Runs OCR on the image and reports True when any recognised text of two or
    more characters has a bounding box within *margin* pixels of an edge.

    Args:
        pix: PyMuPDF Pixmap.
        margin: Tolerance in pixels from each edge (default 5).

    Returns:
        bool: True if text touches a boundary. Always False when pytesseract
        is not available or OCR fails.
    """
    if not TESSERACT_AVAILABLE:
        return False  # filter disabled without OCR

    try:
        img = pix_to_pil(pix)
        width, height = img.size
        data = pytesseract.image_to_data(
            img, lang='kor+eng', output_type=pytesseract.Output.DICT
        )
        for i, text in enumerate(data['text']):
            if len(str(text).strip()) < 2:  # ignore very short fragments
                continue
            x = data['left'][i]
            y = data['top'][i]
            w = data['width'][i]
            h = data['height'][i]

            if x <= margin:  # left edge
                return True
            if x + w >= width - margin:  # right edge
                return True
            # Top edge: only boxes shorter than 30% of the image height count.
            if y <= margin and h < height * 0.3:
                return True
            if y + h >= height - margin:  # bottom edge
                return True
        return False
    except Exception:
        # Deliberately best-effort: if OCR fails, keep the image.
        return False


def is_decorative_background(pix, edge_threshold=0.02, color_var_threshold=500):
    """Detect simple, low-detail images such as decorative backgrounds.

    An image is considered decorative when it has few edges and low grey-level
    variance.

    NOTE: an earlier version also ran OCR here, but both of its outcomes
    returned True, so that (expensive) call could not affect the result and
    was removed.

    Args:
        pix: PyMuPDF Pixmap.
        edge_threshold: Edge-ratio threshold (default 0.02 = 2%).
        color_var_threshold: Grey-level variance threshold.

    Returns:
        bool: True for a decorative background; False otherwise, and also
        when the metrics cannot be computed.
    """
    try:
        _, edge_ratio, var = pixmap_metrics(pix)
    except Exception:
        # Best-effort: an image whose metrics cannot be computed is kept.
        return False
    return edge_ratio < edge_threshold and var < color_var_threshold


def is_header_footer_region(rect, page_rect, height_threshold=0.12):
    """Detect whether an image sits in the page header or footer band.

    Args:
        rect: Image region (fitz.Rect).
        page_rect: Full page region (fitz.Rect).
        height_threshold: Fraction of the page height treated as the header
            or footer band (default 12%).

    Returns:
        bool: True when the region is a strip shorter than 15% of the page
        height that starts inside the top band or ends inside the bottom band.
    """
    page_height = page_rect.height
    is_strip = rect.height < page_height * 0.15
    in_header = rect.y0 < page_height * height_threshold
    in_footer = rect.y1 > page_height * (1 - height_threshold)
    return is_strip and (in_header or in_footer)


def should_filter_image(pix, rect, page_rect):
    """Run all image filters and report the first one that rejects the image.

    Args:
        pix: PyMuPDF Pixmap of the rendered region.
        rect: Image region on the page.
        page_rect: Full page region.

    Returns:
        tuple: ``(filtered, reason)`` where ``reason`` is ``"header_footer"``,
        ``"cut_text"``, ``"decorative_background"`` or ``None``.
    """
    # Cheapest check first; the OCR-based check only runs when needed.
    if is_header_footer_region(rect, page_rect):
        return True, "header_footer"
    if has_cut_text_at_boundary(pix):
        return True, "cut_text"
    if is_decorative_background(pix):
        return True, "decorative_background"
    return False, None


def _render_if_kept(page, rect, preview_dpi):
    """Render *rect* at the output resolution, or return None if filtered.

    A low-resolution preview is checked with ``keep_figure`` first so that
    blank regions are rejected before the full-resolution render.
    """
    preview = page.get_pixmap(clip=rect, dpi=preview_dpi, colorspace=fitz.csRGB)
    if not keep_figure(preview)[0]:
        return None
    pix = page.get_pixmap(clip=rect, dpi=_RENDER_DPI, colorspace=fitz.csRGB)
    if should_filter_image(pix, rect, page.rect)[0]:
        return None
    return pix


def _image_metadata(img_name, img_dir, kind, metadata, page_no, total_pages,
                    caption, rect):
    """Build the metadata record stored for one extracted image."""
    return {
        "image_file": img_name,
        "image_path": str(Path(img_dir) / img_name),
        "type": kind,
        "source_pdf": metadata['pdf_name'],
        "source_folder": metadata['relative_folder'],
        "full_path": metadata['full_path'],
        "page": page_no,
        "total_pages": total_pages,
        "caption": caption,
        "rect": {
            "x0": round(rect.x0, 2),
            "y0": round(rect.y0, 2),
            "x1": round(rect.x1, 2),
            "y1": round(rect.y1, 2),
        },
    }


def _inside_any_figure(block_rect, figures):
    """Return True if more than half of *block_rect* lies inside a figure."""
    block_area = block_rect.get_area()
    return any(
        (block_rect & fig["rect"]).get_area() > 0.5 * block_area
        for fig in figures
    )


def _is_full_width_rect(r, page_rect):
    """Return True if *r* spans at least 78% of the page width."""
    return r.width >= page_rect.width * 0.78


def _figure_anchor_rect(fig, page_rect):
    """Return a hairline rect next to the caption, used to order the figure.

    The anchor sits just above the caption when the caption is at or below
    the top of the figure, otherwise just below the caption.
    """
    cap = fig["caption_rect"]
    if cap.y0 >= fig["rect"].y0:
        y = max(0.0, cap.y0 - 0.02)
    else:
        y = min(page_rect.height - 0.02, cap.y1 + 0.02)
    return fitz.Rect(cap.x0, y, cap.x1, y + 0.02)


def _extract_captioned_figures(page, page_no, total_pages, img_dir,
                               img_rel_dir, metadata):
    """Save every captioned figure on *page* that passes the filters.

    Returns:
        Tuple ``(kept_figures, image_metadata)``. Each kept figure dict gains
        the keys ``'img_path'`` (Markdown-relative, forward slashes) and
        ``'img_name'``.
    """
    kept = []
    metas = []
    for fig in get_figure_rects(page):
        rect = fig['rect']
        pix = _render_if_kept(page, rect, preview_dpi=100)
        if pix is None:
            continue

        # Numbering counts kept figures only, so file names have no gaps.
        img_name = f"p{page_no:03d}_fig{len(kept):02d}.png"
        pix.save(os.path.join(img_dir, img_name))
        fig['img_path'] = os.path.join(img_rel_dir, img_name).replace("\\", "/")
        fig['img_name'] = img_name
        kept.append(fig)
        metas.append(_image_metadata(
            img_name, img_dir, "figure", metadata, page_no, total_pages,
            fig.get('caption_text', ''), rect,
        ))
    return kept, metas


def _collect_page_items(page, page_no, total_pages, figures, img_dir,
                        img_rel_dir, metadata):
    """Collect the orderable output items (figures, text, photos) of a page.

    Uncaptioned raster images are extracted only when the page contains no
    caption at all, and only when they are large enough, not extremely
    elongated, and pass the image filters.

    Returns:
        Tuple ``(items, image_metadata)`` where each item is a dict with the
        keys ``'kind'``, ``'rect'``, ``'kind_order'`` and either ``'md'`` or
        ``'block'``.
    """
    page_rect = page.rect
    caption_present = any(
        CAPTION_PATTERN.search(tb[4] or "") for tb in page.get_text("blocks")
    )

    items = []
    metas = []

    for fig in figures:
        md = (
            f"\n![{fig.get('caption_text', 'Figure')}]({fig['img_path']})\n"
            f"*{fig.get('caption_text', '')}*\n\n"
        )
        items.append({
            "kind": "figure",
            "rect": _figure_anchor_rect(fig, page_rect),
            "kind_order": 0,
            "md": md,
        })

    page_area = page_rect.get_area()
    photo_idx = 0
    for block in page.get_text("dict")["blocks"]:
        block_type = block.get("type")
        if block_type not in (0, 1):
            continue
        block_rect = fitz.Rect(block["bbox"])
        if _inside_any_figure(block_rect, figures):
            continue

        if block_type == 0:
            items.append({
                "kind": "text",
                "rect": block_rect,
                "kind_order": 2,
                "block": block,
            })
            continue

        # Raster image block.
        if caption_present:
            continue
        if block_rect.get_area() < page_area * 0.005:
            continue
        ratio = block_rect.width / max(1.0, block_rect.height)
        if ratio < 0.25 or ratio > 4.0:
            continue
        pix = _render_if_kept(page, block_rect, preview_dpi=80)
        if pix is None:
            continue

        img_name = f"p{page_no:03d}_photo{photo_idx:02d}.png"
        pix.save(os.path.join(img_dir, img_name))
        rel = os.path.join(img_rel_dir, img_name).replace("\\", "/")
        md = (
            f'\n![Photo]({rel})\n'
            f'*Page {page_no} Photo*\n\n'
        )
        items.append({
            "kind": "raster",
            "rect": block_rect,
            "kind_order": 1,
            "md": md,
        })
        # Metadata for an image without a caption.
        metas.append(_image_metadata(
            img_name, img_dir, "photo", metadata, page_no, total_pages,
            "", block_rect,
        ))
        photo_idx += 1

    return items, metas


def _build_order_key(items, page_rect):
    """Return a sort key function that puts *items* into reading order.

    Single-column pages are ordered top-to-bottom, then left-to-right. A page
    is treated as two-column when at least three column-sized text blocks
    start on each side of the page centre. The column zone is then split into
    segments at full-width text blocks; within a segment the left column
    comes first, then the right column, then full-width items.
    """
    text_items = [it for it in items if it["kind"] == "text"]
    page_w = page_rect.width
    mid = page_w / 2.0

    candidates = [
        it for it in text_items
        if not _is_full_width_rect(it["rect"], page_rect)
        and it["rect"].width >= page_w * 0.2
    ]
    left = [it for it in candidates if it["rect"].x0 < mid * 0.95]
    right = [it for it in candidates if it["rect"].x0 > mid * 1.05]

    if not (len(left) >= 3 and len(right) >= 3):
        def single_column_key(it):
            r = it["rect"]
            return (r.y0, r.x0, it["kind_order"])
        return single_column_key

    column_items = left + right
    col_y0 = min(it["rect"].y0 for it in column_items)
    col_y1 = max(it["rect"].y1 for it in column_items)
    seps = sorted({
        it["rect"].y0 for it in text_items
        if col_y0 < it["rect"].y0 < col_y1
        and _is_full_width_rect(it["rect"], page_rect)
    })

    def two_column_key(it):
        r = it["rect"]
        if r.y1 <= col_y0:  # entirely above the column zone
            return (0, r.y0, r.x0, it["kind_order"])
        if r.y0 >= col_y1:  # entirely below the column zone
            return (2, r.y0, r.x0, it["kind_order"])
        # Count only separators strictly above the item. A separator must
        # close the segment above it; counting it into the segment below
        # (the former `>=` comparison) emitted full-width headings after the
        # columns they introduce.
        seg = bisect_left(seps, r.y0)
        if _is_full_width_rect(r, page_rect):
            col = 2
        else:
            col = 0 if r.x0 < mid else 1
        return (1, seg, col, r.y0, r.x0, it["kind_order"])

    return two_column_key


def extract_pdf_content(pdf_path, output_md_path, img_dir, metadata):
    """Extract the content of one PDF into a Markdown file plus PNG images.

    Args:
        pdf_path: Path of the PDF file.
        output_md_path: Path of the Markdown file to write.
        img_dir: Folder in which extracted images are saved (created if
            missing).
        metadata: Dict with at least the keys ``'pdf_name'``,
            ``'relative_folder'`` and ``'full_path'``.

    Returns:
        list: Metadata dicts for every extracted image.
    """
    os.makedirs(img_dir, exist_ok=True)
    image_metadata_list = []
    img_rel_dir = os.path.basename(img_dir)

    doc = fitz.open(pdf_path)
    try:
        total_pages = len(doc)
        with open(output_md_path, "w", encoding="utf-8") as md_file:
            # Front-matter style metadata header.
            md_file.write("---\n")
            md_file.write(f"source_pdf: {metadata['pdf_name']}\n")
            md_file.write(f"source_folder: {metadata['relative_folder']}\n")
            md_file.write(f"total_pages: {total_pages}\n")
            md_file.write(f"extracted_at: {datetime.now().isoformat()}\n")
            md_file.write("---\n\n")
            md_file.write(f"# {metadata['pdf_name']}\n\n")

            for page_no, page in enumerate(doc, start=1):
                md_file.write(f"\n## Page {page_no}\n\n")

                figures, figure_metas = _extract_captioned_figures(
                    page, page_no, total_pages, img_dir, img_rel_dir, metadata
                )
                image_metadata_list.extend(figure_metas)

                items, photo_metas = _collect_page_items(
                    page, page_no, total_pages, figures, img_dir,
                    img_rel_dir, metadata
                )
                image_metadata_list.extend(photo_metas)

                # Sort into reading order.
                items.sort(key=_build_order_key(items, page.rect))

                for it in items:
                    if it["kind"] in ("figure", "raster"):
                        md_file.write(it["md"])
                        continue
                    for line in it["block"].get("lines", []):
                        for span in line.get("spans", []):
                            md_file.write(span.get("text", "") + " ")
                        md_file.write("\n")
                    md_file.write("\n")
    finally:
        # Release the document even when extraction fails part-way.
        doc.close()

    return image_metadata_list


def process_all_pdfs(base_dir=None, output_base=None):
    """Recursively process every PDF below the source directory.

    The folder structure of the source tree is reproduced under the output
    directory. Afterwards ``image_metadata.json`` (all image records) and
    ``extraction_summary.json`` (statistics and failures) are written to the
    output directory. A failure in one PDF is recorded and does not stop the
    run.

    Args:
        base_dir: Source directory; defaults to ``BASE_DIR``.
        output_base: Output directory; defaults to ``OUTPUT_BASE``.
    """
    base_dir = BASE_DIR if base_dir is None else Path(base_dir)
    output_base = OUTPUT_BASE if output_base is None else Path(output_base)

    output_base.mkdir(parents=True, exist_ok=True)

    all_image_metadata = []
    stats = {
        "total_pdfs": 0,
        "success": 0,
        "failed": 0,
        "total_images": 0
    }
    failed_files = []

    print("=" * 60)
    print("PDF 추출 시작")
    print(f"원본 폴더: {base_dir}")
    print(f"출력 폴더: {output_base}")
    print("=" * 60)

    pdf_files = list(base_dir.rglob("*.pdf"))
    stats["total_pdfs"] = len(pdf_files)
    print(f"\n총 {len(pdf_files)}개 PDF 발견\n")

    for idx, pdf_path in enumerate(pdf_files, 1):
        try:
            relative_path = pdf_path.relative_to(base_dir)
            relative_folder = str(relative_path.parent)
            if relative_folder == ".":
                relative_folder = ""

            pdf_stem = pdf_path.stem

            # Mirror the source folder structure in the output tree.
            output_folder = output_base / relative_path.parent
            output_folder.mkdir(parents=True, exist_ok=True)
            output_md = output_folder / f"{pdf_stem}.md"
            img_folder = output_folder / f"{pdf_stem}_img"

            metadata = {
                "pdf_name": pdf_path.name,
                "pdf_stem": pdf_stem,
                "relative_folder": relative_folder,
                "full_path": str(relative_path),
            }

            print(f"[{idx}/{len(pdf_files)}] {relative_path}")

            image_metas = extract_pdf_content(
                str(pdf_path),
                str(output_md),
                str(img_folder),
                metadata
            )
            all_image_metadata.extend(image_metas)
            stats["success"] += 1
            stats["total_images"] += len(image_metas)
            print(f" ✓ 완료 (이미지 {len(image_metas)}개)")
        except Exception as e:
            # Top-level boundary: record the failure and continue with the
            # remaining files.
            stats["failed"] += 1
            failed_files.append({
                "file": str(pdf_path),
                "error": str(e)
            })
            print(f" ✗ 실패: {e}")

    # Save the metadata of all images.
    meta_output_path = output_base / "image_metadata.json"
    with open(meta_output_path, "w", encoding="utf-8") as f:
        json.dump(all_image_metadata, f, ensure_ascii=False, indent=2)

    # Save the processing summary.
    summary = {
        "processed_at": datetime.now().isoformat(),
        "source_dir": str(base_dir),
        "output_dir": str(output_base),
        "statistics": stats,
        "failed_files": failed_files
    }
    summary_path = output_base / "extraction_summary.json"
    with open(summary_path, "w", encoding="utf-8") as f:
        json.dump(summary, f, ensure_ascii=False, indent=2)

    print("\n" + "=" * 60)
    print("추출 완료!")
    print("=" * 60)
    print(f"총 PDF: {stats['total_pdfs']}개")
    print(f"성공: {stats['success']}개")
    print(f"실패: {stats['failed']}개")
    print(f"추출된 이미지: {stats['total_images']}개")
    print(f"\n이미지 메타데이터: {meta_output_path}")
    print(f"처리 요약: {summary_path}")

    if failed_files:
        print("\n실패한 파일:")
        for entry in failed_files:
            print(f" - {entry['file']}: {entry['error']}")


if __name__ == "__main__":
    process_all_pdfs()