crux_core/command/
stream.rs1#![allow(clippy::redundant_pub_crate)]
2use std::future::Future;
5use std::ops::DerefMut as _;
6use std::task::{Context, Poll};
7
8use std::pin::Pin;
9
10use futures::{Sink, Stream, StreamExt as _};
11
12use crossbeam_channel::Sender;
13use thiserror::Error;
14
15use super::Command;
16
/// A single item produced by a [`Command`] when it is consumed as a stream.
///
/// The same type is what [`CommandSink`] accepts, so a stream of
/// `CommandOutput` values can be forwarded directly into a pair of channels.
#[derive(Debug)]
pub enum CommandOutput<Effect, Event> {
    /// An effect emitted by the command.
    Effect(Effect),
    /// An event emitted by the command.
    Event(Event),
}
23
24impl<Effect, Event> Stream for Command<Effect, Event>
25where
26 Effect: Unpin + Send + 'static,
27 Event: Unpin + Send + 'static,
28{
29 type Item = CommandOutput<Effect, Event>;
30
31 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
32 self.waker.register(cx.waker());
33
34 self.deref_mut().run_until_settled();
36
37 if let Ok(event) = self.events.try_recv() {
41 return Poll::Ready(Some(CommandOutput::Event(event)));
42 }
43
44 if let Ok(effect) = self.effects.try_recv() {
45 return Poll::Ready(Some(CommandOutput::Effect(effect)));
46 }
47
48 if self.is_done() {
49 Poll::Ready(None)
50 } else {
51 Poll::Pending
52 }
53 }
54}
55
56pub(crate) struct CommandSink<Effect, Event> {
58 pub(crate) effects: Sender<Effect>,
59 pub(crate) events: Sender<Event>,
60}
61
62impl<Effect, Event> CommandSink<Effect, Event> {
63 pub(crate) const fn new(effects: Sender<Effect>, events: Sender<Event>) -> Self {
64 Self { effects, events }
65 }
66}
67
68#[derive(Debug, Error)]
69pub(crate) enum HostedCommandError {
70 #[error("Cannot send effect to host")]
71 CannotSendEffect,
72 #[error("Cannot send event to host")]
73 CannotSendEvent,
74}
75
76impl<Effect, Event> Sink<CommandOutput<Effect, Event>> for CommandSink<Effect, Event> {
77 type Error = HostedCommandError;
78
79 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
80 Poll::Ready(Ok(()))
81 }
82
83 fn start_send(
84 self: Pin<&mut Self>,
85 item: CommandOutput<Effect, Event>,
86 ) -> Result<(), Self::Error> {
87 match item {
88 CommandOutput::Effect(effect) => self
89 .effects
90 .send(effect)
91 .map_err(|_| HostedCommandError::CannotSendEffect),
92 CommandOutput::Event(event) => self
93 .events
94 .send(event)
95 .map_err(|_| HostedCommandError::CannotSendEvent),
96 }
97 }
98
99 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
100 Poll::Ready(Ok(()))
101 }
102
103 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
104 Poll::Ready(Ok(()))
105 }
106}
107
108pub(crate) trait CommandStreamExt<Effect, Event>:
109 Stream<Item = CommandOutput<Effect, Event>>
110{
111 fn host(self, effects: Sender<Effect>, events: Sender<Event>) -> impl Future
116 where
117 Self: Send + Sized,
118 {
119 self.map(Ok).forward(CommandSink::new(effects, events))
120 }
121}
122
123impl<S, Effect, Event> CommandStreamExt<Effect, Event> for S where
124 S: Stream<Item = CommandOutput<Effect, Event>>
125{
126}