Blog Index

IRPC

by Rüdiger Klaehn

IRPC - a lightweight rpc crate for iroh connections

When writing async rust code such as iroh protocols, you will frequently use message passing to communicate between independent parts of your code.

You will start by defining a message enum that contains the different requests your task is supposed to handle, and then write a loop inside the handler task, like a very primitive version of an actor.

Let's do a simple example, an async key value store, with just Set and Get.

enum Request {
  Set {
    key: String,
    value: String,
    response: oneshot::Sender<()>,
  }
  Get {
    key: String,
    response: oneshot::Sender<Option<String>>,
  }
}

Your "client" then is a tokio mpsc::Sender<Command> or a small wrapper around it that makes it more convenient to use. And your server is a task that contains a handler loop.

Calling such a service is quite cumbersome, e.g. calling Get:

let (tx, rx) = oneshot::channel();
client.send(Command::Get { key: "a".to_string(), response: tx }).await?;
let res = rx.await?;

So you will usually write a client struct that is a newtype wrapper around the mpsc Sender to add some syntax candy:

struct Client(mpsc::Sender<Request>);
impl Client {
  ...
  async fn get(&self, key: String) -> Result<Option<String>> {
    let (tx, rx) = oneshot::channel();
    self.0.send(Request::Get { key, response: tx }).await?;
    Ok(rx.await??)
  }
  ...
}

If you want to have some more complex requests, no problem. E.g. here is how a request would look like to add an entry from a stream:

enum Request {
  ...
  SetFromStrean {
    key: String,
    value: mpsc::Receiver<String>,
    response: oneshot::Sender<()>,
  }
  ...
}

Or a request that gets a value as a stream:

enum Request {
  ...
  GetAsStream {
    key: String,
    response: mpsc::Sender<Result<String>>,
  }
  ...
}

You already have an async boundary and a message passing based protocol, so it seems like it would be easy to also use this protocol across a process boundary. But you still want to retain the ability to use it in-process with zero overhead.

To cross a process boundary, the commands have to be serializable. But the response or update channels are not. We need to separate the message itself and the update and response channels.

At this point things start to get quite verbose:

#[derive(Serialize, Deserialize)]
struct GetRequest {
  key: String,
}

#[derive(Serialize, Deserialize)]
struct SetRequest {
  key: String,
  value: String,
}

/// the serializable request. This is what the remote side reads first to know what to do
#[derive(Serialize, Deserialize)]
enum Request {
  Get(GetRequest),
  Set(SetRequest),
}

/// the full request including response channels. This is what is used in-process.
enum RequestWithChannels {
  Get { request: GetRequest, response: oneshot::Sender<String> },
  Set { request: SetRequest, response: oneshot::Sender<()> },
}

impl From<RequestWithChannels> for Request { ... }

How does the actual cross process communication look like, for example for get? Let's use postcard for serialization/deserialization:

async fn get_remote(connection: Connection, key: String) -> Result<Option<String>> {
  let (send, recv) = connection.open_bi().await?;
  send.write_all(postcard::to_stdvec(GetRequest { key })?).await?;
  let res = recv.read_to_end(1024).await?;
  let res = postcard::from_bytes(&res)?;
  Ok(res)
}

The server side looks similar. We read a Request from an incoming connection, then based on the enum case decide which request we need to handle:

async fn server(connection: Connection, store: BTreeMap<String, String>) -> Result<()> {
  while let Ok((send, recv)) = connection.accept_bi().await {
    let request = recv.read_to_end(1024).await?;
    let request: Request = postcard::from_bytes(&request)?;
    match request {
      Request::Get(GetRequest { key }) => {
        let response = store.get(key);
        let response = postcard::to_stdvec(&response)?;
        send.write_all(&response).await?;
        send.finish();
      }
      ...
    }

  }
}

This works well for simple requests where there is no update channel and just a single response. But we also want to support requests with updates like SetFromStrean and requests with stream responses like GetAsStream.

To support this efficiently, it is best to length prefix both the initial request, subsequent updates, and responses. Even if a Request "knows" its own size, deserializing from an async stream is very inefficient.

Now we have a protocol that supports different rpc types (rpc, client streaming, server streaming, bidi streaming) and that can be used both locally (via the FullRequest enum) and remotely.

But we said that we wanted to be able to seamlessly switch between remote or local. So let's do that (length prefixes omitted):

enum Client {
  Local(mpsc::Sender<FullRequest>),
  Remote(quinn::Connection),
}

impl Client {
  async fn get(&self, key: String) -> Result<Option<String>> {
    let request = GetRequest { key };
    match self {
      Self::Local(chan) => {
        let (tx, rx) = oneshot::channel();
        let request = FullRequest { request, response: tx };
        chan.send(request).await?;
        Ok(rx.await??)
      }
      Self::Remote(conn) => {
        let (send, recv) = connection.open_bi().await?;
        send.write_all(postcard::to_stdvec(request)?).await?;
        let res = recv.read_to_end(1024).await?;
        let res = postcard::from_bytes(&res)?;
        Ok(res)
      }
    }
  }
}

This is all pretty straightforward code, but very tedious to write, especially for a large and complex protocol.

There is some work that we can't avoid. We have to define the different request types. We have to specify for each request type if there is no response, a single resposne, or a stream of responses. We also have to specify if there is a stream of updates, and make sure that all these types (requests, updates and responses) are serializable, which can sometimes be a pain when it comes to error types.

But what about all this boilerplate?

  • Defining the two different enums for a serializable request and a full request including channels
  • Implementing a client with async fns for each request type
  • Implementing a server that reads messages and dispatches on them
  • serializing and deserializing using postcard with length prefixes

The irpc crate is meant solely to reduce the tedious boilerplate involved in writing the above manually.

It does not abstract over the connection type - it only supports [iroh-quinn] send- and receive streams out of the box, so the only two possible connection types are iroh p2p QUIC connections and normal QUIC connections. It also does not abstract over the local channel type - a local channel is always a tokio mpsc channel. Serialization is always postcard, and length prefixes are always postcard varints.

So let's take a look how the kv service looks using irpc:

The service definition contains just what is absolutely needed. For each request type we have to define what the response item type is (in this case String or ()), and what the response channel type is (none, oneshot or mpsc).

The rpc_requests macro will store this information and also create the RequestWithChannels enum that adds the appropriate channels for each request type. It will also generate a number of From-conversions to make working with the requests more pleasant.

struct KvService {}
impl Service for KvStoreService {}

#[rpc_requests(KvService, message = RequestWithChannels)]
#[derive(Serialize, Deserialize)]
enum Request {
  #[rpc(tx=oneshot::Sender<String>)]
  Get(GetRequest),
  #[rpc(tx=oneshot::Sender<()>)]
  Put(PutRequest),
}

Now let's look at the client:

struct Client(irpc::Client<RequestWithChannels, Request, KvService>);
impl Client {
  fn get(&self, key: String) -> Result<Option<String>> {
    Ok(self.0.rpc(GetRequest { key }).await?)
  }
}

The fn rpc on irpc::Client will only be available for messages where the update channel is not set and the response channel is an oneshot channel, so you will get compile errors if you try to use a request in the wrong way.

Iroh is a dial-any-device networking library that just works. Compose from an ecosystem of ready-made protocols to get the features you need, or go fully custom on a clean abstraction over dumb pipes. Iroh is open source, and already running in production on hundreds of thousands of devices.
To get started, take a look at our docs, dive directly into the code, or chat with us in our discord channel.