from __future__ import annotations
import csv
import base64
from datetime import date, datetime, time, timedelta, timezone
import hashlib
import hmac
from io import BytesIO, StringIO
import json
import math
from decimal import Decimal, ROUND_HALF_UP
from pathlib import Path
import re
import secrets
import shutil
import struct
import unicodedata
import uuid
import ezdxf
from ezdxf import recover
from fastapi import FastAPI, File, Form, Header, HTTPException, Request, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse, Response
from fastapi.staticfiles import StaticFiles
from openpyxl import load_workbook
from pydantic import BaseModel, Field
from .config import BASE_DIR, LEGACY_DIR, MOCK_LOGIN_ENABLED, UPLOAD_DIR
from .db import get_conn, init_db
# FastAPI application instance; all routes and mounts below attach to it.
app = FastAPI(title="MH Dashboard Organization API")
# NOTE(review): wildcard origins combined with allow_credentials=True is a very
# permissive CORS posture — confirm this is intentional for the deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# --- Static asset locations --------------------------------------------------
LEGACY_STATIC_DIR = LEGACY_DIR / "static"
INCOMING_FILES_DIR = BASE_DIR / "incoming-files"
INCOMING_SERVED_DIR = INCOMING_FILES_DIR / "served"
INCOMING_REFERENCE_DIR = INCOMING_FILES_DIR / "reference"
# Business-ledger dashboard bundle; names are Korean ("사업관리대장" =
# business management ledger).
BUSINESS_DASHBOARD_DIR = INCOMING_FILES_DIR / "사업관리대장"
BUSINESS_DASHBOARD_WRAPPER_PATH = BUSINESS_DASHBOARD_DIR / "MH 통합 대시보드_260320.html"
BUSINESS_DASHBOARD_THEME_CSS = BUSINESS_DASHBOARD_DIR / "MH 통합 대시보드_260320.css"
# Fixed-office seat maps: one pre-rendered HTML viewer + JS payload per office.
FIXED_OFFICE_SOURCE_KEY = "technical-development-center"  # default office key
FIXED_OFFICE_CONFIGS = {
    "technical-development-center": {
        "name": "기술개발센터",
        "html_path": INCOMING_FILES_DIR / "seat" / "center_chair_people_map.html",
        "payload_path": INCOMING_FILES_DIR / "seat" / "center_chair_people_payload.js",
    },
    "hanmac-building-6f": {
        "name": "한맥빌딩 6층",
        "html_path": INCOMING_FILES_DIR / "seat" / "center_chair_people_map_6f.html",
        "payload_path": INCOMING_FILES_DIR / "seat" / "center_chair_people_payload_6f.js",
    },
    "hanmac-building-7f": {
        "name": "한맥빌딩 7층",
        "html_path": INCOMING_FILES_DIR / "seat" / "center_chair_people_map_7f.html",
        "payload_path": INCOMING_FILES_DIR / "seat" / "center_chair_people_payload_7f.js",
    },
}
# Process-lifetime caches (reset only on restart).
_fixed_office_cache: dict[str, dict[str, object]] = {}
_business_ledger_html_cache: str | None = None
BUSINESS_LEDGER_DEFAULT_SOURCE_KEY = "business_ledger_default"
# Auth settings. NOTE(review): the default password is deliberately weak —
# confirm users are forced to change it on first login.
AUTH_DEFAULT_PASSWORD = "1111"
AUTH_PASSWORD_ITERATIONS = 390000  # PBKDF2-HMAC-SHA256 rounds
AUTH_SESSION_HOURS = 12
APP_TIMEZONE = timezone(timedelta(hours=9))  # KST (UTC+9)
# Expected column order of the imported payment spreadsheet (Korean headers;
# trailing empty strings mirror unused trailing columns in the source file).
PAYMENT_HEADER_ORDER = [
    "상신회사", "청구일", "발행일", "발행월", "계정코드", "관리계정코드", "각사 계정명", "프로젝트코드",
    "사업명", "사업명(표출PJT)", "사업명(인트라넷기준)", "사업분야", "세부분야", "기획/개발/영업",
    "대분류", "중분류", "소분류", "부서명", "팀명", "거래처", "적요", "차변공급가", "대변공급가",
    "지출", "수입", "특이사항", "구분", "프로젝트성격", "", "", "", "", "", "", "", "", ""
]
# Expected column order of the imported MH work-hour spreadsheet. "사업 종류"
# repeats once per work segment (main task, extra tasks 1-5, overtime).
MH_HEADER_ORDER = [
    "No", "근무일자", "주말/지각", "팀 분류", "팀", "사원번호", "이름", "직책", "user_state", "시차시간",
    "사업 종류", "메인업무 프로젝트 코드", "메인업무 프로젝트명", "메인업무 서브 코드", "메인업무 근무시간", "검토",
    "사업 종류", "추가업무1 프로젝트 코드", "추가업무1 프로젝트명", "추가업무1 서브 코드", "추가업무1 근무시간",
    "사업 종류", "추가업무2 프로젝트 코드", "추가업무2 프로젝트명", "추가업무2 서브 코드", "추가업무2 근무시간",
    "사업 종류", "추가업무3 프로젝트 코드", "추가업무3 프로젝트명", "추가업무3 서브 코드", "추가업무3 근무시간",
    "사업 종류", "추가업무4 프로젝트 코드", "추가업무4 프로젝트명", "추가업무4 서브 코드", "추가업무4 근무시간",
    "사업 종류", "추가업무5 프로젝트 코드", "추가업무5 프로젝트명", "추가업무5 서브 코드", "추가업무5 근무시간",
    "사업 종류", "연장근무 프로젝트 코드", "연장근무 프로젝트명", "연장근무 서브코드", "연장근무 시간(실제)", "연장근무 시간(가공)"
]
def build_business_ledger_html() -> str:
    """Return the business-ledger dashboard HTML, decoding the base64 blob
    embedded in the wrapper file; the result is cached for the process
    lifetime.

    Raises:
        FileNotFoundError: wrapper file missing on disk.
        ValueError: embedded base64 payload not found in the wrapper.
    """
    global _business_ledger_html_cache
    if _business_ledger_html_cache is not None:
        return _business_ledger_html_cache
    if not BUSINESS_DASHBOARD_WRAPPER_PATH.exists():
        raise FileNotFoundError("Business dashboard wrapper file not found.")
    source = BUSINESS_DASHBOARD_WRAPPER_PATH.read_text(encoding="utf-8-sig")
    match = re.search(r"const BUSINESS_HTML_B64='([^']+)';", source)
    if not match:
        raise ValueError("Embedded business ledger source was not found.")
    decoded = base64.b64decode(match.group(1)).decode("utf-8")
    # NOTE(review): the string literals below are corrupted — HTML-like tags
    # appear to have been stripped from this file (empty '' strings and an
    # unterminated literal two lines down). Recover the original head-injection
    # markup and replace() targets from version control before shipping.
    head_injection = (
        ''
        ''
        ''
    )
    html = decoded.replace("", f"{head_injection}", 1)
    html = html.replace("
", '', 1)
    html = html.replace("", '', 1)
    _business_ledger_html_cache = html
    return html
def sync_default_business_ledger_source(cur) -> None:
    """Seed/refresh the default business-ledger workbook in the database.

    Picks the first known workbook filename present under the dashboard
    directory and upserts its bytes into integration_binary_sources; the
    upsert only touches the row when content or metadata actually changed.
    Silently returns when the directory or all candidate files are absent.
    """
    if not BUSINESS_DASHBOARD_DIR.exists():
        return
    workbook_names = (
        "사업관리대장-1.xlsx",
        "사업관리 대장-1.xlsx",
        "사업관리대장.xlsx",
        "사업관리 대장.xlsx",
    )
    source_path = None
    for name in workbook_names:
        candidate = BUSINESS_DASHBOARD_DIR / name
        if candidate.exists():
            source_path = candidate
            break
    if source_path is None:
        return
    payload = source_path.read_bytes()
    digest = hashlib.sha256(payload).hexdigest()
    metadata = json.dumps(
        {
            "byte_size": len(payload),
            "source_path": str(source_path),
            "synced_from": "startup",
        },
        ensure_ascii=False,
    )
    cur.execute(
        """
        INSERT INTO integration_binary_sources (
            source_key, source_name, filename, mime_type, content, content_sha256, meta_json, imported_at
        )
        VALUES (%s, %s, %s, %s, %s, %s, %s::jsonb, NOW())
        ON CONFLICT (source_key) DO UPDATE
        SET source_name = EXCLUDED.source_name,
            filename = EXCLUDED.filename,
            mime_type = EXCLUDED.mime_type,
            content = EXCLUDED.content,
            content_sha256 = EXCLUDED.content_sha256,
            meta_json = EXCLUDED.meta_json,
            imported_at = NOW()
        WHERE integration_binary_sources.content_sha256 IS DISTINCT FROM EXCLUDED.content_sha256
           OR integration_binary_sources.filename IS DISTINCT FROM EXCLUDED.filename
           OR integration_binary_sources.mime_type IS DISTINCT FROM EXCLUDED.mime_type
           OR integration_binary_sources.meta_json IS DISTINCT FROM EXCLUDED.meta_json
        """,
        (
            BUSINESS_LEDGER_DEFAULT_SOURCE_KEY,
            "사업관리대장 기본 원본",
            source_path.name,
            "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            payload,
            digest,
            metadata,
        ),
    )
# Serve the ledger dashboard's sibling assets (CSS/JS/images) straight from
# disk. check_dir=False skips the existence check so startup succeeds even
# when the incoming-files bundle has not been provisioned yet.
app.mount(
    "/integrations/ledger-assets",
    StaticFiles(directory=str(BUSINESS_DASHBOARD_DIR), check_dir=False),
    name="integration-ledger-assets",
)
class MemberPayload(BaseModel):
    """One organization member as submitted by the admin UI.

    All text fields default to empty strings; ``id`` is None for inserts and
    set when updating an existing row.
    """

    id: int | None = None
    name: str = Field(min_length=1)  # only required field
    employee_id: str = ""
    company: str = ""
    rank: str = ""
    role: str = ""
    department: str = ""
    grp: str = ""  # presumably short for "group" — TODO confirm
    division: str = ""
    team: str = ""
    cell: str = ""
    work_status: str = ""
    work_time: str = ""
    phone: str = ""
    email: str = ""
    seat_label: str = ""
    photo_url: str = ""
    sort_order: int | None = None
class MemberBulkPayload(BaseModel):
    """Bulk member upsert request: the full list of members to apply."""

    items: list[MemberPayload]
class SeatMapPayload(BaseModel):
    """Seat map definition (image- or SVG-based) with an optional grid
    overlay used for slot placement."""

    name: str = Field(min_length=1)
    image_url: str = ""
    source_type: str = "image"  # e.g. "image"; blank falls back to "image" on serialization
    source_url: str = ""
    preview_svg: str = ""
    # SVG viewBox components; None when not SVG-backed.
    view_box_min_x: float | None = None
    view_box_min_y: float | None = None
    view_box_width: float | None = None
    view_box_height: float | None = None
    image_width: int | None = None
    image_height: int | None = None
    grid_rows: int = Field(default=1, ge=1, le=200)
    grid_cols: int = Field(default=1, ge=1, le=200)
    cell_gap: int = Field(default=0, ge=0, le=24)
    is_active: bool = True
class SeatPlacementPayload(BaseModel):
    """Assignment of one member to a seat: either a concrete slot id or a
    grid cell (row/col) plus an optional display label."""

    member_id: int
    seat_slot_id: int | None = None
    row_index: int = Field(default=0, ge=0)
    col_index: int = Field(default=0, ge=0)
    seat_label: str = ""
class SeatLayoutPayload(BaseModel):
    """Full seat layout submission: every placement for the active map."""

    placements: list[SeatPlacementPayload]
# Maps legacy spreadsheet column headers (Korean plus abbreviated English
# variants) onto member field names for bulk imports.
# NOTE(review): "teal" looks like a typo alias for "team" carried over from
# legacy files — confirm before removing it.
LEGACY_HEADER_MAP = {
    "이름": "name",
    "name": "name",
    "tag": "employee_id",
    "employee_id": "employee_id",
    "소속회사": "company",
    "co": "company",
    "company": "company",
    "직급": "rank",
    "rank": "rank",
    "직책": "role",
    "pos": "role",
    "role": "role",
    "부서": "department",
    "part": "department",
    "department": "department",
    "그룹": "grp",
    "gr": "grp",
    "grp": "grp",
    "디비전": "division",
    "div": "division",
    "division": "division",
    "팀": "team",
    "team": "team",
    "teal": "team",
    "셀": "cell",
    "cell": "cell",
    "근무상태": "work_status",
    "work_status": "work_status",
    "근무시간": "work_time",
    "work_time": "work_time",
    "전화번호": "phone",
    "ph": "phone",
    "phone": "phone",
    "이메일": "email",
    "mail": "email",
    "email": "email",
    "자리위치": "seat_label",
    "seat_label": "seat_label",
    "사진": "photo_url",
    "photo_url": "photo_url",
}
def normalize_phone(value: object) -> str:
    """Normalize a phone value into dashed Korean mobile/landline format.

    Digits are extracted from the input; a 10-digit number missing its
    leading trunk "0" gets one prepended. Recognized lengths are formatted
    as 0XX-XXXX-XXXX (11 digits) or 0XX-XXX-XXXX (10 digits); anything
    else is returned as the stripped original text.

    NOTE(review): 10-digit Seoul numbers (02-XXXX-XXXX) also get the
    3-3-4 split here — confirm that is the intended rendering.
    """
    text = str(value or "").strip()
    digits = "".join(filter(str.isdigit, text))
    if not digits:
        return ""
    if len(digits) == 10 and not digits.startswith("0"):
        digits = "0" + digits
    if digits.startswith("0"):
        if len(digits) == 11:
            return "-".join((digits[:3], digits[3:7], digits[7:]))
        if len(digits) == 10:
            return "-".join((digits[:3], digits[3:6], digits[6:]))
    return text
def serialize_member_payload(item: MemberPayload, sort_order: int) -> tuple[object, ...]:
    """Flatten a MemberPayload into the positional tuple used by member
    INSERT/UPDATE statements: twelve stripped text fields, the normalized
    phone, three more stripped fields, then the sort order."""
    head = tuple(
        text.strip()
        for text in (
            item.name, item.employee_id, item.company, item.rank,
            item.role, item.department, item.grp, item.division,
            item.team, item.cell, item.work_status, item.work_time,
        )
    )
    tail = (
        normalize_phone(item.phone),
        item.email.strip(),
        item.seat_label.strip(),
        item.photo_url.strip(),
        sort_order,
    )
    return head + tail
def _encode_auth_bytes(value: bytes) -> str:
return base64.urlsafe_b64encode(value).decode("ascii").rstrip("=")
def _decode_auth_bytes(value: str) -> bytes:
padded = value + "=" * (-len(value) % 4)
return base64.urlsafe_b64decode(padded.encode("ascii"))
def hash_password(password: str, *, salt: bytes | None = None) -> str:
    """Derive a PBKDF2-HMAC-SHA256 hash packed into the stored format
    'pbkdf2_sha256$<iterations>$<salt>$<digest>' (base64url, unpadded).

    A random 16-byte salt is generated when none (or an empty one) is
    supplied.
    """
    salt_bytes = salt or secrets.token_bytes(16)
    digest = hashlib.pbkdf2_hmac(
        "sha256",
        password.encode("utf-8"),
        salt_bytes,
        AUTH_PASSWORD_ITERATIONS,
    )
    parts = (
        "pbkdf2_sha256",
        str(AUTH_PASSWORD_ITERATIONS),
        _encode_auth_bytes(salt_bytes),
        _encode_auth_bytes(digest),
    )
    return "$".join(parts)
def verify_password(password: str, stored_hash: str) -> bool:
    """Constant-time check of *password* against a stored PBKDF2 hash.

    Any malformed or unknown-format stored hash yields False rather than
    raising.
    """
    try:
        scheme, iter_text, salt_text, digest_text = stored_hash.split("$", 3)
        if scheme != "pbkdf2_sha256":
            return False
        rounds = int(iter_text)
        salt = _decode_auth_bytes(salt_text)
        expected = _decode_auth_bytes(digest_text)
    except Exception:
        # Wrong field count, bad base64, or non-numeric rounds: no match.
        return False
    candidate = hashlib.pbkdf2_hmac(
        "sha256",
        password.encode("utf-8"),
        salt,
        rounds,
    )
    return hmac.compare_digest(candidate, expected)
def serialize_auth_user(user: dict[str, object]) -> dict[str, object]:
    """Shape a raw auth.users row into the public session-user dict; the
    role falls back to 'admin' and missing text fields become ''."""
    raw_member_id = user.get("member_id")
    return {
        "id": int(user["id"]),
        "username": str(user.get("username") or ""),
        "display_name": str(user.get("display_name") or ""),
        "role": str(user.get("role") or "admin"),
        "member_id": None if raw_member_id is None else int(user["member_id"]),
        "rank": str(user.get("rank") or ""),
    }
def build_auth_session_payload(user: dict[str, object], session_id: uuid.UUID, expires_at: datetime) -> dict[str, object]:
    """Assemble the login response: bearer token (the session UUID), the
    public user dict, and the expiry rendered as a UTC ISO-8601 'Z'
    timestamp."""
    expiry_utc = expires_at.astimezone(timezone.utc)
    return {
        "token": str(session_id),
        "user": serialize_auth_user(user),
        "session_expires_at": expiry_utc.isoformat().replace("+00:00", "Z"),
    }
def extract_bearer_token(authorization: str | None) -> str | None:
if not authorization:
return None
scheme, _, token = authorization.partition(" ")
if scheme.lower() != "bearer" or not token.strip():
return None
return token.strip()
def ensure_default_admin_user(cur) -> None:
    """Upsert the built-in bootstrap admin account (username '1').

    NOTE(review): the seeded credential is trivially guessable and the
    upsert resets its password hash on every call — confirm this is an
    intentional dev/bootstrap behavior before production use.
    """
    seed_row = ("1", hash_password("1"), "System Admin", "admin")
    cur.execute(
        """
        INSERT INTO auth.users (
            username, password_hash, display_name, role, member_id, is_active, created_from, password_changed_at
        )
        VALUES (%s, %s, %s, %s, NULL, TRUE, 'seed_admin', NOW())
        ON CONFLICT (username) DO UPDATE
        SET password_hash = EXCLUDED.password_hash,
            display_name = EXCLUDED.display_name,
            role = EXCLUDED.role,
            is_active = TRUE,
            updated_at = NOW()
        """,
        seed_row,
    )
def sync_auth_users_from_members(cur) -> None:
    """Mirror the members table into auth.users login accounts.

    Every member with a non-blank employee_id gets an account whose username
    is the lower-cased employee_id; brand-new accounts start with the default
    password. Accounts previously created by this sync whose member no longer
    matches are deactivated, and the built-in admin is (re)seeded last.

    Raises:
        HTTPException: 400 when two members share the same employee_id.
    """
    cur.execute(
        """
        SELECT id, employee_id, name
        FROM members
        WHERE COALESCE(TRIM(employee_id), '') <> ''
        ORDER BY id ASC
        """
    )
    members = cur.fetchall()
    cur.execute(
        """
        SELECT id, username, password_hash, display_name, role, member_id, is_active, created_from
        FROM auth.users
        """
    )
    existing_users = cur.fetchall()
    # Index existing accounts both ways: the stable member_id link wins,
    # with username as a fallback for accounts not yet linked.
    existing_by_member_id: dict[int, dict[str, object]] = {}
    existing_by_username: dict[str, dict[str, object]] = {}
    for user in existing_users:
        if user.get("member_id") is not None:
            existing_by_member_id[int(user["member_id"])] = user
        username = str(user.get("username") or "").strip().lower()
        if username:
            existing_by_username[username] = user
    matched_user_ids: set[int] = set()
    seen_usernames: set[str] = set()
    default_hash = hash_password(AUTH_DEFAULT_PASSWORD)
    for member in members:
        member_id = int(member["id"])
        username = str(member.get("employee_id") or "").strip().lower()
        display_name = str(member.get("name") or "").strip() or username
        # Duplicate employee_ids would collide on the unique username.
        if username in seen_usernames:
            raise HTTPException(status_code=400, detail=f"중복 사번이 있어 로그인 계정을 생성할 수 없습니다: {username}")
        seen_usernames.add(username)
        existing = existing_by_member_id.get(member_id) or existing_by_username.get(username)
        if existing is None:
            cur.execute(
                """
                INSERT INTO auth.users (
                    username, password_hash, display_name, role, member_id, is_active, created_from, password_changed_at
                )
                VALUES (%s, %s, %s, %s, %s, TRUE, 'member_import', NOW())
                RETURNING id
                """,
                (username, default_hash, display_name, "admin", member_id),
            )
            matched_user_ids.add(int(cur.fetchone()["id"]))
            continue
        matched_user_ids.add(int(existing["id"]))
        # Preserve an already-set password; only blank hashes get the default.
        password_hash = str(existing.get("password_hash") or "").strip() or default_hash
        cur.execute(
            """
            UPDATE auth.users
            SET username = %s,
                password_hash = %s,
                display_name = %s,
                member_id = %s,
                is_active = TRUE,
                updated_at = NOW()
            WHERE id = %s
            """,
            (
                username,
                password_hash,
                display_name,
                member_id,
                int(existing["id"]),
            ),
        )
    # Deactivate imported accounts whose member disappeared from this sync.
    if matched_user_ids:
        cur.execute(
            """
            UPDATE auth.users
            SET is_active = FALSE,
                member_id = NULL,
                updated_at = NOW()
            WHERE created_from = 'member_import'
              AND id <> ALL(%s)
              AND member_id IS NOT NULL
            """,
            (sorted(matched_user_ids),),
        )
    else:
        cur.execute(
            """
            UPDATE auth.users
            SET is_active = FALSE,
                member_id = NULL,
                updated_at = NOW()
            WHERE created_from = 'member_import'
              AND member_id IS NOT NULL
            """
        )
    ensure_default_admin_user(cur)
def fetch_members() -> list[dict[str, object]]:
    """Load every member row, ordered by explicit sort_order then id."""
    query = """
        SELECT id, name, employee_id, company, rank, role, department, grp, division, team, cell,
               work_status, work_time, phone, email, seat_label, photo_url,
               sort_order, created_at, updated_at
        FROM members
        ORDER BY sort_order ASC, id ASC
        """
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(query)
        return cur.fetchall()
def parse_as_of(as_of: str | None) -> datetime | None:
    """Parse an optional as-of query value.

    Accepts a plain date (YYYY-MM-DD), interpreted as end-of-day in the app
    timezone, or a full ISO datetime (a trailing 'Z' is allowed); naive
    datetimes are assumed to be in the app timezone. Blank input yields
    None.

    Raises:
        HTTPException: 400 when the value matches neither format.
    """
    text = str(as_of or "").strip()
    if not text:
        return None
    try:
        if "T" not in text:
            day = date.fromisoformat(text)
            return datetime.combine(day, time.max, tzinfo=APP_TIMEZONE)
        parsed = datetime.fromisoformat(text.replace("Z", "+00:00"))
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="Invalid as_of format. Use YYYY-MM-DD or ISO datetime.") from exc
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=APP_TIMEZONE)
    return parsed
