crux_core/capability/
channel.rs
1use std::sync::Arc;
4
5pub(crate) fn channel<T>() -> (Sender<T>, Receiver<T>)
6where
7 T: Send + 'static,
8{
9 let (sender, receiver) = crossbeam_channel::unbounded();
10 let sender = Sender {
11 inner: Arc::new(sender),
12 };
13 let receiver = Receiver { inner: receiver };
14
15 (sender, receiver)
16}
17
18pub struct Receiver<T> {
19 inner: crossbeam_channel::Receiver<T>,
20}
21
22impl<T> Receiver<T> {
23 pub fn receive(&self) -> Option<T> {
28 match self.inner.try_recv() {
29 Ok(inner) => Some(inner),
30 Err(crossbeam_channel::TryRecvError::Empty) => None,
31 Err(crossbeam_channel::TryRecvError::Disconnected) => {
32 panic!("Receiver was disconnected.")
36 }
37 }
38 }
39
40 pub fn try_receive(&self) -> Result<Option<T>, ()> {
46 match self.inner.try_recv() {
47 Ok(inner) => Ok(Some(inner)),
48 Err(crossbeam_channel::TryRecvError::Empty) => Ok(None),
49 Err(crossbeam_channel::TryRecvError::Disconnected) => Err(()),
50 }
51 }
52
53 pub fn drain(&self) -> Drain<T> {
54 Drain { receiver: self }
55 }
56}
57
58pub struct Drain<'a, T> {
59 receiver: &'a Receiver<T>,
60}
61
62impl<T> Iterator for Drain<'_, T> {
63 type Item = T;
64
65 fn next(&mut self) -> Option<Self::Item> {
66 self.receiver.receive()
67 }
68}
69
70pub struct Sender<T> {
71 inner: Arc<dyn SenderInner<T> + Send + Sync>,
72}
73
74impl<T> Clone for Sender<T> {
75 fn clone(&self) -> Self {
76 Self {
77 inner: Arc::clone(&self.inner),
78 }
79 }
80}
81
82impl<T> Sender<T>
83where
84 T: 'static,
85{
86 pub fn send(&self, t: T) {
87 self.inner.send(t)
88 }
89
90 pub fn map_input<NewT, F>(&self, func: F) -> Sender<NewT>
91 where
92 F: Fn(NewT) -> T + Send + Sync + 'static,
93 {
94 Sender {
95 inner: Arc::new(MappedInner {
96 sender: Arc::clone(&self.inner),
97 func,
98 }),
99 }
100 }
101}
102
/// Minimal "accept one value" interface that `Sender` stores as a trait
/// object, letting plain and mapped senders share one type.
trait SenderInner<T> {
    /// Hands `value` over to the implementation.
    fn send(&self, value: T);
}
106
107impl<T> SenderInner<T> for crossbeam_channel::Sender<T> {
108 fn send(&self, t: T) {
109 crossbeam_channel::Sender::send(self, t).unwrap()
110 }
111}
112
113pub struct MappedInner<T, F> {
114 sender: Arc<dyn SenderInner<T> + Send + Sync>,
115 func: F,
116}
117
118impl<F, T, U> SenderInner<U> for MappedInner<T, F>
119where
120 F: Fn(U) -> T,
121{
122 fn send(&self, value: U) {
123 self.sender.send((self.func)(value))
124 }
125}
126
#[cfg(test)]
mod tests {
    use static_assertions::assert_impl_all;

    use super::*;

    // Compile-time check that a `Sender` can be moved to another thread.
    assert_impl_all!(Sender<i32>: Send);

    #[test]
    fn test_channels() {
        let (sender, receiver) = channel();

        // A value sent directly arrives unchanged.
        sender.send(Some(1));
        assert_eq!(receiver.receive(), Some(Some(1)));

        // A mapped sender wraps its input in `Some` and delivers it to the
        // same receiver.
        let wrapping_sender = sender.map_input(Some);
        wrapping_sender.send(1);
        assert_eq!(receiver.receive(), Some(Some(1)));

        // Nothing else is queued.
        assert_eq!(receiver.receive(), None);
    }
}