Skip to main content

Command Palette

Search for a command to run...

Mora Journal #0

Init

Updated
9 min read
Mora Journal #0

Mora is my attempt at building a distributed event scheduler capable of notifying clients of scheduled events with sub-second precision.

Mora started a few months ago (sept. 2021) when I was hired by Prima where I had the chance to develop a personal project as an onboarding step, mainly to get confident with Elixir. I have to say Elixir grew on me pretty quickly, I was mesmerized by functional programming in general and the concurrency model of the language. I was able to make a very basic distributed scheduler in a matter of hours (~30). I then had to pause it for a few months due to personal circumstances. Fast forward six months and I just want to re-do everything in Rust. I think mora has to cover a lot of interesting topics to perform correctly, to name a few:

  • Distributed transactions and storage

  • Clustering (with all its quirks such as clock synchronization)

  • Event scheduling

  • Client notification

and while I don't see much potential in it as a commercial product I see it as an incredible learning tool and I would like to write my journey in these blog posts, to share them with others, but mainly to keep tracks of what I do.

Why Rust?

I had the pleasure of working with it at Prima in a real production environment. I was, once again, mesmerized, it conveys everything I was looking for in a language:

  • Performance

  • Excellent Toolchain

  • Safety

  • TDD oriented

Mora to me has always been proving grounds for my programming endeavours, so I decided to reimplement everything in Rust to better appreciate the language and dive deep inside it. I'm going to try and document as much of it as possible, hence the mora journal (MJ).

Implementing a priority queue in Rust

Key aspect of mora's internal engine is its priority queue structure that sorts scheduled events. Priority queues store items based on a priority key that sorts either in ascending or descending order. pqueues in general are very useful whenever you have to enqueue items based on specific logic. Dequeueing from a pqueue means removing the highest priority from the queue.

We are going to start with a very dumb, inefficient, slow version of priority queue and work our way towards something more elaborate (eventually thread-safe).

:warning: I usually apply TDD whenever I code something and indeed I started from tests to implement this queue, nonetheless I feel like explaining things starting from tests it's not very blog-oriented. For blog's sake I will explain my design/code decision first and then show tests. :warning:

A Common behavior for all queues

First things first let's define a common trait our priority queues must implement in a dedicated priority_queue.rs file:

pub trait PriorityQueue<K, V> {
    fn is_empty(&self) -> bool;
    fn len(&self) -> usize;
    fn enqueue(&mut self, key: K, value: V) -> Option<V>;
    fn dequeue(&mut self, count: usize) -> Vec<V>;
}

There's a lot to comment here, let's inspect line by line:

Here we are defining a public trait for our priority queue, a trait is something other structs can implement that makes it easy to code shared behaviors. Inside the <> we can see two generic types K (for key) V (for value). This means we can replace those types with whatever we want to store in the queue, such as PriorityQueue<u32,Foo>.

pub trait PriorityQueue<K, V> {

This function should return whether our queue is empty or not, pretty straight forward. It will come in handy later on. &self means that we need the reference of the struct that implements this trait. -> bool means we will return a boolean value.

fn is_empty(&self) -> bool;

Returns the amount of items currently in the queue in the form of a usize which is either a u32 in 32 bit systems or a u64 in 64 bit systems.

fn len(&self) -> usize;

Here we have something new &mut which means we need a mutable reference to our struct to invoke this function. We also return an Option<V> which means we will return either Some(value) or None. Options are pretty useful and we will see later on why.

fn enqueue(&mut self, key: K, value: V) -> Option<V>;

enqueue's antagonist dequeue will take a mutable reference of our queue and dequeue as many values as count says. It return a Vec.

fn dequeue(&mut self, count: usize) -> Vec<V>;

Cool, we have defined a common behavior for our queues, now we need to implement a dumb pqueue.

A dumb priority queue

Let's setup a very simple and easy to grasp priority queue. We are going to be using a simple hashmap as our data structure and we are going to implement our previously defined trait.

use std::collections::HashMap;
use std::hash::Hash;
use crate::priority_queue::PriorityQueue;

pub struct DumbPriorityQueue<K, V>
where
    K: Clone + Eq + Hash + Ord,
    V: Clone,
{
    map: HashMap<K, V>,
}

There's a couple things worth mentioning here:

