fix docker client data race on Close(), remove SharedClient.IsConnected

This commit is contained in:
yusing 2025-03-01 15:47:08 +08:00
parent 357ad26a0e
commit 9b2ee628aa
3 changed files with 13 additions and 72 deletions

View file

@ -4,6 +4,7 @@ import (
"errors" "errors"
"net/http" "net/http"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/docker/cli/cli/connhelper" "github.com/docker/cli/cli/connhelper"
@ -61,10 +62,8 @@ func init() {
for _, c := range clientMap { for _, c := range clientMap {
delete(clientMap, c.key) delete(clientMap, c.key)
if c.Connected() {
c.Client.Close() c.Client.Close()
} }
}
}) })
} }
@ -75,10 +74,6 @@ func closeTimedOutClients() {
now := time.Now().Unix() now := time.Now().Unix()
for _, c := range clientMap { for _, c := range clientMap {
if !c.Connected() {
delete(clientMap, c.key)
continue
}
if c.closedOn == 0 { if c.closedOn == 0 {
continue continue
} }
@ -90,14 +85,10 @@ func closeTimedOutClients() {
} }
} }
func (c *SharedClient) Connected() bool {
return c != nil && c.Client != nil
}
// if the client is still referenced, this is no-op. // if the client is still referenced, this is no-op.
func (c *SharedClient) Close() { func (c *SharedClient) Close() {
c.closedOn = time.Now().Unix() atomic.StoreInt64(&c.closedOn, time.Now().Unix())
c.refCount-- atomic.AddUint32(&c.refCount, ^uint32(0))
} }
// ConnectClient creates a new Docker client connection to the specified host. // ConnectClient creates a new Docker client connection to the specified host.
@ -115,8 +106,8 @@ func ConnectClient(host string) (*SharedClient, error) {
defer clientMapMu.Unlock() defer clientMapMu.Unlock()
if client, ok := clientMap[host]; ok { if client, ok := clientMap[host]; ok {
client.closedOn = 0 atomic.StoreInt64(&client.closedOn, 0)
client.refCount++ atomic.AddUint32(&client.refCount, 1)
return client, nil return client, nil
} }

View file

@ -146,9 +146,6 @@ func (w *Watcher) containerStart(ctx context.Context) error {
} }
func (w *Watcher) containerStatus() (string, error) { func (w *Watcher) containerStatus() (string, error) {
if !w.client.Connected() {
return "", errors.New("docker client not connected")
}
ctx, cancel := context.WithTimeoutCause(w.task.Context(), dockerReqTimeout, errors.New("docker request timeout")) ctx, cancel := context.WithTimeoutCause(w.task.Context(), dockerReqTimeout, errors.New("docker request timeout"))
defer cancel() defer cancel()
json, err := w.client.ContainerInspect(ctx, w.ContainerID) json, err := w.client.ContainerInspect(ctx, w.ContainerID)
@ -242,7 +239,7 @@ func (w *Watcher) getEventCh(dockerWatcher watcher.DockerWatcher) (eventCh <-cha
// it exits only if the context is canceled, the container is destroyed, // it exits only if the context is canceled, the container is destroyed,
// errors occurred on docker client, or route provider died (mainly caused by config reload). // errors occurred on docker client, or route provider died (mainly caused by config reload).
func (w *Watcher) watchUntilDestroy() (returnCause error) { func (w *Watcher) watchUntilDestroy() (returnCause error) {
dockerWatcher := watcher.NewDockerWatcherWithClient(w.client) dockerWatcher := watcher.NewDockerWatcher(w.client.DaemonHost())
dockerEventCh, dockerEventErrCh := w.getEventCh(dockerWatcher) dockerEventCh, dockerEventErrCh := w.getEventCh(dockerWatcher)
for { for {

View file

@ -6,20 +6,15 @@ import (
docker_events "github.com/docker/docker/api/types/events" docker_events "github.com/docker/docker/api/types/events"
"github.com/docker/docker/api/types/filters" "github.com/docker/docker/api/types/filters"
"github.com/rs/zerolog"
D "github.com/yusing/go-proxy/internal/docker" D "github.com/yusing/go-proxy/internal/docker"
E "github.com/yusing/go-proxy/internal/error" E "github.com/yusing/go-proxy/internal/error"
"github.com/yusing/go-proxy/internal/logging"
"github.com/yusing/go-proxy/internal/watcher/events" "github.com/yusing/go-proxy/internal/watcher/events"
) )
type ( type (
DockerWatcher struct { DockerWatcher struct {
zerolog.Logger
host string host string
client *D.SharedClient client *D.SharedClient
clientOwned bool
} }
DockerListOptions = docker_events.ListOptions DockerListOptions = docker_events.ListOptions
) )
@ -53,24 +48,7 @@ func DockerFilterContainerNameID(nameOrID string) filters.KeyValuePair {
} }
func NewDockerWatcher(host string) DockerWatcher { func NewDockerWatcher(host string) DockerWatcher {
return DockerWatcher{ return DockerWatcher{host: host}
host: host,
clientOwned: true,
Logger: logging.With().
Str("type", "docker").
Str("host", host).
Logger(),
}
}
func NewDockerWatcherWithClient(client *D.SharedClient) DockerWatcher {
return DockerWatcher{
client: client,
Logger: logging.With().
Str("type", "docker").
Str("host", client.DaemonHost()).
Logger(),
}
} }
func (w DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan E.Error) { func (w DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan E.Error) {
@ -82,36 +60,12 @@ func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerList
errCh := make(chan E.Error) errCh := make(chan E.Error)
go func() { go func() {
defer func() {
defer close(eventCh) defer close(eventCh)
defer close(errCh) defer close(errCh)
defer func() {
if w.clientOwned && w.client.Connected() {
w.client.Close() w.client.Close()
}
}() }()
if !w.client.Connected() {
var err error
attempts := 0
for {
w.client, err = D.ConnectClient(w.host)
if err == nil {
break
}
attempts++
errCh <- E.Errorf("docker connection attempt #%d: %w", attempts, err)
select {
case <-ctx.Done():
return
default:
time.Sleep(dockerWatcherRetryInterval)
}
}
}
defer w.client.Close()
cEventCh, cErrCh := w.client.Events(ctx, options) cEventCh, cErrCh := w.client.Events(ctx, options)
for { for {
@ -124,7 +78,6 @@ func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerList
case msg := <-cEventCh: case msg := <-cEventCh:
action, ok := events.DockerEventMap[msg.Action] action, ok := events.DockerEventMap[msg.Action]
if !ok { if !ok {
w.Debug().Msgf("ignored unknown docker event: %s for container %s", msg.Action, msg.Actor.Attributes["name"])
continue continue
} }
event := Event{ event := Event{