From 767560804d35bb3fec60e2c59c5d0018cc0e9743 Mon Sep 17 00:00:00 2001 From: yusing Date: Tue, 25 Feb 2025 04:29:07 +0800 Subject: [PATCH] idlewatcher: refactor and fix data race --- internal/docker/idlewatcher/types/config.go | 21 +----- internal/docker/idlewatcher/waker.go | 27 +++++--- internal/docker/idlewatcher/watcher.go | 68 +++++++++++-------- internal/route/reverse_proxy.go | 4 +- internal/route/stream.go | 4 +- internal/utils/atomic/bool.go | 5 ++ .../atomic/{atomic_value.go => value.go} | 10 +-- 7 files changed, 71 insertions(+), 68 deletions(-) create mode 100644 internal/utils/atomic/bool.go rename internal/utils/atomic/{atomic_value.go => value.go} (75%) diff --git a/internal/docker/idlewatcher/types/config.go b/internal/docker/idlewatcher/types/config.go index c607bb6..829b54f 100644 --- a/internal/docker/idlewatcher/types/config.go +++ b/internal/docker/idlewatcher/types/config.go @@ -18,11 +18,6 @@ type ( StopMethod StopMethod `json:"stop_method,omitempty"` StopSignal Signal `json:"stop_signal,omitempty"` StartEndpoint string `json:"start_endpoint,omitempty"` // Optional path that must be hit to start container - - DockerHost string `json:"docker_host,omitempty"` - ContainerName string `json:"container_name,omitempty"` - ContainerID string `json:"container_id,omitempty"` - ContainerRunning bool `json:"container_running,omitempty"` } StopMethod string Signal string @@ -41,19 +36,10 @@ var validSignals = map[string]struct{}{ } func ValidateConfig(cont *docker.Container) (*Config, gperr.Error) { - if cont == nil { + if cont == nil || cont.IdleTimeout == "" { return nil, nil } - if cont.IdleTimeout == "" { - return &Config{ - DockerHost: cont.DockerHost, - ContainerName: cont.ContainerName, - ContainerID: cont.ContainerID, - ContainerRunning: cont.Running, - }, nil - } - errs := gperr.NewBuilder("invalid idlewatcher config") idleTimeout := gperr.Collect(errs, validateDurationPostitive, cont.IdleTimeout) @@ -74,11 +60,6 @@ func ValidateConfig(cont *docker.Container) (*Config, gperr.Error) { StopMethod: stopMethod, StopSignal: signal, StartEndpoint: startEndpoint, - - DockerHost: cont.DockerHost, - ContainerName: cont.ContainerName, - ContainerID: cont.ContainerID, - ContainerRunning: cont.Running, }, nil } diff --git a/internal/docker/idlewatcher/waker.go b/internal/docker/idlewatcher/waker.go index 0b6fce9..81608eb 100644 --- a/internal/docker/idlewatcher/waker.go +++ b/internal/docker/idlewatcher/waker.go @@ -1,7 +1,7 @@ package idlewatcher import ( - "sync/atomic" + "errors" "time" "github.com/yusing/go-proxy/internal/docker/idlewatcher/types" @@ -12,6 +12,7 @@ import ( route "github.com/yusing/go-proxy/internal/route/types" "github.com/yusing/go-proxy/internal/task" U "github.com/yusing/go-proxy/internal/utils" + "github.com/yusing/go-proxy/internal/utils/atomic" "github.com/yusing/go-proxy/internal/watcher/health" "github.com/yusing/go-proxy/internal/watcher/health/monitor" ) @@ -25,8 +26,7 @@ type ( stream net.Stream hc health.HealthChecker metric *metrics.Gauge - lastErr error - ready atomic.Bool + lastErr atomic.Value[error] } ) @@ -35,6 +35,8 @@ const ( idleWakerCheckTimeout = time.Second ) +var noErr = errors.New("no error") + // TODO: support stream func newWaker(parent task.Parent, route route.Route, rp *reverseproxy.ReverseProxy, stream net.Stream) (Waker, gperr.Error) { @@ -127,7 +129,7 @@ func (w *Watcher) Status() health.Status { } func (w *Watcher) getStatusUpdateReady() health.Status { - if !w.ContainerRunning { + if !w.running.Load() { return health.StatusNapping } @@ -138,19 +140,26 @@ func (w *Watcher) getStatusUpdateReady() health.Status { result, err := w.hc.CheckHealth() switch { case err != nil: - w.lastErr = err + w.lastErr.Store(err) w.ready.Store(false) return health.StatusError case result.Healthy: - w.lastErr = nil + w.lastErr.Store(noErr) w.ready.Store(true) return health.StatusHealthy default: - w.lastErr = nil + w.lastErr.Store(noErr) return health.StatusStarting } } +func (w *Watcher) LastError() error { + if err := w.lastErr.Load(); err != noErr { + return err + } + return nil +} + // MarshalJSON implements health.HealthMonitor. func (w *Watcher) MarshalJSON() ([]byte, error) { var url *net.URL @@ -158,8 +167,8 @@ func (w *Watcher) MarshalJSON() ([]byte, error) { url = w.hc.URL() } var detail string - if w.lastErr != nil { - detail = w.lastErr.Error() + if err := w.LastError(); err != nil { + detail = err.Error() } return (&monitor.JSONRepresentation{ Name: w.Name(), diff --git a/internal/docker/idlewatcher/watcher.go b/internal/docker/idlewatcher/watcher.go index 5e3ff3b..7ff4a0c 100644 --- a/internal/docker/idlewatcher/watcher.go +++ b/internal/docker/idlewatcher/watcher.go @@ -15,7 +15,7 @@ import ( route "github.com/yusing/go-proxy/internal/route/types" "github.com/yusing/go-proxy/internal/task" U "github.com/yusing/go-proxy/internal/utils" - F "github.com/yusing/go-proxy/internal/utils/functional" + "github.com/yusing/go-proxy/internal/utils/atomic" "github.com/yusing/go-proxy/internal/watcher" "github.com/yusing/go-proxy/internal/watcher/events" ) @@ -26,15 +26,22 @@ type ( zerolog.Logger - *idlewatcher.Config *waker + *containerMeta + *idlewatcher.Config + + client *docker.SharedClient + running atomic.Bool + ready atomic.Bool - client *docker.SharedClient stopByMethod StopCallback // send a docker command w.r.t. `stop_method` ticker *time.Ticker lastReset time.Time task *task.Task } + containerMeta struct { + ContainerID, ContainerName string + } WakeDone <-chan error WakeFunc func() WakeDone @@ -42,8 +49,8 @@ type ( ) var ( - watcherMap = F.NewMapOf[string, *Watcher]() - watcherMapMu sync.Mutex + watcherMap = make(map[string]*Watcher) + watcherMapMu sync.RWMutex errShouldNotReachHere = errors.New("should not reach here") ) @@ -57,39 +64,44 @@ func registerWatcher(watcherTask *task.Task, route route.Route, waker *waker) (* panic(errShouldNotReachHere) } + cont := route.ContainerInfo() + key := cont.ContainerID + watcherMapMu.Lock() defer watcherMapMu.Unlock() - key := cfg.ContainerID - - if w, ok := watcherMap.Load(key); ok { - w.Config = cfg - w.waker = waker - w.resetIdleTimer() - watcherTask.Finish("used existing watcher") - return w, nil + // cancel previous watcher + if w, ok := watcherMap[key]; ok { + defer w.Finish("new request with same container id") } - client, err := docker.NewClient(cfg.DockerHost) + client, err := docker.NewClient(cont.DockerHost) if err != nil { return nil, err } w := &Watcher{ - Logger: logging.With().Str("name", cfg.ContainerName).Logger(), - Config: cfg, + Logger: logging.With().Str("name", cont.ContainerName).Logger(), waker: waker, + containerMeta: &containerMeta{ + ContainerID: cont.ContainerID, + ContainerName: cont.ContainerName, + }, + Config: cfg, client: client, task: watcherTask, ticker: time.NewTicker(cfg.IdleTimeout), } + w.running.Store(cont.Running) w.stopByMethod = w.getStopCallback() - watcherMap.Store(key, w) + watcherMap[key] = w go func() { cause := w.watchUntilDestroy() - watcherMap.Delete(w.ContainerID) + watcherMapMu.Lock() + defer watcherMapMu.Unlock() + delete(watcherMap, key) w.ticker.Stop() w.client.Close() @@ -118,10 +130,6 @@ func (w *Watcher) WakeError(err error) { w.Err(err).Str("action", "wake").Msg("error") } -func (w *Watcher) LogReason(action, reason string) { - w.Info().Str("reason", reason).Msg(action) -} - func (w *Watcher) containerStop(ctx context.Context) error { return w.client.ContainerStop(ctx, w.ContainerID, container.StopOptions{ Signal: string(w.StopSignal), @@ -156,7 +164,7 @@ func (w *Watcher) containerStatus() (string, error) { } func (w *Watcher) wakeIfStopped() error { - if w.ContainerRunning { + if w.running.Load() { return nil } @@ -239,7 +247,7 @@ func (w *Watcher) getEventCh(dockerWatcher *watcher.DockerWatcher) (eventCh <-ch // it exits only if the context is canceled, the container is destroyed, // errors occurred on docker client, or route provider died (mainly caused by config reload). func (w *Watcher) watchUntilDestroy() (returnCause error) { - dockerWatcher := watcher.NewDockerWatcher(w.Config.DockerHost) + dockerWatcher := watcher.NewDockerWatcher(w.client.DaemonHost()) dockerEventCh, dockerEventErrCh := w.getEventCh(dockerWatcher) for { @@ -254,17 +262,17 @@ func (w *Watcher) watchUntilDestroy() (returnCause error) { case e := <-dockerEventCh: switch { case e.Action == events.ActionContainerDestroy: - w.ContainerRunning = false + w.running.Store(false) w.ready.Store(false) - w.LogReason("watcher stopped", "container destroyed") + w.Info().Str("reason", "container destroyed").Msg("watcher stopped") return errors.New("container destroyed") // create / start / unpause case e.Action.IsContainerWake(): - w.ContainerRunning = true + w.running.Store(true) w.resetIdleTimer() w.Info().Msg("awaken") case e.Action.IsContainerSleep(): // stop / pause / kil - w.ContainerRunning = false + w.running.Store(false) w.ready.Store(false) w.ticker.Stop() default: @@ -283,7 +291,7 @@ func (w *Watcher) watchUntilDestroy() (returnCause error) { } case <-w.ticker.C: w.ticker.Stop() - if w.ContainerRunning { + if w.running.Load() { err := w.stopByMethod() switch { case errors.Is(err, context.Canceled): @@ -294,7 +302,7 @@ func (w *Watcher) watchUntilDestroy() (returnCause error) { } w.Err(err).Msgf("container stop with method %q failed", w.StopMethod) default: - w.LogReason("container stopped", "idle timeout") + w.Info().Str("reason", "idle timeout").Msg("container stopped") } } } diff --git a/internal/route/reverse_proxy.go b/internal/route/reverse_proxy.go index 3895f0e..cd01b51 100755 --- a/internal/route/reverse_proxy.go +++ b/internal/route/reverse_proxy.go @@ -113,10 +113,10 @@ func (r *ReveseProxyRoute) Start(parent task.Parent) gperr.Error { r.HealthMon = waker case r.UseHealthCheck(): if r.IsDocker() { - client, err := docker.NewClient(r.Idlewatcher.DockerHost) + client, err := docker.NewClient(r.Container.DockerHost) if err == nil { fallback := r.newHealthMonitor() - r.HealthMon = monitor.NewDockerHealthMonitor(client, r.Idlewatcher.ContainerID, r.TargetName(), r.HealthCheck, fallback) + r.HealthMon = monitor.NewDockerHealthMonitor(client, r.Container.ContainerID, r.TargetName(), r.HealthCheck, fallback) r.task.OnCancel("close_docker_client", client.Close) } } diff --git a/internal/route/stream.go b/internal/route/stream.go index f093399..259e9ba 100755 --- a/internal/route/stream.go +++ b/internal/route/stream.go @@ -67,10 +67,10 @@ func (r *StreamRoute) Start(parent task.Parent) gperr.Error { r.HealthMon = waker case r.UseHealthCheck(): if r.IsDocker() { - client, err := docker.NewClient(r.IdlewatcherConfig().DockerHost) + client, err := docker.NewClient(r.Container.DockerHost) if err == nil { fallback := monitor.NewRawHealthChecker(r.TargetURL(), r.HealthCheck) - r.HealthMon = monitor.NewDockerHealthMonitor(client, r.IdlewatcherConfig().ContainerID, r.TargetName(), r.HealthCheck, fallback) + r.HealthMon = monitor.NewDockerHealthMonitor(client, r.Container.ContainerID, r.TargetName(), r.HealthCheck, fallback) r.task.OnCancel("close_docker_client", client.Close) } } diff --git a/internal/utils/atomic/bool.go b/internal/utils/atomic/bool.go new file mode 100644 index 0000000..0c462c3 --- /dev/null +++ b/internal/utils/atomic/bool.go @@ -0,0 +1,5 @@ +package atomic + +import "sync/atomic" + +type Bool = atomic.Bool diff --git a/internal/utils/atomic/atomic_value.go b/internal/utils/atomic/value.go similarity index 75% rename from internal/utils/atomic/atomic_value.go rename to internal/utils/atomic/value.go index 2889194..65140fe 100644 --- a/internal/utils/atomic/atomic_value.go +++ b/internal/utils/atomic/value.go @@ -22,11 +22,11 @@ func (a *Value[T]) Store(v T) { } func (a *Value[T]) Swap(v T) T { - return a.Value.Swap(v).(T) -} - -func (a *Value[T]) CompareAndSwap(oldV, newV T) bool { - return a.Value.CompareAndSwap(oldV, newV) + if v := a.Value.Swap(v); v != nil { + return v.(T) + } + var zero T + return zero } func (a *Value[T]) MarshalJSON() ([]byte, error) {