Files
MH-DashBoard-organization/backend/app/main.py
2026-03-25 17:34:37 +09:00

1150 lines
44 KiB
Python
Executable File

from __future__ import annotations
from datetime import datetime
from pathlib import Path
import csv
from io import BytesIO, StringIO
import math
import shutil
import uuid
import ezdxf
from ezdxf import recover
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from openpyxl import load_workbook
from pydantic import BaseModel, Field
from .config import LEGACY_DIR, MOCK_LOGIN_ENABLED, UPLOAD_DIR
from .db import get_conn, init_db
app = FastAPI(title="MH Dashboard Organization API")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
LEGACY_STATIC_DIR = LEGACY_DIR / "static"
class MemberPayload(BaseModel):
id: int | None = None
name: str = Field(min_length=1)
employee_id: str = ""
company: str = ""
rank: str = ""
role: str = ""
department: str = ""
grp: str = ""
division: str = ""
team: str = ""
cell: str = ""
work_status: str = ""
work_time: str = ""
phone: str = ""
email: str = ""
seat_label: str = ""
photo_url: str = ""
sort_order: int | None = None
class MemberBulkPayload(BaseModel):
items: list[MemberPayload]
class SeatMapPayload(BaseModel):
name: str = Field(min_length=1)
image_url: str = ""
source_type: str = "image"
source_url: str = ""
preview_svg: str = ""
view_box_min_x: float | None = None
view_box_min_y: float | None = None
view_box_width: float | None = None
view_box_height: float | None = None
image_width: int | None = None
image_height: int | None = None
grid_rows: int = Field(default=1, ge=1, le=200)
grid_cols: int = Field(default=1, ge=1, le=200)
cell_gap: int = Field(default=0, ge=0, le=24)
is_active: bool = True
class SeatPlacementPayload(BaseModel):
member_id: int
seat_slot_id: int | None = None
row_index: int = Field(default=0, ge=0)
col_index: int = Field(default=0, ge=0)
seat_label: str = ""
class SeatLayoutPayload(BaseModel):
placements: list[SeatPlacementPayload]
LEGACY_HEADER_MAP = {
"이름": "name",
"name": "name",
"tag": "employee_id",
"employee_id": "employee_id",
"소속회사": "company",
"co": "company",
"company": "company",
"직급": "rank",
"rank": "rank",
"직책": "role",
"pos": "role",
"role": "role",
"부서": "department",
"part": "department",
"department": "department",
"그룹": "grp",
"gr": "grp",
"grp": "grp",
"디비전": "division",
"div": "division",
"division": "division",
"": "team",
"team": "team",
"teal": "team",
"": "cell",
"cell": "cell",
"근무상태": "work_status",
"work_status": "work_status",
"근무시간": "work_time",
"work_time": "work_time",
"전화번호": "phone",
"phone": "phone",
"이메일": "email",
"email": "email",
"자리위치": "seat_label",
"seat_label": "seat_label",
"사진": "photo_url",
"photo_url": "photo_url",
}
def serialize_member_payload(item: MemberPayload, sort_order: int) -> tuple[object, ...]:
return (
item.name.strip(),
item.employee_id.strip(),
item.company.strip(),
item.rank.strip(),
item.role.strip(),
item.department.strip(),
item.grp.strip(),
item.division.strip(),
item.team.strip(),
item.cell.strip(),
item.work_status.strip(),
item.work_time.strip(),
item.phone.strip(),
item.email.strip(),
item.seat_label.strip(),
item.photo_url.strip(),
sort_order,
)
def fetch_members() -> list[dict[str, object]]:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"""
SELECT id, name, employee_id, company, rank, role, department, grp, division, team, cell,
work_status, work_time, phone, email, seat_label, photo_url,
sort_order, created_at, updated_at
FROM members
ORDER BY sort_order ASC, id ASC
"""
)
return cur.fetchall()
def serialize_seat_map_payload(payload: SeatMapPayload) -> tuple[object, ...]:
return (
payload.name.strip(),
payload.source_type.strip() or "image",
payload.source_url.strip(),
payload.preview_svg,
payload.view_box_min_x,
payload.view_box_min_y,
payload.view_box_width,
payload.view_box_height,
payload.image_url.strip(),
payload.image_width,
payload.image_height,
payload.grid_rows,
payload.grid_cols,
payload.cell_gap,
payload.is_active,
)
def fetch_seat_map(seat_map_id: int) -> dict[str, object] | None:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"""
SELECT id, name, source_type, source_url, preview_svg,
view_box_min_x, view_box_min_y, view_box_width, view_box_height,
image_url, image_width, image_height, grid_rows, grid_cols,
cell_gap, is_active, created_at, updated_at
FROM seat_maps
WHERE id = %s
""",
(seat_map_id,),
)
return cur.fetchone()
def fetch_active_seat_map() -> dict[str, object] | None:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"""
SELECT id, name, source_type, source_url, preview_svg,
view_box_min_x, view_box_min_y, view_box_width, view_box_height,
image_url, image_width, image_height, grid_rows, grid_cols,
cell_gap, is_active, created_at, updated_at
FROM seat_maps
WHERE is_active = TRUE
ORDER BY updated_at DESC, id DESC
LIMIT 1
"""
)
return cur.fetchone()
def compute_seat_label(row_index: int, col_index: int) -> str:
quotient = row_index
row_label = ""
while True:
quotient, remainder = divmod(quotient, 26)
row_label = chr(65 + remainder) + row_label
if quotient == 0:
break
quotient -= 1
return f"{row_label}-{col_index + 1:02d}"
def compute_slot_label(index: int) -> str:
return f"CHAIR-{index + 1:03d}"
def is_chair_layer(layer_name: str) -> bool:
raw = layer_name.strip().lower()
compact = raw.replace("-", "").replace("_", "").replace(" ", "")
return raw in {"chair", "_chair", "-chair"} or compact.endswith("chair")
def inspect_dxf_header(file_path: Path) -> tuple[str, str]:
with file_path.open("rb") as source:
header = source.read(128)
header_text = header.decode("latin-1", errors="ignore").replace("\x00", "")
preview = header[:32].hex(" ")
if header_text.startswith("AutoCAD Binary DXF"):
return ("binary_dxf", preview)
if header_text.startswith("0\nSECTION") or header_text.startswith("0\r\nSECTION"):
return ("ascii_dxf", preview)
if header.startswith(b"AC10"):
return ("dwg_or_dwg_like", preview)
return ("unknown", preview)
def iter_render_entities(entity: ezdxf.entities.DXFGraphic, inherited_layer: str | None = None, depth: int = 0) -> list[ezdxf.entities.DXFGraphic]:
if depth > 6:
return []
entity_type = entity.dxftype()
current_layer = inherited_layer or entity.dxf.layer
if entity_type == "INSERT":
expanded: list[ezdxf.entities.DXFGraphic] = []
try:
for child in entity.virtual_entities():
child_layer = child.dxf.layer
if child_layer == "0":
child.dxf.layer = current_layer
expanded.extend(iter_render_entities(child, inherited_layer=current_layer, depth=depth + 1))
except Exception:
return []
return expanded
if inherited_layer and entity.dxf.layer == "0":
entity.dxf.layer = inherited_layer
return [entity]
def get_entity_points(entity: ezdxf.entities.DXFGraphic) -> list[tuple[float, float]]:
entity_type = entity.dxftype()
if entity_type == "LINE":
return [
(float(entity.dxf.start.x), float(entity.dxf.start.y)),
(float(entity.dxf.end.x), float(entity.dxf.end.y)),
]
if entity_type == "LWPOLYLINE":
return [(float(point[0]), float(point[1])) for point in entity.get_points("xy")]
if entity_type == "POLYLINE":
return [(float(vertex.dxf.location.x), float(vertex.dxf.location.y)) for vertex in entity.vertices]
if entity_type == "CIRCLE":
center = entity.dxf.center
radius = float(entity.dxf.radius)
return [
(float(center.x - radius), float(center.y - radius)),
(float(center.x + radius), float(center.y + radius)),
]
if entity_type == "ARC":
center = entity.dxf.center
radius = float(entity.dxf.radius)
return [
(float(center.x - radius), float(center.y - radius)),
(float(center.x + radius), float(center.y + radius)),
]
if entity_type == "POINT":
location = entity.dxf.location
return [(float(location.x), float(location.y))]
if entity_type == "SPLINE":
try:
return [(float(point[0]), float(point[1])) for point in entity.flattening(2)]
except Exception:
return []
if entity_type == "ELLIPSE":
try:
return [(float(point[0]), float(point[1])) for point in entity.flattening(2)]
except Exception:
center = entity.dxf.center
major_axis = entity.dxf.major_axis
ratio = float(entity.dxf.ratio)
radius_x = math.hypot(float(major_axis.x), float(major_axis.y))
radius_y = radius_x * ratio
return [
(float(center.x - radius_x), float(center.y - radius_y)),
(float(center.x + radius_x), float(center.y + radius_y)),
]
if entity_type == "INSERT":
insert = entity.dxf.insert
return [(float(insert.x), float(insert.y))]
return []
def get_entity_center(entity: ezdxf.entities.DXFGraphic) -> tuple[float, float] | None:
points = get_entity_points(entity)
if not points:
return None
min_x = min(point[0] for point in points)
max_x = max(point[0] for point in points)
min_y = min(point[1] for point in points)
max_y = max(point[1] for point in points)
return ((min_x + max_x) / 2.0, (min_y + max_y) / 2.0)
def get_entity_bounds(entity: ezdxf.entities.DXFGraphic) -> tuple[float, float, float, float] | None:
points = get_entity_points(entity)
if not points:
return None
min_x = min(point[0] for point in points)
max_x = max(point[0] for point in points)
min_y = min(point[1] for point in points)
max_y = max(point[1] for point in points)
return (min_x, min_y, max_x, max_y)
def compute_bounds_from_points(points: list[tuple[float, float]]) -> tuple[float, float, float, float]:
min_x = min(point[0] for point in points)
max_x = max(point[0] for point in points)
min_y = min(point[1] for point in points)
max_y = max(point[1] for point in points)
return (min_x, min_y, max(max_x - min_x, 1.0), max(max_y - min_y, 1.0))
def percentile(values: list[float], ratio: float) -> float:
if not values:
return 0.0
ordered = sorted(values)
index = max(0, min(len(ordered) - 1, round((len(ordered) - 1) * ratio)))
return float(ordered[index])
def compute_focus_bounds(slot_points: list[tuple[float, float]]) -> tuple[float, float, float, float]:
x_values = [point[0] for point in slot_points]
y_values = [point[1] for point in slot_points]
min_x = percentile(x_values, 0.02)
max_x = percentile(x_values, 0.98)
min_y = percentile(y_values, 0.02)
max_y = percentile(y_values, 0.98)
width = max(max_x - min_x, 1.0)
height = max(max_y - min_y, 1.0)
pad_x = max(width * 0.08, 500.0)
pad_y = max(height * 0.08, 500.0)
return (min_x - pad_x, min_y - pad_y, max_x + pad_x, max_y + pad_y)
def bounds_intersect(bounds: tuple[float, float, float, float], focus_bounds: tuple[float, float, float, float]) -> bool:
min_x, min_y, max_x, max_y = bounds
focus_min_x, focus_min_y, focus_max_x, focus_max_y = focus_bounds
return not (
max_x < focus_min_x
or min_x > focus_max_x
or max_y < focus_min_y
or min_y > focus_max_y
)
def line_svg(points: list[tuple[float, float]], css_class: str = "seatmap-dxf-entity") -> str:
if len(points) < 2:
return ""
coordinates = " ".join(f"{x:.2f},{-y:.2f}" for x, y in points)
return (
f'<polyline class="{css_class}" points="{coordinates}" fill="none" '
'stroke-linejoin="round" stroke-linecap="round" />'
)
def circle_svg(
center_x: float,
center_y: float,
radius: float,
stroke: str = "#475569",
fill: str = "none",
css_class: str = "seatmap-dxf-entity",
) -> str:
return (
f'<circle class="{css_class}" cx="{center_x:.2f}" cy="{-center_y:.2f}" r="{radius:.2f}" '
f'stroke="{stroke}" fill="{fill}" />'
)
def build_dxf_preview_svg(
entities: list[ezdxf.entities.DXFGraphic],
bounds: tuple[float, float, float, float],
) -> str:
min_x, min_y, width, height = bounds
max_y = min_y + height
svg_parts: list[str] = []
for entity in entities:
layer_name = entity.dxf.layer
is_chair = is_chair_layer(layer_name)
css_class = "seatmap-dxf-chair-entity" if is_chair else "seatmap-dxf-entity"
entity_type = entity.dxftype()
if entity_type in {"LINE", "LWPOLYLINE", "POLYLINE", "SPLINE", "ELLIPSE"}:
svg = line_svg(get_entity_points(entity), css_class=css_class)
if svg:
svg_parts.append(svg)
elif entity_type == "CIRCLE":
center = entity.dxf.center
svg_parts.append(
circle_svg(
float(center.x),
float(center.y),
float(entity.dxf.radius),
fill="none",
css_class=css_class,
)
)
elif entity_type == "ARC":
center = entity.dxf.center
radius = float(entity.dxf.radius)
start_angle = math.radians(float(entity.dxf.start_angle))
end_angle = math.radians(float(entity.dxf.end_angle))
start_x = float(center.x) + radius * math.cos(start_angle)
start_y = float(center.y) + radius * math.sin(start_angle)
end_x = float(center.x) + radius * math.cos(end_angle)
end_y = float(center.y) + radius * math.sin(end_angle)
large_arc = 1 if abs(float(entity.dxf.end_angle) - float(entity.dxf.start_angle)) > 180 else 0
svg_parts.append(
f'<path class="{css_class}" d="M {start_x:.2f} {-start_y:.2f} '
f'A {radius:.2f} {radius:.2f} 0 {large_arc} 0 {end_x:.2f} {-end_y:.2f}" fill="none" />'
)
view_box = f"{min_x:.2f} {-max_y:.2f} {max(width, 1.0):.2f} {max(height, 1.0):.2f}"
return (
f'<svg class="seatmap-preview-svg" viewBox="{view_box}" xmlns="http://www.w3.org/2000/svg" preserveAspectRatio="xMidYMid meet">'
'<rect width="100%" height="100%" fill="#ffffff" />'
+ "".join(svg_parts)
+ "</svg>"
)
def parse_dxf_layout(file_path: Path) -> tuple[dict[str, object], list[dict[str, object]]]:
try:
document = ezdxf.readfile(file_path)
except OSError:
try:
document, _ = recover.readfile(file_path)
except Exception as exc:
kind, preview = inspect_dxf_header(file_path)
if kind == "binary_dxf":
raise HTTPException(
status_code=400,
detail=f"Binary DXF로 보이지만 해석에 실패했습니다. 가능하면 ASCII DXF로 다시 저장해 업로드하세요. 헤더={preview}",
) from exc
if kind == "dwg_or_dwg_like":
raise HTTPException(
status_code=400,
detail=f"업로드한 파일은 DWG 계열 헤더(AC10xx)로 보입니다. DWG가 아니라 ASCII DXF로 다시 저장해 업로드하세요. 헤더={preview}",
) from exc
if kind == "ascii_dxf":
raise HTTPException(
status_code=400,
detail=f"ASCII DXF로 보이지만 구조를 해석하지 못했습니다. 도면을 다른 DXF 버전으로 다시 저장해보세요. 헤더={preview}",
) from exc
raise HTTPException(
status_code=400,
detail=f"업로드한 파일 형식을 판별하지 못했습니다. 확장자만 dxf인 파일일 수 있습니다. 헤더={preview}",
) from exc
modelspace = document.modelspace()
base_entities = [entity for entity in modelspace if entity.dxftype() in {"LINE", "LWPOLYLINE", "POLYLINE", "CIRCLE", "ARC", "INSERT", "SPLINE", "ELLIPSE"}]
all_entities: list[ezdxf.entities.DXFGraphic] = []
for entity in base_entities:
all_entities.extend(iter_render_entities(entity))
chair_entities: list[ezdxf.entities.DXFGraphic] = []
chair_points: list[tuple[float, float]] = []
for entity in all_entities:
if is_chair_layer(entity.dxf.layer):
chair_entities.append(entity)
chair_points.extend(get_entity_points(entity))
if not chair_entities:
raise HTTPException(status_code=400, detail="DXF 파일에서 chair 계열 레이어를 찾지 못했습니다.")
if not chair_points:
raise HTTPException(status_code=400, detail="DXF 좌표를 해석하지 못했습니다.")
slots: list[dict[str, object]] = []
for index, entity in enumerate(sorted(chair_entities, key=lambda item: (-(get_entity_center(item) or (0.0, 0.0))[1], (get_entity_center(item) or (0.0, 0.0))[0]))):
center = get_entity_center(entity)
if center is None:
continue
slots.append(
{
"slot_key": entity.dxf.handle,
"label": compute_slot_label(index),
"x": round(float(center[0]), 3),
"y": round(float(center[1]), 3),
"rotation": float(getattr(entity.dxf, "rotation", 0.0) or 0.0),
"layer_name": entity.dxf.layer,
}
)
if not slots:
raise HTTPException(status_code=400, detail="chair 레이어에서 좌석 위치를 추출하지 못했습니다.")
slot_points = [(float(slot["x"]), float(slot["y"])) for slot in slots]
focus_bounds = compute_focus_bounds(slot_points)
visible_entities: list[ezdxf.entities.DXFGraphic] = []
visible_points: list[tuple[float, float]] = []
for entity in all_entities:
entity_bounds = get_entity_bounds(entity)
if entity_bounds is None:
continue
if bounds_intersect(entity_bounds, focus_bounds):
visible_entities.append(entity)
visible_points.extend(get_entity_points(entity))
if not visible_entities or not visible_points:
visible_entities = all_entities
visible_points = chair_points
focus_min_x, focus_min_y, focus_max_x, focus_max_y = focus_bounds
min_x = focus_min_x
min_y = focus_min_y
width = max(focus_max_x - focus_min_x, 1.0)
height = max(focus_max_y - focus_min_y, 1.0)
preview_svg = build_dxf_preview_svg(visible_entities, (min_x, min_y, width, height))
metadata = {
"source_type": "dxf",
"view_box_min_x": round(min_x, 3),
"view_box_min_y": round(min_y, 3),
"view_box_width": round(width, 3),
"view_box_height": round(height, 3),
"preview_svg": preview_svg,
"grid_rows": 1,
"grid_cols": 1,
"image_width": None,
"image_height": None,
"cell_gap": 0,
}
return metadata, slots
def fetch_seat_layout(seat_map_id: int) -> dict[str, object]:
seat_map = fetch_seat_map(seat_map_id)
if seat_map is None:
raise HTTPException(status_code=404, detail="Seat map not found.")
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"""
SELECT m.id, m.name, m.company, m.rank, m.role, m.department, m.grp, m.division,
m.team, m.cell, m.work_status, m.work_time, m.phone, m.email,
m.seat_label AS member_seat_label, m.photo_url, m.sort_order
FROM members m
ORDER BY m.sort_order ASC, m.id ASC
"""
)
members = cur.fetchall()
cur.execute(
"""
SELECT id, slot_key, label, x, y, rotation, layer_name
FROM seat_slots
WHERE seat_map_id = %s
ORDER BY label ASC, id ASC
""",
(seat_map_id,),
)
slots = cur.fetchall()
cur.execute(
"""
SELECT sp.member_id, sp.row_index, sp.col_index, sp.seat_label,
sp.seat_slot_id,
m.name, m.company, m.rank, m.role, m.department, m.grp, m.division,
m.team, m.cell, m.work_status, m.work_time, m.phone, m.email,
m.photo_url, m.sort_order
FROM seat_positions sp
JOIN members m ON m.id = sp.member_id
WHERE sp.seat_map_id = %s
ORDER BY sp.row_index ASC, sp.col_index ASC, m.sort_order ASC, m.id ASC
""",
(seat_map_id,),
)
placements = cur.fetchall()
return {
"seat_map": seat_map,
"members": members,
"slots": slots,
"placements": placements,
}
def save_seat_layout(seat_map_id: int, payload: SeatLayoutPayload) -> list[dict[str, object]]:
seat_map = fetch_seat_map(seat_map_id)
if seat_map is None:
raise HTTPException(status_code=404, detail="Seat map not found.")
member_ids: list[int] = []
occupied_cells: set[tuple[int, int]] = set()
occupied_slots: set[int] = set()
is_dxf = seat_map["source_type"] == "dxf"
for item in payload.placements:
if is_dxf:
if item.seat_slot_id is None:
raise HTTPException(status_code=400, detail="DXF 자리배치도는 seat_slot_id가 필요합니다.")
if item.seat_slot_id in occupied_slots:
raise HTTPException(status_code=400, detail="같은 좌석에 둘 이상의 구성원을 배치할 수 없습니다.")
occupied_slots.add(item.seat_slot_id)
else:
if item.row_index >= int(seat_map["grid_rows"]) or item.col_index >= int(seat_map["grid_cols"]):
raise HTTPException(status_code=400, detail="좌표가 자리배치도 범위를 벗어났습니다.")
cell_key = (item.row_index, item.col_index)
if cell_key in occupied_cells:
raise HTTPException(status_code=400, detail="같은 칸에 둘 이상의 구성원을 배치할 수 없습니다.")
occupied_cells.add(cell_key)
member_ids.append(item.member_id)
if len(member_ids) != len(set(member_ids)):
raise HTTPException(status_code=400, detail="같은 구성원을 중복 배치할 수 없습니다.")
if member_ids:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute("SELECT id FROM members WHERE id = ANY(%s)", (member_ids,))
existing_ids = {int(row["id"]) for row in cur.fetchall()}
missing_ids = sorted(set(member_ids) - existing_ids)
if missing_ids:
raise HTTPException(status_code=400, detail=f"존재하지 않는 구성원 ID가 포함되어 있습니다: {missing_ids}")
if is_dxf:
slot_ids = sorted(occupied_slots)
cur.execute("SELECT id FROM seat_slots WHERE seat_map_id = %s AND id = ANY(%s)", (seat_map_id, slot_ids))
existing_slot_ids = {int(row["id"]) for row in cur.fetchall()}
missing_slot_ids = sorted(set(slot_ids) - existing_slot_ids)
if missing_slot_ids:
raise HTTPException(status_code=400, detail=f"존재하지 않는 좌석 슬롯 ID가 포함되어 있습니다: {missing_slot_ids}")
cur.execute("SELECT id, label FROM seat_slots WHERE seat_map_id = %s", (seat_map_id,))
slot_label_map = {int(row["id"]): row["label"] for row in cur.fetchall()}
else:
slot_label_map = {}
cur.execute("DELETE FROM seat_positions WHERE seat_map_id = %s AND NOT (member_id = ANY(%s))", (seat_map_id, member_ids))
for item in payload.placements:
seat_label = item.seat_label.strip() or (
slot_label_map.get(int(item.seat_slot_id), f"SLOT-{item.seat_slot_id}")
if is_dxf and item.seat_slot_id is not None
else compute_seat_label(item.row_index, item.col_index)
)
cur.execute(
"""
INSERT INTO seat_positions (member_id, seat_map_id, seat_slot_id, row_index, col_index, seat_label, updated_at)
VALUES (%s, %s, %s, %s, %s, %s, NOW())
ON CONFLICT (member_id) DO UPDATE
SET seat_map_id = EXCLUDED.seat_map_id,
seat_slot_id = EXCLUDED.seat_slot_id,
row_index = EXCLUDED.row_index,
col_index = EXCLUDED.col_index,
seat_label = EXCLUDED.seat_label,
updated_at = NOW()
""",
(
item.member_id,
seat_map_id,
item.seat_slot_id if is_dxf else None,
item.row_index,
item.col_index,
seat_label,
),
)
conn.commit()
else:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute("DELETE FROM seat_positions WHERE seat_map_id = %s", (seat_map_id,))
conn.commit()
return fetch_seat_layout(seat_map_id)["placements"]
def get_member_count() -> int:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute("SELECT COUNT(*) AS count FROM members")
return int(cur.fetchone()["count"])
def replace_members(items: list[MemberPayload]) -> list[dict[str, object]]:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute("TRUNCATE TABLE members RESTART IDENTITY CASCADE")
for index, item in enumerate(items):
cur.execute(
"""
INSERT INTO members (
name, employee_id, company, rank, role, department, grp, division, team, cell,
work_status, work_time, phone, email, seat_label, photo_url, sort_order
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""",
serialize_member_payload(item, index),
)
conn.commit()
return fetch_members()
def rows_to_member_payloads(rows: list[list[object]]) -> list[MemberPayload]:
def normalize_header(value: object) -> str:
return str(value or "").strip().lower()
header_idx = next(
(
idx
for idx, row in enumerate(rows)
if {"이름", "부서"}.issubset({str(value).strip() for value in row})
or {"name", "part"}.issubset({normalize_header(value) for value in row})
),
-1,
)
if header_idx < 0:
raise HTTPException(status_code=400, detail="지원하지 않는 파일 형식입니다. 필수 헤더(이름/부서 또는 name/part)를 찾지 못했습니다.")
headers = [normalize_header(value) for value in rows[header_idx]]
payloads: list[MemberPayload] = []
for row in rows[header_idx + 1 :]:
if not any(str(value or "").strip() for value in row):
continue
record: dict[str, object] = {}
for col_idx, header in enumerate(headers):
mapped = LEGACY_HEADER_MAP.get(header)
if not mapped:
continue
record[mapped] = str(row[col_idx] if col_idx < len(row) and row[col_idx] is not None else "").strip()
if not str(record.get("name", "")).strip():
continue
payloads.append(MemberPayload(**record))
return payloads
def parse_import_rows(file: UploadFile, content: bytes) -> list[MemberPayload]:
suffix = Path(file.filename or "").suffix.lower()
if suffix == ".csv":
text = content.decode("utf-8-sig")
rows = list(csv.reader(StringIO(text)))
return rows_to_member_payloads(rows)
if suffix in {".xlsx", ".xlsm", ".xltx", ".xltm"}:
workbook = load_workbook(BytesIO(content), data_only=True)
sheet = workbook[workbook.sheetnames[0]]
rows = [list(row) for row in sheet.iter_rows(values_only=True)]
return rows_to_member_payloads(rows)
raise HTTPException(status_code=400, detail="xlsx 또는 csv 파일만 업로드할 수 있습니다.")
@app.on_event("startup")
def startup() -> None:
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
LEGACY_STATIC_DIR.mkdir(parents=True, exist_ok=True)
init_db()
app.mount("/legacy/static", StaticFiles(directory=LEGACY_STATIC_DIR, check_dir=False), name="legacy-static")
@app.get("/api/health")
def health() -> dict[str, object]:
checks = {
"upload_dir": UPLOAD_DIR.exists(),
}
try:
member_count = get_member_count()
checks["database"] = True
except Exception:
member_count = None
checks["database"] = False
status = "ok" if all(checks.values()) else "degraded"
return {
"status": status,
"checks": checks,
"member_count": member_count,
"timestamp": datetime.utcnow().isoformat() + "Z",
}
@app.post("/api/mock-login")
def mock_login(username: str = Form(...), password: str = Form(...)) -> dict[str, object]:
if not MOCK_LOGIN_ENABLED:
raise HTTPException(status_code=403, detail="Mock login is disabled.")
if not username.strip() or not password.strip():
raise HTTPException(status_code=400, detail="Username and password are required.")
return {
"user": {
"username": username.strip(),
"display_name": username.strip(),
"role": "admin",
},
"session_expires_at": datetime.utcnow().isoformat() + "Z",
}
@app.get("/api/members")
def list_members() -> dict[str, list[dict[str, object]]]:
return {"items": fetch_members()}
@app.post("/api/members")
def create_member(payload: MemberPayload) -> dict[str, object]:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute("SELECT COALESCE(MAX(sort_order), -1) + 1 AS next_order FROM members")
next_order = int(cur.fetchone()["next_order"])
cur.execute(
"""
INSERT INTO members (
name, employee_id, company, rank, role, department, grp, division, team, cell,
work_status, work_time, phone, email, seat_label, photo_url, sort_order
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
RETURNING id, name, employee_id, company, rank, role, department, grp, division, team, cell,
work_status, work_time, phone, email, seat_label, photo_url,
sort_order, created_at, updated_at
""",
serialize_member_payload(payload, payload.sort_order if payload.sort_order is not None else next_order),
)
member = cur.fetchone()
conn.commit()
return {"item": member}
@app.put("/api/members/bulk-sync")
def bulk_sync_members(payload: MemberBulkPayload) -> dict[str, list[dict[str, object]]]:
return {"items": replace_members(payload.items)}
@app.put("/api/members/{member_id}")
def update_member(member_id: int, payload: MemberPayload) -> dict[str, object]:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"""
UPDATE members
SET name = %s,
employee_id = %s,
company = %s,
rank = %s,
role = %s,
department = %s,
grp = %s,
division = %s,
team = %s,
cell = %s,
work_status = %s,
work_time = %s,
phone = %s,
email = %s,
seat_label = %s,
photo_url = %s,
sort_order = COALESCE(%s, sort_order),
updated_at = NOW()
WHERE id = %s
RETURNING id, name, employee_id, company, rank, role, department, grp, division, team, cell,
work_status, work_time, phone, email, seat_label, photo_url,
sort_order, created_at, updated_at
""",
(*serialize_member_payload(payload, payload.sort_order or 0)[:-1], payload.sort_order, member_id),
)
member = cur.fetchone()
if member is None:
raise HTTPException(status_code=404, detail="Member not found.")
conn.commit()
return {"item": member}
@app.delete("/api/members/{member_id}")
def delete_member(member_id: int) -> dict[str, bool]:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute("DELETE FROM members WHERE id = %s", (member_id,))
deleted = cur.rowcount > 0
conn.commit()
if not deleted:
raise HTTPException(status_code=404, detail="Member not found.")
return {"ok": True}
@app.post("/api/members/import")
async def import_members(file: UploadFile = File(...)) -> dict[str, list[dict[str, object]]]:
content = await file.read()
items = parse_import_rows(file, content)
return {"items": replace_members(items)}
@app.post("/api/uploads/profile-photo")
def upload_profile_photo(file: UploadFile = File(...), member_name: str = Form("")) -> dict[str, str]:
suffix = Path(file.filename or "").suffix.lower()
if suffix not in {".png", ".jpg", ".jpeg", ".webp", ".gif"}:
raise HTTPException(status_code=400, detail="Only image files are allowed.")
stem = member_name.strip().replace(" ", "-") or "member"
filename = f"{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-{stem}-{uuid.uuid4().hex[:8]}{suffix}"
target = UPLOAD_DIR / filename
with target.open("wb") as out_file:
shutil.copyfileobj(file.file, out_file)
return {"url": f"/uploads/{filename}"}
@app.post("/api/uploads/seat-map-image")
def upload_seat_map_image(file: UploadFile = File(...), seat_map_name: str = Form("")) -> dict[str, str]:
suffix = Path(file.filename or "").suffix.lower()
if suffix not in {".png", ".jpg", ".jpeg", ".webp", ".gif"}:
raise HTTPException(status_code=400, detail="Only image files are allowed.")
stem = seat_map_name.strip().replace(" ", "-") or "seat-map"
filename = f"seat-map-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-{stem}-{uuid.uuid4().hex[:8]}{suffix}"
target = UPLOAD_DIR / filename
with target.open("wb") as out_file:
shutil.copyfileobj(file.file, out_file)
return {"url": f"/uploads/{filename}"}
@app.post("/api/seat-maps/dxf")
async def create_dxf_seat_map(file: UploadFile = File(...), name: str = Form(...)) -> dict[str, object]:
suffix = Path(file.filename or "").suffix.lower()
if suffix != ".dxf":
raise HTTPException(status_code=400, detail="DXF 파일만 업로드할 수 있습니다.")
stem = name.strip().replace(" ", "-") or "seat-map"
filename = f"seat-map-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-{stem}-{uuid.uuid4().hex[:8]}{suffix}"
target = UPLOAD_DIR / filename
content = await file.read()
with target.open("wb") as out_file:
out_file.write(content)
try:
metadata, slots = parse_dxf_layout(target)
except Exception:
raise
payload = SeatMapPayload(
name=name.strip(),
source_type="dxf",
source_url=f"/uploads/{filename}",
image_url="",
preview_svg=metadata["preview_svg"],
view_box_min_x=metadata["view_box_min_x"],
view_box_min_y=metadata["view_box_min_y"],
view_box_width=metadata["view_box_width"],
view_box_height=metadata["view_box_height"],
image_width=None,
image_height=None,
grid_rows=1,
grid_cols=1,
cell_gap=0,
is_active=True,
)
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute("UPDATE seat_maps SET is_active = FALSE, updated_at = NOW() WHERE is_active = TRUE")
cur.execute(
"""
INSERT INTO seat_maps (
name, source_type, source_url, preview_svg,
view_box_min_x, view_box_min_y, view_box_width, view_box_height,
image_url, image_width, image_height, grid_rows, grid_cols, cell_gap, is_active
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
RETURNING id, name, source_type, source_url, preview_svg,
view_box_min_x, view_box_min_y, view_box_width, view_box_height,
image_url, image_width, image_height, grid_rows, grid_cols,
cell_gap, is_active, created_at, updated_at
""",
serialize_seat_map_payload(payload),
)
seat_map = cur.fetchone()
for slot in slots:
cur.execute(
"""
INSERT INTO seat_slots (seat_map_id, slot_key, label, x, y, rotation, layer_name)
VALUES (%s, %s, %s, %s, %s, %s, %s)
""",
(
seat_map["id"],
slot["slot_key"],
slot["label"],
slot["x"],
slot["y"],
slot["rotation"],
slot["layer_name"],
),
)
conn.commit()
return fetch_seat_layout(int(seat_map["id"]))
@app.get("/api/seat-maps/active")
def get_active_seat_map() -> dict[str, dict[str, object]]:
seat_map = fetch_active_seat_map()
if seat_map is None:
raise HTTPException(status_code=404, detail="Active seat map not found.")
return {"item": seat_map}
@app.post("/api/seat-maps")
def create_seat_map(payload: SeatMapPayload) -> dict[str, dict[str, object]]:
with get_conn() as conn:
with conn.cursor() as cur:
if payload.is_active:
cur.execute("UPDATE seat_maps SET is_active = FALSE, updated_at = NOW() WHERE is_active = TRUE")
cur.execute(
"""
INSERT INTO seat_maps (
name, source_type, source_url, preview_svg,
view_box_min_x, view_box_min_y, view_box_width, view_box_height,
image_url, image_width, image_height, grid_rows, grid_cols, cell_gap, is_active
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
RETURNING id, name, source_type, source_url, preview_svg,
view_box_min_x, view_box_min_y, view_box_width, view_box_height,
image_url, image_width, image_height, grid_rows, grid_cols,
cell_gap, is_active, created_at, updated_at
""",
serialize_seat_map_payload(payload),
)
seat_map = cur.fetchone()
conn.commit()
return {"item": seat_map}
@app.put("/api/seat-maps/{seat_map_id}")
def update_seat_map(seat_map_id: int, payload: SeatMapPayload) -> dict[str, dict[str, object]]:
with get_conn() as conn:
with conn.cursor() as cur:
if payload.source_type != "dxf":
cur.execute(
"""
SELECT COUNT(*) AS count
FROM seat_positions
WHERE seat_map_id = %s
AND (row_index >= %s OR col_index >= %s)
""",
(seat_map_id, payload.grid_rows, payload.grid_cols),
)
out_of_bounds_count = int(cur.fetchone()["count"])
if out_of_bounds_count > 0:
raise HTTPException(status_code=400, detail="현재 배치된 좌석이 새 그리드 범위를 벗어납니다. 먼저 좌석 배치를 정리하세요.")
if payload.is_active:
cur.execute("UPDATE seat_maps SET is_active = FALSE, updated_at = NOW() WHERE is_active = TRUE AND id <> %s", (seat_map_id,))
cur.execute(
"""
UPDATE seat_maps
SET name = %s,
source_type = %s,
source_url = %s,
preview_svg = %s,
view_box_min_x = %s,
view_box_min_y = %s,
view_box_width = %s,
view_box_height = %s,
image_url = %s,
image_width = %s,
image_height = %s,
grid_rows = %s,
grid_cols = %s,
cell_gap = %s,
is_active = %s,
updated_at = NOW()
WHERE id = %s
RETURNING id, name, source_type, source_url, preview_svg,
view_box_min_x, view_box_min_y, view_box_width, view_box_height,
image_url, image_width, image_height, grid_rows, grid_cols,
cell_gap, is_active, created_at, updated_at
""",
(*serialize_seat_map_payload(payload), seat_map_id),
)
seat_map = cur.fetchone()
if seat_map is None:
raise HTTPException(status_code=404, detail="Seat map not found.")
conn.commit()
return {"item": seat_map}
@app.get("/api/seat-maps/{seat_map_id}/layout")
def get_seat_layout(seat_map_id: int) -> dict[str, object]:
return fetch_seat_layout(seat_map_id)
@app.put("/api/seat-maps/{seat_map_id}/layout")
def update_seat_layout(seat_map_id: int, payload: SeatLayoutPayload) -> dict[str, list[dict[str, object]]]:
return {"items": save_seat_layout(seat_map_id, payload)}
@app.get("/legacy/organization")
def legacy_organization() -> FileResponse:
target = LEGACY_DIR / "DashBoard-organization.html"
if not target.exists():
raise HTTPException(status_code=404, detail="Legacy dashboard file not found.")
return FileResponse(target)
@app.get("/legacy/organization-backup")
def legacy_organization_backup() -> FileResponse:
target = LEGACY_DIR / "DashBoard-organization-backup.html"
if not target.exists():
raise HTTPException(status_code=404, detail="Legacy dashboard backup not found.")
return FileResponse(target)
@app.get("/uploads/{filename}")
def get_upload(filename: str) -> FileResponse:
target = UPLOAD_DIR / filename
if not target.exists():
raise HTTPException(status_code=404, detail="Upload not found.")
return FileResponse(target)