use fundamentum_sdk_mqtt::models::{CommandStatus, DeviceCommandUpdate}; use fundamentum_sdk_mqtt::{ async_event_loop_listener, Device, SecurityBuilder, SecurityFileFetcher, }; use fundamentum_sdk_mqtt::{Client, ClientSettings, Message, Packet, QoS}; // Cli tools use clap::Parser; // Tokio tracing tools use tracing::{error, info}; // Cli tools #[derive(Debug, Parser)] #[clap(name = "commands")] #[clap(about = "Dimonoff Inc. Commands Example", long_about = None, version, propagate_version = true)] struct CliCore { /// Device's private_key #[clap(long, default_value = "./rsa_private.pem")] private_key: String, /// Device's project_id #[clap(long)] project_id: u64, /// Device's region_id #[clap(long)] region_id: u64, /// Device's registry_id #[clap(long)] registry_id: u64, /// Device's serial #[clap(long)] serial: String, /// Fundamentum's MQTT endpoint #[clap(long, default_value = "mqtts.fundamentum-iot-dev.com")] endpoint: String, /// Report every command as failure (by default: false) #[clap(long)] all_failure: Option, } #[tokio::main] async fn main() -> Result<(), Box> { // install global collector configured based on RUST_LOG env var. tracing_subscriber::fmt::init(); // claps let args = CliCore::parse(); let fundamentum_settings = ClientSettings::new( SecurityBuilder::default() .project_id(args.project_id) .fetcher(SecurityFileFetcher::new_boxed(args.private_key)) .build()?, // Security strategy Device::new( args.serial, // serial args.project_id, // project's id args.region_id, // region's id args.registry_id, // registry's id ), args.endpoint, // uri endpoint None, // override mqtt configuration ); let (iot_core_client, eventloop) = Client::new(fundamentum_settings).await?; // Basic broadcast mutli listeners let mut receiver1 = iot_core_client.get_receiver(); // Basic subscribe on the command's channel iot_core_client .subscribe_commands(QoS::AtMostOnce) .await .unwrap(); // listener 1 let recv1_thread = tokio::spawn(async move { loop { match receiver1.recv().await { Ok(event) => match event { Message::Other(packet) => match packet { Packet::Publish(p) => { info!("Received message {:?} on topic: {:#?}", p.payload, p.topic) } _ => info!("Got event {:?}", packet), }, Message::Commands(cmds) => { for cmd in cmds { let start_status = CommandStatus::Ongoing; let status = match args.all_failure { Some(v) => match v { true => CommandStatus::Failure, false => CommandStatus::Success, }, None => CommandStatus::Success, }; // Frist step sends an Ongoing status let cmd_status = DeviceCommandUpdate::new(start_status, "example update status"); info!("Process command: {:?} response: {:?}", cmd, cmd_status); iot_core_client .publish_update_command( cmd.device_command_id, QoS::AtMostOnce, &cmd_status, ) .await .unwrap(); // Frist step sends a new status based on `--all-failure` let cmd_status = DeviceCommandUpdate::new(status, "example update status"); info!("Process command: {:?} response: {:?}", cmd, cmd_status); iot_core_client .publish_update_command( cmd.device_command_id, QoS::AtMostOnce, &cmd_status, ) .await .unwrap(); } } _ => (), }, Err(e) => error!("e: {:?}", e), } } }); // global listener let listen_thread = tokio::spawn(async move { async_event_loop_listener(eventloop).await.unwrap(); }); // joinall let _ = tokio::join!(listen_thread, recv1_thread); Ok(()) }