Skip to content

Commit

Permalink
Refactor & add test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
bearaujus-test committed Oct 14, 2023
1 parent 6232647 commit 2fc22a5
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 41 deletions.
19 changes: 4 additions & 15 deletions bworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ func (bw *bWorker) Do(job Job) {
if job == nil || bw.shutdown {
return
}
bw.jobWG.Add(1)
bw.jobs <- job
job.queueToChan(bw.jobWG, bw.jobs)
}

func (bw *bWorker) Wait() {
Expand All @@ -57,21 +56,11 @@ func (bw *bWorker) Shutdown() {
}

func (bw *bWorker) ResetErr() {
if bw.optErr == nil {
return
}
bw.mu.Lock()
*bw.optErr = nil
bw.mu.Unlock()
resetOptErrIfUsed(bw.mu, bw.optErr)
}

func (bw *bWorker) ResetErrs() {
if bw.optErrs == nil {
return
}
bw.mu.Lock()
*bw.optErrs = nil
bw.mu.Unlock()
resetOptErrsIfUsed(bw.mu, bw.optErrs)
}

func (bw *bWorker) startWorkers(numWorkers int) {
Expand All @@ -87,6 +76,6 @@ func (bw *bWorker) startWorkers(numWorkers int) {
func (bw *bWorker) startWorker() {
defer bw.wg.Done()
for job := range bw.jobs {
job.execute(bw.optRetry, bw.jobWG, bw.mu, bw.optErr, bw.optErrs)
job.do(bw.jobWG, bw.mu, bw.optRetry, bw.optErr, bw.optErrs)
}
}
17 changes: 3 additions & 14 deletions bworker_flex.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ func (bwf *bWorkerFlex) Do(job Job) {
if job == nil || bwf.shutdown {
return
}
bwf.jobWG.Add(1)
go job.execute(bwf.optRetry, bwf.jobWG, bwf.mu, bwf.optErr, bwf.optErrs)
job.executeInBackground(bwf.jobWG, bwf.mu, bwf.optRetry, bwf.optErr, bwf.optErrs)
}

func (bwf *bWorkerFlex) Wait() {
Expand All @@ -51,19 +50,9 @@ func (bwf *bWorkerFlex) Shutdown() {
}

func (bwf *bWorkerFlex) ResetErr() {
if bwf.optErr == nil {
return
}
bwf.mu.Lock()
*bwf.optErr = nil
bwf.mu.Unlock()
resetOptErrIfUsed(bwf.mu, bwf.optErr)
}

func (bwf *bWorkerFlex) ResetErrs() {
if bwf.optErrs == nil {
return
}
bwf.mu.Lock()
*bwf.optErrs = nil
bwf.mu.Unlock()
resetOptErrsIfUsed(bwf.mu, bwf.optErrs)
}
11 changes: 10 additions & 1 deletion bworker_flex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,16 @@ func TestBWorkerFlex(t *testing.T) {
bw.ResetErrs()
})

t.Run("test corner case for worker option", func(t *testing.T) {
t.Run("occur error when error option not set", func(t *testing.T) {
bw := NewBWorkerFlex()
defer bw.Shutdown()

bw.Do(func() error {
return errors.New("an error")
})
})

t.Run("test corner case for worker flex option", func(t *testing.T) {
bw := NewBWorkerFlex(
WithRetryFlex(0),
WithErrorFlex(nil),
Expand Down
9 changes: 9 additions & 0 deletions bworker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ func TestBWorker(t *testing.T) {
bw.ResetErrs()
})

t.Run("occur error when error option not set", func(t *testing.T) {
bw := NewBWorker(1)
defer bw.Shutdown()

bw.Do(func() error {
return errors.New("an error")
})
})

t.Run("test corner case for worker option", func(t *testing.T) {
bw := NewBWorker(1,
WithJobBuffer(0),
Expand Down
43 changes: 43 additions & 0 deletions error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package bworker

import "sync"

func setOptErrIfUsed(mu *sync.Mutex, optErr *error, err error) {
if optErr == nil {
return
}

mu.Lock()
*optErr = err
mu.Unlock()
}

func resetOptErrIfUsed(mu *sync.Mutex, optErr *error) {
if optErr == nil {
return
}

mu.Lock()
*optErr = nil
mu.Unlock()
}

func appendOptErrsIfUsed(mu *sync.Mutex, optErrs *[]error, err error) {
if optErrs == nil {
return
}

mu.Lock()
*optErrs = append(*optErrs, err)
mu.Unlock()
}

func resetOptErrsIfUsed(mu *sync.Mutex, optErrs *[]error) {
if optErrs == nil {
return
}

mu.Lock()
*optErrs = nil
mu.Unlock()
}
24 changes: 13 additions & 11 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,17 @@ import "sync"
// Job represent a function to be executed by a worker.
type Job func() error

func (j Job) execute(retry int, wg *sync.WaitGroup, mu *sync.Mutex, optErr *error, optErrs *[]error) {
func (j Job) executeInBackground(wg *sync.WaitGroup, mu *sync.Mutex, retry int, optErr *error, optErrs *[]error) {
wg.Add(1)
go j.do(wg, mu, retry, optErr, optErrs)
}

func (j Job) queueToChan(wg *sync.WaitGroup, c chan Job) {
wg.Add(1)
c <- j
}

func (j Job) do(wg *sync.WaitGroup, mu *sync.Mutex, retry int, optErr *error, optErrs *[]error) {
defer wg.Done()
attempts := 1 + retry
for attempt := 0; attempt < attempts; attempt++ {
Expand All @@ -16,15 +26,7 @@ func (j Job) execute(retry int, wg *sync.WaitGroup, mu *sync.Mutex, optErr *erro
if attempt != attempts-1 {
continue
}
if optErr != nil {
mu.Lock()
*optErr = err
mu.Unlock()
}
if optErrs != nil {
mu.Lock()
*optErrs = append(*optErrs, err)
mu.Unlock()
}
setOptErrIfUsed(mu, optErr, err)
appendOptErrsIfUsed(mu, optErrs, err)
}
}

0 comments on commit 2fc22a5

Please sign in to comment.