use std::env; use std::io::Write; use std::net; use std::str; use std::time::Duration; use clap::{App, Arg}; use log::{error, info, trace}; use uuid::Uuid; use tokio::io::AsyncWriteExt; use tokio::net::TcpStream; use mqtt::control::variable_header::ConnectReturnCode; use mqtt::packet::*; use mqtt::TopicFilter; use mqtt::{Decodable, Encodable, QualityOfService}; fn generate_client_id() -> String { format!("/MQTT/rust/{}", Uuid::new_v4()) } #[tokio::main] async fn main() { // configure logging env::set_var("RUST_LOG", env::var_os("RUST_LOG").unwrap_or_else(|| "info".into())); env_logger::init(); let matches = App::new("sub-client") .author("Y. T. Chung ") .arg( Arg::with_name("SERVER") .short("S") .long("server") .takes_value(true) .required(true) .help("MQTT server address (host:port)"), ) .arg( Arg::with_name("SUBSCRIBE") .short("s") .long("subscribe") .takes_value(true) .multiple(true) .required(true) .help("Channel filter to subscribe"), ) .arg( Arg::with_name("USER_NAME") .short("u") .long("username") .takes_value(true) .help("Login user name"), ) .arg( Arg::with_name("PASSWORD") .short("p") .long("password") .takes_value(true) .help("Password"), ) .arg( Arg::with_name("CLIENT_ID") .short("i") .long("client-identifier") .takes_value(true) .help("Client identifier"), ) .get_matches(); let server_addr = matches.value_of("SERVER").unwrap(); let client_id = matches .value_of("CLIENT_ID") .map(|x| x.to_owned()) .unwrap_or_else(generate_client_id); let channel_filters: Vec<(TopicFilter, QualityOfService)> = matches .values_of("SUBSCRIBE") .unwrap() .map(|c| (TopicFilter::new(c.to_string()).unwrap(), QualityOfService::Level0)) .collect(); let keep_alive = 10; info!("Connecting to {:?} ... ", server_addr); let mut stream = net::TcpStream::connect(server_addr).unwrap(); info!("Connected!"); info!("Client identifier {:?}", client_id); let mut conn = ConnectPacket::new(client_id); conn.set_clean_session(true); conn.set_keep_alive(keep_alive); let mut buf = Vec::new(); conn.encode(&mut buf).unwrap(); stream.write_all(&buf[..]).unwrap(); let connack = ConnackPacket::decode(&mut stream).unwrap(); trace!("CONNACK {:?}", connack); if connack.connect_return_code() != ConnectReturnCode::ConnectionAccepted { panic!( "Failed to connect to server, return code {:?}", connack.connect_return_code() ); } // const CHANNEL_FILTER: &'static str = "typing-speed-test.aoeu.eu"; info!("Applying channel filters {:?} ...", channel_filters); let sub = SubscribePacket::new(10, channel_filters); let mut buf = Vec::new(); sub.encode(&mut buf).unwrap(); stream.write_all(&buf[..]).unwrap(); loop { let packet = match VariablePacket::decode(&mut stream) { Ok(pk) => pk, Err(err) => { error!("Error in receiving packet {:?}", err); continue; } }; trace!("PACKET {:?}", packet); if let VariablePacket::SubackPacket(ref ack) = packet { if ack.packet_identifier() != 10 { panic!("SUBACK packet identifier not match"); } info!("Subscribed!"); break; } } // connection made, start the async work stream.set_nonblocking(true).unwrap(); let mut stream = TcpStream::from_std(stream).unwrap(); let (mut mqtt_read, mut mqtt_write) = stream.split(); let ping_sender = async move { loop { info!("Sending PINGREQ to broker"); let pingreq_packet = PingreqPacket::new(); let mut buf = Vec::new(); pingreq_packet.encode(&mut buf).unwrap(); mqtt_write.write_all(&buf).await.unwrap(); tokio::time::sleep(Duration::from_secs(keep_alive as u64 / 2)).await; } }; let receiver = async move { while let Ok(packet) = VariablePacket::parse(&mut mqtt_read).await { trace!("PACKET {:?}", packet); match packet { VariablePacket::PingrespPacket(..) => { info!("Received PINGRESP from broker .."); } VariablePacket::PublishPacket(ref publ) => { let msg = match str::from_utf8(publ.payload()) { Ok(msg) => msg, Err(err) => { error!("Failed to decode publish message {:?}", err); continue; } }; info!("PUBLISH ({}): {}", publ.topic_name(), msg); } _ => {} } } }; tokio::pin!(ping_sender); tokio::pin!(receiver); tokio::join!(ping_sender, receiver); }