crux_core/command/
context.rs1use std::future::Future;
2use std::pin::{pin, Pin};
3
4use std::task::{Context, Poll};
5
6use crossbeam_channel::Sender;
7use futures::channel::mpsc;
8use futures::future::Fuse;
9use futures::stream::StreamFuture;
10use futures::{FutureExt as _, Stream, StreamExt};
11
12use crate::capability::Operation;
13use crate::Request;
14
15use super::executor::{JoinHandle, Task};
16
17pub struct CommandContext<Effect, Event> {
20 pub(crate) effects: Sender<Effect>,
21 pub(crate) events: Sender<Event>,
22 pub(crate) tasks: Sender<Task>,
23}
24
25impl<Effect, Event> Clone for CommandContext<Effect, Event> {
27 fn clone(&self) -> Self {
28 Self {
29 effects: self.effects.clone(),
30 events: self.events.clone(),
31 tasks: self.tasks.clone(),
32 }
33 }
34}
35
36impl<Effect, Event> CommandContext<Effect, Event> {
37 pub fn notify_shell<Op>(&self, operation: Op)
39 where
40 Op: Operation,
41 Effect: From<Request<Op>>,
42 {
43 let request = Request::resolves_never(operation);
44
45 self.effects
46 .send(request.into())
47 .expect("Command could not send notification, effect channel disconnected");
48 }
49
50 pub fn request_from_shell<Op>(&self, operation: Op) -> ShellRequest<Op::Output>
53 where
54 Op: Operation,
55 Effect: From<Request<Op>> + Send + 'static,
56 {
57 let (output_sender, output_receiver) = mpsc::unbounded();
58
59 let request = Request::resolves_once(operation, move |output| {
60 let _ = output_sender.unbounded_send(output);
62 });
63
64 let send_request = {
65 let effect = request.into();
66 let effects = self.effects.clone();
67 move || {
68 effects
69 .send(effect)
70 .expect("Command could not send request effect, effect channel disconnected")
71 }
72 };
73
74 ShellRequest::new(Box::new(send_request), output_receiver)
75 }
76
77 pub fn stream_from_shell<Op>(&self, operation: Op) -> ShellStream<Op::Output>
80 where
81 Op: Operation,
82 Effect: From<Request<Op>> + Send + 'static,
83 {
84 let (output_sender, output_receiver) = mpsc::unbounded();
85
86 let request = Request::resolves_many_times(operation, move |output| {
87 output_sender.unbounded_send(output).map_err(|_| ())?;
88
89 Ok(())
91 });
92
93 let send_request = {
94 let effect = request.into();
95 let effects = self.effects.clone();
96 move || {
97 effects
98 .send(effect)
99 .expect("Command could not send stream effect, effect channel disconnected")
100 }
101 };
102
103 ShellStream::new(send_request, output_receiver)
104 }
105
106 pub fn send_event(&self, event: Event) {
109 self.events
110 .send(event)
111 .expect("Command could not send event, event channel disconnected")
112 }
113
114 pub fn spawn<F, Fut>(&self, make_future: F) -> JoinHandle
120 where
121 F: FnOnce(CommandContext<Effect, Event>) -> Fut,
122 Fut: Future<Output = ()> + Send + 'static,
123 {
124 let (sender, receiver) = crossbeam_channel::unbounded();
125
126 let ctx = self.clone();
127 let future = make_future(ctx);
128
129 let task = Task {
130 finished: Default::default(),
131 aborted: Default::default(),
132 future: future.boxed(),
133 join_handle_wakers: receiver,
134 };
135
136 let handle = JoinHandle {
137 finished: task.finished.clone(),
138 aborted: task.aborted.clone(),
139 register_waker: sender,
140 };
141
142 self.tasks
143 .send(task)
144 .expect("Command could not spawn task, tasks channel disconnected");
145
146 handle
147 }
148}
149
150pub enum ShellStream<T: Unpin + Send> {
151 ReadyToSend(Box<dyn FnOnce() + Send>, mpsc::UnboundedReceiver<T>),
152 Sent(mpsc::UnboundedReceiver<T>),
153}
154
155impl<T: Unpin + Send> ShellStream<T> {
156 fn new(
157 send_request: impl FnOnce() + Send + 'static,
158 output_receiver: mpsc::UnboundedReceiver<T>,
159 ) -> Self {
160 ShellStream::ReadyToSend(Box::new(send_request), output_receiver)
161 }
162
163 fn send(&mut self) {
164 let dummy = ShellStream::Sent(mpsc::unbounded().1);
168 let ShellStream::ReadyToSend(send_request, output_receiver) =
169 std::mem::replace(self, dummy)
170 else {
171 unreachable!();
172 };
173
174 *self = ShellStream::Sent(output_receiver);
176
177 send_request()
178 }
179}
180
181impl<T: Unpin + Send> Stream for ShellStream<T> {
182 type Item = T;
183
184 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
185 match *self {
186 ShellStream::ReadyToSend(_, ref mut output_receiver) => {
187 let poll = pin!(output_receiver).poll_next(cx);
188 assert!(matches!(poll, Poll::Pending)); self.send();
191
192 Poll::Pending
193 }
194 ShellStream::Sent(ref mut output_receiver) => pin!(output_receiver).poll_next(cx),
195 }
196 }
197}
198
199pub struct ShellRequest<T: Unpin + Send> {
200 inner: Fuse<StreamFuture<ShellStream<T>>>,
201}
202
203impl<T: Unpin + Send + 'static> ShellRequest<T> {
204 fn new(
205 send_request: impl FnOnce() + Send + 'static,
206 output_receiver: mpsc::UnboundedReceiver<T>,
207 ) -> Self {
208 let inner = ShellStream::new(send_request, output_receiver)
209 .into_future()
210 .fuse();
211
212 Self { inner }
213 }
214}
215
216impl<T: Unpin + Send> Future for ShellRequest<T> {
217 type Output = T;
218
219 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
220 match self.inner.poll_unpin(cx) {
221 Poll::Ready((Some(output), _rest)) => Poll::Ready(output),
222 Poll::Ready((None, _rest)) => Poll::Pending,
223 Poll::Pending => Poll::Pending,
224 }
225 }
226}