원 레포랑 완전 분리

This commit is contained in:
ai-cell-a100-1
2025-08-11 18:56:38 +09:00
commit 7217d3cbaa
86 changed files with 6631 additions and 0 deletions

View File

View File

@@ -0,0 +1,57 @@
import os
from io import BytesIO
import tiktoken
from config.setting import ALLOWED_EXTENSIONS
from fastapi import HTTPException, UploadFile
def validate_all_files(*upload_files: UploadFile):
for upload_file in upload_files:
if not upload_file:
continue
_, ext = os.path.splitext(upload_file.filename.lower())
if ext not in ALLOWED_EXTENSIONS:
raise HTTPException(
status_code=400,
detail=(
f"파일 '{upload_file.filename}'은(는) 지원하지 않는 확장자입니다. "
f"허용된 확장자는 {', '.join(ALLOWED_EXTENSIONS)} 입니다."
),
)
def token_counter(prompt: str, text: str) -> int:
try:
enc = tiktoken.get_encoding("cl100k_base") # OpenAI 기반 tokenizer
token_count = len(enc.encode(prompt + text))
except Exception:
token_count = len(prompt + text) // 4 # fallback: 대략적 추정
return token_count
# ✅ UploadFile을 대신할 수 있는 간단한 래퍼 클래스
class SimpleUploadFile:
def __init__(
self,
filename: str,
content: bytes,
content_type: str = "application/octet-stream",
):
self.filename = filename
self.file = BytesIO(content)
self.content_type = content_type
# ✅ UploadFile 객체 복사 → SimpleUploadFile로 변환
def clone_upload_file(upload_file: UploadFile) -> SimpleUploadFile:
file_bytes = upload_file.file.read()
upload_file.file.seek(0)
return SimpleUploadFile(
filename=upload_file.filename,
content=file_bytes,
content_type=upload_file.content_type,
)

View File

@@ -0,0 +1,78 @@
import logging
import os
from dotenv import load_dotenv
from fastapi import HTTPException, Security
from fastapi.security import APIKeyHeader
from services.api_key_service import validate_api_key
from snowflake import SnowflakeGenerator
logger = logging.getLogger(__name__)
load_dotenv()
# .env 파일에서 관리자 API 키를 로드
ADMIN_API_KEY = os.getenv("ADMIN_API_KEY")
# 헤더 설정
api_key_header = APIKeyHeader(
name="X-API-KEY", auto_error=False, description="Client-specific API Key"
)
admin_api_key_header = APIKeyHeader(
name="X-Admin-KEY", auto_error=False, description="Key for administrative tasks"
)
def get_api_key(api_key: str = Security(api_key_header)):
"""요청 헤더의 X-API-KEY가 유효한지 Redis를 통해 검증합니다."""
if not validate_api_key(api_key):
logger.warning(f"유효하지 않은 API 키로 접근 시도: {api_key}")
raise HTTPException(status_code=401, detail="Invalid or missing API Key")
return api_key
def get_admin_key(admin_key: str = Security(admin_api_key_header)):
"""관리자용 API 키를 검증합니다."""
if not ADMIN_API_KEY:
logger.error(
"ADMIN_API_KEY가 서버에 설정되지 않았습니다. 관리자 API를 사용할 수 없습니다."
)
raise HTTPException(status_code=500, detail="Server configuration error")
if not admin_key or admin_key != ADMIN_API_KEY:
logger.warning("유효하지 않은 관리자 키로 관리 API 접근 시도.")
raise HTTPException(status_code=403, detail="Not authorized for this operation")
return admin_key
class APIKeyLoader:
@staticmethod
def load_gemini_key() -> str:
key = os.getenv("GEMINI_API_KEY")
if not key:
logger.error("GEMINI_API_KEY 환경 변수가 설정되지 않았습니다.")
raise ValueError("GEMINI_API_KEY 환경 변수가 설정되지 않았습니다.")
return key
@staticmethod
def load_claude_key() -> str:
key = os.getenv("ANTHROPIC_API_KEY")
if not key:
logger.error("ANTHROPIC_API_KEY 환경 변수가 설정되지 않았습니다.")
raise ValueError("ANTHROPIC_API_KEY 환경 변수가 설정되지 않았습니다.")
return key
@staticmethod
def load_gpt_key() -> str:
key = os.getenv("OPENAI_API_KEY")
if not key:
logger.error("OPENAI_API_KEY 환경 변수가 설정되지 않았습니다.")
raise ValueError("OPENAI_API_KEY 환경 변수가 설정되지 않았습니다.")
return key
def create_key(node: int = 1) -> str:
"""
Snowflake 알고리즘 기반 고유 키 생성기 (request_id용)
"""
generator = SnowflakeGenerator(node)
return str(next(generator))

View File

