crux_core/middleware/effect_handling.rs
1use std::{
2 sync::{
3 Arc, Weak,
4 atomic::{AtomicBool, Ordering},
5 },
6 thread::{self, ThreadId},
7};
8
9use crate::{Request, RequestHandle, Resolvable, ResolveError, capability::Operation};
10
11use super::Layer;
12
13/// A resolver for an effect processed by middleware.
14///
15/// This type encapsulates the callback that feeds the operation's output back
16/// into the core. It **must** be called from an asynchronous context (e.g. a
17/// spawned thread, an async task via `spawn_local`, or a channel worker) —
18/// calling [`resolve`](Self::resolve) while
19/// [`try_process_effect`](EffectMiddleware::try_process_effect) is still on the call
20/// stack will panic.
21///
22/// For streaming operations ([`RequestHandle::Many`]), call `resolve` multiple
23/// times until the stream is exhausted.
24type ResolveFn<Output> = Box<dyn FnMut(&mut RequestHandle<Output>, Output) + Send>;
25
26pub struct EffectResolver<Output: Send + 'static> {
27 handle: RequestHandle<Output>,
28 resolve_fn: ResolveFn<Output>,
29 /// `true` while `try_process_effect` is executing on the call stack.
30 active: Arc<AtomicBool>,
31 /// The thread that called `try_process_effect`.
32 calling_thread: ThreadId,
33}
34
35impl<Output: Send + 'static> EffectResolver<Output> {
36 /// Resolve the effect with the given output.
37 ///
38 /// For one-shot effects this should be called exactly once. For streaming
39 /// effects it can be called multiple times.
40 ///
41 /// # Panics
42 ///
43 /// Panics if called synchronously from within
44 /// [`EffectMiddleware::try_process_effect`]. Middleware must dispatch work
45 /// asynchronously (e.g. `std::thread::spawn`, `spawn_local`, or a channel)
46 /// and call `resolve` from there.
47 ///
48 /// See <https://github.com/redbadger/crux/issues/492>
49 pub fn resolve(&mut self, output: Output) {
50 assert!(
51 !(self.active.load(Ordering::Acquire) && thread::current().id() == self.calling_thread),
52 "EffectMiddleware::try_process_effect must not call resolve() synchronously. \
53 Dispatch work asynchronously (thread, spawn_local, channel, etc.). \
54 See https://github.com/redbadger/crux/issues/492"
55 );
56 (self.resolve_fn)(&mut self.handle, output);
57 }
58}
59
60/// An effect processing middleware.
61///
62/// Implement this trait to provide effect processing in Rust on the core side.
63/// The two typical uses for this are:
64///
65/// 1. Reusing a Rust implementation of a capability compatible with all target
66/// platforms.
67/// 2. Using an existing crate which is not built with Sans-IO in mind.
68///
69/// There are a number of considerations for doing this:
70///
71/// - The effect processing will rely on system APIs or crates which MUST be
72/// portable to all platforms the library using this middleware is going to be
73/// deployed to. This is fundamentally trading off portability for reuse of the
74/// Rust implementation.
75/// - The middleware MUST process the effect asynchronously — it must not call
76/// [`EffectResolver::resolve`] before `try_process_effect` returns. On native
77/// targets this typically means spawning a thread or sending work to a
78/// channel-based worker. On WASM (which has no threads) this means using
79/// `spawn_local` or a similar async task primitive. Calling `resolve()`
80/// synchronously inside `try_process_effect` will panic.
81/// - Because the resolver may be sent to another thread (on native), the core
82/// and therefore the app are shared between threads. The app must be `Send`
83/// and `Sync`, which also forces the `Model` type to be `Send` and `Sync`.
84/// This should not be a problem — `Model` should not normally be `!Send` or
85/// `!Sync`.
86///
87/// # Example
88///
89/// ```rust,ignore
90/// impl EffectMiddleware for MyMiddleware {
91/// type Op = MyOperation;
92///
93/// fn try_process_effect(
94/// &self,
95/// operation: MyOperation,
96/// mut resolver: EffectResolver<<MyOperation as Operation>::Output>,
97/// ) {
98/// std::thread::spawn(move || {
99/// let output = do_work(operation);
100/// resolver.resolve(output);
101/// });
102/// }
103/// }
104/// ```
105pub trait EffectMiddleware: Send + Sync {
106 /// The operation type this middleware can process.
107 type Op: Operation;
108
109 /// Process the given operation and resolve via the provided resolver.
110 ///
111 /// The framework has already extracted the operation from the effect enum.
112 /// Use the [`EffectResolver`] to send the result back. The resolver **must
113 /// not** be called before this method returns — dispatch the work
114 /// asynchronously (e.g. `std::thread::spawn` on native, `spawn_local` on
115 /// WASM, or a channel send) and call [`EffectResolver::resolve`] from
116 /// there.
117 fn try_process_effect(
118 &self,
119 operation: Self::Op,
120 resolver: EffectResolver<<Self::Op as Operation>::Output>,
121 );
122}
123
124struct EffectMiddlewareLayerInner<Next, EM>
125where
126 Next: Layer + Sync + Send + 'static,
127 Next::Effect: TryInto<Request<EM::Op>, Error = Next::Effect>,
128 EM: EffectMiddleware,
129{
130 next: Next,
131 middleware: EM,
132}
133
134/// Middleware layer able to process some of the effects. This implements the
135/// general behaviour making sure all follow-up effects are processed and routed
136/// to the right place and delegates to the generic parameter `EM`, which
137/// implements [`EffectMiddleware`].
138pub struct HandleEffectLayer<Next, EM>
139where
140 Next: Layer + Sync + Send + 'static,
141 Next::Effect: TryInto<Request<EM::Op>, Error = Next::Effect>,
142 EM: EffectMiddleware,
143{
144 inner: Arc<EffectMiddlewareLayerInner<Next, EM>>,
145}
146
147impl<Next, EM> Layer for HandleEffectLayer<Next, EM>
148where
149 Next: Layer,
150 Next::Effect: TryInto<Request<EM::Op>, Error = Next::Effect>,
151 EM: EffectMiddleware + 'static,
152{
153 type Event = Next::Event;
154 type Effect = Next::Effect;
155 type ViewModel = Next::ViewModel;
156
157 fn update<F: Fn(Vec<Self::Effect>) + Send + Sync + 'static>(
158 &self,
159 event: Self::Event,
160 effect_callback: F,
161 ) -> Vec<Self::Effect> {
162 self.update(event, effect_callback)
163 }
164
165 fn resolve<Output, F: Fn(Vec<Self::Effect>) + Send + Sync + 'static>(
166 &self,
167 request: &mut impl Resolvable<Output>,
168 output: Output,
169 effect_callback: F,
170 ) -> Result<Vec<Self::Effect>, ResolveError> {
171 self.resolve(request, output, effect_callback)
172 }
173
174 fn view(&self) -> Self::ViewModel {
175 self.view()
176 }
177
178 fn process_tasks<F>(&self, effect_callback: F) -> Vec<Self::Effect>
179 where
180 F: Fn(Vec<Self::Effect>) + Sync + Send + 'static,
181 {
182 self.process_tasks(effect_callback)
183 }
184}
185
186impl<Next, EM> HandleEffectLayer<Next, EM>
187where
188 Next: Layer,
189 Next::Effect: TryInto<Request<EM::Op>, Error = Next::Effect>,
190 EM: EffectMiddleware + 'static,
191{
192 /// Typically, you would use [`Layer::handle_effects_using`] to construct a
193 /// `HandleEffectLayer` instance for a specific [`EffectMiddleware`].
194 pub fn new(next: Next, middleware: EM) -> Self {
195 Self {
196 inner: Arc::new(EffectMiddlewareLayerInner { next, middleware }),
197 }
198 }
199
200 fn update(
201 &self,
202 event: Next::Event,
203 return_effects: impl Fn(Vec<Next::Effect>) + Send + Sync + 'static,
204 ) -> Vec<Next::Effect> {
205 let inner = Arc::downgrade(&self.inner);
206 let return_effects = Arc::new(return_effects);
207 let return_effects_copy = return_effects.clone();
208
209 let effects = self
210 .inner
211 .next
212 .update(event, move |later_effects_from_next| {
213 // Eventual route
214 Self::process_known_effects_with(&inner, later_effects_from_next, &return_effects);
215 });
216
217 // Immediate route
218 Self::process_known_effects(&Arc::downgrade(&self.inner), effects, &return_effects_copy)
219 }
220
221 fn resolve<Output>(
222 &self,
223 request: &mut impl Resolvable<Output>,
224 result: Output,
225 return_effects: impl Fn(Vec<Next::Effect>) + Send + Sync + 'static,
226 ) -> Result<Vec<Next::Effect>, ResolveError> {
227 let inner = Arc::downgrade(&self.inner);
228 let return_effects = Arc::new(return_effects);
229 let return_effects_copy = return_effects.clone();
230
231 let effects = self
232 .inner
233 .next
234 .resolve(request, result, move |later_effects_from_next| {
235 Self::process_known_effects_with(&inner, later_effects_from_next, &return_effects);
236 })?;
237
238 // Immediate route
239 Ok(Self::process_known_effects(
240 &Arc::downgrade(&self.inner),
241 effects,
242 &return_effects_copy,
243 ))
244 }
245
246 fn view(&self) -> Next::ViewModel {
247 self.inner.next.view()
248 }
249
250 fn process_tasks<F>(&self, return_effects: F) -> Vec<Next::Effect>
251 where
252 F: Fn(Vec<Next::Effect>) + Sync + Send + 'static,
253 {
254 let inner = Arc::downgrade(&self.inner);
255 let return_effects = Arc::new(return_effects);
256 let return_effects_copy = return_effects.clone();
257
258 let effects = self
259 .inner
260 .next
261 .process_tasks(move |later_effects_from_next| {
262 // Eventual route
263 Self::process_known_effects_with(&inner, later_effects_from_next, &return_effects);
264 });
265
266 // Immediate route
267 Self::process_known_effects(&Arc::downgrade(&self.inner), effects, &return_effects_copy)
268 }
269
270 fn process_known_effects(
271 inner: &Weak<EffectMiddlewareLayerInner<Next, EM>>,
272 effects: Vec<Next::Effect>,
273 return_effects: &Arc<impl Fn(Vec<Next::Effect>) + Send + Sync + 'static>,
274 ) -> Vec<Next::Effect> {
275 effects
276 .into_iter()
277 .filter_map(|effect| {
278 // Try to convert the effect into a Request for the middleware's
279 // operation type. If conversion fails, the effect is not for
280 // this middleware — pass it through.
281 let request: Request<EM::Op> = match effect.try_into() {
282 Ok(req) => req,
283 Err(effect) => return Some(effect),
284 };
285
286 let (operation, handle) = request.split();
287
288 // Build the resolve function that will be called from the
289 // middleware's async context (thread, spawn_local, etc.).
290 let resolve_fn = {
291 let return_effects = return_effects.clone();
292 let inner = inner.clone();
293
294 move |req_handle: &mut RequestHandle<<EM::Op as Operation>::Output>, output| {
295 let Some(strong_inner) = inner.upgrade() else {
296 eprintln!("Inner can't be upgraded after resolving effect");
297 return;
298 };
299
300 if let Ok(immediate_effects) =
301 strong_inner.next.resolve(req_handle, output, {
302 let return_effects = return_effects.clone();
303 let future_inner = inner.clone();
304
305 move |eventual_effects| {
306 Self::process_known_effects_with(
307 &future_inner,
308 eventual_effects,
309 &return_effects,
310 );
311 }
312 })
313 {
314 Self::process_known_effects_with(
315 &inner,
316 immediate_effects,
317 &return_effects,
318 );
319 }
320 }
321 };
322
323 let Some(strong_inner) = inner.upgrade() else {
324 eprintln!("Inner can't be upgraded to process effect");
325 return None;
326 };
327
328 // Create the resolver with the active guard.
329 let active = Arc::new(AtomicBool::new(true));
330 let resolver = EffectResolver {
331 handle,
332 resolve_fn: Box::new(resolve_fn),
333 active: active.clone(),
334 calling_thread: thread::current().id(),
335 };
336
337 // Call the middleware. resolve() will panic if called during
338 // this scope.
339 strong_inner
340 .middleware
341 .try_process_effect(operation, resolver);
342
343 // Allow resolve() to be called now that try_process_effect has returned.
344 active.store(false, Ordering::Release);
345
346 None
347 })
348 .collect()
349 }
350
351 fn process_known_effects_with(
352 inner: &Weak<EffectMiddlewareLayerInner<Next, EM>>,
353 effects: Vec<<Next as Layer>::Effect>,
354 return_effects: &Arc<impl Fn(Vec<<Next as Layer>::Effect>) + Send + Sync + 'static>,
355 ) {
356 let unknown_effects = Self::process_known_effects(inner, effects, return_effects);
357
358 if !unknown_effects.is_empty() {
359 return_effects(unknown_effects);
360 }
361 }
362}