Files
test-mcp/server.py
Taehoon 600c54c1f0 feat: 크롤러 부서 정보 수집 추가 및 대시보드 데이터 정확도 개선
- getData API 가로채기 기능을 통한 부서(department) 자동 수집 구현
- 파일 0개 기준의 "데이터 없음" 분류 로직 최적화 (LEFT JOIN 적용)
- 관리자 권한 인증 모달 스타일 복구 및 UI 정밀 조정
- 중복 등록 프로젝트(sm-25-032-phlinfra) DB 정리 및 테스트 파일 삭제
2026-03-11 17:52:12 +09:00

204 lines
8.2 KiB
Python

import os
import sys
import re
import asyncio
import pymysql
from datetime import datetime
from pydantic import BaseModel
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from analyze import analyze_file_content
from crawler_service import run_crawler_service, crawl_stop_event
# --- Environment configuration ---
os.environ["PYTHONIOENCODING"] = "utf-8"
# The Tesseract install path differs per machine, so prefer the environment
# variable and fall back to the local development default.
TESSDATA_PREFIX = os.getenv("TESSDATA_PREFIX", r"C:\Users\User\AppData\Local\Programs\Tesseract-OCR\tessdata")
os.environ["TESSDATA_PREFIX"] = TESSDATA_PREFIX
app = FastAPI(title="Project Master Overseas API")
templates = Jinja2Templates(directory="templates")
# Static file mounts: stylesheets, scripts, and crawled sample attachments.
app.mount("/style", StaticFiles(directory="style"), name="style")
app.mount("/js", StaticFiles(directory="js"), name="js")
app.mount("/sample_files", StaticFiles(directory="sample"), name="sample_files")
# CORS is wide open; credentials are disabled, so "*" origins are permitted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# --- Data models ---
class AuthRequest(BaseModel):
    """Credentials payload posted to the crawl-authorization endpoint."""
    user_id: str
    password: str
