crux_core/command/mod.rs
1//! Command represents one or more side-effects, resulting in interactions with the shell.
2//! Core creates Commands and returns them from the `update` function in response to events.
3//! Commands can be created directly, but more often they will be created and returned
4//! by capability APIs.
5//!
//! A Command can execute side-effects in parallel, in sequence or a combination of both. To
//! allow this orchestration, Commands provide both a simple synchronous API and access to an
//! asynchronous API.
9//!
10//! Command surfaces the effect requests and events sent in response with
11//! the [`Command::effects`] and [`Command::events`] methods. These can be used when testing
12//! the side effects requested by an `update` call.
13//!
14//! Internally, Command resembles [`FuturesUnordered`](futures::stream::FuturesUnordered):
15//! it manages and polls a number of futures and provides a context which they can use
16//! to submit effects to the shell and events back to the application.
17//!
18//! Command implements [`Stream`], making it useful in an async context,
19//! enabling, for example, wrapping Commands in one another.
20//!
21//! # Examples
22//!
23//! Commands are typically created by a capability and returned from the update function. Capabilities
24//! normally return a builder, which can be used in both sync and async context. The basic sync use
25//! is to bind the command to an Event which will be sent with the result of the command:
26//!
27//! ```
28//!# use url::Url;
29//!# use crux_core::{Command, render};
30//!# use crux_http::command::Http;
31//!# const API_URL: &str = "https://example.com/";
32//!# pub enum Event { Increment, Set(crux_http::Result<crux_http::Response<usize>>) }
33//!# #[derive(crux_core::macros::Effect)]
34//!# pub struct Capabilities {
35//!# pub render: crux_core::render::Render<Event>,
36//!# pub http: crux_http::Http<Event>,
37//!# }
38//!# #[derive(Default)] pub struct Model { count: usize }
39//!# #[derive(Default)] pub struct App;
40//!# impl crux_core::App for App {
41//!# type Event = Event;
42//!# type Model = Model;
43//!# type ViewModel = ();
44//!# type Capabilities = Capabilities;
45//!# type Effect = Effect;
46//! fn update(
47//! &self,
48//! event: Self::Event,
49//! model: &mut Self::Model,
50//! _caps: &Self::Capabilities)
51//! -> Command<Effect, Event> {
52//! match event {
53//! //...
54//! Event::Increment => {
55//! let base = Url::parse(API_URL).unwrap();
56//! let url = base.join("/inc").unwrap();
57//!
58//! Http::post(url) // creates an HTTP RequestBuilder
59//! .expect_json()
60//! .build() // creates a Command RequestBuilder
61//! .then_send(Event::Set) // creates a Command
62//! }
63//! Event::Set(Ok(mut response)) => {
64//! let count = response.take_body().unwrap();
65//! model.count = count;
66//! render::render()
67//! }
68//! Event::Set(Err(_)) => todo!()
69//! }
70//! }
71//!# fn view(&self, model: &Self::Model) {
72//!# unimplemented!()
73//!# }
74//!# }
75//! ```
76//!
77//! Commands can be chained, allowing the outputs of the first effect to be used in constructing the second
78//! effect. For example, the following code creates a new post, then fetches the full created post based
79//! on a url read from the response to the creation request:
80//!
81//! ```
82//!# use crux_core::Command;
83//!# use crux_http::command::Http;
84//!# use crux_core::render::render;
85//!# use doctest_support::command::{Effect, Event, AnOperation, AnOperationOutput, Post};
86//!# const API_URL: &str = "https://example.com/";
87//!# let result = {
88//! let cmd: Command<Effect, Event> =
89//! Http::post(API_URL)
90//! .body(serde_json::json!({"title":"New Post", "body":"Hello!"}))
91//! .expect_json::<Post>()
92//! .build()
93//! .then_request(|result| {
94//! let post = result.unwrap();
95//! let url = &post.body().unwrap().url;
96//!
97//! Http::get(url).expect_json().build()
98//! })
99//! .then_send(Event::GotPost);
100//!
101//! // Run the http request concurrently with notifying the shell to render
102//! Command::all([cmd, render()])
103//!# };
104//! ```
105//!
//! The same can be done with the async API, if you need more complex orchestration that is
//! more naturally expressed in async Rust:
108//!
109//! ```
110//! # use crux_core::Command;
111//! # use crux_http::command::Http;
112//! # use doctest_support::command::{Effect, Event, AnOperation, AnOperationOutput, Post};
113//! # const API_URL: &str = "";
114//! let cmd: Command<Effect, Event> = Command::new(|ctx| async move {
115//! let first = Http::post(API_URL)
116//! .body(serde_json::json!({"title":"New Post", "body":"Hello!"}))
117//! .expect_json::<Post>()
118//! .build()
119//! .into_future(ctx.clone())
120//! .await;
121//!
122//! let post = first.unwrap();
123//! let url = &post.body().unwrap().url;
124//!
125//! let second = Http::get(url).expect_json().build().into_future(ctx.clone()).await;
126//!
127//! ctx.send_event(Event::GotPost(second));
128//! });
129//! ```
130//!
131//! In the async context, you can spawn additional concurrent tasks, which can, for example,
132//! communicate with each other via channels, to enable more complex orchestrations, stateful
133//! connection handling and other advanced uses.
134//!
135//! ```
136//! # use crux_core::Command;
137//! # use doctest_support::command::{Effect, Event, AnOperation};
138//! let mut cmd: Command<Effect, Event> = Command::new(|ctx| async move {
139//! let (tx, rx) = async_channel::unbounded();
140//!
141//! ctx.spawn(|ctx| async move {
142//! for i in 0..10u8 {
143//! let output = ctx.request_from_shell(AnOperation::One(i)).await;
144//! tx.send(output).await.unwrap();
145//! }
146//! });
147//!
148//! ctx.spawn(|ctx| async move {
149//! while let Ok(value) = rx.recv().await {
150//! ctx.send_event(Event::Completed(value));
151//! }
152//! ctx.send_event(Event::Aborted);
153//! });
154//! });
155//! ```
156//!
157//! Commands can be cancelled, by calling [`Command::abort_handle`]
158//! and then calling `abort` on the returned handle.
159//!
160//! ```
161//! # use crux_core::Command;
162//! # use doctest_support::command::{Effect, Event, AnOperation};
163//! let mut cmd: Command<Effect, Event> = Command::all([
164//! Command::request_from_shell(AnOperation::One(1)).then_send(Event::Completed),
165//! Command::request_from_shell(AnOperation::Two(1)).then_send(Event::Completed),
166//! ]);
167//!
168//! let handle = cmd.abort_handle();
169//!
170//! // Command is still running
171//! assert!(!cmd.was_aborted());
172//!
173//! handle.abort();
174//!
175//! // Command is now finished
176//! assert!(cmd.is_done());
177//! // And was aborted
178//! assert!(cmd.was_aborted());
179//!
180//! ```
181//!
182//! You can test that Commands yield the expected effects and events.
183//! Commands can be tested in isolation by creating them explicitly
184//! in a test, and then checking the effects and events they generated.
185//! Or you can call your app's `update` function in a test, and perform
186//! the same checks on the returned Command.
187//!
188//! ```
189//! # use crux_http::{
190//! # command::Http,
191//! # protocol::{HttpRequest, HttpResponse, HttpResult},
192//! # testing::ResponseBuilder,
193//! # };
194//! # use doctest_support::command::{Effect, Event, Post};
195//! const API_URL: &str = "https://example.com/api/posts";
196//!
197//! // Create a command to post a new Post to API_URL
198//! // and then dispatch an event with the result
199//! let mut cmd = Http::post(API_URL)
200//! .body(serde_json::json!({"title":"New Post", "body":"Hello!"}))
201//! .expect_json()
202//! .build()
203//! .then_send(Event::GotPost);
204//!
205//! // Check the effect is an HTTP request ...
206//! let effect = cmd.effects().next().unwrap();
207//! let Effect::Http(mut request) = effect else {
208//! panic!("Expected a HTTP effect")
209//! };
210//!
211//! // ... and the request is a POST to API_URL
212//! assert_eq!(
213//! &request.operation,
214//! &HttpRequest::post(API_URL)
215//! .header("content-type", "application/json")
216//! .body(r#"{"body":"Hello!","title":"New Post"}"#)
217//! .build()
218//! );
219//!
220//! // Resolve the request with a successful response
221//! let body = Post {
222//! url: API_URL.to_string(),
223//! title: "New Post".to_string(),
224//! body: "Hello!".to_string(),
225//! };
226//! request
227//! .resolve(HttpResult::Ok(HttpResponse::ok().json(&body).build()))
228//! .expect("Resolve should succeed");
229//!
230//! // Check the event is a GotPost event with the successful response
231//! let actual = cmd.events().next().unwrap();
232//! let expected = Event::GotPost(Ok(ResponseBuilder::ok().body(body).build()));
233//! assert_eq!(actual, expected);
234//!
235//! assert!(cmd.is_done());
236//! ```
237
238mod builder;
239mod context;
240mod executor;
241mod stream;
242
243use std::future::Future;
244use std::sync::atomic::AtomicBool;
245use std::sync::Arc;
246
247// TODO: consider switching to flume
248use crossbeam_channel::{Receiver, Sender};
249use executor::{AbortHandle, Task, TaskId};
250use futures::task::AtomicWaker;
251use futures::{FutureExt as _, Stream, StreamExt as _};
252use slab::Slab;
253use stream::CommandStreamExt as _;
254
255pub use builder::{NotificationBuilder, RequestBuilder, StreamBuilder};
256pub use context::CommandContext;
257pub use stream::CommandOutput;
258
259use crate::capability::Operation;
260use crate::Request;
261
262#[must_use = "Unused commands never execute. Return the command from your app's update function or combine it with other commands with Command::and or Command::all"]
263pub struct Command<Effect, Event> {
264 effects: Receiver<Effect>,
265 events: Receiver<Event>,
266 context: CommandContext<Effect, Event>,
267
268 // Executor internals
269 // TODO: should this be a separate type?
270 ready_queue: Receiver<TaskId>,
271 spawn_queue: Receiver<Task>,
272 tasks: Slab<Task>,
273 ready_sender: Sender<TaskId>, // Used in creating wakers for tasks
274 waker: Arc<AtomicWaker>, // Shared with task wakers when polled in async context
275
276 // Signaling
277 aborted: Arc<AtomicBool>,
278}
279
280// Public API
281
282impl<Effect, Event> Command<Effect, Event>
283where
284 Effect: Send + 'static,
285 Event: Send + 'static,
286{
287 /// Create a new command orchestrating effects with async Rust. This is the lowest level
288 /// API to create a Command if you need full control over its execution. In most cases you will
289 /// more likely want to create Commands with capabilities, and using the combinator APIs
290 /// ([`and`](Command::and) and [`all`](Command::all)) to orchestrate them.
291 ///
292 /// The `create_task` closure receives a [`CommandContext`] that it can use to send shell requests,
293 /// events back to the app, and to spawn additional tasks. The closure is expected to return a future
294 /// which becomes the command's main asynchronous task.
295 ///
296 /// # Panics
297 ///
298 /// If we could not make the task ready because the ready channel was disconnected.
299 pub fn new<F, Fut>(create_task: F) -> Self
300 where
301 F: FnOnce(CommandContext<Effect, Event>) -> Fut,
302 Fut: Future<Output = ()> + Send + 'static,
303 {
304 // RFC: do we need to think about backpressure? The channels are unbounded
305 // so a naughty Command can make massive amounts of requests or spawn a huge number of tasks.
306 // If these channels supported async, the CommandContext methods could also be async and
307 // we could give the channels some bounds
308 let (effect_sender, effect_receiver) = crossbeam_channel::unbounded();
309 let (event_sender, event_receiver) = crossbeam_channel::unbounded();
310 let (ready_sender, ready_receiver) = crossbeam_channel::unbounded();
311 let (spawn_sender, spawn_receiver) = crossbeam_channel::unbounded();
312 let (_, waker_receiver) = crossbeam_channel::unbounded();
313
314 let context = context::CommandContext {
315 effects: effect_sender,
316 events: event_sender,
317 tasks: spawn_sender,
318 };
319
320 let aborted: Arc<AtomicBool> = Arc::default();
321 let task = Task {
322 finished: Arc::default(),
323 aborted: aborted.clone(),
324 future: create_task(context.clone()).boxed(),
325 join_handle_wakers: waker_receiver,
326 };
327
328 let mut tasks = Slab::with_capacity(1);
329 let task_id = TaskId(tasks.insert(task));
330
331 ready_sender
332 .send(task_id)
333 .expect("Could not make task ready, ready channel disconnected");
334
335 Command {
336 effects: effect_receiver,
337 events: event_receiver,
338 context,
339 ready_queue: ready_receiver,
340 spawn_queue: spawn_receiver,
341 ready_sender,
342 tasks,
343 waker: Arc::default(),
344 aborted,
345 }
346 }
347
348 /// Create an empty, completed Command. This is useful as a return value from `update` if
349 /// there are no side-effects to perform.
350 pub fn done() -> Self {
351 Command::new(|_ctx| futures::future::ready(()))
352 }
353
354 /// Create a command from another command with compatible `Effect` and `Event` types
355 pub fn from<Ef, Ev>(subcmd: Command<Ef, Ev>) -> Self
356 where
357 Ef: Send + 'static + Into<Effect> + Unpin,
358 Ev: Send + 'static + Into<Event> + Unpin,
359 Effect: Unpin,
360 Event: Unpin,
361 {
362 subcmd.map_effect(Into::into).map_event(Into::into)
363 }
364
365 /// Turn the command into another command with compatible `Effect` and `Event` types
366 pub fn into<Ef, Ev>(self) -> Command<Ef, Ev>
367 where
368 Ef: Send + 'static + Unpin,
369 Ev: Send + 'static + Unpin,
370 Effect: Unpin + Into<Ef>,
371 Event: Unpin + Into<Ev>,
372 {
373 self.map_effect(Into::into).map_event(Into::into)
374 }
375
376 /// Create a Command which dispatches an event and terminates. This is an alternative
377 /// to calling `update` recursively. The only difference is that the two `update` calls
378 /// will be visible to Crux and can show up in logs or any tooling. The trade-off is that
379 /// the event is not guaranteed to dispatch instantly - another `update` call which is
380 /// already scheduled may happen first.
381 pub fn event(event: Event) -> Self {
382 Command::new(|ctx| async move { ctx.send_event(event) })
383 }
384
385 /// Start a creation of a Command which sends a notification to the shell with a provided
386 /// `operation`.
387 ///
388 /// Returns a [`NotificationBuilder`] which can be converted into a Command directly.
389 ///
390 /// In an async context, `NotificationBuilder` can be turned into a future that resolves to the
391 /// operation output type.
392 pub fn notify_shell<Op>(
393 operation: Op,
394 ) -> builder::NotificationBuilder<Effect, Event, impl Future<Output = ()>>
395 where
396 Op: Operation,
397 Effect: From<Request<Op>>,
398 {
399 builder::NotificationBuilder::new(|ctx| async move { ctx.notify_shell(operation) })
400 }
401
402 /// Start a creation of a Command which sends a one-time request to the shell with a provided
403 /// operation.
404 ///
405 /// Returns a `RequestBuilder`, which can be converted into a Command directly, or chained
406 /// with another command builder using `.then`.
407 ///
408 /// In an async context, `RequestBuilder` can be turned into a future that resolves to the
409 /// operation output type.
410 pub fn request_from_shell<Op>(
411 operation: Op,
412 ) -> builder::RequestBuilder<Effect, Event, impl Future<Output = Op::Output>>
413 where
414 Op: Operation,
415 Effect: From<Request<Op>>,
416 {
417 builder::RequestBuilder::new(|ctx| ctx.request_from_shell(operation))
418 }
419
420 /// Start a creation of a Command which sends a stream request to the shell with a provided
421 /// operation.
422 ///
423 /// Returns a `StreamBuilder`, which can be converted into a Command directly, or chained
424 /// with a `RequestBuilder` builder using `.then`.
425 ///
426 /// In an async context, `StreamBuilder` can be turned into a stream that with the
427 /// operation output type as item.
428 pub fn stream_from_shell<Op>(
429 operation: Op,
430 ) -> builder::StreamBuilder<Effect, Event, impl Stream<Item = Op::Output>>
431 where
432 Op: Operation,
433 Effect: From<Request<Op>>,
434 {
435 builder::StreamBuilder::new(|ctx| ctx.stream_from_shell(operation))
436 }
437
438 /// Run the effect state machine until it settles, then return true
439 /// if there is any more work to do - tasks to run or events or effects to receive
440 pub fn is_done(&mut self) -> bool {
441 self.run_until_settled();
442
443 self.effects.is_empty() && self.events.is_empty() && self.tasks.is_empty()
444 }
445
446 /// Run the effect state machine until it settles and return an iterator over the effects
447 pub fn effects(&mut self) -> impl Iterator<Item = Effect> + '_ {
448 self.run_until_settled();
449
450 self.effects.try_iter()
451 }
452
453 /// Run the effect state machine until it settles and return an iterator over the events
454 pub fn events(&mut self) -> impl Iterator<Item = Event> + '_ {
455 self.run_until_settled();
456
457 self.events.try_iter()
458 }
459
460 // Combinators
461
462 /// Create a command running self and the other command in sequence
463 // RFC: is this actually _useful_? Unlike `.then` on `CommandBuilder` this doesn't allow using
464 // the output of the first command in building the second one, it just runs them in sequence,
465 // and the benefit is unclear.
466 pub fn then(self, other: Self) -> Self
467 where
468 Effect: Unpin,
469 Event: Unpin,
470 {
471 Command::new(|ctx| async move {
472 // first run self until done
473 self.host(ctx.effects.clone(), ctx.events.clone()).await;
474
475 // then run other until done
476 other.host(ctx.effects, ctx.events).await;
477 })
478 }
479
480 /// Convenience for [`Command::all`] which runs another command concurrently with this one
481 pub fn and(mut self, other: Self) -> Self
482 where
483 Effect: Unpin,
484 Event: Unpin,
485 {
486 self.spawn(|ctx| other.host(ctx.effects, ctx.events).map(|_| ()));
487
488 self
489 }
490
491 /// Create a command running a number of commands concurrently
492 pub fn all<I>(commands: I) -> Self
493 where
494 I: IntoIterator<Item = Self>,
495 Effect: Unpin,
496 Event: Unpin,
497 {
498 let mut command = Command::done();
499
500 for c in commands {
501 command.spawn(|ctx| c.host(ctx.effects, ctx.events).map(|_| ()));
502 }
503
504 command
505 }
506
507 // Mapping for composition
508
509 /// Map effects requested as part of this command to a different effect type.
510 ///
511 /// This is useful when composing apps to convert a command from a child app to a
512 /// command of the parent app.
513 pub fn map_effect<F, NewEffect>(self, map: F) -> Command<NewEffect, Event>
514 where
515 F: Fn(Effect) -> NewEffect + Send + Sync + 'static,
516 NewEffect: Send + Unpin + 'static,
517 Effect: Unpin,
518 Event: Unpin,
519 {
520 Command::new(|ctx| async move {
521 let mapped = self.map(|output| match output {
522 CommandOutput::Effect(effect) => CommandOutput::Effect(map(effect)),
523 CommandOutput::Event(event) => CommandOutput::Event(event),
524 });
525
526 mapped.host(ctx.effects, ctx.events).await;
527 })
528 }
529
530 /// Map events sent as part of this command to a different effect type
531 ///
532 /// This is useful when composing apps to convert a command from a child app to a
533 /// command of the parent app.
534 pub fn map_event<F, NewEvent>(self, map: F) -> Command<Effect, NewEvent>
535 where
536 F: Fn(Event) -> NewEvent + Send + Sync + 'static,
537 NewEvent: Send + Unpin + 'static,
538 Effect: Unpin,
539 Event: Unpin,
540 {
541 Command::new(|ctx| async move {
542 let mapped = self.map(|output| match output {
543 CommandOutput::Effect(effect) => CommandOutput::Effect(effect),
544 CommandOutput::Event(event) => CommandOutput::Event(map(event)),
545 });
546
547 mapped.host(ctx.effects, ctx.events).await;
548 })
549 }
550
551 /// Spawn an additional task on the command. The task will execute concurrently with
552 /// existing tasks
553 ///
554 /// The `create_task` closure receives a [`CommandContext`] that it can use to send shell requests,
555 /// events back to the app, and to spawn additional tasks. The closure is expected to return a future.
556 pub fn spawn<F, Fut>(&mut self, create_task: F)
557 where
558 F: FnOnce(CommandContext<Effect, Event>) -> Fut,
559 Fut: Future<Output = ()> + Send + 'static,
560 {
561 self.context.spawn(create_task);
562 }
563
564 /// Returns an abort handle which can be used to remotely terminate a running Command
565 /// and all its subtask.
566 ///
567 /// This is specifically useful for cancelling subscriptions and long running effects
568 /// which may get superseded, like timers
569 #[must_use]
570 pub fn abort_handle(&self) -> AbortHandle {
571 AbortHandle {
572 aborted: self.aborted.clone(),
573 }
574 }
575}
576
577impl<Effect, Event> FromIterator<Command<Effect, Event>> for Command<Effect, Event>
578where
579 Effect: Send + Unpin + 'static,
580 Event: Send + Unpin + 'static,
581{
582 fn from_iter<I: IntoIterator<Item = Command<Effect, Event>>>(iter: I) -> Self {
583 Command::all(iter)
584 }
585}
586
587#[cfg(test)]
588mod tests;