improved health check

This commit is contained in:
yusing 2024-10-14 10:02:53 +08:00
parent 99207ae606
commit f38b3abdbc
20 changed files with 323 additions and 155 deletions

View file

@ -15,14 +15,10 @@ func CheckHealth(cfg *config.Config, w http.ResponseWriter, r *http.Request) {
return return
} }
isHealthy, ok := health.IsHealthy(target) status, ok := health.Inspect(target)
if !ok { if !ok {
HandleErr(w, r, ErrNotFound("target", target), http.StatusNotFound) HandleErr(w, r, ErrNotFound("target", target), http.StatusNotFound)
return return
} }
if isHealthy { WriteBody(w, []byte(status.String()))
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusServiceUnavailable)
}
} }

View file

@ -14,9 +14,10 @@ import (
) )
type Client struct { type Client struct {
*client.Client
key string key string
refCount *atomic.Int32 refCount *atomic.Int32
*client.Client
l logrus.FieldLogger l logrus.FieldLogger
} }

View file

@ -2,12 +2,14 @@ package idlewatcher
import ( import (
"context" "context"
"fmt" "encoding/json"
"net/http" "net/http"
"strconv" "strconv"
"time" "time"
"github.com/sirupsen/logrus"
gphttp "github.com/yusing/go-proxy/internal/net/http" gphttp "github.com/yusing/go-proxy/internal/net/http"
"github.com/yusing/go-proxy/internal/watcher/health"
) )
type Waker struct { type Waker struct {
@ -18,20 +20,6 @@ type Waker struct {
} }
func NewWaker(w *Watcher, rp *gphttp.ReverseProxy) *Waker { func NewWaker(w *Watcher, rp *gphttp.ReverseProxy) *Waker {
orig := rp.ServeHTTP
// workaround for stopped containers port become zero
rp.ServeHTTP = func(rw http.ResponseWriter, r *http.Request) {
if rp.TargetURL.Port() == "0" {
port, ok := portHistoryMap.Load(w.Alias)
if !ok {
w.l.Errorf("port history not found for %s", w.Alias)
http.Error(rw, "internal server error", http.StatusInternalServerError)
return
}
rp.TargetURL.Host = fmt.Sprintf("%s:%v", rp.TargetURL.Hostname(), port)
}
orig(rw, r)
}
return &Waker{ return &Waker{
Watcher: w, Watcher: w,
client: &http.Client{ client: &http.Client{
@ -43,16 +31,67 @@ func NewWaker(w *Watcher, rp *gphttp.ReverseProxy) *Waker {
} }
func (w *Waker) ServeHTTP(rw http.ResponseWriter, r *http.Request) { func (w *Waker) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
w.wake(w.rp.ServeHTTP, rw, r) shouldNext := w.wake(rw, r)
if !shouldNext {
return
}
w.rp.ServeHTTP(rw, r)
} }
func (w *Waker) wake(next http.HandlerFunc, rw http.ResponseWriter, r *http.Request) { /* HealthMonitor interface */
func (w *Waker) Start() {}
func (w *Waker) Stop() {
w.Unregister()
}
func (w *Waker) UpdateConfig(config health.HealthCheckConfig) {
panic("use idlewatcher.Register instead")
}
func (w *Waker) Name() string {
return w.String()
}
func (w *Waker) String() string {
return string(w.Alias)
}
func (w *Waker) Status() health.Status {
if w.ready.Load() {
return health.StatusHealthy
}
if !w.ContainerRunning {
return health.StatusNapping
}
return health.StatusStarting
}
func (w *Waker) Uptime() time.Duration {
return 0
}
func (w *Waker) MarshalJSON() ([]byte, error) {
return json.Marshal(map[string]any{
"name": w.Name(),
"url": w.URL,
"status": w.Status(),
"config": health.HealthCheckConfig{
Interval: w.IdleTimeout,
Timeout: w.WakeTimeout,
},
})
}
/* End of HealthMonitor interface */
func (w *Waker) wake(rw http.ResponseWriter, r *http.Request) (shouldNext bool) {
w.resetIdleTimer() w.resetIdleTimer()
// pass through if container is ready // pass through if container is ready
if w.ready.Load() { if w.ready.Load() {
next(rw, r) return true
return
} }
ctx, cancel := context.WithTimeout(r.Context(), w.WakeTimeout) ctx, cancel := context.WithTimeout(r.Context(), w.WakeTimeout)
@ -89,10 +128,9 @@ func (w *Waker) wake(next http.HandlerFunc, rw http.ResponseWriter, r *http.Requ
if w.ready.Load() { if w.ready.Load() {
if isCheckRedirect { if isCheckRedirect {
rw.WriteHeader(http.StatusOK) rw.WriteHeader(http.StatusOK)
} else { return
next(rw, r)
} }
return return true
} }
for { for {
@ -121,10 +159,10 @@ func (w *Waker) wake(next http.HandlerFunc, rw http.ResponseWriter, r *http.Requ
w.l.Debug("awaken") w.l.Debug("awaken")
if isCheckRedirect { if isCheckRedirect {
rw.WriteHeader(http.StatusOK) rw.WriteHeader(http.StatusOK)
} else { return
next(rw, r)
} }
return logrus.Infof("container %s is ready, passing through to %s", w.Alias, w.rp.TargetURL)
return true
} }
// retry until the container is ready or timeout // retry until the container is ready or timeout

View file

@ -45,10 +45,10 @@ func (impl ipHash) serveHTTP(rw http.ResponseWriter, r *http.Request) {
return return
} }
idx := hashIP(ip) % uint32(len(impl.pool)) idx := hashIP(ip) % uint32(len(impl.pool))
if !impl.pool[idx].IsHealthy() { if impl.pool[idx].Status().Bad() {
http.Error(rw, "Service unavailable", http.StatusServiceUnavailable) http.Error(rw, "Service unavailable", http.StatusServiceUnavailable)
} }
impl.pool[idx].handler.ServeHTTP(rw, r) impl.pool[idx].ServeHTTP(rw, r)
} }
func hashIP(ip string) uint32 { func hashIP(ip string) uint32 {

View file

@ -48,6 +48,6 @@ func (impl *leastConn) ServeHTTP(srvs servers, rw http.ResponseWriter, r *http.R
} }
minConn.Add(1) minConn.Add(1)
srv.handler.ServeHTTP(rw, r) srv.ServeHTTP(rw, r)
minConn.Add(-1) minConn.Add(-1)
} }

View file

@ -3,6 +3,7 @@ package loadbalancer
import ( import (
"net/http" "net/http"
"sync" "sync"
"time"
"github.com/go-acme/lego/v4/log" "github.com/go-acme/lego/v4/log"
E "github.com/yusing/go-proxy/internal/error" E "github.com/yusing/go-proxy/internal/error"
@ -25,12 +26,13 @@ type (
} }
LoadBalancer struct { LoadBalancer struct {
impl impl
Config *Config
pool servers pool servers
poolMu sync.Mutex poolMu sync.Mutex
sumWeight weightType sumWeight weightType
startTime time.Time
} }
weightType uint16 weightType uint16
@ -38,7 +40,7 @@ type (
const maxWeight weightType = 100 const maxWeight weightType = 100
func New(cfg Config) *LoadBalancer { func New(cfg *Config) *LoadBalancer {
lb := &LoadBalancer{Config: cfg, pool: servers{}} lb := &LoadBalancer{Config: cfg, pool: servers{}}
mode := cfg.Mode mode := cfg.Mode
if !cfg.Mode.ValidateUpdate() { if !cfg.Mode.ValidateUpdate() {
@ -167,6 +169,8 @@ func (lb *LoadBalancer) Start() {
if lb.sumWeight != 0 { if lb.sumWeight != 0 {
log.Warnf("weighted mode not supported yet") log.Warnf("weighted mode not supported yet")
} }
lb.startTime = time.Now()
logger.Debugf("loadbalancer %s started", lb.Link) logger.Debugf("loadbalancer %s started", lb.Link)
} }
@ -178,15 +182,20 @@ func (lb *LoadBalancer) Stop() {
logger.Debugf("loadbalancer %s stopped", lb.Link) logger.Debugf("loadbalancer %s stopped", lb.Link)
} }
func (lb *LoadBalancer) Uptime() time.Duration {
return time.Since(lb.startTime)
}
func (lb *LoadBalancer) availServers() servers { func (lb *LoadBalancer) availServers() servers {
lb.poolMu.Lock() lb.poolMu.Lock()
defer lb.poolMu.Unlock() defer lb.poolMu.Unlock()
avail := make(servers, 0, len(lb.pool)) avail := make(servers, 0, len(lb.pool))
for _, s := range lb.pool { for _, s := range lb.pool {
if s.IsHealthy() { if s.Status().Bad() {
avail = append(avail, s) continue
} }
avail = append(avail, s)
} }
return avail return avail
} }

View file

@ -9,7 +9,7 @@ import (
func TestRebalance(t *testing.T) { func TestRebalance(t *testing.T) {
t.Parallel() t.Parallel()
t.Run("zero", func(t *testing.T) { t.Run("zero", func(t *testing.T) {
lb := New(Config{}) lb := New(new(Config))
for range 10 { for range 10 {
lb.AddServer(&Server{}) lb.AddServer(&Server{})
} }
@ -17,7 +17,7 @@ func TestRebalance(t *testing.T) {
ExpectEqual(t, lb.sumWeight, maxWeight) ExpectEqual(t, lb.sumWeight, maxWeight)
}) })
t.Run("less", func(t *testing.T) { t.Run("less", func(t *testing.T) {
lb := New(Config{}) lb := New(new(Config))
lb.AddServer(&Server{Weight: weightType(float64(maxWeight) * .1)}) lb.AddServer(&Server{Weight: weightType(float64(maxWeight) * .1)})
lb.AddServer(&Server{Weight: weightType(float64(maxWeight) * .2)}) lb.AddServer(&Server{Weight: weightType(float64(maxWeight) * .2)})
lb.AddServer(&Server{Weight: weightType(float64(maxWeight) * .3)}) lb.AddServer(&Server{Weight: weightType(float64(maxWeight) * .3)})
@ -28,7 +28,7 @@ func TestRebalance(t *testing.T) {
ExpectEqual(t, lb.sumWeight, maxWeight) ExpectEqual(t, lb.sumWeight, maxWeight)
}) })
t.Run("more", func(t *testing.T) { t.Run("more", func(t *testing.T) {
lb := New(Config{}) lb := New(new(Config))
lb.AddServer(&Server{Weight: weightType(float64(maxWeight) * .1)}) lb.AddServer(&Server{Weight: weightType(float64(maxWeight) * .1)})
lb.AddServer(&Server{Weight: weightType(float64(maxWeight) * .2)}) lb.AddServer(&Server{Weight: weightType(float64(maxWeight) * .2)})
lb.AddServer(&Server{Weight: weightType(float64(maxWeight) * .3)}) lb.AddServer(&Server{Weight: weightType(float64(maxWeight) * .3)})

View file

@ -15,7 +15,7 @@ func (lb *roundRobin) OnRemoveServer(srv *Server) {}
func (lb *roundRobin) ServeHTTP(srvs servers, rw http.ResponseWriter, r *http.Request) { func (lb *roundRobin) ServeHTTP(srvs servers, rw http.ResponseWriter, r *http.Request) {
index := lb.index.Add(1) index := lb.index.Add(1)
srvs[index%uint32(len(srvs))].handler.ServeHTTP(rw, r) srvs[index%uint32(len(srvs))].ServeHTTP(rw, r)
if lb.index.Load() >= 2*uint32(len(srvs)) { if lb.index.Load() >= 2*uint32(len(srvs)) {
lb.index.Store(0) lb.index.Store(0)
} }

View file

@ -2,6 +2,7 @@ package loadbalancer
import ( import (
"net/http" "net/http"
"time"
"github.com/yusing/go-proxy/internal/net/types" "github.com/yusing/go-proxy/internal/net/types"
U "github.com/yusing/go-proxy/internal/utils" U "github.com/yusing/go-proxy/internal/utils"
@ -33,10 +34,18 @@ func NewServer(name string, url types.URL, weight weightType, handler http.Handl
return srv return srv
} }
func (srv *Server) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
srv.handler.ServeHTTP(rw, r)
}
func (srv *Server) String() string { func (srv *Server) String() string {
return srv.Name return srv.Name
} }
func (srv *Server) IsHealthy() bool { func (srv *Server) Status() health.Status {
return srv.healthMon.IsHealthy() return srv.healthMon.Status()
}
func (srv *Server) Uptime() time.Duration {
return srv.healthMon.Uptime()
} }

View file

@ -16,37 +16,41 @@ import (
type ( type (
ReverseProxyEntry struct { // real model after validation ReverseProxyEntry struct { // real model after validation
Alias T.Alias `json:"alias"` Alias T.Alias `json:"alias"`
Scheme T.Scheme `json:"scheme"` Scheme T.Scheme `json:"scheme"`
URL net.URL `json:"url"` URL net.URL `json:"url"`
NoTLSVerify bool `json:"no_tls_verify"` NoTLSVerify bool `json:"no_tls_verify,omitempty"`
PathPatterns T.PathPatterns `json:"path_patterns"` PathPatterns T.PathPatterns `json:"path_patterns"`
HealthCheck health.HealthCheckConfig `json:"healthcheck"` HealthCheck *health.HealthCheckConfig `json:"healthcheck"`
LoadBalance loadbalancer.Config `json:"load_balance"` LoadBalance *loadbalancer.Config `json:"load_balance,omitempty"`
Middlewares D.NestedLabelMap `json:"middlewares"` Middlewares D.NestedLabelMap `json:"middlewares,omitempty"`
/* Docker only */ /* Docker only */
IdleTimeout time.Duration `json:"idle_timeout"` IdleTimeout time.Duration `json:"idle_timeout"`
WakeTimeout time.Duration `json:"wake_timeout"` WakeTimeout time.Duration `json:"wake_timeout"`
StopMethod T.StopMethod `json:"stop_method"` StopMethod T.StopMethod `json:"stop_method"`
StopTimeout int `json:"stop_timeout"` StopTimeout int `json:"stop_timeout"`
StopSignal T.Signal `json:"stop_signal"` StopSignal T.Signal `json:"stop_signal,omitempty"`
DockerHost string `json:"docker_host"` DockerHost string `json:"docker_host"`
ContainerName string `json:"container_name"` ContainerName string `json:"container_name"`
ContainerID string `json:"container_id"` ContainerID string `json:"container_id"`
ContainerRunning bool `json:"container_running"` ContainerRunning bool `json:"container_running"`
} }
StreamEntry struct { StreamEntry struct {
Alias T.Alias `json:"alias"` Alias T.Alias `json:"alias"`
Scheme T.StreamScheme `json:"scheme"` Scheme T.StreamScheme `json:"scheme"`
Host T.Host `json:"host"` Host T.Host `json:"host"`
Port T.StreamPort `json:"port"` Port T.StreamPort `json:"port"`
Healthcheck health.HealthCheckConfig `json:"healthcheck"` Healthcheck *health.HealthCheckConfig `json:"healthcheck"`
} }
) )
func (rp *ReverseProxyEntry) UseIdleWatcher() bool { func (rp *ReverseProxyEntry) UseIdleWatcher() bool {
return rp.IdleTimeout > 0 && rp.DockerHost != "" return rp.IdleTimeout > 0 && rp.IsDocker()
}
func (rp *ReverseProxyEntry) UseLoadBalance() bool {
return rp.LoadBalance.Link != ""
} }
func (rp *ReverseProxyEntry) IsDocker() bool { func (rp *ReverseProxyEntry) IsDocker() bool {
@ -57,6 +61,10 @@ func (rp *ReverseProxyEntry) IsZeroPort() bool {
return rp.URL.Port() == "0" return rp.URL.Port() == "0"
} }
func (rp *ReverseProxyEntry) ShouldNotServe() bool {
return rp.IsZeroPort() && !rp.UseIdleWatcher()
}
func ValidateEntry(m *types.RawEntry) (any, E.NestedError) { func ValidateEntry(m *types.RawEntry) (any, E.NestedError) {
m.FillMissingFields() m.FillMissingFields()
@ -120,8 +128,8 @@ func validateRPEntry(m *types.RawEntry, s T.Scheme, b E.Builder) *ReverseProxyEn
URL: net.NewURL(url), URL: net.NewURL(url),
NoTLSVerify: m.NoTLSVerify, NoTLSVerify: m.NoTLSVerify,
PathPatterns: pathPatterns, PathPatterns: pathPatterns,
HealthCheck: m.HealthCheck, HealthCheck: &m.HealthCheck,
LoadBalance: m.LoadBalance, LoadBalance: &m.LoadBalance,
Middlewares: m.Middlewares, Middlewares: m.Middlewares,
IdleTimeout: idleTimeout, IdleTimeout: idleTimeout,
WakeTimeout: wakeTimeout, WakeTimeout: wakeTimeout,
@ -154,6 +162,6 @@ func validateStreamEntry(m *types.RawEntry, b E.Builder) *StreamEntry {
Scheme: *scheme, Scheme: *scheme,
Host: host, Host: host,
Port: port, Port: port,
Healthcheck: m.HealthCheck, Healthcheck: &m.HealthCheck,
} }
} }

View file

@ -24,13 +24,14 @@ import (
type ( type (
HTTPRoute struct { HTTPRoute struct {
*P.ReverseProxyEntry *P.ReverseProxyEntry `json:"entry"`
LoadBalancer *loadbalancer.LoadBalancer `json:"load_balancer"`
healthMon health.HealthMonitor LoadBalancer *loadbalancer.LoadBalancer `json:"load_balancer,omitempty"`
server *loadbalancer.Server HealthMon health.HealthMonitor `json:"health"`
handler http.Handler
rp *gphttp.ReverseProxy server *loadbalancer.Server
handler http.Handler
rp *gphttp.ReverseProxy
} }
SubdomainKey = PT.Alias SubdomainKey = PT.Alias
@ -89,17 +90,6 @@ func NewHTTPRoute(entry *P.ReverseProxyEntry) (*HTTPRoute, E.NestedError) {
ReverseProxyEntry: entry, ReverseProxyEntry: entry,
rp: rp, rp: rp,
} }
if entry.LoadBalance.Link != "" && entry.HealthCheck.Disabled {
logrus.Warnf("%s.healthCheck.disabled cannot be false when loadbalancer is enabled", entry.Alias)
entry.HealthCheck.Disabled = true
}
if !entry.HealthCheck.Disabled {
r.healthMon = health.NewHTTPHealthMonitor(
common.GlobalTask("Reverse proxy "+r.String()),
entry.URL,
entry.HealthCheck,
)
}
return r, nil return r, nil
} }
@ -116,19 +106,30 @@ func (r *HTTPRoute) Start() E.NestedError {
return nil return nil
} }
if r.ShouldNotServe() {
return nil
}
httpRoutesMu.Lock() httpRoutesMu.Lock()
defer httpRoutesMu.Unlock() defer httpRoutesMu.Unlock()
if r.HealthCheck.Disabled && (r.UseIdleWatcher() || r.UseLoadBalance()) {
logrus.Warnf("%s.healthCheck.disabled cannot be false when loadbalancer or idlewatcher is enabled", r.Alias)
r.HealthCheck.Disabled = true
}
switch { switch {
case r.UseIdleWatcher(): case r.UseIdleWatcher():
watcher, err := idlewatcher.Register(r.ReverseProxyEntry) watcher, err := idlewatcher.Register(r.ReverseProxyEntry)
if err != nil { if err != nil {
return err return err
} }
r.handler = idlewatcher.NewWaker(watcher, r.rp) waker := idlewatcher.NewWaker(watcher, r.rp)
case r.IsZeroPort() || r.handler = waker
r.IsDocker() && !r.ContainerRunning: r.HealthMon = waker
return nil case !r.HealthCheck.Disabled:
r.HealthMon = health.NewHTTPHealthMonitor(common.GlobalTask("Reverse proxy "+r.String()), r.URL(), r.HealthCheck)
fallthrough
case len(r.PathPatterns) == 1 && r.PathPatterns[0] == "/": case len(r.PathPatterns) == 1 && r.PathPatterns[0] == "/":
r.handler = ReverseProxyHandler{r.rp} r.handler = ReverseProxyHandler{r.rp}
default: default:
@ -139,14 +140,14 @@ func (r *HTTPRoute) Start() E.NestedError {
r.handler = mux r.handler = mux
} }
if r.LoadBalance.Link == "" { if r.UseLoadBalance() {
httpRoutes.Store(string(r.Alias), r)
} else {
r.addToLoadBalancer() r.addToLoadBalancer()
} else {
httpRoutes.Store(string(r.Alias), r)
} }
if r.healthMon != nil { if r.HealthMon != nil {
r.healthMon.Start() r.HealthMon.Start()
} }
return nil return nil
} }
@ -159,25 +160,15 @@ func (r *HTTPRoute) Stop() (_ E.NestedError) {
httpRoutesMu.Lock() httpRoutesMu.Lock()
defer httpRoutesMu.Unlock() defer httpRoutesMu.Unlock()
if waker, ok := r.handler.(*idlewatcher.Waker); ok { if r.LoadBalancer != nil {
waker.Unregister() r.removeFromLoadBalancer()
}
if r.server != nil {
linked, ok := httpRoutes.Load(r.LoadBalance.Link)
if ok {
linked.LoadBalancer.RemoveServer(r.server)
}
if linked.LoadBalancer.IsEmpty() {
httpRoutes.Delete(r.LoadBalance.Link)
}
r.server = nil
} else { } else {
httpRoutes.Delete(string(r.Alias)) httpRoutes.Delete(string(r.Alias))
} }
if r.healthMon != nil { if r.HealthMon != nil {
r.healthMon.Stop() r.HealthMon.Stop()
r.HealthMon = nil
} }
r.handler = nil r.handler = nil
@ -203,10 +194,21 @@ func (r *HTTPRoute) addToLoadBalancer() {
} }
httpRoutes.Store(r.LoadBalance.Link, linked) httpRoutes.Store(r.LoadBalance.Link, linked)
} }
r.server = loadbalancer.NewServer(string(r.Alias), r.rp.TargetURL, r.LoadBalance.Weight, r.handler, r.healthMon) r.LoadBalancer = lb
r.server = loadbalancer.NewServer(string(r.Alias), r.rp.TargetURL, r.LoadBalance.Weight, r.handler, r.HealthMon)
lb.AddServer(r.server) lb.AddServer(r.server)
} }
func (r *HTTPRoute) removeFromLoadBalancer() {
r.LoadBalancer.RemoveServer(r.server)
if r.LoadBalancer.IsEmpty() {
httpRoutes.Delete(r.LoadBalance.Link)
logrus.Debugf("loadbalancer %q removed from route table", r.LoadBalance.Link)
}
r.server = nil
r.LoadBalancer = nil
}
func ProxyHandler(w http.ResponseWriter, r *http.Request) { func ProxyHandler(w http.ResponseWriter, r *http.Request) {
mux, err := findMuxFunc(r.Host) mux, err := findMuxFunc(r.Host)
// Why use StatusNotFound instead of StatusBadRequest or StatusBadGateway? // Why use StatusNotFound instead of StatusBadRequest or StatusBadGateway?

30
internal/utils/atomic.go Normal file
View file

@ -0,0 +1,30 @@
package utils
import (
"encoding/json"
"sync/atomic"
)
type AtomicValue[T any] struct {
atomic.Value
}
func (a *AtomicValue[T]) Load() T {
return a.Value.Load().(T)
}
func (a *AtomicValue[T]) Store(v T) {
a.Value.Store(v)
}
func (a *AtomicValue[T]) Swap(v T) T {
return a.Value.Swap(v).(T)
}
func (a *AtomicValue[T]) CompareAndSwap(oldV, newV T) bool {
return a.Value.CompareAndSwap(oldV, newV)
}
func (a *AtomicValue[T]) MarshalJSON() ([]byte, error) {
return json.Marshal(a.Load())
}

View file

@ -1,8 +0,0 @@
package functional
func FirstValueOf[KT comparable, VT any](m map[KT]VT) (_ VT, ok bool) {
for _, v := range m {
return v, true
}
return
}

View file

@ -98,6 +98,14 @@ func Serialize(data any) (SerializedObject, E.NestedError) {
if jsonTag == "-" { if jsonTag == "-" {
continue // Ignore this field if the tag is "-" continue // Ignore this field if the tag is "-"
} }
if strings.Contains(jsonTag, ",omitempty") {
if field.Type.Kind() == reflect.Ptr && value.Field(i).IsNil() {
continue
}
if value.Field(i).IsZero() {
continue
}
}
// If the json tag is not empty, use it as the key // If the json tag is not empty, use it as the key
switch { switch {

View file

@ -43,10 +43,11 @@ func DockerrFilterContainer(nameOrID string) filters.KeyValuePair {
func NewDockerWatcher(host string) DockerWatcher { func NewDockerWatcher(host string) DockerWatcher {
return DockerWatcher{ return DockerWatcher{
host: host,
FieldLogger: (logrus. FieldLogger: (logrus.
WithField("module", "docker_watcher"). WithField("module", "docker_watcher").
WithField("host", host))} WithField("host", host)),
host: host,
}
} }
func NewDockerWatcherWithClient(client D.Client) DockerWatcher { func NewDockerWatcherWithClient(client D.Client) DockerWatcher {
@ -65,8 +66,6 @@ func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerList
eventCh := make(chan Event) eventCh := make(chan Event)
errCh := make(chan E.NestedError) errCh := make(chan E.NestedError)
eventsCtx, eventsCancel := context.WithCancel(ctx)
go func() { go func() {
defer close(eventCh) defer close(eventCh)
defer close(errCh) defer close(errCh)
@ -100,7 +99,7 @@ func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerList
w.Debugf("client connected") w.Debugf("client connected")
cEventCh, cErrCh := w.client.Events(eventsCtx, options) cEventCh, cErrCh := w.client.Events(ctx, options)
w.Debugf("watcher started") w.Debugf("watcher started")
@ -134,9 +133,7 @@ func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerList
case <-ctx.Done(): case <-ctx.Done():
return return
default: default:
eventsCancel()
time.Sleep(dockerWatcherRetryInterval) time.Sleep(dockerWatcherRetryInterval)
eventsCtx, eventsCancel = context.WithCancel(ctx)
cEventCh, cErrCh = w.client.Events(ctx, options) cEventCh, cErrCh = w.client.Events(ctx, options)
} }
} }
@ -149,6 +146,6 @@ func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerList
var optionsWatchAll = DockerListOptions{Filters: NewDockerFilter( var optionsWatchAll = DockerListOptions{Filters: NewDockerFilter(
DockerFilterContainer, DockerFilterContainer,
DockerFilterStart, DockerFilterStart,
DockerFilterStop, // DockerFilterStop,
DockerFilterDie, DockerFilterDie,
)} )}

View file

@ -8,8 +8,8 @@ import (
type HealthCheckConfig struct { type HealthCheckConfig struct {
Disabled bool `json:"disabled" yaml:"disabled"` Disabled bool `json:"disabled" yaml:"disabled"`
Path string `json:"path" yaml:"path"` Path string `json:"path,omitempty" yaml:"path"`
UseGet bool `json:"use_get" yaml:"use_get"` UseGet bool `json:"use_get,omitempty" yaml:"use_get"`
Interval time.Duration `json:"interval" yaml:"interval"` Interval time.Duration `json:"interval" yaml:"interval"`
Timeout time.Duration `json:"timeout" yaml:"timeout"` Timeout time.Duration `json:"timeout" yaml:"timeout"`
} }

View file

@ -15,9 +15,9 @@ type HTTPHealthMonitor struct {
pinger *http.Client pinger *http.Client
} }
func NewHTTPHealthMonitor(task common.Task, url types.URL, config HealthCheckConfig) HealthMonitor { func NewHTTPHealthMonitor(task common.Task, url types.URL, config *HealthCheckConfig) HealthMonitor {
mon := new(HTTPHealthMonitor) mon := new(HTTPHealthMonitor)
mon.monitor = newMonitor(task, url, &config, mon.checkHealth) mon.monitor = newMonitor(task, url, config, mon.checkHealth)
mon.pinger = &http.Client{Timeout: config.Timeout} mon.pinger = &http.Client{Timeout: config.Timeout}
if config.UseGet { if config.UseGet {
mon.method = http.MethodGet mon.method = http.MethodGet
@ -31,7 +31,7 @@ func (mon *HTTPHealthMonitor) checkHealth() (healthy bool, detail string, err er
req, reqErr := http.NewRequestWithContext( req, reqErr := http.NewRequestWithContext(
mon.task.Context(), mon.task.Context(),
mon.method, mon.method,
mon.URL.String(), mon.url.JoinPath(mon.config.Path).String(),
nil, nil,
) )
if reqErr != nil { if reqErr != nil {

View file

@ -2,13 +2,14 @@ package health
import ( import (
"context" "context"
"encoding/json"
"errors" "errors"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/yusing/go-proxy/internal/common" "github.com/yusing/go-proxy/internal/common"
"github.com/yusing/go-proxy/internal/net/types" "github.com/yusing/go-proxy/internal/net/types"
U "github.com/yusing/go-proxy/internal/utils"
F "github.com/yusing/go-proxy/internal/utils/functional" F "github.com/yusing/go-proxy/internal/utils/functional"
) )
@ -16,17 +17,20 @@ type (
HealthMonitor interface { HealthMonitor interface {
Start() Start()
Stop() Stop()
IsHealthy() bool Status() Status
Uptime() time.Duration
Name() string
String() string String() string
MarshalJSON() ([]byte, error)
} }
HealthCheckFunc func() (healthy bool, detail string, err error) HealthCheckFunc func() (healthy bool, detail string, err error)
monitor struct { monitor struct {
Name string config *HealthCheckConfig
URL types.URL url types.URL
Interval time.Duration
healthy atomic.Bool status U.AtomicValue[Status]
checkHealth HealthCheckFunc checkHealth HealthCheckFunc
startTime time.Time
task common.Task task common.Task
cancel context.CancelFunc cancel context.CancelFunc
@ -41,29 +45,29 @@ var monMap = F.NewMapOf[string, HealthMonitor]()
func newMonitor(task common.Task, url types.URL, config *HealthCheckConfig, healthCheckFunc HealthCheckFunc) *monitor { func newMonitor(task common.Task, url types.URL, config *HealthCheckConfig, healthCheckFunc HealthCheckFunc) *monitor {
task, cancel := task.SubtaskWithCancel("Health monitor for %s", task.Name()) task, cancel := task.SubtaskWithCancel("Health monitor for %s", task.Name())
mon := &monitor{ mon := &monitor{
Name: task.Name(), config: config,
URL: url.JoinPath(config.Path), url: url,
Interval: config.Interval,
checkHealth: healthCheckFunc, checkHealth: healthCheckFunc,
startTime: time.Now(),
task: task, task: task,
cancel: cancel, cancel: cancel,
done: make(chan struct{}), done: make(chan struct{}),
} }
mon.healthy.Store(true) mon.status.Store(StatusHealthy)
return mon return mon
} }
func IsHealthy(name string) (healthy bool, ok bool) { func Inspect(name string) (status Status, ok bool) {
mon, ok := monMap.Load(name) mon, ok := monMap.Load(name)
if !ok { if !ok {
return return
} }
return mon.IsHealthy(), true return mon.Status(), true
} }
func (mon *monitor) Start() { func (mon *monitor) Start() {
defer monMap.Store(mon.Name, mon) defer monMap.Store(mon.task.Name(), mon)
defer logger.Debugf("%s health monitor started", mon) defer logger.Debugf("%s health monitor started", mon.String())
go func() { go func() {
defer close(mon.done) defer close(mon.done)
@ -74,7 +78,7 @@ func (mon *monitor) Start() {
return return
} }
ticker := time.NewTicker(mon.Interval) ticker := time.NewTicker(mon.config.Interval)
defer ticker.Stop() defer ticker.Stop()
for { for {
@ -89,13 +93,13 @@ func (mon *monitor) Start() {
} }
} }
}() }()
logger.Debugf("health monitor %q started", mon) logger.Debugf("health monitor %q started", mon.String())
} }
func (mon *monitor) Stop() { func (mon *monitor) Stop() {
defer logger.Debugf("%s health monitor stopped", mon) defer logger.Debugf("%s health monitor stopped", mon.String())
monMap.Delete(mon.Name) monMap.Delete(mon.task.Name())
mon.mu.Lock() mon.mu.Lock()
defer mon.mu.Unlock() defer mon.mu.Unlock()
@ -108,31 +112,57 @@ func (mon *monitor) Stop() {
<-mon.done <-mon.done
mon.cancel = nil mon.cancel = nil
mon.status.Store(StatusUnknown)
} }
func (mon *monitor) IsHealthy() bool { func (mon *monitor) Status() Status {
return mon.healthy.Load() return mon.status.Load()
}
func (mon *monitor) Uptime() time.Duration {
return time.Since(mon.startTime)
}
func (mon *monitor) Name() string {
return mon.task.Name()
} }
func (mon *monitor) String() string { func (mon *monitor) String() string {
return mon.Name return mon.Name()
}
func (mon *monitor) MarshalJSON() ([]byte, error) {
return json.Marshal(map[string]any{
"name": mon.Name(),
"url": mon.url,
"status": mon.status.Load(),
"uptime": mon.Uptime().String(),
"started": mon.startTime.Unix(),
"config": mon.config,
})
} }
func (mon *monitor) checkUpdateHealth() (hasError bool) { func (mon *monitor) checkUpdateHealth() (hasError bool) {
healthy, detail, err := mon.checkHealth() healthy, detail, err := mon.checkHealth()
if err != nil { if err != nil {
mon.healthy.Store(false) mon.status.Store(StatusError)
if !errors.Is(err, context.Canceled) { if !errors.Is(err, context.Canceled) {
logger.Errorf("server %q failed to check health: %s", mon, err) logger.Errorf("%s failed to check health: %s", mon.String(), err)
} }
mon.Stop() mon.Stop()
return false return false
} }
if healthy != mon.healthy.Swap(healthy) { var status Status
if healthy {
status = StatusHealthy
} else {
status = StatusUnhealthy
}
if healthy != (mon.status.Swap(status) == StatusHealthy) {
if healthy { if healthy {
logger.Infof("server %q is up", mon) logger.Infof("%s is up", mon.String())
} else { } else {
logger.Warnf("server %q is down: %s", mon, detail) logger.Warnf("%s is down: %s", mon.String(), detail)
} }
} }

View file

@ -14,9 +14,9 @@ type (
} }
) )
func NewRawHealthMonitor(task common.Task, url types.URL, config HealthCheckConfig) HealthMonitor { func NewRawHealthMonitor(task common.Task, url types.URL, config *HealthCheckConfig) HealthMonitor {
mon := new(RawHealthMonitor) mon := new(RawHealthMonitor)
mon.monitor = newMonitor(task, url, &config, mon.checkAvail) mon.monitor = newMonitor(task, url, config, mon.checkAvail)
mon.dialer = &net.Dialer{ mon.dialer = &net.Dialer{
Timeout: config.Timeout, Timeout: config.Timeout,
FallbackDelay: -1, FallbackDelay: -1,
@ -25,7 +25,7 @@ func NewRawHealthMonitor(task common.Task, url types.URL, config HealthCheckConf
} }
func (mon *RawHealthMonitor) checkAvail() (avail bool, detail string, err error) { func (mon *RawHealthMonitor) checkAvail() (avail bool, detail string, err error) {
conn, dialErr := mon.dialer.DialContext(mon.task.Context(), mon.URL.Scheme, mon.URL.Host) conn, dialErr := mon.dialer.DialContext(mon.task.Context(), mon.url.Scheme, mon.url.Host)
if dialErr != nil { if dialErr != nil {
detail = dialErr.Error() detail = dialErr.Error()
/* trunk-ignore(golangci-lint/nilerr) */ /* trunk-ignore(golangci-lint/nilerr) */

View file

@ -0,0 +1,48 @@
package health
import "encoding/json"
type Status int
const (
StatusUnknown Status = (iota << 1)
StatusHealthy
StatusNapping
StatusStarting
StatusUnhealthy
StatusError
NumStatuses int = iota - 1
HealthyMask = StatusHealthy | StatusNapping | StatusStarting
)
func (s Status) String() string {
switch s {
case StatusHealthy:
return "healthy"
case StatusUnhealthy:
return "unhealthy"
case StatusNapping:
return "napping"
case StatusStarting:
return "starting"
case StatusError:
return "error"
default:
return "unknown"
}
}
func (s Status) MarshalJSON() ([]byte, error) {
return json.Marshal(s.String())
}
func (s Status) Good() bool {
return s&HealthyMask != 0
}
func (s Status) Bad() bool {
return s&HealthyMask == 0
}