mirror of
https://github.com/yusing/godoxy.git
synced 2025-05-20 12:42:34 +02:00
add timeout on task wait, temporary fix task stuck
This commit is contained in:
parent
a0a81240ce
commit
659ad29875
1 changed files with 44 additions and 3 deletions
|
@ -5,7 +5,9 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/yusing/go-proxy/internal/common"
|
"github.com/yusing/go-proxy/internal/common"
|
||||||
E "github.com/yusing/go-proxy/internal/error"
|
E "github.com/yusing/go-proxy/internal/error"
|
||||||
|
@ -51,6 +53,8 @@ type (
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const taskTimeout = 3 * time.Second
|
||||||
|
|
||||||
func (t *Task) Context() context.Context {
|
func (t *Task) Context() context.Context {
|
||||||
return t.ctx
|
return t.ctx
|
||||||
}
|
}
|
||||||
|
@ -83,7 +87,7 @@ func (t *Task) onCancel(about string, fn func(), waitSubTasks bool) {
|
||||||
go func() {
|
go func() {
|
||||||
<-t.ctx.Done()
|
<-t.ctx.Done()
|
||||||
if waitSubTasks {
|
if waitSubTasks {
|
||||||
t.children.Wait()
|
waitWithTimeout(&t.children)
|
||||||
}
|
}
|
||||||
t.invokeWithRecover(fn, about)
|
t.invokeWithRecover(fn, about)
|
||||||
t.onFinished.Done()
|
t.onFinished.Done()
|
||||||
|
@ -100,14 +104,51 @@ func (t *Task) Finish(reason any) {
|
||||||
|
|
||||||
func (t *Task) finish(reason any) {
|
func (t *Task) finish(reason any) {
|
||||||
t.cancel(fmtCause(reason))
|
t.cancel(fmtCause(reason))
|
||||||
t.children.Wait()
|
if !waitWithTimeout(&t.children) {
|
||||||
t.onFinished.Wait()
|
logger.Debug().
|
||||||
|
Strs("subtasks", t.listChildren()).
|
||||||
|
Msg("Timeout waiting for these subtasks to finish")
|
||||||
|
}
|
||||||
|
if !waitWithTimeout(&t.onFinished) {
|
||||||
|
logger.Debug().
|
||||||
|
Str("task", t.name).
|
||||||
|
Msg("Timeout waiting for callbacks to finish")
|
||||||
|
}
|
||||||
if t.finished != nil {
|
if t.finished != nil {
|
||||||
close(t.finished)
|
close(t.finished)
|
||||||
}
|
}
|
||||||
logger.Trace().Msg("task " + t.name + " finished")
|
logger.Trace().Msg("task " + t.name + " finished")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// debug only.
|
||||||
|
func (t *Task) listChildren() []string {
|
||||||
|
var children []string
|
||||||
|
allTasks.Range(func(child *Task) bool {
|
||||||
|
if strings.HasPrefix(child.name, t.name+".") {
|
||||||
|
children = append(children, child.name)
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
return children
|
||||||
|
}
|
||||||
|
|
||||||
|
func waitWithTimeout(wg *sync.WaitGroup) bool {
|
||||||
|
done := make(chan struct{})
|
||||||
|
timeout := time.After(taskTimeout)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
wg.Wait()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
return true
|
||||||
|
case <-timeout:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func fmtCause(cause any) error {
|
func fmtCause(cause any) error {
|
||||||
switch cause := cause.(type) {
|
switch cause := cause.(type) {
|
||||||
case nil:
|
case nil:
|
||||||
|
|
Loading…
Add table
Reference in a new issue