diff --git a/internal/notif/base.go b/internal/notif/base.go index da55939..62ac650 100644 --- a/internal/notif/base.go +++ b/internal/notif/base.go @@ -16,9 +16,16 @@ type ProviderBase struct { Format *LogFormat `json:"format"` } +type rawError []byte + +func (e rawError) Error() string { + return string(e) +} + var ( ErrMissingToken = gperr.New("token is required") ErrURLMissingScheme = gperr.New("url missing scheme, expect 'http://' or 'https://'") + ErrUnknownError = gperr.New("unknown error") ) // Validate implements the utils.CustomValidator interface. @@ -61,10 +68,10 @@ func (base *ProviderBase) SetHeaders(logMsg *LogMessage, headers http.Header) { // no-op by default } -func (base *ProviderBase) makeRespError(resp *http.Response) error { - body, err := io.ReadAll(resp.Body) - if err == nil { - return gperr.Errorf("%s status %d: %s", base.Name, resp.StatusCode, body) +func (base *ProviderBase) fmtError(respBody io.Reader) error { + body, err := io.ReadAll(respBody) + if err == nil && len(body) > 0 { + return rawError(body) } - return gperr.Errorf("%s status %d", base.Name, resp.StatusCode) + return ErrUnknownError } diff --git a/internal/notif/body.go b/internal/notif/body.go index cac661f..bbbdb82 100644 --- a/internal/notif/body.go +++ b/internal/notif/body.go @@ -55,6 +55,10 @@ func (f *LogFormat) Parse(format string) error { return nil } +func (f *FieldsBody) Add(name, value string) { + *f = append(*f, LogField{Name: name, Value: value}) +} + func (f FieldsBody) Format(format *LogFormat) ([]byte, error) { switch format { case LogFormatMarkdown: diff --git a/internal/notif/dispatcher.go b/internal/notif/dispatcher.go index 9e7cba9..56a13d9 100644 --- a/internal/notif/dispatcher.go +++ b/internal/notif/dispatcher.go @@ -1,18 +1,21 @@ package notif import ( + "time" + "github.com/rs/zerolog" "github.com/yusing/go-proxy/internal/gperr" - "github.com/yusing/go-proxy/internal/logging" "github.com/yusing/go-proxy/internal/task" F "github.com/yusing/go-proxy/internal/utils/functional" ) type ( Dispatcher struct { - task *task.Task - logCh chan *LogMessage - providers F.Set[Provider] + task *task.Task + providers F.Set[Provider] + logCh chan *LogMessage + retryCh chan *RetryMessage + retryTicker *time.Ticker } LogMessage struct { Level zerolog.Level @@ -20,17 +23,33 @@ type ( Body LogBody Color Color } + RetryMessage struct { + Message *LogMessage + Trials int + Provider Provider + } ) var dispatcher *Dispatcher -const dispatchErr = "notification dispatch error" +const retryInterval = 5 * time.Second + +var maxRetries = map[zerolog.Level]int{ + zerolog.DebugLevel: 1, + zerolog.InfoLevel: 1, + zerolog.WarnLevel: 3, + zerolog.ErrorLevel: 5, + zerolog.FatalLevel: 10, + zerolog.PanicLevel: 10, +} func StartNotifDispatcher(parent task.Parent) *Dispatcher { dispatcher = &Dispatcher{ - task: parent.Subtask("notification"), - logCh: make(chan *LogMessage), - providers: F.NewSet[Provider](), + task: parent.Subtask("notification"), + providers: F.NewSet[Provider](), + logCh: make(chan *LogMessage), + retryCh: make(chan *RetryMessage, 100), + retryTicker: time.NewTicker(retryInterval), } go dispatcher.start() return dispatcher @@ -48,10 +67,6 @@ func Notify(msg *LogMessage) { } } -func (f *FieldsBody) Add(name, value string) { - *f = append(*f, LogField{Name: name, Value: value}) -} - func (disp *Dispatcher) RegisterProvider(cfg *NotificationConfig) { disp.providers.Add(cfg.Provider) } @@ -61,6 +76,7 @@ func (disp *Dispatcher) start() { dispatcher = nil disp.providers.Clear() close(disp.logCh) + close(disp.retryCh) disp.task.Finish(nil) }() @@ -73,6 +89,23 @@ func (disp *Dispatcher) start() { return } go disp.dispatch(msg) + case <-disp.retryTicker.C: + if len(disp.retryCh) == 0 { + continue + } + var msgs []*RetryMessage + done := false + for !done { + select { + case msg := <-disp.retryCh: + msgs = append(msgs, msg) + default: + done = true + } + } + if err := disp.retry(msgs); err != nil { + gperr.LogError("notification retry failed", err) + } } } } @@ -81,15 +114,34 @@ func (disp *Dispatcher) dispatch(msg *LogMessage) { task := disp.task.Subtask("dispatcher") defer task.Finish("notif dispatched") - errs := gperr.NewBuilderWithConcurrency(dispatchErr) disp.providers.RangeAllParallel(func(p Provider) { - if err := notifyProvider(task.Context(), p, msg); err != nil { - errs.Add(gperr.PrependSubject(p.GetName(), err)) + if err := msg.notify(task.Context(), p); err != nil { + disp.retryCh <- &RetryMessage{ + Message: msg, + Trials: 0, + Provider: p, + } } }) - if errs.HasError() { - gperr.LogError(errs.About(), errs.Error()) - } else { - logging.Debug().Str("title", msg.Title).Msgf("dispatched notif") - } +} + +func (disp *Dispatcher) retry(messages []*RetryMessage) error { + task := disp.task.Subtask("retry") + defer task.Finish("notif retried") + + errs := gperr.NewBuilder("notification failure") + for _, msg := range messages { + err := msg.Message.notify(task.Context(), msg.Provider) + if err == nil { + continue + } + if msg.Trials > maxRetries[msg.Message.Level] { + errs.Addf("notification provider %s failed after %d trials", msg.Provider.GetName(), msg.Trials) + errs.Add(err) + continue + } + msg.Trials++ + disp.retryCh <- msg + } + return errs.Error() } diff --git a/internal/notif/gotify.go b/internal/notif/gotify.go index c369304..e72a935 100644 --- a/internal/notif/gotify.go +++ b/internal/notif/gotify.go @@ -3,7 +3,7 @@ package notif import ( "encoding/json" "fmt" - "net/http" + "io" "github.com/gotify/server/v2/model" "github.com/rs/zerolog" @@ -62,12 +62,12 @@ func (client *GotifyClient) MarshalMessage(logMsg *LogMessage) ([]byte, error) { return data, nil } -// makeRespError implements Provider. -func (client *GotifyClient) makeRespError(resp *http.Response) error { +// fmtError implements Provider. +func (client *GotifyClient) fmtError(respBody io.Reader) error { var errm model.Error - err := json.NewDecoder(resp.Body).Decode(&errm) + err := json.NewDecoder(respBody).Decode(&errm) if err != nil { - return fmt.Errorf("%s status %d, but failed to decode err response: %w", client.Name, resp.StatusCode, err) + return fmt.Errorf("failed to decode err response: %w", err) } - return fmt.Errorf("%s status %d %s: %s", client.Name, resp.StatusCode, errm.Error, errm.ErrorDescription) + return fmt.Errorf("%s: %s", errm.Error, errm.ErrorDescription) } diff --git a/internal/notif/providers.go b/internal/notif/providers.go index 3032fb4..0d4c38a 100644 --- a/internal/notif/providers.go +++ b/internal/notif/providers.go @@ -3,11 +3,13 @@ package notif import ( "bytes" "context" + "fmt" + "io" "net/http" "time" "github.com/yusing/go-proxy/internal/gperr" - gphttp "github.com/yusing/go-proxy/internal/net/gphttp" + "github.com/yusing/go-proxy/internal/logging" "github.com/yusing/go-proxy/internal/utils" ) @@ -24,7 +26,7 @@ type ( MarshalMessage(logMsg *LogMessage) ([]byte, error) SetHeaders(logMsg *LogMessage, headers http.Header) - makeRespError(resp *http.Response) error + fmtError(respBody io.Reader) error } ProviderCreateFunc func(map[string]any) (Provider, gperr.Error) ProviderConfig map[string]any @@ -36,10 +38,10 @@ const ( ProviderWebhook = "webhook" ) -func notifyProvider(ctx context.Context, provider Provider, msg *LogMessage) error { +func (msg *LogMessage) notify(ctx context.Context, provider Provider) error { body, err := provider.MarshalMessage(msg) if err != nil { - return gperr.PrependSubject(provider.GetName(), err) + return err } ctx, cancel := context.WithTimeout(ctx, 2*time.Second) @@ -52,7 +54,7 @@ func notifyProvider(ctx context.Context, provider Provider, msg *LogMessage) err bytes.NewReader(body), ) if err != nil { - return gperr.PrependSubject(provider.GetName(), err) + return err } req.Header.Set("Content-Type", provider.GetMIMEType()) @@ -63,13 +65,22 @@ func notifyProvider(ctx context.Context, provider Provider, msg *LogMessage) err resp, err := http.DefaultClient.Do(req) if err != nil { - return gperr.PrependSubject(provider.GetName(), err) + return err } defer resp.Body.Close() - if !gphttp.IsSuccess(resp.StatusCode) { - return provider.makeRespError(resp) + switch resp.StatusCode { + case http.StatusOK, http.StatusCreated, http.StatusAccepted: + body, _ := io.ReadAll(resp.Body) + logging.Debug(). + Str("provider", provider.GetName()). + Str("url", provider.GetURL()). + Str("status", resp.Status). + RawJSON("resp_body", body). + Msg("notification sent") + return nil + default: + return fmt.Errorf("http status %d: %w", resp.StatusCode, provider.fmtError(resp.Body)) } - return nil } diff --git a/internal/notif/webhook.go b/internal/notif/webhook.go index f2343b8..a0c7d65 100644 --- a/internal/notif/webhook.go +++ b/internal/notif/webhook.go @@ -3,7 +3,6 @@ package notif import ( _ "embed" "encoding/json" - "fmt" "io" "net/http" "strings" @@ -88,16 +87,13 @@ func (webhook *Webhook) GetMIMEType() string { return webhook.MIMEType } -// makeRespError implements Provider. -func (webhook *Webhook) makeRespError(resp *http.Response) error { - body, err := io.ReadAll(resp.Body) - if err != nil { - return fmt.Errorf("%s status %d, failed to read body: %w", webhook.Name, resp.StatusCode, err) +// fmtError implements Provider. +func (webhook *Webhook) fmtError(respBody io.Reader) error { + body, err := io.ReadAll(respBody) + if err != nil || len(body) == 0 { + return ErrUnknownError } - if len(body) > 0 { - return fmt.Errorf("%s status %d: %s", webhook.Name, resp.StatusCode, body) - } - return fmt.Errorf("%s status %d", webhook.Name, resp.StatusCode) + return rawError(body) } func (webhook *Webhook) MarshalMessage(logMsg *LogMessage) ([]byte, error) { diff --git a/internal/utils/strutils/ansi/ansi.go b/internal/utils/strutils/ansi/ansi.go index 1f8315a..eb11f46 100644 --- a/internal/utils/strutils/ansi/ansi.go +++ b/internal/utils/strutils/ansi/ansi.go @@ -20,6 +20,26 @@ const ( HighlightWhite = BrightWhite + Bold ) +func Error(s string) string { + return WithANSI(s, HighlightRed) +} + +func Success(s string) string { + return WithANSI(s, HighlightGreen) +} + +func Warning(s string) string { + return WithANSI(s, HighlightYellow) +} + +func Info(s string) string { + return WithANSI(s, HighlightCyan) +} + +func WithANSI(s string, ansi string) string { + return ansi + s + Reset +} + func StripANSI(s string) string { return ansiRegexp.ReplaceAllString(s, "") } diff --git a/internal/utils/strutils/format.go b/internal/utils/strutils/format.go index a4f675d..45e21e7 100644 --- a/internal/utils/strutils/format.go +++ b/internal/utils/strutils/format.go @@ -219,7 +219,7 @@ func DoYouMean(s string) string { if s == "" { return "" } - return "Did you mean " + ansi.HighlightGreen + s + ansi.Reset + "?" + return "Did you mean " + ansi.Info(s) + "?" } func Pluralize(n int64) string { diff --git a/internal/watcher/events/event_queue.go b/internal/watcher/events/event_queue.go index 276a2d9..df907e7 100644 --- a/internal/watcher/events/event_queue.go +++ b/internal/watcher/events/event_queue.go @@ -56,9 +56,11 @@ func (e *EventQueue) Start(eventCh <-chan Event, errCh <-chan gperr.Error) { e.onFlush = func(events []Event) { defer func() { if err := recover(); err != nil { - e.onError(gperr.New("recovered panic in onFlush"). - Withf("%v", err). - Subject(e.task.Name())) + if err, ok := err.(error); ok { + e.onError(gperr.Wrap(err).Subject(e.task.Name())) + } else { + e.onError(gperr.New("recovered panic in onFlush").Withf("%v", err).Subject(e.task.Name())) + } if common.IsDebug { panic(string(debug.Stack())) }