Minimalistic and High-performance goroutine worker pool written in Go
This library is meant to provide a simple way to limit concurrency when executing some function over a limited resource or service.
Some common scenarios include:
```bash
go get -u github.com/alitto/pond
```
```go
package main

import (
	"fmt"

	"github.com/alitto/pond"
)

func main() {

	// Create a buffered (non-blocking) pool that can scale up to 100 workers
	// and has a buffer capacity of 1000 tasks
	pool := pond.New(100, 1000)

	// Submit 1000 tasks
	for i := 0; i < 1000; i++ {
		n := i
		pool.Submit(func() {
			fmt.Printf("Running task #%d\n", n)
		})
	}

	// Stop the pool and wait for all submitted tasks to complete
	pool.StopAndWait()
}
```
```go
package main

import (
	"fmt"

	"github.com/alitto/pond"
)

func main() {

	// Create an unbuffered (blocking) pool with a fixed
	// number of workers
	pool := pond.New(10, 0, pond.MinWorkers(10))

	// Submit 1000 tasks
	for i := 0; i < 1000; i++ {
		n := i
		pool.Submit(func() {
			fmt.Printf("Running task #%d\n", n)
		})
	}

	// Stop the pool and wait for all submitted tasks to complete
	pool.StopAndWait()
}
```
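To make the roles of the two `pond.New` arguments (maximum workers and queue capacity) more concrete, here is a minimal sketch of a fixed-size pool built only from the standard library. It is not pond's implementation (pond also scales its workers up and down), and `boundedPool`, `newBoundedPool` and `runTasks` are hypothetical names. A buffered queue lets submissions return immediately until it fills up, while a capacity of 0 makes every submission block until a worker takes the task. It assumes Go 1.19+ for `atomic.Int64`.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// boundedPool is a hypothetical fixed-size pool: maxWorkers goroutines
// consume tasks from a queue that holds at most maxCapacity tasks.
type boundedPool struct {
	tasks chan func()
	wg    sync.WaitGroup
}

// newBoundedPool starts the workers. With maxCapacity == 0 the queue is an
// unbuffered channel, so submit blocks until a worker receives the task.
func newBoundedPool(maxWorkers, maxCapacity int) *boundedPool {
	p := &boundedPool{tasks: make(chan func(), maxCapacity)}
	for i := 0; i < maxWorkers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for task := range p.tasks { // exits once the queue is closed and drained
				task()
			}
		}()
	}
	return p
}

// submit enqueues a task, blocking only while the queue is full.
func (p *boundedPool) submit(task func()) { p.tasks <- task }

// stopAndWait closes the queue and waits until every queued task has run.
func (p *boundedPool) stopAndWait() {
	close(p.tasks)
	p.wg.Wait()
}

// runTasks pushes n tasks through a pool and returns how many ran and the
// highest number observed running at the same time (never above maxWorkers).
func runTasks(maxWorkers, maxCapacity, n int) (completed, peak int64) {
	var running, done, maxSeen atomic.Int64
	p := newBoundedPool(maxWorkers, maxCapacity)
	for i := 0; i < n; i++ {
		p.submit(func() {
			cur := running.Add(1)
			// Record cur as the new peak if it exceeds the previous one.
			for {
				old := maxSeen.Load()
				if cur <= old || maxSeen.CompareAndSwap(old, cur) {
					break
				}
			}
			running.Add(-1)
			done.Add(1)
		})
	}
	p.stopAndWait()
	return done.Load(), maxSeen.Load()
}

func main() {
	// maxCapacity 0: every submit blocks until one of the 10 workers is free
	completed, peak := runTasks(10, 0, 1000)
	fmt.Println(completed, peak <= 10) // prints "1000 true"
}
```

The same channel-and-workers shape explains the trade-off between the two examples: a larger queue absorbs bursts without blocking the submitter, at the cost of memory for the buffered tasks.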
```go
package main

import (
	"fmt"

	"github.com/alitto/pond"
)

func main() {

	// Create a pool
	pool := pond.New(10, 1000)
	defer pool.StopAndWait()

	// Create a task group
	group := pool.Group()

	// Submit a group of related tasks
	for i := 0; i < 20; i++ {
		n := i
		group.Submit(func() {
			fmt.Printf("Running group task #%d\n", n)
		})
	}

	// Wait for all tasks in the group to complete
	group.Wait()
}
```
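A task group waits only for the tasks submitted through it, while the concurrency limit is shared with everything else running on the pool. The sketch below models that split with the standard library: a semaphore channel stands in for the pool and a `sync.WaitGroup` tracks the group's own tasks. `limiter`, `group` and `sumGroup` are hypothetical names, not pond's API, and unlike pond this limiter starts a goroutine per task instead of reusing workers.

```go
package main

import (
	"fmt"
	"sync"
)

// limiter is a hypothetical stand-in for a shared pool: at most cap(sem)
// tasks can run at the same time.
type limiter struct{ sem chan struct{} }

func newLimiter(maxWorkers int) *limiter {
	return &limiter{sem: make(chan struct{}, maxWorkers)}
}

// run blocks until a slot is free, then executes task in a new goroutine and
// calls done after releasing the slot (deferred calls run in LIFO order).
func (l *limiter) run(task func(), done func()) {
	l.sem <- struct{}{}
	go func() {
		defer done()
		defer func() { <-l.sem }()
		task()
	}()
}

// group tracks only the tasks submitted through it.
type group struct {
	pool *limiter
	wg   sync.WaitGroup
}

// Submit registers the task with the group before handing it to the limiter.
func (g *group) Submit(task func()) {
	g.wg.Add(1)
	g.pool.run(task, g.wg.Done)
}

// Wait blocks until every task submitted to this group has finished.
func (g *group) Wait() { g.wg.Wait() }

// sumGroup runs n tasks through one group and adds up the values they produced.
func sumGroup(maxWorkers, n int) int {
	g := &group{pool: newLimiter(maxWorkers)}
	results := make([]int, n)
	for i := 0; i < n; i++ {
		i := i // capture the loop variable (needed before Go 1.22)
		g.Submit(func() { results[i] = i })
	}
	g.Wait()
	total := 0
	for _, v := range results {
		total += v
	}
	return total
}

func main() {
	fmt.Println(sumGroup(10, 20)) // prints 190
}
```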
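- **MinWorkers**: Specifies the minimum number of worker goroutines that must be running at any given time. Example:
  ```go
  // This will create a pool with 5 running worker goroutines
  pool := pond.New(10, 1000, pond.MinWorkers(5))
  ```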
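- **IdleTimeout**: Defines how long an idle worker goroutine is kept before it is removed from the pool. Example:
  ```go
  // This will create a pool that will remove workers 100ms after they become idle
  pool := pond.New(10, 1000, pond.IdleTimeout(100*time.Millisecond))
  ```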
- **PanicHandler**: Configures a custom function to handle panics thrown by tasks submitted to the pool. The default handler writes a message to standard output using `fmt.Printf` with the following contents: `Worker exits from a panic: [panic] \n Stack trace: [stack trace]`. Example:
  ```go
  // Custom panic handler function
  panicHandler := func(p interface{}) {
  	fmt.Printf("Task panicked: %v\n", p)
  }

  // This will create a pool that will handle panics using a custom panic handler
  pool := pond.New(10, 1000, pond.PanicHandler(panicHandler))
  ```
- **Strategy**: Configures the strategy used to resize the pool when backpressure is detected. You can create a custom strategy by implementing the `pond.ResizingStrategy` interface or choose one of the 3 presets:
  - **Eager**: maximizes responsiveness at the expense of higher resource usage, which can reduce throughput under certain conditions. This strategy is meant for worker pools that will operate at a small percentage of their capacity most of the time and may occasionally receive bursts of tasks. This is the default strategy.
  - **Balanced**: tries to find a balance between responsiveness and throughput. It's suitable for general-purpose worker pools or those that will operate close to 50% of their capacity most of the time.
  - **Lazy**: maximizes throughput at the expense of responsiveness. This strategy is meant for worker pools that will operate close to their maximum capacity most of the time.

  ```go
  // Example: create pools with different resizing strategies
  eagerPool := pond.New(10, 1000, pond.Strategy(pond.Eager()))
  balancedPool := pond.New(10, 1000, pond.Strategy(pond.Balanced()))
  lazyPool := pond.New(10, 1000, pond.Strategy(pond.Lazy()))
  ```
The following chart illustrates the behaviour of the different pool resizing strategies as the number of submitted tasks increases. Each line represents the number of worker goroutines in the pool (pool size) and the x-axis reflects the number of submitted tasks (cumulative).
As the name suggests, the "Eager" strategy always spawns an extra worker when there are no idle workers, which causes the pool to grow almost linearly with the number of submitted tasks. At the other end, the "Lazy" strategy creates one worker every N submitted tasks, where N is the maximum number of available CPUs (GOMAXPROCS). The "Balanced" strategy is a middle ground between the two: it creates a worker every N/2 submitted tasks.
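The growth rules above can be modeled in a few lines to see why the three curves diverge. This is a deliberately simplified, hypothetical model rather than pond's resizing code: it assumes every task runs long enough that no worker is ever idle, spawns a worker on every `every`-th submission (1 for Eager, N/2 for Balanced, N for Lazy, with N taken from `runtime.GOMAXPROCS(0)`), and caps the pool at `maxWorkers`. The `poolSizes` function is an illustrative name.

```go
package main

import (
	"fmt"
	"runtime"
)

// poolSizes returns the pool size after each of `tasks` submissions when a
// new worker is spawned on every `every`-th submission, up to maxWorkers.
// It assumes no worker ever becomes idle (all tasks are long-running).
func poolSizes(every, maxWorkers, tasks int) []int {
	sizes := make([]int, tasks)
	workers := 0
	for i := 0; i < tasks; i++ {
		if i%every == 0 && workers < maxWorkers {
			workers++
		}
		sizes[i] = workers
	}
	return sizes
}

func main() {
	n := runtime.GOMAXPROCS(0) // N: number of CPUs usable by the Go scheduler
	half := n / 2
	if half < 1 {
		half = 1 // avoid a zero step on single-CPU machines
	}
	strategies := []struct {
		name  string
		every int
	}{
		{"Eager", 1},
		{"Balanced", half},
		{"Lazy", n},
	}
	for _, s := range strategies {
		sizes := poolSizes(s.every, 100, 1000)
		fmt.Printf("%-8s workers after 1000 tasks: %d\n", s.name, sizes[len(sizes)-1])
	}
}
```

Under this model the Eager pool reaches its 100-worker cap after 100 submissions, while the Lazy pool needs roughly 100 × N submissions to get there.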
Each worker pool instance exposes useful metrics that can be queried through the following methods:

- `pool.RunningWorkers() int`: Current number of running workers
- `pool.IdleWorkers() int`: Current number of idle workers
- `pool.MinWorkers() int`: Minimum number of worker goroutines
- `pool.MaxWorkers() int`: Maximum number of worker goroutines
- `pool.MaxCapacity() int`: Maximum number of tasks that can be waiting in the queue at any given time (queue capacity)
- `pool.SubmittedTasks() uint64`: Total number of tasks submitted since the pool was created
- `pool.WaitingTasks() uint64`: Current number of tasks in the queue that are waiting to be executed
- `pool.SuccessfulTasks() uint64`: Total number of tasks that have successfully completed their execution since the pool was created
- `pool.FailedTasks() uint64`: Total number of tasks that completed with a panic since the pool was created
- `pool.CompletedTasks() uint64`: Total number of tasks that have completed their execution, either successfully or with a panic, since the pool was created
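The task counters are related: every submitted task eventually ends up as either successful or failed, and the completed count is the sum of the two. The sketch below illustrates that bookkeeping with `sync/atomic` counters (Go 1.19+ for `atomic.Uint64`). `taskMetrics` and `simulate` are hypothetical helpers, not part of pond, and they run each task in its own goroutine rather than on a pool.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// taskMetrics is a hypothetical set of counters mirroring the definitions
// of SubmittedTasks, SuccessfulTasks, FailedTasks and CompletedTasks.
type taskMetrics struct {
	submitted, successful, failed atomic.Uint64
}

// completed counts tasks that finished either successfully or with a panic.
func (m *taskMetrics) completed() uint64 {
	return m.successful.Load() + m.failed.Load()
}

// run executes task in its own goroutine and records its outcome.
func (m *taskMetrics) run(wg *sync.WaitGroup, task func()) {
	m.submitted.Add(1)
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer func() {
			if recover() != nil {
				m.failed.Add(1) // the task panicked
				return
			}
			m.successful.Add(1)
		}()
		task()
	}()
}

// simulate runs `ok` well-behaved tasks and `panics` panicking tasks, then
// returns the submitted, successful, failed and completed counts.
func simulate(ok, panics int) (submitted, successful, failed, completed uint64) {
	var m taskMetrics
	var wg sync.WaitGroup
	for i := 0; i < ok; i++ {
		m.run(&wg, func() {})
	}
	for i := 0; i < panics; i++ {
		m.run(&wg, func() { panic("task failed") })
	}
	wg.Wait()
	return m.submitted.Load(), m.successful.Load(), m.failed.Load(), m.completed()
}

func main() {
	s, ok, f, c := simulate(8, 2)
	// prints "submitted=10 successful=8 failed=2 completed=10"
	fmt.Printf("submitted=%d successful=%d failed=%d completed=%d\n", s, ok, f, c)
}
```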
In our Prometheus example we showcase how to configure collectors for these metrics and expose them to Prometheus.
The full API reference is available at https://pkg.go.dev/github.com/alitto/pond
See Benchmarks.
Here are some of the resources which have served as inspiration when writing this library:
Feel free to send a pull request if you think something can be improved. Also, please open an issue if you run into a problem when using this library or just have a question.