Files
test/converters/pipeline/step2_extract.py

789 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
extract_1_v2.py
PDF에서 텍스트(md)와 이미지(png)를 추출
- 하위 폴더 구조 유지
- 이미지 메타데이터 JSON 생성 (폴더경로, 파일명, 페이지, 위치, 캡션 등)
"""
import fitz # PyMuPDF
import os
import re
import json
import numpy as np
from pathlib import Path
from datetime import datetime
from PIL import Image
import io
# ===== OCR setup (optional) =====
# pytesseract is an optional dependency: when it cannot be imported the
# OCR-based filters are switched off through TESSERACT_AVAILABLE.
_TESSERACT_EXE = r"C:\Program Files\Tesseract-OCR\tesseract.exe"
try:
    import pytesseract
    # Only point pytesseract at the Windows install location when that file
    # really exists; otherwise keep pytesseract's default so a tesseract
    # binary found on PATH continues to work.
    if os.path.isfile(_TESSERACT_EXE):
        pytesseract.pytesseract.tesseract_cmd = _TESSERACT_EXE
    TESSERACT_AVAILABLE = True
except ImportError:
    TESSERACT_AVAILABLE = False
    print("[INFO] pytesseract 미설치 - 텍스트 잘림 필터 비활성화")
# ===== Path configuration =====
BASE_DIR = Path(r"D:\for python\geulbeot-light\geulbeot-light\00.test\hwpx\out") # location of the source PDFs
OUTPUT_BASE = Path(r"D:\for python\geulbeot-light\geulbeot-light\00.test\hwpx\out\out") # location of the output
# Figure-caption detector (case-insensitive). It matches text that starts,
# after optional whitespace and an optional opening bracket (< [ ( {), with
# "그림", "figure" or "fig", followed by an optional dot, an optional second
# opening bracket, a number (leading zeros allowed) and an optional "-N" suffix.
CAPTION_PATTERN = re.compile(
r'^\s*(?:[<\[\(\{]\s*)?(그림|figure|fig)\s*\.?\s*(?:[<\[\(\{]\s*)?0*\d+(?:\s*[-]\s*\d+)?',
re.IGNORECASE
)
def get_figure_rects(page):
    """
    Locate figure regions on a page by clustering graphics around captions.

    Text blocks whose text matches CAPTION_PATTERN seed one cluster each.
    Vector drawing rects and image-block rects are then attached greedily to
    the nearest cluster until no more rects can be attached. The union of the
    attached rects (padded by 5 units, clamped to the page and trimmed so it
    does not overlap the caption) becomes the figure rectangle.

    Args:
        page: page object providing ``rect``, ``get_text`` and
            ``get_drawings`` (a PyMuPDF page in this file).

    Returns:
        list of dicts with the keys ``'rect'`` (fitz.Rect of the figure),
        ``'caption_index'`` (index of the caption in
        ``page.get_text("blocks")``), ``'caption_rect'`` and
        ``'caption_text'`` (stripped caption text). Empty when the page has
        no caption or no graphics could be attached.
    """
    page_rect = page.rect
    page_area = page_rect.get_area()
    blocks = page.get_text("blocks")

    captions = [
        {'rect': fitz.Rect(b[:4]), 'index': i, 'text': b[4]}
        for i, b in enumerate(blocks)
        if CAPTION_PATTERN.search(b[4])
    ]
    if not captions:
        return []

    # Candidate graphics: vector drawings, minus rects taller than a third of
    # the page but narrower than 5 units and rects wider than 90% of the page
    # (presumably rules / borders -- confirm with sample documents).
    remaining_drawings = []
    for drawing in page.get_drawings():
        r = drawing["rect"]
        if r.height > page_rect.height / 3 and r.width < 5:
            continue
        if r.width > page_rect.width * 0.9:
            continue
        remaining_drawings.append(r)

    # ... plus image blocks (type 1) covering at least 1% of the page area.
    for b in page.get_text("dict")["blocks"]:
        if b.get("type") == 1:
            ir = fitz.Rect(b["bbox"])
            if ir.get_area() < page_area * 0.01:
                continue
            remaining_drawings.append(ir)

    # Each cluster starts with the caption rect itself at position 0.
    caption_clusters = {cap['index']: [cap['rect']] for cap in captions}

    def is_text_between(r1, r2, text_blocks):
        """True when a text block of 20+ characters lies in the union of r1
        and r2 without touching either of them."""
        if r1.intersects(r2):
            return False
        union = r1 | r2
        for b in text_blocks:
            b_rect = fitz.Rect(b[:4])
            if len(b[4].strip()) < 20:
                continue
            if not b_rect.intersects(union):
                continue
            if b_rect.intersects(r1) or b_rect.intersects(r2):
                continue
            return True
        return False

    def rect_distance(a, b):
        """0 for intersecting rects; otherwise the larger axis gap (+0.1 so
        it never ties with an intersection), or inf when either gap is 150
        units or more."""
        if a.intersects(b):
            return 0
        x_dist = 0
        if a.x1 < b.x0:
            x_dist = b.x0 - a.x1
        elif a.x0 > b.x1:
            x_dist = a.x0 - b.x1
        y_dist = 0
        if a.y1 < b.y0:
            y_dist = b.y0 - a.y1
        elif a.y0 > b.y1:
            y_dist = a.y0 - b.y1
        if x_dist < 150 and y_dist < 150:
            return max(x_dist, y_dist) + 0.1
        return float('inf')

    # Greedy growth: repeat until a full pass attaches nothing, because a
    # rect attached in one pass can bring further rects within reach.
    changed = True
    while changed:
        changed = False
        to_remove = []
        for d_rect in remaining_drawings:
            best_cluster_key = None
            min_dist = float('inf')
            for cap_index, cluster_rects in caption_clusters.items():
                for r in cluster_rects:
                    dist = rect_distance(d_rect, r)
                    if dist < min_dist:
                        if not is_text_between(r, d_rect, blocks):
                            min_dist = dist
                            best_cluster_key = cap_index
                if min_dist == 0:
                    # An intersection cannot be improved upon.
                    break
            if best_cluster_key is not None and min_dist < 150:
                caption_clusters[best_cluster_key].append(d_rect)
                to_remove.append(d_rect)
                changed = True
        for r in to_remove:
            remaining_drawings.remove(r)

    figure_regions = []
    for cap in captions:
        content_rects = caption_clusters[cap['index']][1:]
        if not content_rects:
            continue

        # Copy the first rect so the padding below does not mutate a rect
        # that is still referenced from the cluster list.
        union_rect = fitz.Rect(content_rects[0])
        for r in content_rects[1:]:
            union_rect = union_rect | r

        # Pad by 5 units on every side, clamped to the page.
        union_rect.x0 = max(0, union_rect.x0 - 5)
        union_rect.x1 = min(page_rect.width, union_rect.x1 + 5)
        union_rect.y0 = max(0, union_rect.y0 - 5)
        union_rect.y1 = min(page_rect.height, union_rect.y1 + 5)

        # Keep the caption out of the clip: trim the side facing the caption.
        cap_rect = cap['rect']
        if cap_rect.y0 + cap_rect.height / 2 < union_rect.y0 + union_rect.height / 2:
            # Caption centre is above the figure centre.
            if union_rect.y0 < cap_rect.y1:
                union_rect.y0 = cap_rect.y1 + 2
        else:
            if union_rect.y1 > cap_rect.y0:
                union_rect.y1 = cap_rect.y0 - 2

        # Drop regions under 1% of the page and thin strips spanning more
        # than 60% of the page in the other dimension.
        if union_rect.get_area() < page_area * 0.01:
            continue
        if union_rect.height < 20 and union_rect.width > page_rect.width * 0.6:
            continue
        if union_rect.width < 20 and union_rect.height > page_rect.height * 0.6:
            continue

        figure_regions.append({
            'rect': union_rect,
            'caption_index': cap['index'],
            'caption_rect': cap['rect'],
            'caption_text': cap['text'].strip(),
        })
    return figure_regions
def pixmap_metrics(pix):
    """
    Compute simple content statistics for a rendered pixmap.

    Args:
        pix: object exposing ``samples`` (raw bytes), ``alpha``, ``height``
            and ``width``. The buffer is read as 8-bit samples with 4
            channels per pixel when ``alpha`` is truthy and 3 otherwise;
            only the first three channels are used.

    Returns:
        tuple ``(nonwhite_ratio, edge_ratio, var)`` of floats:
        - nonwhite_ratio: share of pixels whose gray value is <= 245
        - edge_ratio: share of positions whose summed horizontal and
          vertical gray difference exceeds 40
        - var: variance of the gray values
        An empty pixmap yields ``(0.0, 0.0, 0.0)``, and a pixmap that is only
        one pixel wide or high yields ``edge_ratio == 0.0`` instead of NaN.
    """
    channels = 4 if pix.alpha else 3
    rgb = np.frombuffer(pix.samples, dtype=np.uint8)
    rgb = rgb.reshape(pix.height, pix.width, channels)[:, :, :3]
    gray = (0.299 * rgb[:, :, 0] + 0.587 * rgb[:, :, 1] + 0.114 * rgb[:, :, 2]).astype(np.uint8)

    # Nothing to measure; avoid mean()/var() of an empty array (NaN + warning).
    if gray.size == 0:
        return 0.0, 0.0, 0.0

    nonwhite_ratio = float(1.0 - (gray > 245).mean())

    # Widen to int16 so that differences of uint8 values cannot wrap around.
    signed = gray.astype(np.int16)
    gx = np.abs(np.diff(signed, axis=1))
    gy = np.abs(np.diff(signed, axis=0))
    # Crop both gradients to their common (H-1, W-1) shape before adding.
    edge = (gx[:-1, :] + gy[:, :-1]) > 40
    # A 1-pixel-wide/high image has no gradient positions at all.
    edge_ratio = float(edge.mean()) if edge.size else 0.0

    var = float(gray.var())
    return nonwhite_ratio, edge_ratio, var
def keep_figure(pix):
    """
    Decide whether a rendered region has enough content to keep as a figure.

    A region is rejected when it is almost blank (non-white ratio below
    0.004), or when it is faint and flat at once (non-white ratio below
    0.012, edge ratio below 0.004 and gray variance below 20).

    Returns:
        tuple ``(keep, nonwhite_ratio, edge_ratio, var)`` where ``keep`` is a
        bool and the remaining values come from ``pixmap_metrics``.
    """
    nonwhite_ratio, edge_ratio, var = pixmap_metrics(pix)
    almost_blank = nonwhite_ratio < 0.004
    faint_and_flat = nonwhite_ratio < 0.012 and edge_ratio < 0.004 and var < 20
    keep = not (almost_blank or faint_and_flat)
    return keep, nonwhite_ratio, edge_ratio, var
# ===== Additional image filter functions (v2.1) =====
def pix_to_pil(pix):
    """Convert a PyMuPDF Pixmap to a PIL Image by round-tripping through PNG bytes."""
    png_buffer = io.BytesIO(pix.tobytes("png"))
    return Image.open(png_buffer)
def has_cut_text_at_boundary(pix, margin=5):
    """
    Detect text that appears to be cut off at the image border.

    The image is run through OCR and every recognised item of at least two
    characters is checked: if its box lies within ``margin`` pixels of an
    image edge, the text is considered cut.

    Args:
        pix: PyMuPDF Pixmap
        margin: tolerance in pixels from each border (default 5)

    Returns:
        bool: True when text touches a border. False when no text does, when
        OCR is unavailable, or when OCR fails (the image is then kept).
    """
    if not TESSERACT_AVAILABLE:
        # Without OCR this filter is disabled.
        return False

    try:
        image = pix_to_pil(pix)
        img_w, img_h = image.size
        ocr = pytesseract.image_to_data(image, lang='kor+eng', output_type=pytesseract.Output.DICT)

        for idx, raw in enumerate(ocr['text']):
            if len(str(raw).strip()) < 2:
                # Too short to be meaningful.
                continue

            left = ocr['left'][idx]
            top = ocr['top'][idx]
            box_w = ocr['width'][idx]
            box_h = ocr['height'][idx]

            # Left, right, top and bottom borders, in that order. The top
            # check only fires for boxes lower than 30% of the image height.
            if (
                left <= margin
                or left + box_w >= img_w - margin
                or (top <= margin and box_h < img_h * 0.3)
                or top + box_h >= img_h - margin
            ):
                return True
        return False
    except Exception:
        # Best effort: an OCR failure lets the image pass this filter.
        return False
def is_decorative_background(pix, edge_threshold=0.02, color_var_threshold=500):
    """
    Detect a decorative image (background pattern, possibly with text).

    An image counts as decorative when its edge ratio is below
    ``edge_threshold`` and its gray variance is below
    ``color_var_threshold``, as measured by ``pixmap_metrics``.

    Args:
        pix: PyMuPDF Pixmap
        edge_threshold: edge-ratio limit (default 0.02 = 2%)
        color_var_threshold: gray-variance limit

    Returns:
        bool: True for a decorative background; False otherwise or when the
        metrics cannot be computed.
    """
    try:
        _, edge_ratio, var = pixmap_metrics(pix)

        if edge_ratio < edge_threshold and var < color_var_threshold:
            # Extra OCR look at the simple image.
            # NOTE(review): as reconstructed from the original statement
            # order, both the OCR branch and the fall-through return True,
            # so the OCR outcome does not change the result -- confirm the
            # intended behaviour before removing or tightening this check.
            if TESSERACT_AVAILABLE:
                try:
                    img = pix_to_pil(pix)
                    text = pytesseract.image_to_string(img, lang='kor+eng').strip()
                    if len(text) > 3 and edge_ratio < 0.015:
                        return True
                except Exception:
                    # Best effort only. Unlike a bare ``except:``, this lets
                    # KeyboardInterrupt / SystemExit propagate.
                    pass
            return True

        return False
    except Exception:
        return False
def is_header_footer_region(rect, page_rect, height_threshold=0.12, strip_threshold=0.15):
    """
    Detect an image strip located in the header or footer band of a page.

    The region qualifies when it is a low strip (height below
    ``strip_threshold`` of the page height) AND either its top edge lies in
    the top ``height_threshold`` share of the page or its bottom edge lies in
    the bottom ``height_threshold`` share.

    Args:
        rect: image region; must expose ``y0``, ``y1`` and ``height``
        page_rect: full page region; must expose ``height``
        height_threshold: share of the page height treated as header/footer
            band (default 0.12)
        strip_threshold: maximum region height, as a share of the page
            height, for it to count as a strip (default 0.15)

    Returns:
        bool: True when the region is a header/footer strip.
    """
    page_height = page_rect.height

    # Both original branches required the same "low strip" condition.
    if not rect.height < page_height * strip_threshold:
        return False

    in_header_band = rect.y0 < page_height * height_threshold
    in_footer_band = rect.y1 > page_height * (1 - height_threshold)
    return in_header_band or in_footer_band
def should_filter_image(pix, rect, page_rect):
    """
    Run all image filters and report the first one that rejects the image.

    Args:
        pix: PyMuPDF Pixmap
        rect: image region
        page_rect: full page region

    Returns:
        tuple: ``(True, reason)`` with reason ``"header_footer"``,
        ``"cut_text"`` or ``"decorative_background"`` when a filter matches,
        otherwise ``(False, None)``.
    """
    # Evaluated lazily and in this order, so later checks are skipped as soon
    # as an earlier one matches.
    checks = (
        ("header_footer", lambda: is_header_footer_region(rect, page_rect)),
        ("cut_text", lambda: has_cut_text_at_boundary(pix)),
        ("decorative_background", lambda: is_decorative_background(pix)),
    )
    for reason, matches in checks:
        if matches():
            return True, reason
    return False, None
def extract_pdf_content(pdf_path, output_md_path, img_dir, metadata):
    """
    Extract one PDF into a Markdown file plus PNG images.

    For every page the captioned figures found by ``get_figure_rects`` are
    rendered and filtered. On pages where no text block matches
    CAPTION_PATTERN, standalone image blocks are rendered as photos. Text
    blocks, figures and photos are then sorted into reading order
    (single- or two-column) and written to the Markdown file.

    Args:
        pdf_path: path of the PDF file
        output_md_path: path of the Markdown file to write
        img_dir: directory that receives the PNG files (created if missing)
        metadata: dict with the keys ``'pdf_name'``, ``'relative_folder'``
            and ``'full_path'``

    Returns:
        list of metadata dicts, one per saved image.
    """

    def inside_any_figure(block_rect, figures):
        """True when more than half of the block lies inside one figure rect."""
        for fig in figures:
            intersect = block_rect & fig["rect"]
            if intersect.get_area() > 0.5 * block_rect.get_area():
                return True
        return False

    def is_full_width_rect(r, page_rect):
        """True when the rect spans at least 78% of the page width."""
        return r.width >= page_rect.width * 0.78

    def figure_anchor_rect(fig, page_rect):
        """Thin rect next to the caption, used only to position the figure
        in the reading order: just above the caption when the caption is not
        above the figure's top edge, just below it otherwise."""
        cap = fig["caption_rect"]
        rect = fig["rect"]
        if cap.y0 >= rect.y0:
            y = max(0.0, cap.y0 - 0.02)
        else:
            y = min(page_rect.height - 0.02, cap.y1 + 0.02)
        return fitz.Rect(cap.x0, y, cap.x1, y + 0.02)

    def seg_index(y0, separators):
        """Number of leading separators (sorted ascending) that are <= y0."""
        n = 0
        for s in separators:
            if y0 >= s:
                n += 1
            else:
                break
        return n

    def image_record(img_name, kind, page_no, total_pages, caption, r):
        """Build the metadata entry for one saved image."""
        return {
            "image_file": img_name,
            "image_path": str(Path(img_dir) / img_name),
            "type": kind,
            "source_pdf": metadata['pdf_name'],
            "source_folder": metadata['relative_folder'],
            "full_path": metadata['full_path'],
            "page": page_no,
            "total_pages": total_pages,
            "caption": caption,
            "rect": {
                "x0": round(r.x0, 2),
                "y0": round(r.y0, 2),
                "x1": round(r.x1, 2),
                "y1": round(r.y1, 2)
            }
        }

    os.makedirs(img_dir, exist_ok=True)
    img_rel_dir = os.path.basename(img_dir)
    image_metadata_list = []

    doc = fitz.open(pdf_path)
    try:
        total_pages = len(doc)
        with open(output_md_path, "w", encoding="utf-8") as md_file:
            # Front-matter style metadata header.
            md_file.write("---\n")
            md_file.write(f"source_pdf: {metadata['pdf_name']}\n")
            md_file.write(f"source_folder: {metadata['relative_folder']}\n")
            md_file.write(f"total_pages: {total_pages}\n")
            md_file.write(f"extracted_at: {datetime.now().isoformat()}\n")
            md_file.write("---\n\n")
            md_file.write(f"# {metadata['pdf_name']}\n\n")

            for page_num, page in enumerate(doc):
                page_no = page_num + 1
                md_file.write(f"\n## Page {page_no}\n\n")

                # ---- captioned figures ----
                kept_figures = []
                for fig in get_figure_rects(page):
                    rect = fig['rect']
                    # Cheap low-resolution render for the blank check first.
                    pix_preview = page.get_pixmap(clip=rect, dpi=100, colorspace=fitz.csRGB)
                    if not keep_figure(pix_preview)[0]:
                        continue
                    pix = page.get_pixmap(clip=rect, dpi=150, colorspace=fitz.csRGB)
                    # Additional filters (v2.1).
                    if should_filter_image(pix, rect, page.rect)[0]:
                        continue

                    # Numbering counts kept figures only, so it has no gaps.
                    img_name = f"p{page_no:03d}_fig{len(kept_figures):02d}.png"
                    pix.save(os.path.join(img_dir, img_name))
                    fig['img_path'] = os.path.join(img_rel_dir, img_name).replace("\\", "/")
                    fig['img_name'] = img_name
                    kept_figures.append(fig)
                    image_metadata_list.append(image_record(
                        img_name, "figure", page_no, total_pages,
                        fig.get('caption_text', ''), rect,
                    ))
                figure_regions = kept_figures

                caption_present = any(
                    CAPTION_PATTERN.search((tb[4] or "")) for tb in page.get_text("blocks")
                )
                uncaptioned_idx = 0
                items = []

                for fig in figure_regions:
                    md = (
                        f"\n![{fig.get('caption_text', 'Figure')}]({fig['img_path']})\n"
                        f"*{fig.get('caption_text', '')}*\n\n"
                    )
                    items.append({
                        "kind": "figure",
                        "rect": figure_anchor_rect(fig, page.rect),
                        "kind_order": 0,
                        "md": md,
                    })

                # ---- text blocks and uncaptioned images ----
                for block in page.get_text("dict")["blocks"]:
                    block_rect = fitz.Rect(block["bbox"])
                    block_type = block.get("type")

                    if block_type == 0:
                        # Text already covered by a saved figure is skipped.
                        if inside_any_figure(block_rect, figure_regions):
                            continue
                        items.append({
                            "kind": "text",
                            "rect": block_rect,
                            "kind_order": 2,
                            "block": block,
                        })
                        continue

                    if block_type != 1:
                        continue

                    if inside_any_figure(block_rect, figure_regions):
                        continue
                    # Standalone images are only extracted on caption-free pages.
                    if caption_present:
                        continue
                    if block_rect.get_area() < page.rect.get_area() * 0.005:
                        continue
                    # Skip extreme aspect ratios (below 1:4 or above 4:1).
                    ratio = block_rect.width / max(1.0, block_rect.height)
                    if ratio < 0.25 or ratio > 4.0:
                        continue
                    pix_preview = page.get_pixmap(
                        clip=block_rect, dpi=80, colorspace=fitz.csRGB
                    )
                    if not keep_figure(pix_preview)[0]:
                        continue
                    pix = page.get_pixmap(
                        clip=block_rect, dpi=150, colorspace=fitz.csRGB
                    )
                    # Additional filters (v2.1).
                    if should_filter_image(pix, block_rect, page.rect)[0]:
                        continue

                    img_name = f"p{page_no:03d}_photo{uncaptioned_idx:02d}.png"
                    pix.save(os.path.join(img_dir, img_name))
                    rel = os.path.join(img_rel_dir, img_name).replace("\\", "/")
                    md = (
                        f'\n![Photo]({rel})\n'
                        f'*Page {page_no} Photo*\n\n'
                    )
                    items.append({
                        "kind": "raster",
                        "rect": block_rect,
                        "kind_order": 1,
                        "md": md,
                    })
                    image_metadata_list.append(image_record(
                        img_name, "photo", page_no, total_pages, "", block_rect,
                    ))
                    uncaptioned_idx += 1

                # ---- reading order ----
                text_items = [it for it in items if it["kind"] == "text"]
                page_w = page.rect.width
                mid = page_w / 2.0

                # Column candidates: text blocks that are neither full-width
                # nor narrower than 20% of the page.
                candidates = [
                    it for it in text_items
                    if not is_full_width_rect(it["rect"], page.rect)
                    and not it["rect"].width < page_w * 0.2
                ]
                left = [it for it in candidates if it["rect"].x0 < mid * 0.95]
                right = [it for it in candidates if it["rect"].x0 > mid * 1.05]
                two_cols = len(left) >= 3 and len(right) >= 3

                col_y0 = None
                col_y1 = None
                seps = []
                if two_cols and left and right:
                    col_y0 = min(
                        min(it["rect"].y0 for it in left),
                        min(it["rect"].y0 for it in right),
                    )
                    col_y1 = max(
                        max(it["rect"].y1 for it in left),
                        max(it["rect"].y1 for it in right),
                    )
                    # Full-width text inside the column area splits it into
                    # vertical segments that are read one after another.
                    for it in text_items:
                        r = it["rect"]
                        if col_y0 < r.y0 < col_y1 and is_full_width_rect(r, page.rect):
                            seps.append(r.y0)
                    seps = sorted(set(seps))

                def order_key(it):
                    r = it["rect"]
                    if not two_cols:
                        return (r.y0, r.x0, it["kind_order"])
                    # Leading element: 0 = above the columns, 1 = inside the
                    # column area, 2 = below. It always differs between the
                    # tuple shapes, so mixed lengths still compare safely.
                    if col_y0 is not None and r.y1 <= col_y0:
                        return (0, r.y0, r.x0, it["kind_order"])
                    if col_y1 is not None and r.y0 >= col_y1:
                        return (2, r.y0, r.x0, it["kind_order"])
                    seg = seg_index(r.y0, seps)
                    if is_full_width_rect(r, page.rect):
                        col = 2
                    else:
                        col = 0 if r.x0 < mid else 1
                    return (1, seg, col, r.y0, r.x0, it["kind_order"])

                items.sort(key=order_key)

                # ---- output ----
                for it in items:
                    if it["kind"] in ("figure", "raster"):
                        md_file.write(it["md"])
                        continue
                    for line in it["block"].get("lines", []):
                        for span in line.get("spans", []):
                            md_file.write(span.get("text", "") + " ")
                        md_file.write("\n")
                    md_file.write("\n")
    finally:
        # Release the PDF handle even when extraction fails part-way.
        doc.close()
    return image_metadata_list
