From 9b2ee628aa89388b96add49691a070fc877d2569 Mon Sep 17 00:00:00 2001 From: yusing Date: Sat, 1 Mar 2025 15:47:08 +0800 Subject: [PATCH] fix docker client data race on Close(), remove SharedClient.IsConnected --- internal/docker/client.go | 21 +++------ internal/docker/idlewatcher/watcher.go | 5 +-- internal/watcher/docker_watcher.go | 59 +++----------------------- 3 files changed, 13 insertions(+), 72 deletions(-) diff --git a/internal/docker/client.go b/internal/docker/client.go index dc98a8e..fa89884 100644 --- a/internal/docker/client.go +++ b/internal/docker/client.go @@ -4,6 +4,7 @@ import ( "errors" "net/http" "sync" + "sync/atomic" "time" "github.com/docker/cli/cli/connhelper" @@ -61,9 +62,7 @@ func init() { for _, c := range clientMap { delete(clientMap, c.key) - if c.Connected() { - c.Client.Close() - } + c.Client.Close() } }) } @@ -75,10 +74,6 @@ func closeTimedOutClients() { now := time.Now().Unix() for _, c := range clientMap { - if !c.Connected() { - delete(clientMap, c.key) - continue - } if c.closedOn == 0 { continue } @@ -90,14 +85,10 @@ func closeTimedOutClients() { } } -func (c *SharedClient) Connected() bool { - return c != nil && c.Client != nil -} - // if the client is still referenced, this is no-op. func (c *SharedClient) Close() { - c.closedOn = time.Now().Unix() - c.refCount-- + atomic.StoreInt64(&c.closedOn, time.Now().Unix()) + atomic.AddUint32(&c.refCount, ^uint32(0)) } // ConnectClient creates a new Docker client connection to the specified host. @@ -115,8 +106,8 @@ func ConnectClient(host string) (*SharedClient, error) { defer clientMapMu.Unlock() if client, ok := clientMap[host]; ok { - client.closedOn = 0 - client.refCount++ + atomic.StoreInt64(&client.closedOn, 0) + atomic.AddUint32(&client.refCount, 1) return client, nil } diff --git a/internal/docker/idlewatcher/watcher.go b/internal/docker/idlewatcher/watcher.go index bf35371..eb72fbc 100644 --- a/internal/docker/idlewatcher/watcher.go +++ b/internal/docker/idlewatcher/watcher.go @@ -146,9 +146,6 @@ func (w *Watcher) containerStart(ctx context.Context) error { } func (w *Watcher) containerStatus() (string, error) { - if !w.client.Connected() { - return "", errors.New("docker client not connected") - } ctx, cancel := context.WithTimeoutCause(w.task.Context(), dockerReqTimeout, errors.New("docker request timeout")) defer cancel() json, err := w.client.ContainerInspect(ctx, w.ContainerID) @@ -242,7 +239,7 @@ func (w *Watcher) getEventCh(dockerWatcher watcher.DockerWatcher) (eventCh <-cha // it exits only if the context is canceled, the container is destroyed, // errors occurred on docker client, or route provider died (mainly caused by config reload). func (w *Watcher) watchUntilDestroy() (returnCause error) { - dockerWatcher := watcher.NewDockerWatcherWithClient(w.client) + dockerWatcher := watcher.NewDockerWatcher(w.client.DaemonHost()) dockerEventCh, dockerEventErrCh := w.getEventCh(dockerWatcher) for { diff --git a/internal/watcher/docker_watcher.go b/internal/watcher/docker_watcher.go index cc81fe5..331d465 100644 --- a/internal/watcher/docker_watcher.go +++ b/internal/watcher/docker_watcher.go @@ -6,20 +6,15 @@ import ( docker_events "github.com/docker/docker/api/types/events" "github.com/docker/docker/api/types/filters" - "github.com/rs/zerolog" D "github.com/yusing/go-proxy/internal/docker" E "github.com/yusing/go-proxy/internal/error" - "github.com/yusing/go-proxy/internal/logging" "github.com/yusing/go-proxy/internal/watcher/events" ) type ( DockerWatcher struct { - zerolog.Logger - - host string - client *D.SharedClient - clientOwned bool + host string + client *D.SharedClient } DockerListOptions = docker_events.ListOptions ) @@ -53,24 +48,7 @@ func DockerFilterContainerNameID(nameOrID string) filters.KeyValuePair { } func NewDockerWatcher(host string) DockerWatcher { - return DockerWatcher{ - host: host, - clientOwned: true, - Logger: logging.With(). - Str("type", "docker"). - Str("host", host). - Logger(), - } -} - -func NewDockerWatcherWithClient(client *D.SharedClient) DockerWatcher { - return DockerWatcher{ - client: client, - Logger: logging.With(). - Str("type", "docker"). - Str("host", client.DaemonHost()). - Logger(), - } + return DockerWatcher{host: host} } func (w DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan E.Error) { @@ -82,36 +60,12 @@ func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerList errCh := make(chan E.Error) go func() { - defer close(eventCh) - defer close(errCh) - defer func() { - if w.clientOwned && w.client.Connected() { - w.client.Close() - } + defer close(eventCh) + defer close(errCh) + w.client.Close() }() - if !w.client.Connected() { - var err error - attempts := 0 - for { - w.client, err = D.ConnectClient(w.host) - if err == nil { - break - } - attempts++ - errCh <- E.Errorf("docker connection attempt #%d: %w", attempts, err) - select { - case <-ctx.Done(): - return - default: - time.Sleep(dockerWatcherRetryInterval) - } - } - } - - defer w.client.Close() - cEventCh, cErrCh := w.client.Events(ctx, options) for { @@ -124,7 +78,6 @@ func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerList case msg := <-cEventCh: action, ok := events.DockerEventMap[msg.Action] if !ok { - w.Debug().Msgf("ignored unknown docker event: %s for container %s", msg.Action, msg.Actor.Attributes["name"]) continue } event := Event{