199 lines
6.6 KiB
Python
199 lines
6.6 KiB
Python
import asyncio
|
|
import logging
|
|
import os
|
|
import re
|
|
|
|
import cv2
|
|
import docx # PyMuPDF, python-docx
|
|
import fitz
|
|
import numpy as np
|
|
import pytesseract
|
|
from paddleocr import PaddleOCR
|
|
from pdf2image import convert_from_path
|
|
from PIL import Image
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def extract_text_from_file(file_path):
|
|
ext = os.path.splitext(file_path)[-1].lower()
|
|
images = []
|
|
|
|
if ext == ".pdf":
|
|
# ① 먼저 PDF에서 텍스트 추출 시도
|
|
text_only = await asyncio.to_thread(extract_text_from_pdf_direct, file_path)
|
|
if text_only.strip():
|
|
logger.info(
|
|
"[UTILS-TEXT] PDF는 텍스트 기반입니다. (OCR 없이 텍스트 추출 완료)"
|
|
)
|
|
return text_only, [], "OCR not used"
|
|
|
|
# ② 텍스트가 없으면 이미지 변환 → OCR 수행
|
|
images = await asyncio.to_thread(convert_from_path, file_path, dpi=400)
|
|
page_count = len(images)
|
|
logger.info(f"[UTILS-CONVERT] PDF에서 이미지로 변환 완료 ({page_count} 페이지)")
|
|
|
|
elif ext in [".jpg", ".jpeg", ".png"]:
|
|
img = await asyncio.to_thread(Image.open, file_path)
|
|
images = [img]
|
|
logger.info("[UTILS-IMAGE] 이미지 파일 로딩 완료")
|
|
|
|
elif ext == ".docx":
|
|
text_only = await asyncio.to_thread(extract_text_from_docx, file_path)
|
|
logger.info("[UTILS-DOCX] Word 문서 텍스트 추출 완료")
|
|
return text_only, [], "OCR not used"
|
|
|
|
else:
|
|
logger.error(
|
|
"[ERROR] 지원하지 않는 파일 형식입니다. (PDF, JPG, JPEG, PNG, DOCX만 허용)"
|
|
)
|
|
raise ValueError("지원하지 않는 파일 형식입니다.")
|
|
|
|
# full_response, coord_response = await asyncio.to_thread(
|
|
# extract_text_ocr, images
|
|
# )
|
|
# return full_response, coord_response, "pytesseract"
|
|
full_response, coord_response = await asyncio.to_thread(
|
|
extract_text_paddle_ocr, images
|
|
)
|
|
return full_response, coord_response, "paddle_ocr"
|
|
|
|
|
|
# ✅ PDF 텍스트 기반 여부 확인 및 텍스트 추출
|
|
def extract_text_from_pdf_direct(pdf_path):
|
|
text = ""
|
|
try:
|
|
with fitz.open(pdf_path) as doc:
|
|
for page in doc:
|
|
text += page.get_text()
|
|
valid_chars = re.findall(r"[가-힣a-zA-Z]", text)
|
|
logger.info(f"len(valid_chars): {len(valid_chars)}")
|
|
if len(valid_chars) < 10:
|
|
return text # 텍스트가 충분하지 않으면 바로 반환
|
|
else:
|
|
text += page.get_text()
|
|
except Exception as e:
|
|
logger.info("[ERROR] PDF 텍스트 추출 실패:", e)
|
|
return text
|
|
|
|
|
|
# ✅ DOCX 텍스트 추출
|
|
def extract_text_from_docx(docx_path):
|
|
text = ""
|
|
try:
|
|
doc = docx.Document(docx_path)
|
|
for para in doc.paragraphs:
|
|
text += para.text + "\n"
|
|
except Exception as e:
|
|
logger.info("[ERROR] DOCX 텍스트 추출 실패:", e)
|
|
return text
|
|
|
|
|
|
# ✅ OCR 전 이미지 전처리 함수
|
|
def preprocess_image_for_ocr(pil_img, page_idx=None):
|
|
logger.info("[UTILS-OCR] 이미지 전처리 시작")
|
|
img = np.array(pil_img.convert("RGB")) # PIL → OpenCV 변환
|
|
img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY) # 그레이스케일 변환
|
|
img = cv2.bilateralFilter(img, 9, 75, 75) # 노이즈 제거
|
|
img = cv2.adaptiveThreshold(
|
|
img,
|
|
255,
|
|
cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
|
|
cv2.THRESH_BINARY,
|
|
31,
|
|
10, # 대비 향상
|
|
)
|
|
img = cv2.resize(
|
|
img, None, fx=2, fy=2, interpolation=cv2.INTER_LINEAR
|
|
) # 해상도 확대
|
|
|
|
# # ✅ 전처리 이미지 저장
|
|
# save_path = os.path.join("preprocess_image.png")
|
|
# logger.info(f"[UTILS-OCR] 전처리 이미지 저장: {save_path}")
|
|
# cv2.imwrite(save_path, img)
|
|
|
|
return Image.fromarray(img)
|
|
|
|
|
|
# ✅ OCR 수행 (좌표 포함)
|
|
def extract_text_ocr(images):
|
|
"""
|
|
tesseract를 사용하여 이미지에서 텍스트 추출 및 좌표 정보 반환
|
|
"""
|
|
all_texts = []
|
|
coord_response = []
|
|
|
|
for page_idx, img in enumerate(images):
|
|
logger.info(f"[UTILS-OCR] 페이지 {page_idx + 1} OCR로 텍스트 추출 중...")
|
|
pre_img = preprocess_image_for_ocr(img)
|
|
text = pytesseract.image_to_string(
|
|
pre_img, lang="kor+eng", config="--oem 3 --psm 6"
|
|
)
|
|
all_texts.append(text)
|
|
|
|
ocr_data = pytesseract.image_to_data(
|
|
pre_img,
|
|
output_type=pytesseract.Output.DICT,
|
|
lang="kor+eng",
|
|
config="--oem 3 --psm 6",
|
|
)
|
|
for i in range(len(ocr_data["text"])):
|
|
word = ocr_data["text"][i].strip()
|
|
if word == "":
|
|
continue
|
|
x, y, w, h = (
|
|
ocr_data["left"][i],
|
|
ocr_data["top"][i],
|
|
ocr_data["width"][i],
|
|
ocr_data["height"][i],
|
|
)
|
|
coord_response.append(
|
|
{"text": word, "coords": [x, y, x + w, y + h], "page": page_idx + 1}
|
|
)
|
|
|
|
logger.info(f"[UTILS-OCR] 페이지 {page_idx + 1} 텍스트 및 좌표 추출 완료")
|
|
|
|
full_response = "\n".join(all_texts)
|
|
return full_response, coord_response
|
|
|
|
|
|
def extract_text_paddle_ocr(images):
|
|
"""
|
|
PaddleOCR를 사용하여 이미지에서 텍스트 추출 및 좌표 정보 반환
|
|
"""
|
|
# os.environ["CUDA_VISIBLE_DEVICES"] = "" # GPU 사용 안 함
|
|
ocr = PaddleOCR(
|
|
use_doc_orientation_classify=False, use_doc_unwarping=False, lang="korean"
|
|
)
|
|
|
|
full_response = []
|
|
coord_response = []
|
|
|
|
for page_idx, img in enumerate(images):
|
|
print(f"[PaddleOCR] 페이지 {page_idx + 1} OCR로 텍스트 추출 중...")
|
|
img_np = np.array(img)
|
|
|
|
if len(img_np.shape) == 2: # grayscale → RGB 변환
|
|
img_np = cv2.cvtColor(img_np, cv2.COLOR_GRAY2RGB)
|
|
|
|
results = ocr.predict(input=img_np)
|
|
|
|
print(f"[PaddleOCR] 페이지 {page_idx + 1} OCR 결과 개수: {len(results)}")
|
|
for res_idx, res in enumerate(results):
|
|
print(f"[PaddleOCR] 페이지 {page_idx + 1} 결과 {res_idx + 1}개 추출 완료")
|
|
|
|
res_dic = dict(res.items())
|
|
texts = res_dic.get("rec_texts", [])
|
|
boxes = res_dic.get("rec_boxes", [])
|
|
|
|
full_response.extend(texts)
|
|
|
|
# ndarray → list 변환
|
|
clean_boxes = [
|
|
box.tolist() if isinstance(box, np.ndarray) else box for box in boxes
|
|
]
|
|
coord_response.extend(clean_boxes)
|
|
|
|
print("[PaddleOCR] 전체 페이지 텍스트 및 좌표 추출 완료")
|
|
return " ".join(full_response), coord_response
|