import asyncio
import json
import logging
import os
import shutil
from pathlib import Path
from typing import Any, List, Optional

from config.setting import (
    DEFAULT_PROMPT_PATH,
    MINIO_BUCKET_NAME,
    PGN_REDIS_DB,
    PGN_REDIS_HOST,
    PGN_REDIS_PORT,
    STRUCTURED_PROMPT_PATH,
    UPLOAD_DIR,
)
from fastapi import HTTPException, UploadFile
from redis import Redis
from utils.logging_utils import log_user_request
from utils.minio_utils import save_result_to_minio, upload_file_to_minio_v2
from utils.prompt_cache import compute_file_hash, save_prompt_file_if_not_exists

from services.dummy_service import DummyService
from services.model_service import ModelInfoService
from services.pipeline_runner import PipelineRunner

# Redis client dedicated to the LLM Gateway. decode_responses=True makes
# reads return str rather than bytes.
redis_client = Redis(
    host=PGN_REDIS_HOST, port=PGN_REDIS_PORT, db=PGN_REDIS_DB, decode_responses=True
)

logger = logging.getLogger(__name__)

# Lifetime of pipeline results / error markers stored in Redis (1 hour).
_RESULT_TTL_SECONDS = 60 * 60


def _store_redis_payload(result_id: str, serialized: str) -> None:
    """Write an already-serialized payload to ``pipeline_result:{result_id}``."""
    redis_client.set(
        f"pipeline_result:{result_id}", serialized, ex=_RESULT_TTL_SECONDS
    )


def _record_failure(result_id: str, exc: Exception) -> None:
    """Log *exc* with its traceback and publish ``{"error": ...}`` to Redis.

    Must be called from inside an ``except`` block so ``logger.exception``
    can attach the active traceback.
    """
    logger.exception(f"[PIPELINE] ❌ result_id={result_id} 처리 실패: {exc}")
    _store_redis_payload(result_id, json.dumps({"error": str(exc)}))


def _parse_schema_file(schema_file: UploadFile) -> Any:
    """Parse an uploaded JSON schema file and return the decoded object.

    Reads through the underlying ``.file`` handle because copies made by
    ``clone_upload_file()`` do not support ``await .read()`` the way a real
    ``UploadFile`` does. The raw content is handed straight to ``json.loads``,
    which auto-detects UTF-8/16/32 (including a UTF-8 BOM) for bytes input.
    """
    return json.loads(schema_file.file.read())


async def _load_model_catalog():
    """Return ``(inner_models, outer_models, model_url_map)``.

    The model lists are read from the JSON body of
    ``ModelInfoService.get_model_info()``; the URL map comes from
    ``ModelInfoService.get_ollama_model_map()``.
    """
    info_response = await ModelInfoService.get_model_info()
    info = json.loads(info_response.body.decode("utf-8"))
    models = info["models"]
    inner_models = models["inner_model"]["model_list"]
    outer_models = models["outer_model"]["model_list"]
    model_url_map = await ModelInfoService.get_ollama_model_map()
    return inner_models, outer_models, model_url_map


def _upload_input(input_file: UploadFile, request_id: str, filename: str) -> str:
    """Upload *input_file* to MinIO as ``{request_id}/{filename}``; return its presigned URL."""
    presigned_url = upload_file_to_minio_v2(
        file=input_file,
        bucket_name=MINIO_BUCKET_NAME,
        object_name=f"{request_id}/{filename}",
    )
    logger.info(f"[MinIO] presigned URL 생성 완료: {presigned_url}")
    return presigned_url


def _save_full_result(request_id: str, input_filename: str, results: Any) -> None:
    """Save the full pipeline *results* to MinIO as ``{request_id}/{stem}.json``.

    ``stem`` is *input_filename* with its last extension removed.

    Raises:
        HTTPException: 500 if building the key or the MinIO save fails.
    """
    try:
        minio_key = f"{request_id}/{input_filename.rsplit('.', 1)[0]}.json"
        presigned_url = save_result_to_minio(
            result_dict=results,
            object_name=minio_key,
        )
        logger.info(f"[MinIO] 결과 MinIO 저장 완료: {presigned_url}")
    except Exception as e:
        logger.exception(f"[MinIO] 결과 저장 실패: {e}")
        # NOTE(review): if this runs as a background task there is no HTTP
        # client left to receive this status; it only surfaces in logs.
        raise HTTPException(
            status_code=500,
            detail="결과 파일 저장 중 오류가 발생했습니다.",
        ) from e


