Files
test-mcp/server.py

268 lines
10 KiB
Python

import os
import sys
import re
import asyncio
import pymysql
from datetime import datetime
from pydantic import BaseModel
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from analyze import analyze_file_content
from crawler_service import run_crawler_service, crawl_stop_event
from sql_queries import InquiryQueries, DashboardQueries
# --- Environment setup ---
# NOTE(review): PYTHONIOENCODING is normally read at interpreter startup, so
# setting it here presumably only influences child processes -- confirm intent.
os.environ["PYTHONIOENCODING"] = "utf-8"
# The Tesseract data path differs between environments, so the TESSDATA_PREFIX
# environment variable is preferred; the literal below is only the fallback.
# NOTE(review): the fallback is a machine-specific Windows path.
TESSDATA_PREFIX = os.getenv("TESSDATA_PREFIX", r"C:\Users\User\AppData\Local\Programs\Tesseract-OCR\tessdata")
# Write the resolved value back so the process environment carries it.
os.environ["TESSDATA_PREFIX"] = TESSDATA_PREFIX
app = FastAPI(title="Project Master Overseas API")
templates = Jinja2Templates(directory="templates")
# Mount static file directories (URL prefix -> local directory).
app.mount("/style", StaticFiles(directory="style"), name="style")
app.mount("/js", StaticFiles(directory="js"), name="js")
app.mount("/sample_files", StaticFiles(directory="sample"), name="sample_files")
# CORS: every origin, method and header is allowed; credentials are disabled.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# --- 데이터 모델 ---
class AuthRequest(BaseModel):
    """Request body carrying the credentials checked before a crawl."""

    # Account identifier supplied by the caller.
    user_id: str
    # Password supplied by the caller.
    password: str
# --- 유틸리티 함수 ---
def get_db_connection():
    """Open and return a new MySQL connection configured from the environment.

    Reads DB_HOST, DB_USER, DB_PASSWORD and DB_NAME, falling back to built-in
    defaults when a variable is unset. Cursors created from the connection
    return rows as dictionaries (DictCursor).
    """
    # NOTE(review): the DB_PASSWORD fallback is a hard-coded credential;
    # consider requiring the variable to be set -- confirm with deployment.
    settings = {
        "host": os.getenv('DB_HOST', 'localhost'),
        "user": os.getenv('DB_USER', 'root'),
        "password": os.getenv('DB_PASSWORD', '45278434'),
        "database": os.getenv('DB_NAME', 'PM_proto'),
        "charset": 'utf8mb4',
        "cursorclass": pymysql.cursors.DictCursor,
    }
    return pymysql.connect(**settings)
