// Copyright (C) Parity Technologies (UK) Ltd. // This file is part of Polkadot. // Polkadot is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Polkadot is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . //! General PVF host integration tests checking the functionality of the PVF host itself. use assert_matches::assert_matches; #[cfg(all(feature = "ci-only-tests", target_os = "linux"))] use polkadot_node_core_pvf::SecurityStatus; use polkadot_node_core_pvf::{ start, testing::build_workers_and_get_paths, Config, InvalidCandidate, Metrics, PossiblyInvalidError, PrepareError, PrepareJobKind, PvfPrepData, ValidationError, ValidationHost, JOB_TIMEOUT_WALL_CLOCK_FACTOR, }; use polkadot_node_primitives::{PoV, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT}; use polkadot_parachain_primitives::primitives::{BlockData, ValidationResult}; use polkadot_primitives::{ ExecutorParam, ExecutorParams, PersistedValidationData, PvfExecKind, PvfPrepKind, }; use sp_core::H256; use std::{io::Write, sync::Arc, time::Duration}; use tokio::sync::Mutex; mod adder; #[cfg(target_os = "linux")] mod process; mod worker_common; const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(6); const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(6); struct TestHost { // Keep a reference to the tempdir as it gets deleted on drop. cache_dir: tempfile::TempDir, host: Mutex, } impl TestHost { async fn new() -> Self { Self::new_with_config(|_| ()).await } async fn new_with_config(f: F) -> Self where F: FnOnce(&mut Config), { let (prepare_worker_path, execute_worker_path) = build_workers_and_get_paths(); let cache_dir = tempfile::tempdir().unwrap(); let mut config = Config::new( cache_dir.path().to_owned(), None, false, prepare_worker_path, execute_worker_path, 2, 1, 2, ); f(&mut config); let (host, task) = start(config, Metrics::default()).await.unwrap(); let _ = tokio::task::spawn(task); Self { cache_dir, host: Mutex::new(host) } } async fn precheck_pvf( &self, code: &[u8], executor_params: ExecutorParams, ) -> Result<(), PrepareError> { let (result_tx, result_rx) = futures::channel::oneshot::channel(); self.host .lock() .await .precheck_pvf( PvfPrepData::from_code( code.into(), executor_params, TEST_PREPARATION_TIMEOUT, PrepareJobKind::Prechecking, ), result_tx, ) .await .unwrap(); result_rx.await.unwrap() } async fn validate_candidate( &self, code: &[u8], pvd: PersistedValidationData, pov: PoV, executor_params: ExecutorParams, ) -> Result { let (result_tx, result_rx) = futures::channel::oneshot::channel(); self.host .lock() .await .execute_pvf( PvfPrepData::from_code( code.into(), executor_params, TEST_PREPARATION_TIMEOUT, PrepareJobKind::Compilation, ), TEST_EXECUTION_TIMEOUT, Arc::new(pvd), Arc::new(pov), polkadot_node_core_pvf::Priority::Normal, result_tx, ) .await .unwrap(); result_rx.await.unwrap() } #[cfg(all(feature = "ci-only-tests", target_os = "linux"))] async fn security_status(&self) -> SecurityStatus { self.host.lock().await.security_status.clone() } } #[tokio::test] async fn prepare_job_terminates_on_timeout() { let host = TestHost::new().await; let start = std::time::Instant::now(); let result = host .precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()) .await; match result { Err(PrepareError::TimedOut) => {}, r => panic!("{:?}", r), } let duration = std::time::Instant::now().duration_since(start); assert!(duration >= TEST_PREPARATION_TIMEOUT); assert!(duration < TEST_PREPARATION_TIMEOUT * JOB_TIMEOUT_WALL_CLOCK_FACTOR); } #[tokio::test] async fn execute_job_terminates_on_timeout() { let host = TestHost::new().await; let pvd = PersistedValidationData { parent_head: Default::default(), relay_parent_number: 1u32, relay_parent_storage_root: H256::default(), max_pov_size: 4096 * 1024, }; let pov = PoV { block_data: BlockData(Vec::new()) }; let start = std::time::Instant::now(); let result = host .validate_candidate(test_parachain_halt::wasm_binary_unwrap(), pvd, pov, Default::default()) .await; match result { Err(ValidationError::Invalid(InvalidCandidate::HardTimeout)) => {}, r => panic!("{:?}", r), } let duration = std::time::Instant::now().duration_since(start); assert!(duration >= TEST_EXECUTION_TIMEOUT); assert!(duration < TEST_EXECUTION_TIMEOUT * JOB_TIMEOUT_WALL_CLOCK_FACTOR); } #[cfg(feature = "ci-only-tests")] #[tokio::test] async fn ensure_parallel_execution() { // Run some jobs that do not complete, thus timing out. let host = TestHost::new().await; let pvd = PersistedValidationData { parent_head: Default::default(), relay_parent_number: 1u32, relay_parent_storage_root: H256::default(), max_pov_size: 4096 * 1024, }; let pov = PoV { block_data: BlockData(Vec::new()) }; let execute_pvf_future_1 = host.validate_candidate( test_parachain_halt::wasm_binary_unwrap(), pvd.clone(), pov.clone(), Default::default(), ); let execute_pvf_future_2 = host.validate_candidate( test_parachain_halt::wasm_binary_unwrap(), pvd, pov, Default::default(), ); let start = std::time::Instant::now(); let (res1, res2) = futures::join!(execute_pvf_future_1, execute_pvf_future_2); assert_matches!( (res1, res2), ( Err(ValidationError::Invalid(InvalidCandidate::HardTimeout)), Err(ValidationError::Invalid(InvalidCandidate::HardTimeout)) ) ); // Total time should be < 2 x TEST_EXECUTION_TIMEOUT (two workers run in parallel). let duration = std::time::Instant::now().duration_since(start); let max_duration = 2 * TEST_EXECUTION_TIMEOUT; assert!( duration < max_duration, "Expected duration {}ms to be less than {}ms", duration.as_millis(), max_duration.as_millis() ); } #[tokio::test] async fn execute_queue_doesnt_stall_if_workers_died() { let host = TestHost::new_with_config(|cfg| { cfg.execute_workers_max_num = 5; }) .await; let pvd = PersistedValidationData { parent_head: Default::default(), relay_parent_number: 1u32, relay_parent_storage_root: H256::default(), max_pov_size: 4096 * 1024, }; let pov = PoV { block_data: BlockData(Vec::new()) }; // Here we spawn 8 validation jobs for the `halt` PVF and share those between 5 workers. The // first five jobs should timeout and the workers killed. For the next 3 jobs a new batch of // workers should be spun up. let start = std::time::Instant::now(); futures::future::join_all((0u8..=8).map(|_| { host.validate_candidate( test_parachain_halt::wasm_binary_unwrap(), pvd.clone(), pov.clone(), Default::default(), ) })) .await; // Total time should be >= 2 x TEST_EXECUTION_TIMEOUT (two separate sets of workers that should // both timeout). let duration = std::time::Instant::now().duration_since(start); let max_duration = 2 * TEST_EXECUTION_TIMEOUT; assert!( duration >= max_duration, "Expected duration {}ms to be greater than or equal to {}ms", duration.as_millis(), max_duration.as_millis() ); } #[cfg(feature = "ci-only-tests")] #[tokio::test] async fn execute_queue_doesnt_stall_with_varying_executor_params() { let host = TestHost::new_with_config(|cfg| { cfg.execute_workers_max_num = 2; }) .await; let pvd = PersistedValidationData { parent_head: Default::default(), relay_parent_number: 1u32, relay_parent_storage_root: H256::default(), max_pov_size: 4096 * 1024, }; let pov = PoV { block_data: BlockData(Vec::new()) }; let executor_params_1 = ExecutorParams::default(); let executor_params_2 = ExecutorParams::from(&[ExecutorParam::StackLogicalMax(1024)][..]); // Here we spawn 6 validation jobs for the `halt` PVF and share those between 2 workers. Every // 3rd job will have different set of executor parameters. All the workers should be killed // and in this case the queue should respawn new workers with needed executor environment // without waiting. The jobs will be executed in 3 batches, each running two jobs in parallel, // and execution time would be roughly 3 * TEST_EXECUTION_TIMEOUT let start = std::time::Instant::now(); futures::future::join_all((0u8..6).map(|i| { host.validate_candidate( test_parachain_halt::wasm_binary_unwrap(), pvd.clone(), pov.clone(), match i % 3 { 0 => executor_params_1.clone(), _ => executor_params_2.clone(), }, ) })) .await; let duration = std::time::Instant::now().duration_since(start); let min_duration = 3 * TEST_EXECUTION_TIMEOUT; let max_duration = 4 * TEST_EXECUTION_TIMEOUT; assert!( duration >= min_duration, "Expected duration {}ms to be greater than or equal to {}ms", duration.as_millis(), min_duration.as_millis() ); assert!( duration <= max_duration, "Expected duration {}ms to be less than or equal to {}ms", duration.as_millis(), max_duration.as_millis() ); } // Test that deleting a prepared artifact does not lead to a dispute when we try to execute it. #[tokio::test] async fn deleting_prepared_artifact_does_not_dispute() { let host = TestHost::new().await; let cache_dir = host.cache_dir.path(); let pvd = PersistedValidationData { parent_head: Default::default(), relay_parent_number: 1u32, relay_parent_storage_root: H256::default(), max_pov_size: 4096 * 1024, }; let pov = PoV { block_data: BlockData(Vec::new()) }; let _stats = host .precheck_pvf(test_parachain_halt::wasm_binary_unwrap(), Default::default()) .await .unwrap(); // Manually delete the prepared artifact from disk. The in-memory artifacts table won't change. { // Get the artifact path (asserting it exists). let mut cache_dir: Vec<_> = std::fs::read_dir(cache_dir).unwrap().collect(); // Should contain the artifact and the worker dir. assert_eq!(cache_dir.len(), 2); let mut artifact_path = cache_dir.pop().unwrap().unwrap(); if artifact_path.path().is_dir() { artifact_path = cache_dir.pop().unwrap().unwrap(); } // Delete the artifact. std::fs::remove_file(artifact_path.path()).unwrap(); } // Try to validate, artifact should get recreated. let result = host .validate_candidate(test_parachain_halt::wasm_binary_unwrap(), pvd, pov, Default::default()) .await; assert_matches!(result, Err(ValidationError::Invalid(InvalidCandidate::HardTimeout))); } // Test that corruption of a prepared artifact does not lead to a dispute when we try to execute it. #[tokio::test] async fn corrupted_prepared_artifact_does_not_dispute() { let host = TestHost::new().await; let cache_dir = host.cache_dir.path(); let pvd = PersistedValidationData { parent_head: Default::default(), relay_parent_number: 1u32, relay_parent_storage_root: H256::default(), max_pov_size: 4096 * 1024, }; let pov = PoV { block_data: BlockData(Vec::new()) }; let _stats = host .precheck_pvf(test_parachain_halt::wasm_binary_unwrap(), Default::default()) .await .unwrap(); // Manually corrupting the prepared artifact from disk. The in-memory artifacts table won't // change. let artifact_path = { // Get the artifact path (asserting it exists). let mut cache_dir: Vec<_> = std::fs::read_dir(cache_dir).unwrap().collect(); // Should contain the artifact and the worker dir. assert_eq!(cache_dir.len(), 2); let mut artifact_path = cache_dir.pop().unwrap().unwrap(); if artifact_path.path().is_dir() { artifact_path = cache_dir.pop().unwrap().unwrap(); } // Corrupt the artifact. let mut f = std::fs::OpenOptions::new() .write(true) .truncate(true) .open(artifact_path.path()) .unwrap(); f.write_all(b"corrupted wasm").unwrap(); f.flush().unwrap(); artifact_path }; assert!(artifact_path.path().exists()); // Try to validate, artifact should get removed because of the corruption. let result = host .validate_candidate(test_parachain_halt::wasm_binary_unwrap(), pvd, pov, Default::default()) .await; assert_matches!( result, Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::RuntimeConstruction(_))) ); // because of RuntimeConstruction we may retry host.precheck_pvf(test_parachain_halt::wasm_binary_unwrap(), Default::default()) .await .unwrap(); // The actual artifact removal is done concurrently // with sending of the result of the execution // it is not a problem for further re-preparation as // artifact filenames are random for _ in 1..5 { if !artifact_path.path().exists() { break; } tokio::time::sleep(Duration::from_secs(1)).await; } assert!( !artifact_path.path().exists(), "the corrupted artifact ({}) should be deleted by the host", artifact_path.path().display() ); } #[tokio::test] async fn cache_cleared_on_startup() { // Don't drop this host, it owns the `TempDir` which gets cleared on drop. let host = TestHost::new().await; let _stats = host .precheck_pvf(test_parachain_halt::wasm_binary_unwrap(), Default::default()) .await .unwrap(); // The cache dir should contain one artifact and one worker dir. let cache_dir = host.cache_dir.path().to_owned(); assert_eq!(std::fs::read_dir(&cache_dir).unwrap().count(), 2); // Start a new host, previous artifact should be cleared. let _host = TestHost::new_with_config(|cfg| { cfg.cache_path = cache_dir.clone(); }) .await; assert_eq!(std::fs::read_dir(&cache_dir).unwrap().count(), 0); } // This test checks if the adder parachain runtime can be prepared with 10Mb preparation memory // limit enforced. At the moment of writing, the limit if far enough to prepare the PVF. If it // starts failing, either Wasmtime version has changed, or the PVF code itself has changed, and // more memory is required now. Multi-threaded preparation, if ever enabled, may also affect // memory consumption. #[tokio::test] async fn prechecking_within_memory_limits() { let host = TestHost::new().await; let result = host .precheck_pvf( ::test_parachain_adder::wasm_binary_unwrap(), ExecutorParams::from(&[ExecutorParam::PrecheckingMaxMemory(10 * 1024 * 1024)][..]), ) .await; assert_matches!(result, Ok(_)); } // This test checks if the adder parachain runtime can be prepared with 512Kb preparation memory // limit enforced. At the moment of writing, the limit if not enough to prepare the PVF, and the // preparation is supposed to generate an error. If the test starts failing, either Wasmtime // version has changed, or the PVF code itself has changed, and less memory is required now. #[tokio::test] async fn prechecking_out_of_memory() { use polkadot_node_core_pvf::PrepareError; let host = TestHost::new().await; let result = host .precheck_pvf( ::test_parachain_adder::wasm_binary_unwrap(), ExecutorParams::from(&[ExecutorParam::PrecheckingMaxMemory(512 * 1024)][..]), ) .await; assert_matches!(result, Err(PrepareError::OutOfMemory)); } // With one worker, run multiple preparation jobs serially. They should not conflict. #[tokio::test] async fn prepare_can_run_serially() { let host = TestHost::new_with_config(|cfg| { cfg.prepare_workers_hard_max_num = 1; }) .await; let _stats = host .precheck_pvf(::test_parachain_adder::wasm_binary_unwrap(), Default::default()) .await .unwrap(); // Prepare a different wasm blob to prevent skipping work. let _stats = host .precheck_pvf(test_parachain_halt::wasm_binary_unwrap(), Default::default()) .await .unwrap(); } // CI machines should be able to enable all the security features. #[cfg(all(feature = "ci-only-tests", target_os = "linux"))] #[tokio::test] async fn all_security_features_work() { let can_enable_landlock = { let res = unsafe { libc::syscall(libc::SYS_landlock_create_ruleset, 0usize, 0usize, 1u32) }; if res == -1 { let err = std::io::Error::last_os_error().raw_os_error().unwrap(); if err == libc::ENOSYS { false } else { panic!("Unexpected errno from landlock check: {err}"); } } else { true } }; let host = TestHost::new().await; assert_eq!( host.security_status().await, SecurityStatus { // Disabled in tests to not enforce the presence of security features. This CI-only test // is the only one that tests them. secure_validator_mode: false, can_enable_landlock, can_enable_seccomp: true, can_unshare_user_namespace_and_change_root: true, can_do_secure_clone: true, } ); } // Regression test to make sure the unshare-pivot-root capability does not depend on the PVF // artifacts cache existing. #[cfg(all(feature = "ci-only-tests", target_os = "linux"))] #[tokio::test] async fn nonexistent_cache_dir() { let host = TestHost::new_with_config(|cfg| { cfg.cache_path = cfg.cache_path.join("nonexistent_cache_dir"); }) .await; assert!(host.security_status().await.can_unshare_user_namespace_and_change_root); let _stats = host .precheck_pvf(::test_parachain_adder::wasm_binary_unwrap(), Default::default()) .await .unwrap(); } // Checks the the artifact is not re-prepared when the executor environment parameters change // in a way not affecting the preparation #[tokio::test] async fn artifact_does_not_reprepare_on_non_meaningful_exec_parameter_change() { let host = TestHost::new_with_config(|cfg| { cfg.prepare_workers_hard_max_num = 1; }) .await; let cache_dir = host.cache_dir.path(); let set1 = ExecutorParams::default(); let set2 = ExecutorParams::from(&[ExecutorParam::PvfExecTimeout(PvfExecKind::Backing, 2500)][..]); let _stats = host .precheck_pvf(test_parachain_halt::wasm_binary_unwrap(), set1) .await .unwrap(); let md1 = { let mut cache_dir: Vec<_> = std::fs::read_dir(cache_dir).unwrap().collect(); assert_eq!(cache_dir.len(), 2); let mut artifact_path = cache_dir.pop().unwrap().unwrap(); if artifact_path.path().is_dir() { artifact_path = cache_dir.pop().unwrap().unwrap(); } std::fs::metadata(artifact_path.path()).unwrap() }; // FS times are not monotonical so we wait 2 secs here to be sure that the creation time of the // second attifact will be different tokio::time::sleep(Duration::from_secs(2)).await; let _stats = host .precheck_pvf(test_parachain_halt::wasm_binary_unwrap(), set2) .await .unwrap(); let md2 = { let mut cache_dir: Vec<_> = std::fs::read_dir(cache_dir).unwrap().collect(); assert_eq!(cache_dir.len(), 2); let mut artifact_path = cache_dir.pop().unwrap().unwrap(); if artifact_path.path().is_dir() { artifact_path = cache_dir.pop().unwrap().unwrap(); } std::fs::metadata(artifact_path.path()).unwrap() }; assert_eq!(md1.created().unwrap(), md2.created().unwrap()); } // Checks if the artifact is re-prepared if the re-preparation is needed by the nature of // the execution environment parameters change #[tokio::test] async fn artifact_does_reprepare_on_meaningful_exec_parameter_change() { let host = TestHost::new_with_config(|cfg| { cfg.prepare_workers_hard_max_num = 1; }) .await; let cache_dir = host.cache_dir.path(); let set1 = ExecutorParams::default(); let set2 = ExecutorParams::from(&[ExecutorParam::PvfPrepTimeout(PvfPrepKind::Prepare, 60000)][..]); let _stats = host .precheck_pvf(test_parachain_halt::wasm_binary_unwrap(), set1) .await .unwrap(); let cache_dir_contents: Vec<_> = std::fs::read_dir(cache_dir).unwrap().collect(); assert_eq!(cache_dir_contents.len(), 2); let _stats = host .precheck_pvf(test_parachain_halt::wasm_binary_unwrap(), set2) .await .unwrap(); let cache_dir_contents: Vec<_> = std::fs::read_dir(cache_dir).unwrap().collect(); assert_eq!(cache_dir_contents.len(), 3); // new artifact has been added } // Checks that we cannot prepare oversized compressed code #[tokio::test] async fn invalid_compressed_code_fails_prechecking() { let host = TestHost::new().await; let raw_code = vec![2u8; VALIDATION_CODE_BOMB_LIMIT + 1]; let validation_code = sp_maybe_compressed_blob::compress(&raw_code, VALIDATION_CODE_BOMB_LIMIT + 1).unwrap(); let res = host.precheck_pvf(&validation_code, Default::default()).await; assert_matches!(res, Err(PrepareError::CouldNotDecompressCodeBlob(_))); } // Checks that we cannot validate with oversized compressed code #[tokio::test] async fn invalid_compressed_code_fails_validation() { let host = TestHost::new().await; let pvd = PersistedValidationData { parent_head: Default::default(), relay_parent_number: 1u32, relay_parent_storage_root: H256::default(), max_pov_size: 4096 * 1024, }; let pov = PoV { block_data: BlockData(Vec::new()) }; let raw_code = vec![2u8; VALIDATION_CODE_BOMB_LIMIT + 1]; let validation_code = sp_maybe_compressed_blob::compress(&raw_code, VALIDATION_CODE_BOMB_LIMIT + 1).unwrap(); let result = host.validate_candidate(&validation_code, pvd, pov, Default::default()).await; assert_matches!( result, Err(ValidationError::Preparation(PrepareError::CouldNotDecompressCodeBlob(_))) ); } // Checks that we cannot validate with an oversized PoV #[tokio::test] async fn invalid_compressed_pov_fails_validation() { let host = TestHost::new().await; let pvd = PersistedValidationData { parent_head: Default::default(), relay_parent_number: 1u32, relay_parent_storage_root: H256::default(), max_pov_size: 4096 * 1024, }; let raw_block_data = vec![1u8; POV_BOMB_LIMIT + 1]; let block_data = sp_maybe_compressed_blob::compress(&raw_block_data, POV_BOMB_LIMIT + 1).unwrap(); let pov = PoV { block_data: BlockData(block_data) }; let result = host .validate_candidate(test_parachain_halt::wasm_binary_unwrap(), pvd, pov, Default::default()) .await; assert_matches!( result, Err(ValidationError::Invalid(InvalidCandidate::PoVDecompressionFailure)) ); }