- merge_markdown.py: 96개 페이지별 MD를 단일 파일로 병합
- 이미지를 output/images/ 폴더로 통합, p{NN}_ prefix로 파일명 충돌 방지
- file_range 파라미터로 부분 테스트 가능
- docs/tutorial.md: merge 명령어 및 사용법 문서화
- docs/history: 작업 이력 파일 추가
소요 시간: 10분 | Context: input 18k / output 2k tokens
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
311 lines
10 KiB
Python
311 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
PDF to Markdown converter with cropped figure extraction
|
|
Uses marker-pdf to detect figures, then crops them from page images.
|
|
Supports 2-column (multi-column) → single-column reordering.
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import glob
|
|
from pathlib import Path
|
|
from marker.converters.pdf import PdfConverter
|
|
from marker.models import create_model_dict
|
|
from marker.output import text_from_rendered
|
|
from PIL import Image
|
|
import fitz # PyMuPDF
|
|
|
|
|
|
def is_scanned_pdf(pdf_path: str, sample_pages: int = 3) -> bool:
    """Heuristically detect a scanned (image-only) PDF.

    Samples up to *sample_pages* leading pages and counts selectable text
    characters; a near-zero count means the pages carry no text layer and
    the document is treated as a scan.

    Args:
        pdf_path: Path to the PDF file.
        sample_pages: Maximum number of leading pages to inspect.

    Returns:
        True when the sampled pages contain fewer than 50 text characters.
    """
    # Context manager guarantees the document is closed even if
    # get_text() raises (the original leaked the handle on error).
    with fitz.open(pdf_path) as doc:
        total = min(sample_pages, len(doc))
        text_chars = sum(len(doc[i].get_text().strip()) for i in range(total))
    # Very few characters across the sampled pages => likely a scan.
    return text_chars < 50
|
|
|
|
|
|
def reorder_text_by_columns(pdf_path: str) -> str:
    """Re-order two-column text into single-column reading order.

    For text-based PDFs only: uses PyMuPDF block coordinates to read the
    whole left column first, then the whole right column, on each page.

    Args:
        pdf_path: Path to the PDF file.

    Returns:
        All pages' text, pages separated by a horizontal-rule marker
        ("---" surrounded by blank lines).
    """
    doc = fitz.open(pdf_path)
    try:
        pages_text = []

        for page in doc:
            blocks = page.get_text("blocks", sort=False)
            # b[6] == 0 keeps text blocks; b[4] is the block's text.
            text_blocks = [b for b in blocks if b[6] == 0 and b[4].strip()]
            if not text_blocks:
                continue

            mid_x = page.rect.width / 2

            # 30pt tolerance around the midline for column assignment.
            left = [b for b in text_blocks if b[2] <= mid_x + 30]
            # BUG FIX: exclude blocks already assigned to the left column.
            # A narrow block near the midline previously matched BOTH
            # predicates and its text was emitted twice.
            left_ids = {id(b) for b in left}
            right = [b for b in text_blocks
                     if b[0] >= mid_x - 30 and id(b) not in left_ids]
            # Blocks spanning the full width (titles, wide captions).
            span = [b for b in text_blocks
                    if b[0] < mid_x - 30 and b[2] > mid_x + 30]

            # Two-column layout: both columns populated, nothing spanning.
            is_two_col = len(left) >= 2 and len(right) >= 2 and not span

            if is_two_col:
                left.sort(key=lambda b: b[1])    # top-to-bottom per column
                right.sort(key=lambda b: b[1])
                ordered = left + right
            else:
                # Single column: plain top-to-bottom, left-to-right.
                ordered = sorted(text_blocks, key=lambda b: (b[1], b[0]))

            pages_text.append("\n\n".join(b[4].strip() for b in ordered))
    finally:
        doc.close()

    return "\n\n---\n\n".join(pages_text)
