use anyhow::{bail, Context, Result}; use indexmap::IndexSet; use std::{ env, path::{Path, PathBuf}, sync::atomic::{AtomicUsize, Ordering}, time::Duration, }; use tokio::{fs, task::JoinHandle}; use tokio_util::sync::CancellationToken; use tracing::subscriber::DefaultGuard; use url::Url; use warg_client::{ storage::{ContentStorage, PublishEntry, PublishInfo}, FileSystemClient, StorageLockResult, }; use warg_crypto::{ hash::AnyHash, signing::{KeyID, PrivateKey}, }; use warg_protocol::{operator, registry::PackageName}; use warg_server::{ datastore::DataStore, policy::{content::WasmContentPolicy, record::AuthorizedKeyPolicy}, Config, Server, }; use wit_parser::{Resolve, UnresolvedPackage}; pub fn test_namespaces() -> Option> { Some(vec![( "test".to_string(), operator::NamespaceState::Defined, )]) } pub fn test_operator_key() -> PrivateKey { let key = "ecdsa-p256:I+UlDo0HxyBBFeelhPPWmD+LnklOpqZDkrFP5VduASk="; PrivateKey::decode(key.to_string()).unwrap() } pub fn test_signing_key() -> PrivateKey { let key = "ecdsa-p256:2CV1EpLaSYEn4In4OAEDAj5O4Hzu8AFAxgHXuG310Ew="; PrivateKey::decode(key.to_string()).unwrap() } pub async fn create_client(config: &warg_client::Config) -> Result { match FileSystemClient::try_new_with_config(None, config, None).await? { StorageLockResult::Acquired(client) => Ok(client), _ => bail!("failed to acquire storage lock"), } } pub struct ServerInstance { task: Option>, shutdown: CancellationToken, _subscriber_guard: DefaultGuard, } impl Drop for ServerInstance { fn drop(&mut self) { futures::executor::block_on(async move { self.shutdown.cancel(); self.task.take().unwrap().await.unwrap(); }); } } pub async fn root() -> Result { static NEXT_ID: AtomicUsize = AtomicUsize::new(0); std::thread_local! { static TEST_ID: usize = NEXT_ID.fetch_add(1, Ordering::SeqCst); } let id = TEST_ID.with(|n| *n); let mut path = env::current_exe()?; path.pop(); // remove test exe name path.pop(); // remove `deps` path.pop(); // remove `debug` or `release` path.push("tests"); path.push( std::env::current_exe() .context("failed to get process name")? .file_name() .context("failed to get process name")? .to_str() .context("failed to get process name")?, ); path.push(format!("{id}")); fs::remove_dir_all(&path).await.ok(); let server_content_dir = path.join("server"); fs::create_dir_all(&server_content_dir).await?; let registries_dir = path.join("registries"); fs::create_dir_all(®istries_dir).await?; let content_dir = path.join("content"); fs::create_dir_all(&content_dir).await?; Ok(path) } /// Sets up logging for the current thread, active until the returned guard is dropped. fn thread_test_logging() -> DefaultGuard { let subscriber = tracing_subscriber::fmt() .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) .with_test_writer() .finish(); tracing::subscriber::set_default(subscriber) } /// Spawns a server as a background task. pub async fn spawn_server( root: &Path, content_base_url: Option, data_store: Option>, authorized_keys: Option>, ) -> Result<(ServerInstance, warg_client::Config)> { let _subscriber_guard = thread_test_logging(); let shutdown = CancellationToken::new(); let mut config = Config::new(test_operator_key(), test_namespaces(), root.join("server")) .with_addr(([127, 0, 0, 1], 0)) .with_shutdown(shutdown.clone().cancelled_owned()) .with_checkpoint_interval(Duration::from_millis(100)) .with_content_policy(WasmContentPolicy::default()); // For the tests, we assume only wasm content is allowed. if let Some(content_url) = content_base_url { config = config.with_content_base_url(content_url); } if let Some(authorized_keys) = authorized_keys { let mut policy = AuthorizedKeyPolicy::new(); for (namespace, key) in authorized_keys { policy = policy.with_namespace_key(namespace, key)?; } config = config.with_record_policy(policy); } if let Some(store) = data_store { config = config.with_boxed_data_store(store); } let server = Server::new(config).initialize().await?; let addr = server.local_addr()?; tracing::debug!("Test server running at {addr}"); let task = tokio::spawn(async move { let _subscriber_guard = thread_test_logging(); server.serve().await.unwrap(); }); let instance = ServerInstance { task: Some(task), shutdown, _subscriber_guard, }; let config = warg_client::Config { home_url: Some(format!("http://{addr}")), registries_dir: Some(root.join("registries")), content_dir: Some(root.join("content")), namespace_map_path: Some(root.join("namespaces")), keys: IndexSet::new(), keyring_auth: false, ignore_federation_hints: false, disable_auto_accept_federation_hints: false, disable_auto_package_init: true, disable_interactive: true, keyring_backend: None, }; Ok((instance, config)) } pub async fn publish( client: &FileSystemClient, name: &PackageName, version: &str, content: Vec, init: bool, signing_key: &PrivateKey, ) -> Result { let digest = client .content() .store_content( Box::pin(futures::stream::once(async move { Ok(content.into()) })), None, ) .await?; let mut entries = Vec::with_capacity(2); if init { entries.push(PublishEntry::Init); } entries.push(PublishEntry::Release { version: version.parse().unwrap(), content: digest.clone(), }); let record_id = client .publish_with_info( signing_key, PublishInfo { name: name.clone(), head: None, entries, }, ) .await?; client .wait_for_publish(name, &record_id, Duration::from_millis(100)) .await?; Ok(digest) } pub async fn publish_component( client: &FileSystemClient, name: &PackageName, version: &str, wat: &str, init: bool, signing_key: &PrivateKey, ) -> Result { publish( client, name, version, wat::parse_str(wat)?, init, signing_key, ) .await } pub async fn publish_wit( client: &FileSystemClient, name: &PackageName, version: &str, wit: &str, init: bool, signing_key: &PrivateKey, ) -> Result { let mut resolve = Resolve::new(); let pkg = resolve.push(UnresolvedPackage::parse(Path::new("foo.wit"), wit)?)?; publish( client, name, version, wit_component::encode(Some(true), &resolve, pkg)?, init, signing_key, ) .await }