refactor(notif): enhance retry mechanism with exponential backoff and jitter; replace retry channel with a set for managing retry messages

yusing 2025-06-14 09:31:09 +08:00
parent 96b7c3fcec
commit 4abf61a421
3 changed files with 125 additions and 58 deletions
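
For a sense of what the new backoff produces, here is a minimal standalone sketch of the resulting delay schedule. The constants are copied from the diff below; the helper name backoffDelay and the main driver are illustrative only, not part of the commit.

package main

import (
	"fmt"
	"math"
	"math/rand/v2"
	"time"
)

const (
	retryInterval     = time.Second
	maxBackoffDelay   = 5 * time.Minute
	backoffMultiplier = 2.0
)

// backoffDelay mirrors calculateBackoffDelay from the diff: exponential
// growth capped at maxBackoffDelay, with roughly +/-10% jitter.
func backoffDelay(trials int) time.Duration {
	if trials == 0 {
		return retryInterval
	}
	delay := min(float64(retryInterval)*math.Pow(backoffMultiplier, float64(trials)), float64(maxBackoffDelay))
	jitter := delay * 0.2 * (rand.Float64() - 0.5)
	return time.Duration(delay + jitter)
}

func main() {
	// Expected progression before jitter: 1s, 2s, 4s, 8s, ... capped at 5m.
	for trials := 0; trials <= 10; trials++ {
		fmt.Printf("trial %2d -> %v\n", trials, backoffDelay(trials).Round(time.Millisecond))
	}
}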

View file

@@ -1,10 +1,12 @@
 package notif
 
 import (
+	"math"
+	"math/rand/v2"
 	"time"
 
 	"github.com/rs/zerolog"
-	"github.com/yusing/go-proxy/internal/gperr"
+	"github.com/rs/zerolog/log"
 	"github.com/yusing/go-proxy/internal/task"
 	F "github.com/yusing/go-proxy/internal/utils/functional"
 )
@@ -14,7 +16,7 @@ type (
 		task        *task.Task
 		providers   F.Set[Provider]
 		logCh       chan *LogMessage
-		retryCh     chan *RetryMessage
+		retryMsg    F.Set[*RetryMessage]
 		retryTicker *time.Ticker
 	}
 	LogMessage struct {
@@ -23,32 +25,22 @@ type (
 		Body  LogBody
 		Color Color
 	}
-	RetryMessage struct {
-		Message  *LogMessage
-		Trials   int
-		Provider Provider
-	}
 )
 
 var dispatcher *Dispatcher
 
-const retryInterval = 5 * time.Second
-var maxRetries = map[zerolog.Level]int{
-	zerolog.DebugLevel: 1,
-	zerolog.InfoLevel:  1,
-	zerolog.WarnLevel:  3,
-	zerolog.ErrorLevel: 5,
-	zerolog.FatalLevel: 10,
-	zerolog.PanicLevel: 10,
-}
+const (
+	retryInterval     = time.Second
+	maxBackoffDelay   = 5 * time.Minute
+	backoffMultiplier = 2.0
+)
 
 func StartNotifDispatcher(parent task.Parent) *Dispatcher {
 	dispatcher = &Dispatcher{
 		task:        parent.Subtask("notification", true),
 		providers:   F.NewSet[Provider](),
-		logCh:       make(chan *LogMessage),
-		retryCh:     make(chan *RetryMessage, 100),
+		logCh:       make(chan *LogMessage, 100),
+		retryMsg:    F.NewSet[*RetryMessage](),
 		retryTicker: time.NewTicker(retryInterval),
 	}
 	go dispatcher.start()
@@ -73,11 +65,10 @@ func (disp *Dispatcher) RegisterProvider(cfg *NotificationConfig) {
 func (disp *Dispatcher) start() {
 	defer func() {
+		dispatcher = nil
 		disp.providers.Clear()
 		close(disp.logCh)
-		close(disp.retryCh)
 		disp.task.Finish(nil)
-		dispatcher = nil
 	}()
 	for {
@@ -90,22 +81,7 @@ func (disp *Dispatcher) start() {
 			}
 			go disp.dispatch(msg)
 		case <-disp.retryTicker.C:
-			if len(disp.retryCh) == 0 {
-				continue
-			}
-			var msgs []*RetryMessage
-			done := false
-			for !done {
-				select {
-				case msg := <-disp.retryCh:
-					msgs = append(msgs, msg)
-				default:
-					done = true
-				}
-			}
-			if err := disp.retry(msgs); err != nil {
-				gperr.LogError("notification retry failed", err)
-			}
+			disp.processRetries()
 		}
 	}
 }
@@ -114,34 +90,100 @@ func (disp *Dispatcher) dispatch(msg *LogMessage) {
 	task := disp.task.Subtask("dispatcher", true)
 	defer task.Finish("notif dispatched")
+	l := log.With().
+		Str("level", msg.Level.String()).
+		Str("title", msg.Title).Logger()
 	disp.providers.RangeAllParallel(func(p Provider) {
 		if err := msg.notify(task.Context(), p); err != nil {
-			disp.retryCh <- &RetryMessage{
-				Message:  msg,
-				Trials:   0,
-				Provider: p,
+			msg := &RetryMessage{
+				Message:   msg,
+				Trials:    0,
+				Provider:  p,
+				NextRetry: time.Now().Add(calculateBackoffDelay(0)),
 			}
+			disp.retryMsg.Add(msg)
+			l.Debug().Err(err).EmbedObject(msg).Msg("notification failed, scheduling retry")
+		} else {
+			l.Debug().Str("provider", p.GetName()).Msg("notification sent successfully")
 		}
 	})
 }
 
-func (disp *Dispatcher) retry(messages []*RetryMessage) error {
+func (disp *Dispatcher) processRetries() {
+	if disp.retryMsg.Size() == 0 {
+		return
+	}
+	now := time.Now()
+	readyMessages := make([]*RetryMessage, 0)
+	for msg := range disp.retryMsg.Range {
+		if now.After(msg.NextRetry) {
+			readyMessages = append(readyMessages, msg)
+			disp.retryMsg.Remove(msg)
+		}
+	}
+	disp.retry(readyMessages)
+}
+
+func (disp *Dispatcher) retry(messages []*RetryMessage) {
+	if len(messages) == 0 {
+		return
+	}
 	task := disp.task.Subtask("retry", true)
 	defer task.Finish("notif retried")
-	errs := gperr.NewBuilder("notification failure")
+	successCount := 0
+	failureCount := 0
 	for _, msg := range messages {
+		maxTrials := maxRetries[msg.Message.Level]
+		log.Debug().EmbedObject(msg).Msg("attempting notification retry")
 		err := msg.Message.notify(task.Context(), msg.Provider)
 		if err == nil {
+			msg.NextRetry = time.Time{}
+			successCount++
+			log.Debug().EmbedObject(msg).Msg("notification retry succeeded")
 			continue
 		}
-		if msg.Trials > maxRetries[msg.Message.Level] {
-			errs.Addf("notification provider %s failed after %d trials", msg.Provider.GetName(), msg.Trials)
-			errs.Add(err)
-			continue
-		}
 		msg.Trials++
-		disp.retryCh <- msg
+		failureCount++
+		if msg.Trials >= maxTrials {
+			log.Warn().Err(err).EmbedObject(msg).Msg("notification permanently failed after max retries")
+			continue
+		}
+		// Schedule next retry with exponential backoff
+		msg.NextRetry = time.Now().Add(calculateBackoffDelay(msg.Trials))
+		disp.retryMsg.Add(msg)
+		log.Debug().EmbedObject(msg).Msg("notification retry failed, scheduled for later")
 	}
-	return errs.Error()
+	log.Info().
+		Int("total", len(messages)).
+		Int("successes", successCount).
+		Int("failures", failureCount).
+		Msg("notification retry batch completed")
 }
+
+// calculateBackoffDelay implements exponential backoff with jitter.
+func calculateBackoffDelay(trials int) time.Duration {
+	if trials == 0 {
+		return retryInterval
+	}
+	// Exponential backoff: retryInterval * (backoffMultiplier ^ trials)
+	delay := min(float64(retryInterval)*math.Pow(backoffMultiplier, float64(trials)), float64(maxBackoffDelay))
+	// Add 20% jitter to prevent thundering herd
+	//nolint:gosec
+	jitter := delay * 0.2 * (rand.Float64() - 0.5) // -10% to +10%
+	return time.Duration(delay + jitter)
+}
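
Stepping back from the diff: the redesign swaps "drain a retry channel on every tick" for "keep retry messages in a set and pick out the ones whose deadline has passed". A minimal, self-contained sketch of that pattern follows, using a plain map in place of F.Set and a string payload in place of a provider; every name in it is illustrative rather than the repo's API.

package main

import (
	"fmt"
	"time"
)

// retryEntry is an illustrative stand-in for RetryMessage.
type retryEntry struct {
	payload   string
	trials    int
	nextRetry time.Time
}

func main() {
	// Stand-in for F.Set[*RetryMessage].
	pending := map[*retryEntry]struct{}{}
	pending[&retryEntry{payload: "due soon", nextRetry: time.Now().Add(1 * time.Second)}] = struct{}{}
	pending[&retryEntry{payload: "due later", nextRetry: time.Now().Add(3 * time.Second)}] = struct{}{}

	ticker := time.NewTicker(time.Second) // mirrors retryInterval
	defer ticker.Stop()

	for tick := 0; tick < 4; tick++ {
		<-ticker.C
		now := time.Now()
		// Collect entries whose backoff deadline has passed and remove them
		// from the set; anything not yet due simply waits for a later tick.
		var due []*retryEntry
		for e := range pending {
			if now.After(e.nextRetry) {
				due = append(due, e)
				delete(pending, e)
			}
		}
		for _, e := range due {
			fmt.Printf("retrying %q (trial %d)\n", e.payload, e.trials+1)
			// On failure the real dispatcher bumps Trials, recomputes NextRetry
			// with exponential backoff, and re-adds the message to the set.
		}
	}
}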

View file

@@ -8,7 +8,6 @@ import (
 	"net/http"
 	"time"
 
-	"github.com/rs/zerolog/log"
 	"github.com/yusing/go-proxy/internal/gperr"
 	"github.com/yusing/go-proxy/internal/serialization"
 )
@@ -72,13 +71,6 @@ func (msg *LogMessage) notify(ctx context.Context, provider Provider) error {
 	switch resp.StatusCode {
 	case http.StatusOK, http.StatusCreated, http.StatusAccepted, http.StatusNoContent:
-		body, _ := io.ReadAll(resp.Body)
-		log.Debug().
-			Str("provider", provider.GetName()).
-			Str("url", provider.GetURL()).
-			Str("status", resp.Status).
-			RawJSON("resp_body", body).
-			Msg("notification sent")
 		return nil
 	default:
 		return fmt.Errorf("http status %d: %w", resp.StatusCode, provider.fmtError(resp.Body))

View file

@@ -0,0 +1,33 @@
+package notif
+
+import (
+	"time"
+
+	"github.com/rs/zerolog"
+)
+
+type RetryMessage struct {
+	Message   *LogMessage
+	Trials    int
+	Provider  Provider
+	NextRetry time.Time
+}
+
+var maxRetries = map[zerolog.Level]int{
+	zerolog.DebugLevel: 1,
+	zerolog.InfoLevel:  1,
+	zerolog.WarnLevel:  3,
+	zerolog.ErrorLevel: 5,
+	zerolog.FatalLevel: 10,
+	zerolog.PanicLevel: 10,
+}
+
+func (msg *RetryMessage) MarshalZerologObject(e *zerolog.Event) {
+	e.Str("provider", msg.Provider.GetName()).
+		Int("trial", msg.Trials+1).
+		Str("title", msg.Message.Title)
+	if !msg.NextRetry.IsZero() {
+		e.Int("max_retries", maxRetries[msg.Message.Level]).
+			Time("next_retry", msg.NextRetry)
+	}
+}