""" 다중 파일 처리 모듈 여러 PDF/DXF 파일을 배치로 처리하고 결과를 CSV로 저장하는 기능을 제공합니다. Author: Claude Assistant Created: 2025-07-14 Version: 1.0.0 """ import asyncio import os import pandas as pd from datetime import datetime from typing import List, Dict, Any, Optional, Callable from dataclasses import dataclass import logging from pdf_processor import PDFProcessor from dxf_processor import EnhancedDXFProcessor from gemini_analyzer import GeminiAnalyzer from csv_exporter import TitleBlockCSVExporter import json # Added import # 로깅 설정 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class FileProcessingResult: """단일 파일 처리 결과""" file_path: str file_name: str file_type: str file_size: int processing_time: float success: bool error_message: Optional[str] = None # PDF 분석 결과 pdf_analysis_result: Optional[str] = None # DXF 분석 결과 dxf_title_blocks: Optional[List[Dict]] = None dxf_total_attributes: Optional[int] = None dxf_total_text_entities: Optional[int] = None # 공통 메타데이터 processed_at: Optional[str] = None @dataclass class BatchProcessingConfig: """배치 처리 설정""" organization_type: str = "한국도로공사" enable_gemini_batch_mode: bool = False max_concurrent_files: int = 3 save_intermediate_results: bool = True output_csv_path: Optional[str] = None include_error_files: bool = True class MultiFileProcessor: """다중 파일 처리기""" def __init__(self, gemini_api_key: str): """ 다중 파일 처리기 초기화 Args: gemini_api_key: Gemini API 키 """ self.gemini_api_key = gemini_api_key self.pdf_processor = PDFProcessor() self.dxf_processor = EnhancedDXFProcessor() self.gemini_analyzer = GeminiAnalyzer(gemini_api_key) self.csv_exporter = TitleBlockCSVExporter() # CSV 내보내기 추가 self.processing_results: List[FileProcessingResult] = [] self.current_progress = 0 self.total_files = 0 async def process_multiple_files( self, file_paths: List[str], config: BatchProcessingConfig, progress_callback: Optional[Callable[[int, int, str], None]] = None ) -> List[FileProcessingResult]: """ 여러 파일을 배치로 처리 Args: file_paths: 처리할 파일 경로 리스트 config: 배치 처리 설정 progress_callback: 진행률 콜백 함수 (current, total, status) Returns: 처리 결과 리스트 """ self.processing_results = [] self.total_files = len(file_paths) self.current_progress = 0 logger.info(f"배치 처리 시작: {self.total_files}개 파일") # 동시 처리 제한을 위한 세마포어 semaphore = asyncio.Semaphore(config.max_concurrent_files) # 각 파일에 대한 처리 태스크 생성 tasks = [] for i, file_path in enumerate(file_paths): task = self._process_single_file_with_semaphore( semaphore, file_path, config, progress_callback, i + 1 ) tasks.append(task) # 모든 파일 처리 완료까지 대기 results = await asyncio.gather(*tasks, return_exceptions=True) # 예외 발생한 결과 처리 for i, result in enumerate(results): if isinstance(result, Exception): error_result = FileProcessingResult( file_path=file_paths[i], file_name=os.path.basename(file_paths[i]), file_type="unknown", file_size=0, processing_time=0, success=False, error_message=str(result), processed_at=datetime.now().isoformat() ) self.processing_results.append(error_result) logger.info(f"배치 처리 완료: {len(self.processing_results)}개 결과") # CSV 저장 if config.output_csv_path: await self.save_results_to_csv(config.output_csv_path) # JSON 출력도 함께 생성 (좌표 정보 포함) json_output_path = config.output_csv_path.replace('.csv', '.json') await self.save_results_to_json(json_output_path) return self.processing_results async def _process_single_file_with_semaphore( self, semaphore: asyncio.Semaphore, file_path: str, config: BatchProcessingConfig, progress_callback: Optional[Callable[[int, int, str], None]], file_number: int ) -> None: """세마포어를 사용하여 단일 파일 처리""" async with semaphore: result = await self._process_single_file(file_path, config) self.processing_results.append(result) self.current_progress += 1 if progress_callback: status = f"처리 완료: {result.file_name}" if not result.success: status = f"처리 실패: {result.file_name} - {result.error_message}" progress_callback(self.current_progress, self.total_files, status) async def _process_single_file( self, file_path: str, config: BatchProcessingConfig ) -> FileProcessingResult: """ 단일 파일 처리 Args: file_path: 파일 경로 config: 처리 설정 Returns: 처리 결과 """ start_time = asyncio.get_event_loop().time() file_name = os.path.basename(file_path) try: # 파일 정보 수집 file_size = os.path.getsize(file_path) file_type = self._detect_file_type(file_path) logger.info(f"파일 처리 시작: {file_name} ({file_type})") result = FileProcessingResult( file_path=file_path, file_name=file_name, file_type=file_type, file_size=file_size, processing_time=0, success=False, processed_at=datetime.now().isoformat() ) # 파일 유형에 따른 처리 if file_type.lower() == 'pdf': await self._process_pdf_file(file_path, result, config) elif file_type.lower() == 'dxf': await self._process_dxf_file(file_path, result, config) else: raise ValueError(f"지원하지 않는 파일 형식: {file_type}") result.success = True except Exception as e: logger.error(f"파일 처리 오류 ({file_name}): {str(e)}") result.success = False result.error_message = str(e) finally: # 처리 시간 계산 end_time = asyncio.get_event_loop().time() result.processing_time = round(end_time - start_time, 2) return result async def _process_pdf_file( self, file_path: str, result: FileProcessingResult, config: BatchProcessingConfig ) -> None: """PDF 파일 처리""" # PDF 이미지 변환 images = self.pdf_processor.convert_to_images(file_path) if not images: raise ValueError("PDF를 이미지로 변환할 수 없습니다") # 첫 번째 페이지만 분석 (다중 페이지 처리는 향후 개선) first_page = images[0] base64_image = self.pdf_processor.image_to_base64(first_page) # PDF에서 텍스트 블록 추출 text_blocks = self.pdf_processor.extract_text_with_coordinates(file_path, 0) # Gemini API로 분석 # 실제 구현에서는 batch mode 사용 가능 analysis_result = await self._analyze_with_gemini( base64_image, text_blocks, config.organization_type ) result.pdf_analysis_result = analysis_result async def _process_dxf_file( self, file_path: str, result: FileProcessingResult, config: BatchProcessingConfig ) -> None: """DXF 파일 처리""" # DXF 파일 분석 extraction_result = self.dxf_processor.extract_comprehensive_data(file_path) # 타이틀 블록 정보를 딕셔너리 리스트로 변환 title_blocks = [] for tb_info in extraction_result.title_blocks: tb_dict = { 'block_name': tb_info.block_name, 'block_position': f"{tb_info.block_position[0]:.2f}, {tb_info.block_position[1]:.2f}", 'attributes_count': tb_info.attributes_count, 'attributes': [ { 'tag': attr.tag, 'text': attr.text, 'prompt': attr.prompt, 'insert_x': attr.insert_x, 'insert_y': attr.insert_y } for attr in tb_info.all_attributes ] } title_blocks.append(tb_dict) result.dxf_title_blocks = title_blocks result.dxf_total_attributes = sum(tb['attributes_count'] for tb in title_blocks) result.dxf_total_text_entities = len(extraction_result.text_entities) # 상세한 title block attributes CSV 생성 if extraction_result.title_blocks: await self._save_detailed_dxf_csv(file_path, extraction_result) async def _analyze_with_gemini( self, base64_image: str, text_blocks: list, organization_type: str ) -> str: """Gemini API로 이미지 분석""" try: # 비동기 처리를 위해 동기 함수를 태스크로 실행 loop = asyncio.get_event_loop() result = await loop.run_in_executor( None, self.gemini_analyzer.analyze_pdf_page, base64_image, text_blocks, None, # prompt (default 사용) "image/png", # mime_type organization_type ) return result except Exception as e: logger.error(f"Gemini 분석 오류: {str(e)}") return f"분석 실패: {str(e)}" async def _save_detailed_dxf_csv( self, file_path: str, extraction_result ) -> None: """상세한 DXF title block attributes CSV 저장""" try: # 파일명에서 확장자 제거 file_name = os.path.splitext(os.path.basename(file_path))[0] # 출력 디렉토리 확인 및 생성 output_dir = os.path.join(os.path.dirname(file_path), '..', 'results') os.makedirs(output_dir, exist_ok=True) # CSV 파일명 생성 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") csv_filename = f"detailed_title_blocks_{file_name}_{timestamp}.csv" csv_path = os.path.join(output_dir, csv_filename) # TitleBlockCSVExporter를 사용하여 CSV 생성 loop = asyncio.get_event_loop() await loop.run_in_executor( None, self.csv_exporter.save_title_block_info_to_csv, extraction_result.title_blocks, csv_path ) logger.info(f"상세 DXF CSV 저장 완료: {csv_path}") except Exception as e: logger.error(f"상세 DXF CSV 저장 오류: {str(e)}") def _detect_file_type(self, file_path: str) -> str: """파일 확장자로 파일 유형 검출""" _, ext = os.path.splitext(file_path.lower()) if ext == '.pdf': return 'PDF' elif ext == '.dxf': return 'DXF' else: return ext.upper().lstrip('.') async def save_results_to_csv(self, output_path: str) -> None: """ 처리 결과를 CSV 파일로 저장합니다. (파일당 한 줄, 추가 테이블은 JSON으로 저장) Args: output_path: 출력 CSV 파일 경로 """ try: data_rows = [] # 모든 결과를 순회하며 한 줄 데이터로 가공 for result in self.processing_results: row = { 'file_name': result.file_name, 'file_path': result.file_path, 'file_type': result.file_type, 'file_size_mb': round(result.file_size / (1024 * 1024), 2), 'processing_time_seconds': result.processing_time, 'success': result.success, 'error_message': result.error_message or '', 'processed_at': result.processed_at } if result.file_type.lower() == 'pdf' and result.success and result.pdf_analysis_result: try: pdf_data = json.loads(result.pdf_analysis_result) # 'additional_tables'를 추출하여 별도 처리 additional_tables = pdf_data.pop('additional_tables', []) if additional_tables: # JSON 문자열로 변환하여 한 셀에 저장 (한글 유지, 좌표 제외) tables_for_csv = [ { "table_title": table.get("table_title"), "table_data": [ [cell.get("value") for cell in table_row] for table_row in table.get("table_data", []) ] } for table in additional_tables ] row['additional_tables'] = json.dumps(tables_for_csv, ensure_ascii=False) else: row['additional_tables'] = '' # 나머지 기본 정보들을 row에 추가 (좌표 제외) for key, value in pdf_data.items(): if isinstance(value, dict): row[key] = value.get('value', '') else: row[key] = value except (json.JSONDecodeError, TypeError) as e: logger.warning(f"PDF 결과 파싱 오류 ({result.file_name}): {e}") row['error_message'] = f"JSON 파싱 실패: {e}" elif result.file_type.lower() == 'dxf': row['dxf_total_attributes'] = result.dxf_total_attributes or 0 row['dxf_total_text_entities'] = result.dxf_total_text_entities or 0 data_rows.append(row) if not data_rows: logger.warning("CSV로 저장할 데이터가 없습니다.") return df = pd.DataFrame(data_rows) # --- 컬럼 순서 정의 및 재정렬 --- base_columns = [ 'file_name', 'file_path', 'file_type', 'file_size_mb', 'processing_time_seconds', 'success', 'error_message', 'processed_at' ] pdf_metadata_columns = [ "도면명", "도면명_line0", "도면명_line1", "도면명_line2", "편철번호", "도면번호", "Main_Title", "Sub_Title", "Main Title", "Sub Title", "사업명", "사업명_top", "사업명_bot", "시설_공구", "수평_도면_축척", "수직_도면_축척", "수평축척", "수직축척", "적용표준버전", "적용표준", "건설분야", "건설단계", "설계공구_공구명", "설계공구_범위", "시공공구_공구명", "시공공구_범위", "설계사", "시공사", "노선이정", "개정번호_1", "개정날짜_1", "개정내용_1", "작성자_1", "검토자_1", "확인자_1", "개정차수", "개정일자", "과업책임자", "분야별책임자", "설계자", "위치정보" ] # 새로 추가된 컬럼은 맨 뒤로 new_columns = ['additional_tables'] dxf_columns = ['dxf_total_attributes', 'dxf_total_text_entities'] potential_columns = base_columns + pdf_metadata_columns + dxf_columns + new_columns final_ordered_columns = [col for col in potential_columns if col in df.columns] remaining_columns = sorted([col for col in df.columns if col not in final_ordered_columns]) df = df[final_ordered_columns + remaining_columns] # UTF-8 BOM으로 저장 df.to_csv(output_path, index=False, encoding='utf-8-sig') logger.info(f"CSV 저장 완료: {output_path}") logger.info(f"총 {len(df)}개 행 결과 저장") except Exception as e: logger.error(f"CSV 저장 오류: {str(e)}") raise async def save_results_to_json(self, output_path: str) -> None: """ 처리 결과를 JSON 파일로 저장 (좌표 정보 포함) Args: output_path: 출력 JSON 파일 경로 """ try: # 결과를 JSON 구조로 변환 json_data = { "metadata": { "total_files": len(self.processing_results), "success_files": sum(1 for r in self.processing_results if r.success), "failed_files": sum(1 for r in self.processing_results if not r.success), "generated_at": datetime.now().isoformat(), "format_version": "1.0" }, "results": [] } for result in self.processing_results: # 기본 정보 result_data = { "file_info": { "name": result.file_name, "path": result.file_path, "type": result.file_type, "size_bytes": result.file_size, "size_mb": round(result.file_size / (1024 * 1024), 2) }, "processing_info": { "success": result.success, "processing_time_seconds": result.processing_time, "processed_at": result.processed_at, "error_message": result.error_message } } # PDF 분석 결과 (좌표 정보 포함) if result.file_type.lower() == 'pdf' and result.pdf_analysis_result: try: # JSON 문자열을 딕셔너리로 변환 (이미 딕셔너리인 경우도 처리) if isinstance(result.pdf_analysis_result, str): analysis_data = json.loads(result.pdf_analysis_result) else: analysis_data = result.pdf_analysis_result result_data["pdf_analysis"] = analysis_data except (json.JSONDecodeError, TypeError) as e: logger.warning(f"PDF 분석 결과 JSON 파싱 오류: {e}") result_data["pdf_analysis"] = {"error": "JSON 파싱 실패", "raw_data": str(result.pdf_analysis_result)} # DXF 분석 결과 elif result.file_type.lower() == 'dxf': result_data["dxf_analysis"] = { "total_attributes": result.dxf_total_attributes or 0, "total_text_entities": result.dxf_total_text_entities or 0, "title_blocks": result.dxf_title_blocks or [] } json_data["results"].append(result_data) # JSON 파일 저장 (예쁜 포맷팅과 한글 지원) with open(output_path, 'w', encoding='utf-8') as f: json.dump(json_data, f, ensure_ascii=False, indent=2, default=str) logger.info(f"JSON 저장 완료: {output_path}") logger.info(f"총 {len(json_data['results'])}개 파일 결과 저장 (좌표 정보 포함)") except Exception as e: logger.error(f"JSON 저장 오류: {str(e)}") raise def get_processing_summary(self) -> Dict[str, Any]: """처리 결과 요약 정보 반환""" if not self.processing_results: return {} total_files = len(self.processing_results) success_files = sum(1 for r in self.processing_results if r.success) failed_files = total_files - success_files pdf_files = sum(1 for r in self.processing_results if r.file_type.lower() == 'pdf') dxf_files = sum(1 for r in self.processing_results if r.file_type.lower() == 'dxf') total_processing_time = sum(r.processing_time for r in self.processing_results) avg_processing_time = total_processing_time / total_files if total_files > 0 else 0 total_file_size = sum(r.file_size for r in self.processing_results) return { 'total_files': total_files, 'success_files': success_files, 'failed_files': failed_files, 'pdf_files': pdf_files, 'dxf_files': dxf_files, 'total_processing_time': round(total_processing_time, 2), 'avg_processing_time': round(avg_processing_time, 2), 'total_file_size_mb': round(total_file_size / (1024 * 1024), 2), 'success_rate': round((success_files / total_files) * 100, 1) if total_files > 0 else 0 } def generate_default_csv_filename() -> str: """기본 CSV 파일명 생성""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") return f"batch_analysis_results_{timestamp}.csv" # 사용 예시 if __name__ == "__main__": async def main(): # 테스트용 예시 processor = MultiFileProcessor("your-gemini-api-key") config = BatchProcessingConfig( organization_type="한국도로공사", max_concurrent_files=2, output_csv_path="test_results.csv" ) # 진행률 콜백 함수 def progress_callback(current: int, total: int, status: str): print(f"진행률: {current}/{total} ({current/total*100:.1f}%) - {status}") # 파일 경로 리스트 (실제 파일 경로로 교체 필요) file_paths = [ "sample1.pdf", "sample2.dxf", "sample3.pdf" ] results = await processor.process_multiple_files( file_paths, config, progress_callback ) summary = processor.get_processing_summary() print("처리 요약:", summary) # asyncio.run(main())