import csv import argparse import os import time import ast import requests import io from datetime import datetime, timezone from descope import ( AssociatedTenant, DescopeClient, AuthException, ) def get_descope_client(): """ 환경 변수에서 Descope Project ID와 Management Key를 읽어 DescopeClient를 초기화하고 반환합니다. """ project_id = os.getenv("DESCOPE_PROJECT_ID") management_key = os.getenv("DESCOPE_MANAGEMENT_KEY") if not project_id or not management_key: raise ValueError("DESCOPE_PROJECT_ID와 DESCOPE_MANAGEMENT_KEY 환경 변수를 설정해야 합니다.") return DescopeClient(project_id=project_id, management_key=management_key) def csv_to_dict(csv_file_path): """ CSV 파일을 읽어 딕셔너리 리스트로 변환합니다. """ data = [] with open(csv_file_path, mode='r', encoding='utf-8') as csv_file: csv_reader = csv.DictReader(csv_file) for row in csv_reader: data.append(row) return data def csv_from_google_sheet_url(url: str) -> list[dict]: """ Google Sheets URL에서 CSV 데이터를 가져와 딕셔너리 리스트로 변환합니다. URL은 '.../export?format=csv' 형식이어야 합니다. """ print(f"Google Sheets에서 데이터 다운로드 중: {url}") try: response = requests.get(url) response.raise_for_status() # HTTP 오류가 발생하면 예외를 발생시킵니다. # Google Sheets는 종종 UTF-8-sig로 인코딩된 CSV를 반환합니다 (BOM 포함). csv_content = response.content.decode('utf-8-sig') # 문자열을 파일처럼 다루기 위해 io.StringIO를 사용합니다. csv_file = io.StringIO(csv_content) csv_reader = csv.DictReader(csv_file) data = [row for row in csv_reader] print("다운로드 및 파싱 완료.") return data except requests.exceptions.RequestException as e: raise RuntimeError(f"Google Sheets URL에서 데이터를 가져오는 데 실패했습니다: {e}") except Exception as e: raise RuntimeError(f"CSV 데이터 처리 중 오류 발생: {e}") def create_users(users_data, expiry_timestamp=None): """ CSV에서 읽어온 사용자 데이터를 기반으로 Descope를 통해 계정을 일괄 생성합니다. 각 요청 사이에 0.5초의 지연을 줍니다. """ print("--- 사용자 계정 일괄 생성을 시작합니다 ---") try: descope_client = get_descope_client() for user in users_data: try: login_id = user.get('login_id') if not login_id: print(f"경고: 'login_id'가 없는 행을 건너뜁니다: {user}") continue email = user.get('email') display_name = user.get('display_name') user_tenants = [] tenants_str = user.get('tenants') if tenants_str: try: # ast.literal_eval을 사용하여 안전하게 문자열을 Python 객체로 변환 tenant_ids = ast.literal_eval(tenants_str) roles = [user.get('role_name')] if user.get('role_name') else [] if isinstance(tenant_ids, list): for tenant_id in tenant_ids: user_tenants.append(AssociatedTenant(tenant_id, roles)) else: print(f"경고: {login_id}의 'tenants' 필드가 리스트 형식이 아닙니다: {tenants_str}") except (ValueError, SyntaxError): print(f"경고: {login_id}의 'tenants' 필드를 파싱할 수 없습니다: {tenants_str}") custom_attributes = {} if user.get('custom_attributes_key') and user.get('custom_attributes_value'): custom_attributes[user['custom_attributes_key']] = user['custom_attributes_value'] if user.get('company'): custom_attributes['company'] = user['company'] custom_attributes['completeForm'] = True # egBimLExpiryDate 처리: 인자로 받은 타임스탬프가 있으면 덮어쓰고, 없으면 CSV 값 사용 if expiry_timestamp is not None: custom_attributes['egBimLExpiryDate'] = expiry_timestamp elif user.get('egBimLExpiryDate'): custom_attributes['egBimLExpiryDate'] = user['egBimLExpiryDate'] status = user.get('status', 'activated') # 기본값 'activated' print(f"사용자 생성 시도: {login_id}") descope_client.mgmt.user.create( login_id=login_id, email=email, display_name=display_name, user_tenants=user_tenants, custom_attributes=custom_attributes, ) print(f"성공: {login_id} 사용자가 생성되었습니다.") # 상태 업데이트가 필요한 경우 (기본값과 다른 경우) if status != 'activated': try: print(f"사용자 상태 업데이트 시도: {login_id} -> {status}") descope_client.mgmt.user.update_status(login_id=login_id, status=status) print(f"성공: {login_id} 사용자의 상태가 {status}로 변경되었습니다.") except AuthException as e: print(f"오류: {login_id} 사용자 상태 업데이트 실패 - {e}") time.sleep(0.5) except AuthException as e: print(f"오류: {login_id} 사용자 생성 실패 - {e}") except Exception as e: print(f"예상치 못한 오류 발생 ({login_id}): {e}") except ValueError as e: print(f"오류: {e}") except Exception as e: print(f"Descope 클라이언트 초기화 중 오류 발생: {e}") print("--- 사용자 계정 일괄 생성이 완료되었습니다 ---") def change_passwords(users_data): """ CSV에서 읽어온 사용자 데이터를 기반으로 Descope를 통해 비밀번호를 일괄 변경합니다. """ print("--- 사용자 비밀번호 일괄 변경을 시작합니다 ---") try: descope_client = get_descope_client() for user in users_data: try: login_id = user.get('login_id') new_password = user.get('new_password') if not login_id or not new_password: print(f"경고: 'login_id' 또는 'new_password'가 없는 행을 건너뜁니다: {user}") continue print(f"비밀번호 변경 시도: {login_id}") descope_client.mgmt.user.set_active_password( login_id=login_id, password=new_password, ) descope_client.mgmt.user.activate( login_id=login_id ) print(f"성공: {login_id} 사용자의 비밀번호가 변경되었습니다.") time.sleep(0.5) except AuthException as e: print(f"오류: {login_id} 사용자의 비밀번호 변경 실패 - {e}") except Exception as e: print(f"예상치 못한 오류 발생 ({login_id}): {e}") except ValueError as e: print(f"오류: {e}") except Exception as e: print(f"Descope 클라이언트 초기화 중 오류 발생: {e}") print("--- 사용자 비밀번호 일괄 변경이 완료되었습니다 ---") def test_create_single_user(): """ 단일 사용자 생성을 테스트하는 함수입니다. """ print("--- 단일 사용자 생성 테스트를 시작합니다 ---") test_user_data = [{ 'login_id': 'testuser.gemini@example.com', 'email': 'testuser.gemini@example.com', 'display_name': 'Gemini Test User', 'tenants': '["T31ZmUcwOZbwk0y3YmMxrPCpzpQR"]', # 실제 테스트용 Tenant ID로 변경 필요 'company': 'Gemini Test Inc.', 'egBimLExpiryDate': 1798729200, 'status': 'activated', 'verifiedEmail': True, 'is_test_user': True }] create_users(test_user_data) print("--- 단일 사용자 생성 테스트가 완료되었습니다 ---") def main(): """ 메인 실행 함수 """ parser = argparse.ArgumentParser(description="CSV 파일 또는 Google Sheets URL을 이용해 Descope 사용자를 일괄 생성하거나 비밀번호를 변경합니다.") parser.add_argument("source", nargs='?', default=None, help="입력으로 사용할 CSV 파일 경로 또는 Google Sheets URL. 'test' action의 경우 필요하지 않습니다.") parser.add_argument("--action", choices=['create', 'change_password', 'test'], required=True, help="수행할 작업 (create: 사용자 생성, change_password: 비밀번호 변경, test: 단일 사용자 생성 테스트)") parser.add_argument("--expiry-date", help="사용자 라이선스 만료일을 'YYYY-MM-DD' 형식으로 지정합니다. 이 값을 지정하면 CSV 파일의 모든 'egBimLExpiryDate' 값을 덮어씁니다.") args = parser.parse_args() if args.action == 'test': test_create_single_user() return if not args.source: print("오류: 'create' 또는 'change_password' 작업을 위해서는 CSV 파일 경로 또는 Google Sheets URL이 필요합니다.") parser.print_help() return try: users_data = [] if args.source.startswith('http'): users_data = csv_from_google_sheet_url(args.source) else: users_data = csv_to_dict(args.source) expiry_timestamp = None if args.expiry_date: try: # 'YYYY-MM-DD' 형식의 문자열을 datetime 객체로 변환 dt = datetime.strptime(args.expiry_date, '%Y-%m-%d') # UTC 타임존을 명시적으로 설정 dt_utc = dt.replace(tzinfo=timezone.utc) # UTC 타임스탬프(초)로 변환 expiry_timestamp = int(dt_utc.timestamp()) print(f"만료 날짜가 {args.expiry_date}로 설정되었습니다 (UTC 타임스탬프: {expiry_timestamp}).") except ValueError: print(f"오류: 날짜 형식이 잘못되었습니다. 'YYYY-MM-DD' 형식을 사용해주세요. (입력값: {args.expiry_date})") return if args.action == 'create': create_users(users_data, expiry_timestamp) elif args.action == 'change_password': change_passwords(users_data) except FileNotFoundError: print(f"오류: 파일을 찾을 수 없습니다 - {args.source}") except Exception as e: print(f"오류가 발생했습니다: {e}") if __name__ == "__main__": main()