GoDoxy/internal/watcher/events/event_queue.go
2025-05-10 10:46:31 +08:00

106 lines
2.7 KiB
Go

package events
import (
"runtime/debug"
"time"
"github.com/yusing/go-proxy/internal/common"
"github.com/yusing/go-proxy/internal/gperr"
"github.com/yusing/go-proxy/internal/task"
)
type (
EventQueue struct {
task *task.Task
queue []Event
ticker *time.Ticker
flushInterval time.Duration
onFlush OnFlushFunc
onError OnErrorFunc
}
OnFlushFunc = func(events []Event)
OnErrorFunc = func(err gperr.Error)
)
const eventQueueCapacity = 10
// NewEventQueue returns a new EventQueue with the given
// queueTask, flushInterval, onFlush and onError.
//
// The returned EventQueue will start a goroutine to flush events in the queue
// when the flushInterval is reached.
//
// The onFlush function is called when the flushInterval is reached and the queue is not empty,
//
// The onError function is called when an error received from the errCh,
// or panic occurs in the onFlush function. Panic will cause a E.ErrPanicRecv error.
//
// flushTask.Finish must be called after the flush is done,
// but the onFlush function can return earlier (e.g. run in another goroutine).
//
// If task is canceled before the flushInterval is reached, the events in queue will be discarded.
func NewEventQueue(queueTask *task.Task, flushInterval time.Duration, onFlush OnFlushFunc, onError OnErrorFunc) *EventQueue {
return &EventQueue{
task: queueTask,
queue: make([]Event, 0, eventQueueCapacity),
ticker: time.NewTicker(flushInterval),
flushInterval: flushInterval,
onFlush: onFlush,
onError: onError,
}
}
func (e *EventQueue) Start(eventCh <-chan Event, errCh <-chan gperr.Error) {
origOnFlush := e.onFlush
// recover panic in onFlush when in production mode
e.onFlush = func(events []Event) {
defer func() {
if errV := recover(); errV != nil {
if err, ok := errV.(error); ok {
e.onError(gperr.Wrap(err).Subject(e.task.Name()))
} else {
e.onError(gperr.New("recovered panic in onFlush").Withf("%v", errV).Subject(e.task.Name()))
}
if common.IsDebug {
panic(string(debug.Stack()))
}
}
}()
origOnFlush(events)
}
go func() {
defer e.ticker.Stop()
defer e.task.Finish(nil)
for {
select {
case <-e.task.Context().Done():
return
case <-e.ticker.C:
if len(e.queue) > 0 {
// clone -> clear -> flush
queue := make([]Event, len(e.queue))
copy(queue, e.queue)
e.queue = e.queue[:0]
e.onFlush(queue)
}
e.ticker.Reset(e.flushInterval)
case event, ok := <-eventCh:
if !ok {
return
}
e.queue = append(e.queue, event)
case err, ok := <-errCh:
if !ok {
return
}
if err != nil {
e.onError(err)
}
}
}
}()
}