// // Copyright (c) 2024 ZettaScale Technology // // This program and the accompanying materials are made available under the // terms of the Eclipse Public License 2.0 which is available at // http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 // which is available at https://www.apache.org/licenses/LICENSE-2.0. // // SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 // // Contributors: // ZettaScale Zenoh Team, // #![cfg(feature = "internal_config")] mod test { use std::{ fs, path::PathBuf, sync::{atomic::AtomicBool, Arc, Mutex}, time::Duration, }; use once_cell::sync::Lazy; use tokio::runtime::Handle; use zenoh::{config::WhatAmI, Config, Session}; use zenoh_config::{EndPoint, ModeDependentValue}; use zenoh_core::{zlock, ztimeout}; const TIMEOUT: Duration = Duration::from_secs(60); const SLEEP: Duration = Duration::from_secs(1); const KEY_EXPR: &str = "test/demo"; const VALUE: &str = "zenoh"; static TESTFILES_PATH: Lazy = Lazy::new(std::env::temp_dir); static TESTFILES_CREATED: Lazy = Lazy::new(|| AtomicBool::new(false)); #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn test_authentication_usrpwd() { zenoh_util::init_log_from_env_or("error"); create_new_files(TESTFILES_PATH.to_path_buf()) .await .unwrap(); test_pub_sub_deny_then_allow_usrpswd(29447).await; test_pub_sub_allow_then_deny_usrpswd(29447).await; test_get_qbl_allow_then_deny_usrpswd(29447).await; test_get_qbl_deny_then_allow_usrpswd(29447).await; } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn test_authentication_tls() { zenoh_util::init_log_from_env_or("error"); create_new_files(TESTFILES_PATH.to_path_buf()) .await .unwrap(); test_pub_sub_deny_then_allow_tls(29448, false).await; test_pub_sub_allow_then_deny_tls(29449).await; test_get_qbl_allow_then_deny_tls(29450).await; test_get_qbl_deny_then_allow_tls(29451).await; } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn test_authentication_quic() { 
zenoh_util::init_log_from_env_or("error");
        create_new_files(TESTFILES_PATH.to_path_buf())
            .await
            .unwrap();
        test_pub_sub_deny_then_allow_quic(29452).await;
        test_pub_sub_allow_then_deny_quic(29453).await;
        test_get_qbl_deny_then_allow_quic(29454).await;
        test_get_qbl_allow_then_deny_quic(29455).await;
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_authentication_lowlatency() {
        // Test link AuthIds accessibility for lowlatency transport
        zenoh_util::init_log_from_env_or("error");
        create_new_files(TESTFILES_PATH.to_path_buf())
            .await
            .unwrap();
        test_pub_sub_deny_then_allow_tls(29456, true).await;
    }

    /// Runs the scenarios that combine several subject attributes.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_authentication_subject_combinations() {
        zenoh_util::init_log_from_env_or("error");
        create_new_files(TESTFILES_PATH.to_path_buf())
            .await
            .unwrap();
        test_deny_allow_combination(29457).await;
        test_allow_deny_combination(29458).await;
    }

    /// Writes the CA, client/server certificates and keys, and the user/password
    /// dictionary into `certs_dir`, once per test process.
    ///
    /// Tests in this file run concurrently and all call this first. Creation is
    /// serialized by a lock, and `TESTFILES_CREATED` is only set after every file
    /// has been written, so a caller that skips creation can rely on the files
    /// being complete. If a write fails the flag stays unset and the next caller
    /// retries.
    ///
    /// # Errors
    /// Returns the first I/O error hit while writing a file.
    #[allow(clippy::all)]
    async fn create_new_files(certs_dir: std::path::PathBuf) -> std::io::Result<()> {
        use std::sync::atomic::Ordering;

        static CREATION_LOCK: Mutex<()> = Mutex::new(());

        // No `.await` below, so the std guard is never held across a suspension point.
        // A poisoned lock only means another test panicked mid-creation; retry then.
        let _guard = CREATION_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        if TESTFILES_CREATED.load(Ordering::SeqCst) {
            // only create files once per tests
            println!("Skipping testfile creation: files already created by another test instance");
            return Ok(());
        }

        let ca_pem: &[u8] = b"-----BEGIN CERTIFICATE-----
MIIDiTCCAnGgAwIBAgIUO1x6LAlICgKs5+pYUTo4CughfKEwDQYJKoZIhvcNAQEL
BQAwVDELMAkGA1UEBhMCRlIxCzAJBgNVBAgMAklGMQswCQYDVQQHDAJQUjERMA8G
A1UECgwIenMsIEluYy4xGDAWBgNVBAMMD3pzX3Rlc3Rfcm9vdF9jYTAeFw0yNDAz
MTExNDM0MjNaFw0yNTAzMTExNDM0MjNaMFQxCzAJBgNVBAYTAkZSMQswCQYDVQQI
DAJJRjELMAkGA1UEBwwCUFIxETAPBgNVBAoMCHpzLCBJbmMuMRgwFgYDVQQDDA96
c190ZXN0X3Jvb3RfY2EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC3
pFWM+IJNsRCYHt1v/TliecppwVZV+ZHfFw9JKN9ev4K/fWHUiAOwp91MOLxbaYKd
C6dxW28YVGltoGz3kUZJZcJRQVso1jXv24Op4muOsiYXukLc4TU2F6dG1XqkLt5t
svsYAQFf1uK3//QZFVRBosJEn+jjiJ4XCvt49mnPRolp1pNKX0z31mZO6bSly6c9
OVlJMjWpDCYSOuf6qZZ36fa9eSut2bRJIPY0QCsgnqYBTnIEhksS+3jy6Qt+QpLz
95pFdLbW/MW4XKpaDltyYkO6QrBekF6uWRlvyAHU+NqvXZ4F/3Z5l26qLuBcsLPJ
kyawkO+yNIDxORmQgMczAgMBAAGjUzBRMB0GA1UdDgQWBBThgotd9ws2ryEEaKp2
+RMOWV8D7jAfBgNVHSMEGDAWgBThgotd9ws2ryEEaKp2+RMOWV8D7jAPBgNVHRMB
Af8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQA9QoPv78hGmvmqF4GZeqrOBKQB
N/H5wL7f8H6BXU/wpNo2nnWOJn3u37lT+zivAdGEv+x+GeKekcugKBCSluhBLpVb
VNXe4WwMm5FBuO2NRBN2nblTMm1kEO00nVk1/yNo4hI8mj7d4YLU62d7324osNpF
wHqu6B0/c99JeKRvODGswyff1i8rJ1jpcgk/JmHg7UQBHEIkn0cRR0f9W3Mxv6b5
ZeowRe81neWNkC6IMiMmzA0iHGkhoUMA15qG1ZKOr1XR364LH5BfNNpzAWYwkvJs
0JFrrdw+rm+cRJWs55yiyCCs7pyg1IJkY/o8bifdCOUgIyonzffwREk3+kZR
-----END CERTIFICATE-----";

        let client_side_pem: &[u8] = b"-----BEGIN CERTIFICATE-----
MIIDjDCCAnSgAwIBAgIUOi9jKILrOzfRNGIkQ48S90NehpkwDQYJKoZIhvcNAQEL
BQAwVDELMAkGA1UEBhMCRlIxCzAJBgNVBAgMAklGMQswCQYDVQQHDAJQUjERMA8G
A1UECgwIenMsIEluYy4xGDAWBgNVBAMMD3pzX3Rlc3Rfcm9vdF9jYTAeFw0yNDAz
MTkxMTMxNDhaFw0yNTAzMTkxMTMxNDhaMFAxCzAJBgNVBAYTAkZSMQswCQYDVQQI
DAJJRjELMAkGA1UEBwwCUFIxETAPBgNVBAoMCHpzLCBJbmMuMRQwEgYDVQQDDAtj
bGllbnRfc2lkZTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMzU2p1a
ly/1bi2TDZ8+Qlvk9/3KyHqrg2BGZUxB3Pj/lufDuYNwOHkss99wp8gzMsT28mD4
y6X7nCgEN8WeHl+/xfLuGsWIBa1OOr6dz0qewoWFsor01cQ8+nwAKlgnz6IvHfkQ
OJZD/QYSdyn6c1AcIyS60vo4qMjyI4OVb1Dl4WpC4vCmWvDT0WjBZ5GckCnuQ8wS
wZ5MtPuMQf8kYX95ll7eBtDfEXF9Oja0l1/5SmlHuKyqDy4sIKovxtFHTqgb8PUc
yT33pUHOsBXruNBxl1MKq1outdMqcQknT6FAC+aVZ7bTlwhnH8p5Apn57g+dJYTI
9dCr1e2oK5NohhkCAwEAAaNaMFgwFgYDVR0RBA8wDYILY2xpZW50X3NpZGUwHQYD
VR0OBBYEFHDUYYfQacLj1tp49OG9NbPuL0N/MB8GA1UdIwQYMBaAFOGCi133Czav
IQRoqnb5Ew5ZXwPuMA0GCSqGSIb3DQEBCwUAA4IBAQB+nFAe6QyD2AaFdgrFOyEE
MeYb97sy9p5ylhMYyU62AYsIzzpTY74wBG78qYPIw3lAYzNcN0L6T6kBQ4lu6gFm
XB0SqCZ2AkwvV8tTlbLkZeoO6rONeke6c8cJsxYN7NiknDvTMrkTTgiyvbCWfEVX
Htnc4j/KzSBX3UjVcbPM3L/6KwMRw050/6RCiOIPFjTOCfTGoDx5fIyBk3ch/Plw
TkH2juHxX0/aCxr8hRE1v9+pXXlGnGoKbsDMLN9Aziu6xzdT/kD7BvyoM8rh7CE5
ae7/R4sd13cZ2WGDPimqO0z1kItMOIdiYvk4DgOg+J8hZSkKT56erafdDa2LPBE6
-----END CERTIFICATE-----";

        let client_side_key: &[u8] = b"-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDM1NqdWpcv9W4t
kw2fPkJb5Pf9ysh6q4NgRmVMQdz4/5bnw7mDcDh5LLPfcKfIMzLE9vJg+Mul+5wo
BDfFnh5fv8Xy7hrFiAWtTjq+nc9KnsKFhbKK9NXEPPp8ACpYJ8+iLx35EDiWQ/0G
Encp+nNQHCMkutL6OKjI8iODlW9Q5eFqQuLwplrw09FowWeRnJAp7kPMEsGeTLT7
jEH/JGF/eZZe3gbQ3xFxfTo2tJdf+UppR7isqg8uLCCqL8bRR06oG/D1HMk996VB
zrAV67jQcZdTCqtaLrXTKnEJJ0+hQAvmlWe205cIZx/KeQKZ+e4PnSWEyPXQq9Xt
qCuTaIYZAgMBAAECggEAAlqVVw7UEzLjtN4eX1S6tD3jvCzFBETdjgENF7TfjlR4
lln9UyV6Xqkc+Y28vdwZwqHwW90sEPCc5ShUQD7+jBzi8FVcZSX4o7rVCbz8RXgg
1eI5EKf632YQflWNpwTxGcTnGCY/sjleil/yst6sDdD+9eR4OXQme2Wt8wyH8pLm
bf1OensGrFu3kJaPMOfP6jXnqEqkUPqmaCNW7+Ans8E+4J9oksRVPQJEuxwSjdJu
BlG50KKpl0XwZ/u/hkkj8/BlRDa62YMGJkFOwaaGUu2/0UU139XaJiMSPoL6t/BU
1H15dtW9liEtnHIssXMRzc9cg+xPgCs79ABXSZaFUQKBgQD4mH/DcEFwkZQcr08i
GUk0RE5arAqHui4eiujcPZVV6j/L7PHHmabKRPBlsndFP7KUCtvzNRmHq7JWDkpF
S36OE4e94CBYb0CIrO8OO5zl1vGAn5qa9ckefSFz9AMWW+hSuo185hFjt67BMaI0
8CxfYDH+QY5D4JE5RhSwsOmiUQKBgQDS7qjq+MQKPHHTztyHK8IbAfEGlrBdCAjf
K1bDX2BdfbRJMZ+y8LgK5HxDPlNx2/VauBLsIyU1Zirepd8hOsbCVoK1fOq+T7bY
KdB1oqLK1Rq1sMBc26F24LBaZ3Pw5XgYEcvaOW0JFQ9Oc4VjcIXKjTNhobNOegfK
QDnw8fEtSQKBgQDrCuTh2GVHFZ3AcVCUoOvB60NaH4flRHcOkbARbHihvtWK7gC8
A97bJ8tTnCWA5/TkXFAR54a36/K1wtUeJ38Evhp9wEdU1ftiPn/YKSzzcwLr5fu7
v9/kX9MdWv0ASu2iKphUGwMeETG9oDwJaXvKwZ0DFOB59P3Z9RTi6qI7wQKBgQCp
uBZ6WgeDJPeBsaSHrpHUIU/KOV1WvaxFxR1evlNPZmG1sxQIat/rA8VoZbHGn3Ff
uVSgY/cAbGB6HYTXu+9JV0p8tTI8Ru+cJqjwvhe2lJmVL87X6HCWsluzoiIL5tcm
pssbn7E36ZYTTag6RsOgItUA7ZbUwiOafOsiD8o64QKBgE6nOkAfy5mbp7X+q9uD
J5y6IXpY/Oia/RwveLWFbI/aum4Nnhb6L9Y0XlrYjm4cJOchQyDR7FF6f4EuAiYb
wdxBbkxXpwXnfKCtNvMF/wZMvPVaS5HTQga8hXMrtlW6jtTJ4HmkTTB/MILAXVkJ
EHi+N70PcrYg6li415TGfgDz
-----END PRIVATE KEY-----";

        let server_side_pem: &[u8] = b"-----BEGIN CERTIFICATE-----
MIIDjDCCAnSgAwIBAgIUOi9jKILrOzfRNGIkQ48S90NehpgwDQYJKoZIhvcNAQEL
BQAwVDELMAkGA1UEBhMCRlIxCzAJBgNVBAgMAklGMQswCQYDVQQHDAJQUjERMA8G
A1UECgwIenMsIEluYy4xGDAWBgNVBAMMD3pzX3Rlc3Rfcm9vdF9jYTAeFw0yNDAz
MTkxMTMxMDRaFw0yNTAzMTkxMTMxMDRaMFAxCzAJBgNVBAYTAkZSMQswCQYDVQQI
DAJJRjELMAkGA1UEBwwCUFIxETAPBgNVBAoMCHpzLCBJbmMuMRQwEgYDVQQDDAtz
ZXJ2ZXJfc2lkZTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKw4eKzt
T1inzuEIPBaPksWyjoD9n6uJx9jAQ2wRB6rXiAsXVLRSuczdGDpb1MwAqoIi6ozw
tzDRwkr58vUNaTCswxadlAmB44JEVYKZoublHjlVj5ygr0R4R5F2T9tIV+jpqZuK
HR4dHe8PiDCiWVzWvYwOLVKXQKSeaE2Z143ukVIJ85qmNykJ066AVhgWnIYSCR9c
s7WPBdTWAW3L4yNlast9hfvxdQNDs5AtUnJKfAX+7DylPAm8V7YjU1k9AtTNPbpy
kb9X97ErsB8891MmZaGZp0J6tnuucDkk0dlowMVvi2aUCsYoKF5DgGxtyVAeLhTP
70GenaLe2uwG8fMCAwEAAaNaMFgwFgYDVR0RBA8wDYILc2VydmVyX3NpZGUwHQYD
VR0OBBYEFBKms1sOw8nM/O5SN1EZIH+LsWaPMB8GA1UdIwQYMBaAFOGCi133Czav
IQRoqnb5Ew5ZXwPuMA0GCSqGSIb3DQEBCwUAA4IBAQA6H/sfm8YUn86+GwxNR9i9
MCL7WHVRx3gS9ENK87+HtZNL2TVvhPJtupG3Pjgqi33FOHrM4rMUcWSZeCEycVgy
5cjimQLwfDljIBRQE6sem3gKf0obdWl5AlPDLTL/iKj5Su7NycrjZFYqkjZjn+58
fe8lzHNeP/3RQTgjJ98lQI0bdzGDG1+QoxTgPEc77vgN0P4MHJYx2auz/7jYBqNJ
ko8nugIQsd4kOhmOIBUQ8aXkXFktSQIerEGB8uw5iF2cCdH/sTCvhzhxLb4IWo/O
0cAZ+Vs4FW3KUn/Y44yrVAWl1H6xdFsNXBqbzVEMzlt/RV3rH70RDCc20XhP+w+g
-----END CERTIFICATE-----";

        let server_side_key: &[u8] = b"-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCsOHis7U9Yp87h
CDwWj5LFso6A/Z+ricfYwENsEQeq14gLF1S0UrnM3Rg6W9TMAKqCIuqM8Lcw0cJK
+fL1DWkwrMMWnZQJgeOCRFWCmaLm5R45VY+coK9EeEeRdk/bSFfo6ambih0eHR3v
D4gwollc1r2MDi1Sl0CknmhNmdeN7pFSCfOapjcpCdOugFYYFpyGEgkfXLO1jwXU
1gFty+MjZWrLfYX78XUDQ7OQLVJySnwF/uw8pTwJvFe2I1NZPQLUzT26cpG/V/ex
K7AfPPdTJmWhmadCerZ7rnA5JNHZaMDFb4tmlArGKCheQ4BsbclQHi4Uz+9Bnp2i
3trsBvHzAgMBAAECggEAUjpIS/CmkOLWYRVoczEr197QMYBnCyUm2TO7PU7IRWbR
GtKR6+MPuWPbHIoaCSlMQARhztdj8BhG1zuOKDi1/7qNDzA/rWZp9RmhZlDquamt
i5xxjEwgQuXW7fn6WO2qo5dlFtGT43vtfeYBlY7+cdhJ+iQOub9j6vWDQYHxrF7x
yM8xvNzomHThvLFzWXJV/nGjX5pqPraMmwJUW+MGX0YaEr6tClqsc1Kmxhs3iIUo
1JCqh3FpVu2i/mR9fdcQ0ONT/s1UHzy+1Bhmh3j2Fuk4+ZeLMfxTfFxk5U0BeMQY
sES3qmd+pG5iqPW+AmXy299G89jf5+1Q4J2Km5KOUQKBgQDidifoeknpi9hRHLLD
w/7KMMe8yYg3c3dv5p0iUQQ2pXd1lJIFQ+B2/D+hfOXhnN/iCDap89ll2LoQ2Q9L
38kQXH06HCM2q11RP0BEsZCG0CnluS+JVNnjs/ALi+yc4HSpzKPs3zXIC3dLOUbq
ov5Esa5h/RU6+NO+DH72TWTv6wKBgQDCryPKtOcLp1eqdwIBRoXdPZeUdZdnwT8+
70DnC+YdOjFkqTbaoYE5ePa3ziGOZyTFhJbPgiwEdj9Ez1JSgqLLv5hBc4s6FigK
D7fOnn7Q7+al/kEW7+X5yoSl1bFuPCqGL1xxzxmpDY8Gf3nyZ+QGfWIenbk3nq12
nTgINyWMGQKBgQDSrxBDxXl8EMGH/MYHQRGKs8UvSuMyi3bjoU4w/eSInno75qPO
yC5NJDJin9sSgar8E54fkSCBExdP01DayvC5CwLqDAFqvBTOIKU/A18tPP6tnRKv
lkQ8Bkxdwai47k07J4qeNa9IU/qA/mGOq2MZL6DHwvd8bMA5gFCh/rDYTwKBgAPm
gGASScK5Ao+evMKLyCjLkBrgVD026O542qMGYQDa5pxuq3Or4qvlGYRLM+7ncBwo
8OCNahZYzCGzyaFvjpVobEN7biGmyfyRngwcrsu+0q8mreUov0HG5etwoZJk0DFK
B58cGBaD+AaYTTgnDrF2l52naUuM+Uq0EahQeocZAoGBAMJEGUFyEdm1JATkNhBv
ruDzj07PCjdvq3lUJix2ZlKlabsi5V+oYxMmrUSU8Nkaxy6O+qETNRNWQeWbPQHL
IZx/qrP32PmWX0IVj3pbfKHQSpOKNGzL9xUJ/FIycZWyT3yGf24KBuJwIx7xSrRx
qNsoty1gY/y3n7SN/iMZo8lO
-----END PRIVATE KEY-----";

        // One `user:password` entry per line.
        let credentials_txt: &[u8] = b"client1name:client1passwd
client2name:client2passwd";

        let test_files: [(&str, &[u8]); 6] = [
            ("ca.pem", ca_pem),
            ("clientsidekey.pem", client_side_key),
            ("clientside.pem", client_side_pem),
            ("serversidekey.pem", server_side_key),
            ("serverside.pem", server_side_pem),
            ("credentials.txt", credentials_txt),
        ];
        for (name, value) in test_files {
            fs::write(certs_dir.join(name), value)?;
        }

        // Publish the flag only now, so nobody observes half-written files.
        TESTFILES_CREATED.store(true, Ordering::SeqCst);
        println!("testfiles created successfully.");
        Ok(())
    }

    /// Points the listening side of the TLS link config at the generated server
    /// key, server certificate and CA certificate.
    fn apply_router_tls_files(config: &mut Config) {
        let cert_path = TESTFILES_PATH.to_string_lossy();
        config
            .transport
            .link
            .tls
            .set_listen_private_key(Some(format!("{cert_path}/serversidekey.pem")))
            .unwrap();
        config
            .transport
            .link
            .tls
            .set_listen_certificate(Some(format!("{cert_path}/serverside.pem")))
            .unwrap();
        config
            .transport
            .link
            .tls
            .set_root_ca_certificate(Some(format!("{cert_path}/ca.pem")))
            .unwrap();
    }

    /// Router config listening on `tls/127.0.0.1:<port>` with mutual TLS enabled.
    /// QoS is enabled exactly when `lowlatency` is off.
    async fn get_basic_router_config_tls(port: u16, lowlatency: bool) -> Config {
        let mut config = zenoh::Config::default();
        config.set_mode(Some(WhatAmI::Router)).unwrap();
        config
            .listen
            .endpoints
            .set(vec![format!("tls/127.0.0.1:{port}").parse().unwrap()])
            .unwrap();
        config.scouting.multicast.set_enabled(Some(false)).unwrap();
        config
            .insert_json5(
                "transport",
                r#"{ "link": { "protocols": [ "tls" ], "tls": { "enable_mtls": true, "verify_name_on_connect": false }, }, }"#,
            )
            .unwrap();
        apply_router_tls_files(&mut config);
        config.transport.unicast.set_lowlatency(lowlatency).unwrap();
        config
            .transport
            .unicast
            .qos
            .set_enabled(!lowlatency)
            .unwrap();
        config
    }

    /// Router config listening on `quic/127.0.0.1:<port>` with mutual TLS enabled.
    async fn get_basic_router_config_quic(port: u16) -> Config {
        let mut config = zenoh::Config::default();
        config.set_mode(Some(WhatAmI::Router)).unwrap();
        config
            .listen
            .endpoints
            .set(vec![format!("quic/127.0.0.1:{port}").parse().unwrap()])
            .unwrap();
        config.scouting.multicast.set_enabled(Some(false)).unwrap();
        config
            .insert_json5(
                "transport",
                r#"{ "link": { "protocols": [ "quic" ], "tls": { "enable_mtls": true, "verify_name_on_connect": false }, }, }"#,
            )
            .unwrap();
        apply_router_tls_files(&mut config);
        config
    }

    /// Router config listening on `tcp/127.0.0.1:<port>` with user/password
    /// authentication backed by the generated `credentials.txt`.
    async fn get_basic_router_config_usrpswd(port: u16) -> Config {
        let mut config = zenoh::Config::default();
        config.set_mode(Some(WhatAmI::Router)).unwrap();
        config
            .listen
            .endpoints
            .set(vec![format!("tcp/127.0.0.1:{port}").parse().unwrap()])
            .unwrap();
        config.scouting.multicast.set_enabled(Some(false)).unwrap();
        config
            .insert_json5(
                "transport",
                r#"{ "auth": { usrpwd: { user: "routername", password: "routerpasswd", }, }, }"#,
            )
            .unwrap();
        config
            .transport
            .auth
            .usrpwd
            .set_dictionary_file(Some(format!(
                "{}/credentials.txt",
                TESTFILES_PATH.to_string_lossy()
            )))
            .unwrap();
        config
    }

    /// Closes the router session.
    async fn
