From 4dda54c9e6d4f2959654d56a4b538c349961f2e5 Mon Sep 17 00:00:00 2001 From: yusing Date: Wed, 1 Jan 2025 06:09:35 +0800 Subject: [PATCH] access logger improvements --- internal/net/http/accesslog/access_logger.go | 143 +++++++++++------- .../net/http/accesslog/access_logger_test.go | 35 +++-- 2 files changed, 108 insertions(+), 70 deletions(-) diff --git a/internal/net/http/accesslog/access_logger.go b/internal/net/http/accesslog/access_logger.go index b16284e..5b95fe8 100644 --- a/internal/net/http/accesslog/access_logger.go +++ b/internal/net/http/accesslog/access_logger.go @@ -2,13 +2,11 @@ package accesslog import ( "bytes" - "fmt" "io" "net/http" - "os" + "sync" "time" - "github.com/yusing/go-proxy/internal/common" E "github.com/yusing/go-proxy/internal/error" "github.com/yusing/go-proxy/internal/logging" "github.com/yusing/go-proxy/internal/task" @@ -16,13 +14,27 @@ import ( type ( AccessLogger struct { - parent *task.Task - buf chan []byte - cfg *Config - w io.WriteCloser + task *task.Task + cfg *Config + io AccessLogIO + + buf bytes.Buffer + bufPool sync.Pool + flushThreshold int + flushMu sync.Mutex + Formatter } + AccessLogIO interface { + io.ReadWriteCloser + io.ReadWriteSeeker + io.ReaderAt + sync.Locker + Name() string // file name or path + Truncate(size int64) error + } + Formatter interface { // Format writes a log line to line without a trailing newline Format(line *bytes.Buffer, req *http.Request, res *http.Response) @@ -31,48 +43,35 @@ type ( var logger = logging.With().Str("module", "accesslog").Logger() -var TestTimeNow = time.Now().Format(logTimeFormat) - -const logTimeFormat = "02/Jan/2006:15:04:05 -0700" - -func NewFileAccessLogger(parent *task.Task, cfg *Config) (*AccessLogger, error) { - f, err := os.OpenFile(cfg.Path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) - if err != nil { - return nil, fmt.Errorf("access log open error: %w", err) - } - return NewAccessLogger(parent, f, cfg), nil -} - -func NewAccessLogger(parent *task.Task, w io.WriteCloser, cfg *Config) *AccessLogger { +func NewAccessLogger(parent task.Parent, io AccessLogIO, cfg *Config) *AccessLogger { l := &AccessLogger{ - parent: parent, - cfg: cfg, - w: w, + task: parent.Subtask("accesslog"), + cfg: cfg, + io: io, } - fmt := CommonFormatter{cfg: &l.cfg.Fields} + if cfg.BufferSize < 1024 { + cfg.BufferSize = DefaultBufferSize + } + + fmt := &CommonFormatter{cfg: &l.cfg.Fields, GetTimeNow: time.Now} switch l.cfg.Format { case FormatCommon: l.Formatter = fmt case FormatCombined: - l.Formatter = CombinedFormatter{CommonFormatter: fmt} + l.Formatter = (*CombinedFormatter)(fmt) case FormatJSON: - l.Formatter = JSONFormatter{CommonFormatter: fmt} + l.Formatter = (*JSONFormatter)(fmt) } - if cfg.BufferSize == 0 { - cfg.BufferSize = DefaultBufferSize + + l.flushThreshold = int(cfg.BufferSize * 4 / 5) // 80% + l.buf.Grow(int(cfg.BufferSize)) + l.bufPool.New = func() any { + return new(bytes.Buffer) } - l.buf = make(chan []byte, cfg.BufferSize) go l.start() return l } -func timeNow() string { - if !common.IsTest { - return time.Now().Format(logTimeFormat) - } - return TestTimeNow -} - func (l *AccessLogger) checkKeep(req *http.Request, res *http.Response) bool { if !l.cfg.Filters.StatusCodes.CheckKeep(req, res) || !l.cfg.Filters.Method.CheckKeep(req, res) || @@ -88,25 +87,42 @@ func (l *AccessLogger) Log(req *http.Request, res *http.Response) { return } - var line bytes.Buffer - l.Format(&line, req, res) + line := l.bufPool.Get().(*bytes.Buffer) + l.Format(line, req, res) line.WriteRune('\n') - select { - case <-l.parent.Context().Done(): - return - default: - l.buf <- line.Bytes() - } + l.flushMu.Lock() + l.buf.Write(line.Bytes()) + line.Reset() + l.bufPool.Put(line) + l.flushMu.Unlock() } func (l *AccessLogger) LogError(req *http.Request, err error) { l.Log(req, &http.Response{StatusCode: http.StatusInternalServerError, Status: err.Error()}) } -func (l *AccessLogger) close() { - close(l.buf) - l.w.Close() +func (l *AccessLogger) Config() *Config { + return l.cfg +} + +// func (l *AccessLogger) Rotate() error { +// if l.cfg.Retention == nil { +// return nil +// } +// l.io.Lock() +// defer l.io.Unlock() + +// return l.cfg.Retention.rotateLogFile(l.io) +// } + +func (l *AccessLogger) Flush(force bool) { + l.flushMu.Lock() + if force || l.buf.Len() >= l.flushThreshold { + l.writeLine(l.buf.Bytes()) + l.buf.Reset() + } + l.flushMu.Unlock() } func (l *AccessLogger) handleErr(err error) { @@ -114,21 +130,34 @@ func (l *AccessLogger) handleErr(err error) { } func (l *AccessLogger) start() { - task := l.parent.Subtask("access log flusher") - defer task.Finish("done") - defer l.close() + defer func() { + if l.buf.Len() > 0 { // flush last + l.writeLine(l.buf.Bytes()) + } + l.io.Close() + l.task.Finish(nil) + }() + + // periodic + threshold flush + flushTicker := time.NewTicker(5 * time.Second) for { select { - case <-task.Context().Done(): + case <-l.task.Context().Done(): return + case <-flushTicker.C: + l.Flush(true) default: - for line := range l.buf { - _, err := l.w.Write(line) - if err != nil { - l.handleErr(err) - } - } + l.Flush(false) } } } + +func (l *AccessLogger) writeLine(line []byte) { + l.io.Lock() // prevent write on log rotation + _, err := l.io.Write(line) + l.io.Unlock() + if err != nil { + l.handleErr(err) + } +} diff --git a/internal/net/http/accesslog/access_logger_test.go b/internal/net/http/accesslog/access_logger_test.go index 6818ebe..382dc7e 100644 --- a/internal/net/http/accesslog/access_logger_test.go +++ b/internal/net/http/accesslog/access_logger_test.go @@ -7,10 +7,10 @@ import ( "net/http" "net/url" "testing" + "time" E "github.com/yusing/go-proxy/internal/error" . "github.com/yusing/go-proxy/internal/net/http/accesslog" - taskPkg "github.com/yusing/go-proxy/internal/task" . "github.com/yusing/go-proxy/internal/utils/testing" ) @@ -51,19 +51,25 @@ var ( } ) -func fmtLog(cfg *Config) string { - var line bytes.Buffer - logger := NewAccessLogger(taskPkg.GlobalTask("test logger"), nil, cfg) - logger.Format(&line, req, resp) - return line.String() +func fmtLog(cfg *Config) (ts string, line string) { + var buf bytes.Buffer + + t := time.Now() + logger := NewAccessLogger(nil, nil, cfg) + logger.Formatter.(*CommonFormatter).GetTimeNow = func() time.Time { + return t + } + logger.Format(&buf, req, resp) + return t.Format(LogTimeFormat), buf.String() } func TestAccessLoggerCommon(t *testing.T) { config := DefaultConfig() config.Format = FormatCommon - ExpectEqual(t, fmtLog(config), + ts, log := fmtLog(config) + ExpectEqual(t, log, fmt.Sprintf("%s %s - - [%s] \"%s %s %s\" %d %d", - host, remote, TestTimeNow, method, uri, proto, status, contentLength, + host, remote, ts, method, uri, proto, status, contentLength, ), ) } @@ -71,9 +77,10 @@ func TestAccessLoggerCommon(t *testing.T) { func TestAccessLoggerCombined(t *testing.T) { config := DefaultConfig() config.Format = FormatCombined - ExpectEqual(t, fmtLog(config), + ts, log := fmtLog(config) + ExpectEqual(t, log, fmt.Sprintf("%s %s - - [%s] \"%s %s %s\" %d %d \"%s\" \"%s\"", - host, remote, TestTimeNow, method, uri, proto, status, contentLength, referer, ua, + host, remote, ts, method, uri, proto, status, contentLength, referer, ua, ), ) } @@ -82,9 +89,10 @@ func TestAccessLoggerRedactQuery(t *testing.T) { config := DefaultConfig() config.Format = FormatCommon config.Fields.Query.Default = FieldModeRedact - ExpectEqual(t, fmtLog(config), + ts, log := fmtLog(config) + ExpectEqual(t, log, fmt.Sprintf("%s %s - - [%s] \"%s %s %s\" %d %d", - host, remote, TestTimeNow, method, uriRedacted, proto, status, contentLength, + host, remote, ts, method, uriRedacted, proto, status, contentLength, ), ) } @@ -93,7 +101,8 @@ func getJSONEntry(t *testing.T, config *Config) JSONLogEntry { t.Helper() config.Format = FormatJSON var entry JSONLogEntry - err := json.Unmarshal([]byte(fmtLog(config)), &entry) + _, log := fmtLog(config) + err := json.Unmarshal([]byte(log), &entry) ExpectNoError(t, err) return entry }