crux_time/
lib.rs

1#![deny(clippy::pedantic)]
2//! Current time access for Crux apps
3//!
4//! Current time (on a wall clock) is considered a side-effect (although if we were to get pedantic, it's
5//! more of a side-cause) by Crux, and has to be obtained externally. This capability provides a simple
6//! interface to do so.
7
8pub mod command;
9pub mod protocol;
10
11use std::{
12    collections::HashSet,
13    future::Future,
14    pin::Pin,
15    sync::{
16        atomic::{AtomicUsize, Ordering},
17        LazyLock, Mutex,
18    },
19    task::Poll,
20    time::SystemTime,
21};
22
23use crux_core::capability::CapabilityContext;
24
25pub use protocol::{duration::Duration, instant::Instant, TimeRequest, TimeResponse, TimerId};
26
27fn get_timer_id() -> TimerId {
28    static COUNTER: AtomicUsize = AtomicUsize::new(1);
29    TimerId(COUNTER.fetch_add(1, Ordering::Relaxed))
30}
31
32/// The Time capability API
33///
34/// This capability provides access to the current time and allows the app to ask for
35/// notifications when a specific instant has arrived or a duration has elapsed.
36pub struct Time<Ev> {
37    context: CapabilityContext<TimeRequest, Ev>,
38}
39
40impl<Ev> crux_core::Capability<Ev> for Time<Ev> {
41    type Operation = TimeRequest;
42    type MappedSelf<MappedEv> = Time<MappedEv>;
43
44    fn map_event<F, NewEv>(&self, f: F) -> Self::MappedSelf<NewEv>
45    where
46        F: Fn(NewEv) -> Ev + Send + Sync + 'static,
47        Ev: 'static,
48        NewEv: 'static + Send,
49    {
50        Time::new(self.context.map_event(f))
51    }
52}
53
54impl<Ev> Clone for Time<Ev> {
55    fn clone(&self) -> Self {
56        Self {
57            context: self.context.clone(),
58        }
59    }
60}
61
62impl<Ev> Time<Ev>
63where
64    Ev: 'static,
65{
66    #[must_use]
67    pub fn new(context: CapabilityContext<TimeRequest, Ev>) -> Self {
68        Self { context }
69    }
70
71    /// Request current time, which will be passed to the app as a [`TimeResponse`] containing an [`Instant`]
72    /// wrapped in the event produced by the `callback`.
73    pub fn now<F>(&self, callback: F)
74    where
75        F: FnOnce(TimeResponse) -> Ev + Send + Sync + 'static,
76    {
77        self.context.spawn({
78            let context = self.context.clone();
79            let this = self.clone();
80
81            async move {
82                context.update_app(callback(this.now_async().await));
83            }
84        });
85    }
86
87    /// Request current time, which will be passed to the app as a [`TimeResponse`] containing an [`Instant`]
88    /// This is an async call to use with [`crux_core::compose::Compose`].
89    pub async fn now_async(&self) -> TimeResponse {
90        self.context.request_from_shell(TimeRequest::Now).await
91    }
92
93    /// Ask to receive a notification when the specified
94    /// [`SystemTime`] has arrived.
95    pub fn notify_at<F>(&self, system_time: SystemTime, callback: F) -> TimerId
96    where
97        F: FnOnce(TimeResponse) -> Ev + Send + Sync + 'static,
98    {
99        let (future, id) = self.notify_at_async(system_time);
100        self.context.spawn({
101            let context = self.context.clone();
102            async move {
103                context.update_app(callback(future.await));
104            }
105        });
106        id
107    }
108
109    /// Ask to receive a notification when the specified
110    /// [`SystemTime`] has arrived.
111    /// This is an async call to use with [`crux_core::compose::Compose`].
112    #[must_use]
113    pub fn notify_at_async(
114        &self,
115        system_time: SystemTime,
116    ) -> (TimerFuture<impl Future<Output = TimeResponse>>, TimerId) {
117        let id = get_timer_id();
118        let future = self.context.request_from_shell(TimeRequest::NotifyAt {
119            id,
120            instant: system_time.into(),
121        });
122        (TimerFuture::new(id, future), id)
123    }
124
125    /// Ask to receive a notification when the specified [`Duration`](std::time::Duration) has elapsed.
126    pub fn notify_after<F>(&self, duration: std::time::Duration, callback: F) -> TimerId
127    where
128        F: FnOnce(TimeResponse) -> Ev + Send + Sync + 'static,
129    {
130        let (future, id) = self.notify_after_async(duration);
131        self.context.spawn({
132            let context = self.context.clone();
133            async move {
134                context.update_app(callback(future.await));
135            }
136        });
137        id
138    }
139
140    /// Ask to receive a notification when the specified [`Duration`](std::time::Duration) has elapsed.
141    /// This is an async call to use with [`crux_core::compose::Compose`].
142    #[must_use]
143    pub fn notify_after_async(
144        &self,
145        duration: std::time::Duration,
146    ) -> (TimerFuture<impl Future<Output = TimeResponse>>, TimerId) {
147        let id = get_timer_id();
148        let future = self.context.request_from_shell(TimeRequest::NotifyAfter {
149            id,
150            duration: duration.into(),
151        });
152        (TimerFuture::new(id, future), id)
153    }
154
155    /// # Panics
156    ///
157    /// Panics if we can't acquire the lock on the cleared timer IDs.
158    pub fn clear(&self, id: TimerId) {
159        self.context.spawn({
160            {
161                let mut lock = CLEARED_TIMER_IDS.lock().unwrap();
162                lock.insert(id);
163            }
164
165            let context = self.context.clone();
166            async move {
167                context.notify_shell(TimeRequest::Clear { id }).await;
168            }
169        });
170    }
171}
172
173pub struct TimerFuture<F>
174where
175    F: Future<Output = TimeResponse> + Unpin,
176{
177    timer_id: TimerId,
178    is_cleared: bool,
179    future: F,
180}
181
182impl<F> Future for TimerFuture<F>
183where
184    F: Future<Output = TimeResponse> + Unpin,
185{
186    type Output = TimeResponse;
187
188    fn poll(
189        self: Pin<&mut Self>,
190        cx: &mut std::task::Context<'_>,
191    ) -> std::task::Poll<Self::Output> {
192        if self.is_cleared {
193            // short-circuit return
194            return Poll::Ready(TimeResponse::Cleared { id: self.timer_id });
195        }
196        // see if the timer has been cleared
197        let timer_is_cleared = {
198            let mut lock = CLEARED_TIMER_IDS.lock().unwrap();
199            lock.remove(&self.timer_id)
200        };
201        let this = self.get_mut();
202        this.is_cleared = timer_is_cleared;
203        if timer_is_cleared {
204            // if the timer has been cleared, immediately return 'Ready' without
205            // waiting for the timer to elapse
206            Poll::Ready(TimeResponse::Cleared { id: this.timer_id })
207        } else {
208            // otherwise, defer to the inner future
209            Pin::new(&mut this.future).poll(cx)
210        }
211    }
212}
213
214impl<F> TimerFuture<F>
215where
216    F: Future<Output = TimeResponse> + Unpin,
217{
218    fn new(timer_id: TimerId, future: F) -> Self {
219        Self {
220            timer_id,
221            future,
222            is_cleared: false,
223        }
224    }
225}
226
// Global HashSet containing the ids of timers which have been _cleared_
// but whose futures have _not since been polled_. When the future is next
// polled, the timer id is evicted from this set and the timer is 'poisoned'
// so as to return immediately without waiting on the shell.
231static CLEARED_TIMER_IDS: LazyLock<Mutex<HashSet<TimerId>>> =
232    LazyLock::new(|| Mutex::new(HashSet::new()));
233
#[cfg(test)]
mod test {
    use super::*;

