mod batch; mod concurrency; mod iterator; use std::{io::ErrorKind, time::Duration}; pub use super::serve_test_database; use avalanche_types::subnet::rpc::database::{ corruptabledb::Database as CorruptableDb, manager::{versioned_database::VersionedDatabase, DatabaseManager}, memdb::Database as MemDb, rpcdb::{client::DatabaseClient, server::Server as RpcDb}, }; use semver::Version; use tokio::net::TcpListener; use tonic::transport::Channel; #[tokio::test] async fn rpcdb_mutation_test() { let _ = env_logger::builder() .filter_level(log::LevelFilter::Info) .is_test(true) .try_init(); let bar_value = "bar".as_bytes().to_vec(); let baz_value = "baz".as_bytes().to_vec(); let db = MemDb::new_boxed(); let server = RpcDb::new_boxed(db); let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); tokio::spawn(async move { serve_test_database(server, listener).await.unwrap(); }); tokio::time::sleep(Duration::from_millis(100)).await; let client_conn = Channel::builder(format!("http://{}", addr).parse().unwrap()) .connect() .await .unwrap(); let mut client = DatabaseClient::new_boxed(client_conn); log::info!("put foo:bar"); let resp = client.put("foo".as_bytes(), "bar".as_bytes()).await; assert!(resp.is_ok()); log::info!("get foo:bar"); let resp = client.get("foo".as_bytes()).await; let value = resp.unwrap(); assert_eq!(value, bar_value.clone()); // verify valid response from cloning client let mut client = client.clone(); log::info!("put foo:baz"); let resp = client.put("foo".as_bytes(), "baz".as_bytes()).await; assert!(resp.is_ok()); log::info!("get foo:baz"); let resp = client.get("foo".as_bytes()).await; let value = resp.unwrap(); assert_eq!(value, baz_value.clone()); log::info!("has foo true"); let resp = client.has("foo".as_bytes()).await; assert!(resp.is_ok()); assert!(resp.unwrap()); log::info!("get fool error not found"); let resp = client.get("fool".as_bytes()).await; assert!(resp.is_err()); assert_eq!(resp.unwrap_err().kind(), ErrorKind::NotFound); log::info!("has fool false"); let resp = client.has("fool".as_bytes()).await; assert!(resp.is_ok()); assert!(!resp.unwrap()); log::info!("close client"); let resp = client.close().await; assert!(resp.is_ok()); log::info!("get foo error closed"); let resp = client.get("foo".as_bytes()).await; assert!(resp.is_err()); assert!(resp.unwrap_err().to_string().contains("database closed")); } #[tokio::test] async fn corruptibledb_mutation_test() { let _ = env_logger::builder() .filter_level(log::LevelFilter::Info) .is_test(true) .try_init(); let bar_value = "bar".as_bytes().to_vec(); let memdb = MemDb::new_boxed(); let mut corruptible = CorruptableDb::new_boxed(memdb); log::info!("put foo:bar"); let resp = corruptible.put("foo".as_bytes(), "bar".as_bytes()).await; assert!(resp.is_ok()); log::info!("get foo:bar"); let resp = corruptible.get("foo".as_bytes()).await; let value = resp.unwrap(); assert_eq!(value, bar_value.clone()); log::info!("put foo:baz"); let resp = corruptible.put("foo".as_bytes(), "baz".as_bytes()).await; assert!(resp.is_ok()); log::info!("has foo true"); let resp = corruptible.has("foo".as_bytes()).await; assert!(resp.is_ok()); assert!(resp.unwrap()); log::info!("get fool error not found"); let resp = corruptible.get("fool".as_bytes()).await; assert!(resp.is_err()); assert_eq!(resp.unwrap_err().kind(), ErrorKind::NotFound); log::info!("has fool false"); let resp = corruptible.has("fool".as_bytes()).await; assert!(resp.is_ok()); assert!(!resp.unwrap()); log::info!("close client"); let resp = corruptible.close().await; assert!(resp.is_ok()); log::info!("get foo error closed"); let resp = corruptible.put("foo".as_bytes(), "baz".as_bytes()).await; assert!(resp.is_err()); assert!(resp.unwrap_err().to_string().contains("database closed")); } #[tokio::test] async fn test_rpcdb_corruptible() { let _ = env_logger::builder() .filter_level(log::LevelFilter::Info) .is_test(true) .try_init(); let bar_value = "bar".as_bytes().to_vec(); let baz_value = "baz".as_bytes().to_vec(); let memdb = MemDb::new_boxed(); let rpc_server = RpcDb::new_boxed(memdb); let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); tokio::spawn(async move { serve_test_database(rpc_server, listener).await.unwrap(); }); tokio::time::sleep(Duration::from_millis(100)).await; let client_conn = Channel::builder(format!("http://{}", addr).parse().unwrap()) .connect() .await .unwrap(); let db = DatabaseClient::new_boxed(client_conn); let mut client = CorruptableDb::new_boxed(db); log::info!("put foo:bar"); let resp = client.put("foo".as_bytes(), "bar".as_bytes()).await; assert!(resp.is_ok()); log::info!("get foo:bar"); let resp = client.get("foo".as_bytes()).await; let value = resp.unwrap(); assert_eq!(value, bar_value.clone()); // verify valid response from cloning client let mut client = client.clone(); log::info!("put foo:baz"); let resp = client.put("foo".as_bytes(), "baz".as_bytes()).await; assert!(resp.is_ok()); log::info!("get foo:baz"); let resp = client.get("foo".as_bytes()).await; let value = resp.unwrap(); assert_eq!(value, baz_value.clone()); log::info!("has foo true"); let resp = client.has("foo".as_bytes()).await; assert!(resp.is_ok()); assert!(resp.unwrap()); log::info!("get fool error not found"); let resp = client.get("fool".as_bytes()).await; assert!(resp.is_err()); assert_eq!(resp.unwrap_err().kind(), ErrorKind::NotFound); log::info!("has fool false"); let resp = client.has("fool".as_bytes()).await; assert!(resp.is_ok()); assert!(!resp.unwrap()); log::info!("close client"); let resp = client.close().await; assert!(resp.is_ok()); log::info!("get foo error closed"); let resp = client.get("foo".as_bytes()).await; assert!(resp.is_err()); assert!(resp.unwrap_err().to_string().contains("database closed")); } #[tokio::test] async fn test_db_manager() { use avalanche_types::subnet::rpc::database::manager::Manager; let _ = env_logger::builder() .filter_level(log::LevelFilter::Info) .is_test(true) .try_init(); let bar_value = "bar".as_bytes().to_vec(); let baz_value = "baz".as_bytes().to_vec(); let memdb = MemDb::new_boxed(); let rpc_server = RpcDb::new_boxed(memdb); let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); tokio::spawn(async move { serve_test_database(rpc_server, listener).await.unwrap(); }); tokio::time::sleep(Duration::from_millis(100)).await; let client_conn = Channel::builder(format!("http://{}", addr).parse().unwrap()) .connect() .await .unwrap(); let vdb = VersionedDatabase::new( DatabaseClient::new_boxed(client_conn), Version::new(0, 0, 1), ); let databases: Vec = vec![vdb]; let manager = DatabaseManager::from_databases(databases); let current = manager.current().await.unwrap(); let mut client = current.db; log::info!("put foo:bar"); let resp = client.put("foo".as_bytes(), "bar".as_bytes()).await; assert!(resp.is_ok()); log::info!("get foo:bar"); let resp = client.get("foo".as_bytes()).await; let value = resp.unwrap(); assert_eq!(value, bar_value.clone()); // verify valid response from cloning client let mut client = client.clone(); log::info!("put foo:baz"); let resp = client.put("foo".as_bytes(), "baz".as_bytes()).await; assert!(resp.is_ok()); log::info!("get foo:baz"); let resp = client.get("foo".as_bytes()).await; let value = resp.unwrap(); assert_eq!(value, baz_value.clone()); log::info!("close all db with manager"); let _ = manager.close().await; log::info!("get foo error closed"); let resp = client.get("foo".as_bytes()).await; assert!(resp.is_err()); assert!(resp.unwrap_err().to_string().contains("database closed")); }