// crux_time/command.rs

use std::{
    future::Future,
    marker::PhantomData,
    time::{Duration, SystemTime},
};

use crux_core::{command::RequestBuilder, Command, Request};
use futures::{
    channel::oneshot::{self, Sender},
    select_biased, FutureExt,
};

use crate::{get_timer_id, TimeRequest, TimeResponse, TimerId};

/// Result of the timer run. Timers can either run to completion or be cleared early.
///
/// This is the output of the futures built by [`Time::notify_at`] and
/// [`Time::notify_after`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum TimerOutcome {
    /// Timer completed successfully. The carried [`CompletedTimerHandle`] holds the
    /// ID of the timer and can be compared with the [`TimerHandle`] returned when the
    /// timer was requested.
    Completed(CompletedTimerHandle),
    /// Timer was cleared early, i.e. [`TimerHandle::clear`] was called before the
    /// shell reported the timer as finished.
    Cleared,
}

/// Namespace for the time-related request builders (`now`, `notify_at`,
/// `notify_after`), parameterised by the app's `Effect` and `Event` types.
///
/// No value of this type is constructed in this file; the fields exist only to
/// carry the type parameters.
pub struct Time<Effect, Event> {
    // Allow impl level trait bounds to avoid repetition
    effect: PhantomData<Effect>,
    event: PhantomData<Event>,
}

impl<Effect, Event> Time<Effect, Event>
where
    Effect: Send + From<Request<TimeRequest>> + 'static,
    Event: Send + 'static,
{
    /// Ask for the current wall-clock time.
    ///
    /// # Panics
    ///
    /// The returned future panics if the shell answers `TimeRequest::Now` with
    /// anything other than `TimeResponse::Now`.
    pub fn now() -> RequestBuilder<Effect, Event, impl Future<Output = SystemTime>> {
        Command::request_from_shell(TimeRequest::Now).map(|r| {
            let TimeResponse::Now { instant } = r else {
                panic!("Incorrect response received for TimeRequest::Now")
            };

            instant.into()
        })
    }

    /// Ask to receive a notification when the specified
    /// [`SystemTime`](std::time::SystemTime) has arrived.
    ///
    /// Returns the request builder together with a [`TimerHandle`] which can be
    /// used to clear the timer. The built future resolves with
    /// [`TimerOutcome::Completed`] when the shell reports the instant has arrived,
    /// or with [`TimerOutcome::Cleared`] when the handle was used to clear it first.
    ///
    /// # Panics
    ///
    /// The returned future panics if the shell responds with an unexpected
    /// `TimeResponse` variant or with a timer ID other than the one requested.
    pub fn notify_at(
        system_time: SystemTime,
    ) -> (
        RequestBuilder<Effect, Event, impl Future<Output = TimerOutcome>>,
        TimerHandle,
    ) {
        let timer_id = get_timer_id();
        let (sender, mut receiver) = oneshot::channel();

        let handle = TimerHandle {
            timer_id,
            abort: sender,
        };

        let completed_handle = CompletedTimerHandle { timer_id };

        // The `panic`s in the body of the builder would be `unreachable`s in Rust,
        // but since the shell is involved we can't check for them statically. Either way,
        // they are a developer error and suggest something quite wrong with the time implementation
        // in the shell.
        let builder = RequestBuilder::new(move |ctx| {
            async move {
                // The handle may have been used before the task was first polled;
                // in that case nothing was ever sent to the shell, so there is
                // nothing to clear there.
                if let Ok(Some(cleared_id)) = receiver.try_recv() {
                    if cleared_id == timer_id {
                        return TimerOutcome::Cleared;
                    }
                }

                // Biased: the shell response is checked before the clear signal.
                select_biased! {
                    response = ctx.request_from_shell(
                        TimeRequest::NotifyAt {
                            id: timer_id,
                            instant: system_time.into()
                        }
                    ).fuse() => {
                        let TimeResponse::InstantArrived { id } = response else {
                            panic!("Unexpected response to TimeRequest::NotifyAt");
                        };

                        if id != timer_id {
                            panic!("InstantArrived with unexpected timer ID");
                        }

                        TimerOutcome::Completed(completed_handle)
                    },
                    cleared = receiver => {
                        // The Err variant would mean the sender was dropped,
                        // but `receiver` is a fused future,
                        // which signals `is_terminated` true in that case,
                        // so this branch of the select will
                        // never run for the Err case
                        let cleared_id = cleared.unwrap();
                        // The only sender lives in this timer's own handle, which
                        // always sends its own ID (same check as in `notify_after`).
                        if cleared_id != timer_id {
                            unreachable!("Cleared with the wrong ID");
                        }

                        // Follow up by asking the shell to clear the timer
                        let TimeResponse::Cleared { id } = ctx.request_from_shell(TimeRequest::Clear { id: cleared_id }).await else {
                            panic!("Unexpected response to TimeRequest::Clear");
                        };

                        if id != cleared_id {
                            panic!("Cleared with unexpected timer ID");
                        }

                        TimerOutcome::Cleared
                    }
                }
            }
        });

        (builder, handle)
    }

    /// Ask to receive a notification after the specified
    /// [`Duration`](std::time::Duration) has elapsed.
    ///
    /// Returns the request builder together with a [`TimerHandle`] which can be
    /// used to clear the timer. The built future resolves with
    /// [`TimerOutcome::Completed`] when the shell reports the duration has elapsed,
    /// or with [`TimerOutcome::Cleared`] when the handle was used to clear it first.
    ///
    /// # Panics
    ///
    /// The returned future panics if the shell responds with an unexpected
    /// `TimeResponse` variant or with a timer ID other than the one requested.
    pub fn notify_after(
        duration: Duration,
    ) -> (
        RequestBuilder<Effect, Event, impl Future<Output = TimerOutcome>>,
        TimerHandle,
    ) {
        let timer_id = get_timer_id();
        let (sender, mut receiver) = oneshot::channel();

        let handle = TimerHandle {
            timer_id,
            abort: sender,
        };

        let completed_handle = CompletedTimerHandle { timer_id };

        // As in `notify_at`, the `panic`s below signal a broken time
        // implementation in the shell rather than a recoverable condition.
        let builder = RequestBuilder::new(move |ctx| async move {
            // The handle may have been used before the task was first polled;
            // in that case nothing was ever sent to the shell, so there is
            // nothing to clear there.
            if let Ok(Some(cleared_id)) = receiver.try_recv() {
                if cleared_id == timer_id {
                    return TimerOutcome::Cleared;
                }
            }

            // Biased: the shell response is checked before the clear signal.
            select_biased! {
                response = ctx.request_from_shell(
                    TimeRequest::NotifyAfter {
                        id: timer_id,
                        duration: duration.into()
                    }
                ).fuse() => {
                    let TimeResponse::DurationElapsed { id } = response else {
                        panic!("Unexpected response to TimeRequest::NotifyAfter");
                    };

                    if id != timer_id {
                        panic!("DurationElapsed with unexpected timer ID");
                    }

                    TimerOutcome::Completed(completed_handle)
                }
                cleared = receiver => {
                    // The Err variant would mean the sender was dropped,
                    // but `receiver` is a fused future,
                    // which signals `is_terminated` true in that case,
                    // so this branch of the select will
                    // never run for the Err case
                    let cleared_id = cleared.unwrap();
                    if cleared_id != timer_id {
                        unreachable!("Cleared with the wrong ID");
                    }

                    // Follow up by asking the shell to clear the timer
                    let TimeResponse::Cleared { id } = ctx.request_from_shell(TimeRequest::Clear { id: cleared_id }).await else {
                        panic!("Unexpected response to TimeRequest::Clear");
                    };

                    if id != cleared_id {
                        panic!("Cleared resolved with unexpected timer ID");
                    }

                    TimerOutcome::Cleared
                }
            }
        });

        (builder, handle)
    }
}

