use datagen_gnss::elements::data::Data; use datagen_gnss::stream::async_read::reader_to_stream; //use datagen_gnss::stream::async_write::stream_to_writer; use seek_bufread::BufReader; use std::io::Cursor; //use std::path::Path; use tokio::sync::mpsc::channel; #[tokio::test] async fn test_reader_to_stream() { /* Take a reader */ let json_string = r#" { "dt_gen": "14/07/2021T20:31:36.035497841UTC", "origin": { "name": "TestData", "serie_id": 1 }, "point": { "x": { "error": 0.34741300846241663, "value": 2.2568976794321935 }, "y": { "error": 0.2550604766668364, "value": 6.070757249626687 }, "z": { "error": 0.09804820380025447, "value": -1.8146543294125694 } }, "protocol": "gendata", "version": "v2021.06.25" } *end* { "dt_gen": "14/07/2021T22:21:44.025269550UTC", "origin": { "name": "TestData", "serie_id": 1 }, "point": { "x": { "error": 0.0727939934743338, "value": 3.1540975450997486 }, "y": { "error": 0.15367453769215977, "value": 5.83866050014176 }, "z": { "error": 0.24111259676388475, "value": -2.551320555652193 } }, "protocol": "gendata", "version": "v2021.06.25" } *end* { "dt_gen": "14/07/2021T22:21:58.733549006UTC", "origin": { "name": "TestData", "serie_id": 1 }, "point": { "x": { "error": 0.016094159955821308, "value": 2.5894686789854475 }, "y": { "error": 0.3702754850089164, "value": 6.086119138027 }, "z": { "error": 0.45935171317077694, "value": -1.8043056174457144 } }, "protocol": "gendata", "version": "v2021.06.25" } *end* "#; let end_flag = String::from("*end*"); // the result must be compared to: let dataset: Vec = json_string .split(&end_flag) .filter(|&x| !x.trim().is_empty()) .map(|json| Data::json_loads(&json)) .collect(); // cursor load the bytes let cursor = Cursor::new(json_string.as_bytes()); //let list:Vec = let reader = BufReader::new(cursor); // the reader take the lines and put every value on the stream. let (mut tx, mut rx) = channel(12); // read to stream // take the cursor and send to stream reader_to_stream(reader, &mut tx, &end_flag, true).await; // tokio::spawn({ dataset.iter().map(|data| tx.send(data)) }); // receive and // now take 'rx' and receive all the data, pushed to vec! // recover the Futures JoinHandle -> let mut recv_dataset: Vec = vec![]; for _data in dataset.clone().iter() { let result = rx.recv().await.unwrap(); recv_dataset.push(result); } // let recv_dataset: Vec = dataset // .clone() // .into_iter() // .map(|data| tokio::spawn(async move { rx.recv().await.unwrap() })) // .collect(); assert_eq!(dataset, recv_dataset); }