feat(idlesleep): support container dependencies, including custom and docker depends_on, code refactor

This commit is contained in:
yusing 2025-06-04 23:26:38 +08:00
parent 22ab043e06
commit a39d527fc1
15 changed files with 507 additions and 152 deletions

2
go.mod
View file

@ -32,6 +32,7 @@ require (
golang.org/x/crypto v0.38.0 // encrypting password with bcrypt golang.org/x/crypto v0.38.0 // encrypting password with bcrypt
golang.org/x/net v0.40.0 // HTTP header utilities golang.org/x/net v0.40.0 // HTTP header utilities
golang.org/x/oauth2 v0.30.0 // oauth2 authentication golang.org/x/oauth2 v0.30.0 // oauth2 authentication
golang.org/x/sync v0.14.0
golang.org/x/time v0.11.0 // time utilities golang.org/x/time v0.11.0 // time utilities
) )
@ -227,7 +228,6 @@ require (
go.uber.org/multierr v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect
go.uber.org/ratelimit v0.3.1 // indirect go.uber.org/ratelimit v0.3.1 // indirect
golang.org/x/mod v0.24.0 // indirect golang.org/x/mod v0.24.0 // indirect
golang.org/x/sync v0.14.0 // indirect
golang.org/x/sys v0.33.0 // indirect golang.org/x/sys v0.33.0 // indirect
golang.org/x/text v0.25.0 // indirect golang.org/x/text v0.25.0 // indirect
golang.org/x/tools v0.33.0 // indirect golang.org/x/tools v0.33.0 // indirect

View file

@ -56,13 +56,15 @@ type (
var DummyContainer = new(Container) var DummyContainer = new(Container)
func FromDocker(c *container.SummaryTrimmed, dockerHost string) (res *Container) { func FromDocker(c *container.SummaryTrimmed, dockerHost string) (res *Container) {
isExplicit := false _, isExplicit := c.Labels[LabelAliases]
helper := containerHelper{c} helper := containerHelper{c}
for lbl := range c.Labels { if !isExplicit {
if strings.HasPrefix(lbl, NSProxy+".") { // walk through all labels to check if any label starts with NSProxy.
isExplicit = true for lbl := range c.Labels {
} else { if strings.HasPrefix(lbl, NSProxy+".") {
delete(c.Labels, lbl) isExplicit = true
break
}
} }
} }
@ -124,14 +126,30 @@ func (c *Container) UpdatePorts() error {
continue continue
} }
c.PublicPortMapping[portInt] = container.Port{ c.PublicPortMapping[portInt] = container.Port{
PublicPort: uint16(portInt), PublicPort: uint16(portInt), //nolint:gosec
PrivatePort: uint16(portInt), PrivatePort: uint16(portInt), //nolint:gosec
Type: proto, Type: proto,
} }
} }
return nil return nil
} }
// DockerComposeProject returns the value of the container's
// "com.docker.compose.project" label, or "" when the label is not set.
func (c *Container) DockerComposeProject() string {
	const projectLabel = "com.docker.compose.project"
	return c.Labels[projectLabel]
}
// DockerComposeService returns the value of the container's
// "com.docker.compose.service" label, or "" when the label is not set.
func (c *Container) DockerComposeService() string {
	const serviceLabel = "com.docker.compose.service"
	return c.Labels[serviceLabel]
}
// Dependencies returns the dependency entries declared on the container.
//
// The comma-separated list is read from the LabelDependsOn label; when that
// label is unset or empty, docker compose's "com.docker.compose.depends_on"
// label is used instead. Each entry is returned with surrounding whitespace
// removed and is otherwise left as written.
//
// Blank entries are dropped, so a container without any dependency label
// yields an empty (non-nil) slice instead of a slice holding one empty string.
func (c *Container) Dependencies() []string {
	deps := c.Labels[LabelDependsOn]
	if deps == "" {
		deps = c.Labels["com.docker.compose.depends_on"]
	}

	entries := strings.Split(deps, ",")
	// strings.Split never returns an empty slice for a non-empty separator:
	// splitting "" gives [""]. Filter in place so callers only ever see
	// real dependency names.
	kept := entries[:0]
	for _, entry := range entries {
		if entry = strings.TrimSpace(entry); entry != "" {
			kept = append(kept, entry)
		}
	}
	return kept
}
var databaseMPs = map[string]struct{}{ var databaseMPs = map[string]struct{}{
"/var/lib/postgresql/data": {}, "/var/lib/postgresql/data": {},
"/var/lib/mysql": {}, "/var/lib/mysql": {},
@ -214,17 +232,22 @@ func (c *Container) loadDeleteIdlewatcherLabels(helper containerHelper) {
"stop_timeout": helper.getDeleteLabel(LabelStopTimeout), "stop_timeout": helper.getDeleteLabel(LabelStopTimeout),
"stop_signal": helper.getDeleteLabel(LabelStopSignal), "stop_signal": helper.getDeleteLabel(LabelStopSignal),
"start_endpoint": helper.getDeleteLabel(LabelStartEndpoint), "start_endpoint": helper.getDeleteLabel(LabelStartEndpoint),
"depends_on": c.Dependencies(),
} }
// ensure it's deleted from labels
helper.getDeleteLabel(LabelDependsOn)
// set only if idlewatcher is enabled // set only if idlewatcher is enabled
idleTimeout := cfg["idle_timeout"] idleTimeout := cfg["idle_timeout"]
if idleTimeout != "" { if idleTimeout != "" {
idwCfg := &idlewatcher.Config{ idwCfg := new(idlewatcher.Config)
Docker: &idlewatcher.DockerConfig{ idwCfg.Docker = &idlewatcher.DockerConfig{
DockerHost: c.DockerHost, DockerHost: c.DockerHost,
ContainerID: c.ContainerID, ContainerID: c.ContainerID,
ContainerName: c.ContainerName, ContainerName: c.ContainerName,
},
} }
err := serialization.MapUnmarshalValidate(cfg, idwCfg) err := serialization.MapUnmarshalValidate(cfg, idwCfg)
if err != nil { if err != nil {
gperr.LogWarn("invalid idlewatcher config", gperr.PrependSubject(c.ContainerName, err)) gperr.LogWarn("invalid idlewatcher config", gperr.PrependSubject(c.ContainerName, err))

View file

@ -13,4 +13,5 @@ const (
LabelStopTimeout = NSProxy + ".stop_timeout" LabelStopTimeout = NSProxy + ".stop_timeout"
LabelStopSignal = NSProxy + ".stop_signal" LabelStopSignal = NSProxy + ".stop_signal"
LabelStartEndpoint = NSProxy + ".start_endpoint" LabelStartEndpoint = NSProxy + ".start_endpoint"
LabelDependsOn = NSProxy + ".depends_on"
) )

View file

@ -11,3 +11,12 @@ func (w *Watcher) cancelled(reqCtx context.Context) bool {
return false return false
} }
} }
// waitStarted blocks until either reqCtx is done or the watcher's route
// reports that it has started, whichever comes first.
//
// It returns true only when the route's Started channel fired; a finished
// request context yields false.
func (w *Watcher) waitStarted(reqCtx context.Context) bool {
	cancelled := reqCtx.Done()
	started := w.route.Started()
	select {
	case <-started:
		return true
	case <-cancelled:
		return false
	}
}

View file

@ -39,3 +39,10 @@ func Watchers() iter.Seq2[string, watcherDebug] {
} }
} }
} }
// fmtErr renders err as a string, mapping a nil error to the empty string.
func fmtErr(err error) string {
	if err != nil {
		return err.Error()
	}
	return ""
}

