refactor: split backend route domains

This commit is contained in:
hyunho
2026-04-01 16:58:51 +09:00
parent 03e90d18a3
commit c3afc0c772
5 changed files with 643 additions and 532 deletions

172
backend/app/auth_routes.py Normal file
View File

@@ -0,0 +1,172 @@
from __future__ import annotations
from datetime import datetime, timedelta, timezone
import uuid
from typing import Callable
from fastapi import FastAPI, Form, Header, HTTPException, Request
def register_auth_routes(
app: FastAPI,
*,
get_conn,
verify_password: Callable[[str, str], bool],
build_auth_session_payload: Callable[[dict[str, object], uuid.UUID, datetime], dict[str, object]],
extract_bearer_token: Callable[[str | None], str | None],
mock_login_enabled: bool,
auth_session_hours: int,
) -> None:
@app.post("/api/auth/login")
def auth_login(
request: Request,
username: str = Form(...),
password: str = Form(...),
) -> dict[str, object]:
normalized_username = username.strip().lower()
if not normalized_username or not password.strip():
raise HTTPException(status_code=400, detail="사번과 비밀번호를 입력해주세요.")
ip_address = request.client.host if request.client else None
user_agent = request.headers.get("user-agent", "")
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"""
SELECT u.id, u.username, u.password_hash, u.display_name, u.role, u.member_id, u.is_active,
m.rank
FROM auth.users u
LEFT JOIN members m ON m.id = u.member_id
WHERE LOWER(u.username) = %s
""",
(normalized_username,),
)
user = cur.fetchone()
if user is None:
cur.execute(
"""
INSERT INTO auth.login_audit_logs (username, success, failure_reason, ip_address, user_agent)
VALUES (%s, FALSE, %s, %s, %s)
""",
(normalized_username, "unknown_user", ip_address, user_agent),
)
conn.commit()
raise HTTPException(status_code=401, detail="사번 또는 비밀번호가 올바르지 않습니다.")
if not bool(user.get("is_active")):
cur.execute(
"""
INSERT INTO auth.login_audit_logs (username, user_id, success, failure_reason, ip_address, user_agent)
VALUES (%s, %s, FALSE, %s, %s, %s)
""",
(normalized_username, int(user["id"]), "inactive_user", ip_address, user_agent),
)
conn.commit()
raise HTTPException(status_code=403, detail="비활성화된 계정입니다.")
if not verify_password(password, str(user.get("password_hash") or "")):
cur.execute(
"""
INSERT INTO auth.login_audit_logs (username, user_id, success, failure_reason, ip_address, user_agent)
VALUES (%s, %s, FALSE, %s, %s, %s)
""",
(normalized_username, int(user["id"]), "invalid_password", ip_address, user_agent),
)
conn.commit()
raise HTTPException(status_code=401, detail="사번 또는 비밀번호가 올바르지 않습니다.")
expires_at = datetime.now(timezone.utc) + timedelta(hours=auth_session_hours)
session_id = uuid.uuid4()
cur.execute(
"""
INSERT INTO auth.sessions (id, user_id, expires_at, ip_address, user_agent)
VALUES (%s, %s, %s, %s, %s)
""",
(session_id, int(user["id"]), expires_at, ip_address, user_agent),
)
cur.execute(
"UPDATE auth.users SET last_login_at = NOW(), updated_at = NOW() WHERE id = %s",
(int(user["id"]),),
)
cur.execute(
"""
INSERT INTO auth.login_audit_logs (username, user_id, success, failure_reason, ip_address, user_agent)
VALUES (%s, %s, TRUE, NULL, %s, %s)
""",
(normalized_username, int(user["id"]), ip_address, user_agent),
)
conn.commit()
return build_auth_session_payload(user, session_id, expires_at)
@app.post("/api/auth/logout")
def auth_logout(authorization: str | None = Header(default=None)) -> dict[str, bool]:
token = extract_bearer_token(authorization)
if not token:
return {"ok": True}
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"""
UPDATE auth.sessions
SET revoked_at = NOW()
WHERE id = %s
AND revoked_at IS NULL
""",
(token,),
)
conn.commit()
return {"ok": True}
@app.get("/api/auth/me")
def auth_me(authorization: str | None = Header(default=None)) -> dict[str, object]:
token = extract_bearer_token(authorization)
if not token:
raise HTTPException(status_code=401, detail="인증 정보가 없습니다.")
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"""
SELECT s.id AS session_id, s.expires_at, s.revoked_at,
u.id, u.username, u.display_name, u.role, u.member_id, u.is_active,
m.rank
FROM auth.sessions s
JOIN auth.users u ON u.id = s.user_id
LEFT JOIN members m ON m.id = u.member_id
WHERE s.id = %s
""",
(token,),
)
row = cur.fetchone()
if row is None or row.get("revoked_at") is not None:
raise HTTPException(status_code=401, detail="세션이 유효하지 않습니다.")
expires_at = row["expires_at"]
now_utc = datetime.now(timezone.utc)
if expires_at is None or expires_at <= now_utc:
raise HTTPException(status_code=401, detail="세션이 만료되었습니다.")
if not bool(row.get("is_active")):
raise HTTPException(status_code=403, detail="비활성화된 계정입니다.")
return build_auth_session_payload(row, uuid.UUID(str(row["session_id"])), expires_at)
@app.post("/api/mock-login")
def mock_login(username: str = Form(...), password: str = Form(...)) -> dict[str, object]:
if not mock_login_enabled:
raise HTTPException(status_code=403, detail="Mock login is disabled.")
if not username.strip() or not password.strip():
raise HTTPException(status_code=400, detail="Username and password are required.")
return {
"user": {
"username": username.strip(),
"display_name": username.strip(),
"role": "admin",
},
"session_expires_at": datetime.utcnow().isoformat() + "Z",
}