crux_core/command/executor.rs
1use super::super::Command;
2
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::atomic::Ordering;
6use std::task::{Context, Poll, Wake, Waker};
7
8use crossbeam_channel::{Receiver, Sender};
9
10use futures::future::BoxFuture;
11
12use std::sync::atomic::AtomicBool;
13
14use futures::task::AtomicWaker;
15
16use std::sync::Arc;
17
18#[derive(Clone, Copy, Debug, PartialEq)]
19pub(crate) struct TaskId(pub(crate) usize);
20
21// ANCHOR: task
22pub(crate) struct Task {
23 // Used to wake the join handle when the task concludes
24 pub(crate) join_handle_wakers: Receiver<Waker>,
25 // Set to true when the task finishes, used by the join handle
26 // RFC: is there a safe way to do this relying on the waker alone?
27 pub(crate) finished: Arc<AtomicBool>,
28 // Set to true when the task is aborted. Aborted tasks will poll Ready on the
29 // next poll
30 pub(crate) aborted: Arc<AtomicBool>,
31 // The future polled by this task
32 pub(crate) future: BoxFuture<'static, ()>,
33}
34// ANCHOR_END: task
35
36impl Task {
37 pub(crate) fn is_aborted(&self) -> bool {
38 self.aborted.load(Ordering::Acquire)
39 }
40
41 fn wake_join_handles(&self) {
42 for waker in self.join_handle_wakers.try_iter() {
43 // TODO: this potentially wakes tasks which are no longer interested
44 // and wakes tasks more than once if they await multiple copies of the same join handle
45 waker.wake();
46 }
47 }
48}
49
50// Waker provided to the tasks so they can schedule themselves to be woken
51// when their future is ready to proceed.
52// Waking a task also wakes the command itself, if it is being used as a Stream
53// inside another Command (or hosted with a CommandSink)
54// ANCHOR: command_waker
55pub(crate) struct CommandWaker {
56 pub(crate) task_id: TaskId,
57 pub(crate) ready_queue: Sender<TaskId>,
58 // Waker for the executor running this command as a Stream.
59 // When the command is executed directly (e.g. in tests) this waker
60 // will not be registered.
61 pub(crate) parent_waker: Arc<AtomicWaker>,
62 woken: AtomicBool,
63}
64
65impl Wake for CommandWaker {
66 fn wake(self: Arc<Self>) {
67 self.wake_by_ref();
68 }
69
70 fn wake_by_ref(self: &Arc<Self>) {
71 // If we can't send the id to the ready queue, there is no Command to poll the task again anyway,
72 // nothing to do.
73 // TODO: Does that mean we should bail, since waking ourselves is
74 // now pointless?
75 let _ = self.ready_queue.send(self.task_id);
76 self.woken.store(true, Ordering::Release);
77
78 // Note: calling `wake` before `register` is a no-op
79 self.parent_waker.wake();
80 }
81}
82// ANCHOR_END: command_waker
83
84/// A handle used to abort a Command remotely before it is complete
85#[derive(Clone)]
86pub struct AbortHandle {
87 pub(crate) aborted: Arc<AtomicBool>,
88}
89
90impl AbortHandle {
91 /// Abort the associated Command and all its tasks.
92 ///
93 /// The tasks will be stopped (not polled any more) at the next .await point.
94 /// If you use this, make sure the tasks the Command is running are all cancellation
95 /// safe, as they can be stopped at any of the await points or even before they are first polled
96 pub fn abort(&self) {
97 self.aborted.store(true, Ordering::Release);
98 }
99}
100
101/// A handle used to await a task completion of abort the task
102#[derive(Clone)]
103pub struct JoinHandle {
104 pub(crate) register_waker: Sender<Waker>,
105 pub(crate) finished: Arc<AtomicBool>,
106 pub(crate) aborted: Arc<AtomicBool>,
107}
108
109// RFC: I'm sure the ordering as used is fine...? Right? :) In all seriousness, how would
110// one test this to make sure it works as intended in a multi-threaded context?
111impl JoinHandle {
112 /// Abort the task associated with this join handle. The task will be aborted at the
113 /// next .await point. Any tasks this task spawned will continue running.
114 // RFC: Do we need to think more thoroughly about cancellation? For example, should
115 // the tasks have a parent-child relationship where cancelling the parent cancels all
116 // the children?
117 pub fn abort(&self) {
118 self.aborted.store(true, Ordering::Release);
119 }
120
121 pub(crate) fn is_finished(&self) -> bool {
122 self.finished.load(Ordering::Acquire)
123 }
124}
125
126impl Future for JoinHandle {
127 type Output = ();
128
129 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
130 if self.is_finished() {
131 Poll::Ready(())
132 } else {
133 match self.register_waker.send(cx.waker().clone()) {
134 Ok(()) => Poll::Pending,
135 // The task no longer exists, we report ready immediately
136 Err(_) => Poll::Ready(()),
137 }
138 }
139 }
140}
141
142#[derive(Debug, PartialEq)]
143pub(crate) enum TaskState {
144 Missing,
145 Suspended,
146 Completed,
147 Cancelled,
148}
149
150// Command is actually an async executor of sorts, similar to futures::FuturesUnordered
151impl<Effect, Event> Command<Effect, Event> {
152 // Run all tasks until all of them are pending
153 // ANCHOR: run_until_settled
154 pub(crate) fn run_until_settled(&mut self) {
155 if self.was_aborted() {
156 // Spawn new tasks to clear the spawn_queue as well
157 self.spawn_new_tasks();
158
159 self.tasks.clear();
160
161 return;
162 }
163
164 loop {
165 self.spawn_new_tasks();
166
167 if self.ready_queue.is_empty() {
168 break;
169 }
170
171 while let Ok(task_id) = self.ready_queue.try_recv() {
172 match self.run_task(task_id) {
173 TaskState::Missing | TaskState::Suspended => {
174 // Missing:
175 // The task has been evicted because it completed. This can happen when
176 // a _running_ task schedules itself to wake, but then completes and gets
177 // removed
178 // Suspended:
179 // we pick it up again when it's woken up
180 }
181 TaskState::Completed | TaskState::Cancelled => {
182 // Remove and drop the task, it's finished
183 let task = self.tasks.remove(task_id.0);
184
185 task.finished.store(true, Ordering::Release);
186 task.wake_join_handles();
187
188 drop(task);
189 }
190 }
191 }
192 }
193 }
194 // ANCHOR_END: run_until_settled
195
196 pub(crate) fn run_task(&mut self, task_id: TaskId) -> TaskState {
197 let Some(task) = self.tasks.get_mut(task_id.0) else {
198 return TaskState::Missing;
199 };
200
201 if task.is_aborted() {
202 return TaskState::Completed;
203 }
204
205 let ready_queue = self.ready_sender.clone();
206 let parent_waker = self.waker.clone();
207
208 let arc_waker = Arc::new(CommandWaker {
209 task_id,
210 ready_queue,
211 parent_waker,
212 woken: AtomicBool::new(false),
213 });
214
215 let waker = arc_waker.clone().into();
216 let context = &mut Context::from_waker(&waker);
217
218 let result = match task.future.as_mut().poll(context) {
219 Poll::Pending => TaskState::Suspended,
220 Poll::Ready(()) => TaskState::Completed,
221 };
222
223 drop(waker);
224
225 // If the task is pending, but there's only one copy of the waker - our one -
226 // it can never be woken up again so we most likely need to evict it.
227 // This happens for shell communication futures when their requests are dropped
228 //
229 // Note that there is an exception: the task may have used the waker and dropped it,
230 // making it ready, rather than abandoned.
231 let task_is_ready = arc_waker.woken.load(Ordering::Acquire);
232 if result == TaskState::Suspended && !task_is_ready && Arc::strong_count(&arc_waker) < 2 {
233 return TaskState::Cancelled;
234 }
235
236 result
237 }
238
239 pub(crate) fn spawn_new_tasks(&mut self) {
240 while let Ok(task) = self.spawn_queue.try_recv() {
241 let task_id = self.tasks.insert(task);
242
243 self.ready_sender
244 .send(TaskId(task_id))
245 .expect("Command can't spawn a task, ready_queue has disconnected");
246 }
247 }
248
249 #[must_use]
250 pub fn was_aborted(&self) -> bool {
251 self.aborted.load(Ordering::Acquire)
252 }
253}