// // Copyright (c) 2024 ZettaScale Technology // // This program and the accompanying materials are made available under the // terms of the Eclipse Public License 2.0 which is available at // http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 // which is available at https://www.apache.org/licenses/LICENSE-2.0. // // SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 // // Contributors: // ZettaScale Zenoh Team, // #![cfg(all(feature = "unstable", feature = "internal_config"))] #![cfg(target_family = "unix")] use std::{ sync::{atomic::AtomicBool, Arc, Mutex}, time::Duration, }; use tokio::runtime::Handle; use zenoh::{config::WhatAmI, sample::SampleKind, Config, Session}; use zenoh_config::{EndPoint, ModeDependentValue}; use zenoh_core::{zlock, ztimeout}; const TIMEOUT: Duration = Duration::from_secs(60); const SLEEP: Duration = Duration::from_secs(1); const KEY_EXPR: &str = "test/demo"; const VALUE: &str = "zenoh"; #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn test_acl_pub_sub() { zenoh::init_log_from_env_or("error"); test_pub_sub_deny(27447).await; test_pub_sub_allow(27447).await; test_pub_sub_deny_then_allow(27447).await; test_pub_sub_allow_then_deny(27447).await; } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn test_acl_get_queryable() { zenoh::init_log_from_env_or("error"); test_get_qbl_deny(27448).await; test_get_qbl_allow(27448).await; test_get_qbl_allow_then_deny(27448).await; test_get_qbl_deny_then_allow(27448).await; } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn test_acl_queryable_reply() { zenoh::init_log_from_env_or("error"); // Only test cases not covered by `test_acl_get_queryable` test_reply_deny(27449).await; test_reply_allow_then_deny(27449).await; } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn test_acl_liveliness() { zenoh::init_log_from_env_or("error"); test_liveliness_allow(27450).await; test_liveliness_deny(27450).await; test_liveliness_allow_deny_token(27450).await; test_liveliness_deny_allow_token(27450).await; test_liveliness_allow_deny_sub(27450).await; test_liveliness_deny_allow_sub(27450).await; test_liveliness_allow_deny_query(27450).await; test_liveliness_deny_allow_query(27450).await; } async fn get_basic_router_config(port: u16) -> Config { let mut config = Config::default(); config.set_mode(Some(WhatAmI::Router)).unwrap(); config .listen .endpoints .set(vec![format!("tcp/127.0.0.1:{port}").parse().unwrap()]) .unwrap(); config.scouting.multicast.set_enabled(Some(false)).unwrap(); config } async fn close_router_session(s: Session) { println!("Closing router session"); ztimeout!(s.close()).unwrap(); } async fn get_client_sessions(port: u16) -> (Session, Session) { println!("Opening client sessions"); let mut config = zenoh::Config::default(); config.set_mode(Some(WhatAmI::Client)).unwrap(); config .connect .set_endpoints(ModeDependentValue::Unique(vec![format!( "tcp/127.0.0.1:{port}" ) .parse::() .unwrap()])) .unwrap(); let s01 = ztimeout!(zenoh::open(config)).unwrap(); let mut config = zenoh::Config::default(); config.set_mode(Some(WhatAmI::Client)).unwrap(); config .connect .set_endpoints(ModeDependentValue::Unique(vec![format!( "tcp/127.0.0.1:{port}" ) .parse::() .unwrap()])) .unwrap(); let s02 = ztimeout!(zenoh::open(config)).unwrap(); (s01, s02) } async fn close_sessions(s01: Session, s02: Session) { println!("Closing client sessions"); ztimeout!(s01.close()).unwrap(); ztimeout!(s02.close()).unwrap(); } async fn test_pub_sub_deny(port: u16) { println!("test_pub_sub_deny"); let mut config_router = get_basic_router_config(port).await; config_router .insert_json5( "access_control", r#"{ "enabled": true, "default_permission": "deny", "rules": [], "subjects": [], "policies": [], }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); let (sub_session, pub_session) = get_client_sessions(port).await; { let publisher = pub_session.declare_publisher(KEY_EXPR).await.unwrap(); let received_value = Arc::new(Mutex::new(String::new())); let deleted = Arc::new(Mutex::new(false)); let temp_recv_value = received_value.clone(); let deleted_clone = deleted.clone(); let subscriber = sub_session .declare_subscriber(KEY_EXPR) .callback(move |sample| { if sample.kind() == SampleKind::Put { let mut temp_value = zlock!(temp_recv_value); *temp_value = sample.payload().try_to_string().unwrap().into_owned(); } else if sample.kind() == SampleKind::Delete { let mut deleted = zlock!(deleted_clone); *deleted = true; } }) .await .unwrap(); tokio::time::sleep(SLEEP).await; publisher.put(VALUE).await.unwrap(); tokio::time::sleep(SLEEP).await; assert_ne!(*zlock!(received_value), VALUE); publisher.delete().await.unwrap(); tokio::time::sleep(SLEEP).await; assert!(!(*zlock!(deleted))); ztimeout!(subscriber.undeclare()).unwrap(); } close_sessions(sub_session, pub_session).await; close_router_session(session).await; } async fn test_pub_sub_allow(port: u16) { println!("test_pub_sub_allow"); let mut config_router = get_basic_router_config(port).await; config_router .insert_json5( "access_control", r#"{ "enabled": true, "default_permission": "allow", "rules": [], "subjects": [], "policies": [], }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); let (sub_session, pub_session) = get_client_sessions(port).await; { let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); let received_value = Arc::new(Mutex::new(String::new())); let deleted = Arc::new(Mutex::new(false)); let temp_recv_value = received_value.clone(); let deleted_clone = deleted.clone(); let subscriber = sub_session .declare_subscriber(KEY_EXPR) .callback(move |sample| { if sample.kind() == SampleKind::Put { let mut temp_value = zlock!(temp_recv_value); *temp_value = sample.payload().try_to_string().unwrap().into_owned(); } else if sample.kind() == SampleKind::Delete { let mut deleted = zlock!(deleted_clone); *deleted = true; } }) .await .unwrap(); tokio::time::sleep(SLEEP).await; publisher.put(VALUE).await.unwrap(); tokio::time::sleep(SLEEP).await; assert_eq!(*zlock!(received_value), VALUE); publisher.delete().await.unwrap(); tokio::time::sleep(SLEEP).await; assert!(*zlock!(deleted)); ztimeout!(subscriber.undeclare()).unwrap(); } close_sessions(sub_session, pub_session).await; close_router_session(session).await; } async fn test_pub_sub_allow_then_deny(port: u16) { println!("test_pub_sub_allow_then_deny"); let mut config_router = get_basic_router_config(port).await; config_router .insert_json5( "access_control", r#"{ "enabled": true, "default_permission": "allow", "rules": [ { "id": "r1", "permission": "deny", "flows": ["egress", "ingress"], "messages": [ "put", "delete", "declare_subscriber" ], "key_exprs": [ "test/demo" ], }, ], "subjects": [ { "id": "s1", "interfaces": [ "lo", "lo0" ], } ], "policies": [ { "rules": ["r1"], "subjects": ["s1"], } ] }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); let (sub_session, pub_session) = get_client_sessions(port).await; { let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); let received_value = Arc::new(Mutex::new(String::new())); let deleted = Arc::new(Mutex::new(false)); let temp_recv_value = received_value.clone(); let deleted_clone = deleted.clone(); let subscriber = sub_session .declare_subscriber(KEY_EXPR) .callback(move |sample| { if sample.kind() == SampleKind::Put { let mut temp_value = zlock!(temp_recv_value); *temp_value = sample.payload().try_to_string().unwrap().into_owned(); } else if sample.kind() == SampleKind::Delete { let mut deleted = zlock!(deleted_clone); *deleted = true; } }) .await .unwrap(); tokio::time::sleep(SLEEP).await; publisher.put(VALUE).await.unwrap(); tokio::time::sleep(SLEEP).await; assert_ne!(*zlock!(received_value), VALUE); publisher.delete().await.unwrap(); tokio::time::sleep(SLEEP).await; assert!(!(*zlock!(deleted))); ztimeout!(subscriber.undeclare()).unwrap(); } close_sessions(sub_session, pub_session).await; close_router_session(session).await; } async fn test_pub_sub_deny_then_allow(port: u16) { println!("test_pub_sub_deny_then_allow"); let mut config_router = get_basic_router_config(port).await; config_router .insert_json5( "access_control", r#"{ "enabled": true, "default_permission": "deny", "rules": [ { "id": "r1", "permission": "allow", "flows": ["egress", "ingress"], "messages": [ "put", "delete", "declare_subscriber" ], "key_exprs": [ "test/demo" ], }, ], "subjects": [ { "id": "s1", "interfaces": [ "lo", "lo0" ], } ], "policies": [ { "rules": ["r1"], "subjects": ["s1"], } ] }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); let (sub_session, pub_session) = get_client_sessions(port).await; { let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); let received_value = Arc::new(Mutex::new(String::new())); let deleted = Arc::new(Mutex::new(false)); let temp_recv_value = received_value.clone(); let deleted_clone = deleted.clone(); let subscriber = sub_session .declare_subscriber(KEY_EXPR) .callback(move |sample| { if sample.kind() == SampleKind::Put { let mut temp_value = zlock!(temp_recv_value); *temp_value = sample.payload().try_to_string().unwrap().into_owned(); } else if sample.kind() == SampleKind::Delete { let mut deleted = zlock!(deleted_clone); *deleted = true; } }) .await .unwrap(); tokio::time::sleep(SLEEP).await; publisher.put(VALUE).await.unwrap(); tokio::time::sleep(SLEEP).await; assert_eq!(*zlock!(received_value), VALUE); publisher.delete().await.unwrap(); tokio::time::sleep(SLEEP).await; assert!(*zlock!(deleted)); ztimeout!(subscriber.undeclare()).unwrap(); } close_sessions(sub_session, pub_session).await; close_router_session(session).await; } async fn test_get_qbl_deny(port: u16) { println!("test_get_qbl_deny"); let mut config_router = get_basic_router_config(port).await; config_router .insert_json5( "access_control", r#"{ "enabled": true, "default_permission": "deny", "rules": [ { "id": "allow reply", "permission": "allow", "messages": ["reply"], "flows": ["egress", "ingress"], "key_exprs": ["test/demo"], } ], "subjects": [ { "id": "all" } ], "policies": [ { "rules": ["allow reply"], "subjects": ["all"], } ], }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); let (get_session, qbl_session) = get_client_sessions(port).await; { let mut received_value = String::new(); let qbl = ztimeout!(qbl_session .declare_queryable(KEY_EXPR) .callback(move |sample| { tokio::task::block_in_place(move || { Handle::current() .block_on(async move { ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() }); }); })) .unwrap(); tokio::time::sleep(SLEEP).await; let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { match reply.result() { Ok(sample) => { received_value = sample.payload().try_to_string().unwrap().into_owned(); break; } Err(e) => println!("Error : {:?}", e), } } tokio::time::sleep(SLEEP).await; assert_ne!(received_value, VALUE); ztimeout!(qbl.undeclare()).unwrap(); } close_sessions(get_session, qbl_session).await; close_router_session(session).await; } async fn test_get_qbl_allow(port: u16) { println!("test_get_qbl_allow"); let mut config_router = get_basic_router_config(port).await; config_router .insert_json5( "access_control", r#"{ "enabled": true, "default_permission": "allow", "rules": [], "subjects": [], "policies": [], }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); let (get_session, qbl_session) = get_client_sessions(port).await; { let mut received_value = String::new(); let qbl = ztimeout!(qbl_session .declare_queryable(KEY_EXPR) .callback(move |sample| { tokio::task::block_in_place(move || { Handle::current() .block_on(async move { ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() }); }); })) .unwrap(); tokio::time::sleep(SLEEP).await; let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { match reply.result() { Ok(sample) => { received_value = sample.payload().try_to_string().unwrap().into_owned(); break; } Err(e) => println!("Error : {:?}", e), } } tokio::time::sleep(SLEEP).await; assert_eq!(received_value, VALUE); ztimeout!(qbl.undeclare()).unwrap(); } close_sessions(get_session, qbl_session).await; close_router_session(session).await; } async fn test_get_qbl_deny_then_allow(port: u16) { println!("test_get_qbl_deny_then_allow"); let mut config_router = get_basic_router_config(port).await; config_router .insert_json5( "access_control", r#"{ "enabled": true, "default_permission": "deny", "rules": [ { "id": "r1", "permission": "allow", "flows": ["egress", "ingress"], "messages": [ "query", "declare_queryable", "reply" ], "key_exprs": [ "test/demo" ], }, ], "subjects": [ { "id": "s1", "interfaces": [ "lo", "lo0" ], } ], "policies": [ { "rules": ["r1"], "subjects": ["s1"], } ] }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); let (get_session, qbl_session) = get_client_sessions(port).await; { let mut received_value = String::new(); let qbl = ztimeout!(qbl_session .declare_queryable(KEY_EXPR) .callback(move |sample| { tokio::task::block_in_place(move || { Handle::current() .block_on(async move { ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() }); }); })) .unwrap(); tokio::time::sleep(SLEEP).await; let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { match reply.result() { Ok(sample) => { received_value = sample.payload().try_to_string().unwrap().into_owned(); break; } Err(e) => println!("Error : {:?}", e), } } tokio::time::sleep(SLEEP).await; assert_eq!(received_value, VALUE); ztimeout!(qbl.undeclare()).unwrap(); } close_sessions(get_session, qbl_session).await; close_router_session(session).await; } async fn test_get_qbl_allow_then_deny(port: u16) { println!("test_get_qbl_allow_then_deny"); let mut config_router = get_basic_router_config(port).await; config_router .insert_json5( "access_control", r#"{ "enabled": true, "default_permission": "allow", "rules": [ { "id": "r1", "permission": "deny", "flows": ["egress", "ingress"], "messages": [ "query", "declare_queryable" ], "key_exprs": [ "test/demo" ], }, ], "subjects": [ { "id": "s1", "interfaces": [ "lo", "lo0" ], } ], "policies": [ { "rules": ["r1"], "subjects": ["s1"], } ] }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); let (get_session, qbl_session) = get_client_sessions(port).await; { let mut received_value = String::new(); let qbl = ztimeout!(qbl_session .declare_queryable(KEY_EXPR) .callback(move |sample| { tokio::task::block_in_place(move || { Handle::current() .block_on(async move { ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() }); }); })) .unwrap(); tokio::time::sleep(SLEEP).await; let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { match reply.result() { Ok(sample) => { received_value = sample.payload().try_to_string().unwrap().into_owned(); break; } Err(e) => println!("Error : {:?}", e), } } tokio::time::sleep(SLEEP).await; assert_ne!(received_value, VALUE); ztimeout!(qbl.undeclare()).unwrap(); } close_sessions(get_session, qbl_session).await; close_router_session(session).await; } async fn test_reply_deny(port: u16) { println!("test_reply_deny"); let mut config_router = get_basic_router_config(port).await; config_router .insert_json5( "access_control", r#"{ "enabled": true, "default_permission": "deny", "rules": [ { "id": "allow get/declare qbl", "permission": "allow", "messages": ["query", "declare_queryable"], "key_exprs": ["test/demo"], } ], "subjects": [ { "id": "all" } ], "policies": [ { "rules": ["allow get/declare qbl"], "subjects": ["all"], } ], }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); let (get_session, qbl_session) = get_client_sessions(port).await; { let mut received_value = String::new(); let qbl = ztimeout!(qbl_session .declare_queryable(KEY_EXPR) .callback(move |sample| { tokio::task::block_in_place(move || { Handle::current() .block_on(async move { ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() }); }); })) .unwrap(); tokio::time::sleep(SLEEP).await; let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { match reply.result() { Ok(sample) => { received_value = sample.payload().try_to_string().unwrap().into_owned(); break; } Err(e) => println!("Error : {:?}", e), } } tokio::time::sleep(SLEEP).await; assert_ne!(received_value, VALUE); ztimeout!(qbl.undeclare()).unwrap(); } close_sessions(get_session, qbl_session).await; close_router_session(session).await; } async fn test_reply_allow_then_deny(port: u16) { println!("test_reply_allow_then_deny"); let mut config_router = get_basic_router_config(port).await; config_router .insert_json5( "access_control", r#"{ "enabled": true, "default_permission": "allow", "rules": [ { "id": "r1", "permission": "deny", "messages": ["reply"], "flows": ["egress", "ingress"], "key_exprs": ["test/demo"], }, ], "subjects": [ { "id": "s1", "interfaces": [ "lo", "lo0" ], } ], "policies": [ { "rules": ["r1"], "subjects": ["s1"], } ] }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); let (get_session, qbl_session) = get_client_sessions(port).await; { let mut received_value = String::new(); let qbl = ztimeout!(qbl_session .declare_queryable(KEY_EXPR) .callback(move |sample| { tokio::task::block_in_place(move || { Handle::current() .block_on(async move { ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() }); }); })) .unwrap(); tokio::time::sleep(SLEEP).await; let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { match reply.result() { Ok(sample) => { received_value = sample.payload().try_to_string().unwrap().into_owned(); break; } Err(e) => println!("Error : {:?}", e), } } tokio::time::sleep(SLEEP).await; assert_ne!(received_value, VALUE); ztimeout!(qbl.undeclare()).unwrap(); } close_sessions(get_session, qbl_session).await; close_router_session(session).await; } async fn test_liveliness_deny(port: u16) { println!("test_liveliness_deny"); let mut config_router = get_basic_router_config(port).await; config_router .insert_json5( "access_control", r#"{ "enabled": true, "default_permission": "deny", "rules": [], "subjects": [], "policies": [], }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); let (reader_session, writer_session) = get_client_sessions(port).await; let received_token = Arc::new(AtomicBool::new(false)); let dropped_token = Arc::new(AtomicBool::new(false)); let received_token_reply = Arc::new(AtomicBool::new(false)); let cloned_received_token = received_token.clone(); let cloned_dropped_token = dropped_token.clone(); let cloned_received_token_reply = received_token_reply.clone(); let subscriber = reader_session .liveliness() .declare_subscriber(KEY_EXPR) .callback(move |sample| { if sample.kind() == SampleKind::Put { cloned_received_token.store(true, std::sync::atomic::Ordering::Relaxed); } else if sample.kind() == SampleKind::Delete { cloned_dropped_token.store(true, std::sync::atomic::Ordering::Relaxed); } }) .await .unwrap(); tokio::time::sleep(SLEEP).await; // test if sub receives token declaration let liveliness = writer_session .liveliness() .declare_token(KEY_EXPR) .await .unwrap(); tokio::time::sleep(SLEEP).await; assert!(!received_token.load(std::sync::atomic::Ordering::Relaxed)); // test if query receives token reply reader_session .liveliness() .get(KEY_EXPR) .timeout(TIMEOUT) .callback(move |reply| match reply.result() { Ok(_) => { cloned_received_token_reply.store(true, std::sync::atomic::Ordering::Relaxed); } Err(e) => println!("Error : {:?}", e), }) .await .unwrap(); tokio::time::sleep(SLEEP).await; assert!(!received_token_reply.load(std::sync::atomic::Ordering::Relaxed)); // test if sub receives token undeclaration ztimeout!(liveliness.undeclare()).unwrap(); tokio::time::sleep(SLEEP).await; assert!(!dropped_token.load(std::sync::atomic::Ordering::Relaxed)); ztimeout!(subscriber.undeclare()).unwrap(); close_sessions(reader_session, writer_session).await; close_router_session(session).await; } async fn test_liveliness_allow(port: u16) { println!("test_liveliness_allow"); let mut config_router = get_basic_router_config(port).await; config_router .insert_json5( "access_control", r#"{ "enabled": true, "default_permission": "allow", "rules": [], "subjects": [], "policies": [], }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); let (reader_session, writer_session) = get_client_sessions(port).await; let received_token = Arc::new(AtomicBool::new(false)); let dropped_token = Arc::new(AtomicBool::new(false)); let received_token_reply = Arc::new(AtomicBool::new(false)); let cloned_received_token = received_token.clone(); let cloned_dropped_token = dropped_token.clone(); let cloned_received_token_reply = received_token_reply.clone(); let subscriber = reader_session .liveliness() .declare_subscriber(KEY_EXPR) .callback(move |sample| { if sample.kind() == SampleKind::Put { cloned_received_token.store(true, std::sync::atomic::Ordering::Relaxed); } else if sample.kind() == SampleKind::Delete { cloned_dropped_token.store(true, std::sync::atomic::Ordering::Relaxed); } }) .await .unwrap(); tokio::time::sleep(SLEEP).await; // test if sub receives token declaration let liveliness = writer_session .liveliness() .declare_token(KEY_EXPR) .await .unwrap(); tokio::time::sleep(SLEEP).await; assert!(received_token.load(std::sync::atomic::Ordering::Relaxed)); // test if query receives token reply reader_session .liveliness() .get(KEY_EXPR) .timeout(TIMEOUT) .callback(move |reply| match reply.result() { Ok(_) => { cloned_received_token_reply.store(true, std::sync::atomic::Ordering::Relaxed); } Err(e) => println!("Error : {:?}", e), }) .await .unwrap(); tokio::time::sleep(SLEEP).await; assert!(received_token_reply.load(std::sync::atomic::Ordering::Relaxed)); // test if sub receives token undeclaration ztimeout!(liveliness.undeclare()).unwrap(); tokio::time::sleep(SLEEP).await; assert!(dropped_token.load(std::sync::atomic::Ordering::Relaxed)); ztimeout!(subscriber.undeclare()).unwrap(); close_sessions(reader_session, writer_session).await; close_router_session(session).await; } async fn test_liveliness_allow_deny_token(port: u16) { println!("test_liveliness_allow_deny_token"); let mut config_router = get_basic_router_config(port).await; config_router .insert_json5( "access_control", r#"{ "enabled": true, "default_permission": "allow", "rules": [ { id: "filter token", permission: "deny", messages: ["liveliness_token"], flows: ["ingress", "egress"], key_exprs: ["test/demo"], }, ], "subjects": [ { id: "all" }, ], "policies": [ { "subjects": ["all"], "rules": ["filter token"], }, ], }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); let (reader_session, writer_session) = get_client_sessions(port).await; let received_token = Arc::new(AtomicBool::new(false)); let dropped_token = Arc::new(AtomicBool::new(false)); let received_token_reply = Arc::new(AtomicBool::new(false)); let cloned_received_token = received_token.clone(); let cloned_dropped_token = dropped_token.clone(); let cloned_received_token_reply = received_token_reply.clone(); let subscriber = reader_session .liveliness() .declare_subscriber(KEY_EXPR) .callback(move |sample| { if sample.kind() == SampleKind::Put { cloned_received_token.store(true, std::sync::atomic::Ordering::Relaxed); } else if sample.kind() == SampleKind::Delete { cloned_dropped_token.store(true, std::sync::atomic::Ordering::Relaxed); } }) .await .unwrap(); tokio::time::sleep(SLEEP).await; // test if sub receives token declaration let liveliness = writer_session .liveliness() .declare_token(KEY_EXPR) .await .unwrap(); tokio::time::sleep(SLEEP).await; assert!(!received_token.load(std::sync::atomic::Ordering::Relaxed)); // test if query receives token reply reader_session .liveliness() .get(KEY_EXPR) .timeout(TIMEOUT) .callback(move |reply| match reply.result() { Ok(_) => { cloned_received_token_reply.store(true, std::sync::atomic::Ordering::Relaxed); } Err(e) => println!("Error : {:?}", e), }) .await .unwrap(); tokio::time::sleep(SLEEP).await; assert!(!received_token_reply.load(std::sync::atomic::Ordering::Relaxed)); // test if sub receives token undeclaration ztimeout!(liveliness.undeclare()).unwrap(); tokio::time::sleep(SLEEP).await; assert!(!dropped_token.load(std::sync::atomic::Ordering::Relaxed)); ztimeout!(subscriber.undeclare()).unwrap(); close_sessions(reader_session, writer_session).await; close_router_session(session).await; } async fn test_liveliness_deny_allow_token(port: u16) { println!("test_liveliness_deny_allow_token"); let mut config_router = get_basic_router_config(port).await; config_router .insert_json5( "access_control", r#"{ "enabled": true, "default_permission": "deny", "rules": [ { id: "filter token", permission: "allow", messages: ["liveliness_token"], flows: ["ingress", "egress"], key_exprs: ["test/demo"], }, ], "subjects": [ { id: "all" }, ], "policies": [ { "subjects": ["all"], "rules": ["filter token"], }, ], }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); let (reader_session, writer_session) = get_client_sessions(port).await; let received_token = Arc::new(AtomicBool::new(false)); let dropped_token = Arc::new(AtomicBool::new(false)); let received_token_reply = Arc::new(AtomicBool::new(false)); let cloned_received_token = received_token.clone(); let cloned_dropped_token = dropped_token.clone(); let cloned_received_token_reply = received_token_reply.clone(); let subscriber = reader_session .liveliness() .declare_subscriber(KEY_EXPR) .callback(move |sample| { if sample.kind() == SampleKind::Put { cloned_received_token.store(true, std::sync::atomic::Ordering::Relaxed); } else if sample.kind() == SampleKind::Delete { cloned_dropped_token.store(true, std::sync::atomic::Ordering::Relaxed); } }) .await .unwrap(); tokio::time::sleep(SLEEP).await; // test if sub receives token declaration let liveliness = writer_session .liveliness() .declare_token(KEY_EXPR) .await .unwrap(); tokio::time::sleep(SLEEP).await; assert!(!received_token.load(std::sync::atomic::Ordering::Relaxed)); // test if query receives token reply reader_session .liveliness() .get(KEY_EXPR) .timeout(TIMEOUT) .callback(move |reply| match reply.result() { Ok(_) => { cloned_received_token_reply.store(true, std::sync::atomic::Ordering::Relaxed); } Err(e) => println!("Error : {:?}", e), }) .await .unwrap(); tokio::time::sleep(SLEEP).await; assert!(!received_token_reply.load(std::sync::atomic::Ordering::Relaxed)); // test if sub receives token undeclaration ztimeout!(liveliness.undeclare()).unwrap(); tokio::time::sleep(SLEEP).await; assert!(!dropped_token.load(std::sync::atomic::Ordering::Relaxed)); ztimeout!(subscriber.undeclare()).unwrap(); close_sessions(reader_session, writer_session).await; close_router_session(session).await; } async fn test_liveliness_allow_deny_sub(port: u16) { println!("test_liveliness_allow_deny_sub"); let mut config_router = get_basic_router_config(port).await; config_router .insert_json5( "access_control", r#"{ "enabled": true, "default_permission": "allow", "rules": [ { id: "filter sub", permission: "deny", messages: ["declare_liveliness_subscriber"], flows: ["ingress", "egress"], key_exprs: ["test/demo"], }, ], "subjects": [ { id: "all" }, ], "policies": [ { "subjects": ["all"], "rules": ["filter sub"], }, ], }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); let (reader_session, writer_session) = get_client_sessions(port).await; let received_token = Arc::new(AtomicBool::new(false)); let dropped_token = Arc::new(AtomicBool::new(false)); let received_token_reply = Arc::new(AtomicBool::new(false)); let cloned_received_token = received_token.clone(); let cloned_dropped_token = dropped_token.clone(); let cloned_received_token_reply = received_token_reply.clone(); let subscriber = reader_session .liveliness() .declare_subscriber(KEY_EXPR) .callback(move |sample| { if sample.kind() == SampleKind::Put { cloned_received_token.store(true, std::sync::atomic::Ordering::Relaxed); } else if sample.kind() == SampleKind::Delete { cloned_dropped_token.store(true, std::sync::atomic::Ordering::Relaxed); } }) .await .unwrap(); tokio::time::sleep(SLEEP).await; // test if sub receives token declaration let liveliness = writer_session .liveliness() .declare_token(KEY_EXPR) .await .unwrap(); tokio::time::sleep(SLEEP).await; assert!(!received_token.load(std::sync::atomic::Ordering::Relaxed)); // test if query receives token reply reader_session .liveliness() .get(KEY_EXPR) .timeout(TIMEOUT) .callback(move |reply| match reply.result() { Ok(_) => { cloned_received_token_reply.store(true, std::sync::atomic::Ordering::Relaxed); } Err(e) => println!("Error : {:?}", e), }) .await .unwrap(); tokio::time::sleep(SLEEP).await; assert!(received_token_reply.load(std::sync::atomic::Ordering::Relaxed)); // test if sub receives token undeclaration ztimeout!(liveliness.undeclare()).unwrap(); tokio::time::sleep(SLEEP).await; assert!(!dropped_token.load(std::sync::atomic::Ordering::Relaxed)); ztimeout!(subscriber.undeclare()).unwrap(); close_sessions(reader_session, writer_session).await; close_router_session(session).await; } async fn test_liveliness_deny_allow_sub(port: u16) { println!("test_liveliness_deny_allow_sub"); let mut config_router = get_basic_router_config(port).await; config_router .insert_json5( "access_control", r#"{ "enabled": true, "default_permission": "deny", "rules": [ { id: "filter sub", permission: "allow", messages: ["declare_liveliness_subscriber", "liveliness_token"], flows: ["ingress", "egress"], key_exprs: ["test/demo"], }, ], "subjects": [ { id: "all" }, ], "policies": [ { "subjects": ["all"], "rules": ["filter sub"], }, ], }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); let (reader_session, writer_session) = get_client_sessions(port).await; let received_token = Arc::new(AtomicBool::new(false)); let dropped_token = Arc::new(AtomicBool::new(false)); let received_token_reply = Arc::new(AtomicBool::new(false)); let cloned_received_token = received_token.clone(); let cloned_dropped_token = dropped_token.clone(); let cloned_received_token_reply = received_token_reply.clone(); let subscriber = reader_session .liveliness() .declare_subscriber(KEY_EXPR) .callback(move |sample| { if sample.kind() == SampleKind::Put { cloned_received_token.store(true, std::sync::atomic::Ordering::Relaxed); } else if sample.kind() == SampleKind::Delete { cloned_dropped_token.store(true, std::sync::atomic::Ordering::Relaxed); } }) .await .unwrap(); tokio::time::sleep(SLEEP).await; // test if sub receives token declaration let liveliness = writer_session .liveliness() .declare_token(KEY_EXPR) .await .unwrap(); tokio::time::sleep(SLEEP).await; assert!(received_token.load(std::sync::atomic::Ordering::Relaxed)); // test if query receives token reply reader_session .liveliness() .get(KEY_EXPR) .timeout(TIMEOUT) .callback(move |reply| match reply.result() { Ok(_) => { cloned_received_token_reply.store(true, std::sync::atomic::Ordering::Relaxed); } Err(e) => println!("Error : {:?}", e), }) .await .unwrap(); tokio::time::sleep(SLEEP).await; assert!(!received_token_reply.load(std::sync::atomic::Ordering::Relaxed)); // test if sub receives token undeclaration ztimeout!(liveliness.undeclare()).unwrap(); tokio::time::sleep(SLEEP).await; assert!(dropped_token.load(std::sync::atomic::Ordering::Relaxed)); ztimeout!(subscriber.undeclare()).unwrap(); close_sessions(reader_session, writer_session).await; close_router_session(session).await; } async fn test_liveliness_allow_deny_query(port: u16) { println!("test_liveliness_allow_deny_query"); let mut config_router = get_basic_router_config(port).await; config_router .insert_json5( "access_control", r#"{ "enabled": true, "default_permission": "allow", "rules": [ { id: "filter query", permission: "deny", messages: ["liveliness_query"], flows: ["ingress", "egress"], key_exprs: ["test/demo"], }, ], "subjects": [ { id: "all" }, ], "policies": [ { "subjects": ["all"], "rules": ["filter query"], }, ], }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); let (reader_session, writer_session) = get_client_sessions(port).await; let received_token = Arc::new(AtomicBool::new(false)); let dropped_token = Arc::new(AtomicBool::new(false)); let received_token_reply = Arc::new(AtomicBool::new(false)); let cloned_received_token = received_token.clone(); let cloned_dropped_token = dropped_token.clone(); let cloned_received_token_reply = received_token_reply.clone(); let subscriber = reader_session .liveliness() .declare_subscriber(KEY_EXPR) .callback(move |sample| { if sample.kind() == SampleKind::Put { cloned_received_token.store(true, std::sync::atomic::Ordering::Relaxed); } else if sample.kind() == SampleKind::Delete { cloned_dropped_token.store(true, std::sync::atomic::Ordering::Relaxed); } }) .await .unwrap(); tokio::time::sleep(SLEEP).await; // test if sub receives token declaration let liveliness = writer_session .liveliness() .declare_token(KEY_EXPR) .await .unwrap(); tokio::time::sleep(SLEEP).await; assert!(received_token.load(std::sync::atomic::Ordering::Relaxed)); // test if query receives token reply reader_session .liveliness() .get(KEY_EXPR) .timeout(TIMEOUT) .callback(move |reply| match reply.result() { Ok(_) => { cloned_received_token_reply.store(true, std::sync::atomic::Ordering::Relaxed); } Err(e) => println!("Error : {:?}", e), }) .await .unwrap(); tokio::time::sleep(SLEEP).await; assert!(!received_token_reply.load(std::sync::atomic::Ordering::Relaxed)); // test if sub receives token undeclaration ztimeout!(liveliness.undeclare()).unwrap(); tokio::time::sleep(SLEEP).await; assert!(dropped_token.load(std::sync::atomic::Ordering::Relaxed)); ztimeout!(subscriber.undeclare()).unwrap(); close_sessions(reader_session, writer_session).await; close_router_session(session).await; } async fn test_liveliness_deny_allow_query(port: u16) { println!("test_liveliness_deny_allow_query"); let mut config_router = get_basic_router_config(port).await; config_router .insert_json5( "access_control", r#"{ "enabled": true, "default_permission": "deny", "rules": [ { id: "filter query", permission: "allow", messages: ["liveliness_query", "liveliness_token"], flows: ["ingress", "egress"], key_exprs: ["test/demo"], }, ], "subjects": [ { id: "all" }, ], "policies": [ { "subjects": ["all"], "rules": ["filter query"], }, ], }"#, ) .unwrap(); println!("Opening router session"); let session = ztimeout!(zenoh::open(config_router)).unwrap(); let (reader_session, writer_session) = get_client_sessions(port).await; let received_token = Arc::new(AtomicBool::new(false)); let dropped_token = Arc::new(AtomicBool::new(false)); let received_token_reply = Arc::new(AtomicBool::new(false)); let cloned_received_token = received_token.clone(); let cloned_dropped_token = dropped_token.clone(); let cloned_received_token_reply = received_token_reply.clone(); let subscriber = reader_session .liveliness() .declare_subscriber(KEY_EXPR) .callback(move |sample| { if sample.kind() == SampleKind::Put { cloned_received_token.store(true, std::sync::atomic::Ordering::Relaxed); } else if sample.kind() == SampleKind::Delete { cloned_dropped_token.store(true, std::sync::atomic::Ordering::Relaxed); } }) .await .unwrap(); tokio::time::sleep(SLEEP).await; // test if sub receives token declaration let liveliness = writer_session .liveliness() .declare_token(KEY_EXPR) .await .unwrap(); tokio::time::sleep(SLEEP).await; assert!(!received_token.load(std::sync::atomic::Ordering::Relaxed)); // test if query receives token reply reader_session .liveliness() .get(KEY_EXPR) .timeout(TIMEOUT) .callback(move |reply| match reply.result() { Ok(_) => { cloned_received_token_reply.store(true, std::sync::atomic::Ordering::Relaxed); } Err(e) => println!("Error : {:?}", e), }) .await .unwrap(); tokio::time::sleep(SLEEP).await; assert!(received_token_reply.load(std::sync::atomic::Ordering::Relaxed)); // test if sub receives token undeclaration ztimeout!(liveliness.undeclare()).unwrap(); tokio::time::sleep(SLEEP).await; assert!(!dropped_token.load(std::sync::atomic::Ordering::Relaxed)); ztimeout!(subscriber.undeclare()).unwrap(); close_sessions(reader_session, writer_session).await; close_router_session(session).await; }