close_router_session(s: Session) {
        println!("Closing router session");
        ztimeout!(s.close()).unwrap();
    }

    /// Router config listening on both `quic/` and `tcp/` at `127.0.0.1:<port>`,
    /// with mutual TLS and user/password authentication enabled.
    async fn get_basic_router_config_quic_usrpswd(port: u16) -> Config {
        let cert_path = TESTFILES_PATH.to_string_lossy();
        let mut config = zenoh::Config::default();
        config.set_mode(Some(WhatAmI::Router)).unwrap();
        config
            .listen
            .endpoints
            .set(vec![
                format!("quic/127.0.0.1:{port}").parse().unwrap(),
                format!("tcp/127.0.0.1:{port}").parse().unwrap(),
            ])
            .unwrap();
        config.scouting.multicast.set_enabled(Some(false)).unwrap();
        config
            .insert_json5(
                "transport",
                r#"{ "link": { "protocols": [ "quic", "tcp" ], "tls": { "enable_mtls": true, "verify_name_on_connect": false }, }, "auth": { usrpwd: { user: "routername", password: "routerpasswd", }, }, }"#,
            )
            .unwrap();
        config
            .transport
            .auth
            .usrpwd
            .set_dictionary_file(Some(format!("{cert_path}/credentials.txt")))
            .unwrap();
        config
            .transport
            .link
            .tls
            .set_listen_private_key(Some(format!("{cert_path}/serversidekey.pem")))
            .unwrap();
        config
            .transport
            .link
            .tls
            .set_listen_certificate(Some(format!("{cert_path}/serverside.pem")))
            .unwrap();
        config
            .transport
            .link
            .tls
            .set_root_ca_certificate(Some(format!("{cert_path}/ca.pem")))
            .unwrap();
        config
    }

    /// Builds a client-mode config that connects to `endpoint` and whose
    /// `transport` section is loaded from `transport_json5`.
    fn new_client_config(endpoint: &str, transport_json5: &str) -> Config {
        let mut config = zenoh::Config::default();
        config.set_mode(Some(WhatAmI::Client)).unwrap();
        config
            .connect
            .set_endpoints(ModeDependentValue::Unique(vec![endpoint
                .parse::<EndPoint>()
                .unwrap()]))
            .unwrap();
        config.insert_json5("transport", transport_json5).unwrap();
        config
    }

    /// Same as `new_client_config`, then points the connecting side of the TLS
    /// link config at the generated client key, client certificate and CA.
    fn new_mtls_client_config(endpoint: &str, transport_json5: &str) -> Config {
        let cert_path = TESTFILES_PATH.to_string_lossy();
        let mut config = new_client_config(endpoint, transport_json5);
        config
            .transport
            .link
            .tls
            .set_connect_private_key(Some(format!("{cert_path}/clientsidekey.pem")))
            .unwrap();
        config
            .transport
            .link
            .tls
            .set_connect_certificate(Some(format!("{cert_path}/clientside.pem")))
            .unwrap();
        config
            .transport
            .link
            .tls
            .set_root_ca_certificate(Some(format!("{cert_path}/ca.pem")))
            .unwrap();
        config
    }

    /// Opens two identically configured client sessions over TLS to the router
    /// on `port`. QoS is enabled exactly when `lowlatency` is off.
    async fn get_client_sessions_tls(port: u16, lowlatency: bool) -> (Session, Session) {
        const TRANSPORT: &str = r#"{ "link": { "protocols": [ "tls" ], "tls": { "enable_mtls": true, "verify_name_on_connect": false } } }"#;

        println!("Opening client sessions");
        let endpoint = format!("tls/127.0.0.1:{port}");
        let build_config = || {
            let mut config = new_mtls_client_config(&endpoint, TRANSPORT);
            config.transport.unicast.set_lowlatency(lowlatency).unwrap();
            config
                .transport
                .unicast
                .qos
                .set_enabled(!lowlatency)
                .unwrap();
            config
        };
        let s01 = ztimeout!(zenoh::open(build_config())).unwrap();
        let s02 = ztimeout!(zenoh::open(build_config())).unwrap();
        (s01, s02)
    }

    /// Opens two identically configured client sessions over QUIC to the router
    /// on `port`.
    async fn get_client_sessions_quic(port: u16) -> (Session, Session) {
        const TRANSPORT: &str = r#"{ "link": { "protocols": [ "quic" ], "tls": { "enable_mtls": true, "verify_name_on_connect": false } } }"#;

        println!("Opening client sessions");
        let endpoint = format!("quic/127.0.0.1:{port}");
        let s01 = ztimeout!(zenoh::open(new_mtls_client_config(&endpoint, TRANSPORT))).unwrap();
        let s02 = ztimeout!(zenoh::open(new_mtls_client_config(&endpoint, TRANSPORT))).unwrap();
        (s01, s02)
    }

    /// Opens two client sessions over TCP to the router on `port`, the first
    /// authenticating as `client1name` and the second as `client2name`.
    async fn get_client_sessions_usrpswd(port: u16) -> (Session, Session) {
        println!("Opening client sessions");
        let endpoint = format!("tcp/127.0.0.1:{port}");
        let s01 = ztimeout!(zenoh::open(new_client_config(
            &endpoint,
            r#"{ "auth": { usrpwd: { user: "client1name", password: "client1passwd", }, } }"#,
        )))
        .unwrap();
        let s02 = ztimeout!(zenoh::open(new_client_config(
            &endpoint,
            r#"{ "auth": { usrpwd: { user: "client2name", password: "client2passwd", }, } }"#,
        )))
        .unwrap();
        (s01, s02)
    }

    /// Opens two client sessions over QUIC to the router on `port`; both present
    /// the client certificate, the first also authenticating as `client1name`
    /// and the second as `client2name`.
    async fn get_client_sessions_quic_usrpswd(port: u16) -> (Session, Session) {
        println!("Opening client sessions");
        let endpoint = format!("quic/127.0.0.1:{port}");
        let s01 = ztimeout!(zenoh::open(new_mtls_client_config(
            &endpoint,
            r#"{ "link": { "protocols": [ "quic" ], "tls": { "enable_mtls": true, "verify_name_on_connect": false } }, "auth": { usrpwd: { user: "client1name", password: "client1passwd", }, } }"#,
        )))
        .unwrap();
        let s02 = ztimeout!(zenoh::open(new_mtls_client_config(
            &endpoint,
            r#"{ "link": { "protocols": [ "quic" ], "tls": { "enable_mtls": true, "verify_name_on_connect": false } }, "auth": { usrpwd: { user: "client2name", password: "client2passwd", }, } }"#,
        )))
        .unwrap();
        (s01, s02)
    }

    /// Closes both client sessions, first `s01` then `s02`.
    async fn close_sessions(s01: Session, s02: Session) {
        println!("Closing client sessions");
        ztimeout!(s01.close()).unwrap();
        ztimeout!(s02.close()).unwrap();
    }

    /// TLS pub/sub with default permission "deny" and an allow rule for the
    /// `client_side` certificate common name: the sample must be delivered.
    async fn
