Richard Grantham

Skill Issues; or, Building an Event Bus in Rust

Filed under Development, Rust

This is an article that’s been playing on my mind for quite a while, but I wasn’t sure how best to approach it. There are a couple of things I wanted to explore but finding the right angle was tricky. It also concerns a subject which was a little confronting and forced a bit of introspection. However, the end result was quite positive.

I enjoy watching The Primeagen’s stuff on YouTube when time allows. A few months back he discussed an article critical of Rust: the language, the syntax, the complexity. Everything, really. Actually, a lot of the points he made were totally accurate. Rust gets hard when you start doing asynchronous stuff. I felt a pang of smugness when looking at the guy’s function featuring the code

Pin<Box<dyn Future<Output = Result<R, String>> + Send + '_>> + Send

and recognising that it is basically redefining the BoxFuture type alias from the futures crate:

pub type BoxFuture<'a, T> = Pin<alloc::boxed::Box<dyn Future<Output = T> + Send + 'a>>;

I have used the LocalBoxFuture version, which doesn’t have the Send requirement. In fact, I created my own type around it:

pub type FutureResult<'a, T> = LocalBoxFuture<'a, eyre::Result<T>>;

Now, here comes the hubris. It’s not the same, is it? The lifetimes are different, and that matters. Lifetimes around asynchronous operations are a Hard Thing in Rust. I think I’ve got a pretty good handle on asynchronous programming, having done a lot of complicated stuff in Java. I enjoy working with the Vert.x toolkit, which is concurrent and asynchronous by design and makes you think and work in that world. I was able to transfer this experience to Rust, and once I got past the complications of the “Rust way” of doing stuff I had it all working. It was ugly though. I was still stuck in Javaland.
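To make the difference between the two aliases concrete, here is a minimal sketch that rebuilds them with the standard library only. The function names `greet`, `count` and `poll_once` are made up for illustration, and `poll_once` is a tiny stand-in for a real executor that only works for futures that are ready on the first poll.

```rust
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Std-only stand-ins for the aliases of the same name in the futures crate.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;

// The future borrows `name`, so 'a ties the future's lifetime to the borrow.
fn greet<'a>(name: &'a str) -> BoxFuture<'a, String> {
    Box::pin(async move { format!("Hello, {name}!") })
}

// Rc is not Send, so this future only fits the local alias.
// Changing the return type to BoxFuture makes the compiler reject it.
fn count(shared: Rc<Vec<u32>>) -> LocalBoxFuture<'static, usize> {
    Box::pin(async move { shared.len() })
}

// A waker that does nothing, enough to poll a future by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls once and returns the value if the future was immediately ready.
fn poll_once<F: Future + Unpin>(mut fut: F) -> Option<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    match Pin::new(&mut fut).poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}
```

The lifetime parameter is the part that bites: a future returned from `greet` cannot outlive the string it borrows, which is the kind of constraint that tends to get papered over with clone().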

Kent Beck coined the phrase “make it work, make it right, make it fast”, but I had stopped at the “make it work” part. It was fast, it was reusable, but it was still really ugly and didn’t fully conform to the Rust way of doing things. For one, I’d managed to completely avoid using the async/await construct. That wasn’t because I didn’t understand it or didn’t want to use it; it was down to skill issues, and to having problems with, and not wanting to use, the async-trait crate. Basically, I wasn’t good enough, I hadn’t read and experimented enough, and was still happy to write Rust that looked like Java.

I bring this up because what I really wanted to do was build an event bus in Rust. Why? I mean, there is no shortage of them on crates.io. It comes back to my appreciation of Vert.x. I like the event bus it provides and I wanted the same functions, and none of the event buses I looked at worked the same way. I had tried early on in the process of learning Rust to create it myself, but I just didn’t have the skills or the knowledge yet. The features I wanted were the three messaging styles covered in the rest of this article: request-response, point-to-point and publish.

I’d made a couple of attempts to do this before, but skill issues prevented me from getting very far. I just wasn’t good enough. I’m still not sure I’m good enough. However, this was the time to feel the fear and give it a real go anyway. The first question was how? I considered writing it using Tokio, but that was too low-level for a scared idiot like me. I actually wanted to use Actix actors. I already had it in my stack with the Web framework. Maybe I could make that work.

Setting Up the Event Bus

Okay, how do we start this? Let’s jump in and create the actor for the event bus. That should be relatively simple:

#[derive(Default)]
pub struct EventBus {
}

impl Actor for EventBus {
    type Context = Context<Self>;

    fn started(&mut self, _: &mut Self::Context) {}
}

impl SystemService for EventBus {}

impl Supervised for EventBus {}

Great! Done! But next lies the challenge, and one I had stumbled over during months of considering this project. How do I handle different types of message? I’m going to be sending all kinds of events to my event bus. How do I handle them all? I couldn’t realistically write handlers for all kinds of events. Even with a macro, this is a library and I wouldn’t be extending it when it is used. All types of event would need to be handled in the library, which means limiting the types of message that the event bus, and the handlers registered on the event bus, could receive.

The solution was actually staring me in the face. I remembered Vert.x’s default way of handling this: By serialising the messages to JSON. You can actually serialise things any which way you want provided you write the transcoding mechanism, but it’s simpler to stick with JSON. Once I’d finished kicking myself for not thinking of this sooner I set about an implementation.

#[derive(Message, Debug)]
#[rtype(result = "eyre::Result<Payload>")]
pub struct EventBusMessage {
    pub payload: Payload,
}

#[derive(Message, Clone, Debug, Eq, PartialEq)]
#[rtype(result = "()")]
pub enum Payload {
    Empty,
    String { value: String },
    Json {
        payload_type: String,
        value: String,
    },
}

type EventHandler = Recipient<EventBusMessage>;

This allowed me to define the EventHandler type as a Recipient of EventBusMessages. I could now complete my event bus struct.

#[derive(Default)]
pub struct EventBus {
    event_handlers_map: BTreeMap<String, VecDeque<(String, EventHandler)>>,
}

The event_handlers_map is a map of addresses to a list of EventHandlers at that address. Each entry is a tuple with a String so that I can name the handler for debugging purposes. A VecDeque is used to support the “non-strict round robin” method of implementing point-to-point and request-response messaging when more than one handler is registered at an address. Next I implemented registering an event handler with the event bus.

#[derive(new, Message, Debug)]
#[rtype(result = "()")]
pub struct Register {
    address: String,
    name: String,
    event_handler: EventHandler,
}

impl Handler<Register> for EventBus {
    type Result = ();

    fn handle(&mut self, msg: Register, _: &mut Self::Context) -> Self::Result {
        // Append to the queue for this address, creating the queue on first use.
        self.event_handlers_map
            .entry(msg.address)
            .or_default()
            .push_back((msg.name, msg.event_handler));
    }
}
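The registration step and the “non-strict round robin” idea can be tried out without Actix. The following is a minimal sketch using only the standard library; `Registry` and its methods are hypothetical names, and the generic `H` stands in for the EventHandler recipient.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Hypothetical stand-in for the event bus state; `H` plays the role of `EventHandler`.
struct Registry<H> {
    handlers: BTreeMap<String, VecDeque<(String, H)>>,
}

impl<H: Clone> Registry<H> {
    fn new() -> Self {
        Self { handlers: BTreeMap::new() }
    }

    /// Appends a named handler to the queue for `address`, creating the queue if needed.
    fn register(&mut self, address: &str, name: &str, handler: H) {
        self.handlers
            .entry(address.to_owned())
            .or_default()
            .push_back((name.to_owned(), handler));
    }

    /// Takes the handler at the front of the queue, moves it to the back
    /// and returns a copy, so successive calls cycle through the handlers.
    fn next(&mut self, address: &str) -> Option<(String, H)> {
        let queue = self.handlers.get_mut(address)?;
        let front = queue.pop_front()?;
        queue.push_back(front.clone());
        Some(front)
    }
}
```

With two handlers registered at the same address, repeated calls to `next` return the first, the second, then the first again. An unknown address, or an address whose queue is empty, yields `None`.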

Handling Request-Response Messages

Now that we have that, we have to be able to send messages to it. The first kind of message I implemented was request-response. This involves defining a Request message for which we can implement a Handler, the result of which will be an eyre::Result<Payload>.

#[derive(Debug)]
pub enum EventBusError {
    HandlerNotFound { address: String },
}

impl EventBusError {
    fn handler_not_found(address: String) -> Report {
        Report::new(Self::HandlerNotFound { address })
    }
}

impl Display for EventBusError {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::HandlerNotFound { address } => write!(
                f,
                "No handlers registered at address {}.",
                address
            ),
        }
    }
}

impl Error for EventBusError {}

#[derive(Message, Debug)]
#[rtype(result = "eyre::Result<Payload>")]
pub struct Request {
    address: String,
    payload: Payload,
}

impl Request {
    pub fn new(address: String, payload: Payload) -> Self {
        Self {
            address,
            payload,
        }
    }
}

impl Handler<Request> for EventBus {
    type Result = ResponseFuture<eyre::Result<Payload>>;

    fn handle(&mut self, msg: Request, _ctx: &mut Self::Context) -> Self::Result {
        let Some(event_handlers) = self.event_handlers_map.get_mut(&msg.address) else {
            return future_error(EventBusError::handler_not_found(msg.address));
        };
        if event_handlers.is_empty() {
            return future_error(EventBusError::handler_not_found(msg.address));
        }

        let event_handler = if event_handlers.len() == 1 {
            event_handlers.get(0).unwrap().clone()
        } else {
            let front = event_handlers.pop_front().unwrap();
            event_handlers.push_back(front);
            event_handlers.back().unwrap().clone()
        };

        Box::pin(async move {
            match event_handler.1.send(EventBusMessage::from(msg)).await {
                Ok(result) => result,
                Err(err) => Err(Report::new(err))
            }
        })
    }
}

This first tries to get the registered event handlers for the address. If the map does not contain the address as a key, or the handlers list is empty, then an error is returned. If there is only one handler in the list then the message is sent to that handler. If there is more than one, then the first handler is popped off and pushed to the back of the list, and the message is sent to that handler. This provides the non-strict round robin algorithm for handling request-response messages with more than one handler registered at an address. future_error is a convenience function that wraps an eyre::Report in a ResponseFuture. It looks like this:

pub fn future_error<T: 'static>(report: Report) -> ResponseFuture<eyre::Result<T>> {
    Box::pin(future::err(report))
}

ResponseFuture is defined in the Actix library like this:

pub type ResponseFuture<I> = Pin<Box<dyn Future<Output = I>>>;
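The request handler calls EventBusMessage::from(msg), but the conversion itself is not shown. A minimal sketch of what it might look like follows, assuming it simply moves the payload across and drops the address. The Actix derives are left out so that the sketch compiles with the standard library alone.

```rust
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Payload {
    Empty,
    String { value: String },
    Json { payload_type: String, value: String },
}

#[derive(Debug)]
pub struct EventBusMessage {
    pub payload: Payload,
}

#[derive(Debug)]
pub struct Request {
    address: String,
    payload: Payload,
}

impl Request {
    pub fn new(address: String, payload: Payload) -> Self {
        Self { address, payload }
    }
}

// Assumed conversion: the address has done its job once a handler
// has been chosen, so only the payload travels on to the handler.
impl From<Request> for EventBusMessage {
    fn from(request: Request) -> Self {
        EventBusMessage { payload: request.payload }
    }
}
```

The same shape would presumably apply to any other message type that is passed to EventBusMessage::from.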

Handling Point-to-Point Messages

Point-to-point messages are similar to request-response messages in that they use the same non-strict round robin algorithm for delivering a message to a handler, but differ in that they don’t return a response, be that a success or failure. It’s like a fire-and-forget message.

#[derive(Message, Clone)]
#[rtype(result = "()")]
pub struct Send {
    address: String,
    payload: Payload,
}

impl Send {
    pub fn new(address: String, payload: Payload) -> Self {
        Self {
            address,
            payload,
        }
    }
}

impl Handler<Send> for EventBus {
    type Result = ();

    fn handle(&mut self, msg: Send, ctx: &mut Self::Context) -> Self::Result {
        let Some(event_handlers) = self.event_handlers_map.get_mut(&msg.address) else {
            // no handlers registered
            return;
        };
        if event_handlers.is_empty() {
            // no handlers registered
            return;
        }

        let event_handler = if event_handlers.len() == 1 {
            event_handlers.get(0).unwrap().clone()
        } else {
            let front = event_handlers.pop_front().unwrap();
            event_handlers.push_back(front);
            event_handlers.back().unwrap().clone()
        };

        let fut = Box::pin(event_handler.1.send(EventBusMessage::from(msg))
            .then(|res| {
                match res {
                    Ok(res) => if let Err(_report) = res {
                        // if the handler returned an error you can log it here
                    },
                    Err(_mailbox_error) => {
                        // if the message could not be delivered (mailbox error) you can log it here
                    }
                }
                future::ready(())
            }));
        let actor_future = fut.into_actor(self);
        ctx.spawn(actor_future);
    }
}
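The two comment placeholders in the handler above mark two different failure layers: the handler ran and returned an error, or the message never reached the handler at all. The following std-only sketch shows one way the nested result could be turned into a log line. `MailboxError` and `HandlerError` here are local stand-ins for actix::MailboxError and eyre::Report, and `log_line` is a hypothetical helper that also takes the handler name stored alongside each recipient.

```rust
/// Stand-in for `actix::MailboxError`: the message never reached the handler.
#[derive(Debug)]
enum MailboxError {
    Closed,
    Timeout,
}

/// Stand-in for the `eyre::Report` a handler may return.
#[derive(Debug)]
struct HandlerError(String);

/// Turns the nested result of sending a message into an optional log line.
/// The outer layer is delivery, the inner layer is the handler's own result.
fn log_line<T>(
    handler_name: &str,
    outcome: Result<Result<T, HandlerError>, MailboxError>,
) -> Option<String> {
    match outcome {
        // Delivered and handled: nothing to report.
        Ok(Ok(_)) => None,
        // Delivered, but the handler itself failed.
        Ok(Err(report)) => Some(format!("handler {handler_name} failed: {}", report.0)),
        // Never delivered: the mailbox was closed or the send timed out.
        Err(mailbox_error) => {
            Some(format!("handler {handler_name} unreachable: {mailbox_error:?}"))
        }
    }
}
```

Since nothing is returned to the sender of a point-to-point message, a log line like this is the only trace a failure would leave.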

Handling Publish Messages

A publish message is sent to all handlers registered at an address. If any of those handlers return an error, it is not passed back to the publisher. Just like the point-to-point message, it’s fire and forget. Any error logging should be taken care of within the publish handler. I’ve left out the logging because how I log may not be the same as how you log.

#[derive(Message, Clone, Debug)]
#[rtype(result = "()")]
pub struct Publish {
    address: String,
    payload: Payload,
}

impl Publish {
    pub fn new(address: String, payload: Payload) -> Self {
        Self {
            address,
            payload,
        }
    }
}

impl Handler<Publish> for EventBus {
    type Result = ();

    fn handle(&mut self, msg: Publish, ctx: &mut Self::Context) -> Self::Result {
        let Some(event_handlers) = self.event_handlers_map.get_mut(&msg.address) else {
            // no handlers registered
            return;
        };
        if event_handlers.is_empty() {
            // no handlers registered
            return;
        }

        let event_handlers_clone = event_handlers.clone();
        let mut requests = Vec::new();
        for event_handler in event_handlers_clone {
            requests.push(event_handler.1.send(EventBusMessage { payload: msg.payload.clone() }));
        }
        let fut = Box::pin(join_all(requests).then(|res| {
            res.into_iter()
                .for_each(|item| match item {
                    Ok(res) => if let Err(_report) = res {
                        // if the handler returned an error you can log it here
                    },
                    Err(_mailbox_error) => {
                        // if the message could not be delivered (mailbox error) you can log it here
                    }
                });
            future::ready(())
        }));
        let actor_future = fut.into_actor(self);
        ctx.spawn(actor_future);
    }
}
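The fan-out behaviour can be illustrated without actors. The sketch below is a synchronous, std-only approximation: `Subscriber` and `publish` are hypothetical names, plain closures stand in for Actix recipients, and the payload is reduced to a string. What it shows is that one failing handler does not stop delivery to the others, and that failures end up as log lines instead of being returned to the publisher.

```rust
/// Hypothetical synchronous handler: receives the payload text and may fail.
type Subscriber = Box<dyn Fn(&str) -> Result<(), String>>;

/// Delivers `payload` to every subscriber and returns one log line per failure.
/// Every subscriber is called, whether or not an earlier one failed.
fn publish(subscribers: &[(String, Subscriber)], payload: &str) -> Vec<String> {
    subscribers
        .iter()
        .filter_map(|(name, handle)| {
            handle(payload)
                .err()
                .map(|error| format!("{name}: {error}"))
        })
        .collect()
}
```

Prefixing each line with the handler name is where naming the handlers at registration pays off for debugging.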

Testing it Out

Consider the following very simple example handler. It simply returns the payload that was supplied.

struct Echo {}

impl Actor for Echo {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        EventBus::from_registry()
            .send(Register::new("echo".into(), "Echo".into(), ctx.address().recipient()))
            .into_actor(self)
            .then(|_, _slf, _| fut::ready(()))
            .spawn(ctx);
    }
}

impl Handler<EventBusMessage> for Echo {
    type Result = eyre::Result<Payload>;

    fn handle(&mut self, msg: EventBusMessage, _ctx: &mut Self::Context) -> Self::Result {
        Ok(msg.payload)
    }
}

We can write the following test to ensure the payload we’re sending is returned:

#[actix::test]
async fn test_echo_handler() {
    Echo {}.start();
    sleep(Duration::from_millis(50)).await;
    let request = Request::new("echo".into(), Payload::String { value: "Hello, world!".into() });
    match EventBus::from_registry().send(request).await.unwrap() {
        Ok(payload) => assert_eq!(Payload::String { value: "Hello, world!".into() }, payload),
        Err(err) => panic!("Unexpected error {}", err)
    }
}

If you’re wondering about the sleep() call, it’s because registering the handler with the event bus is done asynchronously. Unless I wait for it to finish, the test message will get sent before the handler registration has been performed. This is not ideal, but I can live with it for unit testing.

So, What’s the Use Case?

That’s a fair question. Why do I want an event bus that works like this? What is my use case? I’ve come to find “my way” of developing software along Domain-Driven Design (DDD) principles. The first thing I built with this event bus was a library providing a Command Query Responsibility Segregation (CQRS) pattern. This provided a set of handlers and macros for writing command, query and event handlers. The command and query handlers make use of the request-response capability of the event bus. Command handlers also generate events, which are published to the event bus so event handlers can consume them.

The event bus also supports a development practice that I like: decoupling the API from the business logic. Business logic is implemented as handlers, and the API layer interacts with the business logic through messaging. This forces the developer (me) to keep the API layer, whatever it may be, relatively lightweight and focussed on only doing what it needs to do: receiving and validating input and generating output.
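As a rough illustration of that split, here is a std-only sketch. Everything in it is hypothetical: the synchronous `Bus`, the `user.create` address, the `CreateUser` payload type and the two functions are invented for the example, and the JSON is written by hand where real code would use a serialiser.

```rust
use std::collections::BTreeMap;

#[derive(Clone, Debug, Eq, PartialEq)]
enum Payload {
    Empty,
    String { value: String },
    Json { payload_type: String, value: String },
}

/// Hypothetical synchronous handler signature; the real bus uses Actix recipients.
type Handler = fn(Payload) -> Result<Payload, String>;

struct Bus {
    handlers: BTreeMap<String, Handler>,
}

impl Bus {
    fn request(&self, address: &str, payload: Payload) -> Result<Payload, String> {
        let handler = self
            .handlers
            .get(address)
            .ok_or_else(|| format!("No handlers registered at address {address}."))?;
        handler(payload)
    }
}

/// Business logic: a command handler that only understands `CreateUser` commands.
fn create_user(command: Payload) -> Result<Payload, String> {
    match command {
        Payload::Json { payload_type, value } if payload_type == "CreateUser" => {
            Ok(Payload::String { value: format!("created {value}") })
        }
        other => Err(format!("unsupported command: {other:?}")),
    }
}

/// API layer: validates input, talks to the bus, shapes the output.
fn post_user(bus: &Bus, name: &str) -> Result<String, String> {
    let name = name.trim();
    // Quotes are rejected because the JSON below is not escaped.
    if name.is_empty() || name.contains('"') {
        return Err("invalid name".to_string());
    }
    let command = Payload::Json {
        payload_type: "CreateUser".to_string(),
        value: format!("{{\"name\":\"{name}\"}}"),
    };
    match bus.request("user.create", command)? {
        Payload::String { value } => Ok(value),
        other => Err(format!("unexpected reply: {other:?}")),
    }
}
```

The API function never calls `create_user` directly. It only knows the address and the payload type, which is what keeps the two layers apart.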

Porting my existing code base over to this new library, in spite of trying to keep the contracts as similar as I could, was actually more difficult than I expected. And it all came down to lifetimes. I think this is an area of Rust in which I’m still pretty weak. I still haven’t figured out how to stop clone()ing things all over the place to get around lifetime issues. I hope I’m able to, because I’m sure that would contribute to the “make it fast” part of software development.

Conclusion

This article describes some of my own troubles and hang-ups learning how to effectively develop in Rust and some of my attempts to get over them and produce something cool and useful, even if it’s only useful to me.

Banner image by Zachary Kadolph on Unsplash