fixed idlewatcher panic, dashboard app name, route not removing on container destroy

This commit is contained in:
yusing 2024-10-16 00:57:10 +08:00
parent 56b778f19c
commit c0c61709ca
10 changed files with 53 additions and 56 deletions

View file

@ -125,6 +125,7 @@ func newSubTask(ctx context.Context, name string) *task {
}
tasksMap.Store(t, struct{}{})
taskWg.Add(1)
logrus.Debugf("task %q started", name)
return t
}

View file

@ -47,9 +47,6 @@ func (cfg *Config) HomepageConfig() homepage.Config {
item := entry.Homepage
if item == nil {
item = new(homepage.Item)
}
if !item.Show && item.IsEmpty() {
item.Show = true
}

View file

@ -7,6 +7,7 @@ import (
"time"
"github.com/sirupsen/logrus"
E "github.com/yusing/go-proxy/internal/error"
gphttp "github.com/yusing/go-proxy/internal/net/http"
"github.com/yusing/go-proxy/internal/net/types"
"github.com/yusing/go-proxy/internal/watcher/health"
@ -125,7 +126,7 @@ func (w *Waker) wake(rw http.ResponseWriter, r *http.Request) (shouldNext bool)
select {
case <-w.task.Context().Done():
http.Error(rw, "Waking timed out", http.StatusGatewayTimeout)
http.Error(rw, "Service unavailable", http.StatusServiceUnavailable)
return
case <-ctx.Done():
http.Error(rw, "Waking timed out", http.StatusGatewayTimeout)
@ -133,12 +134,11 @@ func (w *Waker) wake(rw http.ResponseWriter, r *http.Request) (shouldNext bool)
default:
}
// wake the container and reset idle timer
// also wait for another wake request
w.wakeCh <- struct{}{}
if <-w.wakeDone != nil {
http.Error(rw, "Error sending wake request", http.StatusInternalServerError)
w.l.Debug("wake signal received")
err := w.wakeIfStopped()
if err != nil {
w.l.Error(E.FailWith("wake", err))
http.Error(rw, "Error waking container", http.StatusInternalServerError)
return
}
@ -153,6 +153,9 @@ func (w *Waker) wake(rw http.ResponseWriter, r *http.Request) (shouldNext bool)
for {
select {
case <-w.task.Context().Done():
http.Error(rw, "Service unavailable", http.StatusServiceUnavailable)
return
case <-ctx.Done():
http.Error(rw, "Waking timed out", http.StatusGatewayTimeout)
return

View file

@ -27,9 +27,7 @@ type (
ready atomic.Bool // whether the site is ready to accept connection
stopByMethod StopCallback // send a docker command w.r.t. `stop_method`
wakeCh chan struct{}
wakeDone chan E.NestedError
ticker *time.Ticker
ticker *time.Ticker
task common.Task
cancel context.CancelFunc
@ -84,8 +82,6 @@ func Register(entry *P.ReverseProxyEntry) (*Watcher, E.NestedError) {
ReverseProxyEntry: entry,
client: client,
refCount: U.NewRefCounter(),
wakeCh: make(chan struct{}, 1),
wakeDone: make(chan E.NestedError),
ticker: time.NewTicker(entry.IdleTimeout),
l: logger.WithField("container", entry.ContainerName),
}
@ -127,6 +123,9 @@ func (w *Watcher) containerStart() error {
}
func (w *Watcher) containerStatus() (string, E.NestedError) {
if !w.client.Connected() {
return "", E.Failure("docker client closed")
}
json, err := w.client.ContainerInspect(w.task.Context(), w.ContainerID)
if err != nil {
return "", E.FailWith("inspect container", err)
@ -201,10 +200,9 @@ func (w *Watcher) watchUntilCancel() {
})
defer func() {
w.cancel()
w.ticker.Stop()
w.client.Close()
close(w.wakeDone)
close(w.wakeCh)
watcherMap.Delete(w.ContainerID)
w.task.Finished()
}()
@ -220,6 +218,7 @@ func (w *Watcher) watchUntilCancel() {
case err := <-dockerEventErrCh:
if err != nil && err.IsNot(context.Canceled) {
w.l.Error(E.FailWith("docker watcher", err))
return
}
case e := <-dockerEventCh:
switch {
@ -227,27 +226,22 @@ func (w *Watcher) watchUntilCancel() {
case e.Action.IsContainerWake():
w.ContainerRunning = true
w.resetIdleTimer()
w.l.Info(e)
default: // stop / pause / kil
w.l.Info("container awaken")
case e.Action.IsContainerSleep(): // stop / pause / kil
w.ContainerRunning = false
w.ticker.Stop()
w.ready.Store(false)
w.l.Info(e)
default:
w.l.Errorf("unexpected docker event: %s", e)
}
case <-w.ticker.C:
w.l.Debug("idle timeout")
w.ticker.Stop()
if err := w.stopByMethod(); err != nil && err.IsNot(context.Canceled) {
w.l.Error(E.FailWith("stop", err).Extraf("stop method: %s", w.StopMethod))
} else {
w.l.Info("stopped by idle timeout")
}
case <-w.wakeCh:
w.l.Debug("wake signal received")
w.resetIdleTimer()
err := w.wakeIfStopped()
if err != nil {
w.l.Error(E.FailWith("wake", err))
}
w.wakeDone <- err
}
}
}

View file

@ -89,12 +89,6 @@ func (p *DockerProvider) shouldIgnore(container *D.Container) bool {
}
func (p *DockerProvider) OnEvent(event W.Event, oldRoutes R.Routes) (res EventResult) {
switch event.Action {
case events.ActionContainerStart, events.ActionContainerStop:
break
default:
return
}
b := E.NewBuilder("event %s error", event)
defer b.To(&res.err)
@ -105,15 +99,26 @@ func (p *DockerProvider) OnEvent(event W.Event, oldRoutes R.Routes) (res EventRe
matches.Store(k, v)
}
})
//FIXME: docker event die stuck
var newRoutes R.Routes
var err E.NestedError
if matches.Size() == 0 { // id & container name changed
switch {
// id & container name changed
case matches.Size() == 0:
matches = oldRoutes
newRoutes, err = p.LoadRoutesImpl()
b.Add(err)
} else {
case event.Action == events.ActionContainerDestroy:
// stop all old routes
matches.RangeAllParallel(func(_ string, v *R.Route) {
oldRoutes.Delete(v.Entry.Alias)
b.Add(v.Stop())
res.nRemoved++
})
return
default:
cont, err := D.Inspect(p.dockerHost, event.ActorID)
if err != nil {
b.Add(E.FailWith("inspect container", err))
@ -124,6 +129,7 @@ func (p *DockerProvider) OnEvent(event W.Event, oldRoutes R.Routes) (res EventRe
// stop all old routes
matches.RangeAllParallel(func(_ string, v *R.Route) {
b.Add(v.Stop())
res.nRemoved++
})
return
}
@ -145,19 +151,13 @@ func (p *DockerProvider) OnEvent(event W.Event, oldRoutes R.Routes) (res EventRe
newRoutes.RangeAll(func(alias string, newRoute *R.Route) {
oldRoute, exists := oldRoutes.Load(alias)
if exists {
if err := oldRoute.Stop(); err != nil {
b.Add(err)
}
}
oldRoutes.Store(alias, newRoute)
if err := newRoute.Start(); err != nil {
b.Add(err)
}
if exists {
b.Add(oldRoute.Stop())
res.nReloaded++
} else {
res.nAdded++
}
b.Add(newRoute.Start())
oldRoutes.Store(alias, newRoute)
})
return

View file

@ -189,9 +189,11 @@ func (p *Provider) watchEvents() {
case <-p.watcherTask.Context().Done():
return
case event := <-events:
task := p.watcherTask.Subtask("%s event %s", event.Type, event)
l.Infof("%s event %q", event.Type, event)
res := p.OnEvent(event, p.routes)
task.Finished()
if res.nAdded+res.nRemoved+res.nReloaded > 0 {
l.Infof("%s event %q", event.Type, event)
l.Infof("| %d NEW | %d REMOVED | %d RELOADED |", res.nAdded, res.nRemoved, res.nReloaded)
}
if res.err != nil {

View file

@ -129,7 +129,7 @@ func (r *HTTPRoute) Start() E.NestedError {
r.handler = waker
r.HealthMon = waker
case !r.HealthCheck.Disabled:
r.HealthMon = health.NewHTTPHealthMonitor(common.GlobalTask("Reverse proxy "+r.String()), r.URL(), r.HealthCheck)
r.HealthMon = health.NewHTTPHealthMonitor(common.GlobalTask(r.String()), r.URL(), r.HealthCheck)
}
if r.handler == nil {

View file

@ -28,6 +28,7 @@ var (
DockerFilterStart = filters.Arg("event", string(docker_events.ActionStart))
DockerFilterStop = filters.Arg("event", string(docker_events.ActionStop))
DockerFilterDie = filters.Arg("event", string(docker_events.ActionDie))
DockerFilterDestroy = filters.Arg("event", string(docker_events.ActionDestroy))
DockerFilterKill = filters.Arg("event", string(docker_events.ActionKill))
DockerFilterPause = filters.Arg("event", string(docker_events.ActionPause))
DockerFilterUnpause = filters.Arg("event", string(docker_events.ActionUnPause))
@ -97,12 +98,8 @@ func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerList
defer w.client.Close()
w.Debugf("client connected")
cEventCh, cErrCh := w.client.Events(ctx, options)
w.Debugf("watcher started")
for {
select {
case <-ctx.Done():
@ -148,4 +145,5 @@ var optionsWatchAll = DockerListOptions{Filters: NewDockerFilter(
DockerFilterStart,
// DockerFilterStop,
DockerFilterDie,
DockerFilterDestroy,
)}

View file

@ -32,6 +32,7 @@ const (
ActionContainerStop
ActionContainerPause
ActionContainerDie
ActionContainerDestroy
actionContainerWakeMask = ActionContainerCreate | ActionContainerStart | ActionContainerUnpause
actionContainerSleepMask = ActionContainerKill | ActionContainerStop | ActionContainerPause | ActionContainerDie
@ -47,10 +48,11 @@ var DockerEventMap = map[dockerEvents.Action]Action{
dockerEvents.ActionStart: ActionContainerStart,
dockerEvents.ActionUnPause: ActionContainerUnpause,
dockerEvents.ActionKill: ActionContainerKill,
dockerEvents.ActionStop: ActionContainerStop,
dockerEvents.ActionPause: ActionContainerPause,
dockerEvents.ActionDie: ActionContainerDie,
dockerEvents.ActionKill: ActionContainerKill,
dockerEvents.ActionStop: ActionContainerStop,
dockerEvents.ActionPause: ActionContainerPause,
dockerEvents.ActionDie: ActionContainerDie,
dockerEvents.ActionDestroy: ActionContainerDestroy,
}
var fileActionNameMap = map[Action]string{

View file

@ -127,7 +127,7 @@ func (mon *monitor) String() string {
func (mon *monitor) MarshalJSON() ([]byte, error) {
return (&JSONRepresentation{
Name: mon.Name(),
Name: mon.service,
Config: mon.config,
Status: mon.status.Load(),
Started: mon.startTime,