diff --git a/workspace/blind_test_app_v2.py b/workspace/blind_test_app_v2.py new file mode 100644 index 0000000..8f14fab --- /dev/null +++ b/workspace/blind_test_app_v2.py @@ -0,0 +1,339 @@ +import base64 +import csv +import datetime +import json +import os +import random +from pathlib import Path + +import pandas as pd +import streamlit as st + +# --- Configuration --- +RESULTS_BASE_DIR = Path("results") +# For Docker compatibility, read the docs path from an environment variable. +# Fallback to a relative path for local execution. +DOCS_DIR = Path(os.getenv("DOCS_DIR", "results/docs")) +# Use a new votes file for this version of the app to avoid conflicts. +VOTES_FILE = Path("results/blind_test_votes_v2.csv") + + +def display_pdf(file_path_or_obj): + """파일 경로 또는 업로드된 파일 객체를 받아 PDF를 표시합니다.""" + try: + if isinstance(file_path_or_obj, Path): + with open(file_path_or_obj, "rb") as f: + bytes_data = f.read() + else: # UploadedFile + file_path_or_obj.seek(0) + bytes_data = file_path_or_obj.read() + + base64_pdf = base64.b64encode(bytes_data).decode("utf-8") + pdf_display = f'' + st.markdown(pdf_display, unsafe_allow_html=True) + except Exception as e: + st.error(f"PDF 파일을 표시하는 중 오류가 발생했습니다: {e}") + + +def get_model_dirs(base_dir: Path): + """Get a list of valid model combination directories.""" + if not base_dir.is_dir(): + return [] + return sorted([d.name for d in base_dir.iterdir() if d.is_dir()]) + + +def get_json_files(model_dir: Path): + """Get a list of JSON files in a specific directory.""" + if not model_dir.is_dir(): + return [] + return sorted([f.name for f in model_dir.glob("*.json")]) + + +def load_all_versions_of_file(base_dir: Path, json_filename: str): + """ + Loads all versions of a specific file from all model directories, + adapted for the new JSON structure where 'filename' and 'model' keys are absent. + """ + all_versions = [] + model_dirs = get_model_dirs(base_dir) + stem = Path(json_filename).stem + + # Find the original document file (pdf, png, etc.) by matching the stem + original_doc_path = next(DOCS_DIR.glob(f"{stem}.*"), None) + original_filename = original_doc_path.name if original_doc_path else None + + if not original_filename: + st.warning( + f"Could not find original document for '{json_filename}' in '{DOCS_DIR}'" + ) + + for model_dir in model_dirs: + file_path = base_dir / model_dir / json_filename + if file_path.exists(): + try: + with open(file_path, "r", encoding="utf-8") as f: + data = json.load(f) + + # Check for keys present in the new JSON structure + if "parsed" in data and "result" in data: + record = { + "model_combination_dir": model_dir, + "filename": original_filename, # Filename of the original doc + "llm_model": data.get("llm_model"), + "parsed": data.get("parsed"), + "result": data.get("result"), + } + all_versions.append(record) + except (json.JSONDecodeError, KeyError) as e: + st.warning(f"Could not read or parse {file_path}: {e}") + + return all_versions + + +def load_voted_files(votes_file: Path, voter_id: str): + """Loads the filenames of items already voted on by a specific user.""" + if not votes_file.exists() or not voter_id: + return set() + try: + df = pd.read_csv(votes_file) + # Filter by the current voter's ID + user_votes = df[df["voter_id"] == voter_id] + return set(user_votes["filename"].unique()) + except (pd.errors.EmptyDataError, KeyError): + # Handle empty file or file without the 'filename' column + return set() + + +def save_vote( + filename, + vote, + comments, + shuffled_items, + current_file_with_marker, + all_display_files, + voter_id, +): + """Saves the user's vote (without ocr_model) and sets the index for the next file.""" + if not vote: + st.warning("Please select a result to vote for.") + return + + chosen_label = vote.split(" ")[1] + chosen_index = ord(chosen_label) - 65 + winner = shuffled_items[chosen_index] + + vote_record = { + "timestamp": datetime.datetime.now().isoformat(), + "voter_id": voter_id, + "filename": filename, + "winning_model_combination": winner["model_combination_dir"], + "winning_llm_model": winner["llm_model"], + "comments": comments, + "all_model_combinations_shown": [ + item["model_combination_dir"] for item in shuffled_items + ], + } + + VOTES_FILE.parent.mkdir(parents=True, exist_ok=True) + file_exists = VOTES_FILE.exists() + + # Define fieldnames to ensure consistent column order, without winning_ocr_model + fieldnames = [ + "timestamp", + "voter_id", + "filename", + "winning_model_combination", + "winning_llm_model", + "comments", + "all_model_combinations_shown", + ] + + with open(VOTES_FILE, "a", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + if not file_exists or os.path.getsize(VOTES_FILE) == 0: + writer.writeheader() + writer.writerow(vote_record) + + st.toast(f"✅ Vote for '{filename}' saved! Moving to the next file.", icon="🎉") + + current_index = all_display_files.index(current_file_with_marker) + next_index = (current_index + 1) % len(all_display_files) + st.session_state["next_file_index"] = next_index + # No st.rerun() needed here, it's automatic after callback + + +def main(): + """Main function to run the Streamlit app.""" + st.set_page_config(layout="wide", page_title="Blind Test Evaluator V2") + st.title("🕵️‍♂️ OCR & LLM Blind Test Evaluator (V2)") + + # --- User Authentication --- + st.sidebar.header("👤 Voter Identification") + if "voter_id" not in st.session_state: + voter_id_input = st.sidebar.text_input( + "Enter your Voter ID (e.g., user_a):", key="voter_id_input" + ) + if st.sidebar.button("Start Voting"): + if voter_id_input: + st.session_state["voter_id"] = voter_id_input + st.rerun() + else: + st.sidebar.warning("Please enter a Voter ID.") + return # Stop the rest of the app from running until ID is entered + + voter_id = st.session_state["voter_id"] + st.sidebar.success(f"Logged in as: **{voter_id}**") + if st.sidebar.button("Logout"): + del st.session_state["voter_id"] + st.rerun() + + st.markdown("Compare results from different models and share the comparison view.") + + query_params = st.query_params + initial_dir = query_params.get("dir") + initial_file = query_params.get("file") + + st.sidebar.header("📂 Navigation") + model_dirs = get_model_dirs(RESULTS_BASE_DIR) + if not model_dirs: + st.error(f"No result directories found in '{RESULTS_BASE_DIR}'.") + return + + try: + dir_index = model_dirs.index(initial_dir) if initial_dir in model_dirs else 0 + except ValueError: + dir_index = 0 + + selected_dir = st.sidebar.selectbox( + "Select a Model Directory:", model_dirs, index=dir_index, key="dir_selector" + ) + + json_files = get_json_files(RESULTS_BASE_DIR / selected_dir) + if not json_files: + st.sidebar.warning("No JSON files in this directory.") + return + + voted_files = load_voted_files(VOTES_FILE, voter_id) + display_files = [f"✅ {f}" if f in voted_files else f for f in json_files] + + if "next_file_index" in st.session_state: + file_index = st.session_state.pop("next_file_index") + else: + try: + initial_display_file = None + if initial_file and initial_file in json_files: + initial_display_file = ( + f"✅ {initial_file}" + if initial_file in voted_files + else initial_file + ) + file_index = ( + display_files.index(initial_display_file) + if initial_display_file in display_files + else 0 + ) + except (ValueError, TypeError): + file_index = 0 + + selected_file_with_marker = st.sidebar.selectbox( + "Choose a file to evaluate:", + display_files, + index=file_index, + key="file_selector", + ) + selected_json_file = selected_file_with_marker.lstrip("✅ ") + + st.query_params["dir"] = selected_dir + st.query_params["file"] = selected_json_file + + st.header(f"🔍 Evaluating: `{selected_json_file}`") + + file_results = load_all_versions_of_file(RESULTS_BASE_DIR, selected_json_file) + if not file_results: + st.warning( + "Could not find any valid versions of this file across the model directories." + ) + return + + original_doc_filename = file_results[0].get("filename") + if original_doc_filename: + original_doc_path = DOCS_DIR / original_doc_filename + with st.expander("📄 View Original Document", expanded=True): + if original_doc_path.exists(): + suffix = original_doc_path.suffix.lower() + if suffix in [".png", ".jpg", ".jpeg", ".bmp"]: + st.image(str(original_doc_path)) + elif suffix == ".pdf": + display_pdf(original_doc_path) + else: + st.warning(f"Unsupported file type for preview: '{suffix}'") + else: + st.error(f"Original document not found at: {original_doc_path}") + + st.divider() + st.markdown("All available versions of this file are shown below in random order.") + + if ( + "shuffled_order" not in st.session_state + or st.session_state.get("current_file") != selected_json_file + ): + st.session_state.current_file = selected_json_file + random.shuffle(file_results) + st.session_state.shuffled_order = file_results + + shuffled_results = st.session_state.shuffled_order + num_results = len(shuffled_results) + cols = st.columns(num_results) + vote_options = [] + + for i, (col, result_item) in enumerate(zip(cols, shuffled_results)): + label = f"Result {chr(65 + i)}" + vote_options.append(label) + with col: + st.subheader(label) + st.markdown("**Source:** `?`") + with st.expander("👁️ View Parsed Text (from OCR)"): + st.text_area( + "Parsed Content", + result_item["parsed"], + height=250, + key=f"parsed_{i}", + ) + st.write("**Extracted Data (from LLM):**") + st.json(result_item["result"], expanded=True) + + st.divider() + st.header("🗳️ Cast Your Vote") + st.markdown("After reviewing all results, select the one you find most accurate.") + + col1, col2 = st.columns([1, 2]) + with col1: + vote = st.radio( + "Which result is the best?", + options=vote_options, + key=f"vote_{selected_json_file}", + horizontal=True, + ) + with col2: + comments = st.text_input( + "Comments (optional)", key=f"comments_{selected_json_file}" + ) + + st.button( + "💾 Submit Evaluation & Next", + on_click=save_vote, + args=( + selected_json_file, + vote, + comments, + shuffled_results, + selected_file_with_marker, + display_files, + voter_id, + ), + type="primary", + ) + + +if __name__ == "__main__": + main()