From 16e6e72454b04bb498d670cbb7ac7944077dfd5c Mon Sep 17 00:00:00 2001 From: yusing Date: Thu, 1 May 2025 05:57:02 +0800 Subject: [PATCH] feat(access_log): dynamic buffer size --- internal/logging/accesslog/access_logger.go | 98 +++++++++++++------ internal/logging/accesslog/config.go | 10 +- internal/logging/accesslog/config_test.go | 2 - .../logging/accesslog/file_logger_test.go | 1 - internal/logging/accesslog/rotate.go | 3 + 5 files changed, 75 insertions(+), 39 deletions(-) diff --git a/internal/logging/accesslog/access_logger.go b/internal/logging/accesslog/access_logger.go index 4f3ba46..cde4e32 100644 --- a/internal/logging/accesslog/access_logger.go +++ b/internal/logging/accesslog/access_logger.go @@ -4,8 +4,8 @@ import ( "bufio" "io" "net/http" - "os" "sync" + "sync/atomic" "time" "github.com/rs/zerolog" @@ -13,6 +13,7 @@ import ( "github.com/yusing/go-proxy/internal/gperr" "github.com/yusing/go-proxy/internal/logging" "github.com/yusing/go-proxy/internal/task" + "github.com/yusing/go-proxy/internal/utils/strutils" "github.com/yusing/go-proxy/internal/utils/synk" "golang.org/x/time/rate" ) @@ -22,12 +23,17 @@ type ( task *task.Task cfg *Config + rawWriter io.Writer closer []io.Closer supportRotate []supportRotate writer *bufio.Writer writeLock sync.Mutex closed bool + wps int64 + bufSize int + lastAdjust time.Time + lineBufPool *synk.BytesPool // buffer pool for formatting a single log line errRateLimiter *rate.Limiter @@ -60,15 +66,13 @@ type ( ) const ( - StdoutbufSize = 64 MinBufferSize = 4 * kilobyte - MaxBufferSize = 1 * megabyte + MaxBufferSize = 8 * megabyte + + bufferAdjustInterval = time.Second // How often we check & adjust ) -const ( - flushInterval = 30 * time.Second - defaultRotateInterval = time.Hour -) +const defaultRotateInterval = time.Hour const ( errRateLimit = 200 * time.Millisecond @@ -105,18 +109,6 @@ func unwrap[Writer any](w io.Writer) []Writer { func NewAccessLoggerWithIO(parent task.Parent, writer WriterWithName, anyCfg AnyConfig) *AccessLogger { cfg := anyCfg.ToConfig() - if cfg.BufferSize == 0 { - cfg.BufferSize = DefaultBufferSize - } - if cfg.BufferSize < MinBufferSize { - cfg.BufferSize = MinBufferSize - } - if cfg.BufferSize > MaxBufferSize { - cfg.BufferSize = MaxBufferSize - } - if _, ok := writer.(*os.File); ok { - cfg.BufferSize = StdoutbufSize - } if cfg.RotateInterval == 0 { cfg.RotateInterval = defaultRotateInterval } @@ -124,7 +116,9 @@ func NewAccessLoggerWithIO(parent task.Parent, writer WriterWithName, anyCfg Any l := &AccessLogger{ task: parent.Subtask("accesslog."+writer.Name(), true), cfg: cfg, - writer: bufio.NewWriterSize(writer, cfg.BufferSize), + rawWriter: writer, + writer: bufio.NewWriterSize(writer, MinBufferSize), + bufSize: MinBufferSize, lineBufPool: synk.NewBytesPool(256, 768), // for common/combined usually < 256B; for json < 512B errRateLimiter: rate.NewLimiter(rate.Every(errRateLimit), errBurst), logger: logging.With().Str("file", writer.Name()).Logger(), @@ -223,9 +217,9 @@ func (l *AccessLogger) Rotate() (result *RotateResult, err error) { func (l *AccessLogger) handleErr(err error) { if l.errRateLimiter.Allow() { - gperr.LogError("failed to write access log", err) + gperr.LogError("failed to write access log", err, &l.logger) } else { - gperr.LogError("too many errors, stopping access log", err) + gperr.LogError("too many errors, stopping access log", err, &l.logger) l.task.Finish(err) } } @@ -237,19 +231,16 @@ func (l *AccessLogger) start() { l.task.Finish(nil) }() - // flushes the buffer every 30 seconds - flushTicker := time.NewTicker(30 * time.Second) - defer flushTicker.Stop() - rotateTicker := time.NewTicker(l.cfg.RotateInterval) defer rotateTicker.Stop() + bufAdjTicker := time.NewTicker(bufferAdjustInterval) + defer bufAdjTicker.Stop() + for { select { case <-l.task.Context().Done(): return - case <-flushTicker.C: - l.Flush() case <-rotateTicker.C: if !l.ShouldRotate() { continue @@ -262,6 +253,8 @@ func (l *AccessLogger) start() { } else { l.logger.Info().Msg("no rotation needed") } + case <-bufAdjTicker.C: + l.adjustBuffer() } } } @@ -298,8 +291,55 @@ func (l *AccessLogger) write(data []byte) { if l.closed { return } - _, err := l.writer.Write(data) + n, err := l.writer.Write(data) if err != nil { l.handleErr(err) + } else if n < len(data) { + l.handleErr(gperr.Errorf("%w, writing %d bytes, only %d written", io.ErrShortWrite, len(data), n)) } + atomic.AddInt64(&l.wps, int64(n)) +} + +func (l *AccessLogger) adjustBuffer() { + wps := int(atomic.SwapInt64(&l.wps, 0)) + origBufSize := l.bufSize + newBufSize := origBufSize + + halfDiff := (wps - origBufSize) / 2 + if halfDiff < 0 { + halfDiff = -halfDiff + } + step := max(halfDiff, wps/2) + + switch { + case origBufSize < wps: + newBufSize += step + if newBufSize > MaxBufferSize { + newBufSize = MaxBufferSize + } + case origBufSize > wps: + newBufSize -= step + if newBufSize < MinBufferSize { + newBufSize = MinBufferSize + } + } + + if newBufSize == origBufSize { + return + } + + l.writeLock.Lock() + defer l.writeLock.Unlock() + if l.closed { + return + } + + l.logger.Info(). + Str("wps", strutils.FormatByteSize(wps)). + Str("old", strutils.FormatByteSize(origBufSize)). + Str("new", strutils.FormatByteSize(newBufSize)). + Msg("adjusted buffer size") + + l.writer = bufio.NewWriterSize(l.rawWriter, newBufSize) + l.bufSize = newBufSize } diff --git a/internal/logging/accesslog/config.go b/internal/logging/accesslog/config.go index 82aa381..ebabe6e 100644 --- a/internal/logging/accesslog/config.go +++ b/internal/logging/accesslog/config.go @@ -9,7 +9,7 @@ import ( type ( ConfigBase struct { - BufferSize int `json:"buffer_size"` + B int `json:"buffer_size"` // Deprecated: buffer size is adjusted dynamically Path string `json:"path"` Stdout bool `json:"stdout"` Retention *Retention `json:"retention" aliases:"keep"` @@ -58,8 +58,6 @@ var ( ReqLoggerFormats = []Format{FormatCommon, FormatCombined, FormatJSON} ) -const DefaultBufferSize = 64 * kilobyte // 64KB - func (cfg *ConfigBase) Validate() gperr.Error { if cfg.Path == "" && !cfg.Stdout { return gperr.New("path or stdout is required") @@ -102,8 +100,7 @@ func (cfg *RequestLoggerConfig) ToConfig() *Config { func DefaultRequestLoggerConfig() *RequestLoggerConfig { return &RequestLoggerConfig{ ConfigBase: ConfigBase{ - BufferSize: DefaultBufferSize, - Retention: &Retention{Days: 30}, + Retention: &Retention{Days: 30}, }, Format: FormatCombined, Fields: Fields{ @@ -123,8 +120,7 @@ func DefaultRequestLoggerConfig() *RequestLoggerConfig { func DefaultACLLoggerConfig() *ACLLoggerConfig { return &ACLLoggerConfig{ ConfigBase: ConfigBase{ - BufferSize: DefaultBufferSize, - Retention: &Retention{Days: 30}, + Retention: &Retention{Days: 30}, }, } } diff --git a/internal/logging/accesslog/config_test.go b/internal/logging/accesslog/config_test.go index 6d10f09..a44199d 100644 --- a/internal/logging/accesslog/config_test.go +++ b/internal/logging/accesslog/config_test.go @@ -11,7 +11,6 @@ import ( func TestNewConfig(t *testing.T) { labels := map[string]string{ - "proxy.buffer_size": "10", "proxy.format": "combined", "proxy.path": "/tmp/access.log", "proxy.filters.status_codes.values": "200-299", @@ -33,7 +32,6 @@ func TestNewConfig(t *testing.T) { err = utils.MapUnmarshalValidate(parsed, &config) expect.NoError(t, err) - expect.Equal(t, config.BufferSize, 10) expect.Equal(t, config.Format, FormatCombined) expect.Equal(t, config.Path, "/tmp/access.log") expect.Equal(t, config.Filters.StatusCodes.Values, []*StatusCodeRange{{Start: 200, End: 299}}) diff --git a/internal/logging/accesslog/file_logger_test.go b/internal/logging/accesslog/file_logger_test.go index 884214b..6667227 100644 --- a/internal/logging/accesslog/file_logger_test.go +++ b/internal/logging/accesslog/file_logger_test.go @@ -50,7 +50,6 @@ func TestConcurrentAccessLoggerLogAndFlush(t *testing.T) { file := NewMockFile() cfg := DefaultRequestLoggerConfig() - cfg.BufferSize = 1024 parent := task.RootTask("test", false) loggerCount := 5 diff --git a/internal/logging/accesslog/rotate.go b/internal/logging/accesslog/rotate.go index c7a7dcb..614f2ad 100644 --- a/internal/logging/accesslog/rotate.go +++ b/internal/logging/accesslog/rotate.go @@ -6,6 +6,7 @@ import ( "time" "github.com/rs/zerolog" + "github.com/yusing/go-proxy/internal/gperr" "github.com/yusing/go-proxy/internal/utils" "github.com/yusing/go-proxy/internal/utils/strutils" "github.com/yusing/go-proxy/internal/utils/synk" @@ -201,6 +202,8 @@ func rotateLogFileByPolicy(file supportRotate, config *Retention) (result *Rotat // Write it to the new position if _, err := file.WriteAt(buf, writePos); err != nil { return nil, err + } else if n < line.Size { + return nil, gperr.Errorf("%w, writing %d bytes, only %d written", io.ErrShortWrite, line.Size, n) } writePos += n }