crux_core/command/executor.rs
1#![allow(clippy::redundant_pub_crate)]
2use super::super::Command;
3
4use std::future::Future;
5use std::pin::Pin;
6use std::sync::atomic::Ordering;
7use std::task::{Context, Poll, Wake, Waker};
8
9use crossbeam_channel::{Receiver, Sender};
10
11use futures::future::BoxFuture;
12
13use std::sync::atomic::AtomicBool;
14
15use futures::task::AtomicWaker;
16
17use std::sync::Arc;
18
// Identifies a task held by a Command; wraps the `usize` key under which the
// task is stored.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct TaskId(pub(crate) usize);
21
22// ANCHOR: task
// A single unit of work run by a Command: the boxed future to poll, plus the
// shared flags and waker channel used to coordinate with its join handles.
pub(crate) struct Task {
    // Wakers registered by join handles awaiting this task. They are drained and
    // woken by `wake_join_handles` when the task concludes
    pub(crate) join_handle_wakers: Receiver<Waker>,
    // Set to true when the task finishes, used by the join handle
    // RFC: is there a safe way to do this relying on the waker alone?
    pub(crate) finished: Arc<AtomicBool>,
    // Set to true when the task is aborted. An aborted task is not polled again:
    // `run_task` reports it as completed the next time it is run
    pub(crate) aborted: Arc<AtomicBool>,
    // The future polled by this task
    pub(crate) future: BoxFuture<'static, ()>,
}
35// ANCHOR_END: task
36
37impl Task {
38 pub(crate) fn is_aborted(&self) -> bool {
39 self.aborted.load(Ordering::Acquire)
40 }
41
42 fn wake_join_handles(&self) {
43 for waker in self.join_handle_wakers.try_iter() {
44 // TODO: this potentially wakes tasks which are no longer interested
45 // and wakes tasks more than once if they await multiple copies of the same join handle
46 waker.wake();
47 }
48 }
49}
50
51// Waker provided to the tasks so they can schedule themselves to be woken
52// when their future is ready to proceed.
53// Waking a task also wakes the command itself, if it is being used as a Stream
54// inside another Command (or hosted with a CommandSink)
55// ANCHOR: command_waker
pub(crate) struct CommandWaker {
    // The task this waker belongs to; this is the id sent to the ready queue on wake
    pub(crate) task_id: TaskId,
    // Sending half of the ready queue; waking pushes `task_id` onto it so the
    // task gets polled again
    pub(crate) ready_queue: Sender<TaskId>,
    // Waker for the executor running this command as a Stream.
    // When the command is executed directly (e.g. in tests) this waker
    // will not be registered.
    pub(crate) parent_waker: Arc<AtomicWaker>,
    // Set to true once this waker has been used. `run_task` reads it after polling
    // to tell a task which has already been woken from one which was abandoned
    woken: AtomicBool,
}
65
66impl Wake for CommandWaker {
67 fn wake(self: Arc<Self>) {
68 self.wake_by_ref();
69 }
70
71 fn wake_by_ref(self: &Arc<Self>) {
72 // If we can't send the id to the ready queue, there is no Command to poll the task again anyway,
73 // nothing to do.
74 // TODO: Does that mean we should bail, since waking ourselves is
75 // now pointless?
76 let _ = self.ready_queue.send(self.task_id);
77 self.woken.store(true, Ordering::Release);
78
79 // Note: calling `wake` before `register` is a no-op
80 self.parent_waker.wake();
81 }
82}
83// ANCHOR_END: command_waker
84
/// A handle used to abort a Command remotely before it is complete.
///
/// Cloning the handle is cheap: every clone shares the same underlying flag.
#[derive(Clone)]
pub struct AbortHandle {
    // Shared abort flag, set by `abort`.
    // NOTE(review): presumably the same flag `Command::was_aborted` reads - confirm
    // where the handle is created
    pub(crate) aborted: Arc<AtomicBool>,
}
90
impl AbortHandle {
    /// Abort the associated Command and all its tasks.
    ///
    /// The tasks will be stopped (not polled any more) at the next .await point.
    /// If you use this, make sure the tasks the Command is running are all cancellation
    /// safe, as they can be stopped at any of the await points or even before they are
    /// first polled.
    ///
    /// Calling this only raises the shared abort flag; it does not block or wait
    /// for the tasks to be dropped.
    pub fn abort(&self) {
        self.aborted.store(true, Ordering::Release);
    }
}
101
/// A handle used to await a task's completion or abort the task.
///
/// Cloning the handle shares the same channel and flags, so every clone refers
/// to the same task.
#[derive(Clone)]
pub struct JoinHandle {
    // Used to hand this handle's waker over when it is polled before the task finished.
    // NOTE(review): presumably the sending side of `Task::join_handle_wakers` - confirm
    // where the handle is created
    pub(crate) register_waker: Sender<Waker>,
    // True once the task has finished; checked when the handle is polled
    pub(crate) finished: Arc<AtomicBool>,
    // Set by `abort` to request that the task is stopped
    pub(crate) aborted: Arc<AtomicBool>,
}
109
110// RFC: I'm sure the ordering as used is fine...? Right? :) In all seriousness, how would
111// one test this to make sure it works as intended in a multi-threaded context?
impl JoinHandle {
    /// Abort the task associated with this join handle. The task will be aborted at the
    /// next .await point. Any tasks this task spawned will continue running.
    ///
    /// Calling this only raises the abort flag; it does not wait for the task to stop.
    // RFC: Do we need to think more thoroughly about cancellation? For example, should
    // the tasks have a parent-child relationship where cancelling the parent cancels all
    // the children?
    pub fn abort(&self) {
        self.aborted.store(true, Ordering::Release);
    }

