Here is an explanation of how `fdb_spawn` communicates with the simulation through the foundationdb-rs SDK. ```rs fdb_spawn(async move { ... // code that relies on polling FdbFutures } ``` `fdb_spawn` takes in a Rust Future and polls it until Ready. ```rs pub fn fdb_spawn(f: F) where F: Future + 'static, { // Pins the Future in memory, this is mandatory for the Future to execute properly // The UnsafeCell is used for interior mutability let f = UnsafeCell::new(Box::pin(f)); // The Future is wrapped into an FDBWaker and ref counted by an Arc let waker_arc = Arc::new(FDBWaker { f }); // We build a RawWaker containing both the Arc as data and a pointer to VTABLE // VTABLE contains a custom implementation of a Waker let raw_waker = RawWaker::new(Arc::into_raw(waker_arc) as *const (), &VTABLE); // Finally we build a Waker from the RawWaker // Waker is only a wrapper around a RawWaker which maps the function of the Waker trait to // the RawWaker's vtable let waker = unsafe { Waker::from_raw(raw_waker) }; // Here calling wake is stricly equivalent to any of: // - waker.inner.vtable.wake(waker.inner.data) // - raw_waker.vtable.wake(raw_waker.data) // - VTABLE[1](Arc::into_raw(waker_arc) as *const ()) // - fdbwaker_wake(Arc::into_raw(waker_arc) as *const (), true) waker.wake(); // waker is dropped here } ``` `wake()` calls `fdbwaker_wake` (through `VTABLE[1]`) with `decrease` set to `true`. > note: waker_ref reference the `Arc`, not directly `FDBWaker` ```rs fn fdbwaker_wake(waker_ref: &FDBWaker, decrease: bool) { println!("wake {}", decrease); // This increases the ref counter of the Arc and gives back a RawWaker let waker_raw = fdbwaker_clone(waker_ref); // We build a Waker from the RawWaker let waker = unsafe { Waker::from_raw(waker_raw) }; // We build a Context from the Waker // Context is only a wrapper around the Waker let mut cx = Context::from_waker(&waker); // We get the Future stored by the FDBWaker to poll it and discard the result let f = unsafe { &mut *waker_ref.f.get() }; match f.as_mut().poll(&mut cx) { std::task::Poll::Ready(_) => println!("READY"), std::task::Poll::Pending => println!("PENDING"), } if decrease { // This decreases the ref counter of the Arc fdbwaker_drop(waker_ref); } } ``` `poll()` runs the closure until encountering an `await`. `await` calls `poll` on its future and so on. The case that we are interested in takes place when the foundationdb-rs issues a call to the fdbserver using the C-API through `fdb_sys`. Calls that return a `FdbFutureHandle` are wrapped into `FdbFuture` which implement poll in the following manner: ```rs pub(crate) struct FdbFuture { f: Option, waker: Option>, phantom: std::marker::PhantomData, } impl Future for FdbFuture where T: TryFrom + Unpin, { type Output = FdbResult; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { println!(" POLL"); let f = self.f.as_ref().expect("cannot poll after resolve"); let ready = unsafe { fdb_sys::fdb_future_is_ready(f.as_ptr()) }; if ready == 0 { // If the Future is not ready yet we try to get the AtomicWaker of the Future // If the AtomicWaker wasn't already set (if it is the first time poll() is called on // the Future) a new one is created (and ref counted with an Arc) and register is set // to true let f_ptr = f.as_ptr(); let mut register = false; let waker = self.waker.get_or_insert_with(|| { register = true; Arc::new(AtomicWaker::new()) }); // Recall that Context is only a wrapper around a Waker // Here `cx` contains our FDBWaker /* Context { waker: Waker { waker: RawWaker { data: *const Arc as *const (), vtable: &'static RawWakerVTable, } } } */ // register clones the Waker, which calls fdbwaker_clone through the vtable of the // RawWaker and atomically sets it as its latest waker waker.register(cx.waker()); if register { println!(" REGISTER"); // If it is the first time poll() is called on the Future // fdb_sys::fdb_future_set_callback is called to register a callback for this // FdbFutureHandle the second argument is a pointer that will be passed as argument // to the callback when called let network_waker: Arc = waker.clone(); let network_waker_ptr = Arc::into_raw(network_waker); unsafe { fdb_sys::fdb_future_set_callback( f_ptr, Some(fdb_future_callback), network_waker_ptr as *mut _, ); } } println!(" PENDING"); Poll::Pending } else { println!(" READY"); // If the Future is ready its result is querried and propagated Poll::Ready( error::eval(unsafe { fdb_sys::fdb_future_get_error(f.as_ptr()) }) .and_then(|()| T::try_from(self.f.take().expect("self.f.is_some()"))), ) } } } ``` To sum up what happened until now: - `fdb_spawn` builds a ref counted `FDBWaker`, wraps it in a Waker and calls wake on it - `fdbwaker_wake` was called with `decrease=true`, it increases the `FDBWaker` ref count and polls its Future - in case an `FdbFuture` is polled, it: - registers our `Waker` into an `AtomicWaker` (cloning it in the process) - set an `fdb_future_callback` as callback and the `AtomicWaker` as argument for the `FdbFutureHandle` - returns `Poll::Pending` - `fdbwaker_wake` decreases the ref count (because `decrease==true`) and returns - `fdb_spawn` drops the `Waker`, decreasing the ref count ``` wake true // fdbwaker_wake clone // fdbwaker_wake POLL // FdbFuture::Poll clone // FdbFuture::Poll REGISTER // FdbFuture::Poll PENDING // FdbFuture::Poll PENDING // fdbwaker_wake drop // fdbwaker_wake drop // fdb_spawn ``` After this, the Future was handed to the simulation and `fdb_spawn` returns. If your Workload is valid, execution should be returned to the simulation. The fdbserver runs, internally resolving the Future and, when done, notifies `FdbFutureHandle` by calling its callback with its argument. ```rs extern "C" fn fdb_future_callback( _f: *mut fdb_sys::FDBFuture, callback_parameter: *mut ::std::os::raw::c_void, ) { println!(" CALLBACK"); // The AtomicWaker is reconstructed from the callback_parameter let network_waker: Arc = unsafe { Arc::from_raw(callback_parameter as *const _) }; // The last Waker registered is woken up if let Some(waker) = network_waker.take() { waker.wake(); // waker is dropped here } } ``` The last `Waker` was registered by: ```rs waker.register(cx.waker()); ``` so in this case the `AtomicWaker` wakes our `Waker`, which calls `fdbwaker_wake` with `decrease=true` which increases the ref count of `FDBWorker` and polls its Future. This time `fdb_sys::fdb_future_is_ready` should return a non-zero value indicating the `FdbFuture` was resolved by the fdbserver. So the `Waker` is not registered, and thus not cloned. `FdbFuture::poll` returns `Poll::Ready`, and the closure continues to execute code until it reaches a new `await`. In case it encounters a new `FdbFuture` the cycle continues exactly in the same way: - `FdbFuture::poll` is called which: - registers the `Waker`, increasing the ref count - fdb_sys::fdb_future_set_callback is called with `fdb_future_callback` and the `AtomicWaker` - returns `Poll::Pending` - `fdbwaker_wake` decreases the ref count (because `decrease==true`) and returns - `fdb_future_callback` drops the `Waker` which decreases the ref count ``` wake true // fdbwaker_wake clone // fdbwaker_wake POLL // FdbFuture::Poll READY // FdbFuture::Poll => a new FdbFuture is polled POLL // FdbFuture::Poll clone // FdbFuture::Poll REGISTER // FdbFuture::Poll PENDING // FdbFuture::Poll PENDING // fdbwaker_wake drop // fdbwaker_wake drop // fdb_future_callback ``` On the other hand if the `FDBWaker`'s closure doesn't set a new callback in the simulation and finishes it returns `Poll::Ready(())`. The `Waker` was not registered and thus not cloned either. The ref count decreases 2 times and reaches 0, freeing `FDBWaker` and its Pinned Future. ``` wake true // fdbwaker_wake clone // fdbwaker_wake POLL // FdbFuture::Poll READY // FdbFuture::Poll => no FdbFuture to poll anymore READY // FdbFuture::Poll drop // fdbwaker_wake drop // fdb_future_callback DROPPED // FDBWaker::drop ```