Files
ocr_macro/workspace/app.py
2025-09-03 10:29:02 +09:00

456 lines
16 KiB
Python

# app.py (UI 최종 개선)
import base64
import io
import json
import shutil
import time
import uuid
import zipfile
from pathlib import Path
import streamlit as st
# --- 상수 ---
SESSION_BASE_PATH = Path(__file__).parent / "shared_sessions"
EDIT_KEY = "parsed" # 수정할 키를 'parsed'로 고정
# --- 상태 관리 함수 (세션 기반) ---
def get_session_path(seed):
return SESSION_BASE_PATH / seed
def get_session_completed_log_path(seed):
return get_session_path(seed) / "completed_files.json"
def load_completed_files(seed):
log_path = get_session_completed_log_path(seed)
if not log_path.exists():
return set()
try:
with open(log_path, "r", encoding="utf-8") as f:
return set(json.load(f))
except (json.JSONDecodeError, FileNotFoundError):
return set()
def save_completed_file(seed, basename):
completed_set = load_completed_files(seed)
completed_set.add(basename)
log_path = get_session_completed_log_path(seed)
with open(log_path, "w", encoding="utf-8") as f:
json.dump(list(completed_set), f, indent=2)
def get_existing_sessions():
""" "shared_sessions" 디렉토리에서 기존 세션 목록을 가져옵니다. """
if not SESSION_BASE_PATH.exists():
return []
return sorted([d.name for d in SESSION_BASE_PATH.iterdir() if d.is_dir()])
# --- 헬퍼 함수 ---
def save_files_to_session(seed, doc_files, json_paddle_files, json_upstage_files):
session_path = get_session_path(seed)
doc_path = session_path / "docs"
json_paddle_path = session_path / "jsons_paddle_ocr"
json_upstage_path = session_path / "jsons_upstage"
if session_path.exists():
shutil.rmtree(session_path)
for p in [doc_path, json_paddle_path, json_upstage_path]:
p.mkdir(parents=True, exist_ok=True)
for f in doc_files:
(doc_path / f.name).write_bytes(f.getbuffer())
for f in json_paddle_files:
(json_paddle_path / f.name).write_bytes(f.getbuffer())
for f in json_upstage_files:
(json_upstage_path / f.name).write_bytes(f.getbuffer())
def load_files_from_session(seed):
session_path = get_session_path(seed)
doc_path = session_path / "docs"
json_paddle_path = session_path / "jsons_paddle_ocr"
json_upstage_path = session_path / "jsons_upstage"
if not all([p.is_dir() for p in [doc_path, json_paddle_path, json_upstage_path]]):
return None, None, None
return (
sorted(doc_path.iterdir()),
sorted(json_paddle_path.iterdir()),
sorted(json_upstage_path.iterdir()),
)
def match_files_3_way(doc_files, json_paddle_files, json_upstage_files):
matched = {}
docs_map = {f.stem: f for f in doc_files}
jsons_paddle_map = {f.stem: f for f in json_paddle_files}
jsons_upstage_map = {f.stem: f for f in json_upstage_files}
for stem, doc_file in docs_map.items():
if stem in jsons_paddle_map and stem in jsons_upstage_map:
matched[stem] = {
"doc_file": doc_file,
"paddle_ocr_file": jsons_paddle_map[stem],
"upstage_file": jsons_upstage_map[stem],
}
return matched
def display_pdf(file_path):
bytes_data = file_path.read_bytes()
base64_pdf = base64.b64encode(bytes_data).decode("utf-8")
st.markdown(
f'<iframe src="data:application/pdf;base64,{base64_pdf}" width="100%" height="800" type="application/pdf"></iframe>',
unsafe_allow_html=True,
)
def display_readonly_json(file_path, title):
st.subheader(title)
try:
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
is_list = isinstance(data, list)
result_to_display = data[0] if is_list and data else data
st.json(result_to_display)
except Exception as e:
st.error(f"JSON 표시 중 오류: {e}")
def create_gt_editor(seed, basename, json_file_path, model_name, is_re_edit=False):
editor_title = (
"정답셋 재수정" if is_re_edit else f"정답셋 편집 (기반: {model_name})"
)
st.subheader(editor_title)
try:
with open(json_file_path, "r", encoding="utf-8") as f:
data = json.load(f)
is_list = isinstance(data, list)
result_to_display = data[0] if is_list and data else data
if not isinstance(result_to_display, dict):
st.warning("수정할 수 있는 JSON 객체(dict)가 아닙니다.")
return
if EDIT_KEY not in result_to_display:
st.error(f"편집 대상 키인 '{EDIT_KEY}'를 JSON 파일에서 찾을 수 없습니다.")
return
current_value = result_to_display.get(EDIT_KEY, "")
st.markdown(f"**`{EDIT_KEY}`** 키의 내용을 수정합니다.")
new_value_str = st.text_area(
"값 수정:",
value=str(current_value),
height=800,
key=f"gt_value_input_{model_name}",
)
if st.button("✅ 정답셋으로 저장", key=f"gt_save_button_{model_name}"):
modified_data = result_to_display.copy()
modified_data[EDIT_KEY] = new_value_str
final_data_to_save = [modified_data] if is_list else modified_data
gt_path = get_session_path(seed) / "groundtruth"
gt_path.mkdir(parents=True, exist_ok=True)
save_path = gt_path / f"{basename}.json"
with open(save_path, "w", encoding="utf-8") as f:
json.dump(final_data_to_save, f, indent=2, ensure_ascii=False)
if not is_re_edit:
save_completed_file(seed, basename)
if "re_edit_gt" in st.session_state:
del st.session_state["re_edit_gt"]
st.success(f"'{save_path.name}' 파일이 정답셋으로 저장되었습니다!")
st.info("상태가 업데이트되었습니다. 페이지를 새로고침합니다.")
time.sleep(1)
st.rerun()
except Exception as e:
st.error(f"JSON 편집기 생성 중 오류: {e}")
# --- 콜백 함수 ---
def handle_nav_button(direction, total_files):
if "re_edit_gt" in st.session_state:
del st.session_state["re_edit_gt"]
if direction == "prev" and st.session_state.current_index > 0:
st.session_state.current_index -= 1
elif direction == "next" and st.session_state.current_index < total_files - 1:
st.session_state.current_index += 1
def handle_selectbox_change():
if "re_edit_gt" in st.session_state:
del st.session_state["re_edit_gt"]
st.session_state.current_index = (
int(st.session_state.selectbox_key.split(". ", 1)[0]) - 1
)
# --- 메인 UI 로직 ---
def main():
st.set_page_config(layout="wide", page_title="정답셋 생성 도구")
if "current_index" not in st.session_state:
st.session_state.current_index = 0
SESSION_BASE_PATH.mkdir(parents=True, exist_ok=True)
st.sidebar.info("화면을 넓게 보려면 오른쪽 위 화살표를 누르세요 <<")
st.sidebar.markdown("---")
st.sidebar.header("세션 선택")
existing_sessions = get_existing_sessions()
session_options = ["새 세션 생성"] + existing_sessions
current_seed_from_url = st.query_params.get("seed")
# URL에 시드가 없으면 "새 세션 생성"을 기본값으로
if not current_seed_from_url:
current_selection = "새 세션 생성"
# URL의 시드가 존재하지 않는 세션이면 경고 후 "새 세션 생성"으로
elif current_seed_from_url not in existing_sessions:
st.sidebar.warning(f"URL의 시드 '{current_seed_from_url}'에 해당하는 세션을 찾을 수 없습니다.")
current_selection = "새 세션 생성"
# 잘못된 시드는 URL에서 제거
if "seed" in st.query_params:
del st.query_params["seed"]
else:
current_selection = current_seed_from_url
selected_session = st.sidebar.selectbox(
"작업할 세션을 선택하세요.",
session_options,
index=session_options.index(current_selection),
key="session_selector",
)
# 사용자가 선택을 변경하면 URL을 업데이트하고 앱을 다시 실행
if selected_session != current_selection:
if selected_session == "새 세션 생성":
if "seed" in st.query_params:
del st.query_params["seed"]
else:
st.query_params["seed"] = selected_session
st.rerun()
# --- 이후 로직은 URL의 'seed' 쿼리 파라미터를 기반으로 동작 ---
url_seed = st.query_params.get("seed")
matched_files = None
completed_files = set()
if url_seed:
completed_files = load_completed_files(url_seed)
files = load_files_from_session(url_seed)
if files[0] is not None:
st.success(f"'{url_seed}' 세션에서 파일을 불러왔습니다.")
matched_files = match_files_3_way(*files)
else:
# 이 경우는 위에서 처리되었지만, 안전장치로 남겨둠
st.error(f"'{url_seed}'에 해당하는 세션을 찾을 수 없습니다.")
if "seed" in st.query_params:
del st.query_params["seed"]
st.rerun()
# 파일 업로드 UI는 새 세션 생성 시에만 표시
if not url_seed:
st.sidebar.header("새 세션 생성 (파일 업로드)")
docs = st.sidebar.file_uploader(
"1. 원본 문서", accept_multiple_files=True, type=["png", "jpg", "pdf"]
)
jsons_paddle = st.sidebar.file_uploader(
"2. paddle_ocr JSON", accept_multiple_files=True, type=["json"]
)
jsons_upstage = st.sidebar.file_uploader(
"3. upstage JSON", accept_multiple_files=True, type=["json"]
)
if all([docs, jsons_paddle, jsons_upstage]) and st.sidebar.button(
"업로드 및 세션 생성"
):
new_seed = str(uuid.uuid4())[:8]
save_files_to_session(new_seed, docs, jsons_paddle, jsons_upstage)
st.query_params["seed"] = new_seed
st.rerun()
if url_seed and matched_files is not None:
st.sidebar.header("세션 공유")
st.sidebar.info("URL을 복사하여 다른 사람과 세션을 공유하세요.")
st.sidebar.text_input("공유 시드", url_seed, disabled=True)
if not url_seed:
st.info("새로운 세션을 생성하려면 사이드바에서 모든 종류의 파일을 업로드하세요.")
return
if not matched_files:
st.warning(
"파일 이름(확장자 제외)이 동일한 '문서-paddle_ocr-upstage' 세트를 찾을 수 없습니다."
)
return
st.sidebar.header("파일 탐색")
sorted_basenames = sorted(list(matched_files.keys()))
display_options = [
f"{i+1}. {name} {'' if name in completed_files else ''}"
for i, name in enumerate(sorted_basenames)
]
st.sidebar.selectbox(
"파일 선택:",
display_options,
index=st.session_state.current_index,
key="selectbox_key",
on_change=handle_selectbox_change,
)
st.sidebar.header("보기 옵션")
hide_reference = st.sidebar.checkbox("참고용 영역 숨기기", key="hide_reference")
st.sidebar.header("내보내기")
gt_dir = get_session_path(url_seed) / "groundtruth"
if gt_dir.exists() and any(gt_dir.iterdir()):
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, "a", zipfile.ZIP_DEFLATED, False) as zip_file:
for file_path in gt_dir.glob("*.json"):
zip_file.writestr(file_path.name, file_path.read_bytes())
st.sidebar.download_button(
label="정답셋 다운로드 (.zip)",
data=zip_buffer.getvalue(),
file_name=f"groundtruth_{url_seed}.zip",
mime="application/zip",
)
else:
st.sidebar.write("다운로드할 정답셋이 없습니다.")
# --- 메인 화면 ---
current_basename = (
display_options[st.session_state.current_index]
.split(" ", 1)[1]
.replace("", "")
.strip()
)
pair = matched_files[current_basename]
doc_file, paddle_file, upstage_file = (
pair["doc_file"],
pair["paddle_ocr_file"],
pair["upstage_file"],
)
top_container = st.container()
is_completed = current_basename in completed_files
is_re_editing = st.session_state.get("re_edit_gt") == current_basename
with top_container:
nav_cols = st.columns([1, 5, 1])
nav_cols[0].button(
"◀ 이전",
on_click=handle_nav_button,
args=("prev", len(sorted_basenames)),
use_container_width=True,
)
nav_cols[1].markdown(
f"<h4 style='text-align: center; white-space: nowrap; overflow: hidden; text-overflow: ellipsis;'>{current_basename} ({st.session_state.current_index + 1}/{len(sorted_basenames)})</h4>",
unsafe_allow_html=True,
)
nav_cols[2].button(
"다음 ▶",
on_click=handle_nav_button,
args=("next", len(sorted_basenames)),
use_container_width=True,
)
st.markdown("---")
if is_completed and not is_re_editing:
msg_col, btn_col = st.columns([3, 1])
msg_col.success("✅ 이 파일은 정답셋 생성이 완료되었습니다.")
if btn_col.button("🔄 정답셋 다시 수정하기"):
st.session_state.re_edit_gt = current_basename
st.rerun()
elif not is_completed:
_, radio_col = st.columns([3, 1])
source_model_name = radio_col.radio(
"정답셋 편집 기반 모델 선택:",
("paddle_ocr", "upstage"),
horizontal=True,
label_visibility="collapsed",
)
if hide_reference:
col1, col3 = st.columns([1, 1])
else:
col1, col2, col3 = st.columns([2, 1, 1])
with col1:
st.subheader(f"원본 문서: `{doc_file.name}`")
if doc_file.suffix.lower() == ".pdf":
display_pdf(doc_file)
else:
st.image(str(doc_file), use_container_width=True)
if is_re_editing:
gt_file_path = (
get_session_path(url_seed) / "groundtruth" / f"{current_basename}.json"
)
if gt_file_path.exists():
with col3:
create_gt_editor(
url_seed,
current_basename,
gt_file_path,
"Ground Truth",
is_re_edit=True,
)
else:
st.error("저장된 정답셋 파일을 찾을 수 없습니다. 새로 생성해주세요.")
if "re_edit_gt" in st.session_state:
del st.session_state.re_edit_gt
elif is_completed:
with col3:
gt_file_path = (
get_session_path(url_seed) / "groundtruth" / f"{current_basename}.json"
)
if gt_file_path.exists():
display_readonly_json(gt_file_path, "저장된 정답셋")
else:
st.warning("저장된 정답셋 파일을 찾을 수 없습니다.")
else:
if source_model_name == "paddle_ocr":
with col3:
create_gt_editor(url_seed, current_basename, paddle_file, "paddle_ocr")
else:
with col3:
create_gt_editor(url_seed, current_basename, upstage_file, "upstage")
if not hide_reference:
with col2:
if is_re_editing:
st.empty()
elif is_completed:
ref_model_name = st.radio(
"참고용 모델 선택:",
("paddle_ocr", "upstage"),
horizontal=True,
key=f"ref_select_{current_basename}",
)
if ref_model_name == "paddle_ocr":
display_readonly_json(paddle_file, "참고용: paddle_ocr")
else:
display_readonly_json(upstage_file, "참고용: upstage")
else:
if source_model_name == "paddle_ocr":
display_readonly_json(paddle_file, "참고용: paddle_ocr")
else:
display_readonly_json(upstage_file, "참고용: upstage")
if __name__ == "__main__":
main()