// Source: crux_core/command/builder.rs

//! Command builders are an abstraction allowing chaining effects,
//! where outputs of one effect can serve as inputs to further effects,
//! without requiring an async context.
//!
//! Chaining streams with streams is currently not supported, as the semantics
//! of the composition are unclear. If you need to compose streams, use the async
//! API and tools from the `futures` crate.

use std::future::Future;

use futures::{FutureExt, Stream, StreamExt};

use super::{Command, context::CommandContext};

15/// A builder of one-off notify command
16// Task is a future which does the shell talking and returns an output
17pub struct NotificationBuilder<Effect, Event, Task> {
18    make_task: Box<dyn FnOnce(CommandContext<Effect, Event>) -> Task + Send>,
19}
20
21impl<Effect, Event, Task> NotificationBuilder<Effect, Event, Task>
22where
23    Effect: Send + 'static,
24    Event: Send + 'static,
25    Task: Future<Output = ()> + Send + 'static,
26{
27    pub fn new<F>(make_task: F) -> Self
28    where
29        F: FnOnce(CommandContext<Effect, Event>) -> Task + Send + 'static,
30    {
31        let make_task = Box::new(make_task);
32
33        NotificationBuilder { make_task }
34    }
35
36    /// Convert the [`NotificationBuilder`] into a future to use in an async context
37    #[must_use]
38    pub fn into_future(self, ctx: CommandContext<Effect, Event>) -> Task {
39        let make_task = self.make_task;
40        make_task(ctx)
41    }
42
43    /// Convert the [`NotificationBuilder`] into a [`Command`] to use in an sync context
44    pub fn build(self) -> Command<Effect, Event> {
45        Command::new(move |ctx| self.into_future(ctx))
46    }
47}
48
49impl<Effect, Event, Task> From<NotificationBuilder<Effect, Event, Task>> for Command<Effect, Event>
50where
51    Effect: Send + 'static,
52    Event: Send + 'static,
53    Task: Future<Output = ()> + Send + 'static,
54{
55    fn from(value: NotificationBuilder<Effect, Event, Task>) -> Self {
56        Command::new(|ctx| value.into_future(ctx))
57    }
58}
59
60/// A builder of one-off request command
61// Task is a future which does the shell talking and returns an output
62pub struct RequestBuilder<Effect, Event, Task> {
63    make_task: Box<dyn FnOnce(CommandContext<Effect, Event>) -> Task + Send>,
64}
65
66impl<Effect, Event, Task, T> RequestBuilder<Effect, Event, Task>
67where
68    Effect: Send + 'static,
69    Event: Send + 'static,
70    Task: Future<Output = T> + Send + 'static,
71{
72    pub fn new<F>(make_task: F) -> Self
73    where
74        F: FnOnce(CommandContext<Effect, Event>) -> Task + Send + 'static,
75    {
76        let make_task = Box::new(make_task);
77
78        RequestBuilder { make_task }
79    }
80
81    pub fn map<F, U>(self, map: F) -> RequestBuilder<Effect, Event, impl Future<Output = U>>
82    where
83        F: FnOnce(T) -> U + Send + 'static,
84    {
85        RequestBuilder::new(|ctx| self.into_future(ctx.clone()).map(map))
86    }
87
88    /// Chain a [`NotificationBuilder`] to run after completion of this one,
89    /// passing the result to the provided closure `make_next_builder`.
90    ///
91    /// The returned value of the closure must be a [`NotificationBuilder`], which
92    /// can represent the notification to be sent before the composed future
93    /// is finished.
94    ///
95    /// If you want to chain a request, use [`Self::then_request`] instead.
96    /// If you want to chain a subscription, use [`Self::then_stream`] instead.
97    ///
98    /// The closure `make_next_builder` is only run *after* successful completion
99    /// of the `self` future.
100    ///
101    /// Note that this function consumes the receiving `RequestBuilder`
102    /// and returns a [`NotificationBuilder`] that represents the composition.
103    ///
104    /// # Example
105    ///
106    /// ```
107    /// # use crux_core::{Command, Request};
108    /// # use crux_core::capability::Operation;
109    /// # use serde::{Deserialize, Serialize};
110    /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
111    /// # enum AnOperation {
112    /// #     Request(u8),
113    /// #     Notify,
114    /// # }
115    /// #
116    /// # #[derive(Debug, PartialEq, Deserialize)]
117    /// # enum AnOperationOutput {
118    /// #     Response(String),
119    /// # }
120    /// #
121    /// # impl Operation for AnOperation {
122    /// #     type Output = AnOperationOutput;
123    /// # }
124    /// #
125    /// # #[derive(Debug)]
126    /// # enum Effect {
127    /// #     AnEffect(Request<AnOperation>),
128    /// # }
129    /// #
130    /// # impl From<Request<AnOperation>> for Effect {
131    /// #     fn from(request: Request<AnOperation>) -> Self {
132    /// #         Self::AnEffect(request)
133    /// #     }
134    /// # }
135    /// #
136    /// # #[derive(Debug, PartialEq)]
137    /// # enum Event {
138    /// #     Response(AnOperationOutput),
139    /// # }
140    /// let mut cmd: Command<Effect, Event> =
141    ///     Command::request_from_shell(AnOperation::Request(10))
142    ///     .then_notify(|response| {
143    ///         let AnOperationOutput::Response(_response) = response else {
144    ///             panic!("Invalid output!")
145    ///         };
146    ///
147    ///         // possibly do something with the response
148    ///
149    ///         Command::notify_shell(AnOperation::Notify)
150    ///     })
151    ///     .build();
152    ///
153    /// let effect = cmd.effects().next().unwrap();
154    /// let Effect::AnEffect(mut request) = effect;
155    ///
156    /// assert_eq!(request.operation, AnOperation::Request(10));
157    ///
158    /// request
159    ///     .resolve(AnOperationOutput::Response("ten".to_string()))
160    ///     .expect("should work");
161    ///
162    /// assert!(cmd.events().next().is_none());
163    /// let effect = cmd.effects().next().unwrap();
164    /// let Effect::AnEffect(request) = effect;
165    ///
166    /// assert_eq!(request.operation, AnOperation::Notify);
167    /// assert!(cmd.is_done());
168    /// ```
169    pub fn then_notify<F, NextTask>(
170        self,
171        make_next_builder: F,
172    ) -> NotificationBuilder<Effect, Event, impl Future<Output = ()>>
173    where
174        F: FnOnce(T) -> NotificationBuilder<Effect, Event, NextTask> + Send + 'static,
175        NextTask: Future<Output = ()> + Send + 'static,
176    {
177        NotificationBuilder::new(|ctx| {
178            self.into_future(ctx.clone())
179                .then(|out| make_next_builder(out).into_future(ctx))
180        })
181    }
182
183    /// Chain another [`RequestBuilder`] to run after completion of this one,
184    /// passing the result to the provided closure `make_next_builder`.
185    ///
186    /// The returned value of the closure must be a [`RequestBuilder`], which
187    /// can represent some more work to be done before the composed future
188    /// is finished.
189    ///
190    /// If you want to chain a subscription, use [`Self::then_stream`] instead.
191    ///
192    /// The closure `make_next_builder` is only run *after* successful completion
193    /// of the `self` future.
194    ///
195    /// Note that this function consumes the receiving `RequestBuilder` and returns a
196    /// new one that represents the composition.
197    ///
198    /// # Example
199    ///
200    /// ```
201    /// # use crux_core::{Command, Request};
202    /// # use crux_core::capability::Operation;
203    /// # use serde::{Deserialize, Serialize};
204    /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
205    /// # enum AnOperation {
206    /// #     One,
207    /// #     Two,
208    /// #     More(u8),
209    /// # }
210    /// #
211    /// # #[derive(Debug, PartialEq, Deserialize)]
212    /// # enum AnOperationOutput {
213    /// #     One,
214    /// #     Two,
215    /// #     Other(u8),
216    /// # }
217    /// #
218    /// # impl Operation for AnOperation {
219    /// #     type Output = AnOperationOutput;
220    /// # }
221    /// #
222    /// # #[derive(Debug)]
223    /// # enum Effect {
224    /// #     AnEffect(Request<AnOperation>),
225    /// # }
226    /// #
227    /// # impl From<Request<AnOperation>> for Effect {
228    /// #     fn from(request: Request<AnOperation>) -> Self {
229    /// #         Self::AnEffect(request)
230    /// #     }
231    /// # }
232    /// #
233    /// # #[derive(Debug, PartialEq)]
234    /// # enum Event {
235    /// #     Completed(AnOperationOutput),
236    /// # }
237    /// let mut cmd: Command<Effect, Event> = Command::request_from_shell(AnOperation::More(1))
238    ///     .then_request(|first| {
239    ///         let AnOperationOutput::Other(first) = first else {
240    ///             panic!("Invalid output!")
241    ///         };
242    ///
243    ///         let second = first + 1;
244    ///         Command::request_from_shell(AnOperation::More(second))
245    ///     })
246    ///     .then_send(Event::Completed);
247    ///
248    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
249    /// assert_eq!(request.operation, AnOperation::More(1));
250    ///
251    /// request
252    ///    .resolve(AnOperationOutput::Other(1))
253    ///    .expect("to resolve");
254    ///
255    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
256    /// assert_eq!(request.operation, AnOperation::More(2));
257    /// ```
258    pub fn then_request<F, U, NextTask>(
259        self,
260        make_next_builder: F,
261    ) -> RequestBuilder<Effect, Event, impl Future<Output = U>>
262    where
263        F: FnOnce(T) -> RequestBuilder<Effect, Event, NextTask> + Send + 'static,
264        NextTask: Future<Output = U> + Send + 'static,
265    {
266        RequestBuilder::new(|ctx| {
267            self.into_future(ctx.clone())
268                .then(|out| make_next_builder(out).into_future(ctx))
269        })
270    }
271
272    /// Chain a [`StreamBuilder`] to run after completion of this [`RequestBuilder`],
273    /// passing the result to the provided closure `make_next_builder`.
274    ///
275    /// The returned value of the closure must be a [`StreamBuilder`], which
276    /// can represent some more work to be done before the composed future
277    /// is finished.
278    ///
279    /// If you want to chain a request, use [`Self::then_request`] instead.
280    ///
281    /// The closure `make_next_builder` is only run *after* successful completion
282    /// of the `self` future.
283    ///
284    /// Note that this function consumes the receiving `RequestBuilder` and returns a
285    /// [`StreamBuilder`] that represents the composition.
286    ///
287    /// # Example
288    ///
289    /// ```
290    /// # use crux_core::{Command, Request};
291    /// # use crux_core::capability::Operation;
292    /// # use serde::{Deserialize, Serialize};
293    /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
294    /// # enum AnOperation {
295    /// #     One,
296    /// #     Two,
297    /// #     More(u8),
298    /// # }
299    /// #
300    /// # #[derive(Debug, PartialEq, Deserialize)]
301    /// # enum AnOperationOutput {
302    /// #     One,
303    /// #     Two,
304    /// #     Other(u8),
305    /// # }
306    /// #
307    /// # impl Operation for AnOperation {
308    /// #     type Output = AnOperationOutput;
309    /// # }
310    /// #
311    /// # #[derive(Debug)]
312    /// # enum Effect {
313    /// #     AnEffect(Request<AnOperation>),
314    /// # }
315    /// #
316    /// # impl From<Request<AnOperation>> for Effect {
317    /// #     fn from(request: Request<AnOperation>) -> Self {
318    /// #         Self::AnEffect(request)
319    /// #     }
320    /// # }
321    /// #
322    /// # #[derive(Debug, PartialEq)]
323    /// # enum Event {
324    /// #     Completed(AnOperationOutput),
325    /// # }
326    /// let mut cmd: Command<Effect, Event> = Command::request_from_shell(AnOperation::More(1))
327    ///    .then_stream(|first| {
328    ///       let AnOperationOutput::Other(first) = first else {
329    ///          panic!("Invalid output!")
330    ///      };
331    ///
332    ///      let second = first + 1;
333    ///      Command::stream_from_shell(AnOperation::More(second))
334    ///    })
335    ///    .then_send(Event::Completed);
336    ///
337    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
338    /// assert_eq!(request.operation, AnOperation::More(1));
339    ///
340    /// request
341    ///   .resolve(AnOperationOutput::Other(1))
342    ///   .expect("to resolve");
343    ///
344    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
345    /// assert_eq!(request.operation, AnOperation::More(2));
346    pub fn then_stream<F, U, NextTask>(
347        self,
348        make_next_builder: F,
349    ) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
350    where
351        F: FnOnce(T) -> StreamBuilder<Effect, Event, NextTask> + Send + 'static,
352        NextTask: Stream<Item = U> + Send + 'static,
353    {
354        StreamBuilder::new(|ctx| {
355            self.into_future(ctx.clone())
356                .map(make_next_builder)
357                .into_stream()
358                .flat_map(move |builder| builder.into_stream(ctx.clone()))
359        })
360    }
361
362    /// Convert the [`RequestBuilder`] into a future to use in an async context
363    #[must_use]
364    pub fn into_future(self, ctx: CommandContext<Effect, Event>) -> Task {
365        let make_task = self.make_task;
366        make_task(ctx)
367    }
368
369    /// Create the command in an evented context
370    pub fn then_send<E>(self, event: E) -> Command<Effect, Event>
371    where
372        E: FnOnce(T) -> Event + Send + 'static,
373        Task: Future<Output = T> + Send + 'static,
374    {
375        Command::new(move |ctx| {
376            self.into_future(ctx.clone())
377                .map(move |out| ctx.send_event(event(out)))
378        })
379    }
380
381    /// Convert the [`RequestBuilder`] into a [`Command`] to use in an sync context
382    ///
383    /// Note: You might be looking for [`then_send`](Self::then_send)
384    /// instead, which will send the output back into the app with an event.
385    ///
386    /// The command created in this function will *ignore* the output
387    /// of the request so may not be very useful.
388    /// It might be useful when using a 3rd party capability and you don't
389    /// care about the request's response.
390    pub fn build(self) -> Command<Effect, Event> {
391        Command::new(move |ctx| self.into_future(ctx).map(|_| ()))
392    }
393}
394
395/// A builder of stream command
396pub struct StreamBuilder<Effect, Event, Task> {
397    make_stream: Box<dyn FnOnce(CommandContext<Effect, Event>) -> Task + Send>,
398}
399
400impl<Effect, Event, Task, T> StreamBuilder<Effect, Event, Task>
401where
402    Effect: Send + 'static,
403    Event: Send + 'static,
404    Task: Stream<Item = T> + Send + 'static,
405{
406    pub fn new<F>(make_task: F) -> Self
407    where
408        F: FnOnce(CommandContext<Effect, Event>) -> Task + Send + 'static,
409    {
410        let make_task = Box::new(make_task);
411
412        StreamBuilder {
413            make_stream: make_task,
414        }
415    }
416
417    pub fn map<F, U>(self, map: F) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
418    where
419        F: FnMut(T) -> U + Send + 'static,
420    {
421        StreamBuilder::new(|ctx| self.into_stream(ctx.clone()).map(map))
422    }
423
424    /// Chain a [`RequestBuilder`] to run after completion of this [`StreamBuilder`],
425    /// passing the result to the provided closure `make_next_builder`.
426    ///
427    /// The returned value of the closure must be a [`RequestBuilder`], which
428    /// can represent some more work to be done before the composed future
429    /// is finished.
430    ///
431    /// If you want to chain a subscription, use [`Self::then_stream`] instead.
432    ///
433    /// The closure `make_next_builder` is only run *after* successful completion
434    /// of the `self` future.
435    ///
436    /// Note that this function consumes the receiving `StreamBuilder` and returns a
437    /// new one that represents the composition.
438    ///
439    /// # Example
440    ///
441    /// ```
442    /// # use crux_core::{Command, Request};
443    /// # use crux_core::capability::Operation;
444    /// # use serde::{Deserialize, Serialize};
445    /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
446    /// # enum AnOperation {
447    /// #     One,
448    /// #     Two,
449    /// #     More(u8),
450    /// # }
451    /// #
452    /// # #[derive(Debug, PartialEq, Deserialize)]
453    /// # enum AnOperationOutput {
454    /// #     One,
455    /// #     Two,
456    /// #     Other(u8),
457    /// # }
458    /// #
459    /// # impl Operation for AnOperation {
460    /// #     type Output = AnOperationOutput;
461    /// # }
462    /// #
463    /// # #[derive(Debug)]
464    /// # enum Effect {
465    /// #     AnEffect(Request<AnOperation>),
466    /// # }
467    /// #
468    /// # impl From<Request<AnOperation>> for Effect {
469    /// #     fn from(request: Request<AnOperation>) -> Self {
470    /// #         Self::AnEffect(request)
471    /// #     }
472    /// # }
473    /// #
474    /// # #[derive(Debug, PartialEq)]
475    /// # enum Event {
476    /// #     Completed(AnOperationOutput),
477    /// # }
478    /// let mut cmd: Command<Effect, Event> = Command::stream_from_shell(AnOperation::More(1))
479    ///     .then_request(|first| {
480    ///         let AnOperationOutput::Other(first) = first else {
481    ///             panic!("Invalid output!")
482    ///         };
483    ///
484    ///         let second = first + 1;
485    ///         Command::request_from_shell(AnOperation::More(second))
486    ///     })
487    ///     .then_send(Event::Completed);
488    ///
489    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
490    /// assert_eq!(request.operation, AnOperation::More(1));
491    ///
492    /// request
493    ///    .resolve(AnOperationOutput::Other(1))
494    ///    .expect("to resolve");
495    ///
496    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
497    /// assert_eq!(request.operation, AnOperation::More(2));
498    /// ```
499    pub fn then_request<F, U, NextTask>(
500        self,
501        make_next_builder: F,
502    ) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
503    where
504        F: Fn(T) -> RequestBuilder<Effect, Event, NextTask> + Send + 'static,
505        NextTask: Future<Output = U> + Send + 'static,
506    {
507        StreamBuilder::new(|ctx| {
508            self.into_stream(ctx.clone())
509                .then(move |item| make_next_builder(item).into_future(ctx.clone()))
510        })
511    }
512
513    /// Chain another [`StreamBuilder`] to run after completion of this one,
514    /// passing the result to the provided closure `make_next_builder`.
515    ///
516    /// The returned value of the closure must be a [`StreamBuilder`], which
517    /// can represent some more work to be done before the composed future
518    /// is finished.
519    ///
520    /// If you want to chain a request, use [`Self::then_request`] instead.
521    ///
522    /// The closure `make_next_builder` is only run *after* successful completion
523    /// of the `self` future.
524    ///
525    /// Note that this function consumes the receiving `StreamBuilder` and returns a
526    /// new one that represents the composition.
527    ///
528    /// # Example
529    ///
530    /// ```
531    /// # use crux_core::{Command, Request};
532    /// # use crux_core::capability::Operation;
533    /// # use serde::{Deserialize, Serialize};
534    /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
535    /// # enum AnOperation {
536    /// #     One,
537    /// #     Two,
538    /// #     More(u8),
539    /// # }
540    /// #
541    /// # #[derive(Debug, PartialEq, Deserialize)]
542    /// # enum AnOperationOutput {
543    /// #     One,
544    /// #     Two,
545    /// #     Other(u8),
546    /// # }
547    /// #
548    /// # impl Operation for AnOperation {
549    /// #     type Output = AnOperationOutput;
550    /// # }
551    /// #
552    /// # #[derive(Debug)]
553    /// # enum Effect {
554    /// #     AnEffect(Request<AnOperation>),
555    /// # }
556    /// #
557    /// # impl From<Request<AnOperation>> for Effect {
558    /// #     fn from(request: Request<AnOperation>) -> Self {
559    /// #         Self::AnEffect(request)
560    /// #     }
561    /// # }
562    /// #
563    /// # #[derive(Debug, PartialEq)]
564    /// # enum Event {
565    /// #     Completed(AnOperationOutput),
566    /// # }
567    /// let mut cmd: Command<Effect, Event> = Command::stream_from_shell(AnOperation::More(1))
568    ///    .then_stream(|first| {
569    ///       let AnOperationOutput::Other(first) = first else {
570    ///          panic!("Invalid output!")
571    ///      };
572    ///
573    ///      let second = first + 1;
574    ///      Command::stream_from_shell(AnOperation::More(second))
575    ///    })
576    ///    .then_send(Event::Completed);
577    ///
578    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
579    /// assert_eq!(request.operation, AnOperation::More(1));
580    ///
581    /// request
582    ///   .resolve(AnOperationOutput::Other(1))
583    ///   .expect("to resolve");
584    ///
585    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
586    /// assert_eq!(request.operation, AnOperation::More(2));
587    pub fn then_stream<F, U, NextTask>(
588        self,
589        make_next_builder: F,
590    ) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
591    where
592        F: Fn(T) -> StreamBuilder<Effect, Event, NextTask> + Send + 'static,
593        NextTask: Stream<Item = U> + Send + 'static,
594    {
595        StreamBuilder::new(move |ctx| {
596            self.into_stream(ctx.clone())
597                .map(move |item| {
598                    let next_builder = make_next_builder(item);
599                    Box::pin(next_builder.into_stream(ctx.clone()))
600                })
601                .flatten_unordered(None)
602        })
603    }
604
605    /// Create the command in an evented context
606    pub fn then_send<E>(self, event: E) -> Command<Effect, Event>
607    where
608        E: Fn(T) -> Event + Send + 'static,
609    {
610        Command::new(move |ctx| {
611            self.into_stream(ctx.clone()).for_each(move |out| {
612                ctx.send_event(event(out));
613                futures::future::ready(())
614            })
615        })
616    }
617
618    /// Convert the [`StreamBuilder`] into a stream to use in an async context
619    #[must_use]
620    pub fn into_stream(self, ctx: CommandContext<Effect, Event>) -> Task {
621        let make_stream = self.make_stream;
622
623        make_stream(ctx)
624    }
625
626    /// Convert the [`StreamBuilder`] into a [`Command`] to use in an sync context
627    ///
628    /// Note: You might be looking for [`then_send`](Self::then_send)
629    /// instead, which will send each item in the stream back into the
630    /// app with an event.
631    ///
632    /// The command created in this function will *ignore* the output
633    /// of the stream so may not be very useful.
634    /// It may be useful when using a 3rd party capability and you don't
635    /// care about the stream output.
636    pub fn build(self) -> Command<Effect, Event> {
637        Command::new(move |ctx| {
638            self.into_stream(ctx)
639                .for_each(|_| futures::future::ready(()))
640        })
641    }
642}