reduce memory usage and improve performance

yusing 2025-02-17 05:05:53 +08:00
parent ed7937a026
commit 1b7b6196c5
3 changed files with 226 additions and 119 deletions

.gitignore

@@ -9,6 +9,7 @@ certs*/
 bin/
 error_pages/
 !examples/error_pages/
+profiles/
 logs/
 log/


@ -1,12 +1,14 @@
package systeminfo package systeminfo
import ( import (
"bytes"
"context" "context"
"errors" "errors"
"fmt" "fmt"
"net/url" "net/url"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
"github.com/shirou/gopsutil/v4/cpu" "github.com/shirou/gopsutil/v4/cpu"
@@ -52,21 +54,17 @@ type (
 		UploadSpeed   float64 `json:"upload_speed"`
 		DownloadSpeed float64 `json:"download_speed"`
 	}
-	Sensor struct {
-		Temperature float32 `json:"temperature"`
-		High        float32 `json:"high"`
-		Critical    float32 `json:"critical"`
-	}
+	Aggregated []map[string]any
 )
 
 type SystemInfo struct {
 	Timestamp  int64        `json:"timestamp"`
 	CPUAverage *float64     `json:"cpu_average"`
 	Memory     *MemoryUsage `json:"memory"`
 	Disks      map[string]*Disk   `json:"disks"`    // disk usage by partition
 	DisksIO    map[string]*DiskIO `json:"disks_io"` // disk IO by device
 	Network    *Network           `json:"network"`
-	Sensors    map[string]Sensor         `json:"sensors"` // sensor temperature by key
+	Sensors    []sensors.TemperatureStat `json:"sensors"` // sensor temperature by key
 }
 
 const (
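Note on the type swap above: keeping gopsutil's []sensors.TemperatureStat as-is drops one map allocation and one struct conversion per poll. For reference, the reused type carries roughly these fields (a sketch of the gopsutil v4 sensors package; struct tags omitted here, see upstream for the authoritative definition):

type TemperatureStat struct {
	SensorKey   string
	Temperature float64
	High        float64
	Critical    float64
}
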
@@ -82,8 +80,27 @@ const (
 	querySensorTemperature = "sensor_temperature"
 )
 
+var allQueries = []string{
+	queryCPUAverage,
+	queryMemoryUsage,
+	queryMemoryUsagePercent,
+	queryDisksReadSpeed,
+	queryDisksWriteSpeed,
+	queryDisksIOPS,
+	queryDiskUsage,
+	queryNetworkSpeed,
+	queryNetworkTransfer,
+	querySensorTemperature,
+}
+
 var Poller = period.NewPollerWithAggregator("system_info", getSystemInfo, aggregate)
 
+var bufPool = sync.Pool{
+	New: func() any {
+		return bytes.NewBuffer(make([]byte, 0, 1024))
+	},
+}
+
 func init() {
 	Poller.Start()
 }
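The bufPool above, together with the MarshalJSON changes below, is the core of the memory win: each marshal now borrows a shared 1 KiB buffer instead of growing a fresh strings.Builder. A minimal standalone sketch of the idiom (names hypothetical, not from this commit); the defensive copy at the end is the conservative variant for callers that hold the bytes past the next pool Get:

package main

import (
	"bytes"
	"fmt"
	"sync"
)

var pool = sync.Pool{
	New: func() any { return bytes.NewBuffer(make([]byte, 0, 1024)) },
}

// render borrows a pooled buffer, fills it, and copies the bytes out so
// the buffer can be recycled safely for the next caller.
func render(fill func(*bytes.Buffer)) []byte {
	b := pool.Get().(*bytes.Buffer)
	b.Reset()
	defer pool.Put(b)
	fill(b)
	return append([]byte(nil), b.Bytes()...)
}

func main() {
	out := render(func(b *bytes.Buffer) { b.WriteString(`{"ok":true}`) })
	fmt.Println(string(out))
}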
@@ -171,6 +188,34 @@ func (s *SystemInfo) collectDisksInfo(ctx context.Context, lastResult *SystemInf
 	}
 	s.DisksIO = make(map[string]*DiskIO, len(ioCounters))
 	for name, io := range ioCounters {
+		// include only whole-disk devices (sd*, hd*, vd*, xvd*, nvme*, mmcblk*), skip partitions
+		if len(name) < 3 {
+			continue
+		}
+		switch {
+		case strings.HasPrefix(name, "nvme"),
+			strings.HasPrefix(name, "mmcblk"): // NVMe/SD/MMC
+			if name[len(name)-2] == 'p' {
+				continue // skip partitions
+			}
+		default:
+			switch name[0] {
+			case 's', 'h', 'v': // SCSI/SATA/virtio disks
+				if name[1] != 'd' {
+					continue
+				}
+			case 'x': // Xen virtual disks
+				if name[1:3] != "vd" {
+					continue
+				}
+			default:
+				continue
+			}
+			last := name[len(name)-1]
+			if last >= '0' && last <= '9' {
+				continue // skip partitions
+			}
+		}
 		s.DisksIO[name] = &DiskIO{
 			ReadBytes:  io.ReadBytes,
 			WriteBytes: io.WriteBytes,
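The filter above is easiest to read as a predicate. A hypothetical extraction of the same logic (the commit keeps it inline; this assumes the package's existing strings import), with accepted and rejected names spelled out:

// isWholeDisk mirrors the inline filter: keep whole-disk devices,
// drop partitions and everything else.
func isWholeDisk(name string) bool {
	if len(name) < 3 {
		return false
	}
	// NVMe/SD/MMC: "nvme0n1" and "mmcblk0" pass; "nvme0n1p1" has a 'p'
	// before its final character and is skipped as a partition.
	if strings.HasPrefix(name, "nvme") || strings.HasPrefix(name, "mmcblk") {
		return name[len(name)-2] != 'p'
	}
	switch name[0] {
	case 's', 'h', 'v': // "sda", "hda", "vda"
		if name[1] != 'd' {
			return false
		}
	case 'x': // "xvda"
		if name[1:3] != "vd" {
			return false
		}
	default: // e.g. "loop0", "dm-0"
		return false
	}
	last := name[len(name)-1]
	return last < '0' || last > '9' // "sda1" is a partition
}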
@@ -243,21 +288,16 @@ func (s *SystemInfo) collectSensorsInfo(ctx context.Context) error {
 	if err != nil {
 		return err
 	}
-	s.Sensors = make(map[string]Sensor, len(sensorsInfo))
-	for _, sensor := range sensorsInfo {
-		s.Sensors[sensor.SensorKey] = Sensor{
-			Temperature: float32(sensor.Temperature),
-			High:        float32(sensor.High),
-			Critical:    float32(sensor.Critical),
-		}
-	}
+	s.Sensors = sensorsInfo
 	return nil
 }
 
 // explicitly implement MarshalJSON to avoid reflection
 func (s *SystemInfo) MarshalJSON() ([]byte, error) {
-	var b strings.Builder
-	b.Grow(1024)
+	b := bufPool.Get().(*bytes.Buffer)
+	b.Reset()
+	defer bufPool.Put(b)
 
 	b.WriteRune('{')
 
 	// timestamp
@@ -358,14 +398,14 @@ func (s *SystemInfo) MarshalJSON() ([]byte, error) {
 	if s.Sensors != nil {
 		b.WriteString("{")
 		first := true
-		for key, sensor := range s.Sensors {
+		for _, sensor := range s.Sensors {
 			if !first {
 				b.WriteRune(',')
 			}
 			b.WriteString(fmt.Sprintf(
 				`%q:{"name":%q,"temperature":%s,"high":%s,"critical":%s}`,
-				key,
-				key,
+				sensor.SensorKey,
+				sensor.SensorKey,
 				strconv.FormatFloat(float64(sensor.Temperature), 'f', 2, 32),
 				strconv.FormatFloat(float64(sensor.High), 'f', 2, 32),
 				strconv.FormatFloat(float64(sensor.Critical), 'f', 2, 32),
@@ -382,11 +422,11 @@ func (s *SystemInfo) MarshalJSON() ([]byte, error) {
 }
 
 // recharts friendly
-func aggregate(entries []*SystemInfo, query url.Values) (total int, result []map[string]any) {
+func aggregate(entries []*SystemInfo, query url.Values) (total int, result Aggregated) {
 	n := len(entries)
+	aggregated := make(Aggregated, 0, n)
 	switch query.Get("aggregate") {
 	case queryCPUAverage:
-		aggregated := make([]map[string]any, 0, n)
 		for _, entry := range entries {
 			if entry.CPUAverage != nil {
 				aggregated = append(aggregated, map[string]any{
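With aggregated hoisted above the switch, the per-case make([]map[string]any, 0, n) and the duplicated per-case returns go away; the cases below only append. A hypothetical call, mirroring how the poller invokes this aggregator:

total, rows := aggregate(entries, url.Values{"aggregate": {queryCPUAverage}})
// total == len(rows); each row is a recharts-friendly map,
// e.g. {"timestamp": ..., "cpu_average": ...}.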
@@ -395,9 +435,7 @@ func aggregate(entries []*SystemInfo, query url.Values) (total int, result []map
 				})
 			}
 		}
-		return len(aggregated), aggregated
 	case queryMemoryUsage:
-		aggregated := make([]map[string]any, 0, n)
 		for _, entry := range entries {
 			if entry.Memory != nil {
 				aggregated = append(aggregated, map[string]any{
@@ -406,9 +444,7 @@ func aggregate(entries []*SystemInfo, query url.Values) (total int, result []map
 				})
 			}
 		}
-		return len(aggregated), aggregated
 	case queryMemoryUsagePercent:
-		aggregated := make([]map[string]any, 0, n)
 		for _, entry := range entries {
 			if entry.Memory != nil {
 				aggregated = append(aggregated, map[string]any{
@@ -417,105 +453,133 @@ func aggregate(entries []*SystemInfo, query url.Values) (total int, result []map
 				})
 			}
 		}
-		return len(aggregated), aggregated
 	case queryDisksReadSpeed:
-		aggregated := make([]map[string]any, 0, n)
 		for _, entry := range entries {
 			if entry.DisksIO == nil {
 				continue
 			}
-			m := make(map[string]any)
+			m := make(map[string]any, len(entry.DisksIO)+1)
 			for name, usage := range entry.DisksIO {
 				m[name] = usage.ReadSpeed
 			}
 			m["timestamp"] = entry.Timestamp
 			aggregated = append(aggregated, m)
 		}
-		return len(aggregated), aggregated
 	case queryDisksWriteSpeed:
-		aggregated := make([]map[string]any, 0, n)
 		for _, entry := range entries {
 			if entry.DisksIO == nil {
 				continue
 			}
-			m := make(map[string]any)
+			m := make(map[string]any, len(entry.DisksIO)+1)
 			for name, usage := range entry.DisksIO {
 				m[name] = usage.WriteSpeed
 			}
 			m["timestamp"] = entry.Timestamp
 			aggregated = append(aggregated, m)
 		}
-		return len(aggregated), aggregated
 	case queryDisksIOPS:
-		aggregated := make([]map[string]any, 0, n)
 		for _, entry := range entries {
 			if entry.DisksIO == nil {
 				continue
 			}
-			m := make(map[string]any)
+			m := make(map[string]any, len(entry.DisksIO)+1)
 			for name, usage := range entry.DisksIO {
 				m[name] = usage.Iops
 			}
 			m["timestamp"] = entry.Timestamp
 			aggregated = append(aggregated, m)
 		}
-		return len(aggregated), aggregated
 	case queryDiskUsage:
-		aggregated := make([]map[string]any, 0, n)
 		for _, entry := range entries {
 			if entry.Disks == nil {
 				continue
 			}
-			m := make(map[string]any)
+			m := make(map[string]any, len(entry.Disks)+1)
 			for name, disk := range entry.Disks {
 				m[name] = disk.Used
 			}
 			m["timestamp"] = entry.Timestamp
 			aggregated = append(aggregated, m)
 		}
-		return len(aggregated), aggregated
 	case queryNetworkSpeed:
-		aggregated := make([]map[string]any, 0, n)
 		for _, entry := range entries {
 			if entry.Network == nil {
 				continue
 			}
-			m := map[string]any{
+			aggregated = append(aggregated, map[string]any{
 				"timestamp": entry.Timestamp,
 				"upload":    entry.Network.UploadSpeed,
 				"download":  entry.Network.DownloadSpeed,
-			}
-			aggregated = append(aggregated, m)
+			})
 		}
-		return len(aggregated), aggregated
 	case queryNetworkTransfer:
-		aggregated := make([]map[string]any, 0, n)
 		for _, entry := range entries {
 			if entry.Network == nil {
 				continue
 			}
-			m := map[string]any{
+			aggregated = append(aggregated, map[string]any{
 				"timestamp": entry.Timestamp,
 				"upload":    entry.Network.BytesSent,
 				"download":  entry.Network.BytesRecv,
-			}
-			aggregated = append(aggregated, m)
+			})
 		}
-		return len(aggregated), aggregated
 	case querySensorTemperature:
 		aggregated := make([]map[string]any, 0, n)
 		for _, entry := range entries {
 			if entry.Sensors == nil {
 				continue
 			}
-			m := make(map[string]any)
-			for key, sensor := range entry.Sensors {
-				m[key] = sensor.Temperature
+			m := make(map[string]any, len(entry.Sensors)+1)
+			for _, sensor := range entry.Sensors {
+				m[sensor.SensorKey] = sensor.Temperature
 			}
 			m["timestamp"] = entry.Timestamp
 			aggregated = append(aggregated, m)
 		}
-		return len(aggregated), aggregated
+	default:
+		return -1, nil
 	}
-	return -1, []map[string]any{}
+	return len(aggregated), aggregated
 }
+
+func (result Aggregated) MarshalJSON() ([]byte, error) {
+	buf := bufPool.Get().(*bytes.Buffer)
+	buf.Reset()
+	defer bufPool.Put(buf)
+
+	buf.WriteByte('[')
+	i := 0
+	n := len(result)
+	for _, entry := range result {
+		buf.WriteRune('{')
+		j := 0
+		m := len(entry)
+		for k, v := range entry {
+			buf.WriteByte('"')
+			buf.WriteString(k)
+			buf.WriteByte('"')
+			buf.WriteByte(':')
+			switch v := v.(type) {
+			case float64:
+				buf.WriteString(strconv.FormatFloat(v, 'f', 2, 64))
+			case uint64:
+				buf.WriteString(strconv.FormatUint(v, 10))
+			case int64:
+				buf.WriteString(strconv.FormatInt(v, 10))
+			default:
+				panic(fmt.Sprintf("unexpected type: %T", v))
+			}
+			if j != m-1 {
+				buf.WriteByte(',')
+			}
+			j++
+		}
+		buf.WriteByte('}')
+		if i != n-1 {
+			buf.WriteByte(',')
+		}
+		i++
+	}
+	buf.WriteByte(']')
+	return buf.Bytes(), nil
+}
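The hand-rolled Aggregated marshaler is deliberately narrow: keys are written unescaped and values must be float64, uint64, or int64 (speeds, byte counters, and timestamps respectively); anything else panics, which TestSerialize below exercises for every query. Also note the returned slice aliases the pooled buffer, so direct callers should consume or copy it before the buffer is reused; a conservative caller might do (hypothetical sketch):

data, err := rows.MarshalJSON() // rows is an Aggregated
if err == nil {
	data = append([]byte(nil), data...) // detach from the pooled buffer
}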


@@ -2,77 +2,67 @@ package systeminfo
 
 import (
 	"encoding/json"
+	"net/url"
+	"reflect"
 	"testing"
 
 	. "github.com/yusing/go-proxy/internal/utils/testing"
 )
 
-func TestSystemInfo(t *testing.T) {
-	// Create test data
-	cpuAvg := 45.67
-	testInfo := &SystemInfo{
-		Timestamp:  1234567890,
-		CPUAverage: &cpuAvg,
-		Memory: &MemoryUsage{
-			Total:       16000000000,
-			Available:   8000000000,
-			Used:        8000000000,
-			UsedPercent: 50.0,
-		},
-		Disks: map[string]*Disk{
-			"sda": {
-				Path:        "/",
-				Fstype:      "ext4",
-				Total:       500000000000,
-				Free:        250000000000,
-				Used:        250000000000,
-				UsedPercent: 50.0,
-			},
-			"nvme0n1": {
-				Path:        "/",
-				Fstype:      "zfs",
-				Total:       500000000000,
-				Free:        250000000000,
-				Used:        250000000000,
-				UsedPercent: 50.0,
-			},
-		},
-		DisksIO: map[string]*DiskIO{
-			"media": {
-				ReadBytes:  1000000,
-				WriteBytes: 2000000,
-				ReadSpeed:  100.5,
-				WriteSpeed: 200.5,
-				Iops:       1000,
-			},
-			"nvme0n1": {
-				ReadBytes:  1000000,
-				WriteBytes: 2000000,
-				ReadSpeed:  100.5,
-				WriteSpeed: 200.5,
-				Iops:       1000,
-			},
-		},
-		Network: &Network{
-			BytesSent:     5000000,
-			BytesRecv:     10000000,
-			UploadSpeed:   1024.5,
-			DownloadSpeed: 2048.5,
-		},
-		Sensors: map[string]Sensor{
-			"cpu": {
-				Temperature: 75.5,
-				High:        85.0,
-				Critical:    95.0,
-			},
-			"nvme0n1": {
-				Temperature: 75.5,
-				High:        85.0,
-				Critical:    95.0,
-			},
-		},
-	}
-
+// Create test data
+var cpuAvg = 45.67
+
+var testInfo = &SystemInfo{
+	Timestamp:  123456,
+	CPUAverage: &cpuAvg,
+	Memory: &MemoryUsage{
+		Total:       16000000000,
+		Available:   8000000000,
+		Used:        8000000000,
+		UsedPercent: 50.0,
+	},
+	Disks: map[string]*Disk{
+		"sda": {
+			Path:        "/",
+			Fstype:      "ext4",
+			Total:       500000000000,
+			Free:        250000000000,
+			Used:        250000000000,
+			UsedPercent: 50.0,
+		},
+		"nvme0n1": {
+			Path:        "/",
+			Fstype:      "zfs",
+			Total:       500000000000,
+			Free:        250000000000,
+			Used:        250000000000,
+			UsedPercent: 50.0,
+		},
+	},
+	DisksIO: map[string]*DiskIO{
+		"media": {
+			ReadBytes:  1000000,
+			WriteBytes: 2000000,
+			ReadSpeed:  100.5,
+			WriteSpeed: 200.5,
+			Iops:       1000,
+		},
+		"nvme0n1": {
+			ReadBytes:  1000000,
+			WriteBytes: 2000000,
+			ReadSpeed:  100.5,
+			WriteSpeed: 200.5,
+			Iops:       1000,
+		},
+	},
+	Network: &Network{
+		BytesSent:     5000000,
+		BytesRecv:     10000000,
+		UploadSpeed:   1024.5,
+		DownloadSpeed: 2048.5,
+	},
+}
+
+func TestSystemInfo(t *testing.T) {
 	// Test marshaling
 	data, err := json.Marshal(testInfo)
 	ExpectNoError(t, err)
@@ -110,3 +100,55 @@ func TestSystemInfo(t *testing.T) {
 	ExpectTrue(t, decodedNil.Network == nil)
 	ExpectTrue(t, decodedNil.Sensors == nil)
 }
+
+func TestSerialize(t *testing.T) {
+	entries := make([]*SystemInfo, 5)
+	for i := 0; i < 5; i++ {
+		entries[i] = testInfo
+	}
+	for _, query := range allQueries {
+		t.Run(query, func(t *testing.T) {
+			_, result := aggregate(entries, url.Values{"aggregate": []string{query}})
+			s, err := result.MarshalJSON()
+			ExpectNoError(t, err)
+			var v []map[string]any
+			ExpectNoError(t, json.Unmarshal(s, &v))
+			ExpectEqual(t, len(v), len(result))
+			for i, m := range v {
+				for k, v := range m {
+					// some int64 values are converted to float64 on json.Unmarshal
+					vv := reflect.ValueOf(result[i][k])
+					ExpectEqual(t, reflect.ValueOf(v).Convert(vv.Type()).Interface(), vv.Interface())
+				}
+			}
+		})
+	}
+}
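The reflect.Convert round-trip in the test above is needed because json.Unmarshal decodes every JSON number into float64 when the target is map[string]any, while the aggregated rows hold int64 and uint64; converting the decoded value back to the original's type makes the comparison exact:

var m map[string]any
_ = json.Unmarshal([]byte(`{"timestamp":123456}`), &m)
// m["timestamp"] is float64(123456), not int64; hence the Convert above.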
+
+func BenchmarkSerialize(b *testing.B) {
+	entries := make([]*SystemInfo, b.N)
+	for i := 0; i < b.N; i++ {
+		entries[i] = testInfo
+	}
+	queries := map[string]Aggregated{}
+	for _, query := range allQueries {
+		_, result := aggregate(entries, url.Values{"aggregate": []string{query}})
+		queries[query] = result
+	}
+	b.ReportAllocs()
+	b.ResetTimer()
+	b.Run("optimized", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			for _, query := range allQueries {
+				_, _ = queries[query].MarshalJSON()
+			}
+		}
+	})
+	b.Run("json", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			for _, query := range allQueries {
+				_, _ = json.Marshal([]map[string]any(queries[query]))
+			}
+		}
+	})
+}
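The benchmark pits the pooled writer against reflection-based json.Marshal on identical Aggregated values. One way to reproduce it (the package directory is an assumption inferred from the module path in the imports; adjust to where this file actually lives):

go test -run '^$' -bench BenchmarkSerialize -benchmem ./internal/metrics/systeminfo/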