crux_core/command/
stream.rs1use std::future::Future;
4use std::ops::DerefMut as _;
5use std::task::{Context, Poll};
6
7use std::pin::Pin;
8
9use futures::{Sink, Stream, StreamExt as _};
10
11use crossbeam_channel::Sender;
12use thiserror::Error;
13
14use super::Command;
15
/// A single item yielded by a [`Command`] when it is polled as a stream.
///
/// The `Stream` implementation for `Command` produces these by draining the
/// command's `events` and `effects` receivers.
#[derive(Debug)]
pub enum CommandOutput<Effect, Event> {
    /// A value received from the command's `effects` channel.
    Effect(Effect),
    /// A value received from the command's `events` channel.
    Event(Event),
}
22
23impl<Effect, Event> Stream for Command<Effect, Event>
24where
25 Effect: Unpin + Send + 'static,
26 Event: Unpin + Send + 'static,
27{
28 type Item = CommandOutput<Effect, Event>;
29
30 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
31 self.waker.register(cx.waker());
32
33 self.deref_mut().run_until_settled();
35
36 if let Ok(event) = self.events.try_recv() {
40 return Poll::Ready(Some(CommandOutput::Event(event)));
41 }
42
43 if let Ok(effect) = self.effects.try_recv() {
44 return Poll::Ready(Some(CommandOutput::Effect(effect)));
45 };
46
47 if self.is_done() {
48 Poll::Ready(None)
49 } else {
50 Poll::Pending
51 }
52 }
53}
54
55pub(crate) struct CommandSink<Effect, Event> {
57 pub(crate) effects: Sender<Effect>,
58 pub(crate) events: Sender<Event>,
59}
60
61impl<Effect, Event> CommandSink<Effect, Event> {
62 pub(crate) fn new(effects: Sender<Effect>, events: Sender<Event>) -> Self {
63 Self { effects, events }
64 }
65}
66
67#[derive(Debug, Error)]
68pub(crate) enum HostedCommandError {
69 #[error("Cannot send effect to host")]
70 CannotSendEffect,
71 #[error("Cannot send event to host")]
72 CannotSendEvent,
73}
74
75impl<Effect, Event> Sink<CommandOutput<Effect, Event>> for CommandSink<Effect, Event> {
76 type Error = HostedCommandError;
77
78 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
79 Poll::Ready(Ok(()))
80 }
81
82 fn start_send(
83 self: Pin<&mut Self>,
84 item: CommandOutput<Effect, Event>,
85 ) -> Result<(), Self::Error> {
86 match item {
87 CommandOutput::Effect(effect) => self
88 .effects
89 .send(effect)
90 .map_err(|_| HostedCommandError::CannotSendEffect),
91 CommandOutput::Event(event) => self
92 .events
93 .send(event)
94 .map_err(|_| HostedCommandError::CannotSendEvent),
95 }
96 }
97
98 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
99 Poll::Ready(Ok(()))
100 }
101
102 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
103 Poll::Ready(Ok(()))
104 }
105}
106
107pub(crate) trait CommandStreamExt<Effect, Event>:
108 Stream<Item = CommandOutput<Effect, Event>>
109{
110 fn host(self, effects: Sender<Effect>, events: Sender<Event>) -> impl Future
115 where
116 Self: Send + Sized,
117 {
118 self.map(Ok).forward(CommandSink::new(effects, events))
119 }
120}
121
122impl<S, Effect, Event> CommandStreamExt<Effect, Event> for S where
123 S: Stream<Item = CommandOutput<Effect, Event>>
124{
125}