232 lines
7.6 KiB
Python
232 lines
7.6 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
make_corpus_v2.py
|
|
|
|
기능:
|
|
- output/rag/*_chunks.json 에서 모든 청크의 summary를 모아
|
|
- AI가 CEL 목적(교육+자사솔루션 홍보)에 맞게 압축 정리
|
|
- 중복은 빈도 표시, 희귀하지만 중요한 건 [핵심] 표시
|
|
- 결과를 output/context/corpus.txt 로 저장
|
|
|
|
전제:
|
|
- chunk_and_summary.py 실행 후 *_chunks.json 들이 존재해야 한다.
|
|
- domain_prompt.txt가 존재해야 한다.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
|
|
from openai import OpenAI
|
|
from api_config import API_KEYS
|
|
|
|
# ===== 경로 설정 =====
|
|
DATA_ROOT = Path(r"D:\for python\survey_test\process")
|
|
OUTPUT_ROOT = Path(r"D:\for python\survey_test\output")
|
|
RAG_DIR = OUTPUT_ROOT / "rag"
|
|
CONTEXT_DIR = OUTPUT_ROOT / "context"
|
|
LOG_DIR = OUTPUT_ROOT / "logs"
|
|
|
|
for d in [RAG_DIR, CONTEXT_DIR, LOG_DIR]:
|
|
d.mkdir(parents=True, exist_ok=True)
|
|
|
|
# ===== OpenAI 설정 =====
|
|
OPENAI_API_KEY = API_KEYS.get('GPT_API_KEY', '')
|
|
GPT_MODEL = "gpt-5-2025-08-07"
|
|
|
|
client = OpenAI(api_key=OPENAI_API_KEY)
|
|
|
|
# ===== 압축 설정 =====
|
|
BATCH_SIZE = 80 # 한 번에 처리할 요약 개수
|
|
MAX_CHARS_PER_BATCH = 3000 # 배치당 압축 결과 글자수
|
|
MAX_FINAL_CHARS = 8000 # 최종 corpus 글자수
|
|
|
|
|
|
def log(msg: str):
|
|
print(msg, flush=True)
|
|
with (LOG_DIR / "make_corpus_log.txt").open("a", encoding="utf-8") as f:
|
|
f.write(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}\n")
|
|
|
|
|
|
def load_domain_prompt() -> str:
|
|
p = CONTEXT_DIR / "domain_prompt.txt"
|
|
if not p.exists():
|
|
log("domain_prompt.txt가 없습니다. 먼저 step1을 실행해야 합니다.")
|
|
sys.exit(1)
|
|
return p.read_text(encoding="utf-8", errors="ignore").strip()
|
|
|
|
|
|
def load_all_summaries() -> list:
|
|
"""모든 청크의 summary + 출처 정보 수집"""
|
|
summaries = []
|
|
rag_files = sorted(RAG_DIR.glob("*_chunks.json"))
|
|
|
|
if not rag_files:
|
|
log("RAG 파일(*_chunks.json)이 없습니다. 먼저 chunk_and_summary.py를 실행해야 합니다.")
|
|
sys.exit(1)
|
|
|
|
for f in rag_files:
|
|
try:
|
|
units = json.loads(f.read_text(encoding="utf-8", errors="ignore"))
|
|
except Exception as e:
|
|
log(f"[WARN] RAG 파일 읽기 실패: {f.name} | {e}")
|
|
continue
|
|
|
|
for u in units:
|
|
summ = (u.get("summary") or "").strip()
|
|
source = (u.get("source") or "").strip()
|
|
keywords = (u.get("keywords") or "")
|
|
|
|
if summ:
|
|
# 출처와 키워드 포함
|
|
entry = f"[{source}] {summ}"
|
|
if keywords:
|
|
entry += f" (키워드: {keywords})"
|
|
summaries.append(entry)
|
|
|
|
return summaries
|
|
|
|
|
|
def compress_batch(domain_prompt: str, batch: list, batch_num: int, total_batches: int) -> str:
|
|
"""배치 단위로 요약들을 AI가 압축"""
|
|
|
|
batch_text = "\n".join([f"{i+1}. {s}" for i, s in enumerate(batch)])
|
|
|
|
prompt = f"""
|
|
아래는 문서에서 추출한 요약 {len(batch)}개이다. (배치 {batch_num}/{total_batches})
|
|
|
|
[요약 목록]
|
|
{batch_text}
|
|
|
|
다음 기준으로 이 요약들을 압축 정리하라:
|
|
|
|
1) 중복/유사 내용: 하나로 통합하되, 여러 문서에서 언급되면 "(N회 언급)" 표시
|
|
2) domain_prompt에 명시된 핵심 솔루션/시스템: 반드시 보존하고 [솔루션] 표시
|
|
3) domain_prompt의 목적에 중요한 내용 우선 보존:
|
|
- 해당 분야의 기초 개념
|
|
- 기존 방식의 한계점과 문제점
|
|
- 새로운 기술/방식의 장점
|
|
4) 단순 나열/절차만 있는 내용: 과감히 축약
|
|
5) 희귀하지만 핵심적인 인사이트: [핵심] 표시
|
|
|
|
출력 형식:
|
|
- 주제별로 그룹핑
|
|
- 각 항목은 1~2문장으로 간결하게
|
|
- 전체 {MAX_CHARS_PER_BATCH}자 이내
|
|
- 마크다운 없이 순수 텍스트로
|
|
"""
|
|
|
|
try:
|
|
resp = client.chat.completions.create(
|
|
model=GPT_MODEL,
|
|
messages=[
|
|
{"role": "system", "content": domain_prompt + "\n\n너는 문서 요약을 주제별로 압축 정리하는 전문가이다."},
|
|
{"role": "user", "content": prompt}
|
|
]
|
|
)
|
|
result = resp.choices[0].message.content.strip()
|
|
log(f" 배치 {batch_num}/{total_batches} 압축 완료 ({len(result)}자)")
|
|
return result
|
|
except Exception as e:
|
|
log(f"[ERROR] 배치 {batch_num} 압축 실패: {e}")
|
|
# 실패 시 원본 일부 반환
|
|
return "\n".join(batch[:10])
|
|
|
|
|
|
def merge_compressed_parts(domain_prompt: str, parts: list) -> str:
|
|
"""배치별 압축 결과를 최종 통합"""
|
|
|
|
if len(parts) == 1:
|
|
return parts[0]
|
|
|
|
all_parts = "\n\n---\n\n".join([f"[파트 {i+1}]\n{p}" for i, p in enumerate(parts)])
|
|
|
|
prompt = f"""
|
|
아래는 대량의 문서 요약을 배치별로 압축한 결과이다.
|
|
이것을 최종 corpus로 통합하라.
|
|
|
|
[배치별 압축 결과]
|
|
{all_parts}
|
|
|
|
통합 기준:
|
|
1) 파트 간 중복 내용 제거 및 통합
|
|
2) domain_prompt에 명시된 목적과 흐름에 맞게 재구성
|
|
3) [솔루션], [핵심], (N회 언급) 표시는 유지
|
|
4) 전체 {MAX_FINAL_CHARS}자 이내
|
|
|
|
출력: 주제별로 정리된 최종 corpus (마크다운 없이)
|
|
"""
|
|
|
|
try:
|
|
resp = client.chat.completions.create(
|
|
model=GPT_MODEL,
|
|
messages=[
|
|
{"role": "system", "content": domain_prompt + "\n\n너는 CEL 교육 콘텐츠 기획을 위한 corpus를 설계하는 전문가이다."},
|
|
{"role": "user", "content": prompt}
|
|
]
|
|
)
|
|
return resp.choices[0].message.content.strip()
|
|
except Exception as e:
|
|
log(f"[ERROR] 최종 통합 실패: {e}")
|
|
return "\n\n".join(parts)
|
|
|
|
|
|
def main():
|
|
log("=" * 60)
|
|
log("corpus 생성 시작 (AI 압축 버전)")
|
|
log("=" * 60)
|
|
|
|
# 도메인 프롬프트 로드
|
|
domain_prompt = load_domain_prompt()
|
|
log(f"도메인 프롬프트 로드 완료 ({len(domain_prompt)}자)")
|
|
|
|
# 모든 요약 수집
|
|
summaries = load_all_summaries()
|
|
if not summaries:
|
|
log("summary가 없습니다. corpus를 생성할 수 없습니다.")
|
|
sys.exit(1)
|
|
|
|
log(f"원본 요약 수집 완료: {len(summaries)}개")
|
|
|
|
# 원본 저장 (백업)
|
|
raw_corpus = "\n".join(summaries)
|
|
raw_path = CONTEXT_DIR / "corpus_raw.txt"
|
|
raw_path.write_text(raw_corpus, encoding="utf-8")
|
|
log(f"원본 corpus 백업: {raw_path} ({len(raw_corpus)}자)")
|
|
|
|
# 배치별 압축
|
|
total_batches = (len(summaries) + BATCH_SIZE - 1) // BATCH_SIZE
|
|
log(f"\n배치 압축 시작 ({BATCH_SIZE}개씩, 총 {total_batches}배치)")
|
|
|
|
compressed_parts = []
|
|
for i in range(0, len(summaries), BATCH_SIZE):
|
|
batch = summaries[i:i+BATCH_SIZE]
|
|
batch_num = (i // BATCH_SIZE) + 1
|
|
|
|
compressed = compress_batch(domain_prompt, batch, batch_num, total_batches)
|
|
compressed_parts.append(compressed)
|
|
|
|
# 최종 통합
|
|
log(f"\n최종 통합 시작 ({len(compressed_parts)}개 파트)")
|
|
final_corpus = merge_compressed_parts(domain_prompt, compressed_parts)
|
|
|
|
# 저장
|
|
out_path = CONTEXT_DIR / "corpus.txt"
|
|
out_path.write_text(final_corpus, encoding="utf-8")
|
|
|
|
# 통계
|
|
log("\n" + "=" * 60)
|
|
log("corpus 생성 완료!")
|
|
log("=" * 60)
|
|
log(f"원본 요약: {len(summaries)}개 ({len(raw_corpus)}자)")
|
|
log(f"압축 corpus: {len(final_corpus)}자")
|
|
log(f"압축률: {100 - (len(final_corpus) / len(raw_corpus) * 100):.1f}%")
|
|
log(f"\n저장 위치:")
|
|
log(f" - 원본: {raw_path}")
|
|
log(f" - 압축: {out_path}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() |