use chrono::NaiveDateTime; use futures::{Future, Stream}; use http::Uri; use hyper_multipart::{Error, Multipart, MultipartChunks}; use log::{debug, error}; use std::time::Duration; use tokio::prelude::StreamExt; fn main() { dotenv::dotenv().expect("Failed to initialize dotenv"); pretty_env_logger::init(); let stream_url = std::env::var("STREAM_URL").expect("STREAM_URL must be set"); let client = hyper::Client::new(); let target_uri: Uri = stream_url.parse().expect("Invalid stream URL"); let f = client .get(target_uri) .map_err(Error::from) .and_then( |response: hyper::Response| match response.into_multipart() { Ok(multipart_stream) => { handle_stream(multipart_stream); Ok(()) } Err(e) => Err(Error::from(e)), }, ) .map_err(|e| error!("Error: {}", e)); tokio::run(f) } pub fn handle_stream(s: MultipartChunks) { let stream = s .throttle(Duration::from_millis(1000)) .inspect(|part| { let headers = part.headers(); let ts = headers .get("x-timestamp") .expect("Getting x-timestamp") .to_str() .expect("Convering x-timestamp to str") .parse::() .expect("Parse x-timestamp as f64") as i64; let sent_ts: i64 = headers .get("x-sendtimestamp") .expect("Getting x-sendtimestamp") .to_str() .expect("Convering x-sendtimestamp to str") .parse::() .expect("Parse x-sendtimestamp as f64") as i64; let ts_date = NaiveDateTime::from_timestamp(ts, 0); let sent_ts_date = NaiveDateTime::from_timestamp(sent_ts, 0); println!("Timestamp: {}. Sent At: {}", ts_date, sent_ts_date); }) .for_each(|_| Ok(())) .map_err(|e| error!("Print stream: {}", e)); tokio::spawn(stream); }