test_pub_sub_deny_then_allow_tls(port: u16, lowlatency: bool) { println!("test_pub_sub_deny_then_allow_tls"); let mut config_router = get_basic_router_config_tls(port, lowlatency).await; config_router .insert_json5( "access_control", r#"{ "enabled": true, "default_permission": "deny", "rules": [ { "id": "r1", "permission": "allow", "flows": ["ingress","egress"], "messages": [ "put", "declare_subscriber" ], "key_exprs": [ "test/demo" ], }, ], "subjects": [ { "id": "s1", "cert_common_names": [ "client_side" ] } ], "policies": [ { "rules": ["r1"], "subjects": ["s1"], } ] }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); let (sub_session, pub_session) = get_client_sessions_tls(port, lowlatency).await; { let publisher = pub_session.declare_publisher(KEY_EXPR).await.unwrap(); let received_value = Arc::new(Mutex::new(String::new())); let temp_recv_value = received_value.clone(); let subscriber = sub_session .declare_subscriber(KEY_EXPR) .callback(move |sample| { let mut temp_value = zlock!(temp_recv_value); *temp_value = sample.payload().try_to_string().unwrap().into_owned(); }) .await .unwrap(); tokio::time::sleep(SLEEP).await; publisher.put(VALUE).await.unwrap(); tokio::time::sleep(SLEEP).await; assert_eq!(*zlock!(received_value), VALUE); ztimeout!(subscriber.undeclare()).unwrap(); } close_sessions(sub_session, pub_session).await; close_router_session(session).await; } async fn test_pub_sub_allow_then_deny_tls(port: u16) { println!("test_pub_sub_allow_then_deny_tls"); let mut config_router = get_basic_router_config_tls(port, false).await; config_router .insert_json5( "access_control", r#"{ "enabled": true, "default_permission": "allow", "rules": [ { "id": "r1", "permission": "deny", "flows": ["egress"], "messages": [ "put", "declare_subscriber" ], "key_exprs": [ "test/demo" ], }, ], "subjects": [ { "id": "s1", "cert_common_names": [ "client_side" ] } ], "policies": [ { "rules": ["r1"], "subjects": ["s1"], 
} ] }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); let (sub_session, pub_session) = get_client_sessions_tls(port, false).await; { let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); let received_value = Arc::new(Mutex::new(String::new())); let temp_recv_value = received_value.clone(); let subscriber = ztimeout!(sub_session .declare_subscriber(KEY_EXPR) .callback(move |sample| { let mut temp_value = zlock!(temp_recv_value); *temp_value = sample.payload().try_to_string().unwrap().into_owned(); })) .unwrap(); tokio::time::sleep(SLEEP).await; ztimeout!(publisher.put(VALUE)).unwrap(); tokio::time::sleep(SLEEP).await; assert_ne!(*zlock!(received_value), VALUE); ztimeout!(subscriber.undeclare()).unwrap(); } close_sessions(sub_session, pub_session).await; close_router_session(session).await; } async fn test_get_qbl_deny_then_allow_tls(port: u16) { println!("test_get_qbl_deny_then_allow_tls"); let mut config_router = get_basic_router_config_tls(port, false).await; config_router .insert_json5( "access_control", r#"{ "enabled": true, "default_permission": "deny", "rules": [ { "id": "r1", "permission": "allow", "flows": ["egress", "ingress"], "messages": [ "query", "declare_queryable", "reply", ], "key_exprs": [ "test/demo" ], }, ], "subjects": [ { "id": "s1", "cert_common_names": [ "client_side" ] } ], "policies": [ { "rules": ["r1"], "subjects": ["s1"], } ] }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); let (get_session, qbl_session) = get_client_sessions_tls(port, false).await; { let mut received_value = String::new(); let qbl = ztimeout!(qbl_session .declare_queryable(KEY_EXPR) .callback(move |sample| { tokio::task::block_in_place(move || { Handle::current().block_on(async move { ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() }); }); })) .unwrap(); tokio::time::sleep(SLEEP).await; let recv_reply = 
ztimeout!(get_session.get(KEY_EXPR)).unwrap(); while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { match reply.result() { Ok(sample) => { received_value = sample.payload().try_to_string().unwrap().into_owned(); break; } Err(e) => println!( "Error : {}", e.payload() .try_to_string() .unwrap_or_else(|e| e.to_string().into()) ), } } tokio::time::sleep(SLEEP).await; assert_eq!(received_value, VALUE); ztimeout!(qbl.undeclare()).unwrap(); } close_sessions(get_session, qbl_session).await; close_router_session(session).await; } async fn test_get_qbl_allow_then_deny_tls(port: u16) { println!("test_get_qbl_allow_then_deny_tls"); let mut config_router = get_basic_router_config_tls(port, false).await; config_router .insert_json5( "access_control", r#"{ "enabled": true, "default_permission": "allow", "rules": [ { "id": "r1", "permission": "deny", "flows": ["egress"], "messages": [ "query", "declare_queryable", "reply" ], "key_exprs": [ "test/demo" ], }, ], "subjects": [ { "id": "s1", "cert_common_names": [ "client_side" ] } ], "policies": [ { "rules": ["r1"], "subjects": ["s1"], } ] }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); let (get_session, qbl_session) = get_client_sessions_tls(port, false).await; { let mut received_value = String::new(); let qbl = ztimeout!(qbl_session .declare_queryable(KEY_EXPR) .callback(move |sample| { tokio::task::block_in_place(move || { Handle::current().block_on(async move { ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() }); }); })) .unwrap(); tokio::time::sleep(SLEEP).await; let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { match reply.result() { Ok(sample) => { received_value = sample.payload().try_to_string().unwrap().into_owned(); break; } Err(e) => println!( "Error : {}", e.payload() .try_to_string() .unwrap_or_else(|e| e.to_string().into()) ), } } tokio::time::sleep(SLEEP).await; 
assert_ne!(received_value, VALUE); ztimeout!(qbl.undeclare()).unwrap(); } close_sessions(get_session, qbl_session).await; close_router_session(session).await; } async fn test_pub_sub_deny_then_allow_quic(port: u16) { println!("test_pub_sub_deny_then_allow_quic"); let mut config_router = get_basic_router_config_quic(port).await; config_router .insert_json5( "access_control", r#"{ "enabled": true, "default_permission": "deny", "rules": [ { "id": "r1", "permission": "allow", "flows": ["egress", "ingress"], "messages": [ "put", "declare_subscriber" ], "key_exprs": [ "test/demo" ], }, ], "subjects": [ { "id": "s1", "cert_common_names": [ "client_side" ] } ], "policies": [ { "rules": ["r1"], "subjects": ["s1"], } ] }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); let (sub_session, pub_session) = get_client_sessions_quic(port).await; { let publisher = pub_session.declare_publisher(KEY_EXPR).await.unwrap(); let received_value = Arc::new(Mutex::new(String::new())); let temp_recv_value = received_value.clone(); let subscriber = sub_session .declare_subscriber(KEY_EXPR) .callback(move |sample| { let mut temp_value = zlock!(temp_recv_value); *temp_value = sample.payload().try_to_string().unwrap().into_owned(); }) .await .unwrap(); tokio::time::sleep(SLEEP).await; publisher.put(VALUE).await.unwrap(); tokio::time::sleep(SLEEP).await; assert_eq!(*zlock!(received_value), VALUE); ztimeout!(subscriber.undeclare()).unwrap(); } close_sessions(sub_session, pub_session).await; close_router_session(session).await; } #[allow(unused)] async fn test_pub_sub_allow_then_deny_quic(port: u16) { println!("test_pub_sub_allow_then_deny_quic"); let mut config_router = get_basic_router_config_quic(port).await; config_router .insert_json5( "access_control", r#"{ "enabled": true, "default_permission": "allow", "rules": [ { "id": "r1", "permission": "deny", "flows": ["egress"], "messages": [ "put", "declare_subscriber" ], "key_exprs": [ 
"test/demo" ], }, ], "subjects": [ { "id": "s1", "cert_common_names": [ "client_side" ] } ], "policies": [ { "rules": ["r1"], "subjects": ["s1"], } ] }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); let (sub_session, pub_session) = get_client_sessions_quic(port).await; { let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); let received_value = Arc::new(Mutex::new(String::new())); let temp_recv_value = received_value.clone(); let subscriber = ztimeout!(sub_session .declare_subscriber(KEY_EXPR) .callback(move |sample| { let mut temp_value = zlock!(temp_recv_value); *temp_value = sample.payload().try_to_string().unwrap().into_owned(); })) .unwrap(); tokio::time::sleep(SLEEP).await; ztimeout!(publisher.put(VALUE)).unwrap(); tokio::time::sleep(SLEEP).await; assert_ne!(*zlock!(received_value), VALUE); ztimeout!(subscriber.undeclare()).unwrap(); } close_sessions(sub_session, pub_session).await; close_router_session(session).await; } #[allow(unused)] async fn test_get_qbl_deny_then_allow_quic(port: u16) { println!("test_get_qbl_deny_then_allow_quic"); let mut config_router = get_basic_router_config_quic(port).await; config_router .insert_json5( "access_control", r#"{ "enabled": true, "default_permission": "deny", "rules": [ { "id": "r1", "permission": "allow", "flows": ["egress", "ingress"], "messages": [ "query", "declare_queryable", "reply" ], "key_exprs": [ "test/demo" ], }, ], "subjects": [ { "id": "s1", "cert_common_names": [ "client_side" ] } ], "policies": [ { "rules": ["r1"], "subjects": ["s1"], } ] }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); let (get_session, qbl_session) = get_client_sessions_quic(port).await; { let mut received_value = String::new(); let qbl = ztimeout!(qbl_session .declare_queryable(KEY_EXPR) .callback(move |sample| { tokio::task::block_in_place(move || { 
Handle::current().block_on(async move { ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() }); }); })) .unwrap(); tokio::time::sleep(SLEEP).await; let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { match reply.result() { Ok(sample) => { received_value = sample.payload().try_to_string().unwrap().into_owned(); break; } Err(e) => println!( "Error : {}", e.payload() .try_to_string() .unwrap_or_else(|e| e.to_string().into()) ), } } tokio::time::sleep(SLEEP).await; assert_eq!(received_value, VALUE); ztimeout!(qbl.undeclare()).unwrap(); } close_sessions(get_session, qbl_session).await; close_router_session(session).await; }

    /// QUIC get/queryable scenario: permissive default with an explicit deny.
    ///
    /// The router ACL defaults to "allow" but denies egress `query`,
    /// `declare_queryable` and `reply` messages on `test/demo` for the subject
    /// whose certificate common name is `client_side`. The test asserts that the
    /// getter does not end up holding `VALUE`.
    #[allow(unused)]
    async fn test_get_qbl_allow_then_deny_quic(port: u16) {
        println!("test_get_qbl_allow_then_deny_quic");

        let mut config_router = get_basic_router_config_quic(port).await;
        config_router
            .insert_json5(
                "access_control",
                r#"{ "enabled": true, "default_permission": "allow", "rules": [ { "id": "r1", "permission": "deny", "flows": ["egress"], "messages": [ "query", "declare_queryable", "reply" ], "key_exprs": [ "test/demo" ], }, ], "subjects": [ { "id": "s1", "cert_common_names": [ "client_side" ] } ], "policies": [ { "rules": ["r1"], "subjects": ["s1"], } ] }"#,
            )
            .unwrap();
        println!("Opening router session");

        let session = ztimeout!(zenoh::open(config_router)).unwrap();
        let (get_session, qbl_session) = get_client_sessions_quic(port).await;
        {
            let mut received_value = String::new();

            // The callback closure is not async, so the reply future is driven to
            // completion on the current runtime handle from inside `block_in_place`.
            let qbl = ztimeout!(qbl_session
                .declare_queryable(KEY_EXPR)
                .callback(move |query| {
                    tokio::task::block_in_place(move || {
                        Handle::current().block_on(async move {
                            ztimeout!(query.reply(KEY_EXPR, VALUE)).unwrap()
                        });
                    });
                }))
            .unwrap();

            tokio::time::sleep(SLEEP).await;
            let replies = ztimeout!(get_session.get(KEY_EXPR)).unwrap();
            // Stop at the first successful reply; error replies are only logged.
            while let Ok(reply) = ztimeout!(replies.recv_async()) {
                match reply.result() {
                    Ok(sample) => {
                        received_value = sample.payload().try_to_string().unwrap().into_owned();
                        break;
                    }
                    Err(e) => println!(
                        "Error : {}",
                        e.payload()
                            .try_to_string()
                            .unwrap_or_else(|e| e.to_string().into())
                    ),
                }
            }
            tokio::time::sleep(SLEEP).await;
            assert_ne!(received_value, VALUE);
            ztimeout!(qbl.undeclare()).unwrap();
        }
        close_sessions(get_session, qbl_session).await;
        close_router_session(session).await;
    }

    /// User/password pub/sub scenario: restrictive default with an explicit allow.
    ///
    /// The router ACL defaults to "deny" and allows `put` / `declare_subscriber`
    /// on `test/demo` in both flows for the usernames `client1name` and
    /// `client2name`. The test asserts that the subscriber receives `VALUE`.
    async fn test_pub_sub_deny_then_allow_usrpswd(port: u16) {
        println!("test_pub_sub_deny_then_allow_usrpswd");

        let mut config_router = get_basic_router_config_usrpswd(port).await;
        config_router
            .insert_json5(
                "access_control",
                r#"{ "enabled": true, "default_permission": "deny", "rules": [ { "id": "r1", "permission": "allow", "flows": ["ingress", "egress"], "messages": [ "put", "declare_subscriber" ], "key_exprs": [ "test/demo" ], }, ], "subjects": [ { "id": "s1", "usernames": [ "client1name", "client2name" ] } ], "policies": [ { "rules": ["r1"], "subjects": ["s1"], } ] }"#,
            )
            .unwrap();
        println!("Opening router session");

        let session = ztimeout!(zenoh::open(config_router)).unwrap();
        let (sub_session, pub_session) = get_client_sessions_usrpswd(port).await;
        {
            // Declarations and the put go through `ztimeout!`, exactly as in the
            // sibling tests, instead of being awaited without the macro.
            let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap();
            let received_value = Arc::new(Mutex::new(String::new()));
            let temp_recv_value = Arc::clone(&received_value);
            let subscriber = ztimeout!(sub_session
                .declare_subscriber(KEY_EXPR)
                .callback(move |sample| {
                    let mut temp_value = zlock!(temp_recv_value);
                    *temp_value = sample.payload().try_to_string().unwrap().into_owned();
                }))
            .unwrap();

            tokio::time::sleep(SLEEP).await;
            ztimeout!(publisher.put(VALUE)).unwrap();
            tokio::time::sleep(SLEEP).await;
            assert_eq!(*zlock!(received_value), VALUE);
            ztimeout!(subscriber.undeclare()).unwrap();
        }
        close_sessions(sub_session, pub_session).await;
        close_router_session(session).await;
    }

    async fn test_pub_sub_allow_then_deny_usrpswd(port: u16) { println!("test_pub_sub_allow_then_deny_usrpswd"); let mut config_router = get_basic_router_config_usrpswd(port).await;
config_router .insert_json5( "access_control", r#"{ "enabled": true, "default_permission": "allow", "rules": [ { "id": "r1", "permission": "deny", "flows": ["egress"], "messages": [ "put", "declare_subscriber" ], "key_exprs": [ "test/demo" ], }, ], "subjects": [ { "id": "s1", "usernames": [ "client1name", "client2name" ] } ], "policies": [ { "rules": ["r1"], "subjects": ["s1"], } ] }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); let (sub_session, pub_session) = get_client_sessions_usrpswd(port).await; { let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); let received_value = Arc::new(Mutex::new(String::new())); let temp_recv_value = received_value.clone(); let subscriber = ztimeout!(sub_session .declare_subscriber(KEY_EXPR) .callback(move |sample| { let mut temp_value = zlock!(temp_recv_value); *temp_value = sample.payload().try_to_string().unwrap().into_owned(); })) .unwrap(); tokio::time::sleep(SLEEP).await; ztimeout!(publisher.put(VALUE)).unwrap(); tokio::time::sleep(SLEEP).await; assert_ne!(*zlock!(received_value), VALUE); ztimeout!(subscriber.undeclare()).unwrap(); } close_sessions(sub_session, pub_session).await; close_router_session(session).await; }

    /// User/password get/queryable scenario: restrictive default, explicit allow.
    ///
    /// The router denies by default and allows `query`, `declare_queryable` and
    /// `reply` on `test/demo` (ingress and egress) for the usernames
    /// `client1name` and `client2name`. The getter is expected to obtain `VALUE`.
    async fn test_get_qbl_deny_then_allow_usrpswd(port: u16) {
        println!("test_get_qbl_deny_then_allow_usrpswd");

        let acl = r#"{ "enabled": true, "default_permission": "deny", "rules": [ { "id": "r1", "permission": "allow", "flows": ["ingress", "egress"], "messages": [ "query", "declare_queryable", "reply" ], "key_exprs": [ "test/demo" ], }, ], "subjects": [ { "id": "s1", "usernames": [ "client1name", "client2name" ] } ], "policies": [ { "rules": ["r1"], "subjects": ["s1"], } ] }"#;
        let mut router_cfg = get_basic_router_config_usrpswd(port).await;
        router_cfg.insert_json5("access_control", acl).unwrap();
        println!("Opening router session");

        let router = ztimeout!(zenoh::open(router_cfg)).unwrap();
        let (getter, responder) = get_client_sessions_usrpswd(port).await;
        {
            let mut answer = String::new();

            // Answer every incoming query with VALUE; the closure is synchronous,
            // so the reply future is run via `block_in_place` + `block_on`.
            let queryable = ztimeout!(responder
                .declare_queryable(KEY_EXPR)
                .callback(move |query| {
                    tokio::task::block_in_place(move || {
                        Handle::current().block_on(async move {
                            ztimeout!(query.reply(KEY_EXPR, VALUE)).unwrap()
                        });
                    });
                }))
            .unwrap();
            tokio::time::sleep(SLEEP).await;

            let replies = ztimeout!(getter.get(KEY_EXPR)).unwrap();
            while let Ok(reply) = ztimeout!(replies.recv_async()) {
                let sample = match reply.result() {
                    Ok(sample) => sample,
                    Err(err) => {
                        // Error replies are logged and the loop keeps waiting.
                        let text = err
                            .payload()
                            .try_to_string()
                            .unwrap_or_else(|e| e.to_string().into());
                        println!("Error : {}", text);
                        continue;
                    }
                };
                // First successful reply wins.
                answer = sample.payload().try_to_string().unwrap().into_owned();
                break;
            }

            tokio::time::sleep(SLEEP).await;
            assert_eq!(answer, VALUE);
            ztimeout!(queryable.undeclare()).unwrap();
        }
        close_sessions(getter, responder).await;
        close_router_session(router).await;
    }

    async fn test_get_qbl_allow_then_deny_usrpswd(port: u16) { println!("test_get_qbl_allow_then_deny_usrpswd"); let mut config_router = get_basic_router_config_usrpswd(port).await; config_router .insert_json5( "access_control", r#"{ "enabled": true, "default_permission": "allow", "rules": [ { "id": "r1", "permission": "deny", "flows": ["egress"], "messages": [ "query", "declare_queryable", "reply" ], "key_exprs": [ "test/demo" ], }, ], "subjects": [ { "id": "s1", "usernames": [ "client1name", "client2name" ] } ], "policies": [ { "rules": ["r1"], "subjects": ["s1"], } ] }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); let (get_session, qbl_session) = get_client_sessions_usrpswd(port).await; { let mut received_value = String::new(); let qbl = ztimeout!(qbl_session .declare_queryable(KEY_EXPR) .callback(move |sample| { tokio::task::block_in_place(move || { Handle::current().block_on(async move { ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() }); }); })) .unwrap(); tokio::time::sleep(SLEEP).await; let recv_reply =
ztimeout!(get_session.get(KEY_EXPR)).unwrap(); while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { match reply.result() { Ok(sample) => { received_value = sample.payload().try_to_string().unwrap().into_owned(); break; } Err(e) => println!( "Error : {}", e.payload() .try_to_string() .unwrap_or_else(|e| e.to_string().into()) ), } } tokio::time::sleep(SLEEP).await; assert_ne!(received_value, VALUE); ztimeout!(qbl.undeclare()).unwrap(); } close_sessions(get_session, qbl_session).await; close_router_session(session).await; }

    /// Combined-subject scenario with a restrictive default.
    ///
    /// Subject `s1` lists both a certificate common name (`client_side`) and
    /// usernames (`client1name`, `client2name`), and is the only subject allowed
    /// to `put` / `declare_subscriber` on `test/demo`. The test runs two phases
    /// against the same router:
    ///
    /// 1. clients from `get_client_sessions_usrpswd`: the value must NOT arrive;
    /// 2. clients from `get_client_sessions_quic_usrpswd`: the value MUST arrive.
    async fn test_deny_allow_combination(port: u16) {
        println!("test_deny_allow_combination");

        let mut config_router = get_basic_router_config_quic_usrpswd(port).await;
        config_router
            .insert_json5(
                "access_control",
                r#"{ "enabled": true, "default_permission": "deny", "rules": [ { "id": "r1", "permission": "allow", "flows": ["ingress", "egress"], "messages": [ "put", "declare_subscriber" ], "key_exprs": [ "test/demo" ], }, ], "subjects": [ { "id": "s1", "cert_common_names": [ "client_side" ], "usernames": [ "client1name", "client2name" ] } ], "policies": [ { "rules": ["r1"], "subjects": ["s1"], } ] }"#,
            )
            .unwrap();
        println!("Opening router session");
        let session = ztimeout!(zenoh::open(config_router)).unwrap();

        // Phase 1: user/password-only clients.
        let (sub_session, pub_session) = get_client_sessions_usrpswd(port).await;
        {
            let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap();
            let received_value = Arc::new(Mutex::new(String::new()));
            let temp_recv_value = received_value.clone();
            let subscriber = ztimeout!(sub_session
                .declare_subscriber(KEY_EXPR)
                .callback(move |sample| {
                    let mut temp_value = zlock!(temp_recv_value);
                    *temp_value = sample.payload().try_to_string().unwrap().into_owned();
                }))
            .unwrap();

            tokio::time::sleep(SLEEP).await;
            ztimeout!(publisher.put(VALUE)).unwrap();
            tokio::time::sleep(SLEEP).await;
            assert_ne!(*zlock!(received_value), VALUE);
            ztimeout!(subscriber.undeclare()).unwrap();
        }
        close_sessions(sub_session, pub_session).await;

        // Phase 2: clients from `get_client_sessions_quic_usrpswd`.
        let (sub_session, pub_session) = get_client_sessions_quic_usrpswd(port).await;
        {
            let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap();
            let received_value = Arc::new(Mutex::new(String::new()));
            let temp_recv_value = received_value.clone();
            let subscriber = ztimeout!(sub_session
                .declare_subscriber(KEY_EXPR)
                .callback(move |sample| {
                    let mut temp_value = zlock!(temp_recv_value);
                    *temp_value = sample.payload().try_to_string().unwrap().into_owned();
                }))
            .unwrap();

            tokio::time::sleep(SLEEP).await;
            ztimeout!(publisher.put(VALUE)).unwrap();
            tokio::time::sleep(SLEEP).await;
            assert_eq!(*zlock!(received_value), VALUE);
            ztimeout!(subscriber.undeclare()).unwrap();
        }
        // Close the second client pair explicitly before the router, as is done
        // for the first pair and in every other test in this module.
        close_sessions(sub_session, pub_session).await;
        close_router_session(session).await;
    }

    /// Combined-subject scenario with a permissive default.
    ///
    /// Subject `s1` lists both a certificate common name (`client_side`) and
    /// usernames (`client1name`, `client2name`), and is denied egress `put` /
    /// `declare_subscriber` on `test/demo`. The test runs two phases against the
    /// same router:
    ///
    /// 1. clients from `get_client_sessions_usrpswd`: the value MUST arrive;
    /// 2. clients from `get_client_sessions_quic_usrpswd`: the value must NOT arrive.
    async fn test_allow_deny_combination(port: u16) {
        println!("test_allow_deny_combination");

        let mut config_router = get_basic_router_config_quic_usrpswd(port).await;
        config_router
            .insert_json5(
                "access_control",
                r#"{ "enabled": true, "default_permission": "allow", "rules": [ { "id": "r1", "permission": "deny", "flows": ["egress"], "messages": [ "put", "declare_subscriber" ], "key_exprs": [ "test/demo" ], }, ], "subjects": [ { "id": "s1", "cert_common_names": [ "client_side" ], "usernames": [ "client1name", "client2name" ] } ], "policies": [ { "rules": ["r1"], "subjects": ["s1"], } ] }"#,
            )
            .unwrap();
        println!("Opening router session");
        let session = ztimeout!(zenoh::open(config_router)).unwrap();

        // Phase 1: user/password-only clients.
        let (sub_session, pub_session) = get_client_sessions_usrpswd(port).await;
        {
            let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap();
            let received_value = Arc::new(Mutex::new(String::new()));
            let temp_recv_value = received_value.clone();
            let subscriber = ztimeout!(sub_session
                .declare_subscriber(KEY_EXPR)
                .callback(move |sample| {
                    let mut temp_value = zlock!(temp_recv_value);
                    *temp_value = sample.payload().try_to_string().unwrap().into_owned();
                }))
            .unwrap();

            tokio::time::sleep(SLEEP).await;
            ztimeout!(publisher.put(VALUE)).unwrap();
            tokio::time::sleep(SLEEP).await;
            assert_eq!(*zlock!(received_value), VALUE);
            ztimeout!(subscriber.undeclare()).unwrap();
        }
        close_sessions(sub_session, pub_session).await;

        // Phase 2: clients from `get_client_sessions_quic_usrpswd`.
        let (sub_session, pub_session) = get_client_sessions_quic_usrpswd(port).await;
        {
            let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap();
            let received_value = Arc::new(Mutex::new(String::new()));
            let temp_recv_value = received_value.clone();
            let subscriber = ztimeout!(sub_session
                .declare_subscriber(KEY_EXPR)
                .callback(move |sample| {
                    let mut temp_value = zlock!(temp_recv_value);
                    *temp_value = sample.payload().try_to_string().unwrap().into_owned();
                }))
            .unwrap();

            tokio::time::sleep(SLEEP).await;
            ztimeout!(publisher.put(VALUE)).unwrap();
            tokio::time::sleep(SLEEP).await;
            assert_ne!(*zlock!(received_value), VALUE);
            ztimeout!(subscriber.undeclare()).unwrap();
        }
        // Close the second client pair explicitly before the router, as is done
        // for the first pair and in every other test in this module.
        close_sessions(sub_session, pub_session).await;
        close_router_session(session).await;
    }
}