Parallel iterators, part 3: Consumers
14 November 2016
This post is the (long awaited, or at least long promised) third post in my series on Rayon’s parallel iterators. The previous two posts were some time ago, but I’ve been feeling inspired to push more on Rayon lately, and I remembered that I had never finished this blog post series.
Here is a list of the other posts in the series. If you haven’t read them, or don’t remember them, you will want to do so before reading this one:
- The first post, “Foundations”, explains how sequential iterators work. It is also a nice introduction to some of the key techniques for zero-cost abstraction.
- The second post, “Producers”, then shows how we can adapt the sequential iterator approach to permit parallel iteration. It focuses on the concept of parallel producers: these are basically splittable iterators. They give you the ability to say “break this producer into two producer, one of which produces the left half, and one the right half”. You can then process those two halves in parallel. When the number of work items gets small enough, you can convert a producer into a sequential iterator and consume it sequentially.
This third post will introduce parallel consumers. Parallel
consumers are the dual to a parallel producer: they abstract out the
parallel algorithm. We’ll use this to extend beyond the sum()
action
and cover how we can implementation a collect()
operation that
efficiently builds up a big vector of data.
(Note: originally, I had intended this third post to cover how
combinators like filter()
and flat_map()
work. These combinators
are special because they produce a variable number of
elements. However, in writing this post, it became clear that it would
be better to first introduce consumers, and then cover how to extend
them to support filter()
and flat_map()
.)
Motivating example
In this post, we’ll cover two examples. The first will be the running example from the previous two posts, a dot-product iterator chain:
vec1.par_iter()
.zip(vec2.par_iter())
.map(|(i, j)| i * j)
.sum()
After that, we’ll look at a slight variation, where instead of summing up the partial products, we collect them into a vector:
let c: Vec<_> =
vec1.par_iter()
.zip(vec2.par_iter())
.map(|(i, j)| i * j)
.collect(); // <-- only thing different
Review: parallel producers
In the second post, I introduced the basics of how parallel
iterators work. The key idea was the Producer
trait, which is a
variant on iterators that is amenable to “divide-and-conquer”
parallelization:
trait Producer: IntoIterator {
// Divide into two producers, one of which produces data
// with indices `0..index` and the other with indices `index..`.
fn split_at(self, index: usize) -> (Self, Self);
}
Unlike normal iterators, which only support extracting one element at
a time, a parallel producer can be split into two – and this can
happen again and again. At some point, when you think you’ve got small
enough pieces, you can convert it into an iterator (you see it extends
IntoIterator
) and work sequentially.
To see this in action, let’s revisit the sum_producer()
function
that I covered in my previous blog post;
sum_producer()
basically executes the sum()
operation, but
extracting data from a producer. Later on in the post, we’re going to
see how consumers abstract out the sum part of this code, leaving us
with a generic function that can be used to execute all sorts of
parallel iterator chains.
fn sum_producer<P>(mut producer: P, len: usize) -> i32
where P: Producer<Item=i32>
{
if len > THRESHOLD {
// Input too large: divide it up
let mid = len / 2;
let (left_producer, right_producer) = producer.split_at(mid);
let (left_sum, right_sum) = rayon::join(
|| sum_producer(left_producer, mid),
|| sum_producer(right_producer, len - mid));
left_sum + right_sum
} else {
// Input too small: sum sequentially
let mut sum = 0.0;
for value in producer {
sum += value;
}
sum
}
}
Enter parallel consumers
What we would like to do in this post is to try and make an abstract
version of this sum_producer()
function, one that can do all kinds
of parallel operations, rather than just summing up a list of numbers.
The way we do this is by introducing the notion of a parallel
consumer. Consumers represent the “action” at the end of the
iterator; they define what to do with each item that gets produced:
vec1.par_iter() // defines initial producer...
.zip(vec2.par_iter()) // ...wraps to make a new producer...
.map(|(i, j)| i * j) // ...wraps again...
.sum() // ...defines the consumer
The Consumer
trait looks like this. You can see it has a few more
moving parts than producers.
// `Item` is the type of value that the producer will feed us.
pub trait Consumer<Item>: Send + Sized {
// Type of value that consumer produces at the end.
type Result: Send;
// Splits the consumer into two consumers at `index`.
// Also returns a *reducer* for combining their results afterwards.
type Reducer: Reducer<Self::Result>;
fn split_at(self, index: usize) -> (Self, Self, Self::Reducer);
// Convert the consumer into a *folder*, which can sequentially
// process items one by one and produce a result.
type Folder: Folder<Item, Result=Self::Result>;
fn into_folder(self) -> Self::Folder;
}
The basic workflow for driving a producer/consumer pair is as follows:
- You start out with one producer/consumer pair; using
split_at()
, these can be split into two pairs and then those pairs can be processed in parallel. Splitting a consumer also returns something called a reducer, we’ll get to its role in a bit. - At some point, to process sequentially, you convert the producer
into an iterator using
into_iter()
and convert the consumer into a folder usinginto_folder()
. You then draw items from the producer and feed them to the folder. At the end, the folder produces a result (of typeC::Result
, whereC
is the consumer type) and this is returned. - As we walk back up the stack, at each point where we had split the
consumer into two, we now have two results, which must be combined
using the reducer (also returned by
split_at()
).
Let’s take a closer look at the folder and reducer. Folders are
defined by the Folder
trait, a simplified version of which
is shown below. They can be fed items one by one and, at the end,
produce some kind of result:
pub trait Folder<Item> {
type Result;
/// Consume next item and return new sequential state.
fn consume(self, item: Item) -> Self;
/// Finish consuming items, produce final result.
fn complete(self) -> Self::Result;
}
Of course, when we split, we will have two halves, both of which will
produce a result. Thus when a consumer splits, it also returns a
reducer that knows how to combine those results back
again. The Reducer
trait is shown below. It just consists
of a single method reduce()
:
pub trait Reducer<Result> {
/// Reduce two final results into one; this is executed after a
/// split.
fn reduce(self, left: Result, right: Result) -> Result;
}
Generalizing sum_producer()
In effect, the consumer abstracts out the “parallel operation” that
the iterator is going to perform. Armed with this consumer trait, we
can now revisit the sum_producer()
method we saw before. That function
was specific to adding up a series of values, but we’d like to produce
an abstract version that works for any consumer. In the Rayon source,
this function is called bridge_producer_consumer
. Here is a
simplified version. It is helpful to compare it to sum_producer()
from before; I’ll include some “footnote comments” (like [1]
, [2]
)
to highlight those differences.
// `sum_producer` was specific to summing up a series of `i32`
// values, which produced another `i32` value. This version is generic
// over any producer/consumer. The consumer consumes `P::Item` (whatever
// the producer produces) and then the fn as a whole returns a
// `C::Result`.
fn bridge_producer_consumer<P, C>(len: usize,
mut producer: P,
mut consumer: C)
-> C::Result
where P: Producer, C: Consumer<P::Item>
{
if len > THRESHOLD {
// Input too large: divide it up
let mid = len / 2;
// As before, split the producer into two halves at the mid-point.
let (left_producer, right_producer) = producer.split_at(mid);
// Also divide the consumer into two consumers.
// This also gives us a *reducer* for later.
let (left_consumer, right_consumer, reducer) = consumer.split_at(mid);
// Parallelize the processing of the left/right halves,
// producing two results.
let (left_result, right_result) =
rayon::join(
|| bridge_producer_consumer(mid, left_producer, left_consumer),
|| bridge_producer_consumer(len - mid, right_producer, right_consumer));
// Finally, reduce the two intermediate results.
// In `sum_producer`, this was `left_result + right_result`,
// but here we use the reducer.
reducer.reduce(left_result, right_result)
} else {
// Input too small: process sequentially.
// Get a *folder* from the consumer.
// In `sum_producer`, this was `let mut sum = 0`.
let mut folder = consumer.into_folder();
// Convert producer into sequential iterator.
// Feed each item to the folder in turn.
// In `sum_producer`, this was `sum += item`.
for item in producer {
folder = folder.consume(item);
}
// Convert the folder into a result.
// In `sum_producer`, this was just `sum`.
folder.complete()
}
}
Implementing the consumer for sum()
Next, let’s look at how one might implement the sum
consumer, so
that we can use it with bridge_producer_consumer()
. As before, we’ll
just focus on a sum
that works on i32
values, to keep things
relatively simple. We’ll start out by declaring a trio of three types
(consumer, folder, and reducer).
struct I32SumConsumer {
// This type requires no state. This will be important
// in the next post!
}
struct I32SumFolder {
// Current sum thus far.
sum: i32
}
struct I32SumReducer {
// No state here either.
}
Next, let’s implement the Consumer
trait for I32SumConsumer
:
impl Consumer for I32SumConsumer {
type Folder = I32SumFolder;
type Reducer = I32SumReducer;
type Result = i32;
// Since we have no state, "splitting" just means making some
// empty structs:
fn split_at(self, _index: usize) -> (Self, Self, Self::Result) {
(I32SumConsumer { }, I32SumConsumer { }, I32SumReducer { })
}
// Folder starts out with a sum of zero.
fn into_folder(self) -> Self::Folder {
I32SumFolder { sum: 0 }
}
}
The folder is also very simple. It takes each value and adds it to the current sum.
impl Folder<i32> for I32SumFolder {
type Result = i32;
fn consume(self, item: i32) -> Self {
// we take ownership the current folder
// at each step, and produce a new one
// as the result:
I32SumFolder { sum: self.sum + item }
}
fn complete(self) -> i32 {
self.sum
}
}
And, finally, the reducer just sums up two sums. The self
goes
unused since our reducer doesn’t have any state of its own.
impl Reducer<i32> for I32SumFolder {
fn reduce(self, left: i32, right: i32) -> i32 {
left + right
}
}
Implementing the consumer for collect()
Now that we’ve built up this generic framework for consumers, let’s
put it to use by defining a second consumer. This time I want to
define how collect()
works; just like in sequential iterators,
collect()
allows users to accumulate the parallel items into a
collection. In this case, we’re going to examine one particular
variant of collect()
, which writes values into a vector:
let c: Vec<_> =
vec1.par_iter()
.zip(vec2.par_iter())
.map(|(i, j)| i * j)
.collect(); // <-- only thing different
In fact, internally, Rayon’s collect()
for vectors is
written in terms of a more efficient primitive,
collect_into()
. collect_into()
takes a mutable reference to a
vector and stores the results in there: this allows you to re-use a
pre-existing vector and avoid allocation overheads. It’s particularly
good for double buffering scenarios. To use collect_into()
explicitly, one would write something like:
let mut c: Vec<_> = vec![];
vec1.par_iter()
.zip(vec2.par_iter())
.map(|(i, j)| i * j)
.collect_into(&mut c);
collect_into()
first ensures that the vector has enough capacity for
the items in the iterator and then creates a particular consumer that,
for each item, will store it into the appropriate place in the vector.
We’re going to walk through a simplified version of the
collect_into()
consumer. This version will be specialized to vectors
of i32
values; moreover, it’s going to avoid any use of unsafe code
and just assume that the vector is initialized to the right length
(perhaps with 0
values). The real version works
for arbitrary types and avoids initialization by using a dab of unsafe
code (just about the only unsafe code in the parallel iterators part
of Rayon, actually).
Let’s start with the type definitions for the consumer, folder, and reducer. They look like this:
struct I32CollectVecConsumer<'c> {
data: &'c mut [i32],
}
struct I32CollectVecFolder<'c> {
data: &'c mut [i32],
index: usize,
}
struct I32SumReducer {
}
These type definitions kind of suggest to you an outline for this is
going to work. When the consumer starts, it has a mutable slice of
integers that it will eventually store into (the &'c mut [i32]
); the
lifetime 'c
here represents the span of time in which the collection
is happening. Remember that in Rust a mutable reference is also a
unique reference, which means that we don’t have to worry about
other threads reading or messing with our array while we store into
it.
When the time comes to switch to the folder, we still have a slice to store into, but now we also have an index. That tracks how many items we have stored thus far.
Finally, the reducer struct is empty, because once the values are stored, there really isn’t any data to reduce. For collect, the reduction step will just be a no-op.
OK, let’s see how the consumer trait is defined. The idea here is
simple: each time the consumer is split at some index N
, it splits
its mutable slice into two halves at N
, and returns two consumers, one with
each half:
impl<'c> Consumer for I32VecCollectConsumer<'c> {
type Folder = I32VecCollectFolder<'c>;
type Reducer = I32VecCollectReducer;
// The "result" of a `collect_into()` is just unit.
// We are executing this for its side effects.
type Result = ();
fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
// Divide the slice into two halves at `index`:
let (left, right) = self.data.split_at_mut(index);
// Construct the new consumers:
(I32VecCollectConsumer { data: left },
I32VecCollectConsumer { data: right },
I32VecCollectReducer { })
}
// When we convert to a folder, give over the slice and start
// the index at 0.
fn into_folder(self) -> Self::Folder {
I32VecCollectFolder { data: self.data, index: 0 }
}
}
The folder trait is also pretty simple. Each time we consume a new
integer, we’ll store it into the slice and increment index
:
impl Folder<i32> for I32SumFolder {
type Result = ();
fn consume(self, item: i32) -> Self {
self.data[self.index] = item;
I32CollectVecFolder { data: self.data, index: self.index + 1 }
}
fn complete(self) {
}
}
Finally, since collect_into()
has no result, the “reduction” step
is just a no-op:
impl Reducer<()> for I32CollectVecFolder {
fn reduce(self, _left: (), _right: ()) {
}
}
Conclusion
This post continued our explanation of how Rayon’s parallel iterators
work. Whereas the previous post introduced parallel
producers, this post showed how we can abstract out parallel
consumers as well. Parallel consumers basically represent the
“parallel actions” at the end of a parallel iterator, like sum()
or
collect()
.
Using parallel consumers allows us to have one common routine,
bridge_producer_consumer()
, that is used to draw items from a
producer and feed them to a consumer. This routine thus defines
precisely the parallel logic itself, independent from any particular
parallel iterator. In future posts, we’ll discuss a bit how that same
routine can also use some adaptive techniques to try and moderate
splitting overhead automatically and dynamically.
I want to emphasize something about this post and the previous one:
you may have noticed a general lack of unsafe code. One of the very
cool things about Rayon is that the vast majority of the unsafety is
confined to the join()
implementation. For the most part, the
parallel iterators just build on this new abstraction.
It is hard to overstate the benefits of confining unsafe code in this
way. For one thing, I’ve caught a lot of bugs in the iterator code I
was writing. But even better, it means that it is relatively easy to
unit test and review parallel iterator PRs. We don’t have to worry
about crazy data-race bugs that only crop up if we test for hours and
hours. It’s enough to just make sure we use a variant of
bridge_producer_consumer()
that splits very deeply, so that we test
the split/recombine logic.