diff --git a/batcher/batcher.go b/batcher/batcher.go index 544e48f..710d972 100644 --- a/batcher/batcher.go +++ b/batcher/batcher.go @@ -16,16 +16,17 @@ type Batcher struct { timeout time.Duration prefilter func(interface{}) error - lock sync.Mutex - submit chan *work - doWork func([]interface{}) error - done chan bool + lock sync.Mutex + submit chan *work + doWork func([]interface{}) error + batchCounter sync.WaitGroup + flushTimer *time.Timer } // New constructs a new batcher that will batch all calls to Run that occur within // `timeout` time before calling doWork just once for the entire batch. The doWork // function must be safe to run concurrently with itself as this may occur, especially -// when the timeout is small. +// when the doWork function is slow, or the timeout is small. func New(timeout time.Duration, doWork func([]interface{}) error) *Batcher { return &Batcher{ timeout: timeout, @@ -70,21 +71,23 @@ func (b *Batcher) submitWork(w *work) { b.lock.Lock() defer b.lock.Unlock() + // kick off a new batch if needed if b.submit == nil { - b.done = make(chan bool) + b.batchCounter.Add(1) b.submit = make(chan *work, 4) - go b.batch() + go b.batch(b.submit) + b.flushTimer = time.AfterFunc(b.timeout, b.flushCurrentBatch) } + // then add this work to the current batch b.submit <- w } -func (b *Batcher) batch() { +func (b *Batcher) batch(input <-chan *work) { + defer b.batchCounter.Done() + var params []interface{} var futures []chan error - input := b.submit - - go b.timer() for work := range input { params = append(params, work.param) @@ -97,30 +100,21 @@ func (b *Batcher) batch() { future <- ret close(future) } - close(b.done) } -func (b *Batcher) timer() { - time.Sleep(b.timeout) - - b.flush() -} - -// Shutdown flush the changes and wait to be saved +// Shutdown flushes and executes any pending batches. If wait is true, it also waits for the pending batches +// to finish executing before it returns. This can be used to avoid waiting for the timeout to expire when +// gracefully shutting down your application. Calling Run at any point after calling Shutdown will lead to +// undefined behaviour. func (b *Batcher) Shutdown(wait bool) { - b.flush() + b.flushCurrentBatch() if wait { - if b.done != nil { - // wait done channel - <-b.done - } + b.batchCounter.Wait() } } -// Flush saves the changes before the timer expires. -// It is useful to flush the changes when you shutdown your application -func (b *Batcher) flush() { +func (b *Batcher) flushCurrentBatch() { b.lock.Lock() defer b.lock.Unlock() @@ -128,6 +122,11 @@ func (b *Batcher) flush() { return } + // stop the timer to avoid spurious flushes and trigger immediate cleanup in case this flush was + // triggered manually by a call to Shutdown (it has to happen inside the lock, so it can't be done + // in the Shutdown method directly) + b.flushTimer.Stop() + close(b.submit) b.submit = nil }