diff --git a/internal/docker/client.go b/internal/docker/client.go index 3a77b58..7ba3e04 100644 --- a/internal/docker/client.go +++ b/internal/docker/client.go @@ -5,16 +5,15 @@ import ( "fmt" "net/http" "sync" + "time" "github.com/docker/cli/cli/connhelper" "github.com/docker/docker/client" - "github.com/rs/zerolog" "github.com/yusing/go-proxy/agent/pkg/agent" "github.com/yusing/go-proxy/internal/common" config "github.com/yusing/go-proxy/internal/config/types" "github.com/yusing/go-proxy/internal/logging" "github.com/yusing/go-proxy/internal/task" - U "github.com/yusing/go-proxy/internal/utils" ) type ( @@ -22,15 +21,14 @@ type ( *client.Client key string - refCount *U.RefCount - - l zerolog.Logger + refCount uint32 + closedOn int64 } ) var ( clientMap = make(map[string]*SharedClient, 5) - clientMapMu sync.Mutex + clientMapMu sync.RWMutex clientOptEnvHost = []client.Opt{ client.WithHostFromEnv(), @@ -38,12 +36,34 @@ var ( } ) +const ( + cleanInterval = 10 * time.Second + clientTTLSecs = int64(10) +) + func init() { + cleaner := task.RootTask("docker_clients_cleaner") + go func() { + ticker := time.NewTicker(cleanInterval) + defer ticker.Stop() + defer cleaner.Finish("program exit") + + for { + select { + case <-ticker.C: + closeTimedOutClients() + case <-cleaner.Context().Done(): + return + } + } + }() + task.OnProgramExit("docker_clients_cleanup", func() { clientMapMu.Lock() defer clientMapMu.Unlock() for _, c := range clientMap { + delete(clientMap, c.key) if c.Connected() { c.Client.Close() } @@ -51,15 +71,36 @@ func init() { }) } +func closeTimedOutClients() { + clientMapMu.Lock() + defer clientMapMu.Unlock() + + now := time.Now().Unix() + + for _, c := range clientMap { + if !c.Connected() { + delete(clientMap, c.key) + continue + } + if c.closedOn == 0 { + continue + } + if c.refCount == 0 && now-c.closedOn > clientTTLSecs { + delete(clientMap, c.key) + c.Client.Close() + logging.Debug().Str("host", c.key).Msg("docker client closed") + } + } +} + func (c *SharedClient) Connected() bool { return c != nil && c.Client != nil } // if the client is still referenced, this is no-op. func (c *SharedClient) Close() { - if c.Connected() { - c.refCount.Sub() - } + c.closedOn = time.Now().Unix() + c.refCount-- } // ConnectClient creates a new Docker client connection to the specified host. @@ -77,7 +118,8 @@ func ConnectClient(host string) (*SharedClient, error) { defer clientMapMu.Unlock() if client, ok := clientMap[host]; ok { - client.refCount.Add() + client.closedOn = 0 + client.refCount++ return client, nil } @@ -134,23 +176,11 @@ func ConnectClient(host string) (*SharedClient, error) { c := &SharedClient{ Client: client, key: host, - refCount: U.NewRefCounter(), - l: logging.With().Str("address", client.DaemonHost()).Logger(), + refCount: 1, } - c.l.Trace().Msg("client connected") - clientMap[host] = c + defer logging.Debug().Str("host", host).Msg("docker client connected") - go func() { - <-c.refCount.Zero() - clientMapMu.Lock() - delete(clientMap, c.key) - clientMapMu.Unlock() - - if c.Connected() { - c.Client.Close() - c.l.Trace().Msg("client closed") - } - }() + clientMap[c.key] = c return c, nil }