fix reload stuck

This commit is contained in:
yusing 2025-01-02 11:30:09 +08:00
parent 2fe0b888bd
commit cd74b76483
6 changed files with 23 additions and 17 deletions

View file

@ -46,7 +46,7 @@ func newWaker(parent task.Parent, entry route.Entry, rp *gphttp.ReverseProxy, st
rp: rp, rp: rp,
stream: stream, stream: stream,
} }
task := parent.Subtask("idlewatcher") task := parent.Subtask("idlewatcher." + entry.TargetName())
watcher, err := registerWatcher(task, entry, waker) watcher, err := registerWatcher(task, entry, waker)
if err != nil { if err != nil {
return nil, E.Errorf("register watcher: %w", err) return nil, E.Errorf("register watcher: %w", err)
@ -117,6 +117,7 @@ func (w *Watcher) Uptime() time.Duration {
return 0 return 0
} }
// Status implements health.HealthMonitor.
func (w *Watcher) Status() health.Status { func (w *Watcher) Status() health.Status {
status := w.getStatusUpdateReady() status := w.getStatusUpdateReady()
if w.metric != nil { if w.metric != nil {
@ -125,7 +126,6 @@ func (w *Watcher) Status() health.Status {
return status return status
} }
// Status implements health.HealthMonitor.
func (w *Watcher) getStatusUpdateReady() health.Status { func (w *Watcher) getStatusUpdateReady() health.Status {
if !w.ContainerRunning { if !w.ContainerRunning {
return health.StatusNapping return health.StatusNapping

View file

@ -32,7 +32,7 @@ func setup() {
return return
} }
t := task.RootTask("error_page", true) t := task.RootTask("error_page", false)
dirWatcher = W.NewDirectoryWatcher(t, errPagesBasePath) dirWatcher = W.NewDirectoryWatcher(t, errPagesBasePath)
loadContent() loadContent()
go watchDir() go watchDir()

View file

@ -83,7 +83,7 @@ func (r *HTTPRoute) Start(parent task.Parent) E.Error {
switch { switch {
case entry.UseIdleWatcher(r): case entry.UseIdleWatcher(r):
waker, err := idlewatcher.NewHTTPWaker(r.task, r.ReverseProxyEntry, r.rp) waker, err := idlewatcher.NewHTTPWaker(parent, r.ReverseProxyEntry, r.rp)
if err != nil { if err != nil {
r.task.Finish(err) r.task.Finish(err)
return err return err
@ -144,13 +144,13 @@ func (r *HTTPRoute) Start(parent task.Parent) E.Error {
r.addToLoadBalancer(parent) r.addToLoadBalancer(parent)
} else { } else {
routes.SetHTTPRoute(r.TargetName(), r) routes.SetHTTPRoute(r.TargetName(), r)
r.task.OnFinished("entrypoint_remove_route", func() { r.task.OnCancel("entrypoint_remove_route", func() {
routes.DeleteHTTPRoute(r.TargetName()) routes.DeleteHTTPRoute(r.TargetName())
}) })
} }
if common.PrometheusEnabled { if common.PrometheusEnabled {
r.task.OnFinished("metrics_cleanup", r.rp.UnregisterMetrics) r.task.OnCancel("metrics_cleanup", r.rp.UnregisterMetrics)
} }
return nil return nil
} }

View file

@ -108,9 +108,6 @@ func (p *Provider) startRoute(parent task.Parent, r *R.Route) E.Error {
} }
p.routes.Store(r.Entry.Alias, r) p.routes.Store(r.Entry.Alias, r)
r.Task().OnFinished("provider_remove_route", func() {
p.routes.Delete(r.Entry.Alias)
})
return nil return nil
} }

View file

@ -18,6 +18,15 @@ func GetStreamRoutes() F.Map[string, types.StreamRoute] {
return streamRoutes return streamRoutes
} }
func GetHTTPRouteOrExact(alias, host string) (types.HTTPRoute, bool) {
r, ok := httpRoutes.Load(alias)
if ok {
return r, true
}
// try find with exact match
return httpRoutes.Load(host)
}
func GetHTTPRoute(alias string) (types.HTTPRoute, bool) { func GetHTTPRoute(alias string) (types.HTTPRoute, bool) {
return httpRoutes.Load(alias) return httpRoutes.Load(alias)
} }
@ -41,3 +50,8 @@ func DeleteHTTPRoute(alias string) {
func DeleteStreamRoute(alias string) { func DeleteStreamRoute(alias string) {
streamRoutes.Delete(alias) streamRoutes.Delete(alias)
} }
func TestClear() {
httpRoutes = F.NewMapOf[string, types.HTTPRoute]()
streamRoutes = F.NewMapOf[string, types.StreamRoute]()
}

View file

@ -55,14 +55,13 @@ func (r *StreamRoute) Start(parent task.Parent) E.Error {
r.task = parent.Subtask("stream." + r.TargetName()) r.task = parent.Subtask("stream." + r.TargetName())
r.Stream = NewStream(r) r.Stream = NewStream(r)
parent.OnCancel("finish", func() { parent.OnCancel("finish", func() {
r.task.Finish(nil) r.task.Finish(nil)
}) })
switch { switch {
case entry.UseIdleWatcher(r): case entry.UseIdleWatcher(r):
waker, err := idlewatcher.NewStreamWaker(r.task, r.StreamEntry, r.Stream) waker, err := idlewatcher.NewStreamWaker(parent, r.StreamEntry, r.Stream)
if err != nil { if err != nil {
r.task.Finish(err) r.task.Finish(err)
return err return err
@ -88,7 +87,7 @@ func (r *StreamRoute) Start(parent task.Parent) E.Error {
return E.From(err) return E.From(err)
} }
r.task.OnFinished("close_stream", func() { r.task.OnCancel("close_stream", func() {
if err := r.Stream.Close(); err != nil { if err := r.Stream.Close(); err != nil {
E.LogError("close stream failed", err, &r.l) E.LogError("close stream failed", err, &r.l)
} }
@ -107,7 +106,7 @@ func (r *StreamRoute) Start(parent task.Parent) E.Error {
go r.acceptConnections() go r.acceptConnections()
routes.SetStreamRoute(r.TargetName(), r) routes.SetStreamRoute(r.TargetName(), r)
r.task.OnFinished("entrypoint_remove_route", func() { r.task.OnCancel("entrypoint_remove_route", func() {
routes.DeleteStreamRoute(r.TargetName()) routes.DeleteStreamRoute(r.TargetName())
}) })
return nil return nil
@ -144,14 +143,10 @@ func (r *StreamRoute) acceptConnections() {
if conn == nil { if conn == nil {
panic("connection is nil") panic("connection is nil")
} }
connTask := r.task.Subtask("connection")
go func() { go func() {
err := r.Stream.Handle(conn) err := r.Stream.Handle(conn)
if err != nil && !errors.Is(err, context.Canceled) { if err != nil && !errors.Is(err, context.Canceled) {
E.LogError("handle connection error", err, &r.l) E.LogError("handle connection error", err, &r.l)
connTask.Finish(err)
} else {
connTask.Finish("closed")
} }
}() }()
} }