crux_core/command/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
//! Command represents one or more side-effects, resulting in interactions with the shell.
//! Core creates Commands and returns them from the `update` function in response to events.
//! Commands can be created directly, but more often they will be created and returned
//! by capability APIs.
//!
//! A Command can execute side-effects in parallel, in sequence or a combination of both. To
//! allow this orchestration they provide both a simple synchronous API and access to an
//! asynchronous API.
//!
//! Command surfaces the effect requests and events sent in response with
//! the [`Command::effects`] and [`Command::events`] methods. These can be used when testing
//! the side effects requested by an `update` call.
//!
//! Internally, Command resembles [`FuturesUnordered`](futures::stream::FuturesUnordered):
//! it manages and polls a number of futures and provides a context which they can use
//! to submit effects to the shell and events back to the application.
//!
//! Command implements [`Stream`](futures::Stream), making it useful in an async context,
//! enabling, for example, wrapping Commands in one another.
//!
//! # Examples
//!
//! TODO: simple command example with a capability API
//!
//! TODO: complex example with sync API
//!
//! TODO: basic async example
//!
//! TODO: async example with `spawn`
//!
//! TODO: cancellation example
//!
//! TODO: testing example
//!
//! TODO: composition example

mod builder;
mod context;
mod executor;
mod stream;

use std::future::Future;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

// TODO: consider switching to flume
use crossbeam_channel::{Receiver, Sender};
use executor::{AbortHandle, Task, TaskId};
use futures::task::AtomicWaker;
use futures::{FutureExt as _, Stream, StreamExt as _};
use slab::Slab;
use stream::CommandStreamExt as _;

pub use stream::CommandOutput;

use crate::capability::Operation;
use crate::Request;

pub struct Command<Effect, Event> {
    effects: Receiver<Effect>,
    events: Receiver<Event>,

    // Executor internals
    // TODO: should this be a separate type?
    ready_queue: Receiver<TaskId>,
    spawn_queue: Receiver<Task>,
    tasks: Slab<Task>,
    ready_sender: Sender<TaskId>, // Used in creating wakers for tasks
    waker: Arc<AtomicWaker>,      // Shared with task wakers when polled in async context

    // Signaling
    aborted: Arc<AtomicBool>,
}

// Public API

