crux_core/command/
context.rs1use std::future::Future;
2use std::pin::{Pin, pin};
3
4use std::sync::Arc;
5use std::task::{Context, Poll};
6
7use crossbeam_channel::Sender;
8use futures::channel::mpsc;
9use futures::future::Fuse;
10use futures::stream::StreamFuture;
11use futures::{FutureExt as _, Stream, StreamExt};
12
13use crate::Request;
14use crate::capability::Operation;
15
16use super::executor::{JoinHandle, Task};
17
18pub struct CommandContext<Effect, Event> {
22 pub(crate) effects: Sender<Effect>,
23 pub(crate) events: Sender<Event>,
24 pub(crate) tasks: Sender<Task>,
25 pub(crate) rc: Arc<()>,
26}
27impl<Effect, Event> Clone for CommandContext<Effect, Event> {
31 fn clone(&self) -> Self {
32 Self {
33 effects: self.effects.clone(),
34 events: self.events.clone(),
35 tasks: self.tasks.clone(),
36 rc: self.rc.clone(),
37 }
38 }
39}
40
41impl<Effect, Event> CommandContext<Effect, Event> {
42 #[allow(clippy::missing_panics_doc)]
44 pub fn notify_shell<Op>(&self, operation: Op)
45 where
46 Op: Operation,
47 Effect: From<Request<Op>>,
48 {
49 let request = Request::resolves_never(operation);
50
51 self.effects
52 .send(request.into())
53 .expect("Command could not send notification, effect channel disconnected");
54 }
55
56 #[allow(clippy::missing_panics_doc)]
66 pub fn request_from_shell<Op>(&self, operation: Op) -> ShellRequest<Op::Output>
68 where
69 Op: Operation,
70 Effect: From<Request<Op>> + Send + 'static,
71 {
72 let (output_sender, output_receiver) = mpsc::unbounded();
73
74 let request = Request::resolves_once(operation, move |output| {
75 let _ = output_sender.unbounded_send(output);
77 });
78
79 let send_request = {
80 let effect = request.into();
81 let effects = self.effects.clone();
82 move || {
83 effects
84 .send(effect)
85 .expect("Command could not send request effect, effect channel disconnected");
86 }
87 };
88
89 ShellRequest::new(Box::new(send_request), output_receiver)
90 }
91 #[allow(clippy::missing_panics_doc)]
103 pub fn stream_from_shell<Op>(&self, operation: Op) -> ShellStream<Op::Output>
104 where
105 Op: Operation,
106 Effect: From<Request<Op>> + Send + 'static,
107 {
108 let (output_sender, output_receiver) = mpsc::unbounded();
109
110 let request = Request::resolves_many_times(operation, move |output| {
111 output_sender.unbounded_send(output).map_err(|_| ())?;
112
113 Ok(())
115 });
116
117 let send_request = {
118 let effect = request.into();
119 let effects = self.effects.clone();
120 move || {
121 effects
122 .send(effect)
123 .expect("Command could not send stream effect, effect channel disconnected");
124 }
125 };
126
127 ShellStream::new(send_request, output_receiver)
128 }
129
130 #[allow(clippy::missing_panics_doc)]
133 pub fn send_event(&self, event: Event) {
134 self.events
135 .send(event)
136 .expect("Command could not send event, event channel disconnected");
137 }
138
139 #[allow(clippy::missing_panics_doc)]
145 pub fn spawn<F, Fut>(&self, make_future: F) -> JoinHandle
147 where
148 F: FnOnce(CommandContext<Effect, Event>) -> Fut,
149 Fut: Future<Output = ()> + Send + 'static,
150 {
151 let (sender, receiver) = crossbeam_channel::unbounded();
152
153 let ctx = self.clone();
154 let future = make_future(ctx);
155
156 let task = Task {
157 finished: Arc::default(),
158 aborted: Arc::default(),
159 future: future.boxed(),
160 join_handle_wakers: receiver,
161 };
162
163 let handle = JoinHandle {
164 finished: task.finished.clone(),
165 aborted: task.aborted.clone(),
166 register_waker: sender,
167 };
168
169 self.tasks
170 .send(task)
171 .expect("Command could not spawn task, tasks channel disconnected");
172
173 handle
174 }
175 }
177
178pub enum ShellStream<T: Unpin + Send> {
179 ReadyToSend(Box<dyn FnOnce() + Send>, mpsc::UnboundedReceiver<T>),
180 Sent(mpsc::UnboundedReceiver<T>),
181}
182
183impl<T: Unpin + Send> ShellStream<T> {
184 fn new(
185 send_request: impl FnOnce() + Send + 'static,
186 output_receiver: mpsc::UnboundedReceiver<T>,
187 ) -> Self {
188 ShellStream::ReadyToSend(Box::new(send_request), output_receiver)
189 }
190
191 fn send(&mut self) {
192 let dummy = ShellStream::Sent(mpsc::unbounded().1);
196 let ShellStream::ReadyToSend(send_request, output_receiver) =
197 std::mem::replace(self, dummy)
198 else {
199 unreachable!("cannot send");
200 };
201
202 *self = ShellStream::Sent(output_receiver);
204
205 send_request();
206 }
207}
208
209impl<T: Unpin + Send> Stream for ShellStream<T> {
210 type Item = T;
211
212 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
213 match *self {
214 ShellStream::ReadyToSend(_, ref mut output_receiver) => {
215 let poll = pin!(output_receiver).poll_next(cx);
216 assert!(matches!(poll, Poll::Pending)); self.send();
219
220 Poll::Pending
221 }
222 ShellStream::Sent(ref mut output_receiver) => pin!(output_receiver).poll_next(cx),
223 }
224 }
225}
226
227pub struct ShellRequest<T: Unpin + Send> {
228 inner: Fuse<StreamFuture<ShellStream<T>>>,
229}
230
231impl<T: Unpin + Send + 'static> ShellRequest<T> {
232 fn new(
233 send_request: impl FnOnce() + Send + 'static,
234 output_receiver: mpsc::UnboundedReceiver<T>,
235 ) -> Self {
236 let inner = ShellStream::new(send_request, output_receiver)
237 .into_future()
238 .fuse();
239
240 Self { inner }
241 }
242}
243
244impl<T: Unpin + Send> Future for ShellRequest<T> {
245 type Output = T;
246
247 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
248 match self.inner.poll_unpin(cx) {
249 Poll::Ready((Some(output), _rest)) => Poll::Ready(output),
250 Poll::Ready((None, _rest)) => Poll::Pending,
251 Poll::Pending => Poll::Pending,
252 }
253 }
254}