1
0
forked from baron/baron-sso
Files
baron-sso/backend/internal/service/worksmobile_relay_worker.go

261 lines
7.4 KiB
Go

package service
import (
"baron-sso-backend/internal/domain"
"baron-sso-backend/internal/repository"
"context"
"encoding/json"
"errors"
"log/slog"
"sort"
"strings"
"time"
)
// WorksmobileRelayWorker polls the Worksmobile outbox and relays ready jobs to
// the Worksmobile directory client.
type WorksmobileRelayWorker struct {
// repo lists ready outbox jobs and records their processing state.
repo repository.WorksmobileOutboxRepository
// client performs the directory calls a job is dispatched to.
client WorksmobileDirectoryClient
// interval is the ticker period between ProcessOnce runs in Start.
interval time.Duration
// batchLimit is the limit passed to ListReady on each run.
batchLimit int
}
// NewWorksmobileRelayWorker builds a relay worker around the given outbox
// repository and directory client, using a 3 second polling interval and a
// batch limit of 10 jobs per run.
func NewWorksmobileRelayWorker(repo repository.WorksmobileOutboxRepository, client WorksmobileDirectoryClient) *WorksmobileRelayWorker {
	const (
		defaultInterval   = 3 * time.Second
		defaultBatchLimit = 10
	)

	worker := new(WorksmobileRelayWorker)
	worker.repo = repo
	worker.client = client
	worker.interval = defaultInterval
	worker.batchLimit = defaultBatchLimit
	return worker
}
// Start runs the relay loop until ctx is done. It processes one batch
// immediately and then one batch per ticker interval.
//
// When either the repository or the client is nil the worker logs a warning
// and returns without doing any work.
func (w *WorksmobileRelayWorker) Start(ctx context.Context) {
	if w.repo == nil || w.client == nil {
		slog.Warn("Worksmobile relay worker disabled")
		return
	}

	tick := time.NewTicker(w.interval)
	defer tick.Stop()

	// runBatch processes one batch and reports any failure other than a
	// context cancellation.
	runBatch := func() {
		err := w.ProcessOnce(ctx)
		if err == nil || errors.Is(err, context.Canceled) {
			return
		}
		slog.Warn("Worksmobile relay tick failed", "error", err)
	}

	for {
		runBatch()

		select {
		case <-ctx.Done():
			return
		case <-tick.C:
		}
	}
}
// ProcessOnce fetches up to batchLimit ready outbox jobs, orders them with
// sortWorksmobileReadyJobs and processes them one by one.
//
// A failure of an individual job is logged and does not abort the batch. The
// returned error is non-nil only when the ready jobs cannot be listed or when
// ctx is done before the batch has been fully processed, in which case the
// remaining jobs are left untouched for a later run.
func (w *WorksmobileRelayWorker) ProcessOnce(ctx context.Context) error {
	jobs, err := w.repo.ListReady(ctx, w.batchLimit)
	if err != nil {
		return err
	}

	for _, job := range sortWorksmobileReadyJobs(jobs) {
		// Stop between jobs once the context is done instead of attempting
		// (and logging a failure for) every remaining job in the batch.
		if err := ctx.Err(); err != nil {
			return err
		}
		if err := w.processJob(ctx, job); err != nil {
			slog.Warn("Worksmobile relay job failed", "jobID", job.ID, "resourceType", job.ResourceType, "resourceID", job.ResourceID, "error", err)
		}
	}
	return nil
}
// processJob claims a single outbox job, dispatches it and records the
// outcome in the repository.
//
// It returns nil when the job was processed, or when MarkProcessing reports
// that the job was not claimed. A dispatch failure is returned after the job
// has been rescheduled through MarkFailed; if that bookkeeping call fails as
// well, both errors are joined so neither is lost.
func (w *WorksmobileRelayWorker) processJob(ctx context.Context, job domain.WorksmobileOutbox) error {
	// Upper bound for the detached bookkeeping write below.
	const bookkeepingTimeout = 10 * time.Second

	claimed, err := w.repo.MarkProcessing(ctx, job.ID)
	if err != nil {
		return err
	}
	if !claimed {
		return nil
	}

	dispatchErr := w.dispatch(ctx, job)

	// The job is claimed at this point, so its outcome has to be persisted
	// even when ctx was cancelled while dispatching. Writing with the
	// cancelled context would fail and could leave the job claimed (or cause
	// a successfully dispatched job to be relayed again), so the bookkeeping
	// uses a context that ignores the caller's cancellation but is still
	// bounded in time.
	recordCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), bookkeepingTimeout)
	defer cancel()

	if dispatchErr != nil {
		nextAttempt := time.Now().Add(worksmobileRetryDelay(job.RetryCount))
		if markErr := w.repo.MarkFailed(recordCtx, job.ID, dispatchErr.Error(), nextAttempt); markErr != nil {
			return errors.Join(dispatchErr, markErr)
		}
		return dispatchErr
	}
	return w.repo.MarkProcessed(recordCtx, job.ID)
}
// dispatch routes a job to the directory client call that matches its
// resource type and action.
//
// Dry-run jobs, unknown resource types and actions without a handler are
// no-ops that return nil. Payload decoding errors and client errors are
// returned unchanged.
func (w *WorksmobileRelayWorker) dispatch(ctx context.Context, job domain.WorksmobileOutbox) error {
	if job.Action == domain.WorksmobileActionDryRun {
		return nil
	}

	if job.ResourceType == domain.WorksmobileResourceOrgUnit {
		switch job.Action {
		case domain.WorksmobileActionDelete:
			return w.client.DeleteOrgUnit(ctx, stringValue(job.Payload["worksmobileId"]))
		case domain.WorksmobileActionUpsert:
			var req WorksmobileOrgUnitPayload
			if err := decodeWorksmobileRequest(job.Payload, &req); err != nil {
				return err
			}
			return w.client.UpsertOrgUnit(ctx, req, stringValue(job.Payload["matchLocalPart"]))
		}
		return nil
	}

	if job.ResourceType != domain.WorksmobileResourceUser {
		return nil
	}

	switch job.Action {
	case domain.WorksmobileActionUpsert:
		var req WorksmobileUserPayload
		if err := decodeWorksmobileRequest(job.Payload, &req); err != nil {
			return err
		}
		// Alias emails are taken out of the upsert request and registered
		// one by one after the user itself has been upserted.
		aliases := append([]string(nil), req.AliasEmails...)
		req.AliasEmails = nil
		if err := w.client.UpsertUser(ctx, req); err != nil {
			return err
		}
		for i := range aliases {
			if err := w.client.AddUserAliasEmail(ctx, req.Email, aliases[i]); err != nil {
				return err
			}
		}
		if stringValue(job.Payload["baronStatus"]) != domain.UserStatusActive {
			return nil
		}
		return w.client.SetUserActive(ctx, worksmobileOutboxUserIdentifier(job), true)

	case domain.WorksmobileActionDelete:
		return w.client.DeleteUser(ctx, worksmobileOutboxUserIdentifier(job))

	case domain.WorksmobileActionSuspend:
		return w.client.SetUserActive(ctx, worksmobileOutboxUserIdentifier(job), false)

	case domain.WorksmobileActionPasswordReset:
		var req WorksmobilePasswordResetPayload
		if err := decodeWorksmobileRequest(job.Payload, &req); err != nil {
			return err
		}
		// Prefer the email carried in the request; fall back to the
		// identifier stored on the outbox payload when it is blank.
		target := strings.TrimSpace(req.Email)
		if target == "" {
			target = worksmobileOutboxUserIdentifier(job)
		}
		return w.client.ResetUserPassword(ctx, target, req.PasswordConfig.Password)
	}
	return nil
}
// sortWorksmobileReadyJobs returns a sorted copy of jobs; the input slice is
// not modified.
//
// Jobs are ordered by relay order class first, then by org-unit depth
// (shallower first), and finally by creation time (older first). The sort is
// stable, so jobs that compare equal keep their incoming order.
func sortWorksmobileReadyJobs(jobs []domain.WorksmobileOutbox) []domain.WorksmobileOutbox {
	ordered := append([]domain.WorksmobileOutbox(nil), jobs...)
	depths := worksmobileOrgUnitDepths(ordered)

	less := func(a, b int) bool {
		left, right := ordered[a], ordered[b]

		if lc, rc := worksmobileRelayOrderClass(left), worksmobileRelayOrderClass(right); lc != rc {
			return lc < rc
		}
		// Jobs without a recorded depth read as depth 0 from the map.
		if ld, rd := depths[left.ID], depths[right.ID]; ld != rd {
			return ld < rd
		}
		return left.CreatedAt.Before(right.CreatedAt)
	}

	sort.SliceStable(ordered, less)
	return ordered
}
// worksmobileRelayOrderClass returns the coarse ordering bucket of a job:
// 0 for org-unit upserts, 1 for any user job, and 2 for everything else.
func worksmobileRelayOrderClass(job domain.WorksmobileOutbox) int {
	switch {
	case job.ResourceType == domain.WorksmobileResourceOrgUnit && job.Action == domain.WorksmobileActionUpsert:
		return 0
	case job.ResourceType == domain.WorksmobileResourceUser:
		return 1
	default:
		return 2
	}
}
// worksmobileOrgUnitDepths computes, for every org-unit upsert job in jobs
// that carries an external key, how many ancestors of that org unit are also
// present in the batch. The result is keyed by job ID.
//
// An org unit whose parent is not part of the batch (or that has no
// external-key parent) has depth 0. When several jobs share an external key,
// only the last one is recorded. A parent cycle is cut at the point where it
// is detected, which contributes depth 0 from there.
func worksmobileOrgUnitDepths(jobs []domain.WorksmobileOutbox) map[string]int {
	type node struct {
		jobID  string
		parent string
	}

	nodes := map[string]node{}
	for i := range jobs {
		key, parent := worksmobileOrgUnitExternalKeys(jobs[i])
		if key != "" {
			nodes[key] = node{jobID: jobs[i].ID, parent: parent}
		}
	}

	// memo caches the depth per external key across all lookups.
	memo := map[string]int{}

	var resolve func(key string, visiting map[string]bool) int
	resolve = func(key string, visiting map[string]bool) int {
		if cached, hit := memo[key]; hit {
			return cached
		}

		level := 0
		// Only walk upwards for a known node with a parent that is not
		// already on the current path; anything else stays at depth 0.
		if current, known := nodes[key]; known && current.parent != "" && !visiting[key] {
			visiting[key] = true
			level = resolve(current.parent, visiting) + 1
			delete(visiting, key)
		}

		memo[key] = level
		return level
	}

	depths := map[string]int{}
	for key, current := range nodes {
		depths[current.jobID] = resolve(key, map[string]bool{})
	}
	return depths
}
// worksmobileOrgUnitExternalKeys extracts the external key of an org-unit
// upsert job and the external key of its parent.
//
// Both results are empty for any other kind of job or when the request
// payload cannot be decoded. The parent key is only reported when the parent
// reference uses the "externalKey:" prefix; otherwise it is empty. Surrounding
// whitespace is trimmed from both keys.
func worksmobileOrgUnitExternalKeys(job domain.WorksmobileOutbox) (string, string) {
	isOrgUnitUpsert := job.ResourceType == domain.WorksmobileResourceOrgUnit &&
		job.Action == domain.WorksmobileActionUpsert
	if !isOrgUnitUpsert {
		return "", ""
	}

	var req WorksmobileOrgUnitPayload
	if decodeWorksmobileRequest(job.Payload, &req) != nil {
		return "", ""
	}

	parent := ""
	if rest, ok := strings.CutPrefix(strings.TrimSpace(req.ParentOrgUnitID), "externalKey:"); ok {
		parent = strings.TrimSpace(rest)
	}
	return strings.TrimSpace(req.OrgUnitExternalKey), parent
}
// worksmobileOutboxUserIdentifier returns the identifier used to address the
// user of a job: the payload's "loginEmail" when it is non-empty, otherwise
// the payload's "userExternalKey".
func worksmobileOutboxUserIdentifier(job domain.WorksmobileOutbox) string {
	if loginEmail := stringValue(job.Payload["loginEmail"]); loginEmail != "" {
		return loginEmail
	}
	return stringValue(job.Payload["userExternalKey"])
}
// decodeWorksmobileRequest re-encodes the "request" entry of payload as JSON
// and decodes it into target, rejecting any field that target does not
// declare.
//
// It returns an error when the entry is missing or nil, when it cannot be
// marshalled, or when decoding into target fails.
func decodeWorksmobileRequest(payload domain.JSONMap, target any) error {
	request := payload["request"]
	if request == nil {
		return errors.New("worksmobile request payload is missing")
	}

	encoded, err := json.Marshal(request)
	if err == nil {
		strict := json.NewDecoder(strings.NewReader(string(encoded)))
		strict.DisallowUnknownFields()
		err = strict.Decode(target)
	}
	return err
}
// worksmobileRetryDelay returns the back-off before the next attempt:
// 2^retryCount minutes, with retryCount clamped to the range [0, 5], so the
// delay is between 1 and 32 minutes.
func worksmobileRetryDelay(retryCount int) time.Duration {
	exponent := retryCount
	switch {
	case exponent < 0:
		exponent = 0
	case exponent > 5:
		exponent = 5
	}
	// Shifting one minute left by the exponent doubles it that many times.
	return time.Minute << exponent
}