From 3b94c7bb436b69ffd79df1069d68910613e78da5 Mon Sep 17 00:00:00 2001 From: yusing Date: Fri, 14 Feb 2025 05:16:56 +0800 Subject: [PATCH] add buffering to docker watcher --- internal/watcher/docker_watcher.go | 38 +++++++++++++++--------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/internal/watcher/docker_watcher.go b/internal/watcher/docker_watcher.go index d8babfd..647357e 100644 --- a/internal/watcher/docker_watcher.go +++ b/internal/watcher/docker_watcher.go @@ -65,49 +65,49 @@ func (w DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan E.Error return w.EventsWithOptions(ctx, optionsDefault) } +func (w DockerWatcher) Close() { + if w.clientOwned && w.client.Connected() { + w.client.Close() + } +} + func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerListOptions) (<-chan Event, <-chan E.Error) { - eventCh := make(chan Event) - errCh := make(chan E.Error) + eventCh := make(chan Event, 100) + errCh := make(chan E.Error, 10) go func() { - defer close(eventCh) - defer close(errCh) - defer func() { - if w.clientOwned && w.client.Connected() { - w.client.Close() - } + close(eventCh) + close(errCh) + w.Close() }() if !w.client.Connected() { var err error + w.client, err = D.ConnectClient(w.host) attempts := 0 - for { - w.client, err = D.ConnectClient(w.host) - if err == nil { - break - } + retryTicker := time.NewTicker(dockerWatcherRetryInterval) + for err != nil { attempts++ errCh <- E.Errorf("docker connection attempt #%d: %w", attempts, err) select { case <-ctx.Done(): + retryTicker.Stop() return - default: - time.Sleep(dockerWatcherRetryInterval) + case <-retryTicker.C: + w.client, err = D.ConnectClient(w.host) } } + retryTicker.Stop() } - defer w.client.Close() + defer w.Close() cEventCh, cErrCh := w.client.Events(ctx, options) for { select { case <-ctx.Done(): - if err := E.From(ctx.Err()); err != nil && !err.Is(context.Canceled) { - errCh <- err - } return case msg := <-cEventCh: action, ok := events.DockerEventMap[msg.Action]