use fundamentum_sdk_mqtt::models::{HeartBeat, StatesEvent}; use fundamentum_sdk_mqtt::{ async_event_loop_listener, Device, SecurityBuilder, SecurityFileFetcher, }; use fundamentum_sdk_mqtt::{Client, ClientSettings, Message, Packet, QoS}; // Cli tools use clap::Parser; // Custom system fetcher use sysinfo::{System, SystemExt}; // Tokio tracing tools use tracing::{error, info}; // Cli tools #[derive(Debug, Parser)] #[clap(name = "pubsub")] #[clap(about = "Dimonoff Inc. PubSub Example", long_about = None, version, propagate_version = true)] struct CliCore { /// Device's private_key #[clap(long, default_value = "./rsa_private.pem")] private_key: String, /// Device's project_id #[clap(long)] project_id: u64, /// Device's region_id #[clap(long)] region_id: u64, /// Device's registry_id #[clap(long)] registry_id: u64, /// Device's serial #[clap(long)] serial: String, /// Fundamentum's MQTT endpoint #[clap(long, default_value = "mqtts.fundamentum-iot-dev.com")] endpoint: String, } // Custom Device's State #[derive(Debug, Default, serde::Deserialize, serde::Serialize)] pub struct State { random: String, } impl State { pub fn new() -> Self { Self { random: String::from("Hello world"), } } } // Function to get the number of seconds since boot fn seconds_since_boot() -> u64 { let system = System::default(); system.get_uptime() } #[tokio::main] async fn main() -> Result<(), Box> { // install global collector configured based on RUST_LOG env var. tracing_subscriber::fmt::init(); // claps let args = CliCore::parse(); let fundamentum_settings = ClientSettings::new( SecurityBuilder::default() .project_id(args.project_id) .fetcher(SecurityFileFetcher::new_boxed(args.private_key)) .build()?, // Security strategy Device::new( args.serial, // serial args.project_id, // project's id args.region_id, // region's id args.registry_id, // registry's id ), args.endpoint, // uri endpoint None, // override mqtt configuration ); let (iot_core_client, eventloop) = Client::new(fundamentum_settings).await?; // Basic subscribe on the command's channel iot_core_client .subscribe_commands(QoS::AtMostOnce) .await .unwrap(); // Basic broadcast mutli listeners let mut receiver1 = iot_core_client.get_receiver(); let mut receiver2 = iot_core_client.get_receiver(); // Example: StatesEvent // Basic publish on state iot_core_client .publish_states( QoS::AtMostOnce, &StatesEvent::from_states(serde_json::to_value(&State::new()).unwrap()), ) .await .unwrap(); // listener 1 let recv1_thread = tokio::spawn(async move { loop { match receiver1.recv().await { Ok(event) => match event { Message::Other(packet) => match packet { Packet::Publish(p) => { info!("Received message {:?} on topic: {:#?}", p.payload, p.topic) } _ => info!("Got event on receiver1: {:?}", packet), }, Message::Commands(cmd) => info!("Got event on receiver1: {:?}", cmd), _ => (), }, Err(e) => error!("e: {:?}", e), } } }); // listener 2 let recv2_thread = tokio::spawn(async move { loop { match receiver2.recv().await { Ok(event) => info!("Got event on receiver2: {:?}", event), Err(e) => error!("e: {:?}", e), } } }); // sender 1 // Example: HeartBeat let send1_thread = tokio::spawn(async move { loop { let secs = seconds_since_boot(); iot_core_client .publish_heartbeat(QoS::AtMostOnce, &HeartBeat::new(secs)) .await .unwrap(); info!("heatbeat! uptime: {}", secs); tokio::time::sleep(tokio::time::Duration::from_secs(1)).await } }); // global listener let listen_thread = tokio::spawn(async move { async_event_loop_listener(eventloop).await.unwrap(); }); // joinall let _ = tokio::join!(listen_thread, recv1_thread, recv2_thread, send1_thread); Ok(()) }