def process_all_pdfs():
    """
    Process every PDF below BASE_DIR recursively.

    The folder structure relative to BASE_DIR is mirrored under OUTPUT_BASE.
    For ``<name>.pdf`` the Markdown goes to ``<name>.md`` and the images to
    ``<name>_img``. PDFs are processed in sorted path order so that progress
    output and the combined metadata are reproducible between runs. A failing
    PDF is recorded and does not stop the batch.

    Afterwards two files are written to OUTPUT_BASE:
    ``image_metadata.json`` (metadata of all extracted images) and
    ``extraction_summary.json`` (statistics and the list of failures).
    """
    OUTPUT_BASE.mkdir(parents=True, exist_ok=True)

    all_image_metadata = []
    stats = {
        "total_pdfs": 0,
        "success": 0,
        "failed": 0,
        "total_images": 0
    }
    failed_files = []

    banner = "=" * 60
    print(banner)
    print("PDF 추출 시작")
    print(f"원본 폴더: {BASE_DIR}")
    print(f"출력 폴더: {OUTPUT_BASE}")
    print(banner)

    # rglob order depends on the filesystem; sort for deterministic output.
    pdf_files = sorted(BASE_DIR.rglob("*.pdf"))
    stats["total_pdfs"] = len(pdf_files)
    print(f"\n{len(pdf_files)}개 PDF 발견\n")

    for idx, pdf_path in enumerate(pdf_files, 1):
        try:
            relative_path = pdf_path.relative_to(BASE_DIR)
            relative_folder = str(relative_path.parent)
            if relative_folder == ".":
                # PDF sits directly in BASE_DIR.
                relative_folder = ""
            pdf_name = pdf_path.name
            pdf_stem = pdf_path.stem

            # Mirror the source folder structure in the output tree.
            output_folder = OUTPUT_BASE / relative_path.parent
            output_folder.mkdir(parents=True, exist_ok=True)
            output_md = output_folder / f"{pdf_stem}.md"
            img_folder = output_folder / f"{pdf_stem}_img"

            metadata = {
                "pdf_name": pdf_name,
                "pdf_stem": pdf_stem,
                "relative_folder": relative_folder,
                "full_path": str(relative_path),
            }

            print(f"[{idx}/{len(pdf_files)}] {relative_path}")
            image_metas = extract_pdf_content(
                str(pdf_path),
                str(output_md),
                str(img_folder),
                metadata
            )
            all_image_metadata.extend(image_metas)
            stats["success"] += 1
            stats["total_images"] += len(image_metas)
            print(f" ✓ 완료 (이미지 {len(image_metas)}개)")
        except Exception as e:
            # Batch boundary: record the failure and carry on with the next PDF.
            stats["failed"] += 1
            failed_files.append({
                "file": str(pdf_path),
                "error": str(e)
            })
            print(f" ✗ 실패: {e}")

    # Combined image metadata.
    meta_output_path = OUTPUT_BASE / "image_metadata.json"
    with open(meta_output_path, "w", encoding="utf-8") as meta_file:
        json.dump(all_image_metadata, meta_file, ensure_ascii=False, indent=2)

    # Run summary.
    summary = {
        "processed_at": datetime.now().isoformat(),
        "source_dir": str(BASE_DIR),
        "output_dir": str(OUTPUT_BASE),
        "statistics": stats,
        "failed_files": failed_files
    }
    summary_path = OUTPUT_BASE / "extraction_summary.json"
    with open(summary_path, "w", encoding="utf-8") as summary_file:
        json.dump(summary, summary_file, ensure_ascii=False, indent=2)

    print("\n" + banner)
    print("추출 완료!")
    print(banner)
    print(f"총 PDF: {stats['total_pdfs']}")
    print(f"성공: {stats['success']}")
    print(f"실패: {stats['failed']}")
    print(f"추출된 이미지: {stats['total_images']}")
    print(f"\n이미지 메타데이터: {meta_output_path}")
    print(f"처리 요약: {summary_path}")
    if failed_files:
        print("\n실패한 파일:")
        for failure in failed_files:
            print(f" - {failure['file']}: {failure['error']}")
if __name__ == "__main__":
    # Run the batch extraction only when executed as a script, not on import.
    process_all_pdfs()