def create_history_revision(cur, label_prefix: str, note: str) -> int:
    """Insert an 'organization' history revision and return its new id.

    The revision label is the prefix plus a microsecond timestamp, which
    keeps labels unique across rapid successive revisions.
    """
    stamp = datetime.now(APP_TIMEZONE).strftime("%Y%m%d-%H%M%S-%f")
    cur.execute(
        """
        INSERT INTO history_revisions (scope, revision_label, note)
        VALUES ('organization', %s, %s)
        RETURNING id
        """,
        (f"{label_prefix}-{stamp}", note),
    )
    return int(cur.fetchone()["id"])
def fetch_current_member_state(cur) -> dict[int, dict[str, object]]:
    """Snapshot every live member row, keyed by integer member id."""
    cur.execute(
        """
        SELECT id, name, employee_id, company, rank, role, department, grp, division, team, cell,
               work_status, work_time, phone, email, seat_label, photo_url, sort_order,
               created_at, updated_at
        FROM members
        """
    )
    rows = cur.fetchall()
    return {int(row["id"]): row for row in rows}
def sync_member_versions(cur, member_ids: list[int], change_reason: str, revision_no: int) -> None:
    """Maintain the member_versions slowly-changing history for the given
    members.

    For each member: close the open version when the member was deleted; do
    nothing when none of the tracked fields changed; otherwise close the
    open version (if any) and insert a new one stamped with *revision_no*
    and *change_reason*.
    """
    if not member_ids:
        return
    unique_ids = sorted(set(int(member_id) for member_id in member_ids))
    current_members = fetch_current_member_state(cur)
    cur.execute(
        """
        SELECT id, member_id, name, company, rank, role, department, grp, division, team, cell,
               work_status, work_time, phone, email, photo_url, valid_from, valid_to
        FROM member_versions
        WHERE member_id = ANY(%s)
          AND valid_to IS NULL
        """,
        (unique_ids,),
    )
    active_versions = {int(row["member_id"]): row for row in cur.fetchall()}
    for member_id in unique_ids:
        current = current_members.get(member_id)
        active = active_versions.get(member_id)
        if current is None:
            # Member was deleted: close its open version, nothing to insert.
            if active is not None:
                cur.execute(
                    "UPDATE member_versions SET valid_to = NOW() WHERE id = %s AND valid_to IS NULL",
                    (int(active["id"]),),
                )
            continue
        # Normalized tuple of the tracked (versioned) fields for comparison;
        # NULLs and empty strings are treated alike.
        current_tuple = (
            str(current.get("name") or ""),
            str(current.get("company") or ""),
            str(current.get("rank") or ""),
            str(current.get("role") or ""),
            str(current.get("department") or ""),
            str(current.get("grp") or ""),
            str(current.get("division") or ""),
            str(current.get("team") or ""),
            str(current.get("cell") or ""),
            str(current.get("work_status") or ""),
            str(current.get("work_time") or ""),
            str(current.get("phone") or ""),
            str(current.get("email") or ""),
            str(current.get("photo_url") or ""),
        )
        active_tuple = None
        if active is not None:
            active_tuple = (
                str(active.get("name") or ""),
                str(active.get("company") or ""),
                str(active.get("rank") or ""),
                str(active.get("role") or ""),
                str(active.get("department") or ""),
                str(active.get("grp") or ""),
                str(active.get("division") or ""),
                str(active.get("team") or ""),
                str(active.get("cell") or ""),
                str(active.get("work_status") or ""),
                str(active.get("work_time") or ""),
                str(active.get("phone") or ""),
                str(active.get("email") or ""),
                str(active.get("photo_url") or ""),
            )
        if active_tuple == current_tuple:
            # No tracked field changed; keep the open version as-is.
            continue
        if active is not None:
            cur.execute(
                "UPDATE member_versions SET valid_to = NOW() WHERE id = %s AND valid_to IS NULL",
                (int(active["id"]),),
            )
        cur.execute(
            """
            INSERT INTO member_versions (
                member_id, name, company, rank, role, department, grp, division, team, cell,
                work_status, work_time, phone, email, photo_url,
                valid_from, valid_to, revision_no, changed_by_user_id, change_reason
            )
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NULL, %s, NULL, %s)
            """,
            (member_id, *current_tuple, revision_no, change_reason),
        )
