Upload converters/pipeline/step4_chunk.py
This commit is contained in:
338
03.Code/업로드용/converters/pipeline/step4_chunk.py
Normal file
338
03.Code/업로드용/converters/pipeline/step4_chunk.py
Normal file
@@ -0,0 +1,338 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv()
|
||||
"""
|
||||
chunk_and_summary_v2.py
|
||||
|
||||
기능:
|
||||
- 이전 단계에서 생성된 output/raw/*.md 파일들을 대상으로
|
||||
1) domain_prompt.txt 기반의 의미론적 청킹(Semantic Chunking)
|
||||
2) 각 청크별 요약 생성
|
||||
3) 청크 내 이미지 참조 보관
|
||||
4) 최종 결과를 JSON 형식 (메타+청크+요약+이미지)으로 저장
|
||||
- extract_1_v2.py 실행 후 생성된 .md 파일을 대상으로 함
|
||||
- step1_domainprompt.py 실행 후 domain_prompt.txt가 존재해야 함
|
||||
"""
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import re
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from openai import OpenAI
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
|
||||
|
||||
# ===== OpenAI 설정 =====
|
||||
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
|
||||
GPT_MODEL = "gpt-5-2025-08-07"
|
||||
client = OpenAI(api_key=OPENAI_API_KEY)
|
||||
|
||||
# ===== 필터링할 폴더 정의 =====
|
||||
SKIP_DIR_NAMES = {"System Volume Information", "$RECYCLE.BIN", ".git", "__pycache__", "output"}
|
||||
|
||||
# ===== 이미지 참조 패턴 =====
|
||||
IMAGE_PATTERN = re.compile(r'!\[([^\]]*)\]\(([^)]+)\)')
|
||||
|
||||
def log(msg: str):
|
||||
print(msg, flush=True)
|
||||
with (LOG_DIR / "chunk_and_summary_log.txt").open("a", encoding="utf-8") as f:
|
||||
f.write(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}\n")
|
||||
|
||||
|
||||
def load_domain_prompt() -> str:
|
||||
p = CONTEXT_DIR / "domain_prompt.txt"
|
||||
if not p.exists():
|
||||
log(f"domain_prompt.txt가 존재하지 않습니다: {p}")
|
||||
log("먼저 step1_domainprompt.py를 실행해야 합니다.")
|
||||
sys.exit(1)
|
||||
return p.read_text(encoding="utf-8", errors="ignore").strip()
|
||||
|
||||
|
||||
def safe_rel(p: Path) -> str:
|
||||
"""DATA_ROOT 기준 상대 경로 반환"""
|
||||
try:
|
||||
return str(p.relative_to(DATA_ROOT))
|
||||
except:
|
||||
return str(p)
|
||||
|
||||
|
||||
def extract_text_md(p: Path) -> str:
|
||||
"""마크다운 파일에서 텍스트 읽기"""
|
||||
try:
|
||||
return p.read_text(encoding="utf-8", errors="ignore")
|
||||
except:
|
||||
return ""
|
||||
|
||||
|
||||
def find_images_in_text(text: str) -> list:
|
||||
"""텍스트 내 이미지 참조 찾기"""
|
||||
matches = IMAGE_PATTERN.findall(text)
|
||||
return [{"alt": m[0], "path": m[1]} for m in matches]
|
||||
|
||||
|
||||
def semantic_chunk(domain_prompt: str, text: str, source_name: str):
|
||||
"""GPT 기반 의미론적 청킹"""
|
||||
if not text.strip():
|
||||
return []
|
||||
|
||||
# 텍스트가 너무 짧으면 그대로 사용
|
||||
if len(text) < 500:
|
||||
return [{
|
||||
"title": "전체 내용",
|
||||
"keywords": "",
|
||||
"content": text
|
||||
}]
|
||||
|
||||
user_prompt = f"""
|
||||
다음 문서를 의미 있는 단위(문단/항목/주제)로 분리하고,
|
||||
각 청크별 title / keywords / content 를 포함하는 JSON 배열로 출력하세요:
|
||||
1. 측량 기준, 법규 등 도메인 지식에 따라 의미 단위로만 분리
|
||||
2. 이미지 참조()가 포함된 텍스트는 해당 이미지와 관련된 설명과 함께 한 청크로 유지
|
||||
3. 각 청크는 최대 1000자 내외로 구성
|
||||
4. keywords는 쉼표로 구분된 핵심 단어 3~5개
|
||||
|
||||
문서 내용:
|
||||
{text[:12000]}
|
||||
|
||||
JSON 배열로 출력하세요. 다른 설명은 필요 없습니다.
|
||||
"""
|
||||
|
||||
try:
|
||||
resp = client.chat.completions.create(
|
||||
model=GPT_MODEL,
|
||||
messages=[
|
||||
{"role": "system", "content": domain_prompt + "\n\n당신은 문서를 의미 있는 단위로 분석하는 전문가입니다. JSON 배열로 출력하세요."},
|
||||
{"role": "user", "content": user_prompt},
|
||||
],
|
||||
)
|
||||
data = resp.choices[0].message.content.strip()
|
||||
|
||||
# JSON 파싱 전 전처리
|
||||
# ```json ... ``` 형식 제거
|
||||
if "```json" in data:
|
||||
data = data.split("```json")[1].split("```")[0].strip()
|
||||
elif "```" in data:
|
||||
data = data.split("```")[1].split("```")[0].strip()
|
||||
|
||||
if data.startswith("["):
|
||||
return json.loads(data)
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
log(f"[WARN] JSON 파싱 실패 ({source_name}): {e}")
|
||||
except Exception as e:
|
||||
log(f"[WARN] semantic_chunk API 실패 ({source_name}): {e}")
|
||||
|
||||
# fallback: 페이지/문단 기반 단순 분리
|
||||
log(f"[INFO] Fallback 청킹 적용: {source_name}")
|
||||
return fallback_chunk(text)
|
||||
|
||||
|
||||
def fallback_chunk(text: str) -> list:
|
||||
"""GPT 실패 시 대체 청킹 (페이지/문단 기준)"""
|
||||
chunks = []
|
||||
|
||||
if "## Page " in text:
|
||||
pages = re.split(r'\n## Page \d+\n', text)
|
||||
for i, page_content in enumerate(pages):
|
||||
if page_content.strip():
|
||||
chunks.append({
|
||||
"title": f"Page {i+1}",
|
||||
"keywords": "",
|
||||
"content": page_content.strip()
|
||||
})
|
||||
else:
|
||||
# 빈 줄 2개 이상으로 분리
|
||||
sections = re.split(r'\n{3,}', text)
|
||||
for i, section in enumerate(sections):
|
||||
if section.strip() and len(section.strip()) > 50:
|
||||
chunks.append({
|
||||
"title": f"단락 {i+1}",
|
||||
"keywords": "",
|
||||
"content": section.strip()
|
||||
})
|
||||
|
||||
# 여전히 없으면 전체를 하나로
|
||||
if not chunks:
|
||||
chunks.append({
|
||||
"title": "전체 내용",
|
||||
"keywords": "",
|
||||
"content": text.strip()
|
||||
})
|
||||
|
||||
return chunks
|
||||
|
||||
|
||||
def summary_chunk(domain_prompt: str, text: str, limit: int = 300) -> str:
|
||||
"""개별 청크 요약 생성"""
|
||||
if not text.strip():
|
||||
return ""
|
||||
|
||||
# 이미지 참조 제거 후 요약 시도 (순수 텍스트 정보 중심)
|
||||
text_only = IMAGE_PATTERN.sub('', text).strip()
|
||||
|
||||
if len(text_only) < 100:
|
||||
return text_only
|
||||
|
||||
prompt = f"""
|
||||
다음 텍스트를 {limit}자 이내의 한 문장 또는 핵심 문단으로 요약하세요.
|
||||
측량 수치, 특정 기법, 법규 번호 등이 있다면 반드시 보존하십시오.
|
||||
|
||||
{text_only[:8000]}
|
||||
"""
|
||||
try:
|
||||
resp = client.chat.completions.create(
|
||||
model=GPT_MODEL,
|
||||
messages=[
|
||||
{"role": "system", "content": domain_prompt + "\n\n텍스트를 전문적이고 정확하게 요약하는 전문가입니다."},
|
||||
{"role": "user", "content": prompt},
|
||||
],
|
||||
)
|
||||
return resp.choices[0].message.content.strip()
|
||||
except Exception as e:
|
||||
log(f"[WARN] summary 실패: {e}")
|
||||
return text_only[:limit]
|
||||
|
||||
|
||||
def save_chunk_files(src: Path, text: str, domain_prompt: str) -> int:
|
||||
"""
|
||||
개별 마크다운 파일에 대해 청킹 및 요약 후 JSON 파일로 저장
|
||||
|
||||
Returns:
|
||||
성공한 청크 개수
|
||||
"""
|
||||
stem = src.stem
|
||||
folder_ctx = safe_rel(src.parent)
|
||||
|
||||
# 1. 원본 텍스트 보관
|
||||
(TEXT_DIR / f"{stem}_text.txt").write_text(text, encoding="utf-8", errors="ignore")
|
||||
|
||||
# 2. 청킹
|
||||
chunks = semantic_chunk(domain_prompt, text, src.name)
|
||||
|
||||
if not chunks:
|
||||
log(f"[WARN] 청킹 결과 없음: {src.name}")
|
||||
return 0
|
||||
|
||||
rag_items = []
|
||||
|
||||
for idx, ch in enumerate(chunks, start=1):
|
||||
content = ch.get("content", "")
|
||||
|
||||
# 각 청크 요약
|
||||
summ = summary_chunk(domain_prompt, content, 300)
|
||||
|
||||
# 이미지 참조 찾기
|
||||
images_in_chunk = find_images_in_text(content)
|
||||
|
||||
rag_items.append({
|
||||
"source": src.name,
|
||||
"source_path": safe_rel(src),
|
||||
"chunk": idx,
|
||||
"total_chunks": len(chunks),
|
||||
"title": ch.get("title", ""),
|
||||
"keywords": ch.get("keywords", ""),
|
||||
"text": content,
|
||||
"summary": summ,
|
||||
"folder_context": folder_ctx,
|
||||
"images": images_in_chunk,
|
||||
"has_images": len(images_in_chunk) > 0
|
||||
})
|
||||
|
||||
# JSON 저장
|
||||
(JSON_DIR / f"{stem}.json").write_text(
|
||||
json.dumps(rag_items, ensure_ascii=False, indent=2),
|
||||
encoding="utf-8"
|
||||
)
|
||||
|
||||
# RAG용 통합 JSON 준비를 위해 복사
|
||||
(RAG_DIR / f"{stem}_chunks.json").write_text(
|
||||
json.dumps(rag_items, ensure_ascii=False, indent=2),
|
||||
encoding="utf-8"
|
||||
)
|
||||
|
||||
return len(chunks)
|
||||
|
||||
|
||||
def main(input_dir, output_dir):
|
||||
global DATA_ROOT, OUTPUT_ROOT, TEXT_DIR, JSON_DIR, RAG_DIR, CONTEXT_DIR, LOG_DIR
|
||||
DATA_ROOT = Path(input_dir)
|
||||
OUTPUT_ROOT = Path(output_dir)
|
||||
TEXT_DIR = OUTPUT_ROOT / "text"
|
||||
JSON_DIR = OUTPUT_ROOT / "json"
|
||||
RAG_DIR = OUTPUT_ROOT / "rag"
|
||||
CONTEXT_DIR = OUTPUT_ROOT / "context"
|
||||
LOG_DIR = OUTPUT_ROOT / "logs"
|
||||
for d in [TEXT_DIR, JSON_DIR, RAG_DIR, CONTEXT_DIR, LOG_DIR]:
|
||||
d.mkdir(parents=True, exist_ok=True)
|
||||
log("=" * 60)
|
||||
log("청킹 및 요약 작업 시작")
|
||||
log(f"데이터 폴더: {DATA_ROOT}")
|
||||
log(f"출력 폴더: {OUTPUT_ROOT}")
|
||||
log("=" * 60)
|
||||
|
||||
# 도메인 프롬프트 로드
|
||||
domain_prompt = load_domain_prompt()
|
||||
log(f"도메인 프롬프트 로드 완료 ({len(domain_prompt)}자)")
|
||||
|
||||
stats = {"docs": 0, "chunks": 0, "images": 0, "errors": 0}
|
||||
|
||||
# 모든 .md 파일 찾기
|
||||
md_files = list(DATA_ROOT.rglob("*.md"))
|
||||
log(f"총 {len(md_files)}개의 .md 파일 발견\n")
|
||||
|
||||
def process_file(args):
|
||||
idx, fpath = args
|
||||
try:
|
||||
rel_path = safe_rel(fpath)
|
||||
log(f"[{idx}/{len(md_files)}] {rel_path}")
|
||||
text = extract_text_md(fpath)
|
||||
if not text.strip():
|
||||
log(f" 비어있는 문서")
|
||||
return 0, 0, 0
|
||||
images = find_images_in_text(text)
|
||||
chunk_count = save_chunk_files(fpath, text, domain_prompt)
|
||||
log(f" {chunk_count}개 청크, {len(images)}개 이미지 추출")
|
||||
return 1, chunk_count, len(images)
|
||||
except Exception as e:
|
||||
log(f" 오류 발생: {e}")
|
||||
return 0, 0, 0
|
||||
|
||||
# 병렬 처리
|
||||
with ThreadPoolExecutor(max_workers=4) as executor:
|
||||
futures = {executor.submit(process_file, (i, f)): f for i, f in enumerate(md_files, 1)}
|
||||
for future in as_completed(futures):
|
||||
d, c, img = future.result()
|
||||
stats["docs"] += d
|
||||
stats["chunks"] += c
|
||||
stats["images"] += img
|
||||
|
||||
# 전체 통계 저장
|
||||
summary = {
|
||||
"processed_at": datetime.now().isoformat(),
|
||||
"data_root": str(DATA_ROOT),
|
||||
"output_root": str(OUTPUT_ROOT),
|
||||
"statistics": stats
|
||||
}
|
||||
|
||||
(LOG_DIR / "chunk_summary_stats.json").write_text(
|
||||
json.dumps(summary, ensure_ascii=False, indent=2),
|
||||
encoding="utf-8"
|
||||
)
|
||||
|
||||
# 결과 출력
|
||||
log("\n" + "=" * 60)
|
||||
log("청킹 및 요약 완료!")
|
||||
log("=" * 60)
|
||||
log(f"처리 문서: {stats['docs']}개")
|
||||
log(f"생성 청크: {stats['chunks']}개")
|
||||
log(f"이미지 포함 청크: {stats['images']}개")
|
||||
log(f"전체 요약 통계는 LOG_DIR에 저장되었습니다.")
|
||||
log(f"\n결과 저장 위치:")
|
||||
log(f" - 텍스트: {TEXT_DIR}")
|
||||
log(f" - JSON: {JSON_DIR}")
|
||||
log(f" - RAG: {RAG_DIR}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user