View file

@ -0,0 +1,65 @@
package idlewatcher
import (
"context"
"errors"
"fmt"
)
// watcherError attaches the watcher an error originated from to that error,
// so the message can name the affected container.
type watcherError struct {
	watcher *Watcher // watcher whose container name prefixes the message
	err     error    // underlying error, exposed through Unwrap
}

// Unwrap returns the underlying error so errors.Is / errors.As can see it.
func (e *watcherError) Unwrap() error {
	return e.err
}

// Error formats the underlying error prefixed with the watcher's container
// name.
func (e *watcherError) Error() string {
	name := e.watcher.cfg.ContainerName()
	msg := e.err.Error()
	return fmt.Sprintf("watcher %q error: %s", name, msg)
}
// newWatcherError wraps err in a *watcherError bound to w.
//
// It returns nil when err is nil or when err matches causeReload. An err that
// already is a *watcherError is returned as is, so errors are never wrapped
// twice. Any other error is passed through convertError before being wrapped.
func (w *Watcher) newWatcherError(err error) error {
	// Guard against nil: wrapping it would produce a non-nil error whose
	// Error method dereferences a nil underlying error.
	if err == nil || errors.Is(err, causeReload) {
		return nil
	}
	// Direct type assertion on purpose: only an unwrapped *watcherError is
	// passed through untouched.
	if wErr, ok := err.(*watcherError); ok { //nolint:errorlint
		return wErr
	}
	return &watcherError{watcher: w, err: convertError(err)}
}
// depError describes a failed action performed on a dependency.
type depError struct {
	action string      // name of the action that failed
	dep    *dependency // dependency the action was performed on
	err    error       // underlying error, exposed through Unwrap
}

// Unwrap returns the underlying error so errors.Is / errors.As can see it.
func (e *depError) Unwrap() error {
	return e.err
}

