add buffering to docker watcher

This commit is contained in:
yusing 2025-02-14 05:16:56 +08:00
parent f0198616ad
commit 3b94c7bb43

View file

@ -65,49 +65,49 @@ func (w DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan E.Error
return w.EventsWithOptions(ctx, optionsDefault) return w.EventsWithOptions(ctx, optionsDefault)
} }
func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerListOptions) (<-chan Event, <-chan E.Error) { func (w DockerWatcher) Close() {
eventCh := make(chan Event)
errCh := make(chan E.Error)
go func() {
defer close(eventCh)
defer close(errCh)
defer func() {
if w.clientOwned && w.client.Connected() { if w.clientOwned && w.client.Connected() {
w.client.Close() w.client.Close()
} }
}
func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerListOptions) (<-chan Event, <-chan E.Error) {
eventCh := make(chan Event, 100)
errCh := make(chan E.Error, 10)
go func() {
defer func() {
close(eventCh)
close(errCh)
w.Close()
}() }()
if !w.client.Connected() { if !w.client.Connected() {
var err error var err error
attempts := 0
for {
w.client, err = D.ConnectClient(w.host) w.client, err = D.ConnectClient(w.host)
if err == nil { attempts := 0
break retryTicker := time.NewTicker(dockerWatcherRetryInterval)
} for err != nil {
attempts++ attempts++
errCh <- E.Errorf("docker connection attempt #%d: %w", attempts, err) errCh <- E.Errorf("docker connection attempt #%d: %w", attempts, err)
select { select {
case <-ctx.Done(): case <-ctx.Done():
retryTicker.Stop()
return return
default: case <-retryTicker.C:
time.Sleep(dockerWatcherRetryInterval) w.client, err = D.ConnectClient(w.host)
} }
} }
retryTicker.Stop()
} }
defer w.client.Close() defer w.Close()
cEventCh, cErrCh := w.client.Events(ctx, options) cEventCh, cErrCh := w.client.Events(ctx, options)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
if err := E.From(ctx.Err()); err != nil && !err.Is(context.Canceled) {
errCh <- err
}
return return
case msg := <-cEventCh: case msg := <-cEventCh:
action, ok := events.DockerEventMap[msg.Action] action, ok := events.DockerEventMap[msg.Action]