Files
test-mcp/server.py
Taehoon 4a995c11f4 feat: MySQL DB 정규화(Master/History) 및 시계열 데이터 수집 시스템 통합
1. 마스터/히스토리 테이블 분리 및 마이그레이션 완료\n2. 날짜별 데이터 축적 및 대시보드 필터링 기능 추가\n3. Playwright 수집 로직(날짜필터, 좌표클릭, 정밀합산) 완전 복구
2026-03-10 16:24:13 +09:00

168 lines
5.6 KiB
Python

import os
import sys
# 한글 환경 및 Tesseract 경로 강제 설정
os.environ["PYTHONIOENCODING"] = "utf-8"
os.environ["TESSDATA_PREFIX"] = r"C:\Users\User\AppData\Local\Programs\Tesseract-OCR\tessdata"
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from analyze import analyze_file_content
from crawler_service import run_crawler_service
import asyncio
from fastapi import Request
app = FastAPI(title="Project Master Overseas API")
templates = Jinja2Templates(directory="templates")
# --- 유틸리티: 동기 함수를 스레드 풀에서 실행 ---
async def run_in_threadpool(func, *args):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, func, *args)
# 정적 파일 및 미들웨어 설정
app.mount("/style", StaticFiles(directory="style"), name="style")
app.mount("/js", StaticFiles(directory="js"), name="js")
app.mount("/sample_files", StaticFiles(directory="sample"), name="sample_files")
@app.get("/sample.png")
async def get_sample_img():
return FileResponse("sample.png")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=False,
allow_methods=["*"],
allow_headers=["*"],
)
# --- HTML 라우팅 ---
import pymysql
def get_db_connection():
return pymysql.connect(
host='localhost',
user='root',
password='45278434',
database='crawling',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
@app.get("/available-dates")
async def get_available_dates():
"""
히스토리 테이블에서 유니크한 크롤링 날짜 목록을 반환
"""
try:
conn = get_db_connection()
try:
with conn.cursor() as cursor:
cursor.execute("SELECT DISTINCT crawl_date FROM projects_history ORDER BY crawl_date DESC")
rows = cursor.fetchall()
dates = [row['crawl_date'].strftime("%Y.%m.%d") for row in rows if row['crawl_date']]
return dates
finally:
conn.close()
except Exception as e:
return {"error": str(e)}
@app.get("/project-data")
async def get_project_data(date: str = None):
"""
특정 날짜의 데이터를 JOIN하여 반환
"""
try:
conn = get_db_connection()
try:
with conn.cursor() as cursor:
if not date or date == "-":
cursor.execute("SELECT MAX(crawl_date) as last_date FROM projects_history")
target_date_row = cursor.fetchone()
target_date = target_date_row['last_date']
else:
target_date = date.replace(".", "-")
if not target_date:
return {"projects": [], "last_updated": "-"}
# 마스터 정보와 히스토리 정보를 JOIN
sql = """
SELECT m.project_nm, m.short_nm, m.department, m.master,
h.recent_log, h.file_count, m.continent, m.country
FROM projects_master m
JOIN projects_history h ON m.project_id = h.project_id
WHERE h.crawl_date = %s
ORDER BY m.project_id ASC
"""
cursor.execute(sql, (target_date,))
rows = cursor.fetchall()
projects = []
for row in rows:
display_name = row['short_nm'] if row['short_nm'] and row['short_nm'].strip() else row['project_nm']
projects.append([
display_name,
row['department'],
row['master'],
row['recent_log'],
row['file_count'],
row['continent'],
row['country']
])
return {"projects": projects, "last_updated": target_date.strftime("%Y.%m.%d") if hasattr(target_date, 'strftime') else str(target_date).replace("-", ".")}
finally:
conn.close()
except Exception as e:
return {"error": str(e)}
@app.get("/")
async def root(request: Request):
return templates.TemplateResponse("index.html", {"request": request})
@app.get("/dashboard")
async def get_dashboard(request: Request):
return templates.TemplateResponse("dashboard.html", {"request": request})
@app.get("/mailTest")
@app.get("/mailTest.html")
async def get_mail_test(request: Request):
return templates.TemplateResponse("mailTest.html", {"request": request})
# --- 데이터 API ---
@app.get("/attachments")
async def get_attachments():
sample_path = "sample"
if not os.path.exists(sample_path):
os.makedirs(sample_path)
files = []
for f in os.listdir(sample_path):
f_path = os.path.join(sample_path, f)
if os.path.isfile(f_path):
files.append({
"name": f,
"size": f"{os.path.getsize(f_path) / 1024:.1f} KB"
})
return files
@app.get("/analyze-file")
async def analyze_file(filename: str):
"""
분석 서비스(analyze.py) 호출 - 스레드 풀에서 비차단 방식으로 실행
"""
return await run_in_threadpool(analyze_file_content, filename)
@app.get("/sync")
async def sync_data():
"""
크롤링 서비스(crawler_service.py) 호출
"""
print(">>> /sync request received")
return StreamingResponse(run_crawler_service(), media_type="text_event-stream")