Upload converters/pipeline/step5_rag.py

This commit is contained in:
2026-03-19 09:13:24 +09:00
parent 9633ff4a29
commit 2e32ba8254

View File

@@ -0,0 +1,138 @@
# -*- coding: utf-8 -*-
from dotenv import load_dotenv
load_dotenv()
"""
build_rag.py
기능:
- chunk_and_summary.py에서 생성된 output/rag/*_chunks.json 파일들을 읽어들임
- 각 청크의 text + summary를 활용하여 임베딩(text-embedding-3-small) 생성
- FAISS Index (IndexFlatIP)를 구성하여 저장
- 최종적으로 rag/faiss.index, meta.json, vectors.npy 파일을 생성함
"""
import os
import sys
import json
from pathlib import Path
import numpy as np
import faiss
from openai import OpenAI
# ===== OpenAI 설정 (구조 유지) =====
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
GPT_MODEL = "gpt-5-2025-08-07"
EMBED_MODEL = "text-embedding-3-small"
client = OpenAI(api_key=OPENAI_API_KEY)
def log(msg: str):
print(msg, flush=True)
with (LOG_DIR / "build_rag_log.txt").open("a", encoding="utf-8") as f:
f.write(msg + "\n")
def embed_texts(texts):
if not texts:
return np.zeros((0, 1536), dtype="float32")
embs = []
B = 96
for i in range(0, len(texts), B):
batch = texts[i:i+B]
resp = client.embeddings.create(model=EMBED_MODEL, input=batch)
for d in resp.data:
embs.append(np.array(d.embedding, dtype="float32"))
return np.vstack(embs)
def _build_embed_input(u: dict) -> str:
"""
text + summary를 합쳐서 임베딩용 텍스트를 만듭니다.
- text, summary 중 내용이 있는 것 사용
- 공백 정리
- 최대 길이 제한
"""
sum_ = (u.get("summary") or "").strip()
txt = (u.get("text") or "").strip()
if txt and sum_:
merged = txt + "\n\n요약: " + sum_[:1000]
else:
merged = txt or sum_
merged = " ".join(merged.split())
if not merged:
return ""
if len(merged) > 4000:
merged = merged[:4000]
return merged
def build_faiss_index():
docs = []
metas = []
rag_files = list(RAG_DIR.glob("*_chunks.json"))
if not rag_files:
log("RAG 대상 파일이 없습니다. 먼저 chunk_and_summary.py를 실행하십시오.")
sys.exit(1)
for f in rag_files:
try:
units = json.loads(f.read_text(encoding="utf-8", errors="ignore"))
except Exception as e:
log(f"[WARN] RAG 로드 실패: {f.name} | {e}")
continue
for u in units:
embed_input = _build_embed_input(u)
if not embed_input:
continue
if len(embed_input) < 40:
continue
docs.append(embed_input)
metas.append({
"source": u.get("source", ""),
"chunk": int(u.get("chunk", 0)),
"folder_context": u.get("folder_context", "")
})
if not docs:
log("임베딩할 텍스트 내용이 없습니다.")
sys.exit(1)
log(f"{len(docs)}개 청크 임베딩 시작...")
vectors = embed_texts(docs)
# 정규화 (Inner Product 사용 시 필수)
faiss.normalize_L2(vectors)
# FAISS 인덱스 생성
d = 1536
index = faiss.IndexFlatIP(d)
index.add(vectors)
# 결과 저장
faiss.write_index(index, str(RAG_DIR / "faiss.index"))
(RAG_DIR / "meta.json").write_text(json.dumps(metas, ensure_ascii=False, indent=2), encoding="utf-8")
np.save(str(RAG_DIR / "vectors.npy"), vectors)
log(f"FAISS 인덱스 저장 완료: {RAG_DIR / 'faiss.index'}")
def main(input_dir, output_dir):
global OUTPUT_ROOT, RAG_DIR, LOG_DIR
OUTPUT_ROOT = Path(output_dir)
RAG_DIR = OUTPUT_ROOT / "rag"
LOG_DIR = OUTPUT_ROOT / "logs"
for d in [RAG_DIR, LOG_DIR]:
d.mkdir(parents=True, exist_ok=True)
log("=== RAG 벡터 인덱스 구축 시작 ===")
build_faiss_index()
log("=== FAISS RAG 인덱스 구축 완료 ===")
if __name__ == "__main__":
main()