mirror of
https://github.com/yusing/godoxy.git
synced 2025-06-01 01:22:34 +02:00
tweak: consolidate bytes pool management and enhance CopyClose functionality for improved performance
Some checks failed
Docker Image CI (socket-proxy) / build (push) Has been cancelled
Some checks failed
Docker Image CI (socket-proxy) / build (push) Has been cancelled
This commit is contained in:
parent
b163771956
commit
8469b6406c
8 changed files with 86 additions and 37 deletions
|
@ -3,18 +3,43 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"log"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
_ "net/http/pprof"
|
_ "net/http/pprof"
|
||||||
"runtime"
|
"runtime"
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/rs/zerolog/log"
|
||||||
|
"github.com/yusing/go-proxy/internal/utils/strutils"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const mb = 1024 * 1024
|
||||||
|
|
||||||
func initProfiling() {
|
func initProfiling() {
|
||||||
runtime.GOMAXPROCS(2)
|
debug.SetGCPercent(-1)
|
||||||
debug.SetMemoryLimit(100 * 1024 * 1024)
|
debug.SetMemoryLimit(50 * mb)
|
||||||
debug.SetMaxStack(15 * 1024 * 1024)
|
debug.SetMaxStack(4 * mb)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
log.Println(http.ListenAndServe(":7777", nil))
|
log.Info().Msgf("pprof server started at http://localhost:7777/debug/pprof/")
|
||||||
|
log.Error().Err(http.ListenAndServe(":7777", nil)).Msg("pprof server failed")
|
||||||
|
}()
|
||||||
|
go func() {
|
||||||
|
ticker := time.NewTicker(time.Second * 10)
|
||||||
|
defer ticker.Stop()
|
||||||
|
for range ticker.C {
|
||||||
|
var m runtime.MemStats
|
||||||
|
runtime.ReadMemStats(&m)
|
||||||
|
log.Info().Msgf("-----------------------------------------------------")
|
||||||
|
log.Info().Msgf("Timestamp: %s", time.Now().Format(time.RFC3339))
|
||||||
|
log.Info().Msgf(" Go Heap - In Use (Alloc/HeapAlloc): %s", strutils.FormatByteSize(m.Alloc))
|
||||||
|
log.Info().Msgf(" Go Heap - Reserved from OS (HeapSys): %s", strutils.FormatByteSize(m.HeapSys))
|
||||||
|
log.Info().Msgf(" Go Stacks - In Use (StackInuse): %s", strutils.FormatByteSize(m.StackInuse))
|
||||||
|
log.Info().Msgf(" Go Runtime - Other Sys (MSpanInuse, MCacheInuse, BuckHashSys, GCSys, OtherSys): %s", strutils.FormatByteSize(m.MSpanInuse+m.MCacheInuse+m.BuckHashSys+m.GCSys+m.OtherSys))
|
||||||
|
log.Info().Msgf(" Go Runtime - Total from OS (Sys): %s", strutils.FormatByteSize(m.Sys))
|
||||||
|
log.Info().Msgf(" Number of Goroutines: %d", runtime.NumGoroutine())
|
||||||
|
log.Info().Msgf(" Number of GCs: %d", m.NumGC)
|
||||||
|
log.Info().Msg("-----------------------------------------------------")
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,8 +33,6 @@ type (
|
||||||
writeCount int64
|
writeCount int64
|
||||||
bufSize int
|
bufSize int
|
||||||
|
|
||||||
lineBufPool *synk.BytesPool // buffer pool for formatting a single log line
|
|
||||||
|
|
||||||
errRateLimiter *rate.Limiter
|
errRateLimiter *rate.Limiter
|
||||||
|
|
||||||
logger zerolog.Logger
|
logger zerolog.Logger
|
||||||
|
@ -78,6 +76,8 @@ const (
|
||||||
errBurst = 5
|
errBurst = 5
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var lineBufPool = synk.NewBytesPool()
|
||||||
|
|
||||||
func NewAccessLogger(parent task.Parent, cfg AnyConfig) (*AccessLogger, error) {
|
func NewAccessLogger(parent task.Parent, cfg AnyConfig) (*AccessLogger, error) {
|
||||||
io, err := cfg.IO()
|
io, err := cfg.IO()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -121,7 +121,6 @@ func NewAccessLoggerWithIO(parent task.Parent, writer WriterWithName, anyCfg Any
|
||||||
rawWriter: writer,
|
rawWriter: writer,
|
||||||
writer: utils.NewBufferedWriter(writer, MinBufferSize),
|
writer: utils.NewBufferedWriter(writer, MinBufferSize),
|
||||||
bufSize: MinBufferSize,
|
bufSize: MinBufferSize,
|
||||||
lineBufPool: synk.NewBytesPool(),
|
|
||||||
errRateLimiter: rate.NewLimiter(rate.Every(errRateLimit), errBurst),
|
errRateLimiter: rate.NewLimiter(rate.Every(errRateLimit), errBurst),
|
||||||
logger: log.With().Str("file", writer.Name()).Logger(),
|
logger: log.With().Str("file", writer.Name()).Logger(),
|
||||||
}
|
}
|
||||||
|
@ -168,8 +167,8 @@ func (l *AccessLogger) Log(req *http.Request, res *http.Response) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
line := l.lineBufPool.Get()
|
line := lineBufPool.Get()
|
||||||
defer l.lineBufPool.Put(line)
|
defer lineBufPool.Put(line)
|
||||||
line = l.AppendRequestLog(line, req, res)
|
line = l.AppendRequestLog(line, req, res)
|
||||||
if line[len(line)-1] != '\n' {
|
if line[len(line)-1] != '\n' {
|
||||||
line = append(line, '\n')
|
line = append(line, '\n')
|
||||||
|
@ -182,8 +181,8 @@ func (l *AccessLogger) LogError(req *http.Request, err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *AccessLogger) LogACL(info *maxmind.IPInfo, blocked bool) {
|
func (l *AccessLogger) LogACL(info *maxmind.IPInfo, blocked bool) {
|
||||||
line := l.lineBufPool.Get()
|
line := lineBufPool.Get()
|
||||||
defer l.lineBufPool.Put(line)
|
defer lineBufPool.Put(line)
|
||||||
line = l.AppendACLLog(line, info, blocked)
|
line = l.AppendACLLog(line, info, blocked)
|
||||||
if line[len(line)-1] != '\n' {
|
if line[len(line)-1] != '\n' {
|
||||||
line = append(line, '\n')
|
line = append(line, '\n')
|
||||||
|
|
|
@ -411,7 +411,7 @@ func (p *ReverseProxy) handler(rw http.ResponseWriter, req *http.Request) {
|
||||||
|
|
||||||
rw.WriteHeader(res.StatusCode)
|
rw.WriteHeader(res.StatusCode)
|
||||||
|
|
||||||
err = U.CopyCloseWithContext(ctx, rw, res.Body) // close now, instead of defer, to populate res.Trailer
|
err = U.CopyCloseWithContext(ctx, rw, res.Body, int(res.ContentLength)) // close now, instead of defer, to populate res.Trailer
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !errors.Is(err, context.Canceled) {
|
if !errors.Is(err, context.Canceled) {
|
||||||
p.errorHandler(rw, req, err, false)
|
p.errorHandler(rw, req, err, false)
|
||||||
|
|
|
@ -72,7 +72,7 @@ func NewPipe(ctx context.Context, r io.ReadCloser, w io.WriteCloser) *Pipe {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Pipe) Start() (err error) {
|
func (p *Pipe) Start() (err error) {
|
||||||
err = CopyClose(&p.w, &p.r)
|
err = CopyClose(&p.w, &p.r, 0)
|
||||||
switch {
|
switch {
|
||||||
case
|
case
|
||||||
// NOTE: ignoring broken pipe and connection reset by peer
|
// NOTE: ignoring broken pipe and connection reset by peer
|
||||||
|
@ -117,7 +117,7 @@ func getHTTPFlusher(dst io.Writer) httpFlusher {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
const copyBufSize = 32 * 1024
|
const copyBufSize = synk.SizedPoolThreshold
|
||||||
|
|
||||||
var bytesPool = synk.NewBytesPool()
|
var bytesPool = synk.NewBytesPool()
|
||||||
|
|
||||||
|
@ -125,7 +125,7 @@ var bytesPool = synk.NewBytesPool()
|
||||||
// Use of this source code is governed by a BSD-style
|
// Use of this source code is governed by a BSD-style
|
||||||
// This is a copy of io.Copy with context and HTTP flusher handling
|
// This is a copy of io.Copy with context and HTTP flusher handling
|
||||||
// Author: yusing <yusing@6uo.me>.
|
// Author: yusing <yusing@6uo.me>.
|
||||||
func CopyClose(dst *ContextWriter, src *ContextReader) (err error) {
|
func CopyClose(dst *ContextWriter, src *ContextReader, sizeHint int) (err error) {
|
||||||
size := copyBufSize
|
size := copyBufSize
|
||||||
if l, ok := src.Reader.(*io.LimitedReader); ok {
|
if l, ok := src.Reader.(*io.LimitedReader); ok {
|
||||||
if int64(size) > l.N {
|
if int64(size) > l.N {
|
||||||
|
@ -135,6 +135,8 @@ func CopyClose(dst *ContextWriter, src *ContextReader) (err error) {
|
||||||
size = int(l.N)
|
size = int(l.N)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else if sizeHint > 0 {
|
||||||
|
size = min(size, sizeHint)
|
||||||
}
|
}
|
||||||
buf := bytesPool.GetSized(size)
|
buf := bytesPool.GetSized(size)
|
||||||
defer bytesPool.Put(buf)
|
defer bytesPool.Put(buf)
|
||||||
|
@ -196,6 +198,6 @@ func CopyClose(dst *ContextWriter, src *ContextReader) (err error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func CopyCloseWithContext(ctx context.Context, dst io.Writer, src io.Reader) (err error) {
|
func CopyCloseWithContext(ctx context.Context, dst io.Writer, src io.Reader, sizeHint int) (err error) {
|
||||||
return CopyClose(NewContextWriter(ctx, dst), NewContextReader(ctx, src))
|
return CopyClose(NewContextWriter(ctx, dst), NewContextReader(ctx, src), sizeHint)
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,8 +9,8 @@ type weakBuf = unsafe.Pointer
|
||||||
|
|
||||||
func makeWeak(b *[]byte) weakBuf {
|
func makeWeak(b *[]byte) weakBuf {
|
||||||
ptr := runtime_registerWeakPointer(unsafe.Pointer(b))
|
ptr := runtime_registerWeakPointer(unsafe.Pointer(b))
|
||||||
runtime.KeepAlive(ptr)
|
|
||||||
addCleanup(b, addGCed, cap(*b))
|
addCleanup(b, addGCed, cap(*b))
|
||||||
|
runtime.KeepAlive(ptr)
|
||||||
return weakBuf(ptr)
|
return weakBuf(ptr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -29,8 +29,9 @@ func runtime_registerWeakPointer(unsafe.Pointer) unsafe.Pointer
|
||||||
func runtime_makeStrongFromWeak(unsafe.Pointer) unsafe.Pointer
|
func runtime_makeStrongFromWeak(unsafe.Pointer) unsafe.Pointer
|
||||||
|
|
||||||
type BytesPool struct {
|
type BytesPool struct {
|
||||||
pool chan weakBuf
|
sizedPool chan weakBuf
|
||||||
initSize int
|
unsizedPool chan weakBuf
|
||||||
|
initSize int
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -41,16 +42,18 @@ const (
|
||||||
const (
|
const (
|
||||||
InPoolLimit = 32 * mb
|
InPoolLimit = 32 * mb
|
||||||
|
|
||||||
DefaultInitBytes = 4 * kb
|
UnsizedAvg = 4 * kb
|
||||||
PoolThreshold = 256 * kb
|
SizedPoolThreshold = 256 * kb
|
||||||
DropThresholdHigh = 4 * mb
|
DropThreshold = 4 * mb
|
||||||
|
|
||||||
PoolSize = InPoolLimit / PoolThreshold
|
SizedPoolSize = InPoolLimit * 8 / 10 / SizedPoolThreshold
|
||||||
|
UnsizedPoolSize = InPoolLimit * 2 / 10 / UnsizedAvg
|
||||||
)
|
)
|
||||||
|
|
||||||
var bytesPool = &BytesPool{
|
var bytesPool = &BytesPool{
|
||||||
pool: make(chan weakBuf, PoolSize),
|
sizedPool: make(chan weakBuf, SizedPoolSize),
|
||||||
initSize: DefaultInitBytes,
|
unsizedPool: make(chan weakBuf, UnsizedPoolSize),
|
||||||
|
initSize: UnsizedAvg,
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBytesPool() *BytesPool {
|
func NewBytesPool() *BytesPool {
|
||||||
|
@ -60,7 +63,7 @@ func NewBytesPool() *BytesPool {
|
||||||
func (p *BytesPool) Get() []byte {
|
func (p *BytesPool) Get() []byte {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case bWeak := <-p.pool:
|
case bWeak := <-p.unsizedPool:
|
||||||
bPtr := getBufFromWeak(bWeak)
|
bPtr := getBufFromWeak(bWeak)
|
||||||
if bPtr == nil {
|
if bPtr == nil {
|
||||||
continue
|
continue
|
||||||
|
@ -68,18 +71,19 @@ func (p *BytesPool) Get() []byte {
|
||||||
addReused(cap(bPtr))
|
addReused(cap(bPtr))
|
||||||
return bPtr
|
return bPtr
|
||||||
default:
|
default:
|
||||||
return make([]byte, 0, p.initSize)
|
return make([]byte, 0)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *BytesPool) GetSized(size int) []byte {
|
func (p *BytesPool) GetSized(size int) []byte {
|
||||||
if size <= PoolThreshold {
|
if size <= SizedPoolThreshold {
|
||||||
|
addNonPooled(size)
|
||||||
return make([]byte, size)
|
return make([]byte, size)
|
||||||
}
|
}
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case bWeak := <-p.pool:
|
case bWeak := <-p.sizedPool:
|
||||||
bPtr := getBufFromWeak(bWeak)
|
bPtr := getBufFromWeak(bWeak)
|
||||||
if bPtr == nil {
|
if bPtr == nil {
|
||||||
continue
|
continue
|
||||||
|
@ -90,25 +94,35 @@ func (p *BytesPool) GetSized(size int) []byte {
|
||||||
return (bPtr)[:size]
|
return (bPtr)[:size]
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
case p.pool <- bWeak:
|
case p.sizedPool <- bWeak:
|
||||||
default:
|
default:
|
||||||
// just drop it
|
// just drop it
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
addNonPooled(size)
|
||||||
return make([]byte, size)
|
return make([]byte, size)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *BytesPool) Put(b []byte) {
|
func (p *BytesPool) Put(b []byte) {
|
||||||
size := cap(b)
|
size := cap(b)
|
||||||
if size <= PoolThreshold || size > DropThresholdHigh {
|
if size > DropThreshold {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
b = b[:0]
|
b = b[:0]
|
||||||
w := makeWeak(&b)
|
w := makeWeak(&b)
|
||||||
|
if size <= SizedPoolThreshold {
|
||||||
|
p.put(w, p.unsizedPool)
|
||||||
|
} else {
|
||||||
|
p.put(w, p.sizedPool)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//go:inline
|
||||||
|
func (p *BytesPool) put(w weakBuf, pool chan weakBuf) {
|
||||||
select {
|
select {
|
||||||
case p.pool <- w:
|
case pool <- w:
|
||||||
default:
|
default:
|
||||||
// just drop it
|
// just drop it
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,10 +14,16 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
numReused, sizeReused uint64
|
numNonPooled, sizeNonPooled uint64
|
||||||
numGCed, sizeGCed uint64
|
numReused, sizeReused uint64
|
||||||
|
numGCed, sizeGCed uint64
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func addNonPooled(size int) {
|
||||||
|
atomic.AddUint64(&numNonPooled, 1)
|
||||||
|
atomic.AddUint64(&sizeNonPooled, uint64(size))
|
||||||
|
}
|
||||||
|
|
||||||
func addReused(size int) {
|
func addReused(size int) {
|
||||||
atomic.AddUint64(&numReused, 1)
|
atomic.AddUint64(&numReused, 1)
|
||||||
atomic.AddUint64(&sizeReused, uint64(size))
|
atomic.AddUint64(&sizeReused, uint64(size))
|
||||||
|
@ -48,6 +54,8 @@ func initPoolStats() {
|
||||||
Str("sizeReused", strutils.FormatByteSize(atomic.LoadUint64(&sizeReused))).
|
Str("sizeReused", strutils.FormatByteSize(atomic.LoadUint64(&sizeReused))).
|
||||||
Uint64("numGCed", atomic.LoadUint64(&numGCed)).
|
Uint64("numGCed", atomic.LoadUint64(&numGCed)).
|
||||||
Str("sizeGCed", strutils.FormatByteSize(atomic.LoadUint64(&sizeGCed))).
|
Str("sizeGCed", strutils.FormatByteSize(atomic.LoadUint64(&sizeGCed))).
|
||||||
|
Uint64("numNonPooled", atomic.LoadUint64(&numNonPooled)).
|
||||||
|
Str("sizeNonPooled", strutils.FormatByteSize(atomic.LoadUint64(&sizeNonPooled))).
|
||||||
Msg("bytes pool stats")
|
Msg("bytes pool stats")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
|
|
||||||
package synk
|
package synk
|
||||||
|
|
||||||
|
func addNonPooled(size int) {}
|
||||||
func addReused(size int) {}
|
func addReused(size int) {}
|
||||||
func addGCed(size int) {}
|
func addGCed(size int) {}
|
||||||
func initPoolStats() {}
|
func initPoolStats() {}
|
||||||
|
|
|
@ -218,7 +218,7 @@ func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
|
||||||
|
|
||||||
rw.WriteHeader(res.StatusCode)
|
rw.WriteHeader(res.StatusCode)
|
||||||
|
|
||||||
err = utils.CopyCloseWithContext(ctx, rw, res.Body)
|
err = utils.CopyCloseWithContext(ctx, rw, res.Body, int(res.ContentLength))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !errors.Is(err, context.Canceled) {
|
if !errors.Is(err, context.Canceled) {
|
||||||
p.getErrorHandler()(rw, req, err)
|
p.getErrorHandler()(rw, req, err)
|
||||||
|
|
Loading…
Add table
Reference in a new issue