// Error reports the failed action, the dependency's container name and the
// underlying error message.
func (e *depError) Error() string {
	name := e.dep.cfg.ContainerName()
	msg := e.err.Error()
	return fmt.Sprintf("%s failed for dependency %q: %s", e.action, name, msg)
}
// newDepError builds the error reported when action fails for dependency dep.
//
// It returns nil when err is nil or when err matches causeReload. An err that
// already is a *depError is returned unchanged. Otherwise err is passed
// through convertError, wrapped in a *depError, and handed to
// newWatcherError so the result also names the owning watcher.
func (w *Watcher) newDepError(action string, dep *dependency, err error) error {
	// Guard against nil: wrapping it would produce a non-nil error whose
	// Error method dereferences a nil underlying error.
	if err == nil || errors.Is(err, causeReload) {
		return nil
	}
	// Direct type assertion on purpose: only an unwrapped *depError is
	// passed through untouched.
	if dErr, ok := err.(*depError); ok { //nolint:errorlint
		return dErr
	}
	return w.newWatcherError(&depError{action: action, dep: dep, err: convertError(err)})
}
// convertError normalizes err for display.
//
// A nil error stays nil, any error matching context.DeadlineExceeded is
// replaced by a fresh error with the message "timeout", and every other error
// is returned unchanged.
func convertError(err error) error {
	if err == nil {
		return nil
	}
	if errors.Is(err, context.DeadlineExceeded) {
		return errors.New("timeout")
	}
	return err
}

View file

