

A runtime-agnostic implementation of Reactive Extensions
for Rust!
[!IMPORTANT]
Currently this crate does not provide an async executor!
It was primarily developed to be used in the Bevy game engine, through
rx_bevy.
However, I do want to add additional executors in the future.
Documentation
What makes it different?
- Runtime-agnostic implementation.
- Heavy use of GATs to avoid dynamic dispatch and function calls wherever
possible, enabling inlining and optimizations by the compiler.
- Deadlock-free execution.
You could even create a subject that subscribes to itself and sends events
on every single value it observes, creating a fractal of subscriptions even
on a single thread. But please don't.
Contents
rx_core is an extensible framework. The rx_core_common crate provides the
common types and traits used by all other crates.
It defines what an Observable, Observer, Subscription, Subject, Operator,
Subscriber, and Scheduler are, how Operators (and ComposableOperators) are
piped together, and how Subscriptions and Subscribers avoid deadlocks in
single-threaded execution by deferring notifications.
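The idea of deferring notifications can be illustrated with a minimal sketch. It uses only the standard library, and `DeferringSubject`, `Callback`, and `run_demo` are hypothetical names invented for this illustration, not rx_core types. It shows one possible way a re-entrant `next` call can be queued instead of being delivered in the middle of an ongoing emission.

```rust
use std::cell::{Cell, RefCell};
use std::collections::VecDeque;
use std::rc::Rc;

// Hypothetical callback type: it receives the subject so it can emit re-entrantly.
type Callback = Rc<dyn Fn(&DeferringSubject, u32)>;

/// Hypothetical single-threaded subject (an assumption, not the rx_core type).
struct DeferringSubject {
    subscribers: RefCell<Vec<Callback>>,
    queue: RefCell<VecDeque<u32>>,
    emitting: Cell<bool>,
}

impl DeferringSubject {
    fn new() -> Self {
        DeferringSubject {
            subscribers: RefCell::new(Vec::new()),
            queue: RefCell::new(VecDeque::new()),
            emitting: Cell::new(false),
        }
    }

    fn subscribe(&self, callback: impl Fn(&DeferringSubject, u32) + 'static) {
        let callback: Callback = Rc::new(callback);
        self.subscribers.borrow_mut().push(callback);
    }

    fn next(&self, value: u32) {
        self.queue.borrow_mut().push_back(value);
        if self.emitting.get() {
            // Re-entrant call: the value stays queued for the outer loop.
            return;
        }
        self.emitting.set(true);
        loop {
            // The queue borrow ends with this statement, before any callback runs.
            let pending = self.queue.borrow_mut().pop_front();
            let current = match pending {
                Some(value) => value,
                None => break,
            };
            // Snapshot the subscribers so no borrow is held during delivery.
            let snapshot: Vec<Callback> = self.subscribers.borrow().clone();
            for callback in snapshot {
                (*callback)(self, current);
            }
        }
        self.emitting.set(false);
    }
}

fn run_demo() -> Vec<u32> {
    let log = Rc::new(RefCell::new(Vec::new()));
    let subject = DeferringSubject::new();
    let (log_a, log_b) = (Rc::clone(&log), Rc::clone(&log));
    // The first subscriber feeds values back into the subject until it sees 3.
    subject.subscribe(move |s: &DeferringSubject, v: u32| {
        log_a.borrow_mut().push(v);
        if v < 3 {
            s.next(v + 1);
        }
    });
    // The second subscriber only records each value multiplied by ten.
    subject.subscribe(move |_s: &DeferringSubject, v: u32| log_b.borrow_mut().push(v * 10));
    subject.next(1);
    let result = log.borrow().clone();
    result
}

fn main() {
    println!("{:?}", run_demo());
}
```

In this sketch every subscriber observes a value before the value emitted in response to it is delivered, so no `RefCell` is borrowed twice at the same time.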
Observables
Observables define a stream of emissions that is instantiated upon subscription.
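To illustrate what "instantiated upon subscription" means, here is a minimal sketch using only the standard library. `ColdObservable` and `run_demo` are hypothetical names, and the closure-based shape is an assumption made for this example rather than how rx_core models observables. The producer runs once per subscription, so each subscriber gets its own stream.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical observable: a producer function invoked once per subscription.
struct ColdObservable<F> {
    producer: F,
}

impl<F> ColdObservable<F>
where
    F: Fn(&mut dyn FnMut(u32)),
{
    fn new(producer: F) -> Self {
        ColdObservable { producer }
    }

    /// Runs the producer from scratch for this subscriber.
    fn subscribe(&self, mut on_next: impl FnMut(u32)) {
        (self.producer)(&mut on_next);
    }
}

fn run_demo() -> (Vec<u32>, Vec<u32>, u32) {
    // Counts how many times the producer was instantiated.
    let runs = Rc::new(Cell::new(0u32));
    let runs_in_producer = Rc::clone(&runs);
    let source = ColdObservable::new(move |emit| {
        runs_in_producer.set(runs_in_producer.get() + 1);
        for value in 1..=3 {
            emit(value);
        }
    });

    let mut first = Vec::new();
    source.subscribe(|value| first.push(value));
    let mut second = Vec::new();
    source.subscribe(|value| second.push(value * 10));

    (first, second, runs.get())
}

fn main() {
    println!("{:?}", run_demo());
}
```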
- Creation:
- CreateObservable -
Define your own function that will interact with the subscriber!
- DeferredObservable -
Subscribes to an observable returned by a function.
- Immediate Observables:
- Miscellaneous Observables:
- NeverObservable -
Never emits and never unsubscribes until it is dropped!
Warning: you need to handle subscriptions made to this yourself!
- Combination (Multi-Signal):
- CombineChangesObservable -
Subscribes to two different observables, and emits the latest of both
values when either of them emits. It denotes which one has changed, and it
emits even when one of them hasn't emitted yet.
- CombineLatestObservable -
Subscribes to two observables, and emits the latest of both values when
either of them emits. It only starts emitting once both have emitted at
least once.
- ZipObservable -
Subscribes to two different observables, and emits both values once both
of them have emitted, pairing up emissions in the order they happened.
- JoinObservable -
Subscribes to two different observables, and emits the latest of both values
once both of them have completed!
- Combination (Single-Signal):
- MergeObservable -
Combine many observables of the same output type into a single observable,
subscribing to all of them at once!
- ConcatObservable -
Combine many observables of the same output type into a single observable,
subscribing to them one-by-one in order!
- Timing:
- Iterators:
- Connectable:
- ConnectableObservable -
Maintains an internal connector subject that subscribes to a source
observable only when the connect function is called on it. Subscribers of
the connectable observable subscribe to this internal connector.
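The difference between the pairing rules of CombineLatestObservable and ZipObservable listed above can be sketched with plain functions over a recorded timeline. This is only an illustration using the standard library: `Event`, `combine_latest`, and `zip` are hypothetical helpers, not the rx_core API, and the two sources are assumed to emit `u32` and `char` values.

```rust
use std::collections::VecDeque;

/// One emission on a shared timeline, tagged by the source it came from.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Event {
    Left(u32),
    Right(char),
}

/// Emits the latest pair on every event, once both sides have emitted.
fn combine_latest(events: &[Event]) -> Vec<(u32, char)> {
    let (mut left, mut right) = (None, None);
    let mut out = Vec::new();
    for event in events {
        match *event {
            Event::Left(value) => left = Some(value),
            Event::Right(value) => right = Some(value),
        }
        if let (Some(l), Some(r)) = (left, right) {
            out.push((l, r));
        }
    }
    out
}

/// Pairs the n-th left value with the n-th right value.
fn zip(events: &[Event]) -> Vec<(u32, char)> {
    let mut lefts = VecDeque::new();
    let mut rights = VecDeque::new();
    let mut out = Vec::new();
    for event in events {
        match *event {
            Event::Left(value) => lefts.push_back(value),
            Event::Right(value) => rights.push_back(value),
        }
        // A pair is only formed when both queues hold an unpaired value.
        if !lefts.is_empty() && !rights.is_empty() {
            if let (Some(l), Some(r)) = (lefts.pop_front(), rights.pop_front()) {
                out.push((l, r));
            }
        }
    }
    out
}

fn timeline() -> Vec<Event> {
    vec![
        Event::Left(1),
        Event::Left(2),
        Event::Right('a'),
        Event::Left(3),
        Event::Right('b'),
    ]
}

fn main() {
    println!("combine_latest: {:?}", combine_latest(&timeline()));
    println!("zip:            {:?}", zip(&timeline()));
}
```

With this timeline the combine-latest rule drops the first left value, because nothing was available to pair it with, while the zip rule keeps it queued until the first right value arrives.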
Observers
Observers are the destinations of subscriptions! They are the last stations
of a signal.
- PrintObserver -
A simple observer that prints all signals to the console using
println!.
- FnObserver -
A custom observer that uses user-supplied functions to handle signals.
All signal handlers must be defined up-front.
- DynFnObserver -
A custom observer that uses user-supplied functions to handle signals.
Not all signal handlers have to be defined, but it will panic if it observes
an error without an error handler defined.
- NoopObserver -
Ignores all signals. Will panic in debug mode if it observes an error.
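The "optional handlers" behavior described for DynFnObserver can be approximated with a small standard-library sketch. The `Observer` trait and the `OptionalHandlers` struct below are hypothetical stand-ins written for this example, with `u32` values and `String` errors assumed for simplicity. They are not the traits or types defined by rx_core.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical observer trait with the three usual signals.
trait Observer {
    fn next(&mut self, value: u32);
    fn error(&mut self, error: String);
    fn complete(&mut self);
}

/// Hypothetical observer whose handlers are all optional.
#[derive(Default)]
struct OptionalHandlers {
    on_next: Option<Box<dyn FnMut(u32)>>,
    on_error: Option<Box<dyn FnMut(String)>>,
    on_complete: Option<Box<dyn FnMut()>>,
}

impl Observer for OptionalHandlers {
    fn next(&mut self, value: u32) {
        if let Some(handler) = self.on_next.as_mut() {
            handler(value);
        }
    }

    fn error(&mut self, error: String) {
        // Mirrors the documented behavior: an unhandled error panics.
        let handler = self
            .on_error
            .as_mut()
            .expect("observed an error without an error handler");
        handler(error);
    }

    fn complete(&mut self) {
        if let Some(handler) = self.on_complete.as_mut() {
            handler();
        }
    }
}

fn run_demo() -> Vec<String> {
    let log = Rc::new(RefCell::new(Vec::new()));
    let (next_log, complete_log) = (Rc::clone(&log), Rc::clone(&log));
    // Only two of the three handlers are provided.
    let mut observer = OptionalHandlers {
        on_next: Some(Box::new(move |value: u32| {
            next_log.borrow_mut().push(format!("next {}", value))
        })),
        on_complete: Some(Box::new(move || {
            complete_log.borrow_mut().push("complete".to_string())
        })),
        ..Default::default()
    };
    observer.next(1);
    observer.next(2);
    observer.complete();
    let result = log.borrow().clone();
    result
}

fn main() {
    println!("{:?}", run_demo());
}
```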
Subjects
Subjects are both Observers and Observables at the same time. Subjects
multicast the signals they observe across all subscribers.
- PublishSubject -
Observed signals are forwarded to all active subscribers. It does not replay
values to late subscribers, but terminal state (complete/error) is always
replayed! Other subjects are built on top of this.
- BehaviorSubject -
Always holds a value that is replayed to late subscribers.
- ReplaySubject -
Buffers the last N values and replays them to late subscribers.
- AsyncSubject -
Reduces observed values into one and emits it to active subscribers once
completed. Once completed, it also replays the result to late subscribers.
- ProvenanceSubject -
A BehaviorSubject that also stores an additional value that can be used
for filtering. Useful to track the origin of a value, as some subscribers may
only be interested in certain origins while others are interested in all
values regardless of origin.
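How replay affects late subscribers can be shown with a small standard-library sketch. `ReplaySketch`, `Received`, and `late_subscriber_sees` are hypothetical names for this example, and subscribers are modeled as shared vectors that record what they received. A capacity of 0 roughly corresponds to the PublishSubject behavior for values, and a capacity of N to ReplaySubject. BehaviorSubject additionally always holds a value, which this sketch does not model.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;

// A subscriber is modeled as a shared record of the values it received.
type Received = Rc<RefCell<Vec<u32>>>;

/// Hypothetical multicasting subject with a bounded replay buffer.
struct ReplaySketch {
    capacity: usize,
    buffer: VecDeque<u32>,
    subscribers: Vec<Received>,
}

impl ReplaySketch {
    fn new(capacity: usize) -> Self {
        ReplaySketch {
            capacity,
            buffer: VecDeque::new(),
            subscribers: Vec::new(),
        }
    }

    /// Remembers the value (up to `capacity`) and multicasts it.
    fn next(&mut self, value: u32) {
        self.buffer.push_back(value);
        while self.buffer.len() > self.capacity {
            self.buffer.pop_front();
        }
        for subscriber in &self.subscribers {
            subscriber.borrow_mut().push(value);
        }
    }

    /// A new subscriber first receives the buffered values.
    fn subscribe(&mut self) -> Received {
        let replayed: Vec<u32> = self.buffer.iter().copied().collect();
        let received = Rc::new(RefCell::new(replayed));
        self.subscribers.push(Rc::clone(&received));
        received
    }
}

/// Emits 1, 2, 3, then subscribes, then emits 4.
fn late_subscriber_sees(capacity: usize) -> Vec<u32> {
    let mut subject = ReplaySketch::new(capacity);
    subject.next(1);
    subject.next(2);
    subject.next(3);
    let late = subject.subscribe();
    subject.next(4);
    let seen = late.borrow().clone();
    seen
}

fn main() {
    for capacity in 0..3 {
        println!("capacity {}: {:?}", capacity, late_subscriber_sees(capacity));
    }
}
```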
Operators
Operators take an observable as input and return a new observable as output,
enhancing the original observable with new behavior.
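As a rough illustration of "observable in, observable out", the sketch below models an observable as a boxed function and two operators as functions that wrap it. Everything here (`Observable`, `from_values`, `map`, `filter`, `collect`) is a hypothetical, dynamically dispatched simplification for readability. rx_core itself avoids dynamic dispatch, so this is not how the crate implements operators.

```rust
/// Hypothetical observable: calling it pushes every value into `emit`.
type Observable = Box<dyn Fn(&mut dyn FnMut(u32))>;

fn from_values(values: Vec<u32>) -> Observable {
    Box::new(move |emit| {
        for value in &values {
            emit(*value);
        }
    })
}

/// Operator: wraps `source` and transforms each value before forwarding it.
fn map(source: Observable, f: impl Fn(u32) -> u32 + 'static) -> Observable {
    Box::new(move |emit| source(&mut |value: u32| emit(f(value))))
}

/// Operator: wraps `source` and forwards only values accepted by `keep`.
fn filter(source: Observable, keep: impl Fn(u32) -> bool + 'static) -> Observable {
    Box::new(move |emit| {
        source(&mut |value: u32| {
            if keep(value) {
                emit(value);
            }
        })
    })
}

/// Subscribes and gathers everything the observable emits.
fn collect(source: &Observable) -> Vec<u32> {
    let mut out = Vec::new();
    source(&mut |value: u32| out.push(value));
    out
}

fn run_demo() -> Vec<u32> {
    let source = from_values(vec![1, 2, 3, 4]);
    let evens = filter(source, |value| value % 2 == 0);
    let scaled = map(evens, |value| value * 10);
    collect(&scaled)
}

fn main() {
    println!("{:?}", run_demo());
}
```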
- Mapping:
- Filtering Operators (Multi-Signal):
- Filtering Operators (Single-Signal):
- FirstOperator -
Emit the very first value, then complete.
- FindOperator -
Emit the first value matching a predicate, then complete.
- FindIndexOperator -
Emit the index of the first matching value, then complete.
- ElementAtOperator -
Emit the value at the given index, then complete.
- IsEmptyOperator -
Emit a single boolean indicating whether the source emitted anything before
it completed.
- Higher-Order (Flatten Observables of Observables):
- ConcatAllOperator -
Subscribes to all upstream observables one at a time in order.
- MergeAllOperator -
Subscribes to all upstream observables and merges their emissions
concurrently.
- SwitchAllOperator -
Subscribes to the latest upstream observable, unsubscribing from previous ones.
- ExhaustAllOperator -
Subscribes to an upstream observable only if there is no active
subscription.
- Higher-Order (Mapper):
- ConcatMapOperator -
Maps upstream signals into an observable, then subscribes to them one at a
time in order.
- MergeMapOperator -
Maps upstream signals into an observable, then subscribes to them and merges
their emissions concurrently.
- SwitchMapOperator -
Maps upstream signals into an observable, then subscribes to the latest one,
unsubscribing previous ones.
- ExhaustMapOperator -
Maps upstream signals into an observable, then subscribes to them only if
there is no active subscription.
- Combination:
- Buffering:
- Multicasting:
- ShareOperator -
Multicast a source through a connector so downstream subscribers share one
upstream subscription. The connector can be any subject.
- Accumulator (Multi-Signal):
- ScanOperator -
Accumulate state and emit every intermediate result.
- Accumulator (Single-Signal):
- Side-Effects:
- TapOperator -
Mirror values into another observer while letting them pass through.
- TapNextOperator -
Run a callback for each next without touching errors or completion.
- OnNextOperator -
Invoke a callback for each value that can also decide whether to forward it.
- OnSubscribeOperator -
Run a callback when a subscription is established.
- FinalizeOperator -
Execute cleanup when the observable finishes or unsubscribes.
- Producing:
- Error Handling:
- Timing Operators:
- Composite Operators:
- CompositeOperator -
Build reusable operator chains without needing a source observable!
- IdentityOperator -
A no-op operator, used mainly as the entry point of a CompositeOperator.
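The four flattening strategies behind the higher-order operators above (concat, merge, switch, exhaust) can be compared with a small tick-based simulation. This is a sketch under stated assumptions: `Strategy`, `Inner`, `flatten`, and `demo` are hypothetical names, each inner stream is assumed to emit one value per tick once subscribed, and the ordering of values within a single tick is an artifact of the simulation. None of this is rx_core code.

```rust
use std::collections::VecDeque;

#[derive(Clone, Copy)]
enum Strategy {
    Concat,
    Merge,
    Switch,
    Exhaust,
}

/// An inner stream that arrives at tick `arrival` and, once subscribed,
/// emits one value per tick.
struct Inner {
    arrival: usize,
    values: Vec<&'static str>,
}

fn flatten(inners: &[Inner], strategy: Strategy) -> Vec<&'static str> {
    let mut out = Vec::new();
    // Active subscriptions as (inner index, position of the next value).
    let mut active: Vec<(usize, usize)> = Vec::new();
    // Inners waiting for their turn; only Concat uses this queue.
    let mut waiting: VecDeque<usize> = VecDeque::new();
    let mut tick = 0;
    loop {
        // 1. Decide what to do with inners arriving at this tick.
        for (index, inner) in inners.iter().enumerate() {
            if inner.arrival != tick {
                continue;
            }
            match strategy {
                Strategy::Merge => active.push((index, 0)),
                Strategy::Concat => waiting.push_back(index),
                Strategy::Switch => {
                    active.clear();
                    active.push((index, 0));
                }
                Strategy::Exhaust => {
                    if active.is_empty() {
                        active.push((index, 0));
                    }
                }
            }
        }
        // Concat starts the next queued inner only when idle.
        if active.is_empty() {
            if let Some(index) = waiting.pop_front() {
                active.push((index, 0));
            }
        }
        // 2. Every active inner emits one value, then finished ones are dropped.
        for (index, position) in active.iter_mut() {
            if let Some(value) = inners[*index].values.get(*position) {
                out.push(*value);
            }
            *position += 1;
        }
        active.retain(|&(index, position)| position < inners[index].values.len());
        tick += 1;
        let all_arrived = inners.iter().all(|inner| inner.arrival < tick);
        if active.is_empty() && waiting.is_empty() && all_arrived {
            break;
        }
    }
    out
}

fn demo(strategy: Strategy) -> Vec<&'static str> {
    let inners = [
        Inner { arrival: 0, values: vec!["a1", "a2", "a3"] },
        Inner { arrival: 1, values: vec!["b1", "b2"] },
    ];
    flatten(&inners, strategy)
}

fn main() {
    println!("concat:  {:?}", demo(Strategy::Concat));
    println!("merge:   {:?}", demo(Strategy::Merge));
    println!("switch:  {:?}", demo(Strategy::Switch));
    println!("exhaust: {:?}", demo(Strategy::Exhaust));
}
```

The same four policies apply to the mapper variants, which first map each upstream value into an inner observable and then flatten the result.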
Macros
For every primitive, there is a derive macro available to ease implementation.
They mostly implement traits defining associated types like Out and
OutError. They may also provide default, trivial implementations where
applicable.
See the individual macros for more information:
Testing
The rx_core_testing crate provides utilities to test your Observables and
Operators.
- MockExecutor & Scheduler - Control the passage of time manually.
- MockObserver & NotificationCollector - Collect all observed notifications
and perform assertions over them.
- TestHarness - Perform more complex assertions to ensure proper behavior.
For Maintainers
See contributing.md