GoDoxy/internal/task/impl.go
2025-01-03 03:30:15 +08:00

78 lines
1.3 KiB
Go

package task
import (
"errors"
"fmt"
"sync/atomic"
"time"
)
// addCallback registers fn in the task's callback set.
//
// about is stored alongside fn and is later handed to invokeWithRecover
// together with it. waitSubTasks is stored as the callback's waitChildren
// flag; runCallbacks waits on childrenDone (with a timeout) before invoking
// callbacks that have it set.
//
// The callback set and the callbacksDone channel are created on first use.
// All mutations of the task happen while t.mu is held.
func (t *Task) addCallback(about string, fn func(), waitSubTasks bool) {
	// Build the entry up front so the critical section only touches the task.
	cb := &Callback{
		fn:           fn,
		about:        about,
		waitChildren: waitSubTasks,
	}

	t.mu.Lock()
	defer t.mu.Unlock()

	if t.callbacksDone == nil {
		t.callbacksDone = make(chan struct{})
	}
	if t.callbacks == nil {
		t.callbacks = make(map[*Callback]struct{})
	}
	// Keyed by pointer, so registering the same fn twice yields two entries.
	t.callbacks[cb] = struct{}{}
}
// addChildCount atomically increments the task's child counter.
//
// When the increment moves the counter from 0 to 1, it makes sure the
// childrenDone channel exists, creating it under t.mu if it is still nil.
// An already existing channel is kept as is.
//
// NOTE(review): a channel that was closed by subChildCount is not replaced
// here, so a 1 -> 0 -> 1 -> 0 sequence would close it twice — confirm that
// callers never add children after the count has dropped back to zero.
func (t *Task) addChildCount() {
	if atomic.AddUint32(&t.children, 1) != 1 {
		// Not the first child; nothing to initialise.
		return
	}

	t.mu.Lock()
	defer t.mu.Unlock()

	if t.childrenDone == nil {
		t.childrenDone = make(chan struct{})
	}
}
// subChildCount atomically decrements the task's child counter.
//
// When the counter reaches zero, childrenDone is closed to signal that no
// children remain. If the counter was already zero, the decrement wraps
// around to the maximum uint32 value and the function panics with
// "negative child count".
func (t *Task) subChildCount() {
	// Adding the all-ones value is the sync/atomic idiom for subtracting 1
	// from an unsigned counter; it is also the value seen after underflow.
	const minusOne = ^uint32(0)

	remaining := atomic.AddUint32(&t.children, minusOne)
	if remaining == 0 {
		close(t.childrenDone)
		return
	}
	if remaining == minusOne {
		panic("negative child count")
	}
}
// runCallbacks invokes every registered callback and then closes
// callbacksDone.
//
// If no callbacks are registered it returns immediately and callbacksDone is
// left untouched. Callbacks are visited in map iteration order, which Go
// leaves unspecified. A callback whose waitChildren flag is set first waits
// for childrenDone via waitWithTimeout; the result of that wait is discarded,
// so the callback still runs when the wait times out. Each callback is run
// through invokeWithRecover and removed from the set afterwards.
//
// NOTE(review): t.mu is not taken here although addCallback mutates the same
// map under it — presumably callers guarantee no concurrent registration;
// confirm.
func (t *Task) runCallbacks() {
	if len(t.callbacks) == 0 {
		return
	}

	for cb := range t.callbacks {
		if cb.waitChildren {
			// Best effort: proceed whether the children finished or not.
			_ = waitWithTimeout(t.childrenDone)
		}
		t.invokeWithRecover(cb.fn, cb.about)
		// Deleting the current key while ranging over a map is permitted.
		delete(t.callbacks, cb)
	}

	close(t.callbacksDone)
}
// waitWithTimeout blocks until ch yields (a value is received or the channel
// is closed) or until taskTimeout has elapsed, whichever comes first.
//
// It returns true when ch yielded, and also when ch is nil (treated as
// "nothing to wait for"; no timer is started in that case). It returns false
// when the timeout fired first.
func waitWithTimeout(ch <-chan struct{}) bool {
	// A nil channel would never become ready in the select below, so it is
	// short-circuited here instead of waiting out the full timeout.
	if ch != nil {
		select {
		case <-ch:
		case <-time.After(taskTimeout):
			return false
		}
	}
	return true
}
// fmtCause converts an arbitrary cause value into an error.
//
//   - nil yields a nil error.
//   - A value that already implements error is returned unchanged.
//   - A string becomes an error carrying that string as its message.
//   - Anything else is rendered with the %v verb and wrapped in a new error.
func fmtCause(cause any) error {
	if cause == nil {
		return nil
	}
	// The error check comes before the string check, so a string-based type
	// that implements error is passed through rather than re-wrapped.
	if err, ok := cause.(error); ok {
		return err
	}
	if msg, ok := cause.(string); ok {
		return errors.New(msg)
	}
	return fmt.Errorf("%v", cause)
}