// crux_core/command/context.rs
use std::future::Future;
use std::pin::{pin, Pin};
use std::task::{Context, Poll};
use crossbeam_channel::Sender;
use futures::channel::mpsc;
use futures::stream::StreamFuture;
use futures::{FutureExt as _, Stream, StreamExt};
use crate::capability::Operation;
use crate::Request;
use super::executor::{JoinHandle, Task};
/// Context through which a running command talks to the outside world.
///
/// It holds the sending halves of three channels: one carrying effects,
/// one carrying events, and one carrying newly spawned tasks.
pub struct CommandContext<Effect, Event> {
    /// Sender for effects created by `notify_shell`, `request_from_shell`
    /// and `stream_from_shell`.
    pub(crate) effects: Sender<Effect>,
    /// Sender for events emitted via `send_event`.
    pub(crate) events: Sender<Event>,
    /// Sender for tasks created via `spawn`.
    pub(crate) tasks: Sender<Task>,
}
impl<Effect, Event> Clone for CommandContext<Effect, Event> {
fn clone(&self) -> Self {
Self {
effects: self.effects.clone(),
events: self.events.clone(),
tasks: self.tasks.clone(),
}
}
}
impl<Effect, Event> CommandContext<Effect, Event> {
    /// Sends a one-off notification to the shell.
    ///
    /// The operation is wrapped in a request that never resolves and is
    /// placed on the effect channel immediately; nothing is returned.
    ///
    /// # Panics
    ///
    /// Panics if the effect channel is disconnected.
    pub fn notify_shell<Op>(&self, operation: Op)
    where
        Op: Operation,
        Effect: From<Request<Op>>,
    {
        let request = Request::resolves_never(operation);

        self.effects
            .send(request.into())
            .expect("Command could not send notification, effect channel disconnected");
    }

    /// Creates a request to the shell that resolves with a single output.
    ///
    /// The effect is built right away but is only placed on the effect
    /// channel by the closure handed to the returned [`ShellRequest`], i.e.
    /// not during this call.
    ///
    /// # Panics
    ///
    /// The deferred send panics if the effect channel is disconnected at the
    /// time it runs.
    pub fn request_from_shell<Op>(&self, operation: Op) -> ShellRequest<Op::Output>
    where
        Op: Operation,
        Effect: From<Request<Op>> + Send + 'static,
    {
        let (output_sender, output_receiver) = mpsc::unbounded();

        let request = Request::resolves_once(operation, move |output| {
            // A failed send is deliberately ignored: it means the receiving
            // side of the output channel is gone, so nobody is waiting.
            let _ = output_sender.unbounded_send(output);
        });

        let send_request = {
            let effect = request.into();
            let effects = self.effects.clone();
            move || {
                effects
                    .send(effect)
                    .expect("Command could not send request effect, effect channel disconnected")
            }
        };

        // `ShellRequest::new` accepts any `impl FnOnce()` and boxes it itself,
        // so the closure is passed as-is (same as `stream_from_shell`) rather
        // than being boxed twice.
        ShellRequest::new(send_request, output_receiver)
    }

    /// Creates a request to the shell that can resolve many times, exposed
    /// as a [`ShellStream`] of outputs.
    ///
    /// As with [`request_from_shell`](Self::request_from_shell), the effect
    /// is only placed on the effect channel by the closure handed to the
    /// returned stream. The resolve callback reports `Err(())` when an output
    /// can no longer be delivered to the stream.
    ///
    /// # Panics
    ///
    /// The deferred send panics if the effect channel is disconnected at the
    /// time it runs.
    pub fn stream_from_shell<Op>(&self, operation: Op) -> ShellStream<Op::Output>
    where
        Op: Operation,
        Effect: From<Request<Op>> + Send + 'static,
    {
        let (output_sender, output_receiver) = mpsc::unbounded();

        let request = Request::resolves_many_times(operation, move |output| {
            output_sender.unbounded_send(output).map_err(|_| ())?;
            Ok(())
        });

        let send_request = {
            let effect = request.into();
            let effects = self.effects.clone();
            move || {
                effects
                    .send(effect)
                    .expect("Command could not send stream effect, effect channel disconnected")
            }
        };

        ShellStream::new(send_request, output_receiver)
    }

    /// Places `event` on the event channel.
    ///
    /// # Panics
    ///
    /// Panics if the event channel is disconnected.
    pub fn send_event(&self, event: Event) {
        self.events
            .send(event)
            .expect("Command could not send event, event channel disconnected")
    }

