from __future__ import annotations import csv import base64 from datetime import datetime from io import BytesIO, StringIO import json import math from pathlib import Path import re import shutil import struct import uuid import ezdxf from ezdxf import recover from fastapi import FastAPI, File, Form, HTTPException, UploadFile from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse, HTMLResponse from fastapi.staticfiles import StaticFiles from openpyxl import load_workbook from pydantic import BaseModel, Field from .config import LEGACY_DIR, MOCK_LOGIN_ENABLED, UPLOAD_DIR from .db import get_conn, init_db app = FastAPI(title="MH Dashboard Organization API") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) LEGACY_STATIC_DIR = LEGACY_DIR / "static" FIXED_OFFICE_SOURCE_KEY = "technical-development-center" FIXED_OFFICE_NAME = "기술개발센터" FIXED_OFFICE_TEMPLATE_PATH = Path(__file__).with_name("center_chair_viewer_template.html") _fixed_office_cache: dict[str, object] | None = None class MemberPayload(BaseModel): id: int | None = None name: str = Field(min_length=1) employee_id: str = "" company: str = "" rank: str = "" role: str = "" department: str = "" grp: str = "" division: str = "" team: str = "" cell: str = "" work_status: str = "" work_time: str = "" phone: str = "" email: str = "" seat_label: str = "" photo_url: str = "" sort_order: int | None = None class MemberBulkPayload(BaseModel): items: list[MemberPayload] class SeatMapPayload(BaseModel): name: str = Field(min_length=1) image_url: str = "" source_type: str = "image" source_url: str = "" preview_svg: str = "" view_box_min_x: float | None = None view_box_min_y: float | None = None view_box_width: float | None = None view_box_height: float | None = None image_width: int | None = None image_height: int | None = None grid_rows: int = Field(default=1, ge=1, le=200) grid_cols: int = Field(default=1, ge=1, le=200) cell_gap: int = Field(default=0, ge=0, le=24) is_active: bool = True class SeatPlacementPayload(BaseModel): member_id: int seat_slot_id: int | None = None row_index: int = Field(default=0, ge=0) col_index: int = Field(default=0, ge=0) seat_label: str = "" class SeatLayoutPayload(BaseModel): placements: list[SeatPlacementPayload] LEGACY_HEADER_MAP = { "이름": "name", "name": "name", "tag": "employee_id", "employee_id": "employee_id", "소속회사": "company", "co": "company", "company": "company", "직급": "rank", "rank": "rank", "직책": "role", "pos": "role", "role": "role", "부서": "department", "part": "department", "department": "department", "그룹": "grp", "gr": "grp", "grp": "grp", "디비전": "division", "div": "division", "division": "division", "팀": "team", "team": "team", "teal": "team", "셀": "cell", "cell": "cell", "근무상태": "work_status", "work_status": "work_status", "근무시간": "work_time", "work_time": "work_time", "전화번호": "phone", "ph": "phone", "phone": "phone", "이메일": "email", "mail": "email", "email": "email", "자리위치": "seat_label", "seat_label": "seat_label", "사진": "photo_url", "photo_url": "photo_url", } def normalize_phone(value: object) -> str: raw = str(value or "").strip() digits = "".join(ch for ch in raw if ch.isdigit()) if not digits: return "" if len(digits) == 10 and not digits.startswith("0"): digits = f"0{digits}" if len(digits) == 11 and digits.startswith("0"): return f"{digits[:3]}-{digits[3:7]}-{digits[7:]}" if len(digits) == 10 and digits.startswith("0"): return f"{digits[:3]}-{digits[3:6]}-{digits[6:]}" return raw def serialize_member_payload(item: MemberPayload, sort_order: int) -> tuple[object, ...]: return ( item.name.strip(), item.employee_id.strip(), item.company.strip(), item.rank.strip(), item.role.strip(), item.department.strip(), item.grp.strip(), item.division.strip(), item.team.strip(), item.cell.strip(), item.work_status.strip(), item.work_time.strip(), normalize_phone(item.phone), item.email.strip(), item.seat_label.strip(), item.photo_url.strip(), sort_order, ) def fetch_members() -> list[dict[str, object]]: with get_conn() as conn: with conn.cursor() as cur: cur.execute( """ SELECT id, name, employee_id, company, rank, role, department, grp, division, team, cell, work_status, work_time, phone, email, seat_label, photo_url, sort_order, created_at, updated_at FROM members ORDER BY sort_order ASC, id ASC """ ) return cur.fetchall() def serialize_seat_map_payload(payload: SeatMapPayload) -> tuple[object, ...]: return ( payload.name.strip(), payload.source_type.strip() or "image", payload.source_url.strip(), payload.preview_svg, payload.view_box_min_x, payload.view_box_min_y, payload.view_box_width, payload.view_box_height, payload.image_url.strip(), payload.image_width, payload.image_height, payload.grid_rows, payload.grid_cols, payload.cell_gap, payload.is_active, ) def fetch_seat_map(seat_map_id: int) -> dict[str, object] | None: with get_conn() as conn: with conn.cursor() as cur: cur.execute( """ SELECT id, name, source_type, source_url, preview_svg, view_box_min_x, view_box_min_y, view_box_width, view_box_height, image_url, image_width, image_height, grid_rows, grid_cols, cell_gap, is_active, created_at, updated_at FROM seat_maps WHERE id = %s """, (seat_map_id,), ) return cur.fetchone() def fetch_active_seat_map() -> dict[str, object] | None: with get_conn() as conn: with conn.cursor() as cur: cur.execute( """ SELECT id, name, source_type, source_url, preview_svg, view_box_min_x, view_box_min_y, view_box_width, view_box_height, image_url, image_width, image_height, grid_rows, grid_cols, cell_gap, is_active, created_at, updated_at FROM seat_maps WHERE is_active = TRUE ORDER BY updated_at DESC, id DESC LIMIT 1 """ ) return cur.fetchone() def ensure_fixed_office_seat_map(activate: bool = True) -> dict[str, object]: template = parse_fixed_office_template() slots = template["slots"] with get_conn() as conn: with conn.cursor() as cur: cur.execute( """ SELECT id FROM seat_maps WHERE source_type = 'fixed_html' AND source_url = %s LIMIT 1 """, (FIXED_OFFICE_SOURCE_KEY,), ) row = cur.fetchone() if activate: cur.execute("UPDATE seat_maps SET is_active = FALSE, updated_at = NOW() WHERE is_active = TRUE") if row is None: cur.execute( """ INSERT INTO seat_maps ( name, image_url, source_type, source_url, preview_svg, view_box_min_x, view_box_min_y, view_box_width, view_box_height, image_width, image_height, grid_rows, grid_cols, cell_gap, is_active ) VALUES (%s, '', 'fixed_html', %s, '', NULL, NULL, NULL, NULL, NULL, NULL, 1, 1, 0, %s) RETURNING id """, (FIXED_OFFICE_NAME, FIXED_OFFICE_SOURCE_KEY, activate), ) seat_map_id = int(cur.fetchone()["id"]) else: seat_map_id = int(row["id"]) cur.execute( """ UPDATE seat_maps SET name = %s, source_type = 'fixed_html', source_url = %s, image_url = '', preview_svg = '', grid_rows = 1, grid_cols = 1, cell_gap = 0, is_active = %s, updated_at = NOW() WHERE id = %s """, (FIXED_OFFICE_NAME, FIXED_OFFICE_SOURCE_KEY, activate, seat_map_id), ) cur.execute("SELECT id, slot_key FROM seat_slots WHERE seat_map_id = %s", (seat_map_id,)) existing_slots = {str(item["slot_key"]): int(item["id"]) for item in cur.fetchall()} incoming_keys = {str(slot["slot_key"]) for slot in slots} for slot in slots: slot_key = str(slot["slot_key"]) if slot_key in existing_slots: cur.execute( """ UPDATE seat_slots SET label = %s, x = %s, y = %s, rotation = %s, layer_name = %s, updated_at = NOW() WHERE seat_map_id = %s AND slot_key = %s """, ( slot["label"], slot["x"], slot["y"], slot["rotation"], slot["layer_name"], seat_map_id, slot_key, ), ) else: cur.execute( """ INSERT INTO seat_slots (seat_map_id, slot_key, label, x, y, rotation, layer_name) VALUES (%s, %s, %s, %s, %s, %s, %s) """, ( seat_map_id, slot_key, slot["label"], slot["x"], slot["y"], slot["rotation"], slot["layer_name"], ), ) if existing_slots: stale_keys = [key for key in existing_slots if key not in incoming_keys] if stale_keys: cur.execute( "DELETE FROM seat_slots WHERE seat_map_id = %s AND slot_key = ANY(%s)", (seat_map_id, stale_keys), ) conn.commit() seat_map = fetch_seat_map(seat_map_id) if seat_map is None: raise HTTPException(status_code=500, detail="Fixed office seat map initialization failed.") return seat_map def compute_seat_label(row_index: int, col_index: int) -> str: quotient = row_index row_label = "" while True: quotient, remainder = divmod(quotient, 26) row_label = chr(65 + remainder) + row_label if quotient == 0: break quotient -= 1 return f"{row_label}-{col_index + 1:02d}" def compute_slot_label(index: int) -> str: return f"CHAIR-{index + 1:03d}" def decode_segment_values(raw_base64: str) -> list[int]: decoded = base64.b64decode(raw_base64.encode("ascii")) if not decoded: return [] return [item[0] for item in struct.iter_unpack(" dict[str, object]: global _fixed_office_cache if _fixed_office_cache is not None: return _fixed_office_cache if not FIXED_OFFICE_TEMPLATE_PATH.exists(): raise HTTPException(status_code=500, detail="Fixed office viewer template not found.") html = FIXED_OFFICE_TEMPLATE_PATH.read_text(encoding="utf-8") match = re.search(r"const DATA = (\{.*?\});\n\s*function decodeSegments", html, flags=re.S) if not match: raise HTTPException(status_code=500, detail="Fixed office viewer data not found.") data = json.loads(match.group(1)) chair_values = decode_segment_values(str(data["chairSegsB64"])) slots: list[dict[str, object]] = [] for index, chair in enumerate(data["chairs"]): slot_key, name, _kind, start, count = chair min_x = math.inf min_y = math.inf max_x = -math.inf max_y = -math.inf start_index = int(start) end_index = start_index + int(count) for item_index in range(start_index, end_index): offset = item_index * 4 x1 = chair_values[offset] / 10 y1 = chair_values[offset + 1] / 10 x2 = chair_values[offset + 2] / 10 y2 = chair_values[offset + 3] / 10 min_x = min(min_x, x1, x2) min_y = min(min_y, y1, y2) max_x = max(max_x, x1, x2) max_y = max(max_y, y1, y2) slots.append( { "slot_key": str(slot_key), "label": str(slot_key), "x": round((min_x + max_x) / 2, 3), "y": round((min_y + max_y) / 2, 3), "rotation": 0.0, "layer_name": str(name), } ) _fixed_office_cache = { "html": html, "data": data, "slots": slots, } return _fixed_office_cache def is_chair_layer(layer_name: str) -> bool: raw = layer_name.strip().lower() compact = raw.replace("-", "").replace("_", "").replace(" ", "") return raw in {"chair", "_chair", "-chair"} or compact.endswith("chair") def inspect_dxf_header(file_path: Path) -> tuple[str, str]: with file_path.open("rb") as source: header = source.read(128) header_text = header.decode("latin-1", errors="ignore").replace("\x00", "") preview = header[:32].hex(" ") if header_text.startswith("AutoCAD Binary DXF"): return ("binary_dxf", preview) if header_text.startswith("0\nSECTION") or header_text.startswith("0\r\nSECTION"): return ("ascii_dxf", preview) if header.startswith(b"AC10"): return ("dwg_or_dwg_like", preview) return ("unknown", preview) def iter_render_entities(entity: ezdxf.entities.DXFGraphic, inherited_layer: str | None = None, depth: int = 0) -> list[ezdxf.entities.DXFGraphic]: if depth > 6: return [] entity_type = entity.dxftype() current_layer = inherited_layer or entity.dxf.layer if entity_type == "INSERT": expanded: list[ezdxf.entities.DXFGraphic] = [] try: for child in entity.virtual_entities(): child_layer = child.dxf.layer if child_layer == "0": child.dxf.layer = current_layer expanded.extend(iter_render_entities(child, inherited_layer=current_layer, depth=depth + 1)) except Exception: return [] return expanded if inherited_layer and entity.dxf.layer == "0": entity.dxf.layer = inherited_layer return [entity] def get_entity_points(entity: ezdxf.entities.DXFGraphic) -> list[tuple[float, float]]: entity_type = entity.dxftype() if entity_type == "LINE": return [ (float(entity.dxf.start.x), float(entity.dxf.start.y)), (float(entity.dxf.end.x), float(entity.dxf.end.y)), ] if entity_type == "LWPOLYLINE": return [(float(point[0]), float(point[1])) for point in entity.get_points("xy")] if entity_type == "POLYLINE": return [(float(vertex.dxf.location.x), float(vertex.dxf.location.y)) for vertex in entity.vertices] if entity_type == "CIRCLE": center = entity.dxf.center radius = float(entity.dxf.radius) return [ (float(center.x - radius), float(center.y - radius)), (float(center.x + radius), float(center.y + radius)), ] if entity_type == "ARC": center = entity.dxf.center radius = float(entity.dxf.radius) return [ (float(center.x - radius), float(center.y - radius)), (float(center.x + radius), float(center.y + radius)), ] if entity_type == "POINT": location = entity.dxf.location return [(float(location.x), float(location.y))] if entity_type == "SPLINE": try: return [(float(point[0]), float(point[1])) for point in entity.flattening(2)] except Exception: return [] if entity_type == "ELLIPSE": try: return [(float(point[0]), float(point[1])) for point in entity.flattening(2)] except Exception: center = entity.dxf.center major_axis = entity.dxf.major_axis ratio = float(entity.dxf.ratio) radius_x = math.hypot(float(major_axis.x), float(major_axis.y)) radius_y = radius_x * ratio return [ (float(center.x - radius_x), float(center.y - radius_y)), (float(center.x + radius_x), float(center.y + radius_y)), ] if entity_type == "INSERT": insert = entity.dxf.insert return [(float(insert.x), float(insert.y))] return [] def get_entity_center(entity: ezdxf.entities.DXFGraphic) -> tuple[float, float] | None: points = get_entity_points(entity) if not points: return None min_x = min(point[0] for point in points) max_x = max(point[0] for point in points) min_y = min(point[1] for point in points) max_y = max(point[1] for point in points) return ((min_x + max_x) / 2.0, (min_y + max_y) / 2.0) def get_entity_bounds(entity: ezdxf.entities.DXFGraphic) -> tuple[float, float, float, float] | None: points = get_entity_points(entity) if not points: return None min_x = min(point[0] for point in points) max_x = max(point[0] for point in points) min_y = min(point[1] for point in points) max_y = max(point[1] for point in points) return (min_x, min_y, max_x, max_y) def compute_bounds_from_points(points: list[tuple[float, float]]) -> tuple[float, float, float, float]: min_x = min(point[0] for point in points) max_x = max(point[0] for point in points) min_y = min(point[1] for point in points) max_y = max(point[1] for point in points) return (min_x, min_y, max(max_x - min_x, 1.0), max(max_y - min_y, 1.0)) def percentile(values: list[float], ratio: float) -> float: if not values: return 0.0 ordered = sorted(values) index = max(0, min(len(ordered) - 1, round((len(ordered) - 1) * ratio))) return float(ordered[index]) def compute_focus_bounds(slot_points: list[tuple[float, float]]) -> tuple[float, float, float, float]: x_values = [point[0] for point in slot_points] y_values = [point[1] for point in slot_points] min_x = percentile(x_values, 0.02) max_x = percentile(x_values, 0.98) min_y = percentile(y_values, 0.02) max_y = percentile(y_values, 0.98) width = max(max_x - min_x, 1.0) height = max(max_y - min_y, 1.0) pad_x = max(width * 0.08, 500.0) pad_y = max(height * 0.08, 500.0) return (min_x - pad_x, min_y - pad_y, max_x + pad_x, max_y + pad_y) def get_entity_max_span(entity: ezdxf.entities.DXFGraphic) -> float: bounds = get_entity_bounds(entity) if bounds is None: return 0.0 min_x, min_y, max_x, max_y = bounds return max(max_x - min_x, max_y - min_y) def compute_outline_bounds(entities: list[ezdxf.entities.DXFGraphic]) -> tuple[float, float, float, float] | None: outline_layers = {"0", "0-COL", "WID", "XH", "CO-DOOR", "CO-DO-FR", "문", "회의실"} outline_points: list[tuple[float, float]] = [] for entity in entities: if is_chair_layer(entity.dxf.layer): continue if entity.dxf.layer not in outline_layers: continue if get_entity_max_span(entity) < 3000: continue outline_points.extend(get_entity_points(entity)) if not outline_points: return None min_x, min_y, width, height = compute_bounds_from_points(outline_points) pad_x = max(width * 0.025, 300.0) pad_y = max(height * 0.025, 300.0) return (min_x - pad_x, min_y - pad_y, min_x + width + pad_x, min_y + height + pad_y) def bounds_intersect(bounds: tuple[float, float, float, float], focus_bounds: tuple[float, float, float, float]) -> bool: min_x, min_y, max_x, max_y = bounds focus_min_x, focus_min_y, focus_max_x, focus_max_y = focus_bounds return not ( max_x < focus_min_x or min_x > focus_max_x or max_y < focus_min_y or min_y > focus_max_y ) def line_svg(points: list[tuple[float, float]], css_class: str = "seatmap-dxf-entity") -> str: if len(points) < 2: return "" coordinates = " ".join(f"{x:.2f},{-y:.2f}" for x, y in points) return ( f'' ) def circle_svg( center_x: float, center_y: float, radius: float, stroke: str = "#475569", fill: str = "none", css_class: str = "seatmap-dxf-entity", ) -> str: return ( f'' ) def build_dxf_preview_svg( entities: list[ezdxf.entities.DXFGraphic], bounds: tuple[float, float, float, float], ) -> str: min_x, min_y, width, height = bounds max_y = min_y + height svg_parts: list[str] = [] for entity in entities: layer_name = entity.dxf.layer is_chair = is_chair_layer(layer_name) css_class = "seatmap-dxf-chair-entity" if is_chair else "seatmap-dxf-entity" entity_type = entity.dxftype() if entity_type in {"LINE", "LWPOLYLINE", "POLYLINE", "SPLINE", "ELLIPSE"}: svg = line_svg(get_entity_points(entity), css_class=css_class) if svg: svg_parts.append(svg) elif entity_type == "CIRCLE": center = entity.dxf.center svg_parts.append( circle_svg( float(center.x), float(center.y), float(entity.dxf.radius), fill="none", css_class=css_class, ) ) elif entity_type == "ARC": center = entity.dxf.center radius = float(entity.dxf.radius) start_angle = math.radians(float(entity.dxf.start_angle)) end_angle = math.radians(float(entity.dxf.end_angle)) start_x = float(center.x) + radius * math.cos(start_angle) start_y = float(center.y) + radius * math.sin(start_angle) end_x = float(center.x) + radius * math.cos(end_angle) end_y = float(center.y) + radius * math.sin(end_angle) large_arc = 1 if abs(float(entity.dxf.end_angle) - float(entity.dxf.start_angle)) > 180 else 0 svg_parts.append( f'' ) view_box = f"{min_x:.2f} {-max_y:.2f} {max(width, 1.0):.2f} {max(height, 1.0):.2f}" return ( f'' '' + "".join(svg_parts) + "" ) def load_dxf_document(file_path: Path) -> ezdxf.document.Drawing: try: return ezdxf.readfile(file_path) except OSError: try: document, _ = recover.readfile(file_path) return document except Exception as exc: kind, preview = inspect_dxf_header(file_path) if kind == "binary_dxf": raise HTTPException( status_code=400, detail=f"Binary DXF로 보이지만 해석에 실패했습니다. 가능하면 ASCII DXF로 다시 저장해 업로드하세요. 헤더={preview}", ) from exc if kind == "dwg_or_dwg_like": raise HTTPException( status_code=400, detail=f"업로드한 파일은 DWG 계열 헤더(AC10xx)로 보입니다. DWG가 아니라 ASCII DXF로 다시 저장해 업로드하세요. 헤더={preview}", ) from exc if kind == "ascii_dxf": raise HTTPException( status_code=400, detail=f"ASCII DXF로 보이지만 구조를 해석하지 못했습니다. 도면을 다른 DXF 버전으로 다시 저장해보세요. 헤더={preview}", ) from exc raise HTTPException( status_code=400, detail=f"업로드한 파일 형식을 판별하지 못했습니다. 확장자만 dxf인 파일일 수 있습니다. 헤더={preview}", ) from exc def entity_to_segments(entity: ezdxf.entities.DXFGraphic, arc_steps: int = 24) -> list[tuple[float, float, float, float]]: entity_type = entity.dxftype() points = get_entity_points(entity) if entity_type in {"LINE", "LWPOLYLINE", "POLYLINE", "SPLINE", "ELLIPSE"} and len(points) >= 2: return [ (float(left[0]), float(left[1]), float(right[0]), float(right[1])) for left, right in zip(points[:-1], points[1:]) ] if entity_type == "CIRCLE": center = entity.dxf.center radius = float(entity.dxf.radius) samples = [] for index in range(arc_steps + 1): angle = (math.tau * index) / arc_steps samples.append( ( float(center.x) + radius * math.cos(angle), float(center.y) + radius * math.sin(angle), ) ) return [ (float(left[0]), float(left[1]), float(right[0]), float(right[1])) for left, right in zip(samples[:-1], samples[1:]) ] if entity_type == "ARC": center = entity.dxf.center radius = float(entity.dxf.radius) start_angle = math.radians(float(entity.dxf.start_angle)) end_angle = math.radians(float(entity.dxf.end_angle)) if end_angle <= start_angle: end_angle += math.tau samples = [] for index in range(arc_steps + 1): ratio = index / arc_steps angle = start_angle + (end_angle - start_angle) * ratio samples.append( ( float(center.x) + radius * math.cos(angle), float(center.y) + radius * math.sin(angle), ) ) return [ (float(left[0]), float(left[1]), float(right[0]), float(right[1])) for left, right in zip(samples[:-1], samples[1:]) ] return [] def build_dxf_artifacts(file_path: Path) -> tuple[dict[str, object], list[dict[str, object]], dict[str, object]]: document = load_dxf_document(file_path) modelspace = document.modelspace() base_entities = [entity for entity in modelspace if entity.dxftype() in {"LINE", "LWPOLYLINE", "POLYLINE", "CIRCLE", "ARC", "INSERT", "SPLINE", "ELLIPSE"}] all_entities: list[ezdxf.entities.DXFGraphic] = [] for entity in base_entities: all_entities.extend(iter_render_entities(entity)) chair_entities: list[ezdxf.entities.DXFGraphic] = [] chair_points: list[tuple[float, float]] = [] for entity in all_entities: if is_chair_layer(entity.dxf.layer): chair_entities.append(entity) chair_points.extend(get_entity_points(entity)) if not chair_entities: raise HTTPException(status_code=400, detail="DXF 파일에서 chair 계열 레이어를 찾지 못했습니다.") if not chair_points: raise HTTPException(status_code=400, detail="DXF 좌표를 해석하지 못했습니다.") slots: list[dict[str, object]] = [] for index, entity in enumerate(sorted(chair_entities, key=lambda item: (-(get_entity_center(item) or (0.0, 0.0))[1], (get_entity_center(item) or (0.0, 0.0))[0]))): center = get_entity_center(entity) if center is None: continue slots.append( { "slot_key": entity.dxf.handle, "label": compute_slot_label(index), "x": round(float(center[0]), 3), "y": round(float(center[1]), 3), "rotation": float(getattr(entity.dxf, "rotation", 0.0) or 0.0), "layer_name": entity.dxf.layer, } ) if not slots: raise HTTPException(status_code=400, detail="chair 레이어에서 좌석 위치를 추출하지 못했습니다.") slot_points = [(float(slot["x"]), float(slot["y"])) for slot in slots] focus_bounds = compute_outline_bounds(all_entities) or compute_focus_bounds(slot_points) visible_entities: list[ezdxf.entities.DXFGraphic] = [] visible_points: list[tuple[float, float]] = [] for entity in all_entities: entity_bounds = get_entity_bounds(entity) if entity_bounds is None: continue if bounds_intersect(entity_bounds, focus_bounds): visible_entities.append(entity) visible_points.extend(get_entity_points(entity)) if not visible_entities or not visible_points: visible_entities = all_entities visible_points = chair_points focus_min_x, focus_min_y, focus_max_x, focus_max_y = focus_bounds min_x = focus_min_x min_y = focus_min_y width = max(focus_max_x - focus_min_x, 1.0) height = max(focus_max_y - focus_min_y, 1.0) preview_svg = build_dxf_preview_svg(visible_entities, (min_x, min_y, width, height)) metadata = { "source_type": "dxf", "view_box_min_x": round(min_x, 3), "view_box_min_y": round(min_y, 3), "view_box_width": round(width, 3), "view_box_height": round(height, 3), "preview_svg": preview_svg, "grid_rows": 1, "grid_cols": 1, "image_width": None, "image_height": None, "cell_gap": 0, } slot_map = {str(slot["slot_key"]): slot for slot in slots} chair_segments: list[list[float]] = [] chair_items: list[dict[str, object]] = [] background_segments: list[list[float]] = [] for entity in visible_entities: segments = entity_to_segments(entity) if not segments: continue if is_chair_layer(entity.dxf.layer): slot = slot_map.get(str(entity.dxf.handle)) if not slot: continue start_index = len(chair_segments) min_seg_x = math.inf min_seg_y = math.inf max_seg_x = -math.inf max_seg_y = -math.inf for x1, y1, x2, y2 in segments: chair_segments.append([round(x1, 3), round(y1, 3), round(x2, 3), round(y2, 3)]) min_seg_x = min(min_seg_x, x1, x2) min_seg_y = min(min_seg_y, y1, y2) max_seg_x = max(max_seg_x, x1, x2) max_seg_y = max(max_seg_y, y1, y2) chair_items.append( { "key": str(slot["slot_key"]), "label": slot["label"], "kind": "chair", "start": start_index, "count": len(segments), "min_x": round(min_seg_x, 3), "min_y": round(min_seg_y, 3), "max_x": round(max_seg_x, 3), "max_y": round(max_seg_y, 3), } ) continue for x1, y1, x2, y2 in segments: background_segments.append([round(x1, 3), round(y1, 3), round(x2, 3), round(y2, 3)]) viewer_data = { "meta": { "background_segment_count": len(background_segments), "chair_count": len(chair_items), "chair_segment_count": len(chair_segments), "world": { "min_x": round(min_x, 3), "min_y": round(min_y, 3), "max_x": round(min_x + width, 3), "max_y": round(min_y + height, 3), "width": round(width, 3), "height": round(height, 3), }, }, "background_segments": background_segments, "chair_segments": chair_segments, "chairs": chair_items, } return metadata, slots, viewer_data def parse_dxf_layout(file_path: Path) -> tuple[dict[str, object], list[dict[str, object]]]: metadata, slots, _viewer_data = build_dxf_artifacts(file_path) return metadata, slots def fetch_seat_layout(seat_map_id: int) -> dict[str, object]: seat_map = fetch_seat_map(seat_map_id) if seat_map is None: raise HTTPException(status_code=404, detail="Seat map not found.") with get_conn() as conn: with conn.cursor() as cur: cur.execute( """ SELECT m.id, m.name, m.company, m.rank, m.role, m.department, m.grp, m.division, m.team, m.cell, m.work_status, m.work_time, m.phone, m.email, m.seat_label AS member_seat_label, m.photo_url, m.sort_order FROM members m ORDER BY m.sort_order ASC, m.id ASC """ ) members = cur.fetchall() cur.execute( """ SELECT id, slot_key, label, x, y, rotation, layer_name FROM seat_slots WHERE seat_map_id = %s ORDER BY label ASC, id ASC """, (seat_map_id,), ) slots = cur.fetchall() cur.execute( """ SELECT sp.member_id, sp.row_index, sp.col_index, sp.seat_label, sp.seat_slot_id, m.name, m.company, m.rank, m.role, m.department, m.grp, m.division, m.team, m.cell, m.work_status, m.work_time, m.phone, m.email, m.photo_url, m.sort_order FROM seat_positions sp JOIN members m ON m.id = sp.member_id WHERE sp.seat_map_id = %s ORDER BY sp.row_index ASC, sp.col_index ASC, m.sort_order ASC, m.id ASC """, (seat_map_id,), ) placements = cur.fetchall() viewer_data: dict[str, object] | None = None if seat_map["source_type"] == "fixed_html" and seat_map.get("source_url") == FIXED_OFFICE_SOURCE_KEY: template = parse_fixed_office_template() viewer_data = { "meta": { "chair_count": len(template["slots"]), "office": FIXED_OFFICE_NAME, } } elif seat_map["source_type"] == "dxf" and seat_map.get("source_url"): filename = Path(str(seat_map["source_url"])).name source_path = UPLOAD_DIR / filename if source_path.exists(): try: _metadata, _slots, viewer_data = build_dxf_artifacts(source_path) except Exception: viewer_data = None return { "seat_map": seat_map, "members": members, "slots": slots, "placements": placements, "viewer_data": viewer_data, } def build_center_chair_viewer_html(layout: dict[str, object]) -> str: slot_key_by_id = { int(slot["id"]): str(slot["slot_key"]) for slot in layout.get("slots", []) if slot.get("id") is not None and slot.get("slot_key") is not None } placed_keys: list[str] = [] for placement in layout.get("placements", []): slot_id = placement.get("seat_slot_id") if slot_id is None: continue slot_key = slot_key_by_id.get(int(slot_id)) if slot_key: placed_keys.append(slot_key) seat_map = layout.get("seat_map") or {} placed_literal = json.dumps(sorted(set(placed_keys)), ensure_ascii=False, separators=(",", ":")) if seat_map.get("source_type") == "fixed_html": html = parse_fixed_office_template()["html"] else: viewer_data = layout.get("viewer_data") if not isinstance(viewer_data, dict): raise HTTPException(status_code=404, detail="DXF viewer data not found.") template_path = Path(__file__).with_name("center_chair_viewer_template.html") if not template_path.exists(): raise HTTPException(status_code=500, detail="Viewer template not found.") html = template_path.read_text(encoding="utf-8") data_literal = json.dumps(viewer_data, ensure_ascii=False, separators=(",", ":")) html = re.sub( r"const DATA = .*?;\n\s*function decodeSegments", f"const DATA = {data_literal};\n function decodeSegments", html, count=1, flags=re.S, ) html = html.replace( 'const STORAGE_KEY = "ptc-chair-selection";\n const placed = new Set(JSON.parse(localStorage.getItem(STORAGE_KEY) || "[]"));', f"const STORAGE_KEY = null;\n const placed = new Set({placed_literal});", 1, ) html = html.replace( """ ctx.strokeStyle = selected ? "rgba(220, 38, 38, 0.98)" : active ? "rgba(15, 118, 110, 0.98)" : chair.kind === "group" ? "rgba(16, 134, 149, 0.74)" : "rgba(21, 149, 142, 0.8)"; ctx.lineWidth = (selected ? 2.6 : active ? 2.1 : baseWidth) / camera.scale;""", """ ctx.strokeStyle = selected ? "rgba(220, 38, 38, 0.98)" : "rgba(15, 118, 110, 0.88)"; ctx.lineWidth = (selected ? 2.6 : active ? 2.0 : 1.6) / camera.scale;""", 1, ) html = html.replace( "function persistPlaced() {\n localStorage.setItem(STORAGE_KEY, JSON.stringify([...placed]));\n }", "function persistPlaced() {\n return;\n }", 1, ) html = html.replace( """ window.addEventListener("pointerup", (event) => { if (dragging && dragStart) { const move = Math.hypot(event.clientX - dragStart.x, event.clientY - dragStart.y); if (move < 4) { const rect = canvas.getBoundingClientRect(); const picked = pickChair(event.clientX - rect.left, event.clientY - rect.top); if (picked) { if (placed.has(picked.key)) placed.delete(picked.key); else placed.add(picked.key); persistPlaced(); } } } dragging = false; dragStart = null; canvas.classList.remove("dragging"); requestDraw(); });""", """ window.addEventListener("pointerup", () => { dragging = false; dragStart = null; canvas.classList.remove("dragging"); requestDraw(); });""", 1, ) html = html.replace( """ document.getElementById("clear-btn").addEventListener("click", () => { placed.clear(); persistPlaced(); requestDraw(); });""", """ document.getElementById("clear-btn").addEventListener("click", () => { requestDraw(); });""", 1, ) bridge_script = """ """ html = html.replace("", f"{bridge_script}\n", 1) return html def save_seat_layout(seat_map_id: int, payload: SeatLayoutPayload) -> list[dict[str, object]]: seat_map = fetch_seat_map(seat_map_id) if seat_map is None: raise HTTPException(status_code=404, detail="Seat map not found.") member_ids: list[int] = [] occupied_cells: set[tuple[int, int]] = set() occupied_slots: set[int] = set() requires_slot = seat_map["source_type"] in {"dxf", "fixed_html"} for item in payload.placements: if requires_slot: if item.seat_slot_id is None: raise HTTPException(status_code=400, detail="고정 도면 자리배치도는 seat_slot_id가 필요합니다.") if item.seat_slot_id in occupied_slots: raise HTTPException(status_code=400, detail="같은 좌석에 둘 이상의 구성원을 배치할 수 없습니다.") occupied_slots.add(item.seat_slot_id) else: if item.row_index >= int(seat_map["grid_rows"]) or item.col_index >= int(seat_map["grid_cols"]): raise HTTPException(status_code=400, detail="좌표가 자리배치도 범위를 벗어났습니다.") cell_key = (item.row_index, item.col_index) if cell_key in occupied_cells: raise HTTPException(status_code=400, detail="같은 칸에 둘 이상의 구성원을 배치할 수 없습니다.") occupied_cells.add(cell_key) member_ids.append(item.member_id) if len(member_ids) != len(set(member_ids)): raise HTTPException(status_code=400, detail="같은 구성원을 중복 배치할 수 없습니다.") if member_ids: with get_conn() as conn: with conn.cursor() as cur: cur.execute("SELECT id FROM members WHERE id = ANY(%s)", (member_ids,)) existing_ids = {int(row["id"]) for row in cur.fetchall()} missing_ids = sorted(set(member_ids) - existing_ids) if missing_ids: raise HTTPException(status_code=400, detail=f"존재하지 않는 구성원 ID가 포함되어 있습니다: {missing_ids}") if requires_slot: slot_ids = sorted(occupied_slots) cur.execute("SELECT id FROM seat_slots WHERE seat_map_id = %s AND id = ANY(%s)", (seat_map_id, slot_ids)) existing_slot_ids = {int(row["id"]) for row in cur.fetchall()} missing_slot_ids = sorted(set(slot_ids) - existing_slot_ids) if missing_slot_ids: raise HTTPException(status_code=400, detail=f"존재하지 않는 좌석 슬롯 ID가 포함되어 있습니다: {missing_slot_ids}") cur.execute("SELECT id, label FROM seat_slots WHERE seat_map_id = %s", (seat_map_id,)) slot_label_map = {int(row["id"]): row["label"] for row in cur.fetchall()} else: slot_label_map = {} cur.execute("DELETE FROM seat_positions WHERE seat_map_id = %s AND NOT (member_id = ANY(%s))", (seat_map_id, member_ids)) for item in payload.placements: seat_label = item.seat_label.strip() or ( slot_label_map.get(int(item.seat_slot_id), f"SLOT-{item.seat_slot_id}") if requires_slot and item.seat_slot_id is not None else compute_seat_label(item.row_index, item.col_index) ) cur.execute( """ INSERT INTO seat_positions (member_id, seat_map_id, seat_slot_id, row_index, col_index, seat_label, updated_at) VALUES (%s, %s, %s, %s, %s, %s, NOW()) ON CONFLICT (member_id) DO UPDATE SET seat_map_id = EXCLUDED.seat_map_id, seat_slot_id = EXCLUDED.seat_slot_id, row_index = EXCLUDED.row_index, col_index = EXCLUDED.col_index, seat_label = EXCLUDED.seat_label, updated_at = NOW() """, ( item.member_id, seat_map_id, item.seat_slot_id if requires_slot else None, item.row_index, item.col_index, seat_label, ), ) conn.commit() else: with get_conn() as conn: with conn.cursor() as cur: cur.execute("DELETE FROM seat_positions WHERE seat_map_id = %s", (seat_map_id,)) conn.commit() return fetch_seat_layout(seat_map_id)["placements"] def get_member_count() -> int: with get_conn() as conn: with conn.cursor() as cur: cur.execute("SELECT COUNT(*) AS count FROM members") return int(cur.fetchone()["count"]) def replace_members(items: list[MemberPayload]) -> list[dict[str, object]]: with get_conn() as conn: with conn.cursor() as cur: cur.execute("TRUNCATE TABLE members RESTART IDENTITY CASCADE") for index, item in enumerate(items): cur.execute( """ INSERT INTO members ( name, employee_id, company, rank, role, department, grp, division, team, cell, work_status, work_time, phone, email, seat_label, photo_url, sort_order ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """, serialize_member_payload(item, index), ) conn.commit() return fetch_members() def rows_to_member_payloads(rows: list[list[object]]) -> list[MemberPayload]: def normalize_header(value: object) -> str: return str(value or "").strip().lower() header_idx = next( ( idx for idx, row in enumerate(rows) if {"이름", "부서"}.issubset({str(value).strip() for value in row}) or {"name", "part"}.issubset({normalize_header(value) for value in row}) ), -1, ) if header_idx < 0: raise HTTPException(status_code=400, detail="지원하지 않는 파일 형식입니다. 필수 헤더(이름/부서 또는 name/part)를 찾지 못했습니다.") headers = [normalize_header(value) for value in rows[header_idx]] payloads: list[MemberPayload] = [] for row in rows[header_idx + 1 :]: if not any(str(value or "").strip() for value in row): continue record: dict[str, object] = {} for col_idx, header in enumerate(headers): mapped = LEGACY_HEADER_MAP.get(header) if not mapped: continue value = str(row[col_idx] if col_idx < len(row) and row[col_idx] is not None else "").strip() if mapped == "phone": value = normalize_phone(value) record[mapped] = value if not str(record.get("name", "")).strip(): continue payloads.append(MemberPayload(**record)) return payloads def parse_import_rows(file: UploadFile, content: bytes) -> list[MemberPayload]: suffix = Path(file.filename or "").suffix.lower() if suffix == ".csv": text = content.decode("utf-8-sig") rows = list(csv.reader(StringIO(text))) return rows_to_member_payloads(rows) if suffix in {".xlsx", ".xlsm", ".xltx", ".xltm"}: workbook = load_workbook(BytesIO(content), data_only=True) sheet = workbook[workbook.sheetnames[0]] rows = [list(row) for row in sheet.iter_rows(values_only=True)] return rows_to_member_payloads(rows) raise HTTPException(status_code=400, detail="xlsx 또는 csv 파일만 업로드할 수 있습니다.") @app.on_event("startup") def startup() -> None: UPLOAD_DIR.mkdir(parents=True, exist_ok=True) LEGACY_STATIC_DIR.mkdir(parents=True, exist_ok=True) init_db() app.mount("/legacy/static", StaticFiles(directory=LEGACY_STATIC_DIR, check_dir=False), name="legacy-static") @app.get("/api/health") def health() -> dict[str, object]: checks = { "upload_dir": UPLOAD_DIR.exists(), } try: member_count = get_member_count() checks["database"] = True except Exception: member_count = None checks["database"] = False status = "ok" if all(checks.values()) else "degraded" return { "status": status, "checks": checks, "member_count": member_count, "timestamp": datetime.utcnow().isoformat() + "Z", } @app.post("/api/mock-login") def mock_login(username: str = Form(...), password: str = Form(...)) -> dict[str, object]: if not MOCK_LOGIN_ENABLED: raise HTTPException(status_code=403, detail="Mock login is disabled.") if not username.strip() or not password.strip(): raise HTTPException(status_code=400, detail="Username and password are required.") return { "user": { "username": username.strip(), "display_name": username.strip(), "role": "admin", }, "session_expires_at": datetime.utcnow().isoformat() + "Z", } @app.get("/api/members") def list_members() -> dict[str, list[dict[str, object]]]: return {"items": fetch_members()} @app.post("/api/members") def create_member(payload: MemberPayload) -> dict[str, object]: with get_conn() as conn: with conn.cursor() as cur: cur.execute("SELECT COALESCE(MAX(sort_order), -1) + 1 AS next_order FROM members") next_order = int(cur.fetchone()["next_order"]) cur.execute( """ INSERT INTO members ( name, employee_id, company, rank, role, department, grp, division, team, cell, work_status, work_time, phone, email, seat_label, photo_url, sort_order ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id, name, employee_id, company, rank, role, department, grp, division, team, cell, work_status, work_time, phone, email, seat_label, photo_url, sort_order, created_at, updated_at """, serialize_member_payload(payload, payload.sort_order if payload.sort_order is not None else next_order), ) member = cur.fetchone() conn.commit() return {"item": member} @app.put("/api/members/bulk-sync") def bulk_sync_members(payload: MemberBulkPayload) -> dict[str, list[dict[str, object]]]: return {"items": replace_members(payload.items)} @app.put("/api/members/{member_id}") def update_member(member_id: int, payload: MemberPayload) -> dict[str, object]: with get_conn() as conn: with conn.cursor() as cur: cur.execute( """ UPDATE members SET name = %s, employee_id = %s, company = %s, rank = %s, role = %s, department = %s, grp = %s, division = %s, team = %s, cell = %s, work_status = %s, work_time = %s, phone = %s, email = %s, seat_label = %s, photo_url = %s, sort_order = COALESCE(%s, sort_order), updated_at = NOW() WHERE id = %s RETURNING id, name, employee_id, company, rank, role, department, grp, division, team, cell, work_status, work_time, phone, email, seat_label, photo_url, sort_order, created_at, updated_at """, (*serialize_member_payload(payload, payload.sort_order or 0)[:-1], payload.sort_order, member_id), ) member = cur.fetchone() if member is None: raise HTTPException(status_code=404, detail="Member not found.") conn.commit() return {"item": member} @app.delete("/api/members/{member_id}") def delete_member(member_id: int) -> dict[str, bool]: with get_conn() as conn: with conn.cursor() as cur: cur.execute("DELETE FROM members WHERE id = %s", (member_id,)) deleted = cur.rowcount > 0 conn.commit() if not deleted: raise HTTPException(status_code=404, detail="Member not found.") return {"ok": True} @app.post("/api/members/import") async def import_members(file: UploadFile = File(...)) -> dict[str, list[dict[str, object]]]: content = await file.read() items = parse_import_rows(file, content) return {"items": replace_members(items)} @app.post("/api/uploads/profile-photo") def upload_profile_photo(file: UploadFile = File(...), member_name: str = Form("")) -> dict[str, str]: suffix = Path(file.filename or "").suffix.lower() if suffix not in {".png", ".jpg", ".jpeg", ".webp", ".gif"}: raise HTTPException(status_code=400, detail="Only image files are allowed.") stem = member_name.strip().replace(" ", "-") or "member" filename = f"{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-{stem}-{uuid.uuid4().hex[:8]}{suffix}" target = UPLOAD_DIR / filename with target.open("wb") as out_file: shutil.copyfileobj(file.file, out_file) return {"url": f"/uploads/{filename}"} @app.post("/api/uploads/seat-map-image") def upload_seat_map_image(file: UploadFile = File(...), seat_map_name: str = Form("")) -> dict[str, str]: suffix = Path(file.filename or "").suffix.lower() if suffix not in {".png", ".jpg", ".jpeg", ".webp", ".gif"}: raise HTTPException(status_code=400, detail="Only image files are allowed.") stem = seat_map_name.strip().replace(" ", "-") or "seat-map" filename = f"seat-map-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-{stem}-{uuid.uuid4().hex[:8]}{suffix}" target = UPLOAD_DIR / filename with target.open("wb") as out_file: shutil.copyfileobj(file.file, out_file) return {"url": f"/uploads/{filename}"} @app.post("/api/seat-maps/dxf") async def create_dxf_seat_map(file: UploadFile = File(...), name: str = Form(...)) -> dict[str, object]: suffix = Path(file.filename or "").suffix.lower() if suffix != ".dxf": raise HTTPException(status_code=400, detail="DXF 파일만 업로드할 수 있습니다.") stem = name.strip().replace(" ", "-") or "seat-map" filename = f"seat-map-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-{stem}-{uuid.uuid4().hex[:8]}{suffix}" target = UPLOAD_DIR / filename content = await file.read() with target.open("wb") as out_file: out_file.write(content) try: metadata, slots = parse_dxf_layout(target) except Exception: raise payload = SeatMapPayload( name=name.strip(), source_type="dxf", source_url=f"/uploads/{filename}", image_url="", preview_svg=metadata["preview_svg"], view_box_min_x=metadata["view_box_min_x"], view_box_min_y=metadata["view_box_min_y"], view_box_width=metadata["view_box_width"], view_box_height=metadata["view_box_height"], image_width=None, image_height=None, grid_rows=1, grid_cols=1, cell_gap=0, is_active=True, ) with get_conn() as conn: with conn.cursor() as cur: cur.execute("UPDATE seat_maps SET is_active = FALSE, updated_at = NOW() WHERE is_active = TRUE") cur.execute( """ INSERT INTO seat_maps ( name, source_type, source_url, preview_svg, view_box_min_x, view_box_min_y, view_box_width, view_box_height, image_url, image_width, image_height, grid_rows, grid_cols, cell_gap, is_active ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id, name, source_type, source_url, preview_svg, view_box_min_x, view_box_min_y, view_box_width, view_box_height, image_url, image_width, image_height, grid_rows, grid_cols, cell_gap, is_active, created_at, updated_at """, serialize_seat_map_payload(payload), ) seat_map = cur.fetchone() for slot in slots: cur.execute( """ INSERT INTO seat_slots (seat_map_id, slot_key, label, x, y, rotation, layer_name) VALUES (%s, %s, %s, %s, %s, %s, %s) """, ( seat_map["id"], slot["slot_key"], slot["label"], slot["x"], slot["y"], slot["rotation"], slot["layer_name"], ), ) conn.commit() return fetch_seat_layout(int(seat_map["id"])) @app.get("/api/seat-maps/active") def get_active_seat_map() -> dict[str, dict[str, object]]: seat_map = ensure_fixed_office_seat_map(activate=True) if seat_map is None: raise HTTPException(status_code=404, detail="Active seat map not found.") return {"item": seat_map} @app.post("/api/seat-maps") def create_seat_map(payload: SeatMapPayload) -> dict[str, dict[str, object]]: with get_conn() as conn: with conn.cursor() as cur: if payload.is_active: cur.execute("UPDATE seat_maps SET is_active = FALSE, updated_at = NOW() WHERE is_active = TRUE") cur.execute( """ INSERT INTO seat_maps ( name, source_type, source_url, preview_svg, view_box_min_x, view_box_min_y, view_box_width, view_box_height, image_url, image_width, image_height, grid_rows, grid_cols, cell_gap, is_active ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id, name, source_type, source_url, preview_svg, view_box_min_x, view_box_min_y, view_box_width, view_box_height, image_url, image_width, image_height, grid_rows, grid_cols, cell_gap, is_active, created_at, updated_at """, serialize_seat_map_payload(payload), ) seat_map = cur.fetchone() conn.commit() return {"item": seat_map} @app.put("/api/seat-maps/{seat_map_id}") def update_seat_map(seat_map_id: int, payload: SeatMapPayload) -> dict[str, dict[str, object]]: with get_conn() as conn: with conn.cursor() as cur: if payload.source_type != "dxf": cur.execute( """ SELECT COUNT(*) AS count FROM seat_positions WHERE seat_map_id = %s AND (row_index >= %s OR col_index >= %s) """, (seat_map_id, payload.grid_rows, payload.grid_cols), ) out_of_bounds_count = int(cur.fetchone()["count"]) if out_of_bounds_count > 0: raise HTTPException(status_code=400, detail="현재 배치된 좌석이 새 그리드 범위를 벗어납니다. 먼저 좌석 배치를 정리하세요.") if payload.is_active: cur.execute("UPDATE seat_maps SET is_active = FALSE, updated_at = NOW() WHERE is_active = TRUE AND id <> %s", (seat_map_id,)) cur.execute( """ UPDATE seat_maps SET name = %s, source_type = %s, source_url = %s, preview_svg = %s, view_box_min_x = %s, view_box_min_y = %s, view_box_width = %s, view_box_height = %s, image_url = %s, image_width = %s, image_height = %s, grid_rows = %s, grid_cols = %s, cell_gap = %s, is_active = %s, updated_at = NOW() WHERE id = %s RETURNING id, name, source_type, source_url, preview_svg, view_box_min_x, view_box_min_y, view_box_width, view_box_height, image_url, image_width, image_height, grid_rows, grid_cols, cell_gap, is_active, created_at, updated_at """, (*serialize_seat_map_payload(payload), seat_map_id), ) seat_map = cur.fetchone() if seat_map is None: raise HTTPException(status_code=404, detail="Seat map not found.") conn.commit() return {"item": seat_map} @app.get("/api/seat-maps/{seat_map_id}/layout") def get_seat_layout(seat_map_id: int) -> dict[str, object]: return fetch_seat_layout(seat_map_id) @app.get("/api/seat-maps/{seat_map_id}/viewer") def get_seat_map_viewer(seat_map_id: int) -> HTMLResponse: layout = fetch_seat_layout(seat_map_id) seat_map = layout.get("seat_map") or {} if seat_map.get("source_type") not in {"dxf", "fixed_html"}: raise HTTPException(status_code=400, detail="Viewer is only available for supported seat maps.") return HTMLResponse(build_center_chair_viewer_html(layout)) @app.put("/api/seat-maps/{seat_map_id}/layout") def update_seat_layout(seat_map_id: int, payload: SeatLayoutPayload) -> dict[str, list[dict[str, object]]]: return {"items": save_seat_layout(seat_map_id, payload)} @app.get("/legacy/organization") def legacy_organization() -> FileResponse: target = LEGACY_DIR / "DashBoard-organization.html" if not target.exists(): raise HTTPException(status_code=404, detail="Legacy dashboard file not found.") return FileResponse(target) @app.get("/legacy/organization-backup") def legacy_organization_backup() -> FileResponse: target = LEGACY_DIR / "DashBoard-organization-backup.html" if not target.exists(): raise HTTPException(status_code=404, detail="Legacy dashboard backup not found.") return FileResponse(target) @app.get("/uploads/{filename}") def get_upload(filename: str) -> FileResponse: target = UPLOAD_DIR / filename if not target.exists(): raise HTTPException(status_code=404, detail="Upload not found.") return FileResponse(target)