첫 커밋: 로컬 프로젝트 업로드
This commit is contained in:
78
baron-sso/backend/internal/service/keto_relay_worker.go
Normal file
78
baron-sso/backend/internal/service/keto_relay_worker.go
Normal file
@@ -0,0 +1,78 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"baron-sso-backend/internal/domain"
|
||||
"baron-sso-backend/internal/repository"
|
||||
"context"
|
||||
"log/slog"
|
||||
"time"
|
||||
)
|
||||
|
||||
type KetoRelayWorker interface {
|
||||
Start(ctx context.Context)
|
||||
}
|
||||
|
||||
type ketoRelayWorker struct {
|
||||
outboxRepo repository.KetoOutboxRepository
|
||||
ketoService KetoService
|
||||
interval time.Duration
|
||||
maxRetries int
|
||||
}
|
||||
|
||||
func NewKetoRelayWorker(outboxRepo repository.KetoOutboxRepository, ketoService KetoService) KetoRelayWorker {
|
||||
return &ketoRelayWorker{
|
||||
outboxRepo: outboxRepo,
|
||||
ketoService: ketoService,
|
||||
interval: 5 * time.Second, // Poll every 5 seconds
|
||||
maxRetries: 5,
|
||||
}
|
||||
}
|
||||
|
||||
func (w *ketoRelayWorker) Start(ctx context.Context) {
|
||||
slog.Info("[KetoRelayWorker] Starting worker...")
|
||||
ticker := time.NewTicker(w.interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
slog.Info("[KetoRelayWorker] Stopping worker...")
|
||||
return
|
||||
case <-ticker.C:
|
||||
w.processEntries(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *ketoRelayWorker) processEntries(ctx context.Context) {
|
||||
entries, err := w.outboxRepo.FindPending(ctx, 50) // Process up to 50 at once
|
||||
if err != nil {
|
||||
slog.Error("[KetoRelayWorker] Failed to fetch pending entries", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, entry := range entries {
|
||||
w.processEntry(ctx, entry)
|
||||
}
|
||||
}
|
||||
|
||||
func (w *ketoRelayWorker) processEntry(ctx context.Context, entry domain.KetoOutbox) {
|
||||
var err error
|
||||
if entry.Action == domain.KetoOutboxActionCreate {
|
||||
err = w.ketoService.CreateRelation(ctx, entry.Namespace, entry.Object, entry.Relation, entry.Subject)
|
||||
} else if entry.Action == domain.KetoOutboxActionDelete {
|
||||
err = w.ketoService.DeleteRelation(ctx, entry.Namespace, entry.Object, entry.Relation, entry.Subject)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
slog.Error("[KetoRelayWorker] Failed to process entry", "id", entry.ID, "error", err)
|
||||
newRetryCount := entry.RetryCount + 1
|
||||
status := domain.KetoOutboxStatusPending
|
||||
if newRetryCount >= w.maxRetries {
|
||||
status = domain.KetoOutboxStatusFailed
|
||||
}
|
||||
_ = w.outboxRepo.UpdateStatus(ctx, entry.ID, status, newRetryCount, err.Error())
|
||||
} else {
|
||||
_ = w.outboxRepo.MarkProcessed(ctx, entry.ID)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user