From 8469b6406c9c2ea86225c54996a67ecd2cda6bbb Mon Sep 17 00:00:00 2001 From: yusing Date: Sun, 25 May 2025 16:20:12 +0800 Subject: [PATCH] tweak: consolidate bytes pool management and enhance CopyClose functionality for improved performance --- cmd/pprof_prof.go | 35 ++++++++++++-- internal/logging/accesslog/access_logger.go | 13 +++--- .../net/gphttp/reverseproxy/reverse_proxy.go | 2 +- internal/utils/io.go | 12 +++-- internal/utils/synk/pool.go | 46 ++++++++++++------- internal/utils/synk/pool_debug.go | 12 ++++- internal/utils/synk/pool_prod.go | 1 + .../pkg/reverseproxy/reverse_proxy.go | 2 +- 8 files changed, 86 insertions(+), 37 deletions(-) diff --git a/cmd/pprof_prof.go b/cmd/pprof_prof.go index a3fbaa1..11f7a42 100644 --- a/cmd/pprof_prof.go +++ b/cmd/pprof_prof.go @@ -3,18 +3,43 @@ package main import ( - "log" "net/http" _ "net/http/pprof" "runtime" "runtime/debug" + "time" + + "github.com/rs/zerolog/log" + "github.com/yusing/go-proxy/internal/utils/strutils" ) +const mb = 1024 * 1024 + func initProfiling() { - runtime.GOMAXPROCS(2) - debug.SetMemoryLimit(100 * 1024 * 1024) - debug.SetMaxStack(15 * 1024 * 1024) + debug.SetGCPercent(-1) + debug.SetMemoryLimit(50 * mb) + debug.SetMaxStack(4 * mb) + go func() { - log.Println(http.ListenAndServe(":7777", nil)) + log.Info().Msgf("pprof server started at http://localhost:7777/debug/pprof/") + log.Error().Err(http.ListenAndServe(":7777", nil)).Msg("pprof server failed") + }() + go func() { + ticker := time.NewTicker(time.Second * 10) + defer ticker.Stop() + for range ticker.C { + var m runtime.MemStats + runtime.ReadMemStats(&m) + log.Info().Msgf("-----------------------------------------------------") + log.Info().Msgf("Timestamp: %s", time.Now().Format(time.RFC3339)) + log.Info().Msgf(" Go Heap - In Use (Alloc/HeapAlloc): %s", strutils.FormatByteSize(m.Alloc)) + log.Info().Msgf(" Go Heap - Reserved from OS (HeapSys): %s", strutils.FormatByteSize(m.HeapSys)) + log.Info().Msgf(" Go Stacks - In Use (StackInuse): %s", strutils.FormatByteSize(m.StackInuse)) + log.Info().Msgf(" Go Runtime - Other Sys (MSpanInuse, MCacheInuse, BuckHashSys, GCSys, OtherSys): %s", strutils.FormatByteSize(m.MSpanInuse+m.MCacheInuse+m.BuckHashSys+m.GCSys+m.OtherSys)) + log.Info().Msgf(" Go Runtime - Total from OS (Sys): %s", strutils.FormatByteSize(m.Sys)) + log.Info().Msgf(" Number of Goroutines: %d", runtime.NumGoroutine()) + log.Info().Msgf(" Number of GCs: %d", m.NumGC) + log.Info().Msg("-----------------------------------------------------") + } }() } diff --git a/internal/logging/accesslog/access_logger.go b/internal/logging/accesslog/access_logger.go index 647181f..13a8b4d 100644 --- a/internal/logging/accesslog/access_logger.go +++ b/internal/logging/accesslog/access_logger.go @@ -33,8 +33,6 @@ type ( writeCount int64 bufSize int - lineBufPool *synk.BytesPool // buffer pool for formatting a single log line - errRateLimiter *rate.Limiter logger zerolog.Logger @@ -78,6 +76,8 @@ const ( errBurst = 5 ) +var lineBufPool = synk.NewBytesPool() + func NewAccessLogger(parent task.Parent, cfg AnyConfig) (*AccessLogger, error) { io, err := cfg.IO() if err != nil { @@ -121,7 +121,6 @@ func NewAccessLoggerWithIO(parent task.Parent, writer WriterWithName, anyCfg Any rawWriter: writer, writer: utils.NewBufferedWriter(writer, MinBufferSize), bufSize: MinBufferSize, - lineBufPool: synk.NewBytesPool(), errRateLimiter: rate.NewLimiter(rate.Every(errRateLimit), errBurst), logger: log.With().Str("file", writer.Name()).Logger(), } @@ -168,8 +167,8 @@ func (l *AccessLogger) Log(req *http.Request, res *http.Response) { return } - line := l.lineBufPool.Get() - defer l.lineBufPool.Put(line) + line := lineBufPool.Get() + defer lineBufPool.Put(line) line = l.AppendRequestLog(line, req, res) if line[len(line)-1] != '\n' { line = append(line, '\n') @@ -182,8 +181,8 @@ func (l *AccessLogger) LogError(req *http.Request, err error) { } func (l *AccessLogger) LogACL(info *maxmind.IPInfo, blocked bool) { - line := l.lineBufPool.Get() - defer l.lineBufPool.Put(line) + line := lineBufPool.Get() + defer lineBufPool.Put(line) line = l.AppendACLLog(line, info, blocked) if line[len(line)-1] != '\n' { line = append(line, '\n') diff --git a/internal/net/gphttp/reverseproxy/reverse_proxy.go b/internal/net/gphttp/reverseproxy/reverse_proxy.go index 59faafe..42ddcbc 100644 --- a/internal/net/gphttp/reverseproxy/reverse_proxy.go +++ b/internal/net/gphttp/reverseproxy/reverse_proxy.go @@ -411,7 +411,7 @@ func (p *ReverseProxy) handler(rw http.ResponseWriter, req *http.Request) { rw.WriteHeader(res.StatusCode) - err = U.CopyCloseWithContext(ctx, rw, res.Body) // close now, instead of defer, to populate res.Trailer + err = U.CopyCloseWithContext(ctx, rw, res.Body, int(res.ContentLength)) // close now, instead of defer, to populate res.Trailer if err != nil { if !errors.Is(err, context.Canceled) { p.errorHandler(rw, req, err, false) diff --git a/internal/utils/io.go b/internal/utils/io.go index 33881fc..ee40600 100644 --- a/internal/utils/io.go +++ b/internal/utils/io.go @@ -72,7 +72,7 @@ func NewPipe(ctx context.Context, r io.ReadCloser, w io.WriteCloser) *Pipe { } func (p *Pipe) Start() (err error) { - err = CopyClose(&p.w, &p.r) + err = CopyClose(&p.w, &p.r, 0) switch { case // NOTE: ignoring broken pipe and connection reset by peer @@ -117,7 +117,7 @@ func getHTTPFlusher(dst io.Writer) httpFlusher { return nil } -const copyBufSize = 32 * 1024 +const copyBufSize = synk.SizedPoolThreshold var bytesPool = synk.NewBytesPool() @@ -125,7 +125,7 @@ var bytesPool = synk.NewBytesPool() // Use of this source code is governed by a BSD-style // This is a copy of io.Copy with context and HTTP flusher handling // Author: yusing . -func CopyClose(dst *ContextWriter, src *ContextReader) (err error) { +func CopyClose(dst *ContextWriter, src *ContextReader, sizeHint int) (err error) { size := copyBufSize if l, ok := src.Reader.(*io.LimitedReader); ok { if int64(size) > l.N { @@ -135,6 +135,8 @@ func CopyClose(dst *ContextWriter, src *ContextReader) (err error) { size = int(l.N) } } + } else if sizeHint > 0 { + size = min(size, sizeHint) } buf := bytesPool.GetSized(size) defer bytesPool.Put(buf) @@ -196,6 +198,6 @@ func CopyClose(dst *ContextWriter, src *ContextReader) (err error) { } } -func CopyCloseWithContext(ctx context.Context, dst io.Writer, src io.Reader) (err error) { - return CopyClose(NewContextWriter(ctx, dst), NewContextReader(ctx, src)) +func CopyCloseWithContext(ctx context.Context, dst io.Writer, src io.Reader, sizeHint int) (err error) { + return CopyClose(NewContextWriter(ctx, dst), NewContextReader(ctx, src), sizeHint) } diff --git a/internal/utils/synk/pool.go b/internal/utils/synk/pool.go index 15cbd64..13708de 100644 --- a/internal/utils/synk/pool.go +++ b/internal/utils/synk/pool.go @@ -9,8 +9,8 @@ type weakBuf = unsafe.Pointer func makeWeak(b *[]byte) weakBuf { ptr := runtime_registerWeakPointer(unsafe.Pointer(b)) - runtime.KeepAlive(ptr) addCleanup(b, addGCed, cap(*b)) + runtime.KeepAlive(ptr) return weakBuf(ptr) } @@ -29,8 +29,9 @@ func runtime_registerWeakPointer(unsafe.Pointer) unsafe.Pointer func runtime_makeStrongFromWeak(unsafe.Pointer) unsafe.Pointer type BytesPool struct { - pool chan weakBuf - initSize int + sizedPool chan weakBuf + unsizedPool chan weakBuf + initSize int } const ( @@ -41,16 +42,18 @@ const ( const ( InPoolLimit = 32 * mb - DefaultInitBytes = 4 * kb - PoolThreshold = 256 * kb - DropThresholdHigh = 4 * mb + UnsizedAvg = 4 * kb + SizedPoolThreshold = 256 * kb + DropThreshold = 4 * mb - PoolSize = InPoolLimit / PoolThreshold + SizedPoolSize = InPoolLimit * 8 / 10 / SizedPoolThreshold + UnsizedPoolSize = InPoolLimit * 2 / 10 / UnsizedAvg ) var bytesPool = &BytesPool{ - pool: make(chan weakBuf, PoolSize), - initSize: DefaultInitBytes, + sizedPool: make(chan weakBuf, SizedPoolSize), + unsizedPool: make(chan weakBuf, UnsizedPoolSize), + initSize: UnsizedAvg, } func NewBytesPool() *BytesPool { @@ -60,7 +63,7 @@ func NewBytesPool() *BytesPool { func (p *BytesPool) Get() []byte { for { select { - case bWeak := <-p.pool: + case bWeak := <-p.unsizedPool: bPtr := getBufFromWeak(bWeak) if bPtr == nil { continue @@ -68,18 +71,19 @@ func (p *BytesPool) Get() []byte { addReused(cap(bPtr)) return bPtr default: - return make([]byte, 0, p.initSize) + return make([]byte, 0) } } } func (p *BytesPool) GetSized(size int) []byte { - if size <= PoolThreshold { + if size <= SizedPoolThreshold { + addNonPooled(size) return make([]byte, size) } for { select { - case bWeak := <-p.pool: + case bWeak := <-p.sizedPool: bPtr := getBufFromWeak(bWeak) if bPtr == nil { continue @@ -90,25 +94,35 @@ func (p *BytesPool) GetSized(size int) []byte { return (bPtr)[:size] } select { - case p.pool <- bWeak: + case p.sizedPool <- bWeak: default: // just drop it } default: } + addNonPooled(size) return make([]byte, size) } } func (p *BytesPool) Put(b []byte) { size := cap(b) - if size <= PoolThreshold || size > DropThresholdHigh { + if size > DropThreshold { return } b = b[:0] w := makeWeak(&b) + if size <= SizedPoolThreshold { + p.put(w, p.unsizedPool) + } else { + p.put(w, p.sizedPool) + } +} + +//go:inline +func (p *BytesPool) put(w weakBuf, pool chan weakBuf) { select { - case p.pool <- w: + case pool <- w: default: // just drop it } diff --git a/internal/utils/synk/pool_debug.go b/internal/utils/synk/pool_debug.go index 0a7aa00..6dfad8c 100644 --- a/internal/utils/synk/pool_debug.go +++ b/internal/utils/synk/pool_debug.go @@ -14,10 +14,16 @@ import ( ) var ( - numReused, sizeReused uint64 - numGCed, sizeGCed uint64 + numNonPooled, sizeNonPooled uint64 + numReused, sizeReused uint64 + numGCed, sizeGCed uint64 ) +func addNonPooled(size int) { + atomic.AddUint64(&numNonPooled, 1) + atomic.AddUint64(&sizeNonPooled, uint64(size)) +} + func addReused(size int) { atomic.AddUint64(&numReused, 1) atomic.AddUint64(&sizeReused, uint64(size)) @@ -48,6 +54,8 @@ func initPoolStats() { Str("sizeReused", strutils.FormatByteSize(atomic.LoadUint64(&sizeReused))). Uint64("numGCed", atomic.LoadUint64(&numGCed)). Str("sizeGCed", strutils.FormatByteSize(atomic.LoadUint64(&sizeGCed))). + Uint64("numNonPooled", atomic.LoadUint64(&numNonPooled)). + Str("sizeNonPooled", strutils.FormatByteSize(atomic.LoadUint64(&sizeNonPooled))). Msg("bytes pool stats") } } diff --git a/internal/utils/synk/pool_prod.go b/internal/utils/synk/pool_prod.go index 3cca12d..b1708aa 100644 --- a/internal/utils/synk/pool_prod.go +++ b/internal/utils/synk/pool_prod.go @@ -2,6 +2,7 @@ package synk +func addNonPooled(size int) {} func addReused(size int) {} func addGCed(size int) {} func initPoolStats() {} diff --git a/socket-proxy/pkg/reverseproxy/reverse_proxy.go b/socket-proxy/pkg/reverseproxy/reverse_proxy.go index b2abaff..d8ec555 100644 --- a/socket-proxy/pkg/reverseproxy/reverse_proxy.go +++ b/socket-proxy/pkg/reverseproxy/reverse_proxy.go @@ -218,7 +218,7 @@ func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { rw.WriteHeader(res.StatusCode) - err = utils.CopyCloseWithContext(ctx, rw, res.Body) + err = utils.CopyCloseWithContext(ctx, rw, res.Body, int(res.ContentLength)) if err != nil { if !errors.Is(err, context.Canceled) { p.getErrorHandler()(rw, req, err)