rx_core

crates.io: rx_core
lib.rs: rx_core
version: 0.2.0
created_at: 2026-01-19 09:19:27.03836+00
updated_at: 2026-01-24 15:11:56.712132+00
description: Runtime Agnostic Reactive Extensions
homepage: https://github.com/AlexAegis/rx_bevy
repository: https://github.com/AlexAegis/rx_bevy
max_upload_size:
id: 2054074
size: 826,349
Sandor (AlexAegis)

documentation: https://github.com/AlexAegis/rx_bevy

README

A runtime agnostic implementation of Reactive Extensions for Rust!

[!IMPORTANT] Currently this crate does not provide an async executor! It was primarily developed to be used in the Bevy game engine, through rx_bevy. However, I do want to add additional executors in the future.

Documentation

What makes it different?

  • Runtime agnostic implementation.
  • Heavy use of GATs to avoid dynamic dispatch and function calls wherever possible, enabling inlining and optimizations by the compiler.
  • Deadlock free execution.

    You could even create a subject that subscribes to itself and sends events on every single value it observes, creating a fractal of subscriptions even on a single thread. But please don't.

Contents

rx_core is an extensible framework; the rx_core_common crate provides the common types and traits used by all other crates.

It defines what an Observable, Observer, Subscription, Subject, Operator, Subscriber, and Scheduler are, how Operators (and ComposableOperators) are piped together, and how Subscriptions and Subscribers avoid deadlocks in single-threaded contexts by deferring notifications.
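The idea of deferring notifications can be sketched in plain Rust, independent of this crate. The sketch below is an assumption-laden illustration, not rx_core's implementation: `DeferringSubject`, `Callback`, and the `draining` flag are hypothetical names. A re-entrant `next` call made from inside a subscriber callback is queued and delivered after the current notification finishes, so no `RefCell` is borrowed twice.

```rust
use std::cell::{Cell, RefCell};
use std::collections::VecDeque;
use std::rc::Rc;

/// Hypothetical callback type: it receives the subject so it can emit re-entrantly.
type Callback = Rc<dyn Fn(&DeferringSubject, u32)>;

/// Hypothetical single-threaded subject that defers re-entrant emissions.
/// Not rx_core's API, only an illustration of the deferral idea.
pub struct DeferringSubject {
    subscribers: RefCell<Vec<Callback>>,
    queue: RefCell<VecDeque<u32>>,
    draining: Cell<bool>,
}

impl DeferringSubject {
    pub fn new() -> Self {
        Self {
            subscribers: RefCell::new(Vec::new()),
            queue: RefCell::new(VecDeque::new()),
            draining: Cell::new(false),
        }
    }

    pub fn subscribe(&self, callback: impl Fn(&DeferringSubject, u32) + 'static) {
        let callback: Callback = Rc::new(callback);
        self.subscribers.borrow_mut().push(callback);
    }

    pub fn next(&self, value: u32) {
        // Always enqueue first; the borrow ends with this statement.
        self.queue.borrow_mut().push_back(value);
        if self.draining.get() {
            // A delivery loop is already running further up the call stack.
            // It will pick this value up once the current callback returns.
            return;
        }
        self.draining.set(true);
        loop {
            let pending = self.queue.borrow_mut().pop_front();
            let value = match pending {
                Some(v) => v,
                None => break,
            };
            // Snapshot the subscriber list so no borrow is held during callbacks.
            let snapshot: Vec<Callback> = self.subscribers.borrow().clone();
            for callback in &snapshot {
                callback(self, value);
            }
        }
        self.draining.set(false);
    }
}
```

With a subscriber that records each value and calls `next(v + 1)` while `v < 3`, a single `subject.next(1)` delivers 1, 2, 3 in order without any nested delivery.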

Observables

Observables define a stream of emissions that is instantiated upon subscription.
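To make "instantiated upon subscription" concrete, here is a minimal standalone sketch. `ColdObservable` and `count_to` are hypothetical names and do not reflect rx_core's actual traits: the observable stores only a recipe, and every call to `subscribe` runs that recipe from the start.

```rust
/// Hypothetical minimal "cold" observable: it holds a recipe, not values.
pub struct ColdObservable<T> {
    producer: Box<dyn Fn(&mut dyn FnMut(T))>,
}

impl<T> ColdObservable<T> {
    pub fn new(producer: impl Fn(&mut dyn FnMut(T)) + 'static) -> Self {
        Self {
            producer: Box::new(producer),
        }
    }

    /// Each subscription runs the producer again with its own destination.
    pub fn subscribe(&self, mut on_next: impl FnMut(T)) {
        (self.producer)(&mut on_next);
    }
}

/// Emits 1..=n to whoever subscribes.
pub fn count_to(n: u32) -> ColdObservable<u32> {
    ColdObservable::new(move |emit| {
        for i in 1..=n {
            emit(i);
        }
    })
}
```

Subscribing twice to `count_to(3)` gives each subscriber its own independent run of 1, 2, 3.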

  • Creation:
  • Combination (Multi-Signal):
    • CombineChangesObservable - Subscribes to two different observables and emits the latest value of both whenever either of them emits. It denotes which one changed, and it emits even when one of them has not emitted yet.
    • CombineLatestObservable - Subscribes to two observables and emits the latest value of both whenever either of them emits. It only starts emitting once both have emitted at least once.
    • ZipObservable - Subscribes to two different observables and emits a pair of values once both of them have emitted, pairing up emissions in the order they happened.
    • JoinObservable - Subscribes to two different observables and emits the latest value of both once both of them have completed!
  • Combination (Single-Signal):
    • MergeObservable - Combine many observables of the same output type into a single observable, subscribing to all of them at once!
    • ConcatObservable - Combine many observables of the same output type into a single observable, subscribing to them one-by-one in order!
  • Timing:
  • Iterators:
  • Connectable:
    • ConnectableObservable - Maintains an internal connector subject that subscribes to the source observable only when the connect function is called on it. Subscribers of the ConnectableObservable subscribe to this internal connector.
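The differences between the two-source combinators are easiest to see on one interleaved event log. The following is a sketch in plain Rust, not the crate's API: `Event`, `combine_latest`, `zip`, and `join` are hypothetical free functions, and the end of the slice is assumed to stand for both sources completing.

```rust
use std::collections::VecDeque;

/// One emission from either of two hypothetical sources.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Event {
    Left(u32),
    Right(char),
}

/// Emits the latest pair whenever either side emits, once both have a value.
pub fn combine_latest(events: &[Event]) -> Vec<(u32, char)> {
    let (mut left, mut right) = (None, None);
    let mut out = Vec::new();
    for &event in events {
        match event {
            Event::Left(v) => left = Some(v),
            Event::Right(v) => right = Some(v),
        }
        if let (Some(l), Some(r)) = (left, right) {
            out.push((l, r));
        }
    }
    out
}

/// Pairs the n-th left emission with the n-th right emission.
pub fn zip(events: &[Event]) -> Vec<(u32, char)> {
    let (mut lefts, mut rights) = (VecDeque::new(), VecDeque::new());
    let mut out = Vec::new();
    for &event in events {
        match event {
            Event::Left(v) => lefts.push_back(v),
            Event::Right(v) => rights.push_back(v),
        }
        if !lefts.is_empty() && !rights.is_empty() {
            if let (Some(l), Some(r)) = (lefts.pop_front(), rights.pop_front()) {
                out.push((l, r));
            }
        }
    }
    out
}

/// Emits only the final pair, after both sides are done.
pub fn join(events: &[Event]) -> Option<(u32, char)> {
    let (mut left, mut right) = (None, None);
    for &event in events {
        match event {
            Event::Left(v) => left = Some(v),
            Event::Right(v) => right = Some(v),
        }
    }
    left.zip(right)
}
```

For the log `Left(1), Left(2), Right('a'), Right('b'), Left(3)`, this `combine_latest` yields `(2,'a'), (2,'b'), (3,'b')`, `zip` yields `(1,'a'), (2,'b')`, and `join` yields `(3,'b')`.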

Observers

Observers are the destinations of subscriptions! They are the last stations of a signal.

  • PrintObserver - A simple observer that prints all signals to the console using println!.
  • FnObserver - A custom observer that uses user-supplied functions to handle signals. All signal handlers must be defined up-front.
  • DynFnObserver - A custom observer that uses user-supplied functions to handle signals. Not all signal handlers have to be defined, but it will panic if it observes an error without an error handler defined.
  • NoopObserver - Ignores all signals. Will panic in debug mode if it observes an error.
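As a rough illustration of the "optional handlers" idea behind an observer like DynFnObserver, here is a standalone sketch. `LooseObserver`, `with_next`, and `with_error` are hypothetical names, not the crate's API. Handlers are optional, a missing `next` handler is silently skipped, and an error without an error handler panics.

```rust
use std::fmt::Debug;

/// Hypothetical observer with optional handlers (not rx_core's type).
pub struct LooseObserver<T, E> {
    on_next: Option<Box<dyn FnMut(T)>>,
    on_error: Option<Box<dyn FnMut(E)>>,
}

impl<T, E: Debug> LooseObserver<T, E> {
    pub fn new() -> Self {
        Self {
            on_next: None,
            on_error: None,
        }
    }

    pub fn with_next(mut self, handler: impl FnMut(T) + 'static) -> Self {
        self.on_next = Some(Box::new(handler));
        self
    }

    pub fn with_error(mut self, handler: impl FnMut(E) + 'static) -> Self {
        self.on_error = Some(Box::new(handler));
        self
    }

    /// Values without a handler are dropped.
    pub fn next(&mut self, value: T) {
        if let Some(handler) = self.on_next.as_mut() {
            handler(value);
        }
    }

    /// Errors must not go unnoticed: no handler means a panic.
    pub fn error(&mut self, error: E) {
        match self.on_error.as_mut() {
            Some(handler) => handler(error),
            None => panic!("unhandled error: {:?}", error),
        }
    }
}
```

An observer that must define every handler up-front would instead take all closures as required constructor arguments, which moves the "missing handler" failure from run time to compile time.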

Subjects

Subjects are both Observers and Observables at the same time. Subjects multicast the signals they observe across all subscribers.

  • PublishSubject - Observed signals are forwarded to all active subscribers. It does not replay values to late subscribers, but terminal state (complete/error) is always replayed! Other subjects are built on top of this.
  • BehaviorSubject - Always holds a value that is replayed to late subscribers.
  • ReplaySubject - Buffers the last N values and replays them to late subscribers.
  • AsyncSubject - Reduces observed values into one and emits it to active subscribers once completed. Once completed, it also replays the result to late subscribers.
  • ProvenanceSubject - A BehaviorSubject that also stores an additional value that can be used for filtering. Useful to track the origin of a value as some subscribers may only be interested in certain origins while some are interested in all values regardless of origin.
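The replay behaviors described above can be compared with one small standalone sketch. `ReplayingSubject` and `collector` are hypothetical names, and terminal signals are left out for brevity. With capacity 0 it acts like a plain publishing subject; with capacity N, late subscribers first receive the last N values.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;

/// Hypothetical multicasting subject that replays the last `capacity` values.
pub struct ReplayingSubject<T: Clone> {
    capacity: usize,
    buffer: VecDeque<T>,
    subscribers: Vec<Box<dyn FnMut(T)>>,
}

impl<T: Clone> ReplayingSubject<T> {
    pub fn new(capacity: usize) -> Self {
        Self {
            capacity,
            buffer: VecDeque::new(),
            subscribers: Vec::new(),
        }
    }

    /// Late subscribers first receive the buffered values, then live ones.
    pub fn subscribe(&mut self, mut on_next: impl FnMut(T) + 'static) {
        for value in &self.buffer {
            on_next(value.clone());
        }
        self.subscribers.push(Box::new(on_next));
    }

    /// Multicasts the value to every active subscriber.
    pub fn next(&mut self, value: T) {
        if self.capacity > 0 {
            if self.buffer.len() == self.capacity {
                self.buffer.pop_front();
            }
            self.buffer.push_back(value.clone());
        }
        for subscriber in &mut self.subscribers {
            subscriber(value.clone());
        }
    }
}

/// Helper for demos: a shared log plus a closure that appends to it.
pub fn collector<T: 'static>() -> (Rc<RefCell<Vec<T>>>, impl FnMut(T) + 'static) {
    let log = Rc::new(RefCell::new(Vec::new()));
    let sink = Rc::clone(&log);
    (log, move |value: T| sink.borrow_mut().push(value))
}
```

After emitting 1, 2, 3 into `ReplayingSubject::new(2)`, a late subscriber sees 2 and 3 immediately and then any later value. With capacity 0 it sees only later values. A behavior-style subject corresponds roughly to capacity 1 plus an initial value.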

Operators

Operators take an observable as input and return a new observable as output, enhancing the original observable with new behavior.
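A minimal sketch of "observable in, observable out", written without this crate. The `Observable` trait, `FromVec`, `Map`, `ObservableExt`, and `collect` below are hypothetical and much simpler than rx_core's real definitions. The operator is a generic struct wrapping its source, so the chain itself is resolved statically; the sink is passed as `&mut dyn FnMut` only to keep the sketch short.

```rust
/// Hypothetical, heavily simplified observable trait (values only).
pub trait Observable {
    type Out;
    fn subscribe(&self, on_next: &mut dyn FnMut(Self::Out));
}

/// A source that emits the items of a vector on every subscription.
pub struct FromVec<T>(pub Vec<T>);

impl<T: Clone> Observable for FromVec<T> {
    type Out = T;
    fn subscribe(&self, on_next: &mut dyn FnMut(T)) {
        for value in &self.0 {
            on_next(value.clone());
        }
    }
}

/// An operator: wraps a source observable and is itself an observable.
pub struct Map<S, F> {
    source: S,
    f: F,
}

impl<S: Observable, B, F: Fn(S::Out) -> B> Observable for Map<S, F> {
    type Out = B;
    fn subscribe(&self, on_next: &mut dyn FnMut(B)) {
        // Subscribe upstream with a destination that transforms each value.
        self.source
            .subscribe(&mut |value: S::Out| on_next((self.f)(value)));
    }
}

/// Extension trait so operators can be chained with method syntax.
pub trait ObservableExt: Observable + Sized {
    fn map<B, F: Fn(Self::Out) -> B>(self, f: F) -> Map<Self, F> {
        Map { source: self, f }
    }
}

impl<S: Observable> ObservableExt for S {}

/// Demo helper: subscribes and gathers everything that was emitted.
pub fn collect<S: Observable>(source: &S) -> Vec<S::Out> {
    let mut out = Vec::new();
    source.subscribe(&mut |value: S::Out| out.push(value));
    out
}
```

For example, `FromVec(vec![1, 2, 3]).map(|v| v * 2).map(|v| v + 1)` collects to 3, 5, 7, and nothing runs until `subscribe` is called on the outermost operator.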

  • Mapping:
  • Filtering Operators (Multi-Signal):
  • Filtering Operators (Single-Signal):
    • FirstOperator - Emit the very first value, then complete.
    • FindOperator - Emit the first value matching a predicate, then complete.
    • FindIndexOperator - Emit the index of the first matching value, then complete.
    • ElementAtOperator - Emit the value at the given index, then complete.
    • IsEmptyOperator - Emit a single boolean indicating whether the source emitted anything before it completed.
  • Higher-Order (Flatten Observable Observables):
    • ConcatAllOperator - Subscribes to all upstream observables one at a time in order.
    • MergeAllOperator - Subscribes to all upstream observables and merges their emissions concurrently.
    • SwitchAllOperator - Subscribes to the latest upstream observable, unsubscribing from previous ones.
    • ExhaustAllOperator - Subscribes to an upstream observable only if there is no active subscription.
  • Higher-Order (Mapper):
    • ConcatMapOperator - Maps upstream signals into an observable, then subscribes to them one at a time in order.
    • MergeMapOperator - Maps upstream signals into an observable, then subscribes to them and merges their emissions concurrently.
    • SwitchMapOperator - Maps upstream signals into an observable, then subscribes to the latest one, unsubscribing from previous ones.
    • ExhaustMapOperator - Maps upstream signals into an observable, then subscribes to them only if there is no active subscription.
  • Combination:
  • Buffering:
  • Multicasting:
    • ShareOperator - Multicast a source through a connector so downstream subscribers share one upstream subscription. The connector can be any subject.
  • Accumulator (Multi-Signal):
    • ScanOperator - Accumulate state and emit every intermediate result.
  • Accumulator (Single-Signal):
  • Side-Effects:
    • TapOperator - Mirror values into another observer while letting them pass through.
    • TapNextOperator - Run a callback for each next without touching errors or completion.
    • OnNextOperator - Invoke a callback for each value that can also decide whether to forward it.
    • OnSubscribeOperator - Run a callback when a subscription is established.
    • FinalizeOperator - Execute cleanup when the observable finishes or unsubscribes.
  • Producing:
  • Error Handling:
  • Timing Operators:
  • Composite Operators:
    • CompositeOperator - Build reusable operator chains without needing a source observable!
    • IdentityOperator - A no-op operator, used mainly as the entry point of a CompositeOperator.
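The four flattening strategies from the higher-order operators above (concat, merge, switch, exhaust) differ only in when an inner observable gets subscribed and when it is dropped. The following is a toy timeline simulation under stated assumptions, not the crate's implementation. `Inner`, `Strategy`, and `flatten` are hypothetical. Each inner observable arrives at a given time, emits at offsets relative to its own subscription, and completes with its last emission. Inners are listed in arrival order.

```rust
/// Hypothetical inner observable on a virtual timeline.
#[derive(Clone, Debug)]
pub struct Inner {
    pub arrives_at: u32,
    /// (offset from subscription, value), sorted by offset.
    pub emissions: Vec<(u32, &'static str)>,
}

impl Inner {
    /// Assumed to complete together with its last emission.
    fn duration(&self) -> u32 {
        self.emissions.last().map_or(0, |&(offset, _)| offset)
    }
}

#[derive(Clone, Copy)]
pub enum Strategy {
    Concat,
    Merge,
    Switch,
    Exhaust,
}

/// Returns the flattened output as (time, value) pairs.
pub fn flatten(inners: &[Inner], strategy: Strategy) -> Vec<(u32, &'static str)> {
    let mut out = Vec::new();
    // When the most recently subscribed inner completes (Concat and Exhaust).
    let mut busy_until: u32 = 0;
    for (index, inner) in inners.iter().enumerate() {
        let next_arrival = inners.get(index + 1).map(|next| next.arrives_at);
        let subscribed_at = match strategy {
            // Wait for the previous inner to complete.
            Strategy::Concat => inner.arrives_at.max(busy_until),
            // Ignore inners that arrive while another one is active.
            Strategy::Exhaust if inner.arrives_at < busy_until => continue,
            // Merge, Switch, and an idle Exhaust subscribe immediately.
            _ => inner.arrives_at,
        };
        busy_until = subscribed_at + inner.duration();
        for &(offset, value) in &inner.emissions {
            let at = subscribed_at + offset;
            // Switch drops an inner as soon as the next one arrives.
            let cut_off = matches!(strategy, Strategy::Switch)
                && next_arrival.map_or(false, |next| at >= next);
            if !cut_off {
                out.push((at, value));
            }
        }
    }
    // Stable sort: simultaneous emissions keep subscription order.
    out.sort_by_key(|&(at, _)| at);
    out
}
```

Take inner A arriving at t=0 with emissions at offsets 0, 2, 4, and inner B arriving at t=3 with emissions at offsets 0, 2. Merge interleaves both, switch drops A's last value, exhaust ignores B entirely, and concat delays B until A completes at t=4.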

Macros

For every primitive, there is a derive macro available to ease implementation. They mostly implement traits defining associated types like Out and OutError. They may also provide default, trivial implementations where applicable.

See the individual macros for more information:

Testing

The rx_core_testing crate provides utilities to test your Observables and Operators.

  • MockExecutor & Scheduler - Control the passage of time manually.
  • MockObserver & NotificationCollector - Collect all observed notifications and perform assertions over them.
  • TestHarness - Perform more complex assertions to ensure proper behavior.
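To show what "controlling the passage of time manually" can look like, here is a self-contained sketch of a virtual-time scheduler together with a shared log acting as a notification collector. `ManualScheduler`, `schedule`, `advance`, and `demo` are hypothetical names and are not the rx_core_testing API.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// A task together with the virtual time at which it becomes due.
type Task = (u64, Box<dyn FnOnce()>);

/// Hypothetical scheduler whose clock only moves when told to.
pub struct ManualScheduler {
    now: u64,
    tasks: Vec<Task>,
}

impl ManualScheduler {
    pub fn new() -> Self {
        Self {
            now: 0,
            tasks: Vec::new(),
        }
    }

    /// Registers a task to run `delay` ticks after the current virtual time.
    pub fn schedule(&mut self, delay: u64, task: impl FnOnce() + 'static) {
        let task: Box<dyn FnOnce()> = Box::new(task);
        self.tasks.push((self.now + delay, task));
    }

    /// Moves the clock forward and runs every task that became due.
    pub fn advance(&mut self, by: u64) {
        self.now += by;
        let now = self.now;
        let all = std::mem::take(&mut self.tasks);
        let (mut due, pending): (Vec<Task>, Vec<Task>) =
            all.into_iter().partition(|task| task.0 <= now);
        self.tasks = pending;
        // Stable sort: tasks due at the same time keep scheduling order.
        due.sort_by_key(|task| task.0);
        for (_, task) in due {
            task();
        }
    }
}

/// Usage: collect labels in the order the virtual clock releases them.
pub fn demo() -> Vec<&'static str> {
    let log = Rc::new(RefCell::new(Vec::new()));
    let mut scheduler = ManualScheduler::new();
    for (delay, label) in [(30, "late"), (10, "early")] {
        let sink = Rc::clone(&log);
        scheduler.schedule(delay, move || sink.borrow_mut().push(label));
    }
    scheduler.advance(10); // only "early" is due
    scheduler.advance(20); // now "late" is due as well
    let observed = log.borrow().clone();
    observed
}
```

Because nothing depends on wall-clock time, a test built this way is deterministic: assertions can be made between `advance` calls to check exactly which notifications have been delivered so far.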

For Maintainers

See contributing.md

