Data Parallelism in Rust

11 June 2013

Rust currently has very strong support for concurrency in the form of actors which exchange messages and do not share memory. However, there are many tasks for which actors are not a good fit. The unbounded lifetime of actors means that they cannot safely access stack-allocated memory from another task, even if it is immutable. Actors cannot share memory except through the relatively clumsy (and somewhat expensive) mechanism of Arc structures (which stands for “atomic reference count”), meaning that if there are large data structures they can be a pain to access. Arc is also inapplicable to data structures that transition between mutable and immutable and back again.

I was recently pointed to the paper “Three layer cake for shared-memory programming”, which describes perfectly the direction in which I would like to see Rust go. Message passing is retained at the top-level, but within an actor, you have support for fork-join concurrency. I’ve been calling these fork-join tasks “jobs”. Within a job, you can get yet more concurrency via SIMD primitives.

To that end, I’ve been working on promising design for fork-join concurrency in Rust. I am very pleased both because the API looks like it will be simple, flexible, and easy to use, and because we are able to statically guarantee data-race freedom even with full support for shared memory with only minimal, generally applicable modifications to the type system (closure bounds, a few new built-in traits). The scheme also requires no changes to the runtime; it can be implemented simply as a library.

In particular, the existing borrow checker rules, which were aimed at preventing dangling pointers, turn out to be extremely well-suited to this task. I find this very interesting and very heartening as well, and I think it points to a kind of deeper analogy between memory errors in sequential programs and data races in parallel programs. I will elaborate on this theory in a later post.

The API

The API is based on a fork-join model. There are a number of helper functions, each of which starts up a number of parallel jobs and returns when those jobs have completed. The API builds on Rust’s type system to statically guarantee data-race freedom. Under typical usage, in fact, the API even guarantees deterministic results, though this guarantee can be voided by making use of locks, ports, and certain other advanced types.

I have adopted the term job to distinguish the lightweight parallel tasks that from Rust’s normal tasks. Although both can be used for parallel execution, they are quite different in most other respects. For example, jobs all share the same memory space as their parent task, whereas tasks are strictly isolated from one another. Jobs also have a fixed lifetime, whereas tasks run asynchronously. In general, executing a parallel job is supposed to act exactly as though the text of that job were inlined into the task body, modulo certain observable timing differences (and nondeterminism around locks and message ordering).

To some extent, the API that I’m going to discuss is a strawman. Expect changes to the details, of course. Also, I only present the primitive operations here; there will be a number of higher-level wrappers for common operations (like parallel map and reduce and so forth). In keeping with the “three layer cake” idea, these higher-level primitives may employ SIMD as well.

Parallel execute

The most primitive method in Parallel arsenal is parallel::execute(jobs), which takes an array of closures and executes the closures in parallel jobs (well, at least potentially—it may opt to run them sequentially if insufficient parallel resources exist). Once these jobs complete, execute() returns.

These closures are permitted to share memory and capture values by reference. We will see later that the type checker will validate that if one of these jobs writes to a particular value, then none of the other jobs can access it; you are allowed however to access the same data so long as all jobs treat it as immutable.

To make the usage more concrete, here is an example that uses parallel::execute to sum up the values in a binary tree in parallel. Note that no heap allocation is required at any point in the iteration, which is nice from a performance point of view.

struct Tree {
  val: uint,
  left: Option<~Tree>,
  right: Option<~Tree>,
}

fn sum_tree(tree: &Tree) -> uint {
  let mut left_sum = 0;
  let mut right_sum = 0;
  
  parallel::execute([
    || left_sum = sum_opt_tree(&tree.left),
    || right_sum = sum_opt_tree(&tree.right),
  ]);
  
  left_sum + right_sum + tree.val
}

fn sum_opt_tree(tree: &Option<~Tree>) -> uint {
  match tree {
     Some(~ref t) => sum_tree(t),
     None => 0,
  }
}

The type of execute() is:

fn execute(jobs: &[fn:Isolate()])

Here the fn:Isolate() means “a closure that only closes over state that is either shared or isolated from other threads”. I’ll explain this bound and the other details of static safety checking shortly.

Parallel divide