@@ -0,0 +1,35 @@
import io
from pathlib import Path
from typing import List
import httpx
async def prepare_images_from_file(
file_url: str, filename: str, max_pages: int = 5, dpi: int = 180
) -> List[bytes]:
"""presigned URL → bytes. PDF이면 앞쪽 max_pages 페이지만 이미지로 변환하여 bytes 리스트 반환"""
async with httpx.AsyncClient() as client:
resp = await client.get(file_url, timeout=None)
resp.raise_for_status()
file_bytes = resp.content
ext = Path(filename).suffix.lower()
if ext in [".pdf", ".tif", ".tiff"]:
try:
from pdf2image import convert_from_bytes
except ImportError as e:
raise RuntimeError(
"pdf2image가 필요합니다. `pip install pdf2image poppler-utils`"
) from e
pil_images = convert_from_bytes(file_bytes, dpi=dpi)
images = []
for i, im in enumerate(pil_images[:max_pages]):
buf = io.BytesIO()
im.save(buf, format="PNG")
images.append(buf.getvalue())
return images
else:
# 단일 이미지
return [file_bytes]

View File

@@ -0,0 +1,182 @@
import csv
import json
import logging
import time
from datetime import datetime
from pathlib import Path
import redis
from config.setting import PGN_REDIS_DB, PGN_REDIS_HOST, PGN_REDIS_PORT
from fastapi import Depends, Request
from utils.checking_keys import get_api_key
from utils.request_utils import get_client_ip, get_swagger_port
logger = logging.getLogger(__name__)
redis_client = redis.Redis(
host=PGN_REDIS_HOST, port=PGN_REDIS_PORT, db=PGN_REDIS_DB, decode_responses=True
)
def log_user_request(
request_info: str,
endpoint: str,
input_filename: str,
model: str,
prompt_filename: str,
context_length: int,
api_key: str,
):
client_ip = get_client_ip(request_info)
swagger_port = get_swagger_port(request_info)
# ✅ 1. CSV 파일 저장
logs_dir = Path("./logs")
logs_dir.mkdir(exist_ok=True)
csv_path = logs_dir / f"{client_ip}.csv"
new_file = not csv_path.exists()
with open(csv_path, mode="a", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
if new_file:
writer.writerow(
[
"timestamp",
"swagger_port",
"endpoint",
"input_filename",
"prompt_filename",
"model",
"context_length",
"api_key",
]
)
writer.writerow(
[
time.strftime("%Y-%m-%d %H:%M:%S"),
swagger_port,
endpoint,
input_filename,
prompt_filename,
model,
context_length,
api_key,
]
)
# ✅ 2. Loki용 JSON 로그 출력
logger.info(
json.dumps(
{
"ip": client_ip,
"swagger_port": swagger_port,
"endpoint": endpoint,
"input_filename": input_filename,
"prompt_filename": prompt_filename,
"model": model,
"context_length": context_length,
"api_key": api_key,
"event": "inference_log",
}
)
)
def log_generation_info(custom_mode: bool, user_prompt: str = ""):
logger.info(f"[GENERATE-PROMPT-USED] 사용자 정의 프롬프트 사용유무: {custom_mode}")
if custom_mode:
logger.info(f"[GENERATE-USER-PROMPT]\n{user_prompt}")
else:
logger.info("[GENERATE-DEFAULT-PROMPT] Default_prompt")
def log_pipeline_status(request_id: str, status_message: str, step_info: dict = None):
log_entry = {
"status": status_message,
"timestamp": datetime.now().isoformat(),
"step_info": step_info,
}
redis_client.rpush(f"pipeline_status:{request_id}", json.dumps(log_entry))
def ns_to_sec(ns: int) -> float:
"""나노초를 초로 변환"""
return round(ns / 1e9, 3) # 소수점 3자리
def log_ollama_stats(res: dict):
"""Ollama 응답 JSON 내 추론 통계를 한 줄 JSON 로그로 출력 (초 단위 변환 + token/s 포함)"""
# 원본 값
total_duration = res.get("total_duration")
load_duration = res.get("load_duration")
prompt_eval_count = res.get("prompt_eval_count")
prompt_eval_duration = res.get("prompt_eval_duration")
eval_count = res.get("eval_count")
eval_duration = res.get("eval_duration")
# 초 단위로 변환
stats = {
"model": res.get("model"),
"total_duration_ns": total_duration,
"total_duration_sec": ns_to_sec(total_duration),
"load_duration_ns": load_duration,
"load_duration_sec": ns_to_sec(load_duration),
"prompt_eval_count": prompt_eval_count,
"prompt_eval_duration_ns": prompt_eval_duration,
"prompt_eval_duration_sec": ns_to_sec(prompt_eval_duration),
"eval_count": eval_count,
"eval_duration_ns": eval_duration,
"eval_duration_sec": ns_to_sec(eval_duration),
}
# token/s 계산
if eval_count and eval_duration:
stats["generation_speed_tok_per_sec"] = round(
eval_count / (eval_duration / 1e9), 2
)
logger.info("[OLLAMA-STATS] " + json.dumps(stats, ensure_ascii=False))
class EndpointLogger:
def __init__(self, request: Request, api_key: str = Depends(get_api_key)):
self.request = request
self.api_key = api_key
def log(
self,
model: str,
input_filename: str = "N/A",
prompt_filename: str = "N/A",
context_length: int = 0,
):
try:
log_user_request(
request_info=self.request,
endpoint=self.request.url.path,
input_filename=input_filename,
model=model,
prompt_filename=prompt_filename,
context_length=context_length,
api_key=self.api_key,
)
except Exception as e:
logger.warning(
f"Failed to log request for endpoint {self.request.url.path}: {e}"
)
class HealthCheckFilter(logging.Filter):
def filter(self, record: logging.LogRecord) -> bool:
# The access log record for uvicorn has the data in `args`.
# record.args = (client_addr, method, path, http_version, status_code)
# e.g. ('127.0.0.1:37894', 'GET', '/health/API', '1.1', 200)
if isinstance(record.args, tuple) and len(record.args) == 5:
method = record.args[1]
path = record.args[2]
status_code = record.args[4]
if method == 'GET' and isinstance(path, str) and path.startswith('/health') and status_code == 200:
return False
return True

View File

@@ -0,0 +1,164 @@
import io
import json
import logging
from datetime import timedelta
from typing import Optional
from config.setting import (
MINIO_ACCESS_KEY,
MINIO_ENDPOINT,
MINIO_RESULTS_BUCKET_NAME,
MINIO_SECRET_KEY,
)
from fastapi import UploadFile
from minio import Minio
from minio.error import S3Error
# MinIO 클라이언트 전역 생성
minio_client = Minio(
MINIO_ENDPOINT,
access_key=MINIO_ACCESS_KEY,
secret_key=MINIO_SECRET_KEY,
secure=False,
)
logger = logging.getLogger(__name__)
def get_minio_client():
"""
MinIO 클라이언트를 반환합니다. 연결 확인을 위해 list_buckets() 호출로 테스트합니다.
"""
try:
client = Minio(
MINIO_ENDPOINT,
access_key=MINIO_ACCESS_KEY,
secret_key=MINIO_SECRET_KEY,
secure=False, # HTTPS 사용 여부에 맞게 설정
)
# ✅ 연결 테스트 (버킷 목록 조회)
client.list_buckets()
return client
except Exception as e:
raise RuntimeError(f"MinIO 연결 실패: {e}")
def save_result_to_minio(result_dict: dict, object_name: str) -> str:
"""
결과 JSON(dict)을 BytesIO로 인코딩하여 MinIO에 저장하고 presigned URL 반환
"""
try:
# JSON -> BytesIO
result_bytes = io.BytesIO(
json.dumps(result_dict, ensure_ascii=False).encode("utf-8")
)
result_bytes.seek(0)
# MinIO에 업로드
minio_client.put_object(
bucket_name=MINIO_RESULTS_BUCKET_NAME,
object_name=object_name,
data=result_bytes,
length=result_bytes.getbuffer().nbytes,
content_type="application/json",
)
# presigned URL 생성
presigned_url = minio_client.presigned_get_object(
MINIO_RESULTS_BUCKET_NAME,
object_name,
)
return presigned_url
except Exception as e:
logger.error(f"❌ MinIO 작업 실패: {e}")
raise
def upload_file_to_minio_v2(
file: UploadFile, bucket_name: str, object_name: str
) -> str:
"""
파일을 MinIO에 업로드하고, presigned URL을 반환합니다.
Args:
file (UploadFile): FastAPI의 UploadFile 객체
bucket_name (str): 업로드할 버킷 이름
object_name (str): 저장될 객체 이름 (경로 포함 가능)
Returns:
str: 생성된 presigned URL
"""
try:
# 1. 버킷 존재 확인 및 생성
found = minio_client.bucket_exists(bucket_name)
if not found:
minio_client.make_bucket(bucket_name)
logger.info(f"✅ 버킷 '{bucket_name}' 생성 완료.")
# 2. 파일 업로드
file.file.seek(0) # 파일 포인터를 처음으로 이동
minio_client.put_object(
bucket_name,
object_name,
file.file,
length=-1, # 파일 크기를 모를 때 -1로 설정
part_size=10 * 1024 * 1024, # 10MB 단위로 청크 업로드
)
logger.info(f"'{object_name}' -> '{bucket_name}' 업로드 성공.")
# 3. Presigned URL 생성
presigned_url = minio_client.presigned_get_object(
bucket_name,
object_name,
expires=timedelta(days=7), # URL 만료 기간 (예: 7일, 필요에 따라 조절 가능)
)
logger.info(f"✅ Presigned URL 생성 완료: {presigned_url}")
return presigned_url
except Exception as e:
logger.error(f"❌ MinIO 작업 실패: {e}")
raise # 실패 시 예외를 다시 발생시켜 호출 측에서 처리하도록 함
def fetch_result_from_minio(request_id: str) -> Optional[dict]:
try:
# MinIO에서 객체 목록 가져오기 (폴더 내 전체 파일 조회)
objects = minio_client.list_objects(
bucket_name=MINIO_RESULTS_BUCKET_NAME,
prefix=f"{request_id}/",
recursive=True,
)
json_obj = next(
(obj for obj in objects if obj.object_name.endswith(".json")), None
)
if not json_obj:
logger.warning(
f"[MINIO] request_id: {request_id} 경로에 .json 파일이 존재하지 않습니다."
)
return None
object_name = json_obj.object_name
print(
f"[MINIO] request_id: {request_id}에 대한 결과를 가져옵니다. 대상 파일: {object_name}"
)
# 객체 다운로드
response = minio_client.get_object(MINIO_RESULTS_BUCKET_NAME, object_name)
content = response.read()
# JSON 디코드
result_dict = json.loads(content.decode("utf-8"))
logger.info(f"[MINIO] 결과 JSON 로드 성공: {object_name}")
return result_dict
except S3Error as e:
logger.error(f"[MINIO] S3Error 발생: {e}")
return None
except Exception as e:
logger.error(f"[MINIO] 기타 오류 발생: {e}")
return None

View File

@@ -0,0 +1,32 @@
import hashlib
import os
from workspace.config.setting import CACHED_PROMPT_DIR
# ✅ 프롬프트 캐시 저장 디렉토리가 없으면 자동 생성
def ensure_cache_dir():
os.makedirs(CACHED_PROMPT_DIR, exist_ok=True)
# ✅ 파일에서 바이트를 읽어옴 (UploadFile 또는 SimpleUploadFile 모두 대응)
def read_file_bytes(upload_file) -> bytes:
upload_file.file.seek(0)
return upload_file.file.read()
# ✅ SHA-256 해시 생성
def compute_file_hash(upload_file) -> str:
content = read_file_bytes(upload_file)
return hashlib.sha256(content).hexdigest()
# ✅ {해시}.txt 형태로 저장
def save_prompt_file_if_not_exists(file_hash: str, upload_file) -> str:
ensure_cache_dir()
file_path = os.path.join(CACHED_PROMPT_DIR, f"{file_hash}.txt")
if not os.path.exists(file_path):
content = read_file_bytes(upload_file)
with open(file_path, "wb") as f:
f.write(content)
return file_path

View File

@@ -0,0 +1,22 @@
# utils/redis_utils.py
import redis
from config.setting import PGN_REDIS_DB, PGN_REDIS_HOST, PGN_REDIS_PORT
def get_redis_client():
"""
Redis 클라이언트를 반환합니다. decode_responses=True 설정으로 문자열을 자동 디코딩합니다.
"""
try:
redis_client = redis.Redis(
host=PGN_REDIS_HOST,
port=PGN_REDIS_PORT,
db=PGN_REDIS_DB,
decode_responses=True,
)
# 연결 확인 (ping)
redis_client.ping()
return redis_client
except redis.ConnectionError as e:
raise RuntimeError(f"Redis 연결 실패: {e}")

View File

@@ -0,0 +1,27 @@
from fastapi import Request
# 🔽 사용자 IP 확인
def get_client_ip(request: Request) -> str:
xff = request.headers.get("X-Forwarded-For")
if xff:
return xff.split(",")[0].strip() # 첫 번째 값(실제 클라이언트 IP)
xri = request.headers.get("X-Real-IP")
if xri:
return xri # Nginx가 전달한 원래 클라이언트 IP
return request.client.host # 마지막 fallback (프록시/NAT IP)
# 🔽 요청 SWAGGER 포트 확인
def get_swagger_port(request: Request) -> str:
# 우선순위: X-Forwarded-Port → request.url.port → Host 헤더 → 기본 포트 추정
port = request.headers.get("X-Forwarded-Port")
if port:
return port
if request.url.port:
return str(request.url.port)
host_header = request.headers.get("host")
if host_header and ":" in host_header:
return host_header.split(":")[1]
# 마지막으로 기본 포트(HTTPS 443, HTTP 80) 추정
return "443" if request.headers.get("X-Forwarded-Proto") == "https" else "80"

View File

@@ -0,0 +1,21 @@
class PromptFormatter:
SYSTEM_PROMPT = """
다음은 스캔된 공문서에서 OCR로 추출된 원시 텍스트입니다.
오타나 줄바꿈 오류가 있을 수 있으니 의미를 유추하여 정확한 정보를 추출해주세요.
다음 주어진 항목을 JSON 형식(```json)으로 작성해주세요:
"""
@staticmethod
def format(text: str, user_prompt: str = None, custom_mode: bool = False, prompt_mode: str = "extract") -> str:
if custom_mode and prompt_mode == "extract":
return (
f"{PromptFormatter.SYSTEM_PROMPT}\n\n"
f"{user_prompt}\n\n"
f"다음은 OCR로 추출된 원시 텍스트입니다:\n\n{text}"
)
else:
return (
f"{user_prompt}\n\n"
f"다음은 OCR로 추출된 원시 텍스트입니다:\n\n{text}"
)

View File

@@ -0,0 +1,479 @@
import copy
import json
import logging
from collections import OrderedDict
from typing import Optional
import anthropic
import google.generativeai as genai
import requests
from anthropic._exceptions import BadRequestError, OverloadedError
from fastapi import HTTPException
from google.api_core.exceptions import ResourceExhausted
from openai import OpenAI
from utils.checking_keys import APIKeyLoader
from utils.logging_utils import log_generation_info, log_ollama_stats
from utils.text_formatter import PromptFormatter
logger = logging.getLogger(__name__)
# ✅ 1. Ollama Gen
class OllamaGenerator:
def __init__(
self, model="gemma3:27b", api_url="http://pgn_ollama_gemma:11534/api/generate"
):
self.model = model
self.api_url = api_url
# ✅ 1-1. Gen-General
def generate(
self, text, user_prompt=None, custom_mode=False, prompt_mode: str = "extract"
):
log_generation_info(custom_mode, user_prompt)
prompt = PromptFormatter.format(text, user_prompt, custom_mode, prompt_mode)
# /no_think 자동 부착
if "qwen" in self.model.lower():
prompt += " /no_think"
payload = {"model": self.model, "prompt": prompt, "stream": False}
try:
response = requests.post(self.api_url, json=payload)
response.raise_for_status()
res = response.json()
if "response" not in res:
raise ValueError(
"[GENERATE-OLLAMA-ERROR] LLM 응답에 'response' 키가 없습니다."
)
log_ollama_stats(res)
return res["response"], self.model, self.api_url
# ☑️ GEMINI API 초과 시, exception
except Exception as e:
logger.error(f"[OLLAMA-ERROR] 서버 연결 실패: {e}")
raise HTTPException(
status_code=500,
detail="Ollama 서빙 서버에 연결할 수 없습니다.\n서버가 실행 중인지 확인하세요.",
)
# ✅ 1-2. Gen-Structure
def structured_generate(
self,
text,
user_prompt=None,
custom_mode=False,
schema_override: Optional[dict] = None,
):
log_generation_info(custom_mode, user_prompt)
prompt = PromptFormatter.format(text, user_prompt, custom_mode)
payload = {
"model": self.model,
"prompt": prompt,
"format": schema_override,
"stream": False,
}
response = requests.post(self.api_url, json=payload)
response.raise_for_status()
res = response.json()
if "response" not in res:
raise ValueError(
"[GENERATE-OLLAMA-ERROR] LLM 응답에 'response' 키가 없습니다."
)
# ✅ 추론 통계 정보 로그 추가
log_ollama_stats(res)
# ✅ 클래스 검증 제거 → JSON 파싱만 수행
try:
structured = json.loads(res["response"])
return structured, self.model, self.api_url
except json.JSONDecodeError as e:
logger.error(f"[PARSE-ERROR] LLM 응답이 JSON으로 파싱되지 않음: {e}")
raise ValueError("LLM 응답이 JSON 형식이 아닙니다.")
# ✅ 2. Gemini Gen
class GeminiGenerator:
def __init__(self, model="gemini-2.5-pro-exp-03-25"):
self.api_key = APIKeyLoader.load_gemini_key()
genai.configure(api_key=self.api_key)
self.model = genai.GenerativeModel(model)
def clean_schema_for_gemini(self, schema: dict) -> dict:
# Gemini는 title 등 일부 필드를 허용하지 않음
cleaned = dict(schema) # shallow copy
cleaned.pop("title", None)
cleaned.pop("$schema", None)
# 기타 필요 시 추가 제거
return cleaned
# ✅ 2-1. Gen-General
def generate(
self, text, user_prompt=None, custom_mode=False, prompt_mode: str = "extract"
):
log_generation_info(custom_mode, user_prompt)
prompt = PromptFormatter.format(text, user_prompt, custom_mode, prompt_mode)
try:
response = self.model.generate_content(prompt)
if not response.text:
raise ValueError(
"[GENERATE-GEMINI-ERROR] LLM 응답에 'response' 가 없습니다."
)
return (
response.text,
self.model.model_name.split("/")[-1],
"google.generativeai SDK",
)
# ☑️ GEMINI API 초과 시, exception
except ResourceExhausted as e:
logger.error(f"[GEMINI-ERROR] 할당량 초과: {e}")
raise HTTPException(
status_code=500,
detail="Gemini 모델의 일일 API 사용량이 초과되었습니다.\n'claude-3-7-sonnet-20250219' 또는 'gpt-4.1' 모델로 다시 시도하세요.",
)
# ✅ 2-2. Gen-Structure
def structured_generate(
self,
text,
user_prompt=None,
custom_mode=False,
schema_override: Optional[dict] = None,
):
log_generation_info(custom_mode, user_prompt)
prompt = PromptFormatter.format(text, user_prompt, custom_mode)
response_schema = (
self.clean_schema_for_gemini(schema_override) if schema_override else None
)
try:
response = self.model.generate_content(
contents=prompt,
generation_config=genai.GenerationConfig(
response_mime_type="application/json",
response_schema=response_schema,
),
)
if not response.text:
raise ValueError(
"❌ Gemini 응답에서 구조화된 데이터를 파싱하지 못했습니다."
)
parsed = json.loads(response.text)
if isinstance(parsed, list) and isinstance(parsed[0], dict):
structured = parsed[0]
elif isinstance(parsed, dict):
structured = parsed
elif isinstance(parsed, list) and isinstance(parsed[0], str):
structured = json.loads(parsed[0])
else:
raise ValueError("❌ 응답 형식이 예상과 다릅니다.")
# ✅ 필드 순서 정렬
if schema_override and "properties" in schema_override:
ordered_keys = list(schema_override["properties"].keys())
structured = OrderedDict(
(key, structured.get(key)) for key in ordered_keys
)
return (
structured,
self.model.model_name.split("/")[-1],
"google.generativeai SDK",
)
# ☑️ GEMINI API 초과 시, exception
except ResourceExhausted as e:
logger.error(f"[GEMINI-STRUCTURED-ERROR] 할당량 초과: {e}")
raise HTTPException(
status_code=500,
detail="'Gemini' 모델의 일일 API 사용량이 초과되었습니다.\n'claude-3-7-sonnet-20250219' 또는 'gpt-4.1' 모델로 다시 시도하세요.",
)
except json.JSONDecodeError as e:
logger.error(f"[GEMINI-STRUCTURED-PARSE-ERROR] 응답 JSON 파싱 실패: {e}")
raise ValueError("Gemini 응답이 JSON 형식이 아닙니다.")
def generate_multimodal(self, images, prompt, schema_override=None):
import io
from PIL import Image
content = [prompt]
for image_bytes in images:
try:
img = Image.open(io.BytesIO(image_bytes))
content.append(img)
except Exception as e:
logger.error(f"[GEMINI-MULTIMODAL-ERROR] 이미지 처리 실패: {e}")
raise HTTPException(
status_code=400, detail=f"이미지 파일을 처리할 수 없습니다: {e}"
)
try:
response = self.model.generate_content(content)
if not response.text:
raise ValueError(
"[GENERATE-GEMINI-ERROR] LLM 응답에 'response' 가 없습니다."
)
return (
response.text,
self.model.model_name.split("/")[-1],
"google.generativeai SDK",
)
except ResourceExhausted as e:
logger.error(f"[GEMINI-MULTIMODAL-ERROR] 할당량 초과: {e}")
raise HTTPException(
status_code=500,
detail="Gemini 모델의 일일 API 사용량이 초과되었습니다.\n'claude-3-7-sonnet-20250219' 또는 'gpt-4.1' 모델로 다시 시도하세요.",
)
except Exception as e:
logger.error(f"[GEMINI-MULTIMODAL-ERROR] Gemini 응답 파싱 실패: {e}")
raise HTTPException(
status_code=500, detail=f"❌ Gemini 응답 생성에 실패했습니다: {e}"
)
# ✅ 3. Cluade Gen
class ClaudeGenerator:
def __init__(self, model="claude-3-7-sonnet-20250219"):
self.api_key = APIKeyLoader.load_claude_key()
self.client = anthropic.Anthropic(api_key=self.api_key)
self.model = model
# ✅ 3-1. Gen-General
def generate(
self, text, user_prompt=None, custom_mode=False, prompt_mode: str = "extract"
):
log_generation_info(custom_mode, user_prompt)
prompt = PromptFormatter.format(text, user_prompt, custom_mode, prompt_mode)
try:
response = self.client.messages.create(
model=self.model,
max_tokens=4096,
messages=[{"role": "user", "content": prompt}],
)
if not response.content[0].text:
raise ValueError(
"[GENERATE-CLAUDE-ERROR] LLM 응답에 'response' 가 없습니다."
)
return response.content[0].text, self.model, "anthropic.Anthropic SDK"
# ☑️ CLAUDE API 초과 시, exception
except (BadRequestError, OverloadedError) as e:
logger.error(f"[CLAUDE-STRUCTURED-ERROR] Claude API 에러 발생: {e}")
raise HTTPException(
status_code=500,
detail="Claude 모델의 일일 API 사용량이 초과되었습니다.\n'gemini-2.5-pro-exp-03-25' 또는 'gpt-4.1' 모델로 다시 시도하세요.",
)
# ✅ 3-2. Gen-Structure
def structured_generate(
self,
text,
user_prompt=None,
custom_mode=False,
schema_override: Optional[dict] = None,
):
log_generation_info(custom_mode, user_prompt)
prompt = PromptFormatter.format(text, user_prompt, custom_mode)
# ✅ Claude는 JSON Schema의 key가 모두 영문이어야 함
if schema_override:
try:
for k in schema_override.get("properties", {}).keys():
if any(ord(ch) > 127 for ch in k):
# 한글 포함 여부 확인
raise HTTPException(
status_code=400,
detail="❌ Claude 모델은 JSON Schema의 필드명이 영어로만 구성되어 있어야 합니다. 필드명을 영문으로 수정해 주세요.",
)
except Exception as e:
raise HTTPException(
status_code=400, detail=f"스키마 처리 중 오류 발생: {str(e)}"
)
tools = [
{
"name": "build_text_analysis_result",
"description": "Extract structured fields from OCR text in document format",
"input_schema": schema_override,
}
]
try:
response = self.client.messages.create(
model=self.model,
max_tokens=4096,
messages=[{"role": "user", "content": prompt}],
tools=tools,
tool_choice={"type": "tool", "name": "build_text_analysis_result"},
)
structured = response.content[0].input
return structured, self.model, "anthropic.Anthropic SDK"
# ☑️ CLAUDE API 초과 시, exception
except (BadRequestError, OverloadedError) as e:
logger.error(f"[CLAUDE-STRUCTURED-ERROR] Claude API 에러 발생: {e}")
raise HTTPException(
status_code=500,
detail="Claude 모델의 일일 API 사용량이 초과되었습니다.\n'gemini-2.5-pro-exp-03-25' 또는 'gpt-4.1' 모델로 다시 시도하세요.",
)
# ✅ 4. GPT Gen
class GptGenerator:
def __init__(self, model="gpt-4o"):
# ✅ OpenAI API Key 로딩 및 유효성 검증
raw = APIKeyLoader.load_gpt_key()
if not raw:
raise RuntimeError("OPENAI_API_KEY가 설정되지 않았습니다.")
self.api_key = raw.strip()
if not self.api_key.startswith(("sk-", "sk-proj-")):
raise RuntimeError("유효하지 않은 OpenAI API Key 형식입니다.")
self.client = OpenAI(api_key=self.api_key)
self.model = model
def enforce_strict_schema(self, schema: dict) -> dict:
strict_schema = copy.deepcopy(schema)
# ✅ required 자동 보완
props = strict_schema.get("properties", {})
existing_required = set(strict_schema.get("required", []))
all_keys = set(props.keys())
# 누락된 필드를 required에 추가
missing_required = all_keys - existing_required
strict_schema["required"] = list(existing_required | missing_required)
# ✅ additionalProperties 보장
if "additionalProperties" not in strict_schema:
strict_schema["additionalProperties"] = False
return strict_schema
# ✅ 4-1. Gen-General
def generate(
self, text, user_prompt=None, custom_mode=False, prompt_mode: str = "extract"
):
log_generation_info(custom_mode, user_prompt)
prompt = PromptFormatter.format(text, user_prompt, custom_mode, prompt_mode)
try:
response = self.client.responses.create(model=self.model, input=prompt)
except Exception as e:
logger.error(f"[GENERATE-GPT-ERROR] OpenAI API 호출 중 예외 발생: {e}")
raise RuntimeError("GPT 생성 요청 중 오류가 발생했습니다.") from e
try:
if (
not response.output
or not response.output[0].content
or not response.output[0].content[0].text
):
raise ValueError("LLM 응답에 'response'가 없습니다.")
except Exception as e:
logger.error(
f"[GENERATE-GPT-ERROR] 응답 파싱 실패: {e} | 원본 응답: {response}"
)
raise RuntimeError("GPT 응답 파싱 중 오류가 발생했습니다.") from e
return response.output[0].content[0].text, self.model, "OpenAI Python SDK"
# ✅ 4-2. Gen-Structure
def structured_generate(
self,
text,
user_prompt=None,
custom_mode=False,
schema_override: Optional[dict] = None,
):
log_generation_info(custom_mode, user_prompt)
prompt = PromptFormatter.format(text, user_prompt, custom_mode)
schema = self.enforce_strict_schema(schema_override) if schema_override else {}
# ✅ Function Calling 방식으로 schema_override 전달
tools = [
{
"type": "function",
"function": {
"name": "build_summary",
"description": "Extract structured document summary from OCR text.",
"parameters": schema,
"strict": True,
},
}
]
try:
response = self.client.beta.chat.completions.parse(
model=self.model,
messages=[
{
"role": "system",
"content": "You are an assistant that extracts structured document summary from OCR text.",
},
{"role": "user", "content": prompt},
],
tools=tools,
tool_choice={"type": "function", "function": {"name": "build_summary"}},
)
tool_call = response.choices[0].message.tool_calls[0]
arguments_json = tool_call.function.arguments
structured = json.loads(arguments_json)
return structured, self.model, "OpenAI Function Calling"
except Exception as e:
logger.error(f"[GPT-STRUCTURED-ERROR] GPT 응답 파싱 실패: {e}")
raise HTTPException(
status_code=500, detail="❌ GPT 구조화 응답 생성에 실패했습니다."
)
def generate_multimodal(self, images, prompt, schema_override=None):
import base64
content = [{"type": "text", "text": prompt}]
for image_bytes in images:
base64_image = base64.b64encode(image_bytes).decode("utf-8")
content.append(
{
"type": "image_url",
"image_url": {"url": f"data:image/png;base64,{base64_image}"},
}
)
messages = [{"role": "user", "content": content}]
try:
response = self.client.chat.completions.create(
model=self.model, messages=messages, max_tokens=4096
)
generated_text = response.choices[0].message.content
return generated_text, self.model, "OpenAI Python SDK"
except Exception as e:
logger.error(f"[GPT-MULTIMODAL-ERROR] GPT-4o 응답 파싱 실패: {e}")
raise HTTPException(
status_code=500, detail="❌ GPT-4o 응답 생성에 실패했습니다."
)

View File

@@ -0,0 +1,134 @@
import json
import logging
import re
from pathlib import Path
from typing import Literal
import markdown2
from workspace.config.setting import SUMMARY_HTML_DIR
logger = logging.getLogger(__name__)
def safe_filename(filename: str) -> str:
base = Path(filename).stem
base = re.sub(r"[^\w\-]", "_", base)
return f"{base}.html"
def post_process(
file_name,
text,
generated_text,
coord,
ocr_model,
llm_model,
llm_url,
mode,
start_time,
end_time,
prompt_mode: Literal["general", "extract"] = "extract",
):
result_dict = {}
# ✅ 구조화 모드는 후처리 생략
if mode == "structured":
result_dict = {
"message": "✅ 구조화된 JSON 모델 출력입니다. post_process 후처리 생략됨.",
"note": "generated 필드 참조 바랍니다.",
}
# ✅ 일반 추론 모드일 경우
elif prompt_mode == "general":
html_content = markdown2.markdown(generated_text.strip())
html_filename = safe_filename(file_name)
html_path = SUMMARY_HTML_DIR / html_filename
html_path.write_text(html_content, encoding="utf-8")
summary_url = f"http://172.16.10.176:8888/view/generated_html/{html_filename}"
result_dict = {
"message": "✅ 줄글로 생성된 모델 출력입니다. post_process 후처리 생략됨.",
"note": "아래 url에 접속하여 markdown 형식으로 응답 확인하세요.",
"summary_html": summary_url,
}
# ✅ 추출 기반 후처리 (extract)
else:
# ✅ JSON 코드블럭 형식 처리
if "```json" in generated_text:
try:
logger.debug("[PROCESS-JSON] JSON 코드블럭 형식 후처리 진행합니다.")
json_block = re.search(
r"```json\s*(\{.*?\})\s*```", generated_text, re.DOTALL
)
if json_block:
parsed_json = json.loads(json_block.group(1))
result_dict = {
re.sub(r"[^ㄱ-ㅎ가-힣a-zA-Z]", "", k): v
for k, v in parsed_json.items()
}
except Exception as e:
logger.error("[PROCESS-ERROR] JSON 코드블럭 파싱 실패:", e)
# ✅ 길이 초과 메시지 감지 및 처리
elif "입력 텍스트가" in generated_text and "모델 호출 생략" in generated_text:
result_dict = {
"message": "⚠️ 입력 텍스트가 너무 깁니다. LLM 모델 호출을 생략했습니다.",
"note": "OCR로 추출된 원본 텍스트(parsed)를 참고해 주세요.",
}
else:
# ✅ "1.제목:" 또는 "1. 제목:" 형식 처리
logger.debug("[PROCESS-STRING] JSON 코드블럭 형식이 아닙니다.")
blocks = re.split(r"\n(?=\d+\.\s*[^:\n]+:)", generated_text.strip())
for block in blocks:
if ":" in block:
key_line, *rest = block.split(":", 1)
key = re.sub(r"^\d+\.\s*", "", key_line).strip()
cleaned_key = re.sub(r"[^ㄱ-ㅎ가-힣a-zA-Z]", "", key)
value = rest[0].strip() if rest else ""
value = re.sub(r"^[^\w가-힣a-zA-Z]+", "", value).strip()
result_dict[cleaned_key] = value
json_data = {
"filename": file_name,
f"{mode}_model": {
"ocr_model": ocr_model,
"llm_model": llm_model,
"api_url": llm_url,
},
"time": {
"duration_sec": f"{end_time - start_time:.2f}",
"started_at": start_time,
"ended_at": end_time,
},
"fields": coord,
"parsed": text,
"generated": generated_text,
"processed": result_dict,
}
# final_result
logger.info(json.dumps(json_data["processed"], indent=2, ensure_ascii=False))
return json_data
def ocr_process(filename, ocr_model, coord, text, start_time, end_time):
json_data = {
"filename": filename,
"model": {"ocr_model": ocr_model},
"time": {
"duration_sec": f"{end_time - start_time:.2f}",
"started_at": start_time,
"ended_at": end_time,
},
"fields": coord,
"parsed": text,
}
return json_data