# -*- coding: utf-8 -*- """ domain_api.py 도메인 지식 관리 API + 파이프라인 래퍼 app.py에서 import하여 사용 사용법 (app.py): from domain_api import register_domain_routes register_domain_routes(app) """ import os import json from pathlib import Path from flask import request, jsonify # ===== 경로 설정 ===== # app.py와 같은 레벨에 domains/ 폴더가 있다고 가정 BASE_DIR = Path(__file__).parent DOMAIN_CONFIG_PATH = BASE_DIR / "domain_config.json" DOMAIN_DIR = BASE_DIR / "domain" # 파이프라인 출력 경로 (step3~9가 사용하는 경로) # 실제 환경에 맞게 수정 필요 PIPELINE_OUTPUT_ROOT = Path(os.getenv( "PIPELINE_OUTPUT_ROOT", r"D:\for python\geulbeot-light\geulbeot-light\00.test\hwpx\out\out" )) CONTEXT_DIR = PIPELINE_OUTPUT_ROOT / "context" def register_domain_routes(app): """Flask 앱에 도메인 관련 라우트 등록""" @app.route('/api/domain-config', methods=['GET']) def get_domain_config(): """도메인 구조 설정 반환""" try: if DOMAIN_CONFIG_PATH.exists(): config = json.loads(DOMAIN_CONFIG_PATH.read_text(encoding='utf-8')) # 각 도메인 파일 존재 여부 체크 for cat in config.get('categories', []): if cat.get('file'): fpath = DOMAIN_DIR / cat['file'] cat['file_exists'] = fpath.exists() cat['file_size'] = fpath.stat().st_size if fpath.exists() else 0 for child in cat.get('children', []): if child.get('file'): fpath = DOMAIN_DIR / child['file'] child['file_exists'] = fpath.exists() child['file_size'] = fpath.stat().st_size if fpath.exists() else 0 return jsonify(config) else: return jsonify({'error': 'domain_config.json not found', 'categories': []}), 404 except Exception as e: return jsonify({'error': str(e), 'categories': []}), 500 @app.route('/api/domain-combine', methods=['POST']) def combine_domains(): """ 선택된 도메인 .txt 파일들을 합쳐서 domain_prompt.txt로 저장 요청: { "selected": ["civil_general", "survey", "bim"] } 응답: { "success": true, "combined_length": 3200, "selected_names": [...] } """ try: data = request.get_json() selected_ids = data.get('selected', []) if not selected_ids: return jsonify({ 'success': True, 'combined_length': 0, 'selected_names': [], 'message': '선택 없음 - step3 자동 분석 모드' }) # config 로드 config = json.loads(DOMAIN_CONFIG_PATH.read_text(encoding='utf-8')) # 선택된 ID → 파일 경로 + 이름 매핑 domain_parts = [] guide_parts = [] selected_names = [] for cat in config.get('categories', []): is_guide = (cat['id'] == 'report_guide') target = guide_parts if is_guide else domain_parts if cat['id'] in selected_ids and cat.get('file'): fpath = DOMAIN_DIR / cat['file'] if fpath.exists(): content = fpath.read_text(encoding='utf-8', errors='ignore').strip() if content: target.append(f"[{cat['label']}]\n{content}") selected_names.append(cat['label']) for child in cat.get('children', []): if child['id'] in selected_ids and child.get('file'): fpath = DOMAIN_DIR / child['file'] if fpath.exists(): content = fpath.read_text(encoding='utf-8', errors='ignore').strip() if content: target.append(f"[{child['label']}]\n{content}") selected_names.append(child['label']) selected_names.append(child['label']) if not domain_parts and not guide_parts: return jsonify({ 'success': False, 'error': '선택된 도메인의 지식 파일이 비어있습니다.' }) sep = "\n\n" + "=" * 50 + "\n\n" sections = [] if domain_parts: domain_names = [n for n in selected_names if n not in ['목차 구성 가이드', '보고서 문체 가이드']] sections.append( f"너는 다음 분야의 전문가이다: {', '.join(domain_names)}.\n" f"다음의 도메인 지식을 기반으로, 사실에 근거하여 전문적이고 정확한 내용을 작성하라.\n" f"추측이나 창작은 금지하며, 제공된 근거 자료의 원문을 최대한 보존하라.\n\n" f"[도메인 전문 지식]\n" + sep.join(domain_parts) ) if guide_parts: sections.append( f"[보고서 작성 가이드]\n" f"다음 가이드를 참고하여 보고서의 목차 구성과 문체를 결정하라.\n\n" + sep.join(guide_parts) ) final_text = ("\n\n" + "=" * 60 + "\n\n").join(sections) # report_guide는 항상 자동 주입 (사용자 선택 아님) guide_dir = DOMAIN_DIR / 'report_guide' if guide_dir.exists(): guide_texts = [] for gf in sorted(guide_dir.glob('*.txt')): content = gf.read_text(encoding='utf-8', errors='ignore').strip() if content: guide_texts.append(content) if guide_texts: guide_sep = "\n\n" + "=" * 50 + "\n\n" final_text += ( "\n\n" + "=" * 60 + "\n\n" "[보고서 작성 가이드]\n" "다음 가이드를 참고하여 보고서의 목차 구성과 문체를 결정하라.\n\n" + guide_sep.join(guide_texts) ) # domain_prompt.txt로 저장 CONTEXT_DIR.mkdir(parents=True, exist_ok=True) output_path = CONTEXT_DIR / "domain_prompt.txt" output_path.write_text(final_text, encoding='utf-8') return jsonify({ 'success': True, 'combined_length': len(final_text), 'selected_names': selected_names, 'selected_ids': selected_ids, 'output_path': str(output_path) }) except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/domain-list', methods=['GET']) def list_domain_files(): """ domains/ 폴더의 .txt 파일 목록 반환 도메인 지식 파일 관리용 """ try: files = [] if DOMAIN_DIR.exists(): for f in sorted(DOMAIN_DIR.rglob('*.txt')): rel = f.relative_to(DOMAIN_DIR) files.append({ 'path': str(rel), 'name': f.stem, 'size': f.stat().st_size, 'preview': f.read_text(encoding='utf-8', errors='ignore')[:200] }) return jsonify({ 'success': True, 'files': files, 'domains_dir': str(DOMAIN_DIR) }) except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/domain-save', methods=['POST']) def save_domain_file(): """ 도메인 지식 파일 저장/수정 요청: { "id": "survey", "content": "측량 분야의 전문 지식..." } """ try: data = request.get_json() domain_id = data.get('id', '') content = data.get('content', '') if not domain_id or not content: return jsonify({'success': False, 'error': 'id와 content가 필요합니다.'}) # config에서 파일 경로 찾기 config = json.loads(DOMAIN_CONFIG_PATH.read_text(encoding='utf-8')) file_path = None for cat in config.get('categories', []): if cat['id'] == domain_id: file_path = cat.get('file') break for child in cat.get('children', []): if child['id'] == domain_id: file_path = child.get('file') break if file_path: break if not file_path: return jsonify({'success': False, 'error': f'도메인 ID를 찾을 수 없습니다: {domain_id}'}) # 파일 저장 full_path = BASE_DIR / file_path full_path.parent.mkdir(parents=True, exist_ok=True) full_path.write_text(content, encoding='utf-8') return jsonify({ 'success': True, 'path': str(full_path), 'size': len(content) }) except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/pipeline/status', methods=['GET']) def pipeline_status(): """파이프라인 상태 확인 - 각 step의 출력 파일 존재 여부""" try: status = { 'step3_domain': (CONTEXT_DIR / 'domain_prompt.txt').exists(), 'step4_chunks': len(list((PIPELINE_OUTPUT_ROOT / 'rag').glob('*_chunks.json'))) if (PIPELINE_OUTPUT_ROOT / 'rag').exists() else 0, 'step5_faiss': (PIPELINE_OUTPUT_ROOT / 'rag' / 'faiss.index').exists(), 'step6_corpus': (CONTEXT_DIR / 'corpus.txt').exists(), 'step7_outline': (CONTEXT_DIR / 'outline_issue_report.txt').exists(), 'step8_report': (PIPELINE_OUTPUT_ROOT / 'generated' / 'report_draft.md').exists(), 'step9_html': (PIPELINE_OUTPUT_ROOT / 'generated' / 'report.html').exists(), } return jsonify({'success': True, 'status': status}) except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 # ===== 파이프라인 실행 API ===== @app.route('/api/generate-toc', methods=['POST']) def generate_toc(): """ 목차 생성 API (step3 → 4 → 5 → 6 → 7) 도메인 선택을 한 경우: step3 스킵 (이미 domain_prompt.txt 있음) 도메인 선택 안 한 경우: step3 실행 요청: { "folder_path": "D:\\...", "domain_selected": true/false, "selected_domains": ["civil_general", "survey"] } 응답: { "success": true, "title": "보고서 제목", "toc_items": [ { "num": "1.1.1", "title": "...", "guide": "...", "keywords": [...] } ] } """ try: data = request.get_json() folder_path = data.get('folder_path', '') domain_selected = data.get('domain_selected', False) # TODO: 실제 step 실행 연결 # 현재는 목차 파일이 이미 있으면 읽어서 반환 outline_path = CONTEXT_DIR / 'outline_issue_report.txt' if outline_path.exists(): # 기존 목차 파싱 toc_items = parse_outline_for_frontend(outline_path) return jsonify({ 'success': True, 'title': toc_items[0].get('report_title', '보고서') if toc_items else '보고서', 'toc_items': toc_items, 'source': 'cached' }) else: return jsonify({ 'success': False, 'error': '목차 파일이 아직 생성되지 않았습니다. 파이프라인을 먼저 실행해주세요.', 'hint': 'step3~7을 순서대로 실행해야 합니다.' }) except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/generate-report-from-toc', methods=['POST']) def generate_report_from_toc(): """ 편집된 목차로 보고서 생성 (step8 → step9) 요청: { "toc_items": [...], # 편집된 목차 "write_mode": "restructure", "instruction": "..." } """ try: data = request.get_json() toc_items = data.get('toc_items', []) write_mode = data.get('write_mode', 'restructure') instruction = data.get('instruction', '') # TODO: step8 실행 (generate_report_gemini) # TODO: step9 실행 (md_to_html_publisher) # 현재는 기존 generated 파일이 있으면 반환 report_html_path = PIPELINE_OUTPUT_ROOT / 'generated' / 'report.html' if report_html_path.exists(): html = report_html_path.read_text(encoding='utf-8') return jsonify({ 'success': True, 'html': html, 'source': 'cached' }) else: return jsonify({ 'success': False, 'error': '보고서 파일이 아직 생성되지 않았습니다.' }) except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/check-folder', methods=['POST']) def check_folder(): """폴더 경로의 파일 목록 + 확장자별 분류""" try: data = request.get_json() folder = Path(data.get('folder_path', '')) if not folder.exists() or not folder.is_dir(): return jsonify({'success': False, 'error': '폴더를 찾을 수 없습니다.'}) SUPPORTED = {'.hwpx', '.hwp', '.pdf', '.docx', '.xlsx', '.pptx', '.txt', '.csv', 'md', 'json','img', 'png', 'html'} all_files = [f for f in folder.rglob('*') if f.is_file()] ok_files = [f for f in all_files if f.suffix.lower() in SUPPORTED] unknown_files = [f for f in all_files if f.suffix.lower() not in SUPPORTED] return jsonify({ 'success': True, 'total': len(all_files), 'ok': len(ok_files), 'unknown': len(unknown_files), 'ok_list': [{'name': f.name, 'size': f.stat().st_size} for f in ok_files], 'unknown_list': [f.name for f in unknown_files] }) except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 def parse_outline_for_frontend(outline_path: Path) -> list: """ outline_issue_report.txt를 파싱하여 displayTocWithAnimation() 형식으로 변환 반환 형식: [ { "num": "1.1.1", "title": "소목차 제목", "guide": "집필 가이드", "keywords": ["키워드1", "키워드2"] } ] """ import re raw = outline_path.read_text(encoding='utf-8', errors='ignore').splitlines() if not raw: return [] report_title = raw[0].strip() items = [] re_l3_head = re.compile(r'^\s*(\d+\.\d+\.\d+)\s+(.+)$') re_l3_topic = re.compile(r'^\s*[\-\*]\s+(.+?)\s*\|\s*(.+?)\s*\|\s*(\[.+?\])\s*\|\s*(.+)$') re_keywords = re.compile(r'(#\S+)') current_l3 = None for ln in raw[1:]: line = ln.strip() if not line: continue m3h = re_l3_head.match(line) if m3h: current_l3 = { 'num': m3h.group(1), 'title': m3h.group(2), 'report_title': report_title, 'guide': '', 'keywords': [] } items.append(current_l3) continue m3t = re_l3_topic.match(line) if m3t and current_l3: kws = [k.lstrip('#').strip() for k in re_keywords.findall(m3t.group(2))] # 기존 키워드에 추가 current_l3['keywords'].extend(kws) # 가이드 누적 if current_l3['guide']: current_l3['guide'] += ' / ' current_l3['guide'] += m3t.group(4) return items