From 8025af60671bb000a497174509c94ae8a8ad2061 Mon Sep 17 00:00:00 2001 From: yusing Date: Sun, 20 Jul 2025 12:03:45 +0800 Subject: [PATCH] feat(healthcheck): add retries before notifying (default: 3 times) - Introduced NotifyFunc type for customizable notification handling in tests. - Added Retries field to HealthCheckConfig for controlling notification thresholds. - Implemented tests for notification behavior under various health check scenarios. --- internal/common/constants.go | 5 +- internal/notif/dispatcher.go | 2 + internal/utils/atomic/{bool.go => std.go} | 2 + internal/utils/testing/expect.go | 5 + internal/watcher/health/config.go | 2 + internal/watcher/health/monitor/monitor.go | 86 +++-- .../watcher/health/monitor/monitor_test.go | 312 ++++++++++++++++++ internal/watcher/health/status.go | 2 +- 8 files changed, 386 insertions(+), 30 deletions(-) rename internal/utils/atomic/{bool.go => std.go} (54%) create mode 100644 internal/watcher/health/monitor/monitor_test.go diff --git a/internal/common/constants.go b/internal/common/constants.go index 24b9cab..d0b3554 100644 --- a/internal/common/constants.go +++ b/internal/common/constants.go @@ -38,8 +38,9 @@ var RequiredDirectories = []string{ const DockerHostFromEnv = "$DOCKER_HOST" const ( - HealthCheckIntervalDefault = 5 * time.Second - HealthCheckTimeoutDefault = 5 * time.Second + HealthCheckIntervalDefault = 5 * time.Second + HealthCheckTimeoutDefault = 5 * time.Second + HealthCheckDownNotifyDelayDefault = 15 * time.Second WakeTimeoutDefault = "3m" StopTimeoutDefault = "3m" diff --git a/internal/notif/dispatcher.go b/internal/notif/dispatcher.go index 27e1544..1454935 100644 --- a/internal/notif/dispatcher.go +++ b/internal/notif/dispatcher.go @@ -25,6 +25,8 @@ type ( Body LogBody Color Color } + + NotifyFunc func(msg *LogMessage) ) var dispatcher *Dispatcher diff --git a/internal/utils/atomic/bool.go b/internal/utils/atomic/std.go similarity index 54% rename from internal/utils/atomic/bool.go rename to internal/utils/atomic/std.go index 0c462c3..a57315a 100644 --- a/internal/utils/atomic/bool.go +++ b/internal/utils/atomic/std.go @@ -3,3 +3,5 @@ package atomic import "sync/atomic" type Bool = atomic.Bool +type Int32 = atomic.Int32 +type Int64 = atomic.Int64 diff --git a/internal/utils/testing/expect.go b/internal/utils/testing/expect.go index 4455d7c..cd4c37d 100644 --- a/internal/utils/testing/expect.go +++ b/internal/utils/testing/expect.go @@ -65,6 +65,11 @@ func Contains[T any](t *testing.T, got T, wants []T, msgAndArgs ...any) { require.Contains(t, wants, got, msgAndArgs...) } +func StringsContain(t *testing.T, got string, want string, msgAndArgs ...any) { + t.Helper() + require.Contains(t, got, want, msgAndArgs...) +} + func Type[T any](t *testing.T, got any, msgAndArgs ...any) (_ T) { t.Helper() _, ok := got.(T) diff --git a/internal/watcher/health/config.go b/internal/watcher/health/config.go index b12c28f..787fa48 100644 --- a/internal/watcher/health/config.go +++ b/internal/watcher/health/config.go @@ -13,6 +13,7 @@ type HealthCheckConfig struct { UseGet bool `json:"use_get,omitempty"` Interval time.Duration `json:"interval" validate:"omitempty,min=1s"` Timeout time.Duration `json:"timeout" validate:"omitempty,min=1s"` + Retries int64 `json:"retries"` // <0: immediate, >=0: threshold BaseContext func() context.Context `json:"-"` } @@ -21,5 +22,6 @@ func DefaultHealthConfig() *HealthCheckConfig { return &HealthCheckConfig{ Interval: common.HealthCheckIntervalDefault, Timeout: common.HealthCheckTimeoutDefault, + Retries: int64(common.HealthCheckDownNotifyDelayDefault / common.HealthCheckIntervalDefault), } } diff --git a/internal/watcher/health/monitor/monitor.go b/internal/watcher/health/monitor/monitor.go index 2671637..0ce02a9 100644 --- a/internal/watcher/health/monitor/monitor.go +++ b/internal/watcher/health/monitor/monitor.go @@ -7,7 +7,9 @@ import ( "net/url" "time" + "github.com/rs/zerolog" "github.com/rs/zerolog/log" + "github.com/yusing/go-proxy/internal/common" "github.com/yusing/go-proxy/internal/docker" "github.com/yusing/go-proxy/internal/gperr" "github.com/yusing/go-proxy/internal/notif" @@ -33,6 +35,9 @@ type ( isZeroPort bool + notifyFunc notif.NotifyFunc + numConsecFailures atomic.Int64 + task *task.Task } ) @@ -66,10 +71,14 @@ func NewMonitor(r routes.Route) health.HealthMonCheck { } func newMonitor(u *url.URL, config *health.HealthCheckConfig, healthCheckFunc HealthCheckFunc) *monitor { + if config.Retries == 0 { + config.Retries = int64(common.HealthCheckDownNotifyDelayDefault / config.Interval) + } mon := &monitor{ config: config, checkHealth: healthCheckFunc, startTime: time.Now(), + notifyFunc: notif.Notify, } if u == nil { u = &url.URL{} @@ -258,37 +267,60 @@ func (mon *monitor) checkUpdateHealth() error { } mon.lastResult.Store(result) + // change of status if result.Healthy != (lastStatus == health.StatusHealthy) { - extras := notif.FieldsBody{ - {Name: "Service Name", Value: mon.service}, - {Name: "Time", Value: strutils.FormatTime(time.Now())}, - } - if !result.Healthy { - extras.Add("Last Seen", strutils.FormatLastSeen(GetLastSeen(mon.service))) - } - if mon.url.Load() != nil { - extras.Add("Service URL", mon.url.Load().String()) - } - if result.Detail != "" { - extras.Add("Detail", result.Detail) - } if result.Healthy { - logger.Info().Msg("service is up") - extras.Add("Ping", fmt.Sprintf("%d ms", result.Latency.Milliseconds())) - notif.Notify(¬if.LogMessage{ - Title: "✅ Service is up ✅", - Body: extras, - Color: notif.ColorSuccess, - }) - } else { - logger.Warn().Msg("service went down") - notif.Notify(¬if.LogMessage{ - Title: "❌ Service went down ❌", - Body: extras, - Color: notif.ColorError, - }) + mon.notifyServiceUp(&logger, result) + mon.numConsecFailures.Store(0) + } else if mon.config.Retries < 0 { + // immediate or meet the threshold + mon.notifyServiceDown(&logger, result) } } + // if threshold > 0, notify after threshold consecutive failures + if !result.Healthy && mon.config.Retries >= 0 && mon.numConsecFailures.Add(1) >= mon.config.Retries { + mon.numConsecFailures.Store(0) + mon.notifyServiceDown(&logger, result) + } + return err } + +func (mon *monitor) notifyServiceUp(logger *zerolog.Logger, result *health.HealthCheckResult) { + logger.Info().Msg("service is up") + extras := mon.buildNotificationExtras(result) + extras.Add("Ping", fmt.Sprintf("%d ms", result.Latency.Milliseconds())) + mon.notifyFunc(¬if.LogMessage{ + Level: zerolog.InfoLevel, + Title: "✅ Service is up ✅", + Body: extras, + Color: notif.ColorSuccess, + }) +} + +func (mon *monitor) notifyServiceDown(logger *zerolog.Logger, result *health.HealthCheckResult) { + logger.Warn().Msg("service went down") + extras := mon.buildNotificationExtras(result) + extras.Add("Last Seen", strutils.FormatLastSeen(GetLastSeen(mon.service))) + mon.notifyFunc(¬if.LogMessage{ + Level: zerolog.WarnLevel, + Title: "❌ Service went down ❌", + Body: extras, + Color: notif.ColorError, + }) +} + +func (mon *monitor) buildNotificationExtras(result *health.HealthCheckResult) notif.FieldsBody { + extras := notif.FieldsBody{ + {Name: "Service Name", Value: mon.service}, + {Name: "Time", Value: strutils.FormatTime(time.Now())}, + } + if mon.url.Load() != nil { + extras.Add("Service URL", mon.url.Load().String()) + } + if result.Detail != "" { + extras.Add("Detail", result.Detail) + } + return extras +} diff --git a/internal/watcher/health/monitor/monitor_test.go b/internal/watcher/health/monitor/monitor_test.go new file mode 100644 index 0000000..db63791 --- /dev/null +++ b/internal/watcher/health/monitor/monitor_test.go @@ -0,0 +1,312 @@ +package monitor + +import ( + "net/url" + "sync" + "testing" + "time" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + "github.com/yusing/go-proxy/internal/notif" + "github.com/yusing/go-proxy/internal/task" + "github.com/yusing/go-proxy/internal/watcher/health" +) + +// Test notification tracker +type testNotificationTracker struct { + mu sync.RWMutex + upNotifications int + downNotifications int + lastNotification string +} + +func (t *testNotificationTracker) getStats() (up, down int, last string) { + t.mu.RLock() + defer t.mu.RUnlock() + return t.upNotifications, t.downNotifications, t.lastNotification +} + +// Create test monitor with mock health checker - returns both monitor and tracker +func createTestMonitor(config *health.HealthCheckConfig, checkFunc HealthCheckFunc) (*monitor, *testNotificationTracker) { + testURL, _ := url.Parse("http://localhost:8080") + + mon := newMonitor(testURL, config, checkFunc) + + // Override notification functions to track calls instead of actually notifying + tracker := &testNotificationTracker{} + + mon.notifyFunc = func(msg *notif.LogMessage) { + tracker.mu.Lock() + defer tracker.mu.Unlock() + + switch msg.Level { + case zerolog.InfoLevel: + tracker.upNotifications++ + tracker.lastNotification = "up" + case zerolog.WarnLevel: + tracker.downNotifications++ + tracker.lastNotification = "down" + default: + panic("unexpected log level: " + msg.Level.String()) + } + } + + return mon, tracker +} + +func TestNotification_ImmediateNotifyAfterZero(t *testing.T) { + config := &health.HealthCheckConfig{ + Interval: 100 * time.Millisecond, + Timeout: 50 * time.Millisecond, + Retries: -1, // Immediate notification + } + + mon, tracker := createTestMonitor(config, func() (*health.HealthCheckResult, error) { + return &health.HealthCheckResult{Healthy: true}, nil + }) + + // Start with healthy service + result, err := mon.checkHealth() + require.NoError(t, err) + require.True(t, result.Healthy) + + // Set to unhealthy + mon.checkHealth = func() (*health.HealthCheckResult, error) { + return &health.HealthCheckResult{Healthy: false}, nil + } + + // Simulate status change detection + err = mon.checkUpdateHealth() + require.NoError(t, err) + + // With NotifyAfter=0, notification should happen immediately + require.Equal(t, health.StatusUnhealthy, mon.Status()) + + // Check notification counts - should have 1 down notification + up, down, last := tracker.getStats() + require.Equal(t, 1, down) + require.Equal(t, 0, up) + require.Equal(t, "down", last) +} + +func TestNotification_WithNotifyAfterThreshold(t *testing.T) { + config := &health.HealthCheckConfig{ + Interval: 50 * time.Millisecond, + Timeout: 50 * time.Millisecond, + Retries: 2, // Notify after 2 consecutive failures + } + + mon, tracker := createTestMonitor(config, func() (*health.HealthCheckResult, error) { + return &health.HealthCheckResult{Healthy: true}, nil + }) + + // Start healthy + mon.status.Store(health.StatusHealthy) + + // Set to unhealthy + mon.checkHealth = func() (*health.HealthCheckResult, error) { + return &health.HealthCheckResult{Healthy: false}, nil + } + + // First failure - should not notify yet + err := mon.checkUpdateHealth() + require.NoError(t, err) + + // Should have no notifications yet (threshold not met) + up, down, _ := tracker.getStats() + require.Equal(t, 0, down) + require.Equal(t, 0, up) + + // Second failure - should trigger notification + err = mon.checkUpdateHealth() + require.NoError(t, err) + + // Now should have 1 down notification after threshold met + up, down, last := tracker.getStats() + require.Equal(t, 1, down) + require.Equal(t, 0, up) + require.Equal(t, "down", last) +} + +func TestNotification_ServiceRecoversBeforeThreshold(t *testing.T) { + config := &health.HealthCheckConfig{ + Interval: 100 * time.Millisecond, + Timeout: 50 * time.Millisecond, + Retries: 3, // Notify after 3 consecutive failures + } + + mon, tracker := createTestMonitor(config, func() (*health.HealthCheckResult, error) { + return &health.HealthCheckResult{Healthy: true}, nil + }) + + // Start healthy + mon.status.Store(health.StatusHealthy) + + // Set to unhealthy + mon.checkHealth = func() (*health.HealthCheckResult, error) { + return &health.HealthCheckResult{Healthy: false}, nil + } + + // First failure + err := mon.checkUpdateHealth() + require.NoError(t, err) + + // Second failure + err = mon.checkUpdateHealth() + require.NoError(t, err) + + // Should have no notifications yet + up, down, _ := tracker.getStats() + require.Equal(t, 0, down) + require.Equal(t, 0, up) + + // Service recovers before third failure + mon.checkHealth = func() (*health.HealthCheckResult, error) { + return &health.HealthCheckResult{Healthy: true}, nil + } + + // Health check with recovery + err = mon.checkUpdateHealth() + require.NoError(t, err) + + // Should have 1 up notification, but no down notification + // because threshold was never met + up, down, last := tracker.getStats() + require.Equal(t, 0, down) + require.Equal(t, 1, up) + require.Equal(t, "up", last) +} + +func TestNotification_ConsecutiveFailureReset(t *testing.T) { + config := &health.HealthCheckConfig{ + Interval: 100 * time.Millisecond, + Timeout: 50 * time.Millisecond, + Retries: 2, // Notify after 2 consecutive failures + } + + mon, tracker := createTestMonitor(config, func() (*health.HealthCheckResult, error) { + return &health.HealthCheckResult{Healthy: true}, nil + }) + + // Start healthy + mon.status.Store(health.StatusHealthy) + + // Set to unhealthy + mon.checkHealth = func() (*health.HealthCheckResult, error) { + return &health.HealthCheckResult{Healthy: false}, nil + } + + // First failure + err := mon.checkUpdateHealth() + require.NoError(t, err) + + // Recover briefly + mon.checkHealth = func() (*health.HealthCheckResult, error) { + return &health.HealthCheckResult{Healthy: true}, nil + } + + err = mon.checkUpdateHealth() + require.NoError(t, err) + + // Should have 1 up notification, consecutive failures should reset + up, down, _ := tracker.getStats() + require.Equal(t, 0, down) + require.Equal(t, 1, up) + + // Go down again - consecutive counter should start from 0 + mon.checkHealth = func() (*health.HealthCheckResult, error) { + return &health.HealthCheckResult{Healthy: false}, nil + } + + // First failure after recovery + err = mon.checkUpdateHealth() + require.NoError(t, err) + + // Should still have no down notifications (need 2 consecutive) + up, down, _ = tracker.getStats() + require.Equal(t, 0, down) + require.Equal(t, 1, up) + + // Second consecutive failure - should trigger notification + err = mon.checkUpdateHealth() + require.NoError(t, err) + + // Now should have down notification + up, down, last := tracker.getStats() + require.Equal(t, 1, down) + require.Equal(t, 1, up) + require.Equal(t, "down", last) +} + +func TestNotification_ContextCancellation(t *testing.T) { + config := &health.HealthCheckConfig{ + Interval: 100 * time.Millisecond, + Timeout: 50 * time.Millisecond, + Retries: 1, + } + + mon, tracker := createTestMonitor(config, func() (*health.HealthCheckResult, error) { + return &health.HealthCheckResult{Healthy: true}, nil + }) + + // Create a task that we can cancel + rootTask := task.RootTask("test", true) + mon.task = rootTask.Subtask("monitor", true) + + // Start healthy, then go unhealthy + mon.status.Store(health.StatusHealthy) + mon.checkHealth = func() (*health.HealthCheckResult, error) { + return &health.HealthCheckResult{Healthy: false}, nil + } + + // Trigger notification + err := mon.checkUpdateHealth() + require.NoError(t, err) + + // Should have down notification + up, down, _ := tracker.getStats() + require.Equal(t, 1, down) + require.Equal(t, 0, up) + + // Cancel the task context + rootTask.Finish(nil) + + // Context cancellation doesn't affect notifications that already happened + up, down, _ = tracker.getStats() + require.Equal(t, 1, down) + require.Equal(t, 0, up) +} + +func TestImmediateUpNotification(t *testing.T) { + config := &health.HealthCheckConfig{ + Interval: 100 * time.Millisecond, + Timeout: 50 * time.Millisecond, + Retries: 2, // NotifyAfter should not affect up notifications + } + + mon, tracker := createTestMonitor(config, func() (*health.HealthCheckResult, error) { + return &health.HealthCheckResult{Healthy: false}, nil + }) + + // Start unhealthy + mon.status.Store(health.StatusUnhealthy) + + // Set to healthy + mon.checkHealth = func() (*health.HealthCheckResult, error) { + return &health.HealthCheckResult{Healthy: true, Latency: 50 * time.Millisecond}, nil + } + + // Trigger health check + err := mon.checkUpdateHealth() + require.NoError(t, err) + + // Up notification should happen immediately regardless of NotifyAfter setting + require.Equal(t, health.StatusHealthy, mon.Status()) + + // Should have exactly 1 up notification immediately + up, down, last := tracker.getStats() + require.Equal(t, 1, up) + require.Equal(t, 0, down) + require.Equal(t, "up", last) +} diff --git a/internal/watcher/health/status.go b/internal/watcher/health/status.go index d8321aa..f843bcd 100644 --- a/internal/watcher/health/status.go +++ b/internal/watcher/health/status.go @@ -4,7 +4,7 @@ type Status uint8 const ( StatusUnknown Status = 0 - StatusHealthy = (1 << iota) + StatusHealthy Status = (1 << iota) StatusNapping StatusStarting StatusUnhealthy