crux_core/command/
context.rs1use std::future::Future;
2use std::pin::{Pin, pin};
3
4use std::sync::Arc;
5use std::task::{Context, Poll};
6
7use crossbeam_channel::Sender;
8use futures::channel::mpsc;
9use futures::future::Fuse;
10use futures::stream::StreamFuture;
11use futures::{FutureExt as _, Stream, StreamExt};
12
13use crate::Request;
14use crate::capability::Operation;
15
16use super::executor::{JoinHandle, Task};
17
18pub struct CommandContext<Effect, Event> {
22 pub(crate) effects: Sender<Effect>,
23 pub(crate) events: Sender<Event>,
24 pub(crate) tasks: Sender<Task>,
25 pub(crate) rc: Arc<()>,
26}
27impl<Effect, Event> Clone for CommandContext<Effect, Event> {
31 fn clone(&self) -> Self {
32 Self {
33 effects: self.effects.clone(),
34 events: self.events.clone(),
35 tasks: self.tasks.clone(),
36 rc: self.rc.clone(),
37 }
38 }
39}
40
41impl<Effect, Event> CommandContext<Effect, Event> {
42 #[allow(clippy::missing_panics_doc)]
44 pub fn notify_shell<Op>(&self, operation: Op)
45 where
46 Op: Operation,
47 Effect: From<Request<Op>>,
48 {
49 let request = Request::resolves_never(operation);
50
51 self.effects
52 .send(request.into())
53 .expect("Command could not send notification, effect channel disconnected");
54 }
55
56 #[allow(clippy::missing_panics_doc)]
66 pub fn request_from_shell<Op>(&self, operation: Op) -> ShellRequest<Op::Output>
68 where
69 Op: Operation,
70 Effect: From<Request<Op>> + Send + 'static,
71 {
72 let (output_sender, output_receiver) = mpsc::unbounded();
73
74 let request = Request::resolves_once(operation, move |output| {
75 let _ = output_sender.unbounded_send(output);
77 });
78
79 let send_request = {
80 let effect = request.into();
81 let effects = self.effects.clone();
82 move || {
83 effects
84 .send(effect)
85 .expect("Command could not send request effect, effect channel disconnected");
86 }
87 };
88
89 ShellRequest::new(Box::new(send_request), output_receiver)
90 }
91 #[allow(clippy::missing_panics_doc)]
103 pub fn stream_from_shell<Op>(&self, operation: Op) -> ShellStream<Op::Output>
104 where
105 Op: Operation,
106 Effect: From<Request<Op>> + Send + 'static,
107 {
108 let (output_sender, output_receiver) = mpsc::unbounded();
109
110 let request = Request::resolves_many_times(operation, move |output| {
111 output_sender.unbounded_send(output).map_err(|_| ())?;
112
113 Ok(())
115 });
116
117 let send_request = {
118 let effect = request.into();
119 let effects = self.effects.clone();
120 move || {
121 effects
122 .send(effect)
123 .expect("Command could not send stream effect, effect channel disconnected");
124 }
125 };
126
127 ShellStream::new(send_request, output_receiver)
128 }
129
130 #[allow(clippy::missing_panics_doc)]
133 pub fn send_event(&self, event: Event) {
134 self.events
135 .send(event)
136 .expect("Command could not send event, event channel disconnected");
137 }
138
139 #[allow(clippy::missing_panics_doc)]
145 pub fn spawn<F, Fut>(&self, make_future: F) -> JoinHandle
147 where
148 F: FnOnce(Self) -> Fut,
149 Fut: Future<Output = ()> + Send + 'static,
150 {
151 let (sender, receiver) = crossbeam_channel::unbounded();
152
153 let ctx = self.clone();
154 let future = make_future(ctx);
155
156 let task = Task {
157 finished: Arc::default(),
158 aborted: Arc::default(),
159 future: future.boxed(),
160 join_handle_wakers: receiver,
161 };
162
163 let handle = JoinHandle {
164 finished: task.finished.clone(),
165 aborted: task.aborted.clone(),
166 register_waker: sender,
167 };
168
169 self.tasks
170 .send(task)
171 .expect("Command could not spawn task, tasks channel disconnected");
172
173 handle
174 }
175 }
177
178pub enum ShellStream<T: Unpin + Send> {
179 ReadyToSend(Box<dyn FnOnce() + Send>, mpsc::UnboundedReceiver<T>),
180 Sent(mpsc::UnboundedReceiver<T>),
181}
182
183impl<T: Unpin + Send> ShellStream<T> {
184 fn new(
185 send_request: impl FnOnce() + Send + 'static,
186 output_receiver: mpsc::UnboundedReceiver<T>,
187 ) -> Self {
188 Self::ReadyToSend(Box::new(send_request), output_receiver)
189 }
190
191 fn send(&mut self) {
192 let dummy = Self::Sent(mpsc::unbounded().1);
196 let Self::ReadyToSend(send_request, output_receiver) = std::mem::replace(self, dummy)
197 else {
198 unreachable!("cannot send");
199 };
200
201 *self = Self::Sent(output_receiver);
203
204 send_request();
205 }
206}
207
208impl<T: Unpin + Send> Stream for ShellStream<T> {
209 type Item = T;
210
211 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
212 match *self {
213 Self::ReadyToSend(_, ref mut output_receiver) => {
214 let poll = pin!(output_receiver).poll_next(cx);
215 assert!(matches!(poll, Poll::Pending)); self.send();
218
219 Poll::Pending
220 }
221 Self::Sent(ref mut output_receiver) => pin!(output_receiver).poll_next(cx),
222 }
223 }
224}
225
226pub struct ShellRequest<T: Unpin + Send> {
227 inner: Fuse<StreamFuture<ShellStream<T>>>,
228}
229
230impl<T: Unpin + Send + 'static> ShellRequest<T> {
231 fn new(
232 send_request: impl FnOnce() + Send + 'static,
233 output_receiver: mpsc::UnboundedReceiver<T>,
234 ) -> Self {
235 let inner = ShellStream::new(send_request, output_receiver)
236 .into_future()
237 .fuse();
238
239 Self { inner }
240 }
241}
242
243impl<T: Unpin + Send> Future for ShellRequest<T> {
244 type Output = T;
245
246 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
247 match self.inner.poll_unpin(cx) {
248 Poll::Ready((Some(output), _rest)) => Poll::Ready(output),
249 Poll::Ready((None, _rest)) => Poll::Pending,
250 Poll::Pending => Poll::Pending,
251 }
252 }
253}