Upload converters/pipeline/step6_corpus.py

This commit is contained in:
2026-03-19 09:13:25 +09:00
parent 2e32ba8254
commit 5da3c7cae5

View File

@@ -0,0 +1,211 @@
# -*- coding: utf-8 -*-
from dotenv import load_dotenv
load_dotenv()
"""
make_corpus_v2.py
기능:
- output/rag/*_chunks.json 내의 모든 요약문들을 모음
- AI가 전체 도메인 목적(측량+지리정보)에 맞게 중복 제거 및 핵심 내용 추출(Compacting)
- 결과는 context/corpus.txt로 저장되며, 이는 전체 보고서의 밑바탕(Corpus)이 됨
- chunk_and_summary.py 실행 후 생성된 *_chunks.json 파일이 있어야 함.
- domain_prompt.txt가 존재해야 함.
"""
import os
import sys
import json
from pathlib import Path
from datetime import datetime
from openai import OpenAI
# ===== OpenAI 설정 =====
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
GPT_MODEL = "gpt-5-2025-08-07"
client = OpenAI(api_key=OPENAI_API_KEY)
# ===== 추출 설정 =====
BATCH_SIZE = 80 # 한 번에 처리할 요약문 개수 (API 입력 한계 고려)
MAX_CHARS_PER_BATCH = 1500 # 추출 결과 글자 수 제한
MAX_FINAL_CHARS = 12000 # 최종 corpus 글자 수
def log(msg: str):
print(msg, flush=True)
with (LOG_DIR / "make_corpus_log.txt").open("a", encoding="utf-8") as f:
f.write(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}\n")
def load_domain_prompt() -> str:
p = CONTEXT_DIR / "domain_prompt.txt"
if not p.exists():
log("domain_prompt.txt가 없습니다. 먼저 step1_domainprompt.py를 실행하십시오.")
sys.exit(1)
return p.read_text(encoding="utf-8", errors="ignore").strip()
def load_all_summaries() -> list:
"""모든 청크의 요약문을 수집"""
summaries = []
rag_files = sorted(RAG_DIR.glob("*_chunks.json"))
if not rag_files:
log("RAG 파일이 없습니다. 먼저 chunk_and_summary.py를 실행하십시오.")
sys.exit(1)
for f in rag_files:
try:
units = json.loads(f.read_text(encoding="utf-8", errors="ignore"))
except Exception as e:
log(f"[WARN] RAG 로드 실패: {f.name} | {e}")
continue
for u in units:
summ = (u.get("summary") or "").strip()
source = (u.get("source") or "").strip()
keywords = (u.get("keywords") or "")
if summ:
# 출처 정보 포함
entry = f"[{source}] {summ}"
if keywords:
entry += f" (키워드: {keywords})"
summaries.append(entry)
return summaries
def compress_batch(domain_prompt: str, batch: list, batch_num: int, total_batches: int) -> str:
"""배치 단위로 요약문들을 AI가 압축"""
batch_text = "\n".join([f"{i+1}. {s}" for i, s in enumerate(batch)])
prompt = f"""
다음은 문서에서 추출된 요약들입니다. (배치 {batch_num}/{total_batches})
도메인 프롬프트를 참고하여, 이 문서들의 핵심 지식과 사실 정보들을 압축하여 정리하십시오.
규칙:
1) 중복되거나 유사한 내용은 하나로 통합하며 사실 관계를 명확히 함
2) domain_prompt에 있는 주요 전문 용어나 기준(측량 기법 등)은 반드시 포함
3) 수치 데이터(정확도, 기준점 번호 등)는 보존
4) 과거 이력/처리 단계별 특징 등 정보성 있는 내용 중심
5) 결과는 한글로 작성하며 글자 수는 {MAX_CHARS_PER_BATCH}자 이내로 제한
추출된 요약들:
{batch_text}
출력은 다른 설명 없이 정리된 텍스트만 작성하십시오.
"""
try:
resp = client.chat.completions.create(
model=GPT_MODEL,
messages=[
{"role": "system", "content": domain_prompt + "\n\n당신은 문서 요약들을 주제별로 압축 정리하는 전문가입니다."},
{"role": "user", "content": prompt}
]
)
result = resp.choices[0].message.content.strip()
log(f" 배치 {batch_num} 압축 완료 ({len(result)}자)")
return result
except Exception as e:
log(f"[ERROR] 배치 압축 실패: {e}")
# 실패 시 원본의 일부라도 반환
return "\n".join(batch[:10])
def merge_compressed_parts(domain_prompt: str, parts: list) -> str:
"""압축된 배치 결과물들을 최종적으로 통합"""
if len(parts) == 1:
return parts[0]
all_parts = "\n\n---\n\n".join([f"[파트 {i+1}]\n{p}" for i, p in enumerate(parts)])
prompt = f"""
다음은 여러 파트로 나누어 압축된 문서 요약 결과들입니다.
이를 도메인 지식 기반의 통합 코퍼스(Corpus)로 만드십시오.
통합 기준:
1) 도메인 전문가가 참고할 수 있는 백과사전식 기술 정보 중심
2) domain_prompt에 부합하는 목적과 업무 흐름이 보이도록 구성
3) 중복된 기술 설명은 최신/최고 사양 기준으로 정리
4) 결과물은 총 {MAX_FINAL_CHARS}자 이내로 구성
출력은 주제별로 일목요연하게 정리된 최종 코퍼스 텍스트만 출력하십시오.
"""
try:
resp = client.chat.completions.create(
model=GPT_MODEL,
messages=[
{"role": "system", "content": domain_prompt + "\n\n당신은 전체 기술 코퍼스를 설계하는 정보 아키텍트입니다."},
{"role": "user", "content": prompt}
]
)
return resp.choices[0].message.content.strip()
except Exception as e:
log(f"[ERROR] 최종 통합 실패: {e}")
return "\n\n".join(parts)
def main(input_dir, output_dir):
global DATA_ROOT, OUTPUT_ROOT, RAG_DIR, CONTEXT_DIR, LOG_DIR
DATA_ROOT = Path(input_dir)
OUTPUT_ROOT = Path(output_dir)
RAG_DIR = OUTPUT_ROOT / "rag"
CONTEXT_DIR = OUTPUT_ROOT / "context"
LOG_DIR = OUTPUT_ROOT / "logs"
for d in [RAG_DIR, CONTEXT_DIR, LOG_DIR]:
d.mkdir(parents=True, exist_ok=True)
log("=" * 60)
log("Corpus 생성 작업 시작 (AI 압축 방식)")
log("=" * 60)
# 도메인 프롬프트 로드
domain_prompt = load_domain_prompt()
log(f"도메인 프롬프트 로드 완료 ({len(domain_prompt)}자)")
# 모든 요약문 수집
summaries = load_all_summaries()
if not summaries:
log("요약 데이터가 없습니다. 먼저 chunk_and_summary.py를 실행하십시오.")
sys.exit(1)
log(f"총 요약문 개수: {len(summaries)}")
# 1단계: 배치 압축
compressed_parts = []
total_batches = (len(summaries) + BATCH_SIZE - 1) // BATCH_SIZE
log(f"\n배치 압축 시작 ({BATCH_SIZE}개씩 {total_batches}개 배치)...")
for i in range(total_batches):
batch = summaries[i*BATCH_SIZE : (i+1)*BATCH_SIZE]
part = compress_batch(domain_prompt, batch, i+1, total_batches)
compressed_parts.append(part)
# 2단계: 최종 통합
log(f"\n최종 통합 시작 ({len(compressed_parts)}개 파트)...")
final_corpus = merge_compressed_parts(domain_prompt, compressed_parts)
# 저장
out_path = CONTEXT_DIR / "corpus.txt"
out_path.write_text(final_corpus, encoding="utf-8")
# 통계 출력
raw_corpus_len = sum(len(s) for s in summaries)
log("\n" + "=" * 60)
log("Corpus 생성 완료!")
log("=" * 60)
log(f"전체 요약문: {len(summaries)}개 ({raw_corpus_len}자)")
log(f"최종 Corpus: {len(final_corpus)}")
log(f"압축률: {100 - (len(final_corpus) / raw_corpus_len * 100):.1f}%")
log(f"\n결과 저장 위치:")
log(f" - 원본 백업: {CONTEXT_DIR / 'corpus_raw.txt'}")
log(f" - 최종 Corpus: {out_path}")
# 원본 백업 (디버깅용)
(CONTEXT_DIR / "corpus_raw.txt").write_text("\n".join(summaries), encoding="utf-8")
if __name__ == "__main__":
main()