Files
test-mcp/server.py

187 lines
7.4 KiB
Python

import os
import sys
import re
import asyncio
import pymysql
from datetime import datetime
from pydantic import BaseModel
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from analyze import analyze_file_content
from crawler_service import run_crawler_service, crawl_stop_event
# --- Environment setup ---
os.environ.update({"PYTHONIOENCODING": "utf-8"})
# The Tesseract data directory differs between machines, so a value already
# present in the environment takes precedence; the Windows path below is only
# the fallback written into the environment when nothing is configured.
_FALLBACK_TESSDATA_DIR = r"C:\Users\User\AppData\Local\Programs\Tesseract-OCR\tessdata"
TESSDATA_PREFIX = os.environ.setdefault("TESSDATA_PREFIX", _FALLBACK_TESSDATA_DIR)
# Application object and the Jinja2 template loader used by the HTML routes.
app = FastAPI(title="Project Master Overseas API")
templates = Jinja2Templates(directory="templates")

# Static file mounts, registered in this order: (URL prefix, directory, route name).
for _url_prefix, _directory, _mount_name in (
    ("/style", "style", "style"),
    ("/js", "js", "js"),
    ("/sample_files", "sample", "sample_files"),
):
    app.mount(_url_prefix, StaticFiles(directory=_directory), name=_mount_name)

# CORS: every origin, method and header is accepted; credentials are disabled.
_cors_options = {
    "allow_origins": ["*"],
    "allow_credentials": False,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)
# --- 데이터 모델 ---
class AuthRequest(BaseModel):
    """Request body accepted by the ``/auth/crawl`` endpoint."""

    # Account identifier supplied by the client.
    user_id: str
    # Password supplied by the client.
    password: str
# --- 유틸리티 함수 ---
def get_db_connection():
    """Open and return a new MySQL connection configured from the environment.

    The host, user, password and database name are read from ``DB_HOST``,
    ``DB_USER``, ``DB_PASSWORD`` and ``DB_NAME``; each falls back to a
    built-in default when the variable is unset. Rows are returned as
    dictionaries because the connection uses ``DictCursor``.
    """
    # NOTE(review): the DB_PASSWORD fallback is a literal credential kept in
    # source control; consider requiring the variable in deployed environments.
    settings = {
        "host": os.getenv('DB_HOST', 'localhost'),
        "user": os.getenv('DB_USER', 'root'),
        "password": os.getenv('DB_PASSWORD', '45278434'),
        "database": os.getenv('DB_NAME', 'PM_proto'),
        "charset": 'utf8mb4',
        "cursorclass": pymysql.cursors.DictCursor,
    }
    return pymysql.connect(**settings)
async def run_in_threadpool(func, *args, **kwargs):
    """Run a synchronous callable in the loop's default executor without blocking.

    Args:
        func: The blocking callable to execute.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever ``func`` returns. Exceptions raised by ``func`` propagate to
        the awaiting caller.
    """
    # get_running_loop() is the recommended accessor inside a coroutine;
    # get_event_loop() has policy-dependent behaviour.
    loop = asyncio.get_running_loop()
    # run_in_executor() only forwards positional arguments, so the call is
    # wrapped in a closure to support keyword arguments as well.
    return await loop.run_in_executor(None, lambda: func(*args, **kwargs))
