Upload converters/pipeline/step1_convert.py

This commit is contained in:
2026-03-19 09:13:22 +09:00
parent cdee2a7d1a
commit 4e81d68824

View File

@@ -0,0 +1,774 @@
"""
측량/GIS/도로 분야 전용 PDF 변환 처리 스크립트
- 모든 파일은 PDF로 변환하여 사용함
- 지원 형식: DWG, DXF, XLSX, XLS, PPTX, PPT, DOCX, DOC, TXT, 이미지(JPG, PNG), 비디오(MP4 등)
- 비디오 파일은 음성을 텍스트로 변환(STT)하여 PDF 생성
"""
import os
import shutil
from pathlib import Path
from datetime import datetime
import openpyxl
from openpyxl.styles import Font, PatternFill, Alignment
import win32com.client
import pythoncom
from PIL import Image
import subprocess
import json
class SurveyingFileConverter:
def _dbg(self, msg):
if getattr(self, "debug", False):
print(msg)
def _ensure_ffmpeg_on_path(self):
import os
import shutil
from pathlib import Path
found = shutil.which("ffmpeg")
self._dbg(f"DEBUG ffmpeg which before: {found}")
if found:
self.ffmpeg_exe = found
return True
try:
import imageio_ffmpeg
src = Path(imageio_ffmpeg.get_ffmpeg_exe())
self._dbg(f"DEBUG imageio ffmpeg exe: {src}")
self._dbg(f"DEBUG imageio ffmpeg exists: {src.exists()}")
if not src.exists():
return False
tools_dir = Path(self.output_dir) / "tools_ffmpeg"
tools_dir.mkdir(parents=True, exist_ok=True)
dst = tools_dir / "ffmpeg.exe"
if not dst.exists():
shutil.copyfile(str(src), str(dst))
os.environ["PATH"] = str(tools_dir) + os.pathsep + os.environ.get("PATH", "")
found2 = shutil.which("ffmpeg")
self._dbg(f"DEBUG ffmpeg which after: {found2}")
if found2:
self.ffmpeg_exe = found2
return True
return False
except Exception as e:
self._dbg(f"DEBUG ensure ffmpeg error: {e}")
return False
def __init__(self, source_dir, output_dir):
self.source_dir = Path(source_dir)
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.debug = True
self.ffmpeg_exe = None
ok = self._ensure_ffmpeg_on_path()
self._dbg(f"DEBUG ensure_ffmpeg_on_path result: {ok}")
# 변환 기록 리스트
self.conversion_log = []
# 지원 확장자 정의
self.office_extensions = ['.docx', '.doc', '.xlsx', '.xls', '.pptx', '.ppt']
self.image_extensions = ['.jpg', '.jpeg', '.png', '.bmp', '.tiff']
self.cad_extensions = ['.dwg', '.dxf']
self.video_extensions = ['.mp4', '.avi', '.mkv', '.mov', '.m4a', '.wav', '.mp3']
self.text_extensions = ['.txt', '.md', '.csv']
self.pdf_extension = ['.pdf']
# STT 초기화: 측량 전문 용어 사전 로드 예정
self.domain_terms = ""
# HWP 보안 모듈 등록 시도
self.hwp_security_modules = [
"FilePathCheckerModuleExample",
"SecurityModule",
""
]
# DWG 변환용 뷰어 설치 확인 (설치 경로 조절 가능)
self.trueview_path = self._find_trueview()
def _find_trueview(self):
"""DWG TrueView 설치 경로 탐색"""
possible_paths = [
r"C:\Program Files\Autodesk\DWG TrueView 2025\dwgviewr.exe",
r"C:\Program Files\Autodesk\DWG TrueView 2024\dwgviewr.exe",
r"C:\Program Files\Autodesk\DWG TrueView 2023\dwgviewr.exe",
r"C:\Program Files (x86)\Autodesk\DWG TrueView 2025\dwgviewr.exe",
r"C:\Program Files (x86)\Autodesk\DWG TrueView 2024\dwgviewr.exe",
]
for path in possible_paths:
if Path(path).exists():
return path
return None
def get_all_files(self):
"""폴더 내 모든 파일 가져오기"""
all_files = []
for file_path in self.source_dir.rglob('*'):
if file_path.is_file():
all_files.append(file_path)
return all_files
def extract_audio_from_video(self, video_path, audio_output_path):
try:
import imageio_ffmpeg
from pathlib import Path
ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe()
self._dbg(f"DEBUG extract ffmpeg_exe: {ffmpeg_exe}")
self._dbg(f"DEBUG extract ffmpeg_exe exists: {Path(ffmpeg_exe).exists()}")
self._dbg(f"DEBUG extract input exists: {Path(video_path).exists()}")
self._dbg(f"DEBUG extract out path: {audio_output_path}")
cmd = [
ffmpeg_exe,
"-i", str(video_path),
"-vn",
"-acodec", "pcm_s16le",
"-ar", "16000",
"-ac", "1",
"-y",
str(audio_output_path),
]
self._dbg("DEBUG extract cmd: " + " ".join(cmd))
result = subprocess.run(cmd, capture_output=True, timeout=300, check=True, text=True)
self._dbg(f"DEBUG extract returncode: {result.returncode}")
self._dbg(f"DEBUG extract stderr tail: {(result.stderr or '')[-300:]}")
return True
except subprocess.CalledProcessError as e:
self._dbg(f"DEBUG extract CalledProcessError returncode: {e.returncode}")
self._dbg(f"DEBUG extract stderr tail: {(e.stderr or '')[-300:]}")
return False
except Exception as e:
self._dbg(f"DEBUG extract exception: {e}")
return False
def transcribe_audio_with_whisper(self, audio_path):
try:
self._ensure_ffmpeg_on_path()
import shutil
from pathlib import Path
ffmpeg_path = shutil.which("ffmpeg")
self._dbg(f"DEBUG whisper ffmpeg which: {ffmpeg_path}")
if not ffmpeg_path:
if self.ffmpeg_exe:
import os
os.environ["PATH"] = str(Path(self.ffmpeg_exe).parent) + os.pathsep + os.environ.get("PATH", "")
audio_file = Path(audio_path)
self._dbg(f"DEBUG whisper audio exists: {audio_file.exists()}")
self._dbg(f"DEBUG whisper audio size: {audio_file.stat().st_size if audio_file.exists() else 'NA'}")
if not audio_file.exists() or audio_file.stat().st_size == 0:
return "[음성 데이터 추출 결과 없음]"
import whisper
model = whisper.load_model("medium") # 기본 base 에서 medium으로 변경
# 전문 용어 domain_terms를 initial_prompt로 사용
result = model.transcribe(
str(audio_path),
language="ko",
task="transcribe",
initial_prompt=self.domain_terms if self.domain_terms else None,
condition_on_previous_text=True, # 옵션 설정 True로
)
# 후처리: 반복 문구 등 제거 시도
text = result["text"]
text = self.clean_transcript(text)
return text
except Exception as e:
import traceback
self._dbg(f"DEBUG whisper traceback: {traceback.format_exc()}")
return f"[음성 인식 실패: {str(e)}]"
def clean_transcript(self, text):
"""Whisper 결과 후처리 - 중복/환각 제거"""
import re
# 1. 영문/한문/중국어 등 제거
text = re.sub(r'[A-Za-z]{3,}', '', text) # 3글자 이상 영문 제거
text = re.sub(r'[\u3040-\u309F\u30A0-\u30FF]+', '', text) # 일어 제거
text = re.sub(r'[\u4E00-\u9FFF]+', '', text) # 한자 제거 (중국어)
# 2. 반복 문장 제거
sentences = text.split('.')
seen = set()
unique_sentences = []
for s in sentences:
s_clean = s.strip()
if s_clean and s_clean not in seen:
seen.add(s_clean)
unique_sentences.append(s_clean)
text = '. '.join(unique_sentences)
# 3. 공백 정리
text = re.sub(r'\s+', ' ', text) # 다중 공백 제거
text = text.strip()
return text
def get_video_transcript(self, video_path):
"""동영상 파일을 텍스트로 변환"""
try:
temp_audio = video_path.parent / f"{video_path.stem}_temp_audio.wav"
# 1. 오디오 추출
if not self.extract_audio_from_video(video_path, temp_audio):
return self.get_basic_file_info(video_path) + "\n\n[오디오 추출 실패]"
if (not temp_audio.exists()) or temp_audio.stat().st_size == 0:
return self.get_basic_file_info(video_path) + "\n\n[오디오 파일 생성 실패]"
# 2. Whisper로 텍스트 변환
transcript = self.transcribe_audio_with_whisper(temp_audio)
# 3. 임시 파일 삭제
if temp_audio.exists():
temp_audio.unlink()
# 4. 결과 포맷팅
stat = video_path.stat()
lines = []
lines.append(f"동영상 파일 분석 결과 (Speech-to-Text)")
lines.append(f"=" * 60)
lines.append(f"파일명: {video_path.name}")
lines.append(f"경로: {video_path}")
lines.append(f"크기: {self.format_file_size(stat.st_size)}")
lines.append(f"분석 내용:")
lines.append("=" * 60)
lines.append("")
lines.append(transcript)
return "\n".join(lines)
except Exception as e:
return self.get_basic_file_info(video_path) + f"\n\n[분석 오류 발생: {str(e)}]"
def convert_dwg_to_pdf(self, dwg_path, pdf_path):
"""TrueView를 사용하여 DWG 파일을 PDF로 변환 시도"""
if not self.trueview_path:
return False, "DWG TrueView를 찾을 수 없습니다."
try:
# AutoCAD 스크립트 작성
script_content = f"""_-EXPORT_PDF{pdf_path}_Y"""
script_path = dwg_path.parent / f"{dwg_path.stem}_plot.scr"
with open(script_path, 'w') as f:
f.write(script_content)
# TrueView 실행
cmd = [
self.trueview_path,
str(dwg_path.absolute()),
"/b", str(script_path.absolute()),
"/nologo"
]
result = subprocess.run(cmd, timeout=120, capture_output=True)
# 스크립트 삭제
if script_path.exists():
try:
script_path.unlink()
except:
pass
# PDF 생성 확인
if pdf_path.exists() and pdf_path.stat().st_size > 0:
return True, "성공"
else:
return False, "PDF 파일 생성 실패"
except subprocess.TimeoutExpired:
return False, "변환 시간 초과"
except Exception as e:
return False, f"DWG 변환 오류: {str(e)}"
def get_basic_file_info(self, file_path):
"""기본 파일 정보 반환"""
stat = file_path.stat()
lines = []
lines.append(f"파일 상세 정보")
lines.append(f"=" * 60)
lines.append(f"파일명: {file_path.name}")
lines.append(f"경로: {file_path}")
lines.append(f"크기: {self.format_file_size(stat.st_size)}")
lines.append(f"확장자: {file_path.suffix}")
lines.append(f"생성일: {datetime.fromtimestamp(stat.st_ctime).strftime('%Y-%m-%d %H:%M:%S')}")
lines.append(f"수정일: {datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S')}")
lines.append("=" * 60)
return "\n".join(lines)
def format_file_size(self, size_bytes):
"""파일 크기 포맷팅"""
if size_bytes == 0: return "0B"
units = ("B", "KB", "MB", "GB", "TB")
import math
i = int(math.floor(math.log(size_bytes, 1024)))
p = math.pow(1024, i)
s = round(size_bytes / p, 2)
return "%s %s" % (s, units[i])
def convert_image_to_pdf(self, image_path, output_path):
"""이미지 파일을 PDF로 변환"""
try:
img = Image.open(image_path)
# RGBA 모드일 경우 RGB로 변환 (PDF는 투명 모드를 다른 방식으로 처리)
if img.mode in ('RGBA', 'LA', 'P'):
# 흰 배경 추가
background = Image.new('RGB', img.size, (255, 255, 255))
if img.mode == 'P':
img = img.convert('RGBA')
background.paste(img, mask=img.split()[-1] if img.mode == 'RGBA' else None)
img = background
elif img.mode != 'RGB':
img = img.convert('RGB')
img.save(output_path, 'PDF', resolution=100.0)
return True, "성공"
except Exception as e:
return False, f"이미지 변환 오류: {str(e)}"
def convert_office_to_pdf(self, file_path, output_path):
"""Office 문서를 PDF로 변환"""
ext = file_path.suffix.lower()
if ext in ['.docx', '.doc']:
return self.convert_word_to_pdf(file_path, output_path)
elif ext in ['.xlsx', '.xls']:
return self.convert_excel_to_pdf(file_path, output_path)
elif ext in ['.pptx', '.ppt']:
return self.convert_ppt_to_pdf(file_path, output_path)
else:
return False, "지원되지 않는 Office 형식"
def convert_word_to_pdf(self, file_path, output_path):
"""Word 문서를 PDF로 변환"""
word = None
try:
pythoncom.CoInitialize()
word = win32com.client.Dispatch("Word.Application")
word.Visible = False
doc = word.Documents.Open(str(file_path.absolute()))
doc.SaveAs(str(output_path.absolute()), FileFormat=17) # 17 = wdExportFormatPDF
doc.Close()
word.Quit()
return True, "성공"
except Exception as e:
return False, f"Word 변환 오류: {str(e)}"
finally:
pythoncom.CoUninitialize()
def convert_excel_to_pdf(self, file_path, output_path):
"""Excel 파일을 PDF로 변환 - 한 페이지에 출력되도록 조정"""
try:
excel = win32com.client.Dispatch("Excel.Application")
excel.Visible = False
wb = excel.Workbooks.Open(str(file_path.absolute()))
# 모든 시트 인쇄 영역 설정
for ws in wb.Worksheets:
# 페이지 설정
ws.PageSetup.Zoom = False # 자동 배율 조정 비활성화
ws.PageSetup.FitToPagesWide = 1 # 가로 너비를 1페이지에 맞춤
ws.PageSetup.FitToPagesTall = False # 세로 길이는 내용에 맞춤
ws.PageSetup.LeftMargin = excel.CentimetersToPoints(1)
ws.PageSetup.RightMargin = excel.CentimetersToPoints(1)
ws.PageSetup.TopMargin = excel.CentimetersToPoints(1)
ws.PageSetup.BottomMargin = excel.CentimetersToPoints(1)
# 용지 방향 자동 결정 (가로가 더 긴 경우 가로 방향)
used_range = ws.UsedRange
if used_range.Columns.Count > used_range.Rows.Count:
ws.PageSetup.Orientation = 2 # xlLandscape (가로)
else:
ws.PageSetup.Orientation = 1 # xlPortrait (세로)
# PDF로 저장
wb.ExportAsFixedFormat(0, str(output_path.absolute())) # 0 = PDF
wb.Close()
excel.Quit()
return True, "성공"
except Exception as e:
return False, f"Excel 변환 오류: {str(e)}"
def convert_ppt_to_pdf(self, file_path, output_path):
"""PowerPoint 파일을 PDF로 변환"""
try:
ppt = win32com.client.Dispatch("PowerPoint.Application")
presentation = ppt.Presentations.Open(str(file_path.absolute()), WithWindow=False)
presentation.SaveAs(str(output_path.absolute()), 32) # 32 = ppSaveAsPDF
presentation.Close()
ppt.Quit()
return True, "성공"
except Exception as e:
return False, f"PowerPoint 변환 오류: {str(e)}"
def convert_hwp_to_pdf(self, file_path, output_path):
hwp = None
try:
output_path.parent.mkdir(parents=True, exist_ok=True)
try:
hwp = win32com.client.gencache.EnsureDispatch("HWPFrame.HwpObject")
except Exception:
hwp = win32com.client.Dispatch("HWPFrame.HwpObject")
registered = False
last_reg_error = None
for module_name in getattr(self, "hwp_security_modules", [""]):
try:
hwp.RegisterModule("FilePathCheckDLL", module_name)
registered = True
break
except Exception as e:
last_reg_error = e
if not registered:
return False, f"HWP 보안 모듈 등록 실패: {last_reg_error}"
hwp.Open(str(file_path.absolute()), "", "")
hwp.HAction.GetDefault("FileSaveAsPdf", hwp.HParameterSet.HFileOpenSave.HSet)
hwp.HParameterSet.HFileOpenSave.filename = str(output_path.absolute())
hwp.HParameterSet.HFileOpenSave.Format = "PDF"
hwp.HAction.Execute("FileSaveAsPdf", hwp.HParameterSet.HFileOpenSave.HSet)
if output_path.exists() and output_path.stat().st_size > 0:
return True, "성공"
return False, "PDF 파일 생성 실패"
except Exception as e:
return False, f"HWP 변환 오류: {str(e)}"
finally:
try:
if hwp:
try:
hwp.Clear(1)
except Exception:
pass
try:
hwp.Quit()
except Exception:
pass
except Exception:
pass
def convert_text_to_pdf(self, text_path, output_path):
"""텍스트 파일을 PDF로 변환 (ReportLab 사용)"""
try:
from reportlab.lib.pagesizes import A4
from reportlab.pdfgen import canvas
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
# 한글 폰트 등록 (윈도우 기본 폰트 사용)
try:
pdfmetrics.registerFont(TTFont('Malgun', 'malgun.ttf'))
font_name = 'Malgun'
except:
font_name = 'Helvetica'
# 텍스트 내용 읽기
content = ""
for encoding in ['utf-8', 'cp949', 'euc-kr']:
try:
with open(text_path, 'r', encoding=encoding) as f:
content = f.read()
break
except:
continue
c = canvas.Canvas(str(output_path), pagesize=A4)
width, height = A4
c.setFont(font_name, 10)
# 여백 설정
margin = 50
y = height - margin
line_height = 14
# 줄 단위로 처리
for line in content.split('\n'):
if y < margin: # 다음 페이지로
c.showPage()
c.setFont(font_name, 10)
y = height - margin
# 긴 줄 자동 줄바꿈
if len(line) > 100:
chunks = [line[i:i+100] for i in range(0, len(line), 100)]
for chunk in chunks:
c.drawString(margin, y, chunk)
y -= line_height
else:
c.drawString(margin, y, line)
y -= line_height
c.save()
return True, "성공"
except Exception as e:
return False, f"텍스트 변환 오류: {str(e)}"
def process_file(self, file_path):
"""개별 파일 변환 처리"""
ext = file_path.suffix.lower()
# 출력 폴더 경로 계산 (원본 폴더 구조 유지)
relative_path = file_path.relative_to(self.source_dir)
output_subdir = self.output_dir / relative_path.parent
output_subdir.mkdir(parents=True, exist_ok=True)
# PDF 변환 결과 경로
output_pdf = output_subdir / f"{file_path.stem}.pdf"
success = False
message = ""
try:
# 이미 PDF인 경우 복사
if ext in self.pdf_extension:
shutil.copy2(file_path, output_pdf)
success = True
message = "PDF 복사 완료"
# DWG/DXF 처리
elif ext in self.cad_extensions:
success, message = self.convert_dwg_to_pdf(file_path, output_pdf)
# Office 파일 처리
elif ext in self.office_extensions:
success, message = self.convert_office_to_pdf(file_path, output_pdf)
# 이미지 파일 처리
elif ext in self.image_extensions:
success, message = self.convert_image_to_pdf(file_path, output_pdf)
# 동영상/음성 파일 처리
elif ext in self.video_extensions:
# 텍스트 추출 후 TXT 생성 후 PDF 변환
transcript_text = self.get_video_transcript(file_path)
temp_txt = output_subdir / f"{file_path.stem}_transcript.txt"
with open(temp_txt, 'w', encoding='utf-8') as f:
f.write(transcript_text)
# txt를 PDF로 변환
success, message = self.convert_text_to_pdf(temp_txt, output_pdf)
if success: message = "음성 추출 및 PDF 변환 성공 (txt 보관됨)"
# 원본 txt는 보관함 (참고용)
# 텍스트 파일 처리
elif ext in self.text_extensions:
success, message = self.convert_text_to_pdf(file_path, output_pdf)
# 기타 지원되지 않는 파일은 기본 정보만 담은 PDF 생성 고려 (현재는 생략)
else:
message = f"지원되지 않는 형식 ({ext})"
except Exception as e:
message = f"오류 발생: {str(e)}"
# 로그 기록
self.conversion_log.append({
'원본 경로': str(file_path),
'파일명': file_path.name,
'변환 경로': str(output_pdf) if success else "",
'상태': "성공" if success else "실패",
'메시지': message,
'처리 시간': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
})
return success, message
def create_excel_report(self, excel_path):
"""변환 결과를 엑셀 파일로 리포트 생성"""
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "변환 결과"
# 헤더 스타일 설정
header_fill = PatternFill(start_color="1F4E78", end_color="1F4E78", fill_type="solid")
header_font = Font(color="FFFFFF", bold=True)
# 헤더 작성
headers = ['번호', '원본 경로', '파일명', '변환 경로', '상태', '메시지', '처리 시간']
for col, header in enumerate(headers, 1):
cell = ws.cell(row=1, column=col, value=header)
cell.fill = header_fill
cell.font = header_font
cell.alignment = Alignment(horizontal='center', vertical='center')
# 데이터 작성
for idx, log in enumerate(self.conversion_log, 2):
ws.cell(row=idx, column=1, value=idx-1)
ws.cell(row=idx, column=2, value=log['원본 경로'])
ws.cell(row=idx, column=3, value=log['파일명'])
ws.cell(row=idx, column=4, value=log['변환 경로'])
# 상태에 따른 색상 지정
status_cell = ws.cell(row=idx, column=5, value=log['상태'])
if log['상태'] == "성공":
status_cell.fill = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid")
status_cell.font = Font(color="006100")
else:
status_cell.fill = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid")
status_cell.font = Font(color="9C0006")
ws.cell(row=idx, column=6, value=log['메시지'])
ws.cell(row=idx, column=7, value=log['처리 시간'])
# 열 너비 자동 조정
for column in ws.columns:
max_length = 0
column_letter = column[0].column_letter
for cell in column:
try:
if len(str(cell.value)) > max_length:
max_length = len(str(cell.value))
except:
pass
adjusted_width = min(max_length + 2, 50)
ws.column_dimensions[column_letter].width = adjusted_width
# 전체 요약 시트 추가
summary_ws = wb.create_sheet(title="요약")
total_files = len(self.conversion_log)
success_count = sum(1 for log in self.conversion_log if log['상태'] == "성공")
fail_count = total_files - success_count
summary_data = [
['항목', ''],
['총 파일 수', total_files],
['변환 성공', success_count],
['변환 실패', fail_count],
['성공률', f"{(success_count/total_files*100):.1f}%" if total_files > 0 else "0%"],
['', ''],
['원본 폴더', str(self.source_dir)],
['출력 폴더', str(self.output_dir)],
['처리 완료 시간', datetime.now().strftime('%Y-%m-%d %H:%M:%S')]
]
for row_idx, row_data in enumerate(summary_data, 1):
for col_idx, value in enumerate(row_data, 1):
cell = summary_ws.cell(row=row_idx, column=col_idx, value=value)
if row_idx == 1:
cell.fill = header_fill
cell.font = header_font
cell.alignment = Alignment(horizontal='center' if col_idx == 1 else 'left')
summary_ws.column_dimensions['A'].width = 20
summary_ws.column_dimensions['B'].width = 60
# 저장
wb.save(excel_path)
print(f"\n처리 결과 보고서 생성 완료: {excel_path}")
def run(self):
"""전체 변환 프로세스 실행"""
print(f"변환 작업 시작: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"원본 폴더: {self.source_dir}")
print(f"출력 폴더: {self.output_dir}")
# DWG TrueView 설치 여부 확인
if self.trueview_path:
print(f"DWG TrueView 확인됨: {self.trueview_path}")
else:
print("경고: DWG TrueView를 찾을 수 없습니다. DWG 변환이 제한될 수 있습니다.")
print("-" * 80)
# 모든 파일 가져오기
all_files = self.get_all_files()
total_files = len(all_files)
# 동영상 파일 vs 일반 파일 구분
video_files = []
other_files = []
for file_path in all_files:
if file_path.suffix.lower() in self.video_extensions:
video_files.append(file_path)
else:
other_files.append(file_path)
print(f"\n총 탐색된 파일: {total_files}")
print(f" - 문서/이미지 파일: {len(other_files)}")
print(f" - 미디어 파일: {len(video_files)}")
print("\n[1단계] 문서 및 이미지 파일 변환 중...\n")
# 먼저 일반 파일 처리
for idx, file_path in enumerate(other_files, 1):
print(f"[{idx}/{len(other_files)}] {file_path.name} 처리 중...", end=' ')
success, message = self.process_file(file_path)
print(f"{'' if success else ''} {message}")
# 2단계: domain.txt 로드 (STT 향상용)
domain_path = self.source_dir.parent / "domain.txt" # D:\for python\테스트 자료(측량)\domain.txt
if domain_path.exists():
self.domain_terms = domain_path.read_text(encoding='utf-8')
print(f"\n[2단계] 전문 용어 사전(측량) 로드 완료: {domain_path}")
print(f" - 용어 수 {len(self.domain_terms.split())}개 학습됨")
else:
print(f"\n[2단계] 전문 용어 사전(측량) 없음: {domain_path}")
print(" - 기본 음성 모델로 분석을 진행합니다.")
# 3단계: 미디어 파일 처리 (Whisper STT 포함)
if video_files:
print(f"\n[3단계] 미디어 파일 음성 분석 및 변환 중...\n")
for idx, file_path in enumerate(video_files, 1):
print(f"[{idx}/{len(video_files)}] {file_path.name} 처리 중...", end=' ')
success, message = self.process_file(file_path)
print(f"{'' if success else ''} {message}")
# 엑셀 보고서 생성
excel_path = self.output_dir / f"변환결과_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
self.create_excel_report(excel_path)
# 최종 요약 출력
success_count = sum(1 for log in self.conversion_log if log['상태'] == "성공")
print("\n" + "=" * 80)
print(f"작업 완료!")
print(f"총 처리 파일: {total_files}")
print(f"성공: {success_count}")
print(f"실패: {total_files - success_count}")
print(f"성공률: {(success_count/total_files*100):.1f}%" if total_files > 0 else "0%")
print("=" * 80)
if __name__ == "__main__":
# 경로 설정
SOURCE_DIR = r"D:\for python\geulbeot-light\geulbeot-light\00.test\hwpx\in"
OUTPUT_DIR = r"D:\for python\geulbeot-light\geulbeot-light\00.test\hwpx\out"
# 변환기 실행
converter = SurveyingFileConverter(SOURCE_DIR, OUTPUT_DIR)
converter.run()