# File: ocr_macro/workspace/blind_test_app.py
# 2025-10-30 09:38:52 +09:00
#
# 515 lines, 12 KiB, Python

import streamlit as st
import pandas as pd
import json
from pathlib import Path
import random
import csv
import datetime
import urllib.parse
import os
import base64
# --- Configuration ---
# Root folder that is scanned for model-combination result directories.
RESULTS_BASE_DIR = Path("results")

# Location of the original documents. The DOCS_DIR environment variable takes
# precedence (useful inside Docker); otherwise a relative path is used so the
# app also works when launched locally.
DOCS_DIR = Path(os.environ.get("DOCS_DIR", "results/docs"))

# CSV file that accumulates every submitted vote.
VOTES_FILE = RESULTS_BASE_DIR / "blind_test_votes.csv"
def load_voted_files(votes_file: Path, voter_id=None):
    """Return the set of filenames that already have a recorded vote.

    NOTE(review): this file defines ``load_voted_files`` a second time further
    down, and that later definition is the one that ends up bound to the name.
    This version accepts the same optional ``voter_id`` argument so that it is
    call-compatible with the later one instead of silently disagreeing with it.

    Args:
        votes_file: Path to the votes CSV.
        voter_id: When given, only rows whose ``voter_id`` column equals this
            value are considered. When ``None``, rows of every voter count.

    Returns:
        A set with the distinct values of the ``filename`` column. An empty
        set is returned when the file does not exist, is empty, or lacks a
        required column.
    """
    if not votes_file.exists():
        return set()
    try:
        # Read every column as text: IDs such as "42" or "007" must not be
        # converted to numbers, otherwise string comparisons never match.
        df = pd.read_csv(votes_file, dtype=str)
        if voter_id is not None:
            df = df[df["voter_id"] == str(voter_id)]
        # 'filename' is the column written by save_vote (the JSON filename).
        return set(df["filename"].unique())
    except (pd.errors.EmptyDataError, KeyError):
        # Empty file, or a file without the expected columns.
        return set()
def display_pdf(file_path_or_obj):
    """Render a PDF inline from either a file path or an uploaded file object.

    A ``Path`` is read from disk. Anything else is treated as a file-like
    object: it is rewound to the start and read. The bytes are base64-encoded
    into a ``data:`` URI and shown inside an iframe. Any failure is reported
    through ``st.error`` rather than raised.
    """
    try:
        if not isinstance(file_path_or_obj, Path):
            # Not a path: treat it as an uploaded file and rewind before reading.
            file_path_or_obj.seek(0)
            raw = file_path_or_obj.read()
        else:
            raw = file_path_or_obj.read_bytes()

        encoded = base64.b64encode(raw).decode("utf-8")
        iframe_html = (
            f'<iframe src="data:application/pdf;base64,{encoded}" '
            'width="100%" height="800" type="application/pdf"></iframe>'
        )
        st.markdown(iframe_html, unsafe_allow_html=True)
    except Exception as e:
        st.error(f"PDF 파일을 표시하는 중 오류가 발생했습니다: {e}")
def get_model_dirs(base_dir: Path):
    """Return the sorted names of the directories directly under *base_dir*.

    An empty list is returned when *base_dir* is not an existing directory.
    """
    names = []
    if base_dir.is_dir():
        for entry in base_dir.iterdir():
            if entry.is_dir():
                names.append(entry.name)
        names.sort()
    return names
def get_json_files(model_dir: Path):
    """Return the sorted names of the ``*.json`` entries inside *model_dir*.

    An empty list is returned when *model_dir* is not an existing directory.
    """
    names = []
    if model_dir.is_dir():
        names = [entry.name for entry in model_dir.glob("*.json")]
        names.sort()
    return names
