Pipelines in Golang
Ever wanted to push a bunch of values through a pipeline, to be worked on by different functions (stages), in parallel or sequentially? In Go there are a few ways to accomplish that. This article summarizes and contrasts a few known approaches.
To be clear, this article isn’t about emulating Unix shell pipelines in Go. I consider the package that link points to the go-to solution if you want to build a Unix pipeline in Go.
What’s in a pipeline?
Why would you need a pipeline? If you want to work on a value and pass it from stage to stage, what could be simpler than:
package main

import "fmt"

func main() {
    for i := 0; i < 10; i++ {
        result := start("my-value") // could be anything, really
        fmt.Println("The result is:", result)
    }
}

func start(val string) string {
    val1 := stage1(val)
    val2 := stage2(val1)
    val3 := stage3(val2)
    return val3
}

// dummy functions
func stage1(val string) string {
    return val + " -> stage 1"
}

func stage2(val string) string {
    return val + " -> stage 2"
}

func stage3(val string) string {
    return val + " -> stage 3"
}
There, that’s just regular, imperative programming. It’s simple, it’s easy, it’s great. There’s no error handling; I left it out to keep things short, but you should definitely add it. Now, what if you want to run things in parallel? A first thought might be to do something like this:
func main() {
    // input and output channels for the pipeline
    input := make(chan string)
    output := make(chan string)
    go start(input, output) // run start() as a goroutine
    for i := 0; i < 10; i++ {
        input <- "my-value"
        fmt.Println("The result is:", <-output)
    }
    // close the input channel so start() will exit and can clean up after
    // itself if it so wishes.
    close(input)
}

func start(input <-chan string, output chan<- string) {
    // will loop until input is closed
    for i := range input {
        val1 := stage1(i)
        val2 := stage2(val1)
        val3 := stage3(val2)
        output <- val3
    }
}
Whew, now we’re using goroutines and channels, two of the things Go is known for. Yet we haven’t gained any efficiency; we’ve merely increased the complexity of the solution. There are two big reasons for that:
- We’ve moved all calculations from one goroutine (the main goroutine) to just one other goroutine: start(). This can’t possibly make things any faster. Possible solution: start more goroutines.
- We immediately demand output after supplying input. This allows only one value to be in-flight. Yet we can’t split up the loop into an input loop and an output loop: it would deadlock! The start() goroutine would be blocked on the output <- val3 channel send after the first iteration of the input loop (a sketch of the deadlocking variant follows this list). Thus, even if we applied the solution to problem #1, everything would still be sequential. Possible solution: use buffered channels to allow more values to be in-flight.
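To see the deadlock from reason #2 in action, here’s a sketch (my own, not from the original) that splits the loop, reusing the channel-based start() from above:

func main() {
    input := make(chan string)
    output := make(chan string)
    go start(input, output)
    // Input loop: blocks forever on the second send. start() has consumed
    // the first value and is itself stuck on output <- val3, because
    // nothing is reading output yet.
    for i := 0; i < 10; i++ {
        input <- "my-value"
    }
    // Output loop: never reached; the runtime aborts with
    // "fatal error: all goroutines are asleep - deadlock!".
    for i := 0; i < 10; i++ {
        fmt.Println("The result is:", <-output)
    }
    close(input)
}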
Let’s apply the proposed solutions and see what we get:
func main() {
    // input and output channels for the pipeline
    nvals := 10
    input := make(chan string, nvals)
    output := make(chan string, nvals)
    for i := 0; i < nvals; i++ {
        go start(input, output)
    }
    for i := 0; i < nvals; i++ {
        input <- "my-value"
    }
    close(input)
    for i := 0; i < nvals; i++ {
        fmt.Println("The result is:", <-output)
    }
}

func start(input <-chan string, output chan<- string) {
    // will loop until input is closed
    for i := range input {
        val1 := stage1(i)
        val2 := stage2(val1)
        val3 := stage3(val2)
        output <- val3
    }
}
Now we’re calculating in parallel. However, though we can decrease the number of goroutines, we’re still required to construct channels as large as the number of values we’re going to pass in. And if we decrease the channel buffers, we need more goroutines to “buffer up” more values (which is even worse). So we either have O(N) goroutines or channels with an O(N) buffer. That’s no good. Perhaps by using another Go construct, we can do better. Enter select:
func main() {
    // input and output channels for the pipeline
    nvals := 10
    input := make(chan string)
    output := make(chan string)
    for i := 0; i < 2; i++ {
        go start(input, output)
    }
    for i := 0; i < nvals; {
        select {
        case input <- "my-value":
            // successfully input a value; note that only receives are
            // counted, so a few extra values may be sent before i hits nvals
        case res := <-output:
            fmt.Println("The result is:", res)
            i++
        }
    }
    close(input)
}

func start(input <-chan string, output chan<- string) {
    // will loop until input is closed
    for i := range input {
        val1 := stage1(i)
        val2 := stage2(val1)
        val3 := stage3(val2)
        output <- val3
    }
}
We can vary the number of goroutines and the channel buffers independently (and measure where we get the best performance). By combining this with runtime.GOMAXPROCS(runtime.NumCPU()), all cores can be used:
cpus := runtime.NumCPU()
runtime.GOMAXPROCS(cpus)
// start as many workers as we have CPUs
for i := 0; i < cpus; i++ {
    go start(input, output)
}
Yet, we haven’t really been focusing on the pipeline at all. What we’ve created is just a primitive worker pool. If a bit more queueing logic is added, a perfectly fine and usable worker queue comes out.
The concept of worker queues is probably well known among those who follow how big web companies perform their heavy processing tasks. Projects like resque and celery provide a means to abstract a worker queue from the programming language used. Sometimes they do this by using a database or message broker like redis as an intermediary.
I’ve found it helpful to imagine a pipeline as a sequence of worker pools passing values to each other. Each worker pool is called a stage. It’s possible for a worker pool to have just one worker, in which case that stage runs sequentially and needs no synchronization.
To turn our earlier example into a true pipeline, we need to break out the stages and let them run in their own worker pool:
func main() {
    // input and output channels for the pipeline
    nvals := 10
    ncpu := runtime.NumCPU()
    runtime.GOMAXPROCS(ncpu)
    stages := []func(<-chan string, chan<- string){
        stage1,
        stage2,
    }
    input := make(chan string)
    in := input
    for _, stage := range stages {
        out := make(chan string)
        go stage(in, out) // run the stage as a goroutine
        in = out          // the output of one stage is the input of the next one
    }
    output := in // the output of the last stage
    for i := 0; i < nvals; {
        select {
        case input <- "my-value":
            // successfully input a value
        case res := <-output:
            fmt.Println("The result is:", res)
            i++
        }
    }
    close(input)
}

func stage1(input <-chan string, output chan<- string) {
    // propagate the closing of the input channel
    defer close(output)
    for i := range input {
        output <- stage1fn(i)
    }
}

func stage2(input <-chan string, output chan<- string) {
    // propagate the closing of the input channel
    defer close(output)
    for i := range input {
        output <- stage2fn(i)
    }
}

func stage1fn(val string) string {
    return val + " -> stage 1"
}

func stage2fn(val string) string {
    return val + " -> stage 2"
}
Very groovy. But the astute reader will notice that we’ve regressed quite a bit: we only start as many goroutines as we have stages and all stages naturally run in sequence. This means that the performance will be on par with the very first example, which was just a sequence of plain function calls. In fact, performance will be worse, as we get goroutine creation and channel sync overhead. It’s also unclear what advantages, if any, this approach has over regular function calls. It just seems more complex.
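One way out of this regression (a sketch of my own, not a helper from the original article) is to give every stage its own small worker pool: n goroutines share the stage’s input channel, and a sync.WaitGroup makes sure the output channel is closed only after the last worker has exited:

import "sync"

func pool(n int, fn func(string) string, input <-chan string) <-chan string {
    output := make(chan string)
    var wg sync.WaitGroup
    wg.Add(n)
    for i := 0; i < n; i++ {
        go func() {
            defer wg.Done()
            for v := range input {
                output <- fn(v)
            }
        }()
    }
    go func() {
        wg.Wait()
        close(output) // propagate the close only once every worker is done
    }()
    return output
}

The pipeline from the example then becomes output := pool(ncpu, stage2fn, pool(ncpu, stage1fn, input)). Note that with more than one worker per stage, results may overtake each other, so output order is no longer guaranteed.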
Perhaps it’s time to stop dabbling around and provide some powerful helper functions that will let us compose pipelines that use
TODO: Wide/Batch/Single/…
Now we can also see what advantages this might pose over just starting #cpus workers and letting each of them run all the stages until the input is exhausted, as was proposed near the beginning of the article:
- More granular control over parallelism.
- Work stealing (contrast web vs parallel lines approach, graphic?). This can mean more efficient use of system resources. Imagine that stage #1 is responsible for reading files from storage, and stage #2 for processing them (like computing MD5s). In the single worker pool approach, when a worker is busy calculating an MD5, it’s not busy reading a file. Yet these are usually things that can be done in parallel: the kernel issues a DMA request to wherever the request contents are, and the requesting thread can go do something else (this approach is enshrined in the epoll/kqueue/... APIs). When using pipelines, utilization is higher, because all workers for stage 2 can be busy calculating MD5s while all workers for stage 1 have a storage request underway. That way there’s less downtime and thus lower latency, at little extra CPU cost.
This also indicates that it’s a good idea to split IO tasks from CPU-heavy tasks and put them in separate stages; the sketch below illustrates that split. On the flip side, splitting a CPU-heavy task into multiple stages might not yield any gains at all (it might actually decrease performance because of cache efficiency losses).
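To illustrate (my own sketch; the stage names and the minimal error handling are placeholders, not from the original), the file-reading and MD5 stages could look like this:

import (
    "crypto/md5"
    "io/ioutil"
)

// stage 1: IO-bound, reads file contents from storage
func readStage(paths <-chan string) <-chan []byte {
    out := make(chan []byte)
    go func() {
        defer close(out)
        for p := range paths {
            data, err := ioutil.ReadFile(p)
            if err != nil {
                continue // a real pipeline should report the error downstream
            }
            out <- data
        }
    }()
    return out
}

// stage 2: CPU-bound, hashes the contents
func md5Stage(blobs <-chan []byte) <-chan [md5.Size]byte {
    out := make(chan [md5.Size]byte)
    go func() {
        defer close(out)
        for b := range blobs {
            out <- md5.Sum(b)
        }
    }()
    return out
}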
Yet when we know we’re working with Go, we can simplify and generalize this concept. Enter the pipeline.
Queues have a nice analogue in Go: channels. They provide some of the semantics we are looking for:
- Safe to use from multiple goroutines
- First-in First-out (FIFO)
There’s one little catch: channels are bounded by their capacity. This
is true for any queue. This can be a good thing: sending into a full
channel will block until there is room, so it’s a natural backpressure
mechanism. Clever ways can be thought of to deal with this backpressure,
such as throwing away the work, or starting up another pipeline.
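For the “throw away the work” option, a non-blocking send is all it takes; a minimal sketch, assuming an input channel like the ones above:

select {
case input <- "my-value":
    // enqueued
default:
    // the stage is saturated: drop the value (or count/log the drop)
}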
However, to better emulate queues such as those represented by redis’ LISTs, one could prepend a pipeline stage that uses an “infinite” channel to buffer up all input, making it seem to the caller that there is no backpressure. The caveat is that the system can run out of memory and thus make the program fail. It’s possible to build safeguards against this, but they’re not as “convenient” as checking for NULL from malloc().
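Such an “infinite” channel doesn’t exist in Go, but it can be approximated. Here’s a minimal sketch of my own, assuming string values: a goroutine shuttles values from in to out, parking them in a slice whenever the consumer is slower than the producer:

func buffer(in <-chan string) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        var queue []string // grows without bound: this is where the OOM risk lives
        for in != nil || len(queue) > 0 {
            // only enable the send case when something is queued
            // (a send on a nil channel is never ready in a select)
            var send chan<- string
            var head string
            if len(queue) > 0 {
                send = out
                head = queue[0]
            }
            select {
            case v, ok := <-in:
                if !ok {
                    in = nil // input closed: stop receiving, drain the queue
                    continue
                }
                queue = append(queue, v)
            case send <- head:
                queue = queue[1:]
            }
        }
    }()
    return out
}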
Why would one want to use a pipeline instead of just straight-line code or starting a goroutine for every item you want to process in parallel (and then waiting on a sync.WaitGroup or extracting N values from a channel)?
- Bounded parallelism: avoid spawning thousands of goroutines when it’s not necessary (you most likely don’t have thousands of processors). Goroutines are cheap, but they’re not that cheap. [1]
- Some pipelines support cancellation, freeing you from building it yourself in an ad-hoc way every time.
- You can pass some pipelines around. This allows inputting values into an already running pipeline without extra coordination: the end results are sequenced correctly in time (after earlier calls), yet callers don’t have to block until the work is done.
- Batching: combining results into big INSERT statements, for example. This is difficult to do with many parallel pipelines (worker pools) as in the beginning of the article. (A sketch of a batching stage follows this list.)
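A batching stage could look like this (my own sketch, assuming string values and a fixed batch size; a real version might also flush on a timer):

func batch(in <-chan string, size int) <-chan []string {
    out := make(chan []string)
    go func() {
        defer close(out)
        buf := make([]string, 0, size)
        for v := range in {
            buf = append(buf, v)
            if len(buf) == size {
                out <- buf
                buf = make([]string, 0, size)
            }
        }
        if len(buf) > 0 {
            out <- buf // flush the remainder
        }
    }()
    return out
}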
Some links describing possible instances of pipelines:
Use cases
Streaming pipeline
TCP sockets can be seen as an instance of a streaming pipeline.
Value pipeline
Inside of the boundaries of a programming language, it’s often easiest to think of a pipeline as a bunch of values to be passed through a few stages and spit out at the end. Contrast this with a stream of values where one has to make an effort to discern the logical pieces (messages).
When working within one language, this is just not necessary. We already have a perfectly fine way to delimit a message: types. To make things generic, it seems like we’d have to pass interface{} values.
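For example (a sketch of my own; the upper stage is just an illustration), such a generic stage would look like this, with a type assertion replacing the compile-time check:

import "strings"

func upper(in <-chan interface{}) <-chan interface{} {
    out := make(chan interface{})
    go func() {
        defer close(out)
        for v := range in {
            s, ok := v.(string) // every stage must assert the type it expects
            if !ok {
                continue // or send an error value downstream
            }
            out <- strings.ToUpper(s)
        }
    }()
    return out
}

The price is that type assertion in every stage: the compiler can no longer check that adjacent stages agree on the value type.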
Ad-hoc pipeline
Go blog approach.
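Assuming this refers to the Go blog’s “Go Concurrency Patterns: Pipelines and cancellation” post, its central idea is to thread an explicit done channel through every stage, so the whole pipeline can be torn down early; a minimal sketch:

func stage(done <-chan struct{}, in <-chan string) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        for v := range in {
            select {
            case out <- v + " -> stage":
            case <-done:
                return // cancelled: stop without sending further values
            }
        }
    }()
    return out
}

Closing done unblocks every stage at once, no matter which send or receive it happens to be parked on.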
Function pipeline
Dimitri’s approach.
[1] At the time of writing (Go 1.3), creating a goroutine costs around 5000 bytes.