도먼 추가 정보 생성 기능 구현

This commit is contained in:
2025-07-23 13:37:33 +09:00
parent 1a2d57a306
commit 8a5f3128ad
7 changed files with 611 additions and 112 deletions

View File

@@ -52,7 +52,7 @@ class FileProcessingResult:
@dataclass
class BatchProcessingConfig:
"""배치 처리 설정"""
organization_type: str = "국토교통부"
organization_type: str = "한국도로공사"
enable_gemini_batch_mode: bool = False
max_concurrent_files: int = 3
save_intermediate_results: bool = True
@@ -137,6 +137,10 @@ class MultiFileProcessor:
# CSV 저장
if config.output_csv_path:
await self.save_results_to_csv(config.output_csv_path)
# JSON 출력도 함께 생성 (좌표 정보 포함)
json_output_path = config.output_csv_path.replace('.csv', '.json')
await self.save_results_to_json(json_output_path)
return self.processing_results
@@ -351,96 +355,179 @@ class MultiFileProcessor:
async def save_results_to_csv(self, output_path: str) -> None:
"""
처리 결과를 CSV 파일로 저장
처리 결과를 CSV 파일로 저장합니다. (파일당 한 줄, 추가 테이블은 JSON으로 저장)
Args:
output_path: 출력 CSV 파일 경로
"""
try:
# 결과를 DataFrame으로 변환
data_rows = []
# 모든 결과를 순회하며 한 줄 데이터로 가공
for result in self.processing_results:
# 기본 정보
row = {
'file_name': result.file_name,
'file_path': result.file_path,
'file_type': result.file_type,
'file_size_bytes': result.file_size,
'file_size_mb': round(result.file_size / (1024 * 1024), 2),
'processing_time_seconds': result.processing_time,
'success': result.success,
'error_message': result.error_message or '',
'processed_at': result.processed_at
}
if result.file_type.lower() == 'pdf' and result.success and result.pdf_analysis_result:
try:
pdf_data = json.loads(result.pdf_analysis_result)
# 'additional_tables'를 추출하여 별도 처리
additional_tables = pdf_data.pop('additional_tables', [])
if additional_tables:
# JSON 문자열로 변환하여 한 셀에 저장 (한글 유지, 좌표 제외)
tables_for_csv = [
{
"table_title": table.get("table_title"),
"table_data": [
[cell.get("value") for cell in table_row]
for table_row in table.get("table_data", [])
]
} for table in additional_tables
]
row['additional_tables'] = json.dumps(tables_for_csv, ensure_ascii=False)
else:
row['additional_tables'] = ''
# 나머지 기본 정보들을 row에 추가 (좌표 제외)
for key, value in pdf_data.items():
if isinstance(value, dict):
row[key] = value.get('value', '')
else:
row[key] = value
except (json.JSONDecodeError, TypeError) as e:
logger.warning(f"PDF 결과 파싱 오류 ({result.file_name}): {e}")
row['error_message'] = f"JSON 파싱 실패: {e}"
# PDF 분석 결과
if result.file_type.lower() == 'pdf':
row['pdf_analysis_result'] = result.pdf_analysis_result or ''
row['dxf_total_attributes'] = ''
row['dxf_total_text_entities'] = ''
row['dxf_title_blocks_summary'] = ''
# DXF 분석 결과
elif result.file_type.lower() == 'dxf':
row['pdf_analysis_result'] = ''
row['dxf_total_attributes'] = result.dxf_total_attributes or 0
row['dxf_total_text_entities'] = result.dxf_total_text_entities or 0
# 타이틀 블록 요약
if result.dxf_title_blocks:
summary = f"{len(result.dxf_title_blocks)}개 타이틀블록"
for tb in result.dxf_title_blocks[:3]: # 처음 3개만 표시
summary += f" | {tb['block_name']}({tb['attributes_count']}속성)"
if len(result.dxf_title_blocks) > 3:
summary += f" | ...외 {len(result.dxf_title_blocks)-3}"
row['dxf_title_blocks_summary'] = summary
else:
row['dxf_title_blocks_summary'] = '타이틀블록 없음'
data_rows.append(row)
# DataFrame 생성 및 CSV 저장
if not data_rows:
logger.warning("CSV로 저장할 데이터가 없습니다.")
return
df = pd.DataFrame(data_rows)
# pdf_analysis_result 컬럼 평탄화
if 'pdf_analysis_result' in df.columns:
# JSON 문자열을 딕셔너리로 변환 (이미 딕셔너리인 경우도 처리)
df['pdf_analysis_result'] = df['pdf_analysis_result'].apply(lambda x: json.loads(x) if isinstance(x, str) and x.strip() else {}).fillna({})
# 평탄화된 데이터를 새로운 DataFrame으로 생성
# errors='ignore'를 사용하여 JSON이 아닌 값은 무시
# record_prefix를 사용하여 컬럼 이름에 접두사 추가
pdf_analysis_df = pd.json_normalize(df['pdf_analysis_result'], errors='ignore', record_prefix='pdf_analysis_result_')
# 원본 df에서 pdf_analysis_result 컬럼 제거
df = df.drop(columns=['pdf_analysis_result'])
# 원본 df와 평탄화된 DataFrame을 병합
df = pd.concat([df, pdf_analysis_df], axis=1)
# 컬럼 순서 정렬을 위한 기본 순서 정의
column_order = [
'file_name', 'file_type', 'file_size_mb', 'processing_time_seconds',
'success', 'error_message', 'processed_at', 'file_path', 'file_size_bytes',
'dxf_total_attributes', 'dxf_total_text_entities', 'dxf_title_blocks_summary'
# --- 컬럼 순서 정의 및 재정렬 ---
base_columns = [
'file_name', 'file_path', 'file_type', 'file_size_mb',
'processing_time_seconds', 'success', 'error_message', 'processed_at'
]
pdf_metadata_columns = [
"도면명", "도면명_line0", "도면명_line1", "도면명_line2", "편철번호", "도면번호",
"Main_Title", "Sub_Title", "Main Title", "Sub Title", "사업명", "사업명_top", "사업명_bot",
"시설_공구", "수평_도면_축척", "수직_도면_축척", "수평축척", "수직축척", "적용표준버전", "적용표준",
"건설분야", "건설단계", "설계공구_공구명", "설계공구_범위", "시공공구_공구명", "시공공구_범위",
"설계사", "시공사", "노선이정", "개정번호_1", "개정날짜_1", "개정내용_1", "작성자_1", "검토자_1", "확인자_1",
"개정차수", "개정일자", "과업책임자", "분야별책임자", "설계자", "위치정보"
]
# 새로 추가된 컬럼은 맨 뒤로
new_columns = ['additional_tables']
dxf_columns = ['dxf_total_attributes', 'dxf_total_text_entities']
# 기존 컬럼 순서를 유지하면서 새로운 컬럼을 추가
existing_columns = [col for col in column_order if col in df.columns]
new_columns = [col for col in df.columns if col not in existing_columns]
df = df[existing_columns + sorted(new_columns)]
potential_columns = base_columns + pdf_metadata_columns + dxf_columns + new_columns
# UTF-8 BOM으로 저장 (한글 호환성)
final_ordered_columns = [col for col in potential_columns if col in df.columns]
remaining_columns = sorted([col for col in df.columns if col not in final_ordered_columns])
df = df[final_ordered_columns + remaining_columns]
# UTF-8 BOM으로 저장
df.to_csv(output_path, index=False, encoding='utf-8-sig')
logger.info(f"CSV 저장 완료: {output_path}")
logger.info(f"{len(data_rows)}파일 결과 저장")
logger.info(f"{len(df)} 결과 저장")
except Exception as e:
logger.error(f"CSV 저장 오류: {str(e)}")
raise
async def save_results_to_json(self, output_path: str) -> None:
"""
처리 결과를 JSON 파일로 저장 (좌표 정보 포함)
Args:
output_path: 출력 JSON 파일 경로
"""
try:
# 결과를 JSON 구조로 변환
json_data = {
"metadata": {
"total_files": len(self.processing_results),
"success_files": sum(1 for r in self.processing_results if r.success),
"failed_files": sum(1 for r in self.processing_results if not r.success),
"generated_at": datetime.now().isoformat(),
"format_version": "1.0"
},
"results": []
}
for result in self.processing_results:
# 기본 정보
result_data = {
"file_info": {
"name": result.file_name,
"path": result.file_path,
"type": result.file_type,
"size_bytes": result.file_size,
"size_mb": round(result.file_size / (1024 * 1024), 2)
},
"processing_info": {
"success": result.success,
"processing_time_seconds": result.processing_time,
"processed_at": result.processed_at,
"error_message": result.error_message
}
}
# PDF 분석 결과 (좌표 정보 포함)
if result.file_type.lower() == 'pdf' and result.pdf_analysis_result:
try:
# JSON 문자열을 딕셔너리로 변환 (이미 딕셔너리인 경우도 처리)
if isinstance(result.pdf_analysis_result, str):
analysis_data = json.loads(result.pdf_analysis_result)
else:
analysis_data = result.pdf_analysis_result
result_data["pdf_analysis"] = analysis_data
except (json.JSONDecodeError, TypeError) as e:
logger.warning(f"PDF 분석 결과 JSON 파싱 오류: {e}")
result_data["pdf_analysis"] = {"error": "JSON 파싱 실패", "raw_data": str(result.pdf_analysis_result)}
# DXF 분석 결과
elif result.file_type.lower() == 'dxf':
result_data["dxf_analysis"] = {
"total_attributes": result.dxf_total_attributes or 0,
"total_text_entities": result.dxf_total_text_entities or 0,
"title_blocks": result.dxf_title_blocks or []
}
json_data["results"].append(result_data)
# JSON 파일 저장 (예쁜 포맷팅과 한글 지원)
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(json_data, f, ensure_ascii=False, indent=2, default=str)
logger.info(f"JSON 저장 완료: {output_path}")
logger.info(f"{len(json_data['results'])}개 파일 결과 저장 (좌표 정보 포함)")
except Exception as e:
logger.error(f"JSON 저장 오류: {str(e)}")
raise
def get_processing_summary(self) -> Dict[str, Any]:
"""처리 결과 요약 정보 반환"""
if not self.processing_results:
@@ -484,7 +571,7 @@ if __name__ == "__main__":
processor = MultiFileProcessor("your-gemini-api-key")
config = BatchProcessingConfig(
organization_type="국토교통부",
organization_type="한국도로공사",
max_concurrent_files=2,
output_csv_path="test_results.csv"
)