| Crates.io | observable-property-tokio |
| lib.rs | observable-property-tokio |
| version | 0.4.0 |
| created_at | 2025-08-27 10:53:41.309098+00 |
| updated_at | 2025-12-17 15:02:48.695429+00 |
| description | A thread-safe, async-compatible observable property implementation for Rust using Tokio |
| homepage | https://github.com/snoekiede/ObservablePropertiesTokio |
| repository | https://github.com/snoekiede/ObservablePropertiesTokio |
| max_upload_size | |
| id | 1812453 |
| size | 277,346 |
A thread-safe, async-compatible observable property implementation for Rust that allows you to observe changes to values using Tokio for asynchronous operations. This crate is inspired by the observer pattern and is designed to work seamlessly in multi-threaded environments.
This crate is provided "as is", without warranty of any kind, express or implied. The authors and contributors are not responsible for any damages or liability arising from the use of this software. While efforts have been made to ensure the crate functions correctly, it may contain bugs or issues in certain scenarios.
Arc<dyn Fn> which may have memory overhead considerationsResult types - proper error handling is essentialclear_observers() or shutdown() methods for proper cleanup in productionPerformance characteristics may vary depending on system configuration, observer complexity, and concurrency patterns. The observer pattern implementation may introduce overhead in systems with very high frequency property changes or large numbers of observers.
By using this crate, you acknowledge that you have read and understood this disclaimer.
Arc<RwLock<>> for safe concurrent access with optimized lockingClone + Send + Sync typeResult types instead of panickingAdd this to your Cargo.toml:
[dependencies]
observable-property-tokio = "0.3.0"
tokio = { version = "1.48.0", features = ["rt", "rt-multi-thread", "macros", "time"] }
use observable_property_tokio::ObservableProperty;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), observable_property_tokio::PropertyError> {
// Create an observable property
let property = ObservableProperty::new(42);
// Subscribe to changes
let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
println!("Value changed from {} to {}", old_value, new_value);
}))?;
// Change the value (triggers observer)
property.set(100)?;
// For async notification (uses Tokio)
property.set_async(200).await?;
// Unsubscribe when done
property.unsubscribe(observer_id)?;
// Or clear all observers at once
property.clear_observers()?;
Ok(())
}
use observable_property_tokio::ObservableProperty;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() -> Result<(), observable_property_tokio::PropertyError> {
let property = ObservableProperty::new(0);
// Subscribe with an async handler
property.subscribe_async(|old, new| async move {
// Simulate async work
sleep(Duration::from_millis(100)).await;
println!("Async observer: {} -> {}", old, new);
})?;
property.set_async(42).await?;
// Give time for async observers to complete
sleep(Duration::from_millis(200)).await;
Ok(())
}
use observable_property_tokio::ObservableProperty;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), observable_property_tokio::PropertyError> {
let property = ObservableProperty::new(0);
// Only notify when value increases
property.subscribe_filtered(
Arc::new(|old, new| println!("Value increased: {} -> {}", old, new)),
|old, new| new > old
)?;
property.set(10)?; // Triggers observer (0 -> 10)
property.set(5)?; // Does NOT trigger observer (10 -> 5)
property.set(15)?; // Triggers observer (5 -> 15)
Ok(())
}
use observable_property_tokio::ObservableProperty;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), observable_property_tokio::PropertyError> {
let original = ObservableProperty::new(42);
// Create a derived property that doubles the value
let doubled = original.map(|value| value * 2)?;
assert_eq!(doubled.get()?, 84);
// Create a derived property that converts to string
let as_string = original.map(|value| format!("Value: {}", value))?;
assert_eq!(as_string.get()?, "Value: 42");
// When original changes, all derived properties update automatically
original.set(10)?;
assert_eq!(doubled.get()?, 20);
assert_eq!(as_string.get()?, "Value: 10");
Ok(())
}
use observable_property_tokio::ObservableProperty;
use std::sync::Arc;
use tokio::task;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let property = Arc::new(ObservableProperty::new(0));
let property_clone = property.clone();
// Subscribe from one task
property.subscribe(Arc::new(|old, new| {
println!("Value changed: {} -> {}", old, new);
}))?;
// Modify from another task
let handle = task::spawn(async move {
property_clone.set(42).map_err(|e| format!("Failed to set: {}", e))
});
handle.await??;
Ok(())
}
use observable_property_tokio::ObservableProperty;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), observable_property_tokio::PropertyError> {
let property = ObservableProperty::new("active".to_string());
// Register observers for normal operation
property.subscribe(Arc::new(|old, new| {
println!("Status changed: {} -> {}", old, new);
}))?;
property.subscribe_async(|old, new| async move {
// Simulate async processing
println!("Async processing: {} -> {}", old, new);
})?;
println!("Observer count: {}", property.observer_count());
// Clear all observers when needed
property.clear_observers()?;
println!("Observer count after clear: {}", property.observer_count());
// Or perform comprehensive shutdown
property.shutdown()?;
// Property can still be used after cleanup, but no observers will be notified
property.set("inactive".to_string())?;
Ok(())
}
use observable_property_tokio::{ObservableProperty, PropertyConfig};
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), observable_property_tokio::PropertyError> {
// Configure limits to prevent resource exhaustion
let config = PropertyConfig {
max_observers: 100, // Maximum number of observers
max_pending_notifications: 50, // Reserved for future use
observer_timeout_ms: 5000, // Reserved for future use
max_concurrent_async_tasks: 100, // Maximum concurrent async tasks (default: 100)
};
let property = ObservableProperty::new_with_config(0, config);
// Subscribe observers as normal
for i in 0..100 {
property.subscribe(Arc::new(move |_, new| {
println!("Observer {} notified: {}", i, new);
}))?;
}
// The 101st subscription will fail with CapacityExceeded error
match property.subscribe(Arc::new(|_, _| {})) {
Ok(_) => println!("Subscribed successfully"),
Err(e) => {
eprintln!("Subscription failed: {}", e);
eprintln!("Diagnostic: {}", e.diagnostic_info());
// Output: CAPACITY_EXCEEDED | resource=observers | current=100 | max=100 | utilization=100.0%
}
}
Ok(())
}
use observable_property_tokio::ObservableProperty;
use std::sync::Arc;
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), observable_property_tokio::PropertyError> {
let property = ObservableProperty::new(0);
// Add observers
property.subscribe(Arc::new(|old, new| {
println!("Value changed: {} -> {}", old, new);
}))?;
property.subscribe_async(|old, new| async move {
println!("Async observer: {} -> {}", old, new);
})?;
println!("Observers: {}", property.observer_count());
// ... application running ...
// Graceful shutdown with timeout
let report = property.shutdown_with_timeout(Duration::from_secs(30)).await?;
println!("Shutdown complete:");
println!(" - Observers cleared: {}", report.observers_cleared);
println!(" - Duration: {:?}", report.shutdown_duration);
println!(" - Within timeout: {}", report.completed_within_timeout);
println!(" - Diagnostic: {}", report.diagnostic_info());
// Monitor shutdown duration
if report.shutdown_duration > Duration::from_secs(10) {
eprintln!("WARNING: Shutdown took longer than expected");
}
Ok(())
}
use observable_property_tokio::{ObservableProperty, PropertyConfig};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() -> Result<(), observable_property_tokio::PropertyError> {
// Configure connection pooling to limit concurrent async tasks
let config = PropertyConfig {
max_observers: 1000,
max_pending_notifications: 100,
observer_timeout_ms: 5000,
max_concurrent_async_tasks: 5, // Only 5 async tasks run concurrently
};
let property = ObservableProperty::new_with_config(0, config);
let concurrent_count = Arc::new(AtomicUsize::new(0));
let max_concurrent = Arc::new(AtomicUsize::new(0));
// Subscribe 20 async observers that each take 100ms
for _ in 0..20 {
let counter = Arc::clone(&concurrent_count);
let max_counter = Arc::clone(&max_concurrent);
property.subscribe_async(move |old, new| {
let counter = Arc::clone(&counter);
let max_counter = Arc::clone(&max_counter);
async move {
let current = counter.fetch_add(1, Ordering::SeqCst) + 1;
max_counter.fetch_max(current, Ordering::SeqCst);
// Simulate async work (database query, HTTP request, etc.)
sleep(Duration::from_millis(100)).await;
counter.fetch_sub(1, Ordering::SeqCst);
println!("Observer executed: {} -> {}", old, new);
}
})?;
}
// Trigger notification to all 20 observers
property.set_async(42).await?;
// Wait for all tasks to complete
sleep(Duration::from_millis(500)).await;
// Verify max concurrent was not exceeded
let max_reached = max_concurrent.load(Ordering::SeqCst);
println!("Max concurrent tasks: {} (limit: 5)", max_reached);
assert!(max_reached <= 5, "Connection pool limit was respected");
Ok(())
}
Benefits of Connection Pooling:
subscribe_async() and subscribe_async_filtered()max_concurrent_async_tasks based on your workloadProduction Recommendations:
The crate includes several examples demonstrating different usage patterns:
basic_usage.rs - Simple property observationfiltered_observers.rs - Conditional observersasync_observers.rs - Asynchronous observer handlersmulti_threading.rs - Concurrent access patternsbatching.rs - Batching for high-frequency updates with 10 scenariosbackpressure.rs - Backpressure and rate limiting with configurable limitsconnection_pooling.rs - Connection pooling for async tasks to prevent resource exhaustiongraceful_shutdown.rs - Graceful shutdown with timeout and diagnosticssubscription_token.rs - RAII-style automatic subscription cleanupproperty_mapping.rs - Creating derived properties with transformationscomplex_data_type.rs - Using with complex data structuresreference_and_async_filtered.rs - Non-cloning access and async filtered subscriptionserror_diagnostics.rs - Error diagnostic features for production monitoringRun examples with:
cargo run --example basic_usage
cargo run --example batching
cargo run --example backpressure
cargo run --example graceful_shutdown
cargo run --example subscription_token
cargo run --example property_mapping
cargo run --example error_diagnostics
# ... and so on for other examples
ObservableProperty<T> - The main observable property typeBatchedProperty<T> - Batched wrapper for reducing high-frequency update overheadPropertyConfig - Configuration for backpressure and resource limitsBatchConfig - Configuration for batching intervalsShutdownReport - Diagnostic report from graceful shutdown operationsPropertyError - Error type returned by all operations with diagnostic capabilitiesObserver<T> - Type alias for observer functions: Arc<dyn Fn(&T, &T) + Send + Sync>ObserverId - Unique identifier for observersSubscription<T> - RAII subscription handle for automatic cleanupnew(initial_value: T) - Create a new observable property with default configurationnew_with_config(initial_value: T, config: PropertyConfig) - Create with custom configurationget() -> Result<T, PropertyError> - Get current value (clones the value)get_ref() -> impl Deref<Target = T> - Get reference to current value (no cloning)set(new_value: T) -> Result<(), PropertyError> - Set value synchronouslyset_async(new_value: T) -> Result<(), PropertyError> - Set value asynchronouslyupdate(F: FnOnce(T) -> T) - Update value using a functionupdate_async(F: FnOnce(T) -> T) - Update value asynchronously using a functionsubscribe(observer: Observer<T>) -> Result<ObserverId, PropertyError> - Add observersubscribe_with_token(observer: Observer<T>) -> Result<Subscription<T>, PropertyError> - Add observer with automatic cleanupsubscribe_async<F, Fut>(handler: F) -> Result<ObserverId, PropertyError> - Add async observersubscribe_async_with_token<F, Fut>(handler: F) -> Result<Subscription<T>, PropertyError> - Add async observer with automatic cleanupsubscribe_filtered<F>(observer: Observer<T>, filter: F) -> Result<ObserverId, PropertyError> - Add filtered observerunsubscribe(id: ObserverId) -> Result<bool, PropertyError> - Remove observerobserver_count() -> usize - Get number of registered observersclear_observers() -> Result<(), PropertyError> - Remove all observersshutdown() -> Result<(), PropertyError> - Fast cleanup without waitingshutdown_with_timeout(timeout: Duration) -> Result<ShutdownReport, PropertyError> - Graceful shutdown with diagnostic reportmap<U, F>(transform: F) -> Result<ObservableProperty<U>, PropertyError> - Create derived propertyThe crate provides two shutdown methods for different use cases:
shutdown() - Fast cleanup without waiting:
property.shutdown()?; // Immediately clears all observers
shutdown_with_timeout() - Graceful shutdown with diagnostics:
let report = property.shutdown_with_timeout(Duration::from_secs(30)).await?;
// Access shutdown metrics
println!("Cleared {} observers in {:?}",
report.observers_cleared,
report.shutdown_duration);
// Check if completed within timeout
if !report.completed_within_timeout {
eprintln!("WARNING: Shutdown exceeded timeout");
}
// Get diagnostic string for logging
log::info!("Shutdown: {}", report.diagnostic_info());
The ShutdownReport struct provides comprehensive shutdown metrics:
observers_cleared: usize - Number of observers removedshutdown_duration: Duration - Time taken for shutdowncompleted_within_timeout: bool - Whether shutdown finished in timeinitiated_at_ms: u64 - Timestamp when shutdown starteddiagnostic_info() -> String - Formatted diagnostic stringUse shutdown() for:
Use shutdown_with_timeout() for:
See the graceful_shutdown.rs example for comprehensive demonstrations.
The BatchedProperty type reduces overhead for high-frequency property updates by batching notifications to observers. Instead of notifying observers for every single update, changes are collected and observers are notified only at regular intervals with the latest value.
use observable_property_tokio::{BatchedProperty, BatchConfig};
use std::time::Duration;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), observable_property_tokio::PropertyError> {
// Create a batched property with default 100ms interval
let property = BatchedProperty::new(0);
// Or use custom configuration
let config = BatchConfig {
batch_interval: Duration::from_millis(50),
};
let property = BatchedProperty::new_with_config(0, config);
// Subscribe to batched updates
property.subscribe(Arc::new(|old, new| {
println!("Batched update: {} -> {}", old, new);
}))?;
// Queue multiple updates rapidly
for i in 1..=100 {
property.queue_update(i)?;
}
// Observers will be notified once per batch interval
// with the latest value (100 in this case)
tokio::time::sleep(Duration::from_millis(150)).await;
Ok(())
}
Low-latency (50ms):
Balanced (100ms - default):
High-throughput (200ms):
Aggressive batching (500ms+):
When you need immediate updates, bypass batching:
// Queue for batching (default behavior)
property.queue_update(value)?;
// Set immediately, bypassing batching
property.set_immediate(critical_value)?;
// Or with async notification
property.set_immediate_async(critical_value).await?;
// Manual flush of pending updates
property.flush().await?;
From the batching.rs example:
Unbatched: 1000 updates
- Time: ~280ยตs
- Notifications: 1000
Batched: 1000 updates
- Time: ~30ยตs
- Notifications: 1
- Queue speedup: ~9x faster
- Notification reduction: 1000x
Real-time dashboards:
Sensor data:
Game state:
Analytics/logging:
BatchedProperty Methods:
new(value) - Create with default 100ms intervalnew_with_config(value, config) - Custom configurationqueue_update(value) - Queue for batchingset_immediate(value) - Bypass batching (sync)set_immediate_async(value) - Bypass batching (async)flush() - Force immediate flush of pending updatesget() - Get current valuesubscribe() / subscribe_async() - Add observersunsubscribe() / clear_observers() - Remove observersBatchConfig:
batch_interval: Duration - How often to flush batched updates (default: 100ms)See the comprehensive batching.rs example for 10 different scenarios and use cases.
The crate provides PropertyConfig to control resource usage and prevent exhaustion:
use observable_property_tokio::{ObservableProperty, PropertyConfig};
let config = PropertyConfig {
max_observers: 100, // Maximum number of observers (default: 1000)
max_pending_notifications: 50, // Reserved for future use (default: 100)
observer_timeout_ms: 5000, // Reserved for future use (default: 5000)
};
let property = ObservableProperty::new_with_config(initial_value, config);
When the maximum number of observers is reached, additional subscriptions will fail with a PropertyError::CapacityExceeded error:
match property.subscribe(observer) {
Ok(id) => {
// Successfully subscribed
log::info!("Observer {} subscribed", id);
}
Err(PropertyError::CapacityExceeded { current, max, resource }) => {
// Handle capacity limit gracefully
log::warn!("Cannot add observer: {}/{} {} in use", current, max, resource);
// Consider queuing the subscription or rejecting the request
}
Err(e) => {
log::error!("Subscription error: {}", e.diagnostic_info());
}
}
See the backpressure.rs example for comprehensive demonstrations.
set_async(), which provides good isolation but may have overhead for many observersArc<dyn Fn> which has some memory overheadmax_observers based on your expected load to balance flexibility and resource protectionRwLock which allows multiple readers but exclusive writersThe crate provides comprehensive error diagnostics for production monitoring and debugging.
All operations return Result<T, PropertyError> with detailed error variants:
ReadLockError / WriteLockError - Lock acquisition failures with operation context and timestampsLockPoisoned - Poisoned lock detection with operation contextObserverNotFound - Observer ID not found during unsubscribeObserverPanic - Observer function panicked with error details and observer IDObserverError - Observer execution failureTokioError - Tokio runtime errorsJoinError - Task join failuresCapacityExceeded - Resource limits exceeded with utilization metricsOperationTimeout - Operation exceeded threshold with timing detailsShutdownInProgress - Property is shutting downEach error provides a diagnostic_info() method that returns structured diagnostic data:
use observable_property_tokio::{ObservableProperty, PropertyError};
let property = ObservableProperty::new(42);
match property.set(100) {
Ok(_) => println!("Value updated"),
Err(e) => {
// Get human-readable error
eprintln!("Error: {}", e);
// Get structured diagnostic information for logging
eprintln!("Diagnostic: {}", e.diagnostic_info());
// Example output:
// "OPERATION_TIMEOUT | operation=notify_observers | elapsed_ms=5500 | threshold_ms=5000 | overage_ms=500"
}
}
The crate provides helper functions for creating errors with automatic timestamp injection:
// Automatically includes current timestamp
let error = PropertyError::read_lock_error("get_value", "lock acquisition failed");
let error = PropertyError::write_lock_error("set_value", "lock acquisition failed");
let error = PropertyError::lock_poisoned("notify", "inner lock poisoned");
let error = PropertyError::observer_panic(observer_id, "panic message");
The diagnostic information is designed for easy integration with logging frameworks:
// With the log crate
log::error!("Property operation failed: {}", error.diagnostic_info());
// With tracing
tracing::error!(
diagnostic = %error.diagnostic_info(),
error = %error,
"Property operation failed"
);
// With custom metrics
if let PropertyError::OperationTimeout { elapsed_ms, .. } = error {
metrics::histogram!("property_operation_latency_ms", elapsed_ms);
}
See error_diagnostics.rs example for comprehensive demonstrations.
This crate has been enhanced for production readiness:
set_async() by properly awaiting task completionmap() method with proper error handlingmap() method now returns Result<ObservableProperty<U>, PropertyError> for enhanced safety
? to existing map() calls: property.map(|x| x * 2)?clear_observers() and shutdown() methods for production cleanupContributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.
Licensed under either
Apache License, Version 2.0, (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0)
MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT)
This crate is a rework of the original observable-property crate to use Tokio instead of std::thread for better async compatibility.
If you encounter any issues or have questions:
Remember: This software comes with no warranty. Test thoroughly before production use.