""" Control Box 라벨링 서버 v2 — 전체 이미지 + bbox 오버레이 방식 사용법: python tools/labeling_server.py \ --json "data/역사이미지/slope/DJI_20260306113838_0004_everything.json" [--reset] DB 초기화 브라우저: http://localhost:7001 """ import argparse, json, sqlite3, sys from collections import defaultdict from pathlib import Path import cv2, numpy as np from fastapi import FastAPI from fastapi.responses import HTMLResponse, JSONResponse, Response import uvicorn ROOT = Path(__file__).parent.parent DB = ROOT / "labels" / "labeling.db" MIN_VOTES = 3 TRUE_RATIO = 0.6 MAX_DIM = 3000 # served image max dimension (px) CONTROL_LABELS = { "small dark object on ballast", "manhole cover", "trackside enclosure", "dark rectangle on ground", "small square box", "small metal cabinet", "small cube on gravel", "compact trackside junction box", "small square gray metal box beside rail", "small near-square electrical enclosure on the ground", } app = FastAPI() _img_data: bytes = None _orig_w = _orig_h = _disp_w = _disp_h = 0 # ── DB ──────────────────────────────────────────────────────────────────────── def get_db(): con = sqlite3.connect(str(DB)) con.row_factory = sqlite3.Row return con def init_db(candidates: list): DB.parent.mkdir(parents=True, exist_ok=True) con = get_db() con.executescript(""" CREATE TABLE IF NOT EXISTS candidates ( id INTEGER PRIMARY KEY, json_idx INTEGER, label TEXT, score REAL, bbox TEXT, image_path TEXT ); CREATE TABLE IF NOT EXISTS votes ( id INTEGER PRIMARY KEY AUTOINCREMENT, candidate_id INTEGER, user TEXT, vote INTEGER, ts DATETIME DEFAULT CURRENT_TIMESTAMP, UNIQUE(candidate_id, user) ); """) if con.execute("SELECT COUNT(*) FROM candidates").fetchone()[0] > 0: n = con.execute("SELECT COUNT(*) FROM candidates").fetchone()[0] print(f"DB 기존 데이터 유지. candidates={n}") con.close() return for c in candidates: con.execute( "INSERT INTO candidates(json_idx,label,score,bbox,image_path) VALUES(?,?,?,?,?)", (c["json_idx"], c["label"], c["score"], json.dumps(c["bbox"]), c["image_path"]), ) con.commit() print(f"DB 등록: {len(candidates)}개") con.close() # ── 후보 로드 ───────────────────────────────────────────────────────────────── def load_candidates(json_path: Path) -> list: data = json.loads(json_path.read_text(encoding="utf-8")) stem = json_path.stem.replace("_everything", "") img_path = next( (json_path.parent / f"{stem}{ext}" for ext in (".JPG", ".jpg", ".png") if (json_path.parent / f"{stem}{ext}").exists()), None, ) if img_path is None: raise FileNotFoundError(f"원본 이미지 없음: {json_path.parent / stem}.*") candidates = [] for idx, seg in enumerate(data.get("segments", [])): label = seg.get("label", "").strip() if label not in CONTROL_LABELS: continue x0, y0, x1, y1 = [float(v) for v in seg["bbox"]] if (x1 - x0) < 4 or (y1 - y0) < 4: continue candidates.append({ "json_idx": idx, "label": label, "score": float(seg.get("score", 0)), "bbox": [x0, y0, x1, y1], "image_path": str(img_path), }) segs_total = len(data.get("segments", [])) print(f"필터링: {segs_total}개 → {len(candidates)}개 control_box 후보") return candidates # ── 이미지 준비 (서버 시작 시 1회) ─────────────────────────────────────────── def prepare_image(img_path: Path): global _img_data, _orig_w, _orig_h, _disp_w, _disp_h buf = np.fromfile(str(img_path), dtype=np.uint8) img = cv2.imdecode(buf, cv2.IMREAD_COLOR) if img is None: raise ValueError(f"이미지 로드 실패: {img_path}") _orig_h, _orig_w = img.shape[:2] scale = min(1.0, MAX_DIM / max(_orig_w, _orig_h)) _disp_w, _disp_h = int(_orig_w * scale), int(_orig_h * scale) if scale < 1.0: img = cv2.resize(img, (_disp_w, _disp_h), interpolation=cv2.INTER_LANCZOS4) _, enc = cv2.imencode(".jpg", img, [cv2.IMWRITE_JPEG_QUALITY, 92]) _img_data = bytes(enc) print(f"이미지 준비: {_orig_w}×{_orig_h} → {_disp_w}×{_disp_h}") # ── API ─────────────────────────────────────────────────────────────────────── @app.get("/image") async def serve_image(): return Response(content=_img_data, media_type="image/jpeg") @app.get("/api/image_info") async def api_image_info(): return {"orig_w": _orig_w, "orig_h": _orig_h, "disp_w": _disp_w, "disp_h": _disp_h} @app.get("/api/candidates") async def api_candidates(user: str = ""): con = get_db() rows = con.execute(""" SELECT c.id, c.bbox, c.label, c.score, v.vote FROM candidates c LEFT JOIN votes v ON c.id=v.candidate_id AND v.user=? """, (user,)).fetchall() con.close() return {"candidates": [ { "id": r["id"], "bbox": json.loads(r["bbox"]), "label": r["label"], "score": round(r["score"], 2), "voted": r["vote"] is not None, "is_true": r["vote"] == 1 if r["vote"] is not None else None, } for r in rows ]} @app.post("/api/vote") async def api_vote(data: dict): user = data.get("user", "").strip() all_ids = data.get("all_ids", []) true_ids = set(data.get("true_ids", [])) if not user or not all_ids: return {"ok": False, "error": "파라미터 오류"} con = get_db() for cid in all_ids: con.execute( "INSERT OR REPLACE INTO votes(candidate_id,user,vote) VALUES(?,?,?)", (cid, user, 1 if cid in true_ids else 0), ) con.commit() con.close() return {"ok": True} @app.get("/api/stats") async def api_stats(): con = get_db() total = con.execute("SELECT COUNT(*) FROM candidates").fetchone()[0] voted3 = con.execute(f""" SELECT COUNT(*) FROM ( SELECT candidate_id FROM votes GROUP BY candidate_id HAVING COUNT(*) >= {MIN_VOTES} ) """).fetchone()[0] users = con.execute("SELECT COUNT(DISTINCT user) FROM votes").fetchone()[0] tvotes = con.execute("SELECT COUNT(*) FROM votes").fetchone()[0] con.close() return {"total": total, "voted3plus": voted3, "users": users, "total_votes": tvotes} @app.post("/api/export") async def api_export(): con = get_db() confirmed = con.execute(f""" SELECT c.id, c.bbox, c.image_path, SUM(v.vote) AS true_v, COUNT(v.id) AS total_v FROM candidates c JOIN votes v ON c.id=v.candidate_id GROUP BY c.id HAVING total_v >= {MIN_VOTES} AND (CAST(true_v AS REAL)/total_v) >= {TRUE_RATIO} """).fetchall() con.close() by_image = defaultdict(list) for row in confirmed: by_image[row["image_path"]].append(row) out_dir = ROOT / "labels" / "yolo_export" out_dir.mkdir(parents=True, exist_ok=True) count = 0 for img_path, rows in by_image.items(): buf = np.fromfile(img_path, dtype=np.uint8) img = cv2.imdecode(buf, cv2.IMREAD_COLOR) H, W = img.shape[:2] txt = out_dir / (Path(img_path).stem + ".txt") with open(txt, "w") as f: for row in rows: x0, y0, x1, y1 = json.loads(row["bbox"]) f.write(f"0 {((x0+x1)/2)/W:.6f} {((y0+y1)/2)/H:.6f} " f"{(x1-x0)/W:.6f} {(y1-y0)/H:.6f}\n") count += 1 return {"ok": True, "labels": count, "dir": str(out_dir)} # ── HTML ────────────────────────────────────────────────────────────────────── @app.get("/", response_class=HTMLResponse) async def index(): return HTML HTML = r""" Control Box 라벨링

