crux_core/command/builder.rs
1//! Command builders are an abstraction allowing chaining effects,
2//! where outputs of one effect can serve as inputs to further effects,
3//! without requiring an async context.
4//!
5//! Chaining streams with streams is currently not supported, as the semantics
6//! of the composition are unclear. If you need to compose streams, use the async
7//! API and tools from the `futures` crate.
8
9use std::{future::Future, pin::pin};
10
11use futures::{FutureExt, Stream, StreamExt};
12
13use super::{context::CommandContext, Command};
14
15/// A builder of one-off notify command
16// Task is a future which does the shell talking and returns an output
17pub struct NotificationBuilder<Effect, Event, Task> {
18 make_task: Box<dyn FnOnce(CommandContext<Effect, Event>) -> Task + Send>,
19}
20
21impl<Effect, Event, Task> NotificationBuilder<Effect, Event, Task>
22where
23 Effect: Send + 'static,
24 Event: Send + 'static,
25 Task: Future<Output = ()> + Send + 'static,
26{
27 pub fn new<F>(make_task: F) -> Self
28 where
29 F: FnOnce(CommandContext<Effect, Event>) -> Task + Send + 'static,
30 {
31 let make_task = Box::new(make_task);
32
33 NotificationBuilder { make_task }
34 }
35
36 /// Convert the [`NotificationBuilder`] into a future to use in an async context
37 pub fn into_future(self, ctx: CommandContext<Effect, Event>) -> Task {
38 let make_task = self.make_task;
39 make_task(ctx)
40 }
41}
42
43impl<Effect, Event, Task> From<NotificationBuilder<Effect, Event, Task>> for Command<Effect, Event>
44where
45 Effect: Send + 'static,
46 Event: Send + 'static,
47 Task: Future<Output = ()> + Send + 'static,
48{
49 fn from(value: NotificationBuilder<Effect, Event, Task>) -> Self {
50 Command::new(|ctx| value.into_future(ctx))
51 }
52}
53
54/// A builder of one-off request command
55// Task is a future which does the shell talking and returns an output
56pub struct RequestBuilder<Effect, Event, Task> {
57 make_task: Box<dyn FnOnce(CommandContext<Effect, Event>) -> Task + Send>,
58}
59
60impl<Effect, Event, Task, T> RequestBuilder<Effect, Event, Task>
61where
62 Effect: Send + 'static,
63 Event: Send + 'static,
64 Task: Future<Output = T> + Send + 'static,
65{
66 pub fn new<F>(make_task: F) -> Self
67 where
68 F: FnOnce(CommandContext<Effect, Event>) -> Task + Send + 'static,
69 {
70 let make_task = Box::new(make_task);
71
72 RequestBuilder { make_task }
73 }
74
75 pub fn map<F, U>(self, map: F) -> RequestBuilder<Effect, Event, impl Future<Output = U>>
76 where
77 F: FnOnce(T) -> U + Send + 'static,
78 {
79 RequestBuilder::new(|ctx| self.into_future(ctx.clone()).map(map))
80 }
81
82 /// Chain another [`RequestBuilder`] to run after completion of this one,
83 /// passing the result to the provided closure `make_next_builder`.
84 ///
85 /// The returned value of the closure must be a `RequestBuilder`, which
86 /// can represent some more work to be done before the composed future
87 /// is finished.
88 ///
89 /// If you want to chain a subscription, use [`Self::then_stream`] instead.
90 ///
91 /// The closure `make_next_builder` is only run *after* successful completion
92 /// of the `self` future.
93 ///
94 /// Note that this function consumes the receiving `RequestBuilder` and returns a
95 /// new one that represents the composition.
96 ///
97 /// # Example
98 ///
99 /// ```
100 /// # use crux_core::{Command, Request};
101 /// # use crux_core::capability::Operation;
102 /// # use serde::{Deserialize, Serialize};
103 /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
104 /// # enum AnOperation {
105 /// # One,
106 /// # Two,
107 /// # More(u8),
108 /// # }
109 /// #
110 /// # #[derive(Debug, PartialEq, Deserialize)]
111 /// # enum AnOperationOutput {
112 /// # One,
113 /// # Two,
114 /// # Other(u8),
115 /// # }
116 /// #
117 /// # impl Operation for AnOperation {
118 /// # type Output = AnOperationOutput;
119 /// # }
120 /// #
121 /// # #[derive(Debug)]
122 /// # enum Effect {
123 /// # AnEffect(Request<AnOperation>),
124 /// # }
125 /// #
126 /// # impl From<Request<AnOperation>> for Effect {
127 /// # fn from(request: Request<AnOperation>) -> Self {
128 /// # Self::AnEffect(request)
129 /// # }
130 /// # }
131 /// #
132 /// # #[derive(Debug, PartialEq)]
133 /// # enum Event {
134 /// # Completed(AnOperationOutput),
135 /// # }
136 /// let mut cmd: Command<Effect, Event> = Command::request_from_shell(AnOperation::More(1))
137 /// .then_request(|first| {
138 /// let AnOperationOutput::Other(first) = first else {
139 /// panic!("Invalid output!")
140 /// };
141 ///
142 /// let second = first + 1;
143 /// Command::request_from_shell(AnOperation::More(second))
144 /// })
145 /// .then_send(Event::Completed);
146 ///
147 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
148 /// assert_eq!(request.operation, AnOperation::More(1));
149 ///
150 /// request
151 /// .resolve(AnOperationOutput::Other(1))
152 /// .expect("to resolve");
153 ///
154 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
155 /// assert_eq!(request.operation, AnOperation::More(2));
156 /// ```
157 pub fn then_request<F, U, NextTask>(
158 self,
159 make_next_builder: F,
160 ) -> RequestBuilder<Effect, Event, impl Future<Output = U>>
161 where
162 F: FnOnce(T) -> RequestBuilder<Effect, Event, NextTask> + Send + 'static,
163 NextTask: Future<Output = U> + Send + 'static,
164 {
165 RequestBuilder::new(|ctx| {
166 self.into_future(ctx.clone())
167 .then(|out| make_next_builder(out).into_future(ctx))
168 })
169 }
170
171 /// Chain a [`StreamBuilder`] to run after completion of this [`RequestBuilder`],
172 /// passing the result to the provided closure `make_next_builder`.
173 ///
174 /// The returned value of the closure must be a `StreamBuilder`, which
175 /// can represent some more work to be done before the composed future
176 /// is finished.
177 ///
178 /// If you want to chain a request, use [`Self::then_request`] instead.
179 ///
180 /// The closure `make_next_builder` is only run *after* successful completion
181 /// of the `self` future.
182 ///
183 /// Note that this function consumes the receiving `RequestBuilder` and returns a
184 /// [`StreamBuilder`] that represents the composition.
185 ///
186 /// # Example
187 ///
188 /// ```
189 /// # use crux_core::{Command, Request};
190 /// # use crux_core::capability::Operation;
191 /// # use serde::{Deserialize, Serialize};
192 /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
193 /// # enum AnOperation {
194 /// # One,
195 /// # Two,
196 /// # More(u8),
197 /// # }
198 /// #
199 /// # #[derive(Debug, PartialEq, Deserialize)]
200 /// # enum AnOperationOutput {
201 /// # One,
202 /// # Two,
203 /// # Other(u8),
204 /// # }
205 /// #
206 /// # impl Operation for AnOperation {
207 /// # type Output = AnOperationOutput;
208 /// # }
209 /// #
210 /// # #[derive(Debug)]
211 /// # enum Effect {
212 /// # AnEffect(Request<AnOperation>),
213 /// # }
214 /// #
215 /// # impl From<Request<AnOperation>> for Effect {
216 /// # fn from(request: Request<AnOperation>) -> Self {
217 /// # Self::AnEffect(request)
218 /// # }
219 /// # }
220 /// #
221 /// # #[derive(Debug, PartialEq)]
222 /// # enum Event {
223 /// # Completed(AnOperationOutput),
224 /// # }
225 /// let mut cmd: Command<Effect, Event> = Command::request_from_shell(AnOperation::More(1))
226 /// .then_stream(|first| {
227 /// let AnOperationOutput::Other(first) = first else {
228 /// panic!("Invalid output!")
229 /// };
230 ///
231 /// let second = first + 1;
232 /// Command::stream_from_shell(AnOperation::More(second))
233 /// })
234 /// .then_send(Event::Completed);
235 ///
236 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
237 /// assert_eq!(request.operation, AnOperation::More(1));
238 ///
239 /// request
240 /// .resolve(AnOperationOutput::Other(1))
241 /// .expect("to resolve");
242 ///
243 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
244 /// assert_eq!(request.operation, AnOperation::More(2));
245 pub fn then_stream<F, U, NextTask>(
246 self,
247 make_next_builder: F,
248 ) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
249 where
250 F: FnOnce(T) -> StreamBuilder<Effect, Event, NextTask> + Send + 'static,
251 NextTask: Stream<Item = U> + Send + 'static,
252 {
253 StreamBuilder::new(|ctx| {
254 self.into_future(ctx.clone())
255 .map(make_next_builder)
256 .into_stream()
257 .flat_map(move |builder| builder.into_stream(ctx.clone()))
258 })
259 }
260
261 /// Convert the request builder into a future to use in an async context
262 pub fn into_future(self, ctx: CommandContext<Effect, Event>) -> Task {
263 let make_task = self.make_task;
264 make_task(ctx)
265 }
266
267 /// Create the command in an evented context
268 pub fn then_send<E>(self, event: E) -> Command<Effect, Event>
269 where
270 E: FnOnce(T) -> Event + Send + 'static,
271 Task: Future<Output = T> + Send + 'static,
272 {
273 Command::new(|ctx| async move {
274 let out = self.into_future(ctx.clone()).await;
275 ctx.send_event(event(out));
276 })
277 }
278}
279
280/// A builder of stream command
281pub struct StreamBuilder<Effect, Event, Task> {
282 make_stream: Box<dyn FnOnce(CommandContext<Effect, Event>) -> Task + Send>,
283}
284
285impl<Effect, Event, Task, T> StreamBuilder<Effect, Event, Task>
286where
287 Effect: Send + 'static,
288 Event: Send + 'static,
289 Task: Stream<Item = T> + Send + 'static,
290{
291 pub fn new<F>(make_task: F) -> Self
292 where
293 F: FnOnce(CommandContext<Effect, Event>) -> Task + Send + 'static,
294 {
295 let make_task = Box::new(make_task);
296
297 StreamBuilder {
298 make_stream: make_task,
299 }
300 }
301
302 pub fn map<F, U>(self, map: F) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
303 where
304 F: FnMut(T) -> U + Send + 'static,
305 {
306 StreamBuilder::new(|ctx| self.into_stream(ctx.clone()).map(map))
307 }
308
309 /// Chain a [`RequestBuilder`] to run after completion of this [`StreamBuilder`],
310 /// passing the result to the provided closure `make_next_builder`.
311 ///
312 /// The returned value of the closure must be a [`StreamBuilder`], which
313 /// can represent some more work to be done before the composed future
314 /// is finished.
315 ///
316 /// If you want to chain a subscription, use [`Self::then_stream`] instead.
317 ///
318 /// The closure `make_next_builder` is only run *after* successful completion
319 /// of the `self` future.
320 ///
321 /// Note that this function consumes the receiving `StreamBuilder` and returns a
322 /// new one that represents the composition.
323 ///
324 /// # Example
325 ///
326 /// ```
327 /// # use crux_core::{Command, Request};
328 /// # use crux_core::capability::Operation;
329 /// # use serde::{Deserialize, Serialize};
330 /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
331 /// # enum AnOperation {
332 /// # One,
333 /// # Two,
334 /// # More(u8),
335 /// # }
336 /// #
337 /// # #[derive(Debug, PartialEq, Deserialize)]
338 /// # enum AnOperationOutput {
339 /// # One,
340 /// # Two,
341 /// # Other(u8),
342 /// # }
343 /// #
344 /// # impl Operation for AnOperation {
345 /// # type Output = AnOperationOutput;
346 /// # }
347 /// #
348 /// # #[derive(Debug)]
349 /// # enum Effect {
350 /// # AnEffect(Request<AnOperation>),
351 /// # }
352 /// #
353 /// # impl From<Request<AnOperation>> for Effect {
354 /// # fn from(request: Request<AnOperation>) -> Self {
355 /// # Self::AnEffect(request)
356 /// # }
357 /// # }
358 /// #
359 /// # #[derive(Debug, PartialEq)]
360 /// # enum Event {
361 /// # Completed(AnOperationOutput),
362 /// # }
363 /// let mut cmd: Command<Effect, Event> = Command::stream_from_shell(AnOperation::More(1))
364 /// .then_request(|first| {
365 /// let AnOperationOutput::Other(first) = first else {
366 /// panic!("Invalid output!")
367 /// };
368 ///
369 /// let second = first + 1;
370 /// Command::request_from_shell(AnOperation::More(second))
371 /// })
372 /// .then_send(Event::Completed);
373 ///
374 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
375 /// assert_eq!(request.operation, AnOperation::More(1));
376 ///
377 /// request
378 /// .resolve(AnOperationOutput::Other(1))
379 /// .expect("to resolve");
380 ///
381 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
382 /// assert_eq!(request.operation, AnOperation::More(2));
383 /// ```
384 pub fn then_request<F, U, NextTask>(
385 self,
386 make_next_builder: F,
387 ) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
388 where
389 F: Fn(T) -> RequestBuilder<Effect, Event, NextTask> + Send + 'static,
390 NextTask: Future<Output = U> + Send + 'static,
391 {
392 StreamBuilder::new(|ctx| {
393 self.into_stream(ctx.clone())
394 .then(move |item| make_next_builder(item).into_future(ctx.clone()))
395 })
396 }
397
398 /// Chain another [`StreamBuilder`] to run after completion of this one,
399 /// passing the result to the provided closure `make_next_builder`.
400 ///
401 /// The returned value of the closure must be a `StreamBuilder`, which
402 /// can represent some more work to be done before the composed future
403 /// is finished.
404 ///
405 /// If you want to chain a request, use [`Self::then_request`] instead.
406 ///
407 /// The closure `make_next_builder` is only run *after* successful completion
408 /// of the `self` future.
409 ///
410 /// Note that this function consumes the receiving `StreamBuilder` and returns a
411 /// new one that represents the composition.
412 ///
413 /// # Example
414 ///
415 /// ```
416 /// # use crux_core::{Command, Request};
417 /// # use crux_core::capability::Operation;
418 /// # use serde::{Deserialize, Serialize};
419 /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
420 /// # enum AnOperation {
421 /// # One,
422 /// # Two,
423 /// # More(u8),
424 /// # }
425 /// #
426 /// # #[derive(Debug, PartialEq, Deserialize)]
427 /// # enum AnOperationOutput {
428 /// # One,
429 /// # Two,
430 /// # Other(u8),
431 /// # }
432 /// #
433 /// # impl Operation for AnOperation {
434 /// # type Output = AnOperationOutput;
435 /// # }
436 /// #
437 /// # #[derive(Debug)]
438 /// # enum Effect {
439 /// # AnEffect(Request<AnOperation>),
440 /// # }
441 /// #
442 /// # impl From<Request<AnOperation>> for Effect {
443 /// # fn from(request: Request<AnOperation>) -> Self {
444 /// # Self::AnEffect(request)
445 /// # }
446 /// # }
447 /// #
448 /// # #[derive(Debug, PartialEq)]
449 /// # enum Event {
450 /// # Completed(AnOperationOutput),
451 /// # }
452 /// let mut cmd: Command<Effect, Event> = Command::stream_from_shell(AnOperation::More(1))
453 /// .then_stream(|first| {
454 /// let AnOperationOutput::Other(first) = first else {
455 /// panic!("Invalid output!")
456 /// };
457 ///
458 /// let second = first + 1;
459 /// Command::stream_from_shell(AnOperation::More(second))
460 /// })
461 /// .then_send(Event::Completed);
462 ///
463 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
464 /// assert_eq!(request.operation, AnOperation::More(1));
465 ///
466 /// request
467 /// .resolve(AnOperationOutput::Other(1))
468 /// .expect("to resolve");
469 ///
470 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
471 /// assert_eq!(request.operation, AnOperation::More(2));
472 pub fn then_stream<F, U, NextTask>(
473 self,
474 make_next_builder: F,
475 ) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
476 where
477 F: Fn(T) -> StreamBuilder<Effect, Event, NextTask> + Send + 'static,
478 NextTask: Stream<Item = U> + Send + 'static,
479 {
480 StreamBuilder::new(move |ctx| {
481 self.into_stream(ctx.clone())
482 .map(move |item| {
483 let next_builder = make_next_builder(item);
484 Box::pin(next_builder.into_stream(ctx.clone()))
485 })
486 .flatten_unordered(None)
487 })
488 }
489
490 /// Create the command in an evented context
491 pub fn then_send<E>(self, event: E) -> Command<Effect, Event>
492 where
493 E: Fn(T) -> Event + Send + 'static,
494 {
495 Command::new(|ctx| async move {
496 let mut stream = pin!(self.into_stream(ctx.clone()));
497
498 while let Some(out) = stream.next().await {
499 ctx.send_event(event(out));
500 }
501 })
502 }
503
504 /// Convert the stream builder into a stream to use in an async context
505 pub fn into_stream(self, ctx: CommandContext<Effect, Event>) -> Task {
506 let make_stream = self.make_stream;
507
508 make_stream(ctx)
509 }
510}