Skip to main content

crux_core/command/
executor.rs

1use super::super::Command;
2
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::atomic::Ordering;
6use std::task::{Context, Poll, Wake, Waker};
7
8use crossbeam_channel::{Receiver, Sender};
9
10use futures::future::BoxFuture;
11
12use std::sync::atomic::AtomicBool;
13
14use futures::task::AtomicWaker;
15
16use std::sync::Arc;
17
18#[derive(Clone, Copy, Debug, PartialEq)]
19pub(crate) struct TaskId(pub(crate) usize);
20
21// ANCHOR: task
22pub(crate) struct Task {
23    // Used to wake the join handle when the task concludes
24    pub(crate) join_handle_wakers: Receiver<Waker>,
25    // Set to true when the task finishes, used by the join handle
26    // RFC: is there a safe way to do this relying on the waker alone?
27    pub(crate) finished: Arc<AtomicBool>,
28    // Set to true when the task is aborted. Aborted tasks will poll Ready on the
29    // next poll
30    pub(crate) aborted: Arc<AtomicBool>,
31    // The future polled by this task
32    pub(crate) future: BoxFuture<'static, ()>,
33}
34// ANCHOR_END: task
35
36impl Task {
37    pub(crate) fn is_aborted(&self) -> bool {
38        self.aborted.load(Ordering::Acquire)
39    }
40
41    fn wake_join_handles(&self) {
42        for waker in self.join_handle_wakers.try_iter() {
43            // TODO: this potentially wakes tasks which are no longer interested
44            // and wakes tasks more than once if they await multiple copies of the same join handle
45            waker.wake();
46        }
47    }
48}
49
50// Waker provided to the tasks so they can schedule themselves to be woken
51// when their future is ready to proceed.
52// Waking a task also wakes the command itself, if it is being used as a Stream
53// inside another Command (or hosted with a CommandSink)
54// ANCHOR: command_waker
55pub(crate) struct CommandWaker {
56    pub(crate) task_id: TaskId,
57    pub(crate) ready_queue: Sender<TaskId>,
58    // Waker for the executor running this command as a Stream.
59    // When the command is executed directly (e.g. in tests) this waker
60    // will not be registered.
61    pub(crate) parent_waker: Arc<AtomicWaker>,
62    woken: AtomicBool,
63}
64
65impl Wake for CommandWaker {
66    fn wake(self: Arc<Self>) {
67        self.wake_by_ref();
68    }
69
70    fn wake_by_ref(self: &Arc<Self>) {
71        // If we can't send the id to the ready queue, there is no Command to poll the task again anyway,
72        // nothing to do.
73        // TODO: Does that mean we should bail, since waking ourselves is
74        // now pointless?
75        let _ = self.ready_queue.send(self.task_id);
76        self.woken.store(true, Ordering::Release);
77
78        // Note: calling `wake` before `register` is a no-op
79        self.parent_waker.wake();
80    }
81}
82// ANCHOR_END: command_waker
83
84/// A handle used to abort a Command remotely before it is complete
85#[derive(Clone)]
86pub struct AbortHandle {
87    pub(crate) aborted: Arc<AtomicBool>,
88}
89
90impl AbortHandle {
91    /// Abort the associated Command and all its tasks.
92    ///
93    /// The tasks will be stopped (not polled any more) at the next .await point.
94    /// If you use this, make sure the tasks the Command is running are all cancellation
95    /// safe, as they can be stopped at any of the await points or even before they are first polled
96    pub fn abort(&self) {
97        self.aborted.store(true, Ordering::Release);
98    }
99}
100
101/// A handle used to await a task completion of abort the task
102#[derive(Clone)]
103pub struct JoinHandle {
104    pub(crate) register_waker: Sender<Waker>,
105    pub(crate) finished: Arc<AtomicBool>,
106    pub(crate) aborted: Arc<AtomicBool>,
107}
108
109// RFC: I'm sure the ordering as used is fine...? Right? :) In all seriousness, how would
110// one test this to make sure it works as intended in a multi-threaded context?
111impl JoinHandle {
112    /// Abort the task associated with this join handle. The task will be aborted at the
113    /// next .await point. Any tasks this task spawned will continue running.
114    // RFC: Do we need to think more thoroughly about cancellation? For example, should
115    // the tasks have a parent-child relationship where cancelling the parent cancels all
116    // the children?
117    pub fn abort(&self) {
118        self.aborted.store(true, Ordering::Release);
119    }
120
121    pub(crate) fn is_finished(&self) -> bool {
122        self.finished.load(Ordering::Acquire)
123    }
124}
125
126impl Future for JoinHandle {
127    type Output = ();
128
129    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
130        if self.is_finished() {
131            Poll::Ready(())
132        } else {
133            match self.register_waker.send(cx.waker().clone()) {
134                Ok(()) => Poll::Pending,
135                // The task no longer exists, we report ready immediately
136                Err(_) => Poll::Ready(()),
137            }
138        }
139    }
140}
141
142#[derive(Debug, PartialEq)]
143pub(crate) enum TaskState {
144    Missing,
145    Suspended,
146    Completed,
147    Cancelled,
148}
149
150// Command is actually an async executor of sorts, similar to futures::FuturesUnordered
151impl<Effect, Event> Command<Effect, Event> {
152    // Run all tasks until all of them are pending
153    // ANCHOR: run_until_settled
154    pub(crate) fn run_until_settled(&mut self) {
155        if self.was_aborted() {
156            // Spawn new tasks to clear the spawn_queue as well
157            self.spawn_new_tasks();
158
159            self.tasks.clear();
160
161            return;
162        }
163
164        loop {
165            self.spawn_new_tasks();
166
167            if self.ready_queue.is_empty() {
168                break;
169            }
170
171            while let Ok(task_id) = self.ready_queue.try_recv() {
172                match self.run_task(task_id) {
173                    TaskState::Missing | TaskState::Suspended => {
174                        // Missing:
175                        //   The task has been evicted because it completed.  This can happen when
176                        //   a _running_ task schedules itself to wake, but then completes and gets
177                        //   removed
178                        // Suspended:
179                        //   we pick it up again when it's woken up
180                    }
181                    TaskState::Completed | TaskState::Cancelled => {
182                        // Remove and drop the task, it's finished
183                        let task = self.tasks.remove(task_id.0);
184
185                        task.finished.store(true, Ordering::Release);
186                        task.wake_join_handles();
187
188                        drop(task);
189                    }
190                }
191            }
192        }
193    }
194    // ANCHOR_END: run_until_settled
195
196    pub(crate) fn run_task(&mut self, task_id: TaskId) -> TaskState {
197        let Some(task) = self.tasks.get_mut(task_id.0) else {
198            return TaskState::Missing;
199        };
200
201        if task.is_aborted() {
202            return TaskState::Completed;
203        }
204
205        let ready_queue = self.ready_sender.clone();
206        let parent_waker = self.waker.clone();
207
208        let arc_waker = Arc::new(CommandWaker {
209            task_id,
210            ready_queue,
211            parent_waker,
212            woken: AtomicBool::new(false),
213        });
214
215        let waker = arc_waker.clone().into();
216        let context = &mut Context::from_waker(&waker);
217
218        let result = match task.future.as_mut().poll(context) {
219            Poll::Pending => TaskState::Suspended,
220            Poll::Ready(()) => TaskState::Completed,
221        };
222
223        drop(waker);
224
225        // If the task is pending, but there's only one copy of the waker - our one -
226        // it can never be woken up again so we most likely need to evict it.
227        // This happens for shell communication futures when their requests are dropped
228        //
229        // Note that there is an exception: the task may have used the waker and dropped it,
230        // making it ready, rather than abandoned.
231        let task_is_ready = arc_waker.woken.load(Ordering::Acquire);
232        if result == TaskState::Suspended && !task_is_ready && Arc::strong_count(&arc_waker) < 2 {
233            return TaskState::Cancelled;
234        }
235
236        result
237    }
238
239    pub(crate) fn spawn_new_tasks(&mut self) {
240        while let Ok(task) = self.spawn_queue.try_recv() {
241            let task_id = self.tasks.insert(task);
242
243            self.ready_sender
244                .send(TaskId(task_id))
245                .expect("Command can't spawn a task, ready_queue has disconnected");
246        }
247    }
248
249    #[must_use]
250    pub fn was_aborted(&self) -> bool {
251        self.aborted.load(Ordering::Acquire)
252    }
253}