vlm 버전 블라인드 테스트 환경

This commit is contained in:
2025-10-30 16:51:52 +09:00
parent 9e51fd56e9
commit f19055f813

View File

@@ -0,0 +1,339 @@
import base64
import csv
import datetime
import json
import os
import random
from pathlib import Path
import pandas as pd
import streamlit as st
# --- Configuration ---
RESULTS_BASE_DIR = Path("results")
# For Docker compatibility, read the docs path from an environment variable.
# Fallback to a relative path for local execution.
DOCS_DIR = Path(os.getenv("DOCS_DIR", "results/docs"))
# Use a new votes file for this version of the app to avoid conflicts.
VOTES_FILE = Path("results/blind_test_votes_v2.csv")
def display_pdf(file_path_or_obj):
"""파일 경로 또는 업로드된 파일 객체를 받아 PDF를 표시합니다."""
try:
if isinstance(file_path_or_obj, Path):
with open(file_path_or_obj, "rb") as f:
bytes_data = f.read()
else: # UploadedFile
file_path_or_obj.seek(0)
bytes_data = file_path_or_obj.read()
base64_pdf = base64.b64encode(bytes_data).decode("utf-8")
pdf_display = f'<iframe src="data:application/pdf;base64,{base64_pdf}" width="100%" height="800" type="application/pdf"></iframe>'
st.markdown(pdf_display, unsafe_allow_html=True)
except Exception as e:
st.error(f"PDF 파일을 표시하는 중 오류가 발생했습니다: {e}")
def get_model_dirs(base_dir: Path):
"""Get a list of valid model combination directories."""
if not base_dir.is_dir():
return []
return sorted([d.name for d in base_dir.iterdir() if d.is_dir()])
def get_json_files(model_dir: Path):
"""Get a list of JSON files in a specific directory."""
if not model_dir.is_dir():
return []
return sorted([f.name for f in model_dir.glob("*.json")])
def load_all_versions_of_file(base_dir: Path, json_filename: str):
"""
Loads all versions of a specific file from all model directories,
adapted for the new JSON structure where 'filename' and 'model' keys are absent.
"""
all_versions = []
model_dirs = get_model_dirs(base_dir)
stem = Path(json_filename).stem
# Find the original document file (pdf, png, etc.) by matching the stem
original_doc_path = next(DOCS_DIR.glob(f"{stem}.*"), None)
original_filename = original_doc_path.name if original_doc_path else None
if not original_filename:
st.warning(
f"Could not find original document for '{json_filename}' in '{DOCS_DIR}'"
)
for model_dir in model_dirs:
file_path = base_dir / model_dir / json_filename
if file_path.exists():
try:
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
# Check for keys present in the new JSON structure
if "parsed" in data and "result" in data:
record = {
"model_combination_dir": model_dir,
"filename": original_filename, # Filename of the original doc
"llm_model": data.get("llm_model"),
"parsed": data.get("parsed"),
"result": data.get("result"),
}
all_versions.append(record)
except (json.JSONDecodeError, KeyError) as e:
st.warning(f"Could not read or parse {file_path}: {e}")
return all_versions
def load_voted_files(votes_file: Path, voter_id: str):
"""Loads the filenames of items already voted on by a specific user."""
if not votes_file.exists() or not voter_id:
return set()
try:
df = pd.read_csv(votes_file)
# Filter by the current voter's ID
user_votes = df[df["voter_id"] == voter_id]
return set(user_votes["filename"].unique())
except (pd.errors.EmptyDataError, KeyError):
# Handle empty file or file without the 'filename' column
return set()
def save_vote(
filename,
vote,
comments,
shuffled_items,
current_file_with_marker,
all_display_files,
voter_id,
):
"""Saves the user's vote (without ocr_model) and sets the index for the next file."""
if not vote:
st.warning("Please select a result to vote for.")
return
chosen_label = vote.split(" ")[1]
chosen_index = ord(chosen_label) - 65
winner = shuffled_items[chosen_index]
vote_record = {
"timestamp": datetime.datetime.now().isoformat(),
"voter_id": voter_id,
"filename": filename,
"winning_model_combination": winner["model_combination_dir"],
"winning_llm_model": winner["llm_model"],
"comments": comments,
"all_model_combinations_shown": [
item["model_combination_dir"] for item in shuffled_items
],
}
VOTES_FILE.parent.mkdir(parents=True, exist_ok=True)
file_exists = VOTES_FILE.exists()
# Define fieldnames to ensure consistent column order, without winning_ocr_model
fieldnames = [
"timestamp",
"voter_id",
"filename",
"winning_model_combination",
"winning_llm_model",
"comments",
"all_model_combinations_shown",
]
with open(VOTES_FILE, "a", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
if not file_exists or os.path.getsize(VOTES_FILE) == 0:
writer.writeheader()
writer.writerow(vote_record)
st.toast(f"✅ Vote for '{filename}' saved! Moving to the next file.", icon="🎉")
current_index = all_display_files.index(current_file_with_marker)
next_index = (current_index + 1) % len(all_display_files)
st.session_state["next_file_index"] = next_index
# No st.rerun() needed here, it's automatic after callback
def main():
"""Main function to run the Streamlit app."""
st.set_page_config(layout="wide", page_title="Blind Test Evaluator V2")
st.title("🕵️‍♂️ OCR & LLM Blind Test Evaluator (V2)")
# --- User Authentication ---
st.sidebar.header("👤 Voter Identification")
if "voter_id" not in st.session_state:
voter_id_input = st.sidebar.text_input(
"Enter your Voter ID (e.g., user_a):", key="voter_id_input"
)
if st.sidebar.button("Start Voting"):
if voter_id_input:
st.session_state["voter_id"] = voter_id_input
st.rerun()
else:
st.sidebar.warning("Please enter a Voter ID.")
return # Stop the rest of the app from running until ID is entered
voter_id = st.session_state["voter_id"]
st.sidebar.success(f"Logged in as: **{voter_id}**")
if st.sidebar.button("Logout"):
del st.session_state["voter_id"]
st.rerun()
st.markdown("Compare results from different models and share the comparison view.")
query_params = st.query_params
initial_dir = query_params.get("dir")
initial_file = query_params.get("file")
st.sidebar.header("📂 Navigation")
model_dirs = get_model_dirs(RESULTS_BASE_DIR)
if not model_dirs:
st.error(f"No result directories found in '{RESULTS_BASE_DIR}'.")
return
try:
dir_index = model_dirs.index(initial_dir) if initial_dir in model_dirs else 0
except ValueError:
dir_index = 0
selected_dir = st.sidebar.selectbox(
"Select a Model Directory:", model_dirs, index=dir_index, key="dir_selector"
)
json_files = get_json_files(RESULTS_BASE_DIR / selected_dir)
if not json_files:
st.sidebar.warning("No JSON files in this directory.")
return
voted_files = load_voted_files(VOTES_FILE, voter_id)
display_files = [f"{f}" if f in voted_files else f for f in json_files]
if "next_file_index" in st.session_state:
file_index = st.session_state.pop("next_file_index")
else:
try:
initial_display_file = None
if initial_file and initial_file in json_files:
initial_display_file = (
f"{initial_file}"
if initial_file in voted_files
else initial_file
)
file_index = (
display_files.index(initial_display_file)
if initial_display_file in display_files
else 0
)
except (ValueError, TypeError):
file_index = 0
selected_file_with_marker = st.sidebar.selectbox(
"Choose a file to evaluate:",
display_files,
index=file_index,
key="file_selector",
)
selected_json_file = selected_file_with_marker.lstrip("")
st.query_params["dir"] = selected_dir
st.query_params["file"] = selected_json_file
st.header(f"🔍 Evaluating: `{selected_json_file}`")
file_results = load_all_versions_of_file(RESULTS_BASE_DIR, selected_json_file)
if not file_results:
st.warning(
"Could not find any valid versions of this file across the model directories."
)
return
original_doc_filename = file_results[0].get("filename")
if original_doc_filename:
original_doc_path = DOCS_DIR / original_doc_filename
with st.expander("📄 View Original Document", expanded=True):
if original_doc_path.exists():
suffix = original_doc_path.suffix.lower()
if suffix in [".png", ".jpg", ".jpeg", ".bmp"]:
st.image(str(original_doc_path))
elif suffix == ".pdf":
display_pdf(original_doc_path)
else:
st.warning(f"Unsupported file type for preview: '{suffix}'")
else:
st.error(f"Original document not found at: {original_doc_path}")
st.divider()
st.markdown("All available versions of this file are shown below in random order.")
if (
"shuffled_order" not in st.session_state
or st.session_state.get("current_file") != selected_json_file
):
st.session_state.current_file = selected_json_file
random.shuffle(file_results)
st.session_state.shuffled_order = file_results
shuffled_results = st.session_state.shuffled_order
num_results = len(shuffled_results)
cols = st.columns(num_results)
vote_options = []
for i, (col, result_item) in enumerate(zip(cols, shuffled_results)):
label = f"Result {chr(65 + i)}"
vote_options.append(label)
with col:
st.subheader(label)
st.markdown("**Source:** `?`")
with st.expander("👁️ View Parsed Text (from OCR)"):
st.text_area(
"Parsed Content",
result_item["parsed"],
height=250,
key=f"parsed_{i}",
)
st.write("**Extracted Data (from LLM):**")
st.json(result_item["result"], expanded=True)
st.divider()
st.header("🗳️ Cast Your Vote")
st.markdown("After reviewing all results, select the one you find most accurate.")
col1, col2 = st.columns([1, 2])
with col1:
vote = st.radio(
"Which result is the best?",
options=vote_options,
key=f"vote_{selected_json_file}",
horizontal=True,
)
with col2:
comments = st.text_input(
"Comments (optional)", key=f"comments_{selected_json_file}"
)
st.button(
"💾 Submit Evaluation & Next",
on_click=save_vote,
args=(
selected_json_file,
vote,
comments,
shuffled_results,
selected_file_with_marker,
display_files,
voter_id,
),
type="primary",
)
if __name__ == "__main__":
main()