use crossbeam_channel::bounded; use vertx_rust::vertx::{Vertx, VertxOptions}; use vertx_rust::zk::ZookeeperClusterManager; use vertx_rust::vertx::message::Body; use hyper::StatusCode; use hyper::Response; #[tokio::main] async fn main() { pretty_env_logger::init_timed(); let vertx_options = VertxOptions::default(); let mut vertx = Vertx::new(vertx_options); let zk = ZookeeperClusterManager::new("127.0.0.1:2181".to_string(), "io.vertx".to_string()); vertx.set_cluster_manager(zk); let event_bus = vertx.event_bus().await; event_bus.consumer("test.01", move |m, _| { let body = m.body(); Box::pin(async move { let response = format!( r#"{{"health": "{code}"}}"#, code = body.as_string().unwrap() ); m.reply(Body::String(response)); }) }); let mut http_server = vertx.create_http_server().await; http_server .get("/", move |_req, ev| { let (tx, rx) = bounded(1); ev.request("test.01", Body::String("UP".to_string()), move |m, _| { let tx = tx.clone(); Box::pin(async move {let _ = tx.send(m.body()); }) }); let body = rx.recv().unwrap(); let body = body.as_string().unwrap(); Ok(Response::builder() .status(StatusCode::OK) .header("content-type", "application/json") .body(body.clone().into()) .unwrap()) }).listen_with_default(9091, move |_, _| { Ok(Response::builder() .status(StatusCode::OK) .body("NOTFOUND".as_bytes().into()) .unwrap()) }); vertx.start().await; }