diff --git a/internal/common/task.go b/internal/common/task.go index dd55a6c..c471aa1 100644 --- a/internal/common/task.go +++ b/internal/common/task.go @@ -125,6 +125,7 @@ func newSubTask(ctx context.Context, name string) *task { } tasksMap.Store(t, struct{}{}) taskWg.Add(1) + logrus.Debugf("task %q started", name) return t } diff --git a/internal/config/query.go b/internal/config/query.go index cd77ef9..5243169 100644 --- a/internal/config/query.go +++ b/internal/config/query.go @@ -47,9 +47,6 @@ func (cfg *Config) HomepageConfig() homepage.Config { item := entry.Homepage if item == nil { item = new(homepage.Item) - } - - if !item.Show && item.IsEmpty() { item.Show = true } diff --git a/internal/docker/idlewatcher/waker.go b/internal/docker/idlewatcher/waker.go index c0430d0..34bbb5e 100644 --- a/internal/docker/idlewatcher/waker.go +++ b/internal/docker/idlewatcher/waker.go @@ -7,6 +7,7 @@ import ( "time" "github.com/sirupsen/logrus" + E "github.com/yusing/go-proxy/internal/error" gphttp "github.com/yusing/go-proxy/internal/net/http" "github.com/yusing/go-proxy/internal/net/types" "github.com/yusing/go-proxy/internal/watcher/health" @@ -125,7 +126,7 @@ func (w *Waker) wake(rw http.ResponseWriter, r *http.Request) (shouldNext bool) select { case <-w.task.Context().Done(): - http.Error(rw, "Waking timed out", http.StatusGatewayTimeout) + http.Error(rw, "Service unavailable", http.StatusServiceUnavailable) return case <-ctx.Done(): http.Error(rw, "Waking timed out", http.StatusGatewayTimeout) @@ -133,12 +134,11 @@ func (w *Waker) wake(rw http.ResponseWriter, r *http.Request) (shouldNext bool) default: } - // wake the container and reset idle timer - // also wait for another wake request - w.wakeCh <- struct{}{} - - if <-w.wakeDone != nil { - http.Error(rw, "Error sending wake request", http.StatusInternalServerError) + w.l.Debug("wake signal received") + err := w.wakeIfStopped() + if err != nil { + w.l.Error(E.FailWith("wake", err)) + http.Error(rw, "Error waking container", http.StatusInternalServerError) return } @@ -153,6 +153,9 @@ func (w *Waker) wake(rw http.ResponseWriter, r *http.Request) (shouldNext bool) for { select { + case <-w.task.Context().Done(): + http.Error(rw, "Service unavailable", http.StatusServiceUnavailable) + return case <-ctx.Done(): http.Error(rw, "Waking timed out", http.StatusGatewayTimeout) return diff --git a/internal/docker/idlewatcher/watcher.go b/internal/docker/idlewatcher/watcher.go index b3f3776..7ad67ce 100644 --- a/internal/docker/idlewatcher/watcher.go +++ b/internal/docker/idlewatcher/watcher.go @@ -27,9 +27,7 @@ type ( ready atomic.Bool // whether the site is ready to accept connection stopByMethod StopCallback // send a docker command w.r.t. `stop_method` - wakeCh chan struct{} - wakeDone chan E.NestedError - ticker *time.Ticker + ticker *time.Ticker task common.Task cancel context.CancelFunc @@ -84,8 +82,6 @@ func Register(entry *P.ReverseProxyEntry) (*Watcher, E.NestedError) { ReverseProxyEntry: entry, client: client, refCount: U.NewRefCounter(), - wakeCh: make(chan struct{}, 1), - wakeDone: make(chan E.NestedError), ticker: time.NewTicker(entry.IdleTimeout), l: logger.WithField("container", entry.ContainerName), } @@ -127,6 +123,9 @@ func (w *Watcher) containerStart() error { } func (w *Watcher) containerStatus() (string, E.NestedError) { + if !w.client.Connected() { + return "", E.Failure("docker client closed") + } json, err := w.client.ContainerInspect(w.task.Context(), w.ContainerID) if err != nil { return "", E.FailWith("inspect container", err) @@ -201,10 +200,9 @@ func (w *Watcher) watchUntilCancel() { }) defer func() { + w.cancel() w.ticker.Stop() w.client.Close() - close(w.wakeDone) - close(w.wakeCh) watcherMap.Delete(w.ContainerID) w.task.Finished() }() @@ -220,6 +218,7 @@ func (w *Watcher) watchUntilCancel() { case err := <-dockerEventErrCh: if err != nil && err.IsNot(context.Canceled) { w.l.Error(E.FailWith("docker watcher", err)) + return } case e := <-dockerEventCh: switch { @@ -227,27 +226,22 @@ func (w *Watcher) watchUntilCancel() { case e.Action.IsContainerWake(): w.ContainerRunning = true w.resetIdleTimer() - w.l.Info(e) - default: // stop / pause / kil + w.l.Info("container awaken") + case e.Action.IsContainerSleep(): // stop / pause / kil w.ContainerRunning = false w.ticker.Stop() w.ready.Store(false) - w.l.Info(e) + default: + w.l.Errorf("unexpected docker event: %s", e) } case <-w.ticker.C: w.l.Debug("idle timeout") w.ticker.Stop() if err := w.stopByMethod(); err != nil && err.IsNot(context.Canceled) { w.l.Error(E.FailWith("stop", err).Extraf("stop method: %s", w.StopMethod)) + } else { + w.l.Info("stopped by idle timeout") } - case <-w.wakeCh: - w.l.Debug("wake signal received") - w.resetIdleTimer() - err := w.wakeIfStopped() - if err != nil { - w.l.Error(E.FailWith("wake", err)) - } - w.wakeDone <- err } } } diff --git a/internal/proxy/provider/docker.go b/internal/proxy/provider/docker.go index 50fae90..aa48151 100755 --- a/internal/proxy/provider/docker.go +++ b/internal/proxy/provider/docker.go @@ -89,12 +89,6 @@ func (p *DockerProvider) shouldIgnore(container *D.Container) bool { } func (p *DockerProvider) OnEvent(event W.Event, oldRoutes R.Routes) (res EventResult) { - switch event.Action { - case events.ActionContainerStart, events.ActionContainerStop: - break - default: - return - } b := E.NewBuilder("event %s error", event) defer b.To(&res.err) @@ -105,15 +99,26 @@ func (p *DockerProvider) OnEvent(event W.Event, oldRoutes R.Routes) (res EventRe matches.Store(k, v) } }) + //FIXME: docker event die stuck var newRoutes R.Routes var err E.NestedError - if matches.Size() == 0 { // id & container name changed + switch { + // id & container name changed + case matches.Size() == 0: matches = oldRoutes newRoutes, err = p.LoadRoutesImpl() b.Add(err) - } else { + case event.Action == events.ActionContainerDestroy: + // stop all old routes + matches.RangeAllParallel(func(_ string, v *R.Route) { + oldRoutes.Delete(v.Entry.Alias) + b.Add(v.Stop()) + res.nRemoved++ + }) + return + default: cont, err := D.Inspect(p.dockerHost, event.ActorID) if err != nil { b.Add(E.FailWith("inspect container", err)) @@ -124,6 +129,7 @@ func (p *DockerProvider) OnEvent(event W.Event, oldRoutes R.Routes) (res EventRe // stop all old routes matches.RangeAllParallel(func(_ string, v *R.Route) { b.Add(v.Stop()) + res.nRemoved++ }) return } @@ -145,19 +151,13 @@ func (p *DockerProvider) OnEvent(event W.Event, oldRoutes R.Routes) (res EventRe newRoutes.RangeAll(func(alias string, newRoute *R.Route) { oldRoute, exists := oldRoutes.Load(alias) if exists { - if err := oldRoute.Stop(); err != nil { - b.Add(err) - } - } - oldRoutes.Store(alias, newRoute) - if err := newRoute.Start(); err != nil { - b.Add(err) - } - if exists { + b.Add(oldRoute.Stop()) res.nReloaded++ } else { res.nAdded++ } + b.Add(newRoute.Start()) + oldRoutes.Store(alias, newRoute) }) return diff --git a/internal/proxy/provider/provider.go b/internal/proxy/provider/provider.go index 2407efc..1318a76 100644 --- a/internal/proxy/provider/provider.go +++ b/internal/proxy/provider/provider.go @@ -189,9 +189,11 @@ func (p *Provider) watchEvents() { case <-p.watcherTask.Context().Done(): return case event := <-events: + task := p.watcherTask.Subtask("%s event %s", event.Type, event) + l.Infof("%s event %q", event.Type, event) res := p.OnEvent(event, p.routes) + task.Finished() if res.nAdded+res.nRemoved+res.nReloaded > 0 { - l.Infof("%s event %q", event.Type, event) l.Infof("| %d NEW | %d REMOVED | %d RELOADED |", res.nAdded, res.nRemoved, res.nReloaded) } if res.err != nil { diff --git a/internal/route/http.go b/internal/route/http.go index c361b11..2e89ed4 100755 --- a/internal/route/http.go +++ b/internal/route/http.go @@ -129,7 +129,7 @@ func (r *HTTPRoute) Start() E.NestedError { r.handler = waker r.HealthMon = waker case !r.HealthCheck.Disabled: - r.HealthMon = health.NewHTTPHealthMonitor(common.GlobalTask("Reverse proxy "+r.String()), r.URL(), r.HealthCheck) + r.HealthMon = health.NewHTTPHealthMonitor(common.GlobalTask(r.String()), r.URL(), r.HealthCheck) } if r.handler == nil { diff --git a/internal/watcher/docker_watcher.go b/internal/watcher/docker_watcher.go index 32c0c15..57369c5 100644 --- a/internal/watcher/docker_watcher.go +++ b/internal/watcher/docker_watcher.go @@ -28,6 +28,7 @@ var ( DockerFilterStart = filters.Arg("event", string(docker_events.ActionStart)) DockerFilterStop = filters.Arg("event", string(docker_events.ActionStop)) DockerFilterDie = filters.Arg("event", string(docker_events.ActionDie)) + DockerFilterDestroy = filters.Arg("event", string(docker_events.ActionDestroy)) DockerFilterKill = filters.Arg("event", string(docker_events.ActionKill)) DockerFilterPause = filters.Arg("event", string(docker_events.ActionPause)) DockerFilterUnpause = filters.Arg("event", string(docker_events.ActionUnPause)) @@ -97,12 +98,8 @@ func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerList defer w.client.Close() - w.Debugf("client connected") - cEventCh, cErrCh := w.client.Events(ctx, options) - w.Debugf("watcher started") - for { select { case <-ctx.Done(): @@ -148,4 +145,5 @@ var optionsWatchAll = DockerListOptions{Filters: NewDockerFilter( DockerFilterStart, // DockerFilterStop, DockerFilterDie, + DockerFilterDestroy, )} diff --git a/internal/watcher/events/events.go b/internal/watcher/events/events.go index 49d39a7..876f77d 100644 --- a/internal/watcher/events/events.go +++ b/internal/watcher/events/events.go @@ -32,6 +32,7 @@ const ( ActionContainerStop ActionContainerPause ActionContainerDie + ActionContainerDestroy actionContainerWakeMask = ActionContainerCreate | ActionContainerStart | ActionContainerUnpause actionContainerSleepMask = ActionContainerKill | ActionContainerStop | ActionContainerPause | ActionContainerDie @@ -47,10 +48,11 @@ var DockerEventMap = map[dockerEvents.Action]Action{ dockerEvents.ActionStart: ActionContainerStart, dockerEvents.ActionUnPause: ActionContainerUnpause, - dockerEvents.ActionKill: ActionContainerKill, - dockerEvents.ActionStop: ActionContainerStop, - dockerEvents.ActionPause: ActionContainerPause, - dockerEvents.ActionDie: ActionContainerDie, + dockerEvents.ActionKill: ActionContainerKill, + dockerEvents.ActionStop: ActionContainerStop, + dockerEvents.ActionPause: ActionContainerPause, + dockerEvents.ActionDie: ActionContainerDie, + dockerEvents.ActionDestroy: ActionContainerDestroy, } var fileActionNameMap = map[Action]string{ diff --git a/internal/watcher/health/monitor.go b/internal/watcher/health/monitor.go index 37f3e39..71b1db3 100644 --- a/internal/watcher/health/monitor.go +++ b/internal/watcher/health/monitor.go @@ -127,7 +127,7 @@ func (mon *monitor) String() string { func (mon *monitor) MarshalJSON() ([]byte, error) { return (&JSONRepresentation{ - Name: mon.Name(), + Name: mon.service, Config: mon.config, Status: mon.status.Load(), Started: mon.startTime,