From 9633ff4a291781a3d8e0f0f515f9627bce593c78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EC=9D=B4=EA=B2=BD=EB=AF=BC?= Date: Thu, 19 Mar 2026 09:13:23 +0900 Subject: [PATCH] Upload converters/pipeline/step4_chunk.py --- .../converters/pipeline/step4_chunk.py | 338 ++++++++++++++++++ 1 file changed, 338 insertions(+) create mode 100644 03.Code/업로드용/converters/pipeline/step4_chunk.py diff --git a/03.Code/업로드용/converters/pipeline/step4_chunk.py b/03.Code/업로드용/converters/pipeline/step4_chunk.py new file mode 100644 index 0000000..5eeab8a --- /dev/null +++ b/03.Code/업로드용/converters/pipeline/step4_chunk.py @@ -0,0 +1,338 @@ +# -*- coding: utf-8 -*- +from dotenv import load_dotenv +load_dotenv() +""" +chunk_and_summary_v2.py + +기능: +- 이전 단계에서 생성된 output/raw/*.md 파일들을 대상으로 + 1) domain_prompt.txt 기반의 의미론적 청킹(Semantic Chunking) + 2) 각 청크별 요약 생성 + 3) 청크 내 이미지 참조 보관 + 4) 최종 결과를 JSON 형식 (메타+청크+요약+이미지)으로 저장 +- extract_1_v2.py 실행 후 생성된 .md 파일을 대상으로 함 +- step1_domainprompt.py 실행 후 domain_prompt.txt가 존재해야 함 +""" +import os +import sys +import json +import re +from pathlib import Path +from datetime import datetime +from openai import OpenAI +from concurrent.futures import ThreadPoolExecutor, as_completed + + +# ===== OpenAI 설정 ===== +OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY") +GPT_MODEL = "gpt-5-2025-08-07" +client = OpenAI(api_key=OPENAI_API_KEY) + +# ===== 필터링할 폴더 정의 ===== +SKIP_DIR_NAMES = {"System Volume Information", "$RECYCLE.BIN", ".git", "__pycache__", "output"} + +# ===== 이미지 참조 패턴 ===== +IMAGE_PATTERN = re.compile(r'!\[([^\]]*)\]\(([^)]+)\)') + +def log(msg: str): + print(msg, flush=True) + with (LOG_DIR / "chunk_and_summary_log.txt").open("a", encoding="utf-8") as f: + f.write(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}\n") + + +def load_domain_prompt() -> str: + p = CONTEXT_DIR / "domain_prompt.txt" + if not p.exists(): + log(f"domain_prompt.txt가 존재하지 않습니다: {p}") + log("먼저 step1_domainprompt.py를 실행해야 합니다.") + sys.exit(1) + return p.read_text(encoding="utf-8", errors="ignore").strip() + + +def safe_rel(p: Path) -> str: + """DATA_ROOT 기준 상대 경로 반환""" + try: + return str(p.relative_to(DATA_ROOT)) + except: + return str(p) + + +def extract_text_md(p: Path) -> str: + """마크다운 파일에서 텍스트 읽기""" + try: + return p.read_text(encoding="utf-8", errors="ignore") + except: + return "" + + +def find_images_in_text(text: str) -> list: + """텍스트 내 이미지 참조 찾기""" + matches = IMAGE_PATTERN.findall(text) + return [{"alt": m[0], "path": m[1]} for m in matches] + + +def semantic_chunk(domain_prompt: str, text: str, source_name: str): + """GPT 기반 의미론적 청킹""" + if not text.strip(): + return [] + + # 텍스트가 너무 짧으면 그대로 사용 + if len(text) < 500: + return [{ + "title": "전체 내용", + "keywords": "", + "content": text + }] + + user_prompt = f""" +다음 문서를 의미 있는 단위(문단/항목/주제)로 분리하고, +각 청크별 title / keywords / content 를 포함하는 JSON 배열로 출력하세요: +1. 측량 기준, 법규 등 도메인 지식에 따라 의미 단위로만 분리 +2. 이미지 참조(![...](...))가 포함된 텍스트는 해당 이미지와 관련된 설명과 함께 한 청크로 유지 +3. 각 청크는 최대 1000자 내외로 구성 +4. keywords는 쉼표로 구분된 핵심 단어 3~5개 + +문서 내용: +{text[:12000]} + +JSON 배열로 출력하세요. 다른 설명은 필요 없습니다. +""" + + try: + resp = client.chat.completions.create( + model=GPT_MODEL, + messages=[ + {"role": "system", "content": domain_prompt + "\n\n당신은 문서를 의미 있는 단위로 분석하는 전문가입니다. JSON 배열로 출력하세요."}, + {"role": "user", "content": user_prompt}, + ], + ) + data = resp.choices[0].message.content.strip() + + # JSON 파싱 전 전처리 + # ```json ... ``` 형식 제거 + if "```json" in data: + data = data.split("```json")[1].split("```")[0].strip() + elif "```" in data: + data = data.split("```")[1].split("```")[0].strip() + + if data.startswith("["): + return json.loads(data) + + except json.JSONDecodeError as e: + log(f"[WARN] JSON 파싱 실패 ({source_name}): {e}") + except Exception as e: + log(f"[WARN] semantic_chunk API 실패 ({source_name}): {e}") + + # fallback: 페이지/문단 기반 단순 분리 + log(f"[INFO] Fallback 청킹 적용: {source_name}") + return fallback_chunk(text) + + +def fallback_chunk(text: str) -> list: + """GPT 실패 시 대체 청킹 (페이지/문단 기준)""" + chunks = [] + + if "## Page " in text: + pages = re.split(r'\n## Page \d+\n', text) + for i, page_content in enumerate(pages): + if page_content.strip(): + chunks.append({ + "title": f"Page {i+1}", + "keywords": "", + "content": page_content.strip() + }) + else: + # 빈 줄 2개 이상으로 분리 + sections = re.split(r'\n{3,}', text) + for i, section in enumerate(sections): + if section.strip() and len(section.strip()) > 50: + chunks.append({ + "title": f"단락 {i+1}", + "keywords": "", + "content": section.strip() + }) + + # 여전히 없으면 전체를 하나로 + if not chunks: + chunks.append({ + "title": "전체 내용", + "keywords": "", + "content": text.strip() + }) + + return chunks + + +def summary_chunk(domain_prompt: str, text: str, limit: int = 300) -> str: + """개별 청크 요약 생성""" + if not text.strip(): + return "" + + # 이미지 참조 제거 후 요약 시도 (순수 텍스트 정보 중심) + text_only = IMAGE_PATTERN.sub('', text).strip() + + if len(text_only) < 100: + return text_only + + prompt = f""" +다음 텍스트를 {limit}자 이내의 한 문장 또는 핵심 문단으로 요약하세요. +측량 수치, 특정 기법, 법규 번호 등이 있다면 반드시 보존하십시오. + +{text_only[:8000]} +""" + try: + resp = client.chat.completions.create( + model=GPT_MODEL, + messages=[ + {"role": "system", "content": domain_prompt + "\n\n텍스트를 전문적이고 정확하게 요약하는 전문가입니다."}, + {"role": "user", "content": prompt}, + ], + ) + return resp.choices[0].message.content.strip() + except Exception as e: + log(f"[WARN] summary 실패: {e}") + return text_only[:limit] + + +def save_chunk_files(src: Path, text: str, domain_prompt: str) -> int: + """ + 개별 마크다운 파일에 대해 청킹 및 요약 후 JSON 파일로 저장 + + Returns: + 성공한 청크 개수 + """ + stem = src.stem + folder_ctx = safe_rel(src.parent) + + # 1. 원본 텍스트 보관 + (TEXT_DIR / f"{stem}_text.txt").write_text(text, encoding="utf-8", errors="ignore") + + # 2. 청킹 + chunks = semantic_chunk(domain_prompt, text, src.name) + + if not chunks: + log(f"[WARN] 청킹 결과 없음: {src.name}") + return 0 + + rag_items = [] + + for idx, ch in enumerate(chunks, start=1): + content = ch.get("content", "") + + # 각 청크 요약 + summ = summary_chunk(domain_prompt, content, 300) + + # 이미지 참조 찾기 + images_in_chunk = find_images_in_text(content) + + rag_items.append({ + "source": src.name, + "source_path": safe_rel(src), + "chunk": idx, + "total_chunks": len(chunks), + "title": ch.get("title", ""), + "keywords": ch.get("keywords", ""), + "text": content, + "summary": summ, + "folder_context": folder_ctx, + "images": images_in_chunk, + "has_images": len(images_in_chunk) > 0 + }) + + # JSON 저장 + (JSON_DIR / f"{stem}.json").write_text( + json.dumps(rag_items, ensure_ascii=False, indent=2), + encoding="utf-8" + ) + + # RAG용 통합 JSON 준비를 위해 복사 + (RAG_DIR / f"{stem}_chunks.json").write_text( + json.dumps(rag_items, ensure_ascii=False, indent=2), + encoding="utf-8" + ) + + return len(chunks) + + +def main(input_dir, output_dir): + global DATA_ROOT, OUTPUT_ROOT, TEXT_DIR, JSON_DIR, RAG_DIR, CONTEXT_DIR, LOG_DIR + DATA_ROOT = Path(input_dir) + OUTPUT_ROOT = Path(output_dir) + TEXT_DIR = OUTPUT_ROOT / "text" + JSON_DIR = OUTPUT_ROOT / "json" + RAG_DIR = OUTPUT_ROOT / "rag" + CONTEXT_DIR = OUTPUT_ROOT / "context" + LOG_DIR = OUTPUT_ROOT / "logs" + for d in [TEXT_DIR, JSON_DIR, RAG_DIR, CONTEXT_DIR, LOG_DIR]: + d.mkdir(parents=True, exist_ok=True) + log("=" * 60) + log("청킹 및 요약 작업 시작") + log(f"데이터 폴더: {DATA_ROOT}") + log(f"출력 폴더: {OUTPUT_ROOT}") + log("=" * 60) + + # 도메인 프롬프트 로드 + domain_prompt = load_domain_prompt() + log(f"도메인 프롬프트 로드 완료 ({len(domain_prompt)}자)") + + stats = {"docs": 0, "chunks": 0, "images": 0, "errors": 0} + + # 모든 .md 파일 찾기 + md_files = list(DATA_ROOT.rglob("*.md")) + log(f"총 {len(md_files)}개의 .md 파일 발견\n") + + def process_file(args): + idx, fpath = args + try: + rel_path = safe_rel(fpath) + log(f"[{idx}/{len(md_files)}] {rel_path}") + text = extract_text_md(fpath) + if not text.strip(): + log(f" 비어있는 문서") + return 0, 0, 0 + images = find_images_in_text(text) + chunk_count = save_chunk_files(fpath, text, domain_prompt) + log(f" {chunk_count}개 청크, {len(images)}개 이미지 추출") + return 1, chunk_count, len(images) + except Exception as e: + log(f" 오류 발생: {e}") + return 0, 0, 0 + + # 병렬 처리 + with ThreadPoolExecutor(max_workers=4) as executor: + futures = {executor.submit(process_file, (i, f)): f for i, f in enumerate(md_files, 1)} + for future in as_completed(futures): + d, c, img = future.result() + stats["docs"] += d + stats["chunks"] += c + stats["images"] += img + + # 전체 통계 저장 + summary = { + "processed_at": datetime.now().isoformat(), + "data_root": str(DATA_ROOT), + "output_root": str(OUTPUT_ROOT), + "statistics": stats + } + + (LOG_DIR / "chunk_summary_stats.json").write_text( + json.dumps(summary, ensure_ascii=False, indent=2), + encoding="utf-8" + ) + + # 결과 출력 + log("\n" + "=" * 60) + log("청킹 및 요약 완료!") + log("=" * 60) + log(f"처리 문서: {stats['docs']}개") + log(f"생성 청크: {stats['chunks']}개") + log(f"이미지 포함 청크: {stats['images']}개") + log(f"전체 요약 통계는 LOG_DIR에 저장되었습니다.") + log(f"\n결과 저장 위치:") + log(f" - 텍스트: {TEXT_DIR}") + log(f" - JSON: {JSON_DIR}") + log(f" - RAG: {RAG_DIR}") + + +if __name__ == "__main__": + main()