GoDoxy/internal/notif/dispatcher.go
Yuzerion 78900772bb
Feat/ntfy (#57)
* implement ntfy notification

* fix notification fields order

* fix schema for ntfy

---------

Co-authored-by: yusing <yusing@6uo.me>
2025-02-01 13:07:44 +08:00

114 lines
2.4 KiB
Go

package notif
import (
"github.com/rs/zerolog"
E "github.com/yusing/go-proxy/internal/error"
"github.com/yusing/go-proxy/internal/logging"
"github.com/yusing/go-proxy/internal/task"
F "github.com/yusing/go-proxy/internal/utils/functional"
)
type (
Dispatcher struct {
task *task.Task
logCh chan *LogMessage
providers F.Set[Provider]
}
LogField struct {
Name string `json:"name"`
Value string `json:"value"`
}
LogFields []LogField
LogMessage struct {
Level zerolog.Level
Title string
Extras LogFields
Color Color
}
)
var dispatcher *Dispatcher
const dispatchErr = "notification dispatch error"
func StartNotifDispatcher(parent task.Parent) *Dispatcher {
dispatcher = &Dispatcher{
task: parent.Subtask("notification"),
logCh: make(chan *LogMessage),
providers: F.NewSet[Provider](),
}
go dispatcher.start()
return dispatcher
}
func Notify(msg *LogMessage) {
if dispatcher == nil {
return
}
select {
case <-dispatcher.task.Context().Done():
return
default:
dispatcher.logCh <- msg
}
}
func (f *LogFields) Add(name, value string) {
*f = append(*f, LogField{Name: name, Value: value})
}
func (disp *Dispatcher) RegisterProvider(cfg *NotificationConfig) {
disp.providers.Add(cfg.Provider)
}
func (disp *Dispatcher) start() {
defer func() {
dispatcher = nil
disp.providers.Clear()
close(disp.logCh)
disp.task.Finish(nil)
}()
for {
select {
case <-disp.task.Context().Done():
return
case msg, ok := <-disp.logCh:
if !ok {
return
}
go disp.dispatch(msg)
}
}
}
func (disp *Dispatcher) dispatch(msg *LogMessage) {
task := disp.task.Subtask("dispatcher")
defer task.Finish("notif dispatched")
errs := E.NewBuilder(dispatchErr)
disp.providers.RangeAllParallel(func(p Provider) {
if err := notifyProvider(task.Context(), p, msg); err != nil {
errs.Add(E.PrependSubject(p.GetName(), err))
}
})
if errs.HasError() {
E.LogError(errs.About(), errs.Error())
} else {
logging.Debug().Str("title", msg.Title).Msgf("dispatched notif")
}
}
// Run implements zerolog.Hook.
// func (disp *Dispatcher) Run(e *zerolog.Event, level zerolog.Level, message string) {
// if strings.HasPrefix(message, dispatchErr) { // prevent recursion
// return
// }
// switch level {
// case zerolog.WarnLevel, zerolog.ErrorLevel, zerolog.FatalLevel, zerolog.PanicLevel:
// disp.logCh <- &LogMessage{
// Level: level,
// Message: message,
// }
// }
// }