/// A handle to a requested timer. Allows the timer to be cleared. The handle is safe to drop,
/// in which case the original timer is no longer abortable
///
/// Handles compare equal to each other, and to a [`CompletedTimerHandle`], when
/// they carry the same timer ID.
#[derive(Debug)]
pub struct TimerHandle {
    // ID of the timer this handle controls
    timer_id: TimerId,
    // Sending half of the oneshot channel the timer task listens on for a clear signal
    abort: Sender<TimerId>,
}

impl TimerHandle {
    /// Clear the associated timer request, consuming the handle.
    ///
    /// The timer's ID is sent to the task waiting on the timer. A task that is
    /// already waiting on the shell follows up with `TimeRequest::Clear { id }`,
    /// so the shell can clean up associated resources, and resolves with
    /// `TimerOutcome::Cleared` once the shell has answered with
    /// `TimeResponse::Cleared { id }`.
    ///
    /// If nothing is listening any more (for example because the timer has
    /// already finished), the call has no effect.
    pub fn clear(self) {
        let Self { timer_id, abort } = self;

        // A failed send only means the receiving side is gone; deliberately ignored.
        let _ = abort.send(timer_id);
    }
}

/// Identifies a timer which has run to completion; carried by
/// [`TimerOutcome::Completed`].
///
/// It can be compared with a [`TimerHandle`] to find out which timer finished.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CompletedTimerHandle {
    // ID of the timer that completed
    timer_id: TimerId,
}

// Marker impl: the hand-written `PartialEq` for `TimerHandle` compares only
// `timer_id` and ignores the `abort` sender.
impl Eq for TimerHandle {}

