crux_core/command/
context.rs

1use std::future::Future;
2use std::pin::{pin, Pin};
3
4use std::sync::Arc;
5use std::task::{Context, Poll};
6
7use crossbeam_channel::Sender;
8use futures::channel::mpsc;
9use futures::future::Fuse;
10use futures::stream::StreamFuture;
11use futures::{FutureExt as _, Stream, StreamExt};
12
13use crate::capability::Operation;
14use crate::Request;
15
16use super::executor::{JoinHandle, Task};
17
/// Context enabling tasks to communicate with the parent Command,
/// specifically submit effects, events and spawn further tasks.
///
/// The context is a bundle of three channel senders; the receiving ends are
/// not held by this type.
pub struct CommandContext<Effect, Event> {
    /// Sender on which shell requests, converted into `Effect`s, are submitted.
    pub(crate) effects: Sender<Effect>,
    /// Sender on which events are submitted via `send_event`.
    pub(crate) events: Sender<Event>,
    /// Sender on which newly spawned tasks are submitted via `spawn`.
    pub(crate) tasks: Sender<Task>,
}
25
26// derive(Clone) wants Effect and Event to be clone which is not actually necessary
27impl<Effect, Event> Clone for CommandContext<Effect, Event> {
28    fn clone(&self) -> Self {
29        Self {
30            effects: self.effects.clone(),
31            events: self.events.clone(),
32            tasks: self.tasks.clone(),
33        }
34    }
35}
36
37impl<Effect, Event> CommandContext<Effect, Event> {
38    /// Create a one-off notification to the shell. This method returns immediately.
39    #[allow(clippy::missing_panics_doc)]
40    pub fn notify_shell<Op>(&self, operation: Op)
41    where
42        Op: Operation,
43        Effect: From<Request<Op>>,
44    {
45        let request = Request::resolves_never(operation);
46
47        self.effects
48            .send(request.into())
49            .expect("Command could not send notification, effect channel disconnected");
50    }
51
52    /// Create a one-off request for an operation. Returns a future which eventually resolves
53    /// with the output of the operation provided by the shell.
54    #[allow(clippy::missing_panics_doc)]
55    pub fn request_from_shell<Op>(&self, operation: Op) -> ShellRequest<Op::Output>
56    where
57        Op: Operation,
58        Effect: From<Request<Op>> + Send + 'static,
59    {
60        let (output_sender, output_receiver) = mpsc::unbounded();
61
62        let request = Request::resolves_once(operation, move |output| {
63            // If the channel is closed, the associated task has been cancelled
64            let _ = output_sender.unbounded_send(output);
65        });
66
67        let send_request = {
68            let effect = request.into();
69            let effects = self.effects.clone();
70            move || {
71                effects
72                    .send(effect)
73                    .expect("Command could not send request effect, effect channel disconnected");
74            }
75        };
76
77        ShellRequest::new(Box::new(send_request), output_receiver)
78    }
79
80    /// Create a stream request for an operation. Returns a stream producing the
81    /// with the output of the operation every time it is provided by the shell.
82    #[allow(clippy::missing_panics_doc)]
83    pub fn stream_from_shell<Op>(&self, operation: Op) -> ShellStream<Op::Output>
84    where
85        Op: Operation,
86        Effect: From<Request<Op>> + Send + 'static,
87    {
88        let (output_sender, output_receiver) = mpsc::unbounded();
89
90        let request = Request::resolves_many_times(operation, move |output| {
91            output_sender.unbounded_send(output).map_err(|_| ())?;
92
93            // TODO: revisit the error handling in here
94            Ok(())
95        });
96
97        let send_request = {
98            let effect = request.into();
99            let effects = self.effects.clone();
100            move || {
101                effects
102                    .send(effect)
103                    .expect("Command could not send stream effect, effect channel disconnected");
104            }
105        };
106
107        ShellStream::new(send_request, output_receiver)
108    }
109
110    /// Send an event which should be handed to the update function. This is used to communicate the result
111    /// (or a sequence of results) of a command back to the app so that state can be updated accordingly
112    #[allow(clippy::missing_panics_doc)]
113    pub fn send_event(&self, event: Event) {
114        self.events
115            .send(event)
116            .expect("Command could not send event, event channel disconnected");
117    }
118
119    /// Spawn a new task within the same command. The task will execute concurrently with other tasks within the
120    /// command until it either concludes, is aborted, or until the parent command is aborted.
121    ///
122    /// Returns a `JoinHandle` which can be used as a future to await the completion of the task. It can also
123    /// be used to abort the task.
124    #[allow(clippy::missing_panics_doc)]
125    pub fn spawn<F, Fut>(&self, make_future: F) -> JoinHandle
126    where
127        F: FnOnce(CommandContext<Effect, Event>) -> Fut,
128        Fut: Future<Output = ()> + Send + 'static,
129    {
130        let (sender, receiver) = crossbeam_channel::unbounded();
131
132        let ctx = self.clone();
133        let future = make_future(ctx);
134
135        let task = Task {
136            finished: Arc::default(),
137            aborted: Arc::default(),
138            future: future.boxed(),
139            join_handle_wakers: receiver,
140        };
141
142        let handle = JoinHandle {
143            finished: task.finished.clone(),
144            aborted: task.aborted.clone(),
145            register_waker: sender,
146        };
147
148        self.tasks
149            .send(task)
150            .expect("Command could not spawn task, tasks channel disconnected");
151
152        handle
153    }
154}
155
/// A stream of outputs of an operation requested from the shell.
///
/// The request is sent lazily: the stream starts out as `ReadyToSend` and the
/// request is only submitted the first time the stream is polled, at which
/// point the stream becomes `Sent`.
pub enum ShellStream<T: Unpin + Send> {
    /// The request has not been submitted yet. Holds the closure which submits
    /// it and the receiver on which the outputs will arrive.
    ReadyToSend(Box<dyn FnOnce() + Send>, mpsc::UnboundedReceiver<T>),
    /// The request has been submitted; outputs are read from the receiver.
    Sent(mpsc::UnboundedReceiver<T>),
}
160
161impl<T: Unpin + Send> ShellStream<T> {
162    fn new(
163        send_request: impl FnOnce() + Send + 'static,
164        output_receiver: mpsc::UnboundedReceiver<T>,
165    ) -> Self {
166        ShellStream::ReadyToSend(Box::new(send_request), output_receiver)
167    }
168
169    fn send(&mut self) {
170        // Since neither part is Clone, we'll need to do an Indiana Jones
171
172        // 1. take items out of self
173        let dummy = ShellStream::Sent(mpsc::unbounded().1);
174        let ShellStream::ReadyToSend(send_request, output_receiver) =
175            std::mem::replace(self, dummy)
176        else {
177            unreachable!();
178        };
179
180        // 2. replace self with with a Sent using the original receiver
181        *self = ShellStream::Sent(output_receiver);
182
183        send_request();
184    }
185}
186
187impl<T: Unpin + Send> Stream for ShellStream<T> {
188    type Item = T;
189
190    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
191        match *self {
192            ShellStream::ReadyToSend(_, ref mut output_receiver) => {
193                let poll = pin!(output_receiver).poll_next(cx);
194                assert!(matches!(poll, Poll::Pending)); // we have not sent the request yet
195
196                self.send();
197
198                Poll::Pending
199            }
200            ShellStream::Sent(ref mut output_receiver) => pin!(output_receiver).poll_next(cx),
201        }
202    }
203}
204
/// A future resolving with the first output of an operation requested from
/// the shell.
///
/// It wraps a `ShellStream`, so the request is only submitted when the future
/// is first polled.
pub struct ShellRequest<T: Unpin + Send> {
    // Future for the first item of the underlying stream, wrapped in `Fuse`
    inner: Fuse<StreamFuture<ShellStream<T>>>,
}
208
209impl<T: Unpin + Send + 'static> ShellRequest<T> {
210    fn new(
211        send_request: impl FnOnce() + Send + 'static,
212        output_receiver: mpsc::UnboundedReceiver<T>,
213    ) -> Self {
214        let inner = ShellStream::new(send_request, output_receiver)
215            .into_future()
216            .fuse();
217
218        Self { inner }
219    }
220}
221
222impl<T: Unpin + Send> Future for ShellRequest<T> {
223    type Output = T;
224
225    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
226        match self.inner.poll_unpin(cx) {
227            Poll::Ready((Some(output), _rest)) => Poll::Ready(output),
228            Poll::Ready((None, _rest)) => Poll::Pending,
229            Poll::Pending => Poll::Pending,
230        }
231    }
232}