1pub mod command;
8pub mod protocol;
9
10use std::{
11 collections::HashSet,
12 future::Future,
13 pin::Pin,
14 sync::{
15 atomic::{AtomicUsize, Ordering},
16 LazyLock, Mutex,
17 },
18 task::Poll,
19 time::SystemTime,
20};
21
22use crux_core::capability::CapabilityContext;
23
24pub use protocol::{duration::Duration, instant::Instant, TimeRequest, TimeResponse, TimerId};
25
26fn get_timer_id() -> TimerId {
27 static COUNTER: AtomicUsize = AtomicUsize::new(1);
28 TimerId(COUNTER.fetch_add(1, Ordering::Relaxed))
29}
30
/// The Time capability.
///
/// Lets the app ask the shell for the current time ([`Time::now`]), request a
/// notification at a point in time ([`Time::notify_at`]) or after a delay
/// ([`Time::notify_after`]), and cancel a pending notification
/// ([`Time::clear`]).
///
/// `Ev` is the app event type produced by the callbacks passed to those
/// methods.
pub struct Time<Ev> {
    // Used to spawn tasks, send `TimeRequest`s to the shell and deliver the
    // resulting events back to the app.
    context: CapabilityContext<TimeRequest, Ev>,
}
38
39impl<Ev> crux_core::Capability<Ev> for Time<Ev> {
40 type Operation = TimeRequest;
41 type MappedSelf<MappedEv> = Time<MappedEv>;
42
43 fn map_event<F, NewEv>(&self, f: F) -> Self::MappedSelf<NewEv>
44 where
45 F: Fn(NewEv) -> Ev + Send + Sync + 'static,
46 Ev: 'static,
47 NewEv: 'static + Send,
48 {
49 Time::new(self.context.map_event(f))
50 }
51}
52
53impl<Ev> Clone for Time<Ev> {
54 fn clone(&self) -> Self {
55 Self {
56 context: self.context.clone(),
57 }
58 }
59}
60
61impl<Ev> Time<Ev>
62where
63 Ev: 'static,
64{
65 pub fn new(context: CapabilityContext<TimeRequest, Ev>) -> Self {
66 Self { context }
67 }
68
69 pub fn now<F>(&self, callback: F)
72 where
73 F: FnOnce(TimeResponse) -> Ev + Send + Sync + 'static,
74 {
75 self.context.spawn({
76 let context = self.context.clone();
77 let this = self.clone();
78
79 async move {
80 context.update_app(callback(this.now_async().await));
81 }
82 });
83 }
84
85 pub async fn now_async(&self) -> TimeResponse {
88 self.context.request_from_shell(TimeRequest::Now).await
89 }
90
91 pub fn notify_at<F>(&self, system_time: SystemTime, callback: F) -> TimerId
94 where
95 F: FnOnce(TimeResponse) -> Ev + Send + Sync + 'static,
96 {
97 let (future, id) = self.notify_at_async(system_time);
98 self.context.spawn({
99 let context = self.context.clone();
100 async move {
101 context.update_app(callback(future.await));
102 }
103 });
104 id
105 }
106
107 pub fn notify_at_async(
111 &self,
112 system_time: SystemTime,
113 ) -> (TimerFuture<impl Future<Output = TimeResponse>>, TimerId) {
114 let id = get_timer_id();
115 let future = self.context.request_from_shell(TimeRequest::NotifyAt {
116 id,
117 instant: system_time.into(),
118 });
119 (TimerFuture::new(id, future), id)
120 }
121
122 pub fn notify_after<F>(&self, duration: std::time::Duration, callback: F) -> TimerId
124 where
125 F: FnOnce(TimeResponse) -> Ev + Send + Sync + 'static,
126 {
127 let (future, id) = self.notify_after_async(duration);
128 self.context.spawn({
129 let context = self.context.clone();
130 async move {
131 context.update_app(callback(future.await));
132 }
133 });
134 id
135 }
136
137 pub fn notify_after_async(
140 &self,
141 duration: std::time::Duration,
142 ) -> (TimerFuture<impl Future<Output = TimeResponse>>, TimerId) {
143 let id = get_timer_id();
144 let future = self.context.request_from_shell(TimeRequest::NotifyAfter {
145 id,
146 duration: duration.into(),
147 });
148 (TimerFuture::new(id, future), id)
149 }
150
151 pub fn clear(&self, id: TimerId) {
152 self.context.spawn({
153 {
154 let mut lock = CLEARED_TIMER_IDS.lock().unwrap();
155 lock.insert(id);
156 }
157
158 let context = self.context.clone();
159 async move {
160 context.notify_shell(TimeRequest::Clear { id }).await;
161 }
162 });
163 }
164}
165
166pub struct TimerFuture<F>
167where
168 F: Future<Output = TimeResponse> + Unpin,
169{
170 timer_id: TimerId,
171 is_cleared: bool,
172 future: F,
173}
174
175impl<F> Future for TimerFuture<F>
176where
177 F: Future<Output = TimeResponse> + Unpin,
178{
179 type Output = TimeResponse;
180
181 fn poll(
182 self: Pin<&mut Self>,
183 cx: &mut std::task::Context<'_>,
184 ) -> std::task::Poll<Self::Output> {
185 if self.is_cleared {
186 return Poll::Ready(TimeResponse::Cleared { id: self.timer_id });
188 };
189 let timer_is_cleared = {
191 let mut lock = CLEARED_TIMER_IDS.lock().unwrap();
192 lock.remove(&self.timer_id)
193 };
194 let this = self.get_mut();
195 this.is_cleared = timer_is_cleared;
196 if timer_is_cleared {
197 Poll::Ready(TimeResponse::Cleared { id: this.timer_id })
200 } else {
201 Pin::new(&mut this.future).poll(cx)
203 }
204 }
205}
206
207impl<F> TimerFuture<F>
208where
209 F: Future<Output = TimeResponse> + Unpin,
210{
211 fn new(timer_id: TimerId, future: F) -> Self {
212 Self {
213 timer_id,
214 future,
215 is_cleared: false,
216 }
217 }
218}
219
220static CLEARED_TIMER_IDS: LazyLock<Mutex<HashSet<TimerId>>> =
225 LazyLock::new(|| Mutex::new(HashSet::new()));
226
#[cfg(test)]
mod test {
    use super::*;

