Capability Runtime

In the previous sections we focused on building applications in Crux and using its public APIs to do so. In this and the following chapters, we'll look at how the internals of Crux work, starting with the capability runtime.

The capability runtime is a set of components that processes effects, presenting the two perspectives we previously mentioned:

  • For the core, the shell appears to be a platform with a message based system interface
  • For the shell, the core appears as a stateful library responding to events with request for side-effects

There are a few challenges to solve in order to facilitate this interface. First, each run of the update function can call several capabilities. The requested effects are expected to be emitted together, and each batch of effects will be processed concurrently, so the calls can't be blocking. Second, each effect requested from a capability may require multiple round-trips between the core and shell to conclude and we don't want to require a call to update per round trip, so we need some ability to "suspend" execution in capabilities while waiting for an effect to be fulfilled. The ability to suspend effects introduces a new challenge - effects started in a particular capability and suspended, once resolved, need to continue execution in the same capability.

Given this concurrency and execution suspension, an async interface seems like a good candidate. Capabilities request work from the shell, .await the results, and continue their work when the result has arrived. The call to request_from_shell or stream_from_shell translates into an effect request returned from the current core "transaction" (one call to process_event or resolve).

Note

In this chapter, we will focus on the runtime and the core interface and ignore the serialization, bridge and FFI, and return to them in the following sections. The examples will assume a Rust based shell.

Async runtime

One of the fairly unique aspects of Rust's async is the fact that it doesn't come with a bundled runtime. This is recognising that asynchronous execution is useful in various different scenarios, and no one runtime can serve all of them. Crux takes advantage of this and brings its own runtime, tailored to the execution of side-effects on top of a message based interface.

For a deeper background on Rust's async architecture, we recommend the Asynchronous Programming in Rust book, especially the chapter about executing futures and tasks. We will assume you are familiar with the basic ideas and mechanics of async here.

