1976 lines
75 KiB
Python
Executable File
1976 lines
75 KiB
Python
Executable File
from __future__ import annotations
|
|
|
|
import csv
|
|
import base64
|
|
from datetime import datetime
|
|
from io import BytesIO, StringIO
|
|
import json
|
|
import math
|
|
from pathlib import Path
|
|
import re
|
|
import shutil
|
|
import struct
|
|
import uuid
|
|
|
|
import ezdxf
|
|
from ezdxf import recover
|
|
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import FileResponse, HTMLResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from openpyxl import load_workbook
|
|
from pydantic import BaseModel, Field
|
|
|
|
from .config import LEGACY_DIR, MOCK_LOGIN_ENABLED, UPLOAD_DIR
|
|
from .db import get_conn, init_db
|
|
|
|
|
|
app = FastAPI(title="MH Dashboard Organization API")
|
|
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["*"],
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
LEGACY_STATIC_DIR = LEGACY_DIR / "static"
|
|
FIXED_OFFICE_SOURCE_KEY = "technical-development-center"
|
|
FIXED_OFFICE_NAME = "기술개발센터"
|
|
FIXED_OFFICE_TEMPLATE_PATH = Path(__file__).with_name("center_chair_viewer_template.html")
|
|
_fixed_office_cache: dict[str, object] | None = None
|
|
|
|
|
|
class MemberPayload(BaseModel):
|
|
id: int | None = None
|
|
name: str = Field(min_length=1)
|
|
employee_id: str = ""
|
|
company: str = ""
|
|
rank: str = ""
|
|
role: str = ""
|
|
department: str = ""
|
|
grp: str = ""
|
|
division: str = ""
|
|
team: str = ""
|
|
cell: str = ""
|
|
work_status: str = ""
|
|
work_time: str = ""
|
|
phone: str = ""
|
|
email: str = ""
|
|
seat_label: str = ""
|
|
photo_url: str = ""
|
|
sort_order: int | None = None
|
|
|
|
|
|
class MemberBulkPayload(BaseModel):
|
|
items: list[MemberPayload]
|
|
|
|
|
|
class SeatMapPayload(BaseModel):
|
|
name: str = Field(min_length=1)
|
|
image_url: str = ""
|
|
source_type: str = "image"
|
|
source_url: str = ""
|
|
preview_svg: str = ""
|
|
view_box_min_x: float | None = None
|
|
view_box_min_y: float | None = None
|
|
view_box_width: float | None = None
|
|
view_box_height: float | None = None
|
|
image_width: int | None = None
|
|
image_height: int | None = None
|
|
grid_rows: int = Field(default=1, ge=1, le=200)
|
|
grid_cols: int = Field(default=1, ge=1, le=200)
|
|
cell_gap: int = Field(default=0, ge=0, le=24)
|
|
is_active: bool = True
|
|
|
|
|
|
class SeatPlacementPayload(BaseModel):
|
|
member_id: int
|
|
seat_slot_id: int | None = None
|
|
row_index: int = Field(default=0, ge=0)
|
|
col_index: int = Field(default=0, ge=0)
|
|
seat_label: str = ""
|
|
|
|
|
|
class SeatLayoutPayload(BaseModel):
|
|
placements: list[SeatPlacementPayload]
|
|
|
|
|
|
LEGACY_HEADER_MAP = {
|
|
"이름": "name",
|
|
"name": "name",
|
|
"tag": "employee_id",
|
|
"employee_id": "employee_id",
|
|
"소속회사": "company",
|
|
"co": "company",
|
|
"company": "company",
|
|
"직급": "rank",
|
|
"rank": "rank",
|
|
"직책": "role",
|
|
"pos": "role",
|
|
"role": "role",
|
|
"부서": "department",
|
|
"part": "department",
|
|
"department": "department",
|
|
"그룹": "grp",
|
|
"gr": "grp",
|
|
"grp": "grp",
|
|
"디비전": "division",
|
|
"div": "division",
|
|
"division": "division",
|
|
"팀": "team",
|
|
"team": "team",
|
|
"teal": "team",
|
|
"셀": "cell",
|
|
"cell": "cell",
|
|
"근무상태": "work_status",
|
|
"work_status": "work_status",
|
|
"근무시간": "work_time",
|
|
"work_time": "work_time",
|
|
"전화번호": "phone",
|
|
"ph": "phone",
|
|
"phone": "phone",
|
|
"이메일": "email",
|
|
"mail": "email",
|
|
"email": "email",
|
|
"자리위치": "seat_label",
|
|
"seat_label": "seat_label",
|
|
"사진": "photo_url",
|
|
"photo_url": "photo_url",
|
|
}
|
|
|
|
|
|
def normalize_phone(value: object) -> str:
|
|
raw = str(value or "").strip()
|
|
digits = "".join(ch for ch in raw if ch.isdigit())
|
|
if not digits:
|
|
return ""
|
|
if len(digits) == 10 and not digits.startswith("0"):
|
|
digits = f"0{digits}"
|
|
if len(digits) == 11 and digits.startswith("0"):
|
|
return f"{digits[:3]}-{digits[3:7]}-{digits[7:]}"
|
|
if len(digits) == 10 and digits.startswith("0"):
|
|
return f"{digits[:3]}-{digits[3:6]}-{digits[6:]}"
|
|
return raw
|
|
|
|
|
|
def serialize_member_payload(item: MemberPayload, sort_order: int) -> tuple[object, ...]:
|
|
return (
|
|
item.name.strip(),
|
|
item.employee_id.strip(),
|
|
item.company.strip(),
|
|
item.rank.strip(),
|
|
item.role.strip(),
|
|
item.department.strip(),
|
|
item.grp.strip(),
|
|
item.division.strip(),
|
|
item.team.strip(),
|
|
item.cell.strip(),
|
|
item.work_status.strip(),
|
|
item.work_time.strip(),
|
|
normalize_phone(item.phone),
|
|
item.email.strip(),
|
|
item.seat_label.strip(),
|
|
item.photo_url.strip(),
|
|
sort_order,
|
|
)
|
|
|
|
|
|
def fetch_members() -> list[dict[str, object]]:
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
SELECT id, name, employee_id, company, rank, role, department, grp, division, team, cell,
|
|
work_status, work_time, phone, email, seat_label, photo_url,
|
|
sort_order, created_at, updated_at
|
|
FROM members
|
|
ORDER BY sort_order ASC, id ASC
|
|
"""
|
|
)
|
|
return cur.fetchall()
|
|
|
|
|
|
def serialize_seat_map_payload(payload: SeatMapPayload) -> tuple[object, ...]:
|
|
return (
|
|
payload.name.strip(),
|
|
payload.source_type.strip() or "image",
|
|
payload.source_url.strip(),
|
|
payload.preview_svg,
|
|
payload.view_box_min_x,
|
|
payload.view_box_min_y,
|
|
payload.view_box_width,
|
|
payload.view_box_height,
|
|
payload.image_url.strip(),
|
|
payload.image_width,
|
|
payload.image_height,
|
|
payload.grid_rows,
|
|
payload.grid_cols,
|
|
payload.cell_gap,
|
|
payload.is_active,
|
|
)
|
|
|
|
|
|
def fetch_seat_map(seat_map_id: int) -> dict[str, object] | None:
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
SELECT id, name, source_type, source_url, preview_svg,
|
|
view_box_min_x, view_box_min_y, view_box_width, view_box_height,
|
|
image_url, image_width, image_height, grid_rows, grid_cols,
|
|
cell_gap, is_active, created_at, updated_at
|
|
FROM seat_maps
|
|
WHERE id = %s
|
|
""",
|
|
(seat_map_id,),
|
|
)
|
|
return cur.fetchone()
|
|
|
|
|
|
def fetch_active_seat_map() -> dict[str, object] | None:
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
SELECT id, name, source_type, source_url, preview_svg,
|
|
view_box_min_x, view_box_min_y, view_box_width, view_box_height,
|
|
image_url, image_width, image_height, grid_rows, grid_cols,
|
|
cell_gap, is_active, created_at, updated_at
|
|
FROM seat_maps
|
|
WHERE is_active = TRUE
|
|
ORDER BY updated_at DESC, id DESC
|
|
LIMIT 1
|
|
"""
|
|
)
|
|
return cur.fetchone()
|
|
|
|
|
|
def ensure_fixed_office_seat_map(activate: bool = True) -> dict[str, object]:
|
|
template = parse_fixed_office_template()
|
|
slots = template["slots"]
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
SELECT id
|
|
FROM seat_maps
|
|
WHERE source_type = 'fixed_html'
|
|
AND source_url = %s
|
|
LIMIT 1
|
|
""",
|
|
(FIXED_OFFICE_SOURCE_KEY,),
|
|
)
|
|
row = cur.fetchone()
|
|
if activate:
|
|
cur.execute("UPDATE seat_maps SET is_active = FALSE, updated_at = NOW() WHERE is_active = TRUE")
|
|
if row is None:
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO seat_maps (
|
|
name, image_url, source_type, source_url, preview_svg,
|
|
view_box_min_x, view_box_min_y, view_box_width, view_box_height,
|
|
image_width, image_height, grid_rows, grid_cols, cell_gap, is_active
|
|
)
|
|
VALUES (%s, '', 'fixed_html', %s, '', NULL, NULL, NULL, NULL, NULL, NULL, 1, 1, 0, %s)
|
|
RETURNING id
|
|
""",
|
|
(FIXED_OFFICE_NAME, FIXED_OFFICE_SOURCE_KEY, activate),
|
|
)
|
|
seat_map_id = int(cur.fetchone()["id"])
|
|
else:
|
|
seat_map_id = int(row["id"])
|
|
cur.execute(
|
|
"""
|
|
UPDATE seat_maps
|
|
SET name = %s,
|
|
source_type = 'fixed_html',
|
|
source_url = %s,
|
|
image_url = '',
|
|
preview_svg = '',
|
|
grid_rows = 1,
|
|
grid_cols = 1,
|
|
cell_gap = 0,
|
|
is_active = %s,
|
|
updated_at = NOW()
|
|
WHERE id = %s
|
|
""",
|
|
(FIXED_OFFICE_NAME, FIXED_OFFICE_SOURCE_KEY, activate, seat_map_id),
|
|
)
|
|
|
|
cur.execute("SELECT id, slot_key FROM seat_slots WHERE seat_map_id = %s", (seat_map_id,))
|
|
existing_slots = {str(item["slot_key"]): int(item["id"]) for item in cur.fetchall()}
|
|
incoming_keys = {str(slot["slot_key"]) for slot in slots}
|
|
|
|
for slot in slots:
|
|
slot_key = str(slot["slot_key"])
|
|
if slot_key in existing_slots:
|
|
cur.execute(
|
|
"""
|
|
UPDATE seat_slots
|
|
SET label = %s, x = %s, y = %s, rotation = %s, layer_name = %s, updated_at = NOW()
|
|
WHERE seat_map_id = %s AND slot_key = %s
|
|
""",
|
|
(
|
|
slot["label"],
|
|
slot["x"],
|
|
slot["y"],
|
|
slot["rotation"],
|
|
slot["layer_name"],
|
|
seat_map_id,
|
|
slot_key,
|
|
),
|
|
)
|
|
else:
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO seat_slots (seat_map_id, slot_key, label, x, y, rotation, layer_name)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s)
|
|
""",
|
|
(
|
|
seat_map_id,
|
|
slot_key,
|
|
slot["label"],
|
|
slot["x"],
|
|
slot["y"],
|
|
slot["rotation"],
|
|
slot["layer_name"],
|
|
),
|
|
)
|
|
|
|
if existing_slots:
|
|
stale_keys = [key for key in existing_slots if key not in incoming_keys]
|
|
if stale_keys:
|
|
cur.execute(
|
|
"DELETE FROM seat_slots WHERE seat_map_id = %s AND slot_key = ANY(%s)",
|
|
(seat_map_id, stale_keys),
|
|
)
|
|
conn.commit()
|
|
seat_map = fetch_seat_map(seat_map_id)
|
|
if seat_map is None:
|
|
raise HTTPException(status_code=500, detail="Fixed office seat map initialization failed.")
|
|
return seat_map
|
|
|
|
|
|
def compute_seat_label(row_index: int, col_index: int) -> str:
|
|
quotient = row_index
|
|
row_label = ""
|
|
while True:
|
|
quotient, remainder = divmod(quotient, 26)
|
|
row_label = chr(65 + remainder) + row_label
|
|
if quotient == 0:
|
|
break
|
|
quotient -= 1
|
|
return f"{row_label}-{col_index + 1:02d}"
|
|
|
|
|
|
def compute_slot_label(index: int) -> str:
|
|
return f"CHAIR-{index + 1:03d}"
|
|
|
|
|
|
def decode_segment_values(raw_base64: str) -> list[int]:
|
|
decoded = base64.b64decode(raw_base64.encode("ascii"))
|
|
if not decoded:
|
|
return []
|
|
return [item[0] for item in struct.iter_unpack("<i", decoded)]
|
|
|
|
|
|
def parse_fixed_office_template() -> dict[str, object]:
|
|
global _fixed_office_cache
|
|
if _fixed_office_cache is not None:
|
|
return _fixed_office_cache
|
|
if not FIXED_OFFICE_TEMPLATE_PATH.exists():
|
|
raise HTTPException(status_code=500, detail="Fixed office viewer template not found.")
|
|
|
|
html = FIXED_OFFICE_TEMPLATE_PATH.read_text(encoding="utf-8")
|
|
match = re.search(r"const DATA = (\{.*?\});\n\s*function decodeSegments", html, flags=re.S)
|
|
if not match:
|
|
raise HTTPException(status_code=500, detail="Fixed office viewer data not found.")
|
|
data = json.loads(match.group(1))
|
|
chair_values = decode_segment_values(str(data["chairSegsB64"]))
|
|
slots: list[dict[str, object]] = []
|
|
for index, chair in enumerate(data["chairs"]):
|
|
slot_key, name, _kind, start, count = chair
|
|
min_x = math.inf
|
|
min_y = math.inf
|
|
max_x = -math.inf
|
|
max_y = -math.inf
|
|
start_index = int(start)
|
|
end_index = start_index + int(count)
|
|
for item_index in range(start_index, end_index):
|
|
offset = item_index * 4
|
|
x1 = chair_values[offset] / 10
|
|
y1 = chair_values[offset + 1] / 10
|
|
x2 = chair_values[offset + 2] / 10
|
|
y2 = chair_values[offset + 3] / 10
|
|
min_x = min(min_x, x1, x2)
|
|
min_y = min(min_y, y1, y2)
|
|
max_x = max(max_x, x1, x2)
|
|
max_y = max(max_y, y1, y2)
|
|
slots.append(
|
|
{
|
|
"slot_key": str(slot_key),
|
|
"label": str(slot_key),
|
|
"x": round((min_x + max_x) / 2, 3),
|
|
"y": round((min_y + max_y) / 2, 3),
|
|
"rotation": 0.0,
|
|
"layer_name": str(name),
|
|
}
|
|
)
|
|
_fixed_office_cache = {
|
|
"html": html,
|
|
"data": data,
|
|
"slots": slots,
|
|
}
|
|
return _fixed_office_cache
|
|
|
|
|
|
def is_chair_layer(layer_name: str) -> bool:
|
|
raw = layer_name.strip().lower()
|
|
compact = raw.replace("-", "").replace("_", "").replace(" ", "")
|
|
return raw in {"chair", "_chair", "-chair"} or compact.endswith("chair")
|
|
|
|
|
|
def inspect_dxf_header(file_path: Path) -> tuple[str, str]:
|
|
with file_path.open("rb") as source:
|
|
header = source.read(128)
|
|
header_text = header.decode("latin-1", errors="ignore").replace("\x00", "")
|
|
preview = header[:32].hex(" ")
|
|
|
|
if header_text.startswith("AutoCAD Binary DXF"):
|
|
return ("binary_dxf", preview)
|
|
if header_text.startswith("0\nSECTION") or header_text.startswith("0\r\nSECTION"):
|
|
return ("ascii_dxf", preview)
|
|
if header.startswith(b"AC10"):
|
|
return ("dwg_or_dwg_like", preview)
|
|
return ("unknown", preview)
|
|
|
|
|
|
def iter_render_entities(entity: ezdxf.entities.DXFGraphic, inherited_layer: str | None = None, depth: int = 0) -> list[ezdxf.entities.DXFGraphic]:
|
|
if depth > 6:
|
|
return []
|
|
entity_type = entity.dxftype()
|
|
current_layer = inherited_layer or entity.dxf.layer
|
|
if entity_type == "INSERT":
|
|
expanded: list[ezdxf.entities.DXFGraphic] = []
|
|
try:
|
|
for child in entity.virtual_entities():
|
|
child_layer = child.dxf.layer
|
|
if child_layer == "0":
|
|
child.dxf.layer = current_layer
|
|
expanded.extend(iter_render_entities(child, inherited_layer=current_layer, depth=depth + 1))
|
|
except Exception:
|
|
return []
|
|
return expanded
|
|
if inherited_layer and entity.dxf.layer == "0":
|
|
entity.dxf.layer = inherited_layer
|
|
return [entity]
|
|
|
|
|
|
def get_entity_points(entity: ezdxf.entities.DXFGraphic) -> list[tuple[float, float]]:
|
|
entity_type = entity.dxftype()
|
|
if entity_type == "LINE":
|
|
return [
|
|
(float(entity.dxf.start.x), float(entity.dxf.start.y)),
|
|
(float(entity.dxf.end.x), float(entity.dxf.end.y)),
|
|
]
|
|
if entity_type == "LWPOLYLINE":
|
|
return [(float(point[0]), float(point[1])) for point in entity.get_points("xy")]
|
|
if entity_type == "POLYLINE":
|
|
return [(float(vertex.dxf.location.x), float(vertex.dxf.location.y)) for vertex in entity.vertices]
|
|
if entity_type == "CIRCLE":
|
|
center = entity.dxf.center
|
|
radius = float(entity.dxf.radius)
|
|
return [
|
|
(float(center.x - radius), float(center.y - radius)),
|
|
(float(center.x + radius), float(center.y + radius)),
|
|
]
|
|
if entity_type == "ARC":
|
|
center = entity.dxf.center
|
|
radius = float(entity.dxf.radius)
|
|
return [
|
|
(float(center.x - radius), float(center.y - radius)),
|
|
(float(center.x + radius), float(center.y + radius)),
|
|
]
|
|
if entity_type == "POINT":
|
|
location = entity.dxf.location
|
|
return [(float(location.x), float(location.y))]
|
|
if entity_type == "SPLINE":
|
|
try:
|
|
return [(float(point[0]), float(point[1])) for point in entity.flattening(2)]
|
|
except Exception:
|
|
return []
|
|
if entity_type == "ELLIPSE":
|
|
try:
|
|
return [(float(point[0]), float(point[1])) for point in entity.flattening(2)]
|
|
except Exception:
|
|
center = entity.dxf.center
|
|
major_axis = entity.dxf.major_axis
|
|
ratio = float(entity.dxf.ratio)
|
|
radius_x = math.hypot(float(major_axis.x), float(major_axis.y))
|
|
radius_y = radius_x * ratio
|
|
return [
|
|
(float(center.x - radius_x), float(center.y - radius_y)),
|
|
(float(center.x + radius_x), float(center.y + radius_y)),
|
|
]
|
|
if entity_type == "INSERT":
|
|
insert = entity.dxf.insert
|
|
return [(float(insert.x), float(insert.y))]
|
|
return []
|
|
|
|
|
|
def get_entity_center(entity: ezdxf.entities.DXFGraphic) -> tuple[float, float] | None:
|
|
points = get_entity_points(entity)
|
|
if not points:
|
|
return None
|
|
min_x = min(point[0] for point in points)
|
|
max_x = max(point[0] for point in points)
|
|
min_y = min(point[1] for point in points)
|
|
max_y = max(point[1] for point in points)
|
|
return ((min_x + max_x) / 2.0, (min_y + max_y) / 2.0)
|
|
|
|
|
|
def get_entity_bounds(entity: ezdxf.entities.DXFGraphic) -> tuple[float, float, float, float] | None:
|
|
points = get_entity_points(entity)
|
|
if not points:
|
|
return None
|
|
min_x = min(point[0] for point in points)
|
|
max_x = max(point[0] for point in points)
|
|
min_y = min(point[1] for point in points)
|
|
max_y = max(point[1] for point in points)
|
|
return (min_x, min_y, max_x, max_y)
|
|
|
|
|
|
def compute_bounds_from_points(points: list[tuple[float, float]]) -> tuple[float, float, float, float]:
|
|
min_x = min(point[0] for point in points)
|
|
max_x = max(point[0] for point in points)
|
|
min_y = min(point[1] for point in points)
|
|
max_y = max(point[1] for point in points)
|
|
return (min_x, min_y, max(max_x - min_x, 1.0), max(max_y - min_y, 1.0))
|
|
|
|
|
|
def percentile(values: list[float], ratio: float) -> float:
|
|
if not values:
|
|
return 0.0
|
|
ordered = sorted(values)
|
|
index = max(0, min(len(ordered) - 1, round((len(ordered) - 1) * ratio)))
|
|
return float(ordered[index])
|
|
|
|
|
|
def compute_focus_bounds(slot_points: list[tuple[float, float]]) -> tuple[float, float, float, float]:
|
|
x_values = [point[0] for point in slot_points]
|
|
y_values = [point[1] for point in slot_points]
|
|
min_x = percentile(x_values, 0.02)
|
|
max_x = percentile(x_values, 0.98)
|
|
min_y = percentile(y_values, 0.02)
|
|
max_y = percentile(y_values, 0.98)
|
|
width = max(max_x - min_x, 1.0)
|
|
height = max(max_y - min_y, 1.0)
|
|
pad_x = max(width * 0.08, 500.0)
|
|
pad_y = max(height * 0.08, 500.0)
|
|
return (min_x - pad_x, min_y - pad_y, max_x + pad_x, max_y + pad_y)
|
|
|
|
|
|
def get_entity_max_span(entity: ezdxf.entities.DXFGraphic) -> float:
|
|
bounds = get_entity_bounds(entity)
|
|
if bounds is None:
|
|
return 0.0
|
|
min_x, min_y, max_x, max_y = bounds
|
|
return max(max_x - min_x, max_y - min_y)
|
|
|
|
|
|
def compute_outline_bounds(entities: list[ezdxf.entities.DXFGraphic]) -> tuple[float, float, float, float] | None:
|
|
outline_layers = {"0", "0-COL", "WID", "XH", "CO-DOOR", "CO-DO-FR", "문", "회의실"}
|
|
outline_points: list[tuple[float, float]] = []
|
|
for entity in entities:
|
|
if is_chair_layer(entity.dxf.layer):
|
|
continue
|
|
if entity.dxf.layer not in outline_layers:
|
|
continue
|
|
if get_entity_max_span(entity) < 3000:
|
|
continue
|
|
outline_points.extend(get_entity_points(entity))
|
|
if not outline_points:
|
|
return None
|
|
min_x, min_y, width, height = compute_bounds_from_points(outline_points)
|
|
pad_x = max(width * 0.025, 300.0)
|
|
pad_y = max(height * 0.025, 300.0)
|
|
return (min_x - pad_x, min_y - pad_y, min_x + width + pad_x, min_y + height + pad_y)
|
|
|
|
|
|
def bounds_intersect(bounds: tuple[float, float, float, float], focus_bounds: tuple[float, float, float, float]) -> bool:
|
|
min_x, min_y, max_x, max_y = bounds
|
|
focus_min_x, focus_min_y, focus_max_x, focus_max_y = focus_bounds
|
|
return not (
|
|
max_x < focus_min_x
|
|
or min_x > focus_max_x
|
|
or max_y < focus_min_y
|
|
or min_y > focus_max_y
|
|
)
|
|
|
|
|
|
def line_svg(points: list[tuple[float, float]], css_class: str = "seatmap-dxf-entity") -> str:
|
|
if len(points) < 2:
|
|
return ""
|
|
coordinates = " ".join(f"{x:.2f},{-y:.2f}" for x, y in points)
|
|
return (
|
|
f'<polyline class="{css_class}" points="{coordinates}" fill="none" '
|
|
'stroke-linejoin="round" stroke-linecap="round" />'
|
|
)
|
|
|
|
|
|
def circle_svg(
|
|
center_x: float,
|
|
center_y: float,
|
|
radius: float,
|
|
stroke: str = "#475569",
|
|
fill: str = "none",
|
|
css_class: str = "seatmap-dxf-entity",
|
|
) -> str:
|
|
return (
|
|
f'<circle class="{css_class}" cx="{center_x:.2f}" cy="{-center_y:.2f}" r="{radius:.2f}" '
|
|
f'stroke="{stroke}" fill="{fill}" />'
|
|
)
|
|
|
|
|
|
def build_dxf_preview_svg(
|
|
entities: list[ezdxf.entities.DXFGraphic],
|
|
bounds: tuple[float, float, float, float],
|
|
) -> str:
|
|
min_x, min_y, width, height = bounds
|
|
max_y = min_y + height
|
|
svg_parts: list[str] = []
|
|
|
|
for entity in entities:
|
|
layer_name = entity.dxf.layer
|
|
is_chair = is_chair_layer(layer_name)
|
|
css_class = "seatmap-dxf-chair-entity" if is_chair else "seatmap-dxf-entity"
|
|
entity_type = entity.dxftype()
|
|
if entity_type in {"LINE", "LWPOLYLINE", "POLYLINE", "SPLINE", "ELLIPSE"}:
|
|
svg = line_svg(get_entity_points(entity), css_class=css_class)
|
|
if svg:
|
|
svg_parts.append(svg)
|
|
elif entity_type == "CIRCLE":
|
|
center = entity.dxf.center
|
|
svg_parts.append(
|
|
circle_svg(
|
|
float(center.x),
|
|
float(center.y),
|
|
float(entity.dxf.radius),
|
|
fill="none",
|
|
css_class=css_class,
|
|
)
|
|
)
|
|
elif entity_type == "ARC":
|
|
center = entity.dxf.center
|
|
radius = float(entity.dxf.radius)
|
|
start_angle = math.radians(float(entity.dxf.start_angle))
|
|
end_angle = math.radians(float(entity.dxf.end_angle))
|
|
start_x = float(center.x) + radius * math.cos(start_angle)
|
|
start_y = float(center.y) + radius * math.sin(start_angle)
|
|
end_x = float(center.x) + radius * math.cos(end_angle)
|
|
end_y = float(center.y) + radius * math.sin(end_angle)
|
|
large_arc = 1 if abs(float(entity.dxf.end_angle) - float(entity.dxf.start_angle)) > 180 else 0
|
|
svg_parts.append(
|
|
f'<path class="{css_class}" d="M {start_x:.2f} {-start_y:.2f} '
|
|
f'A {radius:.2f} {radius:.2f} 0 {large_arc} 0 {end_x:.2f} {-end_y:.2f}" fill="none" />'
|
|
)
|
|
|
|
view_box = f"{min_x:.2f} {-max_y:.2f} {max(width, 1.0):.2f} {max(height, 1.0):.2f}"
|
|
return (
|
|
f'<svg class="seatmap-preview-svg" viewBox="{view_box}" xmlns="http://www.w3.org/2000/svg" preserveAspectRatio="xMidYMid meet">'
|
|
'<rect width="100%" height="100%" fill="#ffffff" />'
|
|
+ "".join(svg_parts)
|
|
+ "</svg>"
|
|
)
|
|
|
|
|
|
def load_dxf_document(file_path: Path) -> ezdxf.document.Drawing:
|
|
try:
|
|
return ezdxf.readfile(file_path)
|
|
except OSError:
|
|
try:
|
|
document, _ = recover.readfile(file_path)
|
|
return document
|
|
except Exception as exc:
|
|
kind, preview = inspect_dxf_header(file_path)
|
|
if kind == "binary_dxf":
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Binary DXF로 보이지만 해석에 실패했습니다. 가능하면 ASCII DXF로 다시 저장해 업로드하세요. 헤더={preview}",
|
|
) from exc
|
|
if kind == "dwg_or_dwg_like":
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"업로드한 파일은 DWG 계열 헤더(AC10xx)로 보입니다. DWG가 아니라 ASCII DXF로 다시 저장해 업로드하세요. 헤더={preview}",
|
|
) from exc
|
|
if kind == "ascii_dxf":
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"ASCII DXF로 보이지만 구조를 해석하지 못했습니다. 도면을 다른 DXF 버전으로 다시 저장해보세요. 헤더={preview}",
|
|
) from exc
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"업로드한 파일 형식을 판별하지 못했습니다. 확장자만 dxf인 파일일 수 있습니다. 헤더={preview}",
|
|
) from exc
|
|
|
|
|
|
def entity_to_segments(entity: ezdxf.entities.DXFGraphic, arc_steps: int = 24) -> list[tuple[float, float, float, float]]:
|
|
entity_type = entity.dxftype()
|
|
points = get_entity_points(entity)
|
|
if entity_type in {"LINE", "LWPOLYLINE", "POLYLINE", "SPLINE", "ELLIPSE"} and len(points) >= 2:
|
|
return [
|
|
(float(left[0]), float(left[1]), float(right[0]), float(right[1]))
|
|
for left, right in zip(points[:-1], points[1:])
|
|
]
|
|
|
|
if entity_type == "CIRCLE":
|
|
center = entity.dxf.center
|
|
radius = float(entity.dxf.radius)
|
|
samples = []
|
|
for index in range(arc_steps + 1):
|
|
angle = (math.tau * index) / arc_steps
|
|
samples.append(
|
|
(
|
|
float(center.x) + radius * math.cos(angle),
|
|
float(center.y) + radius * math.sin(angle),
|
|
)
|
|
)
|
|
return [
|
|
(float(left[0]), float(left[1]), float(right[0]), float(right[1]))
|
|
for left, right in zip(samples[:-1], samples[1:])
|
|
]
|
|
|
|
if entity_type == "ARC":
|
|
center = entity.dxf.center
|
|
radius = float(entity.dxf.radius)
|
|
start_angle = math.radians(float(entity.dxf.start_angle))
|
|
end_angle = math.radians(float(entity.dxf.end_angle))
|
|
if end_angle <= start_angle:
|
|
end_angle += math.tau
|
|
samples = []
|
|
for index in range(arc_steps + 1):
|
|
ratio = index / arc_steps
|
|
angle = start_angle + (end_angle - start_angle) * ratio
|
|
samples.append(
|
|
(
|
|
float(center.x) + radius * math.cos(angle),
|
|
float(center.y) + radius * math.sin(angle),
|
|
)
|
|
)
|
|
return [
|
|
(float(left[0]), float(left[1]), float(right[0]), float(right[1]))
|
|
for left, right in zip(samples[:-1], samples[1:])
|
|
]
|
|
|
|
return []
|
|
|
|
|
|
def build_dxf_artifacts(file_path: Path) -> tuple[dict[str, object], list[dict[str, object]], dict[str, object]]:
|
|
document = load_dxf_document(file_path)
|
|
modelspace = document.modelspace()
|
|
base_entities = [entity for entity in modelspace if entity.dxftype() in {"LINE", "LWPOLYLINE", "POLYLINE", "CIRCLE", "ARC", "INSERT", "SPLINE", "ELLIPSE"}]
|
|
all_entities: list[ezdxf.entities.DXFGraphic] = []
|
|
for entity in base_entities:
|
|
all_entities.extend(iter_render_entities(entity))
|
|
chair_entities: list[ezdxf.entities.DXFGraphic] = []
|
|
chair_points: list[tuple[float, float]] = []
|
|
for entity in all_entities:
|
|
if is_chair_layer(entity.dxf.layer):
|
|
chair_entities.append(entity)
|
|
chair_points.extend(get_entity_points(entity))
|
|
|
|
if not chair_entities:
|
|
raise HTTPException(status_code=400, detail="DXF 파일에서 chair 계열 레이어를 찾지 못했습니다.")
|
|
|
|
if not chair_points:
|
|
raise HTTPException(status_code=400, detail="DXF 좌표를 해석하지 못했습니다.")
|
|
|
|
slots: list[dict[str, object]] = []
|
|
for index, entity in enumerate(sorted(chair_entities, key=lambda item: (-(get_entity_center(item) or (0.0, 0.0))[1], (get_entity_center(item) or (0.0, 0.0))[0]))):
|
|
center = get_entity_center(entity)
|
|
if center is None:
|
|
continue
|
|
slots.append(
|
|
{
|
|
"slot_key": entity.dxf.handle,
|
|
"label": compute_slot_label(index),
|
|
"x": round(float(center[0]), 3),
|
|
"y": round(float(center[1]), 3),
|
|
"rotation": float(getattr(entity.dxf, "rotation", 0.0) or 0.0),
|
|
"layer_name": entity.dxf.layer,
|
|
}
|
|
)
|
|
|
|
if not slots:
|
|
raise HTTPException(status_code=400, detail="chair 레이어에서 좌석 위치를 추출하지 못했습니다.")
|
|
|
|
slot_points = [(float(slot["x"]), float(slot["y"])) for slot in slots]
|
|
focus_bounds = compute_outline_bounds(all_entities) or compute_focus_bounds(slot_points)
|
|
visible_entities: list[ezdxf.entities.DXFGraphic] = []
|
|
visible_points: list[tuple[float, float]] = []
|
|
for entity in all_entities:
|
|
entity_bounds = get_entity_bounds(entity)
|
|
if entity_bounds is None:
|
|
continue
|
|
if bounds_intersect(entity_bounds, focus_bounds):
|
|
visible_entities.append(entity)
|
|
visible_points.extend(get_entity_points(entity))
|
|
|
|
if not visible_entities or not visible_points:
|
|
visible_entities = all_entities
|
|
visible_points = chair_points
|
|
|
|
focus_min_x, focus_min_y, focus_max_x, focus_max_y = focus_bounds
|
|
min_x = focus_min_x
|
|
min_y = focus_min_y
|
|
width = max(focus_max_x - focus_min_x, 1.0)
|
|
height = max(focus_max_y - focus_min_y, 1.0)
|
|
|
|
preview_svg = build_dxf_preview_svg(visible_entities, (min_x, min_y, width, height))
|
|
metadata = {
|
|
"source_type": "dxf",
|
|
"view_box_min_x": round(min_x, 3),
|
|
"view_box_min_y": round(min_y, 3),
|
|
"view_box_width": round(width, 3),
|
|
"view_box_height": round(height, 3),
|
|
"preview_svg": preview_svg,
|
|
"grid_rows": 1,
|
|
"grid_cols": 1,
|
|
"image_width": None,
|
|
"image_height": None,
|
|
"cell_gap": 0,
|
|
}
|
|
|
|
slot_map = {str(slot["slot_key"]): slot for slot in slots}
|
|
chair_segments: list[list[float]] = []
|
|
chair_items: list[dict[str, object]] = []
|
|
background_segments: list[list[float]] = []
|
|
|
|
for entity in visible_entities:
|
|
segments = entity_to_segments(entity)
|
|
if not segments:
|
|
continue
|
|
|
|
if is_chair_layer(entity.dxf.layer):
|
|
slot = slot_map.get(str(entity.dxf.handle))
|
|
if not slot:
|
|
continue
|
|
start_index = len(chair_segments)
|
|
min_seg_x = math.inf
|
|
min_seg_y = math.inf
|
|
max_seg_x = -math.inf
|
|
max_seg_y = -math.inf
|
|
for x1, y1, x2, y2 in segments:
|
|
chair_segments.append([round(x1, 3), round(y1, 3), round(x2, 3), round(y2, 3)])
|
|
min_seg_x = min(min_seg_x, x1, x2)
|
|
min_seg_y = min(min_seg_y, y1, y2)
|
|
max_seg_x = max(max_seg_x, x1, x2)
|
|
max_seg_y = max(max_seg_y, y1, y2)
|
|
chair_items.append(
|
|
{
|
|
"key": str(slot["slot_key"]),
|
|
"label": slot["label"],
|
|
"kind": "chair",
|
|
"start": start_index,
|
|
"count": len(segments),
|
|
"min_x": round(min_seg_x, 3),
|
|
"min_y": round(min_seg_y, 3),
|
|
"max_x": round(max_seg_x, 3),
|
|
"max_y": round(max_seg_y, 3),
|
|
}
|
|
)
|
|
continue
|
|
|
|
for x1, y1, x2, y2 in segments:
|
|
background_segments.append([round(x1, 3), round(y1, 3), round(x2, 3), round(y2, 3)])
|
|
|
|
viewer_data = {
|
|
"meta": {
|
|
"background_segment_count": len(background_segments),
|
|
"chair_count": len(chair_items),
|
|
"chair_segment_count": len(chair_segments),
|
|
"world": {
|
|
"min_x": round(min_x, 3),
|
|
"min_y": round(min_y, 3),
|
|
"max_x": round(min_x + width, 3),
|
|
"max_y": round(min_y + height, 3),
|
|
"width": round(width, 3),
|
|
"height": round(height, 3),
|
|
},
|
|
},
|
|
"background_segments": background_segments,
|
|
"chair_segments": chair_segments,
|
|
"chairs": chair_items,
|
|
}
|
|
return metadata, slots, viewer_data
|
|
|
|
|
|
def parse_dxf_layout(file_path: Path) -> tuple[dict[str, object], list[dict[str, object]]]:
|
|
metadata, slots, _viewer_data = build_dxf_artifacts(file_path)
|
|
return metadata, slots
|
|
|
|
|
|
def fetch_seat_layout(seat_map_id: int) -> dict[str, object]:
|
|
seat_map = fetch_seat_map(seat_map_id)
|
|
if seat_map is None:
|
|
raise HTTPException(status_code=404, detail="Seat map not found.")
|
|
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
SELECT m.id, m.name, m.company, m.rank, m.role, m.department, m.grp, m.division,
|
|
m.team, m.cell, m.work_status, m.work_time, m.phone, m.email,
|
|
m.seat_label AS member_seat_label, m.photo_url, m.sort_order
|
|
FROM members m
|
|
ORDER BY m.sort_order ASC, m.id ASC
|
|
"""
|
|
)
|
|
members = cur.fetchall()
|
|
cur.execute(
|
|
"""
|
|
SELECT id, slot_key, label, x, y, rotation, layer_name
|
|
FROM seat_slots
|
|
WHERE seat_map_id = %s
|
|
ORDER BY label ASC, id ASC
|
|
""",
|
|
(seat_map_id,),
|
|
)
|
|
slots = cur.fetchall()
|
|
cur.execute(
|
|
"""
|
|
SELECT sp.member_id, sp.row_index, sp.col_index, sp.seat_label,
|
|
sp.seat_slot_id,
|
|
m.name, m.company, m.rank, m.role, m.department, m.grp, m.division,
|
|
m.team, m.cell, m.work_status, m.work_time, m.phone, m.email,
|
|
m.photo_url, m.sort_order
|
|
FROM seat_positions sp
|
|
JOIN members m ON m.id = sp.member_id
|
|
WHERE sp.seat_map_id = %s
|
|
ORDER BY sp.row_index ASC, sp.col_index ASC, m.sort_order ASC, m.id ASC
|
|
""",
|
|
(seat_map_id,),
|
|
)
|
|
placements = cur.fetchall()
|
|
viewer_data: dict[str, object] | None = None
|
|
if seat_map["source_type"] == "fixed_html" and seat_map.get("source_url") == FIXED_OFFICE_SOURCE_KEY:
|
|
template = parse_fixed_office_template()
|
|
viewer_data = {
|
|
"meta": {
|
|
"chair_count": len(template["slots"]),
|
|
"office": FIXED_OFFICE_NAME,
|
|
}
|
|
}
|
|
elif seat_map["source_type"] == "dxf" and seat_map.get("source_url"):
|
|
filename = Path(str(seat_map["source_url"])).name
|
|
source_path = UPLOAD_DIR / filename
|
|
if source_path.exists():
|
|
try:
|
|
_metadata, _slots, viewer_data = build_dxf_artifacts(source_path)
|
|
except Exception:
|
|
viewer_data = None
|
|
return {
|
|
"seat_map": seat_map,
|
|
"members": members,
|
|
"slots": slots,
|
|
"placements": placements,
|
|
"viewer_data": viewer_data,
|
|
}
|
|
|
|
|
|
def build_center_chair_viewer_html(layout: dict[str, object]) -> str:
|
|
slot_key_by_id = {
|
|
int(slot["id"]): str(slot["slot_key"])
|
|
for slot in layout.get("slots", [])
|
|
if slot.get("id") is not None and slot.get("slot_key") is not None
|
|
}
|
|
members_by_id = {
|
|
int(member["id"]): member
|
|
for member in layout.get("members", [])
|
|
if member.get("id") is not None
|
|
}
|
|
placed_keys: list[str] = []
|
|
assignment_items: list[dict[str, object]] = []
|
|
for placement in layout.get("placements", []):
|
|
slot_id = placement.get("seat_slot_id")
|
|
if slot_id is None:
|
|
continue
|
|
slot_key = slot_key_by_id.get(int(slot_id))
|
|
if slot_key:
|
|
placed_keys.append(slot_key)
|
|
member = members_by_id.get(int(placement.get("member_id") or 0))
|
|
if member:
|
|
assignment_items.append(
|
|
{
|
|
"key": slot_key,
|
|
"member_id": int(member["id"]),
|
|
"name": str(member.get("name") or "-"),
|
|
"rank": str(member.get("rank") or "-"),
|
|
}
|
|
)
|
|
|
|
seat_map = layout.get("seat_map") or {}
|
|
placed_literal = json.dumps(sorted(set(placed_keys)), ensure_ascii=False, separators=(",", ":"))
|
|
assignments_literal = json.dumps(assignment_items, ensure_ascii=False, separators=(",", ":"))
|
|
if seat_map.get("source_type") == "fixed_html":
|
|
html = parse_fixed_office_template()["html"]
|
|
else:
|
|
viewer_data = layout.get("viewer_data")
|
|
if not isinstance(viewer_data, dict):
|
|
raise HTTPException(status_code=404, detail="DXF viewer data not found.")
|
|
template_path = Path(__file__).with_name("center_chair_viewer_template.html")
|
|
if not template_path.exists():
|
|
raise HTTPException(status_code=500, detail="Viewer template not found.")
|
|
html = template_path.read_text(encoding="utf-8")
|
|
data_literal = json.dumps(viewer_data, ensure_ascii=False, separators=(",", ":"))
|
|
html = re.sub(
|
|
r"const DATA = .*?;\n\s*function decodeSegments",
|
|
f"const DATA = {data_literal};\n function decodeSegments",
|
|
html,
|
|
count=1,
|
|
flags=re.S,
|
|
)
|
|
html = html.replace(
|
|
'const STORAGE_KEY = "ptc-chair-selection";\n const placed = new Set(JSON.parse(localStorage.getItem(STORAGE_KEY) || "[]"));',
|
|
f"const STORAGE_KEY = null;\n const placed = new Set({placed_literal});",
|
|
1,
|
|
)
|
|
html = html.replace(
|
|
""" ctx.strokeStyle = selected
|
|
? "rgba(220, 38, 38, 0.98)"
|
|
: active
|
|
? "rgba(15, 118, 110, 0.98)"
|
|
: chair.kind === "group"
|
|
? "rgba(16, 134, 149, 0.74)"
|
|
: "rgba(21, 149, 142, 0.8)";
|
|
ctx.lineWidth = (selected ? 2.6 : active ? 2.1 : baseWidth) / camera.scale;""",
|
|
""" ctx.strokeStyle = selected
|
|
? "rgba(220, 38, 38, 0.98)"
|
|
: "rgba(15, 118, 110, 0.88)";
|
|
ctx.lineWidth = (selected ? 2.6 : active ? 2.0 : 1.6) / camera.scale;""",
|
|
1,
|
|
)
|
|
html = html.replace(
|
|
"function persistPlaced() {\n localStorage.setItem(STORAGE_KEY, JSON.stringify([...placed]));\n }",
|
|
"function persistPlaced() {\n return;\n }",
|
|
1,
|
|
)
|
|
html = html.replace(
|
|
""" window.addEventListener("pointerup", (event) => {
|
|
if (dragging && dragStart) {
|
|
const move = Math.hypot(event.clientX - dragStart.x, event.clientY - dragStart.y);
|
|
if (move < 4) {
|
|
const rect = canvas.getBoundingClientRect();
|
|
const picked = pickChair(event.clientX - rect.left, event.clientY - rect.top);
|
|
if (picked) {
|
|
if (placed.has(picked.key)) placed.delete(picked.key);
|
|
else placed.add(picked.key);
|
|
persistPlaced();
|
|
}
|
|
}
|
|
}
|
|
dragging = false;
|
|
dragStart = null;
|
|
canvas.classList.remove("dragging");
|
|
requestDraw();
|
|
});""",
|
|
""" window.addEventListener("pointerup", () => {
|
|
dragging = false;
|
|
dragStart = null;
|
|
canvas.classList.remove("dragging");
|
|
requestDraw();
|
|
});""",
|
|
1,
|
|
)
|
|
html = html.replace(
|
|
""" document.getElementById("clear-btn").addEventListener("click", () => {
|
|
placed.clear();
|
|
persistPlaced();
|
|
requestDraw();
|
|
});""",
|
|
""" document.getElementById("clear-btn").addEventListener("click", () => {
|
|
requestDraw();
|
|
});""",
|
|
1,
|
|
)
|
|
bridge_script = """
|
|
<style>
|
|
#fit-btn { display: none !important; }
|
|
#clear-btn { display: none !important; }
|
|
.seat-popup {
|
|
position: absolute;
|
|
min-width: 190px;
|
|
padding: 12px 14px;
|
|
border-radius: 16px;
|
|
background: rgba(17,24,39,0.96);
|
|
color: white;
|
|
box-shadow: 0 18px 36px rgba(15,23,42,0.22);
|
|
z-index: 4;
|
|
}
|
|
.seat-popup[hidden] { display: none; }
|
|
.seat-popup strong { display: block; margin-bottom: 6px; font-size: 14px; }
|
|
.seat-popup div { font-size: 12px; line-height: 1.45; color: rgba(255,255,255,0.82); }
|
|
.seat-popup button {
|
|
margin-top: 10px;
|
|
width: 100%;
|
|
border: none;
|
|
border-radius: 12px;
|
|
padding: 8px 10px;
|
|
font: inherit;
|
|
font-weight: 700;
|
|
cursor: pointer;
|
|
color: white;
|
|
background: rgba(220, 38, 38, 0.98);
|
|
}
|
|
</style>
|
|
<script>
|
|
const seatAssignments = new Map();
|
|
let selectedChairKey = null;
|
|
let viewerMode = "default";
|
|
const popup = document.createElement("div");
|
|
popup.className = "seat-popup";
|
|
popup.hidden = true;
|
|
document.querySelector(".viewer").appendChild(popup);
|
|
|
|
function getAssignment(key) {
|
|
return seatAssignments.get(String(key)) || null;
|
|
}
|
|
|
|
function hideSeatPopup() {
|
|
popup.hidden = true;
|
|
popup.innerHTML = "";
|
|
}
|
|
|
|
function setViewerMode(mode) {
|
|
viewerMode = mode === "compact" || mode === "readonly" ? mode : "default";
|
|
const head = document.querySelector(".viewer-head");
|
|
const actions = document.querySelector(".viewer-actions");
|
|
if (head) head.style.display = viewerMode === "compact" ? "none" : "";
|
|
if (actions) actions.style.display = viewerMode !== "default" ? "none" : "";
|
|
if (viewerMode !== "default") {
|
|
hideSeatPopup();
|
|
selectedChairKey = null;
|
|
}
|
|
}
|
|
|
|
function showSeatPopup(chairKey, x, y) {
|
|
const assignment = getAssignment(chairKey);
|
|
if (!assignment) {
|
|
hideSeatPopup();
|
|
return;
|
|
}
|
|
popup.innerHTML = `
|
|
<strong>${assignment.name}</strong>
|
|
<div>직급: ${assignment.rank || "-"}</div>
|
|
<div>상태: 배치완료</div>
|
|
${viewerMode === "default" ? `<button type="button" data-seatmap-delete="${chairKey}">자리 비우기</button>` : ""}
|
|
`;
|
|
popup.style.left = `${x + 18}px`;
|
|
popup.style.top = `${y + 18}px`;
|
|
popup.hidden = false;
|
|
}
|
|
|
|
function focusChair(chairKey, padding = 2200) {
|
|
const chair = chairGeometry.find((item) => String(item.key) === String(chairKey));
|
|
if (!chair) return;
|
|
const rect = canvas.getBoundingClientRect();
|
|
const pad = 24;
|
|
const minX = chair.minX - padding;
|
|
const maxX = chair.maxX + padding;
|
|
const minY = chair.minY - padding;
|
|
const maxY = chair.maxY + padding;
|
|
const width = Math.max(1, maxX - minX);
|
|
const height = Math.max(1, maxY - minY);
|
|
camera.scale = Math.max(0.002, Math.min(2, Math.min((rect.width - pad * 2) / width, (rect.height - pad * 2) / height)));
|
|
camera.offsetX = pad - minX * camera.scale + (rect.width - pad * 2 - width * camera.scale) / 2;
|
|
camera.offsetY = pad - (world.maxY - maxY + world.minY) * camera.scale + (rect.height - pad * 2 - height * camera.scale) / 2;
|
|
requestDraw();
|
|
}
|
|
|
|
function setAssignments(items) {
|
|
seatAssignments.clear();
|
|
placed.clear();
|
|
(items || []).forEach((item) => {
|
|
const key = String(item.key || "");
|
|
if (!key) return;
|
|
const assignment = {
|
|
key,
|
|
name: item.name || "-",
|
|
rank: item.rank || "-",
|
|
memberId: Number(item.member_id || 0),
|
|
};
|
|
seatAssignments.set(key, assignment);
|
|
placed.add(key);
|
|
});
|
|
if (selectedChairKey && !seatAssignments.has(selectedChairKey)) {
|
|
selectedChairKey = null;
|
|
hideSeatPopup();
|
|
}
|
|
if (typeof requestDraw === "function") requestDraw();
|
|
}
|
|
|
|
renderTooltip = function renderTooltipOverride() {
|
|
if (!hovered) {
|
|
tooltip.classList.remove("visible");
|
|
hoverChip.textContent = "chair hover: none";
|
|
return;
|
|
}
|
|
const assignment = getAssignment(hovered.key);
|
|
hoverChip.textContent = assignment
|
|
? `chair hover: ${assignment.name}`
|
|
: "chair hover: 공석";
|
|
tooltip.innerHTML = assignment
|
|
? `
|
|
<strong>${assignment.name}</strong>
|
|
<div>직급: ${assignment.rank || "-"}</div>
|
|
<div>상태: 배치완료</div>
|
|
`
|
|
: `
|
|
<strong>공석</strong>
|
|
<div>좌석: ${hovered.key}</div>
|
|
<div>상태: 미배치</div>
|
|
`;
|
|
tooltip.style.left = `${pointer.x + 14}px`;
|
|
tooltip.style.top = `${pointer.y + 14}px`;
|
|
tooltip.classList.add("visible");
|
|
};
|
|
|
|
const originalDraw = draw;
|
|
draw = function drawWithAssignments() {
|
|
originalDraw();
|
|
if (!seatAssignments.size) return;
|
|
const rect = canvas.getBoundingClientRect();
|
|
ctx.setTransform(pixelRatio, 0, 0, pixelRatio, 0, 0);
|
|
ctx.textBaseline = "middle";
|
|
for (const chair of chairGeometry) {
|
|
const assignment = getAssignment(chair.key);
|
|
if (!assignment) continue;
|
|
const center = worldToScreen((chair.minX + chair.maxX) / 2, (chair.minY + chair.maxY) / 2);
|
|
if (center.x < -120 || center.x > rect.width + 120 || center.y < -50 || center.y > rect.height + 50) continue;
|
|
const primary = `${assignment.name}`;
|
|
const secondary = `${assignment.rank || "-"}`;
|
|
ctx.font = "700 12px Pretendard, sans-serif";
|
|
const primaryWidth = ctx.measureText(primary).width;
|
|
ctx.font = "600 10px Pretendard, sans-serif";
|
|
const secondaryWidth = ctx.measureText(secondary).width;
|
|
const boxWidth = Math.max(primaryWidth, secondaryWidth) + 20;
|
|
const boxHeight = 34;
|
|
const boxX = center.x - boxWidth / 2;
|
|
const boxY = center.y - 46;
|
|
ctx.fillStyle = "rgba(255,255,255,0.96)";
|
|
ctx.strokeStyle = "rgba(220,38,38,0.18)";
|
|
ctx.lineWidth = 1;
|
|
ctx.beginPath();
|
|
ctx.roundRect(boxX, boxY, boxWidth, boxHeight, 10);
|
|
ctx.fill();
|
|
ctx.stroke();
|
|
ctx.fillStyle = "#111827";
|
|
ctx.font = "700 12px Pretendard, sans-serif";
|
|
ctx.fillText(primary, boxX + 10, boxY + 12);
|
|
ctx.fillStyle = "#6b7280";
|
|
ctx.font = "600 10px Pretendard, sans-serif";
|
|
ctx.fillText(secondary, boxX + 10, boxY + 25);
|
|
}
|
|
};
|
|
|
|
window.__mhSeatmap = {
|
|
getCanvas() { return document.getElementById("canvas"); },
|
|
pickChairAt(x, y) { return typeof pickChair === "function" ? pickChair(x, y) : null; },
|
|
setPlaced(keys) {
|
|
placed.clear();
|
|
(keys || []).forEach((key) => placed.add(String(key)));
|
|
if (typeof requestDraw === "function") requestDraw();
|
|
},
|
|
setAssignments,
|
|
focusChair,
|
|
setViewerMode,
|
|
};
|
|
|
|
setAssignments(__INITIAL_ASSIGNMENTS__);
|
|
|
|
canvas.addEventListener("click", (event) => {
|
|
if (viewerMode === "compact") return;
|
|
const rect = canvas.getBoundingClientRect();
|
|
const picked = window.__mhSeatmap.pickChairAt(
|
|
event.clientX - rect.left,
|
|
event.clientY - rect.top,
|
|
);
|
|
if (!picked) {
|
|
selectedChairKey = null;
|
|
hideSeatPopup();
|
|
if (typeof requestDraw === "function") requestDraw();
|
|
return;
|
|
}
|
|
selectedChairKey = seatAssignments.has(String(picked.key)) ? String(picked.key) : null;
|
|
if (selectedChairKey) showSeatPopup(selectedChairKey, event.clientX - rect.left, event.clientY - rect.top);
|
|
else hideSeatPopup();
|
|
if (typeof requestDraw === "function") requestDraw();
|
|
});
|
|
|
|
popup.addEventListener("click", (event) => {
|
|
const button = event.target.closest("[data-seatmap-delete]");
|
|
if (!button) return;
|
|
const slotKey = String(button.dataset.seatmapDelete || "");
|
|
if (!slotKey) return;
|
|
selectedChairKey = null;
|
|
hideSeatPopup();
|
|
window.parent.postMessage({ type: "seatmap-clear-slot", key: slotKey }, window.location.origin);
|
|
});
|
|
|
|
canvas.addEventListener("contextmenu", (event) => {
|
|
event.preventDefault();
|
|
});
|
|
|
|
window.addEventListener("message", (event) => {
|
|
const data = event.data;
|
|
if (!data || typeof data !== "object") return;
|
|
if (data.type === "seatmap-set-placed") {
|
|
window.__mhSeatmap.setPlaced(Array.isArray(data.keys) ? data.keys : []);
|
|
}
|
|
if (data.type === "seatmap-set-assignments") {
|
|
window.__mhSeatmap.setAssignments(Array.isArray(data.items) ? data.items : []);
|
|
}
|
|
if (data.type === "seatmap-focus-chair") {
|
|
window.__mhSeatmap.focusChair(String(data.key || ""), Number(data.padding || 2200));
|
|
}
|
|
if (data.type === "seatmap-set-mode") {
|
|
window.__mhSeatmap.setViewerMode(String(data.mode || "default"));
|
|
}
|
|
});
|
|
</script>
|
|
"""
|
|
bridge_script = bridge_script.replace("__INITIAL_ASSIGNMENTS__", assignments_literal, 1)
|
|
html = html.replace("</body>", f"{bridge_script}\n</body>", 1)
|
|
return html
|
|
|
|
|
|
def save_seat_layout(seat_map_id: int, payload: SeatLayoutPayload) -> list[dict[str, object]]:
|
|
seat_map = fetch_seat_map(seat_map_id)
|
|
if seat_map is None:
|
|
raise HTTPException(status_code=404, detail="Seat map not found.")
|
|
|
|
member_ids: list[int] = []
|
|
occupied_cells: set[tuple[int, int]] = set()
|
|
occupied_slots: set[int] = set()
|
|
requires_slot = seat_map["source_type"] in {"dxf", "fixed_html"}
|
|
for item in payload.placements:
|
|
if requires_slot:
|
|
if item.seat_slot_id is None:
|
|
raise HTTPException(status_code=400, detail="고정 도면 자리배치도는 seat_slot_id가 필요합니다.")
|
|
if item.seat_slot_id in occupied_slots:
|
|
raise HTTPException(status_code=400, detail="같은 좌석에 둘 이상의 구성원을 배치할 수 없습니다.")
|
|
occupied_slots.add(item.seat_slot_id)
|
|
else:
|
|
if item.row_index >= int(seat_map["grid_rows"]) or item.col_index >= int(seat_map["grid_cols"]):
|
|
raise HTTPException(status_code=400, detail="좌표가 자리배치도 범위를 벗어났습니다.")
|
|
cell_key = (item.row_index, item.col_index)
|
|
if cell_key in occupied_cells:
|
|
raise HTTPException(status_code=400, detail="같은 칸에 둘 이상의 구성원을 배치할 수 없습니다.")
|
|
occupied_cells.add(cell_key)
|
|
member_ids.append(item.member_id)
|
|
|
|
if len(member_ids) != len(set(member_ids)):
|
|
raise HTTPException(status_code=400, detail="같은 구성원을 중복 배치할 수 없습니다.")
|
|
|
|
if member_ids:
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute("SELECT id FROM members WHERE id = ANY(%s)", (member_ids,))
|
|
existing_ids = {int(row["id"]) for row in cur.fetchall()}
|
|
missing_ids = sorted(set(member_ids) - existing_ids)
|
|
if missing_ids:
|
|
raise HTTPException(status_code=400, detail=f"존재하지 않는 구성원 ID가 포함되어 있습니다: {missing_ids}")
|
|
|
|
if requires_slot:
|
|
slot_ids = sorted(occupied_slots)
|
|
cur.execute("SELECT id FROM seat_slots WHERE seat_map_id = %s AND id = ANY(%s)", (seat_map_id, slot_ids))
|
|
existing_slot_ids = {int(row["id"]) for row in cur.fetchall()}
|
|
missing_slot_ids = sorted(set(slot_ids) - existing_slot_ids)
|
|
if missing_slot_ids:
|
|
raise HTTPException(status_code=400, detail=f"존재하지 않는 좌석 슬롯 ID가 포함되어 있습니다: {missing_slot_ids}")
|
|
cur.execute("SELECT id, label FROM seat_slots WHERE seat_map_id = %s", (seat_map_id,))
|
|
slot_label_map = {int(row["id"]): row["label"] for row in cur.fetchall()}
|
|
else:
|
|
slot_label_map = {}
|
|
|
|
cur.execute("DELETE FROM seat_positions WHERE seat_map_id = %s AND NOT (member_id = ANY(%s))", (seat_map_id, member_ids))
|
|
for item in payload.placements:
|
|
seat_label = item.seat_label.strip() or (
|
|
slot_label_map.get(int(item.seat_slot_id), f"SLOT-{item.seat_slot_id}")
|
|
if requires_slot and item.seat_slot_id is not None
|
|
else compute_seat_label(item.row_index, item.col_index)
|
|
)
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO seat_positions (member_id, seat_map_id, seat_slot_id, row_index, col_index, seat_label, updated_at)
|
|
VALUES (%s, %s, %s, %s, %s, %s, NOW())
|
|
ON CONFLICT (member_id) DO UPDATE
|
|
SET seat_map_id = EXCLUDED.seat_map_id,
|
|
seat_slot_id = EXCLUDED.seat_slot_id,
|
|
row_index = EXCLUDED.row_index,
|
|
col_index = EXCLUDED.col_index,
|
|
seat_label = EXCLUDED.seat_label,
|
|
updated_at = NOW()
|
|
""",
|
|
(
|
|
item.member_id,
|
|
seat_map_id,
|
|
item.seat_slot_id if requires_slot else None,
|
|
item.row_index,
|
|
item.col_index,
|
|
seat_label,
|
|
),
|
|
)
|
|
conn.commit()
|
|
else:
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute("DELETE FROM seat_positions WHERE seat_map_id = %s", (seat_map_id,))
|
|
conn.commit()
|
|
|
|
return fetch_seat_layout(seat_map_id)["placements"]
|
|
|
|
|
|
def get_member_count() -> int:
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute("SELECT COUNT(*) AS count FROM members")
|
|
return int(cur.fetchone()["count"])
|
|
|
|
|
|
def merge_import_member(item: MemberPayload, existing: dict[str, object] | None) -> MemberPayload:
|
|
if existing is None:
|
|
return item
|
|
|
|
payload = item.model_copy(deep=True)
|
|
if not payload.photo_url.strip():
|
|
payload.photo_url = str(existing.get("photo_url") or "")
|
|
if not payload.seat_label.strip():
|
|
payload.seat_label = str(existing.get("seat_label") or "")
|
|
return payload
|
|
|
|
|
|
def pick_existing_member(
|
|
item: MemberPayload,
|
|
existing_by_employee_id: dict[str, list[dict[str, object]]],
|
|
existing_by_name: dict[str, list[dict[str, object]]],
|
|
matched_ids: set[int],
|
|
) -> dict[str, object] | None:
|
|
employee_id = item.employee_id.strip()
|
|
if employee_id:
|
|
for candidate in existing_by_employee_id.get(employee_id, []):
|
|
candidate_id = int(candidate["id"])
|
|
if candidate_id not in matched_ids:
|
|
return candidate
|
|
|
|
name = item.name.strip()
|
|
if name:
|
|
available = [
|
|
candidate
|
|
for candidate in existing_by_name.get(name, [])
|
|
if int(candidate["id"]) not in matched_ids
|
|
]
|
|
if len(available) == 1:
|
|
return available[0]
|
|
|
|
return None
|
|
|
|
|
|
def replace_members(items: list[MemberPayload]) -> list[dict[str, object]]:
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
SELECT id, name, employee_id, company, rank, role, department, grp, division, team, cell,
|
|
work_status, work_time, phone, email, seat_label, photo_url,
|
|
sort_order, created_at, updated_at
|
|
FROM members
|
|
ORDER BY id ASC
|
|
"""
|
|
)
|
|
existing_members = cur.fetchall()
|
|
|
|
existing_by_employee_id: dict[str, list[dict[str, object]]] = {}
|
|
existing_by_name: dict[str, list[dict[str, object]]] = {}
|
|
for member in existing_members:
|
|
employee_id = str(member.get("employee_id") or "").strip()
|
|
name = str(member.get("name") or "").strip()
|
|
if employee_id:
|
|
existing_by_employee_id.setdefault(employee_id, []).append(member)
|
|
if name:
|
|
existing_by_name.setdefault(name, []).append(member)
|
|
|
|
matched_ids: set[int] = set()
|
|
for index, item in enumerate(items):
|
|
existing = pick_existing_member(item, existing_by_employee_id, existing_by_name, matched_ids)
|
|
merged_item = merge_import_member(item, existing)
|
|
if existing is None:
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO members (
|
|
name, employee_id, company, rank, role, department, grp, division, team, cell,
|
|
work_status, work_time, phone, email, seat_label, photo_url, sort_order
|
|
)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
""",
|
|
serialize_member_payload(merged_item, index),
|
|
)
|
|
continue
|
|
|
|
matched_ids.add(int(existing["id"]))
|
|
cur.execute(
|
|
"""
|
|
UPDATE members
|
|
SET name = %s,
|
|
employee_id = %s,
|
|
company = %s,
|
|
rank = %s,
|
|
role = %s,
|
|
department = %s,
|
|
grp = %s,
|
|
division = %s,
|
|
team = %s,
|
|
cell = %s,
|
|
work_status = %s,
|
|
work_time = %s,
|
|
phone = %s,
|
|
email = %s,
|
|
seat_label = %s,
|
|
photo_url = %s,
|
|
sort_order = %s,
|
|
updated_at = NOW()
|
|
WHERE id = %s
|
|
""",
|
|
(*serialize_member_payload(merged_item, index), int(existing["id"])),
|
|
)
|
|
stale_ids = [int(member["id"]) for member in existing_members if int(member["id"]) not in matched_ids]
|
|
if stale_ids:
|
|
cur.execute("DELETE FROM members WHERE id = ANY(%s)", (stale_ids,))
|
|
conn.commit()
|
|
return fetch_members()
|
|
|
|
|
|
def rows_to_member_payloads(rows: list[list[object]]) -> list[MemberPayload]:
|
|
def normalize_header(value: object) -> str:
|
|
return str(value or "").strip().lower()
|
|
|
|
header_idx = next(
|
|
(
|
|
idx
|
|
for idx, row in enumerate(rows)
|
|
if {"이름", "부서"}.issubset({str(value).strip() for value in row})
|
|
or {"name", "part"}.issubset({normalize_header(value) for value in row})
|
|
),
|
|
-1,
|
|
)
|
|
if header_idx < 0:
|
|
raise HTTPException(status_code=400, detail="지원하지 않는 파일 형식입니다. 필수 헤더(이름/부서 또는 name/part)를 찾지 못했습니다.")
|
|
|
|
headers = [normalize_header(value) for value in rows[header_idx]]
|
|
payloads: list[MemberPayload] = []
|
|
|
|
for row in rows[header_idx + 1 :]:
|
|
if not any(str(value or "").strip() for value in row):
|
|
continue
|
|
record: dict[str, object] = {}
|
|
for col_idx, header in enumerate(headers):
|
|
mapped = LEGACY_HEADER_MAP.get(header)
|
|
if not mapped:
|
|
continue
|
|
value = str(row[col_idx] if col_idx < len(row) and row[col_idx] is not None else "").strip()
|
|
if mapped == "phone":
|
|
value = normalize_phone(value)
|
|
record[mapped] = value
|
|
if not str(record.get("name", "")).strip():
|
|
continue
|
|
payloads.append(MemberPayload(**record))
|
|
return payloads
|
|
|
|
|
|
def parse_import_rows(file: UploadFile, content: bytes) -> list[MemberPayload]:
|
|
suffix = Path(file.filename or "").suffix.lower()
|
|
if suffix == ".csv":
|
|
text = content.decode("utf-8-sig")
|
|
rows = list(csv.reader(StringIO(text)))
|
|
return rows_to_member_payloads(rows)
|
|
if suffix in {".xlsx", ".xlsm", ".xltx", ".xltm"}:
|
|
workbook = load_workbook(BytesIO(content), data_only=True)
|
|
sheet = workbook[workbook.sheetnames[0]]
|
|
rows = [list(row) for row in sheet.iter_rows(values_only=True)]
|
|
return rows_to_member_payloads(rows)
|
|
raise HTTPException(status_code=400, detail="xlsx 또는 csv 파일만 업로드할 수 있습니다.")
|
|
|
|
|
|
@app.on_event("startup")
|
|
def startup() -> None:
|
|
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
|
|
LEGACY_STATIC_DIR.mkdir(parents=True, exist_ok=True)
|
|
init_db()
|
|
|
|
|
|
app.mount("/legacy/static", StaticFiles(directory=LEGACY_STATIC_DIR, check_dir=False), name="legacy-static")
|
|
|
|
|
|
@app.get("/api/health")
|
|
def health() -> dict[str, object]:
|
|
checks = {
|
|
"upload_dir": UPLOAD_DIR.exists(),
|
|
}
|
|
|
|
try:
|
|
member_count = get_member_count()
|
|
checks["database"] = True
|
|
except Exception:
|
|
member_count = None
|
|
checks["database"] = False
|
|
|
|
status = "ok" if all(checks.values()) else "degraded"
|
|
return {
|
|
"status": status,
|
|
"checks": checks,
|
|
"member_count": member_count,
|
|
"timestamp": datetime.utcnow().isoformat() + "Z",
|
|
}
|
|
|
|
|
|
@app.post("/api/mock-login")
|
|
def mock_login(username: str = Form(...), password: str = Form(...)) -> dict[str, object]:
|
|
if not MOCK_LOGIN_ENABLED:
|
|
raise HTTPException(status_code=403, detail="Mock login is disabled.")
|
|
if not username.strip() or not password.strip():
|
|
raise HTTPException(status_code=400, detail="Username and password are required.")
|
|
return {
|
|
"user": {
|
|
"username": username.strip(),
|
|
"display_name": username.strip(),
|
|
"role": "admin",
|
|
},
|
|
"session_expires_at": datetime.utcnow().isoformat() + "Z",
|
|
}
|
|
|
|
|
|
@app.get("/api/members")
|
|
def list_members() -> dict[str, list[dict[str, object]]]:
|
|
return {"items": fetch_members()}
|
|
|
|
|
|
@app.post("/api/members")
|
|
def create_member(payload: MemberPayload) -> dict[str, object]:
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute("SELECT COALESCE(MAX(sort_order), -1) + 1 AS next_order FROM members")
|
|
next_order = int(cur.fetchone()["next_order"])
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO members (
|
|
name, employee_id, company, rank, role, department, grp, division, team, cell,
|
|
work_status, work_time, phone, email, seat_label, photo_url, sort_order
|
|
)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
RETURNING id, name, employee_id, company, rank, role, department, grp, division, team, cell,
|
|
work_status, work_time, phone, email, seat_label, photo_url,
|
|
sort_order, created_at, updated_at
|
|
""",
|
|
serialize_member_payload(payload, payload.sort_order if payload.sort_order is not None else next_order),
|
|
)
|
|
member = cur.fetchone()
|
|
conn.commit()
|
|
return {"item": member}
|
|
|
|
|
|
@app.put("/api/members/bulk-sync")
|
|
def bulk_sync_members(payload: MemberBulkPayload) -> dict[str, list[dict[str, object]]]:
|
|
return {"items": replace_members(payload.items)}
|
|
|
|
|
|
@app.put("/api/members/{member_id}")
|
|
def update_member(member_id: int, payload: MemberPayload) -> dict[str, object]:
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
UPDATE members
|
|
SET name = %s,
|
|
employee_id = %s,
|
|
company = %s,
|
|
rank = %s,
|
|
role = %s,
|
|
department = %s,
|
|
grp = %s,
|
|
division = %s,
|
|
team = %s,
|
|
cell = %s,
|
|
work_status = %s,
|
|
work_time = %s,
|
|
phone = %s,
|
|
email = %s,
|
|
seat_label = %s,
|
|
photo_url = %s,
|
|
sort_order = COALESCE(%s, sort_order),
|
|
updated_at = NOW()
|
|
WHERE id = %s
|
|
RETURNING id, name, employee_id, company, rank, role, department, grp, division, team, cell,
|
|
work_status, work_time, phone, email, seat_label, photo_url,
|
|
sort_order, created_at, updated_at
|
|
""",
|
|
(*serialize_member_payload(payload, payload.sort_order or 0)[:-1], payload.sort_order, member_id),
|
|
)
|
|
member = cur.fetchone()
|
|
if member is None:
|
|
raise HTTPException(status_code=404, detail="Member not found.")
|
|
conn.commit()
|
|
return {"item": member}
|
|
|
|
|
|
@app.delete("/api/members/{member_id}")
|
|
def delete_member(member_id: int) -> dict[str, bool]:
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute("DELETE FROM members WHERE id = %s", (member_id,))
|
|
deleted = cur.rowcount > 0
|
|
conn.commit()
|
|
if not deleted:
|
|
raise HTTPException(status_code=404, detail="Member not found.")
|
|
return {"ok": True}
|
|
|
|
|
|
@app.post("/api/members/import")
|
|
async def import_members(file: UploadFile = File(...)) -> dict[str, list[dict[str, object]]]:
|
|
content = await file.read()
|
|
items = parse_import_rows(file, content)
|
|
return {"items": replace_members(items)}
|
|
|
|
|
|
@app.post("/api/uploads/profile-photo")
|
|
def upload_profile_photo(file: UploadFile = File(...), member_name: str = Form("")) -> dict[str, str]:
|
|
suffix = Path(file.filename or "").suffix.lower()
|
|
if suffix not in {".png", ".jpg", ".jpeg", ".webp", ".gif"}:
|
|
raise HTTPException(status_code=400, detail="Only image files are allowed.")
|
|
stem = member_name.strip().replace(" ", "-") or "member"
|
|
filename = f"{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-{stem}-{uuid.uuid4().hex[:8]}{suffix}"
|
|
target = UPLOAD_DIR / filename
|
|
with target.open("wb") as out_file:
|
|
shutil.copyfileobj(file.file, out_file)
|
|
return {"url": f"/uploads/{filename}"}
|
|
|
|
|
|
@app.post("/api/uploads/seat-map-image")
|
|
def upload_seat_map_image(file: UploadFile = File(...), seat_map_name: str = Form("")) -> dict[str, str]:
|
|
suffix = Path(file.filename or "").suffix.lower()
|
|
if suffix not in {".png", ".jpg", ".jpeg", ".webp", ".gif"}:
|
|
raise HTTPException(status_code=400, detail="Only image files are allowed.")
|
|
stem = seat_map_name.strip().replace(" ", "-") or "seat-map"
|
|
filename = f"seat-map-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-{stem}-{uuid.uuid4().hex[:8]}{suffix}"
|
|
target = UPLOAD_DIR / filename
|
|
with target.open("wb") as out_file:
|
|
shutil.copyfileobj(file.file, out_file)
|
|
return {"url": f"/uploads/{filename}"}
|
|
|
|
|
|
@app.post("/api/seat-maps/dxf")
|
|
async def create_dxf_seat_map(file: UploadFile = File(...), name: str = Form(...)) -> dict[str, object]:
|
|
suffix = Path(file.filename or "").suffix.lower()
|
|
if suffix != ".dxf":
|
|
raise HTTPException(status_code=400, detail="DXF 파일만 업로드할 수 있습니다.")
|
|
|
|
stem = name.strip().replace(" ", "-") or "seat-map"
|
|
filename = f"seat-map-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-{stem}-{uuid.uuid4().hex[:8]}{suffix}"
|
|
target = UPLOAD_DIR / filename
|
|
content = await file.read()
|
|
with target.open("wb") as out_file:
|
|
out_file.write(content)
|
|
|
|
try:
|
|
metadata, slots = parse_dxf_layout(target)
|
|
except Exception:
|
|
raise
|
|
|
|
payload = SeatMapPayload(
|
|
name=name.strip(),
|
|
source_type="dxf",
|
|
source_url=f"/uploads/{filename}",
|
|
image_url="",
|
|
preview_svg=metadata["preview_svg"],
|
|
view_box_min_x=metadata["view_box_min_x"],
|
|
view_box_min_y=metadata["view_box_min_y"],
|
|
view_box_width=metadata["view_box_width"],
|
|
view_box_height=metadata["view_box_height"],
|
|
image_width=None,
|
|
image_height=None,
|
|
grid_rows=1,
|
|
grid_cols=1,
|
|
cell_gap=0,
|
|
is_active=True,
|
|
)
|
|
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute("UPDATE seat_maps SET is_active = FALSE, updated_at = NOW() WHERE is_active = TRUE")
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO seat_maps (
|
|
name, source_type, source_url, preview_svg,
|
|
view_box_min_x, view_box_min_y, view_box_width, view_box_height,
|
|
image_url, image_width, image_height, grid_rows, grid_cols, cell_gap, is_active
|
|
)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
RETURNING id, name, source_type, source_url, preview_svg,
|
|
view_box_min_x, view_box_min_y, view_box_width, view_box_height,
|
|
image_url, image_width, image_height, grid_rows, grid_cols,
|
|
cell_gap, is_active, created_at, updated_at
|
|
""",
|
|
serialize_seat_map_payload(payload),
|
|
)
|
|
seat_map = cur.fetchone()
|
|
for slot in slots:
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO seat_slots (seat_map_id, slot_key, label, x, y, rotation, layer_name)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s)
|
|
""",
|
|
(
|
|
seat_map["id"],
|
|
slot["slot_key"],
|
|
slot["label"],
|
|
slot["x"],
|
|
slot["y"],
|
|
slot["rotation"],
|
|
slot["layer_name"],
|
|
),
|
|
)
|
|
conn.commit()
|
|
|
|
return fetch_seat_layout(int(seat_map["id"]))
|
|
|
|
|
|
@app.get("/api/seat-maps/active")
|
|
def get_active_seat_map() -> dict[str, dict[str, object]]:
|
|
seat_map = ensure_fixed_office_seat_map(activate=True)
|
|
if seat_map is None:
|
|
raise HTTPException(status_code=404, detail="Active seat map not found.")
|
|
return {"item": seat_map}
|
|
|
|
|
|
@app.post("/api/seat-maps")
|
|
def create_seat_map(payload: SeatMapPayload) -> dict[str, dict[str, object]]:
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
if payload.is_active:
|
|
cur.execute("UPDATE seat_maps SET is_active = FALSE, updated_at = NOW() WHERE is_active = TRUE")
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO seat_maps (
|
|
name, source_type, source_url, preview_svg,
|
|
view_box_min_x, view_box_min_y, view_box_width, view_box_height,
|
|
image_url, image_width, image_height, grid_rows, grid_cols, cell_gap, is_active
|
|
)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
RETURNING id, name, source_type, source_url, preview_svg,
|
|
view_box_min_x, view_box_min_y, view_box_width, view_box_height,
|
|
image_url, image_width, image_height, grid_rows, grid_cols,
|
|
cell_gap, is_active, created_at, updated_at
|
|
""",
|
|
serialize_seat_map_payload(payload),
|
|
)
|
|
seat_map = cur.fetchone()
|
|
conn.commit()
|
|
return {"item": seat_map}
|
|
|
|
|
|
@app.put("/api/seat-maps/{seat_map_id}")
|
|
def update_seat_map(seat_map_id: int, payload: SeatMapPayload) -> dict[str, dict[str, object]]:
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
if payload.source_type != "dxf":
|
|
cur.execute(
|
|
"""
|
|
SELECT COUNT(*) AS count
|
|
FROM seat_positions
|
|
WHERE seat_map_id = %s
|
|
AND (row_index >= %s OR col_index >= %s)
|
|
""",
|
|
(seat_map_id, payload.grid_rows, payload.grid_cols),
|
|
)
|
|
out_of_bounds_count = int(cur.fetchone()["count"])
|
|
if out_of_bounds_count > 0:
|
|
raise HTTPException(status_code=400, detail="현재 배치된 좌석이 새 그리드 범위를 벗어납니다. 먼저 좌석 배치를 정리하세요.")
|
|
if payload.is_active:
|
|
cur.execute("UPDATE seat_maps SET is_active = FALSE, updated_at = NOW() WHERE is_active = TRUE AND id <> %s", (seat_map_id,))
|
|
cur.execute(
|
|
"""
|
|
UPDATE seat_maps
|
|
SET name = %s,
|
|
source_type = %s,
|
|
source_url = %s,
|
|
preview_svg = %s,
|
|
view_box_min_x = %s,
|
|
view_box_min_y = %s,
|
|
view_box_width = %s,
|
|
view_box_height = %s,
|
|
image_url = %s,
|
|
image_width = %s,
|
|
image_height = %s,
|
|
grid_rows = %s,
|
|
grid_cols = %s,
|
|
cell_gap = %s,
|
|
is_active = %s,
|
|
updated_at = NOW()
|
|
WHERE id = %s
|
|
RETURNING id, name, source_type, source_url, preview_svg,
|
|
view_box_min_x, view_box_min_y, view_box_width, view_box_height,
|
|
image_url, image_width, image_height, grid_rows, grid_cols,
|
|
cell_gap, is_active, created_at, updated_at
|
|
""",
|
|
(*serialize_seat_map_payload(payload), seat_map_id),
|
|
)
|
|
seat_map = cur.fetchone()
|
|
if seat_map is None:
|
|
raise HTTPException(status_code=404, detail="Seat map not found.")
|
|
conn.commit()
|
|
return {"item": seat_map}
|
|
|
|
|
|
@app.get("/api/seat-maps/{seat_map_id}/layout")
|
|
def get_seat_layout(seat_map_id: int) -> dict[str, object]:
|
|
return fetch_seat_layout(seat_map_id)
|
|
|
|
|
|
@app.get("/api/seat-maps/{seat_map_id}/viewer")
|
|
def get_seat_map_viewer(seat_map_id: int) -> HTMLResponse:
|
|
layout = fetch_seat_layout(seat_map_id)
|
|
seat_map = layout.get("seat_map") or {}
|
|
if seat_map.get("source_type") not in {"dxf", "fixed_html"}:
|
|
raise HTTPException(status_code=400, detail="Viewer is only available for supported seat maps.")
|
|
return HTMLResponse(build_center_chair_viewer_html(layout))
|
|
|
|
|
|
@app.put("/api/seat-maps/{seat_map_id}/layout")
|
|
def update_seat_layout(seat_map_id: int, payload: SeatLayoutPayload) -> dict[str, list[dict[str, object]]]:
|
|
return {"items": save_seat_layout(seat_map_id, payload)}
|
|
|
|
|
|
@app.get("/legacy/organization")
|
|
def legacy_organization() -> FileResponse:
|
|
target = LEGACY_DIR / "DashBoard-organization.html"
|
|
if not target.exists():
|
|
raise HTTPException(status_code=404, detail="Legacy dashboard file not found.")
|
|
return FileResponse(target)
|
|
|
|
|
|
@app.get("/legacy/organization-backup")
|
|
def legacy_organization_backup() -> FileResponse:
|
|
target = LEGACY_DIR / "DashBoard-organization-backup.html"
|
|
if not target.exists():
|
|
raise HTTPException(status_code=404, detail="Legacy dashboard backup not found.")
|
|
return FileResponse(target)
|
|
|
|
|
|
@app.get("/uploads/{filename}")
|
|
def get_upload(filename: str) -> FileResponse:
|
|
target = UPLOAD_DIR / filename
|
|
if not target.exists():
|
|
raise HTTPException(status_code=404, detail="Upload not found.")
|
|
return FileResponse(target)
|