From 40785a1b305a1c429bacb0de18b103477200f9c1 Mon Sep 17 00:00:00 2001 From: chan Date: Tue, 12 Aug 2025 16:59:14 +0900 Subject: [PATCH] =?UTF-8?q?ui=20=EA=B0=9C=EC=84=A0=20=EB=B0=8F=20=EC=A0=95?= =?UTF-8?q?=EB=8B=B5=EC=85=8B=20=ED=8E=B8=EC=A7=91=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dockerfile | 17 +- workspace/app.py | 466 +++++++++++++++++++++++++++++++++-------------- 2 files changed, 336 insertions(+), 147 deletions(-) diff --git a/dockerfile b/dockerfile index 8105ecd..69f0ac4 100644 --- a/dockerfile +++ b/dockerfile @@ -1,30 +1,17 @@ # Dockerfile -# 1. 베이스 이미지 선택 -# 파이썬 3.10의 가벼운(slim) 버전을 기반으로 시작합니다. FROM python:3.10-slim -# 2. 작업 디렉토리 설정 -# 컨테이너 내에서 명령어를 실행할 기본 폴더를 설정합니다. WORKDIR /workspace -# 3. 의존성 파일 복사 및 설치 -# 먼저 의존성 목록 파일을 복사합니다. (이것만 바뀌었을 경우 빌드 속도 향상) COPY requirements.txt . COPY workspace/ . -# requirements.txt에 명시된 라이브러리들을 설치합니다. -# --no-cache-dir 옵션은 불필요한 캐시를 남기지 않아 이미지 크기를 줄여줍니다. + RUN pip install --no-cache-dir -r requirements.txt COPY workspace/app.py . -# 4. 앱 소스 코드 복사 -# 현재 폴더의 모든 파일을 컨테이너의 /app 폴더로 복사합니다. + COPY . . -# 5. 포트 노출 -# Streamlit의 기본 포트인 8501을 외부에 노출하도록 설정합니다. EXPOSE 8501 -# 6. 컨테이너 실행 명령어 설정 -# 컨테이너가 시작될 때 실행할 명령어를 정의합니다. -# --server.address=0.0.0.0 옵션은 컨테이너 외부에서의 접속을 허용하기 위해 필수입니다. CMD ["streamlit", "run", "app.py", "--server.port=8501", "--server.address=0.0.0.0"] \ No newline at end of file diff --git a/workspace/app.py b/workspace/app.py index 667b948..45edfd6 100644 --- a/workspace/app.py +++ b/workspace/app.py @@ -1,205 +1,407 @@ - -# app.py (시드 기반 서버 사이드 세션 공유 기능) -import streamlit as st -import json -from pathlib import Path +# app.py (UI 최종 개선) import base64 -import uuid +import io +import json import shutil +import time +import uuid +import zipfile +from pathlib import Path + +import streamlit as st # --- 상수 --- -# 스크립트 파일의 위치를 기준으로 경로 설정 SESSION_BASE_PATH = Path(__file__).parent / "shared_sessions" +EDIT_KEY = "parsed" # 수정할 키를 'parsed'로 고정 + +# --- 상태 관리 함수 (세션 기반) --- + + +def get_session_path(seed): + return SESSION_BASE_PATH / seed + + +def get_session_completed_log_path(seed): + return get_session_path(seed) / "completed_files.json" + + +def load_completed_files(seed): + log_path = get_session_completed_log_path(seed) + if not log_path.exists(): + return set() + try: + with open(log_path, "r", encoding="utf-8") as f: + return set(json.load(f)) + except (json.JSONDecodeError, FileNotFoundError): + return set() + + +def save_completed_file(seed, basename): + completed_set = load_completed_files(seed) + completed_set.add(basename) + log_path = get_session_completed_log_path(seed) + with open(log_path, "w", encoding="utf-8") as f: + json.dump(list(completed_set), f, indent=2) + # --- 헬퍼 함수 --- -def get_session_path(seed): - """시드에 해당하는 세션 디렉토리 경로를 반환합니다.""" - return SESSION_BASE_PATH / seed -def save_files_to_session(seed, doc_files, json_files): - """업로드된 파일들을 서버의 세션 디렉토리에 저장합니다.""" +def save_files_to_session(seed, doc_files, json_paddle_files, json_upstage_files): session_path = get_session_path(seed) doc_path = session_path / "docs" - json_path = session_path / "jsons" + json_paddle_path = session_path / "jsons_paddle_ocr" + json_upstage_path = session_path / "jsons_upstage" - # 기존 디렉토리가 있으면 삭제하고 새로 생성 if session_path.exists(): shutil.rmtree(session_path) - doc_path.mkdir(parents=True, exist_ok=True) - json_path.mkdir(parents=True, exist_ok=True) + for p in [doc_path, json_paddle_path, json_upstage_path]: + p.mkdir(parents=True, exist_ok=True) + + for f in doc_files: + (doc_path / f.name).write_bytes(f.getbuffer()) + for f in json_paddle_files: + (json_paddle_path / f.name).write_bytes(f.getbuffer()) + for f in json_upstage_files: + (json_upstage_path / f.name).write_bytes(f.getbuffer()) - for file in doc_files: - with open(doc_path / file.name, "wb") as f: - f.write(file.getbuffer()) - for file in json_files: - with open(json_path / file.name, "wb") as f: - f.write(file.getbuffer()) def load_files_from_session(seed): - """서버의 세션 디렉토리에서 파일 목록을 로드합니다.""" session_path = get_session_path(seed) doc_path = session_path / "docs" - json_path = session_path / "jsons" + json_paddle_path = session_path / "jsons_paddle_ocr" + json_upstage_path = session_path / "jsons_upstage" + if not all([p.is_dir() for p in [doc_path, json_paddle_path, json_upstage_path]]): + return None, None, None + return ( + sorted(doc_path.iterdir()), + sorted(json_paddle_path.iterdir()), + sorted(json_upstage_path.iterdir()), + ) - if not session_path.is_dir(): - return None, None - doc_files = sorted(list(doc_path.iterdir())) - json_files = sorted(list(json_path.iterdir())) - return doc_files, json_files - -def match_disk_files(doc_files, json_files): - """디스크에 저장된 두 파일 목록(Path 객체)을 매칭합니다.""" - matched_pairs = {} +def match_files_3_way(doc_files, json_paddle_files, json_upstage_files): + matched = {} docs_map = {f.stem: f for f in doc_files} - jsons_map = {f.stem: f for f in json_files} - + jsons_paddle_map = {f.stem: f for f in json_paddle_files} + jsons_upstage_map = {f.stem: f for f in json_upstage_files} for stem, doc_file in docs_map.items(): - if stem in jsons_map: - matched_pairs[stem] = { + if stem in jsons_paddle_map and stem in jsons_upstage_map: + matched[stem] = { "doc_file": doc_file, - "json_file": jsons_map[stem] + "paddle_ocr_file": jsons_paddle_map[stem], + "upstage_file": jsons_upstage_map[stem], } - return matched_pairs + return matched -def display_pdf(file_path_or_obj): - """파일 경로 또는 업로드된 파일 객체를 받아 PDF를 표시합니다.""" + +def display_pdf(file_path): + bytes_data = file_path.read_bytes() + base64_pdf = base64.b64encode(bytes_data).decode("utf-8") + st.markdown( + f'', + unsafe_allow_html=True, + ) + + +def display_readonly_json(file_path, title): + st.subheader(title) try: - if isinstance(file_path_or_obj, Path): - with open(file_path_or_obj, "rb") as f: - bytes_data = f.read() - else: # UploadedFile - file_path_or_obj.seek(0) - bytes_data = file_path_or_obj.read() - - base64_pdf = base64.b64encode(bytes_data).decode('utf-8') - pdf_display = f'' - st.markdown(pdf_display, unsafe_allow_html=True) + with open(file_path, "r", encoding="utf-8") as f: + data = json.load(f) + is_list = isinstance(data, list) + result_to_display = data[0] if is_list and data else data + st.json(result_to_display) except Exception as e: - st.error(f"PDF 파일을 표시하는 중 오류가 발생했습니다: {e}") + st.error(f"JSON 표시 중 오류: {e}") + + +def create_gt_editor(seed, basename, json_file_path, model_name, is_re_edit=False): + editor_title = ( + "정답셋 재수정" if is_re_edit else f"정답셋 편집 (기반: {model_name})" + ) + st.subheader(editor_title) + + try: + with open(json_file_path, "r", encoding="utf-8") as f: + data = json.load(f) + is_list = isinstance(data, list) + result_to_display = data[0] if is_list and data else data + + if not isinstance(result_to_display, dict): + st.warning("수정할 수 있는 JSON 객체(dict)가 아닙니다.") + return + if EDIT_KEY not in result_to_display: + st.error(f"편집 대상 키인 '{EDIT_KEY}'를 JSON 파일에서 찾을 수 없습니다.") + return + + current_value = result_to_display.get(EDIT_KEY, "") + st.markdown(f"**`{EDIT_KEY}`** 키의 내용을 수정합니다.") + new_value_str = st.text_area( + "값 수정:", + value=str(current_value), + height=800, + key=f"gt_value_input_{model_name}", + ) + + if st.button("✅ 정답셋으로 저장", key=f"gt_save_button_{model_name}"): + modified_data = result_to_display.copy() + modified_data[EDIT_KEY] = new_value_str + final_data_to_save = [modified_data] if is_list else modified_data + + gt_path = get_session_path(seed) / "groundtruth" + gt_path.mkdir(parents=True, exist_ok=True) + save_path = gt_path / f"{basename}.json" + + with open(save_path, "w", encoding="utf-8") as f: + json.dump(final_data_to_save, f, indent=2, ensure_ascii=False) + + if not is_re_edit: + save_completed_file(seed, basename) + + if "re_edit_gt" in st.session_state: + del st.session_state["re_edit_gt"] + + st.success(f"'{save_path.name}' 파일이 정답셋으로 저장되었습니다!") + st.info("상태가 업데이트되었습니다. 페이지를 새로고침합니다.") + time.sleep(1) + st.rerun() + + except Exception as e: + st.error(f"JSON 편집기 생성 중 오류: {e}") + # --- 콜백 함수 --- def handle_nav_button(direction, total_files): + if "re_edit_gt" in st.session_state: + del st.session_state["re_edit_gt"] if direction == "prev" and st.session_state.current_index > 0: st.session_state.current_index -= 1 elif direction == "next" and st.session_state.current_index < total_files - 1: st.session_state.current_index += 1 + def handle_selectbox_change(): - selected_basename_with_index = st.session_state.selectbox_key - new_index = int(selected_basename_with_index.split('. ', 1)[0]) - 1 - st.session_state.current_index = new_index + if "re_edit_gt" in st.session_state: + del st.session_state["re_edit_gt"] + st.session_state.current_index = ( + int(st.session_state.selectbox_key.split(". ", 1)[0]) - 1 + ) + # --- 메인 UI 로직 --- def main(): - st.set_page_config(layout="wide", page_title="결과 비교 도구") - st.title("📑 결과 비교 및 공유 도구") - st.markdown("---") - - # 세션 상태 초기화 - if 'current_index' not in st.session_state: + st.set_page_config(layout="wide", page_title="정답셋 생성 도구") + if "current_index" not in st.session_state: st.session_state.current_index = 0 - - # 세션 저장 기본 경로 생성 SESSION_BASE_PATH.mkdir(parents=True, exist_ok=True) matched_files = None - doc_files, json_files = None, None - - # URL에서 시드 확인 - query_params = st.query_params - url_seed = query_params.get("seed") + url_seed = st.query_params.get("seed") if url_seed: - doc_files, json_files = load_files_from_session(url_seed) - if doc_files is None: - st.error(f"'{url_seed}'에 해당하는 공유 세션을 찾을 수 없습니다. 시드가 정확한지 확인하거나, 파일을 새로 업로드하세요.") + completed_files = load_completed_files(url_seed) + files = load_files_from_session(url_seed) + if files[0] is not None: + st.success(f"'{url_seed}' 시드에서 파일을 불러왔습니다.") + matched_files = match_files_3_way(*files) else: - st.success(f"'{url_seed}' 시드에서 공유된 파일을 불러왔습니다.") - matched_files = match_disk_files(doc_files, json_files) - - # 시드가 없거나, 시드로 로드 실패 시 파일 업로더 표시 + st.error(f"'{url_seed}'에 해당하는 세션을 찾을 수 없습니다.") + else: + completed_files = set() + + # --- 사이드바 --- + st.sidebar.info("화면을 넓게 보려면 오른쪽 위 화살표를 누르세요 <<") + st.sidebar.markdown("---") + + st.sidebar.header("파일 업로드") if not matched_files: - st.sidebar.header("파일 업로드") - uploaded_docs = st.sidebar.file_uploader( - "1. 원본 문서 파일(들)을 업로드하세요.", - accept_multiple_files=True, - type=['png', 'jpg', 'jpeg', 'pdf'] + docs = st.sidebar.file_uploader( + "1. 원본 문서", accept_multiple_files=True, type=["png", "jpg", "pdf"] ) - uploaded_jsons = st.sidebar.file_uploader( - "2. 결과 JSON 파일(들)을 업로드하세요.", - accept_multiple_files=True, - type=['json'] + jsons_paddle = st.sidebar.file_uploader( + "2. paddle_ocr JSON", accept_multiple_files=True, type=["json"] ) + jsons_upstage = st.sidebar.file_uploader( + "3. upstage JSON", accept_multiple_files=True, type=["json"] + ) + if all([docs, jsons_paddle, jsons_upstage]) and st.sidebar.button( + "업로드 및 세션 생성" + ): + new_seed = str(uuid.uuid4())[:8] + save_files_to_session(new_seed, docs, jsons_paddle, jsons_upstage) + st.query_params["seed"] = new_seed + st.rerun() - if uploaded_docs and uploaded_jsons: - if st.sidebar.button("업로드 및 세션 생성"): - new_seed = str(uuid.uuid4())[:8] - save_files_to_session(new_seed, uploaded_docs, uploaded_jsons) - st.query_params["seed"] = new_seed # URL 업데이트 및 앱 재실행 - st.rerun() - - # 공유 UI - if url_seed and matched_files: + if url_seed and matched_files is not None: st.sidebar.header("세션 공유") - # 현재 페이지의 전체 URL을 가져오는 것은 Streamlit에서 직접 지원하지 않으므로, - # 사용자에게 주소창의 URL을 복사하라고 안내합니다. - st.sidebar.success("세션이 활성화되었습니다!") - st.sidebar.info("다른 사람과 공유하려면 현재 브라우저 주소창의 URL을 복사하여 전달하세요.") + st.sidebar.info("URL을 복사하여 다른 사람과 세션을 공유하세요.") st.sidebar.text_input("공유 시드", url_seed, disabled=True) - # --- 결과 표시 로직 (matched_files가 있을 때만 실행) --- if not matched_files: - st.info("사이드바에서 파일을 업로드하고 '업로드 및 세션 생성' 버튼을 누르거나, 공유받은 URL로 접속하세요.") + st.info("모든 종류의 파일을 업로드하고 세션을 생성하세요.") + if matched_files is not None and not matched_files: + st.warning( + "파일 이름(확장자 제외)이 동일한 '문서-paddle_ocr-upstage' 세트를 찾을 수 없습니다." + ) return st.sidebar.header("파일 탐색") sorted_basenames = sorted(list(matched_files.keys())) - total_files = len(sorted_basenames) - st.session_state.current_index = max(0, min(st.session_state.current_index, total_files - 1)) + display_options = [ + f"{i+1}. {name} {'✅' if name in completed_files else ''}" + for i, name in enumerate(sorted_basenames) + ] - display_options = [f"{i + 1}. {name}" for i, name in enumerate(sorted_basenames)] - st.selectbox( - "파일을 직접 선택하세요:", - options=display_options, + st.sidebar.selectbox( + "파일 선택:", + display_options, index=st.session_state.current_index, - key='selectbox_key', - on_change=handle_selectbox_change + key="selectbox_key", + on_change=handle_selectbox_change, ) - col1, col2, col3 = st.sidebar.columns([1, 2, 1]) - col1.button("◀ 이전", on_click=handle_nav_button, args=("prev", total_files), use_container_width=True) - col2.markdown(f"

