From a39d527fc1f6f47f48c5cb626b807018c0446a92 Mon Sep 17 00:00:00 2001 From: yusing Date: Wed, 4 Jun 2025 23:26:38 +0800 Subject: [PATCH] feat(idlesleep): support container dependencies, including custom and docker depends_on, code refactor --- go.mod | 2 +- internal/docker/container.go | 51 ++- internal/docker/labels.go | 1 + internal/idlewatcher/common.go | 9 + internal/idlewatcher/debug.go | 7 + internal/idlewatcher/errors.go | 65 ++++ internal/idlewatcher/handle_http.go | 26 +- internal/idlewatcher/handle_stream.go | 14 +- internal/idlewatcher/health.go | 67 ++-- internal/idlewatcher/types/config.go | 28 +- internal/idlewatcher/types/config_test.go | 3 +- internal/idlewatcher/watcher.go | 381 ++++++++++++++++++---- internal/route/reverse_proxy.go | 2 +- internal/route/routes/route.go | 1 + internal/route/stream.go | 2 +- 15 files changed, 507 insertions(+), 152 deletions(-) create mode 100644 internal/idlewatcher/errors.go diff --git a/go.mod b/go.mod index 6a7d3dc..ec126f4 100644 --- a/go.mod +++ b/go.mod @@ -32,6 +32,7 @@ require ( golang.org/x/crypto v0.38.0 // encrypting password with bcrypt golang.org/x/net v0.40.0 // HTTP header utilities golang.org/x/oauth2 v0.30.0 // oauth2 authentication + golang.org/x/sync v0.14.0 golang.org/x/time v0.11.0 // time utilities ) @@ -227,7 +228,6 @@ require ( go.uber.org/multierr v1.11.0 // indirect go.uber.org/ratelimit v0.3.1 // indirect golang.org/x/mod v0.24.0 // indirect - golang.org/x/sync v0.14.0 // indirect golang.org/x/sys v0.33.0 // indirect golang.org/x/text v0.25.0 // indirect golang.org/x/tools v0.33.0 // indirect diff --git a/internal/docker/container.go b/internal/docker/container.go index ddc4ac6..c3162f5 100644 --- a/internal/docker/container.go +++ b/internal/docker/container.go @@ -56,13 +56,15 @@ type ( var DummyContainer = new(Container) func FromDocker(c *container.SummaryTrimmed, dockerHost string) (res *Container) { - isExplicit := false + _, isExplicit := c.Labels[LabelAliases] helper := containerHelper{c} - for lbl := range c.Labels { - if strings.HasPrefix(lbl, NSProxy+".") { - isExplicit = true - } else { - delete(c.Labels, lbl) + if !isExplicit { + // walk through all labels to check if any label starts with NSProxy. + for lbl := range c.Labels { + if strings.HasPrefix(lbl, NSProxy+".") { + isExplicit = true + break + } } } @@ -124,14 +126,30 @@ func (c *Container) UpdatePorts() error { continue } c.PublicPortMapping[portInt] = container.Port{ - PublicPort: uint16(portInt), - PrivatePort: uint16(portInt), + PublicPort: uint16(portInt), //nolint:gosec + PrivatePort: uint16(portInt), //nolint:gosec Type: proto, } } return nil } +func (c *Container) DockerComposeProject() string { + return c.Labels["com.docker.compose.project"] +} + +func (c *Container) DockerComposeService() string { + return c.Labels["com.docker.compose.service"] +} + +func (c *Container) Dependencies() []string { + deps := c.Labels[LabelDependsOn] + if deps == "" { + deps = c.Labels["com.docker.compose.depends_on"] + } + return strings.Split(deps, ",") +} + var databaseMPs = map[string]struct{}{ "/var/lib/postgresql/data": {}, "/var/lib/mysql": {}, @@ -214,17 +232,22 @@ func (c *Container) loadDeleteIdlewatcherLabels(helper containerHelper) { "stop_timeout": helper.getDeleteLabel(LabelStopTimeout), "stop_signal": helper.getDeleteLabel(LabelStopSignal), "start_endpoint": helper.getDeleteLabel(LabelStartEndpoint), + "depends_on": c.Dependencies(), } + + // ensure it's deleted from labels + helper.getDeleteLabel(LabelDependsOn) + // set only if idlewatcher is enabled idleTimeout := cfg["idle_timeout"] if idleTimeout != "" { - idwCfg := &idlewatcher.Config{ - Docker: &idlewatcher.DockerConfig{ - DockerHost: c.DockerHost, - ContainerID: c.ContainerID, - ContainerName: c.ContainerName, - }, + idwCfg := new(idlewatcher.Config) + idwCfg.Docker = &idlewatcher.DockerConfig{ + DockerHost: c.DockerHost, + ContainerID: c.ContainerID, + ContainerName: c.ContainerName, } + err := serialization.MapUnmarshalValidate(cfg, idwCfg) if err != nil { gperr.LogWarn("invalid idlewatcher config", gperr.PrependSubject(c.ContainerName, err)) diff --git a/internal/docker/labels.go b/internal/docker/labels.go index 0a9e0a5..7489f02 100644 --- a/internal/docker/labels.go +++ b/internal/docker/labels.go @@ -13,4 +13,5 @@ const ( LabelStopTimeout = NSProxy + ".stop_timeout" LabelStopSignal = NSProxy + ".stop_signal" LabelStartEndpoint = NSProxy + ".start_endpoint" + LabelDependsOn = NSProxy + ".depends_on" ) diff --git a/internal/idlewatcher/common.go b/internal/idlewatcher/common.go index 6753237..ecc93bb 100644 --- a/internal/idlewatcher/common.go +++ b/internal/idlewatcher/common.go @@ -11,3 +11,12 @@ func (w *Watcher) cancelled(reqCtx context.Context) bool { return false } } + +func (w *Watcher) waitStarted(reqCtx context.Context) bool { + select { + case <-reqCtx.Done(): + return false + case <-w.route.Started(): + return true + } +} diff --git a/internal/idlewatcher/debug.go b/internal/idlewatcher/debug.go index 527d3fc..08a26ae 100644 --- a/internal/idlewatcher/debug.go +++ b/internal/idlewatcher/debug.go @@ -39,3 +39,10 @@ func Watchers() iter.Seq2[string, watcherDebug] { } } } + +func fmtErr(err error) string { + if err == nil { + return "" + } + return err.Error() +} diff --git a/internal/idlewatcher/errors.go b/internal/idlewatcher/errors.go new file mode 100644 index 0000000..7cadada --- /dev/null +++ b/internal/idlewatcher/errors.go @@ -0,0 +1,65 @@ +package idlewatcher + +import ( + "context" + "errors" + "fmt" +) + +type watcherError struct { + watcher *Watcher + err error +} + +func (e *watcherError) Unwrap() error { + return e.err +} + +func (e *watcherError) Error() string { + return fmt.Sprintf("watcher %q error: %s", e.watcher.cfg.ContainerName(), e.err.Error()) +} + +func (w *Watcher) newWatcherError(err error) error { + if errors.Is(err, causeReload) { + return nil + } + if wErr, ok := err.(*watcherError); ok { //nolint:errorlint + return wErr + } + return &watcherError{watcher: w, err: convertError(err)} +} + +type depError struct { + action string + dep *dependency + err error +} + +func (e *depError) Unwrap() error { + return e.err +} + +func (e *depError) Error() string { + return fmt.Sprintf("%s failed for dependency %q: %s", e.action, e.dep.cfg.ContainerName(), e.err.Error()) +} + +func (w *Watcher) newDepError(action string, dep *dependency, err error) error { + if errors.Is(err, causeReload) { + return nil + } + if dErr, ok := err.(*depError); ok { //nolint:errorlint + return dErr + } + return w.newWatcherError(&depError{action: action, dep: dep, err: convertError(err)}) +} + +func convertError(err error) error { + switch { + case err == nil: + return nil + case errors.Is(err, context.DeadlineExceeded): + return errors.New("timeout") + default: + return err + } +} diff --git a/internal/idlewatcher/handle_http.go b/internal/idlewatcher/handle_http.go index 49b5b50..0c2f0e9 100644 --- a/internal/idlewatcher/handle_http.go +++ b/internal/idlewatcher/handle_http.go @@ -1,8 +1,6 @@ package idlewatcher import ( - "context" - "errors" "net/http" "strconv" "time" @@ -38,10 +36,6 @@ func (w *Watcher) ServeHTTP(rw http.ResponseWriter, r *http.Request) { return default: f := &ForceCacheControl{expires: w.expires().Format(http.TimeFormat), ResponseWriter: rw} - w, ok := watcherMap[w.Key()] // could've been reloaded - if !ok { - return - } w.rp.ServeHTTP(f, r) } } @@ -50,6 +44,14 @@ func isFaviconPath(path string) bool { return path == "/favicon.ico" } +func (w *Watcher) redirectToStartEndpoint(rw http.ResponseWriter, r *http.Request) { + uri := "/" + if w.cfg.StartEndpoint != "" { + uri = w.cfg.StartEndpoint + } + http.Redirect(rw, r, uri, http.StatusTemporaryRedirect) +} + func (w *Watcher) wakeFromHTTP(rw http.ResponseWriter, r *http.Request) (shouldNext bool) { w.resetIdleTimer() @@ -92,7 +94,7 @@ func (w *Watcher) wakeFromHTTP(rw http.ResponseWriter, r *http.Request) (shouldN ctx := r.Context() if w.cancelled(ctx) { - gphttp.ServerError(rw, r, context.Cause(ctx), http.StatusServiceUnavailable) + w.redirectToStartEndpoint(rw, r) return false } @@ -103,17 +105,19 @@ func (w *Watcher) wakeFromHTTP(rw http.ResponseWriter, r *http.Request) (shouldN return false } - var ready bool - for { w.resetIdleTimer() if w.cancelled(ctx) { - gphttp.ServerError(rw, r, context.Cause(ctx), http.StatusServiceUnavailable) + w.redirectToStartEndpoint(rw, r) return false } - w, ready, err = checkUpdateState(w.Key()) + if !w.waitStarted(ctx) { + return false + } + + ready, err := w.checkUpdateState() if err != nil { gphttp.ServerError(rw, r, err) return false diff --git a/internal/idlewatcher/handle_stream.go b/internal/idlewatcher/handle_stream.go index 50cf6e2..96810af 100644 --- a/internal/idlewatcher/handle_stream.go +++ b/internal/idlewatcher/handle_stream.go @@ -2,7 +2,6 @@ package idlewatcher import ( "context" - "errors" "net" "time" @@ -53,22 +52,13 @@ func (w *Watcher) wakeFromStream() error { } w.l.Debug().Msg("wake signal received") - err := w.wakeIfStopped() + err := w.Wake(context.Background()) if err != nil { return err } - ctx, cancel := context.WithTimeoutCause(w.task.Context(), w.cfg.WakeTimeout, errors.New("wake timeout")) - defer cancel() - - var ready bool - for { - if w.cancelled(ctx) { - return context.Cause(ctx) - } - - w, ready, err = checkUpdateState(w.Key()) + ready, err := w.checkUpdateState() if err != nil { return err } diff --git a/internal/idlewatcher/health.go b/internal/idlewatcher/health.go index 8e3a8c6..6828e34 100644 --- a/internal/idlewatcher/health.go +++ b/internal/idlewatcher/health.go @@ -1,7 +1,6 @@ package idlewatcher import ( - "errors" "time" "github.com/yusing/go-proxy/internal/gperr" @@ -80,43 +79,6 @@ func (w *Watcher) Detail() string { return "napping" } -func checkUpdateState(key string) (w *Watcher, ready bool, err error) { - watcherMapMu.RLock() - w, ok := watcherMap[key] - if !ok { - watcherMapMu.RUnlock() - return nil, false, errors.New("watcher not found") - } - watcherMapMu.RUnlock() - - // already ready - if w.ready() { - return w, true, nil - } - - if !w.running() { - return w, false, nil - } - - // the new container info not yet updated - if w.hc.URL().Host == "" { - return w, false, nil - } - - res, err := w.hc.CheckHealth() - if err != nil { - w.setError(err) - return w, false, err - } - - if res.Healthy { - w.setReady() - return w, true, nil - } - w.setStarting() - return w, false, nil -} - // MarshalJSON implements health.HealthMonitor. func (w *Watcher) MarshalJSON() ([]byte, error) { url := w.hc.URL() @@ -135,3 +97,32 @@ func (w *Watcher) MarshalJSON() ([]byte, error) { Detail: detail, }).MarshalJSON() } + +func (w *Watcher) checkUpdateState() (ready bool, err error) { + // already ready + if w.ready() { + return true, nil + } + + if !w.running() { + return false, nil + } + + // the new container info not yet updated + if w.hc.URL().Host == "" { + return false, nil + } + + res, err := w.hc.CheckHealth() + if err != nil { + w.setError(err) + return false, err + } + + if res.Healthy { + w.setReady() + return true, nil + } + w.setStarting() + return false, nil +} diff --git a/internal/idlewatcher/types/config.go b/internal/idlewatcher/types/config.go index ea1d123..116354d 100644 --- a/internal/idlewatcher/types/config.go +++ b/internal/idlewatcher/types/config.go @@ -10,16 +10,26 @@ import ( ) type ( - Config struct { + ProviderConfig struct { Proxmox *ProxmoxConfig `json:"proxmox,omitempty"` Docker *DockerConfig `json:"docker,omitempty"` + } + IdlewatcherConfig struct { + // 0: no idle watcher. + // Positive: idle watcher with idle timeout. + // Negative: idle watcher as a dependency. IdleTimeout time.Duration `json:"idle_timeout" json_ext:"duration"` + IdleTimeout time.Duration `json:"idle_timeout"` + WakeTimeout time.Duration `json:"wake_timeout"` + StopTimeout time.Duration `json:"stop_timeout"` + StopMethod StopMethod `json:"stop_method"` + StopSignal Signal `json:"stop_signal,omitempty"` + } + Config struct { + ProviderConfig + IdlewatcherConfig - IdleTimeout time.Duration `json:"idle_timeout" json_ext:"duration"` - WakeTimeout time.Duration `json:"wake_timeout" json_ext:"duration"` - StopTimeout time.Duration `json:"stop_timeout" json_ext:"duration"` - StopMethod StopMethod `json:"stop_method"` - StopSignal Signal `json:"stop_signal,omitempty"` - StartEndpoint string `json:"start_endpoint,omitempty"` // Optional path that must be hit to start container + StartEndpoint string `json:"start_endpoint,omitempty"` // Optional path that must be hit to start container + DependsOn []string `json:"depends_on,omitempty"` } StopMethod string Signal string @@ -55,11 +65,11 @@ func (c *Config) ContainerName() string { if c.Docker != nil { return c.Docker.ContainerName } - return "lxc " + strconv.Itoa(c.Proxmox.VMID) + return "lxc-" + strconv.Itoa(c.Proxmox.VMID) } func (c *Config) Validate() gperr.Error { - if c.IdleTimeout == 0 { // no idle timeout means no idle watcher + if c.IdleTimeout == 0 { // zero idle timeout means no idle watcher return nil } errs := gperr.NewBuilder("idlewatcher config validation error") diff --git a/internal/idlewatcher/types/config_test.go b/internal/idlewatcher/types/config_test.go index b9f3861..5d194b9 100644 --- a/internal/idlewatcher/types/config_test.go +++ b/internal/idlewatcher/types/config_test.go @@ -35,7 +35,8 @@ func TestValidateStartEndpoint(t *testing.T) { } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - cfg := Config{StartEndpoint: tc.input} + cfg := new(Config) + cfg.StartEndpoint = tc.input err := cfg.validateStartEndpoint() if err == nil { expect.Equal(t, cfg.StartEndpoint, tc.input) diff --git a/internal/idlewatcher/watcher.go b/internal/idlewatcher/watcher.go index ddfc722..3d344a4 100644 --- a/internal/idlewatcher/watcher.go +++ b/internal/idlewatcher/watcher.go @@ -3,6 +3,8 @@ package idlewatcher import ( "context" "errors" + "maps" + "strings" "sync" "time" @@ -20,10 +22,13 @@ import ( "github.com/yusing/go-proxy/internal/watcher/events" "github.com/yusing/go-proxy/internal/watcher/health" "github.com/yusing/go-proxy/internal/watcher/health/monitor" + "golang.org/x/sync/errgroup" + "golang.org/x/sync/singleflight" ) type ( routeHelper struct { + route routes.Route rp *reverseproxy.ReverseProxy stream net.Stream hc health.HealthChecker @@ -48,8 +53,15 @@ type ( state atomic.Value[*containerState] lastReset atomic.Value[time.Time] - ticker *time.Ticker - task *task.Task + idleTicker *time.Ticker + task *task.Task + + dependsOn []*dependency + } + + dependency struct { + *Watcher + waitHealthy bool } StopCallback func() error @@ -57,9 +69,12 @@ type ( const ContextKey = "idlewatcher.watcher" +// TODO: replace -1 with neverTick + var ( watcherMap = make(map[string]*Watcher) watcherMapMu sync.RWMutex + singleFlight singleflight.Group ) const ( @@ -79,51 +94,167 @@ var ( const reqTimeout = 3 * time.Second +// prevents dependencies from being stopped automatically. +const neverTick = time.Duration(1<<63 - 1) + // TODO: fix stream type. -func NewWatcher(parent task.Parent, r routes.Route) (*Watcher, error) { - cfg := r.IdlewatcherConfig() +func NewWatcher(parent task.Parent, r routes.Route, cfg *idlewatcher.Config) (*Watcher, error) { key := cfg.Key() watcherMapMu.RLock() // if the watcher already exists, finish it w, exists := watcherMap[key] - if exists { - if w.cfg == cfg { - // same address, likely two routes from the same container - return w, nil - } - w.task.FinishAndWait(causeReload) - } watcherMapMu.RUnlock() - w = &Watcher{ - ticker: time.NewTicker(cfg.IdleTimeout), - cfg: cfg, - routeHelper: routeHelper{ - hc: monitor.NewMonitor(r), - }, + if exists { + if len(cfg.DependsOn) > 0 { + w.cfg.DependsOn = cfg.DependsOn + } + if cfg.IdleTimeout > 0 { + w.cfg.IdlewatcherConfig = cfg.IdlewatcherConfig + } + cfg = w.cfg + w.resetIdleTimer() + } else { + w = &Watcher{ + idleTicker: time.NewTicker(cfg.IdleTimeout), + cfg: cfg, + routeHelper: routeHelper{ + hc: monitor.NewMonitor(r), + }, + dependsOn: make([]*dependency, 0, len(cfg.DependsOn)), + } + } + + depErrors := gperr.NewBuilder() + for i, dep := range cfg.DependsOn { + depSegments := strings.Split(dep, ":") + dep = depSegments[0] + if dep == "" { // empty dependency (likely stopped container), skip; it will be removed by dedupDependencies() + continue + } + cfg.DependsOn[i] = dep + waitHealthy := false + if len(depSegments) > 1 { // likely from `com.docker.compose.depends_on` label + switch depSegments[1] { + case "service_started": + case "service_healthy": + waitHealthy = true + // case "service_completed_successfully": + default: + depErrors.Addf("dependency %q has unsupported condition %q", dep, depSegments[1]) + continue + } + } + + cont := r.ContainerInfo() + + var depRoute routes.Route + var ok bool + + // try to find the dependency in the same provider and the same docker compose project first + if cont != nil { + depRoute, ok = r.GetProvider().FindService(cont.DockerComposeProject(), dep) + } + + if !ok { + depRoute, ok = routes.Get(dep) + if !ok { + depErrors.Addf("dependency %q not found", dep) + continue + } + } + + if depRoute == r { + depErrors.Addf("dependency %q cannot have itself as a dependency (same route)", dep) + continue + } + + // wait for the dependency to be started + <-depRoute.Started() + + if waitHealthy && !depRoute.UseHealthCheck() { + depErrors.Addf("dependency %q has service_healthy condition but has healthcheck disabled", dep) + continue + } + + depCfg := depRoute.IdlewatcherConfig() + if depCfg == nil { + depCfg = new(idlewatcher.Config) + depCfg.IdlewatcherConfig = cfg.IdlewatcherConfig + depCfg.IdleTimeout = neverTick // disable auto sleep for dependencies + } else if depCfg.IdleTimeout > 0 { + depErrors.Addf("dependency %q has positive idle timeout %s", dep, depCfg.IdleTimeout) + continue + } + + if depCfg.Docker == nil && depCfg.Proxmox == nil { + depCont := depRoute.ContainerInfo() + if depCont != nil { + depCfg.Docker = &idlewatcher.DockerConfig{ + DockerHost: depCont.DockerHost, + ContainerID: depCont.ContainerID, + ContainerName: depCont.ContainerName, + } + depCfg.DependsOn = depCont.Dependencies() + } else { + depErrors.Addf("dependency %q has no idlewatcher config but is not a docker container", dep) + continue + } + } + + if depCfg.Key() == cfg.Key() { + depErrors.Addf("dependency %q cannot have itself as a dependency (same container)", dep) + continue + } + + depCfg.IdleTimeout = neverTick // disable auto sleep for dependencies + + depWatcher, err := NewWatcher(parent, depRoute, depCfg) + if err != nil { + depErrors.Add(err) + continue + } + w.dependsOn = append(w.dependsOn, &dependency{ + Watcher: depWatcher, + waitHealthy: waitHealthy, + }) + } + + if w.provider != nil { // it's a reload, close the old provider + w.provider.Close() + } + + if depErrors.HasError() { + return nil, depErrors.Error() + } + + if !exists { + watcherMapMu.Lock() + defer watcherMapMu.Unlock() } var p idlewatcher.Provider - var providerType string var err error + var kind string switch { case cfg.Docker != nil: p, err = provider.NewDockerProvider(cfg.Docker.DockerHost, cfg.Docker.ContainerID) - providerType = "docker" + kind = "docker" default: p, err = provider.NewProxmoxProvider(cfg.Proxmox.Node, cfg.Proxmox.VMID) - providerType = "proxmox" + kind = "proxmox" } + w.l = log.With(). + Stringer("idle_timeout", cfg.IdleTimeout). + Str("kind", kind). + Str("container", cfg.ContainerName()). + Logger() if err != nil { return nil, err } w.provider = p - w.l = log.With(). - Str("provider", providerType). - Str("container", cfg.ContainerName()). - Logger() switch r := r.(type) { case routes.ReverseProxyRoute: @@ -131,18 +262,22 @@ func NewWatcher(parent task.Parent, r routes.Route) (*Watcher, error) { case routes.StreamRoute: w.stream = r default: - return nil, gperr.Errorf("unexpected route type: %T", r) + w.provider.Close() + return nil, w.newWatcherError(gperr.Errorf("unexpected route type: %T", r)) } + w.route = r ctx, cancel := context.WithTimeout(parent.Context(), reqTimeout) defer cancel() status, err := w.provider.ContainerStatus(ctx) if err != nil { w.provider.Close() - return nil, gperr.Wrap(err, "failed to get container status") + return nil, w.newWatcherError(err) } + w.state.Store(&containerState{status: status}) - switch p := w.provider.(type) { + // when more providers are added, we need to add a new case here. + switch p := w.provider.(type) { //nolint:gocritic case *provider.ProxmoxProvider: shutdownTimeout := max(time.Second, cfg.StopTimeout-idleWakerCheckTimeout) err = p.LXCSetShutdownTimeout(ctx, cfg.Proxmox.VMID, shutdownTimeout) @@ -151,31 +286,38 @@ func NewWatcher(parent task.Parent, r routes.Route) (*Watcher, error) { } } - w.state.Store(&containerState{status: status}) + if !exists { + w.task = parent.Subtask("idlewatcher."+r.Name(), true) + watcherMap[key] = w - w.task = parent.Subtask("idlewatcher."+r.Name(), true) + go func() { + cause := w.watchUntilDestroy() + if errors.Is(cause, causeContainerDestroy) || errors.Is(cause, task.ErrProgramExiting) { + watcherMapMu.Lock() + delete(watcherMap, key) + watcherMapMu.Unlock() + w.l.Info().Msg("idlewatcher stopped") + } else if !errors.Is(cause, causeReload) { + gperr.LogError("idlewatcher stopped unexpectedly", cause, &w.l) + } - watcherMapMu.Lock() - watcherMap[key] = w - watcherMapMu.Unlock() + w.idleTicker.Stop() + w.provider.Close() + w.task.Finish(cause) + }() + } - go func() { - cause := w.watchUntilDestroy() - if errors.Is(cause, causeContainerDestroy) || errors.Is(cause, task.ErrProgramExiting) { - watcherMapMu.Lock() - delete(watcherMap, key) - watcherMapMu.Unlock() - w.l.Info().Msg("idlewatcher stopped") - } else if !errors.Is(cause, causeReload) { - gperr.LogError("idlewatcher stopped unexpectedly", cause, &w.l) - } + hcCfg := w.hc.Config() + hcCfg.BaseContext = func() context.Context { + return w.task.Context() + } + hcCfg.Timeout = cfg.WakeTimeout - w.ticker.Stop() - w.provider.Close() - w.task.Finish(cause) - }() + w.dedupDependencies() + + w.l = w.l.With().Strs("deps", cfg.DependsOn).Logger() if exists { - w.l.Info().Msg("idlewatcher reloaded") + w.l.Debug().Msg("idlewatcher reloaded") } else { w.l.Info().Msg("idlewatcher started") } @@ -186,7 +328,65 @@ func (w *Watcher) Key() string { return w.cfg.Key() } +// Wake wakes the container. +// +// It will cancel as soon as the either of the passed in context or the watcher is done. +// +// It uses singleflight to prevent multiple wake calls at the same time. +// +// It will wake the dependencies first, and then wake itself. +// If the container is already running, it will do nothing. +// If the container is not running, it will start it. +// If the container is paused, it will unpause it. +// If the container is stopped, it will do nothing. func (w *Watcher) Wake(ctx context.Context) error { + // wake dependencies first. + if err := w.wakeDependencies(ctx); err != nil { + return w.newWatcherError(err) + } + + // wake itself. + // use container name instead of Key() here as the container id will change on restart (docker). + _, err, _ := singleFlight.Do(w.cfg.ContainerName(), func() (any, error) { + return nil, w.wakeIfStopped(ctx) + }) + if err != nil { + return w.newWatcherError(err) + } + + return nil +} + +func (w *Watcher) wakeDependencies(ctx context.Context) error { + if len(w.dependsOn) == 0 { + return nil + } + + errs := errgroup.Group{} + for _, dep := range w.dependsOn { + errs.Go(func() error { + if err := dep.Wake(ctx); err != nil { + return err + } + if dep.waitHealthy { + for { + select { + case <-ctx.Done(): + return w.newDepError("wait_healthy", dep, context.Cause(ctx)) + default: + if h, err := dep.hc.CheckHealth(); err != nil { + return err + } else if h.Healthy { + return nil + } + time.Sleep(idleWakerCheckInterval) + } + } + } + return nil + }) + } + return errs.Wait() } func (w *Watcher) wakeIfStopped(ctx context.Context) error { @@ -210,34 +410,66 @@ func (w *Watcher) wakeIfStopped(ctx context.Context) error { } } +func (w *Watcher) stopDependencies() error { + if len(w.dependsOn) == 0 { + return nil + } + + errs := errgroup.Group{} + for _, dep := range w.dependsOn { + errs.Go(dep.stopByMethod) + } + return errs.Wait() +} + func (w *Watcher) stopByMethod() error { + // no need singleflight here because it will only be called once every tick. + + // if the container is not running, skip and stop dependencies. if !w.running() { + if err := w.stopDependencies(); err != nil { + return w.newWatcherError(err) + } return nil } cfg := w.cfg - ctx, cancel := context.WithTimeout(w.task.Context(), cfg.StopTimeout) + ctx, cancel := context.WithTimeout(context.Background(), cfg.StopTimeout) defer cancel() + // stop itself first. + var err error switch cfg.StopMethod { case idlewatcher.StopMethodPause: - return w.provider.ContainerPause(ctx) + err = w.provider.ContainerPause(ctx) case idlewatcher.StopMethodStop: - return w.provider.ContainerStop(ctx, cfg.StopSignal, int(cfg.StopTimeout.Seconds())) + err = w.provider.ContainerStop(ctx, cfg.StopSignal, int(cfg.StopTimeout.Seconds())) case idlewatcher.StopMethodKill: - return w.provider.ContainerKill(ctx, cfg.StopSignal) + err = w.provider.ContainerKill(ctx, cfg.StopSignal) default: - return gperr.Errorf("unexpected stop method: %q", cfg.StopMethod) + err = w.newWatcherError(gperr.Errorf("unexpected stop method: %q", cfg.StopMethod)) } + + if err != nil { + return w.newWatcherError(err) + } + + w.l.Info().Msg("container stopped") + + // then stop dependencies. + if err := w.stopDependencies(); err != nil { + return w.newWatcherError(err) + } + return nil } func (w *Watcher) resetIdleTimer() { - w.ticker.Reset(w.cfg.IdleTimeout) + w.idleTicker.Reset(w.cfg.IdleTimeout) w.lastReset.Store(time.Now()) } func (w *Watcher) expires() time.Time { - if !w.running() { + if !w.running() || w.cfg.IdleTimeout <= 0 { return time.Time{} } return w.lastReset.Load().Add(w.cfg.IdleTimeout) @@ -278,15 +510,15 @@ func (w *Watcher) watchUntilDestroy() (returnCause error) { w.l.Info().Msg("awaken") case e.Action.IsContainerStop(): // stop / kill / die w.setNapping(idlewatcher.ContainerStatusStopped) - w.ticker.Stop() + w.idleTicker.Stop() case e.Action.IsContainerPause(): // pause w.setNapping(idlewatcher.ContainerStatusPaused) - w.ticker.Stop() + w.idleTicker.Stop() default: - w.l.Error().Stringer("action", e.Action).Msg("unexpected container action") + w.l.Debug().Stringer("action", e.Action).Msg("unexpected container action") } - case <-w.ticker.C: - w.ticker.Stop() + case <-w.idleTicker.C: + w.idleTicker.Stop() if w.running() { err := w.stopByMethod() switch { @@ -298,16 +530,37 @@ func (w *Watcher) watchUntilDestroy() (returnCause error) { } w.l.Err(err).Msgf("container stop with method %q failed", w.cfg.StopMethod) default: - w.l.Info().Str("reason", "idle timeout").Msg("container stopped") + w.l.Info().Msg("idle timeout") } } } } } -func fmtErr(err error) string { - if err == nil { - return "" +func (w *Watcher) dedupDependencies() { + // remove from dependencies if the dependency is also a dependency of another dependency, or have duplicates. + deps := w.dependencies() + for _, dep := range w.dependsOn { + depdeps := dep.dependencies() + for depdep := range depdeps { + delete(deps, depdep) + } } - return err.Error() + newDepOn := make([]string, 0, len(deps)) + newDeps := make([]*dependency, 0, len(deps)) + for _, dep := range deps { + newDepOn = append(newDepOn, dep.cfg.ContainerName()) + newDeps = append(newDeps, dep) + } + w.cfg.DependsOn = newDepOn + w.dependsOn = newDeps +} + +func (w *Watcher) dependencies() map[string]*dependency { + deps := make(map[string]*dependency) + for _, dep := range w.dependsOn { + deps[dep.Key()] = dep + maps.Copy(deps, dep.dependencies()) + } + return deps } diff --git a/internal/route/reverse_proxy.go b/internal/route/reverse_proxy.go index 025ce77..f614cf8 100755 --- a/internal/route/reverse_proxy.go +++ b/internal/route/reverse_proxy.go @@ -102,7 +102,7 @@ func (r *ReveseProxyRoute) Start(parent task.Parent) gperr.Error { switch { case r.UseIdleWatcher(): - waker, err := idlewatcher.NewWatcher(parent, r) + waker, err := idlewatcher.NewWatcher(parent, r, r.IdlewatcherConfig()) if err != nil { r.task.Finish(err) return gperr.Wrap(err) diff --git a/internal/route/routes/route.go b/internal/route/routes/route.go index 83a1f30..a8a7d2c 100644 --- a/internal/route/routes/route.go +++ b/internal/route/routes/route.go @@ -23,6 +23,7 @@ type ( task.TaskFinisher pool.Object ProviderName() string + GetProvider() Provider TargetURL() *net.URL HealthMonitor() health.HealthMonitor References() []string diff --git a/internal/route/stream.go b/internal/route/stream.go index 1060c1a..077a89c 100755 --- a/internal/route/stream.go +++ b/internal/route/stream.go @@ -46,7 +46,7 @@ func (r *StreamRoute) Start(parent task.Parent) gperr.Error { switch { case r.UseIdleWatcher(): - waker, err := idlewatcher.NewWatcher(parent, r) + waker, err := idlewatcher.NewWatcher(parent, r, r.IdlewatcherConfig()) if err != nil { r.task.Finish(err) return gperr.Wrap(err, "idlewatcher error")