forked from baron/baron-sso
138 lines
3.4 KiB
Go
138 lines
3.4 KiB
Go
package service
|
|
|
|
import (
|
|
"baron-sso-backend/internal/domain"
|
|
"baron-sso-backend/internal/repository"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"log/slog"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
type WorksmobileRelayWorker struct {
|
|
repo repository.WorksmobileOutboxRepository
|
|
client WorksmobileDirectoryClient
|
|
interval time.Duration
|
|
batchLimit int
|
|
}
|
|
|
|
func NewWorksmobileRelayWorker(repo repository.WorksmobileOutboxRepository, client WorksmobileDirectoryClient) *WorksmobileRelayWorker {
|
|
return &WorksmobileRelayWorker{
|
|
repo: repo,
|
|
client: client,
|
|
interval: 3 * time.Second,
|
|
batchLimit: 10,
|
|
}
|
|
}
|
|
|
|
func (w *WorksmobileRelayWorker) Start(ctx context.Context) {
|
|
if w.repo == nil || w.client == nil {
|
|
slog.Warn("Worksmobile relay worker disabled")
|
|
return
|
|
}
|
|
ticker := time.NewTicker(w.interval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
if err := w.ProcessOnce(ctx); err != nil && !errors.Is(err, context.Canceled) {
|
|
slog.Warn("Worksmobile relay tick failed", "error", err)
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
}
|
|
}
|
|
}
|
|
|
|
func (w *WorksmobileRelayWorker) ProcessOnce(ctx context.Context) error {
|
|
jobs, err := w.repo.ListReady(ctx, w.batchLimit)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, job := range jobs {
|
|
if err := w.processJob(ctx, job); err != nil {
|
|
slog.Warn("Worksmobile relay job failed", "jobID", job.ID, "resourceType", job.ResourceType, "resourceID", job.ResourceID, "error", err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (w *WorksmobileRelayWorker) processJob(ctx context.Context, job domain.WorksmobileOutbox) error {
|
|
if err := w.repo.MarkProcessing(ctx, job.ID); err != nil {
|
|
return err
|
|
}
|
|
|
|
err := w.dispatch(ctx, job)
|
|
if err != nil {
|
|
nextAttempt := time.Now().Add(worksmobileRetryDelay(job.RetryCount))
|
|
_ = w.repo.MarkFailed(ctx, job.ID, err.Error(), nextAttempt)
|
|
return err
|
|
}
|
|
return w.repo.MarkProcessed(ctx, job.ID)
|
|
}
|
|
|
|
func (w *WorksmobileRelayWorker) dispatch(ctx context.Context, job domain.WorksmobileOutbox) error {
|
|
if job.Action == domain.WorksmobileActionDryRun {
|
|
return nil
|
|
}
|
|
|
|
switch job.ResourceType {
|
|
case domain.WorksmobileResourceOrgUnit:
|
|
if job.Action != domain.WorksmobileActionUpsert {
|
|
return nil
|
|
}
|
|
var payload WorksmobileOrgUnitPayload
|
|
if err := decodeWorksmobileRequest(job.Payload, &payload); err != nil {
|
|
return err
|
|
}
|
|
return w.client.UpsertOrgUnit(ctx, payload, stringValue(job.Payload["matchLocalPart"]))
|
|
case domain.WorksmobileResourceUser:
|
|
switch job.Action {
|
|
case domain.WorksmobileActionUpsert:
|
|
var payload WorksmobileUserPayload
|
|
if err := decodeWorksmobileRequest(job.Payload, &payload); err != nil {
|
|
return err
|
|
}
|
|
return w.client.UpsertUser(ctx, payload)
|
|
case domain.WorksmobileActionDelete:
|
|
userID := stringValue(job.Payload["loginEmail"])
|
|
if userID == "" {
|
|
userID = stringValue(job.Payload["userExternalKey"])
|
|
}
|
|
return w.client.DeleteUser(ctx, userID)
|
|
default:
|
|
return nil
|
|
}
|
|
default:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func decodeWorksmobileRequest(payload domain.JSONMap, target any) error {
|
|
raw := payload["request"]
|
|
if raw == nil {
|
|
return errors.New("worksmobile request payload is missing")
|
|
}
|
|
data, err := json.Marshal(raw)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
decoder := json.NewDecoder(strings.NewReader(string(data)))
|
|
decoder.DisallowUnknownFields()
|
|
return decoder.Decode(target)
|
|
}
|
|
|
|
func worksmobileRetryDelay(retryCount int) time.Duration {
|
|
if retryCount < 0 {
|
|
retryCount = 0
|
|
}
|
|
if retryCount > 5 {
|
|
retryCount = 5
|
|
}
|
|
return time.Duration(1<<retryCount) * time.Minute
|
|
}
|