crux_time/
lib.rs

1//! Current time access for Crux apps
2//!
3//! Current time (on a wall clock) is considered a side-effect (although if we were to get pedantic, it's
4//! more of a side-cause) by Crux, and has to be obtained externally. This capability provides a simple
5//! interface to do so.
6
7pub mod command;
8pub mod protocol;
9
10use std::{
11    collections::HashSet,
12    future::Future,
13    pin::Pin,
14    sync::{
15        atomic::{AtomicUsize, Ordering},
16        LazyLock, Mutex,
17    },
18    task::Poll,
19    time::SystemTime,
20};
21
22use crux_core::capability::CapabilityContext;
23
24pub use protocol::{duration::Duration, instant::Instant, TimeRequest, TimeResponse, TimerId};
25
26fn get_timer_id() -> TimerId {
27    static COUNTER: AtomicUsize = AtomicUsize::new(1);
28    TimerId(COUNTER.fetch_add(1, Ordering::Relaxed))
29}
30
31/// The Time capability API
32///
33/// This capability provides access to the current time and allows the app to ask for
34/// notifications when a specific instant has arrived or a duration has elapsed.
35pub struct Time<Ev> {
36    context: CapabilityContext<TimeRequest, Ev>,
37}
38
39impl<Ev> crux_core::Capability<Ev> for Time<Ev> {
40    type Operation = TimeRequest;
41    type MappedSelf<MappedEv> = Time<MappedEv>;
42
43    fn map_event<F, NewEv>(&self, f: F) -> Self::MappedSelf<NewEv>
44    where
45        F: Fn(NewEv) -> Ev + Send + Sync + 'static,
46        Ev: 'static,
47        NewEv: 'static + Send,
48    {
49        Time::new(self.context.map_event(f))
50    }
51}
52
53impl<Ev> Clone for Time<Ev> {
54    fn clone(&self) -> Self {
55        Self {
56            context: self.context.clone(),
57        }
58    }
59}
60
61impl<Ev> Time<Ev>
62where
63    Ev: 'static,
64{
65    pub fn new(context: CapabilityContext<TimeRequest, Ev>) -> Self {
66        Self { context }
67    }
68
69    /// Request current time, which will be passed to the app as a [`TimeResponse`] containing an [`Instant`]
70    /// wrapped in the event produced by the `callback`.
71    pub fn now<F>(&self, callback: F)
72    where
73        F: FnOnce(TimeResponse) -> Ev + Send + Sync + 'static,
74    {
75        self.context.spawn({
76            let context = self.context.clone();
77            let this = self.clone();
78
79            async move {
80                context.update_app(callback(this.now_async().await));
81            }
82        });
83    }
84
85    /// Request current time, which will be passed to the app as a [`TimeResponse`] containing an [`Instant`]
86    /// This is an async call to use with [`crux_core::compose::Compose`].
87    pub async fn now_async(&self) -> TimeResponse {
88        self.context.request_from_shell(TimeRequest::Now).await
89    }
90
91    /// Ask to receive a notification when the specified
92    /// [`SystemTime`] has arrived.
93    pub fn notify_at<F>(&self, system_time: SystemTime, callback: F) -> TimerId
94    where
95        F: FnOnce(TimeResponse) -> Ev + Send + Sync + 'static,
96    {
97        let (future, id) = self.notify_at_async(system_time);
98        self.context.spawn({
99            let context = self.context.clone();
100            async move {
101                context.update_app(callback(future.await));
102            }
103        });
104        id
105    }
106
107    /// Ask to receive a notification when the specified
108    /// [`SystemTime`] has arrived.
109    /// This is an async call to use with [`crux_core::compose::Compose`].
110    pub fn notify_at_async(
111        &self,
112        system_time: SystemTime,
113    ) -> (TimerFuture<impl Future<Output = TimeResponse>>, TimerId) {
114        let id = get_timer_id();
115        let future = self.context.request_from_shell(TimeRequest::NotifyAt {
116            id,
117            instant: system_time.into(),
118        });
119        (TimerFuture::new(id, future), id)
120    }
121
122    /// Ask to receive a notification when the specified [`Duration`](std::time::Duration) has elapsed.
123    pub fn notify_after<F>(&self, duration: std::time::Duration, callback: F) -> TimerId
124    where
125        F: FnOnce(TimeResponse) -> Ev + Send + Sync + 'static,
126    {
127        let (future, id) = self.notify_after_async(duration);
128        self.context.spawn({
129            let context = self.context.clone();
130            async move {
131                context.update_app(callback(future.await));
132            }
133        });
134        id
135    }
136
137    /// Ask to receive a notification when the specified [`Duration`](std::time::Duration) has elapsed.
138    /// This is an async call to use with [`crux_core::compose::Compose`].
139    pub fn notify_after_async(
140        &self,
141        duration: std::time::Duration,
142    ) -> (TimerFuture<impl Future<Output = TimeResponse>>, TimerId) {
143        let id = get_timer_id();
144        let future = self.context.request_from_shell(TimeRequest::NotifyAfter {
145            id,
146            duration: duration.into(),
147        });
148        (TimerFuture::new(id, future), id)
149    }
150
151    pub fn clear(&self, id: TimerId) {
152        self.context.spawn({
153            {
154                let mut lock = CLEARED_TIMER_IDS.lock().unwrap();
155                lock.insert(id);
156            }
157
158            let context = self.context.clone();
159            async move {
160                context.notify_shell(TimeRequest::Clear { id }).await;
161            }
162        });
163    }
164}
165
166pub struct TimerFuture<F>
167where
168    F: Future<Output = TimeResponse> + Unpin,
169{
170    timer_id: TimerId,
171    is_cleared: bool,
172    future: F,
173}
174
175impl<F> Future for TimerFuture<F>
176where
177    F: Future<Output = TimeResponse> + Unpin,
178{
179    type Output = TimeResponse;
180
181    fn poll(
182        self: Pin<&mut Self>,
183        cx: &mut std::task::Context<'_>,
184    ) -> std::task::Poll<Self::Output> {
185        if self.is_cleared {
186            // short-circuit return
187            return Poll::Ready(TimeResponse::Cleared { id: self.timer_id });
188        };
189        // see if the timer has been cleared
190        let timer_is_cleared = {
191            let mut lock = CLEARED_TIMER_IDS.lock().unwrap();
192            lock.remove(&self.timer_id)
193        };
194        let this = self.get_mut();
195        this.is_cleared = timer_is_cleared;
196        if timer_is_cleared {
197            // if the timer has been cleared, immediately return 'Ready' without
198            // waiting for the timer to elapse
199            Poll::Ready(TimeResponse::Cleared { id: this.timer_id })
200        } else {
201            // otherwise, defer to the inner future
202            Pin::new(&mut this.future).poll(cx)
203        }
204    }
205}
206
207impl<F> TimerFuture<F>
208where
209    F: Future<Output = TimeResponse> + Unpin,
210{
211    fn new(timer_id: TimerId, future: F) -> Self {
212        Self {
213            timer_id,
214            future,
215            is_cleared: false,
216        }
217    }
218}
219
220// Global HashSet containing the ids of timers which have been _cleared_
221// but the whose futures have _not since been polled_. When the future is next
222// polled, the timer id is evicted from this set and the timer is 'poisoned'
223// so as to return immediately without waiting on the shell.
224static CLEARED_TIMER_IDS: LazyLock<Mutex<HashSet<TimerId>>> =
225    LazyLock::new(|| Mutex::new(HashSet::new()));
226
#[cfg(test)]
mod test {
    use super::*;

