diff --git a/internal/notif/dispatcher.go b/internal/notif/dispatcher.go index 8d4846c..27e1544 100644 --- a/internal/notif/dispatcher.go +++ b/internal/notif/dispatcher.go @@ -1,10 +1,12 @@ package notif import ( + "math" + "math/rand/v2" "time" "github.com/rs/zerolog" - "github.com/yusing/go-proxy/internal/gperr" + "github.com/rs/zerolog/log" "github.com/yusing/go-proxy/internal/task" F "github.com/yusing/go-proxy/internal/utils/functional" ) @@ -14,7 +16,7 @@ type ( task *task.Task providers F.Set[Provider] logCh chan *LogMessage - retryCh chan *RetryMessage + retryMsg F.Set[*RetryMessage] retryTicker *time.Ticker } LogMessage struct { @@ -23,32 +25,22 @@ type ( Body LogBody Color Color } - RetryMessage struct { - Message *LogMessage - Trials int - Provider Provider - } ) var dispatcher *Dispatcher -const retryInterval = 5 * time.Second - -var maxRetries = map[zerolog.Level]int{ - zerolog.DebugLevel: 1, - zerolog.InfoLevel: 1, - zerolog.WarnLevel: 3, - zerolog.ErrorLevel: 5, - zerolog.FatalLevel: 10, - zerolog.PanicLevel: 10, -} +const ( + retryInterval = time.Second + maxBackoffDelay = 5 * time.Minute + backoffMultiplier = 2.0 +) func StartNotifDispatcher(parent task.Parent) *Dispatcher { dispatcher = &Dispatcher{ task: parent.Subtask("notification", true), providers: F.NewSet[Provider](), - logCh: make(chan *LogMessage), - retryCh: make(chan *RetryMessage, 100), + logCh: make(chan *LogMessage, 100), + retryMsg: F.NewSet[*RetryMessage](), retryTicker: time.NewTicker(retryInterval), } go dispatcher.start() @@ -73,11 +65,10 @@ func (disp *Dispatcher) RegisterProvider(cfg *NotificationConfig) { func (disp *Dispatcher) start() { defer func() { - dispatcher = nil disp.providers.Clear() close(disp.logCh) - close(disp.retryCh) disp.task.Finish(nil) + dispatcher = nil }() for { @@ -90,22 +81,7 @@ func (disp *Dispatcher) start() { } go disp.dispatch(msg) case <-disp.retryTicker.C: - if len(disp.retryCh) == 0 { - continue - } - var msgs []*RetryMessage - done := false - for !done { - select { - case msg := <-disp.retryCh: - msgs = append(msgs, msg) - default: - done = true - } - } - if err := disp.retry(msgs); err != nil { - gperr.LogError("notification retry failed", err) - } + disp.processRetries() } } } @@ -114,34 +90,100 @@ func (disp *Dispatcher) dispatch(msg *LogMessage) { task := disp.task.Subtask("dispatcher", true) defer task.Finish("notif dispatched") + l := log.With(). + Str("level", msg.Level.String()). + Str("title", msg.Title).Logger() + disp.providers.RangeAllParallel(func(p Provider) { if err := msg.notify(task.Context(), p); err != nil { - disp.retryCh <- &RetryMessage{ - Message: msg, - Trials: 0, - Provider: p, + msg := &RetryMessage{ + Message: msg, + Trials: 0, + Provider: p, + NextRetry: time.Now().Add(calculateBackoffDelay(0)), } + disp.retryMsg.Add(msg) + l.Debug().Err(err).EmbedObject(msg).Msg("notification failed, scheduling retry") + } else { + l.Debug().Str("provider", p.GetName()).Msg("notification sent successfully") } }) } -func (disp *Dispatcher) retry(messages []*RetryMessage) error { +func (disp *Dispatcher) processRetries() { + if disp.retryMsg.Size() == 0 { + return + } + + now := time.Now() + + readyMessages := make([]*RetryMessage, 0) + for msg := range disp.retryMsg.Range { + if now.After(msg.NextRetry) { + readyMessages = append(readyMessages, msg) + disp.retryMsg.Remove(msg) + } + } + + disp.retry(readyMessages) +} + +func (disp *Dispatcher) retry(messages []*RetryMessage) { + if len(messages) == 0 { + return + } + task := disp.task.Subtask("retry", true) defer task.Finish("notif retried") - errs := gperr.NewBuilder("notification failure") + successCount := 0 + failureCount := 0 + for _, msg := range messages { + maxTrials := maxRetries[msg.Message.Level] + log.Debug().EmbedObject(msg).Msg("attempting notification retry") + err := msg.Message.notify(task.Context(), msg.Provider) if err == nil { + msg.NextRetry = time.Time{} + successCount++ + log.Debug().EmbedObject(msg).Msg("notification retry succeeded") continue } - if msg.Trials > maxRetries[msg.Message.Level] { - errs.Addf("notification provider %s failed after %d trials", msg.Provider.GetName(), msg.Trials) - errs.Add(err) - continue - } + msg.Trials++ - disp.retryCh <- msg + failureCount++ + + if msg.Trials >= maxTrials { + log.Warn().Err(err).EmbedObject(msg).Msg("notification permanently failed after max retries") + continue + } + + // Schedule next retry with exponential backoff + msg.NextRetry = time.Now().Add(calculateBackoffDelay(msg.Trials)) + disp.retryMsg.Add(msg) + + log.Debug().EmbedObject(msg).Msg("notification retry failed, scheduled for later") } - return errs.Error() + + log.Info(). + Int("total", len(messages)). + Int("successes", successCount). + Int("failures", failureCount). + Msg("notification retry batch completed") +} + +// calculateBackoffDelay implements exponential backoff with jitter. +func calculateBackoffDelay(trials int) time.Duration { + if trials == 0 { + return retryInterval + } + + // Exponential backoff: retryInterval * (backoffMultiplier ^ trials) + delay := min(float64(retryInterval)*math.Pow(backoffMultiplier, float64(trials)), float64(maxBackoffDelay)) + + // Add 20% jitter to prevent thundering herd + //nolint:gosec + jitter := delay * 0.2 * (rand.Float64() - 0.5) // -10% to +10% + return time.Duration(delay + jitter) } diff --git a/internal/notif/providers.go b/internal/notif/providers.go index be7ba9f..9f52a0b 100644 --- a/internal/notif/providers.go +++ b/internal/notif/providers.go @@ -8,7 +8,6 @@ import ( "net/http" "time" - "github.com/rs/zerolog/log" "github.com/yusing/go-proxy/internal/gperr" "github.com/yusing/go-proxy/internal/serialization" ) @@ -72,13 +71,6 @@ func (msg *LogMessage) notify(ctx context.Context, provider Provider) error { switch resp.StatusCode { case http.StatusOK, http.StatusCreated, http.StatusAccepted, http.StatusNoContent: - body, _ := io.ReadAll(resp.Body) - log.Debug(). - Str("provider", provider.GetName()). - Str("url", provider.GetURL()). - Str("status", resp.Status). - RawJSON("resp_body", body). - Msg("notification sent") return nil default: return fmt.Errorf("http status %d: %w", resp.StatusCode, provider.fmtError(resp.Body)) diff --git a/internal/notif/retry_message.go b/internal/notif/retry_message.go new file mode 100644 index 0000000..a743945 --- /dev/null +++ b/internal/notif/retry_message.go @@ -0,0 +1,33 @@ +package notif + +import ( + "time" + + "github.com/rs/zerolog" +) + +type RetryMessage struct { + Message *LogMessage + Trials int + Provider Provider + NextRetry time.Time +} + +var maxRetries = map[zerolog.Level]int{ + zerolog.DebugLevel: 1, + zerolog.InfoLevel: 1, + zerolog.WarnLevel: 3, + zerolog.ErrorLevel: 5, + zerolog.FatalLevel: 10, + zerolog.PanicLevel: 10, +} + +func (msg *RetryMessage) MarshalZerologObject(e *zerolog.Event) { + e.Str("provider", msg.Provider.GetName()). + Int("trial", msg.Trials+1). + Str("title", msg.Message.Title) + if !msg.NextRetry.IsZero() { + e.Int("max_retries", maxRetries[msg.Message.Level]). + Time("next_retry", msg.NextRetry) + } +}