The other primitive parallel operation is divide(), which takes a mutable slice and a closure. It will divide up the slice into a number of subslices and invoke your closure on it; the precise means that it uses to divide the subslice is unspecified. You can also configure divide() with the minimum granularity that it should use when dividing the array (for example, you might prefer to be called back with subslices that are a multiple of 10 items).

In a way, execute() is more fundamental than divide(), as you can implement divide() using execute() by recursively dividing the array in half. However, I choose to call divide() a second primitive because it can be far more efficient to divide the array using other strategies, such as determining how many worker threads are available and just chopping the slice into N equal parts (of course this will depend on the workload).

The reason that divide() provides you with a mutable subslice, rather than say a pointer to an individual element, is that often there is setup work that can be shared between consecutive elements when processing an array. Consider the following example, which treates a vector v as a 2d array, and invokes the closure f in parallel with each element in the array and its x and y coordinate. In this case, the initial computation of x and y from the index in v is relatively expensive, as it requires division, but updating x and y is very cheap. Using divide(), the initial division can be amortized over a large subslice.

fn update_two_dim<T>(v: &mut [T],
                     width: uint,
                     f: fn:Share(&mut T, x: uint, y: uint)) {
  assert!(v.len() % width == 0);                     
  do parallel::divide(array, 1) |slice, offset| {
    let mut (x, y) = (offset % width, offset / width);
    for slice.each_mut |p| {
      update(p, x, y);
      x += 1;
      if x == width {
        x = 0;
        y += 1;
      }
    }
  }
}

The type of divide() is:

fn divide<T>(data: &mut [T],
             granularity: uint,
             job: fn:Share(&mut [T], uint))

You’ll note that the type of the closure is different from execute(). Whereas before we had fn:Isolate, for divide() we have fn:Share. This is in fact a tighter bound that only permits content that can safely be accessed in parallel by multiple threads. The difference is due to the fact that, with execute(), each closure got its own parallel job, but with divide(), the same closure will be called simultaneously from multiple jobs.

Checking safety

So how can we guarantee that these APIs are used safely? For example, what ensures that the user doesn’t write a program like this one (variations on this example will serve as examples through this section):

fn compute_foo_and_bar(dataset: &[uint]) -> (uint, uint) {
    let mut foo = 0;
    let mut bar = 0;
    parallel::execute([
        || foo = compute_foo(dataset),
        || foo = compute_bar(dataset), // <-- Bug here!
    ]);
    (foo, bar)
}
fn compute_foo(dataset: &[uint]) -> uint { ... }
fn compute_bar(dataset: &[uint]) -> uint { ... }

In this example, the function compute_foo_and_bar() creates two closures, one of which creates compute_foo() and one of which invokes compute_bar(). The two closures run in parallel. However, there is a slight bug: both closures write to foo, though presumably the author meant for the second closure to write to bar. Therefore, this program will be rejected as racy: let’s see how the borrow checker comes to that conclusion.

Desugaring closures

In fact, this conclusion falls out of the normal borrow checker safety rules for closures (more accurately, it will fall out of those rules once I fix them). The way the borrow checker handles closures is essentially to “desugar” them into the pair of a struct that contains pointers and a fn pointer. To see what, let’s examine the first closure from our example in more detail:

|| foo = compute_foo(dataset)

If we wanted to model this closure more precisely, we could consider it as the pair of an environment struct and a function. It would look something like this:

struct FooEnv {
    foo: &mut uint,
    dataset: &[uint]
}

fn foo_fn(env: &mut TheEnv) {
    *env.foo = compute_foo(env.dataset);
}

This means that the call to parallel::execute would look something like this:

parallel::execute([
  // Roughly equivalent to `|| foo = compute_foo(dataset)`
  (&FooEnv {foo: &mut foo, dataset: dataset}, foo_fn),

  // Roughly equivalent to `|| bar = compute_bar(dataset)`
  (&BarEnv {bar: &mut foo, dataset: dataset}, bar_fn),
])

This is in fact the kind of code that gets generated at runtime. The interesting thing about looking at closure creations this way is that we can apply the standard Rust borrowing rules to them. In particular, we see that there are two mut borrows of the variable foo, and those borrows have overlapping lifetimes, which is not permitted.

The role of closure bounds

