crux_core/capability/
channel.rs

1// Wrappers around crossbeam_channel that only expose the functionality we need (and is safe on wasm)
2
3use std::sync::Arc;
4
5pub(crate) fn channel<T>() -> (Sender<T>, Receiver<T>)
6where
7    T: Send + 'static,
8{
9    let (sender, receiver) = crossbeam_channel::unbounded();
10    let sender = Sender {
11        inner: Arc::new(sender),
12    };
13    let receiver = Receiver { inner: receiver };
14
15    (sender, receiver)
16}
17
18pub struct Receiver<T> {
19    inner: crossbeam_channel::Receiver<T>,
20}
21
22impl<T> Receiver<T> {
23    /// Receives a message if any are waiting.
24    ///
25    /// Panics if the receiver has disconnected, so shouldn't be used if
26    /// that's possible.
27    pub fn receive(&self) -> Option<T> {
28        match self.inner.try_recv() {
29            Ok(inner) => Some(inner),
30            Err(crossbeam_channel::TryRecvError::Empty) => None,
31            Err(crossbeam_channel::TryRecvError::Disconnected) => {
32                // Users _generally_ shouldn't be messing with channels themselves, so
33                // this probably shouldn't happen.  Might happen in tests, but lets
34                // fix that if we get complaints
35                panic!("Receiver was disconnected.")
36            }
37        }
38    }
39
40    /// Receives a message if any are waiting.
41    /// Returns the error branch if the sender has disconnected.
42    ///
43    /// This API isn't that nice, but isn't intended for public consumption
44    /// so whatevs.
45    pub fn try_receive(&self) -> Result<Option<T>, ()> {
46        match self.inner.try_recv() {
47            Ok(inner) => Ok(Some(inner)),
48            Err(crossbeam_channel::TryRecvError::Empty) => Ok(None),
49            Err(crossbeam_channel::TryRecvError::Disconnected) => Err(()),
50        }
51    }
52
53    pub fn drain(&self) -> Drain<T> {
54        Drain { receiver: self }
55    }
56}
57
58pub struct Drain<'a, T> {
59    receiver: &'a Receiver<T>,
60}
61
62impl<T> Iterator for Drain<'_, T> {
63    type Item = T;
64
65    fn next(&mut self) -> Option<Self::Item> {
66        self.receiver.receive()
67    }
68}
69
70pub struct Sender<T> {
71    inner: Arc<dyn SenderInner<T> + Send + Sync>,
72}
73
74impl<T> Clone for Sender<T> {
75    fn clone(&self) -> Self {
76        Self {
77            inner: Arc::clone(&self.inner),
78        }
79    }
80}
81
82impl<T> Sender<T>
83where
84    T: 'static,
85{
86    pub fn send(&self, t: T) {
87        self.inner.send(t)
88    }
89
90    pub fn map_input<NewT, F>(&self, func: F) -> Sender<NewT>
91    where
92        F: Fn(NewT) -> T + Send + Sync + 'static,
93    {
94        Sender {
95            inner: Arc::new(MappedInner {
96                sender: Arc::clone(&self.inner),
97                func,
98            }),
99        }
100    }
101}
102
103trait SenderInner<T> {
104    fn send(&self, t: T);
105}
106
107impl<T> SenderInner<T> for crossbeam_channel::Sender<T> {
108    fn send(&self, t: T) {
109        crossbeam_channel::Sender::send(self, t).unwrap()
110    }
111}
112
113pub struct MappedInner<T, F> {
114    sender: Arc<dyn SenderInner<T> + Send + Sync>,
115    func: F,
116}
117
118impl<F, T, U> SenderInner<U> for MappedInner<T, F>
119where
120    F: Fn(U) -> T,
121{
122    fn send(&self, value: U) {
123        self.sender.send((self.func)(value))
124    }
125}
126
127#[cfg(test)]
128mod tests {
129    use static_assertions::assert_impl_all;
130
131    use super::*;
132
133    assert_impl_all!(Sender<i32>: Send);
134
135    #[test]
136    fn test_channels() {
137        let (send, recv) = channel();
138
139        send.send(Some(1));
140        assert_eq!(recv.receive(), Some(Some(1)));
141
142        let wrapped_send = send.map_input(Some);
143        wrapped_send.send(1);
144        assert_eq!(recv.receive(), Some(Some(1)));
145
146        assert_eq!(recv.receive(), None);
147    }
148}