from __future__ import annotations from datetime import datetime from pathlib import Path import csv from io import BytesIO, StringIO import math import shutil import uuid import ezdxf from ezdxf import recover from fastapi import FastAPI, File, Form, HTTPException, UploadFile from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse from fastapi.staticfiles import StaticFiles from openpyxl import load_workbook from pydantic import BaseModel, Field from .config import LEGACY_DIR, MOCK_LOGIN_ENABLED, UPLOAD_DIR from .db import get_conn, init_db app = FastAPI(title="MH Dashboard Organization API") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) LEGACY_STATIC_DIR = LEGACY_DIR / "static" class MemberPayload(BaseModel): id: int | None = None name: str = Field(min_length=1) company: str = "" rank: str = "" role: str = "" department: str = "" grp: str = "" division: str = "" team: str = "" cell: str = "" work_status: str = "" work_time: str = "" phone: str = "" email: str = "" seat_label: str = "" photo_url: str = "" sort_order: int | None = None class MemberBulkPayload(BaseModel): items: list[MemberPayload] class SeatMapPayload(BaseModel): name: str = Field(min_length=1) image_url: str = "" source_type: str = "image" source_url: str = "" preview_svg: str = "" view_box_min_x: float | None = None view_box_min_y: float | None = None view_box_width: float | None = None view_box_height: float | None = None image_width: int | None = None image_height: int | None = None grid_rows: int = Field(default=1, ge=1, le=200) grid_cols: int = Field(default=1, ge=1, le=200) cell_gap: int = Field(default=0, ge=0, le=24) is_active: bool = True class SeatPlacementPayload(BaseModel): member_id: int seat_slot_id: int | None = None row_index: int = Field(default=0, ge=0) col_index: int = Field(default=0, ge=0) seat_label: str = "" class SeatLayoutPayload(BaseModel): placements: list[SeatPlacementPayload] LEGACY_HEADER_MAP = { "이름": "name", "name": "name", "소속회사": "company", "co": "company", "company": "company", "직급": "rank", "rank": "rank", "직책": "role", "pos": "role", "role": "role", "부서": "department", "part": "department", "department": "department", "그룹": "grp", "gr": "grp", "grp": "grp", "디비전": "division", "div": "division", "division": "division", "팀": "team", "team": "team", "teal": "team", "셀": "cell", "cell": "cell", "근무상태": "work_status", "work_status": "work_status", "근무시간": "work_time", "work_time": "work_time", "전화번호": "phone", "phone": "phone", "이메일": "email", "email": "email", "자리위치": "seat_label", "seat_label": "seat_label", "사진": "photo_url", "photo_url": "photo_url", } def serialize_member_payload(item: MemberPayload, sort_order: int) -> tuple[object, ...]: return ( item.name.strip(), item.company.strip(), item.rank.strip(), item.role.strip(), item.department.strip(), item.grp.strip(), item.division.strip(), item.team.strip(), item.cell.strip(), item.work_status.strip(), item.work_time.strip(), item.phone.strip(), item.email.strip(), item.seat_label.strip(), item.photo_url.strip(), sort_order, ) def fetch_members() -> list[dict[str, object]]: with get_conn() as conn: with conn.cursor() as cur: cur.execute( """ SELECT id, name, company, rank, role, department, grp, division, team, cell, work_status, work_time, phone, email, seat_label, photo_url, sort_order, created_at, updated_at FROM members ORDER BY sort_order ASC, id ASC """ ) return cur.fetchall() def serialize_seat_map_payload(payload: SeatMapPayload) -> tuple[object, ...]: return ( payload.name.strip(), payload.source_type.strip() or "image", payload.source_url.strip(), payload.preview_svg, payload.view_box_min_x, payload.view_box_min_y, payload.view_box_width, payload.view_box_height, payload.image_url.strip(), payload.image_width, payload.image_height, payload.grid_rows, payload.grid_cols, payload.cell_gap, payload.is_active, ) def fetch_seat_map(seat_map_id: int) -> dict[str, object] | None: with get_conn() as conn: with conn.cursor() as cur: cur.execute( """ SELECT id, name, source_type, source_url, preview_svg, view_box_min_x, view_box_min_y, view_box_width, view_box_height, image_url, image_width, image_height, grid_rows, grid_cols, cell_gap, is_active, created_at, updated_at FROM seat_maps WHERE id = %s """, (seat_map_id,), ) return cur.fetchone() def fetch_active_seat_map() -> dict[str, object] | None: with get_conn() as conn: with conn.cursor() as cur: cur.execute( """ SELECT id, name, source_type, source_url, preview_svg, view_box_min_x, view_box_min_y, view_box_width, view_box_height, image_url, image_width, image_height, grid_rows, grid_cols, cell_gap, is_active, created_at, updated_at FROM seat_maps WHERE is_active = TRUE ORDER BY updated_at DESC, id DESC LIMIT 1 """ ) return cur.fetchone() def compute_seat_label(row_index: int, col_index: int) -> str: quotient = row_index row_label = "" while True: quotient, remainder = divmod(quotient, 26) row_label = chr(65 + remainder) + row_label if quotient == 0: break quotient -= 1 return f"{row_label}-{col_index + 1:02d}" def compute_slot_label(index: int) -> str: return f"CHAIR-{index + 1:03d}" def get_entity_points(entity: ezdxf.entities.DXFGraphic) -> list[tuple[float, float]]: entity_type = entity.dxftype() if entity_type == "LINE": return [ (float(entity.dxf.start.x), float(entity.dxf.start.y)), (float(entity.dxf.end.x), float(entity.dxf.end.y)), ] if entity_type == "LWPOLYLINE": return [(float(point[0]), float(point[1])) for point in entity.get_points("xy")] if entity_type == "POLYLINE": return [(float(vertex.dxf.location.x), float(vertex.dxf.location.y)) for vertex in entity.vertices] if entity_type == "CIRCLE": center = entity.dxf.center radius = float(entity.dxf.radius) return [ (float(center.x - radius), float(center.y - radius)), (float(center.x + radius), float(center.y + radius)), ] if entity_type == "ARC": center = entity.dxf.center radius = float(entity.dxf.radius) return [ (float(center.x - radius), float(center.y - radius)), (float(center.x + radius), float(center.y + radius)), ] if entity_type == "POINT": location = entity.dxf.location return [(float(location.x), float(location.y))] if entity_type == "INSERT": insert = entity.dxf.insert return [(float(insert.x), float(insert.y))] return [] def get_entity_center(entity: ezdxf.entities.DXFGraphic) -> tuple[float, float] | None: points = get_entity_points(entity) if not points: return None min_x = min(point[0] for point in points) max_x = max(point[0] for point in points) min_y = min(point[1] for point in points) max_y = max(point[1] for point in points) return ((min_x + max_x) / 2.0, (min_y + max_y) / 2.0) def line_svg(points: list[tuple[float, float]]) -> str: if len(points) < 2: return "" coordinates = " ".join(f"{x:.2f},{-y:.2f}" for x, y in points) return f'' def circle_svg(center_x: float, center_y: float, radius: float, stroke: str = "#cbd5e1", fill: str = "none") -> str: return ( f'' ) def build_dxf_preview_svg( entities: list[ezdxf.entities.DXFGraphic], chair_slots: list[dict[str, object]], bounds: tuple[float, float, float, float], ) -> str: min_x, min_y, width, height = bounds max_y = min_y + height svg_parts: list[str] = [] for entity in entities: layer_name = entity.dxf.layer.lower() if layer_name == "chair": continue entity_type = entity.dxftype() if entity_type in {"LINE", "LWPOLYLINE", "POLYLINE"}: svg = line_svg(get_entity_points(entity)) if svg: svg_parts.append(svg) elif entity_type == "CIRCLE": center = entity.dxf.center svg_parts.append(circle_svg(float(center.x), float(center.y), float(entity.dxf.radius))) elif entity_type == "ARC": center = entity.dxf.center radius = float(entity.dxf.radius) start_angle = math.radians(float(entity.dxf.start_angle)) end_angle = math.radians(float(entity.dxf.end_angle)) start_x = float(center.x) + radius * math.cos(start_angle) start_y = float(center.y) + radius * math.sin(start_angle) end_x = float(center.x) + radius * math.cos(end_angle) end_y = float(center.y) + radius * math.sin(end_angle) large_arc = 1 if abs(float(entity.dxf.end_angle) - float(entity.dxf.start_angle)) > 180 else 0 svg_parts.append( f'' ) for slot in chair_slots: svg_parts.append( circle_svg( float(slot["x"]), float(slot["y"]), 10, stroke="#0f766e", fill="rgba(45, 212, 191, 0.22)", ) ) view_box = f"{min_x:.2f} {-max_y:.2f} {max(width, 1.0):.2f} {max(height, 1.0):.2f}" return ( f'' '' + "".join(svg_parts) + "" ) def parse_dxf_layout(file_path: Path) -> tuple[dict[str, object], list[dict[str, object]]]: try: document = ezdxf.readfile(file_path) except OSError: try: document, _ = recover.readfile(file_path) except Exception as exc: with file_path.open("rb") as source: header = source.read(64) header_text = header.decode("latin-1", errors="ignore") if header.startswith(b"AC10") or "AutoCAD Binary DXF" in header_text: raise HTTPException( status_code=400, detail="DXF 파일을 해석하지 못했습니다. binary DXF 또는 손상된 DXF일 수 있습니다. 가능하면 ASCII DXF로 다시 저장해 업로드하세요.", ) from exc raise HTTPException( status_code=400, detail="업로드한 파일이 DXF 형식으로 읽히지 않습니다. DWG 파일이거나 확장자만 dxf로 바뀐 파일일 수 있습니다.", ) from exc modelspace = document.modelspace() all_entities = [entity for entity in modelspace if entity.dxftype() in {"LINE", "LWPOLYLINE", "POLYLINE", "CIRCLE", "ARC", "POINT", "INSERT"}] points: list[tuple[float, float]] = [] chair_entities: list[ezdxf.entities.DXFGraphic] = [] for entity in all_entities: entity_points = get_entity_points(entity) if entity_points: points.extend(entity_points) if entity.dxf.layer.lower() == "chair": chair_entities.append(entity) if not chair_entities: raise HTTPException(status_code=400, detail="DXF 파일에서 chair 레이어를 찾지 못했습니다.") if not points: raise HTTPException(status_code=400, detail="DXF 좌표를 해석하지 못했습니다.") min_x = min(point[0] for point in points) max_x = max(point[0] for point in points) min_y = min(point[1] for point in points) max_y = max(point[1] for point in points) width = max(max_x - min_x, 1.0) height = max(max_y - min_y, 1.0) slots: list[dict[str, object]] = [] for index, entity in enumerate(sorted(chair_entities, key=lambda item: (-(get_entity_center(item) or (0.0, 0.0))[1], (get_entity_center(item) or (0.0, 0.0))[0]))): center = get_entity_center(entity) if center is None: continue slots.append( { "slot_key": entity.dxf.handle, "label": compute_slot_label(index), "x": round(float(center[0]), 3), "y": round(float(center[1]), 3), "rotation": float(getattr(entity.dxf, "rotation", 0.0) or 0.0), "layer_name": entity.dxf.layer, } ) if not slots: raise HTTPException(status_code=400, detail="chair 레이어에서 좌석 위치를 추출하지 못했습니다.") preview_svg = build_dxf_preview_svg(all_entities, slots, (min_x, min_y, width, height)) metadata = { "source_type": "dxf", "view_box_min_x": round(min_x, 3), "view_box_min_y": round(min_y, 3), "view_box_width": round(width, 3), "view_box_height": round(height, 3), "preview_svg": preview_svg, "grid_rows": 1, "grid_cols": max(len(slots), 1), "image_width": None, "image_height": None, "cell_gap": 0, } return metadata, slots def fetch_seat_layout(seat_map_id: int) -> dict[str, object]: seat_map = fetch_seat_map(seat_map_id) if seat_map is None: raise HTTPException(status_code=404, detail="Seat map not found.") with get_conn() as conn: with conn.cursor() as cur: cur.execute( """ SELECT m.id, m.name, m.company, m.rank, m.role, m.department, m.grp, m.division, m.team, m.cell, m.work_status, m.work_time, m.phone, m.email, m.seat_label AS member_seat_label, m.photo_url, m.sort_order FROM members m ORDER BY m.sort_order ASC, m.id ASC """ ) members = cur.fetchall() cur.execute( """ SELECT id, slot_key, label, x, y, rotation, layer_name FROM seat_slots WHERE seat_map_id = %s ORDER BY label ASC, id ASC """, (seat_map_id,), ) slots = cur.fetchall() cur.execute( """ SELECT sp.member_id, sp.row_index, sp.col_index, sp.seat_label, sp.seat_slot_id, m.name, m.company, m.rank, m.role, m.department, m.grp, m.division, m.team, m.cell, m.work_status, m.work_time, m.phone, m.email, m.photo_url, m.sort_order FROM seat_positions sp JOIN members m ON m.id = sp.member_id WHERE sp.seat_map_id = %s ORDER BY sp.row_index ASC, sp.col_index ASC, m.sort_order ASC, m.id ASC """, (seat_map_id,), ) placements = cur.fetchall() return { "seat_map": seat_map, "members": members, "slots": slots, "placements": placements, } def save_seat_layout(seat_map_id: int, payload: SeatLayoutPayload) -> list[dict[str, object]]: seat_map = fetch_seat_map(seat_map_id) if seat_map is None: raise HTTPException(status_code=404, detail="Seat map not found.") member_ids: list[int] = [] occupied_cells: set[tuple[int, int]] = set() occupied_slots: set[int] = set() is_dxf = seat_map["source_type"] == "dxf" for item in payload.placements: if is_dxf: if item.seat_slot_id is None: raise HTTPException(status_code=400, detail="DXF 자리배치도는 seat_slot_id가 필요합니다.") if item.seat_slot_id in occupied_slots: raise HTTPException(status_code=400, detail="같은 좌석에 둘 이상의 구성원을 배치할 수 없습니다.") occupied_slots.add(item.seat_slot_id) else: if item.row_index >= int(seat_map["grid_rows"]) or item.col_index >= int(seat_map["grid_cols"]): raise HTTPException(status_code=400, detail="좌표가 자리배치도 범위를 벗어났습니다.") cell_key = (item.row_index, item.col_index) if cell_key in occupied_cells: raise HTTPException(status_code=400, detail="같은 칸에 둘 이상의 구성원을 배치할 수 없습니다.") occupied_cells.add(cell_key) member_ids.append(item.member_id) if len(member_ids) != len(set(member_ids)): raise HTTPException(status_code=400, detail="같은 구성원을 중복 배치할 수 없습니다.") if member_ids: with get_conn() as conn: with conn.cursor() as cur: cur.execute("SELECT id FROM members WHERE id = ANY(%s)", (member_ids,)) existing_ids = {int(row["id"]) for row in cur.fetchall()} missing_ids = sorted(set(member_ids) - existing_ids) if missing_ids: raise HTTPException(status_code=400, detail=f"존재하지 않는 구성원 ID가 포함되어 있습니다: {missing_ids}") if is_dxf: slot_ids = sorted(occupied_slots) cur.execute("SELECT id FROM seat_slots WHERE seat_map_id = %s AND id = ANY(%s)", (seat_map_id, slot_ids)) existing_slot_ids = {int(row["id"]) for row in cur.fetchall()} missing_slot_ids = sorted(set(slot_ids) - existing_slot_ids) if missing_slot_ids: raise HTTPException(status_code=400, detail=f"존재하지 않는 좌석 슬롯 ID가 포함되어 있습니다: {missing_slot_ids}") cur.execute("SELECT id, label FROM seat_slots WHERE seat_map_id = %s", (seat_map_id,)) slot_label_map = {int(row["id"]): row["label"] for row in cur.fetchall()} else: slot_label_map = {} cur.execute("DELETE FROM seat_positions WHERE seat_map_id = %s AND NOT (member_id = ANY(%s))", (seat_map_id, member_ids)) for item in payload.placements: seat_label = item.seat_label.strip() or ( slot_label_map.get(int(item.seat_slot_id), f"SLOT-{item.seat_slot_id}") if is_dxf and item.seat_slot_id is not None else compute_seat_label(item.row_index, item.col_index) ) cur.execute( """ INSERT INTO seat_positions (member_id, seat_map_id, seat_slot_id, row_index, col_index, seat_label, updated_at) VALUES (%s, %s, %s, %s, %s, %s, NOW()) ON CONFLICT (member_id) DO UPDATE SET seat_map_id = EXCLUDED.seat_map_id, seat_slot_id = EXCLUDED.seat_slot_id, row_index = EXCLUDED.row_index, col_index = EXCLUDED.col_index, seat_label = EXCLUDED.seat_label, updated_at = NOW() """, ( item.member_id, seat_map_id, item.seat_slot_id if is_dxf else None, item.row_index, item.col_index, seat_label, ), ) conn.commit() else: with get_conn() as conn: with conn.cursor() as cur: cur.execute("DELETE FROM seat_positions WHERE seat_map_id = %s", (seat_map_id,)) conn.commit() return fetch_seat_layout(seat_map_id)["placements"] def get_member_count() -> int: with get_conn() as conn: with conn.cursor() as cur: cur.execute("SELECT COUNT(*) AS count FROM members") return int(cur.fetchone()["count"]) def replace_members(items: list[MemberPayload]) -> list[dict[str, object]]: with get_conn() as conn: with conn.cursor() as cur: cur.execute("TRUNCATE TABLE members RESTART IDENTITY CASCADE") for index, item in enumerate(items): cur.execute( """ INSERT INTO members ( name, company, rank, role, department, grp, division, team, cell, work_status, work_time, phone, email, seat_label, photo_url, sort_order ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """, serialize_member_payload(item, index), ) conn.commit() return fetch_members() def rows_to_member_payloads(rows: list[list[object]]) -> list[MemberPayload]: def normalize_header(value: object) -> str: return str(value or "").strip().lower() header_idx = next( ( idx for idx, row in enumerate(rows) if {"이름", "부서"}.issubset({str(value).strip() for value in row}) or {"name", "part"}.issubset({normalize_header(value) for value in row}) ), -1, ) if header_idx < 0: raise HTTPException(status_code=400, detail="지원하지 않는 파일 형식입니다. 필수 헤더(이름/부서 또는 name/part)를 찾지 못했습니다.") headers = [normalize_header(value) for value in rows[header_idx]] payloads: list[MemberPayload] = [] for row in rows[header_idx + 1 :]: if not any(str(value or "").strip() for value in row): continue record: dict[str, object] = {} for col_idx, header in enumerate(headers): mapped = LEGACY_HEADER_MAP.get(header) if not mapped: continue record[mapped] = str(row[col_idx] if col_idx < len(row) and row[col_idx] is not None else "").strip() if not str(record.get("name", "")).strip(): continue payloads.append(MemberPayload(**record)) return payloads def parse_import_rows(file: UploadFile, content: bytes) -> list[MemberPayload]: suffix = Path(file.filename or "").suffix.lower() if suffix == ".csv": text = content.decode("utf-8-sig") rows = list(csv.reader(StringIO(text))) return rows_to_member_payloads(rows) if suffix in {".xlsx", ".xlsm", ".xltx", ".xltm"}: workbook = load_workbook(BytesIO(content), data_only=True) sheet = workbook[workbook.sheetnames[0]] rows = [list(row) for row in sheet.iter_rows(values_only=True)] return rows_to_member_payloads(rows) raise HTTPException(status_code=400, detail="xlsx 또는 csv 파일만 업로드할 수 있습니다.") @app.on_event("startup") def startup() -> None: UPLOAD_DIR.mkdir(parents=True, exist_ok=True) LEGACY_STATIC_DIR.mkdir(parents=True, exist_ok=True) init_db() app.mount("/legacy/static", StaticFiles(directory=LEGACY_STATIC_DIR, check_dir=False), name="legacy-static") @app.get("/api/health") def health() -> dict[str, object]: checks = { "upload_dir": UPLOAD_DIR.exists(), } try: member_count = get_member_count() checks["database"] = True except Exception: member_count = None checks["database"] = False status = "ok" if all(checks.values()) else "degraded" return { "status": status, "checks": checks, "member_count": member_count, "timestamp": datetime.utcnow().isoformat() + "Z", } @app.post("/api/mock-login") def mock_login(username: str = Form(...), password: str = Form(...)) -> dict[str, object]: if not MOCK_LOGIN_ENABLED: raise HTTPException(status_code=403, detail="Mock login is disabled.") if not username.strip() or not password.strip(): raise HTTPException(status_code=400, detail="Username and password are required.") return { "user": { "username": username.strip(), "display_name": username.strip(), "role": "admin", }, "session_expires_at": datetime.utcnow().isoformat() + "Z", } @app.get("/api/members") def list_members() -> dict[str, list[dict[str, object]]]: return {"items": fetch_members()} @app.post("/api/members") def create_member(payload: MemberPayload) -> dict[str, object]: with get_conn() as conn: with conn.cursor() as cur: cur.execute("SELECT COALESCE(MAX(sort_order), -1) + 1 AS next_order FROM members") next_order = int(cur.fetchone()["next_order"]) cur.execute( """ INSERT INTO members ( name, company, rank, role, department, grp, division, team, cell, work_status, work_time, phone, email, seat_label, photo_url, sort_order ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id, name, company, rank, role, department, grp, division, team, cell, work_status, work_time, phone, email, seat_label, photo_url, sort_order, created_at, updated_at """, serialize_member_payload(payload, payload.sort_order if payload.sort_order is not None else next_order), ) member = cur.fetchone() conn.commit() return {"item": member} @app.put("/api/members/bulk-sync") def bulk_sync_members(payload: MemberBulkPayload) -> dict[str, list[dict[str, object]]]: return {"items": replace_members(payload.items)} @app.put("/api/members/{member_id}") def update_member(member_id: int, payload: MemberPayload) -> dict[str, object]: with get_conn() as conn: with conn.cursor() as cur: cur.execute( """ UPDATE members SET name = %s, company = %s, rank = %s, role = %s, department = %s, grp = %s, division = %s, team = %s, cell = %s, work_status = %s, work_time = %s, phone = %s, email = %s, seat_label = %s, photo_url = %s, sort_order = COALESCE(%s, sort_order), updated_at = NOW() WHERE id = %s RETURNING id, name, company, rank, role, department, grp, division, team, cell, work_status, work_time, phone, email, seat_label, photo_url, sort_order, created_at, updated_at """, (*serialize_member_payload(payload, payload.sort_order or 0)[:-1], payload.sort_order, member_id), ) member = cur.fetchone() if member is None: raise HTTPException(status_code=404, detail="Member not found.") conn.commit() return {"item": member} @app.delete("/api/members/{member_id}") def delete_member(member_id: int) -> dict[str, bool]: with get_conn() as conn: with conn.cursor() as cur: cur.execute("DELETE FROM members WHERE id = %s", (member_id,)) deleted = cur.rowcount > 0 conn.commit() if not deleted: raise HTTPException(status_code=404, detail="Member not found.") return {"ok": True} @app.post("/api/members/import") async def import_members(file: UploadFile = File(...)) -> dict[str, list[dict[str, object]]]: content = await file.read() items = parse_import_rows(file, content) return {"items": replace_members(items)} @app.post("/api/uploads/profile-photo") def upload_profile_photo(file: UploadFile = File(...), member_name: str = Form("")) -> dict[str, str]: suffix = Path(file.filename or "").suffix.lower() if suffix not in {".png", ".jpg", ".jpeg", ".webp", ".gif"}: raise HTTPException(status_code=400, detail="Only image files are allowed.") stem = member_name.strip().replace(" ", "-") or "member" filename = f"{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-{stem}-{uuid.uuid4().hex[:8]}{suffix}" target = UPLOAD_DIR / filename with target.open("wb") as out_file: shutil.copyfileobj(file.file, out_file) return {"url": f"/uploads/{filename}"} @app.post("/api/uploads/seat-map-image") def upload_seat_map_image(file: UploadFile = File(...), seat_map_name: str = Form("")) -> dict[str, str]: suffix = Path(file.filename or "").suffix.lower() if suffix not in {".png", ".jpg", ".jpeg", ".webp", ".gif"}: raise HTTPException(status_code=400, detail="Only image files are allowed.") stem = seat_map_name.strip().replace(" ", "-") or "seat-map" filename = f"seat-map-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-{stem}-{uuid.uuid4().hex[:8]}{suffix}" target = UPLOAD_DIR / filename with target.open("wb") as out_file: shutil.copyfileobj(file.file, out_file) return {"url": f"/uploads/{filename}"} @app.post("/api/seat-maps/dxf") async def create_dxf_seat_map(file: UploadFile = File(...), name: str = Form(...)) -> dict[str, object]: suffix = Path(file.filename or "").suffix.lower() if suffix != ".dxf": raise HTTPException(status_code=400, detail="DXF 파일만 업로드할 수 있습니다.") stem = name.strip().replace(" ", "-") or "seat-map" filename = f"seat-map-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-{stem}-{uuid.uuid4().hex[:8]}{suffix}" target = UPLOAD_DIR / filename content = await file.read() with target.open("wb") as out_file: out_file.write(content) try: metadata, slots = parse_dxf_layout(target) except Exception: if target.exists(): target.unlink(missing_ok=True) raise payload = SeatMapPayload( name=name.strip(), source_type="dxf", source_url=f"/uploads/{filename}", image_url="", preview_svg=metadata["preview_svg"], view_box_min_x=metadata["view_box_min_x"], view_box_min_y=metadata["view_box_min_y"], view_box_width=metadata["view_box_width"], view_box_height=metadata["view_box_height"], image_width=None, image_height=None, grid_rows=1, grid_cols=max(len(slots), 1), cell_gap=0, is_active=True, ) with get_conn() as conn: with conn.cursor() as cur: cur.execute("UPDATE seat_maps SET is_active = FALSE, updated_at = NOW() WHERE is_active = TRUE") cur.execute( """ INSERT INTO seat_maps ( name, source_type, source_url, preview_svg, view_box_min_x, view_box_min_y, view_box_width, view_box_height, image_url, image_width, image_height, grid_rows, grid_cols, cell_gap, is_active ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id, name, source_type, source_url, preview_svg, view_box_min_x, view_box_min_y, view_box_width, view_box_height, image_url, image_width, image_height, grid_rows, grid_cols, cell_gap, is_active, created_at, updated_at """, serialize_seat_map_payload(payload), ) seat_map = cur.fetchone() for slot in slots: cur.execute( """ INSERT INTO seat_slots (seat_map_id, slot_key, label, x, y, rotation, layer_name) VALUES (%s, %s, %s, %s, %s, %s, %s) """, ( seat_map["id"], slot["slot_key"], slot["label"], slot["x"], slot["y"], slot["rotation"], slot["layer_name"], ), ) conn.commit() return fetch_seat_layout(int(seat_map["id"])) @app.get("/api/seat-maps/active") def get_active_seat_map() -> dict[str, dict[str, object]]: seat_map = fetch_active_seat_map() if seat_map is None: raise HTTPException(status_code=404, detail="Active seat map not found.") return {"item": seat_map} @app.post("/api/seat-maps") def create_seat_map(payload: SeatMapPayload) -> dict[str, dict[str, object]]: with get_conn() as conn: with conn.cursor() as cur: if payload.is_active: cur.execute("UPDATE seat_maps SET is_active = FALSE, updated_at = NOW() WHERE is_active = TRUE") cur.execute( """ INSERT INTO seat_maps ( name, source_type, source_url, preview_svg, view_box_min_x, view_box_min_y, view_box_width, view_box_height, image_url, image_width, image_height, grid_rows, grid_cols, cell_gap, is_active ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id, name, source_type, source_url, preview_svg, view_box_min_x, view_box_min_y, view_box_width, view_box_height, image_url, image_width, image_height, grid_rows, grid_cols, cell_gap, is_active, created_at, updated_at """, serialize_seat_map_payload(payload), ) seat_map = cur.fetchone() conn.commit() return {"item": seat_map} @app.put("/api/seat-maps/{seat_map_id}") def update_seat_map(seat_map_id: int, payload: SeatMapPayload) -> dict[str, dict[str, object]]: with get_conn() as conn: with conn.cursor() as cur: if payload.source_type != "dxf": cur.execute( """ SELECT COUNT(*) AS count FROM seat_positions WHERE seat_map_id = %s AND (row_index >= %s OR col_index >= %s) """, (seat_map_id, payload.grid_rows, payload.grid_cols), ) out_of_bounds_count = int(cur.fetchone()["count"]) if out_of_bounds_count > 0: raise HTTPException(status_code=400, detail="현재 배치된 좌석이 새 그리드 범위를 벗어납니다. 먼저 좌석 배치를 정리하세요.") if payload.is_active: cur.execute("UPDATE seat_maps SET is_active = FALSE, updated_at = NOW() WHERE is_active = TRUE AND id <> %s", (seat_map_id,)) cur.execute( """ UPDATE seat_maps SET name = %s, source_type = %s, source_url = %s, preview_svg = %s, view_box_min_x = %s, view_box_min_y = %s, view_box_width = %s, view_box_height = %s, image_url = %s, image_width = %s, image_height = %s, grid_rows = %s, grid_cols = %s, cell_gap = %s, is_active = %s, updated_at = NOW() WHERE id = %s RETURNING id, name, source_type, source_url, preview_svg, view_box_min_x, view_box_min_y, view_box_width, view_box_height, image_url, image_width, image_height, grid_rows, grid_cols, cell_gap, is_active, created_at, updated_at """, (*serialize_seat_map_payload(payload), seat_map_id), ) seat_map = cur.fetchone() if seat_map is None: raise HTTPException(status_code=404, detail="Seat map not found.") conn.commit() return {"item": seat_map} @app.get("/api/seat-maps/{seat_map_id}/layout") def get_seat_layout(seat_map_id: int) -> dict[str, object]: return fetch_seat_layout(seat_map_id) @app.put("/api/seat-maps/{seat_map_id}/layout") def update_seat_layout(seat_map_id: int, payload: SeatLayoutPayload) -> dict[str, list[dict[str, object]]]: return {"items": save_seat_layout(seat_map_id, payload)} @app.get("/legacy/organization") def legacy_organization() -> FileResponse: target = LEGACY_DIR / "DashBoard-organization.html" if not target.exists(): raise HTTPException(status_code=404, detail="Legacy dashboard file not found.") return FileResponse(target) @app.get("/legacy/organization-backup") def legacy_organization_backup() -> FileResponse: target = LEGACY_DIR / "DashBoard-organization-backup.html" if not target.exists(): raise HTTPException(status_code=404, detail="Legacy dashboard backup not found.") return FileResponse(target) @app.get("/uploads/{filename}") def get_upload(filename: str) -> FileResponse: target = UPLOAD_DIR / filename if not target.exists(): raise HTTPException(status_code=404, detail="Upload not found.") return FileResponse(target)