    /// Serializes `$value` to JSON and checks the text against `$json`, then
    /// parses that text back as `$ty` and checks it equals the original value.
    macro_rules! assert_json_round_trip {
        ($ty:ty, $value:expr, $json:expr) => {{
            let original: $ty = $value;

            let encoded = serde_json::to_string(&original).unwrap();
            assert_eq!(&encoded, $json);

            let decoded: $ty = serde_json::from_str(&encoded).unwrap();
            assert_eq!(original, decoded);
        }};
    }

    #[test]
    fn test_serializing_the_request_types_as_json() {
        assert_json_round_trip!(TimeRequest, TimeRequest::Now, "\"now\"");

        assert_json_round_trip!(
            TimeRequest,
            TimeRequest::NotifyAt {
                id: TimerId(1),
                instant: Instant::new(1, 2),
            },
            r#"{"notifyAt":{"id":1,"instant":{"seconds":1,"nanos":2}}}"#
        );

        assert_json_round_trip!(
            TimeRequest,
            TimeRequest::NotifyAfter {
                id: TimerId(2),
                duration: crate::Duration::from_secs(1),
            },
            r#"{"notifyAfter":{"id":2,"duration":{"nanos":1000000000}}}"#
        );
    }

    #[test]
    fn test_serializing_the_response_types_as_json() {
        assert_json_round_trip!(
            TimeResponse,
            TimeResponse::Now {
                instant: Instant::new(1, 2),
            },
            r#"{"now":{"instant":{"seconds":1,"nanos":2}}}"#
        );

        assert_json_round_trip!(
            TimeResponse,
            TimeResponse::DurationElapsed { id: TimerId(1) },
            r#"{"durationElapsed":{"id":1}}"#
        );

        assert_json_round_trip!(
            TimeResponse,
            TimeResponse::InstantArrived { id: TimerId(2) },
            r#"{"instantArrived":{"id":2}}"#
        );
    }
}