Skip to main content

crux_core/command/
mod.rs

1//! Command represents one or more side-effects, resulting in interactions with the shell.
2//!
3//! Core creates Commands and returns them from the `update` function in response to events.
4//! Commands can be created directly, but more often they will be created and returned
5//! by capability APIs.
6//!
7//! A Command can execute side-effects in parallel, in sequence or a combination of both. To
8//! allow this orchestration they provide both a simple synchronous API and access to an
9//! asynchronous API.
10//!
11//! Command surfaces the effect requests and events sent in response with
12//! the [`Command::effects`] and [`Command::events`] methods. These can be used when testing
13//! the side effects requested by an `update` call.
14//!
15//! Internally, Command resembles [`FuturesUnordered`](futures::stream::FuturesUnordered):
16//! it manages and polls a number of futures and provides a context which they can use
17//! to submit effects to the shell and events back to the application.
18//!
19//! Command implements [`Stream`], making it useful in an async context,
20//! enabling, for example, wrapping Commands in one another.
21//!
22//! # Examples
23//!
24//! Commands are typically created by a capability and returned from the update function. Capabilities
25//! normally return a builder, which can be used in both sync and async context. The basic sync use
26//! is to bind the command to an Event which will be sent with the result of the command:
27//!
28//! ```
29//!# use url::Url;
30//!# use crux_core::{Command, render::render, render::RenderOperation};
31//!# use crux_core::macros::effect;
32//!# use crux_http::protocol::HttpRequest;
33//!# use crux_http::Http;
34//!# const API_URL: &str = "https://example.com/";
35//!# pub enum Event { Increment, Set(crux_http::Result<crux_http::Response<usize>>) }
36//!# #[effect]
37//!# pub enum Effect {
38//!#     Render(RenderOperation),
39//!#     Http(HttpRequest)
40//!# }
41//!# #[derive(Default)] pub struct Model { count: usize }
42//!# #[derive(Default)] pub struct App;
43//!# impl crux_core::App for App {
44//!#     type Event = Event;
45//!#     type Model = Model;
46//!#     type ViewModel = ();
47//!#     type Effect = Effect;
48//! fn update(
49//!     &self,
50//!     event: Self::Event,
51//!     model: &mut Self::Model)
52//! -> Command<Effect, Event> {
53//!     match event {
54//!         //...
55//!         Event::Increment => {
56//!             let base = Url::parse(API_URL).unwrap();
57//!             let url = base.join("/inc").unwrap();
58//!
59//!             Http::post(url)            // creates an HTTP RequestBuilder
60//!                 .expect_json()
61//!                 .build()               // creates a Command RequestBuilder
62//!                 .then_send(Event::Set) // creates a Command
63//!         }
64//!         Event::Set(Ok(mut response)) => {
65//!              let count = response.take_body().unwrap();
66//!              model.count = count;
67//!              render()
68//!         }
69//!         Event::Set(Err(_)) => todo!()
70//!     }
71//! }
72//!# fn view(&self, model: &Self::Model) {
73//!#     unimplemented!()
74//!# }
75//!# }
76//! ```
77//!
78//! Commands can be chained, allowing the outputs of the first effect to be used in constructing the second
79//! effect. For example, the following code creates a new post, then fetches the full created post based
80//! on a url read from the response to the creation request:
81//!
82//! ```
83//!# use crux_core::Command;
84//!# use crux_http::command::Http;
85//!# use crux_core::render::render;
86//!# use doctest_support::command::{Effect, Event, AnOperation, AnOperationOutput, Post};
87//!# const API_URL: &str = "https://example.com/";
88//!# let result = {
89//! let cmd: Command<Effect, Event> =
90//!     Http::post(API_URL)
91//!         .body(serde_json::json!({"title":"New Post", "body":"Hello!"}))
92//!         .expect_json::<Post>()
93//!         .build()
94//!         .then_request(|result| {
95//!             let post = result.unwrap();
96//!             let url = &post.body().unwrap().url;
97//!
98//!             Http::get(url).expect_json().build()
99//!         })
100//!         .then_send(Event::GotPost);
101//!
102//! // Run the http request concurrently with notifying the shell to render
103//! Command::all([cmd, render()])
104//!# };
105//! ```
106//!
107//! The same can be done with the async API, if you need more complex orchestration that is
108//! more naturally expressed in async rust
109//!
110//! ```
111//! # use crux_core::Command;
112//! # use crux_http::command::Http;
113//! # use doctest_support::command::{Effect, Event, AnOperation, AnOperationOutput, Post};
114//! # const API_URL: &str = "";
115//! let cmd: Command<Effect, Event> = Command::new(|ctx| async move {
116//!     let first = Http::post(API_URL)
117//!         .body(serde_json::json!({"title":"New Post", "body":"Hello!"}))
118//!         .expect_json::<Post>()
119//!         .build()
120//!         .into_future(ctx.clone())
121//!         .await;
122//!
123//!     let post = first.unwrap();
124//!     let url = &post.body().unwrap().url;
125//!
126//!     let second = Http::get(url).expect_json().build().into_future(ctx.clone()).await;
127//!
128//!     ctx.send_event(Event::GotPost(second));
129//! });
130//! ```
131//!
132//! In the async context, you can spawn additional concurrent tasks, which can, for example,
133//! communicate with each other via channels, to enable more complex orchestrations, stateful
134//! connection handling and other advanced uses.
135//!
136//! ```
137//! # use crux_core::Command;
138//! # use doctest_support::command::{Effect, Event, AnOperation};
139//! let mut cmd: Command<Effect, Event> = Command::new(|ctx| async move {
140//!     let (tx, rx) = async_channel::unbounded();
141//!
142//!     ctx.spawn(|ctx| async move {
143//!         for i in 0..10u8 {
144//!             let output = ctx.request_from_shell(AnOperation::One(i)).await;
145//!             tx.send(output).await.unwrap();
146//!         }
147//!     });
148//!
149//!     ctx.spawn(|ctx| async move {
150//!         while let Ok(value) = rx.recv().await {
151//!             ctx.send_event(Event::Completed(value));
152//!         }
153//!         ctx.send_event(Event::Aborted);
154//!     });
155//! });
156//! ```
157//!
158//! Commands can be cancelled, by calling [`Command::abort_handle`]
159//! and then calling `abort` on the returned handle.
160//!
161//! ```
162//! # use crux_core::Command;
163//! # use doctest_support::command::{Effect, Event, AnOperation};
164//! let mut cmd: Command<Effect, Event> = Command::all([
165//!     Command::request_from_shell(AnOperation::One(1)).then_send(Event::Completed),
166//!     Command::request_from_shell(AnOperation::Two(1)).then_send(Event::Completed),
167//! ]);
168//!
169//! let handle = cmd.abort_handle();
170//!
171//! // Command is still running
172//! assert!(!cmd.was_aborted());
173//!
174//! handle.abort();
175//!
176//! // Command is now finished
177//! assert!(cmd.is_done());
178//! // And was aborted
179//! assert!(cmd.was_aborted());
180//!
181//! ```
182//!
183//! You can test that Commands yield the expected effects and events.
184//! Commands can be tested in isolation by creating them explicitly
185//! in a test, and then checking the effects and events they generated.
186//! Or you can call your app's `update` function in a test, and perform
187//! the same checks on the returned Command.
188//!
189//! ```
190//! # use crux_http::{
191//! #     command::Http,
192//! #     protocol::{HttpRequest, HttpResponse, HttpResult},
193//! #     testing::ResponseBuilder,
194//! # };
195//! # use doctest_support::command::{Effect, Event, Post};
196//! const API_URL: &str = "https://example.com/api/posts";
197//!
198//! // Create a command to post a new Post to API_URL
199//! // and then dispatch an event with the result
200//! let mut cmd = Http::post(API_URL)
201//!     .body(serde_json::json!({"title":"New Post", "body":"Hello!"}))
202//!     .expect_json()
203//!     .build()
204//!     .then_send(Event::GotPost);
205//!
206//! // Check the effect is an HTTP request ...
207//! let effect = cmd.expect_one_effect();
208//! let Effect::Http(mut request) = effect else {
209//!     panic!("Expected a HTTP effect")
210//! };
211//!
212//! // ... and the request is a POST to API_URL
213//! assert_eq!(
214//!     &request.operation,
215//!     &HttpRequest::post(API_URL)
216//!         .header("content-type", "application/json")
217//!         .body(r#"{"body":"Hello!","title":"New Post"}"#)
218//!         .build()
219//! );
220//!
221//! // Resolve the request with a successful response
222//! let body = Post {
223//!     url: API_URL.to_string(),
224//!     title: "New Post".to_string(),
225//!     body: "Hello!".to_string(),
226//! };
227//! request
228//!     .resolve(HttpResult::Ok(HttpResponse::ok().json(&body).build()))
229//!     .expect("Resolve should succeed");
230//!
231//! // Check the event is a GotPost event with the successful response
232//! let actual = cmd.expect_one_event();
233//! let expected = Event::GotPost(Ok(ResponseBuilder::ok().body(body).build()));
234//! assert_eq!(actual, expected);
235//!
236//! assert!(cmd.is_done());
237//! ```
238
239mod builder;
240mod context;
241mod executor;
242mod stream;
243
244use std::future::Future;
245use std::sync::Arc;
246use std::sync::atomic::AtomicBool;
247
248// TODO: consider switching to flume
249use crossbeam_channel::{Receiver, Sender};
250use executor::{Task, TaskId};
251use futures::task::AtomicWaker;
252use futures::{FutureExt as _, Stream, StreamExt as _};
253use slab::Slab;
254use stream::CommandStreamExt as _;
255
256pub use builder::{NotificationBuilder, RequestBuilder, StreamBuilder};
257pub use context::CommandContext;
258pub use executor::AbortHandle;
259pub use stream::CommandOutput;
260
261use crate::Request;
262use crate::capability::Operation;
263
264// ANCHOR: command
265#[must_use = "Unused commands never execute. Return the command from your app's update function or combine it with other commands with Command::and or Command::all"]
266pub struct Command<Effect, Event> {
267    effects: Receiver<Effect>,
268    events: Receiver<Event>,
269    context: CommandContext<Effect, Event>,
270
271    // Executor internals
272    // TODO: should this be a separate type?
273    ready_queue: Receiver<TaskId>,
274    spawn_queue: Receiver<Task>,
275    tasks: Slab<Task>,
276    ready_sender: Sender<TaskId>, // Used in creating wakers for tasks
277    waker: Arc<AtomicWaker>,      // Shared with task wakers when polled in async context
278
279    // Signaling
280    aborted: Arc<AtomicBool>,
281}
282// ANCHOR_END: command
283
284// Public API
285
286impl<Effect, Event> Command<Effect, Event>
287where
288    Effect: Send + 'static,
289    Event: Send + 'static,
290{
291    /// Create a new command orchestrating effects with async Rust. This is the lowest level
292    /// API to create a Command if you need full control over its execution. In most cases you will
293    /// more likely want to create Commands with capabilities, and using the combinator APIs
294    /// ([`and`](Command::and) and [`all`](Command::all)) to orchestrate them.
295    ///
296    /// The `create_task` closure receives a [`CommandContext`] that it can use to send shell requests,
297    /// events back to the app, and to spawn additional tasks. The closure is expected to return a future
298    /// which becomes the command's main asynchronous task.
299    ///
300    /// # Panics
301    ///
302    /// If we could not make the task ready because the ready channel was disconnected.
303    pub fn new<F, Fut>(create_task: F) -> Self
304    where
305        F: FnOnce(CommandContext<Effect, Event>) -> Fut,
306        Fut: Future<Output = ()> + Send + 'static,
307    {
308        // RFC: do we need to think about backpressure? The channels are unbounded
309        // so a naughty Command can make massive amounts of requests or spawn a huge number of tasks.
310        // If these channels supported async, the CommandContext methods could also be async and
311        // we could give the channels some bounds
312        let (effect_sender, effect_receiver) = crossbeam_channel::unbounded();
313        let (event_sender, event_receiver) = crossbeam_channel::unbounded();
314        let (ready_sender, ready_receiver) = crossbeam_channel::unbounded();
315        let (spawn_sender, spawn_receiver) = crossbeam_channel::unbounded();
316        let (_, waker_receiver) = crossbeam_channel::unbounded();
317
318        let context = context::CommandContext {
319            effects: effect_sender,
320            events: event_sender,
321            tasks: spawn_sender,
322            rc: Arc::default(),
323        };
324
325        let aborted: Arc<AtomicBool> = Arc::default();
326        let task = Task {
327            finished: Arc::default(),
328            aborted: aborted.clone(),
329            future: create_task(context.clone()).boxed(),
330            join_handle_wakers: waker_receiver,
331        };
332
333        let mut tasks = Slab::with_capacity(1);
334        let task_id = TaskId(tasks.insert(task));
335
336        ready_sender
337            .send(task_id)
338            .expect("Could not make task ready, ready channel disconnected");
339
340        Command {
341            effects: effect_receiver,
342            events: event_receiver,
343            context,
344            ready_queue: ready_receiver,
345            spawn_queue: spawn_receiver,
346            ready_sender,
347            tasks,
348            waker: Arc::default(),
349            aborted,
350        }
351    }
352
353    /// Create an empty, completed Command. This is useful as a return value from `update` if
354    /// there are no side-effects to perform.
355    pub fn done() -> Self {
356        Command::new(|_ctx| futures::future::ready(()))
357    }
358
359    /// Create a command from another command with compatible `Effect` and `Event` types
360    pub fn from<Ef, Ev>(subcmd: Command<Ef, Ev>) -> Self
361    where
362        Ef: Send + 'static + Into<Effect> + Unpin,
363        Ev: Send + 'static + Into<Event> + Unpin,
364        Effect: Unpin,
365        Event: Unpin,
366    {
367        subcmd.map_effect(Into::into).map_event(Into::into)
368    }
369
370    /// Turn the command into another command with compatible `Effect` and `Event` types
371    pub fn into<Ef, Ev>(self) -> Command<Ef, Ev>
372    where
373        Ef: Send + 'static + Unpin,
374        Ev: Send + 'static + Unpin,
375        Effect: Unpin + Into<Ef>,
376        Event: Unpin + Into<Ev>,
377    {
378        self.map_effect(Into::into).map_event(Into::into)
379    }
380
381    /// Create a Command which dispatches an event and terminates. This is an alternative
382    /// to calling `update` recursively. The only difference is that the two `update` calls
383    /// will be visible to Crux and can show up in logs or any tooling. The trade-off is that
384    /// the event is not guaranteed to dispatch instantly - another `update` call which is
385    /// already scheduled may happen first.
386    pub fn event(event: Event) -> Self {
387        Command::new(|ctx| async move { ctx.send_event(event) })
388    }
389
390    /// Start a creation of a Command which sends a notification to the shell with a provided
391    /// `operation`.
392    ///
393    /// Returns a [`NotificationBuilder`] which can be converted into a Command directly.
394    ///
395    /// In an async context, `NotificationBuilder` can be turned into a future that resolves to the
396    /// operation output type.
397    pub fn notify_shell<Op>(
398        operation: Op,
399    ) -> builder::NotificationBuilder<Effect, Event, impl Future<Output = ()>>
400    where
401        Op: Operation,
402        Effect: From<Request<Op>>,
403    {
404        builder::NotificationBuilder::new(|ctx| async move { ctx.notify_shell(operation) })
405    }
406
407    /// Start a creation of a Command which sends a one-time request to the shell with a provided
408    /// operation.
409    ///
410    /// Returns a `RequestBuilder`, which can be converted into a Command directly, or chained
411    /// with another command builder using `.then`.
412    ///
413    /// In an async context, `RequestBuilder` can be turned into a future that resolves to the
414    /// operation output type.
415    pub fn request_from_shell<Op>(
416        operation: Op,
417    ) -> builder::RequestBuilder<Effect, Event, impl Future<Output = Op::Output>>
418    where
419        Op: Operation,
420        Effect: From<Request<Op>>,
421    {
422        builder::RequestBuilder::new(|ctx| ctx.request_from_shell(operation))
423    }
424
425    /// Start a creation of a Command which sends a stream request to the shell with a provided
426    /// operation.
427    ///
428    /// Returns a `StreamBuilder`, which can be converted into a Command directly, or chained
429    /// with a `RequestBuilder` builder using `.then`.
430    ///
431    /// In an async context, `StreamBuilder` can be turned into a stream that with the
432    /// operation output type as item.
433    pub fn stream_from_shell<Op>(
434        operation: Op,
435    ) -> builder::StreamBuilder<Effect, Event, impl Stream<Item = Op::Output>>
436    where
437        Op: Operation,
438        Effect: From<Request<Op>>,
439    {
440        builder::StreamBuilder::new(|ctx| ctx.stream_from_shell(operation))
441    }
442
443    /// Run the effect state machine until it settles, then return true
444    /// if there is any more work to do - tasks to run or events or effects to receive
445    pub fn is_done(&mut self) -> bool {
446        self.run_until_settled();
447
448        // If a context is alive, the command can still receive effects, events or tasks.
449        let all_context_dropped = Arc::strong_count(&self.context.rc) == 1;
450
451        all_context_dropped
452            && self.effects.is_empty()
453            && self.events.is_empty()
454            && self.tasks.is_empty()
455    }
456
457    /// Run the effect state machine until it settles and return an iterator over the effects
458    pub fn effects(&mut self) -> impl Iterator<Item = Effect> + '_ {
459        self.run_until_settled();
460
461        self.effects.try_iter()
462    }
463
464    /// Run the effect state machine until it settles and return an iterator over the events
465    pub fn events(&mut self) -> impl Iterator<Item = Event> + '_ {
466        self.run_until_settled();
467
468        self.events.try_iter()
469    }
470
471    // Combinators
472
473    /// Convert the command into a future to use in an async context
474    pub async fn into_future(self, ctx: CommandContext<Effect, Event>)
475    where
476        Effect: Unpin + Send + 'static,
477        Event: Unpin + Send + 'static,
478    {
479        self.host(ctx.effects, ctx.events).await;
480    }
481
482    /// Create a command running self and the other command in sequence
483    // RFC: is this actually _useful_? Unlike `.then` on `CommandBuilder` this doesn't allow using
484    // the output of the first command in building the second one, it just runs them in sequence,
485    // and the benefit is unclear.
486    pub fn then(self, other: Self) -> Self
487    where
488        Effect: Unpin,
489        Event: Unpin,
490    {
491        Command::new(|ctx| async move {
492            // first run self until done
493            self.into_future(ctx.clone()).await;
494
495            // then run other until done
496            other.into_future(ctx).await;
497        })
498    }
499
500    /// Convenience for [`Command::all`] which runs another command concurrently with this one
501    pub fn and(mut self, other: Self) -> Self
502    where
503        Effect: Unpin,
504        Event: Unpin,
505    {
506        self.spawn(|ctx| other.into_future(ctx));
507
508        self
509    }
510
511    /// Create a command running a number of commands concurrently
512    pub fn all<I>(commands: I) -> Self
513    where
514        I: IntoIterator<Item = Self>,
515        Effect: Unpin,
516        Event: Unpin,
517    {
518        let mut command = Command::done();
519
520        for c in commands {
521            command.spawn(|ctx| c.into_future(ctx));
522        }
523
524        command
525    }
526
527    // Mapping for composition
528
529    /// Map effects requested as part of this command to a different effect type.
530    ///
531    /// This is useful when composing apps to convert a command from a child app to a
532    /// command of the parent app.
533    pub fn map_effect<F, NewEffect>(self, map: F) -> Command<NewEffect, Event>
534    where
535        F: Fn(Effect) -> NewEffect + Send + Sync + 'static,
536        NewEffect: Send + Unpin + 'static,
537        Effect: Unpin,
538        Event: Unpin,
539    {
540        Command::new(|ctx| async move {
541            let mapped = self.map(|output| match output {
542                CommandOutput::Effect(effect) => CommandOutput::Effect(map(effect)),
543                CommandOutput::Event(event) => CommandOutput::Event(event),
544            });
545
546            mapped.host(ctx.effects, ctx.events).await;
547        })
548    }
549
550    /// Map events sent as part of this command to a different effect type
551    ///
552    /// This is useful when composing apps to convert a command from a child app to a
553    /// command of the parent app.
554    pub fn map_event<F, NewEvent>(self, map: F) -> Command<Effect, NewEvent>
555    where
556        F: Fn(Event) -> NewEvent + Send + Sync + 'static,
557        NewEvent: Send + Unpin + 'static,
558        Effect: Unpin,
559        Event: Unpin,
560    {
561        Command::new(|ctx| async move {
562            let mapped = self.map(|output| match output {
563                CommandOutput::Effect(effect) => CommandOutput::Effect(effect),
564                CommandOutput::Event(event) => CommandOutput::Event(map(event)),
565            });
566
567            mapped.host(ctx.effects, ctx.events).await;
568        })
569    }
570
571    /// Spawn an additional task on the command. The task will execute concurrently with
572    /// existing tasks
573    ///
574    /// The `create_task` closure receives a [`CommandContext`] that it can use to send shell requests,
575    /// events back to the app, and to spawn additional tasks. The closure is expected to return a future.
576    pub fn spawn<F, Fut>(&mut self, create_task: F)
577    where
578        F: FnOnce(CommandContext<Effect, Event>) -> Fut,
579        Fut: Future<Output = ()> + Send + 'static,
580    {
581        self.context.spawn(create_task);
582    }
583
584    /// Returns an abort handle which can be used to remotely terminate a running Command
585    /// and all its subtask.
586    ///
587    /// This is specifically useful for cancelling subscriptions and long running effects
588    /// which may get superseded, like timers
589    #[must_use]
590    pub fn abort_handle(&self) -> AbortHandle {
591        AbortHandle {
592            aborted: self.aborted.clone(),
593        }
594    }
595}
596
597impl<Effect, Event> FromIterator<Command<Effect, Event>> for Command<Effect, Event>
598where
599    Effect: Send + Unpin + 'static,
600    Event: Send + Unpin + 'static,
601{
602    fn from_iter<I: IntoIterator<Item = Command<Effect, Event>>>(iter: I) -> Self {
603        Command::all(iter)
604    }
605}
606
607#[cfg(test)]
608mod tests;