    /// Pins the JSON representation of the request variants and checks that each
    /// one deserializes back to an equal value.
    #[test]
    fn test_serializing_the_request_types_as_json() {
        // `Now` carries no data.
        let now_request = TimeRequest::Now;
        let now_json = serde_json::to_string(&now_request).unwrap();
        assert_eq!(now_json, "\"now\"");
        let now_back = serde_json::from_str::<TimeRequest>(&now_json).unwrap();
        assert_eq!(now_request, now_back);

        // `NotifyAt` carries the timer id and the target instant.
        let notify_at_request = TimeRequest::NotifyAt {
            id: TimerId(1),
            instant: Instant::new(1, 2),
        };
        let notify_at_json = serde_json::to_string(&notify_at_request).unwrap();
        assert_eq!(
            notify_at_json,
            r#"{"notifyAt":{"id":1,"instant":{"seconds":1,"nanos":2}}}"#
        );
        let notify_at_back = serde_json::from_str::<TimeRequest>(&notify_at_json).unwrap();
        assert_eq!(notify_at_request, notify_at_back);

        // `NotifyAfter` carries the timer id and the duration to wait.
        let notify_after_request = TimeRequest::NotifyAfter {
            id: TimerId(2),
            duration: crate::Duration::from_secs(1),
        };
        let notify_after_json = serde_json::to_string(&notify_after_request).unwrap();
        assert_eq!(
            notify_after_json,
            r#"{"notifyAfter":{"id":2,"duration":{"nanos":1000000000}}}"#
        );
        let notify_after_back = serde_json::from_str::<TimeRequest>(&notify_after_json).unwrap();
        assert_eq!(notify_after_request, notify_after_back);
    }

    /// Pins the JSON representation of the response variants and checks that each
    /// one deserializes back to an equal value.
    #[test]
    fn test_serializing_the_response_types_as_json() {
        // `Now` carries the current instant.
        let now_response = TimeResponse::Now {
            instant: Instant::new(1, 2),
        };
        let now_json = serde_json::to_string(&now_response).unwrap();
        assert_eq!(now_json, r#"{"now":{"instant":{"seconds":1,"nanos":2}}}"#);
        let now_back = serde_json::from_str::<TimeResponse>(&now_json).unwrap();
        assert_eq!(now_response, now_back);

        // `DurationElapsed` carries only the timer id.
        let elapsed_response = TimeResponse::DurationElapsed { id: TimerId(1) };
        let elapsed_json = serde_json::to_string(&elapsed_response).unwrap();
        assert_eq!(elapsed_json, r#"{"durationElapsed":{"id":1}}"#);
        let elapsed_back = serde_json::from_str::<TimeResponse>(&elapsed_json).unwrap();
        assert_eq!(elapsed_response, elapsed_back);

        // `InstantArrived` carries only the timer id.
        let arrived_response = TimeResponse::InstantArrived { id: TimerId(2) };
        let arrived_json = serde_json::to_string(&arrived_response).unwrap();
        assert_eq!(arrived_json, r#"{"instantArrived":{"id":2}}"#);
        let arrived_back = serde_json::from_str::<TimeResponse>(&arrived_json).unwrap();
        assert_eq!(arrived_response, arrived_back);
    }
}