package service

import (
	"context"
	"encoding/json"
	"errors"
	"log/slog"
	"strings"
	"time"

	"baron-sso-backend/internal/domain"
	"baron-sso-backend/internal/repository"
)

// WorksmobileRelayWorker polls the Worksmobile outbox and relays each ready
// job to the Worksmobile directory client.
type WorksmobileRelayWorker struct {
	repo   repository.WorksmobileOutboxRepository
	client WorksmobileDirectoryClient
	// interval is the delay between two polling ticks in Start.
	interval time.Duration
	// batchLimit is the limit passed to ListReady on every tick.
	batchLimit int
}

// NewWorksmobileRelayWorker returns a worker that polls every 3 seconds and
// requests at most 10 jobs per tick.
func NewWorksmobileRelayWorker(repo repository.WorksmobileOutboxRepository, client WorksmobileDirectoryClient) *WorksmobileRelayWorker {
	return &WorksmobileRelayWorker{
		repo:       repo,
		client:     client,
		interval:   3 * time.Second,
		batchLimit: 10,
	}
}

// Start runs the relay loop until ctx is done. It processes one batch
// immediately and then one batch per tick. When the repository or the client
// is nil the worker logs a warning and returns without doing anything.
func (w *WorksmobileRelayWorker) Start(ctx context.Context) {
	if w.repo == nil || w.client == nil {
		slog.Warn("Worksmobile relay worker disabled")
		return
	}

	ticker := time.NewTicker(w.interval)
	defer ticker.Stop()

	for {
		// Cancellation is the normal shutdown path, so it is not reported.
		if err := w.ProcessOnce(ctx); err != nil && !errors.Is(err, context.Canceled) {
			slog.Warn("Worksmobile relay tick failed", "error", err)
		}

		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
	}
}

// ProcessOnce lists up to batchLimit ready jobs and processes them in order.
//
// A failure of an individual job is logged and does not abort the batch. The
// returned error is non-nil only when listing the jobs fails or when ctx is
// done before the batch has been fully processed.
func (w *WorksmobileRelayWorker) ProcessOnce(ctx context.Context) error {
	jobs, err := w.repo.ListReady(ctx, w.batchLimit)
	if err != nil {
		return err
	}

	for _, job := range jobs {
		// Stop as soon as the context is done instead of attempting (and
		// logging a failure for) every remaining job of the batch.
		if err := ctx.Err(); err != nil {
			return err
		}
		if err := w.processJob(ctx, job); err != nil {
			slog.Warn("Worksmobile relay job failed",
				"jobID", job.ID,
				"resourceType", job.ResourceType,
				"resourceID", job.ResourceID,
				"error", err)
		}
	}
	return nil
}

// processJob marks the job as processing, dispatches it, and records the
// outcome in the repository.
//
// When dispatch fails the job is marked failed with a next attempt time
// derived from worksmobileRetryDelay(job.RetryCount) and the dispatch error is
// returned. If recording the failure also fails, both errors are returned
// joined so the bookkeeping problem is not silently lost.
func (w *WorksmobileRelayWorker) processJob(ctx context.Context, job domain.WorksmobileOutbox) error {
	if err := w.repo.MarkProcessing(ctx, job.ID); err != nil {
		return err
	}

	if dispatchErr := w.dispatch(ctx, job); dispatchErr != nil {
		nextAttempt := time.Now().Add(worksmobileRetryDelay(job.RetryCount))
		if markErr := w.repo.MarkFailed(ctx, job.ID, dispatchErr.Error(), nextAttempt); markErr != nil {
			return errors.Join(dispatchErr, markErr)
		}
		return dispatchErr
	}

	return w.repo.MarkProcessed(ctx, job.ID)
}

// dispatch performs the directory call that corresponds to the job.
//
//   - Dry-run jobs succeed without calling the client.
//   - Org unit upserts call CreateOrgUnit; an HTTP 409 answer is treated as
//     success. Other org unit actions are ignored.
//   - User upserts call UpsertUser. User deletes call DeleteUser with the
//     payload's "loginEmail", falling back to "userExternalKey". Other user
//     actions are ignored.
//   - Jobs for any other resource type are ignored.
func (w *WorksmobileRelayWorker) dispatch(ctx context.Context, job domain.WorksmobileOutbox) error {
	if job.Action == domain.WorksmobileActionDryRun {
		return nil
	}

	switch job.ResourceType {
	case domain.WorksmobileResourceOrgUnit:
		if job.Action != domain.WorksmobileActionUpsert {
			return nil
		}
		var payload WorksmobileOrgUnitPayload
		if err := decodeWorksmobileRequest(job.Payload, &payload); err != nil {
			return err
		}
		err := w.client.CreateOrgUnit(ctx, payload)
		// errors.As also recognises the HTTP error when the client wraps it.
		const statusConflict = 409
		var apiErr WorksmobileHTTPError
		if errors.As(err, &apiErr) && apiErr.StatusCode == statusConflict {
			return nil
		}
		return err

	case domain.WorksmobileResourceUser:
		switch job.Action {
		case domain.WorksmobileActionUpsert:
			var payload WorksmobileUserPayload
			if err := decodeWorksmobileRequest(job.Payload, &payload); err != nil {
				return err
			}
			return w.client.UpsertUser(ctx, payload)

		case domain.WorksmobileActionDelete:
			userID := stringValue(job.Payload["loginEmail"])
			if userID == "" {
				userID = stringValue(job.Payload["userExternalKey"])
			}
			// Never issue a delete without an identifier.
			if userID == "" {
				return errors.New("worksmobile delete payload has no loginEmail or userExternalKey")
			}
			return w.client.DeleteUser(ctx, userID)

		default:
			return nil
		}

	default:
		return nil
	}
}

// decodeWorksmobileRequest decodes payload["request"] into target by
// re-encoding it as JSON and decoding it strictly: unknown fields are
// rejected. It returns an error when the "request" entry is missing.
func decodeWorksmobileRequest(payload domain.JSONMap, target any) error {
	raw := payload["request"]
	if raw == nil {
		return errors.New("worksmobile request payload is missing")
	}

	data, err := json.Marshal(raw)
	if err != nil {
		return err
	}

	decoder := json.NewDecoder(strings.NewReader(string(data)))
	decoder.DisallowUnknownFields()
	return decoder.Decode(target)
}

func worksmobileRetryDelay(retryCount int) time.Duration { if retryCount < 0 { retryCount = 0 } if retryCount > 5 { retryCount = 5 } return time.Duration(1<