use std::{ path::Path, sync::Arc, thread, time::{Duration, Instant}, }; use anyhow::Result; use http::Uri; use oauth2::AccessToken; use serde_json::json; use spotflow::DeviceClientBuilder; use uuid::Uuid; use crate::common::get_azure_token; #[path = "../examples/common/mod.rs"] mod common; #[allow(deprecated)] // We're using the current Cloud-to-Device interface here until it's stabilized #[test] #[ignore] fn c2d() { env_logger::Builder::from_env( env_logger::Env::default().default_filter_or("sqlx=warn,ureq=warn,rumqtt=warn,info"), ) .init(); let msg_cnt = 10; let path = Path::new("./test.db"); let device_id = Uuid::new_v4().to_string(); log::info!("Using device ID {}", &device_id); log::info!("Initiating ingress tests"); let workspace_id = "98d5945d-bf04-49de-ad48-45fc01869075"; let instance_url = Uri::from_static("https://tst1.dataplatform.datamole.dev"); // Infinite registration tokens. Device must choose device ID let pt = String::from("CAEQAQ.ChIJrQttEdCSxUARoVQJDmRVorI.cF63b-v-FUd_1QwUIOyEWfJ_9dyQwrM6kBw0vO22JtoZ60ZnRAkMKAec6hd7Zwcl6Mqx_-rk9VY6EmK3ycyb_gVLsT9Tw3vqh0OEUfM9FpTeSNv0_GReFMJkb_95kpwg"); log::info!("Creating ingress client"); let client = DeviceClientBuilder::new(Some(device_id.clone()), pt, path) .with_instance(instance_url.to_string()) .with_display_provisioning_operation_callback(Box::new( common::ProvisioningOperationApprovalHandler::new( instance_url.clone(), workspace_id.to_string(), ), )) .with_method_handler(handler) .build() .expect("Unable to build ingress connection"); // FIXME: Ensure that the device is registered before sending C2D messages (use new interface that performs provisioning) log::info!("Awaiting C2D messages"); let sender = thread::spawn(move || { log::debug!("Obtaining Azure token for sending C2D."); let token = get_azure_token(&instance_url, "device-management"); log::debug!("Azure token for sending C2D ready."); for i in 0..(2 * msg_cnt) { thread::sleep(Duration::from_millis(250)); log::info!("Sending message {}", i); loop { if let Err(e) = send_c2d( instance_url.to_string().as_str(), workspace_id, &device_id, &token, ) { log::warn!("Failed sending C2D message, retrying. Error: {:?}", e); continue; } break; } } }); for _ in 0..msg_cnt { let start = Instant::now(); let msg = client.get_c2d(Duration::MAX).expect( "This should not panic unless called concurrently or when process_c2d has been called", ); let end = Instant::now(); log::info!("C2D message received after {:?}", end - start); let payload = std::str::from_utf8(msg.content.as_ref()).unwrap(); log::info!("Directly received C2D message with payload `{}`", payload); for (k, v) in &msg.properties { log::debug!("{k}: {v}"); } // C2D message is acknowledged when dropped } client .process_c2d(|msg| { let payload = std::str::from_utf8(msg.content.as_ref()).unwrap(); log::info!("Handler received C2D message with payload `{}`", payload); for (k, v) in &msg.properties { log::debug!("{k}: {v}"); } }) .expect("Unable to register c2d handler"); sender.join().expect("Failed joining thread"); } fn handler(method: String, payload: &[u8]) -> (i32, Vec) { let payload = std::str::from_utf8(payload).unwrap(); log::info!( "Received {} method invocation with payload `{}`.", method, payload ); (200, Vec::new()) } pub fn send_c2d( instance_url: &str, workspace_id: &str, device_id: &str, token: &AccessToken, ) -> Result<()> { let url = format!("{instance_url}workspaces/{workspace_id}/devices/{device_id}/c2d-messages"); let auth_header = format!("Bearer {}", token.secret()); let connector = Arc::new(native_tls::TlsConnector::new().unwrap()); let agent = ureq::AgentBuilder::new().tls_connector(connector).build(); let data = json!({ "uuid": Uuid::new_v4().to_string(), }); log::debug!("Sending C2D message."); agent .post(&url) .set("Authorization", &auth_header) .timeout(Duration::from_secs(1)) .send_json(data)?; log::info!("C2D message sent."); Ok(()) }