The job of an async runtime is to manage a number of tasks, each driving one future to completion. This management is done by an executor, which is responsible for scheduling the futures and polling them at the right time to drive their execution forward. Most "grown up" runtimes will do this on a number of threads in a thread pool, but in Crux, we run in the context of a single function call (of the app's update function) and potentially in a webassembly context which is single threaded anyway, so our baby runtime only needs to poll all the tasks sequentially, to see if any of them need to continue.

Polling all the tasks would work, and in our case wouldn't even be that inefficient, but the async system is set up to avoid unnecessary polling of futures with one additional concept - wakers. A waker is a mechanism which can be used to signal to the executor that something that a given task is waiting on has changed, and the task's future should be polled, because it will be able to proceed. This is how "at the right time" from the above paragraph is decided.

In our case there's a single situation which causes such a change - a result has arrived from the shell, for a particular effect requested earlier.

Warning

Always use the capability APIs provided by Crux for async work (see the capabilities chapter). Using other async APIs can lead to unexpected behaviour, because the resulting futures are not tied to crux effects. Such futures will resolve, but only after the next shell request causes the crux executor to execute.

One effect's life cycle

So, step by step, our strategy for the capabilities to handle effects is:

  1. A capability spawns a task and submits a future with some code to run
  2. The new task is scheduled to be polled next time the executor runs
  3. The executor goes through the list of ready tasks until it gets to our task and polls it
  4. The future runs to the point where the first async call is awaited. In capabilities, this should only be a future returned from one of the calls to request something from the shell, or a future resulting from a composition of such futures (through async method calls or combinators like select or join).
  5. The shell request future's first step is to create the request and prepare it to be sent. We will look at the mechanics of the sending shortly, but for now it's only important that part of this request is a callback used to resolve it.
  6. The request future, as part of the first poll by the executor, sends the request to be handed to the shell. As there is no result from the shell yet, it returns a pending state and the task is suspended.
  7. The request is passed on to the shell to resolve (as a return value from process_event or resolve)
  8. Eventually, the shell has a result ready for the request and asks the core to resolve the request.
  9. The request callback mentioned above is executed, puts the provided result in the future's mutable state, and calls the future's waker, also stored in the future's state, to wake the future up. The waker enqueues the future for processing on the executor.
  10. The executor runs again (asked to do so by the core's resolve API after calling the callback), and polls the awoken future.
  11. the future sees there is now a result available and continues the execution of the original task until a further await or until completion.

The cycle may repeat a few times, depending on the capability implementation, but eventually the original task completes and is removed.

This is probably a lot to take in, but the basic gist is that capability futures (the ones submitted to spawn) always pause on request futures (the ones returned from request_from_shell et al.), which submit requests. Resolving requests updates the state of the original future and wakes it up to continue execution.

With that in mind we can look at the individual moving parts and how they communicate.

Spawning tasks on the executor

The first step for anything to happen is spawning a task from a capability. Each capability is created with a CapabilityContext. This is the definition:

pub struct CapabilityContext<Op, Event>
where
    Op: Operation,
{
    inner: std::sync::Arc<ContextInner<Op, Event>>,
}

struct ContextInner<Op, Event>
where
    Op: Operation,
{
    shell_channel: Sender<Request<Op>>,
    app_channel: Sender<Event>,
    spawner: executor::Spawner,
}

There are a couple of sending ends of channels for requests and events, which we will get to soon, and also a spawner, from the executor module. The Spawner looks like this:

#[derive(Clone)]
pub struct Spawner {
    future_sender: Sender<BoxFuture>,
}

also holding a sending end of a channel, this one for Tasks.

Tasks are a fairly simple data structure, holding a future and another sending end of the tasks channel, because tasks need to be able to submit themselves when awoken.

Tasks are spawned by the Spawner as follows:

impl Spawner {
    pub fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let future = future.boxed();
        self.future_sender
            .send(future)
            .expect("unable to spawn an async task, task sender channel is disconnected.")
    }
}

after constructing a task, it is submitted using the task sender.

The final piece of the puzzle is the executor itself:

pub(crate) struct QueuingExecutor {
    spawn_queue: Receiver<BoxFuture>,
    ready_queue: Receiver<TaskId>,
    ready_sender: Sender<TaskId>,
    tasks: Mutex<Slab<Option<BoxFuture>>>,
}

This is the receiving end of the channel from the spawner.

The executor has a single public method, run_all:

impl QueuingExecutor {
    pub fn run_all(&self) {
        // we read off both queues and execute the tasks we receive.
        // Since either queue can generate work for the other queue,
        // we read from them in a loop until we are sure both queues
        // are exhausted
        let mut did_some_work = true;

        while did_some_work {
            did_some_work = false;
            while let Ok(task) = self.spawn_queue.try_recv() {
                let task_id = self
                    .tasks
                    .lock()
                    .expect("Task slab poisoned")
                    .insert(Some(task));
                self.run_task(TaskId(task_id.try_into().expect("TaskId overflow")));
                did_some_work = true;
            }
            while let Ok(task_id) = self.ready_queue.try_recv() {
                match self.run_task(task_id) {
                    RunTask::Unavailable => {
                        // We were unable to run the task as it is (presumably) being run on
                        // another thread. We re-queue the task for 'later' and do NOT set
                        // `did_some_work = true`. That way we will keep looping and doing work
                        // until all remaining work is 'unavailable', at which point we will bail
                        // out of the loop, leaving the queued work to be finished by another thread.
                        // This strategy should avoid dropping work or busy-looping
                        // FIXME: are we potentially sending ourselves `Unavailable` and reading it
                        // in a loop - busy looping here?
                        self.ready_sender.send(task_id).expect("could not requeue");
                    }
                    RunTask::Missing => {
                        // This is possible if a naughty future sends a wake notification while
                        // still running, then runs to completion and is evicted from the slab.
                        // Nothing to be done.
                    }
                    RunTask::Suspended | RunTask::Completed => did_some_work = true,
                }
            }
        }
    }

    fn run_task(&self, task_id: TaskId) -> RunTask {
        let mut lock = self.tasks.lock().expect("Task slab poisoned");
        let Some(task) = lock.get_mut(*task_id as usize) else {
            return RunTask::Missing;
        };
        let Some(mut task) = task.take() else {
            // the slot exists but the task is missing - presumably it
            // is being executed on another thread
            return RunTask::Unavailable;
        };

        // free the mutex so other threads can make progress
        drop(lock);

        let waker = Arc::new(TaskWaker {
            task_id,
            sender: self.ready_sender.clone(),
        })
        .into();
        let context = &mut Context::from_waker(&waker);

        // poll the task
        if task.as_mut().poll(context).is_pending() {
            // If it's still pending, put the future back in the slot
            self.tasks
                .lock()
                .expect("Task slab poisoned")
                .get_mut(*task_id as usize)
                .expect("Task slot is missing")
                .replace(task);
            RunTask::Suspended
        } else {
            // otherwise the future is completed and we can free the slot
            self.tasks.lock().unwrap().remove(*task_id as usize);
            RunTask::Completed
        }
    }
}

enum RunTask {
    Missing,
    Unavailable,
    Suspended,
    Completed,
}

besides the locking and waker mechanics, the gist is quite simple - while there are tasks in the ready_queue, poll the future held in each one.

The last interesting bit of this part is how the waker is provided to the poll call. The waker_ref creates a waker which, when asked to wake up, will call this method on the task:

impl Wake for TaskWaker {
    fn wake(self: Arc<Self>) {
        self.wake_by_ref();
    }

    fn wake_by_ref(self: &Arc<Self>) {
        // This send can fail if the executor has been dropped.
        // In which case, nothing to do
        let _ = self.sender.send(self.task_id);
    }
}

this is where the task resubmits itself for processing on the next run.

While there are a lot of moving pieces involved, the basic mechanics are relatively straightforward - tasks are submitted either by the spawner, or the futures awoken by arriving responses to the requests they submitted. The queue of tasks is processed whenever run_all is called on the executor. This happens in the Core API implementation. Both process_event and resolve call run_all after their respective task - calling the app's update function, or resolving the given task.

Now we know how the futures get executed, suspended and resumed, we can examine the flow of information between capabilities and the Core API calls layered on top.

Requests flow from capabilities to the shell

The key to understanding how the effects get processed and executed is to name all the various pieces of information, and discuss how they are wrapped in each other.

The basic inner piece of the effect request is an operation. This is the intent which the capability is submitting to the shell. Each operation has an associated output value, with which the operation request can be resolved. There are multiple capabilities in each app, and in order for the shell to easily tell which capability's effect it needs to handle, we wrap the operation in an effect. The Effect type is a generated enum based on the app's set of capabilities, with one variant per capability. It allows us to multiplex (or type erase) the different typed operations into a single type, which can be matched on to process the operations.

Finally, the effect is wrapped in a request which carries the effect, and an associated resolve callback to which the output will eventually be given. We discussed this callback in the previous section - its job is to update the paused future's state and resume it. The request is the value passed to the shell, and used as both the description of the effect intent, and the "token" used to resolve it.

Now we can look at how all this wrapping is facilitated. Recall from the previous section that each capability has access to a CapabilityContext, which holds a sending end of two channels, one for events - the app_channel and one for requests - the shell_channel, whose type is Sender<Request<Op>>. These channels serve both as thread synchronisation and queueing mechanism between the capabilities and the core of crux. As you can see, the requests expected are typed for the capability's operation type.

Looking at the core itself, we see their Receiver ends.

pub struct Core<A>
where
    A: App,
{
    // WARNING: The user controlled types _must_ be defined first
    // so that they are dropped first, in case they contain coordination
    // primitives which attempt to wake up a future when dropped. For that
    // reason the executor _must_ outlive the user type instances

    // user types
    model: RwLock<A::Model>,
    capabilities: A::Capabilities,
    app: A,

    // internals
    requests: Receiver<A::Effect>,
    capability_events: Receiver<A::Event>,
    executor: QueuingExecutor,

    // temporary command support
    command_spawner: CommandSpawner<A::Effect, A::Event>,
}

One detail to note is that the receiving end of the requests channel is a Receiver<Ef>. The channel has an additional feature - it can map between the input types and output types, and, in this case, serve as a multiplexer, wrapping the operation in the corresponding Effect variant. Each sending end is specialised for the respective capability, but the receiving end gets an already wrapped Effect.

A single update cycle

To piece all these things together, lets look at processing a single call from the shell. Both process_event and resolve share a common step advancing the capability runtime.

Here is process_event:

    pub fn process_event(&self, event: A::Event) -> Vec<A::Effect> {
        let mut model = self.model.write().expect("Model RwLock was poisoned.");

        let command = self.app.update(event, &mut model, &self.capabilities);

        // drop the model here, we don't want to hold the lock for the process() call
        drop(model);

        self.command_spawner.spawn(command);
        self.process()
    }

and here is resolve:

    pub fn resolve<Op>(&self, request: &mut Request<Op>, result: Op::Output) -> Vec<A::Effect>
    where
        Op: Operation,
    {
        let resolve_result = request.resolve(result);
        debug_assert!(resolve_result.is_ok());

        self.process()
    }

The interesting things happen in the common process method:

    pub(crate) fn process(&self) -> Vec<A::Effect> {
        self.executor.run_all();

        while let Some(capability_event) = self.capability_events.receive() {
            let mut model = self.model.write().expect("Model RwLock was poisoned.");
            let command = self
                .app
                .update(capability_event, &mut model, &self.capabilities);

            drop(model);

            self.command_spawner.spawn(command);
            self.executor.run_all();
        }

        self.requests.drain().collect()
    }

First, we run all ready tasks in the executor. There can be new tasks ready because we just ran the app's update function (which may have spawned some task via capability calls) or resolved some effects (which woke up their suspended futures).

Next, we drain the events channel (where events are submitted from capabilities by context.update_app) and one by one, send them to the update function, running the executor after each one.

Finally, we collect all of the effect requests submitted in the process and return them to the shell.

Resolving requests

We've now seen everything other than the mechanics of resolving requests. This is ultimately just a callback carried by the request, but for additional type safety, it is tagged by the expected number of resolutions

type ResolveOnce<Out> = Box<dyn FnOnce(Out) + Send>;
type ResolveMany<Out> = Box<dyn Fn(Out) -> Result<(), ()> + Send>;

/// Resolve is a callback used to resolve an effect request and continue
/// one of the capability Tasks running on the executor.
pub(crate) enum Resolve<Out> {
    Never,
    Once(ResolveOnce<Out>),
    Many(ResolveMany<Out>),
}

We've already mentioned the resolve function itself briefly, but for completeness, here's an example from request_from_shell:

        let request = Request::resolves_once(operation, move |result| {
            let Some(shared_state) = callback_shared_state.upgrade() else {
                // The ShellRequest was dropped before we were called, so just
                // do nothing.
                return;
            };

            let mut shared_state = shared_state.lock().unwrap();

            // Attach the result to the shared state of the future
            shared_state.result = Some(result);
            // Signal the executor to wake the task holding this future
            if let Some(waker) = shared_state.waker.take() {
                waker.wake()
            }
        });

Bar the locking and sharing mechanics, all it does is update the state of the future (shared_state) and then calls wake on the future's waker to schedule it on the executor.

In the next chapter, we will look at how this process changes when Crux is used via an FFI interface where requests and responses need to be serialised in order to pass across the language boundary.