feat(healthcheck): add retries before notifying (default: 3 times)

- Introduced NotifyFunc type for customizable notification handling in tests.
- Added Retries field to HealthCheckConfig for controlling notification thresholds.
- Implemented tests for notification behavior under various health check scenarios.
This commit is contained in:
yusing 2025-07-20 12:03:45 +08:00
parent 47910774dd
commit 8025af6067
8 changed files with 386 additions and 30 deletions

View file

@ -40,6 +40,7 @@ const DockerHostFromEnv = "$DOCKER_HOST"
const ( const (
HealthCheckIntervalDefault = 5 * time.Second HealthCheckIntervalDefault = 5 * time.Second
HealthCheckTimeoutDefault = 5 * time.Second HealthCheckTimeoutDefault = 5 * time.Second
HealthCheckDownNotifyDelayDefault = 15 * time.Second
WakeTimeoutDefault = "3m" WakeTimeoutDefault = "3m"
StopTimeoutDefault = "3m" StopTimeoutDefault = "3m"

View file

@ -25,6 +25,8 @@ type (
Body LogBody Body LogBody
Color Color Color Color
} }
NotifyFunc func(msg *LogMessage)
) )
var dispatcher *Dispatcher var dispatcher *Dispatcher

View file

@ -3,3 +3,5 @@ package atomic
import "sync/atomic" import "sync/atomic"
type Bool = atomic.Bool type Bool = atomic.Bool
type Int32 = atomic.Int32
type Int64 = atomic.Int64

View file

@ -65,6 +65,11 @@ func Contains[T any](t *testing.T, got T, wants []T, msgAndArgs ...any) {
require.Contains(t, wants, got, msgAndArgs...) require.Contains(t, wants, got, msgAndArgs...)
} }
func StringsContain(t *testing.T, got string, want string, msgAndArgs ...any) {
t.Helper()
require.Contains(t, got, want, msgAndArgs...)
}
func Type[T any](t *testing.T, got any, msgAndArgs ...any) (_ T) { func Type[T any](t *testing.T, got any, msgAndArgs ...any) (_ T) {
t.Helper() t.Helper()
_, ok := got.(T) _, ok := got.(T)

View file

@ -13,6 +13,7 @@ type HealthCheckConfig struct {
UseGet bool `json:"use_get,omitempty"` UseGet bool `json:"use_get,omitempty"`
Interval time.Duration `json:"interval" validate:"omitempty,min=1s"` Interval time.Duration `json:"interval" validate:"omitempty,min=1s"`
Timeout time.Duration `json:"timeout" validate:"omitempty,min=1s"` Timeout time.Duration `json:"timeout" validate:"omitempty,min=1s"`
Retries int64 `json:"retries"` // <0: immediate, >=0: threshold
BaseContext func() context.Context `json:"-"` BaseContext func() context.Context `json:"-"`
} }
@ -21,5 +22,6 @@ func DefaultHealthConfig() *HealthCheckConfig {
return &HealthCheckConfig{ return &HealthCheckConfig{
Interval: common.HealthCheckIntervalDefault, Interval: common.HealthCheckIntervalDefault,
Timeout: common.HealthCheckTimeoutDefault, Timeout: common.HealthCheckTimeoutDefault,
Retries: int64(common.HealthCheckDownNotifyDelayDefault / common.HealthCheckIntervalDefault),
} }
} }

View file

