crux_core/command/builder.rs
1//! Command builders are an abstraction allowing chaining effects,
2//! where outputs of one effect can serve as inputs to further effects,
3//! without requiring an async context.
4//!
5//! Chaining streams with streams is currently not supported, as the semantics
6//! of the composition are unclear. If you need to compose streams, use the async
7//! API and tools from the `futures` crate.
8
9use std::{future::Future, pin::pin};
10
11use futures::{FutureExt, Stream, StreamExt};
12
13use super::{context::CommandContext, Command};
14
15/// A builder of one-off request command
16// Task is a future which does the shell talking and returns an output
17pub struct RequestBuilder<Effect, Event, Task> {
18 make_task: Box<dyn FnOnce(CommandContext<Effect, Event>) -> Task + Send>,
19}
20
21impl<Effect, Event, Task, T> RequestBuilder<Effect, Event, Task>
22where
23 Effect: Send + 'static,
24 Event: Send + 'static,
25 Task: Future<Output = T> + Send + 'static,
26{
27 pub fn new<F>(make_task: F) -> Self
28 where
29 F: FnOnce(CommandContext<Effect, Event>) -> Task + Send + 'static,
30 {
31 let make_task = Box::new(make_task);
32
33 RequestBuilder { make_task }
34 }
35
36 pub fn map<F, U>(self, map: F) -> RequestBuilder<Effect, Event, impl Future<Output = U>>
37 where
38 F: FnOnce(T) -> U + Send + 'static,
39 {
40 RequestBuilder::new(|ctx| self.into_future(ctx.clone()).map(map))
41 }
42
43 /// Chain another [`RequestBuilder`] to run after completion of this one,
44 /// passing the result to the provided closure `make_next_builder`.
45 ///
46 /// The returned value of the closure must be a `RequestBuilder`, which
47 /// can represent some more work to be done before the composed future
48 /// is finished.
49 ///
50 /// If you want to chain a subscription, use [`Self::then_stream`] instead.
51 ///
52 /// The closure `make_next_builder` is only run *after* successful completion
53 /// of the `self` future.
54 ///
55 /// Note that this function consumes the receiving `RequestBuilder` and returns a
56 /// new one that represents the composition.
57 ///
58 /// # Example
59 ///
60 /// ```
61 /// # use crux_core::{Command, Request};
62 /// # use crux_core::capability::Operation;
63 /// # use serde::{Deserialize, Serialize};
64 /// # #[derive(Debug, PartialEq, Clone, Serialize)]
65 /// # enum AnOperation {
66 /// # One,
67 /// # Two,
68 /// # More(u8),
69 /// # }
70 /// #
71 /// # #[derive(Debug, PartialEq, Deserialize)]
72 /// # enum AnOperationOutput {
73 /// # One,
74 /// # Two,
75 /// # Other(u8),
76 /// # }
77 /// #
78 /// # impl Operation for AnOperation {
79 /// # type Output = AnOperationOutput;
80 /// # }
81 /// #
82 /// # #[derive(Debug)]
83 /// # enum Effect {
84 /// # AnEffect(Request<AnOperation>),
85 /// # }
86 /// #
87 /// # impl From<Request<AnOperation>> for Effect {
88 /// # fn from(request: Request<AnOperation>) -> Self {
89 /// # Self::AnEffect(request)
90 /// # }
91 /// # }
92 /// #
93 /// # #[derive(Debug, PartialEq)]
94 /// # enum Event {
95 /// # Completed(AnOperationOutput),
96 /// # }
97 /// let mut cmd: Command<Effect, Event> = Command::request_from_shell(AnOperation::More(1))
98 /// .then_request(|first| {
99 /// let AnOperationOutput::Other(first) = first else {
100 /// panic!("Invalid output!")
101 /// };
102 ///
103 /// let second = first + 1;
104 /// Command::request_from_shell(AnOperation::More(second))
105 /// })
106 /// .then_send(Event::Completed);
107 ///
108 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
109 /// assert_eq!(request.operation, AnOperation::More(1));
110 ///
111 /// request
112 /// .resolve(AnOperationOutput::Other(1))
113 /// .expect("to resolve");
114 ///
115 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
116 /// assert_eq!(request.operation, AnOperation::More(2));
117 /// ```
118 pub fn then_request<F, U, NextTask>(
119 self,
120 make_next_builder: F,
121 ) -> RequestBuilder<Effect, Event, impl Future<Output = U>>
122 where
123 F: FnOnce(T) -> RequestBuilder<Effect, Event, NextTask> + Send + 'static,
124 NextTask: Future<Output = U> + Send + 'static,
125 {
126 RequestBuilder::new(|ctx| {
127 self.into_future(ctx.clone())
128 .then(|out| make_next_builder(out).into_future(ctx))
129 })
130 }
131
132 /// Chain a [`StreamBuilder`] to run after completion of this [`RequestBuilder`],
133 /// passing the result to the provided closure `make_next_builder`.
134 ///
135 /// The returned value of the closure must be a `StreamBuilder`, which
136 /// can represent some more work to be done before the composed future
137 /// is finished.
138 ///
139 /// If you want to chain a request, use [`Self::then_request`] instead.
140 ///
141 /// The closure `make_next_builder` is only run *after* successful completion
142 /// of the `self` future.
143 ///
144 /// Note that this function consumes the receiving `RequestBuilder` and returns a
145 /// [`StreamBuilder`] that represents the composition.
146 ///
147 /// # Example
148 ///
149 /// ```
150 /// # use crux_core::{Command, Request};
151 /// # use crux_core::capability::Operation;
152 /// # use serde::{Deserialize, Serialize};
153 /// # #[derive(Debug, PartialEq, Clone, Serialize)]
154 /// # enum AnOperation {
155 /// # One,
156 /// # Two,
157 /// # More(u8),
158 /// # }
159 /// #
160 /// # #[derive(Debug, PartialEq, Deserialize)]
161 /// # enum AnOperationOutput {
162 /// # One,
163 /// # Two,
164 /// # Other(u8),
165 /// # }
166 /// #
167 /// # impl Operation for AnOperation {
168 /// # type Output = AnOperationOutput;
169 /// # }
170 /// #
171 /// # #[derive(Debug)]
172 /// # enum Effect {
173 /// # AnEffect(Request<AnOperation>),
174 /// # }
175 /// #
176 /// # impl From<Request<AnOperation>> for Effect {
177 /// # fn from(request: Request<AnOperation>) -> Self {
178 /// # Self::AnEffect(request)
179 /// # }
180 /// # }
181 /// #
182 /// # #[derive(Debug, PartialEq)]
183 /// # enum Event {
184 /// # Completed(AnOperationOutput),
185 /// # }
186 /// let mut cmd: Command<Effect, Event> = Command::request_from_shell(AnOperation::More(1))
187 /// .then_stream(|first| {
188 /// let AnOperationOutput::Other(first) = first else {
189 /// panic!("Invalid output!")
190 /// };
191 ///
192 /// let second = first + 1;
193 /// Command::stream_from_shell(AnOperation::More(second))
194 /// })
195 /// .then_send(Event::Completed);
196 ///
197 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
198 /// assert_eq!(request.operation, AnOperation::More(1));
199 ///
200 /// request
201 /// .resolve(AnOperationOutput::Other(1))
202 /// .expect("to resolve");
203 ///
204 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
205 /// assert_eq!(request.operation, AnOperation::More(2));
206 pub fn then_stream<F, U, NextTask>(
207 self,
208 make_next_builder: F,
209 ) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
210 where
211 F: FnOnce(T) -> StreamBuilder<Effect, Event, NextTask> + Send + 'static,
212 NextTask: Stream<Item = U> + Send + 'static,
213 {
214 StreamBuilder::new(|ctx| {
215 self.into_future(ctx.clone())
216 .map(make_next_builder)
217 .into_stream()
218 .flat_map(move |builder| builder.into_stream(ctx.clone()))
219 })
220 }
221
222 /// Convert the request builder into a future to use in an async context
223 pub fn into_future(self, ctx: CommandContext<Effect, Event>) -> Task {
224 let make_task = self.make_task;
225 make_task(ctx)
226 }
227
228 /// Create the command in an evented context
229 pub fn then_send<E>(self, event: E) -> Command<Effect, Event>
230 where
231 E: FnOnce(T) -> Event + Send + 'static,
232 Task: Future<Output = T> + Send + 'static,
233 {
234 Command::new(|ctx| async move {
235 let out = self.into_future(ctx.clone()).await;
236 ctx.send_event(event(out));
237 })
238 }
239}
240
241/// A builder of stream command
242pub struct StreamBuilder<Effect, Event, Task> {
243 make_stream: Box<dyn FnOnce(CommandContext<Effect, Event>) -> Task + Send>,
244}
245
246impl<Effect, Event, Task, T> StreamBuilder<Effect, Event, Task>
247where
248 Effect: Send + 'static,
249 Event: Send + 'static,
250 Task: Stream<Item = T> + Send + 'static,
251{
252 pub fn new<F>(make_task: F) -> Self
253 where
254 F: FnOnce(CommandContext<Effect, Event>) -> Task + Send + 'static,
255 {
256 let make_task = Box::new(make_task);
257
258 StreamBuilder {
259 make_stream: make_task,
260 }
261 }
262
263 pub fn map<F, U>(self, map: F) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
264 where
265 F: FnMut(T) -> U + Send + 'static,
266 {
267 StreamBuilder::new(|ctx| self.into_stream(ctx.clone()).map(map))
268 }
269
270 /// Chain a [`RequestBuilder`] to run after completion of this [`StreamBuilder`],
271 /// passing the result to the provided closure `make_next_builder`.
272 ///
273 /// The returned value of the closure must be a [`StreamBuilder`], which
274 /// can represent some more work to be done before the composed future
275 /// is finished.
276 ///
277 /// If you want to chain a subscription, use [`Self::then_stream`] instead.
278 ///
279 /// The closure `make_next_builder` is only run *after* successful completion
280 /// of the `self` future.
281 ///
282 /// Note that this function consumes the receiving `StreamBuilder` and returns a
283 /// new one that represents the composition.
284 ///
285 /// # Example
286 ///
287 /// ```
288 /// # use crux_core::{Command, Request};
289 /// # use crux_core::capability::Operation;
290 /// # use serde::{Deserialize, Serialize};
291 /// # #[derive(Debug, PartialEq, Clone, Serialize)]
292 /// # enum AnOperation {
293 /// # One,
294 /// # Two,
295 /// # More(u8),
296 /// # }
297 /// #
298 /// # #[derive(Debug, PartialEq, Deserialize)]
299 /// # enum AnOperationOutput {
300 /// # One,
301 /// # Two,
302 /// # Other(u8),
303 /// # }
304 /// #
305 /// # impl Operation for AnOperation {
306 /// # type Output = AnOperationOutput;
307 /// # }
308 /// #
309 /// # #[derive(Debug)]
310 /// # enum Effect {
311 /// # AnEffect(Request<AnOperation>),
312 /// # }
313 /// #
314 /// # impl From<Request<AnOperation>> for Effect {
315 /// # fn from(request: Request<AnOperation>) -> Self {
316 /// # Self::AnEffect(request)
317 /// # }
318 /// # }
319 /// #
320 /// # #[derive(Debug, PartialEq)]
321 /// # enum Event {
322 /// # Completed(AnOperationOutput),
323 /// # }
324 /// let mut cmd: Command<Effect, Event> = Command::stream_from_shell(AnOperation::More(1))
325 /// .then_request(|first| {
326 /// let AnOperationOutput::Other(first) = first else {
327 /// panic!("Invalid output!")
328 /// };
329 ///
330 /// let second = first + 1;
331 /// Command::request_from_shell(AnOperation::More(second))
332 /// })
333 /// .then_send(Event::Completed);
334 ///
335 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
336 /// assert_eq!(request.operation, AnOperation::More(1));
337 ///
338 /// request
339 /// .resolve(AnOperationOutput::Other(1))
340 /// .expect("to resolve");
341 ///
342 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
343 /// assert_eq!(request.operation, AnOperation::More(2));
344 /// ```
345 pub fn then_request<F, U, NextTask>(
346 self,
347 make_next_builder: F,
348 ) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
349 where
350 F: Fn(T) -> RequestBuilder<Effect, Event, NextTask> + Send + 'static,
351 NextTask: Future<Output = U> + Send + 'static,
352 {
353 StreamBuilder::new(|ctx| {
354 self.into_stream(ctx.clone())
355 .then(move |item| make_next_builder(item).into_future(ctx.clone()))
356 })
357 }
358
359 /// Chain another [`StreamBuilder`] to run after completion of this one,
360 /// passing the result to the provided closure `make_next_builder`.
361 ///
362 /// The returned value of the closure must be a `StreamBuilder`, which
363 /// can represent some more work to be done before the composed future
364 /// is finished.
365 ///
366 /// If you want to chain a request, use [`Self::then_request`] instead.
367 ///
368 /// The closure `make_next_builder` is only run *after* successful completion
369 /// of the `self` future.
370 ///
371 /// Note that this function consumes the receiving `StreamBuilder` and returns a
372 /// new one that represents the composition.
373 ///
374 /// # Example
375 ///
376 /// ```
377 /// # use crux_core::{Command, Request};
378 /// # use crux_core::capability::Operation;
379 /// # use serde::{Deserialize, Serialize};
380 /// # #[derive(Debug, PartialEq, Clone, Serialize)]
381 /// # enum AnOperation {
382 /// # One,
383 /// # Two,
384 /// # More(u8),
385 /// # }
386 /// #
387 /// # #[derive(Debug, PartialEq, Deserialize)]
388 /// # enum AnOperationOutput {
389 /// # One,
390 /// # Two,
391 /// # Other(u8),
392 /// # }
393 /// #
394 /// # impl Operation for AnOperation {
395 /// # type Output = AnOperationOutput;
396 /// # }
397 /// #
398 /// # #[derive(Debug)]
399 /// # enum Effect {
400 /// # AnEffect(Request<AnOperation>),
401 /// # }
402 /// #
403 /// # impl From<Request<AnOperation>> for Effect {
404 /// # fn from(request: Request<AnOperation>) -> Self {
405 /// # Self::AnEffect(request)
406 /// # }
407 /// # }
408 /// #
409 /// # #[derive(Debug, PartialEq)]
410 /// # enum Event {
411 /// # Completed(AnOperationOutput),
412 /// # }
413 /// let mut cmd: Command<Effect, Event> = Command::stream_from_shell(AnOperation::More(1))
414 /// .then_stream(|first| {
415 /// let AnOperationOutput::Other(first) = first else {
416 /// panic!("Invalid output!")
417 /// };
418 ///
419 /// let second = first + 1;
420 /// Command::stream_from_shell(AnOperation::More(second))
421 /// })
422 /// .then_send(Event::Completed);
423 ///
424 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
425 /// assert_eq!(request.operation, AnOperation::More(1));
426 ///
427 /// request
428 /// .resolve(AnOperationOutput::Other(1))
429 /// .expect("to resolve");
430 ///
431 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
432 /// assert_eq!(request.operation, AnOperation::More(2));
433 pub fn then_stream<F, U, NextTask>(
434 self,
435 make_next_builder: F,
436 ) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
437 where
438 F: Fn(T) -> StreamBuilder<Effect, Event, NextTask> + Send + 'static,
439 NextTask: Stream<Item = U> + Send + 'static,
440 {
441 StreamBuilder::new(move |ctx| {
442 self.into_stream(ctx.clone())
443 .map(move |item| {
444 let next_builder = make_next_builder(item);
445 Box::pin(next_builder.into_stream(ctx.clone()))
446 })
447 .flatten_unordered(None)
448 })
449 }
450
451 /// Create the command in an evented context
452 pub fn then_send<E>(self, event: E) -> Command<Effect, Event>
453 where
454 E: Fn(T) -> Event + Send + 'static,
455 {
456 Command::new(|ctx| async move {
457 let mut stream = pin!(self.into_stream(ctx.clone()));
458
459 while let Some(out) = stream.next().await {
460 ctx.send_event(event(out));
461 }
462 })
463 }
464
465 /// Convert the stream builder into a stream to use in an async context
466 pub fn into_stream(self, ctx: CommandContext<Effect, Event>) -> Task {
467 let make_stream = self.make_stream;
468
469 make_stream(ctx)
470 }
471}