crux_core/command/
builder.rs

1//! Command builders are an abstraction allowing chaining effects,
2//! where outputs of one effect can serve as inputs to further effects,
3//! without requiring an async context.
4//!
5//! Chaining streams with streams is currently not supported, as the semantics
6//! of the composition are unclear. If you need to compose streams, use the async
7//! API and tools from the `futures` crate.
8
9use std::{future::Future, pin::pin};
10
11use futures::{FutureExt, Stream, StreamExt};
12
13use super::{context::CommandContext, Command};
14
15/// A builder of one-off notify command
16// Task is a future which does the shell talking and returns an output
17pub struct NotificationBuilder<Effect, Event, Task> {
18    make_task: Box<dyn FnOnce(CommandContext<Effect, Event>) -> Task + Send>,
19}
20
21impl<Effect, Event, Task> NotificationBuilder<Effect, Event, Task>
22where
23    Effect: Send + 'static,
24    Event: Send + 'static,
25    Task: Future<Output = ()> + Send + 'static,
26{
27    pub fn new<F>(make_task: F) -> Self
28    where
29        F: FnOnce(CommandContext<Effect, Event>) -> Task + Send + 'static,
30    {
31        let make_task = Box::new(make_task);
32
33        NotificationBuilder { make_task }
34    }
35
36    /// Convert the [`NotificationBuilder`] into a future to use in an async context
37    #[must_use]
38    pub fn into_future(self, ctx: CommandContext<Effect, Event>) -> Task {
39        let make_task = self.make_task;
40        make_task(ctx)
41    }
42}
43
44impl<Effect, Event, Task> From<NotificationBuilder<Effect, Event, Task>> for Command<Effect, Event>
45where
46    Effect: Send + 'static,
47    Event: Send + 'static,
48    Task: Future<Output = ()> + Send + 'static,
49{
50    fn from(value: NotificationBuilder<Effect, Event, Task>) -> Self {
51        Command::new(|ctx| value.into_future(ctx))
52    }
53}
54
55/// A builder of one-off request command
56// Task is a future which does the shell talking and returns an output
57pub struct RequestBuilder<Effect, Event, Task> {
58    make_task: Box<dyn FnOnce(CommandContext<Effect, Event>) -> Task + Send>,
59}
60
61impl<Effect, Event, Task, T> RequestBuilder<Effect, Event, Task>
62where
63    Effect: Send + 'static,
64    Event: Send + 'static,
65    Task: Future<Output = T> + Send + 'static,
66{
67    pub fn new<F>(make_task: F) -> Self
68    where
69        F: FnOnce(CommandContext<Effect, Event>) -> Task + Send + 'static,
70    {
71        let make_task = Box::new(make_task);
72
73        RequestBuilder { make_task }
74    }
75
76    pub fn map<F, U>(self, map: F) -> RequestBuilder<Effect, Event, impl Future<Output = U>>
77    where
78        F: FnOnce(T) -> U + Send + 'static,
79    {
80        RequestBuilder::new(|ctx| self.into_future(ctx.clone()).map(map))
81    }
82
83    /// Chain another [`RequestBuilder`] to run after completion of this one,
84    /// passing the result to the provided closure `make_next_builder`.
85    ///
86    /// The returned value of the closure must be a `RequestBuilder`, which
87    /// can represent some more work to be done before the composed future
88    /// is finished.
89    ///
90    /// If you want to chain a subscription, use [`Self::then_stream`] instead.
91    ///
92    /// The closure `make_next_builder` is only run *after* successful completion
93    /// of the `self` future.
94    ///
95    /// Note that this function consumes the receiving `RequestBuilder` and returns a
96    /// new one that represents the composition.
97    ///
98    /// # Example
99    ///
100    /// ```
101    /// # use crux_core::{Command, Request};
102    /// # use crux_core::capability::Operation;
103    /// # use serde::{Deserialize, Serialize};
104    /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
105    /// # enum AnOperation {
106    /// #     One,
107    /// #     Two,
108    /// #     More(u8),
109    /// # }
110    /// #
111    /// # #[derive(Debug, PartialEq, Deserialize)]
112    /// # enum AnOperationOutput {
113    /// #     One,
114    /// #     Two,
115    /// #     Other(u8),
116    /// # }
117    /// #
118    /// # impl Operation for AnOperation {
119    /// #     type Output = AnOperationOutput;
120    /// # }
121    /// #
122    /// # #[derive(Debug)]
123    /// # enum Effect {
124    /// #     AnEffect(Request<AnOperation>),
125    /// # }
126    /// #
127    /// # impl From<Request<AnOperation>> for Effect {
128    /// #     fn from(request: Request<AnOperation>) -> Self {
129    /// #         Self::AnEffect(request)
130    /// #     }
131    /// # }
132    /// #
133    /// # #[derive(Debug, PartialEq)]
134    /// # enum Event {
135    /// #     Completed(AnOperationOutput),
136    /// # }
137    /// let mut cmd: Command<Effect, Event> = Command::request_from_shell(AnOperation::More(1))
138    ///     .then_request(|first| {
139    ///         let AnOperationOutput::Other(first) = first else {
140    ///             panic!("Invalid output!")
141    ///         };
142    ///
143    ///         let second = first + 1;
144    ///         Command::request_from_shell(AnOperation::More(second))
145    ///     })
146    ///     .then_send(Event::Completed);
147    ///
148    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
149    /// assert_eq!(request.operation, AnOperation::More(1));
150    ///
151    /// request
152    ///    .resolve(AnOperationOutput::Other(1))
153    ///    .expect("to resolve");
154    ///
155    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
156    /// assert_eq!(request.operation, AnOperation::More(2));
157    /// ```
158    pub fn then_request<F, U, NextTask>(
159        self,
160        make_next_builder: F,
161    ) -> RequestBuilder<Effect, Event, impl Future<Output = U>>
162    where
163        F: FnOnce(T) -> RequestBuilder<Effect, Event, NextTask> + Send + 'static,
164        NextTask: Future<Output = U> + Send + 'static,
165    {
166        RequestBuilder::new(|ctx| {
167            self.into_future(ctx.clone())
168                .then(|out| make_next_builder(out).into_future(ctx))
169        })
170    }
171
172    /// Chain a [`StreamBuilder`] to run after completion of this [`RequestBuilder`],
173    /// passing the result to the provided closure `make_next_builder`.
174    ///
175    /// The returned value of the closure must be a `StreamBuilder`, which
176    /// can represent some more work to be done before the composed future
177    /// is finished.
178    ///
179    /// If you want to chain a request, use [`Self::then_request`] instead.
180    ///
181    /// The closure `make_next_builder` is only run *after* successful completion
182    /// of the `self` future.
183    ///
184    /// Note that this function consumes the receiving `RequestBuilder` and returns a
185    /// [`StreamBuilder`] that represents the composition.
186    ///
187    /// # Example
188    ///
189    /// ```
190    /// # use crux_core::{Command, Request};
191    /// # use crux_core::capability::Operation;
192    /// # use serde::{Deserialize, Serialize};
193    /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
194    /// # enum AnOperation {
195    /// #     One,
196    /// #     Two,
197    /// #     More(u8),
198    /// # }
199    /// #
200    /// # #[derive(Debug, PartialEq, Deserialize)]
201    /// # enum AnOperationOutput {
202    /// #     One,
203    /// #     Two,
204    /// #     Other(u8),
205    /// # }
206    /// #
207    /// # impl Operation for AnOperation {
208    /// #     type Output = AnOperationOutput;
209    /// # }
210    /// #
211    /// # #[derive(Debug)]
212    /// # enum Effect {
213    /// #     AnEffect(Request<AnOperation>),
214    /// # }
215    /// #
216    /// # impl From<Request<AnOperation>> for Effect {
217    /// #     fn from(request: Request<AnOperation>) -> Self {
218    /// #         Self::AnEffect(request)
219    /// #     }
220    /// # }
221    /// #
222    /// # #[derive(Debug, PartialEq)]
223    /// # enum Event {
224    /// #     Completed(AnOperationOutput),
225    /// # }
226    /// let mut cmd: Command<Effect, Event> = Command::request_from_shell(AnOperation::More(1))
227    ///    .then_stream(|first| {
228    ///       let AnOperationOutput::Other(first) = first else {
229    ///          panic!("Invalid output!")
230    ///      };
231    ///
232    ///      let second = first + 1;
233    ///      Command::stream_from_shell(AnOperation::More(second))
234    ///    })
235    ///    .then_send(Event::Completed);
236    ///
237    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
238    /// assert_eq!(request.operation, AnOperation::More(1));
239    ///
240    /// request
241    ///   .resolve(AnOperationOutput::Other(1))
242    ///   .expect("to resolve");
243    ///
244    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
245    /// assert_eq!(request.operation, AnOperation::More(2));
246    pub fn then_stream<F, U, NextTask>(
247        self,
248        make_next_builder: F,
249    ) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
250    where
251        F: FnOnce(T) -> StreamBuilder<Effect, Event, NextTask> + Send + 'static,
252        NextTask: Stream<Item = U> + Send + 'static,
253    {
254        StreamBuilder::new(|ctx| {
255            self.into_future(ctx.clone())
256                .map(make_next_builder)
257                .into_stream()
258                .flat_map(move |builder| builder.into_stream(ctx.clone()))
259        })
260    }
261
262    /// Convert the request builder into a future to use in an async context
263    #[must_use]
264    pub fn into_future(self, ctx: CommandContext<Effect, Event>) -> Task {
265        let make_task = self.make_task;
266        make_task(ctx)
267    }
268
269    /// Create the command in an evented context
270    pub fn then_send<E>(self, event: E) -> Command<Effect, Event>
271    where
272        E: FnOnce(T) -> Event + Send + 'static,
273        Task: Future<Output = T> + Send + 'static,
274    {
275        Command::new(|ctx| async move {
276            let out = self.into_future(ctx.clone()).await;
277            ctx.send_event(event(out));
278        })
279    }
280}
281
282/// A builder of stream command
283pub struct StreamBuilder<Effect, Event, Task> {
284    make_stream: Box<dyn FnOnce(CommandContext<Effect, Event>) -> Task + Send>,
285}
286
287impl<Effect, Event, Task, T> StreamBuilder<Effect, Event, Task>
288where
289    Effect: Send + 'static,
290    Event: Send + 'static,
291    Task: Stream<Item = T> + Send + 'static,
292{
293    pub fn new<F>(make_task: F) -> Self
294    where
295        F: FnOnce(CommandContext<Effect, Event>) -> Task + Send + 'static,
296    {
297        let make_task = Box::new(make_task);
298
299        StreamBuilder {
300            make_stream: make_task,
301        }
302    }
303
304    pub fn map<F, U>(self, map: F) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
305    where
306        F: FnMut(T) -> U + Send + 'static,
307    {
308        StreamBuilder::new(|ctx| self.into_stream(ctx.clone()).map(map))
309    }
310
311    /// Chain a [`RequestBuilder`] to run after completion of this [`StreamBuilder`],
312    /// passing the result to the provided closure `make_next_builder`.
313    ///
314    /// The returned value of the closure must be a [`StreamBuilder`], which
315    /// can represent some more work to be done before the composed future
316    /// is finished.
317    ///
318    /// If you want to chain a subscription, use [`Self::then_stream`] instead.
319    ///
320    /// The closure `make_next_builder` is only run *after* successful completion
321    /// of the `self` future.
322    ///
323    /// Note that this function consumes the receiving `StreamBuilder` and returns a
324    /// new one that represents the composition.
325    ///
326    /// # Example
327    ///
328    /// ```
329    /// # use crux_core::{Command, Request};
330    /// # use crux_core::capability::Operation;
331    /// # use serde::{Deserialize, Serialize};
332    /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
333    /// # enum AnOperation {
334    /// #     One,
335    /// #     Two,
336    /// #     More(u8),
337    /// # }
338    /// #
339    /// # #[derive(Debug, PartialEq, Deserialize)]
340    /// # enum AnOperationOutput {
341    /// #     One,
342    /// #     Two,
343    /// #     Other(u8),
344    /// # }
345    /// #
346    /// # impl Operation for AnOperation {
347    /// #     type Output = AnOperationOutput;
348    /// # }
349    /// #
350    /// # #[derive(Debug)]
351    /// # enum Effect {
352    /// #     AnEffect(Request<AnOperation>),
353    /// # }
354    /// #
355    /// # impl From<Request<AnOperation>> for Effect {
356    /// #     fn from(request: Request<AnOperation>) -> Self {
357    /// #         Self::AnEffect(request)
358    /// #     }
359    /// # }
360    /// #
361    /// # #[derive(Debug, PartialEq)]
362    /// # enum Event {
363    /// #     Completed(AnOperationOutput),
364    /// # }
365    /// let mut cmd: Command<Effect, Event> = Command::stream_from_shell(AnOperation::More(1))
366    ///     .then_request(|first| {
367    ///         let AnOperationOutput::Other(first) = first else {
368    ///             panic!("Invalid output!")
369    ///         };
370    ///
371    ///         let second = first + 1;
372    ///         Command::request_from_shell(AnOperation::More(second))
373    ///     })
374    ///     .then_send(Event::Completed);
375    ///
376    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
377    /// assert_eq!(request.operation, AnOperation::More(1));
378    ///
379    /// request
380    ///    .resolve(AnOperationOutput::Other(1))
381    ///    .expect("to resolve");
382    ///
383    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
384    /// assert_eq!(request.operation, AnOperation::More(2));
385    /// ```
386    pub fn then_request<F, U, NextTask>(
387        self,
388        make_next_builder: F,
389    ) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
390    where
391        F: Fn(T) -> RequestBuilder<Effect, Event, NextTask> + Send + 'static,
392        NextTask: Future<Output = U> + Send + 'static,
393    {
394        StreamBuilder::new(|ctx| {
395            self.into_stream(ctx.clone())
396                .then(move |item| make_next_builder(item).into_future(ctx.clone()))
397        })
398    }
399
400    /// Chain another [`StreamBuilder`] to run after completion of this one,
401    /// passing the result to the provided closure `make_next_builder`.
402    ///
403    /// The returned value of the closure must be a `StreamBuilder`, which
404    /// can represent some more work to be done before the composed future
405    /// is finished.
406    ///
407    /// If you want to chain a request, use [`Self::then_request`] instead.
408    ///
409    /// The closure `make_next_builder` is only run *after* successful completion
410    /// of the `self` future.
411    ///
412    /// Note that this function consumes the receiving `StreamBuilder` and returns a
413    /// new one that represents the composition.
414    ///
415    /// # Example
416    ///
417    /// ```
418    /// # use crux_core::{Command, Request};
419    /// # use crux_core::capability::Operation;
420    /// # use serde::{Deserialize, Serialize};
421    /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
422    /// # enum AnOperation {
423    /// #     One,
424    /// #     Two,
425    /// #     More(u8),
426    /// # }
427    /// #
428    /// # #[derive(Debug, PartialEq, Deserialize)]
429    /// # enum AnOperationOutput {
430    /// #     One,
431    /// #     Two,
432    /// #     Other(u8),
433    /// # }
434    /// #
435    /// # impl Operation for AnOperation {
436    /// #     type Output = AnOperationOutput;
437    /// # }
438    /// #
439    /// # #[derive(Debug)]
440    /// # enum Effect {
441    /// #     AnEffect(Request<AnOperation>),
442    /// # }
443    /// #
444    /// # impl From<Request<AnOperation>> for Effect {
445    /// #     fn from(request: Request<AnOperation>) -> Self {
446    /// #         Self::AnEffect(request)
447    /// #     }
448    /// # }
449    /// #
450    /// # #[derive(Debug, PartialEq)]
451    /// # enum Event {
452    /// #     Completed(AnOperationOutput),
453    /// # }
454    /// let mut cmd: Command<Effect, Event> = Command::stream_from_shell(AnOperation::More(1))
455    ///    .then_stream(|first| {
456    ///       let AnOperationOutput::Other(first) = first else {
457    ///          panic!("Invalid output!")
458    ///      };
459    ///
460    ///      let second = first + 1;
461    ///      Command::stream_from_shell(AnOperation::More(second))
462    ///    })
463    ///    .then_send(Event::Completed);
464    ///
465    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
466    /// assert_eq!(request.operation, AnOperation::More(1));
467    ///
468    /// request
469    ///   .resolve(AnOperationOutput::Other(1))
470    ///   .expect("to resolve");
471    ///
472    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
473    /// assert_eq!(request.operation, AnOperation::More(2));
474    pub fn then_stream<F, U, NextTask>(
475        self,
476        make_next_builder: F,
477    ) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
478    where
479        F: Fn(T) -> StreamBuilder<Effect, Event, NextTask> + Send + 'static,
480        NextTask: Stream<Item = U> + Send + 'static,
481    {
482        StreamBuilder::new(move |ctx| {
483            self.into_stream(ctx.clone())
484                .map(move |item| {
485                    let next_builder = make_next_builder(item);
486                    Box::pin(next_builder.into_stream(ctx.clone()))
487                })
488                .flatten_unordered(None)
489        })
490    }
491
492    /// Create the command in an evented context
493    pub fn then_send<E>(self, event: E) -> Command<Effect, Event>
494    where
495        E: Fn(T) -> Event + Send + 'static,
496    {
497        Command::new(|ctx| async move {
498            let mut stream = pin!(self.into_stream(ctx.clone()));
499
500            while let Some(out) = stream.next().await {
501                ctx.send_event(event(out));
502            }
503        })
504    }
505
506    /// Convert the stream builder into a stream to use in an async context
507    #[must_use]
508    pub fn into_stream(self, ctx: CommandContext<Effect, Event>) -> Task {
509        let make_stream = self.make_stream;
510
511        make_stream(ctx)
512    }
513}