|
|
|
|
|
|
def extract_figure_images(pdf_path: str, rendered, output_dir: str, base_name: str):
    """
    Extract figure images by cropping page renders at marker's detected boxes.

    Args:
        pdf_path: Path to PDF file
        rendered: Marker's rendered output with per-page figure positions
        output_dir: Output directory (unused here; kept for interface parity)
        base_name: Base filename (unused here; kept for interface parity)

    Returns:
        dict: Mapping of image names to PNG image bytes
    """
    # Hoisted: the original re-ran `import io` per page and
    # `from io import BytesIO` per figure.
    import io

    images_dict = {}

    # Without per-page info from marker there is nothing to crop.
    if not hasattr(rendered, 'pages') or not rendered.pages:
        print(" No page information in rendered output")
        return images_dict

    # Open PDF with PyMuPDF to render pages as images
    doc = fitz.open(pdf_path)
    try:
        print(f" Processing {len(rendered.pages)} pages for figure extraction...")

        for page_idx, page_data in enumerate(rendered.pages):
            page_num = page_idx + 1

            # Skip pages with no detected images/figures.
            if not hasattr(page_data, 'images') or not page_data.images:
                continue

            print(f" Page {page_num}: Found {len(page_data.images)} figure(s)")

            # Render page at 2x resolution for better crop quality.
            pdf_page = doc[page_idx]
            pix = pdf_page.get_pixmap(matrix=fitz.Matrix(2, 2))
            page_img = Image.open(io.BytesIO(pix.tobytes("png")))

            # Extract each figure from this page.
            for fig_idx, fig_info in enumerate(page_data.images):
                try:
                    if hasattr(fig_info, 'bbox'):
                        # Marker bboxes are in PDF coordinates; scale by the
                        # same 2x factor used when rendering the page.
                        x0, y0, x1, y1 = fig_info.bbox
                        x0, y0, x1, y1 = int(x0 * 2), int(y0 * 2), int(x1 * 2), int(y1 * 2)

                        cropped = page_img.crop((x0, y0, x1, y1))

                        # Serialize the crop to PNG bytes.
                        img_bytes = io.BytesIO()
                        cropped.save(img_bytes, format='PNG')

                        img_name = f"_page_{page_num}_Figure_{fig_idx + 1}.png"
                        images_dict[img_name] = img_bytes.getvalue()

                        print(f" Cropped figure {fig_idx + 1}: {x1-x0}x{y1-y0}px")

                except Exception as e:
                    # Best-effort: one bad figure must not abort the page.
                    print(f" Warning: Could not crop figure {fig_idx + 1}: {e}")
    finally:
        # Ensure the document is closed even if rendering/cropping raises.
        doc.close()

    return images_dict
