import io import json import logging from datetime import timedelta from typing import Optional from config.setting import ( MINIO_ACCESS_KEY, MINIO_ENDPOINT, MINIO_RESULTS_BUCKET_NAME, MINIO_SECRET_KEY, ) from fastapi import UploadFile from minio import Minio from minio.error import S3Error # MinIO 클라이언트 전역 생성 minio_client = Minio( MINIO_ENDPOINT, access_key=MINIO_ACCESS_KEY, secret_key=MINIO_SECRET_KEY, secure=False, ) logger = logging.getLogger(__name__) def get_minio_client(): """ MinIO 클라이언트를 반환합니다. 연결 확인을 위해 list_buckets() 호출로 테스트합니다. """ try: client = Minio( MINIO_ENDPOINT, access_key=MINIO_ACCESS_KEY, secret_key=MINIO_SECRET_KEY, secure=False, # HTTPS 사용 여부에 맞게 설정 ) # ✅ 연결 테스트 (버킷 목록 조회) client.list_buckets() return client except Exception as e: raise RuntimeError(f"MinIO 연결 실패: {e}") def save_result_to_minio(result_dict: dict, object_name: str) -> str: """ 결과 JSON(dict)을 BytesIO로 인코딩하여 MinIO에 저장하고 presigned URL 반환 """ try: # JSON -> BytesIO result_bytes = io.BytesIO( json.dumps(result_dict, ensure_ascii=False).encode("utf-8") ) result_bytes.seek(0) # MinIO에 업로드 minio_client.put_object( bucket_name=MINIO_RESULTS_BUCKET_NAME, object_name=object_name, data=result_bytes, length=result_bytes.getbuffer().nbytes, content_type="application/json", ) # presigned URL 생성 presigned_url = minio_client.presigned_get_object( MINIO_RESULTS_BUCKET_NAME, object_name, ) return presigned_url except Exception as e: logger.error(f"❌ MinIO 작업 실패: {e}") raise def upload_file_to_minio_v2( file: UploadFile, bucket_name: str, object_name: str ) -> str: """ 파일을 MinIO에 업로드하고, presigned URL을 반환합니다. Args: file (UploadFile): FastAPI의 UploadFile 객체 bucket_name (str): 업로드할 버킷 이름 object_name (str): 저장될 객체 이름 (경로 포함 가능) Returns: str: 생성된 presigned URL """ try: # 1. 버킷 존재 확인 및 생성 found = minio_client.bucket_exists(bucket_name) if not found: minio_client.make_bucket(bucket_name) logger.info(f"✅ 버킷 '{bucket_name}' 생성 완료.") # 2. 파일 업로드 file.file.seek(0) # 파일 포인터를 처음으로 이동 minio_client.put_object( bucket_name, object_name, file.file, length=-1, # 파일 크기를 모를 때 -1로 설정 part_size=10 * 1024 * 1024, # 10MB 단위로 청크 업로드 ) logger.info(f"✅ '{object_name}' -> '{bucket_name}' 업로드 성공.") # 3. Presigned URL 생성 presigned_url = minio_client.presigned_get_object( bucket_name, object_name, expires=timedelta(days=7), # URL 만료 기간 (예: 7일, 필요에 따라 조절 가능) ) logger.info(f"✅ Presigned URL 생성 완료: {presigned_url}") return presigned_url except Exception as e: logger.error(f"❌ MinIO 작업 실패: {e}") raise # 실패 시 예외를 다시 발생시켜 호출 측에서 처리하도록 함 def fetch_result_from_minio(request_id: str) -> Optional[dict]: try: # MinIO에서 객체 목록 가져오기 (폴더 내 전체 파일 조회) objects = minio_client.list_objects( bucket_name=MINIO_RESULTS_BUCKET_NAME, prefix=f"{request_id}/", recursive=True, ) json_obj = next( (obj for obj in objects if obj.object_name.endswith(".json")), None ) if not json_obj: logger.warning( f"[MINIO] request_id: {request_id} 경로에 .json 파일이 존재하지 않습니다." ) return None object_name = json_obj.object_name print( f"[MINIO] request_id: {request_id}에 대한 결과를 가져옵니다. 대상 파일: {object_name}" ) # 객체 다운로드 response = minio_client.get_object(MINIO_RESULTS_BUCKET_NAME, object_name) content = response.read() # JSON 디코드 result_dict = json.loads(content.decode("utf-8")) logger.info(f"[MINIO] 결과 JSON 로드 성공: {object_name}") return result_dict except S3Error as e: logger.error(f"[MINIO] S3Error 발생: {e}") return None except Exception as e: logger.error(f"[MINIO] 기타 오류 발생: {e}") return None