feat: MySQL DB integration and crawling-logic stabilization (ID matching and data refinement)

2026-03-10 14:11:49 +09:00
parent 9369e18eb8
commit 743cce543b
21 changed files with 696 additions and 204 deletions
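
The upserts and updates in this diff target an `overseas_projects` table whose DDL is not part of the commit. Below is a minimal sketch of a compatible schema created through pymysql; the column types, sizes, and the choice of `project_id` as primary key are assumptions inferred from the queries in the diff, not the project's actual definition:

import pymysql

# Hypothetical DDL covering every column referenced by the INSERT/UPDATE statements below;
# types and sizes are assumptions.
DDL = """
CREATE TABLE IF NOT EXISTS overseas_projects (
    project_id  VARCHAR(64)  NOT NULL PRIMARY KEY,
    project_nm  VARCHAR(255) NOT NULL,
    short_nm    VARCHAR(255),
    master      VARCHAR(100),
    continent   VARCHAR(100),
    country     VARCHAR(100),
    recent_log  VARCHAR(500),
    file_count  INT DEFAULT 0
) CHARACTER SET utf8mb4
"""

conn = pymysql.connect(host='localhost', user='root', password='<password>',
                       database='crawling', charset='utf8mb4')
try:
    with conn.cursor() as cursor:
        cursor.execute(DDL)
    conn.commit()
finally:
    conn.close()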


@@ -2,101 +2,159 @@ import os
import re
import asyncio
import json
import csv
import traceback
import sys
import threading
import queue
import pymysql
from datetime import datetime
from playwright.async_api import async_playwright
from dotenv import load_dotenv
load_dotenv()
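# .env is expected to provide PM_USER_ID and PM_PASSWORD used for the login step below.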
def get_db_connection():
"""MySQL 데이터베이스 연결을 반환합니다."""
return pymysql.connect(
host='localhost',
user='root',
password='45278434',
database='crawling',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
def clean_date_string(date_str):
"""
Extract only the YY.MM.DD portion of a date string.
"""
if not date_str: return ""
match = re.search(r'(\d{2})[./-](\d{2})[./-](\d{2})', date_str)
if match:
return f"{match.group(1)}.{match.group(2)}.{match.group(3)}"
return date_str[:8]
return f"20{match.group(1)}.{match.group(2)}.{match.group(3)}"
return date_str[:10].replace("-", ".")
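# Example for clean_date_string (updated return): "26.03.10 14:11" -> "2026.03.10"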
async def run_crawler_service():
"""
Crawler that forces the detail packet call to collect detailed file counts without opening popups, while preserving all previously working logic.
"""
user_id = os.getenv("PM_USER_ID")
password = os.getenv("PM_PASSWORD")
def parse_log_id(log_id):
"""ID 구조: 로그고유번호_시간_활동한 사람_활동내용_활동대상"""
if not log_id or "_" not in log_id: return log_id
try:
parts = log_id.split('_')
if len(parts) >= 4:
date_part = clean_date_string(parts[1])
activity = parts[3].strip()
activity = re.sub(r'\(.*?\)', '', activity).strip()
return f"{date_part}, {activity}"
except: pass
return log_id
if not user_id or not password:
yield f"data: {json.dumps({'type': 'log', 'message': '오류: .env 파일에 계정 정보가 없습니다.'})}\n\n"
return
results = []
async with async_playwright() as p:
browser = None
try:
yield f"data: {json.dumps({'type': 'log', 'message': '브라우저 엔진 (데이터 강제 유도 모드) 가동...'})}\n\n"
browser = await p.chromium.launch(headless=False, args=["--no-sandbox", "--disable-dev-shm-usage"])
context = await browser.new_context(viewport={'width': 1600, 'height': 900})
captured_data = {"log": None, "tree": None}
async def global_interceptor(response):
url = response.url
try:
# Watch detail packets (prefer detail responses that include params[resourcePath]=/)
if "getTreeObject" in url:
data = await response.json()
if data.get('currentTreeObject', {}).get('folder'):
captured_data["tree"] = data
elif "Log" in url:
captured_data["log"] = await response.json()
except: pass
context.on("response", global_interceptor)
page = await context.new_page()
# --- 1. Login (stable logic) ---
await page.goto("https://overseas.projectmastercloud.com/dashboard", wait_until="domcontentloaded")
if await page.locator("#login-by-id").is_visible(timeout=5000):
await page.click("#login-by-id")
await page.fill("#user_id", user_id)
await page.fill("#user_pw", password)
await page.click("#login-btn")
await page.wait_for_selector("h4.list__contents_aria_group_body_list_item_label", timeout=60000)
await asyncio.sleep(5)
names = await page.locator("h4.list__contents_aria_group_body_list_item_label").all_inner_texts()
project_names = [n.strip() for n in names if n.strip()]
count = len(project_names)
yield f"data: {json.dumps({'type': 'log', 'message': f'{count}개의 프로젝트 수집 시작.'})}\n\n"
for i, project_name in enumerate(project_names):
captured_data["log"] = None
captured_data["tree"] = None
yield f"data: {json.dumps({'type': 'log', 'message': f'[{i+1}/{count}] {project_name} 수집 시작'})}\n\n"
def crawler_thread_worker(msg_queue, user_id, password):
if sys.platform == 'win32':
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
async def run():
async with async_playwright() as p:
browser = None
try:
msg_queue.put(json.dumps({'type': 'log', 'message': 'Starting browser engine (full-feature restore mode)...'}))
browser = await p.chromium.launch(headless=False, args=["--no-sandbox"])
context = await browser.new_context(viewport={'width': 1600, 'height': 900})
try:
# Enter the detail page (stable logic)
target_el = page.get_by_text(project_name).first
await target_el.scroll_into_view_if_needed()
box = await target_el.bounding_box()
if box: await page.mouse.click(box['x'] + 5, box['y'] + 5)
else: await target_el.click(force=True)
await page.wait_for_selector("text=활동로그", timeout=30000)
await asyncio.sleep(3)
captured_data = {"tree": None, "_is_root_archive": False, "_tree_url": "", "project_list": []}
recent_log = "데이터 없음"
file_count = 0
# --- 2. Activity-log collection (fully restored logic) ---
async def global_interceptor(response):
url = response.url
try:
if "getAllList" in url:
data = await response.json()
captured_data["project_list"] = data.get("data", [])
elif "getTreeObject" in url:
# [Core restore] Exact root-path detection logic
is_root = False
if "params[resourcePath]=" in url:
path_val = url.split("params[resourcePath]=")[1].split("&")[0]
if path_val in ["%2F", "/"]: is_root = True
if is_root:
data = await response.json()
captured_data["tree"] = data
captured_data["_is_root_archive"] = "archive" in url
captured_data["_tree_url"] = url
except: pass
context.on("response", global_interceptor)
page = await context.new_page()
await page.goto("https://overseas.projectmastercloud.com/dashboard", wait_until="domcontentloaded")
# Log in
if await page.locator("#login-by-id").is_visible(timeout=5000):
await page.click("#login-by-id")
await page.fill("#user_id", user_id)
await page.fill("#user_pw", password)
await page.click("#login-btn")
# Wait for the project list to load
await page.wait_for_selector("h4.list__contents_aria_group_body_list_item_label", timeout=60000)
await asyncio.sleep(3)
# [Phase 1] Sync base project info to the DB (strict matching)
if captured_data["project_list"]:
conn = get_db_connection()
try:
with conn.cursor() as cursor:
for p_info in captured_data["project_list"]:
p_nm = p_info.get("project_nm")
try:
sql = """
INSERT INTO overseas_projects (project_id, project_nm, short_nm, master, continent, country)
VALUES (%s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
project_id = VALUES(project_id), project_nm = VALUES(project_nm),
short_nm = VALUES(short_nm), master = VALUES(master),
continent = VALUES(continent), country = VALUES(country)
"""
cursor.execute(sql, (p_info.get("project_id"), p_nm, p_info.get("short_nm", "").strip(),
p_info.get("master"), p_info.get("large_class"), p_info.get("mid_class")))
except: continue
conn.commit()
msg_queue.put(json.dumps({'type': 'log', 'message': f'Base DB info sync complete ({len(captured_data["project_list"])} projects)'}))
finally: conn.close()
# [Phase 2] Collection loop based on h4 tags
names = await page.locator("h4.list__contents_aria_group_body_list_item_label").all_inner_texts()
project_names = list(dict.fromkeys([n.strip() for n in names if n.strip()]))
count = len(project_names)
for i, project_name in enumerate(project_names):
# Match the current project's unique ID (used when saving)
p_match = next((p for p in captured_data["project_list"] if p.get('project_nm') == project_name or p.get('short_nm', '').strip() == project_name), None)
current_p_id = p_match.get('project_id') if p_match else None
captured_data["tree"] = None
captured_data["_is_root_archive"] = False
msg_queue.put(json.dumps({'type': 'log', 'message': f'[{i+1}/{count}] Starting collection: {project_name}'}))
try:
# 1. Enter the project ([fully restored] coordinate click)
target_el = page.locator(f"h4.list__contents_aria_group_body_list_item_label:has-text('{project_name}')").first
await target_el.scroll_into_view_if_needed()
box = await target_el.bounding_box()
if box: await page.mouse.click(box['x'] + 5, box['y'] + 5)
else: await target_el.click(force=True)
await page.wait_for_selector("text=활동로그", timeout=30000)
await asyncio.sleep(2)
recent_log = "데이터 없음"
file_count = 0
# 2. Activity log ([fully restored] 3 retries + coordinate click + date filter)
modal_opened = False
for _ in range(3):
log_btn = page.get_by_text("활동로그").first
await page.evaluate("(el) => el.click()", await log_btn.element_handle())
btn_box = await log_btn.bounding_box()
if btn_box: await page.mouse.click(btn_box['x'] + 5, btn_box['y'] + 5)
else: await page.evaluate("(el) => el.click()", await log_btn.element_handle())
try:
await page.wait_for_selector("article.archive-modal", timeout=5000)
modal_opened = True
@@ -104,96 +162,91 @@ async def run_crawler_service():
except: await asyncio.sleep(1)
if modal_opened:
# Fill in the date filter
inputs = await page.locator("article.archive-modal input").all()
for inp in inputs:
itype = await inp.get_attribute("type")
iname = (await inp.get_attribute("name") or "").lower()
iclass = (await inp.get_attribute("class") or "").lower()
if itype == "date" or "start" in iname or "start" in iclass:
if (await inp.get_attribute("type")) == "date":
await inp.fill("2020-01-01")
break
captured_data["log"] = None
apply_btn = page.locator("article.archive-modal").get_by_text("적용").first
if await apply_btn.is_visible():
await apply_btn.click()
await asyncio.sleep(4)
if captured_data["log"]:
data = captured_data["log"]
logs = data.get('logData', []) or data.get('result', [])
if logs and isinstance(logs, list) and len(logs) > 0:
top = logs[0]
rd = top.get('log_date') or top.get('date') or ""
u = top.get('user_name') or top.get('user') or ""
c = top.get('activity_content') or top.get('activity') or ""
recent_log = f"{clean_date_string(rd)}, {u}, {c}"
if recent_log == "데이터 없음":
modal = page.locator("article.archive-modal")
try:
d_v = (await modal.locator(".log-body .date .text").first.inner_text()).strip()
u_v = (await modal.locator(".log-body .user .text").first.inner_text()).strip()
a_v = (await modal.locator(".log-body .activity .text").first.inner_text()).strip()
if d_v: recent_log = f"{clean_date_string(d_v)}, {u_v}, {a_v}"
except: pass
yield f"data: {json.dumps({'type': 'log', 'message': f' - [로그 결과] {recent_log}'})}\n\n"
await asyncio.sleep(5) # 렌더링 보장
log_elements = await page.locator("article.archive-modal div[id*='_']").all()
if log_elements:
raw_id = await log_elements[0].get_attribute("id")
recent_log = parse_log_id(raw_id)
msg_queue.put(json.dumps({'type': 'log', 'message': f' - [Parse] Latest log ID extracted: {recent_log}'}))
msg_queue.put(json.dumps({'type': 'log', 'message': f' - [Final result] {recent_log}'}))
await page.keyboard.press("Escape")
else:
yield f"data: {json.dumps({'type': 'log', 'message': ' - [로그] 모달 진입 실패'})}\n\n"
except Exception as le:
yield f"data: {json.dumps({'type': 'log', 'message': f' - [로그] 오류: {str(le)}'})}\n\n"
# --- 3. 구성 수집 (상세 패킷 강제 유도) ---
try:
# [Smart trigger] Have the browser itself fire the API request that returns the detail info
await page.evaluate("""() => {
fetch('/api/getTreeObject?params[storageType]=CLOUD&params[resourcePath]=/');
# 3. Structure collection ([fully restored] base-URL fetch + precise summation)
await page.evaluate("""() => {
const baseUrl = window.location.origin + window.location.pathname.split('/').slice(0, 2).join('/');
fetch(`${baseUrl}/archive/getTreeObject?params[storageType]=CLOUD&params[resourcePath]=/`);
}""")
# Wait for the packet (up to 10 seconds)
for _ in range(20):
if captured_data["tree"]: break
for _ in range(30):
if captured_data["_is_root_archive"]: break
await asyncio.sleep(0.5)
if captured_data["tree"]:
data = captured_data["tree"]
# Sum up the parsed dictionary structure
folders = data.get('currentTreeObject', {}).get('folder', {})
total_files = 0
data_root = captured_data["tree"]
tree = data_root.get('currentTreeObject', data_root) if isinstance(data_root, dict) else {}
total = 0
# Count files at the root
rf = tree.get("file", {})
total += len(rf) if isinstance(rf, (dict, list)) else 0
# Sum filesCount across folders
folders = tree.get("folder", {})
if isinstance(folders, dict):
for folder_name, folder_info in folders.items():
total_files += int(folder_info.get('filesCount', 0))
file_count = total_files
yield f"data: {json.dumps({'type': 'log', 'message': f' - [구성] 상세 합산 성공 ({file_count}개)'})}\n\n"
else:
yield f"data: {json.dumps({'type': 'log', 'message': ' - [구성] 상세 데이터 응답 없음'})}\n\n"
except: pass
for f in folders.values():
c = f.get("filesCount", "0")
total += int(c) if str(c).isdigit() else 0
file_count = total
msg_queue.put(json.dumps({'type': 'log', 'message': f' - [Structure] Data accepted: ...{captured_data.get("_tree_url", "")[-40:]}'}))
msg_queue.put(json.dumps({'type': 'log', 'message': f' - [Structure] Final precise summation succeeded ({file_count} files)'}))
results.append({"projectName": project_name, "recentLog": recent_log, "fileCount": file_count})
await page.goto("https://overseas.projectmastercloud.com/dashboard", wait_until="domcontentloaded")
await page.wait_for_selector("h4.list__contents_aria_group_body_list_item_label", timeout=20000)
except Exception as e:
yield f"data: {json.dumps({'type': 'log', 'message': f' - [{project_name}] 건너뜀 (사유: {str(e)})'})}\n\n"
await page.goto("https://overseas.projectmastercloud.com/dashboard")
# 4. DB 실시간 저장 (ID 기반)
if current_p_id:
conn = get_db_connection()
try:
with conn.cursor() as cursor:
sql = "UPDATE overseas_projects SET recent_log = %s, file_count = %s WHERE project_id = %s"
cursor.execute(sql, (recent_log, file_count, current_p_id))
conn.commit()
msg_queue.put(json.dumps({'type': 'log', 'message': f' - [DB] Update complete (ID: {current_p_id})'}))
finally: conn.close()
# --- 4. Save results to CSV ---
try:
today_str = datetime.now().strftime("%Y.%m.%d")
csv_path = f"crawling_result {today_str}.csv"
with open(csv_path, 'w', newline='', encoding='utf-8-sig') as f:
writer = csv.DictWriter(f, fieldnames=["projectName", "recentLog", "fileCount"])
writer.writeheader()
writer.writerows(results)
yield f"data: {json.dumps({'type': 'log', 'message': f'✅ 모든 데이터가 {csv_path}에 저장되었습니다.'})}\n\n"
except Exception as fe:
yield f"data: {json.dumps({'type': 'log', 'message': f'❌ CSV 저장 실패: {str(fe)}'})}\n\n"
await page.goto("https://overseas.projectmastercloud.com/dashboard", wait_until="domcontentloaded")
except Exception as e:
msg_queue.put(json.dumps({'type': 'log', 'message': f' - [{project_name}] Skipped: {str(e)}'}))
await page.goto("https://overseas.projectmastercloud.com/dashboard")
yield f"data: {json.dumps({'type': 'done', 'data': results})}\n\n"
except Exception as e:
yield f"data: {json.dumps({'type': 'log', 'message': f'치명적 오류: {str(e)}'})}\n\n"
finally:
if browser: await browser.close()
msg_queue.put(json.dumps({'type': 'done', 'data': []}))
except Exception as e:
msg_queue.put(json.dumps({'type': 'log', 'message': f'Fatal error: {str(e)}'}))
finally:
if browser: await browser.close()
msg_queue.put(None)
loop.run_until_complete(run())
loop.close()
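# Note: Playwright runs on a dedicated worker thread with its own event loop
# (on Windows the Proactor policy is applied before the loop is created); the
# async generator below drains the thread-safe queue and re-emits each message
# in Server-Sent Events framing.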
async def run_crawler_service():
user_id = os.getenv("PM_USER_ID")
password = os.getenv("PM_PASSWORD")
msg_queue = queue.Queue()
thread = threading.Thread(target=crawler_thread_worker, args=(msg_queue, user_id, password))
thread.start()
while True:
try:
msg = await asyncio.to_thread(msg_queue.get, timeout=1.0)
if msg is None: break
yield f"data: {msg}\n\n"
except queue.Empty:
if not thread.is_alive(): break
await asyncio.sleep(0.1)
thread.join()
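
This hunk does not show the web layer that serves the generator. Below is a minimal sketch of exposing `run_crawler_service` as an SSE endpoint, assuming FastAPI/Starlette and an assumed module name `crawler_service`; the project's actual framework, module, and route are not visible in this diff:

# Hypothetical wiring; only illustrates how the "data: ...\n\n" chunks could be streamed.
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

from crawler_service import run_crawler_service  # assumed module name

app = FastAPI()

@app.get("/crawl/stream")
async def crawl_stream():
    # Each yielded chunk already carries SSE framing ("data: {json}\n\n").
    return StreamingResponse(run_crawler_service(), media_type="text/event-stream")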