I've always been obsessed with knowledge.
Ever since my family first gifted me an iMac when I was a kid, I've been fascinated by computers and the way they work. Over the years, that interest has trickled down into lower-level programming, and into an unhealthy obsession with performance and with doing stuff better than the rest. Whilst that attitude is one I usually try to tame, it has also led me to learn and try the next step down.
So, when I saw a couple of ThePrimeagen videos reacting to criticism of async Rust and its runtimes, I got that "I should try doing it my way" feeling, so I challenged myself to build this website using my own async runtime.
The basics
The basic idea is quite simple. A Rust future is nothing more than a regular struct with the Future trait implemented.
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
Thus, starting execution of the future is as easy as calling its poll method.
// `pin!` pins the future to the stack, producing a `Pin<&mut _>`
let mut fut = std::pin::pin!(fut);
let mut cx: Context<'_> = ...; // building this is the tricky part
let poll: Poll<()> = fut.as_mut().poll(&mut cx);
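To try the snippet above without any runtime at all, a waker that does nothing is enough to build a Context. This is a minimal sketch, assuming a hypothetical NoopWake type (built with std's Wake trait) and a hypothetical poll_once helper; it polls a future exactly once and returns whatever poll reports.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical waker that ignores every wakeup.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical helper: polls `fut` exactly once with a do-nothing waker.
fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    // Pin the future on the stack, as in the snippet above.
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    fut.as_mut().poll(&mut cx)
}
```

An async block with no .await points resolves on its first poll, while std::future::pending() stays Pending no matter how often it is polled.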
The tricky part is the Context, which holds an instance of Waker.
Wakers are the mechanism Rust futures use to get woken up. When a waker's wake or wake_by_ref method is called, it tells the underlying async runtime "hey, the future bound to me is now ready to be polled again".
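A sketch of both methods in action, assuming a hypothetical CountingWaker that only counts its wakeups and a hypothetical YieldOnce future that returns Pending once after asking to be woken. wake consumes the waker, while wake_by_ref leaves it usable for later polls.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake};

/// Hypothetical waker that just counts how often it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    // Backs `Waker::wake`, which consumes the waker.
    fn wake(self: Arc<Self>) {
        self.wake_by_ref();
    }

    // Backs `Waker::wake_by_ref`, which leaves the waker usable.
    fn wake_by_ref(self: &Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Hypothetical future: returns `Pending` once, then `Ready`.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        // "Hey runtime, poll me again": without this call, nobody would.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
```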
A good example can be found in the Rust docs for std::task::Wake:
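use std::future::Future;
use std::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake};
use std::thread::{self, Thread};
/// A waker that wakes up the current thread when called.
struct ThreadWaker(Thread);
impl Wake for ThreadWaker {
fn wake(self: Arc<Self>) {
self.0.unpark();
}
}
/// Run a future to completion on the current thread.
fn block_on<T>(fut: impl Future<Output = T>) -> T {
// Pin the future so it can be polled.
let mut fut = pin::pin!(fut);
// Create a new context to be passed to the future.
let t = thread::current();
let waker = Arc::new(ThreadWaker(t)).into();
let mut cx = Context::from_waker(&waker);
// Run the future to completion, sleeping until woken whenever it's pending.
loop {
match fut.as_mut().poll(&mut cx) {
Poll::Ready(res) => return res,
Poll::Pending => thread::park(),
}
}
}
fn main() {
block_on(async {
println!("Hi from inside a future!");
});
}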
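The example above only polls a future that is ready immediately, so thread::park never actually runs. The sketch below exercises the Pending path: a hypothetical FlagFuture stores the current waker, and another thread sets the flag and wakes it, which unparks the blocked thread. ThreadWaker and block_on are repeated so the snippet compiles on its own; FlagState, FlagFuture and set_flag are names made up for this sketch.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// `ThreadWaker` and `block_on` as in the example above, repeated to stay self-contained.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<T>(fut: impl Future<Output = T>) -> T {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(res) => return res,
            // `park` can also return spuriously; the loop simply polls again.
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical shared state for a one-shot flag.
#[derive(Default)]
struct FlagState {
    set: bool,
    waker: Option<Waker>,
}

/// Hypothetical future that completes once `set_flag` has been called.
struct FlagFuture(Arc<Mutex<FlagState>>);

impl Future for FlagFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut state = self.0.lock().unwrap();
        if state.set {
            return Poll::Ready(());
        }
        // Remember who to wake; the setter runs on another thread.
        state.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

/// Sets the flag and wakes whichever task is waiting on it.
fn set_flag(shared: &Mutex<FlagState>) {
    let mut state = shared.lock().unwrap();
    state.set = true;
    if let Some(waker) = state.waker.take() {
        waker.wake();
    }
}
```

If the other thread calls unpark before the runner reaches park, the parking token makes park return immediately, so the wakeup is not lost.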
This is a nice and simple implementation, but it gives us no way to perform asynchronous I/O.
For that, we need a version of thread::park that listens to I/O handles and wakes up whenever one of them is ready. Thankfully, Linux offers exactly that with the epoll system calls!
Single-threaded, asynchronous I/O
epoll works through epoll instances, created via epoll_create (or epoll_create1). An instance can start and stop watching file descriptors for readiness via epoll_ctl, and wait for I/O events via epoll_wait.
With that, we can create a rustified Epoll struct like this:
extern crate libc;
use std::cell::{Cell, UnsafeCell};
use std::io::ErrorKind;
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd};
pub struct Epoll {
fd: OwnedFd,
size: Cell<usize>,
sink: UnsafeCell<Vec<libc::epoll_event>>
}
impl Epoll {
pub fn new() -> std::io::Result<Self> {
let fd = unsafe { err_handle(libc::epoll_create1(libc::EPOLL_CLOEXEC))? };
return Ok(Self {
fd: unsafe { OwnedFd::from_raw_fd(fd) },
size: Cell::new(0),
sink: UnsafeCell::new(Vec::new())
});
}
pub fn wait(&self) -> std::io::Result<()> {
// epoll_wait fails with EINVAL if maxevents is 0, so ask for at least one slot
let size = self.size.get().max(1);
let sink: &mut Vec<libc::epoll_event> = unsafe { &mut *self.sink.get() };
sink.reserve(size);
let res = unsafe {
loop {
match err_handle(libc::epoll_wait(
self.fd.as_raw_fd(),
sink.as_mut_ptr(), size as i32, -1,
)) {
Ok(x) => break x,
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
}
}
};
unsafe { sink.set_len(res as usize) };
...
return std::io::Result::Ok(());
}
}
fn err_handle(err: libc::c_int) -> std::io::Result<libc::c_int> {
if err == -1 {
return Err(std::io::Error::last_os_error());
}
return Ok(err);
}
And with this, we can create our future runner.
use std::future::Future;
use std::task::{Context, Poll};
pub struct Runner {
epoll: Epoll,
}
impl Runner {
pub fn new() -> std::io::Result<Self> {
return Ok(Self {
epoll: Epoll::new()?
});
}
pub fn block_on<T>(&self, fut: impl Future<Output = T>) -> std::io::Result<T> {
let mut fut = std::pin::pin!(fut);
let mut cx: Context<'_> = ...;
loop {
match fut.as_mut().poll(&mut cx) {
Poll::Ready(x) => return Ok(x),
Poll::Pending => self.epoll.wait()?,
}
}
}
}
But as you might have noticed, we have a couple of remaining issues:
- We have no way to register I/O events for the epoll to listen to
- We haven't created our own version of Waker yet
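Let's start by adding a way for our epoll to listen for I/O readiness.
This is done via the epoll_ctl function in EPOLL_CTL_ADD mode.
If we call it with EPOLLIN, epoll will wait until the file handle is ready to read, and if we call it with EPOLLOUT, epoll will wait until the file handle is ready to write.
Our listen function will also OR in EPOLLET, which makes the registration edge-triggered: epoll reports the descriptor when it becomes ready, so we should only wait again after a read or write has failed with WouldBlock.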
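To see EPOLLIN in isolation, here is a sketch that calls the three syscalls directly. Assumptions: Linux, hand-written extern "C" bindings in place of the libc crate (with the kernel's packed epoll_event layout on x86_64), a UnixStream pair standing in for a real I/O handle, and a hypothetical poll_readable helper. A timeout of 0 makes epoll_wait report what is ready right now instead of blocking, and the user data we register comes back untouched.

```rust
use std::io;
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd};
use std::os::unix::net::UnixStream;

// Assumption: Linux. Hand-written stand-ins for the `libc` items used in the article.
// The kernel packs `epoll_event` on x86_64 only.
#[cfg_attr(target_arch = "x86_64", repr(C, packed))]
#[cfg_attr(not(target_arch = "x86_64"), repr(C))]
#[derive(Clone, Copy)]
struct EpollEvent {
    events: u32,
    data: u64,
}

const EPOLL_CLOEXEC: i32 = 0o2000000;
const EPOLL_CTL_ADD: i32 = 1;
const EPOLLIN: u32 = 0x001;

extern "C" {
    fn epoll_create1(flags: i32) -> i32;
    fn epoll_ctl(epfd: i32, op: i32, fd: i32, event: *mut EpollEvent) -> i32;
    fn epoll_wait(epfd: i32, events: *mut EpollEvent, maxevents: i32, timeout: i32) -> i32;
}

/// Hypothetical helper: registers `stream` for EPOLLIN with `token` as user data,
/// then returns the token of the first ready event without blocking.
fn poll_readable(stream: &UnixStream, token: u64) -> io::Result<Option<u64>> {
    // A fresh epoll instance, closed automatically when `epoll` is dropped.
    let epfd = unsafe { epoll_create1(EPOLL_CLOEXEC) };
    if epfd == -1 {
        return Err(io::Error::last_os_error());
    }
    let epoll = unsafe { OwnedFd::from_raw_fd(epfd) };

    // EPOLL_CTL_ADD + EPOLLIN: "tell me when this fd has data to read".
    let mut evt = EpollEvent { events: EPOLLIN, data: token };
    if unsafe { epoll_ctl(epoll.as_raw_fd(), EPOLL_CTL_ADD, stream.as_raw_fd(), &mut evt) } == -1 {
        return Err(io::Error::last_os_error());
    }

    // Timeout 0: report what is ready right now instead of blocking.
    let mut out = [EpollEvent { events: 0, data: 0 }; 4];
    let n = unsafe { epoll_wait(epoll.as_raw_fd(), out.as_mut_ptr(), out.len() as i32, 0) };
    if n == -1 {
        return Err(io::Error::last_os_error());
    }
    // Copy the field out by value (no references into a packed struct).
    let data = out[0].data;
    Ok(if n > 0 { Some(data) } else { None })
}
```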
Knowing this, we can implement functions that start listening for I/O readiness and return a guard, which makes the epoll stop listening to that file descriptor once the guard is dropped.
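The guard idea is plain RAII. Here is a toy sketch, assuming a hypothetical Interest type that records watched fds in a set instead of talking to the kernel: listen plays the role of EPOLL_CTL_ADD, and dropping the returned Listening guard plays the role of EPOLL_CTL_DEL. The guard borrows the registry, so the borrow checker won't let it outlive it.

```rust
use std::cell::RefCell;
use std::collections::BTreeSet;

/// Hypothetical stand-in for an epoll instance: just the set of watched fds.
#[derive(Default)]
pub struct Interest {
    fds: RefCell<BTreeSet<i32>>,
}

/// Guard returned by `listen`; dropping it stops watching the fd.
pub struct Listening<'a> {
    interest: &'a Interest,
    fd: i32,
}

impl Interest {
    /// Plays the role of EPOLL_CTL_ADD.
    pub fn listen(&self, fd: i32) -> Listening<'_> {
        self.fds.borrow_mut().insert(fd);
        Listening { interest: self, fd }
    }

    /// Currently watched fds, in ascending order.
    pub fn watched(&self) -> Vec<i32> {
        self.fds.borrow().iter().copied().collect()
    }
}

impl Drop for Listening<'_> {
    /// Plays the role of EPOLL_CTL_DEL.
    fn drop(&mut self) {
        self.interest.fds.borrow_mut().remove(&self.fd);
    }
}
```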
All of this is nice and everything, but we still have no way to resume polling the future once the I/O wait has completed. For that, we'll need to attach some information to the epoll event so we can resume execution of the future.
The simplest approach is to pass a Waker and a bool flag as user data. This way, whenever the epoll reports an event, we can set the flag to true and call the waker, which will wake the future.
use std::cell::Cell;
use std::io::ErrorKind;
use std::os::fd::AsRawFd;
use std::task::Waker;
struct PollEvent<'a, 'b> {
fd: libc::c_int,
recv: &'b (Cell<bool>, Cell<Option<Waker>>),
poll: &'a Epoll,
}
impl Epoll {
pub fn listen_read<'a, 'b>(
&'a self,
fd: &impl AsRawFd,
recv: &'b (Cell<bool>, Cell<Option<Waker>>),
) -> std::io::Result<PollEvent<'a, 'b>> {
return self.listen(fd, recv, libc::EPOLLIN as u32);
}
pub fn listen_write<'a, 'b>(
&'a self,
fd: &impl AsRawFd,
recv: &'b (Cell<bool>, Cell<Option<Waker>>),
) -> std::io::Result<PollEvent<'a, 'b>> {
return self.listen(fd, recv, libc::EPOLLOUT as u32);
}
fn listen<'a, 'b>(
&'a self,
fd: &impl AsRawFd,
recv: &'b (Cell<bool>, Cell<Option<Waker>>),
events: u32,
) -> std::io::Result<PollEvent<'a, 'b>> {
let fd = fd.as_raw_fd();
let mut evt = libc::epoll_event {
// Edge-triggered: only report the fd when it *becomes* ready
events: events | libc::EPOLLET as u32,
// Smuggle a pointer to `recv` through the event's user data
u64: recv as *const _ as usize as u64,
};
return match unsafe {
err_handle(libc::epoll_ctl(
self.fd.as_raw_fd(),
libc::EPOLL_CTL_ADD,
fd,
&mut evt,
))
} {
Ok(_) => {
self.size.set(self.size.get() + 1);
Ok(PollEvent {
fd,
recv,
poll: self,
})
}
Err(e) => Err(e),
};
}
pub fn wait(&self) -> std::io::Result<()> {
// epoll_wait fails with EINVAL if maxevents is 0, so ask for at least one slot
let size = self.size.get().max(1);
let sink: &mut Vec<libc::epoll_event> = unsafe { &mut *self.sink.get() };
sink.reserve(size);
let res: libc::c_int = unsafe {
loop {
match err_handle(libc::epoll_wait(
self.fd.as_raw_fd(),
sink.as_mut_ptr(),
size as i32,
-1,
)) {
Ok(x) => break x,
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
}
}
};
unsafe { sink.set_len(res as usize) };
for evt in sink.drain(..) {
// Recover the `recv` pointer stored by `listen`, then flag and wake its future
let data: &(Cell<bool>, Cell<Option<Waker>>) =
unsafe { &*(evt.u64 as usize as *const (Cell<bool>, Cell<Option<Waker>>)) };
data.0.set(true);
if let Some(waker) = data.1.take() {
waker.wake()
}
}
return std::io::Result::Ok(());
}
}
On the other end, we'll need a method on our PollEvent struct that lets us asynchronously wait for the epoll to notify us that we're ready to keep executing.
To stop listening, which we'll do when the PollEvent is dropped, we must call epoll_ctl in EPOLL_CTL_DEL mode.
use std::future::poll_fn;
use std::task::Context;
struct PollEvent<'a, 'b> {
fd: libc::c_int,
recv: &'b (Cell<bool>, Cell<Option<Waker>>),
poll: &'a Epoll,
}
impl<'a, 'b> PollEvent<'a, 'b> {
pub async fn wait(&mut self) {
poll_fn(|cx| self.poll_wait(cx)).await;
}
fn poll_wait(&mut self, cx: &mut Context<'_>) -> std::task::Poll<()> {
if self.recv.0.replace(false) {
return std::task::Poll::Ready(());
} else {
if let Some(prev) = self.recv.1.take() {
if prev.will_wake(cx.waker()) {
self.recv.1.set(Some(prev));
return std::task::Poll::Pending;
}
}
self.recv.1.set(Some(cx.waker().clone()));
return std::task::Poll::Pending;
}
}
}
impl<'a, 'b> Drop for PollEvent<'a, 'b> {
fn drop(&mut self) {
self.poll.size.set(self.poll.size.get() - 1);
unsafe {
err_handle(libc::epoll_ctl(
self.poll.fd.as_raw_fd(),
libc::EPOLL_CTL_DEL,
self.fd,
// The event argument is ignored for EPOLL_CTL_DEL (since Linux 2.6.9)
core::ptr::null_mut(),
))
.unwrap();
}
}
}
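Stripped of the epoll plumbing, the handshake between Epoll::wait and PollEvent::poll_wait can be sketched with std alone. The names on_event, poll_recv and wait_ready are hypothetical stand-ins for those two methods, poll_recv is simplified to always store the latest waker, and CountingWaker exists only to observe the wakeup.

```rust
use std::cell::Cell;
use std::future::{poll_fn, Future};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Same shape as the user data registered with the epoll.
type Recv = (Cell<bool>, Cell<Option<Waker>>);

/// Hypothetical: what `Epoll::wait` does for each reported event.
fn on_event(recv: &Recv) {
    recv.0.set(true);
    if let Some(waker) = recv.1.take() {
        waker.wake();
    }
}

/// Hypothetical: a simplified `PollEvent::poll_wait`.
fn poll_recv(recv: &Recv, cx: &mut Context<'_>) -> Poll<()> {
    if recv.0.replace(false) {
        Poll::Ready(())
    } else {
        recv.1.set(Some(cx.waker().clone()));
        Poll::Pending
    }
}

/// Hypothetical: a simplified `PollEvent::wait`.
fn wait_ready(recv: &Recv) -> impl Future<Output = ()> + '_ {
    poll_fn(move |cx| poll_recv(recv, cx))
}

/// Test helper: a waker that counts its wakeups.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
```

The flag is what makes the second poll return Ready; the waker only tells the runtime that polling again is worthwhile.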
After all this work, we can implement some basic read and write functions that asynchronously read from or write to types implementing std::os::fd::AsRawFd and either std::io::Read or std::io::Write.
Note that for these futures not to block the thread, those types need to be set up in non-blocking mode (for example with TcpStream::set_nonblocking(true)).
use std::cell::Cell;
use std::io::{ErrorKind, Read, Write};
use std::os::fd::AsRawFd;
use std::task::Waker;
async fn read<R: ?Sized + AsRawFd + Read>(
poll: &Epoll,
reader: &mut R,
buf: &mut [u8],
) -> std::io::Result<usize> {
let flag: (Cell<bool>, Cell<Option<Waker>>) = (Cell::new(false), Cell::new(None));
// Register the raw fd (`RawFd` implements `AsRawFd`), which also works for unsized readers
let mut handle: PollEvent = poll.listen_read(&reader.as_raw_fd(), &flag)?;
loop {
match reader.read(buf) {
Ok(x) => return Ok(x),
// Not ready yet: wait until the epoll reports readiness, then retry
Err(ref e) if e.kind() == ErrorKind::WouldBlock => handle.wait().await,
// Interrupted by a signal: retry right away
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
}
}
}
async fn write<W: ?Sized + AsRawFd + Write>(
poll: &Epoll,
writer: &mut W,
buf: &[u8],
) -> std::io::Result<usize> {
let flag: (Cell<bool>, Cell<Option<Waker>>) = (Cell::new(false), Cell::new(None));
let mut handle: PollEvent = poll.listen_write(&writer.as_raw_fd(), &flag)?;
loop {
match writer.write(buf) {
Ok(x) => return Ok(x),
Err(ref e) if e.kind() == ErrorKind::WouldBlock => handle.wait().await,
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
};
}
}
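What non-blocking mode changes can be seen with std alone. A small sketch, using a UnixStream pair as a stand-in I/O handle and a hypothetical try_read helper: after set_nonblocking(true), a read with no data available fails with ErrorKind::WouldBlock instead of parking the thread, which is exactly the case the read and write futures above turn into an epoll wait.

```rust
use std::io::{self, ErrorKind, Read};
use std::os::unix::net::UnixStream;

/// Hypothetical helper: one read attempt.
/// `Ok(None)` means "would block", i.e. the point where the futures above call `handle.wait()`.
fn try_read(stream: &mut UnixStream, buf: &mut [u8]) -> io::Result<Option<usize>> {
    match stream.read(buf) {
        Ok(n) => Ok(Some(n)),
        Err(e) if e.kind() == ErrorKind::WouldBlock => Ok(None),
        Err(e) => Err(e),
    }
}
```

Without the set_nonblocking(true) call, the first read on an empty stream would simply block the whole thread, and with it every other future on the runner.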
Awesome! We can now listen for I/O readiness, but we still haven't created the Waker that will actually resume execution of the future.
The thing is, we don't actually need one. You see, after waiting on the epoll, the runner will re-poll the future anyway, so a no-op waker would suffice, kind of.
That would be true if the only thing that could wake up our future was an I/O event, but that's not always the case, as in this example where another thread wakes it:
extern crate utils_atomics;
use std::time::Duration;
use utils_atomics::flag::mpsc;
pub async fn oups() {
let (send, recv) = mpsc::async_flag();
std::thread::spawn(move || {
std::thread::sleep(Duration::from_secs(2));
send.mark();
});
recv.await;
println!("Wait completed!");
}
That's right! Other parts of our program (here, another thread) can wake our future, and since the runner is blocked inside epoll_wait, a no-op waker would never notice those wakeup calls! To catch them, our Waker implementation needs to fake an I/O event for the epoll, which we can do with an eventfd.
We can register an eventfd with the epoll, and when the Waker is woken, it writes to the eventfd, waking up the epoll as well.
use std::io::ErrorKind;
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd};
pub struct Event(OwnedFd);
impl Event {
pub fn new() -> std::io::Result<Self> {
let fd = unsafe { err_handle(libc::eventfd(0, libc::EFD_NONBLOCK | libc::EFD_CLOEXEC))? };
return unsafe { Ok(Self(OwnedFd::from_raw_fd(fd))) };
}
pub fn notify(&self) -> std::io::Result<()> {
let value = 1u64;
unsafe {
err_handle(libc::write(self.0.as_raw_fd(), std::ptr::addr_of!(value).cast(), 8) as libc::c_int)?;
return Ok(());
}
}
pub fn try_clone(&self) -> std::io::Result<Event> {
self.0.try_clone().map(Self)
}
pub fn try_get(&self) -> std::io::Result<bool> {
let mut buf = 0u64;
unsafe {
return match err_handle(
libc::read(self.0.as_raw_fd(), std::ptr::addr_of_mut!(buf).cast(), 8) as libc::c_int
) {
Ok(_) => Ok(true),
Err(ref e) if matches!(e.kind(), ErrorKind::WouldBlock | ErrorKind::Interrupted) => {
Ok(false)
}
Err(e) => return Err(e),
};
};
}
}
impl IntoRawFd for Event {
fn into_raw_fd(self) -> std::os::fd::RawFd {
self.0.into_raw_fd()
}
}
impl FromRawFd for Event {
unsafe fn from_raw_fd(fd: std::os::fd::RawFd) -> Self {
Self(OwnedFd::from_raw_fd(fd))
}
}
impl AsRawFd for Event {
fn as_raw_fd(&self) -> std::os::fd::RawFd {
self.0.as_raw_fd()
}
}
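The Event wrapper above leans on eventfd's counter semantics: every write adds to a 64-bit counter, and a single read returns the total and resets it to zero, so several wakeups that arrive before the runner drains the fd collapse into one. A sketch of that behaviour, assuming hand-written extern "C" bindings for eventfd (flag values as on Linux x86_64 and aarch64) and hypothetical open_eventfd, notify and drain helpers:

```rust
use std::fs::File;
use std::io::{self, ErrorKind, Read, Write};
use std::os::fd::FromRawFd;

// Assumption: Linux on x86_64/aarch64, where EFD_NONBLOCK == O_NONBLOCK and
// EFD_CLOEXEC == O_CLOEXEC. The article gets these from the `libc` crate.
const EFD_NONBLOCK: i32 = 0o4000;
const EFD_CLOEXEC: i32 = 0o2000000;

extern "C" {
    fn eventfd(initval: u32, flags: i32) -> i32;
}

/// Hypothetical helper: a non-blocking eventfd wrapped in a `File`.
fn open_eventfd() -> io::Result<File> {
    let fd = unsafe { eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC) };
    if fd == -1 {
        return Err(io::Error::last_os_error());
    }
    Ok(unsafe { File::from_raw_fd(fd) })
}

/// Adds 1 to the counter, like `Event::notify`.
fn notify(efd: &mut File) -> io::Result<()> {
    efd.write_all(&1u64.to_ne_bytes())
}

/// Reads and resets the counter; 0 means nothing was pending.
fn drain(efd: &mut File) -> io::Result<u64> {
    let mut buf = [0u8; 8];
    match efd.read(&mut buf) {
        Ok(_) => Ok(u64::from_ne_bytes(buf)),
        Err(e) if e.kind() == ErrorKind::WouldBlock => Ok(0),
        Err(e) => Err(e),
    }
}
```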
Now that we have our rustified eventfd, we can implement our Waker.
To do so, we'll first need to create a RawWaker, which consists of a user data pointer and a RawWakerVTable.
Instead of using a real pointer for our user data, we'll pass the eventfd's raw file descriptor as if it were a pointer, and implement the VTable's functions on top of it.
use std::{
mem::ManuallyDrop,
os::fd::{FromRawFd, IntoRawFd},
task::{RawWaker, RawWakerVTable, Waker},
};
static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop);
// Each waker owns its own fd, so cloning duplicates the descriptor.
// `ManuallyDrop` keeps us from closing the fd we only borrowed.
unsafe fn clone(data: *const ()) -> RawWaker {
let signal = ManuallyDrop::new(Event::from_raw_fd(data as libc::c_int));
let new_signal = signal.try_clone().unwrap();
return raw_waker(new_signal);
}
// `wake` consumes the waker: notify, then let `signal` close the fd.
unsafe fn wake(data: *const ()) {
let signal = Event::from_raw_fd(data as libc::c_int);
signal.notify().unwrap();
}
// `wake_by_ref` only borrows the waker: notify without closing the fd.
unsafe fn wake_by_ref(data: *const ()) {
let signal = ManuallyDrop::new(Event::from_raw_fd(data as libc::c_int));
signal.notify().unwrap();
}
// Dropping the waker closes its fd.
unsafe fn drop(data: *const ()) {
std::mem::drop(Event::from_raw_fd(data as libc::c_int));
}
// Hand ownership of the fd to the `RawWaker`, disguised as a pointer.
fn raw_waker(signal: Event) -> RawWaker {
let signal = signal.into_raw_fd();
return RawWaker::new(signal as *const (), &VTABLE);
}
pub fn waker(signal: Event) -> Waker {
unsafe { Waker::from_raw(raw_waker(signal)) }
}
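The trick of passing an integer through the data pointer does not depend on eventfd. A std-only sketch, assuming a hypothetical static table of wake counters where a slot index plays the role of the fd: since nothing is owned here, clone just copies the integer and drop does nothing, unlike the fd-owning version above that has to dup and close descriptors. WAKES, bump, drop_waker and slot_waker are names made up for this sketch.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{RawWaker, RawWakerVTable, Waker};

/// Hypothetical per-slot wake counters; the slot index plays the role of the fd.
static WAKES: [AtomicUsize; 4] = [
    AtomicUsize::new(0),
    AtomicUsize::new(0),
    AtomicUsize::new(0),
    AtomicUsize::new(0),
];

static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker);

fn bump(data: *const ()) {
    // The "pointer" is really a slot index.
    WAKES[data as usize].fetch_add(1, Ordering::SeqCst);
}

// Nothing is owned, so cloning copies the integer and dropping does nothing.
unsafe fn clone(data: *const ()) -> RawWaker {
    RawWaker::new(data, &VTABLE)
}

unsafe fn wake(data: *const ()) {
    bump(data)
}

unsafe fn wake_by_ref(data: *const ()) {
    bump(data)
}

unsafe fn drop_waker(_data: *const ()) {}

/// Builds a `Waker` whose data pointer is just `slot` cast to a pointer.
fn slot_waker(slot: usize) -> Waker {
    assert!(slot < WAKES.len(), "slot out of range");
    unsafe { Waker::from_raw(RawWaker::new(slot as *const (), &VTABLE)) }
}
```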
And after adapting our Runner implementation...
impl Runner {
pub fn block_on<T>(&self, fut: impl Future<Output = T>) -> std::io::Result<T> {
let mut fut = std::pin::pin!(fut);
// An eventfd the epoll always listens to, so wakers can interrupt `epoll.wait()`
let event = Event::new()?;
let recv = (std::cell::Cell::new(false), std::cell::Cell::new(None));
// Keep the registration alive until `block_on` returns
let _handle: PollEvent = self.epoll.listen_read(&event, &recv)?;
let waker: Waker = waker(event.try_clone()?);
let mut cx: Context<'_> = Context::from_waker(&waker);
loop {
match fut.as_mut().poll(&mut cx) {
Poll::Ready(x) => return Ok(x),
Poll::Pending => self.epoll.wait()?,
}
// Drain the eventfd counter so stale wakeups don't pile up
let _ = event.try_get()?;
}
}
}
We have a functional, single-threaded future runner!
To make our lives easier, we'll create a thread-local variable so that each thread has its own runner.
use std::cell::UnsafeCell;
use std::future::Future;
use std::ops::Deref;
thread_local! {
static LOCAL_RUNNER: UnsafeCell<Runner> = UnsafeCell::new(Runner::new().unwrap());
}
#[derive(Debug)]
#[repr(transparent)]
pub struct RunnerRef(*const Runner);
impl Deref for RunnerRef {
type Target = Runner;
#[inline]
fn deref(&self) -> &Self::Target {
unsafe { &*self.0 }
}
}
pub fn block_on<Fut: Future>(fut: Fut) -> Fut::Output {
return with_runner(|runner| runner.block_on(fut)).unwrap()
}
pub(crate) fn runner() -> RunnerRef {
return with_runner(|r| RunnerRef(r));
}
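pub(crate) fn with_runner<F: FnOnce(&Runner) -> T, T>(f: F) -> T {
// Hand out shared references only: several `RunnerRef`s may exist at once,
// so creating a `&mut Runner` here would be undefined behavior
LOCAL_RUNNER.with(|runner| f(unsafe { &*runner.get() }))
}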
Nighty night!
Let's test that our brand spanking new runtime works correctly by implementing an async sleep function on top of our epoll.
Linux provides the timerfd API to handle timers through file descriptors, which is exactly what we need to integrate them with our runtime!
use std::io::ErrorKind;
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd};
use std::ptr::addr_of_mut;
use std::time::Duration;
#[derive(Debug)]
pub struct Timer(OwnedFd);
impl Timer {
pub fn new_timeout(until: Duration) -> std::io::Result<Self> {
let mut this = Self::new_(libc::CLOCK_MONOTONIC)?;
this.setup_timeout(until)?;
return Ok(this);
}
}
impl Timer {
fn new_(clockid: libc::c_int) -> std::io::Result<Self> {
let fd = unsafe { err_handle(libc::timerfd_create(clockid, libc::TFD_NONBLOCK | libc::TFD_CLOEXEC))? };
return unsafe { Ok(Self(OwnedFd::from_raw_fd(fd))) };
}
pub fn ticks(&self) -> std::io::Result<u64> {
loop {
unsafe {
let mut tmp = 0u64;
return match err_handle(
libc::read(self.0.as_raw_fd(), addr_of_mut!(tmp).cast(), 8) as libc::c_int
) {
Ok(_) => Ok(tmp),
Err(ref e) if e.kind() == ErrorKind::WouldBlock => Ok(0),
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => Err(e),
};
}
}
}
fn setup_timeout(&mut self, delta: Duration) -> std::io::Result<()> {
let value = libc::itimerspec {
it_interval: libc::timespec {
tv_sec: 0,
tv_nsec: 0,
},
it_value: dur2timespec_saturating(delta),
};
unsafe {
err_handle(libc::timerfd_settime(
self.0.as_raw_fd(),
0,
&value,
core::ptr::null_mut(),
))?
};
return Ok(());
}
}
impl std::os::fd::AsRawFd for Timer {
fn as_raw_fd(&self) -> std::os::fd::RawFd {
self.0.as_raw_fd()
}
}
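The Timer code relies on a dur2timespec_saturating helper that isn't shown. Here is one possible version, as a sketch: it returns a hypothetical Timespec stand-in for libc::timespec on 64-bit Linux so it compiles without libc, clamps durations whose seconds don't fit in tv_sec, and (a design choice of this sketch) rounds a zero duration up to one nanosecond, because timerfd_settime treats an all-zero it_value as "disarm the timer", which would make sleep(Duration::ZERO) wait forever.

```rust
use std::time::Duration;

/// Stand-in for `libc::timespec` on 64-bit Linux (assumption), so this compiles without `libc`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Timespec {
    pub tv_sec: i64,
    pub tv_nsec: i64,
}

/// Hypothetical version of `dur2timespec_saturating`.
pub fn dur2timespec_saturating(dur: Duration) -> Timespec {
    // A zero `it_value` disarms a timerfd, so round zero up to one nanosecond.
    if dur.is_zero() {
        return Timespec { tv_sec: 0, tv_nsec: 1 };
    }
    let secs = dur.as_secs();
    // Too far in the future to represent: clamp to the largest value.
    if secs > i64::MAX as u64 {
        return Timespec { tv_sec: i64::MAX, tv_nsec: 999_999_999 };
    }
    Timespec {
        tv_sec: secs as i64,
        tv_nsec: i64::from(dur.subsec_nanos()),
    }
}
```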
As we can see, we create the file descriptor via timerfd_create, and set how long we want to sleep via timerfd_settime.
timerfds can also be used to create intervals (via it_interval), but that functionality is beyond the scope of this example.
After that, we just need to register the timerfd with our Epoll, and we'll have a functional async sleep function!
pub async fn sleep(dur: Duration) {
let timer = Timer::new_timeout(dur).unwrap();
let recv = (std::cell::Cell::new(false), std::cell::Cell::new(None));
let runner: RunnerRef = runner();
let mut handle = runner.epoll.listen_read(&timer, &recv).unwrap();
// `ticks` returns 0 until the timer expires; wait on the epoll in between
while timer.ticks().unwrap() == 0 {
handle.wait().await
}
}
And done! Here is an example main function!
use std::time::Duration;
pub fn main() {
block_on(async move {
println!("Hello");
sleep(Duration::from_secs(1)).await;
println!("world!");
});
}