crux_core/capability/executor.rs

use std::{
    sync::{Arc, Mutex},
    task::{Context, Wake},
};

use crossbeam_channel::{Receiver, Sender};
use futures::{future, Future, FutureExt};
use slab::Slab;

type BoxFuture = future::BoxFuture<'static, ()>;

// used in docs/internals/runtime.md
// ANCHOR: executor
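/// A queue-based async executor for the capability runtime.
///
/// Newly spawned futures arrive on `spawn_queue` and are stored in the `tasks`
/// slab; a task is polled again whenever its `TaskId` appears on `ready_queue`,
/// which is where the corresponding `TaskWaker` sends it. A slot holds `None`
/// while its future is checked out for polling, which is how concurrent calls
/// to `run_all` on different threads avoid polling the same future twice.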
pub(crate) struct QueuingExecutor {
    spawn_queue: Receiver<BoxFuture>,
    ready_queue: Receiver<TaskId>,
    ready_sender: Sender<TaskId>,
    tasks: Mutex<Slab<Option<BoxFuture>>>,
}
// ANCHOR_END: executor

// used in docs/internals/runtime.md
// ANCHOR: spawner
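/// A cloneable handle for spawning futures onto the paired `QueuingExecutor`.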
#[derive(Clone)]
pub struct Spawner {
    future_sender: Sender<BoxFuture>,
}
// ANCHOR_END: spawner

#[derive(Clone, Copy, Debug)]
struct TaskId(u32);

impl std::ops::Deref for TaskId {
    type Target = u32;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

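/// Creates a `QueuingExecutor` and a `Spawner` connected by a shared channel,
/// so that futures spawned via the `Spawner` are picked up on the executor's
/// next call to `run_all`.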
pub(crate) fn executor_and_spawner() -> (QueuingExecutor, Spawner) {
    let (future_sender, spawn_queue) = crossbeam_channel::unbounded();
    let (ready_sender, ready_queue) = crossbeam_channel::unbounded();

    (
        QueuingExecutor {
            ready_queue,
            spawn_queue,
            ready_sender,
            tasks: Mutex::new(Slab::new()),
        },
        Spawner { future_sender },
    )
}

// used in docs/internals/runtime.md
// ANCHOR: spawning
impl Spawner {
    pub fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let future = future.boxed();
        self.future_sender
            .send(future)
            .expect("unable to spawn an async task, task sender channel is disconnected.")
    }
}
// ANCHOR_END: spawning

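/// Waker for a single task: waking re-queues the task's `TaskId` on the
/// executor's ready queue so that a subsequent `run_all` polls it again.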
#[derive(Clone)]
struct TaskWaker {
    task_id: TaskId,
    sender: Sender<TaskId>,
}

// used in docs/internals/runtime.md
// ANCHOR: wake
impl Wake for TaskWaker {
    fn wake(self: Arc<Self>) {
        self.wake_by_ref();
    }

    fn wake_by_ref(self: &Arc<Self>) {
        // This send can fail if the executor has been dropped,
        // in which case there is nothing to do.
        let _ = self.sender.send(self.task_id);
    }
}
// ANCHOR_END: wake

// used in docs/internals/runtime.md
// ANCHOR: run_all
impl QueuingExecutor {
    pub fn run_all(&self) {
        // we read off both queues and execute the tasks we receive.
        // Since either queue can generate work for the other queue,
        // we read from them in a loop until we are sure both queues
        // are exhausted
        let mut did_some_work = true;

        while did_some_work {
            did_some_work = false;
            while let Ok(task) = self.spawn_queue.try_recv() {
                let task_id = self
                    .tasks
                    .lock()
                    .expect("Task slab poisoned")
                    .insert(Some(task));
                self.run_task(TaskId(task_id.try_into().expect("TaskId overflow")));
                did_some_work = true;
            }
            while let Ok(task_id) = self.ready_queue.try_recv() {
                match self.run_task(task_id) {
                    RunTask::Unavailable => {
                        // We were unable to run the task as it is (presumably) being run on
                        // another thread. We re-queue the task for 'later' and do NOT set
                        // `did_some_work = true`. That way we will keep looping and doing work
                        // until all remaining work is 'unavailable', at which point we will bail
                        // out of the loop, leaving the queued work to be finished by another thread.
                        // This strategy should avoid dropping work or busy-looping
                        // FIXME: are we potentially sending ourselves `Unavailable` and reading it
                        // in a loop - busy looping here?
                        self.ready_sender.send(task_id).expect("could not requeue");
                    }
                    RunTask::Missing => {
                        // This is possible if a naughty future sends a wake notification while
                        // still running, then runs to completion and is evicted from the slab.
                        // Nothing to be done.
                    }
                    RunTask::Suspended | RunTask::Completed => did_some_work = true,
                }
            }
        }
    }

    fn run_task(&self, task_id: TaskId) -> RunTask {
        let mut lock = self.tasks.lock().expect("Task slab poisoned");
        let Some(task) = lock.get_mut(*task_id as usize) else {
            return RunTask::Missing;
        };
        let Some(mut task) = task.take() else {
            // the slot exists but the task is missing - presumably it
            // is being executed on another thread
            return RunTask::Unavailable;
        };

        // free the mutex so other threads can make progress
        drop(lock);

        let waker = Arc::new(TaskWaker {
            task_id,
            sender: self.ready_sender.clone(),
        })
        .into();
        let context = &mut Context::from_waker(&waker);

        // poll the task
        if task.as_mut().poll(context).is_pending() {
            // If it's still pending, put the future back in the slot
            self.tasks
                .lock()
                .expect("Task slab poisoned")
                .get_mut(*task_id as usize)
                .expect("Task slot is missing")
                .replace(task);
            RunTask::Suspended
        } else {
            // otherwise the future is completed and we can free the slot
            self.tasks.lock().unwrap().remove(*task_id as usize);
            RunTask::Completed
        }
    }
}

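/// The outcome of a single `run_task` call:
/// - `Missing`: no slot for this id; the task already completed and was removed.
/// - `Unavailable`: the slot exists, but the future is checked out, presumably
///   being polled on another thread.
/// - `Suspended`: the future was polled and returned `Pending`.
/// - `Completed`: the future resolved and its slot was freed.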
enum RunTask {
    Missing,
    Unavailable,
    Suspended,
    Completed,
}

// ANCHOR_END: run_all

#[cfg(test)]
mod tests {

    use rand::Rng;
    use std::{
        sync::atomic::{AtomicI32, Ordering},
        task::Poll,
    };

    use super::*;
    use crate::capability::shell_request::ShellRequest;

    #[test]
    fn test_task_does_not_leak() {
        // Arc is a convenient RAII counter
        let counter = Arc::new(());
        assert_eq!(Arc::strong_count(&counter), 1);

        let (executor, spawner) = executor_and_spawner();

        let future = {
            let counter = counter.clone();
            async move {
                assert_eq!(Arc::strong_count(&counter), 2);
                ShellRequest::<()>::new().await;
            }
        };

        spawner.spawn(future);
        executor.run_all();
        drop(executor);
        drop(spawner);
        assert_eq!(Arc::strong_count(&counter), 1);
    }
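
    // A minimal usage sketch, added for illustration (this test and its names
    // are ours, not part of the original suite): an immediately-ready future
    // should be driven to completion by a single `run_all`, leaving both
    // queues empty and its slot freed from the slab.
    #[test]
    fn test_ready_future_completes_in_one_run() {
        let (executor, spawner) = executor_and_spawner();

        let ran = Arc::new(AtomicI32::new(0));
        let ran_in_task = ran.clone();
        spawner.spawn(async move {
            ran_in_task.fetch_add(1, Ordering::SeqCst);
        });

        executor.run_all();

        assert_eq!(ran.load(Ordering::SeqCst), 1);
        assert_eq!(executor.spawn_queue.len(), 0);
        assert_eq!(executor.ready_queue.len(), 0);
        assert!(executor.tasks.lock().unwrap().is_empty());
    }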

    #[test]
    fn test_multithreaded_executor() {
        // We define a future which chaotically sends notifications to wake up the task.
        // The future has a random chance to suspend or to defer to its children, which
        // may also suspend. However, it will ultimately resolve to `Ready` and, once it
        // has done so, will stay finished.
        struct Chaotic {
            ready_once: bool,
            children: Vec<Chaotic>,
        }

        static CHAOS_COUNT: AtomicI32 = AtomicI32::new(0);

        impl Chaotic {
            fn new_with_children(num_children: usize) -> Self {
                CHAOS_COUNT.fetch_add(1, Ordering::SeqCst);
                Self {
                    ready_once: false,
                    children: (0..num_children)
                        .map(|_| Chaotic::new_with_children(num_children - 1))
                        .collect(),
                }
            }
        }

        impl Future for Chaotic {
            type Output = ();

            fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
                // once we're done, we're done
                if self.ready_once {
                    return Poll::Ready(());
                }
                if rand::rng().random_bool(0.1) {
                    cx.waker().wake_by_ref();

                    Poll::Pending
                } else {
                    let mut ready = true;
                    let this = self.get_mut();
                    for child in &mut this.children {
                        if child.poll_unpin(cx).is_pending() {
                            ready = false;
                        }
                    }
                    if ready {
                        this.ready_once = true;
                        // throw a wake in for extra chaos
                        cx.waker().wake_by_ref();
                        CHAOS_COUNT.fetch_sub(1, Ordering::SeqCst);
                        Poll::Ready(())
                    } else {
                        Poll::Pending
                    }
                }
            }
        }

        let (executor, spawner) = executor_and_spawner();
        // 100 futures, each a tree of 1957 Chaotic nodes, equals lots of chaos
        for _ in 0..100 {
            let future = Chaotic::new_with_children(6);
            spawner.spawn(future);
        }
        assert_eq!(CHAOS_COUNT.load(Ordering::SeqCst), 195700);
        let executor = Arc::new(executor);
        assert_eq!(executor.spawn_queue.len(), 100);

        // Spawn 10 threads and run all
        let handles = (0..10)
            .map(|_| {
                let executor = executor.clone();
                std::thread::spawn(move || {
                    executor.run_all();
                })
            })
            .collect::<Vec<_>>();
        for handle in handles {
            handle.join().unwrap();
        }
        // nothing left in either queue, all futures resolved
        assert_eq!(executor.spawn_queue.len(), 0);
        assert_eq!(executor.ready_queue.len(), 0);
        assert_eq!(CHAOS_COUNT.load(Ordering::SeqCst), 0);
    }
}