async def run_in_threadpool(func, *args):
    """Run a synchronous callable in the default executor without blocking the loop.

    Args:
        func: Blocking callable to execute.
        *args: Positional arguments forwarded to ``func``.

    Returns:
        Whatever ``func(*args)`` returns.
    """
    # A coroutine always has a running loop; get_running_loop() is the
    # documented preferred call here over get_event_loop().
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, func, *args)
# --- HTML 라우팅 ---
@app.get("/")
async def root(request: Request):
    """Render the index.html template for the site root."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
@app.get("/dashboard")
async def get_dashboard(request: Request):
    """Render the dashboard.html template."""
    context = {"request": request}
    return templates.TemplateResponse("dashboard.html", context)
@app.get("/mailTest")
async def get_mail_test(request: Request):
    """Render the mailTest.html template."""
    context = {"request": request}
    return templates.TemplateResponse("mailTest.html", context)
@app.get("/inquiries")
async def get_inquiries_page(request: Request):
    """Render the inquiries.html template."""
    context = {"request": request}
    return templates.TemplateResponse("inquiries.html", context)
class InquiryReplyRequest(BaseModel):
    """Request body for saving a reply on an inquiry."""

    # Reply text to store.
    reply: str
    # Status value to store alongside the reply.
    status: str
    # Handler value to store alongside the reply (presumably the person who
    # answered -- confirm against the UPDATE_REPLY query).
    handler: str
# --- 문의사항 API ---
@app.get("/api/inquiries")
async def get_inquiries(pm_type: str = None, category: str = None, status: str = None, keyword: str = None):
    """Return inquiries, optionally narrowed by exact-match filters and a keyword.

    Args:
        pm_type: When non-empty, restricts rows to this ``pm_type``.
        category: When non-empty, restricts rows to this ``category``.
        status: When non-empty, restricts rows to this ``status``.
        keyword: When non-empty, substring-matched against ``content``,
            ``author`` and ``project_nm``.

    Returns:
        The fetched rows, or ``{"error": <message>}`` if anything raises.
    """
    # Exact-match filters in the order their conditions are appended. Column
    # names are fixed literals; only the values are bound as query parameters.
    exact_filters = (("pm_type", pm_type), ("category", category), ("status", status))
    try:
        with get_db_connection() as conn, conn.cursor() as cursor:
            clauses = [InquiryQueries.SELECT_BASE]
            params = []
            for column, value in exact_filters:
                if value:
                    clauses.append(f" AND {column} = %s")
                    params.append(value)
            if keyword:
                pattern = f"%{keyword}%"
                clauses.append(" AND (content LIKE %s OR author LIKE %s OR project_nm LIKE %s)")
                params.extend([pattern] * 3)
            clauses.append(f" {InquiryQueries.ORDER_BY_DESC}")
            cursor.execute("".join(clauses), params)
            return cursor.fetchall()
    except Exception as exc:
        return {"error": str(exc)}
@app.get("/api/inquiries/{id}")
async def get_inquiry_detail(id: int):
    """Return the single inquiry row selected by ``id``.

    Returns:
        The row from ``fetchone()`` (``None`` when nothing matches), or
        ``{"error": <message>}`` if anything raises.
    """
    try:
        with get_db_connection() as conn, conn.cursor() as cursor:
            cursor.execute(InquiryQueries.SELECT_BY_ID, (id,))
            row = cursor.fetchone()
            return row
    except Exception as exc:
        return {"error": str(exc)}
@app.post("/api/inquiries/{id}/reply")
async def update_inquiry_reply(id: int, req: InquiryReplyRequest):
    """Store a reply for inquiry ``id`` and stamp it with today's date.

    Returns:
        ``{"success": True}`` after the commit, or ``{"error": <message>}``
        if anything raises.
    """
    try:
        with get_db_connection() as conn, conn.cursor() as cursor:
            # Handled date is recorded as a "YYYY.MM.DD" string.
            handled_date = f"{datetime.now():%Y.%m.%d}"
            values = (req.reply, req.status, req.handler, handled_date, id)
            cursor.execute(InquiryQueries.UPDATE_REPLY, values)
            conn.commit()
            return {"success": True}
    except Exception as exc:
        return {"error": str(exc)}
@app.delete("/api/inquiries/{id}/reply")
async def delete_inquiry_reply(id: int):
    """Run the reply-deletion query for inquiry ``id`` and commit it.

    Returns:
        ``{"success": True}`` after the commit, or ``{"error": <message>}``
        if anything raises.
    """
    try:
        with get_db_connection() as conn, conn.cursor() as cursor:
            query_args = (id,)
            cursor.execute(InquiryQueries.DELETE_REPLY, query_args)
            conn.commit()
            return {"success": True}
    except Exception as exc:
        return {"error": str(exc)}
# --- 분석 및 수집 API ---
@app.get("/available-dates")
async def get_available_dates():
    """Return the list of history dates as "YYYY.MM.DD" strings.

    Rows whose ``crawl_date`` is empty are skipped. On any exception the
    response is ``{"error": <message>}`` instead.
    """
    try:
        with get_db_connection() as conn, conn.cursor() as cursor:
            cursor.execute(DashboardQueries.GET_AVAILABLE_DATES)
            formatted = []
            for row in cursor.fetchall():
                crawl_date = row['crawl_date']
                if crawl_date:
                    formatted.append(crawl_date.strftime("%Y.%m.%d"))
            return formatted
    except Exception as exc:
        return {"error": str(exc)}
@app.get("/project-data")
async def get_project_data(date: str = None):
    """Return the joined project information for one crawl date.

    Args:
        date: Date string using "." separators (they are replaced with "-").
            When omitted, empty or "-", the last crawl date from the database
            is used instead.

    Returns:
        ``{"projects": [...]}`` where each entry is
        ``[name, department, master, recent_log, file_count, continent, country]``;
        an empty list when no crawl date is available; or
        ``{"error": <message>}`` if anything raises.
    """
    try:
        target_date = date.replace(".", "-") if date and date != "-" else None
        with get_db_connection() as conn, conn.cursor() as cursor:
            if not target_date:
                cursor.execute(DashboardQueries.GET_LAST_CRAWL_DATE)
                last_row = cursor.fetchone()
                # fetchone() yields None when the query produces no row at all;
                # treat that the same as an empty last_date.
                target_date = last_row['last_date'] if last_row else None
                if not target_date:
                    return {"projects": []}
            cursor.execute(DashboardQueries.GET_PROJECT_LIST, (target_date,))
            rows = cursor.fetchall()
        projects = []
        for r in rows:
            short_nm = r['short_nm']
            # Prefer the short name unless it is missing or only whitespace.
            name = short_nm if short_nm and short_nm.strip() else r['project_nm']
            projects.append([
                name,
                r['department'],
                r['master'],
                r['recent_log'],
                r['file_count'],
                r['continent'],
                r['country'],
            ])
        return {"projects": projects}
    except Exception as e:
        return {"error": str(e)}
def _classify_activity(log, files, reference_dt):
    """Classify one project row and return a ``(status, days_ago)`` pair.

    ``days_ago`` is 999 whenever no usable log date could be determined.
    """
    # Missing or zero file count is normalized to 0.
    file_count = int(files) if files else 0
    if file_count == 0:
        # Zero files always means "no data", whatever the log says.
        return "unknown", 999
    if not log or log in ("데이터 없음", "X"):
        # Files exist but there is no usable log entry.
        return "stale", 999
    if "폴더자동삭제" in log.replace(" ", ""):
        # An auto-deleted folder is always stale, regardless of any date.
        return "stale", 999
    match = re.search(r'(\d{4})\.(\d{2})\.(\d{2})', log)
    if not match:
        return "stale", 999
    try:
        logged_at = datetime.strptime(match.group(0), "%Y.%m.%d")
    except ValueError:
        # The text looked like a date but is not a real one (e.g. month 13);
        # treat it like a log without a date instead of failing every row.
        return "stale", 999
    diff = (reference_dt - logged_at).days
    if diff <= 7:
        status = "active"
    elif diff <= 14:
        status = "warning"
    else:
        status = "stale"
    return status, diff


@app.get("/project-activity")
async def get_project_activity(date: str = None):
    """Activity analysis API: classify every project relative to a reference date.

    Args:
        date: Reference date using "." separators in year.month.day order.
            When omitted, empty or "-", the last crawl date from the database
            is used, falling back to today.

    Returns:
        ``{"summary": {status: count}, "details": [{"name", "status",
        "days_ago"}, ...]}``, or ``{"error": <message>}`` if anything raises.
    """
    try:
        with get_db_connection() as conn, conn.cursor() as cursor:
            if not date or date == "-":
                cursor.execute(DashboardQueries.GET_LAST_CRAWL_DATE)
                res = cursor.fetchone()
                # fetchone() may return None when the query yields no row.
                last_date = res['last_date'] if res else None
                target_date_val = last_date if last_date else datetime.now().date()
            else:
                target_date_val = datetime.strptime(date.replace(".", "-"), "%Y-%m-%d").date()
            # Midnight of the reference date, so it can be subtracted from
            # the datetime parsed out of each log entry.
            target_date_dt = datetime.combine(target_date_val, datetime.min.time())
            # Like the accordion list: fetch all of the master's projects and
            # match them with the history for the reference date.
            cursor.execute(DashboardQueries.GET_PROJECT_LIST_FOR_ANALYSIS, (target_date_val,))
            rows = cursor.fetchall()
        analysis = {"summary": {"active": 0, "warning": 0, "stale": 0, "unknown": 0}, "details": []}
        for r in rows:
            status, days = _classify_activity(r['recent_log'], r['file_count'], target_date_dt)
            analysis["summary"][status] += 1
            short_nm = r['short_nm']
            # Same name rule as /project-data: ignore a whitespace-only short name.
            name = short_nm if short_nm and short_nm.strip() else r['project_nm']
            analysis["details"].append({"name": name, "status": status, "days_ago": days})
        return analysis
    except Exception as e:
        return {"error": str(e)}
@app.post("/auth/crawl")
async def auth_crawl(req: AuthRequest):
    """Crawl authentication: check the submitted credentials.

    The expected values come from the PM_USER_ID and PM_PASSWORD environment
    variables; if either is unset, authentication always fails.

    Returns:
        ``{"success": True}`` on a match, otherwise ``{"success": False,
        "message": ...}``.
    """
    import hmac  # function-scope import keeps this block self-contained

    def _as_bytes(text):
        # compare_digest rejects non-ASCII str, so compare encoded bytes;
        # "surrogatepass" keeps the encoding from raising on odd input.
        return text.encode("utf-8", "surrogatepass")

    expected_id = os.getenv("PM_USER_ID")
    expected_pw = os.getenv("PM_PASSWORD")
    if expected_id is not None and expected_pw is not None:
        # Constant-time comparisons, both always evaluated, so response timing
        # does not reveal which field (or how much of it) matched.
        id_ok = hmac.compare_digest(_as_bytes(req.user_id), _as_bytes(expected_id))
        pw_ok = hmac.compare_digest(_as_bytes(req.password), _as_bytes(expected_pw))
        if id_ok and pw_ok:
            return {"success": True}
    return {"success": False, "message": "크롤링을 할 수 없습니다."}
@app.get("/sync")
async def sync_data():
    """Stream the output of the crawler service to the client.

    Returns:
        A ``StreamingResponse`` wrapping ``run_crawler_service()``.
    """
    # "text/event-stream" is the Server-Sent Events media type; the previous
    # value "text_event-stream" is not a valid media type, so EventSource
    # clients would not accept the stream.
    # NOTE(review): presumably run_crawler_service() yields SSE-formatted
    # chunks -- confirm in crawler_service.
    return StreamingResponse(run_crawler_service(), media_type="text/event-stream")
@app.get("/stop-sync")
async def stop_sync():
    """Set the shared crawl stop event and acknowledge the request."""
    crawl_stop_event.set()
    acknowledgement = dict(success=True)
    return acknowledgement
@app.get("/attachments")
async def get_attachments():
    """List the regular files in the ``sample`` directory with their sizes.

    The directory is created if it does not exist yet.

    Returns:
        A list of ``{"name": <file name>, "size": "<kilobytes> KB"}`` entries,
        with the size formatted to one decimal place.
    """
    path = "sample"
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(path, exist_ok=True)
    attachments = []
    for f in os.listdir(path):
        full_path = os.path.join(path, f)  # joined once, reused below
        if os.path.isfile(full_path):
            attachments.append({"name": f, "size": f"{os.path.getsize(full_path)/1024:.1f} KB"})
    return attachments
@app.get("/analyze-file")
async def analyze_file(filename: str):
    """Run ``analyze_file_content`` on ``filename`` off the event loop.

    Returns:
        Whatever ``analyze_file_content(filename)`` returns.
    """
    # NOTE(review): filename comes straight from the query string and is
    # forwarded as-is -- confirm analyze_file_content validates the path.
    result = await run_in_threadpool(analyze_file_content, filename)
    return result
@app.get("/sample.png")
async def get_sample_img():
    """Serve the ``sample.png`` file from the working directory."""
    image_response = FileResponse("sample.png")
    return image_response