backup: save fixed office seatmap snapshot

This commit is contained in:
hyunho
2026-03-26 09:42:25 +09:00
parent e62a6a5458
commit 6f5e61ca1a
15 changed files with 2042 additions and 224 deletions

File diff suppressed because one or more lines are too long

View File

@@ -1,18 +1,22 @@
from __future__ import annotations
from datetime import datetime
from pathlib import Path
import csv
import base64
from datetime import datetime
from io import BytesIO, StringIO
import json
import math
from pathlib import Path
import re
import shutil
import struct
import uuid
import ezdxf
from ezdxf import recover
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.responses import FileResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from openpyxl import load_workbook
from pydantic import BaseModel, Field
@@ -32,6 +36,10 @@ app.add_middleware(
)
LEGACY_STATIC_DIR = LEGACY_DIR / "static"
FIXED_OFFICE_SOURCE_KEY = "technical-development-center"
FIXED_OFFICE_NAME = "기술개발센터"
FIXED_OFFICE_TEMPLATE_PATH = Path(__file__).with_name("center_chair_viewer_template.html")
_fixed_office_cache: dict[str, object] | None = None
class MemberPayload(BaseModel):
@@ -239,6 +247,112 @@ def fetch_active_seat_map() -> dict[str, object] | None:
return cur.fetchone()
def ensure_fixed_office_seat_map(activate: bool = True) -> dict[str, object]:
template = parse_fixed_office_template()
slots = template["slots"]
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"""
SELECT id
FROM seat_maps
WHERE source_type = 'fixed_html'
AND source_url = %s
LIMIT 1
""",
(FIXED_OFFICE_SOURCE_KEY,),
)
row = cur.fetchone()
if activate:
cur.execute("UPDATE seat_maps SET is_active = FALSE, updated_at = NOW() WHERE is_active = TRUE")
if row is None:
cur.execute(
"""
INSERT INTO seat_maps (
name, image_url, source_type, source_url, preview_svg,
view_box_min_x, view_box_min_y, view_box_width, view_box_height,
image_width, image_height, grid_rows, grid_cols, cell_gap, is_active
)
VALUES (%s, '', 'fixed_html', %s, '', NULL, NULL, NULL, NULL, NULL, NULL, 1, 1, 0, %s)
RETURNING id
""",
(FIXED_OFFICE_NAME, FIXED_OFFICE_SOURCE_KEY, activate),
)
seat_map_id = int(cur.fetchone()["id"])
else:
seat_map_id = int(row["id"])
cur.execute(
"""
UPDATE seat_maps
SET name = %s,
source_type = 'fixed_html',
source_url = %s,
image_url = '',
preview_svg = '',
grid_rows = 1,
grid_cols = 1,
cell_gap = 0,
is_active = %s,
updated_at = NOW()
WHERE id = %s
""",
(FIXED_OFFICE_NAME, FIXED_OFFICE_SOURCE_KEY, activate, seat_map_id),
)
cur.execute("SELECT id, slot_key FROM seat_slots WHERE seat_map_id = %s", (seat_map_id,))
existing_slots = {str(item["slot_key"]): int(item["id"]) for item in cur.fetchall()}
incoming_keys = {str(slot["slot_key"]) for slot in slots}
for slot in slots:
slot_key = str(slot["slot_key"])
if slot_key in existing_slots:
cur.execute(
"""
UPDATE seat_slots
SET label = %s, x = %s, y = %s, rotation = %s, layer_name = %s, updated_at = NOW()
WHERE seat_map_id = %s AND slot_key = %s
""",
(
slot["label"],
slot["x"],
slot["y"],
slot["rotation"],
slot["layer_name"],
seat_map_id,
slot_key,
),
)
else:
cur.execute(
"""
INSERT INTO seat_slots (seat_map_id, slot_key, label, x, y, rotation, layer_name)
VALUES (%s, %s, %s, %s, %s, %s, %s)
""",
(
seat_map_id,
slot_key,
slot["label"],
slot["x"],
slot["y"],
slot["rotation"],
slot["layer_name"],
),
)
if existing_slots:
stale_keys = [key for key in existing_slots if key not in incoming_keys]
if stale_keys:
cur.execute(
"DELETE FROM seat_slots WHERE seat_map_id = %s AND slot_key = ANY(%s)",
(seat_map_id, stale_keys),
)
conn.commit()
seat_map = fetch_seat_map(seat_map_id)
if seat_map is None:
raise HTTPException(status_code=500, detail="Fixed office seat map initialization failed.")
return seat_map
def compute_seat_label(row_index: int, col_index: int) -> str:
quotient = row_index
row_label = ""
@@ -255,6 +369,63 @@ def compute_slot_label(index: int) -> str:
return f"CHAIR-{index + 1:03d}"
def decode_segment_values(raw_base64: str) -> list[int]:
decoded = base64.b64decode(raw_base64.encode("ascii"))
if not decoded:
return []
return [item[0] for item in struct.iter_unpack("<i", decoded)]
def parse_fixed_office_template() -> dict[str, object]:
global _fixed_office_cache
if _fixed_office_cache is not None:
return _fixed_office_cache
if not FIXED_OFFICE_TEMPLATE_PATH.exists():
raise HTTPException(status_code=500, detail="Fixed office viewer template not found.")
html = FIXED_OFFICE_TEMPLATE_PATH.read_text(encoding="utf-8")
match = re.search(r"const DATA = (\{.*?\});\n\s*function decodeSegments", html, flags=re.S)
if not match:
raise HTTPException(status_code=500, detail="Fixed office viewer data not found.")
data = json.loads(match.group(1))
chair_values = decode_segment_values(str(data["chairSegsB64"]))
slots: list[dict[str, object]] = []
for index, chair in enumerate(data["chairs"]):
slot_key, name, _kind, start, count = chair
min_x = math.inf
min_y = math.inf
max_x = -math.inf
max_y = -math.inf
start_index = int(start)
end_index = start_index + int(count)
for item_index in range(start_index, end_index):
offset = item_index * 4
x1 = chair_values[offset] / 10
y1 = chair_values[offset + 1] / 10
x2 = chair_values[offset + 2] / 10
y2 = chair_values[offset + 3] / 10
min_x = min(min_x, x1, x2)
min_y = min(min_y, y1, y2)
max_x = max(max_x, x1, x2)
max_y = max(max_y, y1, y2)
slots.append(
{
"slot_key": str(slot_key),
"label": str(slot_key),
"x": round((min_x + max_x) / 2, 3),
"y": round((min_y + max_y) / 2, 3),
"rotation": 0.0,
"layer_name": str(name),
}
)
_fixed_office_cache = {
"html": html,
"data": data,
"slots": slots,
}
return _fixed_office_cache
def is_chair_layer(layer_name: str) -> bool:
raw = layer_name.strip().lower()
compact = raw.replace("-", "").replace("_", "").replace(" ", "")
@@ -515,12 +686,13 @@ def build_dxf_preview_svg(
)
def parse_dxf_layout(file_path: Path) -> tuple[dict[str, object], list[dict[str, object]]]:
def load_dxf_document(file_path: Path) -> ezdxf.document.Drawing:
try:
document = ezdxf.readfile(file_path)
return ezdxf.readfile(file_path)
except OSError:
try:
document, _ = recover.readfile(file_path)
return document
except Exception as exc:
kind, preview = inspect_dxf_header(file_path)
if kind == "binary_dxf":
@@ -538,10 +710,65 @@ def parse_dxf_layout(file_path: Path) -> tuple[dict[str, object], list[dict[str,
status_code=400,
detail=f"ASCII DXF로 보이지만 구조를 해석하지 못했습니다. 도면을 다른 DXF 버전으로 다시 저장해보세요. 헤더={preview}",
) from exc
raise HTTPException(
status_code=400,
detail=f"업로드한 파일 형식을 판별하지 못했습니다. 확장자만 dxf인 파일일 수 있습니다. 헤더={preview}",
) from exc
raise HTTPException(
status_code=400,
detail=f"업로드한 파일 형식을 판별하지 못했습니다. 확장자만 dxf인 파일일 수 있습니다. 헤더={preview}",
) from exc
def entity_to_segments(entity: ezdxf.entities.DXFGraphic, arc_steps: int = 24) -> list[tuple[float, float, float, float]]:
entity_type = entity.dxftype()
points = get_entity_points(entity)
if entity_type in {"LINE", "LWPOLYLINE", "POLYLINE", "SPLINE", "ELLIPSE"} and len(points) >= 2:
return [
(float(left[0]), float(left[1]), float(right[0]), float(right[1]))
for left, right in zip(points[:-1], points[1:])
]
if entity_type == "CIRCLE":
center = entity.dxf.center
radius = float(entity.dxf.radius)
samples = []
for index in range(arc_steps + 1):
angle = (math.tau * index) / arc_steps
samples.append(
(
float(center.x) + radius * math.cos(angle),
float(center.y) + radius * math.sin(angle),
)
)
return [
(float(left[0]), float(left[1]), float(right[0]), float(right[1]))
for left, right in zip(samples[:-1], samples[1:])
]
if entity_type == "ARC":
center = entity.dxf.center
radius = float(entity.dxf.radius)
start_angle = math.radians(float(entity.dxf.start_angle))
end_angle = math.radians(float(entity.dxf.end_angle))
if end_angle <= start_angle:
end_angle += math.tau
samples = []
for index in range(arc_steps + 1):
ratio = index / arc_steps
angle = start_angle + (end_angle - start_angle) * ratio
samples.append(
(
float(center.x) + radius * math.cos(angle),
float(center.y) + radius * math.sin(angle),
)
)
return [
(float(left[0]), float(left[1]), float(right[0]), float(right[1]))
for left, right in zip(samples[:-1], samples[1:])
]
return []
def build_dxf_artifacts(file_path: Path) -> tuple[dict[str, object], list[dict[str, object]], dict[str, object]]:
document = load_dxf_document(file_path)
modelspace = document.modelspace()
base_entities = [entity for entity in modelspace if entity.dxftype() in {"LINE", "LWPOLYLINE", "POLYLINE", "CIRCLE", "ARC", "INSERT", "SPLINE", "ELLIPSE"}]
all_entities: list[ezdxf.entities.DXFGraphic] = []
@@ -615,6 +842,73 @@ def parse_dxf_layout(file_path: Path) -> tuple[dict[str, object], list[dict[str,
"image_height": None,
"cell_gap": 0,
}
slot_map = {str(slot["slot_key"]): slot for slot in slots}
chair_segments: list[list[float]] = []
chair_items: list[dict[str, object]] = []
background_segments: list[list[float]] = []
for entity in visible_entities:
segments = entity_to_segments(entity)
if not segments:
continue
if is_chair_layer(entity.dxf.layer):
slot = slot_map.get(str(entity.dxf.handle))
if not slot:
continue
start_index = len(chair_segments)
min_seg_x = math.inf
min_seg_y = math.inf
max_seg_x = -math.inf
max_seg_y = -math.inf
for x1, y1, x2, y2 in segments:
chair_segments.append([round(x1, 3), round(y1, 3), round(x2, 3), round(y2, 3)])
min_seg_x = min(min_seg_x, x1, x2)
min_seg_y = min(min_seg_y, y1, y2)
max_seg_x = max(max_seg_x, x1, x2)
max_seg_y = max(max_seg_y, y1, y2)
chair_items.append(
{
"key": str(slot["slot_key"]),
"label": slot["label"],
"kind": "chair",
"start": start_index,
"count": len(segments),
"min_x": round(min_seg_x, 3),
"min_y": round(min_seg_y, 3),
"max_x": round(max_seg_x, 3),
"max_y": round(max_seg_y, 3),
}
)
continue
for x1, y1, x2, y2 in segments:
background_segments.append([round(x1, 3), round(y1, 3), round(x2, 3), round(y2, 3)])
viewer_data = {
"meta": {
"background_segment_count": len(background_segments),
"chair_count": len(chair_items),
"chair_segment_count": len(chair_segments),
"world": {
"min_x": round(min_x, 3),
"min_y": round(min_y, 3),
"max_x": round(min_x + width, 3),
"max_y": round(min_y + height, 3),
"width": round(width, 3),
"height": round(height, 3),
},
},
"background_segments": background_segments,
"chair_segments": chair_segments,
"chairs": chair_items,
}
return metadata, slots, viewer_data
def parse_dxf_layout(file_path: Path) -> tuple[dict[str, object], list[dict[str, object]]]:
metadata, slots, _viewer_data = build_dxf_artifacts(file_path)
return metadata, slots
@@ -660,14 +954,101 @@ def fetch_seat_layout(seat_map_id: int) -> dict[str, object]:
(seat_map_id,),
)
placements = cur.fetchall()
viewer_data: dict[str, object] | None = None
if seat_map["source_type"] == "fixed_html" and seat_map.get("source_url") == FIXED_OFFICE_SOURCE_KEY:
template = parse_fixed_office_template()
viewer_data = {
"meta": {
"chair_count": len(template["slots"]),
"office": FIXED_OFFICE_NAME,
}
}
elif seat_map["source_type"] == "dxf" and seat_map.get("source_url"):
filename = Path(str(seat_map["source_url"])).name
source_path = UPLOAD_DIR / filename
if source_path.exists():
try:
_metadata, _slots, viewer_data = build_dxf_artifacts(source_path)
except Exception:
viewer_data = None
return {
"seat_map": seat_map,
"members": members,
"slots": slots,
"placements": placements,
"viewer_data": viewer_data,
}
def build_center_chair_viewer_html(layout: dict[str, object]) -> str:
slot_key_by_id = {
int(slot["id"]): str(slot["slot_key"])
for slot in layout.get("slots", [])
if slot.get("id") is not None and slot.get("slot_key") is not None
}
placed_keys: list[str] = []
for placement in layout.get("placements", []):
slot_id = placement.get("seat_slot_id")
if slot_id is None:
continue
slot_key = slot_key_by_id.get(int(slot_id))
if slot_key:
placed_keys.append(slot_key)
seat_map = layout.get("seat_map") or {}
placed_literal = json.dumps(sorted(set(placed_keys)), ensure_ascii=False, separators=(",", ":"))
if seat_map.get("source_type") == "fixed_html":
html = parse_fixed_office_template()["html"]
else:
viewer_data = layout.get("viewer_data")
if not isinstance(viewer_data, dict):
raise HTTPException(status_code=404, detail="DXF viewer data not found.")
template_path = Path(__file__).with_name("center_chair_viewer_template.html")
if not template_path.exists():
raise HTTPException(status_code=500, detail="Viewer template not found.")
html = template_path.read_text(encoding="utf-8")
data_literal = json.dumps(viewer_data, ensure_ascii=False, separators=(",", ":"))
html = re.sub(
r"const DATA = .*?;\n\s*function decodeSegments",
f"const DATA = {data_literal};\n function decodeSegments",
html,
count=1,
flags=re.S,
)
html = html.replace(
'const STORAGE_KEY = "ptc-chair-selection";\n const placed = new Set(JSON.parse(localStorage.getItem(STORAGE_KEY) || "[]"));',
f"const STORAGE_KEY = null;\n const placed = new Set({placed_literal});",
1,
)
html = html.replace(
"function persistPlaced() {\n localStorage.setItem(STORAGE_KEY, JSON.stringify([...placed]));\n }",
"function persistPlaced() {\n return;\n }",
1,
)
bridge_script = """
<script>
window.__mhSeatmap = {
getCanvas() { return document.getElementById("canvas"); },
pickChairAt(x, y) { return typeof pickChair === "function" ? pickChair(x, y) : null; },
setPlaced(keys) {
placed.clear();
(keys || []).forEach((key) => placed.add(String(key)));
if (typeof requestDraw === "function") requestDraw();
}
};
window.addEventListener("message", (event) => {
const data = event.data;
if (!data || typeof data !== "object") return;
if (data.type === "seatmap-set-placed") {
window.__mhSeatmap.setPlaced(Array.isArray(data.keys) ? data.keys : []);
}
});
</script>
"""
html = html.replace("</body>", f"{bridge_script}\n</body>", 1)
return html
def save_seat_layout(seat_map_id: int, payload: SeatLayoutPayload) -> list[dict[str, object]]:
seat_map = fetch_seat_map(seat_map_id)
if seat_map is None:
@@ -676,11 +1057,11 @@ def save_seat_layout(seat_map_id: int, payload: SeatLayoutPayload) -> list[dict[
member_ids: list[int] = []
occupied_cells: set[tuple[int, int]] = set()
occupied_slots: set[int] = set()
is_dxf = seat_map["source_type"] == "dxf"
requires_slot = seat_map["source_type"] in {"dxf", "fixed_html"}
for item in payload.placements:
if is_dxf:
if requires_slot:
if item.seat_slot_id is None:
raise HTTPException(status_code=400, detail="DXF 자리배치도는 seat_slot_id가 필요합니다.")
raise HTTPException(status_code=400, detail="고정 도면 자리배치도는 seat_slot_id가 필요합니다.")
if item.seat_slot_id in occupied_slots:
raise HTTPException(status_code=400, detail="같은 좌석에 둘 이상의 구성원을 배치할 수 없습니다.")
occupied_slots.add(item.seat_slot_id)
@@ -705,7 +1086,7 @@ def save_seat_layout(seat_map_id: int, payload: SeatLayoutPayload) -> list[dict[
if missing_ids:
raise HTTPException(status_code=400, detail=f"존재하지 않는 구성원 ID가 포함되어 있습니다: {missing_ids}")
if is_dxf:
if requires_slot:
slot_ids = sorted(occupied_slots)
cur.execute("SELECT id FROM seat_slots WHERE seat_map_id = %s AND id = ANY(%s)", (seat_map_id, slot_ids))
existing_slot_ids = {int(row["id"]) for row in cur.fetchall()}
@@ -721,7 +1102,7 @@ def save_seat_layout(seat_map_id: int, payload: SeatLayoutPayload) -> list[dict[
for item in payload.placements:
seat_label = item.seat_label.strip() or (
slot_label_map.get(int(item.seat_slot_id), f"SLOT-{item.seat_slot_id}")
if is_dxf and item.seat_slot_id is not None
if requires_slot and item.seat_slot_id is not None
else compute_seat_label(item.row_index, item.col_index)
)
cur.execute(
@@ -739,7 +1120,7 @@ def save_seat_layout(seat_map_id: int, payload: SeatLayoutPayload) -> list[dict[
(
item.member_id,
seat_map_id,
item.seat_slot_id if is_dxf else None,
item.seat_slot_id if requires_slot else None,
item.row_index,
item.col_index,
seat_label,
@@ -1076,7 +1457,7 @@ async def create_dxf_seat_map(file: UploadFile = File(...), name: str = Form(...
@app.get("/api/seat-maps/active")
def get_active_seat_map() -> dict[str, dict[str, object]]:
seat_map = fetch_active_seat_map()
seat_map = ensure_fixed_office_seat_map(activate=True)
if seat_map is None:
raise HTTPException(status_code=404, detail="Active seat map not found.")
return {"item": seat_map}
@@ -1166,6 +1547,15 @@ def get_seat_layout(seat_map_id: int) -> dict[str, object]:
return fetch_seat_layout(seat_map_id)
@app.get("/api/seat-maps/{seat_map_id}/viewer")
def get_seat_map_viewer(seat_map_id: int) -> HTMLResponse:
layout = fetch_seat_layout(seat_map_id)
seat_map = layout.get("seat_map") or {}
if seat_map.get("source_type") not in {"dxf", "fixed_html"}:
raise HTTPException(status_code=400, detail="Viewer is only available for supported seat maps.")
return HTMLResponse(build_center_chair_viewer_html(layout))
@app.put("/api/seat-maps/{seat_map_id}/layout")
def update_seat_layout(seat_map_id: int, payload: SeatLayoutPayload) -> dict[str, list[dict[str, object]]]:
return {"items": save_seat_layout(seat_map_id, payload)}