"""Streamlit app for blind-test voting on OCR + LLM extraction results.

For a chosen JSON result file, every model-combination directory's version is
shown side by side in random order under anonymous labels ("Result A", ...).
The voter picks the best one and the choice is appended to a CSV file.
"""

import base64
import csv
import datetime
import json
import os
import random
from pathlib import Path

import pandas as pd
import streamlit as st

# --- Configuration ---
RESULTS_BASE_DIR = Path("results")
# For Docker compatibility, read the docs path from an environment variable.
# Fallback to a relative path for local execution.
DOCS_DIR = Path(os.getenv("DOCS_DIR", "results/docs"))
# Use a new votes file for this version of the app to avoid conflicts.
VOTES_FILE = Path("results/blind_test_votes_v2.csv")
# Prefix shown in the file selector in front of files the voter already rated.
VOTED_PREFIX = "✅ "


def display_pdf(file_path_or_obj):
    """Render a PDF inline in the Streamlit page.

    Args:
        file_path_or_obj: Either a ``Path`` to a PDF on disk, or a file-like
            object supporting ``seek``/``read`` (e.g. an uploaded file).

    Any failure while reading or rendering is reported through ``st.error``
    rather than raised, so a broken preview never takes the page down.
    """
    try:
        if isinstance(file_path_or_obj, Path):
            with open(file_path_or_obj, "rb") as f:
                bytes_data = f.read()
        else:  # UploadedFile
            # Rewind first: the object may already have been read once.
            file_path_or_obj.seek(0)
            bytes_data = file_path_or_obj.read()

        base64_pdf = base64.b64encode(bytes_data).decode("utf-8")
        # Embed the document as a data URI; the markup must actually reference
        # base64_pdf, otherwise nothing is rendered.
        pdf_display = (
            f'<iframe src="data:application/pdf;base64,{base64_pdf}" '
            'width="100%" height="800" type="application/pdf"></iframe>'
        )
        st.markdown(pdf_display, unsafe_allow_html=True)
    except Exception as e:
        st.error(f"PDF 파일을 표시하는 중 오류가 발생했습니다: {e}")


def get_model_dirs(base_dir: Path) -> list:
    """Return the sorted names of the sub-directories of *base_dir*.

    Returns an empty list when *base_dir* is not an existing directory.
    """
    if not base_dir.is_dir():
        return []
    return sorted(d.name for d in base_dir.iterdir() if d.is_dir())


def get_json_files(model_dir: Path) -> list:
    """Return the sorted names of the ``*.json`` files directly in *model_dir*.

    Returns an empty list when *model_dir* is not an existing directory.
    """
    if not model_dir.is_dir():
        return []
    return sorted(f.name for f in model_dir.glob("*.json"))


def load_all_versions_of_file(base_dir: Path, json_filename: str) -> list:
    """Load *json_filename* from every model directory under *base_dir*.

    Adapted for the new JSON structure where 'filename' and 'model' keys are
    absent: the original document name is recovered by matching the JSON
    file's stem against the files in ``DOCS_DIR``.

    Args:
        base_dir: Directory whose sub-directories each hold one model
            combination's results.
        json_filename: Name of the result file to collect.

    Returns:
        A list of dicts with keys ``model_combination_dir``, ``filename``
        (original document name, or ``None`` if not found), ``llm_model``,
        ``parsed`` and ``result``. Files that are missing, unreadable, not
        valid JSON, or lacking the ``parsed``/``result`` keys are skipped.
    """
    all_versions = []
    model_dirs = get_model_dirs(base_dir)
    stem = Path(json_filename).stem

    # Find the original document file (pdf, png, etc.) by matching the stem.
    # Sorting makes the pick deterministic when several files share the stem.
    candidates = sorted(DOCS_DIR.glob(f"{stem}.*"))
    original_doc_path = candidates[0] if candidates else None
    original_filename = original_doc_path.name if original_doc_path else None

    if not original_filename:
        st.warning(
            f"Could not find original document for '{json_filename}' in '{DOCS_DIR}'"
        )

    for model_dir in model_dirs:
        file_path = base_dir / model_dir / json_filename
        if not file_path.exists():
            continue
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (json.JSONDecodeError, UnicodeDecodeError, OSError, KeyError) as e:
            st.warning(f"Could not read or parse {file_path}: {e}")
            continue

        # Check for keys present in the new JSON structure. A non-object
        # top level (list, string, ...) cannot carry them, so skip it.
        if isinstance(data, dict) and "parsed" in data and "result" in data:
            all_versions.append(
                {
                    "model_combination_dir": model_dir,
                    "filename": original_filename,  # Filename of the original doc
                    "llm_model": data.get("llm_model"),
                    "parsed": data.get("parsed"),
                    "result": data.get("result"),
                }
            )

    return all_versions


