# app.py (UI 최종 개선) import base64 import io import json import shutil import time import uuid import zipfile from pathlib import Path import streamlit as st # --- 상수 --- SESSION_BASE_PATH = Path(__file__).parent / "shared_sessions" EDIT_KEY = "parsed" # 수정할 키를 'parsed'로 고정 # --- 상태 관리 함수 (세션 기반) --- def get_session_path(seed): return SESSION_BASE_PATH / seed def get_session_completed_log_path(seed): return get_session_path(seed) / "completed_files.json" def load_completed_files(seed): log_path = get_session_completed_log_path(seed) if not log_path.exists(): return set() try: with open(log_path, "r", encoding="utf-8") as f: return set(json.load(f)) except (json.JSONDecodeError, FileNotFoundError): return set() def save_completed_file(seed, basename): completed_set = load_completed_files(seed) completed_set.add(basename) log_path = get_session_completed_log_path(seed) with open(log_path, "w", encoding="utf-8") as f: json.dump(list(completed_set), f, indent=2) # --- 헬퍼 함수 --- def save_files_to_session(seed, doc_files, json_paddle_files, json_upstage_files): session_path = get_session_path(seed) doc_path = session_path / "docs" json_paddle_path = session_path / "jsons_paddle_ocr" json_upstage_path = session_path / "jsons_upstage" if session_path.exists(): shutil.rmtree(session_path) for p in [doc_path, json_paddle_path, json_upstage_path]: p.mkdir(parents=True, exist_ok=True) for f in doc_files: (doc_path / f.name).write_bytes(f.getbuffer()) for f in json_paddle_files: (json_paddle_path / f.name).write_bytes(f.getbuffer()) for f in json_upstage_files: (json_upstage_path / f.name).write_bytes(f.getbuffer()) def load_files_from_session(seed): session_path = get_session_path(seed) doc_path = session_path / "docs" json_paddle_path = session_path / "jsons_paddle_ocr" json_upstage_path = session_path / "jsons_upstage" if not all([p.is_dir() for p in [doc_path, json_paddle_path, json_upstage_path]]): return None, None, None return ( sorted(doc_path.iterdir()), sorted(json_paddle_path.iterdir()), sorted(json_upstage_path.iterdir()), ) def match_files_3_way(doc_files, json_paddle_files, json_upstage_files): matched = {} docs_map = {f.stem: f for f in doc_files} jsons_paddle_map = {f.stem: f for f in json_paddle_files} jsons_upstage_map = {f.stem: f for f in json_upstage_files} for stem, doc_file in docs_map.items(): if stem in jsons_paddle_map and stem in jsons_upstage_map: matched[stem] = { "doc_file": doc_file, "paddle_ocr_file": jsons_paddle_map[stem], "upstage_file": jsons_upstage_map[stem], } return matched def display_pdf(file_path): bytes_data = file_path.read_bytes() base64_pdf = base64.b64encode(bytes_data).decode("utf-8") st.markdown( f'', unsafe_allow_html=True, ) def display_readonly_json(file_path, title): st.subheader(title) try: with open(file_path, "r", encoding="utf-8") as f: data = json.load(f) is_list = isinstance(data, list) result_to_display = data[0] if is_list and data else data st.json(result_to_display) except Exception as e: st.error(f"JSON 표시 중 오류: {e}") def create_gt_editor(seed, basename, json_file_path, model_name, is_re_edit=False): editor_title = ( "정답셋 재수정" if is_re_edit else f"정답셋 편집 (기반: {model_name})" ) st.subheader(editor_title) try: with open(json_file_path, "r", encoding="utf-8") as f: data = json.load(f) is_list = isinstance(data, list) result_to_display = data[0] if is_list and data else data if not isinstance(result_to_display, dict): st.warning("수정할 수 있는 JSON 객체(dict)가 아닙니다.") return if EDIT_KEY not in result_to_display: st.error(f"편집 대상 키인 '{EDIT_KEY}'를 JSON 파일에서 찾을 수 없습니다.") return current_value = result_to_display.get(EDIT_KEY, "") st.markdown(f"**`{EDIT_KEY}`** 키의 내용을 수정합니다.") new_value_str = st.text_area( "값 수정:", value=str(current_value), height=800, key=f"gt_value_input_{model_name}", ) if st.button("✅ 정답셋으로 저장", key=f"gt_save_button_{model_name}"): modified_data = result_to_display.copy() modified_data[EDIT_KEY] = new_value_str final_data_to_save = [modified_data] if is_list else modified_data gt_path = get_session_path(seed) / "groundtruth" gt_path.mkdir(parents=True, exist_ok=True) save_path = gt_path / f"{basename}.json" with open(save_path, "w", encoding="utf-8") as f: json.dump(final_data_to_save, f, indent=2, ensure_ascii=False) if not is_re_edit: save_completed_file(seed, basename) if "re_edit_gt" in st.session_state: del st.session_state["re_edit_gt"] st.success(f"'{save_path.name}' 파일이 정답셋으로 저장되었습니다!") st.info("상태가 업데이트되었습니다. 페이지를 새로고침합니다.") time.sleep(1) st.rerun() except Exception as e: st.error(f"JSON 편집기 생성 중 오류: {e}") # --- 콜백 함수 --- def handle_nav_button(direction, total_files): if "re_edit_gt" in st.session_state: del st.session_state["re_edit_gt"] if direction == "prev" and st.session_state.current_index > 0: st.session_state.current_index -= 1 elif direction == "next" and st.session_state.current_index < total_files - 1: st.session_state.current_index += 1 def handle_selectbox_change(): if "re_edit_gt" in st.session_state: del st.session_state["re_edit_gt"] st.session_state.current_index = ( int(st.session_state.selectbox_key.split(". ", 1)[0]) - 1 ) # --- 메인 UI 로직 --- def main(): st.set_page_config(layout="wide", page_title="정답셋 생성 도구") if "current_index" not in st.session_state: st.session_state.current_index = 0 SESSION_BASE_PATH.mkdir(parents=True, exist_ok=True) matched_files = None url_seed = st.query_params.get("seed") if url_seed: completed_files = load_completed_files(url_seed) files = load_files_from_session(url_seed) if files[0] is not None: st.success(f"'{url_seed}' 시드에서 파일을 불러왔습니다.") matched_files = match_files_3_way(*files) else: st.error(f"'{url_seed}'에 해당하는 세션을 찾을 수 없습니다.") else: completed_files = set() # --- 사이드바 --- st.sidebar.info("화면을 넓게 보려면 오른쪽 위 화살표를 누르세요 <<") st.sidebar.markdown("---") st.sidebar.header("파일 업로드") if not matched_files: docs = st.sidebar.file_uploader( "1. 원본 문서", accept_multiple_files=True, type=["png", "jpg", "pdf"] ) jsons_paddle = st.sidebar.file_uploader( "2. paddle_ocr JSON", accept_multiple_files=True, type=["json"] ) jsons_upstage = st.sidebar.file_uploader( "3. upstage JSON", accept_multiple_files=True, type=["json"] ) if all([docs, jsons_paddle, jsons_upstage]) and st.sidebar.button( "업로드 및 세션 생성" ): new_seed = str(uuid.uuid4())[:8] save_files_to_session(new_seed, docs, jsons_paddle, jsons_upstage) st.query_params["seed"] = new_seed st.rerun() if url_seed and matched_files is not None: st.sidebar.header("세션 공유") st.sidebar.info("URL을 복사하여 다른 사람과 세션을 공유하세요.") st.sidebar.text_input("공유 시드", url_seed, disabled=True) if not matched_files: st.info("모든 종류의 파일을 업로드하고 세션을 생성하세요.") if matched_files is not None and not matched_files: st.warning( "파일 이름(확장자 제외)이 동일한 '문서-paddle_ocr-upstage' 세트를 찾을 수 없습니다." ) return st.sidebar.header("파일 탐색") sorted_basenames = sorted(list(matched_files.keys())) display_options = [ f"{i+1}. {name} {'✅' if name in completed_files else ''}" for i, name in enumerate(sorted_basenames) ] st.sidebar.selectbox( "파일 선택:", display_options, index=st.session_state.current_index, key="selectbox_key", on_change=handle_selectbox_change, ) st.sidebar.header("보기 옵션") hide_reference = st.sidebar.checkbox("참고용 영역 숨기기", key="hide_reference") st.sidebar.header("내보내기") gt_dir = get_session_path(url_seed) / "groundtruth" if gt_dir.exists() and any(gt_dir.iterdir()): zip_buffer = io.BytesIO() with zipfile.ZipFile(zip_buffer, "a", zipfile.ZIP_DEFLATED, False) as zip_file: for file_path in gt_dir.glob("*.json"): zip_file.writestr(file_path.name, file_path.read_bytes()) st.sidebar.download_button( label="정답셋 다운로드 (.zip)", data=zip_buffer.getvalue(), file_name=f"groundtruth_{url_seed}.zip", mime="application/zip", ) else: st.sidebar.write("다운로드할 정답셋이 없습니다.") # --- 메인 화면 --- current_basename = ( display_options[st.session_state.current_index] .split(" ", 1)[1] .replace(" ✅", "") .strip() ) pair = matched_files[current_basename] doc_file, paddle_file, upstage_file = ( pair["doc_file"], pair["paddle_ocr_file"], pair["upstage_file"], ) top_container = st.container() is_completed = current_basename in completed_files is_re_editing = st.session_state.get("re_edit_gt") == current_basename with top_container: nav_cols = st.columns([1, 5, 1]) nav_cols[0].button( "◀ 이전", on_click=handle_nav_button, args=("prev", len(sorted_basenames)), use_container_width=True, ) nav_cols[1].markdown( f"