diff --git a/internal/utils/synk/pool.go b/internal/utils/synk/pool.go index 4406f69..15cbd64 100644 --- a/internal/utils/synk/pool.go +++ b/internal/utils/synk/pool.go @@ -1,16 +1,35 @@ package synk import ( - "os" - "os/signal" - "sync/atomic" - "time" - - "github.com/rs/zerolog/log" + "runtime" + "unsafe" ) +type weakBuf = unsafe.Pointer + +func makeWeak(b *[]byte) weakBuf { + ptr := runtime_registerWeakPointer(unsafe.Pointer(b)) + runtime.KeepAlive(ptr) + addCleanup(b, addGCed, cap(*b)) + return weakBuf(ptr) +} + +func getBufFromWeak(w weakBuf) []byte { + ptr := (*[]byte)(runtime_makeStrongFromWeak(w)) + if ptr == nil { + return nil + } + return *ptr +} + +//go:linkname runtime_registerWeakPointer weak.runtime_registerWeakPointer +func runtime_registerWeakPointer(unsafe.Pointer) unsafe.Pointer + +//go:linkname runtime_makeStrongFromWeak weak.runtime_makeStrongFromWeak +func runtime_makeStrongFromWeak(unsafe.Pointer) unsafe.Pointer + type BytesPool struct { - pool chan []byte + pool chan weakBuf initSize int } @@ -19,24 +38,18 @@ const ( mb = 1024 * kb ) -var BytesPoolEnabled = true - const ( InPoolLimit = 32 * mb - DefaultInitBytes = 32 * kb - PoolThreshold = 64 * kb + DefaultInitBytes = 4 * kb + PoolThreshold = 256 * kb DropThresholdHigh = 4 * mb PoolSize = InPoolLimit / PoolThreshold - - CleanupInterval = 5 * time.Second - MaxDropsPerCycle = 10 - MaxChecksPerCycle = 100 ) var bytesPool = &BytesPool{ - pool: make(chan []byte, PoolSize), + pool: make(chan weakBuf, PoolSize), initSize: DefaultInitBytes, } @@ -45,110 +58,62 @@ func NewBytesPool() *BytesPool { } func (p *BytesPool) Get() []byte { - if !BytesPoolEnabled { - return make([]byte, 0, p.initSize) - } - select { - case b := <-p.pool: - subInPoolSize(int64(cap(b))) - return b - default: - return make([]byte, 0, p.initSize) + for { + select { + case bWeak := <-p.pool: + bPtr := getBufFromWeak(bWeak) + if bPtr == nil { + continue + } + addReused(cap(bPtr)) + return bPtr + default: + return make([]byte, 0, p.initSize) + } } } func (p *BytesPool) GetSized(size int) []byte { - if !BytesPoolEnabled || size <= PoolThreshold { + if size <= PoolThreshold { return make([]byte, size) } - select { - case b := <-p.pool: - if size <= cap(b) { - subInPoolSize(int64(cap(b))) - return b[:size] - } + for { select { - case p.pool <- b: - addInPoolSize(int64(cap(b))) + case bWeak := <-p.pool: + bPtr := getBufFromWeak(bWeak) + if bPtr == nil { + continue + } + capB := cap(bPtr) + if capB >= size { + addReused(capB) + return (bPtr)[:size] + } + select { + case p.pool <- bWeak: + default: + // just drop it + } default: } - default: + return make([]byte, size) } - return make([]byte, size) } func (p *BytesPool) Put(b []byte) { size := cap(b) - if size > DropThresholdHigh || poolFull() { + if size <= PoolThreshold || size > DropThresholdHigh { return } b = b[:0] + w := makeWeak(&b) select { - case p.pool <- b: - addInPoolSize(int64(size)) - return + case p.pool <- w: default: // just drop it } } -var inPoolSize int64 - -func addInPoolSize(size int64) { - atomic.AddInt64(&inPoolSize, size) -} - -func subInPoolSize(size int64) { - atomic.AddInt64(&inPoolSize, -size) -} - func init() { - // Periodically drop some buffers to prevent excessive memory usage - go func() { - sigCh := make(chan os.Signal, 1) - signal.Notify(sigCh, os.Interrupt) - - cleanupTicker := time.NewTicker(CleanupInterval) - defer cleanupTicker.Stop() - - for { - if !BytesPoolEnabled { - signal.Stop(sigCh) - return - } - select { - case <-cleanupTicker.C: - dropBuffers() - case <-sigCh: - return - } - } - }() -} - -func poolFull() bool { - return atomic.LoadInt64(&inPoolSize) >= InPoolLimit -} - -// dropBuffers removes excess buffers from the pool when it grows too large. -func dropBuffers() { - // Check if pool has more than a threshold of buffers - count := 0 - droppedSize := 0 - checks := 0 - for count < MaxDropsPerCycle && checks < MaxChecksPerCycle && atomic.LoadInt64(&inPoolSize) > InPoolLimit*2/3 { - select { - case b := <-bytesPool.pool: - n := cap(b) - subInPoolSize(int64(n)) - droppedSize += n - count++ - default: - time.Sleep(10 * time.Millisecond) - } - checks++ - } - if count > 0 { - log.Debug().Int("dropped", count).Int("size", droppedSize).Msg("dropped buffers from pool") - } + initPoolStats() } diff --git a/internal/utils/synk/pool_bench_test.go b/internal/utils/synk/pool_bench_test.go index 6bb6e02..7339a23 100644 --- a/internal/utils/synk/pool_bench_test.go +++ b/internal/utils/synk/pool_bench_test.go @@ -4,7 +4,7 @@ import ( "testing" ) -var sizes = []int{1024, 4096, 16384, 65536, 32 * 1024, 128 * 1024, 512 * 1024, 1024 * 1024} +var sizes = []int{1024, 4096, 16384, 65536, 32 * 1024, 128 * 1024, 512 * 1024, 1024 * 1024, 2 * 1024 * 1024} func BenchmarkBytesPool_GetSmall(b *testing.B) { for b.Loop() { diff --git a/internal/utils/synk/pool_debug.go b/internal/utils/synk/pool_debug.go new file mode 100644 index 0000000..0a7aa00 --- /dev/null +++ b/internal/utils/synk/pool_debug.go @@ -0,0 +1,55 @@ +//go:build !production + +package synk + +import ( + "os" + "os/signal" + "runtime" + "sync/atomic" + "time" + + "github.com/rs/zerolog/log" + "github.com/yusing/go-proxy/internal/utils/strutils" +) + +var ( + numReused, sizeReused uint64 + numGCed, sizeGCed uint64 +) + +func addReused(size int) { + atomic.AddUint64(&numReused, 1) + atomic.AddUint64(&sizeReused, uint64(size)) +} + +func addGCed(size int) { + atomic.AddUint64(&numGCed, 1) + atomic.AddUint64(&sizeGCed, uint64(size)) +} + +var addCleanup = runtime.AddCleanup[[]byte, int] + +func initPoolStats() { + go func() { + statsTicker := time.NewTicker(5 * time.Second) + defer statsTicker.Stop() + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt) + + for { + select { + case <-sig: + return + case <-statsTicker.C: + log.Info(). + Uint64("numReused", atomic.LoadUint64(&numReused)). + Str("sizeReused", strutils.FormatByteSize(atomic.LoadUint64(&sizeReused))). + Uint64("numGCed", atomic.LoadUint64(&numGCed)). + Str("sizeGCed", strutils.FormatByteSize(atomic.LoadUint64(&sizeGCed))). + Msg("bytes pool stats") + } + } + }() +} diff --git a/internal/utils/synk/pool_prod.go b/internal/utils/synk/pool_prod.go new file mode 100644 index 0000000..3cca12d --- /dev/null +++ b/internal/utils/synk/pool_prod.go @@ -0,0 +1,8 @@ +//go:build production + +package synk + +func addReused(size int) {} +func addGCed(size int) {} +func initPoolStats() {} +func addCleanup(ptr *[]byte, cleanup func(int), arg int) {} diff --git a/socket-proxy/pkg/reverseproxy/reverse_proxy.go b/socket-proxy/pkg/reverseproxy/reverse_proxy.go index 0c92ede..b2abaff 100644 --- a/socket-proxy/pkg/reverseproxy/reverse_proxy.go +++ b/socket-proxy/pkg/reverseproxy/reverse_proxy.go @@ -20,14 +20,9 @@ import ( "sync" "github.com/yusing/go-proxy/internal/utils" - "github.com/yusing/go-proxy/internal/utils/synk" "golang.org/x/net/http/httpguts" ) -func init() { - synk.BytesPoolEnabled = false -} - // ReverseProxy is an HTTP Handler that takes an incoming request and // sends it to another server, proxying the response back to the // client.