def fetch_current_seat_assignments(cur) -> dict[int, dict[str, object]]:
    """Snapshot every live seat assignment, keyed by integer member id."""
    cur.execute(
        """
        SELECT member_id, seat_map_id, seat_slot_id, seat_label, updated_at
        FROM seat_positions
        """
    )
    rows = cur.fetchall()
    return {int(row["member_id"]): row for row in rows}
def sync_seat_assignment_versions(cur, member_ids: list[int], change_reason: str, revision_no: int) -> None:
    """Maintain seat_assignment_versions history for the given members.

    Mirrors sync_member_versions: when the seat assignment changed or
    disappeared the open version is closed, and a fresh version row is
    inserted for a new or changed assignment.
    """
    if not member_ids:
        return
    unique_ids = sorted(set(int(member_id) for member_id in member_ids))
    current_assignments = fetch_current_seat_assignments(cur)
    cur.execute(
        """
        SELECT id, member_id, seat_map_id, seat_slot_id, seat_label
        FROM seat_assignment_versions
        WHERE member_id = ANY(%s)
          AND valid_to IS NULL
        """,
        (unique_ids,),
    )
    active_versions = {int(row["member_id"]): row for row in cur.fetchall()}
    for member_id in unique_ids:
        current = current_assignments.get(member_id)
        active = active_versions.get(member_id)
        # Comparable tuples; None means "no assignment" on either side.
        current_tuple = None
        if current is not None:
            current_tuple = (
                current.get("seat_map_id"),
                current.get("seat_slot_id"),
                str(current.get("seat_label") or ""),
            )
        active_tuple = None
        if active is not None:
            active_tuple = (
                active.get("seat_map_id"),
                active.get("seat_slot_id"),
                str(active.get("seat_label") or ""),
            )
        if active_tuple == current_tuple:
            # Unchanged (including both absent): nothing to record.
            continue
        if active is not None:
            cur.execute(
                "UPDATE seat_assignment_versions SET valid_to = NOW() WHERE id = %s AND valid_to IS NULL",
                (int(active["id"]),),
            )
        if current is None:
            # Assignment removed: open version closed above, nothing to add.
            continue
        cur.execute(
            """
            INSERT INTO seat_assignment_versions (
                member_id, seat_map_id, seat_slot_id, seat_label,
                valid_from, valid_to, revision_no, changed_by_user_id, change_reason
            )
            VALUES (%s, %s, %s, %s, NOW(), NULL, %s, NULL, %s)
            """,
            (
                member_id,
                current.get("seat_map_id"),
                current.get("seat_slot_id"),
                str(current.get("seat_label") or ""),
                revision_no,
                change_reason,
            ),
        )
