fix: improved sync.Pool handling

This commit is contained in:
yusing 2025-04-17 14:17:50 +08:00
parent a35ac33bd5
commit cfd1d8fff0
4 changed files with 65 additions and 41 deletions

View file

@ -11,6 +11,7 @@ import (
"github.com/yusing/go-proxy/internal/gperr"
"github.com/yusing/go-proxy/internal/logging"
"github.com/yusing/go-proxy/internal/task"
"github.com/yusing/go-proxy/internal/utils/synk"
)
type (
@ -20,7 +21,7 @@ type (
io AccessLogIO
buffered *bufio.Writer
lineBufPool sync.Pool // buffer pool for formatting a single log line
lineBufPool *synk.BytesPool // buffer pool for formatting a single log line
Formatter
}
@ -78,7 +79,7 @@ func NewAccessLoggerWithIO(parent task.Parent, io AccessLogIO, cfg *Config) *Acc
cfg.BufferSize = 4096
}
l := &AccessLogger{
task: parent.Subtask("accesslog"),
task: parent.Subtask("accesslog."+io.Name(), true),
cfg: cfg,
io: io,
buffered: bufio.NewWriterSize(io, cfg.BufferSize),
@ -96,9 +97,7 @@ func NewAccessLoggerWithIO(parent task.Parent, io AccessLogIO, cfg *Config) *Acc
panic("invalid access log format")
}
l.lineBufPool.New = func() any {
return bytes.NewBuffer(make([]byte, 0, 1024))
}
l.lineBufPool = synk.NewBytesPool(1024, synk.DefaultMaxBytes)
go l.start()
return l
}
@ -118,12 +117,11 @@ func (l *AccessLogger) Log(req *http.Request, res *http.Response) {
return
}
line := l.lineBufPool.Get().(*bytes.Buffer)
line.Reset()
line := l.lineBufPool.Get()
defer l.lineBufPool.Put(line)
l.Formatter.Format(line, req, res)
line.WriteRune('\n')
l.write(line.Bytes())
l.Formatter.Format(bytes.NewBuffer(line), req, res)
line = append(line, '\n')
l.write(line)
}
func (l *AccessLogger) LogError(req *http.Request, err error) {

View file

@ -9,6 +9,7 @@ import (
"syscall"
"github.com/yusing/go-proxy/internal/gperr"
"github.com/yusing/go-proxy/internal/utils/synk"
)
// TODO: move to "utils/io".
@ -117,24 +118,21 @@ func getHttpFlusher(dst io.Writer) httpFlusher {
return nil
}
const (
copyBufSize = 32 * 1024
)
const copyBufSize = 32 * 1024
var copyBufPool = sync.Pool{
New: func() any {
return make([]byte, copyBufSize)
},
}
var copyBufPool = synk.NewBytesPool(copyBufSize, synk.DefaultMaxBytes)
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// This is a copy of io.Copy with context and HTTP flusher handling
// Author: yusing <yusing@6uo.me>.
func CopyClose(dst *ContextWriter, src *ContextReader) (err error) {
var buf []byte
buf := copyBufPool.Get()
defer copyBufPool.Put(buf)
var size int
if l, ok := src.Reader.(*io.LimitedReader); ok {
size := copyBufSize
size = copyBufSize
if int64(size) > l.N {
if l.N < 1 {
size = 1
@ -142,10 +140,8 @@ func CopyClose(dst *ContextWriter, src *ContextReader) (err error) {
size = int(l.N)
}
}
buf = make([]byte, 0, size)
} else {
buf = copyBufPool.Get().([]byte)
defer copyBufPool.Put(buf[:0])
size = cap(buf)
}
// close both as soon as one of them is done
wCloser, wCanClose := dst.Writer.(io.Closer)
@ -179,7 +175,7 @@ func CopyClose(dst *ContextWriter, src *ContextReader) (err error) {
flusher := getHttpFlusher(dst.Writer)
canFlush := flusher != nil
for {
nr, er := src.Reader.Read(buf[:copyBufSize])
nr, er := src.Reader.Read(buf[:size])
if nr > 0 {
nw, ew := dst.Writer.Write(buf[0:nr])
if nw < 0 || nr < nw {

View file

@ -0,0 +1,42 @@
package synk
import "sync"
type (
// Pool is a wrapper of sync.Pool that limits the size of the object.
Pool[T any] struct {
pool sync.Pool
maxSize int
}
BytesPool = Pool[byte]
)
const (
DefaultInitBytes = 1024
DefaultMaxBytes = 1024 * 1024
)
func NewPool[T any](initSize int, maxSize int) *Pool[T] {
return &Pool[T]{
pool: sync.Pool{
New: func() any {
return make([]T, 0, initSize)
},
},
maxSize: maxSize,
}
}
func NewBytesPool(initSize int, maxSize int) *BytesPool {
return NewPool[byte](initSize, maxSize)
}
func (p *Pool[T]) Get() []T {
return p.pool.Get().([]T)
}
func (p *Pool[T]) Put(b []T) {
if cap(b) <= p.maxSize {
p.pool.Put(b[:0])
}
}

View file

@ -2,9 +2,9 @@ package json
import (
"reflect"
"sync"
"github.com/bytedance/sonic"
"github.com/yusing/go-proxy/internal/utils/synk"
)
type Marshaler interface {
@ -38,8 +38,8 @@ var (
//
// - It does not support maps other than string-keyed maps.
func Marshal(v any) ([]byte, error) {
buf := newBytes()
defer putBytes(buf)
buf := bytesPool.Get()
defer bytesPool.Put(buf)
return cloneBytes(appendMarshal(reflect.ValueOf(v), buf)), nil
}
@ -47,21 +47,9 @@ func MarshalTo(v any, buf []byte) []byte {
return appendMarshal(reflect.ValueOf(v), buf)
}
const bufSize = 8192
const initBufSize = 4096
var bytesPool = sync.Pool{
New: func() any {
return make([]byte, 0, bufSize)
},
}
func newBytes() []byte {
return bytesPool.Get().([]byte)
}
func putBytes(buf []byte) {
bytesPool.Put(buf[:0])
}
var bytesPool = synk.NewBytesPool(initBufSize, synk.DefaultMaxBytes)
func cloneBytes(buf []byte) (res []byte) {
return append(res, buf...)