refactor: update task management code

- Rename needFinish to waitFinish
- Fixed some tasks not being waited they should be
- Adjusted mutex usage in the directory watcher to utilize read-write locks for improved concurrency management.
This commit is contained in:
yusing 2025-04-17 14:47:14 +08:00
parent 41bef09be2
commit 78bed43199
9 changed files with 23 additions and 20 deletions

View file

@ -46,7 +46,7 @@ const (
) )
func initClientCleaner() { func initClientCleaner() {
cleaner := task.RootTask("docker_clients_cleaner") cleaner := task.RootTask("docker_clients_cleaner", false)
go func() { go func() {
ticker := time.NewTicker(cleanInterval) ticker := time.NewTicker(cleanInterval)
defer ticker.Stop() defer ticker.Stop()

View file

@ -133,7 +133,7 @@ func (p *Poller[T, AggregateT]) pollWithTimeout(ctx context.Context) {
} }
func (p *Poller[T, AggregateT]) Start() { func (p *Poller[T, AggregateT]) Start() {
t := task.RootTask("poller." + p.name) t := task.RootTask("poller."+p.name, true)
err := p.load() err := p.load()
if err != nil { if err != nil {
if !os.IsNotExist(err) { if !os.IsNotExist(err) {

View file

@ -33,7 +33,7 @@ const dispatchErr = "notification dispatch error"
func StartNotifDispatcher(parent task.Parent) *Dispatcher { func StartNotifDispatcher(parent task.Parent) *Dispatcher {
dispatcher = &Dispatcher{ dispatcher = &Dispatcher{
task: parent.Subtask("notification"), task: parent.Subtask("notification", true),
logCh: make(chan *LogMessage), logCh: make(chan *LogMessage),
providers: F.NewSet[Provider](), providers: F.NewSet[Provider](),
} }
@ -86,7 +86,7 @@ func (disp *Dispatcher) dispatch(msg *LogMessage) {
if true { if true {
return return
} }
task := disp.task.Subtask("dispatcher") task := disp.task.Subtask("dispatcher", true)
defer task.Finish("notif dispatched") defer task.Finish("notif dispatched")
errs := gperr.NewBuilder(dispatchErr) errs := gperr.NewBuilder(dispatchErr)

View file

@ -57,7 +57,7 @@ func (t *Task) callbackList() []map[string]any {
func (t *Task) MarshalMap() map[string]any { func (t *Task) MarshalMap() map[string]any {
return map[string]any{ return map[string]any{
"name": t.name, "name": t.name,
"need_finish": strconv.FormatBool(t.needFinish), "need_finish": strconv.FormatBool(t.waitFinish),
"childrens": t.children, "childrens": t.children,
"callbacks": t.callbackList(), "callbacks": t.callbackList(),
"finish_called": t.finishedCalled, "finish_called": t.finishedCalled,

View file

@ -45,7 +45,7 @@ type (
callbacks map[*Callback]struct{} callbacks map[*Callback]struct{}
callbacksDone chan struct{} callbacksDone chan struct{}
needFinish bool waitFinish bool
finished chan struct{} finished chan struct{}
// finishedCalled == 1 Finish has been called // finishedCalled == 1 Finish has been called
// but does not mean that the task is finished yet // but does not mean that the task is finished yet
@ -59,7 +59,7 @@ type (
} }
Parent interface { Parent interface {
Context() context.Context Context() context.Context
Subtask(name string, needFinish ...bool) *Task Subtask(name string, waitFinish bool) *Task
Name() string Name() string
Finish(reason any) Finish(reason any)
OnCancel(name string, f func()) OnCancel(name string, f func())
@ -141,13 +141,11 @@ func (t *Task) finish(reason any) {
// Subtask returns a new subtask with the given name, derived from the parent's context. // Subtask returns a new subtask with the given name, derived from the parent's context.
// //
// This should not be called after Finish is called. // This should not be called after Finish is called.
func (t *Task) Subtask(name string, needFinish ...bool) *Task { func (t *Task) Subtask(name string, waitFinish bool) *Task {
nf := len(needFinish) == 0 || needFinish[0]
ctx, cancel := context.WithCancelCause(t.ctx) ctx, cancel := context.WithCancelCause(t.ctx)
child := &Task{ child := &Task{
parent: t, parent: t,
needFinish: nf, waitFinish: waitFinish,
finished: make(chan struct{}), finished: make(chan struct{}),
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
@ -161,7 +159,7 @@ func (t *Task) Subtask(name string, needFinish ...bool) *Task {
allTasks.Add(child) allTasks.Add(child)
t.addChildCount() t.addChildCount()
if !nf { if !waitFinish {
go func() { go func() {
<-child.ctx.Done() <-child.ctx.Done()
child.Finish(nil) child.Finish(nil)

View file

@ -17,7 +17,7 @@ func TestChildTaskCancellation(t *testing.T) {
t.Cleanup(testCleanup) t.Cleanup(testCleanup)
parent := testTask() parent := testTask()
child := parent.Subtask("") child := parent.Subtask("child", false)
go func() { go func() {
defer child.Finish(nil) defer child.Finish(nil)

View file

@ -25,8 +25,8 @@ func testCleanup() {
} }
// RootTask returns a new Task with the given name, derived from the root context. // RootTask returns a new Task with the given name, derived from the root context.
func RootTask(name string, needFinish ...bool) *Task { func RootTask(name string, needFinish bool) *Task {
return root.Subtask(name, needFinish...) return root.Subtask(name, needFinish)
} }
func newRoot() *Task { func newRoot() *Task {
@ -66,6 +66,9 @@ func GracefulShutdown(timeout time.Duration) (err error) {
return return
case <-after: case <-after:
logging.Warn().Msgf("Timeout waiting for %d tasks to finish", allTasks.Size()) logging.Warn().Msgf("Timeout waiting for %d tasks to finish", allTasks.Size())
for t := range allTasks.Range {
logging.Warn().Msgf("Task %s is still running", t.name)
}
return context.DeadlineExceeded return context.DeadlineExceeded
} }
} }

View file

@ -21,7 +21,7 @@ type DirWatcher struct {
w *fsnotify.Watcher w *fsnotify.Watcher
fwMap map[string]*fileWatcher fwMap map[string]*fileWatcher
mu sync.Mutex mu sync.RWMutex
eventCh chan Event eventCh chan Event
errCh chan gperr.Error errCh chan gperr.Error
@ -56,7 +56,7 @@ func NewDirectoryWatcher(parent task.Parent, dirPath string) *DirWatcher {
fwMap: make(map[string]*fileWatcher), fwMap: make(map[string]*fileWatcher),
eventCh: make(chan Event), eventCh: make(chan Event),
errCh: make(chan gperr.Error), errCh: make(chan gperr.Error),
task: parent.Subtask("dir_watcher(" + dirPath + ")"), task: parent.Subtask("dir_watcher("+dirPath+")", true),
} }
go helper.start() go helper.start()
return helper return helper
@ -95,7 +95,9 @@ func (h *DirWatcher) cleanup() {
close(fw.eventCh) close(fw.eventCh)
close(fw.errCh) close(fw.errCh)
} }
h.fwMap = nil
h.task.Finish(nil) h.task.Finish(nil)
h.Info().Msg("directory watcher closed")
} }
func (h *DirWatcher) start() { func (h *DirWatcher) start() {
@ -143,9 +145,9 @@ func (h *DirWatcher) start() {
} }
// send event to file watcher too // send event to file watcher too
h.mu.Lock() h.mu.RLock()
w, ok := h.fwMap[relPath] w, ok := h.fwMap[relPath]
h.mu.Unlock() h.mu.RUnlock()
if ok { if ok {
select { select {
case w.eventCh <- msg: case w.eventCh <- msg:

View file

@ -88,7 +88,7 @@ func (mon *monitor) Start(parent task.Parent) gperr.Error {
} }
mon.service = parent.Name() mon.service = parent.Name()
mon.task = parent.Subtask("health_monitor") mon.task = parent.Subtask("health_monitor", true)
go func() { go func() {
logger := logging.With().Str("name", mon.service).Logger() logger := logging.With().Str("name", mon.service).Logger()