173 lines
7.2 KiB
Python
173 lines
7.2 KiB
Python
from __future__ import annotations
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
import uuid
|
|
from typing import Callable
|
|
|
|
from fastapi import FastAPI, Form, Header, HTTPException, Request
|
|
|
|
|
|
def _record_login_attempt(
    cur,
    *,
    username: str,
    user_id: int | None,
    failure_reason: str | None,
    ip_address: str | None,
    user_agent: str,
) -> None:
    """Insert one row into ``auth.login_audit_logs`` using cursor *cur*.

    ``failure_reason=None`` records a successful login (``success`` TRUE);
    any other value records a failure with that reason. ``user_id`` is
    stored as NULL when the username did not match any account.
    """
    cur.execute(
        """
        INSERT INTO auth.login_audit_logs (username, user_id, success, failure_reason, ip_address, user_agent)
        VALUES (%s, %s, %s, %s, %s, %s)
        """,
        (username, user_id, failure_reason is None, failure_reason, ip_address, user_agent),
    )


def _parse_session_token(token: str) -> uuid.UUID | None:
    """Return *token* parsed as a UUID, or ``None`` when it is not a well-formed UUID."""
    try:
        return uuid.UUID(token)
    except ValueError:
        return None


def register_auth_routes(
    app: FastAPI,
    *,
    get_conn,
    verify_password: Callable[[str, str], bool],
    build_auth_session_payload: Callable[[dict[str, object], uuid.UUID, datetime], dict[str, object]],
    extract_bearer_token: Callable[[str | None], str | None],
    mock_login_enabled: bool,
    auth_session_hours: int,
) -> None:
    """Register the authentication endpoints on *app*.

    Routes added:
        POST /api/auth/login   -- verify credentials and create a session.
        POST /api/auth/logout  -- revoke the session named by the bearer token.
        GET  /api/auth/me      -- return the payload for the current session.
        POST /api/mock-login   -- credential-free login, gated by a flag.

    Args:
        app: Application the routes are attached to.
        get_conn: Zero-argument callable used as ``with get_conn() as conn``;
            the connection must provide ``cursor()`` and ``commit()``, and
            fetched rows are accessed like mappings.
        verify_password: ``(plain_password, stored_hash) -> bool``.
        build_auth_session_payload: Builds the response body from a user row,
            the session id and the session expiry.
        extract_bearer_token: Extracts the token from an ``Authorization``
            header value, returning ``None`` when there is none.
        mock_login_enabled: When false, ``/api/mock-login`` answers 403.
        auth_session_hours: Lifetime, in hours, of a newly created session.
    """

    @app.post("/api/auth/login")
    def auth_login(
        request: Request,
        username: str = Form(...),
        password: str = Form(...),
    ) -> dict[str, object]:
        """Authenticate a user, open a session and audit the attempt.

        Raises:
            HTTPException: 400 for blank input, 401 for an unknown user or a
                wrong password, 403 for a deactivated account.
        """
        normalized_username = username.strip().lower()
        if not normalized_username or not password.strip():
            raise HTTPException(status_code=400, detail="사번과 비밀번호를 입력해주세요.")

        ip_address = request.client.host if request.client else None
        user_agent = request.headers.get("user-agent", "")

        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT u.id, u.username, u.password_hash, u.display_name, u.role, u.member_id, u.is_active,
                           m.rank
                    FROM auth.users u
                    LEFT JOIN members m ON m.id = u.member_id
                    WHERE LOWER(u.username) = %s
                    """,
                    (normalized_username,),
                )
                user = cur.fetchone()
                user_id = None if user is None else int(user["id"])

                # Classify the attempt. The order matters: the password is
                # only verified for an existing, active account.
                # NOTE(review): an inactive account answers 403 before the
                # password is checked, so the response reveals that the
                # account exists -- confirm this is intended.
                if user is None:
                    failure = (401, "unknown_user", "사번 또는 비밀번호가 올바르지 않습니다.")
                elif not bool(user.get("is_active")):
                    failure = (403, "inactive_user", "비활성화된 계정입니다.")
                elif not verify_password(password, str(user.get("password_hash") or "")):
                    failure = (401, "invalid_password", "사번 또는 비밀번호가 올바르지 않습니다.")
                else:
                    failure = None

                if failure is not None:
                    status_code, failure_reason, detail = failure
                    _record_login_attempt(
                        cur,
                        username=normalized_username,
                        user_id=user_id,
                        failure_reason=failure_reason,
                        ip_address=ip_address,
                        user_agent=user_agent,
                    )
                    # Commit before raising so the audit row is persisted
                    # even though the request ends with an error.
                    conn.commit()
                    raise HTTPException(status_code=status_code, detail=detail)

                expires_at = datetime.now(timezone.utc) + timedelta(hours=auth_session_hours)
                session_id = uuid.uuid4()
                cur.execute(
                    """
                    INSERT INTO auth.sessions (id, user_id, expires_at, ip_address, user_agent)
                    VALUES (%s, %s, %s, %s, %s)
                    """,
                    (session_id, user_id, expires_at, ip_address, user_agent),
                )
                cur.execute(
                    "UPDATE auth.users SET last_login_at = NOW(), updated_at = NOW() WHERE id = %s",
                    (user_id,),
                )
                _record_login_attempt(
                    cur,
                    username=normalized_username,
                    user_id=user_id,
                    failure_reason=None,
                    ip_address=ip_address,
                    user_agent=user_agent,
                )
                conn.commit()

        return build_auth_session_payload(user, session_id, expires_at)

    @app.post("/api/auth/logout")
    def auth_logout(authorization: str | None = Header(default=None)) -> dict[str, bool]:
        """Revoke the caller's session; always answers ``{"ok": True}``."""
        token = extract_bearer_token(authorization)
        if not token:
            return {"ok": True}

        session_uuid = _parse_session_token(token)
        if session_uuid is None:
            # Session ids are UUIDs, so a malformed token cannot match any
            # session. Skip the query instead of handing the database a
            # value it may reject.
            return {"ok": True}

        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    UPDATE auth.sessions
                    SET revoked_at = NOW()
                    WHERE id = %s
                      AND revoked_at IS NULL
                    """,
                    (str(session_uuid),),
                )
                conn.commit()
        return {"ok": True}

    @app.get("/api/auth/me")
    def auth_me(authorization: str | None = Header(default=None)) -> dict[str, object]:
        """Return the session payload for the bearer token.

        Raises:
            HTTPException: 401 when the token is missing, malformed, unknown,
                revoked or expired; 403 when the account is deactivated.
        """
        token = extract_bearer_token(authorization)
        if not token:
            raise HTTPException(status_code=401, detail="인증 정보가 없습니다.")

        session_uuid = _parse_session_token(token)
        if session_uuid is None:
            # A token that is not a UUID can never name a session: answer
            # like an unknown session rather than querying with it.
            raise HTTPException(status_code=401, detail="세션이 유효하지 않습니다.")

        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT s.id AS session_id, s.expires_at, s.revoked_at,
                           u.id, u.username, u.display_name, u.role, u.member_id, u.is_active,
                           m.rank
                    FROM auth.sessions s
                    JOIN auth.users u ON u.id = s.user_id
                    LEFT JOIN members m ON m.id = u.member_id
                    WHERE s.id = %s
                    """,
                    (str(session_uuid),),
                )
                row = cur.fetchone()

        if row is None or row.get("revoked_at") is not None:
            raise HTTPException(status_code=401, detail="세션이 유효하지 않습니다.")

        expires_at = row["expires_at"]
        now_utc = datetime.now(timezone.utc)
        # NOTE(review): this comparison assumes expires_at comes back
        # timezone-aware; a naive value would raise TypeError -- confirm
        # against the column type.
        if expires_at is None or expires_at <= now_utc:
            raise HTTPException(status_code=401, detail="세션이 만료되었습니다.")

        if not bool(row.get("is_active")):
            raise HTTPException(status_code=403, detail="비활성화된 계정입니다.")

        return build_auth_session_payload(row, uuid.UUID(str(row["session_id"])), expires_at)

    @app.post("/api/mock-login")
    def mock_login(username: str = Form(...), password: str = Form(...)) -> dict[str, object]:
        """Return a fake admin session without touching the database.

        Raises:
            HTTPException: 403 when mock login is disabled, 400 when the
                username or password is blank.
        """
        if not mock_login_enabled:
            raise HTTPException(status_code=403, detail="Mock login is disabled.")

        name = username.strip()
        if not name or not password.strip():
            raise HTTPException(status_code=400, detail="Username and password are required.")

        # Expire after the configured session lifetime, like the real login.
        # Use aware UTC rather than the deprecated datetime.utcnow(), then
        # drop tzinfo so the string keeps its "...Z" form, not "+00:00Z".
        expires_at = datetime.now(timezone.utc) + timedelta(hours=auth_session_hours)
        return {
            "user": {
                "username": name,
                "display_name": name,
                "role": "admin",
            },
            "session_expires_at": expires_at.replace(tzinfo=None).isoformat() + "Z",
        }
|