from __future__ import annotations from datetime import datetime from pathlib import Path import csv from io import BytesIO, StringIO import shutil import uuid from fastapi import FastAPI, File, Form, HTTPException, UploadFile from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse from fastapi.staticfiles import StaticFiles from openpyxl import load_workbook from pydantic import BaseModel, Field from .config import LEGACY_DIR, MOCK_LOGIN_ENABLED, SNAPSHOT_DIR, UPLOAD_DIR from .db import get_conn, init_db app = FastAPI(title="MH Dashboard Organization API") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) LEGACY_STATIC_DIR = LEGACY_DIR / "static" class MemberPayload(BaseModel): id: int | None = None name: str = Field(min_length=1) company: str = "" rank: str = "" role: str = "" department: str = "" grp: str = "" division: str = "" team: str = "" cell: str = "" work_status: str = "" work_time: str = "" phone: str = "" email: str = "" seat_label: str = "" photo_url: str = "" sort_order: int | None = None class MemberBulkPayload(BaseModel): items: list[MemberPayload] LEGACY_HEADER_MAP = { "이름": "name", "소속회사": "company", "직급": "rank", "직책": "role", "부서": "department", "그룹": "grp", "디비전": "division", "팀": "team", "셀": "cell", "근무상태": "work_status", "근무시간": "work_time", "전화번호": "phone", "이메일": "email", "자리위치": "seat_label", "사진": "photo_url", } def serialize_member_payload(item: MemberPayload, sort_order: int) -> tuple[object, ...]: return ( item.name.strip(), item.company.strip(), item.rank.strip(), item.role.strip(), item.department.strip(), item.grp.strip(), item.division.strip(), item.team.strip(), item.cell.strip(), item.work_status.strip(), item.work_time.strip(), item.phone.strip(), item.email.strip(), item.seat_label.strip(), item.photo_url.strip(), sort_order, ) def fetch_members() -> list[dict[str, object]]: with get_conn() as conn: with conn.cursor() as cur: cur.execute( """ SELECT id, name, company, rank, role, department, grp, division, team, cell, work_status, work_time, phone, email, seat_label, photo_url, sort_order, created_at, updated_at FROM members ORDER BY sort_order ASC, id ASC """ ) return cur.fetchall() def replace_members(items: list[MemberPayload]) -> list[dict[str, object]]: with get_conn() as conn: with conn.cursor() as cur: cur.execute("TRUNCATE TABLE members RESTART IDENTITY CASCADE") for index, item in enumerate(items): cur.execute( """ INSERT INTO members ( name, company, rank, role, department, grp, division, team, cell, work_status, work_time, phone, email, seat_label, photo_url, sort_order ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """, serialize_member_payload(item, index), ) conn.commit() return fetch_members() def rows_to_member_payloads(rows: list[list[object]]) -> list[MemberPayload]: header_idx = next( ( idx for idx, row in enumerate(rows) if "이름" in row and "부서" in row ), -1, ) if header_idx < 0: raise HTTPException(status_code=400, detail="지원하지 않는 파일 형식입니다. '이름'과 '부서' 헤더를 찾지 못했습니다.") headers = [str(value).strip() for value in rows[header_idx]] payloads: list[MemberPayload] = [] for row in rows[header_idx + 1 :]: if not any(str(value or "").strip() for value in row): continue record: dict[str, object] = {} for col_idx, header in enumerate(headers): mapped = LEGACY_HEADER_MAP.get(header) if not mapped: continue record[mapped] = str(row[col_idx] if col_idx < len(row) and row[col_idx] is not None else "").strip() if not str(record.get("name", "")).strip(): continue payloads.append(MemberPayload(**record)) return payloads def parse_import_rows(file: UploadFile, content: bytes) -> list[MemberPayload]: suffix = Path(file.filename or "").suffix.lower() if suffix == ".csv": text = content.decode("utf-8-sig") rows = list(csv.reader(StringIO(text))) return rows_to_member_payloads(rows) if suffix in {".xlsx", ".xlsm", ".xltx", ".xltm"}: workbook = load_workbook(BytesIO(content), data_only=True) sheet = workbook[workbook.sheetnames[0]] rows = [list(row) for row in sheet.iter_rows(values_only=True)] return rows_to_member_payloads(rows) raise HTTPException(status_code=400, detail="xlsx 또는 csv 파일만 업로드할 수 있습니다.") @app.on_event("startup") def startup() -> None: UPLOAD_DIR.mkdir(parents=True, exist_ok=True) SNAPSHOT_DIR.mkdir(parents=True, exist_ok=True) LEGACY_STATIC_DIR.mkdir(parents=True, exist_ok=True) init_db() app.mount("/legacy/static", StaticFiles(directory=LEGACY_STATIC_DIR, check_dir=False), name="legacy-static") @app.get("/api/health") def health() -> dict[str, str]: return {"status": "ok"} @app.post("/api/mock-login") def mock_login(username: str = Form(...), password: str = Form(...)) -> dict[str, object]: if not MOCK_LOGIN_ENABLED: raise HTTPException(status_code=403, detail="Mock login is disabled.") if not username.strip() or not password.strip(): raise HTTPException(status_code=400, detail="Username and password are required.") return { "user": { "username": username.strip(), "display_name": username.strip(), "role": "admin", }, "session_expires_at": datetime.utcnow().isoformat() + "Z", } @app.get("/api/members") def list_members() -> dict[str, list[dict[str, object]]]: return {"items": fetch_members()} @app.post("/api/members") def create_member(payload: MemberPayload) -> dict[str, object]: with get_conn() as conn: with conn.cursor() as cur: cur.execute("SELECT COALESCE(MAX(sort_order), -1) + 1 AS next_order FROM members") next_order = int(cur.fetchone()["next_order"]) cur.execute( """ INSERT INTO members ( name, company, rank, role, department, grp, division, team, cell, work_status, work_time, phone, email, seat_label, photo_url, sort_order ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id, name, company, rank, role, department, grp, division, team, cell, work_status, work_time, phone, email, seat_label, photo_url, sort_order, created_at, updated_at """, serialize_member_payload(payload, payload.sort_order if payload.sort_order is not None else next_order), ) member = cur.fetchone() conn.commit() return {"item": member} @app.put("/api/members/{member_id}") def update_member(member_id: int, payload: MemberPayload) -> dict[str, object]: with get_conn() as conn: with conn.cursor() as cur: cur.execute( """ UPDATE members SET name = %s, company = %s, rank = %s, role = %s, department = %s, grp = %s, division = %s, team = %s, cell = %s, work_status = %s, work_time = %s, phone = %s, email = %s, seat_label = %s, photo_url = %s, sort_order = COALESCE(%s, sort_order), updated_at = NOW() WHERE id = %s RETURNING id, name, company, rank, role, department, grp, division, team, cell, work_status, work_time, phone, email, seat_label, photo_url, sort_order, created_at, updated_at """, (*serialize_member_payload(payload, payload.sort_order or 0)[:-1], payload.sort_order, member_id), ) member = cur.fetchone() if member is None: raise HTTPException(status_code=404, detail="Member not found.") conn.commit() return {"item": member} @app.delete("/api/members/{member_id}") def delete_member(member_id: int) -> dict[str, bool]: with get_conn() as conn: with conn.cursor() as cur: cur.execute("DELETE FROM members WHERE id = %s", (member_id,)) deleted = cur.rowcount > 0 conn.commit() if not deleted: raise HTTPException(status_code=404, detail="Member not found.") return {"ok": True} @app.put("/api/members/bulk-sync") def bulk_sync_members(payload: MemberBulkPayload) -> dict[str, list[dict[str, object]]]: return {"items": replace_members(payload.items)} @app.post("/api/members/import") async def import_members(file: UploadFile = File(...)) -> dict[str, list[dict[str, object]]]: content = await file.read() items = parse_import_rows(file, content) return {"items": replace_members(items)} @app.post("/api/uploads/profile-photo") def upload_profile_photo(file: UploadFile = File(...), member_name: str = Form("")) -> dict[str, str]: suffix = Path(file.filename or "").suffix.lower() if suffix not in {".png", ".jpg", ".jpeg", ".webp", ".gif"}: raise HTTPException(status_code=400, detail="Only image files are allowed.") stem = member_name.strip().replace(" ", "-") or "member" filename = f"{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-{stem}-{uuid.uuid4().hex[:8]}{suffix}" target = UPLOAD_DIR / filename with target.open("wb") as out_file: shutil.copyfileobj(file.file, out_file) return {"url": f"/uploads/{filename}"} @app.post("/api/snapshots/monthly") def create_monthly_snapshot(snapshot_month: str = Form(...)) -> dict[str, str]: filename = f"organization-snapshot-{snapshot_month}.csv" target = SNAPSHOT_DIR / filename with get_conn() as conn: with conn.cursor() as cur: cur.execute( """ SELECT name, company, rank, role, department, grp, division, team, cell, work_status, work_time, phone, email, seat_label, photo_url, updated_at FROM members ORDER BY sort_order ASC, id ASC """ ) rows = cur.fetchall() with target.open("w", newline="", encoding="utf-8-sig") as csv_file: writer = csv.DictWriter( csv_file, fieldnames=[ "name", "company", "rank", "role", "department", "grp", "division", "team", "cell", "work_status", "work_time", "phone", "email", "seat_label", "photo_url", "updated_at", ], ) writer.writeheader() writer.writerows(rows) with conn.cursor() as cur: cur.execute( "INSERT INTO snapshots (snapshot_month, file_path) VALUES (%s, %s)", (snapshot_month, str(target)), ) conn.commit() return {"file": f"/snapshots/{filename}"} @app.get("/api/snapshots") def list_snapshots() -> dict[str, list[dict[str, object]]]: with get_conn() as conn: with conn.cursor() as cur: cur.execute( "SELECT id, snapshot_month, file_path, created_at FROM snapshots ORDER BY created_at DESC" ) snapshots = cur.fetchall() return {"items": snapshots} @app.get("/legacy/organization") def legacy_organization() -> FileResponse: target = LEGACY_DIR / "DashBoard-organization.html" if not target.exists(): raise HTTPException(status_code=404, detail="Legacy dashboard file not found.") return FileResponse(target) @app.get("/legacy/organization-backup") def legacy_organization_backup() -> FileResponse: target = LEGACY_DIR / "DashBoard-organization-backup.html" if not target.exists(): raise HTTPException(status_code=404, detail="Legacy dashboard backup not found.") return FileResponse(target) @app.get("/uploads/{filename}") def get_upload(filename: str) -> FileResponse: target = UPLOAD_DIR / filename if not target.exists(): raise HTTPException(status_code=404, detail="Upload not found.") return FileResponse(target) @app.get("/snapshots/{filename}") def get_snapshot(filename: str) -> FileResponse: target = SNAPSHOT_DIR / filename if not target.exists(): raise HTTPException(status_code=404, detail="Snapshot not found.") return FileResponse(target, media_type="text/csv")