// crux_core/command/mod.rs
//! Command represents one or more side-effects, resulting in interactions with the shell.
//! Core creates Commands and returns them from the `update` function in response to events.
//! Commands can be created directly, but more often they will be created and returned
//! by capability APIs.
//!
//! A Command can execute side-effects in parallel, in sequence or a combination of both. To
//! allow this orchestration, Commands provide both a simple synchronous API and access to an
//! asynchronous API.
//!
//! Command surfaces the effect requests and events sent in response with
//! the [`Command::effects`] and [`Command::events`] methods. These can be used when testing
//! the side effects requested by an `update` call.
//!
//! Internally, Command resembles [`FuturesUnordered`](futures::stream::FuturesUnordered):
//! it manages and polls a number of futures and provides a context which they can use
//! to submit effects to the shell and events back to the application.
//!
//! Command implements [`Stream`](futures::Stream), making it useful in an async context,
//! enabling, for example, wrapping Commands in one another.
//!
//! # Examples
//!
//! TODO: simple command example with a capability API
//!
//! TODO: complex example with sync API
//!
//! TODO: basic async example
//!
//! TODO: async example with `spawn`
//!
//! TODO: cancellation example
//!
//! TODO: testing example
//!
//! TODO: composition example
mod builder;
mod context;
mod executor;
mod stream;
use std::future::Future;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
// TODO: consider switching to flume
use crossbeam_channel::{Receiver, Sender};
use executor::{AbortHandle, Task, TaskId};
use futures::task::AtomicWaker;
use futures::{FutureExt as _, Stream, StreamExt as _};
use slab::Slab;
use stream::CommandStreamExt as _;
pub use stream::CommandOutput;
use crate::capability::Operation;
use crate::Request;
/// One or more side-effects, run as a set of tasks, whose effect requests and events are
/// surfaced through [`Command::effects`] and [`Command::events`].
pub struct Command<Effect, Event> {
/// Receiving end of the effect channel. The sending end is handed to the `CommandContext`
/// passed to the root task in [`Command::new`].
effects: Receiver<Effect>,
/// Receiving end of the event channel. The sending end is handed to the `CommandContext`
/// passed to the root task in [`Command::new`].
events: Receiver<Event>,
// Executor internals
// TODO: should this be a separate type?
/// Ids of tasks which have been marked as ready; [`Command::new`] queues the root task here.
ready_queue: Receiver<TaskId>,
/// Tasks submitted through the `tasks` sender of the `CommandContext`
/// (presumably newly spawned tasks waiting to be adopted into `tasks` - confirm in `executor`).
spawn_queue: Receiver<Task>,
/// Storage for the tasks owned by this Command; a `TaskId` wraps the slab key of a task.
tasks: Slab<Task>,
ready_sender: Sender<TaskId>, // Used in creating wakers for tasks
waker: Arc<AtomicWaker>, // Shared with task wakers when polled in async context
// Signaling
/// Flag shared with the root task and with every `AbortHandle` created by
/// [`Command::abort_handle`].
aborted: Arc<AtomicBool>,
}
// Public API
impl<Effect, Event> Command<Effect, Event>
where
Effect: Send + 'static,
Event: Send + 'static,
{
/// Create a new command orchestrating effects with async Rust. This is the lowest level
/// API to create a Command if you need full control over its execution. In most cases you will
/// more likely want to create Commands with capabilities, and using the combinator APIs
/// ([`then`], [`and`] and [`all`]) to orchestrate them.
///
/// The `create_task` closure receives a [`CommandContext`] that it can use to send shell requests,
/// events back to the app, and to spawn additional tasks. The closure is expected to return a future
/// which becomes the command's main asynchronous task.
pub fn new<F, Fut>(create_task: F) -> Self
where
F: FnOnce(context::CommandContext<Effect, Event>) -> Fut,
Fut: Future<Output = ()> + Send + 'static,
{
// RFC: do we need to think about backpressure? The channels are unbounded
// so a naughty Command can make massive amounts of requests or spawn a huge number of tasks.
// If these channels supported async, the CommandContext methods could also be async and
// we could give the channels some bounds
let (effect_sender, effect_receiver) = crossbeam_channel::unbounded();
let (event_sender, event_receiver) = crossbeam_channel::unbounded();
let (ready_sender, ready_receiver) = crossbeam_channel::unbounded();
let (spawn_sender, spawn_receiver) = crossbeam_channel::unbounded();
let (_, waker_receiver) = crossbeam_channel::unbounded();
let context = context::CommandContext {
effects: effect_sender,
events: event_sender,
tasks: spawn_sender,
};
let aborted: Arc<AtomicBool> = Default::default();
let task = Task {
finished: Default::default(),
aborted: aborted.clone(),
future: create_task(context).boxed(),
join_handle_wakers: waker_receiver,
};
let mut tasks = Slab::with_capacity(1);
let task_id = TaskId(tasks.insert(task));
ready_sender
.send(task_id)
.expect("Could not make task ready, ready channel disconnected");
Command {
effects: effect_receiver,
events: event_receiver,
ready_queue: ready_receiver,
spawn_queue: spawn_receiver,
ready_sender,
tasks,
waker: Default::default(),
aborted,
}
}
/// Create an empty, completed Command. This is useful as a return value from `update` if
/// there are no side-effects to perform.
pub fn done() -> Self {
let (_, effects) = crossbeam_channel::bounded(0);
let (_, events) = crossbeam_channel::bounded(0);
let (_, spawn_queue) = crossbeam_channel::bounded(0);
let (ready_sender, ready_queue) = crossbeam_channel::bounded(0);
Command {
effects,
events,
ready_queue,
spawn_queue,
tasks: Slab::with_capacity(0),
ready_sender,
waker: Default::default(),
aborted: Default::default(),
}
}
/// Create a Command which dispatches an event and terminates. This is an alternative
/// to calling `update` recursively. The only difference is that the two `update` calls
/// will be visible to Crux and can show up in logs or any tooling. The trade-off is that
/// the event is not guaranteed to dispatch instantly - another `update` call which is
/// already scheduled may happen first.
pub fn event(event: Event) -> Self {
Command::new(|ctx| async move { ctx.send_event(event) })
}
/// Create a Command which sends a notification to the shell with a provided `operation`.
///
/// This ia synchronous equivalent of [`CommandContext::notify_shell`].
pub fn notify_shell<Op>(operation: Op) -> Command<Effect, Event>
where
Op: Operation,
Effect: From<Request<Op>>,
{
Command::new(|ctx| async move { ctx.notify_shell(operation) })
}
/// Begin building a Command which sends a one-time request to the shell with the provided
/// `operation`.
///
/// The returned `RequestBuilder` can be converted into a Command directly, or chained
/// with another command builder using `.then`.
///
/// In an async context, the `RequestBuilder` can instead be turned into a future which
/// resolves to the output type of the operation.
pub fn request_from_shell<Op>(
    operation: Op,
) -> builder::RequestBuilder<Effect, Event, impl Future<Output = Op::Output>>
where
    Op: Operation,
    Effect: From<Request<Op>>,
{
    // The request itself is only made once the builder hands a context to this closure.
    builder::RequestBuilder::new(move |ctx| ctx.request_from_shell(operation))
}
/// Begin building a Command which sends a stream request to the shell with the provided
/// `operation`.
///
/// The returned `StreamBuilder` can be converted into a Command directly, or chained
/// with a `RequestBuilder` using `.then`.
///
/// In an async context, the `StreamBuilder` can instead be turned into a stream whose
/// items are of the output type of the operation.
pub fn stream_from_shell<Op>(
    operation: Op,
) -> builder::StreamBuilder<Effect, Event, impl Stream<Item = Op::Output>>
where
    Op: Operation,
    Effect: From<Request<Op>>,
{
    // The request itself is only made once the builder hands a context to this closure.
    builder::StreamBuilder::new(move |ctx| ctx.stream_from_shell(operation))
}
/// Run the effect state machine until it settles, then report whether the Command has
/// finished.
///
/// Returns `true` only when there is no more work to do: no effects or events are waiting
/// to be received and no tasks remain. Returns `false` otherwise.
pub fn is_done(&mut self) -> bool {
    self.run_until_settled();

    let outputs_drained = self.effects.is_empty() && self.events.is_empty();
    outputs_drained && self.tasks.is_empty()
}
/// Run the effect state machine until it settles, then return an iterator over the
/// effects which have been requested and not yet received.
pub fn effects(&mut self) -> impl Iterator<Item = Effect> + '_ {
    self.run_until_settled();

    let pending = &self.effects;
    pending.try_iter()
}
/// Run the effect state machine until it settles, then return an iterator over the
/// events which have been sent and not yet received.
pub fn events(&mut self) -> impl Iterator<Item = Event> + '_ {
    self.run_until_settled();

    let pending = &self.events;
    pending.try_iter()
}
// Combinators
/// Create a Command which runs `self` to completion and only then runs `other`.
// RFC: is this actually _useful_? Unlike `.then` on `CommandBuilder` this doesn't allow using
// the output of the first command in building the second one, it just runs them in sequence,
// and the benefit is unclear.
pub fn then(self, other: Self) -> Self
where
    Effect: Unpin,
    Event: Unpin,
{
    Command::new(|ctx| async move {
        let (effect_sink, event_sink) = (ctx.effects, ctx.events);

        // `self` goes first and gets clones of the senders...
        self.host(effect_sink.clone(), event_sink.clone()).await;
        // ...`other` starts once `self` is done and takes the senders themselves
        other.host(effect_sink, event_sink).await;
    })
}
/// Convenience for [`Command::all`] which runs another command concurrently with this one
pub fn and(self, other: Self) -> Self
where
Effect: Unpin,
Event: Unpin,
{
Command::all([self, other])
}
/// Create a Command which runs all of the provided `commands` concurrently.
pub fn all<I>(commands: I) -> Self
where
    I: IntoIterator<Item = Self> + Send + 'static,
    Effect: Unpin,
    Event: Unpin,
{
    Command::new(|ctx| async move {
        // Merge the commands into a single stream and forward everything it produces
        // to the senders of the new Command.
        futures::stream::select_all(commands)
            .host(ctx.effects, ctx.events)
            .await;
    })
}
// Mapping for composition
/// Convert every effect requested by this Command to a different effect type using `map`.
/// Events pass through unchanged.
///
/// This is useful when composing apps, to turn a command from a child app into a
/// command of the parent app.
pub fn map_effect<F, NewEffect>(self, map: F) -> Command<NewEffect, Event>
where
    F: Fn(Effect) -> NewEffect + Send + Sync + 'static,
    NewEffect: Send + Unpin + 'static,
    Effect: Unpin,
    Event: Unpin,
{
    Command::new(|ctx| async move {
        self.map(|item| match item {
            CommandOutput::Event(event) => CommandOutput::Event(event),
            CommandOutput::Effect(effect) => CommandOutput::Effect(map(effect)),
        })
        .host(ctx.effects, ctx.events)
        .await;
    })
}
/// Convert every event sent by this Command to a different event type using `map`.
/// Effects pass through unchanged.
///
/// This is useful when composing apps, to turn a command from a child app into a
/// command of the parent app.
pub fn map_event<F, NewEvent>(self, map: F) -> Command<Effect, NewEvent>
where
    F: Fn(Event) -> NewEvent + Send + Sync + 'static,
    NewEvent: Send + Unpin + 'static,
    Effect: Unpin,
    Event: Unpin,
{
    Command::new(|ctx| async move {
        self.map(|item| match item {
            CommandOutput::Event(event) => CommandOutput::Event(map(event)),
            CommandOutput::Effect(effect) => CommandOutput::Effect(effect),
        })
        .host(ctx.effects, ctx.events)
        .await;
    })
}
/// Returns an abort handle which can be used to remotely terminate a running Command
/// and all its subtask.
///
/// This is specifically useful for cancelling subscriptions and long running effects
/// which may get superseded, like timers
pub fn abort_handle(&self) -> AbortHandle {
AbortHandle {
aborted: self.aborted.clone(),
}
}
}
#[cfg(test)]
mod tests;