From a7da8ffb9026caa090de4a4ce4899602f5b358d7 Mon Sep 17 00:00:00 2001 From: yusing Date: Fri, 28 Mar 2025 08:10:03 +0800 Subject: [PATCH] refactor: clean up code and fix race condition in idlewatcher --- internal/docker/idlewatcher/container.go | 52 ++++++ internal/docker/idlewatcher/loading_page.go | 6 +- internal/docker/idlewatcher/state.go | 39 +++++ internal/docker/idlewatcher/types/config.go | 39 ++--- internal/docker/idlewatcher/waker.go | 97 ++++++----- internal/docker/idlewatcher/waker_http.go | 52 +++--- internal/docker/idlewatcher/waker_stream.go | 7 +- internal/docker/idlewatcher/watcher.go | 168 +++++++++----------- 8 files changed, 264 insertions(+), 196 deletions(-) create mode 100644 internal/docker/idlewatcher/container.go create mode 100644 internal/docker/idlewatcher/state.go diff --git a/internal/docker/idlewatcher/container.go b/internal/docker/idlewatcher/container.go new file mode 100644 index 0000000..a0245fe --- /dev/null +++ b/internal/docker/idlewatcher/container.go @@ -0,0 +1,52 @@ +package idlewatcher + +import ( + "context" + "errors" + + "github.com/docker/docker/api/types/container" +) + +type ( + containerMeta struct { + ContainerID, ContainerName string + } + containerState struct { + running bool + ready bool + err error + } +) + +func (w *Watcher) containerStop(ctx context.Context) error { + return w.client.ContainerStop(ctx, w.ContainerID, container.StopOptions{ + Signal: string(w.StopSignal), + Timeout: &w.StopTimeout, + }) +} + +func (w *Watcher) containerPause(ctx context.Context) error { + return w.client.ContainerPause(ctx, w.ContainerID) +} + +func (w *Watcher) containerKill(ctx context.Context) error { + return w.client.ContainerKill(ctx, w.ContainerID, string(w.StopSignal)) +} + +func (w *Watcher) containerUnpause(ctx context.Context) error { + return w.client.ContainerUnpause(ctx, w.ContainerID) +} + +func (w *Watcher) containerStart(ctx context.Context) error { + return w.client.ContainerStart(ctx, w.ContainerID, container.StartOptions{}) +} + +func (w *Watcher) containerStatus() (string, error) { + ctx, cancel := context.WithTimeoutCause(w.task.Context(), dockerReqTimeout, errors.New("docker request timeout")) + defer cancel() + json, err := w.client.ContainerInspect(ctx, w.ContainerID) + if err != nil { + return "", err + } + return json.State.Status, nil +} diff --git a/internal/docker/idlewatcher/loading_page.go b/internal/docker/idlewatcher/loading_page.go index 80913c7..b6e9860 100644 --- a/internal/docker/idlewatcher/loading_page.go +++ b/internal/docker/idlewatcher/loading_page.go @@ -6,7 +6,7 @@ import ( "strings" "text/template" - "github.com/yusing/go-proxy/internal/common" + "github.com/yusing/go-proxy/internal/net/gphttp/httpheaders" ) type templateData struct { @@ -23,11 +23,11 @@ func (w *Watcher) makeLoadingPageBody() []byte { msg := w.ContainerName + " is starting..." data := new(templateData) - data.CheckRedirectHeader = common.HeaderCheckRedirect + data.CheckRedirectHeader = httpheaders.HeaderGoDoxyCheckRedirect data.Title = w.ContainerName data.Message = strings.ReplaceAll(msg, " ", " ") - buf := bytes.NewBuffer(make([]byte, len(loadingPage)+len(data.Title)+len(data.Message)+len(common.HeaderCheckRedirect))) + buf := bytes.NewBuffer(make([]byte, len(loadingPage)+len(data.Title)+len(data.Message)+len(httpheaders.HeaderGoDoxyCheckRedirect))) err := loadingPageTmpl.Execute(buf, data) if err != nil { // should never happen in production panic(err) diff --git a/internal/docker/idlewatcher/state.go b/internal/docker/idlewatcher/state.go new file mode 100644 index 0000000..939ec4e --- /dev/null +++ b/internal/docker/idlewatcher/state.go @@ -0,0 +1,39 @@ +package idlewatcher + +func (w *Watcher) running() bool { + return w.state.Load().running +} + +func (w *Watcher) ready() bool { + return w.state.Load().ready +} + +func (w *Watcher) error() error { + return w.state.Load().err +} + +func (w *Watcher) setReady() { + w.state.Store(&containerState{ + running: true, + ready: true, + }) +} + +func (w *Watcher) setStarting() { + w.state.Store(&containerState{ + running: true, + ready: false, + }) +} + +func (w *Watcher) setNapping() { + w.setError(nil) +} + +func (w *Watcher) setError(err error) { + w.state.Store(&containerState{ + running: false, + ready: false, + err: err, + }) +} diff --git a/internal/docker/idlewatcher/types/config.go b/internal/docker/idlewatcher/types/config.go index a813cec..829b54f 100644 --- a/internal/docker/idlewatcher/types/config.go +++ b/internal/docker/idlewatcher/types/config.go @@ -7,7 +7,7 @@ import ( "time" "github.com/yusing/go-proxy/internal/docker" - E "github.com/yusing/go-proxy/internal/error" + "github.com/yusing/go-proxy/internal/gperr" ) type ( @@ -18,11 +18,6 @@ type ( StopMethod StopMethod `json:"stop_method,omitempty"` StopSignal Signal `json:"stop_signal,omitempty"` StartEndpoint string `json:"start_endpoint,omitempty"` // Optional path that must be hit to start container - - DockerHost string `json:"docker_host,omitempty"` - ContainerName string `json:"container_name,omitempty"` - ContainerID string `json:"container_id,omitempty"` - ContainerRunning bool `json:"container_running,omitempty"` } StopMethod string Signal string @@ -40,28 +35,19 @@ var validSignals = map[string]struct{}{ "INT": {}, "TERM": {}, "HUP": {}, "QUIT": {}, } -func ValidateConfig(cont *docker.Container) (*Config, E.Error) { - if cont == nil { +func ValidateConfig(cont *docker.Container) (*Config, gperr.Error) { + if cont == nil || cont.IdleTimeout == "" { return nil, nil } - if cont.IdleTimeout == "" { - return &Config{ - DockerHost: cont.DockerHost, - ContainerName: cont.ContainerName, - ContainerID: cont.ContainerID, - ContainerRunning: cont.Running, - }, nil - } + errs := gperr.NewBuilder("invalid idlewatcher config") - errs := E.NewBuilder("invalid idlewatcher config") - - idleTimeout := E.Collect(errs, validateDurationPostitive, cont.IdleTimeout) - wakeTimeout := E.Collect(errs, validateDurationPostitive, cont.WakeTimeout) - stopTimeout := E.Collect(errs, validateDurationPostitive, cont.StopTimeout) - stopMethod := E.Collect(errs, validateStopMethod, cont.StopMethod) - signal := E.Collect(errs, validateSignal, cont.StopSignal) - startEndpoint := E.Collect(errs, validateStartEndpoint, cont.StartEndpoint) + idleTimeout := gperr.Collect(errs, validateDurationPostitive, cont.IdleTimeout) + wakeTimeout := gperr.Collect(errs, validateDurationPostitive, cont.WakeTimeout) + stopTimeout := gperr.Collect(errs, validateDurationPostitive, cont.StopTimeout) + stopMethod := gperr.Collect(errs, validateStopMethod, cont.StopMethod) + signal := gperr.Collect(errs, validateSignal, cont.StopSignal) + startEndpoint := gperr.Collect(errs, validateStartEndpoint, cont.StartEndpoint) if errs.HasError() { return nil, errs.Error() @@ -74,11 +60,6 @@ func ValidateConfig(cont *docker.Container) (*Config, E.Error) { StopMethod: stopMethod, StopSignal: signal, StartEndpoint: startEndpoint, - - DockerHost: cont.DockerHost, - ContainerName: cont.ContainerName, - ContainerID: cont.ContainerID, - ContainerRunning: cont.Running, }, nil } diff --git a/internal/docker/idlewatcher/waker.go b/internal/docker/idlewatcher/waker.go index 9a3ca13..e535a2a 100644 --- a/internal/docker/idlewatcher/waker.go +++ b/internal/docker/idlewatcher/waker.go @@ -1,14 +1,12 @@ package idlewatcher import ( - "sync/atomic" "time" - "github.com/yusing/go-proxy/internal/common" "github.com/yusing/go-proxy/internal/docker/idlewatcher/types" - E "github.com/yusing/go-proxy/internal/error" + "github.com/yusing/go-proxy/internal/gperr" "github.com/yusing/go-proxy/internal/metrics" - "github.com/yusing/go-proxy/internal/net/http/reverseproxy" + "github.com/yusing/go-proxy/internal/net/gphttp/reverseproxy" net "github.com/yusing/go-proxy/internal/net/types" route "github.com/yusing/go-proxy/internal/route/types" "github.com/yusing/go-proxy/internal/task" @@ -26,8 +24,6 @@ type ( stream net.Stream hc health.HealthChecker metric *metrics.Gauge - - ready atomic.Bool } ) @@ -38,7 +34,7 @@ const ( // TODO: support stream -func newWaker(parent task.Parent, route route.Route, rp *reverseproxy.ReverseProxy, stream net.Stream) (Waker, E.Error) { +func newWaker(parent task.Parent, route route.Route, rp *reverseproxy.ReverseProxy, stream net.Stream) (Waker, gperr.Error) { hcCfg := route.HealthCheckConfig() hcCfg.Timeout = idleWakerCheckTimeout @@ -46,13 +42,14 @@ func newWaker(parent task.Parent, route route.Route, rp *reverseproxy.ReversePro rp: rp, stream: stream, } - task := parent.Subtask("idlewatcher." + route.TargetName()) - watcher, err := registerWatcher(task, route, waker) + watcher, err := registerWatcher(parent, route, waker) if err != nil { - return nil, E.Errorf("register watcher: %w", err) + return nil, gperr.Errorf("register watcher: %w", err) } switch { + case route.IsAgent(): + waker.hc = monitor.NewAgentProxiedMonitor(route.Agent(), hcCfg, monitor.AgentTargetFromURL(route.TargetURL())) case rp != nil: waker.hc = monitor.NewHTTPHealthChecker(route.TargetURL(), hcCfg) case stream != nil: @@ -61,26 +58,20 @@ func newWaker(parent task.Parent, route route.Route, rp *reverseproxy.ReversePro panic("both nil") } - if common.PrometheusEnabled { - m := metrics.GetServiceMetrics() - fqn := parent.Name() + "/" + route.TargetName() - waker.metric = m.HealthStatus.With(metrics.HealthMetricLabels(fqn)) - waker.metric.Set(float64(watcher.Status())) - } return watcher, nil } // lifetime should follow route provider. -func NewHTTPWaker(parent task.Parent, route route.Route, rp *reverseproxy.ReverseProxy) (Waker, E.Error) { +func NewHTTPWaker(parent task.Parent, route route.Route, rp *reverseproxy.ReverseProxy) (Waker, gperr.Error) { return newWaker(parent, route, rp, nil) } -func NewStreamWaker(parent task.Parent, route route.Route, stream net.Stream) (Waker, E.Error) { +func NewStreamWaker(parent task.Parent, route route.Route, stream net.Stream) (Waker, gperr.Error) { return newWaker(parent, route, nil, stream) } // Start implements health.HealthMonitor. -func (w *Watcher) Start(parent task.Parent) E.Error { +func (w *Watcher) Start(parent task.Parent) gperr.Error { w.task.OnCancel("route_cleanup", func() { parent.Finish(w.task.FinishCause()) if w.metric != nil { @@ -124,33 +115,50 @@ func (w *Watcher) Latency() time.Duration { // Status implements health.HealthMonitor. func (w *Watcher) Status() health.Status { - status := w.getStatusUpdateReady() - if w.metric != nil { - w.metric.Set(float64(status)) - } - return status -} - -func (w *Watcher) getStatusUpdateReady() health.Status { - if !w.ContainerRunning { - return health.StatusNapping - } - - if w.ready.Load() { - return health.StatusHealthy - } - - result, err := w.hc.CheckHealth() - switch { - case err != nil: - w.ready.Store(false) + state := w.state.Load() + if state.err != nil { return health.StatusError - case result.Healthy: - w.ready.Store(true) + } + if state.ready { return health.StatusHealthy - default: + } + if state.running { return health.StatusStarting } + return health.StatusNapping +} + +func (w *Watcher) checkUpdateState() (ready bool, err error) { + // already ready + if w.ready() { + return true, nil + } + + if !w.running() { + return false, nil + } + + if w.metric != nil { + defer w.metric.Set(float64(w.Status())) + } + + // the new container info not yet updated + if w.hc.URL().Host == "" { + return false, nil + } + + res, err := w.hc.CheckHealth() + if err != nil { + w.setError(err) + return false, err + } + + if res.Healthy { + w.setReady() + return true, nil + } + w.setStarting() + return false, nil } // MarshalJSON implements health.HealthMonitor. @@ -159,10 +167,15 @@ func (w *Watcher) MarshalJSON() ([]byte, error) { if w.hc.URL().Port() != "0" { url = w.hc.URL() } + var detail string + if err := w.error(); err != nil { + detail = err.Error() + } return (&monitor.JSONRepresentation{ Name: w.Name(), Status: w.Status(), Config: w.hc.Config(), URL: url, + Detail: detail, }).MarshalJSON() } diff --git a/internal/docker/idlewatcher/waker_http.go b/internal/docker/idlewatcher/waker_http.go index b3b7c9b..4658bc5 100644 --- a/internal/docker/idlewatcher/waker_http.go +++ b/internal/docker/idlewatcher/waker_http.go @@ -7,9 +7,8 @@ import ( "strconv" "time" - "github.com/yusing/go-proxy/internal/common" - gphttp "github.com/yusing/go-proxy/internal/net/http" - "github.com/yusing/go-proxy/internal/watcher/health" + gphttp "github.com/yusing/go-proxy/internal/net/gphttp" + "github.com/yusing/go-proxy/internal/net/gphttp/httpheaders" ) type ForceCacheControl struct { @@ -42,11 +41,25 @@ func (w *Watcher) ServeHTTP(rw http.ResponseWriter, r *http.Request) { } } +func (w *Watcher) cancelled(reqCtx context.Context, rw http.ResponseWriter) bool { + select { + case <-reqCtx.Done(): + w.WakeDebug().Str("cause", context.Cause(reqCtx).Error()).Msg("canceled") + return true + case <-w.task.Context().Done(): + w.WakeDebug().Str("cause", w.task.FinishCause().Error()).Msg("canceled") + http.Error(rw, "Service unavailable", http.StatusServiceUnavailable) + return true + default: + return false + } +} + func (w *Watcher) wakeFromHTTP(rw http.ResponseWriter, r *http.Request) (shouldNext bool) { w.resetIdleTimer() // pass through if container is already ready - if w.ready.Load() { + if w.ready() { return true } @@ -56,14 +69,10 @@ func (w *Watcher) wakeFromHTTP(rw http.ResponseWriter, r *http.Request) (shouldN return false } - if r.Body != nil { - defer r.Body.Close() - } - accept := gphttp.GetAccept(r.Header) acceptHTML := (r.Method == http.MethodGet && accept.AcceptHTML() || r.RequestURI == "/" && accept.IsEmpty()) - isCheckRedirect := r.Header.Get(common.HeaderCheckRedirect) != "" + isCheckRedirect := r.Header.Get(httpheaders.HeaderGoDoxyCheckRedirect) != "" if !isCheckRedirect && acceptHTML { // Send a loading response to the client body := w.makeLoadingPageBody() @@ -82,21 +91,7 @@ func (w *Watcher) wakeFromHTTP(rw http.ResponseWriter, r *http.Request) (shouldN ctx, cancel := context.WithTimeoutCause(r.Context(), w.WakeTimeout, errors.New("wake timeout")) defer cancel() - checkCanceled := func() (canceled bool) { - select { - case <-ctx.Done(): - w.WakeDebug().Str("cause", context.Cause(ctx).Error()).Msg("canceled") - return true - case <-w.task.Context().Done(): - w.WakeDebug().Str("cause", w.task.FinishCause().Error()).Msg("canceled") - http.Error(rw, "Service unavailable", http.StatusServiceUnavailable) - return true - default: - return false - } - } - - if checkCanceled() { + if w.cancelled(ctx, rw) { return false } @@ -109,11 +104,16 @@ func (w *Watcher) wakeFromHTTP(rw http.ResponseWriter, r *http.Request) (shouldN } for { - if checkCanceled() { + if w.cancelled(ctx, rw) { return false } - if w.Status() == health.StatusHealthy { + ready, err := w.checkUpdateState() + if err != nil { + http.Error(rw, "Error waking container", http.StatusInternalServerError) + return false + } + if ready { w.resetIdleTimer() if isCheckRedirect { w.Debug().Msgf("redirecting to %s ...", w.hc.URL()) diff --git a/internal/docker/idlewatcher/waker_stream.go b/internal/docker/idlewatcher/waker_stream.go index f1ade45..9c7954e 100644 --- a/internal/docker/idlewatcher/waker_stream.go +++ b/internal/docker/idlewatcher/waker_stream.go @@ -8,7 +8,6 @@ import ( "time" "github.com/yusing/go-proxy/internal/net/types" - "github.com/yusing/go-proxy/internal/watcher/health" ) // Setup implements types.Stream. @@ -50,7 +49,7 @@ func (w *Watcher) wakeFromStream() error { w.resetIdleTimer() // pass through if container is already ready - if w.ready.Load() { + if w.ready() { return nil } @@ -78,7 +77,9 @@ func (w *Watcher) wakeFromStream() error { default: } - if w.Status() == health.StatusHealthy { + if ready, err := w.checkUpdateState(); err != nil { + return err + } else if ready { w.resetIdleTimer() w.Debug().Msg("container is ready, passing through to " + w.hc.URL().String()) return nil diff --git a/internal/docker/idlewatcher/watcher.go b/internal/docker/idlewatcher/watcher.go index eb72fbc..0f12a03 100644 --- a/internal/docker/idlewatcher/watcher.go +++ b/internal/docker/idlewatcher/watcher.go @@ -6,16 +6,15 @@ import ( "sync" "time" - "github.com/docker/docker/api/types/container" "github.com/rs/zerolog" - D "github.com/yusing/go-proxy/internal/docker" + "github.com/yusing/go-proxy/internal/docker" idlewatcher "github.com/yusing/go-proxy/internal/docker/idlewatcher/types" - E "github.com/yusing/go-proxy/internal/error" + "github.com/yusing/go-proxy/internal/gperr" "github.com/yusing/go-proxy/internal/logging" route "github.com/yusing/go-proxy/internal/route/types" "github.com/yusing/go-proxy/internal/task" U "github.com/yusing/go-proxy/internal/utils" - F "github.com/yusing/go-proxy/internal/utils/functional" + "github.com/yusing/go-proxy/internal/utils/atomic" "github.com/yusing/go-proxy/internal/watcher" "github.com/yusing/go-proxy/internal/watcher/events" ) @@ -26,75 +25,89 @@ type ( zerolog.Logger - *idlewatcher.Config *waker + *containerMeta + *idlewatcher.Config + + client *docker.SharedClient + state atomic.Value[*containerState] - client *D.SharedClient stopByMethod StopCallback // send a docker command w.r.t. `stop_method` ticker *time.Ticker lastReset time.Time task *task.Task } - WakeDone <-chan error - WakeFunc func() WakeDone StopCallback func() error ) var ( - watcherMap = F.NewMapOf[string, *Watcher]() - watcherMapMu sync.Mutex + watcherMap = make(map[string]*Watcher) + watcherMapMu sync.RWMutex errShouldNotReachHere = errors.New("should not reach here") ) const dockerReqTimeout = 3 * time.Second -func registerWatcher(watcherTask *task.Task, route route.Route, waker *waker) (*Watcher, error) { +func registerWatcher(parent task.Parent, route route.Route, waker *waker) (*Watcher, error) { cfg := route.IdlewatcherConfig() if cfg.IdleTimeout == 0 { panic(errShouldNotReachHere) } + cont := route.ContainerInfo() + key := cont.ContainerID + watcherMapMu.Lock() defer watcherMapMu.Unlock() + w, ok := watcherMap[key] + if !ok { + client, err := docker.NewClient(cont.DockerHost) + if err != nil { + return nil, err + } - key := cfg.ContainerID - - if w, ok := watcherMap.Load(key); ok { - w.Config = cfg - w.waker = waker - w.resetIdleTimer() - watcherTask.Finish("used existing watcher") - return w, nil + w = &Watcher{ + Logger: logging.With().Str("name", cont.ContainerName).Logger(), + client: client, + task: parent.Subtask("idlewatcher." + cont.ContainerName), + ticker: time.NewTicker(cfg.IdleTimeout), + } } - client, err := D.ConnectClient(cfg.DockerHost) - if err != nil { - return nil, err + // FIXME: possible race condition here + w.waker = waker + w.containerMeta = &containerMeta{ + ContainerID: cont.ContainerID, + ContainerName: cont.ContainerName, + } + w.Config = cfg + w.ticker.Reset(cfg.IdleTimeout) + + if cont.Running { + w.setStarting() + } else { + w.setNapping() } - w := &Watcher{ - Logger: logging.With().Str("name", cfg.ContainerName).Logger(), - Config: cfg, - waker: waker, - client: client, - task: watcherTask, - ticker: time.NewTicker(cfg.IdleTimeout), + if !ok { + w.stopByMethod = w.getStopCallback() + watcherMap[key] = w + + go func() { + cause := w.watchUntilDestroy() + + watcherMapMu.Lock() + defer watcherMapMu.Unlock() + delete(watcherMap, key) + + w.ticker.Stop() + w.client.Close() + w.task.Finish(cause) + }() } - w.stopByMethod = w.getStopCallback() - watcherMap.Store(key, w) - - go func() { - cause := w.watchUntilDestroy() - - watcherMap.Delete(w.ContainerID) - - w.ticker.Stop() - w.client.Close() - w.task.Finish(cause) - }() return w, nil } @@ -118,45 +131,8 @@ func (w *Watcher) WakeError(err error) { w.Err(err).Str("action", "wake").Msg("error") } -func (w *Watcher) LogReason(action, reason string) { - w.Info().Str("reason", reason).Msg(action) -} - -func (w *Watcher) containerStop(ctx context.Context) error { - return w.client.ContainerStop(ctx, w.ContainerID, container.StopOptions{ - Signal: string(w.StopSignal), - Timeout: &w.StopTimeout, - }) -} - -func (w *Watcher) containerPause(ctx context.Context) error { - return w.client.ContainerPause(ctx, w.ContainerID) -} - -func (w *Watcher) containerKill(ctx context.Context) error { - return w.client.ContainerKill(ctx, w.ContainerID, string(w.StopSignal)) -} - -func (w *Watcher) containerUnpause(ctx context.Context) error { - return w.client.ContainerUnpause(ctx, w.ContainerID) -} - -func (w *Watcher) containerStart(ctx context.Context) error { - return w.client.ContainerStart(ctx, w.ContainerID, container.StartOptions{}) -} - -func (w *Watcher) containerStatus() (string, error) { - ctx, cancel := context.WithTimeoutCause(w.task.Context(), dockerReqTimeout, errors.New("docker request timeout")) - defer cancel() - json, err := w.client.ContainerInspect(ctx, w.ContainerID) - if err != nil { - return "", err - } - return json.State.Status, nil -} - func (w *Watcher) wakeIfStopped() error { - if w.ContainerRunning { + if w.running() { return nil } @@ -177,7 +153,7 @@ func (w *Watcher) wakeIfStopped() error { case "running": return nil default: - return E.Errorf("unexpected container status: %s", status) + return gperr.Errorf("unexpected container status: %s", status) } } @@ -210,8 +186,8 @@ func (w *Watcher) expires() time.Time { return w.lastReset.Add(w.IdleTimeout) } -func (w *Watcher) getEventCh(dockerWatcher watcher.DockerWatcher) (eventCh <-chan events.Event, errCh <-chan E.Error) { - eventCh, errCh = dockerWatcher.EventsWithOptions(w.Task().Context(), watcher.DockerListOptions{ +func (w *Watcher) getEventCh(ctx context.Context, dockerWatcher *watcher.DockerWatcher) (eventCh <-chan events.Event, errCh <-chan gperr.Error) { + eventCh, errCh = dockerWatcher.EventsWithOptions(ctx, watcher.DockerListOptions{ Filters: watcher.NewDockerFilter( watcher.DockerFilterContainer, watcher.DockerFilterContainerNameID(w.ContainerID), @@ -239,8 +215,11 @@ func (w *Watcher) getEventCh(dockerWatcher watcher.DockerWatcher) (eventCh <-cha // it exits only if the context is canceled, the container is destroyed, // errors occurred on docker client, or route provider died (mainly caused by config reload). func (w *Watcher) watchUntilDestroy() (returnCause error) { + eventCtx, eventCancel := context.WithCancel(w.task.Context()) + defer eventCancel() + dockerWatcher := watcher.NewDockerWatcher(w.client.DaemonHost()) - dockerEventCh, dockerEventErrCh := w.getEventCh(dockerWatcher) + dockerEventCh, dockerEventErrCh := w.getEventCh(eventCtx, dockerWatcher) for { select { @@ -248,24 +227,23 @@ func (w *Watcher) watchUntilDestroy() (returnCause error) { return w.task.FinishCause() case err := <-dockerEventErrCh: if !err.Is(context.Canceled) { - E.LogError("idlewatcher error", err, &w.Logger) + gperr.LogError("idlewatcher error", err, &w.Logger) } return err case e := <-dockerEventCh: switch { case e.Action == events.ActionContainerDestroy: - w.ContainerRunning = false - w.ready.Store(false) - w.LogReason("watcher stopped", "container destroyed") + w.setError(errors.New("container destroyed")) + w.Info().Str("reason", "container destroyed").Msg("watcher stopped") return errors.New("container destroyed") // create / start / unpause case e.Action.IsContainerWake(): - w.ContainerRunning = true + w.setStarting() w.resetIdleTimer() w.Info().Msg("awaken") case e.Action.IsContainerSleep(): // stop / pause / kil - w.ContainerRunning = false - w.ready.Store(false) + w.setNapping() + w.resetIdleTimer() w.ticker.Stop() default: w.Error().Msg("unexpected docker event: " + e.String()) @@ -279,11 +257,15 @@ func (w *Watcher) watchUntilDestroy() (returnCause error) { w.Debug().Msgf("id changed %s -> %s", w.ContainerID, e.ActorID) w.ContainerID = e.ActorID // recreate event stream - dockerEventCh, dockerEventErrCh = w.getEventCh(dockerWatcher) + eventCancel() + + eventCtx, eventCancel = context.WithCancel(w.task.Context()) + defer eventCancel() + dockerEventCh, dockerEventErrCh = w.getEventCh(eventCtx, dockerWatcher) } case <-w.ticker.C: w.ticker.Stop() - if w.ContainerRunning { + if w.running() { err := w.stopByMethod() switch { case errors.Is(err, context.Canceled): @@ -294,7 +276,7 @@ func (w *Watcher) watchUntilDestroy() (returnCause error) { } w.Err(err).Msgf("container stop with method %q failed", w.StopMethod) default: - w.LogReason("container stopped", "idle timeout") + w.Info().Str("reason", "idle timeout").Msg("container stopped") } } }