Skip to content

Commit

Permalink
Refactor batcher to fix race issues
Browse files Browse the repository at this point in the history
- Replace done channel with a WaitGroup to fix a race condition and to
  properly wait for *all* active batches during shutdown
- Use time.AfterFunc() instead of manually spawning a goroutine and
  calling sleep()
- Improve a few method names and doc comments
- move some logic from batch() to submitWork() so it's always obvious
  what is or isn't guarded by the lock
  • Loading branch information
eapache committed Feb 19, 2024
1 parent 18a6173 commit 8652ab4
Showing 1 changed file with 26 additions and 27 deletions.
53 changes: 26 additions & 27 deletions batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@ type Batcher struct {
timeout time.Duration
prefilter func(interface{}) error

lock sync.Mutex
submit chan *work
doWork func([]interface{}) error
done chan bool
lock sync.Mutex
submit chan *work
doWork func([]interface{}) error
batchCounter sync.WaitGroup
flushTimer *time.Timer
}

// New constructs a new batcher that will batch all calls to Run that occur within
// `timeout` time before calling doWork just once for the entire batch. The doWork
// function must be safe to run concurrently with itself as this may occur, especially
// when the timeout is small.
// when the doWork function is slow, or the timeout is small.
func New(timeout time.Duration, doWork func([]interface{}) error) *Batcher {
return &Batcher{
timeout: timeout,
Expand Down Expand Up @@ -70,21 +71,23 @@ func (b *Batcher) submitWork(w *work) {
b.lock.Lock()
defer b.lock.Unlock()

// kick off a new batch if needed
if b.submit == nil {
b.done = make(chan bool)
b.batchCounter.Add(1)
b.submit = make(chan *work, 4)
go b.batch()
go b.batch(b.submit)
b.flushTimer = time.AfterFunc(b.timeout, b.flushCurrentBatch)
}

// then add this work to the current batch
b.submit <- w
}

func (b *Batcher) batch() {
func (b *Batcher) batch(input <-chan *work) {
defer b.batchCounter.Done()

var params []interface{}
var futures []chan error
input := b.submit

go b.timer()

for work := range input {
params = append(params, work.param)
Expand All @@ -97,37 +100,33 @@ func (b *Batcher) batch() {
future <- ret
close(future)
}
close(b.done)
}

func (b *Batcher) timer() {
time.Sleep(b.timeout)

b.flush()
}

// Shutdown flush the changes and wait to be saved
// Shutdown flushes and executes any pending batches. If wait is true, it also waits for the pending batches
// to finish executing before it returns. This can be used to avoid waiting for the timeout to expire when
// gracefully shutting down your application. Calling Run at any point after calling Shutdown will lead to
// undefined behaviour.
func (b *Batcher) Shutdown(wait bool) {
b.flush()
b.flushCurrentBatch()

if wait {
if b.done != nil {
// wait done channel
<-b.done
}
b.batchCounter.Wait()
}
}

// Flush saves the changes before the timer expires.
// It is useful to flush the changes when you shutdown your application
func (b *Batcher) flush() {
func (b *Batcher) flushCurrentBatch() {
b.lock.Lock()
defer b.lock.Unlock()

if b.submit == nil {
return
}

// stop the timer to avoid spurious flushes and trigger immediate cleanup in case this flush was
// triggered manually by a call to Shutdown (it has to happen inside the lock, so it can't be done
// in the Shutdown method directly)
b.flushTimer.Stop()

close(b.submit)
b.submit = nil
}

0 comments on commit 8652ab4

Please sign in to comment.