diff --git a/internal/docker/idlewatcher/waker.go b/internal/docker/idlewatcher/waker.go index ff589fc..6439e65 100644 --- a/internal/docker/idlewatcher/waker.go +++ b/internal/docker/idlewatcher/waker.go @@ -46,7 +46,7 @@ func newWaker(parent task.Parent, entry route.Entry, rp *gphttp.ReverseProxy, st rp: rp, stream: stream, } - task := parent.Subtask("idlewatcher") + task := parent.Subtask("idlewatcher." + entry.TargetName()) watcher, err := registerWatcher(task, entry, waker) if err != nil { return nil, E.Errorf("register watcher: %w", err) @@ -117,6 +117,7 @@ func (w *Watcher) Uptime() time.Duration { return 0 } +// Status implements health.HealthMonitor. func (w *Watcher) Status() health.Status { status := w.getStatusUpdateReady() if w.metric != nil { @@ -125,7 +126,6 @@ func (w *Watcher) Status() health.Status { return status } -// Status implements health.HealthMonitor. func (w *Watcher) getStatusUpdateReady() health.Status { if !w.ContainerRunning { return health.StatusNapping diff --git a/internal/net/http/middleware/errorpage/error_page.go b/internal/net/http/middleware/errorpage/error_page.go index a7eb003..e9948ba 100644 --- a/internal/net/http/middleware/errorpage/error_page.go +++ b/internal/net/http/middleware/errorpage/error_page.go @@ -32,7 +32,7 @@ func setup() { return } - t := task.RootTask("error_page", true) + t := task.RootTask("error_page", false) dirWatcher = W.NewDirectoryWatcher(t, errPagesBasePath) loadContent() go watchDir() diff --git a/internal/route/http.go b/internal/route/http.go index 67c4718..c67324e 100755 --- a/internal/route/http.go +++ b/internal/route/http.go @@ -83,7 +83,7 @@ func (r *HTTPRoute) Start(parent task.Parent) E.Error { switch { case entry.UseIdleWatcher(r): - waker, err := idlewatcher.NewHTTPWaker(r.task, r.ReverseProxyEntry, r.rp) + waker, err := idlewatcher.NewHTTPWaker(parent, r.ReverseProxyEntry, r.rp) if err != nil { r.task.Finish(err) return err @@ -144,13 +144,13 @@ func (r *HTTPRoute) Start(parent task.Parent) E.Error { r.addToLoadBalancer(parent) } else { routes.SetHTTPRoute(r.TargetName(), r) - r.task.OnFinished("entrypoint_remove_route", func() { + r.task.OnCancel("entrypoint_remove_route", func() { routes.DeleteHTTPRoute(r.TargetName()) }) } if common.PrometheusEnabled { - r.task.OnFinished("metrics_cleanup", r.rp.UnregisterMetrics) + r.task.OnCancel("metrics_cleanup", r.rp.UnregisterMetrics) } return nil } diff --git a/internal/route/provider/provider.go b/internal/route/provider/provider.go index a834ac0..9933bb3 100644 --- a/internal/route/provider/provider.go +++ b/internal/route/provider/provider.go @@ -108,9 +108,6 @@ func (p *Provider) startRoute(parent task.Parent, r *R.Route) E.Error { } p.routes.Store(r.Entry.Alias, r) - r.Task().OnFinished("provider_remove_route", func() { - p.routes.Delete(r.Entry.Alias) - }) return nil } diff --git a/internal/route/routes/routes.go b/internal/route/routes/routes.go index 4d0165f..a372aa5 100644 --- a/internal/route/routes/routes.go +++ b/internal/route/routes/routes.go @@ -18,6 +18,15 @@ func GetStreamRoutes() F.Map[string, types.StreamRoute] { return streamRoutes } +func GetHTTPRouteOrExact(alias, host string) (types.HTTPRoute, bool) { + r, ok := httpRoutes.Load(alias) + if ok { + return r, true + } + // try find with exact match + return httpRoutes.Load(host) +} + func GetHTTPRoute(alias string) (types.HTTPRoute, bool) { return httpRoutes.Load(alias) } @@ -41,3 +50,8 @@ func DeleteHTTPRoute(alias string) { func DeleteStreamRoute(alias string) { streamRoutes.Delete(alias) } + +func TestClear() { + httpRoutes = F.NewMapOf[string, types.HTTPRoute]() + streamRoutes = F.NewMapOf[string, types.StreamRoute]() +} diff --git a/internal/route/stream.go b/internal/route/stream.go index fc83721..62335f0 100755 --- a/internal/route/stream.go +++ b/internal/route/stream.go @@ -55,14 +55,13 @@ func (r *StreamRoute) Start(parent task.Parent) E.Error { r.task = parent.Subtask("stream." + r.TargetName()) r.Stream = NewStream(r) - parent.OnCancel("finish", func() { r.task.Finish(nil) }) switch { case entry.UseIdleWatcher(r): - waker, err := idlewatcher.NewStreamWaker(r.task, r.StreamEntry, r.Stream) + waker, err := idlewatcher.NewStreamWaker(parent, r.StreamEntry, r.Stream) if err != nil { r.task.Finish(err) return err @@ -88,7 +87,7 @@ func (r *StreamRoute) Start(parent task.Parent) E.Error { return E.From(err) } - r.task.OnFinished("close_stream", func() { + r.task.OnCancel("close_stream", func() { if err := r.Stream.Close(); err != nil { E.LogError("close stream failed", err, &r.l) } @@ -107,7 +106,7 @@ func (r *StreamRoute) Start(parent task.Parent) E.Error { go r.acceptConnections() routes.SetStreamRoute(r.TargetName(), r) - r.task.OnFinished("entrypoint_remove_route", func() { + r.task.OnCancel("entrypoint_remove_route", func() { routes.DeleteStreamRoute(r.TargetName()) }) return nil @@ -144,14 +143,10 @@ func (r *StreamRoute) acceptConnections() { if conn == nil { panic("connection is nil") } - connTask := r.task.Subtask("connection") go func() { err := r.Stream.Handle(conn) if err != nil && !errors.Is(err, context.Canceled) { E.LogError("handle connection error", err, &r.l) - connTask.Finish(err) - } else { - connTask.Finish("closed") } }() }