crux_core/command/
builder.rs

1//! Command builders are an abstraction allowing chaining effects,
2//! where outputs of one effect can serve as inputs to further effects,
3//! without requiring an async context.
4//!
5//! Chaining streams with streams is currently not supported, as the semantics
6//! of the composition are unclear. If you need to compose streams, use the async
7//! API and tools from the `futures` crate.
8
9use std::{future::Future, pin::pin};
10
11use futures::{FutureExt, Stream, StreamExt};
12
13use super::{context::CommandContext, Command};
14
15/// A builder of one-off request command
16// Task is a future which does the shell talking and returns an output
17pub struct RequestBuilder<Effect, Event, Task> {
18    make_task: Box<dyn FnOnce(CommandContext<Effect, Event>) -> Task + Send>,
19}
20
21impl<Effect, Event, Task, T> RequestBuilder<Effect, Event, Task>
22where
23    Effect: Send + 'static,
24    Event: Send + 'static,
25    Task: Future<Output = T> + Send + 'static,
26{
27    pub fn new<F>(make_task: F) -> Self
28    where
29        F: FnOnce(CommandContext<Effect, Event>) -> Task + Send + 'static,
30    {
31        let make_task = Box::new(make_task);
32
33        RequestBuilder { make_task }
34    }
35
36    pub fn map<F, U>(self, map: F) -> RequestBuilder<Effect, Event, impl Future<Output = U>>
37    where
38        F: FnOnce(T) -> U + Send + 'static,
39    {
40        RequestBuilder::new(|ctx| self.into_future(ctx.clone()).map(map))
41    }
42
43    /// Chain another [`RequestBuilder`] to run after completion of this one,
44    /// passing the result to the provided closure `make_next_builder`.
45    ///
46    /// The returned value of the closure must be a `RequestBuilder`, which
47    /// can represent some more work to be done before the composed future
48    /// is finished.
49    ///
50    /// If you want to chain a subscription, use [`Self::then_stream`] instead.
51    ///
52    /// The closure `make_next_builder` is only run *after* successful completion
53    /// of the `self` future.
54    ///
55    /// Note that this function consumes the receiving `RequestBuilder` and returns a
56    /// new one that represents the composition.
57    ///
58    /// # Example
59    ///
60    /// ```
61    /// # use crux_core::{Command, Request};
62    /// # use crux_core::capability::Operation;
63    /// # use serde::{Deserialize, Serialize};
64    /// # #[derive(Debug, PartialEq, Clone, Serialize)]
65    /// # enum AnOperation {
66    /// #     One,
67    /// #     Two,
68    /// #     More(u8),
69    /// # }
70    /// #
71    /// # #[derive(Debug, PartialEq, Deserialize)]
72    /// # enum AnOperationOutput {
73    /// #     One,
74    /// #     Two,
75    /// #     Other(u8),
76    /// # }
77    /// #
78    /// # impl Operation for AnOperation {
79    /// #     type Output = AnOperationOutput;
80    /// # }
81    /// #
82    /// # #[derive(Debug)]
83    /// # enum Effect {
84    /// #     AnEffect(Request<AnOperation>),
85    /// # }
86    /// #
87    /// # impl From<Request<AnOperation>> for Effect {
88    /// #     fn from(request: Request<AnOperation>) -> Self {
89    /// #         Self::AnEffect(request)
90    /// #     }
91    /// # }
92    /// #
93    /// # #[derive(Debug, PartialEq)]
94    /// # enum Event {
95    /// #     Completed(AnOperationOutput),
96    /// # }
97    /// let mut cmd: Command<Effect, Event> = Command::request_from_shell(AnOperation::More(1))
98    ///     .then_request(|first| {
99    ///         let AnOperationOutput::Other(first) = first else {
100    ///             panic!("Invalid output!")
101    ///         };
102    ///
103    ///         let second = first + 1;
104    ///         Command::request_from_shell(AnOperation::More(second))
105    ///     })
106    ///     .then_send(Event::Completed);
107    ///
108    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
109    /// assert_eq!(request.operation, AnOperation::More(1));
110    ///
111    /// request
112    ///    .resolve(AnOperationOutput::Other(1))
113    ///    .expect("to resolve");
114    ///
115    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
116    /// assert_eq!(request.operation, AnOperation::More(2));
117    /// ```
118    pub fn then_request<F, U, NextTask>(
119        self,
120        make_next_builder: F,
121    ) -> RequestBuilder<Effect, Event, impl Future<Output = U>>
122    where
123        F: FnOnce(T) -> RequestBuilder<Effect, Event, NextTask> + Send + 'static,
124        NextTask: Future<Output = U> + Send + 'static,
125    {
126        RequestBuilder::new(|ctx| {
127            self.into_future(ctx.clone())
128                .then(|out| make_next_builder(out).into_future(ctx))
129        })
130    }
131
132    /// Chain a [`StreamBuilder`] to run after completion of this [`RequestBuilder`],
133    /// passing the result to the provided closure `make_next_builder`.
134    ///
135    /// The returned value of the closure must be a `StreamBuilder`, which
136    /// can represent some more work to be done before the composed future
137    /// is finished.
138    ///
139    /// If you want to chain a request, use [`Self::then_request`] instead.
140    ///
141    /// The closure `make_next_builder` is only run *after* successful completion
142    /// of the `self` future.
143    ///
144    /// Note that this function consumes the receiving `RequestBuilder` and returns a
145    /// [`StreamBuilder`] that represents the composition.
146    ///
147    /// # Example
148    ///
149    /// ```
150    /// # use crux_core::{Command, Request};
151    /// # use crux_core::capability::Operation;
152    /// # use serde::{Deserialize, Serialize};
153    /// # #[derive(Debug, PartialEq, Clone, Serialize)]
154    /// # enum AnOperation {
155    /// #     One,
156    /// #     Two,
157    /// #     More(u8),
158    /// # }
159    /// #
160    /// # #[derive(Debug, PartialEq, Deserialize)]
161    /// # enum AnOperationOutput {
162    /// #     One,
163    /// #     Two,
164    /// #     Other(u8),
165    /// # }
166    /// #
167    /// # impl Operation for AnOperation {
168    /// #     type Output = AnOperationOutput;
169    /// # }
170    /// #
171    /// # #[derive(Debug)]
172    /// # enum Effect {
173    /// #     AnEffect(Request<AnOperation>),
174    /// # }
175    /// #
176    /// # impl From<Request<AnOperation>> for Effect {
177    /// #     fn from(request: Request<AnOperation>) -> Self {
178    /// #         Self::AnEffect(request)
179    /// #     }
180    /// # }
181    /// #
182    /// # #[derive(Debug, PartialEq)]
183    /// # enum Event {
184    /// #     Completed(AnOperationOutput),
185    /// # }
186    /// let mut cmd: Command<Effect, Event> = Command::request_from_shell(AnOperation::More(1))
187    ///    .then_stream(|first| {
188    ///       let AnOperationOutput::Other(first) = first else {
189    ///          panic!("Invalid output!")
190    ///      };
191    ///
192    ///      let second = first + 1;
193    ///      Command::stream_from_shell(AnOperation::More(second))
194    ///    })
195    ///    .then_send(Event::Completed);
196    ///
197    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
198    /// assert_eq!(request.operation, AnOperation::More(1));
199    ///
200    /// request
201    ///   .resolve(AnOperationOutput::Other(1))
202    ///   .expect("to resolve");
203    ///
204    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
205    /// assert_eq!(request.operation, AnOperation::More(2));
206    pub fn then_stream<F, U, NextTask>(
207        self,
208        make_next_builder: F,
209    ) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
210    where
211        F: FnOnce(T) -> StreamBuilder<Effect, Event, NextTask> + Send + 'static,
212        NextTask: Stream<Item = U> + Send + 'static,
213    {
214        StreamBuilder::new(|ctx| {
215            self.into_future(ctx.clone())
216                .map(make_next_builder)
217                .into_stream()
218                .flat_map(move |builder| builder.into_stream(ctx.clone()))
219        })
220    }
221
222    /// Convert the request builder into a future to use in an async context
223    pub fn into_future(self, ctx: CommandContext<Effect, Event>) -> Task {
224        let make_task = self.make_task;
225        make_task(ctx)
226    }
227
228    /// Create the command in an evented context
229    pub fn then_send<E>(self, event: E) -> Command<Effect, Event>
230    where
231        E: FnOnce(T) -> Event + Send + 'static,
232        Task: Future<Output = T> + Send + 'static,
233    {
234        Command::new(|ctx| async move {
235            let out = self.into_future(ctx.clone()).await;
236            ctx.send_event(event(out));
237        })
238    }
239}
240
241/// A builder of stream command
242pub struct StreamBuilder<Effect, Event, Task> {
243    make_stream: Box<dyn FnOnce(CommandContext<Effect, Event>) -> Task + Send>,
244}
245
246impl<Effect, Event, Task, T> StreamBuilder<Effect, Event, Task>
247where
248    Effect: Send + 'static,
249    Event: Send + 'static,
250    Task: Stream<Item = T> + Send + 'static,
251{
252    pub fn new<F>(make_task: F) -> Self
253    where
254        F: FnOnce(CommandContext<Effect, Event>) -> Task + Send + 'static,
255    {
256        let make_task = Box::new(make_task);
257
258        StreamBuilder {
259            make_stream: make_task,
260        }
261    }
262
263    pub fn map<F, U>(self, map: F) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
264    where
265        F: FnMut(T) -> U + Send + 'static,
266    {
267        StreamBuilder::new(|ctx| self.into_stream(ctx.clone()).map(map))
268    }
269
270    /// Chain a [`RequestBuilder`] to run after completion of this [`StreamBuilder`],
271    /// passing the result to the provided closure `make_next_builder`.
272    ///
273    /// The returned value of the closure must be a [`StreamBuilder`], which
274    /// can represent some more work to be done before the composed future
275    /// is finished.
276    ///
277    /// If you want to chain a subscription, use [`Self::then_stream`] instead.
278    ///
279    /// The closure `make_next_builder` is only run *after* successful completion
280    /// of the `self` future.
281    ///
282    /// Note that this function consumes the receiving `StreamBuilder` and returns a
283    /// new one that represents the composition.
284    ///
285    /// # Example
286    ///
287    /// ```
288    /// # use crux_core::{Command, Request};
289    /// # use crux_core::capability::Operation;
290    /// # use serde::{Deserialize, Serialize};
291    /// # #[derive(Debug, PartialEq, Clone, Serialize)]
292    /// # enum AnOperation {
293    /// #     One,
294    /// #     Two,
295    /// #     More(u8),
296    /// # }
297    /// #
298    /// # #[derive(Debug, PartialEq, Deserialize)]
299    /// # enum AnOperationOutput {
300    /// #     One,
301    /// #     Two,
302    /// #     Other(u8),
303    /// # }
304    /// #
305    /// # impl Operation for AnOperation {
306    /// #     type Output = AnOperationOutput;
307    /// # }
308    /// #
309    /// # #[derive(Debug)]
310    /// # enum Effect {
311    /// #     AnEffect(Request<AnOperation>),
312    /// # }
313    /// #
314    /// # impl From<Request<AnOperation>> for Effect {
315    /// #     fn from(request: Request<AnOperation>) -> Self {
316    /// #         Self::AnEffect(request)
317    /// #     }
318    /// # }
319    /// #
320    /// # #[derive(Debug, PartialEq)]
321    /// # enum Event {
322    /// #     Completed(AnOperationOutput),
323    /// # }
324    /// let mut cmd: Command<Effect, Event> = Command::stream_from_shell(AnOperation::More(1))
325    ///     .then_request(|first| {
326    ///         let AnOperationOutput::Other(first) = first else {
327    ///             panic!("Invalid output!")
328    ///         };
329    ///
330    ///         let second = first + 1;
331    ///         Command::request_from_shell(AnOperation::More(second))
332    ///     })
333    ///     .then_send(Event::Completed);
334    ///
335    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
336    /// assert_eq!(request.operation, AnOperation::More(1));
337    ///
338    /// request
339    ///    .resolve(AnOperationOutput::Other(1))
340    ///    .expect("to resolve");
341    ///
342    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
343    /// assert_eq!(request.operation, AnOperation::More(2));
344    /// ```
345    pub fn then_request<F, U, NextTask>(
346        self,
347        make_next_builder: F,
348    ) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
349    where
350        F: Fn(T) -> RequestBuilder<Effect, Event, NextTask> + Send + 'static,
351        NextTask: Future<Output = U> + Send + 'static,
352    {
353        StreamBuilder::new(|ctx| {
354            self.into_stream(ctx.clone())
355                .then(move |item| make_next_builder(item).into_future(ctx.clone()))
356        })
357    }
358
359    /// Chain another [`StreamBuilder`] to run after completion of this one,
360    /// passing the result to the provided closure `make_next_builder`.
361    ///
362    /// The returned value of the closure must be a `StreamBuilder`, which
363    /// can represent some more work to be done before the composed future
364    /// is finished.
365    ///
366    /// If you want to chain a request, use [`Self::then_request`] instead.
367    ///
368    /// The closure `make_next_builder` is only run *after* successful completion
369    /// of the `self` future.
370    ///
371    /// Note that this function consumes the receiving `StreamBuilder` and returns a
372    /// new one that represents the composition.
373    ///
374    /// # Example
375    ///
376    /// ```
377    /// # use crux_core::{Command, Request};
378    /// # use crux_core::capability::Operation;
379    /// # use serde::{Deserialize, Serialize};
380    /// # #[derive(Debug, PartialEq, Clone, Serialize)]
381    /// # enum AnOperation {
382    /// #     One,
383    /// #     Two,
384    /// #     More(u8),
385    /// # }
386    /// #
387    /// # #[derive(Debug, PartialEq, Deserialize)]
388    /// # enum AnOperationOutput {
389    /// #     One,
390    /// #     Two,
391    /// #     Other(u8),
392    /// # }
393    /// #
394    /// # impl Operation for AnOperation {
395    /// #     type Output = AnOperationOutput;
396    /// # }
397    /// #
398    /// # #[derive(Debug)]
399    /// # enum Effect {
400    /// #     AnEffect(Request<AnOperation>),
401    /// # }
402    /// #
403    /// # impl From<Request<AnOperation>> for Effect {
404    /// #     fn from(request: Request<AnOperation>) -> Self {
405    /// #         Self::AnEffect(request)
406    /// #     }
407    /// # }
408    /// #
409    /// # #[derive(Debug, PartialEq)]
410    /// # enum Event {
411    /// #     Completed(AnOperationOutput),
412    /// # }
413    /// let mut cmd: Command<Effect, Event> = Command::stream_from_shell(AnOperation::More(1))
414    ///    .then_stream(|first| {
415    ///       let AnOperationOutput::Other(first) = first else {
416    ///          panic!("Invalid output!")
417    ///      };
418    ///
419    ///      let second = first + 1;
420    ///      Command::stream_from_shell(AnOperation::More(second))
421    ///    })
422    ///    .then_send(Event::Completed);
423    ///
424    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
425    /// assert_eq!(request.operation, AnOperation::More(1));
426    ///
427    /// request
428    ///   .resolve(AnOperationOutput::Other(1))
429    ///   .expect("to resolve");
430    ///
431    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
432    /// assert_eq!(request.operation, AnOperation::More(2));
433    pub fn then_stream<F, U, NextTask>(
434        self,
435        make_next_builder: F,
436    ) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
437    where
438        F: Fn(T) -> StreamBuilder<Effect, Event, NextTask> + Send + 'static,
439        NextTask: Stream<Item = U> + Send + 'static,
440    {
441        StreamBuilder::new(move |ctx| {
442            self.into_stream(ctx.clone())
443                .map(move |item| {
444                    let next_builder = make_next_builder(item);
445                    Box::pin(next_builder.into_stream(ctx.clone()))
446                })
447                .flatten_unordered(None)
448        })
449    }
450
451    /// Create the command in an evented context
452    pub fn then_send<E>(self, event: E) -> Command<Effect, Event>
453    where
454        E: Fn(T) -> Event + Send + 'static,
455    {
456        Command::new(|ctx| async move {
457            let mut stream = pin!(self.into_stream(ctx.clone()));
458
459            while let Some(out) = stream.next().await {
460                ctx.send_event(event(out));
461            }
462        })
463    }
464
465    /// Convert the stream builder into a stream to use in an async context
466    pub fn into_stream(self, ctx: CommandContext<Effect, Event>) -> Task {
467        let make_stream = self.make_stream;
468
469        make_stream(ctx)
470    }
471}