def fetch_members_as_of(cur, as_of: datetime) -> list[dict[str, object]]:
    """Reconstruct the member list as it looked at *as_of*.

    Joins the member and seat-assignment version tables on the validity
    interval containing as_of. employee_id and sort_order come from the
    live members row when it still exists; members since deleted sort last
    via the 2147483647 sentinel.
    """
    cur.execute(
        """
        SELECT mv.member_id AS id,
               mv.name,
               COALESCE(m.employee_id, '') AS employee_id,
               mv.company,
               mv.rank,
               mv.role,
               mv.department,
               mv.grp,
               mv.division,
               mv.team,
               mv.cell,
               mv.work_status,
               mv.work_time,
               mv.phone,
               mv.email,
               COALESCE(sav.seat_label, '') AS seat_label,
               mv.photo_url,
               COALESCE(m.sort_order, 2147483647) AS sort_order,
               mv.created_at,
               mv.valid_from AS updated_at,
               mv.valid_to AS history_valid_to
        FROM member_versions mv
        LEFT JOIN members m
          ON m.id = mv.member_id
        LEFT JOIN seat_assignment_versions sav
          ON sav.member_id = mv.member_id
         AND sav.valid_from <= %s
         AND (sav.valid_to IS NULL OR sav.valid_to > %s)
        WHERE mv.valid_from <= %s
          AND (mv.valid_to IS NULL OR mv.valid_to > %s)
        ORDER BY COALESCE(m.sort_order, 2147483647) ASC, mv.member_id ASC
        """,
        (as_of, as_of, as_of, as_of),
    )
    return cur.fetchall()
def build_member_compare_items(from_items: list[dict[str, object]], to_items: list[dict[str, object]]) -> list[dict[str, object]]:
tracked_fields = (
("company", "소속회사", "기본"),
("rank", "직급", "기본"),
("role", "직책", "기본"),
("department", "부서", "조직"),
("grp", "그룹", "조직"),
("division", "디비전", "조직"),
("team", "팀", "조직"),
("cell", "셀", "조직"),
("work_status", "근무상태", "기본"),
("work_time", "근무시간", "기본"),
("phone", "전화번호", "기본"),
("email", "이메일", "기본"),
)
def build_summary(item: dict[str, object] | None) -> list[str]:
if not item:
return []
summary_fields = (
("rank", "직급"),
("role", "직책"),
("department", "부서"),
("grp", "그룹"),
("division", "디비전"),
("team", "팀"),
("cell", "셀"),
)
lines: list[str] = []
for field, label in summary_fields:
value = str(item.get(field) or "").strip()
if value:
lines.append(f"{label}: {value}")
return lines
from_map = {int(item["id"]): item for item in from_items}
to_map = {int(item["id"]): item for item in to_items}
all_ids = sorted(set(from_map) | set(to_map))
items: list[dict[str, object]] = []
for member_id in all_ids:
before = from_map.get(member_id)
after = to_map.get(member_id)
if before is None and after is not None:
items.append(
{
"member_id": member_id,
"name": str(after.get("name") or "-"),
"status": "added",
"status_label": "신규",
"categories": ["신규"],
"changed_at": after.get("updated_at"),
"changes": [],
"before_lines": [],
"after_lines": build_summary(after),
}
)
continue
if before is not None and after is None:
items.append(
{
"member_id": member_id,
"name": str(before.get("name") or "-"),
"status": "removed",
"status_label": "삭제",
"categories": ["삭제"],
"changed_at": before.get("history_valid_to") or before.get("updated_at"),
"changes": [],
"before_lines": build_summary(before),
"after_lines": [],
}
)
continue
if before is None or after is None:
continue
changes: list[dict[str, str]] = []
categories: set[str] = set()
for field, label, category in tracked_fields:
before_value = str(before.get(field) or "").strip()
after_value = str(after.get(field) or "").strip()
if before_value == after_value:
continue
changes.append(
{
"field": field,
"label": label,
"before": before_value,
"after": after_value,
}
)
categories.add(category)
if not changes:
continue
items.append(
{
"member_id": member_id,
"name": str(after.get("name") or before.get("name") or "-"),
"status": "updated",
"status_label": "변경",
"categories": sorted(categories),
"changed_at": after.get("updated_at") or before.get("updated_at"),
"changes": changes,
"before_lines": [f"{change['label']}: {change['before'] or '-'}" for change in changes],
"after_lines": [f"{change['label']}: {change['after'] or '-'}" for change in changes],
}
)
order_map = {"added": 0, "updated": 1, "removed": 2}
items.sort(key=lambda item: (order_map.get(str(item.get("status") or ""), 9), str(item.get("name") or ""), int(item.get("member_id") or 0)))
return items
def serialize_seat_map_payload(payload: SeatMapPayload) -> tuple[object, ...]:
    """Flatten a SeatMapPayload into the positional tuple consumed by the
    seat_maps INSERT/UPDATE statements; a blank source_type falls back to
    'image'."""
    source_type = payload.source_type.strip()
    return (
        payload.name.strip(),
        source_type if source_type else "image",
        payload.source_url.strip(),
        payload.preview_svg,
        payload.view_box_min_x,
        payload.view_box_min_y,
        payload.view_box_width,
        payload.view_box_height,
        payload.image_url.strip(),
        payload.image_width,
        payload.image_height,
        payload.grid_rows,
        payload.grid_cols,
        payload.cell_gap,
        payload.is_active,
    )
def fetch_seat_map(seat_map_id: int) -> dict[str, object] | None:
    """Load a single seat map row by id, or None when it does not exist."""
    query = """
        SELECT id, name, source_type, source_url, preview_svg,
               view_box_min_x, view_box_min_y, view_box_width, view_box_height,
               image_url, image_width, image_height, grid_rows, grid_cols,
               cell_gap, is_active, created_at, updated_at
        FROM seat_maps
        WHERE id = %s
        """
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(query, (seat_map_id,))
        return cur.fetchone()
def fetch_active_seat_map() -> dict[str, object] | None:
    """Return the most recently updated active seat map, or None when no
    map is marked active."""
    query = """
        SELECT id, name, source_type, source_url, preview_svg,
               view_box_min_x, view_box_min_y, view_box_width, view_box_height,
               image_url, image_width, image_height, grid_rows, grid_cols,
               cell_gap, is_active, created_at, updated_at
        FROM seat_maps
        WHERE is_active = TRUE
        ORDER BY updated_at DESC, id DESC
        LIMIT 1
        """
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(query)
        return cur.fetchone()
