Crates.io | observable-property-tokio |
lib.rs | observable-property-tokio |
version | 0.1.3 |
created_at | 2025-08-27 10:53:41.309098+00 |
updated_at | 2025-08-27 11:04:21.019819+00 |
description | A thread-safe, async-compatible observable property implementation for Rust using Tokio |
homepage | https://github.com/snoekiede/ObservablePropertiesTokio |
repository | https://github.com/snoekiede/ObservablePropertiesTokio |
max_upload_size | |
id | 1812453 |
size | 84,873 |
A thread-safe, async-compatible observable property implementation for Rust that allows you to observe changes to values using Tokio for asynchronous operations. This crate is inspired by the observer pattern and is designed to work seamlessly in multi-threaded environments.
This crate is provided "as is", without warranty of any kind, express or implied. The authors and contributors are not responsible for any damages or liability arising from the use of this software. While efforts have been made to ensure the crate functions correctly, it may contain bugs or issues in certain scenarios.
Observers are stored as Arc<dyn Fn>, which may have memory overhead considerations.
All operations return Result types, so proper error handling is essential.
Performance characteristics may vary depending on system configuration, observer complexity, and concurrency patterns. The observer pattern implementation may introduce overhead in systems with very high-frequency property changes or large numbers of observers.
By using this crate, you acknowledge that you have read and understood this disclaimer.
Uses Arc<RwLock<>> for safe concurrent access.
Works with any Clone + Send + Sync type.
Returns Result types instead of panicking.
Add this to your Cargo.toml:
[dependencies]
observable-property-tokio = "0.1.0"
tokio = { version = "1.36", features = ["rt", "rt-multi-thread", "macros", "time"] }
use observable_property_tokio::ObservableProperty;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), observable_property_tokio::PropertyError> {
    // Create an observable property
    let property = ObservableProperty::new(42);

    // Subscribe to changes
    let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
        println!("Value changed from {} to {}", old_value, new_value);
    }))?;

    // Change the value (triggers observer)
    property.set(100)?;

    // For async notification (uses Tokio)
    property.set_async(200).await?;

    // Unsubscribe when done
    property.unsubscribe(observer_id)?;
    Ok(())
}
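To make the mechanics behind the example above more concrete, here is a minimal, standard-library-only sketch of the observer pattern with the same building blocks this README mentions (Arc<RwLock<>>, Arc<dyn Fn> observers, Result instead of panics). The names MiniProperty, MiniError, Inner, and demo are hypothetical and exist only for illustration; this is not the crate's actual implementation, and the real PropertyError and ObserverId types may look different.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};

/// Same shape as the crate's documented `Observer<T>` alias.
type Observer<T> = Arc<dyn Fn(&T, &T) + Send + Sync>;

/// Hypothetical error type for this sketch only.
#[derive(Debug, PartialEq)]
enum MiniError {
    LockPoisoned,
}

struct Inner<T> {
    value: T,
    observers: HashMap<usize, Observer<T>>,
    next_id: usize,
}

/// Hypothetical stand-in for an observable property.
struct MiniProperty<T> {
    inner: Arc<RwLock<Inner<T>>>,
}

impl<T: Clone + Send + Sync> MiniProperty<T> {
    fn new(initial: T) -> Self {
        let inner = Inner { value: initial, observers: HashMap::new(), next_id: 0 };
        MiniProperty { inner: Arc::new(RwLock::new(inner)) }
    }

    fn get(&self) -> Result<T, MiniError> {
        let guard = self.inner.read().map_err(|_| MiniError::LockPoisoned)?;
        Ok(guard.value.clone())
    }

    fn subscribe(&self, observer: Observer<T>) -> Result<usize, MiniError> {
        let mut guard = self.inner.write().map_err(|_| MiniError::LockPoisoned)?;
        let id = guard.next_id;
        guard.next_id += 1;
        guard.observers.insert(id, observer);
        Ok(id)
    }

    fn unsubscribe(&self, id: usize) -> Result<bool, MiniError> {
        let mut guard = self.inner.write().map_err(|_| MiniError::LockPoisoned)?;
        Ok(guard.observers.remove(&id).is_some())
    }

    fn set(&self, new_value: T) -> Result<(), MiniError> {
        // Swap the value and snapshot the observers under the write lock.
        let (old, observers) = {
            let mut guard = self.inner.write().map_err(|_| MiniError::LockPoisoned)?;
            let old = std::mem::replace(&mut guard.value, new_value.clone());
            let observers: Vec<Observer<T>> = guard.observers.values().cloned().collect();
            (old, observers)
        };
        // Notify after the lock is released so observers may read the property.
        for observer in observers {
            observer(&old, &new_value);
        }
        Ok(())
    }
}

/// Records every notification, then unsubscribes and sets once more.
fn demo() -> Result<(Vec<(i32, i32)>, i32), MiniError> {
    let seen = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&seen);
    let property = MiniProperty::new(42);
    let id = property.subscribe(Arc::new(move |old: &i32, new: &i32| {
        sink.lock().unwrap().push((*old, *new));
    }))?;
    property.set(100)?; // observer is notified with (42, 100)
    property.unsubscribe(id)?;
    property.set(200)?; // no observer left, nothing is recorded
    let history = seen.lock().unwrap().clone();
    Ok((history, property.get()?))
}
```

Snapshotting the observers before calling them is one possible design choice: it keeps the write lock short and avoids deadlocks if an observer reads the property. Whether the crate does the same is not stated here.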
use observable_property_tokio::ObservableProperty;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() -> Result<(), observable_property_tokio::PropertyError> {
    let property = ObservableProperty::new(0);

    // Subscribe with an async handler
    property.subscribe_async(|old, new| async move {
        // Simulate async work
        sleep(Duration::from_millis(100)).await;
        println!("Async observer: {} -> {}", old, new);
    })?;

    property.set_async(42).await?;

    // Give time for async observers to complete
    sleep(Duration::from_millis(200)).await;
    Ok(())
}
use observable_property_tokio::ObservableProperty;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), observable_property_tokio::PropertyError> {
    let property = ObservableProperty::new(0);

    // Only notify when value increases
    property.subscribe_filtered(
        Arc::new(|old, new| println!("Value increased: {} -> {}", old, new)),
        |old, new| new > old,
    )?;

    property.set(10)?; // Triggers observer (0 -> 10)
    property.set(5)?;  // Does NOT trigger observer (10 -> 5)
    property.set(15)?; // Triggers observer (5 -> 15)
    Ok(())
}
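One way to think about a filtered subscription like the one above is as an ordinary observer wrapped in a predicate check. The sketch below uses only the standard library; with_filter and demo are hypothetical names introduced for illustration, and nothing here claims to be how subscribe_filtered is implemented inside the crate.

```rust
use std::sync::{Arc, Mutex};

/// Same shape as the crate's documented `Observer<T>` alias.
type Observer<T> = Arc<dyn Fn(&T, &T) + Send + Sync>;

/// Hypothetical helper: returns an observer that forwards to `observer`
/// only when `filter(old, new)` is true.
fn with_filter<T, F>(observer: Observer<T>, filter: F) -> Observer<T>
where
    T: 'static,
    F: Fn(&T, &T) -> bool + Send + Sync + 'static,
{
    Arc::new(move |old: &T, new: &T| {
        if filter(old, new) {
            observer(old, new);
        }
    })
}

/// Replays the changes 0 -> 10 -> 5 -> 15 and returns the ones that got through.
fn demo() -> Vec<(i32, i32)> {
    let seen = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&seen);
    let record: Observer<i32> = Arc::new(move |old: &i32, new: &i32| {
        sink.lock().unwrap().push((*old, *new));
    });
    let increases_only = with_filter(record, |old: &i32, new: &i32| new > old);

    let mut current = 0;
    for next in [10, 5, 15] {
        increases_only(&current, &next);
        current = next;
    }
    let history = seen.lock().unwrap().clone();
    history
}
```

Calling demo() yields the two increasing transitions, (0, 10) and (5, 15), matching the comments in the example above.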
use observable_property_tokio::ObservableProperty;
use std::sync::Arc;
use tokio::task;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let property = Arc::new(ObservableProperty::new(0));
    let property_clone = property.clone();

    // Subscribe from one task
    property.subscribe(Arc::new(|old, new| {
        println!("Value changed: {} -> {}", old, new);
    }))?;

    // Modify from another task
    let handle = task::spawn(async move {
        property_clone.set(42).map_err(|e| format!("Failed to set: {}", e))
    });

    handle.await??;
    Ok(())
}
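The sharing pattern in the example above relies on Arc for shared ownership and, according to this README, on RwLock for synchronization. As a rough illustration of the RwLock behavior (many concurrent readers, one exclusive writer), here is a standard-library-only sketch that uses std::thread instead of Tokio tasks. The function name demo is hypothetical, and the code is not taken from the crate.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Writes once from a writer thread, then reads from four reader threads
/// and returns the sum of what the readers saw.
fn demo() -> i32 {
    let value = Arc::new(RwLock::new(0_i32));

    // The writer takes the exclusive write lock.
    let writer = {
        let value = Arc::clone(&value);
        thread::spawn(move || {
            *value.write().unwrap() = 42;
        })
    };
    writer.join().unwrap();

    // Readers may hold the read lock at the same time.
    let readers: Vec<thread::JoinHandle<i32>> = (0..4)
        .map(|_| {
            let value = Arc::clone(&value);
            thread::spawn(move || {
                let guard = value.read().unwrap();
                *guard
            })
        })
        .collect();

    readers.into_iter().map(|handle| handle.join().unwrap()).sum()
}
```

Each of the four readers observes 42 because the writer is joined before they start, so demo() returns 168.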
The crate includes several examples demonstrating different usage patterns:
basic_usage.rs - Simple property observation
filtered_observers.rs - Conditional observers
async_observers.rs - Asynchronous observer handlers
multi_threading.rs - Concurrent access patterns
Run examples with:
cargo run --example basic_usage
cargo run --example filtered_observers
cargo run --example async_observers
cargo run --example multi_threading
ObservableProperty<T> - The main observable property type
PropertyError - Error type returned by all operations
Observer<T> - Type alias for observer functions: Arc<dyn Fn(&T, &T) + Send + Sync>
ObserverId - Unique identifier for observers
new(initial_value: T) - Create a new observable property
get() -> Result<T, PropertyError> - Get current value
set(new_value: T) -> Result<(), PropertyError> - Set value synchronously
set_async(new_value: T) -> Result<(), PropertyError> - Set value asynchronously
subscribe(observer: Observer<T>) -> Result<ObserverId, PropertyError> - Add observer
subscribe_async<F, Fut>(handler: F) -> Result<ObserverId, PropertyError> - Add async observer
subscribe_filtered<F>(observer: Observer<T>, filter: F) -> Result<ObserverId, PropertyError> - Add filtered observer
unsubscribe(id: ObserverId) -> Result<bool, PropertyError> - Remove observer
set_async() provides good isolation but may have overhead for many observers.
Arc<dyn Fn> has some memory overhead.
RwLock allows multiple readers but exclusive writers.
Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.
Licensed under either
Apache License, Version 2.0 (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0)
MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT)
This crate is a rework of the original observable-property crate to use Tokio instead of std::thread for better async compatibility.
If you encounter any issues or have questions:
Remember: This software comes with no warranty. Test thoroughly before production use.