/// Two handles are equal when they refer to the same timer ID; the `abort`
/// sender takes no part in the comparison.
impl PartialEq for TimerHandle {
    fn eq(&self, other: &Self) -> bool {
        let (ours, theirs) = (&self.timer_id, &other.timer_id);
        ours == theirs
    }
}

/// A live handle equals a completed handle when both carry the same timer ID.
impl PartialEq<CompletedTimerHandle> for TimerHandle {
    fn eq(&self, other: &CompletedTimerHandle) -> bool {
        let (ours, theirs) = (&self.timer_id, &other.timer_id);
        ours == theirs
    }
}

/// A completed handle equals a live handle when both carry the same timer ID.
impl PartialEq<TimerHandle> for CompletedTimerHandle {
    fn eq(&self, other: &TimerHandle) -> bool {
        let (ours, theirs) = (&self.timer_id, &other.timer_id);
        ours == theirs
    }
}

/// Converts a live handle into a completed handle with the same timer ID.
///
/// The conversion consumes the `TimerHandle`, so the timer can no longer be
/// cleared through it afterwards.
impl From<TimerHandle> for CompletedTimerHandle {
    fn from(value: TimerHandle) -> Self {
        let timer_id = value.timer_id;
        Self { timer_id }
    }
}

// Tests for timer clearing and for dropping either the handle or the request.
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crux_core::Request;

    use super::{Time, TimerOutcome};
    use crate::{TimeRequest, TimeResponse};

    // Minimal effect type: only time requests are needed by these tests.
    enum Effect {
        Time(Request<TimeRequest>),
    }

    impl From<Request<TimeRequest>> for Effect {
        fn from(value: Request<TimeRequest>) -> Self {
            Self::Time(value)
        }
    }

    // Minimal event type wrapping the outcome of a timer.
    #[derive(Debug, PartialEq)]
    enum Event {
        Elapsed(TimerOutcome),
    }

    // Clearing through the handle makes the command emit a `Clear` request and,
    // once that is resolved, finish with `TimerOutcome::Cleared`.
    #[test]
    fn timer_can_be_cleared() {
        let (cmd, handle) = Time::notify_after(Duration::from_secs(2));
        let mut cmd = cmd.then_send(Event::Elapsed);

        // First effect: the timer request itself; no event yet.
        let effect = cmd.effects().next();

        assert!(cmd.events().next().is_none());

        // The first request is kept alive (unresolved) for the rest of the test.
        let Some(Effect::Time(_request)) = effect else {
            panic!("should get an effect");
        };

        handle.clear();

        // Second effect: the follow-up request asking the shell to clear the timer.
        let effect = cmd.effects().next();
        assert!(cmd.events().next().is_none());

        let Some(Effect::Time(mut request)) = effect else {
            panic!("should get an effect");
        };

        let TimeRequest::Clear { id } = request.operation else {
            panic!("expected a Clear request");
        };

        // Play the shell's part and confirm the clear with the same ID.
        request
            .resolve(TimeResponse::Cleared { id })
            .expect("should resolve");

        let event = cmd.events().next();

        assert!(matches!(event, Some(Event::Elapsed(TimerOutcome::Cleared))));
    }

    // Dropping the handle without calling `clear` leaves the timer running;
    // it still completes when the shell resolves it.
    #[test]
    fn dropping_a_timer_handle_does_not_clear_the_request() {
        let (cmd, handle) = Time::notify_after(Duration::from_secs(2));
        drop(handle);

        let mut cmd = cmd.then_send(Event::Elapsed);
        let effect = cmd.effects().next();

        assert!(cmd.events().next().is_none());

        let Some(Effect::Time(mut request)) = effect else {
            panic!("should get an effect");
        };

        let TimeRequest::NotifyAfter { id, .. } = request.operation else {
            panic!("Expected a NotifyAfter");
        };

        // Play the shell's part and report the duration as elapsed.
        request
            .resolve(TimeResponse::DurationElapsed { id })
            .expect("should resolve");

        let event = cmd.events().next();

        assert!(matches!(
            event,
            Some(Event::Elapsed(TimerOutcome::Completed(_)))
        ));
    }

    // Dropping the request alone does not finish the command while a handle is
    // still held; dropping the handle as well does.
    // NOTE(review): presumably the command stays alive because the held handle
    // could still send a clear signal - confirm against crux_core's `Command`.
    #[test]
    fn dropping_a_timer_request_while_holding_a_handle_and_polling() {
        let (cmd, handle) = Time::notify_after(Duration::from_secs(2));
        let mut cmd = cmd.then_send(Event::Elapsed);

        let effect: Effect = cmd.effects().next().expect("Expected an effect!");

        drop(effect);
        assert!(!cmd.is_done());

        drop(handle);
        assert!(cmd.is_done());
    }
}