diff --git a/batch_cli.py b/batch_cli.py new file mode 100644 index 0000000..1129b9a --- /dev/null +++ b/batch_cli.py @@ -0,0 +1,216 @@ +""" +배치 처리 명령줄 인터페이스 +WPF 애플리케이션에서 호출 가능한 간단한 배치 처리 도구 + +Usage: + python batch_cli.py --files "file1.pdf,file2.dxf" --schema "한국도로공사" --concurrent 3 --batch-mode true --save-intermediate false --include-errors true --output "results.csv" +""" + +import argparse +import asyncio +import logging +import os +import sys +import time +from datetime import datetime +from typing import List + +# 프로젝트 모듈 임포트 +from config import Config +from multi_file_processor import MultiFileProcessor, BatchProcessingConfig, generate_default_csv_filename + +# 로깅 설정 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +class BatchCLI: + """배치 처리 명령줄 인터페이스 클래스""" + + def __init__(self): + self.processor = None + self.start_time = None + + def setup_processor(self) -> bool: + """다중 파일 처리기 설정""" + try: + # 설정 검증 + config_errors = Config.validate_config() + if config_errors: + for error in config_errors: + print(f"ERROR: {error}") + return False + + # Gemini API 키 확인 + gemini_api_key = Config.get_gemini_api_key() + if not gemini_api_key: + print("ERROR: Gemini API 키가 설정되지 않았습니다") + return False + + # 처리기 초기화 + self.processor = MultiFileProcessor(gemini_api_key) + print("START: 배치 처리기 초기화 완료") + return True + + except Exception as e: + print(f"ERROR: 처리기 초기화 실패: {e}") + return False + + def parse_file_paths(self, files_arg: str) -> List[str]: + """파일 경로 문자열을 리스트로 파싱""" + if not files_arg: + return [] + + # 쉼표로 구분된 파일 경로들을 분리 + file_paths = [path.strip().strip('"\'') for path in files_arg.split(',')] + + # 파일 존재 여부 확인 + valid_paths = [] + for path in file_paths: + if os.path.exists(path): + valid_paths.append(path) + print(f"START: 파일 확인: {os.path.basename(path)}") + else: + print(f"ERROR: 파일을 찾을 수 없습니다: {path}") + + return valid_paths + + def parse_file_list_from_file(self, file_list_path: str) -> List[str]: + """파일 리스트 파일에서 파일 경로들을 읽어옴""" + if not file_list_path or not os.path.exists(file_list_path): + return [] + + valid_paths = [] + try: + with open(file_list_path, 'r', encoding='utf-8') as f: + for line in f: + path = line.strip().strip('"\'') + if path and os.path.exists(path): + valid_paths.append(path) + print(f"START: 파일 확인: {os.path.basename(path)}") + elif path: + print(f"ERROR: 파일을 찾을 수 없음: {path}") + + print(f"START: 총 {len(valid_paths)}개 파일 로드됨") + return valid_paths + + except Exception as e: + print(f"ERROR: 파일 리스트 읽기 실패: {e}") + return [] + + def create_batch_config(self, args) -> BatchProcessingConfig: + """명령줄 인수에서 배치 설정 생성""" + config = BatchProcessingConfig( + organization_type=args.schema, + enable_gemini_batch_mode=args.batch_mode, + max_concurrent_files=args.concurrent, + save_intermediate_results=args.save_intermediate, + output_csv_path=args.output, + include_error_files=args.include_errors + ) + return config + + def progress_callback(self, current: int, total: int, status: str): + """진행률 콜백 함수 - WPF가 기대하는 형식으로 출력""" + # WPF가 파싱할 수 있는 간단한 형식으로 출력 + print(f"PROGRESS: {current}/{total}") + print(f"COMPLETED: {status}") + + async def run_batch_processing(self, file_paths: List[str], config: BatchProcessingConfig) -> bool: + """배치 처리 실행""" + try: + self.start_time = time.time() + total_files = len(file_paths) + + print(f"START: 배치 처리 시작: {total_files}개 파일") + + # 처리 실행 + results = await self.processor.process_multiple_files( + file_paths, config, self.progress_callback + ) + + # 처리 완료 + end_time = time.time() + total_time = end_time - self.start_time + + # 요약 정보 + summary = self.processor.get_processing_summary() + + print(f"COMPLETED: 배치 처리 완료!") + print(f"COMPLETED: 총 처리 시간: {total_time:.1f}초") + print(f"COMPLETED: 성공: {summary['success_files']}개, 실패: {summary['failed_files']}개") + print(f"COMPLETED: CSV 결과 저장: {config.output_csv_path}") + print(f"COMPLETED: JSON 결과 저장: {config.output_csv_path.replace('.csv', '.json')}") + + return True + + except Exception as e: + print(f"ERROR: 배치 처리 중 오류: {e}") + return False + + +def str_to_bool(value: str) -> bool: + """문자열을 boolean으로 변환""" + return value.lower() in ["true", "1", "yes", "on"] + + +async def main(): + """메인 함수""" + parser = argparse.ArgumentParser(description="PDF/DXF 파일 배치 처리 도구") + + # 파일 입력 방식 (둘 중 하나 필수) + input_group = parser.add_mutually_exclusive_group(required=True) + input_group.add_argument("--files", "-f", help="처리할 파일 경로들 (쉼표로 구분)") + input_group.add_argument("--file-list", "-fl", help="처리할 파일 경로가 담긴 텍스트 파일") + + # 선택적 인수들 + parser.add_argument("--schema", "-s", default="한국도로공사", help="분석 스키마") + parser.add_argument("--concurrent", "-c", type=int, default=3, help="동시 처리할 파일 수") + parser.add_argument("--batch-mode", "-b", default="false", help="배치 모드 사용 여부") + parser.add_argument("--save-intermediate", "-i", default="true", help="중간 결과 저장 여부") + parser.add_argument("--include-errors", "-e", default="true", help="오류 파일 포함 여부") + parser.add_argument("--output", "-o", help="출력 CSV 파일 경로 (JSON 파일도 함께 생성됨)") + + args = parser.parse_args() + + # CLI 인스턴스 생성 + cli = BatchCLI() + + # 처리기 설정 + if not cli.setup_processor(): + sys.exit(1) + + # 파일 경로 파싱 + if args.files: + input_files = cli.parse_file_paths(args.files) + else: + input_files = cli.parse_file_list_from_file(args.file_list) + + if not input_files: + print("ERROR: 처리할 파일이 없습니다.") + sys.exit(1) + + # boolean 변환 + args.batch_mode = str_to_bool(args.batch_mode) + args.save_intermediate = str_to_bool(args.save_intermediate) + args.include_errors = str_to_bool(args.include_errors) + + # 배치 설정 생성 + config = cli.create_batch_config(args) + + # 배치 처리 실행 + success = await cli.run_batch_processing(input_files, config) + + if success: + print("SUCCESS: 배치 처리가 성공적으로 완료되었습니다.") + sys.exit(0) + else: + print("ERROR: 배치 처리 중 오류가 발생했습니다.") + sys.exit(1) + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/gemini_analyzer.py b/gemini_analyzer.py index 5d8b9b0..d25ae4a 100644 --- a/gemini_analyzer.py +++ b/gemini_analyzer.py @@ -29,33 +29,65 @@ ValueWithCoords = types.Schema( required=["value", "x", "y"] ) +# 범용 표의 한 행을 위한 스키마 +GenericTableRow = types.Schema( + type=types.Type.ARRAY, + items=ValueWithCoords, + description="표의 한 행. 각 셀의 값과 좌표를 포함합니다." +) + +# 페이지에서 추출된 범용 표를 위한 스키마 +GenericTable = types.Schema( + type=types.Type.OBJECT, + properties={ + "table_title": types.Schema(type=types.Type.STRING, description="추출된 표의 내용을 설명하는 제목 (예: '범례', 'IP 정보', '개정 이력')."), + "table_data": types.Schema( + type=types.Type.ARRAY, + items=GenericTableRow, + description="표의 데이터. 각 내부 리스트가 하나의 행을 나타냅니다." + ) + }, + description="도면에서 발견된 구조화된 정보 블록이나 표." +) + + # 모든 필드가 ValueWithCoords를 사용하도록 스키마 업데이트 SCHEMA_EXPRESSWAY = types.Schema( type=types.Type.OBJECT, properties={ - "도면명": ValueWithCoords, + "도면명_line0": ValueWithCoords, + "도면명_line1": ValueWithCoords, + "도면명_line2": ValueWithCoords, "편철번호": ValueWithCoords, "도면번호": ValueWithCoords, - "Main Title": ValueWithCoords, - "Sub Title": ValueWithCoords, - "수평축척": ValueWithCoords, - "수직축척": ValueWithCoords, + "Main_Title": ValueWithCoords, + "Sub_Title": ValueWithCoords, + "수평_도면_축척": ValueWithCoords, + "수직_도면_축척": ValueWithCoords, "적용표준버전": ValueWithCoords, - "사업명": ValueWithCoords, + "사업명_top": ValueWithCoords, + "사업명_bot": ValueWithCoords, "시설_공구": ValueWithCoords, - "설계공구_Station": ValueWithCoords, - "시공공구_Station": ValueWithCoords, + "설계공구_공구명": ValueWithCoords, + "설계공구_범위": ValueWithCoords, + "시공공구_공구명": ValueWithCoords, + "시공공구_범위": ValueWithCoords, "건설분야": ValueWithCoords, "건설단계": ValueWithCoords, "설계사": ValueWithCoords, "시공사": ValueWithCoords, "노선이정": ValueWithCoords, - "계정번호": ValueWithCoords, - "계정날짜": ValueWithCoords, - "계정내용": ValueWithCoords, - "작성자": ValueWithCoords, - "검토자": ValueWithCoords, - "확인자": ValueWithCoords + "개정번호_1": ValueWithCoords, + "개정날짜_1": ValueWithCoords, + "개정내용_1": ValueWithCoords, + "작성자_1": ValueWithCoords, + "검토자_1": ValueWithCoords, + "확인자_1": ValueWithCoords, + "additional_tables": types.Schema( + type=types.Type.ARRAY, + items=GenericTable, + description="도면에서 발견된 추가적인 표나 정보 블록 목록." + ) }, ) @@ -74,12 +106,17 @@ SCHEMA_TRANSPORTATION = types.Schema( "시설_공구": ValueWithCoords, "건설분야": ValueWithCoords, "건설단계": ValueWithCoords, - "계정차수": ValueWithCoords, - "계정일자": ValueWithCoords, + "개정차수": ValueWithCoords, + "개정일자": ValueWithCoords, "과업책임자": ValueWithCoords, "분야별책임자": ValueWithCoords, "설계자": ValueWithCoords, - "위치정보": ValueWithCoords + "위치정보": ValueWithCoords, + "additional_tables": types.Schema( + type=types.Type.ARRAY, + items=GenericTable, + description="도면에서 발견된 추가적인 표나 정보 블록 목록." + ) }, ) @@ -139,9 +176,17 @@ class GeminiAnalyzer: "\n\n--- 추출된 텍스트와 좌표 정보 ---\n" + text_context + "\n\n--- 지시사항 ---\n" - "위 텍스트와 좌표 정보를 바탕으로, 이미지의 내용을 분석하여 JSON 스키마를 채워주세요." - "각 필드에 해당하는 텍스트를 찾고, 해당 텍스트의 'value'와 시작 'x', 'y' 좌표를 JSON에 기입하세요." - "해당하는 값이 없으면 빈 문자열을 사용하세요." + "1. 위 텍스트와 좌표 정보를 바탕으로, 이미지의 내용을 분석하여 JSON 스키마의 기본 필드를 채워주세요.\n" + "2. **(중요)** 도면 내에 표나 사각형으로 구분된 정보 블록이 있다면, `additional_tables` 필드에 추가해주세요. 예를 들어 'IP 정보 표', '범례(Legend)', '구조물 설명', '개정 이력' 등이 해당됩니다.\n" + " - 각 표/블록에 대해 `table_title`에 적절한 제목을 붙여주세요.\n" + " - `table_data`에는 표의 모든 행을 추출하여 리스트 형태로 넣어주세요. 각 행은 셀들의 리스트입니다.\n" + "3. 각 필드에 해당하는 텍스트를 찾고, 해당 텍스트의 'value'와 시작 'x', 'y' 좌표를 JSON에 기입하세요.\n" + "4. 해당하는 값이 없으면 빈 문자열이나 빈 리스트를 사용하세요.\n" + "\n--- 필드 설명 ---\n" + "- `{ }_Title`: 중앙 상단의 비교적 큰 폰트입니다.\n" + "- `사업명_top`에 해당하는 텍스트 아랫줄은 '시설_공구' 항목입니다.\n" + "- `도면명_line{n}`: 도면명에 해당하는 여러 줄의 텍스트를 위에서부터 0, 1, 2 순서로 기입합니다. 만약 두 줄이라면 line0은 비워두고 line1, line2를 채웁니다.\n" + "- `설계공구`/`시공공구`: '공구명'과 '범위'로 나뉘어 기입될 수 있습니다. (예: '설계공구 | 제2-1공구 | 12780.00-15860.00' -> `설계공구_공구명`: '제2-1공구', `설계공구_범위`: '12780.00-15860.00')\n" ) contents = [ @@ -178,6 +223,49 @@ class GeminiAnalyzer: result = response.text # JSON 응답을 파싱하여 다시 직렬화 (일관된 포맷팅) parsed_json = json.loads(result) + + # 디버깅: Gemini 응답 내용 로깅 + logger.info(f"=== Gemini 응답 디버깅 ===") + logger.info(f"조직 유형: {organization_type}") + logger.info(f"응답 필드 수: {len(parsed_json) if isinstance(parsed_json, dict) else 'N/A'}") + + if isinstance(parsed_json, dict): + # 새로운 필드들이 응답에 포함되었는지 확인 + new_fields = ["설계공구_Station_col1", "설계공구_Station_col2", "시공공구_Station_col1", "시공공구_Station_col2"] + old_fields = ["설계공구_Station", "시공공구_Station"] + + logger.info("=== 새 필드 확인 ===") + for field in new_fields: + if field in parsed_json: + field_data = parsed_json[field] + if isinstance(field_data, dict) and field_data.get('value'): + logger.info(f"✅ {field}: '{field_data.get('value')}' at ({field_data.get('x', 'N/A')}, {field_data.get('y', 'N/A')})") + else: + logger.info(f"⚠️ {field}: 빈 값 또는 잘못된 형식 - {field_data}") + else: + logger.info(f"❌ {field}: 응답에 없음") + + logger.info("=== 기존 필드 확인 ===") + for field in old_fields: + if field in parsed_json: + field_data = parsed_json[field] + if isinstance(field_data, dict) and field_data.get('value'): + logger.info(f"⚠️ {field}: '{field_data.get('value')}' (기존 필드가 여전히 존재)") + else: + logger.info(f"⚠️ {field}: 빈 값 - {field_data}") + else: + logger.info(f"✅ {field}: 응답에 없음 (예상됨)") + + logger.info("=== 전체 응답 필드 목록 ===") + for key in parsed_json.keys(): + value = parsed_json[key] + if isinstance(value, dict) and 'value' in value: + logger.info(f"필드: {key} = '{value.get('value', '')}' at ({value.get('x', 'N/A')}, {value.get('y', 'N/A')})") + else: + logger.info(f"필드: {key} = {type(value).__name__}") + + logger.info("=== 디버깅 끝 ===") + pretty_result = json.dumps(parsed_json, ensure_ascii=False, indent=2) logger.info(f"분석 완료: {len(pretty_result)} 문자") return pretty_result diff --git a/mapping_table_json.json b/mapping_table_json.json index 19fa197..1db153d 100644 --- a/mapping_table_json.json +++ b/mapping_table_json.json @@ -1,77 +1,89 @@ { "mapping_table": { "ailabel_to_systems": { - "도면명": { + "도면명_line0": { + "molit": "", + "expressway": "TD_DNAME_TOP", + "railway": "", + "docaikey": "DNAME_TOP" + }, + "도면명_line1": { "molit": "DI_TITLE", - "expressway": "TD_DNAME_MAIN", + "expressway": "TD_DNAME_MAIN", "railway": "TD_DNAME_MAIN", "docaikey": "DNAME_MAIN" }, - "편철번호": { + "도면명_line2": { "molit": "DI_SUBTITLE", "expressway": "TD_DNAME_BOT", - "railway": "TD_DNAME_BOT", + "railway": "TD_DNAME_BOT", "docaikey": "DNAME_BOT" }, - "도면번호": { + "편철번호": { "molit": "DA_PAGENO", "expressway": "TD_DWGNO", "railway": "TD_DWGNO", "docaikey": "DWGNO" }, - "Main Title": { + "도면번호": { "molit": "DI_DRWNO", "expressway": "TD_DWGCODE", "railway": "TD_DWGCODE", "docaikey": "DWGCODE" }, - "Sub Title": { + "Main_Title": { "molit": "UD_TITLE", "expressway": "TB_MTITIL", "railway": "TB_MTITIL", "docaikey": "MTITIL" }, - "수평축척": { + "Sub_Title": { "molit": "UD_SUBTITLE", "expressway": "TB_STITL", "railway": "TB_STITL", "docaikey": "STITL" }, - "수직축척": { - "molit": "", - "expressway": "TD_DWGCODE_PREV", - "railway": "", - "docaikey": "DWGCODE_PREV" - }, - "도면축척": { + "수평_도면_축척": { "molit": "DA_HSCALE", "expressway": "TD_HSCAL", "railway": "", "docaikey": "HSCAL" }, - "적용표준버전": { - "molit": "DA_STDNAME", - "expressway": "STDNAME", + "수직_도면_축척": { + "molit": "DA_VSCALE", + "expressway": "TD_VSCAL", "railway": "", - "docaikey": "" + "docaikey": "VSCAL" }, - "사업명": { + "적용표준버전": { "molit": "DA_STDVER", "expressway": "TD_VERSION", "railway": "TD_VERSION", "docaikey": "VERSION" }, - "시설_공구": { + "사업명_top": { "molit": "PI_CNAME", "expressway": "TB_CNAME", "railway": "", "docaikey": "TBCNAME" }, - "설계공구_Station": { + "시설_공구": { "molit": "UD_CDNAME", "expressway": "TB_CSCOP", "railway": "", "docaikey": "CSCOP" + }, + "설계공구_공구명": { + "molit": "", + "expressway": "TD_DSECT", + "railway": "", + "docaikey": "DSECT" + }, + "시공공구_공구명": { + "molit": "", + "expressway": "TD_CSECT", + "railway": "", + "docaikey": "CSECT" }, "건설분야": { "molit": "PA_CCLASS", @@ -86,54 +98,66 @@ "docaikey": "CSTEP" }, "설계사": { - "molit": "TD_DCOMP", + "molit": "", "expressway": "TD_DCOMP", - "railway": "", + "railway": "TD_DCOMP", "docaikey": "DCOMP" }, "시공사": { - "molit": "TD_CCOMP", + "molit": "", "expressway": "TD_CCOMP", - "railway": "", + "railway": "TD_CCOMP", "docaikey": "CCOMP" }, "노선이정": { - "molit": "TD_LNDST", - "expressway": "", + "molit": "", + "expressway": "TD_LNDST", "railway": "", "docaikey": "LNDST" }, - "계정번호": { + "설계공구_범위": { + "molit": "", + "expressway": "TD_DDIST", + "railway": "", + "docaikey": "DDIST" + }, + "시공공구_범위": { + "molit": "", + "expressway": "TD_CDIST", + "railway": "", + "docaikey": "CDIST" + }, + "개정번호_1": { "molit": "DC_RNUM1", "expressway": "TR_RNUM1", "railway": "TR_RNUM1", "docaikey": "RNUM1" }, - "계정날짜": { + "개정날짜_1": { "molit": "DC_RDATE1", "expressway": "TR_RDAT1", "railway": "TR_RDAT1", "docaikey": "RDAT1" }, - "개정내용": { + "개정내용_1": { "molit": "DC_RDES1", "expressway": "TR_RCON1", "railway": "TR_RCON1", "docaikey": "RCON1" }, - "작성자": { + "작성자_1": { "molit": "DC_RDGN1", "expressway": "TR_DGN1", "railway": "TR_DGN1", "docaikey": "DGN1" }, - "검토자": { + "검토자_1": { "molit": "DC_RCHK1", "expressway": "TR_CHK1", "railway": "TR_CHK1", "docaikey": "CHK1" }, - "확인자": { + "확인자_1": { "molit": "DC_RAPP1", "expressway": "TR_APP1", "railway": "TR_APP1", @@ -143,7 +167,7 @@ "system_mappings": { "expressway_to_transportation": { "TD_DNAME_MAIN": "DNAME_MAIN", - "TD_DNAME_BOT": "DNAME_BOT", + "TD_DNAME_BOT": "DNAME_BOT", "TD_DWGNO": "DWGNO", "TD_DWGCODE": "DWGCODE", "TB_MTITIL": "MTITIL", diff --git a/merge_excel_files.py b/merge_excel_files.py new file mode 100644 index 0000000..8c12edd --- /dev/null +++ b/merge_excel_files.py @@ -0,0 +1,47 @@ +import pandas as pd +import os +import glob +import sys + +def merge_excel_files(input_directory, output_filename="merged_output.xlsx"): + """ + Merges all .xlsx files in the specified directory into a single Excel file, + with each original file becoming a sheet named after the original filename. + + Args: + input_directory (str): The path to the directory containing the Excel files. + output_filename (str): The name of the output merged Excel file. + """ + excel_files = glob.glob(os.path.join(input_directory, "*.xlsx")) + + if not excel_files: + print(f"No .xlsx files found in '{input_directory}'.") + return + + print(f"Found {len(excel_files)} Excel files to merge:") + for f in excel_files: + print(f"- {os.path.basename(f)}") + + output_path = os.path.join(input_directory, output_filename) + + with pd.ExcelWriter(output_path, engine='openpyxl') as writer: + for excel_file in excel_files: + try: + df = pd.read_excel(excel_file) + sheet_name = os.path.splitext(os.path.basename(excel_file))[0] + df.to_excel(writer, sheet_name=sheet_name, index=False) + print(f"Successfully added '{os.path.basename(excel_file)}' as sheet '{sheet_name}'.") + except Exception as e: + print(f"Error processing '{os.path.basename(excel_file)}': {e}") + + print(f"All Excel files merged into '{output_path}'.") + +if __name__ == "__main__": + if len(sys.argv) > 1: + input_dir = sys.argv[1] + else: + input_dir = os.getcwd() + + directory_name = os.path.basename(input_dir) + output_filename = f"{directory_name}.xlsx" + merge_excel_files(input_dir, output_filename=output_filename) diff --git a/multi_file_processor.py b/multi_file_processor.py index 3d0e7f6..cb9c831 100644 --- a/multi_file_processor.py +++ b/multi_file_processor.py @@ -52,7 +52,7 @@ class FileProcessingResult: @dataclass class BatchProcessingConfig: """배치 처리 설정""" - organization_type: str = "국토교통부" + organization_type: str = "한국도로공사" enable_gemini_batch_mode: bool = False max_concurrent_files: int = 3 save_intermediate_results: bool = True @@ -137,6 +137,10 @@ class MultiFileProcessor: # CSV 저장 if config.output_csv_path: await self.save_results_to_csv(config.output_csv_path) + + # JSON 출력도 함께 생성 (좌표 정보 포함) + json_output_path = config.output_csv_path.replace('.csv', '.json') + await self.save_results_to_json(json_output_path) return self.processing_results @@ -351,96 +355,179 @@ class MultiFileProcessor: async def save_results_to_csv(self, output_path: str) -> None: """ - 처리 결과를 CSV 파일로 저장 + 처리 결과를 CSV 파일로 저장합니다. (파일당 한 줄, 추가 테이블은 JSON으로 저장) Args: output_path: 출력 CSV 파일 경로 """ try: - # 결과를 DataFrame으로 변환 data_rows = [] - + + # 모든 결과를 순회하며 한 줄 데이터로 가공 for result in self.processing_results: - # 기본 정보 row = { 'file_name': result.file_name, 'file_path': result.file_path, 'file_type': result.file_type, - 'file_size_bytes': result.file_size, 'file_size_mb': round(result.file_size / (1024 * 1024), 2), 'processing_time_seconds': result.processing_time, 'success': result.success, 'error_message': result.error_message or '', 'processed_at': result.processed_at } + + if result.file_type.lower() == 'pdf' and result.success and result.pdf_analysis_result: + try: + pdf_data = json.loads(result.pdf_analysis_result) + + # 'additional_tables'를 추출하여 별도 처리 + additional_tables = pdf_data.pop('additional_tables', []) + if additional_tables: + # JSON 문자열로 변환하여 한 셀에 저장 (한글 유지, 좌표 제외) + tables_for_csv = [ + { + "table_title": table.get("table_title"), + "table_data": [ + [cell.get("value") for cell in table_row] + for table_row in table.get("table_data", []) + ] + } for table in additional_tables + ] + row['additional_tables'] = json.dumps(tables_for_csv, ensure_ascii=False) + else: + row['additional_tables'] = '' + + # 나머지 기본 정보들을 row에 추가 (좌표 제외) + for key, value in pdf_data.items(): + if isinstance(value, dict): + row[key] = value.get('value', '') + else: + row[key] = value + + except (json.JSONDecodeError, TypeError) as e: + logger.warning(f"PDF 결과 파싱 오류 ({result.file_name}): {e}") + row['error_message'] = f"JSON 파싱 실패: {e}" - # PDF 분석 결과 - if result.file_type.lower() == 'pdf': - row['pdf_analysis_result'] = result.pdf_analysis_result or '' - row['dxf_total_attributes'] = '' - row['dxf_total_text_entities'] = '' - row['dxf_title_blocks_summary'] = '' - - # DXF 분석 결과 elif result.file_type.lower() == 'dxf': - row['pdf_analysis_result'] = '' row['dxf_total_attributes'] = result.dxf_total_attributes or 0 row['dxf_total_text_entities'] = result.dxf_total_text_entities or 0 - - # 타이틀 블록 요약 - if result.dxf_title_blocks: - summary = f"{len(result.dxf_title_blocks)}개 타이틀블록" - for tb in result.dxf_title_blocks[:3]: # 처음 3개만 표시 - summary += f" | {tb['block_name']}({tb['attributes_count']}속성)" - if len(result.dxf_title_blocks) > 3: - summary += f" | ...외 {len(result.dxf_title_blocks)-3}개" - row['dxf_title_blocks_summary'] = summary - else: - row['dxf_title_blocks_summary'] = '타이틀블록 없음' data_rows.append(row) - - # DataFrame 생성 및 CSV 저장 + + if not data_rows: + logger.warning("CSV로 저장할 데이터가 없습니다.") + return + df = pd.DataFrame(data_rows) - # pdf_analysis_result 컬럼 평탄화 - if 'pdf_analysis_result' in df.columns: - # JSON 문자열을 딕셔너리로 변환 (이미 딕셔너리인 경우도 처리) - df['pdf_analysis_result'] = df['pdf_analysis_result'].apply(lambda x: json.loads(x) if isinstance(x, str) and x.strip() else {}).fillna({}) - - # 평탄화된 데이터를 새로운 DataFrame으로 생성 - # errors='ignore'를 사용하여 JSON이 아닌 값은 무시 - # record_prefix를 사용하여 컬럼 이름에 접두사 추가 - pdf_analysis_df = pd.json_normalize(df['pdf_analysis_result'], errors='ignore', record_prefix='pdf_analysis_result_') - - # 원본 df에서 pdf_analysis_result 컬럼 제거 - df = df.drop(columns=['pdf_analysis_result']) - - # 원본 df와 평탄화된 DataFrame을 병합 - df = pd.concat([df, pdf_analysis_df], axis=1) - - # 컬럼 순서 정렬을 위한 기본 순서 정의 - column_order = [ - 'file_name', 'file_type', 'file_size_mb', 'processing_time_seconds', - 'success', 'error_message', 'processed_at', 'file_path', 'file_size_bytes', - 'dxf_total_attributes', 'dxf_total_text_entities', 'dxf_title_blocks_summary' + # --- 컬럼 순서 정의 및 재정렬 --- + base_columns = [ + 'file_name', 'file_path', 'file_type', 'file_size_mb', + 'processing_time_seconds', 'success', 'error_message', 'processed_at' ] + pdf_metadata_columns = [ + "도면명", "도면명_line0", "도면명_line1", "도면명_line2", "편철번호", "도면번호", + "Main_Title", "Sub_Title", "Main Title", "Sub Title", "사업명", "사업명_top", "사업명_bot", + "시설_공구", "수평_도면_축척", "수직_도면_축척", "수평축척", "수직축척", "적용표준버전", "적용표준", + "건설분야", "건설단계", "설계공구_공구명", "설계공구_범위", "시공공구_공구명", "시공공구_범위", + "설계사", "시공사", "노선이정", "개정번호_1", "개정날짜_1", "개정내용_1", "작성자_1", "검토자_1", "확인자_1", + "개정차수", "개정일자", "과업책임자", "분야별책임자", "설계자", "위치정보" + ] + # 새로 추가된 컬럼은 맨 뒤로 + new_columns = ['additional_tables'] + dxf_columns = ['dxf_total_attributes', 'dxf_total_text_entities'] - # 기존 컬럼 순서를 유지하면서 새로운 컬럼을 추가 - existing_columns = [col for col in column_order if col in df.columns] - new_columns = [col for col in df.columns if col not in existing_columns] - df = df[existing_columns + sorted(new_columns)] + potential_columns = base_columns + pdf_metadata_columns + dxf_columns + new_columns - # UTF-8 BOM으로 저장 (한글 호환성) + final_ordered_columns = [col for col in potential_columns if col in df.columns] + remaining_columns = sorted([col for col in df.columns if col not in final_ordered_columns]) + + df = df[final_ordered_columns + remaining_columns] + + # UTF-8 BOM으로 저장 df.to_csv(output_path, index=False, encoding='utf-8-sig') logger.info(f"CSV 저장 완료: {output_path}") - logger.info(f"총 {len(data_rows)}개 파일 결과 저장") + logger.info(f"총 {len(df)}개 행 결과 저장") except Exception as e: logger.error(f"CSV 저장 오류: {str(e)}") raise + async def save_results_to_json(self, output_path: str) -> None: + """ + 처리 결과를 JSON 파일로 저장 (좌표 정보 포함) + + Args: + output_path: 출력 JSON 파일 경로 + """ + try: + # 결과를 JSON 구조로 변환 + json_data = { + "metadata": { + "total_files": len(self.processing_results), + "success_files": sum(1 for r in self.processing_results if r.success), + "failed_files": sum(1 for r in self.processing_results if not r.success), + "generated_at": datetime.now().isoformat(), + "format_version": "1.0" + }, + "results": [] + } + + for result in self.processing_results: + # 기본 정보 + result_data = { + "file_info": { + "name": result.file_name, + "path": result.file_path, + "type": result.file_type, + "size_bytes": result.file_size, + "size_mb": round(result.file_size / (1024 * 1024), 2) + }, + "processing_info": { + "success": result.success, + "processing_time_seconds": result.processing_time, + "processed_at": result.processed_at, + "error_message": result.error_message + } + } + + # PDF 분석 결과 (좌표 정보 포함) + if result.file_type.lower() == 'pdf' and result.pdf_analysis_result: + try: + # JSON 문자열을 딕셔너리로 변환 (이미 딕셔너리인 경우도 처리) + if isinstance(result.pdf_analysis_result, str): + analysis_data = json.loads(result.pdf_analysis_result) + else: + analysis_data = result.pdf_analysis_result + + result_data["pdf_analysis"] = analysis_data + + except (json.JSONDecodeError, TypeError) as e: + logger.warning(f"PDF 분석 결과 JSON 파싱 오류: {e}") + result_data["pdf_analysis"] = {"error": "JSON 파싱 실패", "raw_data": str(result.pdf_analysis_result)} + + # DXF 분석 결과 + elif result.file_type.lower() == 'dxf': + result_data["dxf_analysis"] = { + "total_attributes": result.dxf_total_attributes or 0, + "total_text_entities": result.dxf_total_text_entities or 0, + "title_blocks": result.dxf_title_blocks or [] + } + + json_data["results"].append(result_data) + + # JSON 파일 저장 (예쁜 포맷팅과 한글 지원) + with open(output_path, 'w', encoding='utf-8') as f: + json.dump(json_data, f, ensure_ascii=False, indent=2, default=str) + + logger.info(f"JSON 저장 완료: {output_path}") + logger.info(f"총 {len(json_data['results'])}개 파일 결과 저장 (좌표 정보 포함)") + + except Exception as e: + logger.error(f"JSON 저장 오류: {str(e)}") + raise + def get_processing_summary(self) -> Dict[str, Any]: """처리 결과 요약 정보 반환""" if not self.processing_results: @@ -484,7 +571,7 @@ if __name__ == "__main__": processor = MultiFileProcessor("your-gemini-api-key") config = BatchProcessingConfig( - organization_type="국토교통부", + organization_type="한국도로공사", max_concurrent_files=2, output_csv_path="test_results.csv" ) diff --git a/requirements-cli.txt b/requirements-cli.txt new file mode 100644 index 0000000..63a8757 --- /dev/null +++ b/requirements-cli.txt @@ -0,0 +1,9 @@ +# Essential packages for CLI batch processing only +PyMuPDF>=1.26.3 +google-genai>=1.0.0 +Pillow>=10.0.0 +ezdxf>=1.4.2 +numpy>=1.24.0 +python-dotenv>=1.0.0 +pandas>=2.0.0 +requests>=2.31.0 \ No newline at end of file