| Crates.io | observable-property |
| lib.rs | observable-property |
| version | 0.4.1 |
| created_at | 2025-08-17 12:59:36.368613+00 |
| updated_at | 2025-12-17 13:58:54.574616+00 |
| description | A thread-safe observable property implementation for Rust |
| homepage | https://github.com/snoekiede/ObservableProperties |
| repository | https://github.com/snoekiede/ObservableProperties |
| max_upload_size | |
| id | 1799395 |
| size | 202,139 |
- Arc<RwLock<>> for safe concurrent access
- Configurable thread pool size via with_max_threads()
- No unwrap() calls: all errors are handled gracefully

A thread-safe observable property implementation for Rust that lets you observe changes to values across multiple threads. It is built with comprehensive error handling and contains no unwrap() calls.
Add this to your Cargo.toml:
[dependencies]
observable-property = "0.4.1"
use observable_property::ObservableProperty;
use std::sync::Arc;

fn main() -> Result<(), observable_property::PropertyError> {
    // Create an observable property
    let property = ObservableProperty::new(42);

    // Subscribe to changes
    let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
        println!("Value changed from {} to {}", old_value, new_value);
    })).map_err(|e| {
        eprintln!("Failed to subscribe: {}", e);
        e
    })?;

    // Change the value (triggers observer)
    property.set(100).map_err(|e| {
        eprintln!("Failed to set value: {}", e);
        e
    })?; // Prints: Value changed from 42 to 100

    // Unsubscribe when done
    property.unsubscribe(observer_id).map_err(|e| {
        eprintln!("Failed to unsubscribe: {}", e);
        e
    })?;

    Ok(())
}
use observable_property::ObservableProperty;
use std::sync::Arc;
use std::thread;

fn main() -> Result<(), observable_property::PropertyError> {
    let property = Arc::new(ObservableProperty::new(0));
    let property_clone = property.clone();

    // Subscribe from one thread
    property.subscribe(Arc::new(|old, new| {
        println!("Value changed: {} -> {}", old, new);
    })).map_err(|e| {
        eprintln!("Failed to subscribe: {}", e);
        e
    })?;

    // Modify from another thread
    thread::spawn(move || {
        if let Err(e) = property_clone.set(42) {
            eprintln!("Failed to set value: {}", e);
        }
    }).join().expect("Thread panicked");

    Ok(())
}
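The feature list mentions Arc<RwLock<>> for safe concurrent access. As a rough illustration of that idea, here is a minimal sketch that uses only the standard library. SketchProperty, Inner, and demo are hypothetical names invented for this example, and errors are plain strings. It is not the crate's implementation, only one way such a type could be put together.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};

/// Observer callback: receives the old and the new value.
type Observer<T> = Arc<dyn Fn(&T, &T) + Send + Sync>;

/// State guarded by the lock: the value plus the registered observers.
struct Inner<T> {
    value: T,
    observers: HashMap<usize, Observer<T>>,
    next_id: usize,
}

/// Hypothetical stand-in for illustration; not the crate's actual type.
struct SketchProperty<T> {
    inner: Arc<RwLock<Inner<T>>>,
}

impl<T: Clone> SketchProperty<T> {
    fn new(value: T) -> Self {
        let inner = Inner { value, observers: HashMap::new(), next_id: 0 };
        SketchProperty { inner: Arc::new(RwLock::new(inner)) }
    }

    /// Reads the current value under a shared (read) lock.
    fn get(&self) -> Result<T, String> {
        let guard = self.inner.read().map_err(|e| e.to_string())?;
        Ok(guard.value.clone())
    }

    /// Registers an observer under an exclusive (write) lock and returns its id.
    fn subscribe(&self, observer: Observer<T>) -> Result<usize, String> {
        let mut guard = self.inner.write().map_err(|e| e.to_string())?;
        let id = guard.next_id;
        guard.next_id += 1;
        guard.observers.insert(id, observer);
        Ok(id)
    }

    /// Stores the new value, then calls the observers after the lock is released.
    fn set(&self, new_value: T) -> Result<(), String> {
        let (old_value, observers) = {
            let mut guard = self.inner.write().map_err(|e| e.to_string())?;
            let old = std::mem::replace(&mut guard.value, new_value.clone());
            let snapshot: Vec<Observer<T>> = guard.observers.values().cloned().collect();
            (old, snapshot)
        };
        for observer in observers {
            observer(&old_value, &new_value);
        }
        Ok(())
    }
}

/// Subscribes a recording observer, changes the value once, and returns
/// what the observer saw together with the final value.
fn demo() -> Result<(Vec<(i32, i32)>, i32), String> {
    let seen = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&seen);
    let property = SketchProperty::new(42_i32);
    property.subscribe(Arc::new(move |old: &i32, new: &i32| {
        if let Ok(mut entries) = sink.lock() {
            entries.push((*old, *new));
        }
    }))?;
    property.set(100)?;
    let current = property.get()?;
    let entries = seen.lock().map_err(|e| e.to_string())?.clone();
    Ok((entries, current))
}

fn main() {
    println!("{:?}", demo());
}
```

The sketch copies the observer list and releases the write lock before calling any observer, so an observer that reads the value does not deadlock. Whether the crate makes the same choice is not stated here.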
use observable_property::ObservableProperty;
use std::sync::Arc;

fn main() -> Result<(), observable_property::PropertyError> {
    let counter = ObservableProperty::new(0);

    // Only notify when value increases
    let observer_id = counter.subscribe_filtered(
        Arc::new(|old, new| println!("Increased: {} -> {}", old, new)),
        |old, new| new > old
    ).map_err(|e| {
        eprintln!("Failed to subscribe: {}", e);
        e
    })?;

    counter.set(5).map_err(|e| {
        eprintln!("Failed to set value: {}", e);
        e
    })?; // Triggers observer: "Increased: 0 -> 5"
    counter.set(3).map_err(|e| {
        eprintln!("Failed to set value: {}", e);
        e
    })?; // Does NOT trigger observer
    counter.set(10).map_err(|e| {
        eprintln!("Failed to set value: {}", e);
        e
    })?; // Triggers observer: "Increased: 3 -> 10"

    Ok(())
}
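Conceptually, a filtered subscription can be thought of as an ordinary observer wrapped in a predicate check. The sketch below shows that idea with the standard library only. The helpers filtered and replay are hypothetical and are not part of the crate's API, and the crate may implement filtering differently.

```rust
use std::sync::{Arc, Mutex};

/// Observer callback: receives the old and the new value.
type Observer<T> = Arc<dyn Fn(&T, &T) + Send + Sync>;

/// Wraps `observer` so that it only runs when `filter(old, new)` returns true.
/// (Hypothetical helper for illustration.)
fn filtered<T, F>(observer: Observer<T>, filter: F) -> Observer<T>
where
    T: 'static,
    F: Fn(&T, &T) -> bool + Send + Sync + 'static,
{
    Arc::new(move |old: &T, new: &T| {
        if filter(old, new) {
            observer(old, new);
        }
    })
}

/// Feeds a sequence of values (starting from 0) through an "only increases"
/// observer and returns the transitions that passed the filter.
fn replay(values: &[i32]) -> Vec<(i32, i32)> {
    let log = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&log);
    let record: Observer<i32> = Arc::new(move |old: &i32, new: &i32| {
        if let Ok(mut entries) = sink.lock() {
            entries.push((*old, *new));
        }
    });
    let only_increases = filtered(record, |old: &i32, new: &i32| new > old);

    let mut current = 0;
    for &next in values {
        only_increases(&current, &next);
        current = next;
    }
    let entries = log.lock().map(|guard| guard.clone()).unwrap_or_default();
    entries
}

fn main() {
    // Same sequence as the counter example: 0 -> 5 -> 3 -> 10.
    println!("{:?}", replay(&[5, 3, 10])); // prints [(0, 5), (3, 10)]
}
```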
For automatic cleanup without manual unsubscribe calls, use RAII subscriptions:
use observable_property::ObservableProperty;
use std::sync::Arc;

fn main() -> Result<(), observable_property::PropertyError> {
    let property = ObservableProperty::new(0);
    {
        // Create RAII subscription - automatically cleaned up when dropped
        let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
            println!("Value changed: {} -> {}", old, new);
        })).map_err(|e| {
            eprintln!("Failed to create subscription: {}", e);
            e
        })?;

        property.set(42).map_err(|e| {
            eprintln!("Failed to set value: {}", e);
            e
        })?; // Prints: "Value changed: 0 -> 42"
        // Subscription automatically unsubscribes when leaving this scope
    }

    // No observer active anymore
    property.set(100).map_err(|e| {
        eprintln!("Failed to set value: {}", e);
        e
    })?; // No output

    Ok(())
}
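The automatic cleanup rests on Rust's Drop trait: a guard object removes its observer when it goes out of scope. The following standard-library sketch shows the mechanism in isolation. Registry, SketchSubscription, subscribe, and active are hypothetical names, and the registry stores observer names instead of closures to keep the example short. It does not reproduce the crate's subscription type.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Shared table of registered observers, keyed by id (names stand in for closures).
type Registry = Arc<Mutex<HashMap<usize, String>>>;

/// Hypothetical guard: removes its entry from the registry when dropped.
struct SketchSubscription {
    registry: Registry,
    id: usize,
}

impl Drop for SketchSubscription {
    fn drop(&mut self) {
        // Drop cannot return an error, so a poisoned lock is simply skipped.
        if let Ok(mut observers) = self.registry.lock() {
            observers.remove(&self.id);
        }
    }
}

/// Registers an observer and hands back the guard that will unregister it.
fn subscribe(registry: &Registry, id: usize, name: &str) -> Option<SketchSubscription> {
    registry.lock().ok()?.insert(id, name.to_string());
    Some(SketchSubscription { registry: Arc::clone(registry), id })
}

/// Number of observers currently registered.
fn active(registry: &Registry) -> usize {
    registry.lock().map(|observers| observers.len()).unwrap_or(0)
}

/// Returns the observer count inside the guard's scope and after it.
fn counts_inside_and_after_scope() -> (usize, usize) {
    let registry: Registry = Arc::new(Mutex::new(HashMap::new()));
    let inside = {
        let _subscription = subscribe(&registry, 1, "logger");
        active(&registry)
    };
    // The guard was dropped at the end of the block above.
    (inside, active(&registry))
}

fn main() {
    println!("{:?}", counts_inside_and_after_scope()); // prints (1, 0)
}
```

Binding the guard to a named variable such as _subscription matters: binding it to a bare underscore drops it immediately, which ends the subscription right away.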
Combine filtering with automatic cleanup:
use observable_property::ObservableProperty;
use std::sync::Arc;

fn main() -> Result<(), observable_property::PropertyError> {
    let temperature = ObservableProperty::new(20.0);
    {
        // Monitor temperature increases > 5 degrees with automatic cleanup
        let _heat_warning = temperature.subscribe_filtered_with_subscription(
            Arc::new(|old, new| {
                println!("🔥 Heat warning! {:.1}°C -> {:.1}°C", old, new);
            }),
            |old, new| new > old && (new - old) > 5.0
        ).map_err(|e| {
            eprintln!("Failed to create heat warning subscription: {}", e);
            e
        })?;

        temperature.set(22.0).map_err(|e| {
            eprintln!("Failed to set temperature: {}", e);
            e
        })?; // No warning (only 2°C increase)
        temperature.set(28.0).map_err(|e| {
            eprintln!("Failed to set temperature: {}", e);
            e
        })?; // Triggers warning (6°C increase)
        // Subscription automatically cleaned up here
    }

    temperature.set(35.0).map_err(|e| {
        eprintln!("Failed to set temperature: {}", e);
        e
    })?; // No warning (subscription was cleaned up)

    Ok(())
}
For observers that might perform time-consuming operations, use async notifications to avoid blocking:
use observable_property::ObservableProperty;
use std::sync::Arc;
use std::time::Duration;

fn main() -> Result<(), observable_property::PropertyError> {
    let property = ObservableProperty::new(0);

    let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
        // This slow observer won't block the caller
        std::thread::sleep(Duration::from_millis(100));
        println!("Slow observer: {} -> {}", old, new);
    })).map_err(|e| {
        eprintln!("Failed to create subscription: {}", e);
        e
    })?;

    // This returns immediately even though observer is slow
    property.set_async(42).map_err(|e| {
        eprintln!("Failed to set value asynchronously: {}", e);
        e
    })?;

    // Continue with other work while observers run in background
    println!("This prints immediately!");

    Ok(())
}
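To see why the caller is not blocked, it can help to look at the general technique: each observer is handed to a background thread and the notifying call returns at once. The sketch below uses only std::thread. notify_in_background and demo are hypothetical names, and nothing here claims to mirror how set_async() is written internally.

```rust
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Observer callback: receives the old and the new value.
type Observer = Arc<dyn Fn(&i32, &i32) + Send + Sync>;

/// Starts every observer on its own thread and returns immediately with the handles.
/// (Hypothetical helper for illustration.)
fn notify_in_background(observers: &[Observer], old: i32, new: i32) -> Vec<JoinHandle<()>> {
    observers
        .iter()
        .map(|observer| {
            let observer = Arc::clone(observer);
            thread::spawn(move || observer(&old, &new))
        })
        .collect()
}

/// Records the order in which the caller and a slow observer make progress.
fn demo() -> Vec<&'static str> {
    let events = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&events);
    let slow: Observer = Arc::new(move |_old: &i32, _new: &i32| {
        thread::sleep(Duration::from_millis(100));
        if let Ok(mut log) = sink.lock() {
            log.push("observer finished");
        }
    });

    let handles = notify_in_background(&[slow], 0, 42);
    // The caller gets here while the observer is still sleeping.
    if let Ok(mut log) = events.lock() {
        log.push("caller continued");
    }

    for handle in handles {
        // A join error means the observer panicked; this sketch ignores it.
        let _ = handle.join();
    }
    let result = events.lock().map(|log| log.clone()).unwrap_or_default();
    result
}

fn main() {
    println!("{:?}", demo());
}
```

The sketch joins the threads before returning. In plain Rust, threads that are still running when main returns are stopped along with the process, so a short program may need to wait for slow observers if their output matters.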
Customize the thread pool size for async notifications based on your system requirements:
use observable_property::ObservableProperty;
use std::sync::Arc;

fn main() -> Result<(), observable_property::PropertyError> {
    // For high-throughput systems (more CPU cores)
    let high_perf_property = ObservableProperty::with_max_threads(0, 8);

    // For resource-constrained systems (embedded/mobile)
    let low_resource_property = ObservableProperty::with_max_threads(42, 1);

    // For I/O-heavy observers (network/database operations)
    let io_heavy_property = ObservableProperty::with_max_threads("data".to_string(), 16);

    // Use like any other property
    let _subscription = high_perf_property.subscribe_with_subscription(Arc::new(|old, new| {
        println!("High performance: {} -> {}", old, new);
    }))?;

    // Async notifications will use the configured thread pool
    high_perf_property.set_async(100)?;

    Ok(())
}
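One way to picture a thread limit is to split the observers into at most max_threads batches and give each batch its own thread. The sketch below does this with std::thread::scope. notify_bounded and demo are hypothetical names, and the batching strategy is an assumption made for illustration, not a description of the crate's thread pool. Unlike set_async(), this sketch waits for all observers before returning, because it only demonstrates the limit.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Observer callback: receives the old and the new value.
type Observer = Arc<dyn Fn(&i32, &i32) + Send + Sync>;

/// Notifies all observers using at most `max_threads` worker threads and
/// returns the number of threads that were spawned.
/// (Hypothetical helper; blocks until every observer has run.)
fn notify_bounded(observers: &[Observer], max_threads: usize, old: i32, new: i32) -> usize {
    if observers.is_empty() {
        return 0;
    }
    // Never use zero workers, and never more workers than observers.
    let workers = max_threads.max(1).min(observers.len());
    // Ceiling division so that every observer lands in exactly one batch.
    let batch_size = (observers.len() + workers - 1) / workers;

    let mut spawned = 0;
    thread::scope(|scope| {
        for batch in observers.chunks(batch_size) {
            spawned += 1;
            scope.spawn(move || {
                for observer in batch {
                    observer(&old, &new);
                }
            });
        }
    });
    spawned
}

/// Returns (threads used, observer calls made) for the given configuration.
fn demo(observer_count: usize, max_threads: usize) -> (usize, usize) {
    let calls = Arc::new(AtomicUsize::new(0));
    let observers: Vec<Observer> = (0..observer_count)
        .map(|_| {
            let calls = Arc::clone(&calls);
            Arc::new(move |_old: &i32, _new: &i32| {
                calls.fetch_add(1, Ordering::SeqCst);
            }) as Observer
        })
        .collect();
    let threads = notify_bounded(&observers, max_threads, 0, 100);
    (threads, calls.load(Ordering::SeqCst))
}

fn main() {
    println!("{:?}", demo(5, 2)); // prints (2, 5)
    println!("{:?}", demo(3, 8)); // prints (3, 3)
}
```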
The library uses a comprehensive error system for robust, production-ready error handling. All operations are designed to fail gracefully with meaningful error messages - there are no unwrap() calls that can cause unexpected panics.
The library provides detailed error information through the PropertyError enum:
use observable_property::{ObservableProperty, PropertyError};
use std::sync::Arc;

fn example() -> Result<(), PropertyError> {
    let property = ObservableProperty::new(42);

    match property.subscribe(Arc::new(|old, new| {
        println!("Value: {} -> {}", old, new);
    })) {
        Ok(observer_id) => {
            // Successfully subscribed
            property.set(100)?;
            property.unsubscribe(observer_id)?;
        }
        Err(PropertyError::PoisonedLock) => {
            eprintln!("Property lock was poisoned by a panic in another thread");
        }
        Err(PropertyError::WriteLockError { context }) => {
            eprintln!("Failed to acquire write lock: {}", context);
        }
        Err(e) => {
            eprintln!("Other error: {}", e);
        }
    }

    Ok(())
}
The library is designed to handle edge cases gracefully:
// Even if a lock is poisoned, operations fail gracefully
match property.subscribe_with_subscription(observer) {
    Ok(_subscription) => println!("Successfully subscribed"),
    Err(PropertyError::PoisonedLock) => {
        // Handle gracefully - no panics, clear error message
        eprintln!("Property is in an invalid state due to a previous panic");
        // Can still safely continue program execution
    }
    Err(e) => eprintln!("Other error: {}", e),
}
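For readers unfamiliar with lock poisoning, the sketch below shows the underlying standard-library behavior. A thread that panics while holding a write guard poisons the RwLock, and later lock attempts return an Err that can be mapped to an error value instead of being unwrapped. SketchError, read_value, and poison are hypothetical names for this illustration. The crate's own PropertyError is a separate type with its own variants.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical error type for this sketch only.
#[derive(Debug, PartialEq)]
enum SketchError {
    PoisonedLock,
}

/// Reads the value, turning a poisoned lock into an error instead of panicking.
fn read_value(cell: &RwLock<i32>) -> Result<i32, SketchError> {
    cell.read()
        .map(|guard| *guard)
        .map_err(|_| SketchError::PoisonedLock)
}

/// Poisons the lock by panicking on another thread while the write guard is held.
/// The panic message is printed to stderr, but the calling thread keeps running.
fn poison(cell: &Arc<RwLock<i32>>) {
    let cell = Arc::clone(cell);
    let _ = thread::spawn(move || {
        let _guard = cell.write();
        panic!("simulated failure while holding the lock");
    })
    .join();
}

/// Returns the read result before and after the lock was poisoned.
fn demo() -> (Result<i32, SketchError>, Result<i32, SketchError>) {
    let cell = Arc::new(RwLock::new(42));
    let before = read_value(&cell);
    poison(&cell);
    let after = read_value(&cell);
    (before, after)
}

fn main() {
    let (before, after) = demo();
    println!("before: {:?}, after: {:?}", before, after);
}
```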
The library provides two approaches for managing observer subscriptions:
let observer_id = property.subscribe(observer)?;
// ... use the property
property.unsubscribe(observer_id)?; // Manual cleanup required
let _subscription = property.subscribe_with_subscription(observer)?;
// ... use the property
// Automatic cleanup when _subscription goes out of scope
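Benefits of RAII subscriptions: cleanup happens automatically when the subscription goes out of scope, so an unsubscribe call cannot be forgotten or missed in an error path.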
- Use set_async() to run observers in background threads.
- Each property uses an RwLock internally. Consider having fewer, larger properties rather than many small ones.
- Observers are stored as Arc<dyn Fn> and kept until unsubscribed or the subscription is dropped.

All operations are thread-safe with comprehensive error handling:
- No unwrap() calls: all potential failure points use proper error handling

Prefer subscribe_with_subscription() and subscribe_filtered_with_subscription() over manual subscription management:
// ✅ Recommended: RAII subscription
let _subscription = property.subscribe_with_subscription(observer)?;
// Automatically cleaned up
// ❌ Discouraged: Manual management (error-prone)
let id = property.subscribe(observer)?;
property.unsubscribe(id)?; // Easy to forget or miss in error paths
Use block scoping for temporary subscriptions:
{
    let _temp_subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
        println!("Temporary monitoring: {} -> {}", old, new);
    }))?;
    // Do some work with monitoring active
    property.set(42)?;
    // Subscription automatically ends here
}
// No more monitoring
Keep observer functions lightweight for better performance:
// ✅ Good: Lightweight observer
let _subscription = property.subscribe_with_subscription(Arc::new(|_, new| {
    log::info!("Value updated to {}", new);
}))?;
// ❌ Avoid: Heavy computation in observer
let _subscription = property.subscribe_with_subscription(Arc::new(|_, new| {
    // This blocks all other observers!
    expensive_computation(*new);
}))?;
// ✅ Better: Use async for heavy work
property.set_async(new_value)?; // Non-blocking
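Another way to keep an observer lightweight, independent of set_async(), is to have it forward each new value to a worker thread over a channel and return right away. The sketch below uses std::sync::mpsc. offloading_observer and demo are hypothetical, and expensive_computation is a placeholder body for the function named in the snippet above. This is a general Rust pattern offered as an assumption about what might suit heavy work, not a feature of the crate.

```rust
use std::sync::mpsc::{self, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Observer callback: receives the old and the new value.
type Observer = Arc<dyn Fn(&i32, &i32) + Send + Sync>;

/// Placeholder for real work: sums the integers from 1 to `value`.
fn expensive_computation(value: i32) -> i64 {
    (1..=i64::from(value)).sum()
}

/// Builds a lightweight observer that only forwards each new value to a worker
/// thread. The worker returns all computed results once the channel closes.
fn offloading_observer() -> (Observer, JoinHandle<Vec<i64>>) {
    let (tx, rx) = mpsc::channel::<i32>();
    let worker = thread::spawn(move || {
        rx.iter().map(expensive_computation).collect::<Vec<i64>>()
    });

    // Wrapping the sender in a Mutex keeps the closure Sync on older toolchains too.
    let tx: Mutex<Sender<i32>> = Mutex::new(tx);
    let observer: Observer = Arc::new(move |_old: &i32, new: &i32| {
        if let Ok(sender) = tx.lock() {
            // A send error only means the worker is gone; it is ignored here.
            let _ = sender.send(*new);
        }
    });
    (observer, worker)
}

/// Sends two updates through the observer and collects the worker's results.
fn demo() -> Vec<i64> {
    let (observer, worker) = offloading_observer();
    observer(&0, &3);
    observer(&3, &10);
    // Dropping the observer drops the sender, which lets the worker finish.
    drop(observer);
    worker.join().unwrap_or_default()
}

fn main() {
    println!("{:?}", demo()); // prints [6, 55]
}
```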
The library is production-ready with robust error handling. Always handle potential errors:
Key benefits:
- No unwrap() calls that could panic unexpectedly

match property.subscribe_with_subscription(observer) {
    Ok(_subscription) => {
        // Use subscription
    }
    Err(PropertyError::PoisonedLock) => {
        // Handle poisoned lock scenario
        eprintln!("Property is in invalid state");
    }
    Err(e) => {
        eprintln!("Failed to create subscription: {}", e);
    }
}
If you're upgrading from manual subscription management:
// Before (manual management)
let observer_id = property.subscribe(Arc::new(|old, new| {
    println!("Value: {} -> {}", old, new);
}))?;
// ... do work
property.unsubscribe(observer_id)?;

// After (RAII management)
let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
    println!("Value: {} -> {}", old, new);
}))?;
// ... do work
// Automatic cleanup!
- with_max_threads() constructor allows custom thread limits for async notifications
- No unwrap() calls: replaced with proper error handling using expect() with descriptive messages

The library now provides both robust error handling and configurable performance tuning, making it suitable for a wide range of production environments, from embedded systems to high-throughput servers.
Contributions are welcome! Please feel free to submit a Pull Request.
This software is provided "as-is" without any express or implied warranties. While every effort has been made to ensure reliability and correctness, the authors and contributors make no guarantees regarding the software's performance, suitability for any particular purpose, or freedom from defects. Use this library at your own risk.
Users are responsible for:
The comprehensive error handling and extensive test suite are designed to promote reliability, but do not constitute a warranty or guarantee of correctness.
This project is licensed under either the:
at your option.