First commit

This commit is contained in:
kyy
2025-10-30 10:09:14 +09:00
commit 996bea3a3a
89 changed files with 2110 additions and 0 deletions

Binary file not shown.

View File

@@ -0,0 +1,133 @@
import hashlib
import logging
import os
from typing import Optional, Union
from dotenv import load_dotenv
from fastapi import HTTPException, Security
from fastapi.security import APIKeyHeader
from services.api_key_service import validate_api_key
from snowflake import SnowflakeGenerator
from snowflake.snowflake import MAX_INSTANCE # ✅ 범위 보정에 사용
logger = logging.getLogger(__name__)
load_dotenv()
# .env 파일에서 관리자 API 키를 로드
ADMIN_API_KEY = os.getenv("ADMIN_API_KEY")
# 헤더 설정
api_key_header = APIKeyHeader(
name="X-API-KEY", auto_error=False, description="Client-specific API Key"
)
admin_api_key_header = APIKeyHeader(
name="X-Admin-KEY", auto_error=False, description="Key for administrative tasks"
)
def get_api_key(api_key: str = Security(api_key_header)):
"""요청 헤더의 X-API-KEY가 유효한지 Redis를 통해 검증합니다."""
if not validate_api_key(api_key):
logger.warning(f"유효하지 않은 API 키로 접근 시도: {api_key}")
raise HTTPException(status_code=401, detail="Invalid or missing API Key")
return api_key
def get_admin_key(admin_key: str = Security(admin_api_key_header)):
"""관리자용 API 키를 검증합니다."""
if not ADMIN_API_KEY:
logger.error(
"ADMIN_API_KEY가 서버에 설정되지 않았습니다. 관리자 API를 사용할 수 없습니다."
)
raise HTTPException(status_code=500, detail="Server configuration error")
if not admin_key or admin_key != ADMIN_API_KEY:
logger.warning("유효하지 않은 관리자 키로 관리 API 접근 시도.")
raise HTTPException(status_code=403, detail="Not authorized for this operation")
return admin_key
class APIKeyLoader:
@staticmethod
def load_gemini_key() -> str:
key = os.getenv("GEMINI_API_KEY")
if not key:
logger.error("GEMINI_API_KEY 환경 변수가 설정되지 않았습니다.")
raise ValueError("GEMINI_API_KEY 환경 변수가 설정되지 않았습니다.")
return key
@staticmethod
def load_claude_key() -> str:
key = os.getenv("ANTHROPIC_API_KEY")
if not key:
logger.error("ANTHROPIC_API_KEY 환경 변수가 설정되지 않았습니다.")
raise ValueError("ANTHROPIC_API_KEY 환경 변수가 설정되지 않았습니다.")
return key
@staticmethod
def load_gpt_key() -> str:
key = os.getenv("OPENAI_API_KEY")
if not key:
logger.error("OPENAI_API_KEY 환경 변수가 설정되지 않았습니다.")
raise ValueError("OPENAI_API_KEY 환경 변수가 설정되지 않았습니다.")
return key
def _clamp_instance(value: int) -> int:
"""0..MAX_INSTANCE 범위를 벗어나지 않도록 보정"""
if value < 0:
return 0
if value > MAX_INSTANCE:
return MAX_INSTANCE
return value
def _hash_to_instance(text: str, salt: Optional[str] = None) -> int:
"""
임의 문자열(IP/호스트명 등)을 Snowflake 인스턴스 정수에 매핑.
- 파이썬 내장 hash는 런마다 바뀌므로 sha256 사용.
- salt를 섞어 워커/파드 간 충돌 확률을 추가로 낮출 수 있음.
"""
base = text.strip().strip("[]") # IPv6 bracket 등 정리
if salt:
base = f"{base}-{salt}"
digest = int(hashlib.sha256(base.encode("utf-8")).hexdigest(), 16)
return digest % (MAX_INSTANCE + 1)
def _normalize_instance(node: Union[int, str, None]) -> int:
"""
SnowflakeGenerator가 요구하는 정수 인스턴스로 변환:
- int: 범위 보정
- 숫자 문자열: int 변환 후 보정
- 기타 문자열(IP/호스트명 등): 해시 매핑
- None/기타: 0
"""
# 충돌 완화를 위한 선택적 솔트 (예: POD_NAME / HOSTNAME / PID)
salt = os.getenv("SNOWFLAKE_SALT") or os.getenv("POD_NAME") or str(os.getpid())
if isinstance(node, int):
return _clamp_instance(node)
if isinstance(node, str):
s = node.strip()
if s.isdigit():
try:
return _clamp_instance(int(s))
except Exception:
pass
# 숫자 문자열이 아니면 해시로 매핑
return _hash_to_instance(s, salt=salt)
# None 또는 기타 타입
return 0
def create_key(node: Union[int, str, None] = 1) -> str:
"""
Snowflake 알고리즘 기반 고유 키 생성기 (request_id용).
node는 int/str/IP/호스트명/None 등 무엇이 오든 안전하게 정수 인스턴스로 매핑됩니다.
"""
instance = _normalize_instance(node)
generator = SnowflakeGenerator(instance)
return str(next(generator))