impl<Effect, Event> Command<Effect, Event>
where
    Effect: Send + 'static,
    Event: Send + 'static,
{
    /// Create a new command orchestrating effects with async Rust. This is the lowest level
    /// API to create a Command if you need full control over its execution. In most cases you will
    /// more likely want to create Commands with capabilities, and using the combinator APIs
    /// ([`then`], [`and`] and [`all`]) to orchestrate them.
    ///
    /// The `create_task` closure receives a [`CommandContext`] that it can use to send shell requests,
    /// events back to the app, and to spawn additional tasks. The closure is expected to return a future
    /// which becomes the command's main asynchronous task.
    pub fn new<F, Fut>(create_task: F) -> Self
    where
        F: FnOnce(context::CommandContext<Effect, Event>) -> Fut,
        Fut: Future<Output = ()> + Send + 'static,
    {
        // RFC: do we need to think about backpressure? The channels are unbounded
        // so a naughty Command can make massive amounts of requests or spawn a huge number of tasks.
        // If these channels supported async, the CommandContext methods could also be async and
        // we could give the channels some bounds
        let (effect_sender, effect_receiver) = crossbeam_channel::unbounded();
        let (event_sender, event_receiver) = crossbeam_channel::unbounded();
        let (ready_sender, ready_receiver) = crossbeam_channel::unbounded();
        let (spawn_sender, spawn_receiver) = crossbeam_channel::unbounded();
        let (_, waker_receiver) = crossbeam_channel::unbounded();

        let context = context::CommandContext {
            effects: effect_sender,
            events: event_sender,
            tasks: spawn_sender,
        };

        let aborted: Arc<AtomicBool> = Default::default();
        let task = Task {
            finished: Default::default(),
            aborted: aborted.clone(),
            future: create_task(context).boxed(),
            join_handle_wakers: waker_receiver,
        };

        let mut tasks = Slab::with_capacity(1);
        let task_id = TaskId(tasks.insert(task));

        ready_sender
            .send(task_id)
            .expect("Could not make task ready, ready channel disconnected");

        Command {
            effects: effect_receiver,
            events: event_receiver,
            ready_queue: ready_receiver,
            spawn_queue: spawn_receiver,
            ready_sender,
            tasks,
            waker: Default::default(),
            aborted,
        }
    }

    /// Create an empty, completed Command. This is useful as a return value from `update` if
    /// there are no side-effects to perform.
    pub fn done() -> Self {
        let (_, effects) = crossbeam_channel::bounded(0);
        let (_, events) = crossbeam_channel::bounded(0);
        let (_, spawn_queue) = crossbeam_channel::bounded(0);
        let (ready_sender, ready_queue) = crossbeam_channel::bounded(0);

        Command {
            effects,
            events,
            ready_queue,
            spawn_queue,
            tasks: Slab::with_capacity(0),
            ready_sender,
            waker: Default::default(),
            aborted: Default::default(),
        }
    }

    /// Create a Command which dispatches an event and terminates. This is an alternative
    /// to calling `update` recursively. The only difference is that the two `update` calls
    /// will be visible to Crux and can show up in logs or any tooling. The trade-off is that
    /// the event is not guaranteed to dispatch instantly - another `update` call which is
    /// already scheduled may happen first.
    pub fn event(event: Event) -> Self {
        Command::new(|ctx| async move { ctx.send_event(event) })
    }

    /// Create a Command which sends a notification to the shell with a provided `operation`.
    ///
    /// This ia synchronous equivalent of [`CommandContext::notify_shell`].
    pub fn notify_shell<Op>(operation: Op) -> Command<Effect, Event>
    where
        Op: Operation,
        Effect: From<Request<Op>>,
    {
        Command::new(|ctx| async move { ctx.notify_shell(operation) })
    }

    /// Start a creation of a Command which sends a one-time request to the shell with a provided
    /// operation.
    ///
    /// Returns a `RequestBuilder`, which can be converted into a Command directly, or chained
    /// with another command builder using `.then`.
    ///
    /// In an async context, `RequestBuilder` can be turned into a future that resolves to the
    /// operation output type.
    pub fn request_from_shell<Op>(
        operation: Op,
    ) -> builder::RequestBuilder<Effect, Event, impl Future<Output = Op::Output>>
    where
        Op: Operation,
        Effect: From<Request<Op>>,
    {
        builder::RequestBuilder::new(|ctx| ctx.request_from_shell(operation))
    }

    /// Start a creation of a Command which sends a stream request to the shell with a provided
    /// operation.
    ///
    /// Returns a `StreamBuilder`, which can be converted into a Command directly, or chained
    /// with a `RequestBuilder` builder using `.then`.
    ///
    /// In an async context, `StreamBuilder` can be turned into a stream that with the
    /// operation output type as item.
    pub fn stream_from_shell<Op>(
        operation: Op,
    ) -> builder::StreamBuilder<Effect, Event, impl Stream<Item = Op::Output>>
    where
        Op: Operation,
        Effect: From<Request<Op>>,
    {
        builder::StreamBuilder::new(|ctx| ctx.stream_from_shell(operation))
    }

    /// Run the effect state machine until it settles, then return true
    /// if there is any more work to do - tasks to run or events or effects to receive
    pub fn is_done(&mut self) -> bool {
        self.run_until_settled();

        self.effects.is_empty() && self.events.is_empty() && self.tasks.is_empty()
    }

    /// Run the effect state machine until it settles and collect all effects generated
    pub fn effects(&mut self) -> impl Iterator<Item = Effect> + '_ {
        self.run_until_settled();

        self.effects.try_iter()
    }

    /// Run the effect state machine until it settles and collect all events generated
    pub fn events(&mut self) -> impl Iterator<Item = Event> + '_ {
        self.run_until_settled();

        self.events.try_iter()
    }

    // Combinators

    /// Create a command running self and the other command in sequence
    // RFC: is this actually _useful_? Unlike `.then` on `CommandBuilder` this doesn't allow using
    // the output of the first command in building the second one, it just runs them in sequence,
    // and the benefit is unclear.
    pub fn then(self, other: Self) -> Self
    where
        Effect: Unpin,
        Event: Unpin,
    {
        Command::new(|ctx| async move {
            // first run self until done
            self.host(ctx.effects.clone(), ctx.events.clone()).await;

            // then run other until done
            other.host(ctx.effects, ctx.events).await;
        })
    }

    /// Convenience for [`Command::all`] which runs another command concurrently with this one
    pub fn and(self, other: Self) -> Self
    where
        Effect: Unpin,
        Event: Unpin,
    {
        Command::all([self, other])
    }

    /// Create a command running a number of commands concurrently
    pub fn all<I>(commands: I) -> Self
    where
        I: IntoIterator<Item = Self> + Send + 'static,
        Effect: Unpin,
        Event: Unpin,
    {
        Command::new(|ctx| async move {
            let select = futures::stream::select_all(commands);

            select.host(ctx.effects, ctx.events).await;
        })
    }

    // Mapping for composition

    /// Map effects requested as part of this command to a different effect type.
    ///
    /// This is useful when composing apps to convert a command from a child app to a
    /// command of the parent app.
    pub fn map_effect<F, NewEffect>(self, map: F) -> Command<NewEffect, Event>
    where
        F: Fn(Effect) -> NewEffect + Send + Sync + 'static,
        NewEffect: Send + Unpin + 'static,
        Effect: Unpin,
        Event: Unpin,
    {
        Command::new(|ctx| async move {
            let mapped = self.map(|output| match output {
                CommandOutput::Effect(effect) => CommandOutput::Effect(map(effect)),
                CommandOutput::Event(event) => CommandOutput::Event(event),
            });

            mapped.host(ctx.effects, ctx.events).await;
        })
    }

    /// Map events sent as part of this command to a different effect type
    ///
    /// This is useful when composing apps to convert a command from a child app to a
    /// command of the parent app.
    pub fn map_event<F, NewEvent>(self, map: F) -> Command<Effect, NewEvent>
    where
        F: Fn(Event) -> NewEvent + Send + Sync + 'static,
        NewEvent: Send + Unpin + 'static,
        Effect: Unpin,
        Event: Unpin,
    {
        Command::new(|ctx| async move {
            let mapped = self.map(|output| match output {
                CommandOutput::Effect(effect) => CommandOutput::Effect(effect),
                CommandOutput::Event(event) => CommandOutput::Event(map(event)),
            });

            mapped.host(ctx.effects, ctx.events).await;
        })
    }

    /// Returns an abort handle which can be used to remotely terminate a running Command
    /// and all its subtask.
    ///
    /// This is specifically useful for cancelling subscriptions and long running effects
    /// which may get superseded, like timers
    pub fn abort_handle(&self) -> AbortHandle {
        AbortHandle {
            aborted: self.aborted.clone(),
        }
    }
}

#[cfg(test)]
mod tests;