forked from baron/baron-sso
92 lines
2.7 KiB
Go
92 lines
2.7 KiB
Go
package repository
|
|
|
|
import (
|
|
"baron-sso-backend/internal/domain"
|
|
"context"
|
|
"time"
|
|
|
|
"gorm.io/gorm"
|
|
"gorm.io/gorm/clause"
|
|
)
|
|
|
|
type RPUsageOutboxRepository interface {
|
|
Create(ctx context.Context, event *domain.RPUsageEvent) error
|
|
ListReady(ctx context.Context, limit int) ([]domain.RPUsageEvent, error)
|
|
MarkProcessing(ctx context.Context, id string) error
|
|
MarkProcessed(ctx context.Context, id string) error
|
|
MarkFailed(ctx context.Context, id string, message string, nextAttemptAt time.Time) error
|
|
}
|
|
|
|
type rpUsageOutboxRepository struct {
|
|
db *gorm.DB
|
|
}
|
|
|
|
func NewRPUsageOutboxRepository(db *gorm.DB) RPUsageOutboxRepository {
|
|
return &rpUsageOutboxRepository{db: db}
|
|
}
|
|
|
|
func (r *rpUsageOutboxRepository) Create(ctx context.Context, event *domain.RPUsageEvent) error {
|
|
if event.Payload == nil {
|
|
event.Payload = domain.JSONMap{}
|
|
}
|
|
if event.Status == "" {
|
|
event.Status = domain.RPUsageOutboxStatusPending
|
|
}
|
|
if event.OccurredAt.IsZero() {
|
|
event.OccurredAt = time.Now()
|
|
}
|
|
return r.db.WithContext(ctx).Clauses(clause.OnConflict{
|
|
Columns: []clause.Column{{Name: "dedupe_key"}},
|
|
DoNothing: true,
|
|
}).Create(event).Error
|
|
}
|
|
|
|
func (r *rpUsageOutboxRepository) ListReady(ctx context.Context, limit int) ([]domain.RPUsageEvent, error) {
|
|
if limit <= 0 || limit > 100 {
|
|
limit = 20
|
|
}
|
|
var rows []domain.RPUsageEvent
|
|
err := r.db.WithContext(ctx).
|
|
Where("status = ? AND (next_attempt_at IS NULL OR next_attempt_at <= ?)", domain.RPUsageOutboxStatusPending, time.Now()).
|
|
Order("occurred_at asc, created_at asc").
|
|
Limit(limit).
|
|
Find(&rows).Error
|
|
return rows, err
|
|
}
|
|
|
|
func (r *rpUsageOutboxRepository) MarkProcessing(ctx context.Context, id string) error {
|
|
return r.db.WithContext(ctx).
|
|
Model(&domain.RPUsageEvent{}).
|
|
Where("id = ? AND status = ?", id, domain.RPUsageOutboxStatusPending).
|
|
Updates(map[string]any{
|
|
"status": domain.RPUsageOutboxStatusProcessing,
|
|
"updated_at": time.Now(),
|
|
}).Error
|
|
}
|
|
|
|
func (r *rpUsageOutboxRepository) MarkProcessed(ctx context.Context, id string) error {
|
|
now := time.Now()
|
|
return r.db.WithContext(ctx).
|
|
Model(&domain.RPUsageEvent{}).
|
|
Where("id = ?", id).
|
|
Updates(map[string]any{
|
|
"status": domain.RPUsageOutboxStatusProcessed,
|
|
"last_error": "",
|
|
"processed_at": &now,
|
|
"updated_at": now,
|
|
}).Error
|
|
}
|
|
|
|
func (r *rpUsageOutboxRepository) MarkFailed(ctx context.Context, id string, message string, nextAttemptAt time.Time) error {
|
|
return r.db.WithContext(ctx).
|
|
Model(&domain.RPUsageEvent{}).
|
|
Where("id = ?", id).
|
|
Updates(map[string]any{
|
|
"status": domain.RPUsageOutboxStatusFailed,
|
|
"retry_count": gorm.Expr("retry_count + 1"),
|
|
"last_error": message,
|
|
"next_attempt_at": &nextAttemptAt,
|
|
"updated_at": time.Now(),
|
|
}).Error
|
|
}
|