A minimal and efficient workerpool implementation in Go using goroutines.

dpaks/goworkers

GoWorkers

A minimal, efficient and scalable worker pool implementation in Go using goroutines.

Note: Do not use the master branch. Use the latest release instead.

Installation

$ go get github.com/dpaks/goworkers

Examples

Basic

package main

import "github.com/dpaks/goworkers"

func main() {
	// initialise
	gw := goworkers.New()

	// non-blocking call
	gw.Submit(func() {
		// do your work here
	})

	// wait till your job finishes
	gw.Stop(false)
}

With arguments

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/dpaks/goworkers"
)

func main() {
	opts := goworkers.Options{Workers: 20}
	gw := goworkers.New(opts)
	
	// your actual work
	fn := func(i int) {
		fmt.Println("Start Job", i)
		time.Sleep(time.Duration(i) * time.Second)
		fmt.Println("End Job", i)
	}
	
	for _, value := range []int{9, 7, 1, 2, 3} {
		i := value
		gw.Submit(func() {
			fn(i)
		})
	}
	log.Println("Submitted!")
	
	gw.Stop(false)
}

Without arguments

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/dpaks/goworkers"
)

func main() {
	// No Options are passed here, so the pool uses its default settings.
	gw := goworkers.New()
	
	fn := func(i int) {
		fmt.Println("Start Job", i)
		time.Sleep(time.Duration(i) * time.Second)
		fmt.Println("End Job", i)
	}
	
	for _, value := range []int{9, 7, 1, 2, 3} {
		i := value
		gw.Submit(func() {
			fn(i)
		})
	}
	log.Println("Submitted!")
	
	gw.Stop(false)
}

Benchmark

package main

import (
    "log"
    "time"

    "github.com/dpaks/goworkers"
)

func main() {
    tStart := time.Now()

    gw := goworkers.New()

    fn := func() {
        time.Sleep(time.Duration(5) * time.Second)
    }

    for value := 500; value > 0; value-- {
        gw.Submit(func() {
            fn()
        })
    }

    gw.Stop(false)

    tEnd := time.Now()
    tDiff := tEnd.Sub(tStart)

    log.Println("Time taken to execute 500 jobs that were 5 seconds long is only", tDiff.Seconds(), "seconds!")
}

Output: 2020/07/03 20:03:01 Time taken to execute 500 jobs that were 5 seconds long is only 5.001186599 seconds!

To Receive Error from Job

package main

import (
	"fmt"
	"log"

	"github.com/dpaks/goworkers"
)

func main() {
    gw := goworkers.New()

    // You must strictly start reading from the error channel before invoking
    // SubmitCheckError() else you'll miss the updates.
    // You can employ any mechanism to read from this channel.
    go func() {
        // Error channel provides errors from job, if any
        for err := range gw.ErrChan {
            fmt.Println(err)
        }
    }()

    // This is your actual function
    fn := func(i int) error {
        // Do work here
        return fmt.Errorf("Got error %d", i)
    }

    // The job submit part
    for _, value := range []int{3, 2, 1} {
        i := value
        gw.SubmitCheckError(func() error {
            return fn(i)
        })
    }
    log.Println("Submitted!")

    // Wait for jobs to finish
    // Here, wait flag is set to true. Setting wait to true ensures that
    // the output channels are read from completely.
    // Stop(true) exits only when the error channel is completely read from.
    gw.Stop(true)
}

To Receive Output and Error from Job

package main

import (
	"fmt"
	"log"

	"github.com/dpaks/goworkers"
)

func main() {
    gw := goworkers.New()

    type myOutput struct {
        Idx  int
        Name string
    }

    // You must strictly start reading from the error and output channels
    // before invoking SubmitCheckResult() else you'll miss the updates.
    // You can employ any mechanism to read from these channels.
    go func() {
        for {
            select {
            // Error channel provides errors from job, if any
            case err, ok := <-gw.ErrChan:
                // The error channel is closed when the workers are done with their tasks.
                // When the channel is closed, ok is set to false
                if !ok {
                    return
                }
                fmt.Printf("Error: %s\n", err.Error())
            // Result channel provides output from job, if any
            // It will be of type interface{}
            case res, ok := <-gw.ResultChan:
                // The result channel is closed when the workers are done with their tasks.
                // When the channel is closed, ok is set to false
                if !ok {
                    return
                }
                fmt.Printf("Type: %T, Value: %+v\n", res, res)
            }
        }
    }()

    // This is your actual function
    fn := func(i int) (interface{}, error) {
        // Do work here

        // return error
        if i%2 == 0 {
            return nil, fmt.Errorf("Got error %d", i)
        }
        // return output
        return myOutput{Idx: i, Name: "dummy"}, nil
    }

    // The job submit part
    for _, value := range []int{3, 2, 1} {
        i := value
        gw.SubmitCheckResult(func() (interface{}, error) {
            return fn(i)
        })
    }
    log.Println("Submitted!")

    // Wait for jobs to finish
    // Here, wait flag is set to true. Setting wait to true ensures that
    // the output channels are read from completely.
    // Stop(true) exits only when both the result and the error channels are completely read from.
    gw.Stop(true)
}

TODO

  • Add logs toggle
  • When the goworkers machine is stopped, ensure that everything is cleaned up
  • Add support for a 'results' channel
  • An option to auto-adjust worker pool size
  • Introduce timeout

FAQ

Q. I don't want to use the error channel. I only need the output. What do I do?

A. Listen only to the output channel. You are not required to listen to any channel whose output you don't need.

Q. I get duplicate output.

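A. In the wrong snippet below, k and v are declared once and reused by every iteration (this is how range loops behave in Go versions before 1.22). The closure passed to SubmitCheckResult captures the variables themselves, not their values, so by the time a job runs they may already hold the values of a later iteration.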

Wrong code

for k, v := range myMap {
    gw.SubmitCheckResult(func() (interface{}, error) {
        return myFunc(k, v)
    })
}

Correct code

for i, j := range myMap {
    // copy the loop variables so that each closure gets its own k and v
    k := i
    v := j
    gw.SubmitCheckResult(func() (interface{}, error) {
        return myFunc(k, v)
    })
}
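
Another way to get the same effect, sketched here with the standard library only, is to build each job through a small function whose parameters hold the per-iteration values. The names makeJob and runAll are assumptions made for illustration, and plain goroutines stand in for the worker pool.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// makeJob receives k and v as parameters, so every returned closure owns its
// own copies. Hypothetical helper, not part of goworkers.
func makeJob(k string, v int) func() string {
	return func() string { return fmt.Sprintf("%s=%d", k, v) }
}

// runAll runs one job per map entry concurrently and returns the sorted outputs.
func runAll(m map[string]int) []string {
	var (
		mu  sync.Mutex
		out []string
		wg  sync.WaitGroup
	)
	for k, v := range m {
		job := makeJob(k, v) // values are bound here, before the goroutine starts
		wg.Add(1)
		go func() {
			defer wg.Done()
			s := job()
			mu.Lock()
			out = append(out, s)
			mu.Unlock()
		}()
	}
	wg.Wait()
	sort.Strings(out)
	return out
}

func main() {
	fmt.Println(runAll(map[string]int{"a": 1, "b": 2, "c": 3})) // prints "[a=1 b=2 c=3]"
}
```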

Q. Can I use a combination of Submit(), SubmitCheckError() and SubmitCheckResult() and still use output and error channels?

A. Yes. It is absolutely safe.
