From bda547198efdaed0423f3f94ab83623fd43827c0 Mon Sep 17 00:00:00 2001 From: yusing Date: Mon, 24 Feb 2025 07:50:23 +0800 Subject: [PATCH] improved docker reconnect mechanism, removed redundant checkings, refactor --- agent/pkg/agent/config.go | 9 ++- internal/api/v1/dockerapi/utils.go | 6 +- internal/docker/client.go | 45 +++++++++---- internal/docker/idlewatcher/watcher.go | 11 ++-- internal/docker/inspect.go | 2 +- internal/docker/list_containers.go | 5 +- internal/route/reverse_proxy.go | 2 +- internal/route/stream.go | 2 +- internal/watcher/docker_watcher.go | 88 ++++++++++---------------- 9 files changed, 84 insertions(+), 86 deletions(-) diff --git a/agent/pkg/agent/config.go b/agent/pkg/agent/config.go index d64a287..83243a2 100644 --- a/agent/pkg/agent/config.go +++ b/agent/pkg/agent/config.go @@ -156,12 +156,19 @@ func (cfg *AgentConfig) Transport() *http.Transport { if addr != AgentHost+":443" { return nil, &net.AddrError{Err: "invalid address", Addr: addr} } - return gphttp.DefaultDialer.DialContext(ctx, network, cfg.Addr) + if network != "tcp" { + return nil, &net.OpError{Op: "dial", Net: network, Source: nil, Addr: nil} + } + return cfg.DialContext(ctx) }, TLSClientConfig: cfg.tlsConfig, } } +func (cfg *AgentConfig) DialContext(ctx context.Context) (net.Conn, error) { + return gphttp.DefaultDialer.DialContext(ctx, "tcp", cfg.Addr) +} + func (cfg *AgentConfig) Name() string { return cfg.name } diff --git a/internal/api/v1/dockerapi/utils.go b/internal/api/v1/dockerapi/utils.go index 9f2fc88..2345b00 100644 --- a/internal/api/v1/dockerapi/utils.go +++ b/internal/api/v1/dockerapi/utils.go @@ -36,7 +36,7 @@ func getDockerClients() (DockerClients, gperr.Error) { connErrs := gperr.NewBuilder("failed to connect to docker") for name, host := range dockerHosts { - dockerClient, err := docker.ConnectClient(host) + dockerClient, err := docker.NewClient(host) if err != nil { connErrs.Add(err) continue @@ -45,7 +45,7 @@ func getDockerClients() (DockerClients, gperr.Error) { } for _, agent := range cfg.ListAgents() { - dockerClient, err := docker.ConnectClient(agent.FakeDockerHost()) + dockerClient, err := docker.NewClient(agent.FakeDockerHost()) if err != nil { connErrs.Add(err) continue @@ -74,7 +74,7 @@ func getDockerClient(w http.ResponseWriter, server string) (*docker.SharedClient if host == "" { return nil, false, nil } - dockerClient, err := docker.ConnectClient(host) + dockerClient, err := docker.NewClient(host) if err != nil { return nil, false, err } diff --git a/internal/docker/client.go b/internal/docker/client.go index 7ba3e04..4ecc348 100644 --- a/internal/docker/client.go +++ b/internal/docker/client.go @@ -1,8 +1,10 @@ package docker import ( + "context" "errors" "fmt" + "net" "net/http" "sync" "time" @@ -23,11 +25,14 @@ type ( key string refCount uint32 closedOn int64 + + addr string + dial func(ctx context.Context) (net.Conn, error) } ) var ( - clientMap = make(map[string]*SharedClient, 5) + clientMap = make(map[string]*SharedClient, 10) clientMapMu sync.RWMutex clientOptEnvHost = []client.Opt{ @@ -64,9 +69,7 @@ func init() { for _, c := range clientMap { delete(clientMap, c.key) - if c.Connected() { - c.Client.Close() - } + c.Client.Close() } }) } @@ -78,10 +81,6 @@ func closeTimedOutClients() { now := time.Now().Unix() for _, c := range clientMap { - if !c.Connected() { - delete(clientMap, c.key) - continue - } if c.closedOn == 0 { continue } @@ -93,8 +92,17 @@ func closeTimedOutClients() { } } -func (c *SharedClient) Connected() bool { - return c != nil && c.Client != nil +func (c *SharedClient) Address() string { + return c.addr +} + +func (c *SharedClient) CheckConnection(ctx context.Context) error { + conn, err := c.dial(ctx) + if err != nil { + return err + } + conn.Close() + return nil } // if the client is still referenced, this is no-op. @@ -103,7 +111,7 @@ func (c *SharedClient) Close() { c.refCount-- } -// ConnectClient creates a new Docker client connection to the specified host. +// NewClient creates a new Docker client connection to the specified host. // // Returns existing client if available. // @@ -113,7 +121,7 @@ func (c *SharedClient) Close() { // Returns: // - Client: the Docker client connection. // - error: an error if the connection failed. -func ConnectClient(host string) (*SharedClient, error) { +func NewClient(host string) (*SharedClient, error) { clientMapMu.Lock() defer clientMapMu.Unlock() @@ -125,6 +133,8 @@ func ConnectClient(host string) (*SharedClient, error) { // create client var opt []client.Opt + var addr string + var dial func(ctx context.Context) (net.Conn, error) if agent.IsDockerHostAgent(host) { cfg, ok := config.GetInstance().GetAgent(host) @@ -136,6 +146,8 @@ func ConnectClient(host string) (*SharedClient, error) { client.WithHTTPClient(cfg.NewHTTPClient()), client.WithAPIVersionNegotiation(), } + addr = "tcp://" + cfg.Addr + dial = cfg.DialContext } else { switch host { case "": @@ -177,9 +189,16 @@ func ConnectClient(host string) (*SharedClient, error) { Client: client, key: host, refCount: 1, + addr: addr, + dial: dial, } - defer logging.Debug().Str("host", host).Msg("docker client connected") + // non-agent client + if c.dial == nil { + c.dial = client.Dialer() + } + + defer logging.Debug().Str("host", host).Msg("docker client initialized") clientMap[c.key] = c return c, nil diff --git a/internal/docker/idlewatcher/watcher.go b/internal/docker/idlewatcher/watcher.go index 82d4043..5e3ff3b 100644 --- a/internal/docker/idlewatcher/watcher.go +++ b/internal/docker/idlewatcher/watcher.go @@ -8,7 +8,7 @@ import ( "github.com/docker/docker/api/types/container" "github.com/rs/zerolog" - D "github.com/yusing/go-proxy/internal/docker" + "github.com/yusing/go-proxy/internal/docker" idlewatcher "github.com/yusing/go-proxy/internal/docker/idlewatcher/types" "github.com/yusing/go-proxy/internal/gperr" "github.com/yusing/go-proxy/internal/logging" @@ -29,7 +29,7 @@ type ( *idlewatcher.Config *waker - client *D.SharedClient + client *docker.SharedClient stopByMethod StopCallback // send a docker command w.r.t. `stop_method` ticker *time.Ticker lastReset time.Time @@ -70,7 +70,7 @@ func registerWatcher(watcherTask *task.Task, route route.Route, waker *waker) (* return w, nil } - client, err := D.ConnectClient(cfg.DockerHost) + client, err := docker.NewClient(cfg.DockerHost) if err != nil { return nil, err } @@ -146,9 +146,6 @@ func (w *Watcher) containerStart(ctx context.Context) error { } func (w *Watcher) containerStatus() (string, error) { - if !w.client.Connected() { - return "", errors.New("docker client not connected") - } ctx, cancel := context.WithTimeoutCause(w.task.Context(), dockerReqTimeout, errors.New("docker request timeout")) defer cancel() json, err := w.client.ContainerInspect(ctx, w.ContainerID) @@ -242,7 +239,7 @@ func (w *Watcher) getEventCh(dockerWatcher *watcher.DockerWatcher) (eventCh <-ch // it exits only if the context is canceled, the container is destroyed, // errors occurred on docker client, or route provider died (mainly caused by config reload). func (w *Watcher) watchUntilDestroy() (returnCause error) { - dockerWatcher := watcher.NewDockerWatcherWithClient(w.client) + dockerWatcher := watcher.NewDockerWatcher(w.Config.DockerHost) dockerEventCh, dockerEventErrCh := w.getEventCh(dockerWatcher) for { diff --git a/internal/docker/inspect.go b/internal/docker/inspect.go index 94e70e8..8eb1413 100644 --- a/internal/docker/inspect.go +++ b/internal/docker/inspect.go @@ -7,7 +7,7 @@ import ( ) func Inspect(dockerHost string, containerID string) (*Container, error) { - client, err := ConnectClient(dockerHost) + client, err := NewClient(dockerHost) if err != nil { return nil, err } diff --git a/internal/docker/list_containers.go b/internal/docker/list_containers.go index ba9b96e..517b59d 100644 --- a/internal/docker/list_containers.go +++ b/internal/docker/list_containers.go @@ -5,7 +5,6 @@ import ( "errors" "time" - "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" "github.com/docker/docker/client" ) @@ -22,8 +21,8 @@ var listOptions = container.ListOptions{ All: true, } -func ListContainers(clientHost string) ([]types.Container, error) { - dockerClient, err := ConnectClient(clientHost) +func ListContainers(clientHost string) ([]container.Summary, error) { + dockerClient, err := NewClient(clientHost) if err != nil { return nil, err } diff --git a/internal/route/reverse_proxy.go b/internal/route/reverse_proxy.go index b9d640f..3895f0e 100755 --- a/internal/route/reverse_proxy.go +++ b/internal/route/reverse_proxy.go @@ -113,7 +113,7 @@ func (r *ReveseProxyRoute) Start(parent task.Parent) gperr.Error { r.HealthMon = waker case r.UseHealthCheck(): if r.IsDocker() { - client, err := docker.ConnectClient(r.Idlewatcher.DockerHost) + client, err := docker.NewClient(r.Idlewatcher.DockerHost) if err == nil { fallback := r.newHealthMonitor() r.HealthMon = monitor.NewDockerHealthMonitor(client, r.Idlewatcher.ContainerID, r.TargetName(), r.HealthCheck, fallback) diff --git a/internal/route/stream.go b/internal/route/stream.go index 7f4c206..f093399 100755 --- a/internal/route/stream.go +++ b/internal/route/stream.go @@ -67,7 +67,7 @@ func (r *StreamRoute) Start(parent task.Parent) gperr.Error { r.HealthMon = waker case r.UseHealthCheck(): if r.IsDocker() { - client, err := docker.ConnectClient(r.IdlewatcherConfig().DockerHost) + client, err := docker.NewClient(r.IdlewatcherConfig().DockerHost) if err == nil { fallback := monitor.NewRawHealthChecker(r.TargetURL(), r.HealthCheck) r.HealthMon = monitor.NewDockerHealthMonitor(client, r.IdlewatcherConfig().ContainerID, r.TargetName(), r.HealthCheck, fallback) diff --git a/internal/watcher/docker_watcher.go b/internal/watcher/docker_watcher.go index a4a7ebb..4cae218 100644 --- a/internal/watcher/docker_watcher.go +++ b/internal/watcher/docker_watcher.go @@ -16,9 +16,8 @@ import ( type ( DockerWatcher struct { - host string - client *docker.SharedClient - clientOwned bool + host string + client *docker.SharedClient } DockerListOptions = docker_events.ListOptions ) @@ -60,29 +59,14 @@ func DockerFilterContainerNameID(nameOrID string) filters.KeyValuePair { } func NewDockerWatcher(host string) *DockerWatcher { - return &DockerWatcher{ - host: host, - clientOwned: true, - } -} - -func NewDockerWatcherWithClient(client *docker.SharedClient) *DockerWatcher { - return &DockerWatcher{ - client: client, - } + return &DockerWatcher{host: host} } func (w *DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan gperr.Error) { return w.EventsWithOptions(ctx, optionsDefault) } -func (w *DockerWatcher) Close() { - if w.clientOwned && w.client.Connected() { - w.client.Close() - } -} - -func (w *DockerWatcher) parseError(err error) gperr.Error { +func (w DockerWatcher) parseError(err error) gperr.Error { if errors.Is(err, context.DeadlineExceeded) { return gperr.New("docker client connection timeout") } @@ -95,69 +79,61 @@ func (w *DockerWatcher) parseError(err error) gperr.Error { func (w *DockerWatcher) checkConnection(ctx context.Context) bool { ctx, cancel := context.WithTimeout(ctx, dockerWatcherRetryInterval) defer cancel() - _, err := w.client.Ping(ctx) + err := w.client.CheckConnection(ctx) if err != nil { + logging.Debug().Err(err).Msg("docker watcher: connection failed") return false } return true } +func (w *DockerWatcher) handleEvent(event docker_events.Message, ch chan<- Event) { + action, ok := events.DockerEventMap[event.Action] + if !ok { + return + } + ch <- Event{ + Type: events.EventTypeDocker, + ActorID: event.Actor.ID, + ActorAttributes: event.Actor.Attributes, // labels + ActorName: event.Actor.Attributes["name"], + Action: action, + } +} + func (w *DockerWatcher) EventsWithOptions(ctx context.Context, options DockerListOptions) (<-chan Event, <-chan gperr.Error) { eventCh := make(chan Event) errCh := make(chan gperr.Error) go func() { + var err error + w.client, err = docker.NewClient(w.host) + if err != nil { + errCh <- gperr.Wrap(err, "docker watcher: failed to initialize client") + return + } + defer func() { close(eventCh) close(errCh) - w.Close() + w.client.Close() }() - if !w.client.Connected() { - var err error - w.client, err = docker.ConnectClient(w.host) - attempts := 0 - retryTicker := time.NewTicker(dockerWatcherRetryInterval) - for err != nil { - attempts++ - errCh <- gperr.Errorf("docker connection attempt #%d: %w", attempts, err) - select { - case <-ctx.Done(): - retryTicker.Stop() - return - case <-retryTicker.C: - w.client, err = docker.ConnectClient(w.host) - } - } - retryTicker.Stop() - } - - defer w.Close() - cEventCh, cErrCh := w.client.Events(ctx, options) - defer logging.Debug().Str("host", w.host).Msg("docker watcher closed") + defer logging.Debug().Str("host", w.client.Address()).Msg("docker watcher closed") for { select { case <-ctx.Done(): return case msg := <-cEventCh: - action, ok := events.DockerEventMap[msg.Action] - if !ok { - continue - } - event := Event{ - Type: events.EventTypeDocker, - ActorID: msg.Actor.ID, - ActorAttributes: msg.Actor.Attributes, // labels - ActorName: msg.Actor.Attributes["name"], - Action: action, - } - eventCh <- event + w.handleEvent(msg, eventCh) case err := <-cErrCh: if err == nil { continue } errCh <- w.parseError(err) + // release the error because reopening event channel may block + err = nil // trigger reload (clear routes) eventCh <- reloadTrigger for !w.checkConnection(ctx) {