use std::{ num::NonZeroU32, path::Path, time::{Duration, Instant}, }; use azure_storage::core::prelude::*; use azure_storage_blobs::prelude::*; use http::Uri; use spotflow::{Compression, DeviceClientBuilder, MessageContext}; use log::*; use uuid::Uuid; #[path = "../examples/common/mod.rs"] mod common; #[test] #[ignore] fn ingress() { env_logger::Builder::from_env( env_logger::Env::default().default_filter_or("sqlx=warn,ureq=warn,rumqtt=warn,info"), ) .init(); let device_id = Uuid::new_v4().to_string(); let workspace_id = "98d5945d-bf04-49de-ad48-45fc01869075"; let stream_group = "default"; let stream = "rust"; let site = "test-site"; let batch_count = 10; let message_count = 10; let path = Path::new("./test.db"); let storage_key = "DefaultEndpointsProtocol=https;AccountName=dgcemshdhpwmmsqi5bw9oep6;AccountKey=N2vTccY8/Gft8cjoqTsbJ646yAd2MIz6e+Y/x+zjm410jqpz/r7VUQQUq7xzirOckNAzeKohI2b9+ASt+KKuFQ==;EndpointSuffix=core.windows.net"; let container_name = "messages"; info!("Using device ID {}", device_id); info!("Initiating ingress tests"); let instance_url = Uri::from_static("https://tst1.dataplatform.datamole.dev"); // Infinite registration tokens. Device must choose device ID let pt = String::from("CAEQAQ.ChIJrQttEdCSxUARoVQJDmRVorI.cF63b-v-FUd_1QwUIOyEWfJ_9dyQwrM6kBw0vO22JtoZ60ZnRAkMKAec6hd7Zwcl6Mqx_-rk9VY6EmK3ycyb_gVLsT9Tw3vqh0OEUfM9FpTeSNv0_GReFMJkb_95kpwg"); info!("Creating ingress client"); let startup = Instant::now(); let sending: Instant; let buffered: Instant; { let client = DeviceClientBuilder::new(Some(device_id.clone()), pt.clone(), path) .with_site_id(site.to_owned()) .with_instance(instance_url.to_string()) .with_display_provisioning_operation_callback(Box::new( common::ProvisioningOperationApprovalHandler::new( instance_url.clone(), workspace_id.to_string(), ), )) .build() .expect("Unable to build ingress connection"); let mut message_context = MessageContext::new(Some(stream_group.to_owned()), Some(stream.to_owned())); message_context.set_compression(Some(Compression::Fastest)); sending = Instant::now(); for batch_id in 0..batch_count { let batch_id = format!("{batch_id:0>2}"); for message_id in 0..message_count { let message_id = format!("{message_id:0>2}"); debug!("Publishing message {message_id}"); client .enqueue_message( &message_context, Some(batch_id.clone()), Some(message_id), vec![b'a'; 1000], ) .expect("Unable to send message"); } info!("Completing batch {batch_id}"); client .enqueue_batch_completion(&message_context, batch_id) .expect("Unable to complete batch"); } info!("Dropping original ingress"); } { info!("Building new ingress"); let client = DeviceClientBuilder::new(Some(device_id.clone()), pt, path) .with_instance(instance_url.to_string()) .with_display_provisioning_operation_callback(Box::new( common::ProvisioningOperationApprovalHandler::new( instance_url, workspace_id.to_string(), ), )) .build() .expect("Unable to build ingress connection"); buffered = Instant::now(); loop { let pending = client .pending_messages_count() .expect("Unable to obtain number of pending messages"); if pending == 0 { break; } if Instant::now() - buffered > Duration::from_secs(30) { panic!("Sending data took too long"); } warn!("Waiting for {} more messages to be sent.", pending); std::thread::sleep(std::time::Duration::from_millis(500)); } } let sent = Instant::now(); // check landing store let http_client = azure_core::new_http_client(); let storage_client = StorageAccountClient::new_connection_string(http_client.clone(), storage_key) .unwrap() .as_storage_client(); let container_client = storage_client.as_container_client(container_name); container_client.list_blobs(); let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); runtime.block_on(async move { let mut blobs; loop { let iv = container_client .list_blobs() .prefix(format!("{stream_group}/{stream}/{device_id}/")) .max_results(NonZeroU32::new(1000).unwrap()) .execute() .await .unwrap(); let blob_count = iv.blobs.blobs.len(); blobs = Some(iv); let expected_count = (batch_count * message_count) as usize; if blob_count < expected_count { error!( "Unexpected number of blobs: Expected {}, got {}", expected_count, blob_count ); tokio::time::sleep(Duration::from_secs(5)).await; continue; } else { break; } } let blobs_ready = Instant::now(); if let Some(blobs) = blobs { for cont in blobs.blobs.blobs.iter() { log::trace!("\t{}\t{} bytes", cont.name, cont.properties.content_length); } } info!("Startup: {:?}", sending - startup); info!("Sending: {:?}", buffered - sending); info!("Actual Sending: {:?}", sent - buffered); info!("Time until data is ready: {:?}", blobs_ready - sent); }); }