From 78bed431994fc0f93290fe3d9efcd4dcb3b85bb3 Mon Sep 17 00:00:00 2001 From: yusing Date: Thu, 17 Apr 2025 14:47:14 +0800 Subject: [PATCH] refactor: update task management code - Rename needFinish to waitFinish - Fixed some tasks not being waited they should be - Adjusted mutex usage in the directory watcher to utilize read-write locks for improved concurrency management. --- internal/docker/client.go | 2 +- internal/metrics/period/poller.go | 2 +- internal/notif/dispatcher.go | 4 ++-- internal/task/debug.go | 2 +- internal/task/task.go | 12 +++++------- internal/task/task_test.go | 2 +- internal/task/utils.go | 7 +++++-- internal/watcher/directory_watcher.go | 10 ++++++---- internal/watcher/health/monitor/monitor.go | 2 +- 9 files changed, 23 insertions(+), 20 deletions(-) diff --git a/internal/docker/client.go b/internal/docker/client.go index c3ba621..92b54cf 100644 --- a/internal/docker/client.go +++ b/internal/docker/client.go @@ -46,7 +46,7 @@ const ( ) func initClientCleaner() { - cleaner := task.RootTask("docker_clients_cleaner") + cleaner := task.RootTask("docker_clients_cleaner", false) go func() { ticker := time.NewTicker(cleanInterval) defer ticker.Stop() diff --git a/internal/metrics/period/poller.go b/internal/metrics/period/poller.go index 0e36e0d..6735cbe 100644 --- a/internal/metrics/period/poller.go +++ b/internal/metrics/period/poller.go @@ -133,7 +133,7 @@ func (p *Poller[T, AggregateT]) pollWithTimeout(ctx context.Context) { } func (p *Poller[T, AggregateT]) Start() { - t := task.RootTask("poller." + p.name) + t := task.RootTask("poller."+p.name, true) err := p.load() if err != nil { if !os.IsNotExist(err) { diff --git a/internal/notif/dispatcher.go b/internal/notif/dispatcher.go index fd138e5..aab444f 100644 --- a/internal/notif/dispatcher.go +++ b/internal/notif/dispatcher.go @@ -33,7 +33,7 @@ const dispatchErr = "notification dispatch error" func StartNotifDispatcher(parent task.Parent) *Dispatcher { dispatcher = &Dispatcher{ - task: parent.Subtask("notification"), + task: parent.Subtask("notification", true), logCh: make(chan *LogMessage), providers: F.NewSet[Provider](), } @@ -86,7 +86,7 @@ func (disp *Dispatcher) dispatch(msg *LogMessage) { if true { return } - task := disp.task.Subtask("dispatcher") + task := disp.task.Subtask("dispatcher", true) defer task.Finish("notif dispatched") errs := gperr.NewBuilder(dispatchErr) diff --git a/internal/task/debug.go b/internal/task/debug.go index 97b3086..3a07e3c 100644 --- a/internal/task/debug.go +++ b/internal/task/debug.go @@ -57,7 +57,7 @@ func (t *Task) callbackList() []map[string]any { func (t *Task) MarshalMap() map[string]any { return map[string]any{ "name": t.name, - "need_finish": strconv.FormatBool(t.needFinish), + "need_finish": strconv.FormatBool(t.waitFinish), "childrens": t.children, "callbacks": t.callbackList(), "finish_called": t.finishedCalled, diff --git a/internal/task/task.go b/internal/task/task.go index b93eae2..503ca32 100644 --- a/internal/task/task.go +++ b/internal/task/task.go @@ -45,7 +45,7 @@ type ( callbacks map[*Callback]struct{} callbacksDone chan struct{} - needFinish bool + waitFinish bool finished chan struct{} // finishedCalled == 1 Finish has been called // but does not mean that the task is finished yet @@ -59,7 +59,7 @@ type ( } Parent interface { Context() context.Context - Subtask(name string, needFinish ...bool) *Task + Subtask(name string, waitFinish bool) *Task Name() string Finish(reason any) OnCancel(name string, f func()) @@ -141,13 +141,11 @@ func (t *Task) finish(reason any) { // Subtask returns a new subtask with the given name, derived from the parent's context. // // This should not be called after Finish is called. -func (t *Task) Subtask(name string, needFinish ...bool) *Task { - nf := len(needFinish) == 0 || needFinish[0] - +func (t *Task) Subtask(name string, waitFinish bool) *Task { ctx, cancel := context.WithCancelCause(t.ctx) child := &Task{ parent: t, - needFinish: nf, + waitFinish: waitFinish, finished: make(chan struct{}), ctx: ctx, cancel: cancel, @@ -161,7 +159,7 @@ func (t *Task) Subtask(name string, needFinish ...bool) *Task { allTasks.Add(child) t.addChildCount() - if !nf { + if !waitFinish { go func() { <-child.ctx.Done() child.Finish(nil) diff --git a/internal/task/task_test.go b/internal/task/task_test.go index f9c35aa..4168b0e 100644 --- a/internal/task/task_test.go +++ b/internal/task/task_test.go @@ -17,7 +17,7 @@ func TestChildTaskCancellation(t *testing.T) { t.Cleanup(testCleanup) parent := testTask() - child := parent.Subtask("") + child := parent.Subtask("child", false) go func() { defer child.Finish(nil) diff --git a/internal/task/utils.go b/internal/task/utils.go index 5e8a803..88f4177 100644 --- a/internal/task/utils.go +++ b/internal/task/utils.go @@ -25,8 +25,8 @@ func testCleanup() { } // RootTask returns a new Task with the given name, derived from the root context. -func RootTask(name string, needFinish ...bool) *Task { - return root.Subtask(name, needFinish...) +func RootTask(name string, needFinish bool) *Task { + return root.Subtask(name, needFinish) } func newRoot() *Task { @@ -66,6 +66,9 @@ func GracefulShutdown(timeout time.Duration) (err error) { return case <-after: logging.Warn().Msgf("Timeout waiting for %d tasks to finish", allTasks.Size()) + for t := range allTasks.Range { + logging.Warn().Msgf("Task %s is still running", t.name) + } return context.DeadlineExceeded } } diff --git a/internal/watcher/directory_watcher.go b/internal/watcher/directory_watcher.go index 45cca6a..5baa241 100644 --- a/internal/watcher/directory_watcher.go +++ b/internal/watcher/directory_watcher.go @@ -21,7 +21,7 @@ type DirWatcher struct { w *fsnotify.Watcher fwMap map[string]*fileWatcher - mu sync.Mutex + mu sync.RWMutex eventCh chan Event errCh chan gperr.Error @@ -56,7 +56,7 @@ func NewDirectoryWatcher(parent task.Parent, dirPath string) *DirWatcher { fwMap: make(map[string]*fileWatcher), eventCh: make(chan Event), errCh: make(chan gperr.Error), - task: parent.Subtask("dir_watcher(" + dirPath + ")"), + task: parent.Subtask("dir_watcher("+dirPath+")", true), } go helper.start() return helper @@ -95,7 +95,9 @@ func (h *DirWatcher) cleanup() { close(fw.eventCh) close(fw.errCh) } + h.fwMap = nil h.task.Finish(nil) + h.Info().Msg("directory watcher closed") } func (h *DirWatcher) start() { @@ -143,9 +145,9 @@ func (h *DirWatcher) start() { } // send event to file watcher too - h.mu.Lock() + h.mu.RLock() w, ok := h.fwMap[relPath] - h.mu.Unlock() + h.mu.RUnlock() if ok { select { case w.eventCh <- msg: diff --git a/internal/watcher/health/monitor/monitor.go b/internal/watcher/health/monitor/monitor.go index fcb22c5..13a18a9 100644 --- a/internal/watcher/health/monitor/monitor.go +++ b/internal/watcher/health/monitor/monitor.go @@ -88,7 +88,7 @@ func (mon *monitor) Start(parent task.Parent) gperr.Error { } mon.service = parent.Name() - mon.task = parent.Subtask("health_monitor") + mon.task = parent.Subtask("health_monitor", true) go func() { logger := logging.With().Str("name", mon.service).Logger()