// Copyright (C) 2021 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at hello@quickwit.io.
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//! Fail points are a form of code instrumentation that allow errors and other behaviors
//! to be injected dynamically at runtime, primarily for testing purposes. Fail
//! points are flexible and can be configured to exhibit a variety of behaviors,
//! including panics, early returns, and sleeps. They can be controlled both
//! programmatically and via the environment, and can be triggered conditionally
//! and probabilistically.
//!
//! They rely on a global variable, which requires them to be executed in a single
//! thread.
//! For this reason, we isolate them from the other unit tests and define an
//! independent binary target.
//!
//! They are not executed by default.
//! They are executed in CI and can be executed locally
//! `cargo test --features fail/failpoints test_failpoint -- --test-threads=1`
//!
//! Below we test panics at different steps in the indexing pipeline.
use std::path::Path;
use std::sync::Mutex;
use std::time::{Duration, Instant};
use fail::FailScenario;
use quickwit_actors::{create_test_mailbox, ActorExitStatus, Universe};
use quickwit_common::rand::append_random_suffix;
use quickwit_common::split_file;
use quickwit_indexing::actors::MergeExecutor;
use quickwit_indexing::merge_policy::MergeOperation;
use quickwit_indexing::models::{MergeScratch, ScratchDirectory};
use quickwit_indexing::{get_tantivy_directory_from_split_bundle, new_split_id, TestSandbox};
use quickwit_metastore::{SplitMetadata, SplitState};
use tantivy::Directory;
/// Baseline run: executes the shared indexing scenario inside a `FailScenario`
/// without configuring any fail point.
#[tokio::test]
async fn test_failpoint_no_failure() -> anyhow::Result<()> {
    let fail_scenario = FailScenario::setup();
    // Nothing is injected here; the scenario must simply succeed.
    aux_test_failpoints().await?;
    fail_scenario.teardown();
    Ok(())
}
/// Builds a fail point callback that panics according to a scripted sequence.
///
/// The n-th invocation of the returned closure panics with the message
/// `"panicked"` if and only if the n-th entry of `panics` is `true`.
/// Once the sequence is exhausted, every further invocation is a no-op.
///
/// The remaining part of the sequence is kept behind a [`Mutex`] so that the
/// closure satisfies the `Fn() + Send + Sync` bound of the return type.
fn deterministic_panic_sequence(panics: Vec<bool>) -> impl Fn() + Send + Sync {
    // Consuming the vector through an iterator yields the entries in their
    // original order, without having to reverse the vector first.
    let remaining = Mutex::new(panics.into_iter());
    move || {
        // The lock guard is a temporary dropped at the end of this statement:
        // the panic below is raised after the mutex has been released, so the
        // mutex is never poisoned by a scripted panic.
        let should_panic = remaining.lock().unwrap().next().unwrap_or(false);
        if should_panic {
            panic!("panicked");
        }
    }
}
/// Injects a panic on the first evaluation of the `packager:before` fail point.
///
/// The test passes if the shared scenario still completes, i.e. if
/// `aux_test_failpoints` finds both splits published.
#[tokio::test]
async fn test_failpoint_packager_panics_right_away() -> anyhow::Result<()> {
    let fail_scenario = FailScenario::setup();
    // Scripted sequence `[true]`: panic on the first call, no-op afterwards.
    let panic_callback = deterministic_panic_sequence(vec![true]);
    fail::cfg_callback("packager:before", panic_callback).unwrap();
    aux_test_failpoints().await?;
    fail_scenario.teardown();
    Ok(())
}
/// Injects a panic on the second evaluation of the `packager:before` fail point,
/// letting the first evaluation go through untouched.
///
/// The test passes if the shared scenario still completes, i.e. if
/// `aux_test_failpoints` finds both splits published.
#[tokio::test]
async fn test_failpoint_packager_panics_after_one_success() -> anyhow::Result<()> {
    let fail_scenario = FailScenario::setup();
    // Scripted sequence `[false, true]`: first call is a no-op, second call panics.
    let panic_callback = deterministic_panic_sequence(vec![false, true]);
    fail::cfg_callback("packager:before", panic_callback).unwrap();
    aux_test_failpoints().await?;
    fail_scenario.teardown();
    Ok(())
}
/// Injects a panic on the second evaluation of the `publisher:before` fail point,
/// letting the first evaluation go through untouched.
///
/// The test passes if the shared scenario still completes, i.e. if
/// `aux_test_failpoints` finds both splits published.
#[tokio::test]
async fn test_failpoint_publisher_panics_after_one_success() -> anyhow::Result<()> {
    let fail_scenario = FailScenario::setup();
    // Scripted sequence `[false, true]`: first call is a no-op, second call panics.
    let panic_callback = deterministic_panic_sequence(vec![false, true]);
    fail::cfg_callback("publisher:before", panic_callback).unwrap();
    aux_test_failpoints().await?;
    fail_scenario.teardown();
    Ok(())
}
/// Injects a panic on the first evaluation of the `publisher:before` fail point.
///
/// The test passes if the shared scenario still completes, i.e. if
/// `aux_test_failpoints` finds both splits published.
#[tokio::test]
async fn test_failpoint_publisher_panics_right_away() -> anyhow::Result<()> {
    let fail_scenario = FailScenario::setup();
    // Scripted sequence `[true]`: panic on the first call, no-op afterwards.
    let panic_callback = deterministic_panic_sequence(vec![true]);
    fail::cfg_callback("publisher:before", panic_callback).unwrap();
    aux_test_failpoints().await?;
    fail_scenario.teardown();
    Ok(())
}
/// Injects a panic on the first evaluation of the `publisher:after` fail point.
///
/// The test passes if the shared scenario still completes, i.e. if
/// `aux_test_failpoints` finds both splits published.
#[tokio::test]
async fn test_failpoint_publisher_after_panics_right_away() -> anyhow::Result<()> {
    let fail_scenario = FailScenario::setup();
    // Scripted sequence `[true]`: panic on the first call, no-op afterwards.
    let panic_callback = deterministic_panic_sequence(vec![true]);
    fail::cfg_callback("publisher:after", panic_callback).unwrap();
    aux_test_failpoints().await?;
    fail_scenario.teardown();
    Ok(())
}
/// Injects a panic on the first evaluation of the `uploader:before` fail point.
///
/// The test passes if the shared scenario still completes, i.e. if
/// `aux_test_failpoints` finds both splits published.
#[tokio::test]
async fn test_failpoint_uploader_panics_right_away() -> anyhow::Result<()> {
    let scenario = FailScenario::setup();
    // "Right away" means the very first evaluation panics, hence `[true]`, exactly
    // like the packager and publisher "right away" tests. The sequence used to be
    // `[false, true]`, which only panics on the second evaluation.
    fail::cfg_callback("uploader:before", deterministic_panic_sequence(vec![true])).unwrap();
    aux_test_failpoints().await?;
    scenario.teardown();
    Ok(())
}
/// Injects a panic on the second evaluation of the `uploader:before` fail point,
/// letting the first evaluation go through untouched.
///
/// The test passes if the shared scenario still completes, i.e. if
/// `aux_test_failpoints` finds both splits published.
///
/// The function name keeps its historical spelling ("sucess") so that existing
/// test filters keep matching it.
#[tokio::test]
async fn test_failpoint_uploader_panics_after_one_sucess() -> anyhow::Result<()> {
    let scenario = FailScenario::setup();
    // "After one success" means the first evaluation is a no-op and the second one
    // panics, hence `[false, true]`, exactly like the packager and publisher
    // "after one success" tests. The sequence used to be `[true]`, which panics
    // on the very first evaluation.
    fail::cfg_callback(
        "uploader:before",
        deterministic_panic_sequence(vec![false, true]),
    )
    .unwrap();
    aux_test_failpoints().await?;
    scenario.teardown();
    Ok(())
}
/// Injects a panic on the first evaluation of the `uploader:after` fail point.
///
/// The test passes if the shared scenario still completes, i.e. if
/// `aux_test_failpoints` finds both splits published.
#[tokio::test]
async fn test_failpoint_uploader_after_panics_right_away() -> anyhow::Result<()> {
    let fail_scenario = FailScenario::setup();
    // Scripted sequence `[true]`: panic on the first call, no-op afterwards.
    let panic_callback = deterministic_panic_sequence(vec![true]);
    fail::cfg_callback("uploader:after", panic_callback).unwrap();
    aux_test_failpoints().await?;
    fail_scenario.teardown();
    Ok(())
}
async fn aux_test_failpoints() -> anyhow::Result<()> {
quickwit_common::setup_logging_for_tests();
let doc_mapper_yaml = r#"
field_mappings:
- name: body
type: text
- name: ts
type: i64
fast: true
"#;
let indexing_setting_yaml = r#"
timestamp_field: ts
"#;
let search_fields = ["body"];
let index_id = append_random_suffix("test-index");
let test_index_builder = TestSandbox::create(
&index_id,
doc_mapper_yaml,
indexing_setting_yaml,
&search_fields,
)
.await?;
let batch_1: Vec = vec![
serde_json::json!({"body ": "1", "ts": 1629889530 }),
serde_json::json!({"body ": "2", "ts": 1629889531 }),
];
let batch_2: Vec = vec![
serde_json::json!({"body ": "3", "ts": 1629889532 }),
serde_json::json!({"body ": "4", "ts": 1629889533 }),
];
test_index_builder.add_documents(batch_1).await?;
test_index_builder.add_documents(batch_2).await?;
let mut splits = test_index_builder
.metastore()
.list_splits(&index_id, SplitState::Published, None, None)
.await?;
splits.sort_by_key(|split| *split.split_metadata.time_range.clone().unwrap().start());
assert_eq!(splits.len(), 2);
assert_eq!(
splits[0].split_metadata.time_range.clone().unwrap(),
1629889530..=1629889531
);
assert_eq!(
splits[1].split_metadata.time_range.clone().unwrap(),
1629889532..=1629889533
);
Ok(())
}
/// Text payload put in every document indexed by
/// `test_merge_executor_controlled_directory_kill_switch`.
// The `'static` lifetime is implied for constants and is therefore omitted.
const TEST_TEXT: &str = r#"His sole child, my lord, and bequeathed to my
overlooking. I have those hopes of her good that
her education promises; her dispositions she
inherits, which makes fair gifts fairer; for where
an unclean mind carries virtuous qualities, there
commendations go with pity; they are virtues and
traitors too; in her they are the better for their
simpleness; she derives her honesty and achieves her goodness."#;
#[tokio::test]
async fn test_merge_executor_controlled_directory_kill_switch() -> anyhow::Result<()> {
// This tests checks that if a merger is killed in a middle of
// a merge, then the controlled directory makes it possible to
// abort the merging operation and return quickly.
// NOTE(fmassot): This test is working but not as exactly we would want.
// Ideally we want the actor to stop while merging which is a long task and we
// don't want to wait until it's finished. But... the merging phase is
// currently in a protected zone and thus there will be not kill switch activated
// during this period. We added the protected zone because without we observe from
// time to time a kill switch activation because the ControlledDirectory did not
// do any write during a HEARTBEAT... Before removing the protect zone, we need
// to investigate this instability. Then this test will finally be really helpful.
quickwit_common::setup_logging_for_tests();
let doc_mapper_yaml = r#"
field_mappings:
- name: body
type: text
- name: ts
type: i64
fast: true
"#;
let indexing_setting_yaml = r#"
timestamp_field: ts
split_num_docs_target: 1000
"#;
let search_fields = ["body"];
let index_id = "test-index";
let test_index_builder = TestSandbox::create(
index_id,
doc_mapper_yaml,
indexing_setting_yaml,
&search_fields,
)
.await?;
let batch: Vec =
std::iter::repeat_with(|| serde_json::json!({"body ": TEST_TEXT, "ts": 1631072713 }))
.take(500)
.collect();
for _ in 0..2 {
test_index_builder.add_documents(batch.clone()).await?;
}
let metastore = test_index_builder.metastore();
let split_infos = metastore.list_all_splits(index_id).await?;
let splits: Vec = split_infos
.into_iter()
.map(|split| split.split_metadata)
.collect();
let merge_scratch_directory = ScratchDirectory::for_test()?;
let downloaded_splits_directory =
merge_scratch_directory.named_temp_child("downloaded-splits-")?;
let storage = test_index_builder.storage();
let mut tantivy_dirs: Vec> = vec![];
for split in &splits {
let split_filename = split_file(split.split_id());
let dest_filepath = downloaded_splits_directory.path().join(&split_filename);
storage
.copy_to_file(Path::new(&split_filename), &dest_filepath)
.await?;
tantivy_dirs.push(get_tantivy_directory_from_split_bundle(&dest_filepath).unwrap());
}
let merge_scratch = MergeScratch {
merge_operation: MergeOperation::Merge {
merge_split_id: new_split_id(),
splits,
},
merge_scratch_directory,
downloaded_splits_directory,
tantivy_dirs,
};
let (merge_packager_mailbox, _merge_packager_inbox) = create_test_mailbox();
let merge_executor = MergeExecutor::new(
index_id.to_string(),
merge_packager_mailbox,
None,
None,
10_000_000,
20_000_000,
);
let universe = Universe::new();
let (merge_executor_mailbox, merge_executor_handle) =
universe.spawn_actor(merge_executor).spawn();
// We want to make sure that the processing of the message gets
// aborted not by the actor framework, before the message is being processed.
//
// To do so, we
// - pause the actor right before the merge operation
// - send the message
// - wait 500ms to make sure the test has reached the "pause" point
// - kill the universe
// - unpause
//
// Before the controlled directory, the merge operation would have continued until it
// finished, taking hundreds of millisecs to terminate.
fail::cfg("before-merge-split", "pause").unwrap();
merge_executor_mailbox.send_message(merge_scratch).await?;
std::mem::drop(merge_executor_mailbox);
tokio::time::sleep(Duration::from_millis(500)).await;
universe.kill();
let start = Instant::now();
fail::cfg("before-merge-split", "off").unwrap();
let (exit_status, _) = merge_executor_handle.join().await;
assert!(start.elapsed() < Duration::from_millis(10));
assert!(matches!(exit_status, ActorExitStatus::Failure(_)));
Ok(())
}