# -*- coding: utf-8 -*- """ 글벗 Light v2.0 Flask 라우팅 + 공통 기능 """ import os import io import tempfile import json import shutil from datetime import datetime from flask import Flask, render_template, request, jsonify, Response, session, send_file import queue import threading from handlers.template_manager import TemplateManager from pathlib import Path # 문서 유형별 프로세서 from handlers.template import TemplateProcessor from handlers.briefing import BriefingProcessor from handlers.report import ReportProcessor from handlers.custom_doc_type import CustomDocTypeProcessor from handlers.doc_type_analyzer import DocTypeAnalyzer app = Flask(__name__) app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'geulbeot-light-secret-key-v2') # processors 딕셔너리에 추가 template_mgr = TemplateManager() processors = { 'briefing': BriefingProcessor(), 'report': ReportProcessor(), 'template': TemplateProcessor(), 'custom': CustomDocTypeProcessor() } DOC_TYPES_DEFAULT = Path('templates/default/doc_types') DOC_TYPES_USER = Path('templates/user/doc_types') # ============== 메인 페이지 ============== @app.route('/') def index(): """메인 페이지""" return render_template('index.html') @app.route('/api/doc-types', methods=['GET']) def get_doc_types(): """문서 유형 목록 조회""" try: doc_types = [] # default 폴더 스캔 if DOC_TYPES_DEFAULT.exists(): for folder in DOC_TYPES_DEFAULT.iterdir(): if folder.is_dir(): config_file = folder / 'config.json' if config_file.exists(): with open(config_file, 'r', encoding='utf-8') as f: doc_types.append(json.load(f)) # user 폴더 스캔 if DOC_TYPES_USER.exists(): for folder in DOC_TYPES_USER.iterdir(): if folder.is_dir(): config_file = folder / 'config.json' if config_file.exists(): with open(config_file, 'r', encoding='utf-8') as f: doc_types.append(json.load(f)) # order → isDefault 순 정렬 doc_types.sort(key=lambda x: (x.get('order', 999), not x.get('isDefault', False))) return jsonify(doc_types) except Exception as e: import traceback return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500 @app.route('/api/doc-types', methods=['POST']) def add_doc_type(): """문서 유형 추가 (분석 결과 저장)""" try: data = request.get_json() if not data: return jsonify({'error': 'JSON 데이터가 필요합니다'}), 400 # user 폴더 생성 DOC_TYPES_USER.mkdir(parents=True, exist_ok=True) type_id = data.get('id') if not type_id: import time type_id = f"user_{int(time.time())}" data['id'] = type_id folder_path = DOC_TYPES_USER / type_id folder_path.mkdir(parents=True, exist_ok=True) # config.json 저장 with open(folder_path / 'config.json', 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) return jsonify(data) except Exception as e: import traceback return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500 @app.route('/api/doc-types/', methods=['DELETE']) def delete_doc_type(type_id): """문서 유형 삭제""" try: folder_path = DOC_TYPES_USER / type_id if not folder_path.exists(): return jsonify({'error': '문서 유형을 찾을 수 없습니다'}), 404 shutil.rmtree(folder_path) return jsonify({'success': True, 'deleted': type_id}) except Exception as e: import traceback return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500 # ============== 생성 API ============== @app.route('/generate', methods=['POST']) def generate(): """문서 생성 API""" try: content = "" if 'file' in request.files and request.files['file'].filename: file = request.files['file'] content = file.read().decode('utf-8') elif 'content' in request.form: content = request.form.get('content', '') doc_type = request.form.get('doc_type', 'briefing') if doc_type.startswith('user_'): options = { 'instruction': request.form.get('instruction', '') } result = processors['custom'].generate(content, doc_type, options) else: options = { 'page_option': request.form.get('page_option', '1'), 'department': request.form.get('department', ''), 'instruction': request.form.get('instruction', '') } processor = processors.get(doc_type, processors['briefing']) result = processor.generate(content, options) if 'error' in result: return jsonify(result), 400 if 'trace' not in result else 500 return jsonify(result) except Exception as e: import traceback return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500 @app.route('/generate-report', methods=['POST']) def generate_report(): """보고서 생성 API""" try: data = request.get_json() or {} content = data.get('content', '') options = { 'folder_path': data.get('folder_path', ''), 'cover': data.get('cover', False), 'toc': data.get('toc', False), 'divider': data.get('divider', False), 'instruction': data.get('instruction', ''), 'template_id': data.get('template_id') } result = processors['report'].generate(content, options) if 'error' in result: return jsonify(result), 500 return jsonify(result) except Exception as e: import traceback return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500 # ============== 수정 API ============== @app.route('/refine', methods=['POST']) def refine(): """피드백 반영 API""" try: feedback = request.json.get('feedback', '') current_html = request.json.get('current_html', '') or session.get('current_html', '') original_html = session.get('original_html', '') doc_type = request.json.get('doc_type', 'briefing') processor = processors.get(doc_type, processors['briefing']) result = processor.refine(feedback, current_html, original_html) if 'error' in result: return jsonify(result), 400 return jsonify(result) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/refine-selection', methods=['POST']) def refine_selection(): """선택 부분 수정 API""" try: data = request.json current_html = data.get('current_html', '') selected_text = data.get('selected_text', '') user_request = data.get('request', '') doc_type = data.get('doc_type', 'briefing') processor = processors.get(doc_type, processors['briefing']) result = processor.refine_selection(current_html, selected_text, user_request) if 'error' in result: return jsonify(result), 400 return jsonify(result) except Exception as e: return jsonify({'error': str(e)}), 500 # ============== 다운로드 API ============== @app.route('/download/html', methods=['POST']) def download_html(): """HTML 파일 다운로드""" html_content = request.form.get('html', '') if not html_content: return "No content", 400 timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') filename = f'report_{timestamp}.html' return Response( html_content, mimetype='text/html', headers={'Content-Disposition': f'attachment; filename={filename}'} ) @app.route('/download/pdf', methods=['POST']) def download_pdf(): """PDF 파일 다운로드""" try: from weasyprint import HTML html_content = request.form.get('html', '') if not html_content: return "No content", 400 pdf_buffer = io.BytesIO() HTML(string=html_content).write_pdf(pdf_buffer) pdf_buffer.seek(0) timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') filename = f'report_{timestamp}.pdf' return Response( pdf_buffer.getvalue(), mimetype='application/pdf', headers={'Content-Disposition': f'attachment; filename={filename}'} ) except ImportError: return jsonify({'error': 'PDF 변환 미지원'}), 501 except Exception as e: return jsonify({'error': f'PDF 변환 오류: {str(e)}'}), 500 # ============== 기타 API ============== @app.route('/assets/') def serve_assets(filename): """로컬 assets 폴더 서빙""" assets_dir = r"D:\for python\geulbeot-light\geulbeot-light\output\assets" return send_file(os.path.join(assets_dir, filename)) @app.route('/hwp-script') def hwp_script(): """HWP 변환 스크립트 안내""" return render_template('hwp_guide.html') @app.route('/health') def health(): """헬스 체크""" return jsonify({'status': 'healthy', 'version': '2.0.0'}) @app.route('/export-hwp', methods=['POST']) def export_hwp(): """HWP 변환 (스타일 그루핑 지원)""" try: data = request.get_json() html_content = data.get('html', '') doc_type = data.get('doc_type', 'briefing') use_style_grouping = data.get('style_grouping', False) # 새 옵션 if not html_content: return jsonify({'error': 'HTML 내용이 없습니다'}), 400 temp_dir = tempfile.gettempdir() html_path = os.path.join(temp_dir, 'geulbeot_temp.html') hwp_path = os.path.join(temp_dir, 'geulbeot_output.hwp') with open(html_path, 'w', encoding='utf-8') as f: f.write(html_content) # 변환기 선택 if doc_type == 'briefing': from converters.html_to_hwp_briefing import HtmlToHwpConverter else: from converters.html_to_hwp import HtmlToHwpConverter converter = HtmlToHwpConverter(visible=False) # 스타일 그루핑 사용 여부 if use_style_grouping: final_path = converter.convert_with_styles(html_path, hwp_path) # HWPX 파일 전송 return send_file( final_path, as_attachment=True, download_name=f'report_{datetime.now().strftime("%Y%m%d_%H%M%S")}.hwpx', mimetype='application/vnd.hancom.hwpx' ) else: converter.convert(html_path, hwp_path) return send_file( hwp_path, as_attachment=True, download_name=f'report_{datetime.now().strftime("%Y%m%d_%H%M%S")}.hwp', mimetype='application/x-hwp' ) except ImportError as e: return jsonify({'error': f'pyhwpx 필요: {str(e)}'}), 500 except Exception as e: return jsonify({'error': str(e)}), 500 # 기존 add_doc_type 대체 또는 수정 @app.route('/api/doc-types/analyze', methods=['POST']) def analyze_doc_type(): """문서 유형 분석 API""" if 'file' not in request.files: return jsonify({"error": "파일이 필요합니다"}), 400 file = request.files['file'] doc_name = request.form.get('name', '새 문서 유형') # 임시 저장 import tempfile temp_path = os.path.join(tempfile.gettempdir(), file.filename) file.save(temp_path) try: analyzer = DocTypeAnalyzer() result = analyzer.analyze(temp_path, doc_name) return jsonify({ "success": True, "config": result["config"], "summary": { "pageCount": result["structure"]["pageCount"], "sections": len(result["toc"]), "style": result["style"] } }) except Exception as e: return jsonify({"error": str(e)}), 500 finally: os.remove(temp_path) @app.route('/analyze-styles', methods=['POST']) def analyze_styles(): """HTML 스타일 분석 미리보기""" try: data = request.get_json() html_content = data.get('html', '') if not html_content: return jsonify({'error': 'HTML 내용이 없습니다'}), 400 from converters.style_analyzer import StyleAnalyzer from converters.hwp_style_mapping import ROLE_TO_STYLE_NAME analyzer = StyleAnalyzer() elements = analyzer.analyze(html_content) # 요약 정보 summary = analyzer.get_role_summary() # 상세 정보 (처음 50개만) details = [] for elem in elements[:50]: details.append({ 'role': elem.role, 'hwp_style': ROLE_TO_STYLE_NAME.get(elem.role, '바탕글'), 'text': elem.text[:50] + ('...' if len(elem.text) > 50 else ''), 'section': elem.section }) return jsonify({ 'total_elements': len(elements), 'summary': summary, 'details': details }) except Exception as e: import traceback return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500 @app.route('/templates', methods=['GET']) def get_templates(): """저장된 템플릿 목록 조회""" try: templates = template_mgr.list_templates() return jsonify(templates) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/templates', methods=['GET']) def get_templates_api(): """템플릿 목록 조회 (API 경로)""" try: templates = template_mgr.list_templates() return jsonify(templates) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/analyze-template', methods=['POST']) def analyze_template(): """템플릿 추출 및 저장 (doc_template_analyzer → template_manager)""" try: if 'file' not in request.files: return jsonify({'error': '파일이 없습니다'}), 400 file = request.files['file'] name = request.form.get('name', '').strip() if not name: return jsonify({'error': '템플릿 이름을 입력해주세요'}), 400 if not file.filename: return jsonify({'error': '파일을 선택해주세요'}), 400 # 임시 저장 → HWPX 파싱 → 템플릿 추출 temp_dir = tempfile.gettempdir() temp_path = os.path.join(temp_dir, file.filename) file.save(temp_path) try: # v3 파서 재사용 (HWPX → parsed dict) from handlers.doc_type_analyzer import DocTypeAnalyzer parser = DocTypeAnalyzer() parsed = parser._parse_hwpx(temp_path) # template_manager로 추출+저장 result = template_mgr.extract_and_save( parsed, name, source_file=file.filename ) return jsonify(result) finally: try: os.remove(temp_path) except: pass except Exception as e: import traceback return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500 # ============== 문서 유형 분석 SSE API ============== @app.route('/api/doc-types/analyze-stream', methods=['POST']) def analyze_doc_type_stream(): """ 문서 유형 분석 (SSE 스트리밍) 실시간으로 각 단계의 진행 상황을 전달 """ import tempfile # 파일 및 데이터 검증 if 'file' not in request.files: return jsonify({'error': '파일이 없습니다'}), 400 file = request.files['file'] name = request.form.get('name', '').strip() description = request.form.get('description', '').strip() if not name: return jsonify({'error': '문서 유형 이름을 입력해주세요'}), 400 if not file.filename: return jsonify({'error': '파일을 선택해주세요'}), 400 # 임시 파일 저장 temp_dir = tempfile.gettempdir() temp_path = os.path.join(temp_dir, file.filename) file.save(temp_path) # 메시지 큐 생성 message_queue = queue.Queue() analysis_result = {"data": None, "error": None} def progress_callback(step_id, status, message): """진행 상황 콜백 - 메시지 큐에 추가""" message_queue.put({ "type": "progress", "step": step_id, "status": status, "message": message }) def run_analysis(): """분석 실행 (별도 스레드)""" try: analyzer = DocTypeAnalyzer(progress_callback=progress_callback) result = analyzer.analyze(temp_path, name, description) # 저장 save_path = analyzer.save_doc_type(result["config"], result.get("template", "") ) analysis_result["data"] = { "success": True, "config": result["config"], "layout": result.get("layout", {}), "context": result.get("context", {}), "structure": result.get("structure", {}), "template_generated": bool(result.get("template_id") or result.get("template")), "template_id": result.get("template_id"), # ★ 추가 "saved_path": save_path } except Exception as e: import traceback analysis_result["error"] = { "message": str(e), "trace": traceback.format_exc() } finally: # 완료 신호 message_queue.put({"type": "complete"}) # 임시 파일 삭제 try: os.remove(temp_path) except: pass def generate_events(): """SSE 이벤트 생성기""" # 분석 시작 analysis_thread = threading.Thread(target=run_analysis) analysis_thread.start() # 이벤트 스트리밍 while True: try: msg = message_queue.get(timeout=60) # 60초 타임아웃 if msg["type"] == "complete": # 분석 완료 if analysis_result["error"]: yield f"data: {json.dumps({'type': 'error', 'error': analysis_result['error']}, ensure_ascii=False)}\n\n" else: yield f"data: {json.dumps({'type': 'result', 'data': analysis_result['data']}, ensure_ascii=False)}\n\n" break else: # 진행 상황 yield f"data: {json.dumps(msg, ensure_ascii=False)}\n\n" except queue.Empty: # 타임아웃 yield f"data: {json.dumps({'type': 'error', 'error': {'message': '분석 시간 초과'}}, ensure_ascii=False)}\n\n" break return Response( generate_events(), mimetype='text/event-stream', headers={ 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'X-Accel-Buffering': 'no' } ) @app.route('/delete-template/', methods=['DELETE']) def delete_template(template_id): """템플릿 삭제 (레거시 호환)""" try: result = template_mgr.delete_template(template_id) if 'error' in result: return jsonify(result), 400 return jsonify(result) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/templates/', methods=['GET']) def get_template(tpl_id): """특정 템플릿 조회""" try: result = template_mgr.load_template(tpl_id) if 'error' in result: return jsonify(result), 404 return jsonify(result) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/templates/', methods=['DELETE']) def delete_template_new(tpl_id): """템플릿 삭제""" try: result = template_mgr.delete_template(tpl_id) if 'error' in result: return jsonify(result), 400 return jsonify(result) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/doc-types//template', methods=['PUT']) def change_doc_type_template(type_id): """문서 유형의 템플릿 교체""" try: data = request.get_json() new_tpl_id = data.get('template_id') if not new_tpl_id: return jsonify({'error': 'template_id가 필요합니다'}), 400 result = template_mgr.change_template(type_id, new_tpl_id) if 'error' in result: return jsonify(result), 400 return jsonify(result) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/doc-types//template', methods=['GET']) def get_doc_type_template(type_id): """문서 유형에 연결된 템플릿 조회""" try: result = template_mgr.get_template_for_doctype(type_id) if 'error' in result: return jsonify(result), 404 return jsonify(result) except Exception as e: return jsonify({'error': str(e)}), 500 if __name__ == '__main__': port = int(os.environ.get('PORT', 5000)) debug = os.environ.get('FLASK_DEBUG', 'False').lower() == 'true' app.run(host='0.0.0.0', port=port, debug=debug)