From 5da3c7cae58e969387488f919e40c0595e93f9e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EC=9D=B4=EA=B2=BD=EB=AF=BC?= Date: Thu, 19 Mar 2026 09:13:25 +0900 Subject: [PATCH] Upload converters/pipeline/step6_corpus.py --- .../converters/pipeline/step6_corpus.py | 211 ++++++++++++++++++ 1 file changed, 211 insertions(+) create mode 100644 03.Code/업로드용/converters/pipeline/step6_corpus.py diff --git a/03.Code/업로드용/converters/pipeline/step6_corpus.py b/03.Code/업로드용/converters/pipeline/step6_corpus.py new file mode 100644 index 0000000..1ca3d7e --- /dev/null +++ b/03.Code/업로드용/converters/pipeline/step6_corpus.py @@ -0,0 +1,211 @@ +# -*- coding: utf-8 -*- +from dotenv import load_dotenv +load_dotenv() +""" +make_corpus_v2.py + +기능: +- output/rag/*_chunks.json 내의 모든 요약문들을 모음 +- AI가 전체 도메인 목적(측량+지리정보)에 맞게 중복 제거 및 핵심 내용 추출(Compacting) +- 결과는 context/corpus.txt로 저장되며, 이는 전체 보고서의 밑바탕(Corpus)이 됨 +- chunk_and_summary.py 실행 후 생성된 *_chunks.json 파일이 있어야 함. +- domain_prompt.txt가 존재해야 함. +""" +import os +import sys +import json +from pathlib import Path +from datetime import datetime +from openai import OpenAI + +# ===== OpenAI 설정 ===== +OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY") +GPT_MODEL = "gpt-5-2025-08-07" +client = OpenAI(api_key=OPENAI_API_KEY) + +# ===== 추출 설정 ===== +BATCH_SIZE = 80 # 한 번에 처리할 요약문 개수 (API 입력 한계 고려) +MAX_CHARS_PER_BATCH = 1500 # 추출 결과 글자 수 제한 +MAX_FINAL_CHARS = 12000 # 최종 corpus 글자 수 + + +def log(msg: str): + print(msg, flush=True) + with (LOG_DIR / "make_corpus_log.txt").open("a", encoding="utf-8") as f: + f.write(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}\n") + + +def load_domain_prompt() -> str: + p = CONTEXT_DIR / "domain_prompt.txt" + if not p.exists(): + log("domain_prompt.txt가 없습니다. 먼저 step1_domainprompt.py를 실행하십시오.") + sys.exit(1) + return p.read_text(encoding="utf-8", errors="ignore").strip() + + +def load_all_summaries() -> list: + """모든 청크의 요약문을 수집""" + summaries = [] + rag_files = sorted(RAG_DIR.glob("*_chunks.json")) + + if not rag_files: + log("RAG 파일이 없습니다. 먼저 chunk_and_summary.py를 실행하십시오.") + sys.exit(1) + + for f in rag_files: + try: + units = json.loads(f.read_text(encoding="utf-8", errors="ignore")) + except Exception as e: + log(f"[WARN] RAG 로드 실패: {f.name} | {e}") + continue + + for u in units: + summ = (u.get("summary") or "").strip() + source = (u.get("source") or "").strip() + keywords = (u.get("keywords") or "") + + if summ: + # 출처 정보 포함 + entry = f"[{source}] {summ}" + if keywords: + entry += f" (키워드: {keywords})" + summaries.append(entry) + + return summaries + + +def compress_batch(domain_prompt: str, batch: list, batch_num: int, total_batches: int) -> str: + """배치 단위로 요약문들을 AI가 압축""" + + batch_text = "\n".join([f"{i+1}. {s}" for i, s in enumerate(batch)]) + + prompt = f""" +다음은 문서에서 추출된 요약들입니다. (배치 {batch_num}/{total_batches}) +도메인 프롬프트를 참고하여, 이 문서들의 핵심 지식과 사실 정보들을 압축하여 정리하십시오. + +규칙: +1) 중복되거나 유사한 내용은 하나로 통합하며 사실 관계를 명확히 함 +2) domain_prompt에 있는 주요 전문 용어나 기준(측량 기법 등)은 반드시 포함 +3) 수치 데이터(정확도, 기준점 번호 등)는 보존 +4) 과거 이력/처리 단계별 특징 등 정보성 있는 내용 중심 +5) 결과는 한글로 작성하며 글자 수는 {MAX_CHARS_PER_BATCH}자 이내로 제한 + +추출된 요약들: +{batch_text} + +출력은 다른 설명 없이 정리된 텍스트만 작성하십시오. +""" + + try: + resp = client.chat.completions.create( + model=GPT_MODEL, + messages=[ + {"role": "system", "content": domain_prompt + "\n\n당신은 문서 요약들을 주제별로 압축 정리하는 전문가입니다."}, + {"role": "user", "content": prompt} + ] + ) + result = resp.choices[0].message.content.strip() + log(f" 배치 {batch_num} 압축 완료 ({len(result)}자)") + return result + except Exception as e: + log(f"[ERROR] 배치 압축 실패: {e}") + # 실패 시 원본의 일부라도 반환 + return "\n".join(batch[:10]) + + +def merge_compressed_parts(domain_prompt: str, parts: list) -> str: + """압축된 배치 결과물들을 최종적으로 통합""" + + if len(parts) == 1: + return parts[0] + + all_parts = "\n\n---\n\n".join([f"[파트 {i+1}]\n{p}" for i, p in enumerate(parts)]) + + prompt = f""" +다음은 여러 파트로 나누어 압축된 문서 요약 결과들입니다. +이를 도메인 지식 기반의 통합 코퍼스(Corpus)로 만드십시오. + +통합 기준: +1) 도메인 전문가가 참고할 수 있는 백과사전식 기술 정보 중심 +2) domain_prompt에 부합하는 목적과 업무 흐름이 보이도록 구성 +3) 중복된 기술 설명은 최신/최고 사양 기준으로 정리 +4) 결과물은 총 {MAX_FINAL_CHARS}자 이내로 구성 + +출력은 주제별로 일목요연하게 정리된 최종 코퍼스 텍스트만 출력하십시오. +""" + + try: + resp = client.chat.completions.create( + model=GPT_MODEL, + messages=[ + {"role": "system", "content": domain_prompt + "\n\n당신은 전체 기술 코퍼스를 설계하는 정보 아키텍트입니다."}, + {"role": "user", "content": prompt} + ] + ) + return resp.choices[0].message.content.strip() + except Exception as e: + log(f"[ERROR] 최종 통합 실패: {e}") + return "\n\n".join(parts) + + +def main(input_dir, output_dir): + global DATA_ROOT, OUTPUT_ROOT, RAG_DIR, CONTEXT_DIR, LOG_DIR + DATA_ROOT = Path(input_dir) + OUTPUT_ROOT = Path(output_dir) + RAG_DIR = OUTPUT_ROOT / "rag" + CONTEXT_DIR = OUTPUT_ROOT / "context" + LOG_DIR = OUTPUT_ROOT / "logs" + for d in [RAG_DIR, CONTEXT_DIR, LOG_DIR]: + d.mkdir(parents=True, exist_ok=True) + log("=" * 60) + log("Corpus 생성 작업 시작 (AI 압축 방식)") + log("=" * 60) + + # 도메인 프롬프트 로드 + domain_prompt = load_domain_prompt() + log(f"도메인 프롬프트 로드 완료 ({len(domain_prompt)}자)") + + # 모든 요약문 수집 + summaries = load_all_summaries() + if not summaries: + log("요약 데이터가 없습니다. 먼저 chunk_and_summary.py를 실행하십시오.") + sys.exit(1) + + log(f"총 요약문 개수: {len(summaries)}개") + + # 1단계: 배치 압축 + compressed_parts = [] + total_batches = (len(summaries) + BATCH_SIZE - 1) // BATCH_SIZE + log(f"\n배치 압축 시작 ({BATCH_SIZE}개씩 {total_batches}개 배치)...") + + for i in range(total_batches): + batch = summaries[i*BATCH_SIZE : (i+1)*BATCH_SIZE] + part = compress_batch(domain_prompt, batch, i+1, total_batches) + compressed_parts.append(part) + + # 2단계: 최종 통합 + log(f"\n최종 통합 시작 ({len(compressed_parts)}개 파트)...") + final_corpus = merge_compressed_parts(domain_prompt, compressed_parts) + + # 저장 + out_path = CONTEXT_DIR / "corpus.txt" + out_path.write_text(final_corpus, encoding="utf-8") + + # 통계 출력 + raw_corpus_len = sum(len(s) for s in summaries) + log("\n" + "=" * 60) + log("Corpus 생성 완료!") + log("=" * 60) + log(f"전체 요약문: {len(summaries)}개 ({raw_corpus_len}자)") + log(f"최종 Corpus: {len(final_corpus)}자") + log(f"압축률: {100 - (len(final_corpus) / raw_corpus_len * 100):.1f}%") + log(f"\n결과 저장 위치:") + log(f" - 원본 백업: {CONTEXT_DIR / 'corpus_raw.txt'}") + log(f" - 최종 Corpus: {out_path}") + + # 원본 백업 (디버깅용) + (CONTEXT_DIR / "corpus_raw.txt").write_text("\n".join(summaries), encoding="utf-8") + + +if __name__ == "__main__": + main()