  1. The first three lines of code are imports, we need to reference those structs and traits to be able to use them in our file.

  2. the where clause means this struct needs to enforce a couple of rules to the relative types:

    • K must derive Clone (for cloning keys), Eq (for equality), Hash (for hashing and storing inside the map) and Ord (for sorting). Notice the + sign means all this restrictions must be applied at the same time.

    • V just needs to be clonable.

Let's now implement our PriorityQueue trait:

impl<K, V> PriorityQueue<K, V> for DumbPriorityQueue<K, V>
where
    K: Clone + Eq + Hash + Ord,
    V: Clone,
{
    fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    fn len(&self) -> usize {
        self.map.len()
    }

    fn enqueue(&mut self, key: K, value: V) -> Option<V> {
        self.map.insert(key, value)
    }

    fn dequeue(&mut self, count: usize) -> Vec<V> {
        let mut keys: Vec<K> = self.map.keys().cloned().collect::<Vec<K>>();
        keys.sort();
        keys.iter()
            .take(count)
            .map(|k| self.map.remove(k).unwrap())
            .collect::<Vec<V>>()
    }
}

Here we can see syntax for implementing traits with generic types:

impl<K, V> PriorityQueue<K, V> for DumbPriorityQueue<K, V>

This means that we are indeed implementing for K and V the trait PriorityQueue for the struct DumbPriorityQueue. We also need our where clauses for enforcing.

First three functions are pretty straight-forward so we won't cover them, let's see what dequeue does:

let mut keys: Vec<K> = self.map.keys().cloned().collect::<Vec<K>>();

Here we clone all the map keys and collect them in a mutable Vec. We then sort our keys calling the sort function (provided by the Ord trait). As we stated earlier in this function we are holding a mutable reference to our struct, therefore we can indeed modify our internal map.

keys.iter()
    .take(count)
    .map(|k| self.map.remove(k).unwrap())
    .collect::<Vec<V>>()

With a bit of iterator magic we can iterate with iter(), take as many keys as count indicates, mapping those taken keys in values remove-ing them from the map at the same time and finally collect them in a Vec<V>.

Testing :white_check_mark:

That's a terrible priority queue, but how do we know if it works? Let's write some tests:

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn new_queue_is_empty() {
        let pq = DumbPriorityQueue::<u32, u32>::default();
        assert!(pq.is_empty())
    }

    #[test]
    fn new_queue_has_zero_elements() {
        let pq = DumbPriorityQueue::<u32, u32>::default();
        assert_eq!(pq.len(), 0)
    }

    #[test]
    fn enqueue_adds_element_to_queue() {
        let mut pq = DumbPriorityQueue::<u32, u32>::default();
        pq.enqueue(1, 1);
        assert_eq!(pq.len(), 1);
        assert!(!pq.is_empty());
    }

    #[test]
    fn take_elments_returns_elements_ordered_by_key() {
        let mut pq = DumbPriorityQueue::<u32, u32>::default();
        pq.enqueue(4, 3);
        pq.enqueue(2, 2);
        pq.enqueue(1, 1);
        pq.enqueue(3, 3);
        let values: Vec<u32> = pq.dequeue(3);

        assert_eq!(values.len(), 3);
        assert_eq!(values, [1, 2, 3]);
        assert_eq!(pq.len(), 1)
    }
}

Rust tests are very easy to understand, we won't go into detail but the thing we're interested in is this:

running 4 tests
test dumb_priority_queue::tests::new_queue_is_empty ... ok
test dumb_priority_queue::tests::new_queue_has_zero_elements ... ok
test dumb_priority_queue::tests::take_elments_returns_elements_ordered_by_key ... ok
test dumb_priority_queue::tests::enqueue_adds_element_to_queue ... ok
test result: ok. 4 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

It works! Sure we could probably test other edge cases but for now this will have to do. If you are wondering why we built a priority queue that works on reverse (it yields items from lowest priority to higher) it will all make sense down the road.

