commit a5e23e8da57ce276a561325d748c8d3aaf68b899 Author: kyy Date: Mon Oct 27 09:18:24 2025 +0900 first commit diff --git a/.env b/.env new file mode 100644 index 0000000..9641516 --- /dev/null +++ b/.env @@ -0,0 +1,8 @@ +REDIS_HOST = ocr_gateway_test_redis +REDIS_PORT = 6379 +REDIS_DB = 0 + +CELERY_FLOWER=http://ocr_gateway_test_flower:5556/api/workers + +UPSTAGE_API_KEY=up_Bb8A7xWmYbtaSvEoYarr4EGhqiL4r +UPSTAGE_API_URL=https://api.upstage.ai/v1/document-digitization \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..e66de5a --- /dev/null +++ b/Dockerfile @@ -0,0 +1,25 @@ +FROM paddlepaddle/paddle:3.2.0-gpu-cuda11.8-cudnn8.9 + +RUN apt-get update && \ + apt-get install -y \ + poppler-utils \ + tesseract-ocr \ + tesseract-ocr-kor \ + libgl1 \ + curl \ + tree \ + git \ + build-essential && \ + apt-get clean && rm -rf /var/lib/apt/lists/* + +ENV TESSDATA_PREFIX=/usr/share/tesseract-ocr/5/tessdata + +COPY requirements.txt . +RUN pip install --upgrade pip && \ + pip install --no-cache-dir -r requirements.txt && \ + pip install --no-cache-dir paddleocr[all] + +WORKDIR /workspace +COPY . . + +CMD ["uvicorn", "api:app", "--workers", "2", "--host", "0.0.0.0", "--port", "8880"] \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..2a080bf --- /dev/null +++ b/README.md @@ -0,0 +1,44 @@ +# OCR Gateway 테스트 프로젝트 + +이 프로젝트는 OCR Gateway를 테스트하거나 새로운 모델을 연동하여 테스트하기 위해 구성되었습니다. + +## 프로젝트 구조 및 기능 + +``` +/mnt/c/Python/workspace/ocr_gateway_test/ +├───.env # 환경 변수 설정 파일 (API 키, 비밀 값 등) +├───api.py # FastAPI 애플리케이션의 메인 실행 파일 +├───docker-compose.yml # Docker 다중 컨테이너 실행을 위한 설정 파일 +├───Dockerfile # Python 애플리케이션의 Docker 이미지 생성 파일 +├───pyproject.toml # Python 프로젝트 설정 및 의존성 관리 파일 +├───requirements.txt # Python 패키지 의존성 목록 +├───tasks.py # Celery 비동기 작업을 정의하는 파일 +├───.git/ # Git 버전 관리 폴더 +├───config/ # 애플리케이션 설정 관련 폴더 +│ ├───__init__.py +│ └───setting.py # 애플리케이션의 주요 설정 값 (경로, 모델 정보 등) +├───router/ # API 엔드포인트(라우팅)를 관리하는 폴더 +│ ├───__init__.py +│ └───ocr_api_router.py # OCR 관련 API 라우터를 정의하는 파일 +└───utils/ # 공통 유틸리티 및 핵심 기능을 모아둔 폴더 + ├───__init__.py + ├───celery_utils.py # Celery 관련 유틸리티 함수 + ├───checking_keys.py # API 키 유효성 검사 등 키 관련 유틸리티 + ├───file_handler.py # 파일 업로드, 다운로드 등 파일 처리 유틸리티 + ├───ocr_processor.py # OCR 처리 로직을 관리하고 모델을 선택하는 유틸리티 + ├───preprocessor.py # OCR 처리 전 이미지 전처리를 담당하는 유틸리티 + ├───redis_utils.py # Redis 관련 유틸리티 함수 + └───text_extractor.py # 실제 OCR 모델을 호출하여 텍스트를 추출하는 함수들이 있는 파일 +``` + +## 신규 모델 추가 및 실행 가이드 + +새로운 OCR 모델을 추가하려면 다음 단계를 따르세요. + +1. **모델 함수 추가**: `utils/text_extractor.py` 파일에 새로운 OCR 모델을 호출하고 텍스트를 추출하는 Python 함수를 추가합니다. + +2. **빌드 및 실행**: 터미널에서 아래 명령어를 실행하여 변경사항을 적용하고 Docker 컨테이너를 다시 빌드하고 실행합니다. + + ```bash + docker compose up -d --build + ``` diff --git a/api.py b/api.py new file mode 100644 index 0000000..6bf55cf --- /dev/null +++ b/api.py @@ -0,0 +1,126 @@ +# ocr/api.py +import logging + +import httpx +from config.setting import CELERY_FLOWER +from fastapi import FastAPI, HTTPException +from fastapi.middleware.cors import CORSMiddleware +from prometheus_fastapi_instrumentator import Instrumentator +from router import ocr_api_router +from utils.celery_utils import celery_app +from utils.celery_utils import health_check as celery_health_check_task +from utils.redis_utils import get_redis_client + +logging.basicConfig( + level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s - %(message)s" +) + + +app = FastAPI(title="OCR GATEWAY", description="OCR API 서비스", docs_url="/docs") + +app.add_middleware( + CORSMiddleware, + allow_origins=[ + "http://172.16.42.101", + "http://gsim.hanmaceng.co.kr", + "http://gsim.hanmaceng.co.kr:6464", + ], + allow_origin_regex=r"http://(172\.16\.\d{1,3}\.\d{1,3}|gsim\.hanmaceng\.co\.kr)(:\d+)?", + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +# Prometheus Metrics Exporter 활성화 +Instrumentator().instrument(app).expose(app) + +app.include_router(ocr_api_router) + + +@app.get("/health/API") +async def health_check(): + """애플리케이션 상태 확인""" + return {"status": "API ok"} + + +@app.get("/health/Redis") +def redis_health_check(): + client = get_redis_client() + if client is None: + raise HTTPException(status_code=500, detail="Redis connection failed") + try: + client.ping() + return {"status": "Redis ok"} + except Exception: + raise HTTPException(status_code=500, detail="Redis ping failed") + + +@app.get("/health/Celery") +async def celery_health_check(): + """Celery 워커 상태 확인""" + # celery_app = get_celery_app() # 이제 celery_utils에서 직접 임포트합니다. + + try: + # 1. 워커들에게 ping 보내기 + active_workers = celery_app.control.ping(timeout=1.0) + if not active_workers: + raise HTTPException( + status_code=503, detail="No active Celery workers found." + ) + + # 2. 간단한 작업 실행하여 E2E 확인 + task = celery_health_check_task.delay() + result = task.get(timeout=10) # 10초 타임아웃 + + if task.state == "SUCCESS" and result.get("status") == "ok": + return { + "status": "Celery is healthy", + "active_workers": active_workers, + "task_status": "SUCCESS", + } + else: + raise HTTPException( + status_code=500, + detail=f"Celery health check task failed with state: {task.state}", + ) + + except HTTPException as e: + # 이미 HTTPException인 경우 그대로 전달 + raise e + except Exception as e: + logging.error(f"Celery health check failed: {e}", exc_info=True) + raise HTTPException( + status_code=500, + detail=f"An error occurred during Celery health check: {str(e)}", + ) + + +@app.get("/health/Flower") +async def flower_health_check(): + """Flower 모니터링 대시보드 상태 확인""" + try: + flower_api_url = CELERY_FLOWER # Use the full URL from settings + async with httpx.AsyncClient(timeout=5.0) as client: + response = await client.get(flower_api_url) + response.raise_for_status() + + # Just check if the API is reachable + if response.status_code == 200: + return {"status": "Flower is running"} + else: + raise HTTPException( + status_code=response.status_code, + detail=f"Flower API returned status {response.status_code}", + ) + + except httpx.RequestError as e: + logging.error(f"Could not connect to Flower: {e}", exc_info=True) + raise HTTPException( + status_code=503, detail=f"Could not connect to Flower: {str(e)}" + ) + except Exception as e: + logging.error(f"Flower health check failed: {e}", exc_info=True) + raise HTTPException( + status_code=500, + detail=f"An error occurred during Flower health check: {str(e)}", + ) diff --git a/config/__init__.py b/config/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/config/setting.py b/config/setting.py new file mode 100644 index 0000000..73c6cf0 --- /dev/null +++ b/config/setting.py @@ -0,0 +1,23 @@ +import os + +from dotenv import load_dotenv + +load_dotenv() + +# Redis 기본 설정 +REDIS_HOST = os.getenv("REDIS_HOST", "ocr_gateway_redis") +REDIS_PORT = os.getenv("REDIS_PORT", 6379) +REDIS_DB = os.getenv("REDIS_DB", 0) + +# Celery 설정 +CELERY_BROKER_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/0" +CELERY_RESULT_BACKEND = f"redis://{REDIS_HOST}:{REDIS_PORT}/1" + +# Celery Flower 설정 +CELERY_FLOWER = os.getenv("CELERY_FLOWER", "http://ocr_gateway_flower:5556/api/workers") + +# Upsage API Key +UPSTAGE_API_KEY = os.getenv("UPSTAGE_API_KEY") +UPSTAGE_API_URL = os.getenv( + "UPSTAGE_API_URL", "https://api.upstage.ai/v1/document-ai/ocr" +) diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..c7fe2e6 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,114 @@ +services: + ocr_gateway_test: + build: + context: . + image: ocr_gateway_test + container_name: ocr_gateway_test + restart: always + ports: + - "8880:8880" + env_file: + - .env + environment: + - TZ=Asia/Seoul + - CELERY_BROKER_URL=redis://ocr_gateway_test_redis:6379/0 + - CELERY_RESULT_BACKEND=redis://ocr_gateway_test_redis:6379/1 + - TESSDATA_PREFIX=/usr/share/tesseract-ocr/4.00/tessdata + - PADDLE_DEVICE=${PADDLE_DEVICE:-gpu} + depends_on: + ocr_gateway_test_redis: + condition: service_healthy + deploy: + resources: + reservations: + devices: + - driver: nvidia + count: all + capabilities: [gpu] + healthcheck: + test: + [ + "CMD-SHELL", + "curl -f http://localhost:8880/health/API && curl -f http://localhost:8880/health/Redis && curl -f http://localhost:8880/health/Celery && curl -f http://localhost:8880/health/Flower", + ] + interval: 60s + timeout: 5s + retries: 3 + start_period: 10s + networks: + - llm_gateway_test_net + + ocr_gateway_test_worker: + build: + context: . + image: ocr_gateway_test + container_name: ocr_gateway_test_worker + restart: always + env_file: + - .env + environment: + - TZ=Asia/Seoul + - CELERY_BROKER_URL=redis://ocr_gateway_test_redis:6379/0 + - CELERY_RESULT_BACKEND=redis://ocr_gateway_test_redis:6379/1 + - TESSDATA_PREFIX=/usr/share/tesseract-ocr/4.00/tessdata + - PADDLE_DEVICE=${PADDLE_DEVICE:-gpu} + command: celery -A tasks worker --loglevel=info --pool=threads --concurrency=4 + depends_on: + ocr_gateway_test_redis: + condition: service_healthy + deploy: + resources: + reservations: + devices: + - driver: nvidia + count: all + capabilities: [gpu] + networks: + - llm_gateway_test_net + + ocr_gateway_test_flower: + image: mher/flower:latest + container_name: ocr_gateway_test_flower + restart: always + env_file: + - .env + environment: + - TZ=Asia/Seoul + - FLOWER_UNAUTHENTICATED_API=true + - TESSDATA_PREFIX=/usr/share/tessdata + entrypoint: + ["sh", "-c", "celery --broker='redis://ocr_gateway_test_redis:6379/0' flower --port=5557"] + ports: + - "5557:5557" + depends_on: + ocr_gateway_test_redis: + condition: service_healthy + networks: + - llm_gateway_test_net + + ocr_gateway_test_redis: + image: redis:7-alpine + container_name: ocr_gateway_test_redis + command: + [ + "redis-server", + "--maxmemory", + "256mb", + "--maxmemory-policy", + "allkeys-lru", + ] + ports: + - "6383:6379" + restart: always + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 5s + retries: 5 + networks: + - llm_gateway_test_net + +networks: + llm_gateway_test_net: + name: llm_gateway_test_net + external: true diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..6edb9b6 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,94 @@ +# pyproject.toml + +[project] +name = "ocr-gateway" +version = "0.0.0" +requires-python = ">=3.10,<3.12" +dependencies = [ + "fastapi", + "uvicorn[standard]", + "pytesseract", + "pdf2image", + "PyMuPDF", + "python-docx", + "Pillow", + "aiofiles", + "httpx", + "snowflake-id", + "prometheus-fastapi-instrumentator", + "python-multipart", + "redis", + "celery", + "minio", + "opencv-python-headless", + "python-dotenv", + "requests", +] + +[project.optional-dependencies] +test = [ + "pytest>=8", + "pytest-cov", + "anyio", +] + +[tool.ruff] +# 공통 설정 +line-length = 120 +indent-width = 4 +exclude = [ + ".bzr", + ".direnv", + ".eggs", + ".git", + ".git-rewrite", + ".hg", + ".mypy_cache", + ".nox", + ".pants.d", + ".pytype", + ".ruff_cache", + ".svn", + ".tox", + ".venv", + "__pypackages__", + "_build", + "buck-out", + "build", + "dist", + "node_modules", + "venv", +] + +[tool.ruff.lint] +# 기본적으로 Pyflakes('F')와 pycodestyle('E') 코드의 하위 집합을 활성화 +select = ["E4", "E7", "E9", "F"] +ignore = [] +# 활성화된 모든 규칙에 대한 수정 허용 +fixable = ["ALL"] +unfixable = [] +# 밑줄 접두사가 붙은 경우 사용하지 않는 변수를 허용 +dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-A-Z0-9_]*[a-zA-Z0-9]+?))$" + +[tool.ruff.format] +quote-style = "double" +indent-style = "space" +skip-magic-trailing-comma = false +line-ending = "auto" +docstring-code-format = false +docstring-code-line-length = "dynamic" + +[tool.pytest.ini_options] +markers = [ + "integration: mark a test as an integration test.", + "e2e: mark a test as an end-to-end test.", + "gpu: mark a test as gpu dependent.", + "minio: mark a test as requiring MinIO.", +] +filterwarnings = [ + "ignore::DeprecationWarning", +] + +[tool.setuptools] +py-modules = ["api", "tasks"] +packages = ["config", "router", "utils"] \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..9431c2e --- /dev/null +++ b/requirements.txt @@ -0,0 +1,23 @@ +fastapi +uvicorn[standard] +asyncio +pytesseract +pdf2image +PyMuPDF +python-docx +Pillow +aiofiles +httpx +aiofiles +snowflake-id + +prometheus-fastapi-instrumentator +python-multipart +redis +celery + +minio +opencv-python-headless +python-dotenv +requests +pytest \ No newline at end of file diff --git a/router/__init__.py b/router/__init__.py new file mode 100644 index 0000000..3ad80e9 --- /dev/null +++ b/router/__init__.py @@ -0,0 +1,3 @@ +from .ocr_api_router import router as ocr_api_router + +__all__ = ["ocr_api_router"] diff --git a/router/ocr_api_router.py b/router/ocr_api_router.py new file mode 100644 index 0000000..1b175d9 --- /dev/null +++ b/router/ocr_api_router.py @@ -0,0 +1,127 @@ +import json +import logging +from datetime import datetime + +from celery.result import AsyncResult +from fastapi import APIRouter, HTTPException +from fastapi.responses import JSONResponse +from tasks import ( + celery_app, + run_ocr_pipeline, # 🔁 새로 만든 체인 함수 임포트 +) +from utils.checking_keys import create_key +from utils.redis_utils import get_redis_client + +router = APIRouter(prefix="/ocr", tags=["OCR"]) + +redis_client = get_redis_client() +logger = logging.getLogger(__name__) + + +@router.post("", summary="🔍 presigned URL 기반 비동기 OCR 처리") +async def ocr_endpoint(file_requests: dict): + """ + Presigned URL과 OCR 모델을 지정하여 비동기 OCR 작업을 요청합니다. + + - **`file_url`**: OCR을 수행할 파일에 접근할 수 있는 Presigned URL + - **`filename`**: 원본 파일의 이름 + - **`ocr_model`**: 사용할 OCR 모델 (`tesseract`, `pp-ocr`, `pp-structure`, `upstage` 중 선택) + + 요청이 접수되면, 작업 추적을 위한 `request_id`와 `task_id`가 즉시 반환됩니다. + """ + results = [] + file_url = file_requests.get("file_url") + filename = file_requests.get("filename") + ocr_model = file_requests.get("ocr_model") + + if not file_url or not filename: + raise HTTPException(status_code=400, detail="file_url, filename 필수") + + request_id = create_key() + task_id = create_key() + run_ocr_pipeline(file_url, filename, request_id, task_id, ocr_model) + + # Redis에 request_id → task_id 매핑 저장 + try: + redis_client.hset("ocr_task_mapping", request_id, task_id) + except Exception as e: + raise HTTPException(status_code=500, detail=f"작업 정보 저장 오류: {str(e)}") + + # 작업 로그 redis에 기록 + try: + log_entry = { + "status": "작업 접수", + "timestamp": datetime.now().isoformat(), + "task_id": task_id, + "initial_file": filename, + } + redis_client.rpush(f"ocr_status:{request_id}", json.dumps(log_entry)) + except Exception: + pass + + # 작업을 등록한 후, 실제 OCR 처리를 기다리지 않고 즉시 응답 + results.append( + { + "message": "OCR 작업이 접수되었습니다.", + "request_id": request_id, + "task_id": task_id, + "status_check_url": f"/ocr/progress/{request_id}", + } + ) + + return JSONResponse(content={"results": results}) + + +# 실제 OCR 결과는 GET /ocr/progress/{request_id} 엔드포인트를 통해 별도로 조회 +@router.get("/progress/{request_id}", summary="📊 OCR 진행 상태 및 결과 조회") +async def check_progress(request_id: str): + """ + `request_id`를 이용해 OCR 작업의 진행 상태와 최종 결과를 조회합니다. + + - **`celery_status`**: Celery 작업의 현재 상태 (`PENDING`, `STARTED`, `SUCCESS`, `FAILURE` 등) + - **`progress_logs`**: 작업 접수부터 완료까지의 단계별 진행 상황 로그 + - **`final_result`**: OCR 처리가 성공적으로 완료되었을 때, 추출된 텍스트와 좌표 정보가 포함된 최종 결과 + """ + # request_id → task_id 매핑 확인 + task_id = redis_client.hget("ocr_task_mapping", request_id) + + if not task_id: + raise HTTPException( + status_code=404, detail=f"Meeting ID {request_id} 작업 없음" + ) + + # Celery 작업 상태 조회 + result = AsyncResult(task_id, app=celery_app) + status = result.status + + # 작업 로그 조회 + try: + logs = redis_client.lrange(f"ocr_status:{request_id}", 0, -1) + parsed_logs = [json.loads(log) for log in logs] + except Exception as e: + parsed_logs = [ + { + "status": "로그 가져오기 실패", + "error": str(e), + "timestamp": datetime.now().isoformat(), + } + ] + + # 최종 결과 Redis에서 조회 + final_result = None + try: + result_str = redis_client.get(f"ocr_result:{task_id}") + if result_str: + final_result = json.loads(result_str) + except Exception as e: + final_result = {"error": f"결과 조회 실패: {str(e)}"} + + return JSONResponse( + content={ + "request_id": request_id, + "task_id": task_id, + "celery_status": status, + "progress_logs": parsed_logs, + "final_result": final_result, + } + ) diff --git a/tasks.py b/tasks.py new file mode 100644 index 0000000..25f6d79 --- /dev/null +++ b/tasks.py @@ -0,0 +1,157 @@ +import asyncio +import json +import logging +import os +import tempfile +import time +from datetime import datetime + +import httpx +import redis +from celery import Task, chain +from config.setting import REDIS_DB, REDIS_HOST, REDIS_PORT +from utils.celery_utils import celery_app +from utils.ocr_processor import ocr_process +from utils.text_extractor import extract_text_from_file + +# ✅ Redis 클라이언트 생성 +redis_client = redis.Redis( + host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, decode_responses=True +) + +# ✅ 로깅 설정 +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +# ✅ 공통 Task 베이스 클래스 - 상태 로그 기록 및 예외 후킹 제공 +class BaseTaskWithProgress(Task): + """ + Celery Task를 상속한 공통 Task 베이스 클래스입니다. + 주요 목적은: + + - update_progress()로 Redis에 작업 진행상황 저장 + - on_failure, on_success 메서드를 오버라이딩하여 자동 상태 기록 + + 주요 기능: + - update_progress: 단계별 상태를 ocr_status:{request_id}에 rpush + - on_failure: 예외 발생 시 에러 로그 저장 + - on_success: 작업 성공 시 성공 로그 저장 + """ + + abstract = True + + def update_progress(self, request_id, status_message, step_info=None): + log_entry = { + "status": status_message, + "timestamp": datetime.now().isoformat(), + "step_info": step_info, + } + redis_client.rpush(f"ocr_status:{request_id}", json.dumps(log_entry)) + logger.info(f"[{request_id}] Task Progress: {status_message}") + + def on_failure(self, exc, task_id, args, kwargs, einfo): + request_id = kwargs.get("request_id", "unknown") + self.update_progress( + request_id, + "작업 오류 발생", + {"error": str(exc), "traceback": str(einfo)}, + ) + logger.error(f"[{request_id}] Task Failed: {exc}") + super().on_failure(exc, task_id, args, kwargs, einfo) + + def on_success(self, retval, task_id, args, kwargs): + request_id = kwargs.get("request_id", "unknown") + self.update_progress(request_id, "작업 완료") + logger.info(f"[{request_id}] Task Succeeded") + super().on_success(retval, task_id, args, kwargs) + + +# ✅ Step 1: presigned URL에서 파일 다운로드 +@celery_app.task(bind=True, base=BaseTaskWithProgress) +def fetch_file_from_url( + self, file_url: str, file_name: str, request_id: str, task_id: str +): + self.update_progress(request_id, "파일 다운로드 중") + suffix = os.path.splitext(file_name)[-1] + with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file: + tmp_path = tmp_file.name + + try: + asyncio.run( + download_file_from_presigned_url(file_url, tmp_path) + ) # 비동기 다운로드 함수 호출 + except Exception as e: + raise RuntimeError(f"파일 다운로드 실패: {e}") + + self.update_progress(request_id, "파일 다운로드 완료") + return tmp_path + + +# ✅ Step 2: OCR 및 후처리 수행 +@celery_app.task(bind=True, base=BaseTaskWithProgress) +def parse_ocr_text( + self, tmp_path: str, request_id: str, file_name: str, ocr_model: str = "upstage" +): + self.update_progress(request_id, "OCR 작업 시작") + start_time = time.time() + text, coord, ocr_model = asyncio.run(extract_text_from_file(tmp_path, ocr_model)) + end_time = time.time() + self.update_progress(request_id, "텍스트 추출 및 후처리 완료") + result_json = ocr_process(file_name, ocr_model, coord, text, start_time, end_time) + return {"result": result_json, "tmp_path": tmp_path} + + +# ✅ Step 3: 결과 Redis 저장 및 임시 파일 삭제 +@celery_app.task(bind=True, base=BaseTaskWithProgress) +def store_ocr_result(self, data: dict, request_id: str, task_id: str): + self.update_progress(request_id, "결과 저장 중") + redis_key = f"ocr_result:{task_id}" + redis_client.set(redis_key, json.dumps(data["result"])) + + try: + os.remove(data["tmp_path"]) + except Exception: + logger.warning(f"[{request_id}] 임시 파일 삭제 실패") + + self.update_progress(request_id, "모든 작업 완료") + + +# ✅ 실제 presigned URL에서 파일 다운로드 수행 +async def download_file_from_presigned_url(file_url: str, save_path: str): + async with httpx.AsyncClient() as client: + resp = await client.get(file_url) + resp.raise_for_status() + with open(save_path, "wb") as f: + f.write(resp.content) + + +# ✅ 전체 OCR 체인 실행 함수 +def run_ocr_pipeline(file_url, file_name, request_id, task_id, ocr_model): + chain( + fetch_file_from_url.s( + file_url=file_url, file_name=file_name, request_id=request_id, task_id=task_id + ) # ✅ Step 1: presigned URL에서 파일 다운로드 + | parse_ocr_text.s( + request_id=request_id, file_name=file_name, ocr_model=ocr_model + ) # ✅ Step 2: OCR 및 후처리 수행 + | store_ocr_result.s( + request_id=request_id, task_id=task_id + ) # ✅ Step 3: 결과 Redis 저장 및 임시 파일 삭제 + ).apply_async(task_id=task_id) + + +# ✅ 결과 조회 함수: Redis에서 task_id로 OCR 결과 조회 +def get_ocr_result(task_id: str): + redis_key = f"ocr_result:{task_id}" + result = redis_client.get(redis_key) + if result: + return json.loads(result) + return None + + +# ✅ 상태 로그 조회 함수: Redis에서 request_id 기반 상태 로그 조회 +def get_ocr_status_log(request_id: str): + redis_key = f"ocr_status:{request_id}" + logs = redis_client.lrange(redis_key, 0, -1) + return [json.loads(entry) for entry in logs] diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/utils/celery_utils.py b/utils/celery_utils.py new file mode 100644 index 0000000..0b7958c --- /dev/null +++ b/utils/celery_utils.py @@ -0,0 +1,13 @@ +# utils/celery_utils.py +from celery import Celery +from config.setting import CELERY_BROKER_URL, CELERY_RESULT_BACKEND + +# Define and export the single Celery app instance +celery_app = Celery( + "ocr_tasks", broker=CELERY_BROKER_URL, backend=CELERY_RESULT_BACKEND +) + + +@celery_app.task(name="health_check") +def health_check(): + return {"status": "ok"} diff --git a/utils/checking_keys.py b/utils/checking_keys.py new file mode 100644 index 0000000..5706f41 --- /dev/null +++ b/utils/checking_keys.py @@ -0,0 +1,14 @@ +import logging + +from dotenv import load_dotenv +from snowflake import SnowflakeGenerator + +logger = logging.getLogger(__name__) +load_dotenv() + +def create_key(node: int = 1) -> str: + """ + Snowflake 알고리즘 기반 고유 키 생성기 (request_id용) + """ + generator = SnowflakeGenerator(node) + return str(next(generator)) diff --git a/utils/file_handler.py b/utils/file_handler.py new file mode 100644 index 0000000..c68de83 --- /dev/null +++ b/utils/file_handler.py @@ -0,0 +1,99 @@ +import asyncio +import logging +import os +import re + +import docx +import fitz +from pdf2image import convert_from_path +from PIL import Image + +logger = logging.getLogger(__name__) + + +async def process_file(file_path, ocr_model): + """ + 파일 경로를 기반으로 파일 유형을 확인하고 적절한 처리를 수행합니다. + - PDF, 이미지는 OCR을 위해 이미지 객체 리스트를 반환합니다. + - DOCX는 직접 텍스트를 추출하여 반환합니다. + - 지원하지 않는 형식은 ValueError를 발생시킵니다. + """ + ext = os.path.splitext(file_path)[-1].lower() + images = [] + text_only = None + needs_ocr = False + + # Upstage는 원본 파일 업로드 → 변환 불필요 + if ocr_model == "upstage": + # if ext == ".pdf": + # text_only = await asyncio.to_thread(extract_text_from_pdf_direct, file_path) + # if text_only.strip(): # 텍스트가 충분히 추출되었다면 OCR 생략 + # logger.info(f"[UTILS-TEXT] {ocr_model}: PDF 텍스트 충분 → OCR 생략") + # needs_ocr = False + # return images, text_only, needs_ocr + # else: # 텍스트가 충분하지 않다면 OCR 필요 + # logger.info(f"[FILE-HANDLER] {ocr_model}: PDF 텍스트 부족 → OCR 필요") + # needs_ocr = True + # return images, text_only, needs_ocr + # else: + logger.info(f"[FILE-HANDLER] {ocr_model}: PDF 외 파일은 OCR 필요 (파일 변환 불필요) ") + needs_ocr = True + return images, text_only, needs_ocr + + # Upstage가 아닌 경우 파일 형식에 따라 처리 + if ext == ".pdf": + # text_only = await asyncio.to_thread(extract_text_from_pdf_direct, file_path) + # if text_only.strip(): # 텍스트가 충분히 추출되었다면 OCR 생략 + # logger.info(f"[UTILS-TEXT] {ocr_model}: PDF 텍스트 충분 → OCR 생략") + # needs_ocr = False + # return images, text_only, needs_ocr + + images = await asyncio.to_thread(convert_from_path, file_path, dpi=400) + logger.info(f"[FILE-HANDLER] {ocr_model}: PDF → 이미지 변환 완료 ({len(images)} 페이지)") + needs_ocr = True + + elif ext in [".jpg", ".jpeg", ".png"]: + img = await asyncio.to_thread(Image.open, file_path) + images = [img] + logger.info(f"[FILE-HANDLER] {ocr_model}: 이미지 파일 로딩 완료") + needs_ocr = True + + elif ext == ".docx": + text_only = await asyncio.to_thread(extract_text_from_docx, file_path) + logger.info(f"[FILE-HANDLER] {ocr_model}: Word 문서 텍스트 추출 완료") + needs_ocr = False + + else: + logger.error(f"[ERROR] 지원하지 않는 파일 형식: {ext}") + raise ValueError("지원하지 않는 파일 형식입니다. (PDF, JPG, JPEG, PNG, DOCX)") + + return images, text_only, needs_ocr + + +def extract_text_from_pdf_direct(pdf_path): + text = "" + try: + with fitz.open(pdf_path) as doc: + for page in doc: + text += page.get_text() + valid_chars = re.findall(r"[가-힣a-zA-Z]", text) + logger.info(f"len(valid_chars): {len(valid_chars)}") + if len(valid_chars) < 10: + return text # 텍스트가 충분하지 않으면 바로 반환 + else: + text += page.get_text() + except Exception as e: + logger.info("[ERROR] PDF 텍스트 추출 실패:", e) + return text + + +def extract_text_from_docx(docx_path): + """DOCX 파일에서 텍스트를 추출합니다.""" + text = "" + try: + doc = docx.Document(docx_path) + for para in doc.paragraphs: + text += para.text + "\n" + except Exception as e: + logger.error(f"[ERROR] DOCX 텍스트 추출 실패: {e}") + return text diff --git a/utils/ocr_processor.py b/utils/ocr_processor.py new file mode 100644 index 0000000..4a621bd --- /dev/null +++ b/utils/ocr_processor.py @@ -0,0 +1,14 @@ +def ocr_process(filename, ocr_model, coord, text, start_time, end_time): + json_data = { + "filename": filename, + "model": {"ocr_model": ocr_model}, + "time": { + "duration_sec": f"{end_time - start_time:.2f}", + "started_at": start_time, + "ended_at": end_time, + }, + "fields": coord, + "parsed": text, + } + + return json_data diff --git a/utils/preprocessor.py b/utils/preprocessor.py new file mode 100644 index 0000000..b9cf77d --- /dev/null +++ b/utils/preprocessor.py @@ -0,0 +1,61 @@ +import cv2 +import numpy as np +from PIL import Image +import logging + +logger = logging.getLogger(__name__) + + +def to_rgb_uint8(img_np: np.ndarray) -> np.ndarray: + """ + 입력 이미지를 3채널 RGB, uint8 [0,255] 로 표준화 + 허용 입력: HxW, HxWx1, HxWx3, HxWx4, float[0..1]/[0..255], int 등 + """ + if img_np is None: + raise ValueError("Input image is None") + + # dtype/범위 표준화 + if img_np.dtype != np.uint8: + arr = img_np.astype(np.float32) + if arr.max() <= 1.0: # [0,1]로 보이면 스케일업 + arr *= 255.0 + arr = np.clip(arr, 0, 255).astype(np.uint8) + img_np = arr + + # 채널 표준화 + if img_np.ndim == 2: # HxW + img_np = cv2.cvtColor(img_np, cv2.COLOR_GRAY2RGB) + elif img_np.ndim == 3: + h, w, c = img_np.shape + if c == 1: + img_np = cv2.cvtColor(img_np, cv2.COLOR_GRAY2RGB) + elif c == 4: + img_np = cv2.cvtColor(img_np, cv2.COLOR_RGBA2RGB) + elif c == 3: + pass # 그대로 사용 + else: + raise ValueError(f"Unsupported channel count: {c}") + else: + raise ValueError(f"Unsupported ndim: {img_np.ndim}") + + return img_np + +# tesseract 전처리 함수 +def tess_prep_cv2(pil_img): + logger.info("[UTILS-OCR] 이미지 전처리 시작") + img = np.array(pil_img.convert("RGB")) # PIL → OpenCV 변환 + img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY) # 그레이스케일 변환 + img = cv2.bilateralFilter(img, 9, 75, 75) # 노이즈 제거 + img = cv2.adaptiveThreshold( + img, + 255, + cv2.ADAPTIVE_THRESH_GAUSSIAN_C, + cv2.THRESH_BINARY, + 31, + 10, # 대비 향상 + ) + img = cv2.resize( + img, None, fx=2, fy=2, interpolation=cv2.INTER_LINEAR + ) # 해상도 확대 + + return Image.fromarray(img) \ No newline at end of file diff --git a/utils/redis_utils.py b/utils/redis_utils.py new file mode 100644 index 0000000..1eda51f --- /dev/null +++ b/utils/redis_utils.py @@ -0,0 +1,22 @@ +# utils/redis_utils.py + +import redis +from config.setting import REDIS_DB, REDIS_HOST, REDIS_PORT + + +def get_redis_client(): + """ + Redis 클라이언트를 반환합니다. decode_responses=True 설정으로 문자열을 자동 디코딩합니다. + """ + try: + redis_client = redis.Redis( + host=REDIS_HOST, + port=REDIS_PORT, + db=REDIS_DB, + decode_responses=True, + ) + # 연결 확인 (ping) + redis_client.ping() + return redis_client + except redis.ConnectionError as e: + raise RuntimeError(f"Redis 연결 실패: {e}") diff --git a/utils/text_extractor.py b/utils/text_extractor.py new file mode 100644 index 0000000..d71b914 --- /dev/null +++ b/utils/text_extractor.py @@ -0,0 +1,316 @@ +import asyncio +import logging +import os +from pathlib import Path + +import cv2 +import httpx +import numpy as np +import paddle +import pytesseract +from config.setting import UPSTAGE_API_KEY, UPSTAGE_API_URL +from paddleocr import PaddleOCR, PPStructureV3 + +from .file_handler import process_file +from .preprocessor import tess_prep_cv2, to_rgb_uint8 + +logger = logging.getLogger(__name__) + + +# PaddleOCR 및 PPStructure 모델을 전역 변수로 초기화 +# 이렇게 하면 Celery 워커가 시작될 때 한 번만 모델을 로드합니다. +_paddle_ocr_model = None +_paddle_structure_model = None + + +def get_paddle_ocr_model(): + """PaddleOCR 모델 인스턴스를 반환합니다 (Singleton).""" + global _paddle_ocr_model + if _paddle_ocr_model is None: + device = os.getenv("PADDLE_DEVICE", "cpu") + logger.info(f"Initializing PaddleOCR model on device: {device}") + _paddle_ocr_model = PaddleOCR( + use_doc_orientation_classify=False, + use_doc_unwarping=False, + device=device, + lang="korean", + ) + logger.info("PaddleOCR model initialized.") + return _paddle_ocr_model + + +def get_paddle_structure_model(): + """PPStructure 모델 인스턴스를 반환합니다 (Singleton).""" + global _paddle_structure_model + if _paddle_structure_model is None: + device = os.getenv("PADDLE_DEVICE", "cpu") + logger.info(f"Initializing PPStructure model on device: {device}") + _paddle_structure_model = PPStructureV3( + use_doc_orientation_classify=False, + use_doc_unwarping=False, + device=device, + lang="korean", + layout_threshold=0.3, # 레이아웃 인식 실패로 임계값 수정됨 + ) + logger.info("PPStructure model initialized.") + return _paddle_structure_model + + +async def extract_text_from_file(file_path, ocr_model): + """ + 파일을 처리하고 OCR 모델을 적용하여 텍스트를 추출합니다. + """ + images, text_only, needs_ocr = await process_file(file_path, ocr_model) + + if not needs_ocr: + return text_only, [], "OCR not used" + + if ocr_model == "tesseract": + logger.info(f"[TESSERACT] {ocr_model} 로 이미지에서 텍스트 추출 중...") + full_response, coord_response = await asyncio.to_thread( + extract_tesseract_ocr, images + ) + elif ocr_model == "pp-ocr": + logger.info(f"[PP-OCR] {ocr_model}로 이미지에서 텍스트 추출 중...") + full_response, coord_response = await asyncio.to_thread( + extract_paddle_ocr, images + ) + elif ocr_model == "pp-structure": + logger.info(f"[PP-STRUCTURE] {ocr_model}로 이미지에서 텍스트 추출 중...") + full_response, coord_response = await asyncio.to_thread( + extract_paddle_structure, images + ) + elif ocr_model == "upstage": + logger.info(f"[UPSTAGE] {ocr_model}로 이미지에서 텍스트 추출 중...") + full_response, coord_response = await extract_upstage_ocr(file_path) + else: + logger.error(f"[OCR MODEL] 지원하지 않는 모델입니다. ({ocr_model})") + raise ValueError(f"지원하지 않는 OCR 모델입니다: {ocr_model}") + + return full_response, coord_response, ocr_model + + +# ✅ tesseract +def extract_tesseract_ocr(images): + """ + tesseract를 사용하여 이미지에서 텍스트 추출 및 좌표 정보 반환 + """ + all_texts = [] + coord_response = [] + + for page_idx, img in enumerate(images): + logger.info(f"[UTILS-OCR] 페이지 {page_idx + 1} OCR로 텍스트 추출 중...") + pre_img = tess_prep_cv2(img) + text = pytesseract.image_to_string( + pre_img, lang="kor+eng", config="--oem 3 --psm 6" + ) + all_texts.append(text) + + ocr_data = pytesseract.image_to_data( + pre_img, + output_type=pytesseract.Output.DICT, + lang="kor+eng", + config="--oem 3 --psm 6", + ) + for i in range(len(ocr_data["text"])): + word = ocr_data["text"][i].strip() + if word == "": + continue + x, y, w, h = ( + ocr_data["left"][i], + ocr_data["top"][i], + ocr_data["width"][i], + ocr_data["height"][i], + ) + coord_response.append( + {"text": word, "coords": [x, y, x + w, y + h], "page": page_idx + 1} + ) + + logger.info(f"[UTILS-OCR] 페이지 {page_idx + 1} 텍스트 및 좌표 추출 완료") + + full_response = "\n".join(all_texts) + return full_response, coord_response + + +# ✅ PaddleOCR +def extract_paddle_ocr(images): + """ + PaddleOCR를 사용하여 이미지에서 텍스트 추출 및 좌표 정보 반환 + """ + ocr = get_paddle_ocr_model() + + full_response = [] + coord_response = [] + + for page_idx, img in enumerate(images): + print(f"[PaddleOCR] 페이지 {page_idx + 1} OCR로 텍스트 추출 중...") + img_np = np.array(img) + + # ✅ 채널/타입 표준화 (grayscale/rgba/float 등 대응) + try: + img_np = to_rgb_uint8(img_np) + except Exception as e: + print(f"[PaddleOCR] 페이지 {page_idx + 1} 입력 표준화 실패: {e}") + continue # 문제 페이지 스킵 후 다음 페이지 진행 + + # ✅ 과도한 해상도 안정화 (최대 변 4000px) + h, w = img_np.shape[:2] + max_side = max(h, w) + max_side_limit = 4000 + if max_side > max_side_limit: + scale = max_side_limit / max_side + new_size = (int(w * scale), int(h * scale)) + img_np = cv2.resize(img_np, new_size, interpolation=cv2.INTER_AREA) + print(f"[PaddleOCR] Resized to {img_np.shape[1]}x{img_np.shape[0]}") + + results = ocr.predict(input=img_np) + + try: + if paddle.is_compiled_with_cuda(): + paddle.device.cuda.synchronize() + paddle.device.cuda.empty_cache() + except Exception: + pass + + print(f"[PaddleOCR] 페이지 {page_idx + 1} OCR 결과 개수: {len(results)}") + for res_idx, res in enumerate(results): + print(f"[PaddleOCR] 페이지 {page_idx + 1} 결과 {res_idx + 1}개 추출 완료") + res_dic = dict(res.items()) + + texts = res_dic.get("rec_texts", []) + boxes = res_dic.get("rec_boxes", []) + + for text, bbox in zip(texts, boxes): + full_response.append(text) + coord_response.append( + {"text": text, "coords": bbox.tolist(), "page": page_idx + 1} + ) + + print("[PaddleOCR] 전체 페이지 텍스트 및 좌표 추출 완료") + return "\n".join(full_response), coord_response + + +# ✅ PaddleStructure +def extract_paddle_structure(images): + """ + PaddleSTRUCTURE 사용하여 이미지에서 텍스트 추출 및 좌표 정보 반환 + """ + structure = get_paddle_structure_model() + + full_response = [] + coord_response = [] + + for page_idx, img in enumerate(images): + print(f"[PaddleSTRUCTURE] 페이지 {page_idx + 1} OCR로 텍스트 추출 중...") + img_np = np.array(img) + print(f"[Padddle-IMG]{img}") + + # ✅ 채널/타입 표준화 (grayscale/rgba/float 등 대응) + try: + img_np = to_rgb_uint8(img_np) + except Exception as e: + print(f"[PaddleSTRUCTURE] 페이지 {page_idx + 1} 입력 표준화 실패: {e}") + continue # 문제 페이지 스킵 후 다음 페이지 진행 + + # ✅ 과도한 해상도 안정화 (최대 변 4000px) + h, w = img_np.shape[:2] + max_side = max(h, w) + max_side_limit = 4000 + if max_side > max_side_limit: + scale = max_side_limit / max_side + new_size = (int(w * scale), int(h * scale)) + img_np = cv2.resize(img_np, new_size, interpolation=cv2.INTER_AREA) + print(f"[PaddleSTRUCTURE] Resized to {img_np.shape[1]}x{img_np.shape[0]}") + + results = structure.predict(input=img_np) + + try: + if paddle.is_compiled_with_cuda(): + paddle.device.cuda.empty_cache() + except Exception: + pass + + print(f"[PaddleSTRUCTURE] 페이지 {page_idx + 1} OCR 결과 개수: {len(results)}") + for res_idx, res in enumerate(results): + print( + f"[PaddleSTRUCTURE] 페이지 {page_idx + 1} 결과 {res_idx + 1}개 추출 완료" + ) + res_dic = dict(res.items()) + blocks = res_dic.get("parsing_res_list", []) or [] + + for block in blocks: + bd = block.to_dict() + + content = bd.get("content", []) + bbox = bd.get("bbox", []) + + full_response.append(content) + + coord_response.append( + {"text": content, "coords": bbox, "page": page_idx + 1} + ) + + print("[PaddleSTRUCTURE] 전체 페이지 텍스트 및 좌표 추출 완료") + return "\n".join(full_response), coord_response + + +# ✅ Upstage OCR API +async def extract_upstage_ocr(file_path: str): + """ + Upstage OCR API를 사용하여 이미지에서 텍스트 및 좌표 추출 + """ + if not UPSTAGE_API_KEY: + raise ValueError("Upstage API 키가 설정되지 않았습니다.") + if not file_path or not os.path.exists(file_path): + raise FileNotFoundError(f"파일이 존재하지 않습니다: {file_path}") + + url = UPSTAGE_API_URL + if not url: + url = "https://api.upstage.ai/v1/document-ai/ocr" + logger.warning(f"UPSTAGE_API_URL not set in config, using default: {url}") + + headers = {"Authorization": f"Bearer {UPSTAGE_API_KEY}"} + data = {"model": "ocr"} + filename = Path(file_path).name + full_text_parts = [] + coord_response = [] + + with open(file_path, "rb") as f: + files = {"document": (filename, f, "application/octet-stream")} + try: + async with httpx.AsyncClient(timeout=60.0, follow_redirects=True) as client: + response = await client.post( + url, headers=headers, files=files, data=data + ) + response.raise_for_status() + result = response.json() + except httpx.HTTPStatusError as e: + logger.error(f"Upstage API 오류: {e.response.text}") + raise RuntimeError(f"Upstage API 오류: {e.response.status_code}") + + try: + pages = result.get("pages", []) + for page_idx, p in enumerate(pages, start=1): + txt = p.get("text") + if txt: + full_text_parts.append(txt) + + for w in p.get("words", []): + verts = (w.get("boundingBox", {}) or {}).get("vertices") + if not verts or len(verts) != 4: + continue + xs = [v.get("x", 0) for v in verts] + ys = [v.get("y", 0) for v in verts] + coord_response.append( + { + "text": w.get("text"), + "coords": [min(xs), min(ys), max(xs), max(ys)], + "page": page_idx, + } + ) + except Exception as e: + logger.error(f"[UPSTAGE] JSON 파싱 실패: {e} / 원본 result: {result}") + return "", [] + + full_response = "\n".join(full_text_parts) + return full_response, coord_response