Files
railway-client/tools/labeling_server.py
minsung ccba1266b5 프로젝트 분리 이동
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 14:28:27 +09:00

504 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Control Box 라벨링 서버 v2 — 전체 이미지 + bbox 오버레이 방식
사용법: python tools/labeling_server.py \
--json "data/역사이미지/slope/DJI_20260306113838_0004_everything.json"
[--reset] DB 초기화
브라우저: http://localhost:7001
"""
import argparse, json, sqlite3, sys
from collections import defaultdict
from pathlib import Path
import cv2, numpy as np
from fastapi import FastAPI
from fastapi.responses import HTMLResponse, JSONResponse, Response
import uvicorn
ROOT = Path(__file__).parent.parent
DB = ROOT / "labels" / "labeling.db"
MIN_VOTES = 3
TRUE_RATIO = 0.6
MAX_DIM = 3000 # served image max dimension (px)
CONTROL_LABELS = {
"small dark object on ballast", "manhole cover", "trackside enclosure",
"dark rectangle on ground", "small square box", "small metal cabinet",
"small cube on gravel", "compact trackside junction box",
"small square gray metal box beside rail",
"small near-square electrical enclosure on the ground",
}
app = FastAPI()
_img_data: bytes = None
_orig_w = _orig_h = _disp_w = _disp_h = 0
# ── DB ────────────────────────────────────────────────────────────────────────
def get_db():
con = sqlite3.connect(str(DB))
con.row_factory = sqlite3.Row
return con
def init_db(candidates: list):
DB.parent.mkdir(parents=True, exist_ok=True)
con = get_db()
con.executescript("""
CREATE TABLE IF NOT EXISTS candidates (
id INTEGER PRIMARY KEY,
json_idx INTEGER,
label TEXT,
score REAL,
bbox TEXT,
image_path TEXT
);
CREATE TABLE IF NOT EXISTS votes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
candidate_id INTEGER,
user TEXT,
vote INTEGER,
ts DATETIME DEFAULT CURRENT_TIMESTAMP,
UNIQUE(candidate_id, user)
);
""")
if con.execute("SELECT COUNT(*) FROM candidates").fetchone()[0] > 0:
n = con.execute("SELECT COUNT(*) FROM candidates").fetchone()[0]
print(f"DB 기존 데이터 유지. candidates={n}")
con.close()
return
for c in candidates:
con.execute(
"INSERT INTO candidates(json_idx,label,score,bbox,image_path) VALUES(?,?,?,?,?)",
(c["json_idx"], c["label"], c["score"], json.dumps(c["bbox"]), c["image_path"]),
)
con.commit()
print(f"DB 등록: {len(candidates)}")
con.close()
# ── 후보 로드 ─────────────────────────────────────────────────────────────────
def load_candidates(json_path: Path) -> list:
data = json.loads(json_path.read_text(encoding="utf-8"))
stem = json_path.stem.replace("_everything", "")
img_path = next(
(json_path.parent / f"{stem}{ext}" for ext in (".JPG", ".jpg", ".png")
if (json_path.parent / f"{stem}{ext}").exists()),
None,
)
if img_path is None:
raise FileNotFoundError(f"원본 이미지 없음: {json_path.parent / stem}.*")
candidates = []
for idx, seg in enumerate(data.get("segments", [])):
label = seg.get("label", "").strip()
if label not in CONTROL_LABELS:
continue
x0, y0, x1, y1 = [float(v) for v in seg["bbox"]]
if (x1 - x0) < 4 or (y1 - y0) < 4:
continue
candidates.append({
"json_idx": idx,
"label": label,
"score": float(seg.get("score", 0)),
"bbox": [x0, y0, x1, y1],
"image_path": str(img_path),
})
segs_total = len(data.get("segments", []))
print(f"필터링: {segs_total}개 → {len(candidates)}개 control_box 후보")
return candidates
# ── 이미지 준비 (서버 시작 시 1회) ───────────────────────────────────────────
def prepare_image(img_path: Path):
global _img_data, _orig_w, _orig_h, _disp_w, _disp_h
buf = np.fromfile(str(img_path), dtype=np.uint8)
img = cv2.imdecode(buf, cv2.IMREAD_COLOR)
if img is None:
raise ValueError(f"이미지 로드 실패: {img_path}")
_orig_h, _orig_w = img.shape[:2]
scale = min(1.0, MAX_DIM / max(_orig_w, _orig_h))
_disp_w, _disp_h = int(_orig_w * scale), int(_orig_h * scale)
if scale < 1.0:
img = cv2.resize(img, (_disp_w, _disp_h), interpolation=cv2.INTER_LANCZOS4)
_, enc = cv2.imencode(".jpg", img, [cv2.IMWRITE_JPEG_QUALITY, 92])
_img_data = bytes(enc)
print(f"이미지 준비: {_orig_w}×{_orig_h}{_disp_w}×{_disp_h}")
# ── API ───────────────────────────────────────────────────────────────────────
@app.get("/image")
async def serve_image():
return Response(content=_img_data, media_type="image/jpeg")
@app.get("/api/image_info")
async def api_image_info():
return {"orig_w": _orig_w, "orig_h": _orig_h, "disp_w": _disp_w, "disp_h": _disp_h}
@app.get("/api/candidates")
async def api_candidates(user: str = ""):
con = get_db()
rows = con.execute("""
SELECT c.id, c.bbox, c.label, c.score, v.vote
FROM candidates c
LEFT JOIN votes v ON c.id=v.candidate_id AND v.user=?
""", (user,)).fetchall()
con.close()
return {"candidates": [
{
"id": r["id"],
"bbox": json.loads(r["bbox"]),
"label": r["label"],
"score": round(r["score"], 2),
"voted": r["vote"] is not None,
"is_true": r["vote"] == 1 if r["vote"] is not None else None,
} for r in rows
]}
@app.post("/api/vote")
async def api_vote(data: dict):
user = data.get("user", "").strip()
all_ids = data.get("all_ids", [])
true_ids = set(data.get("true_ids", []))
if not user or not all_ids:
return {"ok": False, "error": "파라미터 오류"}
con = get_db()
for cid in all_ids:
con.execute(
"INSERT OR REPLACE INTO votes(candidate_id,user,vote) VALUES(?,?,?)",
(cid, user, 1 if cid in true_ids else 0),
)
con.commit()
con.close()
return {"ok": True}
@app.get("/api/stats")
async def api_stats():
con = get_db()
total = con.execute("SELECT COUNT(*) FROM candidates").fetchone()[0]
voted3 = con.execute(f"""
SELECT COUNT(*) FROM (
SELECT candidate_id FROM votes GROUP BY candidate_id HAVING COUNT(*) >= {MIN_VOTES}
)
""").fetchone()[0]
users = con.execute("SELECT COUNT(DISTINCT user) FROM votes").fetchone()[0]
tvotes = con.execute("SELECT COUNT(*) FROM votes").fetchone()[0]
con.close()
return {"total": total, "voted3plus": voted3, "users": users, "total_votes": tvotes}
@app.post("/api/export")
async def api_export():
con = get_db()
confirmed = con.execute(f"""
SELECT c.id, c.bbox, c.image_path,
SUM(v.vote) AS true_v, COUNT(v.id) AS total_v
FROM candidates c JOIN votes v ON c.id=v.candidate_id
GROUP BY c.id
HAVING total_v >= {MIN_VOTES} AND (CAST(true_v AS REAL)/total_v) >= {TRUE_RATIO}
""").fetchall()
con.close()
by_image = defaultdict(list)
for row in confirmed:
by_image[row["image_path"]].append(row)
out_dir = ROOT / "labels" / "yolo_export"
out_dir.mkdir(parents=True, exist_ok=True)
count = 0
for img_path, rows in by_image.items():
buf = np.fromfile(img_path, dtype=np.uint8)
img = cv2.imdecode(buf, cv2.IMREAD_COLOR)
H, W = img.shape[:2]
txt = out_dir / (Path(img_path).stem + ".txt")
with open(txt, "w") as f:
for row in rows:
x0, y0, x1, y1 = json.loads(row["bbox"])
f.write(f"0 {((x0+x1)/2)/W:.6f} {((y0+y1)/2)/H:.6f} "
f"{(x1-x0)/W:.6f} {(y1-y0)/H:.6f}\n")
count += 1
return {"ok": True, "labels": count, "dir": str(out_dir)}
# ── HTML ──────────────────────────────────────────────────────────────────────
@app.get("/", response_class=HTMLResponse)
async def index():
return HTML
HTML = r"""<!DOCTYPE html>
<html lang="ko">
<head>
<meta charset="UTF-8">
<title>Control Box 라벨링</title>
<style>
*{box-sizing:border-box;margin:0;padding:0}
body{font-family:'Segoe UI',sans-serif;background:#1a1a2e;color:#e0e0e0;display:flex;flex-direction:column;height:100vh;overflow:hidden}
#toolbar{background:#16213e;border-bottom:1px solid #0f3460;padding:8px 14px;display:flex;align-items:center;gap:10px;flex-shrink:0;flex-wrap:wrap}
h1{color:#e94560;font-size:1rem;white-space:nowrap}
input[type=text]{background:#0f3460;border:1px solid #2a4a7a;color:#e0e0e0;padding:5px 10px;border-radius:4px;font-size:.85rem;width:130px}
input[type=text]:focus{outline:none;border-color:#e94560}
.btn{background:#e94560;color:#fff;border:none;padding:6px 12px;border-radius:5px;cursor:pointer;font-size:.82rem;font-weight:700;white-space:nowrap}
.btn:hover{background:#c73652}
.btn-sec{background:#0f3460;border:1px solid #2a4a7a;color:#e0e0e0}
.btn-sec:hover{background:#1a5299}
#stats{font-size:.75rem;color:#777;white-space:nowrap}
#legend{display:flex;gap:10px;align-items:center;font-size:.72rem;color:#888}
.leg{display:inline-flex;align-items:center;gap:3px}
.lb{width:12px;height:12px;border:2px solid;border-radius:2px;flex-shrink:0}
#wrap{flex:1;position:relative;overflow:hidden;cursor:crosshair}
canvas{display:block}
#tip{position:absolute;background:rgba(0,0,0,.85);color:#fff;font-size:.7rem;padding:3px 8px;border-radius:3px;pointer-events:none;display:none;max-width:300px}
#hint{position:absolute;bottom:8px;left:50%;transform:translateX(-50%);font-size:.72rem;color:#444;pointer-events:none;white-space:nowrap}
</style>
</head>
<body>
<div id="toolbar">
<h1>🎯 Control Box</h1>
<input type="text" id="uname" placeholder="이름/사번" autocomplete="off">
<button class="btn" onclick="init()">시작</button>
<button class="btn btn-sec" onclick="fitView()">맞춤(F)</button>
<span id="stats">—</span>
<div id="legend">
<span class="leg"><span class="lb" style="border-color:#ffaa00"></span>미투표</span>
<span class="leg"><span class="lb" style="border-color:#00cc66"></span>컨트롤박스 ✓</span>
<span class="leg"><span class="lb" style="border-color:#cc4444"></span>아님 ✗</span>
<span class="leg"><span class="lb" style="border-color:#444"></span>타인투표완료</span>
</div>
<div style="flex:1"></div>
<button class="btn btn-sec" onclick="doExport()" style="font-size:.75rem">YOLO 내보내기</button>
</div>
<div id="wrap">
<canvas id="cv"></canvas>
<div id="tip"></div>
<div id="hint">휠=줌 | 드래그=이동 | 클릭=YES/NO 토글 | F=맞춤</div>
</div>
<script>
const cv = document.getElementById('cv');
const ctx = cv.getContext('2d');
const wrap = document.getElementById('wrap');
let user = '', cands = [], imgScale = 1;
let origW = 0, origH = 0, dispW = 0, dispH = 0;
let sc = 1, ox = 0, oy = 0;
let isDrag = false, dragX = 0, dragY = 0, moved = false;
const img = new Image();
function resizeCv() {
cv.width = wrap.clientWidth;
cv.height = wrap.clientHeight;
render();
}
window.addEventListener('resize', resizeCv);
resizeCv();
async function init() {
user = document.getElementById('uname').value.trim();
if (!user) { alert('이름 입력 필요'); return; }
const info = await fetch('/api/image_info').then(r => r.json());
origW = info.orig_w; origH = info.orig_h;
dispW = info.disp_w; dispH = info.disp_h;
imgScale = dispW / origW;
img.onload = () => { fitView(); loadCands(); };
img.src = '/image?' + Date.now();
}
async function loadCands() {
const d = await fetch('/api/candidates?user=' + encodeURIComponent(user)).then(r => r.json());
cands = d.candidates.map(c => ({...c, _sel: c.voted ? c.is_true : undefined}));
updateStats();
render();
}
function updateStats() {
const myVotes = cands.filter(c => c._sel !== undefined).length;
const yes = cands.filter(c => c._sel === true).length;
const othersVoted = cands.filter(c => c.voted && c._sel === undefined).length;
document.getElementById('stats').textContent =
`후보 ${cands.length}개 | 내투표 ${myVotes}개 (YES:${yes}) | 타인완료 ${othersVoted}개`;
}
function fitView() {
if (!img.naturalWidth) return;
const ww = wrap.clientWidth, wh = wrap.clientHeight;
sc = Math.min(ww / dispW, wh / dispH) * 0.97;
ox = (ww - dispW * sc) / 2;
oy = (wh - dispH * sc) / 2;
render();
}
function render() {
const W = cv.width, H = cv.height;
ctx.fillStyle = '#0d0d1e';
ctx.fillRect(0, 0, W, H);
if (!img.naturalWidth) return;
ctx.save();
ctx.translate(ox, oy);
ctx.scale(sc, sc);
ctx.drawImage(img, 0, 0, dispW, dispH);
const lw = Math.max(1, 2 / sc);
for (const c of cands) {
const [x0, y0, x1, y1] = c.bbox.map(v => v * imgScale);
const w = x1 - x0, h = y1 - y0;
let color, fill = null;
if (c._sel === true) { color = '#00cc66'; fill = 'rgba(0,204,102,0.15)'; }
else if (c._sel === false) { color = '#cc4444'; fill = 'rgba(204,68,68,0.15)'; }
else if (c.voted) { color = '#444444'; }
else { color = c.score > 0.5 ? '#ffaa00' : '#ff7700aa'; }
ctx.lineWidth = lw;
ctx.strokeStyle = color;
if (fill) {
ctx.fillStyle = fill;
ctx.fillRect(x0, y0, w, h);
}
ctx.strokeRect(x0, y0, w, h);
}
ctx.restore();
}
// ── wheel zoom ────────────────────────────────────────────────────────────────
wrap.addEventListener('wheel', e => {
e.preventDefault();
const rect = wrap.getBoundingClientRect();
const mx = e.clientX - rect.left, my = e.clientY - rect.top;
const f = e.deltaY < 0 ? 1.15 : 1/1.15;
ox = mx - (mx - ox) * f;
oy = my - (my - oy) * f;
sc *= f;
render();
}, {passive: false});
// ── drag pan ──────────────────────────────────────────────────────────────────
wrap.addEventListener('mousedown', e => {
if (e.button !== 0) return;
isDrag = true; moved = false;
dragX = e.clientX; dragY = e.clientY;
wrap.style.cursor = 'grabbing';
});
window.addEventListener('mousemove', e => {
if (isDrag) {
const dx = e.clientX - dragX, dy = e.clientY - dragY;
if (Math.abs(dx) + Math.abs(dy) > 3) moved = true;
if (moved) { ox += dx; oy += dy; dragX = e.clientX; dragY = e.clientY; render(); }
}
// tooltip
const c = hitTest(e);
const tip = document.getElementById('tip');
if (c) {
const rect = wrap.getBoundingClientRect();
tip.style.display = 'block';
tip.style.left = (e.clientX - rect.left + 14) + 'px';
tip.style.top = (e.clientY - rect.top + 4) + 'px';
const status = c._sel === true ? '✅ YES' : c._sel === false ? '❌ NO' : c.voted ? '⬜ 타인완료' : '❓ 미투표';
tip.textContent = `[${c.id}] ${c.label} (${c.score}) — ${status}`;
} else {
tip.style.display = 'none';
}
});
window.addEventListener('mouseup', e => {
if (!isDrag) return;
wrap.style.cursor = 'crosshair';
isDrag = false;
if (!moved) handleClick(e);
});
// ── hit test ──────────────────────────────────────────────────────────────────
function imgCoords(e) {
const rect = wrap.getBoundingClientRect();
return [(e.clientX - rect.left - ox) / sc / imgScale,
(e.clientY - rect.top - oy) / sc / imgScale];
}
function hitTest(e) {
const [ix, iy] = imgCoords(e);
let best = null, bestArea = Infinity;
for (const c of cands) {
const [x0, y0, x1, y1] = c.bbox;
if (ix >= x0 && ix <= x1 && iy >= y0 && iy <= y1) {
const a = (x1-x0)*(y1-y0);
if (a < bestArea) { bestArea = a; best = c; }
}
}
return best;
}
async function handleClick(e) {
if (!user) return;
const c = hitTest(e);
if (!c) return;
// Toggle: undefined→YES→NO→YES...
c._sel = c._sel === true ? false : true;
render();
updateStats();
// Save immediately
await fetch('/api/vote', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({user, all_ids: [c.id], true_ids: c._sel ? [c.id] : []})
});
c.voted = true;
c.is_true = c._sel;
}
// ── keyboard ──────────────────────────────────────────────────────────────────
window.addEventListener('keydown', e => {
if (e.key === 'f' || e.key === 'F') fitView();
});
async function doExport() {
const r = await fetch('/api/export', {method: 'POST'}).then(r => r.json());
alert(`완료: ${r.labels}개 라벨\n경로: ${r.dir}`);
}
</script>
</body>
</html>"""
# ── 메인 ─────────────────────────────────────────────────────────────────────
def main():
ap = argparse.ArgumentParser(description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
ap.add_argument("--json", required=True, help="*_everything.json 경로")
ap.add_argument("--port", type=int, default=7001)
ap.add_argument("--reset", action="store_true", help="DB 초기화 후 재로드")
args = ap.parse_args()
json_path = Path(args.json)
if not json_path.exists():
print(f"JSON 없음: {json_path}"); sys.exit(1)
if args.reset and DB.exists():
DB.unlink()
print("DB 초기화 완료.")
print("후보 로딩 중...")
candidates = load_candidates(json_path)
init_db(candidates)
# Prepare image
stem = json_path.stem.replace("_everything", "")
img_path = next(
(json_path.parent / f"{stem}{ext}" for ext in (".JPG", ".jpg", ".png")
if (json_path.parent / f"{stem}{ext}").exists()),
None,
)
if img_path is None:
print(f"원본 이미지 없음: {json_path.parent / stem}.*"); sys.exit(1)
prepare_image(img_path)
print(f"\n라벨링 서버 시작: http://localhost:{args.port}")
uvicorn.run(app, host="0.0.0.0", port=args.port, log_level="warning")
if __name__ == "__main__":
main()