mod helpers; use helpers::{create_and_drop, create_and_drop_with_config, CountDown}; use persy::{Config, OpenError, OpenOptions, Persy, ValueMode}; use std::fs; use std::sync::atomic::{AtomicBool, Ordering::Relaxed}; use std::sync::Arc; use std::thread; use tempfile::{tempfile, Builder, NamedTempFile}; #[test] fn openoptions_truncate_locked() { { // first time create let _persy_create = OpenOptions::new() .create_new(true) .prepare_with(|persy: &Persy| { let mut tx = persy.begin()?; tx.create_segment("seg")?; let data = b"hello"; tx.insert("seg", &data[..])?; let prepared = tx.prepare()?; prepared.commit()?; Ok(()) }) .open("./target/truncate-locked.persy") .unwrap(); } // first acquire the file lock let persy_lock = OpenOptions::new().open("./target/truncate-locked.persy").unwrap(); // then try to truncate it let error = OpenOptions::new() .truncate(true) .open("./target/truncate-locked.persy") .map(drop) .unwrap_err(); // it must be unsuccessful match &*error { OpenError::AlreadyInUse(_) => (), otherwise => panic!("error is {:?} and not an Io error", otherwise), } // the file must not have been truncated // and contains the segment data let any_hello = persy_lock.scan("seg").unwrap().any(|(_, content)| content == b"hello"); assert!(any_hello); fs::remove_file("./target/truncate-locked.persy").unwrap(); } #[test] fn openoptions_create() { { // first time create let _persy_create = OpenOptions::new() .create_new(true) .open("./target/create.persy") .unwrap(); } // second time create let _persy_create = OpenOptions::new().create(true).open("./target/create.persy").unwrap(); fs::remove_file("./target/create.persy").unwrap(); } #[test] fn openoptions_create_new() { // first time create let _persy_create = OpenOptions::new() .create_new(true) .open("./target/create-new.persy") .unwrap(); // second time create let error = OpenOptions::new() .create_new(true) .open("./target/create-new.persy") .map(drop) .unwrap_err(); match &*error { OpenError::AlreadyExists => (), otherwise => panic!("error is {:?} and not an Io error", otherwise), } fs::remove_file("./target/create-new.persy").unwrap(); } #[test] fn openoptions_open() { { // first time create let _persy_create = OpenOptions::new().create_new(true).open("./target/open.persy").unwrap(); } // second time create let _persy_open = OpenOptions::new().open("./target/open.persy").unwrap(); fs::remove_file("./target/open.persy").unwrap(); } #[test] fn openoptions_memory() { let persy_open = OpenOptions::new().memory().unwrap(); let mut tx = persy_open.begin().unwrap(); tx.create_segment("one").unwrap(); tx.insert("one", "".as_bytes()).unwrap(); tx.prepare().unwrap().commit().unwrap(); } #[test] fn openoptions_prepare() { { // first time create let _persy_create = OpenOptions::new() .create_new(true) .open("./target/prepare.persy") .unwrap(); } { // do not prepare if already exists and not truncated let _persy_prepare = OpenOptions::new() .prepare_with(|_persy: &Persy| panic!("prepare has been called")) .open("./target/prepare.persy") .unwrap(); } let as_been_initialized = Arc::new(AtomicBool::new(false)); // second time create let _persy_open = OpenOptions::new() .truncate(true) .prepare_with({ let as_been_initialized = as_been_initialized.clone(); move |_persy: &Persy| { as_been_initialized.store(true, Relaxed); Ok(()) } }) .open("./target/prepare.persy") .unwrap(); // the truncation must have trigger the prepare function assert_eq!(as_been_initialized.load(Relaxed), true); fs::remove_file("./target/prepare.persy").unwrap(); } #[test] fn create() { Persy::create_from_file(tempfile().unwrap()).unwrap(); } #[test] fn lock_double_open() { { Persy::create("./target/file_dd.persy").unwrap(); let open = Persy::open("./target/file_dd.persy", Config::new()); assert!(!open.is_err()); let open1 = Persy::open("./target/file_dd.persy", Config::new()); assert!(open1.is_err()); } fs::remove_file("./target/file_dd.persy").unwrap(); } #[test] fn fail_double_create() { { let res = Persy::create("./target/file2.persy"); assert!(!res.is_err()); } let res = Persy::create("./target/file2.persy"); fs::remove_file("./target/file2.persy").unwrap(); assert!(res.is_err()); } #[test] fn create_open() { let file = Builder::new() .prefix("open") .suffix(".persy") .tempfile() .expect("expect temp file creation"); Persy::create_from_file(file.reopen().unwrap()).unwrap(); let open = Persy::open_from_file(file.reopen().unwrap(), Config::new()); assert!(!open.is_err()); } #[test] fn test_rollback() { create_and_drop("rollback", |persy| { let mut tx = persy.begin().unwrap(); tx.create_segment("test").unwrap(); let finalizer = tx.prepare().unwrap(); finalizer.commit().unwrap(); let mut tx = persy.begin().unwrap(); let rec_data: String = "something".into(); let bytes = rec_data.into_bytes(); let id = tx.insert("test", &bytes).unwrap(); tx.rollback().unwrap(); let read_after = persy.read("test", &id).unwrap(); if let Some(_) = read_after { assert!(false); } else { assert!(true); } }); } #[test] fn test_rollback_precommit() { create_and_drop("rollback_pre", |persy| { let mut tx = persy.begin().unwrap(); tx.create_segment("test").unwrap(); let finalizer = tx.prepare().unwrap(); finalizer.commit().unwrap(); let mut tx = persy.begin().unwrap(); let rec_data: String = "something".into(); let bytes = rec_data.into_bytes(); let id = tx.insert("test", &bytes).unwrap(); let finalizer = tx.prepare().unwrap(); finalizer.rollback().unwrap(); let read_after = persy.read("test", &id).unwrap(); if let Some(_) = read_after { assert!(false); } else { assert!(true); } }); } #[test] fn test_rollback_update() { create_and_drop("rollback_update", |persy| { let mut tx = persy.begin().unwrap(); tx.create_segment("test").unwrap(); let rec_data: String = "something".into(); let bytes = rec_data.into_bytes(); let id = tx.insert("test", &bytes).unwrap(); let read_opt = tx.read("test", &id).unwrap(); if let Some(read) = read_opt { assert_eq!(bytes, read); } else { assert!(false); } let finalizer = tx.prepare().unwrap(); finalizer.commit().unwrap(); let mut tx1 = persy.begin().unwrap(); let rec_data_1: String = "something2".into(); let bytes_1 = rec_data_1.into_bytes(); tx1.update("test", &id, &bytes_1).unwrap(); let read_after = tx1.read("test", &id).unwrap(); if let Some(val) = read_after { assert_eq!(val, bytes_1); } else { assert!(false); } tx1.rollback().unwrap(); let read_after = persy.read("test", &id).unwrap(); if let Some(val) = read_after { assert_eq!(val, bytes); } else { assert!(false); } }); } #[test] pub fn concurrent_create() { create_and_drop("rollback_create", |persy| { let mut tx = persy.begin().expect("error on transaction begin"); tx.create_segment("def").expect("error on segment creation"); let fin = tx.prepare().expect("error on commit prepare"); fin.commit().expect("error on commit"); let count = Arc::new(CountDown::new(2)); for _ in &[1, 2] { let count = count.clone(); let persy = persy.clone(); thread::spawn(move || { let mut tx = persy.begin().expect("error on transaction begin"); let val = String::from("aaa").into_bytes(); tx.insert("def", &val).expect("error on insert value"); let fin = tx.prepare().expect("error on commit prepare"); fin.commit().expect("error on commit"); count.count_down().expect("lock not panic"); }); } count.wait().expect("threads not finisced"); let val = String::from("aaa").into_bytes(); let mut cc = 0; for (_, content) in persy.scan("def").expect("error on scan") { assert_eq!(content, val); cc += 1; } assert_eq!(cc, 2); }); } #[test] pub fn concurrent_update_removed() { create_and_drop("concurrent_update_remove", |persy| { let mut tx = persy.begin().expect("error on transaction begin"); tx.create_segment("def").expect("error on segment creation"); let fin = tx.prepare().expect("error on commit prepare"); fin.commit().expect("error on commit"); let mut tx = persy.begin().expect("error on transaction begin"); let val = String::from("aaa").into_bytes(); let id = tx.insert("def", &val).expect("error on insert value"); let fin = tx.prepare().expect("error on commit prepare"); fin.commit().expect("error on commit"); let mut tx = persy.begin().expect("error on transaction begin"); let val = String::from("cccc").into_bytes(); tx.update("def", &id, &val).expect("error on update value"); let count = Arc::new(CountDown::new(1)); { let count = count.clone(); let persy = persy.clone(); let id = id.clone(); thread::spawn(move || { let mut tx = persy.begin().expect("error on transaction begin"); tx.delete("def", &id).expect("error on delete value"); let fin = tx.prepare().expect("error on commit prepare"); fin.commit().expect("error on commit"); count.count_down().expect("lock not panic"); }); } count.wait().expect("threads not finisced"); let fin = tx.prepare(); assert!(fin.is_err()); }); } #[test] #[allow(unused_must_use)] pub fn test_rollback_prepared_tx() { Persy::create("./target/test_recover_rollback_prepared.persy").unwrap(); let id; let val; { let persy = Persy::open("./target/test_recover_rollback_prepared.persy", Config::new()).unwrap(); let mut tx = persy.begin().expect("error on transaction begin"); tx.create_segment("def").expect("error on segment creation"); let fin = tx.prepare().expect("error on commit prepare"); fin.commit().expect("error on commit"); let mut tx = persy.begin().expect("error on transaction begin"); val = String::from("aaa").into_bytes(); id = tx.insert("def", &val).expect("error on insert value"); tx.prepare().expect("error on commit prepare"); } { let persy = Persy::open_with_recover("./target/test_recover_rollback_prepared.persy", Config::new(), |_| { false }) .unwrap(); assert_eq!(persy.read("def", &id).expect("error reading record"), None); } fs::remove_file("./target/test_recover_rollback_prepared.persy").unwrap(); } #[test] #[allow(unused_must_use)] pub fn test_autorollback_lost_finalize() { Persy::create("./target/test_auto_rollback.persy").unwrap(); let id; { let persy = Persy::open("./target/test_auto_rollback.persy", Config::new()).unwrap(); let mut tx = persy.begin().expect("error on transaction begin"); tx.create_segment("def").expect("error on segment creation"); let fin = tx.prepare().expect("error on commit prepare"); fin.commit().expect("error on commit"); let mut tx = persy.begin().expect("error on transaction begin"); let val = String::from("aaa").into_bytes(); id = tx.insert("def", &val).expect("error on insert value"); tx.prepare().expect("error on commit prepare"); } { let persy = Persy::open("./target/test_auto_rollback.persy", Config::new()).unwrap(); assert_eq!(persy.read("def", &id).expect("error reading record"), None); } fs::remove_file("./target/test_auto_rollback.persy").unwrap(); } #[test] pub fn test_recover_stale_tx() { Persy::create("./target/test_recover_stale.persy").unwrap(); let id; { let persy = Persy::open("./target/test_recover_stale.persy", Config::new()).unwrap(); let mut tx = persy.begin().expect("error on transaction begin"); tx.create_segment("def").expect("error on segment creation"); let fin = tx.prepare().expect("error on commit prepare"); fin.commit().expect("error on commit"); let mut tx = persy.begin().expect("error on transaction begin"); let val = String::from("aaa").into_bytes(); id = tx.insert("def", &val).expect("error on insert value"); } { let persy = Persy::open("./target/test_recover_stale.persy", Config::new()).unwrap(); assert_eq!(persy.read("def", &id).expect("error reading record"), None); } fs::remove_file("./target/test_recover_stale.persy").unwrap(); } #[test] pub fn test_multiple_open_tx_close() { let file = Builder::new() .prefix("multiple_open_tx_close") .suffix(".persy") .tempfile() .expect("expect temp file creation"); Persy::create_from_file(file.reopen().unwrap()).unwrap(); { let persy = Persy::open_from_file(file.reopen().unwrap(), Config::new()).unwrap(); let mut tx = persy.begin().expect("error on transaction begin"); tx.create_segment("def").expect("error on segment creation"); let fin = tx.prepare().expect("error on commit prepare"); fin.commit().expect("error on commit"); } for ite in 1..10 { let persy = Persy::open_from_file(file.reopen().unwrap(), Config::new()).unwrap(); let mut tx = persy.begin().expect("error on transaction begin"); let val = String::from("aaa").into_bytes(); tx.insert("def", &val).expect("error on insert value"); let fin = tx.prepare().expect("error on commit prepare"); fin.commit().expect("error on commit"); let mut counter = 0; for _ in persy.scan("def").expect("read persistent records ") { counter += 1; } assert_eq!(ite, counter); } } #[test] pub fn test_multiple_open_close_restore_tx() { let file = Builder::new() .prefix("multiple_open_restore_close_tx") .suffix(".persy") .tempfile() .expect("expect temp file creation"); Persy::create_from_file(file.reopen().unwrap()).unwrap(); { let persy = Persy::open_from_file(file.reopen().unwrap(), Config::new()).unwrap(); let mut tx = persy.begin().expect("error on transaction begin"); tx.create_segment("def").expect("error on segment creation"); let fin = tx.prepare().expect("error on commit prepare"); fin.commit().expect("error on commit"); } for ite in 1..20 { let recover = OpenOptions::new().recover_file(file.reopen().unwrap()).unwrap(); assert!(recover.list_transactions().len() <= 10); let persy = recover.finalize().expect("open correctly"); let mut tx = persy.begin().expect("error on transaction begin"); for _ in 0..20 { let val = String::from("aaa").into_bytes(); tx.insert("def", &val).expect("error on insert value"); } let fin = tx.prepare().expect("error on commit prepare"); fin.commit().expect("error on commit"); let counter = persy.scan("def").expect("read persistent records ").count(); assert_eq!(ite * 20, counter); } } #[test] pub fn test_open_close_restore_multiple_tx() { let file = Builder::new() .prefix("open_restore_close_multiple_tx") .suffix(".persy") .tempfile() .expect("expect temp file creation"); Persy::create_from_file(file.reopen().unwrap()).unwrap(); { let persy = Persy::open_from_file(file.reopen().unwrap(), Config::new()).unwrap(); let mut tx = persy.begin().expect("error on transaction begin"); tx.create_segment("def").expect("error on segment creation"); let fin = tx.prepare().expect("error on commit prepare"); fin.commit().expect("error on commit"); } for ite in 1..20 { let recover = OpenOptions::new().recover_file(file.reopen().unwrap()).unwrap(); assert!(recover.list_transactions().len() <= 60); let persy = recover.finalize().expect("open correctly"); for _ in 0..80 { let mut tx = persy.begin().expect("error on transaction begin"); let val = String::from("aaa").into_bytes(); tx.insert("def", &val).expect("error on insert value"); let fin = tx.prepare().expect("error on commit prepare"); fin.commit().expect("error on commit"); } let counter = persy.scan("def").expect("read persistent records ").count(); assert_eq!(ite * 80, counter); } } #[cfg(feature = "background_ops")] #[test] pub fn test_background_sync() { use persy::TransactionConfig; create_and_drop("background_sync", |persy| { let mut tx = persy.begin().unwrap(); tx.create_segment("test").unwrap(); let finalizer = tx.prepare().unwrap(); finalizer.commit().unwrap(); let mut tx = persy .begin_with(TransactionConfig::new().set_background_sync(true)) .unwrap(); let rec_data: String = "something".into(); let bytes = rec_data.into_bytes(); let id = tx.insert("test", &bytes).unwrap(); let prep = tx.prepare().unwrap(); prep.commit().unwrap(); let read_after = persy.read("test", &id).unwrap(); assert!(read_after.is_some()); }); } #[test] fn test_operations_no_cache() { let mut config = Config::new(); config.change_cache_size(0); create_and_drop_with_config("operations_no_cache", config, |persy| { let rec_data: String = "something".into(); let bytes = rec_data.into_bytes(); let other_rec_data: String = "other".into(); let other_bytes = other_rec_data.into_bytes(); let mut tx = persy.begin().unwrap(); tx.create_segment("test").unwrap(); tx.create_index::("test", ValueMode::Cluster).unwrap(); let id_to_update = tx.insert("test", &bytes).unwrap(); tx.prepare().unwrap().commit().unwrap(); let mut tx = persy.begin().unwrap(); let id = tx.insert("test", &bytes).unwrap(); tx.put::("test", 10, 20).unwrap(); tx.update("test", &id_to_update, &other_bytes).unwrap(); tx.prepare().unwrap().commit().unwrap(); let read_after = persy.read("test", &id).unwrap(); assert!(read_after.is_some()); let read_after = persy.read("test", &id_to_update).unwrap(); assert_eq!(read_after, Some(other_bytes)); let mut read_after = persy.get::("test", &10).unwrap(); assert!(read_after.next().is_some()); let mut tx = persy.begin().unwrap(); tx.delete("test", &id).unwrap(); tx.delete("test", &id_to_update).unwrap(); tx.remove::("test", 10, None).unwrap(); tx.prepare().unwrap().commit().unwrap(); let read_after = persy.read("test", &id).unwrap(); assert!(read_after.is_none()); let mut read_after = persy.get::("test", &10).unwrap(); assert!(read_after.next().is_none()); let mut tx = persy.begin().unwrap(); tx.drop_segment("test").unwrap(); tx.drop_index("test").unwrap(); tx.prepare().unwrap().commit().unwrap(); }); } #[test] pub fn test_not_overgrow_reopen() { let file = Builder::new() .prefix("not_overgrow_reopen") .suffix(".persy") .tempfile() .expect("expect temp file creation"); Persy::create_from_file(file.reopen().expect("reopen")).expect(&format!("file '{:?}' do not exist", file)); do_ops(&file, |persy| { let mut tx = persy.begin().expect("transaction started"); tx.create_segment("def").expect("create segment work"); tx.create_index::("def_index", ValueMode::Cluster) .expect("create index works"); tx.prepare().expect("prepare works").commit().expect("commit works"); insert_ops(persy); }); let size = file.as_file().metadata().expect("error on metadata").len(); // Allow ~100% margin of overgrow let max = size * 2; for i in 0..10 { do_ops(&file, delete_ops); let size = file.as_file().metadata().expect("error on metadata").len(); assert!(size < max, " current size: {} max: {} iter:{}", size, max, i); do_ops(&file, insert_ops); let size = file.as_file().metadata().expect("error on metadata").len(); assert!(size < max, " current size: {} max: {} iter:{}", size, max, i); } } fn insert_ops(persy: &Persy) { for v in 0..100 { let mut tx = persy.begin().expect("error on transaction begin"); let val = String::from("aaa").into_bytes(); tx.insert("def", &val).expect("error on insert value"); tx.put::("def_index", v, v).expect("error on index"); let fin = tx.prepare().expect("error on commit prepare"); fin.commit().expect("error on commit"); } } fn delete_ops(persy: &Persy) { let cursor = persy.scan("def").expect("error on scan"); for (id, _) in cursor { let mut tx = persy.begin().expect("error on transaction begin"); tx.delete("def", &id).expect("error on insert value"); let fin = tx.prepare().expect("error on commit prepare"); fin.commit().expect("error on commit"); } for v in 0..100 { let mut tx = persy.begin().expect("error on transaction begin"); tx.remove::("def_index", v, Some(v)).expect("error on index"); let fin = tx.prepare().expect("error on commit prepare"); fin.commit().expect("error on commit"); } } pub fn do_ops(file: &NamedTempFile, test: F) where F: FnOnce(&Persy), { let persy = Persy::open_from_file(file.reopen().expect("reopen"), Config::new()).unwrap(); test(&persy); } #[test] pub fn test_not_overgrow() { let file = Builder::new() .prefix("not_overgrow") .suffix(".persy") .tempfile() .expect("expect temp file creation"); Persy::create_from_file(file.reopen().expect("reopen")).expect(&format!("file '{:?}' do not exist", file)); let persy = Persy::open_from_file(file.reopen().expect("reopen"), Config::new()).unwrap(); let mut tx = persy.begin().expect("transaction started"); tx.create_segment("def").expect("create segment work"); tx.create_index::("def_index", ValueMode::Cluster) .expect("create index works"); tx.prepare().expect("prepare works").commit().expect("commit works"); insert_ops(&persy); let size = file.as_file().metadata().expect("error on metadata").len(); // Allow 30% margin of overgrow let max = size + size / 3; for i in 0..10 { delete_ops(&persy); let size = file.as_file().metadata().expect("error on metadata").len(); assert!(size < max, " current size: {} max: {} iter:{}", size, max, i); insert_ops(&persy); let size = file.as_file().metadata().expect("error on metadata").len(); assert!(size < max, " current size: {} max: {} iter:{}", size, max, i); } } #[test] pub fn test_delete_reopen() { let file = Builder::new() .prefix("delete_reopen") .suffix(".persy") .tempfile() .expect("expect temp file creation"); Persy::create_from_file(file.reopen().expect("reopen")).expect(&format!("file '{:?}' do not exist", file)); do_ops(&file, |persy| { let mut tx = persy.begin().expect("transaction started"); tx.create_segment("def").expect("create segment work"); tx.prepare().expect("prepare works").commit().expect("commit works"); }); do_ops(&file, insert_data_ops); do_ops(&file, delete_data_ops); do_ops(&file, delete_data_ops); } fn insert_data_ops(persy: &Persy) { let mut tx = persy.begin().expect("error on transaction begin"); for _ in 0..100 { let val = String::from("aaa").into_bytes(); tx.insert("def", &val).expect("error on insert value"); } let fin = tx.prepare().expect("error on commit prepare"); fin.commit().expect("error on commit"); } fn delete_data_ops(persy: &Persy) { let cursor = persy.scan("def").expect("error on scan"); let mut tx = persy.begin().expect("error on transaction begin"); for (id, _) in cursor { tx.delete("def", &id).expect("error on insert value"); } let fin = tx.prepare().expect("error on commit prepare"); fin.commit().expect("error on commit"); }