    /// Spawns a new task and returns a [`JoinHandle`] for it.
    ///
    /// `make_future` is called immediately with a clone of this context; the
    /// future it returns is boxed into a [`Task`] and placed on the task
    /// channel. The handle receives clones of the task's `finished` and
    /// `aborted` fields and the sending half of the task's waker channel.
    ///
    /// # Panics
    ///
    /// Panics if the task channel is disconnected.
    pub fn spawn<F, Fut>(&self, make_future: F) -> JoinHandle
    where
        F: FnOnce(CommandContext<Effect, Event>) -> Fut,
        Fut: Future<Output = ()> + Send + 'static,
    {
        let (sender, receiver) = crossbeam_channel::unbounded();

        let ctx = self.clone();
        let future = make_future(ctx);

        let task = Task {
            finished: Default::default(),
            aborted: Default::default(),
            future: future.boxed(),
            join_handle_wakers: receiver,
        };

        let handle = JoinHandle {
            finished: task.finished.clone(),
            aborted: task.aborted.clone(),
            register_waker: sender,
        };

        self.tasks
            .send(task)
            .expect("Command could not spawn task, tasks channel disconnected");

        handle
    }
}
/// Stream of outputs for an operation requested from the shell.
///
/// The stream starts out holding an unsent request and sends it the first
/// time it is polled.
pub enum ShellStream<T>
where
    T: Unpin + Send,
{
    /// The request has not been sent yet: holds the closure that sends it
    /// and the receiver on which outputs will arrive.
    ReadyToSend(Box<dyn FnOnce() + Send>, mpsc::UnboundedReceiver<T>),
    /// The request has been sent: only the output receiver remains.
    Sent(mpsc::UnboundedReceiver<T>),
}
impl<T: Unpin + Send> ShellStream<T> {
    /// Builds a stream in the `ReadyToSend` state, boxing the send closure.
    fn new(
        send_request: impl FnOnce() + Send + 'static,
        output_receiver: mpsc::UnboundedReceiver<T>,
    ) -> Self {
        let boxed_send: Box<dyn FnOnce() + Send> = Box::new(send_request);
        ShellStream::ReadyToSend(boxed_send, output_receiver)
    }

    /// Moves from `ReadyToSend` to `Sent` and then runs the send closure.
    ///
    /// Must only be called in the `ReadyToSend` state; any other state hits
    /// `unreachable!()`.
    fn send(&mut self) {
        // The fields cannot be moved out from behind `&mut self` directly, so
        // a throwaway `Sent` value is swapped in while they are taken apart.
        let placeholder = ShellStream::Sent(mpsc::unbounded().1);

        match std::mem::replace(self, placeholder) {
            ShellStream::ReadyToSend(send_request, output_receiver) => {
                // Restore the real receiver before the request goes out.
                *self = ShellStream::Sent(output_receiver);
                send_request()
            }
            ShellStream::Sent(_) => unreachable!(),
        }
    }
}
impl<T: Unpin + Send> Stream for ShellStream<T> {
    type Item = T;

    /// Polls the output receiver, sending the request first if it has not
    /// been sent yet.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        match *this {
            // Request already sent: simply forward to the receiver.
            ShellStream::Sent(ref mut receiver) => pin!(receiver).poll_next(cx),
            ShellStream::ReadyToSend(_, ref mut receiver) => {
                // Poll the receiver before sending; the request has not gone
                // out yet, so it is asserted to be pending.
                let first_poll = pin!(receiver).poll_next(cx);
                assert!(matches!(first_poll, Poll::Pending));

                this.send();

                Poll::Pending
            }
        }
    }
}
/// Future for a single output of an operation requested from the shell.
///
/// It wraps a [`ShellStream`] and resolves with the first item that stream
/// yields.
pub struct ShellRequest<T>
where
    T: Unpin + Send,
{
    /// The underlying stream, wrapped so it can be awaited for its next item.
    inner: StreamFuture<ShellStream<T>>,
}
impl<T: Unpin + Send + 'static> ShellRequest<T> {
    /// Builds a request from the closure that sends the effect and the
    /// receiver on which the output will arrive.
    fn new(
        send_request: impl FnOnce() + Send + 'static,
        output_receiver: mpsc::UnboundedReceiver<T>,
    ) -> Self {
        Self {
            inner: ShellStream::new(send_request, output_receiver).into_future(),
        }
    }
}
impl<T: Unpin + Send> Future for ShellRequest<T> {
    type Output = T;

    /// Resolves with the first output of the underlying stream.
    ///
    /// If the stream ends without producing an output, the future stays
    /// pending forever; it never resolves and never panics on later polls.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.inner.poll_unpin(cx) {
            Poll::Ready((Some(output), _rest)) => Poll::Ready(output),
            Poll::Ready((None, rest)) => {
                // A `StreamFuture` gives up its stream when it completes and
                // panics ("polling StreamFuture twice") if polled again.
                // Since we report `Pending` here, callers such as `join!` or
                // `select!` may legitimately poll us again, so the stream is
                // put back into a fresh `StreamFuture` first.
                self.inner = rest.into_future();
                Poll::Pending
            }
            Poll::Pending => Poll::Pending,
        }
    }
}