From c807b30c8f840ab63cea0bd37455ca56d70582c3 Mon Sep 17 00:00:00 2001 From: yusing Date: Wed, 12 Feb 2025 05:30:34 +0800 Subject: [PATCH] api: remove service health from prometheus, implement godoxy metrics --- agent/pkg/agent/config.go | 2 +- agent/pkg/agent/requests.go | 14 +++ agent/pkg/handler/handler.go | 3 +- agent/pkg/handler/system_info.go | 17 --- internal/api/handler.go | 7 +- internal/api/v1/system_info.go | 64 +++++++--- internal/api/v1/utils/error.go | 8 ++ internal/api/v1/utils/ws.go | 2 +- internal/docker/idlewatcher/waker.go | 7 -- internal/metrics/http_handler.go | 13 -- internal/metrics/labels.go | 7 -- internal/metrics/period/entries.go | 45 +++++++ internal/metrics/period/handler.go | 49 ++++++++ internal/metrics/period/period.go | 67 +++++++++++ internal/metrics/period/poller.go | 132 +++++++++++++++++++++ internal/metrics/system_info.go | 80 ------------- internal/metrics/systeminfo/system_info.go | 60 ++++++++++ internal/metrics/uptime/uptime.go | 73 ++++++++++++ internal/route/routes/routequery/query.go | 15 ++- internal/route/routes/routes.go | 13 ++ internal/watcher/health/monitor/monitor.go | 5 - 21 files changed, 531 insertions(+), 152 deletions(-) delete mode 100644 agent/pkg/handler/system_info.go delete mode 100644 internal/metrics/http_handler.go create mode 100644 internal/metrics/period/entries.go create mode 100644 internal/metrics/period/handler.go create mode 100644 internal/metrics/period/period.go create mode 100644 internal/metrics/period/poller.go delete mode 100644 internal/metrics/system_info.go create mode 100644 internal/metrics/systeminfo/system_info.go create mode 100644 internal/metrics/uptime/uptime.go diff --git a/agent/pkg/agent/config.go b/agent/pkg/agent/config.go index 7d149b0..bc6a6ba 100644 --- a/agent/pkg/agent/config.go +++ b/agent/pkg/agent/config.go @@ -36,7 +36,7 @@ const ( EndpointProxyHTTP = "/proxy/http" EndpointHealth = "/health" EndpointLogs = "/logs" - EndpointSystemInfo = "/system-info" + EndpointSystemInfo = "/system_info" AgentHost = certs.CertsDNSName diff --git a/agent/pkg/agent/requests.go b/agent/pkg/agent/requests.go index a6efb5b..5da76d1 100644 --- a/agent/pkg/agent/requests.go +++ b/agent/pkg/agent/requests.go @@ -16,6 +16,20 @@ func (cfg *AgentConfig) Do(ctx context.Context, method, endpoint string, body io return cfg.httpClient.Do(req) } +func (cfg *AgentConfig) Forward(req *http.Request, endpoint string) ([]byte, int, error) { + req = req.WithContext(req.Context()) + req.URL.Host = AgentHost + req.URL.Scheme = "https" + req.URL.Path = APIEndpointBase + endpoint + resp, err := cfg.httpClient.Do(req) + if err != nil { + return nil, 0, err + } + defer resp.Body.Close() + data, _ := io.ReadAll(resp.Body) + return data, resp.StatusCode, nil +} + func (cfg *AgentConfig) Fetch(ctx context.Context, endpoint string) ([]byte, int, error) { resp, err := cfg.Do(ctx, "GET", endpoint, nil) if err != nil { diff --git a/agent/pkg/handler/handler.go b/agent/pkg/handler/handler.go index adbea9c..7549240 100644 --- a/agent/pkg/handler/handler.go +++ b/agent/pkg/handler/handler.go @@ -15,6 +15,7 @@ import ( E "github.com/yusing/go-proxy/internal/error" "github.com/yusing/go-proxy/internal/logging" "github.com/yusing/go-proxy/internal/logging/memlogger" + "github.com/yusing/go-proxy/internal/metrics/systeminfo" "github.com/yusing/go-proxy/internal/task" "github.com/yusing/go-proxy/internal/utils/strutils" ) @@ -49,7 +50,7 @@ func NewAgentHandler() http.Handler { }) mux.HandleMethods("GET", agent.EndpointHealth, CheckHealth) mux.HandleMethods("GET", agent.EndpointLogs, memlogger.LogsWS(nil)) - mux.HandleMethods("GET", agent.EndpointSystemInfo, SystemInfo) + mux.HandleMethods("GET", agent.EndpointSystemInfo, systeminfo.Poller.ServeHTTP) mux.ServeMux.HandleFunc("/", DockerSocketHandler()) return mux } diff --git a/agent/pkg/handler/system_info.go b/agent/pkg/handler/system_info.go deleted file mode 100644 index 9b02992..0000000 --- a/agent/pkg/handler/system_info.go +++ /dev/null @@ -1,17 +0,0 @@ -package handler - -import ( - "net/http" - - "github.com/yusing/go-proxy/internal/api/v1/utils" - "github.com/yusing/go-proxy/internal/metrics" -) - -func SystemInfo(w http.ResponseWriter, r *http.Request) { - info, err := metrics.GetSystemInfo(r.Context()) - if err != nil { - utils.HandleErr(w, r, err) - return - } - utils.RespondJSON(w, r, info) -} diff --git a/internal/api/handler.go b/internal/api/handler.go index dd988dd..fb00667 100644 --- a/internal/api/handler.go +++ b/internal/api/handler.go @@ -11,6 +11,7 @@ import ( config "github.com/yusing/go-proxy/internal/config/types" "github.com/yusing/go-proxy/internal/logging" "github.com/yusing/go-proxy/internal/logging/memlogger" + "github.com/yusing/go-proxy/internal/metrics/uptime" "github.com/yusing/go-proxy/internal/utils/strutils" ) @@ -39,8 +40,10 @@ func NewHandler(cfg config.ConfigInstance) http.Handler { mux.HandleFunc("GET", "/v1/logs/ws", auth.RequireAuth(memlogger.LogsWS(cfg))) mux.HandleFunc("GET", "/v1/favicon", auth.RequireAuth(favicon.GetFavIcon)) mux.HandleFunc("POST", "/v1/homepage/set", auth.RequireAuth(v1.SetHomePageOverrides)) - mux.HandleFunc("GET", "/v1/system_info", auth.RequireAuth(useCfg(cfg, v1.SystemInfo))) - mux.HandleFunc("GET", "/v1/system_info/{agent_name}", auth.RequireAuth(useCfg(cfg, v1.SystemInfo))) + mux.HandleFunc("GET", "/v1/metrics/system_info", auth.RequireAuth(useCfg(cfg, v1.SystemInfo))) + mux.HandleFunc("GET", "/v1/metrics/system_info/ws", auth.RequireAuth(useCfg(cfg, v1.SystemInfo))) + mux.HandleFunc("GET", "/v1/metrics/uptime", auth.RequireAuth(uptime.Poller.ServeHTTP)) + mux.HandleFunc("GET", "/v1/metrics/uptime/ws", auth.RequireAuth(useCfg(cfg, uptime.Poller.ServeWS))) if common.PrometheusEnabled { mux.Handle("GET /v1/metrics", promhttp.Handler()) diff --git a/internal/api/v1/system_info.go b/internal/api/v1/system_info.go index a5a2502..ac7ff5e 100644 --- a/internal/api/v1/system_info.go +++ b/internal/api/v1/system_info.go @@ -2,29 +2,34 @@ package v1 import ( "net/http" + "strings" + "github.com/coder/websocket/wsjson" agentPkg "github.com/yusing/go-proxy/agent/pkg/agent" U "github.com/yusing/go-proxy/internal/api/v1/utils" config "github.com/yusing/go-proxy/internal/config/types" - "github.com/yusing/go-proxy/internal/metrics" + "github.com/yusing/go-proxy/internal/metrics/systeminfo" ) func SystemInfo(cfg config.ConfigInstance, w http.ResponseWriter, r *http.Request) { - agentName := r.FormValue("agent_name") + isWS := strings.HasSuffix(r.URL.Path, "/ws") + agentName := r.URL.Query().Get("agent_name") if agentName == "" { - info, err := metrics.GetSystemInfo(r.Context()) - if err != nil { - U.HandleErr(w, r, err) - return + if isWS { + systeminfo.Poller.ServeWS(cfg, w, r) + } else { + systeminfo.Poller.ServeHTTP(w, r) } - U.RespondJSON(w, r, info) - } else { - agent, ok := cfg.GetAgent(agentName) - if !ok { - U.HandleErr(w, r, U.ErrInvalidKey("agent_name"), http.StatusNotFound) - return - } - respData, status, err := agent.Fetch(r.Context(), agentPkg.EndpointSystemInfo) + return + } + + agent, ok := cfg.GetAgent(agentName) + if !ok { + U.HandleErr(w, r, U.ErrInvalidKey("agent_name"), http.StatusNotFound) + return + } + if !isWS { + respData, status, err := agent.Forward(r, agentPkg.EndpointSystemInfo) if err != nil { U.HandleErr(w, r, err) return @@ -33,6 +38,35 @@ func SystemInfo(cfg config.ConfigInstance, w http.ResponseWriter, r *http.Reques http.Error(w, string(respData), status) return } - U.RespondJSON(w, r, respData) + U.WriteBody(w, respData) + } else { + clientConn, err := U.InitiateWS(cfg, w, r) + if err != nil { + U.HandleErr(w, r, err) + return + } + agentConn, _, err := agent.Websocket(r.Context(), agentPkg.EndpointSystemInfo) + if err != nil { + U.HandleErr(w, r, err) + return + } + //nolint:errcheck + defer agentConn.CloseNow() + var data []byte + for { + select { + case <-r.Context().Done(): + return + default: + err := wsjson.Read(r.Context(), agentConn, &data) + if err == nil { + err = wsjson.Write(r.Context(), clientConn, data) + } + if err != nil { + U.HandleErr(w, r, err) + return + } + } + } } } diff --git a/internal/api/v1/utils/error.go b/internal/api/v1/utils/error.go index c7341e9..6c085d1 100644 --- a/internal/api/v1/utils/error.go +++ b/internal/api/v1/utils/error.go @@ -1,7 +1,9 @@ package utils import ( + "context" "encoding/json" + "errors" "net/http" E "github.com/yusing/go-proxy/internal/error" @@ -17,7 +19,13 @@ func HandleErr(w http.ResponseWriter, r *http.Request, err error, code ...int) { if err == nil { return } + if errors.Is(err, context.Canceled) { + return + } LogError(r).Msg(err.Error()) + if r.Header.Get("Upgrade") == "websocket" { + return + } if len(code) == 0 { code = []int{http.StatusInternalServerError} } diff --git a/internal/api/v1/utils/ws.go b/internal/api/v1/utils/ws.go index 4e417a0..7e7e155 100644 --- a/internal/api/v1/utils/ws.go +++ b/internal/api/v1/utils/ws.go @@ -60,7 +60,7 @@ func PeriodicWS(cfg config.ConfigInstance, w http.ResponseWriter, r *http.Reques return case <-ticker.C: if err := do(conn); err != nil { - LogError(r).Msg(err.Error()) + HandleErr(w, r, err) return } } diff --git a/internal/docker/idlewatcher/waker.go b/internal/docker/idlewatcher/waker.go index fefbc6b..6af2381 100644 --- a/internal/docker/idlewatcher/waker.go +++ b/internal/docker/idlewatcher/waker.go @@ -4,7 +4,6 @@ import ( "sync/atomic" "time" - "github.com/yusing/go-proxy/internal/common" "github.com/yusing/go-proxy/internal/docker/idlewatcher/types" E "github.com/yusing/go-proxy/internal/error" "github.com/yusing/go-proxy/internal/metrics" @@ -63,12 +62,6 @@ func newWaker(parent task.Parent, route route.Route, rp *reverseproxy.ReversePro panic("both nil") } - if common.PrometheusEnabled { - m := metrics.GetServiceMetrics() - fqn := parent.Name() + "/" + route.TargetName() - waker.metric = m.HealthStatus.With(metrics.HealthMetricLabels(fqn)) - waker.metric.Set(float64(watcher.Status())) - } return watcher, nil } diff --git a/internal/metrics/http_handler.go b/internal/metrics/http_handler.go deleted file mode 100644 index 31fb3cc..0000000 --- a/internal/metrics/http_handler.go +++ /dev/null @@ -1,13 +0,0 @@ -package metrics - -import ( - "net/http" - - "github.com/prometheus/client_golang/prometheus/promhttp" -) - -func NewHandler() http.Handler { - mux := http.NewServeMux() - mux.Handle("/metrics", promhttp.Handler()) - return mux -} diff --git a/internal/metrics/labels.go b/internal/metrics/labels.go index 0c42c10..4542689 100644 --- a/internal/metrics/labels.go +++ b/internal/metrics/labels.go @@ -9,7 +9,6 @@ type ( StreamRouteMetricLabels struct { Service, Visitor string } - HealthMetricLabels string ) func (lbl *HTTPRouteMetricLabels) toPromLabels() prometheus.Labels { @@ -28,9 +27,3 @@ func (lbl *StreamRouteMetricLabels) toPromLabels() prometheus.Labels { "visitor": lbl.Visitor, } } - -func (lbl HealthMetricLabels) toPromLabels() prometheus.Labels { - return prometheus.Labels{ - "service": string(lbl), - } -} diff --git a/internal/metrics/period/entries.go b/internal/metrics/period/entries.go new file mode 100644 index 0000000..ee31d7f --- /dev/null +++ b/internal/metrics/period/entries.go @@ -0,0 +1,45 @@ +package period + +import "time" + +type Entries[T any] struct { + entries [maxEntries]*T + index int + count int + interval int64 + lastAdd int64 +} + +const maxEntries = 500 + +func newEntries[T any](interval int64) *Entries[T] { + return &Entries[T]{ + interval: interval, + lastAdd: time.Now().Unix(), + } +} + +func (e *Entries[T]) Add(now int64, info *T) { + if now-e.lastAdd < e.interval { + return + } + e.entries[e.index] = info + e.index++ + if e.index >= maxEntries { + e.index = 0 + } + if e.count < maxEntries { + e.count++ + } + e.lastAdd = now +} + +func (e *Entries[T]) Get() []*T { + if e.count < maxEntries { + return e.entries[:e.count] + } + res := make([]*T, maxEntries) + copy(res, e.entries[e.index:]) + copy(res[maxEntries-e.index:], e.entries[:e.index]) + return res +} diff --git a/internal/metrics/period/handler.go b/internal/metrics/period/handler.go new file mode 100644 index 0000000..2db413d --- /dev/null +++ b/internal/metrics/period/handler.go @@ -0,0 +1,49 @@ +package period + +import ( + "net/http" + + "github.com/coder/websocket" + "github.com/coder/websocket/wsjson" + "github.com/yusing/go-proxy/internal/api/v1/utils" + config "github.com/yusing/go-proxy/internal/config/types" +) + +func (p *Poller[T, AggregateT]) lastResultHandler(w http.ResponseWriter, r *http.Request) { + info := p.GetLastResult() + if info == nil { + http.Error(w, "no system info", http.StatusNoContent) + return + } + utils.RespondJSON(w, r, info) +} + +func (p *Poller[T, AggregateT]) ServeHTTP(w http.ResponseWriter, r *http.Request) { + period := r.URL.Query().Get("period") + if period == "" { + p.lastResultHandler(w, r) + return + } + periodFilter := Filter(period) + if !periodFilter.IsValid() { + http.Error(w, "invalid period", http.StatusBadRequest) + return + } + rangeData := p.Get(periodFilter) + if len(rangeData) == 0 { + http.Error(w, "no data", http.StatusNoContent) + return + } + if p.aggregator != nil { + aggregated := p.aggregator(rangeData...) + utils.RespondJSON(w, r, aggregated) + } else { + utils.RespondJSON(w, r, rangeData) + } +} + +func (p *Poller[T, AggregateT]) ServeWS(cfg config.ConfigInstance, w http.ResponseWriter, r *http.Request) { + utils.PeriodicWS(cfg, w, r, p.interval, func(conn *websocket.Conn) error { + return wsjson.Write(r.Context(), conn, p.GetLastResult()) + }) +} diff --git a/internal/metrics/period/period.go b/internal/metrics/period/period.go new file mode 100644 index 0000000..15059c9 --- /dev/null +++ b/internal/metrics/period/period.go @@ -0,0 +1,67 @@ +package period + +import ( + "sync" + "time" +) + +type Period[T any] struct { + FifteenMinutes *Entries[T] + OneHour *Entries[T] + OneDay *Entries[T] + OneMonth *Entries[T] + mu sync.RWMutex +} + +type Filter string + +const ( + PeriodFifteenMinutes Filter = "15m" + PeriodOneHour Filter = "1h" + PeriodOneDay Filter = "1d" + PeriodOneMonth Filter = "1m" +) + +func NewPeriod[T any]() *Period[T] { + return &Period[T]{ + FifteenMinutes: newEntries[T](15 * 60 / maxEntries), + OneHour: newEntries[T](60 * 60 / maxEntries), + OneDay: newEntries[T](24 * 60 * 60 / maxEntries), + OneMonth: newEntries[T](30 * 24 * 60 * 60 / maxEntries), + } +} + +func (p *Period[T]) Add(info *T) { + p.mu.Lock() + defer p.mu.Unlock() + now := time.Now().Unix() + p.FifteenMinutes.Add(now, info) + p.OneHour.Add(now, info) + p.OneDay.Add(now, info) + p.OneMonth.Add(now, info) +} + +func (p *Period[T]) Get(filter Filter) []*T { + p.mu.RLock() + defer p.mu.RUnlock() + switch filter { + case PeriodFifteenMinutes: + return p.FifteenMinutes.Get() + case PeriodOneHour: + return p.OneHour.Get() + case PeriodOneDay: + return p.OneDay.Get() + case PeriodOneMonth: + return p.OneMonth.Get() + default: + panic("invalid period filter") + } +} + +func (filter Filter) IsValid() bool { + switch filter { + case PeriodFifteenMinutes, PeriodOneHour, PeriodOneDay, PeriodOneMonth: + return true + } + return false +} diff --git a/internal/metrics/period/poller.go b/internal/metrics/period/poller.go new file mode 100644 index 0000000..4caf6c2 --- /dev/null +++ b/internal/metrics/period/poller.go @@ -0,0 +1,132 @@ +package period + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/yusing/go-proxy/internal/logging" + "github.com/yusing/go-proxy/internal/task" +) + +type ( + PollFunc[T any] func(ctx context.Context) (*T, error) + AggregateFunc[T, AggregateT any] func(entries ...*T) AggregateT + Poller[T, AggregateT any] struct { + name string + poll PollFunc[T] + aggregator AggregateFunc[T, AggregateT] + period *Period[T] + interval time.Duration + lastResult *T + errs []pollErr + } + pollErr struct { + err error + count int + } +) + +const gatherErrsInterval = 30 * time.Second + +func NewPoller[T any]( + name string, + interval time.Duration, + poll PollFunc[T], +) *Poller[T, T] { + return &Poller[T, T]{ + name: name, + poll: poll, + period: NewPeriod[T](), + interval: interval, + } +} + +func NewPollerWithAggregator[T, AggregateT any]( + name string, + interval time.Duration, + poll PollFunc[T], + aggregator AggregateFunc[T, AggregateT], +) *Poller[T, AggregateT] { + return &Poller[T, AggregateT]{ + name: name, + poll: poll, + aggregator: aggregator, + period: NewPeriod[T](), + interval: interval, + } +} + +func (p *Poller[T, AggregateT]) appendErr(err error) { + if len(p.errs) == 0 { + p.errs = []pollErr{ + {err: err, count: 1}, + } + return + } + for i, e := range p.errs { + if e.err.Error() == err.Error() { + p.errs[i].count++ + return + } + } + p.errs = append(p.errs, pollErr{err: err, count: 1}) +} + +func (p *Poller[T, AggregateT]) gatherErrs() (string, bool) { + if len(p.errs) == 0 { + return "", false + } + title := fmt.Sprintf("Poller %s has encountered %d errors in the last %s seconds:", p.name, len(p.errs), gatherErrsInterval) + errs := make([]string, 0, len(p.errs)+1) + errs = append(errs, title) + for _, e := range p.errs { + errs = append(errs, fmt.Sprintf("%s: %d times", e.err.Error(), e.count)) + } + return strings.Join(errs, "\n"), true +} + +func (p *Poller[T, AggregateT]) pollWithTimeout(ctx context.Context) (*T, error) { + ctx, cancel := context.WithTimeout(ctx, p.interval) + defer cancel() + return p.poll(ctx) +} + +func (p *Poller[T, AggregateT]) Start() { + go func() { + ctx := task.RootContext() + ticker := time.NewTicker(p.interval) + gatherErrsTicker := time.NewTicker(gatherErrsInterval) + defer ticker.Stop() + defer gatherErrsTicker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + data, err := p.pollWithTimeout(ctx) + if err != nil { + p.appendErr(err) + continue + } + p.period.Add(data) + p.lastResult = data + case <-gatherErrsTicker.C: + errs, ok := p.gatherErrs() + if ok { + logging.Error().Msg(errs) + } + } + } + }() +} + +func (p *Poller[T, AggregateT]) Get(filter Filter) []*T { + return p.period.Get(filter) +} + +func (p *Poller[T, AggregateT]) GetLastResult() *T { + return p.lastResult +} diff --git a/internal/metrics/system_info.go b/internal/metrics/system_info.go deleted file mode 100644 index fd9a94c..0000000 --- a/internal/metrics/system_info.go +++ /dev/null @@ -1,80 +0,0 @@ -package metrics - -import ( - "context" - "encoding/json" - "time" - - "github.com/shirou/gopsutil/v4/cpu" - "github.com/shirou/gopsutil/v4/disk" - "github.com/shirou/gopsutil/v4/mem" - "github.com/shirou/gopsutil/v4/net" - "github.com/shirou/gopsutil/v4/sensors" - "github.com/yusing/go-proxy/internal/utils/strutils" -) - -type ( - SystemInfo struct { - CPUAverage float64 - Memory *mem.VirtualMemoryStat - Disk *disk.UsageStat - Network *net.IOCountersStat - Sensors []sensors.TemperatureStat - } -) - -func GetSystemInfo(ctx context.Context) (*SystemInfo, error) { - memoryInfo, err := mem.VirtualMemory() - if err != nil { - return nil, err - } - cpuAverage, err := cpu.PercentWithContext(ctx, time.Second, false) - if err != nil { - return nil, err - } - diskInfo, err := disk.Usage("/") - if err != nil { - return nil, err - } - networkInfo, err := net.IOCounters(false) - if err != nil { - return nil, err - } - sensors, err := sensors.SensorsTemperatures() - if err != nil { - return nil, err - } - - return &SystemInfo{ - CPUAverage: cpuAverage[0], - Memory: memoryInfo, - Disk: diskInfo, - Network: &networkInfo[0], - Sensors: sensors, - }, nil -} - -func (info *SystemInfo) MarshalJSON() ([]byte, error) { - return json.Marshal(map[string]interface{}{ - "cpu_average": info.CPUAverage, - "memory": map[string]interface{}{ - "total": strutils.FormatByteSize(info.Memory.Total), - "available": strutils.FormatByteSize(info.Memory.Available), - "used": strutils.FormatByteSize(info.Memory.Used), - "used_percent": info.Memory.UsedPercent, - "free": strutils.FormatByteSize(info.Memory.Free), - }, - "disk": map[string]interface{}{ - "total": strutils.FormatByteSize(info.Disk.Total), - "used": strutils.FormatByteSize(info.Disk.Used), - "used_percent": info.Disk.UsedPercent, - "free": strutils.FormatByteSize(info.Disk.Free), - "fs_type": info.Disk.Fstype, - }, - "network": map[string]interface{}{ - "bytes_sent": strutils.FormatByteSize(info.Network.BytesSent), - "bytes_recv": strutils.FormatByteSize(info.Network.BytesRecv), - }, - "sensors": info.Sensors, - }) -} diff --git a/internal/metrics/systeminfo/system_info.go b/internal/metrics/systeminfo/system_info.go new file mode 100644 index 0000000..b060baa --- /dev/null +++ b/internal/metrics/systeminfo/system_info.go @@ -0,0 +1,60 @@ +package systeminfo + +import ( + "context" + "time" + + "github.com/shirou/gopsutil/v4/cpu" + "github.com/shirou/gopsutil/v4/disk" + "github.com/shirou/gopsutil/v4/mem" + "github.com/shirou/gopsutil/v4/net" + "github.com/shirou/gopsutil/v4/sensors" + "github.com/yusing/go-proxy/internal/metrics/period" +) + +type SystemInfo struct { + Timestamp time.Time + CPUAverage float64 + Memory *mem.VirtualMemoryStat + Disk *disk.UsageStat + Network *net.IOCountersStat + Sensors []sensors.TemperatureStat +} + +var Poller = period.NewPoller("system_info", 1*time.Second, getSystemInfo) + +func init() { + Poller.Start() +} + +func getSystemInfo(ctx context.Context) (*SystemInfo, error) { + memoryInfo, err := mem.VirtualMemory() + if err != nil { + return nil, err + } + cpuAverage, err := cpu.PercentWithContext(ctx, 150*time.Millisecond, false) + if err != nil { + return nil, err + } + diskInfo, err := disk.Usage("/") + if err != nil { + return nil, err + } + networkInfo, err := net.IOCounters(false) + if err != nil { + return nil, err + } + sensors, err := sensors.SensorsTemperatures() + if err != nil { + return nil, err + } + + return &SystemInfo{ + Timestamp: time.Now(), + CPUAverage: cpuAverage[0], + Memory: memoryInfo, + Disk: diskInfo, + Network: &networkInfo[0], + Sensors: sensors, + }, nil +} diff --git a/internal/metrics/uptime/uptime.go b/internal/metrics/uptime/uptime.go new file mode 100644 index 0000000..49b3b32 --- /dev/null +++ b/internal/metrics/uptime/uptime.go @@ -0,0 +1,73 @@ +package uptime + +import ( + "context" + "time" + + "github.com/yusing/go-proxy/internal/metrics/period" + "github.com/yusing/go-proxy/internal/route/routes/routequery" + "github.com/yusing/go-proxy/internal/watcher/health" +) + +type ( + Statuses struct { + Statuses map[string]health.Status + Timestamp int64 + } + Status struct { + Status health.Status + Timestamp int64 + } + Aggregated map[string][]Status +) + +var Poller = period.NewPollerWithAggregator("uptime", 1*time.Second, getStatuses, aggregateStatuses) + +func init() { + Poller.Start() +} + +func getStatuses(ctx context.Context) (*Statuses, error) { + return &Statuses{ + Statuses: routequery.HealthStatuses(), + Timestamp: time.Now().Unix(), + }, nil +} + +func aggregateStatuses(entries ...*Statuses) any { + aggregated := make(Aggregated) + for _, entry := range entries { + for alias, status := range entry.Statuses { + aggregated[alias] = append(aggregated[alias], Status{ + Status: status, + Timestamp: entry.Timestamp, + }) + } + } + return aggregated.finalize() +} + +func (a Aggregated) calculateUptime(alias string) float64 { + aggregated := a[alias] + if len(aggregated) == 0 { + return 0 + } + uptime := 0 + for _, status := range aggregated { + if status.Status == health.StatusHealthy { + uptime++ + } + } + return float64(uptime) / float64(len(aggregated)) +} + +func (a Aggregated) finalize() map[string]map[string]interface{} { + result := make(map[string]map[string]interface{}, len(a)) + for alias, statuses := range a { + result[alias] = map[string]interface{}{ + "uptime": a.calculateUptime(alias), + "statuses": statuses, + } + } + return result +} diff --git a/internal/route/routes/routequery/query.go b/internal/route/routes/routequery/query.go index d48681d..1d84d62 100644 --- a/internal/route/routes/routequery/query.go +++ b/internal/route/routes/routequery/query.go @@ -10,6 +10,7 @@ import ( "github.com/yusing/go-proxy/internal/route/routes" route "github.com/yusing/go-proxy/internal/route/types" "github.com/yusing/go-proxy/internal/utils/strutils" + "github.com/yusing/go-proxy/internal/watcher/health" ) func getHealthInfo(r route.Route) map[string]string { @@ -30,11 +31,19 @@ func getHealthInfo(r route.Route) map[string]string { func HealthMap() map[string]map[string]string { healthMap := make(map[string]map[string]string) - routes.GetHTTPRoutes().RangeAll(func(alias string, r route.HTTPRoute) { + routes.RangeRoutes(func(alias string, r route.Route) { healthMap[alias] = getHealthInfo(r) }) - routes.GetStreamRoutes().RangeAll(func(alias string, r route.StreamRoute) { - healthMap[alias] = getHealthInfo(r) + return healthMap +} + +func HealthStatuses() map[string]health.Status { + healthMap := make(map[string]health.Status, routes.NumRoutes()) + routes.RangeRoutes(func(alias string, r route.Route) { + if r.HealthMonitor() == nil { + return + } + healthMap[alias] = r.HealthMonitor().Status() }) return healthMap } diff --git a/internal/route/routes/routes.go b/internal/route/routes/routes.go index a372aa5..535c4a5 100644 --- a/internal/route/routes/routes.go +++ b/internal/route/routes/routes.go @@ -10,6 +10,19 @@ var ( streamRoutes = F.NewMapOf[string, types.StreamRoute]() ) +func RangeRoutes(callback func(alias string, r types.Route)) { + httpRoutes.RangeAll(func(alias string, r types.HTTPRoute) { + callback(alias, r) + }) + streamRoutes.RangeAll(func(alias string, r types.StreamRoute) { + callback(alias, r) + }) +} + +func NumRoutes() int { + return httpRoutes.Size() + streamRoutes.Size() +} + func GetHTTPRoutes() F.Map[string, types.HTTPRoute] { return httpRoutes } diff --git a/internal/watcher/health/monitor/monitor.go b/internal/watcher/health/monitor/monitor.go index b27dd43..0d76877 100644 --- a/internal/watcher/health/monitor/monitor.go +++ b/internal/watcher/health/monitor/monitor.go @@ -6,7 +6,6 @@ import ( "fmt" "time" - "github.com/yusing/go-proxy/internal/common" E "github.com/yusing/go-proxy/internal/error" "github.com/yusing/go-proxy/internal/logging" "github.com/yusing/go-proxy/internal/metrics" @@ -63,10 +62,6 @@ func (mon *monitor) Start(parent task.Parent) E.Error { return E.From(ErrNegativeInterval) } - if common.PrometheusEnabled { - mon.metric = metrics.GetServiceMetrics().HealthStatus.With(metrics.HealthMetricLabels(mon.service)) - } - mon.service = parent.Name() mon.task = parent.Subtask("health_monitor")