Skip to main content

crux_core/command/
stream.rs

1#![allow(clippy::redundant_pub_crate)]
2// Command is an async Stream
3
4use std::future::Future;
5use std::ops::DerefMut as _;
6use std::task::{Context, Poll};
7
8use std::pin::Pin;
9
10use futures::{Sink, Stream, StreamExt as _};
11
12use crossbeam_channel::Sender;
13use thiserror::Error;
14
15use super::Command;
16
17/// An item emitted from a Command when used as a Stream.
18#[derive(Debug)]
19pub enum CommandOutput<Effect, Event> {
20    Effect(Effect),
21    Event(Event),
22}
23
24impl<Effect, Event> Stream for Command<Effect, Event>
25where
26    Effect: Unpin + Send + 'static,
27    Event: Unpin + Send + 'static,
28{
29    type Item = CommandOutput<Effect, Event>;
30
31    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
32        self.waker.register(cx.waker());
33
34        // run_until_settled is idempotent
35        self.deref_mut().run_until_settled();
36
37        // Check events first to preserve the order in which items were emitted. This is because
38        // sending events doesn't yield, and the next request/stream await point will be
39        // reached in the same poll, so any follow up effects will _also_ be available
40        if let Ok(event) = self.events.try_recv() {
41            return Poll::Ready(Some(CommandOutput::Event(event)));
42        }
43
44        if let Ok(effect) = self.effects.try_recv() {
45            return Poll::Ready(Some(CommandOutput::Effect(effect)));
46        }
47
48        if self.is_done() {
49            Poll::Ready(None)
50        } else {
51            Poll::Pending
52        }
53    }
54}
55
56/// A sink for a Command stream, sending all emitted effects and events into a pair of channels
57pub(crate) struct CommandSink<Effect, Event> {
58    pub(crate) effects: Sender<Effect>,
59    pub(crate) events: Sender<Event>,
60}
61
62impl<Effect, Event> CommandSink<Effect, Event> {
63    pub(crate) const fn new(effects: Sender<Effect>, events: Sender<Event>) -> Self {
64        Self { effects, events }
65    }
66}
67
68#[derive(Debug, Error)]
69pub(crate) enum HostedCommandError {
70    #[error("Cannot send effect to host")]
71    CannotSendEffect,
72    #[error("Cannot send event to host")]
73    CannotSendEvent,
74}
75
76impl<Effect, Event> Sink<CommandOutput<Effect, Event>> for CommandSink<Effect, Event> {
77    type Error = HostedCommandError;
78
79    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
80        Poll::Ready(Ok(()))
81    }
82
83    fn start_send(
84        self: Pin<&mut Self>,
85        item: CommandOutput<Effect, Event>,
86    ) -> Result<(), Self::Error> {
87        match item {
88            CommandOutput::Effect(effect) => self
89                .effects
90                .send(effect)
91                .map_err(|_| HostedCommandError::CannotSendEffect),
92            CommandOutput::Event(event) => self
93                .events
94                .send(event)
95                .map_err(|_| HostedCommandError::CannotSendEvent),
96        }
97    }
98
99    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
100        Poll::Ready(Ok(()))
101    }
102
103    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
104        Poll::Ready(Ok(()))
105    }
106}
107
108pub(crate) trait CommandStreamExt<Effect, Event>:
109    Stream<Item = CommandOutput<Effect, Event>>
110{
111    /// Connect this command to a pair of effect and event channels
112    ///
113    /// This is useful if you need to multiplex several commands into the same stream of
114    /// effects and events - like Crux does.
115    fn host(self, effects: Sender<Effect>, events: Sender<Event>) -> impl Future
116    where
117        Self: Send + Sized,
118    {
119        self.map(Ok).forward(CommandSink::new(effects, events))
120    }
121}
122
123impl<S, Effect, Event> CommandStreamExt<Effect, Event> for S where
124    S: Stream<Item = CommandOutput<Effect, Event>>
125{
126}