| Crates.io | observable-property-tokio |
| lib.rs | observable-property-tokio |
| version | 0.2.0 |
| created_at | 2025-08-27 10:53:41.309098+00 |
| updated_at | 2025-09-12 09:41:49.910233+00 |
| description | A thread-safe, async-compatible observable property implementation for Rust using Tokio |
| homepage | https://github.com/snoekiede/ObservablePropertiesTokio |
| repository | https://github.com/snoekiede/ObservablePropertiesTokio |
| max_upload_size | |
| id | 1812453 |
| size | 131,072 |
A thread-safe, async-compatible observable property implementation for Rust that allows you to observe changes to values using Tokio for asynchronous operations. This crate is inspired by the observer pattern and is designed to work seamlessly in multi-threaded environments.
This crate is provided "as is", without warranty of any kind, express or implied. The authors and contributors are not responsible for any damages or liability arising from the use of this software. While efforts have been made to ensure the crate functions correctly, it may contain bugs or issues in certain scenarios.
Things to keep in mind:
- Observers are Arc<dyn Fn> values, which may have memory overhead considerations
- Operations return Result types, so proper error handling is essential

Performance characteristics may vary depending on system configuration, observer complexity, and concurrency patterns. The observer pattern implementation may introduce overhead in systems with very high-frequency property changes or large numbers of observers.
By using this crate, you acknowledge that you have read and understood this disclaimer.
- Uses Arc<RwLock<>> for safe concurrent access
- Works with any Clone + Send + Sync type
- Returns Result types instead of panicking
- Non-cloning access to the current value through get_ref()

Add this to your Cargo.toml:
[dependencies]
observable-property-tokio = "0.2.0"
tokio = { version = "1.36", features = ["rt", "rt-multi-thread", "macros", "time"] }
Basic usage:
use observable_property_tokio::ObservableProperty;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), observable_property_tokio::PropertyError> {
    // Create an observable property
    let property = ObservableProperty::new(42);

    // Subscribe to changes
    let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
        println!("Value changed from {} to {}", old_value, new_value);
    }))?;

    // Change the value (triggers observer)
    property.set(100)?;

    // For async notification (uses Tokio)
    property.set_async(200).await?;

    // Unsubscribe when done
    property.unsubscribe(observer_id)?;
    Ok(())
}
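To make the moving parts of the example above more concrete, here is a minimal, std-only sketch of how an observable value could be built from an RwLock and a table of Arc<dyn Fn> callbacks. It is not the crate's implementation: `MiniProperty`, its fields, and `demo` are hypothetical names, and the sketch uses `unwrap()` on locks for brevity where the crate returns Result.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};

/// Callback shape taken from the API overview: it receives (old, new).
type Observer<T> = Arc<dyn Fn(&T, &T) + Send + Sync>;

/// Hypothetical miniature property, for illustration only.
struct MiniProperty<T> {
    value: RwLock<T>,
    observers: RwLock<HashMap<usize, Observer<T>>>,
    next_id: Mutex<usize>,
}

impl<T: Clone + Send + Sync> MiniProperty<T> {
    fn new(initial: T) -> Self {
        MiniProperty {
            value: RwLock::new(initial),
            observers: RwLock::new(HashMap::new()),
            next_id: Mutex::new(0),
        }
    }

    /// Register an observer and hand back its id.
    fn subscribe(&self, observer: Observer<T>) -> usize {
        let mut next = self.next_id.lock().unwrap();
        let id = *next;
        *next += 1;
        self.observers.write().unwrap().insert(id, observer);
        id
    }

    /// Remove an observer; returns false if the id was unknown.
    fn unsubscribe(&self, id: usize) -> bool {
        self.observers.write().unwrap().remove(&id).is_some()
    }

    fn set(&self, new_value: T) {
        // Swap the value under the write lock, then release the lock.
        let old = {
            let mut guard = self.value.write().unwrap();
            std::mem::replace(&mut *guard, new_value.clone())
        };
        // Snapshot the observers so callbacks run without any lock held.
        let snapshot: Vec<Observer<T>> =
            self.observers.read().unwrap().values().cloned().collect();
        for observer in snapshot {
            observer(&old, &new_value);
        }
    }
}

/// Mirrors the flow above: subscribe, set, unsubscribe, set again.
fn demo() -> Vec<(i32, i32)> {
    let log = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&log);
    let property = MiniProperty::new(42);
    let id = property.subscribe(Arc::new(move |old: &i32, new: &i32| {
        sink.lock().unwrap().push((*old, *new));
    }));
    property.set(100); // recorded
    property.unsubscribe(id);
    property.set(200); // not recorded, no observer is left
    let result = log.lock().unwrap().clone();
    result
}
```

Calling `demo()` from a `main` function returns the list of recorded changes. Taking a snapshot of the observers before calling them is one possible design choice; it lets a callback subscribe or unsubscribe without deadlocking on the observer table.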
Automatic cleanup with a subscription token:
use observable_property_tokio::ObservableProperty;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), observable_property_tokio::PropertyError> {
    let property = ObservableProperty::new(0);

    {
        // Create a subscription that automatically unsubscribes when dropped
        let _subscription = property.subscribe_with_token(Arc::new(|old, new| {
            println!("Value changed: {} -> {}", old, new);
        }))?;

        property.set(42)?; // Observer is notified
    } // _subscription is dropped here, automatically unsubscribing

    property.set(100)?; // Observer is no longer notified
    Ok(())
}
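The RAII behaviour shown above can be sketched with a guard type whose Drop implementation removes the observer. This is a std-only illustration under stated assumptions, not the crate's Subscription<T>: `Registry`, `Token`, `notify`, and `demo` are hypothetical names, and the observer signature is simplified to a single i32 argument.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical shared table of observers, keyed by id.
type Registry = Arc<Mutex<HashMap<usize, Box<dyn Fn(i32) + Send>>>>;

/// Hypothetical guard: removes its observer from the registry when dropped.
struct Token {
    id: usize,
    registry: Registry,
}

impl Drop for Token {
    fn drop(&mut self) {
        // Skip cleanup on a poisoned lock rather than panic inside drop.
        if let Ok(mut observers) = self.registry.lock() {
            observers.remove(&self.id);
        }
    }
}

/// Register `observer` under `id` and return the guard that owns the registration.
fn subscribe_with_token(
    registry: &Registry,
    id: usize,
    observer: Box<dyn Fn(i32) + Send>,
) -> Token {
    registry.lock().unwrap().insert(id, observer);
    Token { id, registry: Arc::clone(registry) }
}

/// Call every registered observer with `value`.
fn notify(registry: &Registry, value: i32) {
    for observer in registry.lock().unwrap().values() {
        observer(value);
    }
}

fn demo() -> Vec<i32> {
    let registry: Registry = Arc::new(Mutex::new(HashMap::new()));
    let seen = Arc::new(Mutex::new(Vec::<i32>::new()));
    {
        let sink = Arc::clone(&seen);
        let _token = subscribe_with_token(
            &registry,
            0,
            Box::new(move |value: i32| sink.lock().unwrap().push(value)),
        );
        notify(&registry, 42); // observer is still registered
    } // _token is dropped here and the observer is removed
    notify(&registry, 100); // nobody is listening any more
    let result = seen.lock().unwrap().clone();
    result
}
```

Binding the guard to a name such as `_token` matters: binding it to a bare `_` would drop it immediately and unsubscribe right away.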
Async observers:
use observable_property_tokio::ObservableProperty;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() -> Result<(), observable_property_tokio::PropertyError> {
    let property = ObservableProperty::new(0);

    // Subscribe with an async handler
    property.subscribe_async(|old, new| async move {
        // Simulate async work
        sleep(Duration::from_millis(100)).await;
        println!("Async observer: {} -> {}", old, new);
    })?;

    property.set_async(42).await?;

    // Give time for async observers to complete
    sleep(Duration::from_millis(200)).await;
    Ok(())
}
Filtered observers:
use observable_property_tokio::ObservableProperty;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), observable_property_tokio::PropertyError> {
    let property = ObservableProperty::new(0);

    // Only notify when value increases
    property.subscribe_filtered(
        Arc::new(|old, new| println!("Value increased: {} -> {}", old, new)),
        |old, new| new > old,
    )?;

    property.set(10)?; // Triggers observer (0 -> 10)
    property.set(5)?;  // Does NOT trigger observer (10 -> 5)
    property.set(15)?; // Triggers observer (5 -> 15)
    Ok(())
}
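One way to think about a filtered subscription is as an ordinary observer wrapped in a predicate check. The std-only sketch below shows that idea; `filtered` and `demo` are hypothetical helpers, and whether the crate implements subscribe_filtered this way is an assumption, not something the source states.

```rust
use std::sync::{Arc, Mutex};

/// Callback shape taken from the API overview: it receives (old, new).
type Observer<T> = Arc<dyn Fn(&T, &T) + Send + Sync>;

/// Hypothetical helper: wrap `observer` so it only runs when
/// `filter(old, new)` returns true.
fn filtered<T, F>(observer: Observer<T>, filter: F) -> Observer<T>
where
    T: 'static,
    F: Fn(&T, &T) -> bool + Send + Sync + 'static,
{
    Arc::new(move |old: &T, new: &T| {
        if filter(old, new) {
            observer(old, new);
        }
    })
}

fn demo() -> Vec<(i32, i32)> {
    let hits = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&hits);
    let observer: Observer<i32> = Arc::new(move |old: &i32, new: &i32| {
        sink.lock().unwrap().push((*old, *new));
    });
    let only_increases = filtered(observer, |old: &i32, new: &i32| new > old);

    // Replay the same changes as the example: 0 -> 10 -> 5 -> 15.
    let mut current = 0;
    for next in [10, 5, 15] {
        only_increases(&current, &next);
        current = next;
    }
    let result = hits.lock().unwrap().clone();
    result
}
```

Here `demo()` records the two increases (0 to 10 and 5 to 15) and skips the decrease from 10 to 5, matching the comments in the example.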
Derived properties with map:
use observable_property_tokio::ObservableProperty;

#[tokio::main]
async fn main() -> Result<(), observable_property_tokio::PropertyError> {
    let original = ObservableProperty::new(42);

    // Create a derived property that doubles the value
    let doubled = original.map(|value| value * 2);
    assert_eq!(doubled.get()?, 84);

    // Create a derived property that converts to string
    let as_string = original.map(|value| format!("Value: {}", value));
    assert_eq!(as_string.get()?, "Value: 42");

    // When original changes, all derived properties update automatically
    original.set(10)?;
    assert_eq!(doubled.get()?, 20);
    assert_eq!(as_string.get()?, "Value: 10");
    Ok(())
}
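A derived value that follows its source can be sketched as a second cell that is seeded with the transformed current value and then refreshed by an observer on the source. The std-only code below illustrates that approach under stated assumptions: `Source`, its `map`, and `demo` are hypothetical, the derived value is a plain Arc<RwLock<U>> rather than a full property, and nothing here is taken from the crate's internals.

```rust
use std::sync::{Arc, Mutex, RwLock};

type Observer<T> = Box<dyn Fn(&T, &T) + Send + Sync>;

/// Hypothetical miniature property, for illustration only.
struct Source<T> {
    value: RwLock<T>,
    observers: Mutex<Vec<Observer<T>>>,
}

impl<T: Clone + Send + Sync + 'static> Source<T> {
    fn new(initial: T) -> Self {
        Source { value: RwLock::new(initial), observers: Mutex::new(Vec::new()) }
    }

    fn get(&self) -> T {
        self.value.read().unwrap().clone()
    }

    fn set(&self, new_value: T) {
        // The write guard is a temporary and is released at the end of this statement.
        let old = std::mem::replace(&mut *self.value.write().unwrap(), new_value.clone());
        for observer in self.observers.lock().unwrap().iter() {
            observer(&old, &new_value);
        }
    }

    /// Seed a derived cell with `transform(current)`, then keep it current
    /// by re-running `transform` whenever the source changes.
    fn map<U, F>(&self, transform: F) -> Arc<RwLock<U>>
    where
        U: Send + Sync + 'static,
        F: Fn(&T) -> U + Send + Sync + 'static,
    {
        let derived = Arc::new(RwLock::new(transform(&self.get())));
        let target = Arc::clone(&derived);
        self.observers.lock().unwrap().push(Box::new(move |_old: &T, new: &T| {
            *target.write().unwrap() = transform(new);
        }));
        derived
    }
}

/// Returns (doubled, text) before the change followed by (doubled, text) after it.
fn demo() -> (i32, String, i32, String) {
    let original = Source::new(42);
    let doubled = original.map(|v: &i32| v * 2);
    let as_string = original.map(|v: &i32| format!("Value: {}", v));
    let before = (*doubled.read().unwrap(), as_string.read().unwrap().clone());
    original.set(10);
    let after = (*doubled.read().unwrap(), as_string.read().unwrap().clone());
    (before.0, before.1, after.0, after.1)
}
```

In this sketch the derived cells are updated synchronously inside `set`, so they are already current when `set` returns.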
Sharing a property across tasks:
use observable_property_tokio::ObservableProperty;
use std::sync::Arc;
use tokio::task;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let property = Arc::new(ObservableProperty::new(0));
    let property_clone = property.clone();

    // Subscribe from one task
    property.subscribe(Arc::new(|old, new| {
        println!("Value changed: {} -> {}", old, new);
    }))?;

    // Modify from another task
    let handle = task::spawn(async move {
        property_clone.set(42).map_err(|e| format!("Failed to set: {}", e))
    });

    // The first `?` handles a failed join, the second a failed set
    handle.await??;
    Ok(())
}
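When several tasks or threads modify the same value, a read-modify-write done as a separate get followed by set can lose updates. The std-only sketch below shows the alternative of applying a closure while the write lock is held, and of turning lock poisoning into a Result instead of a panic. It uses std::thread rather than Tokio so that it has no dependencies; the `update` helper and `demo` are hypothetical, and whether the crate's update() holds the lock in the same way is an assumption.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical helper: apply `f` to the value while holding the write lock,
/// so the read-modify-write happens as one step.
fn update<T: Clone, F: FnOnce(T) -> T>(cell: &RwLock<T>, f: F) -> Result<(), String> {
    let mut guard = cell
        .write()
        .map_err(|e| format!("lock poisoned: {}", e))?;
    let next = f(guard.clone());
    *guard = next;
    Ok(())
}

fn demo() -> Result<i32, String> {
    let shared = Arc::new(RwLock::new(0));
    let mut handles = Vec::new();

    // Four workers, each incrementing the shared value 100 times.
    for _ in 0..4 {
        let cell = Arc::clone(&shared);
        handles.push(thread::spawn(move || -> Result<(), String> {
            for _ in 0..100 {
                update(&cell, |v| v + 1)?;
            }
            Ok(())
        }));
    }

    for handle in handles {
        // The first `?` covers a panicked worker, the second a failed update.
        handle.join().map_err(|_| "worker panicked".to_string())??;
    }

    let total = *shared.read().map_err(|e| format!("lock poisoned: {}", e))?;
    Ok(total)
}
```

Because every increment runs under the write lock, no update is lost and the final value equals the total number of increments.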
The crate includes several examples demonstrating different usage patterns:
- basic_usage.rs - Simple property observation
- filtered_observers.rs - Conditional observers
- async_observers.rs - Asynchronous observer handlers
- multi_threading.rs - Concurrent access patterns
- subscription_token.rs - RAII-style automatic subscription cleanup
- property_mapping.rs - Creating derived properties with transformations
- complex_data_type.rs - Using with complex data structures
- reference_and_async_filtered.rs - Non-cloning access and async filtered subscriptions

Run examples with:
cargo run --example basic_usage
cargo run --example subscription_token
cargo run --example property_mapping
# ... and so on for other examples
Core types:
- ObservableProperty<T> - The main observable property type
- PropertyError - Error type returned by fallible operations
- Observer<T> - Type alias for observer functions: Arc<dyn Fn(&T, &T) + Send + Sync>
- ObserverId - Unique identifier for observers
- Subscription<T> - RAII subscription handle for automatic cleanup

Methods on ObservableProperty<T>:
- new(initial_value: T) - Create a new observable property
- get() -> Result<T, PropertyError> - Get current value (clones the value)
- get_ref() -> impl Deref<Target = T> - Get reference to current value (no cloning)
- set(new_value: T) -> Result<(), PropertyError> - Set value synchronously
- set_async(new_value: T) -> Result<(), PropertyError> - Set value asynchronously
- update(F: FnOnce(T) -> T) - Update value using a function
- update_async(F: FnOnce(T) -> T) - Update value asynchronously using a function
- subscribe(observer: Observer<T>) -> Result<ObserverId, PropertyError> - Add observer
- subscribe_with_token(observer: Observer<T>) -> Result<Subscription<T>, PropertyError> - Add observer with automatic cleanup
- subscribe_async<F, Fut>(handler: F) -> Result<ObserverId, PropertyError> - Add async observer
- subscribe_async_with_token<F, Fut>(handler: F) -> Result<Subscription<T>, PropertyError> - Add async observer with automatic cleanup
- subscribe_filtered<F>(observer: Observer<T>, filter: F) -> Result<ObserverId, PropertyError> - Add filtered observer
- subscribe_filtered_with_token<F>(observer: Observer<T>, filter: F) -> Result<Subscription<T>, PropertyError> - Add filtered observer with automatic cleanup
- subscribe_async_filtered<F, Fut, Filt>(handler: F, filter: Filt) -> Result<ObserverId, PropertyError> - Add async filtered observer
- subscribe_async_filtered_with_token<F, Fut, Filt>(handler: F, filter: Filt) -> Result<Subscription<T>, PropertyError> - Add async filtered observer with automatic cleanup
- unsubscribe(id: ObserverId) -> Result<(), PropertyError> - Remove observer
- try_unsubscribe(id: ObserverId) -> bool - Remove observer (ignores non-existent IDs)
- map<U, F>(&self, transform: F) -> ObservableProperty<U> - Create derived property with transformation
- observer_count() -> usize - Get the number of active observers

Performance considerations:
- set_async() notifies observers asynchronously through Tokio, which provides good isolation but may have overhead for many observers
- Observers are Arc<dyn Fn> values, which have some memory overhead
- The value is guarded by an RwLock, which allows multiple readers but exclusive writers
- Prefer get_ref() over get() to avoid cloning when appropriate

Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.
Licensed under either of:
- Apache License, Version 2.0 (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0)
- MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT)
This crate is a rework of the original observable-property crate to use Tokio instead of std::thread for better async compatibility.
If you encounter any issues or have questions, open an issue in the repository: https://github.com/snoekiede/ObservablePropertiesTokio
Remember: This software comes with no warranty. Test thoroughly before production use.