Files
_Geulbeot/03.Code/업로드용/app.py
2026-03-19 08:55:51 +09:00

806 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
from dotenv import load_dotenv
load_dotenv()
"""
湲踰 Light v2.0
Flask 고
+ 怨듯 湲곕
"""
import os
import io
import tempfile
import json
import shutil
from datetime import datetime
from flask import Flask, render_template, request, jsonify, Response, session, send_file
import queue
import threading
from handlers.template.template_manager import TemplateManager
from pathlib import Path
from domain_api import register_domain_routes
from db import init_db
# 臾몄
from handlers.template import TemplateProcessor
app = Flask(__name__)g import BriefingProcessor
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'geulbeot-light-secret-key-v2')
register_domain_routes(app)analyzer import DocTypeAnalyzer
init_db()
# processors
由ъ媛
template_mgr = TemplateManager()
processors = {
'briefing': BriefingProcessor(),
'report': ReportProcessor(),
'template': TemplateProcessor(),
'custom': CustomDocTypeProcessor()
}
DOC_TYPES_DEFAULT = Path('templates/default/doc_types')
DOC_TYPES_USER = Path('templates/user/doc_types')
# ============== 硫 ==============
@app.route('/')
def index():
""""""
return render_template('index.html')
@app.route('/api/doc-types', methods=['GET'])
def get_doc_types():
"""臾몄
紐⑸"""
try:
doc_types = []
# default 대 ㅼ
if DOC_TYPES_DEFAULT.exists():
for folder in DOC_TYPES_DEFAULT.iterdir():
if folder.is_dir():
config_file = folder / 'config.json'
if config_file.exists():
with open(config_file, 'r', encoding='utf-8') as f:
doc_types.append(json.load(f))
# user 대 ㅼ
if DOC_TYPES_USER.exists():
for folder in DOC_TYPES_USER.iterdir():
if folder.is_dir():
config_file = folder / 'config.json'
if config_file.exists():
with open(config_file, 'r', encoding='utf-8') as f:
doc_types.append(json.load(f))
# order isDefault
doc_types.sort(key=lambda x: (x.get('order', 999), not x.get('isDefault', False)))
return jsonify(doc_types)
except Exception as e:
import traceback
return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500
@app.route('/api/doc-types', methods=['POST'])
def add_doc_type():
"""臾몄
異媛 (遺
)"""
try:
data = request.get_json()
if not data:
return jsonify({'error': 'JSON 곗
'}), 400
# user 대
DOC_TYPES_USER.mkdir(parents=True, exist_ok=True)
type_id = data.get('id')
if not type_id:
import time
type_id = f"user_{int(time.time())}"
data['id'] = type_id
folder_path = DOC_TYPES_USER / type_id
folder_path.mkdir(parents=True, exist_ok=True)
# config.json
with open(folder_path / 'config.json', 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
return jsonify(data)
except Exception as e:
import traceback
return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500
@app.route('/api/doc-types/<type_id>', methods=['DELETE'])
def delete_doc_type(type_id):
"""臾몄
"""
try:
folder_path = DOC_TYPES_USER / type_id
if not folder_path.exists():
return jsonify({'error': '臾몄
李얠
'}), 404
shutil.rmtree(folder_path)
return jsonify({'success': True, 'deleted': type_id})
except Exception as e:
import traceback
return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500
# ==============
API ==============
@app.route('/api/upload-files', methods=['POST'])
def upload_files():
"""
濡 /tmp/{session_id}/input/ """
try:
import uuid
files = request.files.getlist('files')
if not files:
return jsonify({'error': ''}), 400
#
session_id = str(uuid.uuid4())
input_dir = f'/tmp/{session_id}/input'
os.makedirs(input_dir, exist_ok=True)
saved = []
for f in files:
if f.filename:
# 湲
蹂寃 紐⑤ - step1~2 異異
GPD
由우ㅽ"""
try:
data = request.get_json() or {}
session_id = data.get('session_id', '')
if not session_id:
return jsonify({'error': 'session_id媛 듬'}), 400
input_dir = f'/tmp/{session_id}/input'
if not os.path.exists(input_dir):
return jsonify({'error': '
李얠
'}), 400
# step_format (洹)
from converters.pipeline.step_format import run_format_only
result = run_format_only(session_id, input_dir)
if not result.get('success'):
return jsonify(result), 500
return jsonify(result)
except Exception as e:
import traceback
return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500
# ==============
API ==============
@app.route('/generate', methods=['POST'])
def generate():
"""臾몄
API"""
try:
content = ""
if 'file' in request.files and request.files['file'].filename:
file = request.files['file']
content = file.read().decode('utf-8')
elif 'content' in request.form:
content = request.form.get('content', '')
doc_type = request.form.get('doc_type', 'briefing')
if doc_type.startswith('user_'):
options = {
'instruction': request.form.get('instruction', '')
}
result = processors['custom'].generate(content, doc_type, options)
else:
options = {
'page_option': request.form.get('page_option', '1'),
'department': request.form.get('department', ''),
'instruction': request.form.get('instruction', '')
}
processor = processors.get(doc_type, processors['briefing'])
result = processor.generate(content, options)
if 'error' in result:
return jsonify(result), 400 if 'trace' not in result else 500
return jsonify(result)
except Exception as e:
import traceback
return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500
@app.route('/generate-report', methods=['POST'])
def generate_report():
"""蹂닿
API"""
try:
data = request.get_json() or {}
content = data.get('content', '')
options = {
'folder_path': data.get('folder_path', ''),
'cover': data.get('cover', False),
'toc': data.get('toc', False),
'divider': data.get('divider', False),
'instruction': data.get('instruction', ''),
'template_id': data.get('template_id')
}
result = processors['report'].generate(content, options)
if 'error' in result:
return jsonify(result), 500
return jsonify(result)
except Exception as e:
import traceback
return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500
# ============== API ==============
@app.route('/refine', methods=['POST'])
def refine():
"""쇰諛 API"""
try:
feedback = request.json.get('feedback', '')
current_html = request.json.get('current_html', '') or session.get('current_html', '')
original_html = session.get('original_html', '')
doc_type = request.json.get('doc_type', 'briefing')
processor = processors.get(doc_type, processors['briefing'])
result = processor.refine(feedback, current_html, original_html)
if 'error' in result:
return jsonify(result), 400
return jsonify(result)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/refine-selection', methods=['POST'])
def refine_selection():
"""
API"""
try:
data = request.json
current_html = data.get('current_html', '')
selected_text = data.get('selected_text', '')
user_request = data.get('request', '')
doc_type = data.get('doc_type', 'briefing')
processor = processors.get(doc_type, processors['briefing'])
result = processor.refine_selection(current_html, selected_text, user_request)
if 'error' in result:
return jsonify(result), 400
return jsonify(result)
except Exception as e:
return jsonify({'error': str(e)}), 500
# ============== ㅼ대 API ==============
@app.route('/download/html', methods=['POST'])
def download_html():
"""HTML ㅼ대"""
html_content = request.form.get('html', '')
if not html_content:
return "No content", 400
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f'report_{timestamp}.html'
return Response(
html_content,
mimetype='text/html',
headers={'Content-Disposition': f'attachment; filename={filename}'}
)
@app.route('/download/pdf', methods=['POST'])
def download_pdf():
"""PDF ㅼ대"""
try:
from weasyprint import HTML
html_content = request.form.get('html', '')
if not html_content:
return "No content", 400
pdf_buffer = io.BytesIO()
HTML(string=html_content).write_pdf(pdf_buffer)
pdf_buffer.seek(0)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f'report_{timestamp}.pdf'
return Response(
pdf_buffer.getvalue(),
mimetype='application/pdf',
headers={'Content-Disposition': f'attachment; filename={filename}'}
)
except ImportError:
return jsonify({'error': 'PDF 蹂몄고 API ==============
@app.route('/assets/<path:filename>')
def serve_assets(filename):
"""濡而 assets
"""
assets_dir = os.environ.get("ASSETS_BASE_PATH", "/tmp/assets")
return send_file(os.path.join(assets_dir, filename))
@app.route('/hwp-script')
def hwp_script():
"""HWP 蹂ㅽщ """
return render_template('hwp_guide.html')
@app.route('/health')
def health():
"""ъ 泥댄"""
return jsonify({'status': 'healthy', 'version': '2.0.0'})
@app.route('/export-hwp', methods=['POST'])
def export_hwp():
"""HWP 蹂ㅽ
댁⑹듬'}), 400
temp_dir = tempfile.gettempdir()
html_path = os.path.join(temp_dir, 'geulbeot_temp.html')
hwp_path = os.path.join(temp_dir, 'geulbeot_output.hwp')
with open(html_path, 'w', encoding='utf-8') as f:
f.write(html_content)
# 蹂
ъ щ
if use_style_grouping:
final_path = converter.convert_with_styles(html_path, hwp_path)
# HWPX
return send_file(
final_path,
as_attachment=True,
download_name=f'report_{datetime.now().strftime("%Y%m%d_%H%M%S")}.hwpx',
mimetype='application/vnd.hancom.hwpx'
)
else:
converter.convert(html_path, hwp_path)
return send_file(
hwp_path,
as_attachment=True,
download_name=f'report_{datetime.now().strftime("%Y%m%d_%H%M%S")}.hwp',
mimetype='application/x-hwp'
)
except ImportError as e:
return jsonify({'error': f'pyhwpx
: {str(e)}'}), 500
except Exception as e:
return jsonify({'error': str(e)}), 500
# 湲곗〈 add_doc_type 泥
@app.route('/api/doc-types/analyze', methods=['POST'])
def analyze_doc_type():
"""臾몄
"}), 400
file = request.files['file']
doc_name = request.form.get('name', ' 臾몄
')
#
import tempfile
temp_path = os.path.join(tempfile.gettempdir(), file.filename)
file.save(temp_path)
try:
analyzer = DocTypeAnalyzer()
result = analyzer.analyze(temp_path, doc_name)
return jsonify({
"success": True,
"config": result["config"],
"summary": {
"pageCount": result["structure"]["pageCount"],
"sections": len(result["toc"]),
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
os.remove(temp_path)
@app.route('/analyze-styles', methods=['POST'])
def analyze_styles():
"""HTML
@app.route('/anayze-styles', methods=['POST'])
def analyze_style
try:
data = request.get_json()
html_content = data.get('html', '')
if not html_content:
return jsonify({'error': 'HTML 댁⑹듬'}), 400
from converters.style_analyzer import StyleAnalyzer
from converters.hwp_style_mapping import ROLE_TO_STYLE_NAME
analyzer = StyleAnalyzer()
elements = analyzer.analyze(html_content)
# 蹂
summary = analyzer.get_role_summary()
# elem in elements[:50]:
( 50媛留)
details = []
for elem in elements[:50]:
details.append({
'role': elem.role,
'hwp_style': ROLE_TO_STYLE_NAME.get(elem.role, '諛湲'),
'text': elem.text[:50] + ('...' if len(elem.text) > 50 else ''),
'section': elem.section
})
return jsonify({
'total_elements': len(elements),
'summary': summary,
'details': details
})
except Exception as e:
import traceback
return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500
@app.route('/templates', methods=['GET'])
def get_templates():
"""
由 紐⑸"""
try:
templates = template_mgr.list_templates()
return jsonify(templates)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/templates', methods=['GET'])
def get_templates_api():
"""
由 紐⑸ (API 寃쎈)"""
try:
templates = template_mgr.list_templates()
return jsonify(templates)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/analyze-template', methods=['POST'])
def analyze_template():
"""
由 異異 諛 (doc_template_analyzer template_manager)"""
try:
if 'file' not in request.files:
return jsonify({'error': ''}), 400
file = request.files['file']
name = request.form.get('name', '').strip()
if not name:
return jsonify({'error': '
ν댁
'}), 400
if not file.filename:
return jsonify({'error': '
'}), 400
#
HWPX
異異
temp_dir = tempfile.gettempdir()
temp_path = os.path.join(temp_dir, file.filename)
file.save(temp_path)
try:
# v3
ъ (HWPX parsed dict)
from handlers.doc.doc_type_analyzer import DocTypeAnalyzer
parser = DocTypeAnalyzer()
parsed = parser._parse_hwpx(temp_path)
# template_manager濡 異異+
result = template_mgr.extract_and_save(
parsed, name,
source_file=file.filename
)
return jsonify(result)
finally:
try:
os.remove(temp_path)
except:
pass
except Exception as e:
import traceback
return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500
# ============== 臾몄
ㅽ몃)
ㅼ媛
"""
import tempfile
# 곗
寃利듬'}), 400
file = request.files['file']
name = request.form.get('name', '').strip()
description = request.form.get('description', '').strip()
if not name:
return jsonify({'error': '臾몄
ν댁<
'}), 400
if not file.filename:
return jsonify({'error': '
댁<
'}), 400
#
temp_dir = tempfile.gettempdir()
temp_path = os.path.join(temp_dir, file.filename)
file.save(temp_path)
# 硫吏
message_queue = queue.Queue()
analysis_result = {"data": None, "error": None}
def progress_callback(step_id, status, message):
"""
肄諛 - 硫吏 """
message_queue.put({
"type": "progress",
"step": step_id,
"status": status,
"message": message
})
def run_analysis():
"""
(
)"""
try:
analyzer = DocTypeAnalyzer(progress_callback=progress_callback)
result = analyzer.analyze(temp_path, name, description)
#
save_path = analyzer.save_doc_type(result["config"], result.get("template", "") )
analysis_result["data"] = {
"success": True,
"config": result["config"],
"layout": result.get("layout", {}),
"context": result.get("context", {}),
"structure": result.get("structure", {}),
"template_generated": bool(result.get("template_id") or result.get("template")),
"template_id": result.get("template_id"), #
異媛
"saved_path": save_path
}
except Exception as e:
import traceback
analysis_result["error"] = {
"message": str(e),
"trace": traceback.format_exc()
}
finally:
#
message_queue.put({"type": "complete"})
#
try:
os.remove(temp_path)
except:
pass
def generate_events():
"""SSE
깃린"""
# 遺
analysis_thread = threading.Thread(target=run_analysis)
analysis_thread.start()
# ㅽ몃━諛
while True:
try:
msg = message_queue.get(timeout=60) # 60珥
if msg["type"] == "complete":
# 遺
if analysis_result["error"]:
yield f"data: {json.dumps({'type': 'error', 'error': analysis_result['error']}, ensure_ascii=False)}\n\n"
else:
yield f"data: {json.dumps({'type': 'result', 'data': analysis_result['data']}, ensure_ascii=False)}\n\n"
break
else:
# 吏
yield f"data: {json.dumps(msg, ensure_ascii=False)}\n\n"
except queue.Empty:
#
yield f"data: {json.dumps({'type': 'error', 'error': {'message': '
珥怨'}}, ensure_ascii=False)}\n\n"
break
return Response(
generate_events(),
mimetype='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no'
}
)
@app.route('/delete-template/<template_id>', methods=['DELETE'])
def delete_template(template_id):
"""
(嫄곗
議고"""
try:
result = template_mgr.load_template(tpl_id)
if 'error' in result:
return jsonify(result), 404
return jsonify(result)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/templates/<tpl_id>', methods=['DELETE'])
def delete_template_new(tpl_id):
"""
"""
try:
result = template_mgr.delete_template(tpl_id)
if 'error' in result:
return jsonify(result), 400
return jsonify(result)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/doc-types/<type_id>/template', methods=['PUT'])
def change_doc_type_template(type_id):
"""臾몄
"""
try:
data = request.get_json()
new_tpl_id = data.get('template_id')
if not new_tpl_id:
return jsonify({'error': 'template_id媛
'}), 400
result = template_mgr.change_template(type_id, new_tpl_id)
if 'error' in result:
return jsonify(result), 400
return jsonify(result)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/doc-types/<type_id>/template', methods=['GET'])
def get_doc_type_template(type_id):
"""臾몄
곌껐
議고"""
try:
result = template_mgr.get_template_for_doctype(type_id)
if 'error' in result:
return jsonify(result), 404
return jsonify(result)
except Exception as e:
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
port = int(os.environ.get('PORT', 5000))
debug = os.environ.get('FLASK_DEBUG', 'False').lower() == 'true'
app.run(host='0.0.0.0', port=port, debug=debug)