View File

@@ -0,0 +1,22 @@
# utils/redis_utils.py
import redis
from config.setting import PGN_REDIS_DB, PGN_REDIS_HOST, PGN_REDIS_PORT
def get_redis_client():
"""
Redis 클라이언트를 반환합니다. decode_responses=True 설정으로 문자열을 자동 디코딩합니다.
"""
try:
redis_client = redis.Redis(
host=PGN_REDIS_HOST,
port=PGN_REDIS_PORT,
db=PGN_REDIS_DB,
decode_responses=True,
)
# 연결 확인 (ping)
redis_client.ping()
return redis_client
except redis.ConnectionError as e:
raise RuntimeError(f"Redis 연결 실패: {e}")

View File

@@ -0,0 +1,90 @@
import datetime
import json
import logging
import re
import unicodedata
from pathlib import Path
logger = logging.getLogger(__name__)
def safe_filename(filename: str) -> str:
# 확장자 제거
print(f"[FILE NAME] {filename}")
base = Path(filename).stem
base = unicodedata.normalize("NFKC", base)
base = base.replace(" ", "_")
base = re.sub(r"[^\w\-\.가-힣]", "_", base, flags=re.UNICODE)
base = re.sub(r"_+", "_", base).strip("._-")
# 비어있으면 안전한 기본값
if not base:
base = f"result_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
return f"{base}.html"
def post_process(input_json, generated_text, llm_model):
result_dict = {}
# ✅ JSON 코드블럭 형식 처리
if "```json" in generated_text:
try:
logger.debug("[PROCESS-JSON] JSON 코드블럭 형식 후처리 진행합니다.")
json_block = re.search(
r"```json\s*(\{.*?\})\s*```", generated_text, re.DOTALL
)
if json_block:
parsed_json = json.loads(json_block.group(1))
result_dict = {
re.sub(r"[^ㄱ-ㅎ가-힣a-zA-Z]", "", k): v
for k, v in parsed_json.items()
}
except Exception as e:
logger.error("[PROCESS-ERROR] JSON 코드블럭 파싱 실패:", e)
# ✅ 길이 초과 메시지 감지 및 처리
elif "입력 텍스트가" in generated_text and "모델 호출 생략" in generated_text:
result_dict = {
"message": "⚠️ 입력 텍스트가 너무 깁니다. LLM 모델 호출을 생략했습니다.",
"note": "OCR로 추출된 원본 텍스트(parsed)를 참고해 주세요.",
}
else:
# ✅ "1.제목:" 또는 "1. 제목:" 형식 처리
logger.debug("[PROCESS-STRING] JSON 코드블럭 형식이 아닙니다.")
blocks = re.split(r"\n(?=\d+\.\s*[^:\n]+:)", generated_text.strip())
for block in blocks:
if ":" in block:
key_line, *rest = block.split(":", 1)
key = re.sub(r"^\d+\.\s*", "", key_line).strip()
cleaned_key = re.sub(r"[^ㄱ-ㅎ가-힣a-zA-Z]", "", key)
value = rest[0].strip() if rest else ""
value = re.sub(r"^[^\w가-힣a-zA-Z]+", "", value).strip()
result_dict[cleaned_key] = value
input_json["result"] = result_dict
input_json["llm_model"] = llm_model
# final_result
logger.info(json.dumps(input_json["result"], indent=2, ensure_ascii=False))
return input_json
def ocr_process(filename, ocr_model, coord, text, start_time, end_time):
json_data = {
"filename": filename,
"model": {"ocr_model": ocr_model},
"time": {
"duration_sec": f"{end_time - start_time:.2f}",
"started_at": start_time,
"ended_at": end_time,
},
"fields": coord,
"parsed": text,
}
return json_data