From ba8705fb8453bc3a66e84b4be06e156c7c057444 Mon Sep 17 00:00:00 2001 From: yusing Date: Fri, 3 Jan 2025 03:08:21 +0800 Subject: [PATCH] fix shutdown stuck or panic --- internal/task/impl.go | 9 ++++++--- internal/task/task.go | 4 +--- internal/task/utils.go | 7 +++++-- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/internal/task/impl.go b/internal/task/impl.go index 3c8bd79..ffde2b4 100644 --- a/internal/task/impl.go +++ b/internal/task/impl.go @@ -12,6 +12,8 @@ func (t *Task) addCallback(about string, fn func(), waitSubTasks bool) { defer t.mu.Unlock() if t.callbacks == nil { t.callbacks = make(map[*Callback]struct{}) + } + if t.callbacksDone == nil { t.callbacksDone = make(chan struct{}) } t.callbacks[&Callback{fn, about, waitSubTasks}] = struct{}{} @@ -28,14 +30,15 @@ func (t *Task) addChildCount() { } func (t *Task) subChildCount() { - if atomic.AddUint32(&t.children, ^uint32(0)) == 0 { + switch atomic.AddUint32(&t.children, ^uint32(0)) { + case 0: close(t.childrenDone) + case ^uint32(0): + panic("negative child count") } } func (t *Task) runCallbacks() { - t.mu.Lock() - defer t.mu.Unlock() if len(t.callbacks) == 0 { return } diff --git a/internal/task/task.go b/internal/task/task.go index 0675490..a457aed 100644 --- a/internal/task/task.go +++ b/internal/task/task.go @@ -117,9 +117,7 @@ func (t *Task) finish(reason any) { Strs("callbacks", t.listCallbacks()). Msg("Timeout waiting for callbacks to finish") } - if t.finished != nil { - close(t.finished) - } + close(t.finished) if t == root { return } diff --git a/internal/task/utils.go b/internal/task/utils.go index d0b2fbf..e44f860 100644 --- a/internal/task/utils.go +++ b/internal/task/utils.go @@ -30,9 +30,12 @@ func RootTask(name string, needFinish ...bool) *Task { } func newRoot() *Task { - t := &Task{name: "root"} + t := &Task{ + name: "root", + childrenDone: make(chan struct{}), + finished: make(chan struct{}), + } t.ctx, t.cancel = context.WithCancelCause(context.Background()) - t.callbacks = make(map[*Callback]struct{}) return t }