# Command Runtime
In the previous sections we focused on building applications in Crux and using its public APIs to do so. In this and the following chapters, we'll look at how the internals of Crux work, starting with the command runtime.
The command runtime is a set of components that process effects, presenting the two perspectives we previously mentioned:
- For the core, the shell appears to be a platform with a message-based system interface
- For the shell, the core appears as a stateful library responding to events with requests for side-effects
There are a few challenges to solve in order to facilitate this interface.
First, each run of the `update` function returns a `Command` which may contain several concurrent tasks, each requesting effects from the shell. The requested effects are expected to be emitted together, and each batch of effects will be processed concurrently, so the calls can't be blocking.
Second, each effect may require multiple round-trips between the core and shell to conclude, and we don't want to require a call to `update` per round trip, so we need some ability to "suspend" execution while waiting for an effect to be fulfilled. The ability to suspend effects introduces a new challenge — effects which are suspended need, once resolved, to continue execution in the same async task.
Given this concurrency and execution suspension, an async interface seems like a good candidate. Commands request work from the shell, `.await` the results, and continue their work when the result has arrived. The call to `request_from_shell` or `stream_from_shell` translates into an effect request returned from the current core "transaction" (one call to `process_event` or `resolve`).
In this chapter, we will focus on the runtime and the core interface, ignoring serialisation, the bridge and FFI; we will return to them in the following sections. The examples will assume a Rust-based shell.
## Async runtime
One of the fairly unique aspects of Rust's async support is that it doesn't come with a bundled runtime. This recognises that asynchronous execution is useful in various different scenarios, and no single runtime can serve all of them. Crux takes advantage of this and brings its own runtime, tailored to the execution of side-effects on top of a message-based interface.
For a deeper background on Rust's async architecture, we recommend the Asynchronous Programming in Rust book, especially the chapter about executing futures and tasks. We will assume you are familiar with the basic ideas and mechanics of async here.
The job of an async runtime is to manage a number of tasks, each driving one future to completion. This management is done by an executor, which is responsible for scheduling the futures and polling them at the right time to drive their execution forward. General-purpose runtimes like Tokio do this on a number of threads in a thread pool, but in Crux, we run in the context of a single function call (of the app's `update` function) and potentially in a WebAssembly context which is single-threaded anyway, so our runtime only needs to poll all the tasks sequentially, to see if any of them need to continue.
Polling all the tasks would work, and in our case wouldn't even be that inefficient, but the async system is set up to avoid unnecessary polling of futures with one additional concept: wakers. A waker is a mechanism used to signal to the executor that something a given task is waiting on has changed, and that the task's future should be polled because it will now be able to proceed. This is how "at the right time" from the previous paragraph is decided.

In our case there's a single situation which causes such a change: a result has arrived from the shell, for a particular effect requested earlier.
Always use the Command APIs provided by Crux for async work (see the capabilities chapter). Using other async APIs can lead to unexpected behaviour, because the resulting futures are not tied to Crux effects. Such futures will resolve, but only after the next shell request causes the Crux executor to execute.
If you want to depend on a crate that requires a standard runtime like Tokio, you can integrate it through an effect via middleware.
## One effect's life cycle
So, step by step, our strategy for commands to handle effects is:
- A `Command` creates a task containing a future with some code to run (via `Command::new` or `ctx.spawn`)
- The new task is scheduled to be polled next time the executor runs
- The executor goes through the list of ready tasks until it gets to our task and polls it
- The future runs to the point where the first async call is `.await`ed. In commands, this should only be a future returned from one of the calls to request something from the shell, or a future resulting from a composition of such futures (through async method calls or combinators like `select` or `join`)
- The shell request future's first step is to create the request and prepare it to be sent. We will look at the mechanics of the sending shortly; for now it's only important that part of this request is a callback used to resolve it
- The request future, as part of the first poll by the executor, sends the request to be handed to the shell. As there is no result from the shell yet, it returns a pending state and the task is suspended
- The request is passed on to the shell to resolve (as a return value from `process_event` or `resolve`)
- Eventually, the shell has a result ready for the request and asks the core to `resolve` the request
- The request's resolve callback is executed, sending the provided result through an internal channel. The channel wakes the future's waker, which enqueues the task for processing on the executor
- The executor runs again (asked to do so by the core's `resolve` API after calling the callback), and polls the awoken future
- The future sees there is now a result available and continues executing the original task until a further await, or until completion
The cycle may repeat a few times, depending on the command implementation, but eventually the original task completes and is removed.
This is probably a lot to take in, but the basic gist is that command futures (the ones created by `Command::new` or `ctx.spawn`) always pause on request futures (the ones returned from `request_from_shell` et al.), which submit requests. Resolving requests updates the state of the original future and wakes it up to continue execution.
With that in mind we can look at the individual moving parts and how they communicate.
## Spawning tasks on the executor
The first step for anything to happen is creating a `Command` with a task. Each task runs within a `CommandContext`, which provides the interface for communicating with the shell and the app:
```rust
pub struct CommandContext<Effect, Event> {
    pub(crate) effects: Sender<Effect>,
    pub(crate) events: Sender<Event>,
    pub(crate) tasks: Sender<Task>,
    pub(crate) rc: Arc<()>,
}
```
There are sending ends of channels for effects and events, and also a sender for spawning new tasks. The `rc` field is a reference counter used to track whether any contexts are still alive (indicating the command may still produce more work).
A `Command` is itself an async executor, managing a set of tasks:
```rust
#[must_use = "Unused commands never execute. Return the command from your app's update function or combine it with other commands with Command::and or Command::all"]
pub struct Command<Effect, Event> {
    effects: Receiver<Effect>,
    events: Receiver<Event>,
    context: CommandContext<Effect, Event>,

    // Executor internals
    // TODO: should this be a separate type?
    ready_queue: Receiver<TaskId>,
    spawn_queue: Receiver<Task>,
    tasks: Slab<Task>,
    ready_sender: Sender<TaskId>, // Used in creating wakers for tasks
    waker: Arc<AtomicWaker>,      // Shared with task wakers when polled in async context

    // Signaling
    aborted: Arc<AtomicBool>,
}
```
It holds the receiving ends of the effect and event channels, along with the executor internals: a `Slab` of tasks, a ready queue of task IDs, and a spawn queue for new tasks.
Each `Task` is a simple data structure holding a future and some coordination state:
```rust
pub(crate) struct Task {
    // Used to wake the join handle when the task concludes
    pub(crate) join_handle_wakers: Receiver<Waker>,
    // Set to true when the task finishes, used by the join handle
    // RFC: is there a safe way to do this relying on the waker alone?
    pub(crate) finished: Arc<AtomicBool>,
    // Set to true when the task is aborted. Aborted tasks will poll Ready on the
    // next poll
    pub(crate) aborted: Arc<AtomicBool>,
    // The future polled by this task
    pub(crate) future: BoxFuture<'static, ()>,
}
```
Tasks are spawned by `CommandContext::spawn`:
```rust
pub fn spawn<F, Fut>(&self, make_future: F) -> JoinHandle
where
    F: FnOnce(CommandContext<Effect, Event>) -> Fut,
    Fut: Future<Output = ()> + Send + 'static,
{
    let (sender, receiver) = crossbeam_channel::unbounded();

    let ctx = self.clone();
    let future = make_future(ctx);

    let task = Task {
        finished: Arc::default(),
        aborted: Arc::default(),
        future: future.boxed(),
        join_handle_wakers: receiver,
    };

    let handle = JoinHandle {
        finished: task.finished.clone(),
        aborted: task.aborted.clone(),
        register_waker: sender,
    };

    self.tasks
        .send(task)
        .expect("Command could not spawn task, tasks channel disconnected");

    handle
}
```
After constructing a task with the future returned by the closure, it is sent to the command's spawn queue. A `JoinHandle` is returned, which can be used to await the task's completion or abort it.
The command runs all tasks to completion (or suspension) with `run_until_settled`:
```rust
pub(crate) fn run_until_settled(&mut self) {
    if self.was_aborted() {
        // Spawn new tasks to clear the spawn_queue as well
        self.spawn_new_tasks();
        self.tasks.clear();

        return;
    }

    loop {
        self.spawn_new_tasks();

        if self.ready_queue.is_empty() {
            break;
        }

        while let Ok(task_id) = self.ready_queue.try_recv() {
            match self.run_task(task_id) {
                TaskState::Missing | TaskState::Suspended => {
                    // Missing:
                    // The task has been evicted because it completed. This can happen when
                    // a _running_ task schedules itself to wake, but then completes and gets
                    // removed
                    //
                    // Suspended:
                    // we pick it up again when it's woken up
                }
                TaskState::Completed | TaskState::Cancelled => {
                    // Remove and drop the task, it's finished
                    let task = self.tasks.remove(task_id.0);
                    task.finished.store(true, Ordering::Release);
                    task.wake_join_handles();

                    drop(task);
                }
            }
        }
    }
}
```
The method first checks if the command has been aborted. If not, it loops: spawning any new tasks from the spawn queue, then polling each ready task. Tasks that complete are removed. Tasks that are suspended wait to be woken.
The waking mechanism is provided by `CommandWaker`:
```rust
pub(crate) struct CommandWaker {
    pub(crate) task_id: TaskId,
    pub(crate) ready_queue: Sender<TaskId>,
    // Waker for the executor running this command as a Stream.
    // When the command is executed directly (e.g. in tests) this waker
    // will not be registered.
    pub(crate) parent_waker: Arc<AtomicWaker>,
    woken: AtomicBool,
}

impl Wake for CommandWaker {
    fn wake(self: Arc<Self>) {
        self.wake_by_ref();
    }

    fn wake_by_ref(self: &Arc<Self>) {
        // If we can't send the id to the ready queue, there is no Command to poll the task again anyway,
        // nothing to do.
        // TODO: Does that mean we should bail, since waking ourselves is
        // now pointless?
        let _ = self.ready_queue.send(self.task_id);
        self.woken.store(true, Ordering::Release);

        // Note: calling `wake` before `register` is a no-op
        self.parent_waker.wake();
    }
}
```
When a task's future needs to be woken (because a shell response has arrived), the waker sends the task's ID back to the ready queue and also wakes the parent waker (used when the command is running as a stream inside another command).
While there are a lot of moving pieces involved, the basic mechanics are relatively straightforward — tasks are submitted by `Command::new` or `ctx.spawn`, or woken by arriving responses to the requests they submitted. The queue of ready tasks is processed whenever `run_until_settled` is called. This happens in the `Core` API implementation: both `process_event` and `resolve` trigger it as part of their processing.
Now that we know how the futures get executed, suspended and resumed, we can examine the flow of information between commands and the `Core` API calls layered on top.
## Requests flow from commands to the shell
The key to understanding how the effects get processed and executed is to name all the various pieces of information, and discuss how they are wrapped in each other.
The basic inner piece of the effect request is an operation. This is the intent which the command is submitting to the shell. Each operation has an associated output value, with which the operation request can be resolved. There are multiple capabilities in each app, and in order for the shell to easily tell which capability's effect it needs to handle, we wrap the operation in an effect. The `Effect` type is a generated enum based on the app's set of capabilities, with one variant per capability. It allows us to multiplex (or type-erase) the different typed operations into a single type, which can be matched on to process the operations.
Finally, the effect is wrapped in a request which carries the effect, and an associated resolve callback to which the output will eventually be given. We discussed this callback in the previous section — its job is to send the result through an internal channel, waking up the paused future. The request is the value passed to the shell, and used as both the description of the effect intent, and the "token" used to resolve it.
Each task in a command has access to a `CommandContext`, which holds the sending ends of channels for effects and events. When a task calls `request_from_shell`, the context creates a `Request` containing the operation and a resolve callback, wraps it in the app's `Effect` type (via the `From` trait), and sends it through the effects channel. The `Command` collects these effects and surfaces them to the `Core`.
Looking at the core itself:
```rust
pub struct Core<A>
where
    A: App,
{
    // WARNING: The user controlled types _must_ be defined first
    // so that they are dropped first, in case they contain coordination
    // primitives which attempt to wake up a future when dropped. For that
    // reason the executor _must_ outlive the user type instances

    // user types
    model: RwLock<A::Model>,
    app: A,

    // internals
    root_command: Mutex<Command<A::Effect, A::Event>>,
}
```
The `Core` holds a `root_command` — a single long-lived `Command` onto which all commands returned from `update` are spawned. This root command acts as the top-level executor, collecting all effects and events across all active commands.
## A single update cycle
To piece all these things together, let's look at processing a single call from the shell. Both `process_event` and `resolve` share a common step that advances the command runtime.
Here is `process_event`:
```rust
pub fn process_event(&self, event: A::Event) -> Vec<A::Effect> {
    let mut model = self.model.write().expect("Model RwLock was poisoned.");

    let command = self.app.update(event, &mut model);

    // drop the model here, we don't want to hold the lock for the process() call
    drop(model);

    let mut root_command = self
        .root_command
        .lock()
        .expect("Capability runtime lock was poisoned");

    root_command.spawn(|ctx| command.into_future(ctx));
    drop(root_command);

    self.process()
}
```
And here is `resolve`:
```rust
pub fn resolve<Output>(
    &self,
    request: &mut impl Resolvable<Output>,
    result: Output,
) -> Result<Vec<A::Effect>, ResolveError> {
    let resolve_result = request.resolve(result);
    debug_assert!(resolve_result.is_ok());

    resolve_result?;

    Ok(self.process())
}
```
The interesting things happen in the common `process` method:
```rust
pub(crate) fn process(&self) -> Vec<A::Effect> {
    let mut root_command = self
        .root_command
        .lock()
        .expect("Capability runtime lock was poisoned");

    let mut events: VecDeque<_> = root_command.events().collect();

    while let Some(event_from_commands) = events.pop_front() {
        let mut model = self.model.write().expect("Model RwLock was poisoned.");
        let command = self.app.update(event_from_commands, &mut model);
        drop(model);

        root_command.spawn(|ctx| command.into_future(ctx));
        events.extend(root_command.events());
    }

    root_command.effects().collect()
}
```
First, we drain events from the root command (which internally runs all ready tasks before collecting). There can be new events because we just returned a command from `update` (which may have immediately sent events) or resolved some effects (which woke up suspended futures that then sent events).

For each event, we call `update` again, spawning the returned command onto the root command, and drain any further events produced. This continues until no more events remain.
Finally, we collect all of the effect requests submitted in the process and return them to the shell.
## Resolving requests
We've now seen everything other than the mechanics of resolving requests. The resolve callback is carried by the request as a `RequestHandle`, tagged by the expected number of resolutions:
```rust
type ResolveOnce<Out> = Box<dyn FnOnce(Out) + Send>;
type ResolveMany<Out> = Box<dyn Fn(Out) -> Result<(), ()> + Send>;

/// Resolve is a callback used to resolve an effect request and continue
/// one of the capability Tasks running on the executor.
pub enum RequestHandle<Out> {
    Never,
    Once(ResolveOnce<Out>),
    Many(ResolveMany<Out>),
}
```
A `RequestHandle` can be `Never` (for notifications that don't expect a response), `Once` (for one-shot requests), or `Many` (for streaming requests). Resolving a `Once` handle consumes it, turning it into `Never` to prevent double resolution.
Here's how the resolve callback is set up in `request_from_shell`:
```rust
pub fn request_from_shell<Op>(&self, operation: Op) -> ShellRequest<Op::Output>
where
    Op: Operation,
    Effect: From<Request<Op>> + Send + 'static,
{
    let (output_sender, output_receiver) = mpsc::unbounded();

    let request = Request::resolves_once(operation, move |output| {
        // If the channel is closed, the associated task has been cancelled
        let _ = output_sender.unbounded_send(output);
    });

    let send_request = {
        let effect = request.into();
        let effects = self.effects.clone();

        move || {
            effects
                .send(effect)
                .expect("Command could not send request effect, effect channel disconnected");
        }
    };

    ShellRequest::new(Box::new(send_request), output_receiver)
}
```
The callback sends the output through an `mpsc` channel. On the receiving end, the `ShellRequest` future is waiting — when the value arrives, the channel wakes the future's waker, which schedules the task on the executor to continue.
In the next chapter, we will look at how this process changes when Crux is used via an FFI interface where requests and responses need to be serialised in order to pass across the language boundary.