Skip to main content

crux_core/middleware/
effect_handling.rs

1use std::{
2    sync::{
3        Arc, Weak,
4        atomic::{AtomicBool, Ordering},
5    },
6    thread::{self, ThreadId},
7};
8
9use crate::{Request, RequestHandle, Resolvable, ResolveError, capability::Operation};
10
11use super::Layer;
12
13/// A resolver for an effect processed by middleware.
14///
15/// This type encapsulates the callback that feeds the operation's output back
16/// into the core. It **must** be called from an asynchronous context (e.g. a
17/// spawned thread, an async task via `spawn_local`, or a channel worker) —
18/// calling [`resolve`](Self::resolve) while
19/// [`try_process_effect`](EffectMiddleware::try_process_effect) is still on the call
20/// stack will panic.
21///
22/// For streaming operations ([`RequestHandle::Many`]), call `resolve` multiple
23/// times until the stream is exhausted.
24type ResolveFn<Output> = Box<dyn FnMut(&mut RequestHandle<Output>, Output) + Send>;
25
26pub struct EffectResolver<Output: Send + 'static> {
27    handle: RequestHandle<Output>,
28    resolve_fn: ResolveFn<Output>,
29    /// `true` while `try_process_effect` is executing on the call stack.
30    active: Arc<AtomicBool>,
31    /// The thread that called `try_process_effect`.
32    calling_thread: ThreadId,
33}
34
35impl<Output: Send + 'static> EffectResolver<Output> {
36    /// Resolve the effect with the given output.
37    ///
38    /// For one-shot effects this should be called exactly once. For streaming
39    /// effects it can be called multiple times.
40    ///
41    /// # Panics
42    ///
43    /// Panics if called synchronously from within
44    /// [`EffectMiddleware::try_process_effect`]. Middleware must dispatch work
45    /// asynchronously (e.g. `std::thread::spawn`, `spawn_local`, or a channel)
46    /// and call `resolve` from there.
47    ///
48    /// See <https://github.com/redbadger/crux/issues/492>
49    pub fn resolve(&mut self, output: Output) {
50        assert!(
51            !(self.active.load(Ordering::Acquire) && thread::current().id() == self.calling_thread),
52            "EffectMiddleware::try_process_effect must not call resolve() synchronously. \
53             Dispatch work asynchronously (thread, spawn_local, channel, etc.). \
54             See https://github.com/redbadger/crux/issues/492"
55        );
56        (self.resolve_fn)(&mut self.handle, output);
57    }
58}
59
60/// An effect processing middleware.
61///
62/// Implement this trait to provide effect processing in Rust on the core side.
63/// The two typical uses for this are:
64///
65/// 1. Reusing a Rust implementation of a capability compatible with all target
66///    platforms.
67/// 2. Using an existing crate which is not built with Sans-IO in mind.
68///
69/// There are a number of considerations for doing this:
70///
71/// - The effect processing will rely on system APIs or crates which MUST be
72///   portable to all platforms the library using this middleware is going to be
73///   deployed to. This is fundamentally trading off portability for reuse of the
74///   Rust implementation.
75/// - The middleware MUST process the effect asynchronously — it must not call
76///   [`EffectResolver::resolve`] before `try_process_effect` returns. On native
77///   targets this typically means spawning a thread or sending work to a
78///   channel-based worker. On WASM (which has no threads) this means using
79///   `spawn_local` or a similar async task primitive. Calling `resolve()`
80///   synchronously inside `try_process_effect` will panic.
81/// - Because the resolver may be sent to another thread (on native), the core
82///   and therefore the app are shared between threads. The app must be `Send`
83///   and `Sync`, which also forces the `Model` type to be `Send` and `Sync`.
84///   This should not be a problem — `Model` should not normally be `!Send` or
85///   `!Sync`.
86///
87/// # Example
88///
89/// ```rust,ignore
90/// impl EffectMiddleware for MyMiddleware {
91///     type Op = MyOperation;
92///
93///     fn try_process_effect(
94///         &self,
95///         operation: MyOperation,
96///         mut resolver: EffectResolver<<MyOperation as Operation>::Output>,
97///     ) {
98///         std::thread::spawn(move || {
99///             let output = do_work(operation);
100///             resolver.resolve(output);
101///         });
102///     }
103/// }
104/// ```
105pub trait EffectMiddleware: Send + Sync {
106    /// The operation type this middleware can process.
107    type Op: Operation;
108
109    /// Process the given operation and resolve via the provided resolver.
110    ///
111    /// The framework has already extracted the operation from the effect enum.
112    /// Use the [`EffectResolver`] to send the result back. The resolver **must
113    /// not** be called before this method returns — dispatch the work
114    /// asynchronously (e.g. `std::thread::spawn` on native, `spawn_local` on
115    /// WASM, or a channel send) and call [`EffectResolver::resolve`] from
116    /// there.
117    fn try_process_effect(
118        &self,
119        operation: Self::Op,
120        resolver: EffectResolver<<Self::Op as Operation>::Output>,
121    );
122}
123
124struct EffectMiddlewareLayerInner<Next, EM>
125where
126    Next: Layer + Sync + Send + 'static,
127    Next::Effect: TryInto<Request<EM::Op>, Error = Next::Effect>,
128    EM: EffectMiddleware,
129{
130    next: Next,
131    middleware: EM,
132}
133
134/// Middleware layer able to process some of the effects. This implements the
135/// general behaviour making sure all follow-up effects are processed and routed
136/// to the right place and delegates to the generic parameter `EM`, which
137/// implements [`EffectMiddleware`].
138pub struct HandleEffectLayer<Next, EM>
139where
140    Next: Layer + Sync + Send + 'static,
141    Next::Effect: TryInto<Request<EM::Op>, Error = Next::Effect>,
142    EM: EffectMiddleware,
143{
144    inner: Arc<EffectMiddlewareLayerInner<Next, EM>>,
145}
146
147impl<Next, EM> Layer for HandleEffectLayer<Next, EM>
148where
149    Next: Layer,
150    Next::Effect: TryInto<Request<EM::Op>, Error = Next::Effect>,
151    EM: EffectMiddleware + 'static,
152{
153    type Event = Next::Event;
154    type Effect = Next::Effect;
155    type ViewModel = Next::ViewModel;
156
157    fn update<F: Fn(Vec<Self::Effect>) + Send + Sync + 'static>(
158        &self,
159        event: Self::Event,
160        effect_callback: F,
161    ) -> Vec<Self::Effect> {
162        self.update(event, effect_callback)
163    }
164
165    fn resolve<Output, F: Fn(Vec<Self::Effect>) + Send + Sync + 'static>(
166        &self,
167        request: &mut impl Resolvable<Output>,
168        output: Output,
169        effect_callback: F,
170    ) -> Result<Vec<Self::Effect>, ResolveError> {
171        self.resolve(request, output, effect_callback)
172    }
173
174    fn view(&self) -> Self::ViewModel {
175        self.view()
176    }
177
178    fn process_tasks<F>(&self, effect_callback: F) -> Vec<Self::Effect>
179    where
180        F: Fn(Vec<Self::Effect>) + Sync + Send + 'static,
181    {
182        self.process_tasks(effect_callback)
183    }
184}
185
186impl<Next, EM> HandleEffectLayer<Next, EM>
187where
188    Next: Layer,
189    Next::Effect: TryInto<Request<EM::Op>, Error = Next::Effect>,
190    EM: EffectMiddleware + 'static,
191{
192    /// Typically, you would use [`Layer::handle_effects_using`] to construct a
193    /// `HandleEffectLayer` instance for a specific [`EffectMiddleware`].
194    pub fn new(next: Next, middleware: EM) -> Self {
195        Self {
196            inner: Arc::new(EffectMiddlewareLayerInner { next, middleware }),
197        }
198    }
199
200    fn update(
201        &self,
202        event: Next::Event,
203        return_effects: impl Fn(Vec<Next::Effect>) + Send + Sync + 'static,
204    ) -> Vec<Next::Effect> {
205        let inner = Arc::downgrade(&self.inner);
206        let return_effects = Arc::new(return_effects);
207        let return_effects_copy = return_effects.clone();
208
209        let effects = self
210            .inner
211            .next
212            .update(event, move |later_effects_from_next| {
213                // Eventual route
214                Self::process_known_effects_with(&inner, later_effects_from_next, &return_effects);
215            });
216
217        // Immediate route
218        Self::process_known_effects(&Arc::downgrade(&self.inner), effects, &return_effects_copy)
219    }
220
221    fn resolve<Output>(
222        &self,
223        request: &mut impl Resolvable<Output>,
224        result: Output,
225        return_effects: impl Fn(Vec<Next::Effect>) + Send + Sync + 'static,
226    ) -> Result<Vec<Next::Effect>, ResolveError> {
227        let inner = Arc::downgrade(&self.inner);
228        let return_effects = Arc::new(return_effects);
229        let return_effects_copy = return_effects.clone();
230
231        let effects = self
232            .inner
233            .next
234            .resolve(request, result, move |later_effects_from_next| {
235                Self::process_known_effects_with(&inner, later_effects_from_next, &return_effects);
236            })?;
237
238        // Immediate route
239        Ok(Self::process_known_effects(
240            &Arc::downgrade(&self.inner),
241            effects,
242            &return_effects_copy,
243        ))
244    }
245
246    fn view(&self) -> Next::ViewModel {
247        self.inner.next.view()
248    }
249
250    fn process_tasks<F>(&self, return_effects: F) -> Vec<Next::Effect>
251    where
252        F: Fn(Vec<Next::Effect>) + Sync + Send + 'static,
253    {
254        let inner = Arc::downgrade(&self.inner);
255        let return_effects = Arc::new(return_effects);
256        let return_effects_copy = return_effects.clone();
257
258        let effects = self
259            .inner
260            .next
261            .process_tasks(move |later_effects_from_next| {
262                // Eventual route
263                Self::process_known_effects_with(&inner, later_effects_from_next, &return_effects);
264            });
265
266        // Immediate route
267        Self::process_known_effects(&Arc::downgrade(&self.inner), effects, &return_effects_copy)
268    }
269
270    fn process_known_effects(
271        inner: &Weak<EffectMiddlewareLayerInner<Next, EM>>,
272        effects: Vec<Next::Effect>,
273        return_effects: &Arc<impl Fn(Vec<Next::Effect>) + Send + Sync + 'static>,
274    ) -> Vec<Next::Effect> {
275        effects
276            .into_iter()
277            .filter_map(|effect| {
278                // Try to convert the effect into a Request for the middleware's
279                // operation type. If conversion fails, the effect is not for
280                // this middleware — pass it through.
281                let request: Request<EM::Op> = match effect.try_into() {
282                    Ok(req) => req,
283                    Err(effect) => return Some(effect),
284                };
285
286                let (operation, handle) = request.split();
287
288                // Build the resolve function that will be called from the
289                // middleware's async context (thread, spawn_local, etc.).
290                let resolve_fn = {
291                    let return_effects = return_effects.clone();
292                    let inner = inner.clone();
293
294                    move |req_handle: &mut RequestHandle<<EM::Op as Operation>::Output>, output| {
295                        let Some(strong_inner) = inner.upgrade() else {
296                            eprintln!("Inner can't be upgraded after resolving effect");
297                            return;
298                        };
299
300                        if let Ok(immediate_effects) =
301                            strong_inner.next.resolve(req_handle, output, {
302                                let return_effects = return_effects.clone();
303                                let future_inner = inner.clone();
304
305                                move |eventual_effects| {
306                                    Self::process_known_effects_with(
307                                        &future_inner,
308                                        eventual_effects,
309                                        &return_effects,
310                                    );
311                                }
312                            })
313                        {
314                            Self::process_known_effects_with(
315                                &inner,
316                                immediate_effects,
317                                &return_effects,
318                            );
319                        }
320                    }
321                };
322
323                let Some(strong_inner) = inner.upgrade() else {
324                    eprintln!("Inner can't be upgraded to process effect");
325                    return None;
326                };
327
328                // Create the resolver with the active guard.
329                let active = Arc::new(AtomicBool::new(true));
330                let resolver = EffectResolver {
331                    handle,
332                    resolve_fn: Box::new(resolve_fn),
333                    active: active.clone(),
334                    calling_thread: thread::current().id(),
335                };
336
337                // Call the middleware. resolve() will panic if called during
338                // this scope.
339                strong_inner
340                    .middleware
341                    .try_process_effect(operation, resolver);
342
343                // Allow resolve() to be called now that try_process_effect has returned.
344                active.store(false, Ordering::Release);
345
346                None
347            })
348            .collect()
349    }
350
351    fn process_known_effects_with(
352        inner: &Weak<EffectMiddlewareLayerInner<Next, EM>>,
353        effects: Vec<<Next as Layer>::Effect>,
354        return_effects: &Arc<impl Fn(Vec<<Next as Layer>::Effect>) + Send + Sync + 'static>,
355    ) {
356        let unknown_effects = Self::process_known_effects(inner, effects, return_effects);
357
358        if !unknown_effects.is_empty() {
359            return_effects(unknown_effects);
360        }
361    }
362}