# --- Utility functions ---
def get_db_connection():
    """Return a MySQL connection configured from environment variables.

    Falls back to local development defaults when the DB_* variables are
    unset.  Rows are returned as dicts via ``DictCursor``.
    """
    # NOTE(security): the hard-coded password fallback should be removed once
    # every deployment environment reliably sets DB_PASSWORD.
    return pymysql.connect(
        host=os.getenv('DB_HOST', 'localhost'),
        user=os.getenv('DB_USER', 'root'),
        password=os.getenv('DB_PASSWORD', '45278434'),
        database=os.getenv('DB_NAME', 'PM_proto'),
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
async def run_in_threadpool(func, *args):
    """Run a blocking (synchronous) callable without blocking the event loop.

    Args:
        func: Synchronous callable to execute in the default thread pool.
        *args: Positional arguments forwarded to ``func``.

    Returns:
        Whatever ``func(*args)`` returns.
    """
    # FIX: asyncio.get_event_loop() is deprecated inside coroutines since
    # Python 3.10 and may create a brand-new loop when none is running;
    # get_running_loop() is the correct API here and fails fast otherwise.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, func, *args)
# --- HTML routing ---
@app.get("/")
async def root(request: Request):
    """Serve the landing page."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
@app.get("/dashboard")
async def get_dashboard(request: Request):
    """Serve the dashboard page."""
    context = {"request": request}
    return templates.TemplateResponse("dashboard.html", context)
@app.get("/mailTest")
async def get_mail_test(request: Request):
    """Serve the mail-test page."""
    context = {"request": request}
    return templates.TemplateResponse("mailTest.html", context)
# --- Analysis & collection APIs ---
@app.get("/available-dates")
async def get_available_dates():
    """Return every distinct crawl date in history, newest first ("YYYY.MM.DD")."""
    try:
        with get_db_connection() as conn, conn.cursor() as cursor:
            cursor.execute("SELECT DISTINCT crawl_date FROM projects_history ORDER BY crawl_date DESC")
            dates = []
            for row in cursor.fetchall():
                # Skip NULL dates; format the rest for the dashboard picker.
                if row['crawl_date']:
                    dates.append(row['crawl_date'].strftime("%Y.%m.%d"))
            return dates
    except Exception as e:
        return {"error": str(e)}
@app.get("/project-data")
async def get_project_data(date: str = None):
    """Return project master rows LEFT-JOINed with history for one crawl date.

    ``date`` arrives as "YYYY.MM.DD"; "-" or empty means "latest available".
    """
    try:
        # Normalize "2026.03.11" -> "2026-03-11"; None triggers latest-date lookup.
        target_date = date.replace(".", "-") if date and date != "-" else None
        with get_db_connection() as conn:
            with conn.cursor() as cursor:
                if not target_date:
                    cursor.execute("SELECT MAX(crawl_date) as last_date FROM projects_history")
                    target_date = cursor.fetchone()['last_date']
                    if not target_date:
                        # History table is empty — nothing to show.
                        return {"projects": []}
                sql = """
                    SELECT m.project_nm, m.short_nm, m.department, m.master,
                           h.recent_log, h.file_count, m.continent, m.country
                    FROM projects_master m
                    LEFT JOIN projects_history h ON m.project_id = h.project_id AND h.crawl_date = %s
                    ORDER BY m.project_id ASC
                """
                cursor.execute(sql, (target_date,))
                # Prefer the short name when it is non-blank; fall back to the full name.
                projects = [
                    [
                        r['short_nm'] if r['short_nm'] and r['short_nm'].strip() else r['project_nm'],
                        r['department'], r['master'], r['recent_log'],
                        r['file_count'], r['continent'], r['country'],
                    ]
                    for r in cursor.fetchall()
                ]
                return {"projects": projects}
    except Exception as e:
        return {"error": str(e)}
@app.get("/project-activity")
async def get_project_activity(date: str = None):
    """Activity-analysis API: classify each project as active/warning/stale/unknown.

    Classification for the given crawl date:
      - 0 files            -> "unknown" (no data)
      - log <= 7 days old  -> "active"
      - log <= 14 days old -> "warning"
      - otherwise          -> "stale"
    Returns a summary count plus a per-project detail list.
    """
    try:
        with get_db_connection() as conn:
            with conn.cursor() as cursor:
                # Resolve the target date: latest crawl if unspecified, else parse it.
                if not date or date == "-":
                    cursor.execute("SELECT MAX(crawl_date) as last_date FROM projects_history")
                    res = cursor.fetchone()
                    target_date_val = res['last_date'] if res['last_date'] else datetime.now().date()
                else:
                    target_date_val = datetime.strptime(date.replace(".", "-"), "%Y-%m-%d").date()
                target_date_dt = datetime.combine(target_date_val, datetime.min.time())
                # Same as the accordion list: fetch ALL master projects, matching
                # history rows for the selected date via LEFT JOIN.
                sql = """
                    SELECT m.project_id, m.project_nm, m.short_nm, h.recent_log, h.file_count
                    FROM projects_master m
                    LEFT JOIN projects_history h ON m.project_id = h.project_id AND h.crawl_date = %s
                """
                cursor.execute(sql, (target_date_val,))
                rows = cursor.fetchall()
                analysis = {"summary": {"active": 0, "warning": 0, "stale": 0, "unknown": 0}, "details": []}
                for r in rows:
                    log, files = r['recent_log'], r['file_count']
                    status, days = "unknown", 999  # 999 = sentinel for "age unknown"
                    # Coerce the file count to int (missing/None counts as 0).
                    file_val = int(files) if files else 0
                    # "데이터 없음" / "X" are crawler placeholders, not real log dates.
                    has_log = log and log != "데이터 없음" and log != "X"
                    if file_val == 0:
                        # [Key rule] zero files always means "no data", regardless of log.
                        status = "unknown"
                    elif has_log:
                        # A log date exists — measure its age precisely.
                        match = re.search(r'(\d{4})\.(\d{2})\.(\d{2})', log)
                        if match:
                            diff = (target_date_dt - datetime.strptime(match.group(0), "%Y.%m.%d")).days
                            status = "active" if diff <= 7 else "warning" if diff <= 14 else "stale"
                            days = diff
                        else:
                            status = "stale"
                    else:
                        # Files exist but no usable log date.
                        status = "stale"
                    analysis["summary"][status] += 1
                    analysis["details"].append({"name": r['short_nm'] or r['project_nm'], "status": status, "days_ago": days})
                return analysis
    except Exception as e:
        return {"error": str(e)}
@app.post("/auth/crawl")
async def auth_crawl(req: AuthRequest):
    """Authorize a crawl run against the PM_USER_ID / PM_PASSWORD env vars."""
    expected_id = os.getenv("PM_USER_ID")
    expected_pw = os.getenv("PM_PASSWORD")
    if (req.user_id, req.password) == (expected_id, expected_pw):
        return {"success": True}
    return {"success": False, "message": "크롤링을 할 수 없습니다."}
@app.get("/sync")
async def sync_data():
    """Stream crawler progress to the client as Server-Sent Events."""
    # FIX: the media type was misspelled "text_event-stream"; SSE clients
    # (EventSource) require exactly "text/event-stream" to process the stream.
    return StreamingResponse(run_crawler_service(), media_type="text/event-stream")
@app.get("/stop-sync")
async def stop_sync():
    """Signal the running crawler to stop via the shared stop event."""
    crawl_stop_event.set()
    result = {"success": True}
    return result
@app.get("/attachments")
async def get_attachments():
    """List files in the sample/ directory with human-readable KB sizes."""
    path = "sample"
    os.makedirs(path, exist_ok=True)
    entries = []
    for name in os.listdir(path):
        full = os.path.join(path, name)
        if os.path.isfile(full):
            size_kb = os.path.getsize(full) / 1024
            entries.append({"name": name, "size": f"{size_kb:.1f} KB"})
    return entries
@app.get("/analyze-file")
async def analyze_file(filename: str):
    """Run the blocking file analyzer off the event loop and return its result."""
    result = await run_in_threadpool(analyze_file_content, filename)
    return result
@app.get("/sample.png")
async def get_sample_img():
    """Serve the static sample image from the project root."""
    response = FileResponse("sample.png")
    return response