Files
2025-02-14 12:13:05 +09:00

96 lines
2.8 KiB
Python

import os
import shutil
from pathlib import Path
from typing import List
from config import OUTPUT_DIR, UPLOAD_DIR
from fastapi import FastAPI, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from redis_client import redis_client
from rq import Queue
from worker import process_task
# Application instance that the route decorators below attach to.
app = FastAPI()

# RQ job queue, using the shared Redis connection.
task_queue = Queue("task_queue1", connection=redis_client)

# CORS configuration.
# NOTE(review): wildcard origins together with allow_credentials=True is
# very permissive -- confirm this is intended outside of development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # allow every origin
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Create the upload and output directories at import time if they are missing.
for _directory in (UPLOAD_DIR, OUTPUT_DIR):
    os.makedirs(_directory, exist_ok=True)
@app.post("/upload/")
async def upload_directory(files: List[UploadFile]):
    """Save the uploaded files into UPLOAD_DIR.

    The client-supplied filename is untrusted, so only its final path
    component is used as the on-disk name. This prevents names such as
    ``../../x`` or absolute paths from writing outside UPLOAD_DIR.

    Args:
        files: Uploaded files from the multipart request.

    Returns:
        A dict with a success message and the list of stored file names, or
        a dict with an ``error`` key when any upload has no usable file name
        (in which case nothing is written).
    """
    # Validate every name first so a bad entry cannot leave a partial upload.
    safe_names = []
    for file in files:
        # Normalise Windows separators, then keep only the last component.
        raw_name = (file.filename or "").replace("\\", "/")
        safe_name = os.path.basename(raw_name)
        if safe_name in ("", ".", ".."):
            return {"error": "잘못된 파일 이름입니다."}
        safe_names.append(safe_name)

    for file, safe_name in zip(files, safe_names):
        with open(Path(UPLOAD_DIR) / safe_name, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

    return {"message": "파일 업로드 성공", "files": safe_names}
@app.get("/convert/")
async def convert_files():
    """Enqueue every supported file in UPLOAD_DIR on the RQ queue.

    For each regular file whose extension is supported, a task dict is built
    and passed to ``process_task`` via ``task_queue.enqueue``.

    Returns:
        A dict with a message and a ``tasks`` list; each entry holds the task
        dict and the ID of the job that was enqueued for it.
    """
    # Built once per request rather than once per file.
    supported_extensions = frozenset(
        {"txt", "html", "pdf", "hwp", "pptx", "xlsx", "docx"}
    )

    enqueued_tasks = []
    # Sorted so the order of enqueued jobs is deterministic.
    for file in sorted(os.listdir(UPLOAD_DIR)):
        # Path.suffix is empty for names without a real extension, so a file
        # literally named "pdf" is not mistaken for a PDF document.
        file_ext = Path(file).suffix[1:].lower()
        if file_ext not in supported_extensions:
            continue

        input_path = os.path.join(UPLOAD_DIR, file)
        # os.listdir also returns directories; only regular files are queued.
        if not os.path.isfile(input_path):
            continue

        # NOTE(review): files sharing a stem (a.pdf, a.docx) map to the same
        # output path -- confirm whether the worker is expected to handle this.
        task = {
            "filename": file,
            "extension": file_ext,
            "input_path": input_path,
            "output_path": os.path.join(OUTPUT_DIR, f"{Path(file).stem}.md"),
        }
        job = task_queue.enqueue(process_task, task)  # register the job with RQ
        enqueued_tasks.append({"task": task, "job_id": job.id})

    return {"message": "작업이 큐에 추가되었습니다.", "tasks": enqueued_tasks}
@app.get("/task/{job_id}")
async def get_task_status(job_id: str):
    """Report the status of the RQ job with the given ID.

    Args:
        job_id: ID of a job previously returned by the convert endpoint.

    Returns:
        A dict with the job's ID, status, result and timestamps, or a dict
        with an ``error`` key when ``task_queue.fetch_job`` finds no job.
        Timestamps are strings, or ``None`` when not set.
    """
    job = task_queue.fetch_job(job_id)
    if job is None:
        return {"error": "존재하지 않는 작업입니다."}

    return {
        "job_id": job.id,
        "status": job.get_status(),
        "result": job.result,
        # Guarded like ended_at so an unset timestamp is null, not "None".
        "enqueued_at": str(job.enqueued_at) if job.enqueued_at else None,
        "ended_at": str(job.ended_at) if job.ended_at else None,
    }
@app.get("/download/{filename}")
async def download_file(filename: str):
    """Return the contents of a converted Markdown file from OUTPUT_DIR.

    Args:
        filename: Name of a file directly inside OUTPUT_DIR.

    Returns:
        The file's text decoded as UTF-8, or a dict with an ``error`` key when
        the name is not a plain file name or no such regular file exists.
    """
    not_found = {"error": "파일이 존재하지 않습니다."}

    # Accept only a bare file name: anything with a path separator, or the
    # special entries "." and "..", could point outside OUTPUT_DIR.
    base_name = os.path.basename(filename.replace("\\", "/"))
    if base_name != filename or base_name in ("", ".", ".."):
        return not_found

    file_path = Path(OUTPUT_DIR) / base_name
    # is_file() rather than exists(): a directory cannot be read as text.
    if not file_path.is_file():
        return not_found

    return file_path.read_text(encoding="utf-8")