//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
//   ZettaScale Zenoh Team,
//
#![cfg(feature = "internal_config")]

use std::time::Duration;

use zenoh::{query::Reply, sample::SampleKind, Session};
use zenoh_core::ztimeout;

// NOTE(review): not referenced directly in this file; presumably picked up by
// the `ztimeout!` macro as the per-operation deadline — confirm against
// `zenoh_core`.
const TIMEOUT: Duration = Duration::from_secs(10);

/// Opens a session that listens on `listen` and connects to `connect`.
///
/// Both slices hold endpoint strings (e.g. `"tcp/127.0.0.1:18447"`). Multicast
/// scouting is disabled, so only the endpoints given here are configured.
///
/// # Panics
///
/// Panics if an endpoint string fails to parse, if a configuration value is
/// rejected, or if opening the session fails.
async fn open_session(listen: &[&str], connect: &[&str]) -> Session {
    let mut config = zenoh::Config::default();
    config
        .listen
        .endpoints
        .set(
            listen
                .iter()
                .map(|e| e.parse().unwrap())
                .collect::<Vec<_>>(),
        )
        .unwrap();
    config
        .connect
        .endpoints
        .set(
            connect
                .iter()
                .map(|e| e.parse().unwrap())
                .collect::<Vec<_>>(),
        )
        .unwrap();
    config.scouting.multicast.set_enabled(Some(false)).unwrap();
    println!("[ ][01a] Opening session");
    ztimeout!(zenoh::open(config)).unwrap()
}

/// Closes `session`, panicking if the close operation reports an error.
async fn close_session(session: Session) {
    println!("[ ][01d] Closing session");
    ztimeout!(session.close()).unwrap();
}

/// Checks the transport and link events a session publishes under
/// `@/{zid}/session/transport/unicast/`.
///
/// Sequence exercised:
/// 1. A first session subscribes to its own transport and link keys.
/// 2. A second session connects to it; a `Put` sample is expected on each
///    subscriber.
/// 3. The same keys are queried with `get`; exactly one reply is expected for
///    each query.
/// 4. The second session is closed; a `Delete` sample is expected on each
///    subscriber.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn zenoh_events() {
    let session = open_session(&["tcp/127.0.0.1:18447"], &[]).await;
    let zid = session.zid();
    let sub1 =
        ztimeout!(session.declare_subscriber(format!("@/{zid}/session/transport/unicast/*")))
            .unwrap();
    let sub2 = ztimeout!(
        session.declare_subscriber(format!("@/{zid}/session/transport/unicast/*/link/*"))
    )
    .unwrap();

    let session2 = open_session(&["tcp/127.0.0.1:18448"], &["tcp/127.0.0.1:18447"]).await;
    let zid2 = session2.zid();

    // Keys expected for the peer: the transport key must match exactly, while
    // link keys only share this prefix (the link-specific suffix is not
    // checked by this test).
    let transport_key = format!("@/{zid}/session/transport/unicast/{zid2}");
    let link_prefix = format!("@/{zid}/session/transport/unicast/{zid2}/link/");

    // Connection of the second session: one `Put` per subscriber.
    let sample = ztimeout!(sub1.recv_async()).expect("no transport event received on connect");
    assert_eq!(sample.key_expr().as_str(), transport_key.as_str());
    assert_eq!(sample.kind(), SampleKind::Put);

    let sample = ztimeout!(sub2.recv_async()).expect("no link event received on connect");
    assert!(sample.key_expr().as_str().starts_with(link_prefix.as_str()));
    assert_eq!(sample.kind(), SampleKind::Put);

    // The same information must be available through queries.
    let replies: Vec<Reply> =
        ztimeout!(session.get(format!("@/{zid}/session/transport/unicast/*")))
            .unwrap()
            .into_iter()
            .collect();
    assert_eq!(replies.len(), 1);
    let reply = replies[0]
        .result()
        .expect("transport query returned an error reply");
    assert_eq!(reply.key_expr().as_str(), transport_key.as_str());

    let replies: Vec<Reply> =
        ztimeout!(session.get(format!("@/{zid}/session/transport/unicast/*/link/*")))
            .unwrap()
            .into_iter()
            .collect();
    assert_eq!(replies.len(), 1);
    let reply = replies[0]
        .result()
        .expect("link query returned an error reply");
    assert!(reply.key_expr().as_str().starts_with(link_prefix.as_str()));

    close_session(session2).await;

    // Disconnection of the second session: one `Delete` per subscriber.
    let sample = ztimeout!(sub1.recv_async()).expect("no transport event received on close");
    assert_eq!(sample.key_expr().as_str(), transport_key.as_str());
    assert_eq!(sample.kind(), SampleKind::Delete);

    let sample = ztimeout!(sub2.recv_async()).expect("no link event received on close");
    assert!(sample.key_expr().as_str().starts_with(link_prefix.as_str()));
    assert_eq!(sample.kind(), SampleKind::Delete);

    ztimeout!(sub2.undeclare()).unwrap();
    ztimeout!(sub1.undeclare()).unwrap();
    close_session(session).await;
}