crux_core/command/
executor.rs1use super::super::Command;
2
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::atomic::Ordering;
6use std::task::{Context, Poll, Wake, Waker};
7
8use crossbeam_channel::{Receiver, Sender};
9
10use futures::future::BoxFuture;
11
12use std::sync::atomic::AtomicBool;
13
14use futures::task::AtomicWaker;
15
16use std::sync::Arc;
17
/// Identifier of a task owned by a `Command`.
///
/// The wrapped `usize` is the key under which the task is stored in the
/// command's task collection: it is produced when a task is inserted and used
/// to look the task up or remove it again.
#[derive(Debug, Clone, Copy, PartialEq)]
pub(crate) struct TaskId(pub(crate) usize);
20
21pub(crate) struct Task {
22 pub(crate) join_handle_wakers: Receiver<Waker>,
24 pub(crate) finished: Arc<AtomicBool>,
27 pub(crate) aborted: Arc<AtomicBool>,
30 pub(crate) future: BoxFuture<'static, ()>,
32}
33
34impl Task {
35 pub(crate) fn is_aborted(&self) -> bool {
36 self.aborted.load(Ordering::Acquire)
37 }
38
39 fn wake_join_handles(&self) {
40 for waker in self.join_handle_wakers.try_iter() {
41 waker.wake();
44 }
45 }
46}
47
48pub(crate) struct CommandWaker {
53 pub(crate) task_id: TaskId,
54 pub(crate) ready_queue: Sender<TaskId>,
55 pub(crate) parent_waker: Arc<AtomicWaker>,
59 woken: AtomicBool,
60}
61
62impl Wake for CommandWaker {
63 fn wake(self: Arc<Self>) {
64 self.wake_by_ref();
65 }
66
67 fn wake_by_ref(self: &Arc<Self>) {
68 let _ = self.ready_queue.send(self.task_id);
73 self.woken.store(true, Ordering::Release);
74
75 self.parent_waker.wake();
77 }
78}
79
/// A cloneable handle that can raise a shared "aborted" flag.
///
/// All clones share the same flag, so aborting through any of them is
/// observed by everyone holding that flag.
/// NOTE(review): presumably the flag is the one `Command::was_aborted` reads —
/// confirm where the handle is created.
#[derive(Clone)]
pub struct AbortHandle {
    pub(crate) aborted: Arc<AtomicBool>,
}

impl AbortHandle {
    /// Sets the shared `aborted` flag to `true`.
    ///
    /// Calling this more than once has no additional effect; the flag is never
    /// reset by this type.
    pub fn abort(&self) {
        let flag: &AtomicBool = &self.aborted;
        flag.store(true, Ordering::Release);
    }
}
96
97#[derive(Clone)]
99pub struct JoinHandle {
100 pub(crate) register_waker: Sender<Waker>,
101 pub(crate) finished: Arc<AtomicBool>,
102 pub(crate) aborted: Arc<AtomicBool>,
103}
104
105impl JoinHandle {
108 pub fn abort(&self) {
114 self.aborted.store(true, Ordering::Release);
115 }
116
117 pub(crate) fn is_finished(&self) -> bool {
118 self.finished.load(Ordering::Acquire)
119 }
120}
121
122impl Future for JoinHandle {
123 type Output = ();
124
125 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
126 if self.is_finished() {
127 Poll::Ready(())
128 } else {
129 match self.register_waker.send(cx.waker().clone()) {
130 Ok(_) => Poll::Pending,
131 Err(_) => Poll::Ready(()),
133 }
134 }
135 }
136}
137
/// Outcome of asking the executor to run a task once.
#[derive(PartialEq, Debug)]
pub(crate) enum TaskState {
    /// No task is stored under the requested id.
    Missing,
    /// The task's future returned `Pending` and can still be woken.
    Suspended,
    /// The task's future returned `Ready`, or the task had been aborted.
    Completed,
    /// The task's future returned `Pending`, but it was not woken during the
    /// poll and no clone of its waker was kept, so it cannot be woken again.
    Cancelled,
}
145
146impl<Effect, Event> Command<Effect, Event> {
148 pub(crate) fn run_until_settled(&mut self) {
150 if self.was_aborted() {
151 self.tasks.clear();
152
153 return;
154 }
155
156 loop {
157 self.spawn_new_tasks();
158
159 if self.ready_queue.is_empty() {
160 break;
161 }
162
163 while let Ok(task_id) = self.ready_queue.try_recv() {
164 match self.run_task(task_id) {
165 TaskState::Missing => {
166 }
170 TaskState::Suspended => {
171 }
173 TaskState::Completed | TaskState::Cancelled => {
174 let task = self.tasks.remove(task_id.0);
176
177 task.finished.store(true, Ordering::Release);
178 task.wake_join_handles();
179
180 drop(task);
181 }
182 };
183 }
184 }
185 }
186
187 pub(crate) fn run_task(&mut self, task_id: TaskId) -> TaskState {
188 let Some(task) = self.tasks.get_mut(task_id.0) else {
189 return TaskState::Missing;
190 };
191
192 if task.is_aborted() {
193 return TaskState::Completed;
194 }
195
196 let ready_queue = self.ready_sender.clone();
197 let parent_waker = self.waker.clone();
198
199 let arc_waker = Arc::new(CommandWaker {
200 task_id,
201 ready_queue,
202 parent_waker,
203 woken: AtomicBool::new(false),
204 });
205
206 let waker = arc_waker.clone().into();
207 let context = &mut Context::from_waker(&waker);
208
209 let result = match task.future.as_mut().poll(context) {
210 Poll::Pending => TaskState::Suspended,
211 Poll::Ready(_) => TaskState::Completed,
212 };
213
214 drop(waker);
215
216 let task_is_ready = arc_waker.woken.load(Ordering::Acquire);
223 if result == TaskState::Suspended && !task_is_ready && Arc::strong_count(&arc_waker) < 2 {
224 return TaskState::Cancelled;
225 }
226
227 result
228 }
229
230 pub(crate) fn spawn_new_tasks(&mut self) {
231 while let Ok(task) = self.spawn_queue.try_recv() {
232 let task_id = self.tasks.insert(task);
233
234 self.ready_sender
235 .send(TaskId(task_id))
236 .expect("Command can't spawn a task, ready_queue has disconnected");
237 }
238 }
239
240 pub fn was_aborted(&self) -> bool {
241 self.aborted.load(Ordering::Acquire)
242 }
243}