Skip to main content

crux_core/middleware/
effect_handling.rs

1use std::{
2    sync::{
3        Arc, Weak,
4        atomic::{AtomicBool, Ordering},
5    },
6    thread::{self, ThreadId},
7};
8
9use crate::{Request, RequestHandle, Resolvable, ResolveError, capability::Operation};
10
11use super::Layer;
12
13/// A resolver for an effect processed by middleware.
14///
15/// This type encapsulates the callback that feeds the operation's output back
16/// into the core. It **must** be called from an asynchronous context (e.g. a
17/// spawned thread, an async task via `spawn_local`, or a channel worker) —
18/// calling [`resolve`](Self::resolve) while
19/// [`try_process_effect`](EffectMiddleware::try_process_effect) is still on the call
20/// stack will panic.
21///
22/// For streaming operations ([`RequestHandle::Many`]), call `resolve` multiple
23/// times until the stream is exhausted.
24type ResolveFn<Output> = Box<dyn FnMut(&mut RequestHandle<Output>, Output) + Send>;
25
26pub struct EffectResolver<Output: Send + 'static> {
27    handle: RequestHandle<Output>,
28    resolve_fn: ResolveFn<Output>,
29    /// `true` while `try_process_effect` is executing on the call stack.
30    active: Arc<AtomicBool>,
31    /// The thread that called `try_process_effect`.
32    calling_thread: ThreadId,
33}
34
35impl<Output: Send + 'static> EffectResolver<Output> {
36    /// Resolve the effect with the given output.
37    ///
38    /// For one-shot effects this should be called exactly once. For streaming
39    /// effects it can be called multiple times.
40    ///
41    /// # Panics
42    ///
43    /// Panics if called synchronously from within
44    /// [`EffectMiddleware::try_process_effect`]. Middleware must dispatch work
45    /// asynchronously (e.g. `std::thread::spawn`, `spawn_local`, or a channel)
46    /// and call `resolve` from there.
47    ///
48    /// See <https://github.com/redbadger/crux/issues/492>
49    pub fn resolve(&mut self, output: Output) {
50        assert!(
51            !(self.active.load(Ordering::Acquire) && thread::current().id() == self.calling_thread),
52            "EffectMiddleware::try_process_effect must not call resolve() synchronously. \
53             Dispatch work asynchronously (thread, spawn_local, channel, etc.). \
54             See https://github.com/redbadger/crux/issues/492"
55        );
56        (self.resolve_fn)(&mut self.handle, output);
57    }
58}
59
60/// An effect processing middleware.
61///
62/// Implement this trait to provide effect processing in Rust on the core side.
63/// The two typical uses for this are:
64///
65/// 1. Reusing a Rust implementation of a capability compatible with all target
66///    platforms.
67/// 2. Using an existing crate which is not built with Sans-IO in mind.
68///
69/// There are a number of considerations for doing this:
70///
71/// - The effect processing will rely on system APIs or crates which MUST be
72///   portable to all platforms the library using this middleware is going to be
73///   deployed to. This is fundamentally trading off portability for reuse of the
74///   Rust implementation.
75/// - The middleware MUST process the effect asynchronously — it must not call
76///   [`EffectResolver::resolve`] before `try_process_effect` returns. On native
77///   targets this typically means spawning a thread or sending work to a
78///   channel-based worker. On WASM (which has no threads) this means using
79///   `spawn_local` or a similar async task primitive. Calling `resolve()`
80///   synchronously inside `try_process_effect` will panic.
81/// - Because the resolver may be sent to another thread (on native), the core
82///   and therefore the app are shared between threads. The app must be `Send`
83///   and `Sync`, which also forces the `Model` type to be `Send` and `Sync`.
84///   This should not be a problem — `Model` should not normally be `!Send` or
85///   `!Sync`.
86///
87/// # Example
88///
89/// ```rust,ignore
90/// impl EffectMiddleware for MyMiddleware {
91///     type Op = MyOperation;
92///
93///     fn try_process_effect(
94///         &self,
95///         operation: MyOperation,
96///         mut resolver: EffectResolver<<MyOperation as Operation>::Output>,
97///     ) {
98///         std::thread::spawn(move || {
99///             let output = do_work(operation);
100///             resolver.resolve(output);
101///         });
102///     }
103/// }
104/// ```
105pub trait EffectMiddleware: Send + Sync {
106    /// The operation type this middleware can process.
107    type Op: Operation;
108
109    /// Process the given operation and resolve via the provided resolver.
110    ///
111    /// The framework has already extracted the operation from the effect enum.
112    /// Use the [`EffectResolver`] to send the result back. The resolver **must
113    /// not** be called before this method returns — dispatch the work
114    /// asynchronously (e.g. `std::thread::spawn` on native, `spawn_local` on
115    /// WASM, or a channel send) and call [`EffectResolver::resolve`] from
116    /// there.
117    fn try_process_effect(
118        &self,
119        operation: Self::Op,
120        resolver: EffectResolver<<Self::Op as Operation>::Output>,
121    );
122}
123
124struct EffectMiddlewareLayerInner<Next, EM>
125where
126    Next: Layer + Sync + Send + 'static,
127    Next::Effect: TryInto<Request<EM::Op>, Error = Next::Effect>,
128    EM: EffectMiddleware,
129{
130    next: Next,
131    middleware: EM,
132}
133
134/// Middleware layer able to process some of the effects.
135///
136/// This implements the general behaviour making sure all follow-up effects are
137/// processed and routed to the right place and delegates to the generic parameter `EM`,
138/// which implements [`EffectMiddleware`].
139pub struct HandleEffectLayer<Next, EM>
140where
141    Next: Layer + Sync + Send + 'static,
142    Next::Effect: TryInto<Request<EM::Op>, Error = Next::Effect>,
143    EM: EffectMiddleware,
144{
145    inner: Arc<EffectMiddlewareLayerInner<Next, EM>>,
146}
147
148impl<Next, EM> Layer for HandleEffectLayer<Next, EM>
149where
150    Next: Layer,
151    Next::Effect: TryInto<Request<EM::Op>, Error = Next::Effect>,
152    EM: EffectMiddleware + 'static,
153{
154    type Event = Next::Event;
155    type Effect = Next::Effect;
156    type ViewModel = Next::ViewModel;
157
158    fn update<F: Fn(Vec<Self::Effect>) + Send + Sync + 'static>(
159        &self,
160        event: Self::Event,
161        effect_callback: F,
162    ) -> Vec<Self::Effect> {
163        self.update(event, effect_callback)
164    }
165
166    fn resolve<Output, F: Fn(Vec<Self::Effect>) + Send + Sync + 'static>(
167        &self,
168        request: &mut impl Resolvable<Output>,
169        output: Output,
170        effect_callback: F,
171    ) -> Result<Vec<Self::Effect>, ResolveError> {
172        self.resolve(request, output, effect_callback)
173    }
174
175    fn view(&self) -> Self::ViewModel {
176        self.view()
177    }
178
179    fn process_tasks<F>(&self, effect_callback: F) -> Vec<Self::Effect>
180    where
181        F: Fn(Vec<Self::Effect>) + Sync + Send + 'static,
182    {
183        self.process_tasks(effect_callback)
184    }
185}
186
187impl<Next, EM> HandleEffectLayer<Next, EM>
188where
189    Next: Layer,
190    Next::Effect: TryInto<Request<EM::Op>, Error = Next::Effect>,
191    EM: EffectMiddleware + 'static,
192{
193    /// Typically, you would use [`Layer::handle_effects_using`] to construct a
194    /// `HandleEffectLayer` instance for a specific [`EffectMiddleware`].
195    pub fn new(next: Next, middleware: EM) -> Self {
196        Self {
197            inner: Arc::new(EffectMiddlewareLayerInner { next, middleware }),
198        }
199    }
200
201    fn update(
202        &self,
203        event: Next::Event,
204        return_effects: impl Fn(Vec<Next::Effect>) + Send + Sync + 'static,
205    ) -> Vec<Next::Effect> {
206        let inner = Arc::downgrade(&self.inner);
207        let return_effects = Arc::new(return_effects);
208        let return_effects_copy = return_effects.clone();
209
210        let effects = self
211            .inner
212            .next
213            .update(event, move |later_effects_from_next| {
214                // Eventual route
215                Self::process_known_effects_with(&inner, later_effects_from_next, &return_effects);
216            });
217
218        // Immediate route
219        Self::process_known_effects(&Arc::downgrade(&self.inner), effects, &return_effects_copy)
220    }
221
222    fn resolve<Output>(
223        &self,
224        request: &mut impl Resolvable<Output>,
225        result: Output,
226        return_effects: impl Fn(Vec<Next::Effect>) + Send + Sync + 'static,
227    ) -> Result<Vec<Next::Effect>, ResolveError> {
228        let inner = Arc::downgrade(&self.inner);
229        let return_effects = Arc::new(return_effects);
230        let return_effects_copy = return_effects.clone();
231
232        let effects = self
233            .inner
234            .next
235            .resolve(request, result, move |later_effects_from_next| {
236                Self::process_known_effects_with(&inner, later_effects_from_next, &return_effects);
237            })?;
238
239        // Immediate route
240        Ok(Self::process_known_effects(
241            &Arc::downgrade(&self.inner),
242            effects,
243            &return_effects_copy,
244        ))
245    }
246
247    fn view(&self) -> Next::ViewModel {
248        self.inner.next.view()
249    }
250
251    fn process_tasks<F>(&self, return_effects: F) -> Vec<Next::Effect>
252    where
253        F: Fn(Vec<Next::Effect>) + Sync + Send + 'static,
254    {
255        let inner = Arc::downgrade(&self.inner);
256        let return_effects = Arc::new(return_effects);
257        let return_effects_copy = return_effects.clone();
258
259        let effects = self
260            .inner
261            .next
262            .process_tasks(move |later_effects_from_next| {
263                // Eventual route
264                Self::process_known_effects_with(&inner, later_effects_from_next, &return_effects);
265            });
266
267        // Immediate route
268        Self::process_known_effects(&Arc::downgrade(&self.inner), effects, &return_effects_copy)
269    }
270
271    fn process_known_effects(
272        inner: &Weak<EffectMiddlewareLayerInner<Next, EM>>,
273        effects: Vec<Next::Effect>,
274        return_effects: &Arc<impl Fn(Vec<Next::Effect>) + Send + Sync + 'static>,
275    ) -> Vec<Next::Effect> {
276        effects
277            .into_iter()
278            .filter_map(|effect| {
279                // Try to convert the effect into a Request for the middleware's
280                // operation type. If conversion fails, the effect is not for
281                // this middleware — pass it through.
282                let request: Request<EM::Op> = match effect.try_into() {
283                    Ok(req) => req,
284                    Err(effect) => return Some(effect),
285                };
286
287                let (operation, handle) = request.split();
288
289                // Build the resolve function that will be called from the
290                // middleware's async context (thread, spawn_local, etc.).
291                let resolve_fn = {
292                    let return_effects = return_effects.clone();
293                    let inner = inner.clone();
294
295                    move |req_handle: &mut RequestHandle<<EM::Op as Operation>::Output>, output| {
296                        let Some(strong_inner) = inner.upgrade() else {
297                            eprintln!("Inner can't be upgraded after resolving effect");
298                            return;
299                        };
300
301                        if let Ok(immediate_effects) =
302                            strong_inner.next.resolve(req_handle, output, {
303                                let return_effects = return_effects.clone();
304                                let future_inner = inner.clone();
305
306                                move |eventual_effects| {
307                                    Self::process_known_effects_with(
308                                        &future_inner,
309                                        eventual_effects,
310                                        &return_effects,
311                                    );
312                                }
313                            })
314                        {
315                            Self::process_known_effects_with(
316                                &inner,
317                                immediate_effects,
318                                &return_effects,
319                            );
320                        }
321                    }
322                };
323
324                let Some(strong_inner) = inner.upgrade() else {
325                    eprintln!("Inner can't be upgraded to process effect");
326                    return None;
327                };
328
329                // Create the resolver with the active guard.
330                let active = Arc::new(AtomicBool::new(true));
331                let resolver = EffectResolver {
332                    handle,
333                    resolve_fn: Box::new(resolve_fn),
334                    active: active.clone(),
335                    calling_thread: thread::current().id(),
336                };
337
338                // Call the middleware. resolve() will panic if called during
339                // this scope.
340                strong_inner
341                    .middleware
342                    .try_process_effect(operation, resolver);
343
344                // Allow resolve() to be called now that try_process_effect has returned.
345                active.store(false, Ordering::Release);
346
347                None
348            })
349            .collect()
350    }
351
352    fn process_known_effects_with(
353        inner: &Weak<EffectMiddlewareLayerInner<Next, EM>>,
354        effects: Vec<<Next as Layer>::Effect>,
355        return_effects: &Arc<impl Fn(Vec<<Next as Layer>::Effect>) + Send + Sync + 'static>,
356    ) {
357        let unknown_effects = Self::process_known_effects(inner, effects, return_effects);
358
359        if !unknown_effects.is_empty() {
360            return_effects(unknown_effects);
361        }
362    }
363}