mirror of
https://github.com/yusing/godoxy.git
synced 2025-05-20 12:42:34 +02:00
simplify access logger with bufio.Writer
This commit is contained in:
parent
72306e91a2
commit
ee27237083
3 changed files with 49 additions and 55 deletions
|
@ -1,6 +1,7 @@
|
||||||
package accesslog
|
package accesslog
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bufio"
|
||||||
"bytes"
|
"bytes"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -17,13 +18,9 @@ type (
|
||||||
task *task.Task
|
task *task.Task
|
||||||
cfg *Config
|
cfg *Config
|
||||||
io AccessLogIO
|
io AccessLogIO
|
||||||
|
buffered *bufio.Writer
|
||||||
|
|
||||||
buf bytes.Buffer // buffer for non-flushed log
|
lineBufPool sync.Pool // buffer pool for formatting a single log line
|
||||||
bufMu sync.RWMutex
|
|
||||||
bufPool sync.Pool // buffer pool for formatting a single log line
|
|
||||||
|
|
||||||
flushThreshold int
|
|
||||||
|
|
||||||
Formatter
|
Formatter
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -44,13 +41,17 @@ type (
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewAccessLogger(parent task.Parent, io AccessLogIO, cfg *Config) *AccessLogger {
|
func NewAccessLogger(parent task.Parent, io AccessLogIO, cfg *Config) *AccessLogger {
|
||||||
|
if cfg.BufferSize == 0 {
|
||||||
|
cfg.BufferSize = DefaultBufferSize
|
||||||
|
}
|
||||||
|
if cfg.BufferSize < 4096 {
|
||||||
|
cfg.BufferSize = 4096
|
||||||
|
}
|
||||||
l := &AccessLogger{
|
l := &AccessLogger{
|
||||||
task: parent.Subtask("accesslog"),
|
task: parent.Subtask("accesslog"),
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
io: io,
|
io: io,
|
||||||
}
|
buffered: bufio.NewWriterSize(io, cfg.BufferSize),
|
||||||
if cfg.BufferSize < 1024 {
|
|
||||||
cfg.BufferSize = DefaultBufferSize
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt := CommonFormatter{cfg: &l.cfg.Fields, GetTimeNow: time.Now}
|
fmt := CommonFormatter{cfg: &l.cfg.Fields, GetTimeNow: time.Now}
|
||||||
|
@ -65,10 +66,8 @@ func NewAccessLogger(parent task.Parent, io AccessLogIO, cfg *Config) *AccessLog
|
||||||
panic("invalid access log format")
|
panic("invalid access log format")
|
||||||
}
|
}
|
||||||
|
|
||||||
l.flushThreshold = int(cfg.BufferSize * 4 / 5) // 80%
|
l.lineBufPool.New = func() any {
|
||||||
l.buf.Grow(int(cfg.BufferSize))
|
return bytes.NewBuffer(make([]byte, 0, 1024))
|
||||||
l.bufPool.New = func() any {
|
|
||||||
return new(bytes.Buffer)
|
|
||||||
}
|
}
|
||||||
go l.start()
|
go l.start()
|
||||||
return l
|
return l
|
||||||
|
@ -89,15 +88,12 @@ func (l *AccessLogger) Log(req *http.Request, res *http.Response) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
line := l.bufPool.Get().(*bytes.Buffer)
|
line := l.lineBufPool.Get().(*bytes.Buffer)
|
||||||
l.Format(line, req, res)
|
|
||||||
line.WriteRune('\n')
|
|
||||||
|
|
||||||
l.bufMu.Lock()
|
|
||||||
l.buf.Write(line.Bytes())
|
|
||||||
line.Reset()
|
line.Reset()
|
||||||
l.bufPool.Put(line)
|
defer l.lineBufPool.Put(line)
|
||||||
l.bufMu.Unlock()
|
l.Formatter.Format(line, req, res)
|
||||||
|
line.WriteRune('\n')
|
||||||
|
l.write(line.Bytes())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *AccessLogger) LogError(req *http.Request, err error) {
|
func (l *AccessLogger) LogError(req *http.Request, err error) {
|
||||||
|
@ -118,52 +114,50 @@ func (l *AccessLogger) Rotate() error {
|
||||||
return l.cfg.Retention.rotateLogFile(l.io)
|
return l.cfg.Retention.rotateLogFile(l.io)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *AccessLogger) Flush(force bool) {
|
|
||||||
if l.buf.Len() == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if force || l.buf.Len() >= l.flushThreshold {
|
|
||||||
l.bufMu.RLock()
|
|
||||||
l.write(l.buf.Bytes())
|
|
||||||
l.buf.Reset()
|
|
||||||
l.bufMu.RUnlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *AccessLogger) handleErr(err error) {
|
func (l *AccessLogger) handleErr(err error) {
|
||||||
gperr.LogError("failed to write access log", err)
|
gperr.LogError("failed to write access log", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *AccessLogger) start() {
|
func (l *AccessLogger) start() {
|
||||||
defer func() {
|
defer func() {
|
||||||
if l.buf.Len() > 0 { // flush last
|
if err := l.Flush(); err != nil {
|
||||||
l.write(l.buf.Bytes())
|
l.handleErr(err)
|
||||||
}
|
}
|
||||||
l.io.Close()
|
l.close()
|
||||||
l.task.Finish(nil)
|
l.task.Finish(nil)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// periodic flush + threshold flush
|
// flushes the buffer every 30 seconds
|
||||||
periodic := time.NewTicker(5 * time.Second)
|
flushTicker := time.NewTicker(30 * time.Second)
|
||||||
threshold := time.NewTicker(time.Second)
|
defer flushTicker.Stop()
|
||||||
defer periodic.Stop()
|
|
||||||
defer threshold.Stop()
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-l.task.Context().Done():
|
case <-l.task.Context().Done():
|
||||||
return
|
return
|
||||||
case <-periodic.C:
|
case <-flushTicker.C:
|
||||||
l.Flush(true)
|
if err := l.Flush(); err != nil {
|
||||||
case <-threshold.C:
|
l.handleErr(err)
|
||||||
l.Flush(false)
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l *AccessLogger) Flush() error {
|
||||||
|
l.io.Lock()
|
||||||
|
defer l.io.Unlock()
|
||||||
|
return l.buffered.Flush()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *AccessLogger) close() {
|
||||||
|
l.io.Lock()
|
||||||
|
defer l.io.Unlock()
|
||||||
|
l.io.Close()
|
||||||
|
}
|
||||||
|
|
||||||
func (l *AccessLogger) write(data []byte) {
|
func (l *AccessLogger) write(data []byte) {
|
||||||
l.io.Lock() // prevent concurrent write, i.e. log rotation, other access loggers
|
l.io.Lock() // prevent concurrent write, i.e. log rotation, other access loggers
|
||||||
_, err := l.io.Write(data)
|
_, err := l.buffered.Write(data)
|
||||||
l.io.Unlock()
|
l.io.Unlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
l.handleErr(err)
|
l.handleErr(err)
|
||||||
|
|
|
@ -17,7 +17,7 @@ type (
|
||||||
Cookies FieldConfig `json:"cookies"`
|
Cookies FieldConfig `json:"cookies"`
|
||||||
}
|
}
|
||||||
Config struct {
|
Config struct {
|
||||||
BufferSize uint `json:"buffer_size" validate:"gte=1"`
|
BufferSize int `json:"buffer_size"`
|
||||||
Format Format `json:"format" validate:"oneof=common combined json"`
|
Format Format `json:"format" validate:"oneof=common combined json"`
|
||||||
Path string `json:"path" validate:"required"`
|
Path string `json:"path" validate:"required"`
|
||||||
Filters Filters `json:"filters"`
|
Filters Filters `json:"filters"`
|
||||||
|
|
|
@ -71,14 +71,14 @@ func TestConcurrentAccessLoggerLogAndFlush(t *testing.T) {
|
||||||
go func(l *AccessLogger) {
|
go func(l *AccessLogger) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
parallelLog(l, req, resp, logCountPerLogger)
|
parallelLog(l, req, resp, logCountPerLogger)
|
||||||
l.Flush(true)
|
l.Flush()
|
||||||
}(logger)
|
}(logger)
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
expected := loggerCount * logCountPerLogger
|
expected := loggerCount * logCountPerLogger
|
||||||
actual := file.Count()
|
actual := file.LineCount()
|
||||||
ExpectEqual(t, actual, expected)
|
ExpectEqual(t, actual, expected)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue