Upload converter pipeline: step1_convert.py

This commit is contained in:
2026-03-19 09:40:04 +09:00
parent f22fd99fec
commit dd936b230b

View File

@@ -1,10 +1,11 @@
""" """
측량/GIS/도로 분야 전용 PDF 변환 처리 스크립트 측량/GIS/드론 관련 자료 PDF 변환 및 정리 시스템
- 모든 파일 형식을 PDF로 변환
- DWG 파일: DWG TrueView를 사용한 자동 PDF 변환
- 동영상 파일: Whisper를 사용한 음성→텍스트 변환 후 PDF 생성
- 원본 경로와 변환 파일 경로를 엑셀로 관리
"""
- 모든 파일은 PDF로 변환하여 사용함
- 지원 형식: DWG, DXF, XLSX, XLS, PPTX, PPT, DOCX, DOC, TXT, 이미지(JPG, PNG), 비디오(MP4 등)
- 비디오 파일은 음성을 텍스트로 변환(STT)하여 PDF 생성
"""
import os import os
import shutil import shutil
from pathlib import Path from pathlib import Path
@@ -35,7 +36,7 @@ class SurveyingFileConverter:
try: try:
import imageio_ffmpeg import imageio_ffmpeg
src = Path(imageio_ffmpeg.get_ffmpeg_exe()) src = Path(imageio_ffmpeg.get_ffmpeg_exe())
self._dbg(f"DEBUG imageio ffmpeg exe: {src}") self._dbg(f"DEBUG imageio ffmpeg exe: {src}")
self._dbg(f"DEBUG imageio ffmpeg exists: {src.exists()}") self._dbg(f"DEBUG imageio ffmpeg exists: {src.exists()}")
@@ -77,32 +78,32 @@ class SurveyingFileConverter:
ok = self._ensure_ffmpeg_on_path() ok = self._ensure_ffmpeg_on_path()
self._dbg(f"DEBUG ensure_ffmpeg_on_path result: {ok}") self._dbg(f"DEBUG ensure_ffmpeg_on_path result: {ok}")
# 변환 기록 리스트 # 변환 로그를 저장할 리스트
self.conversion_log = [] self.conversion_log = []
# 지원 확장자 정의 # ★ 추가: 도메인 용어 사전
self.office_extensions = ['.docx', '.doc', '.xlsx', '.xls', '.pptx', '.ppt']
self.image_extensions = ['.jpg', '.jpeg', '.png', '.bmp', '.tiff']
self.cad_extensions = ['.dwg', '.dxf']
self.video_extensions = ['.mp4', '.avi', '.mkv', '.mov', '.m4a', '.wav', '.mp3']
self.text_extensions = ['.txt', '.md', '.csv']
self.pdf_extension = ['.pdf']
# STT 초기화: 측량 전문 용어 사전 로드 예정
self.domain_terms = "" self.domain_terms = ""
# HWP 보안 모듈 등록 시도 # HWP 보안 모듈 후보 목록 추가
self.hwp_security_modules = [ self.hwp_security_modules = [
"FilePathCheckerModuleExample", "FilePathCheckerModuleExample",
"SecurityModule", "SecurityModule",
"" ""
] ]
# DWG 변환용 뷰어 설치 확인 (설치 경로 조절 가능) # 지원 파일 확장자 정의
self.image_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.gif', '.tiff', '.tif', '.webp'}
self.office_extensions = {'.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx', '.hwp', '.hwpx'}
self.video_extensions = {'.mp4', '.avi', '.mov', '.wmv', '.flv', '.mkv', '.m4v'}
self.text_extensions = {'.txt', '.csv', '.log', '.md'}
self.pdf_extension = {'.pdf'}
self.dwg_extensions = {'.dwg', '.dxf'}
# DWG TrueView 경로 설정 (설치 버전에 맞게 조정)
self.trueview_path = self._find_trueview() self.trueview_path = self._find_trueview()
def _find_trueview(self): def _find_trueview(self):
"""DWG TrueView 설치 경로 탐색""" """DWG TrueView 설치 경로 자동 탐색"""
possible_paths = [ possible_paths = [
r"C:\Program Files\Autodesk\DWG TrueView 2025\dwgviewr.exe", r"C:\Program Files\Autodesk\DWG TrueView 2025\dwgviewr.exe",
r"C:\Program Files\Autodesk\DWG TrueView 2024\dwgviewr.exe", r"C:\Program Files\Autodesk\DWG TrueView 2024\dwgviewr.exe",
@@ -110,21 +111,21 @@ class SurveyingFileConverter:
r"C:\Program Files (x86)\Autodesk\DWG TrueView 2025\dwgviewr.exe", r"C:\Program Files (x86)\Autodesk\DWG TrueView 2025\dwgviewr.exe",
r"C:\Program Files (x86)\Autodesk\DWG TrueView 2024\dwgviewr.exe", r"C:\Program Files (x86)\Autodesk\DWG TrueView 2024\dwgviewr.exe",
] ]
for path in possible_paths: for path in possible_paths:
if Path(path).exists(): if Path(path).exists():
return path return path
return None return None
def get_all_files(self): def get_all_files(self):
"""폴더 내 모든 파일 가져오기""" """하위 모든 폴더의 파일 목록 가져오기"""
all_files = [] all_files = []
for file_path in self.source_dir.rglob('*'): for file_path in self.source_dir.rglob('*'):
if file_path.is_file(): if file_path.is_file():
all_files.append(file_path) all_files.append(file_path)
return all_files return all_files
def extract_audio_from_video(self, video_path, audio_output_path): def extract_audio_from_video(self, video_path, audio_output_path):
try: try:
import imageio_ffmpeg import imageio_ffmpeg
@@ -160,42 +161,42 @@ class SurveyingFileConverter:
except Exception as e: except Exception as e:
self._dbg(f"DEBUG extract exception: {e}") self._dbg(f"DEBUG extract exception: {e}")
return False return False
def transcribe_audio_with_whisper(self, audio_path): def transcribe_audio_with_whisper(self, audio_path):
try: try:
self._ensure_ffmpeg_on_path() self._ensure_ffmpeg_on_path()
import shutil import shutil
from pathlib import Path from pathlib import Path
ffmpeg_path = shutil.which("ffmpeg") ffmpeg_path = shutil.which("ffmpeg")
self._dbg(f"DEBUG whisper ffmpeg which: {ffmpeg_path}") self._dbg(f"DEBUG whisper ffmpeg which: {ffmpeg_path}")
if not ffmpeg_path: if not ffmpeg_path:
if self.ffmpeg_exe: if self.ffmpeg_exe:
import os import os
os.environ["PATH"] = str(Path(self.ffmpeg_exe).parent) + os.pathsep + os.environ.get("PATH", "") os.environ["PATH"] = str(Path(self.ffmpeg_exe).parent) + os.pathsep + os.environ.get("PATH", "")
audio_file = Path(audio_path) audio_file = Path(audio_path)
self._dbg(f"DEBUG whisper audio exists: {audio_file.exists()}") self._dbg(f"DEBUG whisper audio exists: {audio_file.exists()}")
self._dbg(f"DEBUG whisper audio size: {audio_file.stat().st_size if audio_file.exists() else 'NA'}") self._dbg(f"DEBUG whisper audio size: {audio_file.stat().st_size if audio_file.exists() else 'NA'}")
if not audio_file.exists() or audio_file.stat().st_size == 0: if not audio_file.exists() or audio_file.stat().st_size == 0:
return "[음성 데이터 추출 결과 없음]" return "[오디오 파일이 비어있거나 존재하지 않음]"
import whisper import whisper
model = whisper.load_model("medium") # 기본 base 에서 medium으로 변경 model = whisper.load_model("medium") # base medium 변경
# 전문 용어 domain_terms를 initial_prompt로 사용 # domain_terms를 initial_prompt로 사용
result = model.transcribe( result = model.transcribe(
str(audio_path), str(audio_path),
language="ko", language="ko",
task="transcribe", task="transcribe",
initial_prompt=self.domain_terms if self.domain_terms else None, initial_prompt=self.domain_terms if self.domain_terms else None,
condition_on_previous_text=True, # 옵션 설정 True로 condition_on_previous_text=True, # ★ 다시 True로
) )
# 후처리: 반복 문구 등 제거 시도 # 후처리: 반복 및 이상한 텍스트 제거
text = result["text"] text = result["text"]
text = self.clean_transcript(text) text = self.clean_transcript(text)
return text return text
@@ -206,14 +207,14 @@ class SurveyingFileConverter:
return f"[음성 인식 실패: {str(e)}]" return f"[음성 인식 실패: {str(e)}]"
def clean_transcript(self, text): def clean_transcript(self, text):
"""Whisper 결과 후처리 - 복/환각 제거""" """Whisper 결과 후처리 - 복/환각 제거"""
import re import re
# 1. 영문/한문/중국어 제거 # 1. 영어/일본어/중국어 환각 제거
text = re.sub(r'[A-Za-z]{3,}', '', text) # 3글자 이상 영 제거 text = re.sub(r'[A-Za-z]{3,}', '', text) # 3글자 이상 영 제거
text = re.sub(r'[\u3040-\u309F\u30A0-\u30FF]+', '', text) # 일어 제거 text = re.sub(r'[\u3040-\u309F\u30A0-\u30FF]+', '', text) # 일어 제거
text = re.sub(r'[\u4E00-\u9FFF]+', '', text) # 한자 제거 (중국어) text = re.sub(r'[\u4E00-\u9FFF]+', '', text) # 한자 제거 (필요시)
# 2. 반복 문장 제거 # 2. 반복 문장 제거
sentences = text.split('.') sentences = text.split('.')
seen = set() seen = set()
@@ -223,63 +224,67 @@ class SurveyingFileConverter:
if s_clean and s_clean not in seen: if s_clean and s_clean not in seen:
seen.add(s_clean) seen.add(s_clean)
unique_sentences.append(s_clean) unique_sentences.append(s_clean)
text = '. '.join(unique_sentences) text = '. '.join(unique_sentences)
# 3. 공백 정리 # 3. 이상한 문자 정리
text = re.sub(r'\s+', ' ', text) # 다중 공백 제거 text = re.sub(r'\s+', ' ', text) # 다중 공백 제거
text = text.strip() text = text.strip()
return text return text
def get_video_transcript(self, video_path): def get_video_transcript(self, video_path):
"""동영상 파일을 텍스트로 변환""" """동영상 파일의 음성을 텍스트로 변환"""
try: try:
# 임시 오디오 파일 경로
temp_audio = video_path.parent / f"{video_path.stem}_temp_audio.wav" temp_audio = video_path.parent / f"{video_path.stem}_temp_audio.wav"
# 1. 오디오 추출 # 1. 동영상에서 오디오 추출
if not self.extract_audio_from_video(video_path, temp_audio): if not self.extract_audio_from_video(video_path, temp_audio):
return self.get_basic_file_info(video_path) + "\n\n[오디오 추출 실패]" return self.get_basic_file_info(video_path) + "\n\n[오디오 추출 실패]"
if (not temp_audio.exists()) or temp_audio.stat().st_size == 0: if (not temp_audio.exists()) or temp_audio.stat().st_size == 0:
return self.get_basic_file_info(video_path) + "\n\n[오디오 파일 생성 실패]" return self.get_basic_file_info(video_path) + "\n\n[오디오 파일 생성 실패]"
# 2. Whisper로 텍스트 변환 # 2. Whisper로 음성 인식
transcript = self.transcribe_audio_with_whisper(temp_audio) transcript = self.transcribe_audio_with_whisper(temp_audio)
# 3. 임시 파일 삭제 # 3. 임시 오디오 파일 삭제
if temp_audio.exists(): if temp_audio.exists():
temp_audio.unlink() temp_audio.unlink()
# 4. 결과 포맷팅 # 4. 결과 포맷팅
stat = video_path.stat() stat = video_path.stat()
lines = [] lines = []
lines.append(f"동영상 파일 분석 결과 (Speech-to-Text)") lines.append(f"동영상 파일 음성 전사 (Speech-to-Text)")
lines.append(f"=" * 60) lines.append(f"=" * 60)
lines.append(f"파일명: {video_path.name}") lines.append(f"파일명: {video_path.name}")
lines.append(f"경로: {video_path}") lines.append(f"경로: {video_path}")
lines.append(f"크기: {self.format_file_size(stat.st_size)}") lines.append(f"파일 크기: {self.format_file_size(stat.st_size)}")
lines.append(f"분석 내용:") lines.append(f"생성일: {datetime.fromtimestamp(stat.st_ctime).strftime('%Y-%m-%d %H:%M:%S')}")
lines.append("")
lines.append("=" * 60)
lines.append("음성 내용:")
lines.append("=" * 60) lines.append("=" * 60)
lines.append("") lines.append("")
lines.append(transcript) lines.append(transcript)
return "\n".join(lines) return "\n".join(lines)
except Exception as e: except Exception as e:
return self.get_basic_file_info(video_path) + f"\n\n[분석 오류 발생: {str(e)}]" return self.get_basic_file_info(video_path) + f"\n\n[음성 인식 오류: {str(e)}]"
def convert_dwg_to_pdf(self, dwg_path, pdf_path): def convert_dwg_to_pdf_trueview(self, dwg_path, pdf_path):
"""TrueView를 사용하여 DWG 파일을 PDF 변환 시도""" """DWG TrueView를 사용 DWG PDF 변환"""
if not self.trueview_path: if not self.trueview_path:
return False, "DWG TrueView를 찾을 수 없습니다." return False, "DWG TrueView가 설치되지 않음"
try: try:
# AutoCAD 스크립트 # AutoCAD 스크립트
script_content = f"""_-EXPORT_PDF{pdf_path}_Y""" script_content = f"""_-EXPORT_PDF{pdf_path}_Y"""
script_path = dwg_path.parent / f"{dwg_path.stem}_plot.scr" script_path = dwg_path.parent / f"{dwg_path.stem}_plot.scr"
with open(script_path, 'w') as f: with open(script_path, 'w') as f:
f.write(script_content) f.write(script_content)
# TrueView 실행 # TrueView 실행
cmd = [ cmd = [
self.trueview_path, self.trueview_path,
@@ -287,59 +292,55 @@ class SurveyingFileConverter:
"/b", str(script_path.absolute()), "/b", str(script_path.absolute()),
"/nologo" "/nologo"
] ]
result = subprocess.run(cmd, timeout=120, capture_output=True) result = subprocess.run(cmd, timeout=120, capture_output=True)
# 스크립트 삭제 # 스크립트 파일 삭제
if script_path.exists(): if script_path.exists():
try: try:
script_path.unlink() script_path.unlink()
except: except:
pass pass
# PDF 생성 확인 # PDF 생성 확인
if pdf_path.exists() and pdf_path.stat().st_size > 0: if pdf_path.exists():
return True, "성공" return True, "성공"
else: else:
return False, "PDF 파일 생성 실패" return False, "PDF 생성 실패"
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
return False, "변환 시간 초과" return False, "변환 시간 초과"
except Exception as e: except Exception as e:
return False, f"DWG 변환 오류: {str(e)}" return False, f"DWG 변환 실패: {str(e)}"
def get_basic_file_info(self, file_path): def get_basic_file_info(self, file_path):
"""기본 파일 정보 반환""" """기본 파일 정보 반환"""
stat = file_path.stat() stat = file_path.stat()
lines = [] lines = []
lines.append(f"파일 상세 정보") lines.append(f"파일 정보")
lines.append(f"=" * 60) lines.append(f"=" * 60)
lines.append(f"파일명: {file_path.name}") lines.append(f"파일명: {file_path.name}")
lines.append(f"경로: {file_path}") lines.append(f"경로: {file_path}")
lines.append(f"크기: {self.format_file_size(stat.st_size)}") lines.append(f"파일 크기: {self.format_file_size(stat.st_size)}")
lines.append(f"확장자: {file_path.suffix}")
lines.append(f"생성일: {datetime.fromtimestamp(stat.st_ctime).strftime('%Y-%m-%d %H:%M:%S')}") lines.append(f"생성일: {datetime.fromtimestamp(stat.st_ctime).strftime('%Y-%m-%d %H:%M:%S')}")
lines.append(f"수정일: {datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S')}") lines.append(f"수정일: {datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S')}")
lines.append("=" * 60)
return "\n".join(lines) return "\n".join(lines)
def format_file_size(self, size_bytes): def format_file_size(self, size_bytes):
"""파일 크기 포맷팅""" """파일 크기를 읽기 쉬운 형식으로 변환"""
if size_bytes == 0: return "0B" for unit in ['B', 'KB', 'MB', 'GB']:
units = ("B", "KB", "MB", "GB", "TB") if size_bytes < 1024.0:
import math return f"{size_bytes:.2f} {unit}"
i = int(math.floor(math.log(size_bytes, 1024))) size_bytes /= 1024.0
p = math.pow(1024, i) return f"{size_bytes:.2f} TB"
s = round(size_bytes / p, 2)
return "%s %s" % (s, units[i])
def convert_image_to_pdf(self, image_path, output_path): def convert_image_to_pdf(self, image_path, output_path):
"""이미지 파일을 PDF로 변환""" """이미지 파일을 PDF로 변환"""
try: try:
img = Image.open(image_path) img = Image.open(image_path)
# RGBA 모드일 경우 RGB로 변환 (PDF는 투명 모드를 다른 방식으로 처리) # RGB 모드로 변환 (RGBA나 다른 모드 처리)
if img.mode in ('RGBA', 'LA', 'P'): if img.mode in ('RGBA', 'LA', 'P'):
# 흰 배경 추가 # 흰 배경 생성
background = Image.new('RGB', img.size, (255, 255, 255)) background = Image.new('RGB', img.size, (255, 255, 255))
if img.mode == 'P': if img.mode == 'P':
img = img.convert('RGBA') img = img.convert('RGBA')
@@ -347,87 +348,96 @@ class SurveyingFileConverter:
img = background img = background
elif img.mode != 'RGB': elif img.mode != 'RGB':
img = img.convert('RGB') img = img.convert('RGB')
img.save(output_path, 'PDF', resolution=100.0) img.save(output_path, 'PDF', resolution=100.0)
return True, "성공" return True, "성공"
except Exception as e: except Exception as e:
return False, f"이미지 변환 오류: {str(e)}" return False, f"이미지 변환 실패: {str(e)}"
def convert_office_to_pdf(self, file_path, output_path): def convert_office_to_pdf(self, file_path, output_path):
"""Office 문서를 PDF로 변환""" """Office 문서를 PDF로 변환"""
ext = file_path.suffix.lower() pythoncom.CoInitialize()
if ext in ['.docx', '.doc']: try:
return self.convert_word_to_pdf(file_path, output_path) ext = file_path.suffix.lower()
elif ext in ['.xlsx', '.xls']:
return self.convert_excel_to_pdf(file_path, output_path) if ext in {'.hwp', '.hwpx'}:
elif ext in ['.pptx', '.ppt']: return self.convert_hwp_to_pdf(file_path, output_path)
return self.convert_ppt_to_pdf(file_path, output_path) elif ext in {'.doc', '.docx'}:
else: return self.convert_word_to_pdf(file_path, output_path)
return False, "지원되지 않는 Office 형식" elif ext in {'.xls', '.xlsx'}:
return self.convert_excel_to_pdf(file_path, output_path)
elif ext in {'.ppt', '.pptx'}:
return self.convert_ppt_to_pdf(file_path, output_path)
else:
return False, "지원하지 않는 Office 형식"
except Exception as e:
return False, f"Office 변환 실패: {str(e)}"
finally:
pythoncom.CoUninitialize()
def convert_word_to_pdf(self, file_path, output_path): def convert_word_to_pdf(self, file_path, output_path):
"""Word 문서를 PDF로 변환""" """Word 문서를 PDF로 변환"""
word = None
try: try:
pythoncom.CoInitialize()
word = win32com.client.Dispatch("Word.Application") word = win32com.client.Dispatch("Word.Application")
word.Visible = False word.Visible = False
doc = word.Documents.Open(str(file_path.absolute())) doc = word.Documents.Open(str(file_path.absolute()))
doc.SaveAs(str(output_path.absolute()), FileFormat=17) # 17 = wdExportFormatPDF doc.SaveAs(str(output_path.absolute()), FileFormat=17) # 17 = PDF
doc.Close() doc.Close()
word.Quit() word.Quit()
return True, "성공" return True, "성공"
except Exception as e: except Exception as e:
return False, f"Word 변환 오류: {str(e)}" return False, f"Word 변환 실패: {str(e)}"
finally:
pythoncom.CoUninitialize()
def convert_excel_to_pdf(self, file_path, output_path): def convert_excel_to_pdf(self, file_path, output_path):
"""Excel 파일을 PDF로 변환 - 한 페이지에 출력되도록 조정""" """Excel 파일을 PDF로 변환 - 열 너비에 맞춰 출력"""
try: try:
excel = win32com.client.Dispatch("Excel.Application") excel = win32com.client.Dispatch("Excel.Application")
excel.Visible = False excel.Visible = False
wb = excel.Workbooks.Open(str(file_path.absolute())) wb = excel.Workbooks.Open(str(file_path.absolute()))
# 모든 시트 인쇄 영역 설정 # 모든 시트에 대해 페이지 설정
for ws in wb.Worksheets: for ws in wb.Worksheets:
# 페이지 설정 # 페이지 설정
ws.PageSetup.Zoom = False # 자동 배율 조정 비활성화 ws.PageSetup.Zoom = False # 자동 크기 조정 비활성화
ws.PageSetup.FitToPagesWide = 1 # 가로 너비를 1페이지에 맞춤 ws.PageSetup.FitToPagesWide = 1 # 너비를 1페이지에 맞춤
ws.PageSetup.FitToPagesTall = False # 세로 길이는 내용에 맞춤 ws.PageSetup.FitToPagesTall = False # 높이는 자동 (내용에 따라)
# 여백 최소화 (단위: 포인트, 1cm ≈ 28.35 포인트)
ws.PageSetup.LeftMargin = excel.CentimetersToPoints(1) ws.PageSetup.LeftMargin = excel.CentimetersToPoints(1)
ws.PageSetup.RightMargin = excel.CentimetersToPoints(1) ws.PageSetup.RightMargin = excel.CentimetersToPoints(1)
ws.PageSetup.TopMargin = excel.CentimetersToPoints(1) ws.PageSetup.TopMargin = excel.CentimetersToPoints(1)
ws.PageSetup.BottomMargin = excel.CentimetersToPoints(1) ws.PageSetup.BottomMargin = excel.CentimetersToPoints(1)
# 용지 방향 자동 결정 (가로가 긴 경우 가로 방향) # 용지 방향 자동 결정 (가로가 긴 경우 가로 방향)
used_range = ws.UsedRange used_range = ws.UsedRange
if used_range.Columns.Count > used_range.Rows.Count: if used_range.Columns.Count > used_range.Rows.Count:
ws.PageSetup.Orientation = 2 # xlLandscape (가로) ws.PageSetup.Orientation = 2 # xlLandscape (가로)
else: else:
ws.PageSetup.Orientation = 1 # xlPortrait (세로) ws.PageSetup.Orientation = 1 # xlPortrait (세로)
# PDF로 저장 # PDF로 저장
wb.ExportAsFixedFormat(0, str(output_path.absolute())) # 0 = PDF wb.ExportAsFixedFormat(0, str(output_path.absolute())) # 0 = PDF
wb.Close() wb.Close()
excel.Quit() excel.Quit()
return True, "성공" return True, "성공"
except Exception as e: except Exception as e:
return False, f"Excel 변환 오류: {str(e)}" return False, f"Excel 변환 실패: {str(e)}"
def convert_ppt_to_pdf(self, file_path, output_path): def convert_ppt_to_pdf(self, file_path, output_path):
"""PowerPoint 파일을 PDF로 변환""" """PowerPoint 파일을 PDF로 변환"""
try: try:
ppt = win32com.client.Dispatch("PowerPoint.Application") ppt = win32com.client.Dispatch("PowerPoint.Application")
presentation = ppt.Presentations.Open(str(file_path.absolute()), WithWindow=False) ppt.Visible = True
presentation.SaveAs(str(output_path.absolute()), 32) # 32 = ppSaveAsPDF presentation = ppt.Presentations.Open(str(file_path.absolute()))
presentation.SaveAs(str(output_path.absolute()), 32) # 32 = PDF
presentation.Close() presentation.Close()
ppt.Quit() ppt.Quit()
return True, "성공" return True, "성공"
except Exception as e: except Exception as e:
return False, f"PowerPoint 변환 오류: {str(e)}" return False, f"PowerPoint 변환 실패: {str(e)}"
def convert_hwp_to_pdf(self, file_path, output_path): def convert_hwp_to_pdf(self, file_path, output_path):
hwp = None hwp = None
try: try:
@@ -461,10 +471,10 @@ class SurveyingFileConverter:
if output_path.exists() and output_path.stat().st_size > 0: if output_path.exists() and output_path.stat().st_size > 0:
return True, "성공" return True, "성공"
return False, "PDF 파일 생성 실패" return False, "PDF 생성 확인 실패"
except Exception as e: except Exception as e:
return False, f"HWP 변환 오류: {str(e)}" return False, f"HWP 변환 실패: {str(e)}"
finally: finally:
try: try:
if hwp: if hwp:
@@ -482,48 +492,43 @@ class SurveyingFileConverter:
def convert_text_to_pdf(self, text_path, output_path): def convert_text_to_pdf(self, text_path, output_path):
"""텍스트 파일을 PDF로 변환 (ReportLab 사용)""" """텍스트 파일을 PDF로 변환 (reportlab 사용)"""
try: try:
from reportlab.lib.pagesizes import A4 from reportlab.lib.pagesizes import A4
from reportlab.pdfgen import canvas from reportlab.pdfgen import canvas
from reportlab.pdfbase import pdfmetrics from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont from reportlab.pdfbase.ttfonts import TTFont
# 한글 폰트 등록 (윈도우 기본 폰트 사용) # 한글 폰트 등록 (시스템에 설치된 폰트 사용)
try: try:
pdfmetrics.registerFont(TTFont('Malgun', 'malgun.ttf')) pdfmetrics.registerFont(TTFont('Malgun', 'malgun.ttf'))
font_name = 'Malgun' font_name = 'Malgun'
except: except:
font_name = 'Helvetica' font_name = 'Helvetica'
# 텍스트 내용 읽기 # 텍스트 읽기
content = "" with open(text_path, 'r', encoding='utf-8', errors='ignore') as f:
for encoding in ['utf-8', 'cp949', 'euc-kr']: content = f.read()
try:
with open(text_path, 'r', encoding=encoding) as f: # PDF 생성
content = f.read()
break
except:
continue
c = canvas.Canvas(str(output_path), pagesize=A4) c = canvas.Canvas(str(output_path), pagesize=A4)
width, height = A4 width, height = A4
c.setFont(font_name, 10) c.setFont(font_name, 10)
# 여백 설정 # 여백 설정
margin = 50 margin = 50
y = height - margin y = height - margin
line_height = 14 line_height = 14
# 줄 단위로 처리 # 줄 단위로 처리
for line in content.split('\n'): for line in content.split('\n'):
if y < margin: # 다음 페이지로 if y < margin: # 페이지 넘김
c.showPage() c.showPage()
c.setFont(font_name, 10) c.setFont(font_name, 10)
y = height - margin y = height - margin
# 긴 줄 자동 줄바꿈 # 긴 줄 자동으로 줄바꿈
if len(line) > 100: if len(line) > 100:
chunks = [line[i:i+100] for i in range(0, len(line), 100)] chunks = [line[i:i+100] for i in range(0, len(line), 100)]
for chunk in chunks: for chunk in chunks:
@@ -532,120 +537,125 @@ class SurveyingFileConverter:
else: else:
c.drawString(margin, y, line) c.drawString(margin, y, line)
y -= line_height y -= line_height
c.save() c.save()
return True, "성공" return True, "성공"
except Exception as e: except Exception as e:
return False, f"텍스트 변환 오류: {str(e)}" return False, f"텍스트 변환 실패: {str(e)}"
def process_file(self, file_path): def process_file(self, file_path):
"""개별 파일 변환 처리""" """개별 파일 처리"""
ext = file_path.suffix.lower() ext = file_path.suffix.lower()
# 출력 폴더 경로 계산 (원본 폴더 구조 유지) # 출력 파일명 생성 (원본 경로 구조 유지)
relative_path = file_path.relative_to(self.source_dir) relative_path = file_path.relative_to(self.source_dir)
output_subdir = self.output_dir / relative_path.parent output_subdir = self.output_dir / relative_path.parent
output_subdir.mkdir(parents=True, exist_ok=True) output_subdir.mkdir(parents=True, exist_ok=True)
# PDF 변환 결과 경로 # PDF 파일명
output_pdf = output_subdir / f"{file_path.stem}.pdf" output_pdf = output_subdir / f"{file_path.stem}.pdf"
success = False success = False
message = "" message = ""
try: try:
# 이미 PDF인 경우 복사 # 이미 PDF인 경우
if ext in self.pdf_extension: if ext in self.pdf_extension:
shutil.copy2(file_path, output_pdf) shutil.copy2(file_path, output_pdf)
success = True success = True
message = "PDF 복사 완료" message = "PDF 복사 완료"
# DWG/DXF 처리 # DWG/DXF 파일
elif ext in self.cad_extensions: elif ext in self.dwg_extensions:
success, message = self.convert_dwg_to_pdf(file_path, output_pdf) success, message = self.convert_dwg_to_pdf_trueview(file_path, output_pdf)
# Office 파일 처리 # 이미지 파일
elif ext in self.office_extensions:
success, message = self.convert_office_to_pdf(file_path, output_pdf)
# 이미지 파일 처리
elif ext in self.image_extensions: elif ext in self.image_extensions:
success, message = self.convert_image_to_pdf(file_path, output_pdf) success, message = self.convert_image_to_pdf(file_path, output_pdf)
# 동영상/음성 파일 처리 # Office 문서
elif ext in self.office_extensions:
success, message = self.convert_office_to_pdf(file_path, output_pdf)
# 동영상 파일 - 음성을 텍스트로 변환 후 PDF 생성
elif ext in self.video_extensions: elif ext in self.video_extensions:
# 텍스트 추출 후 TXT 생성 후 PDF 변환 # 음성→텍스트 변환
transcript_text = self.get_video_transcript(file_path) transcript_text = self.get_video_transcript(file_path)
# 임시 txt 파일 생성
temp_txt = output_subdir / f"{file_path.stem}_transcript.txt" temp_txt = output_subdir / f"{file_path.stem}_transcript.txt"
with open(temp_txt, 'w', encoding='utf-8') as f: with open(temp_txt, 'w', encoding='utf-8') as f:
f.write(transcript_text) f.write(transcript_text)
# txt를 PDF로 변환 # txt를 PDF로 변환
success, message = self.convert_text_to_pdf(temp_txt, output_pdf) success, message = self.convert_text_to_pdf(temp_txt, output_pdf)
if success: message = "음성 추출 및 PDF 변환 성공 (txt 보관됨)"
if success:
# 원본 txt는 보관함 (참고용) message = "성공 (음성 인식 완료)"
# 텍스트 파일 처리 # 임시 txt 파일은 남겨둠 (참고용)
# 텍스트 파일
elif ext in self.text_extensions: elif ext in self.text_extensions:
success, message = self.convert_text_to_pdf(file_path, output_pdf) success, message = self.convert_text_to_pdf(file_path, output_pdf)
# 기타 지원되지 않는 파일은 기본 정보만 담은 PDF 생성 고려 (현재는 생략)
else: else:
message = f"지원지 않는 형식 ({ext})" message = f"지원지 않는 파일 형식: {ext}"
except Exception as e: except Exception as e:
message = f"오류 발생: {str(e)}" message = f"처리 중 오류: {str(e)}"
# 로그 기록 # 로그 기록
self.conversion_log.append({ self.conversion_log.append({
'원본 경로': str(file_path), '원본 경로': str(file_path),
'파일명': file_path.name, '파일명': file_path.name,
'변환 경로': str(output_pdf) if success else "", '파일 형식': ext,
'변환 PDF 경로': str(output_pdf) if success else "",
'상태': "성공" if success else "실패", '상태': "성공" if success else "실패",
'메시지': message, '메시지': message,
'처리 시간': datetime.now().strftime('%Y-%m-%d %H:%M:%S') '처리 시간': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}) })
return success, message return success, message
def create_excel_report(self, excel_path): def create_excel_report(self, excel_path):
"""변환 결과를 엑셀 파일로 리포트 생성""" """변환 결과를 엑셀로 저장"""
wb = openpyxl.Workbook() wb = openpyxl.Workbook()
ws = wb.active ws = wb.active
ws.title = "변환 결과" ws.title = "변환 결과"
# 헤더 스타일 설정 # 헤더 스타일
header_fill = PatternFill(start_color="1F4E78", end_color="1F4E78", fill_type="solid") header_fill = PatternFill(start_color="366092", end_color="366092", fill_type="solid")
header_font = Font(color="FFFFFF", bold=True) header_font = Font(bold=True, color="FFFFFF")
# 헤더 작성 # 헤더 작성
headers = ['번호', '원본 경로', '파일명', '변환 경로', '상태', '메시지', '처리 시간'] headers = ['번호', '원본 경로', '파일명', '파일 형식', '변환 PDF 경로', '상태', '메시지', '처리 시간']
for col, header in enumerate(headers, 1): for col, header in enumerate(headers, 1):
cell = ws.cell(row=1, column=col, value=header) cell = ws.cell(row=1, column=col, value=header)
cell.fill = header_fill cell.fill = header_fill
cell.font = header_font cell.font = header_font
cell.alignment = Alignment(horizontal='center', vertical='center') cell.alignment = Alignment(horizontal='center', vertical='center')
# 데이터 작성 # 데이터 작성
for idx, log in enumerate(self.conversion_log, 2): for idx, log in enumerate(self.conversion_log, 2):
ws.cell(row=idx, column=1, value=idx-1) ws.cell(row=idx, column=1, value=idx-1)
ws.cell(row=idx, column=2, value=log['원본 경로']) ws.cell(row=idx, column=2, value=log['원본 경로'])
ws.cell(row=idx, column=3, value=log['파일명']) ws.cell(row=idx, column=3, value=log['파일명'])
ws.cell(row=idx, column=4, value=log['변환 경로']) ws.cell(row=idx, column=4, value=log['파일 형식'])
ws.cell(row=idx, column=5, value=log['변환 PDF 경로'])
# 상태에 따른 색상 지정
status_cell = ws.cell(row=idx, column=5, value=log['상태']) # 상태에 따라 색상 표시
status_cell = ws.cell(row=idx, column=6, value=log['상태'])
if log['상태'] == "성공": if log['상태'] == "성공":
status_cell.fill = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid") status_cell.fill = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid")
status_cell.font = Font(color="006100") status_cell.font = Font(color="006100")
else: else:
status_cell.fill = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid") status_cell.fill = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid")
status_cell.font = Font(color="9C0006") status_cell.font = Font(color="9C0006")
ws.cell(row=idx, column=6, value=log['메시지']) ws.cell(row=idx, column=7, value=log['메시지'])
ws.cell(row=idx, column=7, value=log['처리 시간']) ws.cell(row=idx, column=8, value=log['처리 시간'])
# 열 너비 자동 조정 # 열 너비 자동 조정
for column in ws.columns: for column in ws.columns:
max_length = 0 max_length = 0
@@ -658,14 +668,14 @@ class SurveyingFileConverter:
pass pass
adjusted_width = min(max_length + 2, 50) adjusted_width = min(max_length + 2, 50)
ws.column_dimensions[column_letter].width = adjusted_width ws.column_dimensions[column_letter].width = adjusted_width
# 전체 요약 시트 추가 # 요약 시트 추가
summary_ws = wb.create_sheet(title="요약") summary_ws = wb.create_sheet(title="요약")
total_files = len(self.conversion_log) total_files = len(self.conversion_log)
success_count = sum(1 for log in self.conversion_log if log['상태'] == "성공") success_count = sum(1 for log in self.conversion_log if log['상태'] == "성공")
fail_count = total_files - success_count fail_count = total_files - success_count
summary_data = [ summary_data = [
['항목', ''], ['항목', ''],
['총 파일 수', total_files], ['총 파일 수', total_files],
@@ -675,9 +685,9 @@ class SurveyingFileConverter:
['', ''], ['', ''],
['원본 폴더', str(self.source_dir)], ['원본 폴더', str(self.source_dir)],
['출력 폴더', str(self.output_dir)], ['출력 폴더', str(self.output_dir)],
['처리 완료 시간', datetime.now().strftime('%Y-%m-%d %H:%M:%S')] ['작업 완료 시간', datetime.now().strftime('%Y-%m-%d %H:%M:%S')]
] ]
for row_idx, row_data in enumerate(summary_data, 1): for row_idx, row_data in enumerate(summary_data, 1):
for col_idx, value in enumerate(row_data, 1): for col_idx, value in enumerate(row_data, 1):
cell = summary_ws.cell(row=row_idx, column=col_idx, value=value) cell = summary_ws.cell(row=row_idx, column=col_idx, value=value)
@@ -685,80 +695,80 @@ class SurveyingFileConverter:
cell.fill = header_fill cell.fill = header_fill
cell.font = header_font cell.font = header_font
cell.alignment = Alignment(horizontal='center' if col_idx == 1 else 'left') cell.alignment = Alignment(horizontal='center' if col_idx == 1 else 'left')
summary_ws.column_dimensions['A'].width = 20 summary_ws.column_dimensions['A'].width = 20
summary_ws.column_dimensions['B'].width = 60 summary_ws.column_dimensions['B'].width = 60
# 저장 # 저장
wb.save(excel_path) wb.save(excel_path)
print(f"\n처리 결과 보고서 생성 완료: {excel_path}") print(f"\n엑셀 보고서 생성 완료: {excel_path}")
def run(self): def run(self):
"""전체 변환 프로세스 실행""" """전체 변환 작업 실행"""
print(f"변환 작업 시작: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print(f"작업 시작: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"원본 폴더: {self.source_dir}") print(f"원본 폴더: {self.source_dir}")
print(f"출력 폴더: {self.output_dir}") print(f"출력 폴더: {self.output_dir}")
# DWG TrueView 설치 여부 확인 # DWG TrueView 확인
if self.trueview_path: if self.trueview_path:
print(f"DWG TrueView 확인됨: {self.trueview_path}") print(f"DWG TrueView 발견: {self.trueview_path}")
else: else:
print("경고: DWG TrueView를 찾을 수 없습니다. DWG 변환이 제한될 수 있습니다.") print("경고: DWG TrueView를 찾을 수 없습니다. DWG 파일 변환이 불가능합니다.")
print("-" * 80) print("-" * 80)
# 모든 파일 가져오기 # 모든 파일 가져오기
all_files = self.get_all_files() all_files = self.get_all_files()
total_files = len(all_files) total_files = len(all_files)
# 동영상 파일 vs 일반 파일 구분 # ★ 파일 분류: 동영상 vs 나머지
video_files = [] video_files = []
other_files = [] other_files = []
for file_path in all_files: for file_path in all_files:
if file_path.suffix.lower() in self.video_extensions: if file_path.suffix.lower() in self.video_extensions:
video_files.append(file_path) video_files.append(file_path)
else: else:
other_files.append(file_path) other_files.append(file_path)
print(f"\n 탐색된 파일: {total_files}") print(f"\n{total_files} 파일 발견")
print(f" - 문서/이미지 파일: {len(other_files)}") print(f" - 문서/이미지 : {len(other_files)}")
print(f" - 미디어 파일: {len(video_files)}") print(f" - 동영상: {len(video_files)}")
print("\n[1단계] 문서 및 이미지 파일 변환 ...\n") print("\n[1단계] 문서 파일 변환 시작...\n")
# 먼저 일반 파일 처리 # ★ 1단계: 문서 파일 먼저 처리
for idx, file_path in enumerate(other_files, 1): for idx, file_path in enumerate(other_files, 1):
print(f"[{idx}/{len(other_files)}] {file_path.name} 처리 중...", end=' ') print(f"[{idx}/{len(other_files)}] {file_path.name} 처리 중...", end=' ')
success, message = self.process_file(file_path) success, message = self.process_file(file_path)
print(f"{'' if success else ''} {message}") print(f"{'' if success else ''} {message}")
# 2단계: domain.txt 로드 (STT 향상용) # 2단계: domain.txt 로드
domain_path = self.source_dir.parent / "domain.txt" # D:\for python\테스트 자료(측량)\domain.txt domain_path = self.source_dir.parent / "domain.txt" # D:\for python\테스트 (측량)\domain.txt
if domain_path.exists(): if domain_path.exists():
self.domain_terms = domain_path.read_text(encoding='utf-8') self.domain_terms = domain_path.read_text(encoding='utf-8')
print(f"\n[2단계] 전문 용어 사전(측량) 로드 완료: {domain_path}") print(f"\n[2단계] 도메인 용어 사전 로드 완료: {domain_path}")
print(f" - 용어 수 {len(self.domain_terms.split())}학습됨") print(f" - 용어 수: 약 {len(self.domain_terms.split())}단어")
else: else:
print(f"\n[2단계] 전문 용어 사전(측량) 없음: {domain_path}") print(f"\n[2단계] 도메인 용어 사전 없음: {domain_path}")
print(" - 기본 음성 모델로 분석을 진행합니다.") print(" - 기본 음성 인식으로 진행합니다.")
# 3단계: 미디어 파일 처리 (Whisper STT 포함) # 3단계: 동영상 파일 처리
if video_files: if video_files:
print(f"\n[3단계] 미디어 파일 음성 분석 및 변환 중...\n") print(f"\n[3단계] 동영상 음성 인식 시작...\n")
for idx, file_path in enumerate(video_files, 1): for idx, file_path in enumerate(video_files, 1):
print(f"[{idx}/{len(video_files)}] {file_path.name} 처리 중...", end=' ') print(f"[{idx}/{len(video_files)}] {file_path.name} 처리 중...", end=' ')
success, message = self.process_file(file_path) success, message = self.process_file(file_path)
print(f"{'' if success else ''} {message}") print(f"{'' if success else ''} {message}")
# 엑셀 보고서 생성 # 엑셀 보고서 생성
excel_path = self.output_dir / f"변환결과_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx" excel_path = self.output_dir / f"변환_결과_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
self.create_excel_report(excel_path) self.create_excel_report(excel_path)
# 최종 요약 출력 # 최종 요약
success_count = sum(1 for log in self.conversion_log if log['상태'] == "성공") success_count = sum(1 for log in self.conversion_log if log['상태'] == "성공")
print("\n" + "=" * 80) print("\n" + "=" * 80)
print(f"작업 완료!") print(f"작업 완료!")
print(f" 처리 파일: {total_files}") print(f"총 파일: {total_files}")
print(f"성공: {success_count}") print(f"성공: {success_count}")
print(f"실패: {total_files - success_count}") print(f"실패: {total_files - success_count}")
print(f"성공률: {(success_count/total_files*100):.1f}%" if total_files > 0 else "0%") print(f"성공률: {(success_count/total_files*100):.1f}%" if total_files > 0 else "0%")
@@ -768,7 +778,7 @@ if __name__ == "__main__":
# 경로 설정 # 경로 설정
SOURCE_DIR = r"D:\for python\geulbeot-light\geulbeot-light\00.test\hwpx\in" SOURCE_DIR = r"D:\for python\geulbeot-light\geulbeot-light\00.test\hwpx\in"
OUTPUT_DIR = r"D:\for python\geulbeot-light\geulbeot-light\00.test\hwpx\out" OUTPUT_DIR = r"D:\for python\geulbeot-light\geulbeot-light\00.test\hwpx\out"
# 변환기 실행 # 변환기 실행
converter = SurveyingFileConverter(SOURCE_DIR, OUTPUT_DIR) converter = SurveyingFileConverter(SOURCE_DIR, OUTPUT_DIR)
converter.run() converter.run()