Skip to main content

crux_core/command/
context.rs

1use std::future::Future;
2use std::pin::{Pin, pin};
3
4use std::sync::Arc;
5use std::task::{Context, Poll};
6
7use crossbeam_channel::Sender;
8use futures::channel::mpsc;
9use futures::future::Fuse;
10use futures::stream::StreamFuture;
11use futures::{FutureExt as _, Stream, StreamExt};
12
13use crate::Request;
14use crate::capability::Operation;
15
16use super::executor::{JoinHandle, Task};
17
18/// Context enabling tasks to communicate with the parent Command,
19/// specifically submit effects, events and spawn further tasks
20// ANCHOR: command_context
21pub struct CommandContext<Effect, Event> {
22    pub(crate) effects: Sender<Effect>,
23    pub(crate) events: Sender<Event>,
24    pub(crate) tasks: Sender<Task>,
25    pub(crate) rc: Arc<()>,
26}
27// ANCHOR_END: command_context
28
29// derive(Clone) wants Effect and Event to be clone which is not actually necessary
30impl<Effect, Event> Clone for CommandContext<Effect, Event> {
31    fn clone(&self) -> Self {
32        Self {
33            effects: self.effects.clone(),
34            events: self.events.clone(),
35            tasks: self.tasks.clone(),
36            rc: self.rc.clone(),
37        }
38    }
39}
40
41impl<Effect, Event> CommandContext<Effect, Event> {
42    /// Create a one-off notification to the shell. This method returns immediately.
43    #[allow(clippy::missing_panics_doc)]
44    pub fn notify_shell<Op>(&self, operation: Op)
45    where
46        Op: Operation,
47        Effect: From<Request<Op>>,
48    {
49        let request = Request::resolves_never(operation);
50
51        self.effects
52            .send(request.into())
53            .expect("Command could not send notification, effect channel disconnected");
54    }
55
56    /// Create a one-off request for an operation. Returns a future which eventually resolves
57    /// with the output of the operation provided by the shell.
58    ///
59    /// # Cancellation behaviour
60    ///
61    /// `ShellRequest` futures may never resolve, if the corresponding [`RequestHandle`]
62    /// is dropped by the shell. Such cases are detected by the Command and the owning task is aborted.
63    /// That is to say - any `.await` point on a `ShellRequest` is a potential abort point for the
64    /// enclosing future.
65    #[allow(clippy::missing_panics_doc)]
66    // ANCHOR: request_from_shell
67    pub fn request_from_shell<Op>(&self, operation: Op) -> ShellRequest<Op::Output>
68    where
69        Op: Operation,
70        Effect: From<Request<Op>> + Send + 'static,
71    {
72        let (output_sender, output_receiver) = mpsc::unbounded();
73
74        let request = Request::resolves_once(operation, move |output| {
75            // If the channel is closed, the associated task has been cancelled
76            let _ = output_sender.unbounded_send(output);
77        });
78
79        let send_request = {
80            let effect = request.into();
81            let effects = self.effects.clone();
82            move || {
83                effects
84                    .send(effect)
85                    .expect("Command could not send request effect, effect channel disconnected");
86            }
87        };
88
89        ShellRequest::new(Box::new(send_request), output_receiver)
90    }
91    // ANCHOR_END: request_from_shell
92
93    /// Create a stream request for an operation. Returns a stream producing the
94    /// with the output of the operation every time it is provided by the shell.
95    ///
96    /// # Cancellation behaviour
97    ///
98    /// `ShellStream` futures may never resolve, if the corresponding [`RequestHandle`]
99    /// is dropped by the shell. Such cases are detected by the Command and the owning task is aborted.
100    /// That is to say - any `.await` point on a `ShellRequest` is a potential abort point for the
101    /// enclosing future.
102    #[allow(clippy::missing_panics_doc)]
103    pub fn stream_from_shell<Op>(&self, operation: Op) -> ShellStream<Op::Output>
104    where
105        Op: Operation,
106        Effect: From<Request<Op>> + Send + 'static,
107    {
108        let (output_sender, output_receiver) = mpsc::unbounded();
109
110        let request = Request::resolves_many_times(operation, move |output| {
111            output_sender.unbounded_send(output).map_err(|_| ())?;
112
113            // TODO: revisit the error handling in here
114            Ok(())
115        });
116
117        let send_request = {
118            let effect = request.into();
119            let effects = self.effects.clone();
120            move || {
121                effects
122                    .send(effect)
123                    .expect("Command could not send stream effect, effect channel disconnected");
124            }
125        };
126
127        ShellStream::new(send_request, output_receiver)
128    }
129
130    /// Send an event which should be handed to the update function. This is used to communicate the result
131    /// (or a sequence of results) of a command back to the app so that state can be updated accordingly
132    #[allow(clippy::missing_panics_doc)]
133    pub fn send_event(&self, event: Event) {
134        self.events
135            .send(event)
136            .expect("Command could not send event, event channel disconnected");
137    }
138
139    /// Spawn a new task within the same command. The task will execute concurrently with other tasks within the
140    /// command until it either concludes, is aborted, or until the parent command is aborted.
141    ///
142    /// Returns a `JoinHandle` which can be used as a future to await the completion of the task. It can also
143    /// be used to abort the task.
144    #[allow(clippy::missing_panics_doc)]
145    // ANCHOR: spawn
146    pub fn spawn<F, Fut>(&self, make_future: F) -> JoinHandle
147    where
148        F: FnOnce(CommandContext<Effect, Event>) -> Fut,
149        Fut: Future<Output = ()> + Send + 'static,
150    {
151        let (sender, receiver) = crossbeam_channel::unbounded();
152
153        let ctx = self.clone();
154        let future = make_future(ctx);
155
156        let task = Task {
157            finished: Arc::default(),
158            aborted: Arc::default(),
159            future: future.boxed(),
160            join_handle_wakers: receiver,
161        };
162
163        let handle = JoinHandle {
164            finished: task.finished.clone(),
165            aborted: task.aborted.clone(),
166            register_waker: sender,
167        };
168
169        self.tasks
170            .send(task)
171            .expect("Command could not spawn task, tasks channel disconnected");
172
173        handle
174    }
175    // ANCHOR_END: spawn
176}
177
178pub enum ShellStream<T: Unpin + Send> {
179    ReadyToSend(Box<dyn FnOnce() + Send>, mpsc::UnboundedReceiver<T>),
180    Sent(mpsc::UnboundedReceiver<T>),
181}
182
183impl<T: Unpin + Send> ShellStream<T> {
184    fn new(
185        send_request: impl FnOnce() + Send + 'static,
186        output_receiver: mpsc::UnboundedReceiver<T>,
187    ) -> Self {
188        ShellStream::ReadyToSend(Box::new(send_request), output_receiver)
189    }
190
191    fn send(&mut self) {
192        // Since neither part is Clone, we'll need to do an Indiana Jones
193
194        // 1. take items out of self
195        let dummy = ShellStream::Sent(mpsc::unbounded().1);
196        let ShellStream::ReadyToSend(send_request, output_receiver) =
197            std::mem::replace(self, dummy)
198        else {
199            unreachable!("cannot send");
200        };
201
202        // 2. replace self with with a Sent using the original receiver
203        *self = ShellStream::Sent(output_receiver);
204
205        send_request();
206    }
207}
208
209impl<T: Unpin + Send> Stream for ShellStream<T> {
210    type Item = T;
211
212    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
213        match *self {
214            ShellStream::ReadyToSend(_, ref mut output_receiver) => {
215                let poll = pin!(output_receiver).poll_next(cx);
216                assert!(matches!(poll, Poll::Pending)); // we have not sent the request yet
217
218                self.send();
219
220                Poll::Pending
221            }
222            ShellStream::Sent(ref mut output_receiver) => pin!(output_receiver).poll_next(cx),
223        }
224    }
225}
226
227pub struct ShellRequest<T: Unpin + Send> {
228    inner: Fuse<StreamFuture<ShellStream<T>>>,
229}
230
231impl<T: Unpin + Send + 'static> ShellRequest<T> {
232    fn new(
233        send_request: impl FnOnce() + Send + 'static,
234        output_receiver: mpsc::UnboundedReceiver<T>,
235    ) -> Self {
236        let inner = ShellStream::new(send_request, output_receiver)
237            .into_future()
238            .fuse();
239
240        Self { inner }
241    }
242}
243
244impl<T: Unpin + Send> Future for ShellRequest<T> {
245    type Output = T;
246
247    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
248        match self.inner.poll_unpin(cx) {
249            Poll::Ready((Some(output), _rest)) => Poll::Ready(output),
250            Poll::Ready((None, _rest)) => Poll::Pending,
251            Poll::Pending => Poll::Pending,
252        }
253    }
254}