새로운엔드포인트 추가
This commit is contained in:
335
workspace/run_ocr1.py
Normal file
335
workspace/run_ocr1.py
Normal file
@@ -0,0 +1,335 @@
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import logging.handlers
|
||||
import os
|
||||
import random
|
||||
import time
|
||||
from multiprocessing import Pool, Queue, current_process
|
||||
from urllib.parse import urljoin
|
||||
|
||||
import requests
|
||||
from dotenv import load_dotenv
|
||||
|
||||
# --- 전역 변수 및 로깅 시스템 ---
|
||||
|
||||
worker_log_queue = None
|
||||
completed_tasks_count = 0
|
||||
total_tasks = 0
|
||||
|
||||
|
||||
def setup_main_logger(log_queue):
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.INFO)
|
||||
formatter = logging.Formatter(
|
||||
"%(asctime)s - %(processName)s - %(levelname)s - %(message)s"
|
||||
)
|
||||
console_handler = logging.StreamHandler()
|
||||
console_handler.setFormatter(formatter)
|
||||
timestamp = time.strftime("%Y%m%d_%H%M%S")
|
||||
log_filename = f"{timestamp}_robust_run.log"
|
||||
workspace_dir = os.path.dirname(os.path.abspath(__file__)) or "."
|
||||
log_filepath = os.path.join(workspace_dir, log_filename)
|
||||
file_handler = logging.FileHandler(log_filepath, encoding="utf-8")
|
||||
file_handler.setFormatter(formatter)
|
||||
queue_listener = logging.handlers.QueueListener(
|
||||
log_queue, console_handler, file_handler
|
||||
)
|
||||
queue_listener.start()
|
||||
|
||||
bootstrap_logger = logging.getLogger("bootstrap")
|
||||
bootstrap_logger.setLevel(logging.INFO)
|
||||
bootstrap_logger.addHandler(console_handler)
|
||||
bootstrap_logger.addHandler(file_handler)
|
||||
bootstrap_logger.propagate = False
|
||||
|
||||
bootstrap_logger.info(f"메인 로거 설정 완료. 로그 파일: '{log_filepath}'")
|
||||
bootstrap_logger.info("=" * 20 + " 스크립트 시작 " + "=" * 20)
|
||||
return queue_listener, bootstrap_logger
|
||||
|
||||
|
||||
def init_worker(log_queue):
|
||||
global worker_log_queue
|
||||
worker_log_queue = log_queue
|
||||
|
||||
|
||||
def get_worker_logger():
|
||||
global worker_log_queue
|
||||
worker_logger = logging.getLogger(current_process().name)
|
||||
if not worker_logger.handlers:
|
||||
worker_logger.setLevel(logging.INFO)
|
||||
queue_handler = logging.handlers.QueueHandler(worker_log_queue)
|
||||
worker_logger.addHandler(queue_handler)
|
||||
return worker_logger
|
||||
|
||||
|
||||
# --- API 요청 함수 (변경 없음) ---
|
||||
def start_extraction(post_url, file_path, filename, headers, model_name=None):
|
||||
worker_logger = get_worker_logger()
|
||||
worker_logger.info(f"[{filename}] 파일 업로드 및 추출 요청 시작...")
|
||||
# ... (내부 로직은 이전과 동일)
|
||||
start_time = time.time()
|
||||
try:
|
||||
with open(file_path, "rb") as input_f:
|
||||
files_to_upload = {"input_file": (filename, input_f)}
|
||||
data_payload = {}
|
||||
if model_name:
|
||||
data_payload["model"] = model_name
|
||||
response = requests.post(
|
||||
post_url,
|
||||
files=files_to_upload,
|
||||
data=data_payload,
|
||||
headers=headers,
|
||||
timeout=300,
|
||||
)
|
||||
end_time = time.time()
|
||||
worker_logger.info(
|
||||
f"[{filename}] 추출 요청 완료. 소요 시간: {end_time - start_time:.2f}초, 상태 코드: {response.status_code}"
|
||||
)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
except requests.exceptions.RequestException as e:
|
||||
end_time = time.time()
|
||||
worker_logger.exception(
|
||||
f"[{filename}] 추출 요청 중 치명적 오류 발생. 소요 시간: {end_time - start_time:.2f}초. 오류: {e}"
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
def check_progress(base_url, progress_path, filename, headers):
|
||||
worker_logger = get_worker_logger()
|
||||
get_url = urljoin(base_url + "/", progress_path.lstrip("/"))
|
||||
# ... (내부 로직은 이전과 동일)
|
||||
base_delay = 5
|
||||
max_delay = 60
|
||||
attempt = 0
|
||||
last_status = ""
|
||||
while True:
|
||||
delay = min(max_delay, base_delay * (2**attempt))
|
||||
jitter = delay * 0.2
|
||||
sleep_duration = delay + random.uniform(-jitter, jitter)
|
||||
worker_logger.info(
|
||||
f"[{filename}] {sleep_duration:.2f}초 후 상태 확인 (시도 {attempt + 1})..."
|
||||
)
|
||||
time.sleep(sleep_duration)
|
||||
start_time = time.time()
|
||||
try:
|
||||
response = requests.get(get_url, headers=headers, timeout=60)
|
||||
end_time = time.time()
|
||||
if response.status_code != 200:
|
||||
worker_logger.warning(
|
||||
f"[{filename}] 상태 확인 응답 코드: {response.status_code}. 소요 시간: {end_time - start_time:.2f}초"
|
||||
)
|
||||
if response.status_code == 404:
|
||||
if attempt < 5:
|
||||
worker_logger.warning(
|
||||
f"[{filename}] 작업을 찾을 수 없음(404). 재시도합니다."
|
||||
)
|
||||
attempt += 1
|
||||
continue
|
||||
else:
|
||||
worker_logger.error(
|
||||
f"[{filename}] 재시도 횟수 초과 후에도 작업을 찾을 수 없음(404). 처리 중단."
|
||||
)
|
||||
return None
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
if "final_result" in data and data.get("final_result") is not None:
|
||||
worker_logger.info(f"[{filename}] 처리 완료.")
|
||||
return data["final_result"]
|
||||
if "progress_logs" in data and data["progress_logs"]:
|
||||
status_message = data["progress_logs"][-1].get(
|
||||
"status", "상태 확인 중..."
|
||||
)
|
||||
if status_message != last_status:
|
||||
last_status = status_message
|
||||
worker_logger.info(f"[{filename}] 진행 상태: {last_status}")
|
||||
attempt += 1
|
||||
except requests.exceptions.ReadTimeout:
|
||||
end_time = time.time()
|
||||
worker_logger.warning(
|
||||
f"[{filename}] 상태 확인 타임아웃. 소요 시간: {end_time - start_time:.2f}초. 재시도..."
|
||||
)
|
||||
attempt += 1
|
||||
except requests.exceptions.RequestException as e:
|
||||
end_time = time.time()
|
||||
worker_logger.exception(
|
||||
f"[{filename}] 상태 확인 중 치명적 오류 발생. 소요 시간: {end_time - start_time:.2f}초. 오류: {e}"
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
# --- 워커 함수 및 콜백 함수 ---
|
||||
|
||||
|
||||
# <--- 핵심 변경 1: 워커 함수가 순번(index)과 전체 개수(total)를 인자로 받음 ---
|
||||
def process_file_worker(index, total, file_path, config):
|
||||
"""단일 파일을 처리하는 워커 함수. 이제 순번과 전체 개수를 인자로 받습니다."""
|
||||
worker_logger = get_worker_logger()
|
||||
filename = os.path.basename(file_path)
|
||||
|
||||
# <--- 핵심 변경 2: 로그 출력 시 (순번/전체) 형식 사용 ---
|
||||
worker_logger.info(f"--- ({index}/{total}) 처리 시작: {filename} ---")
|
||||
|
||||
initial_response = start_extraction(
|
||||
config["post_url"], file_path, filename, config["headers"], config["model_name"]
|
||||
)
|
||||
if not initial_response:
|
||||
worker_logger.error(f"[{filename}] 파일 처리 실패 (추출 시작 단계)")
|
||||
return index, filename, False # <--- 핵심 변경 3: 결과 반환 시 순번도 함께 전달
|
||||
|
||||
request_id = initial_response.get("request_id")
|
||||
status_check_url = initial_response.get("status_check_url")
|
||||
|
||||
if not request_id or not status_check_url:
|
||||
worker_logger.error(
|
||||
f"[{filename}] 초기 응답이 잘못되었습니다: {initial_response}"
|
||||
)
|
||||
return index, filename, False
|
||||
|
||||
worker_logger.info(f"[{filename}] 작업 요청 성공. Request ID: {request_id}")
|
||||
|
||||
final_result = check_progress(
|
||||
config["base_url"], status_check_url, filename, config["headers"]
|
||||
)
|
||||
|
||||
if final_result:
|
||||
output_path = os.path.join(
|
||||
config["output_dir"], f"{os.path.splitext(filename)[0]}.json"
|
||||
)
|
||||
try:
|
||||
with open(output_path, "w", encoding="utf-8") as f:
|
||||
json.dump(final_result, f, indent=2, ensure_ascii=False)
|
||||
worker_logger.info(f"[{filename}] 결과 저장 완료: {output_path}")
|
||||
return index, filename, True
|
||||
except IOError:
|
||||
worker_logger.exception(f"[{filename}] 파일 저장 중 오류 발생")
|
||||
return index, filename, False
|
||||
else:
|
||||
worker_logger.error(f"[{filename}] 파일 처리 실패 (결과 확인 단계)")
|
||||
return index, filename, False
|
||||
|
||||
|
||||
def update_progress(result):
|
||||
"""진행 상황을 업데이트하고 출력하는 콜백 함수."""
|
||||
global completed_tasks_count, total_tasks
|
||||
completed_tasks_count += 1
|
||||
|
||||
# <--- 핵심 변경 4: 콜백 함수가 순번을 함께 받아서 로그에 표시 ---
|
||||
index, filename, success = result
|
||||
status = "성공" if success else "실패"
|
||||
logging.getLogger("bootstrap").info(
|
||||
f"--- ({completed_tasks_count}/{total_tasks}) 완료: [{index}번째 파일] {filename} (상태: {status}) ---"
|
||||
)
|
||||
|
||||
|
||||
# --- 메인 실행 로직 ---
|
||||
def main():
|
||||
global total_tasks
|
||||
# ... (인자 파싱 및 설정 부분은 이전과 동일) ...
|
||||
dotenv_path = os.path.join(os.path.dirname(__file__), ".env")
|
||||
load_dotenv(dotenv_path=dotenv_path)
|
||||
BASE_URL = os.getenv("BASE_URL")
|
||||
API_KEY = os.getenv("API_KEY")
|
||||
parser = argparse.ArgumentParser(
|
||||
description="문서 정보 추출 자동화 스크립트 (병렬 처리)"
|
||||
)
|
||||
parser.add_argument("input_dir", help="입력 디렉터리 경로")
|
||||
parser.add_argument(
|
||||
"-o", "--output_dir", default="results", help="출력 디렉터리 경로"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--endpoint",
|
||||
choices=["i18n", "d6c"],
|
||||
default="i18n",
|
||||
help="추출 API 엔드포인트 선택",
|
||||
)
|
||||
parser.add_argument("--model", dest="model_name", help="사용할 LLM 모델 이름")
|
||||
parser.add_argument(
|
||||
"-w", "--workers", type=int, default=4, help="동시에 실행할 워커 프로세스 수"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
log_queue = Queue()
|
||||
queue_listener, logger = setup_main_logger(log_queue)
|
||||
if not BASE_URL or not API_KEY:
|
||||
logger.error("환경 변수(BASE_URL, API_KEY)가 .env 파일에 설정되지 않았습니다.")
|
||||
queue_listener.stop()
|
||||
return
|
||||
if not os.path.isdir(args.input_dir):
|
||||
logger.error(f"입력 디렉터리를 찾을 수 없습니다 - {args.input_dir}")
|
||||
queue_listener.stop()
|
||||
return
|
||||
os.makedirs(args.output_dir, exist_ok=True)
|
||||
headers = {"X-API-KEY": API_KEY}
|
||||
post_url = f"{BASE_URL}/extract/inner/{args.endpoint}"
|
||||
logger.info(f"API 서버: {BASE_URL}")
|
||||
logger.info(f"요청 API: {post_url}")
|
||||
logger.info(f"입력 디렉터리: {args.input_dir}")
|
||||
logger.info(f"출력 디렉터리: {args.output_dir}")
|
||||
logger.info(f"동시 작업 수 (워커): {args.workers}")
|
||||
files_to_process = [
|
||||
os.path.join(args.input_dir, f)
|
||||
for f in sorted(os.listdir(args.input_dir))
|
||||
if os.path.isfile(os.path.join(args.input_dir, f))
|
||||
]
|
||||
total_tasks = len(files_to_process)
|
||||
logger.info(f"총 {total_tasks}개의 파일을 처리합니다.")
|
||||
config = {
|
||||
"base_url": BASE_URL,
|
||||
"post_url": post_url,
|
||||
"headers": headers,
|
||||
"model_name": args.model_name,
|
||||
"output_dir": args.output_dir,
|
||||
}
|
||||
|
||||
results = []
|
||||
pool = None
|
||||
try:
|
||||
pool = Pool(
|
||||
processes=args.workers, initializer=init_worker, initargs=(log_queue,)
|
||||
)
|
||||
async_results = []
|
||||
|
||||
# <--- 핵심 변경 5: for 루프에서 enumerate를 사용하여 순번(i)을 함께 전달 ---
|
||||
for i, file_path in enumerate(files_to_process):
|
||||
# 작업 인자에 (순번, 전체 개수, 파일 경로, 설정)을 담음
|
||||
task_args = (i + 1, total_tasks, file_path, config)
|
||||
res = pool.apply_async(
|
||||
process_file_worker, args=task_args, callback=update_progress
|
||||
)
|
||||
async_results.append(res)
|
||||
|
||||
pool.close()
|
||||
pool.join()
|
||||
|
||||
results = [res.get() for res in async_results]
|
||||
|
||||
except KeyboardInterrupt:
|
||||
logger.warning("사용자에 의해 작업이 중단되었습니다.")
|
||||
if pool:
|
||||
pool.terminate()
|
||||
pool.join()
|
||||
except Exception:
|
||||
logging.getLogger(__name__).exception(
|
||||
"메인 프로세스에서 예기치 않은 오류가 발생했습니다."
|
||||
)
|
||||
if pool:
|
||||
pool.terminate()
|
||||
pool.join()
|
||||
|
||||
# <--- 핵심 변경 6: 최종 결과 요약 시 순번을 제외하고 파일명만 사용 ---
|
||||
successful_files = [filename for index, filename, success in results if success]
|
||||
failed_files = [filename for index, filename, success in results if not success]
|
||||
|
||||
logger.info("=" * 20 + " 모든 작업 완료 " + "=" * 20)
|
||||
logger.info(
|
||||
f"총 {total_tasks}개 파일 중 {len(successful_files)}개 성공, {len(failed_files)}개 실패."
|
||||
)
|
||||
if failed_files:
|
||||
logger.warning(f"실패한 파일 목록: {failed_files}")
|
||||
|
||||
queue_listener.stop()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user