Files
ocr_gateway_test/api.py
2025-10-27 09:18:24 +09:00

127 lines
4.1 KiB
Python

# ocr/api.py
import logging
import httpx
from config.setting import CELERY_FLOWER
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from prometheus_fastapi_instrumentator import Instrumentator
from router import ocr_api_router
from utils.celery_utils import celery_app
from utils.celery_utils import health_check as celery_health_check_task
from utils.redis_utils import get_redis_client
# Root logger configuration: INFO level, records rendered as
# "<time> [<LEVEL>] <logger name> - <message>".
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(name)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)

app = FastAPI(title="OCR GATEWAY", description="OCR API 서비스", docs_url="/docs")

# CORS policy: a fixed list of origins plus a pattern covering any
# 172.16.x.x address or the gsim host, each with an optional port.
_CORS_ORIGINS = [
    "http://172.16.42.101",
    "http://gsim.hanmaceng.co.kr",
    "http://gsim.hanmaceng.co.kr:6464",
]
_CORS_ORIGIN_REGEX = (
    r"http://(172\.16\.\d{1,3}\.\d{1,3}|gsim\.hanmaceng\.co\.kr)(:\d+)?"
)
app.add_middleware(
    CORSMiddleware,
    allow_origins=_CORS_ORIGINS,
    allow_origin_regex=_CORS_ORIGIN_REGEX,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Enable the Prometheus metrics exporter.
Instrumentator().instrument(app).expose(app)

# Mount the OCR API routes.
app.include_router(ocr_api_router)
@app.get("/health/API")
async def health_check():
    """Liveness probe: always answer with a fixed OK payload."""
    payload = {"status": "API ok"}
    return payload
@app.get("/health/Redis")
def redis_health_check():
    """Check that Redis answers a ping.

    Obtains a client from ``get_redis_client()`` and calls ``ping()`` on it.

    Returns:
        ``{"status": "Redis ok"}`` when the ping completes without raising.

    Raises:
        HTTPException: 500 when no client is available, or when the ping
            raises any exception.
    """
    client = get_redis_client()
    if client is None:
        raise HTTPException(status_code=500, detail="Redis connection failed")

    try:
        client.ping()
    except Exception as exc:
        # Log the underlying cause (the other health checks in this module do
        # the same) instead of discarding it; the HTTP response is unchanged.
        logging.error("Redis ping failed: %s", exc, exc_info=True)
        raise HTTPException(status_code=500, detail="Redis ping failed") from exc

    return {"status": "Redis ok"}
@app.get("/health/Celery")
async def celery_health_check():
    """Check Celery worker health.

    Performs two steps:

    1. Pings the workers through ``celery_app.control.ping``.
    2. Submits the health-check task and waits for its result, as an
       end-to-end check.

    The Celery calls are blocking, so each one is run in a worker thread to
    keep the event loop free while waiting (up to ~11 seconds in total).

    Returns:
        A dict with ``status``, ``active_workers`` (the ping replies) and
        ``task_status`` when both steps succeed.

    Raises:
        HTTPException: 503 when no worker replies to the ping; 500 when the
            task does not finish in state ``SUCCESS`` with ``status == "ok"``,
            or when any other error occurs during the check.
    """
    # Function-scope import so this block does not depend on the module's
    # import list.
    import asyncio

    try:
        # 1. Ping the workers (blocking call -> run it off the event loop).
        active_workers = await asyncio.to_thread(
            celery_app.control.ping, timeout=1.0
        )
        if not active_workers:
            raise HTTPException(
                status_code=503, detail="No active Celery workers found."
            )

        # 2. Run a simple task to verify the pipeline end to end.
        task = await asyncio.to_thread(celery_health_check_task.delay)
        result = await asyncio.to_thread(task.get, timeout=10)  # 10 s timeout

        if task.state == "SUCCESS" and result.get("status") == "ok":
            return {
                "status": "Celery is healthy",
                "active_workers": active_workers,
                "task_status": "SUCCESS",
            }

        raise HTTPException(
            status_code=500,
            detail=f"Celery health check task failed with state: {task.state}",
        )
    except HTTPException:
        # Already an HTTP error with the intended status: pass it through.
        raise
    except Exception as e:
        logging.error("Celery health check failed: %s", e, exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"An error occurred during Celery health check: {str(e)}",
        ) from e
@app.get("/health/Flower")
async def flower_health_check():
    """Check that the Flower monitoring dashboard is reachable.

    Sends a GET request to the ``CELERY_FLOWER`` URL from settings with a
    5-second timeout.

    Returns:
        ``{"status": "Flower is running"}`` when Flower answers with 200.

    Raises:
        HTTPException: 503 when the request itself fails
            (``httpx.RequestError``); 500 on any other unexpected error while
            sending the request; otherwise, for a non-200 answer, Flower's
            own status code when it is an error code (>= 400), or 502 when
            Flower answered with an unexpected non-error code.
    """
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.get(CELERY_FLOWER)
    except httpx.RequestError as e:
        logging.error("Could not connect to Flower: %s", e, exc_info=True)
        raise HTTPException(
            status_code=503, detail=f"Could not connect to Flower: {str(e)}"
        ) from e
    except Exception as e:
        logging.error("Flower health check failed: %s", e, exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"An error occurred during Flower health check: {str(e)}",
        ) from e

    # Only reachability matters here: a 200 means Flower is up.
    if response.status_code == 200:
        return {"status": "Flower is running"}

    # The status check lives outside the try block on purpose: an
    # HTTPException raised inside it would be caught by the generic handler
    # above and rewritten to a 500, hiding Flower's real status.
    upstream_status = response.status_code
    logging.error("Flower API returned status %s", upstream_status)
    raise HTTPException(
        # A non-error code (e.g. 204 or a redirect) cannot be used as an
        # error response status, so report it as a bad gateway instead.
        status_code=upstream_status if upstream_status >= 400 else 502,
        detail=f"Flower API returned status {upstream_status}",
    )