diff --git a/internal/docker/idlewatcher/watcher.go b/internal/docker/idlewatcher/watcher.go index 599579e..82d4043 100644 --- a/internal/docker/idlewatcher/watcher.go +++ b/internal/docker/idlewatcher/watcher.go @@ -213,7 +213,7 @@ func (w *Watcher) expires() time.Time { return w.lastReset.Add(w.IdleTimeout) } -func (w *Watcher) getEventCh(dockerWatcher watcher.DockerWatcher) (eventCh <-chan events.Event, errCh <-chan gperr.Error) { +func (w *Watcher) getEventCh(dockerWatcher *watcher.DockerWatcher) (eventCh <-chan events.Event, errCh <-chan gperr.Error) { eventCh, errCh = dockerWatcher.EventsWithOptions(w.Task().Context(), watcher.DockerListOptions{ Filters: watcher.NewDockerFilter( watcher.DockerFilterContainer, diff --git a/internal/route/provider/event_handler.go b/internal/route/provider/event_handler.go index 5035d58..2f2b939 100644 --- a/internal/route/provider/event_handler.go +++ b/internal/route/provider/event_handler.go @@ -6,6 +6,7 @@ import ( "github.com/yusing/go-proxy/internal/route/provider/types" "github.com/yusing/go-proxy/internal/task" "github.com/yusing/go-proxy/internal/watcher" + eventsPkg "github.com/yusing/go-proxy/internal/watcher/events" ) type EventHandler struct { @@ -29,10 +30,19 @@ func (p *Provider) newEventHandler() *EventHandler { func (handler *EventHandler) Handle(parent task.Parent, events []watcher.Event) { oldRoutes := handler.provider.routes + + isForceReload := false + for _, event := range events { + if event.Action == eventsPkg.ActionForceReload { + isForceReload = true + break + } + } + newRoutes, err := handler.provider.loadRoutes() if err != nil { handler.errs.Add(err) - if len(newRoutes) == 0 { + if len(newRoutes) == 0 && !isForceReload { return } } diff --git a/internal/watcher/docker_watcher.go b/internal/watcher/docker_watcher.go index 89b8a0d..a4a7ebb 100644 --- a/internal/watcher/docker_watcher.go +++ b/internal/watcher/docker_watcher.go @@ -2,19 +2,22 @@ package watcher import ( "context" + "errors" "time" docker_events "github.com/docker/docker/api/types/events" "github.com/docker/docker/api/types/filters" - D "github.com/yusing/go-proxy/internal/docker" + "github.com/docker/docker/client" + "github.com/yusing/go-proxy/internal/docker" "github.com/yusing/go-proxy/internal/gperr" + "github.com/yusing/go-proxy/internal/logging" "github.com/yusing/go-proxy/internal/watcher/events" ) type ( DockerWatcher struct { host string - client *D.SharedClient + client *docker.SharedClient clientOwned bool } DockerListOptions = docker_events.ListOptions @@ -42,38 +45,66 @@ var ( )} dockerWatcherRetryInterval = 3 * time.Second + + reloadTrigger = Event{ + Type: events.EventTypeDocker, + Action: events.ActionForceReload, + ActorAttributes: map[string]string{}, + ActorName: "", + ActorID: "", + } ) func DockerFilterContainerNameID(nameOrID string) filters.KeyValuePair { return filters.Arg("container", nameOrID) } -func NewDockerWatcher(host string) DockerWatcher { - return DockerWatcher{ +func NewDockerWatcher(host string) *DockerWatcher { + return &DockerWatcher{ host: host, clientOwned: true, } } -func NewDockerWatcherWithClient(client *D.SharedClient) DockerWatcher { - return DockerWatcher{ +func NewDockerWatcherWithClient(client *docker.SharedClient) *DockerWatcher { + return &DockerWatcher{ client: client, } } -func (w DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan gperr.Error) { +func (w *DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan gperr.Error) { return w.EventsWithOptions(ctx, optionsDefault) } -func (w DockerWatcher) Close() { +func (w *DockerWatcher) Close() { if w.clientOwned && w.client.Connected() { w.client.Close() } } -func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerListOptions) (<-chan Event, <-chan gperr.Error) { - eventCh := make(chan Event, 100) - errCh := make(chan gperr.Error, 10) +func (w *DockerWatcher) parseError(err error) gperr.Error { + if errors.Is(err, context.DeadlineExceeded) { + return gperr.New("docker client connection timeout") + } + if client.IsErrConnectionFailed(err) { + return gperr.New("docker client connection failure") + } + return gperr.Wrap(err) +} + +func (w *DockerWatcher) checkConnection(ctx context.Context) bool { + ctx, cancel := context.WithTimeout(ctx, dockerWatcherRetryInterval) + defer cancel() + _, err := w.client.Ping(ctx) + if err != nil { + return false + } + return true +} + +func (w *DockerWatcher) EventsWithOptions(ctx context.Context, options DockerListOptions) (<-chan Event, <-chan gperr.Error) { + eventCh := make(chan Event) + errCh := make(chan gperr.Error) go func() { defer func() { @@ -84,7 +115,7 @@ func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerList if !w.client.Connected() { var err error - w.client, err = D.ConnectClient(w.host) + w.client, err = docker.ConnectClient(w.host) attempts := 0 retryTicker := time.NewTicker(dockerWatcherRetryInterval) for err != nil { @@ -95,7 +126,7 @@ func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerList retryTicker.Stop() return case <-retryTicker.C: - w.client, err = D.ConnectClient(w.host) + w.client, err = docker.ConnectClient(w.host) } } retryTicker.Stop() @@ -104,7 +135,7 @@ func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerList defer w.Close() cEventCh, cErrCh := w.client.Events(ctx, options) - + defer logging.Debug().Str("host", w.host).Msg("docker watcher closed") for { select { case <-ctx.Done(): @@ -126,14 +157,21 @@ func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerList if err == nil { continue } - errCh <- gperr.Wrap(err) - select { - case <-ctx.Done(): - return - default: - time.Sleep(dockerWatcherRetryInterval) - cEventCh, cErrCh = w.client.Events(ctx, options) + errCh <- w.parseError(err) + // trigger reload (clear routes) + eventCh <- reloadTrigger + for !w.checkConnection(ctx) { + select { + case <-ctx.Done(): + return + case <-time.After(dockerWatcherRetryInterval): + continue + } } + // connection successful, trigger reload (reload routes) + eventCh <- reloadTrigger + // reopen event channel + cEventCh, cErrCh = w.client.Events(ctx, options) } } }() diff --git a/internal/watcher/events/events.go b/internal/watcher/events/events.go index 069c376..6b851d0 100644 --- a/internal/watcher/events/events.go +++ b/internal/watcher/events/events.go @@ -34,6 +34,8 @@ const ( ActionContainerDie ActionContainerDestroy + ActionForceReload + actionContainerWakeMask = ActionContainerCreate | ActionContainerStart | ActionContainerUnpause actionContainerSleepMask = ActionContainerKill | ActionContainerStop | ActionContainerPause | ActionContainerDie )