crux_core/command/
builder.rs

//! Command builders are an abstraction allowing chaining effects,
//! where outputs of one effect can serve as inputs to further effects,
//! without requiring an async context.
//!
//! Requests and streams can be chained with further requests or streams. When a
//! stream is chained with another stream, the resulting streams are flattened
//! with no ordering guarantee between them. If you need different composition
//! semantics, use the async API and tools from the `futures` crate.
8
9use std::{future::Future, pin::pin};
10
11use futures::{FutureExt, Stream, StreamExt};
12
13use super::{context::CommandContext, Command};
14
15/// A builder of one-off notify command
16// Task is a future which does the shell talking and returns an output
17pub struct NotificationBuilder<Effect, Event, Task> {
18    make_task: Box<dyn FnOnce(CommandContext<Effect, Event>) -> Task + Send>,
19}
20
21impl<Effect, Event, Task> NotificationBuilder<Effect, Event, Task>
22where
23    Effect: Send + 'static,
24    Event: Send + 'static,
25    Task: Future<Output = ()> + Send + 'static,
26{
27    pub fn new<F>(make_task: F) -> Self
28    where
29        F: FnOnce(CommandContext<Effect, Event>) -> Task + Send + 'static,
30    {
31        let make_task = Box::new(make_task);
32
33        NotificationBuilder { make_task }
34    }
35
36    /// Convert the [`NotificationBuilder`] into a future to use in an async context
37    pub fn into_future(self, ctx: CommandContext<Effect, Event>) -> Task {
38        let make_task = self.make_task;
39        make_task(ctx)
40    }
41}
42
43impl<Effect, Event, Task> From<NotificationBuilder<Effect, Event, Task>> for Command<Effect, Event>
44where
45    Effect: Send + 'static,
46    Event: Send + 'static,
47    Task: Future<Output = ()> + Send + 'static,
48{
49    fn from(value: NotificationBuilder<Effect, Event, Task>) -> Self {
50        Command::new(|ctx| value.into_future(ctx))
51    }
52}
53
54/// A builder of one-off request command
55// Task is a future which does the shell talking and returns an output
56pub struct RequestBuilder<Effect, Event, Task> {
57    make_task: Box<dyn FnOnce(CommandContext<Effect, Event>) -> Task + Send>,
58}
59
60impl<Effect, Event, Task, T> RequestBuilder<Effect, Event, Task>
61where
62    Effect: Send + 'static,
63    Event: Send + 'static,
64    Task: Future<Output = T> + Send + 'static,
65{
66    pub fn new<F>(make_task: F) -> Self
67    where
68        F: FnOnce(CommandContext<Effect, Event>) -> Task + Send + 'static,
69    {
70        let make_task = Box::new(make_task);
71
72        RequestBuilder { make_task }
73    }
74
75    pub fn map<F, U>(self, map: F) -> RequestBuilder<Effect, Event, impl Future<Output = U>>
76    where
77        F: FnOnce(T) -> U + Send + 'static,
78    {
79        RequestBuilder::new(|ctx| self.into_future(ctx.clone()).map(map))
80    }
81
82    /// Chain another [`RequestBuilder`] to run after completion of this one,
83    /// passing the result to the provided closure `make_next_builder`.
84    ///
85    /// The returned value of the closure must be a `RequestBuilder`, which
86    /// can represent some more work to be done before the composed future
87    /// is finished.
88    ///
89    /// If you want to chain a subscription, use [`Self::then_stream`] instead.
90    ///
91    /// The closure `make_next_builder` is only run *after* successful completion
92    /// of the `self` future.
93    ///
94    /// Note that this function consumes the receiving `RequestBuilder` and returns a
95    /// new one that represents the composition.
96    ///
97    /// # Example
98    ///
99    /// ```
100    /// # use crux_core::{Command, Request};
101    /// # use crux_core::capability::Operation;
102    /// # use serde::{Deserialize, Serialize};
103    /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
104    /// # enum AnOperation {
105    /// #     One,
106    /// #     Two,
107    /// #     More(u8),
108    /// # }
109    /// #
110    /// # #[derive(Debug, PartialEq, Deserialize)]
111    /// # enum AnOperationOutput {
112    /// #     One,
113    /// #     Two,
114    /// #     Other(u8),
115    /// # }
116    /// #
117    /// # impl Operation for AnOperation {
118    /// #     type Output = AnOperationOutput;
119    /// # }
120    /// #
121    /// # #[derive(Debug)]
122    /// # enum Effect {
123    /// #     AnEffect(Request<AnOperation>),
124    /// # }
125    /// #
126    /// # impl From<Request<AnOperation>> for Effect {
127    /// #     fn from(request: Request<AnOperation>) -> Self {
128    /// #         Self::AnEffect(request)
129    /// #     }
130    /// # }
131    /// #
132    /// # #[derive(Debug, PartialEq)]
133    /// # enum Event {
134    /// #     Completed(AnOperationOutput),
135    /// # }
136    /// let mut cmd: Command<Effect, Event> = Command::request_from_shell(AnOperation::More(1))
137    ///     .then_request(|first| {
138    ///         let AnOperationOutput::Other(first) = first else {
139    ///             panic!("Invalid output!")
140    ///         };
141    ///
142    ///         let second = first + 1;
143    ///         Command::request_from_shell(AnOperation::More(second))
144    ///     })
145    ///     .then_send(Event::Completed);
146    ///
147    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
148    /// assert_eq!(request.operation, AnOperation::More(1));
149    ///
150    /// request
151    ///    .resolve(AnOperationOutput::Other(1))
152    ///    .expect("to resolve");
153    ///
154    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
155    /// assert_eq!(request.operation, AnOperation::More(2));
156    /// ```
157    pub fn then_request<F, U, NextTask>(
158        self,
159        make_next_builder: F,
160    ) -> RequestBuilder<Effect, Event, impl Future<Output = U>>
161    where
162        F: FnOnce(T) -> RequestBuilder<Effect, Event, NextTask> + Send + 'static,
163        NextTask: Future<Output = U> + Send + 'static,
164    {
165        RequestBuilder::new(|ctx| {
166            self.into_future(ctx.clone())
167                .then(|out| make_next_builder(out).into_future(ctx))
168        })
169    }
170
171    /// Chain a [`StreamBuilder`] to run after completion of this [`RequestBuilder`],
172    /// passing the result to the provided closure `make_next_builder`.
173    ///
174    /// The returned value of the closure must be a `StreamBuilder`, which
175    /// can represent some more work to be done before the composed future
176    /// is finished.
177    ///
178    /// If you want to chain a request, use [`Self::then_request`] instead.
179    ///
180    /// The closure `make_next_builder` is only run *after* successful completion
181    /// of the `self` future.
182    ///
183    /// Note that this function consumes the receiving `RequestBuilder` and returns a
184    /// [`StreamBuilder`] that represents the composition.
185    ///
186    /// # Example
187    ///
188    /// ```
189    /// # use crux_core::{Command, Request};
190    /// # use crux_core::capability::Operation;
191    /// # use serde::{Deserialize, Serialize};
192    /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
193    /// # enum AnOperation {
194    /// #     One,
195    /// #     Two,
196    /// #     More(u8),
197    /// # }
198    /// #
199    /// # #[derive(Debug, PartialEq, Deserialize)]
200    /// # enum AnOperationOutput {
201    /// #     One,
202    /// #     Two,
203    /// #     Other(u8),
204    /// # }
205    /// #
206    /// # impl Operation for AnOperation {
207    /// #     type Output = AnOperationOutput;
208    /// # }
209    /// #
210    /// # #[derive(Debug)]
211    /// # enum Effect {
212    /// #     AnEffect(Request<AnOperation>),
213    /// # }
214    /// #
215    /// # impl From<Request<AnOperation>> for Effect {
216    /// #     fn from(request: Request<AnOperation>) -> Self {
217    /// #         Self::AnEffect(request)
218    /// #     }
219    /// # }
220    /// #
221    /// # #[derive(Debug, PartialEq)]
222    /// # enum Event {
223    /// #     Completed(AnOperationOutput),
224    /// # }
225    /// let mut cmd: Command<Effect, Event> = Command::request_from_shell(AnOperation::More(1))
226    ///    .then_stream(|first| {
227    ///       let AnOperationOutput::Other(first) = first else {
228    ///          panic!("Invalid output!")
229    ///      };
230    ///
231    ///      let second = first + 1;
232    ///      Command::stream_from_shell(AnOperation::More(second))
233    ///    })
234    ///    .then_send(Event::Completed);
235    ///
236    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
237    /// assert_eq!(request.operation, AnOperation::More(1));
238    ///
239    /// request
240    ///   .resolve(AnOperationOutput::Other(1))
241    ///   .expect("to resolve");
242    ///
243    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
244    /// assert_eq!(request.operation, AnOperation::More(2));
245    pub fn then_stream<F, U, NextTask>(
246        self,
247        make_next_builder: F,
248    ) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
249    where
250        F: FnOnce(T) -> StreamBuilder<Effect, Event, NextTask> + Send + 'static,
251        NextTask: Stream<Item = U> + Send + 'static,
252    {
253        StreamBuilder::new(|ctx| {
254            self.into_future(ctx.clone())
255                .map(make_next_builder)
256                .into_stream()
257                .flat_map(move |builder| builder.into_stream(ctx.clone()))
258        })
259    }
260
261    /// Convert the request builder into a future to use in an async context
262    pub fn into_future(self, ctx: CommandContext<Effect, Event>) -> Task {
263        let make_task = self.make_task;
264        make_task(ctx)
265    }
266
267    /// Create the command in an evented context
268    pub fn then_send<E>(self, event: E) -> Command<Effect, Event>
269    where
270        E: FnOnce(T) -> Event + Send + 'static,
271        Task: Future<Output = T> + Send + 'static,
272    {
273        Command::new(|ctx| async move {
274            let out = self.into_future(ctx.clone()).await;
275            ctx.send_event(event(out));
276        })
277    }
278}
279
280/// A builder of stream command
281pub struct StreamBuilder<Effect, Event, Task> {
282    make_stream: Box<dyn FnOnce(CommandContext<Effect, Event>) -> Task + Send>,
283}
284
285impl<Effect, Event, Task, T> StreamBuilder<Effect, Event, Task>
286where
287    Effect: Send + 'static,
288    Event: Send + 'static,
289    Task: Stream<Item = T> + Send + 'static,
290{
291    pub fn new<F>(make_task: F) -> Self
292    where
293        F: FnOnce(CommandContext<Effect, Event>) -> Task + Send + 'static,
294    {
295        let make_task = Box::new(make_task);
296
297        StreamBuilder {
298            make_stream: make_task,
299        }
300    }
301
302    pub fn map<F, U>(self, map: F) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
303    where
304        F: FnMut(T) -> U + Send + 'static,
305    {
306        StreamBuilder::new(|ctx| self.into_stream(ctx.clone()).map(map))
307    }
308
309    /// Chain a [`RequestBuilder`] to run after completion of this [`StreamBuilder`],
310    /// passing the result to the provided closure `make_next_builder`.
311    ///
312    /// The returned value of the closure must be a [`StreamBuilder`], which
313    /// can represent some more work to be done before the composed future
314    /// is finished.
315    ///
316    /// If you want to chain a subscription, use [`Self::then_stream`] instead.
317    ///
318    /// The closure `make_next_builder` is only run *after* successful completion
319    /// of the `self` future.
320    ///
321    /// Note that this function consumes the receiving `StreamBuilder` and returns a
322    /// new one that represents the composition.
323    ///
324    /// # Example
325    ///
326    /// ```
327    /// # use crux_core::{Command, Request};
328    /// # use crux_core::capability::Operation;
329    /// # use serde::{Deserialize, Serialize};
330    /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
331    /// # enum AnOperation {
332    /// #     One,
333    /// #     Two,
334    /// #     More(u8),
335    /// # }
336    /// #
337    /// # #[derive(Debug, PartialEq, Deserialize)]
338    /// # enum AnOperationOutput {
339    /// #     One,
340    /// #     Two,
341    /// #     Other(u8),
342    /// # }
343    /// #
344    /// # impl Operation for AnOperation {
345    /// #     type Output = AnOperationOutput;
346    /// # }
347    /// #
348    /// # #[derive(Debug)]
349    /// # enum Effect {
350    /// #     AnEffect(Request<AnOperation>),
351    /// # }
352    /// #
353    /// # impl From<Request<AnOperation>> for Effect {
354    /// #     fn from(request: Request<AnOperation>) -> Self {
355    /// #         Self::AnEffect(request)
356    /// #     }
357    /// # }
358    /// #
359    /// # #[derive(Debug, PartialEq)]
360    /// # enum Event {
361    /// #     Completed(AnOperationOutput),
362    /// # }
363    /// let mut cmd: Command<Effect, Event> = Command::stream_from_shell(AnOperation::More(1))
364    ///     .then_request(|first| {
365    ///         let AnOperationOutput::Other(first) = first else {
366    ///             panic!("Invalid output!")
367    ///         };
368    ///
369    ///         let second = first + 1;
370    ///         Command::request_from_shell(AnOperation::More(second))
371    ///     })
372    ///     .then_send(Event::Completed);
373    ///
374    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
375    /// assert_eq!(request.operation, AnOperation::More(1));
376    ///
377    /// request
378    ///    .resolve(AnOperationOutput::Other(1))
379    ///    .expect("to resolve");
380    ///
381    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
382    /// assert_eq!(request.operation, AnOperation::More(2));
383    /// ```
384    pub fn then_request<F, U, NextTask>(
385        self,
386        make_next_builder: F,
387    ) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
388    where
389        F: Fn(T) -> RequestBuilder<Effect, Event, NextTask> + Send + 'static,
390        NextTask: Future<Output = U> + Send + 'static,
391    {
392        StreamBuilder::new(|ctx| {
393            self.into_stream(ctx.clone())
394                .then(move |item| make_next_builder(item).into_future(ctx.clone()))
395        })
396    }
397
398    /// Chain another [`StreamBuilder`] to run after completion of this one,
399    /// passing the result to the provided closure `make_next_builder`.
400    ///
401    /// The returned value of the closure must be a `StreamBuilder`, which
402    /// can represent some more work to be done before the composed future
403    /// is finished.
404    ///
405    /// If you want to chain a request, use [`Self::then_request`] instead.
406    ///
407    /// The closure `make_next_builder` is only run *after* successful completion
408    /// of the `self` future.
409    ///
410    /// Note that this function consumes the receiving `StreamBuilder` and returns a
411    /// new one that represents the composition.
412    ///
413    /// # Example
414    ///
415    /// ```
416    /// # use crux_core::{Command, Request};
417    /// # use crux_core::capability::Operation;
418    /// # use serde::{Deserialize, Serialize};
419    /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
420    /// # enum AnOperation {
421    /// #     One,
422    /// #     Two,
423    /// #     More(u8),
424    /// # }
425    /// #
426    /// # #[derive(Debug, PartialEq, Deserialize)]
427    /// # enum AnOperationOutput {
428    /// #     One,
429    /// #     Two,
430    /// #     Other(u8),
431    /// # }
432    /// #
433    /// # impl Operation for AnOperation {
434    /// #     type Output = AnOperationOutput;
435    /// # }
436    /// #
437    /// # #[derive(Debug)]
438    /// # enum Effect {
439    /// #     AnEffect(Request<AnOperation>),
440    /// # }
441    /// #
442    /// # impl From<Request<AnOperation>> for Effect {
443    /// #     fn from(request: Request<AnOperation>) -> Self {
444    /// #         Self::AnEffect(request)
445    /// #     }
446    /// # }
447    /// #
448    /// # #[derive(Debug, PartialEq)]
449    /// # enum Event {
450    /// #     Completed(AnOperationOutput),
451    /// # }
452    /// let mut cmd: Command<Effect, Event> = Command::stream_from_shell(AnOperation::More(1))
453    ///    .then_stream(|first| {
454    ///       let AnOperationOutput::Other(first) = first else {
455    ///          panic!("Invalid output!")
456    ///      };
457    ///
458    ///      let second = first + 1;
459    ///      Command::stream_from_shell(AnOperation::More(second))
460    ///    })
461    ///    .then_send(Event::Completed);
462    ///
463    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
464    /// assert_eq!(request.operation, AnOperation::More(1));
465    ///
466    /// request
467    ///   .resolve(AnOperationOutput::Other(1))
468    ///   .expect("to resolve");
469    ///
470    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
471    /// assert_eq!(request.operation, AnOperation::More(2));
472    pub fn then_stream<F, U, NextTask>(
473        self,
474        make_next_builder: F,
475    ) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
476    where
477        F: Fn(T) -> StreamBuilder<Effect, Event, NextTask> + Send + 'static,
478        NextTask: Stream<Item = U> + Send + 'static,
479    {
480        StreamBuilder::new(move |ctx| {
481            self.into_stream(ctx.clone())
482                .map(move |item| {
483                    let next_builder = make_next_builder(item);
484                    Box::pin(next_builder.into_stream(ctx.clone()))
485                })
486                .flatten_unordered(None)
487        })
488    }
489
490    /// Create the command in an evented context
491    pub fn then_send<E>(self, event: E) -> Command<Effect, Event>
492    where
493        E: Fn(T) -> Event + Send + 'static,
494    {
495        Command::new(|ctx| async move {
496            let mut stream = pin!(self.into_stream(ctx.clone()));
497
498            while let Some(out) = stream.next().await {
499                ctx.send_event(event(out));
500            }
501        })
502    }
503
504    /// Convert the stream builder into a stream to use in an async context
505    pub fn into_stream(self, ctx: CommandContext<Effect, Event>) -> Task {
506        let make_stream = self.make_stream;
507
508        make_stream(ctx)
509    }
510}