From a8a209f0b04d941598e511823f65a7f31a5c4773 Mon Sep 17 00:00:00 2001 From: yusing Date: Mon, 17 Feb 2025 07:18:59 +0800 Subject: [PATCH] simplify some code and implement metrics storage --- .gitignore | 1 + agent/pkg/agent/agent.compose.yml | 1 + compose.example.yml | 1 + internal/metrics/period/entries.go | 31 ++++++ internal/metrics/period/handler.go | 18 ++-- internal/metrics/period/period.go | 12 ++- internal/metrics/period/poller.go | 95 ++++++++++++++----- internal/metrics/systeminfo/system_info.go | 38 ++++++-- .../metrics/systeminfo/system_info_test.go | 15 +++ internal/metrics/uptime/uptime.go | 38 +++----- internal/watcher/health/status.go | 24 +++++ 11 files changed, 204 insertions(+), 70 deletions(-) diff --git a/.gitignore b/.gitignore index b75b081..e78c180 100755 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,7 @@ bin/ error_pages/ !examples/error_pages/ profiles/ +data/ logs/ log/ diff --git a/agent/pkg/agent/agent.compose.yml b/agent/pkg/agent/agent.compose.yml index c90e25e..90bea22 100644 --- a/agent/pkg/agent/agent.compose.yml +++ b/agent/pkg/agent/agent.compose.yml @@ -12,3 +12,4 @@ services: volumes: - /var/run/docker.sock:/var/run/docker.sock - ./compose.yml:/app/compose.yml + - ./data:/app/data diff --git a/compose.example.yml b/compose.example.yml index 0b34db9..06ef4a9 100755 --- a/compose.example.yml +++ b/compose.example.yml @@ -34,6 +34,7 @@ services: - ./config:/app/config - ./logs:/app/logs - ./error_pages:/app/error_pages + - ./data:/app/data # To use autocert, certs will be stored in "./certs". # You can also use a docker volume to store it diff --git a/internal/metrics/period/entries.go b/internal/metrics/period/entries.go index fe56925..d9a5554 100644 --- a/internal/metrics/period/entries.go +++ b/internal/metrics/period/entries.go @@ -1,6 +1,7 @@ package period import ( + "encoding/json" "time" ) @@ -46,3 +47,33 @@ func (e *Entries[T]) Get() []*T { copy(res[maxEntries-e.index:], e.entries[:e.index]) return res } + +func (e *Entries[T]) MarshalJSON() ([]byte, error) { + return json.Marshal(map[string]any{ + "entries": e.Get(), + "interval": e.interval, + }) +} + +func (e *Entries[T]) UnmarshalJSON(data []byte) error { + var v struct { + Entries []*T `json:"entries"` + Interval time.Duration `json:"interval"` + } + if err := json.Unmarshal(data, &v); err != nil { + return err + } + if len(v.Entries) == 0 { + return nil + } + entries := v.Entries + if len(entries) > maxEntries { + entries = entries[:maxEntries] + } + now := time.Now() + for _, info := range entries { + e.Add(now, info) + } + e.interval = v.Interval + return nil +} diff --git a/internal/metrics/period/handler.go b/internal/metrics/period/handler.go index e2fa787..e2d4e1b 100644 --- a/internal/metrics/period/handler.go +++ b/internal/metrics/period/handler.go @@ -71,16 +71,12 @@ func (p *Poller[T, AggregateT]) getRespData(r *http.Request) (any, error) { if !ok { return nil, errors.New("invalid period") } - if p.aggregator != nil { - total, aggregated := p.aggregator(rangeData, query) - if total == -1 { - return nil, errors.New("bad request") - } - return map[string]any{ - "total": total, - "data": aggregated, - }, nil - } else { - return rangeData, nil + total, aggregated := p.aggregate(rangeData, query) + if total == -1 { + return nil, errors.New("bad request") } + return map[string]any{ + "total": total, + "data": aggregated, + }, nil } diff --git a/internal/metrics/period/period.go b/internal/metrics/period/period.go index 1efd48d..5c9f504 100644 --- a/internal/metrics/period/period.go +++ b/internal/metrics/period/period.go @@ -6,7 +6,7 @@ import ( ) type Period[T any] struct { - Entries map[Filter]*Entries[T] + Entries map[Filter]*Entries[T] `json:"entries"` mu sync.RWMutex } @@ -42,3 +42,13 @@ func (p *Period[T]) Get(filter Filter) ([]*T, bool) { } return period.Get(), true } + +func (p *Period[T]) Total() int { + p.mu.RLock() + defer p.mu.RUnlock() + total := 0 + for _, period := range p.Entries { + total += period.count + } + return total +} diff --git a/internal/metrics/period/poller.go b/internal/metrics/period/poller.go index 3ff9da9..4c6ecb2 100644 --- a/internal/metrics/period/poller.go +++ b/internal/metrics/period/poller.go @@ -2,8 +2,11 @@ package period import ( "context" + "encoding/json" "fmt" "net/url" + "os" + "path/filepath" "time" "github.com/yusing/go-proxy/internal/gperr" @@ -12,13 +15,13 @@ import ( ) type ( - PollFunc[T any] func(ctx context.Context, lastResult *T) (*T, error) - AggregateFunc[T, AggregateT any] func(entries []*T, query url.Values) (total int, result AggregateT) - FilterFunc[T any] func(entries []*T, keyword string) (filtered []*T) - Poller[T, AggregateT any] struct { + PollFunc[T any] func(ctx context.Context, lastResult *T) (*T, error) + AggregateFunc[T any, AggregateT json.Marshaler] func(entries []*T, query url.Values) (total int, result AggregateT) + FilterFunc[T any] func(entries []*T, keyword string) (filtered []*T) + Poller[T any, AggregateT json.Marshaler] struct { name string poll PollFunc[T] - aggregator AggregateFunc[T, AggregateT] + aggregate AggregateFunc[T, AggregateT] resultFilter FilterFunc[T] period *Period[T] lastResult *T @@ -33,32 +36,50 @@ type ( const ( pollInterval = 1 * time.Second gatherErrsInterval = 30 * time.Second + saveInterval = 5 * time.Minute + + saveBaseDir = "data/metrics" ) -func NewPoller[T any]( - name string, - poll PollFunc[T], -) *Poller[T, T] { - return &Poller[T, T]{ - name: name, - poll: poll, - period: NewPeriod[T](), +func init() { + if err := os.MkdirAll(saveBaseDir, 0o755); err != nil { + panic(fmt.Sprintf("failed to create metrics data directory: %s", err)) } } -func NewPollerWithAggregator[T, AggregateT any]( +func NewPoller[T any, AggregateT json.Marshaler]( name string, poll PollFunc[T], aggregator AggregateFunc[T, AggregateT], ) *Poller[T, AggregateT] { return &Poller[T, AggregateT]{ - name: name, - poll: poll, - aggregator: aggregator, - period: NewPeriod[T](), + name: name, + poll: poll, + aggregate: aggregator, + period: NewPeriod[T](), } } +func (p *Poller[T, AggregateT]) savePath() string { + return filepath.Join(saveBaseDir, fmt.Sprintf("%s.json", p.name)) +} + +func (p *Poller[T, AggregateT]) load() error { + entries, err := os.ReadFile(p.savePath()) + if err != nil { + return err + } + return json.Unmarshal(entries, &p.period) +} + +func (p *Poller[T, AggregateT]) save() error { + entries, err := json.Marshal(p.period) + if err != nil { + return err + } + return os.WriteFile(p.savePath(), entries, 0o644) +} + func (p *Poller[T, AggregateT]) WithResultFilter(filter FilterFunc[T]) *Poller[T, AggregateT] { p.resultFilter = filter return p @@ -108,23 +129,45 @@ func (p *Poller[T, AggregateT]) pollWithTimeout(ctx context.Context) { } func (p *Poller[T, AggregateT]) Start() { + t := task.RootTask("poller." + p.name) go func() { - ctx := task.RootContext() - ticker := time.NewTicker(pollInterval) + err := p.load() + if err != nil { + if !os.IsNotExist(err) { + logging.Error().Err(err).Msgf("failed to load last metrics data for %s", p.name) + } + } else { + logging.Debug().Msgf("Loaded last metrics data for %s, %d entries", p.name, p.period.Total()) + } + + pollTicker := time.NewTicker(pollInterval) gatherErrsTicker := time.NewTicker(gatherErrsInterval) - defer ticker.Stop() - defer gatherErrsTicker.Stop() + saveTicker := time.NewTicker(saveInterval) + + defer func() { + pollTicker.Stop() + gatherErrsTicker.Stop() + saveTicker.Stop() + + p.save() + t.Finish(nil) + }() logging.Debug().Msgf("Starting poller %s with interval %s", p.name, pollInterval) - p.pollWithTimeout(ctx) + p.pollWithTimeout(t.Context()) for { select { - case <-ctx.Done(): + case <-t.Context().Done(): return - case <-ticker.C: - p.pollWithTimeout(ctx) + case <-pollTicker.C: + p.pollWithTimeout(t.Context()) + case <-saveTicker.C: + err := p.save() + if err != nil { + p.appendErr(err) + } case <-gatherErrsTicker.C: errs, ok := p.gatherErrs() if ok { diff --git a/internal/metrics/systeminfo/system_info.go b/internal/metrics/systeminfo/system_info.go index cd740f0..b34b86b 100644 --- a/internal/metrics/systeminfo/system_info.go +++ b/internal/metrics/systeminfo/system_info.go @@ -3,6 +3,7 @@ package systeminfo import ( "bytes" "context" + "encoding/json" "errors" "fmt" "net/url" @@ -54,17 +55,18 @@ type ( UploadSpeed float64 `json:"upload_speed"` DownloadSpeed float64 `json:"download_speed"` } + Sensors []sensors.TemperatureStat Aggregated []map[string]any ) type SystemInfo struct { - Timestamp int64 `json:"timestamp"` - CPUAverage *float64 `json:"cpu_average"` - Memory *MemoryUsage `json:"memory"` - Disks map[string]*Disk `json:"disks"` // disk usage by partition - DisksIO map[string]*DiskIO `json:"disks_io"` // disk IO by device - Network *Network `json:"network"` - Sensors []sensors.TemperatureStat `json:"sensors"` // sensor temperature by key + Timestamp int64 `json:"timestamp"` + CPUAverage *float64 `json:"cpu_average"` + Memory *MemoryUsage `json:"memory"` + Disks map[string]*Disk `json:"disks"` // disk usage by partition + DisksIO map[string]*DiskIO `json:"disks_io"` // disk IO by device + Network *Network `json:"network"` + Sensors Sensors `json:"sensors"` // sensor temperature by key } const ( @@ -93,7 +95,7 @@ var allQueries = []string{ querySensorTemperature, } -var Poller = period.NewPollerWithAggregator("system_info", getSystemInfo, aggregate) +var Poller = period.NewPoller("system_info", getSystemInfo, aggregate) var bufPool = sync.Pool{ New: func() any { @@ -421,6 +423,26 @@ func (s *SystemInfo) MarshalJSON() ([]byte, error) { return []byte(b.String()), nil } +func (s *Sensors) UnmarshalJSON(data []byte) error { + var v map[string]map[string]any + if err := json.Unmarshal(data, &v); err != nil { + return err + } + if len(v) == 0 { + return nil + } + *s = make(Sensors, 0, len(v)) + for k, v := range v { + *s = append(*s, sensors.TemperatureStat{ + SensorKey: k, + Temperature: v["temperature"].(float64), + High: v["high"].(float64), + Critical: v["critical"].(float64), + }) + } + return nil +} + // recharts friendly func aggregate(entries []*SystemInfo, query url.Values) (total int, result Aggregated) { n := len(entries) diff --git a/internal/metrics/systeminfo/system_info_test.go b/internal/metrics/systeminfo/system_info_test.go index 128edc2..65dfb99 100644 --- a/internal/metrics/systeminfo/system_info_test.go +++ b/internal/metrics/systeminfo/system_info_test.go @@ -6,6 +6,7 @@ import ( "reflect" "testing" + "github.com/shirou/gopsutil/v4/sensors" . "github.com/yusing/go-proxy/internal/utils/testing" ) @@ -60,6 +61,20 @@ var testInfo = &SystemInfo{ UploadSpeed: 1024.5, DownloadSpeed: 2048.5, }, + Sensors: []sensors.TemperatureStat{ + { + SensorKey: "cpu_temp", + Temperature: 30.0, + High: 40.0, + Critical: 50.0, + }, + { + SensorKey: "gpu_temp", + Temperature: 40.0, + High: 50.0, + Critical: 60.0, + }, + }, } func TestSystemInfo(t *testing.T) { diff --git a/internal/metrics/uptime/uptime.go b/internal/metrics/uptime/uptime.go index 8307767..4b40cf7 100644 --- a/internal/metrics/uptime/uptime.go +++ b/internal/metrics/uptime/uptime.go @@ -17,19 +17,19 @@ import ( type ( StatusByAlias struct { - Map map[string]*routequery.HealthInfoRaw - Timestamp time.Time + Map map[string]*routequery.HealthInfoRaw `json:"statuses"` + Timestamp int64 `json:"timestamp"` } Status struct { - Status health.Status - Latency time.Duration - Timestamp time.Time + Status health.Status `json:"status"` + Latency int64 `json:"latency"` + Timestamp int64 `json:"timestamp"` } RouteStatuses map[string][]*Status Aggregated []map[string]any ) -var Poller = period.NewPollerWithAggregator("uptime", getStatuses, aggregateStatuses) +var Poller = period.NewPoller("uptime", getStatuses, aggregateStatuses) func init() { Poller.Start() @@ -38,7 +38,7 @@ func init() { func getStatuses(ctx context.Context, _ *StatusByAlias) (*StatusByAlias, error) { return &StatusByAlias{ Map: routequery.HealthInfo(), - Timestamp: time.Now(), + Timestamp: time.Now().Unix(), }, nil } @@ -52,7 +52,7 @@ func aggregateStatuses(entries []*StatusByAlias, query url.Values) (int, Aggrega for alias, status := range entry.Map { statuses[alias] = append(statuses[alias], &Status{ Status: status.Status, - Latency: status.Latency, + Latency: status.Latency.Milliseconds(), Timestamp: entry.Timestamp, }) } @@ -67,11 +67,12 @@ func aggregateStatuses(entries []*StatusByAlias, query url.Values) (int, Aggrega return len(statuses), statuses.aggregate(limit, offset) } -func (rs RouteStatuses) calculateInfo(statuses []*Status) (up float64, down float64, idle float64, latency int64) { +func (rs RouteStatuses) calculateInfo(statuses []*Status) (up float64, down float64, idle float64, _ float64) { if len(statuses) == 0 { return 0, 0, 0, 0 } total := float64(0) + latency := float64(0) for _, status := range statuses { // ignoring unknown; treating napping and starting as downtime if status.Status == health.StatusUnknown { @@ -86,12 +87,12 @@ func (rs RouteStatuses) calculateInfo(statuses []*Status) (up float64, down floa down++ } total++ - latency += status.Latency.Milliseconds() + latency += float64(status.Latency) } if total == 0 { return 0, 0, 0, 0 } - return up / total, down / total, idle / total, latency / int64(total) + return up / total, down / total, idle / total, latency / total } func (rs RouteStatuses) aggregate(limit int, offset int) Aggregated { @@ -128,17 +129,6 @@ func (rs RouteStatuses) aggregate(limit int, offset int) Aggregated { return result } -func (s *Status) MarshalJSON() ([]byte, error) { - return json.Marshal(map[string]interface{}{ - "status": s.Status.String(), - "latency": s.Latency.Milliseconds(), - "timestamp": s.Timestamp.Unix(), - }) -} - -func (s *StatusByAlias) MarshalJSON() ([]byte, error) { - return json.Marshal(map[string]interface{}{ - "statuses": s.Map, - "timestamp": s.Timestamp.Unix(), - }) +func (result Aggregated) MarshalJSON() ([]byte, error) { + return json.Marshal([]map[string]any(result)) } diff --git a/internal/watcher/health/status.go b/internal/watcher/health/status.go index 8ef6842..355dd70 100644 --- a/internal/watcher/health/status.go +++ b/internal/watcher/health/status.go @@ -1,5 +1,7 @@ package health +import "encoding/json" + type Status uint8 const ( @@ -37,6 +39,28 @@ func (s Status) MarshalJSON() ([]byte, error) { return []byte(`"` + s.String() + `"`), nil } +func (s *Status) UnmarshalJSON(data []byte) error { + var str string + if err := json.Unmarshal(data, &str); err != nil { + return err + } + switch str { + case "healthy": + *s = StatusHealthy + case "unhealthy": + *s = StatusUnhealthy + case "napping": + *s = StatusNapping + case "starting": + *s = StatusStarting + case "error": + *s = StatusError + default: + *s = StatusUnknown + } + return nil +} + func (s Status) Good() bool { return s&HealthyMask != 0 }