from __future__ import annotations from datetime import datetime, timedelta, timezone import uuid from typing import Callable from fastapi import FastAPI, Form, Header, HTTPException, Request def register_auth_routes( app: FastAPI, *, get_conn, verify_password: Callable[[str, str], bool], build_auth_session_payload: Callable[[dict[str, object], uuid.UUID, datetime], dict[str, object]], extract_bearer_token: Callable[[str | None], str | None], mock_login_enabled: bool, auth_session_hours: int, ) -> None: @app.post("/api/auth/login") def auth_login( request: Request, username: str = Form(...), password: str = Form(...), ) -> dict[str, object]: normalized_username = username.strip().lower() if not normalized_username or not password.strip(): raise HTTPException(status_code=400, detail="사번과 비밀번호를 입력해주세요.") ip_address = request.client.host if request.client else None user_agent = request.headers.get("user-agent", "") with get_conn() as conn: with conn.cursor() as cur: cur.execute( """ SELECT u.id, u.username, u.password_hash, u.display_name, u.role, u.member_id, u.is_active, m.rank FROM auth.users u LEFT JOIN members m ON m.id = u.member_id WHERE LOWER(u.username) = %s """, (normalized_username,), ) user = cur.fetchone() if user is None: cur.execute( """ INSERT INTO auth.login_audit_logs (username, success, failure_reason, ip_address, user_agent) VALUES (%s, FALSE, %s, %s, %s) """, (normalized_username, "unknown_user", ip_address, user_agent), ) conn.commit() raise HTTPException(status_code=401, detail="사번 또는 비밀번호가 올바르지 않습니다.") if not bool(user.get("is_active")): cur.execute( """ INSERT INTO auth.login_audit_logs (username, user_id, success, failure_reason, ip_address, user_agent) VALUES (%s, %s, FALSE, %s, %s, %s) """, (normalized_username, int(user["id"]), "inactive_user", ip_address, user_agent), ) conn.commit() raise HTTPException(status_code=403, detail="비활성화된 계정입니다.") if not verify_password(password, str(user.get("password_hash") or "")): cur.execute( """ INSERT INTO auth.login_audit_logs (username, user_id, success, failure_reason, ip_address, user_agent) VALUES (%s, %s, FALSE, %s, %s, %s) """, (normalized_username, int(user["id"]), "invalid_password", ip_address, user_agent), ) conn.commit() raise HTTPException(status_code=401, detail="사번 또는 비밀번호가 올바르지 않습니다.") expires_at = datetime.now(timezone.utc) + timedelta(hours=auth_session_hours) session_id = uuid.uuid4() cur.execute( """ INSERT INTO auth.sessions (id, user_id, expires_at, ip_address, user_agent) VALUES (%s, %s, %s, %s, %s) """, (session_id, int(user["id"]), expires_at, ip_address, user_agent), ) cur.execute( "UPDATE auth.users SET last_login_at = NOW(), updated_at = NOW() WHERE id = %s", (int(user["id"]),), ) cur.execute( """ INSERT INTO auth.login_audit_logs (username, user_id, success, failure_reason, ip_address, user_agent) VALUES (%s, %s, TRUE, NULL, %s, %s) """, (normalized_username, int(user["id"]), ip_address, user_agent), ) conn.commit() return build_auth_session_payload(user, session_id, expires_at) @app.post("/api/auth/logout") def auth_logout(authorization: str | None = Header(default=None)) -> dict[str, bool]: token = extract_bearer_token(authorization) if not token: return {"ok": True} with get_conn() as conn: with conn.cursor() as cur: cur.execute( """ UPDATE auth.sessions SET revoked_at = NOW() WHERE id = %s AND revoked_at IS NULL """, (token,), ) conn.commit() return {"ok": True} @app.get("/api/auth/me") def auth_me(authorization: str | None = Header(default=None)) -> dict[str, object]: token = extract_bearer_token(authorization) if not token: raise HTTPException(status_code=401, detail="인증 정보가 없습니다.") with get_conn() as conn: with conn.cursor() as cur: cur.execute( """ SELECT s.id AS session_id, s.expires_at, s.revoked_at, u.id, u.username, u.display_name, u.role, u.member_id, u.is_active, m.rank FROM auth.sessions s JOIN auth.users u ON u.id = s.user_id LEFT JOIN members m ON m.id = u.member_id WHERE s.id = %s """, (token,), ) row = cur.fetchone() if row is None or row.get("revoked_at") is not None: raise HTTPException(status_code=401, detail="세션이 유효하지 않습니다.") expires_at = row["expires_at"] now_utc = datetime.now(timezone.utc) if expires_at is None or expires_at <= now_utc: raise HTTPException(status_code=401, detail="세션이 만료되었습니다.") if not bool(row.get("is_active")): raise HTTPException(status_code=403, detail="비활성화된 계정입니다.") return build_auth_session_payload(row, uuid.UUID(str(row["session_id"])), expires_at) @app.post("/api/mock-login") def mock_login(username: str = Form(...), password: str = Form(...)) -> dict[str, object]: if not mock_login_enabled: raise HTTPException(status_code=403, detail="Mock login is disabled.") if not username.strip() or not password.strip(): raise HTTPException(status_code=400, detail="Username and password are required.") return { "user": { "username": username.strip(), "display_name": username.strip(), "role": "admin", }, "session_expires_at": datetime.utcnow().isoformat() + "Z", }