fix data race

This commit is contained in:
yusing 2025-02-24 19:24:46 +08:00
parent 135c79d2ad
commit 0d388a396c
8 changed files with 29 additions and 20 deletions

View file

@ -13,6 +13,8 @@ import (
"github.com/yusing/go-proxy/internal/watcher/health/monitor"
)
var defaultHealthConfig = health.DefaultHealthConfig()
func CheckHealth(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
scheme := query.Get("scheme")
@ -46,7 +48,7 @@ func CheckHealth(w http.ResponseWriter, r *http.Request) {
Scheme: scheme,
Host: host,
Path: path,
}), health.DefaultHealthConfig).CheckHealth()
}), defaultHealthConfig).CheckHealth()
case "tcp", "udp":
host := query.Get("host")
if host == "" {
@ -64,7 +66,7 @@ func CheckHealth(w http.ResponseWriter, r *http.Request) {
result, err = monitor.NewRawHealthChecker(types.NewURL(&url.URL{
Scheme: scheme,
Host: host,
}), health.DefaultHealthConfig).CheckHealth()
}), defaultHealthConfig).CheckHealth()
}
if err != nil {

View file

@ -15,6 +15,7 @@ import (
"github.com/yusing/go-proxy/internal/gperr"
"github.com/yusing/go-proxy/internal/homepage"
"github.com/yusing/go-proxy/internal/logging"
"github.com/yusing/go-proxy/internal/metrics/uptime"
"github.com/yusing/go-proxy/internal/net/gphttp/middleware"
"github.com/yusing/go-proxy/internal/route/routes/routequery"
"github.com/yusing/go-proxy/internal/task"
@ -139,6 +140,7 @@ func main() {
API: true,
})
uptime.Poller.Start()
config.WatchChanges()
task.WaitExit(cfg.Value().TimeoutShutdown)

View file

@ -12,6 +12,7 @@ import (
"github.com/yusing/go-proxy/internal/gperr"
"github.com/yusing/go-proxy/internal/logging"
"github.com/yusing/go-proxy/internal/task"
"github.com/yusing/go-proxy/internal/utils/atomic"
)
type (
@ -24,7 +25,7 @@ type (
aggregate AggregateFunc[T, AggregateT]
resultFilter FilterFunc[T]
period *Period[T]
lastResult *T
lastResult atomic.Value[*T]
errs []pollErr
}
pollErr struct {
@ -119,13 +120,13 @@ func (p *Poller[T, AggregateT]) clearErrs() {
func (p *Poller[T, AggregateT]) pollWithTimeout(ctx context.Context) {
ctx, cancel := context.WithTimeout(ctx, pollInterval)
defer cancel()
data, err := p.poll(ctx, p.lastResult)
data, err := p.poll(ctx, p.lastResult.Load())
if err != nil {
p.appendErr(err)
return
}
p.period.Add(data)
p.lastResult = data
p.lastResult.Store(data)
}
func (p *Poller[T, AggregateT]) Start() {
@ -184,5 +185,5 @@ func (p *Poller[T, AggregateT]) Get(filter Filter) ([]*T, bool) {
}
func (p *Poller[T, AggregateT]) GetLastResult() *T {
return p.lastResult
return p.lastResult.Load()
}

View file

@ -31,10 +31,6 @@ type (
var Poller = period.NewPoller("uptime", getStatuses, aggregateStatuses)
func init() {
Poller.Start()
}
func getStatuses(ctx context.Context, _ *StatusByAlias) (*StatusByAlias, error) {
return &StatusByAlias{
Map: routequery.HealthInfo(),

View file

@ -125,6 +125,7 @@ func (r *Route) Start(parent task.Parent) (err gperr.Error) {
if r.impl == nil {
return gperr.New("route not initialized")
}
return r.impl.Start(parent)
}
@ -337,7 +338,7 @@ func (r *Route) Finalize() {
r.Port.Listening, r.Port.Proxy = lp, pp
if r.HealthCheck == nil {
r.HealthCheck = health.DefaultHealthConfig
r.HealthCheck = health.DefaultHealthConfig()
}
if !r.HealthCheck.Disable {

View file

@ -10,7 +10,11 @@ type Value[T any] struct {
}
func (a *Value[T]) Load() T {
return a.Value.Load().(T)
if v := a.Value.Load(); v != nil {
return v.(T)
}
var zero T
return zero
}
func (a *Value[T]) Store(v T) {

View file

@ -14,7 +14,9 @@ type HealthCheckConfig struct {
Timeout time.Duration `json:"timeout" validate:"omitempty,min=1s"`
}
var DefaultHealthConfig = &HealthCheckConfig{
Interval: common.HealthCheckIntervalDefault,
Timeout: common.HealthCheckTimeoutDefault,
func DefaultHealthConfig() *HealthCheckConfig {
return &HealthCheckConfig{
Interval: common.HealthCheckIntervalDefault,
Timeout: common.HealthCheckTimeoutDefault,
}
}

View file

@ -25,7 +25,7 @@ type (
url atomic.Value[*types.URL]
status atomic.Value[health.Status]
lastResult *health.HealthCheckResult
lastResult atomic.Value[*health.HealthCheckResult]
checkHealth HealthCheckFunc
startTime time.Time
@ -139,10 +139,11 @@ func (mon *monitor) Uptime() time.Duration {
// Latency implements HealthMonitor.
func (mon *monitor) Latency() time.Duration {
if mon.lastResult == nil {
res := mon.lastResult.Load()
if res == nil {
return 0
}
return mon.lastResult.Latency
return res.Latency
}
// Name implements HealthMonitor.
@ -158,7 +159,7 @@ func (mon *monitor) String() string {
// MarshalJSON implements json.Marshaler of HealthMonitor.
func (mon *monitor) MarshalJSON() ([]byte, error) {
res := mon.lastResult
res := mon.lastResult.Load()
if res == nil {
res = &health.HealthCheckResult{
Healthy: true,
@ -190,7 +191,7 @@ func (mon *monitor) checkUpdateHealth() error {
return nil
}
mon.lastResult = result
mon.lastResult.Store(result)
var status health.Status
if result.Healthy {
status = health.StatusHealthy