From f38b3abdbc5bd10e2f13ee1a5b5e854f61066a88 Mon Sep 17 00:00:00 2001 From: yusing Date: Mon, 14 Oct 2024 10:02:53 +0800 Subject: [PATCH] improved health check --- internal/api/v1/checkhealth.go | 8 +- internal/docker/client.go | 3 +- internal/docker/idlewatcher/waker.go | 88 +++++++++++++------ internal/net/http/loadbalancer/ip_hash.go | 4 +- internal/net/http/loadbalancer/least_conn.go | 2 +- .../net/http/loadbalancer/loadbalancer.go | 17 +++- .../http/loadbalancer/loadbalancer_test.go | 6 +- internal/net/http/loadbalancer/round_robin.go | 2 +- internal/net/http/loadbalancer/server.go | 13 ++- internal/proxy/entry.go | 44 ++++++---- internal/route/http.go | 86 +++++++++--------- internal/utils/atomic.go | 30 +++++++ internal/utils/functional/map_utils.go | 8 -- internal/utils/serialization.go | 8 ++ internal/watcher/docker_watcher.go | 13 ++- internal/watcher/health/healthcheck_config.go | 4 +- internal/watcher/health/http.go | 6 +- internal/watcher/health/monitor.go | 82 +++++++++++------ internal/watcher/health/raw.go | 6 +- internal/watcher/health/status.go | 48 ++++++++++ 20 files changed, 323 insertions(+), 155 deletions(-) create mode 100644 internal/utils/atomic.go delete mode 100644 internal/utils/functional/map_utils.go create mode 100644 internal/watcher/health/status.go diff --git a/internal/api/v1/checkhealth.go b/internal/api/v1/checkhealth.go index 94534f7..453ee02 100644 --- a/internal/api/v1/checkhealth.go +++ b/internal/api/v1/checkhealth.go @@ -15,14 +15,10 @@ func CheckHealth(cfg *config.Config, w http.ResponseWriter, r *http.Request) { return } - isHealthy, ok := health.IsHealthy(target) + status, ok := health.Inspect(target) if !ok { HandleErr(w, r, ErrNotFound("target", target), http.StatusNotFound) return } - if isHealthy { - w.WriteHeader(http.StatusOK) - } else { - w.WriteHeader(http.StatusServiceUnavailable) - } + WriteBody(w, []byte(status.String())) } diff --git a/internal/docker/client.go b/internal/docker/client.go index 074d730..5f28fd1 100644 --- a/internal/docker/client.go +++ b/internal/docker/client.go @@ -14,9 +14,10 @@ import ( ) type Client struct { + *client.Client + key string refCount *atomic.Int32 - *client.Client l logrus.FieldLogger } diff --git a/internal/docker/idlewatcher/waker.go b/internal/docker/idlewatcher/waker.go index da9f94e..466768a 100644 --- a/internal/docker/idlewatcher/waker.go +++ b/internal/docker/idlewatcher/waker.go @@ -2,12 +2,14 @@ package idlewatcher import ( "context" - "fmt" + "encoding/json" "net/http" "strconv" "time" + "github.com/sirupsen/logrus" gphttp "github.com/yusing/go-proxy/internal/net/http" + "github.com/yusing/go-proxy/internal/watcher/health" ) type Waker struct { @@ -18,20 +20,6 @@ type Waker struct { } func NewWaker(w *Watcher, rp *gphttp.ReverseProxy) *Waker { - orig := rp.ServeHTTP - // workaround for stopped containers port become zero - rp.ServeHTTP = func(rw http.ResponseWriter, r *http.Request) { - if rp.TargetURL.Port() == "0" { - port, ok := portHistoryMap.Load(w.Alias) - if !ok { - w.l.Errorf("port history not found for %s", w.Alias) - http.Error(rw, "internal server error", http.StatusInternalServerError) - return - } - rp.TargetURL.Host = fmt.Sprintf("%s:%v", rp.TargetURL.Hostname(), port) - } - orig(rw, r) - } return &Waker{ Watcher: w, client: &http.Client{ @@ -43,16 +31,67 @@ func NewWaker(w *Watcher, rp *gphttp.ReverseProxy) *Waker { } func (w *Waker) ServeHTTP(rw http.ResponseWriter, r *http.Request) { - w.wake(w.rp.ServeHTTP, rw, r) + shouldNext := w.wake(rw, r) + if !shouldNext { + return + } + w.rp.ServeHTTP(rw, r) } -func (w *Waker) wake(next http.HandlerFunc, rw http.ResponseWriter, r *http.Request) { +/* HealthMonitor interface */ + +func (w *Waker) Start() {} + +func (w *Waker) Stop() { + w.Unregister() +} + +func (w *Waker) UpdateConfig(config health.HealthCheckConfig) { + panic("use idlewatcher.Register instead") +} + +func (w *Waker) Name() string { + return w.String() +} + +func (w *Waker) String() string { + return string(w.Alias) +} + +func (w *Waker) Status() health.Status { + if w.ready.Load() { + return health.StatusHealthy + } + if !w.ContainerRunning { + return health.StatusNapping + } + return health.StatusStarting +} + +func (w *Waker) Uptime() time.Duration { + return 0 +} + +func (w *Waker) MarshalJSON() ([]byte, error) { + return json.Marshal(map[string]any{ + "name": w.Name(), + "url": w.URL, + "status": w.Status(), + "config": health.HealthCheckConfig{ + Interval: w.IdleTimeout, + Timeout: w.WakeTimeout, + }, + }) +} + +/* End of HealthMonitor interface */ + +func (w *Waker) wake(rw http.ResponseWriter, r *http.Request) (shouldNext bool) { w.resetIdleTimer() // pass through if container is ready if w.ready.Load() { - next(rw, r) - return + return true } ctx, cancel := context.WithTimeout(r.Context(), w.WakeTimeout) @@ -89,10 +128,9 @@ func (w *Waker) wake(next http.HandlerFunc, rw http.ResponseWriter, r *http.Requ if w.ready.Load() { if isCheckRedirect { rw.WriteHeader(http.StatusOK) - } else { - next(rw, r) + return } - return + return true } for { @@ -121,10 +159,10 @@ func (w *Waker) wake(next http.HandlerFunc, rw http.ResponseWriter, r *http.Requ w.l.Debug("awaken") if isCheckRedirect { rw.WriteHeader(http.StatusOK) - } else { - next(rw, r) + return } - return + logrus.Infof("container %s is ready, passing through to %s", w.Alias, w.rp.TargetURL) + return true } // retry until the container is ready or timeout diff --git a/internal/net/http/loadbalancer/ip_hash.go b/internal/net/http/loadbalancer/ip_hash.go index a48caf7..da32778 100644 --- a/internal/net/http/loadbalancer/ip_hash.go +++ b/internal/net/http/loadbalancer/ip_hash.go @@ -45,10 +45,10 @@ func (impl ipHash) serveHTTP(rw http.ResponseWriter, r *http.Request) { return } idx := hashIP(ip) % uint32(len(impl.pool)) - if !impl.pool[idx].IsHealthy() { + if impl.pool[idx].Status().Bad() { http.Error(rw, "Service unavailable", http.StatusServiceUnavailable) } - impl.pool[idx].handler.ServeHTTP(rw, r) + impl.pool[idx].ServeHTTP(rw, r) } func hashIP(ip string) uint32 { diff --git a/internal/net/http/loadbalancer/least_conn.go b/internal/net/http/loadbalancer/least_conn.go index 3c1b872..2ca9794 100644 --- a/internal/net/http/loadbalancer/least_conn.go +++ b/internal/net/http/loadbalancer/least_conn.go @@ -48,6 +48,6 @@ func (impl *leastConn) ServeHTTP(srvs servers, rw http.ResponseWriter, r *http.R } minConn.Add(1) - srv.handler.ServeHTTP(rw, r) + srv.ServeHTTP(rw, r) minConn.Add(-1) } diff --git a/internal/net/http/loadbalancer/loadbalancer.go b/internal/net/http/loadbalancer/loadbalancer.go index c6b82a3..3a86bfd 100644 --- a/internal/net/http/loadbalancer/loadbalancer.go +++ b/internal/net/http/loadbalancer/loadbalancer.go @@ -3,6 +3,7 @@ package loadbalancer import ( "net/http" "sync" + "time" "github.com/go-acme/lego/v4/log" E "github.com/yusing/go-proxy/internal/error" @@ -25,12 +26,13 @@ type ( } LoadBalancer struct { impl - Config + *Config pool servers poolMu sync.Mutex sumWeight weightType + startTime time.Time } weightType uint16 @@ -38,7 +40,7 @@ type ( const maxWeight weightType = 100 -func New(cfg Config) *LoadBalancer { +func New(cfg *Config) *LoadBalancer { lb := &LoadBalancer{Config: cfg, pool: servers{}} mode := cfg.Mode if !cfg.Mode.ValidateUpdate() { @@ -167,6 +169,8 @@ func (lb *LoadBalancer) Start() { if lb.sumWeight != 0 { log.Warnf("weighted mode not supported yet") } + + lb.startTime = time.Now() logger.Debugf("loadbalancer %s started", lb.Link) } @@ -178,15 +182,20 @@ func (lb *LoadBalancer) Stop() { logger.Debugf("loadbalancer %s stopped", lb.Link) } +func (lb *LoadBalancer) Uptime() time.Duration { + return time.Since(lb.startTime) +} + func (lb *LoadBalancer) availServers() servers { lb.poolMu.Lock() defer lb.poolMu.Unlock() avail := make(servers, 0, len(lb.pool)) for _, s := range lb.pool { - if s.IsHealthy() { - avail = append(avail, s) + if s.Status().Bad() { + continue } + avail = append(avail, s) } return avail } diff --git a/internal/net/http/loadbalancer/loadbalancer_test.go b/internal/net/http/loadbalancer/loadbalancer_test.go index 4b5f9ec..7b1a043 100644 --- a/internal/net/http/loadbalancer/loadbalancer_test.go +++ b/internal/net/http/loadbalancer/loadbalancer_test.go @@ -9,7 +9,7 @@ import ( func TestRebalance(t *testing.T) { t.Parallel() t.Run("zero", func(t *testing.T) { - lb := New(Config{}) + lb := New(new(Config)) for range 10 { lb.AddServer(&Server{}) } @@ -17,7 +17,7 @@ func TestRebalance(t *testing.T) { ExpectEqual(t, lb.sumWeight, maxWeight) }) t.Run("less", func(t *testing.T) { - lb := New(Config{}) + lb := New(new(Config)) lb.AddServer(&Server{Weight: weightType(float64(maxWeight) * .1)}) lb.AddServer(&Server{Weight: weightType(float64(maxWeight) * .2)}) lb.AddServer(&Server{Weight: weightType(float64(maxWeight) * .3)}) @@ -28,7 +28,7 @@ func TestRebalance(t *testing.T) { ExpectEqual(t, lb.sumWeight, maxWeight) }) t.Run("more", func(t *testing.T) { - lb := New(Config{}) + lb := New(new(Config)) lb.AddServer(&Server{Weight: weightType(float64(maxWeight) * .1)}) lb.AddServer(&Server{Weight: weightType(float64(maxWeight) * .2)}) lb.AddServer(&Server{Weight: weightType(float64(maxWeight) * .3)}) diff --git a/internal/net/http/loadbalancer/round_robin.go b/internal/net/http/loadbalancer/round_robin.go index 3db994a..557d4e3 100644 --- a/internal/net/http/loadbalancer/round_robin.go +++ b/internal/net/http/loadbalancer/round_robin.go @@ -15,7 +15,7 @@ func (lb *roundRobin) OnRemoveServer(srv *Server) {} func (lb *roundRobin) ServeHTTP(srvs servers, rw http.ResponseWriter, r *http.Request) { index := lb.index.Add(1) - srvs[index%uint32(len(srvs))].handler.ServeHTTP(rw, r) + srvs[index%uint32(len(srvs))].ServeHTTP(rw, r) if lb.index.Load() >= 2*uint32(len(srvs)) { lb.index.Store(0) } diff --git a/internal/net/http/loadbalancer/server.go b/internal/net/http/loadbalancer/server.go index 8376a52..45a02d3 100644 --- a/internal/net/http/loadbalancer/server.go +++ b/internal/net/http/loadbalancer/server.go @@ -2,6 +2,7 @@ package loadbalancer import ( "net/http" + "time" "github.com/yusing/go-proxy/internal/net/types" U "github.com/yusing/go-proxy/internal/utils" @@ -33,10 +34,18 @@ func NewServer(name string, url types.URL, weight weightType, handler http.Handl return srv } +func (srv *Server) ServeHTTP(rw http.ResponseWriter, r *http.Request) { + srv.handler.ServeHTTP(rw, r) +} + func (srv *Server) String() string { return srv.Name } -func (srv *Server) IsHealthy() bool { - return srv.healthMon.IsHealthy() +func (srv *Server) Status() health.Status { + return srv.healthMon.Status() +} + +func (srv *Server) Uptime() time.Duration { + return srv.healthMon.Uptime() } diff --git a/internal/proxy/entry.go b/internal/proxy/entry.go index d3ce683..97bbb12 100644 --- a/internal/proxy/entry.go +++ b/internal/proxy/entry.go @@ -16,37 +16,41 @@ import ( type ( ReverseProxyEntry struct { // real model after validation - Alias T.Alias `json:"alias"` - Scheme T.Scheme `json:"scheme"` - URL net.URL `json:"url"` - NoTLSVerify bool `json:"no_tls_verify"` - PathPatterns T.PathPatterns `json:"path_patterns"` - HealthCheck health.HealthCheckConfig `json:"healthcheck"` - LoadBalance loadbalancer.Config `json:"load_balance"` - Middlewares D.NestedLabelMap `json:"middlewares"` + Alias T.Alias `json:"alias"` + Scheme T.Scheme `json:"scheme"` + URL net.URL `json:"url"` + NoTLSVerify bool `json:"no_tls_verify,omitempty"` + PathPatterns T.PathPatterns `json:"path_patterns"` + HealthCheck *health.HealthCheckConfig `json:"healthcheck"` + LoadBalance *loadbalancer.Config `json:"load_balance,omitempty"` + Middlewares D.NestedLabelMap `json:"middlewares,omitempty"` /* Docker only */ IdleTimeout time.Duration `json:"idle_timeout"` WakeTimeout time.Duration `json:"wake_timeout"` StopMethod T.StopMethod `json:"stop_method"` StopTimeout int `json:"stop_timeout"` - StopSignal T.Signal `json:"stop_signal"` + StopSignal T.Signal `json:"stop_signal,omitempty"` DockerHost string `json:"docker_host"` ContainerName string `json:"container_name"` ContainerID string `json:"container_id"` ContainerRunning bool `json:"container_running"` } StreamEntry struct { - Alias T.Alias `json:"alias"` - Scheme T.StreamScheme `json:"scheme"` - Host T.Host `json:"host"` - Port T.StreamPort `json:"port"` - Healthcheck health.HealthCheckConfig `json:"healthcheck"` + Alias T.Alias `json:"alias"` + Scheme T.StreamScheme `json:"scheme"` + Host T.Host `json:"host"` + Port T.StreamPort `json:"port"` + Healthcheck *health.HealthCheckConfig `json:"healthcheck"` } ) func (rp *ReverseProxyEntry) UseIdleWatcher() bool { - return rp.IdleTimeout > 0 && rp.DockerHost != "" + return rp.IdleTimeout > 0 && rp.IsDocker() +} + +func (rp *ReverseProxyEntry) UseLoadBalance() bool { + return rp.LoadBalance.Link != "" } func (rp *ReverseProxyEntry) IsDocker() bool { @@ -57,6 +61,10 @@ func (rp *ReverseProxyEntry) IsZeroPort() bool { return rp.URL.Port() == "0" } +func (rp *ReverseProxyEntry) ShouldNotServe() bool { + return rp.IsZeroPort() && !rp.UseIdleWatcher() +} + func ValidateEntry(m *types.RawEntry) (any, E.NestedError) { m.FillMissingFields() @@ -120,8 +128,8 @@ func validateRPEntry(m *types.RawEntry, s T.Scheme, b E.Builder) *ReverseProxyEn URL: net.NewURL(url), NoTLSVerify: m.NoTLSVerify, PathPatterns: pathPatterns, - HealthCheck: m.HealthCheck, - LoadBalance: m.LoadBalance, + HealthCheck: &m.HealthCheck, + LoadBalance: &m.LoadBalance, Middlewares: m.Middlewares, IdleTimeout: idleTimeout, WakeTimeout: wakeTimeout, @@ -154,6 +162,6 @@ func validateStreamEntry(m *types.RawEntry, b E.Builder) *StreamEntry { Scheme: *scheme, Host: host, Port: port, - Healthcheck: m.HealthCheck, + Healthcheck: &m.HealthCheck, } } diff --git a/internal/route/http.go b/internal/route/http.go index 8654135..904b8cc 100755 --- a/internal/route/http.go +++ b/internal/route/http.go @@ -24,13 +24,14 @@ import ( type ( HTTPRoute struct { - *P.ReverseProxyEntry - LoadBalancer *loadbalancer.LoadBalancer `json:"load_balancer"` + *P.ReverseProxyEntry `json:"entry"` - healthMon health.HealthMonitor - server *loadbalancer.Server - handler http.Handler - rp *gphttp.ReverseProxy + LoadBalancer *loadbalancer.LoadBalancer `json:"load_balancer,omitempty"` + HealthMon health.HealthMonitor `json:"health"` + + server *loadbalancer.Server + handler http.Handler + rp *gphttp.ReverseProxy } SubdomainKey = PT.Alias @@ -89,17 +90,6 @@ func NewHTTPRoute(entry *P.ReverseProxyEntry) (*HTTPRoute, E.NestedError) { ReverseProxyEntry: entry, rp: rp, } - if entry.LoadBalance.Link != "" && entry.HealthCheck.Disabled { - logrus.Warnf("%s.healthCheck.disabled cannot be false when loadbalancer is enabled", entry.Alias) - entry.HealthCheck.Disabled = true - } - if !entry.HealthCheck.Disabled { - r.healthMon = health.NewHTTPHealthMonitor( - common.GlobalTask("Reverse proxy "+r.String()), - entry.URL, - entry.HealthCheck, - ) - } return r, nil } @@ -116,19 +106,30 @@ func (r *HTTPRoute) Start() E.NestedError { return nil } + if r.ShouldNotServe() { + return nil + } + httpRoutesMu.Lock() defer httpRoutesMu.Unlock() + if r.HealthCheck.Disabled && (r.UseIdleWatcher() || r.UseLoadBalance()) { + logrus.Warnf("%s.healthCheck.disabled cannot be false when loadbalancer or idlewatcher is enabled", r.Alias) + r.HealthCheck.Disabled = true + } + switch { case r.UseIdleWatcher(): watcher, err := idlewatcher.Register(r.ReverseProxyEntry) if err != nil { return err } - r.handler = idlewatcher.NewWaker(watcher, r.rp) - case r.IsZeroPort() || - r.IsDocker() && !r.ContainerRunning: - return nil + waker := idlewatcher.NewWaker(watcher, r.rp) + r.handler = waker + r.HealthMon = waker + case !r.HealthCheck.Disabled: + r.HealthMon = health.NewHTTPHealthMonitor(common.GlobalTask("Reverse proxy "+r.String()), r.URL(), r.HealthCheck) + fallthrough case len(r.PathPatterns) == 1 && r.PathPatterns[0] == "/": r.handler = ReverseProxyHandler{r.rp} default: @@ -139,14 +140,14 @@ func (r *HTTPRoute) Start() E.NestedError { r.handler = mux } - if r.LoadBalance.Link == "" { - httpRoutes.Store(string(r.Alias), r) - } else { + if r.UseLoadBalance() { r.addToLoadBalancer() + } else { + httpRoutes.Store(string(r.Alias), r) } - if r.healthMon != nil { - r.healthMon.Start() + if r.HealthMon != nil { + r.HealthMon.Start() } return nil } @@ -159,25 +160,15 @@ func (r *HTTPRoute) Stop() (_ E.NestedError) { httpRoutesMu.Lock() defer httpRoutesMu.Unlock() - if waker, ok := r.handler.(*idlewatcher.Waker); ok { - waker.Unregister() - } - - if r.server != nil { - linked, ok := httpRoutes.Load(r.LoadBalance.Link) - if ok { - linked.LoadBalancer.RemoveServer(r.server) - } - if linked.LoadBalancer.IsEmpty() { - httpRoutes.Delete(r.LoadBalance.Link) - } - r.server = nil + if r.LoadBalancer != nil { + r.removeFromLoadBalancer() } else { httpRoutes.Delete(string(r.Alias)) } - if r.healthMon != nil { - r.healthMon.Stop() + if r.HealthMon != nil { + r.HealthMon.Stop() + r.HealthMon = nil } r.handler = nil @@ -203,10 +194,21 @@ func (r *HTTPRoute) addToLoadBalancer() { } httpRoutes.Store(r.LoadBalance.Link, linked) } - r.server = loadbalancer.NewServer(string(r.Alias), r.rp.TargetURL, r.LoadBalance.Weight, r.handler, r.healthMon) + r.LoadBalancer = lb + r.server = loadbalancer.NewServer(string(r.Alias), r.rp.TargetURL, r.LoadBalance.Weight, r.handler, r.HealthMon) lb.AddServer(r.server) } +func (r *HTTPRoute) removeFromLoadBalancer() { + r.LoadBalancer.RemoveServer(r.server) + if r.LoadBalancer.IsEmpty() { + httpRoutes.Delete(r.LoadBalance.Link) + logrus.Debugf("loadbalancer %q removed from route table", r.LoadBalance.Link) + } + r.server = nil + r.LoadBalancer = nil +} + func ProxyHandler(w http.ResponseWriter, r *http.Request) { mux, err := findMuxFunc(r.Host) // Why use StatusNotFound instead of StatusBadRequest or StatusBadGateway? diff --git a/internal/utils/atomic.go b/internal/utils/atomic.go new file mode 100644 index 0000000..a2c37b3 --- /dev/null +++ b/internal/utils/atomic.go @@ -0,0 +1,30 @@ +package utils + +import ( + "encoding/json" + "sync/atomic" +) + +type AtomicValue[T any] struct { + atomic.Value +} + +func (a *AtomicValue[T]) Load() T { + return a.Value.Load().(T) +} + +func (a *AtomicValue[T]) Store(v T) { + a.Value.Store(v) +} + +func (a *AtomicValue[T]) Swap(v T) T { + return a.Value.Swap(v).(T) +} + +func (a *AtomicValue[T]) CompareAndSwap(oldV, newV T) bool { + return a.Value.CompareAndSwap(oldV, newV) +} + +func (a *AtomicValue[T]) MarshalJSON() ([]byte, error) { + return json.Marshal(a.Load()) +} diff --git a/internal/utils/functional/map_utils.go b/internal/utils/functional/map_utils.go deleted file mode 100644 index 41e94da..0000000 --- a/internal/utils/functional/map_utils.go +++ /dev/null @@ -1,8 +0,0 @@ -package functional - -func FirstValueOf[KT comparable, VT any](m map[KT]VT) (_ VT, ok bool) { - for _, v := range m { - return v, true - } - return -} diff --git a/internal/utils/serialization.go b/internal/utils/serialization.go index 577b69c..f35823a 100644 --- a/internal/utils/serialization.go +++ b/internal/utils/serialization.go @@ -98,6 +98,14 @@ func Serialize(data any) (SerializedObject, E.NestedError) { if jsonTag == "-" { continue // Ignore this field if the tag is "-" } + if strings.Contains(jsonTag, ",omitempty") { + if field.Type.Kind() == reflect.Ptr && value.Field(i).IsNil() { + continue + } + if value.Field(i).IsZero() { + continue + } + } // If the json tag is not empty, use it as the key switch { diff --git a/internal/watcher/docker_watcher.go b/internal/watcher/docker_watcher.go index 07b03c1..32c0c15 100644 --- a/internal/watcher/docker_watcher.go +++ b/internal/watcher/docker_watcher.go @@ -43,10 +43,11 @@ func DockerrFilterContainer(nameOrID string) filters.KeyValuePair { func NewDockerWatcher(host string) DockerWatcher { return DockerWatcher{ - host: host, FieldLogger: (logrus. WithField("module", "docker_watcher"). - WithField("host", host))} + WithField("host", host)), + host: host, + } } func NewDockerWatcherWithClient(client D.Client) DockerWatcher { @@ -65,8 +66,6 @@ func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerList eventCh := make(chan Event) errCh := make(chan E.NestedError) - eventsCtx, eventsCancel := context.WithCancel(ctx) - go func() { defer close(eventCh) defer close(errCh) @@ -100,7 +99,7 @@ func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerList w.Debugf("client connected") - cEventCh, cErrCh := w.client.Events(eventsCtx, options) + cEventCh, cErrCh := w.client.Events(ctx, options) w.Debugf("watcher started") @@ -134,9 +133,7 @@ func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerList case <-ctx.Done(): return default: - eventsCancel() time.Sleep(dockerWatcherRetryInterval) - eventsCtx, eventsCancel = context.WithCancel(ctx) cEventCh, cErrCh = w.client.Events(ctx, options) } } @@ -149,6 +146,6 @@ func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerList var optionsWatchAll = DockerListOptions{Filters: NewDockerFilter( DockerFilterContainer, DockerFilterStart, - DockerFilterStop, + // DockerFilterStop, DockerFilterDie, )} diff --git a/internal/watcher/health/healthcheck_config.go b/internal/watcher/health/healthcheck_config.go index 86a512e..31e0043 100644 --- a/internal/watcher/health/healthcheck_config.go +++ b/internal/watcher/health/healthcheck_config.go @@ -8,8 +8,8 @@ import ( type HealthCheckConfig struct { Disabled bool `json:"disabled" yaml:"disabled"` - Path string `json:"path" yaml:"path"` - UseGet bool `json:"use_get" yaml:"use_get"` + Path string `json:"path,omitempty" yaml:"path"` + UseGet bool `json:"use_get,omitempty" yaml:"use_get"` Interval time.Duration `json:"interval" yaml:"interval"` Timeout time.Duration `json:"timeout" yaml:"timeout"` } diff --git a/internal/watcher/health/http.go b/internal/watcher/health/http.go index c4cd267..2c490e8 100644 --- a/internal/watcher/health/http.go +++ b/internal/watcher/health/http.go @@ -15,9 +15,9 @@ type HTTPHealthMonitor struct { pinger *http.Client } -func NewHTTPHealthMonitor(task common.Task, url types.URL, config HealthCheckConfig) HealthMonitor { +func NewHTTPHealthMonitor(task common.Task, url types.URL, config *HealthCheckConfig) HealthMonitor { mon := new(HTTPHealthMonitor) - mon.monitor = newMonitor(task, url, &config, mon.checkHealth) + mon.monitor = newMonitor(task, url, config, mon.checkHealth) mon.pinger = &http.Client{Timeout: config.Timeout} if config.UseGet { mon.method = http.MethodGet @@ -31,7 +31,7 @@ func (mon *HTTPHealthMonitor) checkHealth() (healthy bool, detail string, err er req, reqErr := http.NewRequestWithContext( mon.task.Context(), mon.method, - mon.URL.String(), + mon.url.JoinPath(mon.config.Path).String(), nil, ) if reqErr != nil { diff --git a/internal/watcher/health/monitor.go b/internal/watcher/health/monitor.go index 56b093d..ab62dde 100644 --- a/internal/watcher/health/monitor.go +++ b/internal/watcher/health/monitor.go @@ -2,13 +2,14 @@ package health import ( "context" + "encoding/json" "errors" "sync" - "sync/atomic" "time" "github.com/yusing/go-proxy/internal/common" "github.com/yusing/go-proxy/internal/net/types" + U "github.com/yusing/go-proxy/internal/utils" F "github.com/yusing/go-proxy/internal/utils/functional" ) @@ -16,17 +17,20 @@ type ( HealthMonitor interface { Start() Stop() - IsHealthy() bool + Status() Status + Uptime() time.Duration + Name() string String() string + MarshalJSON() ([]byte, error) } HealthCheckFunc func() (healthy bool, detail string, err error) monitor struct { - Name string - URL types.URL - Interval time.Duration + config *HealthCheckConfig + url types.URL - healthy atomic.Bool + status U.AtomicValue[Status] checkHealth HealthCheckFunc + startTime time.Time task common.Task cancel context.CancelFunc @@ -41,29 +45,29 @@ var monMap = F.NewMapOf[string, HealthMonitor]() func newMonitor(task common.Task, url types.URL, config *HealthCheckConfig, healthCheckFunc HealthCheckFunc) *monitor { task, cancel := task.SubtaskWithCancel("Health monitor for %s", task.Name()) mon := &monitor{ - Name: task.Name(), - URL: url.JoinPath(config.Path), - Interval: config.Interval, + config: config, + url: url, checkHealth: healthCheckFunc, + startTime: time.Now(), task: task, cancel: cancel, done: make(chan struct{}), } - mon.healthy.Store(true) + mon.status.Store(StatusHealthy) return mon } -func IsHealthy(name string) (healthy bool, ok bool) { +func Inspect(name string) (status Status, ok bool) { mon, ok := monMap.Load(name) if !ok { return } - return mon.IsHealthy(), true + return mon.Status(), true } func (mon *monitor) Start() { - defer monMap.Store(mon.Name, mon) - defer logger.Debugf("%s health monitor started", mon) + defer monMap.Store(mon.task.Name(), mon) + defer logger.Debugf("%s health monitor started", mon.String()) go func() { defer close(mon.done) @@ -74,7 +78,7 @@ func (mon *monitor) Start() { return } - ticker := time.NewTicker(mon.Interval) + ticker := time.NewTicker(mon.config.Interval) defer ticker.Stop() for { @@ -89,13 +93,13 @@ func (mon *monitor) Start() { } } }() - logger.Debugf("health monitor %q started", mon) + logger.Debugf("health monitor %q started", mon.String()) } func (mon *monitor) Stop() { - defer logger.Debugf("%s health monitor stopped", mon) + defer logger.Debugf("%s health monitor stopped", mon.String()) - monMap.Delete(mon.Name) + monMap.Delete(mon.task.Name()) mon.mu.Lock() defer mon.mu.Unlock() @@ -108,31 +112,57 @@ func (mon *monitor) Stop() { <-mon.done mon.cancel = nil + mon.status.Store(StatusUnknown) } -func (mon *monitor) IsHealthy() bool { - return mon.healthy.Load() +func (mon *monitor) Status() Status { + return mon.status.Load() +} + +func (mon *monitor) Uptime() time.Duration { + return time.Since(mon.startTime) +} + +func (mon *monitor) Name() string { + return mon.task.Name() } func (mon *monitor) String() string { - return mon.Name + return mon.Name() +} + +func (mon *monitor) MarshalJSON() ([]byte, error) { + return json.Marshal(map[string]any{ + "name": mon.Name(), + "url": mon.url, + "status": mon.status.Load(), + "uptime": mon.Uptime().String(), + "started": mon.startTime.Unix(), + "config": mon.config, + }) } func (mon *monitor) checkUpdateHealth() (hasError bool) { healthy, detail, err := mon.checkHealth() if err != nil { - mon.healthy.Store(false) + mon.status.Store(StatusError) if !errors.Is(err, context.Canceled) { - logger.Errorf("server %q failed to check health: %s", mon, err) + logger.Errorf("%s failed to check health: %s", mon.String(), err) } mon.Stop() return false } - if healthy != mon.healthy.Swap(healthy) { + var status Status + if healthy { + status = StatusHealthy + } else { + status = StatusUnhealthy + } + if healthy != (mon.status.Swap(status) == StatusHealthy) { if healthy { - logger.Infof("server %q is up", mon) + logger.Infof("%s is up", mon.String()) } else { - logger.Warnf("server %q is down: %s", mon, detail) + logger.Warnf("%s is down: %s", mon.String(), detail) } } diff --git a/internal/watcher/health/raw.go b/internal/watcher/health/raw.go index e3fb447..b45d4d8 100644 --- a/internal/watcher/health/raw.go +++ b/internal/watcher/health/raw.go @@ -14,9 +14,9 @@ type ( } ) -func NewRawHealthMonitor(task common.Task, url types.URL, config HealthCheckConfig) HealthMonitor { +func NewRawHealthMonitor(task common.Task, url types.URL, config *HealthCheckConfig) HealthMonitor { mon := new(RawHealthMonitor) - mon.monitor = newMonitor(task, url, &config, mon.checkAvail) + mon.monitor = newMonitor(task, url, config, mon.checkAvail) mon.dialer = &net.Dialer{ Timeout: config.Timeout, FallbackDelay: -1, @@ -25,7 +25,7 @@ func NewRawHealthMonitor(task common.Task, url types.URL, config HealthCheckConf } func (mon *RawHealthMonitor) checkAvail() (avail bool, detail string, err error) { - conn, dialErr := mon.dialer.DialContext(mon.task.Context(), mon.URL.Scheme, mon.URL.Host) + conn, dialErr := mon.dialer.DialContext(mon.task.Context(), mon.url.Scheme, mon.url.Host) if dialErr != nil { detail = dialErr.Error() /* trunk-ignore(golangci-lint/nilerr) */ diff --git a/internal/watcher/health/status.go b/internal/watcher/health/status.go new file mode 100644 index 0000000..3c30c9d --- /dev/null +++ b/internal/watcher/health/status.go @@ -0,0 +1,48 @@ +package health + +import "encoding/json" + +type Status int + +const ( + StatusUnknown Status = (iota << 1) + + StatusHealthy + StatusNapping + StatusStarting + StatusUnhealthy + StatusError + + NumStatuses int = iota - 1 + + HealthyMask = StatusHealthy | StatusNapping | StatusStarting +) + +func (s Status) String() string { + switch s { + case StatusHealthy: + return "healthy" + case StatusUnhealthy: + return "unhealthy" + case StatusNapping: + return "napping" + case StatusStarting: + return "starting" + case StatusError: + return "error" + default: + return "unknown" + } +} + +func (s Status) MarshalJSON() ([]byte, error) { + return json.Marshal(s.String()) +} + +func (s Status) Good() bool { + return s&HealthyMask != 0 +} + +func (s Status) Bad() bool { + return s&HealthyMask == 0 +}