tweak: optimize memory usage under load

yusing 2025-05-24 22:05:42 +08:00
parent 5b7c392297
commit f0ab14cb1e
5 changed files with 127 additions and 104 deletions


@@ -1,16 +1,35 @@
 package synk

 import (
-	"os"
-	"os/signal"
-	"sync/atomic"
-	"time"
-
-	"github.com/rs/zerolog/log"
+	"runtime"
+	"unsafe"
 )

+type weakBuf = unsafe.Pointer
+
+func makeWeak(b *[]byte) weakBuf {
+	ptr := runtime_registerWeakPointer(unsafe.Pointer(b))
+	runtime.KeepAlive(ptr)
+	addCleanup(b, addGCed, cap(*b))
+	return weakBuf(ptr)
+}
+
+func getBufFromWeak(w weakBuf) []byte {
+	ptr := (*[]byte)(runtime_makeStrongFromWeak(w))
+	if ptr == nil {
+		return nil
+	}
+	return *ptr
+}
+
+//go:linkname runtime_registerWeakPointer weak.runtime_registerWeakPointer
+func runtime_registerWeakPointer(unsafe.Pointer) unsafe.Pointer
+
+//go:linkname runtime_makeStrongFromWeak weak.runtime_makeStrongFromWeak
+func runtime_makeStrongFromWeak(unsafe.Pointer) unsafe.Pointer
+
 type BytesPool struct {
-	pool     chan []byte
+	pool     chan weakBuf
 	initSize int
 }
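The weak-reference plumbing above is the heart of the change: makeWeak registers a weak pointer to the buffer's slice header (plus a cleanup hook for the GC stats), and getBufFromWeak promotes it back to a strong reference, returning nil if the GC already reclaimed the buffer. A minimal sketch of the same round-trip using the supported weak package (Go 1.24+), which fronts the same runtime hooks the //go:linkname directives reach into — illustrative only, not part of this commit:

package main

import (
	"fmt"
	"runtime"
	"weak"
)

func main() {
	buf := make([]byte, 0, 4096)
	w := weak.Make(&buf) // counterpart of makeWeak, minus the cleanup hook

	// While a strong reference exists, Value() recovers the buffer,
	// like getBufFromWeak returning a non-nil slice.
	if p := w.Value(); p != nil {
		fmt.Println("alive, cap =", cap(*p))
	}
	runtime.KeepAlive(&buf) // last use of buf

	// Once the buffer is unreachable and a full GC cycle has run,
	// Value() should report nil — the dead-entry case the pool skips.
	runtime.GC()
	fmt.Println("still alive:", w.Value() != nil)
}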
@@ -19,24 +38,18 @@ const (
 	mb = 1024 * kb
 )

-var BytesPoolEnabled = true
-
 const (
 	InPoolLimit       = 32 * mb
-	DefaultInitBytes  = 32 * kb
-	PoolThreshold     = 64 * kb
+	DefaultInitBytes  = 4 * kb
+	PoolThreshold     = 256 * kb
 	DropThresholdHigh = 4 * mb
 	PoolSize          = InPoolLimit / PoolThreshold
-
-	CleanupInterval   = 5 * time.Second
-	MaxDropsPerCycle  = 10
-	MaxChecksPerCycle = 100
 )

 var bytesPool = &BytesPool{
-	pool:     make(chan []byte, PoolSize),
+	pool:     make(chan weakBuf, PoolSize),
 	initSize: DefaultInitBytes,
 }
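Worked out, the retuning means: PoolSize = InPoolLimit / PoolThreshold = 32 MiB / 256 KiB = 128 slots (the old 64 KiB threshold gave 512), the default Get allocation shrinks from 32 KiB to 4 KiB, and only buffers larger than 256 KiB and no larger than 4 MiB are pooled at all. The CleanupInterval, MaxDropsPerCycle, and MaxChecksPerCycle knobs go away with the dropBuffers machinery deleted below.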
@@ -45,110 +58,62 @@ func NewBytesPool() *BytesPool {
 }

 func (p *BytesPool) Get() []byte {
-	if !BytesPoolEnabled {
-		return make([]byte, 0, p.initSize)
-	}
 	for {
 		select {
-		case b := <-p.pool:
-			subInPoolSize(int64(cap(b)))
-			return b
+		case bWeak := <-p.pool:
+			bPtr := getBufFromWeak(bWeak)
+			if bPtr == nil {
+				continue
+			}
+			addReused(cap(bPtr))
+			return bPtr
 		default:
 			return make([]byte, 0, p.initSize)
 		}
 	}
 }

 func (p *BytesPool) GetSized(size int) []byte {
-	if !BytesPoolEnabled || size <= PoolThreshold {
+	if size <= PoolThreshold {
 		return make([]byte, size)
 	}
 	for {
 		select {
-		case b := <-p.pool:
-			if size <= cap(b) {
-				subInPoolSize(int64(cap(b)))
-				return b[:size]
+		case bWeak := <-p.pool:
+			bPtr := getBufFromWeak(bWeak)
+			if bPtr == nil {
+				continue
+			}
+			capB := cap(bPtr)
+			if capB >= size {
+				addReused(capB)
+				return (bPtr)[:size]
 			}
 			select {
-			case p.pool <- b:
-				addInPoolSize(int64(cap(b)))
+			case p.pool <- bWeak:
 			default:
 				// just drop it
 			}
 		default:
 		}
 		return make([]byte, size)
 	}
 }

 func (p *BytesPool) Put(b []byte) {
 	size := cap(b)
-	if size > DropThresholdHigh || poolFull() {
+	if size <= PoolThreshold || size > DropThresholdHigh {
 		return
 	}
 	b = b[:0]
+	w := makeWeak(&b)
 	select {
-	case p.pool <- b:
-		addInPoolSize(int64(size))
-		return
+	case p.pool <- w:
 	default:
 		// just drop it
 	}
 }

-var inPoolSize int64
-
-func addInPoolSize(size int64) {
-	atomic.AddInt64(&inPoolSize, size)
-}
-
-func subInPoolSize(size int64) {
-	atomic.AddInt64(&inPoolSize, -size)
-}
-
 func init() {
-	// Periodically drop some buffers to prevent excessive memory usage
-	go func() {
-		sigCh := make(chan os.Signal, 1)
-		signal.Notify(sigCh, os.Interrupt)
-		cleanupTicker := time.NewTicker(CleanupInterval)
-		defer cleanupTicker.Stop()
-		for {
-			if !BytesPoolEnabled {
-				signal.Stop(sigCh)
-				return
-			}
-			select {
-			case <-cleanupTicker.C:
-				dropBuffers()
-			case <-sigCh:
-				return
-			}
-		}
-	}()
-}
-
-func poolFull() bool {
-	return atomic.LoadInt64(&inPoolSize) >= InPoolLimit
-}
-
-// dropBuffers removes excess buffers from the pool when it grows too large.
-func dropBuffers() {
-	// Check if pool has more than a threshold of buffers
-	count := 0
-	droppedSize := 0
-	checks := 0
-	for count < MaxDropsPerCycle && checks < MaxChecksPerCycle && atomic.LoadInt64(&inPoolSize) > InPoolLimit*2/3 {
-		select {
-		case b := <-bytesPool.pool:
-			n := cap(b)
-			subInPoolSize(int64(n))
-			droppedSize += n
-			count++
-		default:
-			time.Sleep(10 * time.Millisecond)
-		}
-		checks++
-	}
-	if count > 0 {
-		log.Debug().Int("dropped", count).Int("size", droppedSize).Msg("dropped buffers from pool")
-	}
+	initPoolStats()
 }
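Put now truncates the buffer, wraps it in a weak pointer, and offers that to the channel, so nothing pins pooled memory: under pressure the GC collects pooled buffers on its own, and Get/GetSized skip entries whose referent is gone. That is why the inPoolSize accounting, poolFull, and the dropBuffers ticker could all be deleted. A hypothetical caller, assuming NewBytesPool hands back the shared pool declared above:

package main

import (
	"fmt"

	"github.com/yusing/go-proxy/internal/utils/synk"
)

func main() {
	pool := synk.NewBytesPool()

	// At or below PoolThreshold (256 KiB) the pool is bypassed entirely.
	small := pool.GetSized(4 * 1024)

	// Above the threshold, a live pooled buffer is reused when one fits;
	// otherwise this is a fresh allocation.
	big := pool.GetSized(512 * 1024)
	fmt.Println(len(small), len(big)) // 4096 524288

	// Put keeps only the (256 KiB, 4 MiB] range, and only weakly: if the
	// GC runs before the next Get, the buffer is freed and its pool slot
	// is later skipped as dead.
	pool.Put(big)
	pool.Put(small) // at or below PoolThreshold: dropped immediately
}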


@@ -4,7 +4,7 @@ import (
 	"testing"
 )

-var sizes = []int{1024, 4096, 16384, 65536, 32 * 1024, 128 * 1024, 512 * 1024, 1024 * 1024}
+var sizes = []int{1024, 4096, 16384, 65536, 32 * 1024, 128 * 1024, 512 * 1024, 1024 * 1024, 2 * 1024 * 1024}

 func BenchmarkBytesPool_GetSmall(b *testing.B) {
 	for b.Loop() {
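The extra 2 * 1024 * 1024 case lands above the raised 256 KiB threshold but under the 4 MiB pooling cap, so the benchmarks now cover the weak-pool path for the largest poolable buffers.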


@@ -0,0 +1,55 @@
+//go:build !production
+
+package synk
+
+import (
+	"os"
+	"os/signal"
+	"runtime"
+	"sync/atomic"
+	"time"
+
+	"github.com/rs/zerolog/log"
+	"github.com/yusing/go-proxy/internal/utils/strutils"
+)
+
+var (
+	numReused, sizeReused uint64
+	numGCed, sizeGCed     uint64
+)
+
+func addReused(size int) {
+	atomic.AddUint64(&numReused, 1)
+	atomic.AddUint64(&sizeReused, uint64(size))
+}
+
+func addGCed(size int) {
+	atomic.AddUint64(&numGCed, 1)
+	atomic.AddUint64(&sizeGCed, uint64(size))
+}
+
+var addCleanup = runtime.AddCleanup[[]byte, int]
+
+func initPoolStats() {
+	go func() {
+		statsTicker := time.NewTicker(5 * time.Second)
+		defer statsTicker.Stop()
+		sig := make(chan os.Signal, 1)
+		signal.Notify(sig, os.Interrupt)
+		for {
+			select {
+			case <-sig:
+				return
+			case <-statsTicker.C:
+				log.Info().
+					Uint64("numReused", atomic.LoadUint64(&numReused)).
+					Str("sizeReused", strutils.FormatByteSize(atomic.LoadUint64(&sizeReused))).
+					Uint64("numGCed", atomic.LoadUint64(&numGCed)).
+					Str("sizeGCed", strutils.FormatByteSize(atomic.LoadUint64(&sizeGCed))).
+					Msg("bytes pool stats")
+			}
+		}
+	}()
+}
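Here addCleanup is bound to runtime.AddCleanup (Go 1.24+): every buffer passed to makeWeak reports its capacity to addGCed once the GC reclaims it, which is where numGCed and sizeGCed come from. A standalone sketch of the hook's behavior, with illustrative names:

package main

import (
	"fmt"
	"runtime"
	"time"
)

func main() {
	b := make([]byte, 0, 64*1024)

	// The cleanup runs after *b becomes unreachable, receiving the
	// argument captured at registration time (the capacity, as in
	// makeWeak's addCleanup(b, addGCed, cap(*b))).
	runtime.AddCleanup(&b, func(capBytes int) {
		fmt.Println("buffer reclaimed, cap was", capBytes)
	}, cap(b))

	runtime.KeepAlive(&b) // last use of b
	runtime.GC()          // queues the cleanup once b is unreachable

	// Cleanups run asynchronously on a runtime goroutine, so give the
	// print a moment; a long-lived program would not need this.
	time.Sleep(100 * time.Millisecond)
}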


@@ -0,0 +1,8 @@
+//go:build production
+
+package synk
+
+func addReused(size int) {}
+func addGCed(size int)  {}
+func initPoolStats()    {}
+func addCleanup(ptr *[]byte, cleanup func(int), arg int) {}
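Under go build -tags production these stubs replace the debug implementations, so the counters, the stats goroutine, and the per-buffer cleanup registration compile down to nothing. Note the debug build binds addCleanup to the instantiated generic runtime.AddCleanup[[]byte, int], while this stub is a plain function of the same shape; call sites ignore the runtime.Cleanup return value, so both fit.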


@@ -20,14 +20,9 @@ import (
 	"sync"

 	"github.com/yusing/go-proxy/internal/utils"
 	"github.com/yusing/go-proxy/internal/utils/synk"
 	"golang.org/x/net/http/httpguts"
 )

-func init() {
-	synk.BytesPoolEnabled = false
-}
-
 // ReverseProxy is an HTTP Handler that takes an incoming request and
 // sends it to another server, proxying the response back to the
 // client.
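With BytesPoolEnabled removed from synk, the reverse proxy no longer opts out of pooling; deleting this init() means the proxy now draws its buffers from the weak pool like any other caller.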