Files
llm_macro/workspace/utils/checking_keys.py
2025-10-30 10:32:31 +09:00

134 lines
4.6 KiB
Python

import hashlib
import logging
import os
from typing import Optional, Union
from dotenv import load_dotenv
from fastapi import HTTPException, Security
from fastapi.security import APIKeyHeader
from services.api_key_service import validate_api_key
from snowflake import SnowflakeGenerator
from snowflake.snowflake import MAX_INSTANCE # ✅ 범위 보정에 사용
logger = logging.getLogger(__name__)
load_dotenv()
# .env 파일에서 관리자 API 키를 로드
ADMIN_API_KEY = os.getenv("ADMIN_API_KEY")
# 헤더 설정
api_key_header = APIKeyHeader(
name="X-API-KEY", auto_error=False, description="Client-specific API Key"
)
admin_api_key_header = APIKeyHeader(
name="X-Admin-KEY", auto_error=False, description="Key for administrative tasks"
)
def get_api_key(api_key: str = Security(api_key_header)):
"""요청 헤더의 X-API-KEY가 유효한지 Redis를 통해 검증합니다."""
if not validate_api_key(api_key):
logger.warning(f"유효하지 않은 API 키로 접근 시도: {api_key}")
raise HTTPException(status_code=401, detail="Invalid or missing API Key")
return api_key
def get_admin_key(admin_key: str = Security(admin_api_key_header)):
"""관리자용 API 키를 검증합니다."""
if not ADMIN_API_KEY:
logger.error(
"ADMIN_API_KEY가 서버에 설정되지 않았습니다. 관리자 API를 사용할 수 없습니다."
)
raise HTTPException(status_code=500, detail="Server configuration error")
if not admin_key or admin_key != ADMIN_API_KEY:
logger.warning("유효하지 않은 관리자 키로 관리 API 접근 시도.")
raise HTTPException(status_code=403, detail="Not authorized for this operation")
return admin_key
class APIKeyLoader:
@staticmethod
def load_gemini_key() -> str:
key = os.getenv("GEMINI_API_KEY")
if not key:
logger.error("GEMINI_API_KEY 환경 변수가 설정되지 않았습니다.")
raise ValueError("GEMINI_API_KEY 환경 변수가 설정되지 않았습니다.")
return key
@staticmethod
def load_claude_key() -> str:
key = os.getenv("ANTHROPIC_API_KEY")
if not key:
logger.error("ANTHROPIC_API_KEY 환경 변수가 설정되지 않았습니다.")
raise ValueError("ANTHROPIC_API_KEY 환경 변수가 설정되지 않았습니다.")
return key
@staticmethod
def load_gpt_key() -> str:
key = os.getenv("OPENAI_API_KEY")
if not key:
logger.error("OPENAI_API_KEY 환경 변수가 설정되지 않았습니다.")
raise ValueError("OPENAI_API_KEY 환경 변수가 설정되지 않았습니다.")
return key
def _clamp_instance(value: int) -> int:
"""0..MAX_INSTANCE 범위를 벗어나지 않도록 보정"""
if value < 0:
return 0
if value > MAX_INSTANCE:
return MAX_INSTANCE
return value
def _hash_to_instance(text: str, salt: Optional[str] = None) -> int:
"""
임의 문자열(IP/호스트명 등)을 Snowflake 인스턴스 정수에 매핑.
- 파이썬 내장 hash는 런마다 바뀌므로 sha256 사용.
- salt를 섞어 워커/파드 간 충돌 확률을 추가로 낮출 수 있음.
"""
base = text.strip().strip("[]") # IPv6 bracket 등 정리
if salt:
base = f"{base}-{salt}"
digest = int(hashlib.sha256(base.encode("utf-8")).hexdigest(), 16)
return digest % (MAX_INSTANCE + 1)
def _normalize_instance(node: Union[int, str, None]) -> int:
"""
SnowflakeGenerator가 요구하는 정수 인스턴스로 변환:
- int: 범위 보정
- 숫자 문자열: int 변환 후 보정
- 기타 문자열(IP/호스트명 등): 해시 매핑
- None/기타: 0
"""
# 충돌 완화를 위한 선택적 솔트 (예: POD_NAME / HOSTNAME / PID)
salt = os.getenv("SNOWFLAKE_SALT") or os.getenv("POD_NAME") or str(os.getpid())
if isinstance(node, int):
return _clamp_instance(node)
if isinstance(node, str):
s = node.strip()
if s.isdigit():
try:
return _clamp_instance(int(s))
except Exception:
pass
# 숫자 문자열이 아니면 해시로 매핑
return _hash_to_instance(s, salt=salt)
# None 또는 기타 타입
return 0
def create_key(node: Union[int, str, None] = 1) -> str:
"""
Snowflake 알고리즘 기반 고유 키 생성기 (request_id용).
node는 int/str/IP/호스트명/None 등 무엇이 오든 안전하게 정수 인스턴스로 매핑됩니다.
"""
instance = _normalize_instance(node)
generator = SnowflakeGenerator(instance)
return str(next(generator))