import asyncio import io import json from typing import Optional from urllib.parse import urlparse import requests from config.setting import ( D6C_PROMPT_PATH, I18N_PROMPT_PATH, PGN_REDIS_DB, PGN_REDIS_HOST, PGN_REDIS_PORT, ) from fastapi import APIRouter, Depends, File, Form, Request, UploadFile from fastapi.responses import JSONResponse from redis import Redis from services.inference_service import InferenceHandler from utils.checking_files import ( clone_upload_file, validate_all_files, ) from utils.checking_keys import create_key, get_api_key from utils.minio_utils import fetch_result_from_minio # Redis 클라이언트 (LLM Gateway 전용) redis_client = Redis( host=PGN_REDIS_HOST, port=PGN_REDIS_PORT, db=PGN_REDIS_DB, decode_responses=True ) router = APIRouter(prefix="/extract", tags=["Extraction"]) # ✅ 공통 비동기 추론 엔드포인트 생성기 def register_extract_route( path: str, mode: str, default_model: str, summary: str, description: str ): @router.post(path, summary=summary, description=description) async def extract_endpoint( request_info: Request, input_file: UploadFile = File(...), prompt_file: Optional[UploadFile] = File( default=None, description="⚠️ prompt_file 업로드하지 않을 경우, **'Send empty value'** 체크박스를 반드시 해제해주세요.", ), model: Optional[str] = Form(default=default_model), api_key: str = Depends(get_api_key), ): validate_all_files(input_file) # ✅ 고유한 요청 ID 생성 request_id = create_key() result_id = create_key() cloned_input = clone_upload_file(input_file) if input_file else None cloned_prompt = clone_upload_file(prompt_file) if prompt_file else None # ✅ 백그라운드에서 작업 실행 asyncio.create_task( InferenceHandler.handle_extract_background( request_id=request_id, result_id=result_id, input_file=cloned_input, schema_file=None, prompt_file=cloned_prompt, mode=mode, model_list=[model], request_info=request_info, api_key=api_key, ) ) # ✅ request_id → result_id 매핑 저장 redis_client.hset("pipeline_result_mapping", request_id, result_id) return JSONResponse( content={ "message": "문서 추출 및 생성형 응답 작업이 백그라운드에서 실행 중입니다.", "request_id": request_id, "status_check_url": f"/extract/progress/{request_id}", } ) # FastAPI 문서화용 정보 부여 extract_endpoint.__name__ = f"extract_{mode}" extract_endpoint.__doc__ = description return extract_endpoint # ✅ 내부 모델용 등록 extract_inner = register_extract_route( path="/inner", mode="inner", default_model="gemma3:27b", summary="내부 LLM 기반 문서 정보 추출 (비동기)", description="""### **요약** 내부망에 배포된 LLM(Ollama 기반)을 사용하여 문서(PDF, 이미지 등)에서 정보를 추출하고 응답을 생성합니다. 이 엔드포인트는 사전 정의된 기본 프롬프트를 사용하며, 비동기적으로 처리됩니다. ### **작동 방식** 1. **요청 접수**: `input_file`을 받아 고유 `request_id`를 생성하고 즉시 반환합니다. 2. **백그라운드 처리**: - `input_file`에 대해 **OCR API**를 호출하여 텍스트를 추출합니다. - 시스템에 내장된 기본 프롬프트와 추출된 텍스트를 조합합니다. (`prompt_file`을 업로드하여 기본 프롬프트를 대체할 수 있습니다.) - 내부 LLM(Ollama)에 추론을 요청합니다. 3. **상태 및 결과 확인**: `GET /extract/progress/{request_id}`로 작업 상태와 최종 결과를 조회합니다. ### **입력 (multipart/form-data)** - `input_file` (**필수**): 정보 추출의 대상이 될 문서 파일. - 지원 형식: `.pdf`, `.docx`, `.jpg`, `.png`, `.jpeg` 등. - `prompt_file` (선택): 기본 프롬프트 대신 사용할 사용자 정의 `.txt` 프롬프트 파일. - `model` (선택): 사용할 내부 LLM 모델 이름. (기본값: `gemma3:27b`) ### **출력 (application/json)** - **초기 응답**: ```json { "message": "작업이 백그라운드에서 실행 중입니다.", "request_id": "고유한 요청 ID", "status_check_url": "/extract/progress/고유한 요청 ID" } ``` - **최종 결과**: `GET /extract/progress/{request_id}`를 통해 확인 가능. """, ) # ✅ 외부 모델용 등록 extract_outer = register_extract_route( path="/outer", mode="outer", default_model="gemini-2.5-flash", summary="외부 LLM 기반 문서 정보 추출 (비동기)", description="""### **요약** 외부 상용 LLM(예: GPT, Gemini)을 사용하여 문서에서 정보를 추출하고 응답을 생성합니다. 내부 LLM 엔드포인트와 작동 방식은 동일하나, 외부 API를 호출합니다. ### **작동 방식** 1. **요청 접수**: `input_file`을 받아 `request_id`를 생성 후 즉시 반환합니다. 2. **백그라운드 처리**: - `input_file`에서 **OCR API**를 통해 텍스트를 추출합니다. - 내장된 기본 프롬프트(또는 사용자 정의 `prompt_file`)와 텍스트를 조합합니다. - 외부 LLM API(OpenAI, Google 등)에 추론을 요청합니다. 3. **상태 및 결과 확인**: `GET /extract/progress/{request_id}`로 작업 상태와 최종 결과를 조회합니다. ### **입력 (multipart/form-data)** - `input_file` (**필수**): 정보 추출 대상 문서 파일. - `prompt_file` (선택): 기본 프롬프트 대신 사용할 `.txt` 파일. - `model` (선택): 사용할 외부 LLM 모델 이름. (기본값: `gemini-2.5-flash`) ### **출력 (application/json)** - **초기 응답**: ```json { "message": "작업이 백그라운드에서 실행 중입니다.", "request_id": "고유한 요청 ID", "status_check_url": "/extract/progress/고유한 요청 ID" } ``` - **최종 결과**: `GET /extract/progress/{request_id}`를 통해 확인 가능. """, ) # ✅ 멀티모달 GPT 테스트용 등록 extract_outer_gpt = register_extract_route( path="/outer/gpt", mode="multimodal", default_model="gpt-4o", summary="멀티모달 GPT 테스트용", description="""### **요약** GPT-4o와 같은 멀티모달 모델을 사용하여, 문서(PDF, 이미지 등)에서 정보를 추출하고 응답을 생성합니다. ### **작동 방식** - 다른 추출 엔드포인트와 동일한 비동기 파이프라인을 따릅니다. - 추론 단계에서 시스템은 멀티모달 출력을 생성하도록 특화된 프롬프트(`multimodal_prompt`)를 사용합니다. - LLM은 `input_file`의 내용과 `prompt_file`의 구조를 바탕으로 JSON 객체를 생성합니다. ### **입력 (multipart/form-data)** - `input_file` (**필수**): 정보 추출 대상 문서 파일. - `prompt_file` (**선택**): 구조화용 기본 프롬프트 대신 사용할 `.txt` 파일. - `model` (선택): 사용할 LLM 모델 이름. ### **출력 (application/json)** - **초기 응답**: `request_id` 포함. - **최종 결과**: `GET /extract/progress/{request_id}` 조회 시, 지정된 스키마를 따르는 JSON 객체가 반환됩니다. """, ) # ✅ 멀티모달 Gemini 테스트용 등록 extract_outer_gemini = register_extract_route( path="/outer/gemini", mode="multimodal", default_model="gemini-2.5-flash", summary="멀티모달 Gemini 테스트용", description="""### **요약** Gemini와 같은 멀티모달 모델을 사용하여, 문서(PDF, 이미지 등)에서 정보를 추출하고 응답을 생성합니다. ### **작동 방식** - 다른 추출 엔드포인트와 동일한 비동기 파이프라인을 따릅니다. - 추론 단계에서 시스템은 멀티모달 출력을 생성하도록 특화된 프롬프트(`multimodal_prompt`)를 사용합니다. - LLM은 `input_file`의 내용과 `prompt_file`의 구조를 바탕으로 JSON 객체를 생성합니다. ### **입력 (multipart/form-data)** - `input_file` (**필수**): 정보 추출 대상 문서 파일. - `prompt_file` (**선택**): 구조화용 기본 프롬프트 대신 사용할 `.txt` 파일. - `model` (선택): 사용할 LLM 모델 이름. ### **출력 (application/json)** - **초기 응답**: `request_id` 포함. - **최종 결과**: `GET /extract/progress/{request_id}` 조회 시, 지정된 스키마를 따르는 JSON 객체가 반환됩니다. """, ) @router.post( "/inner/d6c", summary="국내 문서 테스트용", ) async def extract_d6c( request_info: Request, input_file: UploadFile = File(...), model: Optional[str] = Form(default="gemma3:27b"), api_key: str = Depends(get_api_key), ): validate_all_files(input_file) request_id = create_key() result_id = create_key() cloned_input = clone_upload_file(input_file) if input_file else None # 설정에 정의된 기본 I18N 프롬프트 파일을 항상 사용 with open(I18N_PROMPT_PATH, "rb") as f: content = f.read() # 메모리 내 파일 객체 생성 spooled_file = io.BytesIO(content) # UploadFile과 유사한 객체를 생성하여 백그라운드 핸들러로 전달 dummy_prompt_file = UploadFile(filename=I18N_PROMPT_PATH.name, file=spooled_file) cloned_prompt = clone_upload_file(dummy_prompt_file) asyncio.create_task( InferenceHandler.handle_extract_background( request_id=request_id, result_id=result_id, input_file=cloned_input, schema_file=None, prompt_file=cloned_prompt, mode="inner", model_list=[model], request_info=request_info, api_key=api_key, ) ) redis_client.hset("pipeline_result_mapping", request_id, result_id) return JSONResponse( content={ "message": "문서 추출 및 생성형 응답 작업이 백그라운드에서 실행 중입니다.", "request_id": request_id, "status_check_url": f"/extract/progress/{request_id}", } ) @router.post( "/inner/i18n", summary="해외 문서 테스트용", ) async def extract_i18n( request_info: Request, input_file: UploadFile = File(...), model: Optional[str] = Form(default="gemma3:27b"), api_key: str = Depends(get_api_key), ): validate_all_files(input_file) request_id = create_key() result_id = create_key() cloned_input = clone_upload_file(input_file) if input_file else None # 설정에 정의된 기본 I18N 프롬프트 파일을 항상 사용 with open(D6C_PROMPT_PATH, "rb") as f: content = f.read() # 메모리 내 파일 객체 생성 spooled_file = io.BytesIO(content) # UploadFile과 유사한 객체를 생성하여 백그라운드 핸들러로 전달 dummy_prompt_file = UploadFile(filename=D6C_PROMPT_PATH.name, file=spooled_file) cloned_prompt = clone_upload_file(dummy_prompt_file) asyncio.create_task( InferenceHandler.handle_extract_background( request_id=request_id, result_id=result_id, input_file=cloned_input, schema_file=None, prompt_file=cloned_prompt, mode="inner", model_list=[model], request_info=request_info, api_key=api_key, ) ) redis_client.hset("pipeline_result_mapping", request_id, result_id) return JSONResponse( content={ "message": "문서 추출 및 생성형 응답 작업이 백그라운드에서 실행 중입니다.", "request_id": request_id, "status_check_url": f"/extract/progress/{request_id}", } ) # ✅ structured 모드: 구조화 JSON 스키마 기반 추론 @router.post( "/inner/structured", summary="구조화된 JSON 정보 추출 (비동기)", description="""### **요약** 사용자가 제공한 `schema_file`에 정의된 JSON 스키마에 따라, 문서에서 정보를 추출하여 구조화된 JSON으로 반환합니다. ### **작동 방식** - 다른 추출 엔드포인트와 동일한 비동기 파이프라인을 따릅니다. - 추론 단계에서 시스템은 구조화된 출력을 생성하도록 특화된 프롬프트(`structured_prompt`)를 사용합니다. - LLM은 `input_file`의 내용과 `schema_file`의 구조를 바탕으로 JSON 객체를 생성합니다. ### **입력 (multipart/form-data)** - `input_file` (**필수**): 정보 추출 대상 문서 파일. - `schema_file` (**필수**): 원하는 출력 JSON 구조를 정의하는 `.json` 파일. - `prompt_file` (**필수**): 구조화용 기본 프롬프트 대신 사용할 `.txt` 파일. - `model` (선택): 사용할 LLM 모델 이름. ### **출력 (application/json)** - **초기 응답**: `request_id` 포함. - **최종 결과**: `GET /extract/progress/{request_id}` 조회 시, 지정된 스키마를 따르는 JSON 객체가 반환됩니다. """, ) async def extract_structured_inner( request_info: Request, input_file: UploadFile = File(...), schema_file: UploadFile = File(...), prompt_file: UploadFile = File(...), model: Optional[str] = Form(default="gemma3:27b"), api_key: str = Depends(get_api_key), ): validate_all_files(input_file) # ✅ 고유한 요청 ID 생성 request_id = create_key() result_id = create_key() cloned_input = clone_upload_file(input_file) if input_file else None cloned_schema = clone_upload_file(schema_file) if schema_file else None cloned_prompt = clone_upload_file(prompt_file) if prompt_file else None # ✅ 백그라운드에서 작업 실행 asyncio.create_task( InferenceHandler.handle_extract_background( request_id=request_id, result_id=result_id, input_file=cloned_input, schema_file=cloned_schema, prompt_file=cloned_prompt, mode="structured", model_list=[model], request_info=request_info, api_key=api_key, ) ) # ✅ request_id → result_id 매핑 저장 redis_client.hset("pipeline_result_mapping", request_id, result_id) return JSONResponse( content={ "message": "문서 추출 및 생성형 응답 작업이 백그라운드에서 실행 중입니다.", "request_id": request_id, "status_check_url": f"/extract/progress/{request_id}", } ) # ✅ structured 모드: 구조화 JSON 스키마 기반 추론 @router.post( "/outer/structured", summary="구조화된 JSON 정보 추출 (비동기)", description="""### **요약** 사용자가 제공한 `schema_file`에 정의된 JSON 스키마에 따라, 문서에서 정보를 추출하여 구조화된 JSON으로 반환합니다. ### **작동 방식** - 다른 추출 엔드포인트와 동일한 비동기 파이프라인을 따릅니다. - 추론 단계에서 시스템은 구조화된 출력을 생성하도록 특화된 프롬프트(`structured_prompt`)를 사용합니다. - LLM은 `input_file`의 내용과 `schema_file`의 구조를 바탕으로 JSON 객체를 생성합니다. ### **입력 (multipart/form-data)** - `input_file` (**필수**): 정보 추출 대상 문서 파일. - `schema_file` (**선택**): 원하는 출력 JSON 구조를 정의하는 `.json` 파일. - `prompt_file` (**선택**): 구조화용 기본 프롬프트 대신 사용할 `.txt` 파일. - `model` (선택): 사용할 LLM 모델 이름. ### **출력 (application/json)** - **초기 응답**: `request_id` 포함. - **최종 결과**: `GET /extract/progress/{request_id}` 조회 시, 지정된 스키마를 따르는 JSON 객체가 반환됩니다. """, ) async def extract_structured_outer( request_info: Request, input_file: UploadFile = File(...), schema_file: Optional[UploadFile] = File( default=None, description="⚠️ schema_file 업로드하지 않을 경우, **'Send empty value'** 체크박스를 반드시 해제해주세요.", ), prompt_file: Optional[UploadFile] = File( default=None, description="⚠️ prompt_file 업로드하지 않을 경우, **'Send empty value'** 체크박스를 반드시 해제해주세요.", ), model: Optional[str] = Form(default="gemma3:27b"), api_key: str = Depends(get_api_key), ): validate_all_files(input_file) # ✅ 고유한 요청 ID 생성 request_id = create_key() result_id = create_key() cloned_input = clone_upload_file(input_file) if input_file else None cloned_schema = clone_upload_file(schema_file) if schema_file else None cloned_prompt = clone_upload_file(prompt_file) if prompt_file else None # ✅ 백그라운드에서 작업 실행 asyncio.create_task( InferenceHandler.handle_extract_background( request_id=request_id, result_id=result_id, input_file=cloned_input, schema_file=cloned_schema, prompt_file=cloned_prompt, mode="structured", model_list=[model], request_info=request_info, api_key=api_key, ) ) # ✅ request_id → result_id 매핑 저장 redis_client.hset("pipeline_result_mapping", request_id, result_id) return JSONResponse( content={ "message": "문서 추출 및 생성형 응답 작업이 백그라운드에서 실행 중입니다.", "request_id": request_id, "status_check_url": f"/extract/progress/{request_id}", } ) # ✅ 상태 로그 조회 API @router.get( "/progress/{request_id}", summary="정보 추출 작업 상태 및 결과 조회", description="""### **요약** `POST /extract/*` 계열 엔드포인트 요청 시 반환된 `request_id`를 사용하여, 해당 정보 추출 작업의 진행 상태와 최종 결과를 조회합니다. ### **작동 방식** - `request_id`를 기반으로 Redis에 저장된 작업 로그와 결과 데이터를 조회합니다. - 작업이 진행 중일 때는 현재까지의 로그를, 완료되었을 때는 로그와 함께 최종 결과(`final_result`)를 반환합니다. ### **입력** - `request_id`: 조회할 작업의 고유 ID. ### **출력 (application/json)** - **성공 시**: ```json { "request_id": "요청 시 사용된 ID", "progress_logs": [ { "timestamp": "...", "status": "OCR 시작", "details": "..." }, { "timestamp": "...", "status": "입력 길이 검사 시작", "details": "..." }, { "timestamp": "...", "status": "LLM 추론 시작", "details": "..." }, { "timestamp": "...", "status": "LLM 추론 완료 및 후처리 시작", "details": "..." }, { "timestamp": "...", "status": "후처리 완료 및 결과 반환"", "details": "..." } ], "final_result": { "filename": "입력 파일", "processed": "LLM의 최종 응답 내용" } } ``` - **ID가 유효하지 않을 경우 (404 Not Found)**: ```json { "message": "{request_id}에 대한 상태 로그가 없습니다." } ``` """, ) async def get_pipeline_status(request_id: str): redis_key = f"pipeline_status:{request_id}" # 1. 상태 로그 조회 logs = redis_client.lrange(redis_key, 0, -1) if not logs: return JSONResponse( status_code=404, content={"message": f"{request_id}에 대한 상태 로그가 없습니다."}, ) parsed_logs = [json.loads(log) for log in logs] if logs else [] # 2. request_id → result_id 매핑 조회 result_id = redis_client.hget("pipeline_result_mapping", request_id) final_result = None if result_id: # 3. Redis에서 최종 결과 조회 result_key = f"pipeline_result:{result_id}" result_str = redis_client.get(result_key) if result_str: try: final_result = json.loads(result_str) return JSONResponse( content={ "request_id": request_id, "progress_logs": parsed_logs, "final_result": final_result, } ) except json.JSONDecodeError: final_result = { "massage": "[REDIS] 결과 존재하지만, 디코딩에 실패했습니다." } else: print(f"[REDIS] request_id {request_id} 가 Redis에 없습니다.") # 4. Redis에 결과가 없으면 MinIO에서 조회 try: print(f"[MINIO] MinIO에서 결과를 가져오는 중: {request_id}") final_result = fetch_result_from_minio(request_id) if final_result: return JSONResponse( content={ "request_id": request_id, "progress_logs": parsed_logs, "final_result": final_result, } ) else: # MinIO에서 결과가 없으면 작업 진행 상태 실시간 확인 return JSONResponse( content={ "request_id": request_id, "progress_logs": parsed_logs, "final_result": "작업이 진행 중입니다. 결과는 아직 생성되지 않았습니다.", } ) except Exception as e: print(f"[MINIO] MinIO 결과 조회 중 실패했습니다: {e}") ## 조찬영 @router.post( "/inner2/d6c", summary="국내 문서 테스트용", ) async def extract2_d6c( request_info: Request, minio_url: str = Form(...), model: Optional[str] = Form(default="qwen3:30b"), api_key: str = Depends(get_api_key), ): try: response = requests.get(minio_url) response.raise_for_status() # 4xx/5xx 응답에 대해 HTTPError 발생 except requests.exceptions.HTTPError as e: if e.response.status_code == 403: return JSONResponse( status_code=400, content={ "message": "제공된 MinIO URL이 만료되었거나 접근 권한이 없습니다." }, ) else: return JSONResponse( status_code=400, content={ "message": f"URL에서 파일을 가져오는 데 실패했습니다: {e.response.status_code} {e.response.reason}" }, ) except requests.exceptions.RequestException as e: return JSONResponse( status_code=400, content={"message": f"URL에 연결하는 중 오류가 발생했습니다: {e}"}, ) # URL에서 쿼리 파라미터를 제외한 파일 이름 추출 parsed_url = urlparse(minio_url) file_name = parsed_url.path.split("/")[-1] # 다운로드한 파일 데이터로 UploadFile 객체 생성 input_file = UploadFile(filename=file_name, file=io.BytesIO(response.content)) validate_all_files(input_file) request_id = create_key() result_id = create_key() cloned_input = clone_upload_file(input_file) if input_file else None # 설정에 정의된 기본 I18N 프롬프트 파일을 항상 사용 with open(I18N_PROMPT_PATH, "rb") as f: content = f.read() # 메모리 내 파일 객체 생성 spooled_file = io.BytesIO(content) # UploadFile과 유사한 객체를 생성하여 백그라운드 핸들러로 전달 dummy_prompt_file = UploadFile(filename=I18N_PROMPT_PATH.name, file=spooled_file) cloned_prompt = clone_upload_file(dummy_prompt_file) asyncio.create_task( InferenceHandler.handle_extract_background( request_id=request_id, result_id=result_id, input_file=cloned_input, schema_file=None, prompt_file=cloned_prompt, mode="inner", model_list=[model], request_info=request_info, api_key=api_key, ) ) redis_client.hset("pipeline_result_mapping", request_id, result_id) return JSONResponse( content={ "message": "문서 추출 및 생성형 응답 작업이 백그라운드에서 실행 중입니다.", "request_id": request_id, "status_check_url": f"/extract/progress/{request_id}", } ) @router.post( "/inner2/i18n", summary="해외 문서 테스트용", ) async def extract2_i18n( request_info: Request, minio_url: str = Form(...), model: Optional[str] = Form(default="qwen3:30b"), api_key: str = Depends(get_api_key), ): try: response = requests.get(minio_url) response.raise_for_status() # 4xx/5xx 응답에 대해 HTTPError 발생 except requests.exceptions.HTTPError as e: if e.response.status_code == 403: return JSONResponse( status_code=400, content={ "message": "제공된 MinIO URL이 만료되었거나 접근 권한이 없습니다." }, ) else: return JSONResponse( status_code=400, content={ "message": f"URL에서 파일을 가져오는 데 실패했습니다: {e.response.status_code} {e.response.reason}" }, ) except requests.exceptions.RequestException as e: return JSONResponse( status_code=400, content={"message": f"URL에 연결하는 중 오류가 발생했습니다: {e}"}, ) # URL에서 쿼리 파라미터를 제외한 파일 이름 추출 parsed_url = urlparse(minio_url) file_name = parsed_url.path.split("/")[-1] # 다운로드한 파일 데이터로 UploadFile 객체 생성 input_file = UploadFile(filename=file_name, file=io.BytesIO(response.content)) validate_all_files(input_file) request_id = create_key() result_id = create_key() cloned_input = clone_upload_file(input_file) if input_file else None # 설정에 정의된 기본 I18N 프롬프트 파일을 항상 사용 with open(D6C_PROMPT_PATH, "rb") as f: content = f.read() # 메모리 내 파일 객체 생성 spooled_file = io.BytesIO(content) # UploadFile과 유사한 객체를 생성하여 백그라운드 핸들러로 전달 dummy_prompt_file = UploadFile(filename=D6C_PROMPT_PATH.name, file=spooled_file) cloned_prompt = clone_upload_file(dummy_prompt_file) asyncio.create_task( InferenceHandler.handle_extract_background( request_id=request_id, result_id=result_id, input_file=cloned_input, schema_file=None, prompt_file=cloned_prompt, mode="inner", model_list=[model], request_info=request_info, api_key=api_key, ) ) redis_client.hset("pipeline_result_mapping", request_id, result_id) return JSONResponse( content={ "message": "문서 추출 및 생성형 응답 작업이 백그라운드에서 실행 중입니다.", "request_id": request_id, "status_check_url": f"/extract/progress/{request_id}", } ) ## 조찬영