1#![deny(clippy::pedantic)]
2pub mod command;
9pub mod protocol;
10
11use std::{
12 collections::HashSet,
13 future::Future,
14 pin::Pin,
15 sync::{
16 atomic::{AtomicUsize, Ordering},
17 LazyLock, Mutex,
18 },
19 task::Poll,
20 time::SystemTime,
21};
22
23use crux_core::capability::CapabilityContext;
24
25pub use protocol::{duration::Duration, instant::Instant, TimeRequest, TimeResponse, TimerId};
26
27fn get_timer_id() -> TimerId {
28 static COUNTER: AtomicUsize = AtomicUsize::new(1);
29 TimerId(COUNTER.fetch_add(1, Ordering::Relaxed))
30}
31
32pub struct Time<Ev> {
37 context: CapabilityContext<TimeRequest, Ev>,
38}
39
40impl<Ev> crux_core::Capability<Ev> for Time<Ev> {
41 type Operation = TimeRequest;
42 type MappedSelf<MappedEv> = Time<MappedEv>;
43
44 fn map_event<F, NewEv>(&self, f: F) -> Self::MappedSelf<NewEv>
45 where
46 F: Fn(NewEv) -> Ev + Send + Sync + 'static,
47 Ev: 'static,
48 NewEv: 'static + Send,
49 {
50 Time::new(self.context.map_event(f))
51 }
52}
53
54impl<Ev> Clone for Time<Ev> {
55 fn clone(&self) -> Self {
56 Self {
57 context: self.context.clone(),
58 }
59 }
60}
61
62impl<Ev> Time<Ev>
63where
64 Ev: 'static,
65{
66 #[must_use]
67 pub fn new(context: CapabilityContext<TimeRequest, Ev>) -> Self {
68 Self { context }
69 }
70
71 pub fn now<F>(&self, callback: F)
74 where
75 F: FnOnce(TimeResponse) -> Ev + Send + Sync + 'static,
76 {
77 self.context.spawn({
78 let context = self.context.clone();
79 let this = self.clone();
80
81 async move {
82 context.update_app(callback(this.now_async().await));
83 }
84 });
85 }
86
87 pub async fn now_async(&self) -> TimeResponse {
90 self.context.request_from_shell(TimeRequest::Now).await
91 }
92
93 pub fn notify_at<F>(&self, system_time: SystemTime, callback: F) -> TimerId
96 where
97 F: FnOnce(TimeResponse) -> Ev + Send + Sync + 'static,
98 {
99 let (future, id) = self.notify_at_async(system_time);
100 self.context.spawn({
101 let context = self.context.clone();
102 async move {
103 context.update_app(callback(future.await));
104 }
105 });
106 id
107 }
108
109 #[must_use]
113 pub fn notify_at_async(
114 &self,
115 system_time: SystemTime,
116 ) -> (TimerFuture<impl Future<Output = TimeResponse>>, TimerId) {
117 let id = get_timer_id();
118 let future = self.context.request_from_shell(TimeRequest::NotifyAt {
119 id,
120 instant: system_time.into(),
121 });
122 (TimerFuture::new(id, future), id)
123 }
124
125 pub fn notify_after<F>(&self, duration: std::time::Duration, callback: F) -> TimerId
127 where
128 F: FnOnce(TimeResponse) -> Ev + Send + Sync + 'static,
129 {
130 let (future, id) = self.notify_after_async(duration);
131 self.context.spawn({
132 let context = self.context.clone();
133 async move {
134 context.update_app(callback(future.await));
135 }
136 });
137 id
138 }
139
140 #[must_use]
143 pub fn notify_after_async(
144 &self,
145 duration: std::time::Duration,
146 ) -> (TimerFuture<impl Future<Output = TimeResponse>>, TimerId) {
147 let id = get_timer_id();
148 let future = self.context.request_from_shell(TimeRequest::NotifyAfter {
149 id,
150 duration: duration.into(),
151 });
152 (TimerFuture::new(id, future), id)
153 }
154
155 pub fn clear(&self, id: TimerId) {
159 self.context.spawn({
160 {
161 let mut lock = CLEARED_TIMER_IDS.lock().unwrap();
162 lock.insert(id);
163 }
164
165 let context = self.context.clone();
166 async move {
167 context.notify_shell(TimeRequest::Clear { id }).await;
168 }
169 });
170 }
171}
172
173pub struct TimerFuture<F>
174where
175 F: Future<Output = TimeResponse> + Unpin,
176{
177 timer_id: TimerId,
178 is_cleared: bool,
179 future: F,
180}
181
182impl<F> Future for TimerFuture<F>
183where
184 F: Future<Output = TimeResponse> + Unpin,
185{
186 type Output = TimeResponse;
187
188 fn poll(
189 self: Pin<&mut Self>,
190 cx: &mut std::task::Context<'_>,
191 ) -> std::task::Poll<Self::Output> {
192 if self.is_cleared {
193 return Poll::Ready(TimeResponse::Cleared { id: self.timer_id });
195 }
196 let timer_is_cleared = {
198 let mut lock = CLEARED_TIMER_IDS.lock().unwrap();
199 lock.remove(&self.timer_id)
200 };
201 let this = self.get_mut();
202 this.is_cleared = timer_is_cleared;
203 if timer_is_cleared {
204 Poll::Ready(TimeResponse::Cleared { id: this.timer_id })
207 } else {
208 Pin::new(&mut this.future).poll(cx)
210 }
211 }
212}
213
214impl<F> TimerFuture<F>
215where
216 F: Future<Output = TimeResponse> + Unpin,
217{
218 fn new(timer_id: TimerId, future: F) -> Self {
219 Self {
220 timer_id,
221 future,
222 is_cleared: false,
223 }
224 }
225}
226
227static CLEARED_TIMER_IDS: LazyLock<Mutex<HashSet<TimerId>>> =
232 LazyLock::new(|| Mutex::new(HashSet::new()));
233
234#[cfg(test)]
235mod test {
236 use super::*;
237
238 #[test]
239 fn test_serializing_the_request_types_as_json() {
240 let now = TimeRequest::Now;
241
242 let serialized = serde_json::to_string(&now).unwrap();
243 assert_eq!(&serialized, "\"now\"");
244
245 let deserialized: TimeRequest = serde_json::from_str(&serialized).unwrap();
246 assert_eq!(now, deserialized);
247
248 let now = TimeRequest::NotifyAt {
249 id: TimerId(1),
250 instant: Instant::new(1, 2),
251 };
252
253 let serialized = serde_json::to_string(&now).unwrap();
254 assert_eq!(
255 &serialized,
256 r#"{"notifyAt":{"id":1,"instant":{"seconds":1,"nanos":2}}}"#
257 );
258
259 let deserialized: TimeRequest = serde_json::from_str(&serialized).unwrap();
260 assert_eq!(now, deserialized);
261
262 let now = TimeRequest::NotifyAfter {
263 id: TimerId(2),
264 duration: crate::Duration::from_secs(1),
265 };
266
267 let serialized = serde_json::to_string(&now).unwrap();
268 assert_eq!(
269 &serialized,
270 r#"{"notifyAfter":{"id":2,"duration":{"nanos":1000000000}}}"#
271 );
272
273 let deserialized: TimeRequest = serde_json::from_str(&serialized).unwrap();
274 assert_eq!(now, deserialized);
275 }
276
277 #[test]
278 fn test_serializing_the_response_types_as_json() {
279 let now = TimeResponse::Now {
280 instant: Instant::new(1, 2),
281 };
282
283 let serialized = serde_json::to_string(&now).unwrap();
284 assert_eq!(
285 &serialized,
286 r#"{"now":{"instant":{"seconds":1,"nanos":2}}}"#
287 );
288
289 let deserialized: TimeResponse = serde_json::from_str(&serialized).unwrap();
290 assert_eq!(now, deserialized);
291
292 let now = TimeResponse::DurationElapsed { id: TimerId(1) };
293
294 let serialized = serde_json::to_string(&now).unwrap();
295 assert_eq!(&serialized, r#"{"durationElapsed":{"id":1}}"#);
296
297 let deserialized: TimeResponse = serde_json::from_str(&serialized).unwrap();
298 assert_eq!(now, deserialized);
299
300 let now = TimeResponse::InstantArrived { id: TimerId(2) };
301
302 let serialized = serde_json::to_string(&now).unwrap();
303 assert_eq!(&serialized, r#"{"instantArrived":{"id":2}}"#);
304
305 let deserialized: TimeResponse = serde_json::from_str(&serialized).unwrap();
306 assert_eq!(now, deserialized);
307 }
308}