As the previous example showed, the borrow checker’s normal rules already gives us many of the guarantees we require. We covered specifically how the borrow checker handles closures, but in general the borrow check guarantees that:

  • &mut T pointers are the only way to modify the memory that they point at (thus guaranteeing that if job A has an &mut pointer, no other job can be writing that memory);
  • &T pointers are immutable (thus guaranteeing that if job A has a &T pointer, no job can be writing to that memory).

However, there are several guarantees that the borrow checker does not provide which are unnecessary in a sequential context but become important when discussing data races:

  • Although &mut T pointers are the only way to mutate the memory they point at, they are not the only way to read the memory they point at. Both &const T and @mut T can produce read-only aliases; thus is one job holds an &mut T and another an &const T, the two jobs could race.
  • The Rust standard library includes a number of types that include “interior mutability”, meaning mutability inherent in the type itself, vs imposed from the outside. Examples are @mut, RcMut, and Cell. Many of these types are not threadsafe, with the notable exception of RwArc, which employs mutual exclusion.
  • Managed pointers like @T and @mut T are currently not safe to pass between parallel jobs, though in the case of @T this is an implementation limitation and not a theoretical one.

What all this means is that if the parallel::execute() accepted an array of any old closures, it could be sure that those closures would not race on any &mut T or &T values that they may have access to, but races could arise if those closures had access to &const T values (not to mention @mut T, Cell, and so on).

There is a similar danger for parallel::divide(). Unlike execute(), which accepts an array of closures and executes each one from its own parallel job, divide() accepts a single closure and it invokes that same closure many times in parallel. This means that if the closure were close over an &mut pointer, that same pointer would be available to many threads, and thus races could arise.

Enter closure bounds

We prevent both of these scenarios with closure bounds. A closure bound is a limitation on the kinds of values that a closure can contain in its environment. This is directly analogous to the bounds that appear on type parameters in generic functions. For example, if we declare a generic function f like so:

fn f<T:Freeze>(v: &[T]) { ... }

The bound :Freeze on T indicates that T may only be used with values that are freezable (which means “no interior mutability”, so e.g. @mut and Cell are excluded). Similarly, the type fn:Freeze() would indicate a function whose environment contained only freezable values.

As currently planned, closure bounds are less general than the bounds which appear on type parameters, in that they are limited to the “built-in” traits like Freeze and Send. These traits differ from other traits in that they offer no methods and you never explicitly implement them. The traits are used to describe properties of the data in question, like whether it may be mutable, and the compiler just decides automatically whether a given type belongs to this trait or not. Normally, the compiler would never consider a closure type fn() to be a member of such a trait, because it does not know what data the closure contains, but this does not necessarily apply to bounded closure types. For example, the type fn:Freeze() is itself a member of Freeze.

Perhaps in the future we can extend closure bounds to arbitrary traits. For example, a type like fn:Eq() might permit deep comparison of closure environments, or fn:Clone() could permit deep cloning. I haven’t thought deeply about this, though, and it would presumably require some sort of impl based on reflection.

Closure bounds apply to “desugared” environments

One thing that is not entirely obvious is that closure bounds refer to the desugared references found in a closure environment. Recall that a closure like || foo = compute_foo(dataset) in fact desugars to an environment where foo is represented as an &mut pointer:

struct FooEnv {
    foo: &mut uint,
    dataset: &[uint]
}

This means that such a closure would not be considered to meet the bound Freeze, because in its environment the field foo is an &mut pointer, which is not freezable. This is true even though the type of the variable foo is just uint, which does meet Freeze.

As a side-effect, a closure of type fn:Send() can only exist as a coercion from an environment-less function, since any true closure will have borrowed pointers of some kind in its environment, and they are not sendable.

Hat tip: Promising Young Intern bblum first expressed this view of closure bounds. It is much cleaner than the formulation I had before, which included some ad-hoc rules to achieve the same effect.

execute and the Isolate bound

