339 lines
11 KiB
Python
339 lines
11 KiB
Python
# -*- coding: utf-8 -*-
|
|
from dotenv import load_dotenv
|
|
load_dotenv()
|
|
"""
|
|
chunk_and_summary_v2.py
|
|
|
|
기능:
|
|
- 이전 단계에서 생성된 output/raw/*.md 파일들을 대상으로
|
|
1) domain_prompt.txt 기반의 의미론적 청킹(Semantic Chunking)
|
|
2) 각 청크별 요약 생성
|
|
3) 청크 내 이미지 참조 보관
|
|
4) 최종 결과를 JSON 형식 (메타+청크+요약+이미지)으로 저장
|
|
- extract_1_v2.py 실행 후 생성된 .md 파일을 대상으로 함
|
|
- step1_domainprompt.py 실행 후 domain_prompt.txt가 존재해야 함
|
|
"""
|
|
import os
|
|
import sys
|
|
import json
|
|
import re
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from openai import OpenAI
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
|
|
# ===== OpenAI 설정 =====
|
|
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
|
|
GPT_MODEL = "gpt-5-2025-08-07"
|
|
client = OpenAI(api_key=OPENAI_API_KEY)
|
|
|
|
# ===== 필터링할 폴더 정의 =====
|
|
SKIP_DIR_NAMES = {"System Volume Information", "$RECYCLE.BIN", ".git", "__pycache__", "output"}
|
|
|
|
# ===== 이미지 참조 패턴 =====
|
|
IMAGE_PATTERN = re.compile(r'!\[([^\]]*)\]\(([^)]+)\)')
|
|
|
|
def log(msg: str):
|
|
print(msg, flush=True)
|
|
with (LOG_DIR / "chunk_and_summary_log.txt").open("a", encoding="utf-8") as f:
|
|
f.write(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}\n")
|
|
|
|
|
|
def load_domain_prompt() -> str:
|
|
p = CONTEXT_DIR / "domain_prompt.txt"
|
|
if not p.exists():
|
|
log(f"domain_prompt.txt가 존재하지 않습니다: {p}")
|
|
log("먼저 step1_domainprompt.py를 실행해야 합니다.")
|
|
sys.exit(1)
|
|
return p.read_text(encoding="utf-8", errors="ignore").strip()
|
|
|
|
|
|
def safe_rel(p: Path) -> str:
|
|
"""DATA_ROOT 기준 상대 경로 반환"""
|
|
try:
|
|
return str(p.relative_to(DATA_ROOT))
|
|
except:
|
|
return str(p)
|
|
|
|
|
|
def extract_text_md(p: Path) -> str:
|
|
"""마크다운 파일에서 텍스트 읽기"""
|
|
try:
|
|
return p.read_text(encoding="utf-8", errors="ignore")
|
|
except:
|
|
return ""
|
|
|
|
|
|
def find_images_in_text(text: str) -> list:
|
|
"""텍스트 내 이미지 참조 찾기"""
|
|
matches = IMAGE_PATTERN.findall(text)
|
|
return [{"alt": m[0], "path": m[1]} for m in matches]
|
|
|
|
|
|
def semantic_chunk(domain_prompt: str, text: str, source_name: str):
|
|
"""GPT 기반 의미론적 청킹"""
|
|
if not text.strip():
|
|
return []
|
|
|
|
# 텍스트가 너무 짧으면 그대로 사용
|
|
if len(text) < 500:
|
|
return [{
|
|
"title": "전체 내용",
|
|
"keywords": "",
|
|
"content": text
|
|
}]
|
|
|
|
user_prompt = f"""
|
|
다음 문서를 의미 있는 단위(문단/항목/주제)로 분리하고,
|
|
각 청크별 title / keywords / content 를 포함하는 JSON 배열로 출력하세요:
|
|
1. 측량 기준, 법규 등 도메인 지식에 따라 의미 단위로만 분리
|
|
2. 이미지 참조()가 포함된 텍스트는 해당 이미지와 관련된 설명과 함께 한 청크로 유지
|
|
3. 각 청크는 최대 1000자 내외로 구성
|
|
4. keywords는 쉼표로 구분된 핵심 단어 3~5개
|
|
|
|
문서 내용:
|
|
{text[:12000]}
|
|
|
|
JSON 배열로 출력하세요. 다른 설명은 필요 없습니다.
|
|
"""
|
|
|
|
try:
|
|
resp = client.chat.completions.create(
|
|
model=GPT_MODEL,
|
|
messages=[
|
|
{"role": "system", "content": domain_prompt + "\n\n당신은 문서를 의미 있는 단위로 분석하는 전문가입니다. JSON 배열로 출력하세요."},
|
|
{"role": "user", "content": user_prompt},
|
|
],
|
|
)
|
|
data = resp.choices[0].message.content.strip()
|
|
|
|
# JSON 파싱 전 전처리
|
|
# ```json ... ``` 형식 제거
|
|
if "```json" in data:
|
|
data = data.split("```json")[1].split("```")[0].strip()
|
|
elif "```" in data:
|
|
data = data.split("```")[1].split("```")[0].strip()
|
|
|
|
if data.startswith("["):
|
|
return json.loads(data)
|
|
|
|
except json.JSONDecodeError as e:
|
|
log(f"[WARN] JSON 파싱 실패 ({source_name}): {e}")
|
|
except Exception as e:
|
|
log(f"[WARN] semantic_chunk API 실패 ({source_name}): {e}")
|
|
|
|
# fallback: 페이지/문단 기반 단순 분리
|
|
log(f"[INFO] Fallback 청킹 적용: {source_name}")
|
|
return fallback_chunk(text)
|
|
|
|
|
|
def fallback_chunk(text: str) -> list:
|
|
"""GPT 실패 시 대체 청킹 (페이지/문단 기준)"""
|
|
chunks = []
|
|
|
|
if "## Page " in text:
|
|
pages = re.split(r'\n## Page \d+\n', text)
|
|
for i, page_content in enumerate(pages):
|
|
if page_content.strip():
|
|
chunks.append({
|
|
"title": f"Page {i+1}",
|
|
"keywords": "",
|
|
"content": page_content.strip()
|
|
})
|
|
else:
|
|
# 빈 줄 2개 이상으로 분리
|
|
sections = re.split(r'\n{3,}', text)
|
|
for i, section in enumerate(sections):
|
|
if section.strip() and len(section.strip()) > 50:
|
|
chunks.append({
|
|
"title": f"단락 {i+1}",
|
|
"keywords": "",
|
|
"content": section.strip()
|
|
})
|
|
|
|
# 여전히 없으면 전체를 하나로
|
|
if not chunks:
|
|
chunks.append({
|
|
"title": "전체 내용",
|
|
"keywords": "",
|
|
"content": text.strip()
|
|
})
|
|
|
|
return chunks
|
|
|
|
|
|
def summary_chunk(domain_prompt: str, text: str, limit: int = 300) -> str:
|
|
"""개별 청크 요약 생성"""
|
|
if not text.strip():
|
|
return ""
|
|
|
|
# 이미지 참조 제거 후 요약 시도 (순수 텍스트 정보 중심)
|
|
text_only = IMAGE_PATTERN.sub('', text).strip()
|
|
|
|
if len(text_only) < 100:
|
|
return text_only
|
|
|
|
prompt = f"""
|
|
다음 텍스트를 {limit}자 이내의 한 문장 또는 핵심 문단으로 요약하세요.
|
|
측량 수치, 특정 기법, 법규 번호 등이 있다면 반드시 보존하십시오.
|
|
|
|
{text_only[:8000]}
|
|
"""
|
|
try:
|
|
resp = client.chat.completions.create(
|
|
model=GPT_MODEL,
|
|
messages=[
|
|
{"role": "system", "content": domain_prompt + "\n\n텍스트를 전문적이고 정확하게 요약하는 전문가입니다."},
|
|
{"role": "user", "content": prompt},
|
|
],
|
|
)
|
|
return resp.choices[0].message.content.strip()
|
|
except Exception as e:
|
|
log(f"[WARN] summary 실패: {e}")
|
|
return text_only[:limit]
|
|
|
|
|
|
def save_chunk_files(src: Path, text: str, domain_prompt: str) -> int:
|
|
"""
|
|
개별 마크다운 파일에 대해 청킹 및 요약 후 JSON 파일로 저장
|
|
|
|
Returns:
|
|
성공한 청크 개수
|
|
"""
|
|
stem = src.stem
|
|
folder_ctx = safe_rel(src.parent)
|
|
|
|
# 1. 원본 텍스트 보관
|
|
(TEXT_DIR / f"{stem}_text.txt").write_text(text, encoding="utf-8", errors="ignore")
|
|
|
|
# 2. 청킹
|
|
chunks = semantic_chunk(domain_prompt, text, src.name)
|
|
|
|
if not chunks:
|
|
log(f"[WARN] 청킹 결과 없음: {src.name}")
|
|
return 0
|
|
|
|
rag_items = []
|
|
|
|
for idx, ch in enumerate(chunks, start=1):
|
|
content = ch.get("content", "")
|
|
|
|
# 각 청크 요약
|
|
summ = summary_chunk(domain_prompt, content, 300)
|
|
|
|
# 이미지 참조 찾기
|
|
images_in_chunk = find_images_in_text(content)
|
|
|
|
rag_items.append({
|
|
"source": src.name,
|
|
"source_path": safe_rel(src),
|
|
"chunk": idx,
|
|
"total_chunks": len(chunks),
|
|
"title": ch.get("title", ""),
|
|
"keywords": ch.get("keywords", ""),
|
|
"text": content,
|
|
"summary": summ,
|
|
"folder_context": folder_ctx,
|
|
"images": images_in_chunk,
|
|
"has_images": len(images_in_chunk) > 0
|
|
})
|
|
|
|
# JSON 저장
|
|
(JSON_DIR / f"{stem}.json").write_text(
|
|
json.dumps(rag_items, ensure_ascii=False, indent=2),
|
|
encoding="utf-8"
|
|
)
|
|
|
|
# RAG용 통합 JSON 준비를 위해 복사
|
|
(RAG_DIR / f"{stem}_chunks.json").write_text(
|
|
json.dumps(rag_items, ensure_ascii=False, indent=2),
|
|
encoding="utf-8"
|
|
)
|
|
|
|
return len(chunks)
|
|
|
|
|
|
def main(input_dir, output_dir):
|
|
global DATA_ROOT, OUTPUT_ROOT, TEXT_DIR, JSON_DIR, RAG_DIR, CONTEXT_DIR, LOG_DIR
|
|
DATA_ROOT = Path(input_dir)
|
|
OUTPUT_ROOT = Path(output_dir)
|
|
TEXT_DIR = OUTPUT_ROOT / "text"
|
|
JSON_DIR = OUTPUT_ROOT / "json"
|
|
RAG_DIR = OUTPUT_ROOT / "rag"
|
|
CONTEXT_DIR = OUTPUT_ROOT / "context"
|
|
LOG_DIR = OUTPUT_ROOT / "logs"
|
|
for d in [TEXT_DIR, JSON_DIR, RAG_DIR, CONTEXT_DIR, LOG_DIR]:
|
|
d.mkdir(parents=True, exist_ok=True)
|
|
log("=" * 60)
|
|
log("청킹 및 요약 작업 시작")
|
|
log(f"데이터 폴더: {DATA_ROOT}")
|
|
log(f"출력 폴더: {OUTPUT_ROOT}")
|
|
log("=" * 60)
|
|
|
|
# 도메인 프롬프트 로드
|
|
domain_prompt = load_domain_prompt()
|
|
log(f"도메인 프롬프트 로드 완료 ({len(domain_prompt)}자)")
|
|
|
|
stats = {"docs": 0, "chunks": 0, "images": 0, "errors": 0}
|
|
|
|
# 모든 .md 파일 찾기
|
|
md_files = list(DATA_ROOT.rglob("*.md"))
|
|
log(f"총 {len(md_files)}개의 .md 파일 발견\n")
|
|
|
|
def process_file(args):
|
|
idx, fpath = args
|
|
try:
|
|
rel_path = safe_rel(fpath)
|
|
log(f"[{idx}/{len(md_files)}] {rel_path}")
|
|
text = extract_text_md(fpath)
|
|
if not text.strip():
|
|
log(f" 비어있는 문서")
|
|
return 0, 0, 0
|
|
images = find_images_in_text(text)
|
|
chunk_count = save_chunk_files(fpath, text, domain_prompt)
|
|
log(f" {chunk_count}개 청크, {len(images)}개 이미지 추출")
|
|
return 1, chunk_count, len(images)
|
|
except Exception as e:
|
|
log(f" 오류 발생: {e}")
|
|
return 0, 0, 0
|
|
|
|
# 병렬 처리
|
|
with ThreadPoolExecutor(max_workers=4) as executor:
|
|
futures = {executor.submit(process_file, (i, f)): f for i, f in enumerate(md_files, 1)}
|
|
for future in as_completed(futures):
|
|
d, c, img = future.result()
|
|
stats["docs"] += d
|
|
stats["chunks"] += c
|
|
stats["images"] += img
|
|
|
|
# 전체 통계 저장
|
|
summary = {
|
|
"processed_at": datetime.now().isoformat(),
|
|
"data_root": str(DATA_ROOT),
|
|
"output_root": str(OUTPUT_ROOT),
|
|
"statistics": stats
|
|
}
|
|
|
|
(LOG_DIR / "chunk_summary_stats.json").write_text(
|
|
json.dumps(summary, ensure_ascii=False, indent=2),
|
|
encoding="utf-8"
|
|
)
|
|
|
|
# 결과 출력
|
|
log("\n" + "=" * 60)
|
|
log("청킹 및 요약 완료!")
|
|
log("=" * 60)
|
|
log(f"처리 문서: {stats['docs']}개")
|
|
log(f"생성 청크: {stats['chunks']}개")
|
|
log(f"이미지 포함 청크: {stats['images']}개")
|
|
log(f"전체 요약 통계는 LOG_DIR에 저장되었습니다.")
|
|
log(f"\n결과 저장 위치:")
|
|
log(f" - 텍스트: {TEXT_DIR}")
|
|
log(f" - JSON: {JSON_DIR}")
|
|
log(f" - RAG: {RAG_DIR}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|