This commit is contained in:
2025-08-01 09:33:01 +09:00
commit 486c2ceeb8
7 changed files with 442 additions and 0 deletions

134
workspace/app.py Normal file
View File

@@ -0,0 +1,134 @@
# app.py (프로젝트별 디렉터리 지원 버전)
import streamlit as st
import json
import os
import base64
from pathlib import Path
# --- 설정 ---
DOCS_DIR = Path("/data/documents")
JSON_DIR = Path("/data/jsons")
# --- 헬퍼 함수 ---
def scan_project_directories(docs_base_dir, json_base_dir):
"""
두 베이스 디렉터리를 스캔하여, 공통된 서브디렉터리(프로젝트)를 찾고
그 안의 파일 쌍을 매핑한 딕셔너리를 반환합니다.
"""
projects_data = {}
if not docs_base_dir.is_dir():
return projects_data
# 문서 디렉터리 기준으로 서브디렉터리(프로젝트)를 찾음
for project_path in docs_base_dir.iterdir():
if project_path.is_dir():
project_name = project_path.name
json_project_path = json_base_dir / project_name
# JSON 디렉터리에도 해당 프로젝트 폴더가 있는지 확인
if json_project_path.is_dir():
# 프로젝트 내에서 파일 쌍 매칭
doc_files = {f.stem: f for f in project_path.iterdir() if f.is_file()}
json_files = {f.stem: f for f in json_project_path.iterdir() if f.is_file() and f.suffix == '.json'}
matching_pairs = {}
for base_name, doc_path in doc_files.items():
if base_name in json_files:
matching_pairs[base_name] = {
"doc_path": doc_path,
"json_path": json_files[base_name]
}
if matching_pairs:
projects_data[project_name] = matching_pairs
return projects_data
def display_pdf(file_path):
"""PDF 파일을 웹 페이지에 임베드하여 표시합니다."""
with open(file_path, "rb") as f:
base64_pdf = base64.b64encode(f.read()).decode('utf-8')
pdf_display = f'<iframe src="data:application/pdf;base64,{base64_pdf}" width="100%" height="800" type="application/pdf"></iframe>'
st.markdown(pdf_display, unsafe_allow_html=True)
# --- 메인 UI 로직 ---
def main():
st.set_page_config(layout="wide", page_title="결과 비교 도구")
st.title("🗂️ 프로젝트별 결과 비교 도구")
st.markdown("---")
if not DOCS_DIR.is_dir() or not JSON_DIR.is_dir():
st.error(f"오류: 데이터 루트 디렉터리(`{DOCS_DIR}` 또는 `{JSON_DIR}`)를 찾을 수 없습니다.")
return
try:
projects_data = scan_project_directories(DOCS_DIR, JSON_DIR)
except Exception as e:
st.error(f"프로젝트 목록을 읽는 중 오류가 발생했습니다: {e}")
return
if not projects_data:
st.warning("비교할 프로젝트가 없습니다. 각 데이터 디렉터리 안에 동일한 이름의 하위 폴더가 있는지 확인하세요.")
return
# --- 1. 프로젝트 선택 ---
st.sidebar.header("파일 선택")
project_names = sorted(list(projects_data.keys()))
selected_project = st.sidebar.selectbox(
"1. 프로젝트를 선택하세요.",
project_names
)
if selected_project:
files_in_project = projects_data[selected_project]
# --- 2. 파일 선택 ---
sorted_basenames = sorted(list(files_in_project.keys()))
display_options = [f"{i}. {name}" for i, name in enumerate(sorted_basenames, 1)]
selected_option = st.sidebar.selectbox(
f"2. '{selected_project}' 프로젝트의 파일을 선택하세요.",
display_options
)
if selected_option:
original_basename = selected_option.split('. ', 1)[1]
st.header(f"🔎 비교 결과: `{selected_project} / {original_basename}`")
selected_pair = files_in_project[original_basename]
doc_path = selected_pair["doc_path"]
json_path = selected_pair["json_path"]
# --- 결과 표시 (이전과 동일) ---
col1, col2 = st.columns(2)
with col1:
st.subheader("원본 문서")
try:
if doc_path.suffix.lower() == ".pdf":
display_pdf(doc_path)
elif doc_path.suffix.lower() in ['.png', '.jpg', '.jpeg']:
st.image(str(doc_path), caption=f"원본 이미지: {doc_path.name}", use_container_width=True)
else:
st.warning(f"지원하지 않는 문서 형식입니다: {doc_path.name}")
except Exception as e:
st.error(f"문서 파일을 표시하는 중 오류가 발생했습니다: {e}")
with col2:
st.subheader("추출된 데이터 (JSON)")
try:
with open(json_path, 'r', encoding='utf-8') as f:
data = json.load(f)
if isinstance(data, list) and len(data) > 0:
result_item = data[0]
else:
result_item = data
if 'fields' in result_item:
del result_item['fields']
st.json(result_item)
except Exception as e:
st.error(f"JSON 파일을 읽거나 처리하는 중 오류가 발생했습니다: {e}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,168 @@
import requests
import time
import json
import os
import argparse
import sys
from urllib.parse import urljoin
import logging
# --- 설정 ---
BASE_URL = "http://172.16.10.176:8888"
API_KEY = 'sk-e03e060ea4ee8edf2e057fbff3e68c28'
RETRY_COUNT_ON_404 = 3
RETRY_DELAY_ON_404 = 5
# --- 로거 설정 ---
# 전역 로거 객체 생성
logger = logging.getLogger(__name__)
def setup_logger():
"""로거를 설정하여 콘솔과 파일에 모두 출력하도록 합니다."""
logger.setLevel(logging.INFO) # 로거의 최소 레벨 설정
# 로그 포맷 지정
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
# 콘솔 핸들러 설정
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# 파일 핸들러 설정 (예: 'script_run.log' 파일에 저장)
file_handler = logging.FileHandler('script_run.log', encoding='utf-8')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# --- API 요청 함수 ---
def start_extraction(post_url, file_path, filename, headers, model_name=None):
"""POST /extract/inner: 문서 추출 시작"""
try:
with open(file_path, 'rb') as input_f:
files_to_upload = {'input_file': (filename, input_f)}
data_payload = {}
if model_name:
data_payload['model'] = model_name
response = requests.post(post_url, files=files_to_upload, data=data_payload, headers=headers)
response.raise_for_status()
return response.json()
except Exception:
# logger.exception은 오류의 상세 정보(traceback)까지 기록해줍니다.
logger.exception(f"[{filename}] POST 요청 중 오류 발생")
return None
def check_progress(progress_path, filename, headers):
"""GET /extract/progress/{request_id}: 진행 상태 확인 (로깅 적용)"""
get_url = urljoin(BASE_URL + '/', progress_path.lstrip('/'))
retries_left = RETRY_COUNT_ON_404
last_status = ""
while True:
try:
response = requests.get(get_url, headers=headers, timeout=30)
if response.status_code == 404:
if retries_left > 0:
logger.warning(f"[{filename}] 작업을 찾을 수 없어(404) {RETRY_DELAY_ON_404}초 후 재시도합니다... ({retries_left}회 남음)")
retries_left -= 1
time.sleep(RETRY_DELAY_ON_404)
continue
else:
logger.error(f"[{filename}] 재시도 횟수 초과 후에도 작업을 찾을 수 없습니다 (404).")
return None
response.raise_for_status()
data = response.json()
if "final_result" in data and data.get("final_result") is not None:
logger.info(f"[{filename}] 처리 완료.")
return data["final_result"]
if "progress_logs" in data and data["progress_logs"]:
status_message = data["progress_logs"][-1].get("status", "상태 확인 중...")
if status_message != last_status:
last_status = status_message
logger.info(f"[{filename}] 진행 상태: {last_status}")
time.sleep(2)
except requests.exceptions.ReadTimeout:
logger.warning(f"[{filename}] 상태 확인 타임아웃. 재시도...")
time.sleep(2)
except Exception:
logger.exception(f"[{filename}] 상태 확인 중 예측하지 못한 오류 발생")
return None
# --- 메인 실행 로직 ---
def main():
# 로거를 가장 먼저 설정합니다.
setup_logger()
parser = argparse.ArgumentParser(description="문서 정보 추출 자동화 스크립트 (로깅 적용)")
parser.add_argument("input_dir", help="입력 디렉터리 경로")
parser.add_argument("-o", "--output_dir", default="results", help="출력 디렉터리 경로")
parser.add_argument("--endpoint", choices=['i18n', 'd6c'], default='i18n', help="추출 API 엔드포인트 선택 (i18n 또는 d6c)")
parser.add_argument("--model", dest="model_name", help="사용할 LLM 모델 이름")
args = parser.parse_args()
if not os.path.isdir(args.input_dir):
logger.error(f"입력 디렉터리를 찾을 수 없습니다 - {args.input_dir}")
return
os.makedirs(args.output_dir, exist_ok=True)
headers = {'X-API-KEY': API_KEY}
post_url = f"{BASE_URL}/extract/inner/{args.endpoint}"
logger.info("="*20 + " 스크립트 시작 " + "="*20)
logger.info(f"API 서버: {BASE_URL}")
logger.info(f"요청 API: {post_url}")
logger.info(f"입력 디렉터리: {args.input_dir}")
logger.info(f"출력 디렉터리: {args.output_dir}")
for filename in sorted(os.listdir(args.input_dir)):
file_path = os.path.join(args.input_dir, filename)
if not os.path.isfile(file_path):
continue
logger.info(f"--- 처리 시작: {filename} ---")
initial_response = start_extraction(post_url, file_path, filename, headers, args.model_name)
if not initial_response:
logger.error(f"[{filename}] 파일 처리 실패 (추출 시작 단계)")
continue
request_id = initial_response.get("request_id")
status_check_url = initial_response.get("status_check_url")
if not request_id or not status_check_url:
logger.error(f"[{filename}] 초기 응답이 잘못되었습니다: {initial_response}")
continue
logger.info(f"[{filename}] 작업 요청 성공. Request ID: {request_id}")
final_result = check_progress(status_check_url, filename, headers)
if final_result:
output_path = os.path.join(args.output_dir, f"{os.path.splitext(filename)[0]}.json")
try:
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(final_result, f, indent=2, ensure_ascii=False)
logger.info(f"[{filename}] 결과 저장 완료: {output_path}")
except IOError:
logger.exception(f"[{filename}] 파일 저장 중 오류 발생")
else:
logger.error(f"[{filename}] 파일 처리 실패 (결과 확인 단계)")
logger.info("="*20 + " 모든 작업 완료 " + "="*20)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
# KeyboardInterrupt는 main 밖에서 처리해야 할 수 있으므로 로거를 직접 호출
logging.getLogger(__name__).warning("사용자에 의해 작업이 중단되었습니다.")