crux_core/command/
context.rs

1use std::future::Future;
2use std::pin::{pin, Pin};
3
4use std::task::{Context, Poll};
5
6use crossbeam_channel::Sender;
7use futures::channel::mpsc;
8use futures::future::Fuse;
9use futures::stream::StreamFuture;
10use futures::{FutureExt as _, Stream, StreamExt};
11
12use crate::capability::Operation;
13use crate::Request;
14
15use super::executor::{JoinHandle, Task};
16
17/// Context enabling tasks to communicate with the parent Command,
18/// specifically submit effects, events and spawn further tasks
19pub struct CommandContext<Effect, Event> {
20    pub(crate) effects: Sender<Effect>,
21    pub(crate) events: Sender<Event>,
22    pub(crate) tasks: Sender<Task>,
23}
24
25// derive(Clone) wants Effect and Event to be clone which is not actually necessary
26impl<Effect, Event> Clone for CommandContext<Effect, Event> {
27    fn clone(&self) -> Self {
28        Self {
29            effects: self.effects.clone(),
30            events: self.events.clone(),
31            tasks: self.tasks.clone(),
32        }
33    }
34}
35
36impl<Effect, Event> CommandContext<Effect, Event> {
37    /// Create a one-off notification to the shell. This method returns immediately.
38    pub fn notify_shell<Op>(&self, operation: Op)
39    where
40        Op: Operation,
41        Effect: From<Request<Op>>,
42    {
43        let request = Request::resolves_never(operation);
44
45        self.effects
46            .send(request.into())
47            .expect("Command could not send notification, effect channel disconnected");
48    }
49
50    /// Create a one-off request for an operation. Returns a future which eventually resolves
51    /// with the output of the operation provided by the shell.
52    pub fn request_from_shell<Op>(&self, operation: Op) -> ShellRequest<Op::Output>
53    where
54        Op: Operation,
55        Effect: From<Request<Op>> + Send + 'static,
56    {
57        let (output_sender, output_receiver) = mpsc::unbounded();
58
59        let request = Request::resolves_once(operation, move |output| {
60            // If the channel is closed, the associated task has been cancelled
61            let _ = output_sender.unbounded_send(output);
62        });
63
64        let send_request = {
65            let effect = request.into();
66            let effects = self.effects.clone();
67            move || {
68                effects
69                    .send(effect)
70                    .expect("Command could not send request effect, effect channel disconnected")
71            }
72        };
73
74        ShellRequest::new(Box::new(send_request), output_receiver)
75    }
76
77    /// Create a stream request for an operation. Returns a stream producing the
78    /// with the output of the operation every time it is provided by the shell.
79    pub fn stream_from_shell<Op>(&self, operation: Op) -> ShellStream<Op::Output>
80    where
81        Op: Operation,
82        Effect: From<Request<Op>> + Send + 'static,
83    {
84        let (output_sender, output_receiver) = mpsc::unbounded();
85
86        let request = Request::resolves_many_times(operation, move |output| {
87            output_sender.unbounded_send(output).map_err(|_| ())?;
88
89            // TODO: revisit the error handling in here
90            Ok(())
91        });
92
93        let send_request = {
94            let effect = request.into();
95            let effects = self.effects.clone();
96            move || {
97                effects
98                    .send(effect)
99                    .expect("Command could not send stream effect, effect channel disconnected")
100            }
101        };
102
103        ShellStream::new(send_request, output_receiver)
104    }
105
106    /// Send an event which should be handed to the update function. This is used to communicate the result
107    /// (or a sequence of results) of a command back to the app so that state can be updated accordingly
108    pub fn send_event(&self, event: Event) {
109        self.events
110            .send(event)
111            .expect("Command could not send event, event channel disconnected")
112    }
113
114    /// Spawn a new task within the same command. The task will execute concurrently with other tasks within the
115    /// command until it either concludes, is aborted, or until the parent command is aborted.
116    ///
117    /// Returns a JoinHandle which can be used as a future to await the completion of the task. It can also
118    /// be used to abort the task.
119    pub fn spawn<F, Fut>(&self, make_future: F) -> JoinHandle
120    where
121        F: FnOnce(CommandContext<Effect, Event>) -> Fut,
122        Fut: Future<Output = ()> + Send + 'static,
123    {
124        let (sender, receiver) = crossbeam_channel::unbounded();
125
126        let ctx = self.clone();
127        let future = make_future(ctx);
128
129        let task = Task {
130            finished: Default::default(),
131            aborted: Default::default(),
132            future: future.boxed(),
133            join_handle_wakers: receiver,
134        };
135
136        let handle = JoinHandle {
137            finished: task.finished.clone(),
138            aborted: task.aborted.clone(),
139            register_waker: sender,
140        };
141
142        self.tasks
143            .send(task)
144            .expect("Command could not spawn task, tasks channel disconnected");
145
146        handle
147    }
148}
149
150pub enum ShellStream<T: Unpin + Send> {
151    ReadyToSend(Box<dyn FnOnce() + Send>, mpsc::UnboundedReceiver<T>),
152    Sent(mpsc::UnboundedReceiver<T>),
153}
154
155impl<T: Unpin + Send> ShellStream<T> {
156    fn new(
157        send_request: impl FnOnce() + Send + 'static,
158        output_receiver: mpsc::UnboundedReceiver<T>,
159    ) -> Self {
160        ShellStream::ReadyToSend(Box::new(send_request), output_receiver)
161    }
162
163    fn send(&mut self) {
164        // Since neither part is Clone, we'll need to do an Indiana Jones
165
166        // 1. take items out of self
167        let dummy = ShellStream::Sent(mpsc::unbounded().1);
168        let ShellStream::ReadyToSend(send_request, output_receiver) =
169            std::mem::replace(self, dummy)
170        else {
171            unreachable!();
172        };
173
174        // 2. replace self with with a Sent using the original receiver
175        *self = ShellStream::Sent(output_receiver);
176
177        send_request()
178    }
179}
180
181impl<T: Unpin + Send> Stream for ShellStream<T> {
182    type Item = T;
183
184    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
185        match *self {
186            ShellStream::ReadyToSend(_, ref mut output_receiver) => {
187                let poll = pin!(output_receiver).poll_next(cx);
188                assert!(matches!(poll, Poll::Pending)); // we have not sent the request yet
189
190                self.send();
191
192                Poll::Pending
193            }
194            ShellStream::Sent(ref mut output_receiver) => pin!(output_receiver).poll_next(cx),
195        }
196    }
197}
198
199pub struct ShellRequest<T: Unpin + Send> {
200    inner: Fuse<StreamFuture<ShellStream<T>>>,
201}
202
203impl<T: Unpin + Send + 'static> ShellRequest<T> {
204    fn new(
205        send_request: impl FnOnce() + Send + 'static,
206        output_receiver: mpsc::UnboundedReceiver<T>,
207    ) -> Self {
208        let inner = ShellStream::new(send_request, output_receiver)
209            .into_future()
210            .fuse();
211
212        Self { inner }
213    }
214}
215
216impl<T: Unpin + Send> Future for ShellRequest<T> {
217    type Output = T;
218
219    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
220        match self.inner.poll_unpin(cx) {
221            Poll::Ready((Some(output), _rest)) => Poll::Ready(output),
222            Poll::Ready((None, _rest)) => Poll::Pending,
223            Poll::Pending => Poll::Pending,
224        }
225    }
226}