📦 Initialize Geulbeot structure and merge Prompts & test projects
This commit is contained in:
457
03. Code/geulbeot_10th/domain_api.py
Normal file
457
03. Code/geulbeot_10th/domain_api.py
Normal file
@@ -0,0 +1,457 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
domain_api.py
|
||||
|
||||
도메인 지식 관리 API + 파이프라인 래퍼
|
||||
app.py에서 import하여 사용
|
||||
|
||||
사용법 (app.py):
|
||||
from domain_api import register_domain_routes
|
||||
register_domain_routes(app)
|
||||
"""
|
||||
|
||||
import os
|
||||
import json
|
||||
from pathlib import Path
|
||||
from flask import request, jsonify
|
||||
|
||||
# ===== 경로 설정 =====
|
||||
# app.py와 같은 레벨에 domains/ 폴더가 있다고 가정
|
||||
BASE_DIR = Path(__file__).parent
|
||||
DOMAIN_CONFIG_PATH = BASE_DIR / "domain_config.json"
|
||||
DOMAIN_DIR = BASE_DIR / "domain"
|
||||
|
||||
# 파이프라인 출력 경로 (step3~9가 사용하는 경로)
|
||||
# 실제 환경에 맞게 수정 필요
|
||||
PIPELINE_OUTPUT_ROOT = Path(os.getenv(
|
||||
"PIPELINE_OUTPUT_ROOT",
|
||||
r"D:\for python\geulbeot-light\geulbeot-light\00.test\hwpx\out\out"
|
||||
))
|
||||
CONTEXT_DIR = PIPELINE_OUTPUT_ROOT / "context"
|
||||
|
||||
|
||||
def register_domain_routes(app):
|
||||
"""Flask 앱에 도메인 관련 라우트 등록"""
|
||||
|
||||
@app.route('/api/domain-config', methods=['GET'])
|
||||
def get_domain_config():
|
||||
"""도메인 구조 설정 반환"""
|
||||
try:
|
||||
if DOMAIN_CONFIG_PATH.exists():
|
||||
config = json.loads(DOMAIN_CONFIG_PATH.read_text(encoding='utf-8'))
|
||||
|
||||
# 각 도메인 파일 존재 여부 체크
|
||||
for cat in config.get('categories', []):
|
||||
if cat.get('file'):
|
||||
fpath = DOMAIN_DIR / cat['file']
|
||||
cat['file_exists'] = fpath.exists()
|
||||
cat['file_size'] = fpath.stat().st_size if fpath.exists() else 0
|
||||
|
||||
for child in cat.get('children', []):
|
||||
if child.get('file'):
|
||||
fpath = DOMAIN_DIR / child['file']
|
||||
child['file_exists'] = fpath.exists()
|
||||
child['file_size'] = fpath.stat().st_size if fpath.exists() else 0
|
||||
|
||||
return jsonify(config)
|
||||
else:
|
||||
return jsonify({'error': 'domain_config.json not found', 'categories': []}), 404
|
||||
except Exception as e:
|
||||
return jsonify({'error': str(e), 'categories': []}), 500
|
||||
|
||||
|
||||
@app.route('/api/domain-combine', methods=['POST'])
|
||||
def combine_domains():
|
||||
"""
|
||||
선택된 도메인 .txt 파일들을 합쳐서 domain_prompt.txt로 저장
|
||||
|
||||
요청:
|
||||
{ "selected": ["civil_general", "survey", "bim"] }
|
||||
|
||||
응답:
|
||||
{ "success": true, "combined_length": 3200, "selected_names": [...] }
|
||||
"""
|
||||
try:
|
||||
data = request.get_json()
|
||||
selected_ids = data.get('selected', [])
|
||||
|
||||
if not selected_ids:
|
||||
return jsonify({
|
||||
'success': True,
|
||||
'combined_length': 0,
|
||||
'selected_names': [],
|
||||
'message': '선택 없음 - step3 자동 분석 모드'
|
||||
})
|
||||
|
||||
# config 로드
|
||||
config = json.loads(DOMAIN_CONFIG_PATH.read_text(encoding='utf-8'))
|
||||
|
||||
# 선택된 ID → 파일 경로 + 이름 매핑
|
||||
domain_parts = []
|
||||
guide_parts = []
|
||||
selected_names = []
|
||||
|
||||
for cat in config.get('categories', []):
|
||||
is_guide = (cat['id'] == 'report_guide')
|
||||
target = guide_parts if is_guide else domain_parts
|
||||
|
||||
if cat['id'] in selected_ids and cat.get('file'):
|
||||
fpath = DOMAIN_DIR / cat['file']
|
||||
if fpath.exists():
|
||||
content = fpath.read_text(encoding='utf-8', errors='ignore').strip()
|
||||
if content:
|
||||
target.append(f"[{cat['label']}]\n{content}")
|
||||
selected_names.append(cat['label'])
|
||||
|
||||
for child in cat.get('children', []):
|
||||
if child['id'] in selected_ids and child.get('file'):
|
||||
fpath = DOMAIN_DIR / child['file']
|
||||
if fpath.exists():
|
||||
content = fpath.read_text(encoding='utf-8', errors='ignore').strip()
|
||||
if content:
|
||||
target.append(f"[{child['label']}]\n{content}")
|
||||
selected_names.append(child['label'])
|
||||
selected_names.append(child['label'])
|
||||
|
||||
if not domain_parts and not guide_parts:
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'error': '선택된 도메인의 지식 파일이 비어있습니다.'
|
||||
})
|
||||
|
||||
sep = "\n\n" + "=" * 50 + "\n\n"
|
||||
sections = []
|
||||
|
||||
if domain_parts:
|
||||
domain_names = [n for n in selected_names if n not in ['목차 구성 가이드', '보고서 문체 가이드']]
|
||||
sections.append(
|
||||
f"너는 다음 분야의 전문가이다: {', '.join(domain_names)}.\n"
|
||||
f"다음의 도메인 지식을 기반으로, 사실에 근거하여 전문적이고 정확한 내용을 작성하라.\n"
|
||||
f"추측이나 창작은 금지하며, 제공된 근거 자료의 원문을 최대한 보존하라.\n\n"
|
||||
f"[도메인 전문 지식]\n" + sep.join(domain_parts)
|
||||
)
|
||||
|
||||
if guide_parts:
|
||||
sections.append(
|
||||
f"[보고서 작성 가이드]\n"
|
||||
f"다음 가이드를 참고하여 보고서의 목차 구성과 문체를 결정하라.\n\n"
|
||||
+ sep.join(guide_parts)
|
||||
)
|
||||
|
||||
final_text = ("\n\n" + "=" * 60 + "\n\n").join(sections)
|
||||
|
||||
# report_guide는 항상 자동 주입 (사용자 선택 아님)
|
||||
guide_dir = DOMAIN_DIR / 'report_guide'
|
||||
if guide_dir.exists():
|
||||
guide_texts = []
|
||||
for gf in sorted(guide_dir.glob('*.txt')):
|
||||
content = gf.read_text(encoding='utf-8', errors='ignore').strip()
|
||||
if content:
|
||||
guide_texts.append(content)
|
||||
if guide_texts:
|
||||
guide_sep = "\n\n" + "=" * 50 + "\n\n"
|
||||
final_text += (
|
||||
"\n\n" + "=" * 60 + "\n\n"
|
||||
"[보고서 작성 가이드]\n"
|
||||
"다음 가이드를 참고하여 보고서의 목차 구성과 문체를 결정하라.\n\n"
|
||||
+ guide_sep.join(guide_texts)
|
||||
)
|
||||
|
||||
# domain_prompt.txt로 저장
|
||||
CONTEXT_DIR.mkdir(parents=True, exist_ok=True)
|
||||
output_path = CONTEXT_DIR / "domain_prompt.txt"
|
||||
output_path.write_text(final_text, encoding='utf-8')
|
||||
|
||||
return jsonify({
|
||||
'success': True,
|
||||
'combined_length': len(final_text),
|
||||
'selected_names': selected_names,
|
||||
'selected_ids': selected_ids,
|
||||
'output_path': str(output_path)
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
return jsonify({'success': False, 'error': str(e)}), 500
|
||||
|
||||
|
||||
@app.route('/api/domain-list', methods=['GET'])
|
||||
def list_domain_files():
|
||||
"""
|
||||
domains/ 폴더의 .txt 파일 목록 반환
|
||||
도메인 지식 파일 관리용
|
||||
"""
|
||||
try:
|
||||
files = []
|
||||
|
||||
if DOMAIN_DIR.exists():
|
||||
for f in sorted(DOMAIN_DIR.rglob('*.txt')):
|
||||
rel = f.relative_to(DOMAIN_DIR)
|
||||
files.append({
|
||||
'path': str(rel),
|
||||
'name': f.stem,
|
||||
'size': f.stat().st_size,
|
||||
'preview': f.read_text(encoding='utf-8', errors='ignore')[:200]
|
||||
})
|
||||
|
||||
return jsonify({
|
||||
'success': True,
|
||||
'files': files,
|
||||
'domains_dir': str(DOMAIN_DIR)
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
return jsonify({'success': False, 'error': str(e)}), 500
|
||||
|
||||
|
||||
@app.route('/api/domain-save', methods=['POST'])
|
||||
def save_domain_file():
|
||||
"""
|
||||
도메인 지식 파일 저장/수정
|
||||
|
||||
요청:
|
||||
{ "id": "survey", "content": "측량 분야의 전문 지식..." }
|
||||
"""
|
||||
try:
|
||||
data = request.get_json()
|
||||
domain_id = data.get('id', '')
|
||||
content = data.get('content', '')
|
||||
|
||||
if not domain_id or not content:
|
||||
return jsonify({'success': False, 'error': 'id와 content가 필요합니다.'})
|
||||
|
||||
# config에서 파일 경로 찾기
|
||||
config = json.loads(DOMAIN_CONFIG_PATH.read_text(encoding='utf-8'))
|
||||
file_path = None
|
||||
|
||||
for cat in config.get('categories', []):
|
||||
if cat['id'] == domain_id:
|
||||
file_path = cat.get('file')
|
||||
break
|
||||
for child in cat.get('children', []):
|
||||
if child['id'] == domain_id:
|
||||
file_path = child.get('file')
|
||||
break
|
||||
if file_path:
|
||||
break
|
||||
|
||||
if not file_path:
|
||||
return jsonify({'success': False, 'error': f'도메인 ID를 찾을 수 없습니다: {domain_id}'})
|
||||
|
||||
# 파일 저장
|
||||
full_path = BASE_DIR / file_path
|
||||
full_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
full_path.write_text(content, encoding='utf-8')
|
||||
|
||||
return jsonify({
|
||||
'success': True,
|
||||
'path': str(full_path),
|
||||
'size': len(content)
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
return jsonify({'success': False, 'error': str(e)}), 500
|
||||
|
||||
|
||||
@app.route('/api/pipeline/status', methods=['GET'])
|
||||
def pipeline_status():
|
||||
"""파이프라인 상태 확인 - 각 step의 출력 파일 존재 여부"""
|
||||
try:
|
||||
status = {
|
||||
'step3_domain': (CONTEXT_DIR / 'domain_prompt.txt').exists(),
|
||||
'step4_chunks': len(list((PIPELINE_OUTPUT_ROOT / 'rag').glob('*_chunks.json'))) if (PIPELINE_OUTPUT_ROOT / 'rag').exists() else 0,
|
||||
'step5_faiss': (PIPELINE_OUTPUT_ROOT / 'rag' / 'faiss.index').exists(),
|
||||
'step6_corpus': (CONTEXT_DIR / 'corpus.txt').exists(),
|
||||
'step7_outline': (CONTEXT_DIR / 'outline_issue_report.txt').exists(),
|
||||
'step8_report': (PIPELINE_OUTPUT_ROOT / 'generated' / 'report_draft.md').exists(),
|
||||
'step9_html': (PIPELINE_OUTPUT_ROOT / 'generated' / 'report.html').exists(),
|
||||
}
|
||||
|
||||
return jsonify({'success': True, 'status': status})
|
||||
|
||||
except Exception as e:
|
||||
return jsonify({'success': False, 'error': str(e)}), 500
|
||||
|
||||
|
||||
# ===== 파이프라인 실행 API =====
|
||||
|
||||
@app.route('/api/generate-toc', methods=['POST'])
|
||||
def generate_toc():
|
||||
"""
|
||||
목차 생성 API (step3 → 4 → 5 → 6 → 7)
|
||||
|
||||
도메인 선택을 한 경우: step3 스킵 (이미 domain_prompt.txt 있음)
|
||||
도메인 선택 안 한 경우: step3 실행
|
||||
|
||||
요청:
|
||||
{
|
||||
"folder_path": "D:\\...",
|
||||
"domain_selected": true/false,
|
||||
"selected_domains": ["civil_general", "survey"]
|
||||
}
|
||||
|
||||
응답:
|
||||
{
|
||||
"success": true,
|
||||
"title": "보고서 제목",
|
||||
"toc_items": [
|
||||
{ "num": "1.1.1", "title": "...", "guide": "...", "keywords": [...] }
|
||||
]
|
||||
}
|
||||
"""
|
||||
try:
|
||||
data = request.get_json()
|
||||
folder_path = data.get('folder_path', '')
|
||||
domain_selected = data.get('domain_selected', False)
|
||||
|
||||
# TODO: 실제 step 실행 연결
|
||||
# 현재는 목차 파일이 이미 있으면 읽어서 반환
|
||||
|
||||
outline_path = CONTEXT_DIR / 'outline_issue_report.txt'
|
||||
|
||||
if outline_path.exists():
|
||||
# 기존 목차 파싱
|
||||
toc_items = parse_outline_for_frontend(outline_path)
|
||||
return jsonify({
|
||||
'success': True,
|
||||
'title': toc_items[0].get('report_title', '보고서') if toc_items else '보고서',
|
||||
'toc_items': toc_items,
|
||||
'source': 'cached'
|
||||
})
|
||||
else:
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'error': '목차 파일이 아직 생성되지 않았습니다. 파이프라인을 먼저 실행해주세요.',
|
||||
'hint': 'step3~7을 순서대로 실행해야 합니다.'
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
return jsonify({'success': False, 'error': str(e)}), 500
|
||||
|
||||
@app.route('/api/generate-report-from-toc', methods=['POST'])
|
||||
def generate_report_from_toc():
|
||||
"""
|
||||
편집된 목차로 보고서 생성 (step8 → step9)
|
||||
|
||||
요청:
|
||||
{
|
||||
"toc_items": [...], # 편집된 목차
|
||||
"write_mode": "restructure",
|
||||
"instruction": "..."
|
||||
}
|
||||
"""
|
||||
try:
|
||||
data = request.get_json()
|
||||
toc_items = data.get('toc_items', [])
|
||||
write_mode = data.get('write_mode', 'restructure')
|
||||
instruction = data.get('instruction', '')
|
||||
|
||||
# TODO: step8 실행 (generate_report_gemini)
|
||||
# TODO: step9 실행 (md_to_html_publisher)
|
||||
|
||||
# 현재는 기존 generated 파일이 있으면 반환
|
||||
report_html_path = PIPELINE_OUTPUT_ROOT / 'generated' / 'report.html'
|
||||
|
||||
if report_html_path.exists():
|
||||
html = report_html_path.read_text(encoding='utf-8')
|
||||
return jsonify({
|
||||
'success': True,
|
||||
'html': html,
|
||||
'source': 'cached'
|
||||
})
|
||||
else:
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'error': '보고서 파일이 아직 생성되지 않았습니다.'
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
return jsonify({'success': False, 'error': str(e)}), 500
|
||||
|
||||
|
||||
|
||||
@app.route('/api/check-folder', methods=['POST'])
|
||||
def check_folder():
|
||||
"""폴더 경로의 파일 목록 + 확장자별 분류"""
|
||||
try:
|
||||
data = request.get_json()
|
||||
folder = Path(data.get('folder_path', ''))
|
||||
|
||||
if not folder.exists() or not folder.is_dir():
|
||||
return jsonify({'success': False, 'error': '폴더를 찾을 수 없습니다.'})
|
||||
|
||||
SUPPORTED = {'.hwpx', '.hwp', '.pdf', '.docx', '.xlsx', '.pptx', '.txt', '.csv', 'md', 'json','img', 'png', 'html'}
|
||||
|
||||
all_files = [f for f in folder.rglob('*') if f.is_file()]
|
||||
ok_files = [f for f in all_files if f.suffix.lower() in SUPPORTED]
|
||||
unknown_files = [f for f in all_files if f.suffix.lower() not in SUPPORTED]
|
||||
|
||||
return jsonify({
|
||||
'success': True,
|
||||
'total': len(all_files),
|
||||
'ok': len(ok_files),
|
||||
'unknown': len(unknown_files),
|
||||
'ok_list': [{'name': f.name, 'size': f.stat().st_size} for f in ok_files],
|
||||
'unknown_list': [f.name for f in unknown_files]
|
||||
})
|
||||
except Exception as e:
|
||||
return jsonify({'success': False, 'error': str(e)}), 500
|
||||
|
||||
|
||||
def parse_outline_for_frontend(outline_path: Path) -> list:
|
||||
"""
|
||||
outline_issue_report.txt를 파싱하여
|
||||
displayTocWithAnimation() 형식으로 변환
|
||||
|
||||
반환 형식:
|
||||
[
|
||||
{
|
||||
"num": "1.1.1",
|
||||
"title": "소목차 제목",
|
||||
"guide": "집필 가이드",
|
||||
"keywords": ["키워드1", "키워드2"]
|
||||
}
|
||||
]
|
||||
"""
|
||||
import re
|
||||
|
||||
raw = outline_path.read_text(encoding='utf-8', errors='ignore').splitlines()
|
||||
if not raw:
|
||||
return []
|
||||
|
||||
report_title = raw[0].strip()
|
||||
items = []
|
||||
|
||||
re_l3_head = re.compile(r'^\s*(\d+\.\d+\.\d+)\s+(.+)$')
|
||||
re_l3_topic = re.compile(r'^\s*[\-\*]\s+(.+?)\s*\|\s*(.+?)\s*\|\s*(\[.+?\])\s*\|\s*(.+)$')
|
||||
re_keywords = re.compile(r'(#\S+)')
|
||||
|
||||
current_l3 = None
|
||||
|
||||
for ln in raw[1:]:
|
||||
line = ln.strip()
|
||||
if not line:
|
||||
continue
|
||||
|
||||
m3h = re_l3_head.match(line)
|
||||
if m3h:
|
||||
current_l3 = {
|
||||
'num': m3h.group(1),
|
||||
'title': m3h.group(2),
|
||||
'report_title': report_title,
|
||||
'guide': '',
|
||||
'keywords': []
|
||||
}
|
||||
items.append(current_l3)
|
||||
continue
|
||||
|
||||
m3t = re_l3_topic.match(line)
|
||||
if m3t and current_l3:
|
||||
kws = [k.lstrip('#').strip() for k in re_keywords.findall(m3t.group(2))]
|
||||
# 기존 키워드에 추가
|
||||
current_l3['keywords'].extend(kws)
|
||||
# 가이드 누적
|
||||
if current_l3['guide']:
|
||||
current_l3['guide'] += ' / '
|
||||
current_l3['guide'] += m3t.group(4)
|
||||
|
||||
return items
|
||||
Reference in New Issue
Block a user