# -*- coding: utf-8 -*- from dotenv import load_dotenv load_dotenv() """ chunk_and_summary_v2.py 기능: - 이전 단계에서 생성된 output/raw/*.md 파일들을 대상으로 1) domain_prompt.txt 기반의 의미론적 청킹(Semantic Chunking) 2) 각 청크별 요약 생성 3) 청크 내 이미지 참조 보관 4) 최종 결과를 JSON 형식 (메타+청크+요약+이미지)으로 저장 - extract_1_v2.py 실행 후 생성된 .md 파일을 대상으로 함 - step1_domainprompt.py 실행 후 domain_prompt.txt가 존재해야 함 """ import os import sys import json import re from pathlib import Path from datetime import datetime from openai import OpenAI from concurrent.futures import ThreadPoolExecutor, as_completed # ===== OpenAI 설정 ===== OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY") GPT_MODEL = "gpt-5-2025-08-07" client = OpenAI(api_key=OPENAI_API_KEY) # ===== 필터링할 폴더 정의 ===== SKIP_DIR_NAMES = {"System Volume Information", "$RECYCLE.BIN", ".git", "__pycache__", "output"} # ===== 이미지 참조 패턴 ===== IMAGE_PATTERN = re.compile(r'!\[([^\]]*)\]\(([^)]+)\)') def log(msg: str): print(msg, flush=True) with (LOG_DIR / "chunk_and_summary_log.txt").open("a", encoding="utf-8") as f: f.write(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}\n") def load_domain_prompt() -> str: p = CONTEXT_DIR / "domain_prompt.txt" if not p.exists(): log(f"domain_prompt.txt가 존재하지 않습니다: {p}") log("먼저 step1_domainprompt.py를 실행해야 합니다.") sys.exit(1) return p.read_text(encoding="utf-8", errors="ignore").strip() def safe_rel(p: Path) -> str: """DATA_ROOT 기준 상대 경로 반환""" try: return str(p.relative_to(DATA_ROOT)) except: return str(p) def extract_text_md(p: Path) -> str: """마크다운 파일에서 텍스트 읽기""" try: return p.read_text(encoding="utf-8", errors="ignore") except: return "" def find_images_in_text(text: str) -> list: """텍스트 내 이미지 참조 찾기""" matches = IMAGE_PATTERN.findall(text) return [{"alt": m[0], "path": m[1]} for m in matches] def semantic_chunk(domain_prompt: str, text: str, source_name: str): """GPT 기반 의미론적 청킹""" if not text.strip(): return [] # 텍스트가 너무 짧으면 그대로 사용 if len(text) < 500: return [{ "title": "전체 내용", "keywords": "", "content": text }] user_prompt = f""" 다음 문서를 의미 있는 단위(문단/항목/주제)로 분리하고, 각 청크별 title / keywords / content 를 포함하는 JSON 배열로 출력하세요: 1. 측량 기준, 법규 등 도메인 지식에 따라 의미 단위로만 분리 2. 이미지 참조(![...](...))가 포함된 텍스트는 해당 이미지와 관련된 설명과 함께 한 청크로 유지 3. 각 청크는 최대 1000자 내외로 구성 4. keywords는 쉼표로 구분된 핵심 단어 3~5개 문서 내용: {text[:12000]} JSON 배열로 출력하세요. 다른 설명은 필요 없습니다. """ try: resp = client.chat.completions.create( model=GPT_MODEL, messages=[ {"role": "system", "content": domain_prompt + "\n\n당신은 문서를 의미 있는 단위로 분석하는 전문가입니다. JSON 배열로 출력하세요."}, {"role": "user", "content": user_prompt}, ], ) data = resp.choices[0].message.content.strip() # JSON 파싱 전 전처리 # ```json ... ``` 형식 제거 if "```json" in data: data = data.split("```json")[1].split("```")[0].strip() elif "```" in data: data = data.split("```")[1].split("```")[0].strip() if data.startswith("["): return json.loads(data) except json.JSONDecodeError as e: log(f"[WARN] JSON 파싱 실패 ({source_name}): {e}") except Exception as e: log(f"[WARN] semantic_chunk API 실패 ({source_name}): {e}") # fallback: 페이지/문단 기반 단순 분리 log(f"[INFO] Fallback 청킹 적용: {source_name}") return fallback_chunk(text) def fallback_chunk(text: str) -> list: """GPT 실패 시 대체 청킹 (페이지/문단 기준)""" chunks = [] if "## Page " in text: pages = re.split(r'\n## Page \d+\n', text) for i, page_content in enumerate(pages): if page_content.strip(): chunks.append({ "title": f"Page {i+1}", "keywords": "", "content": page_content.strip() }) else: # 빈 줄 2개 이상으로 분리 sections = re.split(r'\n{3,}', text) for i, section in enumerate(sections): if section.strip() and len(section.strip()) > 50: chunks.append({ "title": f"단락 {i+1}", "keywords": "", "content": section.strip() }) # 여전히 없으면 전체를 하나로 if not chunks: chunks.append({ "title": "전체 내용", "keywords": "", "content": text.strip() }) return chunks def summary_chunk(domain_prompt: str, text: str, limit: int = 300) -> str: """개별 청크 요약 생성""" if not text.strip(): return "" # 이미지 참조 제거 후 요약 시도 (순수 텍스트 정보 중심) text_only = IMAGE_PATTERN.sub('', text).strip() if len(text_only) < 100: return text_only prompt = f""" 다음 텍스트를 {limit}자 이내의 한 문장 또는 핵심 문단으로 요약하세요. 측량 수치, 특정 기법, 법규 번호 등이 있다면 반드시 보존하십시오. {text_only[:8000]} """ try: resp = client.chat.completions.create( model=GPT_MODEL, messages=[ {"role": "system", "content": domain_prompt + "\n\n텍스트를 전문적이고 정확하게 요약하는 전문가입니다."}, {"role": "user", "content": prompt}, ], ) return resp.choices[0].message.content.strip() except Exception as e: log(f"[WARN] summary 실패: {e}") return text_only[:limit] def save_chunk_files(src: Path, text: str, domain_prompt: str) -> int: """ 개별 마크다운 파일에 대해 청킹 및 요약 후 JSON 파일로 저장 Returns: 성공한 청크 개수 """ stem = src.stem folder_ctx = safe_rel(src.parent) # 1. 원본 텍스트 보관 (TEXT_DIR / f"{stem}_text.txt").write_text(text, encoding="utf-8", errors="ignore") # 2. 청킹 chunks = semantic_chunk(domain_prompt, text, src.name) if not chunks: log(f"[WARN] 청킹 결과 없음: {src.name}") return 0 rag_items = [] for idx, ch in enumerate(chunks, start=1): content = ch.get("content", "") # 각 청크 요약 summ = summary_chunk(domain_prompt, content, 300) # 이미지 참조 찾기 images_in_chunk = find_images_in_text(content) rag_items.append({ "source": src.name, "source_path": safe_rel(src), "chunk": idx, "total_chunks": len(chunks), "title": ch.get("title", ""), "keywords": ch.get("keywords", ""), "text": content, "summary": summ, "folder_context": folder_ctx, "images": images_in_chunk, "has_images": len(images_in_chunk) > 0 }) # JSON 저장 (JSON_DIR / f"{stem}.json").write_text( json.dumps(rag_items, ensure_ascii=False, indent=2), encoding="utf-8" ) # RAG용 통합 JSON 준비를 위해 복사 (RAG_DIR / f"{stem}_chunks.json").write_text( json.dumps(rag_items, ensure_ascii=False, indent=2), encoding="utf-8" ) return len(chunks) def main(input_dir, output_dir): global DATA_ROOT, OUTPUT_ROOT, TEXT_DIR, JSON_DIR, RAG_DIR, CONTEXT_DIR, LOG_DIR DATA_ROOT = Path(input_dir) OUTPUT_ROOT = Path(output_dir) TEXT_DIR = OUTPUT_ROOT / "text" JSON_DIR = OUTPUT_ROOT / "json" RAG_DIR = OUTPUT_ROOT / "rag" CONTEXT_DIR = OUTPUT_ROOT / "context" LOG_DIR = OUTPUT_ROOT / "logs" for d in [TEXT_DIR, JSON_DIR, RAG_DIR, CONTEXT_DIR, LOG_DIR]: d.mkdir(parents=True, exist_ok=True) log("=" * 60) log("청킹 및 요약 작업 시작") log(f"데이터 폴더: {DATA_ROOT}") log(f"출력 폴더: {OUTPUT_ROOT}") log("=" * 60) # 도메인 프롬프트 로드 domain_prompt = load_domain_prompt() log(f"도메인 프롬프트 로드 완료 ({len(domain_prompt)}자)") stats = {"docs": 0, "chunks": 0, "images": 0, "errors": 0} # 모든 .md 파일 찾기 md_files = list(DATA_ROOT.rglob("*.md")) log(f"총 {len(md_files)}개의 .md 파일 발견\n") def process_file(args): idx, fpath = args try: rel_path = safe_rel(fpath) log(f"[{idx}/{len(md_files)}] {rel_path}") text = extract_text_md(fpath) if not text.strip(): log(f" 비어있는 문서") return 0, 0, 0 images = find_images_in_text(text) chunk_count = save_chunk_files(fpath, text, domain_prompt) log(f" {chunk_count}개 청크, {len(images)}개 이미지 추출") return 1, chunk_count, len(images) except Exception as e: log(f" 오류 발생: {e}") return 0, 0, 0 # 병렬 처리 with ThreadPoolExecutor(max_workers=4) as executor: futures = {executor.submit(process_file, (i, f)): f for i, f in enumerate(md_files, 1)} for future in as_completed(futures): d, c, img = future.result() stats["docs"] += d stats["chunks"] += c stats["images"] += img # 전체 통계 저장 summary = { "processed_at": datetime.now().isoformat(), "data_root": str(DATA_ROOT), "output_root": str(OUTPUT_ROOT), "statistics": stats } (LOG_DIR / "chunk_summary_stats.json").write_text( json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8" ) # 결과 출력 log("\n" + "=" * 60) log("청킹 및 요약 완료!") log("=" * 60) log(f"처리 문서: {stats['docs']}개") log(f"생성 청크: {stats['chunks']}개") log(f"이미지 포함 청크: {stats['images']}개") log(f"전체 요약 통계는 LOG_DIR에 저장되었습니다.") log(f"\n결과 저장 위치:") log(f" - 텍스트: {TEXT_DIR}") log(f" - JSON: {JSON_DIR}") log(f" - RAG: {RAG_DIR}") if __name__ == "__main__": main()