    /// The `Now`, `NotifyAt` and `NotifyAfter` requests serialize to the expected JSON and
    /// deserialize back to an equal value.
    #[test]
    fn test_serializing_the_request_types_as_json() {
        // Unit variant: a bare JSON string.
        let request = TimeRequest::Now;
        let json = serde_json::to_string(&request).unwrap();
        assert_eq!(&json, "\"now\"");
        let round_tripped: TimeRequest = serde_json::from_str(&json).unwrap();
        assert_eq!(request, round_tripped);

        // Timer at an absolute instant.
        let request = TimeRequest::NotifyAt {
            id: TimerId(1),
            instant: Instant::new(1, 2),
        };
        let json = serde_json::to_string(&request).unwrap();
        assert_eq!(
            &json,
            r#"{"notifyAt":{"id":1,"instant":{"seconds":1,"nanos":2}}}"#
        );
        let round_tripped: TimeRequest = serde_json::from_str(&json).unwrap();
        assert_eq!(request, round_tripped);

        // Timer after a duration.
        let request = TimeRequest::NotifyAfter {
            id: TimerId(2),
            duration: crate::Duration::from_secs(1),
        };
        let json = serde_json::to_string(&request).unwrap();
        assert_eq!(
            &json,
            r#"{"notifyAfter":{"id":2,"duration":{"nanos":1000000000}}}"#
        );
        let round_tripped: TimeRequest = serde_json::from_str(&json).unwrap();
        assert_eq!(request, round_tripped);
    }

    /// The `Now`, `DurationElapsed` and `InstantArrived` responses serialize to the expected
    /// JSON and deserialize back to an equal value.
    #[test]
    fn test_serializing_the_response_types_as_json() {
        let response = TimeResponse::Now {
            instant: Instant::new(1, 2),
        };
        let json = serde_json::to_string(&response).unwrap();
        assert_eq!(&json, r#"{"now":{"instant":{"seconds":1,"nanos":2}}}"#);
        let round_tripped: TimeResponse = serde_json::from_str(&json).unwrap();
        assert_eq!(response, round_tripped);

        let response = TimeResponse::DurationElapsed { id: TimerId(1) };
        let json = serde_json::to_string(&response).unwrap();
        assert_eq!(&json, r#"{"durationElapsed":{"id":1}}"#);
        let round_tripped: TimeResponse = serde_json::from_str(&json).unwrap();
        assert_eq!(response, round_tripped);

        let response = TimeResponse::InstantArrived { id: TimerId(2) };
        let json = serde_json::to_string(&response).unwrap();
        assert_eq!(&json, r#"{"instantArrived":{"id":2}}"#);
        let round_tripped: TimeResponse = serde_json::from_str(&json).unwrap();
        assert_eq!(response, round_tripped);
    }
}