233 lines
8.0 KiB
Python
233 lines
8.0 KiB
Python
import asyncio
|
|
import logging
|
|
import os
|
|
import re
|
|
|
|
import cv2
|
|
import docx # PyMuPDF, python-docx
|
|
import fitz
|
|
import numpy as np
|
|
from paddleocr import PaddleOCR
|
|
from pdf2image import convert_from_path
|
|
from PIL import Image
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def extract_text_from_file(file_path):
|
|
ext = os.path.splitext(file_path)[-1].lower()
|
|
images = []
|
|
|
|
if ext == ".pdf":
|
|
# ① 먼저 PDF에서 텍스트 추출 시도 -> GT를 만들기에 무조건 ocr 과정 거치도록 변경
|
|
# text_only = await asyncio.to_thread(extract_text_from_pdf_direct, file_path)
|
|
# if text_only.strip():
|
|
# logger.info(
|
|
# "[UTILS-TEXT] PDF는 텍스트 기반입니다. (OCR 없이 텍스트 추출 완료)"
|
|
# )
|
|
# return text_only, [], "OCR not used"
|
|
|
|
# ② 텍스트가 없으면 이미지 변환 → OCR 수행
|
|
images = await asyncio.to_thread(convert_from_path, file_path, dpi=400)
|
|
page_count = len(images)
|
|
logger.info(f"[UTILS-CONVERT] PDF에서 이미지로 변환 완료 ({page_count} 페이지)")
|
|
|
|
elif ext in [".jpg", ".jpeg", ".png"]:
|
|
img = await asyncio.to_thread(Image.open, file_path)
|
|
images = [img]
|
|
logger.info("[UTILS-IMAGE] 이미지 파일 로딩 완료")
|
|
|
|
elif ext == ".docx":
|
|
text_only = await asyncio.to_thread(extract_text_from_docx, file_path)
|
|
logger.info("[UTILS-DOCX] Word 문서 텍스트 추출 완료")
|
|
return text_only, [], "OCR not used"
|
|
|
|
else:
|
|
logger.error(
|
|
"[ERROR] 지원하지 않는 파일 형식입니다. (PDF, JPG, JPEG, PNG, DOCX만 허용)"
|
|
)
|
|
raise ValueError("지원하지 않는 파일 형식입니다.")
|
|
|
|
full_response, coord_response = await asyncio.to_thread(
|
|
extract_text_paddle_ocr, images
|
|
)
|
|
return full_response, coord_response, "paddle_ocr"
|
|
|
|
|
|
# ✅ PDF 텍스트 기반 여부 확인 및 텍스트 추출
|
|
def extract_text_from_pdf_direct(pdf_path):
|
|
text = ""
|
|
try:
|
|
with fitz.open(pdf_path) as doc:
|
|
for page in doc:
|
|
text += page.get_text()
|
|
valid_chars = re.findall(r"[가-힣a-zA-Z]", text)
|
|
logger.info(f"len(valid_chars): {len(valid_chars)}")
|
|
if len(valid_chars) < 10:
|
|
return text # 텍스트가 충분하지 않으면 바로 반환
|
|
else:
|
|
text += page.get_text()
|
|
except Exception as e:
|
|
logger.info("[ERROR] PDF 텍스트 추출 실패:", e)
|
|
return text
|
|
|
|
|
|
# ✅ DOCX 텍스트 추출
|
|
def extract_text_from_docx(docx_path):
|
|
text = ""
|
|
try:
|
|
doc = docx.Document(docx_path)
|
|
for para in doc.paragraphs:
|
|
text += para.text + "\n"
|
|
except Exception as e:
|
|
logger.info("[ERROR] DOCX 텍스트 추출 실패:", e)
|
|
return text
|
|
|
|
|
|
# ✅ OCR 전 이미지 전처리 함수
|
|
def preprocess_image_for_ocr(pil_img, page_idx=None):
|
|
logger.info("[UTILS-OCR] 이미지 전처리 시작")
|
|
img = np.array(pil_img.convert("RGB")) # PIL → OpenCV 변환
|
|
img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY) # 그레이스케일 변환
|
|
img = cv2.bilateralFilter(img, 9, 75, 75) # 노이즈 제거
|
|
img = cv2.adaptiveThreshold(
|
|
img,
|
|
255,
|
|
cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
|
|
cv2.THRESH_BINARY,
|
|
31,
|
|
10, # 대비 향상
|
|
)
|
|
img = cv2.resize(
|
|
img, None, fx=2, fy=2, interpolation=cv2.INTER_LINEAR
|
|
) # 해상도 확대
|
|
|
|
return Image.fromarray(img)
|
|
|
|
|
|
def _to_rgb_uint8(img_np: np.ndarray) -> np.ndarray:
|
|
"""
|
|
입력 이미지를 3채널 RGB, uint8 [0,255] 로 표준화
|
|
허용 입력: HxW, HxWx1, HxWx3, HxWx4, float[0..1]/[0..255], int 등
|
|
"""
|
|
if img_np is None:
|
|
raise ValueError("Input image is None")
|
|
|
|
# dtype/범위 표준화
|
|
if img_np.dtype != np.uint8:
|
|
arr = img_np.astype(np.float32)
|
|
if arr.max() <= 1.0: # [0,1]로 보이면 스케일업
|
|
arr *= 255.0
|
|
arr = np.clip(arr, 0, 255).astype(np.uint8)
|
|
img_np = arr
|
|
|
|
# 채널 표준화
|
|
if img_np.ndim == 2: # HxW
|
|
img_np = cv2.cvtColor(img_np, cv2.COLOR_GRAY2RGB)
|
|
elif img_np.ndim == 3:
|
|
h, w, c = img_np.shape
|
|
if c == 1:
|
|
img_np = cv2.cvtColor(img_np, cv2.COLOR_GRAY2RGB)
|
|
elif c == 4:
|
|
img_np = cv2.cvtColor(img_np, cv2.COLOR_RGBA2RGB)
|
|
elif c == 3:
|
|
pass # 그대로 사용
|
|
else:
|
|
raise ValueError(f"Unsupported channel count: {c}")
|
|
else:
|
|
raise ValueError(f"Unsupported ndim: {img_np.ndim}")
|
|
|
|
return img_np
|
|
|
|
|
|
def extract_text_paddle_ocr(images):
|
|
"""
|
|
PaddleOCR를 사용하여 이미지에서 텍스트 추출 및 좌표 정보 반환
|
|
"""
|
|
# os.environ["CUDA_VISIBLE_DEVICES"] = "" # GPU 사용 안 함
|
|
ocr = PaddleOCR(
|
|
use_doc_orientation_classify=False, use_doc_unwarping=False, lang="korean"
|
|
)
|
|
|
|
coord_response = []
|
|
all_text_boxes = [] # (y_center, x_center, text, box) 저장용
|
|
|
|
for page_idx, img in enumerate(images):
|
|
print(f"[PaddleOCR] 페이지 {page_idx + 1} OCR로 텍스트 추출 중...")
|
|
img_np = np.array(img)
|
|
|
|
# ✅ 채널/타입 표준화 (grayscale/rgba/float 등 대응)
|
|
try:
|
|
img_np = _to_rgb_uint8(img_np)
|
|
except Exception as e:
|
|
print(f"[PaddleOCR] 페이지 {page_idx + 1} 입력 표준화 실패: {e}")
|
|
continue # 문제 페이지 스킵 후 다음 페이지 진행
|
|
|
|
# ✅ 과도한 해상도 안정화 (최대 변 4000px)
|
|
h, w = img_np.shape[:2]
|
|
max_side = max(h, w)
|
|
max_side_limit = 4000
|
|
if max_side > max_side_limit:
|
|
scale = max_side_limit / max_side
|
|
new_size = (int(w * scale), int(h * scale))
|
|
img_np = cv2.resize(img_np, new_size, interpolation=cv2.INTER_AREA)
|
|
print(f"[PaddleOCR] Resized to {img_np.shape[1]}x{img_np.shape[0]}")
|
|
|
|
results = ocr.predict(input=img_np)
|
|
|
|
print(f"[PaddleOCR] 페이지 {page_idx + 1} OCR 결과 개수: {len(results)}")
|
|
for res_idx, res in enumerate(results):
|
|
print(f"[PaddleOCR] 페이지 {page_idx + 1} 결과 {res_idx + 1}개 추출 완료")
|
|
|
|
res_dic = dict(res.items())
|
|
texts = res_dic.get("rec_texts", [])
|
|
boxes = res_dic.get("rec_boxes", [])
|
|
|
|
for text, box in zip(texts, boxes):
|
|
if isinstance(box, np.ndarray):
|
|
box = box.tolist()
|
|
# ✅ box 정규화
|
|
if all(isinstance(p, (int, float)) for p in box):
|
|
if len(box) % 2 == 0:
|
|
box = [[box[i], box[i + 1]] for i in range(0, len(box), 2)]
|
|
else:
|
|
print(f"[PaddleOCR] 잘못된 box 형식: {box}")
|
|
continue
|
|
|
|
coord_response.append(box)
|
|
|
|
# 중심 좌표 계산 (y → 줄 순서, x → 단어 순서)
|
|
x_coords = [p[0] for p in box]
|
|
y_coords = [p[1] for p in box]
|
|
x_center = sum(x_coords) / len(x_coords)
|
|
y_center = sum(y_coords) / len(y_coords)
|
|
|
|
all_text_boxes.append((y_center, x_center, text))
|
|
|
|
# ✅ 위치 기반 정렬
|
|
all_text_boxes.sort(key=lambda x: (x[0], x[1])) # y 먼저, 그 다음 x 정렬
|
|
|
|
# ✅ 줄 단위 그룹핑
|
|
lines = []
|
|
current_line = []
|
|
prev_y = None
|
|
line_threshold = 15 # 줄 묶음 y 오차 허용값
|
|
|
|
for y, x, text in all_text_boxes:
|
|
if prev_y is None or abs(y - prev_y) < line_threshold:
|
|
current_line.append((x, text))
|
|
else:
|
|
current_line.sort(key=lambda xx: xx[0])
|
|
lines.append(" ".join(t for _, t in current_line))
|
|
current_line = [(x, text)]
|
|
prev_y = y
|
|
|
|
if current_line:
|
|
current_line.sort(key=lambda xx: xx[0])
|
|
lines.append(" ".join(t for _, t in current_line))
|
|
|
|
parsed_text = "\n".join(lines)
|
|
|
|
print("[PaddleOCR] 전체 페이지 텍스트 및 좌표 추출 완료")
|
|
return parsed_text, coord_response
|