refactor and fix duplicate notification

This commit is contained in:
yusing 2024-12-01 11:12:25 +08:00
parent d1cb7a5ce4
commit 58cfba7695
3 changed files with 21 additions and 22 deletions

View file

@ -192,9 +192,10 @@ func (cfg *Config) initNotification(notifCfg []types.NotificationConfig) (err E.
if len(notifCfg) == 0 {
return
}
dispatcher := notif.StartNotifDispatcher(cfg.task)
errs := E.NewBuilder("notification providers load errors")
for i, notifier := range notifCfg {
_, err := notif.RegisterProvider(cfg.task.Subtask("notifier"), notifier)
_, err := dispatcher.RegisterProvider(notifier)
if err == nil {
continue
}

View file

@ -35,24 +35,21 @@ var (
const dispatchErr = "notification dispatch error"
func init() {
dispatcher = newNotifDispatcher()
go dispatcher.start()
}
func newNotifDispatcher() *Dispatcher {
return &Dispatcher{
task: task.GlobalTask("notif dispatcher"),
func StartNotifDispatcher(parent task.Task) *Dispatcher {
dispatcher = &Dispatcher{
task: parent.Subtask("notification dispatcher"),
logCh: make(chan *LogMessage),
providers: F.NewSet[Provider](),
}
}
func GetDispatcher() *Dispatcher {
go dispatcher.start()
return dispatcher
}
func RegisterProvider(configSubTask task.Task, cfg types.NotificationConfig) (Provider, E.Error) {
func Notify(msg *LogMessage) {
dispatcher.logCh <- msg
}
func (disp *Dispatcher) RegisterProvider(cfg types.NotificationConfig) (Provider, E.Error) {
providerName, ok := cfg["provider"]
if !ok {
return nil, ErrMissingNotifProvider
@ -69,10 +66,7 @@ func RegisterProvider(configSubTask task.Task, cfg types.NotificationConfig) (Pr
provider, err := createFunc(cfg)
if err == nil {
dispatcher.providers.Add(provider)
configSubTask.OnCancel("remove provider", func() {
dispatcher.providers.Remove(provider)
})
disp.providers.Add(provider)
}
return provider, err
default:
@ -81,7 +75,11 @@ func RegisterProvider(configSubTask task.Task, cfg types.NotificationConfig) (Pr
}
func (disp *Dispatcher) start() {
defer dispatcher.task.Finish("dispatcher stopped")
defer func() {
disp.providers.Clear()
close(disp.logCh)
disp.task.Finish("dispatcher stopped")
}()
for {
select {
@ -123,7 +121,3 @@ func (disp *Dispatcher) dispatch(msg *LogMessage) {
// }
// }
// }
func Notify(msg *LogMessage) {
dispatcher.logCh <- msg
}

View file

@ -22,6 +22,10 @@ func (set Set[T]) Remove(v T) {
set.m.Delete(v)
}
func (set Set[T]) Clear() {
set.m.Clear()
}
func (set Set[T]) Contains(v T) bool {
_, ok := set.m.Load(v)
return ok