// Copyright (c) Zefchain Labs, Inc. // SPDX-License-Identifier: Apache-2.0 use std::collections::BTreeSet; use anyhow::Result; use async_trait::async_trait; #[cfg(with_dynamodb)] use linera_views::dynamo_db::DynamoDbStore; #[cfg(with_rocksdb)] use linera_views::rocks_db::RocksDbStore; #[cfg(with_scylladb)] use linera_views::scylla_db::ScyllaDbStore; #[cfg(any(with_scylladb, with_rocksdb, with_dynamodb))] use linera_views::store::AdminKeyValueStore as _; use linera_views::{ batch::{ Batch, WriteOperation, WriteOperation::{Delete, DeletePrefix, Put}, }, collection_view::HashedCollectionView, context::{create_test_memory_context, Context, MemoryContext, ViewContext}, key_value_store_view::{KeyValueStoreView, ViewContainer}, log_view::HashedLogView, lru_caching::{LruCachingMemoryStore, LruCachingStore}, map_view::{ByteMapView, HashedMapView}, memory::MemoryStore, queue_view::HashedQueueView, random::make_deterministic_rng, reentrant_collection_view::HashedReentrantCollectionView, register_view::HashedRegisterView, set_view::HashedSetView, store::TestKeyValueStore as _, test_utils::{ get_random_byte_vector, get_random_key_value_operations, get_random_key_values, span_random_reordering_put_delete, }, views::{CryptoHashRootView, HashableView, Hasher, RootView, View, ViewError}, }; use rand::{Rng, RngCore}; #[derive(CryptoHashRootView)] pub struct StateView { pub x1: HashedRegisterView, pub x2: HashedRegisterView, pub log: HashedLogView, pub map: HashedMapView, pub set: HashedSetView, pub queue: HashedQueueView, pub collection: HashedCollectionView>, pub collection2: HashedCollectionView< C, String, HashedCollectionView>, >, pub collection3: HashedCollectionView>, pub collection4: HashedReentrantCollectionView>, pub key_value_store: KeyValueStoreView, } #[async_trait] pub trait StateStorage { type Context: Context + Clone + Send + Sync + 'static; async fn new() -> Self; async fn load(&mut self, id: usize) -> Result, ViewError>; } pub struct MemoryTestStorage { accessed_chains: BTreeSet, store: MemoryStore, } #[async_trait] impl StateStorage for MemoryTestStorage { type Context = MemoryContext; async fn new() -> Self { let store = MemoryStore::new_test_store().await.unwrap(); MemoryTestStorage { accessed_chains: BTreeSet::new(), store, } } async fn load(&mut self, id: usize) -> Result, ViewError> { self.accessed_chains.insert(id); let base_key = bcs::to_bytes(&id)?; let store = self.store.clone(); let context = Self::Context::new_unsafe(store, base_key, id); StateView::load(context).await } } pub struct KeyValueStoreTestStorage { accessed_chains: BTreeSet, store: ViewContainer>, } #[async_trait] impl StateStorage for KeyValueStoreTestStorage { type Context = ViewContext>>; async fn new() -> Self { let context = create_test_memory_context(); let store = ViewContainer::new(context).await.unwrap(); KeyValueStoreTestStorage { accessed_chains: BTreeSet::new(), store, } } async fn load(&mut self, id: usize) -> Result, ViewError> { self.accessed_chains.insert(id); let base_key = bcs::to_bytes(&id)?; let store = self.store.clone(); let context = Self::Context::new_unsafe(store, base_key, id); StateView::load(context).await } } pub struct LruMemoryStorage { accessed_chains: BTreeSet, store: LruCachingStore, } #[async_trait] impl StateStorage for LruMemoryStorage { type Context = ViewContext; async fn new() -> Self { let store = MemoryStore::new_test_store().await.unwrap(); let cache_size = 1000; let store = LruCachingStore::new(store, cache_size); LruMemoryStorage { accessed_chains: BTreeSet::new(), store, } } async fn load(&mut self, id: usize) -> Result, ViewError> { self.accessed_chains.insert(id); let base_key = bcs::to_bytes(&id)?; let store = self.store.clone(); let context = Self::Context::new_unsafe(store, base_key, id); StateView::load(context).await } } #[cfg(with_rocksdb)] pub struct RocksDbTestStorage { store: RocksDbStore, accessed_chains: BTreeSet, } #[cfg(with_rocksdb)] #[async_trait] impl StateStorage for RocksDbTestStorage { type Context = ViewContext; async fn new() -> Self { let store = RocksDbStore::new_test_store().await.unwrap(); let accessed_chains = BTreeSet::new(); RocksDbTestStorage { store, accessed_chains, } } async fn load(&mut self, id: usize) -> Result, ViewError> { self.accessed_chains.insert(id); let root_key = bcs::to_bytes(&id)?; let store = self.store.clone_with_root_key(&root_key)?; let context = ViewContext::create_root_context(store, id).await?; StateView::load(context).await } } #[cfg(with_scylladb)] pub struct ScyllaDbTestStorage { store: ScyllaDbStore, accessed_chains: BTreeSet, } #[cfg(with_scylladb)] #[async_trait] impl StateStorage for ScyllaDbTestStorage { type Context = ViewContext; async fn new() -> Self { let store = ScyllaDbStore::new_test_store().await.unwrap(); let accessed_chains = BTreeSet::new(); ScyllaDbTestStorage { store, accessed_chains, } } async fn load(&mut self, id: usize) -> Result, ViewError> { self.accessed_chains.insert(id); let root_key = bcs::to_bytes(&id)?; let store = self.store.clone_with_root_key(&root_key)?; let context = ViewContext::create_root_context(store, id).await?; StateView::load(context).await } } #[cfg(with_dynamodb)] pub struct DynamoDbTestStorage { store: DynamoDbStore, accessed_chains: BTreeSet, } #[cfg(with_dynamodb)] #[async_trait] impl StateStorage for DynamoDbTestStorage { type Context = ViewContext; async fn new() -> Self { let store = DynamoDbStore::new_test_store().await.unwrap(); let accessed_chains = BTreeSet::new(); DynamoDbTestStorage { store, accessed_chains, } } async fn load(&mut self, id: usize) -> Result, ViewError> { self.accessed_chains.insert(id); let root_key = bcs::to_bytes(&id)?; let store = self.store.clone_with_root_key(&root_key)?; let context = ViewContext::create_root_context(store, id).await?; StateView::load(context).await } } #[derive(Debug)] pub struct TestConfig { with_x1: bool, with_x2: bool, with_flush: bool, with_map: bool, with_set: bool, with_queue: bool, with_log: bool, with_collection: bool, } impl Default for TestConfig { fn default() -> Self { Self { with_x1: true, with_x2: true, with_flush: true, with_map: true, with_set: true, with_queue: true, with_log: true, with_collection: true, } } } impl TestConfig { fn samples() -> Vec { vec![ TestConfig { with_x1: false, with_x2: false, with_flush: false, with_map: false, with_set: false, with_queue: false, with_log: false, with_collection: false, }, TestConfig { with_x1: true, with_x2: true, with_flush: false, with_map: false, with_set: false, with_queue: false, with_log: false, with_collection: false, }, TestConfig { with_x1: false, with_x2: false, with_flush: true, with_map: false, with_set: false, with_queue: true, with_log: true, with_collection: false, }, TestConfig { with_x1: false, with_x2: false, with_flush: true, with_map: true, with_set: true, with_queue: false, with_log: false, with_collection: true, }, TestConfig::default(), ] } } #[cfg(test)] async fn test_store( store: &mut S, config: &TestConfig, ) -> Result<::Output> where S: StateStorage, ViewError: From<<::Context as Context>::Error>, { let default_hash = { let view = store.load(1).await?; view.hash().await? }; { let mut view = store.load(1).await?; if config.with_x1 { assert_eq!(view.x1.extra(), &1); } let hash = view.hash().await?; assert_eq!(hash, default_hash); if config.with_x1 { assert_eq!(view.x1.get(), &0); view.x1.set(1); } view.rollback(); assert_eq!(view.hash().await?, hash); if config.with_x2 { view.x2.set(2); } if config.with_x2 { assert_ne!(view.hash().await?, hash); } if config.with_log { view.log.push(4); } if config.with_queue { view.queue.push_back(8); assert_eq!(view.queue.front().await?, Some(8)); view.queue.push_back(7); view.queue.delete_front(); } if config.with_map { view.map.insert("Hello", 5)?; assert_eq!(view.map.indices().await?, vec!["Hello".to_string()]); let mut count = 0; view.map .for_each_index(|_index| { count += 1; Ok(()) }) .await?; assert_eq!(count, 1); } if config.with_set { view.set.insert(&42)?; assert_eq!(view.set.indices().await?, vec![42]); let mut count = 0; view.set .for_each_index(|_index| { count += 1; Ok(()) }) .await?; assert_eq!(count, 1); } if config.with_x1 { assert_eq!(view.x1.get(), &0); } if config.with_x2 { assert_eq!(view.x2.get(), &2); } if config.with_log { assert_eq!(view.log.read(0..10).await?, vec![4]); } if config.with_queue { assert_eq!(view.queue.read_front(10).await?, vec![7]); } if config.with_map { assert_eq!(view.map.get("Hello").await?, Some(5)); } if config.with_set { assert!(view.set.contains(&42).await?); } if config.with_collection { { let subview = view.collection.load_entry_mut("hola").await?; subview.push(17); subview.push(18); assert_eq!(view.collection.indices().await?, vec!["hola".to_string()]); let mut count = 0; view.collection .for_each_index(|_index| { count += 1; Ok(()) }) .await?; assert_eq!(count, 1); } let subview = view.collection.try_load_entry("hola").await?.unwrap(); assert_eq!(subview.read(0..10).await?, vec![17, 18]); } }; let staged_hash = { let mut view = store.load(1).await?; assert_eq!(view.hash().await?, default_hash); if config.with_x1 { assert_eq!(view.x1.get(), &0); } if config.with_x2 { assert_eq!(view.x2.get(), &0); } if config.with_log { assert_eq!(view.log.read(0..10).await?, Vec::::new()); } if config.with_queue { assert_eq!(view.queue.read_front(10).await?, Vec::::new()); } if config.with_map { assert_eq!(view.map.get("Hello").await?, None); } if config.with_set { assert!(!view.set.contains(&42).await?); } if config.with_collection { let subview = view.collection.load_entry_or_insert("hola").await?; assert_eq!(subview.read(0..10).await?, Vec::::new()); let subview = view.collection2.load_entry_mut("ciao").await?; let subsubview = subview.load_entry_mut("!").await?; subsubview.set(3); assert_eq!(subsubview.get(), &3); } if config.with_x1 { view.x1.set(1); } if config.with_log { view.log.push(4); } if config.with_queue { view.queue.push_back(7); } if config.with_map { view.map.insert("Hello", 5)?; view.map.insert("Hi", 2)?; view.map.remove("Hi")?; } if config.with_set { view.set.insert(&42)?; view.set.insert(&59)?; view.set.remove(&59)?; } if config.with_collection { let subview = view.collection.load_entry_mut("hola").await?; subview.push(17); subview.push(18); } if config.with_flush { view.save().await?; } let hash1 = view.hash().await?; let hash2 = view.hash().await?; view.save().await?; let hash3 = view.hash().await?; assert_eq!(hash1, hash2); assert_eq!(hash1, hash3); hash1 }; { let mut view = store.load(1).await?; let stored_hash = view.hash().await?; assert_eq!(staged_hash, stored_hash); if config.with_x1 { assert_eq!(view.x1.get(), &1); } if config.with_x2 { assert_eq!(view.x2.get(), &0); } if config.with_log { assert_eq!(view.log.read(0..10).await?, vec![4]); } if config.with_queue { view.queue.push_back(8); assert_eq!(view.queue.read_front(10).await?, vec![7, 8]); assert_eq!(view.queue.read_front(1).await?, vec![7]); assert_eq!(view.queue.read_back(10).await?, vec![7, 8]); assert_eq!(view.queue.read_back(1).await?, vec![8]); assert_eq!(view.queue.front().await?, Some(7)); assert_eq!(view.queue.back().await?, Some(8)); assert_eq!(view.queue.count(), 2); view.queue.delete_front(); assert_eq!(view.queue.front().await?, Some(8)); view.queue.delete_front(); assert_eq!(view.queue.front().await?, None); assert_eq!(view.queue.count(), 0); view.queue.push_back(13); } if config.with_map { assert_eq!(view.map.get("Hello").await?, Some(5)); assert_eq!(view.map.get("Hi").await?, None); } if config.with_set { assert!(view.set.contains(&42).await?); assert!(!view.set.contains(&59).await?); } if config.with_collection { let subview = view.collection.try_load_entry("hola").await?.unwrap(); assert_eq!(subview.read(0..10).await?, vec![17, 18]); assert_eq!(subview.read(..).await?, vec![17, 18]); assert_eq!(subview.read(1..).await?, vec![18]); assert_eq!(subview.read(..=0).await?, vec![17]); } if config.with_flush { view.save().await?; } if config.with_collection { let subview = view.collection2.load_entry_mut("ciao").await?; let subsubview = subview.try_load_entry("!").await?.unwrap(); assert!(subview.try_load_entry("!").await.is_err()); assert_eq!(subsubview.get(), &3); assert_eq!(view.collection.indices().await?, vec!["hola".to_string()]); view.collection.remove_entry("hola")?; } if config.with_x1 && config.with_x2 && config.with_map && config.with_set && config.with_queue && config.with_log && config.with_collection { assert_ne!(view.hash().await?, stored_hash); } view.save().await?; } { let mut view = store.load(1).await?; if config.with_collection { { let mut subview = view.collection4.try_load_entry_mut("hola").await?; assert_eq!(subview.read_front(10).await?, Vec::::new()); assert!(view.collection4.try_load_entry_mut("hola").await.is_err()); if config.with_queue { subview.push_back(13); assert_eq!(subview.front().await?, Some(13)); subview.delete_front(); assert_eq!(subview.front().await?, None); assert_eq!(subview.count(), 0); } } { let subview = view.collection4.try_load_entry("hola").await?.unwrap(); assert_eq!(subview.count(), 0); assert!(view.collection4.try_load_entry("hola").await.is_ok()); } } } if config.with_map { { let mut view = store.load(1).await?; let value = view.map.get_mut_or_default("Geia").await?; assert_eq!(*value, 0); *value = 42; let value = view.map.get_mut_or_default("Geia").await?; assert_eq!(*value, 42); view.save().await?; } { let view = store.load(1).await?; assert_eq!(view.map.get("Geia").await?, Some(42)); } { let mut view = store.load(1).await?; let value = view.map.get_mut_or_default("Geia").await?; assert_eq!(*value, 42); *value = 43; view.rollback(); let value = view.map.get_mut_or_default("Geia").await?; assert_eq!(*value, 42); } } if config.with_map { { let mut view = store.load(1).await?; view.map.insert("Konnichiwa", 5)?; let value = view.map.get_mut("Konnichiwa").await?.unwrap(); *value = 6; view.save().await?; } { let view = store.load(1).await?; assert_eq!(view.map.get("Konnichiwa").await?, Some(6)); } } { let mut view = store.load(1).await?; if config.with_collection { let subview = view.collection.load_entry_or_insert("hola").await?; assert_eq!(subview.read(0..10).await?, Vec::::new()); } if config.with_queue { assert_eq!(view.queue.front().await?, Some(13)); view.queue.delete_front(); assert_eq!(view.queue.front().await?, None); assert_eq!(view.queue.count(), 0); } view.clear(); view.save().await?; } Ok(staged_hash) } #[derive(CryptoHashRootView)] pub struct ByteMapStateView { pub map: ByteMapView, } #[tokio::test] async fn test_byte_map_view() -> Result<()> { let context = create_test_memory_context(); { let mut view = ByteMapStateView::load(context.clone()).await?; view.map.insert(vec![0, 1], 5); view.map.insert(vec![2, 3], 23); view.save().await?; } { let mut view = ByteMapStateView::load(context.clone()).await?; view.map.remove_by_prefix(vec![0]); let val = view.map.get_mut_or_default(&[0, 1]).await?; assert_eq!(*val, 0); let val = view.map.get_mut(&[2, 3]).await?; assert_eq!(val, Some(&mut 23)); view.save().await?; } { let mut view = ByteMapStateView::load(context.clone()).await?; view.map.remove_by_prefix(vec![2]); let val = view.map.get_mut(&[2, 3]).await?; assert_eq!(val, None); } Ok(()) } #[cfg(test)] async fn test_views_in_lru_memory_param(config: &TestConfig) -> Result<()> { tracing::warn!("Testing config {:?} with lru memory", config); let mut store = LruMemoryStorage::new().await; test_store(&mut store, config).await?; assert_eq!(store.accessed_chains.len(), 1); Ok(()) } #[tokio::test] async fn test_views_in_lru_memory() -> Result<()> { for config in TestConfig::samples() { test_views_in_lru_memory_param(&config).await?; } Ok(()) } #[cfg(test)] async fn test_views_in_memory_param(config: &TestConfig) -> Result<()> { tracing::warn!("Testing config {:?} with memory", config); let mut store = MemoryTestStorage::new().await; test_store(&mut store, config).await?; assert_eq!(store.accessed_chains.len(), 1); Ok(()) } #[tokio::test] async fn test_views_in_memory() -> Result<()> { for config in TestConfig::samples() { test_views_in_memory_param(&config).await?; } Ok(()) } #[cfg(test)] async fn test_views_in_key_value_store_view_memory_param(config: &TestConfig) -> Result<()> { tracing::warn!( "Testing config {:?} with key_value_store_view on memory", config ); let mut store = KeyValueStoreTestStorage::new().await; test_store(&mut store, config).await?; Ok(()) } #[tokio::test] async fn test_views_in_key_value_store_view_memory() -> Result<()> { for config in TestConfig::samples() { test_views_in_key_value_store_view_memory_param(&config).await?; } Ok(()) } #[cfg(with_rocksdb)] #[cfg(test)] async fn test_views_in_rocks_db_param(config: &TestConfig) -> Result<()> { tracing::warn!("Testing config {:?} with rocks_db", config); let mut store = RocksDbTestStorage::new().await; let hash = test_store(&mut store, config).await?; assert_eq!(store.accessed_chains.len(), 1); let mut store = MemoryTestStorage::new().await; let hash2 = test_store(&mut store, config).await?; assert_eq!(hash, hash2); Ok(()) } #[cfg(with_rocksdb)] #[tokio::test] async fn test_views_in_rocks_db() -> Result<()> { for config in TestConfig::samples() { test_views_in_rocks_db_param(&config).await?; } Ok(()) } #[cfg(with_scylladb)] #[cfg(test)] async fn test_views_in_scylla_db_param(config: &TestConfig) -> Result<()> { tracing::warn!("Testing config {:?} with scylla_db", config); let mut store = ScyllaDbTestStorage::new().await; let hash = test_store(&mut store, config).await?; assert_eq!(store.accessed_chains.len(), 1); let mut store = MemoryTestStorage::new().await; let hash2 = test_store(&mut store, config).await?; assert_eq!(hash, hash2); Ok(()) } #[cfg(with_scylladb)] #[tokio::test] async fn test_views_in_scylla_db() -> Result<()> { for config in TestConfig::samples() { test_views_in_scylla_db_param(&config).await?; } Ok(()) } #[cfg(with_dynamodb)] #[tokio::test] async fn test_views_in_dynamo_db() -> Result<()> { let mut store = DynamoDbTestStorage::new().await; let config = TestConfig::default(); let hash = test_store(&mut store, &config).await?; assert_eq!(store.accessed_chains.len(), 1); let mut store = MemoryTestStorage::new().await; let hash2 = test_store(&mut store, &config).await?; assert_eq!(hash, hash2); Ok(()) } #[cfg(with_rocksdb)] #[cfg(test)] async fn test_store_rollback_kernel(store: &mut S) -> Result<()> where S: StateStorage, ViewError: From<<::Context as Context>::Error>, { { let mut view = store.load(1).await?; view.queue.push_back(8); view.map.insert("Hello", 5)?; let subview = view.collection.load_entry_mut("hola").await?; subview.push(17); view.save().await?; } { let mut view = store.load(1).await?; view.queue.push_back(7); view.map.insert("Hello", 4)?; let subview = view.collection.load_entry_mut("DobryDen").await?; subview.push(16); view.rollback(); view.save().await?; } { let mut view = store.load(1).await?; view.queue.clear(); view.map.clear(); view.collection.clear(); view.rollback(); view.save().await?; } { let view = store.load(1).await?; assert_eq!(view.queue.front().await?, Some(8)); assert_eq!(view.map.get("Hello").await?, Some(5)); assert_eq!(view.collection.indices().await?, vec!["hola".to_string()]); } Ok(()) } #[cfg(with_rocksdb)] #[tokio::test] async fn test_store_rollback() -> Result<()> { let mut store = MemoryTestStorage::new().await; test_store_rollback_kernel(&mut store).await?; let mut store = RocksDbTestStorage::new().await; test_store_rollback_kernel(&mut store).await?; Ok(()) } #[tokio::test] async fn test_collection_removal() -> Result<()> { type EntryType = HashedRegisterView, u8>; type CollectionViewType = HashedCollectionView, u8, EntryType>; let context = create_test_memory_context(); // Write a dummy entry into the collection. let mut collection = CollectionViewType::load(context.clone()).await?; let entry = collection.load_entry_mut(&1).await?; entry.set(1); let mut batch = Batch::new(); collection.flush(&mut batch)?; collection.context().write_batch(batch).await?; // Remove the entry from the collection. let mut collection = CollectionViewType::load(context.clone()).await?; collection.remove_entry(&1)?; let mut batch = Batch::new(); collection.flush(&mut batch)?; collection.context().write_batch(batch).await?; // Check that the entry was removed. let collection = CollectionViewType::load(context.clone()).await?; assert!(!collection.indices().await?.contains(&1)); Ok(()) } async fn test_removal_api_first_second_condition( first_condition: bool, second_condition: bool, ) -> Result<()> { type EntryType = HashedRegisterView, u8>; type CollectionViewType = HashedCollectionView, u8, EntryType>; let context = create_test_memory_context(); // First add an entry `1` with value `100` and commit let mut collection: CollectionViewType = HashedCollectionView::load(context.clone()).await?; let entry = collection.load_entry_mut(&1).await?; entry.set(100); let mut batch = Batch::new(); collection.flush(&mut batch)?; collection.context().write_batch(batch).await?; // Reload the collection view and remove the entry, but don't commit yet let mut collection: CollectionViewType = HashedCollectionView::load(context.clone()).await?; collection.remove_entry(&1)?; // Now, read the entry with a different value if a certain condition is true if first_condition { let entry = collection.load_entry_mut(&1).await?; entry.set(200); } // Finally, either commit or rollback based on some other condition if second_condition { // If rolling back, then the entry `1` still exists with value `100`. collection.rollback(); } // We commit let mut batch = Batch::new(); collection.flush(&mut batch)?; collection.context().write_batch(batch).await?; let mut collection: CollectionViewType = HashedCollectionView::load(context.clone()).await?; let expected_val = if second_condition { Some(100) } else if first_condition { Some(200) } else { None }; match expected_val { Some(expected_val_i) => { let subview = collection.load_entry_mut(&1).await?; assert_eq!(subview.get(), &expected_val_i); } None => { assert!(!collection.indices().await?.contains(&1)); } }; Ok(()) } #[tokio::test] async fn test_removal_api() -> Result<()> { for first_condition in [true, false] { for second_condition in [true, false] { test_removal_api_first_second_condition(first_condition, second_condition).await?; } } Ok(()) } #[cfg(test)] async fn compute_hash_unordered_put_view( rng: &mut impl RngCore, store: &mut S, key_value_vector: Vec<(Vec, Vec)>, ) -> Result<::Output> where S: StateStorage, ViewError: From<<::Context as Context>::Error>, { let mut view = store.load(1).await?; for key_value in key_value_vector { let key = key_value.0; let value = key_value.1; let key_str = format!("{:?}", &key); let value_usize = (*value.first().unwrap()) as usize; view.map.insert(&key_str, value_usize)?; view.key_value_store.insert(key, value).await?; { let subview = view.collection.load_entry_mut(&key_str).await?; subview.push(value_usize as u32); } // let choice = rng.gen_range(0..20); if choice == 0 { view.save().await?; } } Ok(view.hash().await?) } #[cfg(test)] async fn compute_hash_unordered_putdelete_view( rng: &mut impl RngCore, store: &mut S, operations: Vec, ) -> Result<::Output> where S: StateStorage, ViewError: From<<::Context as Context>::Error>, { let mut view = store.load(1).await?; for operation in operations { match operation { Put { key, value } => { let key_str = format!("{:?}", &key); let first_value = *value.first().unwrap(); let first_value_usize = first_value as usize; let first_value_u64 = first_value as u64; let mut tmp = *view.x1.get(); tmp += first_value_u64; view.x1.set(tmp); view.map.insert(&key_str, first_value_usize)?; view.key_value_store.insert(key, value).await?; { let subview = view.collection.load_entry_mut(&key_str).await?; subview.push(first_value as u32); } } Delete { key } => { let key_str = format!("{:?}", &key); view.map.remove(&key_str)?; view.key_value_store.remove(key).await?; } DeletePrefix { key_prefix: _ } => {} } // let choice = rng.gen_range(0..10); if choice == 0 { view.save().await?; } } Ok(view.hash().await?) } #[cfg(test)] async fn compute_hash_ordered_view( rng: &mut impl RngCore, store: &mut S, key_value_vector: Vec<(Vec, Vec)>, ) -> Result<::Output> where S: StateStorage, ViewError: From<<::Context as Context>::Error>, { let mut view = store.load(1).await?; for key_value in key_value_vector { let value = key_value.1; let value_usize = (*value.first().unwrap()) as usize; view.log.push(value_usize as u32); view.queue.push_back(value_usize as u64); // let choice = rng.gen_range(0..20); if choice == 0 { view.save().await?; } } Ok(view.hash().await?) } #[cfg(test)] async fn compute_hash_view_iter(rng: &mut R, n: usize, k: usize) -> Result<()> { use rand::seq::SliceRandom; let mut unord1_hashes = Vec::new(); let mut unord2_hashes = Vec::new(); let mut ord_hashes = Vec::new(); let key_value_vector = get_random_key_values(rng, n); let info_op = get_random_key_value_operations(rng, n, k); let n_iter = 4; for _ in 0..n_iter { let mut key_value_vector_b = key_value_vector.clone(); key_value_vector_b.shuffle(rng); let operations = span_random_reordering_put_delete(rng, info_op.clone()); // let mut store1 = MemoryTestStorage::new().await; unord1_hashes .push(compute_hash_unordered_put_view(rng, &mut store1, key_value_vector_b).await?); let mut store2 = MemoryTestStorage::new().await; unord2_hashes .push(compute_hash_unordered_putdelete_view(rng, &mut store2, operations).await?); let mut store3 = MemoryTestStorage::new().await; ord_hashes .push(compute_hash_ordered_view(rng, &mut store3, key_value_vector.clone()).await?); } for i in 1..n_iter { assert_eq!( unord1_hashes.first().unwrap(), unord1_hashes.get(i).unwrap(), ); assert_eq!( unord2_hashes.first().unwrap(), unord2_hashes.get(i).unwrap(), ); assert_eq!(ord_hashes.first().unwrap(), ord_hashes.get(i).unwrap()); } Ok(()) } #[tokio::test] async fn compute_hash_view_iter_large() -> Result<()> { let n_iter = 2; let n = 100; let k = 30; let mut rng = make_deterministic_rng(); for _ in 0..n_iter { compute_hash_view_iter(&mut rng, n, k).await?; } Ok(()) } #[cfg(test)] async fn check_hash_memoization_persistence( rng: &mut impl RngCore, store: &mut S, key_value_vector: Vec<(Vec, Vec)>, ) -> Result<()> where S: StateStorage, ViewError: From<<::Context as Context>::Error>, { let mut hash = { let view = store.load(1).await?; view.hash().await? }; for pair in key_value_vector { let str0 = format!("{:?}", &pair.0); let str1 = format!("{:?}", &pair.1); let pair0_first_u8 = *pair.0.first().unwrap(); let pair1_first_u8 = *pair.1.first().unwrap(); let choice = rng.gen_range(0..7); if choice < 3 { let mut view = store.load(1).await?; view.x1.set(pair0_first_u8 as u64); view.x2.set(pair1_first_u8 as u32); view.log.push(pair0_first_u8 as u32); view.log.push(pair1_first_u8 as u32); view.queue.push_back(pair0_first_u8 as u64); view.queue.push_back(pair1_first_u8 as u64); view.map.insert(&str0, pair1_first_u8 as usize)?; view.map.insert(&str1, pair0_first_u8 as usize)?; view.key_value_store .insert(pair.0.clone(), pair.1.clone()) .await?; if choice == 0 { view.rollback(); let hash_new = view.hash().await?; assert_eq!(hash, hash_new); } else { let hash_new = view.hash().await?; assert_ne!(hash, hash_new); if choice == 2 { view.save().await?; hash = hash_new; } } } if choice == 3 { let view = store.load(1).await?; let hash_new = view.hash().await?; assert_eq!(hash, hash_new); } if choice == 4 { let mut view = store.load(1).await?; let subview = view.collection.load_entry_mut(&str0).await?; subview.push(pair1_first_u8 as u32); let hash_new = view.hash().await?; assert_ne!(hash, hash_new); view.save().await?; hash = hash_new; } if choice == 5 { let mut view = store.load(1).await?; if view.queue.count() > 0 { view.queue.delete_front(); let hash_new = view.hash().await?; assert_ne!(hash, hash_new); view.save().await?; hash = hash_new; } } if choice == 6 { let mut view = store.load(1).await?; let indices = view.collection.indices().await?; let size = indices.len(); if size > 0 { let pos = rng.gen_range(0..size); let x = &indices[pos]; view.collection.remove_entry(x)?; let hash_new = view.hash().await?; assert_ne!(hash, hash_new); view.save().await?; hash = hash_new; } } } Ok(()) } #[tokio::test] async fn check_hash_memoization_persistence_large() -> Result<()> { let n = 100; let mut rng = make_deterministic_rng(); let key_value_vector = get_random_key_values(&mut rng, n); let mut store = MemoryTestStorage::new().await; check_hash_memoization_persistence(&mut rng, &mut store, key_value_vector).await } #[cfg(test)] async fn check_large_write(store: &mut S, vector: Vec) -> Result<()> where S: StateStorage, ViewError: From<<::Context as Context>::Error>, { let hash1 = { let mut view = store.load(1).await?; for val in vector { view.log.push(val as u32); } let hash = view.hash().await?; view.save().await?; hash }; let view = store.load(1).await?; let hash2 = view.hash().await?; assert_eq!(hash1, hash2); Ok(()) } #[tokio::test] #[cfg(with_dynamodb)] async fn check_large_write_dynamo_db() -> Result<()> { // By writing 1000 elements we seriously check the Amazon journaling // writing system. let n = 1000; let mut rng = make_deterministic_rng(); let vector = get_random_byte_vector(&mut rng, &[], n); let mut store = DynamoDbTestStorage::new().await; check_large_write(&mut store, vector).await } #[tokio::test] async fn check_large_write_memory() -> Result<()> { let n = 1000; let mut rng = make_deterministic_rng(); let vector = get_random_byte_vector(&mut rng, &[], n); let mut store = MemoryTestStorage::new().await; check_large_write(&mut store, vector).await }