From 9711867fbef0e3d680e20ca1b30c4cde66396817 Mon Sep 17 00:00:00 2001 From: yusing Date: Thu, 22 May 2025 22:43:11 +0800 Subject: [PATCH] tweak: optimize memory usage and allocation --- internal/utils/synk/pool.go | 161 ++++++++++++++++++++----- internal/utils/synk/pool_bench_test.go | 43 +++++++ 2 files changed, 172 insertions(+), 32 deletions(-) create mode 100644 internal/utils/synk/pool_bench_test.go diff --git a/internal/utils/synk/pool.go b/internal/utils/synk/pool.go index df1d015..11e282a 100644 --- a/internal/utils/synk/pool.go +++ b/internal/utils/synk/pool.go @@ -1,48 +1,145 @@ package synk import ( - "sync" + "os" + "os/signal" + "sync/atomic" + "time" + + "github.com/yusing/go-proxy/internal/logging" ) -type ( - // Pool is a wrapper of sync.Pool that limits the size of the object. - Pool[T any] struct { - pool *sync.Pool - } - BytesPool = Pool[byte] -) - -const DefaultInitBytes = 32 * 1024 - -func NewPool[T any](initSize int) *Pool[T] { - return &Pool[T]{ - pool: &sync.Pool{ - New: func() any { - return make([]T, 0, initSize) - }, - }, - } +type BytesPool struct { + pool chan []byte + initSize int } -var bytesPool = NewPool[byte](DefaultInitBytes) +const ( + kb = 1024 + mb = 1024 * kb +) + +const ( + InPoolLimit = 32 * mb + + DefaultInitBytes = 32 * kb + PoolThreshold = 64 * kb + DropThresholdHigh = 4 * mb + + PoolSize = InPoolLimit / PoolThreshold + + CleanupInterval = 5 * time.Second + MaxDropsPerCycle = 10 + MaxChecksPerCycle = 100 +) + +var bytesPool = &BytesPool{ + pool: make(chan []byte, PoolSize), + initSize: DefaultInitBytes, +} func NewBytesPool() *BytesPool { return bytesPool } -func (p *Pool[T]) Get() []T { - return p.pool.Get().([]T) -} - -func (p *Pool[T]) GetSized(size int) []T { - b := p.Get() - if cap(b) < size { - p.Put(b) - return make([]T, size) +func (p *BytesPool) Get() []byte { + select { + case b := <-p.pool: + subInPoolSize(int64(cap(b))) + return b + default: + return make([]byte, 0, p.initSize) } - return b[:size] } -func (p *Pool[T]) Put(b []T) { - p.pool.Put(b[:0]) //nolint:staticcheck +func (p *BytesPool) GetSized(size int) []byte { + if size <= PoolThreshold { + return make([]byte, size) + } + select { + case b := <-p.pool: + if size <= cap(b) { + subInPoolSize(int64(cap(b))) + return b[:size] + } + select { + case p.pool <- b: + addInPoolSize(int64(cap(b))) + default: + } + default: + } + return make([]byte, size) +} + +func (p *BytesPool) Put(b []byte) { + size := cap(b) + if size > DropThresholdHigh || poolFull() { + return + } + b = b[:0] + select { + case p.pool <- b: + addInPoolSize(int64(size)) + return + default: + // just drop it + } +} + +var inPoolSize int64 + +func addInPoolSize(size int64) { + atomic.AddInt64(&inPoolSize, size) +} + +func subInPoolSize(size int64) { + atomic.AddInt64(&inPoolSize, -size) +} + +func init() { + // Periodically drop some buffers to prevent excessive memory usage + go func() { + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, os.Interrupt) + + cleanupTicker := time.NewTicker(CleanupInterval) + defer cleanupTicker.Stop() + + for { + select { + case <-cleanupTicker.C: + dropBuffers() + case <-sigCh: + return + } + } + }() +} + +func poolFull() bool { + return atomic.LoadInt64(&inPoolSize) >= InPoolLimit +} + +// dropBuffers removes excess buffers from the pool when it grows too large. +func dropBuffers() { + // Check if pool has more than a threshold of buffers + count := 0 + droppedSize := 0 + checks := 0 + for count < MaxDropsPerCycle && checks < MaxChecksPerCycle && atomic.LoadInt64(&inPoolSize) > InPoolLimit*2/3 { + select { + case b := <-bytesPool.pool: + n := cap(b) + subInPoolSize(int64(n)) + droppedSize += n + count++ + default: + time.Sleep(10 * time.Millisecond) + } + checks++ + } + if count > 0 { + logging.Debug().Int("dropped", count).Int("size", droppedSize).Msg("dropped buffers from pool") + } } diff --git a/internal/utils/synk/pool_bench_test.go b/internal/utils/synk/pool_bench_test.go new file mode 100644 index 0000000..6bb6e02 --- /dev/null +++ b/internal/utils/synk/pool_bench_test.go @@ -0,0 +1,43 @@ +package synk + +import ( + "testing" +) + +var sizes = []int{1024, 4096, 16384, 65536, 32 * 1024, 128 * 1024, 512 * 1024, 1024 * 1024} + +func BenchmarkBytesPool_GetSmall(b *testing.B) { + for b.Loop() { + bytesPool.Put(bytesPool.GetSized(1024)) + } +} + +func BenchmarkBytesPool_MakeSmall(b *testing.B) { + for b.Loop() { + _ = make([]byte, 1024) + } +} + +func BenchmarkBytesPool_GetLarge(b *testing.B) { + for b.Loop() { + bytesPool.Put(bytesPool.GetSized(1024 * 1024)) + } +} + +func BenchmarkBytesPool_MakeLarge(b *testing.B) { + for b.Loop() { + _ = make([]byte, 1024*1024) + } +} + +func BenchmarkBytesPool_GetAll(b *testing.B) { + for i := range b.N { + bytesPool.Put(bytesPool.GetSized(sizes[i%len(sizes)])) + } +} + +func BenchmarkBytesPool_MakeAll(b *testing.B) { + for i := range b.N { + _ = make([]byte, sizes[i%len(sizes)]) + } +}