From 99a72451d9ac623e4da5597395e9ea2d869b94a3 Mon Sep 17 00:00:00 2001 From: yusing Date: Tue, 18 Feb 2025 05:39:53 +0800 Subject: [PATCH] fix access log rotation attempt --- .vscode/settings.example.json | 4 +- .../net/gphttp/accesslog/access_logger.go | 2 +- internal/net/gphttp/accesslog/back_scanner.go | 104 +++++++++++++ .../net/gphttp/accesslog/back_scanner_test.go | 127 +++++++++++++++ internal/net/gphttp/accesslog/mock_file.go | 7 +- internal/net/gphttp/accesslog/retention.go | 146 +----------------- .../net/gphttp/accesslog/retention_test.go | 48 ------ internal/net/gphttp/accesslog/rotate.go | 119 ++++++++++++++ internal/net/gphttp/accesslog/rotate_test.go | 86 +++++++++++ .../net/gphttp/loadbalancer/loadbalancer.go | 2 +- 10 files changed, 447 insertions(+), 198 deletions(-) create mode 100644 internal/net/gphttp/accesslog/back_scanner.go create mode 100644 internal/net/gphttp/accesslog/back_scanner_test.go create mode 100644 internal/net/gphttp/accesslog/rotate.go create mode 100644 internal/net/gphttp/accesslog/rotate_test.go diff --git a/.vscode/settings.example.json b/.vscode/settings.example.json index 2f28915..0410df0 100644 --- a/.vscode/settings.example.json +++ b/.vscode/settings.example.json @@ -1,10 +1,10 @@ { "yaml.schemas": { - "https://github.com/yusing/go-proxy/raw/v0.9/schemas/config.schema.json": [ + "https://github.com/yusing/go-proxy/raw/main/schemas/config.schema.json": [ "config.example.yml", "config.yml" ], - "https://github.com/yusing/go-proxy/raw/v0.9/schemas/routes.schema.json": [ + "https://github.com/yusing/go-proxy/raw/main/schemas/routes.schema.json": [ "providers.example.yml" ] } diff --git a/internal/net/gphttp/accesslog/access_logger.go b/internal/net/gphttp/accesslog/access_logger.go index 06a068b..9a6fc8a 100644 --- a/internal/net/gphttp/accesslog/access_logger.go +++ b/internal/net/gphttp/accesslog/access_logger.go @@ -111,7 +111,7 @@ func (l *AccessLogger) Rotate() error { l.io.Lock() defer l.io.Unlock() - return l.cfg.Retention.rotateLogFile(l.io) + return l.rotate() } func (l *AccessLogger) handleErr(err error) { diff --git a/internal/net/gphttp/accesslog/back_scanner.go b/internal/net/gphttp/accesslog/back_scanner.go new file mode 100644 index 0000000..2e55005 --- /dev/null +++ b/internal/net/gphttp/accesslog/back_scanner.go @@ -0,0 +1,104 @@ +package accesslog + +import ( + "bytes" + "io" +) + +// BackScanner provides an interface to read a file backward line by line. +type BackScanner struct { + file AccessLogIO + chunkSize int + offset int64 + buffer []byte + line []byte + err error + size int64 +} + +// NewBackScanner creates a new Scanner to read the file backward. +// chunkSize determines the size of each read chunk from the end of the file. +func NewBackScanner(file AccessLogIO, chunkSize int) *BackScanner { + size, err := file.Seek(0, io.SeekEnd) + if err != nil { + return &BackScanner{err: err} + } + return &BackScanner{ + file: file, + chunkSize: chunkSize, + offset: size, + size: size, + } +} + +// Scan advances the scanner to the previous line, which will then be available +// via the Bytes method. It returns false when there are no more lines. +func (s *BackScanner) Scan() bool { + if s.err != nil { + return false + } + + // Read chunks until a newline is found or the file is fully read + for { + // Check if there's a line in the buffer + if idx := bytes.LastIndexByte(s.buffer, '\n'); idx >= 0 { + s.line = s.buffer[idx+1:] + s.buffer = s.buffer[:idx] + if len(s.line) > 0 { + return true + } + continue + } + + for { + if s.offset <= 0 { + // No more data to read; check remaining buffer + if len(s.buffer) > 0 { + s.line = s.buffer + s.buffer = nil + return true + } + return false + } + + newOffset := max(0, s.offset-int64(s.chunkSize)) + chunkSize := s.offset - newOffset + chunk := make([]byte, chunkSize) + + n, err := s.file.ReadAt(chunk, newOffset) + if err != nil && err != io.EOF { + s.err = err + return false + } + + // Prepend the chunk to the buffer + s.buffer = append(chunk[:n], s.buffer...) + s.offset = newOffset + + // Check for newline in the updated buffer + if idx := bytes.LastIndexByte(s.buffer, '\n'); idx >= 0 { + s.line = s.buffer[idx+1:] + s.buffer = s.buffer[:idx] + if len(s.line) > 0 { + return true + } + break + } + } + } +} + +// Bytes returns the most recent line generated by a call to Scan. +func (s *BackScanner) Bytes() []byte { + return s.line +} + +// FileSize returns the size of the file. +func (s *BackScanner) FileSize() int64 { + return s.size +} + +// Err returns the first non-EOF error encountered by the scanner. +func (s *BackScanner) Err() error { + return s.err +} diff --git a/internal/net/gphttp/accesslog/back_scanner_test.go b/internal/net/gphttp/accesslog/back_scanner_test.go new file mode 100644 index 0000000..939b411 --- /dev/null +++ b/internal/net/gphttp/accesslog/back_scanner_test.go @@ -0,0 +1,127 @@ +package accesslog + +import ( + "fmt" + "strings" + "testing" +) + +func TestBackScanner(t *testing.T) { + tests := []struct { + name string + input string + expected []string + }{ + { + name: "empty file", + input: "", + expected: []string{}, + }, + { + name: "single line without newline", + input: "single line", + expected: []string{"single line"}, + }, + { + name: "single line with newline", + input: "single line\n", + expected: []string{"single line"}, + }, + { + name: "multiple lines", + input: "first\nsecond\nthird\n", + expected: []string{"third", "second", "first"}, + }, + { + name: "multiple lines without final newline", + input: "first\nsecond\nthird", + expected: []string{"third", "second", "first"}, + }, + { + name: "lines longer than chunk size", + input: "short\n" + strings.Repeat("a", 20) + "\nshort\n", + expected: []string{"short", strings.Repeat("a", 20), "short"}, + }, + { + name: "empty lines", + input: "first\n\n\nlast\n", + expected: []string{"last", "first"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Setup mock file + mockFile := &MockFile{} + _, err := mockFile.Write([]byte(tt.input)) + if err != nil { + t.Fatalf("failed to write to mock file: %v", err) + } + + // Create scanner with small chunk size to test chunking + scanner := NewBackScanner(mockFile, 10) + + // Collect all lines + var lines [][]byte + for scanner.Scan() { + lines = append(lines, scanner.Bytes()) + } + + // Check for scanning errors + if err := scanner.Err(); err != nil { + t.Errorf("scanner error: %v", err) + } + + // Compare results + if len(lines) != len(tt.expected) { + t.Errorf("got %d lines, want %d lines", len(lines), len(tt.expected)) + return + } + + for i, line := range lines { + if string(line) != tt.expected[i] { + t.Errorf("line %d: got %q, want %q", i, line, tt.expected[i]) + } + } + }) + } +} + +func TestBackScannerWithVaryingChunkSizes(t *testing.T) { + input := "first\nsecond\nthird\nfourth\nfifth\n" + expected := []string{"fifth", "fourth", "third", "second", "first"} + chunkSizes := []int{1, 2, 3, 5, 10, 20, 100} + + for _, chunkSize := range chunkSizes { + t.Run(fmt.Sprintf("chunk_size_%d", chunkSize), func(t *testing.T) { + mockFile := &MockFile{} + _, err := mockFile.Write([]byte(input)) + if err != nil { + t.Fatalf("failed to write to mock file: %v", err) + } + + scanner := NewBackScanner(mockFile, chunkSize) + + var lines [][]byte + for scanner.Scan() { + lines = append(lines, scanner.Bytes()) + } + + if err := scanner.Err(); err != nil { + t.Errorf("scanner error: %v", err) + } + + if len(lines) != len(expected) { + t.Errorf("got %d lines, want %d lines", len(lines), len(expected)) + return + } + + for i, line := range lines { + if string(line) != expected[i] { + t.Errorf("chunk size %d, line %d: got %q, want %q", + chunkSize, i, line, expected[i]) + } + } + }) + } +} diff --git a/internal/net/gphttp/accesslog/mock_file.go b/internal/net/gphttp/accesslog/mock_file.go index f960429..54c30c6 100644 --- a/internal/net/gphttp/accesslog/mock_file.go +++ b/internal/net/gphttp/accesslog/mock_file.go @@ -49,7 +49,6 @@ func (m *MockFile) ReadAt(p []byte, off int64) (n int, err error) { return 0, io.EOF } n = copy(p, m.data[off:]) - m.position += int64(n) return n, nil } @@ -63,7 +62,7 @@ func (m *MockFile) Truncate(size int64) error { return nil } -func (m *MockFile) Count() int { +func (m *MockFile) LineCount() int { m.Lock() defer m.Unlock() return bytes.Count(m.data[:m.position], []byte("\n")) @@ -72,3 +71,7 @@ func (m *MockFile) Count() int { func (m *MockFile) Len() int64 { return m.position } + +func (m *MockFile) Content() []byte { + return m.data[:m.position] +} diff --git a/internal/net/gphttp/accesslog/retention.go b/internal/net/gphttp/accesslog/retention.go index bf8b4c8..f0b5e2a 100644 --- a/internal/net/gphttp/accesslog/retention.go +++ b/internal/net/gphttp/accesslog/retention.go @@ -1,11 +1,7 @@ package accesslog import ( - "bufio" - "bytes" - "io" "strconv" - "time" "github.com/yusing/go-proxy/internal/gperr" "github.com/yusing/go-proxy/internal/utils/strutils" @@ -16,13 +12,13 @@ type Retention struct { Last uint64 `json:"last"` } -const chunkSizeMax int64 = 128 * 1024 // 128KB - var ( ErrInvalidSyntax = gperr.New("invalid syntax") ErrZeroValue = gperr.New("zero value") ) +var defaultChunkSize = 64 * 1024 // 64KB + // Syntax: // // days|weeks|months @@ -58,141 +54,3 @@ func (r *Retention) Parse(v string) (err error) { } return } - -func (r *Retention) rotateLogFile(file AccessLogIO) (err error) { - lastN := int(r.Last) - days := int(r.Days) - - // Seek to end to get file size - size, err := file.Seek(0, io.SeekEnd) - if err != nil { - return err - } - - // Initialize ring buffer for last N lines - lines := make([][]byte, 0, lastN|(days*1000)) - pos := size - unprocessed := 0 - - var chunk [chunkSizeMax]byte - var lastLine []byte - - var shouldStop func() bool - if days > 0 { - cutoff := time.Now().AddDate(0, 0, -days) - shouldStop = func() bool { - return len(lastLine) > 0 && !parseLogTime(lastLine).After(cutoff) - } - } else { - shouldStop = func() bool { - return len(lines) == lastN - } - } - - // Read backwards until we have enough lines or reach start of file - for pos > 0 { - if pos > chunkSizeMax { - pos -= chunkSizeMax - } else { - pos = 0 - } - - // Seek to the current chunk - if _, err = file.Seek(pos, io.SeekStart); err != nil { - return err - } - - var nRead int - // Read the chunk - if nRead, err = file.Read(chunk[unprocessed:]); err != nil { - return err - } - - // last unprocessed bytes + read bytes - curChunk := chunk[:unprocessed+nRead] - unprocessed = len(curChunk) - - // Split into lines - scanner := bufio.NewScanner(bytes.NewReader(curChunk)) - for !shouldStop() && scanner.Scan() { - lastLine = scanner.Bytes() - lines = append(lines, lastLine) - unprocessed -= len(lastLine) - } - if shouldStop() { - break - } - - // move unprocessed bytes to the beginning for next iteration - copy(chunk[:], curChunk[unprocessed:]) - } - - if days > 0 { - // truncate to the end of the log within last N days - return file.Truncate(pos) - } - - // write lines to buffer in reverse order - // since we read them backwards - var buf bytes.Buffer - for i := len(lines) - 1; i >= 0; i-- { - buf.Write(lines[i]) - buf.WriteRune('\n') - } - - return writeTruncate(file, &buf) -} - -func writeTruncate(file AccessLogIO, buf *bytes.Buffer) (err error) { - // Seek to beginning and truncate - if _, err := file.Seek(0, 0); err != nil { - return err - } - - buffered := bufio.NewWriter(file) - // Write buffer back to file - nWritten, err := buffered.Write(buf.Bytes()) - if err != nil { - return err - } - if err = buffered.Flush(); err != nil { - return err - } - - // Truncate file - if err = file.Truncate(int64(nWritten)); err != nil { - return err - } - - // check bytes written == buffer size - if nWritten != buf.Len() { - return io.ErrShortWrite - } - return -} - -func parseLogTime(line []byte) (t time.Time) { - if len(line) == 0 { - return - } - - var start, end int - const jsonStart = len(`{"time":"`) - const jsonEnd = jsonStart + len(LogTimeFormat) - - if len(line) == '{' { // possibly json log - start = jsonStart - end = jsonEnd - } else { // possibly common or combined format - // Format: - - [02/Jan/2006:15:04:05 -0700] ... - start = bytes.IndexRune(line, '[') - end = bytes.IndexRune(line[start+1:], ']') - if start == -1 || end == -1 || start >= end { - return - } - } - - timeStr := line[start+1 : end] - t, _ = time.Parse(LogTimeFormat, string(timeStr)) // ignore error - return -} diff --git a/internal/net/gphttp/accesslog/retention_test.go b/internal/net/gphttp/accesslog/retention_test.go index ccb3b5d..1efd18d 100644 --- a/internal/net/gphttp/accesslog/retention_test.go +++ b/internal/net/gphttp/accesslog/retention_test.go @@ -2,11 +2,8 @@ package accesslog_test import ( "testing" - "time" . "github.com/yusing/go-proxy/internal/net/gphttp/accesslog" - "github.com/yusing/go-proxy/internal/task" - "github.com/yusing/go-proxy/internal/utils/strutils" . "github.com/yusing/go-proxy/internal/utils/testing" ) @@ -34,48 +31,3 @@ func TestParseRetention(t *testing.T) { }) } } - -func TestRetentionCommonFormat(t *testing.T) { - var file MockFile - logger := NewAccessLogger(task.RootTask("test", false), &file, &Config{ - Format: FormatCommon, - BufferSize: 1024, - }) - for range 10 { - logger.Log(req, resp) - } - logger.Flush(true) - // test.Finish(nil) - - ExpectEqual(t, logger.Config().Retention, nil) - ExpectTrue(t, file.Len() > 0) - ExpectEqual(t, file.Count(), 10) - - t.Run("keep last", func(t *testing.T) { - logger.Config().Retention = strutils.MustParse[*Retention]("last 5") - ExpectEqual(t, logger.Config().Retention.Days, 0) - ExpectEqual(t, logger.Config().Retention.Last, 5) - ExpectNoError(t, logger.Rotate()) - ExpectEqual(t, file.Count(), 5) - }) - - _ = file.Truncate(0) - - timeNow := time.Now() - for i := range 10 { - logger.Formatter.(*CommonFormatter).GetTimeNow = func() time.Time { - return timeNow.AddDate(0, 0, -i) - } - logger.Log(req, resp) - } - logger.Flush(true) - - // FIXME: keep days does not work - t.Run("keep days", func(t *testing.T) { - logger.Config().Retention = strutils.MustParse[*Retention]("3 days") - ExpectEqual(t, logger.Config().Retention.Days, 3) - ExpectEqual(t, logger.Config().Retention.Last, 0) - ExpectNoError(t, logger.Rotate()) - ExpectEqual(t, file.Count(), 3) - }) -} diff --git a/internal/net/gphttp/accesslog/rotate.go b/internal/net/gphttp/accesslog/rotate.go new file mode 100644 index 0000000..e93c22d --- /dev/null +++ b/internal/net/gphttp/accesslog/rotate.go @@ -0,0 +1,119 @@ +package accesslog + +import ( + "bytes" + "io" + "time" +) + +func (l *AccessLogger) rotate() (err error) { + // Get retention configuration + config := l.Config().Retention + var shouldKeep func(t time.Time, lineCount int) bool + + if config.Last > 0 { + shouldKeep = func(_ time.Time, lineCount int) bool { + return lineCount < int(config.Last) + } + } else if config.Days > 0 { + cutoff := time.Now().AddDate(0, 0, -int(config.Days)) + shouldKeep = func(t time.Time, _ int) bool { + return !t.IsZero() && !t.Before(cutoff) + } + } else { + return nil // No retention policy set + } + + s := NewBackScanner(l.io, defaultChunkSize) + nRead := 0 + nLines := 0 + for s.Scan() { + nRead += len(s.Bytes()) + 1 + nLines++ + t := ParseLogTime(s.Bytes()) + if !shouldKeep(t, nLines) { + break + } + } + if s.Err() != nil { + return s.Err() + } + + beg := int64(nRead) + if _, err := l.io.Seek(-beg, io.SeekEnd); err != nil { + return err + } + buf := make([]byte, nRead) + if _, err := l.io.Read(buf); err != nil { + return err + } + + if err := l.writeTruncate(buf); err != nil { + return err + } + return nil +} + +func (l *AccessLogger) writeTruncate(buf []byte) (err error) { + // Seek to beginning and truncate + if _, err := l.io.Seek(0, 0); err != nil { + return err + } + + // Write buffer back to file + nWritten, err := l.buffered.Write(buf) + if err != nil { + return err + } + if err = l.buffered.Flush(); err != nil { + return err + } + + // Truncate file + if err = l.io.Truncate(int64(nWritten)); err != nil { + return err + } + + // check bytes written == buffer size + if nWritten != len(buf) { + return io.ErrShortWrite + } + return +} + +const timeLen = len(`"time":"`) + +var timeJSON = []byte(`"time":"`) + +func ParseLogTime(line []byte) (t time.Time) { + if len(line) == 0 { + return + } + + if i := bytes.Index(line, timeJSON); i != -1 { // JSON format + var jsonStart = i + timeLen + var jsonEnd = i + timeLen + len(LogTimeFormat) + if len(line) < jsonEnd { + return + } + timeStr := line[jsonStart:jsonEnd] + t, _ = time.Parse(LogTimeFormat, string(timeStr)) + return + } + + // Common/Combined format + // Format: - - [02/Jan/2006:15:04:05 -0700] ... + start := bytes.IndexByte(line, '[') + if start == -1 { + return + } + end := bytes.IndexByte(line[start:], ']') + if end == -1 { + return + } + end += start // adjust end position relative to full line + + timeStr := line[start+1 : end] + t, _ = time.Parse(LogTimeFormat, string(timeStr)) // ignore error + return +} diff --git a/internal/net/gphttp/accesslog/rotate_test.go b/internal/net/gphttp/accesslog/rotate_test.go new file mode 100644 index 0000000..8b81792 --- /dev/null +++ b/internal/net/gphttp/accesslog/rotate_test.go @@ -0,0 +1,86 @@ +package accesslog_test + +import ( + "fmt" + "testing" + "time" + + . "github.com/yusing/go-proxy/internal/net/gphttp/accesslog" + "github.com/yusing/go-proxy/internal/task" + "github.com/yusing/go-proxy/internal/utils/strutils" + . "github.com/yusing/go-proxy/internal/utils/testing" +) + +func TestParseLogTime(t *testing.T) { + tests := []string{ + `{"foo":"bar","time":"%s","bar":"baz"}`, + `example.com 192.168.1.1 - - [%s] "GET / HTTP/1.1" 200 1234`, + } + testTime := time.Date(2024, 1, 2, 3, 4, 5, 0, time.UTC) + testTimeStr := testTime.Format(LogTimeFormat) + + for i, test := range tests { + tests[i] = fmt.Sprintf(test, testTimeStr) + } + + for _, test := range tests { + t.Run(test, func(t *testing.T) { + actual := ParseLogTime([]byte(test)) + ExpectTrue(t, actual.Equal(testTime)) + }) + } +} + +func TestRetentionCommonFormat(t *testing.T) { + var file MockFile + logger := NewAccessLogger(task.RootTask("test", false), &file, &Config{ + Format: FormatCommon, + BufferSize: 1024, + }) + for range 10 { + logger.Log(req, resp) + } + logger.Flush() + // test.Finish(nil) + + ExpectEqual(t, logger.Config().Retention, nil) + ExpectTrue(t, file.Len() > 0) + ExpectEqual(t, file.LineCount(), 10) + + t.Run("keep last", func(t *testing.T) { + logger.Config().Retention = strutils.MustParse[*Retention]("last 5") + ExpectEqual(t, logger.Config().Retention.Days, 0) + ExpectEqual(t, logger.Config().Retention.Last, 5) + ExpectNoError(t, logger.Rotate()) + ExpectEqual(t, file.LineCount(), 5) + }) + + _ = file.Truncate(0) + + timeNow := time.Now() + for i := range 10 { + logger.Formatter.(*CommonFormatter).GetTimeNow = func() time.Time { + return timeNow.AddDate(0, 0, -10+i) + } + logger.Log(req, resp) + } + logger.Flush() + ExpectEqual(t, file.LineCount(), 10) + + t.Run("keep days", func(t *testing.T) { + logger.Config().Retention = strutils.MustParse[*Retention]("3 days") + ExpectEqual(t, logger.Config().Retention.Days, 3) + ExpectEqual(t, logger.Config().Retention.Last, 0) + ExpectNoError(t, logger.Rotate()) + ExpectEqual(t, file.LineCount(), 3) + rotated := string(file.Content()) + _ = file.Truncate(0) + for i := range 3 { + logger.Formatter.(*CommonFormatter).GetTimeNow = func() time.Time { + return timeNow.AddDate(0, 0, -3+i) + } + logger.Log(req, resp) + } + ExpectEqual(t, rotated, string(file.Content())) + }) +} diff --git a/internal/net/gphttp/loadbalancer/loadbalancer.go b/internal/net/gphttp/loadbalancer/loadbalancer.go index 29603ea..3d474cf 100644 --- a/internal/net/gphttp/loadbalancer/loadbalancer.go +++ b/internal/net/gphttp/loadbalancer/loadbalancer.go @@ -125,7 +125,7 @@ func (lb *LoadBalancer) AddServer(srv Server) { lb.poolMu.Lock() defer lb.poolMu.Unlock() - if lb.pool.Has(srv.Key()) { + if lb.pool.Has(srv.Key()) { // FIXME: this should be a warning old, _ := lb.pool.Load(srv.Key()) lb.sumWeight -= old.Weight() lb.impl.OnRemoveServer(old)