import json import logging import re from pathlib import Path from typing import Literal import markdown2 from workspace.config.setting import SUMMARY_HTML_DIR logger = logging.getLogger(__name__) def safe_filename(filename: str) -> str: base = Path(filename).stem base = re.sub(r"[^\w\-]", "_", base) return f"{base}.html" def post_process( file_name, text, generated_text, coord, ocr_model, llm_model, llm_url, mode, start_time, end_time, prompt_mode: Literal["general", "extract"] = "extract", ): result_dict = {} # ✅ 구조화 모드는 후처리 생략 if mode == "structured": result_dict = { "message": "✅ 구조화된 JSON 모델 출력입니다. post_process 후처리 생략됨.", "note": "generated 필드 참조 바랍니다.", } # ✅ 일반 추론 모드일 경우 elif prompt_mode == "general": html_content = markdown2.markdown(generated_text.strip()) html_filename = safe_filename(file_name) html_path = SUMMARY_HTML_DIR / html_filename html_path.write_text(html_content, encoding="utf-8") summary_url = f"http://172.16.10.176:8888/view/generated_html/{html_filename}" result_dict = { "message": "✅ 줄글로 생성된 모델 출력입니다. post_process 후처리 생략됨.", "note": "아래 url에 접속하여 markdown 형식으로 응답 확인하세요.", "summary_html": summary_url, } # ✅ 추출 기반 후처리 (extract) else: # ✅ JSON 코드블럭 형식 처리 if "```json" in generated_text: try: logger.debug("[PROCESS-JSON] JSON 코드블럭 형식 후처리 진행합니다.") json_block = re.search( r"```json\s*(\{.*?\})\s*```", generated_text, re.DOTALL ) if json_block: parsed_json = json.loads(json_block.group(1)) result_dict = { re.sub(r"[^ㄱ-ㅎ가-힣a-zA-Z]", "", k): v for k, v in parsed_json.items() } except Exception as e: logger.error("[PROCESS-ERROR] JSON 코드블럭 파싱 실패:", e) # ✅ 길이 초과 메시지 감지 및 처리 elif "입력 텍스트가" in generated_text and "모델 호출 생략" in generated_text: result_dict = { "message": "⚠️ 입력 텍스트가 너무 깁니다. LLM 모델 호출을 생략했습니다.", "note": "OCR로 추출된 원본 텍스트(parsed)를 참고해 주세요.", } else: # ✅ "1.제목:" 또는 "1. 제목:" 형식 처리 logger.debug("[PROCESS-STRING] JSON 코드블럭 형식이 아닙니다.") blocks = re.split(r"\n(?=\d+\.\s*[^:\n]+:)", generated_text.strip()) for block in blocks: if ":" in block: key_line, *rest = block.split(":", 1) key = re.sub(r"^\d+\.\s*", "", key_line).strip() cleaned_key = re.sub(r"[^ㄱ-ㅎ가-힣a-zA-Z]", "", key) value = rest[0].strip() if rest else "" value = re.sub(r"^[^\w가-힣a-zA-Z]+", "", value).strip() result_dict[cleaned_key] = value json_data = { "filename": file_name, f"{mode}_model": { "ocr_model": ocr_model, "llm_model": llm_model, "api_url": llm_url, }, "time": { "duration_sec": f"{end_time - start_time:.2f}", "started_at": start_time, "ended_at": end_time, }, "fields": coord, "parsed": text, "generated": generated_text, "processed": result_dict, } # final_result logger.info(json.dumps(json_data["processed"], indent=2, ensure_ascii=False)) return json_data def ocr_process(filename, ocr_model, coord, text, start_time, end_time): json_data = { "filename": filename, "model": {"ocr_model": ocr_model}, "time": { "duration_sec": f"{end_time - start_time:.2f}", "started_at": start_time, "ended_at": end_time, }, "fields": coord, "parsed": text, } return json_data