use http::Uri; use spotflow::{DeviceClientBuilder, MessageContext}; use std::{ fs::{self, File}, path::Path, }; use log::{debug, info, warn}; use uuid::Uuid; mod common; fn main() { env_logger::Builder::from_env( env_logger::Env::default() .default_filter_or("sqlx=warn,ureq=warn,rumqtt=warn,spotflow=debug,info"), ) .init(); let device_id = Uuid::new_v4().to_string(); let workspace_id = "98d5945d-bf04-49de-ad48-45fc01869075"; let stream_group = "default"; let stream = "rust"; let site = "test-site"; let batch_count = 10u32; let message_count = 100u32; let path = Path::new("./test.db"); info!("Using device ID {}", device_id); info!("Initiating ingress tests"); let instance_url = Uri::from_static("https://tst1.dataplatform.datamole.dev"); // 1 minute registration tokens. Device must choose device ID let pt = String::from("CAEQAQ.ChIJrQttEdCSxUARoVQJDmRVorI.cF63b-v-FUd_1QwUIOyEWfJ_9dyQwrM6kBw0vO22JtoZ60ZnRAkMKAec6hd7Zwcl6Mqx_-rk9VY6EmK3ycyb_gVLsT9Tw3vqh0OEUfM9FpTeSNv0_GReFMJkb_95kpwg"); if path.exists() { info!("Removing the old local database file"); fs::remove_file(path).expect("Unable to delete the old local database file"); } info!("Creating a new local database file"); File::create(path).expect("Unable to create a new local database file"); { info!("Creating ingress client"); let pt = pt.clone(); let client = DeviceClientBuilder::new(Some(device_id.clone()), pt, path) .with_instance(instance_url.to_string()) .with_site_id(site.to_owned()) .with_display_provisioning_operation_callback(Box::new( common::ProvisioningOperationApprovalHandler::new( instance_url.clone(), workspace_id.to_string(), ), )) .build() .expect("Unable to build ingress connection"); let message_context = MessageContext::new(Some(stream_group.to_owned()), Some(stream.to_owned())); for batch_id in 0..batch_count { let batch_id = format!("{batch_id:0>2}"); for message_id in 0..message_count { let message_id = format!("{message_id:0>2}"); debug!("Publishing message {message_id}"); client .enqueue_message( &message_context, Some(batch_id.clone()), Some(message_id), vec![b'a'; 1000], ) .expect("Unable to send message"); } info!("Completing batch {batch_id}"); client .enqueue_batch_completion(&message_context, batch_id) .expect("Unable to complete batch"); } info!("Dropping original ingress"); } { info!("Starting new ingress client with configuration"); let pt = pt.clone(); // This is the last place rt is used so no need to clone // let rt = rt.clone(); let client = DeviceClientBuilder::new(Some(device_id.clone()), pt, path) .with_instance(instance_url.to_string()) .with_display_provisioning_operation_callback(Box::new( common::ProvisioningOperationApprovalHandler::new( instance_url.clone(), workspace_id.to_string(), ), )) .build() .expect("Unable to build ingress connection"); loop { let pending = client .pending_messages_count() .expect("Unable to obtain number of pending messages"); if pending < 200 { break; } warn!("Waiting for {} more messages to be sent.", pending); std::thread::sleep(std::time::Duration::from_millis(500)); } info!("Dropping ingress"); } { info!("Starting new ingress client"); let client = DeviceClientBuilder::new(Some(device_id.clone()), pt, path) .with_instance(instance_url.to_string()) .with_display_provisioning_operation_callback(Box::new( common::ProvisioningOperationApprovalHandler::new( instance_url, workspace_id.to_string(), ), )) .build() .expect("Unable to build ingress connection"); loop { let pending = client .pending_messages_count() .expect("Unable to obtain number of pending messages"); if pending == 0 { break; } warn!("Waiting for {} more messages to be sent.", pending); std::thread::sleep(std::time::Duration::from_millis(500)); } info!("Dropping ingress"); } }