crux_core/middleware/effect_handling.rs
1use std::{
2 sync::{
3 Arc, Weak,
4 atomic::{AtomicBool, Ordering},
5 },
6 thread::{self, ThreadId},
7};
8
9use crate::{Request, RequestHandle, Resolvable, ResolveError, capability::Operation};
10
11use super::Layer;
12
13/// A resolver for an effect processed by middleware.
14///
15/// This type encapsulates the callback that feeds the operation's output back
16/// into the core. It **must** be called from an asynchronous context (e.g. a
17/// spawned thread, an async task via `spawn_local`, or a channel worker) —
18/// calling [`resolve`](Self::resolve) while
19/// [`try_process_effect`](EffectMiddleware::try_process_effect) is still on the call
20/// stack will panic.
21///
22/// For streaming operations ([`RequestHandle::Many`]), call `resolve` multiple
23/// times until the stream is exhausted.
24type ResolveFn<Output> = Box<dyn FnMut(&mut RequestHandle<Output>, Output) + Send>;
25
26pub struct EffectResolver<Output: Send + 'static> {
27 handle: RequestHandle<Output>,
28 resolve_fn: ResolveFn<Output>,
29 /// `true` while `try_process_effect` is executing on the call stack.
30 active: Arc<AtomicBool>,
31 /// The thread that called `try_process_effect`.
32 calling_thread: ThreadId,
33}
34
35impl<Output: Send + 'static> EffectResolver<Output> {
36 /// Resolve the effect with the given output.
37 ///
38 /// For one-shot effects this should be called exactly once. For streaming
39 /// effects it can be called multiple times.
40 ///
41 /// # Panics
42 ///
43 /// Panics if called synchronously from within
44 /// [`EffectMiddleware::try_process_effect`]. Middleware must dispatch work
45 /// asynchronously (e.g. `std::thread::spawn`, `spawn_local`, or a channel)
46 /// and call `resolve` from there.
47 ///
48 /// See <https://github.com/redbadger/crux/issues/492>
49 pub fn resolve(&mut self, output: Output) {
50 assert!(
51 !(self.active.load(Ordering::Acquire) && thread::current().id() == self.calling_thread),
52 "EffectMiddleware::try_process_effect must not call resolve() synchronously. \
53 Dispatch work asynchronously (thread, spawn_local, channel, etc.). \
54 See https://github.com/redbadger/crux/issues/492"
55 );
56 (self.resolve_fn)(&mut self.handle, output);
57 }
58}
59
60/// An effect processing middleware.
61///
62/// Implement this trait to provide effect processing in Rust on the core side.
63/// The two typical uses for this are:
64///
65/// 1. Reusing a Rust implementation of a capability compatible with all target
66/// platforms.
67/// 2. Using an existing crate which is not built with Sans-IO in mind.
68///
69/// There are a number of considerations for doing this:
70///
71/// - The effect processing will rely on system APIs or crates which MUST be
72/// portable to all platforms the library using this middleware is going to be
73/// deployed to. This is fundamentally trading off portability for reuse of the
74/// Rust implementation.
75/// - The middleware MUST process the effect asynchronously — it must not call
76/// [`EffectResolver::resolve`] before `try_process_effect` returns. On native
77/// targets this typically means spawning a thread or sending work to a
78/// channel-based worker. On WASM (which has no threads) this means using
79/// `spawn_local` or a similar async task primitive. Calling `resolve()`
80/// synchronously inside `try_process_effect` will panic.
81/// - Because the resolver may be sent to another thread (on native), the core
82/// and therefore the app are shared between threads. The app must be `Send`
83/// and `Sync`, which also forces the `Model` type to be `Send` and `Sync`.
84/// This should not be a problem — `Model` should not normally be `!Send` or
85/// `!Sync`.
86///
87/// # Example
88///
89/// ```rust,ignore
90/// impl EffectMiddleware for MyMiddleware {
91/// type Op = MyOperation;
92///
93/// fn try_process_effect(
94/// &self,
95/// operation: MyOperation,
96/// mut resolver: EffectResolver<<MyOperation as Operation>::Output>,
97/// ) {
98/// std::thread::spawn(move || {
99/// let output = do_work(operation);
100/// resolver.resolve(output);
101/// });
102/// }
103/// }
104/// ```
105pub trait EffectMiddleware: Send + Sync {
106 /// The operation type this middleware can process.
107 type Op: Operation;
108
109 /// Process the given operation and resolve via the provided resolver.
110 ///
111 /// The framework has already extracted the operation from the effect enum.
112 /// Use the [`EffectResolver`] to send the result back. The resolver **must
113 /// not** be called before this method returns — dispatch the work
114 /// asynchronously (e.g. `std::thread::spawn` on native, `spawn_local` on
115 /// WASM, or a channel send) and call [`EffectResolver::resolve`] from
116 /// there.
117 fn try_process_effect(
118 &self,
119 operation: Self::Op,
120 resolver: EffectResolver<<Self::Op as Operation>::Output>,
121 );
122}
123
124struct EffectMiddlewareLayerInner<Next, EM>
125where
126 Next: Layer + Sync + Send + 'static,
127 Next::Effect: TryInto<Request<EM::Op>, Error = Next::Effect>,
128 EM: EffectMiddleware,
129{
130 next: Next,
131 middleware: EM,
132}
133
134/// Middleware layer able to process some of the effects.
135///
136/// This implements the general behaviour making sure all follow-up effects are
137/// processed and routed to the right place and delegates to the generic parameter `EM`,
138/// which implements [`EffectMiddleware`].
139pub struct HandleEffectLayer<Next, EM>
140where
141 Next: Layer + Sync + Send + 'static,
142 Next::Effect: TryInto<Request<EM::Op>, Error = Next::Effect>,
143 EM: EffectMiddleware,
144{
145 inner: Arc<EffectMiddlewareLayerInner<Next, EM>>,
146}
147
148impl<Next, EM> Layer for HandleEffectLayer<Next, EM>
149where
150 Next: Layer,
151 Next::Effect: TryInto<Request<EM::Op>, Error = Next::Effect>,
152 EM: EffectMiddleware + 'static,
153{
154 type Event = Next::Event;
155 type Effect = Next::Effect;
156 type ViewModel = Next::ViewModel;
157
158 fn update<F: Fn(Vec<Self::Effect>) + Send + Sync + 'static>(
159 &self,
160 event: Self::Event,
161 effect_callback: F,
162 ) -> Vec<Self::Effect> {
163 self.update(event, effect_callback)
164 }
165
166 fn resolve<Output, F: Fn(Vec<Self::Effect>) + Send + Sync + 'static>(
167 &self,
168 request: &mut impl Resolvable<Output>,
169 output: Output,
170 effect_callback: F,
171 ) -> Result<Vec<Self::Effect>, ResolveError> {
172 self.resolve(request, output, effect_callback)
173 }
174
175 fn view(&self) -> Self::ViewModel {
176 self.view()
177 }
178
179 fn process_tasks<F>(&self, effect_callback: F) -> Vec<Self::Effect>
180 where
181 F: Fn(Vec<Self::Effect>) + Sync + Send + 'static,
182 {
183 self.process_tasks(effect_callback)
184 }
185}
186
187impl<Next, EM> HandleEffectLayer<Next, EM>
188where
189 Next: Layer,
190 Next::Effect: TryInto<Request<EM::Op>, Error = Next::Effect>,
191 EM: EffectMiddleware + 'static,
192{
193 /// Typically, you would use [`Layer::handle_effects_using`] to construct a
194 /// `HandleEffectLayer` instance for a specific [`EffectMiddleware`].
195 pub fn new(next: Next, middleware: EM) -> Self {
196 Self {
197 inner: Arc::new(EffectMiddlewareLayerInner { next, middleware }),
198 }
199 }
200
201 fn update(
202 &self,
203 event: Next::Event,
204 return_effects: impl Fn(Vec<Next::Effect>) + Send + Sync + 'static,
205 ) -> Vec<Next::Effect> {
206 let inner = Arc::downgrade(&self.inner);
207 let return_effects = Arc::new(return_effects);
208 let return_effects_copy = return_effects.clone();
209
210 let effects = self
211 .inner
212 .next
213 .update(event, move |later_effects_from_next| {
214 // Eventual route
215 Self::process_known_effects_with(&inner, later_effects_from_next, &return_effects);
216 });
217
218 // Immediate route
219 Self::process_known_effects(&Arc::downgrade(&self.inner), effects, &return_effects_copy)
220 }
221
222 fn resolve<Output>(
223 &self,
224 request: &mut impl Resolvable<Output>,
225 result: Output,
226 return_effects: impl Fn(Vec<Next::Effect>) + Send + Sync + 'static,
227 ) -> Result<Vec<Next::Effect>, ResolveError> {
228 let inner = Arc::downgrade(&self.inner);
229 let return_effects = Arc::new(return_effects);
230 let return_effects_copy = return_effects.clone();
231
232 let effects = self
233 .inner
234 .next
235 .resolve(request, result, move |later_effects_from_next| {
236 Self::process_known_effects_with(&inner, later_effects_from_next, &return_effects);
237 })?;
238
239 // Immediate route
240 Ok(Self::process_known_effects(
241 &Arc::downgrade(&self.inner),
242 effects,
243 &return_effects_copy,
244 ))
245 }
246
247 fn view(&self) -> Next::ViewModel {
248 self.inner.next.view()
249 }
250
251 fn process_tasks<F>(&self, return_effects: F) -> Vec<Next::Effect>
252 where
253 F: Fn(Vec<Next::Effect>) + Sync + Send + 'static,
254 {
255 let inner = Arc::downgrade(&self.inner);
256 let return_effects = Arc::new(return_effects);
257 let return_effects_copy = return_effects.clone();
258
259 let effects = self
260 .inner
261 .next
262 .process_tasks(move |later_effects_from_next| {
263 // Eventual route
264 Self::process_known_effects_with(&inner, later_effects_from_next, &return_effects);
265 });
266
267 // Immediate route
268 Self::process_known_effects(&Arc::downgrade(&self.inner), effects, &return_effects_copy)
269 }
270
271 fn process_known_effects(
272 inner: &Weak<EffectMiddlewareLayerInner<Next, EM>>,
273 effects: Vec<Next::Effect>,
274 return_effects: &Arc<impl Fn(Vec<Next::Effect>) + Send + Sync + 'static>,
275 ) -> Vec<Next::Effect> {
276 effects
277 .into_iter()
278 .filter_map(|effect| {
279 // Try to convert the effect into a Request for the middleware's
280 // operation type. If conversion fails, the effect is not for
281 // this middleware — pass it through.
282 let request: Request<EM::Op> = match effect.try_into() {
283 Ok(req) => req,
284 Err(effect) => return Some(effect),
285 };
286
287 let (operation, handle) = request.split();
288
289 // Build the resolve function that will be called from the
290 // middleware's async context (thread, spawn_local, etc.).
291 let resolve_fn = {
292 let return_effects = return_effects.clone();
293 let inner = inner.clone();
294
295 move |req_handle: &mut RequestHandle<<EM::Op as Operation>::Output>, output| {
296 let Some(strong_inner) = inner.upgrade() else {
297 eprintln!("Inner can't be upgraded after resolving effect");
298 return;
299 };
300
301 if let Ok(immediate_effects) =
302 strong_inner.next.resolve(req_handle, output, {
303 let return_effects = return_effects.clone();
304 let future_inner = inner.clone();
305
306 move |eventual_effects| {
307 Self::process_known_effects_with(
308 &future_inner,
309 eventual_effects,
310 &return_effects,
311 );
312 }
313 })
314 {
315 Self::process_known_effects_with(
316 &inner,
317 immediate_effects,
318 &return_effects,
319 );
320 }
321 }
322 };
323
324 let Some(strong_inner) = inner.upgrade() else {
325 eprintln!("Inner can't be upgraded to process effect");
326 return None;
327 };
328
329 // Create the resolver with the active guard.
330 let active = Arc::new(AtomicBool::new(true));
331 let resolver = EffectResolver {
332 handle,
333 resolve_fn: Box::new(resolve_fn),
334 active: active.clone(),
335 calling_thread: thread::current().id(),
336 };
337
338 // Call the middleware. resolve() will panic if called during
339 // this scope.
340 strong_inner
341 .middleware
342 .try_process_effect(operation, resolver);
343
344 // Allow resolve() to be called now that try_process_effect has returned.
345 active.store(false, Ordering::Release);
346
347 None
348 })
349 .collect()
350 }
351
352 fn process_known_effects_with(
353 inner: &Weak<EffectMiddlewareLayerInner<Next, EM>>,
354 effects: Vec<<Next as Layer>::Effect>,
355 return_effects: &Arc<impl Fn(Vec<<Next as Layer>::Effect>) + Send + Sync + 'static>,
356 ) {
357 let unknown_effects = Self::process_known_effects(inner, effects, return_effects);
358
359 if !unknown_effects.is_empty() {
360 return_effects(unknown_effects);
361 }
362 }
363}