From ee27237083e8aab78d0e77a450104e2d1c31821b Mon Sep 17 00:00:00 2001 From: yusing Date: Tue, 18 Feb 2025 01:12:42 +0800 Subject: [PATCH] simplify access logger with bufio.Writer --- .../net/gphttp/accesslog/access_logger.go | 98 +++++++++---------- internal/net/gphttp/accesslog/config.go | 2 +- .../net/gphttp/accesslog/file_logger_test.go | 4 +- 3 files changed, 49 insertions(+), 55 deletions(-) diff --git a/internal/net/gphttp/accesslog/access_logger.go b/internal/net/gphttp/accesslog/access_logger.go index 36e7486..06a068b 100644 --- a/internal/net/gphttp/accesslog/access_logger.go +++ b/internal/net/gphttp/accesslog/access_logger.go @@ -1,6 +1,7 @@ package accesslog import ( + "bufio" "bytes" "io" "net/http" @@ -14,16 +15,12 @@ import ( type ( AccessLogger struct { - task *task.Task - cfg *Config - io AccessLogIO - - buf bytes.Buffer // buffer for non-flushed log - bufMu sync.RWMutex - bufPool sync.Pool // buffer pool for formatting a single log line - - flushThreshold int + task *task.Task + cfg *Config + io AccessLogIO + buffered *bufio.Writer + lineBufPool sync.Pool // buffer pool for formatting a single log line Formatter } @@ -44,14 +41,18 @@ type ( ) func NewAccessLogger(parent task.Parent, io AccessLogIO, cfg *Config) *AccessLogger { - l := &AccessLogger{ - task: parent.Subtask("accesslog"), - cfg: cfg, - io: io, - } - if cfg.BufferSize < 1024 { + if cfg.BufferSize == 0 { cfg.BufferSize = DefaultBufferSize } + if cfg.BufferSize < 4096 { + cfg.BufferSize = 4096 + } + l := &AccessLogger{ + task: parent.Subtask("accesslog"), + cfg: cfg, + io: io, + buffered: bufio.NewWriterSize(io, cfg.BufferSize), + } fmt := CommonFormatter{cfg: &l.cfg.Fields, GetTimeNow: time.Now} switch l.cfg.Format { @@ -65,10 +66,8 @@ func NewAccessLogger(parent task.Parent, io AccessLogIO, cfg *Config) *AccessLog panic("invalid access log format") } - l.flushThreshold = int(cfg.BufferSize * 4 / 5) // 80% - l.buf.Grow(int(cfg.BufferSize)) - l.bufPool.New = func() any { - return new(bytes.Buffer) + l.lineBufPool.New = func() any { + return bytes.NewBuffer(make([]byte, 0, 1024)) } go l.start() return l @@ -89,15 +88,12 @@ func (l *AccessLogger) Log(req *http.Request, res *http.Response) { return } - line := l.bufPool.Get().(*bytes.Buffer) - l.Format(line, req, res) - line.WriteRune('\n') - - l.bufMu.Lock() - l.buf.Write(line.Bytes()) + line := l.lineBufPool.Get().(*bytes.Buffer) line.Reset() - l.bufPool.Put(line) - l.bufMu.Unlock() + defer l.lineBufPool.Put(line) + l.Formatter.Format(line, req, res) + line.WriteRune('\n') + l.write(line.Bytes()) } func (l *AccessLogger) LogError(req *http.Request, err error) { @@ -118,52 +114,50 @@ func (l *AccessLogger) Rotate() error { return l.cfg.Retention.rotateLogFile(l.io) } -func (l *AccessLogger) Flush(force bool) { - if l.buf.Len() == 0 { - return - } - if force || l.buf.Len() >= l.flushThreshold { - l.bufMu.RLock() - l.write(l.buf.Bytes()) - l.buf.Reset() - l.bufMu.RUnlock() - } -} - func (l *AccessLogger) handleErr(err error) { gperr.LogError("failed to write access log", err) } func (l *AccessLogger) start() { defer func() { - if l.buf.Len() > 0 { // flush last - l.write(l.buf.Bytes()) + if err := l.Flush(); err != nil { + l.handleErr(err) } - l.io.Close() + l.close() l.task.Finish(nil) }() - // periodic flush + threshold flush - periodic := time.NewTicker(5 * time.Second) - threshold := time.NewTicker(time.Second) - defer periodic.Stop() - defer threshold.Stop() + // flushes the buffer every 30 seconds + flushTicker := time.NewTicker(30 * time.Second) + defer flushTicker.Stop() for { select { case <-l.task.Context().Done(): return - case <-periodic.C: - l.Flush(true) - case <-threshold.C: - l.Flush(false) + case <-flushTicker.C: + if err := l.Flush(); err != nil { + l.handleErr(err) + } } } } +func (l *AccessLogger) Flush() error { + l.io.Lock() + defer l.io.Unlock() + return l.buffered.Flush() +} + +func (l *AccessLogger) close() { + l.io.Lock() + defer l.io.Unlock() + l.io.Close() +} + func (l *AccessLogger) write(data []byte) { l.io.Lock() // prevent concurrent write, i.e. log rotation, other access loggers - _, err := l.io.Write(data) + _, err := l.buffered.Write(data) l.io.Unlock() if err != nil { l.handleErr(err) diff --git a/internal/net/gphttp/accesslog/config.go b/internal/net/gphttp/accesslog/config.go index 820c302..a1dbe2f 100644 --- a/internal/net/gphttp/accesslog/config.go +++ b/internal/net/gphttp/accesslog/config.go @@ -17,7 +17,7 @@ type ( Cookies FieldConfig `json:"cookies"` } Config struct { - BufferSize uint `json:"buffer_size" validate:"gte=1"` + BufferSize int `json:"buffer_size"` Format Format `json:"format" validate:"oneof=common combined json"` Path string `json:"path" validate:"required"` Filters Filters `json:"filters"` diff --git a/internal/net/gphttp/accesslog/file_logger_test.go b/internal/net/gphttp/accesslog/file_logger_test.go index ffa7aab..0321a85 100644 --- a/internal/net/gphttp/accesslog/file_logger_test.go +++ b/internal/net/gphttp/accesslog/file_logger_test.go @@ -71,14 +71,14 @@ func TestConcurrentAccessLoggerLogAndFlush(t *testing.T) { go func(l *AccessLogger) { defer wg.Done() parallelLog(l, req, resp, logCountPerLogger) - l.Flush(true) + l.Flush() }(logger) } wg.Wait() expected := loggerCount * logCountPerLogger - actual := file.Count() + actual := file.LineCount() ExpectEqual(t, actual, expected) }