diff --git a/api.py b/api.py index 7a81d79..d1b7b39 100644 --- a/api.py +++ b/api.py @@ -9,6 +9,7 @@ from prometheus_fastapi_instrumentator import Instrumentator from router import ocr_router from utils.celery_utils import celery_app from utils.celery_utils import health_check as celery_health_check_task +from utils.minio_utils import get_minio_client from utils.redis_utils import get_redis_client logging.basicConfig( @@ -16,7 +17,7 @@ logging.basicConfig( ) -app = FastAPI(title="OCR GATEWAY", description="OCR API 서비스", docs_url="/docs") +app = FastAPI(title="OCR LAB", description="OCR 성능 비교 분석", docs_url="/docs") app.add_middleware( CORSMiddleware, @@ -58,7 +59,6 @@ def redis_health_check(): @app.get("/health/Celery") async def celery_health_check(): """Celery 워커 상태 확인""" - # celery_app = get_celery_app() # 이제 celery_utils에서 직접 임포트합니다. try: # 1. 워커들에게 ping 보내기 @@ -124,3 +124,14 @@ async def flower_health_check(): status_code=500, detail=f"An error occurred during Flower health check: {str(e)}", ) + + +@app.get("/health/MinIO") +def minio_health_check(): + try: + client = get_minio_client() + return {"status": "MinIO ok"} + except Exception as e: + raise HTTPException( + status_code=500, detail=f"MinIO health check failed: {str(e)}" + ) diff --git a/config/setting.py b/config/setting.py index ed262d9..5145400 100644 --- a/config/setting.py +++ b/config/setting.py @@ -5,7 +5,7 @@ from dotenv import load_dotenv load_dotenv() # Redis 기본 설정 -REDIS_HOST = "ocr_redis" +REDIS_HOST = "ocr_perf_redis" REDIS_PORT = 6379 REDIS_DB = 0 @@ -14,8 +14,13 @@ CELERY_BROKER_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/0" CELERY_RESULT_BACKEND = f"redis://{REDIS_HOST}:{REDIS_PORT}/1" # Celery Flower 설정 -CELERY_FLOWER = "http://ocr_celery_flower:5557/api/workers" +CELERY_FLOWER = "http://ocr_perf_flower:5557/api/workers" # Upstage API Key UPSTAGE_API_KEY = os.getenv("UPSTAGE_API_KEY") +# MinIO Settings +MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT") +MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY") +MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY") +MINIO_BUCKET_NAME = os.getenv("MINIO_BUCKET_NAME") diff --git a/router/ocr_router.py b/router/ocr_router.py index ff3bdf6..867637b 100644 --- a/router/ocr_router.py +++ b/router/ocr_router.py @@ -1,11 +1,10 @@ import json -import os -import tempfile +import logging from datetime import datetime -from typing import List from celery import chain from celery.result import AsyncResult +from config.setting import MINIO_BUCKET_NAME from fastapi import APIRouter, File, HTTPException, UploadFile from fastapi.responses import JSONResponse from tasks import ( @@ -15,112 +14,127 @@ from tasks import ( store_ocr_result, ) from utils.checking_keys import create_key +from utils.minio_utils import upload_file_to_minio from utils.redis_utils import get_redis_client +logger = logging.getLogger(__name__) router = APIRouter(prefix="/ocr", tags=["OCR"]) redis_client = get_redis_client() -async def _process_ocr_request(files: List[UploadFile], ocr_task): - results = [] - for file in files: - if not file.filename: - raise HTTPException(status_code=400, detail="파일 이름이 없습니다.") +async def _process_ocr_request(file: UploadFile, ocr_task): + if not file.filename: + raise HTTPException(status_code=400, detail="파일 이름이 없습니다.") - tmp_path = "" - try: - suffix = os.path.splitext(file.filename)[-1] - with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file: - content = await file.read() - tmp_file.write(content) - tmp_path = tmp_file.name - except Exception as e: - raise HTTPException(status_code=500, detail=f"파일 저장 실패: {str(e)}") - finally: - await file.close() + request_id = create_key() + task_id = create_key() + bucket_name = MINIO_BUCKET_NAME + object_name = f"{request_id}/{file.filename}" - request_id = create_key() - task_id = create_key() + # MinIO에 파일 업로드 후 presigned URL 생성 + presigned_url = upload_file_to_minio( + file=file, bucket_name=bucket_name, object_name=object_name + ) + logger.info(f"[MinIO] ✅ presigned URL 생성 완료: {presigned_url}") - task_chain = chain( - ocr_task.s( - tmp_path=tmp_path, request_id=request_id, file_name=file.filename - ), - store_ocr_result.s(request_id=request_id, task_id=task_id), - ) - task_chain.apply_async(task_id=task_id) + task_chain = chain( + ocr_task.s( + presigned_url=presigned_url, request_id=request_id, file_name=file.filename + ), + store_ocr_result.s(request_id=request_id, task_id=task_id), + ) + task_chain.apply_async(task_id=task_id) - try: - redis_client.hset("ocr_task_mapping", request_id, task_id) - except Exception as e: - if tmp_path and os.path.exists(tmp_path): - os.remove(tmp_path) - raise HTTPException( - status_code=500, detail=f"작업 정보 저장 오류: {str(e)}" - ) + # Redis에 request_id → task_id 매핑 저장 + try: + redis_client.hset("ocr_task_mapping", request_id, task_id) + except Exception as e: + raise HTTPException(status_code=500, detail=f"작업 정보 저장 오류: {str(e)}") - try: - log_entry = { - "status": "작업 접수", - "timestamp": datetime.now().isoformat(), - "task_id": task_id, - "initial_file": file.filename, - } - redis_client.rpush(f"ocr_status:{request_id}", json.dumps(log_entry)) - except Exception: - pass + try: + log_entry = { + "status": "작업 접수", + "timestamp": datetime.now().isoformat(), + "initial_file": file.filename, + } + redis_client.rpush(f"ocr_status:{request_id}", json.dumps(log_entry)) + except Exception: + pass - results.append( - { - "message": "OCR 작업이 접수되었습니다.", - "request_id": request_id, - "task_id": task_id, - "status_check_url": f"/ocr/progress/{request_id}", - "filename": file.filename, - } - ) - return JSONResponse(content={"results": results}) + return JSONResponse( + content={ + "message": "OCR 작업이 접수되었습니다.", + "request_id": request_id, + "status_check_url": f"/ocr/progress/{request_id}", + "filename": file.filename, + } + ) @router.post("/paddle", summary="[Paddle] 파일 업로드 기반 비동기 OCR") -async def ocr_paddle_endpoint(files: List[UploadFile] = File(...)): - return await _process_ocr_request(files, parse_ocr_text) +async def ocr_paddle_endpoint(file: UploadFile = File(...)): + return await _process_ocr_request(file, parse_ocr_text) @router.post("/upstage", summary="[Upstage] 파일 업로드 기반 비동기 OCR") -async def ocr_upstage_endpoint(files: List[UploadFile] = File(...)): - return await _process_ocr_request(files, call_upstage_ocr_api) +async def ocr_upstage_endpoint(file: UploadFile = File(...)): + return await _process_ocr_request(file, call_upstage_ocr_api) -@router.get("/progress/{request_id}", summary="📊 OCR 진행 상태 및 결과 조회") +@router.get("/progress/{request_id}", summary="OCR 진행 상태 및 결과 조회") async def check_progress(request_id: str): task_id = redis_client.hget("ocr_task_mapping", request_id) if not task_id: - raise HTTPException(status_code=404, detail=f"ID {request_id} 작업을 찾을 수 없습니다.") - - result = AsyncResult(task_id, app=celery_app) - status = result.status + raise HTTPException( + status_code=404, detail=f"ID {request_id} 작업을 찾을 수 없습니다." + ) + # 1) 진행 로그 조회 try: - logs = redis_client.lrange(f"ocr_status:{request_id}", 0, -1) - parsed_logs = [json.loads(log) for log in logs] + logs_raw = redis_client.lrange(f"ocr_status:{request_id}", 0, -1) + parsed_logs = [json.loads(x) for x in logs_raw] except Exception as e: parsed_logs = [{"status": "로그 조회 실패", "error": str(e)}] + # 2) 로그 기반 파생 상태(dervived_status) 계산 + derived_status = None + if parsed_logs: + last = parsed_logs[-1].get("status") + if last in ("모든 작업 완료", "작업 완료"): + derived_status = "SUCCESS" + elif last == "작업 오류 발생": + derived_status = "FAILURE" + + # 3) Celery 상태 (가능하면 조회, 실패해도 무시) + celery_status = "PENDING" + try: + result = AsyncResult(task_id, app=celery_app) + celery_status = result.status or "PENDING" + except Exception: + pass + + # 4) **상태와 무관하게** 결과 먼저 조회 final_result = None - if status == "SUCCESS": - try: - result_str = redis_client.get(f"ocr_result:{task_id}") - if result_str: - final_result = json.loads(result_str) - except Exception as e: - final_result = {"error": f"결과 조회 실패: {str(e)}"} + try: + result_str = redis_client.get(f"ocr_result:{task_id}") + if result_str: + final_result = json.loads(result_str) + # 결과가 있으면 상태를 SUCCESS로 정규화 + if derived_status is None and celery_status not in ("FAILURE", "REVOKED"): + derived_status = "SUCCESS" + except Exception as e: + # 결과 조회 실패도 노출 + final_result = {"error": f"결과 조회 실패: {str(e)}"} + + # 5) 최종 표시 상태 선택(로그/결과가 더 신뢰되면 그걸 우선) + display_status = derived_status or celery_status return JSONResponse( content={ "request_id": request_id, "task_id": task_id, - "celery_status": status, + "celery_status": celery_status, # 원래 Celery 상태(참고용) + "status": display_status, # 사용자가 보기 쉬운 최종 상태 "progress_logs": parsed_logs, "final_result": final_result, } diff --git a/tasks.py b/tasks.py index c743f1f..3ef8ed7 100644 --- a/tasks.py +++ b/tasks.py @@ -2,8 +2,9 @@ import asyncio import json import logging import os +import tempfile import time -from datetime import datetime +from datetime import datetime, timezone import httpx import redis @@ -18,24 +19,24 @@ from utils.celery_utils import celery_app from utils.ocr_processor import ocr_process from utils.text_extractor import extract_text_from_file -# ✅ Redis 클라이언트 생성 +# Redis 클라이언트 redis_client = redis.Redis( host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, decode_responses=True ) -# ✅ 로깅 설정 +# 로깅 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -# ✅ 공통 Task 베이스 클래스 - 상태 로그 기록 및 예외 후킹 제공 +# 공통 Task 베이스 클래스 (진행 로그 + 실패/성공 훅) class BaseTaskWithProgress(Task): abstract = True - def update_progress(self, request_id, status_message, step_info=None): + def update_progress(self, request_id: str, status_message: str, step_info=None): log_entry = { "status": status_message, - "timestamp": datetime.now().isoformat(), + "timestamp": datetime.now(timezone.utc).isoformat(), "step_info": step_info, } redis_client.rpush(f"ocr_status:{request_id}", json.dumps(log_entry)) @@ -49,15 +50,6 @@ class BaseTaskWithProgress(Task): {"error": str(exc), "traceback": str(einfo)}, ) logger.error(f"[{request_id}] Task Failed: {exc}") - # 실패 시 임시 파일 삭제 - tmp_path = kwargs.get("tmp_path") - if tmp_path and os.path.exists(tmp_path): - try: - os.remove(tmp_path) - self.update_progress(request_id, "임시 파일 삭제 완료") - except Exception as e: - logger.error(f"[{request_id}] 임시 파일 삭제 실패: {e}") - super().on_failure(exc, task_id, args, kwargs, einfo) def on_success(self, retval, task_id, args, kwargs): @@ -67,21 +59,92 @@ class BaseTaskWithProgress(Task): super().on_success(retval, task_id, args, kwargs) -# ✅ (Paddle) Step 2: OCR 및 후처리 수행 +# presigned URL에서 파일 다운로드 (비동기) +async def download_file_from_presigned_url(file_url: str, save_path: str): + async with httpx.AsyncClient(timeout=60.0, follow_redirects=True) as client: + resp = await client.get(file_url) + resp.raise_for_status() + with open(save_path, "wb") as f: + f.write(resp.content) + + +# (Paddle) OCR + 후처리 @celery_app.task(bind=True, base=BaseTaskWithProgress) -def parse_ocr_text(self, tmp_path: str, request_id: str, file_name: str): +def parse_ocr_text(self, presigned_url: str, request_id: str, file_name: str): self.update_progress(request_id, "Paddle OCR 작업 시작") - start_time = time.time() - text, coord, ocr_model = asyncio.run(extract_text_from_file(tmp_path)) - end_time = time.time() - self.update_progress(request_id, "텍스트 추출 및 후처리 완료") - result_json = ocr_process(file_name, ocr_model, coord, text, start_time, end_time) - return {"result": result_json, "tmp_path": tmp_path} + + suffix = os.path.splitext(file_name)[-1] + with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file: + tmp_path = tmp_file.name + + try: + # 1) 파일 다운로드 + self.update_progress(request_id, "파일 다운로드 중 (presigned URL)") + try: + asyncio.run(download_file_from_presigned_url(presigned_url, tmp_path)) + except Exception as e: + raise RuntimeError(f"파일 다운로드 실패: {e}") + self.update_progress(request_id, "파일 다운로드 완료") + + # 2) OCR 실행 + start_time = time.time() + text, coord, ocr_model = asyncio.run(extract_text_from_file(tmp_path)) + end_time = time.time() + self.update_progress(request_id, "텍스트 추출 및 후처리 완료") + + # 3) 결과 JSON 생성 + result_json = ocr_process( + file_name, # 1 + ocr_model, # 2 + coord, # 3 + text, # 4 + start_time, # 5 + end_time, # 6 + ) + return result_json + + finally: + if os.path.exists(tmp_path): + os.remove(tmp_path) -# ✅ (Upstage) Step 2: Upstage OCR API 호출 +# Upstage 응답 정규화: 가능한 많은 'text'를 모으고, 후보 bbox를 수집 +def _normalize_upstage_response(resp_json): + """ + Upstage 문서 디지타이제이션 응답에서 text와 bbox 후보를 추출. + 구조가 달라도 dict/list를 재귀 탐색하여 'text' 유사 키와 bbox 유사 키를 모읍니다. + """ + texts = [] + boxes = [] + + def walk(obj): + if isinstance(obj, dict): + for k, v in obj.items(): + kl = k.lower() + # text 후보 키 + if kl in ("text", "content", "ocr_text", "full_text", "value"): + if isinstance(v, str) and v.strip(): + texts.append(v.strip()) + # bbox/box 후보 키 + if kl in ("bbox", "box", "bounding_box", "boundingbox", "polygon"): + boxes.append(v) + # 재귀 + walk(v) + elif isinstance(obj, list): + for item in obj: + walk(item) + + walk(resp_json) + + merged_text = ( + "\n".join(texts) if texts else json.dumps(resp_json, ensure_ascii=False) + ) + return merged_text, boxes + + +# (Upstage) 외부 OCR API 호출 + 후처리 @celery_app.task(bind=True, base=BaseTaskWithProgress) -def call_upstage_ocr_api(self, tmp_path: str, request_id: str, file_name: str): +def call_upstage_ocr_api(self, presigned_url: str, request_id: str, file_name: str): self.update_progress(request_id, "Upstage OCR 작업 시작") if not UPSTAGE_API_KEY: @@ -90,36 +153,63 @@ def call_upstage_ocr_api(self, tmp_path: str, request_id: str, file_name: str): url = "https://api.upstage.ai/v1/document-digitization" headers = {"Authorization": f"Bearer {UPSTAGE_API_KEY}"} + suffix = os.path.splitext(file_name)[-1] + with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file: + tmp_path = tmp_file.name + try: + # 1) 파일 다운로드 + self.update_progress(request_id, "파일 다운로드 중 (presigned URL)") + try: + asyncio.run(download_file_from_presigned_url(presigned_url, tmp_path)) + except Exception as e: + raise RuntimeError(f"파일 다운로드 실패: {e}") + self.update_progress(request_id, "파일 다운로드 완료") + + # 2) Upstage API 호출(시간 측정) + start_time = time.time() with open(tmp_path, "rb") as f: files = {"document": (file_name, f, "application/octet-stream")} data = {"model": "ocr"} - with httpx.Client() as client: - response = client.post(url, headers=headers, files=files, data=data) - response.raise_for_status() + try: + with httpx.Client(timeout=120.0, follow_redirects=True) as client: + response = client.post(url, headers=headers, files=files, data=data) + response.raise_for_status() + except httpx.HTTPStatusError as e: + logger.error(f"Upstage API 오류: {e.response.text}") + raise RuntimeError(f"Upstage API 오류: {e.response.status_code}") + except Exception as e: + logger.error(f"Upstage API 호출 중 예외 발생: {e}") + raise RuntimeError("Upstage API 호출 실패") + end_time = time.time() self.update_progress(request_id, "Upstage API 호출 성공") - return {"result": response.json(), "tmp_path": tmp_path} - except httpx.HTTPStatusError as e: - logger.error(f"Upstage API 오류: {e.response.text}") - raise RuntimeError(f"Upstage API 오류: {e.response.status_code}") - except Exception as e: - logger.error(f"Upstage API 호출 중 예외 발생: {e}") - raise RuntimeError("Upstage API 호출 실패") + + # 3) 응답 정규화 → text/coord 추출 + resp_json = response.json() + text, coord = _normalize_upstage_response(resp_json) + + # 4) 공통 후처리(JSON 스키마 통일) + result_json = ocr_process( + file_name, # 1 + "upstage", # 2 + coord, # 3 + text, # 4 + start_time, # 5 + end_time, # 6 + ) + self.update_progress(request_id, "후처리 완료") + return result_json + + finally: + if os.path.exists(tmp_path): + os.remove(tmp_path) -# ✅ Step 3: 결과 Redis 저장 및 임시 파일 삭제 +# 결과 Redis 저장 (체인의 두 번째 스텝) +# router 체인: store_ocr_result.s(request_id=request_id, task_id=task_id) @celery_app.task(bind=True, base=BaseTaskWithProgress, ignore_result=True) -def store_ocr_result(self, data: dict, request_id: str, task_id: str): +def store_ocr_result(self, result_data: dict, request_id: str, task_id: str): self.update_progress(request_id, "결과 저장 중") redis_key = f"ocr_result:{task_id}" - redis_client.set(redis_key, json.dumps(data.get("result", {}))) - - tmp_path = data.get("tmp_path") - if tmp_path and os.path.exists(tmp_path): - try: - os.remove(tmp_path) - self.update_progress(request_id, "임시 파일 삭제 완료") - except Exception as e: - logger.warning(f"[{request_id}] 임시 파일 삭제 실패: {e}") - + redis_client.set(redis_key, json.dumps(result_data, ensure_ascii=False)) self.update_progress(request_id, "모든 작업 완료") diff --git a/utils/minio_utils.py b/utils/minio_utils.py new file mode 100644 index 0000000..196cb23 --- /dev/null +++ b/utils/minio_utils.py @@ -0,0 +1,99 @@ +import logging +from datetime import timedelta + +from config.setting import ( + MINIO_ACCESS_KEY, + MINIO_BUCKET_NAME, + MINIO_ENDPOINT, + MINIO_SECRET_KEY, +) +from fastapi import UploadFile +from minio import Minio +from minio.error import S3Error + +logger = logging.getLogger(__name__) + + +def get_minio_client(): + """MinIO 클라이언트를 생성하고 반환합니다.""" + try: + client = Minio( + MINIO_ENDPOINT, + access_key=MINIO_ACCESS_KEY, + secret_key=MINIO_SECRET_KEY, + secure=False, # 개발 환경에서는 False, 프로덕션에서는 True 사용 + ) + # 버킷 존재 여부 확인 및 생성 + found = client.bucket_exists(MINIO_BUCKET_NAME) + if not found: + client.make_bucket(MINIO_BUCKET_NAME) + logger.info(f"Bucket '{MINIO_BUCKET_NAME}' created.") + else: + logger.info(f"Bucket '{MINIO_BUCKET_NAME}' already exists.") + return client + except (S3Error, Exception) as e: + logger.error(f"Error connecting to MinIO: {e}") + raise + + +def upload_file_to_minio(file: UploadFile, bucket_name: str, object_name: str) -> str: + """ + 파일을 MinIO에 업로드하고, presigned URL을 반환합니다. + + Args: + file (UploadFile): FastAPI의 UploadFile 객체 + bucket_name (str): 업로드할 버킷 이름 + object_name (str): 저장될 객체 이름 (경로 포함 가능) + + Returns: + str: 생성된 presigned URL + """ + minio_client = get_minio_client() + try: + # 1. 버킷 존재 확인 및 생성 + found = minio_client.bucket_exists(bucket_name) + if not found: + minio_client.make_bucket(bucket_name) + logger.info(f"✅ 버킷 '{bucket_name}' 생성 완료.") + + # 2. 파일 업로드 + file.file.seek(0) # 파일 포인터를 처음으로 이동 + minio_client.put_object( + bucket_name, + object_name, + file.file, + length=-1, # 파일 크기를 모를 때 -1로 설정 + part_size=10 * 1024 * 1024, # 10MB 단위로 청크 업로드 + ) + logger.info(f"✅ '{object_name}' -> '{bucket_name}' 업로드 성공.") + + # 3. Presigned URL 생성 + presigned_url = minio_client.presigned_get_object( + bucket_name, + object_name, + expires=timedelta(days=7), # URL 만료 기간 (예: 7일, 필요에 따라 조절 가능) + ) + logger.info(f"✅ Presigned URL 생성 완료: {presigned_url}") + + return presigned_url + + except Exception as e: + logger.error(f"❌ MinIO 작업 실패: {e}") + raise # 실패 시 예외를 다시 발생시켜 호출 측에서 처리하도록 함 + + +def download_file_from_minio(object_name: str, local_path: str): + """ + MinIO에서 객체를 다운로드하여 로컬 파일로 저장합니다. + + Args: + object_name (str): 다운로드할 객체의 이름 + local_path (str): 파일을 저장할 로컬 경로 + """ + client = get_minio_client() + try: + client.fget_object(MINIO_BUCKET_NAME, object_name, local_path) + logger.info(f"'{object_name}' downloaded to '{local_path}' successfully.") + except S3Error as e: + logger.error(f"Error downloading from MinIO: {e}") + raise