Skip to main content

crux_core/command/
executor.rs

1#![allow(clippy::redundant_pub_crate)]
2use super::super::Command;
3
4use std::future::Future;
5use std::pin::Pin;
6use std::sync::atomic::Ordering;
7use std::task::{Context, Poll, Wake, Waker};
8
9use crossbeam_channel::{Receiver, Sender};
10
11use futures::future::BoxFuture;
12
13use std::sync::atomic::AtomicBool;
14
15use futures::task::AtomicWaker;
16
17use std::sync::Arc;
18
// Identifier of a task within a single Command. The wrapped value is the index
// used to look the task up in the Command's task storage.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) struct TaskId(pub(crate) usize);
21
22// ANCHOR: task
// A single unit of work owned by a Command: a boxed future plus the shared
// flags and waker queue used to coordinate with join handles.
pub(crate) struct Task {
    // Wakers queued by parties waiting for this task to conclude. Drained by
    // `wake_join_handles`, which wakes each of them.
    // NOTE(review): presumably the receiving end of `JoinHandle::register_waker` —
    // the pairing is set up where tasks are spawned, which is not in this file.
    pub(crate) join_handle_wakers: Receiver<Waker>,
    // Set to true when the task finishes, used by the join handle
    // RFC: is there a safe way to do this relying on the waker alone?
    pub(crate) finished: Arc<AtomicBool>,
    // Set to true when the task is aborted. An aborted task is not polled again:
    // the next time it is scheduled, `run_task` reports it as completed without
    // polling the future
    pub(crate) aborted: Arc<AtomicBool>,
    // The future polled by this task
    pub(crate) future: BoxFuture<'static, ()>,
}
35// ANCHOR_END: task
36
37impl Task {
38    pub(crate) fn is_aborted(&self) -> bool {
39        self.aborted.load(Ordering::Acquire)
40    }
41
42    fn wake_join_handles(&self) {
43        for waker in self.join_handle_wakers.try_iter() {
44            // TODO: this potentially wakes tasks which are no longer interested
45            // and wakes tasks more than once if they await multiple copies of the same join handle
46            waker.wake();
47        }
48    }
49}
50
51// Waker provided to the tasks so they can schedule themselves to be woken
52// when their future is ready to proceed.
53// Waking a task also wakes the command itself, if it is being used as a Stream
54// inside another Command (or hosted with a CommandSink)
55// ANCHOR: command_waker
pub(crate) struct CommandWaker {
    // Id of the task this waker belongs to; sent on `ready_queue` when woken
    pub(crate) task_id: TaskId,
    // Queue of task ids which are ready to be polled again
    pub(crate) ready_queue: Sender<TaskId>,
    // Waker for the executor running this command as a Stream.
    // When the command is executed directly (e.g. in tests) this waker
    // will not be registered.
    pub(crate) parent_waker: Arc<AtomicWaker>,
    // Set to true once this waker has been used. `run_task` reads it after polling
    // to tell a task which woke itself (and dropped the waker) apart from a task
    // which can never be woken again
    woken: AtomicBool,
}
65
66impl Wake for CommandWaker {
67    fn wake(self: Arc<Self>) {
68        self.wake_by_ref();
69    }
70
71    fn wake_by_ref(self: &Arc<Self>) {
72        // If we can't send the id to the ready queue, there is no Command to poll the task again anyway,
73        // nothing to do.
74        // TODO: Does that mean we should bail, since waking ourselves is
75        // now pointless?
76        let _ = self.ready_queue.send(self.task_id);
77        self.woken.store(true, Ordering::Release);
78
79        // Note: calling `wake` before `register` is a no-op
80        self.parent_waker.wake();
81    }
82}
83// ANCHOR_END: command_waker
84
/// A handle used to abort a Command remotely before it is complete
///
/// The handle is cheap to clone: all clones share the same underlying flag, so
/// calling [`AbortHandle::abort`] on any of them has the same effect.
#[derive(Clone)]
pub struct AbortHandle {
    // Shared abort flag, set by `abort`
    pub(crate) aborted: Arc<AtomicBool>,
}
90
impl AbortHandle {
    /// Abort the associated Command and all its tasks.
    ///
    /// The tasks will be stopped (not polled any more) at the next .await point.
    /// If you use this, make sure the tasks the Command is running are all cancellation
    /// safe, as they can be stopped at any of the await points or even before they are first polled
    ///
    /// This call only sets the shared abort flag and returns immediately; it does
    /// not wait for the tasks to stop.
    pub fn abort(&self) {
        self.aborted.store(true, Ordering::Release);
    }
}
101
/// A handle used to await a task completion or abort the task
///
/// Awaiting the handle resolves once the task has finished or no longer exists.
/// Clones of a handle share the same flags and waker queue.
#[derive(Clone)]
pub struct JoinHandle {
    // Channel on which the wakers of parties awaiting this handle are registered
    pub(crate) register_waker: Sender<Waker>,
    // Shared flag read by `is_finished`; awaiting the handle resolves once it is true
    pub(crate) finished: Arc<AtomicBool>,
    // Shared flag set by `abort` to request the task be aborted
    pub(crate) aborted: Arc<AtomicBool>,
}
109
110// RFC: I'm sure the ordering as used is fine...? Right? :) In all seriousness, how would
111// one test this to make sure it works as intended in a multi-threaded context?
impl JoinHandle {
    /// Abort the task associated with this join handle. The task will be aborted at the
    /// next .await point. Any tasks this task spawned will continue running.
    ///
    /// This call only sets the shared abort flag and returns immediately.
    // RFC: Do we need to think more thoroughly about cancellation? For example, should
    // the tasks have a parent-child relationship where cancelling the parent cancels all
    // the children?
    pub fn abort(&self) {
        self.aborted.store(true, Ordering::Release);
    }