🎯 Control Box

미투표 컨트롤박스 ✓ 아님 ✗ 타인투표완료
휠=줌 | 드래그=이동 | 클릭=YES/NO 토글 | F=맞춤
""" # ── 메인 ───────────────────────────────────────────────────────────────────── def main(): ap = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) ap.add_argument("--json", required=True, help="*_everything.json 경로") ap.add_argument("--port", type=int, default=7001) ap.add_argument("--reset", action="store_true", help="DB 초기화 후 재로드") args = ap.parse_args() json_path = Path(args.json) if not json_path.exists(): print(f"JSON 없음: {json_path}"); sys.exit(1) if args.reset and DB.exists(): DB.unlink() print("DB 초기화 완료.") print("후보 로딩 중...") candidates = load_candidates(json_path) init_db(candidates) # Prepare image stem = json_path.stem.replace("_everything", "") img_path = next( (json_path.parent / f"{stem}{ext}" for ext in (".JPG", ".jpg", ".png") if (json_path.parent / f"{stem}{ext}").exists()), None, ) if img_path is None: print(f"원본 이미지 없음: {json_path.parent / stem}.*"); sys.exit(1) prepare_image(img_path) print(f"\n라벨링 서버 시작: http://localhost:{args.port}") uvicorn.run(app, host="0.0.0.0", port=args.port, log_level="warning") if __name__ == "__main__": main()