mirror of
https://github.com/yusing/godoxy.git
synced 2025-05-24 14:22:33 +02:00
tweak: optimize memory usage and allocation
This commit is contained in:
parent
fc8592ab45
commit
9711867fbe
2 changed files with 172 additions and 32 deletions
|
@ -1,48 +1,145 @@
|
|||
package synk
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"os"
|
||||
"os/signal"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/yusing/go-proxy/internal/logging"
|
||||
)
|
||||
|
||||
type (
|
||||
// Pool is a wrapper of sync.Pool that limits the size of the object.
|
||||
Pool[T any] struct {
|
||||
pool *sync.Pool
|
||||
}
|
||||
BytesPool = Pool[byte]
|
||||
)
|
||||
|
||||
const DefaultInitBytes = 32 * 1024
|
||||
|
||||
func NewPool[T any](initSize int) *Pool[T] {
|
||||
return &Pool[T]{
|
||||
pool: &sync.Pool{
|
||||
New: func() any {
|
||||
return make([]T, 0, initSize)
|
||||
},
|
||||
},
|
||||
}
|
||||
type BytesPool struct {
|
||||
pool chan []byte
|
||||
initSize int
|
||||
}
|
||||
|
||||
var bytesPool = NewPool[byte](DefaultInitBytes)
|
||||
const (
|
||||
kb = 1024
|
||||
mb = 1024 * kb
|
||||
)
|
||||
|
||||
const (
|
||||
InPoolLimit = 32 * mb
|
||||
|
||||
DefaultInitBytes = 32 * kb
|
||||
PoolThreshold = 64 * kb
|
||||
DropThresholdHigh = 4 * mb
|
||||
|
||||
PoolSize = InPoolLimit / PoolThreshold
|
||||
|
||||
CleanupInterval = 5 * time.Second
|
||||
MaxDropsPerCycle = 10
|
||||
MaxChecksPerCycle = 100
|
||||
)
|
||||
|
||||
var bytesPool = &BytesPool{
|
||||
pool: make(chan []byte, PoolSize),
|
||||
initSize: DefaultInitBytes,
|
||||
}
|
||||
|
||||
func NewBytesPool() *BytesPool {
|
||||
return bytesPool
|
||||
}
|
||||
|
||||
func (p *Pool[T]) Get() []T {
|
||||
return p.pool.Get().([]T)
|
||||
}
|
||||
|
||||
func (p *Pool[T]) GetSized(size int) []T {
|
||||
b := p.Get()
|
||||
if cap(b) < size {
|
||||
p.Put(b)
|
||||
return make([]T, size)
|
||||
func (p *BytesPool) Get() []byte {
|
||||
select {
|
||||
case b := <-p.pool:
|
||||
subInPoolSize(int64(cap(b)))
|
||||
return b
|
||||
default:
|
||||
return make([]byte, 0, p.initSize)
|
||||
}
|
||||
return b[:size]
|
||||
}
|
||||
|
||||
func (p *Pool[T]) Put(b []T) {
|
||||
p.pool.Put(b[:0]) //nolint:staticcheck
|
||||
func (p *BytesPool) GetSized(size int) []byte {
|
||||
if size <= PoolThreshold {
|
||||
return make([]byte, size)
|
||||
}
|
||||
select {
|
||||
case b := <-p.pool:
|
||||
if size <= cap(b) {
|
||||
subInPoolSize(int64(cap(b)))
|
||||
return b[:size]
|
||||
}
|
||||
select {
|
||||
case p.pool <- b:
|
||||
addInPoolSize(int64(cap(b)))
|
||||
default:
|
||||
}
|
||||
default:
|
||||
}
|
||||
return make([]byte, size)
|
||||
}
|
||||
|
||||
func (p *BytesPool) Put(b []byte) {
|
||||
size := cap(b)
|
||||
if size > DropThresholdHigh || poolFull() {
|
||||
return
|
||||
}
|
||||
b = b[:0]
|
||||
select {
|
||||
case p.pool <- b:
|
||||
addInPoolSize(int64(size))
|
||||
return
|
||||
default:
|
||||
// just drop it
|
||||
}
|
||||
}
|
||||
|
||||
var inPoolSize int64
|
||||
|
||||
func addInPoolSize(size int64) {
|
||||
atomic.AddInt64(&inPoolSize, size)
|
||||
}
|
||||
|
||||
func subInPoolSize(size int64) {
|
||||
atomic.AddInt64(&inPoolSize, -size)
|
||||
}
|
||||
|
||||
func init() {
|
||||
// Periodically drop some buffers to prevent excessive memory usage
|
||||
go func() {
|
||||
sigCh := make(chan os.Signal, 1)
|
||||
signal.Notify(sigCh, os.Interrupt)
|
||||
|
||||
cleanupTicker := time.NewTicker(CleanupInterval)
|
||||
defer cleanupTicker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-cleanupTicker.C:
|
||||
dropBuffers()
|
||||
case <-sigCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func poolFull() bool {
|
||||
return atomic.LoadInt64(&inPoolSize) >= InPoolLimit
|
||||
}
|
||||
|
||||
// dropBuffers removes excess buffers from the pool when it grows too large.
|
||||
func dropBuffers() {
|
||||
// Check if pool has more than a threshold of buffers
|
||||
count := 0
|
||||
droppedSize := 0
|
||||
checks := 0
|
||||
for count < MaxDropsPerCycle && checks < MaxChecksPerCycle && atomic.LoadInt64(&inPoolSize) > InPoolLimit*2/3 {
|
||||
select {
|
||||
case b := <-bytesPool.pool:
|
||||
n := cap(b)
|
||||
subInPoolSize(int64(n))
|
||||
droppedSize += n
|
||||
count++
|
||||
default:
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
checks++
|
||||
}
|
||||
if count > 0 {
|
||||
logging.Debug().Int("dropped", count).Int("size", droppedSize).Msg("dropped buffers from pool")
|
||||
}
|
||||
}
|
||||
|
|
43
internal/utils/synk/pool_bench_test.go
Normal file
43
internal/utils/synk/pool_bench_test.go
Normal file
|
@ -0,0 +1,43 @@
|
|||
package synk
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
var sizes = []int{1024, 4096, 16384, 65536, 32 * 1024, 128 * 1024, 512 * 1024, 1024 * 1024}
|
||||
|
||||
func BenchmarkBytesPool_GetSmall(b *testing.B) {
|
||||
for b.Loop() {
|
||||
bytesPool.Put(bytesPool.GetSized(1024))
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkBytesPool_MakeSmall(b *testing.B) {
|
||||
for b.Loop() {
|
||||
_ = make([]byte, 1024)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkBytesPool_GetLarge(b *testing.B) {
|
||||
for b.Loop() {
|
||||
bytesPool.Put(bytesPool.GetSized(1024 * 1024))
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkBytesPool_MakeLarge(b *testing.B) {
|
||||
for b.Loop() {
|
||||
_ = make([]byte, 1024*1024)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkBytesPool_GetAll(b *testing.B) {
|
||||
for i := range b.N {
|
||||
bytesPool.Put(bytesPool.GetSized(sizes[i%len(sizes)]))
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkBytesPool_MakeAll(b *testing.B) {
|
||||
for i := range b.N {
|
||||
_ = make([]byte, sizes[i%len(sizes)])
|
||||
}
|
||||
}
|
Loading…
Add table
Reference in a new issue