|
|
|
|
|
|
def convert_pdf_with_cropped_images(pdf_path: str, output_dir: str = "output"):
    """
    Convert a PDF to Markdown with cropped figure images.

    - Scanned PDF: marker-pdf OCR + layout detection (handles 2-column)
    - Text PDF: PyMuPDF block-coordinate based 2-column -> 1-column reorder

    Args:
        pdf_path: Path to the PDF file.
        output_dir: Directory receiving the .md, images folder and metadata.

    Returns:
        True on success, False if conversion raised.
    """
    os.makedirs(output_dir, exist_ok=True)

    pdf_file = Path(pdf_path)
    base_name = pdf_file.stem

    print(f"\nConverting {pdf_file.name}...")

    scanned = is_scanned_pdf(pdf_path)
    print(f" PDF type: {'scanned (OCR)' if scanned else 'text-based (PyMuPDF column reorder)'}")

    try:
        if not scanned:
            # Text-based PDF: extract with 2-column reordering via PyMuPDF.
            print(" Extracting text with column reordering...")
            text = reorder_text_by_columns(pdf_path)
            metadata = None
            marker_images = {}
            # BUG FIX: `rendered` was never assigned on this path, so the
            # later figure-extraction call raised NameError for every
            # text-based PDF (swallowed by the broad except -> False).
            rendered = None
        else:
            # Scanned PDF: marker-pdf performs OCR + (2-column) layout.
            converter = PdfConverter(
                artifact_dict=create_model_dict(),
            )

            print(" Running marker-pdf OCR and layout detection...")
            rendered = converter(pdf_path)

            text, metadata, marker_images = text_from_rendered(rendered)

        # Fix image paths: prepend {base_name}_images/ folder to image refs.
        # Spaces become %20 so Obsidian (CommonMark) parses the path.
        safe_base_name = base_name.replace(' ', '%20')
        # NOTE(review): the replacement template was empty (`rf''`) in the
        # original source, which would have deleted every image reference —
        # reconstructed from the comment above so that
        # ![alt](relative.png) -> ![alt]({base}_images/relative.png).
        # Confirm against the intended output format.
        text = re.sub(
            r'!\[([^\]]*)\]\(([^/)][^)]*)\)',
            rf'![\1]({safe_base_name}_images/\2)',
            text
        )

        # Save markdown
        output_path = os.path.join(output_dir, f"{base_name}.md")
        with open(output_path, "w", encoding="utf-8") as f:
            f.write(text)
        print(f" OK Markdown saved: {output_path}")

        # Extract cropped figure images — only possible when marker ran
        # (text-based PDFs have no `rendered` pages to crop from).
        cropped_images = {}
        if rendered is not None:
            print(" Extracting figures from pages...")
            cropped_images = extract_figure_images(pdf_path, rendered, output_dir, base_name)

        if cropped_images:
            images_dir = os.path.join(output_dir, f"{base_name}_images")
            os.makedirs(images_dir, exist_ok=True)

            for img_name, img_data in cropped_images.items():
                img_path = os.path.join(images_dir, img_name)
                with open(img_path, "wb") as f:
                    f.write(img_data)

            print(f" OK {len(cropped_images)} figures saved to: {images_dir}")
        else:
            print(" ! No figures extracted (trying alternative method...)")
            # Fallback: use marker's own extracted images if available.
            if marker_images:
                images_dir = os.path.join(output_dir, f"{base_name}_images")
                os.makedirs(images_dir, exist_ok=True)

                from io import BytesIO  # hoisted out of the loop below

                saved_count = 0
                for img_name, img_data in marker_images.items():
                    try:
                        # Marker may hand back PIL images or raw bytes.
                        if isinstance(img_data, Image.Image):
                            img_bytes = BytesIO()
                            img_data.save(img_bytes, format='PNG')
                            img_bytes = img_bytes.getvalue()
                        else:
                            img_bytes = img_data

                        if img_bytes and len(img_bytes) > 0:
                            img_path = os.path.join(images_dir, img_name)
                            with open(img_path, "wb") as f:
                                f.write(img_bytes)
                            saved_count += 1
                    except Exception as e:
                        print(f" Warning: Could not save {img_name}: {e}")

                if saved_count > 0:
                    print(f" OK {saved_count} images from marker saved")
                else:
                    print(" ! No valid images to save")

        # Save metadata (only the marker path produces any).
        if metadata:
            import json
            metadata_path = os.path.join(output_dir, f"{base_name}_metadata.json")
            with open(metadata_path, "w", encoding="utf-8") as f:
                json.dump(metadata, f, indent=2, ensure_ascii=False)

        return True

    except Exception as e:
        print(f" ERROR: {e}")
        import traceback
        traceback.print_exc()
        return False
|
|
|
|
|
|
def convert_all_pdfs(input_dir: str = "input", output_dir: str = "output"):
    """
    Convert every PDF in *input_dir* with cropped figure extraction.

    Each PDF is converted in its own subprocess to avoid multiprocessing
    issues inside the converter.
    """
    import subprocess
    import sys

    found = sorted(glob.glob(os.path.join(input_dir, "*.pdf")))

    if not found:
        print(f"No PDF files found in {input_dir}")
        return

    print(f"Found {len(found)} PDF file(s)")
    print("=" * 60)

    ok_count = 0
    fail_count = 0

    for path in found:
        print(f"\nStarting conversion of: {os.path.basename(path)}")

        # Re-invoke this script in --single mode for exactly one file.
        proc = subprocess.run(
            [sys.executable, __file__, "--single", path, output_dir],
            capture_output=False
        )

        if proc.returncode != 0:
            fail_count += 1
            print(f" FAILED: {os.path.basename(path)}")
        else:
            ok_count += 1

    print("\n" + "=" * 60)
    print(f"Conversion complete!")
    print(f" Successful: {ok_count}")
    print(f" Failed: {fail_count}")
    print(f" Total: {len(found)}")
|
|
|
|
|
|
if __name__ == "__main__":
    import sys

    # "--single <pdf> <out_dir>" is the child-process entry point used by
    # convert_all_pdfs; anything else runs the normal batch mode.
    args = sys.argv[1:]
    if len(args) >= 3 and args[0] == "--single":
        ok = convert_pdf_with_cropped_images(args[1], args[2])
        sys.exit(0 if ok else 1)
    else:
        convert_all_pdfs()
|