crux_core/command/builder.rs
1//! Command builders are an abstraction allowing chaining effects,
2//! where outputs of one effect can serve as inputs to further effects,
3//! without requiring an async context.
4//!
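//! Streams can be chained with requests and with other streams. When a stream is
//! chained with another stream (see [`StreamBuilder::then_stream`]), the inner
//! streams are flattened into a single stream without a guaranteed ordering of
//! their items. If you need different semantics when composing streams, use the
//! async API and tools from the `futures` crate.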
8
9use std::{future::Future, pin::pin};
10
11use futures::{FutureExt, Stream, StreamExt};
12
13use super::{context::CommandContext, Command};
14
15/// A builder of one-off notify command
16// Task is a future which does the shell talking and returns an output
17pub struct NotificationBuilder<Effect, Event, Task> {
18 make_task: Box<dyn FnOnce(CommandContext<Effect, Event>) -> Task + Send>,
19}
20
21impl<Effect, Event, Task> NotificationBuilder<Effect, Event, Task>
22where
23 Effect: Send + 'static,
24 Event: Send + 'static,
25 Task: Future<Output = ()> + Send + 'static,
26{
27 pub fn new<F>(make_task: F) -> Self
28 where
29 F: FnOnce(CommandContext<Effect, Event>) -> Task + Send + 'static,
30 {
31 let make_task = Box::new(make_task);
32
33 NotificationBuilder { make_task }
34 }
35
36 /// Convert the [`NotificationBuilder`] into a future to use in an async context
37 #[must_use]
38 pub fn into_future(self, ctx: CommandContext<Effect, Event>) -> Task {
39 let make_task = self.make_task;
40 make_task(ctx)
41 }
42}
43
44impl<Effect, Event, Task> From<NotificationBuilder<Effect, Event, Task>> for Command<Effect, Event>
45where
46 Effect: Send + 'static,
47 Event: Send + 'static,
48 Task: Future<Output = ()> + Send + 'static,
49{
50 fn from(value: NotificationBuilder<Effect, Event, Task>) -> Self {
51 Command::new(|ctx| value.into_future(ctx))
52 }
53}
54
55/// A builder of one-off request command
56// Task is a future which does the shell talking and returns an output
57pub struct RequestBuilder<Effect, Event, Task> {
58 make_task: Box<dyn FnOnce(CommandContext<Effect, Event>) -> Task + Send>,
59}
60
61impl<Effect, Event, Task, T> RequestBuilder<Effect, Event, Task>
62where
63 Effect: Send + 'static,
64 Event: Send + 'static,
65 Task: Future<Output = T> + Send + 'static,
66{
67 pub fn new<F>(make_task: F) -> Self
68 where
69 F: FnOnce(CommandContext<Effect, Event>) -> Task + Send + 'static,
70 {
71 let make_task = Box::new(make_task);
72
73 RequestBuilder { make_task }
74 }
75
76 pub fn map<F, U>(self, map: F) -> RequestBuilder<Effect, Event, impl Future<Output = U>>
77 where
78 F: FnOnce(T) -> U + Send + 'static,
79 {
80 RequestBuilder::new(|ctx| self.into_future(ctx.clone()).map(map))
81 }
82
83 /// Chain another [`RequestBuilder`] to run after completion of this one,
84 /// passing the result to the provided closure `make_next_builder`.
85 ///
86 /// The returned value of the closure must be a `RequestBuilder`, which
87 /// can represent some more work to be done before the composed future
88 /// is finished.
89 ///
90 /// If you want to chain a subscription, use [`Self::then_stream`] instead.
91 ///
92 /// The closure `make_next_builder` is only run *after* successful completion
93 /// of the `self` future.
94 ///
95 /// Note that this function consumes the receiving `RequestBuilder` and returns a
96 /// new one that represents the composition.
97 ///
98 /// # Example
99 ///
100 /// ```
101 /// # use crux_core::{Command, Request};
102 /// # use crux_core::capability::Operation;
103 /// # use serde::{Deserialize, Serialize};
104 /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
105 /// # enum AnOperation {
106 /// # One,
107 /// # Two,
108 /// # More(u8),
109 /// # }
110 /// #
111 /// # #[derive(Debug, PartialEq, Deserialize)]
112 /// # enum AnOperationOutput {
113 /// # One,
114 /// # Two,
115 /// # Other(u8),
116 /// # }
117 /// #
118 /// # impl Operation for AnOperation {
119 /// # type Output = AnOperationOutput;
120 /// # }
121 /// #
122 /// # #[derive(Debug)]
123 /// # enum Effect {
124 /// # AnEffect(Request<AnOperation>),
125 /// # }
126 /// #
127 /// # impl From<Request<AnOperation>> for Effect {
128 /// # fn from(request: Request<AnOperation>) -> Self {
129 /// # Self::AnEffect(request)
130 /// # }
131 /// # }
132 /// #
133 /// # #[derive(Debug, PartialEq)]
134 /// # enum Event {
135 /// # Completed(AnOperationOutput),
136 /// # }
137 /// let mut cmd: Command<Effect, Event> = Command::request_from_shell(AnOperation::More(1))
138 /// .then_request(|first| {
139 /// let AnOperationOutput::Other(first) = first else {
140 /// panic!("Invalid output!")
141 /// };
142 ///
143 /// let second = first + 1;
144 /// Command::request_from_shell(AnOperation::More(second))
145 /// })
146 /// .then_send(Event::Completed);
147 ///
148 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
149 /// assert_eq!(request.operation, AnOperation::More(1));
150 ///
151 /// request
152 /// .resolve(AnOperationOutput::Other(1))
153 /// .expect("to resolve");
154 ///
155 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
156 /// assert_eq!(request.operation, AnOperation::More(2));
157 /// ```
158 pub fn then_request<F, U, NextTask>(
159 self,
160 make_next_builder: F,
161 ) -> RequestBuilder<Effect, Event, impl Future<Output = U>>
162 where
163 F: FnOnce(T) -> RequestBuilder<Effect, Event, NextTask> + Send + 'static,
164 NextTask: Future<Output = U> + Send + 'static,
165 {
166 RequestBuilder::new(|ctx| {
167 self.into_future(ctx.clone())
168 .then(|out| make_next_builder(out).into_future(ctx))
169 })
170 }
171
172 /// Chain a [`StreamBuilder`] to run after completion of this [`RequestBuilder`],
173 /// passing the result to the provided closure `make_next_builder`.
174 ///
175 /// The returned value of the closure must be a `StreamBuilder`, which
176 /// can represent some more work to be done before the composed future
177 /// is finished.
178 ///
179 /// If you want to chain a request, use [`Self::then_request`] instead.
180 ///
181 /// The closure `make_next_builder` is only run *after* successful completion
182 /// of the `self` future.
183 ///
184 /// Note that this function consumes the receiving `RequestBuilder` and returns a
185 /// [`StreamBuilder`] that represents the composition.
186 ///
187 /// # Example
188 ///
189 /// ```
190 /// # use crux_core::{Command, Request};
191 /// # use crux_core::capability::Operation;
192 /// # use serde::{Deserialize, Serialize};
193 /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
194 /// # enum AnOperation {
195 /// # One,
196 /// # Two,
197 /// # More(u8),
198 /// # }
199 /// #
200 /// # #[derive(Debug, PartialEq, Deserialize)]
201 /// # enum AnOperationOutput {
202 /// # One,
203 /// # Two,
204 /// # Other(u8),
205 /// # }
206 /// #
207 /// # impl Operation for AnOperation {
208 /// # type Output = AnOperationOutput;
209 /// # }
210 /// #
211 /// # #[derive(Debug)]
212 /// # enum Effect {
213 /// # AnEffect(Request<AnOperation>),
214 /// # }
215 /// #
216 /// # impl From<Request<AnOperation>> for Effect {
217 /// # fn from(request: Request<AnOperation>) -> Self {
218 /// # Self::AnEffect(request)
219 /// # }
220 /// # }
221 /// #
222 /// # #[derive(Debug, PartialEq)]
223 /// # enum Event {
224 /// # Completed(AnOperationOutput),
225 /// # }
226 /// let mut cmd: Command<Effect, Event> = Command::request_from_shell(AnOperation::More(1))
227 /// .then_stream(|first| {
228 /// let AnOperationOutput::Other(first) = first else {
229 /// panic!("Invalid output!")
230 /// };
231 ///
232 /// let second = first + 1;
233 /// Command::stream_from_shell(AnOperation::More(second))
234 /// })
235 /// .then_send(Event::Completed);
236 ///
237 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
238 /// assert_eq!(request.operation, AnOperation::More(1));
239 ///
240 /// request
241 /// .resolve(AnOperationOutput::Other(1))
242 /// .expect("to resolve");
243 ///
244 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
245 /// assert_eq!(request.operation, AnOperation::More(2));
246 pub fn then_stream<F, U, NextTask>(
247 self,
248 make_next_builder: F,
249 ) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
250 where
251 F: FnOnce(T) -> StreamBuilder<Effect, Event, NextTask> + Send + 'static,
252 NextTask: Stream<Item = U> + Send + 'static,
253 {
254 StreamBuilder::new(|ctx| {
255 self.into_future(ctx.clone())
256 .map(make_next_builder)
257 .into_stream()
258 .flat_map(move |builder| builder.into_stream(ctx.clone()))
259 })
260 }
261
262 /// Convert the request builder into a future to use in an async context
263 #[must_use]
264 pub fn into_future(self, ctx: CommandContext<Effect, Event>) -> Task {
265 let make_task = self.make_task;
266 make_task(ctx)
267 }
268
269 /// Create the command in an evented context
270 pub fn then_send<E>(self, event: E) -> Command<Effect, Event>
271 where
272 E: FnOnce(T) -> Event + Send + 'static,
273 Task: Future<Output = T> + Send + 'static,
274 {
275 Command::new(|ctx| async move {
276 let out = self.into_future(ctx.clone()).await;
277 ctx.send_event(event(out));
278 })
279 }
280}
281
282/// A builder of stream command
283pub struct StreamBuilder<Effect, Event, Task> {
284 make_stream: Box<dyn FnOnce(CommandContext<Effect, Event>) -> Task + Send>,
285}
286
287impl<Effect, Event, Task, T> StreamBuilder<Effect, Event, Task>
288where
289 Effect: Send + 'static,
290 Event: Send + 'static,
291 Task: Stream<Item = T> + Send + 'static,
292{
293 pub fn new<F>(make_task: F) -> Self
294 where
295 F: FnOnce(CommandContext<Effect, Event>) -> Task + Send + 'static,
296 {
297 let make_task = Box::new(make_task);
298
299 StreamBuilder {
300 make_stream: make_task,
301 }
302 }
303
304 pub fn map<F, U>(self, map: F) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
305 where
306 F: FnMut(T) -> U + Send + 'static,
307 {
308 StreamBuilder::new(|ctx| self.into_stream(ctx.clone()).map(map))
309 }
310
311 /// Chain a [`RequestBuilder`] to run after completion of this [`StreamBuilder`],
312 /// passing the result to the provided closure `make_next_builder`.
313 ///
314 /// The returned value of the closure must be a [`StreamBuilder`], which
315 /// can represent some more work to be done before the composed future
316 /// is finished.
317 ///
318 /// If you want to chain a subscription, use [`Self::then_stream`] instead.
319 ///
320 /// The closure `make_next_builder` is only run *after* successful completion
321 /// of the `self` future.
322 ///
323 /// Note that this function consumes the receiving `StreamBuilder` and returns a
324 /// new one that represents the composition.
325 ///
326 /// # Example
327 ///
328 /// ```
329 /// # use crux_core::{Command, Request};
330 /// # use crux_core::capability::Operation;
331 /// # use serde::{Deserialize, Serialize};
332 /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
333 /// # enum AnOperation {
334 /// # One,
335 /// # Two,
336 /// # More(u8),
337 /// # }
338 /// #
339 /// # #[derive(Debug, PartialEq, Deserialize)]
340 /// # enum AnOperationOutput {
341 /// # One,
342 /// # Two,
343 /// # Other(u8),
344 /// # }
345 /// #
346 /// # impl Operation for AnOperation {
347 /// # type Output = AnOperationOutput;
348 /// # }
349 /// #
350 /// # #[derive(Debug)]
351 /// # enum Effect {
352 /// # AnEffect(Request<AnOperation>),
353 /// # }
354 /// #
355 /// # impl From<Request<AnOperation>> for Effect {
356 /// # fn from(request: Request<AnOperation>) -> Self {
357 /// # Self::AnEffect(request)
358 /// # }
359 /// # }
360 /// #
361 /// # #[derive(Debug, PartialEq)]
362 /// # enum Event {
363 /// # Completed(AnOperationOutput),
364 /// # }
365 /// let mut cmd: Command<Effect, Event> = Command::stream_from_shell(AnOperation::More(1))
366 /// .then_request(|first| {
367 /// let AnOperationOutput::Other(first) = first else {
368 /// panic!("Invalid output!")
369 /// };
370 ///
371 /// let second = first + 1;
372 /// Command::request_from_shell(AnOperation::More(second))
373 /// })
374 /// .then_send(Event::Completed);
375 ///
376 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
377 /// assert_eq!(request.operation, AnOperation::More(1));
378 ///
379 /// request
380 /// .resolve(AnOperationOutput::Other(1))
381 /// .expect("to resolve");
382 ///
383 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
384 /// assert_eq!(request.operation, AnOperation::More(2));
385 /// ```
386 pub fn then_request<F, U, NextTask>(
387 self,
388 make_next_builder: F,
389 ) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
390 where
391 F: Fn(T) -> RequestBuilder<Effect, Event, NextTask> + Send + 'static,
392 NextTask: Future<Output = U> + Send + 'static,
393 {
394 StreamBuilder::new(|ctx| {
395 self.into_stream(ctx.clone())
396 .then(move |item| make_next_builder(item).into_future(ctx.clone()))
397 })
398 }
399
400 /// Chain another [`StreamBuilder`] to run after completion of this one,
401 /// passing the result to the provided closure `make_next_builder`.
402 ///
403 /// The returned value of the closure must be a `StreamBuilder`, which
404 /// can represent some more work to be done before the composed future
405 /// is finished.
406 ///
407 /// If you want to chain a request, use [`Self::then_request`] instead.
408 ///
409 /// The closure `make_next_builder` is only run *after* successful completion
410 /// of the `self` future.
411 ///
412 /// Note that this function consumes the receiving `StreamBuilder` and returns a
413 /// new one that represents the composition.
414 ///
415 /// # Example
416 ///
417 /// ```
418 /// # use crux_core::{Command, Request};
419 /// # use crux_core::capability::Operation;
420 /// # use serde::{Deserialize, Serialize};
421 /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
422 /// # enum AnOperation {
423 /// # One,
424 /// # Two,
425 /// # More(u8),
426 /// # }
427 /// #
428 /// # #[derive(Debug, PartialEq, Deserialize)]
429 /// # enum AnOperationOutput {
430 /// # One,
431 /// # Two,
432 /// # Other(u8),
433 /// # }
434 /// #
435 /// # impl Operation for AnOperation {
436 /// # type Output = AnOperationOutput;
437 /// # }
438 /// #
439 /// # #[derive(Debug)]
440 /// # enum Effect {
441 /// # AnEffect(Request<AnOperation>),
442 /// # }
443 /// #
444 /// # impl From<Request<AnOperation>> for Effect {
445 /// # fn from(request: Request<AnOperation>) -> Self {
446 /// # Self::AnEffect(request)
447 /// # }
448 /// # }
449 /// #
450 /// # #[derive(Debug, PartialEq)]
451 /// # enum Event {
452 /// # Completed(AnOperationOutput),
453 /// # }
454 /// let mut cmd: Command<Effect, Event> = Command::stream_from_shell(AnOperation::More(1))
455 /// .then_stream(|first| {
456 /// let AnOperationOutput::Other(first) = first else {
457 /// panic!("Invalid output!")
458 /// };
459 ///
460 /// let second = first + 1;
461 /// Command::stream_from_shell(AnOperation::More(second))
462 /// })
463 /// .then_send(Event::Completed);
464 ///
465 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
466 /// assert_eq!(request.operation, AnOperation::More(1));
467 ///
468 /// request
469 /// .resolve(AnOperationOutput::Other(1))
470 /// .expect("to resolve");
471 ///
472 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
473 /// assert_eq!(request.operation, AnOperation::More(2));
474 pub fn then_stream<F, U, NextTask>(
475 self,
476 make_next_builder: F,
477 ) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
478 where
479 F: Fn(T) -> StreamBuilder<Effect, Event, NextTask> + Send + 'static,
480 NextTask: Stream<Item = U> + Send + 'static,
481 {
482 StreamBuilder::new(move |ctx| {
483 self.into_stream(ctx.clone())
484 .map(move |item| {
485 let next_builder = make_next_builder(item);
486 Box::pin(next_builder.into_stream(ctx.clone()))
487 })
488 .flatten_unordered(None)
489 })
490 }
491
492 /// Create the command in an evented context
493 pub fn then_send<E>(self, event: E) -> Command<Effect, Event>
494 where
495 E: Fn(T) -> Event + Send + 'static,
496 {
497 Command::new(|ctx| async move {
498 let mut stream = pin!(self.into_stream(ctx.clone()));
499
500 while let Some(out) = stream.next().await {
501 ctx.send_event(event(out));
502 }
503 })
504 }
505
506 /// Convert the stream builder into a stream to use in an async context
507 #[must_use]
508 pub fn into_stream(self, ctx: CommandContext<Effect, Event>) -> Task {
509 let make_stream = self.make_stream;
510
511 make_stream(ctx)
512 }
513}