crux_core/command/
executor.rs

1use super::super::Command;
2
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::atomic::Ordering;
6use std::task::{Context, Poll, Wake, Waker};
7
8use crossbeam_channel::{Receiver, Sender};
9
10use futures::future::BoxFuture;
11
12use std::sync::atomic::AtomicBool;
13
14use futures::task::AtomicWaker;
15
16use std::sync::Arc;
17
18#[derive(Clone, Copy, Debug, PartialEq)]
19pub(crate) struct TaskId(pub(crate) usize);
20
21pub(crate) struct Task {
22    // Used to wake the join handle when the task concludes
23    pub(crate) join_handle_wakers: Receiver<Waker>,
24    // Set to true when the task finishes, used by the join handle
25    // RFC: is there a safe way to do this relying on the waker alone?
26    pub(crate) finished: Arc<AtomicBool>,
27    // Set to true when the task is aborted. Aborted tasks will poll Ready on the
28    // next poll
29    pub(crate) aborted: Arc<AtomicBool>,
30    // The future polled by this task
31    pub(crate) future: BoxFuture<'static, ()>,
32}
33
34impl Task {
35    pub(crate) fn is_aborted(&self) -> bool {
36        self.aborted.load(Ordering::Acquire)
37    }
38
39    fn wake_join_handles(&self) {
40        for waker in self.join_handle_wakers.try_iter() {
41            // TODO: this potentially wakes tasks which are no longer interested
42            // and wakes tasks more than once if they await multiple copies of the same join handle
43            waker.wake();
44        }
45    }
46}
47
48// Waker provided to the tasks so they can schedule themselves to be woken
49// when their future is ready to proceed.
50// Waking a task also wakes the command itself, if it is being used as a Stream
51// inside another Command (or hosted with a CommandSink)
52pub(crate) struct CommandWaker {
53    pub(crate) task_id: TaskId,
54    pub(crate) ready_queue: Sender<TaskId>,
55    // Waker for the executor running this command as a Stream.
56    // When the command is executed directly (e.g. in tests) this waker
57    // will not be registered.
58    pub(crate) parent_waker: Arc<AtomicWaker>,
59    woken: AtomicBool,
60}
61
62impl Wake for CommandWaker {
63    fn wake(self: Arc<Self>) {
64        self.wake_by_ref();
65    }
66
67    fn wake_by_ref(self: &Arc<Self>) {
68        // If we can't send the id to the ready queue, there is no Command to poll the task again anyway,
69        // nothing to do.
70        // TODO: Does that mean we should bail, since waking ourselves is
71        // now pointless?
72        let _ = self.ready_queue.send(self.task_id);
73        self.woken.store(true, Ordering::Release);
74
75        // Note: calling `wake` before `register` is a no-op
76        self.parent_waker.wake();
77    }
78}
79
80/// A handle used to abort a Command remotely before it is complete
81#[derive(Clone)]
82pub struct AbortHandle {
83    pub(crate) aborted: Arc<AtomicBool>,
84}
85
86impl AbortHandle {
87    /// Abort the associated Command and all its tasks.
88    ///
89    /// The tasks will be stopped (not polled any more) at the next .await point.
90    /// If you use this, make sure the tasks the Command is running are all cancellation
91    /// safe, as they can be stopped at any of the await points or even before they are first polled
92    pub fn abort(&self) {
93        self.aborted.store(true, Ordering::Release);
94    }
95}
96
97/// A handle used to await a task completion of abort the task
98#[derive(Clone)]
99pub struct JoinHandle {
100    pub(crate) register_waker: Sender<Waker>,
101    pub(crate) finished: Arc<AtomicBool>,
102    pub(crate) aborted: Arc<AtomicBool>,
103}
104
105// RFC: I'm sure the ordering as used is fine...? Right? :) In all seriousness, how would
106// one test this to make sure it works as intended in a multi-threaded context?
107impl JoinHandle {
108    /// Abort the task associated with this join handle. The task will be aborted at the
109    /// next .await point. Any tasks this task spawned will continue running.
110    // RFC: Do we need to think more thoroughly about cancellation? For example, should
111    // the tasks have a parent-child relationship where cancelling the parent cancels all
112    // the children?
113    pub fn abort(&self) {
114        self.aborted.store(true, Ordering::Release);
115    }
116
117    pub(crate) fn is_finished(&self) -> bool {
118        self.finished.load(Ordering::Acquire)
119    }
120}
121
122impl Future for JoinHandle {
123    type Output = ();
124
125    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
126        if self.is_finished() {
127            Poll::Ready(())
128        } else {
129            match self.register_waker.send(cx.waker().clone()) {
130                Ok(_) => Poll::Pending,
131                // The task no longer exists, we report ready immediately
132                Err(_) => Poll::Ready(()),
133            }
134        }
135    }
136}
137
138#[derive(Debug, PartialEq)]
139pub(crate) enum TaskState {
140    Missing,
141    Suspended,
142    Completed,
143    Cancelled,
144}
145
146// Command is actually an async executor of sorts, similar to futures::FuturesUnordered
147impl<Effect, Event> Command<Effect, Event> {
148    // Run all tasks until all of them are pending
149    pub(crate) fn run_until_settled(&mut self) {
150        if self.was_aborted() {
151            self.tasks.clear();
152
153            return;
154        }
155
156        loop {
157            self.spawn_new_tasks();
158
159            if self.ready_queue.is_empty() {
160                break;
161            }
162
163            while let Ok(task_id) = self.ready_queue.try_recv() {
164                match self.run_task(task_id) {
165                    TaskState::Missing => {
166                        // The task has been evicted because it completed.  This can happen when
167                        // a _running_ task schedules itself to wake, but then completes and gets
168                        // removed
169                    }
170                    TaskState::Suspended => {
171                        // Task suspended, we pick it up again when it's woken up
172                    }
173                    TaskState::Completed | TaskState::Cancelled => {
174                        // Remove and drop the task, it's finished
175                        let task = self.tasks.remove(task_id.0);
176
177                        task.finished.store(true, Ordering::Release);
178                        task.wake_join_handles();
179
180                        drop(task);
181                    }
182                };
183            }
184        }
185    }
186
187    pub(crate) fn run_task(&mut self, task_id: TaskId) -> TaskState {
188        let Some(task) = self.tasks.get_mut(task_id.0) else {
189            return TaskState::Missing;
190        };
191
192        if task.is_aborted() {
193            return TaskState::Completed;
194        }
195
196        let ready_queue = self.ready_sender.clone();
197        let parent_waker = self.waker.clone();
198
199        let arc_waker = Arc::new(CommandWaker {
200            task_id,
201            ready_queue,
202            parent_waker,
203            woken: AtomicBool::new(false),
204        });
205
206        let waker = arc_waker.clone().into();
207        let context = &mut Context::from_waker(&waker);
208
209        let result = match task.future.as_mut().poll(context) {
210            Poll::Pending => TaskState::Suspended,
211            Poll::Ready(_) => TaskState::Completed,
212        };
213
214        drop(waker);
215
216        // If the task is pending, but there's only one copy of the waker - our one -
217        // it can never be woken up again so we most likely need to evict it.
218        // This happens for shell communication futures when their requests are dropped
219        //
220        // Note that there is an exception: the task may have used the waker and dropped it,
221        // making it ready, rather than abandoned.
222        let task_is_ready = arc_waker.woken.load(Ordering::Acquire);
223        if result == TaskState::Suspended && !task_is_ready && Arc::strong_count(&arc_waker) < 2 {
224            return TaskState::Cancelled;
225        }
226
227        result
228    }
229
230    pub(crate) fn spawn_new_tasks(&mut self) {
231        while let Ok(task) = self.spawn_queue.try_recv() {
232            let task_id = self.tasks.insert(task);
233
234            self.ready_sender
235                .send(TaskId(task_id))
236                .expect("Command can't spawn a task, ready_queue has disconnected");
237        }
238    }
239
240    pub fn was_aborted(&self) -> bool {
241        self.aborted.load(Ordering::Acquire)
242    }
243}