crux_core/command/mod.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577
//! Command represents one or more side-effects, resulting in interactions with the shell.
//! Core creates Commands and returns them from the `update` function in response to events.
//! Commands can be created directly, but more often they will be created and returned
//! by capability APIs.
//!
//! A Command can execute side-effects in parallel, in sequence or a combination of both. To
//! allow this orchestration they provide both a simple synchronous API and access to an
//! asynchronous API.
//!
//! Command surfaces the effect requests and events sent in response with
//! the [`Command::effects`] and [`Command::events`] methods. These can be used when testing
//! the side effects requested by an `update` call.
//!
//! Internally, Command resembles [`FuturesUnordered`](futures::stream::FuturesUnordered):
//! it manages and polls a number of futures and provides a context which they can use
//! to submit effects to the shell and events back to the application.
//!
//! Command implements [`Stream`], making it useful in an async context,
//! enabling, for example, wrapping Commands in one another.
//!
//! # Examples
//!
//! Commands are typically created by a capability and returned from the update function. Capabilities
//! normally return a builder, which can be used in both sync and async context. The basic sync use
//! is to bind the command to an Event which will be sent with the result of the command:
//!
//! ```
//!# use url::Url;
//!# use crux_core::{Command, render};
//!# use crux_http::command::Http;
//!# const API_URL: &str = "https://example.com/";
//!# pub enum Event { Increment, Set(crux_http::Result<crux_http::Response<usize>>) }
//!# #[derive(crux_core::macros::Effect)]
//!# pub struct Capabilities {
//!# pub render: crux_core::render::Render<Event>,
//!# pub http: crux_http::Http<Event>,
//!# }
//!# #[derive(Default)] pub struct Model { count: usize }
//!# #[derive(Default)] pub struct App;
//!# impl crux_core::App for App {
//!# type Event = Event;
//!# type Model = Model;
//!# type ViewModel = ();
//!# type Capabilities = Capabilities;
//!# type Effect = Effect;
//! fn update(
//! &self,
//! event: Self::Event,
//! model: &mut Self::Model,
//! _caps: &Self::Capabilities)
//! -> Command<Effect, Event> {
//! match event {
//! //...
//! Event::Increment => {
//! let base = Url::parse(API_URL).unwrap();
//! let url = base.join("/inc").unwrap();
//!
//! Http::post(url) // creates an HTTP RequestBuilder
//! .expect_json()
//! .build() // creates a Command RequestBuilder
//! .then_send(Event::Set) // creates a Command
//! }
//! Event::Set(Ok(mut response)) => {
//! let count = response.take_body().unwrap();
//! model.count = count;
//! render::render()
//! }
//! Event::Set(Err(_)) => todo!()
//! }
//! }
//!# fn view(&self, model: &Self::Model) {
//!# unimplemented!()
//!# }
//!# }
//! ```
//!
//! Commands can be chained, allowing the outputs of the first effect to be used in constructing the second
//! effect. For example, the following code creates a new post, then fetches the full created post based
//! on a url read from the response to the creation request:
//!
//! ```
//!# use crux_core::Command;
//!# use crux_http::command::Http;
//!# use crux_core::render::render;
//!# use doctest_support::command::{Effect, Event, AnOperation, AnOperationOutput, Post};
//!# const API_URL: &str = "https://example.com/";
//!# let result = {
//! let cmd: Command<Effect, Event> =
//! Http::post(API_URL)
//! .body(serde_json::json!({"title":"New Post", "body":"Hello!"}))
//! .expect_json::<Post>()
//! .build()
//! .then_request(|result| {
//! let post = result.unwrap();
//! let url = &post.body().unwrap().url;
//!
//! Http::get(url).expect_json().build()
//! })
//! .then_send(Event::GotPost);
//!
//! // Run the http request concurrently with notifying the shell to render
//! Command::all([cmd, render()])
//!# };
//! ```
//!
//! The same can be done with the async API, if you need more complex orchestration that is
//! more naturally expressed in async rust
//!
//! ```
//! # use crux_core::Command;
//! # use crux_http::command::Http;
//! # use doctest_support::command::{Effect, Event, AnOperation, AnOperationOutput, Post};
//! # const API_URL: &str = "";
//! let cmd: Command<Effect, Event> = Command::new(|ctx| async move {
//! let first = Http::post(API_URL)
//! .body(serde_json::json!({"title":"New Post", "body":"Hello!"}))
//! .expect_json::<Post>()
//! .build()
//! .into_future(ctx.clone())
//! .await;
//!
//! let post = first.unwrap();
//! let url = &post.body().unwrap().url;
//!
//! let second = Http::get(url).expect_json().build().into_future(ctx.clone()).await;
//!
//! ctx.send_event(Event::GotPost(second));
//! });
//! ```
//!
//! In the async context, you can spawn additional concurrent tasks, which can, for example,
//! communicate with each other via channels, to enable more complex orchestrations, stateful
//! connection handling and other advanced uses.
//!
//! ```
//! # use crux_core::Command;
//! # use doctest_support::command::{Effect, Event, AnOperation};
//! let mut cmd: Command<Effect, Event> = Command::new(|ctx| async move {
//! let (tx, rx) = async_channel::unbounded();
//!
//! ctx.spawn(|ctx| async move {
//! for i in 0..10u8 {
//! let output = ctx.request_from_shell(AnOperation::One(i)).await;
//! tx.send(output).await.unwrap();
//! }
//! });
//!
//! ctx.spawn(|ctx| async move {
//! while let Ok(value) = rx.recv().await {
//! ctx.send_event(Event::Completed(value));
//! }
//! ctx.send_event(Event::Aborted);
//! });
//! });
//! ```
//!
//! Commands can be cancelled, by calling [`Command::abort_handle`]
//! and then calling `abort` on the returned handle.
//!
//! ```
//! # use crux_core::Command;
//! # use doctest_support::command::{Effect, Event, AnOperation};
//! let mut cmd: Command<Effect, Event> = Command::all([
//! Command::request_from_shell(AnOperation::One(1)).then_send(Event::Completed),
//! Command::request_from_shell(AnOperation::Two(1)).then_send(Event::Completed),
//! ]);
//!
//! let handle = cmd.abort_handle();
//!
//! // Command is still running
//! assert!(!cmd.was_aborted());
//!
//! handle.abort();
//!
//! // Command is now finished
//! assert!(cmd.is_done());
//! // And was aborted
//! assert!(cmd.was_aborted());
//!
//! ```
//!
//! You can test that Commands yield the expected effects and events.
//! Commands can be tested in isolation by creating them explicitly
//! in a test, and then checking the effects and events they generated.
//! Or you can call your app's `update` function in a test, and perform
//! the same checks on the returned Command.
//!
//! ```
//! # use crux_http::{
//! # command::Http,
//! # protocol::{HttpRequest, HttpResponse, HttpResult},
//! # testing::ResponseBuilder,
//! # };
//! # use doctest_support::command::{Effect, Event, Post};
//! const API_URL: &str = "https://example.com/api/posts";
//!
//! // Create a command to post a new Post to API_URL
//! // and then dispatch an event with the result
//! let mut cmd = Http::post(API_URL)
//! .body(serde_json::json!({"title":"New Post", "body":"Hello!"}))
//! .expect_json()
//! .build()
//! .then_send(Event::GotPost);
//!
//! // Check the effect is an HTTP request ...
//! let effect = cmd.effects().next().unwrap();
//! let Effect::Http(mut request) = effect else {
//! panic!("Expected a HTTP effect")
//! };
//!
//! // ... and the request is a POST to API_URL
//! assert_eq!(
//! &request.operation,
//! &HttpRequest::post(API_URL)
//! .header("content-type", "application/json")
//! .body(r#"{"body":"Hello!","title":"New Post"}"#)
//! .build()
//! );
//!
//! // Resolve the request with a successful response
//! let body = Post {
//! url: API_URL.to_string(),
//! title: "New Post".to_string(),
//! body: "Hello!".to_string(),
//! };
//! request
//! .resolve(HttpResult::Ok(HttpResponse::ok().json(&body).build()))
//! .expect("Resolve should succeed");
//!
//! // Check the event is a GotPost event with the successful response
//! let actual = cmd.events().next().unwrap();
//! let expected = Event::GotPost(Ok(ResponseBuilder::ok().body(body).build()));
//! assert_eq!(actual, expected);
//!
//! assert!(cmd.is_done());
//! ```
mod builder;
mod context;
mod executor;
mod stream;
use std::future::Future;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
// TODO: consider switching to flume
use crossbeam_channel::{Receiver, Sender};
use executor::{AbortHandle, Task, TaskId};
use futures::task::AtomicWaker;
use futures::{FutureExt as _, Stream, StreamExt as _};
use slab::Slab;
use stream::CommandStreamExt as _;
pub use builder::{RequestBuilder, StreamBuilder};
pub use context::CommandContext;
pub use stream::CommandOutput;
use crate::capability::Operation;
use crate::Request;
#[must_use = "Unused commands never execute. Return the command from your app's update function or combine it with other commands with Command::and or Command::all"]
pub struct Command<Effect, Event> {
effects: Receiver<Effect>,
events: Receiver<Event>,
context: CommandContext<Effect, Event>,
// Executor internals
// TODO: should this be a separate type?
ready_queue: Receiver<TaskId>,
spawn_queue: Receiver<Task>,
tasks: Slab<Task>,
ready_sender: Sender<TaskId>, // Used in creating wakers for tasks
waker: Arc<AtomicWaker>, // Shared with task wakers when polled in async context
// Signaling
aborted: Arc<AtomicBool>,
}
// Public API
impl<Effect, Event> Command<Effect, Event>
where
Effect: Send + 'static,
Event: Send + 'static,
{
/// Create a new command orchestrating effects with async Rust. This is the lowest level
/// API to create a Command if you need full control over its execution. In most cases you will
/// more likely want to create Commands with capabilities, and using the combinator APIs
/// ([`and`](Command::and) and [`all`](Command::all)) to orchestrate them.
///
/// The `create_task` closure receives a [`CommandContext`] that it can use to send shell requests,
/// events back to the app, and to spawn additional tasks. The closure is expected to return a future
/// which becomes the command's main asynchronous task.
pub fn new<F, Fut>(create_task: F) -> Self
where
F: FnOnce(CommandContext<Effect, Event>) -> Fut,
Fut: Future<Output = ()> + Send + 'static,
{
// RFC: do we need to think about backpressure? The channels are unbounded
// so a naughty Command can make massive amounts of requests or spawn a huge number of tasks.
// If these channels supported async, the CommandContext methods could also be async and
// we could give the channels some bounds
let (effect_sender, effect_receiver) = crossbeam_channel::unbounded();
let (event_sender, event_receiver) = crossbeam_channel::unbounded();
let (ready_sender, ready_receiver) = crossbeam_channel::unbounded();
let (spawn_sender, spawn_receiver) = crossbeam_channel::unbounded();
let (_, waker_receiver) = crossbeam_channel::unbounded();
let context = context::CommandContext {
effects: effect_sender,
events: event_sender,
tasks: spawn_sender,
};
let aborted: Arc<AtomicBool> = Default::default();
let task = Task {
finished: Default::default(),
aborted: aborted.clone(),
future: create_task(context.clone()).boxed(),
join_handle_wakers: waker_receiver,
};
let mut tasks = Slab::with_capacity(1);
let task_id = TaskId(tasks.insert(task));
ready_sender
.send(task_id)
.expect("Could not make task ready, ready channel disconnected");
Command {
effects: effect_receiver,
events: event_receiver,
context,
ready_queue: ready_receiver,
spawn_queue: spawn_receiver,
ready_sender,
tasks,
waker: Default::default(),
aborted,
}
}
/// Create an empty, completed Command. This is useful as a return value from `update` if
/// there are no side-effects to perform.
pub fn done() -> Self {
Command::new(|_ctx| futures::future::ready(()))
}
/// Create a command from another command with compatible `Effect` and `Event` types
pub fn from<Ef, Ev>(subcmd: Command<Ef, Ev>) -> Self
where
Ef: Send + 'static + Into<Effect> + Unpin,
Ev: Send + 'static + Into<Event> + Unpin,
Effect: Unpin,
Event: Unpin,
{
subcmd.map_effect(|ef| ef.into()).map_event(|ev| ev.into())
}
/// Turn the command into another command with compatible `Effect` and `Event` types
pub fn into<Ef, Ev>(self) -> Command<Ef, Ev>
where
Ef: Send + 'static + Unpin,
Ev: Send + 'static + Unpin,
Effect: Unpin + Into<Ef>,
Event: Unpin + Into<Ev>,
{
self.map_effect(|ef| ef.into()).map_event(|ev| ev.into())
}
/// Create a Command which dispatches an event and terminates. This is an alternative
/// to calling `update` recursively. The only difference is that the two `update` calls
/// will be visible to Crux and can show up in logs or any tooling. The trade-off is that
/// the event is not guaranteed to dispatch instantly - another `update` call which is
/// already scheduled may happen first.
pub fn event(event: Event) -> Self {
Command::new(|ctx| async move { ctx.send_event(event) })
}
/// Create a Command which sends a notification to the shell with a provided `operation`.
///
/// This ia synchronous equivalent of [`CommandContext::notify_shell`].
pub fn notify_shell<Op>(operation: Op) -> Command<Effect, Event>
where
Op: Operation,
Effect: From<Request<Op>>,
{
Command::new(|ctx| async move { ctx.notify_shell(operation) })
}
/// Start a creation of a Command which sends a one-time request to the shell with a provided
/// operation.
///
/// Returns a `RequestBuilder`, which can be converted into a Command directly, or chained
/// with another command builder using `.then`.
///
/// In an async context, `RequestBuilder` can be turned into a future that resolves to the
/// operation output type.
pub fn request_from_shell<Op>(
operation: Op,
) -> builder::RequestBuilder<Effect, Event, impl Future<Output = Op::Output>>
where
Op: Operation,
Effect: From<Request<Op>>,
{
builder::RequestBuilder::new(|ctx| ctx.request_from_shell(operation))
}
/// Start a creation of a Command which sends a stream request to the shell with a provided
/// operation.
///
/// Returns a `StreamBuilder`, which can be converted into a Command directly, or chained
/// with a `RequestBuilder` builder using `.then`.
///
/// In an async context, `StreamBuilder` can be turned into a stream that with the
/// operation output type as item.
pub fn stream_from_shell<Op>(
operation: Op,
) -> builder::StreamBuilder<Effect, Event, impl Stream<Item = Op::Output>>
where
Op: Operation,
Effect: From<Request<Op>>,
{
builder::StreamBuilder::new(|ctx| ctx.stream_from_shell(operation))
}
/// Run the effect state machine until it settles, then return true
/// if there is any more work to do - tasks to run or events or effects to receive
pub fn is_done(&mut self) -> bool {
self.run_until_settled();
self.effects.is_empty() && self.events.is_empty() && self.tasks.is_empty()
}
/// Run the effect state machine until it settles and return an iterator over the effects
pub fn effects(&mut self) -> impl Iterator<Item = Effect> + '_ {
self.run_until_settled();
self.effects.try_iter()
}
/// Run the effect state machine until it settles and return an iterator over the events
pub fn events(&mut self) -> impl Iterator<Item = Event> + '_ {
self.run_until_settled();
self.events.try_iter()
}
// Combinators
/// Create a command running self and the other command in sequence
// RFC: is this actually _useful_? Unlike `.then` on `CommandBuilder` this doesn't allow using
// the output of the first command in building the second one, it just runs them in sequence,
// and the benefit is unclear.
pub fn then(self, other: Self) -> Self
where
Effect: Unpin,
Event: Unpin,
{
Command::new(|ctx| async move {
// first run self until done
self.host(ctx.effects.clone(), ctx.events.clone()).await;
// then run other until done
other.host(ctx.effects, ctx.events).await;
})
}
/// Convenience for [`Command::all`] which runs another command concurrently with this one
pub fn and(mut self, other: Self) -> Self
where
Effect: Unpin,
Event: Unpin,
{
self.spawn(|ctx| other.host(ctx.effects, ctx.events).map(|_| ()));
self
}
/// Create a command running a number of commands concurrently
pub fn all<I>(commands: I) -> Self
where
I: IntoIterator<Item = Self>,
Effect: Unpin,
Event: Unpin,
{
let mut command = Command::done();
for c in commands {
command.spawn(|ctx| c.host(ctx.effects, ctx.events).map(|_| ()))
}
command
}
// Mapping for composition
/// Map effects requested as part of this command to a different effect type.
///
/// This is useful when composing apps to convert a command from a child app to a
/// command of the parent app.
pub fn map_effect<F, NewEffect>(self, map: F) -> Command<NewEffect, Event>
where
F: Fn(Effect) -> NewEffect + Send + Sync + 'static,
NewEffect: Send + Unpin + 'static,
Effect: Unpin,
Event: Unpin,
{
Command::new(|ctx| async move {
let mapped = self.map(|output| match output {
CommandOutput::Effect(effect) => CommandOutput::Effect(map(effect)),
CommandOutput::Event(event) => CommandOutput::Event(event),
});
mapped.host(ctx.effects, ctx.events).await;
})
}
/// Map events sent as part of this command to a different effect type
///
/// This is useful when composing apps to convert a command from a child app to a
/// command of the parent app.
pub fn map_event<F, NewEvent>(self, map: F) -> Command<Effect, NewEvent>
where
F: Fn(Event) -> NewEvent + Send + Sync + 'static,
NewEvent: Send + Unpin + 'static,
Effect: Unpin,
Event: Unpin,
{
Command::new(|ctx| async move {
let mapped = self.map(|output| match output {
CommandOutput::Effect(effect) => CommandOutput::Effect(effect),
CommandOutput::Event(event) => CommandOutput::Event(map(event)),
});
mapped.host(ctx.effects, ctx.events).await;
})
}
/// Spawn an additional task on the command. The task will execute concurrently with
/// existing tasks
///
/// The `create_task` closure receives a [`CommandContext`] that it can use to send shell requests,
/// events back to the app, and to spawn additional tasks. The closure is expected to return a future.
pub fn spawn<F, Fut>(&mut self, create_task: F)
where
F: FnOnce(CommandContext<Effect, Event>) -> Fut,
Fut: Future<Output = ()> + Send + 'static,
{
self.context.spawn(create_task);
}
/// Returns an abort handle which can be used to remotely terminate a running Command
/// and all its subtask.
///
/// This is specifically useful for cancelling subscriptions and long running effects
/// which may get superseded, like timers
pub fn abort_handle(&self) -> AbortHandle {
AbortHandle {
aborted: self.aborted.clone(),
}
}
}
impl<Effect, Event> FromIterator<Command<Effect, Event>> for Command<Effect, Event>
where
Effect: Send + Unpin + 'static,
Event: Send + Unpin + 'static,
{
fn from_iter<I: IntoIterator<Item = Command<Effect, Event>>>(iter: I) -> Self {
Command::all(iter)
}
}
#[cfg(test)]
mod tests;