crux_core/command/
mod.rs

1//! Command represents one or more side-effects, resulting in interactions with the shell.
2//! Core creates Commands and returns them from the `update` function in response to events.
3//! Commands can be created directly, but more often they will be created and returned
4//! by capability APIs.
5//!
6//! A Command can execute side-effects in parallel, in sequence or a combination of both. To
7//! allow this orchestration they provide both a simple synchronous API and access to an
8//! asynchronous API.
9//!
10//! Command surfaces the effect requests and events sent in response with
11//! the [`Command::effects`] and [`Command::events`] methods. These can be used when testing
12//! the side effects requested by an `update` call.
13//!
14//! Internally, Command resembles [`FuturesUnordered`](futures::stream::FuturesUnordered):
15//! it manages and polls a number of futures and provides a context which they can use
16//! to submit effects to the shell and events back to the application.
17//!
18//! Command implements [`Stream`], making it useful in an async context,
19//! enabling, for example, wrapping Commands in one another.
20//!
21//! # Examples
22//!
23//! Commands are typically created by a capability and returned from the update function. Capabilities
24//! normally return a builder, which can be used in both sync and async context. The basic sync use
25//! is to bind the command to an Event which will be sent with the result of the command:
26//!
27//! ```
28//!# use url::Url;
29//!# use crux_core::{Command, render};
30//!# use crux_http::command::Http;
31//!# const API_URL: &str = "https://example.com/";
32//!# pub enum Event { Increment, Set(crux_http::Result<crux_http::Response<usize>>) }
33//!# #[derive(crux_core::macros::Effect)]
34//!# pub struct Capabilities {
35//!#     pub render: crux_core::render::Render<Event>,
36//!#     pub http: crux_http::Http<Event>,
37//!# }
38//!# #[derive(Default)] pub struct Model { count: usize }
39//!# #[derive(Default)] pub struct App;
40//!# impl crux_core::App for App {
41//!#     type Event = Event;
42//!#     type Model = Model;
43//!#     type ViewModel = ();
44//!#     type Capabilities = Capabilities;
45//!#     type Effect = Effect;
46//! fn update(
47//!     &self,
48//!     event: Self::Event,
49//!     model: &mut Self::Model,
50//!     _caps: &Self::Capabilities)
51//! -> Command<Effect, Event> {
52//!     match event {
53//!         //...
54//!         Event::Increment => {
55//!             let base = Url::parse(API_URL).unwrap();
56//!             let url = base.join("/inc").unwrap();
57//!
58//!             Http::post(url)            // creates an HTTP RequestBuilder
59//!                 .expect_json()
60//!                 .build()               // creates a Command RequestBuilder
61//!                 .then_send(Event::Set) // creates a Command
62//!         }
63//!         Event::Set(Ok(mut response)) => {
64//!              let count = response.take_body().unwrap();
65//!              model.count = count;
66//!              render::render()
67//!         }
68//!         Event::Set(Err(_)) => todo!()
69//!     }
70//! }
71//!# fn view(&self, model: &Self::Model) {
72//!#     unimplemented!()
73//!# }
74//!# }
75//! ```
76//!
77//! Commands can be chained, allowing the outputs of the first effect to be used in constructing the second
78//! effect. For example, the following code creates a new post, then fetches the full created post based
79//! on a url read from the response to the creation request:
80//!
81//! ```
82//!# use crux_core::Command;
83//!# use crux_http::command::Http;
84//!# use crux_core::render::render;
85//!# use doctest_support::command::{Effect, Event, AnOperation, AnOperationOutput, Post};
86//!# const API_URL: &str = "https://example.com/";
87//!# let result = {
88//! let cmd: Command<Effect, Event> =
89//!     Http::post(API_URL)
90//!         .body(serde_json::json!({"title":"New Post", "body":"Hello!"}))
91//!         .expect_json::<Post>()
92//!         .build()
93//!         .then_request(|result| {
94//!             let post = result.unwrap();
95//!             let url = &post.body().unwrap().url;
96//!
97//!             Http::get(url).expect_json().build()
98//!         })
99//!         .then_send(Event::GotPost);
100//!
101//! // Run the http request concurrently with notifying the shell to render
102//! Command::all([cmd, render()])
103//!# };
104//! ```
105//!
106//! The same can be done with the async API, if you need more complex orchestration that is
107//! more naturally expressed in async rust
108//!
109//! ```
110//! # use crux_core::Command;
111//! # use crux_http::command::Http;
112//! # use doctest_support::command::{Effect, Event, AnOperation, AnOperationOutput, Post};
113//! # const API_URL: &str = "";
114//! let cmd: Command<Effect, Event> = Command::new(|ctx| async move {
115//!     let first = Http::post(API_URL)
116//!         .body(serde_json::json!({"title":"New Post", "body":"Hello!"}))
117//!         .expect_json::<Post>()
118//!         .build()
119//!         .into_future(ctx.clone())
120//!         .await;
121//!
122//!     let post = first.unwrap();
123//!     let url = &post.body().unwrap().url;
124//!
125//!     let second = Http::get(url).expect_json().build().into_future(ctx.clone()).await;
126//!
127//!     ctx.send_event(Event::GotPost(second));
128//! });
129//! ```
130//!
131//! In the async context, you can spawn additional concurrent tasks, which can, for example,
132//! communicate with each other via channels, to enable more complex orchestrations, stateful
133//! connection handling and other advanced uses.
134//!
135//! ```
136//! # use crux_core::Command;
137//! # use doctest_support::command::{Effect, Event, AnOperation};
138//! let mut cmd: Command<Effect, Event> = Command::new(|ctx| async move {
139//!     let (tx, rx) = async_channel::unbounded();
140//!
141//!     ctx.spawn(|ctx| async move {
142//!         for i in 0..10u8 {
143//!             let output = ctx.request_from_shell(AnOperation::One(i)).await;
144//!             tx.send(output).await.unwrap();
145//!         }
146//!     });
147//!
148//!     ctx.spawn(|ctx| async move {
149//!         while let Ok(value) = rx.recv().await {
150//!             ctx.send_event(Event::Completed(value));
151//!         }
152//!         ctx.send_event(Event::Aborted);
153//!     });
154//! });
155//! ```
156//!
157//! Commands can be cancelled, by calling [`Command::abort_handle`]
158//! and then calling `abort` on the returned handle.
159//!
160//! ```
161//! # use crux_core::Command;
162//! # use doctest_support::command::{Effect, Event, AnOperation};
163//! let mut cmd: Command<Effect, Event> = Command::all([
164//!     Command::request_from_shell(AnOperation::One(1)).then_send(Event::Completed),
165//!     Command::request_from_shell(AnOperation::Two(1)).then_send(Event::Completed),
166//! ]);
167//!
168//! let handle = cmd.abort_handle();
169//!
170//! // Command is still running
171//! assert!(!cmd.was_aborted());
172//!
173//! handle.abort();
174//!
175//! // Command is now finished
176//! assert!(cmd.is_done());
177//! // And was aborted
178//! assert!(cmd.was_aborted());
179//!
180//! ```
181//!
182//! You can test that Commands yield the expected effects and events.
183//! Commands can be tested in isolation by creating them explicitly
184//! in a test, and then checking the effects and events they generated.
185//! Or you can call your app's `update` function in a test, and perform
186//! the same checks on the returned Command.
187//!
188//! ```
189//! # use crux_http::{
190//! #     command::Http,
191//! #     protocol::{HttpRequest, HttpResponse, HttpResult},
192//! #     testing::ResponseBuilder,
193//! # };
194//! # use doctest_support::command::{Effect, Event, Post};
195//! const API_URL: &str = "https://example.com/api/posts";
196//!
197//! // Create a command to post a new Post to API_URL
198//! // and then dispatch an event with the result
199//! let mut cmd = Http::post(API_URL)
200//!     .body(serde_json::json!({"title":"New Post", "body":"Hello!"}))
201//!     .expect_json()
202//!     .build()
203//!     .then_send(Event::GotPost);
204//!
205//! // Check the effect is an HTTP request ...
206//! let effect = cmd.effects().next().unwrap();
207//! let Effect::Http(mut request) = effect else {
208//!     panic!("Expected a HTTP effect")
209//! };
210//!
211//! // ... and the request is a POST to API_URL
212//! assert_eq!(
213//!     &request.operation,
214//!     &HttpRequest::post(API_URL)
215//!         .header("content-type", "application/json")
216//!         .body(r#"{"body":"Hello!","title":"New Post"}"#)
217//!         .build()
218//! );
219//!
220//! // Resolve the request with a successful response
221//! let body = Post {
222//!     url: API_URL.to_string(),
223//!     title: "New Post".to_string(),
224//!     body: "Hello!".to_string(),
225//! };
226//! request
227//!     .resolve(HttpResult::Ok(HttpResponse::ok().json(&body).build()))
228//!     .expect("Resolve should succeed");
229//!
230//! // Check the event is a GotPost event with the successful response
231//! let actual = cmd.events().next().unwrap();
232//! let expected = Event::GotPost(Ok(ResponseBuilder::ok().body(body).build()));
233//! assert_eq!(actual, expected);
234//!
235//! assert!(cmd.is_done());
236//! ```
237
238mod builder;
239mod context;
240mod executor;
241mod stream;
242
243use std::future::Future;
244use std::sync::atomic::AtomicBool;
245use std::sync::Arc;
246
247// TODO: consider switching to flume
248use crossbeam_channel::{Receiver, Sender};
249use executor::{AbortHandle, Task, TaskId};
250use futures::task::AtomicWaker;
251use futures::{FutureExt as _, Stream, StreamExt as _};
252use slab::Slab;
253use stream::CommandStreamExt as _;
254
255pub use builder::{NotificationBuilder, RequestBuilder, StreamBuilder};
256pub use context::CommandContext;
257pub use stream::CommandOutput;
258
259use crate::capability::Operation;
260use crate::Request;
261
262#[must_use = "Unused commands never execute. Return the command from your app's update function or combine it with other commands with Command::and or Command::all"]
263pub struct Command<Effect, Event> {
264    effects: Receiver<Effect>,
265    events: Receiver<Event>,
266    context: CommandContext<Effect, Event>,
267
268    // Executor internals
269    // TODO: should this be a separate type?
270    ready_queue: Receiver<TaskId>,
271    spawn_queue: Receiver<Task>,
272    tasks: Slab<Task>,
273    ready_sender: Sender<TaskId>, // Used in creating wakers for tasks
274    waker: Arc<AtomicWaker>,      // Shared with task wakers when polled in async context
275
276    // Signaling
277    aborted: Arc<AtomicBool>,
278}
279
280// Public API
281
282impl<Effect, Event> Command<Effect, Event>
283where
284    Effect: Send + 'static,
285    Event: Send + 'static,
286{
287    /// Create a new command orchestrating effects with async Rust. This is the lowest level
288    /// API to create a Command if you need full control over its execution. In most cases you will
289    /// more likely want to create Commands with capabilities, and using the combinator APIs
290    /// ([`and`](Command::and) and [`all`](Command::all)) to orchestrate them.
291    ///
292    /// The `create_task` closure receives a [`CommandContext`] that it can use to send shell requests,
293    /// events back to the app, and to spawn additional tasks. The closure is expected to return a future
294    /// which becomes the command's main asynchronous task.
295    ///
296    /// # Panics
297    ///
298    /// If we could not make the task ready because the ready channel was disconnected.
299    pub fn new<F, Fut>(create_task: F) -> Self
300    where
301        F: FnOnce(CommandContext<Effect, Event>) -> Fut,
302        Fut: Future<Output = ()> + Send + 'static,
303    {
304        // RFC: do we need to think about backpressure? The channels are unbounded
305        // so a naughty Command can make massive amounts of requests or spawn a huge number of tasks.
306        // If these channels supported async, the CommandContext methods could also be async and
307        // we could give the channels some bounds
308        let (effect_sender, effect_receiver) = crossbeam_channel::unbounded();
309        let (event_sender, event_receiver) = crossbeam_channel::unbounded();
310        let (ready_sender, ready_receiver) = crossbeam_channel::unbounded();
311        let (spawn_sender, spawn_receiver) = crossbeam_channel::unbounded();
312        let (_, waker_receiver) = crossbeam_channel::unbounded();
313
314        let context = context::CommandContext {
315            effects: effect_sender,
316            events: event_sender,
317            tasks: spawn_sender,
318        };
319
320        let aborted: Arc<AtomicBool> = Arc::default();
321        let task = Task {
322            finished: Arc::default(),
323            aborted: aborted.clone(),
324            future: create_task(context.clone()).boxed(),
325            join_handle_wakers: waker_receiver,
326        };
327
328        let mut tasks = Slab::with_capacity(1);
329        let task_id = TaskId(tasks.insert(task));
330
331        ready_sender
332            .send(task_id)
333            .expect("Could not make task ready, ready channel disconnected");
334
335        Command {
336            effects: effect_receiver,
337            events: event_receiver,
338            context,
339            ready_queue: ready_receiver,
340            spawn_queue: spawn_receiver,
341            ready_sender,
342            tasks,
343            waker: Arc::default(),
344            aborted,
345        }
346    }
347
348    /// Create an empty, completed Command. This is useful as a return value from `update` if
349    /// there are no side-effects to perform.
350    pub fn done() -> Self {
351        Command::new(|_ctx| futures::future::ready(()))
352    }
353
354    /// Create a command from another command with compatible `Effect` and `Event` types
355    pub fn from<Ef, Ev>(subcmd: Command<Ef, Ev>) -> Self
356    where
357        Ef: Send + 'static + Into<Effect> + Unpin,
358        Ev: Send + 'static + Into<Event> + Unpin,
359        Effect: Unpin,
360        Event: Unpin,
361    {
362        subcmd.map_effect(Into::into).map_event(Into::into)
363    }
364
365    /// Turn the command into another command with compatible `Effect` and `Event` types
366    pub fn into<Ef, Ev>(self) -> Command<Ef, Ev>
367    where
368        Ef: Send + 'static + Unpin,
369        Ev: Send + 'static + Unpin,
370        Effect: Unpin + Into<Ef>,
371        Event: Unpin + Into<Ev>,
372    {
373        self.map_effect(Into::into).map_event(Into::into)
374    }
375
376    /// Create a Command which dispatches an event and terminates. This is an alternative
377    /// to calling `update` recursively. The only difference is that the two `update` calls
378    /// will be visible to Crux and can show up in logs or any tooling. The trade-off is that
379    /// the event is not guaranteed to dispatch instantly - another `update` call which is
380    /// already scheduled may happen first.
381    pub fn event(event: Event) -> Self {
382        Command::new(|ctx| async move { ctx.send_event(event) })
383    }
384
385    /// Start a creation of a Command which sends a notification to the shell with a provided
386    /// `operation`.
387    ///
388    /// Returns a [`NotificationBuilder`] which can be converted into a Command directly.
389    ///
390    /// In an async context, `NotificationBuilder` can be turned into a future that resolves to the
391    /// operation output type.
392    pub fn notify_shell<Op>(
393        operation: Op,
394    ) -> builder::NotificationBuilder<Effect, Event, impl Future<Output = ()>>
395    where
396        Op: Operation,
397        Effect: From<Request<Op>>,
398    {
399        builder::NotificationBuilder::new(|ctx| async move { ctx.notify_shell(operation) })
400    }
401
402    /// Start a creation of a Command which sends a one-time request to the shell with a provided
403    /// operation.
404    ///
405    /// Returns a `RequestBuilder`, which can be converted into a Command directly, or chained
406    /// with another command builder using `.then`.
407    ///
408    /// In an async context, `RequestBuilder` can be turned into a future that resolves to the
409    /// operation output type.
410    pub fn request_from_shell<Op>(
411        operation: Op,
412    ) -> builder::RequestBuilder<Effect, Event, impl Future<Output = Op::Output>>
413    where
414        Op: Operation,
415        Effect: From<Request<Op>>,
416    {
417        builder::RequestBuilder::new(|ctx| ctx.request_from_shell(operation))
418    }
419
420    /// Start a creation of a Command which sends a stream request to the shell with a provided
421    /// operation.
422    ///
423    /// Returns a `StreamBuilder`, which can be converted into a Command directly, or chained
424    /// with a `RequestBuilder` builder using `.then`.
425    ///
426    /// In an async context, `StreamBuilder` can be turned into a stream that with the
427    /// operation output type as item.
428    pub fn stream_from_shell<Op>(
429        operation: Op,
430    ) -> builder::StreamBuilder<Effect, Event, impl Stream<Item = Op::Output>>
431    where
432        Op: Operation,
433        Effect: From<Request<Op>>,
434    {
435        builder::StreamBuilder::new(|ctx| ctx.stream_from_shell(operation))
436    }
437
438    /// Run the effect state machine until it settles, then return true
439    /// if there is any more work to do - tasks to run or events or effects to receive
440    pub fn is_done(&mut self) -> bool {
441        self.run_until_settled();
442
443        self.effects.is_empty() && self.events.is_empty() && self.tasks.is_empty()
444    }
445
446    /// Run the effect state machine until it settles and return an iterator over the effects
447    pub fn effects(&mut self) -> impl Iterator<Item = Effect> + '_ {
448        self.run_until_settled();
449
450        self.effects.try_iter()
451    }
452
453    /// Run the effect state machine until it settles and return an iterator over the events
454    pub fn events(&mut self) -> impl Iterator<Item = Event> + '_ {
455        self.run_until_settled();
456
457        self.events.try_iter()
458    }
459
460    // Combinators
461
462    /// Create a command running self and the other command in sequence
463    // RFC: is this actually _useful_? Unlike `.then` on `CommandBuilder` this doesn't allow using
464    // the output of the first command in building the second one, it just runs them in sequence,
465    // and the benefit is unclear.
466    pub fn then(self, other: Self) -> Self
467    where
468        Effect: Unpin,
469        Event: Unpin,
470    {
471        Command::new(|ctx| async move {
472            // first run self until done
473            self.host(ctx.effects.clone(), ctx.events.clone()).await;
474
475            // then run other until done
476            other.host(ctx.effects, ctx.events).await;
477        })
478    }
479
480    /// Convenience for [`Command::all`] which runs another command concurrently with this one
481    pub fn and(mut self, other: Self) -> Self
482    where
483        Effect: Unpin,
484        Event: Unpin,
485    {
486        self.spawn(|ctx| other.host(ctx.effects, ctx.events).map(|_| ()));
487
488        self
489    }
490
491    /// Create a command running a number of commands concurrently
492    pub fn all<I>(commands: I) -> Self
493    where
494        I: IntoIterator<Item = Self>,
495        Effect: Unpin,
496        Event: Unpin,
497    {
498        let mut command = Command::done();
499
500        for c in commands {
501            command.spawn(|ctx| c.host(ctx.effects, ctx.events).map(|_| ()));
502        }
503
504        command
505    }
506
507    // Mapping for composition
508
509    /// Map effects requested as part of this command to a different effect type.
510    ///
511    /// This is useful when composing apps to convert a command from a child app to a
512    /// command of the parent app.
513    pub fn map_effect<F, NewEffect>(self, map: F) -> Command<NewEffect, Event>
514    where
515        F: Fn(Effect) -> NewEffect + Send + Sync + 'static,
516        NewEffect: Send + Unpin + 'static,
517        Effect: Unpin,
518        Event: Unpin,
519    {
520        Command::new(|ctx| async move {
521            let mapped = self.map(|output| match output {
522                CommandOutput::Effect(effect) => CommandOutput::Effect(map(effect)),
523                CommandOutput::Event(event) => CommandOutput::Event(event),
524            });
525
526            mapped.host(ctx.effects, ctx.events).await;
527        })
528    }
529
530    /// Map events sent as part of this command to a different effect type
531    ///
532    /// This is useful when composing apps to convert a command from a child app to a
533    /// command of the parent app.
534    pub fn map_event<F, NewEvent>(self, map: F) -> Command<Effect, NewEvent>
535    where
536        F: Fn(Event) -> NewEvent + Send + Sync + 'static,
537        NewEvent: Send + Unpin + 'static,
538        Effect: Unpin,
539        Event: Unpin,
540    {
541        Command::new(|ctx| async move {
542            let mapped = self.map(|output| match output {
543                CommandOutput::Effect(effect) => CommandOutput::Effect(effect),
544                CommandOutput::Event(event) => CommandOutput::Event(map(event)),
545            });
546
547            mapped.host(ctx.effects, ctx.events).await;
548        })
549    }
550
551    /// Spawn an additional task on the command. The task will execute concurrently with
552    /// existing tasks
553    ///
554    /// The `create_task` closure receives a [`CommandContext`] that it can use to send shell requests,
555    /// events back to the app, and to spawn additional tasks. The closure is expected to return a future.
556    pub fn spawn<F, Fut>(&mut self, create_task: F)
557    where
558        F: FnOnce(CommandContext<Effect, Event>) -> Fut,
559        Fut: Future<Output = ()> + Send + 'static,
560    {
561        self.context.spawn(create_task);
562    }
563
564    /// Returns an abort handle which can be used to remotely terminate a running Command
565    /// and all its subtask.
566    ///
567    /// This is specifically useful for cancelling subscriptions and long running effects
568    /// which may get superseded, like timers
569    #[must_use]
570    pub fn abort_handle(&self) -> AbortHandle {
571        AbortHandle {
572            aborted: self.aborted.clone(),
573        }
574    }
575}
576
577impl<Effect, Event> FromIterator<Command<Effect, Event>> for Command<Effect, Event>
578where
579    Effect: Send + Unpin + 'static,
580    Event: Send + Unpin + 'static,
581{
582    fn from_iter<I: IntoIterator<Item = Command<Effect, Event>>>(iter: I) -> Self {
583        Command::all(iter)
584    }
585}
586
587#[cfg(test)]
588mod tests;