From 6194bac4c4ccecf66f2c34e1d908a09423ecb232 Mon Sep 17 00:00:00 2001 From: yusing Date: Sun, 10 Nov 2024 06:47:59 +0800 Subject: [PATCH] metric unregistration on route removal, fixed multi-ips as visitor label detected from x headers --- internal/api/handler.go | 4 +-- internal/docker/idlewatcher/waker.go | 26 ++++++++++++-- internal/metrics/metric.go | 12 +++---- internal/metrics/metrics.go | 48 +++++++++++++++++++++----- internal/net/http/reverse_proxy_mod.go | 33 +++++++----------- internal/route/http.go | 4 +++ internal/watcher/health/monitor.go | 21 ++++++----- 7 files changed, 100 insertions(+), 48 deletions(-) diff --git a/internal/api/handler.go b/internal/api/handler.go index f3a9162..4cf7952 100644 --- a/internal/api/handler.go +++ b/internal/api/handler.go @@ -1,7 +1,6 @@ package api import ( - "fmt" "net" "net/http" @@ -20,7 +19,7 @@ func NewServeMux() ServeMux { } func (mux ServeMux) HandleFunc(method, endpoint string, handler http.HandlerFunc) { - mux.ServeMux.HandleFunc(fmt.Sprintf("%s %s", method, endpoint), checkHost(rateLimited(handler))) + mux.ServeMux.HandleFunc(method+" "+endpoint, checkHost(rateLimited(handler))) } func NewHandler() http.Handler { @@ -55,6 +54,7 @@ func checkHost(f http.HandlerFunc) http.HandlerFunc { http.Error(w, "forbidden", http.StatusForbidden) return } + LogInfo(r).Interface("headers", r.Header).Msg("API request") f(w, r) } } diff --git a/internal/docker/idlewatcher/waker.go b/internal/docker/idlewatcher/waker.go index ea5fed7..0291bbb 100644 --- a/internal/docker/idlewatcher/waker.go +++ b/internal/docker/idlewatcher/waker.go @@ -4,8 +4,11 @@ import ( "sync/atomic" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/yusing/go-proxy/internal/common" . "github.com/yusing/go-proxy/internal/docker/idlewatcher/types" E "github.com/yusing/go-proxy/internal/error" + "github.com/yusing/go-proxy/internal/metrics" gphttp "github.com/yusing/go-proxy/internal/net/http" net "github.com/yusing/go-proxy/internal/net/types" "github.com/yusing/go-proxy/internal/proxy/entry" @@ -20,6 +23,7 @@ type waker struct { rp *gphttp.ReverseProxy stream net.Stream hc health.HealthChecker + metric *metrics.Gauge ready atomic.Bool } @@ -53,6 +57,13 @@ func newWaker(providerSubTask task.Task, entry entry.Entry, rp *gphttp.ReversePr default: panic("both nil") } + + if common.PrometheusEnabled { + m := metrics.GetServiceMetrics() + fqn := providerSubTask.Parent().Name() + "/" + entry.TargetName() + waker.metric = m.HealthStatus.With(metrics.HealthMetricLabels(fqn)) + waker.metric.Set(float64(watcher.Status())) + } return watcher, nil } @@ -68,8 +79,11 @@ func NewStreamWaker(providerSubTask task.Task, entry entry.Entry, stream net.Str // Start implements health.HealthMonitor. func (w *Watcher) Start(routeSubTask task.Task) E.Error { routeSubTask.Finish("ignored") - w.task.OnCancel("stop route", func() { + w.task.OnCancel("stop route and cleanup", func() { routeSubTask.Parent().Finish(w.task.FinishCause()) + if w.metric != nil { + prometheus.Unregister(w.metric) + } }) return nil } @@ -96,8 +110,16 @@ func (w *Watcher) Uptime() time.Duration { return 0 } -// Status implements health.HealthMonitor. func (w *Watcher) Status() health.Status { + status := w.getStatusUpdateReady() + if w.metric != nil { + w.metric.Set(float64(status)) + } + return status +} + +// Status implements health.HealthMonitor. +func (w *Watcher) getStatusUpdateReady() health.Status { if !w.ContainerRunning { return health.StatusNapping } diff --git a/internal/metrics/metric.go b/internal/metrics/metric.go index 6106b0f..5b5b295 100644 --- a/internal/metrics/metric.go +++ b/internal/metrics/metric.go @@ -4,12 +4,12 @@ import "github.com/prometheus/client_golang/prometheus" type ( Counter struct { - collector prometheus.Counter mv *prometheus.CounterVec + collector prometheus.Counter } Gauge struct { - collector prometheus.Gauge mv *prometheus.GaugeVec + collector prometheus.Gauge } Labels interface { toPromLabels() prometheus.Labels @@ -52,8 +52,8 @@ func (c *Counter) Inc() { c.collector.Inc() } -func (c *Counter) With(l Labels) prometheus.Counter { - return c.mv.With(l.toPromLabels()) +func (c *Counter) With(l Labels) *Counter { + return &Counter{mv: c.mv, collector: c.mv.With(l.toPromLabels())} } func (g *Gauge) Collect(ch chan<- prometheus.Metric) { @@ -68,6 +68,6 @@ func (g *Gauge) Set(v float64) { g.collector.Set(v) } -func (g *Gauge) With(l Labels) prometheus.Gauge { - return g.mv.With(l.toPromLabels()) +func (g *Gauge) With(l Labels) *Gauge { + return &Gauge{mv: g.mv, collector: g.mv.With(l.toPromLabels())} } diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index da48f47..b37652c 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -7,16 +7,24 @@ import ( "github.com/yusing/go-proxy/internal/common" ) -type RouteMetrics struct { - HTTPReqTotal, - HTTP2xx3xx, - HTTP4xx, - HTTP5xx *Counter - HTTPReqElapsed *Gauge - HealthStatus *Gauge -} +type ( + RouteMetrics struct { + HTTPReqTotal, + HTTP2xx3xx, + HTTP4xx, + HTTP5xx *Counter + HTTPReqElapsed *Gauge + } -var rm RouteMetrics + ServiceMetrics struct { + HealthStatus *Gauge + } +) + +var ( + rm RouteMetrics + sm ServiceMetrics +) const ( routerNamespace = "router" @@ -29,10 +37,27 @@ func GetRouteMetrics() *RouteMetrics { return &rm } +func GetServiceMetrics() *ServiceMetrics { + return &sm +} + +func (rm *RouteMetrics) UnregisterService(service string) { + lbls := &HTTPRouteMetricLabels{Service: service} + prometheus.Unregister(rm.HTTP2xx3xx.With(lbls)) + prometheus.Unregister(rm.HTTP4xx.With(lbls)) + prometheus.Unregister(rm.HTTP5xx.With(lbls)) + prometheus.Unregister(rm.HTTPReqElapsed.With(lbls)) +} + func init() { if !common.PrometheusEnabled { return } + initRouteMetrics() + initServiceMetrics() +} + +func initRouteMetrics() { lbls := []string{"service", "method", "host", "visitor", "path"} partitionsHelp := ", partitioned by " + strings.Join(lbls, ", ") rm = RouteMetrics{ @@ -66,6 +91,11 @@ func init() { Name: "req_elapsed_ms", Help: "How long it took to process the request and respond a status code" + partitionsHelp, }, lbls...), + } +} + +func initServiceMetrics() { + sm = ServiceMetrics{ HealthStatus: NewGauge(prometheus.GaugeOpts{ Namespace: serviceNamespace, Name: "health_status", diff --git a/internal/net/http/reverse_proxy_mod.go b/internal/net/http/reverse_proxy_mod.go index 152dbdd..6a4f18f 100644 --- a/internal/net/http/reverse_proxy_mod.go +++ b/internal/net/http/reverse_proxy_mod.go @@ -10,7 +10,6 @@ package http // Copyright (c) 2024 yusing import ( - "bufio" "bytes" "context" "errors" @@ -123,20 +122,8 @@ func (l *httpMetricLogger) WriteHeader(status int) { }() } -// Hijack hijacks the connection. -func (l *httpMetricLogger) Hijack() (net.Conn, *bufio.ReadWriter, error) { - if h, ok := l.ResponseWriter.(http.Hijacker); ok { - return h.Hijack() - } - - return nil, nil, fmt.Errorf("not a hijacker: %T", l.ResponseWriter) -} - -// Flush sends any buffered data to the client. -func (l *httpMetricLogger) Flush() { - if flusher, ok := l.ResponseWriter.(http.Flusher); ok { - flusher.Flush() - } +func (l *httpMetricLogger) Unwrap() http.ResponseWriter { + return l.ResponseWriter } func singleJoiningSlash(a, b string) string { @@ -208,6 +195,10 @@ func NewReverseProxy(name string, target types.URL, transport http.RoundTripper) return rp } +func (p *ReverseProxy) UnregisterMetrics() { + metrics.GetRouteMetrics().UnregisterService(p.TargetName) +} + func rewriteRequestURL(req *http.Request, target *url.URL) { targetQuery := target.RawQuery req.URL.Scheme = target.Scheme @@ -280,11 +271,13 @@ func (p *ReverseProxy) handler(rw http.ResponseWriter, req *http.Request) { if common.PrometheusEnabled { t := time.Now() var visitor string - if realIP := req.Header.Get("X-Real-IP"); realIP != "" { - visitor = realIP + if realIPs := req.Header.Values("X-Real-IP"); len(realIPs) > 0 { + visitor = realIPs[len(realIPs)-1] } - if fwdIP := req.Header.Get("X-Forwarded-For"); visitor == "" && fwdIP != "" { - visitor = fwdIP + if visitor == "" { + if fwdIPs := req.Header.Values("X-Forwarded-For"); len(fwdIPs) > 0 { + visitor = fwdIPs[len(fwdIPs)-1] + } } if visitor == "" { var err error @@ -444,7 +437,7 @@ func (p *ReverseProxy) handler(rw http.ResponseWriter, req *http.Request) { Proto: outreq.Proto, ProtoMajor: outreq.ProtoMajor, ProtoMinor: outreq.ProtoMinor, - Header: make(http.Header), + Header: http.Header{}, Body: io.NopCloser(bytes.NewReader([]byte("Origin server is not reachable."))), Request: outreq, TLS: outreq.TLS, diff --git a/internal/route/http.go b/internal/route/http.go index 2966c0a..55ab56c 100755 --- a/internal/route/http.go +++ b/internal/route/http.go @@ -8,6 +8,7 @@ import ( "sync" "github.com/rs/zerolog" + "github.com/yusing/go-proxy/internal/common" "github.com/yusing/go-proxy/internal/docker/idlewatcher" E "github.com/yusing/go-proxy/internal/error" gphttp "github.com/yusing/go-proxy/internal/net/http" @@ -156,6 +157,9 @@ func (r *HTTPRoute) Start(providerSubtask task.Task) E.Error { }) } + if common.PrometheusEnabled { + r.task.OnFinished("unreg metrics", r.rp.UnregisterMetrics) + } return nil } diff --git a/internal/watcher/health/monitor.go b/internal/watcher/health/monitor.go index 559c255..7116d0e 100644 --- a/internal/watcher/health/monitor.go +++ b/internal/watcher/health/monitor.go @@ -7,6 +7,7 @@ import ( "strings" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/yusing/go-proxy/internal/common" E "github.com/yusing/go-proxy/internal/error" "github.com/yusing/go-proxy/internal/logging" @@ -28,6 +29,8 @@ type ( checkHealth HealthCheckFunc startTime time.Time + metric *metrics.Gauge + task task.Task } ) @@ -61,6 +64,10 @@ func (mon *monitor) Start(routeSubtask task.Task) E.Error { return E.From(ErrNegativeInterval) } + if common.PrometheusEnabled { + mon.metric = metrics.GetServiceMetrics().HealthStatus.With(metrics.HealthMetricLabels(mon.service)) + } + go func() { logger := logging.With().Str("name", mon.service).Logger() @@ -69,6 +76,9 @@ func (mon *monitor) Start(routeSubtask task.Task) E.Error { mon.status.Store(StatusUnknown) } mon.task.Finish(nil) + if mon.metric != nil { + prometheus.Unregister(mon.metric) + } }() if err := mon.checkUpdateHealth(); err != nil { @@ -175,15 +185,8 @@ func (mon *monitor) checkUpdateHealth() error { notif.Notify(mon.service, "server is down") } } - if common.PrometheusEnabled { - go func() { - m := metrics.GetRouteMetrics() - var up float64 - if healthy { - up = 1 - } - m.HealthStatus.With(metrics.HealthMetricLabels(mon.service)).Set(up) - }() + if mon.metric != nil { + mon.metric.Set(float64(status)) } return nil