//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
#![cfg(feature = "test")]

use std::{ops::Deref, sync::atomic::Ordering::Relaxed, time::Duration};

use rand::Rng;
use zenoh_core::bail;
use zenoh_result::ZResult;

pub mod common;
use common::{execute_concurrent, CpuLoad};
use zenoh_shm::{
    metadata::{
        descriptor::MetadataDescriptor, storage::GLOBAL_METADATA_STORAGE,
        subscription::GLOBAL_METADATA_SUBSCRIPTION,
    },
    watchdog::{confirmator::GLOBAL_CONFIRMATOR, validator::GLOBAL_VALIDATOR},
};

fn metadata_alloc_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static {
    |_task_index: usize, _iteration: usize| -> ZResult<()> {
        let _allocated_metadata = GLOBAL_METADATA_STORAGE.read().allocate()?;
        Ok(())
    }
}

#[test]
fn metadata_alloc() {
    execute_concurrent(1, 1000, metadata_alloc_fn());
}

#[test]
fn metadata_alloc_concurrent() {
    execute_concurrent(100, 1000, metadata_alloc_fn());
}

fn metadata_link_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static {
    |_task_index: usize, _iteration: usize| {
        let allocated_metadata = GLOBAL_METADATA_STORAGE.read().allocate()?;
        let descr = MetadataDescriptor::from(allocated_metadata.deref());
        let _linked_metadata = GLOBAL_METADATA_SUBSCRIPTION.read().link(&descr)?;
        Ok(())
    }
}

#[test]
fn metadata_link() {
    execute_concurrent(1, 1000, metadata_link_fn());
}

#[test]
fn metadata_link_concurrent() {
    execute_concurrent(100, 1000, metadata_link_fn());
}

fn metadata_link_failure_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static
{
    |_task_index: usize, _iteration: usize| {
        let allocated_metadata = GLOBAL_METADATA_STORAGE.read().allocate()?;
        let descr = MetadataDescriptor::from(allocated_metadata.deref());
        drop(allocated_metadata);

        // Some comments on this behaviour...
        // Even though allocated_metadata is dropped, its SHM segment still exists in
        // GLOBAL_METADATA_STORAGE, so there is no way to detect that the metadata is
        // "deallocated", and the code below succeeds. The invalidation functionality is
        // implemented on a higher level by means of a generation mechanism that protects
        // from both metadata and watchdog link-to-deallocated issues. This generation
        // mechanism depends on the behaviour below, so everything is fair :)
        let _linked_metadata = GLOBAL_METADATA_SUBSCRIPTION.read().link(&descr)?;
        Ok(())
    }
}

#[test]
fn metadata_link_failure() {
    execute_concurrent(1, 1000, metadata_link_failure_fn());
}

#[test]
fn metadata_link_failure_concurrent() {
    execute_concurrent(100, 1000, metadata_link_failure_fn());
}
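// Checks that the allocated metadata and its linked counterpart alias the same shared
// memory: every value stored through the allocated header must be immediately visible
// through the linked header.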
fn metadata_check_memory_fn(parallel_tasks: usize, iterations: usize) {
    let task_fun = |_task_index: usize, _iteration: usize| -> ZResult<()> {
        let allocated_metadata = GLOBAL_METADATA_STORAGE.read().allocate()?;
        let descr = MetadataDescriptor::from(allocated_metadata.deref());
        let linked_metadata = GLOBAL_METADATA_SUBSCRIPTION.read().link(&descr)?;

        let mut rng = rand::thread_rng();
        let allocated = allocated_metadata.header();
        let linked = linked_metadata.header();
        for _ in 0..100 {
            let gen = rng.gen();
            allocated.generation.store(gen, Relaxed);
            assert_eq!(gen, linked.generation.load(Relaxed));

            let rc = rng.gen();
            allocated.refcount.store(rc, Relaxed);
            assert_eq!(rc, linked.refcount.load(Relaxed));

            let watchdog_inv = rng.gen();
            allocated.watchdog_invalidated.store(watchdog_inv, Relaxed);
            assert_eq!(watchdog_inv, linked.watchdog_invalidated.load(Relaxed));

            assert_eq!(gen, linked.generation.load(Relaxed));
            assert_eq!(rc, linked.refcount.load(Relaxed));
            assert_eq!(watchdog_inv, linked.watchdog_invalidated.load(Relaxed));
        }
        Ok(())
    };

    execute_concurrent(parallel_tasks, iterations, task_fun);
}

#[test]
fn metadata_check_memory() {
    metadata_check_memory_fn(1, 1000);
}

#[test]
fn metadata_check_memory_concurrent() {
    metadata_check_memory_fn(100, 100);
}

const VALIDATION_PERIOD: Duration = Duration::from_millis(100);
const CONFIRMATION_PERIOD: Duration = Duration::from_millis(50);

fn watchdog_confirmed_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static {
    |_task_index: usize, _iteration: usize| -> ZResult<()> {
        let allocated = GLOBAL_METADATA_STORAGE.read().allocate()?;
        let confirmed = GLOBAL_CONFIRMATOR.read().add(allocated.clone());

        // check that the confirmed watchdog stays valid
        for i in 0..10 {
            std::thread::sleep(VALIDATION_PERIOD);
            let valid = confirmed.owned.test_validate() != 0;
            if !valid {
                bail!("Invalid watchdog, iteration {i}");
            }
        }
        Ok(())
    }
}

#[test]
#[ignore]
fn watchdog_confirmed() {
    execute_concurrent(1, 10, watchdog_confirmed_fn());
}

#[test]
#[ignore]
fn watchdog_confirmed_concurrent() {
    execute_concurrent(1000, 10, watchdog_confirmed_fn());
}

// TODO: confirmation of a dangling watchdog actually writes to a potentially-existing
// watchdog instance of another test running in the same process and changes its behaviour,
// so we cannot run the dangling test in parallel with anything else
#[test]
#[ignore]
fn watchdog_confirmed_dangling() {
    let allocated = GLOBAL_METADATA_STORAGE
        .read()
        .allocate()
        .expect("error allocating watchdog!");
    let confirmed = GLOBAL_CONFIRMATOR.read().add(allocated.clone());
    drop(allocated);

    // confirm dangling (not allocated) watchdog
    for _ in 0..10 {
        std::thread::sleep(VALIDATION_PERIOD);
        confirmed.owned.confirm();
    }
}
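// Checks the confirmator/validator interplay: while the `confirmed` guard is alive the
// watchdog keeps being confirmed and stays valid; once the guard is dropped, the validator
// is expected to set `watchdog_invalidated` within at most VALIDATION_PERIOD * 2 (see the
// worst-case timing diagram below).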
fn watchdog_validated_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static {
    |_task_index: usize, _iteration: usize| -> ZResult<()> {
        let allocated = GLOBAL_METADATA_STORAGE.read().allocate()?;
        let confirmed = GLOBAL_CONFIRMATOR.read().add(allocated.clone());

        GLOBAL_VALIDATOR.read().add(allocated.clone());

        // check that the watchdog stays valid while it is confirmed
        for i in 0..10 {
            std::thread::sleep(VALIDATION_PERIOD);
            if allocated
                .header()
                .watchdog_invalidated
                .load(std::sync::atomic::Ordering::SeqCst)
            {
                bail!("Invalid watchdog, iteration {i}");
            }
        }

        // Worst-case timings:
        // validation:      |___________|___________|___________|___________|
        // confirmation:  __|_____|_____|_____|_____|
        // drop(confirmed):                         ^
        // It means that the worst-case latency for the watchdog to become invalid is VALIDATION_PERIOD*2

        // check that the watchdog becomes invalid once we stop its confirmation
        drop(confirmed);
        std::thread::sleep(VALIDATION_PERIOD * 3 + CONFIRMATION_PERIOD);
        assert!(allocated
            .header()
            .watchdog_invalidated
            .load(std::sync::atomic::Ordering::SeqCst));

        Ok(())
    }
}

#[test]
#[ignore]
fn watchdog_validated() {
    execute_concurrent(1, 10, watchdog_validated_fn());
}

#[test]
#[ignore]
fn watchdog_validated_concurrent() {
    execute_concurrent(1000, 10, watchdog_validated_fn());
}

fn watchdog_validated_invalid_without_confirmator_fn(
) -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static {
    |_task_index: usize, _iteration: usize| -> ZResult<()> {
        let allocated = GLOBAL_METADATA_STORAGE
            .read()
            .allocate()
            .expect("error allocating watchdog!");

        assert!(allocated.test_validate() == 0);

        // add watchdog to validator
        GLOBAL_VALIDATOR.read().add(allocated.clone());

        // check that the watchdog becomes invalid because we do not confirm it
        std::thread::sleep(VALIDATION_PERIOD * 2 + CONFIRMATION_PERIOD);
        assert!(allocated
            .header()
            .watchdog_invalidated
            .load(std::sync::atomic::Ordering::SeqCst));

        Ok(())
    }
}

#[test]
#[ignore]
fn watchdog_validated_invalid_without_confirmator() {
    execute_concurrent(1, 10, watchdog_validated_invalid_without_confirmator_fn());
}

#[test]
#[ignore]
fn watchdog_validated_invalid_without_confirmator_concurrent() {
    execute_concurrent(
        1000,
        10,
        watchdog_validated_invalid_without_confirmator_fn(),
    );
}

fn watchdog_validated_additional_confirmation_fn(
) -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static {
    |_task_index: usize, _iteration: usize| -> ZResult<()> {
        let allocated = GLOBAL_METADATA_STORAGE
            .read()
            .allocate()
            .expect("error allocating watchdog!");
        let confirmed = GLOBAL_CONFIRMATOR.read().add(allocated.clone());

        // add watchdog to validator
        GLOBAL_VALIDATOR.read().add(allocated.clone());

        // make additional confirmations
        for _ in 0..100 {
            std::thread::sleep(VALIDATION_PERIOD / 10);
            confirmed.owned.confirm();
        }

        // check that the watchdog stays valid after we stop the additional confirmation
        // (the regular confirmation is still running)
        std::thread::sleep(VALIDATION_PERIOD * 10);
        assert!(!allocated
            .header()
            .watchdog_invalidated
            .load(std::sync::atomic::Ordering::SeqCst));

        // Worst-case timings:
        // validation:      |___________|___________|___________|___________|
        // confirmation:  __|_____|_____|_____|_____|
        // drop(confirmed):                         ^
        // It means that the worst-case latency for the watchdog to become invalid is VALIDATION_PERIOD*2

        // check that the watchdog becomes invalid once we stop its regular confirmation
        drop(confirmed);
        std::thread::sleep(VALIDATION_PERIOD * 2 + CONFIRMATION_PERIOD);

        // check that the invalidation event happened!
        assert!(allocated
            .header()
            .watchdog_invalidated
            .load(std::sync::atomic::Ordering::SeqCst));

        Ok(())
    }
}

#[test]
#[ignore]
fn watchdog_validated_additional_confirmation() {
    execute_concurrent(1, 10, watchdog_validated_additional_confirmation_fn());
}

#[test]
#[ignore]
fn watchdog_validated_additional_confirmation_concurrent() {
    execute_concurrent(1000, 10, watchdog_validated_additional_confirmation_fn());
}
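// Same confirm-then-drop scenario as above, reused by the *_load tests below, which
// additionally hold a `CpuLoad` handle (low / optimal_high / excessive) to keep the
// system under CPU load for the duration of the test.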
fn watchdog_validated_overloaded_system_fn(
) -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static {
    |_task_index: usize, _iteration: usize| -> ZResult<()> {
        let allocated = GLOBAL_METADATA_STORAGE
            .read()
            .allocate()
            .expect("error allocating watchdog!");
        let confirmed = GLOBAL_CONFIRMATOR.read().add(allocated.clone());

        // add watchdog to validator
        GLOBAL_VALIDATOR.read().add(allocated.clone());

        // check that the watchdog stays valid
        std::thread::sleep(VALIDATION_PERIOD * 10);
        assert!(!allocated
            .header()
            .watchdog_invalidated
            .load(std::sync::atomic::Ordering::SeqCst));

        // Worst-case timings:
        // validation:      |___________|___________|___________|___________|
        // confirmation:  __|_____|_____|_____|_____|
        // drop(confirmed):                         ^
        // It means that the worst-case latency for the watchdog to become invalid is VALIDATION_PERIOD*2

        // check that the watchdog becomes invalid once we stop its regular confirmation
        drop(confirmed);
        std::thread::sleep(VALIDATION_PERIOD * 2 + CONFIRMATION_PERIOD);

        // check that the invalidation event happened!
        assert!(allocated
            .header()
            .watchdog_invalidated
            .load(std::sync::atomic::Ordering::SeqCst));

        Ok(())
    }
}

#[test]
#[ignore]
fn watchdog_validated_low_load() {
    let _load = CpuLoad::low();
    execute_concurrent(1000, 10, watchdog_validated_overloaded_system_fn());
}

#[test]
#[ignore]
fn watchdog_validated_high_load() {
    let _load = CpuLoad::optimal_high();
    execute_concurrent(1000, 10, watchdog_validated_overloaded_system_fn());
}

#[test]
#[ignore]
fn watchdog_validated_overloaded_system() {
    let _load = CpuLoad::excessive();
    execute_concurrent(1000, 10, watchdog_validated_overloaded_system_fn());
}