1
0
forked from baron/baron-sso
Files
baron-sso/backend/cmd/adminctl/worksmobile_sync.go

1717 lines
53 KiB
Go

package main
import (
"baron-sso-backend/internal/domain"
"baron-sso-backend/internal/repository"
"baron-sso-backend/internal/service"
"context"
"encoding/csv"
"errors"
"flag"
"fmt"
"io"
"os"
"slices"
"strings"
"time"
"gorm.io/gorm"
)
type worksmobileSyncConfig struct {
TenantSlug string
SyncOrgUnits bool
UsersCSV string
InspectUsersCSV string
InspectOrgUnitsCSV string
UndeleteUsersCSV string
RemoveAliasesCSV string
FindNumberStrippedAliasesOutput string
DuplicatePhoneCountryCodeOutput string
FixDuplicatePhoneCountryCode bool
PendingUsersOutput string
ResetPendingUsersPassword string
ResetPendingUsersResultOutput string
ComparisonOutput string
AlignBaronFromWorksOutput string
AlignBaronFromWorksExclude string
InspectOutput string
CredentialBatchID string
Process bool
SerializeOrgUnits bool
SerializeUsersBatch string
BatchSize int
Delay time.Duration
MaxCycles int
}
func runWorksmobileSync(args []string) error {
config, err := resolveWorksmobileSyncConfig(args)
if err != nil {
return err
}
db, err := openDB()
if err != nil {
return err
}
ctx := context.Background()
tenantRepo := repository.NewTenantRepository(db)
userRepo := repository.NewUserRepository(db)
userGroupRepo := repository.NewUserGroupRepository(db)
outboxRepo := repository.NewWorksmobileOutboxRepository(db)
tenantService := service.NewTenantService(tenantRepo, userRepo, userGroupRepo, nil)
syncService := service.NewWorksmobileSyncService(tenantService, userRepo, outboxRepo, newWorksmobileAdminClient())
root, err := tenantService.GetTenantBySlug(ctx, config.TenantSlug)
if err != nil {
return err
}
if config.SyncOrgUnits {
enqueued, skipped, failed, err := enqueueWorksmobileOrgUnits(ctx, db, syncService, root.ID)
if err != nil {
return err
}
fmt.Printf("worksmobile orgunits enqueued: enqueued=%d skipped=%d failed=%d\n", enqueued, skipped, failed)
}
if config.UsersCSV != "" {
batchID := strings.TrimSpace(config.CredentialBatchID)
if batchID == "" {
batchID = "final-sync-" + time.Now().UTC().Format("20060102-150405Z")
}
userIDs, err := readWorksmobileUserIDsCSV(config.UsersCSV)
if err != nil {
return err
}
enqueued, failed := enqueueWorksmobileUsers(ctx, syncService, root.ID, userIDs, batchID)
fmt.Printf("worksmobile users enqueued: enqueued=%d failed=%d credential_batch_id=%s\n", enqueued, failed, batchID)
}
if config.InspectUsersCSV != "" {
if err := inspectWorksmobileRemoteUsers(ctx, config.InspectUsersCSV, config.InspectOutput, newWorksmobileAdminClient()); err != nil {
return err
}
}
if config.InspectOrgUnitsCSV != "" {
if err := inspectWorksmobileRemoteOrgUnits(ctx, config.InspectOrgUnitsCSV, config.InspectOutput, newWorksmobileAdminClient()); err != nil {
return err
}
}
if config.UndeleteUsersCSV != "" {
if err := undeleteWorksmobileUsers(ctx, config.UndeleteUsersCSV, config.InspectOutput, newWorksmobileAdminClient()); err != nil {
return err
}
}
if config.RemoveAliasesCSV != "" {
if err := removeWorksmobileUserAliases(ctx, config.RemoveAliasesCSV, config.InspectOutput, newWorksmobileAdminClient()); err != nil {
return err
}
}
if config.FindNumberStrippedAliasesOutput != "" {
if err := findNumberStrippedWorksmobileAliases(ctx, config.FindNumberStrippedAliasesOutput, newWorksmobileAdminClient()); err != nil {
return err
}
}
if config.DuplicatePhoneCountryCodeOutput != "" || config.FixDuplicatePhoneCountryCode {
if err := auditAndMaybeFixWorksmobileDuplicatePhoneCountryCodes(ctx, config.DuplicatePhoneCountryCodeOutput, config.FixDuplicatePhoneCountryCode, newWorksmobileAdminClient()); err != nil {
return err
}
}
if config.PendingUsersOutput != "" || config.ResetPendingUsersPassword != "" {
if err := exportAndMaybeResetPendingWorksmobileUsers(ctx, config.PendingUsersOutput, config.ResetPendingUsersResultOutput, config.ResetPendingUsersPassword, newWorksmobileAdminClient()); err != nil {
return err
}
}
if config.ComparisonOutput != "" {
if err := exportWorksmobileNeedsUpdateComparison(ctx, syncService, root.ID, config.ComparisonOutput); err != nil {
return err
}
}
if config.AlignBaronFromWorksOutput != "" {
identityWriter := service.NewIdentityWriteService(service.NewKratosAdminService(), nil)
if err := alignBaronNeedsUpdateUsersFromWorks(ctx, db, syncService, userRepo, identityWriter, root.ID, config.AlignBaronFromWorksOutput, config.AlignBaronFromWorksExclude); err != nil {
return err
}
}
if config.Process {
return processWorksmobileOutbox(ctx, db, outboxRepo, config)
}
return nil
}
func resolveWorksmobileSyncConfig(args []string) (worksmobileSyncConfig, error) {
fs := flag.NewFlagSet("worksmobile-sync", flag.ContinueOnError)
fs.SetOutput(os.Stderr)
config := worksmobileSyncConfig{
TenantSlug: "hanmac-family",
BatchSize: 1,
Delay: 1500 * time.Millisecond,
MaxCycles: 5000,
}
fs.StringVar(&config.TenantSlug, "tenant-slug", config.TenantSlug, "root tenant slug")
fs.BoolVar(&config.SyncOrgUnits, "orgunits", false, "regenerate and enqueue orgunit upserts")
fs.StringVar(&config.UsersCSV, "users-csv", "", "CSV containing user_id column to regenerate and enqueue user upserts")
fs.StringVar(&config.InspectUsersCSV, "inspect-users-csv", "", "CSV containing email or login_email column to compare with remote Worksmobile users")
fs.StringVar(&config.InspectOrgUnitsCSV, "inspect-orgunits-csv", "", "CSV containing orgunit_external_key or tenant_id column to compare with remote Worksmobile orgUnits")
fs.StringVar(&config.UndeleteUsersCSV, "undelete-users-csv", "", "CSV containing email or login_email column to undelete Worksmobile users")
fs.StringVar(&config.RemoveAliasesCSV, "remove-aliases-csv", "", "CSV containing owner_email and alias_email columns to remove Worksmobile aliases")
fs.StringVar(&config.FindNumberStrippedAliasesOutput, "find-number-stripped-aliases-output", "", "output CSV path for aliases whose local-part equals owner local-part with trailing digits removed")
fs.StringVar(&config.DuplicatePhoneCountryCodeOutput, "duplicate-phone-country-code-output", "", "output CSV path for Worksmobile users whose cellPhone has duplicate Korean country code")
fs.BoolVar(&config.FixDuplicatePhoneCountryCode, "fix-duplicate-phone-country-code", false, "patch Worksmobile users whose cellPhone has duplicate Korean country code")
fs.StringVar(&config.PendingUsersOutput, "pending-users-output", "", "output CSV path for current Worksmobile users whose isPending flag is true")
fs.StringVar(&config.ResetPendingUsersPassword, "reset-pending-users-password", "", "reset current Worksmobile pending users to this password")
fs.StringVar(&config.ResetPendingUsersResultOutput, "reset-pending-users-result-output", "", "output CSV path for pending-user password reset results")
fs.StringVar(&config.ComparisonOutput, "comparison-output", "", "output CSV path for current Worksmobile user comparison rows whose status is needs_update")
fs.StringVar(&config.AlignBaronFromWorksOutput, "align-baron-from-works-output", "", "output CSV path for one-time Baron user updates from current Worksmobile needs_update rows")
fs.StringVar(&config.AlignBaronFromWorksExclude, "align-baron-from-works-exclude", "", "comma-separated emails or local-parts to exclude from --align-baron-from-works-output")
fs.StringVar(&config.InspectOutput, "inspect-output", "", "output CSV path for inspect/undelete commands")
fs.StringVar(&config.CredentialBatchID, "credential-batch-id", "", "credential batch id for regenerated user password rows")
fs.BoolVar(&config.Process, "process", false, "process ready Worksmobile outbox jobs")
fs.BoolVar(&config.SerializeOrgUnits, "serialize-orgunits", false, "process ORGUNIT pending jobs one at a time by releasing next_attempt_at serially")
fs.StringVar(&config.SerializeUsersBatch, "serialize-users-batch", "", "process USER pending jobs for the given credential batch one at a time by releasing next_attempt_at serially")
fs.IntVar(&config.BatchSize, "batch-size", config.BatchSize, "worker batch size")
fs.DurationVar(&config.Delay, "delay", config.Delay, "delay between worker cycles")
fs.IntVar(&config.MaxCycles, "max-cycles", config.MaxCycles, "maximum worker cycles")
if err := fs.Parse(args); err != nil {
return config, err
}
if !config.SyncOrgUnits && config.UsersCSV == "" && config.InspectUsersCSV == "" && config.InspectOrgUnitsCSV == "" && config.UndeleteUsersCSV == "" && config.RemoveAliasesCSV == "" && config.FindNumberStrippedAliasesOutput == "" && config.DuplicatePhoneCountryCodeOutput == "" && !config.FixDuplicatePhoneCountryCode && config.PendingUsersOutput == "" && config.ResetPendingUsersPassword == "" && config.ComparisonOutput == "" && config.AlignBaronFromWorksOutput == "" && !config.Process {
return config, fmt.Errorf("nothing to do; pass --orgunits, --users-csv, --inspect-users-csv, --inspect-orgunits-csv, --undelete-users-csv, --remove-aliases-csv, --find-number-stripped-aliases-output, --duplicate-phone-country-code-output, --fix-duplicate-phone-country-code, --pending-users-output, --reset-pending-users-password, --comparison-output, --align-baron-from-works-output, or --process")
}
if config.ResetPendingUsersPassword != "" && config.ResetPendingUsersResultOutput == "" {
return config, fmt.Errorf("--reset-pending-users-result-output is required with --reset-pending-users-password")
}
if config.ResetPendingUsersPassword != "" && config.PendingUsersOutput == "" {
return config, fmt.Errorf("--pending-users-output is required with --reset-pending-users-password")
}
return config, nil
}
func enqueueWorksmobileOrgUnits(ctx context.Context, db *gorm.DB, syncService service.WorksmobileAdminService, rootID string) (int, int, int, error) {
tenantIDs, err := activeTenantSubtreeIDs(ctx, db, rootID)
if err != nil {
return 0, 0, 0, err
}
enqueued, skipped, failed := 0, 0, 0
for _, tenantID := range tenantIDs {
if tenantID == rootID {
continue
}
_, err := syncService.EnqueueOrgUnitSync(ctx, rootID, tenantID)
if err == nil {
enqueued++
continue
}
if strings.Contains(err.Error(), "not a worksmobile orgunit tenant") ||
strings.Contains(err.Error(), "excluded from Worksmobile sync") {
skipped++
continue
}
failed++
fmt.Printf("worksmobile orgunit enqueue failed: tenant_id=%s error=%v\n", tenantID, err)
}
return enqueued, skipped, failed, nil
}
func activeTenantSubtreeIDs(ctx context.Context, db *gorm.DB, rootID string) ([]string, error) {
var ids []string
err := db.WithContext(ctx).Raw(`
WITH RECURSIVE subtree AS (
SELECT id, parent_id
FROM tenants
WHERE id = ? AND deleted_at IS NULL
UNION ALL
SELECT t.id, t.parent_id
FROM tenants t
JOIN subtree s ON t.parent_id = s.id
WHERE t.deleted_at IS NULL
)
SELECT id::text
FROM subtree
ORDER BY id::text
`, rootID).Scan(&ids).Error
return ids, err
}
func readWorksmobileUserIDsCSV(path string) ([]string, error) {
file, err := os.Open(path)
if err != nil {
return nil, err
}
defer file.Close()
reader := csv.NewReader(file)
rows, err := reader.ReadAll()
if err != nil {
return nil, err
}
if len(rows) == 0 {
return nil, nil
}
userIDIndex := slices.Index(rows[0], "user_id")
if userIDIndex < 0 {
return nil, fmt.Errorf("CSV must contain user_id column: %s", path)
}
seen := map[string]bool{}
userIDs := make([]string, 0, len(rows)-1)
for _, row := range rows[1:] {
if userIDIndex >= len(row) {
continue
}
userID := strings.TrimSpace(row[userIDIndex])
if userID == "" || seen[userID] {
continue
}
seen[userID] = true
userIDs = append(userIDs, userID)
}
return userIDs, nil
}
type worksmobileInspectTarget struct {
Email string
EmployeeNumber string
}
func readWorksmobileInspectTargetsCSV(path string) ([]worksmobileInspectTarget, error) {
file, err := os.Open(path)
if err != nil {
return nil, err
}
defer file.Close()
reader := csv.NewReader(file)
rows, err := reader.ReadAll()
if err != nil {
return nil, err
}
if len(rows) == 0 {
return nil, nil
}
emailIndex := slices.Index(rows[0], "email")
if emailIndex < 0 {
emailIndex = slices.Index(rows[0], "login_email")
}
if emailIndex < 0 {
return nil, fmt.Errorf("CSV must contain email or login_email column: %s", path)
}
employeeNumberIndex := slices.Index(rows[0], "employee_id")
if employeeNumberIndex < 0 {
employeeNumberIndex = slices.Index(rows[0], "employeeNumber")
}
if employeeNumberIndex < 0 {
employeeNumberIndex = slices.Index(rows[0], "employee_number")
}
seen := map[string]bool{}
targets := make([]worksmobileInspectTarget, 0, len(rows)-1)
for _, row := range rows[1:] {
if emailIndex >= len(row) {
continue
}
email := strings.TrimSpace(row[emailIndex])
if !strings.HasPrefix(email, "externalKey:") {
email = strings.ToLower(email)
}
if email == "" || seen[email] {
continue
}
seen[email] = true
target := worksmobileInspectTarget{Email: email}
if employeeNumberIndex >= 0 && employeeNumberIndex < len(row) {
target.EmployeeNumber = strings.TrimSpace(row[employeeNumberIndex])
}
targets = append(targets, target)
}
return targets, nil
}
func inspectWorksmobileRemoteUsers(ctx context.Context, usersCSV string, outputPath string, client service.WorksmobileDirectoryClient) error {
targets, err := readWorksmobileInspectTargetsCSV(usersCSV)
if err != nil {
return err
}
remoteUsers, err := client.ListUsers(ctx)
if err != nil {
return err
}
remoteByEmail := map[string][]service.WorksmobileRemoteUser{}
remoteByLocalPart := map[string][]service.WorksmobileRemoteUser{}
remoteByAliasLocalPart := map[string][]service.WorksmobileRemoteUser{}
remoteByEmployeeNumber := map[string][]service.WorksmobileRemoteUser{}
for _, remote := range remoteUsers {
email := strings.ToLower(strings.TrimSpace(remote.Email))
if email != "" {
remoteByEmail[email] = append(remoteByEmail[email], remote)
}
employeeNumber := strings.TrimSpace(remote.EmployeeNumber)
if employeeNumber != "" {
remoteByEmployeeNumber[employeeNumber] = append(remoteByEmployeeNumber[employeeNumber], remote)
}
for _, candidate := range []string{remote.Email, remote.UserName} {
localPart, err := domain.ExtractNormalizedEmailLocalPart(strings.ToLower(strings.TrimSpace(candidate)))
if err == nil && localPart != "" {
remoteByLocalPart[localPart] = append(remoteByLocalPart[localPart], remote)
}
}
for _, alias := range remote.AliasEmails {
localPart, err := domain.ExtractNormalizedEmailLocalPart(strings.ToLower(strings.TrimSpace(alias)))
if err == nil && localPart != "" {
remoteByAliasLocalPart[localPart] = append(remoteByAliasLocalPart[localPart], remote)
}
}
}
writer := csv.NewWriter(os.Stdout)
var file *os.File
if strings.TrimSpace(outputPath) != "" {
file, err = os.Create(outputPath)
if err != nil {
return err
}
defer file.Close()
writer = csv.NewWriter(file)
}
defer writer.Flush()
header := []string{
"target_email",
"target_local_part",
"target_employee_number",
"exact_email_match_count",
"local_part_match_count",
"alias_local_part_match_count",
"employee_number_match_count",
"match_kind",
"remote_email",
"remote_user_name",
"remote_display_name",
"remote_external_id",
"remote_alias_emails",
"remote_cell_phone",
"remote_employee_number",
"remote_domain_id",
"remote_domain_name",
"remote_primary_orgunit_name",
"remote_active",
}
if err := writer.Write(header); err != nil {
return err
}
for _, target := range targets {
email := target.Email
localPart, _ := domain.ExtractNormalizedEmailLocalPart(email)
exactMatches := remoteByEmail[email]
localMatches := remoteByLocalPart[localPart]
aliasLocalMatches := remoteByAliasLocalPart[localPart]
employeeNumberMatches := remoteByEmployeeNumber[strings.TrimSpace(target.EmployeeNumber)]
matchKind := "none"
matches := localMatches
if len(exactMatches) > 0 {
matchKind = "exact_email"
matches = exactMatches
} else if len(localMatches) > 0 {
matchKind = "local_part"
} else if len(aliasLocalMatches) > 0 {
matchKind = "alias_local_part"
matches = aliasLocalMatches
} else if len(employeeNumberMatches) > 0 {
matchKind = "employee_number"
matches = employeeNumberMatches
}
if len(matches) == 0 {
if err := writer.Write([]string{email, localPart, target.EmployeeNumber, "0", "0", "0", "0", matchKind, "", "", "", "", "", "", "", "", "", ""}); err != nil {
return err
}
continue
}
for _, remote := range matches {
if err := writer.Write([]string{
email,
localPart,
target.EmployeeNumber,
fmt.Sprint(len(exactMatches)),
fmt.Sprint(len(localMatches)),
fmt.Sprint(len(aliasLocalMatches)),
fmt.Sprint(len(employeeNumberMatches)),
matchKind,
remote.Email,
remote.UserName,
remote.DisplayName,
remote.ExternalID,
strings.Join(remote.AliasEmails, ";"),
remote.CellPhone,
remote.EmployeeNumber,
fmt.Sprint(remote.DomainID),
remote.DomainName,
remote.PrimaryOrgUnitName,
fmt.Sprint(remote.Active),
}); err != nil {
return err
}
}
}
if err := writer.Error(); err != nil {
return err
}
if strings.TrimSpace(outputPath) != "" {
fmt.Printf("worksmobile remote user inspection written: %s targets=%d remote_users=%d\n", outputPath, len(targets), len(remoteUsers))
}
return nil
}
func readWorksmobileOrgUnitKeysCSV(path string) ([]string, error) {
file, err := os.Open(path)
if err != nil {
return nil, err
}
defer file.Close()
reader := csv.NewReader(file)
rows, err := reader.ReadAll()
if err != nil {
return nil, err
}
if len(rows) == 0 {
return nil, nil
}
keyIndex := slices.Index(rows[0], "orgunit_external_key")
if keyIndex < 0 {
keyIndex = slices.Index(rows[0], "tenant_id")
}
if keyIndex < 0 {
keyIndex = slices.Index(rows[0], "orgUnitExternalKey")
}
if keyIndex < 0 {
keyIndex = slices.Index(rows[0], "local_part")
}
if keyIndex < 0 {
return nil, fmt.Errorf("CSV must contain orgunit_external_key, tenant_id, or orgUnitExternalKey column: %s", path)
}
seen := map[string]bool{}
keys := make([]string, 0, len(rows)-1)
for _, row := range rows[1:] {
if keyIndex >= len(row) {
continue
}
key := strings.TrimSpace(row[keyIndex])
key = strings.TrimPrefix(key, "externalKey:")
if key == "" || seen[key] {
continue
}
seen[key] = true
keys = append(keys, key)
}
return keys, nil
}
func inspectWorksmobileRemoteOrgUnits(ctx context.Context, orgUnitsCSV string, outputPath string, client service.WorksmobileDirectoryClient) error {
keys, err := readWorksmobileOrgUnitKeysCSV(orgUnitsCSV)
if err != nil {
return err
}
groups, err := client.ListGroups(ctx)
if err != nil {
return err
}
byExternalKey := map[string][]service.WorksmobileRemoteGroup{}
byLocalPart := map[string][]service.WorksmobileRemoteGroup{}
for _, group := range groups {
key := strings.TrimSpace(group.ExternalID)
if key != "" {
byExternalKey[key] = append(byExternalKey[key], group)
}
localPart := strings.ToLower(strings.TrimSpace(group.MailLocalPart))
if localPart != "" {
byLocalPart[localPart] = append(byLocalPart[localPart], group)
}
}
writer := csv.NewWriter(os.Stdout)
var file *os.File
if strings.TrimSpace(outputPath) != "" {
file, err = os.Create(outputPath)
if err != nil {
return err
}
defer file.Close()
writer = csv.NewWriter(file)
}
defer writer.Flush()
header := []string{
"target_external_key",
"match_count",
"match_kind",
"remote_id",
"remote_external_id",
"remote_display_name",
"remote_email",
"remote_mail_local_part",
"remote_domain_id",
"remote_domain_name",
"remote_parent_id",
"remote_parent_name",
}
if err := writer.Write(header); err != nil {
return err
}
for _, key := range keys {
matches := byExternalKey[key]
matchKind := "external_key"
if len(matches) == 0 {
matches = byLocalPart[strings.ToLower(key)]
matchKind = "local_part"
}
if len(matches) == 0 {
if err := writer.Write([]string{key, "0", "", "", "", "", "", "", "", "", "", ""}); err != nil {
return err
}
continue
}
for _, group := range matches {
if err := writer.Write([]string{
key,
fmt.Sprint(len(matches)),
matchKind,
group.ID,
group.ExternalID,
group.DisplayName,
group.Email,
group.MailLocalPart,
fmt.Sprint(group.DomainID),
group.DomainName,
group.ParentID,
group.ParentName,
}); err != nil {
return err
}
}
}
if err := writer.Error(); err != nil {
return err
}
if strings.TrimSpace(outputPath) != "" {
fmt.Printf("worksmobile remote orgunit inspection written: %s targets=%d remote_groups=%d\n", outputPath, len(keys), len(groups))
}
return nil
}
func undeleteWorksmobileUsers(ctx context.Context, usersCSV string, outputPath string, client *service.WorksmobileHTTPClient) error {
targets, err := readWorksmobileInspectTargetsCSV(usersCSV)
if err != nil {
return err
}
writer := csv.NewWriter(os.Stdout)
var file *os.File
if strings.TrimSpace(outputPath) != "" {
file, err = os.Create(outputPath)
if err != nil {
return err
}
defer file.Close()
writer = csv.NewWriter(file)
}
defer writer.Flush()
header := []string{
"email",
"get_before_status",
"get_before_user_id",
"get_before_external_id",
"get_before_is_deleted",
"undelete_status",
"undelete_error",
"get_after_status",
"get_after_user_id",
"get_after_external_id",
"get_after_is_deleted",
}
if err := writer.Write(header); err != nil {
return err
}
for _, target := range targets {
email := strings.TrimSpace(target.Email)
beforeStatus, beforeUser := worksmobileGetUserStatus(ctx, client, email)
undeleteStatus := "ok"
undeleteError := ""
if err := client.UndeleteUser(ctx, email); err != nil {
undeleteStatus = "error"
undeleteError = err.Error()
}
afterStatus, afterUser := worksmobileGetUserStatus(ctx, client, email)
row := []string{
email,
beforeStatus,
beforeUser.ID,
beforeUser.ExternalID,
fmt.Sprint(beforeUser.IsDeleted),
undeleteStatus,
undeleteError,
afterStatus,
afterUser.ID,
afterUser.ExternalID,
fmt.Sprint(afterUser.IsDeleted),
}
if err := writer.Write(row); err != nil {
return err
}
}
if err := writer.Error(); err != nil {
return err
}
if strings.TrimSpace(outputPath) != "" {
fmt.Printf("worksmobile undelete result written: %s targets=%d\n", outputPath, len(targets))
}
return nil
}
func worksmobileGetUserStatus(ctx context.Context, client *service.WorksmobileHTTPClient, email string) (string, service.WorksmobileRemoteUser) {
user, err := client.GetUser(ctx, email)
if err != nil {
return err.Error(), service.WorksmobileRemoteUser{}
}
if user == nil {
return "not_found", service.WorksmobileRemoteUser{}
}
return "ok", *user
}
type worksmobileAliasRemovalTarget struct {
OwnerEmail string
AliasEmail string
}
func readWorksmobileAliasRemovalTargetsCSV(path string) ([]worksmobileAliasRemovalTarget, error) {
file, err := os.Open(path)
if err != nil {
return nil, err
}
defer file.Close()
reader := csv.NewReader(file)
rows, err := reader.ReadAll()
if err != nil {
return nil, err
}
if len(rows) == 0 {
return nil, nil
}
ownerIndex := slices.Index(rows[0], "owner_email")
aliasIndex := slices.Index(rows[0], "alias_email")
if ownerIndex < 0 || aliasIndex < 0 {
return nil, fmt.Errorf("CSV must contain owner_email and alias_email columns: %s", path)
}
targets := make([]worksmobileAliasRemovalTarget, 0, len(rows)-1)
seen := map[string]bool{}
for _, row := range rows[1:] {
if ownerIndex >= len(row) || aliasIndex >= len(row) {
continue
}
target := worksmobileAliasRemovalTarget{
OwnerEmail: strings.ToLower(strings.TrimSpace(row[ownerIndex])),
AliasEmail: strings.ToLower(strings.TrimSpace(row[aliasIndex])),
}
key := target.OwnerEmail + "\x00" + target.AliasEmail
if target.OwnerEmail == "" || target.AliasEmail == "" || seen[key] {
continue
}
seen[key] = true
targets = append(targets, target)
}
return targets, nil
}
func removeWorksmobileUserAliases(ctx context.Context, aliasesCSV string, outputPath string, client *service.WorksmobileHTTPClient) error {
targets, err := readWorksmobileAliasRemovalTargetsCSV(aliasesCSV)
if err != nil {
return err
}
writer := csv.NewWriter(os.Stdout)
var file *os.File
if strings.TrimSpace(outputPath) != "" {
file, err = os.Create(outputPath)
if err != nil {
return err
}
defer file.Close()
writer = csv.NewWriter(file)
}
defer writer.Flush()
header := []string{"owner_email", "alias_email", "status", "error"}
if err := writer.Write(header); err != nil {
return err
}
for _, target := range targets {
status := "ok"
errorMessage := ""
if err := client.RemoveUserAliasEmail(ctx, target.OwnerEmail, target.AliasEmail); err != nil {
status = "error"
errorMessage = err.Error()
}
if err := writer.Write([]string{target.OwnerEmail, target.AliasEmail, status, errorMessage}); err != nil {
return err
}
}
if err := writer.Error(); err != nil {
return err
}
if strings.TrimSpace(outputPath) != "" {
fmt.Printf("worksmobile alias removal result written: %s targets=%d\n", outputPath, len(targets))
}
return nil
}
func exportAndMaybeResetPendingWorksmobileUsers(ctx context.Context, pendingOutputPath string, resetResultOutputPath string, password string, client service.WorksmobileDirectoryClient) error {
remoteUsers, err := client.ListUsers(ctx)
if err != nil {
return err
}
pendingUsers := make([]service.WorksmobileRemoteUser, 0)
for _, remote := range remoteUsers {
if remote.IsPending {
pendingUsers = append(pendingUsers, remote)
}
}
if strings.TrimSpace(pendingOutputPath) != "" {
if err := writePendingWorksmobileUsersCSV(pendingOutputPath, pendingUsers); err != nil {
return err
}
fmt.Printf("worksmobile pending users written: %s pending=%d remote_users=%d\n", pendingOutputPath, len(pendingUsers), len(remoteUsers))
}
password = strings.TrimSpace(password)
if password == "" {
return nil
}
return resetPendingWorksmobileUserPasswords(ctx, resetResultOutputPath, pendingUsers, password, client)
}
func writePendingWorksmobileUsersCSV(outputPath string, pendingUsers []service.WorksmobileRemoteUser) error {
file, err := os.Create(outputPath)
if err != nil {
return err
}
defer file.Close()
writer := csv.NewWriter(file)
defer writer.Flush()
header := []string{
"email",
"user_id",
"user_external_key",
"display_name",
"employee_number",
"domain_id",
"domain_name",
"is_awaiting",
"is_pending",
"is_suspended",
"is_deleted",
"active",
"primary_orgunit_id",
"primary_orgunit_name",
"reset_target",
}
if err := writer.Write(header); err != nil {
return err
}
for _, remote := range pendingUsers {
resetTarget := strings.TrimSpace(remote.Email) != "" && !remote.IsDeleted
if err := writer.Write([]string{
remote.Email,
remote.ID,
remote.ExternalID,
remote.DisplayName,
remote.EmployeeNumber,
fmt.Sprint(remote.DomainID),
remote.DomainName,
fmt.Sprint(remote.IsAwaiting),
fmt.Sprint(remote.IsPending),
fmt.Sprint(remote.IsSuspended),
fmt.Sprint(remote.IsDeleted),
fmt.Sprint(remote.Active),
remote.PrimaryOrgUnitID,
remote.PrimaryOrgUnitName,
fmt.Sprint(resetTarget),
}); err != nil {
return err
}
}
return writer.Error()
}
func resetPendingWorksmobileUserPasswords(ctx context.Context, outputPath string, pendingUsers []service.WorksmobileRemoteUser, password string, client service.WorksmobileDirectoryClient) error {
file, err := os.Create(outputPath)
if err != nil {
return err
}
defer file.Close()
writer := csv.NewWriter(file)
defer writer.Flush()
header := []string{
"email",
"user_id",
"user_external_key",
"display_name",
"employee_number",
"domain_id",
"domain_name",
"is_awaiting",
"is_pending",
"is_suspended",
"is_deleted",
"active",
"status",
"error",
}
if err := writer.Write(header); err != nil {
return err
}
okCount := 0
errorCount := 0
skippedCount := 0
for _, remote := range pendingUsers {
status := "ok"
errorMessage := ""
if strings.TrimSpace(remote.Email) == "" {
status = "skipped"
errorMessage = "email is empty"
skippedCount++
} else if remote.IsDeleted {
status = "skipped"
errorMessage = "user is deleted"
skippedCount++
} else if err := client.ResetUserPassword(ctx, remote.Email, password); err != nil {
status = "error"
errorMessage = err.Error()
errorCount++
} else {
okCount++
}
if err := writer.Write([]string{
remote.Email,
remote.ID,
remote.ExternalID,
remote.DisplayName,
remote.EmployeeNumber,
fmt.Sprint(remote.DomainID),
remote.DomainName,
fmt.Sprint(remote.IsAwaiting),
fmt.Sprint(remote.IsPending),
fmt.Sprint(remote.IsSuspended),
fmt.Sprint(remote.IsDeleted),
fmt.Sprint(remote.Active),
status,
errorMessage,
}); err != nil {
return err
}
writer.Flush()
if err := writer.Error(); err != nil {
return err
}
}
fmt.Printf("worksmobile pending user password reset result written: %s targets=%d ok=%d skipped=%d errors=%d\n", outputPath, len(pendingUsers), okCount, skippedCount, errorCount)
return nil
}
func exportWorksmobileNeedsUpdateComparison(ctx context.Context, syncService service.WorksmobileAdminService, tenantID string, outputPath string) error {
comparison, err := syncService.GetComparison(ctx, tenantID, true)
if err != nil {
return err
}
file, err := os.Create(outputPath)
if err != nil {
return err
}
defer file.Close()
writer := csv.NewWriter(file)
defer writer.Flush()
header := []string{
"baron_id",
"baron_name",
"baron_email",
"baron_primary_org_id",
"baron_primary_org_slug",
"baron_primary_org_name",
"worksmobile_id",
"external_key",
"worksmobile_name",
"worksmobile_email",
"worksmobile_domain_id",
"worksmobile_domain_name",
"worksmobile_primary_org_id",
"worksmobile_primary_org_name",
"worksmobile_primary_org_position_id",
"worksmobile_primary_org_position_name",
"worksmobile_primary_org_is_manager",
"worksmobile_level_id",
"worksmobile_level_name",
"worksmobile_task",
"status",
}
if err := writer.Write(header); err != nil {
return err
}
count := 0
for _, item := range comparison.Users {
if item.Status != "needs_update" {
continue
}
count++
isManager := ""
if item.WorksmobilePrimaryOrgIsManager != nil {
isManager = fmt.Sprint(*item.WorksmobilePrimaryOrgIsManager)
}
if err := writer.Write([]string{
item.BaronID,
item.BaronName,
item.BaronEmail,
item.BaronPrimaryOrgID,
item.BaronPrimaryOrgSlug,
item.BaronPrimaryOrgName,
item.WorksmobileID,
item.ExternalKey,
item.WorksmobileName,
item.WorksmobileEmail,
fmt.Sprint(item.WorksmobileDomainID),
item.WorksmobileDomainName,
item.WorksmobilePrimaryOrgID,
item.WorksmobilePrimaryOrgName,
item.WorksmobilePrimaryOrgPositionID,
item.WorksmobilePrimaryOrgPositionName,
isManager,
item.WorksmobileLevelID,
item.WorksmobileLevelName,
item.WorksmobileTask,
item.Status,
}); err != nil {
return err
}
}
if err := writer.Error(); err != nil {
return err
}
fmt.Printf("worksmobile needs_update comparison written: %s users=%d\n", outputPath, count)
return nil
}
func alignBaronNeedsUpdateUsersFromWorks(ctx context.Context, db *gorm.DB, syncService service.WorksmobileAdminService, userRepo repository.UserRepository, identityWriter service.IdentityWriteService, tenantID string, outputPath string, excludeRaw string) error {
comparison, err := syncService.GetComparison(ctx, tenantID, true)
if err != nil {
return err
}
if identityWriter == nil {
return fmt.Errorf("identity write service is required to align Baron users from WORKS")
}
excludes := parseWorksmobileAlignExcludes(excludeRaw)
file, err := os.Create(outputPath)
if err != nil {
return err
}
defer file.Close()
writer := csv.NewWriter(file)
defer writer.Flush()
header := []string{
"baron_id",
"old_email",
"new_email",
"old_name",
"new_name",
"worksmobile_id",
"worksmobile_domain_name",
"worksmobile_primary_org_name",
"status",
"error",
}
if err := writer.Write(header); err != nil {
return err
}
targets := 0
updated := 0
skipped := 0
errorsCount := 0
for _, item := range comparison.Users {
if item.Status != "needs_update" {
continue
}
if worksmobileAlignExcluded(item, excludes) {
skipped++
if err := writer.Write([]string{
item.BaronID,
item.BaronEmail,
item.WorksmobileEmail,
item.BaronName,
item.WorksmobileName,
item.WorksmobileID,
item.WorksmobileDomainName,
item.WorksmobilePrimaryOrgName,
"skipped_excluded",
"",
}); err != nil {
return err
}
continue
}
classification, alignable := classifyWorksmobileAlignFromWorks(item)
if !alignable {
skipped++
if err := writer.Write([]string{
item.BaronID,
item.BaronEmail,
item.WorksmobileEmail,
item.BaronName,
item.WorksmobileName,
item.WorksmobileID,
item.WorksmobileDomainName,
item.WorksmobilePrimaryOrgName,
classification,
"",
}); err != nil {
return err
}
continue
}
targets++
status := classification
errorMessage := ""
oldEmail := item.BaronEmail
oldName := item.BaronName
newEmail := strings.TrimSpace(item.WorksmobileEmail)
newName := strings.TrimSpace(item.WorksmobileName)
user, findErr := userRepo.FindByID(ctx, item.BaronID)
if findErr != nil {
status = "error"
errorMessage = findErr.Error()
errorsCount++
} else {
if newEmail == "" {
newEmail = strings.TrimSpace(user.Email)
}
if newName == "" {
newName = strings.TrimSpace(user.Name)
}
identity, identityErr := identityWriter.GetIdentity(ctx, user.ID)
if identityErr != nil {
status = "error"
errorMessage = identityErr.Error()
errorsCount++
} else {
traits := copyKratosTraits(identity.Traits)
traits["email"] = newEmail
traits["name"] = newName
if _, updateErr := identityWriter.UpdateIdentity(ctx, service.IdentityUpdateRequest{
IdentityID: user.ID,
Traits: traits,
State: strings.TrimSpace(identity.State),
Reason: "worksmobile_align_baron_from_works",
Source: "adminctl_worksmobile_sync",
}); updateErr != nil {
status = "error"
errorMessage = updateErr.Error()
errorsCount++
}
}
}
if status != "error" {
updates := map[string]any{
"email": newEmail,
"name": newName,
"updated_at": time.Now().UTC(),
}
err = db.WithContext(ctx).Model(&domain.User{}).Where("id = ?", user.ID).Updates(updates).Error
if err != nil {
status = "error"
errorMessage = err.Error()
errorsCount++
} else {
updated++
}
}
if err := writer.Write([]string{
item.BaronID,
oldEmail,
newEmail,
oldName,
newName,
item.WorksmobileID,
item.WorksmobileDomainName,
item.WorksmobilePrimaryOrgName,
status,
errorMessage,
}); err != nil {
return err
}
}
if err := writer.Error(); err != nil {
return err
}
fmt.Printf("baron users aligned from worksmobile needs_update rows: %s targets=%d updated=%d skipped=%d errors=%d\n", outputPath, targets, updated, skipped, errorsCount)
return nil
}
func parseWorksmobileAlignExcludes(raw string) map[string]bool {
excludes := map[string]bool{}
for _, part := range strings.Split(raw, ",") {
value := strings.ToLower(strings.TrimSpace(part))
if value == "" {
continue
}
excludes[value] = true
if localPart, ok := emailLocalPart(value); ok {
excludes[localPart] = true
}
}
return excludes
}
func worksmobileAlignExcluded(item service.WorksmobileComparisonItem, excludes map[string]bool) bool {
for _, value := range []string{item.BaronEmail, item.WorksmobileEmail, item.BaronName, item.WorksmobileName, item.BaronID, item.WorksmobileID, item.ExternalKey} {
normalized := strings.ToLower(strings.TrimSpace(value))
if normalized == "" {
continue
}
if excludes[normalized] {
return true
}
if localPart, ok := emailLocalPart(normalized); ok && excludes[localPart] {
return true
}
}
return false
}
func classifyWorksmobileAlignFromWorks(item service.WorksmobileComparisonItem) (string, bool) {
oldEmail := strings.ToLower(strings.TrimSpace(item.BaronEmail))
newEmail := strings.ToLower(strings.TrimSpace(item.WorksmobileEmail))
if oldEmail == "" || newEmail == "" {
return "skipped_email_empty", false
}
if oldEmail == newEmail {
return "skipped_email_already_matched", false
}
oldLocalPart, oldOK := emailLocalPart(oldEmail)
newLocalPart, newOK := emailLocalPart(newEmail)
if !oldOK || !newOK {
return "skipped_email_invalid", false
}
if oldLocalPart != newLocalPart {
return "skipped_email_local_part_changed", false
}
return "updated", true
}
func copyKratosTraits(source map[string]any) map[string]any {
copied := make(map[string]any, len(source)+2)
for key, value := range source {
copied[key] = value
}
return copied
}
func findNumberStrippedWorksmobileAliases(ctx context.Context, outputPath string, client service.WorksmobileDirectoryClient) error {
remoteUsers, err := client.ListUsers(ctx)
if err != nil {
return err
}
file, err := os.Create(outputPath)
if err != nil {
return err
}
defer file.Close()
writer := csv.NewWriter(file)
defer writer.Flush()
header := []string{
"owner_email",
"owner_display_name",
"owner_external_id",
"owner_employee_number",
"owner_domain_id",
"owner_domain_name",
"owner_local_part",
"stripped_local_part",
"alias_email",
"alias_local_part",
"reason",
}
if err := writer.Write(header); err != nil {
return err
}
count := 0
for _, remote := range remoteUsers {
ownerEmail := strings.ToLower(strings.TrimSpace(remote.Email))
ownerLocalPart, ok := emailLocalPart(ownerEmail)
if !ok {
continue
}
strippedLocalPart, ok := stripTrailingDigitsFromASCIIName(ownerLocalPart)
if !ok {
continue
}
for _, alias := range remote.AliasEmails {
aliasEmail := strings.ToLower(strings.TrimSpace(alias))
aliasLocalPart, ok := emailLocalPart(aliasEmail)
if !ok || aliasEmail == ownerEmail || aliasLocalPart != strippedLocalPart {
continue
}
count++
if err := writer.Write([]string{
ownerEmail,
remote.DisplayName,
remote.ExternalID,
remote.EmployeeNumber,
fmt.Sprint(remote.DomainID),
remote.DomainName,
ownerLocalPart,
strippedLocalPart,
aliasEmail,
aliasLocalPart,
"alias local-part equals owner local-part with trailing digits removed",
}); err != nil {
return err
}
}
}
if err := writer.Error(); err != nil {
return err
}
fmt.Printf("worksmobile number-stripped alias candidates written: %s candidates=%d remote_users=%d\n", outputPath, count, len(remoteUsers))
return nil
}
type worksmobilePhoneAuditClient interface {
ListUsers(ctx context.Context) ([]service.WorksmobileRemoteUser, error)
PatchUser(ctx context.Context, identifier string, payload service.WorksmobileUserPatchPayload) error
}
func auditAndMaybeFixWorksmobileDuplicatePhoneCountryCodes(ctx context.Context, outputPath string, fix bool, client worksmobilePhoneAuditClient) error {
writer := io.Writer(os.Stdout)
var file *os.File
var err error
if strings.TrimSpace(outputPath) != "" {
file, err = os.Create(outputPath)
if err != nil {
return err
}
defer file.Close()
writer = file
}
count, err := auditWorksmobileDuplicatePhoneCountryCodes(ctx, writer, fix, client)
if err != nil {
return err
}
if strings.TrimSpace(outputPath) != "" {
fmt.Printf("duplicate Worksmobile phone country-code rows written: %d path=%s fix=%t\n", count, outputPath, fix)
} else {
fmt.Printf("duplicate Worksmobile phone country-code rows written: %d fix=%t\n", count, fix)
}
return nil
}
func auditWorksmobileDuplicatePhoneCountryCodes(ctx context.Context, output io.Writer, fix bool, client worksmobilePhoneAuditClient) (int, error) {
remoteUsers, err := client.ListUsers(ctx)
if err != nil {
return 0, err
}
writer := csv.NewWriter(output)
defer writer.Flush()
header := []string{
"user_id",
"email",
"external_id",
"domain_id",
"domain_name",
"current_cell_phone",
"normalized_cell_phone",
"action",
}
if err := writer.Write(header); err != nil {
return 0, err
}
count := 0
for _, remote := range remoteUsers {
currentPhone := strings.TrimSpace(remote.CellPhone)
if !hasDuplicateKoreanCountryCode(currentPhone) {
continue
}
normalizedPhone := domain.NormalizePhoneNumber(currentPhone)
action := "audit"
if fix {
identifier := strings.TrimSpace(remote.ID)
if identifier == "" {
identifier = strings.TrimSpace(remote.Email)
}
if identifier == "" {
return count, fmt.Errorf("Worksmobile user identifier is empty for duplicate phone row")
}
if err := client.PatchUser(ctx, identifier, service.WorksmobileUserPatchPayload{
DomainID: remote.DomainID,
Email: strings.TrimSpace(remote.Email),
UserExternalKey: strings.TrimSpace(remote.ExternalID),
UserName: service.WorksmobileUserName{LastName: strings.TrimSpace(remote.DisplayName)},
CellPhone: normalizedPhone,
}); err != nil {
return count, err
}
action = "fixed"
}
row := []string{
strings.TrimSpace(remote.ID),
strings.TrimSpace(remote.Email),
strings.TrimSpace(remote.ExternalID),
fmt.Sprint(remote.DomainID),
strings.TrimSpace(remote.DomainName),
currentPhone,
normalizedPhone,
action,
}
if err := writer.Write(row); err != nil {
return count, err
}
count++
}
return count, writer.Error()
}
func hasDuplicateKoreanCountryCode(phone string) bool {
digits := strings.Builder{}
for _, r := range strings.TrimSpace(phone) {
if r >= '0' && r <= '9' {
digits.WriteRune(r)
}
}
return strings.HasPrefix(digits.String(), "8282")
}
func emailLocalPart(email string) (string, bool) {
localPart, err := domain.ExtractNormalizedEmailLocalPart(strings.ToLower(strings.TrimSpace(email)))
if err != nil || localPart == "" {
return "", false
}
return localPart, true
}
func stripTrailingDigitsFromASCIIName(localPart string) (string, bool) {
localPart = strings.ToLower(strings.TrimSpace(localPart))
if localPart == "" {
return "", false
}
end := len(localPart)
startDigits := end
for startDigits > 0 {
ch := localPart[startDigits-1]
if ch < '0' || ch > '9' {
break
}
startDigits--
}
if startDigits == end || startDigits == 0 {
return "", false
}
base := localPart[:startDigits]
for i := 0; i < len(base); i++ {
ch := base[i]
if ch < 'a' || ch > 'z' {
return "", false
}
}
return base, true
}
func enqueueWorksmobileUsers(ctx context.Context, syncService service.WorksmobileAdminService, rootID string, userIDs []string, batchID string) (int, int) {
enqueued, failed := 0, 0
for _, userID := range userIDs {
if _, err := syncService.EnqueueUserSync(ctx, rootID, userID, batchID, ""); err != nil {
failed++
fmt.Printf("worksmobile user enqueue failed: user_id=%s error=%v\n", userID, err)
continue
}
enqueued++
}
return enqueued, failed
}
func processWorksmobileOutbox(ctx context.Context, db *gorm.DB, repo repository.WorksmobileOutboxRepository, config worksmobileSyncConfig) error {
if config.SerializeOrgUnits {
return processSerializedWorksmobileOrgUnits(ctx, db, repo, config)
}
if strings.TrimSpace(config.SerializeUsersBatch) != "" {
return processSerializedWorksmobileUsers(ctx, db, repo, config)
}
worker := service.NewWorksmobileRelayWorker(repo, newWorksmobileAdminClient())
worker.SetBatchLimit(config.BatchSize)
for cycle := 1; cycle <= config.MaxCycles; cycle++ {
ready, err := countReadyWorksmobileJobs(ctx, db)
if err != nil {
return err
}
if ready == 0 {
fmt.Printf("worksmobile outbox processing complete: cycles=%d\n", cycle-1)
return printWorksmobileOutboxStatus(ctx, db)
}
if err := worker.ProcessOnce(ctx); err != nil && !errors.Is(err, context.Canceled) {
fmt.Printf("worksmobile outbox process cycle failed: cycle=%d error=%v\n", cycle, err)
}
if cycle == 1 || cycle%25 == 0 {
fmt.Printf("worksmobile outbox processing: cycle=%d ready_before=%d\n", cycle, ready)
}
if config.Delay > 0 {
time.Sleep(config.Delay)
}
}
return fmt.Errorf("worksmobile outbox processing hit max cycles: %d", config.MaxCycles)
}
func processSerializedWorksmobileOrgUnits(ctx context.Context, db *gorm.DB, repo repository.WorksmobileOutboxRepository, config worksmobileSyncConfig) error {
if err := deferPendingWorksmobileOrgUnits(ctx, db); err != nil {
return err
}
worker := service.NewWorksmobileRelayWorker(repo, newWorksmobileAdminClient())
worker.SetBatchLimit(1)
for cycle := 1; cycle <= config.MaxCycles; cycle++ {
released, err := releaseNextWorksmobileOrgUnit(ctx, db)
if err != nil {
return err
}
pending, err := countPendingWorksmobileOrgUnits(ctx, db)
if err != nil {
return err
}
if !released && pending == 0 {
fmt.Printf("worksmobile orgunit serialized processing complete: cycles=%d\n", cycle-1)
return printWorksmobileOutboxStatus(ctx, db)
}
if released {
if err := worker.ProcessOnce(ctx); err != nil && !errors.Is(err, context.Canceled) {
fmt.Printf("worksmobile orgunit process cycle failed: cycle=%d error=%v\n", cycle, err)
}
}
if cycle == 1 || cycle%25 == 0 {
fmt.Printf("worksmobile orgunit serialized processing: cycle=%d released=%t pending=%d\n", cycle, released, pending)
}
if config.Delay > 0 {
time.Sleep(config.Delay)
}
}
return fmt.Errorf("worksmobile orgunit serialized processing hit max cycles: %d", config.MaxCycles)
}
func processSerializedWorksmobileUsers(ctx context.Context, db *gorm.DB, repo repository.WorksmobileOutboxRepository, config worksmobileSyncConfig) error {
batchID := strings.TrimSpace(config.SerializeUsersBatch)
if batchID == "" {
return fmt.Errorf("serialize users batch id is required")
}
if err := deferPendingWorksmobileUsers(ctx, db, batchID); err != nil {
return err
}
worker := service.NewWorksmobileRelayWorker(repo, newWorksmobileAdminClient())
worker.SetBatchLimit(1)
for cycle := 1; cycle <= config.MaxCycles; cycle++ {
released, err := releaseNextWorksmobileUser(ctx, db, batchID)
if err != nil {
return err
}
pending, err := countPendingWorksmobileUsers(ctx, db, batchID)
if err != nil {
return err
}
if !released && pending == 0 {
fmt.Printf("worksmobile user serialized processing complete: batch_id=%s cycles=%d\n", batchID, cycle-1)
return printWorksmobileOutboxStatus(ctx, db)
}
if released {
if err := worker.ProcessOnce(ctx); err != nil && !errors.Is(err, context.Canceled) {
fmt.Printf("worksmobile user process cycle failed: cycle=%d batch_id=%s error=%v\n", cycle, batchID, err)
}
}
if cycle == 1 || cycle%100 == 0 {
fmt.Printf("worksmobile user serialized processing: batch_id=%s cycle=%d released=%t pending=%d\n", batchID, cycle, released, pending)
}
if config.Delay > 0 {
time.Sleep(config.Delay)
}
}
return fmt.Errorf("worksmobile user serialized processing hit max cycles: batch_id=%s max_cycles=%d", batchID, config.MaxCycles)
}
func deferPendingWorksmobileUsers(ctx context.Context, db *gorm.DB, batchID string) error {
return db.WithContext(ctx).Exec(`
UPDATE worksmobile_outboxes
SET next_attempt_at = now() + interval '2 hours',
updated_at = now()
WHERE status = ?
AND resource_type = ?
AND payload ->> 'credentialBatchId' = ?
`, domain.WorksmobileOutboxStatusPending, domain.WorksmobileResourceUser, batchID).Error
}
func releaseNextWorksmobileUser(ctx context.Context, db *gorm.DB, batchID string) (bool, error) {
result := db.WithContext(ctx).Exec(`
WITH next_job AS (
SELECT id
FROM worksmobile_outboxes
WHERE status = ?
AND resource_type = ?
AND payload ->> 'credentialBatchId' = ?
ORDER BY created_at ASC
LIMIT 1
)
UPDATE worksmobile_outboxes
SET next_attempt_at = NULL,
updated_at = now()
WHERE id IN (SELECT id FROM next_job)
`, domain.WorksmobileOutboxStatusPending, domain.WorksmobileResourceUser, batchID)
if result.Error != nil {
return false, result.Error
}
return result.RowsAffected > 0, nil
}
func countPendingWorksmobileUsers(ctx context.Context, db *gorm.DB, batchID string) (int64, error) {
var count int64
err := db.WithContext(ctx).Raw(`
SELECT count(*)
FROM worksmobile_outboxes
WHERE status = ?
AND resource_type = ?
AND payload ->> 'credentialBatchId' = ?
`, domain.WorksmobileOutboxStatusPending, domain.WorksmobileResourceUser, batchID).Scan(&count).Error
return count, err
}
func deferPendingWorksmobileOrgUnits(ctx context.Context, db *gorm.DB) error {
return db.WithContext(ctx).Exec(`
UPDATE worksmobile_outboxes
SET next_attempt_at = now() + interval '30 minutes',
updated_at = now()
WHERE status = ?
AND resource_type = ?
`, domain.WorksmobileOutboxStatusPending, domain.WorksmobileResourceOrgUnit).Error
}
func releaseNextWorksmobileOrgUnit(ctx context.Context, db *gorm.DB) (bool, error) {
result := db.WithContext(ctx).Exec(`
WITH candidates AS (
SELECT
id,
created_at,
NULLIF(payload #>> '{request,orgUnitExternalKey}', '') AS org_external_key,
CASE
WHEN payload #>> '{request,parentOrgUnitId}' LIKE 'externalKey:%'
THEN NULLIF(substr(payload #>> '{request,parentOrgUnitId}', length('externalKey:') + 1), '')
ELSE ''
END AS parent_external_key
FROM worksmobile_outboxes
WHERE status = ?
AND resource_type = ?
AND action = ?
),
next_job AS (
SELECT c.id
FROM candidates c
WHERE NOT (
c.parent_external_key <> ''
AND EXISTS (
SELECT 1
FROM worksmobile_outboxes parent_job
WHERE parent_job.resource_type = ?
AND parent_job.action = ?
AND parent_job.status <> ?
AND NULLIF(parent_job.payload #>> '{request,orgUnitExternalKey}', '') = c.parent_external_key
)
)
ORDER BY c.created_at ASC
LIMIT 1
)
UPDATE worksmobile_outboxes
SET next_attempt_at = NULL,
updated_at = now()
WHERE id IN (SELECT id FROM next_job)
`, domain.WorksmobileOutboxStatusPending, domain.WorksmobileResourceOrgUnit, domain.WorksmobileActionUpsert, domain.WorksmobileResourceOrgUnit, domain.WorksmobileActionUpsert, domain.WorksmobileOutboxStatusProcessed)
if result.Error != nil {
return false, result.Error
}
return result.RowsAffected > 0, nil
}
func countPendingWorksmobileOrgUnits(ctx context.Context, db *gorm.DB) (int64, error) {
var count int64
err := db.WithContext(ctx).Raw(`
SELECT count(*)
FROM worksmobile_outboxes
WHERE status = ?
AND resource_type = ?
`, domain.WorksmobileOutboxStatusPending, domain.WorksmobileResourceOrgUnit).Scan(&count).Error
return count, err
}
func countReadyWorksmobileJobs(ctx context.Context, db *gorm.DB) (int64, error) {
var count int64
err := db.WithContext(ctx).Raw(`
SELECT count(*)
FROM worksmobile_outboxes
WHERE status = ?
AND (next_attempt_at IS NULL OR next_attempt_at <= now())
`, domain.WorksmobileOutboxStatusPending).Scan(&count).Error
return count, err
}
func printWorksmobileOutboxStatus(ctx context.Context, db *gorm.DB) error {
type row struct {
ResourceType string
Status string
Count int64
}
var rows []row
if err := db.WithContext(ctx).Raw(`
SELECT resource_type, status, count(*) AS count
FROM worksmobile_outboxes
GROUP BY resource_type, status
ORDER BY resource_type, status
`).Scan(&rows).Error; err != nil {
return err
}
for _, row := range rows {
fmt.Printf("worksmobile outbox status: resource_type=%s status=%s count=%d\n", row.ResourceType, row.Status, row.Count)
}
return nil
}
func newWorksmobileAdminClient() *service.WorksmobileHTTPClient {
privateKey, _ := getenvFileOrValue("WORKS_ADMIN_OAUTH_CLIENT_PRIVATE_KEY_FILE", "WORKS_ADMIN_OAUTH_CLIENT_PRIVATE_KEY", "")
client := service.NewWorksmobileHTTPClientWithAuth(
getenv("WORKS_ADMIN_ACCESS_TOKEN", getenv("WORKS_ADMIN_OAUTH_ACCESS_TOKEN", "")),
getenv("WORKS_ADMIN_SCIM_TOKEN", ""),
service.WorksmobileOAuthConfig{
ClientID: getenv("WORKS_ADMIN_OAUTH_CLIENT_ID", ""),
ClientSecret: getenv("WORKS_ADMIN_OAUTH_CLIENT_SECRET", ""),
ServiceAccount: getenv("WORKS_ADMIN_OAUTH_CLIENT_SERVICE_ACCOUNT", ""),
PrivateKey: privateKey,
Scope: getenv("WORKS_ADMIN_OAUTH_SCOPE", "directory"),
TokenURL: getenv("WORKS_ADMIN_OAUTH_TOKEN_URL", ""),
},
)
client.BaseURL = strings.TrimSpace(getenv("WORKS_ADMIN_API_BASE_URL", ""))
client.RateLimiter = service.NewWorksmobileAPIRateLimiter(180, time.Minute)
return client
}
func getenvFileOrValue(fileKey string, valueKey string, fallback string) (string, error) {
if path := strings.TrimSpace(getenv(fileKey, "")); path != "" {
data, err := readEnvPath(path)
if err != nil {
return "", err
}
return string(data), nil
}
return getenv(valueKey, fallback), nil
}
func readEnvPath(path string) ([]byte, error) {
candidates := []string{path}
if !strings.HasPrefix(path, "/") {
candidates = append(candidates, "../"+path, "../../"+path)
}
var lastErr error
for _, candidate := range candidates {
data, err := os.ReadFile(candidate)
if err == nil {
return data, nil
}
lastErr = err
}
return nil, lastErr
}