# --- HTML 라우팅 ---
@app.get("/")
async def root(request: Request):
    """Render ``index.html`` for the site root."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
@app.get("/dashboard")
async def get_dashboard(request: Request):
    """Render ``dashboard.html``."""
    context = {"request": request}
    return templates.TemplateResponse("dashboard.html", context)
@app.get("/mailTest")
async def get_mail_test(request: Request):
    """Render ``mailTest.html``."""
    context = {"request": request}
    return templates.TemplateResponse("mailTest.html", context)
# --- 분석 및 수집 API ---
def _query_available_dates():
    """Return the distinct crawl dates, newest first, as ``YYYY.MM.DD`` strings."""
    with get_db_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute("SELECT DISTINCT crawl_date FROM projects_history ORDER BY crawl_date DESC")
            rows = cursor.fetchall()
    # Rows whose crawl_date is NULL/empty are skipped.
    return [row['crawl_date'].strftime("%Y.%m.%d") for row in rows if row['crawl_date']]


@app.get("/available-dates")
async def get_available_dates():
    """Return the list of crawl dates stored in ``projects_history``.

    Returns:
        A list of ``YYYY.MM.DD`` strings ordered newest first, or
        ``{"error": <message>}`` when the query fails.
    """
    try:
        # The PyMySQL calls are synchronous; running them directly in this
        # coroutine would stall the event loop for every other request, so the
        # query is executed through the executor helper instead.
        return await run_in_threadpool(_query_available_dates)
    except Exception as e:
        return {"error": str(e)}
def _load_project_data(date):
    """Query the joined master/history rows for ``date`` and shape the payload.

    ``date`` has its dots replaced by dashes before being used as the
    ``crawl_date`` filter; a missing value or ``"-"`` selects the most recent
    crawl date found in ``projects_history``.
    """
    target_date = date.replace(".", "-") if date and date != "-" else None
    with get_db_connection() as conn:
        with conn.cursor() as cursor:
            if not target_date:
                cursor.execute("SELECT MAX(crawl_date) as last_date FROM projects_history")
                target_date = cursor.fetchone()['last_date']
            if not target_date:
                # No history rows at all: nothing to report.
                return {"projects": []}
            sql = """
                SELECT m.project_nm, m.short_nm, m.department, m.master,
                       h.recent_log, h.file_count, m.continent, m.country
                FROM projects_master m
                JOIN projects_history h ON m.project_id = h.project_id
                WHERE h.crawl_date = %s ORDER BY m.project_id ASC
            """
            cursor.execute(sql, (target_date,))
            rows = cursor.fetchall()

    projects = []
    for r in rows:
        # Prefer the short name; fall back to the full name when it is
        # missing or contains only whitespace.
        short_nm = r['short_nm']
        name = short_nm if short_nm and short_nm.strip() else r['project_nm']
        projects.append([
            name, r['department'], r['master'], r['recent_log'],
            r['file_count'], r['continent'], r['country'],
        ])
    return {"projects": projects}


@app.get("/project-data")
async def get_project_data(date: str = None):
    """Return the project rows recorded for one crawl date.

    Args:
        date: Crawl date to look up (dots are converted to dashes). When
            omitted or ``"-"``, the latest crawl date is used.

    Returns:
        ``{"projects": [[name, department, master, recent_log, file_count,
        continent, country], ...]}``, or ``{"error": <message>}`` on failure.
    """
    try:
        # The PyMySQL calls are synchronous; they are run through the executor
        # helper so this coroutine does not block the event loop.
        return await run_in_threadpool(_load_project_data, date)
    except Exception as e:
        return {"error": str(e)}
# Matches a YYYY.MM.DD stamp anywhere inside a recent-log string.
_ACTIVITY_LOG_DATE_RE = re.compile(r'(\d{4})\.(\d{2})\.(\d{2})')


def _classify_activity(log, files, reference_dt):
    """Return ``(status, days_ago)`` for one project row.

    A row is "unknown" (with 999 days) when it has no usable log, no files,
    or no parseable date in the log. Otherwise the age in days relative to
    ``reference_dt`` decides: up to 7 is "active", up to 14 is "warning",
    anything older is "stale".
    """
    unknown = ("unknown", 999)
    if not (log and log != "데이터 없음" and files and files > 0):
        return unknown
    match = _ACTIVITY_LOG_DATE_RE.search(log)
    if not match:
        return unknown
    try:
        logged_at = datetime.strptime(match.group(0), "%Y.%m.%d")
    except ValueError:
        # The digits matched the pattern but are not a real calendar date
        # (e.g. "2024.13.45"); treat this row as unknown rather than failing
        # the whole report.
        return unknown
    diff = (reference_dt - logged_at).days
    if diff <= 7:
        status = "active"
    elif diff <= 14:
        status = "warning"
    else:
        status = "stale"
    return status, diff


def _build_activity_report(date):
    """Load every project with its history row for ``date`` and classify it."""
    with get_db_connection() as conn:
        with conn.cursor() as cursor:
            if not date or date == "-":
                cursor.execute("SELECT MAX(crawl_date) as last_date FROM projects_history")
                res = cursor.fetchone()
                # With no history at all, fall back to today's date.
                target_date_val = res['last_date'] if res['last_date'] else datetime.now().date()
            else:
                target_date_val = datetime.strptime(date.replace(".", "-"), "%Y-%m-%d").date()
            # LEFT JOIN keeps projects that have no history row for the date;
            # those rows come back with NULL log/file_count.
            sql = """
                SELECT m.project_id, m.project_nm, m.short_nm, h.recent_log, h.file_count
                FROM projects_master m
                LEFT JOIN projects_history h ON m.project_id = h.project_id AND h.crawl_date = %s
            """
            cursor.execute(sql, (target_date_val,))
            rows = cursor.fetchall()

    # Midnight of the target date, so it can be subtracted from parsed log dates.
    target_date_dt = datetime.combine(target_date_val, datetime.min.time())
    analysis = {"summary": {"active": 0, "warning": 0, "stale": 0, "unknown": 0}, "details": []}
    for r in rows:
        status, days = _classify_activity(r['recent_log'], r['file_count'], target_date_dt)
        analysis["summary"][status] += 1
        analysis["details"].append(
            {"name": r['short_nm'] or r['project_nm'], "status": status, "days_ago": days}
        )
    return analysis


@app.get("/project-activity")
async def get_project_activity(date: str = None):
    """Classify every project by how recently its log was updated.

    Args:
        date: Reference date as ``YYYY.MM.DD`` or ``YYYY-MM-DD``. When omitted
            or ``"-"``, the latest crawl date (or today, if there is none) is
            used.

    Returns:
        ``{"summary": {status: count}, "details": [{"name", "status",
        "days_ago"}, ...]}``, or ``{"error": <message>}`` on failure.
    """
    try:
        # The PyMySQL calls are synchronous; they are run through the executor
        # helper so this coroutine does not block the event loop.
        return await run_in_threadpool(_build_activity_report, date)
    except Exception as e:
        return {"error": str(e)}
@app.post("/auth/crawl")
async def auth_crawl(req: AuthRequest):
    """Check the submitted credentials against ``PM_USER_ID`` / ``PM_PASSWORD``.

    Args:
        req: The user id and password sent by the client.

    Returns:
        ``{"success": True}`` when both values match the environment
        variables, otherwise ``{"success": False, "message": ...}``.
    """
    import hmac  # local import: only this handler needs it

    denied = {"success": False, "message": "크롤링을 할 수 없습니다."}
    expected_user = os.getenv("PM_USER_ID")
    expected_password = os.getenv("PM_PASSWORD")
    if expected_user is None or expected_password is None:
        # Unconfigured credentials can never match a submitted string.
        return denied

    def _same(submitted, expected):
        # compare_digest() takes time independent of where the inputs differ.
        # It only accepts ASCII str, so both sides are compared as bytes;
        # "surrogatepass" keeps the encoding total and one-to-one.
        return hmac.compare_digest(
            submitted.encode("utf-8", "surrogatepass"),
            expected.encode("utf-8", "surrogatepass"),
        )

    # Evaluate both checks unconditionally so a wrong user id is not
    # distinguishable from a wrong password by response time.
    user_ok = _same(req.user_id, expected_user)
    password_ok = _same(req.password, expected_password)
    if user_ok and password_ok:
        return {"success": True}
    return denied
@app.get("/sync")
async def sync_data():
    """Stream the output of ``run_crawler_service()`` as Server-Sent Events."""
    # The SSE media type is "text/event-stream"; the previous value
    # "text_event-stream" is not a valid MIME type, so EventSource clients
    # would refuse the response.
    return StreamingResponse(run_crawler_service(), media_type="text/event-stream")
@app.get("/stop-sync")
async def stop_sync():
    """Set ``crawl_stop_event`` and acknowledge the request."""
    crawl_stop_event.set()
    acknowledgement = {"success": True}
    return acknowledgement
@app.get("/attachments")
async def get_attachments():
    """List the regular files in the ``sample`` directory.

    The directory is created when it does not exist yet.

    Returns:
        A list of ``{"name": <file name>, "size": "<size> KB"}`` entries, the
        size being formatted with one decimal place.
    """
    path = "sample"
    # exist_ok=True replaces the separate exists() check, which could race
    # with another request creating the directory at the same time.
    os.makedirs(path, exist_ok=True)

    attachments = []
    for name in os.listdir(path):
        full_path = os.path.join(path, name)
        if not os.path.isfile(full_path):
            continue  # skip sub-directories and other non-file entries
        size_kb = os.path.getsize(full_path) / 1024
        attachments.append({"name": name, "size": f"{size_kb:.1f} KB"})
    return attachments
@app.get("/analyze-file")
async def analyze_file(filename: str):
    """Run ``analyze_file_content`` on ``filename`` via ``run_in_threadpool``."""
    # NOTE(review): filename is taken from the query string and passed on
    # unchanged; confirm analyze_file_content restricts it to the intended
    # directory.
    analysis = await run_in_threadpool(analyze_file_content, filename)
    return analysis
@app.get("/sample.png")
async def get_sample_img():
    """Serve the file at the relative path ``sample.png``."""
    image_path = "sample.png"
    return FileResponse(image_path)