243 lines
9.4 KiB
Python
243 lines
9.4 KiB
Python
import argparse
|
|
import json
|
|
import random
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from pathlib import Path
|
|
|
|
import requests
|
|
|
|
# --- 설정 ---
|
|
BASE_URL = "http://172.16.10.176:8892"
|
|
POLL_INTERVAL = 2 # 상태 확인 간격 (초)
|
|
SUPPORTED_EXTENSIONS = [".png", ".jpg", ".jpeg", ".webp", ".bmp", ".pdf"]
|
|
MAX_RETRIES = 5 # 최대 재시도 횟수
|
|
BACKOFF_FACTOR = 1 # 백오프 시간 기본값 (초)
|
|
|
|
|
|
def request_with_retry(method, url, **kwargs):
|
|
"""Exponential Backoff + Jitter를 적용한 재시도 로직"""
|
|
for attempt in range(MAX_RETRIES):
|
|
try:
|
|
response = requests.request(method, url, **kwargs)
|
|
if 500 <= response.status_code < 600:
|
|
print(
|
|
f"\n⚠️ 서버 오류 ({response.status_code}). {attempt + 1}번째 재시도 중..."
|
|
)
|
|
raise requests.exceptions.HTTPError(
|
|
f"Server error: {response.status_code}"
|
|
)
|
|
|
|
response.raise_for_status()
|
|
return response
|
|
except requests.exceptions.RequestException as e:
|
|
if attempt < MAX_RETRIES - 1:
|
|
backoff_time = BACKOFF_FACTOR * (2**attempt)
|
|
jitter = random.uniform(0, 1)
|
|
sleep_time = backoff_time + jitter
|
|
print(f"\n❌ 네트워크 오류: {e}. {sleep_time:.2f}초 후 재시도합니다.")
|
|
time.sleep(sleep_time)
|
|
else:
|
|
print(
|
|
f"\n❌ 최대 재시도 횟수({MAX_RETRIES})를 초과했습니다. 최종 오류: {e}"
|
|
)
|
|
raise
|
|
|
|
|
|
def run_ocr_on_file(file_path: Path, output_dir: Path, provider: str):
|
|
"""단일 파일에 대해 OCR을 수행하고 결과를 저장합니다."""
|
|
log_prefix = f"[{file_path.name}]"
|
|
|
|
print(f"{log_prefix} 📄 처리 시작 (Provider: {provider})")
|
|
|
|
output_file_path = output_dir / f"{file_path.stem}.json"
|
|
if output_file_path.exists():
|
|
print(f"{log_prefix} 이미 결과가 존재합니다. 건너뜁니다.")
|
|
return f"SKIPPED: {file_path.name}"
|
|
|
|
try:
|
|
with open(file_path, "rb") as f:
|
|
files = {"file": (file_path.name, f, "application/octet-stream")}
|
|
endpoint = f"/ocr/{provider}"
|
|
response = request_with_retry(
|
|
"POST", f"{BASE_URL}{endpoint}", files=files, timeout=30
|
|
)
|
|
|
|
initial_data = response.json()
|
|
request_id = initial_data.get("request_id")
|
|
status_check_url = initial_data.get("status_check_url")
|
|
|
|
if not request_id or not status_check_url:
|
|
print(
|
|
f"{log_prefix} ❌ 오류: 서버 응답에 'request_id' 또는 'status_check_url'이 없습니다."
|
|
)
|
|
return f"FAILED: {file_path.name} (Invalid initial response)"
|
|
|
|
print(f"{log_prefix} - 작업 접수 완료. Request ID: {request_id}")
|
|
|
|
progress_url = f"{BASE_URL}{status_check_url}"
|
|
print(f"{log_prefix} - 결과 확인 중...", end="", flush=True)
|
|
|
|
while True:
|
|
progress_response = request_with_retry("GET", progress_url, timeout=30)
|
|
progress_data = progress_response.json()
|
|
status = progress_data.get("status")
|
|
|
|
if status == "SUCCESS":
|
|
print(" 완료!")
|
|
result_data = progress_data.get("final_result")
|
|
with open(output_file_path, "w", encoding="utf-8") as f:
|
|
json.dump(result_data, f, indent=2, ensure_ascii=False)
|
|
print(f"{log_prefix} - ✅ 결과 저장 완료: {output_file_path.name}")
|
|
return f"SUCCESS: {file_path.name}"
|
|
elif status == "FAILURE":
|
|
print(" 실패!")
|
|
print(
|
|
f"{log_prefix} - ❌ 오류: OCR 작업 실패. 상세: {progress_data.get('message')}"
|
|
)
|
|
return f"FAILED: {file_path.name} (OCR Failure)"
|
|
else: # PENDING
|
|
print(".", end="", flush=True)
|
|
time.sleep(POLL_INTERVAL)
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"\n{log_prefix} ❌ 최종 오류: {e}")
|
|
return f"FAILED: {file_path.name} ({e})"
|
|
except json.JSONDecodeError as e:
|
|
print(f"\n{log_prefix} ❌ 최종 오류: JSON 파싱 실패 - {e}")
|
|
return f"FAILED: {file_path.name} (JSON Parse Error)"
|
|
|
|
|
|
def main(input_dir: str, output_dir: str, provider: str, workers: int, delay: float):
|
|
"""지정된 디렉토리의 모든 지원 형식 파일에 대해 OCR을 실행합니다."""
|
|
input_path = Path(input_dir)
|
|
output_path = Path(output_dir)
|
|
|
|
if not input_path.is_dir():
|
|
print(f"오류: 입력 디렉토리를 찾을 수 없습니다: {input_dir}")
|
|
return
|
|
|
|
output_path.mkdir(parents=True, exist_ok=True)
|
|
print(f"입력 디렉토리: {input_path.resolve()}")
|
|
print(f"출력 디렉토리: {output_path.resolve()}")
|
|
print(f"OCR Provider: {provider}")
|
|
|
|
files_to_process = [
|
|
p
|
|
for p in input_path.iterdir()
|
|
if p.is_file() and p.suffix.lower() in SUPPORTED_EXTENSIONS
|
|
]
|
|
|
|
if not files_to_process:
|
|
print("처리할 파일을 찾을 수 없습니다.")
|
|
return
|
|
|
|
total_files = len(files_to_process)
|
|
results = {"SUCCESS": 0, "SKIPPED": 0, "FAILED": 0}
|
|
|
|
if delay > 0:
|
|
# --- 순차(동기) 처리 모드 ---
|
|
print(f"순차 처리 모드 활성화. 요청 간 대기 시간: {delay}초")
|
|
print("-" * 40)
|
|
print(f"총 {total_files}개의 파일을 처리합니다.")
|
|
|
|
for i, file in enumerate(files_to_process):
|
|
print(f"\n--- [{i + 1}/{total_files}] ---")
|
|
try:
|
|
result_str = run_ocr_on_file(file, output_path, provider)
|
|
status = result_str.split(":")[0]
|
|
if "SUCCESS" in status:
|
|
results["SUCCESS"] += 1
|
|
elif "SKIPPED" in status:
|
|
results["SKIPPED"] += 1
|
|
else:
|
|
results["FAILED"] += 1
|
|
except Exception as exc:
|
|
print(f"\n[Main] {file.name} 처리 중 예외 발생: {exc}")
|
|
results["FAILED"] += 1
|
|
|
|
if i < total_files - 1:
|
|
print(f"요청 간 대기... ({delay}초)")
|
|
time.sleep(delay)
|
|
else:
|
|
# --- 병렬 처리 모드 ---
|
|
print(f"병렬 처리 모드 활성화. 동시 작업 수: {workers}")
|
|
print("-" * 40)
|
|
print(f"총 {total_files}개의 파일을 처리합니다.")
|
|
|
|
with ThreadPoolExecutor(max_workers=workers) as executor:
|
|
future_to_file = {
|
|
executor.submit(run_ocr_on_file, file, output_path, provider): file
|
|
for file in files_to_process
|
|
}
|
|
|
|
for i, future in enumerate(as_completed(future_to_file)):
|
|
file = future_to_file[future]
|
|
try:
|
|
result_str = future.result()
|
|
status = result_str.split(":")[0]
|
|
if "SUCCESS" in status:
|
|
results["SUCCESS"] += 1
|
|
elif "SKIPPED" in status:
|
|
results["SKIPPED"] += 1
|
|
else:
|
|
results["FAILED"] += 1
|
|
except Exception as exc:
|
|
print(f"\n[Main] {file.name} 처리 중 예외 발생: {exc}")
|
|
results["FAILED"] += 1
|
|
|
|
print(f"--- 진행 상황: {i + 1}/{total_files} ---")
|
|
|
|
print("\n" + "=" * 40)
|
|
print("모든 작업이 완료되었습니다.")
|
|
print(f" - 성공: {results['SUCCESS']}")
|
|
print(f" - 건너뜀: {results['SKIPPED']}")
|
|
print(f" - 실패: {results['FAILED']}")
|
|
print("=" * 40)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser(
|
|
description="디렉토리 내 파일에 대해 비동기 OCR을 병렬 또는 순차적으로 수행합니다.",
|
|
formatter_class=argparse.RawTextHelpFormatter,
|
|
)
|
|
parser.add_argument(
|
|
"input_dir", type=str, help="OCR을 수행할 파일들이 있는 입력 디렉토리"
|
|
)
|
|
parser.add_argument(
|
|
"output_dir", type=str, help="OCR 결과(JSON)를 저장할 출력 디렉토리"
|
|
)
|
|
parser.add_argument(
|
|
"--provider",
|
|
type=str,
|
|
default="paddle",
|
|
choices=["paddle", "upstage"],
|
|
help="사용할 OCR 공급자 (기본값: paddle)",
|
|
)
|
|
parser.add_argument(
|
|
"--workers",
|
|
type=int,
|
|
default=4,
|
|
help="병렬 처리 시 동시에 실행할 워커(스레드) 수 (기본값: 4)\n(--delay 옵션이 0보다 크면 이 값은 무시됩니다.)",
|
|
)
|
|
parser.add_argument(
|
|
"--delay",
|
|
type=float,
|
|
default=0,
|
|
help="순차 처리 시 각 요청 사이의 대기 시간(초) (기본값: 0)\n(0보다 큰 값으로 설정하면 순차 모드로 강제 실행됩니다.)",
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
|
|
main(args.input_dir, args.output_dir, args.provider, args.workers, args.delay)
|
|
|
|
# 사용 예시:
|
|
# 1. Paddle OCR, 병렬 워커 4개 사용 (기본)
|
|
# python workspace/run_ocr.py ./source_documents ./result_jsons
|
|
#
|
|
# 2. Upstage OCR, 병렬 워커 8개 사용
|
|
# python workspace/run_ocr.py ./source_documents ./result_jsons --provider upstage --workers 8
|
|
#
|
|
# 3. Upstage OCR, 순차 처리 (요청마다 3초 대기)
|
|
# python workspace/run_ocr.py ./source_documents ./result_jsons --provider upstage --delay 3
|