crux_core/command/mod.rs
1//! Command represents one or more side-effects, resulting in interactions with the shell.
2//!
3//! Core creates Commands and returns them from the `update` function in response to events.
4//! Commands can be created directly, but more often they will be created and returned
5//! by capability APIs.
6//!
7//! A Command can execute side-effects in parallel, in sequence or a combination of both. To
8//! allow this orchestration they provide both a simple synchronous API and access to an
9//! asynchronous API.
10//!
11//! Command surfaces the effect requests and events sent in response with
12//! the [`Command::effects`] and [`Command::events`] methods. These can be used when testing
13//! the side effects requested by an `update` call.
14//!
15//! Internally, Command resembles [`FuturesUnordered`](futures::stream::FuturesUnordered):
16//! it manages and polls a number of futures and provides a context which they can use
17//! to submit effects to the shell and events back to the application.
18//!
19//! Command implements [`Stream`], making it useful in an async context,
20//! enabling, for example, wrapping Commands in one another.
21//!
22//! # Examples
23//!
24//! Commands are typically created by a capability and returned from the update function. Capabilities
25//! normally return a builder, which can be used in both sync and async context. The basic sync use
26//! is to bind the command to an Event which will be sent with the result of the command:
27//!
28//! ```
29//!# use url::Url;
30//!# use crux_core::{Command, render::render, render::RenderOperation};
31//!# use crux_core::macros::effect;
32//!# use crux_http::protocol::HttpRequest;
33//!# use crux_http::Http;
34//!# const API_URL: &str = "https://example.com/";
35//!# pub enum Event { Increment, Set(crux_http::Result<crux_http::Response<usize>>) }
36//!# #[effect]
37//!# pub enum Effect {
38//!# Render(RenderOperation),
39//!# Http(HttpRequest)
40//!# }
41//!# #[derive(Default)] pub struct Model { count: usize }
42//!# #[derive(Default)] pub struct App;
43//!# impl crux_core::App for App {
44//!# type Event = Event;
45//!# type Model = Model;
46//!# type ViewModel = ();
47//!# type Effect = Effect;
48//! fn update(
49//! &self,
50//! event: Self::Event,
51//! model: &mut Self::Model)
52//! -> Command<Effect, Event> {
53//! match event {
54//! //...
55//! Event::Increment => {
56//! let base = Url::parse(API_URL).unwrap();
57//! let url = base.join("/inc").unwrap();
58//!
59//! Http::post(url) // creates an HTTP RequestBuilder
60//! .expect_json()
61//! .build() // creates a Command RequestBuilder
62//! .then_send(Event::Set) // creates a Command
63//! }
64//! Event::Set(Ok(mut response)) => {
65//! let count = response.take_body().unwrap();
66//! model.count = count;
67//! render()
68//! }
69//! Event::Set(Err(_)) => todo!()
70//! }
71//! }
72//!# fn view(&self, model: &Self::Model) {
73//!# unimplemented!()
74//!# }
75//!# }
76//! ```
77//!
78//! Commands can be chained, allowing the outputs of the first effect to be used in constructing the second
79//! effect. For example, the following code creates a new post, then fetches the full created post based
80//! on a url read from the response to the creation request:
81//!
82//! ```
83//!# use crux_core::Command;
84//!# use crux_http::command::Http;
85//!# use crux_core::render::render;
86//!# use doctest_support::command::{Effect, Event, AnOperation, AnOperationOutput, Post};
87//!# const API_URL: &str = "https://example.com/";
88//!# let result = {
89//! let cmd: Command<Effect, Event> =
90//! Http::post(API_URL)
91//! .body(serde_json::json!({"title":"New Post", "body":"Hello!"}))
92//! .expect_json::<Post>()
93//! .build()
94//! .then_request(|result| {
95//! let post = result.unwrap();
96//! let url = &post.body().unwrap().url;
97//!
98//! Http::get(url).expect_json().build()
99//! })
100//! .then_send(Event::GotPost);
101//!
102//! // Run the http request concurrently with notifying the shell to render
103//! Command::all([cmd, render()])
104//!# };
105//! ```
106//!
107//! The same can be done with the async API, if you need more complex orchestration that is
108//! more naturally expressed in async rust
109//!
110//! ```
111//! # use crux_core::Command;
112//! # use crux_http::command::Http;
113//! # use doctest_support::command::{Effect, Event, AnOperation, AnOperationOutput, Post};
114//! # const API_URL: &str = "";
115//! let cmd: Command<Effect, Event> = Command::new(|ctx| async move {
116//! let first = Http::post(API_URL)
117//! .body(serde_json::json!({"title":"New Post", "body":"Hello!"}))
118//! .expect_json::<Post>()
119//! .build()
120//! .into_future(ctx.clone())
121//! .await;
122//!
123//! let post = first.unwrap();
124//! let url = &post.body().unwrap().url;
125//!
126//! let second = Http::get(url).expect_json().build().into_future(ctx.clone()).await;
127//!
128//! ctx.send_event(Event::GotPost(second));
129//! });
130//! ```
131//!
132//! In the async context, you can spawn additional concurrent tasks, which can, for example,
133//! communicate with each other via channels, to enable more complex orchestrations, stateful
134//! connection handling and other advanced uses.
135//!
136//! ```
137//! # use crux_core::Command;
138//! # use doctest_support::command::{Effect, Event, AnOperation};
139//! let mut cmd: Command<Effect, Event> = Command::new(|ctx| async move {
140//! let (tx, rx) = async_channel::unbounded();
141//!
142//! ctx.spawn(|ctx| async move {
143//! for i in 0..10u8 {
144//! let output = ctx.request_from_shell(AnOperation::One(i)).await;
145//! tx.send(output).await.unwrap();
146//! }
147//! });
148//!
149//! ctx.spawn(|ctx| async move {
150//! while let Ok(value) = rx.recv().await {
151//! ctx.send_event(Event::Completed(value));
152//! }
153//! ctx.send_event(Event::Aborted);
154//! });
155//! });
156//! ```
157//!
158//! Commands can be cancelled, by calling [`Command::abort_handle`]
159//! and then calling `abort` on the returned handle.
160//!
161//! ```
162//! # use crux_core::Command;
163//! # use doctest_support::command::{Effect, Event, AnOperation};
164//! let mut cmd: Command<Effect, Event> = Command::all([
165//! Command::request_from_shell(AnOperation::One(1)).then_send(Event::Completed),
166//! Command::request_from_shell(AnOperation::Two(1)).then_send(Event::Completed),
167//! ]);
168//!
169//! let handle = cmd.abort_handle();
170//!
171//! // Command is still running
172//! assert!(!cmd.was_aborted());
173//!
174//! handle.abort();
175//!
176//! // Command is now finished
177//! assert!(cmd.is_done());
178//! // And was aborted
179//! assert!(cmd.was_aborted());
180//!
181//! ```
182//!
183//! You can test that Commands yield the expected effects and events.
184//! Commands can be tested in isolation by creating them explicitly
185//! in a test, and then checking the effects and events they generated.
186//! Or you can call your app's `update` function in a test, and perform
187//! the same checks on the returned Command.
188//!
189//! ```
190//! # use crux_http::{
191//! # command::Http,
192//! # protocol::{HttpRequest, HttpResponse, HttpResult},
193//! # testing::ResponseBuilder,
194//! # };
195//! # use doctest_support::command::{Effect, Event, Post};
196//! const API_URL: &str = "https://example.com/api/posts";
197//!
198//! // Create a command to post a new Post to API_URL
199//! // and then dispatch an event with the result
200//! let mut cmd = Http::post(API_URL)
201//! .body(serde_json::json!({"title":"New Post", "body":"Hello!"}))
202//! .expect_json()
203//! .build()
204//! .then_send(Event::GotPost);
205//!
206//! // Check the effect is an HTTP request ...
207//! let effect = cmd.expect_one_effect();
208//! let Effect::Http(mut request) = effect else {
209//! panic!("Expected a HTTP effect")
210//! };
211//!
212//! // ... and the request is a POST to API_URL
213//! assert_eq!(
214//! &request.operation,
215//! &HttpRequest::post(API_URL)
216//! .header("content-type", "application/json")
217//! .body(r#"{"body":"Hello!","title":"New Post"}"#)
218//! .build()
219//! );
220//!
221//! // Resolve the request with a successful response
222//! let body = Post {
223//! url: API_URL.to_string(),
224//! title: "New Post".to_string(),
225//! body: "Hello!".to_string(),
226//! };
227//! request
228//! .resolve(HttpResult::Ok(HttpResponse::ok().json(&body).build()))
229//! .expect("Resolve should succeed");
230//!
231//! // Check the event is a GotPost event with the successful response
232//! let actual = cmd.expect_one_event();
233//! let expected = Event::GotPost(Ok(ResponseBuilder::ok().body(body).build()));
234//! assert_eq!(actual, expected);
235//!
236//! assert!(cmd.is_done());
237//! ```
238
239mod builder;
240mod context;
241mod executor;
242mod stream;
243
244use std::future::Future;
245use std::sync::Arc;
246use std::sync::atomic::AtomicBool;
247
248// TODO: consider switching to flume
249use crossbeam_channel::{Receiver, Sender};
250use executor::{Task, TaskId};
251use futures::task::AtomicWaker;
252use futures::{FutureExt as _, Stream, StreamExt as _};
253use slab::Slab;
254use stream::CommandStreamExt as _;
255
256pub use builder::{NotificationBuilder, RequestBuilder, StreamBuilder};
257pub use context::CommandContext;
258pub use executor::AbortHandle;
259pub use stream::CommandOutput;
260
261use crate::Request;
262use crate::capability::Operation;
263
264// ANCHOR: command
265#[must_use = "Unused commands never execute. Return the command from your app's update function or combine it with other commands with Command::and or Command::all"]
266pub struct Command<Effect, Event> {
267 effects: Receiver<Effect>,
268 events: Receiver<Event>,
269 context: CommandContext<Effect, Event>,
270
271 // Executor internals
272 // TODO: should this be a separate type?
273 ready_queue: Receiver<TaskId>,
274 spawn_queue: Receiver<Task>,
275 tasks: Slab<Task>,
276 ready_sender: Sender<TaskId>, // Used in creating wakers for tasks
277 waker: Arc<AtomicWaker>, // Shared with task wakers when polled in async context
278
279 // Signaling
280 aborted: Arc<AtomicBool>,
281}
282// ANCHOR_END: command
283
284// Public API
285
286impl<Effect, Event> Command<Effect, Event>
287where
288 Effect: Send + 'static,
289 Event: Send + 'static,
290{
291 /// Create a new command orchestrating effects with async Rust. This is the lowest level
292 /// API to create a Command if you need full control over its execution. In most cases you will
293 /// more likely want to create Commands with capabilities, and using the combinator APIs
294 /// ([`and`](Command::and) and [`all`](Command::all)) to orchestrate them.
295 ///
296 /// The `create_task` closure receives a [`CommandContext`] that it can use to send shell requests,
297 /// events back to the app, and to spawn additional tasks. The closure is expected to return a future
298 /// which becomes the command's main asynchronous task.
299 ///
300 /// # Panics
301 ///
302 /// If we could not make the task ready because the ready channel was disconnected.
303 pub fn new<F, Fut>(create_task: F) -> Self
304 where
305 F: FnOnce(CommandContext<Effect, Event>) -> Fut,
306 Fut: Future<Output = ()> + Send + 'static,
307 {
308 // RFC: do we need to think about backpressure? The channels are unbounded
309 // so a naughty Command can make massive amounts of requests or spawn a huge number of tasks.
310 // If these channels supported async, the CommandContext methods could also be async and
311 // we could give the channels some bounds
312 let (effect_sender, effect_receiver) = crossbeam_channel::unbounded();
313 let (event_sender, event_receiver) = crossbeam_channel::unbounded();
314 let (ready_sender, ready_receiver) = crossbeam_channel::unbounded();
315 let (spawn_sender, spawn_receiver) = crossbeam_channel::unbounded();
316 let (_, waker_receiver) = crossbeam_channel::unbounded();
317
318 let context = context::CommandContext {
319 effects: effect_sender,
320 events: event_sender,
321 tasks: spawn_sender,
322 rc: Arc::default(),
323 };
324
325 let aborted: Arc<AtomicBool> = Arc::default();
326 let task = Task {
327 finished: Arc::default(),
328 aborted: aborted.clone(),
329 future: create_task(context.clone()).boxed(),
330 join_handle_wakers: waker_receiver,
331 };
332
333 let mut tasks = Slab::with_capacity(1);
334 let task_id = TaskId(tasks.insert(task));
335
336 ready_sender
337 .send(task_id)
338 .expect("Could not make task ready, ready channel disconnected");
339
340 Command {
341 effects: effect_receiver,
342 events: event_receiver,
343 context,
344 ready_queue: ready_receiver,
345 spawn_queue: spawn_receiver,
346 ready_sender,
347 tasks,
348 waker: Arc::default(),
349 aborted,
350 }
351 }
352
353 /// Create an empty, completed Command. This is useful as a return value from `update` if
354 /// there are no side-effects to perform.
355 pub fn done() -> Self {
356 Command::new(|_ctx| futures::future::ready(()))
357 }
358
359 /// Create a command from another command with compatible `Effect` and `Event` types
360 pub fn from<Ef, Ev>(subcmd: Command<Ef, Ev>) -> Self
361 where
362 Ef: Send + 'static + Into<Effect> + Unpin,
363 Ev: Send + 'static + Into<Event> + Unpin,
364 Effect: Unpin,
365 Event: Unpin,
366 {
367 subcmd.map_effect(Into::into).map_event(Into::into)
368 }
369
370 /// Turn the command into another command with compatible `Effect` and `Event` types
371 pub fn into<Ef, Ev>(self) -> Command<Ef, Ev>
372 where
373 Ef: Send + 'static + Unpin,
374 Ev: Send + 'static + Unpin,
375 Effect: Unpin + Into<Ef>,
376 Event: Unpin + Into<Ev>,
377 {
378 self.map_effect(Into::into).map_event(Into::into)
379 }
380
381 /// Create a Command which dispatches an event and terminates. This is an alternative
382 /// to calling `update` recursively. The only difference is that the two `update` calls
383 /// will be visible to Crux and can show up in logs or any tooling. The trade-off is that
384 /// the event is not guaranteed to dispatch instantly - another `update` call which is
385 /// already scheduled may happen first.
386 pub fn event(event: Event) -> Self {
387 Command::new(|ctx| async move { ctx.send_event(event) })
388 }
389
390 /// Start a creation of a Command which sends a notification to the shell with a provided
391 /// `operation`.
392 ///
393 /// Returns a [`NotificationBuilder`] which can be converted into a Command directly.
394 ///
395 /// In an async context, `NotificationBuilder` can be turned into a future that resolves to the
396 /// operation output type.
397 pub fn notify_shell<Op>(
398 operation: Op,
399 ) -> builder::NotificationBuilder<Effect, Event, impl Future<Output = ()>>
400 where
401 Op: Operation,
402 Effect: From<Request<Op>>,
403 {
404 builder::NotificationBuilder::new(|ctx| async move { ctx.notify_shell(operation) })
405 }
406
407 /// Start a creation of a Command which sends a one-time request to the shell with a provided
408 /// operation.
409 ///
410 /// Returns a `RequestBuilder`, which can be converted into a Command directly, or chained
411 /// with another command builder using `.then`.
412 ///
413 /// In an async context, `RequestBuilder` can be turned into a future that resolves to the
414 /// operation output type.
415 pub fn request_from_shell<Op>(
416 operation: Op,
417 ) -> builder::RequestBuilder<Effect, Event, impl Future<Output = Op::Output>>
418 where
419 Op: Operation,
420 Effect: From<Request<Op>>,
421 {
422 builder::RequestBuilder::new(|ctx| ctx.request_from_shell(operation))
423 }
424
425 /// Start a creation of a Command which sends a stream request to the shell with a provided
426 /// operation.
427 ///
428 /// Returns a `StreamBuilder`, which can be converted into a Command directly, or chained
429 /// with a `RequestBuilder` builder using `.then`.
430 ///
431 /// In an async context, `StreamBuilder` can be turned into a stream that with the
432 /// operation output type as item.
433 pub fn stream_from_shell<Op>(
434 operation: Op,
435 ) -> builder::StreamBuilder<Effect, Event, impl Stream<Item = Op::Output>>
436 where
437 Op: Operation,
438 Effect: From<Request<Op>>,
439 {
440 builder::StreamBuilder::new(|ctx| ctx.stream_from_shell(operation))
441 }
442
443 /// Run the effect state machine until it settles, then return true
444 /// if there is any more work to do - tasks to run or events or effects to receive
445 pub fn is_done(&mut self) -> bool {
446 self.run_until_settled();
447
448 // If a context is alive, the command can still receive effects, events or tasks.
449 let all_context_dropped = Arc::strong_count(&self.context.rc) == 1;
450
451 all_context_dropped
452 && self.effects.is_empty()
453 && self.events.is_empty()
454 && self.tasks.is_empty()
455 }
456
457 /// Run the effect state machine until it settles and return an iterator over the effects
458 pub fn effects(&mut self) -> impl Iterator<Item = Effect> + '_ {
459 self.run_until_settled();
460
461 self.effects.try_iter()
462 }
463
464 /// Run the effect state machine until it settles and return an iterator over the events
465 pub fn events(&mut self) -> impl Iterator<Item = Event> + '_ {
466 self.run_until_settled();
467
468 self.events.try_iter()
469 }
470
471 // Combinators
472
473 /// Convert the command into a future to use in an async context
474 pub async fn into_future(self, ctx: CommandContext<Effect, Event>)
475 where
476 Effect: Unpin + Send + 'static,
477 Event: Unpin + Send + 'static,
478 {
479 self.host(ctx.effects, ctx.events).await;
480 }
481
482 /// Create a command running self and the other command in sequence
483 // RFC: is this actually _useful_? Unlike `.then` on `CommandBuilder` this doesn't allow using
484 // the output of the first command in building the second one, it just runs them in sequence,
485 // and the benefit is unclear.
486 pub fn then(self, other: Self) -> Self
487 where
488 Effect: Unpin,
489 Event: Unpin,
490 {
491 Command::new(|ctx| async move {
492 // first run self until done
493 self.into_future(ctx.clone()).await;
494
495 // then run other until done
496 other.into_future(ctx).await;
497 })
498 }
499
500 /// Convenience for [`Command::all`] which runs another command concurrently with this one
501 pub fn and(mut self, other: Self) -> Self
502 where
503 Effect: Unpin,
504 Event: Unpin,
505 {
506 self.spawn(|ctx| other.into_future(ctx));
507
508 self
509 }
510
511 /// Create a command running a number of commands concurrently
512 pub fn all<I>(commands: I) -> Self
513 where
514 I: IntoIterator<Item = Self>,
515 Effect: Unpin,
516 Event: Unpin,
517 {
518 let mut command = Command::done();
519
520 for c in commands {
521 command.spawn(|ctx| c.into_future(ctx));
522 }
523
524 command
525 }
526
527 // Mapping for composition
528
529 /// Map effects requested as part of this command to a different effect type.
530 ///
531 /// This is useful when composing apps to convert a command from a child app to a
532 /// command of the parent app.
533 pub fn map_effect<F, NewEffect>(self, map: F) -> Command<NewEffect, Event>
534 where
535 F: Fn(Effect) -> NewEffect + Send + Sync + 'static,
536 NewEffect: Send + Unpin + 'static,
537 Effect: Unpin,
538 Event: Unpin,
539 {
540 Command::new(|ctx| async move {
541 let mapped = self.map(|output| match output {
542 CommandOutput::Effect(effect) => CommandOutput::Effect(map(effect)),
543 CommandOutput::Event(event) => CommandOutput::Event(event),
544 });
545
546 mapped.host(ctx.effects, ctx.events).await;
547 })
548 }
549
550 /// Map events sent as part of this command to a different effect type
551 ///
552 /// This is useful when composing apps to convert a command from a child app to a
553 /// command of the parent app.
554 pub fn map_event<F, NewEvent>(self, map: F) -> Command<Effect, NewEvent>
555 where
556 F: Fn(Event) -> NewEvent + Send + Sync + 'static,
557 NewEvent: Send + Unpin + 'static,
558 Effect: Unpin,
559 Event: Unpin,
560 {
561 Command::new(|ctx| async move {
562 let mapped = self.map(|output| match output {
563 CommandOutput::Effect(effect) => CommandOutput::Effect(effect),
564 CommandOutput::Event(event) => CommandOutput::Event(map(event)),
565 });
566
567 mapped.host(ctx.effects, ctx.events).await;
568 })
569 }
570
571 /// Spawn an additional task on the command. The task will execute concurrently with
572 /// existing tasks
573 ///
574 /// The `create_task` closure receives a [`CommandContext`] that it can use to send shell requests,
575 /// events back to the app, and to spawn additional tasks. The closure is expected to return a future.
576 pub fn spawn<F, Fut>(&mut self, create_task: F)
577 where
578 F: FnOnce(CommandContext<Effect, Event>) -> Fut,
579 Fut: Future<Output = ()> + Send + 'static,
580 {
581 self.context.spawn(create_task);
582 }
583
584 /// Returns an abort handle which can be used to remotely terminate a running Command
585 /// and all its subtask.
586 ///
587 /// This is specifically useful for cancelling subscriptions and long running effects
588 /// which may get superseded, like timers
589 #[must_use]
590 pub fn abort_handle(&self) -> AbortHandle {
591 AbortHandle {
592 aborted: self.aborted.clone(),
593 }
594 }
595}
596
597impl<Effect, Event> FromIterator<Command<Effect, Event>> for Command<Effect, Event>
598where
599 Effect: Send + Unpin + 'static,
600 Event: Send + Unpin + 'static,
601{
602 fn from_iter<I: IntoIterator<Item = Command<Effect, Event>>>(iter: I) -> Self {
603 Command::all(iter)
604 }
605}
606
607#[cfg(test)]
608mod tests;