metric unregistration on route removal, fixed multi-ips as visitor label detected from x headers

This commit is contained in:
yusing 2024-11-10 06:47:59 +08:00
parent a1d1325ad6
commit 6194bac4c4
7 changed files with 100 additions and 48 deletions

View file

@ -1,7 +1,6 @@
package api
import (
"fmt"
"net"
"net/http"
@ -20,7 +19,7 @@ func NewServeMux() ServeMux {
}
func (mux ServeMux) HandleFunc(method, endpoint string, handler http.HandlerFunc) {
mux.ServeMux.HandleFunc(fmt.Sprintf("%s %s", method, endpoint), checkHost(rateLimited(handler)))
mux.ServeMux.HandleFunc(method+" "+endpoint, checkHost(rateLimited(handler)))
}
func NewHandler() http.Handler {
@ -55,6 +54,7 @@ func checkHost(f http.HandlerFunc) http.HandlerFunc {
http.Error(w, "forbidden", http.StatusForbidden)
return
}
LogInfo(r).Interface("headers", r.Header).Msg("API request")
f(w, r)
}
}

View file

@ -4,8 +4,11 @@ import (
"sync/atomic"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/yusing/go-proxy/internal/common"
. "github.com/yusing/go-proxy/internal/docker/idlewatcher/types"
E "github.com/yusing/go-proxy/internal/error"
"github.com/yusing/go-proxy/internal/metrics"
gphttp "github.com/yusing/go-proxy/internal/net/http"
net "github.com/yusing/go-proxy/internal/net/types"
"github.com/yusing/go-proxy/internal/proxy/entry"
@ -20,6 +23,7 @@ type waker struct {
rp *gphttp.ReverseProxy
stream net.Stream
hc health.HealthChecker
metric *metrics.Gauge
ready atomic.Bool
}
@ -53,6 +57,13 @@ func newWaker(providerSubTask task.Task, entry entry.Entry, rp *gphttp.ReversePr
default:
panic("both nil")
}
if common.PrometheusEnabled {
m := metrics.GetServiceMetrics()
fqn := providerSubTask.Parent().Name() + "/" + entry.TargetName()
waker.metric = m.HealthStatus.With(metrics.HealthMetricLabels(fqn))
waker.metric.Set(float64(watcher.Status()))
}
return watcher, nil
}
@ -68,8 +79,11 @@ func NewStreamWaker(providerSubTask task.Task, entry entry.Entry, stream net.Str
// Start implements health.HealthMonitor.
func (w *Watcher) Start(routeSubTask task.Task) E.Error {
routeSubTask.Finish("ignored")
w.task.OnCancel("stop route", func() {
w.task.OnCancel("stop route and cleanup", func() {
routeSubTask.Parent().Finish(w.task.FinishCause())
if w.metric != nil {
prometheus.Unregister(w.metric)
}
})
return nil
}
@ -96,8 +110,16 @@ func (w *Watcher) Uptime() time.Duration {
return 0
}
// Status implements health.HealthMonitor.
func (w *Watcher) Status() health.Status {
status := w.getStatusUpdateReady()
if w.metric != nil {
w.metric.Set(float64(status))
}
return status
}
// Status implements health.HealthMonitor.
func (w *Watcher) getStatusUpdateReady() health.Status {
if !w.ContainerRunning {
return health.StatusNapping
}

View file

@ -4,12 +4,12 @@ import "github.com/prometheus/client_golang/prometheus"
type (
Counter struct {
collector prometheus.Counter
mv *prometheus.CounterVec
collector prometheus.Counter
}
Gauge struct {
collector prometheus.Gauge
mv *prometheus.GaugeVec
collector prometheus.Gauge
}
Labels interface {
toPromLabels() prometheus.Labels
@ -52,8 +52,8 @@ func (c *Counter) Inc() {
c.collector.Inc()
}
func (c *Counter) With(l Labels) prometheus.Counter {
return c.mv.With(l.toPromLabels())
func (c *Counter) With(l Labels) *Counter {
return &Counter{mv: c.mv, collector: c.mv.With(l.toPromLabels())}
}
func (g *Gauge) Collect(ch chan<- prometheus.Metric) {
@ -68,6 +68,6 @@ func (g *Gauge) Set(v float64) {
g.collector.Set(v)
}
func (g *Gauge) With(l Labels) prometheus.Gauge {
return g.mv.With(l.toPromLabels())
func (g *Gauge) With(l Labels) *Gauge {
return &Gauge{mv: g.mv, collector: g.mv.With(l.toPromLabels())}
}

View file

@ -7,16 +7,24 @@ import (
"github.com/yusing/go-proxy/internal/common"
)
type RouteMetrics struct {
HTTPReqTotal,
HTTP2xx3xx,
HTTP4xx,
HTTP5xx *Counter
HTTPReqElapsed *Gauge
HealthStatus *Gauge
}
type (
RouteMetrics struct {
HTTPReqTotal,
HTTP2xx3xx,
HTTP4xx,
HTTP5xx *Counter
HTTPReqElapsed *Gauge
}
var rm RouteMetrics
ServiceMetrics struct {
HealthStatus *Gauge
}
)
var (
rm RouteMetrics
sm ServiceMetrics
)
const (
routerNamespace = "router"
@ -29,10 +37,27 @@ func GetRouteMetrics() *RouteMetrics {
return &rm
}
func GetServiceMetrics() *ServiceMetrics {
return &sm
}
func (rm *RouteMetrics) UnregisterService(service string) {
lbls := &HTTPRouteMetricLabels{Service: service}
prometheus.Unregister(rm.HTTP2xx3xx.With(lbls))
prometheus.Unregister(rm.HTTP4xx.With(lbls))
prometheus.Unregister(rm.HTTP5xx.With(lbls))
prometheus.Unregister(rm.HTTPReqElapsed.With(lbls))
}
func init() {
if !common.PrometheusEnabled {
return
}
initRouteMetrics()
initServiceMetrics()
}
func initRouteMetrics() {
lbls := []string{"service", "method", "host", "visitor", "path"}
partitionsHelp := ", partitioned by " + strings.Join(lbls, ", ")
rm = RouteMetrics{
@ -66,6 +91,11 @@ func init() {
Name: "req_elapsed_ms",
Help: "How long it took to process the request and respond a status code" + partitionsHelp,
}, lbls...),
}
}
func initServiceMetrics() {
sm = ServiceMetrics{
HealthStatus: NewGauge(prometheus.GaugeOpts{
Namespace: serviceNamespace,
Name: "health_status",

View file

@ -10,7 +10,6 @@ package http
// Copyright (c) 2024 yusing
import (
"bufio"
"bytes"
"context"
"errors"
@ -123,20 +122,8 @@ func (l *httpMetricLogger) WriteHeader(status int) {
}()
}
// Hijack hijacks the connection.
func (l *httpMetricLogger) Hijack() (net.Conn, *bufio.ReadWriter, error) {
if h, ok := l.ResponseWriter.(http.Hijacker); ok {
return h.Hijack()
}
return nil, nil, fmt.Errorf("not a hijacker: %T", l.ResponseWriter)
}
// Flush sends any buffered data to the client.
func (l *httpMetricLogger) Flush() {
if flusher, ok := l.ResponseWriter.(http.Flusher); ok {
flusher.Flush()
}
func (l *httpMetricLogger) Unwrap() http.ResponseWriter {
return l.ResponseWriter
}
func singleJoiningSlash(a, b string) string {
@ -208,6 +195,10 @@ func NewReverseProxy(name string, target types.URL, transport http.RoundTripper)
return rp
}
func (p *ReverseProxy) UnregisterMetrics() {
metrics.GetRouteMetrics().UnregisterService(p.TargetName)
}
func rewriteRequestURL(req *http.Request, target *url.URL) {
targetQuery := target.RawQuery
req.URL.Scheme = target.Scheme
@ -280,11 +271,13 @@ func (p *ReverseProxy) handler(rw http.ResponseWriter, req *http.Request) {
if common.PrometheusEnabled {
t := time.Now()
var visitor string
if realIP := req.Header.Get("X-Real-IP"); realIP != "" {
visitor = realIP
if realIPs := req.Header.Values("X-Real-IP"); len(realIPs) > 0 {
visitor = realIPs[len(realIPs)-1]
}
if fwdIP := req.Header.Get("X-Forwarded-For"); visitor == "" && fwdIP != "" {
visitor = fwdIP
if visitor == "" {
if fwdIPs := req.Header.Values("X-Forwarded-For"); len(fwdIPs) > 0 {
visitor = fwdIPs[len(fwdIPs)-1]
}
}
if visitor == "" {
var err error
@ -444,7 +437,7 @@ func (p *ReverseProxy) handler(rw http.ResponseWriter, req *http.Request) {
Proto: outreq.Proto,
ProtoMajor: outreq.ProtoMajor,
ProtoMinor: outreq.ProtoMinor,
Header: make(http.Header),
Header: http.Header{},
Body: io.NopCloser(bytes.NewReader([]byte("Origin server is not reachable."))),
Request: outreq,
TLS: outreq.TLS,

View file

@ -8,6 +8,7 @@ import (
"sync"
"github.com/rs/zerolog"
"github.com/yusing/go-proxy/internal/common"
"github.com/yusing/go-proxy/internal/docker/idlewatcher"
E "github.com/yusing/go-proxy/internal/error"
gphttp "github.com/yusing/go-proxy/internal/net/http"
@ -156,6 +157,9 @@ func (r *HTTPRoute) Start(providerSubtask task.Task) E.Error {
})
}
if common.PrometheusEnabled {
r.task.OnFinished("unreg metrics", r.rp.UnregisterMetrics)
}
return nil
}

View file

@ -7,6 +7,7 @@ import (
"strings"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/yusing/go-proxy/internal/common"
E "github.com/yusing/go-proxy/internal/error"
"github.com/yusing/go-proxy/internal/logging"
@ -28,6 +29,8 @@ type (
checkHealth HealthCheckFunc
startTime time.Time
metric *metrics.Gauge
task task.Task
}
)
@ -61,6 +64,10 @@ func (mon *monitor) Start(routeSubtask task.Task) E.Error {
return E.From(ErrNegativeInterval)
}
if common.PrometheusEnabled {
mon.metric = metrics.GetServiceMetrics().HealthStatus.With(metrics.HealthMetricLabels(mon.service))
}
go func() {
logger := logging.With().Str("name", mon.service).Logger()
@ -69,6 +76,9 @@ func (mon *monitor) Start(routeSubtask task.Task) E.Error {
mon.status.Store(StatusUnknown)
}
mon.task.Finish(nil)
if mon.metric != nil {
prometheus.Unregister(mon.metric)
}
}()
if err := mon.checkUpdateHealth(); err != nil {
@ -175,15 +185,8 @@ func (mon *monitor) checkUpdateHealth() error {
notif.Notify(mon.service, "server is down")
}
}
if common.PrometheusEnabled {
go func() {
m := metrics.GetRouteMetrics()
var up float64
if healthy {
up = 1
}
m.HealthStatus.With(metrics.HealthMetricLabels(mon.service)).Set(up)
}()
if mon.metric != nil {
mon.metric.Set(float64(status))
}
return nil