package events import ( "runtime/debug" "time" "github.com/yusing/go-proxy/internal/common" "github.com/yusing/go-proxy/internal/gperr" "github.com/yusing/go-proxy/internal/task" ) type ( EventQueue struct { task *task.Task queue []Event ticker *time.Ticker flushInterval time.Duration onFlush OnFlushFunc onError OnErrorFunc } OnFlushFunc = func(events []Event) OnErrorFunc = func(err gperr.Error) ) const eventQueueCapacity = 10 // NewEventQueue returns a new EventQueue with the given // queueTask, flushInterval, onFlush and onError. // // The returned EventQueue will start a goroutine to flush events in the queue // when the flushInterval is reached. // // The onFlush function is called when the flushInterval is reached and the queue is not empty, // // The onError function is called when an error received from the errCh, // or panic occurs in the onFlush function. Panic will cause a E.ErrPanicRecv error. // // flushTask.Finish must be called after the flush is done, // but the onFlush function can return earlier (e.g. run in another goroutine). // // If task is canceled before the flushInterval is reached, the events in queue will be discarded. func NewEventQueue(queueTask *task.Task, flushInterval time.Duration, onFlush OnFlushFunc, onError OnErrorFunc) *EventQueue { return &EventQueue{ task: queueTask, queue: make([]Event, 0, eventQueueCapacity), ticker: time.NewTicker(flushInterval), flushInterval: flushInterval, onFlush: onFlush, onError: onError, } } func (e *EventQueue) Start(eventCh <-chan Event, errCh <-chan gperr.Error) { origOnFlush := e.onFlush // recover panic in onFlush when in production mode e.onFlush = func(events []Event) { defer func() { if err := recover(); err != nil { if err, ok := err.(error); ok { e.onError(gperr.Wrap(err).Subject(e.task.Name())) } else { e.onError(gperr.New("recovered panic in onFlush").Withf("%v", err).Subject(e.task.Name())) } if common.IsDebug { panic(string(debug.Stack())) } } }() origOnFlush(events) } go func() { defer e.ticker.Stop() defer e.task.Finish(nil) for { select { case <-e.task.Context().Done(): return case <-e.ticker.C: if len(e.queue) > 0 { // clone -> clear -> flush queue := make([]Event, len(e.queue)) copy(queue, e.queue) e.queue = e.queue[:0] e.onFlush(queue) } e.ticker.Reset(e.flushInterval) case event, ok := <-eventCh: if !ok { return } e.queue = append(e.queue, event) case err, ok := <-errCh: if !ok { return } if err != nil { e.onError(err) } } } }() }