diff --git a/internal/idlewatcher/provider/docker.go b/internal/idlewatcher/provider/docker.go index cd0b39a..9cceed4 100644 --- a/internal/idlewatcher/provider/docker.go +++ b/internal/idlewatcher/provider/docker.go @@ -12,7 +12,7 @@ import ( type DockerProvider struct { client *docker.SharedClient - watcher *watcher.DockerWatcher + watcher watcher.DockerWatcher containerID string } diff --git a/internal/watcher/docker_watcher.go b/internal/watcher/docker_watcher.go index 1f88587..09713c6 100644 --- a/internal/watcher/docker_watcher.go +++ b/internal/watcher/docker_watcher.go @@ -15,10 +15,7 @@ import ( ) type ( - DockerWatcher struct { - host string - client *docker.SharedClient - } + DockerWatcher string DockerListOptions = docker_events.ListOptions ) @@ -58,14 +55,75 @@ func DockerFilterContainerNameID(nameOrID string) filters.KeyValuePair { return filters.Arg("container", nameOrID) } -func NewDockerWatcher(host string) *DockerWatcher { - return &DockerWatcher{host: host} +func NewDockerWatcher(host string) DockerWatcher { + return DockerWatcher(host) } -func (w *DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan gperr.Error) { +func (w DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan gperr.Error) { return w.EventsWithOptions(ctx, optionsDefault) } +func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerListOptions) (<-chan Event, <-chan gperr.Error) { + eventCh := make(chan Event) + errCh := make(chan gperr.Error) + + go func() { + client, err := docker.NewClient(string(w)) + if err != nil { + errCh <- gperr.Wrap(err, "docker watcher: failed to initialize client") + return + } + + defer func() { + close(eventCh) + close(errCh) + client.Close() + }() + + cEventCh, cErrCh := client.Events(ctx, options) + defer logging.Debug().Str("host", client.Address()).Msg("docker watcher closed") + for { + select { + case <-ctx.Done(): + return + case msg := <-cEventCh: + w.handleEvent(msg, eventCh) + case err := <-cErrCh: + if err == nil { + continue + } + errCh <- w.parseError(err) + // release the error because reopening event channel may block + //nolint:ineffassign,wastedassign + err = nil + // trigger reload (clear routes) + eventCh <- reloadTrigger + + retry := time.NewTicker(dockerWatcherRetryInterval) + defer retry.Stop() + ok := false + for !ok { + select { + case <-ctx.Done(): + return + case <-retry.C: + if checkConnection(ctx, client) { + ok = true + break + } + } + } + // connection successful, trigger reload (reload routes) + eventCh <- reloadTrigger + // reopen event channel + cEventCh, cErrCh = client.Events(ctx, options) + } + } + }() + + return eventCh, errCh +} + func (w DockerWatcher) parseError(err error) gperr.Error { if errors.Is(err, context.DeadlineExceeded) { return gperr.New("docker client connection timeout") @@ -76,18 +134,7 @@ func (w DockerWatcher) parseError(err error) gperr.Error { return gperr.Wrap(err) } -func (w *DockerWatcher) checkConnection(ctx context.Context) bool { - ctx, cancel := context.WithTimeout(ctx, dockerWatcherRetryInterval) - defer cancel() - err := w.client.CheckConnection(ctx) - if err != nil { - logging.Debug().Err(err).Msg("docker watcher: connection failed") - return false - } - return true -} - -func (w *DockerWatcher) handleEvent(event docker_events.Message, ch chan<- Event) { +func (w DockerWatcher) handleEvent(event docker_events.Message, ch chan<- Event) { action, ok := events.DockerEventMap[event.Action] if !ok { return @@ -101,63 +148,13 @@ func (w *DockerWatcher) handleEvent(event docker_events.Message, ch chan<- Event } } -func (w *DockerWatcher) EventsWithOptions(ctx context.Context, options DockerListOptions) (<-chan Event, <-chan gperr.Error) { - eventCh := make(chan Event) - errCh := make(chan gperr.Error) - - go func() { - var err error - w.client, err = docker.NewClient(w.host) - if err != nil { - errCh <- gperr.Wrap(err, "docker watcher: failed to initialize client") - return - } - - defer func() { - close(eventCh) - close(errCh) - w.client.Close() - }() - - cEventCh, cErrCh := w.client.Events(ctx, options) - defer logging.Debug().Str("host", w.client.Address()).Msg("docker watcher closed") - for { - select { - case <-ctx.Done(): - return - case msg := <-cEventCh: - w.handleEvent(msg, eventCh) - case err := <-cErrCh: - if err == nil { - continue - } - errCh <- w.parseError(err) - // release the error because reopening event channel may block - err = nil - // trigger reload (clear routes) - eventCh <- reloadTrigger - - retry := time.NewTicker(dockerWatcherRetryInterval) - defer retry.Stop() - ok := false - for !ok { - select { - case <-ctx.Done(): - return - case <-retry.C: - if w.checkConnection(ctx) { - ok = true - break - } - } - } - // connection successful, trigger reload (reload routes) - eventCh <- reloadTrigger - // reopen event channel - cEventCh, cErrCh = w.client.Events(ctx, options) - } - } - }() - - return eventCh, errCh +func checkConnection(ctx context.Context, client *docker.SharedClient) bool { + ctx, cancel := context.WithTimeout(ctx, dockerWatcherRetryInterval) + defer cancel() + err := client.CheckConnection(ctx) + if err != nil { + logging.Debug().Err(err).Msg("docker watcher: connection failed") + return false + } + return true }