From 343ae4958a63be7438d16d364babe711937c3a44 Mon Sep 17 00:00:00 2001 From: chan Date: Tue, 12 Aug 2025 16:58:38 +0900 Subject: [PATCH] =?UTF-8?q?=EC=83=88=EB=A1=9C=EC=9A=B4=EC=97=94=EB=93=9C?= =?UTF-8?q?=ED=8F=AC=EC=9D=B8=ED=8A=B8=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 5 +- copy_files.py | 112 ----------- workspace/process_directory.py | 195 ------------------- workspace/run_ocr1.py | 335 +++++++++++++++++++++++++++++++++ workspace/run_ocr2.py | 242 ++++++++++++++++++++++++ workspace/show_summary.py | 92 --------- 6 files changed, 581 insertions(+), 400 deletions(-) delete mode 100644 copy_files.py delete mode 100644 workspace/process_directory.py create mode 100644 workspace/run_ocr1.py create mode 100644 workspace/run_ocr2.py delete mode 100644 workspace/show_summary.py diff --git a/.gitignore b/.gitignore index b82ceb9..a6ecfb0 100644 --- a/.gitignore +++ b/.gitignore @@ -31,4 +31,7 @@ ENV/ /results script_run.log /ocr_data -/workspace/shared_sessions \ No newline at end of file +/workspace/shared_sessions +/filtered_data +/josn +/data \ No newline at end of file diff --git a/copy_files.py b/copy_files.py deleted file mode 100644 index e603916..0000000 --- a/copy_files.py +++ /dev/null @@ -1,112 +0,0 @@ - -import os -import shutil - -def copy_target_files_with_structure(): - # 소스 디렉토리와 대상 디렉토리 설정 - root_source_dir = '/home/jackjack/test/ocr_macro/ocr_data' - dest_dir = '/home/jackjack/test/ocr_macro/filtered_data' - - # 복사할 파일 목록 - files_to_copy = [ - "20250701101504-789-402-926.jpg", "20250707164925-895-935-673.jpg", - "20250708092450-131-769-592.jpg", "20250708124502-268-927-842.jpg", - "20250709105123-169-457-765.jpg", "20250714150847-882-936-950.jpg", - "20250721090407-508-883-473.jpg", "20250724145851-721-283-914.jpg", - "20250729105852-697-150-153.jpg", "20250730180509-798-917-821.jpg", - "20170619133715-134-321-633.jpg", "20171017141811-255-321-370.jpg", - "20180103094436-462-212-348.jpg", "20180131103459-868-481-465.jpg", - "20180411134455-600-132-301.jpg", "20180412092830-356-712-939.jpg", - "20180807102155-126-746-229.jpg", "20190507165642-222-795-363.jpg", - "20191227103340-434-827-409.jpg", "20200113103330-999-251-437.png", - "20200313140454-282-318-706.jpg", "20201203162517-973-818-382.jpg", - "20250305150305-354-816-193.jpg", "2018-0319102207-217049.pdf", - "2018-0319114254-217049.pdf", "2021-0713114710-219044.pdf", - "2021-0713114843-219044.pdf", "2024-1129132456-223033.pdf", - "2024-1202134504-223033.pdf", "2024-1202134828-223033.pdf", - "2024-1216141625-211046.pdf", "2024-1231131430-223033.pdf", - "2025-0102114806-223033.pdf", "2025-0102115602-223033.pdf", - "20250715092937-779-181-466.jpg", "20250715110944-951-537-524.jpg", - "20250715111622-358-588-698.jpg", "20250715112411-186-289-669.jpg", - "20250715135137-801-844-961.jpg", "20250715161950-712-251-637.jpg", - "20250715162045-552-568-375.jpg", "20250715165509-176-474-591.jpg", - "20250715172557-573-573-629.jpg", "20250716093130-913-217-747.jpg", - "20250716105706-162-939-389.jpg", "20250716110134-808-994-942.jpg", - "20250716134023-322-796-383.jpg", "20250716163458-700-360-433.jpg", - "20250717093052-782-277-690.jpg", "20250717103222-584-701-241.jpg", - "20250717103712-214-193-157.jpg", "20250717110901-449-871-865.jpg", - "20250717155048-253-564-315.jpg", "20250717172043-664-630-683.jpg", - "20250718080610-968-626-824.jpg", "20250718093242-193-502-326.jpg", - "20250718105942-802-175-536.jpg", "20250718154510-618-961-614.jpg", - "20250718171201-832-262-559.jpg", "20250721103440-887-127-453.jpg", - "20250721103440-949-954-201.jpg", "20250721103556-832-150-503.jpg", - "20250721111443-531-701-811.jpg", "20250721111443-912-880-634.jpg", - "20250721112249-956-647-309.jpg", "20250721130808-958-549-703.jpg", - "20250721133831-152-461-423.jpg", "20250721145455-511-434-514.jpg", - "20250721145455-875-554-320.jpg", "20250721145456-782-822-874.jpg", - "20250721155757-121-923-232.jpg", "20250721160111-763-493-901.jpg", - "20250721160359-227-567-869.jpg", "20250721160359-337-126-571.jpg", - "20250721172118-534-854-174.jpg", "20250722083248-564-741-719.jpg", - "20250722101426-428-671-780.jpg", "20250722101619-869-994-366.jpg", - "20250722113040-790-828-516.jpg", "20250722113435-988-461-994.jpg", - "20250722132834-142-640-698.jpg", "20250722151220-665-449-414.jpg", - "20250722151447-194-809-212.jpg", "20250722151659-492-562-414.jpg", - "20250722155515-295-661-246.jpg", "20250722164044-771-951-768.jpg", - "20250723090127-752-277-978.jpg", "20250723103830-197-217-803.jpg", - "20250723110935-882-617-879.jpg", "20250723113848-341-499-399.jpg", - "20250723113849-860-361-766.jpg", "20250723135403-994-597-524.jpg", - "20250723135644-957-724-435.jpg", "20250723140727-539-276-326.jpg", - "20250723151024-958-230-632.jpg", "20250723160751-628-951-424.jpg", - "20250723160846-651-369-917.jpg", "20250723162424-328-470-393.jpg", - "20250724083131-482-629-632.jpg", "20250724084439-705-558-529.jpg", - "20250724085219-940-177-263.jpg", "20250724112248-515-638-257.jpg", - "20250724140126-814-266-218.jpg", "20250724165128-348-167-761.jpg", - "20250724170756-316-660-852.jpg", "20250725084748-172-127-509.jpg", - "20250725090550-647-253-595.jpg", "20250725103854-127-797-609.jpg", - "20250725112611-877-225-953.jpg", "20250725150958-785-430-943.jpg", - "20250725160005-618-961-614.jpg", "20250725160006-645-814-611.jpg", - "20250728110536-229-869-218.jpg", "20250728110536-422-535-360.jpg", - "20250728110536-848-126-746.jpg", "20250728133331-290-838-249.jpg", - "20250728133631-893-551-661.jpg", "20250728133731-800-849-608.jpg", - "20250728133919-745-435-884.jpg", "20250728141244-723-384-786.jpg", - "20250728163719-158-329-264.jpg", "20250729091304-312-462-757.jpg", - "20250729101639-845-837-748.jpg", "20250729150847-216-665-480.jpg", - "20250729152047-863-915-863.jpg", "20250729152047-872-458-985.jpg", - "20250729152047-915-601-759.jpg", "20250730093300-400-680-981.jpg", - "20250730101956-808-881-885.jpg" - ] - - # 대상 디렉토리가 없으면 생성 - if not os.path.exists(dest_dir): - os.makedirs(dest_dir) - print(f"'{dest_dir}' 디렉토리를 생성했습니다.") - - copied_files = set() - # root_source_dir부터 시작해서 모든 하위 디렉토리를 재귀적으로 탐색 - for dirpath, _, filenames in os.walk(root_source_dir): - for filename in filenames: - if filename in files_to_copy and filename not in copied_files: - source_file = os.path.join(dirpath, filename) - - # 원본 디렉토리 구조를 유지하기 위한 경로 계산 - relative_path = os.path.relpath(dirpath, root_source_dir) - new_dest_dir = os.path.join(dest_dir, relative_path) - - # 새로운 목적지 디렉토리 생성 - os.makedirs(new_dest_dir, exist_ok=True) - - dest_file = os.path.join(new_dest_dir, filename) - - shutil.copy2(source_file, dest_file) - print(f"'{filename}'을(를) '{new_dest_dir}'(으)로 복사했습니다.") - copied_files.add(filename) - - # 복사되지 않은 파일 확인 - missing_files = set(files_to_copy) - copied_files - if missing_files: - print("\n다음 파일들은 찾지 못했습니다:") - for filename in sorted(list(missing_files)): - print(f"- {filename}") - -if __name__ == "__main__": - copy_target_files_with_structure() diff --git a/workspace/process_directory.py b/workspace/process_directory.py deleted file mode 100644 index 0a89e8d..0000000 --- a/workspace/process_directory.py +++ /dev/null @@ -1,195 +0,0 @@ -import requests -import time -import json -import os -import argparse -import sys -from urllib.parse import urljoin -import logging -from dotenv import load_dotenv - -# --- 로거 설정 --- -# 전역 로거 객체 생성 -logger = logging.getLogger(__name__) - -def setup_logger(): - """ - 로거를 설정하여 콘솔과 고유한 타임스탬프를 가진 파일에 모두 출력하도록 합니다. - 로그 파일은 'workspace' 디렉터리 내에 저장됩니다. - """ - # 기존에 추가된 핸들러가 있다면 제거하여 중복 로깅 방지 - for handler in logger.handlers[:]: - logger.removeHandler(handler) - - logger.setLevel(logging.INFO) - formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') - - # 콘솔 핸들러 - console_handler = logging.StreamHandler() - console_handler.setFormatter(formatter) - logger.addHandler(console_handler) - - # 고유한 파일명을 위한 타임스탬프 생성 - timestamp = time.strftime("%Y%m%d_%H%M%S") - log_filename = f"{timestamp}_script_run.log" - - # 로그 파일을 workspace 디렉터리 안에 생성 - workspace_dir = os.path.dirname(__file__) - log_filepath = os.path.join(workspace_dir, log_filename) - - # 파일 핸들러 - file_handler = logging.FileHandler(log_filepath, encoding='utf-8') - file_handler.setFormatter(formatter) - logger.addHandler(file_handler) - - # 생성된 로그 파일 경로를 로깅 - logger.info(f"로그 파일이 '{log_filepath}'에 생성되었습니다.") - - -# --- API 요청 함수 --- - -def start_extraction(post_url, file_path, filename, headers, model_name=None): - """POST /extract/inner: 문서 추출 시작""" - try: - with open(file_path, 'rb') as input_f: - files_to_upload = {'input_file': (filename, input_f)} - data_payload = {} - if model_name: - data_payload['model'] = model_name - - response = requests.post(post_url, files=files_to_upload, data=data_payload, headers=headers) - response.raise_for_status() - - return response.json() - except Exception: - logger.exception(f"[{filename}] POST 요청 중 오류 발생") - return None - -def check_progress(base_url, progress_path, filename, headers): - """GET /extract/progress/{request_id}: 진행 상태 확인 (로깅 적용)""" - get_url = urljoin(base_url + '/', progress_path.lstrip('/')) - - RETRY_COUNT_ON_404 = 3 - RETRY_DELAY_ON_404 = 5 - retries_left = RETRY_COUNT_ON_404 - last_status = "" - - while True: - try: - response = requests.get(get_url, headers=headers, timeout=30) - - if response.status_code == 404: - if retries_left > 0: - logger.warning(f"[{filename}] 작업을 찾을 수 없어(404) {RETRY_DELAY_ON_404}초 후 재시도합니다... ({retries_left}회 남음)") - retries_left -= 1 - time.sleep(RETRY_DELAY_ON_404) - continue - else: - logger.error(f"[{filename}] 재시도 횟수 초과 후에도 작업을 찾을 수 없습니다 (404).") - return None - - response.raise_for_status() - data = response.json() - - if "final_result" in data and data.get("final_result") is not None: - logger.info(f"[{filename}] 처리 완료.") - return data["final_result"] - - if "progress_logs" in data and data["progress_logs"]: - status_message = data["progress_logs"][-1].get("status", "상태 확인 중...") - if status_message != last_status: - last_status = status_message - logger.info(f"[{filename}] 진행 상태: {last_status}") - - time.sleep(2) - except requests.exceptions.ReadTimeout: - logger.warning(f"[{filename}] 상태 확인 타임아웃. 재시도...") - time.sleep(2) - except Exception: - logger.exception(f"[{filename}] 상태 확인 중 예측하지 못한 오류 발생") - return None - -# --- 메인 실행 로직 --- - -def main(): - # .env 파일에서 환경 변수 로드 (workspace 디렉터리 기준) - dotenv_path = os.path.join(os.path.dirname(__file__), '.env') - load_dotenv(dotenv_path=dotenv_path) - - # 로거를 가장 먼저 설정 - setup_logger() - - # 환경 변수에서 API 정보 가져오기 - BASE_URL = os.getenv("BASE_URL") - API_KEY = os.getenv("API_KEY") - - if not BASE_URL or not API_KEY: - logger.error("환경 변수(BASE_URL, API_KEY)가 설정되지 않았습니다. workspace/.env 파일을 확인하세요.") - return - - parser = argparse.ArgumentParser(description="문서 정보 추출 자동화 스크립트") - parser.add_argument("input_dir", help="입력 디렉터리 경로") - parser.add_argument("-o", "--output_dir", default="results", help="출력 디렉터리 경로") - parser.add_argument("--endpoint", choices=['i18n', 'd6c'], default='i18n', help="추출 API 엔드포인트 선택 (i18n 또는 d6c)") - parser.add_argument("--model", dest="model_name", help="사용할 LLM 모델 이름") - args = parser.parse_args() - - if not os.path.isdir(args.input_dir): - logger.error(f"입력 디렉터리를 찾을 수 없습니다 - {args.input_dir}") - return - - os.makedirs(args.output_dir, exist_ok=True) - headers = {'X-API-KEY': API_KEY} - - post_url = f"{BASE_URL}/extract/inner/{args.endpoint}" - - logger.info("="*20 + " 스크립트 시작 " + "="*20) - logger.info(f"API 서버: {BASE_URL}") - logger.info(f"요청 API: {post_url}") - logger.info(f"입력 디렉터리: {args.input_dir}") - logger.info(f"출력 디렉터리: {args.output_dir}") - - # 처리할 파일 목록 준비 - files_to_process = [f for f in sorted(os.listdir(args.input_dir)) if os.path.isfile(os.path.join(args.input_dir, f))] - total_files = len(files_to_process) - logger.info(f"총 {total_files}개의 파일을 처리합니다.") - - for i, filename in enumerate(files_to_process): - file_path = os.path.join(args.input_dir, filename) - - logger.info(f"--- ({i+1}/{total_files}) 처리 시작: {filename} ---") - - initial_response = start_extraction(post_url, file_path, filename, headers, args.model_name) - if not initial_response: - logger.error(f"[{filename}] 파일 처리 실패 (추출 시작 단계)") - continue - - request_id = initial_response.get("request_id") - status_check_url = initial_response.get("status_check_url") - - if not request_id or not status_check_url: - logger.error(f"[{filename}] 초기 응답이 잘못되었습니다: {initial_response}") - continue - - logger.info(f"[{filename}] 작업 요청 성공. Request ID: {request_id}") - - final_result = check_progress(BASE_URL, status_check_url, filename, headers) - - if final_result: - output_path = os.path.join(args.output_dir, f"{os.path.splitext(filename)[0]}.json") - try: - with open(output_path, 'w', encoding='utf-8') as f: - json.dump(final_result, f, indent=2, ensure_ascii=False) - logger.info(f"[{filename}] 결과 저장 완료: {output_path}") - except IOError: - logger.exception(f"[{filename}] 파일 저장 중 오류 발생") - else: - logger.error(f"[{filename}] 파일 처리 실패 (결과 확인 단계)") - - logger.info("="*20 + " 모든 작업 완료 " + "="*20) - -if __name__ == "__main__": - try: - main() - except KeyboardInterrupt: - logging.getLogger(__name__).warning("사용자에 의해 작업이 중단되었습니다.") \ No newline at end of file diff --git a/workspace/run_ocr1.py b/workspace/run_ocr1.py new file mode 100644 index 0000000..f1d1ff3 --- /dev/null +++ b/workspace/run_ocr1.py @@ -0,0 +1,335 @@ +import argparse +import json +import logging +import logging.handlers +import os +import random +import time +from multiprocessing import Pool, Queue, current_process +from urllib.parse import urljoin + +import requests +from dotenv import load_dotenv + +# --- 전역 변수 및 로깅 시스템 --- + +worker_log_queue = None +completed_tasks_count = 0 +total_tasks = 0 + + +def setup_main_logger(log_queue): + logger = logging.getLogger(__name__) + logger.setLevel(logging.INFO) + formatter = logging.Formatter( + "%(asctime)s - %(processName)s - %(levelname)s - %(message)s" + ) + console_handler = logging.StreamHandler() + console_handler.setFormatter(formatter) + timestamp = time.strftime("%Y%m%d_%H%M%S") + log_filename = f"{timestamp}_robust_run.log" + workspace_dir = os.path.dirname(os.path.abspath(__file__)) or "." + log_filepath = os.path.join(workspace_dir, log_filename) + file_handler = logging.FileHandler(log_filepath, encoding="utf-8") + file_handler.setFormatter(formatter) + queue_listener = logging.handlers.QueueListener( + log_queue, console_handler, file_handler + ) + queue_listener.start() + + bootstrap_logger = logging.getLogger("bootstrap") + bootstrap_logger.setLevel(logging.INFO) + bootstrap_logger.addHandler(console_handler) + bootstrap_logger.addHandler(file_handler) + bootstrap_logger.propagate = False + + bootstrap_logger.info(f"메인 로거 설정 완료. 로그 파일: '{log_filepath}'") + bootstrap_logger.info("=" * 20 + " 스크립트 시작 " + "=" * 20) + return queue_listener, bootstrap_logger + + +def init_worker(log_queue): + global worker_log_queue + worker_log_queue = log_queue + + +def get_worker_logger(): + global worker_log_queue + worker_logger = logging.getLogger(current_process().name) + if not worker_logger.handlers: + worker_logger.setLevel(logging.INFO) + queue_handler = logging.handlers.QueueHandler(worker_log_queue) + worker_logger.addHandler(queue_handler) + return worker_logger + + +# --- API 요청 함수 (변경 없음) --- +def start_extraction(post_url, file_path, filename, headers, model_name=None): + worker_logger = get_worker_logger() + worker_logger.info(f"[{filename}] 파일 업로드 및 추출 요청 시작...") + # ... (내부 로직은 이전과 동일) + start_time = time.time() + try: + with open(file_path, "rb") as input_f: + files_to_upload = {"input_file": (filename, input_f)} + data_payload = {} + if model_name: + data_payload["model"] = model_name + response = requests.post( + post_url, + files=files_to_upload, + data=data_payload, + headers=headers, + timeout=300, + ) + end_time = time.time() + worker_logger.info( + f"[{filename}] 추출 요청 완료. 소요 시간: {end_time - start_time:.2f}초, 상태 코드: {response.status_code}" + ) + response.raise_for_status() + return response.json() + except requests.exceptions.RequestException as e: + end_time = time.time() + worker_logger.exception( + f"[{filename}] 추출 요청 중 치명적 오류 발생. 소요 시간: {end_time - start_time:.2f}초. 오류: {e}" + ) + return None + + +def check_progress(base_url, progress_path, filename, headers): + worker_logger = get_worker_logger() + get_url = urljoin(base_url + "/", progress_path.lstrip("/")) + # ... (내부 로직은 이전과 동일) + base_delay = 5 + max_delay = 60 + attempt = 0 + last_status = "" + while True: + delay = min(max_delay, base_delay * (2**attempt)) + jitter = delay * 0.2 + sleep_duration = delay + random.uniform(-jitter, jitter) + worker_logger.info( + f"[{filename}] {sleep_duration:.2f}초 후 상태 확인 (시도 {attempt + 1})..." + ) + time.sleep(sleep_duration) + start_time = time.time() + try: + response = requests.get(get_url, headers=headers, timeout=60) + end_time = time.time() + if response.status_code != 200: + worker_logger.warning( + f"[{filename}] 상태 확인 응답 코드: {response.status_code}. 소요 시간: {end_time - start_time:.2f}초" + ) + if response.status_code == 404: + if attempt < 5: + worker_logger.warning( + f"[{filename}] 작업을 찾을 수 없음(404). 재시도합니다." + ) + attempt += 1 + continue + else: + worker_logger.error( + f"[{filename}] 재시도 횟수 초과 후에도 작업을 찾을 수 없음(404). 처리 중단." + ) + return None + response.raise_for_status() + data = response.json() + if "final_result" in data and data.get("final_result") is not None: + worker_logger.info(f"[{filename}] 처리 완료.") + return data["final_result"] + if "progress_logs" in data and data["progress_logs"]: + status_message = data["progress_logs"][-1].get( + "status", "상태 확인 중..." + ) + if status_message != last_status: + last_status = status_message + worker_logger.info(f"[{filename}] 진행 상태: {last_status}") + attempt += 1 + except requests.exceptions.ReadTimeout: + end_time = time.time() + worker_logger.warning( + f"[{filename}] 상태 확인 타임아웃. 소요 시간: {end_time - start_time:.2f}초. 재시도..." + ) + attempt += 1 + except requests.exceptions.RequestException as e: + end_time = time.time() + worker_logger.exception( + f"[{filename}] 상태 확인 중 치명적 오류 발생. 소요 시간: {end_time - start_time:.2f}초. 오류: {e}" + ) + return None + + +# --- 워커 함수 및 콜백 함수 --- + + +# <--- 핵심 변경 1: 워커 함수가 순번(index)과 전체 개수(total)를 인자로 받음 --- +def process_file_worker(index, total, file_path, config): + """단일 파일을 처리하는 워커 함수. 이제 순번과 전체 개수를 인자로 받습니다.""" + worker_logger = get_worker_logger() + filename = os.path.basename(file_path) + + # <--- 핵심 변경 2: 로그 출력 시 (순번/전체) 형식 사용 --- + worker_logger.info(f"--- ({index}/{total}) 처리 시작: {filename} ---") + + initial_response = start_extraction( + config["post_url"], file_path, filename, config["headers"], config["model_name"] + ) + if not initial_response: + worker_logger.error(f"[{filename}] 파일 처리 실패 (추출 시작 단계)") + return index, filename, False # <--- 핵심 변경 3: 결과 반환 시 순번도 함께 전달 + + request_id = initial_response.get("request_id") + status_check_url = initial_response.get("status_check_url") + + if not request_id or not status_check_url: + worker_logger.error( + f"[{filename}] 초기 응답이 잘못되었습니다: {initial_response}" + ) + return index, filename, False + + worker_logger.info(f"[{filename}] 작업 요청 성공. Request ID: {request_id}") + + final_result = check_progress( + config["base_url"], status_check_url, filename, config["headers"] + ) + + if final_result: + output_path = os.path.join( + config["output_dir"], f"{os.path.splitext(filename)[0]}.json" + ) + try: + with open(output_path, "w", encoding="utf-8") as f: + json.dump(final_result, f, indent=2, ensure_ascii=False) + worker_logger.info(f"[{filename}] 결과 저장 완료: {output_path}") + return index, filename, True + except IOError: + worker_logger.exception(f"[{filename}] 파일 저장 중 오류 발생") + return index, filename, False + else: + worker_logger.error(f"[{filename}] 파일 처리 실패 (결과 확인 단계)") + return index, filename, False + + +def update_progress(result): + """진행 상황을 업데이트하고 출력하는 콜백 함수.""" + global completed_tasks_count, total_tasks + completed_tasks_count += 1 + + # <--- 핵심 변경 4: 콜백 함수가 순번을 함께 받아서 로그에 표시 --- + index, filename, success = result + status = "성공" if success else "실패" + logging.getLogger("bootstrap").info( + f"--- ({completed_tasks_count}/{total_tasks}) 완료: [{index}번째 파일] {filename} (상태: {status}) ---" + ) + + +# --- 메인 실행 로직 --- +def main(): + global total_tasks + # ... (인자 파싱 및 설정 부분은 이전과 동일) ... + dotenv_path = os.path.join(os.path.dirname(__file__), ".env") + load_dotenv(dotenv_path=dotenv_path) + BASE_URL = os.getenv("BASE_URL") + API_KEY = os.getenv("API_KEY") + parser = argparse.ArgumentParser( + description="문서 정보 추출 자동화 스크립트 (병렬 처리)" + ) + parser.add_argument("input_dir", help="입력 디렉터리 경로") + parser.add_argument( + "-o", "--output_dir", default="results", help="출력 디렉터리 경로" + ) + parser.add_argument( + "--endpoint", + choices=["i18n", "d6c"], + default="i18n", + help="추출 API 엔드포인트 선택", + ) + parser.add_argument("--model", dest="model_name", help="사용할 LLM 모델 이름") + parser.add_argument( + "-w", "--workers", type=int, default=4, help="동시에 실행할 워커 프로세스 수" + ) + args = parser.parse_args() + log_queue = Queue() + queue_listener, logger = setup_main_logger(log_queue) + if not BASE_URL or not API_KEY: + logger.error("환경 변수(BASE_URL, API_KEY)가 .env 파일에 설정되지 않았습니다.") + queue_listener.stop() + return + if not os.path.isdir(args.input_dir): + logger.error(f"입력 디렉터리를 찾을 수 없습니다 - {args.input_dir}") + queue_listener.stop() + return + os.makedirs(args.output_dir, exist_ok=True) + headers = {"X-API-KEY": API_KEY} + post_url = f"{BASE_URL}/extract/inner/{args.endpoint}" + logger.info(f"API 서버: {BASE_URL}") + logger.info(f"요청 API: {post_url}") + logger.info(f"입력 디렉터리: {args.input_dir}") + logger.info(f"출력 디렉터리: {args.output_dir}") + logger.info(f"동시 작업 수 (워커): {args.workers}") + files_to_process = [ + os.path.join(args.input_dir, f) + for f in sorted(os.listdir(args.input_dir)) + if os.path.isfile(os.path.join(args.input_dir, f)) + ] + total_tasks = len(files_to_process) + logger.info(f"총 {total_tasks}개의 파일을 처리합니다.") + config = { + "base_url": BASE_URL, + "post_url": post_url, + "headers": headers, + "model_name": args.model_name, + "output_dir": args.output_dir, + } + + results = [] + pool = None + try: + pool = Pool( + processes=args.workers, initializer=init_worker, initargs=(log_queue,) + ) + async_results = [] + + # <--- 핵심 변경 5: for 루프에서 enumerate를 사용하여 순번(i)을 함께 전달 --- + for i, file_path in enumerate(files_to_process): + # 작업 인자에 (순번, 전체 개수, 파일 경로, 설정)을 담음 + task_args = (i + 1, total_tasks, file_path, config) + res = pool.apply_async( + process_file_worker, args=task_args, callback=update_progress + ) + async_results.append(res) + + pool.close() + pool.join() + + results = [res.get() for res in async_results] + + except KeyboardInterrupt: + logger.warning("사용자에 의해 작업이 중단되었습니다.") + if pool: + pool.terminate() + pool.join() + except Exception: + logging.getLogger(__name__).exception( + "메인 프로세스에서 예기치 않은 오류가 발생했습니다." + ) + if pool: + pool.terminate() + pool.join() + + # <--- 핵심 변경 6: 최종 결과 요약 시 순번을 제외하고 파일명만 사용 --- + successful_files = [filename for index, filename, success in results if success] + failed_files = [filename for index, filename, success in results if not success] + + logger.info("=" * 20 + " 모든 작업 완료 " + "=" * 20) + logger.info( + f"총 {total_tasks}개 파일 중 {len(successful_files)}개 성공, {len(failed_files)}개 실패." + ) + if failed_files: + logger.warning(f"실패한 파일 목록: {failed_files}") + + queue_listener.stop() + + +if __name__ == "__main__": + main() diff --git a/workspace/run_ocr2.py b/workspace/run_ocr2.py new file mode 100644 index 0000000..0dc3079 --- /dev/null +++ b/workspace/run_ocr2.py @@ -0,0 +1,242 @@ +import argparse +import json +import random +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from pathlib import Path + +import requests + +# --- 설정 --- +BASE_URL = "http://172.16.10.176:8892" +POLL_INTERVAL = 2 # 상태 확인 간격 (초) +SUPPORTED_EXTENSIONS = [".png", ".jpg", ".jpeg", ".webp", ".bmp", ".pdf"] +MAX_RETRIES = 5 # 최대 재시도 횟수 +BACKOFF_FACTOR = 1 # 백오프 시간 기본값 (초) + + +def request_with_retry(method, url, **kwargs): + """Exponential Backoff + Jitter를 적용한 재시도 로직""" + for attempt in range(MAX_RETRIES): + try: + response = requests.request(method, url, **kwargs) + if 500 <= response.status_code < 600: + print( + f"\n⚠️ 서버 오류 ({response.status_code}). {attempt + 1}번째 재시도 중..." + ) + raise requests.exceptions.HTTPError( + f"Server error: {response.status_code}" + ) + + response.raise_for_status() + return response + except requests.exceptions.RequestException as e: + if attempt < MAX_RETRIES - 1: + backoff_time = BACKOFF_FACTOR * (2**attempt) + jitter = random.uniform(0, 1) + sleep_time = backoff_time + jitter + print(f"\n❌ 네트워크 오류: {e}. {sleep_time:.2f}초 후 재시도합니다.") + time.sleep(sleep_time) + else: + print( + f"\n❌ 최대 재시도 횟수({MAX_RETRIES})를 초과했습니다. 최종 오류: {e}" + ) + raise + + +def run_ocr_on_file(file_path: Path, output_dir: Path, provider: str): + """단일 파일에 대해 OCR을 수행하고 결과를 저장합니다.""" + log_prefix = f"[{file_path.name}]" + + print(f"{log_prefix} 📄 처리 시작 (Provider: {provider})") + + output_file_path = output_dir / f"{file_path.stem}.json" + if output_file_path.exists(): + print(f"{log_prefix} 이미 결과가 존재합니다. 건너뜁니다.") + return f"SKIPPED: {file_path.name}" + + try: + with open(file_path, "rb") as f: + files = {"file": (file_path.name, f, "application/octet-stream")} + endpoint = f"/ocr/{provider}" + response = request_with_retry( + "POST", f"{BASE_URL}{endpoint}", files=files, timeout=30 + ) + + initial_data = response.json() + request_id = initial_data.get("request_id") + status_check_url = initial_data.get("status_check_url") + + if not request_id or not status_check_url: + print( + f"{log_prefix} ❌ 오류: 서버 응답에 'request_id' 또는 'status_check_url'이 없습니다." + ) + return f"FAILED: {file_path.name} (Invalid initial response)" + + print(f"{log_prefix} - 작업 접수 완료. Request ID: {request_id}") + + progress_url = f"{BASE_URL}{status_check_url}" + print(f"{log_prefix} - 결과 확인 중...", end="", flush=True) + + while True: + progress_response = request_with_retry("GET", progress_url, timeout=30) + progress_data = progress_response.json() + status = progress_data.get("status") + + if status == "SUCCESS": + print(" 완료!") + result_data = progress_data.get("final_result") + with open(output_file_path, "w", encoding="utf-8") as f: + json.dump(result_data, f, indent=2, ensure_ascii=False) + print(f"{log_prefix} - ✅ 결과 저장 완료: {output_file_path.name}") + return f"SUCCESS: {file_path.name}" + elif status == "FAILURE": + print(" 실패!") + print( + f"{log_prefix} - ❌ 오류: OCR 작업 실패. 상세: {progress_data.get('message')}" + ) + return f"FAILED: {file_path.name} (OCR Failure)" + else: # PENDING + print(".", end="", flush=True) + time.sleep(POLL_INTERVAL) + + except requests.exceptions.RequestException as e: + print(f"\n{log_prefix} ❌ 최종 오류: {e}") + return f"FAILED: {file_path.name} ({e})" + except json.JSONDecodeError as e: + print(f"\n{log_prefix} ❌ 최종 오류: JSON 파싱 실패 - {e}") + return f"FAILED: {file_path.name} (JSON Parse Error)" + + +def main(input_dir: str, output_dir: str, provider: str, workers: int, delay: float): + """지정된 디렉토리의 모든 지원 형식 파일에 대해 OCR을 실행합니다.""" + input_path = Path(input_dir) + output_path = Path(output_dir) + + if not input_path.is_dir(): + print(f"오류: 입력 디렉토리를 찾을 수 없습니다: {input_dir}") + return + + output_path.mkdir(parents=True, exist_ok=True) + print(f"입력 디렉토리: {input_path.resolve()}") + print(f"출력 디렉토리: {output_path.resolve()}") + print(f"OCR Provider: {provider}") + + files_to_process = [ + p + for p in input_path.iterdir() + if p.is_file() and p.suffix.lower() in SUPPORTED_EXTENSIONS + ] + + if not files_to_process: + print("처리할 파일을 찾을 수 없습니다.") + return + + total_files = len(files_to_process) + results = {"SUCCESS": 0, "SKIPPED": 0, "FAILED": 0} + + if delay > 0: + # --- 순차(동기) 처리 모드 --- + print(f"순차 처리 모드 활성화. 요청 간 대기 시간: {delay}초") + print("-" * 40) + print(f"총 {total_files}개의 파일을 처리합니다.") + + for i, file in enumerate(files_to_process): + print(f"\n--- [{i + 1}/{total_files}] ---") + try: + result_str = run_ocr_on_file(file, output_path, provider) + status = result_str.split(":")[0] + if "SUCCESS" in status: + results["SUCCESS"] += 1 + elif "SKIPPED" in status: + results["SKIPPED"] += 1 + else: + results["FAILED"] += 1 + except Exception as exc: + print(f"\n[Main] {file.name} 처리 중 예외 발생: {exc}") + results["FAILED"] += 1 + + if i < total_files - 1: + print(f"요청 간 대기... ({delay}초)") + time.sleep(delay) + else: + # --- 병렬 처리 모드 --- + print(f"병렬 처리 모드 활성화. 동시 작업 수: {workers}") + print("-" * 40) + print(f"총 {total_files}개의 파일을 처리합니다.") + + with ThreadPoolExecutor(max_workers=workers) as executor: + future_to_file = { + executor.submit(run_ocr_on_file, file, output_path, provider): file + for file in files_to_process + } + + for i, future in enumerate(as_completed(future_to_file)): + file = future_to_file[future] + try: + result_str = future.result() + status = result_str.split(":")[0] + if "SUCCESS" in status: + results["SUCCESS"] += 1 + elif "SKIPPED" in status: + results["SKIPPED"] += 1 + else: + results["FAILED"] += 1 + except Exception as exc: + print(f"\n[Main] {file.name} 처리 중 예외 발생: {exc}") + results["FAILED"] += 1 + + print(f"--- 진행 상황: {i + 1}/{total_files} ---") + + print("\n" + "=" * 40) + print("모든 작업이 완료되었습니다.") + print(f" - 성공: {results['SUCCESS']}") + print(f" - 건너뜀: {results['SKIPPED']}") + print(f" - 실패: {results['FAILED']}") + print("=" * 40) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="디렉토리 내 파일에 대해 비동기 OCR을 병렬 또는 순차적으로 수행합니다.", + formatter_class=argparse.RawTextHelpFormatter, + ) + parser.add_argument( + "input_dir", type=str, help="OCR을 수행할 파일들이 있는 입력 디렉토리" + ) + parser.add_argument( + "output_dir", type=str, help="OCR 결과(JSON)를 저장할 출력 디렉토리" + ) + parser.add_argument( + "--provider", + type=str, + default="paddle", + choices=["paddle", "upstage"], + help="사용할 OCR 공급자 (기본값: paddle)", + ) + parser.add_argument( + "--workers", + type=int, + default=4, + help="병렬 처리 시 동시에 실행할 워커(스레드) 수 (기본값: 4)\n(--delay 옵션이 0보다 크면 이 값은 무시됩니다.)", + ) + parser.add_argument( + "--delay", + type=float, + default=0, + help="순차 처리 시 각 요청 사이의 대기 시간(초) (기본값: 0)\n(0보다 큰 값으로 설정하면 순차 모드로 강제 실행됩니다.)", + ) + + args = parser.parse_args() + + main(args.input_dir, args.output_dir, args.provider, args.workers, args.delay) + +# 사용 예시: +# 1. Paddle OCR, 병렬 워커 4개 사용 (기본) +# python workspace/run_ocr.py ./source_documents ./result_jsons +# +# 2. Upstage OCR, 병렬 워커 8개 사용 +# python workspace/run_ocr.py ./source_documents ./result_jsons --provider upstage --workers 8 +# +# 3. Upstage OCR, 순차 처리 (요청마다 3초 대기) +# python workspace/run_ocr.py ./source_documents ./result_jsons --provider upstage --delay 3 diff --git a/workspace/show_summary.py b/workspace/show_summary.py deleted file mode 100644 index 6af7f9b..0000000 --- a/workspace/show_summary.py +++ /dev/null @@ -1,92 +0,0 @@ -# workspace/show_summary.py -import os -import json -import argparse -import pandas as pd - -def generate_summary(directory_path): - """ - 지정된 디렉터리에서 모든 JSON 파일을 읽어 요약 정보를 추출하고, - pandas DataFrame으로 반환합니다. - """ - summary_data = [] - - if not os.path.isdir(directory_path): - print(f"오류: 디렉터리를 찾을 수 없습니다 - {directory_path}") - return None - - for filename in sorted(os.listdir(directory_path)): - if filename.endswith('.json'): - file_path = os.path.join(directory_path, filename) - - try: - with open(file_path, 'r', encoding='utf-8') as f: - data = json.load(f) - - # JSON 파일이 리스트 형태이므로 첫 번째 항목을 사용 - if isinstance(data, list) and data: - item = data[0] - else: - # 예상치 못한 형식이면 건너뛰기 - continue - - # 필요한 정보 추출 - row_data = { - 'filename': item.get('filename'), - 'duration_sec': item.get('time', {}).get('duration_sec') - } - - # 'processed' 딕셔너리의 모든 키-값을 row_data에 추가 - processed_info = item.get('processed', {}) - if isinstance(processed_info, dict): - row_data.update(processed_info) - - summary_data.append(row_data) - - except (json.JSONDecodeError, IndexError) as e: - print(f"파일 처리 중 오류 발생 ({filename}): {e}") - except Exception as e: - print(f"알 수 없는 오류 발생 ({filename}): {e}") - - if not summary_data: - print("처리할 JSON 파일이 없습니다.") - return None - - return pd.DataFrame(summary_data) - -def main(): - """메인 실행 함수""" - parser = argparse.ArgumentParser(description="JSON 파일들을 읽어 요약 테이블을 생성하고 CSV로 저장하는 스크립트") - parser.add_argument("input_dir", help="JSON 파일들이 포함된 입력 디렉터리 경로") - parser.add_argument("-o", "--output", help="요약 결과를 저장할 CSV 파일 경로") - args = parser.parse_args() - - # pandas 출력 옵션 설정 - pd.set_option('display.max_rows', 500) - pd.set_option('display.max_columns', 50) - pd.set_option('display.width', 200) - - summary_df = generate_summary(args.input_dir) - - if summary_df is not None: - print("\n--- JSON 처리 결과 요약 ---") - print(summary_df) - print("\n") - - # CSV 파일로 저장하는 로직 추가 - if args.output: - output_path = args.output - # 파일명에 .csv 확장자가 없으면 자동으로 추가 - if not output_path.lower().endswith('.csv'): - output_path += '.csv' - - try: - # CSV 파일 저장 시 Excel에서 한글이 깨지지 않도록 'utf-8-sig' 인코딩 사용 - summary_df.to_csv(output_path, index=False, encoding='utf-8-sig') - print(f"요약 결과가 '{output_path}' 파일로 성공적으로 저장되었습니다.") - except Exception as e: - print(f"CSV 파일 저장 중 오류가 발생했습니다: {e}") - print("\n") - -if __name__ == "__main__": - main()