Office Hours #1: Cyclic services

24 September 2018

This is a report on the second “office hours”, in which we discussed how to set up a series of services or actors that communicate with one another. This is a classic kind of problem in Rust: how to deal with cyclic data. Usually, the answer is that the cycle is not necessary (as in this case).

The setup

To start, let’s imagine that we were working in a GC’d language, like JavaScript. We want to have various “services”, each represented by an object. These services may need to communicate with one another, so we also create a directory, which stores pointers to all the services. As each service is created, it adds itself to the directory; when everything is set up, each service can access all other services. The setup might look something like this:

function setup() {
  var directory = {};
  var service1 = new Service1(directory);
  var service2 = new Service2(directory);
  return directory;
}

function Service1(directory) {
  this.directory = directory;
  directory.service1 = this;
  // ...
}

function Service2(directory) {
  this.directory = directory;
  directory.service2 = this;
  // ...
}

“Transliterating” the setup to Rust directly

If you try to translate this to Rust, you will run into a big mess. For one thing, Rust really prefers for you to have all the pieces of your data structure ready when you create it, but in this case when we make the directory, the services don’t exist. So we’d have to make the struct use Option, sort of like this:

struct Directory {
    service1: Option<Service1>,
    service2: Option<Service2>,
}

This is annoying though because, once the directory is initialized, these fields will never be None.
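To make the annoyance concrete, here is a minimal sketch of what using such a directory might look like. The `name` fields, the accessor methods, and the `setup` function are hypothetical placeholders, not part of the original design:

```rust
// Hypothetical placeholder services; the `name` field is for illustration only.
struct Service1 {
    name: &'static str,
}

struct Service2 {
    name: &'static str,
}

struct Directory {
    service1: Option<Service1>,
    service2: Option<Service2>,
}

impl Directory {
    // Every accessor has to handle a `None` that can no longer
    // occur once setup has finished.
    fn service1(&self) -> &Service1 {
        self.service1.as_ref().expect("service1 not initialized")
    }

    fn service2(&self) -> &Service2 {
        self.service2.as_ref().expect("service2 not initialized")
    }
}

fn setup() -> Directory {
    // The directory has to exist before the services do,
    // so it starts out empty and is filled in afterwards.
    let mut directory = Directory {
        service1: None,
        service2: None,
    };
    directory.service1 = Some(Service1 { name: "service1" });
    directory.service2 = Some(Service2 { name: "service2" });
    directory
}
```

Note that this sketch does not even give the services a pointer back to the directory yet; it only shows the cost of the two-phase initialization.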

And of course there is a deeper problem: who is the “owner” in this cyclic setup? How are we going to manage the memory? With a GC, there is no firm answer to this question: the entire cycle will be collected at the end, but until then each service keeps every other service alive.

You could set up something with Arc (atomic reference counting) in Rust that has a similar flavor. For example, the directory might have an Arc to each service and the services might have weak refs back to the directory. But Arc really works best when the data is immutable, and we want services to have state. We could solve that with atomics and/or locks, but at this point we might want to step back and see if there is a better way. Turns out, there is!
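For comparison, here is a rough sketch of what the Arc-based version might look like. It assumes a single hypothetical `Service` type with a `counter` field standing in for service state, and a hypothetical `bump_service2` method; none of these names come from the original discussion:

```rust
use std::sync::{Arc, Mutex, Weak};

// Hypothetical service: `counter` stands in for whatever state it has.
struct Service {
    directory: Weak<Directory>,
    counter: Mutex<u32>,
}

struct Directory {
    // Still `Option`, because the services do not exist yet
    // when the directory is created.
    service1: Mutex<Option<Arc<Service>>>,
    service2: Mutex<Option<Arc<Service>>>,
}

fn setup() -> Arc<Directory> {
    let directory = Arc::new(Directory {
        service1: Mutex::new(None),
        service2: Mutex::new(None),
    });
    let new_service = || {
        Arc::new(Service {
            // Weak back-pointer, so the cycle does not leak.
            directory: Arc::downgrade(&directory),
            counter: Mutex::new(0),
        })
    };
    *directory.service1.lock().unwrap() = Some(new_service());
    *directory.service2.lock().unwrap() = Some(new_service());
    directory
}

impl Service {
    // Reaches through the directory and mutates service2's state directly.
    // Returns `None` if the directory is gone or service2 is not registered.
    fn bump_service2(&self) -> Option<u32> {
        let directory = self.directory.upgrade()?;
        let service2 = directory.service2.lock().unwrap().clone()?;
        let mut counter = service2.counter.lock().unwrap();
        *counter += 1;
        Some(*counter)
    }
}
```

It works, but every piece of state now sits behind a lock, and any service can reach into any other service's internals.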

Translating the setup to Rust without cycles

Our base assumption was that each service in the system needed access to one another, since they will be communicating. But is that really true? These services are actually going to be running on different threads: all they really need to be able to do is to send each other messages. In particular, they don’t need access to the private bits of state that belong to each service.

In other words, we could rework our directory so that – instead of having a handle to each service – it only has a handle to a mailbox for each service. It might look something like this:

#[derive(Clone)]
struct Directory {
  service1: Sender<Message1>,
  service2: Sender<Message2>,
}

/// Whatever kind of message service1 expects.
struct Message1 { .. }

/// Whatever kind of message service2 expects.
struct Message2 { .. }

What is this Sender type? It is part of the channels that ship in Rust’s standard library. The idea of a channel is that when you create it, you get back two “entangled” values: a Sender and a Receiver. You send values on the sender and then you read them from the receiver; moreover, the sender can be cloned many times (the receiver cannot).
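As a small illustration of those properties, here is a sketch using `std::sync::mpsc`. The function name `collect_from_workers` is made up for this example:

```rust
use std::sync::mpsc::channel;
use std::thread;

// Spawns `workers` threads, each of which sends its own index through
// a clone of the same Sender. Returns everything the Receiver saw, sorted.
fn collect_from_workers(workers: u32) -> Vec<u32> {
    let (sender, receiver) = channel();

    let mut handles = Vec::new();
    for id in 0..workers {
        // Each clone is another handle feeding the same receiver.
        let sender = sender.clone();
        handles.push(thread::spawn(move || {
            sender.send(id).unwrap();
        }));
    }

    // Drop the original sender; otherwise the iteration below
    // would never end, since one sender would remain alive.
    drop(sender);

    // Iteration stops once every sender has been dropped.
    let mut received: Vec<u32> = receiver.iter().collect();
    for handle in handles {
        handle.join().unwrap();
    }

    // Arrival order across threads is not deterministic, so sort.
    received.sort();
    received
}
```

There is a single receiver, but as many senders as we care to clone.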

The idea here is that, when you start your actor, you create a channel to communicate with it. The actor takes the Receiver and the Sender goes into the directory for other services to use.

Using channels, we can refactor our setup. We begin by making the channels for each actor. Then we create the directory, once we have all the pieces it needs. Finally, we can start the actors themselves:

fn make_directory() {
  use std::sync::mpsc::channel;

  // Create the channels
  let (sender1, receiver1) = channel();
  let (sender2, receiver2) = channel();

  // Create the directory
  let directory = Directory {
    service1: sender1,
    service2: sender2,
  };

  // Start the actors
  start_service1(&directory, receiver1);
  start_service2(&directory, receiver2);
}

Starting a service looks kind of like this:

fn start_service1(directory: &Directory, receiver: Receiver<Message1>) {
  // Get a handle to the directory for ourselves.
  // Note that cloning a sender just produces a second handle
  // to the same receiver.
  let directory = directory.clone();

  std::thread::spawn(move || {
    // For each message received on `receiver`...
    for message in receiver {
      // ... process the message. Along the way,
      // we might send a message to another service:
      match directory.service2.send(Message2 { .. }) {
        Ok(()) => { /* message successfully sent */ }
        Err(_) => { /* service2 thread has crashed or otherwise stopped */ }
      }
    }
  });
}

This example also shows off how Rust channels know when their counterparts are valid (they use ref-counting internally to manage this). So, for example, we can iterate over a Receiver to get every incoming message: once all senders are gone, we will stop iterating. Beware, though: in this case, the directory itself holds one of the senders, so we need some sort of explicit message to stop the actor.
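One way to get that explicit stop message is to make each message type an enum with a shutdown variant. Here is a minimal end-to-end sketch under that assumption. The `Ping`, `Pong`, and `Shutdown` variants, the doubling and summing behavior, and the `run` function are all invented for illustration:

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};

// Hypothetical message types with an explicit stop message.
enum Message1 {
    Ping(u32),
    Shutdown,
}

enum Message2 {
    Pong(u32),
    Shutdown,
}

#[derive(Clone)]
struct Directory {
    service1: Sender<Message1>,
    service2: Sender<Message2>,
}

// Service 1 answers each Ping by sending a doubled Pong to service 2.
fn start_service1(directory: &Directory, receiver: Receiver<Message1>) -> JoinHandle<()> {
    let directory = directory.clone();
    thread::spawn(move || {
        for message in receiver {
            match message {
                Message1::Ping(n) => {
                    // An error here would mean service 2 has stopped.
                    let _ = directory.service2.send(Message2::Pong(n * 2));
                }
                // Our own directory clone keeps a sender alive,
                // so the loop only ends when we break.
                Message1::Shutdown => break,
            }
        }
    })
}

// Service 2 sums what it receives. It sends nothing in this sketch,
// so it does not keep a directory handle.
fn start_service2(receiver: Receiver<Message2>) -> JoinHandle<u32> {
    thread::spawn(move || {
        let mut total = 0;
        for message in receiver {
            match message {
                Message2::Pong(n) => total += n,
                Message2::Shutdown => break,
            }
        }
        total
    })
}

fn run() -> u32 {
    let (sender1, receiver1) = channel();
    let (sender2, receiver2) = channel();
    let directory = Directory { service1: sender1, service2: sender2 };

    let handle1 = start_service1(&directory, receiver1);
    let handle2 = start_service2(receiver2);

    directory.service1.send(Message1::Ping(1)).unwrap();
    directory.service1.send(Message1::Ping(2)).unwrap();

    // Stop service 1 first and wait for it, so that all of its Pongs
    // are queued before service 2 sees its own Shutdown.
    directory.service1.send(Message1::Shutdown).unwrap();
    handle1.join().unwrap();
    directory.service2.send(Message2::Shutdown).unwrap();
    handle2.join().unwrap()
}
```

The shutdown order matters here: stopping service 2 first could drop Pongs that service 1 has not sent yet.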

Similarly, when you send a message on a Rust channel, it knows if the receiver has gone away. If so, send will return an Err value, so you can recover (e.g., maybe by restarting the service).
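Here is a small sketch of that failure path. It relies on the fact that the standard library's `SendError` hands the unsent message back; the helper names `send_or_return` and `demo` are hypothetical:

```rust
use std::sync::mpsc::{channel, SendError, Sender};

// Tries to send `message`. If the receiver is gone, returns the unsent
// message to the caller, who could then restart the service and retry.
fn send_or_return(sender: &Sender<String>, message: String) -> Result<(), String> {
    sender.send(message).map_err(|SendError(unsent)| unsent)
}

// Sends once while the receiver is alive and once after dropping it.
fn demo() -> (Result<(), String>, Result<(), String>) {
    let (sender, receiver) = channel();
    let while_alive = send_or_return(&sender, "first".to_string());

    // Dropping the receiver simulates the service having stopped.
    drop(receiver);
    let after_stop = send_or_return(&sender, "second".to_string());

    (while_alive, after_stop)
}
```

Getting the message back means nothing is lost if you decide to restart the service and resend.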

Implementing our own (very simple) channels

Maybe it’s interesting to peer “beneath the hood” a bit into channels. It also gives some insight into how to generalize what we just did into a pattern. Let’s implement a very simple channel, one with a fixed capacity of 1 and without all the error recovery business of counting senders and receivers and so forth.

Note: If you’d like to just view the code, click here to view the complete example on the Rust playground.

To start with, we need to create our Sender and Receiver types. We see that each of them holds onto a shared value, which contains the actual state (guarded by a mutex):

use std::sync::{Arc, Condvar, Mutex};

pub struct Sender<T: Send> {
  shared: Arc<SharedState<T>>
}

pub struct Receiver<T: Send> {
  shared: Arc<SharedState<T>>
}

// Hidden shared state, not exposed
// to end-users
struct SharedState<T: Send> {
  value: Mutex<Option<T>>,
  condvar: Condvar,
}

To create a channel, we make the shared state, and then give the sender and receiver access to it:

fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
  let shared = Arc::new(SharedState {
    value: Mutex::new(None),
    condvar: Condvar::new(),
  });
  let sender = Sender { shared: shared.clone() };
  let receiver = Receiver { shared };
  (sender, receiver)
}

Next, we can implement send on the sender. It will try to store the value into the mutex, blocking so long as the slot is already full (that is, while the value is Some):

impl<T: Send> Sender<T> {
  pub fn send(&self, value: T) {
    let mut shared_value = self.shared.value.lock().unwrap();
    loop {
      if shared_value.is_none() {
        *shared_value = Some(value);
        self.shared.condvar.notify_all();
        return;
      }

      // wait until the receiver reads
      shared_value = self.shared.condvar.wait(shared_value).unwrap();
    }
  }
}

Finally, we can implement receive on the Receiver. This just waits until the shared.value field is Some, in which case it overwrites it with None and returns the inner value:

impl<T: Send> Receiver<T> {
  pub fn receive(&self) -> T {
    let mut shared_value = self.shared.value.lock().unwrap();
    loop {
      if let Some(value) = shared_value.take() {
        self.shared.condvar.notify_all();
        return value;
      }

      // wait until the sender sends
      shared_value = self.shared.condvar.wait(shared_value).unwrap();
    }
  }
}

Again, here is a link to the complete example on the Rust playground.
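To see the pieces working together, here is a condensed, self-contained sketch of the same channel plus a small driver. The `round_trip` function is a hypothetical addition, and `send` is written with a `while` loop, which should behave the same as the version above:

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

struct SharedState<T: Send> {
    value: Mutex<Option<T>>,
    condvar: Condvar,
}

pub struct Sender<T: Send> {
    shared: Arc<SharedState<T>>,
}

pub struct Receiver<T: Send> {
    shared: Arc<SharedState<T>>,
}

pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
    let shared = Arc::new(SharedState {
        value: Mutex::new(None),
        condvar: Condvar::new(),
    });
    (Sender { shared: shared.clone() }, Receiver { shared })
}

impl<T: Send> Sender<T> {
    pub fn send(&self, value: T) {
        let mut slot = self.shared.value.lock().unwrap();
        // Wait while the single slot is still occupied.
        while slot.is_some() {
            slot = self.shared.condvar.wait(slot).unwrap();
        }
        *slot = Some(value);
        self.shared.condvar.notify_all();
    }
}

impl<T: Send> Receiver<T> {
    pub fn receive(&self) -> T {
        let mut slot = self.shared.value.lock().unwrap();
        loop {
            // `take` empties the slot and hands us the value, if any.
            if let Some(value) = slot.take() {
                self.shared.condvar.notify_all();
                return value;
            }
            slot = self.shared.condvar.wait(slot).unwrap();
        }
    }
}

// Hypothetical driver: one thread sends 0..count, the caller receives them.
fn round_trip(count: u32) -> Vec<u32> {
    let (sender, receiver) = channel();
    let producer = thread::spawn(move || {
        for i in 0..count {
            // Blocks whenever the previous value has not been read yet.
            sender.send(i);
        }
    });
    let received: Vec<u32> = (0..count).map(|_| receiver.receive()).collect();
    producer.join().unwrap();
    received
}
```

With one sender and a single slot, the values arrive in the order they were sent.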

Dynamic set of services

In our example thus far we used a static Directory struct with fields. We might like to change to a more flexible setup, in which the set of services grows and/or changes dynamically. To do that, I would expect us to replace the directory with a HashMap mapping from some kind of service name to a Sender for that service. We might even want to put that directory behind a mutex, so that if one service panics, we can replace the Sender with a new one. But at that point we’re building up an entire actor infrastructure, and that’s too much for one post, so I’ll stop here. =)
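Just to give a flavor, here is a very rough sketch of such a dynamic directory. It assumes that every service accepts the same message type (a `String` here, so the map can hold one kind of Sender), and the `register` and `send` methods are hypothetical:

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

// Assumption: all services share one message type.
type Message = String;

#[derive(Clone, Default)]
struct Directory {
    services: Arc<Mutex<HashMap<String, Sender<Message>>>>,
}

impl Directory {
    // Registers the service called `name`, replacing any previous
    // entry, and returns the receiver the new service should read from.
    fn register(&self, name: &str) -> Receiver<Message> {
        let (sender, receiver) = channel();
        self.services
            .lock()
            .unwrap()
            .insert(name.to_string(), sender);
        receiver
    }

    // Sends `message` to the named service. Returns false if no such
    // service is registered or if its receiver has gone away.
    fn send(&self, name: &str, message: Message) -> bool {
        // Clone the sender so the lock is not held while sending.
        let sender = self.services.lock().unwrap().get(name).cloned();
        match sender {
            Some(sender) => sender.send(message).is_ok(),
            None => false,
        }
    }
}
```

Replacing a crashed service is then a matter of calling `register` again with the same name and starting a fresh thread on the new receiver.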

Generalizing the pattern

So what was the general lesson here? It often happens that, when writing in a GC’d language, we get accustomed to lumping all kinds of data together, and then knowing what data we should and should not touch. In our original JS example, all the services had a pointer to the complete state of one another – but we expected them to just leave messages and not to mutate the internal variables of other services. Rust is not so trusting.

In Rust, it often pays to separate out the “one big struct” into smaller pieces. In this case, we separated out the “message processing” part of a service from the rest of the service state. Note that when we implemented this message processing – e.g., our channel impl – we still had to use some caution. We had to guard the data with a lock, for example. But because we’ve separated the rest of the service’s state out, we don’t need to use locks for that, because no other service can reach it.

This case had the added complication of a cycle and the associated memory management headaches. It’s worth pointing out that even in our actor implementation, the cycle hasn’t gone away. It’s just reduced in scope. Each service has a reference to the directory, and the directory has a reference to the Sender for each service. You can see this if you have your service iterate over all the messages from its receiver (as we did):

for msg in self.receiver { .. }

This loop will continue until all of the senders associated with this Receiver go away. But the service itself has a reference to the directory, and that directory contains a Sender for this receiver, so this loop will never terminate – unless we explicitly break. This isn’t too big a surprise: Actor lifetimes tend to require “active management”. Similar problems arise in GC systems when you have big cycles of objects, as they can easily create leaks.
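You can observe the difference between “no message yet” and “no senders left” with `try_recv`. In this sketch, the `directory_copy` variable stands in for the Sender stored in the directory, and the function name is made up:

```rust
use std::sync::mpsc::{channel, TryRecvError};

// Checks what the receiver reports while a stand-in for the directory's
// Sender is still alive, and again after it has been dropped.
fn status_before_and_after_drop() -> (TryRecvError, TryRecvError) {
    let (sender, receiver) = channel::<u32>();

    // Stands in for the Sender held by the directory.
    let directory_copy = sender.clone();
    drop(sender);

    // A sender still exists, so the channel is merely empty;
    // a `for` loop over the receiver would keep waiting here.
    let before = receiver.try_recv().unwrap_err();

    // Once the last sender is gone, the channel is disconnected
    // and a `for` loop over the receiver would end.
    drop(directory_copy);
    let after = receiver.try_recv().unwrap_err();

    (before, after)
}
```

As long as the service holds its own directory clone, it will only ever see the first state, which is why an explicit break is needed.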