crux_core/command/
stream.rs

1// Command is an async Stream
2
3use std::future::Future;
4use std::ops::DerefMut as _;
5use std::task::{Context, Poll};
6
7use std::pin::Pin;
8
9use futures::{Sink, Stream, StreamExt as _};
10
11use crossbeam_channel::Sender;
12use thiserror::Error;
13
14use super::Command;
15
/// A single item produced by a `Command` when it is consumed as a `Stream`.
///
/// Each item is either an effect or an event; the two payload types are kept
/// generic so the enum places no bounds on them.
#[derive(Debug)]
pub enum CommandOutput<Effect, Event> {
    /// An effect emitted by the command.
    Effect(Effect),
    /// An event emitted by the command.
    Event(Event),
}
22
23impl<Effect, Event> Stream for Command<Effect, Event>
24where
25    Effect: Unpin + Send + 'static,
26    Event: Unpin + Send + 'static,
27{
28    type Item = CommandOutput<Effect, Event>;
29
30    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
31        self.waker.register(cx.waker());
32
33        // run_until_settled is idempotent
34        self.deref_mut().run_until_settled();
35
36        // Check events first to preserve the order in which items were emitted. This is because
37        // sending events doesn't yield, and the next request/stream await point will be
38        // reached in the same poll, so any follow up effects will _also_ be available
39        if let Ok(event) = self.events.try_recv() {
40            return Poll::Ready(Some(CommandOutput::Event(event)));
41        }
42
43        if let Ok(effect) = self.effects.try_recv() {
44            return Poll::Ready(Some(CommandOutput::Effect(effect)));
45        };
46
47        if self.is_done() {
48            Poll::Ready(None)
49        } else {
50            Poll::Pending
51        }
52    }
53}
54
55/// A sink for a Command stream, sending all emitted effects and events into a pair of channels
56pub(crate) struct CommandSink<Effect, Event> {
57    pub(crate) effects: Sender<Effect>,
58    pub(crate) events: Sender<Event>,
59}
60
61impl<Effect, Event> CommandSink<Effect, Event> {
62    pub(crate) fn new(effects: Sender<Effect>, events: Sender<Event>) -> Self {
63        Self { effects, events }
64    }
65}
66
67#[derive(Debug, Error)]
68pub(crate) enum HostedCommandError {
69    #[error("Cannot send effect to host")]
70    CannotSendEffect,
71    #[error("Cannot send event to host")]
72    CannotSendEvent,
73}
74
75impl<Effect, Event> Sink<CommandOutput<Effect, Event>> for CommandSink<Effect, Event> {
76    type Error = HostedCommandError;
77
78    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
79        Poll::Ready(Ok(()))
80    }
81
82    fn start_send(
83        self: Pin<&mut Self>,
84        item: CommandOutput<Effect, Event>,
85    ) -> Result<(), Self::Error> {
86        match item {
87            CommandOutput::Effect(effect) => self
88                .effects
89                .send(effect)
90                .map_err(|_| HostedCommandError::CannotSendEffect),
91            CommandOutput::Event(event) => self
92                .events
93                .send(event)
94                .map_err(|_| HostedCommandError::CannotSendEvent),
95        }
96    }
97
98    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
99        Poll::Ready(Ok(()))
100    }
101
102    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
103        Poll::Ready(Ok(()))
104    }
105}
106
107pub(crate) trait CommandStreamExt<Effect, Event>:
108    Stream<Item = CommandOutput<Effect, Event>>
109{
110    /// Connect this command to a pair of effect and event channels
111    ///
112    /// This is useful if you need to multiplex several commands into the same stream of
113    /// effects and events - like Crux does.
114    fn host(self, effects: Sender<Effect>, events: Sender<Event>) -> impl Future
115    where
116        Self: Send + Sized,
117    {
118        self.map(Ok).forward(CommandSink::new(effects, events))
119    }
120}
121
122impl<S, Effect, Event> CommandStreamExt<Effect, Event> for S where
123    S: Stream<Item = CommandOutput<Effect, Event>>
124{
125}