로컬 개발 구조변경

This commit is contained in:
Lectom C Han
2025-10-13 14:57:14 +09:00
parent 771a20225f
commit 48c95b2c24
2 changed files with 167 additions and 56 deletions

View File

@@ -13,6 +13,7 @@ on:
type: choice
options:
- create
- update_or_create
- change_password
expiry_date:
description: "라이선스 만료일 (YYYY-MM-DD). 입력 시 모든 사용자의 만료일을 덮어씁니다."

220
main.py
View File

@@ -63,10 +63,80 @@ def csv_from_google_sheet_url(url: str) -> list[dict]:
raise RuntimeError(f"CSV 데이터 처리 중 오류 발생: {e}")
def _prepare_user_data(user, expiry_timestamp=None):
"""
사용자 데이터로부터 Descope API에 필요한 테넌트, 커스텀 속성 등을 준비합니다.
"""
login_id = user.get('login_id')
user_tenants = []
tenants_str = user.get('tenants')
if tenants_str:
try:
tenant_ids = ast.literal_eval(tenants_str)
roles = [user.get('role_name')] if user.get('role_name') else []
if isinstance(tenant_ids, list):
for tenant_id in tenant_ids:
user_tenants.append(AssociatedTenant(tenant_id, roles))
else:
print(f"경고: {login_id}'tenants' 필드가 리스트 형식이 아닙니다: {tenants_str}")
except (ValueError, SyntaxError):
print(f"경고: {login_id}'tenants' 필드를 파싱할 수 없습니다: {tenants_str}")
custom_attributes = {}
if user.get('custom_attributes_key') and user.get('custom_attributes_value'):
custom_attributes[user['custom_attributes_key']] = user['custom_attributes_value']
if user.get('company'):
custom_attributes['company'] = user['company']
custom_attributes['completeForm'] = True
# egBimLExpiryDate 처리
final_expiry_date = expiry_timestamp # CLI 인자가 우선순위가 가장 높음
if final_expiry_date is None and user.get('egBimLExpiryDate'):
# CLI 인자가 없고 CSV에 값이 있을 경우, 해당 값을 변환하여 사용
final_expiry_date = convert_to_timestamp(user.get('egBimLExpiryDate'))
if final_expiry_date is not None:
custom_attributes['egBimLExpiryDate'] = final_expiry_date
return user_tenants, custom_attributes
def convert_to_timestamp(date_value):
"""
YYYY-MM-DD 형식의 문자열이나 숫자형 타임스탬프를 UTC 초 단위 타임스탬프로 변환합니다.
"""
if not date_value:
return None
# Already an integer timestamp
if isinstance(date_value, int):
return date_value
if isinstance(date_value, str):
# Check if it's a string representation of an integer
try:
return int(date_value)
except ValueError:
# If not, try to parse it as a YYYY-MM-DD date string
try:
dt = datetime.strptime(date_value, '%Y-%m-%d')
dt_utc = dt.replace(tzinfo=timezone.utc)
return int(dt_utc.timestamp())
except ValueError:
print(f"경고: 날짜 형식이 잘못되었습니다. '{date_value}''YYYY-MM-DD' 또는 Unix 타임스탬프여야 합니다.")
return None
print(f"경고: 지원하지 않는 날짜 타입입니다: {type(date_value)}")
return None
def create_users(users_data, expiry_timestamp=None):
"""
CSV에서 읽어온 사용자 데이터를 기반으로 Descope를 통해 계정을 일괄 생성합니다.
각 요청 사이에 0.5초의 지연을 줍니다.
"""
print("--- 사용자 계정 일괄 생성을 시작합니다 ---")
try:
@@ -78,54 +148,20 @@ def create_users(users_data, expiry_timestamp=None):
print(f"경고: 'login_id'가 없는 행을 건너뜁니다: {user}")
continue
email = user.get('email')
display_name = user.get('display_name')
user_tenants, custom_attributes = _prepare_user_data(user, expiry_timestamp)
user_tenants = []
tenants_str = user.get('tenants')
if tenants_str:
try:
# ast.literal_eval을 사용하여 안전하게 문자열을 Python 객체로 변환
tenant_ids = ast.literal_eval(tenants_str)
roles = [user.get('role_name')] if user.get('role_name') else []
if isinstance(tenant_ids, list):
for tenant_id in tenant_ids:
user_tenants.append(AssociatedTenant(tenant_id, roles))
else:
print(f"경고: {login_id}'tenants' 필드가 리스트 형식이 아닙니다: {tenants_str}")
except (ValueError, SyntaxError):
print(f"경고: {login_id}'tenants' 필드를 파싱할 수 없습니다: {tenants_str}")
custom_attributes = {}
if user.get('custom_attributes_key') and user.get('custom_attributes_value'):
custom_attributes[user['custom_attributes_key']] = user['custom_attributes_value']
if user.get('company'):
custom_attributes['company'] = user['company']
custom_attributes['completeForm'] = True
# egBimLExpiryDate 처리: 인자로 받은 타임스탬프가 있으면 덮어쓰고, 없으면 CSV 값 사용
if expiry_timestamp is not None:
custom_attributes['egBimLExpiryDate'] = expiry_timestamp
elif user.get('egBimLExpiryDate'):
custom_attributes['egBimLExpiryDate'] = user['egBimLExpiryDate']
status = user.get('status', 'activated') # 기본값 'activated'
status = user.get('status', 'activated')
print(f"사용자 생성 시도: {login_id}")
descope_client.mgmt.user.create(
login_id=login_id,
email=email,
display_name=display_name,
email=user.get('email'),
display_name=user.get('display_name'),
user_tenants=user_tenants,
custom_attributes=custom_attributes,
)
print(f"성공: {login_id} 사용자가 생성되었습니다.")
# 상태 업데이트가 필요한 경우 (기본값과 다른 경우)
if status != 'activated':
try:
print(f"사용자 상태 업데이트 시도: {login_id} -> {status}")
@@ -143,12 +179,85 @@ def create_users(users_data, expiry_timestamp=None):
except ValueError as e:
print(f"오류: {e}")
except Exception as e:
print(f"Descope 클라이언트 초기화 중 오류 발생: {e}")
print("--- 사용자 계정 일괄 생성이 완료되었습니다 ---")
def update_or_create_users(users_data, expiry_timestamp=None):
"""
사용자 이메일(login_id)을 확인하여 존재하면 정보를 업데이트하고, 존재하지 않으면 새로 생성합니다.
"""
print("--- 사용자 정보 업데이트 또는 생성을 시작합니다 ---")
try:
descope_client = get_descope_client()
for user in users_data:
login_id = user.get('login_id')
if not login_id:
print(f"경고: 'login_id'가 없는 행을 건너뜁니다: {user}")
continue
try:
user_tenants, custom_attributes = _prepare_user_data(user, expiry_timestamp)
display_name = user.get('display_name')
# 사용자 존재 여부 확인
try:
descope_client.mgmt.user.load(login_id=login_id)
user_exists = True
except AuthException as e:
# Descope SDK는 사용자를 찾지 못하면 AuthException을 발생시킵니다.
# 오류 메시지에 "not found"가 포함되어 있는지 확인하여 사용자가 없는 경우를 특정합니다.
if "user not found" in str(e).lower() or "user not found" in str(e.args).lower():
user_exists = False
else:
raise e # 다른 종류의 AuthException은 다시 발생시킴
if user_exists:
# 사용자 업데이트
print(f"사용자 업데이트 시도: {login_id}")
descope_client.mgmt.user.update(
login_id=login_id,
email=user.get('email'),
display_name=display_name,
user_tenants=user_tenants,
custom_attributes=custom_attributes,
)
print(f"성공: {login_id} 사용자 정보가 업데이트되었습니다.")
else:
# 사용자 생성
print(f"사용자 생성 시도: {login_id}")
descope_client.mgmt.user.create(
login_id=login_id,
email=user.get('email'),
display_name=display_name,
user_tenants=user_tenants,
custom_attributes=custom_attributes,
)
print(f"성공: {login_id} 사용자가 생성되었습니다.")
# 상태 업데이트 (생성/업데이트 공통)
status = user.get('status', 'activated')
# 'activated' 상태는 기본값이므로 별도로 업데이트할 필요가 없습니다.
# 다른 상태일 경우에만 상태 업데이트를 시도합니다.
if status != 'activated':
try:
print(f"사용자 상태 업데이트 시도: {login_id} -> {status}")
descope_client.mgmt.user.update_status(login_id=login_id, status=status)
print(f"성공: {login_id} 사용자의 상태가 {status}로 변경되었습니다.")
except AuthException as e:
print(f"오류: {login_id} 사용자 상태 업데이트 실패 - {e}")
time.sleep(0.5)
except AuthException as e:
print(f"오류: {login_id} 처리 실패 - {e}")
except Exception as e:
print(f"예상치 못한 오류 발생 ({login_id}): {e}")
except ValueError as e:
print(f"오류: {e}")
print("--- 사용자 정보 업데이트 또는 생성이 완료되었습니다 ---")
def change_passwords(users_data):
"""
CSV에서 읽어온 사용자 데이터를 기반으로 Descope를 통해 비밀번호를 일괄 변경합니다.
@@ -212,9 +321,14 @@ def main():
"""
메인 실행 함수
"""
parser = argparse.ArgumentParser(description="CSV 파일 또는 Google Sheets URL을 이용해 Descope 사용자를 일괄 생성하거나 비밀번호를 변경합니다.")
parser = argparse.ArgumentParser(description="CSV 파일 또는 Google Sheets URL을 이용해 Descope 사용자를 일괄 처리합니다.")
parser.add_argument("source", nargs='?', default=None, help="입력으로 사용할 CSV 파일 경로 또는 Google Sheets URL. 'test' action의 경우 필요하지 않습니다.")
parser.add_argument("--action", choices=['create', 'change_password', 'test'], required=True, help="수행할 작업 (create: 사용자 생성, change_password: 비밀번호 변경, test: 단일 사용자 생성 테스트)")
parser.add_argument(
"--action",
choices=['create', 'change_password', 'update_or_create', 'test'],
required=True,
help="수행할 작업 (create: 생성, change_password: 비밀번호 변경, update_or_create: 업데이트 또는 생성, test: 테스트)"
)
parser.add_argument("--expiry-date", help="사용자 라이선스 만료일을 'YYYY-MM-DD' 형식으로 지정합니다. 이 값을 지정하면 CSV 파일의 모든 'egBimLExpiryDate' 값을 덮어씁니다.")
args = parser.parse_args()
@@ -224,7 +338,7 @@ def main():
return
if not args.source:
print("오류: 'create' 또는 'change_password' 작업을 위해서는 CSV 파일 경로 또는 Google Sheets URL이 필요합니다.")
print(f"오류: '{args.action}' 작업을 위해서는 CSV 파일 경로 또는 Google Sheets URL이 필요합니다.")
parser.print_help()
return
@@ -237,22 +351,18 @@ def main():
expiry_timestamp = None
if args.expiry_date:
try:
# 'YYYY-MM-DD' 형식의 문자열을 datetime 객체로 변환
dt = datetime.strptime(args.expiry_date, '%Y-%m-%d')
# UTC 타임존을 명시적으로 설정
dt_utc = dt.replace(tzinfo=timezone.utc)
# UTC 타임스탬프(초)로 변환
expiry_timestamp = int(dt_utc.timestamp())
print(f"만료 날짜가 {args.expiry_date}로 설정되었습니다 (UTC 타임스탬프: {expiry_timestamp}).")
except ValueError:
print(f"오류: 날짜 형식이 잘못되었습니다. 'YYYY-MM-DD' 형식을 사용해주세요. (입력값: {args.expiry_date})")
expiry_timestamp = convert_to_timestamp(args.expiry_date)
if expiry_timestamp is None:
# convert_to_timestamp 함수 내부에서 이미 경고 메시지를 출력하므로 여기서는 종료만 합니다.
return
print(f"전체 만료 날짜가 {args.expiry_date}로 설정되었습니다 (UTC 타임스탬프: {expiry_timestamp}).")
if args.action == 'create':
create_users(users_data, expiry_timestamp)
elif args.action == 'change_password':
change_passwords(users_data)
elif args.action == 'update_or_create':
update_or_create_users(users_data, expiry_timestamp)
except FileNotFoundError:
print(f"오류: 파일을 찾을 수 없습니다 - {args.source}")