def load_voted_files(votes_file: Path, voter_id: str) -> set:
    """Return the set of filenames *voter_id* has already voted on.

    Args:
        votes_file: CSV file written by ``save_vote``.
        voter_id: Identifier of the current voter.

    Returns:
        The distinct values of the ``filename`` column for rows whose
        ``voter_id`` equals *voter_id*; an empty set when the file does not
        exist, is empty, lacks the expected columns, or *voter_id* is falsy.
    """
    if not votes_file.exists() or not voter_id:
        return set()
    try:
        # Read everything as plain strings: with type inference a numeric ID
        # such as "123" becomes an int and IDs like "NA" become NaN, so the
        # comparison against the session's string ID would never match.
        df = pd.read_csv(votes_file, dtype=str, keep_default_na=False)
        # Filter by the current voter's ID
        user_votes = df[df["voter_id"] == str(voter_id)]
        return set(user_votes["filename"].unique())
    except (pd.errors.EmptyDataError, KeyError):
        # Handle empty file or file without the expected columns
        return set()


def save_vote(
    filename,
    vote,
    comments,
    shuffled_items,
    current_file_with_marker,
    all_display_files,
    voter_id,
):
    """Append the voter's choice to ``VOTES_FILE`` and queue the next file.

    Used as the submit button's ``on_click`` callback.

    Args:
        filename: Name of the JSON result file being evaluated.
        vote: Selected radio label of the form ``"Result <letter>"``; a falsy
            value shows a warning and nothing is saved.
        comments: Free-text comment from the voter.
        shuffled_items: Result records in the order they were displayed;
            the letter in *vote* indexes into this list (A -> 0, B -> 1, ...).
        current_file_with_marker: Label of the current file as it appears in
            *all_display_files*.
        all_display_files: All labels offered by the file selector.
        voter_id: Identifier of the current voter.
    """
    if not vote:
        st.warning("Please select a result to vote for.")
        return

    # "Result B" -> "B" -> 1
    chosen_label = vote.split(" ")[1]
    chosen_index = ord(chosen_label) - 65
    winner = shuffled_items[chosen_index]

    vote_record = {
        "timestamp": datetime.datetime.now().isoformat(),
        "voter_id": voter_id,
        "filename": filename,
        "winning_model_combination": winner["model_combination_dir"],
        "winning_llm_model": winner["llm_model"],
        "comments": comments,
        "all_model_combinations_shown": [
            item["model_combination_dir"] for item in shuffled_items
        ],
    }

    VOTES_FILE.parent.mkdir(parents=True, exist_ok=True)
    file_exists = VOTES_FILE.exists()

    # Define fieldnames to ensure consistent column order, without winning_ocr_model
    fieldnames = [
        "timestamp",
        "voter_id",
        "filename",
        "winning_model_combination",
        "winning_llm_model",
        "comments",
        "all_model_combinations_shown",
    ]

    with open(VOTES_FILE, "a", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        # Write the header only for a brand-new or empty file.
        if not file_exists or os.path.getsize(VOTES_FILE) == 0:
            writer.writeheader()
        writer.writerow(vote_record)

    st.toast(f"✅ Vote for '{filename}' saved! Moving to the next file.", icon="🎉")

    # Advance to the following file, wrapping around at the end of the list.
    current_index = all_display_files.index(current_file_with_marker)
    next_index = (current_index + 1) % len(all_display_files)
    st.session_state["next_file_index"] = next_index
    # No st.rerun() needed here, it's automatic after callback


def main():
    """Main function to run the Streamlit app."""
    st.set_page_config(layout="wide", page_title="Blind Test Evaluator V2")
    st.title("🕵️‍♂️ OCR & LLM Blind Test Evaluator (V2)")

    # --- User Authentication ---
    st.sidebar.header("👤 Voter Identification")
    if "voter_id" not in st.session_state:
        voter_id_input = st.sidebar.text_input(
            "Enter your Voter ID (e.g., user_a):", key="voter_id_input"
        )
        if st.sidebar.button("Start Voting"):
            if voter_id_input:
                st.session_state["voter_id"] = voter_id_input
                st.rerun()
            else:
                st.sidebar.warning("Please enter a Voter ID.")
        return  # Stop the rest of the app from running until ID is entered

    voter_id = st.session_state["voter_id"]
    st.sidebar.success(f"Logged in as: **{voter_id}**")
    if st.sidebar.button("Logout"):
        del st.session_state["voter_id"]
        st.rerun()

    st.markdown("Compare results from different models and share the comparison view.")

    # The URL query string can pre-select a directory and a file.
    query_params = st.query_params
    initial_dir = query_params.get("dir")
    initial_file = query_params.get("file")

    st.sidebar.header("📂 Navigation")
    model_dirs = get_model_dirs(RESULTS_BASE_DIR)
    if not model_dirs:
        st.error(f"No result directories found in '{RESULTS_BASE_DIR}'.")
        return

    dir_index = model_dirs.index(initial_dir) if initial_dir in model_dirs else 0
    selected_dir = st.sidebar.selectbox(
        "Select a Model Directory:", model_dirs, index=dir_index, key="dir_selector"
    )

    json_files = get_json_files(RESULTS_BASE_DIR / selected_dir)
    if not json_files:
        st.sidebar.warning("No JSON files in this directory.")
        return

    voted_files = load_voted_files(VOTES_FILE, voter_id)
    # display_files is index-aligned with json_files; voted entries get a marker.
    display_files = [
        f"{VOTED_PREFIX}{f}" if f in voted_files else f for f in json_files
    ]

    if "next_file_index" in st.session_state:
        # Set by save_vote to advance after a submission.
        file_index = st.session_state.pop("next_file_index")
        if not 0 <= file_index < len(display_files):
            file_index = 0
    elif initial_file in json_files:
        file_index = json_files.index(initial_file)
    else:
        file_index = 0

    selected_file_with_marker = st.sidebar.selectbox(
        "Choose a file to evaluate:",
        display_files,
        index=file_index,
        key="file_selector",
    )
    # Map the label back to the real filename. str.lstrip("✅ ") would strip a
    # *character set*, mangling names that start with a space or the marker.
    label_to_file = dict(zip(display_files, json_files))
    selected_json_file = label_to_file[selected_file_with_marker]

    # Keep the URL in sync so the current view can be shared.
    st.query_params["dir"] = selected_dir
    st.query_params["file"] = selected_json_file

    st.header(f"🔍 Evaluating: `{selected_json_file}`")

    file_results = load_all_versions_of_file(RESULTS_BASE_DIR, selected_json_file)
    if not file_results:
        st.warning(
            "Could not find any valid versions of this file across the model directories."
        )
        return

    original_doc_filename = file_results[0].get("filename")
    if original_doc_filename:
        original_doc_path = DOCS_DIR / original_doc_filename
        with st.expander("📄 View Original Document", expanded=True):
            if original_doc_path.exists():
                suffix = original_doc_path.suffix.lower()
                if suffix in [".png", ".jpg", ".jpeg", ".bmp"]:
                    st.image(str(original_doc_path))
                elif suffix == ".pdf":
                    display_pdf(original_doc_path)
                else:
                    st.warning(f"Unsupported file type for preview: '{suffix}'")
            else:
                st.error(f"Original document not found at: {original_doc_path}")

    st.divider()
    st.markdown("All available versions of this file are shown below in random order.")

    # Shuffle once per file and keep the order in the session, so the labels
    # stay attached to the same results across reruns.
    if (
        "shuffled_order" not in st.session_state
        or st.session_state.get("current_file") != selected_json_file
    ):
        st.session_state.current_file = selected_json_file
        random.shuffle(file_results)
        st.session_state.shuffled_order = file_results

    shuffled_results = st.session_state.shuffled_order
    num_results = len(shuffled_results)
    cols = st.columns(num_results)
    vote_options = []

    for i, (col, result_item) in enumerate(zip(cols, shuffled_results)):
        label = f"Result {chr(65 + i)}"  # Result A, Result B, ...
        vote_options.append(label)
        with col:
            st.subheader(label)
            # The source model is deliberately hidden from the voter.
            st.markdown("**Source:** `?`")
            with st.expander("👁️ View Parsed Text (from OCR)"):
                st.text_area(
                    "Parsed Content",
                    result_item["parsed"],
                    height=250,
                    key=f"parsed_{i}",
                )
            st.write("**Extracted Data (from LLM):**")
            st.json(result_item["result"], expanded=True)

    st.divider()
    st.header("🗳️ Cast Your Vote")
    st.markdown("After reviewing all results, select the one you find most accurate.")

    col1, col2 = st.columns([1, 2])
    with col1:
        vote = st.radio(
            "Which result is the best?",
            options=vote_options,
            key=f"vote_{selected_json_file}",
            horizontal=True,
        )
    with col2:
        comments = st.text_input(
            "Comments (optional)", key=f"comments_{selected_json_file}"
        )

    st.button(
        "💾 Submit Evaluation & Next",
        on_click=save_vote,
        args=(
            selected_json_file,
            vote,
            comments,
            shuffled_results,
            selected_file_with_marker,
            display_files,
            voter_id,
        ),
        type="primary",
    )


if __name__ == "__main__":
    main()