    // Whether the shared `finished` flag has been set for the associated task
    pub(crate) fn is_finished(&self) -> bool {
        self.finished.load(Ordering::Acquire)
    }
}
126
127impl Future for JoinHandle {
128    type Output = ();
129
130    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
131        if self.is_finished() {
132            Poll::Ready(())
133        } else {
134            match self.register_waker.send(cx.waker().clone()) {
135                Ok(()) => Poll::Pending,
136                // The task no longer exists, we report ready immediately
137                Err(_) => Poll::Ready(()),
138            }
139        }
140    }
141}
142
// Outcome of attempting to run a single task, as reported by `run_task`.
#[derive(Debug, Eq, PartialEq)]
pub(crate) enum TaskState {
    // No task is stored under the given id
    Missing,
    // The task's future returned `Poll::Pending` and can still be woken
    Suspended,
    // The task's future returned `Poll::Ready`, or the task was aborted
    Completed,
    // The task is pending but nothing can wake it any more
    Cancelled,
}
150
151// Command is actually an async executor of sorts, similar to futures::FuturesUnordered
impl<Effect, Event> Command<Effect, Event> {
    // Run all tasks until all of them are pending
    //
    // If the command was aborted, all tasks (including ones still waiting in the
    // spawn queue) are dropped without being polled. Otherwise, tasks are spawned
    // and polled in rounds until the ready queue is empty after spawning.
    // ANCHOR: run_until_settled
    pub(crate) fn run_until_settled(&mut self) {
        if self.was_aborted() {
            // Spawn new tasks to clear the spawn_queue as well
            self.spawn_new_tasks();

            // NOTE(review): tasks dropped here are not marked `finished` and their
            // join handle wakers are not woken - confirm that nothing outside this
            // command can be left waiting on them.
            self.tasks.clear();

            return;
        }

        loop {
            // Running tasks may have spawned further tasks, so pick those up
            // at the start of every round
            self.spawn_new_tasks();

            if self.ready_queue.is_empty() {
                break;
            }

            while let Ok(task_id) = self.ready_queue.try_recv() {
                match self.run_task(task_id) {
                    TaskState::Missing | TaskState::Suspended => {
                        // Missing:
                        //   The task has been evicted because it completed.  This can happen when
                        //   a _running_ task schedules itself to wake, but then completes and gets
                        //   removed
                        // Suspended:
                        //   we pick it up again when it's woken up
                    }
                    TaskState::Completed | TaskState::Cancelled => {
                        // Remove and drop the task, it's finished
                        let task = self.tasks.remove(task_id.0);

                        // Set the flag before waking, so that woken join handles
                        // observe the task as finished when they are polled again
                        task.finished.store(true, Ordering::Release);
                        task.wake_join_handles();

                        drop(task);
                    }
                }
            }
        }
    }
    // ANCHOR_END: run_until_settled

    // Poll the task with the given id once and report what happened.
    //
    // Returns:
    // - `Missing` if no task is stored under `task_id`
    // - `Completed` if the task was aborted (it is not polled) or its future is ready
    // - `Suspended` if the future is pending and can still be woken
    // - `Cancelled` if the future is pending, but nothing holds its waker any more
    //
    // The task is never removed here, that is left to the caller.
    pub(crate) fn run_task(&mut self, task_id: TaskId) -> TaskState {
        let Some(task) = self.tasks.get_mut(task_id.0) else {
            return TaskState::Missing;
        };

        if task.is_aborted() {
            return TaskState::Completed;
        }

        let ready_queue = self.ready_sender.clone();
        let parent_waker = self.waker.clone();

        // A fresh waker is created for every poll, so that the reference count
        // checked below only reflects clones made during this poll
        let arc_waker = Arc::new(CommandWaker {
            task_id,
            ready_queue,
            parent_waker,
            woken: AtomicBool::new(false),
        });

        let waker = arc_waker.clone().into();
        let context = &mut Context::from_waker(&waker);

        let result = match task.future.as_mut().poll(context) {
            Poll::Pending => TaskState::Suspended,
            Poll::Ready(()) => TaskState::Completed,
        };

        // Release the copy handed to the future, so that `arc_waker` is the only
        // remaining reference unless the future stored a clone somewhere
        drop(waker);

        // If the task is pending, but there's only one copy of the waker - our one -
        // it can never be woken up again so we most likely need to evict it.
        // This happens for shell communication futures when their requests are dropped
        //
        // Note that there is an exception: the task may have used the waker and dropped it,
        // making it ready, rather than abandoned.
        let task_is_ready = arc_waker.woken.load(Ordering::Acquire);
        if result == TaskState::Suspended && !task_is_ready && Arc::strong_count(&arc_waker) < 2 {
            return TaskState::Cancelled;
        }

        result
    }

    // Move all tasks waiting in the spawn queue into the task storage and
    // schedule each of them for its first poll.
    //
    // Panics if the ready queue has disconnected.
    pub(crate) fn spawn_new_tasks(&mut self) {
        while let Ok(task) = self.spawn_queue.try_recv() {
            let task_id = self.tasks.insert(task);

            self.ready_sender
                .send(TaskId(task_id))
                .expect("Command can't spawn a task, ready_queue has disconnected");
        }
    }

    /// Returns `true` if the abort flag of this Command has been set.
    #[must_use]
    pub fn was_aborted(&self) -> bool {
        self.aborted.load(Ordering::Acquire)
    }
}