use bytes::Bytes; use kip_db::kernel::io::{FileExtension, IoFactory, IoType}; use kip_db::kernel::lsm::storage::KipStorage; use kip_db::kernel::KernelResult; use kip_db::kernel::Storage; use std::io::{Read, Seek, SeekFrom, Write}; use tempfile::TempDir; use walkdir::WalkDir; #[test] fn get_stored_value() -> KernelResult<()> { #[cfg(feature = "sled")] { use kip_db::kernel::sled_storage::SledStorage; get_stored_value_with_kv_store::()?; } get_stored_value_with_kv_store::()?; Ok(()) } fn get_stored_value_with_kv_store() -> KernelResult<()> { tokio_test::block_on(async move { let key1: Vec = encode_key("key1")?; let key2: Vec = encode_key("key2")?; let value1: Vec = encode_key("value1")?; let value2: Vec = encode_key("value2")?; let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let kv_store = T::open(temp_dir.path()).await?; kv_store .set(Bytes::from(key1.clone()), Bytes::from(value1.clone())) .await?; kv_store .set(Bytes::from(key2.clone()), Bytes::from(value2.clone())) .await?; kv_store.flush().await?; kv_store.get(&key1).await?; kv_store.get(&key2).await?; kv_store.flush().await?; // Open from disk again and check persistent data. drop(kv_store); let kv_store = T::open(temp_dir.path()).await?; assert_eq!(kv_store.get(&key1).await?, Some(Bytes::from(value1))); assert_eq!(kv_store.get(&key2).await?, Some(Bytes::from(value2))); Ok(()) }) } // Should overwrite existent value. #[test] fn overwrite_value() -> KernelResult<()> { #[cfg(feature = "sled")] { use kip_db::kernel::sled_storage::SledStorage; overwrite_value_with_kv_store::()?; } overwrite_value_with_kv_store::()?; Ok(()) } fn overwrite_value_with_kv_store() -> KernelResult<()> { tokio_test::block_on(async move { let key1: Vec = encode_key("key1")?; let value1: Vec = encode_key("value1")?; let value2: Vec = encode_key("value2")?; let value3: Vec = encode_key("value3")?; let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let kv_store = T::open(temp_dir.path()).await?; kv_store .set(Bytes::from(key1.clone()), Bytes::from(value1.clone())) .await?; kv_store.flush().await?; assert_eq!( kv_store.get(&key1).await?, Some(Bytes::from(value1.clone())) ); kv_store .set(Bytes::from(key1.clone()), Bytes::from(value2.clone())) .await?; kv_store.flush().await?; assert_eq!( kv_store.get(&key1).await?, Some(Bytes::from(value2.clone())) ); drop(kv_store); let kv_store = T::open(temp_dir.path()).await?; assert_eq!( kv_store.get(&key1).await?, Some(Bytes::from(value2.clone())) ); kv_store .set(Bytes::from(key1.clone()), Bytes::from(value3.clone())) .await?; kv_store.flush().await?; assert_eq!( kv_store.get(&key1).await?, Some(Bytes::from(value3.clone())) ); Ok(()) }) } // Should get `None` when getting a non-existent key. #[test] fn get_non_existent_value() -> KernelResult<()> { #[cfg(feature = "sled")] { use kip_db::kernel::sled_storage::SledStorage; get_non_existent_value_with_kv_store::()?; } get_non_existent_value_with_kv_store::()?; Ok(()) } fn get_non_existent_value_with_kv_store() -> KernelResult<()> { tokio_test::block_on(async move { let key1: Vec = encode_key("key1")?; let key2: Vec = encode_key("key2")?; let value1: Vec = encode_key("value1")?; let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let kv_store = T::open(temp_dir.path()).await?; kv_store .set(Bytes::from(key1.clone()), Bytes::from(value1)) .await?; assert_eq!(kv_store.get(&key2).await?, None); // Open from disk again and check persistent data. kv_store.flush().await?; drop(kv_store); let kv_store = T::open(temp_dir.path()).await?; assert_eq!(kv_store.get(&key2).await?, None); Ok(()) }) } #[test] fn remove_non_existent_key() -> KernelResult<()> { #[cfg(feature = "sled")] { use kip_db::kernel::sled_storage::SledStorage; remove_non_existent_key_with_kv_store::()?; } remove_non_existent_key_with_kv_store::()?; Ok(()) } fn remove_non_existent_key_with_kv_store() -> KernelResult<()> { tokio_test::block_on(async move { let key1: Vec = encode_key("key1")?; let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let kv_store = T::open(temp_dir.path()).await?; assert!(kv_store.remove(&key1).await.is_err()); Ok(()) }) } #[test] fn remove_key() -> KernelResult<()> { #[cfg(feature = "sled")] { use kip_db::kernel::sled_storage::SledStorage; remove_key_with_kv_store::()?; } remove_key_with_kv_store::()?; Ok(()) } fn remove_key_with_kv_store() -> KernelResult<()> { tokio_test::block_on(async move { let key1: Vec = encode_key("key1")?; let value1: Vec = encode_key("value1")?; let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let kv_store = T::open(temp_dir.path()).await?; kv_store .set(Bytes::from(key1.clone()), Bytes::from(value1)) .await?; assert!(kv_store.remove(&key1).await.is_ok()); assert_eq!(kv_store.get(&key1).await?, None); Ok(()) }) } // Insert data until total size of the directory decreases. // Test data correctness after compaction. #[test] fn compaction() -> KernelResult<()> { #[cfg(feature = "sled")] { use kip_db::kernel::sled_storage::SledStorage; compaction_with_kv_store::()?; } compaction_with_kv_store::()?; Ok(()) } // 如果此处出现异常,可以尝试降低压缩阈值或者提高检测时间 fn compaction_with_kv_store() -> KernelResult<()> { tokio_test::block_on(async move { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let kv_store = T::open(temp_dir.path()).await?; let dir_size = || { let entries = WalkDir::new(temp_dir.path()).into_iter(); let len: walkdir::Result = entries .map(|res| { res.and_then(|entry| entry.metadata()) .map(|metadata| metadata.len()) }) .sum(); len.expect("fail to get directory size") }; let mut current_size = dir_size(); for iter in 0..1000 { for key_id in 0..1000 { let key = format!("key{}", key_id); let value = format!("{}", iter); kv_store .set( Bytes::from(encode_key(key.as_str())?), Bytes::from(encode_key(value.as_str())?), ) .await? } kv_store.flush().await?; let new_size = dir_size(); if new_size > current_size { current_size = new_size; continue; } // Compaction triggered. drop(kv_store); // reopen and check content. let kv_store = T::open(temp_dir.path()).await?; for key_id in 0..1000 { let key = format!("key{}", key_id); assert_eq!( kv_store.get(&encode_key(key.as_str())?).await?, Some(Bytes::from(encode_key(format!("{}", iter).as_str())?)) ); } return Ok(()); } panic!("No compaction detected"); }) } #[test] fn test_io() -> KernelResult<()> { let temp_dir = TempDir::new().expect("unable to create temporary working directory"); let factory = IoFactory::new(temp_dir.path(), FileExtension::Log).unwrap(); io_type_test(&factory, IoType::Buf)?; io_type_test(&factory, IoType::Direct)?; Ok(()) } fn io_type_test(factory: &IoFactory, io_type: IoType) -> KernelResult<()> { let mut writer = factory.writer(1, io_type)?; let data_write1 = vec![b'1', b'2', b'3']; let data_write2 = vec![b'4', b'5', b'6']; let pos_1 = writer.current_pos()?; let len_1 = writer.write(&data_write1)?; let pos_2 = writer.current_pos()?; let len_2 = writer.write(&data_write2)?; writer.flush()?; let mut reader = factory.reader(1, io_type)?; let mut buf = [0; 6]; reader.read_exact(&mut buf)?; assert_eq!([b'1', b'2', b'3', b'4', b'5', b'6'], buf); assert_eq!(pos_1, 0); assert_eq!(pos_2, 3); assert_eq!(len_1, 3); assert_eq!(len_2, 3); assert_eq!(reader.seek(SeekFrom::Start(2))?, 2); assert_eq!(reader.seek(SeekFrom::End(-1))?, 5); assert_eq!(reader.seek(SeekFrom::Current(-1))?, 4); assert_eq!(reader.file_size()?, 6); assert!(factory.exists(1)?); factory.clean(1)?; assert!(!factory.exists(1)?); Ok(()) } fn encode_key(key: &str) -> KernelResult> { Ok(bincode::serialize(key)?) }