From 2e68baa93e554527f07e2c5c30fb05d91d1cd68c Mon Sep 17 00:00:00 2001 From: yusing Date: Fri, 16 May 2025 07:15:45 +0800 Subject: [PATCH] tweak: optimize memory allocation and increase throughput --- internal/logging/accesslog/access_logger.go | 15 +- internal/logging/accesslog/rotate.go | 3 +- internal/metrics/systeminfo/json.go | 2 +- internal/utils/buf_writer.go | 210 ++++++++++++++++++++ internal/utils/io.go | 26 +-- internal/utils/synk/pool.go | 36 ++-- 6 files changed, 252 insertions(+), 40 deletions(-) create mode 100644 internal/utils/buf_writer.go diff --git a/internal/logging/accesslog/access_logger.go b/internal/logging/accesslog/access_logger.go index 77b28d2..8d6a83e 100644 --- a/internal/logging/accesslog/access_logger.go +++ b/internal/logging/accesslog/access_logger.go @@ -1,7 +1,6 @@ package accesslog import ( - "bufio" "io" "net/http" "sync" @@ -13,6 +12,7 @@ import ( "github.com/yusing/go-proxy/internal/logging" maxmind "github.com/yusing/go-proxy/internal/maxmind/types" "github.com/yusing/go-proxy/internal/task" + "github.com/yusing/go-proxy/internal/utils" "github.com/yusing/go-proxy/internal/utils/strutils" "github.com/yusing/go-proxy/internal/utils/synk" "golang.org/x/time/rate" @@ -26,7 +26,7 @@ type ( rawWriter io.Writer closer []io.Closer supportRotate []supportRotate - writer *bufio.Writer + writer *utils.BufferedWriter writeLock sync.Mutex closed bool @@ -116,9 +116,9 @@ func NewAccessLoggerWithIO(parent task.Parent, writer WriterWithName, anyCfg Any task: parent.Subtask("accesslog."+writer.Name(), true), cfg: cfg, rawWriter: writer, - writer: bufio.NewWriterSize(writer, MinBufferSize), + writer: utils.NewBufferedWriter(writer, MinBufferSize), bufSize: MinBufferSize, - lineBufPool: synk.NewBytesPool(256, 768), // for common/combined usually < 256B; for json < 512B + lineBufPool: synk.NewBytesPool(), errRateLimiter: rate.NewLimiter(rate.Every(errRateLimit), errBurst), logger: logging.With().Str("file", writer.Name()).Logger(), } @@ -269,6 +269,7 @@ func (l *AccessLogger) Close() error { c.Close() } } + l.writer.Release() l.closed = true return nil } @@ -339,6 +340,10 @@ func (l *AccessLogger) adjustBuffer() { Str("new", strutils.FormatByteSize(newBufSize)). Msg("adjusted buffer size") - l.writer = bufio.NewWriterSize(l.rawWriter, newBufSize) + err := l.writer.Resize(newBufSize) + if err != nil { + l.handleErr(err) + return + } l.bufSize = newBufSize } diff --git a/internal/logging/accesslog/rotate.go b/internal/logging/accesslog/rotate.go index 614f2ad..52d9171 100644 --- a/internal/logging/accesslog/rotate.go +++ b/internal/logging/accesslog/rotate.go @@ -66,8 +66,7 @@ type lineInfo struct { Size int64 // Size of this line } -// do not allocate initial size -var rotateBytePool = synk.NewBytesPool(0, 16*1024*1024) +var rotateBytePool = synk.NewBytesPool() // rotateLogFile rotates the log file based on the retention policy. // It returns the result of the rotation and an error if any. diff --git a/internal/metrics/systeminfo/json.go b/internal/metrics/systeminfo/json.go index 6a8a6d6..9898fc8 100644 --- a/internal/metrics/systeminfo/json.go +++ b/internal/metrics/systeminfo/json.go @@ -9,7 +9,7 @@ import ( "github.com/yusing/go-proxy/internal/utils/synk" ) -var bufPool = synk.NewBytesPool(1024, 16384) +var bufPool = synk.NewBytesPool() // explicitly implement MarshalJSON to avoid reflection. func (s *SystemInfo) MarshalJSON() ([]byte, error) { diff --git a/internal/utils/buf_writer.go b/internal/utils/buf_writer.go new file mode 100644 index 0000000..75df4af --- /dev/null +++ b/internal/utils/buf_writer.go @@ -0,0 +1,210 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Modified from bufio.Writer by yusing . +package utils + +import ( + "io" + "unicode/utf8" +) + +// buffered output + +// BufferedWriter implements buffering for an [io.BufferedWriter] object. +// If an error occurs writing to a [BufferedWriter], no more data will be +// accepted and all subsequent writes, and [BufferedWriter.Flush], will return the error. +// After all data has been written, the client should call the +// [BufferedWriter.Flush] method to guarantee all data has been forwarded to +// the underlying [io.BufferedWriter]. +type BufferedWriter struct { + err error + buf []byte + n int + wr io.Writer +} + +// NewBufferedWriter returns a new [BufferedWriter] whose buffer has at least the specified +// size. If the argument io.Writer is already a [BufferedWriter] with large enough +// size, it returns the underlying [BufferedWriter]. +func NewBufferedWriter(w io.Writer, size int) *BufferedWriter { + // Is it already a Writer? + b, ok := w.(*BufferedWriter) + if ok && len(b.buf) >= size { + return b + } + return &BufferedWriter{ + buf: bytesPool.GetSized(size), + wr: w, + } +} + +// Size returns the size of the underlying buffer in bytes. +func (b *BufferedWriter) Size() int { return len(b.buf) } + +func (b *BufferedWriter) Resize(size int) error { + err := b.Flush() + if err != nil { + return err + } + if cap(b.buf) >= size { + b.buf = b.buf[:size] + } else { + b.Release() + b.buf = bytesPool.GetSized(size) + } + b.err = nil + b.n = 0 + return nil +} + +func (b *BufferedWriter) Release() { + bytesPool.Put(b.buf) +} + +// Flush writes any buffered data to the underlying [io.Writer]. +func (b *BufferedWriter) Flush() error { + if b.err != nil { + return b.err + } + if b.n == 0 { + return nil + } + n, err := b.wr.Write(b.buf[0:b.n]) + if n < b.n && err == nil { + err = io.ErrShortWrite + } + if err != nil { + if n > 0 && n < b.n { + copy(b.buf[0:b.n-n], b.buf[n:b.n]) + } + b.n -= n + b.err = err + return err + } + b.n = 0 + return nil +} + +// Available returns how many bytes are unused in the buffer. +func (b *BufferedWriter) Available() int { return len(b.buf) - b.n } + +// AvailableBuffer returns an empty buffer with b.Available() capacity. +// This buffer is intended to be appended to and +// passed to an immediately succeeding [BufferedWriter.Write] call. +// The buffer is only valid until the next write operation on b. +func (b *BufferedWriter) AvailableBuffer() []byte { + return b.buf[b.n:][:0] +} + +// Buffered returns the number of bytes that have been written into the current buffer. +func (b *BufferedWriter) Buffered() int { return b.n } + +// Write writes the contents of p into the buffer. +// It returns the number of bytes written. +// If nn < len(p), it also returns an error explaining +// why the write is short. +func (b *BufferedWriter) Write(p []byte) (nn int, err error) { + for len(p) > b.Available() && b.err == nil { + var n int + if b.Buffered() == 0 { + // Large write, empty buffer. + // Write directly from p to avoid copy. + n, b.err = b.wr.Write(p) + } else { + n = copy(b.buf[b.n:], p) + b.n += n + b.Flush() + } + nn += n + p = p[n:] + } + if b.err != nil { + return nn, b.err + } + n := copy(b.buf[b.n:], p) + b.n += n + nn += n + return nn, nil +} + +// WriteByte writes a single byte. +func (b *BufferedWriter) WriteByte(c byte) error { + if b.err != nil { + return b.err + } + if b.Available() <= 0 && b.Flush() != nil { + return b.err + } + b.buf[b.n] = c + b.n++ + return nil +} + +// WriteRune writes a single Unicode code point, returning +// the number of bytes written and any error. +func (b *BufferedWriter) WriteRune(r rune) (size int, err error) { + // Compare as uint32 to correctly handle negative runes. + if uint32(r) < utf8.RuneSelf { + err = b.WriteByte(byte(r)) + if err != nil { + return 0, err + } + return 1, nil + } + if b.err != nil { + return 0, b.err + } + n := b.Available() + if n < utf8.UTFMax { + if b.Flush(); b.err != nil { + return 0, b.err + } + n = b.Available() + if n < utf8.UTFMax { + // Can only happen if buffer is silly small. + return b.WriteString(string(r)) + } + } + size = utf8.EncodeRune(b.buf[b.n:], r) + b.n += size + return size, nil +} + +// WriteString writes a string. +// It returns the number of bytes written. +// If the count is less than len(s), it also returns an error explaining +// why the write is short. +func (b *BufferedWriter) WriteString(s string) (int, error) { + var sw io.StringWriter + tryStringWriter := true + + nn := 0 + for len(s) > b.Available() && b.err == nil { + var n int + if b.Buffered() == 0 && sw == nil && tryStringWriter { + // Check at most once whether b.wr is a StringWriter. + sw, tryStringWriter = b.wr.(io.StringWriter) + } + if b.Buffered() == 0 && tryStringWriter { + // Large write, empty buffer, and the underlying writer supports + // WriteString: forward the write to the underlying StringWriter. + // This avoids an extra copy. + n, b.err = sw.WriteString(s) + } else { + n = copy(b.buf[b.n:], s) + b.n += n + b.Flush() + } + nn += n + s = s[n:] + } + if b.err != nil { + return nn, b.err + } + n := copy(b.buf[b.n:], s) + b.n += n + nn += n + return nn, nil +} diff --git a/internal/utils/io.go b/internal/utils/io.go index 2ee52d2..5d8488a 100644 --- a/internal/utils/io.go +++ b/internal/utils/io.go @@ -9,6 +9,7 @@ import ( "syscall" "github.com/yusing/go-proxy/internal/gperr" + "github.com/yusing/go-proxy/internal/utils/synk" ) // TODO: move to "utils/io". @@ -110,31 +111,24 @@ type httpFlusher interface { Flush() error } -func getHttpFlusher(dst io.Writer) httpFlusher { +func getHTTPFlusher(dst io.Writer) httpFlusher { if rw, ok := dst.(http.ResponseWriter); ok { return http.NewResponseController(rw) } return nil } -const ( - copyBufSize = 32 * 1024 -) +const copyBufSize = 32 * 1024 -var copyBufPool = sync.Pool{ - New: func() any { - return make([]byte, copyBufSize) - }, -} +var bytesPool = synk.NewBytesPool() // Copyright 2009 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // This is a copy of io.Copy with context and HTTP flusher handling // Author: yusing . func CopyClose(dst *ContextWriter, src *ContextReader) (err error) { - var buf []byte + size := copyBufSize if l, ok := src.Reader.(*io.LimitedReader); ok { - size := copyBufSize if int64(size) > l.N { if l.N < 1 { size = 1 @@ -142,11 +136,9 @@ func CopyClose(dst *ContextWriter, src *ContextReader) (err error) { size = int(l.N) } } - buf = make([]byte, 0, size) - } else { - buf = copyBufPool.Get().([]byte) - defer copyBufPool.Put(buf) } + buf := bytesPool.GetSized(size) + defer bytesPool.Put(buf) // close both as soon as one of them is done wCloser, wCanClose := dst.Writer.(io.Closer) rCloser, rCanClose := src.Reader.(io.Closer) @@ -176,10 +168,10 @@ func CopyClose(dst *ContextWriter, src *ContextReader) (err error) { } } } - flusher := getHttpFlusher(dst.Writer) + flusher := getHTTPFlusher(dst.Writer) canFlush := flusher != nil for { - nr, er := src.Reader.Read(buf[:copyBufSize]) + nr, er := src.Reader.Read(buf) if nr > 0 { nw, ew := dst.Writer.Write(buf[0:nr]) if nw < 0 || nr < nw { diff --git a/internal/utils/synk/pool.go b/internal/utils/synk/pool.go index 098147b..df1d015 100644 --- a/internal/utils/synk/pool.go +++ b/internal/utils/synk/pool.go @@ -1,42 +1,48 @@ package synk -import "sync" +import ( + "sync" +) type ( // Pool is a wrapper of sync.Pool that limits the size of the object. Pool[T any] struct { - pool sync.Pool - maxSize int + pool *sync.Pool } BytesPool = Pool[byte] ) -const ( - DefaultInitBytes = 1024 - DefaultMaxBytes = 1024 * 1024 -) +const DefaultInitBytes = 32 * 1024 -func NewPool[T any](initSize int, maxSize int) *Pool[T] { +func NewPool[T any](initSize int) *Pool[T] { return &Pool[T]{ - pool: sync.Pool{ + pool: &sync.Pool{ New: func() any { return make([]T, 0, initSize) }, }, - maxSize: maxSize, } } -func NewBytesPool(initSize int, maxSize int) *BytesPool { - return NewPool[byte](initSize, maxSize) +var bytesPool = NewPool[byte](DefaultInitBytes) + +func NewBytesPool() *BytesPool { + return bytesPool } func (p *Pool[T]) Get() []T { return p.pool.Get().([]T) } -func (p *Pool[T]) Put(b []T) { - if cap(b) <= p.maxSize { - p.pool.Put(b[:0]) //nolint:staticcheck +func (p *Pool[T]) GetSized(size int) []T { + b := p.Get() + if cap(b) < size { + p.Put(b) + return make([]T, size) } + return b[:size] +} + +func (p *Pool[T]) Put(b []T) { + p.pool.Put(b[:0]) //nolint:staticcheck }