From a162371ec5877fc9dfc62038c5a61beb65b56d65 Mon Sep 17 00:00:00 2001 From: yusing Date: Wed, 14 May 2025 21:38:28 +0800 Subject: [PATCH] feat: parallelize system info collection and refactor code --- internal/metrics/systeminfo/json.go | 201 +++++++++++++++++ internal/metrics/systeminfo/system_info.go | 237 +++------------------ 2 files changed, 232 insertions(+), 206 deletions(-) create mode 100644 internal/metrics/systeminfo/json.go diff --git a/internal/metrics/systeminfo/json.go b/internal/metrics/systeminfo/json.go new file mode 100644 index 0000000..6a8a6d6 --- /dev/null +++ b/internal/metrics/systeminfo/json.go @@ -0,0 +1,201 @@ +package systeminfo + +import ( + "encoding/json" + "fmt" + "strconv" + + "github.com/shirou/gopsutil/v4/sensors" + "github.com/yusing/go-proxy/internal/utils/synk" +) + +var bufPool = synk.NewBytesPool(1024, 16384) + +// explicitly implement MarshalJSON to avoid reflection. +func (s *SystemInfo) MarshalJSON() ([]byte, error) { + b := bufPool.Get() + defer bufPool.Put(b) + + b = append(b, '{') + + // timestamp + b = append(b, `"timestamp":`...) + b = strconv.AppendInt(b, s.Timestamp, 10) + + // cpu_average + b = append(b, `,"cpu_average":`...) + if s.CPUAverage != nil { + b = strconv.AppendFloat(b, *s.CPUAverage, 'f', 2, 64) + } else { + b = append(b, "null"...) + } + + // memory + b = append(b, `,"memory":`...) + if s.Memory != nil { + b = fmt.Appendf(b, + `{"total":%d,"available":%d,"used":%d,"used_percent":%.2f}`, + s.Memory.Total, + s.Memory.Available, + s.Memory.Used, + s.Memory.UsedPercent, + ) + } else { + b = append(b, "null"...) + } + + // disk + b = append(b, `,"disks":`...) + if len(s.Disks) > 0 { + b = append(b, '{') + first := true + for device, disk := range s.Disks { + if !first { + b = append(b, ',') + } + b = fmt.Appendf(b, + `"%s":{"device":"%s","path":"%s","fstype":"%s","total":%d,"free":%d,"used":%d,"used_percent":%.2f}`, + device, + device, + disk.Path, + disk.Fstype, + disk.Total, + disk.Free, + disk.Used, + disk.UsedPercent, + ) + first = false + } + b = append(b, '}') + } else { + b = append(b, "null"...) + } + + // disks_io + b = append(b, `,"disks_io":`...) + if len(s.DisksIO) > 0 { + b = append(b, '{') + first := true + for name, usage := range s.DisksIO { + if !first { + b = append(b, ',') + } + b = fmt.Appendf(b, + `"%s":{"name":"%s","read_bytes":%d,"write_bytes":%d,"read_speed":%.2f,"write_speed":%.2f,"iops":%d}`, + name, + name, + usage.ReadBytes, + usage.WriteBytes, + usage.ReadSpeed, + usage.WriteSpeed, + usage.Iops, + ) + first = false + } + b = append(b, '}') + } else { + b = append(b, "null"...) + } + + // network + b = append(b, `,"network":`...) + if s.Network != nil { + b = fmt.Appendf(b, + `{"bytes_sent":%d,"bytes_recv":%d,"upload_speed":%.2f,"download_speed":%.2f}`, + s.Network.BytesSent, + s.Network.BytesRecv, + s.Network.UploadSpeed, + s.Network.DownloadSpeed, + ) + } else { + b = append(b, "null"...) + } + + // sensors + b = append(b, `,"sensors":`...) + if len(s.Sensors) > 0 { + b = append(b, '{') + first := true + for _, sensor := range s.Sensors { + if !first { + b = append(b, ',') + } + b = fmt.Appendf(b, + `"%s":{"name":"%s","temperature":%.2f,"high":%.2f,"critical":%.2f}`, + sensor.SensorKey, + sensor.SensorKey, + sensor.Temperature, + sensor.High, + sensor.Critical, + ) + first = false + } + b = append(b, '}') + } else { + b = append(b, "null"...) + } + + b = append(b, '}') + return b, nil +} + +func (s *Sensors) UnmarshalJSON(data []byte) error { + var v map[string]map[string]any + if err := json.Unmarshal(data, &v); err != nil { + return err + } + if len(v) == 0 { + return nil + } + *s = make(Sensors, 0, len(v)) + for k, v := range v { + *s = append(*s, sensors.TemperatureStat{ + SensorKey: k, + Temperature: v["temperature"].(float64), + High: v["high"].(float64), + Critical: v["critical"].(float64), + }) + } + return nil +} + +func (result Aggregated) MarshalJSON() ([]byte, error) { + buf := bufPool.Get() + defer bufPool.Put(buf) + + buf = append(buf, '[') + i := 0 + n := len(result) + for _, entry := range result { + buf = append(buf, '{') + j := 0 + m := len(entry) + for k, v := range entry { + buf = append(buf, '"') + buf = append(buf, k...) + buf = append(buf, '"') + buf = append(buf, ':') + switch v := v.(type) { + case float64: + buf = strconv.AppendFloat(buf, v, 'f', 2, 64) + case uint64: + buf = strconv.AppendUint(buf, v, 10) + case int64: + buf = strconv.AppendInt(buf, v, 10) + default: + panic(fmt.Sprintf("unexpected type: %T", v)) + } + if j != m-1 { + buf = append(buf, ',') + } + j++ + } + buf = append(buf, '}') + if i != n-1 { + buf = append(buf, ',') + } + i++ + } + buf = append(buf, ']') + return buf, nil +} diff --git a/internal/metrics/systeminfo/system_info.go b/internal/metrics/systeminfo/system_info.go index cdd75fc..f154244 100644 --- a/internal/metrics/systeminfo/system_info.go +++ b/internal/metrics/systeminfo/system_info.go @@ -2,11 +2,9 @@ package systeminfo // import github.com/yusing/go-proxy/internal/metrics/systemi import ( "context" - "encoding/json" "errors" - "fmt" "net/url" - "strconv" + "sync" "syscall" "time" @@ -20,7 +18,6 @@ import ( "github.com/yusing/go-proxy/internal/gperr" "github.com/yusing/go-proxy/internal/logging" "github.com/yusing/go-proxy/internal/metrics/period" - "github.com/yusing/go-proxy/internal/utils/synk" ) // json tags are left for tests @@ -68,34 +65,53 @@ var allQueries = []string{ var Poller = period.NewPoller("system_info", getSystemInfo, aggregate) -func _() { // check if this behavior is not changed - var _ sensors.Warnings = disk.Warnings{} -} - func isNoDataAvailable(err error) bool { return errors.Is(err, syscall.ENODATA) } func getSystemInfo(ctx context.Context, lastResult *SystemInfo) (*SystemInfo, error) { - errs := gperr.NewBuilder("failed to get system info") + errs := gperr.NewBuilderWithConcurrency("failed to get system info") var s SystemInfo s.Timestamp = time.Now().Unix() + var wg sync.WaitGroup + if !common.MetricsDisableCPU { - errs.Add(s.collectCPUInfo(ctx)) + wg.Add(1) + go func() { + defer wg.Done() + errs.Add(s.collectCPUInfo(ctx)) + }() } if !common.MetricsDisableMemory { - errs.Add(s.collectMemoryInfo(ctx)) + wg.Add(1) + go func() { + defer wg.Done() + errs.Add(s.collectMemoryInfo(ctx)) + }() } if !common.MetricsDisableDisk { - errs.Add(s.collectDisksInfo(ctx, lastResult)) + wg.Add(1) + go func() { + defer wg.Done() + errs.Add(s.collectDisksInfo(ctx, lastResult)) + }() } if !common.MetricsDisableNetwork { - errs.Add(s.collectNetworkInfo(ctx, lastResult)) + wg.Add(1) + go func() { + defer wg.Done() + errs.Add(s.collectNetworkInfo(ctx, lastResult)) + }() } if !common.MetricsDisableSensors { - errs.Add(s.collectSensorsInfo(ctx)) + wg.Add(1) + go func() { + defer wg.Done() + errs.Add(s.collectSensorsInfo(ctx)) + }() } + wg.Wait() if errs.HasError() { allWarnings := gperr.NewBuilder("") @@ -207,157 +223,7 @@ func (s *SystemInfo) collectSensorsInfo(ctx context.Context) error { return nil } -var bufPool = synk.NewBytesPool(1024, 16384) - -// explicitly implement MarshalJSON to avoid reflection -func (s *SystemInfo) MarshalJSON() ([]byte, error) { - b := bufPool.Get() - defer bufPool.Put(b) - - b = append(b, '{') - - // timestamp - b = append(b, `"timestamp":`...) - b = strconv.AppendInt(b, s.Timestamp, 10) - - // cpu_average - b = append(b, `,"cpu_average":`...) - if s.CPUAverage != nil { - b = strconv.AppendFloat(b, *s.CPUAverage, 'f', 2, 64) - } else { - b = append(b, "null"...) - } - - // memory - b = append(b, `,"memory":`...) - if s.Memory != nil { - b = fmt.Appendf(b, - `{"total":%d,"available":%d,"used":%d,"used_percent":%.2f}`, - s.Memory.Total, - s.Memory.Available, - s.Memory.Used, - s.Memory.UsedPercent, - ) - } else { - b = append(b, "null"...) - } - - // disk - b = append(b, `,"disks":`...) - if len(s.Disks) > 0 { - b = append(b, '{') - first := true - for device, disk := range s.Disks { - if !first { - b = append(b, ',') - } - b = fmt.Appendf(b, - `"%s":{"device":"%s","path":"%s","fstype":"%s","total":%d,"free":%d,"used":%d,"used_percent":%.2f}`, - device, - device, - disk.Path, - disk.Fstype, - disk.Total, - disk.Free, - disk.Used, - disk.UsedPercent, - ) - first = false - } - b = append(b, '}') - } else { - b = append(b, "null"...) - } - - // disks_io - b = append(b, `,"disks_io":`...) - if len(s.DisksIO) > 0 { - b = append(b, '{') - first := true - for name, usage := range s.DisksIO { - if !first { - b = append(b, ',') - } - b = fmt.Appendf(b, - `"%s":{"name":"%s","read_bytes":%d,"write_bytes":%d,"read_speed":%.2f,"write_speed":%.2f,"iops":%d}`, - name, - name, - usage.ReadBytes, - usage.WriteBytes, - usage.ReadSpeed, - usage.WriteSpeed, - usage.Iops, - ) - first = false - } - b = append(b, '}') - } else { - b = append(b, "null"...) - } - - // network - b = append(b, `,"network":`...) - if s.Network != nil { - b = fmt.Appendf(b, - `{"bytes_sent":%d,"bytes_recv":%d,"upload_speed":%.2f,"download_speed":%.2f}`, - s.Network.BytesSent, - s.Network.BytesRecv, - s.Network.UploadSpeed, - s.Network.DownloadSpeed, - ) - } else { - b = append(b, "null"...) - } - - // sensors - b = append(b, `,"sensors":`...) - if len(s.Sensors) > 0 { - b = append(b, '{') - first := true - for _, sensor := range s.Sensors { - if !first { - b = append(b, ',') - } - b = fmt.Appendf(b, - `"%s":{"name":"%s","temperature":%.2f,"high":%.2f,"critical":%.2f}`, - sensor.SensorKey, - sensor.SensorKey, - sensor.Temperature, - sensor.High, - sensor.Critical, - ) - first = false - } - b = append(b, '}') - } else { - b = append(b, "null"...) - } - - b = append(b, '}') - return b, nil -} - -func (s *Sensors) UnmarshalJSON(data []byte) error { - var v map[string]map[string]any - if err := json.Unmarshal(data, &v); err != nil { - return err - } - if len(v) == 0 { - return nil - } - *s = make(Sensors, 0, len(v)) - for k, v := range v { - *s = append(*s, sensors.TemperatureStat{ - SensorKey: k, - Temperature: v["temperature"].(float64), - High: v["high"].(float64), - Critical: v["critical"].(float64), - }) - } - return nil -} - -// recharts friendly +// recharts friendly. func aggregate(entries []*SystemInfo, query url.Values) (total int, result Aggregated) { n := len(entries) aggregated := make(Aggregated, 0, n) @@ -476,44 +342,3 @@ func aggregate(entries []*SystemInfo, query url.Values) (total int, result Aggre } return len(aggregated), aggregated } - -func (result Aggregated) MarshalJSON() ([]byte, error) { - buf := bufPool.Get() - defer bufPool.Put(buf) - - buf = append(buf, '[') - i := 0 - n := len(result) - for _, entry := range result { - buf = append(buf, '{') - j := 0 - m := len(entry) - for k, v := range entry { - buf = append(buf, '"') - buf = append(buf, k...) - buf = append(buf, '"') - buf = append(buf, ':') - switch v := v.(type) { - case float64: - buf = strconv.AppendFloat(buf, v, 'f', 2, 64) - case uint64: - buf = strconv.AppendUint(buf, v, 10) - case int64: - buf = strconv.AppendInt(buf, v, 10) - default: - panic(fmt.Sprintf("unexpected type: %T", v)) - } - if j != m-1 { - buf = append(buf, ',') - } - j++ - } - buf = append(buf, '}') - if i != n-1 { - buf = append(buf, ',') - } - i++ - } - buf = append(buf, ']') - return buf, nil -}