simplify some code and implement metrics storage

yusing 2025-02-17 07:18:59 +08:00
parent 1b7b6196c5
commit a8a209f0b0
11 changed files with 204 additions and 70 deletions

.gitignore vendored
View file

@@ -10,6 +10,7 @@ bin/
 error_pages/
 !examples/error_pages/
 profiles/
+data/
 logs/
 log/

View file

@@ -12,3 +12,4 @@ services:
     volumes:
       - /var/run/docker.sock:/var/run/docker.sock
       - ./compose.yml:/app/compose.yml
+      - ./data:/app/data

View file

@@ -34,6 +34,7 @@ services:
       - ./config:/app/config
       - ./logs:/app/logs
       - ./error_pages:/app/error_pages
+      - ./data:/app/data
       # To use autocert, certs will be stored in "./certs".
       # You can also use a docker volume to store it

View file

@@ -1,6 +1,7 @@
 package period

 import (
+    "encoding/json"
     "time"
 )
@@ -46,3 +47,33 @@ func (e *Entries[T]) Get() []*T {
     copy(res[maxEntries-e.index:], e.entries[:e.index])
     return res
 }
+
+func (e *Entries[T]) MarshalJSON() ([]byte, error) {
+    return json.Marshal(map[string]any{
+        "entries":  e.Get(),
+        "interval": e.interval,
+    })
+}
+
+func (e *Entries[T]) UnmarshalJSON(data []byte) error {
+    var v struct {
+        Entries  []*T          `json:"entries"`
+        Interval time.Duration `json:"interval"`
+    }
+    if err := json.Unmarshal(data, &v); err != nil {
+        return err
+    }
+    if len(v.Entries) == 0 {
+        return nil
+    }
+    entries := v.Entries
+    if len(entries) > maxEntries {
+        entries = entries[:maxEntries]
+    }
+    now := time.Now()
+    for _, info := range entries {
+        e.Add(now, info)
+    }
+    e.interval = v.Interval
+    return nil
+}
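Note how UnmarshalJSON refills the ring buffer by replaying the saved slice through Add rather than restoring index state by hand, so an oversized snapshot degrades gracefully. A standalone sketch of the same round-trip pattern (illustrative names and a plain int payload, not this package's API):

package main

import (
    "encoding/json"
    "fmt"
)

const maxEntries = 4

// ring mirrors the Entries[T] pattern above: fixed array, rotating index.
type ring struct {
    entries [maxEntries]int
    index   int
    count   int
}

func (r *ring) Add(v int) {
    r.entries[r.index] = v
    r.index = (r.index + 1) % maxEntries
    if r.count < maxEntries {
        r.count++
    }
}

// MarshalJSON emits the logical contents oldest-first, like Entries.Get().
func (r *ring) MarshalJSON() ([]byte, error) {
    res := make([]int, 0, r.count)
    for i := 0; i < r.count; i++ {
        res = append(res, r.entries[(r.index+maxEntries-r.count+i)%maxEntries])
    }
    return json.Marshal(map[string]any{"entries": res})
}

// UnmarshalJSON replays the snapshot through Add, capping at maxEntries.
func (r *ring) UnmarshalJSON(data []byte) error {
    var v struct {
        Entries []int `json:"entries"`
    }
    if err := json.Unmarshal(data, &v); err != nil {
        return err
    }
    if len(v.Entries) > maxEntries {
        v.Entries = v.Entries[:maxEntries]
    }
    for _, e := range v.Entries {
        r.Add(e)
    }
    return nil
}

func main() {
    var r ring
    for i := 1; i <= 5; i++ {
        r.Add(i) // 1 is overwritten once the buffer wraps
    }
    b, _ := json.Marshal(&r)
    var r2 ring
    _ = json.Unmarshal(b, &r2)
    b2, _ := json.Marshal(&r2)
    fmt.Println(string(b), string(b2)) // identical: {"entries":[2,3,4,5]} twice
}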

View file

@@ -71,16 +71,12 @@ func (p *Poller[T, AggregateT]) getRespData(r *http.Request) (any, error) {
     if !ok {
         return nil, errors.New("invalid period")
     }
-    if p.aggregator != nil {
-        total, aggregated := p.aggregator(rangeData, query)
-        if total == -1 {
-            return nil, errors.New("bad request")
-        }
-        return map[string]any{
-            "total": total,
-            "data":  aggregated,
-        }, nil
-    } else {
-        return rangeData, nil
-    }
+    total, aggregated := p.aggregate(rangeData, query)
+    if total == -1 {
+        return nil, errors.New("bad request")
+    }
+    return map[string]any{
+        "total": total,
+        "data":  aggregated,
+    }, nil
 }

View file

@@ -6,7 +6,7 @@ import (
 )

 type Period[T any] struct {
-    Entries map[Filter]*Entries[T]
+    Entries map[Filter]*Entries[T] `json:"entries"`
     mu      sync.RWMutex
 }
@@ -42,3 +42,13 @@ func (p *Period[T]) Get(filter Filter) ([]*T, bool) {
     }
     return period.Get(), true
 }
+
+func (p *Period[T]) Total() int {
+    p.mu.RLock()
+    defer p.mu.RUnlock()
+    total := 0
+    for _, period := range p.Entries {
+        total += period.count
+    }
+    return total
+}

View file

@@ -2,8 +2,11 @@ package period

 import (
     "context"
+    "encoding/json"
+    "fmt"
     "net/url"
+    "os"
+    "path/filepath"
     "time"

     "github.com/yusing/go-proxy/internal/gperr"
@@ -12,13 +15,13 @@
 )

 type (
-    PollFunc[T any]                  func(ctx context.Context, lastResult *T) (*T, error)
-    AggregateFunc[T, AggregateT any] func(entries []*T, query url.Values) (total int, result AggregateT)
-    FilterFunc[T any]                func(entries []*T, keyword string) (filtered []*T)
-    Poller[T, AggregateT any] struct {
+    PollFunc[T any]                                 func(ctx context.Context, lastResult *T) (*T, error)
+    AggregateFunc[T any, AggregateT json.Marshaler] func(entries []*T, query url.Values) (total int, result AggregateT)
+    FilterFunc[T any]                               func(entries []*T, keyword string) (filtered []*T)
+    Poller[T any, AggregateT json.Marshaler] struct {
         name         string
         poll         PollFunc[T]
-        aggregator   AggregateFunc[T, AggregateT]
+        aggregate    AggregateFunc[T, AggregateT]
         resultFilter FilterFunc[T]
         period       *Period[T]
         lastResult   *T
@@ -33,32 +36,50 @@
 const (
     pollInterval       = 1 * time.Second
     gatherErrsInterval = 30 * time.Second
+    saveInterval       = 5 * time.Minute
+    saveBaseDir        = "data/metrics"
 )

-func NewPoller[T any](
-    name string,
-    poll PollFunc[T],
-) *Poller[T, T] {
-    return &Poller[T, T]{
-        name:   name,
-        poll:   poll,
-        period: NewPeriod[T](),
-    }
-}
+func init() {
+    if err := os.MkdirAll(saveBaseDir, 0o755); err != nil {
+        panic(fmt.Sprintf("failed to create metrics data directory: %s", err))
+    }
+}

-func NewPollerWithAggregator[T, AggregateT any](
+func NewPoller[T any, AggregateT json.Marshaler](
     name string,
     poll PollFunc[T],
     aggregator AggregateFunc[T, AggregateT],
 ) *Poller[T, AggregateT] {
     return &Poller[T, AggregateT]{
-        name:       name,
-        poll:       poll,
-        aggregator: aggregator,
-        period:     NewPeriod[T](),
+        name:      name,
+        poll:      poll,
+        aggregate: aggregator,
+        period:    NewPeriod[T](),
     }
 }
+
+func (p *Poller[T, AggregateT]) savePath() string {
+    return filepath.Join(saveBaseDir, fmt.Sprintf("%s.json", p.name))
+}
+
+func (p *Poller[T, AggregateT]) load() error {
+    entries, err := os.ReadFile(p.savePath())
+    if err != nil {
+        return err
+    }
+    return json.Unmarshal(entries, &p.period)
+}
+
+func (p *Poller[T, AggregateT]) save() error {
+    entries, err := json.Marshal(p.period)
+    if err != nil {
+        return err
+    }
+    return os.WriteFile(p.savePath(), entries, 0o644)
+}

 func (p *Poller[T, AggregateT]) WithResultFilter(filter FilterFunc[T]) *Poller[T, AggregateT] {
     p.resultFilter = filter
     return p
@@ -108,23 +129,45 @@ func (p *Poller[T, AggregateT]) pollWithTimeout(ctx context.Context) {
 }

 func (p *Poller[T, AggregateT]) Start() {
     t := task.RootTask("poller." + p.name)
     go func() {
-        ctx := task.RootContext()
-        ticker := time.NewTicker(pollInterval)
+        err := p.load()
+        if err != nil {
+            if !os.IsNotExist(err) {
+                logging.Error().Err(err).Msgf("failed to load last metrics data for %s", p.name)
+            }
+        } else {
+            logging.Debug().Msgf("Loaded last metrics data for %s, %d entries", p.name, p.period.Total())
+        }
+
+        pollTicker := time.NewTicker(pollInterval)
         gatherErrsTicker := time.NewTicker(gatherErrsInterval)
-        defer ticker.Stop()
-        defer gatherErrsTicker.Stop()
+        saveTicker := time.NewTicker(saveInterval)
+
+        defer func() {
+            pollTicker.Stop()
+            gatherErrsTicker.Stop()
+            saveTicker.Stop()
+            p.save()
+            t.Finish(nil)
+        }()

         logging.Debug().Msgf("Starting poller %s with interval %s", p.name, pollInterval)

-        p.pollWithTimeout(ctx)
+        p.pollWithTimeout(t.Context())

         for {
             select {
-            case <-ctx.Done():
+            case <-t.Context().Done():
                 return
-            case <-ticker.C:
-                p.pollWithTimeout(ctx)
+            case <-pollTicker.C:
+                p.pollWithTimeout(t.Context())
+            case <-saveTicker.C:
+                err := p.save()
+                if err != nil {
+                    p.appendErr(err)
+                }
             case <-gatherErrsTicker.C:
                 errs, ok := p.gatherErrs()
                 if ok {
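Taken together, Start now brackets the poll loop with persistence: load once before polling, save on every saveInterval tick, and save again from the deferred cleanup on shutdown. A minimal self-contained sketch of that lifecycle, with shortened intervals and illustrative names (the real code hangs this off task.RootTask and its context):

package main

import (
    "context"
    "encoding/json"
    "log"
    "os"
    "time"
)

type state struct {
    Polls int `json:"polls"`
}

func load(path string, s *state) error {
    b, err := os.ReadFile(path)
    if err != nil {
        return err
    }
    return json.Unmarshal(b, s)
}

func save(path string, s *state) error {
    b, err := json.Marshal(s)
    if err != nil {
        return err
    }
    return os.WriteFile(path, b, 0o644)
}

func run(ctx context.Context, path string) {
    var s state
    // A missing snapshot is normal on first run; only real errors are logged.
    if err := load(path, &s); err != nil && !os.IsNotExist(err) {
        log.Printf("load failed: %v", err)
    }
    pollTicker := time.NewTicker(100 * time.Millisecond)
    saveTicker := time.NewTicker(time.Second) // stands in for the 5m saveInterval
    defer func() {
        pollTicker.Stop()
        saveTicker.Stop()
        save(path, &s) // final snapshot on shutdown, like the deferred p.save()
    }()
    for {
        select {
        case <-ctx.Done():
            return
        case <-pollTicker.C:
            s.Polls++
        case <-saveTicker.C:
            if err := save(path, &s); err != nil {
                log.Printf("save failed: %v", err)
            }
        }
    }
}

func main() {
    _ = os.MkdirAll("data/metrics", 0o755) // mirrors the package init()
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()
    run(ctx, "data/metrics/example.json")
}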

View file

@@ -3,6 +3,7 @@ package systeminfo
 import (
     "bytes"
     "context"
+    "encoding/json"
     "errors"
     "fmt"
     "net/url"
@@ -54,17 +55,18 @@
         UploadSpeed   float64 `json:"upload_speed"`
         DownloadSpeed float64 `json:"download_speed"`
     }
+    Sensors    []sensors.TemperatureStat
     Aggregated []map[string]any
 )

 type SystemInfo struct {
-    Timestamp  int64                     `json:"timestamp"`
-    CPUAverage *float64                  `json:"cpu_average"`
-    Memory     *MemoryUsage              `json:"memory"`
-    Disks      map[string]*Disk          `json:"disks"`    // disk usage by partition
-    DisksIO    map[string]*DiskIO        `json:"disks_io"` // disk IO by device
-    Network    *Network                  `json:"network"`
-    Sensors    []sensors.TemperatureStat `json:"sensors"` // sensor temperature by key
+    Timestamp  int64              `json:"timestamp"`
+    CPUAverage *float64           `json:"cpu_average"`
+    Memory     *MemoryUsage       `json:"memory"`
+    Disks      map[string]*Disk   `json:"disks"`    // disk usage by partition
+    DisksIO    map[string]*DiskIO `json:"disks_io"` // disk IO by device
+    Network    *Network           `json:"network"`
+    Sensors    Sensors            `json:"sensors"` // sensor temperature by key
 }

 const (
@@ -93,7 +95,7 @@ var allQueries = []string{
     querySensorTemperature,
 }

-var Poller = period.NewPollerWithAggregator("system_info", getSystemInfo, aggregate)
+var Poller = period.NewPoller("system_info", getSystemInfo, aggregate)

 var bufPool = sync.Pool{
     New: func() any {
@@ -421,6 +423,26 @@ func (s *SystemInfo) MarshalJSON() ([]byte, error) {
     return []byte(b.String()), nil
 }

+func (s *Sensors) UnmarshalJSON(data []byte) error {
+    var v map[string]map[string]any
+    if err := json.Unmarshal(data, &v); err != nil {
+        return err
+    }
+    if len(v) == 0 {
+        return nil
+    }
+    *s = make(Sensors, 0, len(v))
+    for k, v := range v {
+        *s = append(*s, sensors.TemperatureStat{
+            SensorKey:   k,
+            Temperature: v["temperature"].(float64),
+            High:        v["high"].(float64),
+            Critical:    v["critical"].(float64),
+        })
+    }
+    return nil
+}
+
 // recharts friendly
 func aggregate(entries []*SystemInfo, query url.Values) (total int, result Aggregated) {
     n := len(entries)
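One caveat in Sensors.UnmarshalJSON as committed: the bare .(float64) assertions panic if a snapshot carries a missing or non-numeric field, and a panic here would abort loading the whole metrics file. A defensive variant would use the two-result assertion form; this is a sketch with a hypothetical helper, not the committed code:

// Hypothetical helper, not part of the commit: reads a float field
// tolerantly, zeroing anything missing or of the wrong type.
func toFloat(m map[string]any, key string) float64 {
    f, ok := m[key].(float64) // ok is false when key is absent or not a float64
    if !ok {
        return 0
    }
    return f
}

with the struct fields then filled as Temperature: toFloat(v, "temperature") and so on.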

View file

@@ -6,6 +6,7 @@ import (
     "reflect"
     "testing"

+    "github.com/shirou/gopsutil/v4/sensors"
     . "github.com/yusing/go-proxy/internal/utils/testing"
 )
@@ -60,6 +61,20 @@ var testInfo = &SystemInfo{
         UploadSpeed:   1024.5,
         DownloadSpeed: 2048.5,
     },
+    Sensors: []sensors.TemperatureStat{
+        {
+            SensorKey:   "cpu_temp",
+            Temperature: 30.0,
+            High:        40.0,
+            Critical:    50.0,
+        },
+        {
+            SensorKey:   "gpu_temp",
+            Temperature: 40.0,
+            High:        50.0,
+            Critical:    60.0,
+        },
+    },
 }

 func TestSystemInfo(t *testing.T) {

View file

@@ -17,19 +17,19 @@
 type (
     StatusByAlias struct {
-        Map       map[string]*routequery.HealthInfoRaw
-        Timestamp time.Time
+        Map       map[string]*routequery.HealthInfoRaw `json:"statuses"`
+        Timestamp int64                                `json:"timestamp"`
     }
     Status struct {
-        Status    health.Status
-        Latency   time.Duration
-        Timestamp time.Time
+        Status    health.Status `json:"status"`
+        Latency   int64         `json:"latency"`
+        Timestamp int64         `json:"timestamp"`
     }
     RouteStatuses map[string][]*Status
     Aggregated    []map[string]any
 )

-var Poller = period.NewPollerWithAggregator("uptime", getStatuses, aggregateStatuses)
+var Poller = period.NewPoller("uptime", getStatuses, aggregateStatuses)

 func init() {
     Poller.Start()
@@ -38,7 +38,7 @@ func init() {
 func getStatuses(ctx context.Context, _ *StatusByAlias) (*StatusByAlias, error) {
     return &StatusByAlias{
         Map:       routequery.HealthInfo(),
-        Timestamp: time.Now(),
+        Timestamp: time.Now().Unix(),
     }, nil
 }
@@ -52,7 +52,7 @@ func aggregateStatuses(entries []*StatusByAlias, query url.Values) (int, Aggregated) {
     for alias, status := range entry.Map {
         statuses[alias] = append(statuses[alias], &Status{
             Status:    status.Status,
-            Latency:   status.Latency,
+            Latency:   status.Latency.Milliseconds(),
             Timestamp: entry.Timestamp,
         })
     }
@@ -67,11 +67,12 @@ func aggregateStatuses(entries []*StatusByAlias, query url.Values) (int, Aggregated) {
     return len(statuses), statuses.aggregate(limit, offset)
 }

-func (rs RouteStatuses) calculateInfo(statuses []*Status) (up float64, down float64, idle float64, latency int64) {
+func (rs RouteStatuses) calculateInfo(statuses []*Status) (up float64, down float64, idle float64, _ float64) {
     if len(statuses) == 0 {
         return 0, 0, 0, 0
     }
     total := float64(0)
+    latency := float64(0)
     for _, status := range statuses {
         // ignoring unknown; treating napping and starting as downtime
         if status.Status == health.StatusUnknown {
@@ -86,12 +87,12 @@ func (rs RouteStatuses) calculateInfo(statuses []*Status) (up float64, down float64, idle float64, _ float64) {
             down++
         }
         total++
-        latency += status.Latency.Milliseconds()
+        latency += float64(status.Latency)
     }
     if total == 0 {
         return 0, 0, 0, 0
     }
-    return up / total, down / total, idle / total, latency / int64(total)
+    return up / total, down / total, idle / total, latency / total
 }

 func (rs RouteStatuses) aggregate(limit int, offset int) Aggregated {
@@ -128,17 +129,6 @@ func (rs RouteStatuses) aggregate(limit int, offset int) Aggregated {
     return result
 }

-func (s *Status) MarshalJSON() ([]byte, error) {
-    return json.Marshal(map[string]interface{}{
-        "status":    s.Status.String(),
-        "latency":   s.Latency.Milliseconds(),
-        "timestamp": s.Timestamp.Unix(),
-    })
-}
-
-func (s *StatusByAlias) MarshalJSON() ([]byte, error) {
-    return json.Marshal(map[string]interface{}{
-        "statuses":  s.Map,
-        "timestamp": s.Timestamp.Unix(),
-    })
-}
+func (result Aggregated) MarshalJSON() ([]byte, error) {
+    return json.Marshal([]map[string]any(result))
+}
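The calculateInfo change above is subtle: latency was previously summed and divided as int64, so integer division truncated the average; accumulating in float64 keeps the fraction. A self-contained replica of the arithmetic (simplified two-state statuses, not the package's types):

package main

import "fmt"

type status struct {
    up      bool
    latency float64 // milliseconds, as stored by the new int64 Latency field
}

// Same shape as calculateInfo, reduced to two states: ratios are fractions
// of the counted samples, latency a float64 mean.
func calc(statuses []status) (up, down, latency float64) {
    if len(statuses) == 0 {
        return 0, 0, 0
    }
    total := float64(0)
    for _, s := range statuses {
        if s.up {
            up++
        } else {
            down++
        }
        total++
        latency += s.latency
    }
    return up / total, down / total, latency / total
}

func main() {
    up, down, latency := calc([]status{{true, 10}, {true, 20}, {false, 20}})
    // up=0.67 down=0.33 latency=16.67ms; the old int64 division
    // would have reported 16ms here (50/3 truncated).
    fmt.Printf("up=%.2f down=%.2f latency=%.2fms\n", up, down, latency)
}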

View file

@@ -1,5 +1,7 @@
 package health

+import "encoding/json"
+
 type Status uint8

 const (
@@ -37,6 +39,28 @@ func (s Status) MarshalJSON() ([]byte, error) {
     return []byte(`"` + s.String() + `"`), nil
 }

+func (s *Status) UnmarshalJSON(data []byte) error {
+    var str string
+    if err := json.Unmarshal(data, &str); err != nil {
+        return err
+    }
+    switch str {
+    case "healthy":
+        *s = StatusHealthy
+    case "unhealthy":
+        *s = StatusUnhealthy
+    case "napping":
+        *s = StatusNapping
+    case "starting":
+        *s = StatusStarting
+    case "error":
+        *s = StatusError
+    default:
+        *s = StatusUnknown
+    }
+    return nil
+}
+
 func (s Status) Good() bool {
     return s&HealthyMask != 0
 }
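With MarshalJSON emitting the string form and the new UnmarshalJSON parsing it back (unrecognized strings degrading to StatusUnknown), Status now survives the JSON round trip that metrics persistence relies on. A standalone replica of the pair, trimmed to two states:

package main

import (
    "encoding/json"
    "fmt"
)

type Status uint8

const (
    StatusUnknown Status = iota
    StatusHealthy
)

func (s Status) String() string {
    if s == StatusHealthy {
        return "healthy"
    }
    return "unknown"
}

func (s Status) MarshalJSON() ([]byte, error) {
    return []byte(`"` + s.String() + `"`), nil
}

func (s *Status) UnmarshalJSON(data []byte) error {
    var str string
    if err := json.Unmarshal(data, &str); err != nil {
        return err
    }
    switch str {
    case "healthy":
        *s = StatusHealthy
    default:
        *s = StatusUnknown // anything unrecognized degrades, as above
    }
    return nil
}

func main() {
    b, _ := json.Marshal(StatusHealthy)
    var s Status
    _ = json.Unmarshal(b, &s)
    fmt.Println(string(b), s == StatusHealthy) // "healthy" true
}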