Cleanup: Deleting 03.Code/업로드용/converters/pipeline/step2_extract.py
This commit is contained in:
@@ -1,791 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
extract_1_v2.py
|
||||
|
||||
PDF에서 텍스트(md)와 이미지(png)를 추출
|
||||
- 하위 폴더 구조 유지
|
||||
- 이미지 메타데이터 JSON 생성 (폴더경로, 파일명, 페이지, 위치, 캡션 등)
|
||||
"""
|
||||
|
||||
import fitz # PyMuPDF
|
||||
import os
|
||||
import re
|
||||
import json
|
||||
import numpy as np
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from PIL import Image
|
||||
import io
|
||||
|
||||
# ===== OCR 설정 (선택적) =====
|
||||
try:
|
||||
import pytesseract
|
||||
import shutil
|
||||
tesseract_path = shutil.which("tesseract")
|
||||
if tesseract_path:
|
||||
pytesseract.pytesseract.tesseract_cmd = tesseract_path
|
||||
TESSERACT_AVAILABLE = True
|
||||
except ImportError:
|
||||
TESSERACT_AVAILABLE = False
|
||||
print("[INFO] pytesseract 미설치 - 텍스트 잘림 필터 비활성화")
|
||||
|
||||
|
||||
CAPTION_PATTERN = re.compile(
|
||||
r'^\s*(?:[<\[\(\{]\s*)?(그림|figure|fig)\s*\.?\s*(?:[<\[\(\{]\s*)?0*\d+(?:\s*[-–]\s*\d+)?',
|
||||
re.IGNORECASE
|
||||
)
|
||||
|
||||
|
||||
def get_figure_rects(page):
|
||||
"""
|
||||
Identifies figure regions based on '<그림 N>' captions and vector drawings.
|
||||
Returns a list of dicts: {'rect': fitz.Rect, 'caption_block': block_index}
|
||||
"""
|
||||
drawings = page.get_drawings()
|
||||
|
||||
blocks = page.get_text("blocks")
|
||||
captions = []
|
||||
|
||||
for i, b in enumerate(blocks):
|
||||
text = b[4]
|
||||
if CAPTION_PATTERN.search(text):
|
||||
captions.append({'rect': fitz.Rect(b[:4]), 'index': i, 'text': text, 'drawings': []})
|
||||
|
||||
if not captions:
|
||||
return []
|
||||
|
||||
filtered_drawings_rects = []
|
||||
for d in drawings:
|
||||
r = d["rect"]
|
||||
if r.height > page.rect.height / 3 and r.width < 5:
|
||||
continue
|
||||
if r.width > page.rect.width * 0.9:
|
||||
continue
|
||||
filtered_drawings_rects.append(r)
|
||||
|
||||
page_area = page.rect.get_area()
|
||||
img_rects = []
|
||||
for b in page.get_text("dict")["blocks"]:
|
||||
if b.get("type") == 1:
|
||||
ir = fitz.Rect(b["bbox"])
|
||||
if ir.get_area() < page_area * 0.01:
|
||||
continue
|
||||
img_rects.append(ir)
|
||||
|
||||
remaining_drawings = filtered_drawings_rects + img_rects
|
||||
caption_clusters = {cap['index']: [cap['rect']] for cap in captions}
|
||||
|
||||
def is_text_between(r1, r2, text_blocks):
|
||||
if r1.intersects(r2):
|
||||
return False
|
||||
union = r1 | r2
|
||||
for b in text_blocks:
|
||||
b_rect = fitz.Rect(b[:4])
|
||||
text_content = b[4]
|
||||
if len(text_content.strip()) < 20:
|
||||
continue
|
||||
if not b_rect.intersects(union):
|
||||
continue
|
||||
if b_rect.intersects(r1) or b_rect.intersects(r2):
|
||||
continue
|
||||
return True
|
||||
return False
|
||||
|
||||
changed = True
|
||||
while changed:
|
||||
changed = False
|
||||
to_remove = []
|
||||
|
||||
for d_rect in remaining_drawings:
|
||||
best_cluster_key = None
|
||||
min_dist = float('inf')
|
||||
|
||||
for cap_index, cluster_rects in caption_clusters.items():
|
||||
for r in cluster_rects:
|
||||
dist = 0
|
||||
if d_rect.intersects(r):
|
||||
dist = 0
|
||||
else:
|
||||
x_dist = 0
|
||||
if d_rect.x1 < r.x0: x_dist = r.x0 - d_rect.x1
|
||||
elif d_rect.x0 > r.x1: x_dist = d_rect.x0 - r.x1
|
||||
|
||||
y_dist = 0
|
||||
if d_rect.y1 < r.y0: y_dist = r.y0 - d_rect.y1
|
||||
elif d_rect.y0 > r.y1: y_dist = d_rect.y0 - r.y1
|
||||
|
||||
if x_dist < 150 and y_dist < 150:
|
||||
dist = max(x_dist, y_dist) + 0.1
|
||||
else:
|
||||
dist = float('inf')
|
||||
|
||||
if dist < min_dist:
|
||||
if not is_text_between(r, d_rect, blocks):
|
||||
min_dist = dist
|
||||
best_cluster_key = cap_index
|
||||
|
||||
if min_dist == 0:
|
||||
break
|
||||
|
||||
if best_cluster_key is not None and min_dist < 150:
|
||||
caption_clusters[best_cluster_key].append(d_rect)
|
||||
to_remove.append(d_rect)
|
||||
changed = True
|
||||
|
||||
for r in to_remove:
|
||||
remaining_drawings.remove(r)
|
||||
|
||||
figure_regions = []
|
||||
|
||||
for cap in captions:
|
||||
cluster_rects = caption_clusters[cap['index']]
|
||||
content_rects = cluster_rects[1:]
|
||||
|
||||
if not content_rects:
|
||||
continue
|
||||
|
||||
union_rect = content_rects[0]
|
||||
for r in content_rects[1:]:
|
||||
union_rect = union_rect | r
|
||||
|
||||
union_rect.x0 = max(0, union_rect.x0 - 5)
|
||||
union_rect.x1 = min(page.rect.width, union_rect.x1 + 5)
|
||||
union_rect.y0 = max(0, union_rect.y0 - 5)
|
||||
union_rect.y1 = min(page.rect.height, union_rect.y1 + 5)
|
||||
|
||||
cap_rect = cap['rect']
|
||||
|
||||
if cap_rect.y0 + cap_rect.height/2 < union_rect.y0 + union_rect.height/2:
|
||||
if union_rect.y0 < cap_rect.y1: union_rect.y0 = cap_rect.y1 + 2
|
||||
else:
|
||||
if union_rect.y1 > cap_rect.y0: union_rect.y1 = cap_rect.y0 - 2
|
||||
|
||||
area = union_rect.get_area()
|
||||
page_area = page.rect.get_area()
|
||||
|
||||
if area < page_area * 0.01:
|
||||
continue
|
||||
|
||||
if union_rect.height < 20 and union_rect.width > page.rect.width * 0.6:
|
||||
continue
|
||||
if union_rect.width < 20 and union_rect.height > page.rect.height * 0.6:
|
||||
continue
|
||||
|
||||
text_blocks = page.get_text("blocks")
|
||||
text_count = 0
|
||||
|
||||
for b in text_blocks:
|
||||
b_rect = fitz.Rect(b[:4])
|
||||
if not b_rect.intersects(union_rect):
|
||||
continue
|
||||
text = b[4].strip()
|
||||
if len(text) < 5:
|
||||
continue
|
||||
text_count += 1
|
||||
|
||||
if text_count < 0:
|
||||
continue
|
||||
|
||||
figure_regions.append({
|
||||
'rect': union_rect,
|
||||
'caption_index': cap['index'],
|
||||
'caption_rect': cap['rect'],
|
||||
'caption_text': cap['text'].strip() # ★ 캡션 텍스트 저장
|
||||
})
|
||||
|
||||
return figure_regions
|
||||
|
||||
|
||||
def pixmap_metrics(pix):
|
||||
arr = np.frombuffer(pix.samples, dtype=np.uint8)
|
||||
c = 4 if pix.alpha else 3
|
||||
arr = arr.reshape(pix.height, pix.width, c)[:, :, :3]
|
||||
gray = (0.299 * arr[:, :, 0] + 0.587 * arr[:, :, 1] + 0.114 * arr[:, :, 2]).astype(np.uint8)
|
||||
white = gray > 245
|
||||
nonwhite_ratio = float(1.0 - white.mean())
|
||||
gx = np.abs(np.diff(gray.astype(np.int16), axis=1))
|
||||
gy = np.abs(np.diff(gray.astype(np.int16), axis=0))
|
||||
edge = (gx[:-1, :] + gy[:, :-1]) > 40
|
||||
edge_ratio = float(edge.mean())
|
||||
var = float(gray.var())
|
||||
return nonwhite_ratio, edge_ratio, var
|
||||
|
||||
|
||||
def keep_figure(pix):
|
||||
nonwhite_ratio, edge_ratio, var = pixmap_metrics(pix)
|
||||
if nonwhite_ratio < 0.004:
|
||||
return False, nonwhite_ratio, edge_ratio, var
|
||||
if nonwhite_ratio < 0.012 and edge_ratio < 0.004 and var < 20:
|
||||
return False, nonwhite_ratio, edge_ratio, var
|
||||
return True, nonwhite_ratio, edge_ratio, var
|
||||
|
||||
|
||||
# ===== 추가 이미지 필터 함수들 (v2.1) =====
|
||||
|
||||
def pix_to_pil(pix):
|
||||
"""PyMuPDF Pixmap을 PIL Image로 변환"""
|
||||
img_data = pix.tobytes("png")
|
||||
return Image.open(io.BytesIO(img_data))
|
||||
|
||||
|
||||
def has_cut_text_at_boundary(pix, margin=5):
|
||||
"""
|
||||
이미지 경계에서 텍스트가 잘렸는지 감지
|
||||
- 이미지 테두리 근처에 텍스트 박스가 있으면 잘린 것으로 판단
|
||||
|
||||
Args:
|
||||
pix: PyMuPDF Pixmap
|
||||
margin: 경계로부터의 여유 픽셀 (기본 5px)
|
||||
|
||||
Returns:
|
||||
bool: 텍스트가 잘렸으면 True
|
||||
"""
|
||||
if not TESSERACT_AVAILABLE:
|
||||
return False # OCR 없으면 필터 비활성화
|
||||
|
||||
try:
|
||||
img = pix_to_pil(pix)
|
||||
width, height = img.size
|
||||
|
||||
# OCR로 텍스트 위치 추출
|
||||
data = pytesseract.image_to_data(img, lang='kor+eng', output_type=pytesseract.Output.DICT)
|
||||
|
||||
for i, text in enumerate(data['text']):
|
||||
text = str(text).strip()
|
||||
if len(text) < 2: # 너무 짧은 텍스트는 무시
|
||||
continue
|
||||
|
||||
x = data['left'][i]
|
||||
y = data['top'][i]
|
||||
w = data['width'][i]
|
||||
h = data['height'][i]
|
||||
|
||||
# 텍스트가 이미지 경계에 너무 가까우면 = 잘린 것
|
||||
# 왼쪽 경계
|
||||
if x <= margin:
|
||||
return True
|
||||
# 오른쪽 경계
|
||||
if x + w >= width - margin:
|
||||
return True
|
||||
# 상단 경계 (헤더 제외를 위해 좀 더 여유)
|
||||
if y <= margin and h < height * 0.3:
|
||||
return True
|
||||
# 하단 경계
|
||||
if y + h >= height - margin:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
# OCR 실패 시 필터 통과 (이미지 유지)
|
||||
return False
|
||||
|
||||
|
||||
def is_decorative_background(pix, edge_threshold=0.02, color_var_threshold=500):
|
||||
"""
|
||||
배경 패턴 + 텍스트만 있는 장식용 이미지인지 감지
|
||||
- 엣지가 적고 (복잡한 도표/사진이 아님)
|
||||
- 색상 다양성이 낮으면 (단순 그라데이션 배경)
|
||||
|
||||
Args:
|
||||
pix: PyMuPDF Pixmap
|
||||
edge_threshold: 엣지 비율 임계값 (기본 0.02 = 2%)
|
||||
color_var_threshold: 색상 분산 임계값
|
||||
|
||||
Returns:
|
||||
bool: 장식용 배경이면 True
|
||||
"""
|
||||
try:
|
||||
nonwhite_ratio, edge_ratio, var = pixmap_metrics(pix)
|
||||
|
||||
# 엣지가 거의 없고 (단순한 이미지)
|
||||
# 색상 분산도 낮으면 (배경 패턴)
|
||||
if edge_ratio < edge_threshold and var < color_var_threshold:
|
||||
# 추가 확인: 텍스트만 있는지 OCR로 체크
|
||||
if TESSERACT_AVAILABLE:
|
||||
try:
|
||||
img = pix_to_pil(pix)
|
||||
text = pytesseract.image_to_string(img, lang='kor+eng').strip()
|
||||
|
||||
# 텍스트가 있고, 이미지가 단순하면 = 텍스트 배경
|
||||
if len(text) > 3 and edge_ratio < 0.015:
|
||||
return True
|
||||
except:
|
||||
pass
|
||||
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def is_header_footer_region(rect, page_rect, height_threshold=0.12):
|
||||
"""
|
||||
헤더/푸터 영역에 있는 이미지인지 감지
|
||||
- 페이지 상단 12% 또는 하단 12%에 위치
|
||||
- 높이가 낮은 strip 형태
|
||||
|
||||
Args:
|
||||
rect: 이미지 영역 (fitz.Rect)
|
||||
page_rect: 페이지 전체 영역 (fitz.Rect)
|
||||
height_threshold: 헤더/푸터 영역 비율 (기본 12%)
|
||||
|
||||
Returns:
|
||||
bool: 헤더/푸터 영역이면 True
|
||||
"""
|
||||
page_height = page_rect.height
|
||||
img_height = rect.height
|
||||
|
||||
# 상단 영역 체크
|
||||
if rect.y0 < page_height * height_threshold:
|
||||
# 높이가 페이지의 15% 미만인 strip이면 헤더
|
||||
if img_height < page_height * 0.15:
|
||||
return True
|
||||
|
||||
# 하단 영역 체크
|
||||
if rect.y1 > page_height * (1 - height_threshold):
|
||||
# 높이가 페이지의 15% 미만인 strip이면 푸터
|
||||
if img_height < page_height * 0.15:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def should_filter_image(pix, rect, page_rect):
|
||||
"""
|
||||
이미지를 필터링해야 하는지 종합 판단
|
||||
|
||||
Args:
|
||||
pix: PyMuPDF Pixmap
|
||||
rect: 이미지 영역
|
||||
page_rect: 페이지 전체 영역
|
||||
|
||||
Returns:
|
||||
tuple: (필터링 여부, 필터링 사유)
|
||||
"""
|
||||
# 1. 헤더/푸터 영역 체크
|
||||
if is_header_footer_region(rect, page_rect):
|
||||
return True, "header_footer"
|
||||
|
||||
# 2. 텍스트 잘림 체크
|
||||
if has_cut_text_at_boundary(pix):
|
||||
return True, "cut_text"
|
||||
|
||||
# 3. 장식용 배경 체크
|
||||
if is_decorative_background(pix):
|
||||
return True, "decorative_background"
|
||||
|
||||
return False, None
|
||||
|
||||
|
||||
def extract_pdf_content(pdf_path, output_md_path, img_dir, metadata):
|
||||
"""
|
||||
PDF 내용 추출
|
||||
|
||||
Args:
|
||||
pdf_path: PDF 파일 경로
|
||||
output_md_path: 출력 MD 파일 경로
|
||||
img_dir: 이미지 저장 폴더
|
||||
metadata: 메타데이터 딕셔너리 (폴더 경로, 파일명 등)
|
||||
|
||||
Returns:
|
||||
image_metadata_list: 추출된 이미지들의 메타데이터 리스트
|
||||
"""
|
||||
os.makedirs(img_dir, exist_ok=True)
|
||||
|
||||
image_metadata_list = [] # ★ 이미지 메타데이터 수집
|
||||
|
||||
doc = fitz.open(pdf_path)
|
||||
total_pages = len(doc)
|
||||
|
||||
with open(output_md_path, "w", encoding="utf-8") as md_file:
|
||||
# ★ 메타데이터 헤더 추가
|
||||
md_file.write(f"---\n")
|
||||
md_file.write(f"source_pdf: {metadata['pdf_name']}\n")
|
||||
md_file.write(f"source_folder: {metadata['relative_folder']}\n")
|
||||
md_file.write(f"total_pages: {total_pages}\n")
|
||||
md_file.write(f"extracted_at: {datetime.now().isoformat()}\n")
|
||||
md_file.write(f"---\n\n")
|
||||
md_file.write(f"# {metadata['pdf_name']}\n\n")
|
||||
|
||||
for page_num, page in enumerate(doc):
|
||||
md_file.write(f"\n## Page {page_num + 1}\n\n")
|
||||
img_rel_dir = os.path.basename(img_dir)
|
||||
|
||||
figure_regions = get_figure_rects(page)
|
||||
|
||||
kept_figures = []
|
||||
for i, fig in enumerate(figure_regions):
|
||||
rect = fig['rect']
|
||||
pix_preview = page.get_pixmap(clip=rect, dpi=100, colorspace=fitz.csRGB)
|
||||
ok, nonwhite_ratio, edge_ratio, var = keep_figure(pix_preview)
|
||||
if not ok:
|
||||
continue
|
||||
|
||||
pix = page.get_pixmap(clip=rect, dpi=150, colorspace=fitz.csRGB)
|
||||
|
||||
# ★ 추가 필터 적용 (v2.1)
|
||||
should_filter, filter_reason = should_filter_image(pix, rect, page.rect)
|
||||
if should_filter:
|
||||
continue
|
||||
|
||||
img_name = f"p{page_num + 1:03d}_fig{len(kept_figures):02d}.png"
|
||||
img_path = os.path.join(img_dir, img_name)
|
||||
pix.save(img_path)
|
||||
|
||||
fig['img_path'] = os.path.join(img_rel_dir, img_name).replace("\\", "/")
|
||||
fig['img_name'] = img_name
|
||||
kept_figures.append(fig)
|
||||
|
||||
# ★ 이미지 메타데이터 수집
|
||||
image_metadata_list.append({
|
||||
"image_file": img_name,
|
||||
"image_path": str(Path(img_dir) / img_name),
|
||||
"type": "figure",
|
||||
"source_pdf": metadata['pdf_name'],
|
||||
"source_folder": metadata['relative_folder'],
|
||||
"full_path": metadata['full_path'],
|
||||
"page": page_num + 1,
|
||||
"total_pages": total_pages,
|
||||
"caption": fig.get('caption_text', ''),
|
||||
"rect": {
|
||||
"x0": round(rect.x0, 2),
|
||||
"y0": round(rect.y0, 2),
|
||||
"x1": round(rect.x1, 2),
|
||||
"y1": round(rect.y1, 2)
|
||||
}
|
||||
})
|
||||
|
||||
figure_regions = kept_figures
|
||||
|
||||
caption_present = any(
|
||||
CAPTION_PATTERN.search((tb[4] or "")) for tb in page.get_text("blocks")
|
||||
)
|
||||
uncaptioned_idx = 0
|
||||
|
||||
items = []
|
||||
|
||||
def inside_any_figure(block_rect, figures):
|
||||
for fig in figures:
|
||||
intersect = block_rect & fig["rect"]
|
||||
if intersect.get_area() > 0.5 * block_rect.get_area():
|
||||
return True
|
||||
return False
|
||||
|
||||
def is_full_width_rect(r, page_rect):
|
||||
return r.width >= page_rect.width * 0.78
|
||||
|
||||
def figure_anchor_rect(fig, page_rect):
|
||||
cap = fig["caption_rect"]
|
||||
rect = fig["rect"]
|
||||
if cap.y0 >= rect.y0:
|
||||
y = max(0.0, cap.y0 - 0.02)
|
||||
else:
|
||||
y = min(page_rect.height - 0.02, cap.y1 + 0.02)
|
||||
return fitz.Rect(cap.x0, y, cap.x1, y + 0.02)
|
||||
|
||||
for fig in figure_regions:
|
||||
anchor = figure_anchor_rect(fig, page.rect)
|
||||
md = (
|
||||
f"\n\n"
|
||||
f"*{fig.get('caption_text', '')}*\n\n"
|
||||
)
|
||||
items.append({
|
||||
"kind": "figure",
|
||||
"rect": anchor,
|
||||
"kind_order": 0,
|
||||
"md": md,
|
||||
})
|
||||
|
||||
raw_blocks = page.get_text("dict")["blocks"]
|
||||
|
||||
for block in raw_blocks:
|
||||
block_rect = fitz.Rect(block["bbox"])
|
||||
|
||||
if block.get("type") == 0:
|
||||
if inside_any_figure(block_rect, figure_regions):
|
||||
continue
|
||||
items.append({
|
||||
"kind": "text",
|
||||
"rect": block_rect,
|
||||
"kind_order": 2,
|
||||
"block": block,
|
||||
})
|
||||
continue
|
||||
|
||||
if block.get("type") == 1:
|
||||
if inside_any_figure(block_rect, figure_regions):
|
||||
continue
|
||||
if caption_present:
|
||||
continue
|
||||
|
||||
page_area = page.rect.get_area()
|
||||
if block_rect.get_area() < page_area * 0.005:
|
||||
continue
|
||||
|
||||
ratio = block_rect.width / max(1.0, block_rect.height)
|
||||
if ratio < 0.25 or ratio > 4.0:
|
||||
continue
|
||||
|
||||
pix_preview = page.get_pixmap(
|
||||
clip=block_rect, dpi=80, colorspace=fitz.csRGB
|
||||
)
|
||||
ok, nonwhite_ratio, edge_ratio, var = keep_figure(pix_preview)
|
||||
if not ok:
|
||||
continue
|
||||
|
||||
pix = page.get_pixmap(
|
||||
clip=block_rect, dpi=150, colorspace=fitz.csRGB
|
||||
)
|
||||
|
||||
# ★ 추가 필터 적용 (v2.1)
|
||||
should_filter, filter_reason = should_filter_image(pix, block_rect, page.rect)
|
||||
if should_filter:
|
||||
continue
|
||||
|
||||
img_name = f"p{page_num + 1:03d}_photo{uncaptioned_idx:02d}.png"
|
||||
img_path = os.path.join(img_dir, img_name)
|
||||
pix.save(img_path)
|
||||
|
||||
rel = os.path.join(img_rel_dir, img_name).replace("\\", "/")
|
||||
r = block_rect
|
||||
md = (
|
||||
f'\n\n'
|
||||
f'*Page {page_num + 1} Photo*\n\n'
|
||||
)
|
||||
|
||||
items.append({
|
||||
"kind": "raster",
|
||||
"rect": block_rect,
|
||||
"kind_order": 1,
|
||||
"md": md,
|
||||
})
|
||||
|
||||
# ★ 캡션 없는 이미지 메타데이터
|
||||
image_metadata_list.append({
|
||||
"image_file": img_name,
|
||||
"image_path": str(Path(img_dir) / img_name),
|
||||
"type": "photo",
|
||||
"source_pdf": metadata['pdf_name'],
|
||||
"source_folder": metadata['relative_folder'],
|
||||
"full_path": metadata['full_path'],
|
||||
"page": page_num + 1,
|
||||
"total_pages": total_pages,
|
||||
"caption": "",
|
||||
"rect": {
|
||||
"x0": round(r.x0, 2),
|
||||
"y0": round(r.y0, 2),
|
||||
"x1": round(r.x1, 2),
|
||||
"y1": round(r.y1, 2)
|
||||
}
|
||||
})
|
||||
|
||||
uncaptioned_idx += 1
|
||||
continue
|
||||
|
||||
# 읽기 순서 정렬
|
||||
text_items = [it for it in items if it["kind"] == "text"]
|
||||
page_w = page.rect.width
|
||||
mid = page_w / 2.0
|
||||
|
||||
candidates = []
|
||||
for it in text_items:
|
||||
r = it["rect"]
|
||||
if is_full_width_rect(r, page.rect):
|
||||
continue
|
||||
if r.width < page_w * 0.2:
|
||||
continue
|
||||
candidates.append(it)
|
||||
|
||||
left = [it for it in candidates if it["rect"].x0 < mid * 0.95]
|
||||
right = [it for it in candidates if it["rect"].x0 > mid * 1.05]
|
||||
two_cols = len(left) >= 3 and len(right) >= 3
|
||||
|
||||
col_y0 = None
|
||||
col_y1 = None
|
||||
seps = []
|
||||
|
||||
if two_cols and left and right:
|
||||
col_y0 = min(
|
||||
min(it["rect"].y0 for it in left),
|
||||
min(it["rect"].y0 for it in right),
|
||||
)
|
||||
col_y1 = max(
|
||||
max(it["rect"].y1 for it in left),
|
||||
max(it["rect"].y1 for it in right),
|
||||
)
|
||||
for it in text_items:
|
||||
r = it["rect"]
|
||||
if col_y0 < r.y0 < col_y1 and is_full_width_rect(r, page.rect):
|
||||
seps.append(r.y0)
|
||||
seps = sorted(set(seps))
|
||||
|
||||
def seg_index(y0, separators):
|
||||
if not separators:
|
||||
return 0
|
||||
n = 0
|
||||
for s in separators:
|
||||
if y0 >= s:
|
||||
n += 1
|
||||
else:
|
||||
break
|
||||
return n
|
||||
|
||||
def order_key(it):
|
||||
r = it["rect"]
|
||||
if not two_cols:
|
||||
return (r.y0, r.x0, it["kind_order"])
|
||||
if col_y0 is not None and r.y1 <= col_y0:
|
||||
return (0, r.y0, r.x0, it["kind_order"])
|
||||
if col_y1 is not None and r.y0 >= col_y1:
|
||||
return (2, r.y0, r.x0, it["kind_order"])
|
||||
seg = seg_index(r.y0, seps)
|
||||
if is_full_width_rect(r, page.rect):
|
||||
col = 2
|
||||
else:
|
||||
col = 0 if r.x0 < mid else 1
|
||||
return (1, seg, col, r.y0, r.x0, it["kind_order"])
|
||||
|
||||
items.sort(key=order_key)
|
||||
|
||||
for it in items:
|
||||
if it["kind"] in ("figure", "raster"):
|
||||
md_file.write(it["md"])
|
||||
continue
|
||||
|
||||
block = it["block"]
|
||||
for line in block.get("lines", []):
|
||||
for span in line.get("spans", []):
|
||||
md_file.write(span.get("text", "") + " ")
|
||||
md_file.write("\n")
|
||||
md_file.write("\n")
|
||||
|
||||
doc.close()
|
||||
return image_metadata_list
|
||||
|
||||
|
||||
def process_all_pdfs(input_dir, output_dir):
|
||||
"""
|
||||
BASE_DIR 하위의 모든 PDF를 재귀적으로 처리
|
||||
폴더 구조를 유지하면서 OUTPUT_BASE에 저장
|
||||
"""
|
||||
BASE_DIR = Path(input_dir)
|
||||
OUTPUT_BASE = Path(output_dir)
|
||||
# 출력 폴더 생성
|
||||
OUTPUT_BASE.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# 전체 이미지 메타데이터 수집
|
||||
all_image_metadata = []
|
||||
|
||||
# 처리 통계
|
||||
stats = {
|
||||
"total_pdfs": 0,
|
||||
"success": 0,
|
||||
"failed": 0,
|
||||
"total_images": 0
|
||||
}
|
||||
|
||||
# 실패 로그
|
||||
failed_files = []
|
||||
|
||||
print(f"=" * 60)
|
||||
print(f"PDF 추출 시작")
|
||||
print(f"원본 폴더: {BASE_DIR}")
|
||||
print(f"출력 폴더: {OUTPUT_BASE}")
|
||||
print(f"=" * 60)
|
||||
|
||||
# 모든 PDF 파일 찾기
|
||||
pdf_files = list(BASE_DIR.rglob("*.pdf"))
|
||||
stats["total_pdfs"] = len(pdf_files)
|
||||
|
||||
print(f"\n총 {len(pdf_files)}개 PDF 발견\n")
|
||||
|
||||
for idx, pdf_path in enumerate(pdf_files, 1):
|
||||
try:
|
||||
# 상대 경로 계산
|
||||
relative_path = pdf_path.relative_to(BASE_DIR)
|
||||
relative_folder = str(relative_path.parent)
|
||||
if relative_folder == ".":
|
||||
relative_folder = ""
|
||||
|
||||
pdf_name = pdf_path.name
|
||||
pdf_stem = pdf_path.stem
|
||||
|
||||
# 출력 경로 설정 (폴더 구조 유지)
|
||||
output_folder = OUTPUT_BASE / relative_path.parent
|
||||
output_folder.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
output_md = output_folder / f"{pdf_stem}.md"
|
||||
img_folder = output_folder / f"{pdf_stem}_img"
|
||||
|
||||
# 메타데이터 준비
|
||||
metadata = {
|
||||
"pdf_name": pdf_name,
|
||||
"pdf_stem": pdf_stem,
|
||||
"relative_folder": relative_folder,
|
||||
"full_path": str(relative_path),
|
||||
}
|
||||
|
||||
print(f"[{idx}/{len(pdf_files)}] {relative_path}")
|
||||
|
||||
# PDF 처리
|
||||
image_metas = extract_pdf_content(
|
||||
str(pdf_path),
|
||||
str(output_md),
|
||||
str(img_folder),
|
||||
metadata
|
||||
)
|
||||
|
||||
all_image_metadata.extend(image_metas)
|
||||
stats["success"] += 1
|
||||
stats["total_images"] += len(image_metas)
|
||||
|
||||
print(f" ✓ 완료 (이미지 {len(image_metas)}개)")
|
||||
|
||||
except Exception as e:
|
||||
stats["failed"] += 1
|
||||
failed_files.append({
|
||||
"file": str(pdf_path),
|
||||
"error": str(e)
|
||||
})
|
||||
print(f" ✗ 실패: {e}")
|
||||
|
||||
# 전체 이미지 메타데이터 저장
|
||||
meta_output_path = OUTPUT_BASE / "image_metadata.json"
|
||||
with open(meta_output_path, "w", encoding="utf-8") as f:
|
||||
json.dump(all_image_metadata, f, ensure_ascii=False, indent=2)
|
||||
|
||||
# 처리 요약 저장
|
||||
summary = {
|
||||
"processed_at": datetime.now().isoformat(),
|
||||
"source_dir": str(BASE_DIR),
|
||||
"output_dir": str(OUTPUT_BASE),
|
||||
"statistics": stats,
|
||||
"failed_files": failed_files
|
||||
}
|
||||
|
||||
summary_path = OUTPUT_BASE / "extraction_summary.json"
|
||||
with open(summary_path, "w", encoding="utf-8") as f:
|
||||
json.dump(summary, f, ensure_ascii=False, indent=2)
|
||||
|
||||
# 결과 출력
|
||||
print(f"\n" + "=" * 60)
|
||||
print(f"추출 완료!")
|
||||
print(f"=" * 60)
|
||||
print(f"총 PDF: {stats['total_pdfs']}개")
|
||||
print(f"성공: {stats['success']}개")
|
||||
print(f"실패: {stats['failed']}개")
|
||||
print(f"추출된 이미지: {stats['total_images']}개")
|
||||
print(f"\n이미지 메타데이터: {meta_output_path}")
|
||||
print(f"처리 요약: {summary_path}")
|
||||
|
||||
if failed_files:
|
||||
print(f"\n실패한 파일:")
|
||||
for f in failed_files:
|
||||
print(f" - {f['file']}: {f['error']}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
process_all_pdfs()
|
||||
Reference in New Issue
Block a user