"""Playwright-based crawler for Project Master Cloud.

Runs a headless Chromium session in a worker thread, logs in with credentials
from the environment, intercepts the site's JSON API responses, and upserts
project master / activity-log / file-count data into MySQL.  Progress is
streamed back to the caller as SSE-formatted messages via a queue.
"""

import asyncio
import json
import os
import queue
import re
import sys
import threading
import traceback
from datetime import datetime

import pymysql
from dotenv import load_dotenv
from playwright.async_api import async_playwright

from sql_queries import CrawlerQueries

load_dotenv(override=True)

# Global event used to signal the crawler thread to stop gracefully.
crawl_stop_event = threading.Event()


def get_db_connection():
    """Return a new MySQL connection configured from environment variables.

    Uses DictCursor so rows come back as dicts.  Caller owns the connection
    and must close it (or use it as a context manager).
    """
    # SECURITY NOTE(review): the hardcoded password fallback should be removed
    # once every deployment defines DB_PASSWORD; kept here only so existing
    # local setups keep working.
    return pymysql.connect(
        host=os.getenv('DB_HOST', 'localhost'),
        user=os.getenv('DB_USER', 'root'),
        password=os.getenv('DB_PASSWORD', '45278434'),
        database=os.getenv('DB_NAME', 'PM_proto'),
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor,
    )


def clean_date_string(date_str):
    """Normalize a 'YY.MM.DD'-like fragment to 'YYYY.MM.DD'.

    Accepts '.', '/', or '-' as separators.  If no YY?MM?DD pattern is found,
    falls back to the first 10 characters with '-' replaced by '.'.
    Returns "" for falsy input.
    """
    if not date_str:
        return ""
    match = re.search(r'(\d{2})[./-](\d{2})[./-](\d{2})', date_str)
    if match:
        # Two-digit years are assumed to be 20xx.
        return f"20{match.group(1)}.{match.group(2)}.{match.group(3)}"
    return date_str[:10].replace("-", ".")


def parse_log_id(log_id):
    """Turn an underscore-delimited activity-log element id into 'DATE, ACTIVITY'.

    Expected shape is at least 4 '_'-separated parts, where parts[1] holds the
    date and parts[3] the activity label (parenthesized suffixes stripped).
    Any malformed id is returned unchanged (best-effort parsing).
    """
    if not log_id or "_" not in log_id:
        return log_id
    try:
        parts = log_id.split('_')
        if len(parts) >= 4:
            date_part = clean_date_string(parts[1])
            activity = parts[3].strip()
            activity = re.sub(r'\(.*?\)', '', activity).strip()
            return f"{date_part}, {activity}"
    except Exception:
        # Best-effort: fall through and return the raw id.  (Was a bare
        # `except:`; narrowed so KeyboardInterrupt is not swallowed.)
        pass
    return log_id