    // Whether the `finished` flag has been set for the task
    pub(crate) fn is_finished(&self) -> bool {
        self.finished.load(Ordering::Acquire)
    }
}
126
127impl Future for JoinHandle {
128 type Output = ();
129
130 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
131 if self.is_finished() {
132 Poll::Ready(())
133 } else {
134 match self.register_waker.send(cx.waker().clone()) {
135 Ok(()) => Poll::Pending,
136 // The task no longer exists, we report ready immediately
137 Err(_) => Poll::Ready(()),
138 }
139 }
140 }
141}
142
// Outcome of running a single task once, as reported by `run_task`
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum TaskState {
    // No task is stored under the given id
    Missing,
    // The future returned `Poll::Pending` and can still be woken
    Suspended,
    // The future returned `Poll::Ready`, or the task was aborted before being polled
    Completed,
    // The future is pending, but it was not woken and no copy of its waker is
    // left, so it can never be woken again
    Cancelled,
}
150
151// Command is actually an async executor of sorts, similar to futures::FuturesUnordered
152impl<Effect, Event> Command<Effect, Event> {
153 // Run all tasks until all of them are pending
154 // ANCHOR: run_until_settled
155 pub(crate) fn run_until_settled(&mut self) {
156 if self.was_aborted() {
157 // Spawn new tasks to clear the spawn_queue as well
158 self.spawn_new_tasks();
159
160 self.tasks.clear();
161
162 return;
163 }
164
165 loop {
166 self.spawn_new_tasks();
167
168 if self.ready_queue.is_empty() {
169 break;
170 }
171
172 while let Ok(task_id) = self.ready_queue.try_recv() {
173 match self.run_task(task_id) {
174 TaskState::Missing | TaskState::Suspended => {
175 // Missing:
176 // The task has been evicted because it completed. This can happen when
177 // a _running_ task schedules itself to wake, but then completes and gets
178 // removed
179 // Suspended:
180 // we pick it up again when it's woken up
181 }
182 TaskState::Completed | TaskState::Cancelled => {
183 // Remove and drop the task, it's finished
184 let task = self.tasks.remove(task_id.0);
185
186 task.finished.store(true, Ordering::Release);
187 task.wake_join_handles();
188
189 drop(task);
190 }
191 }
192 }
193 }
194 }
195 // ANCHOR_END: run_until_settled
196
197 pub(crate) fn run_task(&mut self, task_id: TaskId) -> TaskState {
198 let Some(task) = self.tasks.get_mut(task_id.0) else {
199 return TaskState::Missing;
200 };
201
202 if task.is_aborted() {
203 return TaskState::Completed;
204 }
205
206 let ready_queue = self.ready_sender.clone();
207 let parent_waker = self.waker.clone();
208
209 let arc_waker = Arc::new(CommandWaker {
210 task_id,
211 ready_queue,
212 parent_waker,
213 woken: AtomicBool::new(false),
214 });
215
216 let waker = arc_waker.clone().into();
217 let context = &mut Context::from_waker(&waker);
218
219 let result = match task.future.as_mut().poll(context) {
220 Poll::Pending => TaskState::Suspended,
221 Poll::Ready(()) => TaskState::Completed,
222 };
223
224 drop(waker);
225
226 // If the task is pending, but there's only one copy of the waker - our one -
227 // it can never be woken up again so we most likely need to evict it.
228 // This happens for shell communication futures when their requests are dropped
229 //
230 // Note that there is an exception: the task may have used the waker and dropped it,
231 // making it ready, rather than abandoned.
232 let task_is_ready = arc_waker.woken.load(Ordering::Acquire);
233 if result == TaskState::Suspended && !task_is_ready && Arc::strong_count(&arc_waker) < 2 {
234 return TaskState::Cancelled;
235 }
236
237 result
238 }
239
240 pub(crate) fn spawn_new_tasks(&mut self) {
241 while let Ok(task) = self.spawn_queue.try_recv() {
242 let task_id = self.tasks.insert(task);
243
244 self.ready_sender
245 .send(TaskId(task_id))
246 .expect("Command can't spawn a task, ready_queue has disconnected");
247 }
248 }
249
250 #[must_use]
251 pub fn was_aborted(&self) -> bool {
252 self.aborted.load(Ordering::Acquire)
253 }
254}