This is how I made the runtime this website runs on!

15 Nov, 2023

I've always been obsessed with knowledge. Ever since my family first gifted me an iMac when I was a kid, I've been fascinated by computers and the way they work. Over the years, that interest has trickled down into lower-level programming, along with an unhealthy obsession with performance and with doing stuff better than the rest. Whilst that attitude is one I usually try to tame, it has also led me to learn and try the next step down.

So, when I saw a couple of ThePrimeagen videos reacting to criticism of async Rust and its runtimes, I got that "I should try doing it my way" feeling, so I challenged myself to build this website on my own async runtime.

The basics

The basic idea is quite simple. A Rust future is nothing more than a regular type (usually a struct) that implements the Future trait.

pub trait Future {
    type Output;

    // Required method
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

Thus, starting execution of the future is as easy as calling its poll method.

// We pin the future to the stack (`fut` is some `impl Future<Output = ()>`).
// This way, its address in memory will never change (polling requires a pinned future).
let fut: Pin<&mut _> = std::pin::pin!(fut);
let mut cx: Context<'_> = ...;

// After polling, the future returns either `Poll::Ready(value)` or `Poll::Pending`
let poll: Poll<()> = fut.poll(&mut cx);
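To make that concrete, here's a small sketch of a hand-written future and a loop that polls it. `Countdown`, `CountingWaker` and `poll_to_completion` are made-up names for illustration. The future returns `Poll::Pending` a few times, calling `wake_by_ref` each time, before it finishes. The waker just counts how often it was woken.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future: returns `Pending` `remaining` times, then `Ready("done")`.
pub struct Countdown {
    pub remaining: u32,
}

impl Future for Countdown {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            return Poll::Ready("done");
        }
        self.remaining -= 1;
        // Ask to be polled again. A real I/O future would instead stash the waker
        // somewhere that gets notified once the I/O is ready.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Hypothetical waker that just counts how many times it was woken.
pub struct CountingWaker(pub AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.wake_by_ref();
    }

    fn wake_by_ref(self: &Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Busy-polls `fut` until it's ready.
/// Returns (output, number of polls, number of wakeups).
pub fn poll_to_completion<F: Future>(fut: F) -> (F::Output, usize, usize) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut fut = std::pin::pin!(fut);

    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return (out, polls, counter.0.load(Ordering::SeqCst));
        }
    }
}
```

`poll_to_completion(Countdown { remaining: 3 })` should take four polls and three wakeups. Busy-polling like this is only for demonstration. A real runtime sleeps until the waker fires.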

The tricky part is the Context, which holds a reference to a Waker.

Wakers are the mechanism Rust futures use to get woken up. When a waker's wake or wake_by_ref method is called, it tells the underlying async runtime "hey, the future bound to me is ready to be polled again".

A good example can be found in the Rust docs:

use std::future::Future;
use std::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake};
use std::thread::{self, Thread};

/// A waker that wakes up the current thread when called.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Run a future to completion on the current thread.
fn block_on<T>(fut: impl Future<Output = T>) -> T {
    // Pin the future so it can be polled.
    let mut fut = pin::pin!(fut);

    // Create a new context to be passed to the future.
    let t = thread::current();
    // Wakers can be created from `Arc`s to objects that implement the `Wake` trait
    let waker = Arc::new(ThreadWaker(t)).into();
    let mut cx = Context::from_waker(&waker);

    // Run the future to completion.
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(res) => return res,
            // Sleep until the waker unparks us (a spurious wakeup just means we poll again)
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    block_on(async {
        println!("Hi from inside a future!");
    });
}
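The ThreadWaker really earns its keep when something outside the current thread completes the future. Here's a sketch that reuses the docs' block_on. A spawned thread stores a value and calls the waker the future saved. `Shared`, `Receiver` and `receive_from_thread` are hypothetical names, and a plain `Mutex` stands in for whatever synchronization a real channel would use.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::Duration;

struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<T>(fut: impl Future<Output = T>) -> T {
    let mut fut = std::pin::pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(res) => return res,
            Poll::Pending => thread::park(),
        }
    }
}

/// State shared between the future and the thread that completes it (hypothetical).
#[derive(Default)]
struct Shared {
    value: Option<u32>,
    waker: Option<Waker>,
}

/// A future that resolves once another thread stores a value.
struct Receiver(Arc<Mutex<Shared>>);

impl Future for Receiver {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        let mut shared = self.0.lock().unwrap();
        match shared.value.take() {
            Some(v) => Poll::Ready(v),
            None => {
                // Leave our waker behind so the other thread knows whom to wake.
                shared.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Spawns a thread that sends `value` after `delay`, then blocks on receiving it.
fn receive_from_thread(value: u32, delay: Duration) -> u32 {
    let shared = Arc::new(Mutex::new(Shared::default()));
    let sender = Arc::clone(&shared);
    let handle = thread::spawn(move || {
        thread::sleep(delay);
        let mut s = sender.lock().unwrap();
        s.value = Some(value);
        if let Some(w) = s.waker.take() {
            w.wake(); // unparks the thread sitting in `block_on`
        }
    });
    let out = block_on(Receiver(shared));
    handle.join().unwrap();
    out
}
```

If the value arrives before the first poll, the future is simply ready right away. If `unpark` happens just before `park`, `park` returns immediately, so no wakeup is lost.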

This is a nice and simple implementation, but it doesn't give us any mechanism to perform asynchronous I/O. For that, we need a version of thread::park that listens to I/O handles and wakes up whenever one of them becomes ready. Thankfully, Linux offers exactly that with the epoll system calls!

Single-threaded, asynchronous I/O

epoll works through epoll instances, created via epoll_create. Each instance can start and stop watching file descriptors for readiness via epoll_ctl, and waits for I/O events via epoll_wait.

With that in mind, we can create a rustified epoll struct like this:

// We'll use the libc crate to get access to the epoll functions.
extern crate libc;

use std::cell::{Cell, UnsafeCell};
use std::io::ErrorKind;
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd};

pub struct Epoll {
    // epoll handles are nothing more than file descriptors
    fd: OwnedFd,
    size: Cell<usize>,
    sink: UnsafeCell<Vec<libc::epoll_event>>
}

impl Epoll {
    pub fn new() -> std::io::Result<Self> {
        // `epoll_create1` is a more generic version of `epoll_create`
        // that allows providing flags when creating an instance.
        let fd = unsafe { err_handle(libc::epoll_create1(libc::EPOLL_CLOEXEC))? };

        return Ok(Self {
            fd: unsafe { OwnedFd::from_raw_fd(fd) },
            size: Cell::new(0),
            sink: UnsafeCell::new(Vec::new())
        });
    }

    pub fn wait(&self) -> std::io::Result<()> {
        let size = self.size.get();

        // This is safe, since no other thread will access this sink, and by the time we
        // exit this function, no mutable references to the sink will exist.
        let sink: &mut Vec<libc::epoll_event> = unsafe { &mut *self.sink.get() };
        sink.reserve(size);

        let res = unsafe {
            loop {
                match err_handle(libc::epoll_wait(
                    self.fd.as_raw_fd(),
                    sink.as_mut_ptr(), // pointer to memory that will hold the event results
                    size as i32, // max amount of events that we can handle
                    // a negative timeout equals no timeout
                    // (wait indefinitely until something happens)
                    -1,
                )) {
                    Ok(x) => break x,
                    Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
                    Err(e) => return Err(e),
                }
            }
        };

        // `epoll_wait` returns the number of events that are ready
        unsafe { sink.set_len(res as usize) };

        // TODO handle epoll events
        ...

        return std::io::Result::Ok(());
    }
}

fn err_handle(err: libc::c_int) -> std::io::Result<libc::c_int> {
    if err == -1 {
        return Err(std::io::Error::last_os_error());
    }
    return Ok(err);
}

And with this, we can create our future runner.

pub struct Runner {
    epoll: Epoll,
}

impl Runner {
    pub fn new() -> std::io::Result<Self> {
        return Ok(Self {
            epoll: Epoll::new()?
        });
    }

    pub fn block_on<T>(&self, fut: impl Future<Output = T>) -> std::io::Result<T> {
        let mut fut = std::pin::pin!(fut);
        let mut cx: Context<'_> = ...;

        loop {
            match fut.as_mut().poll(&mut cx) {
                Poll::Ready(x) => return Ok(x),
                Poll::Pending => self.epoll.wait()?,
            }
        }
    }
}

But as you might have noticed, a couple of issues remain: nothing is ever registered with our epoll instance, and we still don't have a Context to pass to the future.

Let's start by giving our epoll a way to listen for I/O readiness. This is done via the epoll_ctl function in EPOLL_CTL_ADD mode. If we register a file descriptor with the EPOLLIN flag, epoll_wait will report it once it's ready to be read from. If we register it with EPOLLOUT, epoll_wait will report it once it's ready to be written to.

Knowing this, we can implement functions that start listening for I/O readiness and return some kind of guard. Once the guard is dropped, the epoll stops listening to the specified file descriptor.
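The guard idea can be sketched without epoll at all. In this toy version, `Registry` and `Registration` are hypothetical stand-ins. A `HashSet` plays the role of the epoll instance's interest list, `listen` hands out a guard, and the guard's `Drop` impl does the unregistering. The real thing would call `epoll_ctl` in `EPOLL_CTL_DEL` mode there.

```rust
use std::cell::RefCell;
use std::collections::HashSet;

/// Toy stand-in for an epoll instance: it only remembers which fds are registered.
struct Registry {
    fds: RefCell<HashSet<i32>>,
}

/// Guard returned by `listen`; unregisters its fd when dropped.
struct Registration<'a> {
    fd: i32,
    registry: &'a Registry,
}

impl Registry {
    fn new() -> Self {
        Self { fds: RefCell::new(HashSet::new()) }
    }

    fn listen(&self, fd: i32) -> Result<Registration<'_>, &'static str> {
        if !self.fds.borrow_mut().insert(fd) {
            // `epoll_ctl(EPOLL_CTL_ADD)` fails with EEXIST in this situation
            return Err("already registered");
        }
        Ok(Registration { fd, registry: self })
    }

    fn len(&self) -> usize {
        self.fds.borrow().len()
    }
}

impl Drop for Registration<'_> {
    fn drop(&mut self) {
        self.registry.fds.borrow_mut().remove(&self.fd);
    }
}
```

Tying the guard's lifetime to the registry means the borrow checker won't let a registration outlive the instance it belongs to.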

All of this is nice and everything, but we still don't have a way to resume polling the future once the I/O wait has completed. For that, we'll need to attach some information to the epoll event so we can resume execution of the future.

The simplest way possible is to pass a pointer to a Waker and a bool flag as the event's user data. This way, whenever epoll reports the event, we can set the flag to true and call the waker, which will wake the future.

use std::cell::Cell;
use std::io::ErrorKind;
use std::mem::MaybeUninit;
use std::os::fd::AsRawFd;
use std::task::Waker;

struct PollEvent<'a, 'b> {
    fd: libc::c_int,
    recv: &'b (Cell<bool>, Cell<Option<Waker>>),
    poll: &'a Epoll,
}

impl Epoll {
    pub fn listen_read<'a, 'b>(
        &'a self,
        fd: &impl AsRawFd,
        recv: &'b (Cell<bool>, Cell<Option<Waker>>),
    ) -> std::io::Result<PollEvent<'a, 'b>> {
        return self.listen(fd, recv, libc::EPOLLIN as u32);
    }

    pub fn listen_write<'a, 'b>(
        &'a self,
        fd: &impl AsRawFd,
        recv: &'b (Cell<bool>, Cell<Option<Waker>>),
    ) -> std::io::Result<PollEvent<'a, 'b>> {
        return self.listen(fd, recv, libc::EPOLLOUT as u32);
    }

    fn listen<'a, 'b>(
        &'a self,
        fd: &impl AsRawFd,
        recv: &'b (Cell<bool>, Cell<Option<Waker>>),
        events: u32,
    ) -> std::io::Result<PollEvent<'a, 'b>> {
        let fd = fd.as_raw_fd();

        // Plain C structs like this one can be safely zero-initialized
        let mut evt: libc::epoll_event = unsafe { MaybeUninit::zeroed().assume_init() };
        // Use edge-triggered notifications
        evt.events = events | libc::EPOLLET as u32;
        // Smuggle a pointer to `recv` through the event's 64-bit user data
        evt.u64 = recv as *const (Cell<bool>, Cell<Option<Waker>>) as u64;

        return match unsafe {
            err_handle(libc::epoll_ctl(
                self.fd.as_raw_fd(),
                libc::EPOLL_CTL_ADD,
                fd,
                &mut evt,
            ))
        } {
            Ok(_) => {
                self.size.set(self.size.get() + 1);
                Ok(PollEvent {
                    fd,
                    recv,
                    poll: self,
                })
            }
            Err(e) => Err(e),
        };
    }

    pub fn wait(&self) -> std::io::Result<()> {
        let size = self.size.get();

        let sink: &mut Vec<libc::epoll_event> = unsafe { &mut *self.sink.get() };
        sink.reserve(size);

        let res: libc::c_int = unsafe {
            loop {
                match err_handle(libc::epoll_wait(
                    self.fd.as_raw_fd(),
                    sink.as_mut_ptr(),
                    size as i32,
                    -1,
                )) {
                    Ok(x) => break x,
                    Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
                    Err(e) => return Err(e),
                }
            }
        };

        unsafe { sink.set_len(res as usize) };

        // Iterate over epoll events and remove them from the sink
        for evt in sink.drain(..) {
            // This is safe, since only receivers that have been added to the epoll and still haven't been
            // dropped from it can appear here, so we are sure they still exist.
            let data: &(Cell<bool>, Cell<Option<Waker>>) =
                unsafe { &*(evt.u64 as *const (Cell<bool>, Cell<Option<Waker>>)) };

            // Mark task as complete.
            data.0.set(true);

            // Call the waker (if there is one)
            if let Some(waker) = data.1.take() {
                waker.wake()
            }
        }

        return std::io::Result::Ok(());
    }
}
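The pointer-smuggling part is easy to try in isolation. This sketch uses hypothetical `to_token`/`complete` helpers. It packs a reference to the flag/waker pair into a `u64`, the same width as `epoll_event`'s user data, and turns it back into a reference the same way `wait` does. This is only sound while the pair is still alive. The `PollEvent` guard's `'b` lifetime is what guarantees that.

```rust
use std::cell::Cell;
use std::task::Waker;

/// The per-operation state registered with epoll: a "ready" flag and an optional waker.
type Receiver = (Cell<bool>, Cell<Option<Waker>>);

/// Packs a reference into a 64-bit user-data word.
fn to_token(recv: &Receiver) -> u64 {
    recv as *const Receiver as u64
}

/// What the epoll loop does with a reported event: recover the reference,
/// mark the operation as ready and fire the waker, if one was stored.
///
/// # Safety
/// `token` must come from `to_token` on a `Receiver` that is still alive.
unsafe fn complete(token: u64) {
    let recv: &Receiver = unsafe { &*(token as *const Receiver) };
    recv.0.set(true);
    if let Some(waker) = recv.1.take() {
        waker.wake();
    }
}
```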

On the other end, we'll need a method on our PollEvent struct that lets us asynchronously wait for the epoll to notify us that we're ready to keep executing. To stop listening once the guard is dropped, we must call epoll_ctl in EPOLL_CTL_DEL mode.

use std::future::poll_fn;
use std::task::Context;

// just as a reminder, this is the body of our `PollEvent` struct
struct PollEvent<'a, 'b> {
    fd: libc::c_int,
    recv: &'b (Cell<bool>, Cell<Option<Waker>>),
    poll: &'a Epoll,
}

impl<'a, 'b> PollEvent<'a, 'b> {
    pub async fn wait(&mut self) {
        // `poll_fn` lets us create a future by writing its `poll` method as a closure.
        poll_fn(|cx| self.poll_wait(cx)).await;
    }

    fn poll_wait(&mut self, cx: &mut Context<'_>) -> std::task::Poll<()> {
        // check if the flag is set
        if self.recv.0.replace(false) {
            // the flag was set, the wait is complete
            return std::task::Poll::Ready(());
        } else {
            // the flag was not set, we still have to wait
            if let Some(prev) = self.recv.1.take() {
                // check if the waker we stored previously will wake the same task as the current one.
                // this is useful to avoid making unneeded clones of the same waker.
                if prev.will_wake(cx.waker()) {
                    self.recv.1.set(Some(prev));
                    return std::task::Poll::Pending;
                }
            }

            // set the waker so it gets called whenever epoll finishes our wait
            self.recv.1.set(Some(cx.waker().clone()));
            return std::task::Poll::Pending;
        }
    }
}

impl<'a, 'b> Drop for PollEvent<'a, 'b> {
    fn drop(&mut self) {
        self.poll.size.set(self.poll.size.get() - 1);
        unsafe {
            err_handle(libc::epoll_ctl(
                self.poll.fd.as_raw_fd(),
                libc::EPOLL_CTL_DEL,
                self.fd,
                core::ptr::null_mut(), // no info needs to be provided in `EPOLL_CTL_DEL` mode
            ))
            .unwrap();
        }
    }
}
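The flag-and-waker handshake in `poll_wait` doesn't depend on epoll, so it can be exercised on its own. Below, `poll_wait` is lifted into a free function over the same `(Cell<bool>, Cell<Option<Waker>>)` pair. `NoopWake` is a hypothetical do-nothing waker used only to drive it by hand.

```rust
use std::cell::Cell;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical waker that does nothing; only used to build a `Context`.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

type Receiver = (Cell<bool>, Cell<Option<Waker>>);

/// Same logic as `PollEvent::poll_wait`, without the epoll registration.
fn poll_wait(recv: &Receiver, cx: &mut Context<'_>) -> Poll<()> {
    // The epoll loop sets the flag; consume it and report completion.
    if recv.0.replace(false) {
        return Poll::Ready(());
    }
    match recv.1.take() {
        // Already holding an equivalent waker: put it back instead of cloning.
        Some(prev) if prev.will_wake(cx.waker()) => recv.1.set(Some(prev)),
        // Otherwise store (a clone of) the current waker.
        _ => recv.1.set(Some(cx.waker().clone())),
    }
    Poll::Pending
}
```

`Waker::will_wake` works on a best-effort basis and may return `false` for equivalent wakers. In that case the stored waker is simply replaced, and either way only one clone is kept around.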

After all this work, we can implement some basic read and write functions that asynchronously read from or write to types that implement std::os::fd::AsRawFd and either std::io::Read or std::io::Write.

Note that in order to await these futures without blocking the thread, these types need to be set up in non-blocking mode.

use std::cell::Cell;
use std::io::{ErrorKind, Read, Write};
use std::os::fd::AsRawFd;
use std::task::Waker;

async fn read<R: ?Sized + AsRawFd + Read>(
    poll: &Epoll,
    reader: &mut R,
    buf: &mut [u8],
) -> std::io::Result<usize> {
    let flag: (Cell<bool>, Cell<Option<Waker>>) = (Cell::new(false), Cell::new(None));
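    // Pass the raw fd (`RawFd` implements `AsRawFd` too), since `R` may be unsized
    let mut handle: PollEvent = poll.listen_read(&reader.as_raw_fd(), &flag)?;

    loop {
        match reader.read(buf) {
            Ok(x) => return Ok(x),
            // Not ready yet: wait for epoll to report the fd as readable
            Err(ref e) if e.kind() == ErrorKind::WouldBlock => handle.wait().await,
            // Interrupted by a signal: retry right away, no new edge may ever arrive
            Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }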
}

async fn write<W: ?Sized + AsRawFd + Write>(
    poll: &Epoll,
    writer: &mut W,
    buf: &[u8],
) -> std::io::Result<usize> {
    let flag: (Cell<bool>, Cell<Option<Waker>>) = (Cell::new(false), Cell::new(None));
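    // Pass the raw fd (`RawFd` implements `AsRawFd` too), since `W` may be unsized
    let mut handle: PollEvent = poll.listen_write(&writer.as_raw_fd(), &flag)?;

    loop {
        match writer.write(buf) {
            Ok(x) => return Ok(x),
            // Not ready yet: wait for epoll to report the fd as writable
            Err(ref e) if e.kind() == ErrorKind::WouldBlock => handle.wait().await,
            // Interrupted by a signal: retry right away, no new edge may ever arrive
            Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }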
}
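Here's why non-blocking mode matters, sketched with std only. A `UnixStream` pair stands in for whatever socket or pipe you'd hand to `read`; the names `try_read` and `demo` are hypothetical. In non-blocking mode, reading with nothing available returns `ErrorKind::WouldBlock` instead of hanging the thread. That error is exactly the cue for the functions above to `handle.wait().await`.

```rust
use std::io::{ErrorKind, Read, Write};
use std::os::unix::net::UnixStream;

/// One read attempt on a non-blocking stream: `WouldBlock` becomes `None`.
fn try_read(stream: &mut UnixStream, buf: &mut [u8]) -> std::io::Result<Option<usize>> {
    match stream.read(buf) {
        Ok(n) => Ok(Some(n)),
        Err(e) if e.kind() == ErrorKind::WouldBlock => Ok(None),
        Err(e) => Err(e),
    }
}

/// Reads once before and once after writing to the other end of the pair.
fn demo() -> std::io::Result<(Option<usize>, Option<usize>)> {
    let (mut a, mut b) = UnixStream::pair()?;
    b.set_nonblocking(true)?;
    let mut buf = [0u8; 16];

    // Nothing has been written yet: a blocking read would hang here forever.
    let before = try_read(&mut b, &mut buf)?;
    a.write_all(b"hello")?;
    let after = try_read(&mut b, &mut buf)?;
    Ok((before, after))
}
```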

Awesome! We can now listen for I/O readiness, but we still haven't created the Waker that will actually resume execution of the future.

The thing is, we don't actually need one. You see, after epoll_wait returns, the runner re-polls the future anyway, so a noop waker would suffice, kind of.

That would be true if the only thing that could wake up our future was an I/O event, but that's not always the case.

// This crate provides a simple thread-safe async flag we can use for this example
extern crate utils_atomics;

use std::time::Duration;
use utils_atomics::flag::mpsc;

pub async fn oups() {
    let (send, recv) = mpsc::async_flag();

    std::thread::spawn(move || {
        // simulate some "very intense" work
        std::thread::sleep(Duration::from_secs(2));
        // mark the flag as complete
        send.mark();
    });

    // wait for the flag to be marked
    recv.await;
    // with a noop waker, we never get here
    println!("Wait completed!");
}

That's right! Other parts of our program (like the thread above) can wake our future too, and a noop waker would miss those wakeup calls! To catch them, our Waker implementation needs to fake an I/O event to the epoll, which we can do with an eventfd.

We can register an eventfd with the epoll so that, when the Waker is woken, it writes to the eventfd, which in turn wakes up the epoll as well.

use std::io::ErrorKind;
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd};

// as the name suggests, `eventfd`s are nothing more than file descriptors
pub struct Event(OwnedFd);

impl Event {
    pub fn new() -> std::io::Result<Self> {
        // important to set it to non-blocking mode
        let fd = unsafe { err_handle(libc::eventfd(0, libc::EFD_NONBLOCK | libc::EFD_CLOEXEC))? };
        return unsafe { Ok(Self(OwnedFd::from_raw_fd(fd))) };
    }

    // notifying an `eventfd` consists of writing to it
    pub fn notify(&self) -> std::io::Result<()> {
        let value = 1u64;
        unsafe {
            err_handle(libc::write(self.0.as_raw_fd(), std::ptr::addr_of!(value).cast(), 8) as libc::c_int)?;
            return Ok(());
        }
    }

    // this will be useful later
    pub fn try_clone(&self) -> std::io::Result<Event> {
        self.0.try_clone().map(Self)
    }

    // waiting for an `eventfd` consists of trying to read it
    pub fn try_get(&self) -> std::io::Result<bool> {
        let mut buf = 0u64;
        unsafe {
            return match err_handle(
                libc::read(self.0.as_raw_fd(), std::ptr::addr_of_mut!(buf).cast(), 8) as libc::c_int
            ) {
                Ok(_) => Ok(true),
                Err(ref e) if matches!(e.kind(), ErrorKind::WouldBlock | ErrorKind::Interrupted) => {
                    Ok(false)
                }
                Err(e) => return Err(e),
            };
        };
    }
}

impl IntoRawFd for Event {
    fn into_raw_fd(self) -> std::os::fd::RawFd {
        self.0.into_raw_fd()
    }
}

impl FromRawFd for Event {
    unsafe fn from_raw_fd(fd: std::os::fd::RawFd) -> Self {
        Self(OwnedFd::from_raw_fd(fd))
    }
}

impl AsRawFd for Event {
    fn as_raw_fd(&self) -> std::os::fd::RawFd {
        self.0.as_raw_fd()
    }
}
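If you'd rather not touch eventfd, the same "wake the epoll from anywhere" idea can be sketched with std alone using the classic self-pipe trick. The waker writes a byte into one end of a non-blocking `UnixStream` pair, and the runner would register the other end for reading. This is a swapped-in technique, not what this runtime uses. `PipeWaker`, `pipe_waker` and `drain` are hypothetical names. One difference: an eventfd folds any number of writes into a single counter, while here every wake queues a byte that has to be drained.

```rust
use std::io::{ErrorKind, Read, Write};
use std::os::unix::net::UnixStream;
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Wakes the runner by writing a byte into a socket the runner watches for reads.
struct PipeWaker(UnixStream);

impl Wake for PipeWaker {
    fn wake(self: Arc<Self>) {
        self.wake_by_ref();
    }

    fn wake_by_ref(self: &Arc<Self>) {
        // `&UnixStream` implements `Write`, so a shared reference is enough.
        // A full buffer (WouldBlock) means a wakeup is already pending, so ignore errors.
        let _ = (&self.0).write(&[1]);
    }
}

/// Creates a waker plus the read end that the runner would register with epoll.
fn pipe_waker() -> std::io::Result<(Waker, UnixStream)> {
    let (write_end, read_end) = UnixStream::pair()?;
    write_end.set_nonblocking(true)?;
    read_end.set_nonblocking(true)?;
    Ok((Waker::from(Arc::new(PipeWaker(write_end))), read_end))
}

/// Clears pending wakeups; returns how many bytes (wake calls) were queued.
fn drain(read_end: &mut UnixStream) -> std::io::Result<usize> {
    let mut total = 0;
    let mut buf = [0u8; 64];
    loop {
        match read_end.read(&mut buf) {
            Ok(0) => return Ok(total), // write end closed
            Ok(n) => total += n,
            Err(e) if e.kind() == ErrorKind::WouldBlock => return Ok(total),
            Err(e) => return Err(e),
        }
    }
}
```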

Now that we have our rustified eventfd, we can implement our Waker. To do so, we'll first need to create a RawWaker, which consists of a user data pointer and a RawWakerVTable.

Instead of pointing the data pointer at some heap allocation, we'll store the eventfd's raw file descriptor in it as if it were a pointer, and implement the VTable's functions around it.

use std::{
    mem::ManuallyDrop,
    os::fd::{FromRawFd, IntoRawFd},
    task::{RawWaker, RawWakerVTable, Waker},
};

static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop);

/* VTABLE FUNCTIONS */
unsafe fn clone(data: *const ()) -> RawWaker {
    let signal = ManuallyDrop::new(Event::from_raw_fd(data as libc::c_int));
    // Unlikely to fail, and we don't have an error-handling mechanism for cloning Wakers
    let new_signal = signal.try_clone().unwrap();
    return raw_waker(new_signal);
}

unsafe fn wake(data: *const ()) {
    let signal = Event::from_raw_fd(data as libc::c_int);
    signal.notify().unwrap();
}

unsafe fn wake_by_ref(data: *const ()) {
    // With `ManuallyDrop`, we're effectively "borrowing" the event: its fd won't be closed when `signal` goes out of scope.
    let signal = ManuallyDrop::new(Event::from_raw_fd(data as libc::c_int));
    signal.notify().unwrap();
}

unsafe fn drop(data: *const ()) {
    std::mem::drop(Event::from_raw_fd(data as libc::c_int));
}

/* WAKER CREATION */
fn raw_waker(signal: Event) -> RawWaker {
    let signal = signal.into_raw_fd();
    return RawWaker::new(signal as *const (), &VTABLE);
}

pub fn waker(signal: Event) -> Waker {
    unsafe { Waker::from_raw(raw_waker(signal)) }
}
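Storing an integer in the data pointer is easier to see with a toy vtable. In this sketch, the data pointer carries a slot index into a hypothetical static `WAKES` array instead of a file descriptor, so cloning and dropping have nothing to do. In the eventfd version above, those functions duplicate and close the descriptor instead. All the `*_slot` names are made up for illustration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{RawWaker, RawWakerVTable, Waker};

/// One wake counter per slot; a waker's data pointer is just a slot index.
static WAKES: [AtomicUsize; 4] = [
    AtomicUsize::new(0),
    AtomicUsize::new(0),
    AtomicUsize::new(0),
    AtomicUsize::new(0),
];

static VTABLE: RawWakerVTable =
    RawWakerVTable::new(clone_slot, wake_slot, wake_slot_by_ref, drop_slot);

unsafe fn clone_slot(data: *const ()) -> RawWaker {
    // Nothing to duplicate: the integer itself is the whole state.
    RawWaker::new(data, &VTABLE)
}

unsafe fn wake_slot(data: *const ()) {
    // Waking by value would also release resources; there are none here.
    unsafe { wake_slot_by_ref(data) }
}

unsafe fn wake_slot_by_ref(data: *const ()) {
    // Treat the "pointer" as a plain integer; it is never dereferenced.
    WAKES[data as usize].fetch_add(1, Ordering::SeqCst);
}

unsafe fn drop_slot(_data: *const ()) {
    // No resources to release.
}

fn slot_waker(slot: usize) -> Waker {
    assert!(slot < WAKES.len());
    // SAFETY: the vtable functions only use `data` as an in-bounds integer.
    unsafe { Waker::from_raw(RawWaker::new(slot as *const (), &VTABLE)) }
}
```

Because every clone carries the same data pointer and the same static vtable, `will_wake` can recognise two such wakers as equivalent.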

And after adapting our Runner implementation...

impl Runner {
    pub fn block_on<T>(&self, fut: impl Future<Output = T>) -> std::io::Result<T> {
        let mut fut = std::pin::pin!(fut);

        let event = Event::new()?;
        // we need these because of our epoll implementation, but we won't actually use them
        let recv = (std::cell::Cell::new(false), std::cell::Cell::new(None));
        // keep the guard alive for the whole loop, so epoll keeps listening to the eventfd
        let _handle: PollEvent = self.epoll.listen_read(&event, &recv)?;

        let waker: Waker = waker(event.try_clone()?);
        let mut cx: Context<'_> = Context::from_waker(&waker);

        loop {
            match fut.as_mut().poll(&mut cx) {
                Poll::Ready(x) => return Ok(x),
                Poll::Pending => self.epoll.wait()?,
            }

            // Event already handled, clear it (if needed)
            let _ = event.try_get()?;
        }
    }
}

We have a functional, single-threaded future runner!

To make our lives easier, we'll create a thread-local variable so that each thread has its own runner.

use std::cell::UnsafeCell;
use std::ops::Deref;

thread_local! {
    static LOCAL_RUNNER: UnsafeCell<Runner> = UnsafeCell::new(Runner::new().unwrap());
}

// SAFETY: This is safe because this pointer will be valid for the duration of the thread, and raw pointers are
// `!Send` and `!Sync`, so a `RunnerRef` can't be moved to another thread.
#[derive(Debug)]
#[repr(transparent)]
pub struct RunnerRef(*const Runner);

impl Deref for RunnerRef {
    type Target = Runner;

    #[inline]
    fn deref(&self) -> &Self::Target {
        unsafe { &*self.0 }
    }
}

pub fn block_on<Fut: Future>(fut: Fut) -> Fut::Output {
    return with_runner(|runner| runner.block_on(fut)).unwrap()
}

pub(crate) fn runner() -> RunnerRef {
    return with_runner(|r| RunnerRef(r));
}

pub(crate) fn with_runner<F: FnOnce(&Runner) -> T, T>(f: F) -> T {
    // Only hand out shared references: futures call `runner()` while `block_on` already holds one
    LOCAL_RUNNER.with(|runner| f(unsafe { &*runner.get() }))
}
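The per-thread behaviour of `thread_local!` is what keeps each runner private to its thread. This tiny sketch uses a hypothetical `LOCAL_COUNT` counter instead of a `Runner`. It shows that a spawned thread starts from its own fresh value and never touches the caller's.

```rust
use std::cell::Cell;
use std::thread;

thread_local! {
    // Each thread lazily gets its own counter, just like each thread gets its own `Runner`.
    static LOCAL_COUNT: Cell<u32> = Cell::new(0);
}

/// Bumps this thread's counter and returns its new value.
fn bump() -> u32 {
    LOCAL_COUNT.with(|c| {
        c.set(c.get() + 1);
        c.get()
    })
}

/// Bumps `n` times on a brand new thread and returns that thread's final count.
fn bump_on_new_thread(n: u32) -> u32 {
    thread::spawn(move || {
        let mut last = 0;
        for _ in 0..n {
            last = bump();
        }
        last
    })
    .join()
    .unwrap()
}
```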

Nighty night!

Let's test that our brand spanking new runtime is working correctly by implementing an async sleep function that will use our epoll.

Linux provides the timerfd API to handle timers through file descriptors, exactly what we need to integrate them with our runtime!

use std::io::ErrorKind;
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd};
use std::ptr::addr_of_mut;
use std::time::Duration;

// Our rustified `timerfd`
#[derive(Debug)]
pub struct Timer(OwnedFd);

impl Timer {
    pub fn new_timeout(until: Duration) -> std::io::Result<Self> {
        let mut this = Self::new_(libc::CLOCK_MONOTONIC)?;
        this.setup_timeout(until)?;
        return Ok(this);
    }
}

impl Timer {
    fn new_(clockid: libc::c_int) -> std::io::Result<Self> {
        // Remember to set the `timerfd` as non-blocking
        let fd = unsafe { err_handle(libc::timerfd_create(clockid, libc::TFD_NONBLOCK | libc::TFD_CLOEXEC))? };
        return unsafe { Ok(Self(OwnedFd::from_raw_fd(fd))) };
    }

    /// Number of ticks occurred since last check
    pub fn ticks(&self) -> std::io::Result<u64> {
        loop {
            unsafe {
                let mut tmp = 0u64;
                return match err_handle(
                    libc::read(self.0.as_raw_fd(), addr_of_mut!(tmp).cast(), 8) as libc::c_int
                ) {
                    Ok(_) => Ok(tmp),
                    // If it were to block, that would mean no ticks have happened since we last checked
                    Err(ref e) if e.kind() == ErrorKind::WouldBlock => Ok(0),
                    Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
                    Err(e) => Err(e),
                };
            }
        }
    }

    fn setup_timeout(&mut self, delta: Duration) -> std::io::Result<()> {
        let value = libc::itimerspec {
            it_interval: libc::timespec {
                tv_sec: 0,
                tv_nsec: 0,
            },
            it_value: dur2timespec_saturating(delta),
        };

        unsafe {
            err_handle(libc::timerfd_settime(
                self.0.as_raw_fd(),
                0,
                &value,
                core::ptr::null_mut(),
            ))?
        };
        return Ok(());
    }
}

impl std::os::fd::AsRawFd for Timer {
    fn as_raw_fd(&self) -> std::os::fd::RawFd {
        self.0.as_raw_fd()
    }
}

As we can see in the example, we create the file descriptor via timerfd_create, and set the time we want to sleep via timerfd_settime.

timerfds can also be used to create intervals (via it_interval), but that functionality is beyond the scope of this example.
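The body of `dur2timespec_saturating` isn't shown above, so here's one possible sketch of such a conversion, written against plain integers. `split_for_timespec` is a hypothetical name, and the sketch assumes a 64-bit `time_t`. It also dodges a timerfd gotcha: according to the timerfd_settime man page, an all-zero `it_value` disarms the timer, so a zero-length sleep would never wake up.

```rust
use std::time::Duration;

/// Splits a `Duration` into the (seconds, nanoseconds) pair a `timespec` holds,
/// saturating the seconds at `i64::MAX` (assumes a 64-bit `time_t`).
fn split_for_timespec(d: Duration) -> (i64, i64) {
    let secs = i64::try_from(d.as_secs()).unwrap_or(i64::MAX);
    let nanos = i64::from(d.subsec_nanos());
    // An all-zero `it_value` disarms a timerfd instead of firing immediately,
    // so a zero-length timeout is bumped up to 1ns.
    if secs == 0 && nanos == 0 {
        (0, 1)
    } else {
        (secs, nanos)
    }
}
```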

After that, we just need to register the timerfd with our epoll, and we'll have a functional async sleep function!

pub async fn sleep(dur: Duration) {
    let timer = Timer::new_timeout(dur).unwrap();
    let recv = (std::cell::Cell::new(false), std::cell::Cell::new(None));
    let runner: RunnerRef = runner();

    // Tell `epoll` to listen to whenever the timeout has passed
    let mut handle = runner.epoll.listen_read(&timer, &recv).unwrap();

    // Wait until the timer has ticked at least once, then we're done
    while timer.ticks().unwrap() == 0 {
        handle.wait().await
    }
}

And done! Here is an example main function!

use std::time::Duration;

pub fn main() {
    block_on(async move {
        println!("Hello");
        sleep(Duration::from_secs(1)).await;
        println!("world!");
    });
}