// // Copyright (c) 2023 ZettaScale Technology // // This program and the accompanying materials are made available under the // terms of the Eclipse Public License 2.0 which is available at // http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 // which is available at https://www.apache.org/licenses/LICENSE-2.0. // // SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 // // Contributors: // ZettaScale Zenoh Team, <zenoh@zettascale.tech> // use std::time::Duration; use clap::{arg, Command}; use zenoh::{config::Config, key_expr::keyexpr, qos::CongestionControl}; const HTML: &str = r#" <div id="result"></div> <script> if(typeof(EventSource) !== "undefined") { var source = new EventSource("/demo/sse/event"); source.addEventListener("PUT", function(e) { document.getElementById("result").innerHTML += e.data + "<br>"; }, false); } else { document.getElementById("result").innerHTML = "Sorry, your browser does not support server-sent events..."; } </script>"#; #[tokio::main] async fn main() { // initiate logging zenoh::init_log_from_env_or("error"); let config = parse_args(); let key = keyexpr::new("demo/sse").unwrap(); let value = "Pub from sse server!"; println!("Opening session..."); let session = zenoh::open(config).await.unwrap(); println!("Declaring Queryable on '{key}'..."); let queryable = session.declare_queryable(key).await.unwrap(); tokio::task::spawn({ let receiver = queryable.handler().clone(); async move { while let Ok(request) = receiver.recv_async().await { request.reply(key, HTML).await.unwrap(); } } }); let event_key = [key, "/event"].concat(); println!("Declaring Publisher on '{event_key}'..."); let publisher = session .declare_publisher(&event_key) .congestion_control(CongestionControl::Block) .await .unwrap(); println!( "Putting Data periodically ('{}': '{}')...", &event_key, value ); println!("Data updates are accessible through HTML5 SSE at http://<hostname>:8000/{key}"); loop { publisher.put(value).await.unwrap(); tokio::time::sleep(Duration::from_secs(1)).await; } } fn parse_args() -> Config { let args = Command::new("zenoh ssl server example") .arg( arg!(-m --mode [MODE] "The zenoh session mode (peer by default).") .value_parser(["peer", "client"]), ) .arg(arg!(-e --connect [ENDPOINT]... "Endpoints to connect to.")) .arg(arg!(-l --listen [ENDPOINT]... "Endpoints to listen on.")) .arg(arg!(-c --config [FILE] "A configuration file.")) .arg( arg!(--"no-multicast-scouting" "Disable the multicast-based scouting mechanism." ), ) .get_matches(); let mut config = if let Some(conf_file) = args.get_one::<&String>("config") { Config::from_file(conf_file).unwrap() } else { Config::default() }; match args.get_one::<&String>("mode").map(|m| m.parse()) { Some(Ok(mode)) => { config.set_mode(Some(mode)).unwrap(); } Some(Err(e)) => panic!("Invalid mode: {}", e), None => {} }; if let Some(values) = args.get_many::<&String>("connect") { config .connect .endpoints .set(values.into_iter().map(|v| v.parse().unwrap()).collect()) .unwrap(); } if let Some(values) = args.get_many::<&String>("listen") { config .listen .endpoints .set(values.into_iter().map(|v| v.parse().unwrap()).collect()) .unwrap(); } if args.get_flag("no-multicast-scouting") { config.scouting.multicast.set_enabled(Some(false)).unwrap(); } config }