Benchmarking :bar-chart:

We said earlier that this implementation is dumb and not really performance-oriented, let's do some benchmarks so we can compare them with future queues we will develop.

For benchmarking I'll use criterion.rs, I'll spare you the details involved in setting criterion up since the library is very well documented, let's skip to our benchmarks directly.

use criterion::{black_box, criterion_group, criterion_main, Criterion};
use mora_queue::{dumb_priority_queue::DumbPriorityQueue, priority_queue::PriorityQueue};

fn dumb_enqueue(c: &mut Criterion) {
    let mut pq = DumbPriorityQueue::<i32, i32>::default();
    let mut count = 0;
    c.bench_function("dumb_enqueue", |b| {
        b.iter(|| {
            count = count + 1;
            pq.enqueue(black_box(count), black_box(-count))
        })
    });
}

fn dumb_dequeue(c: &mut Criterion) {
    let mut pq = DumbPriorityQueue::<i32, i32>::default();

    (0..10000).for_each(|x| match x % 2 {
        1 => {
            pq.enqueue(-x, x);
        }
        0 => {
            pq.enqueue(x, x);
        }
        _ => {}
    });

    c.bench_function("dumb_dequeue", |b| b.iter(|| pq.dequeue(black_box(100))));
}

criterion_group!(queue_benches, dumb_enqueue, dumb_dequeue);
criterion_main!(queue_benches);

To benchmark with criterion we define a function that take a Criterion struct that allows us to call bench_function. Now I'm an absolute begineer in a lot of things, especially benchmarking. I kept it extremely simple with both the enqueue and dequeue functions. By running cargo bench --bench priority_queue -- -s dumb --verbose we get the following:

Benchmarking dumb_enqueue
Benchmarking dumb_enqueue: Warming up for 3.0000 s
Benchmarking dumb_enqueue: Collecting 100 samples in estimated 5.0003 s (35309600 iterations)
Benchmarking dumb_enqueue: Analyzing
dumb_enqueue            time:   [116.44 ns 169.23 ns 285.70 ns]
                        change: [-39.540% -1.9099% +58.350%] (p = 0.83 > 0.05)
                        No change in performance detected.
Found 4 outliers among 100 measurements (4.00%)
  3 (3.00%) high mild
  1 (1.00%) high severe
slope  [116.44 ns 285.70 ns] R^2            [0.0004001 0.0003678]
mean   [112.54 ns 186.75 ns] std. dev.      [6.9781 ns 411.85 ns]
median [109.16 ns 113.14 ns] med. abs. dev. [4.1726 ns 8.2814 ns]

Benchmarking dumb_dequeue
Benchmarking dumb_dequeue: Warming up for 3.0000 s
Benchmarking dumb_dequeue: Collecting 100 samples in estimated 5.0009 s (11912950 iterations)
Benchmarking dumb_dequeue: Analyzing
dumb_dequeue            time:   [413.87 ns 415.25 ns 416.97 ns]
                        change: [+0.0958% +0.7162% +1.3969%] (p = 0.02 < 0.05)
                        Change within noise threshold.
Found 8 outliers among 100 measurements (8.00%)
  5 (5.00%) high mild
  3 (3.00%) high severe
slope  [413.87 ns 416.97 ns] R^2            [0.9598277 0.9590296]
mean   [413.73 ns 417.35 ns] std. dev.      [4.2076 ns 14.543 ns]
median [412.60 ns 414.16 ns] med. abs. dev. [2.6068 ns 4.9808 ns]

which basically tells us that we run our dumb_enqueue function for a total of 36 million iterations resulting in an average time of execution of ~169ns, while our dumb_dequeue is a bit slower on ~415ns on 12 million iterations.

This sample clearly its useless as we don't have anything to compare it to but in the next post we'll go around a few implementations that would hopefully lower the average executions by a lot! Results for this implementation are available here.

What's next?

We are still a long way from the current mora capabilities, in the next post we will tackle some more efficient priority queue implementations, stay tuned!

Mora Journal

Part 1 of 1

In this journal, I will gather all insights and notable development efforts around Mora.