refactor: docker event watcher

This commit is contained in:
yusing 2025-05-19 23:15:47 +08:00
parent 1f50ee7f2f
commit 3dbab118af
2 changed files with 76 additions and 79 deletions

View file

@ -12,7 +12,7 @@ import (
type DockerProvider struct { type DockerProvider struct {
client *docker.SharedClient client *docker.SharedClient
watcher *watcher.DockerWatcher watcher watcher.DockerWatcher
containerID string containerID string
} }

View file

@ -15,10 +15,7 @@ import (
) )
type ( type (
DockerWatcher struct { DockerWatcher string
host string
client *docker.SharedClient
}
DockerListOptions = docker_events.ListOptions DockerListOptions = docker_events.ListOptions
) )
@ -58,14 +55,75 @@ func DockerFilterContainerNameID(nameOrID string) filters.KeyValuePair {
return filters.Arg("container", nameOrID) return filters.Arg("container", nameOrID)
} }
func NewDockerWatcher(host string) *DockerWatcher { func NewDockerWatcher(host string) DockerWatcher {
return &DockerWatcher{host: host} return DockerWatcher(host)
} }
func (w *DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan gperr.Error) { func (w DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan gperr.Error) {
return w.EventsWithOptions(ctx, optionsDefault) return w.EventsWithOptions(ctx, optionsDefault)
} }
func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerListOptions) (<-chan Event, <-chan gperr.Error) {
eventCh := make(chan Event)
errCh := make(chan gperr.Error)
go func() {
client, err := docker.NewClient(string(w))
if err != nil {
errCh <- gperr.Wrap(err, "docker watcher: failed to initialize client")
return
}
defer func() {
close(eventCh)
close(errCh)
client.Close()
}()
cEventCh, cErrCh := client.Events(ctx, options)
defer logging.Debug().Str("host", client.Address()).Msg("docker watcher closed")
for {
select {
case <-ctx.Done():
return
case msg := <-cEventCh:
w.handleEvent(msg, eventCh)
case err := <-cErrCh:
if err == nil {
continue
}
errCh <- w.parseError(err)
// release the error because reopening event channel may block
//nolint:ineffassign,wastedassign
err = nil
// trigger reload (clear routes)
eventCh <- reloadTrigger
retry := time.NewTicker(dockerWatcherRetryInterval)
defer retry.Stop()
ok := false
for !ok {
select {
case <-ctx.Done():
return
case <-retry.C:
if checkConnection(ctx, client) {
ok = true
break
}
}
}
// connection successful, trigger reload (reload routes)
eventCh <- reloadTrigger
// reopen event channel
cEventCh, cErrCh = client.Events(ctx, options)
}
}
}()
return eventCh, errCh
}
func (w DockerWatcher) parseError(err error) gperr.Error { func (w DockerWatcher) parseError(err error) gperr.Error {
if errors.Is(err, context.DeadlineExceeded) { if errors.Is(err, context.DeadlineExceeded) {
return gperr.New("docker client connection timeout") return gperr.New("docker client connection timeout")
@ -76,18 +134,7 @@ func (w DockerWatcher) parseError(err error) gperr.Error {
return gperr.Wrap(err) return gperr.Wrap(err)
} }
func (w *DockerWatcher) checkConnection(ctx context.Context) bool { func (w DockerWatcher) handleEvent(event docker_events.Message, ch chan<- Event) {
ctx, cancel := context.WithTimeout(ctx, dockerWatcherRetryInterval)
defer cancel()
err := w.client.CheckConnection(ctx)
if err != nil {
logging.Debug().Err(err).Msg("docker watcher: connection failed")
return false
}
return true
}
func (w *DockerWatcher) handleEvent(event docker_events.Message, ch chan<- Event) {
action, ok := events.DockerEventMap[event.Action] action, ok := events.DockerEventMap[event.Action]
if !ok { if !ok {
return return
@ -101,63 +148,13 @@ func (w *DockerWatcher) handleEvent(event docker_events.Message, ch chan<- Event
} }
} }
func (w *DockerWatcher) EventsWithOptions(ctx context.Context, options DockerListOptions) (<-chan Event, <-chan gperr.Error) { func checkConnection(ctx context.Context, client *docker.SharedClient) bool {
eventCh := make(chan Event) ctx, cancel := context.WithTimeout(ctx, dockerWatcherRetryInterval)
errCh := make(chan gperr.Error) defer cancel()
err := client.CheckConnection(ctx)
go func() { if err != nil {
var err error logging.Debug().Err(err).Msg("docker watcher: connection failed")
w.client, err = docker.NewClient(w.host) return false
if err != nil { }
errCh <- gperr.Wrap(err, "docker watcher: failed to initialize client") return true
return
}
defer func() {
close(eventCh)
close(errCh)
w.client.Close()
}()
cEventCh, cErrCh := w.client.Events(ctx, options)
defer logging.Debug().Str("host", w.client.Address()).Msg("docker watcher closed")
for {
select {
case <-ctx.Done():
return
case msg := <-cEventCh:
w.handleEvent(msg, eventCh)
case err := <-cErrCh:
if err == nil {
continue
}
errCh <- w.parseError(err)
// release the error because reopening event channel may block
err = nil
// trigger reload (clear routes)
eventCh <- reloadTrigger
retry := time.NewTicker(dockerWatcherRetryInterval)
defer retry.Stop()
ok := false
for !ok {
select {
case <-ctx.Done():
return
case <-retry.C:
if w.checkConnection(ctx) {
ok = true
break
}
}
}
// connection successful, trigger reload (reload routes)
eventCh <- reloadTrigger
// reopen event channel
cEventCh, cErrCh = w.client.Events(ctx, options)
}
}
}()
return eventCh, errCh
} }