import argparse import json import logging import logging.handlers import os import random import time from multiprocessing import Pool, Queue, current_process from urllib.parse import urljoin import requests from dotenv import load_dotenv # --- 전역 변수 및 로깅 시스템 --- worker_log_queue = None completed_tasks_count = 0 total_tasks = 0 def setup_main_logger(log_queue): logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) formatter = logging.Formatter( "%(asctime)s - %(processName)s - %(levelname)s - %(message)s" ) console_handler = logging.StreamHandler() console_handler.setFormatter(formatter) timestamp = time.strftime("%Y%m%d_%H%M%S") log_filename = f"{timestamp}_robust_run.log" workspace_dir = os.path.dirname(os.path.abspath(__file__)) or "." log_filepath = os.path.join(workspace_dir, log_filename) file_handler = logging.FileHandler(log_filepath, encoding="utf-8") file_handler.setFormatter(formatter) queue_listener = logging.handlers.QueueListener( log_queue, console_handler, file_handler ) queue_listener.start() bootstrap_logger = logging.getLogger("bootstrap") bootstrap_logger.setLevel(logging.INFO) bootstrap_logger.addHandler(console_handler) bootstrap_logger.addHandler(file_handler) bootstrap_logger.propagate = False bootstrap_logger.info(f"메인 로거 설정 완료. 로그 파일: '{log_filepath}'") bootstrap_logger.info("=" * 20 + " 스크립트 시작 " + "=" * 20) return queue_listener, bootstrap_logger def init_worker(log_queue): global worker_log_queue worker_log_queue = log_queue def get_worker_logger(): global worker_log_queue worker_logger = logging.getLogger(current_process().name) if not worker_logger.handlers: worker_logger.setLevel(logging.INFO) queue_handler = logging.handlers.QueueHandler(worker_log_queue) worker_logger.addHandler(queue_handler) return worker_logger # --- API 요청 함수 (변경 없음) --- def start_extraction(post_url, file_path, filename, headers, model_name=None): worker_logger = get_worker_logger() worker_logger.info(f"[{filename}] 파일 업로드 및 추출 요청 시작...") # ... (내부 로직은 이전과 동일) start_time = time.time() try: with open(file_path, "rb") as input_f: files_to_upload = {"input_file": (filename, input_f)} data_payload = {} if model_name: data_payload["model"] = model_name response = requests.post( post_url, files=files_to_upload, data=data_payload, headers=headers, timeout=300, ) end_time = time.time() worker_logger.info( f"[{filename}] 추출 요청 완료. 소요 시간: {end_time - start_time:.2f}초, 상태 코드: {response.status_code}" ) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: end_time = time.time() worker_logger.exception( f"[{filename}] 추출 요청 중 치명적 오류 발생. 소요 시간: {end_time - start_time:.2f}초. 오류: {e}" ) return None def check_progress(base_url, progress_path, filename, headers): worker_logger = get_worker_logger() get_url = urljoin(base_url + "/", progress_path.lstrip("/")) # ... (내부 로직은 이전과 동일) base_delay = 5 max_delay = 60 attempt = 0 last_status = "" while True: delay = min(max_delay, base_delay * (2**attempt)) jitter = delay * 0.2 sleep_duration = delay + random.uniform(-jitter, jitter) worker_logger.info( f"[{filename}] {sleep_duration:.2f}초 후 상태 확인 (시도 {attempt + 1})..." ) time.sleep(sleep_duration) start_time = time.time() try: response = requests.get(get_url, headers=headers, timeout=60) end_time = time.time() if response.status_code != 200: worker_logger.warning( f"[{filename}] 상태 확인 응답 코드: {response.status_code}. 소요 시간: {end_time - start_time:.2f}초" ) if response.status_code == 404: if attempt < 5: worker_logger.warning( f"[{filename}] 작업을 찾을 수 없음(404). 재시도합니다." ) attempt += 1 continue else: worker_logger.error( f"[{filename}] 재시도 횟수 초과 후에도 작업을 찾을 수 없음(404). 처리 중단." ) return None response.raise_for_status() data = response.json() if "final_result" in data and data.get("final_result") is not None: worker_logger.info(f"[{filename}] 처리 완료.") return data["final_result"] if "progress_logs" in data and data["progress_logs"]: status_message = data["progress_logs"][-1].get( "status", "상태 확인 중..." ) if status_message != last_status: last_status = status_message worker_logger.info(f"[{filename}] 진행 상태: {last_status}") attempt += 1 except requests.exceptions.ReadTimeout: end_time = time.time() worker_logger.warning( f"[{filename}] 상태 확인 타임아웃. 소요 시간: {end_time - start_time:.2f}초. 재시도..." ) attempt += 1 except requests.exceptions.RequestException as e: end_time = time.time() worker_logger.exception( f"[{filename}] 상태 확인 중 치명적 오류 발생. 소요 시간: {end_time - start_time:.2f}초. 오류: {e}" ) return None # --- 워커 함수 및 콜백 함수 --- # <--- 핵심 변경 1: 워커 함수가 순번(index)과 전체 개수(total)를 인자로 받음 --- def process_file_worker(index, total, file_path, config): """단일 파일을 처리하는 워커 함수. 이제 순번과 전체 개수를 인자로 받습니다.""" worker_logger = get_worker_logger() filename = os.path.basename(file_path) # <--- 핵심 변경 2: 로그 출력 시 (순번/전체) 형식 사용 --- worker_logger.info(f"--- ({index}/{total}) 처리 시작: {filename} ---") initial_response = start_extraction( config["post_url"], file_path, filename, config["headers"], config["model_name"] ) if not initial_response: worker_logger.error(f"[{filename}] 파일 처리 실패 (추출 시작 단계)") return index, filename, False # <--- 핵심 변경 3: 결과 반환 시 순번도 함께 전달 request_id = initial_response.get("request_id") status_check_url = initial_response.get("status_check_url") if not request_id or not status_check_url: worker_logger.error( f"[{filename}] 초기 응답이 잘못되었습니다: {initial_response}" ) return index, filename, False worker_logger.info(f"[{filename}] 작업 요청 성공. Request ID: {request_id}") final_result = check_progress( config["base_url"], status_check_url, filename, config["headers"] ) if final_result: output_path = os.path.join( config["output_dir"], f"{os.path.splitext(filename)[0]}.json" ) try: with open(output_path, "w", encoding="utf-8") as f: json.dump(final_result, f, indent=2, ensure_ascii=False) worker_logger.info(f"[{filename}] 결과 저장 완료: {output_path}") return index, filename, True except IOError: worker_logger.exception(f"[{filename}] 파일 저장 중 오류 발생") return index, filename, False else: worker_logger.error(f"[{filename}] 파일 처리 실패 (결과 확인 단계)") return index, filename, False def update_progress(result): """진행 상황을 업데이트하고 출력하는 콜백 함수.""" global completed_tasks_count, total_tasks completed_tasks_count += 1 # <--- 핵심 변경 4: 콜백 함수가 순번을 함께 받아서 로그에 표시 --- index, filename, success = result status = "성공" if success else "실패" logging.getLogger("bootstrap").info( f"--- ({completed_tasks_count}/{total_tasks}) 완료: [{index}번째 파일] {filename} (상태: {status}) ---" ) # --- 메인 실행 로직 --- def main(): global total_tasks # ... (인자 파싱 및 설정 부분은 이전과 동일) ... dotenv_path = os.path.join(os.path.dirname(__file__), ".env") load_dotenv(dotenv_path=dotenv_path) BASE_URL = os.getenv("BASE_URL") API_KEY = os.getenv("API_KEY") parser = argparse.ArgumentParser( description="문서 정보 추출 자동화 스크립트 (병렬 처리)" ) parser.add_argument("input_dir", help="입력 디렉터리 경로") parser.add_argument( "-o", "--output_dir", default="results", help="출력 디렉터리 경로" ) parser.add_argument( "--endpoint", choices=["i18n", "d6c"], default="i18n", help="추출 API 엔드포인트 선택", ) parser.add_argument("--model", dest="model_name", help="사용할 LLM 모델 이름") parser.add_argument( "-w", "--workers", type=int, default=4, help="동시에 실행할 워커 프로세스 수" ) args = parser.parse_args() log_queue = Queue() queue_listener, logger = setup_main_logger(log_queue) if not BASE_URL or not API_KEY: logger.error("환경 변수(BASE_URL, API_KEY)가 .env 파일에 설정되지 않았습니다.") queue_listener.stop() return if not os.path.isdir(args.input_dir): logger.error(f"입력 디렉터리를 찾을 수 없습니다 - {args.input_dir}") queue_listener.stop() return os.makedirs(args.output_dir, exist_ok=True) headers = {"X-API-KEY": API_KEY} post_url = f"{BASE_URL}/extract/inner/{args.endpoint}" logger.info(f"API 서버: {BASE_URL}") logger.info(f"요청 API: {post_url}") logger.info(f"입력 디렉터리: {args.input_dir}") logger.info(f"출력 디렉터리: {args.output_dir}") logger.info(f"동시 작업 수 (워커): {args.workers}") files_to_process = [ os.path.join(args.input_dir, f) for f in sorted(os.listdir(args.input_dir)) if os.path.isfile(os.path.join(args.input_dir, f)) ] total_tasks = len(files_to_process) logger.info(f"총 {total_tasks}개의 파일을 처리합니다.") config = { "base_url": BASE_URL, "post_url": post_url, "headers": headers, "model_name": args.model_name, "output_dir": args.output_dir, } results = [] pool = None try: pool = Pool( processes=args.workers, initializer=init_worker, initargs=(log_queue,) ) async_results = [] # <--- 핵심 변경 5: for 루프에서 enumerate를 사용하여 순번(i)을 함께 전달 --- for i, file_path in enumerate(files_to_process): # 작업 인자에 (순번, 전체 개수, 파일 경로, 설정)을 담음 task_args = (i + 1, total_tasks, file_path, config) res = pool.apply_async( process_file_worker, args=task_args, callback=update_progress ) async_results.append(res) pool.close() pool.join() results = [res.get() for res in async_results] except KeyboardInterrupt: logger.warning("사용자에 의해 작업이 중단되었습니다.") if pool: pool.terminate() pool.join() except Exception: logging.getLogger(__name__).exception( "메인 프로세스에서 예기치 않은 오류가 발생했습니다." ) if pool: pool.terminate() pool.join() # <--- 핵심 변경 6: 최종 결과 요약 시 순번을 제외하고 파일명만 사용 --- successful_files = [filename for index, filename, success in results if success] failed_files = [filename for index, filename, success in results if not success] logger.info("=" * 20 + " 모든 작업 완료 " + "=" * 20) logger.info( f"총 {total_tasks}개 파일 중 {len(successful_files)}개 성공, {len(failed_files)}개 실패." ) if failed_files: logger.warning(f"실패한 파일 목록: {failed_files}") queue_listener.stop() if __name__ == "__main__": main()