@ -7,7 +7,9 @@ import (
"net/url" "net/url"
"time" "time"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"github.com/yusing/go-proxy/internal/common"
"github.com/yusing/go-proxy/internal/docker" "github.com/yusing/go-proxy/internal/docker"
"github.com/yusing/go-proxy/internal/gperr" "github.com/yusing/go-proxy/internal/gperr"
"github.com/yusing/go-proxy/internal/notif" "github.com/yusing/go-proxy/internal/notif"
@ -33,6 +35,9 @@ type (
isZeroPort bool isZeroPort bool
notifyFunc notif.NotifyFunc
numConsecFailures atomic.Int64
task *task.Task task *task.Task
} }
) )
@ -66,10 +71,14 @@ func NewMonitor(r routes.Route) health.HealthMonCheck {
} }
func newMonitor(u *url.URL, config *health.HealthCheckConfig, healthCheckFunc HealthCheckFunc) *monitor { func newMonitor(u *url.URL, config *health.HealthCheckConfig, healthCheckFunc HealthCheckFunc) *monitor {
if config.Retries == 0 {
config.Retries = int64(common.HealthCheckDownNotifyDelayDefault / config.Interval)
}
mon := &monitor{ mon := &monitor{
config: config, config: config,
checkHealth: healthCheckFunc, checkHealth: healthCheckFunc,
startTime: time.Now(), startTime: time.Now(),
notifyFunc: notif.Notify,
} }
if u == nil { if u == nil {
u = &url.URL{} u = &url.URL{}
@ -258,37 +267,60 @@ func (mon *monitor) checkUpdateHealth() error {
} }
mon.lastResult.Store(result) mon.lastResult.Store(result)
// change of status
if result.Healthy != (lastStatus == health.StatusHealthy) { if result.Healthy != (lastStatus == health.StatusHealthy) {
if result.Healthy {
mon.notifyServiceUp(&logger, result)
mon.numConsecFailures.Store(0)
} else if mon.config.Retries < 0 {
// immediate or meet the threshold
mon.notifyServiceDown(&logger, result)
}
}
// if threshold > 0, notify after threshold consecutive failures
if !result.Healthy && mon.config.Retries >= 0 && mon.numConsecFailures.Add(1) >= mon.config.Retries {
mon.numConsecFailures.Store(0)
mon.notifyServiceDown(&logger, result)
}
return err
}
func (mon *monitor) notifyServiceUp(logger *zerolog.Logger, result *health.HealthCheckResult) {
logger.Info().Msg("service is up")
extras := mon.buildNotificationExtras(result)
extras.Add("Ping", fmt.Sprintf("%d ms", result.Latency.Milliseconds()))
mon.notifyFunc(&notif.LogMessage{
Level: zerolog.InfoLevel,
Title: "✅ Service is up ✅",
Body: extras,
Color: notif.ColorSuccess,
})
}
func (mon *monitor) notifyServiceDown(logger *zerolog.Logger, result *health.HealthCheckResult) {
logger.Warn().Msg("service went down")
extras := mon.buildNotificationExtras(result)
extras.Add("Last Seen", strutils.FormatLastSeen(GetLastSeen(mon.service)))
mon.notifyFunc(&notif.LogMessage{
Level: zerolog.WarnLevel,
Title: "❌ Service went down ❌",
Body: extras,
Color: notif.ColorError,
})
}
func (mon *monitor) buildNotificationExtras(result *health.HealthCheckResult) notif.FieldsBody {
extras := notif.FieldsBody{ extras := notif.FieldsBody{
{Name: "Service Name", Value: mon.service}, {Name: "Service Name", Value: mon.service},
{Name: "Time", Value: strutils.FormatTime(time.Now())}, {Name: "Time", Value: strutils.FormatTime(time.Now())},
} }
if !result.Healthy {
extras.Add("Last Seen", strutils.FormatLastSeen(GetLastSeen(mon.service)))
}
if mon.url.Load() != nil { if mon.url.Load() != nil {
extras.Add("Service URL", mon.url.Load().String()) extras.Add("Service URL", mon.url.Load().String())
} }
if result.Detail != "" { if result.Detail != "" {
extras.Add("Detail", result.Detail) extras.Add("Detail", result.Detail)
} }
if result.Healthy { return extras
logger.Info().Msg("service is up")
extras.Add("Ping", fmt.Sprintf("%d ms", result.Latency.Milliseconds()))
notif.Notify(&notif.LogMessage{
Title: "✅ Service is up ✅",
Body: extras,
Color: notif.ColorSuccess,
})
} else {
logger.Warn().Msg("service went down")
notif.Notify(&notif.LogMessage{
Title: "❌ Service went down ❌",
Body: extras,
Color: notif.ColorError,
})
}
}
return err
} }

View file

@ -0,0 +1,312 @@
package monitor
import (
"net/url"
"sync"
"testing"
"time"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
"github.com/yusing/go-proxy/internal/notif"
"github.com/yusing/go-proxy/internal/task"
"github.com/yusing/go-proxy/internal/watcher/health"
)
// Test notification tracker
type testNotificationTracker struct {
mu sync.RWMutex
upNotifications int
downNotifications int
lastNotification string
}
func (t *testNotificationTracker) getStats() (up, down int, last string) {
t.mu.RLock()
defer t.mu.RUnlock()
return t.upNotifications, t.downNotifications, t.lastNotification
}
// Create test monitor with mock health checker - returns both monitor and tracker
func createTestMonitor(config *health.HealthCheckConfig, checkFunc HealthCheckFunc) (*monitor, *testNotificationTracker) {
testURL, _ := url.Parse("http://localhost:8080")
mon := newMonitor(testURL, config, checkFunc)
// Override notification functions to track calls instead of actually notifying
tracker := &testNotificationTracker{}
mon.notifyFunc = func(msg *notif.LogMessage) {
tracker.mu.Lock()
defer tracker.mu.Unlock()
switch msg.Level {
case zerolog.InfoLevel:
tracker.upNotifications++
tracker.lastNotification = "up"
case zerolog.WarnLevel:
tracker.downNotifications++
tracker.lastNotification = "down"
default:
panic("unexpected log level: " + msg.Level.String())
}
}
return mon, tracker
}
func TestNotification_ImmediateNotifyAfterZero(t *testing.T) {
config := &health.HealthCheckConfig{
Interval: 100 * time.Millisecond,
Timeout: 50 * time.Millisecond,
Retries: -1, // Immediate notification
}
mon, tracker := createTestMonitor(config, func() (*health.HealthCheckResult, error) {
return &health.HealthCheckResult{Healthy: true}, nil
})
// Start with healthy service
result, err := mon.checkHealth()
require.NoError(t, err)
require.True(t, result.Healthy)
// Set to unhealthy
mon.checkHealth = func() (*health.HealthCheckResult, error) {
return &health.HealthCheckResult{Healthy: false}, nil
}
// Simulate status change detection
err = mon.checkUpdateHealth()
require.NoError(t, err)
// With NotifyAfter=0, notification should happen immediately
require.Equal(t, health.StatusUnhealthy, mon.Status())
// Check notification counts - should have 1 down notification
up, down, last := tracker.getStats()
require.Equal(t, 1, down)
require.Equal(t, 0, up)
require.Equal(t, "down", last)
}
func TestNotification_WithNotifyAfterThreshold(t *testing.T) {
config := &health.HealthCheckConfig{
Interval: 50 * time.Millisecond,
Timeout: 50 * time.Millisecond,
Retries: 2, // Notify after 2 consecutive failures
}
mon, tracker := createTestMonitor(config, func() (*health.HealthCheckResult, error) {
return &health.HealthCheckResult{Healthy: true}, nil
})
// Start healthy
mon.status.Store(health.StatusHealthy)
// Set to unhealthy
mon.checkHealth = func() (*health.HealthCheckResult, error) {
return &health.HealthCheckResult{Healthy: false}, nil
}
// First failure - should not notify yet
err := mon.checkUpdateHealth()
require.NoError(t, err)
// Should have no notifications yet (threshold not met)
up, down, _ := tracker.getStats()
require.Equal(t, 0, down)
require.Equal(t, 0, up)
// Second failure - should trigger notification
err = mon.checkUpdateHealth()
require.NoError(t, err)
// Now should have 1 down notification after threshold met
up, down, last := tracker.getStats()
require.Equal(t, 1, down)
require.Equal(t, 0, up)
require.Equal(t, "down", last)
}
func TestNotification_ServiceRecoversBeforeThreshold(t *testing.T) {
config := &health.HealthCheckConfig{
Interval: 100 * time.Millisecond,
Timeout: 50 * time.Millisecond,
Retries: 3, // Notify after 3 consecutive failures
}
mon, tracker := createTestMonitor(config, func() (*health.HealthCheckResult, error) {
return &health.HealthCheckResult{Healthy: true}, nil
})
// Start healthy
mon.status.Store(health.StatusHealthy)
// Set to unhealthy
mon.checkHealth = func() (*health.HealthCheckResult, error) {
return &health.HealthCheckResult{Healthy: false}, nil
}
// First failure
err := mon.checkUpdateHealth()
require.NoError(t, err)
// Second failure
err = mon.checkUpdateHealth()
require.NoError(t, err)
// Should have no notifications yet
up, down, _ := tracker.getStats()
require.Equal(t, 0, down)
require.Equal(t, 0, up)
// Service recovers before third failure
mon.checkHealth = func() (*health.HealthCheckResult, error) {
return &health.HealthCheckResult{Healthy: true}, nil
}
// Health check with recovery
err = mon.checkUpdateHealth()
require.NoError(t, err)
// Should have 1 up notification, but no down notification
// because threshold was never met
up, down, last := tracker.getStats()
require.Equal(t, 0, down)
require.Equal(t, 1, up)
require.Equal(t, "up", last)
}
func TestNotification_ConsecutiveFailureReset(t *testing.T) {
config := &health.HealthCheckConfig{
Interval: 100 * time.Millisecond,
Timeout: 50 * time.Millisecond,
Retries: 2, // Notify after 2 consecutive failures
}
mon, tracker := createTestMonitor(config, func() (*health.HealthCheckResult, error) {
return &health.HealthCheckResult{Healthy: true}, nil
})
// Start healthy
mon.status.Store(health.StatusHealthy)
// Set to unhealthy
mon.checkHealth = func() (*health.HealthCheckResult, error) {
return &health.HealthCheckResult{Healthy: false}, nil
}
// First failure
err := mon.checkUpdateHealth()
require.NoError(t, err)
// Recover briefly
mon.checkHealth = func() (*health.HealthCheckResult, error) {
return &health.HealthCheckResult{Healthy: true}, nil
}
err = mon.checkUpdateHealth()
require.NoError(t, err)
// Should have 1 up notification, consecutive failures should reset
up, down, _ := tracker.getStats()
require.Equal(t, 0, down)
require.Equal(t, 1, up)
// Go down again - consecutive counter should start from 0
mon.checkHealth = func() (*health.HealthCheckResult, error) {
return &health.HealthCheckResult{Healthy: false}, nil
}
// First failure after recovery
err = mon.checkUpdateHealth()
require.NoError(t, err)
// Should still have no down notifications (need 2 consecutive)
up, down, _ = tracker.getStats()
require.Equal(t, 0, down)
require.Equal(t, 1, up)
// Second consecutive failure - should trigger notification
err = mon.checkUpdateHealth()
require.NoError(t, err)
// Now should have down notification
up, down, last := tracker.getStats()
require.Equal(t, 1, down)
require.Equal(t, 1, up)
require.Equal(t, "down", last)
}
func TestNotification_ContextCancellation(t *testing.T) {
config := &health.HealthCheckConfig{
Interval: 100 * time.Millisecond,
Timeout: 50 * time.Millisecond,
Retries: 1,
}
mon, tracker := createTestMonitor(config, func() (*health.HealthCheckResult, error) {
return &health.HealthCheckResult{Healthy: true}, nil
})
// Create a task that we can cancel
rootTask := task.RootTask("test", true)
mon.task = rootTask.Subtask("monitor", true)
// Start healthy, then go unhealthy
mon.status.Store(health.StatusHealthy)
mon.checkHealth = func() (*health.HealthCheckResult, error) {
return &health.HealthCheckResult{Healthy: false}, nil
}
// Trigger notification
err := mon.checkUpdateHealth()
require.NoError(t, err)
// Should have down notification
up, down, _ := tracker.getStats()
require.Equal(t, 1, down)
require.Equal(t, 0, up)
// Cancel the task context
rootTask.Finish(nil)
// Context cancellation doesn't affect notifications that already happened
up, down, _ = tracker.getStats()
require.Equal(t, 1, down)
require.Equal(t, 0, up)
}
func TestImmediateUpNotification(t *testing.T) {
config := &health.HealthCheckConfig{
Interval: 100 * time.Millisecond,
Timeout: 50 * time.Millisecond,
Retries: 2, // NotifyAfter should not affect up notifications
}
mon, tracker := createTestMonitor(config, func() (*health.HealthCheckResult, error) {
return &health.HealthCheckResult{Healthy: false}, nil
})
// Start unhealthy
mon.status.Store(health.StatusUnhealthy)
// Set to healthy
mon.checkHealth = func() (*health.HealthCheckResult, error) {
return &health.HealthCheckResult{Healthy: true, Latency: 50 * time.Millisecond}, nil
}
// Trigger health check
err := mon.checkUpdateHealth()
require.NoError(t, err)
// Up notification should happen immediately regardless of NotifyAfter setting
require.Equal(t, health.StatusHealthy, mon.Status())
// Should have exactly 1 up notification immediately
up, down, last := tracker.getStats()
require.Equal(t, 1, up)
require.Equal(t, 0, down)
require.Equal(t, "up", last)
}

View file

@ -4,7 +4,7 @@ type Status uint8
const ( const (
StatusUnknown Status = 0 StatusUnknown Status = 0
StatusHealthy = (1 << iota) StatusHealthy Status = (1 << iota)
StatusNapping StatusNapping
StatusStarting StatusStarting
StatusUnhealthy StatusUnhealthy