From 58cfba7695d381f4628cead75290a81dfd0a24fd Mon Sep 17 00:00:00 2001 From: yusing Date: Sun, 1 Dec 2024 11:12:25 +0800 Subject: [PATCH] refactor and fix duplicate notification --- internal/config/config.go | 3 ++- internal/notif/dispatcher.go | 36 +++++++++++++------------------- internal/utils/functional/set.go | 4 ++++ 3 files changed, 21 insertions(+), 22 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index 35874e0..3f0c243 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -192,9 +192,10 @@ func (cfg *Config) initNotification(notifCfg []types.NotificationConfig) (err E. if len(notifCfg) == 0 { return } + dispatcher := notif.StartNotifDispatcher(cfg.task) errs := E.NewBuilder("notification providers load errors") for i, notifier := range notifCfg { - _, err := notif.RegisterProvider(cfg.task.Subtask("notifier"), notifier) + _, err := dispatcher.RegisterProvider(notifier) if err == nil { continue } diff --git a/internal/notif/dispatcher.go b/internal/notif/dispatcher.go index 148b581..8464b45 100644 --- a/internal/notif/dispatcher.go +++ b/internal/notif/dispatcher.go @@ -35,24 +35,21 @@ var ( const dispatchErr = "notification dispatch error" -func init() { - dispatcher = newNotifDispatcher() - go dispatcher.start() -} - -func newNotifDispatcher() *Dispatcher { - return &Dispatcher{ - task: task.GlobalTask("notif dispatcher"), +func StartNotifDispatcher(parent task.Task) *Dispatcher { + dispatcher = &Dispatcher{ + task: parent.Subtask("notification dispatcher"), logCh: make(chan *LogMessage), providers: F.NewSet[Provider](), } -} - -func GetDispatcher() *Dispatcher { + go dispatcher.start() return dispatcher } -func RegisterProvider(configSubTask task.Task, cfg types.NotificationConfig) (Provider, E.Error) { +func Notify(msg *LogMessage) { + dispatcher.logCh <- msg +} + +func (disp *Dispatcher) RegisterProvider(cfg types.NotificationConfig) (Provider, E.Error) { providerName, ok := cfg["provider"] if !ok { return nil, ErrMissingNotifProvider @@ -69,10 +66,7 @@ func RegisterProvider(configSubTask task.Task, cfg types.NotificationConfig) (Pr provider, err := createFunc(cfg) if err == nil { - dispatcher.providers.Add(provider) - configSubTask.OnCancel("remove provider", func() { - dispatcher.providers.Remove(provider) - }) + disp.providers.Add(provider) } return provider, err default: @@ -81,7 +75,11 @@ func RegisterProvider(configSubTask task.Task, cfg types.NotificationConfig) (Pr } func (disp *Dispatcher) start() { - defer dispatcher.task.Finish("dispatcher stopped") + defer func() { + disp.providers.Clear() + close(disp.logCh) + disp.task.Finish("dispatcher stopped") + }() for { select { @@ -123,7 +121,3 @@ func (disp *Dispatcher) dispatch(msg *LogMessage) { // } // } // } - -func Notify(msg *LogMessage) { - dispatcher.logCh <- msg -} diff --git a/internal/utils/functional/set.go b/internal/utils/functional/set.go index 6a6d8bf..f34a9e5 100644 --- a/internal/utils/functional/set.go +++ b/internal/utils/functional/set.go @@ -22,6 +22,10 @@ func (set Set[T]) Remove(v T) { set.m.Delete(v) } +func (set Set[T]) Clear() { + set.m.Clear() +} + func (set Set[T]) Contains(v T) bool { _, ok := set.m.Load(v) return ok