class InferenceHandler:
    """Background handlers for the "general" and "extract" inference flows.

    Both handlers run ``PipelineRunner.run_pipeline``, publish a summary (every
    key except ``"fields"``) to Redis under ``pipeline_result:{result_id}`` with
    a one-hour TTL, and save the full result as JSON to MinIO under
    ``{request_id}/{input file stem}.json``. On failure they publish
    ``{"error": "<message>"}`` to the same Redis key instead, and the MinIO
    save is skipped when no pipeline result was produced.
    """

    # /general common handler
    @staticmethod
    async def handle_general_background(
        request_id: str,
        result_id: str,
        input_file: UploadFile,
        prompt_file: UploadFile,
        mode: str,
        model: str,
        api_key: str,
        schema_file: Optional[UploadFile] = None,
        request_info: Optional[str] = None,
    ):
        """Run a single-model "general" inference with a user-supplied prompt.

        Steps: keep a local copy of *input_file* under ``UPLOAD_DIR``, cache and
        read *prompt_file*, upload the input to MinIO, and run the pipeline with
        ``prompt_mode="general"`` and ``custom_mode=True``. When *schema_file*
        is uploaded it is parsed as JSON, passed as ``schema_override``, and
        *mode* is forced to ``"structured"``.

        A missing *prompt_file* raises a 400 ``HTTPException`` internally, but it
        is caught and reported through the Redis error entry like any other
        failure.

        Raises:
            HTTPException: 500 if saving the full result to MinIO fails.
        """
        logger.info(f"[INPUT_FILE_NAME]: {input_file.filename}")
        # Stays None unless the pipeline returns, so a failure before that point
        # skips the MinIO save instead of crashing on an unbound result.
        results_minio = None
        try:
            if not prompt_file or not prompt_file.filename:
                raise HTTPException(
                    status_code=400,
                    detail="❌ 프롬프트 파일(prompt_file)은 반드시 업로드해야 합니다.",
                )

            # Keep a local copy under UPLOAD_DIR. basename() stops a
            # client-controlled name such as "../../x" from escaping UPLOAD_DIR.
            filename = input_file.filename
            file_path = os.path.join(UPLOAD_DIR, os.path.basename(filename))
            with open(file_path, "wb") as f:
                shutil.copyfileobj(input_file.file, f)
            # copyfileobj leaves the stream at EOF; rewind in case
            # upload_file_to_minio_v2 reads from the current position.
            input_file.file.seek(0)

            # Hash the prompt, cache it on disk if unseen, then read it back.
            file_hash = compute_file_hash(prompt_file)
            cached_prompt_path = save_prompt_file_if_not_exists(file_hash, prompt_file)
            with open(cached_prompt_path, encoding="utf-8") as f:
                prompt = f.read()

            # An uploaded schema becomes the override and forces structured mode.
            schema_override = None
            if schema_file and schema_file.filename:
                schema_override = _parse_schema_file(schema_file)
                mode = "structured"

            inner_models, outer_models, model_url_map = await _load_model_catalog()
            presigned_url = _upload_input(input_file, request_id, filename)

            results_minio = await PipelineRunner.run_pipeline(
                request_info=request_info,
                request_id=request_id,
                file_path=presigned_url,
                filename=filename,
                prompt=prompt,
                prompt_filename=Path(cached_prompt_path).name,
                custom_mode=True,
                mode=mode,
                model=model,
                inner_models=inner_models,
                outer_models=outer_models,
                model_url_map=model_url_map,
                api_key=api_key,
                schema_override=schema_override,
                prompt_mode="general",
            )

            # Redis gets a summary without "fields"; the full result goes to MinIO.
            results_redis = {k: v for k, v in results_minio.items() if k != "fields"}
            _store_redis_payload(
                result_id, json.dumps(results_redis, ensure_ascii=False)
            )
            logger.info(f"[REDIS] 결과 Redis 저장 완료: {result_id}")
        except Exception as e:
            _record_failure(result_id, e)

        if results_minio is None:
            return
        _save_full_result(request_id, input_file.filename, results_minio)

    @staticmethod
    async def handle_extract_background(
        request_id: str,
        result_id: str,
        input_file: UploadFile,
        schema_file: Optional[UploadFile],
        prompt_file: Optional[UploadFile],
        mode: str,
        model_list: List[str],
        api_key: str,
        request_info: Optional[str] = None,
    ):
        """Run extraction over *input_file* with every model in *model_list*.

        If *model_list* is exactly ``["dummy"]``, the request is logged
        (best-effort) and ``DummyService.extract_dummy()``'s result is returned
        without running the pipeline or touching Redis/MinIO.

        Otherwise the prompt is *prompt_file* when uploaded (``custom_mode=True``),
        else ``STRUCTURED_PROMPT_PATH`` for ``mode == "structured"`` and
        ``DEFAULT_PROMPT_PATH`` for any other mode. An uploaded *schema_file* is
        parsed as JSON and passed as ``schema_override``; unlike the general
        flow, it does not change *mode*. One pipeline per model runs
        concurrently; Redis receives the list of per-model summaries and MinIO
        the list of full results.

        Raises:
            HTTPException: 500 if saving the full results to MinIO fails.
        """
        # Dummy requests skip the pipeline. Logging is best-effort and must not
        # make the real pipeline run with model "dummy".
        if model_list == ["dummy"]:
            try:
                log_user_request(
                    request_info=request_info,
                    endpoint="dummy/extract/outer",
                    input_filename="None",
                    model="dummy",
                    prompt_filename="None",
                    context_length=0,
                    api_key=api_key,
                )
            except Exception as e:
                logger.warning(f"Failed to log 'dummy/extract/outer' request: {e}")
            return await DummyService.extract_dummy()

        # Stays None until results exist, so an early failure skips the MinIO save.
        results_minio = None
        try:
            if prompt_file and prompt_file.filename:
                # Custom prompt: hash it and reuse / populate the on-disk cache.
                file_hash = compute_file_hash(prompt_file)
                prompt_path = save_prompt_file_if_not_exists(file_hash, prompt_file)
                custom_mode = True
            else:
                # Built-in prompt selected by mode.
                prompt_path = (
                    STRUCTURED_PROMPT_PATH if mode == "structured" else DEFAULT_PROMPT_PATH
                )
                custom_mode = False
            with open(prompt_path, encoding="utf-8") as f:
                prompt = f.read()
            prompt_filename = Path(prompt_path).name

            schema_override = (
                _parse_schema_file(schema_file)
                if schema_file and schema_file.filename
                else None
            )

            inner_models, outer_models, model_url_map = await _load_model_catalog()
            presigned_url = _upload_input(input_file, request_id, input_file.filename)

            # One pipeline per requested model, run concurrently. Only models
            # listed in inner_models receive the Ollama URL map.
            result_set = await asyncio.gather(
                *[
                    PipelineRunner.run_pipeline(
                        request_info=request_info,
                        request_id=request_id,
                        file_path=presigned_url,
                        filename=input_file.filename,
                        prompt=prompt,
                        prompt_filename=prompt_filename,
                        custom_mode=custom_mode,
                        mode=mode,
                        model=model,
                        inner_models=inner_models,
                        outer_models=outer_models,
                        model_url_map=model_url_map if model in inner_models else {},
                        api_key=api_key,
                        schema_override=schema_override,
                        prompt_mode="extract",
                    )
                    for model in model_list
                ]
            )

            results_minio = list(result_set)
            # Redis gets per-model summaries without "fields".
            results_redis = [
                {k: v for k, v in result.items() if k != "fields"}
                for result in results_minio
            ]
            _store_redis_payload(
                result_id, json.dumps(results_redis, ensure_ascii=False)
            )
            logger.info(f"[REDIS] 결과 Redis 저장 완료: {result_id}")
        except Exception as e:
            _record_failure(result_id, e)

        if results_minio is None:
            return
        _save_full_result(request_id, input_file.filename, results_minio)