@ -1,8 +1,6 @@
package idlewatcher package idlewatcher
import ( import (
"context"
"errors"
"net/http" "net/http"
"strconv" "strconv"
"time" "time"
@ -38,10 +36,6 @@ func (w *Watcher) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
return return
default: default:
f := &ForceCacheControl{expires: w.expires().Format(http.TimeFormat), ResponseWriter: rw} f := &ForceCacheControl{expires: w.expires().Format(http.TimeFormat), ResponseWriter: rw}
w, ok := watcherMap[w.Key()] // could've been reloaded
if !ok {
return
}
w.rp.ServeHTTP(f, r) w.rp.ServeHTTP(f, r)
} }
} }
@ -50,6 +44,14 @@ func isFaviconPath(path string) bool {
return path == "/favicon.ico" return path == "/favicon.ico"
} }
// redirectToStartEndpoint answers the request with a 307 Temporary Redirect
// to the configured start endpoint, falling back to "/" when no start
// endpoint is configured.
func (w *Watcher) redirectToStartEndpoint(rw http.ResponseWriter, r *http.Request) {
	target := w.cfg.StartEndpoint
	if target == "" {
		target = "/"
	}
	http.Redirect(rw, r, target, http.StatusTemporaryRedirect)
}
func (w *Watcher) wakeFromHTTP(rw http.ResponseWriter, r *http.Request) (shouldNext bool) { func (w *Watcher) wakeFromHTTP(rw http.ResponseWriter, r *http.Request) (shouldNext bool) {
w.resetIdleTimer() w.resetIdleTimer()
@ -92,7 +94,7 @@ func (w *Watcher) wakeFromHTTP(rw http.ResponseWriter, r *http.Request) (shouldN
ctx := r.Context() ctx := r.Context()
if w.cancelled(ctx) { if w.cancelled(ctx) {
gphttp.ServerError(rw, r, context.Cause(ctx), http.StatusServiceUnavailable) w.redirectToStartEndpoint(rw, r)
return false return false
} }
@ -103,17 +105,19 @@ func (w *Watcher) wakeFromHTTP(rw http.ResponseWriter, r *http.Request) (shouldN
return false return false
} }
var ready bool
for { for {
w.resetIdleTimer() w.resetIdleTimer()
if w.cancelled(ctx) { if w.cancelled(ctx) {
gphttp.ServerError(rw, r, context.Cause(ctx), http.StatusServiceUnavailable) w.redirectToStartEndpoint(rw, r)
return false return false
} }
w, ready, err = checkUpdateState(w.Key()) if !w.waitStarted(ctx) {
return false
}
ready, err := w.checkUpdateState()
if err != nil { if err != nil {
gphttp.ServerError(rw, r, err) gphttp.ServerError(rw, r, err)
return false return false

View file

@ -2,7 +2,6 @@ package idlewatcher
import ( import (
"context" "context"
"errors"
"net" "net"
"time" "time"
@ -53,22 +52,13 @@ func (w *Watcher) wakeFromStream() error {
} }
w.l.Debug().Msg("wake signal received") w.l.Debug().Msg("wake signal received")
err := w.wakeIfStopped() err := w.Wake(context.Background())
if err != nil { if err != nil {
return err return err
} }
ctx, cancel := context.WithTimeoutCause(w.task.Context(), w.cfg.WakeTimeout, errors.New("wake timeout"))
defer cancel()
var ready bool
for { for {
if w.cancelled(ctx) { ready, err := w.checkUpdateState()
return context.Cause(ctx)
}
w, ready, err = checkUpdateState(w.Key())
if err != nil { if err != nil {
return err return err
} }

View file

@ -1,7 +1,6 @@
package idlewatcher package idlewatcher
import ( import (
"errors"
"time" "time"
"github.com/yusing/go-proxy/internal/gperr" "github.com/yusing/go-proxy/internal/gperr"
@ -80,43 +79,6 @@ func (w *Watcher) Detail() string {
return "napping" return "napping"
} }
func checkUpdateState(key string) (w *Watcher, ready bool, err error) {
watcherMapMu.RLock()
w, ok := watcherMap[key]
if !ok {
watcherMapMu.RUnlock()
return nil, false, errors.New("watcher not found")
}
watcherMapMu.RUnlock()
// already ready
if w.ready() {
return w, true, nil
}
if !w.running() {
return w, false, nil
}
// the new container info not yet updated
if w.hc.URL().Host == "" {
return w, false, nil
}
res, err := w.hc.CheckHealth()
if err != nil {
w.setError(err)
return w, false, err
}
if res.Healthy {
w.setReady()
return w, true, nil
}
w.setStarting()
return w, false, nil
}
// MarshalJSON implements health.HealthMonitor. // MarshalJSON implements health.HealthMonitor.
func (w *Watcher) MarshalJSON() ([]byte, error) { func (w *Watcher) MarshalJSON() ([]byte, error) {
url := w.hc.URL() url := w.hc.URL()
@ -135,3 +97,32 @@ func (w *Watcher) MarshalJSON() ([]byte, error) {
Detail: detail, Detail: detail,
}).MarshalJSON() }).MarshalJSON()
} }
// checkUpdateState reports whether the watched container is ready to serve,
// running a health check and updating the watcher's state when needed.
//
// It returns (true, nil) when the watcher is already ready or the health
// check reports healthy, and (false, nil) when the container is not running,
// the health checker has no target host yet, or the check reports unhealthy.
// A failed health check is recorded with setError and returned.
func (w *Watcher) checkUpdateState() (ready bool, err error) {
	switch {
	case w.ready():
		// already ready
		return true, nil
	case !w.running():
		return false, nil
	case w.hc.URL().Host == "":
		// the new container info not yet updated
		return false, nil
	}

	res, err := w.hc.CheckHealth()
	if err != nil {
		w.setError(err)
		return false, err
	}

	if !res.Healthy {
		w.setStarting()
		return false, nil
	}
	w.setReady()
	return true, nil
}

View file

@ -10,16 +10,26 @@ import (
) )
type ( type (
Config struct { ProviderConfig struct {
Proxmox *ProxmoxConfig `json:"proxmox,omitempty"` Proxmox *ProxmoxConfig `json:"proxmox,omitempty"`
Docker *DockerConfig `json:"docker,omitempty"` Docker *DockerConfig `json:"docker,omitempty"`
}
IdlewatcherConfig struct {
// 0: no idle watcher.
// Positive: idle watcher with idle timeout.
// Negative: idle watcher as a dependency. IdleTimeout time.Duration `json:"idle_timeout" json_ext:"duration"`
IdleTimeout time.Duration `json:"idle_timeout"`
WakeTimeout time.Duration `json:"wake_timeout"`
StopTimeout time.Duration `json:"stop_timeout"`
StopMethod StopMethod `json:"stop_method"`
StopSignal Signal `json:"stop_signal,omitempty"`
}
Config struct {
ProviderConfig
IdlewatcherConfig
IdleTimeout time.Duration `json:"idle_timeout" json_ext:"duration"` StartEndpoint string `json:"start_endpoint,omitempty"` // Optional path that must be hit to start container
WakeTimeout time.Duration `json:"wake_timeout" json_ext:"duration"` DependsOn []string `json:"depends_on,omitempty"`
StopTimeout time.Duration `json:"stop_timeout" json_ext:"duration"`
StopMethod StopMethod `json:"stop_method"`
StopSignal Signal `json:"stop_signal,omitempty"`
StartEndpoint string `json:"start_endpoint,omitempty"` // Optional path that must be hit to start container
} }
StopMethod string StopMethod string
Signal string Signal string
@ -55,11 +65,11 @@ func (c *Config) ContainerName() string {
if c.Docker != nil { if c.Docker != nil {
return c.Docker.ContainerName return c.Docker.ContainerName
} }
return "lxc " + strconv.Itoa(c.Proxmox.VMID) return "lxc-" + strconv.Itoa(c.Proxmox.VMID)
} }
func (c *Config) Validate() gperr.Error { func (c *Config) Validate() gperr.Error {
if c.IdleTimeout == 0 { // no idle timeout means no idle watcher if c.IdleTimeout == 0 { // zero idle timeout means no idle watcher
return nil return nil
} }
errs := gperr.NewBuilder("idlewatcher config validation error") errs := gperr.NewBuilder("idlewatcher config validation error")

View file

@ -35,7 +35,8 @@ func TestValidateStartEndpoint(t *testing.T) {
} }
for _, tc := range tests { for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
cfg := Config{StartEndpoint: tc.input} cfg := new(Config)
cfg.StartEndpoint = tc.input
err := cfg.validateStartEndpoint() err := cfg.validateStartEndpoint()
if err == nil { if err == nil {
expect.Equal(t, cfg.StartEndpoint, tc.input) expect.Equal(t, cfg.StartEndpoint, tc.input)

View file

@ -3,6 +3,8 @@ package idlewatcher
import ( import (
"context" "context"
"errors" "errors"
"maps"
"strings"
"sync" "sync"
"time" "time"
@ -20,10 +22,13 @@ import (
"github.com/yusing/go-proxy/internal/watcher/events" "github.com/yusing/go-proxy/internal/watcher/events"
"github.com/yusing/go-proxy/internal/watcher/health" "github.com/yusing/go-proxy/internal/watcher/health"
"github.com/yusing/go-proxy/internal/watcher/health/monitor" "github.com/yusing/go-proxy/internal/watcher/health/monitor"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/singleflight"
) )
type ( type (
routeHelper struct { routeHelper struct {
route routes.Route
rp *reverseproxy.ReverseProxy rp *reverseproxy.ReverseProxy
stream net.Stream stream net.Stream
hc health.HealthChecker hc health.HealthChecker
@ -48,8 +53,15 @@ type (
state atomic.Value[*containerState] state atomic.Value[*containerState]
lastReset atomic.Value[time.Time] lastReset atomic.Value[time.Time]
ticker *time.Ticker idleTicker *time.Ticker
task *task.Task task *task.Task
dependsOn []*dependency
}
dependency struct {
*Watcher
waitHealthy bool
} }
StopCallback func() error StopCallback func() error
@ -57,9 +69,12 @@ type (
const ContextKey = "idlewatcher.watcher" const ContextKey = "idlewatcher.watcher"
// TODO: replace -1 with neverTick
var ( var (
watcherMap = make(map[string]*Watcher) watcherMap = make(map[string]*Watcher)
watcherMapMu sync.RWMutex watcherMapMu sync.RWMutex
singleFlight singleflight.Group
) )
const ( const (
@ -79,51 +94,167 @@ var (
const reqTimeout = 3 * time.Second const reqTimeout = 3 * time.Second
// neverTick is the largest representable time.Duration (1<<63 - 1
// nanoseconds). It is assigned as the idle timeout of dependency watchers,
// which prevents dependencies from being stopped automatically.
const neverTick = time.Duration(1<<63 - 1)
// TODO: fix stream type. // TODO: fix stream type.
func NewWatcher(parent task.Parent, r routes.Route) (*Watcher, error) { func NewWatcher(parent task.Parent, r routes.Route, cfg *idlewatcher.Config) (*Watcher, error) {
cfg := r.IdlewatcherConfig()
key := cfg.Key() key := cfg.Key()
watcherMapMu.RLock() watcherMapMu.RLock()
// if the watcher already exists, finish it // if the watcher already exists, finish it
w, exists := watcherMap[key] w, exists := watcherMap[key]
if exists {
if w.cfg == cfg {
// same address, likely two routes from the same container
return w, nil
}
w.task.FinishAndWait(causeReload)
}
watcherMapMu.RUnlock() watcherMapMu.RUnlock()
w = &Watcher{ if exists {
ticker: time.NewTicker(cfg.IdleTimeout), if len(cfg.DependsOn) > 0 {
cfg: cfg, w.cfg.DependsOn = cfg.DependsOn
routeHelper: routeHelper{ }
hc: monitor.NewMonitor(r), if cfg.IdleTimeout > 0 {
}, w.cfg.IdlewatcherConfig = cfg.IdlewatcherConfig
}
cfg = w.cfg
w.resetIdleTimer()
} else {
w = &Watcher{
idleTicker: time.NewTicker(cfg.IdleTimeout),
cfg: cfg,
routeHelper: routeHelper{
hc: monitor.NewMonitor(r),
},
dependsOn: make([]*dependency, 0, len(cfg.DependsOn)),
}
}
depErrors := gperr.NewBuilder()
for i, dep := range cfg.DependsOn {
depSegments := strings.Split(dep, ":")
dep = depSegments[0]
if dep == "" { // empty dependency (likely stopped container), skip; it will be removed by dedupDependencies()
continue
}
cfg.DependsOn[i] = dep
waitHealthy := false
if len(depSegments) > 1 { // likely from `com.docker.compose.depends_on` label
switch depSegments[1] {
case "service_started":
case "service_healthy":
waitHealthy = true
// case "service_completed_successfully":
default:
depErrors.Addf("dependency %q has unsupported condition %q", dep, depSegments[1])
continue
}
}
cont := r.ContainerInfo()
var depRoute routes.Route
var ok bool
// try to find the dependency in the same provider and the same docker compose project first
if cont != nil {
depRoute, ok = r.GetProvider().FindService(cont.DockerComposeProject(), dep)
}
if !ok {
depRoute, ok = routes.Get(dep)
if !ok {
depErrors.Addf("dependency %q not found", dep)
continue
}
}
if depRoute == r {
depErrors.Addf("dependency %q cannot have itself as a dependency (same route)", dep)
continue
}
// wait for the dependency to be started
<-depRoute.Started()
if waitHealthy && !depRoute.UseHealthCheck() {
depErrors.Addf("dependency %q has service_healthy condition but has healthcheck disabled", dep)
continue
}
depCfg := depRoute.IdlewatcherConfig()
if depCfg == nil {
depCfg = new(idlewatcher.Config)
depCfg.IdlewatcherConfig = cfg.IdlewatcherConfig
depCfg.IdleTimeout = neverTick // disable auto sleep for dependencies
} else if depCfg.IdleTimeout > 0 {
depErrors.Addf("dependency %q has positive idle timeout %s", dep, depCfg.IdleTimeout)
continue
}
if depCfg.Docker == nil && depCfg.Proxmox == nil {
depCont := depRoute.ContainerInfo()
if depCont != nil {
depCfg.Docker = &idlewatcher.DockerConfig{
DockerHost: depCont.DockerHost,
ContainerID: depCont.ContainerID,
ContainerName: depCont.ContainerName,
}
depCfg.DependsOn = depCont.Dependencies()
} else {
depErrors.Addf("dependency %q has no idlewatcher config but is not a docker container", dep)
continue
}
}
if depCfg.Key() == cfg.Key() {
depErrors.Addf("dependency %q cannot have itself as a dependency (same container)", dep)
continue
}
depCfg.IdleTimeout = neverTick // disable auto sleep for dependencies
depWatcher, err := NewWatcher(parent, depRoute, depCfg)
if err != nil {
depErrors.Add(err)
continue
}
w.dependsOn = append(w.dependsOn, &dependency{
Watcher: depWatcher,
waitHealthy: waitHealthy,
})
}
if w.provider != nil { // it's a reload, close the old provider
w.provider.Close()
}
if depErrors.HasError() {
return nil, depErrors.Error()
}
if !exists {
watcherMapMu.Lock()
defer watcherMapMu.Unlock()
} }
var p idlewatcher.Provider var p idlewatcher.Provider
var providerType string
var err error var err error
var kind string
switch { switch {
case cfg.Docker != nil: case cfg.Docker != nil:
p, err = provider.NewDockerProvider(cfg.Docker.DockerHost, cfg.Docker.ContainerID) p, err = provider.NewDockerProvider(cfg.Docker.DockerHost, cfg.Docker.ContainerID)
providerType = "docker" kind = "docker"
default: default:
p, err = provider.NewProxmoxProvider(cfg.Proxmox.Node, cfg.Proxmox.VMID) p, err = provider.NewProxmoxProvider(cfg.Proxmox.Node, cfg.Proxmox.VMID)
providerType = "proxmox" kind = "proxmox"
} }
w.l = log.With().
Stringer("idle_timeout", cfg.IdleTimeout).
Str("kind", kind).
Str("container", cfg.ContainerName()).
Logger()
if err != nil { if err != nil {
return nil, err return nil, err
} }
w.provider = p w.provider = p
w.l = log.With().
Str("provider", providerType).
Str("container", cfg.ContainerName()).
Logger()
switch r := r.(type) { switch r := r.(type) {
case routes.ReverseProxyRoute: case routes.ReverseProxyRoute:
@ -131,18 +262,22 @@ func NewWatcher(parent task.Parent, r routes.Route) (*Watcher, error) {
case routes.StreamRoute: case routes.StreamRoute:
w.stream = r w.stream = r
default: default:
return nil, gperr.Errorf("unexpected route type: %T", r) w.provider.Close()
return nil, w.newWatcherError(gperr.Errorf("unexpected route type: %T", r))
} }
w.route = r
ctx, cancel := context.WithTimeout(parent.Context(), reqTimeout) ctx, cancel := context.WithTimeout(parent.Context(), reqTimeout)
defer cancel() defer cancel()
status, err := w.provider.ContainerStatus(ctx) status, err := w.provider.ContainerStatus(ctx)
if err != nil { if err != nil {
w.provider.Close() w.provider.Close()
return nil, gperr.Wrap(err, "failed to get container status") return nil, w.newWatcherError(err)
} }
w.state.Store(&containerState{status: status})
switch p := w.provider.(type) { // when more providers are added, we need to add a new case here.
switch p := w.provider.(type) { //nolint:gocritic
case *provider.ProxmoxProvider: case *provider.ProxmoxProvider:
shutdownTimeout := max(time.Second, cfg.StopTimeout-idleWakerCheckTimeout) shutdownTimeout := max(time.Second, cfg.StopTimeout-idleWakerCheckTimeout)
err = p.LXCSetShutdownTimeout(ctx, cfg.Proxmox.VMID, shutdownTimeout) err = p.LXCSetShutdownTimeout(ctx, cfg.Proxmox.VMID, shutdownTimeout)
@ -151,31 +286,38 @@ func NewWatcher(parent task.Parent, r routes.Route) (*Watcher, error) {
} }
} }
w.state.Store(&containerState{status: status}) if !exists {
w.task = parent.Subtask("idlewatcher."+r.Name(), true)
watcherMap[key] = w
w.task = parent.Subtask("idlewatcher."+r.Name(), true) go func() {
cause := w.watchUntilDestroy()
if errors.Is(cause, causeContainerDestroy) || errors.Is(cause, task.ErrProgramExiting) {
watcherMapMu.Lock()
delete(watcherMap, key)
watcherMapMu.Unlock()
w.l.Info().Msg("idlewatcher stopped")
} else if !errors.Is(cause, causeReload) {
gperr.LogError("idlewatcher stopped unexpectedly", cause, &w.l)
}
watcherMapMu.Lock() w.idleTicker.Stop()
watcherMap[key] = w w.provider.Close()
watcherMapMu.Unlock() w.task.Finish(cause)
}()
}
go func() { hcCfg := w.hc.Config()
cause := w.watchUntilDestroy() hcCfg.BaseContext = func() context.Context {
if errors.Is(cause, causeContainerDestroy) || errors.Is(cause, task.ErrProgramExiting) { return w.task.Context()
watcherMapMu.Lock() }
delete(watcherMap, key) hcCfg.Timeout = cfg.WakeTimeout
watcherMapMu.Unlock()
w.l.Info().Msg("idlewatcher stopped")
} else if !errors.Is(cause, causeReload) {
gperr.LogError("idlewatcher stopped unexpectedly", cause, &w.l)
}
w.ticker.Stop() w.dedupDependencies()
w.provider.Close()
w.task.Finish(cause) w.l = w.l.With().Strs("deps", cfg.DependsOn).Logger()
}()
if exists { if exists {
w.l.Info().Msg("idlewatcher reloaded") w.l.Debug().Msg("idlewatcher reloaded")
} else { } else {
w.l.Info().Msg("idlewatcher started") w.l.Info().Msg("idlewatcher started")
} }
@ -186,7 +328,65 @@ func (w *Watcher) Key() string {
return w.cfg.Key() return w.cfg.Key()
} }
// Wake wakes the container.
//
// It will cancel as soon as the either of the passed in context or the watcher is done.
//
// It uses singleflight to prevent multiple wake calls at the same time.
//
// It will wake the dependencies first, and then wake itself.
// If the container is already running, it will do nothing.
// If the container is not running, it will start it.
// If the container is paused, it will unpause it.
// If the container is stopped, it will do nothing.
func (w *Watcher) Wake(ctx context.Context) error { func (w *Watcher) Wake(ctx context.Context) error {
// wake dependencies first.
if err := w.wakeDependencies(ctx); err != nil {
return w.newWatcherError(err)
}
// wake itself.
// use container name instead of Key() here as the container id will change on restart (docker).
_, err, _ := singleFlight.Do(w.cfg.ContainerName(), func() (any, error) {
return nil, w.wakeIfStopped(ctx)
})
if err != nil {
return w.newWatcherError(err)
}
return nil
}
func (w *Watcher) wakeDependencies(ctx context.Context) error {
if len(w.dependsOn) == 0 {
return nil
}
errs := errgroup.Group{}
for _, dep := range w.dependsOn {
errs.Go(func() error {
if err := dep.Wake(ctx); err != nil {
return err
}
if dep.waitHealthy {
for {
select {
case <-ctx.Done():
return w.newDepError("wait_healthy", dep, context.Cause(ctx))
default:
if h, err := dep.hc.CheckHealth(); err != nil {
return err
} else if h.Healthy {
return nil
}
time.Sleep(idleWakerCheckInterval)
}
}
}
return nil
})
}
return errs.Wait()
} }
func (w *Watcher) wakeIfStopped(ctx context.Context) error { func (w *Watcher) wakeIfStopped(ctx context.Context) error {
@ -210,34 +410,66 @@ func (w *Watcher) wakeIfStopped(ctx context.Context) error {
} }
} }
// stopDependencies calls stopByMethod on every direct dependency of w
// concurrently, waits for all of them, and returns the first error
// encountered (nil when there are no dependencies or all of them succeed).
func (w *Watcher) stopDependencies() error {
	// With no dependencies nothing is scheduled and Wait returns nil.
	var group errgroup.Group
	for _, dep := range w.dependsOn {
		group.Go(dep.stopByMethod)
	}
	return group.Wait()
}
func (w *Watcher) stopByMethod() error { func (w *Watcher) stopByMethod() error {
// no need singleflight here because it will only be called once every tick.
// if the container is not running, skip and stop dependencies.
if !w.running() { if !w.running() {
if err := w.stopDependencies(); err != nil {
return w.newWatcherError(err)
}
return nil return nil
} }
cfg := w.cfg cfg := w.cfg
ctx, cancel := context.WithTimeout(w.task.Context(), cfg.StopTimeout) ctx, cancel := context.WithTimeout(context.Background(), cfg.StopTimeout)
defer cancel() defer cancel()
// stop itself first.
var err error
switch cfg.StopMethod { switch cfg.StopMethod {
case idlewatcher.StopMethodPause: case idlewatcher.StopMethodPause:
return w.provider.ContainerPause(ctx) err = w.provider.ContainerPause(ctx)
case idlewatcher.StopMethodStop: case idlewatcher.StopMethodStop:
return w.provider.ContainerStop(ctx, cfg.StopSignal, int(cfg.StopTimeout.Seconds())) err = w.provider.ContainerStop(ctx, cfg.StopSignal, int(cfg.StopTimeout.Seconds()))
case idlewatcher.StopMethodKill: case idlewatcher.StopMethodKill:
return w.provider.ContainerKill(ctx, cfg.StopSignal) err = w.provider.ContainerKill(ctx, cfg.StopSignal)
default: default:
return gperr.Errorf("unexpected stop method: %q", cfg.StopMethod) err = w.newWatcherError(gperr.Errorf("unexpected stop method: %q", cfg.StopMethod))
} }
if err != nil {
return w.newWatcherError(err)
}
w.l.Info().Msg("container stopped")
// then stop dependencies.
if err := w.stopDependencies(); err != nil {
return w.newWatcherError(err)
}
return nil
} }
func (w *Watcher) resetIdleTimer() { func (w *Watcher) resetIdleTimer() {
w.ticker.Reset(w.cfg.IdleTimeout) w.idleTicker.Reset(w.cfg.IdleTimeout)
w.lastReset.Store(time.Now()) w.lastReset.Store(time.Now())
} }
func (w *Watcher) expires() time.Time { func (w *Watcher) expires() time.Time {
if !w.running() { if !w.running() || w.cfg.IdleTimeout <= 0 {
return time.Time{} return time.Time{}
} }
return w.lastReset.Load().Add(w.cfg.IdleTimeout) return w.lastReset.Load().Add(w.cfg.IdleTimeout)
@ -278,15 +510,15 @@ func (w *Watcher) watchUntilDestroy() (returnCause error) {
w.l.Info().Msg("awaken") w.l.Info().Msg("awaken")
case e.Action.IsContainerStop(): // stop / kill / die case e.Action.IsContainerStop(): // stop / kill / die
w.setNapping(idlewatcher.ContainerStatusStopped) w.setNapping(idlewatcher.ContainerStatusStopped)
w.ticker.Stop() w.idleTicker.Stop()
case e.Action.IsContainerPause(): // pause case e.Action.IsContainerPause(): // pause
w.setNapping(idlewatcher.ContainerStatusPaused) w.setNapping(idlewatcher.ContainerStatusPaused)
w.ticker.Stop() w.idleTicker.Stop()
default: default:
w.l.Error().Stringer("action", e.Action).Msg("unexpected container action") w.l.Debug().Stringer("action", e.Action).Msg("unexpected container action")
} }
case <-w.ticker.C: case <-w.idleTicker.C:
w.ticker.Stop() w.idleTicker.Stop()
if w.running() { if w.running() {
err := w.stopByMethod() err := w.stopByMethod()
switch { switch {
@ -298,16 +530,37 @@ func (w *Watcher) watchUntilDestroy() (returnCause error) {
} }
w.l.Err(err).Msgf("container stop with method %q failed", w.cfg.StopMethod) w.l.Err(err).Msgf("container stop with method %q failed", w.cfg.StopMethod)
default: default:
w.l.Info().Str("reason", "idle timeout").Msg("container stopped") w.l.Info().Msg("idle timeout")
} }
} }
} }
} }
} }
func fmtErr(err error) string { func (w *Watcher) dedupDependencies() {
if err == nil { // remove from dependencies if the dependency is also a dependency of another dependency, or have duplicates.
return "" deps := w.dependencies()
for _, dep := range w.dependsOn {
depdeps := dep.dependencies()
for depdep := range depdeps {
delete(deps, depdep)
}
} }
return err.Error() newDepOn := make([]string, 0, len(deps))
newDeps := make([]*dependency, 0, len(deps))
for _, dep := range deps {
newDepOn = append(newDepOn, dep.cfg.ContainerName())
newDeps = append(newDeps, dep)
}
w.cfg.DependsOn = newDepOn
w.dependsOn = newDeps
}
func (w *Watcher) dependencies() map[string]*dependency {
deps := make(map[string]*dependency)
for _, dep := range w.dependsOn {
deps[dep.Key()] = dep
maps.Copy(deps, dep.dependencies())
}
return deps
} }

View file

@ -102,7 +102,7 @@ func (r *ReveseProxyRoute) Start(parent task.Parent) gperr.Error {
switch { switch {
case r.UseIdleWatcher(): case r.UseIdleWatcher():
waker, err := idlewatcher.NewWatcher(parent, r) waker, err := idlewatcher.NewWatcher(parent, r, r.IdlewatcherConfig())
if err != nil { if err != nil {
r.task.Finish(err) r.task.Finish(err)
return gperr.Wrap(err) return gperr.Wrap(err)

View file

@ -23,6 +23,7 @@ type (
task.TaskFinisher task.TaskFinisher
pool.Object pool.Object
ProviderName() string ProviderName() string
GetProvider() Provider
TargetURL() *net.URL TargetURL() *net.URL
HealthMonitor() health.HealthMonitor HealthMonitor() health.HealthMonitor
References() []string References() []string

View file

@ -46,7 +46,7 @@ func (r *StreamRoute) Start(parent task.Parent) gperr.Error {
switch { switch {
case r.UseIdleWatcher(): case r.UseIdleWatcher():
waker, err := idlewatcher.NewWatcher(parent, r) waker, err := idlewatcher.NewWatcher(parent, r, r.IdlewatcherConfig())
if err != nil { if err != nil {
r.task.Finish(err) r.task.Finish(err)
return gperr.Wrap(err, "idlewatcher error") return gperr.Wrap(err, "idlewatcher error")