diff --git a/internal/api/handler.go b/internal/api/handler.go index f28bd0e..94909c2 100644 --- a/internal/api/handler.go +++ b/internal/api/handler.go @@ -40,6 +40,7 @@ func NewHandler(cfg config.ConfigInstance) http.Handler { mux.HandleFunc("GET", "/v1/logs/ws", auth.RequireAuth(memlogger.LogsWS(cfg.Value().MatchDomains))) mux.HandleFunc("GET", "/v1/favicon", auth.RequireAuth(favicon.GetFavIcon)) mux.HandleFunc("POST", "/v1/homepage/set", auth.RequireAuth(v1.SetHomePageOverrides)) + mux.HandleFunc("GET", "/v1/agents/ws", auth.RequireAuth(useCfg(cfg, v1.AgentsWS))) mux.HandleFunc("GET", "/v1/metrics/system_info", auth.RequireAuth(useCfg(cfg, v1.SystemInfo))) mux.HandleFunc("GET", "/v1/metrics/system_info/ws", auth.RequireAuth(useCfg(cfg, v1.SystemInfo))) mux.HandleFunc("GET", "/v1/metrics/uptime", auth.RequireAuth(uptime.Poller.ServeHTTP)) diff --git a/internal/api/v1/agents.go b/internal/api/v1/agents.go new file mode 100644 index 0000000..a758591 --- /dev/null +++ b/internal/api/v1/agents.go @@ -0,0 +1,18 @@ +package v1 + +import ( + "net/http" + "time" + + "github.com/coder/websocket" + "github.com/coder/websocket/wsjson" + U "github.com/yusing/go-proxy/internal/api/v1/utils" + config "github.com/yusing/go-proxy/internal/config/types" +) + +func AgentsWS(cfg config.ConfigInstance, w http.ResponseWriter, r *http.Request) { + U.PeriodicWS(cfg.Value().MatchDomains, w, r, 10*time.Second, func(conn *websocket.Conn) error { + wsjson.Write(r.Context(), conn, cfg.ListAgents()) + return nil + }) +} diff --git a/internal/metrics/period/handler.go b/internal/metrics/period/handler.go index 9cee84c..07b3852 100644 --- a/internal/metrics/period/handler.go +++ b/internal/metrics/period/handler.go @@ -7,6 +7,7 @@ import ( "github.com/coder/websocket" "github.com/coder/websocket/wsjson" "github.com/yusing/go-proxy/internal/api/v1/utils" + metricsutils "github.com/yusing/go-proxy/internal/metrics/utils" ) func (p *Poller[T, AggregateT]) lastResultHandler(w http.ResponseWriter, r *http.Request) { @@ -19,7 +20,8 @@ func (p *Poller[T, AggregateT]) lastResultHandler(w http.ResponseWriter, r *http } func (p *Poller[T, AggregateT]) ServeHTTP(w http.ResponseWriter, r *http.Request) { - period := r.URL.Query().Get("period") + query := r.URL.Query() + period := query.Get("period") if period == "" { p.lastResultHandler(w, r) return @@ -35,8 +37,11 @@ func (p *Poller[T, AggregateT]) ServeHTTP(w http.ResponseWriter, r *http.Request return } if p.aggregator != nil { - aggregated := p.aggregator(rangeData...) - utils.RespondJSON(w, r, aggregated) + total, aggregated := p.aggregator(rangeData, query) + utils.RespondJSON(w, r, map[string]any{ + "total": total, + "data": aggregated, + }) } else { utils.RespondJSON(w, r, rangeData) } @@ -45,11 +50,13 @@ func (p *Poller[T, AggregateT]) ServeHTTP(w http.ResponseWriter, r *http.Request func (p *Poller[T, AggregateT]) ServeWS(allowedDomains []string, w http.ResponseWriter, r *http.Request) { query := r.URL.Query() period := query.Get("period") - intervalStr := query.Get("interval") - interval, err := time.ParseDuration(intervalStr) + interval := metricsutils.QueryDuration(query, "interval", 0) - minInterval := p.interval() - if err != nil || interval < minInterval { + minInterval := 1 * time.Second + if interval == 0 { + interval = p.interval() + } + if interval < minInterval { interval = minInterval } @@ -65,7 +72,11 @@ func (p *Poller[T, AggregateT]) ServeWS(allowedDomains []string, w http.Response } if p.aggregator != nil { utils.PeriodicWS(allowedDomains, w, r, interval, func(conn *websocket.Conn) error { - return wsjson.Write(r.Context(), conn, p.aggregator(p.Get(periodFilter)...)) + total, aggregated := p.aggregator(p.Get(periodFilter), query) + return wsjson.Write(r.Context(), conn, map[string]any{ + "total": total, + "data": aggregated, + }) }) } else { utils.PeriodicWS(allowedDomains, w, r, interval, func(conn *websocket.Conn) error { diff --git a/internal/metrics/period/period.go b/internal/metrics/period/period.go index 58b5dea..ee6b5b8 100644 --- a/internal/metrics/period/period.go +++ b/internal/metrics/period/period.go @@ -6,6 +6,7 @@ import ( ) type Period[T any] struct { + FiveMinutes *Entries[T] FifteenMinutes *Entries[T] OneHour *Entries[T] OneDay *Entries[T] @@ -16,6 +17,7 @@ type Period[T any] struct { type Filter string const ( + PeriodFiveMinutes Filter = "5m" PeriodFifteenMinutes Filter = "15m" PeriodOneHour Filter = "1h" PeriodOneDay Filter = "1d" @@ -24,6 +26,7 @@ const ( func NewPeriod[T any]() *Period[T] { return &Period[T]{ + FiveMinutes: newEntries[T](5 * time.Minute), FifteenMinutes: newEntries[T](15 * time.Minute), OneHour: newEntries[T](1 * time.Hour), OneDay: newEntries[T](24 * time.Hour), diff --git a/internal/metrics/period/poller.go b/internal/metrics/period/poller.go index 3c42dc7..4a290c5 100644 --- a/internal/metrics/period/poller.go +++ b/internal/metrics/period/poller.go @@ -3,6 +3,7 @@ package period import ( "context" "fmt" + "net/url" "strings" "time" @@ -11,15 +12,17 @@ import ( ) type ( - PollFunc[T any] func(ctx context.Context) (*T, error) - AggregateFunc[T, AggregateT any] func(entries ...*T) AggregateT + PollFunc[T any] func(ctx context.Context, lastResult *T) (*T, error) + AggregateFunc[T, AggregateT any] func(entries []*T, query url.Values) (total int, result AggregateT) + FilterFunc[T any] func(entries []*T, keyword string) (filtered []*T) Poller[T, AggregateT any] struct { - name string - poll PollFunc[T] - aggregator AggregateFunc[T, AggregateT] - period *Period[T] - lastResult *T - errs []pollErr + name string + poll PollFunc[T] + aggregator AggregateFunc[T, AggregateT] + resultFilter FilterFunc[T] + period *Period[T] + lastResult *T + errs []pollErr } pollErr struct { err error @@ -31,7 +34,6 @@ const gatherErrsInterval = 30 * time.Second func NewPoller[T any]( name string, - interval time.Duration, poll PollFunc[T], ) *Poller[T, T] { return &Poller[T, T]{ @@ -43,7 +45,6 @@ func NewPoller[T any]( func NewPollerWithAggregator[T, AggregateT any]( name string, - interval time.Duration, poll PollFunc[T], aggregator AggregateFunc[T, AggregateT], ) *Poller[T, AggregateT] { @@ -55,8 +56,13 @@ func NewPollerWithAggregator[T, AggregateT any]( } } +func (p *Poller[T, AggregateT]) WithResultFilter(filter FilterFunc[T]) *Poller[T, AggregateT] { + p.resultFilter = filter + return p +} + func (p *Poller[T, AggregateT]) interval() time.Duration { - return p.period.FifteenMinutes.interval + return p.period.FiveMinutes.interval } func (p *Poller[T, AggregateT]) appendErr(err error) { @@ -91,7 +97,7 @@ func (p *Poller[T, AggregateT]) gatherErrs() (string, bool) { func (p *Poller[T, AggregateT]) pollWithTimeout(ctx context.Context) { ctx, cancel := context.WithTimeout(ctx, p.interval()) defer cancel() - data, err := p.poll(ctx) + data, err := p.poll(ctx, p.lastResult) if err != nil { p.appendErr(err) return diff --git a/internal/metrics/systeminfo/system_info.go b/internal/metrics/systeminfo/system_info.go index b060baa..80808d4 100644 --- a/internal/metrics/systeminfo/system_info.go +++ b/internal/metrics/systeminfo/system_info.go @@ -2,6 +2,7 @@ package systeminfo import ( "context" + "encoding/json" "time" "github.com/shirou/gopsutil/v4/cpu" @@ -10,24 +11,27 @@ import ( "github.com/shirou/gopsutil/v4/net" "github.com/shirou/gopsutil/v4/sensors" "github.com/yusing/go-proxy/internal/metrics/period" + "github.com/yusing/go-proxy/internal/utils/strutils" ) type SystemInfo struct { - Timestamp time.Time - CPUAverage float64 - Memory *mem.VirtualMemoryStat - Disk *disk.UsageStat - Network *net.IOCountersStat - Sensors []sensors.TemperatureStat + Timestamp time.Time + CPUAverage float64 + Memory *mem.VirtualMemoryStat + Disk *disk.UsageStat + NetworkIO *net.IOCountersStat + NetworkUp float64 + NetworkDown float64 + Sensors []sensors.TemperatureStat } -var Poller = period.NewPoller("system_info", 1*time.Second, getSystemInfo) +var Poller = period.NewPoller("system_info", getSystemInfo) func init() { Poller.Start() } -func getSystemInfo(ctx context.Context) (*SystemInfo, error) { +func getSystemInfo(ctx context.Context, lastResult *SystemInfo) (*SystemInfo, error) { memoryInfo, err := mem.VirtualMemory() if err != nil { return nil, err @@ -40,7 +44,7 @@ func getSystemInfo(ctx context.Context) (*SystemInfo, error) { if err != nil { return nil, err } - networkInfo, err := net.IOCounters(false) + networkIO, err := net.IOCounters(false) if err != nil { return nil, err } @@ -48,13 +52,51 @@ func getSystemInfo(ctx context.Context) (*SystemInfo, error) { if err != nil { return nil, err } + var networkUp, networkDown float64 + if lastResult != nil { + interval := time.Since(lastResult.Timestamp).Seconds() + networkUp = float64(networkIO[0].BytesSent-lastResult.NetworkIO.BytesSent) / interval + networkDown = float64(networkIO[0].BytesRecv-lastResult.NetworkIO.BytesRecv) / interval + } return &SystemInfo{ - Timestamp: time.Now(), - CPUAverage: cpuAverage[0], - Memory: memoryInfo, - Disk: diskInfo, - Network: &networkInfo[0], - Sensors: sensors, + Timestamp: time.Now(), + CPUAverage: cpuAverage[0], + Memory: memoryInfo, + Disk: diskInfo, + NetworkIO: &networkIO[0], + NetworkUp: networkUp, + NetworkDown: networkDown, + Sensors: sensors, }, nil } + +func (s *SystemInfo) MarshalJSON() ([]byte, error) { + return json.Marshal(map[string]any{ + "timestamp": s.Timestamp.Unix(), + "time": strutils.FormatTime(s.Timestamp), + "cpu_average": s.CPUAverage, + "memory": map[string]any{ + "total": s.Memory.Total, + "available": s.Memory.Available, + "used": s.Memory.Used, + "used_percent": s.Memory.UsedPercent, + }, + "disk": map[string]any{ + "path": s.Disk.Path, + "fstype": s.Disk.Fstype, + "total": s.Disk.Total, + "used": s.Disk.Used, + "used_percent": s.Disk.UsedPercent, + "free": s.Disk.Free, + }, + "network": map[string]any{ + "name": s.NetworkIO.Name, + "bytes_sent": s.NetworkIO.BytesSent, + "bytes_recv": s.NetworkIO.BytesRecv, + "upload_speed": s.NetworkUp, + "download_speed": s.NetworkDown, + }, + "sensors": s.Sensors, + }) +} diff --git a/internal/metrics/uptime/uptime.go b/internal/metrics/uptime/uptime.go index 2f55fd3..e8cbbae 100644 --- a/internal/metrics/uptime/uptime.go +++ b/internal/metrics/uptime/uptime.go @@ -3,72 +3,123 @@ package uptime import ( "context" "encoding/json" + "net/url" + "sort" "time" + "github.com/lithammer/fuzzysearch/fuzzy" "github.com/yusing/go-proxy/internal/metrics/period" + metricsutils "github.com/yusing/go-proxy/internal/metrics/utils" "github.com/yusing/go-proxy/internal/route/routes/routequery" "github.com/yusing/go-proxy/internal/utils/strutils" "github.com/yusing/go-proxy/internal/watcher/health" ) type ( - Statuses struct { - Statuses map[string]health.Status + StatusByAlias struct { + Map map[string]health.WithHealthInfo Timestamp time.Time } Status struct { Status health.Status + Latency time.Duration Timestamp time.Time } - Aggregated map[string][]Status + RouteStatuses map[string][]*Status + Aggregated []map[string]any ) -var Poller = period.NewPollerWithAggregator("uptime", 1*time.Second, getStatuses, aggregateStatuses) +var Poller = period.NewPollerWithAggregator("uptime", getStatuses, aggregateStatuses) func init() { Poller.Start() } -func getStatuses(ctx context.Context) (*Statuses, error) { - return &Statuses{ - Statuses: routequery.HealthStatuses(), - Timestamp: time.Now(), +func getStatuses(ctx context.Context, _ *StatusByAlias) (*StatusByAlias, error) { + now := time.Now() + return &StatusByAlias{ + Map: routequery.HealthInfo(), + Timestamp: now, }, nil } -func aggregateStatuses(entries ...*Statuses) any { - aggregated := make(Aggregated) +func aggregateStatuses(entries []*StatusByAlias, query url.Values) (int, Aggregated) { + limit := metricsutils.QueryInt(query, "limit", 0) + offset := metricsutils.QueryInt(query, "offset", 0) + keyword := query.Get("keyword") + + statuses := make(RouteStatuses) for _, entry := range entries { - for alias, status := range entry.Statuses { - aggregated[alias] = append(aggregated[alias], Status{ - Status: status, + for alias, status := range entry.Map { + statuses[alias] = append(statuses[alias], &Status{ + Status: status.Status(), + Latency: status.Latency(), Timestamp: entry.Timestamp, }) } } - return aggregated.finalize() -} - -func (a Aggregated) calculateUptime(alias string) float64 { - aggregated := a[alias] - if len(aggregated) == 0 { - return 0 - } - uptime := 0 - for _, status := range aggregated { - if status.Status == health.StatusHealthy { - uptime++ + if keyword != "" { + for alias := range statuses { + if !fuzzy.MatchFold(keyword, alias) { + delete(statuses, alias) + } } } - return float64(uptime) / float64(len(aggregated)) + return len(statuses), statuses.aggregate(limit, offset) } -func (a Aggregated) finalize() map[string]map[string]interface{} { - result := make(map[string]map[string]interface{}, len(a)) - for alias, statuses := range a { - result[alias] = map[string]interface{}{ - "uptime": a.calculateUptime(alias), - "statuses": statuses, +func (rs RouteStatuses) calculateInfo(statuses []*Status) (up float64, down float64, idle float64, latency int64) { + if len(statuses) == 0 { + return 0, 0, 0, 0 + } + total := float64(0) + for _, status := range statuses { + // ignoring unknown; treating napping and starting as downtime + if status.Status == health.StatusUnknown { + continue + } + switch { + case status.Status == health.StatusHealthy: + up++ + case status.Status.Idling(): + idle++ + default: + down++ + } + total++ + latency += status.Latency.Milliseconds() + } + if total == 0 { + return 0, 0, 0, 0 + } + return up / total, down / total, idle / total, latency / int64(total) +} + +func (rs RouteStatuses) aggregate(limit int, offset int) Aggregated { + n := len(rs) + beg, end, ok := metricsutils.CalculateBeginEnd(n, limit, offset) + if !ok { + return Aggregated{} + } + i := 0 + sortedAliases := make([]string, n) + for alias := range rs { + sortedAliases[i] = alias + i++ + } + sort.Strings(sortedAliases) + sortedAliases = sortedAliases[beg:end] + result := make(Aggregated, len(sortedAliases)) + for i, alias := range sortedAliases { + statuses := rs[alias] + up, down, idle, latency := rs.calculateInfo(statuses) + result[i] = map[string]any{ + "alias": alias, + "uptime": up, + "downtime": down, + "idle": idle, + "avg_latency": latency, + "statuses": statuses, } } return result @@ -77,15 +128,16 @@ func (a Aggregated) finalize() map[string]map[string]interface{} { func (s *Status) MarshalJSON() ([]byte, error) { return json.Marshal(map[string]interface{}{ "status": s.Status.String(), + "latency": s.Latency.Milliseconds(), "timestamp": s.Timestamp.Unix(), - "tooltip": strutils.FormatTime(s.Timestamp), + "time": strutils.FormatTime(s.Timestamp), }) } -func (s *Statuses) MarshalJSON() ([]byte, error) { +func (s *StatusByAlias) MarshalJSON() ([]byte, error) { return json.Marshal(map[string]interface{}{ - "statuses": s.Statuses, + "statuses": s.Map, "timestamp": s.Timestamp.Unix(), - "tooltip": strutils.FormatTime(s.Timestamp), + "time": strutils.FormatTime(s.Timestamp), }) } diff --git a/internal/metrics/utils/utils.go b/internal/metrics/utils/utils.go new file mode 100644 index 0000000..2432b75 --- /dev/null +++ b/internal/metrics/utils/utils.go @@ -0,0 +1,36 @@ +package metricsutils + +import ( + "net/url" + "strconv" + "time" +) + +func CalculateBeginEnd(n, limit, offset int) (int, int, bool) { + if n == 0 || offset >= n { + return 0, 0, false + } + if limit == 0 { + limit = n + } + if offset+limit > n { + limit = n - offset + } + return offset, offset + limit, true +} + +func QueryInt(query url.Values, key string, defaultValue int) int { + value, _ := strconv.Atoi(query.Get(key)) + if value == 0 { + return defaultValue + } + return value +} + +func QueryDuration(query url.Values, key string, defaultValue time.Duration) time.Duration { + value, _ := time.ParseDuration(query.Get(key)) + if value == 0 { + return defaultValue + } + return value +} diff --git a/internal/route/routes/routequery/query.go b/internal/route/routes/routequery/query.go index 1d84d62..5a0140e 100644 --- a/internal/route/routes/routequery/query.go +++ b/internal/route/routes/routequery/query.go @@ -37,13 +37,13 @@ func HealthMap() map[string]map[string]string { return healthMap } -func HealthStatuses() map[string]health.Status { - healthMap := make(map[string]health.Status, routes.NumRoutes()) +func HealthInfo() map[string]health.WithHealthInfo { + healthMap := make(map[string]health.WithHealthInfo, routes.NumRoutes()) routes.RangeRoutes(func(alias string, r route.Route) { - if r.HealthMonitor() == nil { - return + mon := r.HealthMonitor() + if mon != nil { + healthMap[alias] = mon } - healthMap[alias] = r.HealthMonitor().Status() }) return healthMap } diff --git a/internal/utils/strutils/format.go b/internal/utils/strutils/format.go index af0db95..59b3097 100644 --- a/internal/utils/strutils/format.go +++ b/internal/utils/strutils/format.go @@ -74,7 +74,7 @@ func formatFloat(f float64) string { return strconv.FormatFloat(f, 'f', -1, 64) } -func FormatByteSize[T ~uint64](size T) string { +func FormatByteSize[T ~uint64 | ~float64](size T) (value, unit string) { const ( _ = (1 << (10 * iota)) kb @@ -85,20 +85,25 @@ func FormatByteSize[T ~uint64](size T) string { ) switch { case size < kb: - return fmt.Sprintf("%d B", size) + return fmt.Sprintf("%v", size), "B" case size < mb: - return formatFloat(float64(size)/kb) + "KiB" + return formatFloat(float64(size) / kb), "KiB" case size < gb: - return formatFloat(float64(size)/mb) + "MiB" + return formatFloat(float64(size) / mb), "MiB" case size < tb: - return formatFloat(float64(size)/gb) + "GiB" + return formatFloat(float64(size) / gb), "GiB" case size < pb: - return formatFloat(float64(size/gb)/kb) + "TiB" // prevent overflow + return formatFloat(float64(size/gb) / kb), "TiB" // prevent overflow default: - return formatFloat(float64(size/tb)/kb) + "PiB" // prevent overflow + return formatFloat(float64(size/tb) / kb), "PiB" // prevent overflow } } +func FormatByteSizeWithUnit[T ~uint64 | ~float64](size T) string { + value, unit := FormatByteSize(size) + return value + " " + unit +} + func PortString(port uint16) string { return strconv.FormatUint(uint64(port), 10) } diff --git a/internal/watcher/health/status.go b/internal/watcher/health/status.go index 3202cac..8ef6842 100644 --- a/internal/watcher/health/status.go +++ b/internal/watcher/health/status.go @@ -13,6 +13,7 @@ const ( NumStatuses int = iota - 1 HealthyMask = StatusHealthy | StatusNapping | StatusStarting + IdlingMask = StatusNapping | StatusStarting ) func (s Status) String() string { @@ -43,3 +44,7 @@ func (s Status) Good() bool { func (s Status) Bad() bool { return s&HealthyMask == 0 } + +func (s Status) Idling() bool { + return s&IdlingMask != 0 +}