중간
This commit is contained in:
588
fletimageanalysis/multi_file_processor.py
Normal file
588
fletimageanalysis/multi_file_processor.py
Normal file
@@ -0,0 +1,588 @@
|
||||
"""
|
||||
다중 파일 처리 모듈
|
||||
여러 PDF/DXF 파일을 배치로 처리하고 결과를 CSV로 저장하는 기능을 제공합니다.
|
||||
|
||||
Author: Claude Assistant
|
||||
Created: 2025-07-14
|
||||
Version: 1.0.0
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import pandas as pd
|
||||
from datetime import datetime
|
||||
from typing import List, Dict, Any, Optional, Callable
|
||||
from dataclasses import dataclass
|
||||
import logging
|
||||
|
||||
from pdf_processor import PDFProcessor
|
||||
from dxf_processor import EnhancedDXFProcessor
|
||||
from gemini_analyzer import GeminiAnalyzer
|
||||
from csv_exporter import TitleBlockCSVExporter
|
||||
import json # Added import
|
||||
|
||||
# 로깅 설정
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class FileProcessingResult:
|
||||
"""단일 파일 처리 결과"""
|
||||
file_path: str
|
||||
file_name: str
|
||||
file_type: str
|
||||
file_size: int
|
||||
processing_time: float
|
||||
success: bool
|
||||
error_message: Optional[str] = None
|
||||
|
||||
# PDF 분석 결과
|
||||
pdf_analysis_result: Optional[str] = None
|
||||
|
||||
# DXF 분석 결과
|
||||
dxf_title_blocks: Optional[List[Dict]] = None
|
||||
dxf_total_attributes: Optional[int] = None
|
||||
dxf_total_text_entities: Optional[int] = None
|
||||
|
||||
# 공통 메타데이터
|
||||
processed_at: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class BatchProcessingConfig:
|
||||
"""배치 처리 설정"""
|
||||
organization_type: str = "한국도로공사"
|
||||
enable_gemini_batch_mode: bool = False
|
||||
max_concurrent_files: int = 3
|
||||
save_intermediate_results: bool = True
|
||||
output_csv_path: Optional[str] = None
|
||||
include_error_files: bool = True
|
||||
|
||||
|
||||
class MultiFileProcessor:
|
||||
"""다중 파일 처리기"""
|
||||
|
||||
def __init__(self, gemini_api_key: str):
|
||||
"""
|
||||
다중 파일 처리기 초기화
|
||||
|
||||
Args:
|
||||
gemini_api_key: Gemini API 키
|
||||
"""
|
||||
self.gemini_api_key = gemini_api_key
|
||||
self.pdf_processor = PDFProcessor()
|
||||
self.dxf_processor = EnhancedDXFProcessor()
|
||||
self.gemini_analyzer = GeminiAnalyzer(gemini_api_key)
|
||||
self.csv_exporter = TitleBlockCSVExporter() # CSV 내보내기 추가
|
||||
|
||||
self.processing_results: List[FileProcessingResult] = []
|
||||
self.current_progress = 0
|
||||
self.total_files = 0
|
||||
|
||||
async def process_multiple_files(
|
||||
self,
|
||||
file_paths: List[str],
|
||||
config: BatchProcessingConfig,
|
||||
progress_callback: Optional[Callable[[int, int, str], None]] = None
|
||||
) -> List[FileProcessingResult]:
|
||||
"""
|
||||
여러 파일을 배치로 처리
|
||||
|
||||
Args:
|
||||
file_paths: 처리할 파일 경로 리스트
|
||||
config: 배치 처리 설정
|
||||
progress_callback: 진행률 콜백 함수 (current, total, status)
|
||||
|
||||
Returns:
|
||||
처리 결과 리스트
|
||||
"""
|
||||
self.processing_results = []
|
||||
self.total_files = len(file_paths)
|
||||
self.current_progress = 0
|
||||
|
||||
logger.info(f"배치 처리 시작: {self.total_files}개 파일")
|
||||
|
||||
# 동시 처리 제한을 위한 세마포어
|
||||
semaphore = asyncio.Semaphore(config.max_concurrent_files)
|
||||
|
||||
# 각 파일에 대한 처리 태스크 생성
|
||||
tasks = []
|
||||
for i, file_path in enumerate(file_paths):
|
||||
task = self._process_single_file_with_semaphore(
|
||||
semaphore, file_path, config, progress_callback, i + 1
|
||||
)
|
||||
tasks.append(task)
|
||||
|
||||
# 모든 파일 처리 완료까지 대기
|
||||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
# 예외 발생한 결과 처리
|
||||
for i, result in enumerate(results):
|
||||
if isinstance(result, Exception):
|
||||
error_result = FileProcessingResult(
|
||||
file_path=file_paths[i],
|
||||
file_name=os.path.basename(file_paths[i]),
|
||||
file_type="unknown",
|
||||
file_size=0,
|
||||
processing_time=0,
|
||||
success=False,
|
||||
error_message=str(result),
|
||||
processed_at=datetime.now().isoformat()
|
||||
)
|
||||
self.processing_results.append(error_result)
|
||||
|
||||
logger.info(f"배치 처리 완료: {len(self.processing_results)}개 결과")
|
||||
|
||||
# CSV 저장
|
||||
if config.output_csv_path:
|
||||
await self.save_results_to_csv(config.output_csv_path)
|
||||
|
||||
# JSON 출력도 함께 생성 (좌표 정보 포함)
|
||||
json_output_path = config.output_csv_path.replace('.csv', '.json')
|
||||
await self.save_results_to_json(json_output_path)
|
||||
|
||||
return self.processing_results
|
||||
|
||||
async def _process_single_file_with_semaphore(
|
||||
self,
|
||||
semaphore: asyncio.Semaphore,
|
||||
file_path: str,
|
||||
config: BatchProcessingConfig,
|
||||
progress_callback: Optional[Callable[[int, int, str], None]],
|
||||
file_number: int
|
||||
) -> None:
|
||||
"""세마포어를 사용하여 단일 파일 처리"""
|
||||
async with semaphore:
|
||||
result = await self._process_single_file(file_path, config)
|
||||
self.processing_results.append(result)
|
||||
|
||||
self.current_progress += 1
|
||||
if progress_callback:
|
||||
status = f"처리 완료: {result.file_name}"
|
||||
if not result.success:
|
||||
status = f"처리 실패: {result.file_name} - {result.error_message}"
|
||||
progress_callback(self.current_progress, self.total_files, status)
|
||||
|
||||
async def _process_single_file(
|
||||
self,
|
||||
file_path: str,
|
||||
config: BatchProcessingConfig
|
||||
) -> FileProcessingResult:
|
||||
"""
|
||||
단일 파일 처리
|
||||
|
||||
Args:
|
||||
file_path: 파일 경로
|
||||
config: 처리 설정
|
||||
|
||||
Returns:
|
||||
처리 결과
|
||||
"""
|
||||
start_time = asyncio.get_event_loop().time()
|
||||
file_name = os.path.basename(file_path)
|
||||
|
||||
try:
|
||||
# 파일 정보 수집
|
||||
file_size = os.path.getsize(file_path)
|
||||
file_type = self._detect_file_type(file_path)
|
||||
|
||||
logger.info(f"파일 처리 시작: {file_name} ({file_type})")
|
||||
|
||||
result = FileProcessingResult(
|
||||
file_path=file_path,
|
||||
file_name=file_name,
|
||||
file_type=file_type,
|
||||
file_size=file_size,
|
||||
processing_time=0,
|
||||
success=False,
|
||||
processed_at=datetime.now().isoformat()
|
||||
)
|
||||
|
||||
# 파일 유형에 따른 처리
|
||||
if file_type.lower() == 'pdf':
|
||||
await self._process_pdf_file(file_path, result, config)
|
||||
elif file_type.lower() == 'dxf':
|
||||
await self._process_dxf_file(file_path, result, config)
|
||||
else:
|
||||
raise ValueError(f"지원하지 않는 파일 형식: {file_type}")
|
||||
|
||||
result.success = True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"파일 처리 오류 ({file_name}): {str(e)}")
|
||||
result.success = False
|
||||
result.error_message = str(e)
|
||||
|
||||
finally:
|
||||
# 처리 시간 계산
|
||||
end_time = asyncio.get_event_loop().time()
|
||||
result.processing_time = round(end_time - start_time, 2)
|
||||
|
||||
return result
|
||||
|
||||
async def _process_pdf_file(
|
||||
self,
|
||||
file_path: str,
|
||||
result: FileProcessingResult,
|
||||
config: BatchProcessingConfig
|
||||
) -> None:
|
||||
"""PDF 파일 처리"""
|
||||
# PDF 이미지 변환
|
||||
images = self.pdf_processor.convert_to_images(file_path)
|
||||
if not images:
|
||||
raise ValueError("PDF를 이미지로 변환할 수 없습니다")
|
||||
|
||||
# 첫 번째 페이지만 분석 (다중 페이지 처리는 향후 개선)
|
||||
first_page = images[0]
|
||||
base64_image = self.pdf_processor.image_to_base64(first_page)
|
||||
|
||||
# PDF에서 텍스트 블록 추출
|
||||
text_blocks = self.pdf_processor.extract_text_with_coordinates(file_path, 0)
|
||||
|
||||
# Gemini API로 분석
|
||||
# 실제 구현에서는 batch mode 사용 가능
|
||||
analysis_result = await self._analyze_with_gemini(
|
||||
base64_image, text_blocks, config.organization_type
|
||||
)
|
||||
|
||||
result.pdf_analysis_result = analysis_result
|
||||
|
||||
async def _process_dxf_file(
|
||||
self,
|
||||
file_path: str,
|
||||
result: FileProcessingResult,
|
||||
config: BatchProcessingConfig
|
||||
) -> None:
|
||||
"""DXF 파일 처리"""
|
||||
# DXF 파일 분석
|
||||
extraction_result = self.dxf_processor.extract_comprehensive_data(file_path)
|
||||
|
||||
# 타이틀 블록 정보를 딕셔너리 리스트로 변환
|
||||
title_blocks = []
|
||||
for tb_info in extraction_result.title_blocks:
|
||||
tb_dict = {
|
||||
'block_name': tb_info.block_name,
|
||||
'block_position': f"{tb_info.block_position[0]:.2f}, {tb_info.block_position[1]:.2f}",
|
||||
'attributes_count': tb_info.attributes_count,
|
||||
'attributes': [
|
||||
{
|
||||
'tag': attr.tag,
|
||||
'text': attr.text,
|
||||
'prompt': attr.prompt,
|
||||
'insert_x': attr.insert_x,
|
||||
'insert_y': attr.insert_y
|
||||
}
|
||||
for attr in tb_info.all_attributes
|
||||
]
|
||||
}
|
||||
title_blocks.append(tb_dict)
|
||||
|
||||
result.dxf_title_blocks = title_blocks
|
||||
result.dxf_total_attributes = sum(tb['attributes_count'] for tb in title_blocks)
|
||||
result.dxf_total_text_entities = len(extraction_result.text_entities)
|
||||
|
||||
# 상세한 title block attributes CSV 생성
|
||||
if extraction_result.title_blocks:
|
||||
await self._save_detailed_dxf_csv(file_path, extraction_result)
|
||||
|
||||
async def _analyze_with_gemini(
|
||||
self,
|
||||
base64_image: str,
|
||||
text_blocks: list,
|
||||
organization_type: str
|
||||
) -> str:
|
||||
"""Gemini API로 이미지 분석"""
|
||||
try:
|
||||
# 비동기 처리를 위해 동기 함수를 태스크로 실행
|
||||
loop = asyncio.get_event_loop()
|
||||
result = await loop.run_in_executor(
|
||||
None,
|
||||
self.gemini_analyzer.analyze_pdf_page,
|
||||
base64_image,
|
||||
text_blocks,
|
||||
None, # prompt (default 사용)
|
||||
"image/png", # mime_type
|
||||
organization_type
|
||||
)
|
||||
return result
|
||||
except Exception as e:
|
||||
logger.error(f"Gemini 분석 오류: {str(e)}")
|
||||
return f"분석 실패: {str(e)}"
|
||||
|
||||
async def _save_detailed_dxf_csv(
|
||||
self,
|
||||
file_path: str,
|
||||
extraction_result
|
||||
) -> None:
|
||||
"""상세한 DXF title block attributes CSV 저장"""
|
||||
try:
|
||||
# 파일명에서 확장자 제거
|
||||
file_name = os.path.splitext(os.path.basename(file_path))[0]
|
||||
|
||||
# 출력 디렉토리 확인 및 생성
|
||||
output_dir = os.path.join(os.path.dirname(file_path), '..', 'results')
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
|
||||
# CSV 파일명 생성
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
csv_filename = f"detailed_title_blocks_{file_name}_{timestamp}.csv"
|
||||
csv_path = os.path.join(output_dir, csv_filename)
|
||||
|
||||
# TitleBlockCSVExporter를 사용하여 CSV 생성
|
||||
loop = asyncio.get_event_loop()
|
||||
await loop.run_in_executor(
|
||||
None,
|
||||
self.csv_exporter.save_title_block_info_to_csv,
|
||||
extraction_result.title_blocks,
|
||||
csv_path
|
||||
)
|
||||
|
||||
logger.info(f"상세 DXF CSV 저장 완료: {csv_path}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"상세 DXF CSV 저장 오류: {str(e)}")
|
||||
|
||||
def _detect_file_type(self, file_path: str) -> str:
|
||||
"""파일 확장자로 파일 유형 검출"""
|
||||
_, ext = os.path.splitext(file_path.lower())
|
||||
if ext == '.pdf':
|
||||
return 'PDF'
|
||||
elif ext == '.dxf':
|
||||
return 'DXF'
|
||||
else:
|
||||
return ext.upper().lstrip('.')
|
||||
|
||||
async def save_results_to_csv(self, output_path: str) -> None:
|
||||
"""
|
||||
처리 결과를 CSV 파일로 저장
|
||||
|
||||
Args:
|
||||
output_path: 출력 CSV 파일 경로
|
||||
"""
|
||||
try:
|
||||
# 결과를 DataFrame으로 변환
|
||||
data_rows = []
|
||||
|
||||
for result in self.processing_results:
|
||||
# 기본 정보
|
||||
row = {
|
||||
'file_name': result.file_name,
|
||||
'file_path': result.file_path,
|
||||
'file_type': result.file_type,
|
||||
'file_size_bytes': result.file_size,
|
||||
'file_size_mb': round(result.file_size / (1024 * 1024), 2),
|
||||
'processing_time_seconds': result.processing_time,
|
||||
'success': result.success,
|
||||
'error_message': result.error_message or '',
|
||||
'processed_at': result.processed_at
|
||||
}
|
||||
|
||||
# PDF 분석 결과
|
||||
if result.file_type.lower() == 'pdf':
|
||||
row['pdf_analysis_result'] = result.pdf_analysis_result or ''
|
||||
row['dxf_total_attributes'] = ''
|
||||
row['dxf_total_text_entities'] = ''
|
||||
row['dxf_title_blocks_summary'] = ''
|
||||
|
||||
# DXF 분석 결과
|
||||
elif result.file_type.lower() == 'dxf':
|
||||
row['pdf_analysis_result'] = ''
|
||||
row['dxf_total_attributes'] = result.dxf_total_attributes or 0
|
||||
row['dxf_total_text_entities'] = result.dxf_total_text_entities or 0
|
||||
|
||||
# 타이틀 블록 요약
|
||||
if result.dxf_title_blocks:
|
||||
summary = f"{len(result.dxf_title_blocks)}개 타이틀블록"
|
||||
for tb in result.dxf_title_blocks[:3]: # 처음 3개만 표시
|
||||
summary += f" | {tb['block_name']}({tb['attributes_count']}속성)"
|
||||
if len(result.dxf_title_blocks) > 3:
|
||||
summary += f" | ...외 {len(result.dxf_title_blocks)-3}개"
|
||||
row['dxf_title_blocks_summary'] = summary
|
||||
else:
|
||||
row['dxf_title_blocks_summary'] = '타이틀블록 없음'
|
||||
|
||||
data_rows.append(row)
|
||||
|
||||
# DataFrame 생성 및 CSV 저장
|
||||
df = pd.DataFrame(data_rows)
|
||||
|
||||
# pdf_analysis_result 컬럼 평탄화
|
||||
if 'pdf_analysis_result' in df.columns:
|
||||
# JSON 문자열을 딕셔너리로 변환 (이미 딕셔너리인 경우도 처리)
|
||||
df['pdf_analysis_result'] = df['pdf_analysis_result'].apply(lambda x: json.loads(x) if isinstance(x, str) and x.strip() else {}).fillna({})
|
||||
|
||||
# 평탄화된 데이터를 새로운 DataFrame으로 생성
|
||||
# errors='ignore'를 사용하여 JSON이 아닌 값은 무시
|
||||
# record_prefix를 사용하여 컬럼 이름에 접두사 추가
|
||||
pdf_analysis_df = pd.json_normalize(df['pdf_analysis_result'], errors='ignore', record_prefix='pdf_analysis_result_')
|
||||
|
||||
# 원본 df에서 pdf_analysis_result 컬럼 제거
|
||||
df = df.drop(columns=['pdf_analysis_result'])
|
||||
|
||||
# 원본 df와 평탄화된 DataFrame을 병합
|
||||
df = pd.concat([df, pdf_analysis_df], axis=1)
|
||||
|
||||
# 컬럼 순서 정렬을 위한 기본 순서 정의
|
||||
column_order = [
|
||||
'file_name', 'file_type', 'file_size_mb', 'processing_time_seconds',
|
||||
'success', 'error_message', 'processed_at', 'file_path', 'file_size_bytes',
|
||||
'dxf_total_attributes', 'dxf_total_text_entities', 'dxf_title_blocks_summary'
|
||||
]
|
||||
|
||||
# 기존 컬럼 순서를 유지하면서 새로운 컬럼을 추가
|
||||
existing_columns = [col for col in column_order if col in df.columns]
|
||||
new_columns = [col for col in df.columns if col not in existing_columns]
|
||||
df = df[existing_columns + sorted(new_columns)]
|
||||
|
||||
# UTF-8 BOM으로 저장 (한글 호환성)
|
||||
df.to_csv(output_path, index=False, encoding='utf-8-sig')
|
||||
|
||||
logger.info(f"CSV 저장 완료: {output_path}")
|
||||
logger.info(f"총 {len(data_rows)}개 파일 결과 저장")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"CSV 저장 오류: {str(e)}")
|
||||
raise
|
||||
|
||||
async def save_results_to_json(self, output_path: str) -> None:
|
||||
"""
|
||||
처리 결과를 JSON 파일로 저장 (좌표 정보 포함)
|
||||
|
||||
Args:
|
||||
output_path: 출력 JSON 파일 경로
|
||||
"""
|
||||
try:
|
||||
# 결과를 JSON 구조로 변환
|
||||
json_data = {
|
||||
"metadata": {
|
||||
"total_files": len(self.processing_results),
|
||||
"success_files": sum(1 for r in self.processing_results if r.success),
|
||||
"failed_files": sum(1 for r in self.processing_results if not r.success),
|
||||
"generated_at": datetime.now().isoformat(),
|
||||
"format_version": "1.0"
|
||||
},
|
||||
"results": []
|
||||
}
|
||||
|
||||
for result in self.processing_results:
|
||||
# 기본 정보
|
||||
result_data = {
|
||||
"file_info": {
|
||||
"name": result.file_name,
|
||||
"path": result.file_path,
|
||||
"type": result.file_type,
|
||||
"size_bytes": result.file_size,
|
||||
"size_mb": round(result.file_size / (1024 * 1024), 2)
|
||||
},
|
||||
"processing_info": {
|
||||
"success": result.success,
|
||||
"processing_time_seconds": result.processing_time,
|
||||
"processed_at": result.processed_at,
|
||||
"error_message": result.error_message
|
||||
}
|
||||
}
|
||||
|
||||
# PDF 분석 결과 (좌표 정보 포함)
|
||||
if result.file_type.lower() == 'pdf' and result.pdf_analysis_result:
|
||||
try:
|
||||
# JSON 문자열을 딕셔너리로 변환 (이미 딕셔너리인 경우도 처리)
|
||||
if isinstance(result.pdf_analysis_result, str):
|
||||
analysis_data = json.loads(result.pdf_analysis_result)
|
||||
else:
|
||||
analysis_data = result.pdf_analysis_result
|
||||
|
||||
result_data["pdf_analysis"] = analysis_data
|
||||
|
||||
except (json.JSONDecodeError, TypeError) as e:
|
||||
logger.warning(f"PDF 분석 결과 JSON 파싱 오류: {e}")
|
||||
result_data["pdf_analysis"] = {"error": "JSON 파싱 실패", "raw_data": str(result.pdf_analysis_result)}
|
||||
|
||||
# DXF 분석 결과
|
||||
elif result.file_type.lower() == 'dxf':
|
||||
result_data["dxf_analysis"] = {
|
||||
"total_attributes": result.dxf_total_attributes or 0,
|
||||
"total_text_entities": result.dxf_total_text_entities or 0,
|
||||
"title_blocks": result.dxf_title_blocks or []
|
||||
}
|
||||
|
||||
json_data["results"].append(result_data)
|
||||
|
||||
# JSON 파일 저장 (예쁜 포맷팅과 한글 지원)
|
||||
with open(output_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(json_data, f, ensure_ascii=False, indent=2, default=str)
|
||||
|
||||
logger.info(f"JSON 저장 완료: {output_path}")
|
||||
logger.info(f"총 {len(json_data['results'])}개 파일 결과 저장 (좌표 정보 포함)")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"JSON 저장 오류: {str(e)}")
|
||||
raise
|
||||
|
||||
def get_processing_summary(self) -> Dict[str, Any]:
|
||||
"""처리 결과 요약 정보 반환"""
|
||||
if not self.processing_results:
|
||||
return {}
|
||||
|
||||
total_files = len(self.processing_results)
|
||||
success_files = sum(1 for r in self.processing_results if r.success)
|
||||
failed_files = total_files - success_files
|
||||
|
||||
pdf_files = sum(1 for r in self.processing_results if r.file_type.lower() == 'pdf')
|
||||
dxf_files = sum(1 for r in self.processing_results if r.file_type.lower() == 'dxf')
|
||||
|
||||
total_processing_time = sum(r.processing_time for r in self.processing_results)
|
||||
avg_processing_time = total_processing_time / total_files if total_files > 0 else 0
|
||||
|
||||
total_file_size = sum(r.file_size for r in self.processing_results)
|
||||
|
||||
return {
|
||||
'total_files': total_files,
|
||||
'success_files': success_files,
|
||||
'failed_files': failed_files,
|
||||
'pdf_files': pdf_files,
|
||||
'dxf_files': dxf_files,
|
||||
'total_processing_time': round(total_processing_time, 2),
|
||||
'avg_processing_time': round(avg_processing_time, 2),
|
||||
'total_file_size_mb': round(total_file_size / (1024 * 1024), 2),
|
||||
'success_rate': round((success_files / total_files) * 100, 1) if total_files > 0 else 0
|
||||
}
|
||||
|
||||
|
||||
def generate_default_csv_filename() -> str:
|
||||
"""기본 CSV 파일명 생성"""
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
return f"batch_analysis_results_{timestamp}.csv"
|
||||
|
||||
|
||||
# 사용 예시
|
||||
if __name__ == "__main__":
|
||||
async def main():
|
||||
# 테스트용 예시
|
||||
processor = MultiFileProcessor("your-gemini-api-key")
|
||||
|
||||
config = BatchProcessingConfig(
|
||||
organization_type="한국도로공사",
|
||||
max_concurrent_files=2,
|
||||
output_csv_path="test_results.csv"
|
||||
)
|
||||
|
||||
# 진행률 콜백 함수
|
||||
def progress_callback(current: int, total: int, status: str):
|
||||
print(f"진행률: {current}/{total} ({current/total*100:.1f}%) - {status}")
|
||||
|
||||
# 파일 경로 리스트 (실제 파일 경로로 교체 필요)
|
||||
file_paths = [
|
||||
"sample1.pdf",
|
||||
"sample2.dxf",
|
||||
"sample3.pdf"
|
||||
]
|
||||
|
||||
results = await processor.process_multiple_files(
|
||||
file_paths, config, progress_callback
|
||||
)
|
||||
|
||||
summary = processor.get_processing_summary()
|
||||
print("처리 요약:", summary)
|
||||
|
||||
# asyncio.run(main())
|
||||
Reference in New Issue
Block a user