393 lines
16 KiB
Python
393 lines
16 KiB
Python
import csv
|
|
import argparse
|
|
import os
|
|
import time
|
|
import ast
|
|
import requests
|
|
import io
|
|
from datetime import datetime, timezone
|
|
from descope import (
|
|
AssociatedTenant,
|
|
DescopeClient,
|
|
AuthException,
|
|
)
|
|
|
|
def get_descope_client():
|
|
"""
|
|
환경 변수에서 Descope Project ID와 Management Key를 읽어
|
|
DescopeClient를 초기화하고 반환합니다.
|
|
"""
|
|
project_id = os.getenv("DESCOPE_PROJECT_ID")
|
|
management_key = os.getenv("DESCOPE_MANAGEMENT_KEY")
|
|
|
|
if not project_id or not management_key:
|
|
raise ValueError("DESCOPE_PROJECT_ID와 DESCOPE_MANAGEMENT_KEY 환경 변수를 설정해야 합니다.")
|
|
|
|
return DescopeClient(project_id=project_id, management_key=management_key)
|
|
|
|
|
|
def csv_to_dict(csv_file_path):
|
|
"""
|
|
CSV 파일을 읽어 딕셔너리 리스트로 변환합니다.
|
|
"""
|
|
data = []
|
|
with open(csv_file_path, mode='r', encoding='utf-8') as csv_file:
|
|
csv_reader = csv.DictReader(csv_file)
|
|
for row in csv_reader:
|
|
data.append(row)
|
|
return data
|
|
|
|
def csv_from_google_sheet_url(url: str) -> list[dict]:
|
|
"""
|
|
Google Sheets URL에서 CSV 데이터를 가져와 딕셔너리 리스트로 변환합니다.
|
|
URL은 '.../export?format=csv' 형식이어야 합니다.
|
|
"""
|
|
print(f"Google Sheets에서 데이터 다운로드 중: {url}")
|
|
try:
|
|
response = requests.get(url)
|
|
response.raise_for_status() # HTTP 오류가 발생하면 예외를 발생시킵니다.
|
|
|
|
# Google Sheets는 종종 UTF-8-sig로 인코딩된 CSV를 반환합니다 (BOM 포함).
|
|
csv_content = response.content.decode('utf-8-sig')
|
|
|
|
# 문자열을 파일처럼 다루기 위해 io.StringIO를 사용합니다.
|
|
csv_file = io.StringIO(csv_content)
|
|
|
|
csv_reader = csv.DictReader(csv_file)
|
|
data = [row for row in csv_reader]
|
|
print("다운로드 및 파싱 완료.")
|
|
return data
|
|
except requests.exceptions.RequestException as e:
|
|
raise RuntimeError(f"Google Sheets URL에서 데이터를 가져오는 데 실패했습니다: {e}")
|
|
except Exception as e:
|
|
raise RuntimeError(f"CSV 데이터 처리 중 오류 발생: {e}")
|
|
|
|
|
|
def _prepare_user_data(user, expiry_timestamp=None):
|
|
"""
|
|
사용자 데이터로부터 Descope API에 필요한 테넌트, 커스텀 속성 등을 준비합니다.
|
|
미리 정의된 필드 외의 모든 키-값 쌍은 custom_attributes에 포함됩니다.
|
|
"""
|
|
login_id = user.get('login_id')
|
|
|
|
# Descope 사용자 객체의 최상위 레벨 필드 또는 스크립트에서 특별히 처리되는 필드 목록
|
|
known_fields = {
|
|
'login_id', 'email', 'display_name', 'tenants', 'role_name',
|
|
'status', 'new_password', 'custom_attributes_key', 'custom_attributes_value',
|
|
'company', 'egBimLExpiryDate'
|
|
}
|
|
|
|
user_tenants = []
|
|
tenants_str = user.get('tenants')
|
|
if tenants_str:
|
|
try:
|
|
tenant_ids = ast.literal_eval(tenants_str)
|
|
roles = [user.get('role_name')] if user.get('role_name') else []
|
|
|
|
if isinstance(tenant_ids, list):
|
|
for tenant_id in tenant_ids:
|
|
user_tenants.append(AssociatedTenant(tenant_id, roles))
|
|
else:
|
|
print(f"경고: {login_id}의 'tenants' 필드가 리스트 형식이 아닙니다: {tenants_str}")
|
|
except (ValueError, SyntaxError):
|
|
print(f"경고: {login_id}의 'tenants' 필드를 파싱할 수 없습니다: {tenants_str}")
|
|
|
|
custom_attributes = {}
|
|
# user 딕셔너리의 모든 항목을 순회하며 custom_attributes 구성
|
|
for key, value in user.items():
|
|
if key not in known_fields and value:
|
|
custom_attributes[key] = value
|
|
|
|
if user.get('custom_attributes_key') and user.get('custom_attributes_value'):
|
|
custom_attributes[user['custom_attributes_key']] = user['custom_attributes_value']
|
|
|
|
if user.get('company'):
|
|
custom_attributes['company'] = user['company']
|
|
|
|
custom_attributes['completeForm'] = True
|
|
|
|
# egBimLExpiryDate 처리
|
|
final_expiry_date = expiry_timestamp
|
|
if final_expiry_date is None and user.get('egBimLExpiryDate'):
|
|
final_expiry_date = convert_to_timestamp(user.get('egBimLExpiryDate'))
|
|
|
|
if final_expiry_date is not None:
|
|
custom_attributes['egBimLExpiryDate'] = final_expiry_date
|
|
|
|
return user_tenants, custom_attributes
|
|
|
|
|
|
def convert_to_timestamp(date_value):
|
|
"""
|
|
YYYY-MM-DD 형식의 문자열이나 숫자형 타임스탬프를 UTC 초 단위 타임스탬프로 변환합니다.
|
|
"""
|
|
if not date_value:
|
|
return None
|
|
|
|
if isinstance(date_value, int):
|
|
return date_value
|
|
|
|
if isinstance(date_value, str):
|
|
try:
|
|
return int(date_value)
|
|
except ValueError:
|
|
try:
|
|
dt = datetime.strptime(date_value, '%Y-%m-%d')
|
|
dt_utc = dt.replace(tzinfo=timezone.utc)
|
|
return int(dt_utc.timestamp())
|
|
except ValueError:
|
|
print(f"경고: 날짜 형식이 잘못되었습니다. '{date_value}'는 'YYYY-MM-DD' 또는 Unix 타임스탬프여야 합니다.")
|
|
return None
|
|
|
|
print(f"경고: 지원하지 않는 날짜 타입입니다: {type(date_value)}")
|
|
return None
|
|
|
|
|
|
def create_users(users_data, expiry_timestamp=None, dry_run=False):
|
|
"""
|
|
CSV에서 읽어온 사용자 데이터를 기반으로 Descope를 통해 계정을 일괄 생성합니다.
|
|
"""
|
|
print("--- 사용자 계정 일괄 생성을 시작합니다 ---")
|
|
if dry_run:
|
|
print("*** DRY RUN 모드로 실행됩니다. 실제 데이터는 변경되지 않습니다. ***")
|
|
try:
|
|
descope_client = get_descope_client()
|
|
for user in users_data:
|
|
try:
|
|
login_id = user.get('login_id')
|
|
if not login_id:
|
|
print(f"경고: 'login_id'가 없는 행을 건너뜁니다: {user}")
|
|
continue
|
|
|
|
user_tenants, custom_attributes = _prepare_user_data(user, expiry_timestamp)
|
|
status = user.get('status', 'activated')
|
|
|
|
if dry_run:
|
|
print(f"[DRY RUN] 사용자 생성 예정: {login_id}, 속성: {custom_attributes}")
|
|
if status != 'activated':
|
|
print(f"[DRY RUN] 사용자 상태 업데이트 예정: {login_id} -> {status}")
|
|
continue
|
|
|
|
print(f"사용자 생성 시도: {login_id}")
|
|
descope_client.mgmt.user.create(
|
|
login_id=login_id,
|
|
email=user.get('email'),
|
|
display_name=user.get('display_name'),
|
|
user_tenants=user_tenants,
|
|
custom_attributes=custom_attributes,
|
|
)
|
|
print(f"성공: {login_id} 사용자가 생성되었습니다.")
|
|
|
|
if status != 'activated':
|
|
try:
|
|
print(f"사용자 상태 업데이트 시도: {login_id} -> {status}")
|
|
descope_client.mgmt.user.update_status(login_id=login_id, status=status)
|
|
print(f"성공: {login_id} 사용자의 상태가 {status}로 변경되었습니다.")
|
|
except AuthException as e:
|
|
print(f"오류: {login_id} 사용자 상태 업데이트 실패 - {e}")
|
|
|
|
time.sleep(0.5)
|
|
|
|
except AuthException as e:
|
|
print(f"오류: {login_id} 사용자 생성 실패 - {e}")
|
|
except Exception as e:
|
|
print(f"예상치 못한 오류 발생 ({login_id}): {e}")
|
|
|
|
except ValueError as e:
|
|
print(f"오류: {e}")
|
|
print("--- 사용자 계정 일괄 생성이 완료되었습니다 ---")
|
|
|
|
|
|
def update_or_create_users(users_data, expiry_timestamp=None, dry_run=False):
|
|
"""
|
|
사용자 이메일(login_id)을 확인하여 존재하면 정보를 업데이트하고, 존재하지 않으면 새로 생성합니다.
|
|
"""
|
|
print("--- 사용자 정보 업데이트 또는 생성을 시작합니다 ---")
|
|
if dry_run:
|
|
print("*** DRY RUN 모드로 실행됩니다. 실제 데이터는 변경되지 않습니다. ***")
|
|
try:
|
|
descope_client = get_descope_client()
|
|
for user in users_data:
|
|
login_id = user.get('login_id')
|
|
if not login_id:
|
|
print(f"경고: 'login_id'가 없는 행을 건너뜁니다: {user}")
|
|
continue
|
|
|
|
try:
|
|
user_tenants, custom_attributes = _prepare_user_data(user, expiry_timestamp)
|
|
display_name = user.get('display_name')
|
|
|
|
if dry_run:
|
|
print(f"[DRY RUN] 사용자 업데이트 또는 생성 예정: {login_id}, 이름: {display_name}, 속성: {custom_attributes}")
|
|
continue
|
|
|
|
try:
|
|
descope_client.mgmt.user.load(login_id=login_id)
|
|
user_exists = True
|
|
except AuthException as e:
|
|
if "user not found" in str(e).lower() or "user not found" in str(e.args).lower():
|
|
user_exists = False
|
|
else:
|
|
raise e
|
|
|
|
if user_exists:
|
|
print(f"사용자 업데이트 시도: {login_id}")
|
|
descope_client.mgmt.user.update(
|
|
login_id=login_id,
|
|
email=user.get('email'),
|
|
display_name=display_name,
|
|
user_tenants=user_tenants,
|
|
custom_attributes=custom_attributes,
|
|
)
|
|
print(f"성공: {login_id} 사용자 정보가 업데이트되었습니다.")
|
|
else:
|
|
print(f"사용자 생성 시도: {login_id}")
|
|
descope_client.mgmt.user.create(
|
|
login_id=login_id,
|
|
email=user.get('email'),
|
|
display_name=display_name,
|
|
user_tenants=user_tenants,
|
|
custom_attributes=custom_attributes,
|
|
)
|
|
print(f"성공: {login_id} 사용자가 생성되었습니다.")
|
|
|
|
status = user.get('status', 'activated')
|
|
if status != 'activated':
|
|
try:
|
|
print(f"사용자 상태 업데이트 시도: {login_id} -> {status}")
|
|
descope_client.mgmt.user.update_status(login_id=login_id, status=status)
|
|
print(f"성공: {login_id} 사용자의 상태가 {status}로 변경되었습니다.")
|
|
except AuthException as e:
|
|
print(f"오류: {login_id} 사용자 상태 업데이트 실패 - {e}")
|
|
|
|
time.sleep(0.5)
|
|
|
|
except AuthException as e:
|
|
print(f"오류: {login_id} 처리 실패 - {e}")
|
|
except Exception as e:
|
|
print(f"예상치 못한 오류 발생 ({login_id}): {e}")
|
|
|
|
except ValueError as e:
|
|
print(f"오류: {e}")
|
|
print("--- 사용자 정보 업데이트 또는 생성이 완료되었습니다 ---")
|
|
|
|
|
|
def change_passwords(users_data, dry_run=False):
|
|
"""
|
|
CSV에서 읽어온 사용자 데이터를 기반으로 Descope를 통해 비밀번호를 일괄 변경합니다.
|
|
"""
|
|
print("--- 사용자 비밀번호 일괄 변경을 시작합니다 ---")
|
|
if dry_run:
|
|
print("*** DRY RUN 모드로 실행됩니다. 실제 데이터는 변경되지 않습니다. ***")
|
|
try:
|
|
descope_client = get_descope_client()
|
|
for user in users_data:
|
|
try:
|
|
login_id = user.get('login_id')
|
|
new_password = user.get('new_password')
|
|
|
|
if not login_id or not new_password:
|
|
print(f"경고: 'login_id' 또는 'new_password'가 없는 행을 건너뜁니다: {user}")
|
|
continue
|
|
|
|
if dry_run:
|
|
print(f"[DRY RUN] 비밀번호 변경 예정: {login_id}")
|
|
continue
|
|
|
|
print(f"비밀번호 변경 시도: {login_id}")
|
|
descope_client.mgmt.user.set_active_password(
|
|
login_id=login_id,
|
|
password=new_password,
|
|
)
|
|
descope_client.mgmt.user.activate(
|
|
login_id=login_id
|
|
)
|
|
print(f"성공: {login_id} 사용자의 비밀번호가 변경되었습니다.")
|
|
time.sleep(0.5)
|
|
|
|
except AuthException as e:
|
|
print(f"오류: {login_id} 사용자의 비밀번호 변경 실패 - {e}")
|
|
except Exception as e:
|
|
print(f"예상치 못한 오류 발생 ({login_id}): {e}")
|
|
|
|
except ValueError as e:
|
|
print(f"오류: {e}")
|
|
except Exception as e:
|
|
print(f"Descope 클라이언트 초기화 중 오류 발생: {e}")
|
|
|
|
print("--- 사용자 비밀번호 일괄 변경이 완료되었습니다 ---")
|
|
|
|
def test_create_single_user(dry_run=False):
|
|
"""
|
|
단일 사용자 생성을 테스트하는 함수입니다.
|
|
"""
|
|
print("--- 단일 사용자 생성 테스트를 시작합니다 ---")
|
|
test_user_data = [{
|
|
'login_id': 'testuser.gemini@example.com',
|
|
'email': 'testuser.gemini@example.com',
|
|
'display_name': 'Gemini Test User',
|
|
'tenants': '["T31ZmUcwOZbwk0y3YmMxrPCpzpQR"]',
|
|
'company': 'Gemini Test Inc.',
|
|
'egBimLExpiryDate': 1798729200,
|
|
'status': 'activated',
|
|
'is_test_user': True
|
|
}]
|
|
create_users(test_user_data, dry_run=dry_run)
|
|
print("--- 단일 사용자 생성 테스트가 완료되었습니다 ---")
|
|
|
|
|
|
def main():
|
|
"""
|
|
메인 실행 함수
|
|
"""
|
|
parser = argparse.ArgumentParser(description="CSV 파일 또는 Google Sheets URL을 이용해 Descope 사용자를 일괄 처리합니다.")
|
|
parser.add_argument("source", nargs='?', default=None, help="입력으로 사용할 CSV 파일 경로 또는 Google Sheets URL. 'test' action의 경우 필요하지 않습니다.")
|
|
parser.add_argument(
|
|
"--action",
|
|
choices=['create', 'change_password', 'update_or_create', 'test'],
|
|
required=True,
|
|
help="수행할 작업 (create: 생성, change_password: 비밀번호 변경, update_or_create: 업데이트 또는 생성, test: 테스트)"
|
|
)
|
|
parser.add_argument("--expiry-date", help="사용자 라이선스 만료일을 'YYYY-MM-DD' 형식으로 지정합니다. 이 값을 지정하면 CSV 파일의 모든 'egBimLExpiryDate' 값을 덮어씁니다.")
|
|
parser.add_argument("--dry-run", action='store_true', help="실제 API를 호출하지 않고 실행할 작업만 출력합니다.")
|
|
|
|
args = parser.parse_args()
|
|
|
|
if args.action == 'test':
|
|
test_create_single_user(dry_run=args.dry_run)
|
|
return
|
|
|
|
if not args.source:
|
|
print(f"오류: '{args.action}' 작업을 위해서는 CSV 파일 경로 또는 Google Sheets URL이 필요합니다.")
|
|
parser.print_help()
|
|
return
|
|
|
|
try:
|
|
users_data = []
|
|
if args.source.startswith('http'):
|
|
users_data = csv_from_google_sheet_url(args.source)
|
|
else:
|
|
users_data = csv_to_dict(args.source)
|
|
|
|
expiry_timestamp = None
|
|
if args.expiry_date:
|
|
expiry_timestamp = convert_to_timestamp(args.expiry_date)
|
|
if expiry_timestamp is None:
|
|
return
|
|
print(f"전체 만료 날짜가 {args.expiry_date}로 설정되었습니다 (UTC 타임스탬프: {expiry_timestamp}).")
|
|
|
|
if args.action == 'create':
|
|
create_users(users_data, expiry_timestamp, dry_run=args.dry_run)
|
|
elif args.action == 'change_password':
|
|
change_passwords(users_data, dry_run=args.dry_run)
|
|
elif args.action == 'update_or_create':
|
|
update_or_create_users(users_data, expiry_timestamp, dry_run=args.dry_run)
|
|
|
|
except FileNotFoundError:
|
|
print(f"오류: 파일을 찾을 수 없습니다 - {args.source}")
|
|
except Exception as e:
|
|
print(f"오류가 발생했습니다: {e}")
|
|
|
|
if __name__ == "__main__":
|
|
main()
|