crux_core/command/builder.rs
1//! Command builders are an abstraction allowing chaining effects,
2//! where outputs of one effect can serve as inputs to further effects,
3//! without requiring an async context.
4//!
//! Streams can be chained with further streams using `StreamBuilder::then_stream`,
//! which merges the chained streams without an ordering guarantee between them.
//! If you need other ways to compose streams, use the async API and tools from the `futures` crate.
8
9use std::future::Future;
10
11use futures::{FutureExt, Stream, StreamExt};
12
13use super::{Command, context::CommandContext};
14
15/// A builder of one-off notify command
16// Task is a future which does the shell talking and returns an output
17pub struct NotificationBuilder<Effect, Event, Task> {
18 make_task: Box<dyn FnOnce(CommandContext<Effect, Event>) -> Task + Send>,
19}
20
21impl<Effect, Event, Task> NotificationBuilder<Effect, Event, Task>
22where
23 Effect: Send + 'static,
24 Event: Send + 'static,
25 Task: Future<Output = ()> + Send + 'static,
26{
27 pub fn new<F>(make_task: F) -> Self
28 where
29 F: FnOnce(CommandContext<Effect, Event>) -> Task + Send + 'static,
30 {
31 let make_task = Box::new(make_task);
32
33 NotificationBuilder { make_task }
34 }
35
36 /// Convert the [`NotificationBuilder`] into a future to use in an async context
37 #[must_use]
38 pub fn into_future(self, ctx: CommandContext<Effect, Event>) -> Task {
39 let make_task = self.make_task;
40 make_task(ctx)
41 }
42
43 /// Convert the [`NotificationBuilder`] into a [`Command`] to use in an sync context
44 pub fn build(self) -> Command<Effect, Event> {
45 Command::new(move |ctx| self.into_future(ctx))
46 }
47}
48
49impl<Effect, Event, Task> From<NotificationBuilder<Effect, Event, Task>> for Command<Effect, Event>
50where
51 Effect: Send + 'static,
52 Event: Send + 'static,
53 Task: Future<Output = ()> + Send + 'static,
54{
55 fn from(value: NotificationBuilder<Effect, Event, Task>) -> Self {
56 Command::new(|ctx| value.into_future(ctx))
57 }
58}
59
60/// A builder of one-off request command
61// Task is a future which does the shell talking and returns an output
62pub struct RequestBuilder<Effect, Event, Task> {
63 make_task: Box<dyn FnOnce(CommandContext<Effect, Event>) -> Task + Send>,
64}
65
66impl<Effect, Event, Task, T> RequestBuilder<Effect, Event, Task>
67where
68 Effect: Send + 'static,
69 Event: Send + 'static,
70 Task: Future<Output = T> + Send + 'static,
71{
72 pub fn new<F>(make_task: F) -> Self
73 where
74 F: FnOnce(CommandContext<Effect, Event>) -> Task + Send + 'static,
75 {
76 let make_task = Box::new(make_task);
77
78 RequestBuilder { make_task }
79 }
80
81 pub fn map<F, U>(self, map: F) -> RequestBuilder<Effect, Event, impl Future<Output = U>>
82 where
83 F: FnOnce(T) -> U + Send + 'static,
84 {
85 RequestBuilder::new(|ctx| self.into_future(ctx.clone()).map(map))
86 }
87
88 /// Chain a [`NotificationBuilder`] to run after completion of this one,
89 /// passing the result to the provided closure `make_next_builder`.
90 ///
91 /// The returned value of the closure must be a [`NotificationBuilder`], which
92 /// can represent the notification to be sent before the composed future
93 /// is finished.
94 ///
95 /// If you want to chain a request, use [`Self::then_request`] instead.
96 /// If you want to chain a subscription, use [`Self::then_stream`] instead.
97 ///
98 /// The closure `make_next_builder` is only run *after* successful completion
99 /// of the `self` future.
100 ///
101 /// Note that this function consumes the receiving `RequestBuilder`
102 /// and returns a [`NotificationBuilder`] that represents the composition.
103 ///
104 /// # Example
105 ///
106 /// ```
107 /// # use crux_core::{Command, Request};
108 /// # use crux_core::capability::Operation;
109 /// # use serde::{Deserialize, Serialize};
110 /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
111 /// # enum AnOperation {
112 /// # Request(u8),
113 /// # Notify,
114 /// # }
115 /// #
116 /// # #[derive(Debug, PartialEq, Deserialize)]
117 /// # enum AnOperationOutput {
118 /// # Response(String),
119 /// # }
120 /// #
121 /// # impl Operation for AnOperation {
122 /// # type Output = AnOperationOutput;
123 /// # }
124 /// #
125 /// # #[derive(Debug)]
126 /// # enum Effect {
127 /// # AnEffect(Request<AnOperation>),
128 /// # }
129 /// #
130 /// # impl From<Request<AnOperation>> for Effect {
131 /// # fn from(request: Request<AnOperation>) -> Self {
132 /// # Self::AnEffect(request)
133 /// # }
134 /// # }
135 /// #
136 /// # #[derive(Debug, PartialEq)]
137 /// # enum Event {
138 /// # Response(AnOperationOutput),
139 /// # }
140 /// let mut cmd: Command<Effect, Event> =
141 /// Command::request_from_shell(AnOperation::Request(10))
142 /// .then_notify(|response| {
143 /// let AnOperationOutput::Response(_response) = response else {
144 /// panic!("Invalid output!")
145 /// };
146 ///
147 /// // possibly do something with the response
148 ///
149 /// Command::notify_shell(AnOperation::Notify)
150 /// })
151 /// .build();
152 ///
153 /// let effect = cmd.effects().next().unwrap();
154 /// let Effect::AnEffect(mut request) = effect;
155 ///
156 /// assert_eq!(request.operation, AnOperation::Request(10));
157 ///
158 /// request
159 /// .resolve(AnOperationOutput::Response("ten".to_string()))
160 /// .expect("should work");
161 ///
162 /// assert!(cmd.events().next().is_none());
163 /// let effect = cmd.effects().next().unwrap();
164 /// let Effect::AnEffect(request) = effect;
165 ///
166 /// assert_eq!(request.operation, AnOperation::Notify);
167 /// assert!(cmd.is_done());
168 /// ```
169 pub fn then_notify<F, NextTask>(
170 self,
171 make_next_builder: F,
172 ) -> NotificationBuilder<Effect, Event, impl Future<Output = ()>>
173 where
174 F: FnOnce(T) -> NotificationBuilder<Effect, Event, NextTask> + Send + 'static,
175 NextTask: Future<Output = ()> + Send + 'static,
176 {
177 NotificationBuilder::new(|ctx| {
178 self.into_future(ctx.clone())
179 .then(|out| make_next_builder(out).into_future(ctx))
180 })
181 }
182
183 /// Chain another [`RequestBuilder`] to run after completion of this one,
184 /// passing the result to the provided closure `make_next_builder`.
185 ///
186 /// The returned value of the closure must be a [`RequestBuilder`], which
187 /// can represent some more work to be done before the composed future
188 /// is finished.
189 ///
190 /// If you want to chain a subscription, use [`Self::then_stream`] instead.
191 ///
192 /// The closure `make_next_builder` is only run *after* successful completion
193 /// of the `self` future.
194 ///
195 /// Note that this function consumes the receiving `RequestBuilder` and returns a
196 /// new one that represents the composition.
197 ///
198 /// # Example
199 ///
200 /// ```
201 /// # use crux_core::{Command, Request};
202 /// # use crux_core::capability::Operation;
203 /// # use serde::{Deserialize, Serialize};
204 /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
205 /// # enum AnOperation {
206 /// # One,
207 /// # Two,
208 /// # More(u8),
209 /// # }
210 /// #
211 /// # #[derive(Debug, PartialEq, Deserialize)]
212 /// # enum AnOperationOutput {
213 /// # One,
214 /// # Two,
215 /// # Other(u8),
216 /// # }
217 /// #
218 /// # impl Operation for AnOperation {
219 /// # type Output = AnOperationOutput;
220 /// # }
221 /// #
222 /// # #[derive(Debug)]
223 /// # enum Effect {
224 /// # AnEffect(Request<AnOperation>),
225 /// # }
226 /// #
227 /// # impl From<Request<AnOperation>> for Effect {
228 /// # fn from(request: Request<AnOperation>) -> Self {
229 /// # Self::AnEffect(request)
230 /// # }
231 /// # }
232 /// #
233 /// # #[derive(Debug, PartialEq)]
234 /// # enum Event {
235 /// # Completed(AnOperationOutput),
236 /// # }
237 /// let mut cmd: Command<Effect, Event> = Command::request_from_shell(AnOperation::More(1))
238 /// .then_request(|first| {
239 /// let AnOperationOutput::Other(first) = first else {
240 /// panic!("Invalid output!")
241 /// };
242 ///
243 /// let second = first + 1;
244 /// Command::request_from_shell(AnOperation::More(second))
245 /// })
246 /// .then_send(Event::Completed);
247 ///
248 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
249 /// assert_eq!(request.operation, AnOperation::More(1));
250 ///
251 /// request
252 /// .resolve(AnOperationOutput::Other(1))
253 /// .expect("to resolve");
254 ///
255 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
256 /// assert_eq!(request.operation, AnOperation::More(2));
257 /// ```
258 pub fn then_request<F, U, NextTask>(
259 self,
260 make_next_builder: F,
261 ) -> RequestBuilder<Effect, Event, impl Future<Output = U>>
262 where
263 F: FnOnce(T) -> RequestBuilder<Effect, Event, NextTask> + Send + 'static,
264 NextTask: Future<Output = U> + Send + 'static,
265 {
266 RequestBuilder::new(|ctx| {
267 self.into_future(ctx.clone())
268 .then(|out| make_next_builder(out).into_future(ctx))
269 })
270 }
271
272 /// Chain a [`StreamBuilder`] to run after completion of this [`RequestBuilder`],
273 /// passing the result to the provided closure `make_next_builder`.
274 ///
275 /// The returned value of the closure must be a [`StreamBuilder`], which
276 /// can represent some more work to be done before the composed future
277 /// is finished.
278 ///
279 /// If you want to chain a request, use [`Self::then_request`] instead.
280 ///
281 /// The closure `make_next_builder` is only run *after* successful completion
282 /// of the `self` future.
283 ///
284 /// Note that this function consumes the receiving `RequestBuilder` and returns a
285 /// [`StreamBuilder`] that represents the composition.
286 ///
287 /// # Example
288 ///
289 /// ```
290 /// # use crux_core::{Command, Request};
291 /// # use crux_core::capability::Operation;
292 /// # use serde::{Deserialize, Serialize};
293 /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
294 /// # enum AnOperation {
295 /// # One,
296 /// # Two,
297 /// # More(u8),
298 /// # }
299 /// #
300 /// # #[derive(Debug, PartialEq, Deserialize)]
301 /// # enum AnOperationOutput {
302 /// # One,
303 /// # Two,
304 /// # Other(u8),
305 /// # }
306 /// #
307 /// # impl Operation for AnOperation {
308 /// # type Output = AnOperationOutput;
309 /// # }
310 /// #
311 /// # #[derive(Debug)]
312 /// # enum Effect {
313 /// # AnEffect(Request<AnOperation>),
314 /// # }
315 /// #
316 /// # impl From<Request<AnOperation>> for Effect {
317 /// # fn from(request: Request<AnOperation>) -> Self {
318 /// # Self::AnEffect(request)
319 /// # }
320 /// # }
321 /// #
322 /// # #[derive(Debug, PartialEq)]
323 /// # enum Event {
324 /// # Completed(AnOperationOutput),
325 /// # }
326 /// let mut cmd: Command<Effect, Event> = Command::request_from_shell(AnOperation::More(1))
327 /// .then_stream(|first| {
328 /// let AnOperationOutput::Other(first) = first else {
329 /// panic!("Invalid output!")
330 /// };
331 ///
332 /// let second = first + 1;
333 /// Command::stream_from_shell(AnOperation::More(second))
334 /// })
335 /// .then_send(Event::Completed);
336 ///
337 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
338 /// assert_eq!(request.operation, AnOperation::More(1));
339 ///
340 /// request
341 /// .resolve(AnOperationOutput::Other(1))
342 /// .expect("to resolve");
343 ///
344 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
345 /// assert_eq!(request.operation, AnOperation::More(2));
346 pub fn then_stream<F, U, NextTask>(
347 self,
348 make_next_builder: F,
349 ) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
350 where
351 F: FnOnce(T) -> StreamBuilder<Effect, Event, NextTask> + Send + 'static,
352 NextTask: Stream<Item = U> + Send + 'static,
353 {
354 StreamBuilder::new(|ctx| {
355 self.into_future(ctx.clone())
356 .map(make_next_builder)
357 .into_stream()
358 .flat_map(move |builder| builder.into_stream(ctx.clone()))
359 })
360 }
361
362 /// Convert the [`RequestBuilder`] into a future to use in an async context
363 #[must_use]
364 pub fn into_future(self, ctx: CommandContext<Effect, Event>) -> Task {
365 let make_task = self.make_task;
366 make_task(ctx)
367 }
368
369 /// Create the command in an evented context
370 pub fn then_send<E>(self, event: E) -> Command<Effect, Event>
371 where
372 E: FnOnce(T) -> Event + Send + 'static,
373 Task: Future<Output = T> + Send + 'static,
374 {
375 Command::new(move |ctx| {
376 self.into_future(ctx.clone())
377 .map(move |out| ctx.send_event(event(out)))
378 })
379 }
380
381 /// Convert the [`RequestBuilder`] into a [`Command`] to use in an sync context
382 ///
383 /// Note: You might be looking for [`then_send`](Self::then_send)
384 /// instead, which will send the output back into the app with an event.
385 ///
386 /// The command created in this function will *ignore* the output
387 /// of the request so may not be very useful.
388 /// It might be useful when using a 3rd party capability and you don't
389 /// care about the request's response.
390 pub fn build(self) -> Command<Effect, Event> {
391 Command::new(move |ctx| self.into_future(ctx).map(|_| ()))
392 }
393}
394
395/// A builder of stream command
396pub struct StreamBuilder<Effect, Event, Task> {
397 make_stream: Box<dyn FnOnce(CommandContext<Effect, Event>) -> Task + Send>,
398}
399
400impl<Effect, Event, Task, T> StreamBuilder<Effect, Event, Task>
401where
402 Effect: Send + 'static,
403 Event: Send + 'static,
404 Task: Stream<Item = T> + Send + 'static,
405{
406 pub fn new<F>(make_task: F) -> Self
407 where
408 F: FnOnce(CommandContext<Effect, Event>) -> Task + Send + 'static,
409 {
410 let make_task = Box::new(make_task);
411
412 StreamBuilder {
413 make_stream: make_task,
414 }
415 }
416
417 pub fn map<F, U>(self, map: F) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
418 where
419 F: FnMut(T) -> U + Send + 'static,
420 {
421 StreamBuilder::new(|ctx| self.into_stream(ctx.clone()).map(map))
422 }
423
424 /// Chain a [`RequestBuilder`] to run after completion of this [`StreamBuilder`],
425 /// passing the result to the provided closure `make_next_builder`.
426 ///
427 /// The returned value of the closure must be a [`RequestBuilder`], which
428 /// can represent some more work to be done before the composed future
429 /// is finished.
430 ///
431 /// If you want to chain a subscription, use [`Self::then_stream`] instead.
432 ///
433 /// The closure `make_next_builder` is only run *after* successful completion
434 /// of the `self` future.
435 ///
436 /// Note that this function consumes the receiving `StreamBuilder` and returns a
437 /// new one that represents the composition.
438 ///
439 /// # Example
440 ///
441 /// ```
442 /// # use crux_core::{Command, Request};
443 /// # use crux_core::capability::Operation;
444 /// # use serde::{Deserialize, Serialize};
445 /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
446 /// # enum AnOperation {
447 /// # One,
448 /// # Two,
449 /// # More(u8),
450 /// # }
451 /// #
452 /// # #[derive(Debug, PartialEq, Deserialize)]
453 /// # enum AnOperationOutput {
454 /// # One,
455 /// # Two,
456 /// # Other(u8),
457 /// # }
458 /// #
459 /// # impl Operation for AnOperation {
460 /// # type Output = AnOperationOutput;
461 /// # }
462 /// #
463 /// # #[derive(Debug)]
464 /// # enum Effect {
465 /// # AnEffect(Request<AnOperation>),
466 /// # }
467 /// #
468 /// # impl From<Request<AnOperation>> for Effect {
469 /// # fn from(request: Request<AnOperation>) -> Self {
470 /// # Self::AnEffect(request)
471 /// # }
472 /// # }
473 /// #
474 /// # #[derive(Debug, PartialEq)]
475 /// # enum Event {
476 /// # Completed(AnOperationOutput),
477 /// # }
478 /// let mut cmd: Command<Effect, Event> = Command::stream_from_shell(AnOperation::More(1))
479 /// .then_request(|first| {
480 /// let AnOperationOutput::Other(first) = first else {
481 /// panic!("Invalid output!")
482 /// };
483 ///
484 /// let second = first + 1;
485 /// Command::request_from_shell(AnOperation::More(second))
486 /// })
487 /// .then_send(Event::Completed);
488 ///
489 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
490 /// assert_eq!(request.operation, AnOperation::More(1));
491 ///
492 /// request
493 /// .resolve(AnOperationOutput::Other(1))
494 /// .expect("to resolve");
495 ///
496 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
497 /// assert_eq!(request.operation, AnOperation::More(2));
498 /// ```
499 pub fn then_request<F, U, NextTask>(
500 self,
501 make_next_builder: F,
502 ) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
503 where
504 F: Fn(T) -> RequestBuilder<Effect, Event, NextTask> + Send + 'static,
505 NextTask: Future<Output = U> + Send + 'static,
506 {
507 StreamBuilder::new(|ctx| {
508 self.into_stream(ctx.clone())
509 .then(move |item| make_next_builder(item).into_future(ctx.clone()))
510 })
511 }
512
513 /// Chain another [`StreamBuilder`] to run after completion of this one,
514 /// passing the result to the provided closure `make_next_builder`.
515 ///
516 /// The returned value of the closure must be a [`StreamBuilder`], which
517 /// can represent some more work to be done before the composed future
518 /// is finished.
519 ///
520 /// If you want to chain a request, use [`Self::then_request`] instead.
521 ///
522 /// The closure `make_next_builder` is only run *after* successful completion
523 /// of the `self` future.
524 ///
525 /// Note that this function consumes the receiving `StreamBuilder` and returns a
526 /// new one that represents the composition.
527 ///
528 /// # Example
529 ///
530 /// ```
531 /// # use crux_core::{Command, Request};
532 /// # use crux_core::capability::Operation;
533 /// # use serde::{Deserialize, Serialize};
534 /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
535 /// # enum AnOperation {
536 /// # One,
537 /// # Two,
538 /// # More(u8),
539 /// # }
540 /// #
541 /// # #[derive(Debug, PartialEq, Deserialize)]
542 /// # enum AnOperationOutput {
543 /// # One,
544 /// # Two,
545 /// # Other(u8),
546 /// # }
547 /// #
548 /// # impl Operation for AnOperation {
549 /// # type Output = AnOperationOutput;
550 /// # }
551 /// #
552 /// # #[derive(Debug)]
553 /// # enum Effect {
554 /// # AnEffect(Request<AnOperation>),
555 /// # }
556 /// #
557 /// # impl From<Request<AnOperation>> for Effect {
558 /// # fn from(request: Request<AnOperation>) -> Self {
559 /// # Self::AnEffect(request)
560 /// # }
561 /// # }
562 /// #
563 /// # #[derive(Debug, PartialEq)]
564 /// # enum Event {
565 /// # Completed(AnOperationOutput),
566 /// # }
567 /// let mut cmd: Command<Effect, Event> = Command::stream_from_shell(AnOperation::More(1))
568 /// .then_stream(|first| {
569 /// let AnOperationOutput::Other(first) = first else {
570 /// panic!("Invalid output!")
571 /// };
572 ///
573 /// let second = first + 1;
574 /// Command::stream_from_shell(AnOperation::More(second))
575 /// })
576 /// .then_send(Event::Completed);
577 ///
578 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
579 /// assert_eq!(request.operation, AnOperation::More(1));
580 ///
581 /// request
582 /// .resolve(AnOperationOutput::Other(1))
583 /// .expect("to resolve");
584 ///
585 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
586 /// assert_eq!(request.operation, AnOperation::More(2));
587 pub fn then_stream<F, U, NextTask>(
588 self,
589 make_next_builder: F,
590 ) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
591 where
592 F: Fn(T) -> StreamBuilder<Effect, Event, NextTask> + Send + 'static,
593 NextTask: Stream<Item = U> + Send + 'static,
594 {
595 StreamBuilder::new(move |ctx| {
596 self.into_stream(ctx.clone())
597 .map(move |item| {
598 let next_builder = make_next_builder(item);
599 Box::pin(next_builder.into_stream(ctx.clone()))
600 })
601 .flatten_unordered(None)
602 })
603 }
604
605 /// Create the command in an evented context
606 pub fn then_send<E>(self, event: E) -> Command<Effect, Event>
607 where
608 E: Fn(T) -> Event + Send + 'static,
609 {
610 Command::new(move |ctx| {
611 self.into_stream(ctx.clone()).for_each(move |out| {
612 ctx.send_event(event(out));
613 futures::future::ready(())
614 })
615 })
616 }
617
618 /// Convert the [`StreamBuilder`] into a stream to use in an async context
619 #[must_use]
620 pub fn into_stream(self, ctx: CommandContext<Effect, Event>) -> Task {
621 let make_stream = self.make_stream;
622
623 make_stream(ctx)
624 }
625
626 /// Convert the [`StreamBuilder`] into a [`Command`] to use in an sync context
627 ///
628 /// Note: You might be looking for [`then_send`](Self::then_send)
629 /// instead, which will send each item in the stream back into the
630 /// app with an event.
631 ///
632 /// The command created in this function will *ignore* the output
633 /// of the stream so may not be very useful.
634 /// It may be useful when using a 3rd party capability and you don't
635 /// care about the stream output.
636 pub fn build(self) -> Command<Effect, Event> {
637 Command::new(move |ctx| {
638 self.into_stream(ctx)
639 .for_each(|_| futures::future::ready(()))
640 })
641 }
642}