236 lines
13 KiB
Python
236 lines
13 KiB
Python
import os
|
|
import re
|
|
import asyncio
|
|
import json
|
|
import traceback
|
|
from fastapi import FastAPI
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import StreamingResponse, FileResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from playwright.async_api import async_playwright
|
|
from dotenv import load_dotenv
|
|
from analyze import analyze_file_content
|
|
|
|
load_dotenv()
|
|
|
|
app = FastAPI()
|
|
|
|
# Mount static files (css, images etc)
|
|
app.mount("/style", StaticFiles(directory="style"), name="style")
|
|
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["*"],
|
|
allow_credentials=False,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
@app.get("/dashboard")
|
|
async def get_dashboard():
|
|
return FileResponse("dashboard.html")
|
|
|
|
@app.get("/mailTest")
|
|
async def get_mail_test():
|
|
return FileResponse("mailTest.html")
|
|
|
|
@app.get("/attachments")
|
|
async def get_attachments():
|
|
sample_path = "sample"
|
|
if not os.path.exists(sample_path):
|
|
os.makedirs(sample_path)
|
|
files = []
|
|
for f in os.listdir(sample_path):
|
|
f_path = os.path.join(sample_path, f)
|
|
if os.path.isfile(f_path):
|
|
files.append({
|
|
"name": f,
|
|
"size": f"{os.path.getsize(f_path) / 1024:.1f} KB"
|
|
})
|
|
return files
|
|
|
|
@app.get("/analyze-file")
|
|
async def analyze_file(filename: str):
|
|
return analyze_file_content(filename)
|
|
|
|
@app.get("/")
|
|
async def root():
|
|
return FileResponse("index.html")
|
|
|
|
@app.get("/sync")
|
|
async def sync_data():
|
|
async def event_generator():
|
|
user_id = os.getenv("PM_USER_ID")
|
|
password = os.getenv("PM_PASSWORD")
|
|
|
|
if not user_id or not password:
|
|
yield f"data: {json.dumps({'type': 'log', 'message': '오류: .env 파일에 계정 정보가 없습니다.'})}\n\n"
|
|
return
|
|
|
|
results = []
|
|
|
|
async with async_playwright() as p:
|
|
yield f"data: {json.dumps({'type': 'log', 'message': '브라우저 실행 중...'})}\n\n"
|
|
browser = await p.chromium.launch(headless=True, args=[
|
|
"--no-sandbox",
|
|
"--disable-dev-shm-usage",
|
|
"--disable-blink-features=AutomationControlled"
|
|
])
|
|
context = await browser.new_context(
|
|
viewport={'width': 1920, 'height': 1080},
|
|
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
|
|
)
|
|
page = await context.new_page()
|
|
|
|
try:
|
|
yield f"data: {json.dumps({'type': 'log', 'message': '사이트 접속 및 로그인 중...'})}\n\n"
|
|
await page.goto("https://overseas.projectmastercloud.com/", wait_until="domcontentloaded")
|
|
|
|
await page.click("#login-by-id", timeout=10000)
|
|
await page.fill("#user_id", user_id)
|
|
await page.fill("#user_pw", password)
|
|
await page.click("#login-btn")
|
|
|
|
yield f"data: {json.dumps({'type': 'log', 'message': '대시보드 목록 대기 중...'})}\n\n"
|
|
await page.wait_for_selector("h4.list__contents_aria_group_body_list_item_label", timeout=60000)
|
|
|
|
locators = page.locator("h4.list__contents_aria_group_body_list_item_label")
|
|
count = await locators.count()
|
|
yield f"data: {json.dumps({'type': 'log', 'message': f'총 {count}개의 프로젝트 발견. 수집 시작.'})}\n\n"
|
|
|
|
for i in range(count):
|
|
try:
|
|
proj = page.locator("h4.list__contents_aria_group_body_list_item_label").nth(i)
|
|
project_name = (await proj.inner_text()).strip()
|
|
|
|
yield f"data: {json.dumps({'type': 'log', 'message': f'[{i+1}/{count}] {project_name} - 시작'})}\n\n"
|
|
await proj.scroll_into_view_if_needed()
|
|
await proj.click(force=True)
|
|
|
|
# 프로젝트 로딩 대기 (Gitea 방식: 물리적 대기)
|
|
await asyncio.sleep(5)
|
|
await page.wait_for_selector("div.footer", state="visible", timeout=20000)
|
|
|
|
recent_log = "기존데이터유지"
|
|
file_count = 0
|
|
|
|
# 1단계: 활동로그 수집 (Gitea 방식 복구 + 정밀 셀렉터)
|
|
try:
|
|
log_btn_sel = "body > div.footer > div.left > div.wrap.log-wrap > div.title.text"
|
|
log_btn = page.locator(log_btn_sel).first
|
|
if await log_btn.is_visible(timeout=5000):
|
|
yield f"data: {json.dumps({'type': 'log', 'message': ' - [로그] 창 열기 시도...'})}\n\n"
|
|
await log_btn.click(force=True)
|
|
await asyncio.sleep(5) # 로딩 충분히 대기
|
|
|
|
modal_sel = "article.archive-modal"
|
|
if await page.locator(modal_sel).is_visible():
|
|
yield f"data: {json.dumps({'type': 'log', 'message': ' - [로그] 모달 발견. 데이터 로딩 대기...'})}\n\n"
|
|
# .log-body 내부의 데이터만 타겟팅하도록 수정
|
|
date_sel = "article.archive-modal .log-body .date .text"
|
|
user_sel = "article.archive-modal .log-body .user .text"
|
|
act_sel = "article.archive-modal .log-body .activity .text"
|
|
|
|
# 데이터가 나타날 때까지 최대 15초 대기
|
|
success_log = False
|
|
for _ in range(15):
|
|
if await page.locator(date_sel).count() > 0:
|
|
raw_date = (await page.locator(date_sel).first.inner_text()).strip()
|
|
if raw_date:
|
|
success_log = True
|
|
break
|
|
await asyncio.sleep(1)
|
|
|
|
if success_log:
|
|
user_name = (await page.locator(user_sel).first.inner_text()).strip()
|
|
activity = (await page.locator(act_sel).first.inner_text()).strip()
|
|
formatted_date = re.sub(r'[-/]', '.', raw_date)[:10]
|
|
recent_log = f"{formatted_date}, {user_name}, {activity}"
|
|
yield f"data: {json.dumps({'type': 'log', 'message': f' - [로그] 성공: {recent_log[:30]}...'})}\n\n"
|
|
else:
|
|
yield f"data: {json.dumps({'type': 'log', 'message': ' - [로그] 데이터 추출 실패'})}\n\n"
|
|
|
|
await page.click("article.archive-modal div.close", timeout=3000)
|
|
await asyncio.sleep(1.5)
|
|
except Exception as e:
|
|
yield f"data: {json.dumps({'type': 'log', 'message': f' - [로그] 오류: {str(e)[:20]}'})}\n\n"
|
|
|
|
# 2단계: 구성(파일 수) 수집 (Gitea 순회 방식 복구 + 대기 시간 대폭 연장)
|
|
try:
|
|
sitemap_btn_sel = "body > div.footer > div.left > div.wrap.site-map-wrap"
|
|
sitemap_btn = page.locator(sitemap_btn_sel).first
|
|
if await sitemap_btn.is_visible(timeout=5000):
|
|
yield f"data: {json.dumps({'type': 'log', 'message': ' - [구성] 진입 시도...'})}\n\n"
|
|
await sitemap_btn.click(force=True)
|
|
|
|
# Gitea 방식: context.pages 직접 뒤져서 팝업 찾기
|
|
popup_page = None
|
|
for _ in range(30): # 최대 15초 대기
|
|
for p_item in context.pages:
|
|
try:
|
|
if "composition" in p_item.url:
|
|
popup_page = p_item
|
|
break
|
|
except: pass
|
|
if popup_page: break
|
|
await asyncio.sleep(0.5)
|
|
|
|
if popup_page:
|
|
yield f"data: {json.dumps({'type': 'log', 'message': ' - [구성] 창 발견. 데이터 로딩 대기 (최대 30초)...'})}\n\n"
|
|
# 사용자 제공 정밀 선택자 적용 (nth-child(3)가 실제 데이터)
|
|
target_selector = "#composition-list h6:nth-child(3)"
|
|
success_comp = False
|
|
|
|
# 최대 30초간 데이터가 나타날 때까지 대기
|
|
for _ in range(30):
|
|
h6_count = await popup_page.locator(target_selector).count()
|
|
if h6_count > 0:
|
|
success_comp = True
|
|
break
|
|
await asyncio.sleep(1)
|
|
|
|
if success_comp:
|
|
yield f"data: {json.dumps({'type': 'log', 'message': ' - [구성] 데이터 감지됨. 최종 렌더링 대기...'})}\n\n"
|
|
await asyncio.sleep(10) # 렌더링 안정화를 위한 대기
|
|
|
|
# 모든 h6:nth-child(3) 요소를 순회하며 숫자 합산
|
|
locators_h6 = popup_page.locator(target_selector)
|
|
h6_count = await locators_h6.count()
|
|
current_total = 0
|
|
for j in range(h6_count):
|
|
text = (await locators_h6.nth(j).inner_text()).strip()
|
|
# 텍스트 내에서 숫자만 추출 (여러 줄일 경우 마지막 줄 기준)
|
|
nums = re.findall(r'\d+', text.split('\n')[-1])
|
|
if nums:
|
|
current_total += int(nums[0])
|
|
|
|
file_count = current_total
|
|
yield f"data: {json.dumps({'type': 'log', 'message': f' - [구성] 성공 ({file_count}개)'})}\n\n"
|
|
else:
|
|
yield f"data: {json.dumps({'type': 'log', 'message': ' - [구성] 로딩 타임아웃'})}\n\n"
|
|
|
|
await popup_page.close()
|
|
else:
|
|
yield f"data: {json.dumps({'type': 'log', 'message': ' - [구성] 팝업창 발견 실패'})}\n\n"
|
|
except Exception as e:
|
|
yield f"data: {json.dumps({'type': 'log', 'message': f' - [구성] 오류: {str(e)[:20]}'})}\n\n"
|
|
|
|
results.append({"projectName": project_name, "recentLog": recent_log, "fileCount": file_count})
|
|
|
|
# 홈 복귀
|
|
await page.locator("div.header div.title div").first.click(force=True)
|
|
await page.wait_for_selector("h4.list__contents_aria_group_body_list_item_label", timeout=20000)
|
|
await asyncio.sleep(2)
|
|
|
|
except Exception:
|
|
await page.goto("https://overseas.projectmastercloud.com/dashboard", wait_until="domcontentloaded")
|
|
|
|
yield f"data: {json.dumps({'type': 'done', 'data': results})}\n\n"
|
|
|
|
except Exception as e:
|
|
yield f"data: {json.dumps({'type': 'log', 'message': f'치명적 오류: {str(e)}'})}\n\n"
|
|
finally:
|
|
await browser.close()
|
|
|
|
return StreamingResponse(event_generator(), media_type="text_event-stream")
|