# -*- coding: utf-8 -*- from dotenv import load_dotenv load_dotenv() """ 湲踰 Light v2.0 Flask 고 + 怨듯 湲곕 """ import os import io import tempfile import json import shutil from datetime import datetime from flask import Flask, render_template, request, jsonify, Response, session, send_file import queue import threading from handlers.template.template_manager import TemplateManager from pathlib import Path from domain_api import register_domain_routes from db import init_db # 臾몄 蹂 濡 몄 from handlers.template import TemplateProcessor app = Flask(__name__)g import BriefingProcessor app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'geulbeot-light-secret-key-v2') register_domain_routes(app)analyzer import DocTypeAnalyzer init_db() # processors 由ъ媛 template_mgr = TemplateManager() processors = { 'briefing': BriefingProcessor(), 'report': ReportProcessor(), 'template': TemplateProcessor(), 'custom': CustomDocTypeProcessor() } DOC_TYPES_DEFAULT = Path('templates/default/doc_types') DOC_TYPES_USER = Path('templates/user/doc_types') # ============== 硫 ============== @app.route('/') def index(): """硫""" return render_template('index.html') @app.route('/api/doc-types', methods=['GET']) def get_doc_types(): """臾몄 紐⑸""" try: doc_types = [] # default 대 ㅼ if DOC_TYPES_DEFAULT.exists(): for folder in DOC_TYPES_DEFAULT.iterdir(): if folder.is_dir(): config_file = folder / 'config.json' if config_file.exists(): with open(config_file, 'r', encoding='utf-8') as f: doc_types.append(json.load(f)) # user 대 ㅼ if DOC_TYPES_USER.exists(): for folder in DOC_TYPES_USER.iterdir(): if folder.is_dir(): config_file = folder / 'config.json' if config_file.exists(): with open(config_file, 'r', encoding='utf-8') as f: doc_types.append(json.load(f)) # order isDefault doc_types.sort(key=lambda x: (x.get('order', 999), not x.get('isDefault', False))) return jsonify(doc_types) except Exception as e: import traceback return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500 @app.route('/api/doc-types', methods=['POST']) def add_doc_type(): """臾몄 異媛 (遺 )""" try: data = request.get_json() if not data: return jsonify({'error': 'JSON 곗 곌 ⑸'}), 400 # user 대 DOC_TYPES_USER.mkdir(parents=True, exist_ok=True) type_id = data.get('id') if not type_id: import time type_id = f"user_{int(time.time())}" data['id'] = type_id folder_path = DOC_TYPES_USER / type_id folder_path.mkdir(parents=True, exist_ok=True) # config.json with open(folder_path / 'config.json', 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) return jsonify(data) except Exception as e: import traceback return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500 @app.route('/api/doc-types/', methods=['DELETE']) def delete_doc_type(type_id): """臾몄 """ try: folder_path = DOC_TYPES_USER / type_id if not folder_path.exists(): return jsonify({'error': '臾몄 李얠 듬'}), 404 shutil.rmtree(folder_path) return jsonify({'success': True, 'deleted': type_id}) except Exception as e: import traceback return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500 # ============== 濡 API ============== @app.route('/api/upload-files', methods=['POST']) def upload_files(): """ 濡 /tmp/{session_id}/input/ """ try: import uuid files = request.files.getlist('files') if not files: return jsonify({'error': '듬'}), 400 # 몄 session_id = str(uuid.uuid4()) input_dir = f'/tmp/{session_id}/input' os.makedirs(input_dir, exist_ok=True) saved = [] for f in files: if f.filename: # 湲 쇰 蹂 蹂寃 紐⑤ - step1~2 異異 GPD 由우ㅽ""" try: data = request.get_json() or {} session_id = data.get('session_id', '') if not session_id: return jsonify({'error': 'session_id媛 듬'}), 400 input_dir = f'/tmp/{session_id}/input' if not os.path.exists(input_dir): return jsonify({'error': ' 濡 李얠 듬'}), 400 # step_format (洹) from converters.pipeline.step_format import run_format_only result = run_format_only(session_id, input_dir) if not result.get('success'): return jsonify(result), 500 return jsonify(result) except Exception as e: import traceback return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500 # ============== API ============== @app.route('/generate', methods=['POST']) def generate(): """臾몄 API""" try: content = "" if 'file' in request.files and request.files['file'].filename: file = request.files['file'] content = file.read().decode('utf-8') elif 'content' in request.form: content = request.form.get('content', '') doc_type = request.form.get('doc_type', 'briefing') if doc_type.startswith('user_'): options = { 'instruction': request.form.get('instruction', '') } result = processors['custom'].generate(content, doc_type, options) else: options = { 'page_option': request.form.get('page_option', '1'), 'department': request.form.get('department', ''), 'instruction': request.form.get('instruction', '') } processor = processors.get(doc_type, processors['briefing']) result = processor.generate(content, options) if 'error' in result: return jsonify(result), 400 if 'trace' not in result else 500 return jsonify(result) except Exception as e: import traceback return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500 @app.route('/generate-report', methods=['POST']) def generate_report(): """蹂닿 API""" try: data = request.get_json() or {} content = data.get('content', '') options = { 'folder_path': data.get('folder_path', ''), 'cover': data.get('cover', False), 'toc': data.get('toc', False), 'divider': data.get('divider', False), 'instruction': data.get('instruction', ''), 'template_id': data.get('template_id') } result = processors['report'].generate(content, options) if 'error' in result: return jsonify(result), 500 return jsonify(result) except Exception as e: import traceback return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500 # ============== API ============== @app.route('/refine', methods=['POST']) def refine(): """쇰諛 諛 API""" try: feedback = request.json.get('feedback', '') current_html = request.json.get('current_html', '') or session.get('current_html', '') original_html = session.get('original_html', '') doc_type = request.json.get('doc_type', 'briefing') processor = processors.get(doc_type, processors['briefing']) result = processor.refine(feedback, current_html, original_html) if 'error' in result: return jsonify(result), 400 return jsonify(result) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/refine-selection', methods=['POST']) def refine_selection(): """ 遺 API""" try: data = request.json current_html = data.get('current_html', '') selected_text = data.get('selected_text', '') user_request = data.get('request', '') doc_type = data.get('doc_type', 'briefing') processor = processors.get(doc_type, processors['briefing']) result = processor.refine_selection(current_html, selected_text, user_request) if 'error' in result: return jsonify(result), 400 return jsonify(result) except Exception as e: return jsonify({'error': str(e)}), 500 # ============== ㅼ대 API ============== @app.route('/download/html', methods=['POST']) def download_html(): """HTML ㅼ대""" html_content = request.form.get('html', '') if not html_content: return "No content", 400 timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') filename = f'report_{timestamp}.html' return Response( html_content, mimetype='text/html', headers={'Content-Disposition': f'attachment; filename={filename}'} ) @app.route('/download/pdf', methods=['POST']) def download_pdf(): """PDF ㅼ대""" try: from weasyprint import HTML html_content = request.form.get('html', '') if not html_content: return "No content", 400 pdf_buffer = io.BytesIO() HTML(string=html_content).write_pdf(pdf_buffer) pdf_buffer.seek(0) timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') filename = f'report_{timestamp}.pdf' return Response( pdf_buffer.getvalue(), mimetype='application/pdf', headers={'Content-Disposition': f'attachment; filename={filename}'} ) except ImportError: return jsonify({'error': 'PDF 蹂몄고 API ============== @app.route('/assets/') def serve_assets(filename): """濡而 assets 대 鍮""" assets_dir = os.environ.get("ASSETS_BASE_PATH", "/tmp/assets") return send_file(os.path.join(assets_dir, filename)) @app.route('/hwp-script') def hwp_script(): """HWP 蹂ㅽщ┰ """ return render_template('hwp_guide.html') @app.route('/health') def health(): """ъ 泥댄""" return jsonify({'status': 'healthy', 'version': '2.0.0'}) @app.route('/export-hwp', methods=['POST']) def export_hwp(): """HWP 蹂ㅽ 吏 댁⑹듬'}), 400 temp_dir = tempfile.gettempdir() html_path = os.path.join(temp_dir, 'geulbeot_temp.html') hwp_path = os.path.join(temp_dir, 'geulbeot_output.hwp') with open(html_path, 'w', encoding='utf-8') as f: f.write(html_content) # 蹂 ㅽ ъ щ if use_style_grouping: final_path = converter.convert_with_styles(html_path, hwp_path) # HWPX return send_file( final_path, as_attachment=True, download_name=f'report_{datetime.now().strftime("%Y%m%d_%H%M%S")}.hwpx', mimetype='application/vnd.hancom.hwpx' ) else: converter.convert(html_path, hwp_path) return send_file( hwp_path, as_attachment=True, download_name=f'report_{datetime.now().strftime("%Y%m%d_%H%M%S")}.hwp', mimetype='application/x-hwp' ) except ImportError as e: return jsonify({'error': f'pyhwpx : {str(e)}'}), 500 except Exception as e: return jsonify({'error': str(e)}), 500 # 湲곗〈 add_doc_type 泥 @app.route('/api/doc-types/analyze', methods=['POST']) def analyze_doc_type(): """臾몄 遺 ⑸"}), 400 file = request.files['file'] doc_name = request.form.get('name', ' 臾몄 ') # import tempfile temp_path = os.path.join(tempfile.gettempdir(), file.filename) file.save(temp_path) try: analyzer = DocTypeAnalyzer() result = analyzer.analyze(temp_path, doc_name) return jsonify({ "success": True, "config": result["config"], "summary": { "pageCount": result["structure"]["pageCount"], "sections": len(result["toc"]), except Exception as e: return jsonify({"error": str(e)}), 500 finally: os.remove(temp_path) @app.route('/analyze-styles', methods=['POST']) def analyze_styles(): """HTML ㅽ @app.route('/anayze-styles', methods=['POST']) def analyze_style try: data = request.get_json() html_content = data.get('html', '') if not html_content: return jsonify({'error': 'HTML 댁⑹듬'}), 400 from converters.style_analyzer import StyleAnalyzer from converters.hwp_style_mapping import ROLE_TO_STYLE_NAME analyzer = StyleAnalyzer() elements = analyzer.analyze(html_content) # 蹂 summary = analyzer.get_role_summary() # elem in elements[:50]: 蹂 (泥 50媛留) details = [] for elem in elements[:50]: details.append({ 'role': elem.role, 'hwp_style': ROLE_TO_STYLE_NAME.get(elem.role, '諛湲'), 'text': elem.text[:50] + ('...' if len(elem.text) > 50 else ''), 'section': elem.section }) return jsonify({ 'total_elements': len(elements), 'summary': summary, 'details': details }) except Exception as e: import traceback return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500 @app.route('/templates', methods=['GET']) def get_templates(): """ 由 紐⑸""" try: templates = template_mgr.list_templates() return jsonify(templates) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/templates', methods=['GET']) def get_templates_api(): """ 由 紐⑸ (API 寃쎈)""" try: templates = template_mgr.list_templates() return jsonify(templates) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/analyze-template', methods=['POST']) def analyze_template(): """ 由 異異 諛 (doc_template_analyzer template_manager)""" try: if 'file' not in request.files: return jsonify({'error': '듬'}), 400 file = request.files['file'] name = request.form.get('name', '').strip() if not name: return jsonify({'error': ' 由 ν댁< 몄'}), 400 if not file.filename: return jsonify({'error': ' 댁< 몄'}), 400 # HWPX 由 異異 temp_dir = tempfile.gettempdir() temp_path = os.path.join(temp_dir, file.filename) file.save(temp_path) try: # v3 ъ (HWPX parsed dict) from handlers.doc.doc_type_analyzer import DocTypeAnalyzer parser = DocTypeAnalyzer() parsed = parser._parse_hwpx(temp_path) # template_manager濡 異異+ result = template_mgr.extract_and_save( parsed, name, source_file=file.filename ) return jsonify(result) finally: try: os.remove(temp_path) except: pass except Exception as e: import traceback return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500 # ============== 臾몄 遺 遺 ㅽ몃━諛) ㅼ媛 쇰 媛 ④ ⑹ """ import tempfile # 곗 寃利듬'}), 400 file = request.files['file'] name = request.form.get('name', '').strip() description = request.form.get('description', '').strip() if not name: return jsonify({'error': '臾몄 ν댁< 몄'}), 400 if not file.filename: return jsonify({'error': ' 댁< 몄'}), 400 # temp_dir = tempfile.gettempdir() temp_path = os.path.join(temp_dir, file.filename) file.save(temp_path) # 硫吏 message_queue = queue.Queue() analysis_result = {"data": None, "error": None} def progress_callback(step_id, status, message): """吏 肄諛 - 硫吏 媛""" message_queue.put({ "type": "progress", "step": step_id, "status": status, "message": message }) def run_analysis(): """遺 ㅽ (蹂 ㅻ)""" try: analyzer = DocTypeAnalyzer(progress_callback=progress_callback) result = analyzer.analyze(temp_path, name, description) # save_path = analyzer.save_doc_type(result["config"], result.get("template", "") ) analysis_result["data"] = { "success": True, "config": result["config"], "layout": result.get("layout", {}), "context": result.get("context", {}), "structure": result.get("structure", {}), "template_generated": bool(result.get("template_id") or result.get("template")), "template_id": result.get("template_id"), # 異媛 "saved_path": save_path } except Exception as e: import traceback analysis_result["error"] = { "message": str(e), "trace": traceback.format_exc() } finally: # 猷 message_queue.put({"type": "complete"}) # try: os.remove(temp_path) except: pass def generate_events(): """SSE 깃린""" # 遺 analysis_thread = threading.Thread(target=run_analysis) analysis_thread.start() # ㅽ몃━諛 while True: try: msg = message_queue.get(timeout=60) # 60珥 if msg["type"] == "complete": # 遺 猷 if analysis_result["error"]: yield f"data: {json.dumps({'type': 'error', 'error': analysis_result['error']}, ensure_ascii=False)}\n\n" else: yield f"data: {json.dumps({'type': 'result', 'data': analysis_result['data']}, ensure_ascii=False)}\n\n" break else: # 吏 yield f"data: {json.dumps(msg, ensure_ascii=False)}\n\n" except queue.Empty: # yield f"data: {json.dumps({'type': 'error', 'error': {'message': '遺 媛 珥怨'}}, ensure_ascii=False)}\n\n" break return Response( generate_events(), mimetype='text/event-stream', headers={ 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'X-Accel-Buffering': 'no' } ) @app.route('/delete-template/', methods=['DELETE']) def delete_template(template_id): """ 由 (嫄곗 뱀 由 議고""" try: result = template_mgr.load_template(tpl_id) if 'error' in result: return jsonify(result), 404 return jsonify(result) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/templates/', methods=['DELETE']) def delete_template_new(tpl_id): """ 由 """ try: result = template_mgr.delete_template(tpl_id) if 'error' in result: return jsonify(result), 400 return jsonify(result) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/doc-types//template', methods=['PUT']) def change_doc_type_template(type_id): """臾몄 由 援""" try: data = request.get_json() new_tpl_id = data.get('template_id') if not new_tpl_id: return jsonify({'error': 'template_id媛 ⑸'}), 400 result = template_mgr.change_template(type_id, new_tpl_id) if 'error' in result: return jsonify(result), 400 return jsonify(result) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/doc-types//template', methods=['GET']) def get_doc_type_template(type_id): """臾몄 곌껐 由 議고""" try: result = template_mgr.get_template_for_doctype(type_id) if 'error' in result: return jsonify(result), 404 return jsonify(result) except Exception as e: return jsonify({'error': str(e)}), 500 if __name__ == '__main__': port = int(os.environ.get('PORT', 5000)) debug = os.environ.get('FLASK_DEBUG', 'False').lower() == 'true' app.run(host='0.0.0.0', port=port, debug=debug)