def crawler_thread_worker(msg_queue, user_id, password):
    """Thread entry point: drive the full crawl on a private asyncio loop.

    Puts JSON progress messages onto *msg_queue* and a final ``None`` sentinel
    when finished.  Honors ``crawl_stop_event`` between projects.
    """
    crawl_stop_event.clear()
    if sys.platform == 'win32':
        # Playwright subprocesses need the Proactor loop on Windows.
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    async def run():
        async with async_playwright() as p:
            browser = None
            try:
                msg_queue.put(json.dumps({'type': 'log', 'message': '브라우저 엔진 가동 (전 기능 복구 모드)...'}))
                browser = await p.chromium.launch(headless=True, args=[
                    "--no-sandbox",
                    "--disable-dev-shm-usage",
                    "--disable-blink-features=AutomationControlled",
                ])
                context = await browser.new_context(
                    viewport={'width': 1600, 'height': 900},
                    user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
                )

                # Shared mailbox filled by the response interceptor below.
                captured_data = {
                    "tree": None,
                    "_is_root_archive": False,
                    "project_list": [],
                    "last_project_data": None,
                }

                async def global_interceptor(response):
                    """Capture JSON payloads of the API calls we care about."""
                    url = response.url
                    try:
                        if "getAllList" in url:
                            data = await response.json()
                            captured_data["project_list"] = data.get("data", [])
                        elif "getTreeObject" in url:
                            # Only the archive ROOT tree is interesting here.
                            is_root = False
                            if "params[resourcePath]=" in url:
                                path_val = url.split("params[resourcePath]=")[1].split("&")[0]
                                if path_val in ["%2F", "/"]:
                                    is_root = True
                            if is_root:
                                captured_data["tree"] = await response.json()
                                captured_data["_is_root_archive"] = True
                        elif "getData" in url and "overview" in url:
                            captured_data["last_project_data"] = await response.json()
                    except Exception:
                        # Non-JSON or aborted responses are expected noise.
                        pass

                context.on("response", global_interceptor)
                page = await context.new_page()
                await page.goto("https://overseas.projectmastercloud.com/dashboard",
                                wait_until="domcontentloaded")

                # Log in (skipped if the session is already authenticated).
                if await page.locator("#login-by-id").is_visible(timeout=10000):
                    await page.click("#login-by-id")
                    await page.fill("#user_id", user_id)
                    await page.fill("#user_pw", password)
                    await page.click("#login-btn")
                await page.wait_for_selector("h4.list__contents_aria_group_body_list_item_label", timeout=60000)
                await asyncio.sleep(3)

                # [Phase 1] Sync project master records captured by the interceptor.
                if captured_data["project_list"]:
                    conn = get_db_connection()
                    try:
                        with conn.cursor() as cursor:
                            for p_info in captured_data["project_list"]:
                                cursor.execute(CrawlerQueries.UPSERT_MASTER, (
                                    p_info.get("project_id"),
                                    p_info.get("project_nm"),
                                    p_info.get("short_nm", "").strip(),
                                    p_info.get("master"),
                                    p_info.get("large_class"),
                                    p_info.get("mid_class"),
                                ))
                        conn.commit()
                        msg_queue.put(json.dumps({'type': 'log', 'message': 'DB 마스터 정보 동기화 완료.'}))
                    finally:
                        conn.close()

                # [Phase 2] Per-project collection loop.
                names = await page.locator("h4.list__contents_aria_group_body_list_item_label").all_inner_texts()
                # De-duplicate while preserving on-screen order.
                project_names = list(dict.fromkeys([n.strip() for n in names if n.strip()]))
                count = len(project_names)

                for i, project_name in enumerate(project_names):
                    if crawl_stop_event.is_set():
                        msg_queue.put(json.dumps({'type': 'log', 'message': '>>> 중단 신호 감지: 종료합니다.'}))
                        break
                    msg_queue.put(json.dumps({'type': 'log', 'message': f'[{i+1}/{count}] {project_name} 수집 시작'}))

                    # Match the displayed name against either full or short name.
                    p_match = next(
                        (p for p in captured_data["project_list"]
                         if p.get('project_nm') == project_name
                         or p.get('short_nm', '').strip() == project_name),
                        None)
                    current_p_id = p_match.get('project_id') if p_match else None
                    captured_data["tree"] = None
                    captured_data["_is_root_archive"] = False

                    try:
                        # 1. Enter the project (coordinate click; the label itself
                        #    is not always clickable, so aim inside its box).
                        target_el = page.locator(
                            f"h4.list__contents_aria_group_body_list_item_label:has-text('{project_name}')").first
                        await target_el.scroll_into_view_if_needed()
                        box = await target_el.bounding_box()
                        if box:
                            await page.mouse.click(box['x'] + 5, box['y'] + 5)
                        else:
                            await target_el.click(force=True)
                        await page.wait_for_selector("text=활동로그", timeout=30000)

                        # [Department info] Wait (max ~5s) for the overview getData
                        # response, then persist the department for this project.
                        for _ in range(10):
                            if captured_data.get("last_project_data"):
                                break
                            await asyncio.sleep(0.5)
                        last_data = captured_data.get("last_project_data")
                        if last_data:
                            # Payload shape varies: sometimes a list wrapper,
                            # sometimes {"data": [...]}. Unwrap defensively.
                            if isinstance(last_data, list) and len(last_data) > 0:
                                last_data = last_data[0]
                            if isinstance(last_data, dict):
                                proj_data = last_data.get("data", {})
                                if isinstance(proj_data, list) and len(proj_data) > 0:
                                    proj_data = proj_data[0]
                                if isinstance(proj_data, dict):
                                    dept = proj_data.get("department")
                                    p_id = proj_data.get("project_id")
                                    if dept and p_id:
                                        with get_db_connection() as conn:
                                            with conn.cursor() as cursor:
                                                cursor.execute(CrawlerQueries.UPDATE_DEPARTMENT, (dept, p_id))
                                            conn.commit()
                        captured_data["last_project_data"] = None  # reset for next project
                        await asyncio.sleep(2)

                        recent_log = "데이터 없음"
                        file_count = 0

                        # 2. Activity log (with the 2020-01-01 date filter applied).
                        modal_opened = False
                        for _ in range(3):
                            await page.get_by_text("활동로그").first.click()
                            try:
                                await page.wait_for_selector("article.archive-modal", timeout=5000)
                                modal_opened = True
                                break
                            except Exception:
                                await asyncio.sleep(1)
                        if modal_opened:
                            inputs = await page.locator("article.archive-modal input").all()
                            for inp in inputs:
                                if (await inp.get_attribute("type")) == "date":
                                    await inp.fill("2020-01-01")
                                    break
                            apply_btn = page.locator("article.archive-modal").get_by_text("적용").first
                            if await apply_btn.is_visible():
                                await apply_btn.click()
                                await asyncio.sleep(5)
                            log_elements = await page.locator("article.archive-modal div[id*='_']").all()
                            if log_elements:
                                # First row is the most recent entry.
                                recent_log = parse_log_id(await log_elements[0].get_attribute("id"))
                            await page.keyboard.press("Escape")

                        # 3. Archive tree via direct API fetch (no popup).
                        # BUG FIX: the query string previously contained the
                        # mojibake '¶ms[resourcePath]' ('&para' turned into the
                        # pilcrow entity), so the interceptor never saw
                        # 'params[resourcePath]=' and file counts stayed 0.
                        await page.evaluate("""() => {
                            const baseUrl = window.location.origin + window.location.pathname.split('/').slice(0, 2).join('/');
                            fetch(`${baseUrl}/archive/getTreeObject?params[storageType]=CLOUD&params[resourcePath]=/`);
                        }""")
                        for _ in range(30):
                            if captured_data["_is_root_archive"]:
                                break
                            await asyncio.sleep(0.5)
                        if captured_data["tree"]:
                            tree_data = captured_data["tree"]
                            if isinstance(tree_data, list) and len(tree_data) > 0:
                                tree_data = tree_data[0]
                            if isinstance(tree_data, dict):
                                tree = tree_data.get('currentTreeObject', tree_data)
                                if isinstance(tree, dict):
                                    # Root-level files plus per-folder counts.
                                    total = len(tree.get("file", {}))
                                    folders = tree.get("folder", {})
                                    if isinstance(folders, dict):
                                        for f in folders.values():
                                            total += int(f.get("filesCount", 0))
                                    file_count = total

                        # 4. Persist this project's snapshot immediately.
                        if current_p_id:
                            with get_db_connection() as conn:
                                with conn.cursor() as cursor:
                                    cursor.execute(CrawlerQueries.UPSERT_HISTORY,
                                                   (current_p_id, recent_log, file_count))
                                conn.commit()
                        msg_queue.put(json.dumps({'type': 'log', 'message': f' - [성공] 로그: {recent_log[:20]}... / 파일: {file_count}개'}))
                        await page.goto("https://overseas.projectmastercloud.com/dashboard",
                                        wait_until="domcontentloaded")
                    except Exception as e:
                        # One failed project must not abort the whole crawl.
                        msg_queue.put(json.dumps({'type': 'log', 'message': f' - {project_name} 실패: {str(e)}'}))
                        await page.goto("https://overseas.projectmastercloud.com/dashboard")

                msg_queue.put(json.dumps({'type': 'done', 'data': []}))
            except Exception as e:
                msg_queue.put(json.dumps({'type': 'log', 'message': f'치명적 오류: {str(e)}'}))
            finally:
                if browser:
                    await browser.close()
                # Sentinel: tells the consumer the stream is finished.
                msg_queue.put(None)

    loop.run_until_complete(run())
    loop.close()


async def run_crawler_service():
    """Async generator yielding SSE-formatted progress lines from the crawler.

    Spawns the crawler thread with credentials from PM_USER_ID / PM_PASSWORD
    and relays its queue messages as ``data: ...`` events until the ``None``
    sentinel arrives or the thread dies.
    """
    msg_queue = queue.Queue()
    thread = threading.Thread(
        target=crawler_thread_worker,
        args=(msg_queue, os.getenv("PM_USER_ID"), os.getenv("PM_PASSWORD")))
    thread.start()
    while True:
        try:
            # Blocking get runs in a helper thread so the event loop stays free.
            msg = await asyncio.to_thread(msg_queue.get, timeout=1.0)
            if msg is None:
                break
            yield f"data: {msg}\n\n"
        except queue.Empty:
            if not thread.is_alive():
                break
            await asyncio.sleep(0.1)
    thread.join()