// GoDoxy/internal/idlewatcher/watcher.go
package idlewatcher

import (
	"context"
	"errors"
	"maps"
	"strings"
	"sync"
	"time"

	"github.com/rs/zerolog"
	"github.com/rs/zerolog/log"
	"github.com/yusing/go-proxy/internal/gperr"
	"github.com/yusing/go-proxy/internal/idlewatcher/provider"
	idlewatcher "github.com/yusing/go-proxy/internal/idlewatcher/types"
	"github.com/yusing/go-proxy/internal/net/gphttp/reverseproxy"
	net "github.com/yusing/go-proxy/internal/net/types"
	"github.com/yusing/go-proxy/internal/route/routes"
	"github.com/yusing/go-proxy/internal/task"
	U "github.com/yusing/go-proxy/internal/utils"
	"github.com/yusing/go-proxy/internal/utils/atomic"
	"github.com/yusing/go-proxy/internal/watcher/events"
	"github.com/yusing/go-proxy/internal/watcher/health"
	"github.com/yusing/go-proxy/internal/watcher/health/monitor"

	"golang.org/x/sync/errgroup"
	"golang.org/x/sync/singleflight"
)

type (
	routeHelper struct {
		route  routes.Route
		rp     *reverseproxy.ReverseProxy
		stream net.Stream
		hc     health.HealthChecker
	}

	containerState struct {
		status idlewatcher.ContainerStatus
		ready  bool
		err    error
	}

	Watcher struct {
		_ U.NoCopy

		routeHelper

		l zerolog.Logger

		cfg      *idlewatcher.Config
		provider idlewatcher.Provider

		state     atomic.Value[*containerState]
		lastReset atomic.Value[time.Time]

		idleTicker *time.Ticker
		task       *task.Task
		dependsOn  []*dependency
	}

	dependency struct {
		*Watcher
		waitHealthy bool
	}

	StopCallback func() error
)

const ContextKey = "idlewatcher.watcher"

// TODO: replace -1 with neverTick
var (
	watcherMap   = make(map[string]*Watcher)
	watcherMapMu sync.RWMutex

	singleFlight singleflight.Group
)

const (
	idleWakerCheckInterval = 100 * time.Millisecond
	idleWakerCheckTimeout  = time.Second
)

var dummyHealthCheckConfig = &health.HealthCheckConfig{
	Interval: idleWakerCheckInterval,
	Timeout:  idleWakerCheckTimeout,
}

var (
	causeReload           = gperr.New("reloaded")            //nolint:errname
	causeContainerDestroy = gperr.New("container destroyed") //nolint:errname
)

const reqTimeout = 3 * time.Second

// prevents dependencies from being stopped automatically.
const neverTick = time.Duration(1<<63 - 1)
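
// Descriptive note: 1<<63 - 1 nanoseconds is math.MaxInt64 (roughly 292 years),
// so resetting the idle ticker with neverTick means it effectively never fires.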

// TODO: fix stream type.
func NewWatcher(parent task.Parent, r routes.Route, cfg *idlewatcher.Config) (*Watcher, error) {
	key := cfg.Key()

	watcherMapMu.RLock()
	// if the watcher already exists, reuse it and apply the new config (reload)
	w, exists := watcherMap[key]
	watcherMapMu.RUnlock()

	if exists {
		if len(cfg.DependsOn) > 0 {
			w.cfg.DependsOn = cfg.DependsOn
		}
		if cfg.IdleTimeout > 0 {
			w.cfg.IdlewatcherConfig = cfg.IdlewatcherConfig
		}
		cfg = w.cfg
		w.resetIdleTimer()
	} else {
		w = &Watcher{
			idleTicker: time.NewTicker(cfg.IdleTimeout),
			cfg:        cfg,
			routeHelper: routeHelper{
				hc: monitor.NewMonitor(r),
			},
			dependsOn: make([]*dependency, 0, len(cfg.DependsOn)),
		}
	}

	depErrors := gperr.NewBuilder()
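
	// Illustrative note (the name "db" below is hypothetical): each DependsOn
	// entry is either a bare route/service name such as "db", or a name:condition
	// pair such as "db:service_healthy" when it comes from the
	// `com.docker.compose.depends_on` label; only service_started and
	// service_healthy are handled in the loop below.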
	for i, dep := range cfg.DependsOn {
		depSegments := strings.Split(dep, ":")
		dep = depSegments[0]
		if dep == "" { // empty dependency (likely stopped container), skip; it will be removed by dedupDependencies()
			continue
		}
		cfg.DependsOn[i] = dep

		waitHealthy := false
		if len(depSegments) > 1 { // likely from `com.docker.compose.depends_on` label
			switch depSegments[1] {
			case "service_started":
			case "service_healthy":
				waitHealthy = true
			// case "service_completed_successfully":
			default:
				depErrors.Addf("dependency %q has unsupported condition %q", dep, depSegments[1])
				continue
			}
		}

		cont := r.ContainerInfo()
		var depRoute routes.Route
		var ok bool
		// try to find the dependency in the same provider and the same docker compose project first
		if cont != nil {
			depRoute, ok = r.GetProvider().FindService(cont.DockerComposeProject(), dep)
		}
		if !ok {
			depRoute, ok = routes.Get(dep)
			if !ok {
				depErrors.Addf("dependency %q not found", dep)
				continue
			}
		}

		if depRoute == r {
			depErrors.Addf("dependency %q cannot have itself as a dependency (same route)", dep)
			continue
		}

		// wait for the dependency to be started
		<-depRoute.Started()

		if waitHealthy && !depRoute.UseHealthCheck() {
			depErrors.Addf("dependency %q has service_healthy condition but has healthcheck disabled", dep)
			continue
		}

		depCfg := depRoute.IdlewatcherConfig()
		if depCfg == nil {
			depCfg = new(idlewatcher.Config)
			depCfg.IdlewatcherConfig = cfg.IdlewatcherConfig
			depCfg.IdleTimeout = neverTick // disable auto sleep for dependencies
		} else if depCfg.IdleTimeout > 0 {
			depErrors.Addf("dependency %q has positive idle timeout %s", dep, depCfg.IdleTimeout)
			continue
		}

		if depCfg.Docker == nil && depCfg.Proxmox == nil {
			depCont := depRoute.ContainerInfo()
			if depCont != nil {
				depCfg.Docker = &idlewatcher.DockerConfig{
					DockerHost:    depCont.DockerHost,
					ContainerID:   depCont.ContainerID,
					ContainerName: depCont.ContainerName,
				}
				depCfg.DependsOn = depCont.Dependencies()
			} else {
				depErrors.Addf("dependency %q has no idlewatcher config but is not a docker container", dep)
				continue
			}
		}

		if depCfg.Key() == cfg.Key() {
			depErrors.Addf("dependency %q cannot have itself as a dependency (same container)", dep)
			continue
		}

		depCfg.IdleTimeout = neverTick // disable auto sleep for dependencies
		depWatcher, err := NewWatcher(parent, depRoute, depCfg)
		if err != nil {
			depErrors.Add(err)
			continue
		}
		w.dependsOn = append(w.dependsOn, &dependency{
			Watcher:     depWatcher,
			waitHealthy: waitHealthy,
		})
	}

	if w.provider != nil { // it's a reload, close the old provider
		w.provider.Close()
	}

	if depErrors.HasError() {
		return nil, depErrors.Error()
	}

	if !exists {
		watcherMapMu.Lock()
		defer watcherMapMu.Unlock()
	}

	var p idlewatcher.Provider
	var err error
	var kind string
	switch {
	case cfg.Docker != nil:
		p, err = provider.NewDockerProvider(cfg.Docker.DockerHost, cfg.Docker.ContainerID)
		kind = "docker"
	default:
		p, err = provider.NewProxmoxProvider(cfg.Proxmox.Node, cfg.Proxmox.VMID)
		kind = "proxmox"
	}

	w.l = log.With().
		Stringer("idle_timeout", cfg.IdleTimeout).
		Str("kind", kind).
		Str("container", cfg.ContainerName()).
		Logger()

	if err != nil {
		return nil, err
	}
	w.provider = p

	switch r := r.(type) {
	case routes.ReverseProxyRoute:
		w.rp = r.ReverseProxy()
	case routes.StreamRoute:
		w.stream = r
	default:
		w.provider.Close()
		return nil, w.newWatcherError(gperr.Errorf("unexpected route type: %T", r))
	}
	w.route = r

	ctx, cancel := context.WithTimeout(parent.Context(), reqTimeout)
	defer cancel()

	status, err := w.provider.ContainerStatus(ctx)
	if err != nil {
		w.provider.Close()
		return nil, w.newWatcherError(err)
	}
	w.state.Store(&containerState{status: status})

	// when more providers are added, we need to add a new case here.
	switch p := w.provider.(type) { //nolint:gocritic
	case *provider.ProxmoxProvider:
		shutdownTimeout := max(time.Second, cfg.StopTimeout-idleWakerCheckTimeout)
		err = p.LXCSetShutdownTimeout(ctx, cfg.Proxmox.VMID, shutdownTimeout)
		if err != nil {
			w.l.Warn().Err(err).Msg("failed to set shutdown timeout")
		}
	}

	if !exists {
		w.task = parent.Subtask("idlewatcher."+r.Name(), true)
		watcherMap[key] = w

		go func() {
			cause := w.watchUntilDestroy()
			if errors.Is(cause, causeContainerDestroy) || errors.Is(cause, task.ErrProgramExiting) {
				watcherMapMu.Lock()
				delete(watcherMap, key)
				watcherMapMu.Unlock()
				w.l.Info().Msg("idlewatcher stopped")
			} else if !errors.Is(cause, causeReload) {
				gperr.LogError("idlewatcher stopped unexpectedly", cause, &w.l)
			}

			w.idleTicker.Stop()
			w.provider.Close()
			w.task.Finish(cause)
		}()
	}

	hcCfg := w.hc.Config()
	hcCfg.BaseContext = func() context.Context {
		return w.task.Context()
	}
	hcCfg.Timeout = cfg.WakeTimeout

	w.dedupDependencies()

	w.l = w.l.With().Strs("deps", cfg.DependsOn).Logger()
	if exists {
		w.l.Debug().Msg("idlewatcher reloaded")
	} else {
		w.l.Info().Msg("idlewatcher started")
	}
	return w, nil
}

func (w *Watcher) Key() string {
	return w.cfg.Key()
}

// Wake wakes the container.
//
// It cancels as soon as either the passed-in context or the watcher's task is done.
//
// It uses singleflight to prevent multiple wake calls at the same time.
//
// It wakes the dependencies first, then wakes itself:
// if the container is already running, nothing is done;
// if it is stopped, it is started;
// if it is paused, it is unpaused.
func (w *Watcher) Wake(ctx context.Context) error {
	// wake dependencies first.
	if err := w.wakeDependencies(ctx); err != nil {
		return w.newWatcherError(err)
	}

	// wake itself.
	// use container name instead of Key() here as the container id will change on restart (docker).
	_, err, _ := singleFlight.Do(w.cfg.ContainerName(), func() (any, error) {
		return nil, w.wakeIfStopped(ctx)
	})
	if err != nil {
		return w.newWatcherError(err)
	}
	return nil
}
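
// Illustrative usage (not part of the original file; the handler below, the use
// of net/http, and the ServeHTTP call on the reverse proxy are assumptions for
// this sketch): a request handler holding a *Watcher could wake the container
// before proxying, roughly like:
//
//	func serveWithWake(w *Watcher, rw http.ResponseWriter, req *http.Request) {
//		ctx, cancel := context.WithTimeout(req.Context(), w.cfg.WakeTimeout)
//		defer cancel()
//		if err := w.Wake(ctx); err != nil {
//			http.Error(rw, err.Error(), http.StatusServiceUnavailable)
//			return
//		}
//		w.rp.ServeHTTP(rw, req) // w.rp is only set for reverse-proxy routes
//	}
//
// Concurrent requests for the same container collapse into a single provider
// call via the singleflight group keyed by container name.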

func (w *Watcher) wakeDependencies(ctx context.Context) error {
	if len(w.dependsOn) == 0 {
		return nil
	}

	errs := errgroup.Group{}
	for _, dep := range w.dependsOn {
		errs.Go(func() error {
			if err := dep.Wake(ctx); err != nil {
				return err
			}
			if dep.waitHealthy {
				for {
					select {
					case <-ctx.Done():
						return w.newDepError("wait_healthy", dep, context.Cause(ctx))
					default:
						if h, err := dep.hc.CheckHealth(); err != nil {
							return err
						} else if h.Healthy {
							return nil
						}
						time.Sleep(idleWakerCheckInterval)
					}
				}
			}
			return nil
		})
	}
	return errs.Wait()
}

func (w *Watcher) wakeIfStopped(ctx context.Context) error {
	state := w.state.Load()
	if state.status == idlewatcher.ContainerStatusRunning {
		w.l.Debug().Msg("container is already running")
		return nil
	}

	ctx, cancel := context.WithTimeout(ctx, w.cfg.WakeTimeout)
	defer cancel()

	switch state.status {
	case idlewatcher.ContainerStatusStopped:
		w.l.Info().Msg("starting container")
		return w.provider.ContainerStart(ctx)
	case idlewatcher.ContainerStatusPaused:
		w.l.Info().Msg("unpausing container")
		return w.provider.ContainerUnpause(ctx)
	default:
		return gperr.Errorf("unexpected container status: %s", state.status)
	}
}

func (w *Watcher) stopDependencies() error {
	if len(w.dependsOn) == 0 {
		return nil
	}

	errs := errgroup.Group{}
	for _, dep := range w.dependsOn {
		errs.Go(dep.stopByMethod)
	}
	return errs.Wait()
}

func (w *Watcher) stopByMethod() error {
	// no need for singleflight here; this is only called once per idle tick.
	// if the container is not running, skip it and just stop the dependencies.
	if !w.running() {
		if err := w.stopDependencies(); err != nil {
			return w.newWatcherError(err)
		}
		return nil
	}

	cfg := w.cfg
	ctx, cancel := context.WithTimeout(context.Background(), cfg.StopTimeout)
	defer cancel()

	// stop itself first.
	var err error
	switch cfg.StopMethod {
	case idlewatcher.StopMethodPause:
		err = w.provider.ContainerPause(ctx)
	case idlewatcher.StopMethodStop:
		err = w.provider.ContainerStop(ctx, cfg.StopSignal, int(cfg.StopTimeout.Seconds()))
	case idlewatcher.StopMethodKill:
		err = w.provider.ContainerKill(ctx, cfg.StopSignal)
	default:
		err = w.newWatcherError(gperr.Errorf("unexpected stop method: %q", cfg.StopMethod))
	}
	if err != nil {
		return w.newWatcherError(err)
	}
	w.l.Info().Msg("container stopped")

	// then stop dependencies.
	if err := w.stopDependencies(); err != nil {
		return w.newWatcherError(err)
	}
	return nil
}

func (w *Watcher) resetIdleTimer() {
	w.idleTicker.Reset(w.cfg.IdleTimeout)
	w.lastReset.Store(time.Now())
}

func (w *Watcher) expires() time.Time {
	if !w.running() || w.cfg.IdleTimeout <= 0 {
		return time.Time{}
	}
	return w.lastReset.Load().Add(w.cfg.IdleTimeout)
}

// watchUntilDestroy waits for the container to be created, started, or unpaused,
// and then resets the idle timer.
//
// When the container is stopped, paused, or killed, the idle timer is stopped
// and the ContainerRunning flag is set to false.
//
// When the idle timer fires, the container is stopped according to the
// stop method.
//
// It exits only when the context is canceled, the container is destroyed,
// an error occurs on the docker client, or the route provider dies (mainly
// caused by a config reload).
func (w *Watcher) watchUntilDestroy() (returnCause error) {
	eventCh, errCh := w.provider.Watch(w.Task().Context())

	for {
		select {
		case <-w.task.Context().Done():
			return gperr.Wrap(w.task.FinishCause())
		case err := <-errCh:
			gperr.LogError("watcher error", err, &w.l)
		case e := <-eventCh:
			w.l.Debug().Stringer("action", e.Action).Msg("state changed")
			switch e.Action {
			case events.ActionContainerDestroy:
				return causeContainerDestroy
			case events.ActionForceReload:
				continue
			}

			w.resetIdleTimer()
			switch {
			case e.Action.IsContainerStart(): // create / start / unpause
				w.setStarting()
				w.l.Info().Msg("awaken")
			case e.Action.IsContainerStop(): // stop / kill / die
				w.setNapping(idlewatcher.ContainerStatusStopped)
				w.idleTicker.Stop()
			case e.Action.IsContainerPause(): // pause
				w.setNapping(idlewatcher.ContainerStatusPaused)
				w.idleTicker.Stop()
			default:
				w.l.Debug().Stringer("action", e.Action).Msg("unexpected container action")
			}
		case <-w.idleTicker.C:
			w.idleTicker.Stop()
			if w.running() {
				err := w.stopByMethod()
				switch {
				case errors.Is(err, context.Canceled):
					continue
				case err != nil:
					if errors.Is(err, context.DeadlineExceeded) {
						err = errors.New("timeout waiting for container to stop, please set a higher value for `stop_timeout`")
					}
					w.l.Err(err).Msgf("container stop with method %q failed", w.cfg.StopMethod)
				default:
					w.l.Info().Msg("idle timeout")
				}
			}
		}
	}
}

func (w *Watcher) dedupDependencies() {
	// drop a dependency if it is already a (transitive) dependency of another
	// dependency, and remove duplicates.
	deps := w.dependencies()
	for _, dep := range w.dependsOn {
		depdeps := dep.dependencies()
		for depdep := range depdeps {
			delete(deps, depdep)
		}
	}

	newDepOn := make([]string, 0, len(deps))
	newDeps := make([]*dependency, 0, len(deps))
	for _, dep := range deps {
		newDepOn = append(newDepOn, dep.cfg.ContainerName())
		newDeps = append(newDeps, dep)
	}
	w.cfg.DependsOn = newDepOn
	w.dependsOn = newDeps
}
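
// Worked example (route names are hypothetical): if this watcher depends on
// "app" and "db", and "app" itself depends on "db", dedupDependencies removes
// "db" from this watcher's direct list, since waking "app" already wakes "db".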

func (w *Watcher) dependencies() map[string]*dependency {
	deps := make(map[string]*dependency)
	for _, dep := range w.dependsOn {
		deps[dep.Key()] = dep
		maps.Copy(deps, dep.dependencies())
	}
	return deps
}