You may recall that the parallel::execute() fn was declared as fn execute(jobs: &[fn:Isolate()]). As the type indicates, it accepts an array of closures that carry the Isolate bound. The Isolate bound accepts “all values that can be transmitted and isolated to a single parallel job”, which in practice means:

  • &mut T where T:Isolate
  • &T where T:Isolate
  • Scalar values like uint, int, etc
  • Structs, tuples, and enums if all components can be isolated, and the types are not internally mutable
  • The “atomic ref count” type Arc
  • The “mutex” type RwArc
  • Closures that can be isolated (i.e., fn:Isolate(T1, T2) -> T3, where T1, T2, and T3 need not meet any bound in particular)

Note that this list excludes both &const T and @mut T, as well as types like Cell, Rc, and RcMut that employ internal mutation. The list still includes a number of mutable types, however:

  • &mut T is safe because, as discussed previously, if one closure has access to an &mut T, then none of the other closures has access to that same value.
  • Arc is safe because its reference counters are maintained atomically.
  • RwArc is safe because it uses a mutex to guarantee that any mutation which occurs is always threadsafe.

The Isolate bound excludes all managed data. More discussion on this topic is found below.

divide and the Share bound

The divide function is declared as follows:

fn divide<T:Isolate>(data: &mut [T],
                     granularity: uint,
                     job: fn:Share(&mut [T], uint))

Note that the job closure requires the Share bound, which specifies data that can be safely shared amongst many parallel jobs (meaning, the same value can be accessed simultaneously by many jobs at once). The data that is being divided only requires the Isolate bound, since that data will never be accessible to multiple jobs at a time.

The definition of Share is a subset of Isolate that excludes &mut:

  • &T where T:Share
  • Scalar values like uint, int, etc
  • Structs, tuples, and enums if all components can be shared, and the types are not internally mutable
  • The “atomic ref count” type Arc
  • The “mutex” type RwArc
  • Closures that can be shared (i.e., fn:Share(T1, T2) -> T3, where T1, T2, and T3 need not meet any bound in particular)

Note that it is still safe to share RwArc (the mutex type).

Relationship of Isolate and Share to existing bounds

Rust already has two “built-in” bounds, Freeze and Send, which are both somewhat more strict that Isolate and Share (Freeze, for example, would reject RwArc, and Send would reject all &T or @T values). I think the full hierarchy is as follows:

Isolate
  Share
    Freeze
    Send

Here the indentation is intended to indicate an inclusion relationship. In other words, anything that meets Send or Freeze meets both Isolate and Share, but not vice versa. Similarly, anything that meets Share meets Isolate, but not vice versa.

Extending to permit sharing of managed data

In general we can implement a minimal version of this scheme with no changes to the runtime. The parallel job dispatching can be implemented as a library building on the existing scheduler. Unsafe code would be used to “leak” the closures outside of one task and into another. This is safe because we know that parent thread will be blocking, and thus the stack-allocated closures will remain valid; data-race-freedom is then guaranteed by the mechanisms I have already discussed. Once the jobs are executed over on the tasks, messages are sent back and the execute or divide function returns and permits the main thread to resume executing.

If we wanted to allow @T values to be passed to parallel jobs, we would have to make some deeper changes. First, this is incompatible with the non-atomic reference counting and cycle collection mechanism we use today. Even when we move to a tracing collector, though, we will have to generalize the collector to permit multiple parallel jobs to access in parallel. This is still a more limited setting than a full-on cross-process garbage collector, as you find in JVMs, but it will nonetheless add significant complexity. Similar issues arise in PJS; I have a blog post coming out soon that will discuss some of our plans in that regard, most of which apply equally to Rust, though not entirely, because the Rust model is more general than PJS.

Note that nothing prevents parallel jobs from allocating and using @T and other values internally. This is fine; we are guaranteed that these values cannot outlive the parallel job because the closure cannot closure over any location that could store such values, and the closure argument and return types must always meet the Isolate bound.

Summary

This post summarizes my current thinking about how to achieve ligher-weight data parallelism in Rust. The critical differences between the parallel jobs described here and the existing tasks are that jobs would have access to borrowed pointers and thus to data found in the stack frame and even, to some extent, managed heap of the parent task. Jobs would always have a fixed lifetime and follow a fork-join pattern. Parallel jobs as I describe form the “second layer” in the “three layer cake” of parallel programming, enabling fork-join parallelism while retaining deterministic semantics (modulo the use of RwArc). They can also form the basis for a number of higher-level APIs and combine well with future extensions to support SIMD.