diff --git a/internal/docker/idlewatcher/container.go b/internal/docker/idlewatcher/container.go
new file mode 100644
index 0000000..6a8ca1e
--- /dev/null
+++ b/internal/docker/idlewatcher/container.go
@@ -0,0 +1,41 @@
+package idlewatcher
+
+import (
+	"context"
+	"errors"
+
+	"github.com/docker/docker/api/types/container"
+)
+
+func (w *Watcher) containerStop(ctx context.Context) error {
+	return w.client.ContainerStop(ctx, w.ContainerID, container.StopOptions{
+		Signal:  string(w.StopSignal),
+		Timeout: &w.StopTimeout,
+	})
+}
+
+func (w *Watcher) containerPause(ctx context.Context) error {
+	return w.client.ContainerPause(ctx, w.ContainerID)
+}
+
+func (w *Watcher) containerKill(ctx context.Context) error {
+	return w.client.ContainerKill(ctx, w.ContainerID, string(w.StopSignal))
+}
+
+func (w *Watcher) containerUnpause(ctx context.Context) error {
+	return w.client.ContainerUnpause(ctx, w.ContainerID)
+}
+
+func (w *Watcher) containerStart(ctx context.Context) error {
+	return w.client.ContainerStart(ctx, w.ContainerID, container.StartOptions{})
+}
+
+func (w *Watcher) containerStatus() (string, error) {
+	ctx, cancel := context.WithTimeoutCause(w.task.Context(), dockerReqTimeout, errors.New("docker request timeout"))
+	defer cancel()
+	json, err := w.client.ContainerInspect(ctx, w.ContainerID)
+	if err != nil {
+		return "", err
+	}
+	return json.State.Status, nil
+}
diff --git a/internal/docker/idlewatcher/state.go b/internal/docker/idlewatcher/state.go
new file mode 100644
index 0000000..939ec4e
--- /dev/null
+++ b/internal/docker/idlewatcher/state.go
@@ -0,0 +1,39 @@
+package idlewatcher
+
+func (w *Watcher) running() bool {
+	return w.state.Load().running
+}
+
+func (w *Watcher) ready() bool {
+	return w.state.Load().ready
+}
+
+func (w *Watcher) error() error {
+	return w.state.Load().err
+}
+
+func (w *Watcher) setReady() {
+	w.state.Store(&containerState{
+		running: true,
+		ready:   true,
+	})
+}
+
+func (w *Watcher) setStarting() {
+	w.state.Store(&containerState{
+		running: true,
+		ready:   false,
+	})
+}
+
+func (w *Watcher) setNapping() {
+	w.setError(nil)
+}
+
+func (w *Watcher) setError(err error) {
+	w.state.Store(&containerState{
+		running: false,
+		ready:   false,
+		err:     err,
+	})
+}
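The two new files split the Docker plumbing (container.go) from the state machine (state.go). The key idea in state.go is that every transition stores a whole immutable containerState, so a reader doing one Load() gets a consistent running/ready/err triple. Below is a minimal, self-contained sketch of the same pattern, assuming the standard library's sync/atomic.Pointer in place of the project's internal generic atomic.Value; the watcher type here is illustrative, not the real Watcher:

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// containerState mirrors the struct added in state.go: every transition
// stores a fresh immutable value, never mutating fields in place.
type containerState struct {
	running bool
	ready   bool
	err     error
}

type watcher struct {
	state atomic.Pointer[containerState]
}

func (w *watcher) setStarting()       { w.state.Store(&containerState{running: true}) }
func (w *watcher) setReady()          { w.state.Store(&containerState{running: true, ready: true}) }
func (w *watcher) setError(err error) { w.state.Store(&containerState{err: err}) }

func main() {
	w := &watcher{}
	w.setStarting()
	w.setReady()
	s := w.state.Load()                    // one consistent snapshot of all three fields
	fmt.Println(s.running, s.ready, s.err) // true true <nil>
	w.setError(errors.New("container destroyed"))
	fmt.Println(w.state.Load().err) // container destroyed
}

Compared with the previous pair of independent atomic.Bool fields, readers can no longer observe a torn state where running was already updated but ready was not.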
diff --git a/internal/docker/idlewatcher/waker.go b/internal/docker/idlewatcher/waker.go
index 81608eb..dd4c009 100644
--- a/internal/docker/idlewatcher/waker.go
+++ b/internal/docker/idlewatcher/waker.go
@@ -1,7 +1,6 @@
 package idlewatcher
 
 import (
-	"errors"
 	"time"
 
 	"github.com/yusing/go-proxy/internal/docker/idlewatcher/types"
@@ -12,7 +11,6 @@ import (
 	route "github.com/yusing/go-proxy/internal/route/types"
 	"github.com/yusing/go-proxy/internal/task"
 	U "github.com/yusing/go-proxy/internal/utils"
-	"github.com/yusing/go-proxy/internal/utils/atomic"
 	"github.com/yusing/go-proxy/internal/watcher/health"
 	"github.com/yusing/go-proxy/internal/watcher/health/monitor"
 )
@@ -22,11 +20,10 @@ type (
 	waker struct {
 		_ U.NoCopy
 
-		rp      *reverseproxy.ReverseProxy
-		stream  net.Stream
-		hc      health.HealthChecker
-		metric  *metrics.Gauge
-		lastErr atomic.Value[error]
+		rp     *reverseproxy.ReverseProxy
+		stream net.Stream
+		hc     health.HealthChecker
+		metric *metrics.Gauge
 	}
 )
 
@@ -35,8 +32,6 @@ const (
 	idleWakerCheckTimeout = time.Second
 )
 
-var noErr = errors.New("no error")
-
 // TODO: support stream
 
 func newWaker(parent task.Parent, route route.Route, rp *reverseproxy.ReverseProxy, stream net.Stream) (Waker, gperr.Error) {
@@ -47,8 +42,7 @@ func newWaker(parent task.Parent, route route.Route, rp *reverseproxy.ReverseProxy, stream net.Stream) (Waker, gperr.Error) {
 		rp:     rp,
 		stream: stream,
 	}
-	task := parent.Subtask("idlewatcher." + route.TargetName())
-	watcher, err := registerWatcher(task, route, waker)
+	watcher, err := registerWatcher(parent, route, waker)
 	if err != nil {
 		return nil, gperr.Errorf("register watcher: %w", err)
 	}
@@ -121,43 +115,46 @@ func (w *Watcher) Latency() time.Duration {
 
 // Status implements health.HealthMonitor.
 func (w *Watcher) Status() health.Status {
-	status := w.getStatusUpdateReady()
-	if w.metric != nil {
-		w.metric.Set(float64(status))
-	}
-	return status
-}
-
-func (w *Watcher) getStatusUpdateReady() health.Status {
-	if !w.running.Load() {
-		return health.StatusNapping
-	}
-
-	if w.ready.Load() {
-		return health.StatusHealthy
-	}
-
-	result, err := w.hc.CheckHealth()
-	switch {
-	case err != nil:
-		w.lastErr.Store(err)
-		w.ready.Store(false)
+	state := w.state.Load()
+	if state.err != nil {
 		return health.StatusError
-	case result.Healthy:
-		w.lastErr.Store(noErr)
-		w.ready.Store(true)
+	}
+	if state.ready {
 		return health.StatusHealthy
-	default:
-		w.lastErr.Store(noErr)
+	}
+	if state.running {
 		return health.StatusStarting
 	}
+	return health.StatusNapping
 }
 
-func (w *Watcher) LastError() error {
-	if err := w.lastErr.Load(); err != noErr {
-		return err
+func (w *Watcher) checkUpdateState() (ready bool, err error) {
+	// already ready
+	if w.ready() {
+		return true, nil
 	}
-	return nil
+
+	if w.metric != nil {
+		defer w.metric.Set(float64(w.Status()))
+	}
+
+	// the new container info is not yet updated
+	if w.hc.URL().Host == "" {
+		return false, nil
+	}
+
+	res, err := w.hc.CheckHealth()
+	if err != nil {
+		w.setError(err)
+		return false, err
+	}
+
+	if res.Healthy {
+		w.setReady()
+		return true, nil
+	}
+	w.setStarting()
+	return false, nil
}
 
 // MarshalJSON implements health.HealthMonitor.
@@ -167,7 +164,7 @@ func (w *Watcher) MarshalJSON() ([]byte, error) {
 		url = w.hc.URL()
 	}
 	var detail string
-	if err := w.LastError(); err != nil {
+	if err := w.error(); err != nil {
 		detail = err.Error()
 	}
 	return (&monitor.JSONRepresentation{
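With the snapshot in place, Status() becomes a pure read with a fixed precedence (error beats ready, ready beats running, and a stopped container is napping), while the health-checking side effects move into checkUpdateState(). A tiny sketch of that precedence, using a plain string as a stand-in for the project's health.Status values:

package main

import "fmt"

type containerState struct {
	running, ready bool
	err            error
}

// statusOf encodes the same precedence as the rewritten Status().
func statusOf(s *containerState) string {
	switch {
	case s.err != nil:
		return "error"
	case s.ready:
		return "healthy"
	case s.running:
		return "starting"
	default:
		return "napping"
	}
}

func main() {
	fmt.Println(statusOf(&containerState{running: true}))              // starting
	fmt.Println(statusOf(&containerState{running: true, ready: true})) // healthy
	fmt.Println(statusOf(&containerState{}))                           // napping
}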
diff --git a/internal/docker/idlewatcher/waker_http.go b/internal/docker/idlewatcher/waker_http.go
index f3b1975..4658bc5 100644
--- a/internal/docker/idlewatcher/waker_http.go
+++ b/internal/docker/idlewatcher/waker_http.go
@@ -9,7 +9,6 @@ import (
 
 	gphttp "github.com/yusing/go-proxy/internal/net/gphttp"
 	"github.com/yusing/go-proxy/internal/net/gphttp/httpheaders"
-	"github.com/yusing/go-proxy/internal/watcher/health"
 )
 
 type ForceCacheControl struct {
@@ -42,11 +41,25 @@ func (w *Watcher) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
 	}
 }
 
+func (w *Watcher) cancelled(reqCtx context.Context, rw http.ResponseWriter) bool {
+	select {
+	case <-reqCtx.Done():
+		w.WakeDebug().Str("cause", context.Cause(reqCtx).Error()).Msg("canceled")
+		return true
+	case <-w.task.Context().Done():
+		w.WakeDebug().Str("cause", w.task.FinishCause().Error()).Msg("canceled")
+		http.Error(rw, "Service unavailable", http.StatusServiceUnavailable)
+		return true
+	default:
+		return false
+	}
+}
+
 func (w *Watcher) wakeFromHTTP(rw http.ResponseWriter, r *http.Request) (shouldNext bool) {
 	w.resetIdleTimer()
 
 	// pass through if container is already ready
-	if w.ready.Load() {
+	if w.ready() {
 		return true
 	}
@@ -56,10 +69,6 @@ func (w *Watcher) wakeFromHTTP(rw http.ResponseWriter, r *http.Request) (shouldNext bool) {
 		return false
 	}
 
-	if r.Body != nil {
-		defer r.Body.Close()
-	}
-
 	accept := gphttp.GetAccept(r.Header)
 	acceptHTML := (r.Method == http.MethodGet && accept.AcceptHTML() || r.RequestURI == "/" && accept.IsEmpty())
 
@@ -82,21 +91,7 @@ func (w *Watcher) wakeFromHTTP(rw http.ResponseWriter, r *http.Request) (shouldNext bool) {
 	ctx, cancel := context.WithTimeoutCause(r.Context(), w.WakeTimeout, errors.New("wake timeout"))
 	defer cancel()
 
-	checkCanceled := func() (canceled bool) {
-		select {
-		case <-ctx.Done():
-			w.WakeDebug().Str("cause", context.Cause(ctx).Error()).Msg("canceled")
-			return true
-		case <-w.task.Context().Done():
-			w.WakeDebug().Str("cause", w.task.FinishCause().Error()).Msg("canceled")
-			http.Error(rw, "Service unavailable", http.StatusServiceUnavailable)
-			return true
-		default:
-			return false
-		}
-	}
-
-	if checkCanceled() {
+	if w.cancelled(ctx, rw) {
 		return false
 	}
 
@@ -109,11 +104,16 @@ func (w *Watcher) wakeFromHTTP(rw http.ResponseWriter, r *http.Request) (shouldNext bool) {
 	}
 
 	for {
-		if checkCanceled() {
+		if w.cancelled(ctx, rw) {
 			return false
 		}
 
-		if w.Status() == health.StatusHealthy {
+		ready, err := w.checkUpdateState()
+		if err != nil {
+			http.Error(rw, "Error waking container", http.StatusInternalServerError)
+			return false
+		}
+		if ready {
 			w.resetIdleTimer()
 			if isCheckRedirect {
 				w.Debug().Msgf("redirecting to %s ...", w.hc.URL())
diff --git a/internal/docker/idlewatcher/waker_stream.go b/internal/docker/idlewatcher/waker_stream.go
index f1ade45..9c7954e 100644
--- a/internal/docker/idlewatcher/waker_stream.go
+++ b/internal/docker/idlewatcher/waker_stream.go
@@ -8,7 +8,6 @@ import (
 	"time"
 
 	"github.com/yusing/go-proxy/internal/net/types"
-	"github.com/yusing/go-proxy/internal/watcher/health"
 )
 
 // Setup implements types.Stream.
@@ -50,7 +49,7 @@ func (w *Watcher) wakeFromStream() error {
 	w.resetIdleTimer()
 
 	// pass through if container is already ready
-	if w.ready.Load() {
+	if w.ready() {
 		return nil
 	}
@@ -78,7 +77,9 @@ func (w *Watcher) wakeFromStream() error {
 	default:
 	}
 
-	if w.Status() == health.StatusHealthy {
+	if ready, err := w.checkUpdateState(); err != nil {
+		return err
+	} else if ready {
 		w.resetIdleTimer()
 		w.Debug().Msg("container is ready, passing through to " + w.hc.URL().String())
 		return nil
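After this change the HTTP and stream wakers share one shape: check for cancellation, then poll checkUpdateState() until the container reports ready, surfacing a check error instead of spinning on it. A runnable sketch of that loop, assuming a hypothetical check callback in place of the watcher method and a fixed poll interval:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitReady polls check until it reports ready, the context is done,
// or the check itself fails (wakeFromHTTP surfaces that as a 500).
func waitReady(ctx context.Context, check func() (bool, error)) error {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return context.Cause(ctx)
		case <-ticker.C:
			ready, err := check()
			if err != nil {
				return err
			}
			if ready {
				return nil
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeoutCause(context.Background(), time.Second, errors.New("wake timeout"))
	defer cancel()

	polls := 0
	err := waitReady(ctx, func() (bool, error) {
		polls++
		return polls >= 3, nil // pretend the container is ready on the third poll
	})
	fmt.Println(err, polls) // <nil> 3
}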
diff --git a/internal/docker/idlewatcher/watcher.go b/internal/docker/idlewatcher/watcher.go
index 7ff4a0c..a60832f 100644
--- a/internal/docker/idlewatcher/watcher.go
+++ b/internal/docker/idlewatcher/watcher.go
@@ -6,7 +6,6 @@ import (
 	"sync"
 	"time"
 
-	"github.com/docker/docker/api/types/container"
 	"github.com/rs/zerolog"
 	"github.com/yusing/go-proxy/internal/docker"
 	idlewatcher "github.com/yusing/go-proxy/internal/docker/idlewatcher/types"
@@ -30,9 +29,8 @@ type (
 		*containerMeta
 		*idlewatcher.Config
 
-		client  *docker.SharedClient
-		running atomic.Bool
-		ready   atomic.Bool
+		client *docker.SharedClient
+		state  atomic.Value[*containerState]
 
 		stopByMethod StopCallback // send a docker command w.r.t. `stop_method`
 		ticker       *time.Ticker
@@ -42,9 +40,12 @@ type (
 	containerMeta struct {
 		ContainerID, ContainerName string
 	}
+	containerState struct {
+		running bool
+		ready   bool
+		err     error
+	}
 
-	WakeDone     <-chan error
-	WakeFunc     func() WakeDone
 	StopCallback func() error
 )
@@ -57,7 +58,7 @@ var (
 
 const dockerReqTimeout = 3 * time.Second
 
-func registerWatcher(watcherTask *task.Task, route route.Route, waker *waker) (*Watcher, error) {
+func registerWatcher(parent task.Parent, route route.Route, waker *waker) (*Watcher, error) {
 	cfg := route.IdlewatcherConfig()
 
 	if cfg.IdleTimeout == 0 {
@@ -69,44 +70,52 @@ func registerWatcher(watcherTask *task.Task, route route.Route, waker *waker) (*Watcher, error) {
 	watcherMapMu.Lock()
 	defer watcherMapMu.Unlock()
+	w, ok := watcherMap[key]
+	if !ok {
+		client, err := docker.NewClient(cont.DockerHost)
+		if err != nil {
+			return nil, err
+		}
 
-	// cancel previous watcher
-	if w, ok := watcherMap[key]; ok {
-		defer w.Finish("new request with same container id")
+		w = &Watcher{
+			Logger: logging.With().Str("name", cont.ContainerName).Logger(),
+			client: client,
+			task:   parent.Subtask("idlewatcher." + cont.ContainerName),
+			ticker: time.NewTicker(cfg.IdleTimeout),
+		}
 	}
 
-	client, err := docker.NewClient(cont.DockerHost)
-	if err != nil {
-		return nil, err
+	// FIXME: possible race condition here
+	w.waker = waker
+	w.containerMeta = &containerMeta{
+		ContainerID:   cont.ContainerID,
+		ContainerName: cont.ContainerName,
+	}
+	w.Config = cfg
+	w.ticker.Reset(cfg.IdleTimeout)
+
+	if cont.Running {
+		w.setStarting()
+	} else {
+		w.setNapping()
 	}
 
-	w := &Watcher{
-		Logger: logging.With().Str("name", cont.ContainerName).Logger(),
-		waker:  waker,
-		containerMeta: &containerMeta{
-			ContainerID:   cont.ContainerID,
-			ContainerName: cont.ContainerName,
-		},
-		Config: cfg,
-		client: client,
-		task:   watcherTask,
-		ticker: time.NewTicker(cfg.IdleTimeout),
+	if !ok {
+		w.stopByMethod = w.getStopCallback()
+		watcherMap[key] = w
+
+		go func() {
+			cause := w.watchUntilDestroy()
+
+			watcherMapMu.Lock()
+			defer watcherMapMu.Unlock()
+			delete(watcherMap, key)
+
+			w.ticker.Stop()
+			w.client.Close()
+			w.task.Finish(cause)
+		}()
 	}
-	w.running.Store(cont.Running)
-	w.stopByMethod = w.getStopCallback()
-	watcherMap[key] = w
-
-	go func() {
-		cause := w.watchUntilDestroy()
-
-		watcherMapMu.Lock()
-		defer watcherMapMu.Unlock()
-		delete(watcherMap, key)
-
-		w.ticker.Stop()
-		w.client.Close()
-		w.task.Finish(cause)
-	}()
 
 	return w, nil
 }
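The registerWatcher rewrite replaces cancel-and-recreate with reuse-and-reconfigure: the docker client, task, ticker, and cleanup goroutine are created at most once per container key, and later registrations only refresh metadata, config, and the ticker. A simplified sketch of that flow (all names are stand-ins for the real Watcher fields); note that reconfiguring a shared watcher in place, outside any reader-side synchronization, is the race the FIXME above flags:

package main

import (
	"fmt"
	"sync"
)

type watcher struct {
	name string
	done chan struct{} // closed when the container is destroyed
}

var (
	mu       sync.Mutex
	watchers = make(map[string]*watcher)
)

// register creates a watcher (and its cleanup goroutine) at most once
// per key; subsequent calls only reconfigure the existing one.
func register(key, name string) *watcher {
	mu.Lock()
	defer mu.Unlock()

	w, ok := watchers[key]
	if !ok {
		// expensive one-time setup (docker client, task) goes here
		w = &watcher{done: make(chan struct{})}
		watchers[key] = w
	}
	// cheap reconfiguration happens on every call; doing it while other
	// goroutines may be reading w is the noted race
	w.name = name

	if !ok {
		go func() {
			<-w.done // stand-in for watchUntilDestroy
			mu.Lock()
			defer mu.Unlock()
			delete(watchers, key)
		}()
	}
	return w
}

func main() {
	a := register("id1", "app")
	b := register("id1", "app-renamed") // reuses the same watcher
	fmt.Println(a == b, b.name)         // true app-renamed
	close(b.done)                       // let the cleanup goroutine run
}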
@@ -130,41 +139,8 @@ func (w *Watcher) WakeError(err error) {
 	w.Err(err).Str("action", "wake").Msg("error")
 }
 
-func (w *Watcher) containerStop(ctx context.Context) error {
-	return w.client.ContainerStop(ctx, w.ContainerID, container.StopOptions{
-		Signal:  string(w.StopSignal),
-		Timeout: &w.StopTimeout,
-	})
-}
-
-func (w *Watcher) containerPause(ctx context.Context) error {
-	return w.client.ContainerPause(ctx, w.ContainerID)
-}
-
-func (w *Watcher) containerKill(ctx context.Context) error {
-	return w.client.ContainerKill(ctx, w.ContainerID, string(w.StopSignal))
-}
-
-func (w *Watcher) containerUnpause(ctx context.Context) error {
-	return w.client.ContainerUnpause(ctx, w.ContainerID)
-}
-
-func (w *Watcher) containerStart(ctx context.Context) error {
-	return w.client.ContainerStart(ctx, w.ContainerID, container.StartOptions{})
-}
-
-func (w *Watcher) containerStatus() (string, error) {
-	ctx, cancel := context.WithTimeoutCause(w.task.Context(), dockerReqTimeout, errors.New("docker request timeout"))
-	defer cancel()
-	json, err := w.client.ContainerInspect(ctx, w.ContainerID)
-	if err != nil {
-		return "", err
-	}
-	return json.State.Status, nil
-}
-
 func (w *Watcher) wakeIfStopped() error {
-	if w.running.Load() {
+	if w.running() {
 		return nil
 	}
@@ -218,8 +194,8 @@ func (w *Watcher) expires() time.Time {
 	return w.lastReset.Add(w.IdleTimeout)
 }
 
-func (w *Watcher) getEventCh(dockerWatcher *watcher.DockerWatcher) (eventCh <-chan events.Event, errCh <-chan gperr.Error) {
-	eventCh, errCh = dockerWatcher.EventsWithOptions(w.Task().Context(), watcher.DockerListOptions{
+func (w *Watcher) getEventCh(ctx context.Context, dockerWatcher *watcher.DockerWatcher) (eventCh <-chan events.Event, errCh <-chan gperr.Error) {
+	eventCh, errCh = dockerWatcher.EventsWithOptions(ctx, watcher.DockerListOptions{
 		Filters: watcher.NewDockerFilter(
 			watcher.DockerFilterContainer,
 			watcher.DockerFilterContainerNameID(w.ContainerID),
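getEventCh now accepts the caller's context rather than always using the task's, which lets watchUntilDestroy (next hunk) put the event subscription on a cancelable child context: canceling only the child drops and re-opens the Docker event stream after a container ID change without tearing down the watcher task. A sketch of that pattern, with a hypothetical subscribe standing in for DockerWatcher.EventsWithOptions:

package main

import (
	"context"
	"fmt"
	"time"
)

// subscribe emits events until its context is canceled.
func subscribe(ctx context.Context, id string) <-chan string {
	ch := make(chan string)
	go func() {
		defer close(ch)
		for {
			select {
			case <-ctx.Done():
				return
			case ch <- "event from " + id:
				time.Sleep(10 * time.Millisecond)
			}
		}
	}()
	return ch
}

func main() {
	root := context.Background() // stands in for w.task.Context()

	eventCtx, eventCancel := context.WithCancel(root)
	events := subscribe(eventCtx, "abc123")
	fmt.Println(<-events)

	// The container was recreated under a new ID: cancel only the
	// subscription's context and open a fresh stream; the root
	// (watcher task) context stays alive throughout.
	eventCancel()
	eventCtx, eventCancel = context.WithCancel(root)
	defer eventCancel()
	events = subscribe(eventCtx, "def456")
	fmt.Println(<-events)
}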
@@ -247,8 +223,11 @@
 // it exits only if the context is canceled, the container is destroyed,
 // errors occurred on docker client, or route provider died (mainly caused by config reload).
 func (w *Watcher) watchUntilDestroy() (returnCause error) {
+	eventCtx, eventCancel := context.WithCancel(w.task.Context())
+	defer eventCancel()
+
 	dockerWatcher := watcher.NewDockerWatcher(w.client.DaemonHost())
-	dockerEventCh, dockerEventErrCh := w.getEventCh(dockerWatcher)
+	dockerEventCh, dockerEventErrCh := w.getEventCh(eventCtx, dockerWatcher)
 
 	for {
 		select {
@@ -262,18 +241,17 @@
 		case e := <-dockerEventCh:
 			switch {
 			case e.Action == events.ActionContainerDestroy:
-				w.running.Store(false)
-				w.ready.Store(false)
+				w.setError(errors.New("container destroyed"))
 				w.Info().Str("reason", "container destroyed").Msg("watcher stopped")
 				return errors.New("container destroyed")
 			// create / start / unpause
 			case e.Action.IsContainerWake():
-				w.running.Store(true)
+				w.setStarting()
 				w.resetIdleTimer()
 				w.Info().Msg("awaken")
 			case e.Action.IsContainerSleep(): // stop / pause / kill
-				w.running.Store(false)
-				w.ready.Store(false)
+				w.setNapping()
+				w.resetIdleTimer()
 				w.ticker.Stop()
 			default:
 				w.Error().Msg("unexpected docker event: " + e.String())
@@ -287,11 +265,15 @@
 				w.Debug().Msgf("id changed %s -> %s", w.ContainerID, e.ActorID)
 				w.ContainerID = e.ActorID
 				// recreate event stream
-				dockerEventCh, dockerEventErrCh = w.getEventCh(dockerWatcher)
+				eventCancel()
+
+				eventCtx, eventCancel = context.WithCancel(w.task.Context())
+				defer eventCancel()
+				dockerEventCh, dockerEventErrCh = w.getEventCh(eventCtx, dockerWatcher)
 			}
 		case <-w.ticker.C:
 			w.ticker.Stop()
-			if w.running.Load() {
+			if w.running() {
 				err := w.stopByMethod()
 				switch {
 				case errors.Is(err, context.Canceled):