def load_all_versions_of_file(base_dir: Path, filename: str):
    """Load every model directory's version of one result file.

    For each directory returned by ``get_model_dirs(base_dir)`` the file
    ``<stem of filename>.json`` is read. Files that are missing, or whose JSON
    content is not an object containing both ``"model"`` and ``"result"``,
    are skipped silently. Files that cannot be read or parsed produce a
    ``st.warning`` and are skipped.

    Args:
        base_dir: Directory containing one sub-directory per model combination.
        filename: Name of the file to look up; only its stem is used, so both
            the original document name and the JSON name work.

    Returns:
        A list of dicts with the keys ``model_combination_dir``, ``filename``,
        ``ocr_model``, ``llm_model``, ``parsed`` and ``result``.
    """
    # We look for the JSON result file, not the original document. The name
    # does not depend on the model directory, so compute it once.
    json_filename = Path(filename).stem + ".json"

    all_versions = []
    for model_dir in get_model_dirs(base_dir):
        file_path = base_dir / model_dir / json_filename
        if not file_path.exists():
            continue

        try:
            with open(file_path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            st.warning(f"Could not read or parse {file_path}: {e}")
            continue

        # Valid JSON is not necessarily an object; guard before using .get().
        if not isinstance(data, dict) or "model" not in data or "result" not in data:
            continue

        model_info = data["model"] if isinstance(data["model"], dict) else {}
        # NOTE(review): ocr_model is read from the nested "model" object while
        # llm_model is read from the top level. The top-level value still wins;
        # the nested lookup is only a fallback -- confirm against the schema.
        llm_model = data.get("llm_model") or model_info.get("llm_model")

        all_versions.append(
            {
                "model_combination_dir": model_dir,
                "filename": data.get("filename"),  # Expected to be the original document's filename
                "ocr_model": model_info.get("ocr_model"),
                "llm_model": llm_model,
                "parsed": data.get("parsed"),
                "result": data.get("result"),
            }
        )
    return all_versions
def load_voted_files(votes_file: Path, voter_id: str):
    """Return the filenames that a specific voter has already voted on.

    Args:
        votes_file: Path to the votes CSV.
        voter_id: ID of the voter whose rows should be considered.

    Returns:
        A set with the distinct ``filename`` values of the rows whose
        ``voter_id`` column equals *voter_id*. An empty set is returned when
        *voter_id* is empty, the file does not exist, the file is empty, or a
        required column is missing.
    """
    if not voter_id or not votes_file.exists():
        return set()
    try:
        # Read every column as text. With pandas' default type inference a
        # column of IDs like "42" or "007" becomes integers, and comparing it
        # with the string voter ID would then never match.
        df = pd.read_csv(votes_file, dtype=str)
        user_votes = df[df["voter_id"] == str(voter_id)]
        return set(user_votes["filename"].unique())
    except (pd.errors.EmptyDataError, KeyError):
        # Empty file, or a file without the 'voter_id' / 'filename' columns.
        return set()
def save_vote(filename, vote, comments, shuffled_items, current_file_with_marker, all_display_files, voter_id):
    """Append one vote to the votes CSV and queue the next file for display.

    Args:
        filename: Name stored in the ``filename`` column of the vote row.
        vote: Selected label of the form ``"Result <letter>"``; a falsy value
            only triggers a warning and nothing is written.
        comments: Free-text comment stored with the vote.
        shuffled_items: Result records in the order they were displayed; the
            letter in *vote* indexes into this list (A -> 0, B -> 1, ...).
        current_file_with_marker: Entry of *all_display_files* currently shown.
        all_display_files: All entries offered in the file selector.
        voter_id: ID of the voter, stored in the ``voter_id`` column.
    """
    if not vote:
        st.warning("Please select a result to vote for.")
        return

    # Map the letter of the chosen label back to a position in the shuffle.
    letter = vote.split(" ")[1]
    winner = shuffled_items[ord(letter) - 65]

    # Column order of the CSV; the row below is built in the same order.
    columns = [
        "timestamp",
        "voter_id",
        "filename",
        "winning_model_combination",
        "winning_ocr_model",
        "winning_llm_model",
        "comments",
        "all_model_combinations_shown",
    ]
    row = {
        "timestamp": datetime.datetime.now().isoformat(),
        "voter_id": voter_id,
        "filename": filename,
        "winning_model_combination": winner["model_combination_dir"],
        "winning_ocr_model": winner["ocr_model"],
        "winning_llm_model": winner["llm_model"],
        "comments": comments,
        "all_model_combinations_shown": [item["model_combination_dir"] for item in shuffled_items],
    }

    VOTES_FILE.parent.mkdir(parents=True, exist_ok=True)
    existed_before = VOTES_FILE.exists()
    with VOTES_FILE.open("a", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=columns)
        # A brand-new or still-empty file needs the header row first.
        if (not existed_before) or VOTES_FILE.stat().st_size == 0:
            writer.writeheader()
        writer.writerow(row)

    st.toast(f"✅ Vote for '{filename}' saved! Moving to the next file.", icon="🎉")

    # Advance to the following entry, wrapping around after the last one.
    # The callback returns normally; no explicit st.rerun() is issued here.
    position = all_display_files.index(current_file_with_marker)
    st.session_state['next_file_index'] = (position + 1) % len(all_display_files)
def main():
    """Run the Streamlit blind-test app.

    Flow of one script run:
      1. Ask for a voter ID in the sidebar and stop until one is stored in
         ``st.session_state``.
      2. Let the user pick a model directory and one of its JSON files; the
         ``dir`` / ``file`` query parameters preselect and mirror the choice.
      3. Show the original document (image or PDF) if it exists in DOCS_DIR.
      4. Show every model's result for the file side by side in a random
         order that is kept stable while the same file stays selected.
      5. Collect a vote and a comment; ``save_vote`` runs as the button
         callback.
    """
    # Prefix added in the file selector to files this voter already voted on.
    voted_marker = "✅ "

    st.set_page_config(layout="wide", page_title="Blind Test Evaluator")
    st.title("🕵️‍♂️ OCR & LLM Blind Test Evaluator")

    # --- User Authentication ---
    st.sidebar.header("👤 Voter Identification")
    if 'voter_id' not in st.session_state:
        voter_id_input = st.sidebar.text_input("Enter your Voter ID (e.g., user_a):", key="voter_id_input")
        if st.sidebar.button("Start Voting"):
            # Strip surrounding whitespace so " user_a" and "user_a" are the
            # same voter and a whitespace-only ID is rejected.
            cleaned_id = (voter_id_input or "").strip()
            if cleaned_id:
                st.session_state['voter_id'] = cleaned_id
                st.rerun()
            else:
                st.sidebar.warning("Please enter a Voter ID.")
        return  # Stop the rest of the app from running until ID is entered

    voter_id = st.session_state['voter_id']
    st.sidebar.success(f"Logged in as: **{voter_id}**")
    if st.sidebar.button("Logout"):
        del st.session_state['voter_id']
        st.rerun()

    st.markdown("Compare results from different models and share the comparison view.")

    query_params = st.query_params
    initial_dir = query_params.get("dir")
    initial_file = query_params.get("file")

    # --- Navigation: model directory ---
    st.sidebar.header("📂 Navigation")
    model_dirs = get_model_dirs(RESULTS_BASE_DIR)
    if not model_dirs:
        st.error(f"No result directories found in '{RESULTS_BASE_DIR}'.")
        return

    dir_index = model_dirs.index(initial_dir) if initial_dir in model_dirs else 0
    selected_dir = st.sidebar.selectbox(
        "Select a Model Directory:",
        model_dirs,
        index=dir_index,
        key="dir_selector"
    )

    # --- Navigation: file inside the directory ---
    json_files = get_json_files(RESULTS_BASE_DIR / selected_dir)
    if not json_files:
        st.sidebar.warning("No JSON files in this directory.")
        return

    voted_files = load_voted_files(VOTES_FILE, voter_id)
    # display_files runs parallel to json_files; voted entries get the marker.
    display_files = [f"{voted_marker}{f}" if f in voted_files else f for f in json_files]
    # Exact mapping back to the real filename (no prefix guessing needed).
    display_to_json = dict(zip(display_files, json_files))

    if 'next_file_index' in st.session_state:
        # Set by save_vote to advance to the following file after a vote.
        file_index = st.session_state.pop('next_file_index')
        if not 0 <= file_index < len(display_files):
            file_index = 0
    else:
        file_index = json_files.index(initial_file) if initial_file in json_files else 0

    selected_file_with_marker = st.sidebar.selectbox(
        "Choose a file to evaluate:",
        display_files,
        index=file_index,
        key="file_selector"
    )
    selected_json_file = display_to_json[selected_file_with_marker]

    # Mirror the selection into the URL so the current view can be shared.
    st.query_params["dir"] = selected_dir
    st.query_params["file"] = selected_json_file

    st.header(f"🔍 Evaluating: `{selected_json_file}`")
    file_results = load_all_versions_of_file(RESULTS_BASE_DIR, selected_json_file)
    if not file_results:
        st.warning("Could not find any valid versions of this file across the model directories.")
        return

    # --- Original document preview ---
    original_doc_filename = file_results[0].get("filename")
    if original_doc_filename:
        original_doc_path = DOCS_DIR / original_doc_filename
        with st.expander("📄 View Original Document", expanded=True):
            if original_doc_path.exists():
                suffix = original_doc_path.suffix.lower()
                if suffix in ['.png', '.jpg', '.jpeg', '.bmp']:
                    st.image(str(original_doc_path))
                elif suffix == '.pdf':
                    display_pdf(original_doc_path)
                else:
                    st.warning(f"Unsupported file type for preview: '{suffix}'")
            else:
                st.error(f"Original document not found at: {original_doc_path}")

    st.divider()
    st.markdown("All available versions of this file are shown below in random order.")

    # Shuffle once per selected file and keep that order across reruns, so the
    # labels A, B, ... keep pointing at the same results while the user reads.
    if "shuffled_order" not in st.session_state or st.session_state.get("current_file") != selected_json_file:
        st.session_state.current_file = selected_json_file
        random.shuffle(file_results)
        st.session_state.shuffled_order = file_results
    shuffled_results = st.session_state.shuffled_order

    # --- Side-by-side results ---
    cols = st.columns(len(shuffled_results))
    vote_options = []
    for i, (col, result_item) in enumerate(zip(cols, shuffled_results)):
        label = f"Result {chr(65 + i)}"
        vote_options.append(label)
        with col:
            st.subheader(label)
            # The source model is deliberately not revealed (blind test).
            st.markdown("**Source:** `?`")
            with st.expander("👁️ View Parsed Text (from OCR)"):
                st.text_area(
                    "Parsed Content",
                    result_item["parsed"],
                    height=250,
                    key=f"parsed_{i}"
                )
            st.write("**Extracted Data (from LLM):**")
            st.json(result_item["result"], expanded=True)

    # --- Voting ---
    st.divider()
    st.header("🗳️ Cast Your Vote")
    st.markdown("After reviewing all results, select the one you find most accurate.")
    col1, col2 = st.columns([1, 2])
    with col1:
        vote = st.radio(
            "Which result is the best?",
            options=vote_options,
            key=f"vote_{selected_json_file}",
            horizontal=True,
        )
    with col2:
        comments = st.text_input(
            "Comments (optional)",
            key=f"comments_{selected_json_file}"
        )
    st.button(
        "💾 Submit Evaluation & Next",
        on_click=save_vote,
        args=(selected_json_file, vote, comments, shuffled_results, selected_file_with_marker, display_files, voter_id),
        type="primary"
    )
# Start the app only when this file is executed as the main script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()