379 lines
14 KiB
Python
379 lines
14 KiB
Python
"""
|
|
간단한 다중 파일 배치 처리 모듈
|
|
getcode.py 스타일의 간단한 분석을 여러 파일에 적용하고 결과를 CSV로 저장합니다.
|
|
|
|
Author: Claude Assistant
|
|
Created: 2025-07-14
|
|
Version: 1.0.0
|
|
"""
|
|
|
|
import asyncio
|
|
import os
|
|
import pandas as pd
|
|
import base64
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import List, Dict, Any, Optional, Callable
|
|
from dataclasses import dataclass
|
|
import logging
|
|
|
|
from simple_gemini_analyzer import SimpleGeminiAnalyzer
|
|
from pdf_processor import PDFProcessor
|
|
|
|
# 로깅 설정
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class SimpleBatchResult:
|
|
"""간단한 배치 처리 결과"""
|
|
file_path: str
|
|
file_name: str
|
|
file_size_mb: float
|
|
processing_time_seconds: float
|
|
success: bool
|
|
|
|
# 분석 결과
|
|
analysis_result: Optional[str] = None
|
|
analysis_timestamp: Optional[str] = None
|
|
prompt_used: Optional[str] = None
|
|
model_used: Optional[str] = None
|
|
error_message: Optional[str] = None
|
|
|
|
# 메타데이터
|
|
processed_at: Optional[str] = None
|
|
|
|
|
|
class SimpleBatchProcessor:
|
|
"""
|
|
간단한 다중 파일 배치 처리기
|
|
getcode.py 스타일의 분석을 여러 PDF 파일에 적용합니다.
|
|
"""
|
|
|
|
def __init__(self, gemini_api_key: str):
|
|
"""
|
|
배치 처리기 초기화
|
|
|
|
Args:
|
|
gemini_api_key: Gemini API 키
|
|
"""
|
|
self.gemini_api_key = gemini_api_key
|
|
self.analyzer = SimpleGeminiAnalyzer(gemini_api_key)
|
|
self.pdf_processor = PDFProcessor()
|
|
|
|
self.results: List[SimpleBatchResult] = []
|
|
self.current_progress = 0
|
|
self.total_files = 0
|
|
|
|
logger.info("간단한 배치 처리기 초기화 완료")
|
|
|
|
async def process_multiple_pdf_files(
|
|
self,
|
|
pdf_file_paths: List[str],
|
|
output_csv_path: Optional[str] = None,
|
|
custom_prompt: Optional[str] = None,
|
|
max_concurrent_files: int = 3,
|
|
progress_callback: Optional[Callable[[int, int, str], None]] = None
|
|
) -> List[SimpleBatchResult]:
|
|
"""
|
|
여러 PDF 파일을 배치로 처리하고 결과를 CSV로 저장
|
|
|
|
Args:
|
|
pdf_file_paths: 처리할 PDF 파일 경로 리스트
|
|
output_csv_path: 출력 CSV 파일 경로 (None인 경우 자동 생성)
|
|
custom_prompt: 사용자 정의 프롬프트 (None인 경우 기본 프롬프트 사용)
|
|
max_concurrent_files: 동시 처리할 최대 파일 수
|
|
progress_callback: 진행률 콜백 함수 (current, total, status)
|
|
|
|
Returns:
|
|
처리 결과 리스트
|
|
"""
|
|
self.results = []
|
|
self.total_files = len(pdf_file_paths)
|
|
self.current_progress = 0
|
|
|
|
logger.info(f"간단한 배치 처리 시작: {self.total_files}개 PDF 파일")
|
|
|
|
if not pdf_file_paths:
|
|
logger.warning("처리할 파일이 없습니다.")
|
|
return []
|
|
|
|
# 동시 처리 제한을 위한 세마포어
|
|
semaphore = asyncio.Semaphore(max_concurrent_files)
|
|
|
|
# 각 파일에 대한 처리 태스크 생성
|
|
tasks = []
|
|
for i, file_path in enumerate(pdf_file_paths):
|
|
task = self._process_single_pdf_with_semaphore(
|
|
semaphore, file_path, custom_prompt, progress_callback, i + 1
|
|
)
|
|
tasks.append(task)
|
|
|
|
# 모든 파일 처리 완료까지 대기
|
|
await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
logger.info(f"배치 처리 완료: {len(self.results)}개 결과")
|
|
|
|
# CSV 저장
|
|
if output_csv_path or self.results:
|
|
csv_path = output_csv_path or self._generate_default_csv_path()
|
|
await self.save_results_to_csv(csv_path)
|
|
|
|
return self.results
|
|
|
|
async def _process_single_pdf_with_semaphore(
|
|
self,
|
|
semaphore: asyncio.Semaphore,
|
|
file_path: str,
|
|
custom_prompt: Optional[str],
|
|
progress_callback: Optional[Callable[[int, int, str], None]],
|
|
file_number: int
|
|
) -> None:
|
|
"""세마포어를 사용하여 단일 PDF 파일 처리"""
|
|
async with semaphore:
|
|
result = await self._process_single_pdf_file(file_path, custom_prompt)
|
|
self.results.append(result)
|
|
|
|
self.current_progress += 1
|
|
if progress_callback:
|
|
status = f"처리 완료: {result.file_name}"
|
|
if not result.success:
|
|
status = f"처리 실패: {result.file_name}"
|
|
progress_callback(self.current_progress, self.total_files, status)
|
|
|
|
async def _process_single_pdf_file(
|
|
self,
|
|
file_path: str,
|
|
custom_prompt: Optional[str] = None
|
|
) -> SimpleBatchResult:
|
|
"""
|
|
단일 PDF 파일 처리
|
|
|
|
Args:
|
|
file_path: PDF 파일 경로
|
|
custom_prompt: 사용자 정의 프롬프트
|
|
|
|
Returns:
|
|
처리 결과
|
|
"""
|
|
start_time = asyncio.get_event_loop().time()
|
|
file_name = os.path.basename(file_path)
|
|
|
|
try:
|
|
# 파일 정보 수집
|
|
file_size = os.path.getsize(file_path)
|
|
file_size_mb = round(file_size / (1024 * 1024), 2)
|
|
|
|
logger.info(f"PDF 파일 처리 시작: {file_name} ({file_size_mb}MB)")
|
|
|
|
# PDF를 이미지로 변환 (첫 번째 페이지만)
|
|
images = self.pdf_processor.convert_to_images(file_path, max_pages=1)
|
|
if not images:
|
|
raise ValueError("PDF를 이미지로 변환할 수 없습니다")
|
|
|
|
# 첫 번째 페이지 이미지를 바이트로 변환
|
|
first_page_image = images[0]
|
|
image_bytes = self.pdf_processor.image_to_bytes(first_page_image)
|
|
|
|
# Gemini API로 분석 (비동기 처리)
|
|
loop = asyncio.get_event_loop()
|
|
analysis_result = await loop.run_in_executor(
|
|
None,
|
|
self.analyzer.analyze_image_from_bytes,
|
|
image_bytes,
|
|
custom_prompt,
|
|
"image/png"
|
|
)
|
|
|
|
if analysis_result and analysis_result['success']:
|
|
result = SimpleBatchResult(
|
|
file_path=file_path,
|
|
file_name=file_name,
|
|
file_size_mb=file_size_mb,
|
|
processing_time_seconds=0, # 나중에 계산
|
|
success=True,
|
|
analysis_result=analysis_result['analysis_result'],
|
|
analysis_timestamp=analysis_result['timestamp'],
|
|
prompt_used=analysis_result['prompt_used'],
|
|
model_used=analysis_result['model'],
|
|
error_message=None,
|
|
processed_at=datetime.now().isoformat()
|
|
)
|
|
logger.info(f"분석 성공: {file_name}")
|
|
else:
|
|
error_msg = analysis_result['error_message'] if analysis_result else "알 수 없는 오류"
|
|
result = SimpleBatchResult(
|
|
file_path=file_path,
|
|
file_name=file_name,
|
|
file_size_mb=file_size_mb,
|
|
processing_time_seconds=0,
|
|
success=False,
|
|
analysis_result=None,
|
|
error_message=error_msg,
|
|
processed_at=datetime.now().isoformat()
|
|
)
|
|
logger.error(f"분석 실패: {file_name} - {error_msg}")
|
|
|
|
except Exception as e:
|
|
error_msg = f"파일 처리 오류: {str(e)}"
|
|
logger.error(f"파일 처리 오류 ({file_name}): {error_msg}")
|
|
result = SimpleBatchResult(
|
|
file_path=file_path,
|
|
file_name=file_name,
|
|
file_size_mb=0,
|
|
processing_time_seconds=0,
|
|
success=False,
|
|
error_message=error_msg,
|
|
processed_at=datetime.now().isoformat()
|
|
)
|
|
|
|
finally:
|
|
# 처리 시간 계산
|
|
end_time = asyncio.get_event_loop().time()
|
|
result.processing_time_seconds = round(end_time - start_time, 2)
|
|
|
|
return result
|
|
|
|
async def save_results_to_csv(self, csv_path: str) -> None:
|
|
"""
|
|
처리 결과를 CSV 파일로 저장
|
|
|
|
Args:
|
|
csv_path: 출력 CSV 파일 경로
|
|
"""
|
|
try:
|
|
if not self.results:
|
|
logger.warning("저장할 결과가 없습니다.")
|
|
return
|
|
|
|
# 결과를 DataFrame으로 변환
|
|
data_rows = []
|
|
for result in self.results:
|
|
row = {
|
|
'file_name': result.file_name,
|
|
'file_path': result.file_path,
|
|
'file_size_mb': result.file_size_mb,
|
|
'processing_time_seconds': result.processing_time_seconds,
|
|
'success': result.success,
|
|
'analysis_result': result.analysis_result or '',
|
|
'analysis_timestamp': result.analysis_timestamp or '',
|
|
'prompt_used': result.prompt_used or '',
|
|
'model_used': result.model_used or '',
|
|
'error_message': result.error_message or '',
|
|
'processed_at': result.processed_at or ''
|
|
}
|
|
data_rows.append(row)
|
|
|
|
# DataFrame 생성
|
|
df = pd.DataFrame(data_rows)
|
|
|
|
# 컬럼 순서 정렬
|
|
column_order = [
|
|
'file_name', 'success', 'file_size_mb', 'processing_time_seconds',
|
|
'analysis_result', 'prompt_used', 'model_used', 'analysis_timestamp',
|
|
'error_message', 'processed_at', 'file_path'
|
|
]
|
|
|
|
df = df[column_order]
|
|
|
|
# 출력 디렉토리 생성
|
|
os.makedirs(os.path.dirname(csv_path), exist_ok=True)
|
|
|
|
# UTF-8 BOM으로 저장 (한글 호환성)
|
|
df.to_csv(csv_path, index=False, encoding='utf-8-sig')
|
|
|
|
logger.info(f"CSV 저장 완료: {csv_path}")
|
|
logger.info(f"총 {len(data_rows)}개 파일 결과 저장")
|
|
|
|
# 처리 요약 로그
|
|
success_count = sum(1 for r in self.results if r.success)
|
|
failure_count = len(self.results) - success_count
|
|
logger.info(f"처리 요약 - 성공: {success_count}개, 실패: {failure_count}개")
|
|
|
|
except Exception as e:
|
|
logger.error(f"CSV 저장 오류: {str(e)}")
|
|
raise
|
|
|
|
def _generate_default_csv_path(self) -> str:
|
|
"""기본 CSV 파일 경로 생성"""
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
results_dir = "D:/MYCLAUDE_PROJECT/fletimageanalysis/results"
|
|
os.makedirs(results_dir, exist_ok=True)
|
|
return os.path.join(results_dir, f"simple_batch_analysis_{timestamp}.csv")
|
|
|
|
def get_processing_summary(self) -> Dict[str, Any]:
|
|
"""처리 결과 요약 정보 반환"""
|
|
if not self.results:
|
|
return {}
|
|
|
|
total_files = len(self.results)
|
|
success_files = sum(1 for r in self.results if r.success)
|
|
failed_files = total_files - success_files
|
|
|
|
total_processing_time = sum(r.processing_time_seconds for r in self.results)
|
|
avg_processing_time = total_processing_time / total_files if total_files > 0 else 0
|
|
|
|
total_file_size = sum(r.file_size_mb for r in self.results)
|
|
|
|
return {
|
|
'total_files': total_files,
|
|
'success_files': success_files,
|
|
'failed_files': failed_files,
|
|
'total_processing_time': round(total_processing_time, 2),
|
|
'avg_processing_time': round(avg_processing_time, 2),
|
|
'total_file_size_mb': round(total_file_size, 2),
|
|
'success_rate': round((success_files / total_files) * 100, 1) if total_files > 0 else 0
|
|
}
|
|
|
|
|
|
# 사용 예시
|
|
async def main():
|
|
"""사용 예시 함수"""
|
|
# API 키 설정 (실제 사용 시에는 .env 파일이나 환경변수 사용)
|
|
api_key = os.environ.get("GEMINI_API_KEY")
|
|
if not api_key:
|
|
print("❌ GEMINI_API_KEY 환경변수를 설정해주세요.")
|
|
return
|
|
|
|
# 배치 처리기 초기화
|
|
processor = SimpleBatchProcessor(api_key)
|
|
|
|
# 진행률 콜백 함수
|
|
def progress_callback(current: int, total: int, status: str):
|
|
percentage = (current / total) * 100
|
|
print(f"진행률: {current}/{total} ({percentage:.1f}%) - {status}")
|
|
|
|
# 샘플 PDF 파일 경로 (실제 사용 시에는 실제 파일 경로로 교체)
|
|
pdf_files = [
|
|
"D:/MYCLAUDE_PROJECT/fletimageanalysis/testsample/sample1.pdf",
|
|
"D:/MYCLAUDE_PROJECT/fletimageanalysis/testsample/sample2.pdf",
|
|
# 더 많은 파일 추가 가능
|
|
]
|
|
|
|
# 실제 존재하는 PDF 파일만 필터링
|
|
existing_files = [f for f in pdf_files if os.path.exists(f)]
|
|
|
|
if not existing_files:
|
|
print("❌ 처리할 PDF 파일이 없습니다.")
|
|
return
|
|
|
|
# 배치 처리 실행
|
|
results = await processor.process_multiple_pdf_files(
|
|
pdf_file_paths=existing_files,
|
|
custom_prompt=None, # 기본 프롬프트 사용
|
|
max_concurrent_files=2,
|
|
progress_callback=progress_callback
|
|
)
|
|
|
|
# 처리 요약 출력
|
|
summary = processor.get_processing_summary()
|
|
print("\n=== 처리 요약 ===")
|
|
for key, value in summary.items():
|
|
print(f"{key}: {value}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# 비동기 메인 함수 실행
|
|
asyncio.run(main())
|