import os
import re
import asyncio
import json
import csv
import traceback
from datetime import datetime

from playwright.async_api import async_playwright
from dotenv import load_dotenv

load_dotenv()


def clean_date_string(date_str):
    """Extract a date in YY.MM.DD form from *date_str*.

    Accepts '.', '/' or '-' as separators. Returns "" for falsy input and
    falls back to the first 8 characters when no date-like pattern is found.
    """
    if not date_str:
        return ""
    match = re.search(r'(\d{2})[./-](\d{2})[./-](\d{2})', date_str)
    if match:
        return f"{match.group(1)}.{match.group(2)}.{match.group(3)}"
    return date_str[:8]


def _sse(payload):
    """Format *payload* (a dict) as a single Server-Sent-Events chunk."""
    return f"data: {json.dumps(payload)}\n\n"


async def run_crawler_service():
    """Crawl every project visible on the Project Master dashboard.

    Async generator that yields SSE-formatted progress strings while it:
      1. logs in with credentials from the PM_USER_ID / PM_PASSWORD env vars,
      2. opens each project and reads its most recent activity-log entry
         (API packet first, DOM scrape as fallback),
      3. forces the tree-object API request from inside the page so the
         response interceptor can count files without opening the popup,
      4. writes the collected rows to a dated CSV file.

    Yields:
        str: "data: {...}\\n\\n" chunks; 'type' is 'log' for progress
        messages and 'done' (carrying 'data') for the final result list.
    """
    user_id = os.getenv("PM_USER_ID")
    password = os.getenv("PM_PASSWORD")
    if not user_id or not password:
        yield _sse({'type': 'log', 'message': '오류: .env 파일에 계정 정보가 없습니다.'})
        return

    results = []
    async with async_playwright() as p:
        browser = None
        try:
            yield _sse({'type': 'log', 'message': '브라우저 엔진 (데이터 강제 유도 모드) 가동...'})
            browser = await p.chromium.launch(
                headless=False,
                args=["--no-sandbox", "--disable-dev-shm-usage"],
            )
            context = await browser.new_context(viewport={'width': 1600, 'height': 900})

            # Most-recent responses captured by the network interceptor below.
            captured_data = {"log": None, "tree": None}

            async def global_interceptor(response):
                # Passively sniff API responses: tree-object packets that
                # actually contain folder details, plus any activity-log packet.
                url = response.url
                try:
                    if "getTreeObject" in url:
                        data = await response.json()
                        # 'or {}' guards against the key existing with a None value.
                        if (data.get('currentTreeObject') or {}).get('folder'):
                            captured_data["tree"] = data
                    elif "Log" in url:
                        captured_data["log"] = await response.json()
                except Exception:
                    # Best effort: non-JSON or already-detached responses are ignored.
                    pass

            context.on("response", global_interceptor)
            page = await context.new_page()

            # --- 1. Login ---
            await page.goto(
                "https://overseas.projectmastercloud.com/dashboard",
                wait_until="domcontentloaded",
            )
            if await page.locator("#login-by-id").is_visible(timeout=5000):
                await page.click("#login-by-id")
            await page.fill("#user_id", user_id)
            await page.fill("#user_pw", password)
            await page.click("#login-btn")
            await page.wait_for_selector(
                "h4.list__contents_aria_group_body_list_item_label", timeout=60000
            )
            await asyncio.sleep(5)

            names = await page.locator(
                "h4.list__contents_aria_group_body_list_item_label"
            ).all_inner_texts()
            project_names = [n.strip() for n in names if n.strip()]
            count = len(project_names)
            yield _sse({'type': 'log', 'message': f'총 {count}개의 프로젝트 수집 시작.'})

            for i, project_name in enumerate(project_names):
                captured_data["log"] = None
                captured_data["tree"] = None
                yield _sse({'type': 'log', 'message': f'[{i+1}/{count}] {project_name} 수집 시작'})
                try:
                    # Enter the project detail page: coordinate click when the
                    # element has a bounding box, forced click as fallback.
                    target_el = page.get_by_text(project_name).first
                    await target_el.scroll_into_view_if_needed()
                    box = await target_el.bounding_box()
                    if box:
                        await page.mouse.click(box['x'] + 5, box['y'] + 5)
                    else:
                        await target_el.click(force=True)
                    await page.wait_for_selector("text=활동로그", timeout=30000)
                    await asyncio.sleep(3)

                    recent_log = "데이터 없음"
                    file_count = 0

                    # --- 2. Activity log collection ---
                    try:
                        modal_opened = False
                        for _ in range(3):
                            log_btn = page.get_by_text("활동로그").first
                            # Click through JS to dodge overlay interception.
                            await page.evaluate(
                                "(el) => el.click()", await log_btn.element_handle()
                            )
                            try:
                                await page.wait_for_selector(
                                    "article.archive-modal", timeout=5000
                                )
                                modal_opened = True
                                break
                            except Exception:
                                await asyncio.sleep(1)

                        if modal_opened:
                            # Push the start-date filter far back so the latest
                            # entry is guaranteed to be inside the range.
                            inputs = await page.locator("article.archive-modal input").all()
                            for inp in inputs:
                                itype = await inp.get_attribute("type")
                                iname = (await inp.get_attribute("name") or "").lower()
                                iclass = (await inp.get_attribute("class") or "").lower()
                                if itype == "date" or "start" in iname or "start" in iclass:
                                    await inp.fill("2020-01-01")
                                    break

                            # Clear the capture so we only accept the refreshed packet.
                            captured_data["log"] = None
                            apply_btn = page.locator("article.archive-modal").get_by_text("적용").first
                            if await apply_btn.is_visible():
                                await apply_btn.click()
                                await asyncio.sleep(4)

                            # Prefer the intercepted API payload...
                            if captured_data["log"]:
                                data = captured_data["log"]
                                logs = data.get('logData', []) or data.get('result', [])
                                if logs and isinstance(logs, list) and len(logs) > 0:
                                    top = logs[0]
                                    rd = top.get('log_date') or top.get('date') or ""
                                    u = top.get('user_name') or top.get('user') or ""
                                    c = top.get('activity_content') or top.get('activity') or ""
                                    recent_log = f"{clean_date_string(rd)}, {u}, {c}"

                            # ...then fall back to scraping the modal's DOM.
                            if recent_log == "데이터 없음":
                                modal = page.locator("article.archive-modal")
                                try:
                                    d_v = (await modal.locator(".log-body .date .text").first.inner_text()).strip()
                                    u_v = (await modal.locator(".log-body .user .text").first.inner_text()).strip()
                                    a_v = (await modal.locator(".log-body .activity .text").first.inner_text()).strip()
                                    if d_v:
                                        recent_log = f"{clean_date_string(d_v)}, {u_v}, {a_v}"
                                except Exception:
                                    pass

                            yield _sse({'type': 'log', 'message': f' - [로그 결과] {recent_log}'})
                            await page.keyboard.press("Escape")
                        else:
                            yield _sse({'type': 'log', 'message': ' - [로그] 모달 진입 실패'})
                    except Exception as le:
                        yield _sse({'type': 'log', 'message': f' - [로그] 오류: {str(le)}'})

                    # --- 3. File count via forced tree-object API call ---
                    try:
                        # Fire the detail request from inside the page so the
                        # interceptor captures the response (no popup needed).
                        # BUGFIX: the query-string separator must be a literal
                        # '&'; it had been mangled into the '¶' character
                        # ('&para' entity collapse), dropping resourcePath.
                        await page.evaluate("""() => {
                            fetch('/api/getTreeObject?params[storageType]=CLOUD&params[resourcePath]=/');
                        }""")
                        # Wait up to 10 s (20 x 0.5 s) for the capture.
                        for _ in range(20):
                            if captured_data["tree"]:
                                break
                            await asyncio.sleep(0.5)

                        if captured_data["tree"]:
                            data = captured_data["tree"]
                            # Sum per-folder counts from the analyzed dict layout.
                            folders = data.get('currentTreeObject', {}).get('folder', {})
                            total_files = 0
                            if isinstance(folders, dict):
                                for folder_info in folders.values():
                                    total_files += int(folder_info.get('filesCount', 0))
                            file_count = total_files
                            yield _sse({'type': 'log', 'message': f' - [구성] 상세 합산 성공 ({file_count}개)'})
                        else:
                            yield _sse({'type': 'log', 'message': ' - [구성] 상세 데이터 응답 없음'})
                    except Exception:
                        pass

                    results.append({
                        "projectName": project_name,
                        "recentLog": recent_log,
                        "fileCount": file_count,
                    })
                    await page.goto(
                        "https://overseas.projectmastercloud.com/dashboard",
                        wait_until="domcontentloaded",
                    )
                    await page.wait_for_selector(
                        "h4.list__contents_aria_group_body_list_item_label", timeout=20000
                    )
                except Exception as e:
                    yield _sse({'type': 'log', 'message': f' - [{project_name}] 건너뜀 (사유: {str(e)})'})
                    await page.goto("https://overseas.projectmastercloud.com/dashboard")

            # --- 4. Save results as CSV ---
            try:
                today_str = datetime.now().strftime("%Y.%m.%d")
                csv_path = f"crawling_result {today_str}.csv"
                # utf-8-sig (BOM) so Excel renders the Korean text correctly.
                with open(csv_path, 'w', newline='', encoding='utf-8-sig') as f:
                    writer = csv.DictWriter(f, fieldnames=["projectName", "recentLog", "fileCount"])
                    writer.writeheader()
                    writer.writerows(results)
                yield _sse({'type': 'log', 'message': f'✅ 모든 데이터가 {csv_path}에 저장되었습니다.'})
            except Exception as fe:
                yield _sse({'type': 'log', 'message': f'❌ CSV 저장 실패: {str(fe)}'})

            yield _sse({'type': 'done', 'data': results})
        except Exception as e:
            yield _sse({'type': 'log', 'message': f'치명적 오류: {str(e)}'})
        finally:
            if browser:
                await browser.close()