def ensure_fixed_office_seat_map(office_key: str = FIXED_OFFICE_SOURCE_KEY, activate: bool = True) -> dict[str, object]:
    """Create or refresh the DB seat map backing a fixed-office HTML viewer.

    Parses the office template for its chair slots, upserts the seat_maps
    row (source_type 'fixed_html', source_url = office key), reconciles the
    seat_slots rows against the parsed slots, and optionally makes this map
    the single active one.

    Raises:
        HTTPException: 404 for an unknown office key; 500 when the map
            cannot be re-read after initialization.
    """
    config = FIXED_OFFICE_CONFIGS.get(office_key)
    if not config:
        raise HTTPException(status_code=404, detail="Fixed office configuration not found.")
    template = parse_fixed_office_template(office_key)
    slots = template["slots"]
    with get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT id
                FROM seat_maps
                WHERE source_type = 'fixed_html'
                  AND source_url = %s
                LIMIT 1
                """,
                (office_key,),
            )
            row = cur.fetchone()
            if activate:
                # At most one map may be active at a time.
                cur.execute("UPDATE seat_maps SET is_active = FALSE, updated_at = NOW() WHERE is_active = TRUE")
            if row is None:
                cur.execute(
                    """
                    INSERT INTO seat_maps (
                        name, image_url, source_type, source_url, preview_svg,
                        view_box_min_x, view_box_min_y, view_box_width, view_box_height,
                        image_width, image_height, grid_rows, grid_cols, cell_gap, is_active
                    )
                    VALUES (%s, '', 'fixed_html', %s, '', NULL, NULL, NULL, NULL, NULL, NULL, 1, 1, 0, %s)
                    RETURNING id
                    """,
                    (str(config["name"]), office_key, activate),
                )
                seat_map_id = int(cur.fetchone()["id"])
            else:
                seat_map_id = int(row["id"])
                # Re-assert the canonical fixed_html attributes on refresh.
                cur.execute(
                    """
                    UPDATE seat_maps
                    SET name = %s,
                        source_type = 'fixed_html',
                        source_url = %s,
                        image_url = '',
                        preview_svg = '',
                        grid_rows = 1,
                        grid_cols = 1,
                        cell_gap = 0,
                        is_active = %s,
                        updated_at = NOW()
                    WHERE id = %s
                    """,
                    (str(config["name"]), office_key, activate, seat_map_id),
                )
            # Reconcile slot rows: update existing keys, insert new ones,
            # then drop rows whose key vanished from the template.
            cur.execute("SELECT id, slot_key FROM seat_slots WHERE seat_map_id = %s", (seat_map_id,))
            existing_slots = {str(item["slot_key"]): int(item["id"]) for item in cur.fetchall()}
            incoming_keys = {str(slot["slot_key"]) for slot in slots}
            for slot in slots:
                slot_key = str(slot["slot_key"])
                if slot_key in existing_slots:
                    cur.execute(
                        """
                        UPDATE seat_slots
                        SET label = %s, x = %s, y = %s, rotation = %s, layer_name = %s, updated_at = NOW()
                        WHERE seat_map_id = %s AND slot_key = %s
                        """,
                        (
                            slot["label"],
                            slot["x"],
                            slot["y"],
                            slot["rotation"],
                            slot["layer_name"],
                            seat_map_id,
                            slot_key,
                        ),
                    )
                else:
                    cur.execute(
                        """
                        INSERT INTO seat_slots (seat_map_id, slot_key, label, x, y, rotation, layer_name)
                        VALUES (%s, %s, %s, %s, %s, %s, %s)
                        """,
                        (
                            seat_map_id,
                            slot_key,
                            slot["label"],
                            slot["x"],
                            slot["y"],
                            slot["rotation"],
                            slot["layer_name"],
                        ),
                    )
            if existing_slots:
                stale_keys = [key for key in existing_slots if key not in incoming_keys]
                if stale_keys:
                    cur.execute(
                        "DELETE FROM seat_slots WHERE seat_map_id = %s AND slot_key = ANY(%s)",
                        (seat_map_id, stale_keys),
                    )
        conn.commit()
    seat_map = fetch_seat_map(seat_map_id)
    if seat_map is None:
        raise HTTPException(status_code=500, detail="Fixed office seat map initialization failed.")
    return seat_map
def compute_seat_label(row_index: int, col_index: int) -> str:
    """Build a spreadsheet-style seat label, e.g. row 0 / col 0 -> 'A-01'.

    Rows use bijective base-26 letters (A..Z, AA, AB, ...); columns are
    1-based and zero-padded to two digits.
    """
    letters: list[str] = []
    remaining = row_index
    while True:
        remaining, digit = divmod(remaining, 26)
        letters.append(chr(ord("A") + digit))
        if remaining == 0:
            break
        remaining -= 1  # bijective numbering: no zero digit
    row_label = "".join(reversed(letters))
    return f"{row_label}-{col_index + 1:02d}"
def compute_slot_label(index: int) -> str:
    """Default label for the index-th chair slot (1-based, zero-padded)."""
    return "CHAIR-{:03d}".format(index + 1)
def decode_segment_values(raw_base64: str) -> list[int]:
    """Decode a base64 blob of packed little-endian int16 values.

    The seat-map payloads store chair segment coordinates as a flat
    sequence of signed 16-bit integers (tenths of a drawing unit).
    Returns an empty list for empty input.
    """
    decoded = base64.b64decode(raw_base64.encode("ascii"))
    if not decoded:
        return []
    # BUGFIX(reconstruction): the struct format string was corrupted in this
    # file by tag stripping; "<h" (little-endian int16) matches the /10
    # fixed-point decoding performed by the consumer below.
    return [item[0] for item in struct.iter_unpack("<h", decoded)]
def parse_fixed_office_template(office_key: str) -> dict[str, object]:
    """Load and cache the fixed-office HTML viewer plus its chair slots.

    Reads the office's HTML template and payload JS, inlines the payload
    into the HTML, and derives one slot per chair entry (the center of the
    chair's segment bounding box).

    Raises:
        HTTPException: 404 for an unknown office key; 500 when the template
            or payload files are missing or unparsable.
    """
    cached = _fixed_office_cache.get(office_key)
    if cached is not None:
        return cached
    config = FIXED_OFFICE_CONFIGS.get(office_key)
    if not config:
        raise HTTPException(status_code=404, detail="Fixed office configuration not found.")
    html_path = Path(str(config["html_path"]))
    payload_path = Path(str(config["payload_path"]))
    if not html_path.exists():
        raise HTTPException(status_code=500, detail=f"Fixed office viewer template not found: {office_key}")
    if not payload_path.exists():
        raise HTTPException(status_code=500, detail=f"Fixed office payload not found: {office_key}")
    html = html_path.read_text(encoding="utf-8")
    payload_js = payload_path.read_text(encoding="utf-8")
    payload_match = re.search(r"window\.CHAIR_MAP_DATA\s*=\s*(\{.*\});?\s*$", payload_js, flags=re.S)
    if not payload_match:
        raise HTTPException(status_code=500, detail=f"Fixed office viewer data not found: {office_key}")
    # NOTE(review): the pattern/replacement literals below were corrupted by
    # tag stripping (they originally carried <script> markup inlining the
    # payload into the HTML). As written this re.sub is effectively a no-op;
    # restore the original literals from version control.
    html = re.sub(
        r'',
        f"",
        html,
        count=1,
    )
    data = json.loads(payload_match.group(1))
    chair_values = decode_segment_values(str(data["chairSegsB64"]))
    slots: list[dict[str, object]] = []
    for chair in data["chairs"]:
        slot_key, name, _kind, start, count = chair
        min_x = math.inf
        min_y = math.inf
        max_x = -math.inf
        max_y = -math.inf
        start_index = int(start)
        end_index = start_index + int(count)
        # Each segment is four int16 values (x1, y1, x2, y2) in tenths.
        for item_index in range(start_index, end_index):
            offset = item_index * 4
            x1 = chair_values[offset] / 10
            y1 = chair_values[offset + 1] / 10
            x2 = chair_values[offset + 2] / 10
            y2 = chair_values[offset + 3] / 10
            min_x = min(min_x, x1, x2)
            min_y = min(min_y, y1, y2)
            max_x = max(max_x, x1, x2)
            max_y = max(max_y, y1, y2)
        slots.append(
            {
                "slot_key": str(slot_key),
                "label": str(slot_key),
                "x": round((min_x + max_x) / 2, 3),
                "y": round((min_y + max_y) / 2, 3),
                "rotation": 0.0,
                "layer_name": str(name),
            }
        )
    parsed = {
        "html": html,
        "data": data,
        "slots": slots,
    }
    _fixed_office_cache[office_key] = parsed
    return parsed
def is_chair_layer(layer_name: str) -> bool:
    """Heuristic: does a DXF layer name denote chairs?

    Matches 'chair' (optionally prefixed with '-' or '_') exactly, or any
    name ending in 'chair' once dashes, underscores and spaces are removed.
    """
    normalized = layer_name.strip().lower()
    if normalized in ("chair", "_chair", "-chair"):
        return True
    collapsed = normalized.translate(str.maketrans("", "", "-_ "))
    return collapsed.endswith("chair")
def inspect_dxf_header(file_path: Path) -> tuple[str, str]:
    """Sniff the first 128 bytes of a CAD file and classify it.

    Returns (kind, preview) where kind is one of 'binary_dxf', 'ascii_dxf',
    'dwg_or_dwg_like' or 'unknown', and preview is a space-separated hex
    dump of the first 32 bytes for diagnostics.
    """
    with file_path.open("rb") as source:
        header = source.read(128)
    preview = header[:32].hex(" ")
    # NUL bytes are dropped so the binary-DXF sentinel text survives decoding.
    header_text = header.decode("latin-1", errors="ignore").replace("\x00", "")
    if header_text.startswith("AutoCAD Binary DXF"):
        return ("binary_dxf", preview)
    # ASCII DXF begins with group code 0 / value SECTION. Group codes are
    # commonly right-justified with leading spaces ("  0"), which an exact
    # prefix check would miss, so match tolerantly.
    if re.match(r"\s*0\s*\r?\n\s*SECTION", header_text):
        return ("ascii_dxf", preview)
    if header.startswith(b"AC10"):
        return ("dwg_or_dwg_like", preview)
    return ("unknown", preview)
def iter_render_entities(entity: ezdxf.entities.DXFGraphic, inherited_layer: str | None = None, depth: int = 0) -> list[ezdxf.entities.DXFGraphic]:
    """Flatten an entity into renderable leaves, expanding INSERT blocks.

    Children sitting on DXF's meta-layer "0" are reassigned to the
    effective parent layer so layer-based filtering sees them. Recursion
    is capped at 6 levels of nested blocks, and a failing block expansion
    yields no entities at all.
    """
    if depth > 6:
        return []
    effective_layer = inherited_layer or entity.dxf.layer
    if entity.dxftype() != "INSERT":
        # Leaf entity: inherit the parent layer when on layer "0".
        if inherited_layer and entity.dxf.layer == "0":
            entity.dxf.layer = inherited_layer
        return [entity]
    flattened: list[ezdxf.entities.DXFGraphic] = []
    try:
        for child in entity.virtual_entities():
            if child.dxf.layer == "0":
                child.dxf.layer = effective_layer
            flattened.extend(iter_render_entities(child, inherited_layer=effective_layer, depth=depth + 1))
    except Exception:
        # Block expansion failed (e.g. broken block definition): skip it.
        return []
    return flattened
def get_entity_points(entity: ezdxf.entities.DXFGraphic) -> list[tuple[float, float]]:
    """Extract representative 2D points for a DXF entity.

    Line-like entities yield their vertices; circles, arcs and ellipses
    yield the two corners of an axis-aligned bounding box; POINT/INSERT
    yield a single location; curves are flattened where possible. Unknown
    entity types yield an empty list.
    """
    kind = entity.dxftype()
    if kind == "LINE":
        begin, finish = entity.dxf.start, entity.dxf.end
        return [(float(begin.x), float(begin.y)), (float(finish.x), float(finish.y))]
    if kind == "LWPOLYLINE":
        return [(float(pt[0]), float(pt[1])) for pt in entity.get_points("xy")]
    if kind == "POLYLINE":
        return [
            (float(vert.dxf.location.x), float(vert.dxf.location.y))
            for vert in entity.vertices
        ]
    if kind in ("CIRCLE", "ARC"):
        # Conservative: use the full circle's bounding box even for arcs.
        c = entity.dxf.center
        r = float(entity.dxf.radius)
        return [(float(c.x - r), float(c.y - r)), (float(c.x + r), float(c.y + r))]
    if kind == "POINT":
        loc = entity.dxf.location
        return [(float(loc.x), float(loc.y))]
    if kind == "SPLINE":
        try:
            return [(float(pt[0]), float(pt[1])) for pt in entity.flattening(2)]
        except Exception:
            return []
    if kind == "ELLIPSE":
        try:
            return [(float(pt[0]), float(pt[1])) for pt in entity.flattening(2)]
        except Exception:
            # Fall back to the bounding box implied by the axes and ratio.
            c = entity.dxf.center
            major = entity.dxf.major_axis
            ratio = float(entity.dxf.ratio)
            rx = math.hypot(float(major.x), float(major.y))
            ry = rx * ratio
            return [(float(c.x - rx), float(c.y - ry)), (float(c.x + rx), float(c.y + ry))]
    if kind == "INSERT":
        anchor = entity.dxf.insert
        return [(float(anchor.x), float(anchor.y))]
    return []
def get_entity_center(entity: ezdxf.entities.DXFGraphic) -> tuple[float, float] | None:
    """Return the midpoint of the entity's bounding box, or None if it has no points."""
    points = get_entity_points(entity)
    if not points:
        return None
    xs, ys = zip(*points)
    return ((min(xs) + max(xs)) / 2.0, (min(ys) + max(ys)) / 2.0)
def get_entity_bounds(entity: ezdxf.entities.DXFGraphic) -> tuple[float, float, float, float] | None:
    """Return (min_x, min_y, max_x, max_y) for the entity, or None if it has no points."""
    points = get_entity_points(entity)
    if not points:
        return None
    xs, ys = zip(*points)
    return (min(xs), min(ys), max(xs), max(ys))
def compute_bounds_from_points(points: list[tuple[float, float]]) -> tuple[float, float, float, float]:
    """Return (min_x, min_y, width, height) for *points*.

    Both spans are clamped to at least 1.0 so downstream view-box math never
    divides by a zero-sized extent.  *points* must be non-empty.
    """
    xs, ys = zip(*points)
    left, bottom = min(xs), min(ys)
    return (left, bottom, max(max(xs) - left, 1.0), max(max(ys) - bottom, 1.0))
def percentile(values: list[float], ratio: float) -> float:
    """Nearest-rank percentile: element nearest to position ratio * (n - 1).

    Returns 0.0 for an empty list.  Uses Python's round(), so .5 positions
    follow banker's rounding.
    """
    if not values:
        return 0.0
    ordered = sorted(values)
    last = len(ordered) - 1
    index = min(max(round(last * ratio), 0), last)
    return float(ordered[index])
def compute_focus_bounds(slot_points: list[tuple[float, float]]) -> tuple[float, float, float, float]:
    """Return padded (min_x, min_y, max_x, max_y) around the seat points.

    The 2nd-98th percentile trim discards outlier slots; padding per axis is
    8% of the trimmed span with a 500-unit floor.
    """
    xs = [p[0] for p in slot_points]
    ys = [p[1] for p in slot_points]
    low_x, high_x = percentile(xs, 0.02), percentile(xs, 0.98)
    low_y, high_y = percentile(ys, 0.02), percentile(ys, 0.98)
    span_x = max(high_x - low_x, 1.0)
    span_y = max(high_y - low_y, 1.0)
    margin_x = max(span_x * 0.08, 500.0)
    margin_y = max(span_y * 0.08, 500.0)
    return (low_x - margin_x, low_y - margin_y, high_x + margin_x, high_y + margin_y)
def get_entity_max_span(entity: ezdxf.entities.DXFGraphic) -> float:
    """Return the larger of the entity's width/height, or 0.0 without geometry."""
    bounds = get_entity_bounds(entity)
    if bounds is None:
        return 0.0
    left, bottom, right, top = bounds
    return max(right - left, top - bottom)
def compute_outline_bounds(entities: list[ezdxf.entities.DXFGraphic]) -> tuple[float, float, float, float] | None:
    """Return padded bounds of the large building-outline entities, or None.

    Only entities on the known outline layers with a span of at least 3000
    drawing units contribute; chair-layer entities are always skipped.
    Padding per axis is 2.5% of the span with a 300-unit floor.
    """
    outline_layers = {"0", "0-COL", "WID", "XH", "CO-DOOR", "CO-DO-FR", "문", "회의실"}
    collected: list[tuple[float, float]] = []
    for item in entities:
        layer = item.dxf.layer
        if is_chair_layer(layer) or layer not in outline_layers:
            continue
        if get_entity_max_span(item) >= 3000:
            collected.extend(get_entity_points(item))
    if not collected:
        return None
    left, bottom, width, height = compute_bounds_from_points(collected)
    pad_x = max(width * 0.025, 300.0)
    pad_y = max(height * 0.025, 300.0)
    return (left - pad_x, bottom - pad_y, left + width + pad_x, bottom + height + pad_y)
def bounds_intersect(bounds: tuple[float, float, float, float], focus_bounds: tuple[float, float, float, float]) -> bool:
    """Return True when the two axis-aligned boxes overlap (touching counts)."""
    min_x, min_y, max_x, max_y = bounds
    f_min_x, f_min_y, f_max_x, f_max_y = focus_bounds
    overlap_x = min_x <= f_max_x and max_x >= f_min_x
    overlap_y = min_y <= f_max_y and max_y >= f_min_y
    return overlap_x and overlap_y
def line_svg(points: list[tuple[float, float]], css_class: str = "seatmap-dxf-entity") -> str:
    """Render a point chain as an SVG polyline fragment; "" for < 2 points."""
    if len(points) < 2:
        return ""
    # y is negated: DXF's y-axis points up, SVG's points down.
    coordinates = " ".join(f"{x:.2f},{-y:.2f}" for x, y in points)
    # NOTE(review): the f-string below is empty -- the <polyline .../> markup
    # appears to have been stripped from this file (angle-bracket content
    # lost), so `coordinates` and `css_class` are currently unused and the
    # function returns "".  Restore the tag from version control.
    return (
        f''
    )
def circle_svg(
    center_x: float,
    center_y: float,
    radius: float,
    stroke: str = "#475569",
    fill: str = "none",
    css_class: str = "seatmap-dxf-entity",
) -> str:
    """Render a circle as an SVG fragment (y negated for SVG's down axis)."""
    # NOTE(review): the f-string below is empty -- the <circle .../> markup
    # appears to have been stripped from this file, so every parameter is
    # currently unused and the function returns "".  Restore the tag from
    # version control.
    return (
        f''
    )
def build_dxf_preview_svg(
    entities: list[ezdxf.entities.DXFGraphic],
    bounds: tuple[float, float, float, float],
) -> str:
    """Build a static inline-SVG preview of the given DXF entities.

    *bounds* is (min_x, min_y, width, height) in DXF world units; the SVG
    viewBox negates y because SVG's y-axis points down while DXF's points up.

    NOTE(review): the final return literal is truncated (`f'"` -- the SVG
    wrapper markup appears stripped from this file) and the ARC branch's
    f-string is empty, leaving start_x/start_y/end_x/end_y/large_arc unused.
    Restore the markup from version control; code is left untouched here.
    """
    min_x, min_y, width, height = bounds
    max_y = min_y + height
    svg_parts: list[str] = []
    for entity in entities:
        layer_name = entity.dxf.layer
        # Chair entities get a distinct CSS class so the frontend styles them.
        is_chair = is_chair_layer(layer_name)
        css_class = "seatmap-dxf-chair-entity" if is_chair else "seatmap-dxf-entity"
        entity_type = entity.dxftype()
        if entity_type in {"LINE", "LWPOLYLINE", "POLYLINE", "SPLINE", "ELLIPSE"}:
            svg = line_svg(get_entity_points(entity), css_class=css_class)
            if svg:
                svg_parts.append(svg)
        elif entity_type == "CIRCLE":
            center = entity.dxf.center
            svg_parts.append(
                circle_svg(
                    float(center.x),
                    float(center.y),
                    float(entity.dxf.radius),
                    fill="none",
                    css_class=css_class,
                )
            )
        elif entity_type == "ARC":
            # Compute the arc's endpoints for an SVG path "A" command.
            center = entity.dxf.center
            radius = float(entity.dxf.radius)
            start_angle = math.radians(float(entity.dxf.start_angle))
            end_angle = math.radians(float(entity.dxf.end_angle))
            start_x = float(center.x) + radius * math.cos(start_angle)
            start_y = float(center.y) + radius * math.sin(start_angle)
            end_x = float(center.x) + radius * math.cos(end_angle)
            end_y = float(center.y) + radius * math.sin(end_angle)
            # large-arc flag: sweep of more than a half turn.
            large_arc = 1 if abs(float(entity.dxf.end_angle) - float(entity.dxf.start_angle)) > 180 else 0
            svg_parts.append(
                f''
            )
    view_box = f"{min_x:.2f} {-max_y:.2f} {max(width, 1.0):.2f} {max(height, 1.0):.2f}"
    return (
        f'"
    )
def load_dxf_document(file_path: Path) -> ezdxf.document.Drawing:
    """Load a DXF drawing from disk, trying strict parsing then recover mode.

    Raises:
        HTTPException(400): when both parse attempts fail; the detail message
        is chosen from the file-header classification returned by
        ``inspect_dxf_header`` (binary DXF / DWG-like / ASCII DXF / unknown).
    """
    try:
        return ezdxf.readfile(file_path)
    except (OSError, UnicodeDecodeError, ezdxf.DXFStructureError):
        # BUGFIX: ezdxf.readfile raises DXFStructureError for malformed DXF
        # content and UnicodeDecodeError for binary payloads; neither is an
        # OSError, so the previous `except OSError` let exactly the cases the
        # diagnostics below describe (binary DXF, DWG header) escape as
        # unhandled 500s instead of falling through to the recover pass.
        try:
            document, _auditor = recover.readfile(file_path)
            return document
        except Exception as exc:
            kind, preview = inspect_dxf_header(file_path)
            if kind == "binary_dxf":
                raise HTTPException(
                    status_code=400,
                    detail=f"Binary DXF로 보이지만 해석에 실패했습니다. 가능하면 ASCII DXF로 다시 저장해 업로드하세요. 헤더={preview}",
                ) from exc
            if kind == "dwg_or_dwg_like":
                raise HTTPException(
                    status_code=400,
                    detail=f"업로드한 파일은 DWG 계열 헤더(AC10xx)로 보입니다. DWG가 아니라 ASCII DXF로 다시 저장해 업로드하세요. 헤더={preview}",
                ) from exc
            if kind == "ascii_dxf":
                raise HTTPException(
                    status_code=400,
                    detail=f"ASCII DXF로 보이지만 구조를 해석하지 못했습니다. 도면을 다른 DXF 버전으로 다시 저장해보세요. 헤더={preview}",
                ) from exc
            raise HTTPException(
                status_code=400,
                detail=f"업로드한 파일 형식을 판별하지 못했습니다. 확장자만 dxf인 파일일 수 있습니다. 헤더={preview}",
            ) from exc
def entity_to_segments(entity: ezdxf.entities.DXFGraphic, arc_steps: int = 24) -> list[tuple[float, float, float, float]]:
    """Convert a DXF entity into flat (x1, y1, x2, y2) line segments.

    Poly-entities use their extracted points directly; circles and arcs are
    approximated with *arc_steps* straight chords.  Other types yield [].
    """
    def _link(vertices: list[tuple[float, float]]) -> list[tuple[float, float, float, float]]:
        # Join consecutive vertices into flat segment tuples.
        return [
            (float(a[0]), float(a[1]), float(b[0]), float(b[1]))
            for a, b in zip(vertices, vertices[1:])
        ]

    kind = entity.dxftype()
    points = get_entity_points(entity)
    if kind in {"LINE", "LWPOLYLINE", "POLYLINE", "SPLINE", "ELLIPSE"} and len(points) >= 2:
        return _link(points)
    if kind == "CIRCLE":
        center = entity.dxf.center
        radius = float(entity.dxf.radius)
        samples = [
            (
                float(center.x) + radius * math.cos((math.tau * step) / arc_steps),
                float(center.y) + radius * math.sin((math.tau * step) / arc_steps),
            )
            for step in range(arc_steps + 1)
        ]
        return _link(samples)
    if kind == "ARC":
        center = entity.dxf.center
        radius = float(entity.dxf.radius)
        begin = math.radians(float(entity.dxf.start_angle))
        finish = math.radians(float(entity.dxf.end_angle))
        if finish <= begin:
            # Normalize so the sweep is always positive (CCW).
            finish += math.tau
        sweep = finish - begin
        samples = [
            (
                float(center.x) + radius * math.cos(begin + sweep * (step / arc_steps)),
                float(center.y) + radius * math.sin(begin + sweep * (step / arc_steps)),
            )
            for step in range(arc_steps + 1)
        ]
        return _link(samples)
    return []
def build_dxf_artifacts(file_path: Path) -> tuple[dict[str, object], list[dict[str, object]], dict[str, object]]:
    """Parse a DXF file into the three seat-map artifacts.

    Returns (metadata, slots, viewer_data):
      * metadata    -- view-box numbers plus an inline SVG preview,
      * slots       -- one dict per chair entity (slot_key/label/x/y/rotation/layer_name),
      * viewer_data -- flat line segments + chair index for the canvas viewer.

    Raises:
        HTTPException(400): when no chair-layer entities or coordinates can
        be extracted from the drawing.
    """
    document = load_dxf_document(file_path)
    modelspace = document.modelspace()
    # Keep only entity types the downstream renderer understands.
    base_entities = [entity for entity in modelspace if entity.dxftype() in {"LINE", "LWPOLYLINE", "POLYLINE", "CIRCLE", "ARC", "INSERT", "SPLINE", "ELLIPSE"}]
    all_entities: list[ezdxf.entities.DXFGraphic] = []
    for entity in base_entities:
        # iter_render_entities expands INSERT references into child entities
        # (see its definition above; children on layer "0" inherit the
        # parent's layer).
        all_entities.extend(iter_render_entities(entity))
    # Separate chair-layer entities; their points anchor the seat slots.
    chair_entities: list[ezdxf.entities.DXFGraphic] = []
    chair_points: list[tuple[float, float]] = []
    for entity in all_entities:
        if is_chair_layer(entity.dxf.layer):
            chair_entities.append(entity)
            chair_points.extend(get_entity_points(entity))
    if not chair_entities:
        raise HTTPException(status_code=400, detail="DXF 파일에서 chair 계열 레이어를 찾지 못했습니다.")
    if not chair_points:
        raise HTTPException(status_code=400, detail="DXF 좌표를 해석하지 못했습니다.")
    slots: list[dict[str, object]] = []
    # Label chairs top-to-bottom (descending y) then left-to-right
    # (ascending x): sort key is (-center.y, center.x).
    for index, entity in enumerate(sorted(chair_entities, key=lambda item: (-(get_entity_center(item) or (0.0, 0.0))[1], (get_entity_center(item) or (0.0, 0.0))[0]))):
        center = get_entity_center(entity)
        if center is None:
            continue
        slots.append(
            {
                # DXF handle is unique per entity, so it doubles as the slot key.
                "slot_key": entity.dxf.handle,
                "label": compute_slot_label(index),
                "x": round(float(center[0]), 3),
                "y": round(float(center[1]), 3),
                # Not every entity type carries a rotation attribute.
                "rotation": float(getattr(entity.dxf, "rotation", 0.0) or 0.0),
                "layer_name": entity.dxf.layer,
            }
        )
    if not slots:
        raise HTTPException(status_code=400, detail="chair 레이어에서 좌석 위치를 추출하지 못했습니다.")
    slot_points = [(float(slot["x"]), float(slot["y"])) for slot in slots]
    # Prefer the building outline for framing; fall back to a percentile
    # box around the seats themselves.
    focus_bounds = compute_outline_bounds(all_entities) or compute_focus_bounds(slot_points)
    # Drop entities entirely outside the focus area to keep the preview small.
    visible_entities: list[ezdxf.entities.DXFGraphic] = []
    visible_points: list[tuple[float, float]] = []
    for entity in all_entities:
        entity_bounds = get_entity_bounds(entity)
        if entity_bounds is None:
            continue
        if bounds_intersect(entity_bounds, focus_bounds):
            visible_entities.append(entity)
            visible_points.extend(get_entity_points(entity))
    if not visible_entities or not visible_points:
        # Defensive fallback: never render an empty preview.
        visible_entities = all_entities
        visible_points = chair_points
    focus_min_x, focus_min_y, focus_max_x, focus_max_y = focus_bounds
    min_x = focus_min_x
    min_y = focus_min_y
    width = max(focus_max_x - focus_min_x, 1.0)
    height = max(focus_max_y - focus_min_y, 1.0)
    preview_svg = build_dxf_preview_svg(visible_entities, (min_x, min_y, width, height))
    # grid_* / image_* / cell_gap mirror the schema used by non-DXF sources.
    metadata = {
        "source_type": "dxf",
        "view_box_min_x": round(min_x, 3),
        "view_box_min_y": round(min_y, 3),
        "view_box_width": round(width, 3),
        "view_box_height": round(height, 3),
        "preview_svg": preview_svg,
        "grid_rows": 1,
        "grid_cols": 1,
        "image_width": None,
        "image_height": None,
        "cell_gap": 0,
    }
    slot_map = {str(slot["slot_key"]): slot for slot in slots}
    # Chair segments are stored in one flat list; each chair records its
    # (start, count) window into it plus its own bounding box for hit tests.
    chair_segments: list[list[float]] = []
    chair_items: list[dict[str, object]] = []
    background_segments: list[list[float]] = []
    for entity in visible_entities:
        segments = entity_to_segments(entity)
        if not segments:
            continue
        if is_chair_layer(entity.dxf.layer):
            slot = slot_map.get(str(entity.dxf.handle))
            if not slot:
                continue
            start_index = len(chair_segments)
            min_seg_x = math.inf
            min_seg_y = math.inf
            max_seg_x = -math.inf
            max_seg_y = -math.inf
            for x1, y1, x2, y2 in segments:
                chair_segments.append([round(x1, 3), round(y1, 3), round(x2, 3), round(y2, 3)])
                min_seg_x = min(min_seg_x, x1, x2)
                min_seg_y = min(min_seg_y, y1, y2)
                max_seg_x = max(max_seg_x, x1, x2)
                max_seg_y = max(max_seg_y, y1, y2)
            chair_items.append(
                {
                    "key": str(slot["slot_key"]),
                    "label": slot["label"],
                    "kind": "chair",
                    "start": start_index,
                    "count": len(segments),
                    "min_x": round(min_seg_x, 3),
                    "min_y": round(min_seg_y, 3),
                    "max_x": round(max_seg_x, 3),
                    "max_y": round(max_seg_y, 3),
                }
            )
            continue
        # Everything else is static background linework.
        for x1, y1, x2, y2 in segments:
            background_segments.append([round(x1, 3), round(y1, 3), round(x2, 3), round(y2, 3)])
    viewer_data = {
        "meta": {
            "background_segment_count": len(background_segments),
            "chair_count": len(chair_items),
            "chair_segment_count": len(chair_segments),
            "world": {
                "min_x": round(min_x, 3),
                "min_y": round(min_y, 3),
                "max_x": round(min_x + width, 3),
                "max_y": round(min_y + height, 3),
                "width": round(width, 3),
                "height": round(height, 3),
            },
        },
        "background_segments": background_segments,
        "chair_segments": chair_segments,
        "chairs": chair_items,
    }
    return metadata, slots, viewer_data
def parse_dxf_layout(file_path: Path) -> tuple[dict[str, object], list[dict[str, object]]]:
    """Parse a DXF file and return (metadata, slots), discarding viewer data."""
    metadata, slots, _unused_viewer = build_dxf_artifacts(file_path)
    return metadata, slots
def fetch_seat_layout(seat_map_id: int, as_of: datetime | None = None) -> dict[str, object]:
    """Assemble the full layout payload for one seat map.

    When *as_of* is None the current members/placements are read; otherwise
    the versioned tables (member_versions / seat_assignment_versions) are
    queried with half-open validity intervals (valid_from <= as_of < valid_to).

    Returns a dict with keys: seat_map, members, slots, placements,
    viewer_data (viewer_data may be None).

    Raises:
        HTTPException(404): when the seat map does not exist.
    """
    seat_map = fetch_seat_map(seat_map_id)
    if seat_map is None:
        raise HTTPException(status_code=404, detail="Seat map not found.")
    with get_conn() as conn:
        with conn.cursor() as cur:
            if as_of is None:
                cur.execute(
                    """
                    SELECT m.id, m.name, m.company, m.rank, m.role, m.department, m.grp, m.division,
                           m.team, m.cell, m.work_status, m.work_time, m.phone, m.email,
                           m.seat_label AS member_seat_label, m.photo_url, m.sort_order
                    FROM members m
                    ORDER BY m.sort_order ASC, m.id ASC
                    """
                )
                members = cur.fetchall()
            else:
                # Historical member snapshot resolved by the helper.
                members = fetch_members_as_of(cur, as_of)
            cur.execute(
                """
                SELECT id, slot_key, label, x, y, rotation, layer_name
                FROM seat_slots
                WHERE seat_map_id = %s
                ORDER BY label ASC, id ASC
                """,
                (seat_map_id,),
            )
            slots = cur.fetchall()
            if as_of is None:
                cur.execute(
                    """
                    SELECT sp.member_id, sp.row_index, sp.col_index, sp.seat_label,
                           sp.seat_slot_id,
                           m.name, m.company, m.rank, m.role, m.department, m.grp, m.division,
                           m.team, m.cell, m.work_status, m.work_time, m.phone, m.email,
                           m.photo_url, m.sort_order
                    FROM seat_positions sp
                    JOIN members m ON m.id = sp.member_id
                    WHERE sp.seat_map_id = %s
                    ORDER BY sp.row_index ASC, sp.col_index ASC, m.sort_order ASC, m.id ASC
                    """,
                    (seat_map_id,),
                )
                placements = cur.fetchall()
            else:
                # Historical placements: join the assignment version valid at
                # as_of with the member version valid at the same instant.
                # row_index/col_index are not versioned, hence the 0 literals.
                cur.execute(
                    """
                    SELECT sav.member_id, 0 AS row_index, 0 AS col_index, sav.seat_label,
                           sav.seat_slot_id,
                           mv.name, mv.company, mv.rank, mv.role, mv.department, mv.grp, mv.division,
                           mv.team, mv.cell, mv.work_status, mv.work_time, mv.phone, mv.email,
                           mv.photo_url, m.sort_order
                    FROM seat_assignment_versions sav
                    JOIN members m ON m.id = sav.member_id
                    JOIN member_versions mv
                      ON mv.member_id = sav.member_id
                     AND mv.valid_from <= %s
                     AND (mv.valid_to IS NULL OR mv.valid_to > %s)
                    WHERE sav.seat_map_id = %s
                      AND sav.valid_from <= %s
                      AND (sav.valid_to IS NULL OR sav.valid_to > %s)
                    ORDER BY m.sort_order ASC, m.id ASC
                    """,
                    (as_of, as_of, seat_map_id, as_of, as_of),
                )
                placements = cur.fetchall()
            # Flag retirees by (trimmed) name match against the retirement table.
            cur.execute("SELECT name FROM member_retirements")
            retired_names = {str(row["name"] or "").strip() for row in cur.fetchall() if str(row["name"] or "").strip()}
    for member in members:
        member["is_retired"] = str(member.get("name") or "").strip() in retired_names
    viewer_data: dict[str, object] | None = None
    # For fixed-HTML offices, source_url stores the office key.
    office_key = str(seat_map.get("source_url") or FIXED_OFFICE_SOURCE_KEY)
    fixed_office = FIXED_OFFICE_CONFIGS.get(office_key)
    if seat_map["source_type"] == "fixed_html" and fixed_office:
        template = parse_fixed_office_template(office_key)
        viewer_data = {
            "meta": {
                "chair_count": len(template["slots"]),
                "office": str(fixed_office["name"]),
            }
        }
    elif seat_map["source_type"] == "dxf" and seat_map.get("source_url"):
        # Re-derive viewer data from the uploaded DXF; best-effort only.
        filename = Path(str(seat_map["source_url"])).name
        source_path = UPLOAD_DIR / filename
        if source_path.exists():
            try:
                _metadata, _slots, viewer_data = build_dxf_artifacts(source_path)
            except Exception:
                viewer_data = None
    return {
        "seat_map": seat_map,
        "members": members,
        "slots": slots,
        "placements": placements,
        "viewer_data": viewer_data,
    }
def build_center_chair_viewer_html(layout: dict[str, object]) -> str:
slot_key_by_id = {
int(slot["id"]): str(slot["slot_key"])
for slot in layout.get("slots", [])
if slot.get("id") is not None and slot.get("slot_key") is not None
}
members_by_id = {
int(member["id"]): member
for member in layout.get("members", [])
if member.get("id") is not None
}
placed_keys: list[str] = []
assignment_items: list[dict[str, object]] = []
for placement in layout.get("placements", []):
slot_id = placement.get("seat_slot_id")
if slot_id is None:
continue
slot_key = slot_key_by_id.get(int(slot_id))
if slot_key:
placed_keys.append(slot_key)
member = members_by_id.get(int(placement.get("member_id") or 0))
if member:
assignment_items.append(
{
"key": slot_key,
"member_id": int(member["id"]),
"name": str(member.get("name") or "-"),
"rank": str(member.get("rank") or "-"),
}
)
seat_map = layout.get("seat_map") or {}
placed_literal = json.dumps(sorted(set(placed_keys)), ensure_ascii=False, separators=(",", ":"))
assignments_literal = json.dumps(assignment_items, ensure_ascii=False, separators=(",", ":"))
if seat_map.get("source_type") == "fixed_html":
office_key = str(seat_map.get("source_url") or FIXED_OFFICE_SOURCE_KEY)
html = parse_fixed_office_template(office_key)["html"]
else:
viewer_data = layout.get("viewer_data")
if not isinstance(viewer_data, dict):
raise HTTPException(status_code=404, detail="DXF viewer data not found.")
template_path = Path(__file__).with_name("center_chair_viewer_template.html")
if not template_path.exists():
raise HTTPException(status_code=500, detail="Viewer template not found.")
html = template_path.read_text(encoding="utf-8")
data_literal = json.dumps(viewer_data, ensure_ascii=False, separators=(",", ":"))
html = re.sub(
r"const DATA = .*?;\n\s*function decodeSegments",
f"const DATA = {data_literal};\n function decodeSegments",
html,
count=1,
flags=re.S,
)
html = html.replace(
'const STORAGE_KEY = "ptc-chair-selection";\n const placed = new Set(JSON.parse(localStorage.getItem(STORAGE_KEY) || "[]"));',
f"const STORAGE_KEY = null;\n const placed = new Set({placed_literal});",
1,
)
html = html.replace(
""" ctx.strokeStyle = selected
? "rgba(220, 38, 38, 0.98)"
: active
? "rgba(15, 118, 110, 0.98)"
: chair.kind === "group"
? "rgba(16, 134, 149, 0.74)"
: "rgba(21, 149, 142, 0.8)";
ctx.lineWidth = (selected ? 2.6 : active ? 2.1 : baseWidth) / camera.scale;""",
""" ctx.strokeStyle = selected
? "rgba(220, 38, 38, 0.98)"
: "rgba(15, 118, 110, 0.88)";
ctx.lineWidth = (selected ? 2.6 : active ? 2.0 : 1.6) / camera.scale;""",
1,
)
html = html.replace(
""" sorted.forEach((chair, index) => {
chair.key = String(index + 1);
chair.seatNo = index + 1;
});""",
""" sorted.forEach((chair, index) => {
chair.seatNo = index + 1;
});""",
1,
)
html = html.replace(
"function persistPlaced() {\n localStorage.setItem(STORAGE_KEY, JSON.stringify([...placed]));\n }",
"function persistPlaced() {\n return;\n }",
1,
)
html = html.replace(
""" window.addEventListener("pointerup", (event) => {
if (dragging && dragStart) {
const move = Math.hypot(event.clientX - dragStart.x, event.clientY - dragStart.y);
if (move < 4) {
const rect = canvas.getBoundingClientRect();
const picked = pickChair(event.clientX - rect.left, event.clientY - rect.top);
if (picked) {
if (placed.has(picked.key)) placed.delete(picked.key);
else placed.add(picked.key);
persistPlaced();
if (activePersonId) {
const currentChair = getChairByPerson(activePersonId);
if (chairAssignments[picked.key] === activePersonId) {
delete chairAssignments[picked.key];
} else {
if (currentChair && currentChair !== picked.key) delete chairAssignments[currentChair];
chairAssignments[picked.key] = activePersonId;
}
persistAssignments();
renderPeopleList();
}
}
}
}
dragging = false;
dragStart = null;
canvas.classList.remove("dragging");
requestDraw();
});""",
""" window.addEventListener("pointerup", () => {
dragging = false;
dragStart = null;
canvas.classList.remove("dragging");
requestDraw();
});""",
1,
)
html = html.replace(
""" window.addEventListener("pointerup", (event) => {
if (dragging && dragStart) {
const move = Math.hypot(event.clientX - dragStart.x, event.clientY - dragStart.y);
if (move < 4) {
const rect = canvas.getBoundingClientRect();
const picked = pickChair(event.clientX - rect.left, event.clientY - rect.top);
if (picked) {
if (placed.has(picked.key)) placed.delete(picked.key);
else placed.add(picked.key);
persistPlaced();
}
}
}
dragging = false;
dragStart = null;
canvas.classList.remove("dragging");
requestDraw();
});""",
""" window.addEventListener("pointerup", () => {
dragging = false;
dragStart = null;
canvas.classList.remove("dragging");
requestDraw();
});""",
1,
)
html = html.replace(
""" document.getElementById("clear-btn").addEventListener("click", () => {
placed.clear();
persistPlaced();
requestDraw();
});""",
""" document.getElementById("clear-btn").addEventListener("click", () => {
requestDraw();
});""",
1,
)
bridge_script = """
"""
bridge_script = bridge_script.replace("__INITIAL_ASSIGNMENTS__", assignments_literal, 1)
html = html.replace("