GoDoxy/internal/notif/dispatcher.go

147 lines
3 KiB
Go

package notif
import (
"time"
"github.com/rs/zerolog"
"github.com/yusing/go-proxy/internal/gperr"
"github.com/yusing/go-proxy/internal/task"
F "github.com/yusing/go-proxy/internal/utils/functional"
)
type (
Dispatcher struct {
task *task.Task
providers F.Set[Provider]
logCh chan *LogMessage
retryCh chan *RetryMessage
retryTicker *time.Ticker
}
LogMessage struct {
Level zerolog.Level
Title string
Body LogBody
Color Color
}
RetryMessage struct {
Message *LogMessage
Trials int
Provider Provider
}
)
var dispatcher *Dispatcher
const retryInterval = 5 * time.Second
var maxRetries = map[zerolog.Level]int{
zerolog.DebugLevel: 1,
zerolog.InfoLevel: 1,
zerolog.WarnLevel: 3,
zerolog.ErrorLevel: 5,
zerolog.FatalLevel: 10,
zerolog.PanicLevel: 10,
}
func StartNotifDispatcher(parent task.Parent) *Dispatcher {
dispatcher = &Dispatcher{
task: parent.Subtask("notification"),
providers: F.NewSet[Provider](),
logCh: make(chan *LogMessage),
retryCh: make(chan *RetryMessage, 100),
retryTicker: time.NewTicker(retryInterval),
}
go dispatcher.start()
return dispatcher
}
func Notify(msg *LogMessage) {
if dispatcher == nil {
return
}
select {
case <-dispatcher.task.Context().Done():
return
default:
dispatcher.logCh <- msg
}
}
func (disp *Dispatcher) RegisterProvider(cfg *NotificationConfig) {
disp.providers.Add(cfg.Provider)
}
func (disp *Dispatcher) start() {
defer func() {
dispatcher = nil
disp.providers.Clear()
close(disp.logCh)
close(disp.retryCh)
disp.task.Finish(nil)
}()
for {
select {
case <-disp.task.Context().Done():
return
case msg, ok := <-disp.logCh:
if !ok {
return
}
go disp.dispatch(msg)
case <-disp.retryTicker.C:
if len(disp.retryCh) == 0 {
continue
}
var msgs []*RetryMessage
done := false
for !done {
select {
case msg := <-disp.retryCh:
msgs = append(msgs, msg)
default:
done = true
}
}
if err := disp.retry(msgs); err != nil {
gperr.LogError("notification retry failed", err)
}
}
}
}
func (disp *Dispatcher) dispatch(msg *LogMessage) {
task := disp.task.Subtask("dispatcher")
defer task.Finish("notif dispatched")
disp.providers.RangeAllParallel(func(p Provider) {
if err := msg.notify(task.Context(), p); err != nil {
disp.retryCh <- &RetryMessage{
Message: msg,
Trials: 0,
Provider: p,
}
}
})
}
func (disp *Dispatcher) retry(messages []*RetryMessage) error {
task := disp.task.Subtask("retry")
defer task.Finish("notif retried")
errs := gperr.NewBuilder("notification failure")
for _, msg := range messages {
err := msg.Message.notify(task.Context(), msg.Provider)
if err == nil {
continue
}
if msg.Trials > maxRetries[msg.Message.Level] {
errs.Addf("notification provider %s failed after %d trials", msg.Provider.GetName(), msg.Trials)
errs.Add(err)
continue
}
msg.Trials++
disp.retryCh <- msg
}
return errs.Error()
}