crux_core/command/
context.rs1use std::future::Future;
2use std::pin::{pin, Pin};
3
4use std::sync::Arc;
5use std::task::{Context, Poll};
6
7use crossbeam_channel::Sender;
8use futures::channel::mpsc;
9use futures::future::Fuse;
10use futures::stream::StreamFuture;
11use futures::{FutureExt as _, Stream, StreamExt};
12
13use crate::capability::Operation;
14use crate::Request;
15
16use super::executor::{JoinHandle, Task};
17
18pub struct CommandContext<Effect, Event> {
21 pub(crate) effects: Sender<Effect>,
22 pub(crate) events: Sender<Event>,
23 pub(crate) tasks: Sender<Task>,
24}
25
26impl<Effect, Event> Clone for CommandContext<Effect, Event> {
28 fn clone(&self) -> Self {
29 Self {
30 effects: self.effects.clone(),
31 events: self.events.clone(),
32 tasks: self.tasks.clone(),
33 }
34 }
35}
36
37impl<Effect, Event> CommandContext<Effect, Event> {
38 #[allow(clippy::missing_panics_doc)]
40 pub fn notify_shell<Op>(&self, operation: Op)
41 where
42 Op: Operation,
43 Effect: From<Request<Op>>,
44 {
45 let request = Request::resolves_never(operation);
46
47 self.effects
48 .send(request.into())
49 .expect("Command could not send notification, effect channel disconnected");
50 }
51
52 #[allow(clippy::missing_panics_doc)]
55 pub fn request_from_shell<Op>(&self, operation: Op) -> ShellRequest<Op::Output>
56 where
57 Op: Operation,
58 Effect: From<Request<Op>> + Send + 'static,
59 {
60 let (output_sender, output_receiver) = mpsc::unbounded();
61
62 let request = Request::resolves_once(operation, move |output| {
63 let _ = output_sender.unbounded_send(output);
65 });
66
67 let send_request = {
68 let effect = request.into();
69 let effects = self.effects.clone();
70 move || {
71 effects
72 .send(effect)
73 .expect("Command could not send request effect, effect channel disconnected");
74 }
75 };
76
77 ShellRequest::new(Box::new(send_request), output_receiver)
78 }
79
80 #[allow(clippy::missing_panics_doc)]
83 pub fn stream_from_shell<Op>(&self, operation: Op) -> ShellStream<Op::Output>
84 where
85 Op: Operation,
86 Effect: From<Request<Op>> + Send + 'static,
87 {
88 let (output_sender, output_receiver) = mpsc::unbounded();
89
90 let request = Request::resolves_many_times(operation, move |output| {
91 output_sender.unbounded_send(output).map_err(|_| ())?;
92
93 Ok(())
95 });
96
97 let send_request = {
98 let effect = request.into();
99 let effects = self.effects.clone();
100 move || {
101 effects
102 .send(effect)
103 .expect("Command could not send stream effect, effect channel disconnected");
104 }
105 };
106
107 ShellStream::new(send_request, output_receiver)
108 }
109
110 #[allow(clippy::missing_panics_doc)]
113 pub fn send_event(&self, event: Event) {
114 self.events
115 .send(event)
116 .expect("Command could not send event, event channel disconnected");
117 }
118
119 #[allow(clippy::missing_panics_doc)]
125 pub fn spawn<F, Fut>(&self, make_future: F) -> JoinHandle
126 where
127 F: FnOnce(CommandContext<Effect, Event>) -> Fut,
128 Fut: Future<Output = ()> + Send + 'static,
129 {
130 let (sender, receiver) = crossbeam_channel::unbounded();
131
132 let ctx = self.clone();
133 let future = make_future(ctx);
134
135 let task = Task {
136 finished: Arc::default(),
137 aborted: Arc::default(),
138 future: future.boxed(),
139 join_handle_wakers: receiver,
140 };
141
142 let handle = JoinHandle {
143 finished: task.finished.clone(),
144 aborted: task.aborted.clone(),
145 register_waker: sender,
146 };
147
148 self.tasks
149 .send(task)
150 .expect("Command could not spawn task, tasks channel disconnected");
151
152 handle
153 }
154}
155
156pub enum ShellStream<T: Unpin + Send> {
157 ReadyToSend(Box<dyn FnOnce() + Send>, mpsc::UnboundedReceiver<T>),
158 Sent(mpsc::UnboundedReceiver<T>),
159}
160
161impl<T: Unpin + Send> ShellStream<T> {
162 fn new(
163 send_request: impl FnOnce() + Send + 'static,
164 output_receiver: mpsc::UnboundedReceiver<T>,
165 ) -> Self {
166 ShellStream::ReadyToSend(Box::new(send_request), output_receiver)
167 }
168
169 fn send(&mut self) {
170 let dummy = ShellStream::Sent(mpsc::unbounded().1);
174 let ShellStream::ReadyToSend(send_request, output_receiver) =
175 std::mem::replace(self, dummy)
176 else {
177 unreachable!();
178 };
179
180 *self = ShellStream::Sent(output_receiver);
182
183 send_request();
184 }
185}
186
187impl<T: Unpin + Send> Stream for ShellStream<T> {
188 type Item = T;
189
190 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
191 match *self {
192 ShellStream::ReadyToSend(_, ref mut output_receiver) => {
193 let poll = pin!(output_receiver).poll_next(cx);
194 assert!(matches!(poll, Poll::Pending)); self.send();
197
198 Poll::Pending
199 }
200 ShellStream::Sent(ref mut output_receiver) => pin!(output_receiver).poll_next(cx),
201 }
202 }
203}
204
205pub struct ShellRequest<T: Unpin + Send> {
206 inner: Fuse<StreamFuture<ShellStream<T>>>,
207}
208
209impl<T: Unpin + Send + 'static> ShellRequest<T> {
210 fn new(
211 send_request: impl FnOnce() + Send + 'static,
212 output_receiver: mpsc::UnboundedReceiver<T>,
213 ) -> Self {
214 let inner = ShellStream::new(send_request, output_receiver)
215 .into_future()
216 .fuse();
217
218 Self { inner }
219 }
220}
221
222impl<T: Unpin + Send> Future for ShellRequest<T> {
223 type Output = T;
224
225 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
226 match self.inner.poll_unpin(cx) {
227 Poll::Ready((Some(output), _rest)) => Poll::Ready(output),
228 Poll::Ready((None, _rest)) => Poll::Pending,
229 Poll::Pending => Poll::Pending,
230 }
231 }
232}