{st.session_state.current_index + 1} / {total_files}

", unsafe_allow_html=True) - col3.button("다음 ▶", on_click=handle_nav_button, args=("next", total_files), use_container_width=True) + st.sidebar.header("보기 옵션") + hide_reference = st.sidebar.checkbox("참고용 영역 숨기기", key="hide_reference") - current_basename = sorted_basenames[st.session_state.current_index] - st.header(f"🔎 비교 결과: `{current_basename}`") - - selected_pair = matched_files[current_basename] - doc_file = selected_pair["doc_file"] - json_file = selected_pair["json_file"] + st.sidebar.header("내보내기") + gt_dir = get_session_path(url_seed) / "groundtruth" + if gt_dir.exists() and any(gt_dir.iterdir()): + zip_buffer = io.BytesIO() + with zipfile.ZipFile(zip_buffer, "a", zipfile.ZIP_DEFLATED, False) as zip_file: + for file_path in gt_dir.glob("*.json"): + zip_file.writestr(file_path.name, file_path.read_bytes()) - res_col1, res_col2 = st.columns(2) - with res_col1: + st.sidebar.download_button( + label="정답셋 다운로드 (.zip)", + data=zip_buffer.getvalue(), + file_name=f"groundtruth_{url_seed}.zip", + mime="application/zip", + ) + else: + st.sidebar.write("다운로드할 정답셋이 없습니다.") + + # --- 메인 화면 --- + current_basename = ( + display_options[st.session_state.current_index] + .split(" ", 1)[1] + .replace(" ✅", "") + .strip() + ) + + pair = matched_files[current_basename] + doc_file, paddle_file, upstage_file = ( + pair["doc_file"], + pair["paddle_ocr_file"], + pair["upstage_file"], + ) + + top_container = st.container() + is_completed = current_basename in completed_files + is_re_editing = st.session_state.get("re_edit_gt") == current_basename + + with top_container: + nav_cols = st.columns([1, 5, 1]) + nav_cols[0].button( + "◀ 이전", + on_click=handle_nav_button, + args=("prev", len(sorted_basenames)), + use_container_width=True, + ) + nav_cols[1].markdown( + f"

{current_basename} ({st.session_state.current_index + 1}/{len(sorted_basenames)})

", + unsafe_allow_html=True, + ) + nav_cols[2].button( + "다음 ▶", + on_click=handle_nav_button, + args=("next", len(sorted_basenames)), + use_container_width=True, + ) + st.markdown("---") + + if is_completed and not is_re_editing: + msg_col, btn_col = st.columns([3, 1]) + msg_col.success("✅ 이 파일은 정답셋 생성이 완료되었습니다.") + if btn_col.button("🔄 정답셋 다시 수정하기"): + st.session_state.re_edit_gt = current_basename + st.rerun() + elif not is_completed: + _, radio_col = st.columns([3, 1]) + source_model_name = radio_col.radio( + "정답셋 편집 기반 모델 선택:", + ("paddle_ocr", "upstage"), + horizontal=True, + label_visibility="collapsed", + ) + + if hide_reference: + col1, col3 = st.columns([1, 1]) + else: + col1, col2, col3 = st.columns([2, 1, 1]) + + with col1: st.subheader(f"원본 문서: `{doc_file.name}`") if doc_file.suffix.lower() == ".pdf": display_pdf(doc_file) else: - st.image(str(doc_file), caption=f"원본 이미지: {doc_file.name}", use_container_width=True) + st.image(str(doc_file), use_container_width=True) + + if is_re_editing: + gt_file_path = ( + get_session_path(url_seed) / "groundtruth" / f"{current_basename}.json" + ) + if gt_file_path.exists(): + with col3: + create_gt_editor( + url_seed, + current_basename, + gt_file_path, + "Ground Truth", + is_re_edit=True, + ) + else: + st.error("저장된 정답셋 파일을 찾을 수 없습니다. 새로 생성해주세요.") + if "re_edit_gt" in st.session_state: + del st.session_state.re_edit_gt + elif is_completed: + with col3: + gt_file_path = ( + get_session_path(url_seed) / "groundtruth" / f"{current_basename}.json" + ) + if gt_file_path.exists(): + display_readonly_json(gt_file_path, "저장된 정답셋") + else: + st.warning("저장된 정답셋 파일을 찾을 수 없습니다.") + else: + if source_model_name == "paddle_ocr": + with col3: + create_gt_editor(url_seed, current_basename, paddle_file, "paddle_ocr") + else: + with col3: + create_gt_editor(url_seed, current_basename, upstage_file, "upstage") + + if not hide_reference: + with col2: + if is_re_editing: + st.empty() + elif is_completed: + ref_model_name = st.radio( + "참고용 모델 선택:", + ("paddle_ocr", "upstage"), + horizontal=True, + key=f"ref_select_{current_basename}", + ) + if ref_model_name == "paddle_ocr": + display_readonly_json(paddle_file, "참고용: paddle_ocr") + else: + display_readonly_json(upstage_file, "참고용: upstage") + else: + if source_model_name == "paddle_ocr": + display_readonly_json(paddle_file, "참고용: paddle_ocr") + else: + display_readonly_json(upstage_file, "참고용: upstage") - with res_col2: - st.subheader(f"추출된 데이터: `{json_file.name}`") - try: - with open(json_file, "r", encoding="utf-8") as f: - data = json.load(f) - - result_to_display = data[0] if isinstance(data, list) and data else data - if isinstance(result_to_display, dict) and 'fields' in result_to_display: - del result_to_display['fields'] - st.json(result_to_display) - except Exception as e: - st.error(f"JSON 파일을 읽거나 처리하는 중 오류가 발생했습니다: {e}") if __name__ == "__main__": main()